经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 移动开发 » Android » 查看文章
Android开发中线程池源码解析
来源:jb51  时间:2021/11/23 16:30:44  对本文有异议

线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。----摘自维基百科

我们在Android或者Java开发中,日常所使用的就是ThreadPoolExecutor了,我们先来看下如何使用一个线程池来代替多线程开发。

使用线程池

  1. // 创建一个核心线程数为5,最大线程数为10,空闲线程存活时间为60s的线程池对象
  2. val threadPoolExecutor = ThreadPoolExecutor(
  3. 5, 10, 60,
  4. TimeUnit.MINUTES,
  5. ArrayBlockingQueue<Runnable>(100),
  6. RejectedExecutionHandler { _, _ -> println("reject submit thread to thread pool") }
  7. )
  8. // 测试
  9. for (i in 1..10) {
  10. threadPoolExecutor.execute { println("execute thread is:${Thread.currentThread().name}") }
  11. }
  12. // 结果
  13. // execute thread is:pool-1-thread-1
  14. // execute thread is:pool-1-thread-1
  15. // execute thread is:pool-1-thread-1
  16. // execute thread is:pool-1-thread-1
  17. // execute thread is:pool-1-thread-5
  18. // execute thread is:pool-1-thread-5
  19. // execute thread is:pool-1-thread-4
  20. // execute thread is:pool-1-thread-3
  21. // execute thread is:pool-1-thread-2
  22. // execute thread is:pool-1-thread-1

从结果就可以看出来,执行时间操作,但是只创建了5个线程,另外5次都是复用线程的。这样就达到了复用存在的线程、减少对象的创建和销毁的额外开销;并且可以控制最大线程数,也就是控制了最大并发数。

知道如何使用一个线程池还不够,我们需要看看ThreadPoolExecutor是如何创建、复用这些线程的。下面我们看看创建ThreadPoolExecutor对象的几个参数:

构造方法

  1. /**
  2. * 创建一个ThreadPoolExecutor对象
  3. *
  4. * @param corePoolSize 核心线程数,这些线程会一直在线程池中,除非设置了 allowCoreThreadTimeOut
  5. * @param maximumPoolSize 最大线程数,运行线程创建的最大值
  6. * @param keepAliveTime 当线程数>核心线程数的时候,这个值就是空闲且非核心线程存活的时间
  7. * @param unit keepAliveTime的单位
  8. * @param workQueue 保存task的队列,直到执行execute()方法执行
  9. * @param threadFactory ThreadFactory是一个接口,里面只有Thread newThread(Runnable r)方法,用来创建线程,
  10. * 默认采用Executors.defaultThreadFactory()
  11. * @param handler 拒绝处理任务时的策略,如果线程池满了且所有线程都不处于空闲状态,
  12. * 通过RejectedExecutionHandler接口的rejectedExecution(Runnable r, ThreadPoolExecutor executor)来处理传进来的Runnable
  13. * 系统提供了四种:CallerRunsPolicy(), AbortPolicy(), DiscardPolicy(), DiscardOldestPolicy()
  14. * 默认采用new AbortPolicy()
  15. */
  16. public ThreadPoolExecutor(int corePoolSize,
  17. int maximumPoolSize,
  18. long keepAliveTime,
  19. TimeUnit unit,
  20. BlockingQueue<Runnable> workQueue,
  21. ThreadFactory threadFactory,
  22. RejectedExecutionHandler handler){
  23. if (corePoolSize < 0 ||
  24. maximumPoolSize <= 0 ||
  25. maximumPoolSize < corePoolSize ||
  26. keepAliveTime < 0)
  27. throw new IllegalArgumentException();
  28. if (workQueue == null || threadFactory == null || handler == null)
  29. throw new NullPointerException();
  30. this.acc = System.getSecurityManager() == null ?
  31. null :
  32. AccessController.getContext();
  33. this.corePoolSize = corePoolSize;
  34. this.maximumPoolSize = maximumPoolSize;
  35. this.workQueue = workQueue;
  36. this.keepAliveTime = unit.toNanos(keepAliveTime);
  37. this.threadFactory = threadFactory;
  38. this.handler = handler;
  39. }

我在方法头注释中我都一一解释了几个参数的作用,还有几点需要注意的就是:

  • 核心线程数不能小于0;
  • 最大线程数不能小于0;
  • 最大线程数不能小于核心线程数;
  • 空闲线程的存活时间不能小于0;

通过上面的解释我们很明白的知道前面几个参数的作用,但是最后两个参数我们并不能通过表面的解释通晓它,既然不能通过表象看懂他俩,那就看看默认的实现是如何做的,这样在接下来的源码分析中很有帮助。

ThreadFactory:线程工厂

