【100 Mistakes】golang并发的坑-2.md

原文:《 Manning.100.Go.Mistakes.and.How.to.Avoid.Them》68-74

从本文中将会学到

  • 防止 goroutine 的常见错误和channel
  • 了解标准数据结构与并发代码使用的影响
  • 使用标准库和一些扩展
  • 避免数据竞争和死锁

忘记字符串格式化作用

格式化字符串是开发者常用的操作,是否返回错误或记录消息。然而,人们很容易忘记字符串的潜在副作用,在并发应用程序中工作时进行格式化。本节将看到两个具体示例:

  • etcd 存储库数据竞争
  • 死锁

etcd数据竞争

etcd 是用 Go 实现的分布式键值存储。它被用在很多项目中,包括Kubernetes,用于存储所有集群数据。它提供了一个 API 来与集群交互。例如,Watcher接口用于通知数据变化:

type Watcher interface {
// Watch watches on a key or prefix. The watched events will be returned
// through the returned channel.
// ...
Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
Close() error
}

该 API 依赖于 gRPC 流。如果你不熟悉它,这是一项技术客户端和服务器之间不断地交换数据。服务器必须维护 使用此功能的所有客户端的列表。因此,实现了Watcher接口通过包含所有活动流的watcher结构:

type watcher struct {
// ...
// streams hold all the active gRPC streams keyed by ctx value.
streams map[string]*watchGrpcStream
}

map的key是基于调用 Watch 方法时提供的上下文:

func (w *watcher) Watch(ctx context.Context, key string,
opts ...OpOption)
 WatchChan
 {
 // ...
    ctxKey := fmt.Sprintf("%v", ctx)
 // ...
 wgs := w.streams[ctxKey]
 // ...
}

