- State creation
- State cleanup (TTL / clear)
- State storage (distributed)
- State recovery

Flink distinguishes two categories of state:
- Keyed State (bound to a key, only available on a KeyedStream)
- Operator State (bound to an operator instance, i.e. tied to its parallelism)

Flink stores state in two formats:
- Managed (stored in Flink's built-in data structures)
- Raw (kept in the user's own data structures; at checkpoint time it is serialized into a byte array, so Flink cannot interpret the original type)

The number of Key Groups equals the configured maximum parallelism.
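A minimal sketch of setting the maximum parallelism on the execution environment, which fixes the number of key groups (the value 128 is only an illustrative assumption):

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Keyed state is sharded into key groups; their count equals the maximum parallelism.
env.setMaxParallelism(128)
```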
1. Keyed State
1.1 Managed Keyed State
State type | Description | API |
---|---|---|
ValueState<T> | A single value | update, value |
ListState<T> | A list of values | add, addAll, get, update |
ReducingState<T> | A single value (the aggregate of all elements added to the state) | add(T) |
AggregatingState<IN, OUT> | A single value (like ReducingState, but the aggregated output may be of a different type) | add(IN) |
MapState<UK, UV> | A set of key-value pairs | put, putAll, get, entries, keys, values |

All state types also support clear.
Operations

State is accessed using the RuntimeContext, so it is only possible in rich functions.
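As a minimal sketch of what this looks like in practice (the class name StateHandlesDemo and the state names "counter" and "seen" are illustrative assumptions), state handles are obtained from the RuntimeContext inside a rich function, typically in open():

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration

class StateHandlesDemo extends RichMapFunction[(String, Long), (String, Long)] {

  // Handles become valid once open() has run; descriptors carry the state name and type info.
  private var counter: ValueState[Long] = _
  private var seen: MapState[String, Long] = _

  override def open(parameters: Configuration): Unit = {
    counter = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("counter", createTypeInformation[Long]))
    seen = getRuntimeContext.getMapState(
      new MapStateDescriptor[String, Long](
        "seen", createTypeInformation[String], createTypeInformation[Long]))
  }

  override def map(value: (String, Long)): (String, Long) = {
    // value() of an empty ValueState is null; Scala unboxes that to 0L for Long.
    counter.update(counter.value + value._2)
    seen.put(value._1, value._2)
    (value._1, counter.value)
  }
}
```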
1.2 Implementing a window operation with state
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala.createTypeInformation
class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
private var sum: ValueState[(Long, Long)] = _
override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
// access the state value
val tmpCurrentSum = sum.value
// If it hasn't been used before, it will be null
val currentSum = if (tmpCurrentSum != null) {
tmpCurrentSum
} else {
(0L, 0L)
}
// update the count
val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
// update the state
sum.update(newSum)
// if the count reaches 2, emit the average and clear the state
if (newSum._1 >= 2) {
out.collect((input._1, newSum._2 / newSum._1))
sum.clear()
}
}
override def open(parameters: Configuration): Unit = {
sum = getRuntimeContext.getState(
new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
)
}
}
object ExampleCountWindowAverage {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromCollection(List(
(1L, 3L),
(1L, 5L),
(1L, 7L),
(1L, 4L),
(1L, 2L)
)).keyBy(_._1)
.flatMap(new CountWindowAverage())
.print()
// the printed output will be (1,4) and (1,5)
env.execute("ExampleManagedState")
}
}
1.3 State TTL

- State can be cleaned up automatically once its configured TTL has expired.
- The TTL is tracked per entry (e.g. per list element or per map key).

By default, expired entries are only removed when the state is read.

Cleanup strategy | Description |
---|---|
cleanupFullSnapshot | Drops expired entries when a full state snapshot is taken (the local state size is not reduced until a restore from that snapshot) |
cleanupInBackground | Enables the default background cleanup strategy of the configured state backend |
cleanupInRocksdbCompactFilter | Filters out expired entries during RocksDB compaction via Flink's compaction filter |
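A minimal sketch of enabling TTL on a state descriptor through the StateTtlConfig builder (the one-hour TTL and the descriptor name "last-value" are illustrative assumptions):

```scala
import org.apache.flink.api.common.state.{StateTtlConfig, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
  .newBuilder(Time.hours(1))                                             // time-to-live per entry
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)             // refresh the TTL on creation and writes
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // never hand out expired values
  .cleanupFullSnapshot()                                                 // also drop expired entries in full snapshots
  .build

val descriptor = new ValueStateDescriptor[String]("last-value", classOf[String])
descriptor.enableTimeToLive(ttlConfig)
```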
1.4 Using state in the DataStream API
Stateless API | Stateful API | Managed Keyed State |
---|---|---|
map | mapWithState | ValueState |
flatMap | flatMapWithState | ValueState |
val stream: DataStream[(String, Int)] = ...
val counts: DataStream[(String, Int)] = stream
.keyBy(_._1)
.mapWithState((in: (String, Int), count: Option[Int]) =>
count match {
case Some(c) => ( (in._1, c), Some(c + in._2) )
case None => ( (in._1, 0), Some(in._2) )
})
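For comparison, a hedged sketch of the flatMapWithState counterpart (countsFlat is a hypothetical name, and stream is the same input as above); it differs from mapWithState only in returning a collection of output elements per input:

```scala
val countsFlat: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .flatMapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      // Emit the count accumulated so far, then fold the new value into the state.
      case Some(c) => (Seq((in._1, c)), Some(c + in._2))
      case None    => (Seq((in._1, 0)), Some(in._2))
    })
```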
2. Operator State
2.0 State split and union
Even-split redistribution:
- Each operator returns a List of state elements
- The whole state is logically a concatenation of all lists
- On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators
Union redistribution:
- Each operator returns a List of state elements
- The whole state is logically a concatenation of all lists
- On restore/redistribution, each operator gets the complete list of state elements (see the sketch below for how each mode is requested)
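A hedged sketch of how an operator requests the two redistribution modes when registering operator list state (the class and state names are illustrative; getListState yields even-split semantics, getUnionListState yields union semantics):

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction

class RedistributionDemo extends MapFunction[String, String] with CheckpointedFunction {

  @transient private var evenSplitState: ListState[String] = _
  @transient private var unionState: ListState[String] = _

  override def map(value: String): String = value

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // Nothing to persist in this sketch.
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val info = TypeInformation.of(new TypeHint[String]() {})

    // Even-split redistribution: on restore each parallel instance receives a sublist.
    evenSplitState = context.getOperatorStateStore
      .getListState(new ListStateDescriptor[String]("even-split-state", info))

    // Union redistribution: on restore each parallel instance receives the complete list.
    unionState = context.getOperatorStateStore
      .getUnionListState(new ListStateDescriptor[String]("union-state", info))
  }
}
```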
2.1 Adding checkpoint hooks

Whenever a checkpoint has to be performed, snapshotState() is called. The counterpart, initializeState(), is called every time the user-defined function is initialized, be that when the function is first initialized or when it is actually recovering from an earlier checkpoint.

initializeState() is therefore:
- the place where state handles are created when the parallel function instance is created during distributed execution,
- but also where the state recovery logic is included.

2.1.1 The following example shows how to add checkpoint hooks in a sink:
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import scala.collection.mutable.ListBuffer
class BufferingSink(threshold: Int = 0)
extends SinkFunction[(String, Int)]
with CheckpointedFunction{
@transient
private var checkpointedState: ListState[(String, Int)] = _
private val bufferedElements = ListBuffer[(String, Int)]()
override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
bufferedElements += value
if (bufferedElements.size == threshold) {
for (element <- bufferedElements) {
// send it to the sink
}
bufferedElements.clear()
}
}
override def snapshotState(context: FunctionSnapshotContext): Unit = {
checkpointedState.clear()
for (element <- bufferedElements) {
checkpointedState.add(element)
}
}
override def initializeState(context: FunctionInitializationContext): Unit = {
val descriptor = new ListStateDescriptor[(String, Int)](
"buffered-elements",
TypeInformation.of(new TypeHint[(String, Int)]() {})
)
checkpointedState = context.getOperatorStateStore.getListState(descriptor)
if (context.isRestored) { // only true when restoring from an earlier checkpoint
for(element <- checkpointedState.get()) {
bufferedElements += element
}
}
}
}
2.1.2 The following example shows how to add checkpoint hooks in a source:
The checkpoint lock must be held when updating state or emitting elements, so that the emitted records and the snapshotted offset stay consistent.
import java.util
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
class CounterSource
extends RichParallelSourceFunction[Long]
with ListCheckpointed[Long]{
@volatile
private var isRunning = true
private var offset = 0L
override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
val lock = ctx.getCheckpointLock
while (isRunning) {
lock.synchronized({
ctx.collect(offset)
offset += 1
})
}
}
override def cancel(): Unit = isRunning = false
override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] = {
util.Collections.singletonList(offset)
}
override def restoreState(state: util.List[Long]): Unit = {
for (s <- state) {
offset = s
}
}
}