聊聊flink的Async I/O

本文主要研究一下flink的Async I/O

实例

// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

    /** The database specific client that can issue concurrent requests with callbacks */
    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new DatabaseClient(host, post, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {

        // issue the asynchronous request, receive a future for result
        final Future<String> result = client.query(key);

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        CompletableFuture.supplyAsync(new Supplier<String>() {

            @Override
            public String get() {
                try {
                    return result.get();
                } catch (InterruptedException | ExecutionException e) {
                    // Normally handled explicitly.
                    return null;
                }
            }
        }).thenAccept( (String dbResult) -> {
            resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
        });
    }
}

// create the original stream
DataStream<String> stream = ...;

// apply the async I/O transformation
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
  • 本实例展示了flink Async I/O的基本用法,首先是实现AsyncFunction接口,用于编写异步请求逻辑及将结果或异常设置到resultFuture,然后就是使用AsyncDataStream的unorderedWait或orderedWait方法将AsyncFunction作用到DataStream作为transformation;AsyncDataStream的unorderedWait或orderedWait有两个关于async operation的参数,一个是timeout参数用于设置async的超时时间,一个是capacity参数用于指定同一时刻最大允许多少个(并发)async request在执行

AsyncFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/AsyncFunction.java

/**
 * A function to trigger Async I/O operation.
 *
 * <p>For each #asyncInvoke, an async io operation can be triggered, and once it has been done,
 * the result can be collected by calling {@link ResultFuture#complete}. For each async
 * operation, its context is stored in the operator immediately after invoking
 * #asyncInvoke, avoiding blocking for each stream input as long as the internal buffer is not full.
 *
 * <p>{@link ResultFuture} can be passed into callbacks or futures to collect the result data.
 * An error can also be propagate to the async IO operator by
 * {@link ResultFuture#completeExceptionally(Throwable)}.
 *
 * <p>Callback example usage:
 *
 * <pre>{@code
 * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
 *
 *   public void asyncInvoke(String row, ResultFuture<String> result) throws Exception {
 *     HBaseCallback cb = new HBaseCallback(result);
 *     Get get = new Get(Bytes.toBytes(row));
 *     hbase.asyncGet(get, cb);
 *   }
 * }
 * }</pre>
 *
 * <p>Future example usage:
 *
 * <pre>{@code
 * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
 *
 *   public void asyncInvoke(String row, final ResultFuture<String> result) throws Exception {
 *     Get get = new Get(Bytes.toBytes(row));
 *     ListenableFuture<Result> future = hbase.asyncGet(get);
 *     Futures.addCallback(future, new FutureCallback<Result>() {
 *       public void onSuccess(Result result) {
 *         List<String> ret = process(result);
 *         result.complete(ret);
 *       }
 *       public void onFailure(Throwable thrown) {
 *         result.completeExceptionally(thrown);
 *       }
 *     });
 *   }
 * }
 * }</pre>
 *
 * @param <IN> The type of the input elements.
 * @param <OUT> The type of the returned elements.
 */
@PublicEvolving
public interface AsyncFunction<IN, OUT> extends Function, Serializable {

    /**
     * Trigger async operation for each stream input.
     *
     * @param input element coming from an upstream task
     * @param resultFuture to be completed with the result data
     * @exception Exception in case of a user code error. An exception will make the task fail and
     * trigger fail-over process.
     */
    void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

    /**
     * {@link AsyncFunction#asyncInvoke} timeout occurred.
     * By default, the result future is exceptionally completed with a timeout exception.
     *
     * @param input element coming from an upstream task
     * @param resultFuture to be completed with the result data
     */
    default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        resultFuture.completeExceptionally(
            new TimeoutException("Async function call has timed out."));
    }

}
  • AsyncFunction接口继承了Function,它定义了asyncInvoke方法以及一个default的timeout方法;asyncInvoke方法执行异步逻辑,然后通过ResultFuture.complete将结果设置到ResultFuture,如果异常则通过ResultFuture.completeExceptionally(Throwable)来传递到ResultFuture

RichAsyncFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java

@PublicEvolving
public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction implements AsyncFunction<IN, OUT> {

    private static final long serialVersionUID = 3858030061138121840L;

    @Override
    public void setRuntimeContext(RuntimeContext runtimeContext) {
        Preconditions.checkNotNull(runtimeContext);

        if (runtimeContext instanceof IterationRuntimeContext) {
            super.setRuntimeContext(
                new RichAsyncFunctionIterationRuntimeContext(
                    (IterationRuntimeContext) runtimeContext));
        } else {
            super.setRuntimeContext(new RichAsyncFunctionRuntimeContext(runtimeContext));
        }
    }

    @Override
    public abstract void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

    //......
}
  • RichAsyncFunction继承了AbstractRichFunction,同时声明实现AsyncFunction接口,它不没有实现asyncInvoke,交由子类实现;它覆盖了setRuntimeContext方法,这里使用RichAsyncFunctionRuntimeContext或者RichAsyncFunctionIterationRuntimeContext进行包装

