Kafka: multi-threaded, scheduled consumption over a time range

I've recently been doing big-data work that uses Kafka. Because the schedule was tight, the consumer application was kept deliberately simple: no cluster, no distribution, just a single machine. That little single-node setup chews through close to a million records a day, which is fairly brisk, and for now it can hold the load. The idea is to take the previous day's data produced into Kafka and process it all during the low-traffic window after midnight; processing it in real time during the day, or at peak traffic, would put real pressure on the database and servers and could drag other services down with it. So the consumer has to process data bounded by a start time and an end time. When you look up offsets by timestamp with offsetsForTimes(), Kafka returns the offset of the earliest message at or after the timestamp you supply. Put simply: if you want the messages produced between 2019-05-21 00:00:00 and 2019-05-21 23:59:59, then when Kafka searches for the end time 2019-05-21 23:59:59 and finds no message at that exact instant, it keeps looking forward and returns the next message after it, which might be one produced at 2019-05-22 00:00:03. That clearly breaks the requirement of consuming only the previous day's data. So we have to walk the timestamp backwards; here I step back in 1-second intervals and search again until a message inside the target day is found.
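
To make the backtracking concrete, here is a minimal sketch of that end-of-day lookup. It is illustrative only (the helper name and structure are mine, not from the project); the real logic lives in the offsetHandle() method further down.

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper: walk the search timestamp back in 1-second steps until
// offsetsForTimes() returns a message that still falls inside the target day,
// then use that message's offset as the end offset (-1 if nothing is found).
public final class EndOffsetLookup {

    public static long findEndOffset(Consumer<?, ?> consumer, TopicPartition tp, long dayEndMillis) {
        long ts = dayEndMillis;
        while (ts > 0) {
            Map<TopicPartition, OffsetAndTimestamp> found =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, ts));
            OffsetAndTimestamp oat = found.get(tp);
            if (oat == null) {
                return -1; // no message with a timestamp at or after ts on this partition
            }
            if (oat.timestamp() <= dayEndMillis) {
                return oat.offset(); // earliest message at or after ts is still within the day
            }
            ts -= 1000; // otherwise step back one second and search again
        }
        return -1;
    }
}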

Most of the material I found online about consuming Kafka by timestamp works from a single timestamp, not a time range, so none of it was quite what I needed. I worked something out myself on top of the official consumer API and roughly implemented the feature; with multi-threading added on top, processing is very fast. Once the number of the previous day's messages has been computed, the worker threads count records as they process them and stop consuming when the total is reached, i.e. when all of the (roughly one million) messages have been consumed, the client connection is closed. There are certainly plenty of open issues left to fill in: how to guarantee that every one of those messages is actually processed, what happens if the program throws an exception and consumption is interrupted part way, how failed messages are handled and compensated, how duplicate consumption is controlled, and so on.
The core demo code is posted below for reference.

Thread pool configuration

TaskExecutorConfig

import org.apache.log4j.Logger;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
* @author yangpin
* @Date 2018/8/15 16:08
**/
@Configuration
@EnableAsync
public class TaskExecutorConfig  implements AsyncConfigurer {

    private static final Logger logger = Logger.getLogger(TaskExecutorConfig.class);

    @Autowired
    private TaskThreadPoolConfig config;

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(config.getCorePoolSize());
        executor.setMaxPoolSize(config.getMaxPoolSize());
        executor.setQueueCapacity(config.getQueueCapacity());
        executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
        executor.setThreadNamePrefix("mq-executor-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncUncaughtExceptionHandler() {
            @Override
            public void handleUncaughtException(Throwable arg0, Method arg1, Object... arg2) {
                logger.error("=========================="+arg0.getMessage()+"=======================", arg0);
                logger.error("exception method:"+arg1.getName());
            }
        };
    }

}

Custom configuration properties class

TaskThreadPoolConfig

import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* @author yangpin
* @Date 2018/8/15 16:08
**/
@ConfigurationProperties(prefix = "spring.task.pool")
public class TaskThreadPoolConfig {

    private int corePoolSize;
    private int maxPoolSize;
    private int keepAliveSeconds;
    private int queueCapacity;



    public int getCorePoolSize() {return corePoolSize; }
    public void setCorePoolSize(int corePoolSize) {this.corePoolSize = corePoolSize;}
    public int getMaxPoolSize() {return maxPoolSize;}
    public void setMaxPoolSize(int maxPoolSize) {this.maxPoolSize = maxPoolSize;}
    public int getKeepAliveSeconds() {return keepAliveSeconds;}
    public void setKeepAliveSeconds(int keepAliveSeconds) {this.keepAliveSeconds = keepAliveSeconds;}
    public int getQueueCapacity() {return queueCapacity; }
    public void setQueueCapacity(int queueCapacity) {this.queueCapacity = queueCapacity; }


}
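
One thing the snippets above do not show is how TaskThreadPoolConfig gets registered as a bean: with @ConfigurationProperties alone, Spring Boot 2.1 still needs it enabled somewhere, for example via @EnableConfigurationProperties on the application class, which is not included in this post. A minimal sketch, assuming that approach:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

// Hypothetical entry point: registers TaskThreadPoolConfig so the
// spring.task.pool.* properties are bound and can be @Autowired into
// TaskExecutorConfig above.
@SpringBootApplication
@EnableConfigurationProperties(TaskThreadPoolConfig.class)
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}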



Consumer configuration


import com.xyy.bi.configure.mq.MqConfigProperties;
import com.xyy.bi.service.SourceDataService;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;

/**
* @author yangpin
* @Description MQ configuration
* @Date 2019/5/13 10:16
**/
@Configuration
@EnableKafka
public class MqConfig {

    private static final Logger logger = LoggerFactory.getLogger(MqConfig.class);

    @Autowired
    MqConfigProperties mqConfigProperties;

    @Autowired
    SourceDataService sourceDataService;

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;

    @Value("${spring.kafka.ocnsumer.session-timeout}")
     private String sessionTimeout;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;

    @Value("${spring.kafka.consumer.max-poll-interval}")
     private String maxPollInterval;

    @Value("${spring.kafka.listener.concurrency}")
     private Integer concurrency;

    @Value("${kafka.app.topic.test1}")
     private String test1Topic;

    @Value("${kafka.app.topic.test2}")
     private String test2Topic;

    @Value("${kafka.app.topic.test3}")
     private String test3Topic;

    @Bean
    public KafkaListenerContainerFactory kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConcurrency(concurrency);
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setPollTimeout(300000);
        return factory;
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,autoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,autoCommitInterval);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,maxPollInterval);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,sessionTimeout);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);
        return props;
    }

    @Bean
    public ConsumerFactory consumerFactory(){
        DefaultKafkaConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory(consumerConfigs());
        return consumerFactory;
    }

    public static Logger getLogger() { return logger;  }
    public String getAutoOffsetReset() { return autoOffsetReset; }
    public String getTest1Topic() { return test1Topic; }
    public String getTest2Topic() { return test2Topic; }
    public String getTest3Topic() { return test3Topic; }
}

The test1 consumer


import com.xyy.bi.service.SourceDataService;
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;

/**
* @author yangpin
* @Description MQ data processing
* @Date 2019/5/13 10:17
**/
public class MqTest1Consumer extends ShutdownableThread {

    private static final Logger logger = LoggerFactory.getLogger(MqTest1Consumer.class);

    private final KafkaConsumer<Integer, String> consumer;
    private final long endOffset;
    private final long startOffset;
    private long counts;
    private final MqConfig configs;
    SourceDataService sourceDataService;

    public MqTest1Consumer(MqConfig configs, SourceDataService sourceDataService, KafkaConsumer<Integer, String> consumer, long startOffset, long endOffset) {
        super("test1-consumer", false);
        this.configs = configs;
        this.consumer = consumer;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.sourceDataService = sourceDataService;
//        consumer = new KafkaConsumer<>(configs.consumerConfigs());

    }

    @Override
    public void doWork() {
        try {
            //consumer.assign(topicPartitions);
            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(configs.mqConfigProperties.getFrequency()));

            if (records == null || records.count() == 0) {
                consumer.close();
                shutdown();
            }

            for (final ConsumerRecord<Integer, String> record : records) {
                if (record.offset() <= endOffset) {
                    counts++;

                    // your message-processing business logic goes here

                    logger.info("total records to process: " + (endOffset - startOffset) + " , test1 record no. " + counts
                            + " , test1 end offset = " + endOffset + " , test1 current offset = " + record.offset());
                    consumer.commitSync();
                } else {
                    break;
                }
            }
            if ((endOffset - startOffset) == counts) {
                consumer.close();
                shutdown();
            }
        } catch (Exception e) {
            logger.error("MQ message processing exception! " + e.getMessage());
            e.printStackTrace();
        }
    }

    @Override
    public boolean isInterruptible() { return false; }
}

The test2 consumer

import com.xyy.bi.service.SourceDataService;
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;

public class MqTest2Consumer  extends ShutdownableThread {

    private static final Logger logger = LoggerFactory.getLogger(MqTest2Consumer.class);

    private final KafkaConsumer<Integer, String> consumer;
    private final long endOffset;
    private final long startOffset;
    private final MqConfig configs;
    SourceDataService sourceDataService;
    private long counts;

    public MqTest2Consumer(MqConfig configs, SourceDataService sourceDataService, KafkaConsumer<Integer, String> consumer, long startOffset, long endOffset) {
        super("test2-consumer", false);
        this.configs = configs;
        this.consumer = consumer;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.sourceDataService = sourceDataService;
        //consumer = new KafkaConsumer<>(configs.consumerConfigs());
    }

    @Override
    public void doWork() {
        try {
            //consumer.assign(topicPartitions);
            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(configs.mqConfigProperties.getFrequency()));

            if (records == null || records.count() == 0) {
                consumer.close();
                shutdown();
            }

            for (final ConsumerRecord<Integer, String> record : records) {
                if (record.offset() <= endOffset) {

                    // your message-processing business logic goes here

                    counts++;
                    logger.info("total records to process: " + (endOffset - startOffset) + " , test2 record no. " + counts
                            + " , test2 end offset = " + endOffset + " , test2 current offset = " + record.offset());
                    consumer.commitSync();
                } else {
                    break;
                }
            }
            if ((endOffset - startOffset) == counts) {
                consumer.close();
                shutdown();
            }
        } catch (Exception e) {
            logger.error("MQ message processing exception! " + e.getMessage());
            e.printStackTrace();
        }
    }

    @Override
    public boolean isInterruptible() { return false; }
}

Core scheduled task class


import com.xyy.bi.service.SourceDataService;
import com.xyy.bi.thread.TaskExecutorConfig;
import com.xyy.bi.utils.DateUtil;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;


@Component
@Configuration
@EnableScheduling
public class MqTask {


    private static final Logger logger = LoggerFactory.getLogger(MqTask.class);


    @Autowired
    TaskExecutorConfig taskExecutorConfig;

    @Autowired
    MqConfig mqConfig;

    @Autowired
    SourceDataService sourceDataService;

    /**
    * @author yangpin
    * @Description scheduled Kafka consumption
    * @Date 2019/5/21 18:06
    * @Param []
    * @return void
    **/
    // runs every day at midnight
    @Scheduled(cron = "0 0 00 * * ?")
    private void MqTask() {
        try {
            logger.info("mq消息队列消费线程初始化开始!......");
            ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) taskExecutorConfig.getAsyncExecutor();
            KafkaConsumer<Integer, String> test1Consumer = new KafkaConsumer(mqConfig.consumerConfigs());
            KafkaConsumer<Integer, String> test2Consumer = new KafkaConsumer(mqConfig.consumerConfigs());

            List<PartitionInfo> test1PartitionInfos = test1Consumer.partitionsFor(mqConfig.getTest1Topic());
            List<PartitionInfo> test2PartitionInfos = test2Consumer.partitionsFor(mqConfig.getTest2Topic());

            List<TopicPartition> test1TopicPartitions = new ArrayList<>();
            List<TopicPartition> test2TopicPartitions = new ArrayList<>();

            Map<TopicPartition, Long> test1StartTimestampsToSearch = new HashMap<>();
            Map<TopicPartition, Long> test1EndTimestampsToSearch = new HashMap<>();

            Map<TopicPartition, Long> test2StartTimestampsToSearch = new HashMap<>();
            Map<TopicPartition, Long> test2EndTimestampsToSearch = new HashMap<>();

            final AtomicLong test1StartOffset = new AtomicLong(0L);
            final AtomicLong test1EndOffset = new AtomicLong(0L);


            final AtomicLong test2StartOffset = new AtomicLong(0L);
            final AtomicLong test2EndOffset = new AtomicLong(0L);

            // whether time-window (offset-based) consumption is enabled
            if (mqConfig.mqConfigProperties.getOffset() == true && mqConfig.getAutoOffsetReset().equals("latest")){
                logger.info("Time-window (offset) consumption enabled ......");

                Date now = new Date();
                Calendar calendar = Calendar.getInstance();
                calendar.setTime(now);
                calendar.add(calendar.DATE, -1);
                SimpleDateFormat sd= new SimpleDateFormat(DateUtil.DEFALT_DATE_FORMAT);
                SimpleDateFormat df= new SimpleDateFormat(DateUtil.DATE_FORMATE_YYYYMMDDHHMMSS);

                logger.info("当前时间:   " + DateUtil.getDate(DateUtil.DATE_FORMATE_YYYYMMDDHHMMSS) +"");
                logger.info("偏移消费时间段:" + sd.format(calendar.getTime()) + " 00:00:00" +  " 至 " + sd.format(calendar.getTime()) + " 23:59:59" );

                test1PartitionInfos.forEach(n ->{
                    test1TopicPartitions.add(new TopicPartition(n.topic(), n.partition()));
                    // start time
                    test1StartTimestampsToSearch.put(new TopicPartition(n.topic(), n.partition()), DateUtil.getLastDayStartTimeStamp(0));
                    test1EndTimestampsToSearch.put(new TopicPartition(n.topic(), n.partition()), DateUtil.getLastDayStartTimeStamp(1));
                });

                test2PartitionInfos.forEach(n ->{
                    test2TopicPartitions.add(new TopicPartition(n.topic(), n.partition()));
                    test2StartTimestampsToSearch.put(new TopicPartition(n.topic(), n.partition()), DateUtil.getLastDayStartTimeStamp(0));
                    test2EndTimestampsToSearch.put(new TopicPartition(n.topic(), n.partition()), DateUtil.getLastDayStartTimeStamp(1));
                });
                test1Consumer.assign(test1TopicPartitions);
                test2Consumer.assign(test2TopicPartitions);
                // look up each partition's offset for the given timestamps
                Map<TopicPartition, OffsetAndTimestamp> test1StartTimeMap = test1Consumer.offsetsForTimes(test1StartTimestampsToSearch);
                Map<TopicPartition, OffsetAndTimestamp> test1EndTimeMap = test1Consumer.offsetsForTimes(test1EndTimestampsToSearch);

                Map<TopicPartition, OffsetAndTimestamp> test2StartTimeMap = test2Consumer.offsetsForTimes(test2StartTimestampsToSearch);
                Map<TopicPartition, OffsetAndTimestamp> test2EndTimeMap = test2Consumer.offsetsForTimes(test2EndTimestampsToSearch);

                logger.info("开始设置各分区初始偏移量!......");
                offsetHandle(test1StartTimeMap,test1EndTimeMap,test1StartOffset,test1EndOffset,test1EndTimestampsToSearch,test1Consumer,df);
                offsetHandle(test2StartTimeMap,test2EndTimeMap,test2StartOffset,test2EndOffset,test2EndTimestampsToSearch,test2Consumer,df);
                logger.info("设置各分区初始偏移量完毕!......");


            }else if (mqConfig.getAutoOffsetReset().equals("earliest") && mqConfig.mqConfigProperties.getOffset() == false){
                test1PartitionInfos.forEach(n ->{
                    test1TopicPartitions.add(new TopicPartition(n.topic(), n.partition()));
                });
                test2PartitionInfos.forEach(n ->{
                    test2TopicPartitions.add(new TopicPartition(n.topic(), n.partition()));
                });
                logger.info("isSetOffsetTime = " + mqConfig.mqConfigProperties.getOffset() + "消费策略 = " + mqConfig.getAutoOffsetReset() );
                test1Consumer.assign(test1TopicPartitions);
                test2Consumer.assign(test2TopicPartitions);
            }else {
                logger.error("mq消息参数配置有误,请检查配置文件!");
                System.exit(-1);
            }
            executor.execute(new MqTest1Consumer(mqConfig,sourceDataService,test1Consumer,test1StartOffset.get(),test1EndOffset.get()));
            executor.execute(new MqTest2Consumer(mqConfig,sourceDataService,test2Consumer,test2StartOffset.get(),test2EndOffset.get()));
            logger.info("mq消息队列消费线程初始化完成!......");
        }catch (Exception e){
            e.printStackTrace();
            logger.error("mq消息队列消费线程初始化失败!......" + e.getMessage());
            System.exit(-1);
        }
    }




    /**
    * @author yangpin
    * @Description offset handling
    * @Date 2019/5/21 18:05
    * @Param [startTimeMap, endTimeMap, startOffset, endOffset, endTimestampsToSearch, consumer, df]
    * @return void
    **/
    private void offsetHandle(Map<TopicPartition, OffsetAndTimestamp> startTimeMap,
                              Map<TopicPartition, OffsetAndTimestamp> endTimeMap,
                              final AtomicLong startOffset,
                              final AtomicLong endOffset,
                              Map<TopicPartition, Long> endTimestampsToSearch,
                              KafkaConsumer<Integer, String> consumer,
                              SimpleDateFormat df){

        startTimeMap.forEach((k,v) ->{
            OffsetAndTimestamp startOffsetTimestamp =  v;
            OffsetAndTimestamp endOffsetTimestamp =  endTimeMap.get(k);
            if(startOffsetTimestamp != null) {
                long endTimestamp = 0L;
                String topic = k.topic();
                int partition = k.partition();
                long startTimestamp = startOffsetTimestamp.timestamp();
                long startOffsetTmp = startOffsetTimestamp.offset();
                if (endOffsetTimestamp != null){
                    //86,400,000
                    //86,399,000
                    endTimestamp = endOffsetTimestamp.timestamp();
                    endOffset.set(endOffsetTimestamp.offset());
                    long lastDayEndTime = DateUtil.getLastDayStartTimeStamp(1);
                    boolean flag = false;
                    if (endTimestamp > lastDayEndTime){
                        while (true){
                            endTimestamp = endTimestamp - 1000;
                            // step back one second and search again
                            endTimestampsToSearch.put(new TopicPartition(k.topic(), k.partition()), endTimestamp);
                            Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(endTimestampsToSearch);
                            for(Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetsForTimes.entrySet()) {
                                // logger.info("backward-search timestamp = " + df.format(new Date(entry.getValue().timestamp())));
                                if (entry.getValue().timestamp() <= lastDayEndTime){
                                    endTimestamp = entry.getValue().timestamp();
                                    endOffset.set(entry.getValue().offset());
                                    flag = true;
                                    break;
                                }
                            }
                            if (flag == true) break;
                        }
                    }
                }
                logger.info("consumer : " + " topic = " + topic + " , partition = " +
                        partition + " , period of time = " + df.format(new Date(startTimestamp))+" - " + df.format(new Date(endTimestamp))
                        + " , period of offset = " + startOffsetTmp + " - " + endOffset.get() +" ,共计: " + (endOffset.get() - startOffsetTmp));
                // seek the consumer to the start offset
                startOffset.set(startOffsetTmp);
                consumer.seek(k, startOffsetTmp);
            }
        });

    }



}
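
DateUtil is not included in the post. From the way it is called, getLastDayStartTimeStamp(0) looks like the epoch-millis start of yesterday and getLastDayStartTimeStamp(1) the start of today, i.e. the end boundary of the window. A rough sketch of what such a helper could look like (an assumption covering only the one method used above; the real DateUtil also provides the date-format constants and getDate()):

import java.time.LocalDate;
import java.time.ZoneId;

// Hypothetical stand-in for the DateUtil used above: epoch-millis of yesterday
// 00:00:00 shifted forward by plusDays days, so an argument of 0 gives the start
// of the previous day and 1 gives the start of the current day.
public final class DateUtilSketch {

    public static long getLastDayStartTimeStamp(int plusDays) {
        ZoneId zone = ZoneId.systemDefault();
        return LocalDate.now(zone)
                .minusDays(1)
                .plusDays(plusDays)
                .atStartOfDay(zone)
                .toInstant()
                .toEpochMilli();
    }
}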

Main dependencies used in the pom file

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spring-kafka.version>2.2.5.RELEASE</spring-kafka.version>
        <kafka.version>2.2.0</kafka.version>
    </properties>

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>${spring-kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>${kafka.version}</version>
     </dependency>

Main application.properties configuration


#mq configuration
# whether to auto-commit offsets
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.group-id=test-group
spring.kafka.consumer.bootstrap-servers=localhost:9092
# offset reset policy (earliest|latest|none)
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.auto-commit-interval=5000
spring.kafka.consumer.max-poll-records=1000
spring.kafka.consumer.max-poll-interval=300000
spring.kafka.consumer.session-timeout=150000
spring.kafka.listener.concurrency=5

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#topic
kafka.app.topic.test1=test1
kafka.app.topic.test2=test2

kafka.consumer.frequency=20
kafka.consumer.offset=true
kafka.consumer.offsetTime=2

spring.task.pool.corePoolSize=30
spring.task.pool.maxPoolSize=30
spring.task.pool.keepAliveSeconds=70
spring.task.pool.queueCapacity=25
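
MqConfigProperties (imported at the top of MqConfig) is not shown either; it backs the kafka.consumer.* entries above, with getFrequency() used as the poll timeout in seconds and getOffset() as the switch for time-window consumption. A plausible reconstruction, which may differ from the author's actual class:

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

// Hypothetical reconstruction of MqConfigProperties, binding the kafka.consumer.*
// properties listed above.
@Component
@ConfigurationProperties(prefix = "kafka.consumer")
public class MqConfigProperties {

    private long frequency;      // kafka.consumer.frequency: poll timeout in seconds
    private Boolean offset;      // kafka.consumer.offset: enable time-window consumption
    private Integer offsetTime;  // kafka.consumer.offsetTime

    public long getFrequency() { return frequency; }
    public void setFrequency(long frequency) { this.frequency = frequency; }
    public Boolean getOffset() { return offset; }
    public void setOffset(Boolean offset) { this.offset = offset; }
    public Integer getOffsetTime() { return offsetTime; }
    public void setOffsetTime(Integer offsetTime) { this.offsetTime = offsetTime; }
}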
