概念
参考文章:
并发框架——Distruptor_disruptor-CSDN博客
Disruptor 是一种高效的”生产者-消费者”模型,性能远远高于传统的 BlockingQueue 容器。BlockingQueue是基于锁实现的,而锁的效率通常较低。没有使用CAS机制实现的生产者-消费者
Disruptor使用观察者模式,主动将消息发送给消费者,而不是等消费者从队列中取;在无锁的情况下, 实现queue(环形, RingBuffer)的并发操作, 性能远高于BlockingQueue
其实就是一个事件发布监听的框架~
基本使用
1、导入依赖
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
2、创建事件
@Data
public class NotifyEvent<T> {
private T data;
/**
* 清空数据
*/
public void clear() {
this.data = null;
}
}
3、创建事件工厂
public class NotifyEventFactory implements EventFactory<NotifyEvent> {
@Override
public NotifyEvent newInstance() {
return new NotifyEvent();
}
}
4、创建消息实体
@Data
@Builder
public class NotifyInfo {
private Integer msg;
}
5、创建事件监听器(消费者)
@Slf4j
public class NotifyEventHandler implements EventHandler<NotifyEvent<NotifyInfo>> {
@Override
public void onEvent(NotifyEvent<NotifyInfo> event, long sequence, boolean endOfBatch) throws InterruptedException {
System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + ":" + event.getData());
Thread.sleep(3000);
}
}
6、调用测试
public class Test1 {
public static void main(String[] args) throws InterruptedException {
// 初始化事件工厂
EventFactory eventFactory = new NotifyEventFactory();
// 指定环形队列大小,只能是 2 的次方
int ringBufferSize = 128;
// 创建线程工厂,用于给 Disruptor 创建线程,可以在这里指定线程配置
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Notify");
// 将此线程标记为守护进程线程或用户线程。当唯一运行的线程都是守护进程线程时,Java虚拟机将退出。不然这个线程会一直运行
thread.setDaemon(true);
return thread;
}
} ;
// 初始化事件监听器,可以在 spring 中注册为 bean
NotifyEventHandler eventHandler = new NotifyEventHandler();
// 创建 Disruptor
Disruptor<NotifyEvent<NotifyInfo>> disruptor = new Disruptor(eventFactory,
ringBufferSize,
threadFactory);
// 指定事件处理器
disruptor.handleEventsWith(eventHandler);
// 启动 disruptor
disruptor.start();
// 尝试异步发布事件
for (int i = 0; i < 2; i++) {
int finalI = i;
// 发布事件
disruptor.publishEvent((element, sequence) -> {
element.setData(NotifyInfo.builder().msg(finalI).build());
} );
}
// 因为 disruptor 线程设置为 守护线程,所以需要在主线程等待他们执行完成
Thread.sleep(10000);
}
}
运行结果如下:即使是异步发布事件的,但却是按顺序触发监听器的
源码分析
1、首先查看构造方法
2、我们传进来的 threadFactory 会被 BasicExecutor 封装起来,成为 Disruptor 的 executor 变量的值,用于创建线程
3、接着查看 disruptor.handleEventsWith() 方法,将我们的事件监听器保存起来

4、在这个过程中将 eventHandler 封装成 batchEventProcessor,并添加到 Disruptor 的 consumerRepository 属性中
5、而 ConsumerRepository 继承了 Iterable 迭代器,内容为 consumerInfos
6、好了,到了这里就算是把监听器添加进去了,然后就是查看启动方法
7、这里将遍历上方的 consumerRepository,所以这里调用的 consumerInfo.start() 方法,其实就是调用了 EventProcessorInfo 类的 start() 方法
而 executor 属性则是我们一开始传进来的 threadFactory 包装类

8、所以实际的调用方法就是,把 batchEventProcessor 当作 runnable 传给 threadFactory 创建线程并启动
9、所以 BatchEventProcessor 也实现了 Runnable 接口,重写 run() 方法
10、查看 BatchEventProcessor 的 run() 方法
@Override
public void run()
{
// 使用 cas
if (running.compareAndSet(IDLE, RUNNING))
{
sequenceBarrier.clearAlert();
// 启动前通知
notifyStart();
try
{
if (running.get() == RUNNING)
{
// 启动事件监听方法
processEvents();
}
}
finally
{
notifyShutdown();
running.set(IDLE);
}
}
else
{
// This is a little bit of guess work. The running state could of changed to HALTED by
// this point. However, Java does not have compareAndExchange which is the only way
// to get it exactly correct.
if (running.get() == RUNNING)
{
throw new IllegalStateException("Thread is already running");
}
else
{
earlyExit();
}
}
}
11、查看 notifyStart() 方法,如果我们的 eventHandler 实现了 LifecycleAware 接口,则会在启动前调用 onStart() 方法
Disruptor 就使用一个线程(而不是线程池)一直在循环判断是否有事件发布
当接收到事件时,则依次调用监听器的 onEvent() 方法,发送对应的事件
所以如果该线程不是守护线程的话,即使主线程结束,该监听线程也会一直运行
private void processEvents() { T event = null; long nextSequence = sequence.get() + 1L; while (true) { try { // 循环判断是否有事件发布 final long availableSequence = sequenceBarrier.waitFor(nextSequence); if (batchStartAware != null) { batchStartAware.onBatchStart(availableSequence - nextSequence + 1); } // 接收到事件,依次调用 eventHandler.onEvent() 方法 while (nextSequence <= availableSequence) { event = dataProvider.get(nextSequence); eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); nextSequence++; } sequence.set(availableSequence); } catch (final TimeoutException e) { notifyTimeout(sequence.get()); } catch (final AlertException ex) { if (running.get() != RUNNING) { break; } } catch (final Throwable ex) { handleEventException(ex, nextSequence, event); sequence.set(nextSequence); nextSequence++; } } }13、再回过头来看 start() 方法,Disruptor 对一个监听器会单独创建一个线程,并且创建完成之后这个线程便会一直循环监听事件,并调用该线程对应的 onEvent() 方法

不过我感觉对于同一个事件配置多个监听器并没有多大作用,配置多个的话,就是重复触发啦,除非每个监听器做不同的事情~
所以总的来说,“Disruptor 发布事件” 和 “事件监听触发” 是分来线程来的,事件在主线程发布,而监听和触发是又一个单独的线程处理的
- 本文链接:https://lxjblog.gitee.io/2024/06/06/Disruptor/
- 版权声明:本博客所有文章除特别声明外,均默认采用 许可协议。