Flink源码阅读之AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks的区别

任务提交任务执行分别参考后面文章。

在这里插入图片描述

从OneInputStreamTask入口,init()方法会初始化StreamInputProcessor对象,

public void init() throws Exception {
   StreamConfig configuration = getConfiguration();

   TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
   int numberOfInputs = configuration.getNumberOfInputs();

   if (numberOfInputs > 0) {
      InputGate[] inputGates = getEnvironment().getAllInputGates();

      inputProcessor = new StreamInputProcessor<>(
            inputGates,
            inSerializer,
            this,
            configuration.getCheckpointMode(),
            getCheckpointLock(),
            getEnvironment().getIOManager(),
            getEnvironment().getTaskManagerInfo().getConfiguration(),
            getStreamStatusMaintainer(),
            this.headOperator,
            getEnvironment().getMetricGroup().getIOMetricGroup(),
            inputWatermarkGauge);
   }
   headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
   getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
}

run方法中会调用StreamInputProcessor的processInput方法,

protected void run() throws Exception {
   // cache processor reference on the stack, to make the code more JIT friendly
   final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

   while (running && inputProcessor.processInput()) {
      // all the work happens in the "processInput" method
   }
}


public boolean processInput() throws Exception {
   if (isFinished) {
      return false;
   }
   if (numRecordsIn == null) {
      try {
         numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
      } catch (Exception e) {
         LOG.warn("An exception occurred during the metrics setup.", e);
         numRecordsIn = new SimpleCounter();
      }
   }

   while (true) {
      if (currentRecordDeserializer != null) {
         DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

         if (result.isBufferConsumed()) {
            currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
            currentRecordDeserializer = null;
         }

         if (result.isFullRecord()) {
            StreamElement recordOrMark = deserializationDelegate.getInstance();

            if (recordOrMark.isWatermark()) {
               // handle watermark 处理watermark
               statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
               continue;
            } else if (recordOrMark.isStreamStatus()) {
               // handle stream status
               statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
               continue;
            } else if (recordOrMark.isLatencyMarker()) {
               // handle latency marker
               synchronized (lock) {
                  streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
               }
               continue;
            } else {
               // now we can do the actual processing
               //  到这开始处理正常数据
               StreamRecord<IN> record = recordOrMark.asRecord();
               synchronized (lock) {
                  numRecordsIn.inc();
                  streamOperator.setKeyContextElement1(record);
                  streamOperator.processElement(record);
               }
               return true;
            }
         }
      }

      final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
      if (bufferOrEvent != null) {
         if (bufferOrEvent.isBuffer()) {
            currentChannel = bufferOrEvent.getChannelIndex();
            currentRecordDeserializer = recordDeserializers[currentChannel];
            currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
         }
         else {
            // Event received
            final AbstractEvent event = bufferOrEvent.getEvent();
            if (event.getClass() != EndOfPartitionEvent.class) {
               throw new IOException("Unexpected event: " + event);
            }
         }
      }
      else {
         isFinished = true;
         if (!barrierHandler.isEmpty()) {
            throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
         }
         return false;
      }
   }
}

接下来调用OneInputStreamOperator的processElement方法,实现类如下


在这里插入图片描述

常用的是TimestampsAndPunctuatedWatermarksOperator和TimestampsAndPeriodicWatermarksOperator
先看下TimestampsAndPunctuatedWatermarksOperator

public void processElement(StreamRecord<T> element) throws Exception {
   final T value = element.getValue();
   final long newTimestamp = userFunction.extractTimestamp(value,
         element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

   output.collect(element.replace(element.getValue(), newTimestamp));

   final Watermark nextWatermark = userFunction.checkAndGetNextWatermark(value, newTimestamp);
   if (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) {
      currentWatermark = nextWatermark.getTimestamp();
      output.emitWatermark(nextWatermark);
   }
}

先调用用户自定义的extractTimestamp方法获取时间戳,先判断element有没有时间戳,没有则传入Long.MIN_VALUE

@Override
public long extractTimestamp(Tuple3<String, String, Long> element, long currentTimestamp) {
    System.out.println("=======WatermarkAssigner:" + currentTimestamp);
    try {
        long etlTime = element.f2;
        currentMaxTimestamp = Math.max(etlTime, currentMaxTimestamp);
        return etlTime;
    } catch (Exception e) {
        LOG.error("the  orderTime parse fail! " + element.toString(), e);
    }
    return 0l;
}

将元素的timestamp赋值为eventTime,
element.replace(element.getValue(), newTimestamp)

然后调用自定义的checkAndGetNextWatermark获取下一个时间戳,将newTimestamp传入

@Override
public Watermark checkAndGetNextWatermark(Tuple3<String, String, Long> lastElement, long watermarkTimestamp) {
    System.out.println("========currentMaxTimestamp:" + currentMaxTimestamp);
    return new Watermark(currentMaxTimestamp - DELAY_TIME);
}

所以第一个watermark时间就是第一个元素的eventTime-DELAY_TIME,从第二个开始Math.max(eventTime, currentMaxTimestamp)-DELAY_TIME,调用顺序是先extractTimestamp后
checkAndGetNextWatermark。每个元素调用一次。
测试代码

public class WaterMarkTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(200L);
        env.setParallelism(1);
        env.addSource(new StreamDataSource()).assignTimestampsAndWatermarks(new PunctuatedWatermarkAssigner()).print();
        env.execute();
    }
}

