前言碎语
首先说明下需求,一个用户中心产品,用户在试用产品有三天的期限,三天到期后准时准点通知用户,试用产品到期了。这个需求如果不是准时通知,而是每天定点通知就简单了。如果需要准时通知就只能上延迟队列了。使用场景除了如上,典型的业务场景还有电商中的延时未支付订单失效等等。
延迟队列多种实现方式
- 1.如基于RabbitMQ的队列ttl+死信路由策略:通过设置一个队列的超时未消费时间,配合死信路由策略,到达时间未消费后,回会将此消息路由到指定队列
- 2.基于RabbitMQ延迟队列插件(rabbitmq-delayed-message-exchange):发送消息时通过在请求头添加延时参数(headers.put("x-delay", 5000))即可达到延迟队列的效果
- 3.使用redis的zset有序性,轮询zset中的每个元素,到点后将内容迁移至待消费的队列,(redisson已有实现)
- 4.使用redis的key的过期通知策略,设置一个key的过期时间为延迟时间,过期后通知客户端
redisson中的延迟队列实现
怎么封装便于业务使用。
1.首先定义一个延迟job,里面包含一个map参数,和队列执行器的具体实现class,触发任务执行时,map参数会被传递到具体的业务执行器实现内
- /**
- * Created by kl on 2018/7/20.
- * Content :延时job
- */
- public class DelayJob {
- private Map jobParams;//job执行参数
- private Class aClass;//具体执行实例实现
- }
2.定义一个延迟job执行器接口,业务需要实现这个接口,然后在execute方法内写自己的业务逻辑
- /**
- * Created by kl on 2018/7/20.
- * Content :延时job执行器接口
- */
- public interface ExecuteJob {
- void execute(DelayJob job);
- }
3.消费已经到点的延时job服务,通过job参数调用业务执行器实现
- @Component
- public class JobTimer {
- static final String jobsTag = "customer_jobtimer_jobs";
- @Autowired
- private RedissonClient client;
- @Autowired
- private ApplicationContext context;
- ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
- @PostConstruct
- public void startJobTimer() {
- RBlockingQueueblockingQueue = client.getBlockingQueue(jobsTag);
- new Thread() {
- @Override
- public void run() {
- while (true) {
- try {
- DelayJob job = blockingQueue.take();
- executorService.execute(new ExecutorTask(context, job));
- } catch (Exception e) {
- e.printStackTrace();
- try {
- TimeUnit.SECONDS.sleep(60);
- } catch (Exception ex) {
- }
- }
- }
- }
- }.start();
- }
- class ExecutorTask implements Runnable {
- private ApplicationContext context;
- private DelayJob delayJob;
- public ExecutorTask(ApplicationContext context, DelayJob delayJob) {
- this.context = context;
- this.delayJob = delayJob;
- }
- @Override
- public void run() {
- ExecuteJob service = (ExecuteJob) context.getBean(delayJob.getaClass());
- service.execute(delayJob);
- }
- }
- }
4.封装延时job服务
- /**
- * Created by kl on 2018/7/20.
- * Content :延时job服务
- */
- @Component
- public class DelayJobService {
- @Autowired
- private RedissonClient client;
- public void submitJob(DelayJob job, Long delay, TimeUnit timeUnit){
- RBlockingQueueblockingQueue = client.getBlockingQueue(JobTimer.jobsTag);
- RDelayedQueue delayedQueue = client.getDelayedQueue(blockingQueue);
- delayedQueue.offer(job,delay,timeUnit);
- }
- }
文末结语
redisson作为一个分布式利器,这么好用的工具没人用有点可惜,还有一个原因是有个想法,想将延迟队列这个功能封装成一个spring boot的start依赖,然后开源出来,造福四方,希望大家以后多多支持w3xue!