经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » Redis » 查看文章
分布式利器redis及redisson的延迟队列实践
来源:jb51  时间:2022/3/1 15:40:05  对本文有异议

前言碎语

首先说明下需求,一个用户中心产品,用户在试用产品有三天的期限,三天到期后准时准点通知用户,试用产品到期了。这个需求如果不是准时通知,而是每天定点通知就简单了。如果需要准时通知就只能上延迟队列了。使用场景除了如上,典型的业务场景还有电商中的延时未支付订单失效等等。

延迟队列多种实现方式

  • 1.如基于RabbitMQ的队列ttl+死信路由策略:通过设置一个队列的超时未消费时间,配合死信路由策略,到达时间未消费后,回会将此消息路由到指定队列
  • 2.基于RabbitMQ延迟队列插件(rabbitmq-delayed-message-exchange):发送消息时通过在请求头添加延时参数(headers.put("x-delay", 5000))即可达到延迟队列的效果
  • 3.使用redis的zset有序性,轮询zset中的每个元素,到点后将内容迁移至待消费的队列,(redisson已有实现)
  • 4.使用redis的key的过期通知策略,设置一个key的过期时间为延迟时间,过期后通知客户端

redisson中的延迟队列实现

怎么封装便于业务使用。

1.首先定义一个延迟job,里面包含一个map参数,和队列执行器的具体实现class,触发任务执行时,map参数会被传递到具体的业务执行器实现内

  1. /**
  2. * Created by kl on 2018/7/20.
  3. * Content :延时job
  4. */
  5. public class DelayJob {
  6. private Map jobParams;//job执行参数
  7. private Class aClass;//具体执行实例实现
  8. }

2.定义一个延迟job执行器接口,业务需要实现这个接口,然后在execute方法内写自己的业务逻辑

  1. /**
  2. * Created by kl on 2018/7/20.
  3. * Content :延时job执行器接口
  4. */
  5. public interface ExecuteJob {
  6. void execute(DelayJob job);
  7. }

3.消费已经到点的延时job服务,通过job参数调用业务执行器实现

  1. @Component
  2. public class JobTimer {
  3. static final String jobsTag = "customer_jobtimer_jobs";
  4. @Autowired
  5. private RedissonClient client;
  6. @Autowired
  7. private ApplicationContext context;
  8. ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
  9. @PostConstruct
  10. public void startJobTimer() {
  11. RBlockingQueueblockingQueue = client.getBlockingQueue(jobsTag);
  12. new Thread() {
  13. @Override
  14. public void run() {
  15. while (true) {
  16. try {
  17. DelayJob job = blockingQueue.take();
  18. executorService.execute(new ExecutorTask(context, job));
  19. } catch (Exception e) {
  20. e.printStackTrace();
  21. try {
  22. TimeUnit.SECONDS.sleep(60);
  23. } catch (Exception ex) {
  24. }
  25. }
  26. }
  27. }
  28. }.start();
  29. }
  30. class ExecutorTask implements Runnable {
  31. private ApplicationContext context;
  32. private DelayJob delayJob;
  33. public ExecutorTask(ApplicationContext context, DelayJob delayJob) {
  34. this.context = context;
  35. this.delayJob = delayJob;
  36. }
  37. @Override
  38. public void run() {
  39. ExecuteJob service = (ExecuteJob) context.getBean(delayJob.getaClass());
  40. service.execute(delayJob);
  41. }
  42. }
  43. }

4.封装延时job服务

  1. /**
  2. * Created by kl on 2018/7/20.
  3. * Content :延时job服务
  4. */
  5. @Component
  6. public class DelayJobService {
  7. @Autowired
  8. private RedissonClient client;
  9. public void submitJob(DelayJob job, Long delay, TimeUnit timeUnit){
  10. RBlockingQueueblockingQueue = client.getBlockingQueue(JobTimer.jobsTag);
  11. RDelayedQueue delayedQueue = client.getDelayedQueue(blockingQueue);
  12. delayedQueue.offer(job,delay,timeUnit);
  13. }
  14. }

文末结语

redisson作为一个分布式利器,这么好用的工具没人用有点可惜,还有一个原因是有个想法,想将延迟队列这个功能封装成一个spring boot的start依赖,然后开源出来,造福四方,希望大家以后多多支持w3xue!

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号