背景:Flink 1.13.5,引入cdc2.1.1
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.1.1</version>
</dependency>
代码如下:
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author: yuninglong
*/
public class CdcFromMysql {
public static void main(String[] args) throws Exception {
try {
//TODO 1)初始化flink流处理的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 使用MySQLSource创建数据源
// 同时指定StringDebeziumDeserializationSchema,将CDC转换为String类型输出
String hostname = "192.168.202.10";
int port = 3301;
String database = "demo";
String username = "root";
String password = "testYnlcom";
MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname(hostname).port(port)
.databaseList(database).username(username).password(password)
.tableList("demo.student")
.serverId("1-100")
.deserializer(new JsonDebeziumDeserializationSchema())
.includeSchemaChanges(true) // output the schema changes as well
.build();
env.enableCheckpointing(3000);
// set the source parallelism to 1
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySqlParallelSource")
// .setParallelism(1)
.print()
.setParallelism(1);
env.execute();
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
报如下错误:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/connector/base/source/reader/RecordEmitter
at com.yuninglong.mysql.CdcFromMysql.execute(CdcFromMysql.java:30)
at com.yuninglong.mysql.CdcFromMysql.main(CdcFromMysql.java:13)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
查找发现还需要引入下面的包
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>