脚本之家,脚本语言编程技术及教程分享平台!
分类导航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服务器之家 - 脚本之家 - Golang - Golang协程池gopool设计与实现

Golang协程池gopool设计与实现

2022-09-20 18:14ag9920 Golang

本文主要介绍了Golang协程池gopool设计与实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

Goroutine

Goroutine 是 Golang 提供的一种轻量级线程,我们通常称之为「协程」,相比较线程,创建一个协程的成本是很低的。所以你会经常看到 Golang 开发的应用出现上千个协程并发的场景。

Goroutine 的优势:

  • 与线程相比,Goroutines 成本很低。

它们的堆栈大小只有几 kb,堆栈可以根据应用程序的需要增长和缩小,context switch 也很快,而在线程的情况下,堆栈大小必须指定并固定。

  • Goroutine 被多路复用到更少数量的 OS 线程。

一个包含数千个 Goroutine 的程序中可能只有一个线程。如果该线程中的任何 Goroutine 阻塞等待用户输入,则创建另一个 OS 线程并将剩余的 Goroutine 移动到新的 OS 线程。所有这些都由运行时处理,作为开发者无需耗费心力关心,这也使得我们有很干净的 API 来支持并发。

  • Goroutines 使用 channel 进行通信。

channel 的设计有效防止了在使用 Goroutine 访问共享内存时发生竞争条件(race conditions) 。channel 可以被认为是 Goroutine 进行通信的管道。

下文中我们会以「协程」来代指 Goroutine。

协程池

在高并发场景下,我们可能会启动大量的协程来处理业务逻辑。协程池是一种利用池化技术,复用对象,减少内存分配的频率以及协程创建开销,从而提高协程执行效率的技术。

最近抽空了解了字节官方开源的 gopkg 库提供的 gopool 协程池实现,感觉还是很高质量的,代码也非常简洁清晰,而且 Kitex 底层也在使用 gopool 来管理协程,这里我们梳理一下设计和实现。

gopool

Repository:https://github.com/bytedance/gopkg/tree/develop/util/gopool

gopool is a high-performance goroutine pool which aims to reuse goroutines and limit the number of goroutines. It is an alternative to the go keyword.

了解官方 README 就会发现gopool的用法其实非常简单,将曾经我们经常使用的 go func(){...} 替换为 gopool.Go(func(){...}) 即可。

此时 gopool 将会使用默认的配置来管理你启动的协程,你也可以选择针对业务场景配置池子大小,以及扩容上限。

old:

?
1
2
3
go func() {
    // do your job
}()

new:

?
1
2
3
4
5
6
7
import (
    "github.com/bytedance/gopkg/util/gopool"
)
 
gopool.Go(func(){
    /// do your job
})

核心实现

下面我们来看看gopool是怎样实现协程池管理的。

Pool

Pool 是一个定义了协程池能力的接口。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type Pool interface {
    // 池子的名称
    Name() string
        
    // 设置池子内Goroutine的容量
    SetCap(cap int32)
        
    // 执行 f 函数
    Go(f func())
        
    // 带 ctx,执行 f 函数
    CtxGo(ctx context.Context, f func())
        
    // 设置发生panic时调用的函数
    SetPanicHandler(f func(context.Context, interface{}))
}

gopool 提供了这个接口的默认实现(即下面即将介绍的pool),当我们直接调用 gopool.CtxGo 时依赖的就是这个。

这样的设计模式在 Kitex 中也经常出现,所有的依赖均设计为接口,便于随后扩展,底层提供一个默认的实现暴露出去,这样对调用方也很友好。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
type pool struct {
    // 池子名称
    name string
 
    // 池子的容量, 即最大并发工作的 goroutine 的数量
    cap int32
        
    // 池子配置
    config *Config
        
    // task 链表
    taskHead  *task
    taskTail  *task
    taskLock  sync.Mutex
    taskCount int32
 
    // 记录当前正在运行的 worker 的数量
    workerCount int32
 
    // 当 worker 出现panic时被调用
    panicHandler func(context.Context, interface{})
}
 
// NewPool 创建一个新的协程池,初始化名称,容量,配置
func NewPool(name string, cap int32, config *Config) Pool {
    p := &pool{
        name:   name,
        cap:    cap,
        config: config,
    }
    return p
}

