经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » 编程经验 » 查看文章
将Abp默认事件总线改造为分布式事件总线
来源:cnblogs  作者:林晓lx  时间:2023/12/21 9:02:59  对本文有异议

@

原理

本地事件总线是通过Ioc容器来实现的。

IEventBus接口定义了事件总线的基本功能,如注册事件、取消注册事件、触发事件等。

Abp.Events.Bus.EventBus是本地事件总线的实现类,其中私有成员ConcurrentDictionary<Type, List<IEventHandlerFactory>> _handlerFactories是事件订阅表。通过维护事件订阅表来实现事件处理器的注册和取消注册。当对应类型的事件触发时,通过订阅表查找所有事件处理器,通过Ioc容器来获取处理器实例,然后通过反射来调用事件处理器的"HandleEvent"方法。

创建分布式事件总线

首先,我们需要一个分布式事件总线中间件,用来将事件从本地事件总线转发到分布式事件总线。常用的中间件有RabbitMQ、Kafka、Redis等。

开源社区已经有实现好的库,本项目参考了 wuyi6216/Abp.RemoteEventBus

这里已经定义好了一个分布式事件总线接口

  1. public interface IDistributedEventBus : IDisposable
  2. {
  3. void MessageHandle(string topic, string message);
  4. void Publish(IDistributedEventData eventData);
  5. void Subscribe(string topic);
  6. void Unsubscribe(string topic);
  7. void UnsubscribeAll();
  8. }

为了兼容本地事件总线,我们需要定义一个分布式事件总线接口,继承自IEventBus接口。

  1. public interface IMultipleEventBus : IDistributedEventBus, IEventBus
  2. {
  3. }

实现自动订阅和事件转发

当注册本地事件时,将订阅分布式事件,事件Topic为类型的字符串表现形式

  1. public IDisposable Register(Type eventType, IEventHandlerFactory factory)
  2. {
  3. GetOrCreateHandlerFactories(eventType);
  4. List<IEventHandlerFactory> currentLists;
  5. if (_handlerFactories.TryGetValue(eventType, out currentLists))
  6. {
  7. lock (currentLists)
  8. {
  9. if (currentLists.Count == 0)
  10. {
  11. //Register to distributed event
  12. this.Subscribe(eventType.ToString());
  13. }
  14. currentLists.Add(factory);
  15. }
  16. }
  17. return new FactoryUnregistrar(this, eventType, factory);
  18. }

创建TriggerRemote,此方法用于将本地事件参数打包成为分布式事件消息payload,并发布该消息

  1. public void TriggerRemote(Type eventType, object eventSource, IEventData eventData)
  2. {
  3. var exceptions = new List<Exception>();
  4. eventData.EventSource = eventSource;
  5. try
  6. {
  7. var payloadDictionary = new Dictionary<string, object>
  8. {
  9. { PayloadKey, eventData }
  10. };
  11. var distributedeventData = new DistributedEventData(eventType.ToString(), payloadDictionary);
  12. Publish(distributedeventData);
  13. }
  14. catch (Exception ex)
  15. {
  16. exceptions.Add(ex);
  17. }
  18. if (exceptions.Any())
  19. {
  20. if (exceptions.Count == 1)
  21. {
  22. exceptions[0].ReThrow();
  23. }
  24. throw new AggregateException("More than one error has occurred while triggering the event: " + eventType, exceptions);
  25. }
  26. }

当触发本地事件时,将消息转发至分布式事件总线。
在Trigger方法中调用TriggerRemote,事件状态回调和事件异常回调将不会被转发。

  1. if (!(typeof(DistributedEventBusEvent) == eventType
  2. || typeof(DistributedEventBusEvent).IsAssignableFrom(eventType)
  3. || typeof(DistributedEventMessageHandleExceptionData) == eventType
  4. || typeof(DistributedEventHandleExceptionData) == eventType
  5. ))
  6. {
  7. if (typeof(DistributedEventArgs) != eventType)
  8. {
  9. TriggerRemote(eventType, eventSource, eventData);
  10. }
  11. }

