1. First, configure deserialization for the Kafka consumer
Set key-deserializer and value-deserializer in the consumer configuration.
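For reference, the same two settings can be written as raw Kafka consumer properties. This is only a sketch: it assumes String keys and JSON values decoded by Spring Kafka's JsonDeserializer, and the class name ConsumerDeserializerProps is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that builds the deserializer part of a consumer configuration. */
final class ConsumerDeserializerProps {

    static Map<String, Object> build() {
        Map<String, Object> props = new HashMap<>();
        // Standard Kafka consumer property names. In Spring Boot they correspond to
        // spring.kafka.consumer.key-deserializer and spring.kafka.consumer.value-deserializer.
        // Assumption: record keys are plain strings.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumption: record values are JSON, decoded by Spring Kafka's JsonDeserializer.
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        return props;
    }
}
```

A map like this could be handed to a consumer factory; in a Spring Boot project the equivalent entries would normally live in the application configuration file instead.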
2. Configure the concrete JSON parsing in batchFactory
@Bean("batchFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> batchFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> batchConsumerFactory) {
    log.info("Kafka bootstrap servers: " + kafkaConsumerPro.getBootstrapServers());
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(batchConsumerFactory);
    // TODO: thread settings
    factory.setConcurrency(kafkaConsumerPro.getSource().getTopic().numPartitions() / kafkaConsumerPro.getConsumer().getInstanceNum());
    // Enable batch consumption; the batch size is set in the Kafka configuration parameters (for example max.poll.records)
    factory.setBatchListener(true);
    // Commit offsets manually (ack mode MANUAL_IMMEDIATE)
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.getContainerProperties().setPollTimeout(10000);
    Properties props = new Properties();
    // props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.alibaba.fastjson.JSONObject");
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.cheng.fei.FeiMessage");
    factory.getContainerProperties().setKafkaConsumerProperties(props);
    return factory;
}
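The concurrency expression in batchFactory is an integer division, so it evaluates to 0 whenever the topic has fewer partitions than there are instances, and 0 is not a usable thread count. A minimal sketch of a guarded version follows; the helper name ListenerConcurrency and the choice to fall back to 1 are assumptions, not part of the original code.

```java
/** Hypothetical helper mirroring the concurrency expression used in batchFactory. */
final class ListenerConcurrency {

    /**
     * Divides the partitions across application instances with integer division,
     * but never returns less than 1 so each instance keeps at least one consumer thread.
     */
    static int perInstance(int numPartitions, int instanceNum) {
        if (numPartitions < 1 || instanceNum < 1) {
            throw new IllegalArgumentException("numPartitions and instanceNum must be positive");
        }
        return Math.max(1, numPartitions / instanceNum);
    }
}
```

With 12 partitions and 3 instances this gives 4 threads per instance; with 2 partitions and 4 instances it gives 1 instead of 0, which means some threads will simply sit idle.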
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE,"com.cheng.fei.FeiMessage");
This line names the concrete class that each record read from Kafka should be parsed into.
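Because the target type is passed as a string, a typo in the class name only shows up at runtime. One possible way to avoid that, sketched below, is to derive the name from a class literal. The property key shown is the string value of JsonDeserializer.VALUE_DEFAULT_TYPE; the helper class JsonValueTypeProps is hypothetical.

```java
import java.util.Properties;

/** Hypothetical helper that builds the default-type property from a class literal. */
final class JsonValueTypeProps {

    // String value of Spring Kafka's JsonDeserializer.VALUE_DEFAULT_TYPE constant.
    static final String VALUE_DEFAULT_TYPE = "spring.json.value.default.type";

    static Properties forType(Class<?> targetType) {
        Properties props = new Properties();
        // Store the fully qualified class name, the same form the original passes by hand.
        props.put(VALUE_DEFAULT_TYPE, targetType.getName());
        return props;
    }
}
```

In the factory above, JsonValueTypeProps.forType(FeiMessage.class) would then stand in for the hand-built Properties object, so the compiler checks that the class exists.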
3. Writing the entity class
@Data
public class FeiMessage {
    private String appId;
    private String group;
    private String code;
    private String name;

    // Maps one JSON key/value pair onto the matching field
    public Object put(String key, Object value) {
        // Guard against JSON null values, which would otherwise throw a NullPointerException
        String valStr = value == null ? null : value.toString();
        if ("app_id".equals(key)) {
            setAppId(valStr);
        } else if ("group".equals(key)) {
            setGroup(valStr);
        } else if ("code".equals(key)) {
            setCode(valStr);
        } else if ("name".equals(key)) {
            setName(valStr);
        }
        return null;
    }
}
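In effect, each field is pulled out of the JSON string and you decide how it is parsed: the key and value passed to put are the key and value from the JSON string. Note that Jackson, which Spring Kafka's JsonDeserializer uses, only calls a method like put if it is annotated with @JsonAnySetter, and then only for keys that do not match a property by name (here app_id).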
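To make the key/value mapping concrete, here is a plain-Java sketch with no Kafka or Lombok involved. The class FeiMessageSketch and the method fromParsedJson are hypothetical; the loop merely stands in for whatever the deserializer does when it hands each JSON entry to put.

```java
import java.util.Map;

/** Plain-Java stand-in for FeiMessage, with fields assigned directly instead of via Lombok setters. */
final class FeiMessageSketch {
    private String appId;
    private String group;
    private String code;
    private String name;

    /** Receives one JSON key/value pair and stores it in the matching field. */
    Object put(String key, Object value) {
        // A JSON null becomes a null field instead of throwing.
        String valStr = value == null ? null : value.toString();
        switch (key) {
            case "app_id": appId = valStr; break;
            case "group":  group = valStr; break;
            case "code":   code = valStr;  break;
            case "name":   name = valStr;  break;
            default: break; // keys without a matching field are ignored
        }
        return null;
    }

    String getAppId() { return appId; }
    String getGroup() { return group; }
    String getCode()  { return code; }
    String getName()  { return name; }

    /** Stand-in for the deserializer: feeds every entry of an already parsed JSON object to put. */
    static FeiMessageSketch fromParsedJson(Map<String, Object> json) {
        FeiMessageSketch msg = new FeiMessageSketch();
        for (Map.Entry<String, Object> entry : json.entrySet()) {
            msg.put(entry.getKey(), entry.getValue());
        }
        return msg;
    }
}
```

Given a map holding app_id, group, code and name, fromParsedJson returns an object whose getters expose those values as strings, so a numeric code such as 7 is stored as "7".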