经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Java » 查看文章
两个例子带你入门 Disruptor
来源:cnblogs  作者:勇哥编程游记  时间:2023/9/20 9:40:06  对本文有异议

Disruptor 是英国外汇交易公司 LMAX 开发的一个高性能队列。很多知名开源项目里,比如 canal 、log4j2、 storm 都是用了 Disruptor 以提升系统性能 。

这篇文章,我们通过两个例子一步一个脚印帮助同学们入门 Disruptor 。

1 环形缓冲区

下图展示了 Disruptor 的流程图 。

和线程池机制非常类似, Disruptor 也是非常典型的生产者/消费者模式。线程池存储提交任务的容器是阻塞队列,而 Disruptor 使用的是环形缓冲区 RingBuffer

环形缓冲区的设计相比阻塞队列有如下优点:

  • 环形数组结构

为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。

  • 元素位置定位

数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式,不用担心 index 溢出的问题。index 是 long 类型,即使100万QPS的处理速度,也需要30万年才能用完。

  • 无锁设计

每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。

2 写一个Hello world

我们写一个非常简单的例子:生产者传递一个单一的长整型值给消费者,而消费者将简单地打印出这个值

2.1 添加依赖

  1. <dependency>
  2. <groupId>com.lmax</groupId>
  3. <artifactId>disruptor</artifactId>
  4. <version>3.3.6</version>
  5. </dependency>

2.2 定义事件

首先,我们将定义一个事件(Event),它将携带数据,并且在接下来的所有示例中都是通用的。

  1. public class LongEvent {
  2. private long value;
  3. public void set(long value) {
  4. this.value = value;
  5. }
  6. @Override
  7. public String toString() {
  8. return "LongEvent{" + "value=" + value + '}';
  9. }
  10. }

为了让 Disruptor 为我们预分配这些事件,我们需要一个 EventFactory 来执行构造。这可以是一个方法引用,比如 LongEvent::new ,或者是 EventFactory 接口的显式实现:

  1. public class LongEventFactory implements EventFactory<LongEvent> {
  2. @Override
  3. public LongEvent newInstance() {
  4. return new LongEvent();
  5. }
  6. }

2.3 定义消费者

定义了事件,我们需要创建一个消费者来处理这些事件。我们会创建一个事件处理器(EventHandler),它会将把值打印到控制台上。

  1. public class LongEventHandler implements EventHandler<LongEvent> {
  2. @Override
  3. public void onEvent(LongEvent longEvent, long sequence, boolean endOfBatch) throws Exception {
  4. System.out.println("currentThread:" + Thread.currentThread().getName() + " Event: " + longEvent);
  5. }
  6. }

2.4 发布

  1. public class LongEventMain {
  2. public static void main(String[] args) throws Exception {
  3. int bufferSize = 2;
  4. Disruptor<LongEvent> disruptor =
  5. new Disruptor<>(
  6. new LongEventFactory(),
  7. bufferSize,
  8. DaemonThreadFactory.INSTANCE,
  9. ProducerType.SINGLE,
  10. new BlockingWaitStrategy());
  11. disruptor.handleEventsWith(new LongEventHandler());
  12. disruptor.start();
  13. RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
  14. ByteBuffer bb = ByteBuffer.allocate(8);
  15. for (long l = 0; true; l++) {
  16. bb.putLong(0, l);
  17. ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
  18. Thread.sleep(1000);
  19. }
  20. }
  21. }

整个发布流程分为四个部分:

  1. 指定环形缓冲区的大小,必须是2的幂次方,例子中设置的值是 1024 ;

  2. 构建 Disruptor ,参数分别是事件工厂EventFactory 环形缓冲区的大小ringBufferSize 处理器线程池生产者类型(单生产者/多生产者)、消费者阻塞策略

  3. 定义事件处理器eventHandler,我们这里的逻辑是打印数据打印在控制台;

  4. 启动 Disruptor,从 Disruptor 中获取环形缓冲区ringBuffer,在 for 循环里 ,调用环形队列的publishEvent方法。

    这里使用了 ByteBuffer 做为数据的存储容器 , 方便作为参数传递。

我们来看下执行结果 :

3 日志处理

3.1 应用场景

上面的例子比较简单,但假如要应用到生产环境,就显得非常粗糙。

我们模拟一个日志处理的场景,用户进入视频播放页面,浏览器定时的发送浏览日志到服务端,服务端将日志存储起来。

3.2 核心类设计

我们定义一个 DisruptorManager 管理器 , 管理器包含三个核心参数:消费者监听器 DataEventListener消费者数量环形队列长度

  1. public class DisruptorManager<T> {
  2. private static final Integer DEFAULT_CONSUMER_SIZE = 4;
  3. public static final Integer DEFAULT_SIZE = 4096 << 1 << 1;
  4. private DataEventListener<T> dataEventListener;
  5. private DisruptorProducer<T> producer;
  6. private int ringBufferSize;
  7. private int consumerSize;
  8. public DisruptorManager(DataEventListener<T> dataEventListener) {
  9. this(dataEventListener, DEFAULT_CONSUMER_SIZE, DEFAULT_SIZE);
  10. }
  11. public DisruptorManager(DataEventListener<T> dataEventListener, final int consumerSize, final int ringBufferSize) {
  12. this.dataEventListener = dataEventListener;
  13. this.ringBufferSize = ringBufferSize;
  14. this.consumerSize = consumerSize;
  15. }
  16. public void start() {
  17. EventFactory<DataEvent<T>> eventFactory = new DisruptorEventFactory<>();
  18. Disruptor<DataEvent<T>> disruptor = new Disruptor<>(
  19. eventFactory,
  20. ringBufferSize,
  21. DisruptorThreadFactory.create("consumer-thread", false),
  22. ProducerType.MULTI,
  23. new BlockingWaitStrategy()
  24. );
  25. DisruptorConsumer<T>[] consumers = new DisruptorConsumer[consumerSize];
  26. for (int i = 0; i < consumerSize; i++) {
  27. consumers[i] = new DisruptorConsumer<>(dataEventListener);
  28. }
  29. disruptor.handleEventsWithWorkerPool(consumers);
  30. disruptor.start();
  31. RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();
  32. this.producer = new DisruptorProducer<>(ringBuffer, disruptor);
  33. }
  34. public DisruptorProducer getProducer() {
  35. return this.producer;
  36. }
  37. }

