经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » C# » 查看文章
C#中使用CAS实现无锁算法
来源:cnblogs  作者:黑洞视界  时间:2023/4/21 8:58:46  对本文有异议

CAS 的基本概念

CAS(Compare-and-Swap)是一种多线程并发编程中常用的原子操作,用于实现多线程间的同步和互斥访问。 它操作通常包含三个参数:一个内存地址(通常是一个共享变量的地址)、期望的旧值和新值。

  1. CompareAndSwap(内存地址,期望的旧值,新值)

CAS 操作会比较内存地址处的值与期望的旧值是否相等,如果相等,则将新值写入该内存地址; 如果不相等,则不进行任何操作。这个比较和交换的操作是一个原子操作,不会被其他线程中断。

CAS 通常是通过硬件层面的CPU指令实现的,其原子性是由硬件保证的。具体的实现方式根据环境会有所不同。

CAS 操作通常会有一个返回值,用于表示操作是否成功。返回结果可能是true或false,也可能是内存地址处的旧值。

相比于传统的锁机制,CAS 有一些优势:

  • 原子性:CAS 操作是原子的,不需要额外的锁来保证多线程环境下的数据一致性,避免了锁带来的性能开销和竞争条件。

  • 无阻塞:CAS 操作是无阻塞的,不会因为资源被锁定而导致线程的阻塞和上下文切换,提高了系统的并发性和可伸缩性。

  • 适用性:CAS 操作可以应用于广泛的数据结构和算法,如自旋锁、计数器、队列等,使得它在实际应用中具有较大的灵活性和适用性。

C# 中如何使用 CAS

在 C# 中,我们可以使用 Interlocked 类来实现 CAS 操作。

Interlocked 类提供了一组 CompareExchange 的重载方法,用于实现不同类型的数据的 CAS 操作。

  1. public static int CompareExchange(ref int location1, int value, int comparand);
  2. public static long CompareExchange(ref long location1, long value, long comparand);
  3. // ... 省略其他重载方法
  4. public static object CompareExchange(ref object location1, object value, object comparand);
  5. public static T CompareExchange<T>(ref T location1, T value, T comparand) where T : class;

CompareExchange 方法将 location1 内存地址处的值与 comparand 比较,如果相等,则将 value 写入 location1 内存地址处,否则不进行任何操作。
该方法返回 location1 内存地址处的值。

通过判断方法返回值与 comparand 是否相等,我们就可以知道 CompareExchange 方法是否执行成功。

算法示例

在使用 CAS 实现无锁算法时,通常我们不光是为了比较和更新一个数据,还需要在更新成功后进行下一步的操作。结合 while(true) 循环,我们可以不断地尝试更新数据,直到更新成功为止。
伪代码如下:

  1. while (true)
  2. {
  3. // 读取数据
  4. oldValue = ...;
  5. // 计算新值
  6. newValue = ...;
  7. // CAS 更新数据
  8. result = CompareExchange(ref location, newValue, oldValue);
  9. // 判断 CAS 是否成功
  10. if (result == oldValue)
  11. {
  12. // CAS 成功,执行后续操作
  13. break;
  14. }
  15. }

在复杂的无锁算法中,因为每一步操作都是独立的,连续的操作并非原子,所以我们不光要借助 CAS,每一步操作前都应判断是否有其他线程已经修改了数据。

示例1:计数器

下面是一个简单的计数器类,它使用 CAS 实现了一个线程安全的自增操作。

  1. public class Counter
  2. {
  3. private int _value;
  4. public int Increment()
  5. {
  6. while (true)
  7. {
  8. int oldValue = _value;
  9. int newValue = oldValue + 1;
  10. int result = Interlocked.CompareExchange(ref _value, newValue, oldValue);
  11. if (result == oldValue)
  12. {
  13. return newValue;
  14. }
  15. }
  16. }
  17. }

