Flume基础案例

核心概念

  • Agent:使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。
    Client:生产数据,运行在一个独立的线程。
  • Source:从Client专门用来收集数据,传递给Channel,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。
  • Sink:从Channel收集数据,运行在一个独立线程,sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、自定义。
  • Channel:连接 sources 和 sinks ,这个有点像一个队列,source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。
  • Events:可以是日志记录、 avro 对象等。

Agent的概念

Flume以agent为最小的独立运行单位。一个agent就是一个JVM,agent本身是一个Java进程,运行在日志收集节点—所谓日志收集节点就是服务器节点。

单agent由Source、Sink和Channel三大组件构成,类似生产者、仓库、消费者的架构.如下图:

[站外图片上传中...(image-64c038-1541939331883)]

a single node flume

NetCat Source:监听一个指定的网络端口,即只要应用程序向这个端口里面写数据,这个source组件就可以获取到信息。

在/home/hadoop/script/flume下新建配置文件a-single-node.conf,配置文件如下:

#a-single-node.conf : A single node flume configuration


# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

保存之后运行,执行命令:

flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/a-single-node.conf \
-Dflume.root.logger=INFO,console

参数说明:

-n 指定agent名称(与配置文件中代理的名字相同)

-c 指定flume中配置文件的目录

-f 指定配置文件

-Dflume.root.logger=DEBUG,console 设置日志等级

通过telnet监听端口:

telnet localhost 44444

输入任意数据,在flume中可以看到输出:

18/08/02 15:25:29 INFO sink.LoggerSink: Event: { headers:{} body: 61 62 63 0D                                     abc. }

采集指定文件数据存入到hdfs

source-channel-sink :exec - memory - hdfs

配置文件如下:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
# 监听文件路径
a1.sources.r1.command = tail -F /home/hadoop/data/flume/logs/access.log


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# Describe the sink
a1.sinks.k1.type = hdfs
# hdfs路径
a1.sinks.k1.hdfs.path=hdfs://hadoop002:9000/user/hadoop/flume/tail
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.batchSize=10
a1.sinks.k1.hdfs.useLocalTimeStamp=true

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

缺少这个配置的时候

a1.sinks.k1.hdfs.useLocalTimeStamp=true

会出现异常

java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null

原因是因为写入到hfds时使用到了时间戳来区分目录结构,flume的消息组件event在接受到之后在header中没有发现时间戳参数,导致该错误发生,有三种方法可以解决这个错误:

  1. agent1.sources.source1.interceptors = t1
    agent1.sources.source1.interceptors.t1.type = timestamp 为source添加拦截,每条event头中加入时间戳;(效率会慢一些)
  2. agent1.sinks.sink1.hdfs.useLocalTimeStamp = true 为sink指定该参数为true (如果客户端和flume集群时间不一致数据时间会不准确)
  3. 在向source发送event时,将时间戳参数添加到event的header中即可,header是一个map,添加时mapkey为timestamp(推荐使用)

采集指定文件夹的内容到控制台

source - channel - sink :spooling - memory - logger
目录下的文件如果已经读取完毕会增加后缀.COMPELETE,且文件名不能相同
配置文件如下:


# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir=/home/hadoop/temp
a1.sources.r1.fileHeader=true

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# Describe the sink
a1.sinks.k1.type = logger

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

小文件问题

案例:采集指定文件夹内容到hdfs taildir - memory - hdfs
配置文件如下:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/hadoop/temp/position/taildir_position.json
a1.sources.r1.filegroups=f1 f2
a1.sources.r1.filegroups.f1=/home/hadoop/temp/flume/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2=/home/hadoop/temp/flume/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://hadoop002:9000/user/hadoop/flume/manager
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.hdfs.batchSize=15
#关键参数  三个是或的关系 满足一个就会roll
a1.sinks.k1.hdfs.rollInterval= 0  #按时间 0为参数不生效
a1.sinks.k1.hdfs.rollSize= 500    #按大小 0为参数不生效
a1.sinks.k1.hdfs.rollCount = 0    #按记录数 0为参数不生效

a1.sinks.k1.hdfs.useLocalTimeStamp=true

# Bind the source and sink to the channel
a1.sources.r1.channels = 

多个channel

image

一个channel对应输出到日志的sink,另外一个对应写入到Hdfs的sink
配置文件如下:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/hadoop/temp/position/taildir_position.json
a1.sources.r1.filegroups=f1 f2
a1.sources.r1.filegroups.f1=/home/hadoop/temp/flume/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2=/home/hadoop/temp/flume/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true


# Use a channel which buffers events in memory
a1.channels.c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


a1.channels.c2.type=memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = logger


a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path=hdfs://hadoop002:9000/user/hadoop/flume/manager
a1.sinks.k2.hdfs.fileType=DataStream
a1.sinks.k2.hdfs.writeFormat=TEXT
a1.sinks.k2.hdfs.hdfs.batchSize=15
a1.sinks.k2.hdfs.rollInterval= 0  
a1.sinks.k2.hdfs.rollSize= 0  
a1.sinks.k2.hdfs.rollCount = 100 

# Bind the source and sink to the channel
a1.sources.r1.channels =c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

