Java 8新特性概览
- Lambda表达式
- 流
- 默认方法
Lambda表达式
-
Lambda和函数式接口
Java 8中新增了函数 -- 值的一种新形式. 在运行时传递方法/函数能将方法/函数变成"一等值"(传统编程语言中, 值(eg: 实例)是"一等公民", 值的结构(eg: 类和方法)是"二等公民").
Lambda是表示函数的强有力的方式. 它允许你直接以内联的形式为函数式接口的抽象方法提供实现, 并把整个表达式作为函数式接口的实例.-
函数式接口: 只定义一个抽象方法的接口.
@FunctionalInterface public interface Predicate<T> { boolean test(T t); }
-
Lambda的签名: 函数式接口的抽象方法的签名基本上就是Lambda表达式的签名 (只不过Lambda没有方法名).
Predicate<T> ((T) -> boolean)
-
-
Lambda与方法引用 (从现有方法获取函数)
静态方法引用: [className]::[method]
-
实例方法引用: [className]::[method]
注意, 实例方法引用和静态方法引用格式相同; 区别是, 实例方法引用需要以每个元素为引用调用指定的方法
list.stream().map(Objects::isNull /* 静态方法引用, 每个元素作为toString的参数, 元素本身不需要有该方法 */) list.stream().map(Object::toString /* 实例方法引用, 每个元素作为toString的调用对象 */)
已有对象的方法引用: [objName]::[method]
-
Java 8自带的常用函数式接口(java.util.function包)
- Predicate<T>, Consumer<T>, Function<F, T>, Supplier<T>, UnaryOperator<T>
- BiPredicate<L, R>, BiConsumer<T, U>, BiFunction<T, U, R>, BinaryOperator<T>
- 特化的函数: IntFunction<R>, LongFunction<R>, DoubleFunction<R>
流
-
流和集合
-
流
- 从支持数据处理操作的源生成的一系列元素
-
流与集合
- 一次性迭代: 只能遍历一次
- 内部迭代:环绕执行模式
Collection主要是为了存储和访问数据, 而Stream则主要用于描述对数据的计算.
与集合不同, 流并没有把所有元素保存在内存中, 而是实时计算需要的元素, 对同一个流而言这个过程是不可重置的, 每次迭代都必须生成新的流;
流的迭代是内置的, 对流的使用往往一气呵成 -- 我们只管组装/转换流, 通过函数定制流的行为, 并最终消费流, 其中对每一个元素的命令式迭代不需要我们来关心. -
流的操作类型
- 中间操作: 给流附加转换/计算规则, 将流包装成另一个流 (可以把流组装成包含复杂操作的流水线)
- 终端操作: 驱动流的计算, 得到结果
要完成一次流式计算, 一个流要经过"零个或多个中间操作"将自己组装成流水线, 然后由"一个终端操作"最终驱动流水线的运行;
流是慵懒的, 这意味着, 只有调用了终端操作, 才会发生实际计算.
-
-
流的基本使用
- 流的基本操作 (Stream API)
- 筛选和切片: filter, limit, skip
- 映射: map, flatMap
- 查找和匹配: anyMatch, allMatch, noneMatch, findFirst, findAny
- 归约: reduce
- 数值流
- 原始类型特化:IntStream, LongStream, DoubleStream
- 流的构建
- 集合: Collection.stream
- 值: Stream.of
- 数组: Arrays.stream
- 文件: Files.lines
- 无限流: Stream.iterate, Stream.generate
- 流的调试
- 查看流的运行轨迹: Stream.peek
- 流的基本操作 (Stream API)
-
流和收集器
- 流的收集
- 归约和汇总: ...
- 分组(多级分组): Collectors.groupingBy
- 分区(多级分区): Collectors.partitioningBy
- 收集器:
-
Collector接口
public interface Collector<T, A, R> { // A function that creates and returns a new mutable result container. Supplier<A> supplier(); // A function that folds a value into a mutable result container. BiConsumer<A, T> accumulator(); // A function that accepts two partial results and merges them. BinaryOperator<A> combiner(); // Perform the final transformation from the intermediate accumulation type "A" to the final result type "R". Function<A, R> finisher(); // Returns a Set of "Collector.Characteristics" indicating the characteristics of this Collector. // This set should be immutable. Set<Characteristics> characteristics(); }
-
自定义收集
参见Collectors工具类中的默认实现, eg:
toList(), toSet(), toCollection()
counting(), maxBy()
groupingBy(), reducing(), joining()
-
- 流的收集
-
流的并行化
- 并行流:
- 顺序流并行化
- 并行化的考虑(正确并行,高效并行)
- 分支合并框架:
- 分支合并框架的原理和实践
- 工作窃取
- 新型可拆分的迭代器:
-
Spliterator接口
public interface Spliterator<T> { // If a remaining element exists, performs the given action on it, returning true; else returns false. boolean tryAdvance(Consumer<? super T> action); // If this spliterator can be partitioned, returns a Spliterator covering elements, that will, // upon return from this method, not be covered by this Spliterator. Spliterator<T> trySplit(); // Returns an estimate of the number of elements that would be encountered by a #forEachRemaining traversal, // or returns Long#MAX_VALUE if infinite, unknown, or too expensive to compute. long estimateSize(); // Returns a set of characteristics of this Spliterator and its elements. int characteristics(); }
Stream/Spliterator vs Iterable/Iterator
-
- 并行流:
默认方法
// TODO
Java 8与响应式编程
组合式异步编程:
Future
->ListenableFuture
->CompletableFuture
异步化和异常处理
-
异步任务和组合/连接/...
-
Future/RunnableFuture
异步任务的提交/执行: ExecutorService.submit/execute
判断完成状态: isDone, isCancelled
进入完成状态: run, cancel
-
获取完成结果: get
// 假设Math.random方法是一个耗时较长的方法, 需要使用异步计算 Future<Double> future = executorService.submit(Math::random); // 执行到需要依赖future的返回结果的位置, 需要获取结果, 可能导致阻塞 // 可能需要预先判断(isDone, isCancelled) && 需要捕获异常(ExecutionException, InterruptedException) Double result = future.get();
-
ListenableFuture (extends Future)
-
添加异步回调: addListener
// 将普通线程池装饰成ListeningExecutorService, 从而可以返回ListenableFuture ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService); // 假设Math.random方法是一个耗时较长的方法, 需要使用异步计算 ListenableFuture<Double> listenableFuture = listeningExecutorService.submit(Math::random); // 假设有后续操作需要依赖listenableFuture的返回结果, 可以以回调的方式异步处理, 避免阻塞 // Futures.addCallback封装了异常处理, 并且回调的方式避免了对任务状态的判断 // 注意, 这里回调的执行使用了同一个线程池; 如果不指定线程池, 则默认在原任务线程中执行回调 Futures.addCallback(listenableFuture, new FutureCallback<Double>() { @Override public void onSuccess(Double aDouble) { // on success } @Override public void onFailure(Throwable throwable) { // on failure } }, executorService);
可以看到, 异步API的返回/响应方式, 要么是通过回调函数, 要么是由调用方再次执行一个"等待, 直到计算完成"的方法调用.
-
-
CompletableFuture (implements Future, CompletionStage)
Future接口的局限性: 难以表述Future结果之间的依赖性.
我们需要更具描述能力的特性, 考虑以下场景:- 将两个异步计算合并为一个
- 等待Future集合中的所有任务都完成
- 仅等待Future集合中最快结束的任务完成
- 以手工设定异步操作结果的方式完成一个Future任务的执行
- 应对Future的完成事件
CompletableFuture和Stream的设计都遵循了类似的模式: 它们都使用了Lambda表达式以及流水线的思想.
从这个角度, 你可以说CompletableFuture和Future的关系就跟Stream和Collection的关系一样.-
同步API转异步化API
// 假设getPrice方法是一个耗时较长的方法, 需要改成异步API public Double getPrice() { return Math.random() * 10000; } // 1.1 public Future<Double> getPriceAsync() { CompletableFuture<Double> completableFuture = new CompletableFuture<>(); new Thread(() -> { try { double price = getPrice(); completableFuture.complete(price); // 通过设置返回值的方式完成Future (由执行任务的线程) } catch (Exception e) { completableFuture.completeExceptionally(e); // 记录异常, 以便在Future.get中重新抛出 } }).start(); return completableFuture; } // 1.2 public Future<Double> getPriceAsyncV2() { return CompletableFuture.supplyAsync(this::getPrice); // 通过CompletableFuture的supplyAsync工厂方法 }
-
并行流 vs CompletableFuture流
// 2.1 public List<Double> getPrices() { // 模拟100次请求 IntStream intStream = IntStream.iterate(1, i -> i + 1).limit(100); // 并行化 Stream<Double> doubleStream = intStream.parallel().mapToObj(i -> getPrice()); return doubleStream.collect(Collectors.toList()); } // 2.2 public List<Double> getPricesAsync() { // 模拟100次请求 IntStream intStream = IntStream.iterate(1, i -> i + 1).limit(100); // 异步化; 需要调用collect触发所有异步任务并返回future引用 List<CompletableFuture<Double>> futures = intStream.mapToObj(i -> CompletableFuture.supplyAsync(this::getPrice)) .collect(Collectors.toList()); // 获取结果; CompletableFuture.join类似get, 区别是不会抛出任何受检异常 (适合用作函数进行传递) return futures.stream().map(CompletableFuture::join).collect(Collectors.toList()); }
CompletableFuture的优势:
- 灵活的线程池配置 (N[threads] = N[cpu] * U[cpu] * (1 + W/C))
- 对顺序流(不易并行的)流有同样好的效果
- 无论是IO密集型还是CPU密集型都同样适用
-
组合式异步 (CompletableFuture + Stream, 同步 + 异步)
// 假设getDiscountedPrice方法是另一个耗时较长的方法, 需要用于和getPrice方法的结果组合生成折扣价 private Double getDiscountedPrice(Double price) { return price * 0.88d; } // 3. public List<Double> getDiscountedPricesAsync() { // 模拟100次请求 IntStream intStream = IntStream.iterate(1, i -> i + 1).limit(100); List<CompletableFuture<Double>> futures = intStream.mapToObj(i -> CompletableFuture.supplyAsync(this::getPrice)) // 组合同步调用: 打印future的初步结果 .map(future -> future.thenApply(this::print)) // 组合异步调用: 计算折扣价 // 这里虽然调用的是thenCompose而不是thenComposeAsync, 但是使用了CompletableFuture.supplyAsync, 因此仍是异步的 .map(future -> future.thenCompose(r2 -> CompletableFuture.supplyAsync(() -> getDiscountedPrice(r2)))) .collect(Collectors.toList()); // 注意future之间的同步/异步依赖关系 return futures.stream().map(CompletableFuture::join).collect(Collectors.toList()); } private Double print(Double d) { System.out.println(d); return d; }
通常而言, 同步的调用和它的前一个任务一样, 在同一个线程中运行(future.isDone()的情况下会由调用线程直接执行?!); 而异步的调用会将后续的任务提交到一个线程池, 由不同的线程处理.
可以类比带Executor参数和不带Executor参数的Futures.addCallback方法. -
来自CompletionStage接口的方法 (下列的每个方法几乎都有个对应的异步版本, 方法名带Async后缀):
thenApply, thenAccept, thenRun, thenCombine, thenCompose
thenAcceptBoth, runAfterBoth
applyToEither, acceptEither, runAfterEither
exceptionally, whenComplete, handle
-
toCompletableFuture
// TODO
-
Java 8与函数式编程
// TODO