在流式处理的过程中, 在中间步骤的处理中, 如果涉及到一些费事的操作或者是外部系统的数据交互, 那么就会给整个流造成一定的延迟. 在 flink 的 1.2 版本中引入了 Asynchronous I/O, 能够支持异步的操作, 以提高 flink 系统与外部数据系统交互的性能及吞吐量.
在使用 Flink 的异步 IO 时, 主要有两个 API可以使用, 一个是AsyncDataStream.unorderedWait( ), 另一个AsyncDataStream.orderedWait( ).在异步处理过程中,原本数据的顺序可能会发生变化, 使用unorderWait的方法, 不会考虑顺序的问题, 一旦处理完成就会直接返回结果, 这种方法具有较低的延迟和负载. 那么orderWait的方法就是想对应的, 严格按照原本流中的数据顺序做返回, 会对系统造成一定的延迟. 实际中应该根据具体的业务情况做选择.unorderedWait或orderedWait有两个关于async operation的参数,一个是timeout参数用于设置async的超时时间,一个是capacity参数用于指定同一时刻最大允许多少个(并发
)async request在执行;
在使用异步IO时,需要自己去继承AsyncFunction,AsyncFunction接口继承了Function,它定义了asyncInvoke方法以及一个default的timeout方法;asyncInvoke方法执行异步逻辑,然后通过ResultFuture.complete将结果或异常设置到ResultFuture,如果异常则通过ResultFuture.completeExceptionally(Throwable)来传递 ResultFuture;RichAsyncFunction继承了AbstractRichFunction,同时声明实现AsyncFunction接口,它不没有实现asyncInvoke,交由子类实现;它覆盖了setRuntimeContext方法,这里使用RichAsyncFunctionRuntimeContext或者RichAsyncFunctionIterationRuntimeContext进行包装.
下面是一个验证 Async I/O 的demo, 具体代码见仓库 -> code link
public class AsyncIOExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> inp = env.fromElements(AsyncIOData.WORDS);
// 接收数据
SingleOutputStreamOperator<String> out = inp.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
System.out.println("读取数据:" + s + " 当前时间:" + System.currentTimeMillis());
return s;
}
});
// 使用 AsyncFunction 对函数做一个简单的处理, 中间随机睡眠 1-10s
DataStream<String> asyncStream = AsyncDataStream.unorderedWait(out, new SimpleAsyncFunction(), 20_000L, TimeUnit.MILLISECONDS);
// 对已经被 AsyncFunction 处理过的数据再输出一次
asyncStream.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
System.out.println("数据处理完毕:" + s + " 当前时间:" + System.currentTimeMillis());
return s;
}
});
env.execute("AsyncFunction Demo");
}
public static class SimpleAsyncFunction extends RichAsyncFunction<String, String>{
private long waitTime;
private final Random rnd = new Random(hashCode());
@Override
public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
// 随机睡眠 1 - 10s
System.out.println("开始 AsyncFunction target -> " + input);
waitTime = rnd.nextInt(10);
Thread.sleep(waitTime * 1000);
String out = input + input;
resultFuture.complete(Collections.singletonList(out));
System.out.println("结束 AsyncFunction target -> " + input + " Sleep time = " + waitTime + "s");
}
}
}
以上代码的输出结果为:
读取数据:D 当前时间:1569574233046
读取数据:C 当前时间:1569574233047
读取数据:A 当前时间:1569574233048
读取数据:B 当前时间:1569574233049
开始 AsyncFunction target -> D
开始 AsyncFunction target -> C
开始 AsyncFunction target -> A
开始 AsyncFunction target -> B
结束 AsyncFunction target -> DSleep time = 6s
数据处理完毕:DD 当前时间:1569574239065
结束 AsyncFunction target -> CSleep time = 6s
数据处理完毕:CC 当前时间:1569574239069
结束 AsyncFunction target -> ASleep time = 6s
数据处理完毕:AA 当前时间:1569574239072
结束 AsyncFunction target -> BSleep time = 6s
数据处理完毕:BB 当前时间:1569574239076