看了很多ELK日子分析系统的实现,多数是写怎么搭建,至于怎么在不影响主项目的情况下、异步去记录日志没有过多说明,为此我以Logback为列,通过Logback底层原理,在不影响主代码的情况下,实现ELK日志分析系统。
PS:至于安装ELK,和ELK的说明的,可以参考一下连接:
https://www.cnblogs.com/kevingrace/p/5919021.html
https://www.cnblogs.com/yuhuLin/p/7018858.html
https://my.oschina.net/itblog/blog/547250
Logback:
Logback是由log4j创始人设计的另一个开源日志组件,官方网站: http://logback.qos.ch。
看看(最)简单的Logback配置logback.xml:
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoder 默认配置为PatternLayoutEncoder -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>
我们配置一个ConsoleAppender就可以在控制台上打印出日志。
实现思路:
Logback(我用的是logback-classic-1.2.3版本)有个DBAppender,功能就是将日志插入数据库,这样的话我们可以仿照它,实现将日志插入到elasticsearch的功能。
我们看看DBAppender所需要加的配置:
<appender name="db-classic-mysql" class="ch.qos.logback.classic.db.DBAppender">
<connectionSource class="ch.qos.logback.core.db.DataSourceConnectionSource">
<dataSource class="com.mchange.v2.c3p0.ComboPooledDataSource">
<driverClass>com.mysql.jdbc.Driver</driverClass>
<jdbcUrl>jdbc:mysql://{$server ip}:3306/{$dbname}</jdbcUrl>
<user>{$user}</user>
<password>{$password}</password>
</dataSource>
</connectionSource>
</appender>
从配置上看就是把参数传到DBAppender里面,我们ES的Appender只需仿照它的做法就可以。
目标:找出DBAppender的sql在哪里commit
我们再看看DBAppender:
其初始化参数:
protected String insertPropertiesSQL;
protected String insertExceptionSQL;
protected String insertSQL;
protected static final Method GET_GENERATED_KEYS_METHOD;
private DBNameResolver dbNameResolver;
static final int TIMESTMP_INDEX = 1;
static final int FORMATTED_MESSAGE_INDEX = 2;
static final int LOGGER_NAME_INDEX = 3;
static final int LEVEL_STRING_INDEX = 4;
static final int THREAD_NAME_INDEX = 5;
static final int REFERENCE_FLAG_INDEX = 6;
static final int ARG0_INDEX = 7;
static final int ARG1_INDEX = 8;
static final int ARG2_INDEX = 9;
static final int ARG3_INDEX = 10;
static final int CALLER_FILENAME_INDEX = 11;
static final int CALLER_CLASS_INDEX = 12;
static final int CALLER_METHOD_INDEX = 13;
static final int CALLER_LINE_INDEX = 14;
static final int EVENT_ID_INDEX = 15;
static final StackTraceElement EMPTY_CALLER_DATA = CallerData.naInstance();
涉及sql的就只有
insertPropertiesSQL,insertExceptionSQL,insertSQL。而我们顺藤摸瓜,发现最后他们都做这样的操作:
super.start();
我们看看他的类继承关系图(idea自带这个插件,eclipse可以安装类似的插件):
看看DBAppender继承的DBAppenderBase:
protected abstract的就有getGeneratedKeysMethod();getInsertSQL();subAppend(E var1, Connection var2, PreparedStatement var3);secondarySubAppend(E var1, Connection var2, long var3);
就说我们仿照DBAppender继承DBAppenderBase就要实现上面4个方法;
看看DBAppenderBase继承的UnsynchronizedAppenderBase:
protected abstract的就只有有append(E var1);
再看回DBAppenderBase的append(E eventObject):
connection.commit();
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!!!!!!!!!!!!就是在这里!!!!!!!!!!!!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
这样的话我们仿照DBAppenderBase写一个ESAppender,继承UnsynchronizedAppenderBase,实现append方法,将日志插入ES里!
下面就是贴代码、贴图片时间:
项目结构图:
主类ESAppender:
package com.elk.log.appender;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.IThrowableProxy;
import ch.qos.logback.classic.spi.ThrowableProxyUtil;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import com.alibaba.fastjson.JSON;
import com.elk.log.utils.InitES;
import com.elk.log.vo.EsLogVo;
import com.elk.log.vo.Location;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestResult;
import io.searchbox.core.Index;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Properties;
public class ESAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
private static JestClient jestClient ;
//索引名称
String esIndex = "java-log-#date#";
//索引类型
String esType = "java-log";
//是否打印行号
boolean isLocationInfo = true;
//运行环境
String env = "";
//es地址
String esAddress = "";
public String getEsIndex() {
return esIndex;
}
public void setEsIndex(String esIndex) {
this.esIndex = esIndex;
}
public String getEsType() {
return esType;
}
public void setEsType(String esType) {
this.esType = esType;
}
public boolean isLocationInfo() {
return isLocationInfo;
}
public void setLocationInfo(boolean locationInfo) {
isLocationInfo = locationInfo;
}
public String getEnv() {
return env;
}
public void setEnv(String env) {
this.env = env;
}
public String getEsAddress() {
return esAddress;
}
public void setEsAddress(String esAddress) {
this.esAddress = esAddress;
}
@Override
protected void append(ILoggingEvent event) {
EsLogVo esLogVo = new EsLogVo();
esLogVo.setHost("HostName");
esLogVo.setIp("127.0.0.1");
esLogVo.setEnv(this.env);
esLogVo.setLevel(event.getLevel().toString());
Location location = new Location();
StackTraceElement[] callerDataArray = event.getCallerData();
if(callerDataArray != null && callerDataArray.length >0){
StackTraceElement immediateCallerData = callerDataArray[0];
location.setClassName(immediateCallerData.getClassName());
location.setMethod(immediateCallerData.getMethodName());
location.setFile(immediateCallerData.getFileName());
location.setLine(Integer.toString(immediateCallerData.getLineNumber()));
}
IThrowableProxy tp = event.getThrowableProxy();
if (tp != null){
String throwable = ThrowableProxyUtil.asString(tp);
esLogVo.setThrowable(throwable);
}
esLogVo.setLocation(location);
esLogVo.setLogger(event.getLoggerName());
esLogVo.setMessage(event.getFormattedMessage());
SimpleDateFormat df = new SimpleDateFormat("yyyy.MM.dd");
SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd");
esLogVo.setTimestamp(df2.format(new Date(event.getTimeStamp())));
esLogVo.setThread(event.getThreadName());
Map<String ,String > mdcPropertyMap = event.getMDCPropertyMap();
esLogVo.setTraceId(mdcPropertyMap.get("traceId"));
esLogVo.setRpcId(mdcPropertyMap.get("rpcId"));
String jsonString = JSON.toJSONString(esLogVo);
String esIndex_format = esIndex.replace("#date#",df.format(new Date(event.getTimeStamp())));
Index index = new Index.Builder(esLogVo).index(esIndex_format).type(esType).build();
try{
JestResult result = jestClient.execute(index);
System.out.println(result);
}catch (Exception e){
e.printStackTrace();
}
}
@Override
public void start() {
super.start();
Properties properties = new Properties();
properties.put("es.hosts",esAddress);
properties.put("es.username","zkpk");
properties.put("es.password","123456");
jestClient = InitES.jestClient(properties);
}
@Override
public void stop() {
super.stop();
jestClient.shutdownClient();
}
}
我们主要实现append方法,重写start(),stop()。
append主要是插入es索引,start和stop主要用来打开和关闭ES的链接。
工具类InitES(用于初始化es连接用的):
package com.elk.log.utils;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.client.JestClient;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Properties;
public class InitES {
private static io.searchbox.client.JestClient JestClient;
public static JestClient jestClient(Properties properties) {
JestClientFactory factory = new JestClientFactory();
String userName = properties.getProperty("es.username");
String password = properties.getProperty("es.password");
String esHosts = properties.getProperty("es.hosts");
List<String> serverList = new ArrayList <String>();
for (String address:esHosts.split(",")) {
serverList.add("http://"+address);
}
HttpClientConfig.Builder builder = new HttpClientConfig.Builder(serverList);
builder.maxTotalConnection(20);
builder.defaultMaxTotalConnectionPerRoute(5);
builder.defaultCredentials(userName,password);
builder.multiThreaded(true);
factory.setHttpClientConfig(builder.build());
if (JestClient == null) {
JestClient = factory.getObject();
}
return JestClient;
}
}
这里,我们用Jest方式连接ES。
两个Javabean:EsLogVo,Location:
public class EsLogVo {
private String host;
private String ip;
private String env;
private String message;
private String timestamp;
private String logger;
private String level;
private String thread;
private String throwable;
private Location location;
private String traceId;
private String rpcId;
...set and get...
}
public class Location {
private String className;
private String method;
private String file;
private String line;
...set and get...
}
测试类ESLogTest:
package com.elk.log;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class ESLogTest {
private static final Logger logger = LoggerFactory.getLogger(ESLogTest.class);
@Test
public void testLog() throws InterruptedException{
logger.info("我是正常信息 test message info");
logger.error("我一条异常的信息",new Exception("项目报错了,加班吧!!!!"));
logger.debug("debug消息 debug hello hi");
logger.warn("警告警告");
TimeUnit.SECONDS.sleep(10);
}
}
配置文件logback.xml:
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<contextName>logback-api</contextName>
<property name="logback.system" value="logback-system"/>
<property name="logback.path" value="../logs/logback-system"/>
<property name="logback.level" value="DEBUG"/>
<property name="logback.pattern" value="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5p - %m%n"/>
<property name="logback.env" value="dev"/>
<property name="logback.isLocation" value="true"/>
<property name="logback.esAddress" value="192.168.0.128:9200,192.168.0.129:9200,192.168.0.130:9200"/>
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>[%-5p %d{yy-MM-dd HH:mm:ss} %m] %caller{1}</pattern>
</encoder>
</appender>
<appender name="ES" class="com.elk.log.appender.ESAppender">
<!--索引名字 date为通配符 ,会自动替换为yyyy.MM.dd格式-->
<esIndex>java-log-#date#</esIndex>
<!--索引类型-->
<esType>${logback.system}</esType>
<!--运行环境-->
<env>${logback.env}</env>
<!--ES地址-->
<esAddress>${logback.esAddress}</esAddress>
</appender>
<appender name="ASYNC_ES" class="ch.qos.logback.classic.AsyncAppender">
<!--默认情况下,当BlockingQueue还有20%容量,他将丢弃TRACE,DEBUG和INFO级别的event,只保留warn和error-->
<discardingThreshold>0</discardingThreshold>
<!--BlockingQueue的最大容量,默认情况下,大小为256-->
<queueSize>256</queueSize>
<appender-ref ref="ES"/>
<!--要是保留行号,需要开启为true-->
<includeCallerData>true</includeCallerData>
</appender>
<logger name="com.elk.log" additivity="true">
<level value="${logback.level}"></level>
<appender-ref ref="ASYNC_ES" />
</logger>
<root level="${logback.level}">
<appender-ref ref="stdout" />
</root>
</configuration>
这里我们定义自己的ESAppender,取名未ES,并将所需参数传到ESAppender。
然后我们再把它放到AsyncAppender里面进行异步处理,防止ELK报错影响到主程序。
OK,测试:
我们启动ELK,在Kibana的Dev Tools上创建一个java的动态日志模板:
PUT _template/java-log
{
"template" : "java-log-*",
"order" : 0,
"settings" : {
"index":{
"refresh_interval": "5s"
}
},
"mappings": {
"_default_": {
"dynamic_templates": [
{
"message_field": {
"match_mapping_type": "string",
"path_match": "message",
"mapping": {
"norms": false,
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_max_word"
}
}
},
{
"throwable_fields": {
"match_mapping_type": "string",
"path_match": "throwable",
"mapping": {
"norms": false,
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_max_word"
}
}
},
{
"string_fields": {
"match_mapping_type": "string",
"match": "*",
"mapping": {
"norms": false,
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_max_word",
"fields": {
"keyword": {
"type": "keyword"
}
}
}
}
}
],
"_all": {
"enabled": false
},
"properties": {
"env": {
"type": "keyword"
},
"host": {
"type": "keyword"
},
"ip": {
"type": "ip"
},
"level": {
"type": "keyword"
},
"location": {
"properties": {
"line": {
"type": "integer"
}
}
},
"timestamp": {
"type": "date"
}
}
}
}
}
用的是IK分词器,refresh时间定为5s用于生产上调高效率,定义了几个参数作为关键搜索。
然后将java-log-*作为主要观察对象,回到Discover
回到idea,在ESLogTest上跑testLog:
[INFO 18-03-07 16:56:49 我是正常信息 test message info] Caller+0 at com.elk.log.ESLogTest.testLog(ESLogTest.java:14)
[ERROR 18-03-07 16:56:49 我一条异常的信息] Caller+0 at com.elk.log.ESLogTest.testLog(ESLogTest.java:15)
java.lang.Exception: 项目报错了,加班吧!!!!
at com.elk.log.ESLogTest.testLog(ESLogTest.java:15)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:59)
at org.junit.internal.runners.MethodRoadie.runTestMethod(MethodRoadie.java:98)
at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:79)
at org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:87)
at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:77)
at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:42)
at org.junit.internal.runners.JUnit4ClassRunner.invokeTestMethod(JUnit4ClassRunner.java:88)
at org.junit.internal.runners.JUnit4ClassRunner.runMethods(JUnit4ClassRunner.java:51)
at org.junit.internal.runners.JUnit4ClassRunner$1.run(JUnit4ClassRunner.java:44)
at org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:27)
at org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:37)
at org.junit.internal.runners.JUnit4ClassRunner.run(JUnit4ClassRunner.java:42)
at org.junit.runner.JUnitCore.run(JUnitCore.java:130)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
[DEBUG 18-03-07 16:56:49 debug消息 debug hello hi] Caller+0 at com.elk.log.ESLogTest.testLog(ESLogTest.java:16)
[WARN 18-03-07 16:56:49 警告警告] Caller+0 at com.elk.log.ESLogTest.testLog(ESLogTest.java:17)
[DEBUG 18-03-07 16:56:50 Request and operation succeeded] Caller+0 at io.searchbox.action.AbstractAction.createNewElasticSearchResult(AbstractAction.java:75)
io.searchbox.core.DocumentResult@4421d75d
[DEBUG 18-03-07 16:56:51 Request and operation succeeded] Caller+0 at io.searchbox.action.AbstractAction.createNewElasticSearchResult(AbstractAction.java:75)
io.searchbox.core.DocumentResult@25a5426c
[DEBUG 18-03-07 16:56:51 Request and operation succeeded] Caller+0 at io.searchbox.action.AbstractAction.createNewElasticSearchResult(AbstractAction.java:75)
io.searchbox.core.DocumentResult@5fd33fbc
[DEBUG 18-03-07 16:56:51 Request and operation succeeded] Caller+0 at io.searchbox.action.AbstractAction.createNewElasticSearchResult(AbstractAction.java:75)
io.searchbox.core.DocumentResult@25a52369
可以说都是我们想要的信息。
我们再回到Kibana:
可以看到,测试上的4条日志信息已经可以插到ES上。
然后再看看错误的那条日志信息:
报错的类,方法,行数,报错内容。。。都一一列出来。
任务完成,到此,我们可以在不影响其他代码的情况下,将日志插入到ES里,在以后的编程中可以快速定位问题。
扩展:
我们看看除了DBAppender实现Appender这条线外,还有哪些类走这条线:
我们看到实现Appender接口的就有两个类:AppenderBase和UnsynchronizedAppenderBase。
从字面上的意思就是一个同步的,一个异步的,我们用对比工具对比一下,发现这两个类几乎没什么区别,最大不同就是AppenderBase的doAppend方法多了synchronized的锁。
像插到数据库日志的DBAppender,控制台打印日志的ConsoleAppender。。。就会走异步的UnsynchronizedAppenderBase这条线。
像SMTPAppender,JMSAppenderBase,SocketAppenderBase。。。就会走AppenderBase这条线。
所以像DBAppender和DBAppenderBase,这两个类可以合成一个整体,直接继承UnsynchronizedAppenderBase,走异步这条线。
-----------------------谢谢观看,希望本文对你有帮助----------------------------