Golang实现并行计算

Golang实现并行计算

代码地址: https://Github.com/gofish2020/easymr

为什么需要 MapReduce

在实际的业务场景中我们常常需要从不同的 rpc 服务中获取相应属性来组装成复杂对象。比如要查询商品详情:

  • 商品服务-查询商品属性
  • 库存服务-查询库存属性
  • 价格服务-查询价格属性
  • 营销服务-查询营销属性

如果是串行调用的话响应时间会随着 rpc 调用次数呈线性增长,所以我们要优化性能一般会将串行改并行。

简单的场景下使用 waitGroup 也能够满足需求,但是如果我们需要对数据进行校验、数据加工转换、数据汇总呢?继续使用 waitGroup 就有点力不从心了。

场景一:从多个服务获取产品数据

func productDetail(uid, pid int64) (*ProductDetail, error) {
var pd ProductDetail
err := mr.Finish(func() (err error) {
pd.User, err = userRpc.User(uid)
return
}, func() (err error) {
pd.Store, err = storeRpc.Store(pid)
return
}, func() (err error) {
pd.Order, err = orderRpc.Order(pid)
return
})

if err != nil {
log.Printf("product detail error: %v", err)
return nil, err
}

return &pd, nil
}

场景二:有一批uids数据需要进行清洗

func TestMapReduce(t *testing.T) {

uids := []int64{1, 2, 3, 4, 5, 6, 79}
r, err := MapReduce(func(source chan<- interface{}) {
for _, uid := range uids {
source <- uid
}
}, func(item interface{}, writer Writer, cancel func(error)) {
uid := item.(int64)
ok, err := check(uid)
if err != nil {
cancel(err)
}
if ok {
writer.Write(uid)
}
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
var uids []int64
for p := range pipe {
uids = append(uids, p.(int64))
}
writer.Write(uids)
})

assert.Nil(t, err)
assert.Equal(t, []int64{79}, r)

}

func check(uid int64) (bool, error) {
// do something check user legal
if uid > 6 {
return true, nil
}
return false, nil
}

MapReduce代码架构

  • generate对应代码中的buildSource函数逻辑
  • mapper对应代码中的executeMappers函数逻辑
  • reducer对应代码中的go func() {... reducer(collector, writer, cancel) ...}逻辑Golang实现并行计算

代码解读

这里以 func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error)方法为例讲解:

  • generate GenerateFunc 用户实现的生成数据函数
  • mapper MapperFunc 用户实现的处理数据函数
  • reducer ReducerFunc 用户实现的数据汇总函数
  • opts ...Option基本配置项(用来配置处理协程的数量)
// MapReduce 从source中读取数据,通过mapper并发处理后,将中间结果保存到collector中,并通过reducer函数对中间结果进行汇总,返回
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) {
// 创建source通道
source := buildSource(generate)
// 核心处理方法
return MapReduceWithSource(source, mapper, reducer, opts...)
}

buildSource 函数

  • buildSource 创建 source通道,并在协程中调用generate函数,在generate函数中,会将数据写入到source中;
  • 最后在defer中关闭 source表示数据全部发送完成
func buildSource(generate GenerateFunc) chan interface{} {
source := make(chan interface{})

// 启动协程,调用generate函数生产数据到 source通道中
threading.GoSafe(func() {
// generate函数执行完毕后,关闭source
defer close(source)
// generate内部,负责将数据写入到source中
generate(source)
})

return source
}

MapReduceWithSource 函数

建议参考上面的图进行理解

  • 创建 output := make(chan interface{})通道,用来存储汇总后的最终结果
  • 创建 collector := make(chan interface{}, options.workers)通道,用来存储 mapper加工后的中间结果
  • 启动协程go func(){ reducer(collector, writer, cancel) } 目的在于监听 collector通道,汇总结果保存到output通道中(协程的正常退出,需要collector的关闭)
  • 启动协程 go executeMappers 目的在于监听source通道,将加工的中间结果保存到collector通道中;
  • 最后一直阻塞在output通道处,等待最终的结果的输出

