Flume 日志收集系统
#安装
在node01下
在hadoop用户下
cd ~/apps
在此路径下 解压后是 flume-1.8.0
ll
cd conf
再把演示用的各种conf文件上传给node01
再通过scp命令把 这个flume-1.8.0传给另外4给node
cat example.conf
sources = r1 #多个source可以用空格隔开
channels = c1
sinks = k1
sources.r1.type = netcat #绑定一个本地端口,往flume里面传输数据
sources.r1.bind = localhost
sources.r1.port = 44444
sources.r1.channels = c1 #把source和channel关联起来
channels.c1.type = memory ¥在内存中存储
channels.c1.capacity = 1000 #可以存1000个event
channels.c1.transactionCapacity = 100 #一次事务提交多少给event
sinks.k1.type =logger #以日志的形式在黑窗口中打出来
sinks.k1.channel = c1 #在哪个channel中拉取数据
启动flume
再新开一个node01
telnet localhost 44444
访问这个端口
输入 哈哈哈
#详细介绍flume的组件
一个event在一个agent中的传输处理流程如下:source--interceptor--selector->channel->sink processor--sink->中心存储/下一级
agent
#1.avro source
Avro Source:支持Avro协议,接收RPC事件请求。Avro Source通过监
听Avro端口接收外部Avro客户端流事件(event),在Flume的多层架
构中经常被使用接收上游Avro Sink发送的event
? 关键参数说明
? type:类型名称avro
? bind :绑定的IP
? port :监听的端口
? threads:接收请求的线程数,当需要接收多个avro客户端的数据流时要设置合
适的线程数,否则会造成avro客户端数据流积压
? compression-type:是否使用压缩,如果使用压缩设则值为“deflate”,avro
source一般用于多个Agent组成的数据流,接收来自avro sink的event,如果avro
source设置了压缩,name上一阶段的avro sink也要设置压缩。默认值none
? channels:Source对接的Channel名称
#2.exec source
Exec Source:支持Linux命令,收集标准输的方式监听指定文件。
Exec Source可以实现实时的消息传输,但文件的位置,不支持断点续传,当Exec Source后续增加的消息丢失,一般在测试环境使用
关键参数说明
? type :source类型为exec
? command :Linux命令
? channels :Source对接的Channel名称。
-----演示Avro Source和exec Source
在启动前 先vim avrosource.conf
修改绑定的IP为192.16.183.101
在node01中启动 bin/flume-ng agent --conf conf --conf-file conf/avrosource.conf --name avroagent -Dflume.root.logger=INFO,console
接着新开一个node01(1)的窗口
mkdir -p /home/hadoop/apps/flume/execsource/
touch exectest.log
echo 123 > exectest.log
echo 34567 >> exectest.log
启动execagent
再新建一个窗口node01(2),启动execagent
再回到node01(1)的窗口
cd /home/hadoop/apps/flume/execsource/
echo 8040 >> exectest.log
原理是 execsource.conf 监听node01/home/hadoop/apps/flume/execsource/exectest.log中日志文件的变化,收集日志里面的数据再传给avrosource
他接收后,在黑窗口中打印出数据
----演示Spooling Directory Source
Spooling Directory Source:监听一个文件夹,收集文件夹下文件数据,收集完文件数据会将文件名称的后缀改为.COMPLETED
缺点不支持已存在文件新增数据的收集,且不能够对嵌套文件夹递归监听
关键参数说明
? type :source类型为spooldir
? spoolDir:source监听的文件夹
? fileHeader :是否添加文件的绝对路径到event的header中,默认值false
? fileHeaderKey:添加到event header中文件绝对路径的键值,默认值file
? fileSuffix:收集完新文件数据给文件添加的后缀名称,默认值:
.COMPLETED
? channels :Source对接的Channel名称
先做好预备工作
在node01下
cd ~/apps/flume
mkdir spoolDir
mkdir selector
mkdir taildir
mkdir filechannel
mkdir multiplexing
启动Spooling Directory Source
在node01的一个新窗口中 bin/flume-ng agent --conf conf --conf-file conf/spooldirsource.conf --name a1 -Dflume.root.logger=INFO,console
换一个窗口
cd ~/apps/flume/spoolDir
echo 134 > test1
echo 477 >> test1 不会响应,也不能监听子文件夹下面的数据
echo 477 >> test2 新的文件就会收集到
---演示Kafka source
先创建kafka 主题 先在node03,node04,node05上启动kafka
在node03上, cd ~/apps/kafka_2.11-0.10.2.1
bin/kafka-topics.sh --create --zookeeper 192.168.183.101:2181,192.168.183.102:2181,192.168.183.103:2181,192.168.183.104:2181,192.168.183.105:2181, --replication-factor 2 --partitions 3 --topic flumetopictest1
在node01上先vim kafkasource.conf的主机名称
然后
bin/flume-ng agent --conf conf --conf-file conf/kafkasource.conf --name kafkasourceagent -Dflume.root.logger=INFO,console
在node03上启动kafka的客户端
cd /home/hadoop/apps/kafka_2.11-0.10.2.1
bin/kafka-console-producer.sh --broker-list 192.168.183.103:9092,192.168.183.104:9092 --topic flumetopictest1
写入12345
node01上就会输出写入的结果
---演示taildir source
Taildir Source:监听一个文件夹或者文件,通过正则表达式匹配需要监听的数据源文件,Taildir Source通过将监听的文件位置写入到文件中来实现断点
续传,并且能够保证没有重复数据的读取
关键参数说明
? type:source类型TAILDIR
? positionFile:保存监听文件读取位置的文件路径
? idleTimeout:关闭空闲文件延迟时间,如果有新的记录添加到已关闭的空闲文件,taildir srouce将继续打开该空闲文件,默认值120000毫秒(2分钟)
? writePosInterval:向保存读取位置文件中写入读取文件位置的时间间隔,默认值
3000毫秒
? batchSize:批量写入channel最大event数,默认值100
? maxBackoffSleep:每次最后一次尝试没有获取到监听文件最新数据的最大延迟时间,默认值5000毫秒
先做好预备工作 在node01下
cd ~/apps/flume/taildir
mkdir test1
mkdir test2
mkdir position
在node01中启动 在flume-1.8.0文件夹下
bin/flume-ng agent --n a1 --conf conf --conf-file conf/taildirsource.conf --name taildiragent -Dflume.root.logger=INFO,console
在另一个窗口中
cd ~/apps/flume/taildir/test1
echo 123 > test.log
cd ~/apps/flume/taildir/test2
echo 4590 > file2.log
----演示filechannel
先做好预备工作 在node01下
cd /home/hadoop/apps/flume/filechannel
mkdir data
mkdir checkpoint
mkdir backup
cd /home/hadoop/apps/flume-1.8.0
bin/flume-ng agent --conf conf --conf-file conf/filechannle.conf --name a1 -Dflume.root.logger=INFO,console
在node01的另一个窗口
telnet localhost 44444
发送12345
再开一个node01的窗口
cd /home/hadoop/apps/flume/filechannel/data 发现已经创建了文件。
cd /home/hadoop/apps/flume/filechannel/checkpoint 已经创建了检查点文件
----演示kafkachannel(首选)
存储容量更大,容错更好
关键参数说明:
? type:Kafka Channel类型org.apache.flume.channel.kafka.KafkaChannel
? kafka.bootstrap.servers:Kafka broker列表,格式为ip1:port1, ip2:port2…,建
议配置多个值提高容错能力,多个值之间用逗号隔开
? kafka.topic:topic名称,默认值“flume-channel”
? kafka.consumer.group.id:Consumer Group Id,全局唯一
? parseAsFlumeEvent:是否以Avro FlumeEvent模式写入到Kafka Channel中,
默认值true,event的header信息与event body都写入到kafka中
? pollTimeout:轮询超时时间,默认值500毫秒
? kafka.consumer.auto.offset.reset:earliest表示从最早的偏移量开始拉取,latest
表示从最新的偏移量开始拉取,none表示如果没有发现该Consumer组之前拉
取的偏移量则抛异常
首先做好预备工作
在node01上
cd /home/hadoop/apps/flume-1.8.0/conf
vim kafkachannel.conf(修改kafka broker的机器号)
在node03上
创建一个topic
cd /home/hadoop/apps/kafka_2.11-0.10.2.1
bin/kafka-topics.sh --create --zookeeper 192.168.183.101:2181 --replication-factor 1 --partitions 3 --topic flumechannel2
在node01上启动agent
cd /home/hadoop/apps/flume-1.8.0
bin/flume-ng agent --conf conf --conf-file conf/kafkachannel.conf --name a1 -Dflume.root.logger=INFO,console
在node01的一个新窗口中
telnet localhost 44444 发送数据123456789
----演示HDFSsink
在node01上,cd /home/hadoop/apps/flume-1.8.0
bin/flume-ng agent --conf conf --conf-file conf/hdfssink.conf --name a1 -Dflume.root.logger=INFO,console
在node01上
使用telnet发送数据
telnet localhost 44444 发送12345555
在node01的一个新窗口上,
hadoop fs -ls /data/flume/20180811 可以发现一个前缀为hdfssink的文件
---演示kafkasink(略)
---演示replicating seletor
cd /home/hadoop/apps/flume-1.8.0/conf
先修改 vim replicating_selector.conf kafka的server
修好后
在node03上创建kakfka的 topic
cd /home/hadoop/apps/kafka_2.11-0.10.2.1
bin/kafka-topics.sh --create --zookeeper 192.168.183.101:2181 --replication-factor 1 --partitions 3 --topic FlumeSelectorTopic1
在node01上启动agent
cd /home/hadoop/apps/flume-1.8.0/
bin/flume-ng agent --conf conf --conf-file conf/replicating_selector.conf --name a1
新开一个node01的窗口
telnet localhost 44444
再在一个新窗口中可以发现
cd /home/hadoop/apps/flume/selector 发现已经写入数据
在node03 启动kafka客户端监听主题
bin/kafka-console-consumer.sh --zookeeper 192.168.183.101:2181 --from-beginning --topic FlumeSelectorTopic1
回到node01的窗口
telnet localhost 44444 发送数据 78834 发送node03的kafka已经读到了
----演示Multiplexing Channel Selector
在node01上 cd ~/apps/flume/multiplexing
mkdir k11
mkdir k22
mkdir k33
修改四个配置文件绑定的端口号
avro_sink1.conf avro_sink2.conf avro_sink3.conf multiplexing.conf
在node01上
bin/flume-ng agent --conf conf --conf-file conf/multiplexing_selector.conf --name a3 -Dflume.root.logger=INFO,console
再分别启动三个阶段的agent
在node01的一个新窗口下
cd /home/hadoop/apps/flume-1.8.0
bin/flume-ng agent --conf conf --conf-file conf/avro_sink1.conf --name agent1 >/dev/null 2>&1 &
bin/flume-ng agent --conf conf --conf-file conf/avro_sink2.conf --name agent2 >/dev/null 2>&1 &
bin/flume-ng agent --conf conf --conf-file conf/avro_sink3.conf --name agent3 >/dev/null 2>&1 &
jps后
发现的application进程就是新的agent进程
看看端口在不在
lsof -i:44444
lsof -i:44445
lsof -i:44446 发现端口正常监听
在node01的一个新窗口中
telnet localhost 44444 发送4444444444
telnet localhost 44445 发送5555555555
telnet localhost 44446 发送6666666666
查看
cd /home/hadoop/apps/flume/multiplexing/k11
cat 1533997666003-10
发现了4444444444这个数据
cd /home/hadoop/apps/flume/multiplexing/k12
cat 1533997666003-10
发现5555555555这个数据
cd /home/hadoop/apps/flume/multiplexing/k12
cat 1533997666003-10
发现6666666666这个数据
---sink processor
多个sink processor需要
负载均衡或者容错的processor