Due to project environment constraints, the stream processing engine had to be Alibaba Cloud Blink 3.3.0. After going through the Blink documentation on the Alibaba Cloud website and the Blink branch of Flink on GitHub, I managed to build and run Blink's DataStream and Table APIs, and I am recording the process here as a note.
I. Notes
1. According to the Alibaba Cloud documentation, Blink has been discontinued and is no longer for sale
https://help.aliyun.com/document_detail/168049.html
(screenshot: Blink.png)
2. Blink 3.X is fully compatible with Flink 1.5
https://help.aliyun.com/document_detail/111873.html
(screenshot: Flink-Blnk.png)
3. DataStream is fully compatible with open-source Flink 1.5.2
https://help.aliyun.com/document_detail/156813.html
(screenshot: Blink-Datastream.png)
4. The Blink branch on GitHub shows compatibility with Flink 1.5.1
https://github.com/apache/flink/tree/blink
(screenshot: Blink-GitHubTree.png)
5. The current Apache Flink release (at the time of writing) is 1.14.0
https://flink.apache.org/news/2021/09/29/release-1.14.0.html
Checking the release history on the Apache Flink website shows that Apache Flink 1.9.0 was the first release after the merge of Alibaba's internal Blink fork.
II. Building a Blink 3.3.0 Project
The development environment is set up with Maven 3 and IntelliJ IDEA.
1. The pom.xml file
<properties>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <blink.version>blink-3.3.0</blink.version>
    <java.version>1.8</java.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
    <dependency>
        <groupId>com.alibaba.blink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${blink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>com.alibaba.blink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${blink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>com.alibaba.blink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${blink.version}</version>
        <exclusions>
            <exclusion>
                <artifactId>slf4j-api</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
        </exclusions>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>com.alibaba.blink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${blink.version}</version>
        <exclusions>
            <exclusion>
                <artifactId>scala-library</artifactId>
                <groupId>org.scala-lang</groupId>
            </exclusion>
        </exclusions>
        <!-- <scope>test</scope> -->
    </dependency>
    <dependency>
        <groupId>com.alibaba.blink</groupId>
        <artifactId>flink-table_${scala.binary.version}</artifactId>
        <version>${blink.version}</version>
        <exclusions>
            <exclusion>
                <artifactId>commons-codec</artifactId>
                <groupId>commons-codec</groupId>
            </exclusion>
            <exclusion>
                <artifactId>commons-collections</artifactId>
                <groupId>commons-collections</groupId>
            </exclusion>
            <exclusion>
                <artifactId>slf4j-api</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>snappy-java</artifactId>
                <groupId>org.xerial.snappy</groupId>
            </exclusion>
            <exclusion>
                <artifactId>antlr4-runtime</artifactId>
                <groupId>org.antlr</groupId>
            </exclusion>
        </exclusions>
        <!-- <scope>provided</scope> -->
    </dependency>

    <!-- Add test framework -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>

    <!-- Add logging framework -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.antlr</groupId>
        <artifactId>antlr4-runtime</artifactId>
        <version>4.7.2</version>
    </dependency>
</dependencies>
2. A simple Blink DataStream example
Generate the stream data with a socket:
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketServerTest {
    public static void main(String[] args) throws Exception {
        // Listen on port 9000 and wait for a single client to connect
        ServerSocket server = new ServerSocket(9000);
        Socket client = server.accept();
        System.out.println("Client: " + client.getInetAddress().getHostAddress());
        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(client.getOutputStream()));
        // Write one random digit (0-9) per second, one per line
        while (true) {
            bw.write((int) (Math.random() * 100) % 10 + "\n");
            bw.flush();
            Thread.sleep(1000L);
        }
    }
}
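If running a separate socket program is inconvenient, the same test stream can also be generated inside the Flink job with a SourceFunction. This is only a minimal sketch; RandomDigitSource is a hypothetical class introduced here, not part of the original setup:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Hypothetical alternative to SocketServerTest: emits one random digit (0-9) per second.
public class RandomDigitSource implements SourceFunction<String> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            ctx.collect(String.valueOf((int) (Math.random() * 100) % 10));
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

In StreamWordCountTest below, env.addSource(new RandomDigitSource()) would then take the place of env.socketTextStream("localhost", 9000, "\n").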
Process the stream data with Flink's socketTextStream:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamWordCountTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Create the data source: read lines from the socket, delimited by "\n"
        DataStream<String> dataSource = env.socketTextStream("localhost", 9000, "\n");
        // Parse the data: wrap each record in a Tuple2, use a 5-second window,
        // group by the first Tuple2 field and sum the second
        DataStream<Tuple2<String, Integer>> windowCounts = dataSource
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                        System.out.println("value=" + value);
                        for (String word : value.split("\\n")) {
                            out.collect(new Tuple2<String, Integer>(word, 1));
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);
        windowCounts.print().setParallelism(1);
        // Execute the job
        env.execute("StreamTest");
    }
}
3. A simple Blink Table example
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
// BatchTableEnvironment comes from the Blink flink-table module
// (its package is assumed here to be org.apache.flink.table.api.java).
import org.apache.flink.table.api.java.BatchTableEnvironment;

public class TableWordCountTest {
    public static void main(String[] args) throws Exception {
        // Build the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getBatchTableEnvironment(env);
        // Create the data source
        DataStreamSource<WordCount> elementSource = env.fromElements(
                new WordCount("Lily", 1),
                new WordCount("Lily", 1),
                new WordCount("Lily", 1),
                new WordCount("Tom", 1),
                new WordCount("Tom", 1),
                new WordCount("Jack", 1));
        // Register the bounded stream as table TEMP_TABLE with fields word and num
        tEnv.registerBoundedStream("TEMP_TABLE", elementSource, "word, num");
        Table table = tEnv.sqlQuery(
                "SELECT word, SUM(num) as num FROM TEMP_TABLE GROUP BY word");
        // Print the schema and the result
        table.printSchema();
        table.print();
        // Execute the job
        tEnv.execute("TableTest");
    }

    public static class WordCount {
        public String word;
        public long num;

        public WordCount() {
        }

        public WordCount(String word, long num) {
            this.word = word;
            this.num = num;
        }

        @Override
        public String toString() {
            return "WordCount{" +
                    "word='" + word + '\'' +
                    ", num=" + num +
                    '}';
        }
    }
}
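The same aggregation can also be written in the fluent Table API style instead of SQL. The snippet below is a minimal sketch, assuming the Blink branch keeps the Flink 1.5-era scan/groupBy/select string-expression methods; it would replace the sqlQuery call inside TableWordCountTest.main and reuses tEnv and the TEMP_TABLE registration from above:

// Fluent Table API equivalent of the SQL query above.
Table fluentResult = tEnv.scan("TEMP_TABLE")
        .groupBy("word")
        .select("word, num.sum as num");
fluentResult.printSchema();
fluentResult.print();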
III. Summary
Blink 3.X corresponds to Flink 1.5;
Open-source Flink 1.9 was the first release to merge Alibaba's Blink, although the integration of Blink's query processor was not yet fully complete (the sketch below shows how the merged planner is selected in open-source Flink);
Open-source Flink 1.10 completed the Blink integration, bringing stronger streaming SQL support and mature batch processing;
Open-source Flink 1.12 took the first step toward unified stream and batch processing on the DataStream API;
Unified stream and batch processing is also the direction real-time big data processing is heading.
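For reference, in open-source Flink 1.9 through 1.13 the merged Blink code surfaces as the "Blink planner", selected through EnvironmentSettings. The sketch below uses the open-source Flink Table API (from Flink 1.11 the bridge class is org.apache.flink.table.api.bridge.java.StreamTableEnvironment; in 1.9/1.10 it lives in org.apache.flink.table.api.java); BlinkPlannerSetup is a name introduced here for illustration:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class BlinkPlannerSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Select the Blink planner explicitly: the default since Flink 1.11,
        // and the only planner from Flink 1.14 on.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        // Tables are then registered and queried on tEnv much like in the Blink example above.
    }
}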