经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Java » 查看文章
JUC同步锁原理源码解析五----Phaser
来源:cnblogs  作者:bug的自我救赎  时间:2023/6/19 21:22:38  对本文有异议

JUC同步锁原理源码解析五----Phaser

Phaser

Phaser的来源

  1. A reusable synchronization barrier, similar in functionality to {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and {@link java.util.concurrent.CountDownLatch CountDownLatch} but supporting more flexible usage.

? JDK中对Phaser的定义时,一个可重用的同步栅栏。其作用相当于CyclicBarrier和CountDownLatch的结合体,但是支持更加灵活的使用

Phaser的底层实现

? Phaser的底层实现依旧依赖于CAS的自旋锁操作,通过cas保证原子性的操作

2.Phaser

基本使用

  1. import java.util.List;
  2. import java.util.concurrent.Phaser;
  3. public class PhaserDemo {
  4. void runTasks(List<Runnable> tasks) {
  5. final Phaser phaser = new Phaser(1); // "1" to register self
  6. // create and start threads
  7. for (final Runnable task : tasks) {
  8. phaser.register();
  9. new Thread() {
  10. public void run() {
  11. phaser.arriveAndAwaitAdvance(); // await all creation
  12. task.run();
  13. }
  14. }.start();
  15. }
  16. }
  17. void startTasks(List<Runnable> tasks, int iterations) {
  18. Phaser phaser = new Phaser() {
  19. protected boolean onAdvance(int phase, int registeredParties) {
  20. return phase >= iterations - 1 || registeredParties == 0;
  21. }
  22. };
  23. phaser.register();
  24. for (Runnable task : tasks) {
  25. phaser.register();
  26. new Thread(() -> {
  27. do {
  28. task.run();
  29. phaser.arriveAndAwaitAdvance();
  30. } while (!phaser.isTerminated());
  31. }).start();
  32. }
  33. // allow threads to proceed; don't wait for them
  34. phaser.arriveAndDeregister();
  35. }
  36. }

Phaser类

  1. public class Phaser {
  2. private volatile long state;//采用long 64 位表示state变量。使用位操作来表示,cas单原子性变量保证多变量的原子性
  3. private static final int MAX_PARTIES = 0xffff;
  4. private static final int MAX_PHASE = Integer.MAX_VALUE;
  5. private static final int PARTIES_SHIFT = 16;
  6. private static final int PHASE_SHIFT = 32;
  7. private static final int UNARRIVED_MASK = 0xffff; // to mask ints
  8. private static final long PARTIES_MASK = 0xffff0000L; // to mask longs
  9. private static final long COUNTS_MASK = 0xffffffffL;
  10. private static final long TERMINATION_BIT = 1L << 63;
  11. // some special values
  12. private static final int ONE_ARRIVAL = 1;
  13. private static final int ONE_PARTY = 1 << PARTIES_SHIFT;
  14. private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY;
  15. private static final int EMPTY = 1;
  16. // The following unpacking methods are usually manually inlined
  17. private static int unarrivedOf(long s) {
  18. int counts = (int)s;
  19. return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
  20. }
  21. private static int partiesOf(long s) {
  22. return (int)s >>> PARTIES_SHIFT;
  23. }
  24. private static int phaseOf(long s) {
  25. return (int)(s >>> PHASE_SHIFT);
  26. }
  27. private static int arrivedOf(long s) {
  28. int counts = (int)s;
  29. return (counts == EMPTY) ? 0 :
  30. (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
  31. }
  32. /**
  33. * The parent of this phaser, or null if none
  34. */
  35. private final Phaser parent;
  36. /**
  37. * The root of phaser tree. Equals this if not in a tree.
  38. */
  39. private final Phaser root;
  40. /**
  41. * Heads of Treiber stacks for waiting threads. To eliminate
  42. * contention when releasing some threads while adding others, we
  43. * use two of them, alternating across even and odd phases.
  44. * Subphasers share queues with root to speed up releases.
  45. */
  46. private final AtomicReference<QNode> evenQ;
  47. private final AtomicReference<QNode> oddQ;

QNode类

  1. static final class QNode implements ForkJoinPool.ManagedBlocker {
  2. final Phaser phaser;
  3. final int phase;
  4. final boolean interruptible;
  5. final boolean timed;
  6. boolean wasInterrupted;
  7. long nanos;
  8. final long deadline;
  9. volatile Thread thread; // nulled to cancel wait
  10. QNode next;
  11. QNode(Phaser phaser, int phase, boolean interruptible,
  12. boolean timed, long nanos) {
  13. this.phaser = phaser;
  14. this.phase = phase;
  15. this.interruptible = interruptible;
  16. this.nanos = nanos;
  17. this.timed = timed;
  18. this.deadline = timed ? System.nanoTime() + nanos : 0L;
  19. thread = Thread.currentThread();
  20. }

Phaser的构造器

  1. public Phaser(int parties) {
  2. this(null, parties);
  3. }
  4. public Phaser(Phaser parent) {
  5. this(parent, 0);
  6. }
  7. //最终都是走这个构造器方法
  8. public Phaser(Phaser parent, int parties) {
  9. if (parties >>> PARTIES_SHIFT != 0)//
  10. throw new IllegalArgumentException("Illegal number of parties");
  11. int phase = 0;
  12. this.parent = parent;
  13. if (parent != null) {//判断父阶段是否为空。如果有父阶段,子阶段的行为由父阶段控制,调用父阶段去处理
  14. final Phaser root = parent.root;//root为父阶段
  15. this.root = root;
  16. this.evenQ = root.evenQ;//使用父阶段的偶队列
  17. this.oddQ = root.oddQ;//使用父阶段的奇队列
  18. if (parties != 0)//如果父阶段不为空
  19. phase = parent.doRegister(1);//将当前阶段注册到父阶段中
  20. }
  21. else {//表示没有父阶段
  22. this.root = this;
  23. this.evenQ = new AtomicReference<QNode>();
  24. this.oddQ = new AtomicReference<QNode>();
  25. }
  26. this.state = (parties == 0) ? (long)EMPTY :
  27. ((long)phase << PHASE_SHIFT) | //64位中高32位表示阶段数,也即phase的数量
  28. ((long)parties << PARTIES_SHIFT) | //64位中低32位的高16位表示参与者的数量
  29. ((long)parties);//64位中低32位的低16位表示未完成的数量
  30. }

register方法

  1. public int register() {
  2. return doRegister(1);
  3. }
  4. private int doRegister(int registrations) {
  5. // adjustment to state
  6. long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;//对当前state变量的参与数量和未完成数量都加 1
  7. final Phaser parent = this.parent;//如果有父阶段,获取父阶段
  8. int phase;
  9. for (;;) {
  10. long s = (parent == null) ? state : reconcileState();//拿到state的值
  11. int counts = (int)s;//将64位取低32位的值
  12. int parties = counts >>> PARTIES_SHIFT;//右移16位,取高16位的值,也即parties的数量
  13. int unarrived = counts & UNARRIVED_MASK;//获取低16位的数值,也即未到达的数量
  14. if (registrations > MAX_PARTIES - parties)//越界判断
  15. throw new IllegalStateException(badRegister(s));
  16. phase = (int)(s >>> PHASE_SHIFT);//获取阶段数
  17. if (phase < 0)//阶段数为0,表示已经超过阶段数了,不需要继续处理了
  18. break;
  19. if (counts != EMPTY) { // not 1st registration
  20. if (parent == null || reconcileState() == s) {
  21. if (unarrived == 0) // wait out advance 如果未完成数量等于0
  22. root.internalAwaitAdvance(phase, null);//阻塞等待或者等到下一阶段推进
  23. else if (UNSAFE.compareAndSwapLong(this, stateOffset,//将当前需要参与的数量放到state变量中
  24. s, s + adjust))
  25. break;//退出循环
  26. }
  27. }
  28. else if (parent == null) {//没有父阶段或自己就是父阶段
  29. long next = ((long)phase << PHASE_SHIFT) | adjust; //阶段数量增加
  30. if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))//cas尝试将阶段数量增加,成功就腿很粗
  31. break;
  32. }
  33. else {
  34. synchronized (this) { //走到这里表示,自身属于子阶段,需要接受父阶段的调度
  35. if (state == s) { //重新检测state变量是否改变
  36. phase = parent.doRegister(1);//向父阶段注册
  37. if (phase < 0)//阶段数已经超过了最大阶段数
  38. break;
  39. //while循环,设置state的中phase阶段数直至成功
  40. while (!UNSAFE.compareAndSwapLong
  41. (this, stateOffset, s,
  42. ((long)phase << PHASE_SHIFT) | adjust)) {
  43. s = state;
  44. phase = (int)(root.state >>> PHASE_SHIFT);
  45. // assert (int)s == EMPTY;
  46. }
  47. break;
  48. }
  49. }
  50. }
  51. }
  52. return phase;
  53. }

reconcileState方法:

  1. //只要使用在有父子阶段的存在的情况下
  2. private long reconcileState() {
  3. final Phaser root = this.root;//获取到当前阶段
  4. long s = state;//取得当前state
  5. if (root != this) {//如果root不是当前阶段
  6. int phase, p;
  7. // CAS to root phase with current parties, tripping unarrived
  8. while ((phase = (int)(root.state >>> PHASE_SHIFT)) != //phase等于root的阶段数
  9. (int)(s >>> PHASE_SHIFT) &&//root的阶段数不等于当前阶段state变量的阶段数
  10. !UNSAFE.compareAndSwapLong//
  11. (this, stateOffset, s,
  12. s = (((long)phase << PHASE_SHIFT) | //root的阶段数
  13. ((phase < 0) ? (s & COUNTS_MASK) : //如果阶段数已经超了,直接取低32位
  14. (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY : //获取到s对应的parties数量,复制给p
  15. ((s & PARTIES_MASK) | p))))))
  16. s = state;
  17. }
  18. return s;
  19. }

arriveAndAwaitAdvance方法

  1. public int arriveAndAwaitAdvance() {
  2. // Specialization of doArrive+awaitAdvance eliminating some reads/paths
  3. final Phaser root = this.root;//获取当前阶段
  4. for (;;) {
  5. long s = (root == this) ? state : reconcileState();//获取到state的状态,如果有父阶段调用reconcileState
  6. int phase = (int)(s >>> PHASE_SHIFT);//等到当前阶段数
  7. if (phase < 0)//阶段数超了,直接返回
  8. return phase;
  9. int counts = (int)s;//获取state的低32位
  10. int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);//获取到未到达的参与者数量
  11. if (unarrived <= 0)//未到达的参与者数量越界检查
  12. throw new IllegalStateException(badArrive(s));
  13. if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
  14. s -= ONE_ARRIVAL)) {//cas将未到达的数量减1
  15. if (unarrived > 1)//如果未到达的数量大于1
  16. return root.internalAwaitAdvance(phase, null);//调用父阶段控制其去睡眠等待
  17. if (root != this)//如果this不是父阶段
  18. return parent.arriveAndAwaitAdvance();//由父类处理,将到达线程数减1或滚动到下一阶段
  19. long n = s & PARTIES_MASK; // base of next state//获得参与者parties数量
  20. int nextUnarrived = (int)n >>> PARTIES_SHIFT;//获得下一个阶段参与者的数量
  21. if (onAdvance(phase, nextUnarrived))//调用onAdvance会掉方法
  22. n |= TERMINATION_BIT;//TERMINATION_BIT:1<<63,标识阶段数结束
  23. else if (nextUnarrived == 0)//如果下一个阶段参与者为0
  24. n |= EMPTY;//异或上EMPTY
  25. else
  26. n |= nextUnarrived;//否则将低32位的低16位置为下一阶段参与者的数量,表示未完成的数量等于下一个阶段参与者的数量
  27. int nextPhase = (phase + 1) & MAX_PHASE;//获得下一个阶段的phase的数量
  28. n |= (long)nextPhase << PHASE_SHIFT;//n异或上下一阶段phase的数量组合成state比那辆
  29. if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))//cas设置state变量。当前线程如果设置state变量失败,是否可以允许爆炸唤醒,不直接退出?
  30. return (int)(state >>> PHASE_SHIFT); // cas失败,返回state中的阶段数
  31. releaseWaiters(phase);//释放等待线程
  32. return nextPhase;
  33. }
  34. }
  35. }

