ELK+logback+kafka+nginx 搭建分布式日志分析平台

ELK+logback+kafka+nginx 搭建分布式日志分析平台

ELK(Elasticsearch , Logstash, Kibana)是一套开源的日志收集、存储和分析软件组合。而且不只是java能用,其他的开发语言也可以使用,今天给大家带来的是elk+logback+kafka搭建分布式日志分析平台。本文主要讲解一下两种流程,全程linux环境(模拟现实环境,可用内存一定要大于2G,当然也可以使用windows),至于elk这些组件的原理,百度太多了,我就不重复了,重在整合。

1.我们是通过logback打印日志,然后将日志通过kafka消息队列发送到Logstash,经过处理以后存储到Elasticsearch中,然后通过Kibana图形化界面进行分析和处理。
2.我们使用Logstash读取日志文件,经过处理以后存储到Elasticsearch中,然后通过Kibana图形化界面进行分析和处理。例如我们读取nginx的日志文件,可以统计访问用户的ip地域,请求地址等等。

一、文章案例环境

1.centos 7.2(linux)
2.elasticsearch / logstash / kibana 6.3.2 下载地址
3.nginx 1.12.2
4.kafka 2.12 下载地址
5.logback/springboot 使用springboot2.0.4.RELEASE和默认的logback
6.zookeeper 3.4.12 下载地址

二、安装Elasticsearch

1.创建用户
如果你是root用户,要新建一个用户,elasticsearch不允许root用户登录,如果不是root登录请忽略这一步。

adduser elsearch
su elsearch

2.下载安装elasticsearch,以下简称es

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.2.tar.gz
tar -zxvf elasticsearch-6.3.2.tar.gz

3.修改配置

进入es的config目录,vi elasticsearch.yml,打开注释并修改

#如果配置集群的话,只要name一样就可以自动集群了,不需要单独配置
cluster.name: nelson
network.host: 0.0.0.0 #4个0表示外网可以访问
http.port: 9200 #默认http端口
transport.tcp.port: 9300 #默认tcp端口

vi jvm.options 修改一下内存配置,我这里内存不是很多所以修改为450Mb,两者保持一致,如果你内存足够,这个可以忽略。

-Xms450M
-Xmx450M

然后就可以启动es,执行 /bin/elasticsearch 启动的时候可能会有一些提示,比如修改一些配置等,复制提示然后百度就会找到解决方案。
4.测试
如果es启动成功,可以通过浏览器访问 ip:9200,下图表示安装成功,如果无法访问,检查es是否成功启动或者是否防火墙拦截


1.png
二、安装Nginx
yum install -y nginx

然后 vi /etc/nginx/nginx.conf修改nginx的日志默认输出格式

log_format json '{"@timestamp":"$time_iso8601",'
             '"@version":"1",'
             '"client":"$remote_addr",'
             '"url":"$uri",'
             '"status":"$status",'
             '"domian":"$host",'
             '"host":"$server_addr",'
             '"size":"$body_bytes_sent",'
             '"responsetime":"$request_time",'
             '"referer":"$http_referer",'
             '"ua":"$http_user_agent"'
          '}';
access_log  /opt/access.log json;

安装完成以后service nginx start启动nginx服务
打开浏览器访问 ip,nginx默认是80端口,如果可以访问表示成功安装

2.png

三、安装Logstash

1.下载安装 logstash

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.2.tar.gz
tar -zxvf elasticsearch-6.3.2.tar.gz

2.修改内存 vi jvm.options,内存足够的话,跳过这一步

-Xms400M
-Xmx400M

3.配置输入输出
在config目录下新建文件nginx.conf
1)input表示输入源,他这里有好多插件,支持很多数据源包括文件,http等,我们这里首先收集nginx的日志。file表示读取文件;codec表示读取的文件格式,因为我们前边配置了nginx的日志格式为json,所以这里是json;start_position表示从那一行读取,他会记录上一次读取到那个位置,所以就不用担心遗漏日志了。type相当于一个tag一样,可能这里有很多输入源,后面会根据这个type进行过滤。
2)filter表示处理输入数据,因为我们前边配置了nginx的日志里边记录了用户的ip,所以我们使用geoip组件,可以根据ip匹配位置信息,下面表示你将使用那些fields字段;source表示输入json的那个属性。
3)output表示输出到哪里,可以文件、redis等,这里我们保存到es里。利用elasticsearch插件,然后配置一下es的地址,索引我们是通过日期自动生成,表示每天创建一个索引

