经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » C# » 查看文章
通过一个示例形象地理解C# async await 非并行异步、并行异步、并行异步的并发量控制
来源:cnblogs  作者:0611163  时间:2023/2/6 9:03:25  对本文有异议

前言

接上一篇 通过一个示例形象地理解C# async await异步
我在 .NET与大数据 中吐槽前同事在双层循环体中(肯定是单线程了)频繁请求es,导致接口的总耗时很长。这不能怪前同事,确实难写,会使代码复杂度增加。
评论区有人说他的理解是使用异步增加了系统吞吐能力,这个理解是正确的,但对于单个接口的单次请求而言,它是单线程的,耗时反而可能比同步还慢。如何缩短单个接口的单次请求的时间呢(要求:尽量不增加代码复杂度)?请看下文。

示例的测试步骤

先直接测试,看结果,下面再放代码

  1. 点击VS2022的启动按钮,启动程序,它会先启动Server工程,再启动AsyncAwaitDemo2工程
  2. 分别点击三个button
  3. 观察思考输出结果

测试截图

非并行异步(顺序执行的异步)


截图说明:单次请求耗时约0.5秒,共10次请求,耗时约 0.5秒×10=5秒

并行异步


截图说明:单次请求耗时约0.5秒,共10次请求,耗时约 0.5秒

并行异步(控制并发数量)


截图说明:单次请求耗时约0.5秒,共10次请求,并发数是5,耗时约 0.5秒×10÷5=1秒

服务端
服务端和客户端是两个独立的工程,测试时在一起跑,但其实可以分开部署,部署到不同的机器上
服务端是一个web api接口,用.NET 6、VS2022开发,代码如下:

  1. [ApiController]
  2. [Route("[controller]")]
  3. public class TestController : ControllerBase
  4. {
  5. [HttpGet]
  6. [Route("[action]")]
  7. public async Task<Dictionary<int, int>> Get(int i)
  8. {
  9. var result = new Dictionary<int, int>();
  10. await Task.Delay(500); //模拟耗时操作
  11. if (i == 0)
  12. {
  13. result.Add(0, 5);
  14. result.Add(1, 4);
  15. result.Add(2, 3);
  16. result.Add(3, 2);
  17. result.Add(4, 1);
  18. }
  19. else if (i == 1)
  20. {
  21. result.Add(0, 10);
  22. result.Add(1, 9);
  23. result.Add(2, 8);
  24. result.Add(3, 7);
  25. result.Add(4, 6);
  26. }
  27. return result;
  28. }
  29. }

