经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Spring Boot » 查看文章
Spring boot定时任务的原理及动态创建详解
来源:jb51  时间:2019/3/4 9:52:11  对本文有异议

v一、前言

定时任务一般是项目中都需要用到的,可以用于定时处理一些特殊的任务。这篇文章主要给大家介绍了关于Spring boot定时任务的原理及动态创建的相关内容,下面来一起看看详细的介绍吧

上周工作遇到了一个需求,同步多个省份销号数据,解绑微信粉丝。分省定时将销号数据放到SFTP服务器上,我需要开发定时任务去解析文件。因为是多省份,服务器、文件名规则、数据规则都不一定,所以要做成可配置是有一定难度的。数据规则这块必须强烈要求统一,服务器、文件名规则都可以从配置中心去读。每新增一个省份的配置,后台感知到后,动态生成定时任务。

v二、Springboot引入定时任务核心配置

  1. @Target(ElementType.TYPE)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Import(SchedulingConfiguration.class)
  4. @Documented
  5. public @interface EnableScheduling {
  6.  
  7. }
  8.  
  9. @Configuration
  10. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  11. public class SchedulingConfiguration {
  12.  
  13.  @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
  14.  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  15.  public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
  16.  return new ScheduledAnnotationBeanPostProcessor();
  17.  }
  18.  
  19. }

接下来主要看一下这个核心后置处理器:ScheduledAnnotationBeanPostProcessor 。

  1. @Override
  2. public Object postProcessAfterInitialization(Object bean, String beanName) {
  3.  if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
  4.   bean instanceof ScheduledExecutorService) {
  5.  // Ignore AOP infrastructure such as scoped proxies.
  6.  return bean;
  7.  }
  8.  
  9.  Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
  10.  if (!this.nonAnnotatedClasses.contains(targetClass)) {
  11.  Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
  12.   (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
  13.    Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
  14.     method, Scheduled.class, Schedules.class);
  15.    return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
  16.   });
  17.  if (annotatedMethods.isEmpty()) {
  18.   this.nonAnnotatedClasses.add(targetClass);
  19.   if (logger.isTraceEnabled()) {
  20.   logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
  21.   }
  22.  }
  23.  else {
  24.   // Non-empty set of methods
  25.   annotatedMethods.forEach((method, scheduledMethods) ->
  26.    scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
  27.   if (logger.isTraceEnabled()) {
  28.   logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
  29.    "': " + annotatedMethods);
  30.   }
  31.  }
  32.  }
  33.  return bean;
  34. }

