经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » 编程经验 » 查看文章
领域驱动模型DDD(四)——Eventuate Tram Saga源码讲解
来源:cnblogs  作者:键盘三个键  时间:2023/9/13 15:37:03  对本文有异议

前言

虽然本人一直抱怨《微服务架构设计模式》中DDD模式下采用的Eventuate Tram Saga不算简单易用,但是为了更加深入了解原文作者的设计思路,还是花了点时间去阅读源码,并且为了自己日后自己返回来看的懂,就斗胆地对整个Eventuate Tram Saga从注册到执行的代码运行流程进行注释解读下,其中若是有什么错误疏漏以及需要改进的地方,希望各位在评论区指正。

源码讲解

1:Saga流程如何被记录注册

1-1 CreateOrderSaga类

  1. //1-1构建流程,为了便以后续讲解,我们将一个step到下一个step之间的所有方法概括成一个“步骤”
  2. public class CreateOrderSaga implements SimpleSaga<CreateOrderSagaData> {
  3. private SagaDefinition<CreateOrderSagaData> sagaDefinition =
  4. //初始化起点,调用1-2的step
  5. step()
  6. //调用1-3 的invokeLocal方法,this::reject是一个输入CreateOrderSagaData类型参数返回CommandWithDestination的方法
  7. .invokeLocal(this::create)
  8. .withCompensation(this::reject)
  9. //步骤一
  10. //调用1-2step
  11. .step()
  12. //调用1-3 的invokeParticipant方法
  13. .invokeParticipant(this::reserveCredit)
  14. .onReply(CustomerNotFound.class, this::handleCustomerNotFound)
  15. .onReply(CustomerCreditLimitExceeded.class, this::handleCustomerCreditLimitExceeded)
  16. //步骤二
  17. //调用1-6step
  18. .step()
  19. .invokeLocal(this::approve)
  20. .build();

1-2 step初始化方法(saga流程的起点)

  1. //1-2创建sagaDefinition的源码:因为实现了SimpleSaga,SimpleSaga默认方法
  2. //此处的step对应1-1开头调用的step,1-1中后面的两个step并不调用此方法
  3. default StepBuilder<Data> step() {
  4. SimpleSagaDefinitionBuilder<Data> builder = new SimpleSagaDefinitionBuilder<>();
  5. return new StepBuilder<>(builder);
  6. }

1-3 StepBuilder类

  1. //1-3StepBuilder内部方法:
  2. public class StepBuilder<Data>{
  3. private final SimpleSagaDefinitionBuilder<Data> parent;
  4. //初始化时,这里parent被赋值为一个new SimpleSagaDefinitionBuilder;之后会用上第一次创建SimpleSagaDefinitionBuilder
  5. public StepBuilder(SimpleSagaDefinitionBuilder<Data> builder) {
  6. this.parent = builder;
  7. }
  8. //此处第一次时传入1-1中的this::create
  9. public LocalStepBuilder<Data> invokeLocal(Consumer<Data> localFunction) {
  10. return new LocalStepBuilder<>(parent, localFunction);
  11. }
  12. //此处传入1-1中的this::reserveCredit方法
  13. public InvokeParticipantStepBuilder<Data> invokeParticipant(Function<Data, CommandWithDestination> action) {
  14. //调用1-6的withAction方法
  15. return new InvokeParticipantStepBuilder<>(parent).withAction(Optional.empty(), action);
  16. }

1-4 LocalStepBuilder类

  1. //1-4根据CreateOrderSaga 中的流程可得知,后续invokeLocal方法:
  2. public class LocalStepBuilder<Data> {
  3. private final SimpleSagaDefinitionBuilder<Data> parent;
  4. private final Consumer<Data> localFunction;
  5. private Optional<Consumer<Data>> compensation = Optional.empty();
  6. //设置父节点,设置执行方法
  7. public LocalStepBuilder(SimpleSagaDefinitionBuilder<Data> parent, Consumer<Data> localFunction) {
  8. this.parent = parent;
  9. this.localFunction = localFunction;
  10. }
  11. //设置补偿方法,传入1-1中的this::reject
  12. public LocalStepBuilder<Data> withCompensation(Consumer<Data> localCompensation) {
  13. this.compensation = Optional.of(localCompensation);
  14. return this;
  15. }
  16. //再次调用step方法,此处的step方法对应的是1-1中后面的两个step方法
  17. public StepBuilder<Data> step() {
  18. //调用1-5中的方法,将step之前接收的每个“步骤”放入到SimpleSagaDefinitionBuilder的List中
  19. parent.addStep(makeLocalStep());
  20. //创建一个新的“步骤”并承接之前的“步骤”
  21. return new StepBuilder<>(parent);
  22. }
  23. //创建一个“步骤”
  24. private LocalStep<Data> makeLocalStep() {
  25. return new LocalStep<>(localFunction, compensation, localExceptionSavers, rollbackExceptions);
  26. }
  27. //对用1-1中的build方法,将最后一个“步骤”放入流程列表中,调用1-5中的方法创建一个SimpleSagaDefinition对象
  28. public SagaDefinition<Data> build() {
  29. //添加最后一个“步骤”并调用1-5中build()方法
  30. parent.addStep(makeLocalStep());
  31. return parent.build();
  32. }

1-5 SimpleSagaDefinitionBuilder类

  1. //1-5
  2. public class SimpleSagaDefinitionBuilder<Data> {
  3. //存放“步骤”列表
  4. private List<SagaStep<Data>> sagaSteps = new LinkedList<>();
  5. //添加“步骤节点”到列表中
  6. public void addStep(SagaStep<Data> sagaStep) {
  7. sagaSteps.add(sagaStep);
  8. }
  9. //将所有步骤节点整合构建整个大“流程业务”
  10. public SagaDefinition<Data> build() {
  11. //将整个“步骤”节点列表传入2-3List<Step> steps内
  12. return new SimpleSagaDefinition<>(sagaSteps);
  13. }
  14. }

1-6 InvokeParticipantStepBuilder类

  1. //1-6
  2. public class InvokeParticipantStepBuilder<Data> implements WithCompensationBuilder<Data> {
  3. private Optional<ParticipantInvocation<Data>> action = Optional.empty();
  4. public InvokeParticipantStepBuilder(SimpleSagaDefinitionBuilder<Data> parent) {
  5. this.parent = parent;
  6. }
  7. //调用1-7的构造函数,1-1的this::reserveCredit方法传给ParticipantInvocationImpl构建出上面Optional<ParticipantInvocation<Data>> action
  8. InvokeParticipantStepBuilder<Data> withAction(Optional<Predicate<Data>> participantInvocationPredicate, Function<Data, CommandWithDestination> action) {
  9. this.action = Optional.of(new ParticipantInvocationImpl<>(participantInvocationPredicate, action));
  10. return this;
  11. }
  12. public StepBuilder<Data> step() {
  13. addStep();
  14. return new StepBuilder<>(parent);
  15. }
  16. //本例中因为只使用了withAction方法,所以compensation, actionReplyHandlers, compensationReplyHandlers都为bull
  17. //调用2-7的中ParticipantInvocationStep的构造方法,将action赋予participantInvocation
  18. private void addStep() {
  19. parent.addStep(new ParticipantInvocationStep<>(action, compensation, actionReplyHandlers, compensationReplyHandlers));
  20. }
  21. }

1-7 ParticipantInvocationImpl类

  1. //1-7 AbstractParticipantInvocation实现ParticipantInvocation接口
  2. public class ParticipantInvocationImpl<Data, C extends Command> extends AbstractParticipantInvocation<Data> {
  3. private final boolean notification;
  4. //commandBuilder对应的就是1-6中withAction方法action参数
  5. private final Function<Data, CommandWithDestination> commandBuilder;
  6. public ParticipantInvocationImpl(Optional<Predicate<Data>> invocablePredicate, Function<Data, CommandWithDestination> commandBuilder) {
  7. this(invocablePredicate, commandBuilder, false);
  8. }
  9. @Override
  10. public boolean isSuccessfulReply(Message message) {
  11. return CommandReplyOutcome.SUCCESS.name().equals(message.getRequiredHeader(ReplyMessageHeaders.REPLY_OUTCOME));
  12. }
  13. //被2-7调用,执行Function<Data, CommandWithDestination> commandBuilder;并传入值为false的notification
  14. @Override
  15. public CommandWithDestinationAndType makeCommandToSend(Data data) {
  16. return new CommandWithDestinationAndType(commandBuilder.apply(data), notification);
  17. }
  18. }

至此所有Saga步骤注册完整的业务流程完毕。

2、Saga实例化工厂如何运作

2-0 sagaInstanceFactory创建createOrderSaga

  1. sagaInstanceFactory.create(createOrderSaga, data);

2-1 SagaInstanceFactory类

  1. //2-1Saga实例化工厂
  2. public class SagaInstanceFactory {
  3. private Logger logger = LoggerFactory.getLogger(this.getClass());
  4. private ConcurrentMap<Saga<?>, SagaManager<?>> sagaManagers = new ConcurrentHashMap<>();
  5. public <SagaData> SagaInstance create(Saga<SagaData> saga, SagaData data) {
  6. SagaManager<SagaData> sagaManager = (SagaManager<SagaData>)sagaManagers.get(saga);
  7. if (sagaManager == null)
  8. throw new RuntimeException(("No SagaManager for " + saga));
  9. //调用2-2的create方法
  10. return sagaManager.create(data);
  11. }
  12. private <SagaData> SagaManager<SagaData> makeSagaManager(SagaManagerFactory sagaManagerFactory, Saga<SagaData> saga) {
  13. SagaManagerImpl<SagaData> sagaManager = sagaManagerFactory.make(saga);
  14. sagaManager.subscribeToReplyChannel();
  15. return sagaManager;
  16. }
  17. }

2-2 SagaManagerImpl类

  1. //2-2
  2. public class SagaManagerImpl<Data> implements SagaManager<Data> {
  3. @Override
  4. public SagaInstance create(Data sagaData, Optional<String> resource) {
  5. SagaInstance sagaInstance = new SagaInstance(getSagaType(),null,"????",null,SagaDataSerde.serializeSagaData(sagaData), new HashSet<>());
  6. //使用数据库保存saga实例
  7. sagaInstanceRepository.save(sagaInstance);
  8. String sagaId = sagaInstance.getId();
  9. //此步骤没什么意义
  10. saga.onStarting(sagaId, sagaData);
  11. resource.ifPresent(r -> {
  12. if (!sagaLockManager.claimLock(getSagaType(), sagaId, r)) {
  13. throw new RuntimeException("Cannot claim lock for resource");
  14. }
  15. });
  16. //getStateDefinition()获取SagaDefinition,即1-1中。
  17. //调用2-4start方法,启动saga流程,构建第一个“步骤”的SagaActions
  18. SagaActions<Data> actions = getStateDefinition().start(sagaData);
  19. actions.getLocalException().ifPresent(e -> {
  20. throw e;
  21. });
  22. //过程操作,传入参数:saga.getSagaType()是获取sagaData(操作数据)的类名,比CreateOrderSagaData并修改成固定格式
  23. //sagaId:唯一sagaId
  24. //sagaInstance:saga实例化
  25. //sagaData:要操作的数据
  26. //actions:第一个“步骤”的SagaActions
  27. //调用下面的processActions方法
  28. processActions(saga.getSagaType(), sagaId, sagaInstance, sagaData, actions);
  29. return sagaInstance;
  30. }
  31. private void processActions(String sagaType, String sagaId, SagaInstance sagaInstance, Data sagaData, SagaActions<Data> actions) {
  32. //进行循环
  33. while (true) {
  34. //如果传入的actions存在执行报错则信息执行if内的方法
  35. if (actions.getLocalException().isPresent()) {
  36. actions = getStateDefinition().handleReply(sagaType, sagaId, actions.getUpdatedState().get(), actions.getUpdatedSagaData().get(), MessageBuilder
  37. .withPayload("{}")
  38. .withHeader(ReplyMessageHeaders.REPLY_OUTCOME, CommandReplyOutcome.FAILURE.name())
  39. .withHeader(ReplyMessageHeaders.REPLY_TYPE, Failure.class.getName())
  40. .build());
  41. } else {
  42. // only do this if successful
  43. //如果成功,通过消息队列发送给接收方。
  44. //第一次进来时:因为1-1中构建的流程内,“步骤一”是调用本地方法,因此不需要发送消息
  45. //第二次进来时:由于1-1中构建的流程内,“步骤二”是调用参与者方法,因此需要发送消息给参与者
  46. String lastRequestId = sagaCommandProducer.sendCommands(this.getSagaType(), sagaId, actions.getCommands(), this.makeSagaReplyChannel());
  47. //第一次进来时:lastRequestId第一个步骤时为null
  48. //第二次进来时:返回一个请求Id
  49. sagaInstance.setLastRequestId(lastRequestId);
  50. //第一次进来时:更新“步骤一”sagaInstance实例状态信息:更新是否是最后节点(布尔值),更新是否需要补偿(布尔值),更新是否报错,更新更新的状态(对应2-5中executeStep方法的newState)
  51. //第二次进来时:更新“步骤二”sagaInstance实例状态
  52. updateState(sagaInstance, actions);
  53. sagaInstance.setSerializedSagaData(SagaDataSerde.serializeSagaData(actions.getUpdatedSagaData().orElse(sagaData)));
  54. //执行第一个步骤时,并不是最后一个步骤节点所以不进入if中
  55. if (actions.isEndState()) {
  56. performEndStateActions(sagaId, sagaInstance, actions.isCompensating(), actions.isFailed(), sagaData);
  57. }
  58. //使用数据库更新sagaInstance实例状态
  59. sagaInstanceRepository.update(sagaInstance);
  60. //public boolean isReplyExpected() {return (commands.isEmpty() || commands.stream().anyMatch(CommandWithDestinationAndType::isCommand)) && !local;}
  61. //第一次进来时:在2-5的step.makeStepOutcome过程中因为将local设置为true,所以执行第一个步骤时actions.isReplyExpected()为false
  62. if (actions.isReplyExpected()) {
  63. break;
  64. } else {
  65. //模拟成功回复本地动作或通知,调用下面的simulateSuccessfulReplyToLocalActionOrNotification方法
  66. //第一次进来时:传入“步骤一”的SagaActions,返回“步骤二”的SagaActions,继续循环
  67. //第二次进来时:传入“步骤二”的SagaActions,返回“步骤三”的SagaActions
  68. actions = simulateSuccessfulReplyToLocalActionOrNotification(sagaType, sagaId, actions);
  69. }
  70. }
  71. }
  72. }
  73. //模拟成功回复本地动作或通知
  74. private SagaActions<Data> simulateSuccessfulReplyToLocalActionOrNotification(String sagaType, String sagaId, SagaActions<Data> actions) {
  75. //获取1-1中的整个业务流程后,调用2-4中的handleReply方法,并设置REPLY_OUTCOME和REPLY_TYPE头为success,最后返回“步骤二”SagaActions,重新进入上面的while循环中
  76. return getStateDefinition().handleReply(sagaType, sagaId, actions.getUpdatedState().get(), actions.getUpdatedSagaData().get(), MessageBuilder
  77. .withPayload("{}")
  78. .withHeader(ReplyMessageHeaders.REPLY_OUTCOME, CommandReplyOutcome.SUCCESS.name())
  79. .withHeader(ReplyMessageHeaders.REPLY_TYPE, Success.class.getName())
  80. .build());
  81. }

2-3 AbstractSimpleSagaDefinition类

  1. //2-3
  2. public abstract class AbstractSimpleSagaDefinition<Data, Step extends ISagaStep<Data>,
  3. ToExecute extends AbstractStepToExecute<Data, Step>,
  4. Provider extends AbstractSagaActionsProvider<Data,?>> {
  5. protected Logger logger = LoggerFactory.getLogger(this.getClass());
  6. //steps接受1-5中的sagaSteps
  7. protected List<Step> steps;
  8. public AbstractSimpleSagaDefinition(List<Step> steps) {
  9. this.steps = steps;
  10. }
  11. //被2-4start方法调用
  12. protected Provider firstStepToExecute(Data data) {
  13. //SagaExecutionState.startingState()返回SagaExecutionState(-1, false);
  14. //初始化时步骤节点currentlyExecuting为-1(因为0为第一个“步骤”,所以设置-1为起点),是否补偿compensating为false
  15. //开始执行下一个步骤节点(此处执行的是第一个“步骤”)
  16. return nextStepToExecute(SagaExecutionState.startingState(), data);
  17. }
  18. //被2-4的handleReply方法调用
  19. protected Provider sagaActionsForNextStep(String sagaType, String sagaId, Data sagaData, Message message,
  20. SagaExecutionState state, Step currentStep, boolean compensating) {
  21. //此处分两种情况:本地调方法用CommandReplyOutcome.SUCCESS.name().equals(message.getRequiredHeader(ReplyMessageHeaders.REPLY_OUTCOME));【例如1-1中的“步骤一”】;远程调用参与者方法用getParticipantInvocation(compensating).get().isSuccessfulReply(message);【例如1-1中的“步骤二”】
  22. //由于“步骤一”是本地方法且头中包含信息为success,直接调用下方的nextStepToExecute方法
  23. if (currentStep.isSuccessfulReply(compensating, message)) {
  24. return nextStepToExecute(state, sagaData);
  25. } else if (compensating) {
  26. return handleFailedCompensatingTransaction(sagaType, sagaId, state, message);
  27. } else {
  28. return nextStepToExecute(state.startCompensating(), sagaData);
  29. }
  30. }
  31. protected Provider nextStepToExecute(SagaExecutionState state, Data data) {
  32. int skipped = 0;
  33. //初始化compensating为false无需补偿
  34. boolean compensating = state.isCompensating();
  35. //初始化compensating为false,所以下个要执行的步骤节点+1,如果为true说明出错需要回滚,因此direction初始化为-1
  36. int direction = compensating ? -1 : +1;
  37. //第一次进入时:direction初始化为-1,所以i初始值为0,说明从第一个步骤开始执行;i必须小于节点长度;i根据direction来判断是否需要回滚到上一个节点还是进入下一个阶段
  38. //第二次进入时:步骤一执行后state.getCurrentlyExecuting()变为0,所以i变为1,steps.get(i)获取“步骤二”。由于compensating依然为false,所以不进行回滚继续向下执行
  39. for (int i = state.getCurrentlyExecuting() + direction; i >= 0 && i < steps.size(); i = i + direction) {
  40. //获取步骤节点
  41. Step step = steps.get(i);
  42. //每个步骤节点中有正常执行的方法事务,可能有补偿事物。
  43. //因此使用compensating进行判断。如果需要补偿且存在补偿事务,或则不需要补偿,以上两种情况则为true
  44. //step.hasCompensation(data)和step.hasAction(data)返回值都是布尔值。
  45. //step.hasCompensation(data)用来判断是否存在补偿事务,step.hasAction(data)直接返回true
  46. if ((compensating ? step.hasCompensation(data) : step.hasAction(data))) {
  47. //makeStepToExecute指定执行步骤,调用SimpleSagaDefinition方法中的makeStepToExecute方法
  48. //传入参数:当前跳过的skipped计数,是否需要补偿,以及“步骤”
  49. //makeSagaActionsProvider调用2-4的makeStepmakeStepToExecuteToExecute方法,构建一个StepToExecute对象,
  50. ToExecute stepToExecute = makeStepToExecute(skipped, compensating, step);
  51. //makeSagaActionsProvider传入:执行节点,处理数据,节点初始状态,调用2-4中的makeSagaActionsProvider方法
  52. return makeSagaActionsProvider(stepToExecute, data, state);
  53. } else
  54. //如果需要补偿但没有补偿事务
  55. //跳过计数+1
  56. skipped++;
  57. }
  58. return makeSagaActionsProvider(makeEndStateSagaActions(state));
  59. }
  60. protected Provider handleFailedCompensatingTransaction(String sagaType, String sagaId, SagaExecutionState state, Message message) {
  61. logger.error("Saga {} {} failed due to failed compensating transaction {}", sagaType, sagaId, message);
  62. return makeSagaActionsProvider(SagaActions.<Data>builder()
  63. .withUpdatedState(SagaExecutionStateJsonSerde.encodeState(SagaExecutionState.makeFailedEndState()))
  64. .withIsEndState(true)
  65. .withIsCompensating(state.isCompensating())
  66. .withIsFailed(true)
  67. .build());
  68. }
  69. protected SagaActions<Data> makeEndStateSagaActions(SagaExecutionState state) {
  70. return SagaActions.<Data>builder()
  71. .withUpdatedState(SagaExecutionStateJsonSerde.encodeState(SagaExecutionState.makeEndState()))
  72. .withIsEndState(true)
  73. .withIsCompensating(state.isCompensating())
  74. .build();
  75. }
  76. }

2-4 SimpleSagaDefinition类

  1. //2-4
  2. public class SimpleSagaDefinition<Data>{
  3. public SimpleSagaDefinition(List<SagaStep<Data>> steps) {
  4. super(steps);
  5. }
  6. @Override
  7. public SagaActions<Data> start(Data sagaData) {
  8. //执行2-3中的firstStepToExecute方法,启动第一个流程
  9. return toSagaActions(firstStepToExecute(sagaData));
  10. //构建SagaActions完毕,返回并继续2-2中的create方法内getStateDefinition().start(sagaData)后的代码
  11. }
  12. //被2-2中的processActions方法调用
  13. @Override
  14. public SagaActions<Data> handleReply(String sagaType, String sagaId, String currentState, Data sagaData, Message message) {
  15. //将前一个“步骤”中的之前被JSON格式化(2-5的makeSagaActions把newState进行JSON格式化)的newState此处的currentState进行解码
  16. SagaExecutionState state = SagaExecutionStateJsonSerde.decodeState(currentState);
  17. //state.getCurrentlyExecuting()的值为“0”,因为初始化currentlyExecuting的计数是-1,而在2-5 中执行currentState.nextState(size())已经把当前“步骤”计数+1.而,所以step.get(0)获取了整个业务流程中的“步骤一”
  18. SagaStep<Data> currentStep = steps.get(state.getCurrentlyExecuting());
  19. //获取前一个“步骤”是否需要回滚
  20. boolean compensating = state.isCompensating();
  21. currentStep.getReplyHandler(message, compensating).ifPresent(handler -> invokeReplyHandler(message, sagaData, (d, m) -> {
  22. handler.accept(d, m);
  23. return null;
  24. }));
  25. //sagaActionsForNextStep会根据是否需要补偿来判断是采用nextStepToExecute(2-3)方法,还是调用handleFailedCompensatingTransaction(2-3)方法。
  26. SagaActionsProvider<Data> sap = sagaActionsForNextStep(sagaType, sagaId, sagaData, message, state, currentStep, compensating);
  27. return toSagaActions(sap);
  28. }
  29. //被2-3的nextStepToExecute中调用
  30. @Override
  31. protected StepToExecute<Data> makeStepToExecute(int skipped, boolean compensating, SagaStep<Data> step) {
  32. return new StepToExecute<>(step, skipped, compensating);
  33. }
  34. //被2-3的nextStepToExecute中调用
  35. @Override
  36. protected SagaActionsProvider<Data> makeSagaActionsProvider(StepToExecute<Data> stepToExecute, Data data, SagaExecutionState state) {
  37. //调用2-5的executeStep方法
  38. return new SagaActionsProvider<>(() -> stepToExecute.executeStep(data, state));
  39. }
  40. }

2-5 executeStep方法

  1. //2-5 StepToExecute类中的方法,被2-4中的makeSagaActionsProvider方法调用
  2. public SagaActions<Data> executeStep(Data data, SagaExecutionState currentState) {
  3. //nextState方法执行:SagaExecutionState(compensating ? currentlyExecuting - size : currentlyExecuting + size, compensating)
  4. //protected int size() {return 1 + skipped;}
  5. //需要补偿:则下一个状态为当前节点currentlyExecuting-已跳过(没有补偿事务)的节点的长度,即回到最后(有补偿事务)执行的步骤节点。
  6. //不需要补偿:则下一个状态为当前节点currentlyExecuting+需要跳过(没有补偿事务)的节点的长度
  7. //计算完后将currentlyExecuting进行更新
  8. SagaExecutionState newState = currentState.nextState(size());
  9. SagaActions.Builder<Data> builder = SagaActions.builder();
  10. //当前正执行是否需要补偿回滚
  11. boolean compensating = currentState.isCompensating();
  12. //调用2-6中的makeStepOutcome方法
  13. //第一次进来时:因为步骤一执行的是本地方法,调用的2-6中的makeStepOutcome,作用在于判断当前“步骤”是该执行补偿事务还是正常的本地事务,如果执行出现错误则返回一个带有报错信息的StepOutcome对象
  14. //第二次进来时:因为步骤二执行的是让参与方执行方法,调用的2-7中的makeStepOutcomemakeStepOutcome
  15. //执行StepOutcome的visit方法:将StepOutcome的RuntimeException类型的localOutcome属性赋值给SagaActions中的RuntimeException类型localException属性,同时将SagaActions中的local属性设置为ture;
  16. step.makeStepOutcome(data, this.compensating).visit(builder::withIsLocal, builder::withCommands);
  17. //SagaActions的makeSagaActions方法做两件事:将当前节点的数据newState格式化成JSON数据,newState信息包括:当前执行步骤的计数(第几个步骤)、是否回滚(布尔值)、是否是最后一个步骤(布尔值),是否出现错误(布尔值)。然后调用buildActions方法构建返回一个新的SagaActions
  18. //String state = encodeState(newState);
  19. //builder.buildActions(data, compensating, state, newState.isEndState());
  20. //public SagaActions<Data> buildActions(Data data, boolean compensating, String state, boolean endState) {
  21. //return withUpdatedSagaData(data)
  22. // .withUpdatedState(state)
  23. // .withIsEndState(endState)
  24. // .withIsCompensating(compensating)
  25. // .build();
  26. //}
  27. return makeSagaActions(builder, data, newState, compensating);
  28. //第一次进来时:构建SagaActions完毕,返回到2-4中的start方法
  29. //第二次进来时:构建SagagAcions完毕,返回到2-2中的simulateSuccessfulReplyToLocalActionOrNotification方法
  30. }

2-6 LocalStep类

  1. //2-6
  2. public class LocalStep<Data> implements SagaStep<Data> {
  3. private final Consumer<Data> localFunction;
  4. private final Optional<Consumer<Data>> compensation;
  5. private final List<LocalExceptionSaver<Data>> localExceptionSavers;
  6. private final List<Class<RuntimeException>> rollbackExceptions;
  7. @Override
  8. public StepOutcome makeStepOutcome(Data data, boolean compensating) {
  9. try {
  10. //如果需要回滚,执行回滚方法,compensation在1-1时的.withCompensation已经传入
  11. if (compensating) {
  12. //真正执行业务逻辑方法的地方
  13. compensation.ifPresent(localStep -> localStep.accept(data));
  14. } else {
  15. //如果不需要回滚,直接执行补偿方法,localFunction在1-1的.invokeLocal时已经传入
  16. localFunction.accept(data);
  17. }
  18. return makeLocalOutcome(Optional.empty());
  19. } catch (RuntimeException e) {
  20. localExceptionSavers.stream().filter(saver -> saver.shouldSave(e)).findFirst().ifPresent(saver -> saver.save(data, e));
  21. if (rollbackExceptions.isEmpty() || rollbackExceptions.stream().anyMatch(c -> c.isInstance(e)))
  22. return makeLocalOutcome(Optional.of(e));
  23. else
  24. throw e;
  25. }
  26. }
  27. }

2-7 ParticipantInvocationStep类

  1. //2-7
  2. public class ParticipantInvocationStep<Data> implements SagaStep<Data> {
  3. //participantInvocation被1-6的aaddStep方法传递action赋值
  4. private Optional<ParticipantInvocation<Data>> participantInvocation;
  5. private Optional<ParticipantInvocation<Data>> compensation;
  6. public ParticipantInvocationStep(Optional<ParticipantInvocation<Data>> participantInvocation,
  7. Optional<ParticipantInvocation<Data>> compensation,
  8. Map<String, BiConsumer<Data, Object>> actionReplyHandlers,
  9. Map<String, BiConsumer<Data, Object>> compensationReplyHandlers) {
  10. this.actionReplyHandlers = actionReplyHandlers;
  11. this.compensationReplyHandlers = compensationReplyHandlers;
  12. this.participantInvocation = participantInvocation;
  13. this.compensation = compensation;
  14. }
  15. //判断是否需要回滚,如果需要回滚执行compensation方法,如果不需要执行participantInvocation方法
  16. private Optional<ParticipantInvocation<Data>> getParticipantInvocation(boolean compensating) {
  17. return compensating ? compensation : participantInvocation;
  18. }
  19. @Override
  20. public boolean isSuccessfulReply(boolean compensating, Message message) {
  21. return getParticipantInvocation(compensating).get().isSuccessfulReply(message);
  22. }
  23. @Override
  24. public StepOutcome makeStepOutcome(Data data, boolean compensating) {
  25. //先调用getParticipantInvocation方法,此处假设不需要回滚所以返回participantInvocation
  26. //调用makeRemoteStepOutcome方法,传入List<CommandWithDestinationAndType> commandsToSend 返回RemoteStepOutcome类型的结果
  27. return StepOutcome.makeRemoteStepOutcome(getParticipantInvocation(compensating)
  28. //调用1-7中的makeCommandToSend方法,执行消息发送方法
  29. .map(pi -> pi.makeCommandToSend(data))
  30. //将返回的CommandWithDestinationAndType包装成单元素列表
  31. .map(Collections::singletonList)
  32. //如果上述返回为空则返回一个空列表
  33. .orElseGet(Collections::emptyList));
  34. }
  35. }

图解流程

有空再补....

简单案例

花了两天写了个以领域驱动为思想的Saga模式事务管理简陋框架,主要为了讲解:领域驱动模型DDD(三)——使用Saga管理事务 教学而设计的,目前只能在单体架构中使用,后续有时间会更新分布式情况下的新版本。请记住,领域驱动模型是一种思想,它不一定捆绑分布式微服务,只是领域驱动模型思想更有利于分布式情况下对微服务应用的划分。
项目框架地址:https://github.com/CG-Lin/mvn-lin

原文链接:https://www.cnblogs.com/jianpansangejian/p/17699095.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号