Java8学习笔记之以并发方式在同一个流上执行多种操作

在Java 8中,流有一个非常大的局限性,使用时,对它操作一次仅能得到 一个处理结果。如果试图多次遍历同一个流,将会遭遇下面这样的异常: java.lang.IllegalStateException: stream has already been operated upon or closed
流的设计就是如此,但我们在处理流时经常希望能同时获取多个结果。比如用一个流来解析日志文件,而不是在某个单一步骤中收集多个数据。
希望一次性向流中传递多个Lambda表达式。为达到这一目标,需要一个fork类型的方法,对每个复制的流应用不同的函数。更理想的情况是能以并发的方式执行这些操作,用不同的线程执行各自的运算得到对应的结果。但这些特性目前还没有在Java 8的流实现中提供。另外一种方式是利用一个通用API:Spliterator,尤其是它的延迟绑定能力,结合BlockingQueues和Futures来实现。
要达到在一个流上并发执行多个操作的效果,需要创建一个StreamForker,这个StreamForker会对原始的流进行封装,在此基础之上继续定义希望执行的各种操作。

public class StreamForker<T> {
  private final Stream<T> stream;
  private final Map<Object, Function<Stream<T>, ?>> forks = new HashMap<>();

  public StreamForker(Stream<T> stream) {this.stream = stream;}

  public StreamForker<T> fork(Object key, Function<Stream<T>, ?> f) {
    forks.put(key, f); //使用一个键对流上的函数进行索引
    return this; //返回this从而保证多次流畅地调用fork方法
  }
  public Results getResults() {
    // To be implemented
  }
}

fork方法接受两个参数:

  • key参数:通过它可以取得操作结果,并将这些键/函数对累积到一个内部的Map中。
  • Function参数:它对流进行处理,将流转变为代表这些操作结果的任何类型。

fork方法返回StreamForker自身,因此,可以通过复制多个操作构造一个流水线。


StreamForker详解

这里定义了希望在流上执行的三种操作,这三种操作通过三个键索引标识。StreamForker会遍历原始的流,并创建它的三个副本。这时就可以并行地在复制的流上执行这三种操作,这些函数运行的结果由对应的键进行索引,最终会填入到结果的Map。
所有由fork方法添加的操作的执行都是通过getResults方法调用触发的,该方法返回一个Results接口的实现,具体的定义如下:

public static interface Results {
  public <R> R get(Object key);
}

该接口只有一个方法,可以将fork方法中使用的key对象作为参数传入,方法会返回该键对应的操作结果。

1、使用ForkingStreamConsumer实现Results接口
public Results getResults() {
  ForkingStreamConsumer<T> consumer = build();
  try {
    stream.sequential().forEach(consumer);
  } finally {
    consumer.finish();
  }
  return consumer;
}

ForkingStreamConsumer同时实现了前面定义的Results接口和Consumer接口。它主要的任务就是处理流中的元素,将它们分发到多个BlockingQueues中处理,BlockingQueues的数量和通过fork方法提交的操作数是一致的。如果你在一个并发流上执行forEach方法,它的元素可能就不是顺序地被插入到队列中了。finish方法会在队列的末尾插入特殊元素表明该队列已经没有更多需要处理的元素了。build方法主要用于创建ForkingStreamConsumer。
使用build方法创建ForkingStreamConsumer:

private ForkingStreamConsumer<T> build() {
  List<BlockingQueue<T>> queues = new ArrayList<>();//创建由队列组成的列表,每个队列对应一个操作
  Map<Object, Future<?>> actions = forks.entrySet().stream().reduce(
      new HashMap<Object, Future<?>>(), 
      (map, e) -> {
        map.put(e.getKey(), getOperationResult(queues, e.getValue()));
        return map;
      },
      (m1, m2) -> {
        m1.putAll(m2);
        return m1;
      });
  return new ForkingStreamConsumer<>(queues, actions);
}

