经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Spring » 查看文章
详解SpringBoot中使用RabbitMQ的RPC功能
来源:jb51  时间:2021/11/15 20:02:43  对本文有异议

一、RabbitMQ的RPC简介

实际业务中,有的时候我们还需要等待消费者返回结果给我们,或者是说我们需要消费者上的一个功能、一个方法或是一个接口返回给我们相应的值,而往往大型的系统软件,生产者跟消费者之间都是相互独立的两个系统,部署在两个不同的电脑上,不能通过直接对象.方法的形式获取想要的结果,这时候我们就需要用到RPC(Remote Procedure Call)远程过程调用方式。
RabbitMQ实现RPC的方式很简单,生产者发送一条带有标签(消息ID(correlation_id)+回调队列名称)的消息到发送队列,消费者(也称RPC服务端)从发送队列获取消息并处理业务,解析标签的信息将业务结果发送到指定的回调队列,生产者从回调队列中根据标签的信息获取发送消息的返回结果。

在这里插入图片描述

如图,客户端C发送消息,指定消息的ID=rpc_id,回调响应的队列名称为rpc_resp,消息从C发送到rpc_request队列,服务端S获取消息业务处理之后,将correlation_id附加到响应的结果发送到指定的回调队列rpc_resp中,客户端从回调队列获取消息,匹配与发送消息的correlation_id相同的值为消息应答结果。

二、SpringBoot中使用RabbitMQ的RPC功能

注意:springboot中使用的时候,correlation_id为系统自动生成的,reply_to在加载AmqpTemplate实例的时候设置的。

实例:
说明:队列1为发送队列,队列2为返回队列

1.先配置rabbitmq

  1. package com.ws.common;
  2.  
  3. import org.springframework.amqp.core.Binding;
  4. import org.springframework.amqp.core.BindingBuilder;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.amqp.core.TopicExchange;
  7. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  8. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  9. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  10. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.beans.factory.annotation.Value;
  13. import org.springframework.context.annotation.Bean;
  14. import org.springframework.context.annotation.Configuration;
  15.  
  16.  
  17. /*
  18. * rabbitMQ配置类
  19. */
  20. @Configuration
  21. public class RabbitMQConfig {
  22. public static final String TOPIC_QUEUE1 = "topic.queue1";
  23. public static final String TOPIC_QUEUE2 = "topic.queue2";
  24. public static final String TOPIC_EXCHANGE = "topic.exchange";
  25. @Value("${spring.rabbitmq.host}")
  26. private String host;
  27. @Value("${spring.rabbitmq.port}")
  28. private int port;
  29. @Value("${spring.rabbitmq.username}")
  30. private String username;
  31. @Value("${spring.rabbitmq.password}")
  32. private String password;
  33. @Autowired
  34. ConnectionFactory connectionFactory;
  35. @Bean(name = "connectionFactory")
  36. public ConnectionFactory connectionFactory() {
  37. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  38. connectionFactory.setHost(host);
  39. connectionFactory.setPort(port);
  40. connectionFactory.setUsername(username);
  41. connectionFactory.setPassword(password);
  42. connectionFactory.setVirtualHost("/");
  43. return connectionFactory;
  44. }
  45. @Bean
  46. public RabbitTemplate rabbitTemplate() {
  47. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  48. //设置reply_to(返回队列,只能在这设置)
  49. rabbitTemplate.setReplyAddress(TOPIC_QUEUE2);
  50. rabbitTemplate.setReplyTimeout(60000);
  51. return rabbitTemplate;
  52. }
  53. //返回队列监听器(必须有)
  54. @Bean(name="replyMessageListenerContainer")
  55. public SimpleMessageListenerContainer createReplyListenerContainer() {
  56. SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
  57. listenerContainer.setConnectionFactory(connectionFactory);
  58. listenerContainer.setQueueNames(TOPIC_QUEUE2);
  59. listenerContainer.setMessageListener(rabbitTemplate());
  60. return listenerContainer;
  61. }
  62.  
  63. //创建队列
  64. @Bean
  65. public Queue topicQueue1() {
  66. return new Queue(TOPIC_QUEUE1);
  67. }
  68. @Bean
  69. public Queue topicQueue2() {
  70. return new Queue(TOPIC_QUEUE2);
  71. }
  72. //创建交换机
  73. @Bean
  74. public TopicExchange topicExchange() {
  75. return new TopicExchange(TOPIC_EXCHANGE);
  76. }
  77. //交换机与队列进行绑定
  78. @Bean
  79. public Binding topicBinding1() {
  80. return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(TOPIC_QUEUE1);
  81. }
  82. @Bean
  83. public Binding topicBinding2() {
  84. return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(TOPIC_QUEUE2);
  85. }
  86. }
  87.  

