经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » C# » 查看文章
C# 线程同步查漏补缺
来源:cnblogs  作者:鹅群中的鸭霸  时间:2023/2/3 8:42:43  对本文有异议

同步构造

当线程 A 在等待一个同步构造,另一个线程 B 持有构造一直不释放,那么就会导致线程 A 阻塞。同步构造有用户模式构造和内核模式构造。

  • 用户模式构造通过 CPU 指令来协调线程,所以速度很快。也意味着不受操作系统控制,所以等待构造的线程会不停自旋,浪费 CPU 时间。
  • 内核模式构造通过操作系统来协调线程。内核构造在获取时,需要先转换成本机代码,在转换成内核代码,返回时则需要反方向再转换一次,所以速度会比用户构造慢很多。
    因为使用了操作系统来协调线程,所以也有了更加强大的功能。
    1. 不同线程在竞争一个资源时,操作系统会阻塞线程,所以不会自旋。
    2. 可以实现托管线程和本机线程的同步。
    3. 可以跨进程跨 domain 同步。
    4. 可以利用 WaitHandle 类的方法实现多个构造的同步或者超时机制。

活锁和死锁:
当线程获取不到资源,从而不停在 CPU 上自旋等待资源,就会形成活锁。这是通过用户构造实现的。
当线程获取不到资源,被操作系统阻塞,就会形成死锁。这是通过内核构造实现的。

用户模式构造

.Net 提供了两种用户构造,易变构造 Volatile、互锁构造 Interlocked,这两种构造都提供了原子性读写的功能。
.Net 提供了基于易变构造、互锁构造、SpinWait 实现的自旋锁 SpinLock。

原子性读写:
在 32 位 CPU 中,CPU 一次只能存储 32 位的数据,所以如果是 64 位的数据类型(如 double),就得执行两次 MOV 指令,所以在 32 位 CPU 和 32 位操作系统中,不同线程对 64 位的数据类型进行读写可能得到不同的结果。原子性读写就是保证了即使是 64 位的数据类型,不同线程读写也会得到相同的结果。现在的 CPU 和操作系统基本都是 64 位的,所以一般也不会遇到这种问题。

易变构造 Volatile 和 volatile 关键字

Volatile 一般用于阻止编译器代码优化,编译器优化代码会优化掉一些在单线程情况下无用的变量或者语句,在多线程代码下有时候会导致程序运行结果跟设计的不一样。
Volatile.Read() 强制对变量的取值必须在调用时读取,Volatile.Write() 强制对变量的赋值必须在调用时写入。

  1. /// <summary>
  2. /// 在 debug 模式下不开启代码优化,所以需要用 release 模式下生成。
  3. /// 执行 dotnet build -c release --no-incremental 后运行代码,如果没有标记为易变,则不会打印 x。
  4. /// </summary>
  5. public void Test2()
  6. {
  7. var switchTrue = false;
  8. var t = new Thread(() =>
  9. {
  10. var x = 0;
  11. while (!switchTrue) // 如果没有标记变量为易变,编译器会把 while(!switchTrue) 优化为 while(true) 从而导致永远不会打印出 x 的值
  12. //while (!Volatile.Read(ref switchTrue)) // 标记为易变,可以保证在调用时才进行取值,不会进行代码优化。
  13. {
  14. x++;
  15. }
  16. Console.WriteLine($"x: {x}");
  17. });
  18. t.IsBackground = true;
  19. t.Start();
  20. Thread.Sleep(100);
  21. switchTrue = true;
  22. Console.WriteLine("ok");
  23. }

互锁构造 Interlocked

  1. Interlocked 除了保证原子性读写外,还提供了很多方便的方法,在调用的地方建立了内存屏障,所以可以用来实现各种锁。
  1. /// <summary>
  2. /// 用 Interlocked 实现一个简单的自旋锁
  3. /// 注意:
  4. /// 1. 自旋锁在获取不到锁的时候,会进行空转。所以在自旋的时候,会占用 CPU,所以一般不在单 CPU 机器上用。
  5. /// 2. 当占有锁的线程优先级比获取锁的线程更低的时候,会导致占有锁的线程一直获取不到CPU进行工作,从而无法释放锁,导致活锁。
  6. /// 所以使用自旋锁的线程,应该禁用线程优先级提升功能。
  7. /// </summary>
  8. public class SimpleSpinLock
  9. {
  10. private int _count;
  11. public void Enter()
  12. {
  13. while (true)
  14. {
  15. if (Interlocked.Exchange(ref _count, 1) == 0)
  16. {
  17. return;
  18. }
  19. }
  20. }
  21. public void Exit()
  22. {
  23. Volatile.Write(ref _count, 0);
  24. }
  25. }
  1. Interlocked 也经常用来实现单例模式。实现单例模式经常用 lock 关键字和双检索模式的,但我都是用 Interlocked 或者 Lazy,因为更轻量代码也简单。
  1. /// <summary>
  2. /// 使用 Interlocked 实现的单例,轻量且简单。
  3. /// 可能会同时调用多次构造函数,所以适合构造函数没有副作用的类
  4. /// </summary>
  5. internal class DoubleCheckLocking3
  6. {
  7. private static DoubleCheckLocking3? _value;
  8. private DoubleCheckLocking3()
  9. {
  10. }
  11. private DoubleCheckLocking3 GetInstance()
  12. {
  13. if (_value != null) return _value;
  14. Interlocked.CompareExchange(ref _value, new DoubleCheckLocking3(), null);
  15. return _value;
  16. }
  17. }
  18. /// <summary>
  19. /// 使用 lock 和双检索实现的单例化
  20. /// </summary>
  21. internal class DoubleCheckLocking
  22. {
  23. private static DoubleCheckLocking? _value;
  24. private static readonly object _lock = new();
  25. private DoubleCheckLocking()
  26. {
  27. }
  28. public static DoubleCheckLocking GetInstance()
  29. {
  30. if (_value != null) return _value;
  31. lock (_lock)
  32. {
  33. if (_value == null)
  34. {
  35. var t = new DoubleCheckLocking();
  36. Volatile.Write(ref _value, t);
  37. }
  38. }
  39. return _value;
  40. }
  41. }

