聊聊flink的RichParallelSourceFunction

本文主要研究一下flink的RichParallelSourceFunction

RichParallelSourceFunction

/**
 * Base class for implementing a parallel data source. Upon execution, the runtime will
 * execute as many parallel instances of this function function as configured parallelism
 * of the source.
 *
 * <p>The data source has access to context information (such as the number of parallel
 * instances of the source, and which parallel instance the current instance is)
 * via {@link #getRuntimeContext()}. It also provides additional life-cycle methods
 * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.</p>
 *
 * @param <OUT> The type of the records produced by this source.
 */
@Public
public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
        implements ParallelSourceFunction<OUT> {

    private static final long serialVersionUID = 1L;
}
  • RichParallelSourceFunction实现了ParallelSourceFunction接口,同时继承了AbstractRichFunction

ParallelSourceFunction

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java

/**
 * A stream data source that is executed in parallel. Upon execution, the runtime will
 * execute as many parallel instances of this function function as configured parallelism
 * of the source.
 *
 * <p>This interface acts only as a marker to tell the system that this source may
 * be executed in parallel. When different parallel instances are required to perform
 * different tasks, use the {@link RichParallelSourceFunction} to get access to the runtime
 * context, which reveals information like the number of parallel tasks, and which parallel
 * task the current instance is.
 *
 * @param <OUT> The type of the records produced by this source.
 */
@Public
public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
}
  • ParallelSourceFunction继承了SourceFunction接口,它并没有定义其他额外的方法,仅仅是用接口名来表达意图,即可以被并行执行的stream data source

AbstractRichFunction

flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/functions/AbstractRichFunction.java

/**
 * An abstract stub implementation for rich user-defined functions.
 * Rich functions have additional methods for initialization ({@link #open(Configuration)}) and
 * teardown ({@link #close()}), as well as access to their runtime execution context via
 * {@link #getRuntimeContext()}.
 */
