Java8对核心库的改进是新特性中很关键的一点,其中主要包括了对集合类的API的扩充和新引入的流(Stream)。流是Java8表示有序数据,并能灵活地表示这些数据是否可以并行处理的新方式。其思路和在数据库查询语言中的思路类似——即用更高级的方式表达想要的东西,而由“实现”(即Stream库)来选择最佳低级执行机制,这样就可以避免使用synchronized来编写并行代码,减少出错的可能性以及提高多核CPU下的数据处理性能。
从上面的描述中,我们可以看出,流的使用主要是在数据处理方面。我们假设有这样一个需求:需要从给定的交易列表中寻找出大宗交易列表。
传统方式写法:
public List<Trade> fetchBigTrade(List<Trade> tradeList) {
List<Trade> bigTradeList = Lists.newArrayList();
for (Trade trade : tradeList) {
if (trade.isBigTrade()) {
bigTradeList.add(trade);
}
}
return bigTradeList;
}
使用流方式写法:
public List<Trade> fetchBigTrade(List<Trade> tradeList) {
return tradeList.stream()
.filter(trade -> trade.isBigTrade())
.collect(Collectors.toList());
}
使用流方式的代码,我们可以很清晰地理解想要执行的逻辑,先是生成流,其次过滤出大额交易,最后生成列表返回。这样的写法代码不仅简单,而且更直观,另外也去掉了循环迭代及临时变量。
流实际上就是一个遵循管道架构(管道及过滤器模式,云计算的一种设计模式)的自由流动的元素序列。每个流以一组原始数据开始,创建一个管道,使用中间操作(Intermediate Operation)处理通过管道的每个数据,使用终止操作(Terminate Operation)结束流的处理。终止操作是流的最终处理,只有调用了终止操作,流才会产生一个结果,其他中间操作都不产生处理结果。
现在我们想要实现在多核处理器下并行处理该数据,再容易不过了。
return tradeList.stream()
.parallel()
.filter(trade -> trade.isBigTrade())
.collect(Collectors.toList());
可以看到,我们并不需要为并行模式设计多么复杂的类或架构,也不需要使用fork/join框架来管理父job和子job,此处只需要在流中简单地调用一个方法(parallel)即可完成并行的实现。
虽然我们可以看到,切换程序到并行模式是毫无困难的,但是这并不意味着处理速度就一定会得到很大的提高,以及所有的程序都使用并行。这个主要取决于业务逻辑是否真正适合于并行,也需要我们在设计时要注意,如何对我们的数据处理进行划分,以便找出真正适用于并行的数据从而采取并行模式来提高性能。
最后我们再看几个例子,来对Lambda和流加深下印象:
Example 1 一个Callable类型的Lambda表达式,对给定的trade进行各种处理并返回:
Callable<Trade> c = () -> {
Trade t = new Trade("trade1",100000);
purchase(t);
process(t);
return t;
}
Example 2 一个并行处理的流,去统计所有交易的总量:
tradeList.stream()
.parallel()
.map(Trade::getQuantity)
.reduce(Long::sum);
Example 3 找出并且打印交易国家为日本的交易列表名单:
tradeList.stream()
.filter(trade -> trade.getCountry().equals("Japan"))
.map(Trade::getName)
.distinct()
.forEach(System.out::println);