经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Spring Boot » 查看文章
SpringBoot之RabbitMQ的使用方法
来源:jb51  时间:2018/12/24 10:44:15  对本文有异议

一 、RabbitMQ的介绍

RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件,消息中间件的工作过程可以用生产者消费者模型来表示.即,生产者不断的向消息队列发送信息,而消费者从消息队列中消费信息.具体过程如下:

从上图可看出,对于消息队列来说,生产者、消息队列、消费者是最重要的三个概念,生产者发消息到消息队列中去,消费者监听指定的消息队列,并且当消息队列收到消息之后,接收消息队列传来的消息,并且给予相应的处理。消息队列常用于分布式系统之间互相信息的传递。

对于RabbitMQ来说,除了这三个基本模块以外,还添加了一个模块,即交换机(Exchange)。它使得生产者和消息队列之间产生了隔离,生产者将消息发送给交换机,而交换机则根据调度策略把相应的消息转发给对应的消息队列。

交换机的主要作用是接收相应的消息并且绑定到指定的队列。交换机有四种类型,分别为Direct、topic、headers、Fanout。

Direct是RabbitMQ默认的交换机模式,也是最简单的模式。即创建消息队列的时候,指定一个BindingKey。当发送者发送消息的时候,指定对应的Key。当Key和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中。

topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中。

headers也是根据一个规则进行匹配,在消息队列和交换机绑定的时候会指定一组键值对规则,而发送消息的时候也会指定一组键值对规则,当两组键值对规则相匹配的时候,消息会被发送到匹配的消息队列中。

Fanout是路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略。

二 、SpringBoot整合RabbitMQ(Direct模式)

SpringBoot整合RabbitMQ非常简单,首先还是pom.xml引入依赖。

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

在application.properties中配置RabbitMQ相关的信息,并首先启动了RabbitMQ实例,并创建两个queue。

  1. spring.application.name=spirng-boot-rabbitmq
  2. spring.rabbitmq.host=127.0.0.1
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=admin
  5. spring.rabbitmq.password=admin

配置Queue(消息队列),由于采用的是Direct模式,需要在配置Queue的时候指定一个键,使其和交换机绑定。

  1. @Configuration
  2. public class RabbitConfig {
  3. @Bean
  4. public org.springframework.amqp.core.Queue Queue() {
  5.  
  6. return new org.springframework.amqp.core.Queue("hello");
  7.  
  8. }
  9. }

接着就可以发送消息啦。在SpringBoot中,我们使用AmqpTemplate去发送消息。代码如下:

  1. @Component
  2. public class HelloSender {
  3. @Autowired
  4. private AmqpTemplate rabbitTemplate;
  5.  
  6. public void send(int index) {
  7.  
  8. String context = "hello Queue "+index + new Date();
  9. System.out.println("Sender : " + context);
  10. this.rabbitTemplate.convertAndSend("hello", context);
  11. }
  12. }

生产者发送消息之后就需要消费者接收消息。这里定义了两个消息消费者,用来模拟生产者与消费者一对多的关系。

  1. @Component
  2. @RabbitListener(queues = "hello")
  3. public class HelloReceiver {
  4. @RabbitHandler
  5. public void process(String hello) {
  6. System.out.println("Receiver1 : " + hello);
  7. }
  8. }
  9. @Component
  10. @RabbitListener(queues = "hello")
  11. public class HelloReceiver2 {
  12. @RabbitHandler
  13. public void process(String hello) {
  14. System.out.println("Receiver2 : " + hello);
  15. }
  16. }

在单元测试中模拟发送消息,批量发送10条消息,两个接收者分别接收了5条消息。

  1. @Autowired
  2. private HelloSender helloSender;
  3. @Test
  4. public void hello() throws Exception {
  5. for(int i=0;i<10;i++)
  6. {
  7. helloSender.send(i);
  8. }
  9. }

实际上RabbitMQ还可以支持发送对象,当然由于涉及到序列化和反序列化,该对象要实现Serilizable接口。这里定义了User对象,用来做发送消息内容。

  1. import java.io.Serializable;
  2. public class User implements Serializable{
  3. private String name;
  4. private String pwd;
  5. public String getPwd() {
  6. return pwd;
  7. }
  8. public void setPwd(String pwd) {
  9. this.pwd = pwd;
  10. }
  11. public String getName() {
  12. return name;
  13. }
  14. public void setName(String name) {
  15. this.name = name;
  16. }
  17. public User(String name, String pwd) {
  18. this.name = name;
  19. this.pwd = pwd;
  20. }
  21. @Override
  22. public String toString() {
  23. return "User{" +"name='" + name + '\'' +", pwd='" + pwd + '\'' +'}';
  24. }
  25. }

在生产者中发送User对象。

  1. @Component
  2. public class ModelSender {
  3. @Autowired
  4. private AmqpTemplate rabbitTemplate;
  5. public void sendModel(User user) {
  6.  
  7. System.out.println("Sender object: " + user.toString());
  8. this.rabbitTemplate.convertAndSend("object", user);
  9.  
  10. }
  11. }

在消费者中接收User对象。

  1. @Component
  2. @RabbitListener(queues = "object")
  3. public class ModelRecevicer {
  4. @RabbitHandler
  5. public void process(User user) {
  6.  
  7. System.out.println("Receiver object : " + user);
  8.  
  9. }
  10. }

在单元测试中注入ModelSender 对象,实例化User对象,然后发送。

  1. @Autowired
  2. private ModelSender modelSender;
  3. @Test
  4. public void model() throws Exception {
  5. User user=new User("abc","123");
  6. modelSender.sendModel(user);
  7. }

三 、SpringBoot整合RabbitMQ(Topic转发模式)

首先需要在RabbitMQ服务端创建交换机topicExchange,并绑定两个queue:topic.message、topic.messages。

