脚本之家,脚本语言编程技术及教程分享平台!
分类导航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|shell|

服务器之家 - 脚本之家 - Golang - Go语言异步高并发编程的秘密:无锁,无条件变量,无回调

Go语言异步高并发编程的秘密:无锁,无条件变量,无回调

2023-05-10 15:39研道鸠摩智 Golang

下面我们针对他给出的case做一些说明与总结,同时对Go语言并发编程的语言特性与技巧进行总结,换句话就是说想提炼出面向场景的go语言高并发编程的八股模式

Go语言异步高并发编程的秘密:无锁,无条件变量,无回调

背景

在并发处理中,资源争用是一个常见的问题。为了避免资源争用,需要进行优化。以下是一些可以优化并发处理中的资源争用问题的建议:

  1. 避免锁竞争:锁竞争是一种常见的资源争用问题。可以通过减少锁的使用,使用更细粒度的锁,以及避免不必要的锁竞争来减少锁竞争。
  2. 使用缓存:在一些情况下,可以使用缓存来减少资源争用。例如,在处理一些计算密集型的任务时,可以使用缓存来避免重复计算。
  3. 使用原子操作:原子操作可以在不使用锁的情况下实现资源的同步访问。Go 语言中提供了一些原子操作,例如 atomic.AddInt32 和 atomic.LoadInt32 等,可以用于实现线程安全的资源访问。
  4. 使用互斥锁:互斥锁是一种用于避免并发资源争用的机制。在需要对资源进行访问的时候,可以使用互斥锁来保证资源的独占访问。
  5. 使用读写锁:读写锁是一种特殊的互斥锁,可以允许多个读操作同时进行,但是只允许一个写操作进行。在读操作频繁的场景下,可以使用读写锁来提高并发性能。
  6. 使用条件变量:条件变量是一种用于在不同线程之间进行协调的机制。可以使用条件变量来避免不必要的资源争用。例如,在一个生产者-消费者模式的程序中,可以使用条件变量来协调生产者和消费者之间的交互,从而避免资源争用。

但是如果让你不用锁,条件变量,回调的话,还怎么写并发程序啊,谷歌大佬Sameer给了大家一个思路。"Advanced Go Concurrency Patterns" by Sameer Ajmani: 这篇博客深入研究了 Golang 中的并发模式,并讨论了如何使用它们来构建高性能系统。它仅仅使用了Go语言的goroutine和channel便实现高效异步并发编程,没有用到诸如await,context等包括锁,条件变量,和回调函数文章包括一些示例和实践建议,帮助读者更好地理解和实践这些概念。下面我们针对他给出的case做一些说明与总结,同时对go语言并发编程的语言特性与技巧进行总结,换句话就是说想提炼出面向场景的go语言高并发编程的八股模式。

select-loop的编程关键要素

1.如何处理事件

2.如何处理元素

3.如何关闭退出

代码示例:

 

Go语言异步高并发编程的秘密:无锁,无条件变量,无回调

核心结构与接口

