Hadoop Guide--Flume

Flume的官网地址:http://flume.apache.org/FlumeUserGuide.html#exec-source

source,sink,channel:https://www.iteblog.com/archives/948

简介Flume:

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输系统,Flume提供在日志采集系统中定制各类的数据发送方,用于接收数据,同时提供对数据进行简单的处理,并写到各个数据接收方中的过程。


流程结构:

Flume的主要结构分为三部分:source,channel,sink;其中source是源头,负责采集日志;channel是管道,负责传输和暂时的存储;sink为目的地,将采集的日志保存起来;

根据需求对Flume的三部分进行组合,构成一个完整的agent,处理日志的传输;PS:agent为flume处理消息的单位,数据经过agent进行传输。


具体配置:

(启动flume的服务要安装JDK)

source:包括的配置为{Avro,Thrift,Exec,Spooling,...}

sink:包括的配置为{HDFS,Hive,Avro,Thrift,Logger,...}

channel:包括的配置为{Memory,JDBC,Kafka,File,...}

PS:具体的配置参考Flume的官网进行查询;

一个简单的Flume的配置为:


Sample example:

# example.conf: A single-node Flume configuration

# Name the components on this agent

a1.sources = r1 #定义数据的入口

a1.sinks = k1 #定义数据的出口

a1.channels = c1 #定义管道

# Describe/configure the source

a1.sources.r1.type = netcat #定义数据源的类型

a1.sources.r1.bind = localhost #监听地址

a1.sources.r1.port = 44444 #监听端口

# Describe the sink

a1.sinks.k1.type = logger      #定义数据出口,出口类型

# Use a channel which buffers events in memory

a1.channels.c1.type = memory  #临时存储文件的方式

a1.channels.c1.capacity = 1000 #存储大小

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1    #配置连接的方式

a1.sinks.k1.channel = c1


source源的配置:(常见配置)

1.Avro配置:需要配置-channels,type,bind,port四组参数;

a1.sources = r1

a1.channels = c1

a1.sources.r1.type = avro #type为组件的名称,需要填写avro,avro方式使用RPC方式接收,故此需要端口号

a1.sources.r1.channels = c1 #匹配agent创建的channel即可

a1.sources.r1.bind = 0.0.0.0#限制接收数据的发送方ID,0.0.0.0是接收任何IP,不做限制

a1.sources.r1.port = 4141 #接收端口(与flume的客户端的sink相呼应)

Avro的配置的话需要:需要在客户端和接收端都要配置相应的Avro配置才可。

2.exec source配置:可以通过指定操作对日志进行读取,使用exec时需要指定shell命令,对日志进行读取;exce的配置就是设定一个Linux命令,通过这个命令不断的传输数据;我们使用命令去查看tail -F 命令去查看日志的尾部;

a1.sources = r1

a1.channels = c1

a1.sources.r1.type = exce #type为组件的名称,采用命令行的方式进行读取数据

a1.sources.r1.channels = c1 #匹配agent创建的channel即可

a1.sources.r1.command = tail -F /var/log/secure #监控查看日志的尾部进行输出的操作

3.spooling-directory source配置:spo_dir可以读取文件夹里的日志,使用时指定一个文件夹,可以读取文件夹中的所有文件,该文件夹下的文件不可以进行再打开编辑的操作,spool的目录下不可包含相应的子目录;

a1.sources = r1

a1.channels = c1

a1.sources.r1.type = spooldir #type为组件的名称,需要填写spooldir为spool类型

a1.sources.r1.channels = c1 #匹配agent创建的channel即可

a1.sources.r1.spoolDir = /home/hadoop/flume/logs #spool接收的目录信息

4.Syslogtcp source配置:Syslogtcp监听tcp端口作为数据源;

a1.sources = r1

a1.channels = c1

a1.sources.r1.type = syslogtcp #type为组件的名称,需要填写syslogtcp,监听端口号dd

a1.sources.r1.channels = c1 #匹配agent创建的channel即可

a1.sources.r1.port = 5140 #端口号

a1.sources.r1.host = localhost #发送方的IP地址

5.HTTP Source配置:是HTTP POST和GET来发送事件数据,使用Hander程序实现转换;

a1.sources = r1

a1.channels = c1

a1.sources.r1.type = http #type为组件的名称,需要填写http类型

a1.sources.r1.channels = c1 #匹配agent创建的channel即可

a1.sources.r1.handler= org.example.rest.RestHandler

a1.sources.r1.handler.nickname=random.props

