基本概念
channel:消息通道
exchage:消息路由规则,四种模式(topic、direct、fanout、header)
- direct:默认,根据routingKey完全匹配,好处是先匹配再发送
- topic:根据绑定关键字通配符规则匹配、比较灵活
- fanout:不需要指定routingkey,相当于群发
- header:不太常用,可以自定义匹配规则
queue:消息存储
routerKey:消息路由关键字(发送的时候成为bindingkey,接收成为routingKey)
队列的概念(生产者消费者启动报错大多数都是这几个不匹配导致)
durable:持久化到硬盘
exclusive:唯一性
autoDelete:自动删除
- /**
- * 第二个参数:queue的持久化是通过durable=true来实现的。
- * 第三个参数:exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。这里需要注意三点:
1. 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列;
2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;
3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。 - * 第四个参数:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
- * @param
- * @return
- * @Author zxj
- */
- @Bean
- public Queue queue() {
- Map<String, Object> arguments = new HashMap<>();
- arguments.put("x-message-ttl", 25000);//25秒自动删除
- Queue queue = new Queue("topic.messages", true, false, true, arguments);
- return queue;
- }
- @Configuration
- public class RabbitConfig {
- // 发送消息的格式转换器
- @Bean
- public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
- RabbitTemplate template = new RabbitTemplate(connectionFactory);
- template.setMessageConverter(new Jackson2JsonMessageConverter());
- return template;
- }
- // 接收消息的格式转换器
- @Bean
- public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
- SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
- factory.setConnectionFactory(connectionFactory);
- factory.setMessageConverter(new Jackson2JsonMessageConverter());
- return factory;
- }
- // 信道配置,此地使用direct模式
- @Bean
- public DirectExchange defaultExchange() {
- return new DirectExchange(MQConstant.EXCHANGE, true, false);
- }
- // 配置队列规则属性 例如保活时间 持久化 是否自动删除等
- @Bean
- public Queue queue() {
- Map<String, Object> arguments = new HashMap<>();
- arguments.put("x-message-ttl", 25000);//25秒自动删除
- Queue queue = new Queue(MQConstant.QUEUE_NAME, true, false, true, arguments);
- return queue;
- }
- // 绑定队列和exchange
- @Bean
- public Binding binding() {
- return BindingBuilder.bind(queue()).to(defaultExchange()).with(MQConstant.QUEUE_NAME);
- }
- }
- 如何发送
- @Autowired
- private RabbitTemplate template;
- //convertAndSend和send的区别是这个convert更方便使用,可以传object进去
- template.convertAndSend(MQConstant.EXCHANGE, bindingKey, msg);
- //如何接收,注意队列名称、exchange名称、routingKey的指定。
- //注意:队列的消息只要被一个消费者匹配消费后就不存在了
- @Component
- @RabbitListener(containerFactory = "rabbitListenerContainerFactory", bindings = @QueueBinding(value = @Queue(value = "default_queue", durable = "true"), exchange = @Exchange(value = "default_exchange", type = ExchangeTypes.TOPIC), key = "meeting"))
- @Log4j
- public class RabbitMqListener {
- @RabbitHandler
- public void processMessage(MqMsg message) {
- log.error( message);
- }
- }
Rabbitmq常用配置
- spring.rabbitmq.addresses指定client连接到的server的地址,多个以逗号分隔.
- spring.rabbitmq.dynamic是否创建AmqpAdmin bean. 默认为: true)
- spring.rabbitmq.host指定RabbitMQ host.默认为: localhost)
- spring.rabbitmq.listener.acknowledge-mode指定Acknowledge的模式.
- spring.rabbitmq.listener.auto-startup是否在启动时就启动mq,默认: true)
- spring.rabbitmq.listener.concurrency指定最小的消费者数量.
- spring.rabbitmq.listener.max-concurrency指定最大的消费者数量.
- spring.rabbitmq.listener.prefetch指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.
- spring.rabbitmq.listener.transaction-size指定一个事务处理的消息数量,最好是小于等于prefetch的数量.
- spring.rabbitmq.password指定broker的密码.
- spring.rabbitmq.port指定RabbitMQ 的端口,默认: 5672)
- spring.rabbitmq.requested-heartbeat指定心跳超时,0为不指定.
- spring.rabbitmq.ssl.enabled是否开始SSL,默认: false)
- spring.rabbitmq.ssl.key-store指定持有SSL certificate的key store的路径
- spring.rabbitmq.ssl.key-store-password指定访问key store的密码.
- spring.rabbitmq.ssl.trust-store指定持有SSL certificates的Trust store.
- spring.rabbitmq.ssl.trust-store-password指定访问trust store的密码.
- spring.rabbitmq.username指定登陆broker的用户名.
- spring.rabbitmq.virtual-host指定连接到broker的Virtual host.
- spring.rabbitmq.publisher-confirms=true 开启发送确认
- spring.rabbitmq.publisher-returns=true 开启发送失败退回