Flume 是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统, Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。(Flume使用java编写,支持Java1.6及以上。)
由原来的Flume OG到现在的Flume NG,进行了架构重构,并且现在NG版本完全不兼容原来的OG版本。经过架构重构后,Flume NG更像是一个轻量的小工具,非常简单,容易适应各种方式日志收集,并支持failover和负载均衡。
1.主要的核心概念:
Event:flume最基本的数据单元,带有一个可选的消息头(headers)。如果是文本,event通常是一行记录,event也是事务的基本单位。
Flow:Event从源点到达目的点的迁移的抽象。
Client:操作位于源点处的Event,将其发送到Flume Agent。
Agent:一个独立的Flume进程,包含组件Source、Channel、Sink。
Source:用来消费传递到该组件的Event,完成对数据的收集,分成transtion和event打入到channel之中。不同的source,可以接受不同的数据格式。
Channel:主要提供一个队列的功能,对source提供中的数据进行简单缓存,作用是保证source到sink的数据传输过程一定能成功。
Sink:取出Channel中的数据,进行相应的存储文件系统、数据库等。
Flume逻辑上分三层架构:agent,collector,storage。
agent 用于采集数据,agent是flume中产生数据流的地方,同时,agent会将产生的数据流传输到collector。
collector 的作用是将多个agent的数据汇总后,加载到storage中。
storage 是存储系统,可以是一个普通file,也可以是HDFS,HIVE,HBase等。
source:client端操作消费数据的来源,支持的类型有Avro、log4j、tailDir、http post、Thrift、JMS、Spooling Directory等类型。
对原程序影响最小的方式是直接读取程序原来记录的日志文件,基本可以实现无缝接入,不需要对现有程序进行任何改动。对于直接读取文件 Source,有两种方式:
ExecSource: 以运行 Linux 命令的方式,持续的输出最新的数据,如tail -F文件名指令,在这种方式下,取的文件名必须是指定的。 ExecSource 可以实现对日志的实时收集,但是存在Flume不运行、指令执行出错或者channel爆仓,将导致event传送失败,无法保证日志数据的完整性。
SpoolSource: 监测配置的目录下新增的文件,并将文件中的数据读取出来。需要注意两点:拷贝到 spool 目录下的文件不可以再打开编辑;spool 目录下不可包含相应的子目录。
SpoolSource虽然无法实现实时的收集数据,但是可以使用以分钟的方式分割文件,趋近于实时。
使用SpoolingDirectorySource的时候,一定要避免同时读写一个文件的情况。 可以通过 source1.ignorePattern=^(.)*\\.tmp$ 配置,让spoolingsource不读取该格式的文件。
RPC: RPC 的主要功能目标是让构建分布式计算(应用)更容易,在提供强大的远程调用能力时不损失本地调用的语义简洁性。
比如服务器A、B,一个应用部署在A服务器上,相应调用B服务器上应用提供的函数\方法,由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。1.首先解决通讯问题,建立TCP连接,解决寻址问题。通过序列化利用二进制传输,收到数据后反序列化,恢复内存中的表达方式,找到对应方法后执行得到返回值。
Avro: 是一个基于二进制数据传输的高性能的中间件,avro可以将数据结构或对象转化为便于存储或传输的格式,适合远程或本地大规模的存储和交。Avro支持两种序列化编码方式:二进制编码和JSON编码。使用二进制编码会高效序列化,并且序列化后得到的结果会比较小;而JSON一般用于调试系统或是基于WEB的应用。Avro也被作为一种RPC框架来使用。
Thrift: Thrift是一个跨语言的服务部署框架,最初由Facebook于2007年开发,2008年进入Apache开源项目。Thrift通过IDL(Interface Definition Language,接口定义语言)来定义RPC(Remote Procedure Call,远程过程调用)的接口和数据类型,然后通过thrift编译器生成不同语言的代码(目前支持C++,Java, Python, PHP, Ruby, Erlang, Perl,Haskell, C#, Cocoa, Smalltalk和OCaml),并由生成的代码负责RPC协议层和传输层的实现。
Netcat: NetCat是一个非常简单的Unix工具,可以读、写TCP或UDP网络连接(network connection)。最简单的使用方法,”nc host port”,能建立一个TCP连接,连向指定的主机和端口。接下来,你的从标准输入中输入的任何内容都会被发送到指定的主机,任何通过连接返回来的信息都被显示在你的标准输出上。这个连接会一直持续下去,至到连接两端的程序关闭连接。
channel: 有MemoryChannel, JDBC Channel, File Channel, Kafka Channel. 比较常用的是前三种。有capacity、transactionCapacity、keep-alive等属性。
MemoryChannel可以实现高速的吞吐,但是无法保证数据的完整性。
FileChannel保证数据的完整性与一致性。在具体配置FileChannel时,建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。
Sink:支持的数据类型:HDFS Sink,Logger Sink, Kafka Sink,Avro Sink, Thrift, IPC, File Roll 等。
Sink在设置存储数据时,可以向文件系统、数据库、hadoop存数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。
多个 agent 顺序连接:
可以将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。这是最简单的情况,一般情况下,应该控制这种顺序连接的Agent的数量,因为数据流经的路径变长了,如果不考虑failover的话,出现故障将影响整个Flow上的Agent收集服务。
多个Agent的数据汇聚到同一个Agent:
这种情况应用的场景比较多,比如要收集Web网站的用户行为日志,Web网站为了可用性使用的负载均衡的集群模式,每个节点都产生用户行为日志,可以为每个节点都配置一个Agent来单独收集日志数据,然后多个Agent将数据最终汇聚到一个agent上用来存储数据存储系统,如HDFS上。
多路(Multiplexing)Agent
这种模式,有两种方式,一种是用来复制(Replication),另一种是用来分流(Multiplexing)。Replication方式,可以将最前端的数据源复制多份,分别传递到多个channel中,每个channel接收到的数据都是相同的。Multiplexing方式,selector可以根据header的值来确定数据传递到哪一个channel。
实现load balance功能
Load balancing Sink Processor能够实现load balance功能,上图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上。
实现failover能
Failover Sink Processor能够实现failover功能,具体流程类似load balance(可参考load balance图),但是内部处理机制与load balance完全不同:Failover Sink Processor维护一个优先级Sink组件列表,只要有一个Sink组件可用,Event就被传递到下一个组件。如果一个Sink能够成功处理Event,则会加入到一个Pool中,否则会被移出Pool并计算失败次数,设置一个惩罚因子。
代码实例:
启动agent的shell操作:
flume-ng agent -n a1 -c ../conf -f ../conf/example.file
-Dflume.root.logger=DEBUG,console
参数说明: -n 指定agent名称(与配置文件中代理的名字相同)
-c 指定flume中配置文件的目录 -f 指定配置文件
-Dflume.root.logger=DEBUG,console 设置日志等级
例:flume-ng agent -n a1 -c/usr/local/flume/conf -f /usr/local/flume/conf/avro.conf
-Dflume.root.logger=INFO,console
Kafka、Flume区别:都可以实现数据传输,但侧重点不同。
Kafka追求的是高吞吐量、高负载(topic下可以有多个partition)
Flume追求的是数据的多样性:数据来源的多样性、数据流向的多样性。
如果数据来源很单一、想要高吞吐的话可以使用Kafka;如果数据来源很多、数据流向很多的话可以使用Flume;也可以将Kafka和Flume结合起来使用。
关于flume和kafka的集成
Flume的kafka Sink目前只支持Kafka 0.9及以上的版本,目前kafka最新版本是1.0
Flume的source、channel、sink均支持kafka接口。
参考:Flume官方文档