1、处理Scheduled注解,通过ScheduledTaskRegistrar注册定时任务。

  1. private void finishRegistration() {
  2.  if (this.scheduler != null) {
  3.  this.registrar.setScheduler(this.scheduler);
  4.  }
  5.  
  6.  if (this.beanFactory instanceof ListableBeanFactory) {
  7.  Map<String, SchedulingConfigurer> beans =
  8.   ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
  9.  List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
  10.  AnnotationAwareOrderComparator.sort(configurers);
  11.  for (SchedulingConfigurer configurer : configurers) {
  12.   configurer.configureTasks(this.registrar);
  13.  }
  14.  }
  15.  
  16.  if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
  17.  Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
  18.  try {
  19.   // Search for TaskScheduler bean...
  20.   this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
  21.  }
  22.  catch (NoUniqueBeanDefinitionException ex) {
  23.   logger.trace("Could not find unique TaskScheduler bean", ex);
  24.   try {
  25.   this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
  26.   }
  27.   catch (NoSuchBeanDefinitionException ex2) {
  28.   if (logger.isInfoEnabled()) {
  29.    logger.info("More than one TaskScheduler bean exists within the context, and " +
  30.     "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
  31.     "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
  32.     "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
  33.     ex.getBeanNamesFound());
  34.   }
  35.   }
  36.  }
  37.  catch (NoSuchBeanDefinitionException ex) {
  38.   logger.trace("Could not find default TaskScheduler bean", ex);
  39.   // Search for ScheduledExecutorService bean next...
  40.   try {
  41.   this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
  42.   }
  43.   catch (NoUniqueBeanDefinitionException ex2) {
  44.   logger.trace("Could not find unique ScheduledExecutorService bean", ex2);
  45.   try {
  46.    this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
  47.   }
  48.   catch (NoSuchBeanDefinitionException ex3) {
  49.    if (logger.isInfoEnabled()) {
  50.    logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
  51.     "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
  52.     "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
  53.     "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
  54.     ex2.getBeanNamesFound());
  55.    }
  56.   }
  57.   }
  58.   catch (NoSuchBeanDefinitionException ex2) {
  59.   logger.trace("Could not find default ScheduledExecutorService bean", ex2);
  60.   // Giving up -> falling back to default scheduler within the registrar...
  61.   logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
  62.   }
  63.  }
  64.  }
  65.  
  66.  this.registrar.afterPropertiesSet();
  67. }

  1、通过一系列的SchedulingConfigurer动态配置ScheduledTaskRegistrar。

  2、向ScheduledTaskRegistrar注册一个TaskScheduler(用于对Runnable的任务进行调度,它包含有多种触发规则)。

  3、registrar.afterPropertiesSet(),在这开始安排所有的定时任务开始执行了。

  1. protected void scheduleTasks() {
  2.  if (this.taskScheduler == null) {
  3.  this.localExecutor = Executors.newSingleThreadScheduledExecutor();
  4.  this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
  5.  }
  6.  if (this.triggerTasks != null) {
  7.  for (TriggerTask task : this.triggerTasks) {
  8.   addScheduledTask(scheduleTriggerTask(task));
  9.  }
  10.  }
  11.  if (this.cronTasks != null) {
  12.  for (CronTask task : this.cronTasks) {
  13.   addScheduledTask(scheduleCronTask(task));
  14.  }
  15.  }
  16.  if (this.fixedRateTasks != null) {
  17.  for (IntervalTask task : this.fixedRateTasks) {
  18.   addScheduledTask(scheduleFixedRateTask(task));
  19.  }
  20.  }
  21.  if (this.fixedDelayTasks != null) {
  22.  for (IntervalTask task : this.fixedDelayTasks) {
  23.   addScheduledTask(scheduleFixedDelayTask(task));
  24.  }
  25.  }
  26. }

  1、TriggerTask:动态定时任务。通过Trigger#nextExecutionTime 给定的触发上下文确定下一个执行时间。

  2、CronTask:动态定时任务,TriggerTask子类。通过cron表达式确定的时间触发下一个任务执行。

  3、IntervalTask:一定时间延迟之后,周期性执行的任务。

  4、taskScheduler 如果为空,默认是ConcurrentTaskScheduler,并使用默认单线程的ScheduledExecutor。

