2021-02-01-Flink-37(Flink TableAPI 二)

1.从文件中读取数据

/**
 * Demo job: registers a CSV file as a temporary table through the
 * descriptor API and prints the table's rows.
 */
public class Test1 {

    /**
     * Builds and runs a local streaming job that reads the CSV file as the
     * table {@code myTable} and prints it as a retract stream.
     *
     * @param args command-line arguments; not used
     * @throws Exception propagated from job construction or execution
     */
    public static void main(String[] args) throws Exception {
        String csvPath = "C:\\Users\\hp\\IdeaProjects\\Flink_Java\\src\\main\\resources\\1.csv";

        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Column metadata for the table: a string "id" and an integer "age".
        Schema schema = new Schema()
                .field("id", DataTypes.STRING())
                .field("age", DataTypes.INT());

        // Register a table by specifying its path, format and schema.
        tableEnv.connect(new FileSystem().path(csvPath))
                .withFormat(new Csv())
                .withSchema(schema)
                .createTemporaryTable("myTable");

        Table rows = tableEnv.from("myTable");
        tableEnv.toRetractStream(rows, Row.class).print();

        env.execute("job");
    }
}

2.更新模式

image.png

image.png

3.时间特性

Processing Time

/**
 * Demo job: converts a text file of comma-separated lines into a table that
 * carries a processing-time attribute, then prints the table.
 */
public class Test1 {

    /**
     * Reads the file line by line, maps each line to an (id, age) pair of
     * strings, converts the stream to a table with an extra {@code pt}
     * processing-time column, and prints the rows as an append stream.
     *
     * @param args command-line arguments; not used
     * @throws Exception propagated from job construction or execution
     */
    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment();
        String csvPath = "C:\\Users\\hp\\IdeaProjects\\Flink_Java\\src\\main\\resources\\1.csv";
        DataStreamSource<String> source = environment.readTextFile(csvPath);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        // Split each line once and reuse the pieces, instead of splitting twice per record.
        // The explicit type hint is needed because the lambda's generic result type is erased.
        SingleOutputStreamOperator<Tuple2<String, String>> streamOperator = source
                .map(line -> {
                    String[] fields = line.split(",");
                    return Tuple2.of(fields[0], fields[1]);
                })
                .returns(TypeInformation.of(new TypeHint<Tuple2<String, String>>() {
                }));

        // "pt.proctime" declares an additional processing-time attribute named pt
        // after the two data fields id and age.
        Table table = tableEnvironment.fromDataStream(streamOperator, "id,age,pt.proctime");
        tableEnvironment.toAppendStream(table, Row.class).print();
        environment.execute();
    }
}

Event Time

/**
 * Demo job: converts a text file of comma-separated lines into a table whose
 * first column is an event-time attribute, then prints the table.
 */
public class Test1 {

    /**
     * Reads the file line by line, assigns each record a timestamp parsed from
     * its first comma-separated field, maps each line to a (Long, String)
     * pair, converts the stream to a table with {@code id} declared as the
     * rowtime attribute, and prints the rows as an append stream.
     *
     * @param args command-line arguments; not used
     * @throws Exception propagated from job construction or execution
     */
    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment();
        environment.setParallelism(1);
        String csvPath = "C:\\Users\\hp\\IdeaProjects\\Flink_Java\\src\\main\\resources\\1.csv";
        DataStreamSource<String> source = environment.readTextFile(csvPath);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        // Timestamps come from the first field of each line; the out-of-orderness
        // bound is zero.
        SingleOutputStreamOperator<String> operator = source.assignTimestampsAndWatermarks(WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner((line, timestamp) -> Long.parseLong(line.split(",")[0])));

        // Split each line once and reuse the pieces, instead of splitting twice per record.
        // The explicit type hint is needed because the lambda's generic result type is erased.
        SingleOutputStreamOperator<Tuple2<Long, String>> operator1 = operator
                .map(line -> {
                    String[] fields = line.split(",");
                    return Tuple2.of(Long.parseLong(fields[0]), fields[1]);
                })
                .returns(TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {
                }));

        // "id.rowtime" declares the first field as the event-time (rowtime) attribute.
        Table table = tableEnvironment.fromDataStream(operator1, "id.rowtime  ,age");
        tableEnvironment.toAppendStream(table, Row.class).print();
        environment.execute();
    }
}

4.窗口

image.png

image.png

image.png

image.png
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容