CLR 底层源码中,我们也会经常看到这样的代码,比如 ThreadPool 增加线程时的计数器。
https://github.com/dotnet/runtime/blob/release/6.0/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs#L446

  1. internal void EnsureThreadRequested()
  2. {
  3. //
  4. // If we have not yet requested #procs threads, then request a new thread.
  5. //
  6. // CoreCLR: Note that there is a separate count in the VM which has already been incremented
  7. // by the VM by the time we reach this point.
  8. //
  9. int count = _separated.numOutstandingThreadRequests;
  10. while (count < Environment.ProcessorCount)
  11. {
  12. int prev = Interlocked.CompareExchange(ref _separated.numOutstandingThreadRequests, count + 1, count);
  13. if (prev == count)
  14. {
  15. ThreadPool.RequestWorkerThread();
  16. break;
  17. }
  18. count = prev;
  19. }
  20. }

示例2:队列

下面是一个简单的队列类,它使用 CAS 实现了一个线程安全的入队和出队操作。相较于上面的计数器,这里的操作更加复杂,我们每一步都需要考虑是否有其他线程已经修改了数据。

这样的算法有点像薛定谔的猫,你不知道它是死是活,只有当你试图去观察它的时候,它才可能会变成死或者活。

  1. public class ConcurrentQueue<T>
  2. {
  3. // _head 和 _tail 是两个伪节点,_head._next 指向队列的第一个节点,_tail 指向队列的最后一个节点。
  4. // _head 和 _tail 会被多个线程修改和访问,所以要用 volatile 修饰。
  5. private volatile Node _head;
  6. private volatile Node _tail;
  7. public ConcurrentQueue()
  8. {
  9. _head = new Node(default);
  10. // _tail 指向 _head 时,队列为空。
  11. _tail = _head;
  12. }
  13. public void Enqueue(T item)
  14. {
  15. var node = new Node(item);
  16. while (true)
  17. {
  18. Node tail = _tail;
  19. Node next = tail._next;
  20. // 判断给 next 赋值的这段时间,是否有其他线程修改过 _tail
  21. if (tail == _tail)
  22. {
  23. // 如果 next 为 null,则说明从给 tail 赋值到给 next 赋值这段时间,没有其他线程修改过 tail._next,
  24. if (next == null)
  25. {
  26. // 如果 tail._next 为 null,则说明从给 tail 赋值到这里,没有其他线程修改过 tail._next,
  27. // tail 依旧是队列的最后一个节点,我们就可以直接将 node 赋值给 tail._next。
  28. if (Interlocked.CompareExchange(ref tail._next, node, null) == null)
  29. {
  30. // 如果_tail == tail,则说明从上一步 CAS 操作到这里,没有其他线程修改过 _tail,也就是没有其他线程执行过 Enqueue 操作。
  31. // 那么当前线程 Enqueue 的 node 就是队列的最后一个节点,我们就可以直接将 node 赋值给 _tail。
  32. Interlocked.CompareExchange(ref _tail, node, tail);
  33. break;
  34. }
  35. }
  36. // 如果 next 不为 null,则说明从给 tail 赋值到给 next 赋值这段时间,有其他线程修改过 tail._next,
  37. else
  38. {
  39. // 如果没有其他线程修改过 _tail,那么 next 就是队列的最后一个节点,我们就可以直接将 next 赋值给 _tail。
  40. Interlocked.CompareExchange(ref _tail, next, tail);
  41. }
  42. }
  43. }
  44. }
  45. public bool TryDequeue(out T item)
  46. {
  47. while (true)
  48. {
  49. Node head = _head;
  50. Node tail = _tail;
  51. Node next = head._next;
  52. // 判断 _head 是否被修改过
  53. // 如果没有被修改过,说明从给 head 赋值到给 next 赋值这段时间,没有其他线程执行过 Dequeue 操作。
  54. if (head == _head)
  55. {
  56. // 如果 head == tail,说明队列为空
  57. if (head == tail)
  58. {
  59. // 虽然上面已经判断过队列是否为空,但是在这里再判断一次
  60. // 是为了防止在给 tail 赋值到给 next 赋值这段时间,有其他线程执行过 Enqueue 操作。
  61. if (next == null)
  62. {
  63. item = default;
  64. return false;
  65. }
  66. // 如果 next 不为 null,则说明从给 tail 赋值到给 next 赋值这段时间,有其他线程修改过 tail._next,也就是有其他线程执行过 Enqueue 操作。
  67. // 那么 next 就可能是队列的最后一个节点,我们尝试将 next 赋值给 _tail。
  68. Interlocked.CompareExchange(ref _tail, next, tail);
  69. }
  70. // 如果 head != tail,说明队列不为空
  71. else
  72. {
  73. item = next._item;
  74. if (Interlocked.CompareExchange(ref _head, next, head) == head)
  75. {
  76. // 如果 _head 没有被修改过
  77. // 说明从给 head 赋值到这里,没有其他线程执行过 Dequeue 操作,上面的 item 就是队列的第一个节点的值。
  78. // 我们就可以直接返回。
  79. break;
  80. }
  81. // 如果 _head 被修改过
  82. // 说明从给 head 赋值到这里,有其他线程执行过 Dequeue 操作,上面的 item 就不是队列的第一个节点的值。
  83. // 我们就需要重新执行 Dequeue 操作。
  84. }
  85. }
  86. }
  87. return true;
  88. }
  89. private class Node
  90. {
  91. public readonly T _item;
  92. public Node _next;
  93. public Node(T item)
  94. {
  95. _item = item;
  96. }
  97. }
  98. }

