log4j logstash日志收集

使用默认的log4j的SocketAppender,直接序列化LoggingEvent,所以格式很难看。

这里我自己实现了一个Appender:https://gitee.com/acgist/demo/blob/master/acgist-share/2019/LogstashAppender.java,代码如下:

package com.acgist.demo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Layout;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;

/**
 * <p>Logstash日志输出</p>
 * <p>使用TCP协议传输,支持自定义日志分隔符。</p>
 * <p>配置参考</p>
 * <pre>
# Appender
log4j.appender.logstash=com.acgist.demo.LogstashAppender
# Logstash端口
log4j.appender.logstash.Port=4567
# Logstash地址
log4j.appender.logstash.Host=192.168.1.240
# 断线重连时间(毫秒)
log4j.appender.logstash.Delay=10000
# Logstash日志分隔符
log4j.appender.logstash.Delimiter=
# 队列长度
log4j.appender.logstash.BufferSize=102400
# 日志格式
log4j.appender.logstash.layout=org.apache.log4j.PatternLayout
log4j.appender.logstash.layout.ConversionPattern=[acgist] %d %p [%c] - %m%n
 * </pre>
 * 
 * @author acgist
 */
public class LogstashAppender extends AppenderSkeleton {
	
	/**
	 * 最大重新添加次数
	 */
	private static final int MAX_RETRY_TIMES = 10;
	
	/**
	 * 是否关闭
	 */
	private volatile boolean close = false;

	/**
	 * 缓存大小,默认:1024
	 */
	private int bufferSize = 1024;
	/**
	 * 远程端口
	 */
	private int port;
	/**
	 * 远程地址
	 */
	private String host;
	/**
	 * 重连时间
	 */
	private long delay;
	/**
	 * 日志分隔符
	 */
	private String delimiter;
	
	/**
	 * 异步线程
	 */
	private Thread thread;
	/**
	 * Socket连接
	 */
	private SocketChannel channel;
	/**
	 * 日志缓存
	 */
	private BlockingQueue<String> buffer;
	
	public LogstashAppender() {
	}

	@Override
	protected void append(LoggingEvent event) {
		if(event != null) {
			final StringBuffer logBuilder = new StringBuffer(this.layout.format(event));
			if (this.layout.ignoresThrowable()) {
				final String[] ems = event.getThrowableStrRep();
				if (ems != null) {
					final int length = ems.length;
					for (int index = 0; index < length; index++) {
						logBuilder.append(ems[index]).append(Layout.LINE_SEP);
					}
				}
			}
			if(this.delimiter != null) {
				logBuilder.append(this.delimiter);
			}
			final String log = logBuilder.toString();
			boolean done = this.buffer.offer(log);
			int times = 0;
			while(!done) {
				Thread.yield();
				done = this.buffer.offer(log);
				if(++times > MAX_RETRY_TIMES) {
					LogLog.error("超过最大重试失败次数,日志记录失败:" + log + ",重试次数:" + times);
					break;
				}
			}
		}
	}

	@Override
	public void close() {
		this.close = true;
		this.releaseThread();
		this.releaseChannel();
	}

	@Override
	public boolean requiresLayout() {
		return true;
	}

	@Override
	public void activateOptions() {
		this.buffer = new LinkedBlockingQueue<String>(this.bufferSize);
		this.buildChannel();
		this.buildThread();
	}

	/**
	 * 创建客户端连接
	 */
	private void buildChannel() {
		try {
			this.channel = SocketChannel.open();
			this.channel.connect(new InetSocketAddress(this.host, this.port));
			this.channel.configureBlocking(false);
//			this.channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
//			this.channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
			this.channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
		} catch (IOException e) {
			LogLog.error("Logstash-Socket远程服务器连接异常", e);
		}
	}

	/**
	 * 释放客户端连接
	 */
	private void releaseChannel() {
		if(this.channel != null) {
			try {
				this.channel.close();
			} catch (Exception e) {
				LogLog.error("Logstash-Socket关闭连接异常", e);
			}
		}
	}
	
