confluent jdbc connector - mysql

JDBC Source Connector

Quickstart

数据库环境准备

CREATE DATABASE connector;
USE connector;
CREATE TABLE `from_source` (
  `fdsid` int(11) NOT NULL AUTO_INCREMENT,
  `dsid` int(11) DEFAULT NULL,
  `from` int(11) DEFAULT NULL,
  `stype` int(11) DEFAULT NULL,
  PRIMARY KEY (`fdsid`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4;
INSERT INTO from_source VALUES(1, 2, 1, 1);
INSERT INTO from_source VALUES(2, 2, 1, 2);
INSERT INTO from_source VALUES(3, 2, 1, 5);
INSERT INTO from_source VALUES(4, 2, 1, 6);

MySQL JDBC 驱动准备

测试环境使用的mysql版本信息如下:


mysql-info.png

在mysql官网上选择合适的驱动下载,测试中下载的是mysql-connector-java-5.1.42.tar.gz。
将此驱动拷贝到$CONFLUENT_PATH/share/java/kafka-connect-jdbc目录下,使用解压命令解压

tar -xzvf mysql-connector-java-5.1.42.tar.gz
cd mysql-connector-java-5.1.42
cp mysql-connector-java-5.1.42-bin.jar ../

最终的目的就是将mysql-connector-java-5.1.42-bin.jar放在$CONFLUENT_PATH/share/java/kafka-connect-jdbc目录下,这样confluecnt connector在启动是就可以找到mysql的jdbc驱动了

配置文件准备

单机环境下运行connector的命令如下:

//INFO Usage: ConnectStandalone worker.properties connector1.properties [connector2.properties ...] (org.apache.kafka.connect.cli.ConnectStandalone:61)
bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/mysql-source.properties

其中:connect-avro-standalone.properties可使用默认配置;
mysql-source.properties的内容如下:

# tasks to create:
name=test-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# a table called 'users' will be written to the topic 'test-mysql-jdbc-users'.
connection.url=jdbc:mysql://172.24.8.114:3306/connector?user=$USER&password=$PASSWORD
mode=incrementing
incrementing.column.name=fdsid
topic.prefix=test-mysql-jdbc-

其中,$USER,$PASSWORD是访问mysql数据库的用户名和地址。
参考:
JDBC驱动下载地址
MySQL Connector配置参考

从MySQL导入数据到Kafka中

启动Connector

 bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/mysql-source.properties

验证产生了相应的topic

bin/kafka-topics --zookeeper localhost:2181 --list

topic列表中会包含:test-mysql-jdbc-from_source

验证topic中的数据

bin/kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic test-mysql-jdbc-from_source --from-beginning

此时,可以获取topic:test-mysql-jdbc-from_source中的所有数据。

验证可增量从MySQL中导入数据

向from_source表中添加一个数据

insert into from_source values(7, 2, 1, 1)

相应的消费者会接收到插入到from_source表中的数据。
总参考:
JDBC Source Connector 官网

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,992评论 19 139
  • Kafka官网:http://kafka.apache.org/入门1.1 介绍Kafka™ 是一个分布式流处理系...
    it_zzy阅读 3,913评论 3 53
  • 发行说明 - Kafka - 版本1.0.0 以下是Kafka 1.0.0发行版中解决的JIRA问题的摘要。有关该...
    全能程序猿阅读 2,906评论 2 7
  • 【每日一谈心】:QQ,来来来,随性派画家咱俩谈谈心,你觉得在身上、地板上、墙上、门上、爸爸的床上绘画有什么不同吗?...
    674e09b5464a阅读 395评论 0 0
  • 距离第一次遇见苏晴已有十年了。 十年过去了,我仍记得心中那砰然而动的感觉,仿佛她从不曾离去一样。时隔多年,每当我回...
    二非_阅读 1,096评论 2 40