新建TopicRabbitConfig,设置对应的queue与binding。

  1. @Configuration
  2. public class TopicRabbitConfig {
  3. final static String message = "topic.message";
  4. final static String messages = "topic.messages";
  5. @Bean
  6. public Queue queueMessage() {
  7. return new Queue(TopicRabbitConfig.message);
  8. }
  9. @Bean
  10. public Queue queueMessages() {
  11. return new Queue(TopicRabbitConfig.messages);
  12. }
  13.  
  14. @Bean
  15. TopicExchange exchange() {
  16. return new TopicExchange("topicExchange");
  17. }
  18.  
  19. @Bean
  20. Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
  21. return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
  22. }
  23.  
  24. @Bean
  25. Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
  26. return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
  27. }
  28. }

创建消息生产者,在TopicSender中发送3个消息。

  1. @Component
  2. public class TopicSender {
  3. @Autowired
  4. private AmqpTemplate rabbitTemplate;
  5.  
  6. public void send() {
  7. String context = "hi, i am message all";
  8. System.out.println("Sender : " + context);
  9. this.rabbitTemplate.convertAndSend("topicExchange", "topic.1", context);
  10. }
  11.  
  12. public void send1() {
  13. String context = "hi, i am message 1";
  14. System.out.println("Sender : " + context);
  15. this.rabbitTemplate.convertAndSend("topicExchange", "topic.message", context);
  16. }
  17.  
  18. public void send2() {
  19. String context = "hi, i am messages 2";
  20. System.out.println("Sender : " + context);
  21. this.rabbitTemplate.convertAndSend("topicExchange", "topic.messages", context);
  22. }
  23. }

生产者发送消息,这里创建了两个接收消息的消费者。

  1. @Component
  2. @RabbitListener(queues = "topic.message")
  3. public class TopicReceiver {
  4. @RabbitHandler
  5. public void process(String message) {
  6. System.out.println("Topic Receiver1 : " + message);
  7. }
  8. }
  9.  
  10. @Component
  11. @RabbitListener(queues = "topic.messages")
  12. public class TopicReceiver2 {
  13. @RabbitHandler
  14. public void process(String message) {
  15.  
  16. System.out.println("Topic Receiver2 : " + message);
  17.  
  18. }
  19. }

在单元测试中注入TopicSender,利用topicSender 发送消息。

  1. @Autowired
  2. private TopicSender topicSender;
  3. @Test
  4. public void topicSender() throws Exception {
  5. topicSender.send();
  6. topicSender.send1();
  7. topicSender.send2();
  8. }

从上面的输出结果可以看到,Topic Receiver2 匹配到了所有消息,Topic Receiver1只匹配到了1个消息。

四 、SpringBoot整合RabbitMQ(Fanout Exchange形式)

Fanout Exchange形式又叫广播形式,因此我们发送到路由器的消息会使得绑定到该路由器的每一个Queue接收到消息。首先需要在RabbitMQ服务端创建交换机fanoutExchange,并绑定三个queue:fanout.A、fanout.B、fanout.C。

与Topic类似,新建FanoutRabbitConfig,绑定交换机和队列。

  1. @Configuration
  2. public class FanoutRabbitConfig {
  3. @Bean
  4. public Queue AMessage() {
  5. return new Queue("fanout.A");
  6. }
  7. @Bean
  8. public Queue BMessage() {
  9. return new Queue("fanout.B");
  10. }
  11.  
  12. @Bean
  13. public Queue CMessage() {
  14. return new Queue("fanout.C");
  15. }
  16.  
  17. @Bean
  18. FanoutExchange fanoutExchange() {
  19. return new FanoutExchange("fanoutExchange");
  20. }
  21.  
  22. @Bean
  23. Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
  24. return BindingBuilder.bind(AMessage).to(fanoutExchange);
  25. }
  26.  
  27. @Bean
  28. Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
  29. return BindingBuilder.bind(BMessage).to(fanoutExchange);
  30. }
  31.  
  32. @Bean
  33. Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
  34. return BindingBuilder.bind(CMessage).to(fanoutExchange);
  35. }
  36. }

创建消息生产者,在FanoutSender中发送消息。

  1. @Component
  2. public class FanoutSender {
  3.  
  4. @Autowired
  5. private AmqpTemplate rabbitTemplate;
  6.  
  7. public void send() {
  8. String context = "hi, fanout msg ";
  9. System.out.println("FanoutSender : " + context);
  10. this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
  11.  
  12. }
  13. }

然后创建了3个接收者FanoutReceiverA、FanoutReceiverB、FanoutReceiverC。

  1. @Component
  2. @RabbitListener(queues = "fanout.A")
  3. public class FanoutReceiverA {
  4. @RabbitHandler
  5. public void process(String message) {
  6. System.out.println("fanout Receiver A : " + message);
  7. }
  8. }
  9. @Component
  10. @RabbitListener(queues = "fanout.B")
  11. public class FanoutReceiverB {
  12. @RabbitHandler
  13. public void process(String message) {
  14. System.out.println("fanout Receiver B: " + message);
  15. }
  16. }
  17. @Component
  18. @RabbitListener(queues = "fanout.C")
  19. public class FanoutReceiverC {
  20. @RabbitHandler
  21. public void process(String message) {
  22. System.out.println("fanout Receiver C: " + message);
  23. }
  24. }

在单元测试中注入消息发送者,发送消息。

  1. @Autowired
  2. private FanoutSender fanoutSender;
  3. @Test
  4. public void fanoutSender() throws Exception {
  5. fanoutSender.send();
  6. }

从下图可以看到3个队列都接收到了消息。

本章节创建的类比较多,下图为本章节的结构,也可以直接查看demo源码了解。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持w3xue。

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号