数据采集工具Flume实践

Flume简介

Flume 是一个分布式、可靠、高可用的海量日志聚合系统,支持在系统中定制各类数据发送 方, 用于收集数据,同时,Flume 提供对数据的简单处理,并写到各种数据接收方的能力。

1、 Apache Flume 是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统,和 Sqoop 同 属于数据采集系统组件,但是 Sqoop 用来采集关系型数据库数据,而 Flume 用 来采集流动型数据。

2、 Flume 名字来源于原始的近乎实时的日志数据采集工具,现在被广泛用于任何流事件数 据的采集, 它支持从很多数据源聚合数据到 HDFS。

3、 一般的采集需求,通过对 flume 的简单配置即可实现。Flume 针对特殊场景也具备良好 的自定义 扩展能力,因此,flume 可以适用于大部分的日常数据采集场景 。

4、 Flume 最初由 Cloudera 开发,在 2011 年贡献给了 Apache 基金会,2012 年变成了 Apache 的顶 级项目。Flume OG(Original Generation)是 Flume 最初版本,后升级换代成 Flume NG(Next/New Generation)。

5、 Flume 的优势:可横向扩展、延展性、可靠性。

Flume体系结构/核心组件

介绍

Flume 的数据流由事件(Event)贯穿始终。事件是 Flume 的基本数据单位,它携带日志数据(字 节 数组形式)并且携带有头信息,这些 Event 由 Agent 外部的 Source 生成,当 Source 捕获事 件后会进 行特定的格式化,然后 Source 会把事件推入(单个或多个)Channel 中。你可以把 Channel 看作是一个 缓冲区,它将保存事件直到 Sink 处理完该事件。Sink 负责持久化日志或 者把事件推向另一个 Source。

核心组件

Agent:能够独立执行一个数据收集任务的JVM进程,Flume 以 Agent 为最小的独立运行单位,一个 Agent 就是一个 JVM,每台机器运行一个Agent。单 Agent 由 Source、Sink 和 Channel 三大组件构,可以包含多个。

Client:生产数据运行在一个独立的线程

Source:一个Agent中的用来跟数据源对接的服务,从Client收集数据,传递给Channel

Channel:Agent内部的一个中转组件

Sink:一个Agent中的用来跟目的地进行对接的服务,从Channel收集数据

Event:在Source、Channel、Sink中间进行流转的消息的封装对象

常见的source分类:
avro source:接收网络端口中的数据
exec source:检测新增内容。tail -f 监听某个文件新增加的内容。
spooldir source:监控文件夹的,如果这个文件夹中的数据变化,就可以采集
customer source:自定义
常见的channel分类:
memory:内存,快,但是不安全
file:安全,但是效率低
jdbc:使用数据库进行保存
常见的sink分类:
loggerSink:做测试用
HDFSSink:离线数据的sink
KafkaSink:流式数据使用

Flume三大核心组件

Event
Event 是 Flume 数据传输的基本单元。   
Flume 以事件的形式将数据从源头传送到最终的目的地。   
Event 由可选的 header 和载有数据的一个 byte array 构成。    
      载有的数据度 flume 是不透明的。   
      Header 是容纳了 key-value 字符串对的无序集合,key 在集合内是唯一的。   
      Header 可以在上下文路由中使用扩展。
Client
Client 是一个将原始 log 包装成 events 并且发送他们到一个或多个 agent 的实体   
目的是从数据源系统中解耦 Flume   
在 Flume 的拓扑结构中不是必须的
Agent
一个 Agent 包含 source,channel,sink 和其他组件。   
它利用这些组件将 events 从一个节点传输到另一个节点或最终目的地。   
Agent 是 flume 流的基础部分。   
Flume为这些组件提供了配置,声明周期管理,监控支持。  
Agent 之 Source
Source 负责接收 event 或通过特殊机制产生 event,并将 events 批量的放到一个或多个  
包含 event 驱动和轮询两种类型  
不同类型的 Source    
     与系统集成的 Source:Syslog,Netcat,监测目录池
     自动生成事件的 Source:Exec   用于 Agent 和 Agent
     之间通信的 IPC source:avro,thrift  
Source 必须至少和一个 channel 关联   
Agent 之 Channel
Channel 位于 Source 和 Sink 之间,用于缓存进来的 event  
当 sink 成功的将 event 发送到下一个的 channel 或最终目的,event 从 channel 删除   
不同的 channel 提供的持久化水平也是不一样的    
    Memory Channel:volatile(不稳定的)   
    File Channel:基于 WAL(预写式日志 Write-Ahead Logging)实现    
    JDBC Channel:基于嵌入式 database 实现   
Channel 支持事务,提供较弱的顺序保证   
可以和任何数量的 source 和 sink 工作
Agent 之 Sink
Sink 负责将 event 传输到下一级或最终目的地,成功后将 event 从 channel 移除  
不同类型的 sink ,比如 HDFS,HBase 



Flume三大案例:

一、官方案例监控端口数据案例

1、在flume的目录下面创建文件夹

cd /home/bigdata/apps/apache-flume-1.7.0-bin/

mkdir job
cd job

2、定义配置文件telnet-logger.conf

vim telnet-logger.conf

添加内容如下:

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

解释

# example.conf: A single-node Flume configuration
# a1:表示agent的名称
# Name the components on this agent
a1.sources = r1  #r1:表示a1的输入源
a1.sinks = k1  #k1:表示a1输出目的地
a1.channels = c1 #c1:表示a1的缓存区