@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable {

    private static final long serialVersionUID = 1L;

    // --------------------------------------------------------------------------------------------
    //  Runtime context access
    // --------------------------------------------------------------------------------------------

    private transient RuntimeContext runtimeContext;

    @Override
    public void setRuntimeContext(RuntimeContext t) {
        this.runtimeContext = t;
    }

    @Override
    public RuntimeContext getRuntimeContext() {
        if (this.runtimeContext != null) {
            return this.runtimeContext;
        } else {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
    }

    @Override
    public IterationRuntimeContext getIterationRuntimeContext() {
        if (this.runtimeContext == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        } else if (this.runtimeContext instanceof IterationRuntimeContext) {
            return (IterationRuntimeContext) this.runtimeContext;
        } else {
            throw new IllegalStateException("This stub is not part of an iteration step function.");
        }
    }

    // --------------------------------------------------------------------------------------------
    //  Default life cycle methods
    // --------------------------------------------------------------------------------------------

    @Override
    public void open(Configuration parameters) throws Exception {}

    @Override
    public void close() throws Exception {}
}
  • AbstractRichFunction主要实现了RichFunction接口的setRuntimeContext、getRuntimeContext、getIterationRuntimeContext方法;open及close方法都是空操作

RuntimeContext

flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/functions/RuntimeContext.java

/**
 * A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
 * of the function will have a context through which it can access static contextual information (such as
 * the current parallelism) and other constructs like accumulators and broadcast variables.
 *
 * <p>A function can, during runtime, obtain the RuntimeContext via a call to
 * {@link AbstractRichFunction#getRuntimeContext()}.
 */
@Public
public interface RuntimeContext {

    /**
     * Returns the name of the task in which the UDF runs, as assigned during plan construction.
     *
     * @return The name of the task in which the UDF runs.
     */
    String getTaskName();

    /**
     * Returns the metric group for this parallel subtask.
     *
     * @return The metric group for this parallel subtask.
     */
    @PublicEvolving
    MetricGroup getMetricGroup();

    /**
     * Gets the parallelism with which the parallel task runs.
     *
     * @return The parallelism with which the parallel task runs.
     */
    int getNumberOfParallelSubtasks();

    /**
     * Gets the number of max-parallelism with which the parallel task runs.
     *
     * @return The max-parallelism with which the parallel task runs.
     */
    @PublicEvolving
    int getMaxNumberOfParallelSubtasks();

    /**
     * Gets the number of this parallel subtask. The numbering starts from 0 and goes up to
     * parallelism-1 (parallelism as returned by {@link #getNumberOfParallelSubtasks()}).
     *
     * @return The index of the parallel subtask.
     */
    int getIndexOfThisSubtask();

    /**
     * Gets the attempt number of this parallel subtask. First attempt is numbered 0.
     *
     * @return Attempt number of the subtask.
     */
    int getAttemptNumber();

    /**
     * Returns the name of the task, appended with the subtask indicator, such as "MyTask (3/6)",
     * where 3 would be ({@link #getIndexOfThisSubtask()} + 1), and 6 would be
     * {@link #getNumberOfParallelSubtasks()}.
     *
     * @return The name of the task, with subtask indicator.
     */
    String getTaskNameWithSubtasks();

    /**
     * Returns the {@link org.apache.flink.api.common.ExecutionConfig} for the currently executing
     * job.
     */
    ExecutionConfig getExecutionConfig();

    //.......
}
  • RuntimeContext定义了很多方法,这里我们看下getNumberOfParallelSubtasks方法,它可以返回当前的task的parallelism;而getIndexOfThisSubtask则可以获取当前parallel subtask的下标;可以根据这些信息,开发既能并行执行但各自发射的数据又不重复的ParallelSourceFunction

JobMaster.startJobExecution

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/jobmaster/JobMaster.java

    private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
        validateRunsInMainThread();

        checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

        if (Objects.equals(getFencingToken(), newJobMasterId)) {
            log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);

            return Acknowledge.get();
        }

        setNewFencingToken(newJobMasterId);

        startJobMasterServices();

        log.info("Starting execution of job {} ({})", jobGraph.getName(), jobGraph.getJobID());

        resetAndScheduleExecutionGraph();

        return Acknowledge.get();
    }

    private void resetAndScheduleExecutionGraph() throws Exception {
        validateRunsInMainThread();

        final CompletableFuture<Void> executionGraphAssignedFuture;

        if (executionGraph.getState() == JobStatus.CREATED) {
            executionGraphAssignedFuture = CompletableFuture.completedFuture(null);
        } else {
            suspendAndClearExecutionGraphFields(new FlinkException("ExecutionGraph is being reset in order to be rescheduled."));
            final JobManagerJobMetricGroup newJobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
            final ExecutionGraph newExecutionGraph = createAndRestoreExecutionGraph(newJobManagerJobMetricGroup);

            executionGraphAssignedFuture = executionGraph.getTerminationFuture().handleAsync(
                (JobStatus ignored, Throwable throwable) -> {
                    assignExecutionGraph(newExecutionGraph, newJobManagerJobMetricGroup);
                    return null;
                },
                getMainThreadExecutor());
        }

        executionGraphAssignedFuture.thenRun(this::scheduleExecutionGraph);
    }

    private void scheduleExecutionGraph() {
        checkState(jobStatusListener == null);
        // register self as job status change listener
        jobStatusListener = new JobManagerJobStatusListener();
        executionGraph.registerJobStatusListener(jobStatusListener);

        try {
            executionGraph.scheduleForExecution();
        }
        catch (Throwable t) {
            executionGraph.failGlobal(t);
        }
    }
  • 这里调用了resetAndScheduleExecutionGraph方法,而resetAndScheduleExecutionGraph则组合了scheduleExecutionGraph方法;scheduleExecutionGraph这里调用executionGraph.scheduleForExecution()来调度执行