下面代码给出了核心结构sub,以及它实现了接口subscription的关键代码。

  1. updates属性是一个通道,用于用户对元素进行处理。
  2. fetcher是用于获取元素的客户端,它可以是从数据库读取,也可以是从消息队列读取。
  3. closing用于关闭退出select-loop主体
  1. // sub implements the Subscription interface. 
  2. type sub struct { 
  3.     fetcher Fetcher         // fetches items 
  4.     updates chan Item       // sends items to the user 
  5.     closing chan chan error // for Close 
  6.  
  7. func (s *sub) Updates() <-chan Item { 
  8.     return s.updates 
  9.  
  10. func (s *sub) Close() error { 
  11.     errc := make(chan error) 
  12.     s.closing <- errc // 向closing通道中同步写入errc 
  13.     return <-errc     // 等待主loop返回 
  14.  
  15. // Subscribe returns a new Subscription that uses fetcher to fetch Items. 
  16. func Subscribe(fetcher Fetcher) Subscription { 
  17.     s := &sub{ 
  18.         fetcher: fetcher, 
  19.         updates: make(chan Item),       // for Updates 
  20.         closing: make(chan chan error), // for Close 
  21.     } 
  22.     go s.loop() 
  23.     return s 

sub的核心处理逻辑

  1. // loop periodically fecthes Items, sends them on s.updates, and exits 
  2. // when Close is called.  It extends dedupeLoop with logic to run 
  3. // Fetch asynchronously. 
  4. func (s *sub) loop() { 
  5.     const maxPending = 10 
  6.     type fetchResult struct { 
  7.         fetched []Item 
  8.         next    time.Time 
  9.         err     error 
  10.     } 
  11.     var fetchDone chan fetchResult // if non-nil, Fetch is running 
  12.     var pending []Item 
  13.     var next time.Time 
  14.     var err error 
  15.     var seen = make(map[string]bool) 
  16.     for { 
  17.         var fetchDelay time.Duration 
  18.         if now := time.Now(); next.After(now) { 
  19.             fetchDelay = next.Sub(now) 
  20.         } 
  21.         var startFetch <-chan time.Time 
  22.         if fetchDone == nil && len(pending) < maxPending {  
  23.       //等待队列长度未超过最大设置且fetchDone是空,即元素已经都入队列了 
  24.       // 设置fetchDelay时间后,startFetch通道有值 
  25.             startFetch = time.After(fetchDelay)  
  26.         } 
  27.         var first Item 
  28.         var updates chan Item 
  29.         if len(pending) > 0 { 
  30.             first = pending[0] 
  31.             updates = s.updates // updates通道是为了用户进一步消费的 
  32.         } 
  33.         select { 
  34.         case <-startFetch: 
  35.             fetchDone = make(chan fetchResult, 1) 
  36.             go func() { 
  37.                 fetched, next, err := s.fetcher.Fetch() 
  38.                 fetchDone <- fetchResult{fetched, next, err} 
  39.             }() 
  40.         case result := <-fetchDone: 
  41.             fetchDone = nil 
  42.             // Use result.fetched, result.next, result.err 
  43.             fetched := result.fetched 
  44.             next, err = result.next, result.err 
  45.             if err != nil { 
  46.                 next = time.Now().Add(10 * time.Second) 
  47.                 break 
  48.             } 
  49.             for _, item := range fetched { 
  50.                 if id := item.GUID; !seen[id] { 
  51.                     pending = append(pending, item) 
  52.                     seen[id] = true 
  53.                 } 
  54.             } 
  55.         case errc := <-s.closing: 
  56.             errc <- err 
  57.             close(s.updates) 
  58.             return 
  59.         case updates <- first: 
  60.             pending = pending[1:] 
  61.         } 
  62.     } 

那么上面的代码是如何处理三个关键问题的呢?

  • 首先关于关闭并退出loop

上述代码通过监听sub结构的closing属性,实现退出。

  1. //Close asks loop to exit and waits for a response. 
  2. func (s *sub) Close() error { 
  3.     errc := make(chan error) 
  4.     s.closing <- errc 
  5.     return <-errc 

当调用sub的Close方法时,s.closing会接收一个errc的通道,loop主体向errc中写入error信息并退出,调用sub的Close方法的客户端从errc中也同步收到error信息。这是一个同步关闭的过程。loop主体可以在给客户端发送error信息之前,可以完成一系列的关闭清理工作。

  • 关于事件处理与调度

程序中设置的下一次获取元素的延迟调度的最小单位是10秒,从下面第22行可以看到,如果获取元素很快,没有耗费10秒,那么fetchDelay便有个时间gap,startFetch(第7行)这个时间通道便会通过time.After这个方法,在fetchDelay时间后,收到信号,完成18到25行的获取元素工作。

  1. var pending []Item // appended by fetch; consumed by send 
  2.     var next time.Time // initially January 1, year 0 
  3.     var err error 
  4.     for { 
  5.         var fetchDelay time.Duration // initially 0 (no delay) 
  6.         if now := time.Now(); next.After(now) { 
  7.             fetchDelay = next.Sub(now) 
  8.         } 
  9.         startFetch := time.After(fetchDelay) 
  10.  
  11.      select { 
  12.         case <-startFetch: 
  13.             var fetched []Item 
  14.             fetched, next, err = s.fetcher.Fetch() 
  15.             if err != nil { 
  16.                 next = time.Now().Add(10 * time.Second) 
  17.                 break 
  18.             } 
  19.             pending = append(pending, fetched...) 
  20.         
  21.         } 
  22.     } 

问题:为了防止等待队列过大,所以只有当长度不超过maxPending,并且获取的数据已经入队了的时候,才会设置startFetch,否则就不触发fetch。这块可以结合上面整个代码看看

  1. var fetchDelay time.Duration 
  2.         if now := time.Now(); next.After(now) { 
  3.             fetchDelay = next.Sub(now) 
  4.         } 
  5.         var startFetch <-chan time.Time 
  6.         if fetchDone == nil && len(pending) < maxPending { 
  7.             startFetch = time.After(fetchDelay) // enable fetch case 
  8.         } 

问题: Loop blocks on Fetch。

golang有个特性,就是Sends and receives on nil channels block.利用这个特性,当fetchDone是nil或者他里面没有准备好结果的时候,相关的case都会阻塞,那么select也不会选择它。同时为了防止fetch函数阻塞loop主函数,通过启动协程(下面9-12行),再次提升主loop的性能。

  1. type fetchResult struct{ fetched []Item; next time.Time; err error } 
  1. var fetchDone chan fetchResult // if non-nil, Fetch is running 
  2. var startFetch <-chan time.Time 
  3.         if fetchDone == nil && len(pending) < maxPending { 
  4.             startFetch = time.After(fetchDelay) // enable fetch case 
  5.         } 
  6. select { 
  7.         case <-startFetch: 
  8.             fetchDone = make(chan fetchResult, 1) 
  9.             go func() { 
  10.                 fetched, next, err := s.fetcher.Fetch() 
  11.                 fetchDone <- fetchResult{fetched, next, err} 
  12.             }() 
  13.         case result := <-fetchDone: 
  14.             fetchDone = nil 
  15.             // Use result.fetched, result.next, result.err 

总结

上面用到了3个技巧,如下所示:

  • for-select loop
  • service channel, reply channels (chan chan error)
  • nil channels in select cases

通过err,next,pending三个变量,就实现了在没有锁,条件变量,回调情况下,编写高效并发go程序的需求。

Go语言异步高并发编程的秘密:无锁,无条件变量,无回调

原文地址:https://www.toutiao.com/article/7202411335232274984/

延伸 · 阅读

精彩推荐
  • GolangGolang学习之反射机制的用法详解

    Golang学习之反射机制的用法详解

    反射的本质就是在程序运行的时候,获取对象的类型信息和内存结语构,反射是把双刃剑,功能强大但可读性差。本文将详细讲讲Golang中的反射机制,感兴...

    LiberHome7092022-10-12
  • GolangGolang搭建开发环境的图文教程

    Golang搭建开发环境的图文教程

    这篇文章主要介绍了Golang搭建开发环境,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参...

    Payne-Wu11502021-02-04
  • Golang记一次 Golang 踩坑 RabbitMQ

    记一次 Golang 踩坑 RabbitMQ

    最近在项目中遇到了一个使用 RabbitMQ 时的问题,这个问题我觉得还是有一定普适性的,和大家分享一下,避免大家后续在同一个问题上犯错。...

    跨界架构师5432022-01-07
  • GolangGoLang中生成UUID唯一标识的实现

    GoLang中生成UUID唯一标识的实现

    这篇文章主要介绍了GoLang中生成UUID唯一标识的实现,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...

    私念16252021-06-25
  • Golanggolang一些常用的静态检查工具详解

    golang一些常用的静态检查工具详解

    这篇文章主要介绍了golang一些常用的静态检查工具,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的...

    xiaoliizi7732021-02-04
  • GolangGo语言学习函数+结构体+方法+接口

    Go语言学习函数+结构体+方法+接口

    这篇文章主要介绍了Go语言学习函数+结构体+方法+接口,文章围绕主题的相关资料展开详细的文章说明,具有一定的参考价值,需要的小伙伴可以参考一下...

    Arway8802022-10-10
  • Golanggolang利用unsafe操作未导出变量-Pointer使用详解

    golang利用unsafe操作未导出变量-Pointer使用详解

    这篇文章主要给大家介绍了关于golang利用unsafe操作未导出变量-Pointer使用的相关资料,文中通过示例代码介绍的非常详细,需要的朋友可以参考借鉴,下面...

    豆瓣奶茶3402020-05-17
  • Golanggo语言对文件按照指定块大小进行分割的方法

    go语言对文件按照指定块大小进行分割的方法

    这篇文章主要介绍了go语言对文件按照指定块大小进行分割的方法,实例分析了Go语言文件操作的技巧,需要的朋友可以参考下 ...

    work245472020-04-22