我们可以通过以下代码来进行测试

  1. using System.Collections.Concurrent;
  2. var queue = new ConcurrentQueue<int>();
  3. var results = new ConcurrentBag<int>();
  4. int dequeueRetryCount = 0;
  5. var enqueueTask = Task.Run(() =>
  6. {
  7. // 确保 Enqueue 前 dequeueTask 已经开始运行
  8. Thread.Sleep(10);
  9. Console.WriteLine("Enqueue start");
  10. Parallel.For(0, 100000, i => queue.Enqueue(i));
  11. Console.WriteLine("Enqueue done");
  12. });
  13. var dequeueTask = Task.Run(() =>
  14. {
  15. Thread.Sleep(10);
  16. Console.WriteLine("Dequeue start");
  17. Parallel.For(0, 100000, i =>
  18. {
  19. while (true)
  20. {
  21. if (queue.TryDequeue(out int result))
  22. {
  23. results.Add(result);
  24. break;
  25. }
  26. Interlocked.Increment(ref dequeueRetryCount);
  27. }
  28. });
  29. Console.WriteLine("Dequeue done");
  30. });
  31. await Task.WhenAll(enqueueTask, dequeueTask);
  32. Console.WriteLine(
  33. $"Enqueue and dequeue done, total data count: {results.Count}, dequeue retry count: {dequeueRetryCount}");
  34. var hashSet = results.ToHashSet();
  35. for (int i = 0; i < 100000; i++)
  36. {
  37. if (!hashSet.Contains(i))
  38. {
  39. Console.WriteLine("Error, missing " + i);
  40. break;
  41. }
  42. }
  43. Console.WriteLine("Done");

输出结果:

  1. Dequeue start
  2. Enqueue start
  3. Enqueue done
  4. Dequeue done
  5. Enqueue and dequeue done, total data count: 100000, dequeue retry count: 10586
  6. Done

上述的 retry count 为 797,说明在 100000 次的 Dequeue 操作中,有 10586 次的 Dequeue 操作需要重试,那是因为在 Dequeue 操作中,可能暂时没有数据可供 Dequeue,需要等待其他线程执行 Enqueue 操作。

当然这个 retry count 是不稳定的,因为在多线程环境下,每次执行的结果都可能不一样。

总结

CAS 操作是一种乐观锁,它假设没有其他线程修改过数据,如果没有修改过,那么就直接修改数据,如果修改过,那么就重新获取数据,再次尝试修改。

在借助 CAS 实现较为复杂的数据结构时,我们不光要依靠 CAS 操作,还需要注意每次操作的数据是否被其他线程修改过,考虑各个可能的分支,以及在不同的分支中,如何处理数据。

欢迎关注个人技术公众号

原文链接:https://www.cnblogs.com/eventhorizon/p/17338890.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号