首先创建了由BlockingQueues组成的列表,接着创建了一个Map,Map的键就是在流中用于标识不同操作的键,值包含在Future中,Future中包含了这些操作对应的处理结果。BlockingQueues的列表和Future组成的Map会被传递给 ForkingStreamConsumer的构造函数。每个Future都是通过getOperationResult方法创建的,代码如下:

private Future<?> getOperationResult(List<BlockingQueue<T>> queues, Function<Stream<T>, ?> f) {
  BlockingQueue<T> queue = new LinkedBlockingQueue<>();
  queues.add(queue);//创建一个队列,并将其添加到队列的列表中
  Spliterator<T> spliterator = new BlockingQueueSpliterator<>(queue);//创建一个 Spliterator,遍历队列中的元素
  Stream<T> source = StreamSupport.stream(spliterator, false);//创建一个流,将 Spliterator作为数据源
  return CompletableFuture.supplyAsync(() -> f.apply(source));//创建一个Future对象,以异步方式计算在流上执行特定函数的结果
}

getOperationResult方法会创建一个新的BlockingQueue,并将其添加到队列的列表。这个队列会被传递给一个新的BlockingQueueSpliterator对象,后者是一个延迟绑定的Spliterator,它会遍历读取队列中的每个元素;接下来创建了一个顺序流对该Spliterator进行遍历,最终会创建一个Future在流上执行某个希望的操作并收集其结果。这里的Future使用CompletableFuture类的一个静态工厂方法创建,CompletableFuture实现了Future接口。

2、开发ForkingStreamConsumer和BlockingQueueSpliterator

实现ForkingStreamConsumer类,为其添加处理多个队列的流元素:

static class ForkingStreamConsumer<T> implements Consumer<T>, Results {
  static final Object END_OF_STREAM = new Object();
  private final List<BlockingQueue<T>> queues;
  private final Map<Object, Future<?>> actions;
  ForkingStreamConsumer(List<BlockingQueue<T>> queues, Map<Object, Future<?>> actions) {
    this.queues = queues;
    this.actions = actions;
  }

  @Override
  public void accept(T t) {
    queues.forEach(q -> q.add(t));//将流中遍历的元素添加到所有的队列中
  }
  
  void finish() {
    accept((T) END_OF_STREAM);//将最后一个元素添加到队列中,表明该流已经结束
  }

