A Look at Flink's ParallelIteratorInputFormat

This article takes a look at Flink's ParallelIteratorInputFormat.

Example

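import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

// inside a method declared "throws Exception" (print() triggers job execution)
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Long> dataSet = env.generateSequence(15, 106)
        .setParallelism(3);
dataSet.print();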
  • Here, ExecutionEnvironment's generateSequence method creates a ParallelIteratorInputFormat backed by a NumberSequenceIterator

ParallelIteratorInputFormat

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java

/**
 * An input format that generates data in parallel through a {@link SplittableIterator}.
 */
@PublicEvolving
public class ParallelIteratorInputFormat<T> extends GenericInputFormat<T> {

    private static final long serialVersionUID = 1L;

    private final SplittableIterator<T> source;

    private transient Iterator<T> splitIterator;

    public ParallelIteratorInputFormat(SplittableIterator<T> iterator) {
        this.source = iterator;
    }

    @Override
    public void open(GenericInputSplit split) throws IOException {
        super.open(split);

        this.splitIterator = this.source.getSplit(split.getSplitNumber(), split.getTotalNumberOfSplits());
    }

    @Override
    public boolean reachedEnd() {
        return !this.splitIterator.hasNext();
    }

    @Override
    public T nextRecord(T reuse) {
        return this.splitIterator.next();
    }
}
  • ParallelIteratorInputFormat extends GenericInputFormat. GenericInputFormat has four other subclasses: CRowValuesInputFormat, CollectionInputFormat, IteratorInputFormat and ValuesInputFormat. All four implement the NonParallelInput interface.
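The open/reachedEnd/nextRecord contract can be made concrete with a plain-Java sketch that has no Flink dependency. `ParallelFormatSketch`, `splitOf`, `RangeFormat` and `runSplit` are hypothetical names invented for this illustration. In the sketch, each simulated parallel instance opens its own split number, asks the source for that split's iterator, and pulls records until `reachedEnd()` returns true. The chunking in `splitOf` uses simple ceiling division over small ranges. That is an assumption made for brevity, and it is not NumberSequenceIterator's actual algorithm.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.LongStream;

public class ParallelFormatSketch {

    // Hypothetical stand-in for a splittable source over [from, to]:
    // split num of numPartitions gets one contiguous chunk (ceiling division, no overflow handling).
    static Iterator<Long> splitOf(long from, long to, int num, int numPartitions) {
        long count = to - from + 1;
        long chunk = (count + numPartitions - 1) / numPartitions;
        long start = from + num * chunk;
        long end = Math.min(to, start + chunk - 1);
        return LongStream.rangeClosed(start, end).boxed().iterator(); // empty if start > end
    }

    // Same shape as ParallelIteratorInputFormat: open() picks this instance's split,
    // reachedEnd()/nextRecord() drain the split's iterator.
    static class RangeFormat {
        private final long from;
        private final long to;
        private Iterator<Long> splitIterator;

        RangeFormat(long from, long to) {
            this.from = from;
            this.to = to;
        }

        void open(int splitNumber, int totalSplits) {
            splitIterator = splitOf(from, to, splitNumber, totalSplits);
        }

        boolean reachedEnd() {
            return !splitIterator.hasNext();
        }

        Long nextRecord() {
            return splitIterator.next();
        }
    }

    // Simulates what one parallel source task does with its assigned split.
    static List<Long> runSplit(RangeFormat format, int splitNumber, int totalSplits) {
        List<Long> out = new ArrayList<>();
        format.open(splitNumber, totalSplits);
        while (!format.reachedEnd()) {
            out.add(format.nextRecord());
        }
        return out;
    }

    public static void main(String[] args) {
        RangeFormat format = new RangeFormat(15, 106);
        for (int i = 0; i < 3; i++) {
            List<Long> records = runSplit(format, i, 3);
            System.out.println("split " + i + ": " + records.size() + " records, first=" + records.get(0));
        }
    }
}
```

For the article's range [15, 106] with three splits, main reports 31, 31 and 30 records, starting at 15, 46 and 77 respectively.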

NonParallelInput

flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/NonParallelInput.java

/**
 * This interface acts as a marker for input formats for inputs which cannot be split.
 * Data sources with a non-parallel input formats are always executed with a parallelism
 * of one.
 * 
 * @see InputFormat
 */
@Public
public interface NonParallelInput {
}
  • This interface defines no methods. It is purely a marker indicating that an InputFormat does not support splitting.

GenericInputFormat.createInputSplits

flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/GenericInputFormat.java

    @Override
    public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
        if (numSplits < 1) {
            throw new IllegalArgumentException("Number of input splits has to be at least 1.");
        }

        numSplits = (this instanceof NonParallelInput) ? 1 : numSplits;
        GenericInputSplit[] splits = new GenericInputSplit[numSplits];
        for (int i = 0; i < splits.length; i++) {
            splits[i] = new GenericInputSplit(i, numSplits);
        }
        return splits;
    }
  • GenericInputFormat's createInputSplits method validates the requested numSplits. If it is less than 1, an IllegalArgumentException is thrown. If the current InputFormat implements the NonParallelInput interface, numSplits is reset to 1.
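The same rule can be exercised in a standalone Java sketch with no Flink dependency. `CreateSplitsSketch`, `NonParallelMarker`, `Split`, `BaseFormat`, `ParallelFormat` and `NonParallelFormat` are hypothetical names used only for this illustration. The body of `createInputSplits` copies the logic of the GenericInputFormat excerpt above. The sketch shows that implementing an empty marker interface is enough to collapse any requested parallelism to a single split.

```java
import java.util.Arrays;

public class CreateSplitsSketch {

    // Marker interface with no methods, playing the role of NonParallelInput.
    interface NonParallelMarker {}

    // Minimal stand-in for GenericInputSplit: the two numbers open() needs.
    static final class Split {
        final int splitNumber;
        final int totalNumberOfSplits;

        Split(int splitNumber, int totalNumberOfSplits) {
            this.splitNumber = splitNumber;
            this.totalNumberOfSplits = totalNumberOfSplits;
        }

        @Override
        public String toString() {
            return "(" + splitNumber + "/" + totalNumberOfSplits + ")";
        }
    }

    static class BaseFormat {
        // Same checks as GenericInputFormat.createInputSplits.
        Split[] createInputSplits(int numSplits) {
            if (numSplits < 1) {
                throw new IllegalArgumentException("Number of input splits has to be at least 1.");
            }
            // A marker-interface check decides the effective parallelism.
            numSplits = (this instanceof NonParallelMarker) ? 1 : numSplits;
            Split[] splits = new Split[numSplits];
            for (int i = 0; i < splits.length; i++) {
                splits[i] = new Split(i, numSplits);
            }
            return splits;
        }
    }

    static class ParallelFormat extends BaseFormat {}

    static class NonParallelFormat extends BaseFormat implements NonParallelMarker {}

    public static void main(String[] args) {
        System.out.println(Arrays.toString(new ParallelFormat().createInputSplits(3)));
        System.out.println(Arrays.toString(new NonParallelFormat().createInputSplits(3)));
    }
}
```

Running main prints `[(0/3), (1/3), (2/3)]` followed by `[(0/1)]`.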

ExecutionEnvironment.fromParallelCollection

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java

    /**
     * Creates a new data set that contains elements in the iterator. The iterator is splittable, allowing the
     * framework to create a parallel data source that returns the elements in the iterator.
     *
     * <p>Because the iterator will remain unmodified until the actual execution happens, the type of data
     * returned by the iterator must be given explicitly in the form of the type class (this is due to the
     * fact that the Java compiler erases the generic type information).
     *
     * @param iterator The iterator that produces the elements of the data set.
     * @param type The class of the data produced by the iterator. Must not be a generic class.
     * @return A DataSet representing the elements in the iterator.
     *
     * @see #fromParallelCollection(SplittableIterator, TypeInformation)
     */
    public <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, Class<X> type) {
        return fromParallelCollection(iterator, TypeExtractor.getForClass(type));
    }

    /**
     * Creates a new data set that contains elements in the iterator. The iterator is splittable, allowing the
     * framework to create a parallel data source that returns the elements in the iterator.
     *
     * <p>Because the iterator will remain unmodified until the actual execution happens, the type of data
     * returned by the iterator must be given explicitly in the form of the type information.
     * This method is useful for cases where the type is generic. In that case, the type class
     * (as given in {@link #fromParallelCollection(SplittableIterator, Class)} does not supply all type information.
     *
     * @param iterator The iterator that produces the elements of the data set.
     * @param type The TypeInformation for the produced data set.
     * @return A DataSet representing the elements in the iterator.
     *
     * @see #fromParallelCollection(SplittableIterator, Class)
     */
    public <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, TypeInformation<X> type) {
        return fromParallelCollection(iterator, type, Utils.getCallLocationName());
    }

    // private helper for passing different call location names
    private <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, TypeInformation<X> type, String callLocationName) {
        return new DataSource<>(this, new ParallelIteratorInputFormat<>(iterator), type, callLocationName);
    }

    /**
     * Creates a new data set that contains a sequence of numbers. The data set will be created in parallel,
     * so there is no guarantee about the order of the elements.
     *
     * @param from The number to start at (inclusive).
     * @param to The number to stop at (inclusive).
     * @return A DataSet, containing all number in the {@code [from, to]} interval.
     */
    public DataSource<Long> generateSequence(long from, long to) {
        return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, Utils.getCallLocationName());
    }
  • ExecutionEnvironment's fromParallelCollection method creates a ParallelIteratorInputFormat around the given SplittableIterator. generateSequence also goes through fromParallelCollection and passes it a NumberSequenceIterator, which is a subclass of SplittableIterator.

SplittableIterator

flink-core-1.6.2-sources.jar!/org/apache/flink/util/SplittableIterator.java

/**
 * Abstract base class for iterators that can split themselves into multiple disjoint
 * iterators. The union of these iterators returns the original iterator values.
 *
 * @param <T> The type of elements returned by the iterator.
 */
@Public
public abstract class SplittableIterator<T> implements Iterator<T>, Serializable {

    private static final long serialVersionUID = 200377674313072307L;

    /**
     * Splits this iterator into a number disjoint iterators.
     * The union of these iterators returns the original iterator values.
     *
     * @param numPartitions The number of iterators to split into.
     * @return An array with the split iterators.
     */
    public abstract Iterator<T>[] split(int numPartitions);

    /**
     * Splits this iterator into <i>n</i> partitions and returns the <i>i-th</i> partition
     * out of those.
     *
     * @param num The partition to return (<i>i</i>).
     * @param numPartitions The number of partitions to split into (<i>n</i>).
     * @return The iterator for the partition.
     */
    public Iterator<T> getSplit(int num, int numPartitions) {
        if (numPartitions < 1 || num < 0 || num >= numPartitions) {
            throw new IllegalArgumentException();
        }

        return split(numPartitions)[num];
    }

    /**
     * The maximum number of splits into which this iterator can be split up.
     *
     * @return The maximum number of splits into which this iterator can be split up.
     */
    public abstract int getMaximumNumberOfSplits();
}
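  • SplittableIterator is an abstract class that declares the abstract methods split and getMaximumNumberOfSplits. getSplit is concrete: it validates its arguments and returns split(numPartitions)[num]. The class has two implementations, LongValueSequenceIterator and NumberSequenceIterator, and NumberSequenceIterator is examined here.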
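The contract ("disjoint iterators whose union returns the original values") can be illustrated with a small plain-Java sketch. `SplittableSketch`, `MiniSplittableIterator` and `StridedListIterator` are hypothetical names. `MiniSplittableIterator` is a simplified mirror of SplittableIterator that omits Serializable. Its `getSplit` keeps the same argument check as the excerpt above. The strided (round-robin) partitioning is an illustrative choice, not something Flink uses: partition i takes every n-th remaining element starting at offset i.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class SplittableSketch {

    // Simplified mirror of SplittableIterator (Serializable omitted).
    abstract static class MiniSplittableIterator<T> implements Iterator<T> {
        abstract Iterator<T>[] split(int numPartitions);

        abstract int getMaximumNumberOfSplits();

        // Same argument check as SplittableIterator.getSplit.
        Iterator<T> getSplit(int num, int numPartitions) {
            if (numPartitions < 1 || num < 0 || num >= numPartitions) {
                throw new IllegalArgumentException();
            }
            return split(numPartitions)[num];
        }
    }

    // Hypothetical implementation that visits list positions pos, pos + stride, ...
    static final class StridedListIterator<T> extends MiniSplittableIterator<T> {
        private final List<T> items;
        private final int stride;
        private int pos;

        StridedListIterator(List<T> items) {
            this(items, 0, 1);
        }

        private StridedListIterator(List<T> items, int start, int stride) {
            this.items = items;
            this.pos = start;
            this.stride = stride;
        }

        @Override
        public boolean hasNext() {
            return pos < items.size();
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            T value = items.get(pos);
            pos += stride;
            return value;
        }

        // Partition i gets remaining elements i, i + n, i + 2n, ... (counted from pos),
        // so partitions are disjoint and together cover everything not yet returned.
        @Override
        @SuppressWarnings("unchecked")
        Iterator<T>[] split(int numPartitions) {
            Iterator<T>[] parts = new Iterator[numPartitions];
            for (int i = 0; i < numPartitions; i++) {
                parts[i] = new StridedListIterator<>(items, pos + i * stride, stride * numPartitions);
            }
            return parts;
        }

        // At most one split per remaining element.
        @Override
        int getMaximumNumberOfSplits() {
            return pos < items.size() ? (items.size() - pos + stride - 1) / stride : 0;
        }
    }

    public static void main(String[] args) {
        StridedListIterator<Integer> source = new StridedListIterator<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7));
        List<Integer> union = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            List<Integer> part = new ArrayList<>();
            source.getSplit(i, 3).forEachRemaining(part::add);
            System.out.println("partition " + i + ": " + part);
            union.addAll(part);
        }
        System.out.println("union size: " + union.size());
    }
}
```

Running main prints partitions `[1, 4, 7]`, `[2, 5]` and `[3, 6]`, and a union size of 7. Like the real getSplit, each `getSplit` call re-splits the source and keeps only one of the resulting iterators.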

NumberSequenceIterator

flink-core-1.6.2-sources.jar!/org/apache/flink/util/NumberSequenceIterator.java

/**
 * The {@code NumberSequenceIterator} is an iterator that returns a sequence of numbers (as {@code Long})s.
 * The iterator is splittable (as defined by {@link SplittableIterator}, i.e., it can be divided into multiple
 * iterators that each return a subsequence of the number sequence.
 */
@Public
public class NumberSequenceIterator extends SplittableIterator<Long> {

    private static final long serialVersionUID = 1L;

    /** The last number returned by the iterator. */
    private final long to;

    /** The next number to be returned. */
    private long current;


    /**
     * Creates a new splittable iterator, returning the range [from, to].
     * Both boundaries of the interval are inclusive.
     *
     * @param from The first number returned by the iterator.
     * @param to The last number returned by the iterator.
     */
    public NumberSequenceIterator(long from, long to) {
        if (from > to) {
            throw new IllegalArgumentException("The 'to' value must not be smaller than the 'from' value.");
        }

        this.current = from;
        this.to = to;
    }


    @Override
    public boolean hasNext() {
        return current <= to;
    }

    @Override
    public Long next() {
        if (current <= to) {
            return current++;
        } else {
            throw new NoSuchElementException();
        }
    }

    @Override
    public NumberSequenceIterator[] split(int numPartitions) {
        if (numPartitions < 1) {
            throw new IllegalArgumentException("The number of partitions must be at least 1.");
        }

        if (numPartitions == 1) {
            return new NumberSequenceIterator[] { new NumberSequenceIterator(current, to) };
        }

        // here, numPartitions >= 2 !!!

        long elementsPerSplit;

        if (to - current + 1 >= 0) {
            elementsPerSplit = (to - current + 1) / numPartitions;
        }
        else {
            // long overflow of the range.
            // we compute based on half the distance, to prevent the overflow.
            // in most cases it holds that: current < 0 and to > 0, except for: to == 0 and current == Long.MIN_VALUE
            // the later needs a special case
            final long halfDiff; // must be positive

            if (current == Long.MIN_VALUE) {
                // this means to >= 0
                halfDiff = (Long.MAX_VALUE / 2 + 1) + to / 2;
            } else {
                long posFrom = -current;
                if (posFrom > to) {
                    halfDiff = to + ((posFrom - to) / 2);
                } else {
                    halfDiff = posFrom + ((to - posFrom) / 2);
                }
            }
            elementsPerSplit = halfDiff / numPartitions * 2;
        }

        if (elementsPerSplit < Long.MAX_VALUE) {
            // figure out how many get one in addition
            long numWithExtra = -(elementsPerSplit * numPartitions) + to - current + 1;

            // based on rounding errors, we may have lost one)
            if (numWithExtra > numPartitions) {
                elementsPerSplit++;
                numWithExtra -= numPartitions;

                if (numWithExtra > numPartitions) {
                    throw new RuntimeException("Bug in splitting logic. To much rounding loss.");
                }
            }

            NumberSequenceIterator[] iters = new NumberSequenceIterator[numPartitions];
            long curr = current;
            int i = 0;
            for (; i < numWithExtra; i++) {
                long next = curr + elementsPerSplit + 1;
                iters[i] = new NumberSequenceIterator(curr, next - 1);
                curr = next;
            }
            for (; i < numPartitions; i++) {
                long next = curr + elementsPerSplit;
                iters[i] = new NumberSequenceIterator(curr, next - 1, true);
                curr = next;
            }

            return iters;
        }
        else {
            // this can only be the case when there are two partitions
            if (numPartitions != 2) {
                throw new RuntimeException("Bug in splitting logic.");
            }

            return new NumberSequenceIterator[] {
                new NumberSequenceIterator(current, current + elementsPerSplit),
                new NumberSequenceIterator(current + elementsPerSplit, to)
            };
        }
    }

    @Override
    public int getMaximumNumberOfSplits() {
        if (to >= Integer.MAX_VALUE || current <= Integer.MIN_VALUE || to - current + 1 >= Integer.MAX_VALUE) {
            return Integer.MAX_VALUE;
        }
        else {
            return (int) (to - current + 1);
        }
    }

    //......
}
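  • NumberSequenceIterator's constructor takes two parameters, from and to. Internally it keeps a current value, which is initialized to from.
  • The split method first computes elementsPerSplit from numPartitions. When to - current + 1 >= 0, meaning the element count does not overflow a long, the formula is (to - current + 1) / numPartitions.
  • It then computes numWithExtra from elementsPerSplit. Because elementsPerSplit comes from integer division, giving every split exactly elementsPerSplit elements may leave some elements over, and numWithExtra is that leftover count. If numWithExtra is greater than numPartitions, elementsPerSplit is incremented by 1 and numPartitions is subtracted from numWithExtra. This can only happen through rounding in the overflow branch.
  • Finally, each of the first numWithExtra splits receives elementsPerSplit + 1 elements, so the leftover elements are handed out one per split. The remaining splits, up to numPartitions, receive exactly elementsPerSplit elements each, with their to computed as from + elementsPerSplit - 1.
  • getMaximumNumberOfSplits returns the maximum number of splits. It returns Integer.MAX_VALUE when to >= Integer.MAX_VALUE || current <= Integer.MIN_VALUE || to - current + 1 >= Integer.MAX_VALUE, and (int) (to - current + 1) otherwise.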
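The split arithmetic can be checked by hand with a small standalone sketch. `SequenceSplitSketch`, `splitRanges` and `maxSplits` are hypothetical names. `splitRanges` follows only the non-overflow branch (`to - current + 1 >= 0`) and computes numWithExtra as the plain division remainder. In that branch the remainder is always smaller than numPartitions, so the "lost one" correction never applies, and the overflow branch is deliberately left out. `maxSplits` repeats the condition from getMaximumNumberOfSplits.

```java
import java.util.ArrayList;
import java.util.List;

public class SequenceSplitSketch {

    // Returns {from, to} pairs per partition, following the non-overflow branch of
    // NumberSequenceIterator.split. Assumes to - current + 1 does not overflow.
    static List<long[]> splitRanges(long current, long to, int numPartitions) {
        if (numPartitions < 1) {
            throw new IllegalArgumentException("The number of partitions must be at least 1.");
        }
        long count = to - current + 1;
        if (count < 0) {
            throw new UnsupportedOperationException("overflow branch is not covered by this sketch");
        }
        long elementsPerSplit = count / numPartitions;
        // Leftover from the integer division: the first numWithExtra partitions get one more element.
        long numWithExtra = count - elementsPerSplit * numPartitions;

        List<long[]> ranges = new ArrayList<>();
        long curr = current;
        for (int i = 0; i < numPartitions; i++) {
            long size = elementsPerSplit + (i < numWithExtra ? 1 : 0);
            ranges.add(new long[] {curr, curr + size - 1}); // inclusive bounds, like [from, to]
            curr += size;
        }
        return ranges;
    }

    // Same condition as NumberSequenceIterator.getMaximumNumberOfSplits.
    static int maxSplits(long current, long to) {
        if (to >= Integer.MAX_VALUE || current <= Integer.MIN_VALUE || to - current + 1 >= Integer.MAX_VALUE) {
            return Integer.MAX_VALUE;
        }
        return (int) (to - current + 1);
    }

    public static void main(String[] args) {
        // The article's example: generateSequence(15, 106) with parallelism 3.
        for (long[] r : splitRanges(15, 106, 3)) {
            System.out.println("[" + r[0] + ", " + r[1] + "] size=" + (r[1] - r[0] + 1));
        }
        System.out.println("max splits: " + maxSplits(15, 106));
    }
}
```

For the example, the range holds 92 elements, so elementsPerSplit = 92 / 3 = 30 and numWithExtra = 2. main prints `[15, 45] size=31`, `[46, 76] size=31`, `[77, 106] size=30` and `max splits: 92`.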

Summary

  • GenericInputFormat has five subclasses. Besides ParallelIteratorInputFormat, they are CRowValuesInputFormat, CollectionInputFormat, IteratorInputFormat and ValuesInputFormat, and these last four all implement the NonParallelInput interface.
  • GenericInputFormat's createInputSplits validates the requested numSplits, and for a NonParallelInput format it forces numSplits to 1.
  • NumberSequenceIterator is an implementation of SplittableIterator. ExecutionEnvironment's fromParallelCollection method creates a ParallelIteratorInputFormat for a SplittableIterator, and generateSequence goes through the same method with a NumberSequenceIterator. NumberSequenceIterator's split method first computes elementsPerSplit and then numWithExtra. It gives one extra element to each of the first numWithExtra splits, and assigns exactly elementsPerSplit elements to each remaining split.

doc
