一 Stream流编程-概念
1⃣️ 概念
实际上Stream就是数据在程序和外部设备的单向管道, 流的各种方法相当于管道上的各种按钮.可以把Stream流理解成现实中的流水线;
2⃣️ 外部迭代和内部迭代
2.1 外部迭代
int[] nums = { 1, 2, 3 };
// 外部迭代
int sum = 0;
for (int i : nums) {
sum += I;
}
System.out.println("结果为:" + sum);
外部迭代可以理解为使用for或者while这样的循环进行操作的方式;
2.2 内部迭代
int sum2 = IntStream.of(nums).sum();
System.out.println("结果为:" + sum2);
2.3 外部迭代和内部迭代的区别
- 从代码风格来看,内部迭代的代码明显比较简短;
- 性能更高,使用内部迭代可以使用很多高级的特性,比如短路 并行等;
3⃣️ 中间操作/终止操作和惰性求值
3.1 中间操作与终止操作
int sum2 = IntStream.of(nums).map(StreamDemo1::doubleNum).sum();
System.out.println("结果为:" + sum2);
其中map就是中间操作,sum就是终止操作;
如何区分中间操作与终止操作,我们只需要看返回类型即可,如果返回的是一个Sream流那么就是一个中间操作,如果没有返回就是终止操作;
3.2 惰性求值
惰性求值的意思就是如果终止操作没有被调用,中间操作不会被执行;
IntStream.of(nums).map(StreamDemo1::doubleNum);
比如上边这段代码,由于没有终止操作sum,所以map中间操作不会执行;
二 流的创建
1⃣️ 完整的流是什么样子的?
完整的流 = 创建 + 中间操作 + 终止操作;
2⃣️ 都有哪些创建方式?
流的常用创建方式
3⃣️ 代码演示流创建的不同方式
package stream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.stream.IntStream;
import java.util.stream.Stream;
public class StreamDemo2 {
public static void main(String[] args) {
List<String> list = new ArrayList<>();
// 从集合创建流
list.stream();
list.parallelStream();
// 从数组创建流
Arrays.stream(new int[] { 2, 3, 5 });
// 创建数字流
IntStream.of(1, 2, 3);
IntStream.rangeClosed(1, 10);
// 使用Random创建一个无限流,创建无限流需要注意的是必须有限制条件,否则流不会停止;
new Random().ints().limit(10);
Random random = new Random();
// 自己创建无限流,同时给出限制条件
Stream.generate(() -> random.nextInt()).limit(20);
}
}
三 流的中间操作
- 常用的中间操作。
流的中间操作
- 无状态操作与有状态操作
无状态操作:意思就是当前的操作与元素的前后没有依赖关系;
有状态操作:当前的操作依赖于其他元素;
共同点:不论是有状态操作还是无状态操作,最终都会返回一个Stream流,可以继续使用链式的操作调用下去;
代码演示:
package stream;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class StreamDemo3 {
public static void main(String[] args) {
String str = "my name is 007";
// 把每个单词的长度打印出来
Stream.of(str.split(" ")).filter(s -> s.length() > 2)
.map(s -> s.length()).forEach(System.out::println);
// flatMap A->B属性(B属性是个集合),最终得到所有的A元素里面的所有B属性集合
// intStream/longStream 并不是Stream的子类,所以需要装箱操作boxed;
Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed())
.forEach(i -> System.out.println((char) i.intValue()));
// peek是个中间操作常用于debug,forEach是个终止操作;
System.out.println("--------------peek------------");
Stream.of(str.split(" ")).peek(System.out::println)
.forEach(System.out::println);
// limit的使用,主要用于对无限流的限制操作
new Random().ints().filter(i -> i > 100 && i < 1000).limit(10)
.forEach(System.out::println);
}
}
四 Stream流的终止操作
- 常用的终止操作
Stream流的终止操作
- 短路操作于非短路操作
短路操作:不需要等待流的结果都计算完就可以执行的操作;
非短路操作:概念与短路操作相反;
代码演示:
package stream;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class StreamDemo4 {
public static void main(String[] args) {
String str = "my name is 007";
// 使用并行流
str.chars().parallel().forEach(i -> System.out.print((char) i));
System.out.println();
// 使用forEachOrdered保证顺序
str.chars().parallel().forEachOrdered(i -> System.out.print((char) i));
// 收集到List
List<String> list = Stream.of(str.split(" "))
.collect(Collectors.toList());
System.out.println(list);
// 使用reduce拼接字符串
Optional<String> letters = Stream.of(str.split(" "))
.reduce((s1, s2) -> s1 + "|" + s2);
System.out.println(letters.orElse(""));
// 带初始化值的reduce
String reduce = Stream.of(str.split(" ")).reduce("",
(s1, s2) -> s1 + "|" + s2);
System.out.println(reduce);
// 计算所有单词的总长度
Integer length = Stream.of(str.split(" ")).map(s -> s.length())
.reduce(0, (s1, s2) -> s1 + s2);
System.out.println(length);
// max的使用
Optional<String> max = Stream.of(str.split(" "))
.max((s1, s2) -> s1.length() - s2.length());
System.out.println(max.get());
// 使用findFirst短路操作
OptionalInt findFirst = new Random().ints().findFirst();
System.out.println(findFirst.getAsInt());
}
}
五 并行流
package stream;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
public class StreamDemo5 {
public static void main(String[] args) {
// 调用parallel产生一个并行流
// IntStream.range(1, 100).parallel().peek(StreamDemo5::debug).count();
// 现在要实现一个这样的操作:先并行,在串行
// 多次调用parallel / sequential,以最后一次为准;
// IntStream.range(1, 100)
// 调用parallel产生一个并行流
// .parallel().peek(StreamDemo5::debug)
// 调用sequential产生一个串行流
// .sequential().peek(StreamDemo5::debug2)
// .count();
// 并行流使用的线程池: ForkJoinPool.commonPool
// 默认的线程数是当前机器的cpu个数
// 使用这个属性可以修改默认的线程数
// System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",
// "20");
// IntStream.range(1, 100).parallel().peek(StreamDemo5::debug).count();
// 使用自己的线程池,不适用默认的线程池,防止任务被阻塞;
// 线程名称 : ForkJoinPool-1
ForkJoinPool pool = new ForkJoinPool(20);
pool.submit(() -> IntStream.range(1, 100).parallel()
.peek(StreamDemo5::debug).count());
pool.shutdown();
synchronized (pool) {
try {
pool.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void debug(int i) {
System.out.println(Thread.currentThread().getName() + " debug " + i);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void debug2(int i) {
System.err.println("debug2 " + i);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
六 收集器
- 收集器能干什么?
收集器的作用就是将我们的Stream流处理后的数据收集起来,它可以将数据收集到集合类中,比如List Set Map中,或者是将我们处理后的数据进行在处理,处理成一条数据比如说求和操作;
代码演示:
package stream;
import java.util.Arrays;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.commons.collections4.MapUtils;
/**
* 学生测试类
*/
class Student {
/**
* 姓名
*/
private String name;
/**
* 年龄
*/
private int age;
/**
* 性别
*/
private Gender gender;
/**
* 班级
*/
private Grade grade;
public Student(String name, int age, Gender gender, Grade grade) {
super();
this.name = name;
this.age = age;
this.gender = gender;
this.grade = grade;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public Grade getGrade() {
return grade;
}
public void setGrade(Grade grade) {
this.grade = grade;
}
public Gender getGender() {
return gender;
}
public void setGender(Gender gender) {
this.gender = gender;
}
@Override
public String toString() {
return "[name=" + name + ", age=" + age + ", gender=" + gender
+ ", grade=" + grade + "]";
}
}
/**
* 性别
*/
enum Gender {
MALE, FEMALE
}
/**
* 班级
*/
enum Grade {
ONE, TWO, THREE, FOUR;
}
public class CollectDemo {
public static void main(String[] args) {
// 测试数据
List<Student> students = Arrays.asList(
new Student("��", 10, Gender.MALE, Grade.ONE),
new Student("����", 9, Gender.MALE, Grade.THREE),
new Student("��", 8, Gender.FEMALE, Grade.TWO),
new Student("��", 13, Gender.FEMALE, Grade.FOUR),
new Student("��", 7, Gender.FEMALE, Grade.THREE),
new Student("��", 13, Gender.MALE, Grade.ONE),
new Student("��", 13, Gender.FEMALE, Grade.THREE),
new Student("��", 9, Gender.FEMALE, Grade.TWO),
new Student("��", 6, Gender.MALE, Grade.ONE),
new Student("��", 6, Gender.MALE, Grade.ONE),
new Student("��", 14, Gender.FEMALE, Grade.FOUR),
new Student("��", 13, Gender.MALE, Grade.FOUR));
// 得到所有学生的年龄列表
// 尽量使用方法引用,s -> s.getAge() --> Student::getAge , 不会多生成一个类似lambda$0这样的函数
Set<Integer> ages = students.stream().map(Student::getAge)
.collect(Collectors.toCollection(TreeSet::new));
System.out.println("所有学生的年龄:" + ages);
// 统计汇总信息
IntSummaryStatistics agesSummaryStatistics = students.stream()
.collect(Collectors.summarizingInt(Student::getAge));
System.out.println("年龄汇总信息:" + agesSummaryStatistics);
// 分块
Map<Boolean, List<Student>> genders = students.stream().collect(
Collectors.partitioningBy(s -> s.getGender() == Gender.MALE));
// System.out.println("男女学生列表:" + genders);
MapUtils.verbosePrint(System.out, "男女学生列表:", genders);
// 分组
Map<Grade, List<Student>> grades = students.stream()
.collect(Collectors.groupingBy(Student::getGrade));
MapUtils.verbosePrint(System.out, "学生班级列表", grades);
// 得到所有班级学生的个数
Map<Grade, Long> gradesCount = students.stream().collect(Collectors
.groupingBy(Student::getGrade, Collectors.counting()));
MapUtils.verbosePrint(System.out, "班级学生个数列表", gradesCount);
}
}
七 Stream的运行机制
package stream;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
/**
* 验证stream运行机制
* 1. 所有操作是链式调用,一个元素只迭代一次;
* 2. 每一个中间操作返回一个新的流。流里边有一个属性sourceStage指向同一个地方,就是Head。
* 3. Head->nextStage->nextStage->... -> null
* 4. 有状态操作会把无状态操作截断,单独处理.
* 5. 并行环境下,又状态的中间操作不一定能并行操作.
* 6. parallel/ sequetial这两个操作也是中间操作(也是返回stream),但是它们不创建流,它们只修改Head的并行标志.
* @author
*
*/
public class RunStream {
public static void main(String[] args) {
Random random = new Random();
// 随机产生数据
Stream<Integer> stream = Stream.generate(() -> random.nextInt())
// 产生500个 (无限流需要短路操作)
.limit(500)
// 第一个无状态操作
.peek(s -> print("peek: " + s))
// 第二个无状态操作
.filter(s -> {
print("filter: " + s);
return s > 1000000;
})
// 有状态操作
.sorted((i1, i2) -> {
print("sorted: " + i1 + ", " + i2);
return i1.compareTo(i2);
})
// 又一个无状态操作
.peek(s -> {
print("peek2: " + s);
}).parallel();
// 终止操作
stream.count();
}
/**
* 打印日志并sleep 5毫秒
* @param s
*/
public static void print(String s) {
// System.out.println(s);
// 带线程名(测试并行情况)
System.out.println(Thread.currentThread().getName() + " > " + s);
try {
TimeUnit.MILLISECONDS.sleep(5);
} catch (InterruptedException e) {}
}
}