ExecutionGraph.scheduleForExecution

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionGraph.java

    public void scheduleForExecution() throws JobException {

        final long currentGlobalModVersion = globalModVersion;

        if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {

            final CompletableFuture<Void> newSchedulingFuture;

            switch (scheduleMode) {

                case LAZY_FROM_SOURCES:
                    newSchedulingFuture = scheduleLazy(slotProvider);
                    break;

                case EAGER:
                    newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);
                    break;

                default:
                    throw new JobException("Schedule mode is invalid.");
            }

            if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) {
                schedulingFuture = newSchedulingFuture;

                newSchedulingFuture.whenCompleteAsync(
                    (Void ignored, Throwable throwable) -> {
                        if (throwable != null && !(throwable instanceof CancellationException)) {
                            // only fail if the scheduling future was not canceled
                            failGlobal(ExceptionUtils.stripCompletionException(throwable));
                        }
                    },
                    futureExecutor);
            } else {
                newSchedulingFuture.cancel(false);
            }
        }
        else {
            throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
        }
    }
  • 这里走的是EAGER模式,因而调用scheduleEager方法

ExecutionGraph.scheduleEager

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionGraph.java

    /**
     *
     *
     * @param slotProvider  The resource provider from which the slots are allocated
     * @param timeout       The maximum time that the deployment may take, before a
     *                      TimeoutException is thrown.
     * @returns Future which is completed once the {@link ExecutionGraph} has been scheduled.
     * The future can also be completed exceptionally if an error happened.
     */
    private CompletableFuture<Void> scheduleEager(SlotProvider slotProvider, final Time timeout) {
        checkState(state == JobStatus.RUNNING, "job is not running currently");

        // Important: reserve all the space we need up front.
        // that way we do not have any operation that can fail between allocating the slots
        // and adding them to the list. If we had a failure in between there, that would
        // cause the slots to get lost
        final boolean queued = allowQueuedScheduling;

        // collecting all the slots may resize and fail in that operation without slots getting lost
        final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());

        // allocate the slots (obtain all their futures
        for (ExecutionJobVertex ejv : getVerticesTopologically()) {
            // these calls are not blocking, they only return futures
            Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(
                slotProvider,
                queued,
                LocationPreferenceConstraint.ALL,
                allocationTimeout);

            allAllocationFutures.addAll(allocationFutures);
        }

        // this future is complete once all slot futures are complete.
        // the future fails once one slot future fails.
        final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);

        final CompletableFuture<Void> currentSchedulingFuture = allAllocationsFuture
            .thenAccept(
                (Collection<Execution> executionsToDeploy) -> {
                    for (Execution execution : executionsToDeploy) {
                        try {
                            execution.deploy();
                        } catch (Throwable t) {
                            throw new CompletionException(
                                new FlinkException(
                                    String.format("Could not deploy execution %s.", execution),
                                    t));
                        }
                    }
                })
            // Generate a more specific failure message for the eager scheduling
            .exceptionally(
                (Throwable throwable) -> {
                    final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
                    final Throwable resultThrowable;

                    if (strippedThrowable instanceof TimeoutException) {
                        int numTotal = allAllocationsFuture.getNumFuturesTotal();
                        int numComplete = allAllocationsFuture.getNumFuturesCompleted();
                        String message = "Could not allocate all requires slots within timeout of " +
                            timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete;

                        resultThrowable = new NoResourceAvailableException(message);
                    } else {
                        resultThrowable = strippedThrowable;
                    }

                    throw new CompletionException(resultThrowable);
                });

        return currentSchedulingFuture;
    }
  • scheduleEager方法这里先调用getVerticesTopologically来获取ExecutionJobVertex
  • 之后调用ExecutionJobVertex.allocateResourcesForAll来分配资源得到Collection<CompletableFuture<Execution>>
  • 最后通过FutureUtils.combineAll(allAllocationFutures)等待这批Future,之后挨个调用execution.deploy()进行部署

