Java多consumer消费kafka

本项目基于springboot2

引入依赖

  <properties>
    <hutool-version>5.7.12</hutool-version>
    <lombok-version>1.18.20</lombok-version>
    <slf4j-api-version>1.7.10</slf4j-api-version>
    <logback-classic-version>1.0.13</logback-classic-version>
  </properties>
<dependency>
  <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>${hutool-version}</version>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>${lombok-version}</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>${slf4j-api-version}</version>
  </dependency>
  <dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>${logback-classic-version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
  </dependency>

配置文件

创建subscribe/kafka.json,producer为预留参数

{
  "producer": {
    "init": {
      "bootstrap.servers": "50.1.172.137:9092",
      "acks": "all",
      "retries": 0,
      "batch.size": 16384,
      "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
      "value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
    },
    "topic": {
      "topicname1": "topic1",
      "topicname1": "topic2"
    }
  },
  "consumer": {
    "init": {
      "bootstrap.servers": "50.1.172.137:9092",
      "group.id": "daps10017dev",
      "enable.auto.commit": "false",
      "auto.commit.interval.ms": "1000",
      "session.timeout.ms": "30000",
      "max.poll.records": "100",
      "auto.offset.reset": "latest",
      "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
    },
    "topic": {
      "face": "SNAP_IMAGE_INFO_TOPIC",
      "dapsFace": "daps_face_snap_topic"
    },
    "thread": 10
  }
}

代码实现

KafkaConfig

import cn.hutool.core.io.FileUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.File;

@Configuration
public class KafkaConfig {

    @Bean
    public JSONObject kafkaConf() {
        //kafka配置
        String kafkaStr = FileUtil.readUtf8String(
                Thread.currentThread().getContextClassLoader().getResource("").getPath()
                        + File.separator + "subscribe/kafka.json");
        return JSONUtil.parseObj(kafkaStr);
    }
}

ThreadPoolConfig

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableAsync
@Slf4j
public class ThreadPoolConfig {
    @Bean
    public ThreadPoolTaskExecutor subscribeThreadPool() {
        log.info("start subscribeThreadPool");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(10);
        //配置最大线程数
        executor.setMaxPoolSize(12);
        //配置队列大小
        executor.setQueueCapacity(0);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("async-subscribe-");

        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }

KafkaConfigPathConstants

public interface KafkaConfigPathConstants {
    /**
     * 消费者线程
     */
    String CONSUMER_THREAD = "consumer.thread";
    /**
     * topic
     */
    String CONSUMER_TOPIC = "consumer.topic";
    /**
     * 初始化参数
     */
    String CONSUMER_INIT = "consumer.init";
}

KafkaConsumerFactory

import cn.hutool.json.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Properties;

@Slf4j
public class KafkaConsumerFactory {

    public static KafkaConsumer<String, String> getSingletonConsumer(JSONObject kafkaConf) {
        //初始化参数
        JSONObject init = kafkaConf.getByPath(KafkaConfigPathConstants.CONSUMER_INIT, JSONObject.class);
        Properties props = new Properties();
        init.entrySet().stream().forEach(set -> props.put(set.getKey(), set.getValue()));
        return new KafkaConsumer<>(props);
    }

}

KafkaCache

import java.util.concurrent.CopyOnWriteArraySet;

public class KafkaCache {

    /**
     * 是否需要kafka守护线程启动kafka消费线程
     */
    public static final AtomicBoolean kafkaConsumerDaemonNeed = new AtomicBoolean(true);

    /**
     * 线程缓存,用于恢复线程运行
     */
    public static final CopyOnWriteArraySet<Thread> kafkaConsumerThread = new CopyOnWriteArraySet<>();
    /**
     * kafka消费者线程,用于停止消费者
     */
    public static final CopyOnWriteArraySet<KafkaConsumerThread> kafkaConsumerThread2 = new CopyOnWriteArraySet<>();
}

KafkaManager

import cn.hutool.json.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

@Component
@EnableScheduling   // 1.开启定时任务
@EnableAsync        // 2.开启多线程
@Slf4j
public class KafkaManager {

    @Autowired
    private ThreadPoolTaskExecutor subscribeThreadPool;

    @Resource
    private JSONObject kafkaConf;
    
    private AtomicInteger currentFaceConsumer = new AtomicInteger();

    private Integer consumerCount = Integer.MAX_VALUE;

    @Async
    @Scheduled(cron = "${subscribe.kafka.daemon.cron}")
    public void kafkaStart() {
        if (!KafkaCache.kafkaConsumerDaemonNeed.get()) {
            return;
        }
        if (currentFaceConsumer.get() >= consumerCount) {
            return;
        }
        try {
            //这里要检查下配置了topic没有。没有配置的话,不启动线程.
            Map topicMap = kafkaConf.getByPath(KafkaConfigPathConstants.CONSUMER_TOPIC, Map.class);
            if (topicMap.size() == 0) {
                return;
            }
            consumerCount = kafkaConf.getByPath(KafkaConfigPathConstants.CONSUMER_THREAD, Integer.class);
            //任务加入线程池
            while (consumerCount > currentFaceConsumer.get()) {
                subscribeThreadPool.execute(new KafkaConsumerThread(kafkaConf));
                currentFaceConsumer.incrementAndGet();
            }
        } catch (Exception e) {
            log.error("kafka消费者启动失败", e);
        }

    }

    @Async
    @Scheduled(cron = "${subscribe.kafka.thread.log.cron}")
    public void threadLog() {
        log.info("检查任务运行状态--运行标志={}", KafkaCache.kafkaConsumerDaemonNeed.get());
        log.debug("检查任务运行状态--线程存活数量={}", subscribeThreadPool.getActiveCount());
        log.debug("检查任务运行状态--线程池大小={}", subscribeThreadPool.getPoolSize());
        log.debug("检查任务运行状态--线程队列剩余长度={}", subscribeThreadPool.getThreadPoolExecutor().getQueue().remainingCapacity());
        log.debug("检查任务运行状态--线程队列使用长度={}", subscribeThreadPool.getThreadPoolExecutor().getQueue().size());
    }

}

KafkaConsumerThread

import cn.hutool.json.JSONObject;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

@Slf4j
@NoArgsConstructor
public class KafkaConsumerThread implements Runnable {

    private JSONObject kafkaConf;

    private KafkaConsumer<String, String> consumer;

    /**
     * 消费者开启状态
     */
    private AtomicBoolean opened = new AtomicBoolean(true);

    public KafkaConsumerThread(JSONObject kafkaConf) {
        this.kafkaConf = kafkaConf;
    }

    @Override
    public void run() {
        if (kafkaConf == null) {
            return;
        }
        consumer = initConsumer();
        KafkaCache.kafkaConsumerThread.add(Thread.currentThread());
        KafkaCache.kafkaConsumerThread2.add(this);
        consumerMsg();
    }

    /**
     * 初始化消费者
     *
     * @return
     */
    private KafkaConsumer<String, String> initConsumer() {
        KafkaConsumer<String, String> consumer = KafkaConsumerFactory.getSingletonConsumer(kafkaConf);
        Map<String, String> topic = kafkaConf.getByPath(KafkaConfigPathConstants.CONSUMER_TOPIC, Map.class);
        List<String> topicList = topic.entrySet().stream()
                .map(o -> o.getValue())
                .collect(Collectors.toList());
        consumer.subscribe(topicList);
        return consumer;
    }

    /**
     * 关闭消费
     */
    public void shutdownConsumer() {
        KafkaCache.kafkaConsumerDaemonNeed.set(false);
        //退出消费循环
        opened.set(false);
        // wakeup 可以安全地从外部线程来中断活动操作
        consumer.wakeup();
    }

    /**
     * 消费数据
     */
    private void consumerMsg() {
        ConsumerRecords<String, String> msgList;
        try {
            while (opened.get()) {
                //try catch放wile里面保证一直消费
                try {
                    msgList = consumer.poll(1000);
                    if (null == msgList || msgList.count() == 0) {
                        continue;
                    }
                    log.debug("消费到 {} 条数据", msgList.count());
                    for (ConsumerRecord<String, String> record : msgList) {
                        String topic = record.topic();
                        String value = record.value();
                        if (StringUtils.isBlank(value)) {
                            continue;
                        }
                        log.info("消费kafka数据。topic={};value={}", topic, value);
                        //todo
                        log.info("消费kafka数据。topic={} 数据处理完成", topic);
                    }
                    consumer.commitAsync();
                } catch (WakeupException e) {
                } catch (Exception e) {
                    if (e instanceof InterruptException) {
                        return;
                    }
                    log.error("消费数据报错", e);
                }
            }
        } finally {
            //最外层finally在退出时处理consumer提交、关闭
            try {
                consumer.commitSync();
                consumer.close();
            } catch (Exception e) {
            } finally {
                log.info("已关闭消费者");
            }
        }
    }
}

ShutdownConfig

import com.hikvision.daps.modules.subscribe.kafka.KafkaCache;
import com.hikvision.daps.modules.subscribe.kafka.KafkaConsumerThread;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;

@Configuration
@Slf4j
public class ShutdownConfig implements ApplicationRunner, Ordered {

    @Override
    public void run(ApplicationArguments args) {
        try {
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                // 在关闭钩子中执行收尾工作
                // 注意事项:
                // 1.在这里执行的动作不能耗时太久
                // 2.不能在这里再执行注册,移除关闭钩子的操作
                // 3 不能在这里调用System.exit()
                int delayTime = 15;
                log.info("关机前工作开始");
                try {
                    //关闭缓存中的消费者
                    KafkaCache.kafkaConsumerThread2.forEach(KafkaConsumerThread::shutdownConsumer);
                    // 主线程继续执行,以便可以关闭consumer,提交偏移量
                    KafkaCache.kafkaConsumerThread.forEach(c -> {
                        try {
                            c.join();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    });
                } catch (Exception e) {
                    e.printStackTrace();
                }

                long start = System.currentTimeMillis();
                while (true) {
                    //关闭要时间,先睡
                    try {
                        Thread.sleep(1000L);
                    } catch (Exception e) {
                    }
                    long now = System.currentTimeMillis();
                    if (now - start >= delayTime * 1000L) {
                        log.info("关机前工作超过{}秒,将强行关机", delayTime);
                        break;
                    }
//                KafkaCache.kafkaConsumerSet.forEach(c ->);
//                if (TaskCache.doingFacePlateNoEventCount.get() <= 0 && TaskCache.taskPushFaceConsumer.size() <= 0) {
//                    log.info("缓存的队列处理完毕");
//                    break;
//                }
                }
                log.info("关机前工作结束");
            }));
        } catch (Exception e) {
            log.info("关机前工作出错", e);
        }
    }

    @Override
    public int getOrder() {
        return Integer.MAX_VALUE;
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容