(The following was tested on JDK 1.8.0_66 under Windows 10 and is intended for learning and testing only; parts are adapted from the official documentation.)
Installing and testing Flume
Download apache-flume-1.8.0 and unpack it.
In the conf directory there are two files of interest: flume-env.ps1, which configures environment variables, and flume-conf.properties.template, a sample configuration file.
Edit flume-env.ps1 and add the JAVA_OPTS setting:
$JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.rawdata=true -Dorg.apache.flume.log.printconfig=true "
Copy the sample configuration file, name it ota-conf.properties, and edit it as follows:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
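The two memory-channel settings above are easy to confuse: capacity bounds the total number of events buffered in the channel, while transactionCapacity bounds how many events a single source put or sink take may move in one transaction. The toy model below illustrates the distinction (it is an illustration only, not Flume's actual implementation; the class and method names are invented here):

```python
from collections import deque

class ToyMemoryChannel:
    """Toy model of Flume's memory channel (NOT the real implementation).

    capacity             - max events buffered in the channel at once
    transaction_capacity - max events one put/take transaction may move
    """
    def __init__(self, capacity=1000, transaction_capacity=100):
        self.capacity = capacity
        self.transaction_capacity = transaction_capacity
        self.queue = deque()

    def put_batch(self, events):
        # A source commits a batch of events in one transaction.
        if len(events) > self.transaction_capacity:
            raise ValueError("batch exceeds transactionCapacity")
        if len(self.queue) + len(events) > self.capacity:
            # Flume raises a ChannelException in this situation.
            raise OverflowError("channel full")
        self.queue.extend(events)

    def take_batch(self):
        # A sink drains at most transaction_capacity events per transaction.
        batch = []
        while self.queue and len(batch) < self.transaction_capacity:
            batch.append(self.queue.popleft())
        return batch
```

If a sink drains more slowly than the source fills, the channel eventually hits capacity and puts start failing, which is why capacity is normally set much larger than transactionCapacity.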
Start the agent:
bin\flume-ng.cmd agent -name a1 -conf-file conf\ota-conf.properties
Open another command window, connect with telnet localhost 44444, and type a message; it appears in the Flume console. (On Windows 10 the Telnet client is an optional feature that may need to be enabled first.)
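If the Telnet client is not available, a short Python script can play the same role. This is a minimal sketch: it opens a TCP connection to the netcat source configured above and sends one newline-terminated line (the send_event name is invented here; host and port match the configuration above):

```python
import socket

def send_event(message, host="localhost", port=44444):
    """Send one line to Flume's netcat source and return its reply.

    The netcat source treats each newline-terminated line as one
    event and, by default, answers "OK" for every event it accepts.
    """
    with socket.create_connection((host, port), timeout=5) as sock:
        sock.sendall(message.encode("utf-8") + b"\n")
        return sock.recv(16).decode("utf-8", errors="replace").strip()

# Example (with the agent above running):
#   send_event("hello flume")
```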
Installing and testing Kafka
Kafka requires ZooKeeper (you can use the ZooKeeper bundled with Kafka; here a standalone install is used). Download and unpack it into a directory; this walkthrough uses zookeeper-3.4.8.
To run ZooKeeper, copy conf\zoo_sample.cfg to conf\zoo.cfg and adjust the dataDir and clientPort entries:
dataDir=D:\\develop\\zookeeper-3.4.8\\data
clientPort=2181
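For reference, a minimal standalone zoo.cfg might look like the following sketch (tickTime is the default shipped in zoo_sample.cfg; the dataDir path is an assumption matching the install directory above):

```properties
# Base time unit (ms) used by ZooKeeper for heartbeats and timeouts
tickTime=2000
# Directory where snapshots are stored
dataDir=D:\\develop\\zookeeper-3.4.8\\data
# Port that clients (here, Kafka and the Kafka CLI tools) connect to
clientPort=2181
```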
Then start the server:
bin\zkServer.cmd
Download and unpack Kafka; this walkthrough uses kafka_2.11-2.0.0. Edit config\server.properties, mainly the ZooKeeper connection address:
zookeeper.connect=localhost:2181
Start Kafka and create a topic:
bin\windows\kafka-server-start.bat config\server.properties
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Publish test messages:
bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
Consume test messages:
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
Configuring Flume to send data to Kafka
The Flume configuration file is as follows:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#a1.sources.r1.type = exec
#a1.sources.r1.command = tail -f -n+1 info.log
# Describe the sink
#a1.sinks.k1.type = logger
#a1.sinks.k1.maxBytesToLog =1024
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
#a1.channels.c1.byteCapacity = 1000000
#a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Restart Flume, then send a message via telnet. Use the Kafka console consumer from the previous section to verify that the message is received by Kafka and consumed correctly.