经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » Redis » 查看文章
kestrel网络编程--开发redis服务器
来源:cnblogs  作者:jiulang  时间:2022/12/12 10:15:48  对本文有异议

1 文章目的

本文讲解基于kestrel开发实现了部分redis命令的redis伪服务器的过程,让读者了解kestrel网络编程的完整步骤,其中redis通讯协议需要读者自行查阅,文章里不做具体解析。

2 开发顺序

  1. 创建Kestrel的Redis协议处理者
  2. 配置监听的EndPoint并使用Redis处理者
  3. 设计交互上下文RedisContext
  4. 设计Redis命令处理者
  5. 设计Redis中间件
  6. 编排Redis中间件构建应用

3. 创建Redis协议处理者

在Kestrel中,末级的中间件是一个没有next的特殊中间件,基表现出来就是一个ConnectionHandler的行为。我们开发redis应用只需要继承ConnectionHandler这个抽象类来,当kestrel接收到新的连接时将连接交给我们来处理,我们处理完成之后,不再有下一个处理者来处理这个连接了。

  1. /// <summary>
  2. /// 表示Redis连接处理者
  3. /// </summary>
  4. sealed class RedisConnectionHandler : ConnectionHandler
  5. {
  6. /// <summary>
  7. /// 处理Redis连接
  8. /// </summary>
  9. /// <param name="context">redis连接上下文</param>
  10. /// <returns></returns>
  11. public async override Task OnConnectedAsync(ConnectionContext context)
  12. {
  13. // 开始处理这个redis连接
  14. ...
  15. // 直到redis连接断开后结束
  16. }
  17. }

4. 配置监听的EndPoint

4.1 json配置文件

我们在配置文件里指定监听本机的5007端口来做服务器,当然你可以指定本机具体的某个IP或任意IP。

  1. {
  2. "Kestrel": {
  3. "Endpoints": {
  4. "Redis": { // redis协议服务器,只监听loopbackIP
  5. "Url": "http://localhost:5007"
  6. }
  7. }
  8. }
  9. }
  1. {
  2. "Kestrel": {
  3. "Endpoints": {
  4. "Redis": { // redis协议服务器,监听所有IP
  5. "Url": "http://*:5007"
  6. }
  7. }
  8. }
  9. }

4.2 在代码中配置Redis处理者

为Redis这个节点关联上RedisConnectionHandler,当redis客户端连接到5007这个端口之后,OnConnectedAsync()方法就得到触发且收到连接上下文对象。

  1. builder.WebHost.ConfigureKestrel((context, kestrel) =>
  2. {
  3. var section = context.Configuration.GetSection("Kestrel");
  4. kestrel.Configure(section).Endpoint("Redis", endpoint =>
  5. {
  6. endpoint.ListenOptions.UseConnectionHandler<RedisConnectionHandler>();
  7. });
  8. });

5 设计RedisContext

在asp.netcore里,我们知道应用层每次http请求都创建一个HttpContext对象,里面就塞着各种与本次请求有关的对象。对于Redis的请求,我们也可以这么抄袭asp.netcore来设计Redis。

5.1 RedisContext

Redis请求上下文,包含Client、Request、Response和Features对象,我们要知道是收到了哪个Redis客户端的什么请求,从而请求命令处理者可以向它响应对应的内容。

  1. /// <summary>
  2. /// 表示redis上下文
  3. /// </summary>
  4. sealed class RedisContext : ApplicationContext
  5. {
  6. /// <summary>
  7. /// 获取redis客户端
  8. /// </summary>
  9. public RedisClient Client { get; }
  10. /// <summary>
  11. /// 获取redis请求
  12. /// </summary>
  13. public RedisRequest Reqeust { get; }
  14. /// <summary>
  15. /// 获取redis响应
  16. /// </summary>
  17. public RedisResponse Response { get; }
  18. /// <summary>
  19. /// redis上下文
  20. /// </summary>
  21. /// <param name="client"></param>
  22. /// <param name="request"></param>
  23. /// <param name="response"></param>
  24. /// <param name="features"></param>
  25. public RedisContext(RedisClient client, RedisRequest request, RedisResponse response, IFeatureCollection features)
  26. : base(features)
  27. {
  28. this.Client = client;
  29. this.Reqeust = request;
  30. this.Response = response;
  31. }
  32. public override string ToString()
  33. {
  34. return $"{this.Client} {this.Reqeust}";
  35. }
  36. }

5.2 ApplicationContext