RichAsyncFunctionRuntimeContext

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java

    /**
     * A wrapper class for async function's {@link RuntimeContext}. The async function runtime
     * context only supports basic operations which are thread safe. Consequently, state access,
     * accumulators, broadcast variables and the distributed cache are disabled.
     */
    private static class RichAsyncFunctionRuntimeContext implements RuntimeContext {
        private final RuntimeContext runtimeContext;

        RichAsyncFunctionRuntimeContext(RuntimeContext context) {
            runtimeContext = Preconditions.checkNotNull(context);
        }

        @Override
        public String getTaskName() {
            return runtimeContext.getTaskName();
        }

        @Override
        public MetricGroup getMetricGroup() {
            return runtimeContext.getMetricGroup();
        }

        @Override
        public int getNumberOfParallelSubtasks() {
            return runtimeContext.getNumberOfParallelSubtasks();
        }

        @Override
        public int getMaxNumberOfParallelSubtasks() {
            return runtimeContext.getMaxNumberOfParallelSubtasks();
        }

        @Override
        public int getIndexOfThisSubtask() {
            return runtimeContext.getIndexOfThisSubtask();
        }

        @Override
        public int getAttemptNumber() {
            return runtimeContext.getAttemptNumber();
        }

        @Override
        public String getTaskNameWithSubtasks() {
            return runtimeContext.getTaskNameWithSubtasks();
        }

        @Override
        public ExecutionConfig getExecutionConfig() {
            return runtimeContext.getExecutionConfig();
        }

        @Override
        public ClassLoader getUserCodeClassLoader() {
            return runtimeContext.getUserCodeClassLoader();
        }

        // -----------------------------------------------------------------------------------
        // Unsupported operations
        // -----------------------------------------------------------------------------------

        @Override
        public DistributedCache getDistributedCache() {
            throw new UnsupportedOperationException("Distributed cache is not supported in rich async functions.");
        }

        @Override
        public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
            throw new UnsupportedOperationException("State is not supported in rich async functions.");
        }

        @Override
        public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
            throw new UnsupportedOperationException("State is not supported in rich async functions.");
        }

        @Override
        public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
            throw new UnsupportedOperationException("State is not supported in rich async functions.");
        }

        @Override
        public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
            throw new UnsupportedOperationException("State is not supported in rich async functions.");
        }

        @Override
        public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
            throw new UnsupportedOperationException("State is not supported in rich async functions.");
        }

        @Override
        public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
            throw new UnsupportedOperationException("State is not supported in rich async functions.");
        }

        @Override
        public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {
            throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
        }

        @Override
        public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
            throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
        }

        @Override
        public Map<String, Accumulator<?, ?>> getAllAccumulators() {
            throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
        }

        @Override
        public IntCounter getIntCounter(String name) {
            throw new UnsupportedOperationException("Int counters are not supported in rich async functions.");
        }

        @Override
        public LongCounter getLongCounter(String name) {
            throw new UnsupportedOperationException("Long counters are not supported in rich async functions.");
        }

        @Override
        public DoubleCounter getDoubleCounter(String name) {
            throw new UnsupportedOperationException("Long counters are not supported in rich async functions.");
        }

        @Override
        public Histogram getHistogram(String name) {
            throw new UnsupportedOperationException("Histograms are not supported in rich async functions.");
        }

        @Override
        public boolean hasBroadcastVariable(String name) {
            throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
        }

        @Override
        public <RT> List<RT> getBroadcastVariable(String name) {
            throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
        }

        @Override
        public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
            throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
        }
    }
  • RichAsyncFunctionRuntimeContext实现了RuntimeContext接口,它将一些方法代理给RuntimeContext,其余的Unsupported的方法都覆盖抛出UnsupportedOperationException

