Process Function (Low-level Operations)

原文地址


The ProcessFunction

ProcessFunction是一个低级的流处理操作,可以访问所有(非循环)流应用程序的基本组件:

  • Events(流元素)
  • state (容错, 一致性,只在KeyedStream上)
  • timers (事件时间和处理时间,只在KeyedStream上)

可以将ProcessFunction看做是具备访问keyed状态和定时器的FlatMapFunction。它通过invoked方法处理从输入流接收到的每个事件。

对于容错状态,ProcessFunction通过RuntimeContext访问Flink的keyed状态,类似于有状态的函数访问keyed状态。
定时器允许应用程序基于处理时间和事件时间响应变化。每次调用函数的 processElement(...)方法获得一个Context对象,它可以访问元素的事件时间戳,和对TimerService的访问。TimerService可以用来为将来的事件/处理时间注册回调。当定时器的达到定时时间时,会调用onTimer(...) 方法。
注意:您想访问keyed状态和定时器,则必须在键控流上应用ProcessFunction:

stream.keyBy(...).process(new MyProcessFunction())

Low-level Joins

为了在两个输入上实现低级别的操作,应用可以使用CoProcessFunction。。该函数绑定到两个不同的输入,并为不同的输入记录分别单独调用processElement1(…)和processElement2(…)。
实现低级别join通常遵循以下模式:

  • 为一个(或两个)输入创建一个状态对象。
  • 当从输入源收到元素时,更新状态
  • 当从输入源收到元素时,探测状态并生成join的结果。

例如,当为客户数据保存状态时,你可能会join客户数据和财务交易。If you care about having complete and deterministic joins in the face of out-of-order events, you can use a timer to evaluate and emit the join for a trade when the watermark for the customer data stream has passed the time of that trade.

Example

下面的示例为每个键进行计数,并当超过一分钟(事件时间)没有更新key的计数,则下发key/计数对:

  • 数量,key和最后一次更新时间都存储在ValueState中,它由key隐式关联。
  • 对于每个记录,ProcessFunction将数量加一并且设置最后一次更新时间
  • function将会在一分钟后(事件时间)调度回调
  • 每当回调执行时,它检查回调的事件时间戳和存储的最后一次更新时间,如果匹配(也就是说,一分钟内没有更新)则下发key/count对

注意:这个简单的示例可以通过会话窗口实现。我们在这里使用ProcessFunction来说明它提供的基本模式。

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;


// the source data stream
DataStream<Tuple2<String, String>> stream = ...;

// apply the process function onto a keyed stream
DataStream<Tuple2<String, Long>> result = stream
    .keyBy(0)
    .process(new CountWithTimeoutFunction());

/**
 * The data type stored in the state
 */
public class CountWithTimestamp {

    public String key;
    public long count;
    public long lastModified;
}

/**
 * The implementation of the ProcessFunction that maintains the count and timeouts
 */
public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {

    /** The state that is maintained by this process function */
    private ValueState<CountWithTimestamp> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // retrieve the current count
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // update the state's count
        current.count++;

        // set the state's timestamp to the record's assigned event time timestamp
        current.lastModified = ctx.timestamp();

        // write the state back
        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check if this is an outdated timer or the latest timer
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
}

注意:在Flink1.4.0之前,当调用来自处理时间的定时器时,ProcessFunction.onTimer()方法设置当前处理时间为事件时间戳。这种行为非常微妙,用户可能不会注意到。嗯,这是有害的,因为处理时间是不确定的并且不与水位对齐。此外,用户实现的逻辑依赖于这个错误的时间戳很可能是无意的错误。因此我们决定修复它。在升级到1.4.0时,使用这个错误事件时间戳的Flink作业将会失败,用户应该将他们的工作调整为正确的逻辑。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,651评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,468评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,931评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,218评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,234评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,198评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,084评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,926评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,341评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,563评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,731评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,430评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,036评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,676评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,829评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,743评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,629评论 2 354

推荐阅读更多精彩内容

  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 172,079评论 25 707
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,652评论 18 139
  • 如果不是看见照片 他的笑颜 红了的眼 我不会想起 曾经…… 梦见过他
    花木雀跃是紫紫阅读 246评论 0 0
  • 七月似火, 我来到白沙溪畔, 倚靠一棵松树, 粗糙的树皮 书写岁月的沧桑, 蚂蚁自由徜徉, 松脂不断流淌, 松果迅...
    金赛月阅读 303评论 4 8
  • 你好吗?
    秦声风韵阅读 107评论 0 0