自旋锁 SpinLock

.Net 提供了一个轻量化的同步构造 SpinLock,很适合在不常发生竞争的场景使用。如果发生竞争了,会先在 CPU 上自旋一段时间,如果还不能获取到资源,就会让出 CPU 控制权给其他线程(使用 SpinWait 实现的)。

  1. SpinLock 不支持重入锁,当给构造函数 SpinLock(bool) 传入 true 时,重入锁会抛出异常,否则就会死锁。

重入锁(Re-Enter): 就是一个线程调用了 SpinLock.Enter() 后,没有调用 SpinLock.Exit(),再次调用了 SpinLock.Enter()。

  1. /// <summary>
  2. /// 测试 SpinLock 重入锁
  3. /// </summary>
  4. public void Test3()
  5. {
  6. var spinLock = new SpinLock(true); // 如果传 true,如果 SpinLock 重入锁,就会抛出异常,传 false 则不会,只会死锁。
  7. ThreadPool.QueueUserWorkItem(_ => DoWork());
  8. void DoWork()
  9. {
  10. var lockTaken = false;
  11. for (int i = 0; i < 10; i++)
  12. {
  13. try
  14. {
  15. Thread.Sleep(100);
  16. if (!spinLock.IsHeldByCurrentThread) // SpinLock.IsHeldByCurrentThread 可以判断是不是当前线程拥有锁,如果是就不再获取锁
  17. {
  18. Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 获取锁,i 为 {i}");
  19. spinLock.Enter(ref lockTaken);
  20. }
  21. //spinLock.Enter(ref lockTaken); // 重入锁会死锁
  22. }
  23. catch (Exception e)
  24. {
  25. Console.WriteLine(e);
  26. }
  27. }
  28. if (lockTaken) // 使用 lockTaken 来判断锁是否已经被持有
  29. {
  30. Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 释放锁");
  31. spinLock.Exit();
  32. }
  33. Console.WriteLine("结束");
  34. }
  35. }
  1. SpinLock 是 Struct 类型的,所以注意装箱拆箱。
  1. /// <summary>
  2. /// 测试装箱拆箱问题
  3. /// </summary>
  4. public void Test4()
  5. {
  6. var spinLock = new SpinLock(false);
  7. Task.Run(() => DoWork(ref spinLock));
  8. Task.Run(() => DoWork(ref spinLock));
  9. // SpinLock 是 Struct 类型,要注意装箱拆箱的问题,试试看不加 ref 关键字的效果
  10. void DoWork(ref SpinLock spinLock)
  11. {
  12. var lockTaken = false;
  13. Thread.Sleep(500);
  14. spinLock.Enter(ref lockTaken);
  15. Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 获取锁");
  16. }
  17. }

内核模式构造

WaitHandle

.Net 提供了 System.Threading.WaitHandle 和 WaitHandle 的子类来支持内核构造,WaitHandle 封装内核同步构造的句柄,并且提供了操作的方法,并且每个方法都会在调用处建立内存屏障。

