经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Java » 查看文章
延迟队列的常用的实现方式
来源:cnblogs  作者:废物大师兄  时间:2021/4/6 10:21:40  对本文有异议

延迟队列的使用场景还比较多,例如:

1、超时未收到支付回调,主动查询支付状态;

2、规定时间内,订单未支付,自动取消;

。。。

总之,但凡需要在未来的某个确定的时间点执行检查的场景中都可以用延迟队列。

常见的手段主要有:定时任务扫描、RocketMQ延迟队列、Java自动的延迟队列、监听Redis Key过期等等

1.  DelayQueue

首先,定义一个延迟任务

  1. package com.cjs.example;
  2. import lombok.Data;
  3. import java.util.concurrent.Delayed;
  4. import java.util.concurrent.TimeUnit;
  5. /**
  6. * @author ChengJianSheng
  7. * @since 2021/3/18
  8. */
  9. @Data
  10. public class DelayTask implements Delayed {
  11. private Long orderId;
  12. private long expireTime;
  13. public DelayTask(Long orderId, long expireTime) {
  14. this.orderId = orderId;
  15. this.expireTime = expireTime;
  16. }
  17. @Override
  18. public long getDelay(TimeUnit unit) {
  19. return expireTime - System.currentTimeMillis();
  20. }
  21. @Override
  22. public int compareTo(Delayed o) {
  23. return (int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
  24. }
  25. }

然后,定义一个管理类

  1. package com.cjs.example;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.boot.CommandLineRunner;
  5. import org.springframework.stereotype.Component;
  6. import java.util.concurrent.DelayQueue;
  7. import java.util.concurrent.ExecutorService;
  8. import java.util.concurrent.Executors;
  9. /**
  10. * @author ChengJianSheng
  11. * @since 2021/3/19
  12. */
  13. @Slf4j
  14. @Component
  15. public class DelayQueueManager implements CommandLineRunner {
  16. private DelayQueue<DelayTask> queue = new DelayQueue<>();
  17. @Autowired
  18. private ParkOrderQueryHandler handler;
  19. @Override
  20. public void run(String... strings) throws Exception {
  21. ExecutorService executorService = Executors.newSingleThreadExecutor();
  22. executorService.execute(new Runnable() {
  23. @Override
  24. public void run() {
  25. while (true) {
  26. try {
  27. DelayTask task = queue.take();
  28. handler.handle(task);
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. }
  34. });
  35. }
  36. public void put(DelayTask task) {
  37. queue.put(task);
  38. }
  39. }

插入任务

  1. @Slf4j
  2. @Service
  3. public class PayServiceImpl implements PayService {
  4. @Autowired
  5. private DelayQueueManager delayQueueManager;
  6. @Override
  7. public void pay() {
  8. delayQueueManager.put(new DelayTask(1, 15));
  9. delayQueueManager.put(new DelayTask(2, 30));
  10. delayQueueManager.put(new DelayTask(3, 60));
  11. }
  12. }

2.  Redis Key过期回调

修改redis.conf文件

  1. # bind 127.0.0.1 -::1
  2. protected-mode no
  3. notify-keyspace-events Ex

  1. [root@localhost redis-6.2.1]$ src/redis-server redis.conf
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.4.4</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>com.example</groupId>
  12. <artifactId>demo0401</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. <name>demo0401</name>
  15. <description>Demo project for Spring Boot</description>
  16. <properties>
  17. <java.version>1.8</java.version>
  18. </properties>
  19. <dependencies>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-data-redis</artifactId>
  23. </dependency>
  24. </dependencies>
  25. <build>
  26. <plugins>
  27. <plugin>
  28. <groupId>org.springframework.boot</groupId>
  29. <artifactId>spring-boot-maven-plugin</artifactId>
  30. </plugin>
  31. </plugins>
  32. </build>
  33. </project>

RedisConfig.java

  1. package com.example.config;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.data.redis.connection.RedisConnectionFactory;
  5. import org.springframework.data.redis.listener.RedisMessageListenerContainer;
  6. /**
  7. * @author ChengJianSheng
  8. * @since 2021/4/2
  9. */
  10. @Configuration
  11. public class RedisConfig {
  12. @Bean
  13. public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
  14. RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  15. container.setConnectionFactory(connectionFactory);
  16. return container;
  17. }
  18. }

创建一个监听类

  1. package com.example.listener;
  2. import org.springframework.data.redis.connection.Message;
  3. import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
  4. import org.springframework.data.redis.listener.RedisMessageListenerContainer;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * @author ChengJianSheng
  8. * @since 2021/4/2
  9. */
  10. @Component
  11. public class MyRedisKeyExpirationListener extends KeyExpirationEventMessageListener {
  12. public MyRedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
  13. super(listenerContainer);
  14. }
  15. @Override
  16. public void onMessage(Message message, byte[] pattern) {
  17. String expiredKey = message.toString();
  18. System.out.println("监听到Key: " + expiredKey + " 已过期");
  19. }
  20. }

3.  RocketMQ

 

https://help.aliyun.com/document_detail/29549.htm 

 

原文链接:http://www.cnblogs.com/cjsblog/p/14612169.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号