log4j2 异步日志(二)-- 高性能队列 Disruptor 的实现

2020-11-08 18:46:54   最后更新: 2020-11-09 18:54:28   访问数量:57




上一篇文章中,我们介绍了 Log4j2 异步日志的实现 -- AsyncAppender:

log4j2 异步日志(一) -- AsyncAppender

 

在文章中提到,log4j2 的异步日志是通过队列来处理的,关于队列,Log4j2 支持生成以下四种队列:

  1. ArrayBlockingQueue -- 默认的队列,通过 java 原生的 ArrayBlockingQueue 实现。
  2. DisruptorBlockingQueue -- disruptor 包实现的高性能队列。
  3. JCToolsBlockingQueue -- JCTools 实现的无锁队列。
  4. LinkedTransferQueue -- 通过 java7 以上原生支持的 LinkedTransferQueue 实现。

 

在上述四中中,默认的是 ArrayBlockingQueue,最为推荐的是 disruptor 包实现的高性能队列 DisruptorBlockingQueue,他是英国外汇交易公司 LMAX 开源的、用于替代并发线程间数据交换的环形队列的、基本无锁的开源线程间通信框架:

https://github.com/LMAX-Exchange/disruptor

 

那么,究竟 DisruptorBlockingQueue 的高性能队列是如何实现的呢?本文我们就来一探究竟。

 

还记得我们在 mysql 系列文章中关于 redolog 的讲解吗,redolog 是通过一个环形的存储区域实现其循环写入的:

 

 

在 linux 内核中,进程间通信所使用的 fifo 也是通过环形存储区域来实现的。

显而易见,环形存储区域有很多好处:

  1. 基于数组实现,内存被循环使用,减少了内存分配、回收扩容等操作。
  2. 对于只有单个读取和写入进程的场景下,读取写入分别在环的不同位置进行,因此,读写过程无需加锁,从而能够让缓存的读写更为高效。

 

disruptor 正是借鉴这一思想,使用环形队列实现缓冲,从而提升缓冲的性能。

 

3.1 RingBuffer

上面我们已经介绍了 RingBuffer 的优势,由于 RingBuffer 实现了空间的循环利用,一次开辟,即可一直驻留在内存中,降低了 GC 的压力。

 

3.2 无锁

此前我们介绍过 synchronized 的实现原理:

synchronized 的使用及实现原理

 

我们知道,锁机制由于需要对底层硬件的访问进行限制,操作系统必须在用户态与内核态之间反复切换,从而导致这一过程的性能非常低下,因此在高并发环境下,最好是避免锁的使用。

disruptor 正是通过 RingBuffer 等多种机制避免了对锁的过度依赖。

disruptor 提供了单生产者、多生产者、单消费者、多消费者组等多种模型供不同的场景中可以灵活使用,在这些模式下,disruptor 尽量通过 Unsafe 包中的 CAS 操作结合自旋的方式避免了锁的使用,从而让整个实现十分简洁而高效。

 

4.1 生产者

4.1.1 单生产者

相对于多生产者模型而言,单生产者模型显然更为简单,我们来看看他是怎么实现的:

 

// Disruptor public <A> void publishEvent(final EventTranslatorOneArg<T, A> eventTranslator, final A arg) { ringBuffer.publishEvent(eventTranslator, arg); } // RingBuffer public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0) { long sequence = this.sequencer.next(); this.translateAndPublish(translator, sequence, arg0); } public long next() { return this.next(1); } public long next(int n) { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } else { // 获取上次数据写入位置 long nextValue = this.pad.nextValue; // 获取本次数据写入位置 long nextSequence = nextValue + (long)n; // 计算成环点 long wrapPoint = nextSequence - (long)this.bufferSize; // 消费者下次消费位置 long cachedGatingSequence = this.pad.cachedValue; // 缓存位置不足,自旋等待 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { long minSequence; while(wrapPoint > (minSequence = Util.getMinimumSequence(this.gatingSequences, nextValue))) { Thread.yield(); } // 被唤醒说明有消费者消费,更新消费位置 this.pad.cachedValue = minSequence; } // 获取数据插入位置并返回 this.pad.nextValue = nextSequence; return nextSequence; } }

 

 

可以看到,整个获取写入位置的代码并不复杂,即在 RingBuffer 中获取写入位置,如果 RingBuffer 空间不足,则调用 yield 等待 consumer 唤醒,一旦位置充足,则返回写入位置,之后,调用 translateAndPublish 方法发布数据。

 

4.1.2 多生产者模型

多生产者模型下,disruptor 通过对不同生产者进行隔离实现了生产过程的无冲突,也就是说,每个生产者只能对 RingBuffer 上分配给自己的独立空间进行写入,但这样一来,就引入了一个新的问题,由于 RingBuffer 不再是连续的,consumer 怎么知道到哪里去获取数据呢?解决方法也很简单,disruptor 引入了一个额外的缓冲区 availableBuffer,他的长度与 RingBuffer 长度相同,因此,他的槽位与 RingBuffer 的槽位一一对应,一旦有数据写入就在 availableBuffer 的对应位置置1,消费后则置 0,从而让读取的时候明确获知下一位置。

