经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » C# » 查看文章
RabbitMQ-消费者"未处理完的消息"丢失
来源:cnblogs  作者:真的是饿洗人  时间:2018/10/23 9:29:18  对本文有异议

一个关于客户端(消费者)开启自动应答,重启后"未处理消息丢失"的小坑。(主要是对RabbitMQ理解不够)

首先,申明一下: 本文所谓的 "丢失消息" 不是指服务器宕机、重启等原因导致内存中消息丢失,也就是说不是关于消息持久化的问题

 

  使用C# 编写测试。

  问题表象:  消费者开启自动应答,某时,消费者掉线(关闭/崩溃等),届时重启消费者,发现消费者未处理完的消息丢失

  条件: 服务器不宕机、不重启,只有一个消费者、一个生产者。

  消息流向:  消息--->生产者--->交换器--->队列--->消费者

  问题的处理: 消费者开启手动应答,若再出现之前情况,消息不丢失。

  先给个代码。

  生产者代码如下:

  1. static void Main(string[] args)
  2. {
  3. var factory = new ConnectionFactory() { HostName = "localhost" };
  4. using (var connection = factory.CreateConnection())
  5. using (var channel = connection.CreateModel())
  6. {
  7.           //申明广播类型交换器
  8. channel.ExchangeDeclare(exchange: "ex1", type: "fanout");
  9.           //申明队列
  10. channel.QueueDeclare(queue: "test1",
  11. durable: false,
  12. exclusive: false,
  13. autoDelete: false,
  14. arguments: null);
  15. int count = 0;
  16. while (true)
  17. {
  18. count++;
  19. var body = Encoding.UTF8.GetBytes(count.ToString());
                //向key为 p 的交换器 ex1 上推数据
  20. channel.BasicPublish(exchange: "ex1",
  21. routingKey: "p",
  22. basicProperties: null,
  23. body: body);
  24. Console.WriteLine($"send msg {count}");
  25. System.Threading.Thread.Sleep(1000);
  26. }
  27. }
  28. }

 

 消费者代码如下(开启自动应答):

  1. static void Main(string[] args)
  2. {
  3. var factory = new ConnectionFactory() { HostName = "localhost" };
  4. using (var connection = factory.CreateConnection())
  5. using (var channel = connection.CreateModel())
  6. {
  7.           //队列与交换器绑定
  8. channel.QueueBind(queue: "test1",
  9. exchange: "ex1",
  10. routingKey: "p");
  11. var consumer = new EventingBasicConsumer(channel);
  12. consumer.Received += (model, ea) =>
  13. {
  14. var body = ea.Body;
  15. var message = Encoding.UTF8.GetString(body);
  16. Console.WriteLine($"收到消息 -- {message}");
  17. System.Threading.Thread.Sleep(2000);
  18. };
  19. channel.BasicConsume(queue: "test1",
  20. autoAck: true,
  21. consumer: consumer);
  22. Console.ReadLine();
  23. }
  24. }

*交换器、队列必须先申明,两样都存在后才能进行绑定。 

 

 

生产者向队列推送消息,队列中展现状态为ready的数据就是未被消费的消息。

推送90个消息:

 

队列中存了90个未应答的消息

 

打开消费者:

发现现在消息已经全部被自动应答,队列已清空。

 

再启动消费者:

如预料,空空一片。

 

        小结 :开启消费者(开启自动应答),发现队列中状态为Ready的消息全部被应答,队列中状态为Ready的消息清空,不等消费者处理完这些消息,关闭消费者,然后再开启消费者,消费者不会再收到消息,出现消费者"未处理"完的消息丢失的问题。

 

同之前先屯90个消息。

然后关闭自动应答。

 

开启消费者:

 

消息状态一次性全部变成unacked。 因为没有写手动处理消息的逻辑,所以unacked状态的消息不会变少。

 

然后关闭消费者:

RabbitMQ 未删除无应答的消息,消息重新转为Ready状态,继续等待连接消费者处理。

 

再开启消费者:

没有出现丢失未处理完消息的情况。

 

     小结:开启消费者(关闭自动应答),发现队列中状态为Ready的消息状态全部转变为unacked,队列中状态为ready的消息清空,随消费者应答,队列中状态为unacked的消息逐渐减少,关闭消费者,发现队列中状态为unacked的消息重新改变回ready状态,

       

  结论:

  关闭自动应答可避免这种消息"丢失的情况"。

  另外在开启自动应答 ack=true 的情况下,需要保证一定有消费者在线,才能保证消息都被接收处理。开启手动应答必然消耗更多资源,因为 RabbitMQ 需要根据应答标号去删除队列中对应的消息。

 

以上仅个人理解,若有错误,欢迎指正~

  

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号