flink table (word count)

object Stream2Table {

def main(args: Array[String]):Unit = {

import org.apache.flink.streaming.api.scala._

val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    environment.setParallelism(1)

val stream: DataStream[(String, Int)] = environment

.readTextFile("/Users/run/Downloads/workspaceIDEA/flink-tutorials/data/words.txt")

.flatMap(_.split(" "))

.filter(_.nonEmpty)

.map((_, 1))

.keyBy(0)

.sum(1)

val tableEnvironment: StreamTableEnvironment =

StreamTableEnvironment.create(environment)

val table: Table = tableEnvironment.fromDataStream(stream)

val res: Table = table.select("_1 as word, _2 as count")

.filter("count<>5")

// toAppendStream  需要导入隐式转换  import org.apache.flink.table.api.scala._

//把table 追加成流才行

    val result: DataStream[(String, Int)] = res.toAppendStream[(String,Int)]

result.print()

environment.execute(this.getClass.getName)

如图:

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容