public class PunctuatedWatermarkAssigner implements AssignerWithPunctuatedWatermarks<Tuple3<String, String, Long>> {
    private static final Logger LOG = LoggerFactory.getLogger(PunctuatedWatermarkAssigner.class);
    private Long currentMaxTimestamp = 0l;
    //水印延迟时间
    private static final Long DELAY_TIME = 3 * 1000l;

    /**
     * 再执行该函数,watermarkTimestamp的值是方法extractTimestamp()的返回值
     *
     * @param lastElement        数据流元素
     * @param watermarkTimestamp
     * @return
     */
    @Override
    public Watermark checkAndGetNextWatermark(Tuple3<String, String, Long> lastElement, long watermarkTimestamp) {
        System.out.println("========currentwatermark:" + (currentMaxTimestamp-DELAY_TIME));
        return new Watermark(currentMaxTimestamp - DELAY_TIME);
    }

    /**
     * 先执行该函数,从element中提取时间戳
     *
     * @param element          数据流元素
     * @param currentTimestamp 当前的系统时间
     * @return 数据的事件时间戳,触发器Trigger中的时间也是这个返回值
     */
    @Override
    public long extractTimestamp(Tuple3<String, String, Long> element, long currentTimestamp) {
        System.out.println("=======WatermarkAssigner:" + currentTimestamp);
        try {
            long etlTime = element.f2;
            currentMaxTimestamp = Math.max(etlTime, currentMaxTimestamp);
            return etlTime;
        } catch (Exception e) {
            LOG.error("the  orderTime parse fail! " + element.toString(), e);
        }
        return 0l;
    }
}

结果

=======WatermarkAssigner:-9223372036854775808
(a,1,1551169050000)
========currentwatermark:1551169047000
=======WatermarkAssigner:-9223372036854775808
(aa,33,1551169064001)
========currentwatermark:1551169061001
=======WatermarkAssigner:-9223372036854775808
(a,2,1551169054000)
========currentwatermark:1551169061001
=======WatermarkAssigner:-9223372036854775808
(a,3,1551169064002)
========currentwatermark:1551169061002
=======WatermarkAssigner:-9223372036854775808
(b,5,1551169100000)
========currentwatermark:1551169097000
=======WatermarkAssigner:-9223372036854775808
(a,4,1551169079003)
========currentwatermark:1551169097000
=======WatermarkAssigner:-9223372036854775808
(aa,44,1551169079004)
========currentwatermark:1551169097000
=======WatermarkAssigner:-9223372036854775808
(b,6,1551169108000)
========currentwatermark:1551169105000

再看下TimestampsAndPeriodicWatermarksOperator

@Override
public void processElement(StreamRecord<T> element) throws Exception {
   final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
         element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

   output.collect(element.replace(element.getValue(), newTimestamp));
}

只调用了extractTimestamp,没有调用getCurrentWatermark
同样是先判断当前element有没有timestamp,如果没有则给Long.MIN_VALUE传入extractTimestamp,将返回的eventTime作为时间戳。没有调用getCurrentWatermark方法是因为周期性生成watermark
测试代码

public class WaterMarkTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(200L);//200ms生成一次watermark
        env.setParallelism(1);
        env.addSource(new StreamDataSource()).assignTimestampsAndWatermarks(new PeriodicWatermarkAssigner()).print();
        env.execute();
    }
}

public class PeriodicWatermarkAssigner implements AssignerWithPeriodicWatermarks<Tuple3<String, String, Long>> {
    private static final Logger LOG = LoggerFactory.getLogger(PeriodicWatermarkAssigner.class);
    private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private Long currentMaxTimestamp = 0l;
    //水印延迟时间
    private static final Long DELAY_TIME = 0 * 1000l;
    private Watermark watermark = null;

    @Override
    public Watermark getCurrentWatermark() {
        watermark = new Watermark(currentMaxTimestamp - DELAY_TIME);
        System.out.println("currentMaxTimestamp:" + currentMaxTimestamp + " watermark: " + watermark.getTimestamp());
        return watermark;
    }

