经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » ASP.net » 查看文章
[.NET项目实战] Elsa开源工作流组件应用(三):实战演练
来源:cnblogs  作者:林晓lx  时间:2024/3/22 9:24:33  对本文有异议

补充

之前的文章简单介绍了工作流和Elsa工作流库,这里再补充说明两点

  1. 工作流的使用场景非常广泛,几乎涵盖了所有需要进行业务流程自动化管理的领域。

  2. 学习一个开源库,最简单的方法就是看源码,Elsa的工作流引擎源码非常简单易懂,并且提供了非常丰富的示例代码,举一个例子:审批工作流示例.\src\samples\aspnet\Elsa.Samples.AspNet.DocumentApproval

在这里插入图片描述
这个审批流是这样的:
作者发来一个文章,有两个审批人需要全部审批通过,文章才算通过,否则退回。

我们尝试阅读工作流源代码DocumentApprovalWorkflow.cs,并运行此项目,用postman发送请求

第一步:

假设这名叫Amanda的作者要发布文章,请求发送后,作者浏览器显示发送成功稍安勿躁之类的提示

同时后台打印作者信息和4个链接,分别是Jack和Lucy两位审批人“通过”和“退回”的url链接

  1. Activities =
  2. {
  3. new HttpEndpoint
  4. {
  5. Path = new("/documents"),
  6. SupportedMethods = new(new[] { HttpMethods.Post }),
  7. ParsedContent = new(documentVariable),
  8. CanStartWorkflow = true
  9. },
  10. new WriteLine(context => $"Document received from {documentVariable.Get<dynamic>(context)!.Author.Name}."),
  11. new WriteHttpResponse
  12. {
  13. Content = new("<h1>Request for Approval Sent</h1><p>Your document has been received and will be reviewed shortly.</p>"),
  14. ContentType = new(MediaTypeNames.Text.Html),
  15. StatusCode = new(HttpStatusCode.OK),
  16. ResponseHeaders = new(new HttpHeaders { ["X-Powered-By"] = new[] { "Elsa 3.0" } })
  17. },

第二步:

Jack觉得文章不错,通过浏览器请求了“通过”链接,而Lucy觉得文章还不够好,需改进,她在浏览器中请求了“退回”链接。

两位审批人的审批结果存储于approvedVariable变量中

同时他们的浏览器返回的响应内容:Thanks for the approval 或 Sorry to hear that

  1. new Fork
  2. {
  3. JoinMode = ForkJoinMode.WaitAll,
  4. Branches =
  5. {
  6. // Jack
  7. new Sequence
  8. {
  9. Activities =
  10. {
  11. new WriteLine(context => $"Jack approve url: \n {GenerateSignalUrl(context, "Approve:Jack")}"),
  12. new WriteLine(context => $"Jack reject url: \n {GenerateSignalUrl(context, "Reject:Jack")}"),
  13. new Fork
  14. {
  15. JoinMode = ForkJoinMode.WaitAny,
  16. Branches =
  17. {
  18. // Approve
  19. new Sequence
  20. {
  21. Activities =
  22. {
  23. new Event("Approve:Jack"),
  24. new SetVariable
  25. {
  26. Variable = approvedVariable,
  27. Value = new(true)
  28. },
  29. new WriteHttpResponse
  30. {
  31. Content = new("Thanks for the approval, Jack!"),
  32. }
  33. }
  34. },
  35. // Reject
  36. new Sequence
  37. {
  38. Activities =
  39. {
  40. new Event("Reject:Jack"),
  41. new SetVariable
  42. {
  43. Variable = approvedVariable,
  44. Value = new(false)
  45. },
  46. new WriteHttpResponse
  47. {
  48. Content = new("Sorry to hear that, Jack!"),
  49. }
  50. }
  51. }
  52. }
  53. }
  54. }
  55. },
  56. // Lucy
  57. new Sequence
  58. {
  59. Activities =
  60. {
  61. new WriteLine(context => $"Lucy approve url: \n {GenerateSignalUrl(context, "Approve:Lucy")}"),
  62. new WriteLine(context => $"Lucy reject url: \n {GenerateSignalUrl(context, "Reject:Lucy")}"),
  63. new Fork
  64. {
  65. JoinMode = ForkJoinMode.WaitAny,
  66. Branches =
  67. {
  68. // Approve
  69. new Sequence
  70. {
  71. Activities =
  72. {
  73. new Event("Approve:Lucy"),
  74. new SetVariable
  75. {
  76. Variable = approvedVariable,
  77. Value = new(true)
  78. },
  79. new WriteHttpResponse
  80. {
  81. Content = new("Thanks for the approval, Lucy!"),
  82. }
  83. }
  84. },
  85. // Reject
  86. new Sequence
  87. {
  88. Activities =
  89. {
  90. new Event("Reject:Lucy"),
  91. new SetVariable
  92. {
  93. Variable = approvedVariable,
  94. Value = new(false)
  95. },
  96. new WriteHttpResponse
  97. {
  98. Content = new("Sorry to hear that, Lucy!"),
  99. }
  100. }
  101. }
  102. }
  103. }
  104. }
  105. }
  106. }
  107. },

第三步:

根据approvedVariable变量判定文章是否被审核通过。

如果通过则在控制台打印Document document-1 approved!, 否则打印Document document-1 rejected!

  1. new WriteLine(context => $"Approved: {approvedVariable.Get<bool>(context)}"),
  2. new If(context => approvedVariable.Get<bool>(context))
  3. {
  4. Then = new WriteLine(context => $"Document ${documentVariable.Get<dynamic>(context)!.Id} approved!"),
  5. Else = new WriteLine(context => $"Document ${documentVariable.Get<dynamic>(context)!.Id} rejected!")
  6. }
  7. }

