描述
- 异步读取外部数据源,并使用这些外部数据对主流数据进行必要的转换
- 要求外部数据源支持异步读取
- 要求代码中的client支持发起异步请求
输入
DataStream
输出
DataStream
用法
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
/**
 * Template of an async enrichment function: for every input record it issues one
 * asynchronous request to an external data source and hands the outcome to Flink.
 *
 * Replace `input数据类型` / `output数据类型` with the real input and output record types.
 */
class MyAsyncFunc extends RichAsyncFunction[input数据类型, output数据类型] {
  // Client of the external data source; it must be able to issue asynchronous requests.
  lazy val client = ...
  // Optional: the execution context on which the Future callbacks below are run.
  implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())

  /**
   * Issues the asynchronous lookup for one record.
   *
   * @param input        the record taken from the main stream
   * @param resultFuture Flink's handle for this request; it has to be completed exactly once,
   *                     either with the results or with the failure
   */
  override def asyncInvoke(input: input数据类型, resultFuture: ResultFuture[output数据类型]): Unit = {
    import scala.util.{Failure, Success}

    // Result handle of the asynchronous query (assumed here to be a scala.concurrent.Future).
    val queryResult = ...
    // Complete Flink's future from inside the callback instead of blocking on the query.
    queryResult.onComplete {
      // The element type handed over must match output数据类型.
      case Success(data) => resultFuture.complete(Iterable(data))
      // Forward the failure so the request does not just sit there until it times out.
      case Failure(error) => resultFuture.completeExceptionally(error)
    }
  }
}
// Apply the async function to the main stream with unorderedWait.
val resultStream: DataStream[...] = AsyncDataStream.unorderedWait(
  dataStream,
  new MyAsyncFunc,
  异步请求超时时间, // timeout of one async request
  TimeUnit.超时时间的单位, // time unit of that timeout
  异步并发数 // number of concurrent async requests
)
示例
使用lettuce库异步读取redis的hash结构数据关联到dataStream中
依赖
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.1.3.RELEASE</version>
</dependency>
/**
 * Enriches `(categoryId, itemId)` records with names read asynchronously from Redis hashes.
 *
 * For an input `(c, i)` it reads field `category_name` of hash `category.c` and field
 * `item_name` of hash `item.i`, then emits `(c, categoryName, i, itemName)`.
 */
class MyAsyncFunc extends RichAsyncFunction[(String, String), (String, String, String, String)] {
  lazy val client: RedisClient = RedisClient.create(RedisURI.builder()
    .withHost("...") // redis host
    .withPort(...) // redis port
    .withPassword("...".toCharArray) // redis password
    .build())
  lazy val conn: StatefulRedisConnection[String, String] = client.connect()
  lazy val async: RedisAsyncCommands[String, String] = conn.async()
  // Not used by the CompletionStage callbacks below; kept so code referring to this member still compiles.
  implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())

  /**
   * Starts both Redis lookups for one record and completes `resultFuture` when they finish.
   *
   * @param input        `(categoryId, itemId)`
   * @param resultFuture completed with a single enriched tuple, or exceptionally if a lookup fails
   */
  override def asyncInvoke(input: (String, String), resultFuture: ResultFuture[(String, String, String, String)]): Unit = {
    val (categoryId, itemId) = input
    val categoryNameFut: RedisFuture[String] = async.hget(s"category.$categoryId", "category_name")
    val itemNameFut: RedisFuture[String] = async.hget(s"item.$itemId", "item_name")

    // Combine the two lookups into a tuple. The BiFunction may return any type, so there is no
    // need to join the names into one delimited string and split it again (which breaks on names
    // that contain the delimiter or are empty). For a single lookup, attach the callback to that
    // RedisFuture directly.
    categoryNameFut
      .thenCombineAsync[String, (String, String)](itemNameFut, (category: String, item: String) => (category, item))
      .whenComplete { (names: (String, String), error: Throwable) =>
        Option(error) match {
          // Report the failure instead of leaving the request pending until the timeout fires.
          case Some(cause) => resultFuture.completeExceptionally(cause)
          case None =>
            resultFuture.complete(Iterable((categoryId, orMissing(names._1), itemId, orMissing(names._2))))
        }
      }
  }

  /** Releases the Redis connection and the client when the function is closed. */
  override def close(): Unit = {
    try conn.close()
    finally client.shutdown()
  }

  // hget yields null for a missing hash/field; keep the textual "null" placeholder the previous
  // string-concatenating version produced so downstream never receives a null reference.
  private def orMissing(value: String): String = Option(value).getOrElse("null")
}
// Attach the Redis lookup function to the main stream with unorderedWait.
val resultStream = AsyncDataStream.unorderedWait(
  dataStream,
  new MyAsyncFunc,
  10000, // timeout of one async request
  TimeUnit.MILLISECONDS, // time unit of that timeout
  100 // number of concurrent async requests
)
输入: (1,1)
输出: (1,熟食,1,对夹)(假设redis中 category.1 的 category_name 为"熟食",item.1 的 item_name 为"对夹")