首先和 Hello world 代码中的不同的点,Disruptor 的构造函数中我们自定义了消费者的处理器线程。

  1. DisruptorThreadFactory.create("consumer-thread", false),

然后我们定义消费者的业务逻辑 :

  1. DisruptorConsumer<T>[] consumers = new DisruptorConsumer[consumerSize];
  2. for (int i = 0; i < consumerSize; i++) {
  3. consumers[i] = new DisruptorConsumer<>(dataEventListener);
  4. }
  5. disruptor.handleEventsWithWorkerPool(consumers);

消费者本质上是workHandler的实现类,只不过初始化时将 DataEventListener 作为构造函数的参数。

  1. public class DisruptorConsumer<T> implements WorkHandler<DataEvent<T>> {
  2. private DataEventListener<T> dataEventListener;
  3. public DisruptorConsumer(DataEventListener dataEventListener) {
  4. this.dataEventListener = dataEventListener;
  5. }
  6. @Override
  7. public void onEvent(DataEvent<T> dataEvent) throws Exception {
  8. if (dataEvent != null) {
  9. dataEventListener.processDataEvent(dataEvent);
  10. }
  11. }
  12. }

因为我们是希望线程池并行的处理这些消息数据,使用的是disruptor.handleEventsWithWorkerPool 可以保证每个事件只会由一个工作处理器处理

在 springboot 项目中,我们需要初始化相关 bean。

  1. @Configuration
  2. @AutoConfigureBefore(RedisConfig.class)
  3. public class DisruptorConfig {
  4. private final static Logger logger = LoggerFactory.getLogger(DisruptorConfig.class);
  5. private final static String LIST_KEY = "disruptor:list";
  6. @Autowired
  7. private RedissonClient redissonClient;
  8. @Bean
  9. public DataEventListener<String> createConsumerListener() {
  10. DataEventListener<String> dataEventListener = new DataEventListener<String>() {
  11. @Override
  12. public void processDataEvent(DataEvent<String> dataEvent) throws InterruptedException {
  13. logger.info("processDateEvent data:" + dataEvent.getData());
  14. redissonClient.getList(LIST_KEY).add(dataEvent.getData());
  15. }
  16. };
  17. return dataEventListener;
  18. }
  19. @Bean
  20. public DisruptorProducer<String> createProducer(DataEventListener dataEventListener) {
  21. DisruptorManager disruptorManage = new DisruptorManager(dataEventListener,
  22. 8,
  23. 1024 * 1024);
  24. disruptorManage.start();
  25. return disruptorManage.getProducer();
  26. }

首先,我们定义好消费者的事件监听器,然后定义 DisruptorProducer, 该类用来将数据提交到环形队列。

  1. public class DisruptorProducer<T> {
  2. private final Logger logger = LoggerFactory.getLogger(DisruptorProducer.class);
  3. private final RingBuffer<DataEvent<T>> ringBuffer;
  4. private final Disruptor<DataEvent<T>> disruptor;
  5. private final EventTranslatorOneArg<DataEvent<T>, T> translatorOneArg = (event, sequence, t) -> event.setData(t);
  6. public DisruptorProducer(final RingBuffer<DataEvent<T>> ringBuffer, final Disruptor<DataEvent<T>> disruptor) {
  7. this.ringBuffer = ringBuffer;
  8. this.disruptor = disruptor;
  9. }
  10. /**
  11. * Send a data.
  12. *
  13. * @param data the data
  14. */
  15. public void onData(final T data) {
  16. try {
  17. ringBuffer.publishEvent(translatorOneArg, data);
  18. } catch (Exception ex) {
  19. logger.error("publish event error:", ex);
  20. }
  21. }
  22. public void shutdown() {
  23. if (null != disruptor) {
  24. disruptor.shutdown();
  25. }
  26. }
  27. }

最后,在控制器中,接收前端请求:

  1. @Autowired
  2. private DisruptorProducer<String> producer;
  3. @GetMapping("/pushlog")
  4. public ResponseEntity pushlog(String log) {
  5. producer.onData(log);
  6. return ResponseEntity.successResult(null);
  7. }

从下图中,我们可以看到从控制器接收到请求后,消费处理器线程不断地将数据打印出来,并且发送到队列中。

4 写到最后

日志处理的例子里,我们试图封装 Disruptor 相关 API ,以便在 springboot 项目中更方便的使用。

笔者在测试过程时,发现若消费逻辑慢的时候,生产者发送数据事件时,可能会阻塞。

为什么生产者会阻塞,Disruptor 的核心原理是什么 ,如何使用 Disruptor 的高级特性顺序依赖执行 ?

正因为有这些疑问,笔者觉得深入理解 Disruptor 原理特别有必要,笔者也会在接下来的文章里一一为大家答疑解惑。


参考资料:

https://lmax-exchange.github.io/disruptor/disruptor.html

https://zhuanlan.zhihu.com/p/45575008

原文链接:https://www.cnblogs.com/makemylife/p/17714364.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号