经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Elasticsearch » 查看文章
ElasticSearch 线程池类型分析之 ResizableBlockingQueue 队列分析
来源:cnblogs  作者:hapjin  时间:2019/7/8 8:37:40  对本文有异议

ElasticSearch 线程池类型分析之 ResizableBlockingQueue 队列分析

在上一篇文章 ElasticSearch 线程池类型分析之 ExecutorScalingQueue的末尾,谈到了处理ES 搜索操作(search)的线程池的一些实现细节,本文就以下几个问题分析SEARCH操作的线程池。

  1. 如何统计一个线程池中的任务的排队等待时间、执行时间?排队等待时间是指任务提交给了线程池,但尚未调度运行。执行时间是任务开始执行到执行完成这一段时间
  2. 如何设计一个可动态调整容量(最大长度)的任务队列?
  3. 执行ES的SEARCH操作任务的线程池的实现细节(下文称作 SEARCH线程池)

在ThreadPool类的构造方法中构造SEARCH线程池:

  1. builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings,
  2. Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000));

SEARCH 线程池的核数线程数与部署ES节点的机器的CPU个数有关,它的任务队列的容量可动态调整,任务队列的初始长度为1000。SEARCH线程池的具体实现类是QueueResizingEsThreadPoolExecutor,采用的任务队列是ResizableBlockingQueue,拒绝策略是 EsAbortPolicy。ResizableBlockingQueue 继承了 SizeBlockingQueue,提供了可动态调整任务队列容量的功能,关于SizeBlockingQueue 可参考ElasticSearch 线程池类型分析之 SizeBlockingQueue的分析。
org.elasticsearch.common.util.concurrent.EsExecutors.newAutoQueueFixed

  1. ResizableBlockingQueue<Runnable> queue =
  2. new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), initialQueueCapacity);
  3. return new QueueResizingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
  4. queue, minQueueSize, maxQueueSize, TimedRunnable::new, frameSize, targetedResponseTime, threadFactory,
  5. new EsAbortPolicy(), contextHolder);

提交的Runnable任务会被封装成TimedRunnable对象,从而能够统计任务的执行时间。在 new TimedRunnable 对象时,this.creationTimeNanos = System.nanoTime();,记录任务的创建时间。
finishTimeNanos-startTimeNanos代表任务的执行时间,startTimeNanos-creationTimeNanos表示任务的排队时间,这样就能记录每个Runnable任务的排队时间和执行时间了,非常完美的设计思路。
org.elasticsearch.common.util.concurrent.TimedRunnable

  1. //TimedRunnable的构造方法
  2. TimedRunnable(final Runnable original) {
  3. this.original = original;
  4. this.creationTimeNanos = System.nanoTime();
  5. }
  6. @Override
  7. public void doRun() {
  8. try {
  9. //任务执行开始时间
  10. startTimeNanos = System.nanoTime();
  11. //任务的执行逻辑
  12. original.run();
  13. } finally {
  14. //任务执行完成时间
  15. finishTimeNanos = System.nanoTime();
  16. }
  17. }

