log4j2 异步日志 -- AsyncAppender

2020-10-17 13:24:05   最后更新: 2020-10-27 10:01:43   访问数量:48




在我们的工程项目中,日志记录是必不可少的,在 java 项目中,我们通常会使用 log4j、logback、log4j2 等等组件中的一个来实现日志的记录。

首先,日志记录需要进行磁盘 IO 操作,这无疑是非常耗时的,虽然很多组件提供了写入内存 buffer 区的功能,但这可能造成宕机前部分日志的丢失,另一方面,为了保证日志记录的原子性,这些组件在写入日志时通常需要加锁,在高并发场景下,很容易出现锁等待造成性能的严重下降,这常常是十分令人抓狂的一件事。

log4j2 之所以能够在众多日志组件中脱颖而出,其异步日志的实现,无疑是一个重要的特性。

本文,我们就来详细了解一下,log4j2 的异步日志是如何实现的。

 

我们知道,Appender 是 log4j2 的核心组件之一,他用来接收 LogEvent 并按照预先的配置打印到指定的位置。而 AsyncAppender 则是 log4j2 提供用来实现异步日志的收集和打印的。

下图就是官方提供的各个日志组件异步 Appender 的执行耗时:

 

 

可见 log4j2 的 AsyncAppender 优势是非常明显的。

 

2.1 配置参数

首先我们来看看 AsyncAppender 如何配置和使用,可以参看官方文档:

https://logging.apache.org/log4j/2.x/manual/appenders.html#AsyncAppender

 

AsyncAppender 拥有以下配置参数:

参数名 参数类型 描述
AppenderRef String 最终执行日志写入的 appender,可配置多个
blocking boolean 如果为 true 或默认值,当队列满时,阻塞等待,否则将 logEvent 放入 ErrorAppender
shutdownTimeout integer LogEvent 的最长等待时间(毫秒数),为 0 或默认值表示永远等待
bufferSize integer 队列长度,默认值为 1024
errorRef String ErrorAppender,如果不配置,则丢弃所有 ErrorLogEvent
filter Filter 用来过滤 LogEvent,只有符合条件的事件才会被处理
name String Appender 名称
ignoreExceptions boolean 需要被忽略的异常
includeLocation boolean 是否打印线程本地信息,默认为 false,如果为 true,会有一定的性能损耗
BlockingQueueFactory BlockingQueueFactory BlockingQueue 的生产工厂类

2.2 配置示例

下面是一个典型配置:

<Configuration name="LinkedTransferQueueExample"> <Appenders> <List name="List"/> <Async name="Async" bufferSize="262144"> <AppenderRef ref="List"/> <LinkedTransferQueue/> </Async> </Appenders> <Loggers> <Root> <AppenderRef ref="Async"/> </Root> </Loggers> </Configuration>

 

 

接下来我们就深入源码,看看 AsyncAppender 是如何实现的。

 

3.1 类关系图

 

 

从类关系图我们可以看到,AsyncAppender 继承自 AbstractAppender。

AbstractAppender 实现了 Appender 接口,并且提供了其中 LifeCycle 接口中生命周期治理的一系列方法的默认实现,同时 AbstractAppender 继承自 AbstractFilterable,提供了 Filterable 接口的默认实现。

 

3.2 append 方法的实现

