让消费数据处理更快版本2(有并发控制)-一次性并发获取或者初始化任务最快有效方式

让消费数据处理更快版本2

一次性获取或者初始化所有数据并发模式

上个版本Go让消费速度更快我们重点有个LOOP,那就表明它是在后台一直运行的,那如果我们想一次性执行并发获取任务或者初始化任务,那该如何做呢?里面又有哪些需要注意的细节呢?show me the code,don‘t just say say!

代码

package main

import (
 "context"
 "fmt"
 "time"
)

// TaskRunner 运行task
type TaskRunner struct {
}

// Task 任务对象
type Task struct {
 Name  string
 Age  string
 IDCard  string
 Addr    string
}

func (taskRunner *TaskRunner)getConcurrency() int {
 //qps, _ := config.YAMLInt("qps") // 从配置文件获取qps
  qps := 5 //这里先写死 上线替换成从配置文件获取
 if qps <= 0 {
  return 5
 }
 return qps
}

func  (taskRunner *TaskRunner)Run(ctx context.Context, task Task) {
 // 这里你就可以对task做处理
 // 比如请求第三方接口
 // 比如插入数据
 // 比如通过task查询数据然后组装数据发送给kafka等消息队列
 // 比如发送通知等

 fmt.Println("消费结束", task.Name)
}

func (taskRunner *TaskRunner)initTask(ctx context.Context, tasks []Task) (interface{}, error) {
 if len(tasks) == 0 {
  return nil, nil
 }
 
 // 获取并发
 channelSize := taskRunner.getConcurrency()
 // 初始化并发pool
 pool := make(chan Task, channelSize)

 // 初始化并发goroutine
 for i := 0; i < channelSize; i++ {
  go func(id int) {
   start := time.Now()
   fmt.Println("初始化任务开始, runner_id=", id)
   for v := range pool { // 消费task
    taskRunner.Run(ctx, v)
   }
   fmt.Println("初始化任务结束, runner_id=", id, " duration=", time.Since(start))
  }(i)
 }

 // 塞数据
 go func() {
  for _, v := range tasks {
   pool <- v // 当然在往pool赛数据之前 你可以针对task v做些额外的初始化或者改变值行为 比如v.Addr="北京"改为v.Addr="上海"
  }
  close(pool)
 }()
 return tasks, nil
}

func main() {
 // 测试demo
 var tasks []Task
 for i:=0; i< 10; i++ {
  task := Task{
   Name: fmt.Sprintf("我是第%d个任务", i+1),
  }
  tasks = append(tasks, task)
 }

 runner := TaskRunner{}
 runner.initTask(context.Background(), tasks)
 time.Sleep(100*time.Second)
}

运行结果:

初始化任务开始, runner_id= 2
消费结束 我是第1个任务
消费结束 我是第2个任务
消费结束 我是第3个任务
消费结束 我是第4个任务
消费结束 我是第5个任务
消费结束 我是第6个任务
消费结束 我是第7个任务
消费结束 我是第8个任务
消费结束 我是第9个任务
消费结束 我是第10个任务
初始化任务结束, runner_id= 2  duration= 39.417µs
初始化任务开始, runner_id= 4
初始化任务结束, runner_id= 4  duration= 1.166µs
初始化任务开始, runner_id= 0
初始化任务开始, runner_id= 1
初始化任务开始, runner_id= 3
初始化任务结束, runner_id= 1  duration= 69.042µs
初始化任务结束, runner_id= 3  duration= 15.041µs
初始化任务结束, runner_id= 0  duration= 58.042µs

小结

这套模板主要是在web项目接口中或者说一直运行的异步任务中针对一次性初始化任务比较常见,大家可以拿来直接用,简单高效,欢迎有兴趣的同学一起交流哈。

– END –


原文始发于微信公众号(堆栈future):让消费数据处理更快版本2(有并发控制)-一次性并发获取或者初始化任务最快有效方式

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之家整理,本文链接:https://www.bmabk.com/index.php/post/103494.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之家——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!