经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Spring » 查看文章
Springboot整合RabbitMq测试TTL的方法详解
来源:jb51  时间:2022/3/1 11:38:50  对本文有异议

什么是TTL?

RabbitMq中,存在一种高级特性 TTL

TTLTime To Live的缩写,含义为存活时间或者过期时间。即:

设定消息在队列中存活的时间。
指定时间内,消息依旧未被消费,则由队列自动将其删除。

如何设置TTL?

既然涉及到设定消息的存活时间,在RabbitMq中,存在两种设置方式:

  • 设置整个队列的过期时间。
  • 设置单个消息的过期时间。

在这里插入图片描述

设定整个队列的过期时间

按照上一篇文章的依赖导入和配置编写方式进行。

Springboot——整合Rabbitmq之Confirm和Return详解

配置类编写

在原有基础之上,新创建几个配置的bean类,申明bean对象,并进行交换机队列的关联,如下所示:、

  1. package cn.linkpower.config;
  2.  
  3. import org.springframework.amqp.core.*;
  4. import org.springframework.beans.factory.annotation.Qualifier;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import java.util.HashMap;
  8. import java.util.Map;
  9. @Configuration
  10. public class MQConfiguration {
  11. // =========================== Direct 直连模式 ==================================
  12. //队列名称
  13. public static final String QUEUQ_NAME = "xiangjiao.queue";
  14. //交换器名称
  15. public static final String EXCHANGE = "xiangjiao.exchange";
  16. //路由key
  17. public static final String ROUTING_KEY = "xiangjiao.routingKey";
  18. // =========================== Direct 普通队列申明 和 交换机绑定 ===================
  19. //创建队列
  20. @Bean(value = "getQueue")
  21. public Queue getQueue(){
  22. //QueueBuilder.durable(QUEUQ_NAME).build();
  23. return new Queue(QUEUQ_NAME);
  24. }
  25. //实例化交换机
  26. @Bean(value = "getDirectExchange")
  27. public DirectExchange getDirectExchange(){
  28. //DirectExchange(String name, boolean durable, boolean autoDelete)
  29. /**
  30. * 参数一:交换机名称;<br>
  31. * 参数二:是否永久;<br>
  32. * 参数三:是否自动删除;<br>
  33. */
  34. //ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();
  35. return new DirectExchange(EXCHANGE, true, false);
  36. //绑定消息队列和交换机
  37. @Bean
  38. public Binding bindExchangeAndQueue(@Qualifier(value = "getDirectExchange") DirectExchange exchange,
  39. @Qualifier(value = "getQueue") Queue queue){
  40. return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
  41. // =========================== TTL ================================
  42. public static final String ttl_queue_name = "xiangjiao.ttl.queue";
  43. public static final String ttl_exchange_name = "xiangjiao.ttl.exchange";
  44. public static final String ttl_routing_key = "xiangjiao.ttl.routingKey";
  45. @Bean(value = "getTtlQueue")
  46. public Queue getTtlQueue(){
  47. // 设置 ttl 队列,并设定 x-message-ttl 参数,表示 消息存活最大时间,单位 ms
  48. //return QueueBuilder.durable(ttl_queue_name).withArgument("x-message-ttl",10000).build();
  49. Map<String, Object> arguments = new HashMap<>();
  50. arguments.put("x-message-ttl",10000);
  51. return new Queue(ttl_queue_name,true,false,false,arguments);
  52. @Bean(value = "getTTlExchange")
  53. public DirectExchange getTTlExchange(){
  54. // 设置交换机属性,并保证交换机持久化
  55. return new DirectExchange(ttl_exchange_name, true, false);
  56. public Binding bindExchangeAndQueueTTL(@Qualifier(value = "getTTlExchange") DirectExchange getTTlExchange,
  57. @Qualifier(value = "getTtlQueue") Queue queue){
  58. return BindingBuilder.bind(queue).to(getTTlExchange).with(ttl_routing_key);
  59. }

对比原有的配置类,不难发现区别:

队列设置过期属性,只需要传递一个 x-message-ttl 的属性值即可。(单位:ms)

  1. Map<String, Object> arguments = new HashMap<>();
  2. arguments.put("x-message-ttl",10000);
  3. return new Queue(ttl_queue_name,true,false,false,arguments);

然后定义交换机类型,并将指定的交换机和队列进行绑定。

为了测试效果,暂未定义任何该队列的消费者信息。

测试

为了便于测试,需要定义一个接口,生产新的数据信息,并将数据向对应的Exchange中传递。

  1. /**
  2. * 发送消息,指定ttl参数信息(队列)
  3. * @return
  4. */
  5. @RequestMapping("/sendQueueTtl")
  6. @ResponseBody
  7. public String sendQueueTtl(){
  8. //发送10条消息
  9. for (int i = 0; i < 10; i++) {
  10. String msg = "msg"+i;
  11. System.out.println("发送消息 msg:"+msg);
  12. rabbitmqService.sendMessage(MQConfiguration.ttl_exchange_name,MQConfiguration.ttl_routing_key,msg);
  13. //每两秒发送一次
  14. try {
  15. Thread.sleep(8000);
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. return "send ok";
  21. }

两条消息之间的过期时间为8s

请求链接进行测试,查看Rabbitmq web视图信息:

http://localhost/sendQueueTtl

在这里插入图片描述

在这里插入图片描述

查看控制台输出日志:

在这里插入图片描述

消息正常发送到了Exchange,同时Exchange 也将消息推送到了指定的队列 !

设置有ConfirmReturn监听。

【说明:】

给队列设定时间后,单位时间内的消息如果未被消费,则队列会将其中的数据进行删除处理。

对单个消息设定过期时间

上面的操作和测试,已经验证对队列设定过期时间,会导致所有的消息过期时间都是一样的现象。

但实际开发中,可能一个队列需要存放不同过期时间的消息信息,如果需要进行实现,就不能再设定队列的过期时间信息了,需要采取下面要说到的针对单个消息,设置不同过期时间

配置

既然是针对单个消息设定不同的过期时间操作,则需要去掉队列过期设置。

为了测试的简单化,此处采取直连 Direct 交换机类型,进行交换机和队列数据的绑定方式。如下所示:

  1. // =========================== Direct 直连模式 ==================================
  2. //队列名称
  3. public static final String QUEUQ_NAME = "xiangjiao.queue";
  4. //交换器名称
  5. public static final String EXCHANGE = "xiangjiao.exchange";
  6. //路由key
  7. public static final String ROUTING_KEY = "xiangjiao.routingKey";
  8.  
  9. // =========================== Direct 普通队列申明 和 交换机绑定 ===================
  10. //创建队列
  11. @Bean(value = "getQueue")
  12. public Queue getQueue(){
  13. //QueueBuilder.durable(QUEUQ_NAME).build();
  14. return new Queue(QUEUQ_NAME);
  15. }
  16. //实例化交换机
  17. @Bean(value = "getDirectExchange")
  18. public DirectExchange getDirectExchange(){
  19. //DirectExchange(String name, boolean durable, boolean autoDelete)
  20. /**
  21. * 参数一:交换机名称;<br>
  22. * 参数二:是否永久;<br>
  23. * 参数三:是否自动删除;<br>
  24. */
  25. //ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();
  26. return new DirectExchange(EXCHANGE, true, false);
  27. }
  28. //绑定消息队列和交换机
  29. @Bean
  30. public Binding bindExchangeAndQueue(@Qualifier(value = "getDirectExchange") DirectExchange exchange,
  31. @Qualifier(value = "getQueue") Queue queue){
  32.  
  33. return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
  34. }

对于消息的发送,依旧沿用之前写的发送处理方式

设定confirmreturn监听,保证消息能够正常到达指定的队列中。

测试

编写一个测试的接口,设定单个消息的过期时间属性,保证不同消息具备不同的过期时间

在之前博客中,针对消息的持久化设置,需要保证消息向队列设定属性时,传递一个deliveryMode参数值信息。

同理,设定每个消息的过期时间,也需要设定对应的属性信息。如下所示:

  1. /**
  2. * 发送消息,指定ttl参数信息(单个消息);
  3. * 测试需要将消息消费者关闭监听
  4. * @return
  5. */
  6. @RequestMapping("/sendTtl")
  7. @ResponseBody
  8. public String sendTtl(){
  9. //发送10条消息
  10. for (int i = 0; i < 10; i++) {
  11. String msg = "msg"+i;
  12. System.out.println("发送消息 msg:"+msg);
  13. MessageProperties messageProperties = new MessageProperties();
  14. messageProperties.setExpiration("5000"); // 针对消息设定时限
  15. // 将消息数据和设置属性进行封装,采取消息发送模板,将消息数据推送至指定的交换机 exchange 中
  16. Message message = new Message(msg.getBytes(), messageProperties);
  17. rabbitmqService.sendMessage(MQConfiguration.EXCHANGE, MQConfiguration.ROUTING_KEY,message);
  18. //每两秒发送一次
  19. try {
  20. Thread.sleep(3000);
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. return "send ok";
  26. }

上面代码的编写核心为将消息内容体和消息对象属性进行封装

  1. MessageProperties messageProperties = new MessageProperties();
  2. messageProperties.setExpiration("5000"); // 针对消息设定时限
  3. // 将消息数据和设置属性进行封装,采取消息发送模板,将消息数据推送至指定的交换机 exchange 中
  4. Message message = new Message(msg.getBytes(), messageProperties);

引申一点:消息的持久化
Springboot 2.x ——RabbitTemplate为什么会默认消息持久化?

请求连接进行测试:

http://localhost/sendTtl

在这里插入图片描述

查看控制台打印日志情况:

在这里插入图片描述

总结

1、设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期

2、设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期

3、如果两者都进行了设置,以时间短的为准。

代码下载

gitee 代码下载

到此这篇关于Springboot整合RabbitMq测试TTL的文章就介绍到这了,更多相关Springboot整合RabbitMq内容请搜索w3xue以前的文章或继续浏览下面的相关文章希望大家以后多多支持w3xue!

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号