A Flink program applies distributed processing/transformations to data streams. Input streams are created via sources (which can read from files, Kafka topics, or in-memory collections); results are emitted via sinks, which can write to local or distributed file systems, or to standard output. A Flink program can run standalone or be embedded in another program; it can execute in a local JVM or be submitted to a multi-machine cluster.
Flink programs come in two flavors: batch processing, for bounded data sets, and stream processing, for continuous streams of data. The basic programming model is similar for both; they use the DataSet API and the DataStream API respectively. The rest of this article uses the DataStream API.
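The two APIs share the same look and feel; mainly the entry point differs. A minimal, illustrative sketch of each entry point:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// DataSet API: entry point for bounded (batch) programs
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

// DataStream API: entry point for unbounded (streaming) programs
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();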
A Flink program consists of five parts (a minimal sketch of all five follows the list):
1 Obtain the execution environment
2 Load the input data
3 Apply processing/transformations to the data
4 Specify where to write the results
5 Trigger program execution
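Here is a minimal sketch of these five steps, using a tiny in-memory collection as the source (the class name and data are made up for illustration):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FiveStepsDemo {
    public static void main(String[] args) throws Exception {
        // 1. obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. load the input data (here, an in-memory collection)
        DataStream<String> words = env.fromElements("flink", "streaming", "demo");
        // 3. transform the data: map each word to its length
        DataStream<Integer> lengths = words.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String word) {
                return word.length();
            }
        });
        // 4. specify the output: a sink that prints to standard output
        lengths.print();
        // 5. trigger execution
        env.execute("Five Steps Demo");
    }
}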
The following real program illustrates these steps. Its input is a stream of taxi ride records, and it outputs a running count of completed rides per driver:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// TaxiRide, TaxiRideSource and ExerciseBase (and their imports) come from
// the flink-training-exercises project
public class RideCount {
    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        final String input = params.get("input", ExerciseBase.pathToRideData);

        final int maxEventDelay = 60;       // events are out of order by at most 60 seconds
        final int servingSpeedFactor = 600; // 10 minutes worth of events are served every second

        // 1. set up the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 2. start the data generator (the source)
        DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor));

        // 3a. map each ride to a tuple of (driverId, 1)
        DataStream<Tuple2<Long, Long>> tuples = rides.map(new MapFunction<TaxiRide, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> map(TaxiRide ride) throws Exception {
                return new Tuple2<Long, Long>(ride.driverId, 1L);
            }
        });

        // 3b. partition the stream by driverId
        KeyedStream<Tuple2<Long, Long>, Tuple> keyedByDriverId = tuples.keyBy(0);

        // 3c. keep a running count of rides for each driver
        DataStream<Tuple2<Long, Long>> rideCounts = keyedByDriverId.sum(1);

        // 4. print the result stream (the sink); any of the intermediate streams could be printed too
        rideCounts.print();

        // 5. trigger execution of the pipeline
        env.execute("Ride Count");
    }
}