Golang:深入理解 errgroup

导读:本篇文章讲解 Golang:深入理解 errgroup,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

深入理解 errgroup

原文,此处只是做记录学习使用

前言

我们平时使用go来并发操作时,在主协程需要等待其他协成完成后才会继续执行的情况,我们会使用sync.WaitGroup 来进行操作。

  • 在一个 goroutine 需要等待多个 goroutine 完成和多个 goroutine 等待一个 goroutine 干活时都可以解决问题。

虽然 WaitGroup 已经做了很好的封装,但是仍然存在一些问题,例如如果需要返回错误,或者只要一个 goroutine 出错我们就不再等其他 goroutine 了,减少资源浪费,这些 WaitGroup 都不能很好的解决,这时候就派出本文的选手 errgroup 出场了。

函数签名

type Group
    func WithContext(ctx context.Context) (*Group, context.Context)
    func (g *Group) Go(f func() error)
    func (g *Group) Wait() error

整个包就一个 Group 结构体

  • 通过 WithContext 可以创建一个带取消的 Group
  • 当然除此之外也可以零值的 Group 也可以直接使用,但是出错之后就不会取消其他的 goroutine 了
  • Go 方法传入一个 func() error 内部会启动一个 goroutine 去处理
  • Wait 类似 WaitGroup 的 Wait 方法,等待所有的 goroutine 结束后退出,返回的错误是一个出错的 err

源码

Group

type Group struct {
    // context 的 cancel 方法
 cancel func()

    // 复用 WaitGroup
 wg sync.WaitGroup

 // 用来保证只会接受一次错误
 errOnce sync.Once
    // 保存第一个返回的错误
 err     error
}

WithContext

func WithContext(ctx context.Context) (*Group, context.Context) {
 ctx, cancel := context.WithCancel(ctx)
 return &Group{cancel: cancel}, ctx
}

WithContext 就是使用 WithCancel 创建一个可以取消的 context 将 cancel 赋值给 Group 保存起来,然后再将 context 返回回去

注意这里有一个坑,在后面的代码中不要把这个 ctx 当做父 context 又传给下游,因为 errgroup 取消了,这个 context 就没用了,会导致下游复用的时候出错

Go

func (g *Group) Go(f func() error) {
 g.wg.Add(1)

 go func() {
  defer g.wg.Done()

  if err := f(); err != nil {
   g.errOnce.Do(func() {
    g.err = err
    if g.cancel != nil {
     g.cancel()
    }
   })
  }
 }()
}

Go 方法其实就类似于 go 关键字,会启动一个携程,然后利用 waitgroup 来控制是否结束,如果有一个非 nil 的 error 出现就会保存起来并且如果有 cancel 就会调用 cancel 取消掉,使 ctx 返回

Wait

func (g *Group) Wait() error {
 g.wg.Wait()
 if g.cancel != nil {
  g.cancel()
 }
 return g.err
}

Wait 方法其实就是调用 WaitGroup 等待,如果有 cancel 就调用一下

案例

基于 errgroup 实现一个 http server 的启动和关闭 ,以及 os signal 信号的注册和处理,要保证能够 一个退出,全部注销退出。

func main() {
 g, ctx := errgroup.WithContext(context.Background())

 mux := http.NewServeMux()
 mux.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
  w.Write([]byte("pong"))
 })

 // 模拟单个服务错误退出
 serverOut := make(chan struct{})
 mux.HandleFunc("/shutdown", func(w http.ResponseWriter, r *http.Request) {
  serverOut <- struct{}{}
 })

 server := http.Server{
  Handler: mux,
  Addr:    ":8080",
 }

 // g1
 // g1 退出了所有的协程都能退出么?
 // g1 退出后, context 将不再阻塞,g2, g3 都会随之退出
 // 然后 main 函数中的 g.Wait() 退出,所有协程都会退出
 g.Go(func() error {
  return server.ListenAndServe()
 })

 // g2
 // g2 退出了所有的协程都能退出么?
 // g2 退出时,调用了 shutdown,g1 会退出
 // g2 退出后, context 将不再阻塞,g3 会随之退出
 // 然后 main 函数中的 g.Wait() 退出,所有协程都会退出
 g.Go(func() error {
  select {
  case <-ctx.Done():
   log.Println("errgroup exit...")
  case <-serverOut:
   log.Println("server will out...")
  }

  timeoutCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
  // 这里不是必须的,但是如果使用 _ 的话静态扫描工具会报错,加上也无伤大雅
  defer cancel()

  log.Println("shutting down server...")
  return server.Shutdown(timeoutCtx)
 })

 // g3
 // g3 捕获到 os 退出信号将会退出
 // g3 退出了所有的协程都能退出么?
 // g3 退出后, context 将不再阻塞,g2 会随之退出
 // g2 退出时,调用了 shutdown,g1 会退出
 // 然后 main 函数中的 g.Wait() 退出,所有协程都会退出
 g.Go(func() error {
  quit := make(chan os.Signal, 0)
  signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

  select {
  case <-ctx.Done():
   return ctx.Err()
  case sig := <-quit:
   return errors.Errorf("get os signal: %v", sig)
  }
 })

 fmt.Printf("errgroup exiting: %+v\n", g.Wait())
}

这里主要用到了 errgroup 一个出错,其余取消的能力

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由半码博客整理,本文链接:https://www.bmabk.com/index.php/post/64667.html

(0)
小半的头像小半

相关推荐

半码博客——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!