flink使用08-在dataStream中使用AsyncFunction

在流式处理的过程中, 在中间步骤的处理中, 如果涉及到一些费事的操作或者是外部系统的数据交互, 那么就会给整个流造成一定的延迟. 在 flink 的 1.2 版本中引入了 Asynchronous I/O, 能够支持异步的操作, 以提高 flink 系统与外部数据系统交互的性能及吞吐量.

在使用 Flink 的异步 IO 时, 主要有两个 API可以使用, 一个是AsyncDataStream.unorderedWait( ), 另一个AsyncDataStream.orderedWait( ).在异步处理过程中,原本数据的顺序可能会发生变化, 使用unorderWait的方法, 不会考虑顺序的问题, 一旦处理完成就会直接返回结果, 这种方法具有较低的延迟和负载. 那么orderWait的方法就是想对应的, 严格按照原本流中的数据顺序做返回, 会对系统造成一定的延迟. 实际中应该根据具体的业务情况做选择.unorderedWait或orderedWait有两个关于async operation的参数,一个是timeout参数用于设置async的超时时间,一个是capacity参数用于指定同一时刻最大允许多少个(并发)async request在执行;

在使用异步IO时,需要自己去继承AsyncFunction,AsyncFunction接口继承了Function,它定义了asyncInvoke方法以及一个default的timeout方法;asyncInvoke方法执行异步逻辑,然后通过ResultFuture.complete将结果或异常设置到ResultFuture,如果异常则通过ResultFuture.completeExceptionally(Throwable)来传递 ResultFuture;RichAsyncFunction继承了AbstractRichFunction,同时声明实现AsyncFunction接口,它不没有实现asyncInvoke,交由子类实现;它覆盖了setRuntimeContext方法,这里使用RichAsyncFunctionRuntimeContext或者RichAsyncFunctionIterationRuntimeContext进行包装.

下面是一个验证 Async I/O 的demo, 具体代码见仓库 -> code link

public class AsyncIOExample {
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> inp = env.fromElements(AsyncIOData.WORDS);
        // 接收数据
        SingleOutputStreamOperator<String> out = inp.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                System.out.println("读取数据:" + s + "  当前时间:" + System.currentTimeMillis());
                return s;
            }
        });
        // 使用 AsyncFunction 对函数做一个简单的处理, 中间随机睡眠 1-10s
        DataStream<String> asyncStream = AsyncDataStream.unorderedWait(out, new SimpleAsyncFunction(), 20_000L, TimeUnit.MILLISECONDS);
        // 对已经被 AsyncFunction 处理过的数据再输出一次
        asyncStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                System.out.println("数据处理完毕:" + s + "  当前时间:" + System.currentTimeMillis());
                return s;
            }
        });


        env.execute("AsyncFunction Demo");
    }

    public static class SimpleAsyncFunction extends RichAsyncFunction<String, String>{

        private long waitTime;
        private final Random rnd = new Random(hashCode());

        @Override
        public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
            // 随机睡眠 1 - 10s
            System.out.println("开始 AsyncFunction  target -> " + input);
            waitTime = rnd.nextInt(10);
            Thread.sleep(waitTime * 1000);
            String out = input + input;
            resultFuture.complete(Collections.singletonList(out));
            System.out.println("结束 AsyncFunction  target -> " + input + "  Sleep time = " + waitTime + "s");
        }
    }
}

以上代码的输出结果为:

读取数据:D  当前时间:1569574233046
读取数据:C  当前时间:1569574233047
读取数据:A  当前时间:1569574233048
读取数据:B  当前时间:1569574233049
开始 AsyncFunction  target -> D
开始 AsyncFunction  target -> C
开始 AsyncFunction  target -> A
开始 AsyncFunction  target -> B
结束 AsyncFunction  target -> DSleep time = 6s
数据处理完毕:DD  当前时间:1569574239065
结束 AsyncFunction  target -> CSleep time = 6s
数据处理完毕:CC  当前时间:1569574239069
结束 AsyncFunction  target -> ASleep time = 6s
数据处理完毕:AA  当前时间:1569574239072
结束 AsyncFunction  target -> BSleep time = 6s
数据处理完毕:BB  当前时间:1569574239076
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,347评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,435评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,509评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,611评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,837评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,987评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,730评论 0 267
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,194评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,525评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,664评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,334评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,944评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,764评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,997评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,389评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,554评论 2 349

推荐阅读更多精彩内容