flink - operator - RichAsyncFunction

描述
  1. 异步读取外部数据源,并用读到的外部数据对主流中的每条数据进行必要的转换(如维表关联)
  2. 要求外部数据源支持异步读取
  3. 要求代码中使用的 client 支持发起异步(非阻塞)请求
输入

DataStream

输出

DataStream

用法
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}

import scala.util.{Failure, Success}

/**
 * Template of an asynchronous enrichment function.
 *
 * The external client must issue non-blocking requests itself: asyncInvoke only starts the
 * request and registers a callback, and that callback hands the result (or the failure) to Flink.
 */
class MyAsyncFunc extends RichAsyncFunction[input数据类型, output数据类型] {
  // Client for the external data source. @transient keeps it out of the serialized function;
  // the lazy val creates it on first use inside the running task.
  @transient lazy val client = ...

  // ExecutionContext used by the Scala Future callbacks below (optional).
  // NOTE(review): `Executors` must be Flink's concurrent `Executors` utility (package differs by
  // Flink version); java.util.concurrent.Executors has no directExecutor().
  implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())

  override def asyncInvoke(input: input数据类型, resultFuture: ResultFuture[output数据类型]): Unit = {
    // Result of the asynchronous query, e.g. a scala.concurrent.Future. Never block on it here.
    val queryResult = ...

    // Complete Flink's ResultFuture from the callback. The value must already be of type
    // output数据类型. Forwarding the failure as well means a failed query is reported immediately
    // instead of lingering until the async timeout.
    queryResult.onComplete {
      case Success(value) => resultFuture.complete(Iterable(value))
      case Failure(error) => resultFuture.completeExceptionally(error)
    }
  }
}
  
// Wire the async function into the stream. Placeholder arguments, in order: the input stream,
// the function instance, the per-request timeout and its TimeUnit, and the maximum number of
// concurrent async requests.
// NOTE(review): unorderedWait presumably emits results in completion order rather than input
// order; use AsyncDataStream.orderedWait if input order must be kept — confirm in the Flink docs.
val resultStream: DataStream[...] = AsyncDataStream.unorderedWait(dataStream, new MyAsyncFunc, 异步请求超时时间, TimeUnit.超时时间的单位, 异步并发数)
示例

使用 Lettuce 库异步读取 Redis 中 hash 结构的数据,并将其关联到 dataStream 的每条记录上

依赖
<dependency>
    <groupId>io.lettuce</groupId>
    <artifactId>lettuce-core</artifactId>
    <version>6.1.3.RELEASE</version>
</dependency>
/**
 * Enriches each `(categoryId, itemId)` record with names looked up asynchronously from Redis
 * hashes through Lettuce, emitting `(categoryId, categoryName, itemId, itemName)`.
 *
 * The Redis client and connection are not serializable, so they are created per parallel task in
 * `open()` and released in `close()`.
 */
class MyAsyncFunc extends RichAsyncFunction[(String, String), (String, String, String, String)] {
  private type Row = (String, String, String, String)

  @transient var client: RedisClient = _
  @transient var conn: StatefulRedisConnection[String, String] = _
  @transient var async: RedisAsyncCommands[String, String] = _

  override def open(parameters: org.apache.flink.configuration.Configuration): Unit = {
    client = RedisClient.create(RedisURI.builder()
      .withHost("...") // redis host
      .withPort(...) // redis port
      .withPassword("...".toCharArray) // redis password
      .build())
    conn = client.connect()
    async = conn.async()
  }

  // Release the connection and the client's thread resources when the task shuts down.
  override def close(): Unit = {
    if (conn != null) conn.close()
    if (client != null) client.shutdown()
  }

  override def asyncInvoke(input: (String, String), resultFuture: ResultFuture[(String, String, String, String)]): Unit = {
    val (categoryId, itemId) = input

    // Both HGETs are issued right away and run concurrently. A missing key/field presumably
    // yields null (Redis HGET returns nil), which is passed through unchanged.
    val categoryNameFut: RedisFuture[String] = async.hget(s"category.$categoryId", "category_name")
    val itemNameFut: RedisFuture[String] = async.hget(s"item.$itemId", "item_name")

    // Build the output row directly from both values. Joining with "," and splitting again broke
    // names containing "," and threw for an empty item name (split drops trailing empty strings).
    val toRow: java.util.function.BiFunction[String, String, Row] =
      (categoryName, itemName) => (categoryId, categoryName, itemId, itemName)

    // Forward success or failure to Flink. Without the failure branch a Redis error would leave
    // resultFuture uncompleted until Flink's async timeout fires.
    val forward: java.util.function.BiConsumer[Row, Throwable] =
      (row, error) =>
        if (error != null) resultFuture.completeExceptionally(error)
        else resultFuture.complete(Iterable(row))

    // To look up a single dimension, call whenComplete on that one RedisFuture instead.
    categoryNameFut
      .thenCombine[String, Row](itemNameFut, toRow)
      .whenComplete(forward)
  }
}
 
 
// Attach the Redis lookup: each async request times out after 10000 ms, and at most 100 requests
// are in flight concurrently.
val resultStream = AsyncDataStream.unorderedWait(dataStream, new MyAsyncFunc, 10000, TimeUnit.MILLISECONDS, 100)

输入:(1,1)
输出:(1,熟食,1,对夹)
(假设 Redis 中 hash `category.1` 的 `category_name` 字段为 "熟食",hash `item.1` 的 `item_name` 字段为 "对夹"。)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容