WaitHandle 有以下实现类,这些类定义了一个信号机制,根据信号去释放线程或者阻塞线程,用于在多线程的场景下访问共享资源:
WaitHandle:抽象基类,封装了系统内核构造的句柄。继承自 MarshalByRefObject,所以可以跨进程和 domain 边界。

  • EventWaitHandle:事件构造。由内核维护了一个 bool 变量,为 false 阻塞线程,为 true 时释放线程。
    • AutoResetEvent:自动重置事件构造。调用 AutoResetEvent.Set() 每次只释放一个阻塞线程。
    • ManualResetEvent:手动重置事件构造。调用 ManualResetEvent.Set() 会释放所有阻塞线程,并且不会有阻塞线程的功能,需要调用 ManualResetEvent.ReSet() 才能再次阻塞线程。
  • Semaphore:信号量。由内核维护了一个 Int32 变量,为当值为 0 时,阻塞线程,调用 Semaphore.Release() 会把变量加 1,调用 WaitHandle.WaitOne() 会把变量减 1。
  • Mutex:互斥体。功能跟 Semaphore(1) 和 AutoResetEvent 类似,一次只能释放一个线程。

WaitHandle 有以下常用方法:

  • WaitHandle.WaitOne() 虚方法,等待一个同步构造。
  • WaitHandle.WaitAll() 等待一组同步构造全部解除阻塞。
  • WaitHandle.WaitAny() 等待一组同步构造中的一个解除阻塞。
  • WaitHandle.SignalAndWait(WaitHandle x, WaitHandle y) 传入两个同步构造,解除第一个构造的阻塞,等待第二个构造。
  1. public class WaitHandleDemo
  2. {
  3. /// <summary>
  4. /// 测试 WaitHandle.WaitAll(), 成功运行返回 true, 支持超时,当超时时,返回 false
  5. /// WaitHandle.WaitAny(), 成功运行返回对应的 索引,支持超时,当超时时,返回 WaitHandle.WaitTimeout
  6. /// </summary>
  7. public void Test()
  8. {
  9. var waitHandleList = new WaitHandle[] { new AutoResetEvent(false), new AutoResetEvent(false) };
  10. ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[0]);
  11. ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[1]);
  12. var timeout = WaitHandle.WaitAll(waitHandleList);
  13. Console.WriteLine($"是否超时:{!timeout},WaitHandle.WaitAll() 结束");
  14. Thread.Sleep(500);
  15. ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[0]);
  16. ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[1]);
  17. timeout = WaitHandle.WaitAll(waitHandleList,1000);
  18. Console.WriteLine($"是否超时:{!timeout},WaitHandle.WaitAll() 结束");
  19. ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[0]);
  20. ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[1]);
  21. var index = WaitHandle.WaitAny(waitHandleList);
  22. Console.WriteLine($"{index} 已经结束运行,WaitHandle.WaitAny() 结束");
  23. ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[0]);
  24. ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[1]);
  25. index = WaitHandle.WaitAny(waitHandleList, 1000);
  26. Console.WriteLine($"是否超时:{WaitHandle.WaitTimeout == index},WaitHandle.WaitAny() 结束");
  27. void DoWork(object? state)
  28. {
  29. Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 开始");
  30. var r = new Random();
  31. var interval = 1000 * r.Next(2, 10);
  32. Thread.Sleep(interval);
  33. Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 结束");
  34. ((AutoResetEvent)state).Set();
  35. }
  36. }
  37. /// <summary>
  38. /// 测试 WaitHandle.SignalAndWait(), 成功运行返回 true, 支持超时,当超时时,返回 false
  39. /// </summary>
  40. public void Test2()
  41. {
  42. var are = new AutoResetEvent(false);
  43. var are2 = new AutoResetEvent(false);
  44. foreach (var i in Enumerable.Range(1,5))
  45. {
  46. Console.WriteLine($"按下 Enter 启动线程 {i}");
  47. Console.ReadLine();
  48. var t = new Thread(DoWork)
  49. {
  50. Name = $"线程 {i}"
  51. };
  52. t.Start();
  53. WaitHandle.SignalAndWait(are, are2); // 给 are 发信号,同时等待 are2
  54. }
  55. Console.WriteLine("全部线程运行结束");
  56. void DoWork()
  57. {
  58. are.WaitOne();
  59. Console.WriteLine($"{Thread.CurrentThread.Name} 开始");
  60. Thread.Sleep(1000);
  61. Console.WriteLine($"{Thread.CurrentThread.Name} 结束");
  62. are2.Set();
  63. }
  64. }
  65. }

EventWaitHandle、ManualResetEvent、AutoResetEvent、ManualResetEventSlim

