2_Stream流编程

一 Stream流编程-概念

1⃣️ 概念

实际上Stream就是数据在程序和外部设备的单向管道, 流的各种方法相当于管道上的各种按钮.可以把Stream流理解成现实中的流水线;

2⃣️ 外部迭代和内部迭代

2.1 外部迭代
int[] nums = { 1, 2, 3 };
// 外部迭代
int sum = 0;
for (int i : nums) {
    sum += I;
}
System.out.println("结果为:" + sum);

外部迭代可以理解为使用for或者while这样的循环进行操作的方式;

2.2 内部迭代
int sum2 = IntStream.of(nums).sum();
System.out.println("结果为:" + sum2);
2.3 外部迭代和内部迭代的区别
  1. 从代码风格来看,内部迭代的代码明显比较简短;
  2. 性能更高,使用内部迭代可以使用很多高级的特性,比如短路 并行等;

3⃣️ 中间操作/终止操作和惰性求值

3.1 中间操作与终止操作
int sum2 = IntStream.of(nums).map(StreamDemo1::doubleNum).sum();
System.out.println("结果为:" + sum2);

其中map就是中间操作,sum就是终止操作;

如何区分中间操作与终止操作,我们只需要看返回类型即可,如果返回的是一个Sream流那么就是一个中间操作,如果没有返回就是终止操作;
3.2 惰性求值

惰性求值的意思就是如果终止操作没有被调用,中间操作不会被执行;

IntStream.of(nums).map(StreamDemo1::doubleNum);

比如上边这段代码,由于没有终止操作sum,所以map中间操作不会执行;

二 流的创建

1⃣️ 完整的流是什么样子的?

完整的流 = 创建 + 中间操作 + 终止操作;

2⃣️ 都有哪些创建方式?

流的常用创建方式

3⃣️ 代码演示流创建的不同方式

package stream;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class StreamDemo2 {
    public static void main(String[] args) {
        List<String> list = new ArrayList<>();
        // 从集合创建流
        list.stream();
        list.parallelStream();
        // 从数组创建流
        Arrays.stream(new int[] { 2, 3, 5 });
        // 创建数字流
        IntStream.of(1, 2, 3);
        IntStream.rangeClosed(1, 10);
        // 使用Random创建一个无限流,创建无限流需要注意的是必须有限制条件,否则流不会停止;
        new Random().ints().limit(10);
        Random random = new Random();
        // 自己创建无限流,同时给出限制条件
        Stream.generate(() -> random.nextInt()).limit(20);
    }
}

三 流的中间操作

  1. 常用的中间操作。
    流的中间操作
  1. 无状态操作与有状态操作
    无状态操作:意思就是当前的操作与元素的前后没有依赖关系;
    有状态操作:当前的操作依赖于其他元素;
    共同点:不论是有状态操作还是无状态操作,最终都会返回一个Stream流,可以继续使用链式的操作调用下去;

代码演示:

package stream;

import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamDemo3 {

    public static void main(String[] args) {
        String str = "my name is 007";

        // 把每个单词的长度打印出来
        Stream.of(str.split(" ")).filter(s -> s.length() > 2)
                .map(s -> s.length()).forEach(System.out::println);

        // flatMap A->B属性(B属性是个集合),最终得到所有的A元素里面的所有B属性集合
        // intStream/longStream 并不是Stream的子类,所以需要装箱操作boxed;
        Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed())
                .forEach(i -> System.out.println((char) i.intValue()));

        // peek是个中间操作常用于debug,forEach是个终止操作;
        System.out.println("--------------peek------------");
        Stream.of(str.split(" ")).peek(System.out::println)
                .forEach(System.out::println);

        // limit的使用,主要用于对无限流的限制操作
        new Random().ints().filter(i -> i > 100 && i < 1000).limit(10)
                .forEach(System.out::println);
    }
}

四 Stream流的终止操作

  1. 常用的终止操作
    Stream流的终止操作
  1. 短路操作于非短路操作
    短路操作:不需要等待流的结果都计算完就可以执行的操作;
    非短路操作:概念与短路操作相反;

代码演示:

package stream;