availableBuffer 在使用中,虽然被多个生产者划分为多个区域,实际上,每个生产者在操作自己所持有的 availableBuffer 片段时,也是将这个片段作为一个 RingBuffer 来使用,这样巧妙地转化,便让多生产模型完全可以复用单生产者模型中的实现,因此在多生产者模型下,写入流程与单生产者模型并无太大区别,仅在 next 方法与 publish 方法的实现上有所区别:

public long next(int n) { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } else { long current; long next; do { while(true) { // 获取下一写入位置 current = this.cursor.get(); next = current + (long)n; // 计算持有的 availableBuffer 片段的城环段 long wrapPoint = next - (long)this.bufferSize; // 计算下一消费位置 long cachedGatingSequence = this.gatingSequenceCache.get(); if (wrapPoint <= cachedGatingSequence && cachedGatingSequence <= current) { break; } // 自旋等待 long gatingSequence = Util.getMinimumSequence(this.gatingSequences, current); if (wrapPoint > gatingSequence) { LockSupport.parkNanos(1L); } else { this.gatingSequenceCache.set(gatingSequence); } } } while(!this.cursor.compareAndSet(current, next)); return next; } }

 

 

可以看到,通过自旋与缓存切片的方式,多生产者模型下,成功避免了锁的使用,实现了高效的生产操作。

 

4.2 消费者

4.2.1 消费者等待策略

为了应对不同的使用场景,disruptor 的消费者实现了多套等待策略:

策略 实现方式 适用场景
BlockingWaitStrategy 加锁 适用于 CPU 资源紧缺,吞吐量和延迟并不重要的场景
BusySpinWaitStrategy 自旋 适用于线程绑定到固定的CPU的场景
PhasedBackoffWaitStrategy 自旋 + yield + 自定义策略 适用于 CPU 资源紧缺,吞吐量和延迟并不重要的场景
SleepingWaitStrategy 自旋 + yield + sleep 性能和 CPU 资源之间有很好的折中,延迟不均匀
TimeoutBlockingWaitStrategy 加锁,有超时限制 适用于 CPU 资源紧缺,吞吐量和延迟并不重要的场景
YieldingWaitStrategy 自旋 + yield + 自旋 性能和 CPU 资源之间有很好的折中,延迟比较均匀

 

4.2.2 消费者的实现方式

下面是 disruptor 的类图:

 

 

如图所示,EventProcessor 是整个消费者事件处理框架,EventProcessor 接口继承了 Runnable 接口,主要有两种实现:

  1. 单线程批量处理 BatchEventProcessor
  2. 多线程处理 WorkProcessor

 

那么,这两种实现方式有什么区别,我们又该如何使用呢?

在 EventHandlerGroup 类中,实现了两个方法:

public EventHandlerGroup<T> handleEventsWith(EventProcessorFactory<T>... eventProcessorFactories) { return this.disruptor.createEventProcessors(this.sequences, eventProcessorFactories); } public EventHandlerGroup<T> handleEventsWithWorkerPool(WorkHandler<? super T>... handlers) { return this.disruptor.createWorkerPool(this.sequences, handlers); }

 

 

他们作为入口,区分了上述两套实现:

  1. 广播模式 -- 使用 handleEventsWith 方法传入多个 EventHandler,内部使用多个 BatchEventProcessor 关联多个线程执行,是典型的发布订阅模式,同一事件会被多个消费者并行消费,适用于同一事件触发多种操作。每个 BatchEventProcessor 是单线程的任务链,任务执行有序且非常快。
  2. 集群消费模式 -- 使用 handleEventsWithWorkerPool 方法传入多个WorkHandler时,内部使用多个 WorkProcessor 关联多个线程执行,类似于 JMS 的点对点模式,同一事件会被一组消费者其中之一消费,适用于提升消费者并行处理能力,每个 WorkProcessor 内部实现是多线程的,无法保证任务执行的顺序

由于代码整体非常易读,但整体流程比较长,较为繁琐,这里不展开了,有兴趣可以从上述两个方法为入口深入进去一探究竟。

 

下图是官网提供的 log4j2 的性能压测结果:

 

 

图中 Loggers all async 就是使用 Disruptor 后的日志处理统计,而 Async Appender 则是使用 ArrayBlockingQueue 作为队列。

可见,在 64 个线程的时候,loggers all async 的吞吐量比 Async Appender 增加了 12 倍,是 Sync 模式的 68 倍,性能提升实在可以说是效果显著!

 

欢迎关注微信公众号,以技术为主,涉及历史、人文等多领域的学习与感悟,每周三到七篇推文,全部原创,只有干货没有鸡汤

 

 

 






log      日志      java      队列      异步      log4j2      disruptor     


京ICP备15018585号