[toc]
FileBeat + Pipeline 解析日志 保存至ElasticSearch(实战)
目的
使用FileBeat收集日志,Pipeline解析日志,最终写入ES
日志数据
2021-07-01 20:07:25 [XNIO-1 task-2] INFO fileBeatLogData - 查询用户|4|com.internet.operator.controller..list()|GET|http://127.0.0.1:8080/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAcco909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{"code":200,"msg":"查询成功","rows":[],"took":2,"total":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X
模拟Pipeline
POST /_ingest/pipeline/_simulate
{
"pipeline": {
"processors" : [
{
"dissect": {
"field": "message",
"pattern": "%{@logTimestamp} [%{logTthread}] %{loglevel} fileBeatLogData - %{logdata}"
}
},
{
"split": {
"field": "logdata",
"separator": "\\|",
"target_field": "logdata"
}
},
{
"set": {
"field": "actionOrFunction",
"value": "{{logdata.0}}"
}
},
{
"set": {
"field": "businessType",
"value": "{{logdata.1}}"
}
},
{
"set": {
"field": "callMethod",
"value": "{{logdata.2}}"
}
},
{
"set": {
"field": "requestMethod",
"value": "{{logdata.3}}"
}
},
{
"set": {
"field": "callLink",
"value": "{{logdata.4}}"
}
},
{
"set": {
"field": "loginUserIp",
"value": "{{logdata.5}}"
}
},
{
"set": {
"field": "userName",
"value": "{{logdata.6}}"
}
},
{
"set": {
"field": "userId",
"value": "{{logdata.7}}"
}
},
{
"set": {
"field": "paramOrInputData",
"value": "{{logdata.8}}"
}
},
{
"set": {
"field": "resultOrOutputData",
"value": "{{logdata.9}}"
}
},
{
"set": {
"field": "exceptionInfo",
"value": "{{logdata.10}}"
}
},
{
"set": {
"field": "systemEnv",
"value": "{{logdata.11}}"
}
},
{
"set": {
"field": "status",
"value": "{{logdata.12}}"
}
},
{
"set": {
"field": "fullLinkId",
"value": "{{logdata.13}}"
}
},
{
"set": {
"field": "subFullLinkId",
"value": "{{logdata.14}}"
}
},
{
"set": {
"field": "currentTimeMillisecond",
"value": "{{logdata.15}}"
}
},
{
"convert": {
"field": "currentTimeMillisecond",
"type": "long"
}
},
{
"set": {
"field": "detail",
"value": "{{logdata.16}}"
}
},{
"set": {
"field": "other",
"value": "{{logdata.17}}"
}
},
{
"set": {
"field": "errorData",
"value": "{{logdata.18}}"
}
},
{
"set": {
"field": "errorDataSource",
"value": "{{logdata.19}}"
}
},
{
"set": {
"field": "errorDataDetail",
"value": "{{logdata.20}}"
}
},
{
"set": {
"field": "logTime",
"value": "{{logdata.21}}"
}
},
{
"set": {
"field": "processTime",
"value": "{{logdata.22}}"
}
},
{
"convert": {
"field": "processTime",
"type": "long"
}
},
{
"set": {
"field": "orgCode",
"value": "{{logdata.23}}"
}
},
{
"set": {
"field": "orgName",
"value": "{{logdata.24}}"
}
},
{
"set": {
"field": "exceptionDetailInfo",
"value": "{{logdata.25}}"
}
},{
"set": {
"field": "message",
"value": ""
}
},{
"set": {
"field": "logdata",
"value": ""
}
},
{
"script": {
"lang": "painless",
"source": """ ctx.insertTime = new Date(System.currentTimeMillis()+1000l*60*60*8); """
}
}
]
},
"docs": [
{
"_source": {
"message": "2021-07-01 20:07:25 [XNIO-1 task-2] INFO fileBeatLogData - 查询运营商宽带用户|4|com.bjga.internet.operator.controller.OperatorBroadbandController.list()|GET|http://127.0.0.1:8080/operator2/broadband/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAccount=%E5%8C%97%E4%BA%AC1%E5%B8%8256&installedPhone=639857&accountHolderName=%E4%B8%9C%E7%A5%A5%E6%9E%97&operatorCreditCode=91110108101909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{\"code\":200,\"msg\":\"查询成功\",\"rows\":[],\"took\":2,\"total\":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X"
}
}
]
}
创建pipeline
PUT _ingest/pipeline/logdatapipeline
{
"description" : "outer pipeline",
"processors" : [
{
"dissect": {
"field": "message",
"pattern": "%{@logTimestamp} [%{logTthread}] %{loglevel} fileBeatLogData - %{logdata}"
}
},
{
"split": {
"field": "logdata",
"separator": "\\|",
"target_field": "logdata"
}
},
{
"set": {
"field": "actionOrFunction",
"value": "{{logdata.0}}"
}
},
{
"set": {
"field": "businessType",
"value": "{{logdata.1}}"
}
},
{
"set": {
"field": "callMethod",
"value": "{{logdata.2}}"
}
},
{
"set": {
"field": "requestMethod",
"value": "{{logdata.3}}"
}
},
{
"set": {
"field": "callLink",
"value": "{{logdata.4}}"
}
},
{
"set": {
"field": "loginUserIp",
"value": "{{logdata.5}}"
}
},
{
"set": {
"field": "userName",
"value": "{{logdata.6}}"
}
},
{
"set": {
"field": "userId",
"value": "{{logdata.7}}"
}
},
{
"set": {
"field": "paramOrInputData",
"value": "{{logdata.8}}"
}
},
{
"set": {
"field": "resultOrOutputData",
"value": "{{logdata.9}}"
}
},
{
"set": {
"field": "exceptionInfo",
"value": "{{logdata.10}}"
}
},
{
"set": {
"field": "systemEnv",
"value": "{{logdata.11}}"
}
},
{
"set": {
"field": "status",
"value": "{{logdata.12}}"
}
},
{
"set": {
"field": "fullLinkId",
"value": "{{logdata.13}}"
}
},
{
"set": {
"field": "subFullLinkId",
"value": "{{logdata.14}}"
}
},
{
"set": {
"field": "currentTimeMillisecond",
"value": "{{logdata.15}}"
}
},
{
"convert": {
"field": "currentTimeMillisecond",
"type": "long"
}
},
{
"set": {
"field": "detail",
"value": "{{logdata.16}}"
}
},{
"set": {
"field": "other",
"value": "{{logdata.17}}"
}
},
{
"set": {
"field": "errorData",
"value": "{{logdata.18}}"
}
},
{
"set": {
"field": "errorDataSource",
"value": "{{logdata.19}}"
}
},
{
"set": {
"field": "errorDataDetail",
"value": "{{logdata.20}}"
}
},
{
"set": {
"field": "logTime",
"value": "{{logdata.21}}"
}
},
{
"set": {
"field": "processTime",
"value": "{{logdata.22}}"
}
},
{
"convert": {
"field": "processTime",
"type": "long"
}
},
{
"set": {
"field": "orgCode",
"value": "{{logdata.23}}"
}
},
{
"set": {
"field": "orgName",
"value": "{{logdata.24}}"
}
},
{
"set": {
"field": "exceptionDetailInfo",
"value": "{{logdata.25}}"
}
},{
"set": {
"field": "message",
"value": ""
}
},{
"set": {
"field": "logdata",
"value": ""
}
},
{
"script": {
"lang": "painless",
"source": """ ctx.insertTime = new Date(System.currentTimeMillis()+1000l*60*60*8); """
}
}
]
}
创建FileBeat配置文件 filebeat.yml
读取 /var/log2/*.log 文件写入ES
filebeat.inputs:
- type: log
enabled: true
#读取的文件
paths:
- /var/log2/*.log
# 标记,在后面用于判断写入的索引
fields:
type: logDataPipeline
source: common
- type: log
enabled: true
paths:
- /var/log/1.log
- /var/log/2.log
fields:
source: exception
- type: log
enabled: true
paths:
- /var/log/3.log
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
# ======================= Elasticsearch template setting =======================
setup.template.settings:
index.number_of_shards: 1
#index.codec: best_compression
#_source.enabled: false
# # 生成index模板的名称
#允许自动生成index模板
setup.template.enabled: true
# # 如果存在模块则覆盖
setup.template.overwrite: true
# # # 生成index模板时字段配置文件
setup.template.fields: fields.yml
setup.template.name: "logdata"
# # # 生成index模板匹配的index格式
setup.template.pattern: "logdata-*"
setup.ilm.enabled: auto
# 这里一定要注意 会在alias后面自动添加-*
setup.ilm.rollover_alias: "park-ssm"
setup.ilm.pattern: "{now/d}"
# # # 生成kibana中的index pattern,便于检索日志
# #setup.dashboards.index: myfilebeat-7.0.0-*
# #filebeat默认值为auto,创建的elasticsearch索引生命周期为50GB+30天。如果不改,可以不用设置
setup.ilm.enabled: false
# =================================== Kibana ===================================
setup.kibana:
# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
# Array of hosts to connect to.
hosts: ["10.8.10.12:9200"]
index: "logdata-%{+yyyy.MM.dd}"
indices:
- index: "logdata-%{[fields.source]}-%{+yyyy.MM.dd}"
when.equals:
fields:
source: "common"
- index: "logdata-%{[fields.source]}-%{+yyyy.MM.dd}"
when.equals:
fields:
source: "exception"
pipelines:
- pipeline: logDataPipeline
when.equals:
fields.type: logDataPipeline
# ================================= Processors =================================
processors:
- add_host_metadata:
when.not.contains.tags: forwarded
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
创建自定义字段 FileBeat fields.yml
# 我们自定义的
- key: rbt
title: rbt
description: rbt log data fields
fields:
- name: logdata
type: keyword
- name: actionOrFunction
type: keyword
- name: businessType
type: keyword
- name: callMethod
type: keyword
- name: requestMethod
type: keyword
- name: callLink
type: keyword
- name: loginUserIp
type: keyword
- name: userName
type: keyword
- name: userId
type: keyword
- name: paramOrInputData
type: keyword
- name: resultOrOutputData
type: keyword
- name: exceptionInfo
type: keyword
- name: systemEnv
type: keyword
- name: status
type: long
- name: fullLinkId
type: keyword
- name: subFullLinkId
type: keyword
- name: currentTimeMillisecond
type: long
- name: detail
type: keyword
- name: other
type: keyword
- name: errorData
type: keyword
- name: errorDataSource
type: keyword
- name: errorDataDetail
type: keyword
- name: logTime
type: keyword
- name: processTime
type: long
- name: orgCode
type: keyword
- name: orgName
type: keyword
- name: exceptionDetailInfo
type: keyword
- name: insertTime
type: date
# FileBeat自带的
- key: ecs
title: ECS
description: ECS Fields.
fields:
- name: '@timestamp'
level: core
required: true
type: date
description: 'Date/time when the event originated.
This is the date/time extracted from the event, typically representing when
the event was generated by the source.
If the event source has no original timestamp, this value is typically populated
by the first time the event was received by the pipeline.
Required field for all events.'
example: '2016-05-23T08:05:34.853Z'
执行 FileBeat
[root@test13 filebeat-7.9.3-linux-x86_64]# ls
data fields.yml.bak filebeat.reference.yml filebeat.yml.bak LICENSE.txt modules.d README.md
fields.yml filebeat filebeat.yml kibana module NOTICE.txt s.log
[root@test13 filebeat-7.9.3-linux-x86_64]# ./filebeat -e
测试
新增数据到 vim /var/log2/test.log
2021-07-01 20:07:25 [XNIO-1 task-2] INFO fileBeatLogData - 查询用户|4|com.internet.operator.controller..list()|GET|http://127.0.0.1:8080/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAcco909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{"code":200,"msg":"查询成功","rows":[],"took":2,"total":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X
查询结果发现日志已经进入到ES
个人公众号(大数据学习交流): hadoopwiki