	/**
	 * 创建线程
	 */
	private void buildThread() {
		this.thread = new Thread(new Runnable() {
			@Override
			public void run() {
				String log = null;
				while(!close) {
					try {
						if(log == null) {
							log = buffer.take();
						}
						if(log != null) {
							channel.write(ByteBuffer.wrap(log.getBytes()));
							log = null;
						}
					} catch (Exception e) {
						LogLog.error("Logstash-日志发送异常", e);
						try {
							Thread.sleep(delay);
						} catch (Exception ex) {
							LogLog.error("Logstash-休眠异常", e);
						}
						LogLog.warn("Logstash-失败重连");
						if(channel == null || !channel.isOpen() || !channel.isConnected()) {
							releaseChannel(); // 释放
							buildChannel(); // 重连
						}
					}
				}
			}
		});
		this.thread.setDaemon(true);
		this.thread.setName("Logstash-日志发送线程");
		this.thread.start();
	}
	
	/**
	 * 释放线程
	 */
	private void releaseThread() {
	}
	
	public int getBufferSize() {
		return bufferSize;
	}

	public void setBufferSize(int bufferSize) {
		this.bufferSize = bufferSize;
	}

	public String getHost() {
		return host;
	}

	public void setHost(String host) {
		this.host = host;
	}

	public int getPort() {
		return port;
	}

	public void setPort(int port) {
		this.port = port;
	}

	public long getDelay() {
		return delay;
	}

	public void setDelay(long delay) {
		this.delay = delay;
	}

	public String getDelimiter() {
		return delimiter;
	}

	public void setDelimiter(String delimiter) {
		this.delimiter = delimiter;
	}

}

配置:

############Logstash############
log4j.appender.logstash=com.acgist.demo.LogstashAppender
log4j.appender.logstash.Port=4567
log4j.appender.logstash.Host=127.0.0.1
log4j.appender.logstash.Delay=10000
log4j.appender.logstash.BufferSize=102400
log4j.appender.logstash.layout=org.apache.log4j.PatternLayout
log4j.appender.logstash.layout.ConversionPattern=[acgist] %d %p [%c] - %m%n

logstash.conf:

input {
	tcp {
		port => 4567
#		type => "tomcat"
# 日志粘包
#		codec => cef { delimiter => "\n" }
# 多行日志,单个日志必须以 [ 开始。
		codec => multiline {
			pattern => "^\["
# 否定,不匹配pattern的行。
			negate => true
			what => "previous"
		}
	}
}

filter {
	grok {
# 匹配并且添加属性
		match => { "message" => "\[(?<project>.+?)\] (?<timestamp>\d{4}\-\d{2}\-\d{2} \d{2}:\d{2}:\d{2}\,\d{3}) (?<level>.{4,5}) \[(?<clazz>.+?)\] \- (?<content>.+)" }
# 按照定义次序依次尝试匹配,直到匹配成功为止。匹配不成功会添加tags属性=_grokparsefailure。
#		match => [
# Tomcat日志:[项目名称] yyyy-MM-dd HH:mm:ss.SSS 日志级别 JAVA类 - 日志信息
#			"message", "\[(?<project>.+?)\] (?<timestamp>\d{4}\-\d{2}\-\d{2} \d{2}:\d{2}:\d{2}\,\d{3}) (?<level>.{4,5}) \[(?<clazz>.+?)\] \- (?<content>.+)",
#			"message", "regex"
#		]
# 重写属性
#		overwrite => [ "message" ]
# 删除属性
#		remove_field => [ "tags" ]
	}
# 不修改@timestamp属性,避免乱序。
#	date {
#		match => [ "timestamp", "YYYY-MM-dd HH:mm:ss.SSS", "ISO8601" ]
#		locale => "zh"
#		target => [ "@timestamp" ]
#		timezone => "Asia/Shanghai"
#	}
}

output {
	if [project] {
		elasticsearch {
			hosts => "localhost:9200"
			index => "elk-%{[project]}-%{+YYYY.MM.dd}"
			user => logstash_writer
			password => ayanami
#			ssl => true
#			cacert => "/opt/elk/cert/ca.crt"
		}
	} else {
		elasticsearch {
			hosts => "localhost:9200"
			index => "elk-unknown-%{+YYYY.MM.dd}"
			user => logstash_writer
			password => ayanami
#			ssl => true
#			cacert => "/opt/elk/cert/ca.crt"
		}
	}
}

这里有一个问题就是旧版本的slf4j,下面的写法,异常日志是不会输出的:

LOGGER.error("异常:{}", "测试", e);

建议更新比较新的版本。

