Debezium:数据实时采集从Postgresql到Kafka

总体架构

postgresql

postgresql.conf

listen_addresses = '*'
shared_buffers = 128MB
dynamic_shared_memory_type = posix
wal_level = logical
max_wal_size = 1GB
min_wal_size = 80MB
max_wal_senders = 8
wal_keep_segments = 4
max_replication_slots = 4 

pg_hba.conf

pg_hba.conf

重启PG

service postgresql restart

zk和kafka

  1. 下载
    https://mirrors.aliyun.com/apache/
    应该下载可执行文件的压缩包apache-zookeeper-x.x.x-bin.tar.gz,apache-zookeeper-x.x.x.tar.gz是源码
  2. 然后解压至指定目录
tar -xzvf 

Debezium

  1. 下载对应数据库的Debezium连接器
    https://debezium.io/documentation/reference/1.0/install.html
  2. 解压将jar拷贝到kafka的libs目录下


    image.png

配置

服务器eth0_ip为172.18.4.109,公网ip为8.129.215.124

kafka

mkdir /usr/local/zgxsoft/etl/kafka/log

connect-distributed.properties

bootstrap.servers=172.18.4.109:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000

server.properties

broker.id=0
listeners=PLAINTEXT://172.18.4.109:9092
host.name = 172.18.4.109
advertised.listeners=PLAINTEXT://8.129.215.124:9092
num.network.threads=3
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400      
socket.request.max.bytes=104857600
log.dirs=/usr/local/zgxsoft/etl/kafka/log
num.partitions=1
num.recovery.threads.per.data.dir=1
such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1    
zookeeper.connect=172.18.4.109:2181

zk

mkdir /usr/local/zgxsoft/etl/zk/log
mkdir /usr/local/zgxsoft/etl/zk/data
cp zookeeper-3.4.14/conf/zoo_sample.cfg zookeeper-3.4.14/conf/zoo.cfg

zoo.cfg

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zgxsoft/etl/zk/log
dataLogDir=/usr/local/zgxsoft/etl/zk/data
clientPort=2181

kafka connector

增加postgresql connector

postman发送post请求示例

启动

  • 启动zk
sh zkServer.sh start
  • 启动kafaka connector
sh connect-distributed.sh ../../config/connect-distributed.properties
  • 启动kafaka
./kafka-server-start.sh -daemon config/server.properties



springboot消费端

public class MQDict {
    public static final String MQ_ADDRESS_COLLECTION="8.129.215.124:9092";          //kafka集群地址
    public static final String CONSUMER_TOPIC = "track_iqifu.public.track_iqifu";   //消费者连接的topic
    public static final String CONSUMER_GROUP_ID = "iqifu_group_001";               //groupId,可以分开配置
    public static final String CONSUMER_ENABLE_AUTO_COMMIT = "true";                //是否自动提交(消费者)
    /*
    消费者自动提交偏移量。enable.auto.commit 设为 true,那么每过一定时间间隔,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。
    提交时间间隔由 auto.commit.interval.ms 控制,默认是5s。
     */
    public static final String CONSUMER_AUTO_COMMIT_INTERVAL_MS = "1000";
    public static final String CONSUMER_SESSION_TIMEOUT_MS = "30000";               //连接超时时间
    public static final int CONSUMER_MAX_POLL_RECORDS = 10;                         //每次拉取数
    public static final Duration CONSUMER_POLL_TIME_OUT = Duration.ofMillis(6000);  //拉去数据超时时间
    /*
    指定了消费者在读取一个没有偏移量(offset)的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时井被删除)该作何处理,
    默认值是 latest,表示在 offset 无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)。
    另一个值是 earliest,消费者将从起始位置读取分区的记录。
     */
    public static final String AUTO_OFFSET_RESET = "earliest";
    //client.id是用户特定的字符串,用来在每次请求中帮助跟踪调用。它应该可以逻辑上确认产生这个请求的应用
    public static final String CLIENT_ID = "client_1";

}
public class MQConsumer {
    private Logger logger = LoggerFactory.getLogger(getClass());

    private static KafkaConsumer<String,String> consumer;

    /**
     *  初始化消费者
     */
    static {
        Properties configs = initConfig();
        consumer = new KafkaConsumer<String, String>(configs);
        /*
         创建了消费者之后,需要订阅 Topic,subscribe() 方法接受一个主题列表作为参数。
         subscribe() 也可以接收一个正则表达式,匹配多个主题(如果有新的名称匹配的主题创建,会立即触发一次再均衡,消费者就可以读取新添加的主题)
         */
        consumer.subscribe(Arrays.asList(MQDict.CONSUMER_TOPIC));
    }

    /**
     *  初始化配置
     */
    private static Properties initConfig(){
        Properties props = new Properties();
        props.put("bootstrap.servers", MQDict.MQ_ADDRESS_COLLECTION);
        props.put("group.id", MQDict.CONSUMER_GROUP_ID);
        props.put("enable.auto.commit", MQDict.CONSUMER_ENABLE_AUTO_COMMIT);
        props.put("auto.commit.interval.ms", MQDict.CONSUMER_AUTO_COMMIT_INTERVAL_MS);
        props.put("session.timeout.ms", MQDict.CONSUMER_SESSION_TIMEOUT_MS);
        props.put("max.poll.records", MQDict.CONSUMER_MAX_POLL_RECORDS);
        props.put("auto.offset.reset", MQDict.AUTO_OFFSET_RESET);
        props.put("client.id", MQDict.CLIENT_ID);
        // key反序列化
        props.put("key.deserializer", StringDeserializer.class.getName());
        // value反序列化
        props.put("value.deserializer", StringDeserializer.class.getName());
        return props;
    }

    public void exeConsumer(IqifuBakDAO iqifuBakDAO) {
        try {
            while (true) {
                /*
                 poll() 返回一个记录列表。
                 每条记录都包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。
                 */
                ConsumerRecords<String, String> records = consumer.poll(MQDict.CONSUMER_POLL_TIME_OUT);
                if (records != null) {
                    records.forEach((ConsumerRecord<String, String> record) -> {
                         //打印偏移量,key,value
                         System.out.printf("offset = %d, value = %s", record.offset(), record.key(),record.value());
                         //插入数据库
                         iqifuBakDAO.insert(record);
                    });
                }
            }
        }finally {
            // 关闭消费者,网络连接和 socket 也会随之关闭,并立即触发一次再均衡
            consumer.close();
        }
    }
}
/**
 * Spring boot监听类,用于启动Spring boot容器后加载数据
 */
@Component
public class ApplicationStartup implements ApplicationListener<ContextRefreshedEvent> {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @SuppressWarnings("all")
    @Autowired
    IqifuBakDAO iqifuBakDAO;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        logger.info("#########################################注入DAO层接口,开始消费#########################################");
        new MQConsumer().exeConsumer(iqifuBakDAO);
    }
}

参考资料:

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,496评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,407评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,632评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,180评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,198评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,165评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,052评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,910评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,324评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,542评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,711评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,424评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,017评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,668评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,823评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,722评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,611评论 2 353