EventWaitHandle、ManualResetEvent、AutoResetEvent 是内核同步构造,EventWaitHandle 由内核维护了一个 bool 变量,为 false 的时候阻塞线程,为 true 的时候释放线程。ManualResetEvent、AutoResetEvent 继承自 EventWaitHandle,所以拥有一样的行为,同时可以跨进程跨 domain 通信。
ManualResetEventSlim 并不继承自 EventWaitHandle,只是功能跟 ManualResetEvent、AutoResetEvent 一样的混合同步构造,使用用户构造和内核构造混合实现,遇到竞争的情况,会先自旋一下,还无法获取到资源,再使用内核构造阻塞线程,所以有更好的性能。

  1. EventWaitHandle 一般在构造函数中传入 name,用来跨进程或者跨 domain 通信。
  1. /// <summary>
  2. /// 测试 EventWaitHandle 跟其他线程通信
  3. /// </summary>
  4. public void Test2()
  5. {
  6. EventWaitHandle ewh;
  7. if (EventWaitHandle.TryOpenExisting("multi-process", out ewh))
  8. {
  9. Console.WriteLine("等待 EventWaitHandle");
  10. ewh.WaitOne();
  11. Console.WriteLine("结束运行");
  12. }
  13. else
  14. {
  15. ewh = new EventWaitHandle(false, EventResetMode.AutoReset, "multi-process");
  16. while (true)
  17. {
  18. Console.WriteLine("按下 Enter 跟其他线程通讯");
  19. Console.ReadLine();
  20. ewh.Set();
  21. }
  22. }
  23. }
  1. ManualResetEvent 调用完 ManualResetEvent.Set() 后会释放所有阻塞线程,如果需要再次阻塞线程,需要调用 ManualResetEvent.Reset()。
  1. /// <summary>
  2. /// 测试 ManualResetEvent.Set() 和 ManualResetEvent.Reset()
  3. /// </summary>
  4. public void Test1()
  5. {
  6. var mre = new ManualResetEvent(false);
  7. foreach (var i in Enumerable.Range(1, 3))
  8. {
  9. StartThread(i);
  10. }
  11. Thread.Sleep(500);
  12. Console.WriteLine("按下 Enter 调用 Set(),释放所有线程");
  13. Console.ReadLine();
  14. mre.Set();
  15. Thread.Sleep(500);
  16. Console.WriteLine("ManualResetEvent 内部值为 true 时,不会阻塞线程。按下 Enter 启动一个新线程进行测试");
  17. Console.ReadLine();
  18. StartThread(4);
  19. Thread.Sleep(500);
  20. Console.WriteLine("按下 Enter 调用 Reset(),可以再次阻塞线程");
  21. Console.ReadLine();
  22. mre.Reset();
  23. Thread.Sleep(500);
  24. foreach (var i in Enumerable.Range(5, 2))
  25. {
  26. StartThread(i);
  27. }
  28. Thread.Sleep(500);
  29. Console.WriteLine("按下 Enter 调用 Set(),释放所有线程,结束 demo");
  30. Console.ReadLine();
  31. mre.Set();
  32. Thread.Sleep(500);
  33. void StartThread(int i)
  34. {
  35. var t = new Thread(() =>
  36. {
  37. Console.WriteLine($"{Thread.CurrentThread.Name} 启动并调用 WaitOne()");
  38. mre.WaitOne();
  39. Console.WriteLine($"{Thread.CurrentThread.Name} 结束运行");
  40. })
  41. {
  42. Name = $"线程_{i}"
  43. };
  44. t.Start();
  45. }
  46. }
  1. AutoResetEvent 每次调用 AutoResetEvent.Set() 都只会释放一个阻塞的线程。
  1. public void Test()
  2. {
  3. var are = new AutoResetEvent(false);
  4. Task.Run(() =>
  5. {
  6. for (int i = 0; i < 5; i++)
  7. {
  8. Thread.Sleep(500);
  9. Console.WriteLine("按下 Enter 释放一个线程");
  10. Console.ReadLine();
  11. are.Set();
  12. }
  13. });
  14. foreach (var i in Enumerable.Range(1,5))
  15. {
  16. var t = new Thread(DoWork);
  17. t.Name = $"线程 {i}";
  18. t.Start();
  19. }
  20. void DoWork()
  21. {
  22. Console.WriteLine($"{Thread.CurrentThread.Name} 开始");
  23. are.WaitOne();
  24. Console.WriteLine($"{Thread.CurrentThread.Name} 结束");
  25. }
  26. }

Semaphore、SemaphoreSlim

