This post demonstrates how to implement the demo3 functionality from the previous post using the Table API. The previous post is Apache Flink 学习笔记(二).
In Flink, both DataSet and DataStream can be converted to and from Table, and each kind of operation has a corresponding API.
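For the batch side, the conversion works the same way through a BatchTableEnvironment. The following is only a minimal sketch to illustrate the DataSet round trip; the sample rows and the class name BatchTableDemo are made up for illustration, and Bean3 is the POJO defined later in this post.
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class BatchTableDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnvironment = BatchTableEnvironment.getTableEnvironment(env);

        // build a small DataSet of Bean3 records (Bean3 is the POJO defined below); the rows are dummy data
        DataSet<Bean3> input = env.fromElements(
                new Bean3(1L, "100007336", "pay"),
                new Bean3(2L, "100013668", "login"));

        // DataSet -> Table, followed by a simple aggregation
        Table counts = tableEnvironment
                .fromDataSet(input, "timestamp,appId,module")
                .groupBy("appId")
                .select("appId, module.count as totals");

        // Table -> DataSet
        DataSet<Row> result = tableEnvironment.toDataSet(counts, Row.class);
        result.print(); // print() triggers execution for batch jobs
    }
}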
Note: to use the Table API (and the SQL covered in the next post), add the following dependencies:
<!-- The Table API and SQL are bundled in the flink-table Maven artifact. The following dependency must be added to the project in order to use the Table API and SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.6.0</version>
</dependency>
<!-- Dependency for Flink's Scala batch or streaming API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.6.0</version>
</dependency>
<!-- For streaming queries -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.6.0</version>
</dependency>
First I extracted the POJO Bean3 into a shared class. When using POJOs with Flink, keep these four rules in mind:

- The POJO class must be declared public; if it is an inner class, it must also be static.
- The POJO must have a public no-argument constructor.
- The POJO's fields must be declared public, or have public getters and setters.
- The fields must use data types supported by Flink.
import java.io.Serializable;

/**
 * POJO shared by the demos
 */
public class Bean3 implements Serializable {

    public Long timestamp;
    public String appId;
    public String module;

    public Bean3() {
    }

    public Bean3(Long timestamp, String appId, String module) {
        this.timestamp = timestamp;
        this.appId = appId;
        this.module = module;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public String getAppId() {
        return appId;
    }

    public void setAppId(String appId) {
        this.appId = appId;
    }

    public String getModule() {
        return module;
    }

    public void setModule(String module) {
        this.module = module;
    }

    @Override
    public String toString() {
        return "Bean3{" +
                "timestamp=" + timestamp +
                ", appId='" + appId + '\'' +
                ", module='" + module + '\'' +
                '}';
    }
}
The demo5 code:
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.api.java.Tumble;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.util.Date;
import java.util.Properties;

/**
 * Table API
 */
public class Demo5 {

    private static final String APP_NAME = "app_name";

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableSysoutLogging();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // use processing time for windows
        env.setParallelism(1); // global parallelism

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "kafka bootstrap.servers");
        // set the topic and app name
        // see the previous post for the FlinkKafkaManager source
        FlinkKafkaManager manager = new FlinkKafkaManager("kafka.topic", APP_NAME, properties);
        FlinkKafkaConsumer09<JSONObject> consumer = manager.build(JSONObject.class);
        consumer.setStartFromLatest();

        // get the DataStream and map it to Bean3
        DataStream<Bean3> stream = env.addSource(consumer).map(new FlatMap());

        final StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
        // timestamp, appId and module are the POJO field names; the trailing tt is an arbitrarily named
        // extra field, and the .proctime suffix marks it as the processing-time attribute
        Table table = tableEnvironment.fromDataStream(stream, "timestamp,appId,module,tt.proctime");
        tableEnvironment.registerTable("common", table); // register the table name
        // or use registerDataStream to register the table name and field mapping in one call
        //tableEnvironment.registerDataStream("common", stream, "timestamp,appId,module,tt.proctime");

        Table query =
                tableEnvironment
                        .scan("common") // equivalent to FROM
                        .window(Tumble.over("10.seconds").on("tt").as("dd")) // fires every 10s; an alias is required and must differ from tt (I have not figured out exactly why)
                        .groupBy("dd,appId") // must group by the window alias dd defined above
                        .select("appId,COUNT(module) as totals") // COUNT(module) can also be written as module.count
                        .where("appId == '100007336' || appId == '100013668'"); // equivalent to filter(); 'or' throws an error, while oddly =, == and === all pass

        DataStream<Row> result = tableEnvironment.toAppendStream(query, Row.class);
        result.process(new ProcessFunction<Row, Object>() {
            @Override
            public void processElement(Row value, Context ctx, Collector<Object> out) throws Exception {
                System.out.println(String.format("AppId:%s, Module Count:%s", value.getField(0).toString(), value.getField(1).toString()));
            }
        });

        try {
            env.execute(APP_NAME);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // maps each incoming JSON message to a Bean3, using the current time as the timestamp
    public static class FlatMap implements MapFunction<JSONObject, Bean3> {
        @Override
        public Bean3 map(JSONObject jsonObject) throws Exception {
            return new Bean3(new Date().getTime(), jsonObject.getString("appId"), jsonObject.getString("module"));
        }
    }
}
To use the Table API, first create a StreamTableEnvironment, then call fromDataStream (for stream processing) to create a Table. Alternatively, call registerDataStream directly, which registers the table name and the field mapping in one step.
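Conversely, a Table is turned back into a DataStream with toAppendStream, as demo5 does, or with toRetractStream when the query continuously updates earlier results (for example a groupBy without a window). A minimal sketch, reusing the query Table and tableEnvironment from demo5:
// append-only stream: fine here, because the tumbling-window aggregation only ever emits new rows
DataStream<Row> appendStream = tableEnvironment.toAppendStream(query, Row.class);

// retract stream: required when results can be updated; the Boolean flag marks insert (true) vs. retract (false)
DataStream<Tuple2<Boolean, Row>> retractStream = tableEnvironment.toRetractStream(query, Row.class);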
In this example the window is defined on processing time rather than event time, so the timestamp field in the message is not used. Instead, an extra custom field tt serves as the processing-time attribute; it must be placed last in the field list and must carry the .proctime suffix.
In the same way, processing time can be switched to event time with the following changes:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // use event time for windows
Assign the event timestamp, with the same change as in demo3:
// requires: org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
//           org.apache.flink.streaming.api.windowing.time.Time
DataStream<Bean3> bean3DataStreamWithAssignTime =
        stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Bean3>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(Bean3 element) {
                return element.getTimestamp();
            }
        });
Use timestamp as the event-time field; in this case the .rowtime suffix must be added:
Table table = tableEnvironment.fromDataStream(bean3DataStreamWithAssignTime, "timestamp.rowtime,appId,module");
// inside the window definition, reference timestamp instead of tt
.window(Tumble.over("10.seconds").on("timestamp").as("dd"))
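Putting the pieces together, the event-time variant of demo5 would look roughly like this; it is only a sketch of the changed section, with the rest of demo5 left unchanged:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Bean3> bean3DataStreamWithAssignTime =
        stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Bean3>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(Bean3 element) {
                return element.getTimestamp();
            }
        });

// timestamp.rowtime marks the field as the event-time attribute
Table table = tableEnvironment.fromDataStream(bean3DataStreamWithAssignTime, "timestamp.rowtime,appId,module");
tableEnvironment.registerTable("common", table);

Table query = tableEnvironment
        .scan("common")
        .window(Tumble.over("10.seconds").on("timestamp").as("dd"))
        .groupBy("dd,appId")
        .select("appId,COUNT(module) as totals")
        .where("appId == '100007336' || appId == '100013668'");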