import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamDemo4 {

    public static void main(String[] args) {
        String str = "my name is 007";

        // 使用并行流
        str.chars().parallel().forEach(i -> System.out.print((char) i));
        System.out.println();
        // 使用forEachOrdered保证顺序
        str.chars().parallel().forEachOrdered(i -> System.out.print((char) i));

        // 收集到List
        List<String> list = Stream.of(str.split(" "))
                .collect(Collectors.toList());
        System.out.println(list);

        // 使用reduce拼接字符串
        Optional<String> letters = Stream.of(str.split(" "))
                .reduce((s1, s2) -> s1 + "|" + s2);
        System.out.println(letters.orElse(""));

        // 带初始化值的reduce
        String reduce = Stream.of(str.split(" ")).reduce("",
                (s1, s2) -> s1 + "|" + s2);
        System.out.println(reduce);

        // 计算所有单词的总长度
        Integer length = Stream.of(str.split(" ")).map(s -> s.length())
                .reduce(0, (s1, s2) -> s1 + s2);
        System.out.println(length);

        // max的使用
        Optional<String> max = Stream.of(str.split(" "))
                .max((s1, s2) -> s1.length() - s2.length());
        System.out.println(max.get());

        // 使用findFirst短路操作
        OptionalInt findFirst = new Random().ints().findFirst();
        System.out.println(findFirst.getAsInt());
    }
}

五 并行流

