flink使用08-在dataStream中使用AsyncFunction

在流式处理的过程中, 在中间步骤的处理中, 如果涉及到一些费事的操作或者是外部系统的数据交互, 那么就会给整个流造成一定的延迟. 在 flink 的 1.2 版本中引入了 Asynchronous I/O, 能够支持异步的操作, 以提高 flink 系统与外部数据系统交互的性能及吞吐量.

在使用 Flink 的异步 IO 时, 主要有两个 API可以使用, 一个是AsyncDataStream.unorderedWait( ), 另一个AsyncDataStream.orderedWait( ).在异步处理过程中,原本数据的顺序可能会发生变化, 使用unorderWait的方法, 不会考虑顺序的问题, 一旦处理完成就会直接返回结果, 这种方法具有较低的延迟和负载. 那么orderWait的方法就是想对应的, 严格按照原本流中的数据顺序做返回, 会对系统造成一定的延迟. 实际中应该根据具体的业务情况做选择.unorderedWait或orderedWait有两个关于async operation的参数,一个是timeout参数用于设置async的超时时间,一个是capacity参数用于指定同一时刻最大允许多少个(并发)async request在执行;

在使用异步IO时,需要自己去继承AsyncFunction,AsyncFunction接口继承了Function,它定义了asyncInvoke方法以及一个default的timeout方法;asyncInvoke方法执行异步逻辑,然后通过ResultFuture.complete将结果或异常设置到ResultFuture,如果异常则通过ResultFuture.completeExceptionally(Throwable)来传递 ResultFuture;RichAsyncFunction继承了AbstractRichFunction,同时声明实现AsyncFunction接口,它不没有实现asyncInvoke,交由子类实现;它覆盖了setRuntimeContext方法,这里使用RichAsyncFunctionRuntimeContext或者RichAsyncFunctionIterationRuntimeContext进行包装.

下面是一个验证 Async I/O 的demo, 具体代码见仓库 -> code link

public class AsyncIOExample {
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> inp = env.fromElements(AsyncIOData.WORDS);
        // 接收数据
        SingleOutputStreamOperator<String> out = inp.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                System.out.println("读取数据:" + s + "  当前时间:" + System.currentTimeMillis());
                return s;
            }
        });
        // 使用 AsyncFunction 对函数做一个简单的处理, 中间随机睡眠 1-10s
        DataStream<String> asyncStream = AsyncDataStream.unorderedWait(out, new SimpleAsyncFunction(), 20_000L, TimeUnit.MILLISECONDS);
        // 对已经被 AsyncFunction 处理过的数据再输出一次
        asyncStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                System.out.println("数据处理完毕:" + s + "  当前时间:" + System.currentTimeMillis());
                return s;
            }
        });


        env.execute("AsyncFunction Demo");
    }

    public static class SimpleAsyncFunction extends RichAsyncFunction<String, String>{

        private long waitTime;
        private final Random rnd = new Random(hashCode());

        @Override
        public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
            // 随机睡眠 1 - 10s
            System.out.println("开始 AsyncFunction  target -> " + input);
            waitTime = rnd.nextInt(10);
            Thread.sleep(waitTime * 1000);
            String out = input + input;
            resultFuture.complete(Collections.singletonList(out));
            System.out.println("结束 AsyncFunction  target -> " + input + "  Sleep time = " + waitTime + "s");
        }
    }
}

以上代码的输出结果为:

读取数据:D  当前时间:1569574233046
读取数据:C  当前时间:1569574233047
读取数据:A  当前时间:1569574233048
读取数据:B  当前时间:1569574233049
开始 AsyncFunction  target -> D
开始 AsyncFunction  target -> C
开始 AsyncFunction  target -> A
开始 AsyncFunction  target -> B
结束 AsyncFunction  target -> DSleep time = 6s
数据处理完毕:DD  当前时间:1569574239065
结束 AsyncFunction  target -> CSleep time = 6s
数据处理完毕:CC  当前时间:1569574239069
结束 AsyncFunction  target -> ASleep time = 6s
数据处理完毕:AA  当前时间:1569574239072
结束 AsyncFunction  target -> BSleep time = 6s
数据处理完毕:BB  当前时间:1569574239076
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容