java实现kafka消费端

1.pom.xml


<dependency>

  <groupId>org.apache.kafka

  <artifactId>kafka-clients</artifactId>

  <version>0.10.2.1</version>

</dependency>

<dependency>

  <groupId>org.springframework.kafka

  <artifactId>spring-kafka

  <version>1.2.3.RELEASE</version>

</dependency>

2.Application.yml

spring:

    kafka:

        bootstrap-servers: 10.254.8.29:9092

        listener:

            concurrency: 3

        consumer:

            group-id: GP_DATAMALL_DEFAULT_CONSUMER

            key-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer

            value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer

            auto-offset-reset: earliest

3.定义kafka消费基础类



@Configuration

public class BaseKafkaComsumerService {

protected final static Loggerlogger = LoggerFactory.getLogger(BaseKafkaComsumerService.class);

    /**

* 消费组的开头

*/

    private static final String GROUP_START ="GP-";

    /**

* 话题前缀

*/

    protected static final String TOPIC_START ="TP-";

    /**

* 最大消费量

*/

    @Value("${dataservice.consumer.consume.max:1}")

private IntegerCONSUME_MAX =1;

    @Value("${spring.kafka.bootstrap-servers:localhost:9092}")

private StringkafkaServers;

    @Value("${dataservice.consumer.poll.timeout.ms:2000}")

private LongpollTimeout;

    @Value("${dataservice.consumer.auto.commit.interval.ms:1000}")

private StringcommitInterval;

    @Value("${dataservice.consumer.session.timeout.ms:15000}")

private StringsessionTimeout;

    @Value("${dataservice.consumer.max.partition.fetch.bytes:1048576}")

private StringmaxPartitionFetchBytes;

    @Value("${dataservice.consumer.request.timeout:16000}")

private IntegerrequestTimeout;

    @Autowired

    private AsyncServiceasyncService;

    @Autowired

    protected ClientOffsetServiceclientOffsetService;

    /**

* 获取客户端的消费组

    * @param clientId

    * @return

    */

    protected StringgetConsumerGroup(String clientId){

return new StringBuilder().append(GROUP_START).append(clientId).toString();

    }

/**

* 消费最新主题记录

    * @param groupId

    * @param topic

    * @return

    */

    protected KafKaResult  consumeHead(KafkaConsumer consumer, String groupId, String topic)throws Exception{

TopicPartition tp =new TopicPartition(topic, 0);

        /*返回值*/

        ConsumeDataResult result =new ConsumeDataResult();

        List baseResult =new ArrayList();

        /*最后偏移量*/

        long lastOffset = -1L;

        try {

             consumer.assign(Arrays.asList(tp));

            Long maxOffset =this.getCurrentMaxOffset(consumer, tp);

            //设置偏移量为最后一个的前移一位

            setConsumerOffset(consumer, tp, maxOffset-1);

            ConsumerRecords records = consumer.poll(pollTimeout);

            for (ConsumerRecord record : records){

baseResult.add(this.bulidConsumeBase(record.value(), record.offset()));

                lastOffset = record.offset();

                consumer.commitAsync();

                if (baseResult.size() >=CONSUME_MAX ){

break;

                }

}

}catch (Exception e) {

logger.error("获取topic数据失败",e);

            return KafKaResult.fail("10000001", "获取topic数据失败");

        }

if (lastOffset >=0) {

clientOffsetService.saveOffset(groupId,topic,lastOffset);

        }

result.setOffset(lastOffset);

        result.setResult(baseResult);

        return KafKaResult.success(result);

    }

/**

* 从上次记录的开始消费

    * @param groupId

    * @param topic

    * @return

    */

    protected KafKaResultconsumeRemember(KafkaConsumer consumer,String groupId, String topic)throws Exception {

        Long offset =clientOffsetService.queryOffset(groupId,topic,null);//查询客户端offset,根据实际项目中存储规则自己实现

        logger.info("type:2 redis offset get.... groupId:{}  topic:{}  offset:{}",groupId,topic,offset);

        if(offset ==null) {//防止起始位置小于offset值,接口永远拿不到数据

            offset =0L;

        }else {

offset = offset+1;

        }

//排除超时异常

        KafKaResult result = consumeOffset(consumer,groupId, topic,offset,null);

        if(!result.isSuccess() && result.getCode().equals("10000012")) {

ConsumeDataResult resultData =new ConsumeDataResult();

            resultData.setOffset(-1L);

            resultData.setResult(null);

            return KafKaResult.success(resultData);

        }

return result;

    }

/**

* 指定偏移量去消费主题记录

    * @param groupId

    * @param topic

    * @param offset

    * @return

    */

