FileBeat + Pipeline 解析日志 保存至ElasticSearch(实战)

[toc]

FileBeat + Pipeline 解析日志 保存至ElasticSearch(实战)

目的

使用FileBeat收集日志,Pipeline解析日志,最终写入ES

日志数据

2021-07-01 20:07:25 [XNIO-1 task-2] INFO  fileBeatLogData - 查询用户|4|com.internet.operator.controller..list()|GET|http://127.0.0.1:8080/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAcco909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{"code":200,"msg":"查询成功","rows":[],"took":2,"total":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X

模拟Pipeline

POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "processors" : [
    
      {
        "dissect": {
          "field": "message",
          "pattern": "%{@logTimestamp} [%{logTthread}] %{loglevel} fileBeatLogData - %{logdata}"
        }
      },
      {
        "split": {
          "field": "logdata",
          "separator": "\\|",
          "target_field": "logdata"
        }
      },
      {
        "set": {
          "field": "actionOrFunction",
          "value": "{{logdata.0}}"
        }
      },
      {
        "set": {
          "field": "businessType",
          "value": "{{logdata.1}}"
        }
      },
      {
        "set": {
          "field": "callMethod",
          "value": "{{logdata.2}}"
        }
      },
      {
        "set": {
          "field": "requestMethod",
          "value": "{{logdata.3}}"
        }
      },
      {
        "set": {
          "field": "callLink",
          "value": "{{logdata.4}}"
        }
      },
      {
        "set": {
          "field": "loginUserIp",
          "value": "{{logdata.5}}"
        }
      },
      {
        "set": {
          "field": "userName",
          "value": "{{logdata.6}}"
        }
      },
      {
        "set": {
          "field": "userId",
          "value": "{{logdata.7}}"
        }
      },
      {
        "set": {
          "field": "paramOrInputData",
          "value": "{{logdata.8}}"
        }
      },
      {
        "set": {
          "field": "resultOrOutputData",
          "value": "{{logdata.9}}"
        }
      },
      {
        "set": {
          "field": "exceptionInfo",
          "value": "{{logdata.10}}"
        }
      },
      {
        "set": {
          "field": "systemEnv",
          "value": "{{logdata.11}}"
        }
      },
      {
        "set": {
          "field": "status",
          "value": "{{logdata.12}}"
        }
      },
      {
        "set": {
          "field": "fullLinkId",
          "value": "{{logdata.13}}"
        }
      },
      {
        "set": {
          "field": "subFullLinkId",
          "value": "{{logdata.14}}"
        }
      },
      {
        "set": {
          "field": "currentTimeMillisecond",
          "value": "{{logdata.15}}"
        }
      },
      {
        "convert": {
          "field": "currentTimeMillisecond",
          "type": "long"
        }
      },
      {
        "set": {
          "field": "detail",
          "value": "{{logdata.16}}"
        }
      },{
        "set": {
          "field": "other",
          "value": "{{logdata.17}}"
        }
      },
      {
        "set": {
          "field": "errorData",
          "value": "{{logdata.18}}"
        }
      },
      {
        "set": {
          "field": "errorDataSource",
          "value": "{{logdata.19}}"
        }
      },
      {
        "set": {
          "field": "errorDataDetail",
          "value": "{{logdata.20}}"
        }
      },
      {
        "set": {
          "field": "logTime",
          "value": "{{logdata.21}}"
        }
      },
      {
        "set": {
          "field": "processTime",
          "value": "{{logdata.22}}"
        }
      },
      {
        "convert": {
          "field": "processTime",
          "type": "long"
        }
      },
      {
        "set": {
          "field": "orgCode",
          "value": "{{logdata.23}}"
        }
      },
      {
        "set": {
          "field": "orgName",
          "value": "{{logdata.24}}"
        }
      },
      {
        "set": {
          "field": "exceptionDetailInfo",
          "value": "{{logdata.25}}"
        }
      },{
        "set": {
          "field": "message",
          "value": ""
        }
      },{
        "set": {
          "field": "logdata",
          "value": ""
        }
      },
      {
        "script": {
          "lang": "painless",
          "source": """ ctx.insertTime = new Date(System.currentTimeMillis()+1000l*60*60*8);  """
        }
      }
  ]
  },
  "docs": [
    {
      "_source": {
        "message": "2021-07-01 20:07:25 [XNIO-1 task-2] INFO  fileBeatLogData - 查询运营商宽带用户|4|com.bjga.internet.operator.controller.OperatorBroadbandController.list()|GET|http://127.0.0.1:8080/operator2/broadband/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAccount=%E5%8C%97%E4%BA%AC1%E5%B8%8256&installedPhone=639857&accountHolderName=%E4%B8%9C%E7%A5%A5%E6%9E%97&operatorCreditCode=91110108101909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{\"code\":200,\"msg\":\"查询成功\",\"rows\":[],\"took\":2,\"total\":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X"
      }
    }
  ]
}

