FileBeat + Pipeline 解析日志 保存至ElasticSearch(实战)

[toc]

FileBeat + Pipeline 解析日志 保存至ElasticSearch(实战)

目的

使用FileBeat收集日志,Pipeline解析日志,最终写入ES

日志数据

2021-07-01 20:07:25 [XNIO-1 task-2] INFO  fileBeatLogData - 查询用户|4|com.internet.operator.controller..list()|GET|http://127.0.0.1:8080/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAcco909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{"code":200,"msg":"查询成功","rows":[],"took":2,"total":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X

模拟Pipeline

POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "processors" : [
    
      {
        "dissect": {
          "field": "message",
          "pattern": "%{@logTimestamp} [%{logTthread}] %{loglevel} fileBeatLogData - %{logdata}"
        }
      },
      {
        "split": {
          "field": "logdata",
          "separator": "\\|",
          "target_field": "logdata"
        }
      },
      {
        "set": {
          "field": "actionOrFunction",
          "value": "{{logdata.0}}"
        }
      },
      {
        "set": {
          "field": "businessType",
          "value": "{{logdata.1}}"
        }
      },
      {
        "set": {
          "field": "callMethod",
          "value": "{{logdata.2}}"
        }
      },
      {
        "set": {
          "field": "requestMethod",
          "value": "{{logdata.3}}"
        }
      },
      {
        "set": {
          "field": "callLink",
          "value": "{{logdata.4}}"
        }
      },
      {
        "set": {
          "field": "loginUserIp",
          "value": "{{logdata.5}}"
        }
      },
      {
        "set": {
          "field": "userName",
          "value": "{{logdata.6}}"
        }
      },
      {
        "set": {
          "field": "userId",
          "value": "{{logdata.7}}"
        }
      },
      {
        "set": {
          "field": "paramOrInputData",
          "value": "{{logdata.8}}"
        }
      },
      {
        "set": {
          "field": "resultOrOutputData",
          "value": "{{logdata.9}}"
        }
      },
      {
        "set": {
          "field": "exceptionInfo",
          "value": "{{logdata.10}}"
        }
      },
      {
        "set": {
          "field": "systemEnv",
          "value": "{{logdata.11}}"
        }
      },
      {
        "set": {
          "field": "status",
          "value": "{{logdata.12}}"
        }
      },
      {
        "set": {
          "field": "fullLinkId",
          "value": "{{logdata.13}}"
        }
      },
      {
        "set": {
          "field": "subFullLinkId",
          "value": "{{logdata.14}}"
        }
      },
      {
        "set": {
          "field": "currentTimeMillisecond",
          "value": "{{logdata.15}}"
        }
      },
      {
        "convert": {
          "field": "currentTimeMillisecond",
          "type": "long"
        }
      },
      {
        "set": {
          "field": "detail",
          "value": "{{logdata.16}}"
        }
      },{
        "set": {
          "field": "other",
          "value": "{{logdata.17}}"
        }
      },
      {
        "set": {
          "field": "errorData",
          "value": "{{logdata.18}}"
        }
      },
      {
        "set": {
          "field": "errorDataSource",
          "value": "{{logdata.19}}"
        }
      },
      {
        "set": {
          "field": "errorDataDetail",
          "value": "{{logdata.20}}"
        }
      },
      {
        "set": {
          "field": "logTime",
          "value": "{{logdata.21}}"
        }
      },
      {
        "set": {
          "field": "processTime",
          "value": "{{logdata.22}}"
        }
      },
      {
        "convert": {
          "field": "processTime",
          "type": "long"
        }
      },
      {
        "set": {
          "field": "orgCode",
          "value": "{{logdata.23}}"
        }
      },
      {
        "set": {
          "field": "orgName",
          "value": "{{logdata.24}}"
        }
      },
      {
        "set": {
          "field": "exceptionDetailInfo",
          "value": "{{logdata.25}}"
        }
      },{
        "set": {
          "field": "message",
          "value": ""
        }
      },{
        "set": {
          "field": "logdata",
          "value": ""
        }
      },
      {
        "script": {
          "lang": "painless",
          "source": """ ctx.insertTime = new Date(System.currentTimeMillis()+1000l*60*60*8);  """
        }
      }
  ]
  },
  "docs": [
    {
      "_source": {
        "message": "2021-07-01 20:07:25 [XNIO-1 task-2] INFO  fileBeatLogData - 查询运营商宽带用户|4|com.bjga.internet.operator.controller.OperatorBroadbandController.list()|GET|http://127.0.0.1:8080/operator2/broadband/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAccount=%E5%8C%97%E4%BA%AC1%E5%B8%8256&installedPhone=639857&accountHolderName=%E4%B8%9C%E7%A5%A5%E6%9E%97&operatorCreditCode=91110108101909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{\"code\":200,\"msg\":\"查询成功\",\"rows\":[],\"took\":2,\"total\":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X"
      }
    }
  ]
}

