经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Java » 查看文章
java协程线程之虚拟线程
来源:cnblogs  作者:無心道(失足程序员)  时间:2023/7/21 8:50:01  对本文有异议

前言

众所周知,java 是没有协程线程的,在我们如此熟知的jdk 1.8时代,大佬们想出来的办法就是异步io,甚至用并行的stream流来实现,高并发也好,缩短事件处理时间也好;大家都在想着自己认为更好的实现方式;

在来说说吧,我为什么会在今天研究这个破b玩意儿呢,

这事情还的从一个月前的版本维护说起,

目前公司游戏运营的算中规中矩吧,日新增和日活跃用户基本保持在1w,2.5w样子;

大概1-2周会有一次版本更新,需要停服维护的,

我想大部分做游戏的同僚可能都知道,游戏架构里面包含一个登录服这么一个环节,用于对账号管理以及和sdk平台做登录二次验证;

我们的问题也就出在了这sdk二次登录验证环境;

 从这个截图中不难看出,我在向sdk服务器进行验证的时候http请求耗时,一个请求多长达400ms,按照这个逻辑,一个线程一秒钟也只能是2个登录;

然后面对停服维护阶段,玩家疯狂的尝试登录,导致登录服务器直接积压了30万个登录请求等待处理;

在寻求方案的时候,看到了http请求池化方案,目前已经大线程池(这里是本人自定义线程池)和http池化(基于 Apache CloseableHttpClient)处理方案 因为平台是jdk11的

在寻求方案同时发现了jdk19开放的预览版新功能虚拟线程;翻阅了一些资料,就像这虚拟线程能不能为我带来更好性能体验,让现有的系统,吞吐量更上一层楼;

一下测试代码用的是jdk20测试

 构建虚拟线程

 第一步我们需要先创建虚拟线程,才能去理解什么是虚拟线程

  1. 1 public static void main(String[] args) throws Exception {
  2. 2
  3. 3 Thread.startVirtualThread(() -> {
  4. 4 System.out.println(Thread.currentThread().toString());
  5. 5 });
  6. 6
  7. 7 Thread.sleep(3000);
  8. 8 }

 

 这就正确的启动了一个虚拟线程;从线程明明输出看着是不是有点眼熟,是不是跟stream的并行流很相似;

接下来我们看看虚拟线程的运行是怎么回事,

  1. 1 public static void main(String[] args) throws Exception {
  2. 2
  3. 3 Thread.startVirtualThread(() -> {
  4. 4 try {
  5. 5 Thread.sleep(5000);
  6. 6 } catch (InterruptedException e) {
  7. 7 throw new RuntimeException(e);
  8. 8 }
  9. 9 System.out.println(Thread.currentThread().toString());
  10. 10 });
  11. 11
  12. 12 Thread.startVirtualThread(() -> {
  13. 13 try {
  14. 14 Thread.sleep(5000);
  15. 15 } catch (InterruptedException e) {
  16. 16 throw new RuntimeException(e);
  17. 17 }
  18. 18 System.out.println(Thread.currentThread().toString());
  19. 19 });
  20. 20 Thread.startVirtualThread(() -> {
  21. 21 try {
  22. 22 Thread.sleep(5000);
  23. 23 } catch (InterruptedException e) {
  24. 24 throw new RuntimeException(e);
  25. 25 }
  26. 26 System.out.println(Thread.currentThread().toString());
  27. 27 });
  28. 28 Thread.startVirtualThread(() -> {
  29. 29 try {
  30. 30 Thread.sleep(5000);
  31. 31 } catch (InterruptedException e) {
  32. 32 throw new RuntimeException(e);
  33. 33 }
  34. 34 System.out.println(Thread.currentThread().toString());
  35. 35 });
  36. 36 Thread.startVirtualThread(() -> {
  37. 37 try {
  38. 38 Thread.sleep(5000);
  39. 39 } catch (InterruptedException e) {
  40. 40 throw new RuntimeException(e);
  41. 41 }
  42. 42 System.out.println(Thread.currentThread().toString());
  43. 43 });
  44. 44 Thread.sleep(3000);
  45. 45 }
View Code

我们多new几个虚拟线程来看看监控

 看到了吧,实际上你new的虚拟线程,其实是被当成了一个任务丢到了线程池里面在运行;

 在翻阅了现有的代码逻辑还不能定义这个底部线程池,只能使用默认的;

当然目前是预览版,不确定之后会不会可以自定义实现,stream流一样,可以定义它并行数量;

 

线程池对比