RichAsyncFunctionIterationRuntimeContext

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java

    private static class RichAsyncFunctionIterationRuntimeContext extends RichAsyncFunctionRuntimeContext implements IterationRuntimeContext {

        private final IterationRuntimeContext iterationRuntimeContext;

        RichAsyncFunctionIterationRuntimeContext(IterationRuntimeContext iterationRuntimeContext) {
            super(iterationRuntimeContext);

            this.iterationRuntimeContext = Preconditions.checkNotNull(iterationRuntimeContext);
        }

        @Override
        public int getSuperstepNumber() {
            return iterationRuntimeContext.getSuperstepNumber();
        }

        // -----------------------------------------------------------------------------------
        // Unsupported operations
        // -----------------------------------------------------------------------------------

        @Override
        public <T extends Aggregator<?>> T getIterationAggregator(String name) {
            throw new UnsupportedOperationException("Iteration aggregators are not supported in rich async functions.");
        }

        @Override
        public <T extends Value> T getPreviousIterationAggregate(String name) {
            throw new UnsupportedOperationException("Iteration aggregators are not supported in rich async functions.");
        }
    }
  • RichAsyncFunctionIterationRuntimeContext继承了RichAsyncFunctionRuntimeContext,实现了IterationRuntimeContext接口,它将getSuperstepNumber方法交由IterationRuntimeContext处理,然后覆盖getIterationAggregator、getPreviousIterationAggregate方法抛出UnsupportedOperationException

AsyncDataStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/AsyncDataStream.java

@PublicEvolving
public class AsyncDataStream {

    /**
     * Output mode for asynchronous operations.
     */
    public enum OutputMode { ORDERED, UNORDERED }

    private static final int DEFAULT_QUEUE_CAPACITY = 100;

    private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            int bufSize,
            OutputMode mode) {

        TypeInformation<OUT> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
            func,
            AsyncFunction.class,
            0,
            1,
            new int[]{1, 0},
            in.getType(),
            Utils.getCallLocationName(),
            true);

        // create transform
        AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(
            in.getExecutionEnvironment().clean(func),
            timeout,
            bufSize,
            mode);

        return in.transform("async wait operator", outTypeInfo, operator);
    }

    public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            TimeUnit timeUnit,
            int capacity) {
        return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.UNORDERED);
    }

    public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            TimeUnit timeUnit) {
        return addOperator(
            in,
            func,
            timeUnit.toMillis(timeout),
            DEFAULT_QUEUE_CAPACITY,
            OutputMode.UNORDERED);
    }

    public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            TimeUnit timeUnit,
            int capacity) {
        return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED);
    }

    public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            TimeUnit timeUnit) {
        return addOperator(
            in,
            func,
            timeUnit.toMillis(timeout),
            DEFAULT_QUEUE_CAPACITY,
            OutputMode.ORDERED);
    }
}
  • AsyncDataStream提供了unorderedWait、orderedWait两类方法来将AsyncFunction作用于DataStream
  • unorderedWait、orderedWait方法有带capacity参数的也有不带capacity参数的,不带capacity参数即默认使用DEFAULT_QUEUE_CAPACITY,即100;这些方法最后都是调用addOperator私有方法来实现,它使用的是AsyncWaitOperator;unorderedWait、orderedWait方法都带了timeout参数,用于指定等待async操作完成的超时时间
  • AsyncDataStream提供了两种OutputMode,其中UNORDERED是无序的,即一旦async操作完成就emit结果,当使用TimeCharacteristic.ProcessingTime的时候这种模式延迟最低、负载最低;ORDERED是有序的,即按element的输入顺序emit结果,为了保证有序operator需要缓冲数据,因而会造成一定的延迟及负载

小结

  • flink给外部数据访问提供了Asynchronous I/O的API,用于提升streaming的吞吐量,其基本使用就是定义一个实现AsyncFunction接口的function,然后使用AsyncDataStream的unorderedWait或orderedWait方法将AsyncFunction作用到DataStream作为transformation
  • AsyncFunction接口继承了Function,它定义了asyncInvoke方法以及一个default的timeout方法;asyncInvoke方法执行异步逻辑,然后通过ResultFuture.complete将结果或异常设置到ResultFuture,如果异常则通过ResultFuture.completeExceptionally(Throwable)来传递到ResultFuture;RichAsyncFunction继承了AbstractRichFunction,同时声明实现AsyncFunction接口,它不没有实现asyncInvoke,交由子类实现;它覆盖了setRuntimeContext方法,这里使用RichAsyncFunctionRuntimeContext或者RichAsyncFunctionIterationRuntimeContext进行包装
  • AsyncDataStream的unorderedWait或orderedWait有两个关于async operation的参数,一个是timeout参数用于设置async的超时时间,一个是capacity参数用于指定同一时刻最大允许多少个(并发)async request在执行;AsyncDataStream提供了两种OutputMode,其中UNORDERED是无序的,即一旦async操作完成就emit结果,当使用TimeCharacteristic.ProcessingTime的时候这种模式延迟最低、负载最低;ORDERED是有序的,即按element的输入顺序emit结果,为了保证有序operator需要缓冲数据,因而会造成一定的延迟及负载

doc

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,948评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,371评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,490评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,521评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,627评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,842评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,997评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,741评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,203评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,534评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,673评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,339评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,955评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,770评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,000评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,394评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,562评论 2 349

推荐阅读更多精彩内容