经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Go语言 » 查看文章
go并发编程sync.Cond使用场景及实现原理
来源:jb51  时间:2022/8/31 17:26:52  对本文有异议

使用场景

sync.Cond是go标准库提供的一个条件变量,用于控制一组goroutine在满足特定条件下被唤醒。

sync.Cond常用于一组goroutine等待,一个goroutine通知(事件发生)的场景。如果只有一个goroutine等待,一个goroutine通知(事件发生),使用Mutex或者Channel就可以实现。

可以用一个全局变量标志特定条件condition,每个sync.Cond都必须要关联一个互斥锁(Mutex或者RWMutex),当condition发生变更或者调用Wait时,都必须加锁,保证多个goroutine安全地访问condition。

下面是go标准库http中关于pipe的部分实现,我们可以看到,pipe使用sync.Cond来控制管道中字节流的写入和读取,在pipe中数据可用并且字节流复制到pipe的缓冲区之前,所有的需要读取该管道数据的goroutine都必须等待,直到数据准备完成。

  1. type pipe struct {
  2. mu sync.Mutex
  3. c sync.Cond // c.L lazily initialized to &p.mu
  4. b pipeBuffer // nil when done reading
  5. ...
  6. }
  1. // Read waits until data is available and copies bytes
  2. // from the buffer into p.
  3. func (p *pipe) Read(d []byte) (n int, err error) {
  4. p.mu.Lock()
  5. defer p.mu.Unlock()
  6. if p.c.L == nil {
  7. p.c.L = &p.mu
  8. }
  9. for {
  10. ...
  11. if p.b != nil && p.b.Len() > 0 {
  12. return p.b.Read(d)
  13. }
  14. ...
  15. p.c.Wait() // write未完成前调用Wait进入等待
  16. }
  17. }
  1. // Write copies bytes from p into the buffer and wakes a reader.
  2. // It is an error to write more data than the buffer can hold.
  3. func (p *pipe) Write(d []byte) (n int, err error) {
  4. p.mu.Lock()
  5. defer p.mu.Unlock()
  6. if p.c.L == nil {
  7. p.c.L = &p.mu
  8. }
  9. defer p.c.Signal() // 唤醒所有等待的goroutine
  10. if p.err != nil {
  11. return 0, errClosedPipeWrite
  12. }
  13. if p.breakErr != nil {
  14. p.unread += len(d)
  15. return len(d), nil // discard when there is no reader
  16. }
  17. return p.b.Write(d)
  18. }

实现原理

  1. type Cond struct {
  2. noCopy noCopy // 用来保证结构体无法在编译期间拷贝
  3. // L is held while observing or changing the condition
  4. L Locker // 用来保证condition变更安全
  5. notify notifyList // 待通知的goutine列表
  6. checker copyChecker // 用于禁止运行期间发生的拷贝
  7. }
  1. type notifyList struct {
  2. wait uint32 // 正在等待的goroutine的ticket
  3. notify uint32 // 已经通知到的goroutine的ticket
  4. lock uintptr // key field of the mutex
  5. head unsafe.Pointer // 链表头部
  6. tail unsafe.Pointer // 链表尾部
  7. }

copyChecker

copyChecker是一个指针类型,在创建时,它的值指向自身地址,用于检测该对象是否发生了拷贝。如果发生了拷贝,则直接panic。

  1. // copyChecker holds back pointer to itself to detect object copying.
  2. type copyChecker uintptr
  3. func (c *copyChecker) check() {
  4. if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
  5. !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
  6. uintptr(*c) != uintptr(unsafe.Pointer(c)) {
  7. panic("sync.Cond is copied")
  8. }
  9. }

Wait

调用 Wait 会自动释放锁 c.L,并挂起调用者所在的 goroutine,因此当前协程会阻塞在 Wait 方法调用的地方。如果其他协程调用了 Signal 或 Broadcast 唤醒了该协程,那么 Wait 方法在结束阻塞时,会重新给 c.L 加锁,并且继续执行 Wait 后面的代码。

对条件的检查,使用了 for !condition() 而非 if,是因为当前协程被唤醒时,条件不一定符合要求,需要再次 Wait 等待下次被唤醒。为了保险起见,使用 for 能够确保条件符合要求后,再执行后续的代码。

  1. func (c *Cond) Wait() {
  2. c.checker.check()
  3. t := runtime_notifyListAdd(&c.notify)
  4. c.L.Unlock()
  5. runtime_notifyListWait(&c.notify, t)
  6. c.L.Lock()
  7. }
  • 检查Cond是否被复制,如果被复制,直接panic;
  • 调用runtime_notifyListAdd调用者添加到通知列表并解锁,以便可以接收到通知,然后将返回的ticket传入到runtime_notifyListWait来等待通知。
  • 当前goroutine会阻塞在wait调用的地方,直到其他goroutine调用Signal或Broadcast唤醒该协程。
  1. func notifyListAdd(l *notifyList) uint32 {
  2. return atomic.Xadd(&l.wait, 1) - 1
  3. }

notifyListWait会将当前goroutine追加到链表的尾端,同时调用goparkunlock让当前goroutine陷入休眠,该方法会直接让出当前处理器的使用权并等待调度器的唤醒。

  1. func notifyListWait(l *notifyList, t uint32) {
  2. s := acquireSudog()
  3. s.g = getg()
  4. s.ticket = t
  5. if l.tail == nil {
  6. l.head = s
  7. } else {
  8. l.tail.next = s
  9. }
  10. l.tail = s
  11. goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
  12. releaseSudog(s)
  13. }

Signal

Signal会唤醒队列最前面的Goroutine。

  1. func (c *Cond) Signal() {
  2. c.checker.check()
  3. runtime_notifyListNotifyOne(&c.notify)
  4. }
  1. func notifyListNotifyOne(l *notifyList) {
  2. t := l.notify
  3. atomic.Store(&l.notify, t+1)
  4. for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
  5. if s.ticket == t {
  6. n := s.next
  7. if p != nil {
  8. p.next = n
  9. } else {
  10. l.head = n
  11. }
  12. if n == nil {
  13. l.tail = p
  14. }
  15. s.next = nil
  16. readyWithTime(s, 4)
  17. return
  18. }
  19. }
  20. }

Broadcast

Broadcast会唤醒队列中全部的goroutine。

  1. func (c *Cond) Broadcast() {
  2. c.checker.check()
  3. runtime_notifyListNotifyAll(&c.notify)
  4. }
  1. func notifyListNotifyAll(l *notifyList) {
  2. s := l.head
  3. l.head = nil
  4. l.tail = nil
  5. atomic.Store(&l.notify, atomic.Load(&l.wait))
  6. for s != nil {
  7. next := s.next
  8. s.next = nil
  9. readyWithTime(s, 4)
  10. s = next
  11. }
  12. }

以上就是go并发编程sync.Cond使用场景及实现原理的详细内容,更多关于go并发编程sync.Cond的资料请关注w3xue其它相关文章!

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号