调用 NewPool 获取了以 Pool 的形式返回的 pool 结构体。

Task

?
1
2
3
4
5
6
type task struct {
    ctx context.Context
    f   func()
 
    next *task
}

task 是一个链表结构,可以把它理解为一个待执行的任务,它包含了当前节点需要执行的函数f, 以及指向下一个task的指针。

综合前一节 pool 的定义,我们可以看到,一个协程池 pool 对应了一组task

pool 维护了指向链表的头尾的两个指针:taskHeadtaskTail,以及链表的长度taskCount 和对应的锁 taskLock

Worker

?
1
2
3
type worker struct {
    pool *pool
}

一个 worker 就是逻辑上的一个执行器,它唯一对应到一个协程池 pool。当一个worker被唤起,将会开启一个goroutine ,不断地从 pool 中的 task链表获取任务并执行。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (w *worker) run() {
    go func() {
        for {
                        // 声明即将执行的 task
            var t *task
                        
                        // 操作 pool 中的 task 链表,加锁
            w.pool.taskLock.Lock()
            if w.pool.taskHead != nil {
                                // 拿到 taskHead 准备执行
                t = w.pool.taskHead
                                
                                // 更新链表的 head 以及数量
                w.pool.taskHead = w.pool.taskHead.next
                atomic.AddInt32(&w.pool.taskCount, -1)
            }
                        // 如果前一步拿到的 taskHead 为空,说明无任务需要执行,清理后返回
            if t == nil {
                w.close()
                w.pool.taskLock.Unlock()
                w.Recycle()
                return
            }
            w.pool.taskLock.Unlock()
                        
                        // 执行任务,针对 panic 会recover,并调用配置的 handler
            func() {
                defer func() {
                    if r := recover(); r != nil {
                        msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())
                        logger.CtxErrorf(t.ctx, msg)
                        if w.pool.panicHandler != nil {
                            w.pool.panicHandler(t.ctx, r)
                        }
                    }
                }()
                t.f()
            }()
            t.Recycle()
        }
    }()
}

整体来看

看到这里,其实就能把整个流程串起来了。我们来看看对外的接口 CtxGo(context.Context, f func()) 到底做了什么?

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func Go(f func()) {
    CtxGo(context.Background(), f)
}
 
func CtxGo(ctx context.Context, f func()) {
    defaultPool.CtxGo(ctx, f)
}
 
func (p *pool) CtxGo(ctx context.Context, f func()) {
 
        // 创建一个 task 对象,将 ctx 和待执行的函数赋值
    t := taskPool.Get().(*task)
    t.ctx = ctx
    t.f = f
        
        // 将 task 插入 pool 的链表的尾部,更新链表数量
    p.taskLock.Lock()
    if p.taskHead == nil {
        p.taskHead = t
        p.taskTail = t
    } else {
        p.taskTail.next = t
        p.taskTail = t
    }
    p.taskLock.Unlock()
    atomic.AddInt32(&p.taskCount, 1)
        
        
    // 以下两个条件满足时,创建新的 worker 并唤起执行:
    // 1. task的数量超过了配置的限制
    // 2. 当前运行的worker数量小于上限(或无worker运行)
    if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
        
                // worker数量+1
        p.incWorkerCount()
                
                // 创建一个新的worker,并把当前 pool 赋值
        w := workerPool.Get().(*worker)
        w.pool = p
                
                // 唤起worker执行
        w.run()
    }
}

相信看了代码注释,大家就能理解发生了什么。

gopool 会自行维护一个 defaultPool,这是一个默认的 pool 结构体,在引入包的时候就进行初始化。当我们直接调用 gopool.CtxGo() 时,本质上是调用了 defaultPool 的同名方法

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func init() {
    defaultPool = NewPool("gopool.DefaultPool", 10000, NewConfig())
}
 
const (
    defaultScalaThreshold = 1
)
 
// Config is used to config pool.
type Config struct {
    // 控制扩容的门槛,一旦待执行的 task 超过此值,且 worker 数量未达到上限,就开始启动新的 worker
    ScaleThreshold int32
}
 
// NewConfig creates a default Config.
func NewConfig() *Config {
    c := &Config{
        ScaleThreshold: defaultScalaThreshold,
    }
    return c
}

