经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » ASP.net » 查看文章
NETCore中实现一个轻量无负担的极简任务调度ScheduleTask
来源:cnblogs  作者:万雅虎  时间:2024/5/21 8:51:25  对本文有异议

至于任务调度这个基础功能,重要性不言而喻,大多数业务系统都会用到,世面上有很多成熟的三方库比如Quartz,Hangfire,Coravel
这里我们不讨论三方的库如何使用 而是从0开始自己制作一个简易的任务调度,如果只是到分钟级别的粒度基本够用

技术栈用到了:BackgroundServiceNCrontab

第一步我们定义一个简单的任务约定,不干别的就是一个执行方法:

  1. public interface IScheduleTask
  2. {
  3. Task ExecuteAsync();
  4. }
  5. public abstract class ScheduleTask : IScheduleTask
  6. {
  7. public virtual Task ExecuteAsync()
  8. {
  9. return Task.CompletedTask;
  10. }
  11. }

第二步定义特性标注任务执行周期等信的metadata

  1. [AttributeUsage(AttributeTargets.Class, AllowMultiple = true, Inherited = false)]
  2. public class ScheduleTaskAttribute(string cron) : Attribute
  3. {
  4. /// <summary>
  5. /// 支持的cron表达式格式 * * * * *:https://en.wikipedia.org/wiki/Cron
  6. /// 最小单位为分钟
  7. /// </summary>
  8. public string Cron { get; set; } = cron;
  9. public string? Description { get; set; }
  10. /// <summary>
  11. /// 是否异步执行.默认false会阻塞接下来的同类任务
  12. /// </summary>
  13. public bool IsAsync { get; set; } = false;
  14. /// <summary>
  15. /// 是否初始化即启动,默认false
  16. /// </summary>
  17. public bool IsStartOnInit { get; set; } = false;
  18. }

第三步我们定义一个调度器约定,不干别的就是判断当前的任务是否可以执行:

  1. public interface IScheduler
  2. {
  3. /// <summary>
  4. /// 判断当前的任务是否可以执行
  5. /// </summary>
  6. bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime);
  7. }

好了,基础步骤就完成了,如果我们需要实现配置级别的任务调度或者动态的任务调度 那我们再抽象一个Store:

  1. public class ScheduleTaskMetadata(Type scheduleTaskType, string cron)
  2. {
  3. public Type ScheduleTaskType { get; set; } = scheduleTaskType;
  4. public string Cron { get; set; } = cron;
  5. public string? Description { get; set; }
  6. public bool IsAsync { get; set; } = false;
  7. public bool IsStartOnInit { get; set; } = false;
  8. }
  9. public interface IScheduleMetadataStore
  10. {
  11. /// <summary>
  12. /// 获取所有ScheduleTaskMetadata
  13. /// </summary>
  14. Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync();
  15. }

实现一个Configuration级别的Store

  1. internal class ConfigurationScheduleMetadataStore(IConfiguration configuration) : IScheduleMetadataStore
  2. {
  3. const string Key = "BiwenQuickApi:Schedules";
  4. public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync()
  5. {
  6. var options = configuration.GetSection(Key).GetChildren();
  7. if (options?.Any() is true)
  8. {
  9. var metadatas = options.Select(x =>
  10. {
  11. var type = Type.GetType(x[nameof(ConfigurationScheduleOption.ScheduleType)]!);
  12. if (type is null)
  13. throw new ArgumentException($"Type {x[nameof(ConfigurationScheduleOption.ScheduleType)]} not found!");
  14. return new ScheduleTaskMetadata(type, x[nameof(ConfigurationScheduleOption.Cron)]!)
  15. {
  16. Description = x[nameof(ConfigurationScheduleOption.Description)],
  17. IsAsync = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsAsync)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsAsync)]!),
  18. IsStartOnInit = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]!),
  19. };
  20. });
  21. return Task.FromResult(metadatas);
  22. }
  23. return Task.FromResult(Enumerable.Empty<ScheduleTaskMetadata>());
  24. }
  25. }