    @Override
    public long extractTimestamp(Tuple3<String, String, Long> element, long l) {
        try {
            System.out.println("WatermarkAssigner:" + l);
            long etlTimestamp = element.f2;
            currentMaxTimestamp = Math.max(etlTimestamp, currentMaxTimestamp);
            return etlTimestamp;
        } catch (Exception e) {
            LOG.error("the orderTime parse fail! " + element.toString(), e);
        }
        return 0l;
    }
}

结果

WatermarkAssigner:-9223372036854775808
(a,1,1551169050000)
currentMaxTimestamp:1551169050000 watermark: 1551169050000
currentMaxTimestamp:1551169050000 watermark: 1551169050000
currentMaxTimestamp:1551169050000 watermark: 1551169050000
currentMaxTimestamp:1551169050000 watermark: 1551169050000
WatermarkAssigner:-9223372036854775808
(aa,33,1551169064001)
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
WatermarkAssigner:-9223372036854775808
(a,2,1551169054000)
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
currentMaxTimestamp:1551169064001 watermark: 1551169064001
WatermarkAssigner:-9223372036854775808
(a,3,1551169064002)
currentMaxTimestamp:1551169064002 watermark: 1551169064002
currentMaxTimestamp:1551169064002 watermark: 1551169064002
currentMaxTimestamp:1551169064002 watermark: 1551169064002
currentMaxTimestamp:1551169064002 watermark: 1551169064002
currentMaxTimestamp:1551169064002 watermark: 1551169064002
WatermarkAssigner:-9223372036854775808
(b,5,1551169100000)
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
WatermarkAssigner:-9223372036854775808
(a,4,1551169079003)
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
WatermarkAssigner:-9223372036854775808
(aa,44,1551169079004)
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
currentMaxTimestamp:1551169100000 watermark: 1551169100000
WatermarkAssigner:-9223372036854775808
(b,6,1551169108000)
currentMaxTimestamp:1551169108000 watermark: 1551169108000
currentMaxTimestamp:1551169108000 watermark: 1551169108000
currentMaxTimestamp:1551169108000 watermark: 1551169108000
currentMaxTimestamp:1551169108000 watermark: 1551169108000
currentMaxTimestamp:1551169108000 watermark: 1551169108000
currentMaxTimestamp:1551169108000 watermark: 1551169108000

为何element本来都没有timestamp,都是long的最小值?
那生产实际运行的程序验证一下,结果也是如此

====previousElementTimestamp:-9223372036854775808
orderid =004247789642,orderitemid = 00424778964207,etl_time = 2019-08-27 17:11:48,current wartermark = 2019-08-27 17:11:28
====previousElementTimestamp:-9223372036854775808
orderid =004247sss789642,orderitemid = 00424778964207,etl_time = 2019-08-27 17:11:48,current wartermark = 2019-08-27 17:11:28
====previousElementTimestamp:-9223372036854775808
orderid =004247aaas789642,orderitemid = 00424778964207,etl_time = 2019-08-27 17:11:48,current wartermark = 2019-08-27 17:11:28

那么什么情况下element会有timestamp?
只有在以TimeCharacteristic.IngestionTime进行处理时则element会被打上进入系统的时间

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

设置后结果如下:

=======WatermarkAssigner:1566899708826
(a,1,1551169050000)
========currentwatermark:1551169047000
=======WatermarkAssigner:1566899709850
(aa,33,1551169064001)
========currentwatermark:1551169061001
=======WatermarkAssigner:1566899710851
(a,2,1551169054000)
========currentwatermark:1551169061001
=======WatermarkAssigner:1566899711851
(a,3,1551169064002)
========currentwatermark:1551169061002
=======WatermarkAssigner:1566899712851
(b,5,1551169100000)
========currentwatermark:1551169097000
=======WatermarkAssigner:1566899713855
(a,4,1551169079003)
========currentwatermark:1551169097000
=======WatermarkAssigner:1566899714855
(aa,44,1551169079004)
========currentwatermark:1551169097000
=======WatermarkAssigner:1566899715855
(b,6,1551169108000)
========currentwatermark:1551169105000

总结:
AssignerWithPeriodicWatermarks 周期性的生成watermark,生成间隔可配置,根据数据的eventTime来更新watermark时间
AssignerWithPunctuatedWatermarks 不会周期性生成watermark,只根据元素
的eventTime来更新watermark。
当用EventTime和ProcessTime来计算时,元素本身都是不带时间戳的,只有以IngestionTime计算时才会打上进入系统的时间戳。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352

推荐阅读更多精彩内容