Flink处理数据时候,遇到比较耗时的操作时,需要异步处理数据。
例子如下:
DataStream<Order> asyncStream = AsyncDataStream.unorderedWait(orderStream, new RichAsyncFunction<Order, Order>() {
public transient ThreadPoolExecutor executor;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
executor = new ThreadPoolExecutor(5, //
10, 1, TimeUnit.MINUTES, new LinkedBlockingDeque<>());//
}
@Override
public void close() throws Exception {
super.close();
executor.shutdownNow();
}
@Override
public void timeout(SkuOrder input, ResultFuture<SkuOrder> resultFuture) {
//超时后的处理
}
@Override
public void asyncInvoke(Order input, ResultFuture<Order> resultFuture) throws Exception {
CompletableFuture.runAsync( ()->{
int mills = new Random().nextInt(10);
System.out.println("异步处理数据:" + Thread.currentThread().getId() + "|" + JSON.toJSONString(input));
try {
TimeUnit.SECONDS.sleep(mills);
} catch (InterruptedException e) {
e.printStackTrace();
}
resultFuture.complete(Collections.singleton(input));
},executor);
}
},1, TimeUnit.MINUTES, 1000).setParallelism(1);
说明:
1、AsyncDataStream有2个方法,unorderedWait表示数据不需要关注顺序,处理完立即发送,orderedWait表示数据需要关注顺序,为了实现该目标,操作算子会在该结果记录之前的记录为发送之前缓存该记录。这往往会引入额外的延迟和一些Checkpoint负载,因为相比于无序模式结果记录会保存在Checkpoint状态内部较长的时间。
2、Timeout配置,主要是为了处理死掉或者失败的任务,防止资源被长期阻塞占用。
3、最后一个参数Capacity表示同时最多有多少个异步请求在处理,异步IO的方式会导致更高的吞吐量,但是对于实时应用来说该操作也是一个瓶颈。限制并发请求数,算子不会积压过多的未处理请求,但是一旦超过容量的显示会触发背压。
该参数可以不配置,但是默认是100。