Flink State

  1. Creating state
  2. Cleaning up state (TTL / clear)
  3. Storing state (distributed)
  4. Restoring state

Flink distinguishes two kinds of state:

  1. Keyed State (bound to a key; used on a KeyedStream)
  2. Operator State (bound to one parallel operator instance, so it depends on the parallelism)

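Flink state exists in two forms:

  1. Managed (held in data structures controlled by the Flink runtime)
  2. Raw (kept in the operator's own data structures; when a checkpoint is taken it is written out as a byte array, so Flink knows nothing about its original type)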

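The number of key groups equals the configured maximum parallelism. Key groups are the unit in which keyed state is redistributed when the parallelism changes.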
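To make the key-group idea concrete, here is a plain-Scala sketch (no Flink dependency) of the two-step mapping key → key group → subtask. The subtask formula `keyGroup * parallelism / maxParallelism` matches the one Flink uses to assign key groups to operator instances. The hash step is an assumption: Flink runs `key.hashCode` through its own murmur hash, while this sketch uses `Math.floorMod` on the raw hash code for readability. `KeyGroupSketch` and its methods are hypothetical names.

```scala
// Plain-Scala model of the two-step mapping: key -> key group -> parallel subtask.
// ASSUMPTION: Flink applies its own murmur hash to key.hashCode before the modulo;
// Math.floorMod on the raw hashCode is used here only to keep the sketch dependency-free.
object KeyGroupSketch {

  // Every key falls into exactly one of maxParallelism key groups: [0, maxParallelism).
  def keyGroupFor(key: Any, maxParallelism: Int): Int =
    Math.floorMod(key.hashCode, maxParallelism)

  // Subtask that owns a key group; each subtask ends up with a contiguous range of groups.
  def operatorIndexForKeyGroup(keyGroup: Int, parallelism: Int, maxParallelism: Int): Int =
    keyGroup * parallelism / maxParallelism

  def subtaskForKey(key: Any, parallelism: Int, maxParallelism: Int): Int =
    operatorIndexForKeyGroup(keyGroupFor(key, maxParallelism), parallelism, maxParallelism)

  def main(args: Array[String]): Unit = {
    val maxParallelism = 128
    val parallelism = 4
    // Print the key-group range owned by each subtask.
    (0 until maxParallelism)
      .groupBy(kg => operatorIndexForKeyGroup(kg, parallelism, maxParallelism))
      .toSeq
      .sortBy(_._1)
      .foreach { case (subtask, groups) =>
        println(s"subtask $subtask owns key groups ${groups.min}..${groups.max}")
      }
  }
}
```

Because every subtask owns a range of whole key groups, changing the parallelism only moves key groups between subtasks; a single key group is never split.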

1. Keyed State

1.1 Managed Keyed State

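State type                | Description                                                                 | API
ValueState<T>             | a single value                                                              | update(T), value()
ListState<T>              | a list of values                                                            | add(T), addAll(List<T>), get(), update(List<T>)
ReducingState<T>          | a single value: the aggregate of all values added to the state              | add(T), get()
AggregatingState<IN, OUT> | a single value like ReducingState, but the aggregate (OUT) may have a different type than the added elements (IN) | add(IN), get()
MapState<UK, UV>          | a set of key-value mappings                                                 | put, putAll, get, entries, keys, values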

Every state type supports clear(), which removes the state for the key of the current input element.

State is accessed through the RuntimeContext, so it can only be used in rich functions (e.g. RichFlatMapFunction).
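As an illustration of the MapState API from the table, here is a hypothetical rich function (`PageVisitCounter` and its input/output shapes are made up for this sketch) that counts, per user key, how often each page was visited. It obtains its state handle in `open()` through the RuntimeContext and must be applied to a KeyedStream, since MapState is keyed state.

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Hypothetical example. Input: (userId, page); output: (userId, page, visitsSoFar).
class PageVisitCounter extends RichFlatMapFunction[(String, String), (String, String, Long)] {

  // Scoped to the current key: every userId gets its own page -> count map.
  @transient private var visits: MapState[String, java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    // State handles are only available through the RuntimeContext of a rich function.
    visits = getRuntimeContext.getMapState(
      new MapStateDescriptor[String, java.lang.Long]("page-visits", classOf[String], classOf[java.lang.Long])
    )
  }

  override def flatMap(in: (String, String), out: Collector[(String, String, Long)]): Unit = {
    val (user, page) = in
    // get() returns null if this user has no entry for the page yet.
    val previous = Option(visits.get(page)).map(_.longValue).getOrElse(0L)
    val updated = previous + 1
    visits.put(page, updated)
    out.collect((user, page, updated))
  }
}
```

Typical use: `clicks.keyBy(_._1).flatMap(new PageVisitCounter)`, where `clicks` is assumed to be a `DataStream[(String, String)]`.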

1.2 Implementing a count window with state

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala.createTypeInformation

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  private var sum: ValueState[(Long, Long)] = _

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {

    // access the state value
    val tmpCurrentSum = sum.value

    // If it hasn't been used before, it will be null
    val currentSum = if (tmpCurrentSum != null) {
      tmpCurrentSum
    } else {
      (0L, 0L)
    }

    // update the count
    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)

    // update the state
    sum.update(newSum)

    // if the count reaches 2, emit the average and clear the state
    if (newSum._1 >= 2) {
      out.collect((input._1, newSum._2 / newSum._1))
      sum.clear()
    }
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    )
  }
}

object ExampleCountWindowAverage {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromCollection(List(
      (1L, 3L),
      (1L, 5L),
      (1L, 7L),
      (1L, 4L),
      (1L, 2L)
    )).keyBy(_._1)
      .flatMap(new CountWindowAverage())
      .print()
    // the printed output will be (1,4) and (1,5)

    env.execute("ExampleManagedState")
  }
}
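The expected output in the comment above can be checked by hand: the state holds `(count, sum)` and is cleared after every second element. The following plain-Scala trace (no Flink involved; `CountWindowTrace` is a hypothetical name) replays that logic for the five inputs of key 1.

```scala
// Plain-Scala trace (no Flink) of CountWindowAverage for a single key:
// the state is (count, sum); once count reaches windowSize, emit sum / count and reset.
object CountWindowTrace {

  def averages(values: Seq[Long], windowSize: Long = 2L): (Seq[Long], (Long, Long)) = {
    var state = (0L, 0L) // plays the role of the ValueState; (0, 0) is the "null / cleared" case
    val emitted = Seq.newBuilder[Long]
    for (v <- values) {
      state = (state._1 + 1, state._2 + v)
      if (state._1 >= windowSize) {
        emitted += state._2 / state._1 // Long division: 11 / 2 == 5
        state = (0L, 0L)                // corresponds to sum.clear()
      }
    }
    (emitted.result(), state)
  }

  def main(args: Array[String]): Unit = {
    val (out, leftover) = averages(Seq(3L, 5L, 7L, 4L, 2L))
    println(s"emitted: ${out.mkString(", ")}; left in state: $leftover")
  }
}
```

The trace emits 4 (from 3 and 5) and 5 (from 7 and 4, with Long division) and leaves `(1,2)` in state: the last element never produces output because no second element arrives for key 1.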

1.3 State TTL

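  1. State can be cleaned up automatically once its configured TTL (time-to-live) has expired.
  2. TTL is tracked per element: in collection state (ListState, MapState) every list element or map entry expires on its own.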

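By default (as of Flink 1.8), expired values are only removed when they are read, e.g. by calling ValueState.value(). The cleanup strategies in the following table remove expired data in additional situations.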
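A toy model of on-read cleanup may help. The store below is a hypothetical illustration (`LazyTtlStore` is a made-up name), not Flink's implementation: each value records when it was last written, and only a read after the TTL has passed deletes it. Expired entries that are never read therefore keep occupying space until some other cleanup removes them.

```scala
import scala.collection.mutable

// Toy model, NOT Flink's implementation: expired values are removed only when they are read.
// Each entry remembers when it was last written; the clock is passed in explicitly.
final class LazyTtlStore[K, V](ttlMillis: Long) {
  private val entries = mutable.Map.empty[K, (V, Long)] // key -> (value, lastWriteTime)

  def put(key: K, value: V, now: Long): Unit =
    entries(key) = (value, now)

  def get(key: K, now: Long): Option[V] = entries.get(key) match {
    case Some((value, writtenAt)) if now - writtenAt < ttlMillis =>
      Some(value)
    case Some(_) =>
      entries.remove(key) // the cleanup happens here, on the read path
      None
    case None =>
      None
  }

  // Expired entries that are never read still count here: they keep using space.
  def size: Int = entries.size
}
```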

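Cleanup strategy              | Description
cleanupFullSnapshot           | expired entries are left out when a full state snapshot is taken (the local state itself is not cleaned up)
cleanupInBackground           | enables the default background cleanup of the state backend in use, if that backend supports it
cleanupInRocksdbCompactFilter | RocksDB backend only: expired entries are dropped by a Flink compaction filter while RocksDB compacts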
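A minimal TTL configuration sketch, following the `StateTtlConfig` builder API as documented for Flink 1.8 (signatures may differ in later releases): values expire one second after they were created or last written, expired values are never returned to user code, and the cleanupFullSnapshot strategy from the table above is enabled. The object name `TtlConfigExample` and the descriptor name "text state" are arbitrary.

```scala
import org.apache.flink.api.common.state.{StateTtlConfig, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time

object TtlConfigExample {

  val ttlConfig: StateTtlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))                                           // TTL of one second
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)             // TTL restarts on create and write
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // never return an expired value
    .cleanupFullSnapshot()                                                 // leave expired entries out of full snapshots
    .build()

  // TTL is enabled per state descriptor.
  val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
  stateDescriptor.enableTimeToLive(ttlConfig)
}
```

The descriptor is then passed to `getRuntimeContext.getState(...)` in `open()` exactly like a descriptor without TTL.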

1.4 Using state in the (Scala) DataStream API

Stateless API | Stateful variant (Scala API) | Managed keyed state used
map           | mapWithState                 | ValueState
flatMap       | flatMapWithState             | ValueState

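// `stream` stands for any DataStream[(String, Int)]; its source is elided
val stream: DataStream[(String, Int)] = ...

// per key: emit the total seen BEFORE this element, and store the updated total as the new state
val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })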
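The table also lists `flatMapWithState`. As a sketch, the count-window average from 1.2 can be rewritten with it: the function receives the element and the current `Option` state and returns the records to emit plus the new state, where returning `None` clears the state for the key. `FlatMapWithStateExample` and `step` are hypothetical names.

```scala
import org.apache.flink.streaming.api.scala._

object FlatMapWithStateExample {

  // One step per element: (input, state for this key) => (records to emit, new state).
  // The state is (count, sum); returning None clears it for the key.
  def step(in: (Long, Long), state: Option[(Long, Long)]): (List[(Long, Long)], Option[(Long, Long)]) = {
    val (count, sum) = state.getOrElse((0L, 0L))
    val (newCount, newSum) = (count + 1, sum + in._2)
    if (newCount >= 2) (List((in._1, newSum / newCount)), None) // emit the average, clear the state
    else (Nil, Some((newCount, newSum)))                         // emit nothing, keep the state
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromCollection(List((1L, 3L), (1L, 5L), (1L, 7L), (1L, 4L), (1L, 2L)))
      .keyBy(_._1)
      .flatMapWithState[(Long, Long), (Long, Long)](step)
      .print()
    env.execute("FlatMapWithStateExample")
  }
}
```

Keeping the per-element logic in a plain method makes it testable without starting a job; the stateful wrapper only stores or clears whatever `Option` the method returns.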

2. Operator State

2.0 Redistributing operator state: even-split and union

Even-split redistribution:

  1. Each operator returns a List of state elements
  2. The whole state is logically a concatenation of all lists
  3. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators

Union redistribution:

  1. Each operator returns a List of state elements
  2. The whole state is logically a concatenation of all lists
  3. On restore/redistribution, each operator gets the complete list of state elements
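The two schemes can be modelled in plain Scala (no Flink). Each inner `Seq` stands for the list one parallel instance returned at checkpoint time. The exact order in which Flink hands out elements under even-split is an implementation detail; this sketch (hypothetical `RedistributionSketch`) only assumes contiguous sublists whose sizes differ by at most one.

```scala
// Plain-Scala model of how operator ListState is redistributed on restore/rescale.
// Each inner Seq is the list that one parallel instance returned when the checkpoint was taken.
// ASSUMPTION: even-split hands out contiguous sublists; Flink's exact element order may differ.
object RedistributionSketch {

  // Even-split: concatenate all lists, then cut the result into newParallelism sublists
  // whose sizes differ by at most one (a sublist may be empty).
  def evenSplit[T](snapshots: Seq[Seq[T]], newParallelism: Int): Seq[Seq[T]] = {
    require(newParallelism > 0, "parallelism must be positive")
    val all = snapshots.flatten
    val base = all.size / newParallelism
    val extra = all.size % newParallelism // the first `extra` instances get one more element
    (0 until newParallelism).map { i =>
      val start = i * base + math.min(i, extra)
      val size = base + (if (i < extra) 1 else 0)
      all.slice(start, start + size)
    }
  }

  // Union: every new instance receives the complete concatenated list.
  def union[T](snapshots: Seq[Seq[T]], newParallelism: Int): Seq[Seq[T]] = {
    require(newParallelism > 0, "parallelism must be positive")
    Seq.fill(newParallelism)(snapshots.flatten)
  }
}
```

For example, two instances that checkpointed `Seq("a", "b")` and `Seq("c", "d", "e")` and are restored with parallelism 2 get `Seq("a", "b", "c")` and `Seq("d", "e")` under even-split, but both get all five elements under union.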

2.1 Adding checkpoint hooks

  1. Whenever a checkpoint has to be performed, snapshotState() is called.

  2. The counterpart, initializeState(), is called every time the user-defined function is initialized, be that when the function is first initialized or when it is actually recovering from an earlier checkpoint.

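initializeState() is therefore:

  1. called when each parallel instance of the function is created during distributed execution, which makes it the place where the different types of state are initialized;
  2. also the place where the state recovery logic lives.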

2.1.1 The following example shows how to add checkpoint hooks to a sink:

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

class BufferingSink(threshold: Int = 0)
  extends SinkFunction[(String, Int)]
    with CheckpointedFunction {

  // operator (non-keyed) list state holding the buffered elements at checkpoint time
  @transient
  private var checkpointedState: ListState[(String, Int)] = _

  // elements received since the last flush
  private val bufferedElements = ListBuffer[(String, Int)]()

  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    bufferedElements += value
    // flush once the buffer is full (>= so that threshold = 0 flushes every element)
    if (bufferedElements.size >= threshold) {
      for (element <- bufferedElements) {
        // send it to the sink
      }
      bufferedElements.clear()
    }
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // replace the previous snapshot with the current buffer contents
    checkpointedState.clear()
    for (element <- bufferedElements) {
      checkpointedState.add(element)
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[(String, Int)](
      "buffered-elements",
      TypeInformation.of(new TypeHint[(String, Int)]() {})
    )
    // getListState => even-split redistribution on restore
    checkpointedState = context.getOperatorStateStore.getListState(descriptor)

    if (context.isRestored) { // only true when recovering from a checkpoint or savepoint
      for (element <- checkpointedState.get().asScala) {
        bufferedElements += element
      }
    }
  }
}
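Which redistribution scheme applies is decided when the state is registered: `getListState` (as in BufferingSink) gives even-split redistribution, while `getUnionListState` gives union redistribution. A minimal sketch of the union variant of BufferingSink's registration; `UnionStateSketch`, `newDescriptor` and `unionBufferState` are hypothetical names.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.state.FunctionInitializationContext

object UnionStateSketch {

  // Same name and element type as the state registered by BufferingSink.
  def newDescriptor(): ListStateDescriptor[(String, Int)] =
    new ListStateDescriptor[(String, Int)](
      "buffered-elements",
      TypeInformation.of(new TypeHint[(String, Int)]() {})
    )

  // Meant to be called from initializeState(): getUnionListState (instead of getListState)
  // selects union redistribution, so after a restore every parallel instance receives
  // the elements of ALL instances and must itself pick the ones it is responsible for.
  def unionBufferState(context: FunctionInitializationContext): ListState[(String, Int)] =
    context.getOperatorStateStore.getUnionListState(newDescriptor())
}
```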

2.1.2 The following example shows how to add checkpoint hooks to a source:

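Stateful sources need extra care: emitting a record and updating the state must happen atomically with respect to checkpoints, so both are done while holding the checkpoint lock returned by SourceContext.getCheckpointLock.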

import java.util

import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

import scala.collection.JavaConverters._

// Note: ListCheckpointed is deprecated in newer Flink releases in favor of CheckpointedFunction.
class CounterSource
  extends RichParallelSourceFunction[Long]
    with ListCheckpointed[Long] {

  @volatile
  private var isRunning = true

  // current position; this is the value that gets checkpointed
  private var offset = 0L

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    val lock = ctx.getCheckpointLock
    while (isRunning) {
      // emit and advance the offset atomically with respect to checkpoints
      lock.synchronized {
        ctx.collect(offset)
        offset += 1
      }
    }
  }

  override def cancel(): Unit = isRunning = false

  override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] = {
    util.Collections.singletonList(offset)
  }

  override def restoreState(state: util.List[Long]): Unit = {
    for (s <- state.asScala) {
      offset = s
    }
  }
}
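Checkpointing is disabled by default, so `snapshotState()` in the functions above is never called unless the job enables it. A minimal sketch (the helper name `CheckpointingSetup.enable` and the 5-second interval are assumptions) that turns on exactly-once checkpoints:

```scala
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._

object CheckpointingSetup {
  // Hypothetical helper: take an exactly-once checkpoint every `intervalMs` milliseconds.
  def enable(env: StreamExecutionEnvironment, intervalMs: Long = 5000L): StreamExecutionEnvironment =
    env.enableCheckpointing(intervalMs, CheckpointingMode.EXACTLY_ONCE)
}
```

With it, the source and sink above could be wired as, for example, `env.addSource(new CounterSource).map(n => (s"key-${n % 3}", n.toInt)).addSink(new BufferingSink(10))` before calling `env.execute(...)`.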

ref:

  1. https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html