下面我来详细分析如何统计提交到线程池的Runnable任务的执行时间。先看 QueueResizingEsThreadPoolExecutor 的构造方法参数,重点看 runnableWrapper 参数,我把它理解成"处理逻辑"。
从本文的第一个代码片段 new QueueResizingEsThreadPoolExecutor 可知,TimedRunnable::new 赋值给了 runnableWrapper,由于它是java.util.function.Function接口,当java.util.function.Function.apply 方法被调用执行时,就是执行runnableWrapper处理逻辑,即:new 一个 TimedRunnable 对象。看TimedRunnable的构造方法可知,此时已经把任务的创建时间给记录下来了。
这里分析得这么详细的原因是:ES源码中大量地用到了函数式接口、Lambda表达式,刚看源码时,一直不知道这段Lambda表达式所代表的"处理逻辑"是在哪里执行的,当慢慢熟悉了这种Lambda表达式的写法后,就明白这种写法极大地提升了代码的灵活性。

  1. //runnableWrapper声明为函数式接口Function,它接收一个Runnable参数,执行runnableWrapper处理逻辑,返回一个Runnable结果
  2. private final Function<Runnable, Runnable> runnableWrapper;
  3. private final ResizableBlockingQueue<Runnable> workQueue;
  4. QueueResizingEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
  5. ResizableBlockingQueue<Runnable> workQueue, int minQueueSize, int maxQueueSize,
  6. Function<Runnable, Runnable> runnableWrapper, final int tasksPerFrame,
  7. TimeValue targetedResponseTime, ThreadFactory threadFactory, XRejectedExecutionHandler handler,
  8. ThreadContext contextHolder) {
  9. super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit,
  10. workQueue, threadFactory, handler, contextHolder);
  11. this.runnableWrapper = runnableWrapper;
  12. this.workQueue = workQueue;
  13. this.tasksPerFrame = tasksPerFrame;
  14. this.startNs = System.nanoTime();
  15. this.minQueueSize = minQueueSize;
  16. this.maxQueueSize = maxQueueSize;
  17. this.targetedResponseTimeNanos = targetedResponseTime.getNanos();
  18. this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0);
  19. logger.debug(
  20. "thread pool [{}] will adjust queue by [{}] when determining automatic queue size", getName(), QUEUE_ADJUSTMENT_AMOUNT);
  21. }

当任务提交时,就是执行QueueResizingEsThreadPoolExecutor的doExecute()方法:

  1. @Override
  2. protected void doExecute(final Runnable command) {
  3. // we are submitting a task, it has not yet started running (because super.excute() has not
  4. // been called), but it could be immediately run, or run at a later time. We need the time
  5. // this task entered the queue, which we get by creating a TimedRunnable, which starts the
  6. // clock as soon as it is created.
  7. super.doExecute(this.runnableWrapper.apply(command));//apply方法 触发 TimedRunnable::new执行,创建TimedRunnable对象
  8. }

上面已经能够记录每一个任务的执行时间了,但是任务队列的容量设置为多少合适呢?这是由排队理论里面的little's law决定的。关于利特尔法则,可自行Google。

  1. /**
  2. * Calculate Little's Law (L), which is the "optimal" queue size for a particular task rate (lambda) and targeted response time.
  3. *
  4. * @param lambda the arrival rate of tasks in nanoseconds
  5. * @param targetedResponseTimeNanos nanoseconds for the average targeted response rate of requests
  6. * @return the optimal queue size for the give task rate and targeted response time
  7. */
  8. static int calculateL(final double lambda, final long targetedResponseTimeNanos) {
  9. assert targetedResponseTimeNanos > 0 : "cannot calculate for instantaneous requests";
  10. // L = λ * W
  11. return Math.toIntExact((long)(lambda * targetedResponseTimeNanos));
  12. }

Little's law 需要2个参数,一个是lambda,另一个是W。

  • lambda 的值可理解为线程池处理任务的速率,即:\(速率= \frac{执行成功的任务个数}{这些任务总耗时}\),总耗时为任务的排队时间加上处理时间。
  • w 是请求的平均响应时间。一个SEARCH请求,最终是转化成Runnable任务在线程池中提交执行的,那么这里的平均响应时间,我的理解是:Runnable任务的排队等待时间和执行时间,并不是通常意义上我们看到的一个Client发送SEARCH请求,ES将搜索结果返回给Client这个过程的时间,因为这个过程显然包含了网络延时。

在ES中,这个平均响应时间可以在配置文件中指定,若未指定,则默认为1s。代码如下:AutoQueueAdjustingExecutorBuilder的构造方法中将响应时间配置为1s

  1. final String targetedResponseTimeKey = settingsKey(prefix, "target_response_time");
  2. this.targetedResponseTimeSetting = Setting.timeSetting(targetedResponseTimeKey, TimeValue.timeValueSeconds(1),
  3. TimeValue.timeValueMillis(10), Setting.Property.NodeScope);

