Flink:Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.source.reader....

背景:Flink 1.13.5,引入cdc2.1.1

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.1.1</version>
</dependency>

代码如下:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author: yuninglong
 */
public class CdcFromMysql {
    public static void main(String[] args) throws Exception {
        try {
            //TODO 1)初始化flink流处理的运行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 使用MySQLSource创建数据源
// 同时指定StringDebeziumDeserializationSchema,将CDC转换为String类型输出
            String hostname = "192.168.202.10";
            int port = 3301;
            String database = "demo";
            String username = "root";
            String password = "testYnlcom";

            MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname(hostname).port(port)
                    .databaseList(database).username(username).password(password)
                    .tableList("demo.student")
                    .serverId("1-100")
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .includeSchemaChanges(true) // output the schema changes as well
                    .build();

            env.enableCheckpointing(3000);

// set the source parallelism to 1
            env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySqlParallelSource")
//                    .setParallelism(1)
                    .print()
                    .setParallelism(1);


            env.execute();

            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


}

报如下错误:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/connector/base/source/reader/RecordEmitter
    at com.yuninglong.mysql.CdcFromMysql.execute(CdcFromMysql.java:30)
    at com.yuninglong.mysql.CdcFromMysql.main(CdcFromMysql.java:13)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)

查找发现还需要引入下面的包

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容