Semaphore 是一个内核构造,由内核维护了一个 Int32 变量,为当值为 0 时,阻塞线程,调用 Semaphore.Release() 会把变量加 1,调用 WaitHandle.WaitOne() 会把变量减 1。
SemaphoreSlim 是一个混合构造,功能跟 Semaphore 一致,使用用户构造和内核构造混合实现,遇到竞争的情况,会先自旋一下,还无法获取到资源,再使用内核构造阻塞线程,所以有更好的性能。

  1. 使用 Semaphore 释放多个线程。
  1. /// <summary>
  2. /// 测试 Semaphore
  3. /// </summary>
  4. public void Test4()
  5. {
  6. var pool = new Semaphore(1, 3); // 初始化计数 1,最大计数 3
  7. foreach (var i in Enumerable.Range(1, 5))
  8. {
  9. var t = new Thread(DoWork);
  10. t.Name = $"线程 {i}";
  11. t.Start();
  12. }
  13. Thread.Sleep(500);
  14. Console.WriteLine("按下 Enter 释放 3 个线程");
  15. Console.ReadLine();
  16. pool.Release(3); // 计数加3
  17. Thread.Sleep(500);
  18. Console.WriteLine("再按下 Enter 释放 1 个线程");
  19. Console.ReadLine();
  20. pool.Release(); // 计数加1
  21. void DoWork()
  22. {
  23. Console.WriteLine($"{Thread.CurrentThread.Name} 开始");
  24. pool.WaitOne(); // 计数减1
  25. Console.WriteLine($"{Thread.CurrentThread.Name} 结束");
  26. }
  27. }
  1. Semaphore 继承自 WaitHandle,所以在构造函数中传入 name 可以跨进程跨 domain 同步。把 Semaphore 的最大计数设置为 1,可以实现跟 AutoResetEvent 一样每次只解除一个阻塞线程的行为。
  1. /// <summary>
  2. /// 测试跟其他进程通讯
  3. /// </summary>
  4. public void Test5()
  5. {
  6. Semaphore pool;
  7. if (Semaphore.TryOpenExisting("multi-process", out pool))
  8. {
  9. Console.WriteLine("等待 Semaphore");
  10. pool.WaitOne();
  11. Console.WriteLine("结束");
  12. }
  13. else
  14. {
  15. pool = new Semaphore(0, 1, "multi-process"); // 最大计数设置为 1,每次只解除一个阻塞。
  16. while (true)
  17. {
  18. Console.WriteLine("按下 Enter 跟其他线程通讯");
  19. Console.ReadLine();
  20. pool.Release();
  21. }
  22. }
  23. }

Mutex

Mutex 是一个内核构造,经常用于进程同步(如保证只有程序只能有一个进程)。功能跟 AutoResetEvent(false) 和 Semaphore(0,1) 类似,每次只能阻塞一个线程或者进程。
Mutex 跟 EventWaitHandle 和 Semaphore 不一样的地方是,Mutex 要求线程一致(也就是获取和释放都必须在同一个线程),并且支持重入锁。

  1. /// <summary>
  2. /// Mutex 支持重入锁,支持线程一致
  3. /// </summary>
  4. public void Test()
  5. {
  6. var mutex = new Mutex(false);
  7. var count = 0;
  8. DoWork(mutex);
  9. void DoWork(Mutex mutex)
  10. {
  11. try
  12. {
  13. mutex.WaitOne();
  14. Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 获取 Mutex");
  15. Interlocked.Increment(ref count);
  16. Thread.Sleep(1000);
  17. if (Interlocked.CompareExchange(ref count, 3, 3) == 3)
  18. {
  19. return;
  20. }
  21. DoWork(mutex);
  22. }
  23. finally
  24. {
  25. mutex.ReleaseMutex(); // 调用几次 WaitOne() 就必须调用几次 ReleaseMutex(),并且调用 WaitOne() 和 ReleaseMutex() 必须在同一个线程。
  26. Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 释放 Mutex");
  27. }
  28. }
  29. }

用户模式构造、内核模式构造性能对比

从上面可以看出,.Net 内核构造功能比用户构造强大得多,所以看起来似乎直接使用内核构造,而不使用用户模式构造更加明智。
但是用户构造会比内核构造快,所以在不常发生竞争或者性能敏感的场景下,使用用户构造会是一个更加优秀的做法。接下来用一个没有竞争的空方法测试一下快多少。

  1. internal class PerformanceDemo
  2. {
  3. /// <summary>
  4. /// 测试用户模式构造和内核模式构造,在锁没有发生竞争的情况下的性能差距
  5. /// </summary>
  6. public void Test()
  7. {
  8. var count = 1000 * 10000;
  9. var spinLock = new SpinLock(false);
  10. var are = new AutoResetEvent(true);
  11. var pool = new Semaphore(1, 1);
  12. var sw = Stopwatch.StartNew();
  13. foreach (var _ in Enumerable.Range(0, count))
  14. {
  15. var lockTaken = false;
  16. spinLock.Enter(ref lockTaken);
  17. DoWork();
  18. spinLock.Exit(lockTaken);
  19. }
  20. Console.WriteLine($"在没有竞争的场景下,执行一个空方法一千万次,SpinLock 耗时:{sw.ElapsedMilliseconds} ms");
  21. sw.Restart();
  22. foreach (var _ in Enumerable.Range(0, count))
  23. {
  24. are.WaitOne();
  25. DoWork();
  26. are.Set();
  27. }
  28. Console.WriteLine($"在没有竞争的场景下,执行一个空方法一千万次,AutoResetEvent 耗时:{sw.ElapsedMilliseconds} ms");
  29. sw.Restart();
  30. foreach (var _ in Enumerable.Range(0, count))
  31. {
  32. pool.WaitOne();
  33. DoWork();
  34. pool.Release();
  35. }
  36. Console.WriteLine($"在没有竞争的场景下,执行一个空方法一千万次,Semaphore 耗时:{sw.ElapsedMilliseconds} ms");
  37. // 空方法
  38. void DoWork()
  39. {
  40. }
  41. }
  42. }
  43. // 输出:
  44. // 在没有竞争的场景下,执行一个空方法一千万次,SpinLock 耗时:184 ms
  45. // 在没有竞争的场景下,执行一个空方法一千万次,AutoResetEvent 耗时:5449 ms
  46. // 在没有竞争的场景下,执行一个空方法一千万次,Semaphore 耗时:5366 ms