统计线程池任务的执行个数和总耗时,是在 afterExecute 方法中完成的,ES自定义线程池重写了ThreadPoolExecutor.afterExecute 方法,每当线程池中的任务执行完成时,会自动调用afterExecute方法做一些"后处理"

  1. @Override
  2. protected void afterExecute(Runnable r, Throwable t) {
  3. //重写 afterExecute 方法时,要先调用 super.afterExecute
  4. super.afterExecute(r, t);
  5. // A task has been completed, it has left the building. We should now be able to get the
  6. // total time as a combination of the time in the queue and time spent running the task. We
  7. // only want runnables that did not throw errors though, because they could be fast-failures
  8. // that throw off our timings, so only check when t is null.
  9. //只统计 类型为TimedRunnable任务 的执行时间和任务个数
  10. assert r instanceof TimedRunnable : "expected only TimedRunnables in queue";
  11. //单个任务的耗时(排队时间加上执行时间)
  12. final long taskNanos = ((TimedRunnable) r).getTotalNanos();
  13. //所有任务的总耗时(每个任务的耗时累加求和)
  14. final long totalNanos = totalTaskNanos.addAndGet(taskNanos);
  15. //单个任务的执行时间(其实就是单个任务的耗时减去排队时间)
  16. final long taskExecutionNanos = ((TimedRunnable) r).getTotalExecutionNanos();
  17. assert taskExecutionNanos >= 0 : "expected task to always take longer than 0 nanoseconds, got: " + taskExecutionNanos;
  18. executionEWMA.addValue(taskExecutionNanos);
  19. //tasksPerFrame默认为2000, 线程池每执行完一批任务(tasksPerFrame个)就进行一次任务队列长度的调整。
  20. if (taskCount.incrementAndGet() == this.tasksPerFrame) {
  21. final long endTimeNs = System.nanoTime();
  22. //线程池从启动时刻(startNs)开始,一共运行了多长时间(注意不仅仅Runnable任务有生命周期,线程池也是有生命周期的)
  23. final long totalRuntime = endTimeNs - this.startNs;
  24. // Reset the start time for all tasks. At first glance this appears to need to be
  25. // volatile, since we are reading from a different thread when it is set, but it
  26. // is protected by the taskCount memory barrier.
  27. // See: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/package-summary.html
  28. startNs = endTimeNs;
  29. // Calculate the new desired queue size
  30. try {
  31. //计算lambda,tasksPerFrame个任务执行成功的总时间是 totalNanos. 因此,lambda可理解为处理速率
  32. final double lambda = calculateLambda(tasksPerFrame, Math.max(totalNanos, 1L));
  33. //根据 little's law 计算出来的任务队列的理想容量(任务队列所允许的最大长度)
  34. final int desiredQueueSize = calculateL(lambda, targetedResponseTimeNanos);
  35. //当前任务队列的长度
  36. final int oldCapacity = workQueue.capacity();
  37. if (logger.isDebugEnabled()) {
  38. final long avgTaskTime = totalNanos / tasksPerFrame;
  39. logger.debug("[{}]: there were [{}] tasks in [{}], avg task time [{}], EWMA task execution [{}], " +
  40. "[{} tasks/s], optimal queue is [{}], current capacity [{}]",
  41. getName(),
  42. tasksPerFrame,
  43. TimeValue.timeValueNanos(totalRuntime),
  44. TimeValue.timeValueNanos(avgTaskTime),
  45. TimeValue.timeValueNanos((long)executionEWMA.getAverage()),
  46. String.format(Locale.ROOT, "%.2f", lambda * TimeValue.timeValueSeconds(1).nanos()),
  47. desiredQueueSize,
  48. oldCapacity);
  49. }
  50. // Adjust the queue size towards the desired capacity using an adjust of
  51. // QUEUE_ADJUSTMENT_AMOUNT (either up or down), keeping in mind the min and max
  52. // values the queue size can have.
  53. // 将任务队列的容量从 oldCapacity 调整到 newCapacity,并不是直接将任务队列的长度调整到desiredQueueSize
  54. final int newCapacity =
  55. workQueue.adjustCapacity(desiredQueueSize, QUEUE_ADJUSTMENT_AMOUNT, minQueueSize, maxQueueSize);
  56. if (oldCapacity != newCapacity && logger.isDebugEnabled()) {
  57. logger.debug("adjusted [{}] queue size by [{}], old capacity: [{}], new capacity: [{}]", getName(),
  58. newCapacity > oldCapacity ? QUEUE_ADJUSTMENT_AMOUNT : -QUEUE_ADJUSTMENT_AMOUNT,
  59. oldCapacity, newCapacity);
  60. }
  61. } catch (ArithmeticException e) {
  62. // There was an integer overflow, so just log about it, rather than adjust the queue size
  63. logger.warn(() -> new ParameterizedMessage(
  64. "failed to calculate optimal queue size for [{}] thread pool, " +
  65. "total frame time [{}ns], tasks [{}], task execution time [{}ns]",
  66. getName(), totalRuntime, tasksPerFrame, totalNanos),
  67. e);
  68. } finally {
  69. // Finally, decrement the task count and time back to their starting values. We
  70. // do this at the end so there is no concurrent adjustments happening. We also
  71. // decrement them instead of resetting them back to zero, as resetting them back
  72. // to zero causes operations that came in during the adjustment to be uncounted
  73. int tasks = taskCount.addAndGet(-this.tasksPerFrame);
  74. assert tasks >= 0 : "tasks should never be negative, got: " + tasks;
  75. if (tasks >= this.tasksPerFrame) {
  76. // Start over, because we can potentially reach a "never adjusting" state,
  77. //
  78. // consider the following:
  79. // - If the frame window is 10, and there are 10 tasks, then an adjustment will begin. (taskCount == 10)
  80. // - Prior to the adjustment being done, 15 more tasks come in, the taskCount is now 25
  81. // - Adjustment happens and we decrement the tasks by 10, taskCount is now 15
  82. // - Since taskCount will now be incremented forever, it will never be 10 again,
  83. // so there will be no further adjustments
  84. logger.debug(
  85. "[{}]: too many incoming tasks while queue size adjustment occurs, resetting measurements to 0", getName());
  86. //任务队列的长度调整完成后,总任务耗时重置为1,这样可开始下一轮统计
  87. totalTaskNanos.getAndSet(1);
  88. taskCount.getAndSet(0);
  89. startNs = System.nanoTime();
  90. } else {
  91. // Do a regular adjustment
  92. totalTaskNanos.addAndGet(-totalNanos);
  93. }
  94. }
  95. }
  96. }

