springboot整合kafka,把收到的数据直接解析成对应的实例对象中

1、先配置kafka的consumer的反序列化

配置key-deserializer 和value-deserializer


image.png

2、在batchFactory 中配置具体的json解析

    @Bean("batchFactory")
    public ConcurrentKafkaListenerContainerFactory<?, ?> batchFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> batchConsumerFactory) {
        log.info("kafka服务器地址:" + kafkaConsumerPro.getBootstrapServers());
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(batchConsumerFactory);
        // todo, thread setting
        factory.setConcurrency(kafkaConsumerPro.getSource().getTopic().numPartitions() / kafkaConsumerPro.getConsumer().getInstanceNum());
        factory.setBatchListener(true); //设置为批量消费,每个批次数量在Kafka配置参数中设置
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//设置手动提交ackMode
        factory.getContainerProperties().setPollTimeout(10000);
        Properties props = new Properties();
//        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.alibaba.fastjson.JSONObject");
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE,"com.cheng.fei.FeiMessage");
        factory.getContainerProperties().setKafkaConsumerProperties(props);

        return factory;
    }

props.put(JsonDeserializer.VALUE_DEFAULT_TYPE,"com.cheng.fei.FeiMessage");
就是你想要把kafka中的一笔数据解析成具体的实例类

3、实例类写法

@Data
public class FeiMessage{
    private String appId;

    private String group;

    private String code;

    private String name;

    public Object put(String key, Object value) {
        String valStr = value.toString();
        if("app_id".equals(key)){
            setAppId(valStr);
        }else if("group".equals(key)){
            setGroup(valStr);
        }else if("code".equals(key)){
            setCode(valStr);
        }else if("name".equals(key)){
            setName(valStr);
        }
        return null;
    }
}

相当于是把json串中的每个字段拿出来,解析成怎样,put方法里的key,value就是json串里的key,value

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容