1 Kafka
概述
1.1 定义
Kafka
是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。
应用场景:
解耦
异步
削峰
1.2 消息队列
1.2.1 传统消息队列的应用场景
1.2.2 消息队列的两种模式
点对点模式:
消息生产者生产消息发送到Queue
中,然后消息消费者从Queue
中取出并且消费消息,消息被消费以后,Queue
中不再有存储,所以消息消费者不可能消费到已经被消费的消息,Queue
支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
发布订阅模式:
消息生产者将消息发布到Topic
,同时有多个消息消费者该消息,和点对点不同的是,发布到Topic
中的消息会被所有订阅者消费。
1.3 基础架构
Producer
:消息生产者,就是向Kafka Broker
发消息的客户端
Consumer
:消息消费者,向Kafka Broker
取消息的客户端
Consumer Group (CG)
:消费者组,由多个Consumer
组成,消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个消费者消费,消费者组之间互不影响,所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
Broker
:一台Kafka
服务器就是一个Broker
,一个集群由多个Broker
组成,一个Broker
可以容纳多个Topic
Topic
:可以理解为一个队列,生产者和消费者面向的都是一个Topic
Partition
:为了实现扩展性,一个非常大的Topic
可以分布到多个Broker
(即服务器)上,一个Topic
可以分为多个Partition
,每个Partition
是一个有序的队列
Replica
:副本,为保证集群中的某个节点发生故障时,该节点上的Partition
数据不丢失,且Kafka
仍然能够继续工作,Kafka
提供了副本机制,一个Topic
的每个分区都有若干个副本,一个leader
和若干个follower
leader
:每个分区多个副本的主,生产者发送数据的对象,以及消费者消费数据的对象都是leader
follower
:每个分区多个副本中的从,实时从leader
中同步数据,保持和leader
数据的同步,leader
发生故障时,某个follower
会成为新的follower
2 Kafka
快速入门
2.1 安装部署
1、解压
[djm@hadoop102 software]$ tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
2、修改解压后的文件夹名称
[djm@hadoop102 module]$ mv kafka_2.11-0.11.0.0/ kafka
3、在/opt/module/kafka
目录下创建logs
文件夹
[djm@hadoop102 kafka]$ mkdir logs
4、修改配置文件
[djm@hadoop102 kafka]$ vi config/server.properties
修改以下内容
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
5、分发
[djm@hadoop102 kafka]$ xsync kafka
6、修改其他Broker
的broker.id
7、Kafka
群起脚本
[djm@hadoop102 kafka]$ vim start-kafka
for i in `cat /opt/module/hadoop-2.7.2/etc/hadoop/slaves`
do
echo "========== $i =========="
ssh $i 'source /etc/profile&&/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties'
echo $?
done
[djm@hadoop102 kafka]$ chmod 777 start-kafka
[djm@hadoop102 kafka]$ sudo mv start-kafka /bin
8、启动Kafka
集群
[djm@hadoop102 kafka]$ start-kafka
2.2 命令行操作
1、查看所有Topic
[djm@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
2、创建Topic
[djm@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first
#--topic 定义topic名
#--replication-factor 定义副本数
#--partitions 定义分区数
--topic 定义topic名
--replication-factor 定义副本数
--partitions 定义分区数
3、删除Topic
[djm@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first
4、发送消息
[djm@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
5、消费消息
[djm@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
--from-beginning 会把topic中以往所有的消息消费出来
6、查看Topic
详细信息
[djm@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first
7、修改分区数
[djm@hadoop102 kafka]$bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6
分区数只能增加,不能减少
3 Kafka
架构
3.1 Kafka
工作流程及文件存储机制
Kafka
中消息是以Topic
进行分类的,生产者生产消息,消费者消费消息,都是面向Topic
的;
Topic
是逻辑上的概念,而Partition
是物理上的概念,每个Partition
对应于一个log
文件,该log
文件中存储的就是Producer
生产的数据;
Producer
生产的数据会被不断追加到该log
文件末端,且每条数据都有自己的offset
,消费者组中的每个消费者,都会实时记录自己消费到了哪个offset
,以便出错恢复时,从上次的位置继续消费。
由于生产者生产的消息会不断的追加到log
文件末尾,为了防止文件过大而导致数据定位效率低下,Kafka
采取了分片和索引机制,将每个Partiton
分为多个segment
,每个segment
对应两个文件,分别是.log
和.index
,这些文件位于同一个文件夹下,文件夹的命名规则为Topic
名称+Partiton
序号,.log
和.index
文件以当前segment
的第一条消息的offset
命名,index
存储索引信息,.log
存储数据信息,索引文件中的元数据指向对应数据文件中message
的物理偏移地址。
3.2 Producer
3.2.1 分区策略
为什么要进行分区?
- 方便在群集中扩展,每个
Partition
可以通过调整以适应它所在的机器,而一个Topic
又可以有多个Partition
组成,因此整个集群就可以适应任意大小的数据了 - 可以提高并发
分区的原则是什么?
我们需要将Producer
发送的数据封装成一个ProducerRecord
对象:
- 指明
Partition
的情况下,直接将指明的值直接作为Partition
值 - 没有指明
Partition
值但有key
的情况下,将key
的hash
值与Topic
的Partition
数进行取余得到Partition
值 - 既没有
Partition
值又没有key
值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与Topic
可用的Partition
总数取余得到Partition
值,也就是常说的round-robin
算法
3.2.2 数据可靠性保证
为保证Partition
发送的数据,能可靠的发送到指定的Topic
,Topic
的每个Partition
收到Producer
发送的数据后,都需要向Producer
发送ack
(acknowledgement
确认收到),如果Producer
收到ack
,就会进行下一轮的发送,否则重新发送数据。
副本数据同步策略:
方案 | 优点 | 缺点 |
---|---|---|
半数以上完成同步,就发送ack
|
延迟低 | 选举新的leader 时,容忍n 台节点的故障,需要2n+1 个副本 |
全部完成同步,才发送ack
|
选举新的leader 时,容忍n 台节点的故障,需要n+1 个副本 |
延迟高 |
Kafka
选择了第二种方案,原因如下:
同样为了容忍n
台节点的故障,第一种方案需要2n+1
个副本,而第二种方案只需要n+1
个副本,而Kafka
的每个Partition
存储大量的数据,这样会造成大量的数据冗余;
虽然第二种方案的延迟会比较高,但是相比而言延迟对Kafka
的影响较小。
采用第二种方案后,leader
收到数据,所有的follower
都开始同步数据,但是有一个follower
,因为某种故障,迟迟不能与leader
同步,那leader
就要一直等下去,直到它同步完才能发送ack
,这个问题怎么解决呢?
leader
维护了一个动态的in-syncreplica set (ISR)
,意为和leader
保持同步的follower
集合,当ISR
中的follower
完成数据的同步之后,leader
就会给follower
发送ack
,如果follower
长时间未向leader
同步数据,则该follower
将被踢出ISR
,该时间阈值由replica.lag.time.max.ms
参数设定,leader
发生故障之后,就会从ISR
中选举新的leader
。
ack
应答机制:
对于某些不重要的数据,能够容忍少量数据的丢失,所以没必要等ISR
中的所有follower
全部同步完成
所以Kafka
提供了三种可靠性级别,根据对可靠性和延迟的要求权衡,分别是:
- 0
Producer
不等待Broker
的ack
,这一操作提供了最低的延迟,Broker
一接收到还没有落盘就已经返回,当Broker
故障时可能会丢失数据 - 1
Producer
等待Broker
的ack
,Partition
的leader
落盘成功后返回ack
,如果在follower
同步成功之前leader
故障,那么将会丢失数据 - -1
Producer
等待Broker
的ack
,Partition
的leader
和follower
全部落盘成功后才返回ack
,但是如果在follower
同步完成后,Broker
发送ack
之前,leader
发生故障,那么会造成数据重复
故障处理:
follower
挂了被会暂时提出ISR
,等到follower
恢复后,follower
会读取本地磁盘记录上次的HW
,并将log
文件中高于HW
的部分截取掉,从HW
开始向leader
进行同步,等leader
的LEO
高于Partition
的HW
,就可以被重新加入ISR
leader
发生故障之后,会从ISR
中选出一个新的leader
,为保证多个副本之间的数据一致性,每个leader
会将各自log
文件中高于HW
的数据切掉,然后从新的leader
同步数据
3.3.3 Exactly Once
语义
对于某些比较重要的消息,我们需要保证Exactly Once
语义,即保证每条消息被发送且仅被发送一次
在0.11
版本之后,Kafka
引入了幂等性机制(idempotent
),配合acks = -1
时的at least once
语义,实现了Producer
到Broker
的Exactly once
语义
idempotent + at least once = exactly once
使用时,只需将enable.idempotence
属性设置为true
,Kafka
自动将acks
属性设为-1
3.3 Consumer
3.3.1 消费方式
Consumer
采取pull
的方式从Broker
中读取数据
为什么采用pull
方式呢?
因为push
模式很难适应不同速率的Consumer
,因此发送速率是由Broker
决定的,它的目的就是尽可能快的传递消息,但是这样容易造成Consumer
来不及处理消息,典型的表现就是网络拥堵以及拒绝服务,而poll
模式则可以根据Consumer
的消费能力消费消息。
但是poll
也有不足,就是如果队列中没有消息,Consumer
可能陷入循环中,一直返回空数据,针对这个缺点,Consumer
在消费数据时会传入一个timeout
,如果当前没有消息可供消费,Consumer
会等待一段时间再返回,这段时间就是timeout
。
3.3.2 分区分配策略
Kafka
有两种分配策略,分别是:
3.3.3 offset
维护
由于Consumer
在消息过程中可能会出现断电宕机等故障,Consumer
恢复后,需要从故障的位置继续消费,所以Consumer
需要实时记录自己消费到了哪个offset
0.9
以前,Consumer
默认将offset
保存在ZK
中
0.9
以后,Consumer
默认将offset
保存在Kafka
一个内置的Topic
,该Topic
为__consumer_offsets
3.4 Kafka
高效读取数据
顺序写磁盘
Kafka
的Producer
生产数据,要写入到log
文件中,写的过程是一直追加到文件末端
零拷贝技术
3.5 Zookeeper
在Kafka
中的作用
Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。
Controller的管理工作都是依赖于Zookeeper的。
以下为partition的leader选举过程:
4 Kafka API
4.1 Producer API
4.1.1 消息发送流程
Kafka
的Producer
发送消息采用的是异步发送的方式,在消息发送的过程中,涉及到了两个线程——main
线程和Sender
线程,以及一个线程共享变量——RecordAccumulator
,main
线程将消息发送给RecordAccumulator
,Sender
线程不断从RecordAccumulator
中拉取消息发送到Kafka broker
。
相关参数:
batch.size
:只有数据积累到batch.size
之后,sender
才会发送数据
linger.ms
:如果数据迟迟未达到batch.size
,sender
等待linger.time
之后就会发送数据
相关类:
KafkaProducer
:需要创建一个生产者对象,用来发送数据
ProducerConfig
:获取所需的一系列配置参数
ProducerRecord
:每条数据都要封装成一个ProducerRecord
对象
导入依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
4.1.2 异步发送
package com.djm.kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1000; i++) {
producer.send(new ProducerRecord<>("first", i + "", "message-" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("success -> " + metadata.offset());
} else {
exception.printStackTrace();
}
}
});
}
producer.close();
}
}
4.1.3 同步发送
package com.djm.kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1000; i++) {
producer.send(new ProducerRecord<>("first", i + "", "message-" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("success -> " + metadata.offset());
} else {
exception.printStackTrace();
}
}
}).get();
}
producer.close();
}
}
4.2 Consumer API
Consumer
消费数据时的可靠性是很容易保证的,因为数据在Kafka
中是持久化的,故不用担心数据丢失问题。
由于Consumer
在消费过程中可能会出现断电宕机等故障,Consumer
恢复后,需要从故障前的位置的继续消费,所以Consumer
需要实时记录自己消费到了哪个offset
,以便故障恢复后继续消费。
所以offset
的维护是Consumer
消费数据是必须考虑的问题。
相关类:
KafkaConsumer
:需要创建一个消费者对象,用来消费数据
ConsumerConfig
:获取所需的一系列配置参数
ConsuemrRecord
:每条数据都要封装成一个ConsumerRecord
对象
导入依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
4.2.1 手动提交offset
package com.djm.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Properties;
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList("first"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
consumer.commitSync();
}
}
}
手动提交offset的方法有两种:
-
commitSync
(同步提交):将本次poll
的一批数据最高的偏移量提交,失败重试,一直到提交成功 -
commitAsync
(异步提交):将本次poll
的一批数据最高的偏移量提交,没有失败重试机制,有可能提交失败
4.2.2 自动提交offset
自动提交offset
的相关参数:
enable.auto.commit
:是否开启自动提交offset
功能
auto.commit.interval.ms
:自动提交offset
的时间间隔
package com.djm.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Properties;
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList("first"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
4.3 自定义Interceptor
Interceptor
是在Kafka
0.10
版本被引入的,主要用于实现Client
端的定制化控制逻辑。
对于Producer
而言,Interceptor
使得用户在消息发送前以及Producer
回调逻辑前有机会对消息做一些定制化需求,比如修改消息等,同时,Producer
允许用户指定多个Interceptor
按序作用于同一条消息从而形成一个Interceptorchain
。
Interceptor
的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor
,其定义的方法包括:
-
configure(configs)
:获取配置信息和初始化数据时调用 -
onSend(ProducerRecord)
:Producer
确保在消息被序列化以及计算分区前调用该方法,用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的Topic
和Partition
,否则会影响目标分区的计算 -
onAcknowledgement(RecordMetadata, Exception)
:该方法会在消息从RecordAccumulator
成功发送到Kafka Broker
之后,或者在发送过程中失败时调用 -
close
:关闭Interceptor
,主要用于执行一些资源清理工作
拦截器案例
1、需求分析:
实现一个简单的双Interceptor
组成的拦截链,第一个Interceptor
会在消息发送前将时间戳信息加到消息value
的最前部,第二个Interceptor
会在消息发送后更新成功发送消息数或失败发送消息数
2、编写TimeInterceptor
package com.djm.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class TimeInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() + "," + record.value());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
3、编写CounterInterceptor
package com.djm.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class CounterInterceptor implements ProducerInterceptor<String, String> {
private static long successCounter = 0L;
private static long errorCounter = 0L;
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
successCounter++;
} else {
errorCounter++;
}
}
@Override
public void close() {
System.out.println("Successful sent: " + successCounter);
System.out.println("Failed sent: " + errorCounter);
}
@Override
public void configure(Map<String, ?> configs) {
}
}
4、修改CustomProducer
package com.djm.kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class CustomProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
List<String> interceptors = new ArrayList<>();
interceptors.add("com.djm.kafka.interceptor.TimeInterceptor");
interceptors.add("com.djm.kafka.interceptor.CounterInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1000; i++) {
producer.send(new ProducerRecord<>("first", i + "", "message-" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("success -> " + metadata.offset());
} else {
exception.printStackTrace();
}
}
});
}
producer.close();
}
}
5 Flume
对接Kafka
1、配置Flume
编写flume-kafka.conf
[djm@hadoop102 job]$ vim flume-kafka.conf
输入一下内容
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/datas/flume.log
a1.sources.r1.shell = /bin/bash -c
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2、启动消费者
[djm@hadoop102 ~]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
3、启动Flume
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf
4、向/opt/module/datas/flume.log
里追加数据,查看Kafka
消费情况
6 Kafka
监控
6.1 Monitor
1、上传jar
包KafkaOffsetMonitor-assembly-0.4.6.jar
到集群
2、在/opt/module/
下创建kafka-offset-console
文件夹
3、将上传的jar
包放入刚创建的目录下
4、在/opt/module/kafka-offset-console
目录下创建启动脚本start.sh
,内容如下:
#!/bin/bash
java -cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--offsetStorage kafka \
--kafkaBrokers hadoop102:9092,hadoop103:9092,hadoop104:9092 \
--kafkaSecurityProtocol PLAINTEXT \
--zk hadoop102:2181,hadoop103:2181,hadoop104:2181 \
--port 8086 \
--refresh 10.seconds \
--retain 2.days \
--dbName offsetapp_kafka &
5、在/opt/module/kafka-offset-console
目录下创建mobile-logs
文件夹
6、启动Monitor
./start.sh
6.2 Manager
1、上传压缩包kafka-manager-1.3.3.15.zip
到集群
2、解压到/opt/module
3、修改配置文件conf/application.conf
kafka-manager.zkhosts="kafka-manager-zookeeper:2181"
修改为:
kafka-manager.zkhosts="hadoop102:2181,hadoop103:2181,hadoop104:2181"
4、启动kafka-manager
[djm@hadoop102 kafka-manager-1.3.3.15]$ bin/kafka-manager
5、登录hadoop102:9000
页面查看详细信息