概念

参考文章:
并发框架——Distruptor_disruptor-CSDN博客
Disruptor 是一种高效的”生产者-消费者”模型,性能远远高于传统的 BlockingQueue 容器。BlockingQueue是基于锁实现的,而锁的效率通常较低。没有使用CAS机制实现的生产者-消费者
Disruptor使用观察者模式,主动将消息发送给消费者,而不是等消费者从队列中取;在无锁的情况下, 实现queue(环形, RingBuffer)的并发操作, 性能远高于BlockingQueue
其实就是一个事件发布监听的框架~

基本使用

1、导入依赖

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>

2、创建事件

@Data
public class NotifyEvent<T> { 

    private T data;

    /**
     * 清空数据
     */
    public void clear() { 
        this.data = null;
    } 

} 

3、创建事件工厂

public class NotifyEventFactory implements EventFactory<NotifyEvent> { 
    @Override
    public NotifyEvent newInstance() { 
        return new NotifyEvent();
    } 
} 

4、创建消息实体

@Data
@Builder
public class NotifyInfo { 
    private Integer msg;
} 

5、创建事件监听器(消费者)

@Slf4j
public class NotifyEventHandler implements EventHandler<NotifyEvent<NotifyInfo>> { 

    @Override
    public void onEvent(NotifyEvent<NotifyInfo> event, long sequence, boolean endOfBatch) throws InterruptedException { 
        System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + ":" + event.getData());
        Thread.sleep(3000);
    } 

} 

6、调用测试

public class Test1 { 

    public static void main(String[] args) throws InterruptedException { 
        // 初始化事件工厂
        EventFactory eventFactory = new NotifyEventFactory();
        // 指定环形队列大小,只能是 2 的次方
        int ringBufferSize = 128;
        // 创建线程工厂,用于给 Disruptor 创建线程,可以在这里指定线程配置
        ThreadFactory threadFactory = new ThreadFactory() { 
            @Override
            public Thread newThread(Runnable r) { 
                Thread thread = new Thread(r, "Notify");
                // 将此线程标记为守护进程线程或用户线程。当唯一运行的线程都是守护进程线程时,Java虚拟机将退出。不然这个线程会一直运行
                thread.setDaemon(true);
                return thread;
            } 
        } ;

        // 初始化事件监听器,可以在 spring 中注册为 bean
        NotifyEventHandler eventHandler = new NotifyEventHandler();
        // 创建 Disruptor
        Disruptor<NotifyEvent<NotifyInfo>> disruptor = new Disruptor(eventFactory,
                ringBufferSize,
                threadFactory);
        // 指定事件处理器
        disruptor.handleEventsWith(eventHandler);
        // 启动 disruptor
        disruptor.start();

        // 尝试异步发布事件
        for (int i = 0; i < 2; i++) { 
            int finalI = i;
            // 发布事件
            disruptor.publishEvent((element, sequence) -> { 
                element.setData(NotifyInfo.builder().msg(finalI).build());
            } );
        } 

        // 因为 disruptor 线程设置为 守护线程,所以需要在主线程等待他们执行完成
        Thread.sleep(10000);
    } 
} 

运行结果如下:即使是异步发布事件的,但却是按顺序触发监听器的
image.png

源码分析

1、首先查看构造方法
image.png
2、我们传进来的 threadFactory 会被 BasicExecutor 封装起来,成为 Disruptor 的 executor 变量的值,用于创建线程
image.png
3、接着查看 disruptor.handleEventsWith() 方法,将我们的事件监听器保存起来
image.png
image.png
4、在这个过程中将 eventHandler 封装成 batchEventProcessor,并添加到 Disruptor 的 consumerRepository 属性中
image.png
5、而 ConsumerRepository 继承了 Iterable 迭代器,内容为 consumerInfos
image.png
6、好了,到了这里就算是把监听器添加进去了,然后就是查看启动方法
image.png
7、这里将遍历上方的 consumerRepository,所以这里调用的 consumerInfo.start() 方法,其实就是调用了 EventProcessorInfo 类的 start() 方法
而 executor 属性则是我们一开始传进来的 threadFactory 包装类
image.png
image.png
8、所以实际的调用方法就是,把 batchEventProcessor 当作 runnable 传给 threadFactory 创建线程并启动
image.png
9、所以 BatchEventProcessor 也实现了 Runnable 接口,重写 run() 方法
image.png
10、查看 BatchEventProcessor 的 run() 方法

@Override
public void run()
{ 
    // 使用 cas
    if (running.compareAndSet(IDLE, RUNNING))
    { 
        sequenceBarrier.clearAlert();

        // 启动前通知
        notifyStart();
        try
        { 
            if (running.get() == RUNNING)
            { 
                // 启动事件监听方法
                processEvents();
            } 
        } 
        finally
        { 
            notifyShutdown();
            running.set(IDLE);
        } 
    } 
    else
    { 
        // This is a little bit of guess work.  The running state could of changed to HALTED by
        // this point.  However, Java does not have compareAndExchange which is the only way
        // to get it exactly correct.
        if (running.get() == RUNNING)
        { 
            throw new IllegalStateException("Thread is already running");
        } 
        else
        { 
            earlyExit();
        } 
    } 
} 

11、查看 notifyStart() 方法,如果我们的 eventHandler 实现了 LifecycleAware 接口,则会在启动前调用 onStart() 方法
image.png

  1. Disruptor 就使用一个线程(而不是线程池)一直在循环判断是否有事件发布

  2. 当接收到事件时,则依次调用监听器的 onEvent() 方法,发送对应的事件

  3. 所以如果该线程不是守护线程的话,即使主线程结束,该监听线程也会一直运行

    private void processEvents()
     { 
         T event = null;
         long nextSequence = sequence.get() + 1L;
    
         while (true)
         { 
             try
             { 
                 // 循环判断是否有事件发布
                 final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                 if (batchStartAware != null)
                 { 
                     batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                 } 
    
                 // 接收到事件,依次调用 eventHandler.onEvent() 方法
                 while (nextSequence <= availableSequence)
                 { 
                     event = dataProvider.get(nextSequence);
                     eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                     nextSequence++;
                 } 
    
                 sequence.set(availableSequence);
             } 
             catch (final TimeoutException e)
             { 
                 notifyTimeout(sequence.get());
             } 
             catch (final AlertException ex)
             { 
                 if (running.get() != RUNNING)
                 { 
                     break;
                 } 
             } 
             catch (final Throwable ex)
             { 
                 handleEventException(ex, nextSequence, event);
                 sequence.set(nextSequence);
                 nextSequence++;
             } 
         } 
     } 

    13、再回过头来看 start() 方法,Disruptor 对一个监听器会单独创建一个线程,并且创建完成之后这个线程便会一直循环监听事件,并调用该线程对应的 onEvent() 方法
    image.png
    不过我感觉对于同一个事件配置多个监听器并没有多大作用,配置多个的话,就是重复触发啦,除非每个监听器做不同的事情~
    所以总的来说,“Disruptor 发布事件” 和 “事件监听触发” 是分来线程来的,事件在主线程发布,而监听和触发是又一个单独的线程处理的