在消费端接收到分布式事件消息时,从Topic中解析类型,转发给本地事件。若此类型在本地事件注册过,则将消息反序列化为本地事件参数,然后触发本地事件。
本地事件处理器将触发最终的处理方法。

  1. public virtual void MessageHandle(string topic, string message)
  2. {
  3. Logger.Debug($"Receive message on topic {topic}");
  4. try
  5. {
  6. var eventData = _remoteEventSerializer.Deserialize<DistributedEventData>(message);
  7. var eventArgs = new DistributedEventArgs(eventData, topic, message);
  8. Trigger(this, new DistributedEventBusHandlingEvent(eventArgs));
  9. if (!string.IsNullOrEmpty(eventData.Type))
  10. {
  11. string pattern = @"(.*?)\[(.*?)\]";
  12. Match match = Regex.Match(eventData.Type, pattern);
  13. if (match.Success)
  14. {
  15. var type = match.Groups[1].Value;
  16. var type2 = match.Groups[2].Value;
  17. var localTriggerType = typeFinder.Find(c => c.FullName == type).FirstOrDefault();
  18. var genericType = typeFinder.Find(c => c.FullName == type2).FirstOrDefault();
  19. if (localTriggerType != null && genericType != null)
  20. {
  21. if (localTriggerType.GetTypeInfo().IsGenericType
  22. && localTriggerType.GetGenericArguments().Length == 1
  23. && !genericType.IsAbstract && !genericType.IsInterface
  24. )
  25. {
  26. var localTriggerGenericType = localTriggerType.GetGenericTypeDefinition().MakeGenericType(genericType);
  27. if (eventData.Data.TryGetValue(PayloadKey, out var payload))
  28. {
  29. var payloadObject = (payload as JObject).ToObject(localTriggerGenericType);
  30. Trigger(localTriggerGenericType, this, (IEventData)payloadObject);
  31. }
  32. }
  33. }
  34. }
  35. else
  36. {
  37. var localTriggerType = typeFinder.Find(c => c.FullName == eventData.Type).FirstOrDefault();
  38. if (localTriggerType != null && !localTriggerType.IsAbstract && !localTriggerType.IsInterface)
  39. {
  40. if (eventData.Data.TryGetValue(PayloadKey, out var payload))
  41. {
  42. var payloadObject = (payload as JObject).ToObject(localTriggerType);
  43. Trigger(localTriggerType, this, (IEventData)payloadObject);
  44. }
  45. }
  46. }
  47. Trigger(this, new DistributedEventBusHandledEvent(eventArgs));
  48. }
  49. }
  50. catch (Exception ex)
  51. {
  52. Logger.Error("Consume remote message exception", ex);
  53. Trigger(this, new DistributedEventMessageHandleExceptionData(ex, topic, topic));
  54. }
  55. }

使用

DistributedEventBus有不同的实现方式,这里以Redis为例

启动Redis服务

下载Redis并启动服务,使用默认端口6379

配置

生产者和消费者端都需要配置分布式事件总线

首先引用Abp.DistributedEventBus.Redis,并配置Abp模块依赖

[DependsOn(typeof(AbpDistributedEventBusRedisModule))]

在PreInitialize方法中配置Redis连接信息

  1. Configuration.Modules.DistributedEventBus().UseRedis().Configure(setting =>
  2. {
  3. setting.Server = "127.0.0.1:6379";
  4. });

用MultipleEventBus替换Abp默认事件总线

  1. //todo: 事件总线
  2. Configuration.ReplaceService(
  3. typeof(IEventBus),
  4. () => IocManager.IocContainer.Register(
  5. Component.For<IEventBus>().ImplementedBy<MultipleEventBus>()
  6. ));

传递Abp默认事件

我们知道在使用仓储时,Abp会自动触发一些事件,如创建、更新、删除等。我们来测试这些事件是否能通过分布式事件总线来传递。

定义一个实体类,用于传递实体的增删改事件。

  1. public class Person : FullAuditedEntity<int>
  2. {
  3. public string Name { get; set; }
  4. public int Age { get; set; }
  5. public string PhoneNumber { get; set; }
  6. }

