spring kafka之如何批量给topic加前缀

前言

最近业务开发部门给我们部门提了一个需求,因为他们开发环境和测试环境共用一套kafka,他们希望我们部门能帮他们实现自动给kafka的topic加上环境前缀,比如开发环境,则topic为dev_topic,测试环境,则topic为test_topic,他们kafka客户端是使用spring-kafka。一开始接到这个需求的时候,我心里是拒绝的,为啥开发环境和测试环境不分别部署一套kafka,还要那么麻烦。但老大都答应接这个需求了,作为小罗罗也只能接了

实现思路

1、生产者端

可以通过生产者拦截器,来给topic加前缀

2、实现步骤

a、编写一个生产者拦截器

@Slf4j
public class KafkaProducerInterceptor implements ProducerInterceptor<String, MessageDTO> {



    /**
     * 运行在用户主线程中,在消息被序列化之前调用
     * @param record
     * @return
     */
    @Override
    public ProducerRecord<String, MessageDTO> onSend(ProducerRecord<String, MessageDTO> record) {
        log.info("原始topic:{}",record.topic());
        return new ProducerRecord<String, MessageDTO>(TOPIC_KEY_PREFIX + record.topic(),
                record.partition(),record.timestamp(),record.key(), record.value());
    }




    /**
     * 在消息被应答之前或者消息发送失败时调用,通常在producer回调逻辑触发之前,运行在produer的io线程中
     * @param metadata
     * @param exception
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
      log.info("实际topic:{}",metadata.topic());
    }


    /**
     *  清理工作
     */
    @Override
    public void close() {
    }


    /**
     * 初始化工作
     * @param configs
     */
    @Override
    public void configure(Map<String, ?> configs) {

    }

b、配置拦截器

kafka:
    producer:
      # 生产者拦截器配置
      properties:
        interceptor.classes: com.github.lybgeek.kafka.producer.interceptor.KafkaProducerInterceptor

c、测试

image.png

2、消费者端

这个就稍微有点难搞了,因为业务开发部门他们是直接用@KafkaListener的注解,形如下

 @KafkaListener(id = "msgId",topics = {Constant.TOPIC})

像这种也没啥好的办法,就只能通过源码了,通过源码可以发现在如下地方

KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization

会把@KafkaListener的值赋值给消费者,如果对spring有了解的朋友,可能会知道postProcessAfterInitialization是spring后置处理器的方法,主要用来bean初始化后的一些操作,既然我们知道@KafkaListener会在bean初始化后再进行赋值,那我们就可以在bean初始化前,修改掉@KafkaListener的值。具体实现如下

@Component
public class KafkaListenerFactoryBeanPostProcesser implements BeanFactoryPostProcessor {

    @SneakyThrows
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

        List<String> packageNames = AutoConfigurationPackages.get(beanFactory);

        for (String packageName : packageNames) {
            Reflections reflections = new Reflections(new ConfigurationBuilder()
                    .forPackages(packageName) // 指定路径URL
                    .addScanners(new SubTypesScanner()) // 添加子类扫描工具
                    .addScanners(new FieldAnnotationsScanner()) // 添加 属性注解扫描工具
                    .addScanners(new MethodAnnotationsScanner() ) // 添加 方法注解扫描工具
                    .addScanners(new MethodParameterScanner() ) // 添加方法参数扫描工具
            );

            Set<Method> methodSet = reflections.getMethodsAnnotatedWith(KafkaListener.class);
            if(!CollectionUtils.isEmpty(methodSet)){
                for (Method method : methodSet) {
                    KafkaListener kafkaListener = method.getAnnotation(KafkaListener.class);
                    changeTopics(kafkaListener);
                }
            }
        }

    }


    private void changeTopics(KafkaListener kafkaListener) throws Exception{
        InvocationHandler invocationHandler = Proxy.getInvocationHandler(kafkaListener);
        Field memberValuesField = invocationHandler.getClass().getDeclaredField("memberValues");
        memberValuesField.setAccessible(true);
        Map<String,Object> memberValues = (Map<String,Object>)memberValuesField.get(invocationHandler);
        String[] topics = (String[])memberValues.get("topics");
        System.out.println("修改前topics:" + Lists.newArrayList(topics));
        for (int i = 0; i < topics.length; i++) {
            topics[i] = Constant.TOPIC_KEY_PREFIX + topics[i];
        }
        memberValues.put("topics", topics);
        System.out.println("修改后topics:" + Lists.newArrayList(kafkaListener.topics()));

    }
}

测试

image.png

image.png

总结

虽然实现了动态修改topic,但我还是觉得topic不要随便改变,有条件的话,kafka还是得基于物理环境隔离,其次真的客观条件不允许,要动态变更topic,则需做好topic动态变更宣导以及相关wiki的编写,不然很容易掉坑

demo链接

https://github.com/lyb-geek/springboot-learning/tree/master/springboot-mq-idempotent-consume

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,367评论 6 512
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,959评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,750评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,226评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,252评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,975评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,592评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,497评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,027评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,147评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,274评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,953评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,623评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,143评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,260评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,607评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,271评论 2 358

推荐阅读更多精彩内容