ThreadFactory 是一个接口,里面只由唯一的 Thread newThread(Runnable r); 方法,此方法是用来创建线程的,从接口中我们得到的就只有这么多,下面我们看看 Executors 默认的 DefaultThreadFactory 类:

  1. // 静态内部类
  2. static class DefaultThreadFactory implements ThreadFactory {
  3. // 线程池的标识,从1开始没创建一个线程池+1
  4. private static final AtomicInteger poolNumber = new AtomicInteger(1);
  5. // 线程组
  6. private final ThreadGroup group;
  7. // 线程名中的结尾标识,从1开始每创建一个线程+1
  8. private final AtomicInteger threadNumber = new AtomicInteger(1);
  9. // 线程名
  10. private final String namePrefix;
  11. DefaultThreadFactory() {
  12. SecurityManager s = System.getSecurityManager();
  13. group = (s != null) ? s.getThreadGroup() :
  14. Thread.currentThread().getThreadGroup();
  15. namePrefix = "pool-" +
  16. poolNumber.getAndIncrement() +
  17. "-thread-";
  18. }
  19. public Thread newThread(Runnable r) {
  20. Thread t = new Thread(group, r,
  21. namePrefix + threadNumber.getAndIncrement(),
  22. 0);
  23. if (t.isDaemon())
  24. t.setDaemon(false);
  25. if (t.getPriority() != Thread.NORM_PRIORITY)
  26. t.setPriority(Thread.NORM_PRIORITY);
  27. return t;
  28. }
  29. }

RejectedExecutionHandler:拒绝处理任务的策略

RejectedExecutionHandler 也是一个接口,并且也只提供了唯一的 void rejectedExecution(Runnable r, ThreadPoolExecutor executor); 方法。我们可以自定义策略,也可以用上面提到的封装好的四种策略,先看一下四种策略分别怎么拒绝任务的:

CallerRunsPolicy

  1. public static class CallerRunsPolicy implements RejectedExecutionHandler {
  2. /**
  3. * Creates a {@code CallerRunsPolicy}.
  4. */
  5. public CallerRunsPolicy() {
  6. }
  7. /**
  8. * 如果线程池还没关闭,那么就再次执行这个Runnable
  9. */
  10. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  11. if (!e.isShutdown()) {
  12. r.run();
  13. }
  14. }
  15. }

AbortPolicy

  1. public static class AbortPolicy implements RejectedExecutionHandler {
  2. /**
  3. * Creates an {@code AbortPolicy}.
  4. */
  5. public AbortPolicy() {
  6. }
  7. /**
  8. * 这个策略就是抛出异常,不做其他处理
  9. */
  10. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  11. throw new RejectedExecutionException("Task " + r.toString() +
  12. " rejected from " +
  13. e.toString());
  14. }
  15. }

DiscardPolicy

  1. public static class DiscardPolicy implements RejectedExecutionHandler {
  2. /**
  3. * Creates a {@code DiscardPolicy}.
  4. */
  5. public DiscardPolicy() {
  6. }
  7. /**
  8. * 什么也不做,也就是抛弃了这个Runnable
  9. */
  10. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  11. }
  12. }

DiscardOldestPolicy

  1. public static class DiscardOldestPolicy implements RejectedExecutionHandler {
  2. /**
  3. * Creates a {@code DiscardOldestPolicy} for the given executor.
  4. */
  5. public DiscardOldestPolicy() {
  6. }
  7. /**
  8. * 1. 线程池未关闭
  9. * 2. 获取队列中的下一个Runnable
  10. * 3. 获取到了,但是不对它进行处理,也就是抛弃它
  11. * 4. 执行我们传过来的这个Runnable
  12. */
  13. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  14. if (!e.isShutdown()) {
  15. e.getQueue().poll();
  16. e.execute(r);
  17. }
  18. }
  19. }

重要的参数

除了上述构造方法中的几个参数外,线程池还有几个比较核心的参数,如下:

  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. // ctl 的低29位表示线程池中的线程数,高3位表示当前线程状态
  3. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  4. // 29
  5. private static final int COUNT_BITS = Integer.SIZE - 3;
  6. // (2^29) -1
  7. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  8. // 运行状态:接受新任务并处理排队的任务
  9. private static final int RUNNING = -1 << COUNT_BITS;
  10. // 关闭状态:不接受新任务,但处理排队的任务
  11. private static final int SHUTDOWN = 0 << COUNT_BITS;
  12. // 停止状态:不接受新任务,不处理排队的任务,中断正在进行的任务
  13. private static final int STOP = 1 << COUNT_BITS;
  14. // 整理状态:整理状态,所有任务已终止,workerCount为零,线程将运行terminate()方法
  15. private static final int TIDYING = 2 << COUNT_BITS;
  16. // 终止状态:terminate()方法执行完成
  17. private static final int TERMINATED = 3 << COUNT_BITS;
  18. // 表示线程是否允许或停止
  19. private static int runStateOf(int c) { return c & ~CAPACITY; }
  20. // 线程的有效数量
  21. private static int workerCountOf(int c) { return c & CAPACITY; }
  22. private static int ctlOf(int rs, int wc) { return rs | wc; }
  23. ......后面的源码暂时省略
  24. }