最终在我的机子上测试,在没有发生竞争的场景下,.NET 提供的用户构造性能是内核构造的 30 倍,所以性能差距还是非常大的。

混合构造

用户构造在遇到竞争,在长时间获取不到资源的场景,会一直在 CPU 上自旋,既浪费 CPU 时间,又耽误其他线程执行,内核构造在操作系统的协调下,会把获取不到资源的线程阻塞,不会浪费 CPU 时间。
内核构造在没有竞争的场景下,性能会比用户构造差几十倍。
混合构造就是组合用户构造和内核构造的实现,遇到竞争的时候,先使用用户构造自旋一下,自旋一段时间还没获取到资源,就使用内核构造阻塞线程,这样就能结合两种构造的优点了。
.Net 提供了 ManualResetEventSlim、SemaphoreSlim、Monitor、lock 关键字、ReaderWriterLockSlim、CountDownEvent、Barrier 等混合构造,可以在不同的场景下使用。

自定义一个简单的混合构造

通过这个例子可以了解一下是怎么组合内核构造和用户构造的。

  1. /// <summary>
  2. /// 一个简单的混合构造,组合 AutoResetEvent 和 Interlocked 实现
  3. /// </summary>
  4. internal class SimpleHybridLock : IDisposable
  5. {
  6. private int _waiter;
  7. private AutoResetEvent _waiterLock = new(false);
  8. public void Enter()
  9. {
  10. if (Interlocked.Increment(ref _waiter) == 1)
  11. {
  12. return;
  13. }
  14. _waiterLock.WaitOne();
  15. }
  16. public void Exit()
  17. {
  18. if (Interlocked.Decrement(ref _waiter) == 0)
  19. {
  20. return;
  21. }
  22. _waiterLock.Set();
  23. }
  24. public void Dispose()
  25. {
  26. _waiterLock.Dispose();
  27. }
  28. }

Monitor 和 lock 关键字