internalAwaitAdvance方法

  1. private int internalAwaitAdvance(int phase, QNode node) {
  2. // assert root == this;
  3. releaseWaiters(phase-1); // ensure old queue clean 将上一个阶段等待线程唤醒,将队列清空
  4. boolean queued = false; // true when node is enqueued
  5. int lastUnarrived = 0; // to increase spins upon change
  6. int spins = SPINS_PER_ARRIVAL;//SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8,单核CPU没有自旋的必要,浪费时间
  7. long s;
  8. int p;
  9. while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {//判断当前阶段数是否等于phase
  10. if (node == null) { // spinning in noninterruptible mode
  11. int unarrived = (int)s & UNARRIVED_MASK;//获取未到达的参与者数量
  12. if (unarrived != lastUnarrived &&//未到达的参与者数量不等于lastUnarrived
  13. (lastUnarrived = unarrived) < NCPU)//lastUnarrived 赋值lastUnarrived。小于CPU的核心数,证明任务很快可以调度,值得等待。但是考虑业务线程,实际中如果CPU的核心数没有大于2,其实没有自旋的必要。
  14. spins += SPINS_PER_ARRIVAL;//增加自旋次数
  15. boolean interrupted = Thread.interrupted();//判断中断标志位
  16. if (interrupted || --spins < 0) { // need node to record intr //如果中断了,或者自旋次数小于0
  17. node = new QNode(this, phase, false, false, 0L);
  18. node.wasInterrupted = interrupted;//将中断标识赋值
  19. }
  20. }
  21. else if (node.isReleasable()) // done or aborted 判断是否已经完成,或者说中断等方式释放
  22. break;
  23. else if (!queued) { // push onto queue 不在队列中
  24. AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; //根据phase的奇偶性,选择队列
  25. QNode q = node.next = head.get();//头插法
  26. if ((q == null || q.phase == phase) &&
  27. (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
  28. queued = head.compareAndSet(q, node);
  29. }
  30. else {
  31. try {
  32. ForkJoinPool.managedBlock(node);//由于兼容forkJoin线程池,所以这里提供模板。这里进行阻塞等待
  33. } catch (InterruptedException ie) {
  34. node.wasInterrupted = true;
  35. }
  36. }
  37. }
  38. if (node != null) {//进入这里表示,当前阶段不一致
  39. if (node.thread != null)
  40. node.thread = null; // avoid need for unpark() //将thread置为空
  41. if (node.wasInterrupted && !node.interruptible) //节点被中断,并且节点不可中断
  42. Thread.currentThread().interrupt();//重置中断标志位
  43. if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)//当前阶段数量一致,也即属于同一阶段
  44. return abortWait(phase); // possibly clean up on abort
  45. }
  46. releaseWaiters(phase);//唤醒等待的线程
  47. return p;
  48. }

releaseWaiters方法

  1. private void releaseWaiters(int phase) {
  2. QNode q; // first element of queue
  3. Thread t; // its thread
  4. AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;//根据阶段数判断是奇数队列还是偶数队列
  5. while ((q = head.get()) != null &&//获取到头结点,如果头结点补位空
  6. q.phase != (int)(root.state >>> PHASE_SHIFT)) {//并且当前阶段数已经滚动到下一个阶段
  7. if (head.compareAndSet(q, q.next) &&//cas替换头结点
  8. (t = q.thread) != null) {//如果旧的头结点不为空
  9. q.thread = null;//将节点q的线程置为空
  10. LockSupport.unpark(t);//唤醒节点q的线程
  11. }
  12. }
  13. }

arriveAndDeregister方法

  1. public int arriveAndDeregister() {
  2. return doArrive(ONE_DEREGISTER);//ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY;
  3. }

doArrive方法

  1. private int doArrive(int adjust) {
  2. final Phaser root = this.root;//获取当前阶段
  3. for (;;) {
  4. long s = (root == this) ? state : reconcileState();//如果有父阶段获取父阶段的state,没有取当前阶段的state
  5. int phase = (int)(s >>> PHASE_SHIFT);//获取阶段数
  6. if (phase < 0)//如果阶段数已经小于0,表示已经结束,直接返回
  7. return phase;
  8. int counts = (int)s;//获取state的低32位,业绩参与者和未完成的参与者
  9. int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);//获取未到达的参与者数量EMPTY是特殊值,表示没有未到达的参与者
  10. if (unarrived <= 0)//如果未到达的参与者小于0,非法直接抛出异常
  11. throw new IllegalStateException(badArrive(s));
  12. if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {//直接cas自旋,更新state变量
  13. if (unarrived == 1) {//如果当前线程是最后一个未完成的参与者,需要做收尾工作
  14. long n = s & PARTIES_MASK; // base of next state 获取参与者的数量
  15. int nextUnarrived = (int)n >>> PARTIES_SHIFT;//设置未到达的参与者数量为下一个阶段的参与者数量
  16. if (root == this) {//如果root是当前阶段
  17. if (onAdvance(phase, nextUnarrived))//回调钩子函数
  18. n |= TERMINATION_BIT;//置为TERMINATION状态,TERMINATION_BIT = 1L << 63;最高位符号位表示终止标志位
  19. else if (nextUnarrived == 0)//如果下一个阶段没有参与者
  20. n |= EMPTY;//直接或上一个EMPTY
  21. else
  22. n |= nextUnarrived;//否则直接或上下一个阶段的未达到的参与者数量
  23. int nextPhase = (phase + 1) & MAX_PHASE;//阶段数加1
  24. n |= (long)nextPhase << PHASE_SHIFT;//将阶段数组合到变量n中,
  25. UNSAFE.compareAndSwapLong(this, stateOffset, s, n);//cas自旋将state置为n,表示滚动到下一个阶段
  26. releaseWaiters(phase);//释放所有等待的节点
  27. }
  28. else if (nextUnarrived == 0) { // propagate deregistration 这里表示root有父阶段且自己已经完成
  29. phase = parent.doArrive(ONE_DEREGISTER);//父阶段中标识自己已经完成并且将参与者数量减1,未到达的参与者也减1
  30. UNSAFE.compareAndSwapLong(this, stateOffset,//将当前的state,case自旋,置为EMPTY
  31. s, s | EMPTY);
  32. }
  33. else
  34. phase = parent.doArrive(ONE_ARRIVAL);//当前线程不是最后一个完成的线程,将未到达的参与者数量减1即可
  35. }
  36. return phase;
  37. }
  38. }
  39. }

4.留言

? 到了这里,其实AQS的源码基本已经覆盖了,对于AQS的源码也应该有了清楚的认知。总结就是:一个volatile 的state变量,两个等待队列(竞争队列,条件队列),通过cas的方式保证单变量的原子性。后续将会对Exchanger以及Phaser进行源码解析,到此基本AQS已经到了一个段落了。后续观看源码时,请注意多考虑一下多线程并发时可能出现的情况,去理解doug lea写代码的思路。

原文链接:https://www.cnblogs.com/selfsalvation/p/17492237.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号