经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » ASP.net » 查看文章
基于DotNetty实现自动发布 - 通信实现
来源:cnblogs  作者:Broadm  时间:2023/12/5 12:10:55  对本文有异议

基于 DotNetty 实现通信

DotNetty : 是微软的 Azure 团队,使用 C#实现的 Netty 的版本发布。是.NET 平台的优秀网络库。

项目介绍

OpenDeploy.Communication 类库项目,是通信相关基础设施层

image

  • Codec 模块实现编码解码
  • Convention 模块定义约定,比如抽象的业务 Handler, 消息载体 NettyMessage, 消息上下文 'NettyContext' 等

自定义消息格式

消息类为 NettyMessage ,封装了消息头 NettyHeader 和消息体 Body

image

NettyMessage

封装了消息头 NettyHeader 和消息体 Body

NettyMessage 点击查看代码
  1. /// <summary> Netty消息 </summary>
  2. public class NettyMessage
  3. {
  4. /// <summary> 消息头 </summary>
  5. public NettyHeader Header { get; init; } = default!;
  6. /// <summary> 消息体(可空,可根据具体业务而定) </summary>
  7. public byte[]? Body { get; init; }
  8. /// <summary> 消息头转为字节数组 </summary>
  9. public byte[] GetHeaderBytes()
  10. {
  11. var headerString = Header.ToString();
  12. return Encoding.UTF8.GetBytes(headerString);
  13. }
  14. /// <summary> 是否同步消息 </summary>
  15. public bool IsSync() => Header.Sync;
  16. /// <summary> 创建Netty消息工厂方法 </summary>
  17. public static NettyMessage Create(string endpoint, bool sync = false, byte[]? body = null)
  18. {
  19. return new NettyMessage
  20. {
  21. Header = new NettyHeader { EndPoint = endpoint, Sync = sync },
  22. Body = body
  23. };
  24. }
  25. /// <summary> 序列化为JSON字符串 </summary>
  26. public override string ToString() => Header.ToString();
  27. }

NettyHeader

消息头,包含请求唯一标识,是否同步消息,终结点等, 在传输数据时会序列化为 JSON

NettyHeader 点击查看代码
  1. /// <summary> Netty消息头 </summary>
  2. public class NettyHeader
  3. {
  4. /// <summary> 请求消息唯一标识 </summary>
  5. public Guid RequestId { get; init; } = Guid.NewGuid();
  6. /// <summary> 是否同步消息, 默认false是异步消息 </summary>
  7. public bool Sync { get; init; }
  8. /// <summary> 终结点 (借鉴MVC,约定为Control/Action模式) </summary>
  9. public string EndPoint { get; init; } = string.Empty;
  10. /// <summary> 序列化为JSON字符串 </summary>
  11. public override string ToString() => this.ToJsonString();
  12. }

  • 请求消息唯一标识 RequestId , 用来唯一标识消息, 主要用于 发送同步请求, 因为默认的消息是异步的,只管发出去,不需要等待响应
  • 是否同步消息 Sync , 可以不需要,主要为了可视化,便于调试
  • 终结点 EndPoint , (借鉴 MVC,约定为 Control/Action 模式), 服务端直接解析出对应的处理器

编码器

DefaultEncoder 点击查看代码
  1. public class DefaultEncoder : MessageToByteEncoder<NettyMessage>
  2. {
  3. protected override void Encode(IChannelHandlerContext context, NettyMessage message, IByteBuffer output)
  4. {
  5. //消息头转为字节数组
  6. var headerBytes = message.GetHeaderBytes();
  7. //写入消息头长度
  8. output.WriteInt(headerBytes.Length);
  9. //写入消息头字节数组
  10. output.WriteBytes(headerBytes);
  11. //写入消息体字节数组
  12. if (message.Body != null && message.Body.Length > 0)
  13. {
  14. output.WriteBytes(message.Body);
  15. }
  16. }
  17. }

解码器

DefaultDecoder 点击查看代码
  1. public class DefaultDecoder : MessageToMessageDecoder<IByteBuffer>
  2. {
  3. protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
  4. {
  5. //消息总长度
  6. var totalLength = input.ReadableBytes;
  7. //消息头长度
  8. var headerLength = input.GetInt(input.ReaderIndex);
  9. //消息体长度
  10. var bodyLength = totalLength - 4 - headerLength;
  11. //读取消息头字节数组
  12. var headerBytes = new byte[headerLength];
  13. input.GetBytes(input.ReaderIndex + 4, headerBytes, 0, headerLength);
  14. byte[]? bodyBytes = null;
  15. string? rawHeaderString = null;
  16. NettyHeader? header;
  17. try
  18. {
  19. //把消息头字节数组,反序列化为JSON
  20. rawHeaderString = Encoding.UTF8.GetString(headerBytes);
  21. header = JsonSerializer.Deserialize<NettyHeader>(rawHeaderString);
  22. }
  23. catch (Exception ex)
  24. {
  25. Logger.Error($"解码失败: {rawHeaderString}, {ex}");
  26. return;
  27. }
  28. if (header is null)
  29. {
  30. Logger.Error($"解码失败: {rawHeaderString}");
  31. return;
  32. }
  33. //读取消息体字节数组
  34. if (bodyLength > 0)
  35. {
  36. bodyBytes = new byte[bodyLength];
  37. input.GetBytes(input.ReaderIndex + 4 + headerLength, bodyBytes, 0, bodyLength);
  38. }
  39. //封装为NettyMessage对象
  40. var message = new NettyMessage
  41. {
  42. Header = header,
  43. Body = bodyBytes,
  44. };
  45. output.Add(message);
  46. }
  47. }