execute:执行

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. int c = ctl.get();
  5. // 如果运行中的线程数小于核心线程数,执行addWorker(command, true)创建新的核心Thread执行任务
  6. if (workerCountOf(c) < corePoolSize) {
  7. if (addWorker(command, true))
  8. return;
  9. c = ctl.get();
  10. }
  11. // 1. 已经满足:运行中的线程数大于核心线程数,但是小于最大线程数
  12. // 2. 需要满足:线程池在运行状态
  13. // 3. 需要满足:添加到工作队列中成功
  14. if (isRunning(c) && workQueue.offer(command)) {
  15. int recheck = ctl.get();
  16. // 如果线程不在运行状态,就从工作队列中移除command
  17. // 并且执行拒绝策略
  18. if (!isRunning(recheck) && remove(command))
  19. reject(command);
  20. // 线程池处于运行状态,但是没有线程,则addWorker(null, false)
  21. // 至于这里为什么要传入一个null,因为在最外层的if条件中我们已经将Runnable添加到工作队列中了
  22. // 而且在runWorker()源码中也可以得到答案,如果传入的Runnable为空,就会去工作队列中取task。
  23. else if (workerCountOf(recheck) == 0)
  24. addWorker(null, false);
  25. }
  26. // 执行addWorker()创建新的非核心线程Thread执行任务
  27. // addWorker() 失败,执行拒绝策略
  28. else if (!addWorker(command, false))
  29. reject(command);
  30. }

从上面源码中可以看出,execute()一个新的任务,主要有以下这几种情况:

1、核心线程未满,直接新建核心线程并执行任务;
2、核心线程满了,工作队列未满,将任务添加到工作队列中;
3、核心线程和工作队列都满,但是最大线程数未达到,新建线程并执行任务;
4、上面条件都不满足,那么就执行拒绝策略。

更形象的可以看下方流程图:

添加任务的流程图

addWorker(Runnable , boolean):添加Worker

  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. // 标记外循环,比如在内循环中break retry就直接跳出外循环
  3. retry:
  4. for (; ; ) {
  5. int c = ctl.get();
  6. int rs = runStateOf(c);
  7. // 直接返回false有以下3种情况:
  8. // 1. 线程池状态为STOP、TIDYING、TERMINATED
  9. // 2. 线程池状态不是running状态,并且firstTask不为空
  10. // 3. 线程池状态不是running状态,并且工作队列为空
  11. if (rs >= SHUTDOWN &&
  12. !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
  13. return false;
  14. for (; ; ) {
  15. int wc = workerCountOf(c);
  16. // 如果添加的是核心线程,但是运行的线程数大于等于核心线程数,那么就不添加了,直接返回
  17. // 如果添加的是非核心线程,但是运行的线程数大于等于最大线程数,那么也不添加,直接返回
  18. if (wc >= CAPACITY ||
  19. wc >= (core ? corePoolSize : maximumPoolSize))
  20. return false;
  21. // 增加workerCount的值 +1
  22. if (compareAndIncrementWorkerCount(c))
  23. // 跳出外循环
  24. break retry;
  25. c = ctl.get(); // 重新检查线程池状态
  26. if (runStateOf(c) != rs)
  27. continue retry;
  28. // 重新检查的状态和之前不合,再次从外循环进入
  29. }
  30. }
  31. boolean workerStarted = false;
  32. boolean workerAdded = false;
  33. Worker w = null;
  34. try {
  35. w = new Worker(firstTask);
  36. final Thread t = w.thread;
  37. if (t != null) {
  38. // 线程池重入锁
  39. final ReentrantLock mainLock = this.mainLock;
  40. // 获得锁
  41. mainLock.lock();
  42. try {
  43. // Recheck while holding lock.
  44. // Back out on ThreadFactory failure or if
  45. // shut down before lock acquired.
  46. int rs = runStateOf(ctl.get());
  47. // 线程池在运行状态或者是线程池关闭同时Runnable也为空
  48. if (rs < SHUTDOWN ||
  49. (rs == SHUTDOWN && firstTask == null)) {
  50. if (t.isAlive()) // precheck that t is startable
  51. throw new IllegalThreadStateException();
  52. // 想Worker中添加新的Worker
  53. workers.add(w);
  54. int s = workers.size();
  55. if (s > largestPoolSize)
  56. largestPoolSize = s;
  57. workerAdded = true;
  58. }
  59. } finally {
  60. // 释放锁
  61. mainLock.unlock();
  62. }
  63. // 如果添加成功,启动线程
  64. if (workerAdded) {
  65. t.start();
  66. workerStarted = true;
  67. }
  68. }
  69. } finally {
  70. if (!workerStarted)
  71. addWorkerFailed(w);
  72. }
  73. return workerStarted;
  74. }

