经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Java » 查看文章
RabbitMQ如何解决各种情况下丢数据的问题
来源:cnblogs  作者:撸码识途  时间:2018/11/19 9:28:29  对本文有异议

 

  1. spring.rabbitmq.listener.direct.acknowledge-mode=manual
 如果在配置文件中没有设置以上这个ACK确认,那么消费者每次重启都会收到这个消息。
 可以结合confirm使用,处理生产者和消费者丢数据的问题。
 生产者丢数据:
  1. confirm会发送一个Ack给生产者(包含消息的唯一ID),如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。这个解决生产者丢失消息的问题。

消费者丢数据:

  1. 如果在配置文件中设置了ACK确认,如果消费者来不及处理就死掉时,没有响应ack时会重复发送一条信息给其他消费者;如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,
    然后一直抛异常;如果对异常进行了捕获,但是没有在finallyack,也会一直重复发送消息(重试机制)。

 

 
 
如何解决各种丢数据的问题:

1.生产者丢数据

  1. 生产者的消息没有投递到MQ中怎么办?从生产者弄丢数据这个角度来看,RabbitMQ提供transactionconfirm模式来确保生产者不丢消息。
  2. transaction机制就是说,发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事
  3. 物(channel.txCommit())。
  4. 然而缺点就是吞吐量下降了。因此,按照博主的经验,生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦
  5. 消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个N
  6. ack消息给你,你可以进行重试操作。

下面演示一下confirm模式:

  1. //测试确认后回调
  2. @Service
  3. public class HelloSender1 implements RabbitTemplate.ConfirmCallback {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. public void send() {
  7. String context = "你好现在是 " + new Date() +"";
  8. System.out.println("HelloSender发送内容 : " + context);
  9. this.rabbitTemplate.setConfirmCallback(this);
  10. //exchange,queue 都正确,confirm被回调, ack=true
  11. //this.rabbitTemplate.convertAndSend("exchange","topic.message", context);
  12. //exchange 错误,queue 正确,confirm被回调, ack=false
  13. //this.rabbitTemplate.convertAndSend("fasss","topic.message", context);
  14. //exchange 正确,queue 错误 ,confirm被回调, ack=true; return被回调 replyText:NO_ROUTE
  15. //this.rabbitTemplate.convertAndSend("exchange","", context);
  16. //exchange 错误,queue 错误,confirm被回调, ack=false
  17. this.rabbitTemplate.convertAndSend("fasss","fass", context);
  18. }
  19. @Override
  20. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  21. System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);
  22. }
  23. }

2.消息队列丢数据

  1. 处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘
  2. 之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
  3. 那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步
  4. ①、将queue的持久化标识durable设置为true,则代表是一个持久的队列
  5. ②、发送消息的时候将deliveryMode=2
  6. 这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入mirrored-queue即镜像队列,但也不能保证消息百分百不丢
  7. 失(整个集群都挂掉)
  1. /**
  2. * 第二个参数:queue的持久化是通过durable=true来实现的。
  3. * 第三个参数:exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。这里需要注意三点:
       1. 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列;
       2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;
       3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。
  4. * 第四个参数:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
  5. * @param
  6. * @return
  7. * @Author zxj
  8. */
  9. @Bean
  10. public Queue queue() {
  11. Map<String, Object> arguments = new HashMap<>();
  12. arguments.put("x-message-ttl", 25000);//25秒自动删除
  13. Queue queue = new Queue("topic.messages", true, false, true, arguments);
  14. return queue;
  15. }

 

  1. MessageProperties properties=new MessageProperties();
  2. properties.setContentType(MessageProperties.DEFAULT_CONTENT_TYPE);
  3. properties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);
  4. Message message=new Message("hello".getBytes(),properties);
  5. this.rabbitTemplate.sendAndReceive("exchange","topic.message",message);

 

3.消费者丢数据

  1. 启用手动确认模式可以解决这个问题
  2. ①自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让
    消息不断重试。
  3. ②手动确认模式,如果消费者来不及处理就死掉时,没有响应ack时会重复发送一条信息给其他消费者;如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常;如
    果对异常进行了捕获,但是没有在finallyack,也会一直重复发送消息(重试机制)。
  4. ③不确认模式,acknowledge="none" 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。

 

 

 

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号