Elsa工作流源码还提供了大量的Sample,这里就不一一列举了,

需求描述

根据不同的时间规则,发送下发问卷给客户填写。

下发问卷给用户填写,且填写有超时时间,期间要提醒用户答题,

如果问卷未在规定的时间内作答则,则作废,并提醒用户。

需求分析

我们将需求尽可能分解成为单一职责的功能单元,并定义这些功能单元的输入输出。

下发问卷任务 PublishQuestionnaireActivity

下发问卷是将问卷(Questionnaire)实例化成问卷实例(Survey),问卷实例绑定用户Id,用户在问卷实例上作答。明确输入和输出:

  • 输入:问卷ID
  • 输出:问卷实例对象SurveyDto

通知任务 NotificationActivity

通知在这个需求中需要发送问卷状态,时间等内容给对应的用户,同通至少包含标题和内容。

  • 输入:标题和内容
  • 输出:无

问卷状态跟踪任务 WaitFillInSurveyActivity

这个任务要追踪问卷实例的状态,当问卷实例状态为已完成时,可以继续执行后续任务。

  • 输入:问卷实例ID
  • 输出:无

定时和延时任务

用于延时执行每个下发问卷的时间,等待问卷超时,以及延时发送通知等。

  • 输入:开始日期,延时日期,间隔时间或cron表达式
  • 输出:无

根任务

根任务包含所有的子任务,完成这个任务后,整个流程结束。在这个需求中根任务只需要知道将什么问卷,发送给哪位用户,以及在何时发送这三个问题。

  • 输入:问卷ID,用户ID,发送时间
  • 输出:无

各子任务参数对于他们的根任务是透明的(Invisible),根任务只需要关心是否完成,而不需要知道任务参数。

代码实现

下发问卷活动 PublishQuestionnaireActivity

下发问卷任务可以抽象成为下发问卷活动 PublishQuestionnaireActivity
创建PublishQuestionnaireActivity类并设置输入QuestionnaireId,输出SurveyDto

  1. public class PublishQuestionnaireActivity : Activity<SurveyDto>
  2. {
  3. public PublishQuestionnaireActivity()
  4. {
  5. }
  6. public PublishQuestionnaireActivity(long questionnaireId)
  7. {
  8. QuestionnaireId = new Input<long>(questionnaireId);
  9. }
  10. public Input<long> QuestionnaireId { get; set; } = default!;
  11. }

重写ExecuteAsync方法,完成问卷下发逻辑

  1. protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
  2. {
  3. var _surveyAppService = context.GetRequiredService<ISurveyAppService>();
  4. if (_surveyAppService != null)
  5. {
  6. var currentUserId = await context.GetInputValueAsync<Guid>("UserId");
  7. var survey = await _surveyAppService.PublishAsync(new PublishInput()
  8. {
  9. QuestionnaireId = this.QuestionnaireId.Get<long>(context),
  10. UserId = currentUserId
  11. }) ?? throw new Exception("创建问卷失败");
  12. context.SetResult(survey);
  13. }
  14. await context.CompleteActivityAsync();
  15. }

如此,其他的任务分别抽象成为相应的活动,这里展示完整代码

