Flink利用State进行统计

一. 托管State

1.1 主程序

public class StateWordCount {
    
    public static void main(String[] args) throws Exception {

        final ParameterTool parameters = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(parameters);

        // Checkpoint
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

        // StateBackend
        StateBackend stateBackend = new FsStateBackend(new Path("file:///E:\\data\\flink"));
        env.setStateBackend(stateBackend);

        env
            .addSource(new SourceFromFile())
            .setParallelism(1)
            .name("demo-source")
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    String[] arr = value.split(",");
                    for (String item : arr) {
                        out.collect(new Tuple2<>(item, 1));
                    }
                }
            })
            .name("demo-flatMap")
            .keyBy(0)
            .flatMap(new WordCountFlatMap())
            .print();

        env.execute("StateWordCount");
    }

}
  1. 构建 StreamExecutionEnvironment;
  2. 启动 Checkpoint,并设置间隔时间;
  3. 设置 StateBackend 为 FsStateBackend;
  4. 从文件读取数据开始统计。

1.2 Source

public class SourceFromFile extends RichSourceFunction<String> {
    private volatile Boolean isRunning = true;

    @Override
    public void run(SourceContext ctx) throws Exception {
        BufferedReader bufferedReader = new BufferedReader(new FileReader("E:\\documents\\test.txt"));
        while (isRunning) {
            String line = bufferedReader.readLine();
            if (StringUtils.isBlank(line)) {
                continue;
            }
            ctx.collect(line);
            TimeUnit.SECONDS.sleep(60);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

1.3 Process

public class WordCountFlatMap extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private ValueState<Tuple2<String, Integer>> valueState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // 配置 StateTTL(TimeToLive)
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(3))   // 存活时间
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)  // 永远不返回过期的用户数据
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)  // 每次写操作创建和更新时,修改上次访问时间戳
            .setTimeCharacteristic(StateTtlConfig.TimeCharacteristic.ProcessingTime) // 目前只支持 ProcessingTime
            .build();

        // 创建 ValueStateDescriptor
        ValueStateDescriptor descriptor = new ValueStateDescriptor("wordCountStateDesc",
            TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
        
        // 激活 StateTTL
        descriptor.enableTimeToLive(ttlConfig);

        // 基于 ValueStateDescriptor 创建 ValueState
        valueState = getRuntimeContext().getState(descriptor);

    }

    @Override
    public void flatMap(Tuple2<String, Integer> input, Collector<Tuple2<String, Integer>> collector) throws Exception {
        Tuple2<String, Integer> currentState = valueState.value();
        
        // 初始化 ValueState 值
        if (null == currentState) {
            currentState = new Tuple2<>(input.f0, 0);
        }

        Tuple2<String, Integer> newState = new Tuple2<>(currentState.f0, currentState.f1 + input.f1);

        // 更新 ValueState 值
        valueState.update(newState);

        collector.collect(newState);
    }
}

继承 RichFlatMapFunction

  1. 初始化阶段执行 open 方法:
    1.1 配置 StateTTL(TimeToLive),具体见注释
    1.2 创建 ValueStateDescriptor,基于 ValueStateDescriptor 创建 ValueState

  2. 处理数据执行 flatMap 方法:
    2.1 初始化 ValueState
    2.2 更新 ValueState
    2.3 返回结果

注:KeyedStream 中,每个 Key 对应一个 State

1.4 测试

输入:

aaa,bbb
aaa
aaa
aaa
aaa
aaa
aaa,bbb
aaa,bbb
aaa,bbb

输出:

3> (aaa,1)
2> (bbb,1)
3> (aaa,2)
3> (aaa,3)
3> (aaa,4)
3> (aaa,5)
3> (aaa,6)
3> (aaa,7)
2> (bbb,1)
2> (bbb,2)
3> (aaa,8)
3> (aaa,9)
2> (bbb,3)

二. 自主操作State

2.1 Scala写法

2.1.1 主程序

object StateWordCount {

  def main(args: Array[String]): Unit = {
    val parameters: ParameterTool = ParameterTool.fromArgs(args)
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.getConfig.setGlobalJobParameters(parameters)

    // Checkpoint
    streamEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)

