经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » JS/JS库/框架 » JavaScript » 查看文章
并发工具类Phaser
来源:cnblogs  作者:jtea  时间:2023/8/11 9:00:11  对本文有异议

前言

在面试这一篇我们介绍过CountDownLatch和CyclicBarrier,它们都是jdk1.5提供的多线程并发控制类,内部都是用AQS这个同步框架实现。
在我们的实际项目中,有很多场景是需要从数据库查询一批数据,多线池执行某些操作,并且要统计结果,我们对这个过程做了一些封装,由于要统计结果,所以需要等所有任务都处理完成,我们用到了CountDownLatch实现同步。伪代码如下:

  1. ExecuteInstance ei = ExecuteInstance.build(myExecutor); //线程池
  2. //循环
  3. LoopShutdown.build("myTask").loop(() -> {
  4. //不断从数据获取数据
  5. List<Task> list = getFromDb();
  6. //设置countdownlatch
  7. ei.setCountDownSize(list.size());
  8. list.forEach(item -> ei.execute(() -> {
  9. //提交到线程池执行,并且统计
  10. }));
  11. //等待这一批做完
  12. ei.await();
  13. });
  14. //内部使用了CountDownLatch await()
  15. return ei.awaitResult();

代码很简单,容易理解。不过后来有同学提到每次都要setCountDownSize() + await() 这套组合太麻烦,能不能省略这两步呢。另外也不够灵活,有些场景不能提前知道要处理的数据总数,例如从迭代器遍历数据,Iterator接口并没有size方法可以获取到总数。

那怎么实现这个功能呢?就是本篇要介绍的Phaser。

Phaser原理

Phaser类是jdk7提供的,可重用的,同步的,在功能上和CountDownLatch,CyclicBarrier类似,但更加灵活的类。
"phaser" google翻译一下是:"移相器"的意思,完全不知道是什么~,不过"phase"是阶段的意思,还是能从名字了解到一些信息。

Phaser运行机制:

  • Registration(注册)
    跟其他barrier不同,在phaser上注册的parties会随着时间的变化而变化。任务可以随时注册(使用方法register,bulkRegister注册,或者由构造器确定初始parties),并且在任何抵达点可以随意地撤销注册(方法arriveAndDeregister)。就像大多数基本的同步结构一样,注册和撤销只影响内部计数;不会创建更深的内部记录,所以任务不能查询他们是否已经注册。(不过,可以通过继承来实现类似的记录)
    可以动态的注册是它的特点之一,我们知道CountDownLatch之类的在开始就需要指定一个计数,并且不能更改,而Phaser可以开始指定,也可以运行时更改。

  • Synchronization(同步机制)
    和CyclicBarrier一样,Phaser也可以重复await。方法arriveAndAwaitAdvance的效果类似CyclicBarrier.await。phaser的每一代都有一个相关的phase number,初始值为0,当所有注册的任务都到达phaser时phase+1,到达最大值(Integer.MAX_VALUE)之后清零。使用phase number可以独立控制到达phaser和等待其他线程的动作,通过下面两种类型的方法:

    Arrival(到达机制) arrive和arriveAndDeregister方法记录到达状态。这些方法不会阻塞,但是会返回一个相关的arrival phase number;也就是说,phase number用来确定到达状态。当所有任务都到达给定phase时,可以执行一个可选的函数,这个函数通过重写onAdvance方法实现,通常可以用来控制终止状态。重写此方法类似于为CyclicBarrier提供一个barrierAction,但比它更灵活。

    Waiting(等待机制) awaitAdvance方法需要一个表示arrival phase number的参数,并且在phaser前进到与给定phase不同的phase时返回。和CyclicBarrier不同,即使等待线程已经被中断,awaitAdvance方法也会一直等待。中断状态和超时时间同样可用,但是当任务等待中断或超时后未改变phaser的状态时会遭遇异常。如果有必要,在方法forceTermination之后可以执行这些异常的相关的handler进行恢复操作,Phaser也可能被ForkJoinPool中的任务使用,这样在其他任务阻塞等待一个phase时可以保证足够的并行度来执行任务。

  • Termination(终止机制)
    可以用isTerminated方法检查phaser的终止状态。在终止时,所有同步方法立刻返回一个负值。在终止时尝试注册也没有效果。当调用onAdvance返回true时Termination被触发。当deregistration操作使已注册的parties变为0时,onAdvance的默认实现就会返回true。也可以重写onAdvance方法来定义终止动作。forceTermination方法也可以释放等待线程并且允许它们终止。

  • Tiering(分层结构)
    Phaser支持分层结构(树状构造)来减少竞争。注册了大量parties的Phaser可能会因为同步竞争消耗很高的成本, 因此可以设置一些子Phaser来共享一个通用的parent。这样的话即使每个操作消耗了更多的开销,但是会提高整体吞吐量。在一个分层结构的phaser里,子节点phaser的注册和取消注册都通过父节点管理。子节点phaser通过构造或方法register、bulkRegister进行首次注册时,在其父节点上注册。子节点phaser通过调用arriveAndDeregister进行最后一次取消注册时,也在其父节点上取消注册。
    这也是它的主要亮点之一,这一点很像ConcurrentHashMap(对HashTable)和LongAdder(对AtomicLong),通过分散热点来降低资源竞争,提升并发效率。

  • Monitoring(状态监控)
    由于同步方法可能只被已注册的parties调用,所以phaser的当前状态也可能被任何调用者监控。在任何时候,可以通过getRegisteredParties获取parties数,其中getArrivedParties方法返回已经到达当前phase的parties数。当剩余的parties(通过方法getUnarrivedParties获取)到达时,phase进入下一代。这些方法返回的值可能只表示短暂的状态,所以一般来说在同步结构里并没有啥卵用。