这是抽象的应用层上下文,它强调Features,做为多个中间件之间的沟通渠道。

  1. /// <summary>
  2. /// 表示应用程序请求上下文
  3. /// </summary>
  4. public abstract class ApplicationContext
  5. {
  6. /// <summary>
  7. /// 获取特征集合
  8. /// </summary>
  9. public IFeatureCollection Features { get; }
  10. /// <summary>
  11. /// 应用程序请求上下文
  12. /// </summary>
  13. /// <param name="features"></param>
  14. public ApplicationContext(IFeatureCollection features)
  15. {
  16. this.Features = new FeatureCollection(features);
  17. }
  18. }

5.3 RedisRequest

一个redis请求包含请求的命令和0到多个参数值。

  1. /// <summary>
  2. /// 表示Redis请求
  3. /// </summary>
  4. sealed class RedisRequest
  5. {
  6. private readonly List<RedisValue> values = new();
  7. /// <summary>
  8. /// 获取命令名称
  9. /// </summary>
  10. public RedisCmd Cmd { get; private set; }
  11. /// <summary>
  12. /// 获取参数数量
  13. /// </summary>
  14. public int ArgumentCount => this.values.Count - 1;
  15. /// <summary>
  16. /// 获取参数
  17. /// </summary>
  18. /// <param name="index"></param>
  19. /// <returns></returns>
  20. public RedisValue Argument(int index)
  21. {
  22. return this.values[index + 1];
  23. }
  24. }

RedisRequest的解析:

  1. /// <summary>
  2. /// 从内存中解析
  3. /// </summary>
  4. /// <param name="memory"></param>
  5. /// <param name="request"></param>
  6. /// <exception cref="RedisProtocolException"></exception>
  7. /// <returns></returns>
  8. private static bool TryParse(ReadOnlyMemory<byte> memory, [MaybeNullWhen(false)] out RedisRequest request)
  9. {
  10. request = default;
  11. if (memory.IsEmpty == true)
  12. {
  13. return false;
  14. }
  15. var span = memory.Span;
  16. if (span[0] != '*')
  17. {
  18. throw new RedisProtocolException();
  19. }
  20. if (span.Length < 4)
  21. {
  22. return false;
  23. }
  24. var lineLength = span.IndexOf((byte)'\n') + 1;
  25. if (lineLength < 4)
  26. {
  27. throw new RedisProtocolException();
  28. }
  29. var lineCountSpan = span.Slice(1, lineLength - 3);
  30. var lineCountString = Encoding.ASCII.GetString(lineCountSpan);
  31. if (int.TryParse(lineCountString, out var lineCount) == false || lineCount < 0)
  32. {
  33. throw new RedisProtocolException();
  34. }
  35. request = new RedisRequest();
  36. span = span.Slice(lineLength);
  37. for (var i = 0; i < lineCount; i++)
  38. {
  39. if (span[0] != '$')
  40. {
  41. throw new RedisProtocolException();
  42. }
  43. lineLength = span.IndexOf((byte)'\n') + 1;
  44. if (lineLength < 4)
  45. {
  46. throw new RedisProtocolException();
  47. }
  48. var lineContentLengthSpan = span.Slice(1, lineLength - 3);
  49. var lineContentLengthString = Encoding.ASCII.GetString(lineContentLengthSpan);
  50. if (int.TryParse(lineContentLengthString, out var lineContentLength) == false)
  51. {
  52. throw new RedisProtocolException();
  53. }
  54. span = span.Slice(lineLength);
  55. if (span.Length < lineContentLength + 2)
  56. {
  57. return false;
  58. }
  59. var lineContentBytes = span.Slice(0, lineContentLength).ToArray();
  60. var value = new RedisValue(lineContentBytes);
  61. request.values.Add(value);
  62. span = span.Slice(lineContentLength + 2);
  63. }
  64. request.Size = memory.Span.Length - span.Length;
  65. Enum.TryParse<RedisCmd>(request.values[0].ToString(), ignoreCase: true, out var name);
  66. request.Cmd = name;
  67. return true;
  68. }

5.4 RedisResponse

  1. /// <summary>
  2. /// 表示redis回复
  3. /// </summary>
  4. sealed class RedisResponse
  5. {
  6. private readonly PipeWriter writer;
  7. public RedisResponse(PipeWriter writer)
  8. {
  9. this.writer = writer;
  10. }
  11. /// <summary>
  12. /// 写入\r\n
  13. /// </summary>
  14. /// <returns></returns>
  15. public RedisResponse WriteLine()
  16. {
  17. this.writer.WriteCRLF();
  18. return this;
  19. }
  20. public RedisResponse Write(char value)
  21. {
  22. this.writer.Write((byte)value);
  23. return this;
  24. }
  25. public RedisResponse Write(ReadOnlySpan<char> value)
  26. {
  27. this.writer.Write(value, Encoding.UTF8);
  28. return this;
  29. }
  30. public RedisResponse Write(ReadOnlyMemory<byte> value)
  31. {
  32. this.writer.Write(value.Span);
  33. return this;
  34. }
  35. public ValueTask<FlushResult> FlushAsync()
  36. {
  37. return this.writer.FlushAsync();
  38. }
  39. public ValueTask<FlushResult> WriteAsync(ResponseContent content)
  40. {
  41. return this.writer.WriteAsync(content.ToMemory());
  42. }
  43. }

