经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Java » 查看文章
Netty开发redis客户端,Netty发送redis命令,netty解析redis消息
来源:cnblogs  作者:逃离沙漠  时间:2018/11/20 22:53:42  对本文有异议

关键字:Netty开发redis客户端,Netty发送redis命令,netty解析redis消息, netty redis ,redis RESP协议。redis客户端,netty redis协议

我们可以使用redis-cli这个客户端来操作redis,也可以使用window的命令行telnet连接redis。本文,我们的目标是使用netty来实现redis客户端,实现目标为:

  1. 1. 启动netty程序
  2. 2. 在命令行输入 set mykey hello,由netty发送给redis服务器
  3. 3. 在命令行输入 get mykey hello,得到结果:hello
  4. 4. 在命令行输入 quit,程序退出

前言

Redis在TCP端口6379(默认,可修改端口)上监听到来的连接,客户端连接到来时,Redis服务器为此创建一个TCP连接。在客户端与服务器端之间传输的每个Redis命令或者数据都以\r\n结尾。当redis服务启动之后,我们可以使用TCP与之链接,连接之后便可以发消息,也会受到redis服务器的消息。而这个消息是有格式的,这个格式是事先商量好的,我们称之为协议,redis的协议叫做RESP,比方说我们有一条redis命令set hello 123,这条命令我们知道它是一条设置命令,通过RESP协议“翻译”一下,他就是这样的:

*3\r\n$3\r\nSET\r\n$5\r\nhello\r\n$3\r\n123\r\n

然后,这条协议通过网络传输(二进制形式),传到redis服务器,被redis服务器解析,最后完成设置。关于RESP洗衣详细可以看这里.

思路

上面我们介绍了redis是基于TCP传输,并使用了其自己的协议——RESP。RESP其实是数据交换可解析的协议,你可以理解为数据交换的格式,按照此格式组装好要传输的命令,并以二进制的形式由client端发往redis服务端。服务端接收这个消息之后,解析消息,执行命令,并将结果以协议好的格式组装好,传输给client端。client端接收到响应,解释成人类可以看懂的结果展示。

因此,我们可以整理一下思路:

  1. 1. 我们需要连接redis服务端,因此需要编写一个netty client端(此处联想一下netty client端的样板代码)。
  2. 2. 我们需要向redis服务端发送redis命令,很简单,获取channel,然后write。即channel.write(...)
  3. 3. 我们所编写的直白的命令,如set xx,get xx之类的需要编码之后才能传输给redis服务器。
  4. 因此,我们需要 **编码器**。很荣幸netty自带了,可以直接使用。
  5. 这里是 【输出】 所以要有 outbound handler.
  6. 4. redis会响应结果给我们,因此我们需要在 chanelRead方法中处理数据。
  7. 这里是 【输入】 所以要有 inbound handler.

编写代码

上的思路整理好了之后,我们可以写代码了。得益于netty的良好设计,我们只需要把netty client的“样板代码”拷贝过来生成一个client端代码即可。剩下的就是 handler ,decoder ,encoder 。我们需要编写的类有:

  • RedisClient 见名知义,我们的主类,包含client bootstrap信息。 接收用户控制台输入redis命令。
  • RedisClientInitializer 初始化器,在此添加 handler,decoder,encoder
  • RedisClientHandler 核心逻辑,需要处理 inbound ,outbound 两种类型事件。

RedisClient 代码如下:

  1. public class RedisClient {
  2. String host; // 目标主机
  3. int port; // 目标主机端口
  4. public RedisClient(String host,int port){
  5. this.host = host;
  6. this.port = port;
  7. }
  8. public void start() throws Exception{
  9. EventLoopGroup group = new NioEventLoopGroup();
  10. try {
  11. Bootstrap bootstrap = new Bootstrap();
  12. bootstrap.group(group)
  13. .channel(NioSocketChannel.class)
  14. .handler(new RedisClientInitializer());
  15. Channel channel = bootstrap.connect(host, port).sync().channel();
  16. System.out.println(" connected to host : " + host + ", port : " + port);
  17. System.out.println(" type redis's command to communicate with redis-server or type 'quit' to shutdown ");
  18. BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
  19. ChannelFuture lastWriteFuture = null;
  20. for (;;) {
  21. String s = in.readLine();
  22. if(s.equalsIgnoreCase("quit")) {
  23. break;
  24. }
  25. System.out.print(">");
  26. lastWriteFuture = channel.writeAndFlush(s);
  27. lastWriteFuture.addListener(new GenericFutureListener<ChannelFuture>() {
  28. @Override
  29. public void operationComplete(ChannelFuture future) throws Exception {
  30. if (!future.isSuccess()) {
  31. System.err.print("write failed: ");
  32. future.cause().printStackTrace(System.err);
  33. }
  34. }
  35. });
  36. }
  37. if (lastWriteFuture != null) {
  38. lastWriteFuture.sync();
  39. }
  40. System.out.println(" bye ");
  41. }finally {
  42. group.shutdownGracefully();
  43. }
  44. }
  45. public static void main(String[] args) throws Exception{
  46. RedisClient client = new RedisClient("redis-cache2.228",5001);
  47. client.start();
  48. }
  49. }

