Limit Concurrency你真的会吗

100个任务起10个goroutine去消费

一. 按照正常思路来一个架构设计

1. 基础知识必备

  • sync.WaitGroup
  • chan
  • go func

2. 原始消费架构模式设计

package main

import (
 "fmt"
 "math/rand"
 "sync"
 "time"
)

func main() {
 const concurrencyProcesses = 10 // 限制最大并发处理数量
 const jobCount = 100 // 有多少任务

 var wg sync.WaitGroup
 wg.Add(jobCount)
 found := make(chan int)
 limitCh := make(chan struct{}, concurrencyProcesses) // 用chan去控制并发数量

 
 for i := 0; i < jobCount; i++ { //处理100个job
  limitCh <- struct{}{} // 控制并发数量
  go func(val int) {
   defer func() {
    wg.Done()
    <-limitCh
   }()
   waitTime := rand.Int31n(1000)
   fmt.Println("job:", val, "wait time:", waitTime, "millisecond")
   time.Sleep(time.Duration(waitTime) * time.Millisecond)
   found <- val
  }(i)
 }
 
 go func() { // 等待goroutine结束并且关闭结果chan
  wg.Wait()
  close(found)
 }()
 
 var results []int
 for p := range found { // 从结果chan获取数据
  fmt.Println("Finished job:", p)
  results = append(results, p)
 }

 fmt.Println("result:", results)
}

大家猜猜结果是怎么样的?

3. 公布结果

Limit Concurrency你真的会吗

为什么deadlock?

4. 究其原因

这就是Limit Concurrency问题,即限制并发问题

在读取第一个job的时候,现将空的struct放进limitCh中,这个时候limitCh中就只剩下9个可以继续处理,接着重复这个步骤继续放入job,直到10个job装满limitCh。但是当第11个job需要处理的时候,程序就直接停止在limitCh <- struct{}{}处,因为前10个goroutine不知道什么时候才能处理结束,这样第11个job它后面的代码就完全没机会执行,造成整个系统deadlock

但是如果你的job数量小于等于10,是完全看不出来任何问题的,系统可以正常运行,只有job数量大于并发数量的时候才会出这个BUG。

二. 按照BUG再设计一版

相信大家想到了一种解决方式,既然程序是卡在limitCh <- struct{}{},那么就直接将这段处理逻辑丢到go func中处理就好了。

    ...
    for i := 0; i < jobCount; i++ {
        go func() { //丢到go func中去处理
            limitCh <- struct{}{}
        }()
        go func(val int) {
        ...

结果:Limit Concurrency你真的会吗

发现100个job同时处理直到结束,并没有达到限制并发处理的要求,虽然满足最终把这些job处理完成,但是我们要的可是Limit Concurrency

三. 最佳实践,也是大家拿来就可以用的限制并发框架模式

既然要限制并发数量,那么就建立特定数量的worker,每个worker读取chan就可以了,所以

第一步就是先建立queue的通道,将所有job都放入queue中,但是以go func方式去处理,避免阻塞main程序。

第二步就是建立特定的worker数量来消化全部的job。

package main

import (
 "fmt"
 "math/rand"
 "sync"
 "time"
)

func main() {
 const concurrencyProcesses = 10 // 限制最大并发处理数量
 const jobCount = 100 // 有多少任务

 var wg sync.WaitGroup
 wg.Add(jobCount)
 found := make(chan int)
 queue := make(chan int)

 go func(queue chan<- int) { // 生产端负责生产任务到queue
  for i := 0; i < jobCount; i++ {
   queue <- i
  }
  close(queue)
 }(queue)

 for i := 0; i < concurrencyProcesses; i++ { // 当前只有10个并发 他们做的事情就是处理任务 并且将结果set in found
  go func(queue <-chan int, found chan<- int) {
   for val := range queue {
    defer wg.Done()
    waitTime := rand.Int31n(1000)
    fmt.Println("job:", val, "wait time:", waitTime, "millisecond")
    time.Sleep(time.Duration(waitTime) * time.Millisecond)
    found <- val
   }
  }(queue, found) // 传递进去 防止data race
 }

 go func() { // 等待goroutine结束并且关闭结果chan
  wg.Wait()
  close(found)
 }()

 var results []int
 for p := range found { // 从结果chan获取数据
  fmt.Println("Finished job:", p)
  results = append(results, p)
 }

 fmt.Println("result:", results)
}

可以看到这里的for循环是以concurrencyProcesses为并发数量去处理job了。10个goroutine通过内层for循环不断读取chan中的job,直到queue中没有job为止,这样生产端生产完job之后会关闭queue,那么goroutine最后收到关闭消息后退出for循环,结束goroutine。

可以看下结果:Limit Concurrency你真的会吗

小结

其实还有很多常见的其他限制并发处理模式,但是我喜欢用上面这种处理模式。这里有个细节需要大家注意哈,就是控制并发的时候要处理好数据竞争(data race)问题,如果处理不好,可能最后结果不一定是对的。

– END –


原文始发于微信公众号(堆栈future):Limit Concurrency你真的会吗

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/103546.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!