创建pipeline

PUT _ingest/pipeline/logdatapipeline
{
  "description" : "outer pipeline",
  "processors" : [
    
      {
        "dissect": {
          "field": "message",
          "pattern": "%{@logTimestamp} [%{logTthread}] %{loglevel} fileBeatLogData - %{logdata}"
        }
      },
      {
        "split": {
          "field": "logdata",
          "separator": "\\|",
          "target_field": "logdata"
        }
      },
      {
        "set": {
          "field": "actionOrFunction",
          "value": "{{logdata.0}}"
        }
      },
      {
        "set": {
          "field": "businessType",
          "value": "{{logdata.1}}"
        }
      },
      {
        "set": {
          "field": "callMethod",
          "value": "{{logdata.2}}"
        }
      },
      {
        "set": {
          "field": "requestMethod",
          "value": "{{logdata.3}}"
        }
      },
      {
        "set": {
          "field": "callLink",
          "value": "{{logdata.4}}"
        }
      },
      {
        "set": {
          "field": "loginUserIp",
          "value": "{{logdata.5}}"
        }
      },
      {
        "set": {
          "field": "userName",
          "value": "{{logdata.6}}"
        }
      },
      {
        "set": {
          "field": "userId",
          "value": "{{logdata.7}}"
        }
      },
      {
        "set": {
          "field": "paramOrInputData",
          "value": "{{logdata.8}}"
        }
      },
      {
        "set": {
          "field": "resultOrOutputData",
          "value": "{{logdata.9}}"
        }
      },
      {
        "set": {
          "field": "exceptionInfo",
          "value": "{{logdata.10}}"
        }
      },
      {
        "set": {
          "field": "systemEnv",
          "value": "{{logdata.11}}"
        }
      },
      {
        "set": {
          "field": "status",
          "value": "{{logdata.12}}"
        }
      },
      {
        "set": {
          "field": "fullLinkId",
          "value": "{{logdata.13}}"
        }
      },
      {
        "set": {
          "field": "subFullLinkId",
          "value": "{{logdata.14}}"
        }
      },
      {
        "set": {
          "field": "currentTimeMillisecond",
          "value": "{{logdata.15}}"
        }
      },
      {
        "convert": {
          "field": "currentTimeMillisecond",
          "type": "long"
        }
      },
      {
        "set": {
          "field": "detail",
          "value": "{{logdata.16}}"
        }
      },{
        "set": {
          "field": "other",
          "value": "{{logdata.17}}"
        }
      },
      {
        "set": {
          "field": "errorData",
          "value": "{{logdata.18}}"
        }
      },
      {
        "set": {
          "field": "errorDataSource",
          "value": "{{logdata.19}}"
        }
      },
      {
        "set": {
          "field": "errorDataDetail",
          "value": "{{logdata.20}}"
        }
      },
      {
        "set": {
          "field": "logTime",
          "value": "{{logdata.21}}"
        }
      },
      {
        "set": {
          "field": "processTime",
          "value": "{{logdata.22}}"
        }
      },
      {
        "convert": {
          "field": "processTime",
          "type": "long"
        }
      },
      {
        "set": {
          "field": "orgCode",
          "value": "{{logdata.23}}"
        }
      },
      {
        "set": {
          "field": "orgName",
          "value": "{{logdata.24}}"
        }
      },
      {
        "set": {
          "field": "exceptionDetailInfo",
          "value": "{{logdata.25}}"
        }
      },{
        "set": {
          "field": "message",
          "value": ""
        }
      },{
        "set": {
          "field": "logdata",
          "value": ""
        }
      },
      {
        "script": {
          "lang": "painless",
          "source": """ ctx.insertTime = new Date(System.currentTimeMillis()+1000l*60*60*8);  """
        }
      }
  ]
}

创建FileBeat配置文件 filebeat.yml