input {
    file {
        path => "/var/log/nginx/access.log"
        type => "nginx"
        codec => "json"
        start_position => "beginning"
    }
}

filter {
    geoip {
        fields => ["city_name", "country_name", "latitude", "longitude", "region_name","region_code"]
        source => "client"
    }
}

output {
    if [type] == "nginx" {
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            index => "nelson-nginx-%{+YYYY.MM.dd}"
        }
        stdout {}
    }
}

4.启动
进入到logstash 目录执行以下命令,记得加-f

./bin/logstash -f ./config/nginx.conf

然后我们在浏览器访问nginx,输入ip就可以,这时候可以在控制台看到如下输出。


3.png
四、安装Kibana

1.下载解压
我买的服务器内存只有2G,所以我用的windows安装的Kibana

wget https://artifacts.elastic.co/downloads/kibana/kibana-6.3.2-linux-x86_64.tar.gz
tar -zxvf kibana-6.3.2-linux-x86_64.tar.gz

windows下载地址,我这里用的是windows
https://artifacts.elastic.co/downloads/kibana/kibana-6.3.2-windows-x86_64.zip
2.修改配置
进入到config下,修改kibana.yml文件,如果你的kibana和es在一台机器上请忽略这一步,如果不在一台机器上,放开注释修改地址,我这里是在windows上运行的。

elasticsearch.url: "http://localhost:9200"
//我的配置
//elasticsearch.url: "http://47.98.109.5:9200"

3.启动
进入到kibana目录下

//linux
./bin/kibana
//windows 双击运行bin目录下的kibana.bat文件

4.实战
打开浏览器访问ip:5601
第一次进来我们要创建index pattern,因为我们的日志是按照日期每天存储的,所以要将这些日志聚合到一起。按照下图进行设置,因为我已经有4天的日志了,所以过滤后有四条满足。

4.png

5.png

可以根据时间段过滤,查看数据的录入量,这也表示网站访问量。


6.png

当然也可以通过rest api查询数据,支持复杂查询。索引也可以使用通配符

7.png

5.统计用户区域分布
我们要创建一个统计,然后选择饼状图,下一步选择你要统计的index pattern,这个在上一步已经创建成功

8.png

9.png

添加子查询
10.png

最后我们来一个很复杂的统计,按照国家->城市->浏览器类型,但是用kibana是很简单的,而且速度超快。
11.png

到这里利用elk分析nginx的日志就算完成了,剩下的自己研究,基本类似,一些基本概念还是要自己去百度了。

接下来是通过logback+kafka保存程序日志。因为生产环境中,分布式系统,你的服务可能有N个,例如基于docker,我们不可能给每个docker容器里安装一个logstash,所以需要通过网络向logbash传输数据。这里是通过logback产生日志,然后通过kafka消息队列传输到logstash。

五、安装Zookeeper

kafka 是需要zookeeper的,下面简称zk。

wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz
tar -zxvf zookeeper-3.4.12.tar.gz
#复制配置
cp zoo_sample.cfg zoo.cfg
修改配置
vi zoo.cfg
dataDir=/root/zk/data #改为你zk目录/data

然后进入zk目录启动,如果不保存说明就启动成功了。

./bin/zkServer.sh start
六、安装Kafka

安装解压

wget http://mirror.bit.edu.cn/apache/kafka/2.0.0/kafka_2.12-2.0.0.tgz
tar -zxvf kafka_2.12-2.0.0.tgz
cd  kafka_2.12-2.0.0

然后修改config目录吓得server.properties,如果你的zk和你的kafka不在一台机器的话,你要修改zk的地址。
还有一点要注意的是如果你使用阿里云这一类产品的时候一定要注意下面配置,特别坑:

listeners=PLAINTEXT://172.31.167.25:9092  #阿里云内网地址
advertised.listeners=PLAINTEXT://47.104.255.217:9092  #阿里云外网地址

启动kafka server

bin/kafka-server-start.sh config/server.properties

创建一个 名称为applog 的topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic applog

查看所有topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

消息的生产者,启动以后,在控制台输入信息,然后回车发送

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic applog

