Flink BroadcastStream

假设存在这样一种场景,需要实时对运行在我们集群上的程序进行日志监控。但是程序的监控规则经常变更。这个时候就需要我们在处理各程序日志数据的时候要实时和当前程序的监控规则进行匹配判断,而且监控规则的变更要实时的被我们处理逻辑感知到。

这个时候就可以使用广播状态,将程序的日志数据看做是一个流ActionStream,监控规则数据也看做是一个流RuleStream,将RuleStream流中数据下发到ActionStream流中,使得在ActionStream流中每一个Task都能获取到RuleStream流中所有数据。这种行为称为广播,RuleStream流称为广播流,ActionStream称为非广播流,流入到ActionStream流中的rule数据称之为广播数据,放入到Flink的状态中就称之为广播状态。

下边我们就通过一个简单的例子来学习理解一下

我们将kafka topic名为test中的数据读取到flink中作为非广播流--actionStream,kafka topic名为test_1中的数据读取到flink中作为广播流--ruleStream。当我们处理actionStream中每条记录时去和当前ruleStream最新的记录进行最简单的连接操作。我们通过BroadcastState(本质上是一个MapState)这个广播状态来保存ruleStream中的最新记录。

主要的操作步骤

1.定义一个广播流

// 广播状态描述
val broadcastStateDesc: MapStateDescriptor[String, String] =
    new MapStateDescriptor[String, String]("broadcast-desc", classOf[String], classOf[String])
// 将普通的非广播流转为广播流
val ruleStream: BroadcastStream[String] = normalStream.broadcast(broadcastStateDesc) 

将一个正常非广播流转化为广播流时需要指定它的广播状态描述,并且只能是 MapStateDescriptor类型,在后续的处理中可通过该描述获取到广播状态。

2.连接非广播流和广播流

val connectedStream: BroadcastConnectedStream[(String, Int), String] = actionStream.connect(ruleStream)

通过connect算子来将两条流连接在一起,此时广播流ruleStream就会被广播到非广播流actionStream中,得到的是一个BroadcastConnectedStream的流。BroadcastConnectedStream流本质上包含了广播流ruleStream和非广播流actionStream。

3.后续通过process算子来处理BroadcastConnectedStream流

connectedStream.process(...)

此时process算子中的参数类型会根据非广播流actionStream的类型分为两种。如果actionStream有经过keyBy算子操作后转为KeyedStream类型那么process()中为KeyedBroadcastProcessFunction否则为BroadcastProcessFunction。(此处就只简单使用下KeyedBroadcastProcessFunction,它们两具体的区别和功能大家去参考下官网哈)。在使用上都有两个方法:processElement处理非connected流数据并且只可读取广播状态,processBroadcastElement处理connectedStream流数据并且可读写广播状态。因为flink里面没有跨任务通信的机制,在一个任务实例中的修改不能在并行任务间传递。 得保证BroadcastState在算子的并行实例是相同的,所以不能让单个任务去修改状态,只能让广播方修改。

4.代码如下

  def main(args: Array[String]): Unit = {
    // 获取执行流处理引擎
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 行为流 -- 非广播流
    val actionStream: KeyedStream[(String, Int), String] = env
      .addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), initProps()))
      .map((_, 1))
      .keyBy(new KeySelector[(String, Int), String] {
        override def getKey(in: (String, Int)): String = in._1
      })

    // 广播状态描述
    val broadcastStateDesc: MapStateDescriptor[String, String] =
      new MapStateDescriptor[String, String]("broadcast-desc", classOf[String], classOf[String])

    // 规则流 -- 广播流
    val ruleStream: BroadcastStream[String] = env
      .addSource(new FlinkKafkaConsumer010[String]("test_1", new SimpleStringSchema(), initProps()))
      .broadcast(broadcastStateDesc) // 将基础流转为广播流的时候需要指定广播流的描述信息

    // 使用connect算子将 主体基本流 和 广播流连接起来
    val connectedStream: BroadcastConnectedStream[(String, Int), String] = actionStream.connect(ruleStream)

    // 处理连接流数据
    connectedStream
      .process(new MyKeyedBroadcastProcessFunction(broadcastStateDesc))
      .print()

    env.execute("broadcast_stream")
  }
class MyKeyedBroadcastProcessFunction(broadcastStateDesc: MapStateDescriptor[String, String])
  extends KeyedBroadcastProcessFunction[String, (String, Int), String, String] {
  // 每当 主体基本流新增一条记录,该方法就会执行一次
  override def processElement(in1: (String, Int),
                              readOnlyCtx: KeyedBroadcastProcessFunction[String, (String, Int), String, String]#ReadOnlyContext,
                              collector: Collector[String]): Unit = {
    // 从 广播状态中根据key获取数据(规则数据)
    val ruleString: String = readOnlyCtx.getBroadcastState(broadcastStateDesc).get("rule")
    collector.collect(in1 + ruleString)
  }

  // 每当 广播流新增一条记录,该方法就会执行一次
  override def processBroadcastElement(in2: String,
                                       ctx: KeyedBroadcastProcessFunction[String, (String, Int), String, String]#Context,
                                       collector: Collector[String]): Unit = {
    // 获取广播状态并更新状态数据(规则数据)
    ctx.getBroadcastState(broadcastStateDesc).put("rule", in2)
  }
}
01_broadcast_stream_demo.png
下边说明需要注意的是:我们仅对BroadcastState中key为"rule"对应的value值进行更改操作

第一步,生产数据step_01到test_1中,此时控制台没有打印信息
       但此时,我们已经将step_01这条数据存放到了broadcastState中
       
第二步,生产数据step_02到test中,此时控制台打印信息(step_02,1)step_01
       说明actionStream中的数据和从broadcastState捕获到规则数据拼接并打印
       
第三步,生产数据step_03到test中,actionStream中的数据和从broadcastState捕获到规则数据拼接并打印

第四步,生产数据step_04到test_1中,此时控制台没有打印信息
      但此时,我们已经更新了broadcastState中存放的数据为step_04
      
第五步,生产数据step_05到test中,根据打印信息可以得知,我们当前获取到了broadcastState中最新的值
      也就是,我们的非广播流数据实时的感知到广播流数据的流动情况

以后我们遇到这种监控程序日志时,监控规则经常变更的需求时就可以考虑使用广播流来进行处理。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,163评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,301评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,089评论 0 352
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,093评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,110评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,079评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,005评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,840评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,278评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,497评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,394评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,980评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,628评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,649评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,548评论 2 352

推荐阅读更多精彩内容

  • 了解Flink是什么,Flink应用程序运行的多样化,对比业界常用的流处理框架,Flink的发展趋势,Flink生...
    JavaEdge阅读 5,072评论 1 18
  • 架构 Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink设计为在...
    盗梦者_56f2阅读 37,798评论 0 6
  • 今天的超长视频聊天。很自在,很舒服。 亀亀那个眼镜真的很不错呢。等会亀亀还要去开车,19号还要考试。心疼亀亀,但只...
    WoodSage阅读 147评论 0 1
  • 【一周配方推荐】女性生理问题用油 1、女性抑郁情绪问题:首先玫瑰,其次仕女; 2、乳腺增生:乳香、野橘、天竺葵,柑...
    时明妍苏州评弹阅读 721评论 0 2
  • 文 / 芹菜 最近,我真的特别忙。忙碌到让我怀疑参加弗兰克的写作营是否是一个正确的选择。 前天的打卡是凌晨2点写的...
    芹菜qincai阅读 206评论 0 1