Flink编程基础/基本框架

Flink程序是对数据流,进行各种分布式处理/转换。通过sources创建输入的数据流(可以读取文件,从kafka的topic读取,或者内存的collecitons);结果通过Sinks输出,可以写到本不是文件系统上,或者输出到标准输出。Flink程序可以独立运行,也可以嵌入到其它程序中运行;Flink可以在本机的JVM中执行,也可以提交到多机器的集群上执行。

Flink程序分为批处理和流处理两种,批处理用来处理有限的数据集,流处理用来处理持续的流数据。这两种类型的基本编程模式是类似的,本别使用DataStream API和DataSet API,接下来基于DataStream API来介绍。

Flink程序的五个部分:
1 获取执行环境
2 载入数据
3 对数据进行处理/转换
4 设置数据输出方式
5 启动程序,开始执行

下面以一个实际程序为例来说明,下面的程序的输入是出租车司机的结单数据,输出每个出租车司机的累积结单数量:

        ParameterTool params = ParameterTool.fromArgs(args);
        final String input = params.get("input", ExerciseBase.pathToRideData);

        final int maxEventDelay = 60;       // events are out of order by max 60 seconds
        final int servingSpeedFactor = 600; // events of 10 minutes are served every second

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // start the data generator
        DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor));

        // map each ride to a tuple of (driverId, 1)
        DataStream<Tuple2<Long, Long>> tuples = rides.map(new MapFunction<TaxiRide, Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> map(TaxiRide ride) throws Exception {
                        return new Tuple2<Long, Long>(ride.driverId, 1L) ;
                    }
        });

        // partition the stream by the driverId
        KeyedStream<Tuple2<Long, Long>, Tuple> keyedByDriverId = tuples.keyBy(0);

        // count the rides for each driver
        DataStream<Tuple2<Long, Long>> rideCounts = keyedByDriverId.sum(1);

        // we could, in fact, print out any or all of these streams
        rideCounts.print();

        // run the cleansing pipeline
        env.execute("Ride Count");
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容