sink processor

主要包含两种方式:failover和load_balance

  • failover:Failover Sink Processor维护了一个sink的优先级列表,具有故障转移的功能,每个sink都有一个权值用于表示自己的优先级,优先级值高Sink会更早被激活。值越大,优先级越高。表示优先级的权值不能相同。
  • load_balance:按照一定的算法选择sink输出到指定地方,如果在文件输出量很大的情况下,负载均衡还是很有必要的,通过多个通道输出缓解输出压力,flume内置的负载均衡的算法默认是round robin(轮询算法),还有一个random(随机算法)。

failover 配置如下:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10 
a1.sinkgroups.g1.processor.maxpenalty = 10000

a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1


a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path=hdfs://hadoop002:9001/user/hadoop/flume/manager
a1.sinks.k2.hdfs.fileType=DataStream
a1.sinks.k2.hdfs.writeFormat=TEXT
a1.sinks.k2.hdfs.hdfs.batchSize=15
a1.sinks.k2.hdfs.rollInterval= 0  
a1.sinks.k2.hdfs.rollSize= 0  
a1.sinks.k2.hdfs.rollCount = 10
a1.sinks.k2.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1

load_balance配置如下(更改负载均衡策略进行测试):

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random


a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1


a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path=hdfs://hadoop002:9000/user/hadoop/flume/manager
a1.sinks.k2.hdfs.fileType=DataStream
a1.sinks.k2.hdfs.writeFormat=TEXT
a1.sinks.k2.hdfs.hdfs.batchSize=15
a1.sinks.k2.hdfs.rollInterval= 0
a1.sinks.k2.hdfs.rollSize= 0
a1.sinks.k2.hdfs.rollCount = 10
a1.sinks.k2.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1

avro source和avro sink

该案例需要用到两个agent,一个作为数据源:产生数据,一个作为数据接收端:接收数据

数据源agent配置如下:

# Name the components on this agent
avro-source-agent.sources = exec-source
avro-source-agent.sinks = avro-sink
avro-source-agent.channels = avro-memory-channel

# Describe/configure the source

avro-source-agent.sources.exec-source.type = exec
avro-source-agent.sources.exec-source.command = tail -F /home/hadoop/data/flume/logs/access.log


# Use a channel which buffers events in memory
avro-source-agent.channels.avro-memory-channel.type = memory
avro-source-agent.channels.avro-memory-channel.capacity = 1000
avro-source-agent.channels.avro-memory-channel.transactionCapacity = 100


avro-source-agent.sinks.avro-sink.type=avro
avro-source-agent.sinks.avro-sink.hostname=hadoop002
avro-source-agent.sinks.avro-sink.port=44444

# Bind the source and sink to the channel
avro-source-agent.sources.exec-source.channels = avro-memory-channel
avro-source-agent.sinks.avro-sink.channel = avro-memory-channel

数据接收端配置如下:

# Name the components on this agent
avro-sink-agent.sources = avro-source
avro-sink-agent.sinks = avro-logger
avro-sink-agent.channels = avro-memory-channel

# Describe/configure the source
avro-sink-agent.sources.avro-source.type = avro
avro-sink-agent.sources.avro-source.bind = hadoop002
avro-sink-agent.sources.avro-source.port = 44444


# Use a channel which buffers events in memory
avro-sink-agent.channels.avro-memory-channel.type = memory
avro-sink-agent.channels.avro-memory-channel.capacity = 1000
avro-sink-agent.channels.avro-memory-channel.transactionCapacity = 100


avro-sink-agent.sinks.avro-logger.type=logger

# Bind the source and sink to the channel
avro-sink-agent.sources.avro-source.channels = avro-memory-channel
avro-sink-agent.sinks.avro-logger.channel = avro-memory-channel

依次启动avro-sink-agent,和avro-source-agent

flume-ng agent \
--name avro-sink-agent \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/avro-sink.conf \
-Dflume.root.logger=INFO,console 


flume-ng agent \
--name avro-source-agent \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/avro-source.conf \
-Dflume.root.logger=INFO,console
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,546评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,224评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,911评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,737评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,753评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,598评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,338评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,249评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,696评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,888评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,013评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,731评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,348评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,929评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,048评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,203评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,960评论 2 355

推荐阅读更多精彩内容

  • 博客原文 翻译作品,水平有限,如有错误,烦请留言指正。原文请见 官网英文文档 引言 概述 Apache Flume...
    rabbitGYK阅读 11,469评论 13 34
  • 介绍 概述 Apache Flume是为有效收集聚合和移动大量来自不同源到中心数据存储而设计的可分布,可靠的,可用...
    ximengchj阅读 3,522评论 0 13
  • Flume的官网地址:http://flume.apache.org/FlumeUserGuide.html#ex...
    24格的世界阅读 906评论 0 1
  • flume 有三大组件source 、channel和sink,各个组件之间都可以相互组合使用,各组件间耦合度低。...
    三万_chenbing阅读 5,780评论 0 5
  • 题目1: 轮播的实现原理是怎样的?如果让你来实现,你会抽象出哪些函数(or接口)供使用?(比如 play()) 例...
    _达斯基阅读 344评论 0 0