消息的消费者,如果生产者那里给applog这个top输入信息发送,消费者这边就会在收到,然后在控制台打印出来。

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic applog --from-beginning

接下来开发,我们只需要启动kafka server即可。上面这个消费者和生产者只是为了测试。

七、程序中使用logback

我们新建一个springboot项目,然后加入如下依赖。coding地址:点击访问

compile('org.springframework.boot:spring-boot-starter-webflux')
compile('org.springframework.kafka:spring-kafka')
compile group: 'net.logstash.logback', name: 'logstash-logback-encoder', version: '5.2'
compile group: 'com.github.danielwegener', name: 'logback-kafka-appender', version: '0.1.0'

因为springboot自带logback,所以我们也不需要手动增加依赖。然后我们在resource目录下新建文件logback-spring.xml,这样话,springboot自动读取配置文件优先顺序比较高,具体文章可以去springboot文档去查看。

<?xml version="1.0" encoding="UTF-8"?>
<configuration  scan="true" scanPeriod="60 seconds" debug="false">
    <contextName>logback</contextName>
    <property name="log.path" value="logs/elk.log" />
    <!--输出到控制台-->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <!-- <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
             <level>ERROR</level>
         </filter>-->
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!--输出到文件-->
    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.path}</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>logback.%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    <!--输出到kafka-->
    <appender name="KafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
            <layout class="net.logstash.logback.layout.LogstashLayout" >
                <includeContext>false</includeContext>
                <includeCallerData>true</includeCallerData>
                <customFields>{"system":"test"}</customFields>
                <fieldNames class="net.logstash.logback.fieldnames.ShortenedFieldNames"/>
            </layout>
            <charset>UTF-8</charset>
        </encoder>
        <!--kafka topic 需要与配置文件里面的topic一致 否则kafka会沉默并鄙视你-->
        <topic>applog</topic>
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.HostNameKeyingStrategy" />
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
        <producerConfig>bootstrap.servers=47.104.255.217:9092</producerConfig>
    </appender>
    
    <!--你可能还需要加点这个玩意儿-->
    <logger name="Application_ERROR">
        <appender-ref ref="KafkaAppender"/>
    </logger>
    <root level="info">
        <appender-ref ref="console" />
        <appender-ref ref="file" />
        <appender-ref ref="KafkaAppender" />
    </root>
</configuration>

然后我们新建controller

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// 注意导包,不要导错
@RestController
public class IndexController {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    @RequestMapping("/")
    public void  index(){
        logger.trace("日志输出 trace");
        logger.debug("日志输出 debug");
        logger.info("日志输出 info");
        logger.warn("日志输出 warn");
        logger.error("日志输出 error");
    }
}

这个时候我们需要,修改一下logback的配置文件了,要加入kafka的输入。修改logbstash的config目录吓得nginx.conf

input {
    #nginx日志的输入
    file {
        path => "/opt/access.log"
        type => "nginx"
        codec => "json"
        start_position => "beginning"
    }
    #kafka日志输入
     kafka {
        topics => "applog"
        type => "kafka"
        bootstrap_servers => "47.104.255.217:9092"
        codec => "json"
    }
}

filter {
    if [type] == "nginx" {
         geoip {
                fields => ["city_name", "country_name", "latitude", "longitude", "region_name","region_code"]
                source => "client"
         }
    }
}

output {
    #都输出到es中,但是索引不一样
    if [type] == "nginx" {
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            index => "nelson-nginx-%{+YYYY.MM.dd}"
        }
        stdout {}
    }
    if [type] == "kafka" {
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            index => "nelson-applogs-%{+YYYY.MM.dd}"
        }
        stdout {}
    }
}

然后执行../bin/logstash -f ./nginx.conf,启动logstash,这时候我们的logstash就有两个输入源了。
访问项目中controller地址,看日志是否打印出来。
idea的控制台打印了日志

12.png

这是logstash打印出来的日志,如果这个出来基本可以说明成功了。


13.png

最后我们在kibana中通过rest请求es,这里表示查到数据。


14.png

要是想统计日志,可以参考上边kibanam那一块,类似。

到这里,本篇文章就结束了,elk+nginx 和 elk+logback+kafka都已经实现了,考虑篇幅,所以这里没有细讲这些概念。这些呢就自行百度吧,不重复造轮子了,只要能串通,剩下的用到啥看官方文档或者百度。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容