    protected KafKaResult consumeOffset(KafkaConsumer consumer,String groupId, String topic,Long offset,Long endOffset)throws Exception{

ConsumeDataResult result =new ConsumeDataResult();

        List baseResult =new ArrayList();

        /*最后偏移量*/

        long lastOffset = -1L;

        TopicPartition tp =new TopicPartition(topic, 0);

        consumer.assign(Arrays.asList(tp));

        //校验最小

        Long beginOffset =  getCurrentBeginOffset(consumer, tp);

        if(offset !=null

                && offset !=0L && beginOffset > offset) {

ConsumeDataResult errorResult =new ConsumeDataResult();

            errorResult.setOffset(beginOffset);

            logger.error("输入offset小于实际topic中开始offset值. 输入Offset:{}  topic实际Offset:{}",offset,beginOffset);

            return KafKaResult.fail("10000011", "输入offset小于实际topic中开始offset值."+" 输入Offset:"+offset+" topic实际Offset:"+beginOffset,errorResult);

        }

//校验最大

        Long maxOffset =this.getCurrentMaxOffset(consumer, tp);

        if(offset >= maxOffset) {

ConsumeDataResult errorResult =new ConsumeDataResult();

            errorResult.setOffset(maxOffset-1);

            logger.error("输入offset大于实际topic中开始offset值. 输入Offset:{}  topic实际Offset:{}",offset,maxOffset-1);

            return KafKaResult.fail("10000012", "输入offset大于实际topic中开始offset值. 输入Offset:"+offset+" topic实际Offset:"+(maxOffset-1),errorResult);

        }

ConsumerRecords records =null;

        long lastOffsetNum = -1L;

        if (0 == offset) {

consumer.seekToBeginning(Arrays.asList(tp));

        }else {

//设置偏移量为最后一个的前移一位

            setConsumerOffset(consumer, tp, offset);

        }

logger.info("groupId:{} --topic:{} --offset:{} --endOffset:{} --maxOffset:{}",groupId,topic,offset,endOffset,maxOffset);

        if(endOffset ==null) {

endOffset = maxOffset;

        }

try {

while (true) {

records = consumer.poll(pollTimeout);

                if (records.isEmpty()){

break;

                }

for (ConsumerRecord record : records){

lastOffsetNum = record.offset();

                    if (baseResult.size() >=CONSUME_MAX || lastOffsetNum >= maxOffset || lastOffsetNum > endOffset){

break;

                    }

baseResult.add(bulidConsumeBase(record.value(), record.offset()));

                    lastOffset = record.offset();

                    consumer.commitAsync();

                }

if (baseResult.size() >=CONSUME_MAX || lastOffsetNum >= maxOffset || lastOffsetNum >= endOffset){

break;

                }

}

}catch (Exception e) {

logger.error("获取topic数据错误",e);

            return KafKaResult.fail("10000001", "获取topic数据错误");

        }

result.setOffset(lastOffset);

        result.setResult(baseResult);

        if (lastOffset >=0) {

clientOffsetService.saveOffset(groupId,topic,lastOffset);

        }

return KafKaResult.success(result);

    }

/**

*

    * @Description: 组装数据

    * @param @param data 原始数据

    * @param @param offset 偏移量

    * @return ConsumeDataBaseResult

    * @throws

    */

    public ConsumeDataBaseResult  bulidConsumeBase(String data,Long offset) {

ConsumeDataBaseResult result =new ConsumeDataBaseResult();

        result.setData(JSONObject.parseObject(data));

        result.setOffset(offset);

        return result;

    }

/**

* 设置话题的偏移量

*/