上面代码很长,但是,我们要熟悉netty的套路,它的样板代码就是如此。我们只需要看handler(new RedisClientInitializer()); 这一行,下面的就是一个 for(;;)循环,用来接收我们在控制台输入的redis命令。

RedisClientInitializer代码如下:

  1. public class RedisClientInitializer extends ChannelInitializer<Channel>{
  2. @Override
  3. protected void initChannel(Channel ch) throws Exception {
  4. ChannelPipeline pipeline = ch.pipeline();
  5. pipeline.addLast(new RedisDecoder());
  6. pipeline.addLast(new RedisBulkStringAggregator());
  7. pipeline.addLast(new RedisArrayAggregator());
  8. pipeline.addLast(new RedisEncoder());
  9. pipeline.addLast(new RedisClientHandler());
  10. }
  11. }

这个类,很简单,上面的几个addLast方法,除了最后一个外,其他都是netty自带的redis协议实现相关的编解码。最后一个是我们自定义的业务逻辑处理器。源码如下:

  1. public class RedisClientHandler extends ChannelDuplexHandler {
  2. // 发送 redis 命令
  3. @Override
  4. public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
  5. String[] commands = ((String) msg).split("\\s+");
  6. List<RedisMessage> children = new ArrayList<>(commands.length);
  7. for (String cmdString : commands) {
  8. children.add(new FullBulkStringRedisMessage(ByteBufUtil.writeUtf8(ctx.alloc(), cmdString)));
  9. }
  10. RedisMessage request = new ArrayRedisMessage(children);
  11. ctx.write(request, promise);
  12. }
  13. // 接收 redis 响应数据
  14. @Override
  15. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  16. RedisMessage redisMessage = (RedisMessage) msg;
  17. // 打印响应消息
  18. printAggregatedRedisResponse(redisMessage);
  19. // 是否资源
  20. ReferenceCountUtil.release(redisMessage);
  21. }
  22. @Override
  23. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  24. System.err.print("exceptionCaught: ");
  25. cause.printStackTrace(System.err);
  26. ctx.close();
  27. }
  28. private static void printAggregatedRedisResponse(RedisMessage msg) {
  29. if (msg instanceof SimpleStringRedisMessage) {
  30. System.out.println(((SimpleStringRedisMessage) msg).content());
  31. } else if (msg instanceof ErrorRedisMessage) {
  32. System.out.println(((ErrorRedisMessage) msg).content());
  33. } else if (msg instanceof IntegerRedisMessage) {
  34. System.out.println(((IntegerRedisMessage) msg).value());
  35. } else if (msg instanceof FullBulkStringRedisMessage) {
  36. System.out.println(getString((FullBulkStringRedisMessage) msg));
  37. } else if (msg instanceof ArrayRedisMessage) {
  38. for (RedisMessage child : ((ArrayRedisMessage) msg).children()) {
  39. printAggregatedRedisResponse(child);
  40. }
  41. } else {
  42. throw new CodecException("unknown message type: " + msg);
  43. }
  44. }
  45. private static String getString(FullBulkStringRedisMessage msg) {
  46. if (msg.isNull()) {
  47. return "(null)";
  48. }
  49. return msg.content().toString(CharsetUtil.UTF_8);
  50. }
  51. }

注意,上面我们讨论过,我们需要两个handler,分别是inbound handler 和outbound handler 。这里我们使用的是ChannelDuplexHandler。这个ChannelDuplexHandler 支持处理 inbound 和 outbound,其定义如下:

  1. public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {
  2. ....
  3. }

运行演示

按照开篇的思路分析,上面我们已经编写好了netty redis client所需要的代码。下面我们需要运行看看。main函数如下:

  1. public static void main(String[] args) throws Exception{
  2. RedisClient client = new RedisClient("your-redis-server-ip",6379);
  3. client.start();
  4. }

我在本地运行了一下,演示了一些命令:

  • get ,set 以及 错误的命令
  • expire命令设置超时时间,及 ttl 命令查看超时时间
  • del 命令删除可以
  • quit退出程序。

结果如下:

  1. connected to host : 192.168.2.120, port : 6379
  2. type redis's command to communicate with redis-server or type 'quit' to shutdown
  3. get hello
  4. >(null)
  5. set hello
  6. >ERR wrong number of arguments for 'set' command
  7. set hello 123
  8. >OK
  9. expire hello 10
  10. >1
  11. ttl hello
  12. >6
  13. ttl hello
  14. >4
  15. get hello
  16. >(null)
  17. set hello world
  18. >OK
  19. get hello
  20. >world
  21. del hello
  22. >1
  23. quit
  24. bye
  25. Process finished with exit code 0

如此,我们便用netty实现了redis的client端。代码下载

如果你觉得还可以,给点个推荐吧!


使用Netty实现HTTP服务器

Netty实现心跳机制

Netty系列

spring如何启动的?这里结合spring源码描述了启动过程

SpringMVC是怎么工作的,SpringMVC的工作原理

spring 异常处理。结合spring源码分析400异常处理流程及解决方法

Mybatis Mapper接口是如何找到实现类的-源码分析

Lua脚本在redis分布式锁场景的运用

CORS详解,CORS原理分析

Keep-Alive 是什么?

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号