经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » Redis » 查看文章
基于Redis的简易延时队列
来源:cnblogs  作者:熊熊会发光哦  时间:2023/12/11 10:23:57  对本文有异议

基于Redis的简易延时队列

一、背景

在实际的业务场景中,经常会遇到需要延时处理的业务,比如订单超时未支付,需要取消订单,或者是用户注册后,需要在一段时间内激活账号,否则账号失效等等。这些业务场景都可以通过延时队列来实现。
最近在实际业务当中就遇到了这样的一个场景,需要实现一个延时队列,用来处理订单超时未支付的业务。在网上找了一些资料,发现大部分都是使用了mq来实现,比如rabbitmq,rocketmq等等,但是这些mq都是需要安装的,而且还需要配置,对于此项目来说不想增加额外的依赖,所以就想到了使用redis来实现一个简易的延时队列。

二、实现思路

1. 业务场景

订单超时未支付,需要取消订单,这个业务场景可以分为两个步骤来实现:

  1. 用户下单后,将订单信息存入数据库,并将订单信息存入延时队列中,设置延时时间为30分钟。
  2. 30分钟后,从延时队列中取出订单信息,判断订单是否已支付,如果未支付,则取消订单。
  3. 如果用户在30分钟内支付了订单,则将订单从延时队列中删除。

2. 实现思路

  1. 使用redis的zset来实现延时队列,zset的score用来存储订单的超时时间,value用来存储订单信息。
  2. 使用redis的set来存储已支付的订单,set中的value为订单id。

三、实现代码

1. 使用了两个注解类分别标记生产者类、生产者方法,消费者方法

  1. /**
  2. * @program:
  3. * @description: redis延时队列生产者类注解,标记生产者类,用来扫描生产者类中的生产者方法,将生产者方法注册到redis延时队列中
  4. * @author: jiangchengxuan
  5. * @created: 2023/12/09 10:32
  6. */
  7. @Component
  8. @Documented
  9. @Target({ElementType.TYPE})
  10. @Retention(RetentionPolicy.RUNTIME)
  11. public @interface RedisMessageQueue {}
  1. /**
  2. * @program:
  3. * @description:
  4. * 带有此注解的方法,方法的入参首先会被转换为json字符串,然后存入redis的zset中,score为当前时间+延时时间,value为json字符串
  5. * 当延时时间到达后,会从redis的zset中取出value,然后将value转换为入参类型,调用此方法,执行业务逻辑
  6. * 此注解只能标记在方法上,且方法必须为public,且只能有一个参数
  7. * 此注解标记的方法,必须在redis延时队列生产者类中,否则不会生效
  8. * @author: jiangchengxuan
  9. * @created: 2023/12/09 10:37
  10. */
  11. @Documented
  12. @Target(ElementType.METHOD)
  13. @Retention(RetentionPolicy.RUNTIME)
  14. public @interface RedisMessageQueueMethod {
  15. String threadName() default "redis消息队列默认线程";
  16. String queueKey(); // 队列key值
  17. int threadNum() default 1; //默认线程数量
  18. int threadSleepTime() default 500; //默认线程休眠时间默认500ms
  19. }

