flume:一个例子的分析(一)

数据采集方案


采用 tail-dir 的方式,实时读取 ngix 日志。

Agent 负责对单机的日志收集工作 , collector 负责接收Agent层发送的日志, 数据分流。

物理架构图


每个flume agent 通过负载均衡的方式发送给2个flume collector。

具体 flume 配置

Agent:
ngixAgent.sources = tail_dir_source
ngixAgent.channels = memory_channel
ngixAgent.sinks = sink1 sink2


# Configure source
ngixAgent.sources.tail_dir_source.type = org.apache.flume.source.taildir.TaildirSource
ngixAgent.sources.tail_dir_source.channels = memory_channel
# 重新执行的间隔,单位为毫秒
ngixAgent.sources.tail_dir_source.restartThrottle = 10000
# 判断是否命令一次执行成功后需要重新执行
ngixAgent.sources.tail_dir_source.restart = true
ngixAgent.sources.tail_dir_source.logStdErr = true
ngixAgent.sources.tail_dir_source.positionFile = /home/logsss/positionfile/taildir_position.json
ngixAgent.sources.tail_dir_source.filegroups = f1
ngixAgent.sources.tail_dir_source.filegroups.f1 = /usr/local/ver01/tengine/logs/access.log
# 一次性你可以处理batchSize个event,batchsize <= transactionCapacity <= capacity
ngixAgent.sources.tail_dir_source.batchSize = 100
ngixAgent.sources.tail_dir_source.backoffSleepIncrement  = 1000
ngixAgent.sources.tail_dir_source.maxBackoffSleep  = 5000
ngixAgent.sources.tail_dir_source.recursiveDirectorySearch = true
ngixAgent.sources.tail_dir_source.yarnApplicationHeader = true
ngixAgent.sources.tail_dir_source.yarnContainerHeader = true


# Configure channel
ngixAgent.channels.memory_channel.type = memory
# The maximum number of events the channel will take from a source or give to a sink per transaction
ngixAgent.channels.memory_channel.transactionCapacity = 50000
# The maximum number of events stored in the channel
ngixAgent.channels.memory_channel.capacity = 100000


# Configure sinks
ngixAgent.sinks.sink1.channel = memory_channel
ngixAgent.sinks.sink1.type = avro
ngixAgent.sinks.sink1.hostname = 172.31.38.39
ngixAgent.sinks.sink1.port = 44447
ngixAgent.sinks.sink1.connect-timeout = 120000
ngixAgent.sinks.sink1.request-timeout = 120000
ngixAgent.sinks.sink2.channel = memory_channel
ngixAgent.sinks.sink2.type = avro
ngixAgent.sinks.sink2.hostname = 172.31.58.11 
ngixAgent.sinks.sink2.port = 44447
ngixAgent.sinks.sink2.connect-timeout = 120000
ngixAgent.sinks.sink2.request-timeout = 120000


# Configure load_balance
ngixAgent.sinkgroups = g1
ngixAgent.sinkgroups.g1.sinks = sink1 sink2
ngixAgent.sinkgroups.g1.processor.type = load_balance
ngixAgent.sinkgroups.g1.processor.priority.sink1 = 7
ngixAgent.sinkgroups.g1.processor.priority.sink2 = 9
ngixAgent.sinkgroups.g1.processor.backoff = true
ngixAgent.sinkgroups.g1.processor.selector = round_robin
ngixAgent.sinkgroups.g1.processor.selector.maxTimeOut=30000
Collector:
nginx-collector.sources = src1
nginx-collector.channels = ch1 ch3
nginx-collector.sinks = hdfssink wrongpathsink


# Configure source: src1
nginx-collector.sources.src1.channels = ch1 ch3 
nginx-collector.sources.src1.type = avro
nginx-collector.sources.src1.restartThrottle = 10000
nginx-collector.sources.src1.restart = true
nginx-collector.sources.src1.logStdErr = true
nginx-collector.sources.src1.bind = 0.0.0.0
nginx-collector.sources.src1.port = 44447
nginx-collector.sources.src1.threads = 12

