Flume 示例

示例1 指定端口采集数据输出到控制台

# 命名组件
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1

# 设置sources
agent1.sources.source1.type = netcat
agent1.sources.source1.bind = host000
agent1.sources.source1.port = 44444

# 设置channel
agent1.channels.channel1.type = memory

# 设置sink
agent1.sinks.sink1.type = logger

# 绑定sources和sink到channel
# source可以绑定多个channel
agent1.sources.source1.channels = channel1
# sink只能绑定一个channel
agent1.sinks.sink1.channel = channel1

运行:# --conf 指定 Flume 自带配置文件所在的文件夹,--conf-file 指定我们自定义的 Flume 配置文件。

$ flume-ng agent \
 --name agent1 \
 --conf $FLUME_HOME/conf \
 --conf-file /home/user000/confs/flume_conf/example1.conf \
 -Dflume.root.logger=INFO,console

测试

$ telnet host000 44444
hello
baozi

再看看flume输出:Event是Flume处理数据的基础单元

2018-10-27 10:01:44,074 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 0D                               hello. }
2018-10-27 10:01:45,126 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 62 61 6F 7A 69 0D                               baozi. }

示例2 监控文件新增数据到控制台

$ vim example2.conf

agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1

agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F ~/data/data.log
agent1.sources.source1.shell = /bin/sh -c

agent1.channels.channel1.type = memory

agent1.sinks.sink1.type = logger

agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

运行

$ flume-ng agent \
 --name agent1 \
 --conf $FLUME_HOME/conf \
 --conf-file /home/user000/confs/flume_conf/example2.conf \
 -Dflume.root.logger=INFO,console

测试

$ echo hello >> ~/data/data.log
$ echo baozi >> ~/data/data.log
2018-10-27 10:24:40,884 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F                                  hello }
2018-10-27 10:24:52,548 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 62 61 6F 7A 69                                  baozi }

示例3 从一台服务器到另一台

$ vim exec-memory-avro.conf

exec-memory-avro.sources = exec-source
exec-memory-avro.channels = memory-channel
exec-memory-avro.sinks = avro-sink

exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F ~/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

exec-memory-avro.channels.memory-channel.type = memory

exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = host000
exec-memory-avro.sinks.avro-sink.port = 44444

exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel

$ vim avro-memory-logger.conf

avro-memory-logger.sources = avro-source
avro-memory-logger.channels = memory-channel
avro-memory-logger.sinks = logger-sink

avro-memory-logger.sources.avro-source.type = avro
avro-memory-logger.sources.avro-source.bind = host000
avro-memory-logger.sources.avro-source.port = 44444

avro-memory-logger.channels.memory-channel.type = memory

avro-memory-logger.sinks.logger-sink.type = logger

avro-memory-logger.sources.avro-source.channels = memory-channel
avro-memory-logger.sinks.logger-sink.channel = memory-channel

运行:先启动 avro-memory-logger.conf 对应的 agent,再启动 exec-memory-avro.conf 对应的 agent(下游先就绪,上游才能成功建立 avro 连接)

$ flume-ng agent \
--name avro-memory-logger \
--conf $FLUME_HOME/conf \
 --conf-file  /home/user000/confs/flume_conf/avro-memory-logger.conf  \
-Dflume.root.logger=INFO,console

$ flume-ng agent \
--name exec-memory-avro \
--conf $FLUME_HOME/conf \
 --conf-file  /home/user000/confs/flume_conf/exec-memory-avro.conf  \
-Dflume.root.logger=INFO,console

测试:之前写到data.log里的数据也一并被打印出来

echo hello2 >> ~/data/data.log
echo baozi2 >> ~/data/data.log

2018-10-27 11:14:25,841 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F                                  hello }
2018-10-27 11:14:25,841 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 62 61 6F 7A 69                                  baozi }
2018-10-27 11:15:06,146 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 32                               hello2 }
2018-10-27 11:15:10,148 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 62 61 6F 7A 69 32                               baozi2 }

示例4 Flume 与Kafka整合示例

Flume相当于生产者,发送数据给Kafka,Kafka启动消费者。
Kafka的各项配置在Flume不同版本中可能不一致。
$ vim exec-memory-avro.conf

exec-memory-avro.sources = exec-source
exec-memory-avro.channels = memory-channel
exec-memory-avro.sinks = avro-sink

exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F ~/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

exec-memory-avro.channels.memory-channel.type = memory

exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = host000
exec-memory-avro.sinks.avro-sink.port = 44444

exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel

$ vim avro-memory-kafka.conf

avro-memory-kafka.sources = avro-source
avro-memory-kafka.channels = memory-channel
avro-memory-kafka.sinks = kafka-sink

avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = host000
avro-memory-kafka.sources.avro-source.port = 44444

avro-memory-kafka.channels.memory-channel.type = memory

avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.brokerList = host000:9092
avro-memory-kafka.sinks.kafka-sink.topic = hello_test
avro-memory-kafka.sinks.kafka-sink.batchSize = 5
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1

avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel

运行

$ flume-ng agent \
--name avro-memory-kafka \
--conf $FLUME_HOME/conf \
 --conf-file  /home/user000/confs/flume_conf/avro-memory-kafka.conf  \
-Dflume.root.logger=INFO,console

$ flume-ng agent \
--name exec-memory-avro \
--conf $FLUME_HOME/conf \
 --conf-file  /home/user000/confs/flume_conf/exec-memory-avro.conf  \
-Dflume.root.logger=INFO,console

$ kafka-console-consumer.sh --zookeeper host000:2181 --topic hello_test --from-beginning
# 注意:--zookeeper 仅适用于旧版 Kafka;新版本(2.x 及以后)请改用 --bootstrap-server host000:9092

测试

$ echo aaa >> ~/data/data.log
$ echo bbb >> ~/data/data.log
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 面对以上的问题,我们如何将这些日志移动到hdfs集群上尼???? 第一种方案:使用shell脚本cp 文件,然后通...
    机灵鬼鬼阅读 1,453评论 1 1
  • Flume 日志收集系统 #安装 在node01下 在hadoop用户下 cd ~/apps 在此路径下 解压后是...
    快乐大数据阅读 609评论 0 0
  • Flume是一个分布式的、高可靠的、高可用的用于高效收集、聚合、移动大量日志数据的框架(Flume is a di...
    Sx_Ren阅读 3,802评论 0 1
  • flume 有三大组件source 、channel和sink,各个组件之间都可以相互组合使用,各组件间耦合度低。...
    三万_chenbing阅读 5,897评论 0 5
  • 那时候天刚好下着雨,空荡荡的街头在阴冷的雨天格外寂寞。雨打在地上,也打在每个人的心里。路上闪烁着不同色彩的灯火,每...
    sjhxz阅读 210评论 0 0