<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<flink.version>1.13.1</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<!-- for flink-1.13 -->
<version>1.2.3_flink-1.13_2.11</version>
<!-- for flink-1.13 with Scala 2.12: <version>1.2.3_flink-1.13_2.12</version> -->
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-table-planner-blink_2.11</artifactId>-->
<!-- <version>${flink.version}</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.20</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-format-changelog-json</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<!-- add the dependency matching your database -->
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-elasticsearch7_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>5.0.0</version>
</dependency>
<dependency>
<groupId>com.github.houbb</groupId>
<artifactId>pinyin</artifactId>
<version>0.2.1</version>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}.${git.branch}.${project.version}.${build.version}</finalName>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
<excludes>
<exclude>static/**</exclude>
</excludes>
<includes>
<include>mybatis/**</include>
<include>bin/*.sh</include>
<include>*.properties</include>
<include>*.xml</include>
<include>*.json</include>
</includes>
</resource>
<!-- Binary resources such as fonts must not be filtered: filtering would corrupt their byte content -->
<resource>
<directory>src/main/resources</directory>
<filtering>false</filtering>
<includes>
<include>static/**</include>
<include>templates/**</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skip>true</skip>
<testFailureIgnore>true</testFailureIgnore>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.iflytek.swk.MysqlTransElastic</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<descriptors>
<descriptor>src/assembly/distribution.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-sources</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</build>
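With this POM, the assembly plugin builds a fat jar (jar-with-dependencies) during the package phase and the source plugin attaches a sources jar. Note that <finalName> references ${git.branch} and ${build.version}, which are not defined in the <properties> block shown above; they are assumed to be supplied elsewhere in the full POM (for example by a git plugin or by -D flags). A typical build command, with those two properties passed on the command line, might look like:

mvn clean package -Dgit.branch=master -Dbuild.version=1.0.0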
public static void main(String[] args) throws Exception {
// Load external configuration: each argument is a .properties file or a directory of them
List<Props> propsList = new ArrayList<>();
for (String arg : args) {
if (FileUtil.isFile(arg)) {
Props props = Props.create();
props.load(FileUtil.getUtf8Reader(arg));
propsList.add(props);
} else if (FileUtil.isDirectory(arg)) {
File dir = FileUtil.file(arg);
for (File file : Objects.requireNonNull(dir.listFiles())) {
if (!StrUtil.equals(FileUtil.extName(file), "properties")) {
continue;
}
Props props = Props.create();
props.load(FileUtil.getUtf8Reader(file));
propsList.add(props);
}
} else {
throw new FileNotFoundException("Configuration file not found: " + arg);
}
}
//1. Execution environment setup
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);
env.getConfig().enableObjectReuse(); // enable object reuse to cut serialization overhead
//1.1 Checkpoint configuration
/* The job reads from the MySQL binlog. If the cluster goes down, we want to resume where we left off: restarting from the latest offset loses data, restarting from the beginning duplicates it. Ideally the position of every row is persisted as it is read; Flink stores exactly this read position in its checkpoints, so enabling checkpointing lets the job resume from the last recorded binlog position. */
env.enableCheckpointing(60000L); // take a checkpoint every minute
// checkpointing mode: exactly-once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Whether checkpoints are cleaned up when the job terminates. By default checkpoint data is deleted on cancellation; RETAIN_ON_CANCELLATION keeps the last checkpoint so the job can be restarted and recovered from it, which is what you normally want.
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Restart strategy: fixed-delay restart, at most 64 attempts with a 10-second delay between attempts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(64, 10000L));
// Join state can grow large, so use the RocksDB state backend to handle large state
env.setStateBackend(new EmbeddedRocksDBStateBackend());
// set the parallelism
env.setParallelism(2);
// Checkpoint storage: in YARN mode the JobManager is created with the job and disappears when the job dies, so checkpoint data must be persisted to a location every node can read, typically HDFS. Whenever checkpointing is enabled, configure such a storage location (see the commented sketch below).
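// A minimal sketch (the HDFS path is hypothetical, not taken from the original setup):
// env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");
// Table API tuning: mini-batch processing and Hive source options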
Configuration configuration = tenv.getConfig().getConfiguration();
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "10s");
configuration.setString("table.exec.mini-batch.size", "1000");
configuration.setString("table.exec.hive.infer-source-parallelism.max", "100");
configuration.setString("table.exec.hive.fallback-mapred-reader", "true");
for (Props props : propsList) {
String source = props.getStr("source");
String sink = props.getStr("sink");
// execute the source / sink DDL statements
if (StrUtil.isNotBlank(source)) {
tenv.executeSql(source);
}
if (StrUtil.isNotBlank(sink)) {
tenv.executeSql(sink);
}
}
for (Props props : propsList) {
String existSource = props.getStr("source[0]");
if (StringUtils.isNotBlank(existSource)) {
Map<String, List<String>> map = new HashMap<>();
props.forEach((k, v) -> {
String key = StringUtils.split(k.toString(), "[")[0];
List<String> val = map.getOrDefault(key, new ArrayList<>());
val.add(StrUtil.toString(v));
map.putIfAbsent(key, val);
});
List<String> sourceList = map.get("source");
// execute every source DDL
for (String source : sourceList) {
tenv.executeSql(source);
}
List<String> sinkList = map.get("sink");
// execute every sink DDL
for (String sink : sinkList) {
tenv.executeSql(sink);
}
}
}
// create views
for (Props props : propsList) {
String view = props.getStr("view");
if (StrUtil.isNotBlank(view)) {
tenv.executeSql(view);
}
}
// data-movement SQL (INSERT INTO ... SELECT ...)
for (Props props : propsList) {
String sqlcdc = props.getStr("trans");
if (StrUtil.isNotBlank(sqlcdc)) {
tenv.executeSql(sqlcdc);
}
}
//env.execute(); // not needed: executeSql() on the INSERT statements already submits the job
}
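The job above is driven entirely by the .properties files passed as arguments (a single file or a directory of *.properties). Each file can hold Flink SQL statements under the keys source, sink, view and trans; several statements of the same kind can be supplied with indexed keys such as source[0], source[1]. The following sketch shows what such a file might look like; the table names, columns, hosts and credentials are placeholders, not values from the original setup.

# job.properties (illustrative only)
source=CREATE TABLE mysql_orders (id BIGINT, name STRING, PRIMARY KEY (id) NOT ENFORCED) \
  WITH ('connector' = 'mysql-cdc', 'hostname' = 'mysql-host', 'port' = '3306', \
  'username' = 'user', 'password' = 'pass', 'database-name' = 'demo', 'table-name' = 'orders')
sink=CREATE TABLE es_orders (id BIGINT, name STRING, PRIMARY KEY (id) NOT ENFORCED) \
  WITH ('connector' = 'elasticsearch-7', 'hosts' = 'http://es-host:9200', 'index' = 'orders')
trans=INSERT INTO es_orders SELECT id, name FROM mysql_orders

The job can then be submitted with something like flink run -c com.iflytek.swk.MysqlTransElastic <fat-jar> /path/to/conf/, where the last argument is the file or directory that main() reads.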