聊聊flink KeyedStream的KeySelector

本文主要研究一下flink KeyedStream的KeySelector

KeyedStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

@Public
public class KeyedStream<T, KEY> extends DataStream<T> {

    /**
     * The key selector that can get the key by which the stream if partitioned from the elements.
     */
    private final KeySelector<T, KEY> keySelector;

    /** The type of the key by which the stream is partitioned. */
    private final TypeInformation<KEY> keyType;

    /**
     * Creates a new {@link KeyedStream} using the given {@link KeySelector}
     * to partition operator state by key.
     *
     * @param dataStream
     *            Base stream of data
     * @param keySelector
     *            Function for determining state partitions
     */
    public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
        this(dataStream, keySelector, TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType()));
    }

    /**
     * Creates a new {@link KeyedStream} using the given {@link KeySelector}
     * to partition operator state by key.
     *
     * @param dataStream
     *            Base stream of data
     * @param keySelector
     *            Function for determining state partitions
     */
    public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
        this(
            dataStream,
            new PartitionTransformation<>(
                dataStream.getTransformation(),
                new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
            keySelector,
            keyType);
    }

    /**
     * Creates a new {@link KeyedStream} using the given {@link KeySelector} and {@link TypeInformation}
     * to partition operator state by key, where the partitioning is defined by a {@link PartitionTransformation}.
     *
     * @param stream
     *            Base stream of data
     * @param partitionTransformation
     *            Function that determines how the keys are distributed to downstream operator(s)
     * @param keySelector
     *            Function to extract keys from the base stream
     * @param keyType
     *            Defines the type of the extracted keys
     */
    @Internal
    KeyedStream(
        DataStream<T> stream,
        PartitionTransformation<T> partitionTransformation,
        KeySelector<T, KEY> keySelector,
        TypeInformation<KEY> keyType) {

        super(stream.getExecutionEnvironment(), partitionTransformation);
        this.keySelector = clean(keySelector);
        this.keyType = validateKeyType(keyType);
    }

    //......
}
  • 这里可以看到KeyedStream的不同构造器中都需要一个KeySelector类型的参数

KeySelector

flink-core-1.7.0-sources.jar!/org/apache/flink/api/java/functions/KeySelector.java

@Public
@FunctionalInterface
public interface KeySelector<IN, KEY> extends Function, Serializable {

    /**
     * User-defined function that deterministically extracts the key from an object.
     *
     * <p>For example for a class:
     * <pre>
     *  public class Word {
     *      String word;
     *      int count;
     *  }
     * </pre>
     * The key extractor could return the word as
     * a key to group all Word objects by the String they contain.
     *
     * <p>The code would look like this
     * <pre>
     *  public String getKey(Word w) {
     *      return w.word;
     *  }
     * </pre>
     *
     * @param value The object to get the key from.
     * @return The extracted key.
     *
     * @throws Exception Throwing an exception will cause the execution of the respective task to fail,
     *                   and trigger recovery or cancellation of the program.
     */
    KEY getKey(IN value) throws Exception;
}
  • KeySelector接口继承了Function接口,定义了getKey方法,用于从IN类型中提取出KEY

DataStream.keyBy

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

    /**
     * It creates a new {@link KeyedStream} that uses the provided key for partitioning
     * its operator states.
     *
     * @param key
     *            The KeySelector to be used for extracting the key for partitioning
     * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
     */
    public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
        Preconditions.checkNotNull(key);
        return new KeyedStream<>(this, clean(key));
    }

    /**
     * It creates a new {@link KeyedStream} that uses the provided key with explicit type information
     * for partitioning its operator states.
     *
     * @param key The KeySelector to be used for extracting the key for partitioning.
     * @param keyType The type information describing the key type.
     * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
     */
    public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key, TypeInformation<K> keyType) {
        Preconditions.checkNotNull(key);
        Preconditions.checkNotNull(keyType);
        return new KeyedStream<>(this, clean(key), keyType);
    }

    /**
     * Partitions the operator state of a {@link DataStream} by the given key positions.
     *
     * @param fields
     *            The position of the fields on which the {@link DataStream}
     *            will be grouped.
     * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
     */
    public KeyedStream<T, Tuple> keyBy(int... fields) {
        if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
            return keyBy(KeySelectorUtil.getSelectorForArray(fields, getType()));
        } else {
            return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
        }
    }

    /**
     * Partitions the operator state of a {@link DataStream} using field expressions.
     * A field expression is either the name of a public field or a getter method with parentheses
     * of the {@link DataStream}'s underlying type. A dot can be used to drill
     * down into objects, as in {@code "field1.getInnerField2()" }.
     *
     * @param fields
     *            One or more field expressions on which the state of the {@link DataStream} operators will be
     *            partitioned.
     * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
     **/
    public KeyedStream<T, Tuple> keyBy(String... fields) {
        return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
    }

    private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
        return new KeyedStream<>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
                getType(), getExecutionConfig())));
    }
  • DataStream的keyBy方法用于将DataStream转换为KeyedStream,该方法有不同的重载
  • 一个是支持变长int数组,这个通常用于简单tuple类型,int为tuple的小标,从0开始,如果是多个int,表示是组合key,比如keyBy(0,1)表示要用tuple的第一个和第二个字段作为key;
  • 一个是支持变长String数组,这个通常用于复杂tuple类型及POJO类型,对于POJO,String用于指定字段名,也支持对象/tuple嵌套属性,比如user.zip,对于对象类型的tuple,f0表示该tuple的第一个字段
  • 一个是支持KeySelector,通过Key Selector Function可以自由指定key,比如从对象提取然后做些处理
  • keyBy(int... fields)及keyBy(String... fields)里头均有调用到私有的keyBy(Keys<T> keys)方法,由于KeyedStream的构造器都需要KeySelector参数,所以该方法最后也是通过KeySelectorUtil.getSelectorForKeys将Keys转换为KeySelector对象