logstash还有一个BUG,就是TCP日志缓冲32KB,如果日志量很大时,就会导致缓冲区占满,这个时候默认会插入一个\n。详细参考:
https://github.com/logstash-plugins/logstash-input-stdin/issues/8
https://github.com/logstash-plugins/logstash-input-tcp/issues/31

后来我使用UDP,UDP每一个包就是一个日志记录,可以解决换行问题,但是出现了另外一个问题:数据包太长就会出现异常,日志必须拆成小包。

我先改成JSON使用TCP再试试。

换为JSON+TCP后基本上上面的问题都解决了,下面是代码和配置:

package com.acgist.demo;

import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Layout;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;

/**
 * <p>Logstash日志输出</p>
 * <p>配置参考:</p>
 * <pre>
# Appender
log4j.appender.logstash=com.acgist.demo.LogstashAppender
# Logstash端口
log4j.appender.logstash.Port=4567
# Logstash地址
log4j.appender.logstash.Host=192.168.1.240
# 断线重连时间(毫秒)
log4j.appender.logstash.Delay=10000
# 格式化:json(默认)、plain(文本)
log4j.appender.logstash.Format=json
# 项目名称
log4j.appender.logstash.Project=acgist
# 使用协议:tcp(默认)、udp
log4j.appender.logstash.Protocol=tcp
# 队列长度
log4j.appender.logstash.BufferSize=102400
# 日志格式
log4j.appender.logstash.layout=org.apache.log4j.PatternLayout
log4j.appender.logstash.layout.ConversionPattern=[acgist] %d %p [%c] - %m%n
 * </pre>
 * 
 * @author acgist
 */
public class LogstashAppender extends AppenderSkeleton {
	
	private static final ThreadLocal<DateFormat> FORMATER = new ThreadLocal<DateFormat>() {
		@Override
		protected DateFormat initialValue() {
			return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
		}
	};
	
	/**
	 * UDP协议
	 */
	public static final String PROTOCOL_UDP = "udp";
	/**
	 * TCP协议
	 */
	public static final String PROTOCOL_TCP = "tcp";
	/**
	 * JSON
	 */
	public static final String FORMAT_JSON = "json";
	/**
	 * 简单文本
	 */
	public static final String FORMAT_PLAIN = "plain";
	/**
	 * 最大重新添加次数
	 */
	private static final int MAX_RETRY_TIMES = 10;
	/**
	 * UDP最大包长度
	 */
	private static final int MAX_UDP_PACKET_SIZE = 1024;
	
	/**
	 * 远程端口
	 */
	private int port;
	/**
	 * 远程地址
	 */
	private String host;
	/**
	 * 重连时间
	 */
	private long delay;
	/**
	 * 日志格式
	 */
	private String format;
	/**
	 * 项目名称
	 */
	private String project;
	/**
	 * 传输协议
	 */
	private String protocol;
	/**
	 * 缓存大小,默认:1024
	 */
	private int bufferSize = 1024;
	
	/**
	 * 使用TCP
	 */
	private boolean tcp;
	/**
	 * 使用JSON
	 */
	private boolean json;
	/**
	 * 异步线程
	 */
	private Thread thread;
	/**
	 * Socket连接
	 */
	private WritableByteChannel channel;
	/**
	 * 日志缓存
	 */
	private BlockingQueue<String> buffer;
	/**
	 * 是否关闭
	 */
	private volatile boolean close = false;
	
	public LogstashAppender() {
	}

	@Override
	protected void append(LoggingEvent event) {
		if(event != null) {
			final String message = buildMessage(event);
			messageBlock(event, message);
		}
	}

	@Override
	public void close() {
		this.close = true;
		this.releaseThread();
		this.releaseChannel();
	}

	@Override
	public boolean requiresLayout() {
		return true;
	}

	@Override
	public void activateOptions() {
		this.buffer = new LinkedBlockingQueue<String>(this.bufferSize);
		this.tcp = !PROTOCOL_UDP.equalsIgnoreCase(this.protocol);
		this.json = !FORMAT_PLAIN.equalsIgnoreCase(this.format);
		this.buildChannel();
		this.buildThread();
	}