ExecutionJobVertex.allocateResourcesForAll

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java

    /**
     * Acquires a slot for all the execution vertices of this ExecutionJobVertex. The method returns
     * pairs of the slots and execution attempts, to ease correlation between vertices and execution
     * attempts.
     *
     * <p>If this method throws an exception, it makes sure to release all so far requested slots.
     *
     * @param resourceProvider The resource provider from whom the slots are requested.
     * @param queued if the allocation can be queued
     * @param locationPreferenceConstraint constraint for the location preferences
     * @param allocationTimeout timeout for allocating the individual slots
     */
    public Collection<CompletableFuture<Execution>> allocateResourcesForAll(
            SlotProvider resourceProvider,
            boolean queued,
            LocationPreferenceConstraint locationPreferenceConstraint,
            Time allocationTimeout) {
        final ExecutionVertex[] vertices = this.taskVertices;
        final CompletableFuture<Execution>[] slots = new CompletableFuture[vertices.length];

        // try to acquire a slot future for each execution.
        // we store the execution with the future just to be on the safe side
        for (int i = 0; i < vertices.length; i++) {
            // allocate the next slot (future)
            final Execution exec = vertices[i].getCurrentExecutionAttempt();
            final CompletableFuture<Execution> allocationFuture = exec.allocateAndAssignSlotForExecution(
                resourceProvider,
                queued,
                locationPreferenceConstraint,
                allocationTimeout);
            slots[i] = allocationFuture;
        }

        // all good, we acquired all slots
        return Arrays.asList(slots);
    }
  • 这里根据ExecutionJobVertex的taskVertices来挨个调用exec.allocateAndAssignSlotForExecution进行分配;可以发现整个并行度由taskVertices来决定

Execution.deploy

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/Execution.java

    /**
     * Deploys the execution to the previously assigned resource.
     *
     * @throws JobException if the execution cannot be deployed to the assigned resource
     */
    public void deploy() throws JobException {
        final LogicalSlot slot  = assignedResource;

        checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");

        //......

        try {

            //......

            final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
                attemptId,
                slot,
                taskRestore,
                attemptNumber);

            // null taskRestore to let it be GC'ed
            taskRestore = null;

            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, rpcTimeout);

            submitResultFuture.whenCompleteAsync(
                (ack, failure) -> {
                    // only respond to the failure case
                    if (failure != null) {
                        if (failure instanceof TimeoutException) {
                            String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';

                            markFailed(new Exception(
                                "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
                                    + ") not responding after a rpcTimeout of " + rpcTimeout, failure));
                        } else {
                            markFailed(failure);
                        }
                    }
                },
                executor);
        }
        catch (Throwable t) {
            markFailed(t);
            ExceptionUtils.rethrow(t);
        }
    }
  • Execution.deploy会创建TaskDeploymentDescriptor,之后通过taskManagerGateway.submitTask提交这个deployment;之后就是触发TaskExecutor去触发Task的run方法