  @Override
  public <R> R get(Object key) {
    try {
      return ((Future<R>) actions.get(key)).get();//等待Future完成相关的计算,返 回由特定键标识的处理结果
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

这个类同时实现了Consumer和Results接口,并持有两个引用,一个指向由BlockingQueues组成的列表,另一个指向了由Future构成的Map结构,它们表示的是即将在流上执行的各种操作。
Consumer接口要求实现accept方法。每当ForkingStreamConsumer接受流中的一 个元素,它就会将该元素添加到所有的BlockingQueues中。另外,当原始流中的所有元素都添加到所有队列后,finish方法会将最后一个元素添加所有队列。BlockingQueueSpliterators碰到最后这个元素时会知道队列中不再有需要处理的元素了。
Results接口需要实现get方法。一旦处理结束,get方法会获得Map中由键索引的Future,解析处理的结果并返回。
最后,流上要进行的每个操作都会对应一个BlockingQueueSpliterator。每个BlockingQueueSpliterator都持有一个指向BlockingQueues的引用,这个BlockingQueues是由ForkingStreamConsumer生成的。
遍历BlockingQueue并读取其中元素的Spliterator:

class BlockingQueueSpliterator<T> implements Spliterator<T> {
  private final BlockingQueue<T> q;
  BlockingQueueSpliterator(BlockingQueue<T> q) {this.q = q;}
 
  @Override
  public boolean tryAdvance(Consumer<? super T> action) {
    T t;
    while (true) {
      try {
        t = q.take();
        break;
      } catch (InterruptedException e) {}
    } 
 
    if (t != ForkingStreamConsumer.END_OF_STREAM) {
      action.accept(t);
      return true;
    }
    return false;
  }

  @Override
  public Spliterator<T> trySplit() {return null;}
 
  @Override
  public long estimateSize() {return 0;}
 
  @Override
  public int characteristics() {return 0;}
}

这段代码实现了一个Spliterator,它并未定义如何切分流的策略,仅仅利用了流的 延迟绑定能力。由于这个原因,它也没有实现trySplit方法。
由于无法预测能从队列中取得多少个元素,所以estimatedSize方法也无法返回任何有意义的值。

这段代码中提供了实现的唯一方法是tryAdvance,它从BlockingQueue中取得原始流中的元素,而这些元素最初由ForkingSteamConsumer添加。依据getOperationResult方法创建Spliterator同样的方式,这些元素会被作为进一步处理流的源头传递给Consumer对象(在流上要执行的函数会作为参数传递给某个fork方法调用)。tryAdvance方法返回true通知调用方还有其他的元素需要处理,直到它发现由ForkingSteamConsumer添加的特殊对象,表明队列中已经没有更多需要处理的元素了。
StreamForker及其合作的构造块

图中左上角的StreamForker中包含一个Map结构,以方法的形式定义了流上要执行 的操作,这些方法分别有对应的键索引。右边的ForkingStreamConsumer为每一种操作的对象维护了一个队列,原始流中的所有元素会被分发到这些队列中。
图的下半部分,每一个队列都有一个BlockingQueueSpliterator从队列中提取元素作为各个流处理的源头。最后,由原始流复制创建的每个流,都会被作为参数传递给某个处理函数,执行对应的操作。
3、将StreamForker用于实战

使用StreamForker,通过复制原始的菜肴(dish)流,以并发的方式执行四种不同的操作。生成一份由逗号分隔的菜肴名列表,计算菜单的总热量,找出热量最高的菜肴,并按照菜的类型对这些菜进行分类。

Stream<Dish> menuStream = menu.stream();
StreamForker.Results results = new StreamForker<Dish>(menuStream)
  .fork("shortMenu", s -> s.map(Dish::getName).collect(joining(", ")))
  .fork("totalCalories", s -> s.mapToInt(Dish::getCalories).sum())
  .fork("mostCaloricDish", s -> s.collect(reducing((d1, d2) -> d1.getCalories() > d2.getCalories() ? d1 : d2)).get())
  .fork("dishesByType", s -> s.collect(groupingBy(Dish::getType)))
  .getResults();
String shortMenu = results.get("shortMenu");
int totalCalories = results.get("totalCalories");
Dish mostCaloricDish = results.get("mostCaloricDish");
Map<Dish.Type, List<Dish>> dishesByType = results.get("dishesByType");

System.out.println("Short menu: " + shortMenu);
System.out.println("Total calories: " + totalCalories);
System.out.println("Most caloric dish: " + mostCaloricDish);
System.out.println("Dishes by type: " + dishesByType);

StreamForker提供了一种使用简便、结构流畅的API,它能够复制流,并对每个复制的流施加不同的操作。这些应用在流上以函数的形式表示,可以用任何对象的方式标识。
如果你没有更多的流需要添加,可以调用StreamForker的getResults方法,触发所有定义的操作开始执行,并取得StreamForker.Results。这些操作的内部实现是异步的,getResults方法调用后会立刻返回,不会等待所有的操作完成拿到所有的执行结果才返回。
可以通过向StreamForker.Results接口传递标识特定操作的键取得某个操作的结果。如果该时刻操作已经完成,get方法会返回对应的结果;否则,该方法会阻塞,直到计算结束,取得对应的操作结果。

--参考文献《Java8实战》

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,928评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,192评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,468评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,186评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,295评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,374评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,403评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,186评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,610评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,906评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,075评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,755评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,393评论 3 320
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,079评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,313评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,934评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,963评论 2 351

推荐阅读更多精彩内容