Java8 学习总结 - 函数式数据处理

1. 为什么要用流?

先看下流处理有什么优势吧:
下面两段代码都是用来返回低热量的菜肴名称的,并按照卡路里排序,一个是用Java 7写的,另一个是用Java 8的流写的。
之前(Java 7):

List<Dish> lowCaloricDishes = new ArrayList<>();
  for(Dish d: menu){
    if(d.getCalories() < 400){
      lowCaloricDishes.add(d);
    }
  }

Collections.sort(lowCaloricDishes, new Comparator<Dish>() {
  public int compare(Dish d1, Dish d2){
    return Integer.compare(d1.getCalories(), d2.getCalories());
  }
});

List<String> lowCaloricDishesName = new ArrayList<>();
for(Dish d: lowCaloricDishes){
  lowCaloricDishesName.add(d.getName());
}

在这段代码中,你用了一个“垃圾变量”lowCaloricDishes。它唯一的作用就是作为一次
性的中间容器。在Java 8中,实现的细节被放在它本该归属的库里了。
之后(Java 8):

import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.toList;

List<String> lowCaloricDishesName = menu.stream().filter(d -> d.getCalories() < 400)
                                                 .sorted(comparing(Dish::getCalories))
                                                 .map(Dish::getName)
                                                 .collect(toList());

为了利用多核架构并行执行这段代码,你只需要把stream()换成parallelStream():

List<String> lowCaloricDishesName = menu.parallelStream().filter(d -> d.getCalories() < 400)
                                                         .sorted(comparing(Dishes::getCalories))
                                                         .map(Dish::getName)
                                                         .collect(toList());

总结一下,Java 8中的Stream API可以让你写出这样的代码:

  • 声明性——更简洁,更易读
  • 可复合——更灵活
  • 可并行——性能更好

2. 流到底是什么呢?

简短的定义就是“从支持数据处理操作的源生成的元素序列”。

  • 元素序列——就像集合一样,流也提供了一个接口,可以访问特定元素类型的一组有序值。因为集合是数据结构,所以它的主要目的是以特定的时间/空间复杂度存储和访问元素(如ArrayList 与 LinkedList)。但流的目的在于表达计算,比如你前面见到的filter、sorted和map。集合讲的是数据,流讲的是计算。我们会在后面几节中详细解释这个思想。
  • 源——流会使用一个提供数据的源,如集合、数组或输入/输出资源。 请注意,从有序集合生成流时会保留原有的顺序。由列表生成的流,其元素顺序与列表一致。
  • 数据处理操作——流的数据处理功能支持类似于数据库的操作,以及函数式编程语言中的常用操作,如filter、map、reduce、find、match、sort等。流操作可以顺序执行,也可并行执行。
    此外,流操作有两个重要的特点。
  • 流水线——很多流操作本身会返回一个流,这样多个操作就可以链接起来,形成一个大的流水线。这让我们下一章中的一些优化成为可能,如延迟和短路。流水线的操作可以看作对数据源进行数据库式查询。
  • 内部迭代——与使用迭代器显式迭代的集合不同,流的迭代操作是在背后进行的

3. 流与集合

流与集合

请注意,和迭代器类似,流只能遍历一次。遍历完之后,我们就说这个流已经被消费掉了。

4. 流操作

可以连接起来的流操作称为中间操作,关闭流的操作称为终端操作。

流操作

总而言之,流的使用一般包括三件事:

  1. 一个数据源(如集合)来执行一个查询;
  2. 一个中间操作链,形成一条流的流水线;
  3. 一个终端操作,执行流水线,并能生成结果。
操作分类

5. 使用流

  • 筛选和切片
  1. 筛选 -> filter(Lambda表达式)
  2. 去重 -> distinct()
  3. 截短 -> limit(n)
  4. 跳过 -> skip(n)

-映射
map:
给定一个数字列表,如何返回一个由每个数的平方构成的列表呢?例如,给定[1, 2, 3, 4,5],应该返回[1, 4, 9, 16, 25]。

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> squares =numbers.stream().map(n -> n * n).collect(toList());