sink源的配置:(常见配置)

sink会消费channel中的数据,然后送给外部数据源或者source;

1.Hive sink:hive的数据是只限制了text和JSON数据直接在hive的表中或partition中;

hive的sink的主要参数详解:

type:构建的类型的名字,此处填写hive;

hive.metastore:Hive metastore的URL;

hive.database:Hive database 名字;

hive table:Hive table名字;

使用hive的额话需要进行先创建表的过程:

create table weblogs ( id int , msg string )

partitioned by (continent string, country string, time string)

clustered by (id) into 5 buckets

stored as orc;

a1.channels = c1

a1.channels.c1.type = memory

a1.sinks = k1

a1.sinks.k1.type = hive

a1.sinks.k1.channel = c1

a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083

a1.sinks.k1.hive.database = logsdb

a1.sinks.k1.hive.table = weblogs

a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M

a1.sinks.k1.useLocalTimeStamp = false

a1.sinks.k1.round = true

a1.sinks.k1.roundValue = 10

a1.sinks.k1.roundUnit = minute

a1.sinks.k1.serializer = DELIMITED

a1.sinks.k1.serializer.delimiter = "\t"

a1.sinks.k1.serializer.serdeSeparator = '\t'

a1.sinks.k1.serializer.fieldnames =id,,msg

2.Logger Sink:Logs是INFO level,This sink is the only exception which doesn’t require the extra configuration explained in the Logging raw data section.

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

3.Avro Sink:

需要配置hostname和port,需要在两端都要安装Avro的客户端:

type:组件名称,此处填写Avro;

hostname和port:填写地址;

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = 10.10.10.10

a1.sinks.k1.port = 4545

4.Kafka Sink:

a1.sinks.k1.channel = c1

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic = mytopic

a1.sinks.k1.kafka.bootstrap.servers = localhost:9092

a1.sinks.k1.kafka.flumeBatchSize = 20

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

a1.sinks.ki.kafka.producer.compression.type = snappy

Channel 配置:(常见配置)

1.Memory Chanel:

使用Memory作为中间的缓存;

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 10000

a1.channels.c1.transactionCapacity = 10000

a1.channels.c1.byteCapacityBufferPercentage = 20

a1.channels.c1.byteCapacity = 800000

2.JDBC Channel:

a1.channels = c1

a1.channels.c1.type = jdbc

3.Kafka Channel:

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092

a1.channels.channel1.kafka.topic = channel1

a1.channels.channel1.kafka.consumer.group.id = flume-consumer

4.File Channel:

a1.channels = c1

a1.channels.c1.type = file

a1.channels.c1.checkpointDir = /mnt/flume/checkpoint

a1.channels.c1.dataDirs = /mnt/flume/data


source源的配置:(常见配置)

1.Avro配置:需要配置-channels,type,bind,port四组参数;

a1.sources = r1

a1.channels = c1

a1.sources.r1.type = avro #type为组件的名称,需要填写avro,avro方式使用RPC方式接收,故此需要端口号

a1.sources.r1.channels = c1 #匹配agent创建的channel即可

a1.sources.r1.bind = 0.0.0.0#限制接收数据的发送方ID,0.0.0.0是接收任何IP,不做限制

a1.sources.r1.port = 4141 #接收端口(与flume的客户端的sink相呼应)

Avro的配置的话需要:需要在客户端和接收端都要配置相应的Avro配置才可。

2.exec source配置:可以通过指定操作对日志进行读取,使用exec时需要指定shell命令,对日志进行读取;exce的配置就是设定一个Linux命令,通过这个命令不断的传输数据;我们使用命令去查看tail -F 命令去查看日志的尾部;

a1.sources = r1

a1.channels = c1

a1.sources.r1.type = exce #type为组件的名称,采用命令行的方式进行读取数据

a1.sources.r1.channels = c1 #匹配agent创建的channel即可

a1.sources.r1.command = tail -F /var/log/secure #监控查看日志的尾部进行输出的操作

3.spooling-directory source配置:spo_dir可以读取文件夹里的日志,使用时指定一个文件夹,可以读取文件夹中的所有文件,该文件夹下的文件不可以进行再打开编辑的操作,spool的目录下不可包含相应的子目录;

a1.sources = r1

a1.channels = c1

a1.sources.r1.type = spooldir #type为组件的名称,需要填写spooldir为spool类型

a1.sources.r1.channels = c1 #匹配agent创建的channel即可

