goroutine Pool池化技术

goroutine Pool池化技术

池化技术介绍

池化技术指的是提前准备一些资源,在需要时可以重复使用这些预先准备的资源。

在系统开发过程中,我们经常会用到池化技术。通俗的讲,池化技术就是:把一些资源预先分配好,组织到对象池中,之后的业务使用资源从对象池中获取,使用完后放回到对象池中。这样做带来几个明显的好处:

  • 资源重复使用, 减少了资源分配和释放过程中的系统消耗。比如,在IO密集型的服务器上,并发处理过程中的子线程或子进程的创建和销毁过程,带来的系统开销将是难以接受的。所以在业务实现上,通常把一些资源预先分配好,如线程池,数据库连接池,Redis连接池,HTTP连接池等,来减少系统消耗,提升系统性能。

  • 可以对资源的整体使用做限制。这个好理解,相关资源预分配且只在预分配是生成,后续不再动态添加,从而限制了整个系统对资源的使用上限。类似一个令牌桶的功能。

  • 池化技术分配对象池,通常会集中分配,这样有效避免了碎片化的问题。

池化技术简单点来说,就是提前保存大量的资源,以备不时之需。池化技术有两个特点,提前创建和重复利用。

应用场景

  • 线程池
  • 内存池
  • 连接池
  • 协称池

协称池代码实现

package main


import (
 "fmt"
 "log"
 "sync"
 "time"
)

// 创建真正执行任务的worker
type worker struct {
 workerPool chan *worker
 jobChannel chan Job
 stop       chan struct{}
}

// 开始执行任务
func (w *worker) start() {
 go func() {
  var job Job
  for {
   // worker free, add it to pool
   w.workerPool <- w

   select {
   case job = <-w.jobChannel:
    runJob(job)
   case <-w.stop:
    w.stop <- struct{}{}
    return
   }
  }
 }()
}

// 真正执行任务
func runJob(f func()) {
 defer func() {
  if err := recover(); err != nil {
    log.Printf("gpool Job panic err: %v", err)
  }
 }()

 f()
}

// 初始化worker
func newWorker(pool chan *worker) *worker {
 return &worker{
  workerPool: pool,
  jobChannel: make(chan Job),
  stop:       make(chan struct{}),
 }
}

// 接受来自客户端的任务,并等待第一个空闲的worker交付工作
type dispatcher struct {
 workerPool chan *worker
 jobQueue   chan Job
 stop       chan struct{}
}

// 分配任务
func (d *dispatcher) dispatch() {
 for {
  select {
  case job := <-d.jobQueue:
   worker := <-d.workerPool
   worker.jobChannel <- job
  case <-d.stop:
   for i := 0; i < cap(d.workerPool); i++ {
    worker := <-d.workerPool

    worker.stop <- struct{}{}
    <-worker.stop
   }

   d.stop <- struct{}{}
   return
  }
 }
}

// 初始化分配器
func newDispatcher(workerPool chan *worker, jobQueue chan Job) *dispatcher {
 d := &dispatcher{
  workerPool: workerPool,
  jobQueue:   jobQueue,
  stop:       make(chan struct{}),
 }

 for i := 0; i < cap(d.workerPool); i++ {
  worker := newWorker(d.workerPool)
  worker.start()
 }

 go d.dispatch()
 return d
}


type Job func()

type Pool struct {
 JobQueue   chan Job
 dispatcher *dispatcher
 wg         sync.WaitGroup
}

// NewPool 初始化goroutine Pool
func NewPool(numWorkers int, jobQueueLen int) *Pool {
 jobQueue := make(chan Job, jobQueueLen)
 workerPool := make(chan *worker, numWorkers)

 pool := &Pool{
  JobQueue:   jobQueue,
  dispatcher: newDispatcher(workerPool, jobQueue),
 }

 return pool
}

// 打包任务
func (p *Pool) wrapJob(job func()) func() {
 return func() {
  defer p.JobDone()
  job()
 }
}

func (p *Pool) SendJobWithTimeout(job func(), t time.Duration) bool {
 select {
 case <-time.After(t):
  return false
 case p.JobQueue <- p.wrapJob(job):
  p.WaitCount(1)
  return true
 }
}

func (p *Pool) SendJobWithDeadline(job func(), t time.Time) bool {
 s := t.Sub(time.Now())
 if s <= 0 {
  s = time.Second // timeout
 }
 select {
 case <-time.After(s):
  return false
 case p.JobQueue <- p.wrapJob(job):
  p.WaitCount(1)
  return true
 }
}

// SendJob 发送任务
func (p *Pool) SendJob(job func()) {
 p.WaitCount(1)
 p.JobQueue <- p.wrapJob(job)
}


func (p *Pool) JobDone() {
 p.wg.Done()
}


func (p *Pool) WaitCount(count int) {
 p.wg.Add(count)
}

// WaitAll 等待所有goroutine退出
func (p *Pool) WaitAll() {
 p.wg.Wait()
}

// Release 释放资源
func (p *Pool) Release() {
 p.dispatcher.stop <- struct{}{}
 <-p.dispatcher.stop
}

func main() {
 // 初始化 10个worker(goroutine) 任务队列长度是1000
    var pool = NewPool(10, 1000)

    pool.SendJobWithTimeout(func() {
  fmt.Println("SendJobWithTimeout")
 }, 2*time.Second)

    // 发送任务
 pool.SendJob(func() {
  fmt.Println("send job")
 })


 pool.SendJobWithDeadline(func() {
  fmt.Println("SendJobWithDeadline")
 }, time.Now().Add(time.Second*3))

    // 等待资源释放和退出
 pool.WaitAll()
    pool.Release()
}


原文始发于微信公众号(堆栈future):goroutine Pool池化技术

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由半码博客整理,本文链接:https://www.bmabk.com/index.php/post/103526.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
半码博客——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!