    private void setConsumerOffset(KafkaConsumer consumer, TopicPartition partition, Long offset){

if (offset <0) {

consumer.seek(partition, 0 );

        }else {

consumer.seek(partition, offset );

        }

}

/**

* 获取当前消费的分区最大偏移量

    * @param consumer

    * @param tp

    * @return

    */

    private LonggetCurrentMaxOffset(KafkaConsumer consumer, TopicPartition tp){

Map map = consumer.endOffsets(Arrays.asList(tp));

        if (CollectionUtils.isEmpty(map)) {

return 0L;

        }

Long maxOffset = map.get(tp);

        return maxOffset;

    }

/**

* 获取当前消费的分区起始偏移量

    * @param consumer

    * @param tp

    * @return

    */

    private LonggetCurrentBeginOffset(KafkaConsumer consumer, TopicPartition tp){

Map map = consumer.beginningOffsets(Arrays.asList(tp));

        if (CollectionUtils.isEmpty(map)) {

return 0L;

        }

Long beginningOffset = map.get(tp);

        return beginningOffset;

    }

/**

* 设置一些消费信息

*/

    protected void transConsumeInfo(ConsumeDataParam param){

param.setGroup(getConsumerGroup(param.getClientId()));

        param.setTopic(TOPIC_START+param.getTopic());

    }

/**

* kafka消费属性

    * @param group : 所属消费组

    * @return

    */

    private MapconsumerProps(String group) {

return consumerProps(group,true);

    }

/**

* kafka消费属性

    * @param group : 所属消费组

    * @param autoCommit : 是否自动提交

    * @return

    */

    private Map consumerProps(String group, boolean autoCommit) {

Map props =new HashMap<>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);

        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);

        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);

        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, commitInterval);

        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);

        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);

        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,CONSUME_MAX);

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

        return props;

    }

/**

*

    * @Description: 创建consumer

    * @param @param groupId

    * @param @return

    * @return KafkaConsumer

* @throws

    */

    protected KafkaConsumer createConsumer(String groupId){

Map props =this.consumerProps(groupId,false);

        KafkaConsumer consumer =new KafkaConsumer<>(props);

        return consumer;

    }

}


4.kafka消费服务实现类.



@Service

public class KafkaComsumerServiceImpl extends BaseKafkaComsumerService implements KafkaComsumerService {

@Autowired

    private AsyncServiceasyncService;

    /**

* 消费

*

    * @param consumeDataParam

    * @return

    */

    public KafKaResult consumeData(ConsumeDataParam consumeDataParam)throws ServiceException {

super.transConsumeInfo(consumeDataParam);

        Integer type = consumeDataParam.getType();

        //1.生成consumer

        KafkaConsumer consumer =super.createConsumer(consumeDataParam.getGroup());

        //2.处理数据

        try {

switch (type) {

case 0:

return super.consumeOffset(consumer, consumeDataParam.getGroup(), consumeDataParam.getTopic(), 0L, null);

                case 1:

return super.consumeHead(consumer, consumeDataParam.getGroup(), consumeDataParam.getTopic());

                case 3:

return super.consumeOffset(consumer, consumeDataParam.getGroup(), consumeDataParam.getTopic(), consumeDataParam.getOffset(), consumeDataParam.getEndOffset());

                default:// 2:

                    return super.consumeRemember(consumer, consumeDataParam.getGroup(), consumeDataParam.getTopic());

            }

}catch (Exception e) {

throw new ServiceException(e);

        }finally {

this.asyncService.colse(consumer);

        }

}

@Override

    public KafKaResult  consumeOffset(ConsumerOffsetParam param)throws ServiceException {

super.clientOffsetService.saveOffset(super.getConsumerGroup(param.getClientId()), TOPIC_START + param.getTopic(), param.getOffset());

        return KafKaResult.success("");

    }

}

异步类:调用kafka消费段关闭


import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.springframework.scheduling.annotation.Async;

import org.springframework.stereotype.Component;

/**

* @ClassName:  AsyncService

* @Description: 异步方法调用) 

*/

@Component

public class AsyncService {

@Async

    public void colse(KafkaConsumer consumer) {

consumer.close();

    }

}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351

推荐阅读更多精彩内容