v三、主要看一下CronTask工作原理

  1. ScheduledTaskRegistrar.java
  2. @Nullable
  3. public ScheduledTask scheduleCronTask(CronTask task) {
  4.  ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
  5.  boolean newTask = false;
  6.  if (scheduledTask == null) {
  7.  scheduledTask = new ScheduledTask(task);
  8.  newTask = true;
  9.  }
  10.  if (this.taskScheduler != null) {
  11.  scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
  12.  }
  13.  else {
  14.  addCronTask(task);
  15.  this.unresolvedTasks.put(task, scheduledTask);
  16.  }
  17.  return (newTask ? scheduledTask : null);
  18. }
  19.  
  20. ConcurrentTaskScheduler.java
  21. @Override
  22. @Nullable
  23. public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
  24.  try {
  25.  if (this.enterpriseConcurrentScheduler) {
  26.   return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
  27.  }
  28.  else {
  29.   ErrorHandler errorHandler =
  30.    (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
  31.   return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();
  32.  }
  33.  }
  34.  catch (RejectedExecutionException ex) {
  35.  throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
  36.  }
  37. }
  38.  
  39. ReschedulingRunnable.java
  40. @Nullable
  41. public ScheduledFuture<?> schedule() {
  42.  synchronized (this.triggerContextMonitor) {
  43.  this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
  44.  if (this.scheduledExecutionTime == null) {
  45.   return null;
  46.  }
  47.  long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
  48.  this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
  49.  return this;
  50.  }
  51. }
  52.  
  53. private ScheduledFuture<?> obtainCurrentFuture() {
  54.  Assert.state(this.currentFuture != null, "No scheduled future");
  55.  return this.currentFuture;
  56. }
  57.  
  58. @Override
  59. public void run() {
  60.  Date actualExecutionTime = new Date();
  61.  super.run();
  62.  Date completionTime = new Date();
  63.  synchronized (this.triggerContextMonitor) {
  64.  Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
  65.  this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
  66.  if (!obtainCurrentFuture().isCancelled()) {
  67.   schedule();
  68.  }
  69.  }
  70. }

  1、最终将task和trigger都封装到了ReschedulingRunnable中。

  2、ReschedulingRunnable实现了任务重复调度(schedule方法中调用调度器executor并传入自身对象,executor会调用run方法,run方法又调用了schedule方法)。

  3、ReschedulingRunnable schedule方法加了同步锁,只能有一个线程拿到下次执行时间并加入执行器的调度。

  4、不同的ReschedulingRunnable对象之间在线程池够用的情况下是不会相互影响的,也就是说满足线程池的条件下,TaskScheduler的schedule方法的多次调用是可以交叉执行的。

  1. ScheduledThreadPoolExecutor.java
  2. public ScheduledFuture<?> schedule(Runnable command,
  3.      long delay,
  4.      TimeUnit unit) {
  5.  if (command == null || unit == null)
  6.  throw new NullPointerException();
  7.  RunnableScheduledFuture<?> t = decorateTask(command,
  8.  new ScheduledFutureTask<Void>(command, null,
  9.      triggerTime(delay, unit)));
  10.  delayedExecute(t);
  11.  return t;
  12. }
  13.  
  14.  
  15. private void delayedExecute(RunnableScheduledFuture<?> task) {
  16.  if (isShutdown())
  17.  reject(task);
  18.  else {
  19.  super.getQueue().add(task);
  20.  if (isShutdown() &&
  21.   !canRunInCurrentRunState(task.isPeriodic()) &&
  22.   remove(task))
  23.   task.cancel(false);
  24.  else
  25.   ensurePrestart();
  26.  }
  27. }

  ScheduledFutureTask 工作原理如下图所示【太懒了,不想画图了,盗图一张】。

 

  1、ScheduledFutureTask会放入优先阻塞队列:ScheduledThreadPoolExecutor.DelayedWorkQueue(二叉最小堆实现)

  2、上图中的Thread对象即ThreadPoolExecutor.Worker,实现了Runnable接口

  1. /**
  2.  * Creates with given first task and thread from ThreadFactory.
  3.  * @param firstTask the first task (null if none)
  4.  */
  5. Worker(Runnable firstTask) {
  6.  setState(-1); // inhibit interrupts until runWorker
  7.  this.firstTask = firstTask;
  8.  this.thread = getThreadFactory().newThread(this);
  9. }
  10.  
  11. /** Delegates main run loop to outer runWorker */
  12. public void run() {
  13.  runWorker(this);
  14. }

  1、Worker中维护了Thread对象,Thread对象的Runnable实例即Worker自身

  2、ThreadPoolExecutor#addWorker方法中会创建Worker对象,然后拿到Worker中的thread实例并start,这样就创建了线程池中的一个线程实例

  3、Worker的run方法会调用ThreadPoolExecutor#runWorker方法,这才是任务最终被执行的地方,该方法示意如下

  (1)首先取传入的task执行,如果task是null,只要该线程池处于运行状态,就会通过getTask方法从workQueue中取任务。ThreadPoolExecutor的execute方法会在无法产生core线程的时候向  workQueue队列中offer任务。
getTask方法从队列中取task的时候会根据相关配置决定是否阻塞和阻塞多久。如果getTask方法结束,返回的是null,runWorker循环结束,执行processWorkerExit方法。
至此,该线程结束自己的使命,从线程池中“消失”。

  (2)在开始执行任务之前,会调用Worker的lock方法,目的是阻止task正在被执行的时候被interrupt,通过调用clearInterruptsForTaskRun方法来保证的(后面可以看一下这个方法),该线程没有自己的interrupt set了。

  (3)beforeExecute和afterExecute方法用于在执行任务前后执行一些自定义的操作,这两个方法是空的,留给继承类去填充功能。

