本文由叩丁狼教育Java大神班六期杨光同学原创
安装kafka:
kafka的使用需要zookeeper,默认的kafka包中自带zookeeper。但是有局限性。
安装注意:
1:需要自己写zookeeper关联的配置文件zoo.cnf
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=20(从节点与主节点建立初始化连接的时间上限)
syncLimit=5(从节点与主节点不同步状态的时间上线)
群组需要额外配置的:
server.1=zoo1.example.com:2888:3888
server.2=zoo2.example.com:2888:3888
server.3=zoo3.example.com:2888:3888
(server.X=hostname:peerPort:leaderPort X:服务器ID,hostname:服务器的及其名或IP地址,peerPort:节点间通信的TCP端口,leaderPort:首领选举的TCP端口)
每个服务器必须在data Dir目录中创建一个叫做myid的文件,文件需要包含服务器ID,与配置文件中的ID保持一致
kafka的broker选举成为控制器机制:
核心思想是:首先创建一个ephemeral(短暂的)节点,例如“/election”,然后每一个服务器在此节点下创建一个sequence/ephemeral类型的节点,例如”/election/n_”.在sequence标志下,Zookeeper将自动的为每一个Zookeeper服务器分配一个比前一个序号要大的序号。此时创建节点的Zookeeper服务器中拥有最小序号编号的为控制器。
实际操作中还需要保障,当控制器服务器发生故障的时候,能够快速的选出下一个Zookeeper服务器作为控制器。一个简单的解决方案就是:所有的follwer监视控制器所对应的节点。当控制器发生故障时,所对应的控制器临时节点被删除,此时触发所有监视控制器服务器的watch。这样这些服务器收到控制器故障消息,并进行下一次的控制器选举操作。但是这样会引发“从众效应”的发生,尤其是当集群中服务器众多,带宽延时较大的时候,更为明显。
Zookeeper为了避免从众效应的产生,这样实现:每个follower对follwer集群中对应比自己节点序号小一号的节点(也就是对应序号比自己小的节点中的序号最大的节点)设置一个watch。只有当follower所设置的watch被触发的时候,他才进行控制器选举操作,一般情况下它将成为下一个控制器
Kafka的broker配置
- broker.id 每个broker的标识符(从0开始的任意整数,集群中唯一)
- port 默认9092.注意:使用1024以下端口需要root权限
zookeeper.connect=hostname:port/path
hostname: Zookeeper服务器的机器名或IP地址
port: Zookeeper客户端的链接端口号
/path 可选Zookeeper路径,作为Kafka集群的chroot环境,默认跟路径
- log.dirs 逗号分隔路径,“最少使用原则”,把同一个分区的日志片段保存到同一个路径下。
- num.recovery.threads.per.data.dir 每个数据dir恢复的时候需要使用的线程数服务器正常启动,用于打开每个分区的日志片段
- auto.create.topics.enable
:当一个生产者开始往主题写入消息时
:当一个消费者开始从主题读取消息时
:当任意一个客户端向主题发送元数据请求时
Topic的配置
- num.partitions
- log.retention.ms
- log.retention.bytes(上面两个参数,满足任意一个消息就会被删除)
- log.segment.bytes
- log.segment.ms(上面两个参数,满足任意一个消息片段则会被关闭)
- message.max.bytes(消息大小)
生产者概览
[图片上传失败...(image-bb2c4f-1538215358853)]
(kafka发送消息的主要步骤)
创建Kafka生产者
3个必选属性:
1:bootstrap.servers broker地址,建议两个及以上
2:key.serializer
3:value.serializer
创建生产者对象:
Properties props=new Properties();
props.put("bootstrap.servers","broker1:9092,broker2.9092");
props.put("key.serializer",".........StringSerializer");
props.put("value.serializer","......StringSerializer");
KafkaProducer<String,String> producer=new KafkaPropducer<>(props);
-
发送消息的三种用法
ProducerRecord<String,String> record=new ProducerRecord<>("topicName","messageKey","messageValue"); producer.send(record);
注意:
1:key值相同的会被分配到同一个partition中,但是如果topic增加了partition那么就不能保证了。
2:发送的消息会先放到缓冲区,然后使用单独线程发送到服务器端(默认配置)
3:send()方法会返回一个包含RecordMetadata的Future对象,可以知道是否成功。如果不关心返回结果则可以不去管理。
ProducerRecord<String,String> record=new ProducerRecord<>("topicName","messageKey","messageValue");
record.send(record).get();
注意:
1:send()方法返回一个Future对象
2:调用Future对象,调用Future对象的get()方法等待Kafka响应,如果服务器返回错误,get()方法会抛出异常,如果没有发生错误,我们会得到一个RecordMetadate对象,可用来获取消息的偏移量
private class DemoProducerCallback implements Callback{
@Override
public void onCompletion(RecordMetadata recordMetadata,Exception e){
if(e!=null){
e.printStackTrace();
}
}
}
ProducerRecord<String,Stirng> record=new ProducerRecord<>("topicName","messageKey","messageValue");
producer.send(record,new DemoProducerCallback());
题外话:
在我们的producer发送消息到kafka上之后,成功了则返回包含RecordMetadate的Future,失败了抛出异常,但是如何判断消息发送成功了?参数”request.required.acks”值来确定。参数值为0:不管结果,发送了则表示成功。1:一个副本确认了则证明成功。-1:所有的都要确认了才表示成功。
同步发送,异步发送,oneway发送三种方式
[图片上传失败...(image-872404-1538215358852)]
该字段用来表示是同步发送还是异步发送,异步发送可以设置批量发送消息,但是数据丢失可能性也会大一些。
异步发送其他的参数:
[图片上传失败...(image-3655e6-1538215358852)]
一般对各模式的对应配置
分区器:
实现Partitioner接口,有三个方法:configure,partition,close。实现partition方法即可。
Kafka消费者
kafka消费者从属于消费者群组consumerGroup,一个群组中订阅的是同一个主题,每个消费者接受主题的一部分分区的消息。每个群组都有个群组协调器broker
注意点:
1、第一个consumer成为群组协调器,协调器进行给consumer进行分配partition
2、消费者可以横向进行拓展(不建议频繁操作)
3、ConsumerGroup中的consumer数量不要超过Topic中的partition数量
4、同一个Topic中的单一partition不会同时分配到同一个ConsumerGroup中的不同的consumer
5、如何判断一个consumer在线:向群组协调器的broker发送心跳(轮训获取消息和提交偏移量的时候)。
6、有consumer挂了,就会进行一次再均衡。
创建Kafka消费者:
Properties prop=new Properties();
prop.put("bootstrap.servers","broker1.9092,broker2.9092");
prop.put("group.id","testGroup");
prop.put("key.deserializer","...");
prop.put("value.deserializer","...");
KafkaConsumer<String,String> consumer=new KafkaConsumer<>(prop);
try{
while(true){//循环轮训,防止被认为consumer死了
ConsumerRecords<String,String> records=consumer.poll(100);//发送心跳在此处
for(ConsumerRecord<String,String> record:records){
String topic=record.topic();
Integer partition=record.partition();
Integer offset=record.offset();
String key=record.key();
String value=record.value();
}
}
}finally{
consumer.close();
}
注意:
1、单个线程只能运行一个消费者
Consumer参数:
1、fetch.min.bytes
2、fetch.max.wait.ms
3、max.partition.fetch.bytes //最大分区获取数据大小,默认1M,如果consumer需要处理4个分区,那就是4M数据,数据过大会导致consumer消费时间过长影响session.timeout.ms
4、session.timeout.ms //消费者被认为死亡之前可以与服务器断开连接的时间
5、auto.offset.reset //没有偏移量的分区或者偏移量无效(消费者长时间失效,偏移量记录过时并被删除),该如何处理,默认latest(最近的),还有earliest(起始位置)。
6、enbale.auto.commit //是否自动提交偏移量true或者false,如果设置为true需要auto.commit.interval.ms来决定提交频率
7、partition.assignment.strategy //重分配的分配策略
Range:partitionNum/consumerNum多余的会按照顺序多分配(默认)
RoundRobin:按照顺序循环分配
8、client.id //broker用来标识来源,主要用于日志,度量指标和配额中
9、max.poll.records //每次poll的最大数量
10、receive.buffer.bytes和send.buffer.bytes //TCP缓冲区的大小,设置为-1则使用系统默认的
偏移量:
consumer提交偏移量原理:
向名为consumer_offset的topic发送消息,包含每个partition的offset。如果consumer一直在运行则没什么用处,但是如果发生了再均衡,则会读取该topic中的partition最后一次提交的offset进行继续处理。
自动提交:
设置enable.auto.commit为true则进行自动提交,提交频率auto.commit.interval.ms单位为妙,默认5.
提交当前偏移量:
将auto.commit.offset设置为false,让应用决定何时设置偏移量。使用commitSync()提交偏移量,此方法会提交由poll方法返回的最新偏移量。
while(true){
ConsummerRecords<String,String> records=consumer.poll(100);
for(ConsummerRecord record:records){
//获取数据,进行处理
}
try{
consumer.commitSync();
}catch(CommitFailedException e){
log.error("commit failed",e);
}
}
commitAsync(); 进行异步提交
while(true){
ConsummerRecords<String,String> records=consumer.poll(100);
for(ConsummerRecord record:records){
//获取数据,进行处理
}
consumer.commitAsync();
}
提交之后不必等待,直接去操作接下来的执行动作。但是他也有弊端。
解决方法:
while(true){
ConsummerRecords<String,String> records=consumer.poll(100);
for(ConsummerRecord record:records){
//获取数据,进行处理
}
consumer.commitAsync(new OffsetCommitCallback(){//回调方法用于记录错误和重试,回调函数成不成功都执行
public void onComplete(Map<TopicPartition,OffsetAndMetadata> offsets,Exception e){
if(e!=null){
log.error("...");
}
}
});
}
手动提交偏移量
private Map<TopicPartition,OffsetAndMetadata> currentOffsets=new HashMap<>();
int count=0;
while(true){
ConsummerRecords<String,String> records=consumer.poll(100);
for(ConsummerRecord record:records){
//获取数据,进行处理
currentOffsets.put(new TopicPartition(record.topic(),record.partion()),new OffsetAndMetadata(record.offset()+1,"no metada"));
if(count%1000==0){
consumer.commitAsync(currentOffsets,null);//null类型为OffsetCommitCallback
}
count++;
}
}
再均衡监听器
进行分配新分区或者移除旧分区的时候导致的再均衡。
实现:调用subscribe()方法的时候参数传:ConsumerRebalanceListener实例就可以了。该接口有两个待实现接口:
1)void onPartitionsRevoked(Collection<TopicPartition> partitions)
该方法在再均衡开始之前和消费者停止读取消息之后被调用。在此处提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了。
2)void onPartitionsAssigned(Collection<TopicPartitino> partitions)
该方法会在再均之后和消费者开始读取消息之前被调用
示例:
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new Hash<>();
private class HandleRebalance implements ConsumerRebalanceListener {
private void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
private void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(currentOffsets);
}
}
int count = 0;
try {
consumer.subscribe(topics, new HandleRebalance());// 传再均衡监听器
while (true) {
ConsummerRecords<String, String> records = consumer.poll(100);
for (ConsummerRecord record : records) {
// 获取数据,进行处理
currentOffsets.put(new TopicPartition(record.topic(), record.partion()),
new OffsetAndMetadata(record.offset() + 1, "no metada"));
if (count % 1000 == 0) {
consumer.commitAsync(currentOffsets, null);// null类型为OffsetCommitCallback
}
count++;
}
}
} catch (WakeupException e) {
// 忽略异常,正在关闭消费者
} catch (Exception e) {
log.error("Unexceped error", e);
} finally {
try {
consumer.commitSync(cuurentOffsets);
} finally {
consumer.close();
}
}
将偏移量保存到数据库,从数据库读取偏移量
需要改动的在实现ConsumerRebalanceListener接口的实现类中做修改:
private class HandleRebalance implements ConsumerRebalanceListener {
private void onPartitionsAssigned(Collection<TopicPartition> partitions) {
commitDBTransaction();// 在处理数据之前开启事务,在结束之前将事务提交
}
private void onPartitionsRevoked(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
consumer.seek(partition, getOffsetFromDB(partition));
}
}
}
退出:
consumer.wakeup()是消费者唯一一个可以从其他线程里安全调用的退出方法;
final Thread mainThread = Tread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
log.info("Starting exist...");
consumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
log.error("UnexceptedException", e);
}
}
});
想获取更多技术视频,请前往叩丁狼官网:http://www.wolfcode.cn/openClassWeb_listDetail.html