    // StateBackend
    val stateBackend: StateBackend = new FsStateBackend(new Path("file:///E:\\data\\flink"))
    streamEnv.setStateBackend(stateBackend)

    streamEnv
      .addSource(new SourceFromFile)
      .setParallelism(1)
      .flatMap(new WordCountFlatMapFunction)
      .keyBy(_._1)
      .process(new WordCountProcessFunction())
      .print()

    streamEnv.execute()
  }
}

2.1.2 Source

与上面一样,不赘述。

2.1.3 Process

class WordCountFlatMapFunction extends RichFlatMapFunction[String, (String, Integer)] {
  override def flatMap(value: String, out: Collector[(String, Integer)]): Unit = {
    val arr: Array[String] = value.split(",")
    for (item <- arr) {
      out.collect(Tuple2.apply(item, 1))
    }
  }
}

继承 RichFlatMapFunction
功能与 FlatMap 一样。

class WordCountProcessFunction extends KeyedProcessFunction[String, (String, Integer), (String, Integer)] {

  private var valueState: ValueState[(String, Integer)] = _
  private var timerState: ValueState[Long] = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)

    var valueStateDesc: ValueStateDescriptor[(String, Integer)] = new ValueStateDescriptor[(String, Integer)]("valueStateDesc",
      TypeInformation.of(classOf[(String, Integer)]))
    valueState = getRuntimeContext.getState(valueStateDesc)

    var timerStateDesc: ValueStateDescriptor[Long] = new ValueStateDescriptor[Long]("timerStateDesc",
      TypeInformation.of(classOf[Long]))
    timerState = getRuntimeContext.getState(timerStateDesc)
  }

  override def processElement(value: (String, Integer),
                              ctx: KeyedProcessFunction[String, (String, Integer), (String, Integer)]#Context,
                              out: Collector[(String, Integer)]): Unit = {
    var currentState: (String, Integer) = valueState.value()

    if (null == currentState) {
      currentState = (value._1, 0)

      // 只在 ValueState 里没有值的时候才更新定时器触发时间
      val ttlTime: Long = System.currentTimeMillis() + 3 * 60 * 1000;
      ctx.timerService().registerProcessingTimeTimer(ttlTime)

      // 保存定时器触发时间到状态中
      timerState.update(ttlTime)
    }

    var newState: (String, Integer) = (currentState._1, currentState._2 + value._2)
    valueState.update(newState)

    out.collect(newState)
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, (String, Integer), (String, Integer)]#OnTimerContext, out: Collector[(String, Integer)]): Unit = {
    super.onTimer(timestamp, ctx, out)

    System.out.println("clear...")

    valueState.clear()

    // 清除定时器
    val ttlTime = timerState.value()
    ctx.timerService().deleteProcessingTimeTimer(ttlTime)
  }
}

继承 KeyedProcessFunction

  1. 初始化阶段执行 open 方法:
    1.1 创建 ValueStateDescriptor
    1.2 基于 ValueStateDescriptor 初始化 valueState

  2. 处理数据执行 processElement 方法:
    2.1 在 valueState 为空时,初始化 currentState,并注册定时器 Timer 触发时间,同时记录定时器 Timer 触发时间到状态 timerState 中
    2.2 计算出新的 State 值并更新到 valueState
    2.3 返回结果

  3. 定时器 Timer 触发,执行 onTimer 方法:
    3.1 清除 valueState 里的值
    3.2 从状态 timerState 中取出保存的时间值,清除该时间值对应的定时器 Timer

2.1.4 测试

输入:

aaa,bbb
aaa
aaa
aaa
aaa
aaa
aaa,bbb
aaa,bbb
aaa,bbb

输出:

3> (aaa,1)
2> (bbb,1)
3> (aaa,2)
3> (aaa,3)
3> (aaa,4)
clear...
clear...
3> (aaa,1)
3> (aaa,2)
3> (aaa,3)
2> (bbb,1)
3> (aaa,4)
2> (bbb,2)
clear...
2> (bbb,3)
3> (aaa,1)
clear...
clear...

可以看出:
对于 KeyState,每个Key都对应一个ValueState。
根据代码中的逻辑:当各ValueState为空被初始化时,会更新各自对应的定时器触发时间,当各自的触发器被触发的时候执行 onTimer 方法,清除对应的 ValueState 里的值。