读取 /var/log2/*.log 文件写入ES

filebeat.inputs:
- type: log
  enabled: true
#读取的文件
  paths:
    - /var/log2/*.log
# 标记,在后面用于判断写入的索引
  fields:
    type: logDataPipeline
    source: common
- type: log
  enabled: true
  paths:
    - /var/log/1.log
    - /var/log/2.log
  fields:
    source: exception
- type: log
  enabled: true
  paths:
    - /var/log/3.log


filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

# ======================= Elasticsearch template setting =======================

setup.template.settings:
  index.number_of_shards: 1
  #index.codec: best_compression
  #_source.enabled: false
  # # 生成index模板的名称
#允许自动生成index模板
setup.template.enabled: true
# # 如果存在模块则覆盖
setup.template.overwrite: true
# # # 生成index模板时字段配置文件
setup.template.fields: fields.yml
setup.template.name: "logdata" 
# # # 生成index模板匹配的index格式       
setup.template.pattern: "logdata-*" 
setup.ilm.enabled: auto
# 这里一定要注意 会在alias后面自动添加-*
setup.ilm.rollover_alias: "park-ssm"
setup.ilm.pattern: "{now/d}"
# # # 生成kibana中的index pattern,便于检索日志
# #setup.dashboards.index: myfilebeat-7.0.0-*
# #filebeat默认值为auto,创建的elasticsearch索引生命周期为50GB+30天。如果不改,可以不用设置
setup.ilm.enabled: false

# =================================== Kibana ===================================
setup.kibana:


# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["10.8.10.12:9200"]
  index: "logdata-%{+yyyy.MM.dd}"
  indices:
    - index: "logdata-%{[fields.source]}-%{+yyyy.MM.dd}"
      when.equals: 
        fields: 
          source: "common"
    - index: "logdata-%{[fields.source]}-%{+yyyy.MM.dd}"
      when.equals:
        fields:
          source: "exception"
  pipelines:
    - pipeline: logDataPipeline
      when.equals:
        fields.type: logDataPipeline

# ================================= Processors =================================
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

创建自定义字段 FileBeat fields.yml

# 我们自定义的
- key: rbt
  title: rbt
  description: rbt log data fields 
  fields:
    - name: logdata
      type: keyword
    - name: actionOrFunction
      type: keyword
    - name: businessType
      type: keyword
    - name: callMethod
      type: keyword
    - name: requestMethod
      type: keyword
    - name: callLink
      type: keyword
    - name: loginUserIp
      type: keyword
    - name: userName
      type: keyword
    - name: userId
      type: keyword
    - name: paramOrInputData
      type: keyword
    - name: resultOrOutputData
      type: keyword
    - name: exceptionInfo
      type: keyword
    - name: systemEnv
      type: keyword
    - name: status
      type: long
    - name: fullLinkId
      type: keyword
    - name: subFullLinkId
      type: keyword
    - name: currentTimeMillisecond
      type: long
    - name: detail
      type: keyword
    - name: other
      type: keyword
    - name: errorData
      type: keyword
    - name: errorDataSource
      type: keyword
    - name: errorDataDetail
      type: keyword
    - name: logTime
      type: keyword
    - name: processTime
      type: long
    - name: orgCode
      type: keyword
    - name: orgName
      type: keyword
    - name: exceptionDetailInfo
      type: keyword
    - name: insertTime
      type: date
            
# FileBeat自带的
- key: ecs
  title: ECS
  description: ECS Fields.
  fields:
  - name: '@timestamp'
    level: core
    required: true
    type: date
    description: 'Date/time when the event originated.

      This is the date/time extracted from the event, typically representing when
      the event was generated by the source.

      If the event source has no original timestamp, this value is typically populated
      by the first time the event was received by the pipeline.

      Required field for all events.'
    example: '2016-05-23T08:05:34.853Z'

执行 FileBeat

[root@test13 filebeat-7.9.3-linux-x86_64]# ls
data        fields.yml.bak  filebeat.reference.yml  filebeat.yml.bak  LICENSE.txt  modules.d   README.md
fields.yml  filebeat        filebeat.yml            kibana            module       NOTICE.txt  s.log
[root@test13 filebeat-7.9.3-linux-x86_64]# ./filebeat -e 

测试

新增数据到 vim /var/log2/test.log

2021-07-01 20:07:25 [XNIO-1 task-2] INFO  fileBeatLogData - 查询用户|4|com.internet.operator.controller..list()|GET|http://127.0.0.1:8080/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAcco909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{"code":200,"msg":"查询成功","rows":[],"took":2,"total":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X

查询结果发现日志已经进入到ES


在这里插入图片描述

个人公众号(大数据学习交流): hadoopwiki

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容