addWorker() 主要就是在满足种种条件(上述源码中解释了)后,新建一个Worker对象,并添加到HashSet<Worker> workers中去,最后调用新建Worker对象的Thread变量的start()方法。

Worker类

Worker是一个继承了AQS并实现了Runnable的内部类,我们重点看看它的run()方法,因为上面addWorker()中,t.start()触发的就是它的run()方法:

  1. private final class Worker
  2. extends AbstractQueuedSynchronizer
  3. implements Runnable {
  4. /**
  5. * This class will never be serialized, but we provide a
  6. * serialVersionUID to suppress a javac warning.
  7. */
  8. private static final long serialVersionUID = 6138294804551838833L;
  9. /**
  10. * Thread this worker is running in. Null if factory fails.
  11. */
  12. final Thread thread;
  13. /**
  14. * Initial task to run. Possibly null.
  15. */
  16. Runnable firstTask;
  17. /**
  18. * Per-thread task counter
  19. */
  20. volatile long completedTasks;
  21. /**
  22. * Creates with given first task and thread from ThreadFactory.
  23. *
  24. * @param firstTask the first task (null if none)
  25. */
  26. Worker(Runnable firstTask) {
  27. setState(-1); // inhibit interrupts until runWorker
  28. this.firstTask = firstTask;
  29. // 这边是把Runnable传给了Thread,也就是说Thread.run()就是执行了下面的run()方法
  30. this.thread = getThreadFactory().newThread(this);
  31. }
  32. /**
  33. * Delegates main run loop to outer runWorker
  34. */
  35. public void run() {
  36. runWorker(this);
  37. }
  38. }

run()方法实际调用了runWorker(Worker)方法

runWorker(Worker)方法:

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. Runnable task = w.firstTask;
  4. w.firstTask = null;
  5. w.unlock(); // 释放锁,允许中断
  6. boolean completedAbruptly = true;
  7. try {
  8. // 1. worker中的task不为空
  9. // 2. 如果worker的task为空,那么取WorkerQueue的task
  10. while (task != null || (task = getTask()) != null) {
  11. w.lock();
  12. // If pool is stopping, ensure thread is interrupted;
  13. // if not, ensure thread is not interrupted. This
  14. // requires a recheck in second case to deal with
  15. // shutdownNow race while clearing interrupt
  16. if ((runStateAtLeast(ctl.get(), STOP) ||
  17. (Thread.interrupted() &&
  18. runStateAtLeast(ctl.get(), STOP))) &&
  19. !wt.isInterrupted())
  20. wt.interrupt();
  21. try {
  22. // 这是一个空方法,可由子类实现
  23. beforeExecute(wt, task);
  24. Throwable thrown = null;
  25. try {
  26. // 执行task
  27. task.run();
  28. }
  29. .... 省略
  30. // 这是一个空方法,可由子类实现
  31. finally {
  32. afterExecute(task, thrown);
  33. }
  34. } finally {
  35. task = null;
  36. w.completedTasks++;
  37. w.unlock();
  38. }
  39. }
  40. completedAbruptly = false;
  41. } finally {
  42. processWorkerExit(w, completedAbruptly);
  43. }
  44. }

getTask():

  1. ```java
  2. private Runnable getTask() {
  3. // 进入死循环
  4. for (; ; ) {
  5. try {
  6. // 为true的条件:
  7. // allowCoreThreadTimeOut=true: 核心线程需根据keepAliveTime超时等待
  8. // 核心线程数已满
  9. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  10. // 如果timed为true,执行BlockQueue.poll(),这个操作在取不到task的时候会等待keepAliveTime,然后返回null
  11. // 如果timed为false,执行BlockQueue.take(),这个操作在队列为空的时候一直阻塞
  12. Runnable r = timed ?
  13. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  14. workQueue.take();
  15. if (r != null)
  16. return r;
  17. }
  18. }
  19. }
  20. ```

线程池的源码按照上述的几个方法(execute(runnable) -> addWorker(runnable,core) -> Worker -> runWorker(worker) -> getTask())的顺序来分析,你就可以很清晰的将运作过程了解清楚,同事构造方法和几个重要的参数一定要懂,不然对于后面的源码分析很受阻碍,相信大家通过这篇文章可以加深对线程池的理解。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持w3xue。

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号