为什么选择Flume 1.7版本呢?
Flume 1.7 增加了很多新功能,而且对 Kafka 的支持更加全面。其中新增的 TAILDIR Source 可以自动监控目录下所有匹配文件的变化,我做的项目中用的就是这个 TAILDIR Source。
1. 安装
下载地址:apache-flume-1.7.0(https://archive.apache.org/dist/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz)
下载完成后,在/opt/ebohailife/目录下上传、解压
[ebohailife@e-bohailife-dat002 ~]$ tar -zxvf apache-flume-1.7.0-bin.tar.gz
- 检测安装是否成功:/opt/ebohailife/apache-flume-1.7.0-bin/bin/flume-ng version
打印以下信息,则表示安装成功了
[ebohailife@e-bohailife-dat002 conf]$ ../bin/flume-ng version
Flume 1.7.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707
Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016
From source with checksum 0d21b3ffdc55a07e1d08875872c00523
2. 开发
- 更改Flume配置文件
[ebohailife@e-bohailife-dat002 conf]$ echo $JAVA_HOME
/opt/ebohailife/jdk1.7.0_80
[ebohailife@e-bohailife-dat002 conf]$ cp flume-env.sh.template flume-env.sh
vi flume-env.sh # 修改flume-env.sh中JAVA_HOME变量的值
- 创建Flume任务的配置文件 taildir_behavior.conf
[ebohailife@e-bohailife-uat002 conf]$ vi taildir_behavior.conf
# Flume agent "a1": tails the behavior log files and publishes every new line
# to the Kafka topic behaviorlog_r1p3 through a durable file channel.
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# TAILDIR follows every file that matches a file group and picks up appended lines.
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
# Absolute directory plus a regular expression on the file name:
# any file under /opt/ebohailife/logs/rest whose name contains "behavior".
a1.sources.r1.filegroups.f1 = /opt/ebohailife/logs/rest/.*behavior.*
# JSON file in which the source records the last read position of each tailed file,
# so a restarted agent resumes instead of re-reading. NOTE(review): /tmp may be
# cleaned on reboot, which would lose the positions - confirm this is acceptable.
a1.sources.r1.positionFile = /tmp/flume/taildir_behavior_position.json
# Do not add a header carrying the absolute path of the originating file.
a1.sources.r1.fileHeader = false
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092
a1.sinks.k1.kafka.topic = behaviorlog_r1p3
# acks=1: wait for the partition leader's acknowledgement only.
a1.sinks.k1.kafka.producer.acks= 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# Number of events the sink takes from the channel and sends in one batch.
a1.sinks.k1.flumeBatchSize = 100
# a1.sinks.k1.topic = behaviorlog_r1p3
# Kafka broker list. The commented-out properties here (topic, brokerList,
# requiredAcks, batchSize) are deprecated as of Flume 1.7; the kafka.* /
# flumeBatchSize properties above replace them.
# a1.sinks.k1.brokerList = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092
# a1.sinks.k1.requiredAcks = 1
# a1.sinks.k1.batchSize = 100
# Use a channel which buffers events in file
a1.channels.c1.type = file
# Directory where the file channel stores its checkpoint
a1.channels.c1.checkpointDir = /opt/ebohailife/apache-flume-1.7.0-bin/checkpoint
# Directory where the file channel stores the event data
a1.channels.c1.dataDirs = /opt/ebohailife/apache-flume-1.7.0-bin/data
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 创建Flume任务的配置文件 taildir_phoneinfo.conf
[ebohailife@e-bohailife-uat002 conf]$ vi taildir_phoneinfo.conf
# Flume agent "a2": tails the phoneinfo log files and publishes every new line
# to the Kafka topic phoneinfolog_r1p3 through a durable file channel.
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
# TAILDIR follows every file that matches a file group and picks up appended lines.
a2.sources.r2.type = TAILDIR
a2.sources.r2.filegroups = f1
# Absolute directory plus a regular expression on the file name:
# any file under /opt/ebohailife/logs/rest whose name contains "phoneinfo".
a2.sources.r2.filegroups.f1 = /opt/ebohailife/logs/rest/.*phoneinfo.*
# JSON file in which the source records the last read position of each tailed file.
a2.sources.r2.positionFile = /tmp/flume/taildir_phoneinfo_position.json
# Do not add a header carrying the absolute path of the originating file.
a2.sources.r2.fileHeader = false
# Describe the sink
a2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a2.sinks.k2.kafka.bootstrap.servers = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092
a2.sinks.k2.kafka.topic = phoneinfolog_r1p3
# acks=1: wait for the partition leader's acknowledgement only.
a2.sinks.k2.kafka.producer.acks = 1
a2.sinks.k2.kafka.producer.linger.ms = 1
# Number of events the sink takes from the channel and sends in one batch.
a2.sinks.k2.flumeBatchSize = 100
# a2.sinks.k2.topic = phoneinfolog_r1p3
# Kafka broker list. The commented-out properties here (topic, brokerList,
# requiredAcks, batchSize) are deprecated as of Flume 1.7; the kafka.* /
# flumeBatchSize properties above replace them.
# a2.sinks.k2.brokerList = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092
# a2.sinks.k2.requiredAcks = 1
# a2.sinks.k2.batchSize = 100
# Use a channel which buffers events in file
a2.channels.c2.type = file
# The file channel takes an exclusive lock on its checkpoint and data directories.
# Agent a1 (taildir_behavior.conf) already uses .../checkpoint and .../data and
# runs at the same time, so a2 must have directories of its own or it fails to start.
# Directory where the file channel stores its checkpoint
a2.channels.c2.checkpointDir = /opt/ebohailife/apache-flume-1.7.0-bin/checkpoint_phoneinfo
# Directory where the file channel stores the event data
a2.channels.c2.dataDirs = /opt/ebohailife/apache-flume-1.7.0-bin/data_phoneinfo
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
- 创建Kafka Topic
# Create topic behaviorlog_r1p3 (replication factor 1, 3 partitions).
# The documented option name is --partitions (plural).
./kafka-topics.sh --zookeeper 10.104.0.227:2181 --create --topic behaviorlog_r1p3 --partitions 3 --replication-factor 1
# Create topic phoneinfolog_r1p3 (replication factor 1, 3 partitions).
./kafka-topics.sh --zookeeper 10.104.0.227:2181 --create --topic phoneinfolog_r1p3 --partitions 3 --replication-factor 1
- 查看topic
# List the topics registered in ZooKeeper at 10.104.0.227:2181.
./kafka-topics.sh --list --zookeeper 10.104.0.227:2181
- 启动Flume NG,后台运行
# Run from the Flume bin directory. One agent process per configuration file:
#   -c  configuration directory (flume-env.sh, log4j settings)
#   -f  agent configuration file
#   -n  agent name, which must match the property prefix used in that file (a1 / a2)
# nohup keeps the agents alive after the login session ends; without it the
# background jobs receive SIGHUP when the SSH connection is closed.
# stdout/stderr are discarded.
nohup ./flume-ng agent -c /opt/ebohailife/apache-flume-1.7.0-bin/conf -f /opt/ebohailife/apache-flume-1.7.0-bin/conf/taildir_behavior.conf -n a1 >/dev/null 2>&1 &
nohup ./flume-ng agent -c /opt/ebohailife/apache-flume-1.7.0-bin/conf -f /opt/ebohailife/apache-flume-1.7.0-bin/conf/taildir_phoneinfo.conf -n a2 >/dev/null 2>&1 &
# For debugging, run an agent in the foreground instead (no nohup, no redirection,
# no trailing &) and append the following option to print the log to the console:
# -Dflume.root.logger=INFO,console
- 启动Kafka Consumer,后台运行
# Start a console consumer on behaviorlog_r1p3 in the background.
# Its output is redirected to /dev/null; drop ">/dev/null 2>&1 &" to watch
# the messages arrive when verifying the pipeline.
./kafka-console-consumer.sh --topic behaviorlog_r1p3 --bootstrap-server 10.104.0.226:9092 >/dev/null 2>&1 &
# Start a console consumer on phoneinfolog_r1p3 in the background (output discarded as above).
./kafka-console-consumer.sh --topic phoneinfolog_r1p3 --bootstrap-server 10.104.0.226:9092 >/dev/null 2>&1 &
- 创建日志收集流
-- Stream over Kafka topic phoneinfolog_r1p3. Each message is split on '|'.
-- The tmpN columns only absorb the fields that sit between the real ones
-- (presumably empty padding produced by the log's separator; confirm against
-- a sample log line). They are not copied into phoneinfo_log_tab.
CREATE STREAM phoneinfo_log_stream (
    phoneinfo     STRING,
    tmp1          STRING,
    ip            STRING,
    tmp2          STRING,
    phone_model   STRING,
    tmp3          STRING,
    phone_version STRING,
    tmp4          STRING,
    area          STRING,
    tmp5          STRING,
    start_time    TIMESTAMP,
    tmp6          STRING,
    KDDI          STRING,
    tmp7          STRING,
    app_source    STRING,
    tmp8          STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
TBLPROPERTIES (
    "topic"="phoneinfolog_r1p3",
    "kafka.zookeeper"="10.104.0.227:2181",
    "kafka.broker.list"="10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092"
);

-- Stream over Kafka topic behaviorlog_r1p3, laid out the same way:
-- real columns alternate with tmpN filler columns.
CREATE STREAM behavior_log_stream (
    eventid       STRING,
    tmp1          STRING,
    ip            STRING,
    tmp2          STRING,
    user_id       STRING,
    tmp3          STRING,
    user_name     STRING,
    tmp4          STRING,
    in_time       TIMESTAMP,
    tmp5          STRING,
    operate_time  TIMESTAMP,
    tmp6          STRING,
    phone_unicode STRING,
    tmp7          STRING,
    trigger_count STRING,
    tmp8          STRING,
    diff_in_oper  INT,
    tmp9          STRING,
    tel_no        STRING,
    tmp10         STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
TBLPROPERTIES (
    "topic"="behaviorlog_r1p3",
    "kafka.zookeeper"="10.104.0.227:2181",
    "kafka.broker.list"="10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092"
);
- 创建日志表
-- Target table for the phoneinfo stream: the real columns of
-- phoneinfo_log_stream in the same order, without the tmpN fillers.
CREATE TABLE phoneinfo_log_tab (
    phone         STRING,
    ip            STRING,
    phone_model   STRING,
    phone_version STRING,
    area          STRING,
    start_time    TIMESTAMP,
    KDDI          STRING,
    app_source    STRING
);

-- Target table for the behavior stream: the real columns of
-- behavior_log_stream in the same order, without the tmpN fillers.
CREATE TABLE behavior_log_tab (
    eventid       STRING,
    ip            STRING,
    user_id       STRING,
    user_name     STRING,
    in_time       TIMESTAMP,
    operate_time  TIMESTAMP,
    phone_unicode STRING,
    trigger_count STRING,
    diff_in_oper  INT,
    tel_no        STRING
);
为防止小文件过多,进行以下设置:
-- Comments are kept on their own lines: text after the value on a "set" line
-- would be taken as part of the value.
-- Replace <num> with a concrete number before running.

-- Turn on batched flushing to HDFS.
set streamsql.enable.hdfs.batchflush = true;
-- Number of messages per flush: a flush happens once this many messages have accumulated.
set streamsql.hdfs.batchflush.size = <num>;
-- Flush interval in milliseconds: a flush happens each time this much time has passed.
set streamsql.hdfs.batchflush.interval.ms = <num>;
-- A flush is triggered as soon as either condition (batchflush.size or
-- batchflush.interval.ms) is met.
- 启动日志流
-- Trigger the phoneinfo_log_stream computation: write the real (non-tmp) columns
-- of the stream into phoneinfo_log_tab. Columns are matched by position, so the
-- stream column phoneinfo is stored in the table column phone.
INSERT INTO phoneinfo_log_tab
SELECT
    phoneinfo,
    ip,
    phone_model,
    phone_version,
    area,
    start_time,
    KDDI,
    app_source
FROM phoneinfo_log_stream;

-- Trigger the behavior_log_stream computation: write the real (non-tmp) columns
-- of the stream into behavior_log_tab, matched by position.
INSERT INTO behavior_log_tab
SELECT
    eventid,
    ip,
    user_id,
    user_name,
    in_time,
    operate_time,
    phone_unicode,
    trigger_count,
    diff_in_oper,
    tel_no
FROM behavior_log_stream;