lock 关键字是最常使用的同步构造了,lock 可以锁定一个代码块,保证每次只有一个线程访问执行该代码块,lock 是基于 Montor 实现的,通过 try{...}finally{...} 把代码块包围起来。

  1. Monitor 是一个静态类,调用 Monitor.Enter(obj) 获取锁,调用 Monitor.Exit(obj) 释放。还可以在已经获取锁的线程上,调用 Monitor.Wait(obj) 释放锁,同时把线程放到等待队列,其他线程可以调用 Monitor.Pulse() 或 Monitor.PulseAll() 通知调用了 Monitor.Wait() 的线程继续获得锁。
    Monitor 支持重入锁,线程一致。
  1. /// <summary>
  2. /// 测试 Monitor.Wait(object)、Monitor.Pulse(object)、Monitor.PulseAll(object)
  3. /// 注意点:
  4. /// 调用 Wait()、Pulse()、PulseAll() 也必须先调用 Enter() 获取锁,退出的时候也必须调用 Exit() 释放锁
  5. /// </summary>
  6. public void Test()
  7. {
  8. var lockObj = new object();
  9. Task.Factory.StartNew(() =>
  10. {
  11. Thread.Sleep(500);
  12. Console.WriteLine("按下 c 调用 Monitor.Pulse(object)");
  13. if (Console.ReadKey().Key == ConsoleKey.C)
  14. {
  15. try
  16. {
  17. Monitor.Enter(lockObj);
  18. Monitor.Pulse(lockObj);
  19. }
  20. finally
  21. {
  22. Monitor.Exit(lockObj);
  23. }
  24. }
  25. Thread.Sleep(500);
  26. if (Console.ReadKey().Key == ConsoleKey.C)
  27. {
  28. try
  29. {
  30. Monitor.Enter(lockObj);
  31. Monitor.PulseAll(lockObj);
  32. }
  33. finally
  34. {
  35. Monitor.Exit(lockObj);
  36. }
  37. }
  38. });
  39. Parallel.Invoke(DoWork, DoWork, DoWork);
  40. void DoWork()
  41. {
  42. Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 启动");
  43. try
  44. {
  45. Monitor.Enter(lockObj);
  46. Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 获得 Monitor");
  47. Thread.Sleep(100);
  48. Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 调用 Monitor.Wait()");
  49. Monitor.Wait(lockObj);
  50. Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 重新获得 Monitor");
  51. }
  52. finally
  53. {
  54. Monitor.Exit(lockObj);
  55. Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 释放 Monitor");
  56. }
  57. }
  58. }
  1. Monitor.Enter(object) 参数是一个 object 类型,代表可以传入任何类型的参数,所以就有一些细节需要注意。
  • Monitor.Enter(值类型),涉及到值类型传参,就必须注意装箱拆箱的问题。
  • Monitor.Enter(字符串),虽然字符串是引用类型,但是字符串会留用,所以锁定同一个字符串就会导致互斥。
  • 如果一个实例对象的方法使用了 lock(this),如果外部调用也 lock 这个实例方法,那么就会死锁,所以最佳做法是永远不要 lock(this)。
  1. /// <summary>
  2. /// 测试 Monitor.Enter(字符串)
  3. /// 因为字符串会被留用,所以会导致不同线程间互斥访问。
  4. /// </summary>
  5. public void Test2()
  6. {
  7. var mre = new ManualResetEventSlim(false);
  8. Task.Run(() =>
  9. {
  10. Console.WriteLine("按下 c 启动");
  11. if (Console.ReadKey().Key == ConsoleKey.C)
  12. {
  13. mre.Set();
  14. }
  15. });
  16. Parallel.Invoke(DoWork, DoWork, DoWork);
  17. void DoWork()
  18. {
  19. mre.Wait();
  20. try
  21. {
  22. Monitor.Enter("1");
  23. Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 进入同步代码块");
  24. Thread.Sleep(1000);
  25. }
  26. finally
  27. {
  28. Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 退出同步代码块");
  29. Monitor.Exit("1");
  30. }
  31. }
  32. }
  33. /// <summary>
  34. /// 测试 Monitor.Enter(值类型)
  35. /// 因为 Monitor.Enter(object) 参数是 object,所以值类型必须装箱,那样其实就会有问题了。
  36. /// 值类型在堆栈上,没有引用,引用类型在堆上,有引用,所以装箱就是在堆上新建一个实例,然后复制栈上值的内容,拆箱就是把堆上实例的值,复制到栈上。
  37. /// </summary>
  38. public void Test3()
  39. {
  40. var mre = new ManualResetEventSlim(false);
  41. var i = 1;
  42. //Object o = i;
  43. Task.Run(() =>
  44. {
  45. Console.WriteLine("按下 c 启动");
  46. if (Console.ReadKey().Key == ConsoleKey.C)
  47. {
  48. mre.Set();
  49. }
  50. });
  51. Parallel.Invoke(DoWork, DoWork, DoWork);
  52. void DoWork()
  53. {
  54. mre.Wait();
  55. object o = i;
  56. try
  57. {
  58. Monitor.Enter(o);
  59. Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 进入同步代码块");
  60. Thread.Sleep(1000);
  61. }
  62. finally
  63. {
  64. Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 退出同步代码块");
  65. Monitor.Exit(o);
  66. }
  67. }
  68. }

CountdownEvent

CountdownEvent 是一个混合构造,经常用于 fork/join 等场景,就是等待多个并行任务完成,再执行下一个任务。CountdownEvent 内部会维护一个计数,当计数为 0 时,解除线程的阻塞。

  • 调用 CountdownEvent.Reset(int) 可以重新初始化 CountdownEvent。
  • 调用 Signal() Signal(int count) 把计数减 1 或减 count。
  • 调用 AddCount() AddCount(int) 把计数加 1 或加 count。
  1. public void Test2()
  2. {
  3. var queue = new ConcurrentQueue<int>(Enumerable.Range(1, 100));
  4. var cde = new CountdownEvent(queue.Count);
  5. var doWork = new Action(() =>
  6. {
  7. while (queue.TryDequeue(out var result))
  8. {
  9. Thread.Sleep(100);
  10. Console.WriteLine(result);
  11. cde.Signal();
  12. }
  13. });
  14. var _ = Task.Run(doWork); // fork
  15. var _2 = Task.Run(doWork); // fork
  16. var complete = new Action(() =>
  17. {
  18. cde.Wait(); // join
  19. Console.WriteLine($"queue Count {queue.Count}");
  20. });
  21. var t = Task.Run(complete);
  22. var t2 = Task.Run(complete);
  23. Task.WaitAll(t, t2);
  24. Console.WriteLine($"CountdownEvent 重新初始化");
  25. cde.Reset(2); // 调用 Reset() 将 cde 重新初始化
  26. cde.AddCount(10); // 调用 AddCount() cde 内部计数 + 1
  27. var cts = new CancellationTokenSource(1000); // 测试超时机制
  28. try
  29. {
  30. cde.Wait(cts.Token);
  31. }
  32. catch (Exception e)
  33. {
  34. Console.WriteLine(e);
  35. }
  36. cde.Dispose();
  37. }

Barrier

