一、技术说明
springboot2.0
kafka1.0.2
spring-kafka
采用配置文件的方式配置kafka,因为springboo配置文件配置并不全
二、场景说明
客户服务器部署kafka集群,目前是3台,每天的数据量大约是100万条,存入kafka中,我们这边需要消费kafka中数据,取出kafka中数据入库并发送到大屏页面
三、代码截图
1、配置文件
2、配置类
3、代码
四、遇到的问题
1、目前的代码是一条条获取数据,本来想做成批量处理,每次发送过来1000条数据,然后再进行处理,但发现每次关掉服务,再重新启动,数据的偏移量很大,也就是数据丢失很多,所以目前是采用一条条获取数据,这样能保证数据没有偏移。而且如果处理过程简单,一条条这样处理也很快,一秒能几万条。
2、消费者有两种提交策略,自动提交和手动提交。如果消费者处理数据过程简单,耗时较小,那么这两种策略都没问题,都能进行提交。但消费者处理数据过程复杂,耗时较大,使用自动提交时,取出的数据在session.timeout.ms时间内没有处理完成,自动提交offset失败,然后kafka会重新分配partition给消费者,消费者又重新消费之前的一批数据,又出现了消费超时,所以会造成死循环,一直消费相同的数据。
3、代码中使用Acknowledgment进行消息提交,很多文章中都说使用KafkaConsumer,同步提交,但我测试了一上午,数据不会重复,但是每次关闭重启偏移量都是500,不知道因为什么问题。
4、配置文件中能配置消费线程数,由于topic分区数是3个,所以这里设置线程数为3,但感觉速度也不快。
五、目前存在的问题
1、由于消费者处理速度低,导致获取数据一直延时,跑两天数据,延时大约4小时。无法满足实时获取数据的要求。
2、目前想到的方法是增加分区数、增加消费者线程、异步入库(但不满足项目需求)
六、知识点
1、一个topic中可以有多个分区,每个分区都存着数据,负载均衡由kafka内部机制控制,一个分区中数据只能被一个消费组中一个消费者消费。
2、一个消费者能消费多个分区中数据,网上说最好一个分区由一个消费者消费,或者设置线程数等于分区数
3、auto.offset.reset这个参数的说明
消费组中如果存在已经提交的offest时,不管设置为earliest 或者latest 都会从已经提交的offest处开始消费
消费组中如果不存在已经提交的offest时,earliest 表示从头开始消费,latest 表示从最新的数据消费,也就是新产生的数据.
none topic各分区都存在已提交的offset时,从提交的offest处开始消费;只要有一个分区不存在已提交的offset,则抛出异常
如果想让auto.offset.reset生效,那么最好的办法是重新设置一个groupid
4、分区中数据是有序的,一条数据对应一个分区中一个偏移量,各个分区中偏移量可以相同,这个要注意。