在消费者端,定义一个事件处理器,用于处理实体的增删改事件。

  1. public class RemoteEntityChangedEventHandler :
  2. IEventHandler<EntityUpdatedEventData<Person>>,
  3. IEventHandler<EntityCreatedEventData<Person>>,
  4. IEventHandler<EntityDeletedEventData<Person>>,
  5. ITransientDependency
  6. {
  7. void IEventHandler<EntityUpdatedEventData<Person>>.HandleEvent(EntityUpdatedEventData<Person> eventData)
  8. {
  9. var person = eventData.Entity;
  10. Console.WriteLine($"Remote Entity Updated - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber}");
  11. }
  12. void IEventHandler<EntityCreatedEventData<Person>>.HandleEvent(EntityCreatedEventData<Person> eventData)
  13. {
  14. var person = eventData.Entity;
  15. Console.WriteLine($"Remote Entity Created - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber}");
  16. }
  17. void IEventHandler<EntityDeletedEventData<Person>>.HandleEvent(EntityDeletedEventData<Person> eventData)
  18. {
  19. var person = eventData.Entity;
  20. Console.WriteLine($"Remote Entity Deleted - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber}");
  21. }
  22. }

在生产者端,用IRepository对实体进行增删改操作。

  1. var person = new Person()
  2. {
  3. Name = "John",
  4. Age = 36,
  5. PhoneNumber = "18588888888"
  6. };
  7. personRepository.Insert(person);
  8. var person2 = new Person()
  9. {
  10. Name = "John2",
  11. Age = 36,
  12. PhoneNumber = "18588888889"
  13. };
  14. personRepository.Insert(person2);
  15. var persons = personRepository.GetAllList();
  16. foreach (var p in persons)
  17. {
  18. p.Age += 1;
  19. personRepository.Update(p);
  20. Console.WriteLine($"Entity Updated - Name:{p.Name}, Age:{p.Age}, PhoneNumber:{p.PhoneNumber}");
  21. }
  22. foreach (var p in persons)
  23. {
  24. personRepository.Delete(p);
  25. Console.WriteLine($"Entity Deleted - Name:{p.Name}, Age:{p.Age}, PhoneNumber:{p.PhoneNumber}");
  26. }

运行程序(同时运行消费者端和生产者端),可以看到消费者端打印出了实体的增删改事件。

在这里插入图片描述

注意:

分布式事件总线在两个独立系统间传递事件,所以需要定义一个共同的类型对象,用于事件参数的传递。
因此消费者端需要引用生产者端的模块,以便获取共同的类型对象。

  1. public override Assembly[] GetAdditionalAssemblies()
  2. {
  3. var clientModuleAssembly = typeof(Person).GetAssembly();
  4. return [clientModuleAssembly];
  5. }

传递自定义事件

定义NotificationEventData,用于传递自定义事件。

  1. public class NotificationEventData : EventData
  2. {
  3. public int Id { get; set; }
  4. public string Title { get; set; }
  5. public string Message { get; set; }
  6. public bool IsRead { get; set; }
  7. }

在消费者端,定义一个事件处理器,用于处理自定义事件。

  1. public class NotificationEventHandler :
  2. IEventHandler<NotificationEventData>,
  3. ITransientDependency
  4. {
  5. void IEventHandler<NotificationEventData>.HandleEvent(NotificationEventData eventData)
  6. {
  7. Console.WriteLine($"Id: {eventData.Id}");
  8. Console.WriteLine($"Title: {eventData.Title}");
  9. Console.WriteLine($"Message: {eventData.Message}");
  10. Console.WriteLine($"IsRead: {eventData.IsRead}");
  11. }
  12. }

在生产者端,触发自定义事件。

  1. var eventBus = IocManager.Instance.Resolve<IEventBus>();
  2. eventBus.Trigger<NotificationEventData>(new NotificationEventData()
  3. {
  4. Title = "Hi",
  5. Message = "Customized definition event test!",
  6. Id = 100,
  7. IsRead = true,
  8. });

运行程序(同时运行消费者端和生产者端),可以看到消费者端打印出了自定义事件。

在这里插入图片描述

项目地址

Github:DistributedEventBus

原文链接:https://www.cnblogs.com/jevonsflash/p/17917031.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号