2.发送消息并同步等待返回值

  1. @Autowired
  2. private RabbitTemplate rabbitTemplate;
  3.  
  4.  
  5. //报文body
  6. String sss = "报文的内容";
  7. //封装Message
  8. Message msg = this.con(sss);
  9. log.info("客户端--------------------"+msg.toString());
  10. //使用sendAndReceive方法完成rpc调用
  11. Message message=rabbitTemplate.sendAndReceive(RabbitMQConfig.TOPIC_EXCHANGE, RabbitMQConfig.TOPIC_QUEUE1, msg);
  12. //提取rpc回应内容body
  13. String response = new String(message.getBody());
  14. log.info("回应:" + response);
  15. log.info("rpc完成---------------------------------------------");
  16.  
  17.  
  18. public Message con(String s) {
  19. MessageProperties mp = new MessageProperties();
  20. byte[] src = s.getBytes(Charset.forName("UTF-8"));
  21. //mp.setReplyTo("adsdas"); 加载AmqpTemplate时设置,这里设置没用
  22. //mp.setCorrelationId("2222"); 系统生成,这里设置没用
  23. mp.setContentType("application/json");
  24. mp.setContentEncoding("UTF-8");
  25. mp.setContentLength((long)s.length());
  26. return new Message(src, mp);
  27. }

3.写消费者

  1. package com.ws.listener.mq;
  2.  
  3. import java.nio.charset.Charset;
  4.  
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.core.MessageProperties;
  7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  8. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.stereotype.Component;
  11.  
  12. import com.ws.common.RabbitMQConfig;
  13.  
  14. import lombok.extern.slf4j.Slf4j;
  15.  
  16. @Slf4j
  17. @Component
  18. public class Receiver {
  19. @Autowired
  20. private RabbitTemplate rabbitTemplate;
  21. @RabbitListener(queues=RabbitMQConfig.TOPIC_QUEUE1)
  22. public void receiveTopic1(Message msg) {
  23. log.info("队列1:"+msg.toString());
  24. String msgBody = new String(msg.getBody());
  25. //数据处理,返回的Message
  26. Message repMsg = con(msgBody+"返回了", msg.getMessageProperties().getCorrelationId());
  27. rabbitTemplate.send(RabbitMQConfig.TOPIC_EXCHANGE, RabbitMQConfig.TOPIC_QUEUE2, repMsg);
  28. }
  29. @RabbitListener(queues=RabbitMQConfig.TOPIC_QUEUE2)
  30. public void receiveTopic2(Message msg) {
  31. log.info("队列2:"+msg.toString());
  32. }
  33. public Message con(String s, String id) {
  34. MessageProperties mp = new MessageProperties();
  35. byte[] src = s.getBytes(Charset.forName("UTF-8"));
  36. mp.setContentType("application/json");
  37. mp.setContentEncoding("UTF-8");
  38. mp.setCorrelationId(id);
  39. return new Message(src, mp);
  40. }
  41. }

日志打印:

2019-06-26 17:11:16.607 [http-nio-8080-exec-4] INFO com.ws.controller.UserController - 客户端--------------------(Body:‘报文的内容' MessageProperties [headers={}, contentType=application/json, contentEncoding=UTF-8, contentLength=5, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])

2019-06-26 17:11:16.618 [SimpleAsyncTaskExecutor-1] INFO com.ws.listener.mq.Receiver - 队列1:(Body:‘报文的内容' MessageProperties [headers={}, correlationId=1, replyTo=topic.queue2, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=topic.queue1, deliveryTag=1, consumerTag=amq.ctag-8IzlhblYmTebqUYd-uferw, consumerQueue=topic.queue1])

2019-06-26 17:11:16.623 [http-nio-8080-exec-4] INFO com.ws.controller.UserController - 回应:报文的内容返回了

2019-06-26 17:11:16.623 [http-nio-8080-exec-4] INFO com.ws.controller.UserController - rpc完成---------------------------------------------

到此这篇关于SpringBoot中使用RabbitMQ的RPC功能的文章就介绍到这了,更多相关SpringBoot使用RabbitMQ内容请搜索w3xue以前的文章或继续浏览下面的相关文章希望大家以后多多支持w3xue!

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号