Kafka stream读写分离

通过自定义实现KafkaClientSupplier接口实现从一个kafka集群读取数据,再写入到另一个kafka集群中。主要实现如下:

1.自定义实现KafkaClientSupplier接口

package org.feiyu.dataprocess.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.KafkaClientSupplier;

import java.util.HashMap;
import java.util.Map;

/**
 * 自定义KafkaClientSupplier
 */
public class MyKafkaClientSupplier implements KafkaClientSupplier {
    //生产者配置文件
    private Map<String,Object> producerConfig;
    //消费者配置文件
    private Map<String,Object> consumerConfig;

    public MyKafkaClientSupplier(Map<String, Object> producerConfig, Map<String, Object> consumerConfig) {
        this.producerConfig = producerConfig;
        this.consumerConfig = consumerConfig;
    }

    @Override
    public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
        Map<String,Object> map = new HashMap<>(config);
        map.putAll(producerConfig);
        return new KafkaProducer<>(map,new ByteArraySerializer(),new ByteArraySerializer());
    }

    @Override
    public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
        Map<String,Object> map = new HashMap<>(config);
        map.putAll(consumerConfig);
        return new KafkaConsumer<>(map,new ByteArrayDeserializer(),new ByteArrayDeserializer());
    }

    @Override
    public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) {
        Map<String,Object> map = new HashMap<>(config);
        map.putAll(consumerConfig);
        return new KafkaConsumer<>(map,new ByteArrayDeserializer(),new ByteArrayDeserializer());
    }
}

2.创建自定义的KafkaClientSupplier实例并传入KafkaStreams构造器中

 StreamsConfig config = new StreamsConfig(props);

 MyKafkaClientSupplier supplier = new MyKafkaClientSupplier(
                    Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"//生产者kafka集群配置"),
                    Collections.singletonMap(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"//消费者kafka集群配置"));

 KafkaStreams streams = new KafkaStreams(builder, config,supplier);
 streams.start();
 kafkaStreams.add(streams);

3.在kafka stream启动日志中可以看到配置生效,实现了kafka stream的读写分离

消费者配置

生产者配置
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 136,680评论 19 139
  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,913评论 13 425
  • 4. 设计思想 4.1 动机 我们设计的 Kafka 能够作为一个统一的平台来处理大公司可能拥有的所有实时数据馈送...
    疯狂的橙阅读 1,150评论 1 4
  • Kafka入门经典教程-Kafka-about云开发 http://www.aboutyun.com/threa...
    葡萄喃喃呓语阅读 10,987评论 4 54
  • 我们好像早就过了那个年纪,那个青涩的,单纯的年纪,那个多说一句话就被当做是有好感,心跳加速就是喜欢,牵了手就是男女...
    難歌啊阅读 627评论 0 0

友情链接更多精彩内容