a1.sources.r1.spoolDir = /home/hadoop/flume/logs #spool接收的目录信息

4.Syslogtcp source配置:Syslogtcp监听tcp端口作为数据源;

a1.sources = r1

a1.channels = c1

a1.sources.r1.type = syslogtcp #type为组件的名称,需要填写syslogtcp,监听端口号dd

a1.sources.r1.channels = c1 #匹配agent创建的channel即可

a1.sources.r1.port = 5140 #端口号

a1.sources.r1.host = localhost #发送方的IP地址

5.HTTP Source配置:是HTTP POST和GET来发送事件数据,使用Hander程序实现转换;

a1.sources = r1

a1.channels = c1

a1.sources.r1.type = http #type为组件的名称,需要填写http类型

a1.sources.r1.channels = c1 #匹配agent创建的channel即可

a1.sources.r1.handler= org.example.rest.RestHandler

a1.sources.r1.handler.nickname=random.props

sink源的配置:(常见配置)

sink会消费channel中的数据,然后送给外部数据源或者source;

1.Hive sink:hive的数据是只限制了text和JSON数据直接在hive的表中或partition中;

hive的sink的主要参数详解:

type:构建的类型的名字,此处填写hive;

hive.metastore:Hive metastore的URL;

hive.database:Hive database 名字;

hive table:Hive table名字;

使用hive的额话需要进行先创建表的过程:

create table weblogs ( id int , msg string )

partitioned by (continent string, country string, time string)

clustered by (id) into 5 buckets

stored as orc;

a1.channels = c1

a1.channels.c1.type = memory

a1.sinks = k1

a1.sinks.k1.type = hive

a1.sinks.k1.channel = c1

a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083

a1.sinks.k1.hive.database = logsdb

a1.sinks.k1.hive.table = weblogs

a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M

a1.sinks.k1.useLocalTimeStamp = false

a1.sinks.k1.round = true

a1.sinks.k1.roundValue = 10

a1.sinks.k1.roundUnit = minute

a1.sinks.k1.serializer = DELIMITED

a1.sinks.k1.serializer.delimiter = "\t"

a1.sinks.k1.serializer.serdeSeparator = '\t'

a1.sinks.k1.serializer.fieldnames =id,,msg

2.Logger Sink:Logs是INFO level,This sink is the only exception which doesn’t require the extra configuration explained in the Logging raw data section.

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

3.Avro Sink:

需要配置hostname和port,需要在两端都要安装Avro的客户端:

type:组件名称,此处填写Avro;

hostname和port:填写地址;

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = 10.10.10.10

a1.sinks.k1.port = 4545

4.Kafka Sink:

a1.sinks.k1.channel = c1

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic = mytopic

a1.sinks.k1.kafka.bootstrap.servers = localhost:9092

a1.sinks.k1.kafka.flumeBatchSize = 20

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

a1.sinks.ki.kafka.producer.compression.type = snappy

Channel 配置:(常见配置)

1.Memory Chanel:

使用Memory作为中间的缓存;

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 10000

a1.channels.c1.transactionCapacity = 10000

a1.channels.c1.byteCapacityBufferPercentage = 20

a1.channels.c1.byteCapacity = 800000

2.JDBC Channel:

a1.channels = c1

a1.channels.c1.type = jdbc

3.Kafka Channel:

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092

a1.channels.channel1.kafka.topic = channel1

a1.channels.channel1.kafka.consumer.group.id = flume-consumer

4.File Channel:

a1.channels = c1

a1.channels.c1.type = file

a1.channels.c1.checkpointDir = /mnt/flume/checkpoint

a1.channels.c1.dataDirs = /mnt/flume/data

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容

  • 常用的source1.1 nettcat1.2 Avro Source1.3 Exec Source1.4 spo...
    水他阅读 1,742评论 0 50
  • Flume的功能和架构特点 ** 功能 **flume 是一个分布式的,可靠的,可用的,可以非常有效率的对大数据的...
    心_的方向阅读 2,512评论 1 10
  • 这里主要介绍几种常见的日志的source来源,包括监控文件型,监控文件内容增量,TCP和HTTP。 Spool类型...
    欢醉阅读 1,386评论 0 10
  • kafka 集群开启kafka服务:nohup bin/kafka-server-start.sh config/...
    夙夜M阅读 1,412评论 0 0
  • 1. Flume简介 Apache Flume是一个分布式的、可靠的、可用的,从多种不同的源收集、聚集、移动大量日...
    奉先阅读 4,466评论 2 5