# Describe/configure the source
a1.sources.r1.type = netcat #表示a的输入的数据源的类型是netcat端口类型
a1.sources.r1.bind = localhost #表示a1监听的主机
a1.sources.r1.port = 44444 #表示a1监听的端口号

# Describe the sink
a1.sinks.k1.type = logger #a1的输出目的地是控制台的logger类型

# Use a channel which buffers events in memory
a1.channels.c1.type = memory #a1的channel类型的memory
a1.channels.c1.capacity = 1000 #channel的总容量是1000个event
a1.channels.c1.transactionCapacity = 100 #传输的时候收集够了100个event在提交

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 #将r1和c1连接起来
a1.sinks.k1.channel = c1 #将k1和c1连接起来

3、先开启flume监听端口

退到flume目录

cd /home/bigdata/apps/apache-flume-1.7.0-bin/

官方样例:

bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

实际操作:

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/telnet-logger.conf -Dflume.root.logger=INFO,console
参数说明:
    --conf conf: 表示配置文件存储在conf这个目录
    --name a1: 表示给agent起名为a1
    --conf-file job/telnet-logger.conf: flume本次启动读取的配置文件是在job文件夹下面的telnet-logger.conf文件
    -Dflume.root.logger=INFO,console 打印日志

4、使用telnet测试端口

yum -y install telnet
telnet localhost 44444

5、发送命令测试即可


二、监控目录中的文件到HDFS

1、创建配置文件dir-hdfs.conf

在job目录下面

vim dir-hdfs.conf

添加下面的内容:

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /software/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop0:8020/flume/upload/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 600
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0
a3.sinks.k3.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

解释

a3.sources = r3 #定义source
a3.sinks = k3 #定义sink
a3.channels = c3 #定义channel

# Describe/configure the source
a3.sources.r3.type = spooldir #定义source的类型是目录
a3.sources.r3.spoolDir =/home/bigdata/data/flumedata  #监控的本地目录
a3.sources.r3.fileSuffix = .COMPLETED #上传完了的后缀
a3.sources.r3.fileHeader = true #是否有文件头
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)  #忽略以tmp结尾的

# Describe the sink
a3.sinks.k3.type = hdfs #sink的类型hdfs
a3.sinks.k3.hdfs.path = hdfs://bigdata02:9000/flume/upload/%Y%m%d/%H  #本地文件上传到hdfs上面的路径
a3.sinks.k3.hdfs.filePrefix = upload- #上传到hdfs上面的文件的前缀
a3.sinks.k3.hdfs.round = true #是否按照时间滚动生成文件
a3.sinks.k3.hdfs.roundValue = 1 #多长时间生成文件
a3.sinks.k3.hdfs.roundUnit = hour #单位
a3.sinks.k3.hdfs.useLocalTimeStamp = true #是否使用本地时间戳
a3.sinks.k3.hdfs.batchSize = 100 #到100个event刷写到hdfs
a3.sinks.k3.hdfs.fileType = DataStream #文件类型
a3.sinks.k3.hdfs.rollInterval = 600 #多久生成新文件
a3.sinks.k3.hdfs.rollSize = 134217700 #多大的时候生成新文件
a3.sinks.k3.hdfs.rollCount = 0 #多少个event生成新文件
a3.sinks.k3.hdfs.minBlockReplicas = 1 #最小副本数

# Use a channel which buffers events in memory
a3.channels.c3.type = memory #第一个案例中有
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3 #第一个案例中有
a3.sinks.k3.channel = c3

2、启动监控目录命令

cd ..
mkdir /home/bigdata/data/flumedata/

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/dir-hdfs.conf
cp text3.txt /home/bigdata/data/flumedata/

等600秒查看


新传一个文件立即查看,文件结尾.tmp

三、监控文件到HDFS

1、创建一个自动化文件

vim mydateauto.sh

写入:

#!/bin/bash

while true
do
    echo `date`
    sleep 1
done

然后运行测试:

sh mydateauto.sh 

然后修改配置,将输出的日志追加到某个文件中

#!/bin/bash

while true
do
        echo `date` >> /home/bigdata/data/flumedata/mydate.txt
        sleep 1
done

再次执行

sh mydateauto.sh 

就会在/home/bigdata/data/flumedata/的文件夹下面生成了mydate.txt文件

查看

tail -f /home/bigdata/data/flumedata/mydate.txt 

2、创建配置file-hdfs.conf

vim file-hdfs.conf

添加下面的内容:

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /home/bigdata/data/flumedata/mydate.txt
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://bigdata02:9000/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.filePrefix = logs-
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 1000
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 600
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
a2.sinks.k2.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

解释

# Name the components on this agent
a2.sources = r2 #定义source
a2.sinks = k2 #定义sink
a2.channels = c2 #定义channel

# Describe/configure the source
a2.sources.r2.type = exec #source的类型
a2.sources.r2.command = tail -F /home/bigdata/data/flumedata/mydate.txt #监控的本地文件
a2.sources.r2.shell = /bin/bash -c #执行脚本的绝对路径

# Describe the sink
a2.sinks.k2.type = hdfs 
a2.sinks.k2.hdfs.path = hdfs://bigdata02:9000/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.filePrefix = logs- #前缀
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 1000
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 600
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
a2.sinks.k2.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

3、启动

bin/flume-ng agent --conf conf/ --name a2 --conf-file job/file-hdfs.conf

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351