客户端
大家看客户端代码时,不需要关心服务端怎么写
客户端是一个Winform工程,用.NET 6、VS2022开发,代码如下:

  1. public partial class Form1 : Form
  2. {
  3. private readonly string _url = "http://localhost:5028/Test/Get";
  4. public Form1()
  5. {
  6. InitializeComponent();
  7. }
  8. private async void Form1_Load(object sender, EventArgs e)
  9. {
  10. //预热
  11. HttpClient httpClient = HttpClientFactory.GetClient();
  12. await (await httpClient.GetAsync(_url)).Content.ReadAsStringAsync();
  13. }
  14. //非并行异步(顺序执行的异步)
  15. private async void button3_Click(object sender, EventArgs e)
  16. {
  17. await Task.Run(async () =>
  18. {
  19. Log($"==== 非并行异步 开始,线程ID={Thread.CurrentThread.ManagedThreadId} ========================");
  20. Stopwatch sw = Stopwatch.StartNew();
  21. HttpClient httpClient = HttpClientFactory.GetClient();
  22. var tasks = new Dictionary<string, Task<string>>();
  23. StringBuilder sb = new StringBuilder();
  24. for (int i = 0; i < 2; i++)
  25. {
  26. int sum = 0;
  27. for (int j = 0; j < 5; j++)
  28. {
  29. Dictionary<int, int> dict = await RequestAsync(_url, i);
  30. if (dict.ContainsKey(j))
  31. {
  32. int num = dict[j];
  33. sum += num;
  34. sb.Append($"{num}, ");
  35. }
  36. }
  37. Log($"输出:sum={sum}");
  38. }
  39. Log($"输出:{sb}");
  40. sw.Stop();
  41. Log($"==== 结束,线程ID={Thread.CurrentThread.ManagedThreadId},耗时:{sw.Elapsed.TotalSeconds:0.000}秒 ========================");
  42. });
  43. }
  44. // 并行异步
  45. private async void button4_Click(object sender, EventArgs e)
  46. {
  47. await Task.Run(async () =>
  48. {
  49. Log($"==== 并行异步 开始,线程ID={Thread.CurrentThread.ManagedThreadId} ========================");
  50. Stopwatch sw = Stopwatch.StartNew();
  51. HttpClient httpClient = HttpClientFactory.GetClient();
  52. var tasks = new Dictionary<string, Task<Dictionary<int, int>>>();
  53. StringBuilder sb = new StringBuilder();
  54. //双层循环写第一遍
  55. for (int i = 0; i < 2; i++)
  56. {
  57. for (int j = 0; j < 5; j++)
  58. {
  59. var task = RequestAsync(_url, i);
  60. tasks.Add($"{i}_{j}", task);
  61. }
  62. }
  63. //双层循环写第二遍
  64. for (int i = 0; i < 2; i++)
  65. {
  66. int sum = 0;
  67. for (int j = 0; j < 5; j++)
  68. {
  69. Dictionary<int, int> dict = await tasks[$"{i}_{j}"];
  70. if (dict.ContainsKey(j))
  71. {
  72. int num = dict[j];
  73. sum += num;
  74. sb.Append($"{num}, ");
  75. }
  76. }
  77. Log($"输出:sum={sum}");
  78. }
  79. Log($"输出:{sb}");
  80. sw.Stop();
  81. Log($"==== 结束,线程ID={Thread.CurrentThread.ManagedThreadId},耗时:{sw.Elapsed.TotalSeconds:0.000}秒 ========================");
  82. });
  83. }
  84. // 并行异步(控制并发数量)
  85. private async void button5_Click(object sender, EventArgs e)
  86. {
  87. await Task.Run(async () =>
  88. {
  89. Log($"==== 并行异步(控制并发数量) 开始,线程ID={Thread.CurrentThread.ManagedThreadId} ===================");
  90. Stopwatch sw = Stopwatch.StartNew();
  91. HttpClient httpClient = HttpClientFactory.GetClient();
  92. var tasks = new Dictionary<string, Task<Dictionary<int, int>>>();
  93. Semaphore sem = new Semaphore(5, 5);
  94. StringBuilder sb = new StringBuilder();
  95. //双层循环写第一遍
  96. for (int i = 0; i < 2; i++)
  97. {
  98. for (int j = 0; j < 5; j++)
  99. {
  100. var task = RequestAsync(_url, i, sem);
  101. tasks.Add($"{i}_{j}", task);
  102. }
  103. }
  104. //双层循环写第二遍
  105. for (int i = 0; i < 2; i++)
  106. {
  107. int sum = 0;
  108. for (int j = 0; j < 5; j++)
  109. {
  110. Dictionary<int, int> dict = await tasks[$"{i}_{j}"];
  111. if (dict.ContainsKey(j))
  112. {
  113. int num = dict[j];
  114. sum += num;
  115. sb.Append($"{num}, ");
  116. }
  117. }
  118. Log($"输出:sum={sum}");
  119. }
  120. sem.Dispose(); //别忘了释放
  121. Log($"输出:{sb}");
  122. sw.Stop();
  123. Log($"==== 结束,线程ID={Thread.CurrentThread.ManagedThreadId},耗时:{sw.Elapsed.TotalSeconds:0.000}秒 ========================");
  124. });
  125. }
  126. private async Task<Dictionary<int, int>> RequestAsync(string url, int i)
  127. {
  128. Stopwatch sw = Stopwatch.StartNew();
  129. HttpClient httpClient = HttpClientFactory.GetClient();
  130. var result = await (await httpClient.GetAsync($"{url}?i={i}")).Content.ReadAsStringAsync();
  131. sw.Stop();
  132. Log($"线程ID={Thread.CurrentThread.ManagedThreadId},请求耗时:{sw.Elapsed.TotalSeconds:0.000}秒");
  133. return JsonSerializer.Deserialize<Dictionary<int, int>>(result);
  134. }
  135. private async Task<Dictionary<int, int>> RequestAsync(string url, int i, Semaphore semaphore)
  136. {
  137. semaphore.WaitOne();
  138. try
  139. {
  140. Stopwatch sw = Stopwatch.StartNew();
  141. HttpClient httpClient = HttpClientFactory.GetClient();
  142. var result = await (await httpClient.GetAsync($"{url}?i={i}")).Content.ReadAsStringAsync();
  143. sw.Stop();
  144. Log($"线程ID={Thread.CurrentThread.ManagedThreadId},请求耗时:{sw.Elapsed.TotalSeconds:0.000}秒");
  145. return JsonSerializer.Deserialize<Dictionary<int, int>>(result);
  146. }
  147. catch (Exception ex)
  148. {
  149. Log($"错误:{ex}");
  150. throw;
  151. }
  152. finally
  153. {
  154. semaphore.Release();
  155. }
  156. }
  157. #region Log
  158. private void Log(string msg)
  159. {
  160. msg = $"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")} {msg}\r\n";
  161. if (this.InvokeRequired)
  162. {
  163. this.BeginInvoke(new Action(() =>
  164. {
  165. txtLog.AppendText(msg);
  166. }));
  167. }
  168. else
  169. {
  170. txtLog.AppendText(msg);
  171. }
  172. }
  173. #endregion
  174. private void button6_Click(object sender, EventArgs e)
  175. {
  176. txtLog.Text = string.Empty;
  177. }
  178. }