2. 生产者类具体实现

  1. /**
  2. * @program:
  3. * @description: 生产者类具体实现
  4. * @author: jiangchengxuan
  5. * @created: 2023/12/09 10:44
  6. */
  7. @Slf4j
  8. @Component
  9. public class DelayQueueWorkerConfig implements InitializingBean {
  10. private volatile boolean monitorStarted = false;
  11. private volatile boolean monitorShutDowned = false;
  12. private ExecutorService executorService;
  13. // 需要监控的延时队列
  14. @Autowired
  15. protected IDelayQueue<String> monitorQueue;
  16. @Autowired
  17. private ApplicationContext applicationContext;
  18. @Override
  19. public void afterPropertiesSet(){
  20. //spring工具类,可以获取指定注解的类
  21. Map<String, Object> allNeedClass = applicationContext.getBeansWithAnnotation(RedisMessageQueue.class);
  22. for (Map.Entry<String, Object> entry : allNeedClass.entrySet()) {
  23. Object bean = entry.getValue();
  24. Method[] methods = bean.getClass().getMethods();
  25. for (Method method : methods) {
  26. Annotation[] annotations = method.getDeclaredAnnotations();
  27. for (Annotation annotation : annotations) {
  28. if (annotation instanceof RedisMessageQueueMethod) {
  29. RedisMessageQueueMethod queueMethod = (RedisMessageQueueMethod) annotation;
  30. //找的需要使用消息队列的方法后,
  31. initExecuteQueue(queueMethod, method, bean);
  32. }
  33. }
  34. }
  35. }
  36. }
  37. /**
  38. * 初始化执行造作
  39. * @param queueAnnotations 注解
  40. * @param method 方法
  41. * @param bean 对象
  42. */
  43. void initExecuteQueue(RedisMessageQueueMethod queueAnnotations ,Method method,Object bean) {
  44. String threadName = queueAnnotations.threadName();
  45. int threadNum = queueAnnotations.threadNum();
  46. int threadSheepTime = queueAnnotations.threadSleepTime();
  47. String queueKey = queueAnnotations.queueKey();
  48. //获取所有消息队列名称
  49. executorService = Executors.newFixedThreadPool(threadNum);
  50. for (int i = 0; i < threadNum; i++) {
  51. final int num = i;
  52. executorService.execute(() -> {
  53. Thread.currentThread().setName(threadName + "[" + num + "]");
  54. //如果没有设置队列queuekey或者已经暂停则不执行
  55. while (!monitorShutDowned) {
  56. String value = null;
  57. try {
  58. value = monitorQueue.get(queueKey);
  59. // 获取数据时进行删除操作,删除成功,则进行处理,业务逻辑处理失败则继续添加回队列但是时间设置最大以达到保存现场的目的,防止并发获取重复数据
  60. if (StringUtils.isNotEmpty(value)) {
  61. if (log.isDebugEnabled()) {
  62. log.debug("Monitor Thread[" + Thread.currentThread().getName() + "], get from queue,value = {}", value);
  63. }
  64. boolean success = (Boolean) method.invoke(bean, value);
  65. // 失败重试
  66. if (!success) {
  67. success = (Boolean) method.invoke(bean, value);;
  68. if (!success) {
  69. log.warn("Monitor Thread[" + Thread.currentThread().getName() + "] execute Failed,value = {}", value);
  70. monitorQueue.add(TimeUnit.DAYS,365, value, queueKey);
  71. }
  72. } else {
  73. if (log.isDebugEnabled()) {
  74. log.debug("Monitor Thread[" + Thread.currentThread().getName() + "]:execute successfully!values = {}", value);
  75. }
  76. }
  77. } else {
  78. if (log.isDebugEnabled()) {
  79. log.debug("Monitor Thread[" + Thread.currentThread().getName() + "]:monitorThreadRunning = {}", monitorStarted);
  80. }
  81. Thread.sleep(threadSheepTime);
  82. }
  83. } catch (Exception e) {
  84. log.error("Monitor Thread[" + Thread.currentThread().getName() + "] execute Failed,value = " + value, e);
  85. }
  86. }
  87. log.info("Monitor Thread[" + Thread.currentThread().getName() + "] Completed...");
  88. });
  89. }
  90. log.info("thread pool is started...");
  91. }
  92. }
  1. /**
  2. * @program:
  3. * @description:
  4. * 延时队列接口实现类,
  5. * 使用redis的zset实现延时队列,
  6. * @author: jiangchengxuan
  7. * @created: 2023/12/09 23:34
  8. */
  9. public interface IDelayQueue <E> {
  10. /**
  11. * 向延时队列中添加数据
  12. *
  13. * @param score 分数
  14. * @param data 数据
  15. * @return true 成功 false 失败
  16. */
  17. boolean add(long score, E data,String queueKey);
  18. /**
  19. * 向延时队列中添加数据
  20. *
  21. * @param timeUnit 时间单位
  22. * @param time 延后时间
  23. * @param data 数据
  24. * @param queueKey
  25. * @return true 成功 false 失败
  26. */
  27. boolean add(TimeUnit timeUnit, long time, E data, String queueKey);
  28. /**
  29. * 从延时队列中获取数据
  30. * @param queueKey 队列key
  31. * @return 数据
  32. */
  33. String get(String queueKey);
  34. /**
  35. * 删除数据
  36. *
  37. * @param key
  38. * @param data 数据
  39. * @return
  40. */
  41. public<T> boolean rem(String key, T data) ;
  42. }
  1. /**
  2. * @program:
  3. * @description: redis操作类,封装了redis的操作方法,使用时直接注入即可使用,不需要关心redis的操作细节,使用时只需要关心业务逻辑即可
  4. * @author: jiangchengxuan
  5. * @created: 2023/12/09 23:35
  6. */
  7. @Service
  8. public class RedisDelayQueue implements IDelayQueue<String> {
  9. @Autowired
  10. private RedisService redisService;
  11. @Override
  12. public boolean add(long score, String data,String queueKey) {
  13. return redisService.opsForZSet(Constant.DEFAULT_REDIS_QUEUE_KEY_PREFIX+queueKey, data, score);
  14. }
  15. @Override
  16. public boolean add(TimeUnit timeUnit, long time, String data, String queueKey) {
  17. switch (timeUnit) {
  18. case SECONDS:
  19. return add(LocalDateTime.now().plusSeconds(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data, queueKey);
  20. case MINUTES:
  21. return add(LocalDateTime.now().plusMinutes(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data,queueKey);
  22. case HOURS:
  23. return add(LocalDateTime.now().plusHours(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data,queueKey);
  24. case DAYS:
  25. return add(LocalDateTime.now().plusDays(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data,queueKey);
  26. default:
  27. return false;
  28. }
  29. }
  30. @Override
  31. public String get(String queueKey) {
  32. long now = System.currentTimeMillis();
  33. long min = Long.MIN_VALUE;
  34. Set<String> res = redisService.rangeByScoreZSet(Constant.DEFAULT_REDIS_QUEUE_KEY_PREFIX+queueKey, min, now, 0, 10);
  35. if (!CollectionUtils.isEmpty(res)) {
  36. for (String data : res){
  37. // 删除成功,则进行处理,防止并发获取重复数据
  38. if (rem(queueKey, data)){
  39. return data;
  40. }
  41. }
  42. }
  43. return null;
  44. }
  45. @Override
  46. public<T> boolean rem(String key, T data) {
  47. return redisService.remZSet(Constant.DEFAULT_REDIS_QUEUE_KEY_PREFIX+key, data);
  48. }
  49. }
  1. 使用
  1. @RedisMessageQueue
  2. public class SomethingClass
  3. {
  4. @Autowired
  5. private IDelayQueue<String> messageQueue;
  6. /**
  7. * 生产者,向队列中添加数据,30秒后消费者进行消费
  8. */
  9. public void test(){
  10. messageQueue.add(TimeUnit.SECONDS,30L,"这是参数数据","new_queue");
  11. }
  12. /**
  13. * 消费者,如果按此配置的话,会启动一个线程,线程名称为:测试线程名称,线程数量为1,线程休眠时间为10毫秒
  14. * 注意:queueKey需要与生产者中的queueKey保持一致才能进行消费
  15. * @param data
  16. */
  17. @Override
  18. @RedisMessageQueueMethod(threadName = "测试线程名称",queueKey = "new_queue",threadNum = 1,threadSleepTime = 10)
  19. public void testMethod(String data) {
  20. //do something
  21. }
  22. }

原文链接:https://www.cnblogs.com/bearcanlight/p/17891151.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号