Flume1.7+Kafka+Streaming集成开发

为什么选择Flume 1.7版本呢?
Flume1.7有了很多新功能,而且对Kafka支持更加全面。其中一个TAILDIR source可以自动监控目录下所有文件变化,在我做的项目中用的就是这个TAILDIR Source的使用。

1. 安装

  • 下载地址:apache-flume-1.7.0

  • 下载完成后,在/opt/ebohailife/目录下上传、解压


[ebohailife@e-bohailife-dat002 ~]$ tar -zxvf apache-flume-1.7.0-bin.tar.gz

  • 检测安装是否成功:/opt/ebohailife/flume/apache-flume-1.7.0-bin/bin/flume-ng version

打印以下信息,则表示安装成功了


[ebohailife@e-bohailife-dat002 conf]$ ../bin/flume-ng version

Flume 1.7.0

Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git

Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707

Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016

From source with checksum 0d21b3ffdc55a07e1d08875872c00523

2. 开发

- 更改Flume配置文件


[ebohailife@e-bohailife-dat002 conf]$ echo $JAVA_HOME 

/opt/ebohailife/jdk1.7.0_80 

[ebohailife@e-bohailife-dat002 conf]$ cp flume-env.sh.template flume-env.sh

vi flume-env.sh # 修改flume-env.sh中JAVA_HOME变量的值

- 创建Flume任务的配置文件 taildir_behavior.conf


[ebohailife@e-bohailife-uat002 conf]$ vi taildir_behavior.conf 

#agent命名为a1

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = TAILDIR

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /opt/ebohailife/logs/rest/.*behavior.*

a1.sources.r1.positionFile = /tmp/flume/taildir_behavior_position.json

a1.sources.r1.fileHeader = false

# Describe the sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.bootstrap.servers = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092

a1.sinks.k1.kafka.topic = behaviorlog_r1p3

a1.sinks.k1.kafka.producer.acks= 1

a1.sinks.k1.kafka.producer.linger.ms = 1

a1.sinks.k1.flumeBatchSize = 100

# a1.sinks.k1.topic = behaviorlog_r1p3

# Kafka集群Broker列表,以下属性在1.7以上版本已弃用

# a1.sinks.k1.brokerList = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092

# a1.sinks.k1.requiredAcks = 1

# a1.sinks.k1.batchSize = 100

# Use a channel which buffers events in file

a1.channels.c1.type = file

#检查点文件存储路径

a1.channels.c1.checkpointDir = /opt/ebohailife/apache-flume-1.7.0-bin/checkpoint

#消息数据存储路径

a1.channels.c1.dataDirs = /opt/ebohailife/apache-flume-1.7.0-bin/data

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

- 创建Flume任务的配置文件 taildir_phoneinfo.conf


[ebohailife@e-bohailife-uat002 conf]$ vi taildir_phoneinfo.conf

#agent命名为a2

a2.sources = r2

a2.sinks = k2

a2.channels = c2

# Describe/configure the source

a2.sources.r2.type = TAILDIR

a2.sources.r2.filegroups = f1

a2.sources.r2.filegroups.f1 = /opt/ebohailife/logs/rest/.*phoneinfo.*

a2.sources.r2.positionFile = /tmp/flume/taildir_phoneinfo_position.json

a2.sources.r2.fileHeader = false

# Describe the sink

a2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink

a2.sinks.k2.kafka.bootstrap.servers = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092

a2.sinks.k2.kafka.topic = phoneinfolog_r1p3

a2.sinks.k2.kafka.producer.acks= 1

a2.sinks.k2.kafka.producer.linger.ms = 1

a2.sinks.k2.flumeBatchSize = 100

# a2.sinks.k2.topic = behaviorlog_r1p3

# Kafka集群Broker列表,以下属性在1.7以上版本已弃用

# a2.sinks.k2.brokerList = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092

# a2.sinks.k2.requiredAcks = 1

# a2.sinks.k2.batchSize = 100

# Use a channel which buffers events in file

a2.channels.c2.type = file

#检查点文件存储路径

a2.channels.c2.checkpointDir = /opt/ebohailife/apache-flume-1.7.0-bin/checkpoint

#消息数据存储路径

a2.channels.c2.dataDirs = /opt/ebohailife/apache-flume-1.7.0-bin/data

# Bind the source and sink to the channel

a2.sources.r2.channels = c2

a2.sinks.k2.channel = c2

- 创建Kafka Topic


# 创建topic behaviorlog_r1p3 