Barrier 是一个混合构造,可以通过 participantCount 来指定一个数值,同时会维护一个内部数值 total,每次调用 SignalAndWait() 的时候,阻塞调用线程,同时把total 加 1,等到 total == participantCount,调用 postPhaseAction,通过 postPhaseAction 来确定汇总每个线程的数据,并且执行下个阶段的工作。
Barrier 适合一种特殊场景,把一个大任务拆分成多个小任务,然后每个小任务又会分阶段执行。像是 Parallel 的 Plus 版,如果任务步骤很多,用 Parallel 来分拆很麻烦,可以考虑用 Barrier。

  1. public class BarrierDemo
  2. {
  3. public void Test()
  4. {
  5. var words = new string[] { "山", "飞", "千", "鸟", "绝" };
  6. var words2 = new string[] { "人", "灭", "径", "万", "踪" };
  7. var solution = "千山鸟飞绝,万径人踪灭";
  8. bool success = false;
  9. var barrier = new Barrier(2, b =>
  10. {
  11. var sb = new StringBuilder();
  12. sb.Append(string.Concat(words));
  13. sb.Append(',');
  14. sb.Append(string.Concat(words2));
  15. Console.WriteLine(sb.ToString());
  16. //Thread.Sleep(1000);
  17. if (string.CompareOrdinal(solution, sb.ToString()) == 0)
  18. {
  19. success = true;
  20. Console.WriteLine($"已完成");
  21. }
  22. Console.WriteLine($"当前阶段数:{b.CurrentPhaseNumber}");
  23. });
  24. var t = Task.Run(() => DoWork(words));
  25. var t2 = Task.Run(() => DoWork(words2));
  26. Console.ReadLine();
  27. void DoWork(string[] words)
  28. {
  29. while (!success)
  30. {
  31. var r = new Random();
  32. for (int i = 0; i < words.Length; i++)
  33. {
  34. var swapIndex = r.Next(i, words.Length);
  35. (words[swapIndex], words[i]) = (words[i], words[swapIndex]);
  36. }
  37. barrier.SignalAndWait();
  38. }
  39. }
  40. }
  41. }

ReaderWriterLockSlim

ReaderWriterLockSlim 是一个混合构造。一般场景中在读取数据的时候,不会涉及到数据的修改,所以可以并发读取,在修改数据的时候,才会涉及到数据的修改,所以应该互斥修改。其他同步构造无论读取还是修改数据都是锁定的,所以 .Net 提供了一个读写锁 ReaderWriterLockSlim。
ReaderWriterLockSlim 的逻辑如下:

  • 一个线程向数据写入时,请求访问的其他所有线程都阻塞。
  • 一个线程向数据读取时,请求读取的其他线程允许继续执行,但是请求写入的线程仍被阻塞。
  • 一个向数据写入的线程结束后,要么解除一个写入线程(writer)的阻塞,使它能向数据写入,要么解除所有读取线程(reader)的阻塞,使它们能够进行并发读取。如果没有线程被阻塞,则锁进入自由状态,可以被下一个 reader 或者 writer 线程获取。
  • 所有向数据读取的线程结束后,一个 writer 线程被解除阻塞,使它能向数据写入。如果没有线程被阻塞,则锁进入自由状态,可以被下一个reader 或者 writer 线程获取。
  1. /// <summary>
  2. /// ReaderWriterLockerSlim 用法
  3. /// </summary>
  4. internal class Transaction2
  5. {
  6. private DateTime _timeLastTrans;
  7. public DateTime TimeLastTrans
  8. {
  9. get
  10. {
  11. _lock.EnterReadLock();
  12. Thread.Sleep(1000);
  13. var t = _timeLastTrans;
  14. Console.WriteLine($"调用 ReadLock {Thread.CurrentThread.ManagedThreadId}");
  15. _lock.ExitReadLock();
  16. return t;
  17. }
  18. }
  19. private readonly ReaderWriterLockSlim _lock = new(LockRecursionPolicy.NoRecursion);
  20. public void PerformTransaction()
  21. {
  22. _lock.EnterWriteLock();
  23. _timeLastTrans = DateTime.Now;
  24. Console.WriteLine($"调用 WriteLock {Thread.CurrentThread.ManagedThreadId}");
  25. _lock.ExitWriteLock();
  26. }
  27. public void Test()
  28. {
  29. PerformTransaction();
  30. ThreadPool.QueueUserWorkItem(_ => Console.WriteLine(TimeLastTrans));
  31. PerformTransaction();
  32. Thread.Sleep(500); // 就算睡眠500ms,在锁释放后,依旧先进行读操作,读完才有写操作。
  33. ThreadPool.QueueUserWorkItem(_ => Console.WriteLine(TimeLastTrans));
  34. }
  35. }

最后

回顾了一下知识,总结了一下,发现自己又学到不少。下次回顾一下 Task 的知识。
源码 https://github.com/yijidao/blog/tree/master/TPL/ThreadDemo/ThreadDemo3

原文链接:https://www.cnblogs.com/wengzp/p/17086895.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号