defaultPool 的名称为 gopool.DefaultPool,池子容量一万,扩容下限为 1。

当我们调用 CtxGo时,gopool 就会更新维护的任务链表,并且判断是否需要扩容 worker

  • 若此时已经有很多 worker 启动(底层一个 worker 对应一个 goroutine),不需要扩容,就直接返回。
  • 若判断需要扩容,就创建一个新的worker,并调用 worker.run()方法启动,各个worker会异步地检查 pool 里面的任务链表是否还有待执行的任务,如果有就执行。

三个角色的定位

  • task 是一个待执行的任务节点,同时还包含了指向下一个任务的指针,链表结构;
  • worker 是一个实际执行任务的执行器,它会异步启动一个 goroutine 执行协程池里面未执行的task
  • pool 是一个逻辑上的协程池,对应了一个task链表,同时负责维护task状态的更新,以及在需要的时候创建新的 worker

使用 sync.Pool 进行性能优化

其实到这个地方,gopool已经是一个代码简洁清晰的协程池库了,但是性能上显然有改进空间,所以gopool的作者应用了多次 sync.Pool 来池化对象的创建,复用woker和task对象。

这里建议大家直接看源码,其实在上面的代码中已经有所涉及。

  • task 池化
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
var taskPool sync.Pool
 
func init() {
    taskPool.New = newTask
}
 
func newTask() interface{} {
    return &task{}
}
 
func (t *task) Recycle() {
    t.zero()
    taskPool.Put(t)
}
  • worker 池化
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
var workerPool sync.Pool
 
func init() {
    workerPool.New = newWorker
}
 
func newWorker() interface{} {
    return &worker{}
}
 
func (w *worker) Recycle() {
    w.zero()
    workerPool.Put(w)
}

到此这篇关于Golang协程池gopool设计与实现的文章就介绍到这了,更多相关Golang协程池gopool内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://juejin.cn/post/7086443265309818894

延伸 · 阅读

精彩推荐
  • Golanggolang time常用方法详解

    golang time常用方法详解

    这篇文章主要介绍了golang time常用方法详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下...

    yunlongliang10212021-09-16
  • GolangGo语言Web编程实现Get和Post请求发送与解析的方法详解

    Go语言Web编程实现Get和Post请求发送与解析的方法详解

    这篇文章主要介绍了Go语言Web编程实现Get和Post请求发送与解析的方法,结合实例形式分析了Go语言客户端、服务器端结合实现web数据get、post发送与接收数据的...

    typ20044122020-05-06
  • Golang几个小技巧帮你实现Golang永久阻塞

    几个小技巧帮你实现Golang永久阻塞

    Go 的运行时的当前设计,假定程序员自己负责检测何时终止一个 goroutine 以及何时终止该程序。有时候我们需要的是使程序阻塞在这一行,本文就来详细的...

    折叠椅3442022-01-19
  • Golang浅析Go语言版本的forgery

    浅析Go语言版本的forgery

    使用过Python语言的朋友们可能使用过 forgery_py ,它是一个伪造数据的工具。这篇文章主要介绍了Go语言版本的forgery,需要的朋友可以参考下 ...

    xingyys5522020-05-17
  • GolangGolang 实现复制文件夹同时复制文件

    Golang 实现复制文件夹同时复制文件

    这篇文章主要介绍了Golang 实现复制文件夹同时复制文件,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...

    印象丶亮仔11052021-03-16
  • GolangWindows下Goland的环境搭建过程详解

    Windows下Goland的环境搭建过程详解

    这篇文章主要介绍了Windows下Goland的环境搭建过程,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的...

    Taco_Tuesdayyy16382021-01-26
  • GolangGO语言框架快速集成日志模块的操作方法

    GO语言框架快速集成日志模块的操作方法

    zap是一个可以在go项目中进行快速, 结构化且分级的日志记录包, git star数高达16.3k, Git 项目地址, 在各大公司项目中被广泛使用,这篇文章主要介绍了GO语言...

    Masters7652022-07-13
  • Golang解析golang中的并发安全和锁问题

    解析golang中的并发安全和锁问题

    本文我们来学习一下golang中的并发安全和锁问题,文章通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参...

    专职10182021-11-30