flatmap:
给定两个数字列表,如何返回所有的数对呢?例如,给定列表[1, 2, 3]和列表[3, 4],应该返回[(1, 3), (1, 4), (2, 3), (2, 4), (3, 3), (3, 4)]。为简单起见,你可以用有两个元素的数组来代表数对。

List<Integer> numbers1 = Arrays.asList(1, 2, 3);
List<Integer> numbers2 = Arrays.asList(3, 4);
List<int[]> pairs =numbers1.stream().
                            flatMap(i -> numbers2.stream().map(j -> new int[]{i, j})).
                            collect(toList());
  • 查找和匹配
    Stream API通过allMatch、anyMatch、noneMatch、findFirst和findAny方法提供了这样的工具

  • 归约
    元素求和:
    int sum = numbers.stream().reduce(0, (a, b) -> a + b);

求和

实践
(1) 找出2011年发生的所有交易,并按交易额排序(从低到高)

List<Transaction> sorted = transactions.stream().sorted(Comparator.comparing(Transaction::getValue)).collect(toList());

(2) 交易员都在哪些不同的城市工作过?

List<String> cities = transactions.stream().map(transaction -> transaction.getTrader().getCity()).distinct().collect(toList());

(3) 查找所有来自于剑桥的交易员,并按姓名排序。

List<Trader> cambridge = transactions.stream().filter(transaction -> transaction.getTrader().getCity() == "Cambridge").
                                               map(transaction -> transaction.getTrader()).
                                               distinct().
                                               sorted(Comparator.comparing(Trader::getName)).
                                               collect(toList());

(4) 返回所有交易员的姓名字符串,按字母顺序排序。

List<String> traders = transactions.stream().map(transaction -> transaction.getTrader().getName()).
                                             distinct().
                                             sorted(Comparator.comparing(String::toString)).
                                             collect(toList());

(5) 有没有交易员是在米兰工作的?

boolean match = transactions.stream().anyMatch(transaction -> transaction.getTrader().getCity() == "Milan");

(6) 打印生活在剑桥的交易员的所有交易额。

int total = transactions.stream().filter(transaction -> transaction.getTrader().getCity() == "Cambridge").map(transaction -> transaction.getValue()).reduce(0, (a,b) -> a+b);

(7) 所有交易中,最高的交易额是多少?

int max = transactions.stream().map(transaction -> transaction.getValue()).reduce(Integer::max).get()

(8) 找到交易额最小的交易。

Transaction min = transactions.stream().reduce((a,b)-> a.getValue() > b.getValue()? b:a).get();

6. 创建流

  • 由值创建流
Stream<String> stream = Stream.of("Java 8 ", "Lambdas ", "In ", "Action");
  • 由数组创建流
int[] numbers = {2, 3, 5, 7, 11, 13};
int sum = Arrays.stream(numbers).sum();
  • 由文件生成流
    Java中用于处理文件等I/O操作的NIO API(非阻塞 I/O)已更新,以便利用Stream API。java.nio.file.Files中的很多静态方法都会返回一个流。 例如,一个很有用的方法是Files.lines,它会返回一个由指定文件中的各行构成的字符串流。

  • 由函数生成流:创建无限流
    1. 迭代
    Stream.iterate(0, n -> n + 2).limit(10).forEach(System.out::println);
    2. 生成
    Stream.generate(Math::random).limit(5).forEach(System.out::println);

7. 用流收集数据

  • 归约和汇总
    (1) 查找流中的最大值和最小值
    Optional<Dish> mostCalorieDish = menu.stream() .collect(maxBy(dishCaloriesComparator));
    (2) 汇总
    int totalCalories = menu.stream().collect(summingInt(Dish::getCalories));
    double avgCalories = menu.stream().collect(averagingInt(Dish::getCalories));
    (3) 连接字符串
    String shortMenu = menu.stream().map(Dish::getName).collect(joining());

  • 分组
    (1) 如果有定义的function
    Map<Dish.Type, List<Dish>> dishesByType = menu.stream().collect(groupingBy(Dish::getType));
    (2) 自己分组:

public enum CaloricLevel { DIET, NORMAL, FAT }
Map<CaloricLevel, List<Dish>> dishesByCaloricLevel = menu.stream().collect(groupingBy(
        dish -> {
        if (dish.getCalories() <= 400) return CaloricLevel.DIET;
        else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
        else return CaloricLevel.FAT;
        } ));

(3) 还可以多级分组

Map<Dish.Type, Map<CaloricLevel, List<Dish>>> dishesByTypeCaloricLevel =
menu.stream().collect(
        groupingBy(Dish::getType,
            groupingBy(dish -> {
                if (dish.getCalories() <= 400) return CaloricLevel.DIET;
                else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
                else return CaloricLevel.FAT;
            } )
        )
);

(4) 按子组收集数据

Map<Dish.Type, Long> typesCount = menu.stream().collect(groupingBy(Dish::getType, counting()));

8. 自定义Collector

下面是Collector的接口

public interface Collector<T, A, R> {
  Supplier<A> supplier();
  BiConsumer<A, T> accumulator();
  Function<A, R> finisher();
  BinaryOperator<A> combiner();
  Set<Characteristics> characteristics();
}
  • 建立新的结果容器:supplier方法
  • 将元素添加到结果容器:accumulator方法
  • 对结果容器应用最终转换:finisher方法
  • 合并两个结果容器:combiner方法
  • characteristics方法
归约过程

9. 并行数据处理

  • 使用RecursiveTask
    要定义RecursiveTask,只需实现它唯一的抽象方法compute:
    protected abstract R compute();
fork-join过程

示例:

public class ForkJoinSumCalculator extends java.util.concurrent.RecursiveTask<Long> {
private final long[] numbers;
private final int start;
private final int end;
public static final long THRESHOLD = 10_000;
public ForkJoinSumCalculator(long[] numbers) {
  this(numbers, 0, numbers.length);
}

private ForkJoinSumCalculator(long[] numbers, int start, int end) {
  this.numbers = numbers;
  this.start = start;
  this.end = end;
}

@Override
protected Long compute() {
  int length = end - start;
  if (length <= THRESHOLD) {
    return computeSequentially();
  }
  ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2);
  leftTask.fork();
  ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end);
  Long rightResult = rightTask.compute();
  Long leftResult = leftTask.join();
  return leftResult + rightResult;
}

private long computeSequentially() {
  long sum = 0;
  for (int i = start; i < end; i++) {{
    sum += numbers[i];
  }
  return sum;
}

//执行
public static long forkJoinSum(long n) {
  long[] numbers = LongStream.rangeClosed(1, n).toArray();
  ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
  return new ForkJoinPool().invoke(task);
}
  • Spliterator
    示例
class WordCounterSpliterator implements Spliterator<Character> {
  private final String string;
  private int currentChar = 0;
  public WordCounterSpliterator(String string) {
    this.string = string;
  }

@Override
public boolean tryAdvance(Consumer<? super Character> action) {
  action.accept(string.charAt(currentChar++));
  return currentChar < string.length();
}

@Override
public Spliterator<Character> trySplit() {
  int currentSize = string.length() - currentChar;
  if (currentSize < 10) {
    return null;
  }
  for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) {
    if (Character.isWhitespace(string.charAt(splitPos))) {
      Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos));
      currentChar = splitPos;
      return spliterator;
    }
  }
  return null;
}

@Override
public long estimateSize() {
  return string.length() - currentChar;
}

@Override
public int characteristics() {
  return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
}
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • Java8 in action 没有共享的可变数据,将方法和函数即代码传递给其他方法的能力就是我们平常所说的函数式...
    铁牛很铁阅读 1,225评论 1 2
  • Java 8的新特性可以帮助你: 1.使用Java 8可以减少冗长的代码,让代码更易于理解 2.通过方法引用和St...
    Phoenix小彬阅读 947评论 0 2
  • 第一章 为什么要关心Java 8 使用Stream库来选择最佳低级执行机制可以避免使用Synchronized(同...
    谢随安阅读 1,490评论 0 4
  • 这段时间,对众多人来说,又开始了一轮寻寻觅觅。 “找工作不仅仅是看企业提供的职位高低、薪水多少,而且从某种角度讲,...
    喵小姐huan阅读 626评论 0 0