基本实现方案参考:《SQLServer CDC数据通过Kafka Connect实时同步至分析型数据库 AnalyticDB For PostgreSQL及OSS》——阿里云开发者社区 (aliyun.com)
问题一:启动时提示 Failed to find any class that implements Connector。
解决方案:假设配置了 plugin.path=/home/text/kafka-plugin,那么在 kafka-plugin 目录下建立子目录 sqlserver,然后将插件的所有 jar 包复制到该子目录中,并重启 connect。
问题二:如果 Kafka 通过 SASL_PLAINTEXT 协议配置了用户名密码,启动 connect 的时候一直报 disconnect。
解决方案:需要在 connect 的配置文件 connect-distributed.properties 和连接器的配置文件(即下面的 sqlserver-cdc-source.json)里同时添加相关的 SASL 配置项。
connect-distributed.properties
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
security.protocol=SASL_PLAINTEXT
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
producer.security.protocol=SASL_PLAINTEXT
sqlserver-cdc-source.json(注意:其中 mode 和 incrementing.column.name 是 Kafka Connect JDBC Source 连接器的配置项,Debezium 的 SqlServerConnector 会忽略它们;schemas.enable 通常应写作 value.converter.schemas.enable,请按实际需求核对)
{
  "name": "sqlserver-cdc-source",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "database.server.name": "DESKTOP-0T78TIH",
    "database.hostname": "192.168.1.1",
    "database.port": "1433",
    "database.user": "sa",
    "database.password": "123456",
    "database.dbname": "test-cdc",
    "schemas.enable": "false",
    "mode": "incrementing",
    "incrementing.column.name": "ID",
    "database.history.kafka.bootstrap.servers": "192.168.1.1:9092",
    "database.history.kafka.topic": "test-cdc.bak",
    "database.history.consumer.sasl.mechanism": "PLAIN",
    "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";",
    "database.history.consumer.security.protocol": "SASL_PLAINTEXT",
    "database.history.producer.sasl.mechanism": "PLAIN",
    "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";",
    "database.history.producer.security.protocol": "SASL_PLAINTEXT",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}