	/**
	 * 创建客户端连接
	 */
	private void buildChannel() {
		try {
			if(this.tcp) {
				final SocketChannel channel = SocketChannel.open();
				channel.connect(new InetSocketAddress(this.host, this.port));
				channel.configureBlocking(false);
//				channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
//				channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
				channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
				this.channel = channel;
			} else {
				final DatagramChannel channel = DatagramChannel.open();
				channel.configureBlocking(false); // 不阻塞
				channel.connect(new InetSocketAddress(this.host, this.port)); // 连接后使用:read、write
				this.channel = channel;
			}
		} catch (Exception e) {
			LogLog.error("Logstash-Socket远程服务器连接异常", e);
		}
	}

	/**
	 * 释放客户端连接
	 */
	private void releaseChannel() {
		if(this.channel != null) {
			try {
				this.channel.close();
			} catch (Exception e) {
				LogLog.error("Logstash-Socket关闭连接异常", e);
			}
		}
	}
	
	/**
	 * 创建线程:TODO:UDP丢包
	 */
	private void buildThread() {
		this.thread = new Thread(new Runnable() {
			@Override
			public void run() {
				String log = null;
				long index = 0;
				while(!close) {
					try {
						if(log == null) {
							log = buffer.take();
						}
						if(log != null) {
							channel.write(ByteBuffer.wrap(log.getBytes()));
							log = null;
						}
					} catch (Exception e) {
						LogLog.error("Logstash-日志发送异常:" + log, e);
						if(channel == null || !channel.isOpen()) {
							try {
								Thread.sleep(delay);
							} catch (Exception ex) {
								LogLog.error("Logstash-休眠异常", e);
							}
							LogLog.warn("Logstash-失败重连");
							releaseChannel(); // 释放
							buildChannel(); // 重连
						}
					}
				}
			}
		});
		this.thread.setDaemon(true);
		this.thread.setName("Logstash-日志发送线程");
		this.thread.start();
	}
	
	/**
	 * 释放线程
	 */
	private void releaseThread() {
	}
	
	/**
	 * 日志信息拆分
	 */
	private void messageBlock(LoggingEvent event, final String message) {
		if(this.tcp) {
			offerMessage(event, message);
		} else {
			int length = message.length();
			if(length <= MAX_UDP_PACKET_SIZE) {
				offerMessage(event, message);
			} else {
				int index = 0;
				long track = System.nanoTime();
				while(index < length) {
					if(length < index + MAX_UDP_PACKET_SIZE) {
						offerMessage(event, message.substring(index), track);
					} else {
						offerMessage(event, message.substring(index, index + MAX_UDP_PACKET_SIZE), track);
					}
					index += MAX_UDP_PACKET_SIZE;
				}
			}
		}
	}
	
	/**
	 * 创建日志
	 */
	private String buildMessage(LoggingEvent event) {
		final StringBuilder logBuilder = new StringBuilder(this.layout.format(event));
		if (this.layout.ignoresThrowable()) {
			final String[] ems = event.getThrowableStrRep();
			if (ems != null) {
				final int length = ems.length;
				for (int index = 0; index < length; index++) {
					logBuilder.append(ems[index]).append(Layout.LINE_SEP);
				}
			}
		}
		return logBuilder.toString();
	}
	
	/**
	 * 缓存日志
	 */
	private void offerMessage(LoggingEvent event, String message) {
		offerMessage(event, message, 0L);
	}
	
	/**
	 * 缓存日志
	 */
	private void offerMessage(LoggingEvent event, String message, long track) {
		String log;
		if(this.json) {
			log = buildJSONLog(event, message, track);
		} else {
			log = buildPlainLog(event, message, track);
		}
		int times = 0;
		boolean done = this.buffer.offer(log);
		while(!done) {
			Thread.yield();
			done = this.buffer.offer(log);
			if(++times > MAX_RETRY_TIMES) {
				LogLog.error("超过最大重试失败次数,日志记录失败:" + log + ",重试次数:" + times);
				break;
			}
		}
	}
	
	/**
	 * 创建JSON日志
	 */
	private String buildJSONLog(LoggingEvent event, String message, long track) {
		final Map<String, String> map = new HashMap<>();
		map.put("level", event.getLevel().toString()); // 级别
		map.put("project", this.project); // 项目
		map.put("clazz", event.getLoggerName()); // 类名
		map.put("timestamp", FORMATER.get().format(new Date(event.getTimeStamp()))); // 时间
		map.put("message", message); // 信息
		if(track > 0) {
			map.put("track", String.valueOf(track)); // 跟踪
		}
		return obj2json(map) + Layout.LINE_SEP;
	}