5.5 RedisClient

Redis是有状态的长连接协议,所以在服务端,我把连接接收到的连接包装为RedisClient的概念,方便我们业务理解。对于连接级生命周期的对象属性,我们都应该放到RedisClient上,比如是否已认证授权等。

  1. /// <summary>
  2. /// 表示Redis客户端
  3. /// </summary>
  4. sealed class RedisClient
  5. {
  6. private readonly ConnectionContext context;
  7. /// <summary>
  8. /// 获取或设置是否已授权
  9. /// </summary>
  10. public bool? IsAuthed { get; set; }
  11. /// <summary>
  12. /// 获取远程终结点
  13. /// </summary>
  14. public EndPoint? RemoteEndPoint => context.RemoteEndPoint;
  15. /// <summary>
  16. /// Redis客户端
  17. /// </summary>
  18. /// <param name="context"></param>
  19. public RedisClient(ConnectionContext context)
  20. {
  21. this.context = context;
  22. }
  23. /// <summary>
  24. /// 关闭连接
  25. /// </summary>
  26. public void Close()
  27. {
  28. this.context.Abort();
  29. }
  30. /// <summary>
  31. /// 转换为字符串
  32. /// </summary>
  33. /// <returns></returns>
  34. public override string? ToString()
  35. {
  36. return this.RemoteEndPoint?.ToString();
  37. }
  38. }

6. 设计Redis命令处理者

redis命令非常多,我们希望有一一对应的cmdHandler来对应处理,来各尽其责。所以我们要设计cmdHandler的接口,然后每个命令增加一个实现类型,最后使用一个中间件来聚合这些cmdHandler。

6.1 IRedisCmdHanler接口

  1. /// <summary>
  2. /// 定义redis请求处理者
  3. /// </summary>
  4. interface IRedisCmdHanler
  5. {
  6. /// <summary>
  7. /// 获取能处理的请求命令
  8. /// </summary>
  9. RedisCmd Cmd { get; }
  10. /// <summary>
  11. /// 处理请求
  12. /// </summary>
  13. /// <param name="context"></param>
  14. /// <returns></returns>
  15. ValueTask HandleAsync(RedisContext context);
  16. }

6.2 IRedisCmdHanler实现

由于实现类型特别多,这里只举个例子

  1. /// <summary>
  2. /// Ping处理者
  3. /// </summary>
  4. sealed class PingHandler : IRedisCmdHanler
  5. {
  6. public RedisCmd Cmd => RedisCmd.Ping;
  7. /// <summary>
  8. /// 处理请求
  9. /// </summary>
  10. /// <param name="context"></param>
  11. /// <returns></returns>
  12. public async ValueTask HandleAsync(RedisContext context)
  13. {
  14. await context.Response.WriteAsync(ResponseContent.Pong);
  15. }
  16. }

7.设计Redis中间件

对于Redis服务器应用而言,我们处理一个请求需要经过多个大的步骤:

  1. 如果服务器要求Auth的话,验证连接是否已Auth
  2. 如果Auth验证通过之后,则查找与请求对应的IRedisCmdHanler来处理请求
  3. 如果没有IRedisCmdHanler来处理,则告诉客户端命令不支持。

7.1 中间件接口

  1. /// <summary>
  2. /// redis中间件
  3. /// </summary>
  4. interface IRedisMiddleware : IApplicationMiddleware<RedisContext>
  5. {
  6. }
  1. /// <summary>
  2. /// 应用程序中间件的接口
  3. /// </summary>
  4. /// <typeparam name="TContext"></typeparam>
  5. public interface IApplicationMiddleware<TContext>
  6. {
  7. /// <summary>
  8. /// 执行中间件
  9. /// </summary>
  10. /// <param name="next">下一个中间件</param>
  11. /// <param name="context">上下文</param>
  12. /// <returns></returns>
  13. Task InvokeAsync(ApplicationDelegate<TContext> next, TContext context);
  14. }

7.2 命令处理者中间件