上面的代码注释大概描述了线程池任务队列的长度是如何动态调整的,下面再记录一些细节方便更好地理解整个调整过程。

1 线程池也是有生命周期的

关于线程池状态的描述可参考java.util.concurrent.ThreadPoolExecutor类的源码注释。当线程池处于RUNNING状态时,可接收新提交的任务并且能处理已在队列中排队的任务;当处于SHUTDOWN状态时,不能接收新提交的任务,但能处理已在队列中排队等待的任务。当处于STOP状态时,不能接收新提交的任务了,也不能处理在任务队列中排队等待的任务了,正在执行中的任务也会被强制中断。所以,要想"正确"地关闭线程池,应该分步骤处理:这里给一个ES中实现的处理定时任务的线程池如何关闭的示例:
org.elasticsearch.threadpool.Scheduler.terminate

  1. static boolean terminate(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, long timeout, TimeUnit timeUnit) {
  2. //先调用 shutdown(), 线程池不再接收新提交的任务了
  3. scheduledThreadPoolExecutor.shutdown();
  4. //超时等待, 如果在timeout时间内线程池中排队的任务和正在执行的任务都执行完成了返回true,否则返回false
  5. if (awaitTermination(scheduledThreadPoolExecutor, timeout, timeUnit)) {
  6. return true;
  7. }
  8. //last resort. 在上面awaitTermination timeout后线程池中仍有任务在执行
  9. //调用shutdownNow强制中断任务,关闭线程池
  10. scheduledThreadPoolExecutor.shutdownNow();
  11. return awaitTermination(scheduledThreadPoolExecutor, timeout, timeUnit);
  12. }