ctxKey 是map的键,根据客户端提供的上下文进行格式化。当使用值(context.WithValue)创建上下文中的字符串时,Go 将读取此上下文中的所有值。在这种情况下,etcd 开发人员发现提供给 Watch 的上下文是包含可变值的上下文(例如,指向结构的指针)在某些情况下。他们发现了一个 goroutine 的情况更新其中一个上下文值,而另一个正在执行 Watch,因此读此上下文中的所有值,这导致了一场数据竞争。 修复方法(https://Github.com/etcd-io/etcd/pull/7816):是不再依赖fmt.Sprintf 来格式化map的key,以防止遍历和读取在上下文中封装的值的链表。相反,解决方案是自定义StreamKeyFromCtx 函数从特定上下文值中提取key是不可变的。

上下文中潜在可变的值可能会引入额外的复杂性以防止数据争用。这可能是一个需要仔细考虑的设计决定。

此示例说明我们必须小心并发应用程序中用于格式化的字符串的副作用,在本例中为数据争用。在下面的示例中,我们将看到导致死锁情况的副作用。

死锁

假设我们必须处理一个可以并发访问的 Customer 结构。我们将使用sync.RWMutex来保护访问,无论是读还是写。我们将实现 UpdateAge 方法来更新客户的年龄,并检查年龄为正值。同时,我们将实现Stringer接口。 您能看出这段代码中的 Customer 结构有什么问题吗?UpdateAge 方法并实现 fmt.Stringer 接口?

type Customer struct {
    mutex sync.RWMutex
    id string
    age int
}
func (c *Customer) UpdateAge(age int) error {
    c.mutex.Lock()
    defer c.mutex.Unlock()
    if age < 0 {
     return fmt.Errorf("age should be positive for customer %v", c)
    }
    c.age = age
 return nil
}
func (c *Customer) String() string {
    c.mutex.RLock()
    defer c.mutex.RUnlock()
    return fmt.Sprintf("id %s, age %d", c.id, c.age)
}

这里的问题可能并不简单。如果提供的年龄为负数,我们返回一个错误。因为格式错误,在接收器上使用 %s 指令,它将调用 String 方法格式化客户。但是因为 UpdateAge 已经获取了互斥锁,String 方法将无法获取它(见图 9.10)。 因此,这导致了僵局。如果所有的 goroutine 都处于睡眠状态,这会导致panic:

fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_SemacquireMutex(0xc00009818c0x10b7d000x0)
..

遇到这种情况我们应该如何处理呢?首先,它说明了单元测试很重要。在这种情况下,我们可能会认为创建一个测试年龄为负值是不值得的,因为逻辑很简单。然而,如果没有适当的测试覆盖范围,我们可能会错过这个问题。 这里可以改进的一件事是限制范围的互斥锁。在UpdateAge中,我们首先获取锁并检查输入是否有效。我们应该做相反的事情:首先检查输入,如果输入有效,则获取锁。这有减少潜在副作用的好处,但也可以有对性能的影响——只有在以下情况下,才会获取锁:

func (c *Customer) UpdateAge(age int) error {
    if age < 0 {
     return fmt.Errorf("age should be positive for customer %v", c)
    }
    c.mutex.Lock()
    defer c.mutex.Unlock()
    c.age = age
    return nil
}

在我们的例子中,仅在检查了年龄后才能获取锁,可以避免死锁情况。如果年龄为负数,则可以在不锁定的情况下调用 String函数。 但在某些情况下,限制互斥锁的范围。在这些情况下,我们必须非常小心字符串格式化。也许我们想调用另一个不尝试获取的函数互斥体,或者我们只想更改格式化错误的方式,以便它不会调用字符串方法。例如,以下代码不会导致死锁,因为我们只在访问某一个字段,则不会调用字符串方法:

func (c *Customer) UpdateAge(age int) error {
    c.mutex.Lock()
    defer c.mutex.Unlock()
    if age < 0 {
     return fmt.Errorf("age should be positive for customer id %s", c.id)
    }
    c.age = age
    return nil
}

我们已经看到了两个具体的例子,一个是根据上下文格式化键,另一个是返回格式化结构的错误。在这两种情况下,格式化字符串导致一个问题:分别是数据争用和死锁情况。因此,在并发应用程序,我们应该对可能的副作用保持谨慎字符串格式化。

使用append时创建导致数据竞争

我们之前提到了什么是数据竞争以及其影响是什么。现在,让我们看看切片以及使用追加将元素添加到切片是否是无数据争用的。剧透?这取决于。 在下面的示例中,我们将初始化一个切片并创建两个 goroutine将使用append创建一个带有附加元素的新切片:

s := make([]int1)
go func() {
    s1 := append(s, 1)
    fmt.Println(s1)
}()
go func() {
    s2 := append(s, 1)
    fmt.Println(s2)
}()

您认为这个例子存在数据竞争吗?答案是不会。 切片是受支持的由数组组成,有两个属性:长度和容量。长度是个数切片中的可用元素,而容量是切片中元素的总数支持阵列。当我们使用append时,行为取决于切片是否是满(长度==容量)。如果是,Go 运行时会创建一个新的支持数组来添加新元素;否则,运行时会将其添加到现有的支持数组中。 在此示例中,我们使用 make([]int, 1) 创建一个切片。该代码创建一个单一长度、单一容量的切片。因此,由于切片已满,因此在每个 goroutine 中使用append会返回一个由新数组支持的切片。它不会改变现有的数组;因此,它不会导致数据竞争。 现在,让我们运行相同的示例,但初始化 s 的方式略有不同。我们不是创建长度为 1 的切片,而是创建长度为 0 但容量1:

s := make([]int01)

这个新例子怎么样?它是否包含数据竞争?答案是肯定的。

==================
WARNING: DATA RACE
Write at 0x00c00009e080 by goroutine 10:
...
Previous write at 0x00c00009e080 by goroutine 9:
...
==================

我们使用 make([]int, 0, 1) 创建一个切片。因此,数组未满。两个 goroutine尝试更新后备数组的相同索引(索引 1),这是数据竞争。 如果我们希望两个 goroutine 在切片上工作,我们如何防止数据竞争包含 s 的初始元素加上一个额外的元素?一种解决方案是创建一个s 的副本:

s := make([]int01)
go func() {
    sCopy := make([]intlen(s), cap(s))
    copy(sCopy, s)
    s1 := append(sCopy, 1)
    fmt.Println(s1)
}()
go func() {
    sCopy := make([]intlen(s), cap(s))
    copy(sCopy, s)
    s2 := append(sCopy, 1)
    fmt.Println(s2)
}()

两个 goroutine 都会复制该切片。然后他们在切片副本上使用追加,而不是原始切片。这可以防止数据竞争,因为两个 goroutine 都在隔离的状态下工作。

slices和map的数据竞争 数据竞争对slice和map的影响有多大?当我们有多个 goroutine 时,以下是正确的:

  • 使用至少一个更新值的 goroutine 访问相同的切片索引,这是数据竞争。 goroutine 访问相同的内存位置。
  • 无论操作如何,访问不同的切片索引都不是数据竞争;不同的索引意味着不同的内存位置。
  • 访问同一map结构体(无论是相同还是不同key), 至少有一个 goroutine 更新,这是数据竞争。为什么这与切片数据结构不同?map是桶数组,每个桶都是一个指向键值对数组的指针。 哈希算法用于确定桶的数组索引。由于该算法在映射初始化期间包含一些随机性,因此一次执行可能会导致相同的数组索引,而另一次执行可能不会。竞争检测器通过发出警告来处理这种情况,无论是否发生实际的数据竞争。

在并发上下文中使用切片时,我们必须记住使用append slices 并不总是无竞争的。根据切片以及它是否已满,行为会改变。如果切片已满,则追加是无竞争的。否则,多个 goroutine 可能会,竞争更新相同的数组索引,导致数据竞争。 一般来说,我们不应该根据是否有不同的实现切片已满。我们应该考虑在并发中对共享切片使用追加,应用程序可能会导致数据竞争。因此,应该避免。

对切片和映射不准确地使用锁

在数据可变且共享的并发上下文中工作时,我们通常必须使用mutex对数据结构实现受保护的访问。 常见的错误是在处理切片和映射时不准确地使用mutex。让我们看一个具体的例子并了解潜在的问题。 我们将实现一个 Cache 结构,用于处理客户余额的缓存。该结构将包含每个客户 ID 的余额映射和一个mute用于保护并发访问。

type Cache struct {
    mu sync.RWMutex
    balances map[string]float64
}

该解决方案使用sync.RWMutex来允许多个读操作,只要没有写操作。

接下来,我们添加一个 AddBalance 方法来改变余额映射。改动是在关键部分完成(在互斥锁和互斥解锁内):

func (c *Cache) AddBalance(id string, balance float64) {
    c.mu.Lock()
    c.balances[id] = balance
    c.mu.Unlock()
}

同时,我们必须实现一种方法来计算所有客户的平均余额。一个想法是以这种方式处理最小关键部分:

func (c *Cache) AverageBalance() float64 {
    c.mu.RLock()
    balances := c.balances
    c.mu.RUnlock()
    sum := 0.
    for _, balance := range balances {
     sum += balance
    }
    return sum / float64(len(balances))
}

首先,我们创建本地余额变量映射的副本。仅复制完成迭代每个余额并计算外部平均值的关键部分 关键部分。这个解决方案有效吗? 如果我们使用 -race 标志和两个并发 goroutine 运行测试,其中一个调用AddBalance(因此改变余额)和另一个调用 AverageBalance,一个数据竞争发生。这里有什么问题呢?在内部,映射是一个runtime.hmap结构,主要包含元数据(例如,计数器)和引用数据桶的指针。所以balances := c.balances,不复制实际数据。切片的原理是一样的:

s1 := []int{123}
s2 := s1
s2[0] = 42
fmt.Println(s1)

即使我们修改了 s2,打印 s1 也会返回 [42 2 3]。原因是 s2 := s1创建一个新切片:s2 具有相同的长度和相同的容量,并由与 s1 相同的数组。 回到我们的示例,我们分配一个引用的map来平衡与 c.balances 相同的数据桶。同时,两个goroutine执行操作 在同一数据集上,其中之一会对其进行变异。因此,这是一场数据竞争。我们怎样才能解决数据竞争问题?我们有两个选择。 如果迭代操作并不繁重(就是这里的情况,当我们执行增量操作时),我们应该保护整个函数:

func (c *Cache) AverageBalance() float64 {
    c.mu.RLock()
    defer c.mu.RUnlock()
    sum := 0.
    for _, balance := range c.balances {
     sum += balance
    }
    return sum / float64(len(c.balances))
}

关键部分现在包含整个函数,包括迭代。这可以防止数据竞争。 如果迭代操作不是轻量级的,另一种选择是处理实际的数据的副本并仅保护拷贝操作:

func (c *Cache) AverageBalance() float64 {
    c.mu.RLock()
    m := make(map[string]float64len(c.balances))
    for k, v := range c.balances {
     m[k] = v
    }
    c.mu.RUnlock()
    sum := 0.
    for _, balance := range m {
     sum += balance
    }
    return sum / float64(len(m))
}

一旦我们完成了深度复制,我们就释放mutex。迭代是在复制到临界区之外。 让我们考虑一下这个解决方案。我们必须对map进行两次迭代:一次复制,另一次是完成某些操作(此处为增量)。但关键部分只是map的副本。因此,当且仅当操作速度不快这种方式是比较好的。例如,如果某个操作需要调用外部数据库,这个解决方案可能会更有效。当选择一种解决方案或另一种解决方案时,因为选择取决于诸如 元素的数量和结构的平均大小。 总之,我们必须小心mutex的边界。在本节中,我们已经了解了为什么将现有map(或现有切片)分配给map是不足以防止数据竞争。新变量(无论是映射还是切片)底层都是相同的数据结构。有两种主要的解决方案可以防止这种情况发生:整个函数,或处理实际数据的副本。在所有情况下,我们都要保持谨慎设计关键部分时并确保准确定义边界。

滥用sync.WaitGroup

sync.WaitGroup是一种等待n个操作完成的机制;一般来说,我们用它来等待 n 个 goroutine 完成。我们首先回顾一下公共API;然后再介绍导致非确定性行为的相当频繁的错误。 可以使用sync.WaitGroup的零值创建等待组:

wg := sync.WaitGroup{}

在内部,sync.WaitGroup 拥有一个默认初始化为 0 的内部计数器。可以使用 Add(int) 方法增加此计数器,并使用,Done() 或Add(负值)。如果我们想等待计数器等于0,我们必须使用阻塞的 Wait() 方法。

计数器不能为负数,否则 goroutine 会出现恐慌。

在下面的例子中,我们将初始化一个sync.WaitGroup,启动三个 goroutine以原子方式更新计数器,然后等待它们完成。我们想等待,这三个 goroutine 打印计数器的值(应该是 3)。你可以猜猜这段代码是否有问题?

wg := sync.WaitGroup{}
var v uint64
for i := 0; i < 3; i++ {
    go func() {
        wg.Add(1)
        atomic.AddUint64(&v, 1)
        wg.Done()
    }()
}
wg.Wait()
fmt.Println(v)

如果我们运行这个示例,我们会得到一个不确定的值:代码可以打印任何从 0 到 3的值。此外,如果我们启用 -race 标志,Go 甚至会遇到数据竞争。我们已经使用sync/atomic包来更新v,这可能吗?这段代码有问题吗?问题是 wg.Add(1) 在新创建的 goroutine 中调用,而不是在父 Goroutine。因此,不能保证我们已向等待者表明,我们希望在调用 wg.Wait() 之前等待三个 goroutine。 在这种情况下,main goroutine 启动三个 goroutine。但最后一个 goroutine 是在前两个 Goroutine 已经调用了 wg.Done(),所以父 Goroutine 已经调用了解锁。因此,在这种情况下,当主协程读取 v 时,它等于2. 竞争检测器还可以检测对 v 的不安全访问。【100 Mistakes】golang并发的坑-2.md在处理 goroutine 时,重要的是要记住,如果没有同步,执行就不是确定性的。例如,以下代码可以打印ab 或 ba:

go func() {
 fmt.Print("a")
}()
go func() {
 fmt.Print("b")
}()

两个 goroutine 都可以分配给不同的线程,并且不能保证哪个线程将首先被执行。 CPU 必须使用内存栅栏(也称为内存屏障)来确保顺序。提供不同的同步技术来实现内存栅栏: 例如,sync.WaitGroup 在 wg.Add 和wg.Wait等。 回到我们的示例,有两个选项可以解决我们的问题。首先,我们可以在循环之前使用 3 调用 wg.Add:

wg := sync.WaitGroup{}
var v uint64
wg.Add(3)
for i := 0; i < 3; i++ {
    go func() {
    // ...
    }()
}
// ...

或者,其次,我们可以在每次循环迭代期间调用 wg.Add,然后再调用子协程:

wg := sync.WaitGroup{}
var v uint64
for i := 0; i < 3; i++ {
    wg.Add(1)
    go func() {
    // ...
    }()
}

两种解决方案都很好。如果我们最终要设置的值是等待组计数器预先知道,第一个解决方案使我们不必调用 wg.Add多次。然而,它需要确保到处使用相同的计数以避免微妙的错误。 我们要小心,不要重现 Go 开发人员所犯的这个常见错误。使用sync.WaitGroup时,必须在启动之前完成添加操作goroutine 位于父 goroutine 中,而 Done 操作必须在子goroutine 中完成。

忘记sync.Cond

在sync包中的同步原语中,sync.Cond可能是最重要的,但最少使用和理解的。然而,它提供了我们无法用channel实现的功能。本节通过一个具体示例来展示sync.Cond何时可以有帮助以及如何使用它。 本节中的示例实现了捐赠目标机制:每当达到特定目标时就会发出警报的应用程序。我们将有一个 goroutine 负责增加余额(更新程序 goroutine)。相比之下,其他 goroutine将在达到特定目标时接收更新并打印消息 (侦听器 goroutine)。例如,一个 goroutine 正在等待 10 美元的捐赠目标,而另一个正在等待 15 美元的捐赠目标。 第一个简单的解决方案使用mutex。 updater goroutine 每秒都会增加余额。另一方面,监听器 goroutine 循环直到满足其捐赠目标:

type Donation struct {
    mu sync.RWMutex
    balance int
}
donation := &Donation{}

// Listener goroutines
f := func(goal int) {
    donation.mu.RLock()
    for donation.balance < goal {
        donation.mu.RUnlock()
        donation.mu.RLock()
 }
 fmt.Printf("$%d goal reachedn", donation.balance)
 donation.mu.RUnlock()
}
go f(10)
go f(15)
// Updater goroutine
go func() {
    for {
        time.Sleep(time.Second)
        donation.mu.Lock()
        donation.balance++
        donation.mu.Unlock()
    }
}()

我们使用互斥体保护对共享的 grant.balance 变量的访问。如果我们运行这个例子,它按预期工作:

$10 goal reached
$15 goal reached

主要问题是繁忙循环,这也是导致该实现糟糕的原因。 每个监听器 goroutine 不断循环,直到达到其捐赠目标,这会浪费很多CPU 周期并使 CPU 使用率巨大。我们需要找到更好的解决方案。 让我们回退一步。我们必须找到一种从 updater goroutine 发出信号的方法,每当余额更新时。如果我们考虑 Go 中的信号传递,我们应该考虑通道。那么,让我们尝试使用通道原语的另一个版本:

type Donation struct {
    balance int
    ch chan int
}
donation := &Donation{ch: make(chan int)}
// Listener goroutines
f := func(goal int) {
    for balance := range donation.ch {
        if balance >= goal {
            fmt.Printf("$%d goal reachedn", balance)
        return
     }
 }
}
go f(10)
go f(15)
// Updater goroutine
for {
    time.Sleep(time.Second)
    donation.balance++
    donation.ch <- donation.balance
}

每个侦听器 goroutine 从共享通道接收数据。与此同时,更新程序每当余额更新时,goroutine 都会发送消息。但如果我们尝试一下这个解决方案,可能会输出以下结果:

$11 goal reached
$15 goal reached

当余额为 10 美元而不是 11 美元时,应该通知第一个 goroutine。发生了什么? 发送到通道的消息仅由一个 goroutine 接收。在我们的例子中,如果第一个 goroutine 在第二个 goroutine 之前从通道接收数据,如下图所示,可能会发生什么。【100 Mistakes】golang并发的坑-2.md默认分发模式是循环:多个 goroutine 从共享通道接收数据的。如果一个 goroutine 没有准备好接收消息(不是 在通道上处于等待状态);在这种情况下,Go 将消息分发给下一个可用的协程。 每条消息都由一个 goroutine 接收。因此,在本例中,第一个 goroutine没有收到 10 美元的消息,但第二条却收到了。只有一个通道关闭事件可以广播到多个 goroutine。但在这里我们不想关闭通道,因为这样更新程序将无法发送消息。 在这种情况下使用channel还有另一个问题。监听器 goroutine每当他们的捐赠目标达到时就返回。因此,更新协程必须知道当所有侦听器停止接收到通道的消息时。否则,通道最终会变满并阻止发送数据。一个可能的解决方案是添加一个混合使用sync.WaitGroup,但这样做会使解决方案更加复杂。 理想情况下,我们需要找到一种方法来重复广播,将差额通知到多个 goroutine。幸运的是,Go 有一个解决方案:sync.Cond。我们先来讨论一下理论;然后我们将看到如何使用它来解决我们的原始问题。 根据官方文档:

Cond 实现了一个条件变量,一个对于 goroutine的等待或者宣布事件的发生的集合点。

条件变量是等待某个特定的线程(这里是 goroutine)确定条件的容器。在我们的示例中,条件是余额更新。updater goroutine每当余额更新时广播通知,并且侦听器 goroutine等待更新。此外,sync.Cond 依赖于sync.Locker(一个 *sync.Mutex 或 *sync.RWMutex)以防止数据争用。以下是一种可能的实现:

type Donation struct {
    cond *sync.Cond
    balance int
}
donation := &Donation{
 cond: sync.NewCond(&sync.Mutex{}),
}
// Listener goroutines
f := func(goal int) {
    donation.cond.L.Lock()
    for donation.balance < goal {
     donation.cond.Wait()
    }
    fmt.Printf("%d$ goal reachedn", donation.balance)
    donation.cond.L.Unlock()
}
go f(10)
go f(15)
// Updater goroutine
for {
    time.Sleep(time.Second)
    donation.cond.L.Lock()
    donation.balance++
    donation.cond.L.Unlock()
    donation.cond.Broadcast()
}

首先,我们使用sync.NewCond 创建一个sync.Cond 并提供一个sync.Mutex。什么是监听器和更新器 goroutine? 侦听器 goroutine 会循环,直到达到捐赠余额。在循环内,我们使用 Wait 方法,该方法会阻塞直到满足条件。

让我们确保这里理解术语“条件”。在此背景下,我们谈论的是正在更新的余额,而不是捐赠目标条件。因此,它是两个侦听器 goroutine 共享的单个条件变量。

对 Wait 的调用必须发生在关键流程内,这可能听起来很奇怪。惯于锁可以防止其他 goroutine 等待相同的条件吗?事实上,Wait 的实现如下:

  1. 解锁mutex。
  2. 挂起goroutine,等待通知。
  3. 当通知到达时锁定mutex。

因此,监听器 goroutine 有两个关键部分:

  • 在for donation.balance < goal中访问grant.balance 时
  • 在fmt.Printf中访问donation.balance时,

这样,对共享的 grant.balance 变量的所有访问都受到保护。 现在,更新程序 goroutine 怎么样?余额更新在关键部分完成,以防止数据争用。然后我们调用Broadcast方法,每次更新余额时,唤醒所有等待条件的goroutine。 因此,如果我们运行这个示例,它会打印出我们期望的内容: 10

goal reached
在我们的实现中,条件变量基于正在更新的余额。因此,每次进行新的捐赠时,侦听器变量都会唤醒,以检查 是否达到了他们的捐赠目标。这个解决方案可以防止我们出现繁忙的循环,在重复检查中消耗 CPU 周期。 我们还要注意使用sync.Cond 时可能存在的一个缺点。当我们发送一个通知——例如,发送给 chan 结构——即使没有活动的接收者,消息也会被缓冲,这保证了最终会收到该通知。将sync.Cond与Broadcast方法一起使用会唤醒当前等待条件的所有goroutine;如果没有,则将错过通知。这也是一个我们必须牢记的基本原则。


Signal() vs. Broadcast()
我们可以使用 Signal() 而不是 Broadcast() 来唤醒单个 goroutine。就条款而言从语义上来说,它与以相同的非阻塞方式在 chan 结构体中发送消息: ch := make(chan struct{}) select { case ch <- struct{}{}: default: }

在golang中Signal使用channel来实现。多个 goroutine 可以捕获的唯一事件是通道关闭,但这只能发生一次;因此,向多个 goroutine 重复发送通知,sync.Cond 是一个解决方案。这原语基于条件变量,这些变量设置等待线程的容器具体情况。使用sync.Cond,我们可以广播信号来唤醒所有等待条件的 goroutine。

不使用 errgroup

无论使用哪种编程语言,重新发明轮子都不是一个好主意。代码库重新实现如何启动多个 goroutine 并聚合错误也很常见。但是 Go 生态系统中的一个包旨在支持这种频繁的用例。让我们看看它并理解为什么它应该成为Go 开发人员的工具集。 golang.org/x 是一个提供标准库扩展的存储库。这同步子存储库包含一个方便的包:errgroup。 假设我们必须处理一个函数,并且我们接收一些数据作为参数。我们想用它来调用外部服务。由于条件限制,我们无法进行单一调用;我们每次都会使用不同的子集进行多次调用。此外,这些调用是并行的(见图):【100 Mistakes】golang并发的坑-2.md如果在调用过程中出现错误,我们希望将其返回。如果出现多个错误,我们只想返回其中之一。让我们使用以下代码,仅标准并发原语来编写实现的框架:

func handler(ctx context.Context, circles []Circle) ([]Result, error) {
    results := make([]Result, len(circles))
    wg := sync.WaitGroup{}    // 创建等待组
    wg.Add(len(results))    // 声明有多少个goroutine需要管理
    for i, circle := range circles {
        i := i
        circle := circle
        go func() {
            defer wg.Done()
            result, err := foo(ctx, circle)
            if err != nil {
            // ?
            }
            results[i] = result
        }()
    }
    wg.Wait()
// ...
}

我们决定使用sync.WaitGroup来等待所有goroutines完成,并处理切片中的聚合。这是一种方法;另一个是将每个部分结果发送到通道并将它们聚合到另一个 goroutine 中。这主要的挑战是如果需要排序,则对传入消息进行重新排序。因此,我们决定采用最简单的方法和共享切片。

因为每个 goroutine 都会写入特定的索引,所以这个实现无数据竞争。

然而,有一个关键场景我们尚未解决。如果 foo (调用的在新的 goroutine 中)返回错误?我们应该如何处理呢?有各种选项,包括:

  • 就像results切片一样,我们可以在goroutine之间共享错误切片。如果发生错误,每个 goroutine 都会写入该切片。我们必须迭代父 Goroutine 中的这个切片才能确定是否发生错误(时间复杂度为 O(n))。
  • 我们可以让 goroutine 通过共享mutex来访问一个错误变量。
  • 我们可以考虑共享一个错误通道,以及父 goroutine将接收并处理这些错误。

无论选择什么选项,它都会开始使解决方案变得相当复杂。因此,设计并开发了 errgroup 包。 它导出一个 WithContext 函数,该函数返回给定上下文的 *Group 结构。该结构为一组 goroutine 提供同步、错误传播和上下文取消,并且仅导出两个方法:

  • Go 在新的 goroutine 中触发调用。
  • 等待阻塞,直到所有 goroutine 完成。它返回第一个非零错误(如果有)。

让我们使用 errgroup 重写解决方案。首先我们需要导入errgroup:

go get golang.org/x/sync/errgroup

实现如下:

func handler(ctx context.Context, circles []Circle) ([]Result, error) {
    results := make([]Result, len(circles))
    g, ctx := errgroup.WithContext(ctx)
    for i, circle := range circles {
        i := i
        circle := circle
        g.Go(func() error {
            result, err := foo(ctx, circle)
            if err != nil {
             return err
            }
            results[i] = result
            return nil
        })
    }
    if err := g.Wait(); err != nil {
        return nil, err
    }
    return results, nil
}

首先,我们通过提供父上下文来创建一个 *errgroup.Group。在每次迭代中, 我们使用 g.Go 来触发新 goroutine 中的调用。此方法将 func() 错误视为一个输入,用一个闭包包装对 foo 的调用并处理结果和错误。与我们第一个实现的主要区别是,如果我们收到错误,将从闭包协程中返回。然后,g.Wait 允许我们等待所有 goroutine 完成。 这个解决方案本质上比第一个解决方案更简单(这是部分的,因为我们没有处理错误)。我们不必依赖额外的并发原语,并且 errgroup.Group 足以解决我们的用例。 我们尚未解决的问题,即另一个好处是共享上下文。让我们想象一下我们必须触发三个并行调用:

  • 第一个在 1 毫秒内返回错误。
  • 第二次和第三次调用将在 5 秒内返回结果或错误。

我们希望返回一个错误(如果有)。因此,没有必要等到第二次,第三次通话已完成。使用errgroup.WithContext 创建共享上下文,在所有并行调用中使用。因为第一次调用会在 1 毫秒内返回错误, 将取消上下文,从而取消其他 goroutine。因此,我们不必等待 5 秒即可返回错误。这是使用 errgroup 时的另一个好处。

g.Go 调用的进程必须是上下文感知的。否则,取消上下文不会有任何效果。

总而言之,当我们必须触发多个 goroutine 并处理错误以及上下文传播时,可能值得考虑 errgroup 是否可以作为解决方案。 正如我们所看到的,这个包支持一组 goroutine 的同步,并且提供了处理错误和共享上下文的答案。

拷贝同步类型

sync包提供了基本的同步原语,例如mutex、Cond和WaitGroup。对于所有这些类型,有一个硬性规则需要遵循:它们永远不应该被复制。让我们了解一下原理和可能出现的问题。 我们将创建一个线程安全的数据结构来存储计数器。它将包含一个map[string]int 表示每个计数器的当前值。我们还将使用一个sync.Mutex 因为访问必须受到保护。让我们添加一个增量增加给定计数器名称的方法:

type Counter struct {
 mu sync.Mutex
 counters map[string]int
}
func NewCounter() Counter {
 return Counter{counters: map[string]int{}}
}
func (c Counter) Increment(name string) {
 c.mu.Lock()
 defer c.mu.Unlock()
 c.counters[name]++
}

增量逻辑在关键部分完成:c.mu.Lock() 和 c.mu.UnLock 。让我们尝试一下我们的方法,使用 -race 选项运行以下例子,启动两个 goroutine 并递增它们各自的计数器的示例:

counter := NewCounter()
go func() {
 counter.Increment("foo")
}()
go func() {
 counter.Increment("bar")
}()

如果我们运行这个示例,它会引发数据竞争:

==================
WARNING: DATA RACE
...

我们的 Counter 实现中的问题是互斥体被复制。因为Increment的接收者是一个值,每当我们调用Increment时,它将拷贝Counter结构体,顺带着也拷贝了锁。由此,加数将不在同一共享临界区。 不应复制同步类型。该规则适用于以下类型:

  • sync.Cond
  • sync.Map
  • sync.Mutex
  • sync.RWMutex
  • sync.Once
  • sync.Pool
  • sync.WaitGroup

因此,互斥锁不应该被复制。有哪些替代方案? 首先将 Increment 方法修改为receive类型:

func (c *Counter) Increment(name string) {
// Same code
}

更改接收器类型可以避免在调用 Increment 时复制 Counter。因此,内部互斥锁不会被复制。如果我们想保留一个值接收器,第二个选项是更改值接收器的类型Counter 中的 mu 字段为指针:

type Counter struct {
    mu *sync.Mutex
    counters map[string]int
}
func NewCounter() Counter {
    return Counter{
    mu: &sync.Mutex{},
    counters: map[string]int{},
    }
}

如果 Increment 有值接收器,它仍然会复制 Counter 结构。然而,正如现在mu是一个指针,它将仅执行指针复制,而不是sync.Mutex的实际复制。 因此,该解决方案还可以防止数据竞争。

我们还改变了 mu 的初始化方式。因为 mu 是一个指针,如果我们创建 Counter 时省略它,它将被初始化为指针的零值:零。这将导致 goroutine 在调用 c.mu.Lock() 时出现恐慌。

我们可能在以下条件下会遇到无意中复制同步字段状况:

  • 调用带有值接收器的方法(正如我们所见);
  • 使用同步参数调用函数;
  • 使用包含同步字段的参数调用函数。

在每种情况下,我们都应该保持非常谨慎。另外,注意linter 可以捕获这个问题——例如,使用 go vet:

go vet .
./main.go:19:9: Increment passes lock by value: Counter contains sync.Mutex

根据经验,每当多个 goroutine 必须访问一个公共sync元素时,我们必须确保它们都依赖于同一个实例。这条规则适用于所有同步包中定义的类型。使用指针是解决这个问题的一种方法: 我们可以有一个指向sync元素的指针,也可以有一个指向包含sync类型的元素。


原文始发于微信公众号(小唐云原生):【100 Mistakes】golang并发的坑-2.md

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/247540.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!