# 定义source selector     
nginx-collector.sources.src1.selector.type = multiplexing
nginx-collector.sources.src1.selector.header = dataroute
# key 带有wrong 字段的都输出为错误路径,使用 ch3 这个 channel
nginx-collector.sources.src1.selector.mapping.wrong = ch3
# key 带有right 字段的都输出为正确路径,使用 ch1
nginx-collector.sources.src1.selector.mapping.right = ch1 
# memory channel 默认输出错误路径 按年月日拆分
nginx-collector.sources.src1.selector.default = ch3 


# 定义flume 拦截器
nginx-collector.sources.src1.interceptors = i1 i2 i3
#nginx-collector.sources.src1.interceptors.i1.type=org.flume.glbg.interceptors.EventTimeInterceptor$Builder
nginx-collector.sources.src1.interceptors.i1.type=com.glbg.flume.interceptors.LogAnalysisInterceptor$Builder

nginx-collector.sources.src1.interceptors.i1.preserveExisting = false
nginx-collector.sources.src1.interceptors.i2.type = timestamp  
nginx-collector.sources.src1.interceptors.i3.type=static
nginx-collector.sources.src1.interceptors.i3.key=topic
nginx-collector.sources.src1.interceptors.i3.preserveExisting = false
nginx-collector.sources.src1.interceptors.i3.value=glbg-analitic


nginx-collector.channels.ch1.type = file
nginx-collector.channels.ch1.transactionCapacity = 50000
nginx-collector.channels.ch1.capacity = 1000000
nginx-collector.channels.ch1.checkpointDir = /opt/cloudera/flumelog/hdfs/file/checkpoint
nginx-collector.channels.ch1.dataDirs = /opt/cloudera/flumelog/hdfs/file/data

nginx-collector.channels.ch3.type = memory
nginx-collector.channels.ch3.transactionCapacity = 50000
nginx-collector.channels.ch3.capacity = 1000000


nginx-collector.sinks.hdfssink.type=hdfs
nginx-collector.sinks.hdfssink.hdfs.path=hdfs://glbgnameservice/globalegrow/nginx-log/%{YEAR}/%{MONTH}/%{DAY}/%{ubcd}/%{logtype}
nginx-collector.sinks.hdfssink.channel=ch1
nginx-collector.sinks.hdfssink.hdfs.fileType = DataStream
nginx-collector.sinks.hdfssink.hdfs.filePrefix = nginx.%{YEAR}-%{MONTH}-%{DAY}
nginx-collector.sinks.hdfssink.hdfs.fileSuffix=.log
nginx-collector.sinks.hdfssink.hdfs.minBlockReplicas=1
nginx-collector.sinks.hdfssink.hdfs.rollInterval=0
# 文件到达多大再产生一个新文件,大约是 128M
nginx-collector.sinks.hdfssink.hdfs.rollSize=132692539
nginx-collector.sinks.hdfssink.hdfs.idleTimeout=300 
nginx-collector.sinks.hdfssink.hdfs.batchSize=500
nginx-collector.sinks.hdfssink.hdfs.rollCount=0
nginx-collector.sinks.hdfssink.hdfs.round=true
nginx-collector.sinks.hdfssink.hdfs.roundValue=10
nginx-collector.sinks.hdfssink.hdfs.roundUnit=minute
nginx-collector.sinks.hdfssink.hdfs.threadsPoolSize=60
nginx-collector.sinks.hdfssink.hdfs.rollTimerPoolSize=6
nginx-collector.sinks.hdfssink.hdfs.useLocalTimeStamp=true



nginx-collector.sinks.wrongpathsink.type=hdfs
nginx-collector.sinks.wrongpathsink.hdfs.path=hdfs://glbgnameservice/wrongpath/%Y/%m/%d/%{logtype}
nginx-collector.sinks.wrongpathsink.channel=ch3
nginx-collector.sinks.wrongpathsink.hdfs.fileType = DataStream
nginx-collector.sinks.wrongpathsink.hdfs.filePrefix = nginx.%Y-%m-%d
nginx-collector.sinks.wrongpathsink.hdfs.fileSuffix=.log
nginx-collector.sinks.wrongpathsink.hdfs.minBlockReplicas=1
nginx-collector.sinks.wrongpathsink.hdfs.rollInterval=0
nginx-collector.sinks.wrongpathsink.hdfs.rollSize=132692539
nginx-collector.sinks.wrongpathsink.hdfs.idleTimeout=600 
nginx-collector.sinks.wrongpathsink.hdfs.batchSize=100
nginx-collector.sinks.wrongpathsink.hdfs.rollCount=0
nginx-collector.sinks.wrongpathsink.hdfs.round=true
nginx-collector.sinks.wrongpathsink.hdfs.roundValue=2
nginx-collector.sinks.wrongpathsink.hdfs.roundUnit=minute
nginx-collector.sinks.wrongpathsink.hdfs.threadsPoolSize=30
nginx-collector.sinks.wrongpathsink.hdfs.rollTimerPoolSize=3
nginx-collector.sinks.wrongpathsink.hdfs.useLocalTimeStamp=true
processor