通知活动:NotificationActivity

  1. public class NotificationActivity : Activity
  2. {
  3. public NotificationActivity()
  4. {
  5. }
  6. public NotificationActivity(string title, string content)
  7. {
  8. Content = new Input<string>(content);
  9. Title = new Input<string>(title);
  10. }
  11. protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
  12. {
  13. var notificationManager = context.GetRequiredService<NotificationManager>();
  14. if (notificationManager != null)
  15. {
  16. var title = this.Title.Get(context);
  17. var content = this.Content.Get(context);
  18. var currentUserId = await context.GetInputValueAsync<Guid>("UserId");
  19. var data = new CreatePrivateMessageNotificationEto(currentUserId, title, content);
  20. await notificationManager.Send(data);
  21. }
  22. await context.CompleteActivityAsync();
  23. }
  24. public Input<string> Title { get; set; } = default!;
  25. public Input<string> Content { get; set; } = default!;
  26. }

等待问卷完成活动:WaitFillInSurveyActivity

  1. public class WaitFillInSurveyActivity : Activity
  2. {
  3. public WaitFillInSurveyActivity()
  4. {
  5. }
  6. public WaitFillInSurveyActivity(Func<ExpressionExecutionContext, long?> surveyId)
  7. : this(Expression.DelegateExpression(surveyId))
  8. {
  9. }
  10. public WaitFillInSurveyActivity(long surveyId) => SurveyId = new Input<long>(surveyId);
  11. public WaitFillInSurveyActivity(Expression expression) => SurveyId = new Input<long>(expression, new MemoryBlockReference());
  12. /// <inheritdoc />
  13. protected override ValueTask ExecuteAsync(ActivityExecutionContext context)
  14. {
  15. var surveyId = SurveyId.Get(context);
  16. if (surveyId == default)
  17. {
  18. var survey = context.ExpressionExecutionContext.GetLastResult<SurveyDto>();
  19. surveyId = survey.Id;
  20. }
  21. var payload = new WaitFillInSurveyBookmarkPayload(surveyId);
  22. context.CreateBookmark(new CreateBookmarkArgs
  23. {
  24. Payload = payload,
  25. Callback = Resume,
  26. BookmarkName = Type,
  27. IncludeActivityInstanceId = false
  28. });
  29. return ValueTask.CompletedTask;
  30. }
  31. private async ValueTask Resume(ActivityExecutionContext context)
  32. {
  33. await context.CompleteActivityAsync();
  34. }
  35. public Input<long> SurveyId { get; set; } = default!;
  36. }

此任务需要等待,我们创建一个Bookmark,注意创建Bookmark时,我们根据问卷实例SurveyId判断是否完成问卷的回答,因此指定IncludeActivityInstanceIdfalse,创建携带SurveyId的Payload类型:

  1. public record WaitFillInSurveyBookmarkPayload(long SurveyId);

在回调OnResumeAsync中,我们使用context.CompleteActivityAsync来完成任务。

定时和延时活动:

Elsa.Scheduling库提供了用于定时和延时任务的触发器(触发器属于工作流的一种)

在这里插入图片描述

[.NET项目实战] Elsa开源工作流组件应用(二):内核解读 一文 "构建 - 构建活动 "章节 列出了Elsa所有内建的活动。

这里使用Elsa内建的三个触发器:

StartAt 在未来特定的时间戳触发工作流触发器
Delay 延迟执行工作流触发器。
Timer 定期触发工作流触发器。

问卷活动:QuestionnaireActivity

问卷活动是下发问卷,通知,等待填写问卷等活动的父级。

Elsa定义了容器类型的活动Container类型,其中的Activities可以包含其他活动。

在这里插入图片描述

Sequence和Parallel都是容器类型,是Activity的子类,它们分别表示并行和顺序执行。

除此之外我们还需要两个内建活动:

Fork:分支,用于分支并行执行,与Parallel类似,但比它多了一个等待完成功能。

通过ForkJoinMode属性,可以指定分支任务的执行方式,ForkJoinMode.WaitAny:等待任意一个任务完成,ForkJoinMode.WaitAll:等待所有任务完成。

Fault:故障,用于在工作流执行过程中,遇到异常时,触发故障。并结束工作流。

创建问卷活动类型QuestionnaireActivity,继承自Sequence类型,并设置一些属性,如问卷Id,问卷填写超时时间等。

