依赖
1. flink jdbc api
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
2. 对应数据库的jdbc依赖,如以下为MySQL5.x的jdbc依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
描述
- flinksql以多线程的方式将数据写入外部数据表
- flinksql sink table的ddl中若声明了主键,则上游数据到达sink table中将支持针对主键的append、update、delete模式(即upsert模式)
- mysql connector中,flinksql upsert模式通常需要和外部数据表的主键配合使用(即flink table和外部数据表的主键一致)
示例
String sinkDDL = "create table mysqlSink(" +
"ts timestamp," +
"user_id bigint," +
"item_id bigint," +
"behavior string" +
") with (" +
"'connector'='jdbc'," +
"'url'='jdbc:mysql://localhost:3306/test'," +
"'table-name'='user_event'," +
"'username'='root'," +
"'password'='123456'" +
")" +
"";