/**
 * Example job that turns a DataStream into a Table and back again.
 *
 * It reads a text file, splits each line on single spaces, drops empty tokens,
 * maps every word to `(word, 1)`, keys the stream by the word and sums the
 * counter. The resulting stream is wrapped in a Table, its two fields are
 * renamed, rows whose count equals 5 are filtered out, and the Table is
 * converted back to a DataStream that is printed.
 */
object Stream2Table {
def main(args: Array[String]):Unit = {
// Brings the Scala DataStream API (and its implicit TypeInformation) into scope.
import org.apache.flink.streaming.api.scala._
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// Run every operator with parallelism 1.
environment.setParallelism(1)
// Word count: (word, 1) pairs keyed by tuple field 0 and summed on tuple field 1.
// NOTE(review): the input path is an absolute path on a local machine — confirm
// whether it should come from `args` or configuration instead.
val stream: DataStream[(String, Int)] = environment
.readTextFile("/Users/run/Downloads/workspaceIDEA/flink-tutorials/data/words.txt")
.flatMap(_.split(" "))
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(0)
.sum(1)
// Table environment bound to the streaming environment created above.
val tableEnvironment: StreamTableEnvironment =
StreamTableEnvironment.create(environment)
// Wrap the tuple stream in a Table; the tuple fields are referenced as _1 and _2 below.
val table: Table = tableEnvironment.fromDataStream(stream)
// Rename the fields to `word` and `count`, then keep only rows whose count is not 5.
// NOTE(review): `count` is also the name of an aggregate function in the Table API
// expression language — confirm that the alias and the filter parse as intended.
val res: Table = table.select("_1 as word, _2 as count")
.filter("count<>5")
// toAppendStream requires the implicit conversions from: import org.apache.flink.table.api.scala._
// The Table has to be converted back into an append stream before it can be printed.
val result: DataStream[(String, Int)] = res.toAppendStream[(String,Int)]
result.print()
// Submit the job, using this object's runtime class name as the job name.
environment.execute(this.getClass.getName)
如图: