- 拦截器定义
拦截器参数命名为:interceptor.classes。官方文档解析如下:
A list of classes to use as interceptors. Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.
即拦截发送到kafka服务器之前的消息,且在序列化&反序列化之前调用,序列化&反序列化又在分区策略之前调用,这个调用顺序在[kafka生产者&消费者]已经分析过了。拦截器允许修改key和value,同时拦截器可以指定多个,多个拦截器形成拦截器链,且有先后顺序,假定按照如下方式配置2个拦截器,那么会先调用AProducerInterceptor,再调用BProducerInterceptor,且调用BProducerInterceptor时的ProducerRecord是经过AProducerInterceptor修改过的ProducerRecord(如果在AProducerInterceptor中修改过ProducerRecord的话):
props.put("interceptor.classes", "com.afei.kafka.interceptor.AProducerInterceptor,com.afei.kafka.interceptor.BProducerInterceptor");
- ProducerInterceptor接口定义源码解读
public interface ProducerInterceptor<K, V> extends Configurable {
/**
* This is called from {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)} and
* {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord, Callback)} methods, before key and value
* get serialized and partition is assigned (if partition is not specified in ProducerRecord).
* 即这个方法调用在KafkaProducer.send()之后,但是在key&value序列化之前,以及分配分区之前。
* <p>
* This method is allowed to modify the record, in which case, the new record will be returned. The implication of modifying
* key/value is that partition assignment (if not specified in ProducerRecord) will be done based on modified key/value,
* 这个方法中允许修改ProducerRecord即发送的消息,接下来的分区分配,根据修改后的key(如果会在onSend()中修改key的话)进行计算.
*/
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
/**
* 不管消息发送成功还是发送过程抛出异常,这个方法都会执行
*/
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
/**
* This is called when interceptor is closed
*/
public void close();
}
- 自定义拦截器实现
/**
* @author afei
* @version 1.0.0
* @since 2018年06月27日
*/
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
/**
* 统计发送成功数
*/
private static AtomicLong sendSuccess = new AtomicLong(0);
/**
* 统计发送失败数
*/
private static AtomicLong sendFailure = new AtomicLong(0);
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
this.outputSendStat();
// 改写ProducerRecord, 将key置为null, 分区全部交给kafka去决定
return new ProducerRecord<>(record.topic(),
record.partition(), record.timestamp(), null, record.value(),
record.headers());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// 如果没有异常表示发送成功, 那么发送成功数+1, 否则发送失败数+1
if (exception!=null){
sendFailure.getAndIncrement();
}else{
sendSuccess.getAndIncrement();
}
}
/**
* 打印出发送的成功&失败次数的统计信息
*/
private void outputSendStat(){
long successCount = sendSuccess.get();
long failedCount = sendFailure.get();
System.out.println("success count: "+successCount+", failed count:"+failedCount);
}
@Override
public void close() {
this.outputSendStat();
}
@Override
public void configure(Map<String, ?> configs) {
}
}