看了几天flink,刚入门。
简单说下对flink的感受,flink有4层(有些说3层,将Table API和SQL看成一层)API,越底层,对数据的操作就越精细,越高层完成功能所需要的代码就越少,而且代码越易读。
api使用起来很像java中的stream,这个其实很显然,都是为了对流数据进行处理。感觉就像flink是java中并行流的分布式版本,所以对stream熟悉的话,flink上手不难,或者说使用flink编写代码并不难。
Flink的编程模式:输入(source) -> 处理(转换transform) -> 输出(sink),3部分,相当清爽。
统一术语
数据比对一般针对两个数据集A/B,在选定一个基准方A后,定义如下:
F000:A/B两方数据相同
F113:A中存在,但B中没有,A比B多
F114:B中存在,但A中没有,B比A多
F115:A与B的关键字段相同,但毕竟字段不同,如A与B都有同一笔订单,但订单金额不同
新建工程
这里我们使用官方提供的quickstart做模板,如果是比较新版的idea(如2020.1)里面直接有flink的quickstart模板,旧版的idea的话,需要自己添加一下。
下次使用的时候可以直接从这里看到:
如果你使用的是scala,ArtifactId则填flink-quickstart-scala。具体的版本信息可以根据最新版的填写。
添加Table API依赖
在pom.xml中添加Table API依赖。
<!-- Table API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Table API需要scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
编写代码
利用模板里的BatchJob来编写:
package com.flink;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import java.util.List;
/**
* Skeleton for a Flink Batch Job.
*
* <p>For a tutorial how to write a Flink batch application, check the
* tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>.
*
* <p>To package your application into a JAR file for execution,
* change the main class in the POM.xml file to this class (simply search for 'mainClass')
* and run 'mvn clean package' on the command line.
*/
public class BatchJob {
public static void main(String[] args) throws Exception {
// set up the batch execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Table Environment
BatchTableEnvironment tableEnvironment = BatchTableEnvironment.getTableEnvironment(env);
/**
* 构造两个数据集,实际生产从自己需要的source中获取即可
*/
DataSource<String> dataSourceA_unique = env.fromElements("orderId_1_f113", "orderId_2_f000", "orderId_3_f115");
DataSource<String> dataSourceB_unique = env.fromElements("orderId_2_f000", "orderId_3_f115", "orderId_4_f114");
// 转换成table
Table tableA_unique = tableEnvironment.fromDataSet(dataSourceA_unique);
Table tableB_unique = tableEnvironment.fromDataSet(dataSourceB_unique);
/**
* 核心比对(对账)逻辑
*/
Table f113_table = tableA_unique.minusAll(tableB_unique);// 差集
Table f114_table = tableB_unique.minusAll(tableA_unique);// 差集
Table f000_table = tableA_unique.intersect(tableB_unique);// 交集
// 转回DataSet用于输出
DataSet<String> f000 = tableEnvironment.toDataSet(f000_table, String.class);
DataSet<String> f113 = tableEnvironment.toDataSet(f113_table, String.class);
DataSet<String> f114 = tableEnvironment.toDataSet(f114_table, String.class);
/**
* 输出,实际输出到自己需要的sink即可
*/
List<String> f000_list = f000.collect();
List<String> f113_list = f113.collect();
List<String> f114_list = f114.collect();
System.out.println("==============================");
System.out.println("f000 ->" + f000_list);
System.out.println("==============================");
System.out.println("f113 ->" + f113_list);
System.out.println("==============================");
System.out.println("f114 ->" + f114_list);
// 批处理不需要显示调用execute,否则会报错
// env.execute("Flink Batch Java API Skeleton");
}
}
简单说下几个关键点:
- 使用Table API需要创建对应的执行环境:
BatchTableEnvironment tableEnvironment = BatchTableEnvironment.getTableEnvironment(env);
- 模板代码中最后显式调用
env.execute()
,其实在批处理中不需要,显式调用反而会报错。
源码
总结
本质上就是利用Table API中对数据集的处理函数(交集、差集)来完成数据比对。
如果你有更好的想法,欢迎留言,多多指教。
转载请注明出处