[可选]Elsa在注册工作流时,Activity对象是会被序列化并存储到WorflowDefinition表中的, 因此这些属性可以被持久化到数据库中。

  1. public class QuestionnaireActivity : Sequence
  2. {
  3. //可选,用于持久化一些属性
  4. public TimeSpan Delay { get; set; }
  5. public DateTime StartAt { get; set; }
  6. public TimeSpan Interval { get; set; }
  7. public string Cron { get; set; }
  8. public TimeSpan Duration { get; set; }
  9. public long QuestionnaireId { get; set; }
  10. public TimeSpan FillInTimeout { get; set; } = TimeSpan.FromHours(2);
  11. public QuestionnaireActivity()
  12. {
  13. }
  14. }

重写构造函数,并设置Activities属性

  1. public QuestionnaireActivity(long questionnaireId, TimeSpan fillInTimeout)
  2. {
  3. this.QuestionnaireId = questionnaireId;
  4. this.FillInTimeout = fillInTimeout;
  5. var currentSurvey = new Variable<SurveyDto>();
  6. Variables.Add(currentSurvey);
  7. Activities = new List<IActivity>()
  8. {
  9. //流程开始打印
  10. new WriteLine("问卷流程开始"),
  11. //下发问卷任务
  12. new PublishQuestionnaireActivity(QuestionnaireId)
  13. {
  14. Name="PublishQuestionnaire",
  15. Result=new Output<Questionnaire.Survey.Dto.SurveyDto> (currentSurvey)
  16. },
  17. //问卷到达提醒
  18. new NotificationActivity("新问卷提醒", "您有新的问卷,请查收"),
  19. //问卷处理分支
  20. new Fork
  21. {
  22. JoinMode = ForkJoinMode.WaitAny,
  23. Branches =
  24. {
  25. //问卷即将过期提醒
  26. new Sequence
  27. {
  28. Activities =
  29. {
  30. //等待
  31. new Delay
  32. {
  33. Name = "RemindDelay",
  34. TimeSpan = new(RemindDelay)
  35. },
  36. //通知
  37. new NotificationActivity("问卷即将超时", "问卷即将超时,请尽快回答")
  38. }
  39. },
  40. //问卷过期处理以及提醒
  41. new Sequence
  42. {
  43. Activities =
  44. {
  45. //等待
  46. new Delay
  47. {
  48. Name = "TimeoutDelay",
  49. TimeSpan = new(FillInTimeout)
  50. },
  51. //通知
  52. new NotificationActivity("问卷已过期", "问卷已过期,请等待工作人员处理"),
  53. //处理
  54. new Fault()
  55. {
  56. Message=new ("问卷回答超时")
  57. }
  58. }
  59. },
  60. //问卷状态跟踪
  61. new Sequence
  62. {
  63. Activities =
  64. {
  65. new WriteLine("开始等待问卷提交信号"),
  66. new WaitFillInSurveyActivity(context => currentSurvey.Get<SurveyDto>(context)?.Id)
  67. }
  68. }
  69. }
  70. },
  71. //流程结束打印
  72. new WriteLine("完成流程结束"),
  73. new Finish(),
  74. };
  75. }

创建工作流

