1.为什么需要异步IO
flink在做实时处理时,有时候需要和外部数据交互,但是通常情况下这个交互过程是同步的,这样就会产生大量的等待时间;而异步操作可以在单个函数实例中同时处理多个请求,并且同时接收相应。这样等待时间就平均分摊到了多个请求上,大大减少了请求的等待时长,可以提高实时处理的吞吐量。
2.使用flink异步IO的先决条件
- 需要所连接的数据库支持异步客户端
- 在没有异步客户端的情况下,可以通过创建多个客户端并使用线程池处理同步调用来尝试将同步客户端转变为有限的并发客户端
3. flink异步IO的使用步骤
- 实现AsyncFunction接口;
- 一个回调,该函数取回操作的结果,然后将结果传递给ResultFuture;
- 在DataStream上应用异步IO操作。
4. 使用示例
import scala.concurrent._
import ExecutionContext.Implicits.global
/**
* 使用scala并发包的Future模拟一个异步客户端
*/
class DatabaseClient {
def query: Future[Long] = Future {
System.currentTimeMillis() / 1000
}
}
/** 'AsyncFunction' 的一个实现,向数据库发送异步请求并设置回调
* 改编自官网实例
*/
class AsyncDatabaseRequest extends AsyncFunction[Int, (Int, Long)] {
/** The database specific client that can issue concurrent requests with callbacks */
lazy val client: DatabaseClient = new DatabaseClient
/** The context used for the future callbacks */
implicit lazy val executor: ExecutionContext =
ExecutionContext.fromExecutor(Executors.directExecutor())
override def asyncInvoke(str: Int,
resultFuture: ResultFuture[(Int, Long)]): Unit = {
// issue the asynchronous request, receive a future for the result
val resultFutureRequested: Future[Long] = client.query
// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the result future
resultFutureRequested.onSuccess {
case result: Long => resultFuture.complete(Iterable((str, result)))
}
}
}
object AsynchronousIOTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val data: immutable.Seq[Int] = Range(1, 10)
// 创建数据流
val dataStream: DataStream[Int] = env.fromCollection(data)
// 使用异步IO
val asyn = AsyncDataStream.unorderedWait(
dataStream,//执行异步操作的DataStream
new AsyncDatabaseRequest,//
1000, TimeUnit.MILLISECONDS, //超时时间
100 // 进行中的异步请求的最大数量
)
asyn.print()
env.execute("AsynchronousIOTest")
}
}
结果顺序
AsyncDataStream 有两个静态方法,orderedWait 和 unorderedWait,对应了两种输出模式:有序和无序。
- 有序:消息的发送顺序与接受到的顺序相同(包括 watermark ),也就是先进先出。
- 无序:
在 ProcessingTime 的情况下,完全无序,先返回的结果先发送。
在 EventTime 的情况下,watermark 不能超越消息,消息也不能超越 watermark,也就是说 watermark 定义的顺序的边界。在两个 watermark 之间的消息的发送是无序的,但是在watermark之后的消息不能先于该watermark之前的消息发送。
超时处理
当异步IO请求超时时,默认情况下会引发异常并重新启动作业。如果要处理超时,可以重写AsyncFunction#timeout方法。
override def timeout(input: Int,
resultFuture: ResultFuture[(Int, Long)]): Unit =
super.timeout(input, resultFuture)
容错保证
异步IO运算符提供完全一次的容错保证。它在检查点中存储正在进行的异步请求的记录,并在从故障中恢复时恢复/重新触发请求。
其他资料
关于flink异步IO的更多信息可以参考flink官网或者Flink 原理与实现:Aysnc I/O这篇文章。
Futures和事件驱动编程的知识可以参考《AKKA入门与实践》这本书第二章的内容:Actor与并发。
参考资料:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/asyncio.html
http://wuchong.me/blog/2017/05/17/flink-internals-async-io/
《AKKA入门与实践》