如下图:
代码:
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
object FlinkSQLWordCount {
def main(args: Array[String]):Unit = {
import org.apache.flink.streaming.api.scala._
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setParallelism(1)
val stream: DataStream[(String, Int)] = environment
.readTextFile("/Users/run/Downloads/workspaceIDEA/flink-tutorials/data/words.txt")
.flatMap(_.split(" "))
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(0)
.sum(1)
val tableEnvironment: StreamTableEnvironment =
StreamTableEnvironment.create(environment)
val table: Table = tableEnvironment.fromDataStream(stream)
// 注册成一个表
tableEnvironment.createTemporaryView("t_word_count",table)
// 需要通过 tableEnvironment 来调用对应的操作
tableEnvironment.sqlQuery(
"""
|select * from t_word_count
""".stripMargin).toAppendStream[(String,Int)].print()
environment.execute(this.getClass.getName)