然后呢,我们可能需要多任务调度的事件做一些操作或者日志存储.比如失败了该干嘛,完成了回调其他后续业务等.我们再来定义一下具体的事件IEvent,具体可以参考我上一篇文章:
https://www.cnblogs.com/vipwan/p/18184088

  1. public abstract class ScheduleTaskEvent(IScheduleTask scheduleTask, DateTime eventTime) : IEvent
  2. {
  3. /// <summary>
  4. /// 任务
  5. /// </summary>
  6. public IScheduleTask ScheduleTask { get; set; } = scheduleTask;
  7. /// <summary>
  8. /// 触发时间
  9. /// </summary>
  10. public DateTime EventTime { get; set; } = eventTime;
  11. }
  12. /// <summary>
  13. /// 执行完成
  14. /// </summary>
  15. public sealed class TaskSuccessedEvent(IScheduleTask scheduleTask, DateTime eventTime, DateTime endTime) : ScheduleTaskEvent(scheduleTask, eventTime)
  16. {
  17. /// <summary>
  18. /// 执行结束的时间
  19. /// </summary>
  20. public DateTime EndTime { get; set; } = endTime;
  21. }
  22. /// <summary>
  23. /// 执行开始
  24. /// </summary>
  25. public sealed class TaskStartedEvent(IScheduleTask scheduleTask, DateTime eventTime) : ScheduleTaskEvent(scheduleTask, eventTime);
  26. /// <summary>
  27. /// 执行失败
  28. /// </summary>
  29. public sealed class TaskFailedEvent(IScheduleTask scheduleTask, DateTime eventTime, Exception exception) : ScheduleTaskEvent(scheduleTask, eventTime)
  30. {
  31. /// <summary>
  32. /// 异常信息
  33. /// </summary>
  34. public Exception Exception { get; private set; } = exception;
  35. }

接下来我们再实现基于NCrontab的简易调度器,这个调度器主要是解析Cron表达式判断传入时间是否可以执行ScheduleTask,具体的代码:

  1. internal class SampleNCrontabScheduler : IScheduler
  2. {
  3. /// <summary>
  4. /// 暂存上次执行时间
  5. /// </summary>
  6. private static ConcurrentDictionary<ScheduleTaskAttribute, DateTime> LastRunTimes = new();
  7. public bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime)
  8. {
  9. var now = DateTime.Now;
  10. var haveExcuteTime = LastRunTimes.TryGetValue(scheduleMetadata, out var time);
  11. if (!haveExcuteTime)
  12. {
  13. var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);
  14. LastRunTimes.TryAdd(scheduleMetadata, nextStartTime);
  15. //如果不是初始化启动,则不执行
  16. if (!scheduleMetadata.IsStartOnInit)
  17. return false;
  18. }
  19. if (now >= time)
  20. {
  21. var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);
  22. //更新下次执行时间
  23. LastRunTimes.TryUpdate(scheduleMetadata, nextStartTime, time);
  24. return true;
  25. }
  26. return false;
  27. }
  28. }

然后就是核心的BackgroundService了,这里我用的IdleTime心跳来实现,粒度分钟,当然内部也可以封装Timer等实现更复杂精度更高的调度,这里就不展开讲了,代码如下:

  1. internal class ScheduleBackgroundService : BackgroundService
  2. {
  3. private static readonly TimeSpan _pollingTime
  4. #if DEBUG
  5. //轮询20s 测试环境下,方便测试。
  6. = TimeSpan.FromSeconds(20);
  7. #endif
  8. #if !DEBUG
  9. //轮询60s 正式环境下,考虑性能轮询时间延长到60s
  10. = TimeSpan.FromSeconds(60);
  11. #endif
  12. //心跳10s.
  13. private static readonly TimeSpan _minIdleTime = TimeSpan.FromSeconds(10);
  14. private readonly ILogger<ScheduleBackgroundService> _logger;
  15. private readonly IServiceProvider _serviceProvider;
  16. public ScheduleBackgroundService(ILogger<ScheduleBackgroundService> logger, IServiceProvider serviceProvider)
  17. {
  18. _logger = logger;
  19. _serviceProvider = serviceProvider;
  20. }
  21. protected override async Task ExecuteAsync(CancellationToken stoppingToken)
  22. {
  23. while (!stoppingToken.IsCancellationRequested)
  24. {
  25. var pollingDelay = Task.Delay(_pollingTime, stoppingToken);
  26. try
  27. {
  28. await RunAsync(stoppingToken);
  29. }
  30. catch (Exception ex)
  31. {
  32. //todo:
  33. _logger.LogError(ex.Message);
  34. }
  35. await WaitAsync(pollingDelay, stoppingToken);
  36. }
  37. }
  38. private async Task RunAsync(CancellationToken stoppingToken)
  39. {
  40. using var scope = _serviceProvider.CreateScope();
  41. var tasks = scope.ServiceProvider.GetServices<IScheduleTask>();
  42. if (tasks is null || !tasks.Any())
  43. {
  44. return;
  45. }
  46. //调度器
  47. var scheduler = scope.ServiceProvider.GetRequiredService<IScheduler>();
  48. async Task DoTaskAsync(IScheduleTask task, ScheduleTaskAttribute metadata)
  49. {
  50. if (scheduler.CanRun(metadata, DateTime.Now))
  51. {
  52. var eventTime = DateTime.Now;
  53. //通知启动
  54. _ = new TaskStartedEvent(task, eventTime).PublishAsync(default);
  55. try
  56. {
  57. if (metadata.IsAsync)
  58. {
  59. //异步执行
  60. _ = task.ExecuteAsync();
  61. }
  62. else
  63. {
  64. //同步执行
  65. await task.ExecuteAsync();
  66. }
  67. //执行完成
  68. _ = new TaskSuccessedEvent(task, eventTime, DateTime.Now).PublishAsync(default);
  69. }
  70. catch (Exception ex)
  71. {
  72. _ = new TaskFailedEvent(task, DateTime.Now, ex).PublishAsync(default);
  73. }
  74. }
  75. };
  76. //注解中的task
  77. foreach (var task in tasks)
  78. {
  79. if (stoppingToken.IsCancellationRequested)
  80. {
  81. break;
  82. }
  83. //标注的metadatas
  84. var metadatas = task.GetType().GetCustomAttributes<ScheduleTaskAttribute>();
  85. if (!metadatas.Any())
  86. {
  87. continue;
  88. }
  89. foreach (var metadata in metadatas)
  90. {
  91. await DoTaskAsync(task, metadata);
  92. }
  93. }
  94. //store中的scheduler
  95. var stores = _serviceProvider.GetServices<IScheduleMetadataStore>().ToArray();
  96. //并行执行,提高性能
  97. Parallel.ForEach(stores, async store =>
  98. {
  99. if (stoppingToken.IsCancellationRequested)
  100. {
  101. return;
  102. }
  103. var metadatas = await store.GetAllAsync();
  104. if (metadatas is null || !metadatas.Any())
  105. {
  106. return;
  107. }
  108. foreach (var metadata in metadatas)
  109. {
  110. var attr = new ScheduleTaskAttribute(metadata.Cron)
  111. {
  112. Description = metadata.Description,
  113. IsAsync = metadata.IsAsync,
  114. IsStartOnInit = metadata.IsStartOnInit,
  115. };
  116. var task = scope.ServiceProvider.GetRequiredService(metadata.ScheduleTaskType) as IScheduleTask;
  117. if (task is null)
  118. {
  119. return;
  120. }
  121. await DoTaskAsync(task, attr);
  122. }
  123. });
  124. }
  125. private static async Task WaitAsync(Task pollingDelay, CancellationToken stoppingToken)
  126. {
  127. try
  128. {
  129. await Task.Delay(_minIdleTime, stoppingToken);
  130. await pollingDelay;
  131. }
  132. catch (OperationCanceledException)
  133. {
  134. }
  135. }
  136. }

最后收尾阶段我们老规矩扩展一下IServiceCollection:

  1. internal static IServiceCollection AddScheduleTask(this IServiceCollection services)
  2. {
  3. foreach (var task in ScheduleTasks)
  4. {
  5. services.AddTransient(task);
  6. services.AddTransient(typeof(IScheduleTask), task);
  7. }
  8. //调度器
  9. services.AddScheduler<SampleNCrontabScheduler>();
  10. //配置文件Store:
  11. services.AddScheduleMetadataStore<ConfigurationScheduleMetadataStore>();
  12. //BackgroundService
  13. services.AddHostedService<ScheduleBackgroundService>();
  14. return services;
  15. }
  16. /// <summary>
  17. /// 注册调度器AddScheduler
  18. /// </summary>
  19. public static IServiceCollection AddScheduler<T>(this IServiceCollection services) where T : class, IScheduler
  20. {
  21. services.AddSingleton<IScheduler, T>();
  22. return services;
  23. }
  24. /// <summary>
  25. /// 注册ScheduleMetadataStore
  26. /// </summary>
  27. public static IServiceCollection AddScheduleMetadataStore<T>(this IServiceCollection services) where T : class, IScheduleMetadataStore
  28. {
  29. services.AddSingleton<IScheduleMetadataStore, T>();
  30. return services;
  31. }

