用两阶段聚合法解决Flink keyBy()算子倾斜

这是上周出现的问题了,简单做个记录。

有一个按平台类型实时统计用户活跃的程序,代码框架如下。

    DataStream<String> sourceStream = env
      .addSource(new FlinkKafkaConsumer011<>(
        // ...
      ));

    DataStream<UserActionRecord> watermarkedStream = sourceStream
      .map(message -> JSON.parseObject(message, UserActionRecord.class))
      .assignTimestampsAndWatermarks(
        // ...
      );

    WindowedStream<UserActionRecord, Tuple, TimeWindow> windowedStream = watermarkedStream
      .keyBy("platform")
      .window(TumblingEventTimeWindows.of(Time.minutes(1)));

    DataStream<WindowedViewSum> minutelyPartialAggStream = windowedStream
      .aggregate(new ViewAggregateFunc(), new ViewSumWindowFunc());

    minutelyPartialAggStream
      .keyBy("windowEndTimestamp")
      .process(new OutputPvUvProcessFunc(), TypeInformation.of(OutputPvUvResult.class))
      .addSink(new RedisSink<>(jedisPoolConfig, new PvUvStringRedisMapper()))
      .setParallelism(1);

就是水印→开窗→聚合→输出的经典套路。程序正常运行一段时间之后,连续报检查点超时和back pressure。

通过上面的截图,容易看出是keyBy("platform")导致大部分数据集中在了一个SubTask上,处理不过来了。由于该程序只涉及聚合,没有join,因此用两阶段聚合法很合适。在之前编写Spark程序时,我们也经常这样解决数据倾斜的问题,示例思路如下图所示。

图来自美团技术团队的博客:https://tech.meituan.com/2016/05/12/spark-tuning-pro.html

接下来修改代码,先为platform字段增加一个随机后缀(前缀后缀无所谓)。

    sourceStream
      .map(message -> {
        UserActionRecord record = JSON.parseObject(message, UserActionRecord.class);
        String platform = record.getPlatform();
        record.setPlatform(platform + "@" + ThreadLocalRandom.current().nextInt(20));
        return record;
      })

这里用ThreadLocalRandom来产生随机数,后面会写文章来唠唠它。

聚合函数的写法如下:

  public static final class ViewAggregateFunc
    implements AggregateFunction<UserActionRecord, ViewAccumulator, ViewAccumulator> {
    private static final long serialVersionUID = 1L;

    @Override
    public ViewAccumulator createAccumulator() {
      return new ViewAccumulator();
    }

    @Override
    public ViewAccumulator add(UserActionRecord record, ViewAccumulator acc) {
      if (acc.getKey().isEmpty()) {
        acc.setKey(record.getPlatform());
      }
      acc.addCount(1);
      acc.addUserId(record.getUserId());
      return acc;
    }

    @Override
    public ViewAccumulator getResult(ViewAccumulator acc) {
      return acc;
    }

    @Override
    public ViewAccumulator merge(ViewAccumulator acc1, ViewAccumulator acc2) {
      if (acc1.getKey().isEmpty()) {
        acc1.setKey(acc2.getKey());
      }
      acc1.addCount(acc2.getCount());
      acc1.addUserIds(acc2.getUserIds());
      return acc1;
    }
  }

累加器类ViewAccumulator的写法如下:

public class ViewAccumulator extends Tuple3<String, Integer, Set<String>> {
  private static final long serialVersionUID = 1L;

  public ViewAccumulator() { super("", 0, new HashSet<>(2048)); }

  public ViewAccumulator(String key, int count, Set<String> userIds) { super(key, count, userIds); }

  public String getKey() { return this.f0; }

  public void setKey(String key) { this.f0 = key; }

  public int getCount() { return this.f1; }

  public void addCount(int count) { this.f1 += count; }

  public Set<String> getUserIds() { return this.f2; }

  public void addUserId(String userId) { this.f2.add(userId); }