public void append(LogEvent logEvent) { if (!this.isStarted()) { throw new IllegalStateException("AsyncAppender " + this.getName() + " is not active"); } else { // 构建 Log4jLogEvent 对象 Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, this.includeLocation); InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage()); // 将 event 放入队列 if (!this.transfer(memento)) { // transfer 返回 false 表示队列已满 // blocking 参数决定当队列满时是否放入 ErrorAppender if (this.blocking) { if (Logger.getRecursionDepth() > 1) { Message message = AsyncQueueFullMessageUtil.transform(logEvent.getMessage()); this.logMessageInCurrentThread((new org.apache.logging.log4j.core.impl.Log4jLogEvent.Builder(logEvent)).setMessage(message).build()); } else { EventRoute route = this.asyncQueueFullPolicy.getRoute(this.thread.getId(), memento.getLevel()); route.logMessage(this, memento); } } else { this.error("Appender " + this.getName() + " is unable to write primary appenders. queue is full"); this.logToErrorAppenderIfNecessary(false, memento); } } } }

 

 

结合上面介绍的配置参数来看,这些代码非常容易理解。

 

 

3.3 AsyncQueueFullPolicy

上面流程图中提到了队列 full 策略,他指的是在队列已满且 blocking 参数为 true 时,log4j2 要执行的操作,主要有以下两种:

 

3.3.1 DefaultAsyncQueueFullPolicy -- 等待队列,转为同步操作策略

public class DefaultAsyncQueueFullPolicy implements AsyncQueueFullPolicy { @Override public EventRoute getRoute(final long backgroundThreadId, final Level level) { // LOG4J2-471: prevent deadlock when RingBuffer is full and object // being logged calls Logger.log() from its toString() method if (Thread.currentThread().getId() == backgroundThreadId) { return EventRoute.SYNCHRONOUS; } return EventRoute.ENQUEUE; } }

 

 

3.3.2 DiscardingAsyncQueueFullPolicy -- 按照日志等级抛弃日志策略

public class DiscardingAsyncQueueFullPolicy implements AsyncQueueFullPolicy { @Override public EventRoute getRoute(final long backgroundThreadId, final Level level) { if (level.isLessSpecificThan(thresholdLevel)) { if (discardCount.getAndIncrement() == 0) { LOGGER.warn("Async queue is full, discarding event with level {}. " + "This message will only appear once; future events from {} " + "are silently discarded until queue capacity becomes available.", level, thresholdLevel); } return EventRoute.DISCARD; } return super.getRoute(backgroundThreadId, level); } }

 

 

通过上述的源码和讲解,我们已经窥知 log4j2 异步日志提升性能的一些端倪了。

没错,log4j2 是通过将 LogEvent 放入队列,异步消费来实现的。

这里提到的队列,就是我们在配置文件中配置的 BlockingQueueFactory 所生产的队列对象,Log4j2 支持生成以下四种队列:

  1. ArrayBlockingQueue -- 默认的队列,通过 java 原生的 ArrayBlockingQueue 实现。
  2. DisruptorBlockingQueue -- disruptor 包实现的高性能队列。
  3. JCToolsBlockingQueue -- JCTools 实现的无锁队列。
  4. LinkedTransferQueue -- 通过 java7 以上原生支持的 LinkedTransferQueue 实现。

 

在上述四中中,默认的是 ArrayBlockingQueue,最为推荐的是 disruptor 包实现的高性能队列 DisruptorBlockingQueue。

那么,究竟 DisruptorBlockingQueue 的高性能队列是如何实现的呢?后续我会写专题文章来进行讲解,敬请期待。

 

有了上面讲述的 AsyncAppender 的 append 方法作为队列的生产者,自然还有对了的消费者,他就是 AsyncThread,也包含在 AsyncAppender.java 中。

他的主要工作是消费队列中的 LogEvent,并将日志按照已配置的 AppenderRef 以指定的方式输出到指定的位置。

AsyncThread 的源码非常容易理解:

 

private class AsyncThread extends Log4jThread { private volatile boolean shutdown = false; private final List<AppenderControl> appenders; private final BlockingQueue<LogEvent> queue; public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) { super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement()); this.appenders = appenders; this.queue = queue; setDaemon(true); } @Override public void run() { while (!shutdown) { LogEvent event; try { event = queue.take(); if (event == SHUTDOWN_LOG_EVENT) { shutdown = true; continue; } } catch (final InterruptedException ex) { break; // LOG4J2-830 } event.setEndOfBatch(queue.isEmpty()); final boolean success = callAppenders(event); if (!success && errorAppender != null) { try { errorAppender.callAppender(event); } catch (final Exception ex) { // Silently accept the error. } } } // Process any remaining items in the queue. LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.", queue.size()); int count = 0; int ignored = 0; while (!queue.isEmpty()) { try { final LogEvent event = queue.take(); if (event instanceof Log4jLogEvent) { final Log4jLogEvent logEvent = (Log4jLogEvent) event; logEvent.setEndOfBatch(queue.isEmpty()); callAppenders(logEvent); count++; } else { ignored++; LOGGER.trace("Ignoring event of class {}", event.getClass().getName()); } } catch (final InterruptedException ex) { // May have been interrupted to shut down. // Here we ignore interrupts and try to process all remaining events. } } LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. " + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored); } ... }

 

 

6.1 记录峰值吞吐量

 

 

6.2 与其他日志记录包的异步吞吐量比较

 

 

欢迎关注微信公众号,以技术为主,涉及历史、人文等多领域的学习与感悟,每周三到七篇推文,只有全部原创,只有干货没有鸡汤

 

 

 

Spring 从入门到实战

 






log      日志      java      队列      异步      log4j2      生产者消费者     


京ICP备15018585号