经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Java » 查看文章
Java - ReentrantLock锁分析
来源:cnblogs  作者:M`sakura~  时间:2023/8/29 8:46:56  对本文有异议

Java - JUC核心类AbstractQueuedSynchronizer(AQS)底层实现


 

 一.  AQS内部结构介绍

JUC是Java中一个包   java.util.concurrent 。在这个包下,基本存放了Java中一些有关并发的类,包括并发工具,并发集合,锁等。

AQS(抽象队列同步器)是JUC下的一个基础类,大多数的并发工具都是基于AQS实现的。

AQS本质并没有实现太多的业务功能,只是对外提供了三点核心内容,来帮助实现其他的并发内容。

三点核心内容:

  • int state
    • 比如ReentrantLock或者ReentrantReadWriteLock, 它们获取锁的方式,都是对state变量做修改实现的。
    • 比如CountDownLatch基于state作为计数器,同样的Semaphore也是用state记录资源个数。
  • Node对象组成的双向链表(AQS中)
    • 比如ReentrantLock,有一个线程没有拿到锁资源,当线程需要等待,则需要将线程封装为Node对象,将Node添加到双向链表,将线程挂起,等待即可。
  • Node对象组成的单向链表(AQS中的ConditionObject类中)
    • 比如ReentrantLock,一个线程持有锁资源时,执行了await方法(类比synchronized锁执行对象的wait方法),此时这个线程需要封装为Node对象,并添加到单向链表。

二.  Lock锁和AQS关系

ReentrantLock就是基于AQS实现的。ReentrantLock类中维护这个一个内部抽象类Sync,他继承了AQS类。ReentrantLock的lock和unlock方法就是调用的Sync的方法。

AQS流程(简述)
1. 当new了一个ReentrantLock时,AQS默认state值为0, head 和 tail 都为null;
2. A线程执行lock方法,获取锁资源。
3. A线程将state通过cas操作从0改为1,代表获取锁资源成功。
4. B线程要获取锁资源时,锁资源被A线程持有。
5. B线程获取锁资源失败,需要添加到双向链表中排队。
6. 挂起B线程,等待A线程释放锁资源,再唤醒挂起的B线程。
7. A线程释放锁资源,将state从1改为0,再唤醒head.next节点。
8. B线程就可以重新尝试获取锁资源。
注: 修改AQS双向链表时要保证一个私有属性变化和两个共有属性变化,只需要让tail变化保证原子性即可。不能先改tail(会破坏双向链表)

三.  AQS - Lock锁的tryAcquire方法

ReentrantLock中的lock方法实际是执行的Sync的lock方法。

Sync是一个抽象类,继承了AQS

Sync有两个子类实现:

  • FairSync: 公平锁
  • NonFairSync: 非公平锁

Sync的lock方法实现:

  1. // 非公平锁
  2. final void lock() {
  3. // CAS操作,尝试将state从0改为1
  4. // 成功就拿到锁资源, 失败执行acquire方法
  5. if (compareAndSetState(0, 1))
  6.      // 成功就设置互斥锁的为当前线程拥有
  7. setExclusiveOwnerThread(Thread.currentThread());
  8. else
  9. acquire(1);
  10. }
  11.  
  12. // 公平锁
  13. final void lock() {
  14. acquire(1);
  15. }

如果CAS操作没有成功,需要执行acquire方法走后续

acquire方法是AQS提供的,公平和非公平都是走的这个方法

  1. public final void acquire(int arg) {
  2. // 1. tryAcquire方法: 再次尝试拿锁
  3. // 2. addWaiter方法: 没有获取到锁资源,去排队
  4. // 3. acquireQueued方法:挂起线程和后续被唤醒继续获取锁资源的逻辑
  5. if (!tryAcquire(arg) &&
  6. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  7.      // 如果这个过程中出现中断,在整个过程结束后再自我中断 
  8. selfInterrupt();
  9. }

在AQS中tryAcquire是没有具体实现逻辑的,AQS直接在tryAcquire方法中抛出异常

在公平锁和非公平锁中有自己的实现。

  • 非公平锁tryAcquire方法
  1. // 非公平锁
  2. protected final boolean tryAcquire(int acquires) {
  3. return nonfairTryAcquire(acquires);
  4. }
  5.  
  6. // 非公平锁再次尝试拿锁 (注:该方法属于Sync类中)
  7. final boolean nonfairTryAcquire(int acquires) {
  8. // 获取当前线程对象
  9. final Thread current = Thread.currentThread();
  10. // 获取state状态
  11. int c = getState();
  12. // state是不是没有线程持有锁资源,可以尝试获取锁
  13. if (c == 0) {
  14. // 再次CAS操作尝试修改state状态从0改为1
  15. if (compareAndSetState(0, acquires)) {
  16. // 成功就设置互斥锁的为当前线程拥有
  17. setExclusiveOwnerThread(current);
  18. return true;
  19. }
  20. }
  21. // 锁资源是否被当前线程所持有 (可重入锁)
  22. else if (current == getExclusiveOwnerThread()) {
  23. // 持有锁资源为当前, 则对state + 1
  24. int nextc = c + acquires;
  25. // 健壮性判断
  26. if (nextc < 0) // overflow
  27. // 超过最大锁重入次数会抛异常(几率很小,理论上存在)
  28. throw new Error("Maximum lock count exceeded");
  29. // 设置state状态,代表锁重入成功
  30. setState(nextc);
  31. return true;
  32. }
  33. return false;
  34. }
  • 公平锁tryAcquire方法
  1. // 公平锁
  2. protected final boolean tryAcquire(int acquires) {
  3. // 获取当前线程对象
  4. final Thread current = Thread.currentThread();
  5. // 获取state状态
  6. int c = getState();
  7. // state是不是没有线程持有锁资源
  8. if (c == 0) {
  9. // 当前锁资源没有被其他线程持有
  10. // hasQueuedPredecessors方法: 锁资源没有被持有,进入队列排队
  11. // 排队规则:
  12. // 1. 检查队列没有线程排队,抢锁。
  13. // 2. 检查队列有线程排队,查看当前线程是否排在第一位,如果是抢锁,否则入队列(注:该方法只是判断,没有真正入队列)
  14. if (!hasQueuedPredecessors() &&
  15. compareAndSetState(0, acquires)) {
  16. // 再次CAS操作尝试, 成功就设置互斥锁的为当前线程拥有
  17. setExclusiveOwnerThread(current);
  18. return true;
  19. }
  20. }
  21. // 锁资源是否被当前线程所持有 (可重入锁)
  22. else if (current == getExclusiveOwnerThread()) {
  23. // 持有锁资源为当前, 则对state + 1
  24. int nextc = c + acquires;
  25. // 健壮性判断
  26. if (nextc < 0)
  27. // 超过最大锁重入次数会抛异常(几率很小,理论上存在)
  28. throw new Error("Maximum lock count exceeded");
  29. // 设置state状态,代表锁重入成功
  30. setState(nextc);
  31. return true;
  32. }
  33. return false;
  34. }

四.  AQS的addWaiter方法

 addWaiter方法,就是将当前线程封装为Node对象,并且插入到AQS的双向链表。

  1. // 线程入队列排队
  2. private Node addWaiter(Node mode) {
  3. // 将当前对象封装为Node对象
  4. // Node.EXCLUSIVE 表示互斥 Node.SHARED 表示共享
  5. Node node = new Node(Thread.currentThread(), mode);
  6. // 获取tail节点
  7. Node pred = tail;
  8. // 判断双向链表队列有没有初始化
  9. if (pred != null) {
  10. // 将当前线程封装的Node节点prev属性指向tail尾节点
  11. node.prev = pred;
  12. // 通过CAS操作设置当前线程封装的Node节点为尾节点
  13. if (compareAndSetTail(pred, node)) {
  14. // 成功则将上一个尾节点的next属性指向当前线程封装的Node节点
  15. pred.next = node;
  16. return node;
  17. }
  18. }
  19. // 没有初始化head 和 tail 都等于null
  20. // enq方法: 插入双向链表和初始化双向链表
  21. enq(node);
  22. // 完成节点插入
  23. return node;
  24. }
  25.  
  26. // 插入双向链表和初始化双向链表
  27. private Node enq(final Node node) {
  28. // 死循环
  29. for (;;) {
  30. // 获取当前tail节点
  31. Node t = tail;
  32. // 判断尾节点是否初始
  33. if (t == null) { // Must initialize
  34. // 通过CAS操作初始化初始化一个虚拟的Node节点,赋给head节点
  35. if (compareAndSetHead(new Node()))
  36. tail = head;
  37. } else {
  38. // 完成当前线程Node节点加入AQS双向链表的过程
  39. // 当前线程封装的Node的上一个prev属性指向tail节点
  40. // 流程: 1. prev(私有) ---> 2. tail(共有) ---> 3. next (共有)
  41. node.prev = t;
  42. // 通过CAS操作修改tail尾节点指向当前线程封装的Node
  43. if (compareAndSetTail(t, node)) {
  44. // 将当前线程封装的Node节点赋给上一个Node的下一个next属性
  45. t.next = node;
  46. return t;
  47. }
  48. }
  49. }
  50. }

五.  AQS的acquireQueued方法

acquireQueued方法主要就是线程挂起以及重新尝试获取锁资源的地方

重新获取锁资源主要有两种情况:

  • 上来就排在head.next,就回去尝试拿锁
  • 唤醒之后尝试拿锁
  1. // 当前线程Node添加到AQS队列后续操作
  2. final boolean acquireQueued(final Node node, int arg) {
  3. // 标记,记录拿锁状态 失败
  4. boolean failed = true;
  5. try {
  6. // 中断状态
  7. boolean interrupted = false;
  8. // 死循环
  9. for (;;) {
  10. // 获取当前节点的上一个节点 prev
  11. final Node p = node.predecessor();
  12. // 判断当前节点是否是head,是则代表当前节点排在第一位
  13. // 如果是第一位,执行tryAcquire方法尝试拿锁
  14. if (p == head && tryAcquire(arg)) {
  15. // 都成功,代表拿到锁资源
  16. // 将当前线程Node设置为head节点,同时将Node的thread 和 prev属性设置为null
  17. setHead(node);
  18. // 将上一个head的next属性设置为null,等待GC回收
  19. p.next = null; // help GC
  20. // 拿锁状态 成功
  21. failed = false;
  22. // 返回中断状态
  23. return interrupted;
  24. }
  25. // 没有获取到锁 --- 尝试挂起线程
  26. // shouldParkAfterFailedAcquire方法: 挂起线程前的准备
  27. // parkAndCheckInterrupt方法: 挂起当前线程
  28. if (shouldParkAfterFailedAcquire(p, node) &&
  29. parkAndCheckInterrupt())
  30. // 设置中断线程状态
  31. interrupted = true;
  32. }
  33. } finally {
  34. // 取消节点
  35. if (failed)
  36. cancelAcquire(node);
  37. }
  38. }
  39.  
  40. // 检查并更新无法获取锁节点的状态
  41. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  42. // 获取上一个节点的ws状态
  43. /**
  44. * SIGNAL(-1) 表示当前节点释放锁的时候,需要唤醒下一个节点。或者说后继节点在等待当前节点唤醒,后继节点入队时候,会将前驱节点更新给signal。
  45. * CANCELLED(1) 表示当前节点已取消调度。当timeout或者中断情况下,会触发变更为此状态,进入该状态后的节点不再变化。
  46. * CONDITION(-2) 当其他线程调用了condition的signal方法后,condition状态的节点会从等待队列转移到同步队列中,等待获取同步锁。
  47. * PROPAGATE(-3) 表示共享模式下,前驱节点不仅会唤醒其后继节点,同时也可能唤醒后继的后继节点。
  48. * 默认(0) 新节点入队时候的默认状态。
  49. */
  50. int ws = pred.waitStatus;
  51. // 判断上个节点ws状态是否是 -1, 是则挂起
  52. if (ws == Node.SIGNAL)
  53. return true;
  54. if (ws > 0) {
  55. /**
  56. * 判断上个节点是否是取消或者其他状态。
  57. * 向前找到不是取消状态的节点,修改ws状态。
  58. * 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,
  59. * 稍后就会被GC回收,这个操作实际是把队列中的cancelled节点剔除掉。
  60. */
  61. do {
  62. node.prev = pred = pred.prev;
  63. } while (pred.waitStatus > 0);
  64. pred.next = node;
  65. } else {
  66. // 如果前驱节点正常,那就把上一个节点的状态通过CAS的方式设置成-1
  67. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  68. }
  69. return false;
  70. }
  71.  
  72. // 挂起当前线程
  73. private final boolean parkAndCheckInterrupt() {
  74. // 挂起当前线程
  75. LockSupport.park(this);
  76. // 返回中断标志
  77. return Thread.interrupted();
  78. }

六.  AQS的Lock锁的release方法

  1. // 互斥锁模式 解锁
  2. public final boolean release(int arg) {
  3. // 尝试是否可以解锁
  4. if (tryRelease(arg)) {
  5. Node h = head;
  6. // 判断双链表是否存在线程排队
  7. if (h != null && h.waitStatus != 0)
  8. // 唤醒后续线程
  9. unparkSuccessor(h);
  10. return true;
  11. }
  12. return false;
  13. }
  14.  
  15. // 尝试是否可以解锁
  16. protected final boolean tryRelease(int releases) {
  17. // 锁状态 = 状态 - 1
  18. int c = getState() - releases;
  19. // 判断锁是是否是当前线程持有
  20. if (Thread.currentThread() != getExclusiveOwnerThread())
  21. // 当前线程没有持有抛出异常
  22. throw new IllegalMonitorStateException();
  23. boolean free = false;
  24. // 当前锁状态变为0,则清空锁归属线程
  25. if (c == 0) {
  26. free = true;
  27. setExclusiveOwnerThread(null);
  28. }
  29. // 设置锁状态为0
  30. setState(c);
  31. return free;
  32. }
  33.  
  34. // 唤醒线程
  35. private void unparkSuccessor(Node node) {
  36. // 获取头节点的状态
  37. int ws = node.waitStatus;
  38. if (ws < 0)
  39. // 通过CAS将头节点的状态设置为初始状态
  40. compareAndSetWaitStatus(node, ws, 0);
  41. // 后继节点
  42. Node s = node.next;
  43. if (s == null || s.waitStatus > 0) {
  44. s = null;
  45. // 从尾节点开始往前遍历,寻找离头节点最近的等待状态正常的节点
  46. for (Node t = tail; t != null && t != node; t = t.prev)
  47. if (t.waitStatus <= 0)
  48. s = t;
  49. }
  50. if (s != null)
  51. // 真正的唤醒操作
  52. LockSupport.unpark(s.thread);
  53. }

 

以上仅供参考!!

原文链接:https://www.cnblogs.com/M-sakura/p/17602167.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号