func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) {

// 1.配置协程数
options := buildOptions(opts...)
// 2.输出结果
output := make(chan interface{})

defer func() {
// 限定输出结果只能有一个,不能往output中多次写入
for range output {
panic("more than one element written in reducer")
}
}()

// 3.mapper处理的中间结果,存储collector通道中
collector := make(chan interface{}, options.workers)

done := syncx.NewDoneChan() // 理解为:用通道作为关闭标识

writer := newGuardedWriter(output, done.Done()) // writer就是指代的output通道

var closeOnce sync.Once // 目的:在finish中只执行一次关闭
var retErr error

finish := func() {

closeOnce.Do(func() {
done.Close() //理解为:打开了关闭标识
close(output) // 关闭output可以避免,如果没有向output中写入数据,output最后不会阻塞
})
}

cancel := once(func(err error) { // 取消操作
// 设置错误信息
if err != nil {
retErr = err
} else {
retErr = ErrCancelWithNil
}
drain(source) //一旦取消,source中的数据就不会被消费,那么生产协程就会一直阻塞中(避免内存泄漏,让生产协程能正常退出)
finish()
})

// 4. 本协程的目的就是从collector通道中读取数据,在reducer函数中进行最后的汇总后,写入到writer中(即:output)
go func() {
defer func() {
// drain函数:如果collector不关闭,等待
drain(collector) // (这里也是防御编程,避免在reducer中忘记读取collector数据)
if r := recover(); r != nil {
cancel(fmt.Errorf("%v", r))
} else {
finish()
}
}()
// writer就是指代的output通道
reducer(collector, writer, cancel)
}()

// 5. 从source中读取数据,并启动最多workers个子协程并行处理,处理后到结果保存到collector中
go executeMappers(func(item interface{}, w Writer) {
// item就是从source中获取的数据
// w 指代的是collector
mapper(item, w, cancel)
}, source, collector, done.Done(), options.workers)

// 这里是阻塞中(解除阻塞条件:要么关闭output,要么向output写入数据)
value, ok := <-output
if err := retErr; err != nil {
return nil, err
} else if ok {
return value, nil
} else {
return nil, ErrReduceNoOutput
}
}

drain 函数

drain函数的目的是让阻塞在channel上的协程,能正常的退出

// drain 函数有两种含义:
// 1.一直阻塞直到 channel关闭
// 2.不断的读取channel中的数据,让写入端可以一直写入(直到没有新数据写入)
func drain(channel <-chan interface{}) {
for range channel {
}
}

请参考这里的代码,理解drain函数

func TestDrain(t *testing.T) {

ch := make(chan interface{})
// 启动3个协程
for i := 0; i < 3; i++ {
go func(val int) {
ch <- val
fmt.Println(val)
}(i)
}
// 如果不调用drain,ch就一直不被读取,那么上面的3个协程就一直阻塞在ch上,无法退出(造成内存泄漏)
drain(ch)
}

executeMappers 函数

开启for死循环,一直从input中读取数据item,并行开启多个协程调用mapper函数处理,并将结果保存到 collector 中; 这里的input就是source; 当处理完成source中的数据后,需要将close(collector),因为 reducer在一直监视着collector通道,不关闭的话,reducer就无法终止(因为drain(collector)函数)

// executeMappers 从input中读取数据item,并行开启多个协程处理,并将结果保存到 collector 中
func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- interface{},
done <-chan PlaceholderType, workers int)
{

var wg sync.WaitGroup

// 只有source or done 关闭了,defer才会执行
defer func() {
wg.Wait() // 等协程处理完数据
close(collector) // 关闭collector,reducer协程才会退出
}()

// 目的:限定启动的协程的最大数量
pool := make(chan PlaceholderType, workers)

// 这里的writer指代的是collector
writer := newGuardedWriter(collector, done)

// 这里是一个死循环,负责消费source中的数据(和buildSource函数呼应)
for {
select {
case <-done: // done关闭
return
case pool <- Placeholder: // 最多启动 workers个协程处理
item, ok := <-input
if !ok { //source被关闭(buildSource函数中,发送完数据后会关闭source)
<-pool
return
}
wg.Add(1)

// 协程
threading.GoSafe(func() {
defer func() {
wg.Done()
<-pool
}()
// source中的item数据,处理完成的结果保存到 collector中
mapper(item, writer)
})
}
}
}


原文始发于微信公众号(nullbody笔记):Golang实现并行计算

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/223606.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!