1.mysql连接
//t1只有一列a int型
TypeInformation[] fieldTypes =new TypeInformation[]{
BasicTypeInfo.INT_TYPE_INFO
};
RowTypeInfo rowTypeInfo =new RowTypeInfo(fieldTypes);
JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost/test1")
.setUsername("root")
.setPassword("123456")
.setQuery("select * from t1")
.setRowTypeInfo(rowTypeInfo)
.finish();
2.flink sql
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource s = env.createInput(jdbcInputFormat); //datasource
BatchTableEnvironment tableEnv =new BatchTableEnvironment(env, TableConfig.DEFAULT());
tableEnv.registerDataSet("t2", s, "a");
tableEnv.sqlQuery("select * from t2").printSchema();
Table query = tableEnv.sqlQuery("select * from t2").where("a > 1");
DataSet result = tableEnv.toDataSet(query, Row.class);
result.print();
System.out.println(s.count());
3.dependency
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.7.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>1.7.0</version>
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.46</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>