这种先调用shutdown,再调用 awaitTermination,最后再调用shutdownNow的“三步曲”方式关闭线程池,awaitTermination起到了"缓冲"作用,尽可能减少关闭线程池导致的任务执行结果不确定的影响。看JDK源码:java.util.concurrent.ScheduledThreadPoolExecutor.shutdownNow,可知:关闭线程池时,最好不要一开始就直接调用shutdownNow方法,而是分步骤地关闭线程池。

  1. /**
  2. * Attempts to stop all actively executing tasks, halts the
  3. * processing of waiting tasks, and returns a list of the tasks
  4. * that were awaiting execution.
  5. *
  6. * <p>This method does not wait for actively executing tasks to
  7. * terminate. Use {@link #awaitTermination awaitTermination} to
  8. * do that.
  9. *
  10. * <p>There are no guarantees beyond best-effort attempts to stop
  11. * processing actively executing tasks. This implementation
  12. * cancels tasks via {@link Thread#interrupt}, so any task that
  13. * fails to respond to interrupts may never terminate.
  14. *
  15. * @return list of tasks that never commenced execution.
  16. * Each element of this list is a {@link ScheduledFuture},
  17. * including those tasks submitted using {@code execute},
  18. * which are for scheduling purposes used as the basis of a
  19. * zero-delay {@code ScheduledFuture}.
  20. * @throws SecurityException {@inheritDoc}
  21. */
  22. public List<Runnable> shutdownNow() {
  23. return super.shutdownNow();
  24. }

shutdownNow方法会停止所有正在执行的任务(线程),stop all actively executing tasks。会中止所有处于等待状态的任务 halts the processing of waiting tasks,这里的waiting tasks,我的理解:就是在java.lang.Thread.State类中那些处于WAITING状态的线程所执行的任务。并且,shutdownNow返回所有在任务队列中排队等待处理的所有任务 returns a list of the tasks that were awaiting execution.

shutdownNow方法不会等待正在执行的任务执行完成,而是通过中断方式直接请求中断该任务,This method does not wait for actively executing tasks to terminate。由于,有些任务(线程)可能会忽略中断请求、甚至屏蔽中断请求,因此它只能做到 best-effort 结束线程。对于那些未能响应中断的线程而言,有可能它所执行的任务就永远不会结束了,so any task that fails to respond to interrupts may never terminate.
因此,从这里可知:我们在编程中 implements Runnable 接口时,run方法代码逻辑里面最好能够保证对中断异常的响应,而不是直接把所有的异常都catch住,只做简单的打印处理,也不向上抛出。

2 ResizableBlockingQueue并不是每次执行完一个任务就进行一次调整

这样显然代价太大。而是执行完一批任务后,再进行调整。每批任务默认2000个,由tasksPerFrame变量决定每批任务个数。

任务队列的调整长度是有上限的,每次最多调整 QUEUE_ADJUSTMENT_AMOUNT(默认50)

任务队列长度的调整并不是直接调整到little's law 计算出来的理想任务队列长度(desiredQueueSize)。每次调整是有限制的,长度的变化不超过QUEUE_ADJUSTMENT_AMOUNT

  1. if (optimalCapacity > capacity + adjustmentAmount) {
  2. // adjust up
  3. final int newCapacity = Math.min(maxCapacity, capacity + adjustmentAmount);
  4. this.capacity = newCapacity;
  5. return newCapacity;
  6. } else if (optimalCapacity < capacity - adjustmentAmount) {
  7. // adjust down
  8. final int newCapacity = Math.max(minCapacity, capacity - adjustmentAmount);
  9. this.capacity = newCapacity;
  10. return newCapacity;
  11. } else {
  12. return this.capacity;
  13. }