./kafka-topics.sh --zookeeper 10.104.0.227:2181 --create --topic behaviorlog_r1p3 --partition 3 --replication-factor 1

# 创建topic phoneinfolog_r1p3

./kafka-topics.sh --zookeeper 10.104.0.227:2181 --create --topic phoneinfolog_r1p3 --partition 3 --replication-factor 1

- 查看topic


./kafka-topics.sh  --list --zookeeper 10.104.0.227:2181

- 启动Flume NG,后台运行


./flume-ng agent -c /opt/ebohailife/apache-flume-1.7.0-bin/conf -f /opt/ebohailife/apache-flume-1.7.0-bin/conf/taildir_behavior.conf  -n a1  >/dev/null 2>&1 &

./flume-ng agent -c /opt/ebohailife/apache-flume-1.7.0-bin/conf -f /opt/ebohailife/apache-flume-1.7.0-bin/conf/taildir_phoneinfo.conf  -n a2  >/dev/null 2>&1 & 

#  -Dflume.root.logger=INFO,console 

- 启动Kafka Consumer,后台运行


# 启动behaviorlog_r1p3

./kafka-console-consumer.sh --topic behaviorlog_r1p3 --bootstrap-server 10.104.0.226:9092 >/dev/null 2>&1 &

 # 启动phoneinfolog_r1p3

./kafka-console-consumer.sh --topic phoneinfolog_r1p3 --bootstrap-server 10.104.0.226:9092 >/dev/null 2>&1 & 

- 创建日志收集流


# 创建phoneinfo_log流

CREATE STREAM phoneinfo_log_stream(phoneinfo STRING, tmp1 STRING, ip STRING, tmp2 STRING, phone_model STRING, tmp3 STRING,  phone_version STRING, tmp4 STRING,  area STRING, tmp5 STRING, start_time TIMESTAMP, tmp6 STRING, KDDI STRING, tmp7 STRING, app_source STRING, tmp8 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' TBLPROPERTIES("topic"="phoneinfolog_r1p3" ,"kafka.zookeeper"="10.104.0.227:2181","kafka.broker.list"="10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092"); 

# 创建behavior_log流

CREATE STREAM behavior_log_stream(eventid STRING, tmp1 STRING, ip STRING, tmp2 STRING, user_id STRING, tmp3 STRING,  user_name STRING, tmp4 STRING,  in_time TIMESTAMP, tmp5 STRING, operate_time TIMESTAMP, tmp6 STRING, phone_unicode STRING, tmp7 STRING, trigger_count STRING, tmp8 STRING, diff_in_oper INT, tmp9 STRING, tel_no STRING, tmp10 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' TBLPROPERTIES("topic"="behaviorlog_r1p3" ,"kafka.zookeeper"="10.104.0.227:2181","kafka.broker.list"="10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092"); 

- 创建日志表


# 创建phoneinfo_log表

CREATE TABLE phoneinfo_log_tab(phone STRING, ip STRING, phone_model STRING, phone_version STRING, area STRING, start_time TIMESTAMP, KDDI STRING, app_source STRING);

# 创建behavior_log表

CREATE TABLE behavior_log_tab(eventid STRING, ip STRING, user_id STRING, user_name STRING,  in_time TIMESTAMP,  operate_time TIMESTAMP,  

phone_unicode STRING, trigger_count STRING, diff_in_oper INT, tel_no STRING);

为防止小文件过多,进行以下设置:

set streamsql.enable.hdfs.batchflush = true # 打开批量flush开关
set streamsql.hdfs.batchflush.size = <num> #设置一次flush的消息个数,消息量达到该参数时flush一次
set [streamsql.hdfs.batchflush.interval.ms](http://streamsql.hdfs.batchflush.interval.ms) = <num> #设置每过多长时间(单位为毫秒)flush一次 

# 需满足 batchflush.size 和 [batchflush.interval.ms](http://batchflush.interval.ms) 其中的一个条件即会触发一次flush

- 启动日志流


# 触发phoneinfo_log_stream流计算

INSERT INTO phoneinfo_log_tab SELECT phoneinfo, ip, phone_model, phone_version, area, start_time, KDDI, app_source FROM phoneinfo_log_stream; 

# 触发behavior_log_stream流计算 

INSERT INTO behavior_log_tab SELECT eventid, ip, user_id, user_name,  in_time,  operate_time, phone_unicode, trigger_count, diff_in_oper, tel_no FROM behavior_log_stream;

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容