kafka幂等性
幂等性:
一次和多次请求某一个资源,资源本身应该具有同样的结果(网络超时等问题除外);即:其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
kafka幂等性:
- 0.11.0.0版本开始支持幂等性
- 幂等是针对生产者角度的特性;可以保证生产者发送的信息不丢失;不重复。
- kafka幂等的关键就是服务端可以区分请求是否重复,过滤掉重复请求。
服务端区分请求的两点要求:
- 唯一标识 - - 要想区分请求是否重复,请求中就得有唯一标识;例如:支付请求中的订单号就是唯一标识。
- 记录下已处理过的请求标识 - - 光有唯一标识还不够,还得记录下那些请求时已经处理过的,这样当接收新请求时,用新请求中的标识和处理记录进行比较;如果记录中间有相同的标识,说明重复记录,拒绝掉。
1.创建kafka生产者配置幂等
public class KafkaProducerIdempotence {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//设置kafka的 Acks以及retries
props.put(ProducerConfig.ACKS_CONFIG,"all");
//不包含第一次发送,如果尝试3次都失败,则系统放弃发送
props.put(ProducerConfig.RETRIES_CONFIG,3);
//将检测超时的时间为1ms -- 为测试retries现象
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,1);
//开启kafka 幂等性 注意:在使用幂等时 必须开启acks=all和retires
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
//保证信息有序
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,1);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("topic04", "idempotence" , "test idempotence");
//发送消息
producer.send(record);
producer.flush();
producer.close();
}
}
思考:
- kafka的幂等性只保证了一条记录在分区发送的原子性;如果要保证多条记录(多分区)之间的完整性,需要如何处理呢? ---- 开启kafka事务操作