package stream;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class StreamDemo5 {

    public static void main(String[] args) {
        // 调用parallel产生一个并行流
        // IntStream.range(1, 100).parallel().peek(StreamDemo5::debug).count();

        // 现在要实现一个这样的操作:先并行,在串行
        // 多次调用parallel / sequential,以最后一次为准;
        // IntStream.range(1, 100)
        // 调用parallel产生一个并行流
        // .parallel().peek(StreamDemo5::debug)
        // 调用sequential产生一个串行流
        // .sequential().peek(StreamDemo5::debug2)
        // .count();

        // 并行流使用的线程池: ForkJoinPool.commonPool
        // 默认的线程数是当前机器的cpu个数
        // 使用这个属性可以修改默认的线程数
        // System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",
        // "20");
        // IntStream.range(1, 100).parallel().peek(StreamDemo5::debug).count();

        // 使用自己的线程池,不适用默认的线程池,防止任务被阻塞;
        // 线程名称 : ForkJoinPool-1
        ForkJoinPool pool = new ForkJoinPool(20);
        pool.submit(() -> IntStream.range(1, 100).parallel()
                .peek(StreamDemo5::debug).count());
        pool.shutdown();
        
        synchronized (pool) {
            try {
                pool.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void debug(int i) {
        System.out.println(Thread.currentThread().getName() + " debug " + i);
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void debug2(int i) {
        System.err.println("debug2 " + i);
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

六 收集器

  1. 收集器能干什么?
    收集器的作用就是将我们的Stream流处理后的数据收集起来,它可以将数据收集到集合类中,比如List Set Map中,或者是将我们处理后的数据进行在处理,处理成一条数据比如说求和操作;

代码演示:

package stream;

import java.util.Arrays;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

import org.apache.commons.collections4.MapUtils;

/**
 * 学生测试类
 */
class Student {
    /**
     * 姓名
     */
    private String name;

    /**
     * 年龄
     */
    private int age;

    /**
     * 性别
     */
    private Gender gender;

    /**
     * 班级
     */
    private Grade grade;

    public Student(String name, int age, Gender gender, Grade grade) {
        super();
        this.name = name;
        this.age = age;
        this.gender = gender;
        this.grade = grade;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public Grade getGrade() {
        return grade;
    }

    public void setGrade(Grade grade) {
        this.grade = grade;
    }

    public Gender getGender() {
        return gender;
    }

    public void setGender(Gender gender) {
        this.gender = gender;
    }

    @Override
    public String toString() {
        return "[name=" + name + ", age=" + age + ", gender=" + gender
                + ", grade=" + grade + "]";
    }
}

/**
 * 性别
 */
enum Gender {
    MALE, FEMALE
}

/**
 * 班级
 */
enum Grade {
    ONE, TWO, THREE, FOUR;
}

public class CollectDemo {

    public static void main(String[] args) {
        // 测试数据
        List<Student> students = Arrays.asList(
                new Student("��", 10, Gender.MALE, Grade.ONE),
                new Student("����", 9, Gender.MALE, Grade.THREE),
                new Student("��", 8, Gender.FEMALE, Grade.TWO),
                new Student("��", 13, Gender.FEMALE, Grade.FOUR),
                new Student("��", 7, Gender.FEMALE, Grade.THREE),
                new Student("��", 13, Gender.MALE, Grade.ONE),
                new Student("��", 13, Gender.FEMALE, Grade.THREE),
                new Student("��", 9, Gender.FEMALE, Grade.TWO),
                new Student("��", 6, Gender.MALE, Grade.ONE),
                new Student("��", 6, Gender.MALE, Grade.ONE),
                new Student("��", 14, Gender.FEMALE, Grade.FOUR),
                new Student("��", 13, Gender.MALE, Grade.FOUR));

        // 得到所有学生的年龄列表
        // 尽量使用方法引用,s -> s.getAge() --> Student::getAge , 不会多生成一个类似lambda$0这样的函数
        Set<Integer> ages = students.stream().map(Student::getAge)
                .collect(Collectors.toCollection(TreeSet::new));
        System.out.println("所有学生的年龄:" + ages);

        // 统计汇总信息
        IntSummaryStatistics agesSummaryStatistics = students.stream()
                .collect(Collectors.summarizingInt(Student::getAge));
        System.out.println("年龄汇总信息:" + agesSummaryStatistics);

        // 分块
        Map<Boolean, List<Student>> genders = students.stream().collect(
                Collectors.partitioningBy(s -> s.getGender() == Gender.MALE));
        // System.out.println("男女学生列表:" + genders);
        MapUtils.verbosePrint(System.out, "男女学生列表:", genders);

        // 分组
        Map<Grade, List<Student>> grades = students.stream()
                .collect(Collectors.groupingBy(Student::getGrade));
        MapUtils.verbosePrint(System.out, "学生班级列表", grades);

        // 得到所有班级学生的个数
        Map<Grade, Long> gradesCount = students.stream().collect(Collectors
                .groupingBy(Student::getGrade, Collectors.counting()));
        MapUtils.verbosePrint(System.out, "班级学生个数列表", gradesCount);
    }
}

七 Stream的运行机制

package stream;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

/**
 * 验证stream运行机制
 * 1. 所有操作是链式调用,一个元素只迭代一次;
 * 2. 每一个中间操作返回一个新的流。流里边有一个属性sourceStage指向同一个地方,就是Head。
 * 3. Head->nextStage->nextStage->... -> null
 * 4. 有状态操作会把无状态操作截断,单独处理.
 * 5. 并行环境下,又状态的中间操作不一定能并行操作.
 * 6. parallel/ sequetial这两个操作也是中间操作(也是返回stream),但是它们不创建流,它们只修改Head的并行标志.
 * @author
 *
 */
public class RunStream {

    public static void main(String[] args) {
        Random random = new Random();
        // 随机产生数据
        Stream<Integer> stream = Stream.generate(() -> random.nextInt())
                // 产生500个 (无限流需要短路操作)
                .limit(500)
                // 第一个无状态操作
                .peek(s -> print("peek: " + s))
                // 第二个无状态操作
                .filter(s -> {
                    print("filter: " + s);
                    return s > 1000000;
                })
                // 有状态操作
                .sorted((i1, i2) -> {
                    print("sorted: " + i1 + ", " + i2);
                    return i1.compareTo(i2);
                })
                // 又一个无状态操作
                .peek(s -> {
                    print("peek2: " + s);
                }).parallel();

        // 终止操作
        stream.count();
    }

    /**
     * 打印日志并sleep 5毫秒
     * @param s
     */
    public static void print(String s) {
        // System.out.println(s);
        // 带线程名(测试并行情况)
        System.out.println(Thread.currentThread().getName() + " > " + s);
        try {
            TimeUnit.MILLISECONDS.sleep(5);
        } catch (InterruptedException e) {}
    }
}
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容