	/**
	 * 创建文本日志
	 */
	private String buildPlainLog(LoggingEvent event, String message, long track) {
		if(track > 0) {
			return message + " - " + track;
		} else {
			return message;
		}
	}
	
	/**
	 * JSON序列化
	 */
	public static final String obj2json(Object object) {
		if(object == null) {
			return null;
		}
		final ObjectMapper mapper = buildMapper();
		try {
			return mapper.writeValueAsString(object);
		} catch (Exception e) {
			LogLog.error("JAVA对象转JSON异常,对象:" + object, e);
		}
		return null;
	}
	

	/**
	 * 不序列化null,未知属性不反序列化。
	 */
	public static final ObjectMapper buildMapper() {
		final ObjectMapper mapper = new ObjectMapper();
//		mapper.enableDefaultTyping(); // 漏洞:CVE-2019-12384
		mapper.setSerializationInclusion(Inclusion.NON_NULL); // 不序列化 null 值,使用注解:@JsonInclude(Include.NON_NULL)
		mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); // 未知属性不反序列化,使用注解:@JsonIgnoreProperties(ignoreUnknown = true)
		return mapper;
	}

	public int getPort() {
		return port;
	}

	public void setPort(int port) {
		this.port = port;
	}

	public String getHost() {
		return host;
	}

	public void setHost(String host) {
		this.host = host;
	}

	public long getDelay() {
		return delay;
	}

	public void setDelay(long delay) {
		this.delay = delay;
	}

	public String getFormat() {
		return format;
	}

	public void setFormat(String format) {
		this.format = format;
	}

	public String getProject() {
		return project;
	}

	public void setProject(String project) {
		this.project = project;
	}

	public String getProtocol() {
		return protocol;
	}

	public void setProtocol(String protocol) {
		this.protocol = protocol;
	}
	
	public int getBufferSize() {
		return bufferSize;
	}
	
	public void setBufferSize(int bufferSize) {
		this.bufferSize = bufferSize;
	}

}
input {
	tcp {
		port => 4567
#		type => "tomcat"
# 日志粘包
#		codec => cef { delimiter => "\n" }
# 多行日志,单个日志必须以 [ 开始。
#		codec => multiline {
#			pattern => "^\["
# 否定,不匹配pattern的行。
#			negate => true
#			what => "previous"
#			auto_flush_interval => 5
#		}
#		codec => json
		codec => json_lines
	}
}

filter {
#	grok {
# 匹配并且添加属性
#		match => { "message" => "\[(?<project>.+?)\] (?<timestamp>\d{4}\-\d{2}\-\d{2} \d{2}:\d{2}:\d{2}\,\d{3}) (?<level>.{4,5}) \[(?<clazz>.+?)\] \- (?<content>.+)" }
# 按照定义次序依次尝试匹配,直到匹配成功为止。匹配不成功会添加tags属性=_grokparsefailure。
#		match => [
# Tomcat日志:[项目名称] yyyy-MM-dd HH:mm:ss.SSS 日志级别 JAVA类 - 日志信息
#			"message", "\[(?<project>.+?)\] (?<timestamp>\d{4}\-\d{2}\-\d{2} \d{2}:\d{2}:\d{2}\,\d{3}) (?<level>.{4,5}) \[(?<clazz>.+?)\] \- (?<content>.+)",
#			"message", "regex"
#		]
# 重写属性
#		overwrite => [ "message" ]
# 删除属性
#		remove_field => [ "tags" ]
#	}
	
# 不修改@timestamp属性,避免乱序。
	date {
		match => [ "timestamp", "YYYY-MM-dd HH:mm:ss.SSS", "ISO8601" ]
		locale => "zh"
#		target => [ "@timestamp" ]
		timezone => "Asia/Shanghai"
	}

}

output {
	if [project] {
		elasticsearch {
			hosts => "localhost:9200"
			index => "elk-%{[project]}-%{+YYYY.MM.dd}"
			user => logstash_writer
			password => ayanami
#			ssl => true
#			cacert => "/opt/elk/cert/ca.crt"
		}
	} else {
		elasticsearch {
			hosts => "localhost:9200"
			index => "elk-unknown-%{+YYYY.MM.dd}"
			user => logstash_writer
			password => ayanami
#			ssl => true
#			cacert => "/opt/elk/cert/ca.crt"
		}
	}
}