kafka应答及重试策略
- kafka在生产者发送完一个消息之后,要求broker在规定的时间内Ack应答;如果没有在规定时间内ack;生产者会尝试n次重新发送消息。
- acks=1 (默认)Leader会将Record写在其本地日志中;但不会等待所有Follower的完全确认的情况下做出响应,这种情况下,如果Leader在确认记录后立即失败,在Follower复制记录之前失败,则记录会丢失
- acks=0 生产者不等待服务器确认,将记录加载到缓冲区即视为发送;这种情况不能保证服务器已收到记录
- acks=all 表示Leader将等待全套同步副本确认记录。保证至少一个同步副本仍处于活动状态,记录不会丢失。-- 等效于acks= -1。
1.创建生产者配置应答策略
public class KafkaProducerAcks {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"ip:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//设置kafka的 Acks以及retries
props.put(ProducerConfig.ACKS_CONFIG,"all");
//不包含第一次发送,如果尝试3次都失败,则系统放弃发送 默认是Integer.MAX_VALUE
props.put(ProducerConfig.RETRIES_CONFIG,3);
//将检测超时的时间为4ms -- 为测试retries现象 默认是30000ms
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,4);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("topic04", "ack" , "test acks");
//发送消息
producer.send(record);
producer.flush();
producer.close();
}
2.创建消费者测试应答机制
public class KafkaConsumerAcks {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"123.57.225.102:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG,"g1");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//订阅相关的topics
consumer.subscribe(Arrays.asList("topic01"));
//遍历消息队列
while(true){...}
}
}
思考:
我们上面代码配置了应答策略,将请求时间配置成4ms看到了重试的现象,但是我们在利用消费者进行消费的时候,我们实际上是对信息进行了4次消费,实际开发中如果我一条信息网络错误,在重试的策略下可能会造成重复消费,那么这个问题该如何解决呢? ---- [kafka 在0.11版本后支持了幂等性来解决这一问题]