使用场景
sync.Cond是go标准库提供的一个条件变量,用于控制一组goroutine在满足特定条件下被唤醒。
sync.Cond常用于一组goroutine等待,一个goroutine通知(事件发生)的场景。如果只有一个goroutine等待,一个goroutine通知(事件发生),使用Mutex或者Channel就可以实现。
可以用一个全局变量标志特定条件condition,每个sync.Cond都必须要关联一个互斥锁(Mutex或者RWMutex),当condition发生变更或者调用Wait时,都必须加锁,保证多个goroutine安全地访问condition。
下面是go标准库http中关于pipe的部分实现,我们可以看到,pipe使用sync.Cond来控制管道中字节流的写入和读取,在pipe中数据可用并且字节流复制到pipe的缓冲区之前,所有的需要读取该管道数据的goroutine都必须等待,直到数据准备完成。
- type pipe struct {
- mu sync.Mutex
- c sync.Cond // c.L lazily initialized to &p.mu
- b pipeBuffer // nil when done reading
- ...
- }
- // Read waits until data is available and copies bytes
- // from the buffer into p.
- func (p *pipe) Read(d []byte) (n int, err error) {
- p.mu.Lock()
- defer p.mu.Unlock()
- if p.c.L == nil {
- p.c.L = &p.mu
- }
- for {
- ...
- if p.b != nil && p.b.Len() > 0 {
- return p.b.Read(d)
- }
- ...
- p.c.Wait() // write未完成前调用Wait进入等待
- }
- }
- // Write copies bytes from p into the buffer and wakes a reader.
- // It is an error to write more data than the buffer can hold.
- func (p *pipe) Write(d []byte) (n int, err error) {
- p.mu.Lock()
- defer p.mu.Unlock()
- if p.c.L == nil {
- p.c.L = &p.mu
- }
- defer p.c.Signal() // 唤醒所有等待的goroutine
- if p.err != nil {
- return 0, errClosedPipeWrite
- }
- if p.breakErr != nil {
- p.unread += len(d)
- return len(d), nil // discard when there is no reader
- }
- return p.b.Write(d)
- }
实现原理
- type Cond struct {
- noCopy noCopy // 用来保证结构体无法在编译期间拷贝
- // L is held while observing or changing the condition
- L Locker // 用来保证condition变更安全
- notify notifyList // 待通知的goutine列表
- checker copyChecker // 用于禁止运行期间发生的拷贝
- }
- type notifyList struct {
- wait uint32 // 正在等待的goroutine的ticket
- notify uint32 // 已经通知到的goroutine的ticket
- lock uintptr // key field of the mutex
- head unsafe.Pointer // 链表头部
- tail unsafe.Pointer // 链表尾部
- }
copyChecker
copyChecker是一个指针类型,在创建时,它的值指向自身地址,用于检测该对象是否发生了拷贝。如果发生了拷贝,则直接panic。
- // copyChecker holds back pointer to itself to detect object copying.
- type copyChecker uintptr
- func (c *copyChecker) check() {
- if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
- !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
- uintptr(*c) != uintptr(unsafe.Pointer(c)) {
- panic("sync.Cond is copied")
- }
- }
Wait
调用 Wait 会自动释放锁 c.L,并挂起调用者所在的 goroutine,因此当前协程会阻塞在 Wait 方法调用的地方。如果其他协程调用了 Signal 或 Broadcast 唤醒了该协程,那么 Wait 方法在结束阻塞时,会重新给 c.L 加锁,并且继续执行 Wait 后面的代码。
对条件的检查,使用了 for !condition()
而非 if
,是因为当前协程被唤醒时,条件不一定符合要求,需要再次 Wait 等待下次被唤醒。为了保险起见,使用 for
能够确保条件符合要求后,再执行后续的代码。
- func (c *Cond) Wait() {
- c.checker.check()
- t := runtime_notifyListAdd(&c.notify)
- c.L.Unlock()
- runtime_notifyListWait(&c.notify, t)
- c.L.Lock()
- }
- 检查Cond是否被复制,如果被复制,直接panic;
- 调用runtime_notifyListAdd调用者添加到通知列表并解锁,以便可以接收到通知,然后将返回的ticket传入到runtime_notifyListWait来等待通知。
- 当前goroutine会阻塞在wait调用的地方,直到其他goroutine调用Signal或Broadcast唤醒该协程。
- func notifyListAdd(l *notifyList) uint32 {
- return atomic.Xadd(&l.wait, 1) - 1
- }
notifyListWait会将当前goroutine追加到链表的尾端,同时调用goparkunlock让当前goroutine陷入休眠,该方法会直接让出当前处理器的使用权并等待调度器的唤醒。
- func notifyListWait(l *notifyList, t uint32) {
- s := acquireSudog()
- s.g = getg()
- s.ticket = t
- if l.tail == nil {
- l.head = s
- } else {
- l.tail.next = s
- }
- l.tail = s
- goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
- releaseSudog(s)
- }
Signal
Signal会唤醒队列最前面的Goroutine。
- func (c *Cond) Signal() {
- c.checker.check()
- runtime_notifyListNotifyOne(&c.notify)
- }
- func notifyListNotifyOne(l *notifyList) {
- t := l.notify
- atomic.Store(&l.notify, t+1)
- for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
- if s.ticket == t {
- n := s.next
- if p != nil {
- p.next = n
- } else {
- l.head = n
- }
- if n == nil {
- l.tail = p
- }
- s.next = nil
- readyWithTime(s, 4)
- return
- }
- }
- }
Broadcast
Broadcast会唤醒队列中全部的goroutine。
- func (c *Cond) Broadcast() {
- c.checker.check()
- runtime_notifyListNotifyAll(&c.notify)
- }
- func notifyListNotifyAll(l *notifyList) {
- s := l.head
- l.head = nil
- l.tail = nil
- atomic.Store(&l.notify, atomic.Load(&l.wait))
- for s != nil {
- next := s.next
- s.next = nil
- readyWithTime(s, 4)
- s = next
- }
- }
以上就是go并发编程sync.Cond使用场景及实现原理的详细内容,更多关于go并发编程sync.Cond的资料请关注w3xue其它相关文章!