离线数据分析平台实战——150Flume介绍
Nginx介绍
Nginx是一款轻量级的Web 服务器/反向代理服务器及电子邮件(IMAP/POP3)代理服务器。
其特点是占有内存少,并发能力强,事实上nginx的并发能力确实在同类型的网页服务器中表现较好。
一般情况下,我们会将nginx服务器作为一个静态资源的访问容器。
Nginx安装步骤
Nginx安装步骤如下:(使用yum命令安装)
- 使用root用户登录。
- 查看nginx信息,命令:yum info nginx.
- 如果查看nginx信息提示nginx找不到,那么可以通过修改rpm源来进行后续步骤,执行命令:rpm -ivh http://nginx.org/packages/centos/6/noarch/RPMS/nginx-release-centos-6-0.el6.ngx.noarch.rpm
- 在查看nginx信息。
- 安装,命令: yum install nginx。在安装过程中直接输入y。
- 启动nginx,命令:service nginx start
- 访问http://192.168.0.120查看nginx的web页面。
Flume介绍
Flume是Apache基金会组织的一个提供的高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,
Flume支持在日志系统中定制各类数据发送方,用于收集数据;
同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
当前Flume有两个版本,
Flume0.9x版本之前的统称为Flume-og,
Flume1.X版本被统称为Flume-ng。
参考文档:http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.6/FlumeUserGuide.html
Flume-og和Flume-ng的区别
主要区别如下:
- Flume-og中采用master结构,为了保证数据的一致性,引入zookeeper进行管理。Flume-ng中取消了集中master机制和zookeeper管理机制,变成了一个纯粹的传输工具。
- Flume-ng中采用不同的线程进行数据的读写操作;在Flume-og中,读数据和写数据是由同一个线程操作的,如果写出比较慢的话,可能会阻塞flume的接收数据的能力。
Flume结构
Flume中以Agent为基本单位,一个agent可以包括source、channel、sink,三种组件都可以有多个。
其中source组件主要功能是接收外部数据,并将数据传递到channel中;
sink组件主要功能是发送flume接收到的数据到目的地;
channel的主要作用就是数据传输和保存的一个作用。
Flume主要分为三类结构:单agent结构、多agent链式结构和多路复用agent结构。
单agent结构
多agent链式结构
多路复用agent结构
Source介绍
Source的主要作用是接收客户端发送的数据,并将数据发送到channel中,source和channel之间的关系是多对多关系,不过一般情况下使用一个source对应多个channel。
通过名称区分不同的source。
Flume常用source有:Avro Source、Thrift Source、Exec Source、Kafka Source、Netcat Source等。
设置格式如下:
<agent-name>.sources.<source_name>.type=指定类型
<agent-name>.sources.<source_name>.channels=channels
.... 其他对应source类型需要的参数
Channel介绍
Channel的主要作用是提供一个数据传输通道,提供数据传输和数据存储(可选)等功能。
source将数据放到channel中,sink从channel中拿数据。
通过不同的名称来区分channel。
Flume常用channel有:Memory Channel、JDBC Channel、Kafka Channel、File Channel等。设置格式如下:
<agent-name>.channels.<channel_name>.type=指定类型
.... 其他对应channel类型需要的参数
Sink介绍
Sink的主要作用是定义数据写出方式,一般情况下sink从channel中获取数据,然后将数据写出到file、hdfs或者网络上。
channel和sink之间的关系是一对多的关系。
通过不同的名称来区分sink。
Flume常用sink有:Hdfs Sink、Hive Sink、File Sink、HBase Sink、Avro Sink、Thrift Sink、Logger Sink等。
设置格式如下:
<agent-name>.sinks.<sink_name1>.type=指定类型
<agent-name>.sinks.<sink_name1>.channel=<channe_name>
.... 其他对应sink类型需要的参数
Flume安装
安装步骤如下:
- 下载flume:wget http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.3.6.tar.gz
- 解压flume。
- 修改conf/flume-env.sh文件,如果没有就新建一个。
- 添加flume的bin目录到环境变量中去。
- 验证是否安装成功, flume-ng version
Flume案例1
使用netcat source监听客户端的请求,使用memory channel作为数据的传输通道,使用logger sink打印监听到的信息。
步骤:
- 在conf文件夹中建立test1.conf,里面是agent的配置。
- 启动flume-ng agent --conf ./conf/ --conf-file ./conf/test.conf --name a1 -Dflume.root.logger=INFO,console。
- 使用telenet命令连接机器,命令:telnet 192.168.100.120 4444
- 输入信息查看是否成功。
test1.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=netcat
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=4444
a1.sources.r1.channels=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1
Flume案例2
Nginx作为日志服务器,通过exec source监听nginx的日志文件,使用memory channel作为数据传输通道,使用hdfs sink将数据存储到hdfs上。
项目用到的配置文件
nginx.conf
user nginx;
worker_processes 1;
error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;
events {
worker_connections 1024;
}
http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
log_format log_format '$remote_addr^A$msec^A$http_host^A$request_uri';
sendfile on;
keepalive_timeout 65;
#include /etc/nginx/conf.d/*.conf;
server {
listen 80;
server_name hh 0.0.0.0;
location ~ .*(BfImg)\.(gif)$ {
default_type image/gif;
access_log /home/hadoop/access.log log_format;
root /etc/nginx/www/source;
}
}
}
test2.conf
agent.sources = r1
agent.sinks = k1
agent.channels = c1
## common
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1
## sources config
agent.sources.r1.type = exec
agent.sources.r1.command = tail -F /home/hadoop/access.log
## channels config
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 1000
agent.channels.c1.byteCapacityBufferPercentage = 20
agent.channels.c1.byteCapacity = 1000000
agent.channels.c1.keep-alive = 60
#sinks config
agent.sinks.k1.type = hdfs
agent.sinks.k1.channel = c1
agent.sinks.k1.hdfs.path = hdfs://hh:8020/logs/%m/%d
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.filePrefix = BF-%H
agent.sinks.k1.hdfs.fileSuffix=.log
agent.sinks.k1.hdfs.minBlockReplicas=1
agent.sinks.k1.hdfs.rollInterval=3600
agent.sinks.k1.hdfs.rollSize=132692539
agent.sinks.k1.hdfs.idleTimeout=10
agent.sinks.k1.hdfs.batchSize = 1
agent.sinks.k1.hdfs.rollCount=0
agent.sinks.k1.hdfs.round = true
agent.sinks.k1.hdfs.roundValue = 2
agent.sinks.k1.hdfs.roundUnit = minute
agent.sinks.k1.hdfs.useLocalTimeStamp = true
复杂的配置方法
agent.sources = r1
agent.sinks = k1 k2
agent.channels = c1
## common
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1
agent.sinks.k2.channel = c1
## sources config
agent.sources.r1.type = exec
agent.sources.r1.command = tail -F /home/hadoop/access.log
agent.sources.r1.selector.type= replicating
## channels config
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100000000
agent.channels.c1.transactionCapacity = 10000000
agent.channels.c1.byteCapacityBufferPercentage = 60
agent.channels.c1.byteCapacity = 12800000000
agent.channels.c1.keep-alive = 60
#sinks config
agent.sinks.k1.type = hdfs
agent.sinks.k1.channel = c1
agent.sinks.k1.hdfs.path = hdfs://server30-244:8020/logs/%Y/%m/%d
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.filePrefix = BF-%H
agent.sinks.k1.hdfs.fileSuffix=.log
agent.sinks.k1.hdfs.minBlockReplicas=1
agent.sinks.k1.hdfs.rollInterval=3600
agent.sinks.k1.hdfs.rollSize=132692539
agent.sinks.k1.hdfs.idleTimeout=600
agent.sinks.k1.hdfs.batchSize = 100
agent.sinks.k1.hdfs.rollCount=0
agent.sinks.k1.hdfs.round = true
agent.sinks.k1.hdfs.roundValue = 2
agent.sinks.k1.hdfs.roundUnit = minute
agent.sinks.k1.hdfs.useLocalTimeStamp = true
agent.sinks.k2.type = hdfs
agent.sinks.k2.channel = c1
agent.sinks.k2.hdfs.path = hdfs://server30-99:8020/agentLog/%Y/%m/%d
agent.sinks.k2.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.filePrefix = ub-%H
agent.sinks.k1.hdfs.fileSuffix=.log
agent.sinks.k2.hdfs.minBlockReplicas=1
agent.sinks.k2.hdfs.rollInterval=3600
agent.sinks.k2.hdfs.rollSize=132692539
agent.sinks.k2.hdfs.idleTimeout=600
agent.sinks.k2.hdfs.batchSize = 100
agent.sinks.k2.hdfs.rollCount=0
agent.sinks.k2.hdfs.round = true
agent.sinks.k2.hdfs.roundValue = 2
agent.sinks.k2.hdfs.roundUnit = minute
agent.sinks.k2.hdfs.useLocalTimeStamp = true
#####sink groups
agent.sinkgroups=g1
agent.sinkgroups.g1.sinks=k1 k2
agent.sinkgroups.g1.processor.type= failover
agent.sinkgroups.g1.processor.priority.k1=10
agent.sinkgroups.g1.processor.priority.k2=5
agent.sinkgroups.g1.processor.maxpenalty=10000