测试用例1 

  1. 1 @Test
  2. 2 public void r() {
  3. 3 t1();
  4. 4 t2();
  5. 5 }
  6. 6
  7. 7 public void t1() {
  8. 8 AtomicInteger atomicInteger = new AtomicInteger(100);
  9. 9 try (var executor = Executors.newFixedThreadPool(10)) {
  10. 10 long nanoTime = System.nanoTime();
  11. 11 for (int i = 0; i < 100; i++) {
  12. 12 executor.execute(() -> {
  13. 13 try {
  14. 14 Thread.sleep(50);
  15. 15 } catch (InterruptedException e) {
  16. 16 throw new RuntimeException(e);
  17. 17 }
  18. 18 atomicInteger.decrementAndGet();
  19. 19 });
  20. 20 }
  21. 21 while (atomicInteger.get() > 0) {}
  22. 22 System.out.println("平台线程 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
  23. 23 }
  24. 24 }
  25. 25
  26. 26 public void t2() {
  27. 27 AtomicInteger atomicInteger = new AtomicInteger(100);
  28. 28 try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
  29. 29 long nanoTime = System.nanoTime();
  30. 30 for (int i = 0; i < 100; i++) {
  31. 31 executor.execute(() -> {
  32. 32 try {
  33. 33 Thread.sleep(50);
  34. 34 } catch (InterruptedException e) {
  35. 35 throw new RuntimeException(e);
  36. 36 }
  37. 37 atomicInteger.decrementAndGet();
  38. 38 });
  39. 39 }
  40. 40 while (atomicInteger.get() > 0) {}
  41. 41 System.out.println("虚拟线程 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
  42. 42 }
  43. 43 }
View Code

 通过这段测试代码对比,总任务耗时,显而易见性能;

测试用例2

  1. 1 public void t2p() {
  2. 2 Runnable runnable = () -> {
  3. 3 long g = 0;
  4. 4 for (int i = 0; i < 10000; i++) {
  5. 5 for (int j = 0; j < 10000; j++) {
  6. 6 for (int k = 0; k < 100; k++) {
  7. 7 g++;
  8. 8 }
  9. 9 }
  10. 10 }
  11. 11 };
  12. 12 AtomicInteger atomicInteger = new AtomicInteger(100);
  13. 13 try (var executor = Executors.newFixedThreadPool(10)) {
  14. 14 long nanoTime = System.nanoTime();
  15. 15 for (int i = 0; i < 100; i++) {
  16. 16 executor.execute(() -> {
  17. 17 runnable.run();
  18. 18 atomicInteger.decrementAndGet();
  19. 19 });
  20. 20 }
  21. 21 while (atomicInteger.get() > 0) {}
  22. 22 System.out.println("平台线程 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
  23. 23 }
  24. 24 }
  25. 25
  26. 26 public void t2v() {
  27. 27 Runnable runnable = () -> {
  28. 28 long g = 0;
  29. 29 for (int i = 0; i < 10000; i++) {
  30. 30 for (int j = 0; j < 10000; j++) {
  31. 31 for (int k = 0; k < 100; k++) {
  32. 32 g++;
  33. 33 }
  34. 34 }
  35. 35 }
  36. 36 };
  37. 37 AtomicInteger atomicInteger = new AtomicInteger(100);
  38. 38 try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
  39. 39 long nanoTime = System.nanoTime();
  40. 40 for (int i = 0; i < 100; i++) {
  41. 41 executor.execute(() -> {
  42. 42 runnable.run();
  43. 43 atomicInteger.decrementAndGet();
  44. 44 });
  45. 45 }
  46. 46 while (atomicInteger.get() > 0) {}
  47. 47 System.out.println("虚拟线程 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
  48. 48 }
  49. 49 }

 通过测试用例2不难看出,虚拟线程已经不占优势;

这是为什么呢?

总结

平台线程我就不过多描述因为大家都知道,网上的描述也特别多;

虚拟线程,其实我们更多可以可以考虑他只是一个任务,异步的任务;

区别在于,平台线程受制于cpu,如果你执行任务很耗时或者比如网络io等挂起等待,那么这个cpu也会一直挂起等待无法处理其他事情;

虚拟线程是异步任务凌驾于平台线程之上,也就是说,当你的虚拟线程等待挂起的时候,平台线程就去执行其他任务(其他虚拟线程)去了

我们通过上面测试用例可以这样理解,

 

用例1,通常我们的RPC服务或者SDK跟我开通SDK二次验证大部分时间处于等待挂起业务,这时候虚拟线程的作用就会非常大,他可以发起大量的验证请求,等待回答;我们通常定义的IO密集型应用;

 

用例2,属于计算型的,它会一直占用cpu时间片,不会腾出cpu去执行其他事件;我们通常说cpu密集型应用不太适用虚拟线程;

 

目前虚拟线程的执行依赖于底层线程池,我们无法自主控制它,所以不是很建议使用

关于虚拟线程的描述或者定义我就不在过多的去阐述,

我只说一下它运行的逻辑吧,

1,在不同时间段一个虚拟线程可以由不同的平台线程调度,也可以由一个平台线程调度,平台线程=系统线程=cpu

2,在不同时间段一个平台线程在可以调度不同的虚拟线程,也可以反复调度一个虚拟线程

3,在同一时间段,一个平台线程只能调用一个虚拟线程,一个虚拟线程只能由一个平台线程调度

换言之,其实虚拟线程可以看成一个task,你可以new很多的task,至于他什么时候被执行,就看你的工人(cpu)什么时候有空,

 

  1. 1 package code.threading;
  2. 2
  3. 3 import org.junit.Test;
  4. 4
  5. 5 import java.util.ArrayList;
  6. 6 import java.util.List;
  7. 7 import java.util.concurrent.Executors;
  8. 8 import java.util.concurrent.atomic.AtomicBoolean;
  9. 9 import java.util.concurrent.atomic.AtomicInteger;
  10. 10
  11. 11 /**
  12. 12 * 线程测试
  13. 13 *
  14. 14 * @author: Troy.Chen(無心道, 15388152619)
  15. 15 * @version: 2023-05-29 21:31
  16. 16 **/
  17. 17 public class ThreadCode {
  18. 18
  19. 19 public static void main(String[] args) throws Exception {
  20. 20
  21. 21 }
  22. 22
  23. 23 @Test
  24. 24 public void s() throws Exception {
  25. 25
  26. 26 Runnable runnable = () -> {
  27. 27 long nanoTime = System.nanoTime();
  28. 28 long g = 0;
  29. 29 for (int i = 0; i < 10000; i++) {
  30. 30 for (int j = 0; j < 10000; j++) {
  31. 31 for (int k = 0; k < 100; k++) {
  32. 32 g++;
  33. 33 }
  34. 34 }
  35. 35 }
  36. 36 Thread thread = Thread.currentThread();
  37. 37 System.out.println(g + " - " + thread.isVirtual() + " - " + thread.threadId() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
  38. 38 };
  39. 39
  40. 40 List<VirtualThread> ts = new ArrayList<>();
  41. 41 ts.add(new VirtualThread(runnable));
  42. 42 ts.add(new VirtualThread(runnable));
  43. 43 ts.add(new VirtualThread(runnable));
  44. 44 ts.add(new VirtualThread(runnable));
  45. 45 ts.add(new VirtualThread(runnable));
  46. 46 ts.add(new VirtualThread(runnable));
  47. 47 ts.add(new VirtualThread(runnable));
  48. 48 ts.add(new VirtualThread(runnable));
  49. 49 ts.add(new VirtualThread(runnable));
  50. 50 ts.add(new VirtualThread(runnable));
  51. 51 ts.add(new VirtualThread(runnable));
  52. 52 ts.add(new VirtualThread(runnable));
  53. 53 ts.add(new VirtualThread(runnable));
  54. 54 ts.add(new VirtualThread(runnable));
  55. 55 ts.add(new VirtualThread(runnable));
  56. 56 ts.add(new VirtualThread(runnable));
  57. 57 ts.add(new VirtualThread(runnable));
  58. 58 ts.add(new VirtualThread(runnable));
  59. 59 ts.add(new VirtualThread(runnable));
  60. 60 ts.add(new VirtualThread(runnable));
  61. 61 for (VirtualThread t : ts) {
  62. 62 t.shutdown();
  63. 63 }
  64. 64 for (VirtualThread t : ts) {
  65. 65 t.join();
  66. 66 }
  67. 67 }
  68. 68
  69. 69 public static class VirtualThread implements Runnable {
  70. 70
  71. 71 /*虚拟线程构建器*/
  72. 72 static final Thread.Builder.OfVirtual ofVirtual = Thread.ofVirtual().name("v-", 1);
  73. 73
  74. 74 AtomicBoolean shutdown = new AtomicBoolean();
  75. 75 Thread _thread;
  76. 76 Runnable runnable;
  77. 77
  78. 78 public VirtualThread(Runnable runnable) {
  79. 79 this.runnable = runnable;
  80. 80 _thread = ofVirtual.start(this);
  81. 81 }
  82. 82
  83. 83 @Override public void run() {
  84. 84 do {
  85. 85 try {
  86. 86 try {
  87. 87 this.runnable.run();
  88. 88 } catch (Throwable e) {
  89. 89 e.printStackTrace();
  90. 90 }
  91. 91 } catch (Throwable throwable) {}
  92. 92 } while (!shutdown.get());
  93. 93 System.out.println("虚拟线程退出 " + _thread.isVirtual() + " - " + _thread.threadId() + " - " + _thread.getName());
  94. 94 }
  95. 95
  96. 96 public void shutdown() {
  97. 97 shutdown.lazySet(true);
  98. 98 }
  99. 99
  100. 100 public void join() throws InterruptedException {
  101. 101 _thread.join();
  102. 102 }
  103. 103 }
  104. 104
  105. 105 @Test
  106. 106 public void r() {
  107. 107 t2p();
  108. 108 t2v();
  109. 109 }
  110. 110
  111. 111 public void t1p() {
  112. 112 AtomicInteger atomicInteger = new AtomicInteger(100);
  113. 113 try (var executor = Executors.newFixedThreadPool(10)) {
  114. 114 long nanoTime = System.nanoTime();
  115. 115 for (int i = 0; i < 100; i++) {
  116. 116 executor.execute(() -> {
  117. 117 try {
  118. 118 Thread.sleep(50);
  119. 119 } catch (InterruptedException e) {
  120. 120 throw new RuntimeException(e);
  121. 121 }
  122. 122 atomicInteger.decrementAndGet();
  123. 123 });
  124. 124 }
  125. 125 while (atomicInteger.get() > 0) {}
  126. 126 System.out.println("平台线程 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
  127. 127 }
  128. 128 }
  129. 129
  130. 130 public void t1v() {
  131. 131 AtomicInteger atomicInteger = new AtomicInteger(100);
  132. 132 try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
  133. 133 long nanoTime = System.nanoTime();
  134. 134 for (int i = 0; i < 100; i++) {
  135. 135 executor.execute(() -> {
  136. 136 try {
  137. 137 Thread.sleep(50);
  138. 138 } catch (InterruptedException e) {
  139. 139 throw new RuntimeException(e);
  140. 140 }
  141. 141 atomicInteger.decrementAndGet();
  142. 142 });
  143. 143 }
  144. 144 while (atomicInteger.get() > 0) {}
  145. 145 System.out.println("虚拟线程 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
  146. 146 }
  147. 147 }
  148. 148
  149. 149 public void t2p() {
  150. 150 Runnable runnable = () -> {
  151. 151 long g = 0;
  152. 152 for (int i = 0; i < 10000; i++) {
  153. 153 for (int j = 0; j < 10000; j++) {
  154. 154 for (int k = 0; k < 100; k++) {
  155. 155 g++;
  156. 156 }
  157. 157 }
  158. 158 }
  159. 159 };
  160. 160 AtomicInteger atomicInteger = new AtomicInteger(100);
  161. 161 try (var executor = Executors.newFixedThreadPool(10)) {
  162. 162 long nanoTime = System.nanoTime();
  163. 163 for (int i = 0; i < 100; i++) {
  164. 164 executor.execute(() -> {
  165. 165 runnable.run();
  166. 166 atomicInteger.decrementAndGet();
  167. 167 });
  168. 168 }
  169. 169 while (atomicInteger.get() > 0) {}
  170. 170 System.out.println("平台线程 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
  171. 171 }
  172. 172 }
  173. 173
  174. 174 public void t2v() {
  175. 175 Runnable runnable = () -> {
  176. 176 long g = 0;
  177. 177 for (int i = 0; i < 10000; i++) {
  178. 178 for (int j = 0; j < 10000; j++) {
  179. 179 for (int k = 0; k < 100; k++) {
  180. 180 g++;
  181. 181 }
  182. 182 }
  183. 183 }
  184. 184 };
  185. 185 AtomicInteger atomicInteger = new AtomicInteger(100);
  186. 186 try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
  187. 187 long nanoTime = System.nanoTime();
  188. 188 for (int i = 0; i < 100; i++) {
  189. 189 executor.execute(() -> {
  190. 190 runnable.run();
  191. 191 atomicInteger.decrementAndGet();
  192. 192 });
  193. 193 }
  194. 194 while (atomicInteger.get() > 0) {}
  195. 195 System.out.println("虚拟线程 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
  196. 196 }
  197. 197 }
  198. 198 }
View Code

附加一段全部测试代码

  

原文链接:https://www.cnblogs.com/shizuchengxuyuan/p/17565269.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号