我们可以在beforeExecute方法中抛出异常,这样task不会被执行,而且在跳出该循环的时候completedAbruptly的值是true,表示the worker died due to user exception,会用decrementWorkerCount调整wc。

  (4)因为Runnable的run方法不能抛出Throwables异常,所以这里重新包装异常然后抛出,抛出的异常会使当当前线程死掉,可以在afterExecute中对异常做一些处理。

  (5)afterExecute方法也可能抛出异常,也可能使当前线程死掉。

v四、动态创建定时任务

v  TaskConfiguration 配置类

  1. @Configuration
  2. @EnableScheduling
  3. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  4. public class TaskConfiguration {
  5.  
  6.  @Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)
  7.  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  8.  public ScheduledExecutorService scheduledAnnotationProcessor() {
  9.  return Executors.newScheduledThreadPool(5, new DefaultThreadFactory());
  10.  }
  11.  
  12.  private static class DefaultThreadFactory implements ThreadFactory {
  13.  private static final AtomicInteger poolNumber = new AtomicInteger(1);
  14.  private final ThreadGroup group;
  15.  private final AtomicInteger threadNumber = new AtomicInteger(1);
  16.  private final String namePrefix;
  17.  
  18.  DefaultThreadFactory() {
  19.   SecurityManager s = System.getSecurityManager();
  20.   group = (!= null) ? s.getThreadGroup() :
  21.    Thread.currentThread().getThreadGroup();
  22.   namePrefix = "pool-" +
  23.    poolNumber.getAndIncrement() +
  24.    "-schedule-";
  25.  }
  26.  
  27.  @Override
  28.  public Thread newThread(Runnable r) {
  29.   Thread t = new Thread(group, r,
  30.    namePrefix + threadNumber.getAndIncrement(),
  31.    0);
  32.   if (t.isDaemon()) {
  33.   t.setDaemon(false);
  34.   }
  35.   if (t.getPriority() != Thread.NORM_PRIORITY) {
  36.   t.setPriority(Thread.NORM_PRIORITY);
  37.   }
  38.   return t;
  39.  }
  40.  }
  41. }

  1、保证ConcurrentTaskScheduler不使用默认单线程的ScheduledExecutor,而是corePoolSize=5的线程池

  2、自定义线程池工厂类