创建pipeline

PUT _ingest/pipeline/logdatapipeline
{
  "description" : "outer pipeline",
  "processors" : [
    
      {
        "dissect": {
          "field": "message",
          "pattern": "%{@logTimestamp} [%{logTthread}] %{loglevel} fileBeatLogData - %{logdata}"
        }
      },
      {
        "split": {
          "field": "logdata",
          "separator": "\\|",
          "target_field": "logdata"
        }
      },
      {
        "set": {
          "field": "actionOrFunction",
          "value": "{{logdata.0}}"
        }
      },
      {
        "set": {
          "field": "businessType",
          "value": "{{logdata.1}}"
        }
      },
      {
        "set": {
          "field": "callMethod",
          "value": "{{logdata.2}}"
        }
      },
      {
        "set": {
          "field": "requestMethod",
          "value": "{{logdata.3}}"
        }
      },
      {
        "set": {
          "field": "callLink",
          "value": "{{logdata.4}}"
        }
      },
      {
        "set": {
          "field": "loginUserIp",
          "value": "{{logdata.5}}"
        }
      },
      {
        "set": {
          "field": "userName",
          "value": "{{logdata.6}}"
        }
      },
      {
        "set": {
          "field": "userId",
          "value": "{{logdata.7}}"
        }
      },
      {
        "set": {
          "field": "paramOrInputData",
          "value": "{{logdata.8}}"
        }
      },
      {
        "set": {
          "field": "resultOrOutputData",
          "value": "{{logdata.9}}"
        }
      },
      {
        "set": {
          "field": "exceptionInfo",
          "value": "{{logdata.10}}"
        }
      },
      {
        "set": {
          "field": "systemEnv",
          "value": "{{logdata.11}}"
        }
      },
      {
        "set": {
          "field": "status",
          "value": "{{logdata.12}}"
        }
      },
      {
        "set": {
          "field": "fullLinkId",
          "value": "{{logdata.13}}"
        }
      },
      {
        "set": {
          "field": "subFullLinkId",
          "value": "{{logdata.14}}"
        }
      },
      {
        "set": {
          "field": "currentTimeMillisecond",
          "value": "{{logdata.15}}"
        }
      },
      {
        "convert": {
          "field": "currentTimeMillisecond",
          "type": "long"
        }
      },
      {
        "set": {
          "field": "detail",
          "value": "{{logdata.16}}"
        }
      },{
        "set": {
          "field": "other",
          "value": "{{logdata.17}}"
        }
      },
      {
        "set": {
          "field": "errorData",
          "value": "{{logdata.18}}"
        }
      },
      {
        "set": {
          "field": "errorDataSource",
          "value": "{{logdata.19}}"
        }
      },
      {
        "set": {
          "field": "errorDataDetail",
          "value": "{{logdata.20}}"
        }
      },
      {
        "set": {
          "field": "logTime",
          "value": "{{logdata.21}}"
        }
      },
      {
        "set": {
          "field": "processTime",
          "value": "{{logdata.22}}"
        }
      },
      {
        "convert": {
          "field": "processTime",
          "type": "long"
        }
      },
      {
        "set": {
          "field": "orgCode",
          "value": "{{logdata.23}}"
        }
      },
      {
        "set": {
          "field": "orgName",
          "value": "{{logdata.24}}"
        }
      },
      {
        "set": {
          "field": "exceptionDetailInfo",
          "value": "{{logdata.25}}"
        }
      },{
        "set": {
          "field": "message",
          "value": ""
        }
      },{
        "set": {
          "field": "logdata",
          "value": ""
        }
      },
      {
        "script": {
          "lang": "painless",
          "source": """ ctx.insertTime = new Date(System.currentTimeMillis()+1000l*60*60*8);  """
        }
      }
  ]
}

创建FileBeat配置文件 filebeat.yml