NettyServer 实现

NettyServer 点击查看代码
  1. public static class NettyServer
  2. {
  3. /// <summary>
  4. /// 开启Netty服务
  5. /// </summary>
  6. public static async Task RunAsync(int port = 20007)
  7. {
  8. var bossGroup = new MultithreadEventLoopGroup(1);
  9. var workerGroup = new MultithreadEventLoopGroup();
  10. try
  11. {
  12. var bootstrap = new ServerBootstrap().Group(bossGroup, workerGroup);
  13. bootstrap
  14. .Channel<TcpServerSocketChannel>()
  15. .Option(ChannelOption.SoBacklog, 100)
  16. .Option(ChannelOption.SoReuseaddr, true)
  17. .Option(ChannelOption.SoReuseport, true)
  18. .ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
  19. {
  20. IChannelPipeline pipeline = channel.Pipeline;
  21. pipeline.AddLast("framing-enc", new LengthFieldPrepender(4));
  22. pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4));
  23. pipeline.AddLast("decoder", new DefaultDecoder());
  24. pipeline.AddLast("encoder", new DefaultEncoder());
  25. pipeline.AddLast("handler", new ServerMessageEntry());
  26. }));
  27. var boundChannel = await bootstrap.BindAsync(port);
  28. Logger.Info($"NettyServer启动成功...{boundChannel}");
  29. Console.ReadLine();
  30. await boundChannel.CloseAsync();
  31. Logger.Info($"NettyServer关闭监听了...{boundChannel}");
  32. }
  33. finally
  34. {
  35. await Task.WhenAll(
  36. bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)),
  37. workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1))
  38. );
  39. Logger.Info($"NettyServer退出了...");
  40. }
  41. }
  42. }
  • 服务端管道最后我们添加了 ServerMessageEntry ,作为消息处理的入口
ServerMessageEntry 点击查看代码
  1. public class ServerMessageEntry : ChannelHandlerAdapter
  2. {
  3. /// <summary> Netty处理器选择器 </summary>
  4. private readonly DefaultNettyHandlerSelector handlerSelector = new();
  5. public ServerMessageEntry()
  6. {
  7. //注册Netty处理器
  8. handlerSelector.RegisterHandlerTypes(typeof(EchoHandler).Assembly.GetTypes());
  9. }
  10. /// <summary> 通道激活 </summary>
  11. public override void ChannelActive(IChannelHandlerContext context)
  12. {
  13. Logger.Warn($"ChannelActive: {context.Channel}");
  14. }
  15. /// <summary> 通道关闭 </summary>
  16. public override void ChannelInactive(IChannelHandlerContext context)
  17. {
  18. Logger.Warn($"ChannelInactive: {context.Channel}");
  19. }
  20. /// <summary> 收到客户端的消息 </summary>
  21. public override async void ChannelRead(IChannelHandlerContext context, object message)
  22. {
  23. if (message is not NettyMessage nettyMessage)
  24. {
  25. Logger.Error("从客户端接收消息为空");
  26. return;
  27. }
  28. try
  29. {
  30. Logger.Info($"收到客户端的消息: {nettyMessage}");
  31. //封装请求
  32. var nettyContext = new NettyContext(context.Channel, nettyMessage);
  33. //选择处理器
  34. AbstractNettyHandler handler = handlerSelector.SelectHandler(nettyContext);
  35. //处理请求
  36. await handler.ProcessAsync();
  37. }
  38. catch(Exception ex)
  39. {
  40. Logger.Error($"ServerMessageEntry.ChannelRead: {ex}");
  41. }
  42. }
  43. }
  • 按照约定, 把继承 AbstractNettyHandler 的类视为业务处理器

  • ServerMessageEntry 拿到消息后,首先把消息封装为 NettyContext, 类似与 MVC 中的 HttpContext, 封装了请求和响应对象, 内部解析请求的 EndPoint, 拆分为 HandlerName, ActionName

  • DefaultNettyHandlerSelector 提供注册处理器的方法 RegisterHandlerTypes, 和选择处理器的方法 SelectHandler

  • SelectHandler, 默认规则是查找已注册的处理器中以 HandlerName 开头的类型

  • AbstractNettyHandlerProcessAsync 方法,通过 ActionName, 反射拿到 MethodInfo, 调用终结点

