1.应用的一致性保障
Fink的检查点和恢复机制和可以重置读位置的source连接器结合使用,可以保证应用程序不会丢失任何数据。尽管如此,应用程序可能会发出两次计算结果,因为从上一次检查点恢复的应用程序所计算的结果将会被重新发送一次(一些结果已经发送出去了,这时任务故障,然后从上一次检查点恢复,这些结果将被重新计算一次然后发送出去)。所以,可重置读位置的source和Flink的恢复机制不足以提供端到端的恰好处理一次语义,即使应用程序的状态是恰好处理一次一致性级别。
(精准一次性)
幂等性写
一个幂等操作无论执行多少次都会返回同样的结果。例如,重复的向hashmap中插入同样的key-value对就是幂等操作,因为头一次插入操作之后所有的插入操作都不会改变这个hashmap,因为hashmap已经包含这个key-value对了。另一方面,append操作就不是幂等操作了,因为多次append同一个元素将会导致列表每次都会添加一个元素。在流处理程序中,幂等写入操作是很有意思的,因为幂等写入操作可以执行多次但不改变结果。所以它们可以在某种程度上缓和Flink检查点机制带来的重播计算结果的效应。
需要注意的是,依赖于幂等性sink来达到exactly-once语义的应用程序,必须保证在从检查点恢复以后,它将会覆盖之前已经写入的结果。例如,一个包含有sink操作的应用在sink到一个key-value存储时必须保证它能够确定的计算出将要更新的key值。同时,从Flink程序sink到的key-value存储中读取数据的应用,在Flink从检查点恢复的过程中,可能会看到不想看到的结果。当重播开始时,之前已经发出的计算结果可能会被更早的结果所覆盖(因为在恢复过程中)。所以,一个消费Flink程序输出数据的应用,可能会观察到时间回退,例如读到了比之前小的计数。也就是说,当流处理程序处于恢复过程中时,流处理程序的结果将处于不稳定的状态,因为一些结果被覆盖掉,而另一些结果还没有被覆盖。一旦重播完成,也就是说应用程序已经通过了之前出故障的点,结果将会继续保持一致性。
事务性写
第二种实现端到端的恰好处理一次一致性语义的方法基于事务性写入。其思想是只将最近一次成功保存的检查点之前的计算结果写入到外部系统中去。这样就保证了在任务故障的情况下,端到端恰好处理一次语义。应用将被重置到最近一次的检查点,而在这个检查点之后并没有向外部系统发出任何计算结果。通过只有当检查点保存完成以后再写入数据这种方法,事务性的方法将不会遭受幂等性写入所遭受的重播不一致的问题。尽管如此,事务性写入却带来了延迟,因为只有在检查点完成以后,我们才能看到计算结果。
Flink提供了两种构建模块来实现事务性sink连接器:write-ahead-log(WAL,预写式日志)sink和两阶段提交sink。WAL式sink将会把所有计算结果写入到应用程序的状态中,等接到检查点完成的通知,才会将计算结果发送到sink系统。因为sink操作会把数据都缓存在状态后段,所以WAL可以使用在任何外部sink系统上。尽管如此,WAL还是无法提供刀枪不入的恰好处理一次语义的保证,再加上由于要缓存数据带来的状态后段的状态大小的问题,WAL模型并不十分完美。
与之形成对比的,2PC sink需要sink系统提供事务的支持或者可以模拟出事务特性的模块。对于每一个检查点,sink开始一个事务,然后将所有的接收到的数据都添加到事务中,并将这些数据写入到sink系统,但并没有提交(commit)它们。当事务接收到检查点完成的通知时,事务将被commit,数据将被真正的写入sink系统。这项机制主要依赖于一次sink可以在检查点完成之前开始事务,并在应用程序从一次故障中恢复以后再commit的能力。
2PC协议依赖于Flink的检查点机制。检查点屏障是开始一个新的事务的通知,所有操作符自己的检查点成功的通知是它们可以commit的投票,而作业管理器通知一个检查点成功的消息是commit事务的指令。于WAL sink形成对比的是,2PC sinks依赖于sink系统和sink本身的实现可以实现恰好处理一次语义。更多的,2PC sink不断的将数据写入到sink系统中,而WAL写模型就会有之前所述的问题。
2.内置连接器
Apache Kafka Source连接器
Kafka将事件流组织为所谓的topics。一个主题就是一个事件日志系统,Kafka可以保证主题中的数据在被读取时和这些数据在被写入时相同的顺序。为了扩大读写的规模,主题可以分裂为多个分区,这些分区分布在一个集群上面。这时,读写顺序的保证就限制到了分区这个粒度, Kafka并没有提供从不同分区读取数据时的顺序保证。Kafka分区的读位置称为偏移量(offset)。
Kafka的依赖引入如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version></version>
</dependency>
kafka连接器使用代码
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val stream: DataStream[String] = env.addSource(
new FlinkKafkaConsumer[String](
"topic",
new SimpleStringSchema(),
properties))
- 第一个参数定义了从哪些topic中读取数据,可以是一个topic,也可以是topic列表,还可以是匹配所有想要读取的topic的正则表达式。当从多个topic中读取数据时,Kafka连接器将会处理所有topic的分区,将这些分区的数据放到一条流中去。
- 第二个参数是一个DeserializationSchema或者KeyedDeserializationSchema。Kafka消息被存储为原始的字节数据,所以需要反序列化成Java或者Scala对象。上例中使用的SimpleStringSchema,是一个内置的DeserializationSchema,它仅仅是简单的将字节数组反序列化成字符串。DeserializationSchema和KeyedDeserializationSchema是公共的接口,所以我们可以自定义反序列化逻辑。
- 第三个参数是一个Properties对象,设置了用来读写的Kafka客户端的一些属性。
为了抽取事件时间的时间戳然后产生水印,我们可以通过调用
FlinkKafkaConsumer.assignTimestampsAndWatermark()
Apache Kafka Sink连接器
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version></version>
</dependency>
Kakfa Sink的at-least-once保证
Flink的Kafka sink提供了基于配置的一致性保证。Kafka sink使用下面的条件提供了至少处理一次保证:
Flink检查点机制开启,所有的数据源都是可重置的。
当写入失败时,sink连接器将会抛出异常,使得应用程序挂掉然后重启。这是默认行为。应用程序内部的Kafka客户端还可以配置为重试写入,只要提前声明当写入失败时,重试几次这样的属性(retries property)。
sink连接器在完成它的检查点之前会等待Kafka发送已经将数据写入的通知。
Kafka Sink的恰好处理一次语义保证
Kafka 0.11版本引入了事务写特性。由于这个新特性,Flink Kafka sink可以为输出结果提供恰好处理一次语义的一致性保证,只要经过合适的配置就行。Flink程序必须开启检查点机制,并从可重置的数据源进行消费。FlinkKafkaProducer还提供了包含Semantic参数的构造器来控制sink提供的一致性保证。可能的取值如下:
- Semantic.NONE,不提供任何一致性保证。数据可能丢失或者被重写多次。
- Semantic.AT_LEAST_ONCE,保证无数据丢失,但可能被处理多次。这个是默认设置。
- Semantic.EXACTLY_ONCE,基于Kafka的事务性写入特性实现,保证每条数据恰好处理一次。
文件系统source连接器
val lineReader = new TextInputFormat(null)
val lineStream: DataStream[String] = env.readFile[String](
lineReader, // The FileInputFormat
"hdfs:///path/to/my/data", // The path to read
FileProcessingMode
.PROCESS_CONTINUOUSLY, // The processing mode
30000L) // The monitoring interval in ms
StreamExecutionEnvironment.readFile()接收如下参数:
FileInputFormat参数,负责读取文件中的内容。
文件路径。如果文件路径指向单个文件,那么将会读取这个文件。如果路径指向一个文件夹,FileInputFormat将会扫描文件夹中所有的文件。
PROCESS_CONTINUOUSLY将会周期性的扫描文件,以便扫描到文件新的改变。
30000L表示多久扫描一次监听的文件。
FileInputFormat是一个特定的InputFormat,用来从文件系统中读取文件。FileInputFormat分两步读取文件。首先扫描文件系统的路径,然后为所有匹配到的文件创建所谓的input splits。一个input split将会定义文件上的一个范围,一般通过读取的开始偏移量和读取长度来定义。在将一个大的文件分割成一堆小的splits以后,这些splits可以分发到不同的读任务,这样就可以并行的读取文件了。FileInputFormat的第二步会接收一个input split,读取被split定义的文件范围,然后返回对应的数据。
DataStream应用中使用的FileInputFormat需要实现CheckpointableInputFormat接口。这个接口定义了方法来做检查点和重置文件片段的当前的读取位置。
在Flink 1.7中,Flink提供了一些类,这些类继承了FileInputFormat,并实现了CheckpointableInputFormat接口。TextInputFormat一行一行的读取文件,而CsvInputFormat使用逗号分隔符来读取文件。
文件系统sink连接器
在将流处理应用配置成exactly-once检查点机制,以及配置成所有源数据都能在故障的情况下可以重置,Flink的StreamingFileSink提供了端到端的恰好处理一次语义保证。下面的例子展示了StreamingFileSink的使用方式。
val input: DataStream[String] = …
val sink: StreamingFileSink[String] = StreamingFileSink
.forRowFormat(
new Path("/base/path"),
new SimpleStringEncoder[String]("UTF-8"))
.build()
input.addSink(sink)
当StreamingFileSink接到一条数据,这条数据将被分配到一个桶(bucket)中。一个桶是我们配置的“/base/path”的子目录。
Flink使用BucketAssigner来分配桶。BucketAssigner是一个公共的接口,为每一条数据返回一个BucketId,BucketId决定了数据被分配到哪个子目录。如果没有指定BucketAssigner,Flink将使用DateTimeBucketAssigner来将每条数据分配到每个一个小时所产生的桶中去,基于数据写入的处理时间(机器时间,墙上时钟)。
StreamingFileSink提供了exactly-once输出的保证。sink通过一个commit协议来达到恰好处理一次语义的保证。这个commit协议会将文件移动到不同的阶段,有以下状态:in progress,pending,finished。这个协议基于Flink的检查点机制。当Flink决定roll a file时,这个文件将被关闭并移动到pending状态,通过重命名文件来实现。当下一个检查点完成时,pending文件将被移动到finished状态,同样是通过重命名来实现。
一旦任务故障,sink任务需要将处于in progress状态的文件重置到上一次检查点的写偏移量。这个可以通过关闭当前in progress的文件,并将文件结尾无效的部分丢弃掉来实现。
3.实现自定义数据源函数
DataStream API
提供了两个接口来实现source连接器:
SourceFunction
和RichSourceFunction
可以用来定义非并行的source连接器,source跑在单任务上。
ParallelSourceFunction
和RichParallelSourceFunction
可以用来定义跑在并行实例上的source连接器。
除了并行于非并行的区别,这两种接口完全一样。就像process function的rich版本一样,RichSourceFunction
和RichParallelSourceFunction
的子类可以override open()
和close()
方法,也可以访问RuntimeContext
,RuntimeContext
提供了并行任务实例的数量,当前任务实例的索引,以及一些其他信息。
SourceFunction
和ParallelSourceFunction
定义了两种方法:
void run(SourceContext ctx)
cancel()
run()
方法用来读取或者接收数据然后将数据摄入到Flink应用中。根据接收数据的系统,数据可能是推送的也可能是拉取的。Flink仅仅在特定的线程调用run()方法一次,通常情况下会是一个无限循环来读取或者接收数据并发送数据。任务可以在某个时间点被显式的取消,或者由于流是有限流,当数据被消费完毕时,任务也会停止。
当应用被取消或者关闭时,cancel()
方法会被Flink调用。为了优雅的关闭Flink应用,run()
方法需要在cancel()
被调用以后,立即终止执行。下面的例子显示了一个简单的源函数的例子:从0数到Long.MaxValue
。
class CountSource extends SourceFunction[Long] {
var isRunning: Boolean = true
override def run(ctx: SourceFunction.SourceContext[Long]) = {
var cnt: Long = -1
while (isRunning && cnt < Long.MaxValue) {
cnt += 1
ctx.collect(cnt)
}
}
override def cancel() = isRunning = false
}
可重置的源函数
支持重播输出的源函数需要和Flink的检查点机制集成起来,还需要在检查点被处理时,持久化当前所有的读取位置。当应用从一个保存点(savepoint)恢复或者从故障恢复时,Flink会从最近一次的检查点或者保存点中获取读偏移量。如果程序开始时并不存在状态,那么读偏移量将会被设置到一个默认值。一个可重置的源函数需要实现CheckpointedFunction接口,还需要能够存储读偏移量和相关的元数据,例如文件的路径,分区的ID。这些数据将被保存在list state或者union list state中
class ResettableCountSource
extends SourceFunction[Long] with CheckpointedFunction {
var isRunning: Boolean = true
var cnt: Long = _
var offsetState: ListState[Long] = _
override def run(ctx: SourceFunction.SourceContext[Long]) = {
while (isRunning && cnt < Long.MaxValue) {
// synchronize data emission and checkpoints
ctx.getCheckpointLock.synchronized {
cnt += 1
ctx.collect(cnt)
}
}
}
override def cancel() = isRunning = false
override def snapshotState(
snapshotCtx: FunctionSnapshotContext
): Unit = {
// remove previous cnt
offsetState.clear()
// add current cnt
offsetState.add(cnt)
}
override def initializeState(
initCtx: FunctionInitializationContext): Unit = {
val desc = new ListStateDescriptor[Long](
"offset", classOf[Long])
offsetState = initCtx
.getOperatorStateStore
.getListState(desc)
// initialize cnt variable
val it = offsetState.get()
cnt = if (null == it || !it.iterator().hasNext) {
-1L
} else {
it.iterator().next()
}
}
}
4.实现自定义数据汇函数
DataStream API中,任何运算符或者函数都可以向外部系统发送数据。DataStream不需要最终流向sink运算符。例如,我们可能实现了一个FlatMapFunction
,这个函数将每一个接收到的数据通过HTTP POST
请求发送出去,而不使用Collecto
r发送到下一个运算符。DataStream API也提供了SinkFunction
接口以及对应的rich版本RichSinkFunction
抽象类。SinkFunction
接口提供了一个方法:
void invode(IN value, Context ctx)
SinkFunction的Context可以访问当前处理时间,当前水位线,以及数据的时间戳。
下面的例子展示了一个简单的SinkFunction,可以将传感器读数写入到socket中去。需要注意的是,我们需要在启动Flink程序前启动一个监听相关端口的进程。否则将会抛出ConnectException异常。可以运行“nc -l localhost 9191”命令。
val readings: DataStream[SensorReading] = ...
// write the sensor readings to a socket
readings.addSink(new SimpleSocketSink("localhost", 9191))
// set parallelism to 1 because only one thread can write to a socket
.setParallelism(1)
// -----
class SimpleSocketSink(val host: String, val port: Int)
extends RichSinkFunction[SensorReading] {
var socket: Socket = _
var writer: PrintStream = _
override def open(config: Configuration): Unit = {
// open socket and writer
socket = new Socket(InetAddress.getByName(host), port)
writer = new PrintStream(socket.getOutputStream)
}
override def invoke(
value: SensorReading,
ctx: SinkFunction.Context[_]): Unit = {
// write sensor reading to socket
writer.println(value.toString)
writer.flush()
}
override def close(): Unit = {
// close writer and socket
writer.close()
socket.close()
}
}
端到端的一致性保证建立在sink连接器的属性上面。为了达到端到端的恰好处理一次语义的目的,应用程序需要幂等性的sink连接器或者事务性的sink连接器。上面例子中的SinkFunction既不是幂等写入也不是事务性的写入。由于socket具有只能添加(append-only)这样的属性,所以不可能实现幂等性的写入。又因为socket不具备内置的事务支持,所以事务性写入就只能使用Flink的WAL sink特性来实现了。
幂等sink连接器
对于大多数应用,SinkFunction接口足以实现一个幂等性写入的sink连接器了。需要以下两个条件:
- 结果数据必须具有确定性的key,在这个key上面幂等性更新才能实现。例如一个计算每分钟每个传感器的平均温度值的程序,确定性的key值可以是传感器的ID和每分钟的时间戳。确定性的key值,对于在故障恢复的场景下,能够正确的覆盖结果非常的重要。
- 外部系统支持针对每个key的更新,例如关系型数据库或者key-value存储。
val readings: DataStream[SensorReading] = ...
// write the sensor readings to a Derby table
readings.addSink(new DerbyUpsertSink)
// -----
class DerbyUpsertSink extends RichSinkFunction[SensorReading] {
var conn: Connection = _
var insertStmt: PreparedStatement = _
var updateStmt: PreparedStatement = _
override def open(parameters: Configuration): Unit = {
// connect to embedded in-memory Derby
conn = DriverManager.getConnection(
"jdbc:derby:memory:flinkExample",
new Properties())
// prepare insert and update statements
insertStmt = conn.prepareStatement(
"INSERT INTO Temperatures (sensor, temp) VALUES (?, ?)")
updateStmt = conn.prepareStatement(
"UPDATE Temperatures SET temp = ? WHERE sensor = ?")
}
override def invoke(SensorReading r, context: Context[_]): Unit = {
// set parameters for update statement and execute it
updateStmt.setDouble(1, r.temperature)
updateStmt.setString(2, r.id)
updateStmt.execute()
// execute insert statement
// if update statement did not update any row
if (updateStmt.getUpdateCount == 0) {
// set parameters for insert statement
insertStmt.setString(1, r.id)
insertStmt.setDouble(2, r.temperature)
// execute insert statement
insertStmt.execute()
}
}
override def close(): Unit = {
insertStmt.close()
updateStmt.close()
conn.close()
}
}
事务性sink连接器
事务写入sink连接器需要和Flink的检查点机制集成,因为只有在检查点成功完成以后,事务写入sink连接器才会向外部系统commit数据。
为了简化事务性sink的实现,Flink提供了两个模版用来实现自定义sink运算符。这两个模版都实现了CheckpointListener接口。CheckpointListener接口将会从作业管理器接收到检查点完成的通知。
GenericWriteAheadSink模版会收集检查点之前的所有的数据,并将数据存储到sink任务的运算符状态中。状态保存到了检查点中,并在任务故障的情况下恢复。当任务接收到检查点完成的通知时,任务会将所有的数据写入到外部系统中。
TwoPhaseCommitSinkFunction模版利用了外部系统的事务特性。对于每一个检查点,任务首先开始一个新的事务,并将接下来所有的数据都写到外部系统的当前事务上下文中去。当任务接收到检查点完成的通知时,sink连接器将会commit这个事务。
GENERICWRITEAHEADSINK
GenericWriteAheadSink使得sink运算符可以很方便的实现。这个运算符和Flink的检查点机制集成使用,目标是将每一条数据恰好一次写入到外部系统中去。需要注意的是,在发生故障的情况下,write-ahead log sink可能会不止一次的发送相同的数据。所以GenericWriteAheadSink无法提供完美无缺的恰好处理一次语义的一致性保证,而是仅能提供at-least-once这样的保证。我们接下来详细的讨论这些场景。
GenericWriteAheadSink的原理是将接收到的所有数据都追加到有检查点分割好的预写式日志中去。每当sink运算符碰到检查点屏障,运算符将会开辟一个新的section,并将接下来的所有数据都追加到新的section中去。WAL(预写式日志)将会保存到运算符状态中。由于log能被恢复,所有不会有数据丢失。
当GenericWriteAheadSink接收到检查点完成的通知时,将会发送对应检查点的WAL中存储的所有数据。当所有数据发送成功,对应的检查点必须在内部提交。
检查点的提交分两步。第一步,sink持久化检查点被提交的信息。第二步,删除WAL中所有的数据。我们不能将commit信息保存在Flink应用程序状态中,因为状态不是持久化的,会在故障恢复时重置状态。相反,GenericWriteAheadSink依赖于可插拔的组件在一个外部持久化存储中存储和查找提交信息。这个组件就是CheckpointCommitter。
继承GenericWriteAheadSink的运算符需要提供三个构造器函数。
- CheckpointCommitter
- TypeSerializer,用来序列化输入数据。
- 一个job ID,传给CheckpointCommitter,当应用重启时可以识别commit信息。
还有,write-ahead运算符需要实现一个单独的方法:
boolean sendValues(Iterable<IN> values, long chkpntId, long timestamp)
val readings: DataStream[SensorReading] = ...
// write the sensor readings to the standard out via a write-ahead log
readings.transform(
"WriteAheadSink", new SocketWriteAheadSink)
class StdOutWriteAheadSink extends GenericWriteAheadSink[SensorReading](
// CheckpointCommitter that commits
// checkpoints to the local filesystem
new FileCheckpointCommitter(System.getProperty("java.io.tmpdir")),
// Serializer for records
createTypeInformation[SensorReading]
.createSerializer(new ExecutionConfig),
// Random JobID used by the CheckpointCommitter
UUID.randomUUID.toString) {
override def sendValues(
readings: Iterable[SensorReading],
checkpointId: Long,
timestamp: Long): Boolean = {
for (r <- readings.asScala) {
// write record to standard out
println(r)
}
true
}
}
GenericWriteAheadSink无法提供完美的exactly-once保证。有两个故障状况会导致数据可能被发送不止一次。
当任务执行sendValues()方法时,程序挂掉了。如果外部系统无法原子性的写入所有数据(要么都写入要么都不写),一些数据可能会写入,而另一些数据并没有被写入。由于checkpoint还没有commit,所以在任务恢复的过程中一些数据可能会被再次写入。
所有数据都写入成功了,sendValues()方法也返回true了;但在CheckpointCommitter方法被调用之前程序挂了,或者CheckpointCommitter在commit检查点时失败了。那么在恢复的过程中,所有未被提交的检查点将会被重新写入。
TWOPHASECOMMITSINKFUNCTION
Flink提供了TwoPhaseCommitSinkFunction接口来简化sink函数的实现。这个接口保证了端到端的exactly-once语义。2PC sink函数是否提供这样的一致性保证取决于我们的实现细节。我们需要讨论一个问题:“2PC协议是否开销太大?”
通常来讲,为了保证分布式系统的一致性,2PC是一个非常昂贵的方法。尽管如此,在Flink的语境下,2PC协议针对每一个检查点只运行一次。TwoPhaseCommitSinkFunction和WAL sink很相似,不同点在于前者不会将数据收集到state中,而是会写入到外部系统事务的上下文中。
TwoPhaseCommitSinkFunction实现了以下协议。在sink任务发送出第一条数据之前,任务将在外部系统中开始一个事务,所有接下来的数据将被写入这个事务的上下文中。当作业管理器初始化检查点并将检查点屏障插入到流中的时候,2PC协议的投票阶段开始。当运算符接收到检查点屏障,运算符将保存它的状态,当保存完成时,运算符将发送一个acknowledgement信息给作业管理器。当sink任务接收到检查点屏障时,运算符将会持久化它的状态,并准备提交当前的事务,以及acknowledge JobManager中的检查点。发送给作业管理器的acknowledgement信息类似于2PC协议中的commit投票。sink任务还不能提交事务,因为它还没有保证所有的任务都已经完成了它们的检查点操作。sink任务也会为下一个检查点屏障之前的所有数据开始一个新的事务。
当作业管理器成功接收到所有任务实例发出的检查点操作成功的通知时,作业管理器将会把检查点完成的通知发送给所有感兴趣的任务。这里的通知对应于2PC协议的提交命令。当sink任务接收到通知时,它将commit所有处于开启状态的事务。一旦sink任务acknowledge了检查点操作,它必须能够commit对应的事务,即使任务发生故障。如果commit失败,数据将会丢失。
让我们总结一下外部系统需要满足什么样的要求:
- 外部系统必须提供事务支持,或者sink的实现能在外部系统上模拟事务功能。
- 在检查点操作期间,事务必须处于open状态,并接收这段时间数据的持续写入。
- 事务必须等到检查点操作完成的通知到来才可以提交。在恢复周期中,可能需要一段时间等待。如果sink系统关闭了事务(例如超时了),那么未被commit的数据将会丢失。
- sink必须在进程挂掉后能够恢复事务。一些sink系统会提供事务ID,用来commit或者abort一个开始的事务。
- commit一个事务必须是一个幂等性操作。sink系统或者外部系统能够观察到事务已经被提交,或者重复提交并没有副作用。
class TransactionalFileSink(val targetPath: String, val tempPath: String)
extends TwoPhaseCommitSinkFunction[(String, Double), String, Void](
createTypeInformation[String].createSerializer(new ExecutionConfig),
createTypeInformation[Void].createSerializer(new ExecutionConfig)) {
var transactionWriter: BufferedWriter = _
// Creates a temporary file for a transaction into
// which the records are written.
override def beginTransaction(): String = {
// path of transaction file
// is built from current time and task index
val timeNow = LocalDateTime.now(ZoneId.of("UTC"))
.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
val taskIdx = this.getRuntimeContext.getIndexOfThisSubtask
val transactionFile = s"$timeNow-$taskIdx"
// create transaction file and writer
val tFilePath = Paths.get(s"$tempPath/$transactionFile")
Files.createFile(tFilePath)
this.transactionWriter = Files.newBufferedWriter(tFilePath)
println(s"Creating Transaction File: $tFilePath")
// name of transaction file is returned to
// later identify the transaction
transactionFile
}
/** Write record into the current transaction file. */
override def invoke(
transaction: String,
value: (String, Double),
context: Context[_]): Unit = {
transactionWriter.write(value.toString)
transactionWriter.write('\n')
}
/** Flush and close the current transaction file. */
override def preCommit(transaction: String): Unit = {
transactionWriter.flush()
transactionWriter.close()
}
// Commit a transaction by moving
// the precommitted transaction file
// to the target directory.
override def commit(transaction: String): Unit = {
val tFilePath = Paths.get(s"$tempPath/$transaction")
// check if the file exists to ensure
// that the commit is idempotent
if (Files.exists(tFilePath)) {
val cFilePath = Paths.get(s"$targetPath/$transaction")
Files.move(tFilePath, cFilePath)
}
}
// Aborts a transaction by deleting the transaction file.
override def abort(transaction: String): Unit = {
val tFilePath = Paths.get(s"$tempPath/$transaction")
if (Files.exists(tFilePath)) {
Files.delete(tFilePath)
}
}
}
TwoPhaseCommitSinkFunction[IN, TXN, CONTEXT]包含如下三个范型参数:
- IN表示输入数据的类型。
- TXN定义了一个事务的标识符,可以用来识别和恢复事务。
- CONTEXT定义了自定义的上下文。
TwoPhaseCommitSinkFunction的构造器需要两个TypeSerializer。一个是TXN的类型,另一个是CONTEXT的类型。
最后,TwoPhaseCommitSinkFunction定义了五个需要实现的方法:
- beginTransaction(): TXN开始一个事务,并返回事务的标识符。
- invoke(txn: TXN, value: IN, context: Context[_]): Unit将值写入到当前事务中。
- preCommit(txn: TXN): Unit预提交一个事务。一个预提交的事务不会接收新的写入。
- commit(txn: TXN): Unit提交一个事务。这个操作必须是幂等的。
- abort(txn: TXN): Unit终止一个事务。
(自定义数据源和自定义数据汇没学过,有时间看一遍)
5.异步访问外部系统
AsyncFunction
https://blog.csdn.net/dajiangtai007/article/details/88743860