即:
一个key --> 一个ValueState(MapState类似) --> 一个TTL Timer
当某个 TTL Timer 被触发而调用 onTimer 方法时,在 onTimer 方法内利用 ctx.getCurrentKey() 可取到触发该函数的那个 key,此时如果调用valueState.clear()则会清除该key对应的ValueState。

注意:
这边的逻辑和上面的示例有所区别:

  • 上面示例中配置StateTtlConfig.UpdateTypeOnCreateAndWrite,则每次更新 ValueState 时都会更新 TTL Time;
  • 此处示例中只在 ValueState 为空时才更新 TTL Time,并不是每次更新 ValueState 时都更新 TTL Time。

2.2 Java写法

2.2.1 主程序

    public static void main(String[] args) throws Exception {

        final ParameterTool parameters = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(parameters);

        // Checkpoint
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

        // StateBackend
        StateBackend stateBackend = new FsStateBackend(new Path("file:///E:\\data\\flink"));
        env.setStateBackend(stateBackend);

        env
            .addSource(new SourceFromFile())
            .setParallelism(1)
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    String[] arr = value.split(",");
                    for (String item : arr) {
                        out.collect(new Tuple2<>(item, 1));
                    }
                }
            })
            .keyBy(0)
            .process(new WordCountProcess())
            .print();

        env.execute("StateWordCount");
    }

2.2.2 Source

与上面一样,不赘述。

2.2.3 Process

public class WordCountProcess extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private ValueState<Tuple2<String, Integer>> valueState;
    private ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        ValueStateDescriptor<Tuple2<String, Integer>> valueStateDescriptor = new ValueStateDescriptor<>("valueStateDesc",
            TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
            }));
        valueState = getRuntimeContext().getState(valueStateDescriptor);

        ValueStateDescriptor<Long> timerStateDescriptor = new ValueStateDescriptor<>("timerStateDesc",
            TypeInformation.of(new TypeHint<Long>() {
            }));
        timerState = getRuntimeContext().getState(timerStateDescriptor);
    }


    @Override
    public void processElement(Tuple2<String, Integer> input, Context ctx, Collector<Tuple2<String, Integer>> collector) throws Exception {
        Tuple2<String, Integer> currentState = valueState.value();

        // 初始化ValueState值
        if (null == currentState) {
            currentState = new Tuple2<>(input.f0, 0);
            Long ttlTime = System.currentTimeMillis() + 3 * 60 * 1000;
            ctx.timerService().registerProcessingTimeTimer(ttlTime);

            // 保存定时器触发时间到状态中
            timerState.update(ttlTime)
        }

        Tuple2<String, Integer> newState = new Tuple2<>(currentState.f0, currentState.f1 + input.f1);

        // 更新ValueState值
        valueState.update(newState);

        collector.collect(newState);
    }


    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
        super.onTimer(timestamp, ctx, out);

        System.out.println("clear...");

        valueState.clear();

        // 清除定时器
        val ttlTime = timerState.value()
        ctx.timerService().deleteProcessingTimeTimer(ttlTime);
    }
}

2.2.4 测试

输入:

aaa,bbb
aaa
aaa
aaa
aaa
aaa
aaa,bbb
aaa,bbb
aaa,bbb

输出:

3> (aaa,1)
2> (bbb,1)
3> (aaa,2)
3> (aaa,3)
3> (aaa,4)
clear...
clear...
3> (aaa,1)
3> (aaa,2)
3> (aaa,3)
2> (bbb,1)
2> (bbb,2)
3> (aaa,4)
clear...
2> (bbb,3)
3> (aaa,1)
clear...
clear...

逻辑与上面 Scala 写法一样,测试结果也保持一致。

需要注意一点:keyBy方法的区别:

  • keyBy(int... fields),返回的是 KeyedStream<T, Tuple>,其 key 固定为 Tuple 类型,(此例为 KeyedStream<Tuple2<String, Integer>, Tuple>),对应 Process 中继承泛型为:KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Integer>>

  • keyBy(KeySelector<T, K> key),返回的是 KeyedStream<T, K>,其 key 自定义为 K 类型(假设为String类型),则对应 Process 中继承泛型为:KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容