NettyClient 实现

NettyClient 点击查看代码
  1. public sealed class NettyClient(string serverHost, int serverPort) : IDisposable
  2. {
  3. public EndPoint ServerEndPoint { get; } = new IPEndPoint(IPAddress.Parse(serverHost), serverPort);
  4. private static readonly Bootstrap bootstrap = new();
  5. private static readonly IEventLoopGroup eventLoopGroup = new SingleThreadEventLoop();
  6. private bool _disposed;
  7. private IChannel? _channel;
  8. public bool IsConnected => _channel != null && _channel.Open;
  9. public bool IsWritable => _channel != null && _channel.IsWritable;
  10. static NettyClient()
  11. {
  12. bootstrap
  13. .Group(eventLoopGroup)
  14. .Channel<TcpSocketChannel>()
  15. .Option(ChannelOption.SoReuseaddr, true)
  16. .Option(ChannelOption.SoReuseport, true)
  17. .Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
  18. {
  19. IChannelPipeline pipeline = channel.Pipeline;
  20. //pipeline.AddLast("ping", new IdleStateHandler(0, 5, 0));
  21. pipeline.AddLast("framing-enc", new LengthFieldPrepender(4));
  22. pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4));
  23. pipeline.AddLast("decoder", new DefaultDecoder());
  24. pipeline.AddLast("encoder", new DefaultEncoder());
  25. pipeline.AddLast("handler", new ClientMessageEntry());
  26. }));
  27. }
  28. /// <summary> 连接服务器 </summary>
  29. private async Task TryConnectAsync()
  30. {
  31. try
  32. {
  33. if (IsConnected) { return; }
  34. _channel = await bootstrap.ConnectAsync(ServerEndPoint);
  35. }
  36. catch (Exception ex)
  37. {
  38. throw new Exception($"连接服务器失败 : {ServerEndPoint} {ex.Message}");
  39. }
  40. }
  41. /// <summary>
  42. /// 发送消息
  43. /// </summary>
  44. /// <param name="endpoint">终结点</param>
  45. /// <param name="sync">是否同步等待响应</param>
  46. /// <param name="body">正文</param>
  47. public async Task SendAsync(string endpoint, bool sync = false, byte[]? body = null)
  48. {
  49. var message = NettyMessage.Create(endpoint, sync, body);
  50. if (sync)
  51. {
  52. var task = ClientMessageSynchronizer.TryAdd(message);
  53. try
  54. {
  55. await SendAsync(message);
  56. await task;
  57. }
  58. catch
  59. {
  60. ClientMessageSynchronizer.TryRemove(message);
  61. throw;
  62. }
  63. }
  64. else
  65. {
  66. await SendAsync(message);
  67. }
  68. }
  69. /// <summary>
  70. /// 发送消息
  71. /// </summary>
  72. private async Task SendAsync(NettyMessage message)
  73. {
  74. await TryConnectAsync();
  75. await _channel!.WriteAndFlushAsync(message);
  76. }
  77. /// <summary> 释放连接(程序员手动释放, 一般在代码使用using语句,或在finally里面Dispose) </summary>
  78. public void Dispose()
  79. {
  80. Dispose(true);
  81. GC.SuppressFinalize(this);
  82. }
  83. /// <summary> 释放连接 </summary>
  84. private void Dispose(bool disposing)
  85. {
  86. if (_disposed)
  87. {
  88. return;
  89. }
  90. //释放托管资源,比如嵌套的对象
  91. if (disposing)
  92. {
  93. }
  94. //释放非托管资源
  95. if (_channel != null)
  96. {
  97. _channel.CloseAsync();
  98. _channel = null;
  99. }
  100. _disposed = true;
  101. }
  102. ~NettyClient()
  103. {
  104. Dispose(true);
  105. }
  106. }
  • NettyClient 封装了 Netty 客户端逻辑,提供发送异步请求(默认)和发布同步请求方法
  • DotNetty 默认不提供同步请求,但是有些情况我们需要同步等待服务器的响应,所有需要自行实现,实现也很简单,把消息 ID 缓存起来,收到服务器响应后激活就行了,具体实现在消息同步器 ClientMessageSynchronizer, 就不贴了

总结

至此,我们实现了基于 DotNetty 搭建通信模块, 实现了客户端和服务器的编解码,处理器选择,客户端实现了同步消息等,大家可以在 ConsoleHost 结尾的控制台项目中,测试下同步和异步的消息,实现的简单的 Echo 模式

代码仓库

项目暂且就叫 OpenDeploy

欢迎大家拍砖,Star

下一步

计划下一步,基于WPF的客户端, 实现接口项目的配置与发现

原文链接:https://www.cnblogs.com/broadm/p/17875559.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号