Flume简介
Flume 是一个分布式、可靠、高可用的海量日志聚合系统,支持在系统中定制各类数据发送 方, 用于收集数据,同时,Flume 提供对数据的简单处理,并写到各种数据接收方的能力。
1、 Apache Flume 是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统,和 Sqoop 同 属于数据采集系统组件,但是 Sqoop 用来采集关系型数据库数据,而 Flume 用 来采集流动型数据。
2、 Flume 名字来源于原始的近乎实时的日志数据采集工具,现在被广泛用于任何流事件数 据的采集, 它支持从很多数据源聚合数据到 HDFS。
3、 一般的采集需求,通过对 flume 的简单配置即可实现。Flume 针对特殊场景也具备良好 的自定义 扩展能力,因此,flume 可以适用于大部分的日常数据采集场景 。
4、 Flume 最初由 Cloudera 开发,在 2011 年贡献给了 Apache 基金会,2012 年变成了 Apache 的顶 级项目。Flume OG(Original Generation)是 Flume 最初版本,后升级换代成 Flume NG(Next/New Generation)。
5、 Flume 的优势:可横向扩展、延展性、可靠性。
Flume体系结构/核心组件
介绍
Flume 的数据流由事件(Event)贯穿始终。事件是 Flume 的基本数据单位,它携带日志数据(字 节 数组形式)并且携带有头信息,这些 Event 由 Agent 外部的 Source 生成,当 Source 捕获事 件后会进 行特定的格式化,然后 Source 会把事件推入(单个或多个)Channel 中。你可以把 Channel 看作是一个 缓冲区,它将保存事件直到 Sink 处理完该事件。Sink 负责持久化日志或 者把事件推向另一个 Source。
核心组件
Agent:能够独立执行一个数据收集任务的JVM进程,Flume 以 Agent 为最小的独立运行单位,一个 Agent 就是一个 JVM,每台机器运行一个Agent。单 Agent 由 Source、Sink 和 Channel 三大组件构,可以包含多个。
Client:生产数据运行在一个独立的线程
Source:一个Agent中的用来跟数据源对接的服务,从Client收集数据,传递给Channel
Channel:Agent内部的一个中转组件
Sink:一个Agent中的用来跟目的地进行对接的服务,从Channel收集数据
Event:在Source、Channel、Sink中间进行流转的消息的封装对象
常见的source分类:
avro source:接收网络端口中的数据
exec source:检测新增内容。tail -f 监听某个文件新增加的内容。
spooldir source:监控文件夹的,如果这个文件夹中的数据变化,就可以采集
customer source:自定义
常见的channel分类:
memory:内存,快,但是不安全
file:安全,但是效率低
jdbc:使用数据库进行保存
常见的sink分类:
loggerSink:做测试用
HDFSSink:离线数据的sink
KafkaSink:流式数据使用
Flume三大核心组件
Event
Event 是 Flume 数据传输的基本单元。
Flume 以事件的形式将数据从源头传送到最终的目的地。
Event 由可选的 header 和载有数据的一个 byte array 构成。
载有的数据度 flume 是不透明的。
Header 是容纳了 key-value 字符串对的无序集合,key 在集合内是唯一的。
Header 可以在上下文路由中使用扩展。
Client
Client 是一个将原始 log 包装成 events 并且发送他们到一个或多个 agent 的实体
目的是从数据源系统中解耦 Flume
在 Flume 的拓扑结构中不是必须的
Agent
一个 Agent 包含 source,channel,sink 和其他组件。
它利用这些组件将 events 从一个节点传输到另一个节点或最终目的地。
Agent 是 flume 流的基础部分。
Flume为这些组件提供了配置,声明周期管理,监控支持。
Agent 之 Source
Source 负责接收 event 或通过特殊机制产生 event,并将 events 批量的放到一个或多个
包含 event 驱动和轮询两种类型
不同类型的 Source
与系统集成的 Source:Syslog,Netcat,监测目录池
自动生成事件的 Source:Exec 用于 Agent 和 Agent
之间通信的 IPC source:avro,thrift
Source 必须至少和一个 channel 关联
Agent 之 Channel
Channel 位于 Source 和 Sink 之间,用于缓存进来的 event
当 sink 成功的将 event 发送到下一个的 channel 或最终目的,event 从 channel 删除
不同的 channel 提供的持久化水平也是不一样的
Memory Channel:volatile(不稳定的)
File Channel:基于 WAL(预写式日志 Write-Ahead Logging)实现
JDBC Channel:基于嵌入式 database 实现
Channel 支持事务,提供较弱的顺序保证
可以和任何数量的 source 和 sink 工作
Agent 之 Sink
Sink 负责将 event 传输到下一级或最终目的地,成功后将 event 从 channel 移除
不同类型的 sink ,比如 HDFS,HBase
Flume三大案例:
一、官方案例监控端口数据案例
1、在flume的目录下面创建文件夹
cd /home/bigdata/apps/apache-flume-1.7.0-bin/
mkdir job
cd job
2、定义配置文件telnet-logger.conf
vim telnet-logger.conf
添加内容如下:
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
解释
# example.conf: A single-node Flume configuration
# a1:表示agent的名称
# Name the components on this agent
a1.sources = r1 #r1:表示a1的输入源
a1.sinks = k1 #k1:表示a1输出目的地
a1.channels = c1 #c1:表示a1的缓存区
# Describe/configure the source
a1.sources.r1.type = netcat #表示a的输入的数据源的类型是netcat端口类型
a1.sources.r1.bind = localhost #表示a1监听的主机
a1.sources.r1.port = 44444 #表示a1监听的端口号
# Describe the sink
a1.sinks.k1.type = logger #a1的输出目的地是控制台的logger类型
# Use a channel which buffers events in memory
a1.channels.c1.type = memory #a1的channel类型的memory
a1.channels.c1.capacity = 1000 #channel的总容量是1000个event
a1.channels.c1.transactionCapacity = 100 #传输的时候收集够了100个event在提交
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 #将r1和c1连接起来
a1.sinks.k1.channel = c1 #将k1和c1连接起来
3、先开启flume监听端口
退到flume目录
cd /home/bigdata/apps/apache-flume-1.7.0-bin/
官方样例:
bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
实际操作:
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/telnet-logger.conf -Dflume.root.logger=INFO,console
参数说明:
--conf conf: 表示配置文件存储在conf这个目录
--name a1: 表示给agent起名为a1
--conf-file job/telnet-logger.conf: flume本次启动读取的配置文件是在job文件夹下面的telnet-logger.conf文件
-Dflume.root.logger=INFO,console 打印日志
4、使用telnet测试端口
yum -y install telnet
telnet localhost 44444
5、发送命令测试即可
二、监控目录中的文件到HDFS
1、创建配置文件dir-hdfs.conf
在job目录下面
vim dir-hdfs.conf
添加下面的内容:
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /software/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop0:8020/flume/upload/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 600
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0
a3.sinks.k3.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
解释
a3.sources = r3 #定义source
a3.sinks = k3 #定义sink
a3.channels = c3 #定义channel
# Describe/configure the source
a3.sources.r3.type = spooldir #定义source的类型是目录
a3.sources.r3.spoolDir =/home/bigdata/data/flumedata #监控的本地目录
a3.sources.r3.fileSuffix = .COMPLETED #上传完了的后缀
a3.sources.r3.fileHeader = true #是否有文件头
a3.sources.r3.ignorePattern = ([^ ]*\.tmp) #忽略以tmp结尾的
# Describe the sink
a3.sinks.k3.type = hdfs #sink的类型hdfs
a3.sinks.k3.hdfs.path = hdfs://bigdata02:9000/flume/upload/%Y%m%d/%H #本地文件上传到hdfs上面的路径
a3.sinks.k3.hdfs.filePrefix = upload- #上传到hdfs上面的文件的前缀
a3.sinks.k3.hdfs.round = true #是否按照时间滚动生成文件
a3.sinks.k3.hdfs.roundValue = 1 #多长时间生成文件
a3.sinks.k3.hdfs.roundUnit = hour #单位
a3.sinks.k3.hdfs.useLocalTimeStamp = true #是否使用本地时间戳
a3.sinks.k3.hdfs.batchSize = 100 #到100个event刷写到hdfs
a3.sinks.k3.hdfs.fileType = DataStream #文件类型
a3.sinks.k3.hdfs.rollInterval = 600 #多久生成新文件
a3.sinks.k3.hdfs.rollSize = 134217700 #多大的时候生成新文件
a3.sinks.k3.hdfs.rollCount = 0 #多少个event生成新文件
a3.sinks.k3.hdfs.minBlockReplicas = 1 #最小副本数
# Use a channel which buffers events in memory
a3.channels.c3.type = memory #第一个案例中有
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3 #第一个案例中有
a3.sinks.k3.channel = c3
2、启动监控目录命令
cd ..
mkdir /home/bigdata/data/flumedata/
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/dir-hdfs.conf
cp text3.txt /home/bigdata/data/flumedata/
等600秒查看
新传一个文件立即查看,文件结尾
.tmp
三、监控文件到HDFS
1、创建一个自动化文件
vim mydateauto.sh
写入:
#!/bin/bash
while true
do
echo `date`
sleep 1
done
然后运行测试:
sh mydateauto.sh
然后修改配置,将输出的日志追加到某个文件中
#!/bin/bash
while true
do
echo `date` >> /home/bigdata/data/flumedata/mydate.txt
sleep 1
done
再次执行
sh mydateauto.sh
就会在/home/bigdata/data/flumedata/
的文件夹下面生成了mydate.txt文件
查看
tail -f /home/bigdata/data/flumedata/mydate.txt
2、创建配置file-hdfs.conf
vim file-hdfs.conf
添加下面的内容:
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /home/bigdata/data/flumedata/mydate.txt
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://bigdata02:9000/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.filePrefix = logs-
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 1000
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 600
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
解释
# Name the components on this agent
a2.sources = r2 #定义source
a2.sinks = k2 #定义sink
a2.channels = c2 #定义channel
# Describe/configure the source
a2.sources.r2.type = exec #source的类型
a2.sources.r2.command = tail -F /home/bigdata/data/flumedata/mydate.txt #监控的本地文件
a2.sources.r2.shell = /bin/bash -c #执行脚本的绝对路径
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://bigdata02:9000/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.filePrefix = logs- #前缀
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 1000
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 600
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
3、启动
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/file-hdfs.conf