思考

1. 使用Semaphore的注意事项

  1. 如果是Winform程序,可以在button事件方法中定义它的局部变量。如果是WebAPI接口服务,请在接口方法中定义Semaphore的局部变量。注意,别定义成全局的,或者定义成静态的,或者定义成Controller的成员变量,那样会严重限制使用它的接口的吞吐能力!
  2. 用完调用Dispose释放

2. 尽量不增加代码复杂度

请思考代码中的注释"双层循环写第一遍""双层循环写第二遍",这个写法尽量不增加代码复杂度,试想一下,如果你用Task.Run,且不说占用线程,就问你怎么写能简单?
有人说,这题我会,这样写不就行了:

  1. Dictionary<int, int>[] result = await Task.WhenAll(tasks.Values);

那请问,你接下来怎么写?我相信你肯定会写,但问题是,代码的逻辑结构变了,代码复杂度增加了!
所以"双层循环写第一遍""双层循环写第二遍"是什么意思?你即能方便合并,又能方便拆分,代码逻辑结构没变,只是复制了一份。

3. RequestAsync的复杂度可控

RequestAsync的复杂度并没有因为Semaphore的引入变得更复杂,增加的代码可以接受。

我写这篇博客不只是写个Demo,我确实有实际项目中的问题需要解决,代码如下:

WebAPI的Controller层:

  1. [HttpPost]
  2. [Route("[action]")]
  3. public async Task<List<NightActivitiesResultItem>> Get([FromBody] NightActivitiesPostData data)
  4. {
  5. return await ServiceFactory.Get<NightActivitiesService>().Get(data.startDate, data.endDate, data.startTime, data.endTime, data.threshold, data.peopleClusters);
  6. }

