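v 1. Introduction
Most projects need scheduled tasks at some point, typically to run specific jobs at fixed times. This article explains how Spring Boot scheduled tasks work internally and how to create them dynamically.
Last week I ran into a requirement at work: synchronize account-cancellation data from multiple provinces and unbind the corresponding WeChat followers. Each province periodically drops its cancellation data onto an SFTP server, and I had to develop scheduled tasks that parse those files. Because there are many provinces, the servers, file-naming rules, and data rules can all differ, so making everything configurable is not trivial. The data rules simply have to be unified across provinces, while the server and file-naming rules can be read from the configuration center. Whenever the configuration for a new province is added, the backend detects it and creates the corresponding scheduled task dynamically.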
v 2. Core configuration Spring Boot uses to enable scheduled tasks
- @Target(ElementType.TYPE)
- @Retention(RetentionPolicy.RUNTIME)
- @Import(SchedulingConfiguration.class)
- @Documented
- public @interface EnableScheduling {
-
- }
-
- @Configuration
- @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
- public class SchedulingConfiguration {
-
- @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
- @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
- public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
- return new ScheduledAnnotationBeanPostProcessor();
- }
-
- }
Next, let's look at the core post-processor, ScheduledAnnotationBeanPostProcessor.
- @Override
- public Object postProcessAfterInitialization(Object bean, String beanName) {
- if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
- bean instanceof ScheduledExecutorService) {
- // Ignore AOP infrastructure such as scoped proxies.
- return bean;
- }
-
- Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
- if (!this.nonAnnotatedClasses.contains(targetClass)) {
- Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
- (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
- Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
- method, Scheduled.class, Schedules.class);
- return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
- });
- if (annotatedMethods.isEmpty()) {
- this.nonAnnotatedClasses.add(targetClass);
- if (logger.isTraceEnabled()) {
- logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
- }
- }
- else {
- // Non-empty set of methods
- annotatedMethods.forEach((method, scheduledMethods) ->
- scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
- if (logger.isTraceEnabled()) {
- logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
- "': " + annotatedMethods);
- }
- }
- }
- return bean;
- }
1. This method processes the @Scheduled annotations and, through processScheduled, registers each scheduled task with the ScheduledTaskRegistrar.
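The scan in postProcessAfterInitialization boils down to this: walk the methods of the bean's class, collect those that carry the annotation, and register one task per match. Here is a minimal plain-JDK sketch of that idea. The `@Every` annotation and the `ReportJobs` and `AnnotationScanSketch` classes are hypothetical names made up for illustration. Spring itself uses MethodIntrospector and merged repeatable annotations, which this sketch does not reproduce.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for @Scheduled; this is not a Spring annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Every {
    long millis();
}

// Hypothetical bean with two annotated methods and one plain method.
class ReportJobs {
    @Every(millis = 1000)
    public void syncFiles() { }

    @Every(millis = 5000)
    public void cleanUp() { }

    public void helper() { }
}

final class AnnotationScanSketch {

    // Returns method name -> period for every method annotated with @Every.
    static Map<String, Long> scan(Class<?> targetClass) {
        Map<String, Long> found = new TreeMap<>();
        for (Method method : targetClass.getDeclaredMethods()) {
            Every every = method.getAnnotation(Every.class);
            if (every != null) {
                found.put(method.getName(), every.millis());
            }
        }
        return found;
    }

    public static void main(String[] args) {
        // Only the two annotated methods show up; helper() is skipped.
        System.out.println(scan(ReportJobs.class));
    }
}
```

A real post-processor would also cache classes without annotations, as nonAnnotatedClasses does above, so that they are not scanned again.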
- private void finishRegistration() {
- if (this.scheduler != null) {
- this.registrar.setScheduler(this.scheduler);
- }
-
- if (this.beanFactory instanceof ListableBeanFactory) {
- Map<String, SchedulingConfigurer> beans =
- ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
- List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
- AnnotationAwareOrderComparator.sort(configurers);
- for (SchedulingConfigurer configurer : configurers) {
- configurer.configureTasks(this.registrar);
- }
- }
-
- if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
- Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
- try {
- // Search for TaskScheduler bean...
- this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
- }
- catch (NoUniqueBeanDefinitionException ex) {
- logger.trace("Could not find unique TaskScheduler bean", ex);
- try {
- this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
- }
- catch (NoSuchBeanDefinitionException ex2) {
- if (logger.isInfoEnabled()) {
- logger.info("More than one TaskScheduler bean exists within the context, and " +
- "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
- "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
- "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
- ex.getBeanNamesFound());
- }
- }
- }
- catch (NoSuchBeanDefinitionException ex) {
- logger.trace("Could not find default TaskScheduler bean", ex);
- // Search for ScheduledExecutorService bean next...
- try {
- this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
- }
- catch (NoUniqueBeanDefinitionException ex2) {
- logger.trace("Could not find unique ScheduledExecutorService bean", ex2);
- try {
- this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
- }
- catch (NoSuchBeanDefinitionException ex3) {
- if (logger.isInfoEnabled()) {
- logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
- "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
- "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
- "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
- ex2.getBeanNamesFound());
- }
- }
- }
- catch (NoSuchBeanDefinitionException ex2) {
- logger.trace("Could not find default ScheduledExecutorService bean", ex2);
- // Giving up -> falling back to default scheduler within the registrar...
- logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
- }
- }
- }
-
- this.registrar.afterPropertiesSet();
- }
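1. Every SchedulingConfigurer bean gets a chance to configure the ScheduledTaskRegistrar dynamically.
2. A TaskScheduler is registered with the ScheduledTaskRegistrar. It schedules Runnable tasks and supports several kinds of trigger rules. If no TaskScheduler bean is found, a ScheduledExecutorService bean is looked up instead.
3. registrar.afterPropertiesSet() is where all registered tasks actually get scheduled. It calls scheduleTasks():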
- protected void scheduleTasks() {
- if (this.taskScheduler == null) {
- this.localExecutor = Executors.newSingleThreadScheduledExecutor();
- this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
- }
- if (this.triggerTasks != null) {
- for (TriggerTask task : this.triggerTasks) {
- addScheduledTask(scheduleTriggerTask(task));
- }
- }
- if (this.cronTasks != null) {
- for (CronTask task : this.cronTasks) {
- addScheduledTask(scheduleCronTask(task));
- }
- }
- if (this.fixedRateTasks != null) {
- for (IntervalTask task : this.fixedRateTasks) {
- addScheduledTask(scheduleFixedRateTask(task));
- }
- }
- if (this.fixedDelayTasks != null) {
- for (IntervalTask task : this.fixedDelayTasks) {
- addScheduledTask(scheduleFixedDelayTask(task));
- }
- }
- }
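1. TriggerTask: a dynamically scheduled task. The next execution time is computed by Trigger#nextExecutionTime from the given trigger context.
2. CronTask: a dynamically scheduled task and a subclass of TriggerTask. A cron expression determines when the next execution is triggered.
3. IntervalTask: a task that runs periodically after an optional initial delay, either at a fixed rate or with a fixed delay between runs.
4. If taskScheduler is null, the registrar falls back to a ConcurrentTaskScheduler backed by a single-threaded ScheduledExecutorService.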
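Point 4 matters in practice: with a single scheduler thread, one slow task holds up every other task that becomes due in the meantime. The sketch below shows this with the JDK alone. `SingleThreadSchedulerSketch` and its method names are hypothetical, and the 300 ms and 50 ms timings are arbitrary values chosen for the demo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only JDK types are used.
final class SingleThreadSchedulerSketch {

    // Schedules a slow task (due now) and a fast task (due after 50 ms) and records the events.
    static List<String> runSlowThenFast(ScheduledExecutorService executor) {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        try {
            executor.schedule(() -> {
                events.add("slow-start");
                sleepQuietly(300);
                events.add("slow-end");
                done.countDown();
            }, 0, TimeUnit.MILLISECONDS);
            executor.schedule(() -> {
                events.add("fast");
                done.countDown();
            }, 50, TimeUnit.MILLISECONDS);
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return new ArrayList<>(events);
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Same executor type the registrar falls back to: the fast task waits for the slow one.
        System.out.println(runSlowThenFast(Executors.newSingleThreadScheduledExecutor()));
        // With a second thread the fast task no longer queues behind the slow one.
        System.out.println(runSlowThenFast(Executors.newScheduledThreadPool(2)));
    }
}
```

With the single-threaded executor the recorded order is always slow-start, slow-end, fast, even though the fast task was due long before the slow one finished.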
v 3. How CronTask works
- ScheduledTaskRegistrar.java
- @Nullable
- public ScheduledTask scheduleCronTask(CronTask task) {
- ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
- boolean newTask = false;
- if (scheduledTask == null) {
- scheduledTask = new ScheduledTask(task);
- newTask = true;
- }
- if (this.taskScheduler != null) {
- scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
- }
- else {
- addCronTask(task);
- this.unresolvedTasks.put(task, scheduledTask);
- }
- return (newTask ? scheduledTask : null);
- }
-
- ConcurrentTaskScheduler.java
- @Override
- @Nullable
- public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
- try {
- if (this.enterpriseConcurrentScheduler) {
- return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
- }
- else {
- ErrorHandler errorHandler =
- (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
- return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();
- }
- }
- catch (RejectedExecutionException ex) {
- throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
- }
- }
-
- ReschedulingRunnable.java
- @Nullable
- public ScheduledFuture<?> schedule() {
- synchronized (this.triggerContextMonitor) {
- this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
- if (this.scheduledExecutionTime == null) {
- return null;
- }
- long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
- this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
- return this;
- }
- }
-
- private ScheduledFuture<?> obtainCurrentFuture() {
- Assert.state(this.currentFuture != null, "No scheduled future");
- return this.currentFuture;
- }
-
- @Override
- public void run() {
- Date actualExecutionTime = new Date();
- super.run();
- Date completionTime = new Date();
- synchronized (this.triggerContextMonitor) {
- Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
- this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
- if (!obtainCurrentFuture().isCancelled()) {
- schedule();
- }
- }
- }
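1. The task and its trigger end up wrapped together in a ReschedulingRunnable.
2. ReschedulingRunnable implements repeated scheduling: schedule() hands the object itself to the executor, the executor later calls run(), and run() calls schedule() again.
3. schedule() synchronizes on triggerContextMonitor, so only one thread at a time can compute the next execution time and submit the task to the executor.
4. Different ReschedulingRunnable instances do not affect each other as long as the thread pool has enough threads. In other words, tasks created by separate calls to TaskScheduler#schedule can run concurrently when the pool size allows it.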
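The self-rescheduling loop from points 2 and 3 can be reproduced with the JDK alone. The sketch below is a simplified analogue, not Spring's implementation. `SelfReschedulingTask` is a hypothetical class, and a `LongSupplier` that returns the next delay in milliseconds (negative means stop) stands in for the Trigger.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

// Hypothetical, simplified analogue of ReschedulingRunnable.
final class SelfReschedulingTask implements Runnable {
    private final Runnable delegate;
    private final LongSupplier nextDelayMillis; // plays the role of the Trigger; negative means "stop"
    private final ScheduledExecutorService executor;
    private final Object monitor = new Object();

    SelfReschedulingTask(Runnable delegate, LongSupplier nextDelayMillis,
                         ScheduledExecutorService executor) {
        this.delegate = delegate;
        this.nextDelayMillis = nextDelayMillis;
        this.executor = executor;
    }

    // One-shot scheduling: asks for the next delay and submits this object itself.
    void schedule() {
        synchronized (monitor) {
            long delay = nextDelayMillis.getAsLong();
            if (delay >= 0) {
                executor.schedule(this, delay, TimeUnit.MILLISECONDS);
            }
        }
    }

    @Override
    public void run() {
        delegate.run(); // run the real task first
        schedule();     // then queue the next execution
    }

    // Runs a counting task until the supplier has handed out `times` delays; returns the run count.
    static int runTimes(int times) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        AtomicInteger handedOut = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(times);
        try {
            new SelfReschedulingTask(
                    () -> { runs.incrementAndGet(); done.countDown(); },
                    () -> handedOut.getAndIncrement() < times ? 10 : -1,
                    executor).schedule();
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not run often enough");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(runTimes(3));
    }
}
```

Because each execution is scheduled only after the previous one finishes, runs of the same task never overlap, which is the same property ReschedulingRunnable gets from calling schedule() at the end of run().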
- ScheduledThreadPoolExecutor.java
- public ScheduledFuture<?> schedule(Runnable command,
- long delay,
- TimeUnit unit) {
- if (command == null || unit == null)
- throw new NullPointerException();
- RunnableScheduledFuture<?> t = decorateTask(command,
- new ScheduledFutureTask<Void>(command, null,
- triggerTime(delay, unit)));
- delayedExecute(t);
- return t;
- }
-
-
- private void delayedExecute(RunnableScheduledFuture<?> task) {
- if (isShutdown())
- reject(task);
- else {
- super.getQueue().add(task);
- if (isShutdown() &&
- !canRunInCurrentRunState(task.isPeriodic()) &&
- remove(task))
- task.cancel(false);
- else
- ensurePrestart();
- }
- }
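delayedExecute only adds the task to the executor's queue. The queue decides which task a worker gets next, and it orders tasks by trigger time rather than by submission order. The small sketch below makes that visible. `DelayOrderSketch` is a hypothetical class name and the delays are arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only JDK types are used.
final class DelayOrderSketch {

    // Submits three tasks, longest delay first, and returns the order in which they ran.
    static List<String> runOrder() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        List<String> order = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        try {
            executor.schedule(() -> { order.add("300ms"); done.countDown(); }, 300, TimeUnit.MILLISECONDS);
            executor.schedule(() -> { order.add("200ms"); done.countDown(); }, 200, TimeUnit.MILLISECONDS);
            executor.schedule(() -> { order.add("100ms"); done.countDown(); }, 100, TimeUnit.MILLISECONDS);
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        // The task with the shortest delay runs first although it was submitted last.
        System.out.println(runOrder());
    }
}
```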
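The figure below shows how ScheduledFutureTask works (borrowed from another source; I did not draw it myself).
1. Each ScheduledFutureTask is placed in ScheduledThreadPoolExecutor.DelayedWorkQueue, a blocking delay queue implemented as a binary min-heap ordered by trigger time.
2. The Thread object in the figure is a ThreadPoolExecutor.Worker, which implements Runnable.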
- /**
- * Creates with given first task and thread from ThreadFactory.
- * @param firstTask the first task (null if none)
- */
- Worker(Runnable firstTask) {
- setState(-1); // inhibit interrupts until runWorker
- this.firstTask = firstTask;
- this.thread = getThreadFactory().newThread(this);
- }
-
- /** Delegates main run loop to outer runWorker */
- public void run() {
- runWorker(this);
- }
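1. Each Worker holds a Thread object whose Runnable is the Worker itself.
2. ThreadPoolExecutor#addWorker creates a Worker, takes its thread, and starts it. That is how a thread in the pool comes into being.
3. Worker#run calls ThreadPoolExecutor#runWorker, which is where tasks are finally executed. The method works as follows:
(1) It first runs the task passed in at construction. If that task is null, then as long as the pool is running it takes tasks from the workQueue via getTask. ThreadPoolExecutor#execute offers tasks to the workQueue when it cannot create another core thread.
When getTask takes a task from the queue, the pool's configuration decides whether it blocks and for how long. If getTask returns null, the runWorker loop ends and processWorkerExit is executed.
At that point the thread has finished its job and "disappears" from the thread pool.
(2) Before a task is run, the Worker's lock method is called so that the thread is not interrupted while the task is executing. In the JDK source discussed here this is ensured by calling clearInterruptsForTaskRun (worth a look later), after which the thread has no pending interrupt set.
(3) beforeExecute and afterExecute are hooks for custom logic before and after each task. Both are empty in ThreadPoolExecutor and are meant to be overridden by subclasses.
If beforeExecute throws an exception, the task is not executed, and the loop exits with completedAbruptly set to true, meaning the worker died due to a user exception. decrementWorkerCount is then used to adjust the worker count.
(4) Because Runnable#run cannot declare arbitrary Throwables, the exception is rewrapped and rethrown here. The rethrown exception kills the current thread, and afterExecute can be used to handle it.
(5) afterExecute may itself throw an exception, which can also kill the current thread.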
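To make points (3) and (4) concrete, here is a minimal sketch of a ThreadPoolExecutor subclass that uses afterExecute to record a task failure. `HookedExecutor` and its fields are hypothetical names. The task is submitted with execute() so that the exception reaches the hook directly; with submit() it would be captured inside the returned Future and the hook would receive null.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical executor that remembers the last exception seen by afterExecute.
final class HookedExecutor extends ThreadPoolExecutor {
    final AtomicReference<Throwable> lastFailure = new AtomicReference<>();
    final CountDownLatch hookRan = new CountDownLatch(1);

    HookedExecutor() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), quietFactory());
    }

    // Threads whose uncaught exceptions are swallowed, to keep the demo output clean.
    private static ThreadFactory quietFactory() {
        return runnable -> {
            Thread thread = new Thread(runnable, "hooked-worker");
            thread.setUncaughtExceptionHandler((t, e) -> { });
            return thread;
        };
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null) {
            lastFailure.set(t); // the task threw; the worker thread is about to die
        }
        hookRan.countDown();
    }

    // Runs one failing task and returns the message captured by the hook.
    static String failureMessage() {
        HookedExecutor executor = new HookedExecutor();
        try {
            executor.execute(() -> { throw new IllegalStateException("boom"); });
            if (!executor.hookRan.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("afterExecute was not called");
            }
            Throwable failure = executor.lastFailure.get();
            return failure == null ? null : failure.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(failureMessage());
    }
}
```

The distinction between execute() and submit() is relevant for scheduling as well: ScheduledThreadPoolExecutor wraps every task in a ScheduledFutureTask, so a hook like this would not see exceptions thrown by scheduled tasks without unwrapping the future.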
v 4. Creating scheduled tasks dynamically
v TaskConfiguration (configuration class)
- @Configuration
- @EnableScheduling
- @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
- public class TaskConfiguration {
-
- @Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)
- @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
- public ScheduledExecutorService scheduledAnnotationProcessor() {
- return Executors.newScheduledThreadPool(5, new DefaultThreadFactory());
- }
-
- private static class DefaultThreadFactory implements ThreadFactory {
- private static final AtomicInteger poolNumber = new AtomicInteger(1);
- private final ThreadGroup group;
- private final AtomicInteger threadNumber = new AtomicInteger(1);
- private final String namePrefix;
-
- DefaultThreadFactory() {
- SecurityManager s = System.getSecurityManager();
- group = (s != null) ? s.getThreadGroup() :
- Thread.currentThread().getThreadGroup();
- namePrefix = "pool-" +
- poolNumber.getAndIncrement() +
- "-schedule-";
- }
-
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(group, r,
- namePrefix + threadNumber.getAndIncrement(),
- 0);
- if (t.isDaemon()) {
- t.setDaemon(false);
- }
- if (t.getPriority() != Thread.NORM_PRIORITY) {
- t.setPriority(Thread.NORM_PRIORITY);
- }
- return t;
- }
- }
- }
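1. This makes ConcurrentTaskScheduler use a thread pool with corePoolSize=5 instead of the default single-threaded ScheduledExecutor.
2. A custom thread factory gives the pool's threads recognizable names.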
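The effect of the custom thread factory is easiest to see by asking a scheduled task which thread it runs on. The sketch below is a trimmed-down JDK-only version of the idea. `NamedSchedulerSketch` and `named` are hypothetical names, and the prefix is hard-coded to match what the factory above would produce for the first pool.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; only JDK types are used.
final class NamedSchedulerSketch {

    // Thread factory that numbers its threads after a fixed prefix.
    static ThreadFactory named(String prefix) {
        AtomicInteger threadNumber = new AtomicInteger(1);
        return runnable -> new Thread(runnable, prefix + threadNumber.getAndIncrement());
    }

    // Schedules one task on a pool of five threads and returns the name of the thread that ran it.
    static String firstWorkerName() {
        ScheduledExecutorService pool =
                Executors.newScheduledThreadPool(5, named("pool-1-schedule-"));
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        try {
            return pool.schedule(whoAmI, 10, TimeUnit.MILLISECONDS).get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstWorkerName());
    }
}
```

Names like these make it much easier to tell scheduler threads apart from other pools in logs and thread dumps.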
v DynamicTask (dynamic scheduled task)
- @Configuration
- public class DynamicTask implements SchedulingConfigurer {
- private static Logger LOGGER = LoggerFactory.getLogger(DynamicTask.class);
-
- private static final ExecutorService es = new ThreadPoolExecutor(10, 20,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<>(10),
- new DynamicTaskConsumeThreadFactory());
-
-
- private volatile ScheduledTaskRegistrar registrar;
- private final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
- private final ConcurrentHashMap<String, CronTask> cronTasks = new ConcurrentHashMap<>();
-
- // Read by the scheduler thread and modified by callers, so use a thread-safe list
- private final List<TaskConstant> taskConstants = new CopyOnWriteArrayList<>();
-
- @Override
- public void configureTasks(ScheduledTaskRegistrar registrar) {
- this.registrar = registrar;
- this.registrar.addTriggerTask(() -> {
- if (!CollectionUtils.isEmpty(taskConstants)) {
- LOGGER.info("Checking the dynamic task list...");
- List<TimingTask> tts = new ArrayList<>();
- taskConstants
- .forEach(taskConstant -> {
- TimingTask tt = new TimingTask();
- tt.setExpression(taskConstant.getCron());
- tt.setTaskId("dynamic-task-" + taskConstant.getTaskId());
- tts.add(tt);
- });
- this.refreshTasks(tts);
- }
- }
- , triggerContext -> new PeriodicTrigger(5L, TimeUnit.SECONDS).nextExecutionTime(triggerContext));
- }
-
-
- public List<TaskConstant> getTaskConstants() {
- return taskConstants;
- }
-
- private void refreshTasks(List<TimingTask> tasks) {
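- // Cancel tasks that were removed from the configuration and drop them from both caches
- Set<String> taskIds = scheduledFutures.keySet();
- for (String taskId : taskIds) {
- if (!exists(tasks, taskId)) {
- scheduledFutures.remove(taskId).cancel(false);
- cronTasks.remove(taskId);
- }
- }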
- for (TimingTask tt : tasks) {
- String expression = tt.getExpression();
- if (StringUtils.isBlank(expression) || !CronSequenceGenerator.isValidExpression(expression)) {
- LOGGER.error("DynamicTask: invalid cron expression: " + expression);
- continue;
- }
- // Same configuration as before: no need to recreate the task
- if (scheduledFutures.containsKey(tt.getTaskId())
- && cronTasks.get(tt.getTaskId()).getExpression().equals(expression)) {
- continue;
- }
- // The cron expression changed: cancel the task currently scheduled for this id
- if (scheduledFutures.containsKey(tt.getTaskId())) {
- scheduledFutures.remove(tt.getTaskId()).cancel(false);
- cronTasks.remove(tt.getTaskId());
- }
- CronTask task = new CronTask(tt, expression);
- ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
- cronTasks.put(tt.getTaskId(), task);
- scheduledFutures.put(tt.getTaskId(), future);
- }
- }
-
- private boolean exists(List<TimingTask> tasks, String taskId) {
- for (TimingTask task : tasks) {
- if (task.getTaskId().equals(taskId)) {
- return true;
- }
- }
- return false;
- }
-
- @PreDestroy
- public void destroy() {
- this.registrar.destroy();
- }
-
- public static class TaskConstant {
- private String cron;
- private String taskId;
-
- public String getCron() {
- return cron;
- }
-
- public void setCron(String cron) {
- this.cron = cron;
- }
-
- public String getTaskId() {
- return taskId;
- }
-
- public void setTaskId(String taskId) {
- this.taskId = taskId;
- }
- }
-
- private class TimingTask implements Runnable {
- private String expression;
-
- private String taskId;
-
- public String getTaskId() {
- return taskId;
- }
-
- public void setTaskId(String taskId) {
- this.taskId = taskId;
- }
-
- @Override
- public void run() {
- LOGGER.info("Current CronTask: " + this);
- // Bounded queue (capacity 3) shared by the producer below and the consumer task
- DynamicBlockingQueue queue = new DynamicBlockingQueue(3);
- es.submit(() -> {
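- while (!queue.isDone() || !queue.isEmpty()) {
- try {
- String content = queue.poll(500, TimeUnit.MILLISECONDS);
- if (StringUtils.isBlank(content)) {
- // Nothing arrived within the timeout; re-check the loop condition instead of quitting
- continue;
- }
- LOGGER.info("DynamicBlockingQueue consumed: " + content);
- TimeUnit.MILLISECONDS.sleep(500);
- } catch (InterruptedException e) {
- // Restore the interrupt flag and stop consuming
- Thread.currentThread().interrupt();
- return;
- }
- }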
- });
-
- // Put data into the queue
- for (int i = 0; i < 5; ++i) {
- try {
- queue.put(String.valueOf(i));
- LOGGER.info("DynamicBlockingQueue produced: " + i);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- break;
- }
- }
- queue.setDone(true);
- }
-
- public String getExpression() {
- return expression;
- }
-
- public void setExpression(String expression) {
- this.expression = expression;
- }
-
- @Override
- public String toString() {
- return ReflectionToStringBuilder.toString(this
- , ToStringStyle.JSON_STYLE
- , false
- , false
- , TimingTask.class);
- }
-
- }
-
- /**
- * Thread factory for the queue consumer threads
- */
- private static class DynamicTaskConsumeThreadFactory implements ThreadFactory {
- private static final AtomicInteger poolNumber = new AtomicInteger(1);
- private final ThreadGroup group;
- private final AtomicInteger threadNumber = new AtomicInteger(1);
- private final String namePrefix;
-
- DynamicTaskConsumeThreadFactory() {
- SecurityManager s = System.getSecurityManager();
- group = (s != null) ? s.getThreadGroup() :
- Thread.currentThread().getThreadGroup();
- namePrefix = "pool-" +
- poolNumber.getAndIncrement() +
- "-dynamic-task-";
- }
-
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(group, r,
- namePrefix + threadNumber.getAndIncrement(),
- 0);
- if (t.isDaemon()) {
- t.setDaemon(false);
- }
- if (t.getPriority() != Thread.NORM_PRIORITY) {
- t.setPriority(Thread.NORM_PRIORITY);
- }
- return t;
- }
- }
-
- private static class DynamicBlockingQueue extends LinkedBlockingQueue<String> {
- DynamicBlockingQueue(int capacity) {
- super(capacity);
- }
-
-
- private volatile boolean done = false;
-
- public boolean isDone() {
- return done;
- }
-
- public void setDone(boolean done) {
- this.done = done;
- }
- }
- }
1. taskConstants is the list of dynamic task configurations.
2. ScheduledTaskRegistrar#addTriggerTask registers a periodic task that checks this list for changes every 5 seconds.
- CronTask task = new CronTask(tt, expression);
- ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
- cronTasks.put(tt.getTaskId(), task);
- scheduledFutures.put(tt.getTaskId(), future);
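3. A cron task is created dynamically, and the resulting ScheduledFuture is cached.
4. When the task list is refreshed, the cached ScheduledFuture and CronTask instances are used to decide whether a stale dynamic task must be cancelled and removed.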
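The refresh logic in points 3 and 4 is independent of Spring, so it can be tried out in isolation. The sketch below is a JDK-only analogue with hypothetical names (`DynamicRegistrySketch`, `refresh`, `future`). As a loudly stated simplification, a fixed period in milliseconds stands in for the cron expression, and the scheduled tasks do nothing.

```java
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical analogue of refreshTasks; a period in milliseconds replaces the cron expression.
final class DynamicRegistrySketch {
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    private final Map<String, Long> periods = new ConcurrentHashMap<>();
    private final Map<String, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();

    // Brings the running tasks in line with the desired taskId -> period configuration.
    void refresh(Map<String, Long> desired) {
        // 1. Cancel and forget tasks that are no longer configured.
        for (String taskId : new ArrayList<>(futures.keySet())) {
            if (!desired.containsKey(taskId)) {
                futures.remove(taskId).cancel(false);
                periods.remove(taskId);
            }
        }
        // 2. Create new tasks and recreate tasks whose period changed.
        for (Map.Entry<String, Long> entry : desired.entrySet()) {
            String taskId = entry.getKey();
            Long period = entry.getValue();
            if (period.equals(periods.get(taskId))) {
                continue; // unchanged: keep the running task
            }
            ScheduledFuture<?> old = futures.remove(taskId);
            if (old != null) {
                old.cancel(false);
            }
            futures.put(taskId,
                    executor.scheduleAtFixedRate(() -> { }, period, period, TimeUnit.MILLISECONDS));
            periods.put(taskId, period);
        }
    }

    // Returns the cached future for a task id, or null if the task is not scheduled.
    ScheduledFuture<?> future(String taskId) {
        return futures.get(taskId);
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

Calling refresh twice with the same map leaves every future untouched, while removing a key cancels exactly that task. Clearing both caches on removal matters: if a cancelled task stayed in the maps, re-adding the same id with the same schedule would be mistaken for an unchanged task and never restarted.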
v DynamicTaskTest (test class for the dynamic scheduled task)
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class DynamicTaskTest {
-
- @Autowired
- private DynamicTask dynamicTask;
-
- @Test
- public void test() throws InterruptedException {
- List<DynamicTask.TaskConstant> taskConstans = dynamicTask.getTaskConstants();
- DynamicTask.TaskConstant taskConstant = new DynamicTask.TaskConstant();
- taskConstant.setCron("0/5 * * * * ?");
- taskConstant.setTaskId("test1");
- taskConstans.add(taskConstant);
-
-
- DynamicTask.TaskConstant taskConstant1 = new DynamicTask.TaskConstant();
- taskConstant1.setCron("0/5 * * * * ?");
- taskConstant1.setTaskId("test2");
- taskConstans.add(taskConstant1);
-
- DynamicTask.TaskConstant taskConstant2 = new DynamicTask.TaskConstant();
- taskConstant2.setCron("0/5 * * * * ?");
- taskConstant2.setTaskId("test3");
- taskConstans.add(taskConstant2);
-
- TimeUnit.SECONDS.sleep(40);
- // Remove the last configuration and add a new one
- taskConstans.remove(taskConstans.size() - 1);
- DynamicTask.TaskConstant taskConstant3 = new DynamicTask.TaskConstant();
- taskConstant3.setCron("0/5 * * * * ?");
- taskConstant3.setTaskId("test4");
- taskConstans.add(taskConstant3);
- //
- TimeUnit.MINUTES.sleep(50);
- }
- }