现在我们来创建测试工作流,

  1. 添加一个工作流参数UserId,用于各活动中对用户的查询依赖。
  2. 分别实现4个并行任务:延时发送问卷,定时发送问卷,定期间隔发送问卷,根据Cron表达式执行。和一个串行任务
  1. public class Test1Workflow : WorkflowBase
  2. {
  3. public Guid UserId { get; set; }
  4. protected override void Build(IWorkflowBuilder workflow)
  5. {
  6. var startTime = new Variable<DateTimeOffset>();
  7. workflow.Inputs.Add(
  8. new InputDefinition() { Name = "UserId", Type = typeof(Guid), StorageDriverType = typeof(WorkflowStorageDriver) }
  9. );
  10. workflow.WithVariable(startTime);
  11. workflow.Root = new Sequence
  12. {
  13. Activities =
  14. {
  15. new WriteLine("Start"),
  16. new SetVariable<DateTimeOffset>
  17. {
  18. Variable = startTime,
  19. Value = new (DateTime.Now )
  20. },
  21. new Parallel()
  22. {
  23. Activities =
  24. {
  25. //并行任务1:延时发送问卷
  26. new Sequence()
  27. {
  28. Activities =
  29. {
  30. //问卷1 将在工作流启动后1小时执行
  31. new Delay(TimeSpan.FromHours(1)),
  32. new QuestionnaireActivity(1),
  33. }
  34. },
  35. //并行任务2:定时发送问卷
  36. new Sequence()
  37. {
  38. Activities =
  39. {
  40. //问卷2 将在 2024-4-1 08:30:00 执行
  41. new StartAt(new DateTime(2024,4,1,8,30,0)),
  42. new Delay(TimeSpan.FromHours(2)),
  43. new QuestionnaireActivity(2),
  44. }
  45. },
  46. //并行任务3:定期间隔发送问卷
  47. new Sequence()
  48. {
  49. Activities =
  50. {
  51. //问卷3 每隔两个小时执行
  52. new Timer(new TimeSpan(2,0,0)),
  53. new Delay(TimeSpan.FromHours(2)),
  54. new QuestionnaireActivity(3),
  55. }
  56. },
  57. //并行任务4:根据Cron表达式执行
  58. new Sequence()
  59. {
  60. Activities =
  61. {
  62. //问卷4 每个月的最后一天上午10点执行任务
  63. new Cron(cronExpression:"0 0 10 L * ?"),
  64. new Delay(TimeSpan.FromHours(2)),
  65. new QuestionnaireActivity(4),
  66. }
  67. },
  68. //并行任务5:根据某时间发送问卷
  69. new Sequence()
  70. {
  71. Activities =
  72. {
  73. new StartAt(context=> startTime.Get(context).AddMinutes(90)),
  74. new Delay(TimeSpan.FromHours(2)),
  75. new QuestionnaireActivity(5),
  76. }
  77. },
  78. //串行任务
  79. new Sequence()
  80. {
  81. Activities =
  82. {
  83. //问卷3 将在工作流启动后2小时执行
  84. new Delay(TimeSpan.FromHours(2)),
  85. new QuestionnaireActivity(3),
  86. //问卷4 将在问卷3完成1天后执行
  87. new Delay(TimeSpan.FromDays(1)),
  88. new QuestionnaireActivity(4),
  89. //问卷5 将在问卷4完成3天后执行
  90. new Delay(TimeSpan.FromDays(3)),
  91. new QuestionnaireActivity(5),
  92. }
  93. }
  94. }
  95. },
  96. new Finish(),
  97. },
  98. };
  99. }
  100. }

开始工作流

工作流启动参数需设置Input对象

  1. var input = new Dictionary<string, object>
  2. {
  3. {"UserId", "D1522DBC-5BFC-6173-EB60-3A114454350C"},
  4. };
  5. var startWorkflowOptions = new StartWorkflowRuntimeOptions
  6. {
  7. Input = input,
  8. VersionOptions = versionOptions,
  9. InstanceId = instanceId,
  10. };
  11. // Start the workflow.
  12. var result = await _workflowRuntime.StartWorkflowAsync(workflowDefinition.DefinitionId, startWorkflowOptions);

下面进入喜闻乐见的踩坑填坑环节

TroubleShooting

  1. 在活动中执行异步操作时,会导致报错:

    如下面的代码,执行Excute方法中的 context.CompleteActivityAsync()方法,时报错

在这里插入图片描述

在这里插入图片描述

原因分析:scope资源被提前释放

代码先执行到了112行,scope释放

在这里插入图片描述

解决:带有异步的操作一定要使用ExecuteAsync方法

在这里插入图片描述

  1. delay之后,Workflow的Input无法访问

原因分析:

Delay或其他Schedule类型的Activity,通过创建Bookmark挂起任务,当任务被唤醒时,input被workflowState.Output替换掉,和原先的input不一样了。

在这里插入图片描述

解决:

虽然input被替换了,但数据库的input还在,可以通过workflowInstanceId先取回workflowInstance对象,再通过instance.WorkflowState.Input.TryGetValue方法获取原始input值。

可以创建一个一个扩展方法GetInputValueAsync,Delay之后的活动中调用即可。

  1. public static async Task<TValue> GetInputValueAsync<TValue>(this ActivityExecutionContext context, string name)
  2. {
  3. TValue value;
  4. if (!context.TryGetWorkflowInput(name, out value))
  5. {
  6. var workflowInstanceStore = context.GetRequiredService<IWorkflowInstanceStore>();
  7. var instance = await workflowInstanceStore.FindAsync(new WorkflowInstanceFilter()
  8. {
  9. Id = context.WorkflowExecutionContext.Id
  10. });
  11. if (instance != null)
  12. {
  13. instance.WorkflowState.Input.TryGetValue(name, out value);
  14. }
  15. }
  16. return value;
  17. }

在Activity中调用:

  1. await context.GetInputValueAsync<Guid>("UserId");

持续更新中...

--完结--

原文链接:https://www.cnblogs.com/jevonsflash/p/18087583

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号