一个channel可以有多个sink应用场景有两种。
实现负载均衡:可以使用两个sink,向两个flume节点发送数据以减小flume节点服务器的压力。例子直接参考上面的配置。

backoff:开启后,故障的节点会列入黑名单,过一定时间再次发送,如果还失败,则等待是指数增长;直到达到最大的时间。
如果不开启,故障的节点每次都会被重试。

selector.maxTimeOut:最大的黑名单时间(单位为毫秒)。

source selector

selector 有两种,分别是 Replicating 和 Multiplexing。

  • Replicating
    会针对每一个 Event,拷贝到所有的 Channel 中
  • Multiplexing
    会根据 Event 中 Header 中的某个属性决定分发到哪个 Channel。
event
  1. event是flume中处理消息的基本单元,由零个或者多个header和正文body组成。
  2. Header 是 key/value 形式的,可以用来制造路由决策或携带其他结构化信息(如事件的时间戳或事件来源的服务器主机名)。你可以把它想象成和 HTTP 头一样提供相同的功能——通过该方法来传输正文之外的额外信息。
  3. Body是一个字节数组,包含了实际的内容。
flume 拦截器(Interceptor)
Timestamp Interceptor:

时间戳拦截器,将当前时间戳(毫秒)加入到 event header中,key名字为:timestamp,value 为当前时间戳:

a1.sources = s1
a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = timestamp

使用例子:

a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://ns1/flume/%Y%m%d
# 文件名前缀
a1.sinks.k1.hdfs.filePrefix = log_
# 文件名后缀
a1.sinks.k1.hdfs.fileSuffix = .log

在使用HDFS Sink时候,hdfs.path = hdfs://ns1/flume/%Y%m%d 表示在 hdfs 上的存储路径,%Y%m%d :占位符,表示具体的某一天,这一串占位符要求 event 的 header 中必须有 timestamp 这个 key。
也就是说,会根据 event 的 header 中保存的时间戳,把数据放入到对应时间的文件(比如 log_20180619.log)中。

会根据时间戳将数据写入相应的文件中。

Host Interceptor:

主机名拦截器。将运行Flume agent的 主机名 或者 IP 地址加入到 events header 中。key名字为:host, value为当前机器的 hostname 或者 ip。

a1.sources.s1.interceptors.i1.type = host
a1.sources.s1.interceptors.i1.hostHeader = hostname

例子:

a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://ns1/flume/%Y%m%d
a1.sinks.k1.hdfs.filePrefix =  log_%{hostname}

在 hdfs 对应目录下生成文件名为 log_zyb01.log 的文件

Static Interceptor:

可以在event的header中添加自定义的 key 和 value。

a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = static_key
a1.sources.r1.interceptors.i1.value = static_value

例子:

a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://ns1/flume/%Y%m%d
a1.sinks.k1.hdfs.filePrefix =  log_%{static_key}

最终在 hdfs 上生成一个文件名为 log_static_key.log 的文件。

Regex Filtering Interceptor:

该拦截器使用正则表达式过滤原始 events 中的内容。

配置示例如下:

## source 拦截器
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^lxw1234.*
a1.sources.r1.interceptors.i1.excludeEvents = false
 
# sink 配置 
a1.sinks.k1.type = logger

该配置表示过滤掉不是以 lxw1234 开头的 events。

如果 excludeEvents 设为 true,则表示过滤掉以 lxw1234 开头的 events。

Regex Extractor Interceptor:
该拦截器使用正则表达式抽取原始 events 中的内容,并将该内容加入events header 中。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,544评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,430评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,764评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,193评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,216评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,182评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,063评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,917评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,329评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,543评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,722评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,425评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,019评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,671评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,825评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,729评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,614评论 2 353