- 生产者的消息没有投递到MQ中怎么办?从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。
- transaction机制就是说,发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事
- 物(channel.txCommit())。
- 然而缺点就是吞吐量下降了。因此,按照博主的经验,生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦
- 消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个N
- ack消息给你,你可以进行重试操作。
- //测试确认后回调
- @Service
- public class HelloSender1 implements RabbitTemplate.ConfirmCallback {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- public void send() {
- String context = "你好现在是 " + new Date() +"";
- System.out.println("HelloSender发送内容 : " + context);
- this.rabbitTemplate.setConfirmCallback(this);
- //exchange,queue 都正确,confirm被回调, ack=true
- //this.rabbitTemplate.convertAndSend("exchange","topic.message", context);
- //exchange 错误,queue 正确,confirm被回调, ack=false
- //this.rabbitTemplate.convertAndSend("fasss","topic.message", context);
- //exchange 正确,queue 错误 ,confirm被回调, ack=true; return被回调 replyText:NO_ROUTE
- //this.rabbitTemplate.convertAndSend("exchange","", context);
- //exchange 错误,queue 错误,confirm被回调, ack=false
- this.rabbitTemplate.convertAndSend("fasss","fass", context);
- }
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);
- }
- }
- 处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘
- 之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
- 那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步
- ①、将queue的持久化标识durable设置为true,则代表是一个持久的队列
- ②、发送消息的时候将deliveryMode=2
- 这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入mirrored-queue即镜像队列,但也不能保证消息百分百不丢
- 失(整个集群都挂掉)