CountDownLatch和CyclicBarrier都非常简单,从Phaser提供的api数量就可以看出为什么说它更加灵活,show me the code,接下来我们通过几个例子感受一下。

Phaser例子

例子1:子线程会等全部子线程达到后才开始执行,实现类似CyclicBarrier的效果。

  1. @Test
  2. public void test1() throws InterruptedException {
  3. List<Runnable> list = Lists.newArrayList();
  4. for (int i = 0; i < 10; i++) {
  5. final int j = i;
  6. list.add(() -> System.out.println(j));
  7. }
  8. final Phaser phaser = new Phaser(); // "1" to register self
  9. // create and start threads
  10. int i = 0;
  11. for (final Runnable task : list) {
  12. i++;
  13. final int j = i;
  14. phaser.register();
  15. new Thread(() -> {
  16. try {
  17. Thread.sleep(j * 1000);
  18. } catch (InterruptedException e) {
  19. }
  20. //全部子线程到达后才开始执行
  21. phaser.arriveAndAwaitAdvance(); // await all creation
  22. task.run();
  23. }).start();
  24. }
  25. Thread.sleep(15000);
  26. }

例子2:task会循环做3次,通过重写onAdvance可以控制phaser结束的条件。

  1. @Test
  2. public void test2() throws InterruptedException {
  3. //重复做3次
  4. int iterations = 3;
  5. List<Runnable> list = Lists.newArrayList();
  6. for (int i = 0; i < 2; i++) {
  7. final int j = i;
  8. list.add(() -> System.out.println(j));
  9. }
  10. final Phaser phaser = new Phaser() {
  11. //每做一次,phase+1,该方法返回true,就会结束
  12. protected boolean onAdvance(int phase, int registeredParties) {
  13. return phase > iterations || registeredParties == 0;
  14. }
  15. };
  16. phaser.register();
  17. for (final Runnable task : list) {
  18. phaser.register();
  19. new Thread(() -> {
  20. do {
  21. task.run();
  22. phaser.arriveAndAwaitAdvance();
  23. } while (!phaser.isTerminated());
  24. }).start();
  25. }
  26. phaser.arriveAndDeregister(); // deregister self, don't wait
  27. Thread.sleep(5000);
  28. }