老规矩我们来测试一下:

  1. //通过特性标注的方式执行:
  2. [ScheduleTask(Constants.CronEveryMinute)] //每分钟一次
  3. [ScheduleTask("0/3 * * * *")]//每3分钟执行一次
  4. public class KeepAlive(ILogger<KeepAlive> logger) : IScheduleTask
  5. {
  6. public async Task ExecuteAsync()
  7. {
  8. //执行5s
  9. await Task.Delay(TimeSpan.FromSeconds(5));
  10. logger.LogInformation("keep alive!");
  11. }
  12. }
  13. public class DemoConfigTask(ILogger<DemoConfigTask> logger) : IScheduleTask
  14. {
  15. public Task ExecuteAsync()
  16. {
  17. logger.LogInformation("Demo Config Schedule Done!");
  18. return Task.CompletedTask;
  19. }
  20. }

通过配置文件的方式配置Store:

  1. {
  2. "BiwenQuickApi": {
  3. "Schedules": [
  4. {
  5. "ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb",
  6. "Cron": "0/5 * * * *",
  7. "Description": "Every 5 mins",
  8. "IsAsync": true,
  9. "IsStartOnInit": false
  10. },
  11. {
  12. "ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb",
  13. "Cron": "0/10 * * * *",
  14. "Description": "Every 10 mins",
  15. "IsAsync": false,
  16. "IsStartOnInit": true
  17. }
  18. ]
  19. }
  20. }

我们还可以实现自己的Store,这里以放到内存为例,如果有兴趣 你可以可以自行开发一个面板管理:

  1. public class DemoStore : IScheduleMetadataStore
  2. {
  3. public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync()
  4. {
  5. //模拟从数据库或配置文件中获取ScheduleTaskMetadata
  6. IEnumerable<ScheduleTaskMetadata> metadatas =
  7. [
  8. new ScheduleTaskMetadata(typeof(DemoTask),Constants.CronEveryNMinutes(2))
  9. {
  10. Description="测试的Schedule"
  11. },
  12. ];
  13. return Task.FromResult(metadatas);
  14. }
  15. }
  16. //然后注册这个Store:
  17. builder.Services.AddScheduleMetadataStore<DemoStore>();

所有的一切都大功告成,最后我们来跑一下Demo,成功了:
image

当然这里是自己的固定思维设计的一个简约版,还存在一些不足,欢迎板砖轻拍指正!

2024/05/16更新:
提供同一时间单一运行中的任务实现

  1. /// <summary>
  2. /// 模拟一个只能同时存在一个的任务.一分钟执行一次,但是耗时两分钟.
  3. /// </summary>
  4. /// <param name="logger"></param>
  5. [ScheduleTask(Constants.CronEveryMinute, IsStartOnInit = true)]
  6. public class OnlyOneTask(ILogger<OnlyOneTask> logger) : OnlyOneRunningScheduleTask
  7. {
  8. public override Task OnAbort()
  9. {
  10. logger.LogWarning($"[{DateTime.Now}]任务被打断.因为有一个相同的任务正在执行!");
  11. return Task.CompletedTask;
  12. }
  13. public override async Task ExecuteAsync()
  14. {
  15. var now = DateTime.Now;
  16. //模拟一个耗时2分钟的任务
  17. await Task.Delay(TimeSpan.FromMinutes(2));
  18. logger.LogInformation($"[{now}] ~ {DateTime.Now} 执行一个耗时两分钟的任务!");
  19. }
  20. }

源代码我发布到了GitHub,欢迎star! https://github.com/vipwan/Biwen.QuickApi
https://github.com/vipwan/Biwen.QuickApi/tree/master/Biwen.QuickApi/Scheduling

原文链接:https://www.cnblogs.com/vipwan/p/18194062/biwen-quickapi-scheduletask

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号