Flink-1.13.0 Table Api & SQL Java Demo

1、导入依赖

      <!-- 使用table api 引入的依赖,使用桥接器和底层datastream api连接支持-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--如果需要在本地运行table api和sql 还需要引入一下依赖-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--如果想实现自定义的数据格式来做序列化,需要引入一下依赖-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--连接外部数据格式解析,采用csv方式来解析-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

1.1、从文件中输入

路径:input/clicks.txt

Bob,./test/111,1000
Bob,./test/222,1000
Bob,./test/333,1000
Bob,./test/444,1000
image.png

2、输出到文件demo

package com.flinktest.wc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.mesos.Protos;

import static org.apache.flink.table.api.Expressions.$;

public class CommApiTest {
    public static void main(String[] args) throws Exception{
        // 创建执行环境的两种方式,流方式 & 表方式
        // 1 创建执行环境(流方式创建)
        //        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //        env.setParallelism(1);
        //        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2 创建执行环境(表方式创建) 基于alibaba 的 blink planner实现
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .useBlinkPlanner()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // 3 创建一张连接器表(输入表)
        String createInDDL = "CREATE TABLE clickTable (" +
                "user_name STRING, " +
                "url STRING, " +
                "ts BIGINT " +
                ") WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'input/clicks.txt'," +
                " 'format' = 'csv'" +
                ")";
        tableEnv.executeSql(createInDDL);

        // 4 创建一张连接器表(输出表)
        String createOutDDL = "CREATE TABLE outTable (" +
                "user_name STRING, " +
                "url STRING " +
                ") WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'output'," +
                " 'format' = 'csv'" +
                ")";
        tableEnv.executeSql(createOutDDL);

        // 使用输入表
        Table clickTable = tableEnv.from("clickTable");

        // 使用table api方式来查询
        Table resultTable = clickTable.where($("user_name").isEqual("Bob"))
                .select($("user_name"), $("url"));

        // 将结果表注册到临时表中,这样就可以使用这张表了
        tableEnv.createTemporaryView("result",resultTable);

        // 使用sql 方式来查询查询
        // Table resultTable2 = tableEnv.sqlQuery("select user_name,url from result");

        // 输出表到文件
        resultTable.executeInsert("outTable");
    }
}

3、输出到控制台demo

package com.flinktest.wc;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class CommApiTest1 {
    public static void main(String[] args) throws Exception{
        // 创建执行环境的两种方式,流方式 & 表方式
        // 1 创建执行环境(流方式创建)
        //        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //        env.setParallelism(1);
        //        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2 创建执行环境(表方式创建) 基于alibaba 的 blink planner实现
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .useBlinkPlanner()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // 3 创建一张连接器表(输入表)
        String createInDDL = "CREATE TABLE clickTable (" +
                "user_name STRING, " +
                "url STRING, " +
                "ts BIGINT " +
                ") WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'input/clicks.txt'," +
                " 'format' = 'csv'" +
                ")";
        tableEnv.executeSql(createInDDL);

        // 4 创建一张连接器表(输出表)
        String createOutDDL = "CREATE TABLE outTable (" +
                "user_name STRING, " +
                "url STRING " +
                ") WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'output'," +
                " 'format' = 'csv'" +
                ")";
        tableEnv.executeSql(createOutDDL);

        // 使用输入表
        Table clickTable = tableEnv.from("clickTable");

        // 使用table api方式来查询
        Table resultTable = clickTable.where($("user_name").isEqual("Bob"))
                .select($("user_name"), $("url"));

        // 将结果表注册到临时表中,这样就可以使用这张表了
        tableEnv.createTemporaryView("result2",resultTable);

        // 使用sql 方式来查询查询
        Table resultTable2 = tableEnv.sqlQuery("select user_name,url from result2");

        // 输出表,输出到文件
        // resultTable.executeInsert("outTable");

        // 创建一张控制台打印的一张表
        String createPrintOutDDL = "CREATE TABLE printOutTable (" +
                "user_name STRING, " +
                "url STRING " +
                ") WITH (" +
                " 'connector' = 'print' " +
                ")";
        tableEnv.executeSql(createPrintOutDDL);
        // 输出到控制台
        resultTable2.executeInsert("printOutTable");
    }
}

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 一、Python介绍 Python 是一个高层次的结合了解释性、编译性、互动性和面向对象的脚本语言。 Python...
    5888eb1818d9阅读 762评论 0 0
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,269评论 19 139
  • FileInputStream : 输入流 int available() : 一次读取所有的字节数 read()...
    小小一技术驿站阅读 249评论 0 0
  • 1、IO流 1.1、概述 之前学习的File类它只能操作文件或文件夹,并不能去操作文件中的数据。真正保存数据的是文...
    Villain丶Cc阅读 2,704评论 0 5
  • 一、Python介绍Python 是一个高层次的结合了解释性、编译性、互动性和面向对象的脚本语言。Python 的...
    ad458edb873e阅读 355评论 0 0