WebAPI的Service层:

  1. public async Task<List<NightActivitiesResultItem>> Get(string strStartDate, string strEndDate, string strStartTime, string strEndTime, decimal threshold, List<PeopleCluster> peopleClusterList)
  2. {
  3. List<NightActivitiesResultItem> result = new List<NightActivitiesResultItem>();
  4. DateTime startDate = DateTime.ParseExact(strStartDate, "yyyyMMdd", CultureInfo.InvariantCulture);
  5. DateTime endDate = DateTime.ParseExact(strEndDate, "yyyyMMdd", CultureInfo.InvariantCulture);
  6. string[][] strTimes;
  7. if (string.Compare(strStartTime, strEndTime) > 0)
  8. {
  9. strTimes = new string[2][] { new string[2], new string[2] };
  10. strTimes[0][0] = strStartTime;
  11. strTimes[0][1] = "235959";
  12. strTimes[1][0] = "000000";
  13. strTimes[1][1] = strEndTime;
  14. }
  15. else
  16. {
  17. strTimes = new string[1][] { new string[2] };
  18. strTimes[0][0] = strStartTime;
  19. strTimes[0][1] = strEndTime;
  20. }
  21. foreach (PeopleCluster peopleCluster in peopleClusterList)
  22. {
  23. for (DateTime day = startDate; day <= endDate; day = day.AddDays(1))
  24. {
  25. string strDate = day.ToString("yyyyMMdd");
  26. int sum = 0;
  27. foreach (string[] timeArr in strTimes)
  28. {
  29. List<PeopleFeatureAgg> list = await ServiceFactory.Get<PeopleFeatureQueryService>().QueryAgg(strDate + timeArr[0], strDate + timeArr[1], peopleCluster.ClusterIds);
  30. Dictionary<string, int> agg = list.ToLookup(a => a.ClusterId).ToDictionary(a => a.Key, a => a.First().Count);
  31. foreach (string clusterId in peopleCluster.ClusterIds)
  32. {
  33. if (agg.TryGetValue(clusterId, out int count))
  34. {
  35. sum += count;
  36. }
  37. }
  38. }
  39. if (sum >= threshold) //大于或等于阈值
  40. {
  41. NightActivitiesResultItem item = new NightActivitiesResultItem();
  42. item.peopleCluster = peopleCluster;
  43. item.date = strDate;
  44. item.count = sum;
  45. foreach (string[] timeArr in strTimes)
  46. {
  47. PeopleFeatureQueryResult featureList = await ServiceFactory.Get<PeopleFeatureQueryService>().Query(strDate + timeArr[0], strDate + timeArr[1], peopleCluster.ClusterIds, 10000);
  48. item.list.AddRange(featureList.list);
  49. }
  50. item.dataType = "xxx";
  51. result.Add(item);
  52. }
  53. }
  54. }
  55. var clusters = result.ConvertAll<PeopleCluster>(a => a.peopleCluster);
  56. await ServiceFactory.Get<PersonScoreService>().Set(OpeType.Xxx, peopleClusterList, clusters, startDate.ToString("yyyyMMddHHmmss"), endDate.ToString("yyyyMMddHHmmss"));
  57. return result;
  58. }

思考

上述接口代码,它有三层循环,在第三层循环体中await,第一层循环的数量会达到1000甚至10000,第二层循环的数量会达到30(一个月30天),甚至90(三个月),第三层循环的数量很少。
那么总请求次数会达到3万甚至90万,如果不使用并行异步请求,那耗时将会很长。

请问:在尽量不增加代码复杂度的前提下,怎么优化,能缩短该服务接口的执行时间?

我知道肯定有人要说我了,你傻啊,请求3万次?你可以改写一下,只请求一次,或者按天来,每天的数据只请求一次,那最多也才90次。然后在内存中计算,这不就快了?
确实是这样的,确实不应该请求3万次。但问题没这么简单:

  1. 且不说代码的复杂度,你写的不是一个接口,可能会有几十个这样的接口要写,复杂度增加一点这么多接口都要写死人。
  2. 这3万请求,可都是精确查询,es强大的缓存机制,肯定会命中缓存,也就是这些请求实际上基本是直接从内存中拿数据,连遍历集合都不需要,直接命中索引。只是网络往返次数太多。
  3. 这1次请求,或30次请求,对es来说,变成了范围查询,es要遍历,要给你查询并组织数据,返回集合给你。当然es集群的运算速度肯定很快。
  4. 这1次请求,或30次请求,结果返回后,你就要在内存中计算了,有的接口我就是这样写的,但要多写代码,比如在内存中计算,为了提高效率,先创建字典,相当于建索引。
  5. 只是逻辑复杂了吗?你还要多定义一些临时的变量啊!还可能要多定义一些实体类,哪怕是匿名对象。
  6. 代码写着写着就变懒了,对于每个接口,先组织好数据,再进行1次请求,然后在内存中再遍历再计算,心智负担好重
  7. 我在网上看到es集群默认最多支持10000个并发查询,需要请求es的业务程序肯定不止一个,对一个业务程序而言,确实要控制并发量
  8. 根据我的观察,一个WebAPI程序,线程数一般也就几十,多的时候上百,在没有异步的时候,并发请求数量实际上受限于物理线程。
  9. 使用异步之后,并发请求数量实际上受限于虚拟线程。确实会增加请求es的并发数量,压力大的时候,这个并发数量可能会很大。

怎么查看并发请求数

windows的cmd命令:
netstat -ano | findstr 5028

还有两个问题,博客中没有体现

1. 客户端程序执行请求时,客户端线程数量

通过任务管理器查看,非并行异步,线程数很少,请求开始后只增加了一两个线程。并行异步线程数较多。并行异步控制并发数量,线程数少很多。

2. Semaphore会阻塞当前线程

semaphore.WaitOne()阻塞线程一直阻塞到semaphore.Release(),使用了Semaphore的接口,被请求一次,阻塞一个线程,不过问题不是很大。

思考

.NET只有一个CLR线程池和一个异步线程池(完成端口线程池),当线程池中线程数量不够用时,.NET每秒才增加1到2个线程,线程增加的速度非常缓慢。结合异步,考虑一下这是为什么?
我认为(不一定对):

  1. 异步不需要大量物理线程,少量即可
  2. 如果线程增加速度很快,以异步的吞吐量,怕不是要把es请求挂!因为并发请求数太多了。

总结

  1. 并行异步,会有并发量太大,导致诸如数据库或者es集群抗不住的问题,谨慎使用。
  2. 并行异步(控制并发数量),这个目前是最佳实践。

完整测试源码

注意是AsyncParallel分支
https://gitee.com/s0611163/AsyncAwaitDemo2/tree/AsyncParallel/

最后

上述我写的实际接口,可优化也可不优化,耗时长没有问题,还有很多服务接口,它们通过定时任务在凌晨错开时间跑,结果存储在数据库中供前端查询。这是离线分析。
前同事写的接口是实时的,所以他觉得es慢了,如果只请求一次呢,可能es的查询语句也不好写,所以用ClickHouse,利用SQL灵活性,只查询一次,然后在内存中计算。

后续

又写了个测试程序,测试大量请求,并限制请求并发量。
注意是AsyncParallel2分支
https://gitee.com/s0611163/AsyncAwaitDemo2/tree/AsyncParallel2/

怎么测试?

  1. 启动服务端后,再启动客户端
  2. 点击第一个按钮,观察输出,打开Windows的资源管理器,查看Server.exe进程和AsyncAwaitDemo2.exe进程的线程数量,然后客户端可以关了,因为跑完至少要半小时。
  3. 点击第二个按钮,观察输出,打开Windows的资源管理器同上,观察工作线程数和异步线程数占用,能看到数据明显变化,大概几秒后就可以跑完。
  4. 点击第三个按钮,观察输出,打开Windows的资源管理器同上,观察工作线程数和异步线程数占用,能看到数据明显变化,大概20秒能跑完。0.065秒/每次请求×3万次请求÷100并发量≈20秒。

注意观察服务端线程数量

  1. 并行异步请求
    并行异步请求时,请求3万次只需要几秒,第二次点击需要的时间更短,仅需大约2.5秒。注意观察服务端线程数量,50不到!
    我把服务端修改成同步接口,客户端代码不动,试了一下,3万个请求,客户端报异常:由于目标计算机积极拒绝,无法连接。
    我把服务端的线程池改大一些,ThreadPool.SetMinThreads(200, 200),客户还是报异常:远程主机强迫关闭了一个现有的连接。

  2. 并行异步请求(控制并发数量)
    3万次请求,耗时大约60秒,很显然服务端的吞吐量较低。
    把服务端的线程池改大一些,ThreadPool.SetMinThreads(200, 200),可以达到异步接口同样的吞吐量。

原文链接:https://www.cnblogs.com/s0611163/p/17090954.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号