经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » ASP.net » 查看文章
.NET分布式Orleans - 7 - Streaming
来源:cnblogs  作者:chester·chen  时间:2024/3/29 11:44:08  对本文有异议

概念

在Orleans中,Streaming是一组API和功能集,它提供了一种构建、发布和消费数据流的方式。

这些流可以是任何类型的数据,从简单的消息到复杂的事件或数据记录。Streaming API允许你定义、发布和消费这些流,而无需关心底层的传输机制或数据存储。

每个流都有一个唯一的标识符,称为StreamId,用于区分不同的流。流可以是持久的,也可以是临时的,具体取决于所使用的流提供者(Stream Provider)。流提供者负责处理流的存储、传输和故障恢复。

作用

Streaming在Orleans中起到了至关重要的作用,主要体现在以下几个方面:

  1. 解耦:Streaming允许将数据的产生者和消费者解耦。生产者可以发布数据到流中,而消费者可以独立地订阅这些流并处理数据。这种解耦使得系统更加灵活和可扩展。

  2. 实时性:通过Streaming,你可以实时地处理和响应数据流。这对于需要实时分析、监控或响应的场景非常有用。

  3. 故障恢复:Orleans的Streaming机制具有强大的故障恢复能力。即使在出现网络分区或节点故障的情况下,流提供者也能够确保数据的可靠性和一致性。

应用场景

  1. 实时日志分析:你可以将应用程序的日志消息发布到流中,并使用专门的消费者来分析这些日志。这允许你实时地监控和响应应用程序的行为。

  2. 事件驱动架构:在事件驱动架构中,你可以使用Streaming来发布事件,并由多个消费者来处理这些事件。这有助于构建松耦合、可扩展和响应式的系统。

  3. 分布式协作:Streaming也可以用于实现分布式系统中的协作和通信。例如,多个节点可以发布状态更新到流中,其他节点可以订阅这些流以获取最新的状态信息。

示例

安装nuget包

  1. <PackageReference Include="Microsoft.Orleans.Streaming" Version="8.0.0" />

配置Streaming

  1. siloHostBuilder.AddMemoryStreams("StreamProvider").AddMemoryGrainStorage("PubSubStore");

定义一个Grain生成事件

  1. public interface ISender : IGrainWithStringKey
  2. {
  3. Task Send(Guid rid);
  4. }
  5. public class SenderGrain : Grain, ISender
  6. {
  7. public Task Send(Guid rid)
  8. {
  9. var streamProvider = this.GetStreamProvider("StreamProvider");
  10. var streamId = StreamId.Create("RANDOMDATA", rid);
  11. var stream = streamProvider.GetStream<int>(streamId);
  12. RegisterTimer(_ =>
  13. {
  14. return stream.OnNextAsync(Random.Shared.Next());
  15. }, null, TimeSpan.FromMilliseconds(1_000), TimeSpan.FromMilliseconds(1_000));
  16. return Task.CompletedTask;
  17. }
  18. }

再定义一个Grain订阅事件

  1. public interface IRandomReceiver : IGrainWithGuidKey
  2. {
  3. Task Receive();
  4. }
  5. [ImplicitStreamSubscription("RANDOMDATA")]
  6. public class ReceiverGrain : Grain, IRandomReceiver
  7. {
  8. public override async Task OnActivateAsync(CancellationToken cancellationToken)
  9. {
  10. var streamProvider = this.GetStreamProvider("StreamProvider");
  11. var rid = this.GetPrimaryKey();
  12. var streamId = StreamId.Create("RANDOMDATA", rid);
  13. var stream = streamProvider.GetStream<int>(streamId);
  14. await stream.SubscribeAsync<int>(
  15. async (data, token) =>
  16. {
  17. Console.WriteLine(data);
  18. await Task.CompletedTask;
  19. });
  20. base.OnActivateAsync(cancellationToken);
  21. }
  22. public async Task Receive()
  23. {
  24. }
  25. }

然后即可测试

  1. var rid = Guid.NewGuid();
  2. var sender1 = client.GetGrain<ISender>("sender1");
  3. await sender1.Send(rid);
  4. var reciver1 = client.GetGrain<IRandomReceiver>(new Guid());
  5. await reciver1.Receive();

流提供程序

提供程序可以通过在nuget种搜索Orleans.Streaming,也可以通过PersistentStreamProvider 与 IQueueAdapter 重写来自定义Provider

 

原文链接:https://www.cnblogs.com/chenyishi/p/18103492

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号