简介
flume官网里面有user guide。
作用:日志采集、聚合、传输
核心组件:Agent
agent内部组件:source,sink,channel(缓存)
这些组件可以理解为是抽象类,有很多实现类。在使用时我们可以配置不同的实现类
运行机制
agent可以组织为拓扑网络:
部署运行
上传解压就o了
然后我们就配置采集方案
- 例1:
从网络端口接收数据,下沉到logger
新建采集配置文件,放在conf文件夹下:
netcat-logger
# Name the components on this agent
#给那三个组件取个名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
#类型, 从网络端口接收数据,在本机启动, 所以localhost, type=netcat
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost(本机)
a1.sources.r1.port = 44444
相当于一个服务器
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
#下沉的时候是一批一批的, 下沉的时候是一个个eventChannel参数解释:
#capacity:默认该通道中最大的可以存储的event数量
#trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
log4j可以控制日志信息输送的目的地是控制台、文件、GUI组件,甚至是套接口服务器、NT的事件记录器、UNIX Syslog守护进程等
事件event:source取一次数据
启动:
告诉flum启动一个agent,指定配置参数
$ bin/flume-ng agent --conf conf --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console
最后的参数-Dflume.root.logger=INFO,console
给log4j传的jvm参数,下沉到控制台
数据传入:
[hadoop@mini1 ~]$ telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
nishishui
OK
2017-08-12 00:09:21,505 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 6E 69 73 68 69 73 68 75 69 0D nishishui. }
move file /home/hadoop/flumespool/t.dat to /home/hadoop/flumespool/t.dat.COMPLETED
- 例2
监视文件夹:
spooldir-logger
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
#监听目录,spoolDir指定目录, fileHeader要不要给文件夹前坠名
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/flumespool
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
bin/flume-ng agent -c ./conf -f ./conf/spool-logger.conf -n a1 -Dflume.root.logger=INFO,console
move file /home/hadoop/flumespool/t.dat to /home/hadoop/flumespool/t.dat.COMPLETED
最后往/home/hadoop/flumeSpool放文件,看到监听
不能放重复文件名的文件
source:
avro source:一种序列化框架,通过网络发送序列化数据,跨平台。flume可以接受。
thrift
exec:unix命令结果
kafka:分布式消息缓存系统
netcat绑定地址是localhost时telnet只能本机连,改为主机名mini1时可以从其他机器连。start-yarn不能在其他的机器起,因为start-yarn配置绑定在mini1:8031提供服务。socket编程,socket服务器只能绑定本地的地址。
- 例3
tail-hdfs.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/log/test.log(-F根据文件名跟踪,-f根据inode跟踪。)
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10(过10分钟改目录)
a1.sinks.k1.hdfs.roundUnit = minute
#文件滚动周期(s)
a1.sinks.k1.hdfs.rollInterval = 3
#文件滚动大小限制(bytes)
a1.sinks.k1.hdfs.rollSize = 500
#写入多少个event数据后滚动文件
a1.sinks.k1.hdfs.rollCount = 20
a1.sinks.k1.hdfs.batchSize = 5
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1
mkdir /home/hadoop/log
touch /home/hadoop/log/test.log
while true
do
echo 111111111111111 >> /home/hadoop/log/test.log
sleep 0.5
done
tail -F /home/hadoop/log/test.log
start-dfs.sh
bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1
hdfs dfsadmin -report(以防刚启动的时候datanode汇报还没全,处于safemode)
hadoop fs -ls /(mini1:50070)
inode:理解inode,要从文件储存说起。
文件储存在硬盘上,硬盘的最小存储单位叫做"扇区"(Sector)。每个扇区储存512字节(相当于0.5KB)。操作系统读取硬盘的时候,不会一个个扇区地读取,这样效率太低,而是一次性连续读取多个扇区,即一次性读取一个"块"(block)。这种由多个扇区组成的"块",是文件存取的最小单位。"块"的大小,最常见的是4KB,即连续八个 sector组成一个 block。文件数据都储存在"块"中,那么很显然,我们还必须找到一个地方储存文件的元信息,比如文件的创建者、文件的创建日期、文件的大小等等。这种储存文件元信息的区域就叫做inode,中文译名为"索引节点"。
linux软连接和硬链接:
ln -s /xxx /xxx(快捷方式,本质是一个文件)
例:
ln -s /home/hadoop/aaa/ /home/bbb
rm /home/bbb(只删除快捷方式这个文件)
rm -rf /home/bbb(会删除指向数据)
ln /xxx /xxx(同一文件,有两个inode,文件名相当于一个引用)
例4
串联
tail-avro-avro
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/log/test.log
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks = k1
#sink端的avro是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = mini2
a1.sinks.k1.port = 4141
a1.sinks.k1.batch-size = 2
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
avro-hdfs/logger
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
#source中的avro是一个数据接收者
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
发送数据:
$ bin/flume-ng avro-client -H localhost -p 4141 -F /usr/logs/log.10
线看一波串联tail-avro-avro-logger的效果:
scp -r /flume mini2:$PWD
mini1: vi tail-avro.conf
mini2: vi avro-log.conf
bin/flume-ng agent -c conf -f conf/avro-logger.conf -n a1 -Dflume.root.logger=INFO,console
netstat -nltp查看端口是否监控
bin/flume-ng agent -c conf -f conf/tail-avro.conf -n a1
没办法高可用,但可以写脚本监控
某些特定场景不好实现,常用日志场景,需要定制开发