Flink State

  1. state creation
  2. state cleanup (TTL / clear)
  3. state storage (distributed)
  4. state recovery

Flink classifies state into two kinds:

  1. Keyed State (bound to a key; only available on a KeyedStream)
  2. Operator State (bound to an operator instance; tied to the operator's parallelism)

Flink stores state in two formats:

  1. Managed (stored in data structures that Flink itself provides and understands)
  2. Raw (kept in the operator's own data structures; at checkpoint time it is serialized into a byte array, so Flink knows nothing about the original type)

The number of key groups equals the configured maximum parallelism.
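
For reference, the maximum parallelism (and with it the number of key groups) can be set on the execution environment; a minimal sketch, with 128 as an arbitrary value:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// this also fixes the number of key groups, so changing it later
// breaks savepoint compatibility for keyed state
env.setMaxParallelism(128)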

1. Keyed State

1.1 Managed Keyed State

State type | Description | API
ValueState<T> | a single value | update, value
ListState<T> | a list of values | add, addAll, get, update
ReducingState<T> | a single value (the aggregate of all values added to the state) | add(T)
AggregatingState<IN, OUT> | a single value (like ReducingState, but the aggregated output type may differ from the input type) | add(IN)
MapState<UK, UV> | a map of key/value entries | put, putAll, get, entries, keys, values

All state types also support the clear operation.

State is accessed using the RuntimeContext, so it is only possible in rich functions
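
For example, a MapState handle is obtained from the RuntimeContext in the same way as any other state; a minimal sketch counting distinct values per key (the class and descriptor names are illustrative):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration

class DistinctValueCounter extends RichMapFunction[(String, String), (String, Int)] {

  private var seen: MapState[String, Boolean] = _

  override def open(parameters: Configuration): Unit = {
    seen = getRuntimeContext.getMapState(
      new MapStateDescriptor[String, Boolean](
        "seen-values",
        createTypeInformation[String],
        createTypeInformation[Boolean]))
  }

  // emits (key, number of distinct values seen so far for that key)
  override def map(in: (String, String)): (String, Int) = {
    if (!seen.contains(in._2)) {
      seen.put(in._2, true)
    }
    var count = 0
    val it = seen.keys().iterator()
    while (it.hasNext) { it.next(); count += 1 }
    (in._1, count)
  }
}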

1.2 Implementing a window operation with state

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala.createTypeInformation

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  private var sum: ValueState[(Long, Long)] = _

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {

    // access the state value
    val tmpCurrentSum = sum.value

    // If it hasn't been used before, it will be null
    val currentSum = if (tmpCurrentSum != null) {
      tmpCurrentSum
    } else {
      (0L, 0L)
    }

    // update the count
    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)

    // update the state
    sum.update(newSum)

    // if the count reaches 2, emit the average and clear the state
    if (newSum._1 >= 2) {
      out.collect((input._1, newSum._2 / newSum._1))
      sum.clear()
    }
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    )
  }
}

object ExampleCountWindowAverage {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromCollection(List(
      (1L, 3L),
      (1L, 5L),
      (1L, 7L),
      (1L, 4L),
      (1L, 2L)
    )).keyBy(_._1)
      .flatMap(new CountWindowAverage())
      .print()
    // the printed output will be (1,4) and (1,5)

    env.execute("ExampleManagedState")
  }
}

1.3 state TTL

  1. State can be cleaned up automatically once its TTL expires
  2. TTL is applied per element, down to each entry of a collection state (a list or map of values)

By default, expired values are only removed when the state is read back.

Cleanup strategy | Description
cleanupFullSnapshot | removes expired entries when a full snapshot (savepoint or full checkpoint) is taken; the local state size is not reduced until a restore
cleanupInBackground | lets the state backend clean up expired entries in the background using its default strategy (incremental cleanup for heap backends, a compaction filter for RocksDB)
cleanupInRocksdbCompactFilter | filters expired entries out while RocksDB runs its compaction
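
A minimal sketch of enabling TTL on a state descriptor (the one-day TTL, descriptor name, and cleanup strategy are illustrative choices):

import org.apache.flink.api.common.state.{StateTtlConfig, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
  .newBuilder(Time.days(1))                                  // time to live
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // refresh TTL on writes only
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .cleanupFullSnapshot()                                     // also drop expired entries in full snapshots
  .build()

val descriptor = new ValueStateDescriptor[String]("my-state", classOf[String])
descriptor.enableTimeToLive(ttlConfig)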

1.4 Using state through the DataStream API

Stateless API | Stateful API | Managed Keyed State
map | mapWithState | ValueState
flatMap | flatMapWithState | ValueState

For example, keeping a running count per key with mapWithState:

val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })
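
flatMapWithState works the same way but lets each input produce zero or more outputs; a minimal sketch with the same state shape (behavior chosen for illustration):

val counts2: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .flatMapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      // emit the count accumulated so far, then add the new value
      case Some(c) => (Seq((in._1, c)), Some(c + in._2))
      // first element for this key: emit nothing yet
      case None    => (Seq.empty[(String, Int)], Some(in._2))
    })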

2. Operator State

2.0 State split and union

Even-split redistribution:

  1. Each operator returns a List of state elements
  2. The whole state is logically a concatenation of all lists
  3. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators

Union redistribution:

  1. Each operator returns a List of state elements
  2. The whole state is logically a concatenation of all lists
  3. On restore/redistribution, each operator gets the complete list of state elements
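
Which redistribution scheme applies is decided by the accessor called on the OperatorStateStore in initializeState; a minimal sketch (descriptor names are illustrative):

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.state.FunctionInitializationContext

def initState(context: FunctionInitializationContext): Unit = {
  val splitDescriptor = new ListStateDescriptor[Long](
    "split-offsets", TypeInformation.of(new TypeHint[Long]() {}))
  val unionDescriptor = new ListStateDescriptor[Long](
    "union-offsets", TypeInformation.of(new TypeHint[Long]() {}))

  // even-split redistribution: on restore each parallel instance gets a sublist
  val evenSplit: ListState[Long] =
    context.getOperatorStateStore.getListState(splitDescriptor)

  // union redistribution: on restore each parallel instance gets the complete list
  val union: ListState[Long] =
    context.getOperatorStateStore.getUnionListState(unionDescriptor)
}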

2.1 Adding checkpoint hooks

  1. Whenever a checkpoint has to be performed, snapshotState() is called

  2. The counterpart, initializeState(), is called every time the user-defined function is initialized, be that when the function is first initialized or be that when the function is actually recovering from an earlier checkpoint

initializeState() is:

  1. called when the parallel function instance is created during distributed execution
  2. also the place where state recovery logic belongs

2.1.1 The following example shows how to add checkpoint hooks to a sink:

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction

import scala.collection.mutable.ListBuffer

class BufferingSink(threshold: Int = 0)
  extends SinkFunction[(String, Int)]
    with CheckpointedFunction {
  @transient
  private var checkpointedState: ListState[(String, Int)] = _

  private val bufferedElements = ListBuffer[(String, Int)]()


  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    bufferedElements += value
    if (bufferedElements.size >= threshold) { // >= so the default threshold of 0 still flushes
      for (element <- bufferedElements) {
        // send it to the sink
      }
      bufferedElements.clear()
    }
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointedState.clear()
    for (element <- bufferedElements) {
      checkpointedState.add(element)
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[(String, Int)](
      "buffered-elements",
      TypeInformation.of(new TypeHint[(String, Int)]() {})
    )
    checkpointedState = context.getOperatorStateStore.getListState(descriptor)

    if (context.isRestored) {  // only true when recovering from an earlier checkpoint
      for(element <- checkpointedState.get()) {
        bufferedElements += element
      }
    }
  }
}
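
Attaching the sink is a normal addSink call; a small usage sketch (the data, threshold, and checkpoint interval are illustrative):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(10000) // checkpoint every 10 s so snapshotState gets called
env
  .fromElements(("a", 1), ("b", 2), ("a", 3))
  .addSink(new BufferingSink(threshold = 2))
env.execute("BufferingSinkExample")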

2.1.2 The following example shows how to add checkpoint hooks to a source:

When updating state or emitting records, the source must hold the checkpoint lock, so that no snapshot can be taken between emitting a record and advancing the offset.

import java.util

import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

import scala.collection.JavaConverters._

class CounterSource
  extends RichParallelSourceFunction[Long]
    with ListCheckpointed[Long]{

  @volatile
  private var isRunning = true

  private var offset = 0L

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    val lock = ctx.getCheckpointLock
    while (isRunning) {
      lock.synchronized {
        ctx.collect(offset)
        offset += 1
      }
    }
  }

  override def cancel(): Unit = isRunning = false

  override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] = {
    util.Collections.singletonList(offset)
  }

  override def restoreState(state: util.List[Long]): Unit = {
    // a java.util.List must be converted before it can be iterated from Scala
    for (s <- state.asScala) {
      offset = s
    }
  }
}
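
A small usage sketch (parallelism and checkpoint interval are illustrative):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(10000) // snapshotState is called on every checkpoint
env.addSource(new CounterSource).setParallelism(2).print()
env.execute("CounterSourceExample")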

ref:

  1. https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html