迭代处理是批量处理处理中的常见操作, Flink 的 迭代计算支持两种模式, 分别是 Bulk Iteration (全量迭代计算) 和 Delt Iteration (增量迭代计算). 下面就一个计算圆周率的例子 来说一下使用 Bulk Iteration 都有哪几个步骤.
在 Bulk Iteration 中, 主要的步骤其实是分为3步, 第一步是指定最大循环次数, 第二步是指定在循环时的一个计算处理的过程, 最后一步就是调用计算过程, 指定结束条件. 具体代码如下所示
public class BulkIteration {
public static void main(String[] args) throws Exception {
// 获取执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 构建输出数据
DataSource<Integer> data = env.fromElements(0);
// 1. 指定循环次数
IterativeDataSet<Integer> loop = data.iterate(1000);
// 2. 指循环计算过程
MapOperator<Integer, Integer> process = loop.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer i) throws Exception {
double x = Math.random();
double y = Math.random();
int result = (x * x + y * y) < 1 ? 1 : 0;
return i + result;
}
});
// 3. 使用 closeWith 调用计算过程
List<Integer> collect = loop.closeWith(process).collect();
// 输出最终结果
for (Integer i : collect) {
System.out.println( i / 1000.0 * 4);
}
}
}