v  DynamicTask 动态定时任务

  1. @Configuration
  2. public class DynamicTask implements SchedulingConfigurer {
  3.  private static Logger LOGGER = LoggerFactory.getLogger(DynamicTask.class);
  4.  
  5.  private static final ExecutorService es = new ThreadPoolExecutor(10, 20,
  6.    0L, TimeUnit.MILLISECONDS,
  7.    new LinkedBlockingQueue<>(10),
  8.    new DynamicTaskConsumeThreadFactory());
  9.  
  10.  
  11.  private volatile ScheduledTaskRegistrar registrar;
  12.  private final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
  13.  private final ConcurrentHashMap<String, CronTask> cronTasks = new ConcurrentHashMap<>();
  14.  
  15.  private volatile List<TaskConstant> taskConstants = Lists.newArrayList();
  16.  
  17.  @Override
  18.  public void configureTasks(ScheduledTaskRegistrar registrar) {
  19.   this.registrar = registrar;
  20.   this.registrar.addTriggerTask(() -> {
  21.      if (!CollectionUtils.isEmpty(taskConstants)) {
  22.       LOGGER.info("检测动态定时任务列表...");
  23.       List<TimingTask> tts = new ArrayList<>();
  24.       taskConstants
  25.         .forEach(taskConstant -> {
  26.          TimingTask tt = new TimingTask();
  27.          tt.setExpression(taskConstant.getCron());
  28.          tt.setTaskId("dynamic-task-" + taskConstant.getTaskId());
  29.          tts.add(tt);
  30.         });
  31.       this.refreshTasks(tts);
  32.      }
  33.     }
  34.     , triggerContext -> new PeriodicTrigger(5L, TimeUnit.SECONDS).nextExecutionTime(triggerContext));
  35.  }
  36.  
  37.  
  38.  public List<TaskConstant> getTaskConstants() {
  39.   return taskConstants;
  40.  }
  41.  
  42.  private void refreshTasks(List<TimingTask> tasks) {
  43.   //取消已经删除的策略任务
  44.   Set<String> taskIds = scheduledFutures.keySet();
  45.   for (String taskId : taskIds) {
  46.    if (!exists(tasks, taskId)) {
  47.     scheduledFutures.get(taskId).cancel(false);
  48.    }
  49.   }
  50.   for (TimingTask tt : tasks) {
  51.    String expression = tt.getExpression();
  52.    if (StringUtils.isBlank(expression) || !CronSequenceGenerator.isValidExpression(expression)) {
  53.     LOGGER.error("定时任务DynamicTask cron表达式不合法: " + expression);
  54.     continue;
  55.    }
  56.    //如果配置一致,则不需要重新创建定时任务
  57.    if (scheduledFutures.containsKey(tt.getTaskId())
  58.      && cronTasks.get(tt.getTaskId()).getExpression().equals(expression)) {
  59.     continue;
  60.    }
  61.    //如果策略执行时间发生了变化,则取消当前策略的任务
  62.    if (scheduledFutures.containsKey(tt.getTaskId())) {
  63.     scheduledFutures.remove(tt.getTaskId()).cancel(false);
  64.     cronTasks.remove(tt.getTaskId());
  65.    }
  66.    CronTask task = new CronTask(tt, expression);
  67.    ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
  68.    cronTasks.put(tt.getTaskId(), task);
  69.    scheduledFutures.put(tt.getTaskId(), future);
  70.   }
  71.  }
  72.  
  73.  private boolean exists(List<TimingTask> tasks, String taskId) {
  74.   for (TimingTask task : tasks) {
  75.    if (task.getTaskId().equals(taskId)) {
  76.     return true;
  77.    }
  78.   }
  79.   return false;
  80.  }
  81.  
  82.  @PreDestroy
  83.  public void destroy() {
  84.   this.registrar.destroy();
  85.  }
  86.  
  87.  public static class TaskConstant {
  88.   private String cron;
  89.   private String taskId;
  90.  
  91.   public String getCron() {
  92.    return cron;
  93.   }
  94.  
  95.   public void setCron(String cron) {
  96.    this.cron = cron;
  97.   }
  98.  
  99.   public String getTaskId() {
  100.    return taskId;
  101.   }
  102.  
  103.   public void setTaskId(String taskId) {
  104.    this.taskId = taskId;
  105.   }
  106.  }
  107.  
  108.  private class TimingTask implements Runnable {
  109.   private String expression;
  110.  
  111.   private String taskId;
  112.  
  113.   public String getTaskId() {
  114.    return taskId;
  115.   }
  116.  
  117.   public void setTaskId(String taskId) {
  118.    this.taskId = taskId;
  119.   }
  120.  
  121.   @Override
  122.   public void run() {
  123.    //设置队列大小10
  124.    LOGGER.error("当前CronTask: " + this);
  125.    DynamicBlockingQueue queue = new DynamicBlockingQueue(3);
  126.    es.submit(() -> {
  127.     while (!queue.isDone() || !queue.isEmpty()) {
  128.      try {
  129.       String content = queue.poll(500, TimeUnit.MILLISECONDS);
  130.       if (StringUtils.isBlank(content)) {
  131.        return;
  132.       }
  133.       LOGGER.info("DynamicBlockingQueue 消费:" + content);
  134.       TimeUnit.MILLISECONDS.sleep(500);
  135.      } catch (InterruptedException e) {
  136.       e.printStackTrace();
  137.      }
  138.     }
  139.    });
  140.  
  141.    //队列放入数据
  142.    for (int i = 0; i < 5; ++i) {
  143.     try {
  144.      queue.put(String.valueOf(i));
  145.      LOGGER.info("DynamicBlockingQueue 生产:" + i);
  146.     } catch (InterruptedException e) {
  147.      e.printStackTrace();
  148.     }
  149.    }
  150.    queue.setDone(true);
  151.   }
  152.  
  153.   public String getExpression() {
  154.    return expression;
  155.   }
  156.  
  157.   public void setExpression(String expression) {
  158.    this.expression = expression;
  159.   }
  160.  
  161.   @Override
  162.   public String toString() {
  163.    return ReflectionToStringBuilder.toString(this
  164.      , ToStringStyle.JSON_STYLE
  165.      , false
  166.      , false
  167.      , TimingTask.class);
  168.   }
  169.  
  170.  }
  171.  
  172.  /**
  173.   * 队列消费线程工厂类
  174.   */
  175.  private static class DynamicTaskConsumeThreadFactory implements ThreadFactory {
  176.   private static final AtomicInteger poolNumber = new AtomicInteger(1);
  177.   private final ThreadGroup group;
  178.   private final AtomicInteger threadNumber = new AtomicInteger(1);
  179.   private final String namePrefix;
  180.  
  181.   DynamicTaskConsumeThreadFactory() {
  182.    SecurityManager s = System.getSecurityManager();
  183.    group = (!= null) ? s.getThreadGroup() :
  184.      Thread.currentThread().getThreadGroup();
  185.    namePrefix = "pool-" +
  186.      poolNumber.getAndIncrement() +
  187.      "-dynamic-task-";
  188.   }
  189.  
  190.   @Override
  191.   public Thread newThread(Runnable r) {
  192.    Thread t = new Thread(group, r,
  193.      namePrefix + threadNumber.getAndIncrement(),
  194.      0);
  195.    if (t.isDaemon()) {
  196.     t.setDaemon(false);
  197.    }
  198.    if (t.getPriority() != Thread.NORM_PRIORITY) {
  199.     t.setPriority(Thread.NORM_PRIORITY);
  200.    }
  201.    return t;
  202.   }
  203.  }
  204.  
  205.  private static class DynamicBlockingQueue extends LinkedBlockingQueue<String> {
  206.   DynamicBlockingQueue(int capacity) {
  207.    super(capacity);
  208.   }
  209.  
  210.  
  211.   private volatile boolean done = false;
  212.  
  213.   public boolean isDone() {
  214.    return done;
  215.   }
  216.  
  217.   public void setDone(boolean done) {
  218.    this.done = done;
  219.   }
  220.  }
  221. }

  1、taskConstants 动态任务列表

  2、ScheduledTaskRegistrar#addTriggerTask 添加动态周期定时任务,检测动态任务列表的变化

  1. CronTask task = new CronTask(tt, expression);
  2. ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
  3. cronTasks.put(tt.getTaskId(), task);
  4. scheduledFutures.put(tt.getTaskId(), future);

  3、动态创建cron定时任务,拿到ScheduledFuture实例并缓存起来

  4、在刷新任务列表时,通过缓存的ScheduledFuture实例和CronTask实例,来决定是否取消、移除失效的动态定时任务。