Keys.ExpressionKeys

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/operators/Keys.java

    /**
     * Represents (nested) field access through string and integer-based keys
     */
    public static class ExpressionKeys<T> extends Keys<T> {
        
        public static final String SELECT_ALL_CHAR = "*";
        public static final String SELECT_ALL_CHAR_SCALA = "_";
        private static final Pattern WILD_CARD_REGEX = Pattern.compile("[\\.]?("
                + "\\" + SELECT_ALL_CHAR + "|"
                + "\\" + SELECT_ALL_CHAR_SCALA +")$");

        // Flattened fields representing keys fields
        private List<FlatFieldDescriptor> keyFields;
        private TypeInformation<?>[] originalKeyTypes;

        //......

        /**
         * Create String-based (nested) field expression keys on a composite type.
         */
        public ExpressionKeys(String[] keyExpressions, TypeInformation<T> type) {
            checkNotNull(keyExpressions, "Field expression cannot be null.");

            this.keyFields = new ArrayList<>(keyExpressions.length);

            if (type instanceof CompositeType){
                CompositeType<T> cType = (CompositeType<T>) type;
                this.originalKeyTypes = new TypeInformation<?>[keyExpressions.length];

                // extract the keys on their flat position
                for (int i = 0; i < keyExpressions.length; i++) {
                    String keyExpr = keyExpressions[i];

                    if (keyExpr == null) {
                        throw new InvalidProgramException("Expression key may not be null.");
                    }
                    // strip off whitespace
                    keyExpr = keyExpr.trim();

                    List<FlatFieldDescriptor> flatFields = cType.getFlatFields(keyExpr);

                    if (flatFields.size() == 0) {
                        throw new InvalidProgramException("Unable to extract key from expression '" + keyExpr + "' on key " + cType);
                    }
                    // check if all nested fields can be used as keys
                    for (FlatFieldDescriptor field : flatFields) {
                        if (!field.getType().isKeyType()) {
                            throw new InvalidProgramException("This type (" + field.getType() + ") cannot be used as key.");
                        }
                    }
                    // add flat fields to key fields
                    keyFields.addAll(flatFields);

                    String strippedKeyExpr = WILD_CARD_REGEX.matcher(keyExpr).replaceAll("");
                    if (strippedKeyExpr.isEmpty()) {
                        this.originalKeyTypes[i] = type;
                    } else {
                        this.originalKeyTypes[i] = cType.getTypeAt(strippedKeyExpr);
                    }
                }
            }
            else {
                if (!type.isKeyType()) {
                    throw new InvalidProgramException("This type (" + type + ") cannot be used as key.");
                }

                // check that all key expressions are valid
                for (String keyExpr : keyExpressions) {
                    if (keyExpr == null) {
                        throw new InvalidProgramException("Expression key may not be null.");
                    }
                    // strip off whitespace
                    keyExpr = keyExpr.trim();
                    // check that full type is addressed
                    if (!(SELECT_ALL_CHAR.equals(keyExpr) || SELECT_ALL_CHAR_SCALA.equals(keyExpr))) {
                        throw new InvalidProgramException(
                            "Field expression must be equal to '" + SELECT_ALL_CHAR + "' or '" + SELECT_ALL_CHAR_SCALA + "' for non-composite types.");
                    }
                    // add full type as key
                    keyFields.add(new FlatFieldDescriptor(0, type));
                }
                this.originalKeyTypes = new TypeInformation[] {type};
            }
        }

        //......
    }
  • ExpressionKeys是Keys里头的一个静态类,它继承了Keys对象;keyBy(int... fields)及keyBy(String... fields)里头均有通过new Keys.ExpressionKeys,将fields转换为Keys.ExpressionKeys,最后调用私有的keyBy(Keys<T> keys)方法

KeySelectorUtil.getSelectorForKeys

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/util/keys/KeySelectorUtil.java

@Internal
public final class KeySelectorUtil {

    public static <X> KeySelector<X, Tuple> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo, ExecutionConfig executionConfig) {
        if (!(typeInfo instanceof CompositeType)) {
            throw new InvalidTypesException(
                    "This key operation requires a composite type such as Tuples, POJOs, or Case Classes.");
        }

        CompositeType<X> compositeType = (CompositeType<X>) typeInfo;

        int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
        int numKeyFields = logicalKeyPositions.length;

        TypeInformation<?>[] typeInfos = keys.getKeyFieldTypes();
        // use ascending order here, the code paths for that are usually a slight bit faster
        boolean[] orders = new boolean[numKeyFields];
        for (int i = 0; i < numKeyFields; i++) {
            orders[i] = true;
        }

        TypeComparator<X> comparator = compositeType.createComparator(logicalKeyPositions, orders, 0, executionConfig);
        return new ComparableKeySelector<>(comparator, numKeyFields, new TupleTypeInfo<>(typeInfos));
    }

    //......
}
  • KeySelectorUtil.getSelectorForKeys方法用于将Keys转换为KeySelector类型

小结

  • KeyedStream的不同构造器中都需要一个KeySelector参数
  • DataStream的keyBy方法有不同的重载,支持变长int数组,变长String数组以及KeySelector类型
  • keyBy(int... fields)及keyBy(String... fields)里头均有通过new Keys.ExpressionKeys,将fields转换为Keys.ExpressionKeys,最后调用私有的keyBy(Keys<T> keys)方法,该方法通过调用KeySelectorUtil.getSelectorForKeys方法将Keys转换为KeySelector类型

doc

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352