总结

本文记录了ES6.3.2 SEARCH线程池的源码实现。用户发起的搜索请求会封装成SEARCH操作。SEARCH操作的任务是由QueueResizingEsThreadPoolExecutor处理的,采用的任务队列是 ResizableBlockingQueue,ResizableBlockingQueue封装了LinkedTransferQueue,但是提供了容量限制。
随着源源不断的搜索请求被处理,可动态调整任务队列的容量。SEARCH线程池采用的拒绝策略是 EsAbortPolicy,搜索请求太频繁时线程池处理不过来时会被拒绝掉。
通过将Runnable任务封装成TimedRunnable,可实现统计每个搜索任务的执行时间、排队时间。这些统计都是在线程池的afterExecute()方法中实现的。
另外,本文还分析了如何正确地关闭线程池,以及不恰当地关闭线程池给任务的执行结果带来的不确定性的分析。看完ES的线程池模块的源码后,对线程池的认识和理解深刻了许多,后面还会分析在ES中如何实现执行定时任务、周期性任务的线程池,这种线程池可用来执行一些周期性的 ping 命令(节点之间的心跳包)等ES节点之间的通信。以上若有错误,还请批评指正。

参考链接:

到这里,ES的线程池模块所有源码分析都结束了。总体来说,ES对线程池的管理是"集中式"的,试想:一个大型系统,里面有各种各样复杂的操作,是将线程池散落在代码各处呢,还是在系统启动时创建好,然后统一集中管理?
另外,由于JDK java.util.concurrent.Future#get()获取任务的执行结果时必须"阻塞",另一个方法 java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) 也是超时阻塞,这意味着线程池在提交任务执行后,在获取结果这个步骤是必须阻塞等待的。那有没有一种方法在获取结果时也不阻塞呢?这就需要Listener机制(监听器)了,Listener其实就是一种处理逻辑,一种怎样处理某个结果(Runnable/Callable执行完成的结果)的处理逻辑。其大概思想是:当Runnable(Callable)任务执行完成后,有了结果,回调Listener,执行 处理结果的逻辑。这样,就不用像 java.util.concurrent.Future#get() 那样,get()阻塞直至获取到结果,然后再执行某种处理逻辑 处理 get()获取到的结果。

最后扯一扯看源码的一些体会:当开始看一个系统的源代码时,一般是先用过它了,在使用的过程中了解了一些功能,然后不满足于现状,想要了解背后的原理。那面对一个几十万行代码的系统,从哪个地方入手开始看呢?我觉得有以下几点可供参考:

  • docs 了解、官方文档、github issue。
  • 系统的启动流程debug,从main函数开始,调试跟踪它是怎么启动的?
  • 优先看一些基础模块(比如ES的线程池模块),一是基础模块一般代码比较少,不涉及到具体的使用功能,而且是通用的,并不需要"领域特定知识"。作为一个新手,对某个功能的流程都不清楚,直接看功能模块源码,很容易陷入细节,懵住。
  • 对自己使用过的功能,尝试阅读它的源码实现。因为,你用了这个功能解决了问题,有好奇心、带着问题驱动,再去读源码细节实现好一些。
  • 再说一个更具体的,当阅读一个复杂的JAVA类 时,里面有很多实例变量(方法)、我一般是挑基础的类入手,基础的类是系统的"公共服务",很重要,其他地方又用到了它,理解了它,会更好地理解其他地方的代码。基础的类一般是作为其他类的实例变量,提供一个小功能,简单。

原文:https://www.cnblogs.com/hapjin/p/11011712.html

原文链接:http://www.cnblogs.com/hapjin/p/11011712.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号