这里只拿重要的命令处理者中间件来做代码说明,其它中间件也是一样处理方式。

  1. /// <summary>
  2. /// 命令处理中间件
  3. /// </summary>
  4. sealed class CmdMiddleware : IRedisMiddleware
  5. {
  6. private readonly Dictionary<RedisCmd, IRedisCmdHanler> cmdHandlers;
  7. public CmdMiddleware(IEnumerable<IRedisCmdHanler> cmdHanlers)
  8. {
  9. this.cmdHandlers = cmdHanlers.ToDictionary(item => item.Cmd, item => item);
  10. }
  11. public async Task InvokeAsync(ApplicationDelegate<RedisContext> next, RedisContext context)
  12. {
  13. if (this.cmdHandlers.TryGetValue(context.Reqeust.Cmd, out var hanler))
  14. {
  15. // 这里是本中间件要干的活
  16. await hanler.HandleAsync(context);
  17. }
  18. else
  19. {
  20. // 本中间件干不了,留给下一个中间件来干
  21. await next(context);
  22. }
  23. }
  24. }

8 编排Redis中间件

回到RedisConnectionHandler,我们需要实现它,实现逻辑是编排Redis中间件并创建可以处理应用请求的委托application,再将收到的redis请求创建RedisContext对象的实例,最后使用application来执行RedisContext实例即可。

8.1 构建application委托

  1. sealed class RedisConnectionHandler : ConnectionHandler
  2. {
  3. private readonly ILogger<RedisConnectionHandler> logger;
  4. private readonly ApplicationDelegate<RedisContext> application;
  5. /// <summary>
  6. /// Redis连接处理者
  7. /// </summary>
  8. /// <param name="appServices"></param>
  9. /// <param name="logger"></param>
  10. public RedisConnectionHandler(
  11. IServiceProvider appServices,
  12. ILogger<RedisConnectionHandler> logger)
  13. {
  14. this.logger = logger;
  15. this.application = new ApplicationBuilder<RedisContext>(appServices)
  16. .Use<AuthMiddleware>()
  17. .Use<CmdMiddleware>()
  18. .Use<FallbackMiddlware>()
  19. .Build();
  20. }
  21. }

8.2 使用application委托处理请求

  1. sealed class RedisConnectionHandler : ConnectionHandler
  2. {
  3. /// <summary>
  4. /// 处理Redis连接
  5. /// </summary>
  6. /// <param name="context"></param>
  7. /// <returns></returns>
  8. public async override Task OnConnectedAsync(ConnectionContext context)
  9. {
  10. try
  11. {
  12. await this.HandleRequestsAsync(context);
  13. }
  14. catch (Exception ex)
  15. {
  16. this.logger.LogDebug(ex.Message);
  17. }
  18. finally
  19. {
  20. await context.DisposeAsync();
  21. }
  22. }
  23. /// <summary>
  24. /// 处理redis请求
  25. /// </summary>
  26. /// <param name="context"></param>
  27. /// <returns></returns>
  28. private async Task HandleRequestsAsync(ConnectionContext context)
  29. {
  30. var input = context.Transport.Input;
  31. var client = new RedisClient(context);
  32. var response = new RedisResponse(context.Transport.Output);
  33. while (context.ConnectionClosed.IsCancellationRequested == false)
  34. {
  35. var result = await input.ReadAsync();
  36. if (result.IsCanceled)
  37. {
  38. break;
  39. }
  40. var requests = RedisRequest.Parse(result.Buffer, out var consumed);
  41. if (requests.Count > 0)
  42. {
  43. foreach (var request in requests)
  44. {
  45. var redisContext = new RedisContext(client, request, response, context.Features);
  46. await this.application.Invoke(redisContext);
  47. }
  48. input.AdvanceTo(consumed);
  49. }
  50. else
  51. {
  52. input.AdvanceTo(result.Buffer.Start, result.Buffer.End);
  53. }
  54. if (result.IsCompleted)
  55. {
  56. break;
  57. }
  58. }
  59. }
  60. }

9 文章总结

在还没有进入阅读本文章之前,您可能会觉得我会大量讲解Socket知识内容,例如Socket BindSocket AcceptSocket SendSocket Receive等。但实际上没完全没有任何涉及,因为终结点的监听、连接的接收、缓冲区的处理、数据接收与发送等这些基础而复杂的网络底层kestrel已经帮我处理好,我们关注是我们的应用协议层的解析、还有应用本身功能的开发两个本质问题。

您可能发也现了,本文章的RedisRequest解析,也没有多少行代码!反而文章中都是抽象的中间件、处理者、上下文等概念。实际上这不但不会带来项目复杂度,反而让项目更好的解耦,比如要增加一个新的指令的支持,只需要增加一个xxxRedisCmdHanler的文件,其它地方都不用任何修改。

本文章是KestrelApp项目里面的一个demo的讲解,希望对您有用。

原文链接:https://www.cnblogs.com/kewei/p/16975371.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号