  public void addUserIds(Set<String> userIds) { this.f2.addAll(userIds); }
}

因为是按分钟统计UV,所以用较大的HashSet还是没有瓶颈的。如果窗口更长或者数据量非常大,就要考虑HyperLogLog了。

接下来在WindowFunction输出窗口结果时,把后缀去掉。

  public static final class ViewSumWindowFunc
    implements WindowFunction<ViewAccumulator, WindowedViewSum, Tuple, TimeWindow> {
    private static final long serialVersionUID = 1L;

    @Override
    public void apply(
      Tuple key,
      TimeWindow window,
      Iterable<ViewAccumulator> accs,
      Collector<WindowedViewSum> out) throws Exception {
      ViewAccumulator acc = accs.iterator().next();
      String type = acc.getKey();

      out.collect(new WindowedViewSum(
        type.substring(0, type.indexOf("@")),
        window.getStart(),
        window.getEnd(),
        acc.getCount(),
        acc.getUserIds()
      ));
    }
  }

最后的ProcessFunction输出最终结果时,将各条记录中的PV简单相加,UV则是将各个用户ID的集合拼在一起并计数得到。状态的存储可以用AggregatingState,但是它的文档基本为0,不想冒这个险,所以我们还是选择了传统的ListState,并自己做聚合。代码如下。

  public static final class OutputPvUvProcessFunc
    extends KeyedProcessFunction<Tuple, WindowedViewSum, OutputPvUvResult> {
    private static final long serialVersionUID = 1L;
    private static final String TIME_MINUTE_FORMAT = "yyyy-MM-dd HH:mm";
    private ListState<WindowedViewSum> state;

    @Override
    public void open(Configuration parameters) throws Exception {
      super.open(parameters);

      state = this.getRuntimeContext().getListState(new ListStateDescriptor<>(
        "state_windowed_pvuv_sum",
        WindowedViewSum.class
      ));
    }

    @Override
    public void processElement(WindowedViewSum input, Context ctx, Collector<OutputPvUvResult> out) throws Exception {
      state.add(input);
      ctx.timerService().registerEventTimeTimer(input.getWindowEndTimestamp() + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OutputPvUvResult> out) throws Exception {
      Map<String, Tuple2<Integer, Set<String>>> result = new HashMap<>();
      String timeInMinute = "";

      for (WindowedViewSum viewSum : state.get()) {
        if (timeInMinute.isEmpty()) {
          timeInMinute = new LocalDateTime(viewSum.getWindowStartTimestamp()).toString(TIME_MINUTE_FORMAT);
        }
        String key = viewSum.getKey();
        if (!result.containsKey(key)) {
          result.put(key, new Tuple2<>(0, new HashSet<>(2048)));
        }
        Tuple2<Integer, Set<String>> puv = result.get(key);
        puv.f0 += viewSum.getPv();
        puv.f1.addAll(viewSum.getUserIds());
      }

      JSONObject json = new JSONObject();
      for (Entry<String, Tuple2<Integer, Set<String>>> entry : result.entrySet()) {
        String key = entry.getKey();
        Tuple2<Integer, Set<String>> value = entry.getValue();

        json.put(key.concat("_pv"), value.f0);
        json.put(key.concat("_uv"), value.f1.size());
      }
      json.put("time", timeInMinute.substring(11));

      state.clear();
      out.collect(new OutputPvUvResult(
        timeInMinute.substring(0, 10),
        timeInMinute.substring(11),
        json.toJSONString()
      ));
    }
  }

这样处理之后,程序再也没有出过问题。查看Web UI,虽然数据在SubTask之间的分布仍然不太均匀(因为keyBy()算子是通过key的hash code来分发的),但是完全在可接受的范围内了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,080评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,422评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,630评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,554评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,662评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,856评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,014评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,752评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,212评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,541评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,687评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,347评论 4 331
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,973评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,777评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,006评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,406评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,576评论 2 349

推荐阅读更多精彩内容