例子3:创建多个phaser,并关联到父phaser上,就是上面提到的分层结构。

  1. @Test
  2. public void test3() {
  3. Phaser parent = new Phaser(1);
  4. Phaser phaser1 = new Phaser(parent);
  5. Phaser phaser2 = new Phaser(parent);
  6. for (int i = 0; i < 20; i++) {
  7. final int j = i;
  8. if (i < 10) {
  9. phaser1.register();
  10. new Thread(() -> {
  11. try {
  12. Thread.sleep(1000);
  13. phaser1.arriveAndAwaitAdvance(); // await all creation
  14. System.out.println(j);
  15. } catch (InterruptedException e) {
  16. }
  17. }).start();
  18. } else if (i < 20) {
  19. phaser2.register();
  20. new Thread(() -> {
  21. try {
  22. Thread.sleep(10000);
  23. phaser2.arriveAndAwaitAdvance(); // await all creation
  24. System.out.println(j);
  25. } catch (InterruptedException e) {
  26. }
  27. }).start();
  28. }
  29. }
  30. parent.arriveAndAwaitAdvance();
  31. System.out.println("done");
  32. }

例子4:使用Phaser改写我们的代码,如下:

  1. //维护一个Phaser
  2. public static ExecuteInstance buildWithPhaser(Executor executor) {
  3. ExecuteInstance ei = new ExecuteInstance();
  4. ei.executor = executor;
  5. ei.phaser = new Phaser(1);
  6. return ei;
  7. }
  8. //提交线程池前注册一下
  9. public void executeRR(Callable<ReturnResult> task, Consumer<Exception> exceptionHandler, int batch) {
  10. phaser.register();
  11. executor.execute(() -> executeStatistics(task, exceptionHandler, batch));
  12. }
  13. //执行后deregister一下
  14. private void executeStatistics(Callable<ReturnResult> task, Consumer<Exception> exceptionHandler, int batch) {
  15. ReturnResult result = ReturnResult.NONE;
  16. try {
  17. //任务处理
  18. result = task.call();
  19. } catch (Exception e) {
  20. if (statistics) {
  21. counter.incrException(batch);
  22. }
  23. if (exceptionHandler != null) {
  24. //自定义异常处理
  25. try {
  26. exceptionHandler.accept(e);
  27. } catch (Exception he) {
  28. }
  29. }
  30. } finally {
  31. phaser.arriveAndDeregister(); //deregister
  32. if (statistics) {
  33. if (ReturnResult.SUCCESS.equals(result)) {
  34. counter.incrSuccess(batch);
  35. } else if (ReturnResult.FAIL.equals(result)) {
  36. counter.incrFail(batch);
  37. } else if (ReturnResult.FILTER.equals(result)) {
  38. counter.incrFilter(batch);
  39. }
  40. }
  41. }
  42. }
  43. //等待结果
  44. public ExecuteResult awaitResult() {
  45. phaser.arriveAndAwaitAdvance();
  46. return getExecuteResult();
  47. }

使用就非常简单了

  1. ExecuteInstance ei = ExecuteInstance.buildWithPhaser(myExecutor); //线程池
  2. //循环
  3. LoopShutdown.build("myTask").loop(() -> {
  4. //不断从数据获取数据
  5. List<Task> list = getFromDb();
  6. list.forEach(item -> ei.execute(() -> {
  7. //提交到线程池执行,并且统计
  8. }));
  9. });
  10. return ei.awaitResult();

总结

Phaser是jkd7后提供的同步工具类,它底层并没有使用AQS同步工具。相比CountDownLatch等它提供了更丰富的功能,但也意味着它更复杂,需要更多的资源,一些简单的场景CountDownLatch等工具类能满足的就使用它们即可,考虑性能,还有灵活性时才考虑使用Phaser,如笔者的场景使用Phaser就更加适合。

更多分享,欢迎关注我的github:https://github.com/jmilktea/jtea

原文链接:https://www.cnblogs.com/jtea/p/17622139.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号