ExecutionJobVertex

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java

    private final ExecutionVertex[] taskVertices;

    public ExecutionJobVertex(
            ExecutionGraph graph,
            JobVertex jobVertex,
            int defaultParallelism,
            Time timeout,
            long initialGlobalModVersion,
            long createTimestamp) throws JobException {

        if (graph == null || jobVertex == null) {
            throw new NullPointerException();
        }

        this.graph = graph;
        this.jobVertex = jobVertex;

        int vertexParallelism = jobVertex.getParallelism();
        int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;

        final int configuredMaxParallelism = jobVertex.getMaxParallelism();

        this.maxParallelismConfigured = (VALUE_NOT_SET != configuredMaxParallelism);

        // if no max parallelism was configured by the user, we calculate and set a default
        setMaxParallelismInternal(maxParallelismConfigured ?
                configuredMaxParallelism : KeyGroupRangeAssignment.computeDefaultMaxParallelism(numTaskVertices));

        // verify that our parallelism is not higher than the maximum parallelism
        if (numTaskVertices > maxParallelism) {
            throw new JobException(
                String.format("Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.",
                    jobVertex.getName(),
                    numTaskVertices,
                    maxParallelism));
        }

        this.parallelism = numTaskVertices;

        this.serializedTaskInformation = null;

        this.taskVertices = new ExecutionVertex[numTaskVertices];
        //......

        // create all task vertices
        for (int i = 0; i < numTaskVertices; i++) {
            ExecutionVertex vertex = new ExecutionVertex(
                    this,
                    i,
                    producedDataSets,
                    timeout,
                    initialGlobalModVersion,
                    createTimestamp,
                    maxPriorAttemptsHistoryLength);

            this.taskVertices[i] = vertex;
        }

        //......
    }
  • taskVertices是一个ExecutionVertex[],它的大小由numTaskVertices决定
  • ExecutionJobVertex先判断jobVertex.getParallelism()是否大于0(一般大于0),大于0则取jobVertex.getParallelism()的值为numTaskVertices;如果不大于0则取defaultParallelism(ExecutionGraph的attachJobGraph方法里头创建ExecutionJobVertex时,传递的defaultParallelism为1)
  • 之后就是根据numTaskVertices挨个创建ExecutionVertex,放入到taskVertices数据中
  • 而jobVertex的parallelism是StreamingJobGraphGenerator在createJobVertex方法中根据streamNode.getParallelism()来设置的(如果streamNode.getParallelism()的值大于0的话)
  • streamNode的parallelism如果自己没有设置,则默认是取StreamExecutionEnvironment的parallelism(详见DataStreamSource的构造器、DataStream.transform方法、DataStreamSink的构造器;DataStreamSource里头会将不是parallel类型的source的parallelism重置为1);如果是LocalEnvironment的话,它默认是取Runtime.getRuntime().availableProcessors()

小结

  • RichParallelSourceFunction实现了ParallelSourceFunction接口,同时继承了AbstractRichFunction;AbstractRichFunction主要实现了RichFunction接口的setRuntimeContext、getRuntimeContext、getIterationRuntimeContext方法;RuntimeContext定义的getNumberOfParallelSubtasks方法(返回当前的task的parallelism)以及getIndexOfThisSubtask(获取当前parallel subtask的下标)方法,可以方便开发既能并行执行但各自发射的数据又不重复的ParallelSourceFunction
  • JobMaster在startJobExecution的时候调用executionGraph.scheduleForExecution()进行调度;期间通过ExecutionJobVertex.allocateResourcesForAll来分配资源得到Collection<CompletableFuture<Execution>>,之后挨个执行execution.deploy()进行部署;Execution.deploy会创建TaskDeploymentDescriptor,之后通过taskManagerGateway.submitTask提交这个deployment;之后就是触发TaskExecutor去触发Task的run方法
  • ExecutionJobVertex.allocateResourcesForAll是根据ExecutionJobVertex的taskVertices来挨个调用exec.allocateAndAssignSlotForExecution进行分配,整个并行度由taskVertices来决定;而taskVertices是在ExecutionJobVertex构造器里头初始化的,如果jobVertex.getParallelism()大于0则取该值,否则取defaultParallelism为1;而jobVertex的parallelism是StreamingJobGraphGenerator在createJobVertex方法中根据streamNode.getParallelism()来设置(如果streamNode.getParallelism()的值大于0的话),如果用户没有设置则默认是取StreamExecutionEnvironment的parallelism;LocalEnvironment的话,它默认是取Runtime.getRuntime().availableProcessors()

doc

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容

  • 序 本文主要研究一下flink的SourceFunction 实例 这里通过addSource方法来添加自定义的S...
    go4it阅读 8,676评论 0 3
  • 序 本文主要研究一下flink的CsvReader 实例 ExecutionEnvironment.readCsv...
    go4it阅读 3,642评论 0 0
  • 序 本文主要研究一下flink的log.file配置 log4j.properties flink-release...
    go4it阅读 5,706评论 0 1
  • Flink初体验 安装 官网:http://flink.apache.org/downloads.html 可以看...
    it_zzy阅读 29,785评论 0 10
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,596评论 18 139