读取 /var/log2/*.log 文件写入ES

filebeat.inputs:
- type: log
  enabled: true
#读取的文件
  paths:
    - /var/log2/*.log
# 标记,在后面用于判断写入的索引
  fields:
    type: logDataPipeline
    source: common
- type: log
  enabled: true
  paths:
    - /var/log/1.log
    - /var/log/2.log
  fields:
    source: exception
- type: log
  enabled: true
  paths:
    - /var/log/3.log


filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

# ======================= Elasticsearch template setting =======================

setup.template.settings:
  index.number_of_shards: 1
  #index.codec: best_compression
  #_source.enabled: false
  # # 生成index模板的名称
#允许自动生成index模板
setup.template.enabled: true
# # 如果存在模块则覆盖
setup.template.overwrite: true
# # # 生成index模板时字段配置文件
setup.template.fields: fields.yml
setup.template.name: "logdata" 
# # # 生成index模板匹配的index格式       
setup.template.pattern: "logdata-*" 
setup.ilm.enabled: auto
# 这里一定要注意 会在alias后面自动添加-*
setup.ilm.rollover_alias: "park-ssm"
setup.ilm.pattern: "{now/d}"
# # # 生成kibana中的index pattern,便于检索日志
# #setup.dashboards.index: myfilebeat-7.0.0-*
# #filebeat默认值为auto,创建的elasticsearch索引生命周期为50GB+30天。如果不改,可以不用设置
setup.ilm.enabled: false

# =================================== Kibana ===================================
setup.kibana:


# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["10.8.10.12:9200"]
  index: "logdata-%{+yyyy.MM.dd}"
  indices:
    - index: "logdata-%{[fields.source]}-%{+yyyy.MM.dd}"
      when.equals: 
        fields: 
          source: "common"
    - index: "logdata-%{[fields.source]}-%{+yyyy.MM.dd}"
      when.equals:
        fields:
          source: "exception"
  pipelines:
    - pipeline: logDataPipeline
      when.equals:
        fields.type: logDataPipeline

# ================================= Processors =================================
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

创建自定义字段 FileBeat fields.yml

# 我们自定义的
- key: rbt
  title: rbt
  description: rbt log data fields 
  fields:
    - name: logdata
      type: keyword
    - name: actionOrFunction
      type: keyword
    - name: businessType
      type: keyword
    - name: callMethod
      type: keyword
    - name: requestMethod
      type: keyword
    - name: callLink
      type: keyword
    - name: loginUserIp
      type: keyword
    - name: userName
      type: keyword
    - name: userId
      type: keyword
    - name: paramOrInputData
      type: keyword
    - name: resultOrOutputData
      type: keyword
    - name: exceptionInfo
      type: keyword
    - name: systemEnv
      type: keyword
    - name: status
      type: long
    - name: fullLinkId
      type: keyword
    - name: subFullLinkId
      type: keyword
    - name: currentTimeMillisecond
      type: long
    - name: detail
      type: keyword
    - name: other
      type: keyword
    - name: errorData
      type: keyword
    - name: errorDataSource
      type: keyword
    - name: errorDataDetail
      type: keyword
    - name: logTime
      type: keyword
    - name: processTime
      type: long
    - name: orgCode
      type: keyword
    - name: orgName
      type: keyword
    - name: exceptionDetailInfo
      type: keyword
    - name: insertTime
      type: date
            
# FileBeat自带的
- key: ecs
  title: ECS
  description: ECS Fields.
  fields:
  - name: '@timestamp'
    level: core
    required: true
    type: date
    description: 'Date/time when the event originated.

      This is the date/time extracted from the event, typically representing when
      the event was generated by the source.

      If the event source has no original timestamp, this value is typically populated
      by the first time the event was received by the pipeline.

      Required field for all events.'
    example: '2016-05-23T08:05:34.853Z'

执行 FileBeat

[root@test13 filebeat-7.9.3-linux-x86_64]# ls
data        fields.yml.bak  filebeat.reference.yml  filebeat.yml.bak  LICENSE.txt  modules.d   README.md
fields.yml  filebeat        filebeat.yml            kibana            module       NOTICE.txt  s.log
[root@test13 filebeat-7.9.3-linux-x86_64]# ./filebeat -e 

测试

新增数据到 vim /var/log2/test.log

2021-07-01 20:07:25 [XNIO-1 task-2] INFO  fileBeatLogData - 查询用户|4|com.internet.operator.controller..list()|GET|http://127.0.0.1:8080/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAcco909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{"code":200,"msg":"查询成功","rows":[],"took":2,"total":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X

查询结果发现日志已经进入到ES


在这里插入图片描述

个人公众号(大数据学习交流): hadoopwiki

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容