v  DynamicTaskTest 动态定时任务测试类

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. public class DynamicTaskTest {
  4.  
  5.  @Autowired
  6.  private DynamicTask dynamicTask;
  7.  
  8.  @Test
  9.  public void test() throws InterruptedException {
  10.   List<DynamicTask.TaskConstant> taskConstans = dynamicTask.getTaskConstants();
  11.   DynamicTask.TaskConstant taskConstant = new DynamicTask.TaskConstant();
  12.   taskConstant.setCron("0/5 * * * * ?");
  13.   taskConstant.setTaskId("test1");
  14.   taskConstans.add(taskConstant);
  15.  
  16.  
  17.   DynamicTask.TaskConstant taskConstant1 = new DynamicTask.TaskConstant();
  18.   taskConstant1.setCron("0/5 * * * * ?");
  19.   taskConstant1.setTaskId("test2");
  20.   taskConstans.add(taskConstant1);
  21.  
  22.   DynamicTask.TaskConstant taskConstant2 = new DynamicTask.TaskConstant();
  23.   taskConstant2.setCron("0/5 * * * * ?");
  24.   taskConstant2.setTaskId("test3");
  25.   taskConstans.add(taskConstant2);
  26.  
  27.   TimeUnit.SECONDS.sleep(40);
  28.   //移除并添加新的配置
  29.   taskConstans.remove(taskConstans.size() - 1);
  30.   DynamicTask.TaskConstant taskConstant3 = new DynamicTask.TaskConstant();
  31.   taskConstant3.setCron("0/5 * * * * ?");
  32.   taskConstant3.setTaskId("test4");
  33.   taskConstans.add(taskConstant3);
  34. //
  35.   TimeUnit.MINUTES.sleep(50);
  36.  }
  37. }

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对w3xue的支持。

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号