SpringBoot2.0不容错过的新特性 WebFlux响应式编程”学习笔记(一)

webflux.jpg

慕课网学习地址:https://coding.imooc.com/class/209.html
原课程是付费的,但是同事发了我下载离线版,拿来抽空学呗,学无止境。

第一节 课程介绍

学习之路

第二节 函数式编程和lambda表达式

  • 数组里取最小值
    public static void main(String[] args) {
        int[] nums = {33,55,-55,90,-666,90};
        
        int min = Integer.MAX_VALUE;
        for (int i : nums) {
            if(i < min) {
                min = i;
            }
        }
        
        System.out.println(min);
        
        // jdk8 
        int min2 = IntStream.of(nums).parallel().min().getAsInt();
        System.out.println(min2);
    }
  • 多线程 匿名内部类
    public static void main(String[] args) {
        Object target = new Runnable() {

            @Override
            public void run() {
                System.out.println("ok");
            }
        };
        new Thread((Runnable) target).start();

        // jdk8 lambda
        Object target2 = (Runnable)() -> System.out.println("ok");
        Runnable target3 = () -> System.out.println("ok");
        System.out.println(target2 == target3); // false
        
        new Thread((Runnable) target2).start();
    }
  • 函数接口以及链式操作
import java.text.DecimalFormat;
import java.util.function.Function;

class MyMoney {
    private final int money;

    public MyMoney(int money) {
        this.money = money;
    }

    public void printMoney(Function<Integer, String> moneyFormat) {
        System.out.println("我的存款:" + moneyFormat.apply(this.money));
    }
}

public class MoneyDemo {
    public static void main(String[] args) {
        MyMoney me = new MyMoney(99999999);
        Function<Integer, String> moneyFormat = i -> new DecimalFormat("#,###")
                .format(i);
        // 函数接口链式操作
        me.printMoney(moneyFormat.andThen(s -> "人民币 " + s));
    }
}

运行效果

我的存款:人民币 99,999,999

  • 函数式接口


    常用的lambda接口
    public static void main(String[] args) {
        // 断言函数接口
        IntPredicate predicate = i -> i > 0;
        System.out.println(predicate.test(-9));
        
        //建议使用带类型的接口,这样就不用写泛型了
        // IntConsumer
        // 消费函数接口
        Consumer<String> consumer = s -> System.out.println(s);
        consumer.accept("输入的数据");
    }
  • 方法引用
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntUnaryOperator;
/**
 * 精诚所至,金石为开。
 * 石の上にも三年;陽気の発する所金石亦透る。
 * Faith moves mountains.
 *
 * @author marvin.ma
 * @create 2018-07-03 18:58
 * @desc ${DESCRIPTION}
 **/
class Dog {
    private String name = "哮天犬";

    /**
     * 默认10斤狗粮
     */
    private int food = 10;

    public Dog() {

    }

    /**
     * 带参数的构造函数
     *
     * @param name
     */
    public Dog(String name) {
        this.name = name;
    }

    /**
     * 狗叫,静态方法
     *
     * @param dog
     */
    public static void bark(Dog dog) {
        System.out.println(dog + "叫了");
    }

    /**
     * 吃狗粮 JDK
     *
     * 默认会把当前实例传入到非静态方法,参数名为this,位置是第一个;
     *
     * @param num
     * @return 还剩下多少斤
     */
    public int eat(int num) {
        System.out.println("吃了" + num + "斤狗粮");
        this.food -= num;
        return this.food;
    }

    @Override
    public String toString() {
        return this.name;
    }
}

public class MethodRefrenceDemo {

    public static void main(String[] args) {
        Dog dog = new Dog();
        dog.eat(3);

        // 方法引用
        Consumer<String> consumer = System.out::println;
        consumer.accept("接受的数据");

        // 静态方法的方法引用
        Consumer<Dog> consumer2 = Dog::bark;
        consumer2.accept(dog);

        // 非静态方法,使用对象实例的方法引用
        // Function<Integer, Integer> function = dog::eat;
        // UnaryOperator<Integer> function = dog::eat;   //入参出参都是Integer,可以用这个
        IntUnaryOperator function = dog::eat;

        // dog置空,不影响下面的函数执行,因为java 参数是传值
        dog = null;
        System.out.println("还剩下" + function.applyAsInt(2) + "斤");
        //
        // // 使用类名来方法引用
        // BiFunction<Dog, Integer, Integer> eatFunction = Dog::eat;
        // System.out.println("还剩下" + eatFunction.apply(dog, 2) + "斤");
        //
        // // 构造函数的方法引用
        // Supplier<Dog> supplier = Dog::new;
        // System.out.println("创建了新对象:" + supplier.get());
        //
        // 带参数的构造函数的方法引用
        Function<String, Dog> function2 = Dog::new;
        System.out.println("创建了新对象:" + function2.apply("旺财"));

        // 测试java变量是传值还是穿引用
        List<String> list = new ArrayList<>();
        test(list);

        System.err.println(list);
    }

    private static void test(List<String> list) {
        list = null;
    }
}

运行输出:
吃了3斤狗粮
接受的数据
哮天犬叫了
吃了2斤狗粮
还剩下5斤
创建了新对象:旺财

  • 类型引用

@FunctionalInterface
interface IMath {
    int add(int x, int y);
}

@FunctionalInterface
interface IMath2 {
    int sub(int x, int y);
}


public class TypeDemo {

    public static void main(String[] args) {
        // 变量类型定义
        IMath lambda = (x, y) -> x + y;

        // 数组里
        IMath[] lambdas = { (x, y) -> x + y };

        // 强转
        Object lambda2 = (IMath) (x, y) -> x + y;
        
        // 通过返回类型
        IMath createLambda = createLambda();
        
        TypeDemo demo = new TypeDemo();
        // 当有二义性的时候,使用强转对应的接口解决
        demo.test( (IMath2)(x, y) -> x + y);
    }
    
    public void test(IMath math) {
        
    }
    
    public void test(IMath2 math) {
        
    }
    
    public static IMath createLambda() {
        return  (x, y) -> x + y;
    }

}
  • 级联表达式和珂里化
    级联表达式:有多个箭头的函数
    柯里化:把多个参数的函数转换为只有一个参数的函数
    高阶函数:就是返回函数的函数

import java.util.function.Function;

/**
 * 级联表达式和柯里化 
 * 柯里化:把多个参数的函数转换为只有一个参数的函数 
 * 柯里化的目的:函数标准化
 * 高阶函数:就是返回函数的函数
 */
public class CurryDemo {

    public static void main(String[] args) {
        // 实现了x+y的级联表达式
        Function<Integer, Function<Integer, Integer>> fun = x -> y -> x
                + y;
        System.out.println(fun.apply(2).apply(3));

        Function<Integer, Function<Integer, Function<Integer, Integer>>> fun2 = x -> y -> z -> x
                + y + z;
        System.out.println(fun2.apply(2).apply(3).apply(4));

        int[] nums = { 2, 3, 4 };
        Function f = fun2;
        
        for (int i = 0; i < nums.length; i++) {
            if (f instanceof Function) {
                Object obj = f.apply(nums[i]);
                if (obj instanceof Function) {
                    f = (Function) obj;
                } else {
                    System.out.println("调用结束:结果为" + obj);
                }
            }
        }
    }
}

执行结果

5
9
调用结束:结果为9

第三节 Stream流编程

  • demo1
import java.util.stream.IntStream;

/**
 * 精诚所至,金石为开。
 * 石の上にも三年;陽気の発する所金石亦透る。
 * Faith moves mountains.
 *
 * @author 马海强
 * @create 2018-07-03 22:17
 * @desc ${DESCRIPTION}
 **/
public class StreamDemo1 {
    public static void main(String[] args) {
        int[] nums = {1, 2, 3};
        // 外部迭代
        int sum = 0;
        for (int i : nums) {
            sum += i;
        }
        System.out.println("结果为:" + sum);

        // 使用stream的内部迭代
        // map就是中间操作(返回stream的操作)
        // sum就是终止操作
        System.out.println("结果为:" + IntStream.of(nums).sum());

        int sum2 = IntStream.of(nums).map(i -> i*2).sum();
        System.out.println("结果为:" + sum2);

        System.out.println("惰性求值就是终止没有调用的情况下,中间操作不会执行");
        IntStream.of(nums).map(StreamDemo1::doubleNum);

    }

    public static int doubleNum(int i) {
        System.out.println("执行了乘以2");
        return i * 2;
    }
}

执行结果:

结果为:6
结果为:6
结果为:12
惰性求值就是终止没有调用的情况下,中间操作不会执行

  • demo 2, 流的创建


    流的创建方式
public class StreamDemo2 {

    public static void main(String[] args) {
        List<String> list = new ArrayList<>();

        // 从集合创建
        list.stream();
        list.parallelStream();

        // 从数组创建
        Arrays.stream(new int[] { 2, 3, 5 });

        // 创建数字流
        IntStream.of(1, 2, 3);
        IntStream.rangeClosed(1, 10);

        // 使用random创建一个无限流
        new Random().ints().limit(10);
        Random random = new Random();

        // 自己产生流
        Stream.generate(() -> random.nextInt()).limit(20);

    }

}
  • demo3,流的中间操作


    流的中间操作
import java.util.Random;
import java.util.stream.Stream;

/**
 * 精诚所至,金石为开。
 * 石の上にも三年;陽気の発する所金石亦透る。
 * Faith moves mountains.
 *
 * @author 马海强
 * @create 2018-07-03 22:39
 * @desc 流的中间操作
 **/
public class StreamDemo3 {
    public static void main(String[] args) {
        String str = "my name is 007";

        //把每个单词的长度调用出来
        Stream.of(str.split(" ")).filter(s -> s.length() > 2).map(s -> s.length()).forEach(System.out::println);

        //flatMap 适合A元素下面有B属性,B属性是集合,最终得到所有A元素里面的所有B属性的集合
        // intStream/longStream 并不是Stream的子类,所以要进行装箱  boxed
        Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed())
            .forEach(i -> System.out.println((char)i.intValue()));

        //peek 用于debug,是个中间操作,与foreach不同,foreach是终止操作
        System.out.println("--------------peek------------");
        Stream.of(str.split(" ")).peek(System.out::println)
                .forEach(System.out::println);

        //limit的使用
        new Random().ints().filter(i -> i > 100 && i < 1000).limit(10)
                .forEach(System.out::println);
    }
}
  • stream的终止操作


    stream终止操作
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * 精诚所至,金石为开。
 * 石の上にも三年;陽気の発する所金石亦透る。
 * Faith moves mountains.
 *
 * @author 马海强
 * @create 2018-07-06 13:33
 * @desc ${DESCRIPTION}
 **/
public class StreamDemo4 {
    public static void main(String[] args) {
        String str = "my name is 007";

        //使用并行流
        str.chars().parallel().forEach(i -> System.out.print((char)i));
        System.out.println();
        // 使用 forEachOrdered 保证顺序
        str.chars().parallel().forEachOrdered(i -> System.out.print((char) i));
        System.out.println();

        //收集到list/set   set时最后使用toSet方法
        List<String> list  = Stream.of(str.split(" ")).collect(Collectors.toList());
        System.out.println(list);

        //使用reduce拼接字符串
        Optional<String> letters = Stream.of(str.split(" ")).reduce((s1, s2) -> s1 + "|" + s2);
        System.out.println(letters.orElse(""));

        //带初始值的reduce
        String reduce = Stream.of(str.split(" ")).reduce("",
                (s1, s2) -> s1 + "|" + s2);
        System.out.println(reduce);

        //计算所有单词总长度
        Integer length = Stream.of(str.split(" ")).map(s -> s.length())
                .reduce(0, (s1, s2) -> s1 +s2);
        System.out.println(length);

        //使用max函数
        Optional<String> max = Stream.of(str.split(" ")).max(Comparator.comparingInt(String::length));
        System.out.println(max.get());

        //// 使用 findFirst 短路操作
        OptionalInt findFirst = new Random().ints().filter(i -> i > 10000).findFirst();
        System.out.println(findFirst.getAsInt());
    }
}

运行结果:

immnae7y 0 0 s
my name is 007
[my, name, is, 007]>
my|name|is|007
|my|name|is|007
11
name
113660557

  • 并行流 & 串行流知识点
    1、调用parallel 产生一个并行流.
    2、调用sequential 产生串行流.
    3、多次调用 parallel / sequential, 以最后一次调用为准.
    4、并行流使用的默认线程池名称: ForkJoinPool.commonPool,默认的线程数是 当前机器的cpu个数.
    5、使用这个属性可以修改默认的线程数:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
    6、一般地,推荐使用自己的线程池, 不使用默认线程池, 防止任务被阻塞。注意在main函数中调用,要让main函数不退出,等会儿再退出。自己新起的线程要用shutdown关闭,不然会一直执行着。
public static void main(String[] args) {
  ForkJoinPool pool = new ForkJoinPool(20);
        pool.submit(() -> IntStream.range(1, 100).parallel()
                .peek(StreamDemo5::debug).count());
        pool.shutdown();
        
        synchronized (pool) {
            try {
                pool.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
}

public static void debug(int i) {
        System.out.println(Thread.currentThread().getName() + " debug " + i);
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
}

运行结果

ForkJoinPool-1-worker-25 debug 65
ForkJoinPool-1-worker-18 debug 31
ForkJoinPool-1-worker-11 debug 90
ForkJoinPool-1-worker-4 debug 15
ForkJoinPool-1-worker-19 debug 56
ForkJoinPool-1-worker-15 debug 81
ForkJoinPool-1-worker-8 debug 21
ForkJoinPool-1-worker-26 debug 40
ForkJoinPool-1-worker-29 debug 43
ForkJoinPool-1-worker-12 debug 93
ForkJoinPool-1-worker-5 debug 28
ForkJoinPool-1-worker-30 debug 78
ForkJoinPool-1-worker-22 debug 96
ForkJoinPool-1-worker-23 debug 87
ForkJoinPool-1-worker-1 debug 6
ForkJoinPool-1-worker-9 debug 12
ForkJoinPool-1-worker-16 debug 18
ForkJoinPool-1-worker-2 debug 3
ForkJoinPool-1-worker-27 debug 48
ForkJoinPool-1-worker-20 debug 37

  • 收集器
    所收集的对象信息
public class Student {
    private String name;
    private int age;
    private Gender gender;
    private Grade grade;

    public Student(String name, int age, Gender gender, Grade grade) {
        super();
        this.name = name;
        this.age = age;
        this.gender = gender;
        this.grade = grade;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public Gender getGender() {
        return gender;
    }

    public void setGender(Gender gender) {
        this.gender = gender;
    }

    public Grade getGrade() {
        return grade;
    }

    public void setGrade(Grade grade) {
        this.grade = grade;
    }

    @Override
    public String toString() {
        return "[name=" + name + ", age=" + age + ", gender=" + gender
                + ", grade=" + grade + "]";
    }
}

/**
 * 性别
 */
enum Gender {
    MALE, FEMALE
}

/**
 * 班级
 */
enum Grade {
    ONE, TWO, THREE, FOUR;
}

测试类:

import org.apache.commons.collections.MapUtils;

import java.util.Arrays;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * 精诚所至,金石为开。
 * 石の上にも三年;陽気の発する所金石亦透る。
 * Faith moves mountains.
 *
 * @author 马海强
 * @create 2018-07-07 22:37
 * @desc 流收集器demo
 **/
public class CollectDemo {
    public static void main(String[] args) {
        // 测试数据
        List<Student> students = Arrays.asList(
                new Student("小明", 10, Gender.MALE, Grade.ONE),
                new Student("大明", 9, Gender.MALE, Grade.THREE),
                new Student("小白", 8, Gender.FEMALE, Grade.TWO),
                new Student("小黑", 13, Gender.FEMALE, Grade.FOUR),
                new Student("小红", 7, Gender.FEMALE, Grade.THREE),
                new Student("小黄", 13, Gender.MALE, Grade.ONE),
                new Student("小青", 13, Gender.FEMALE, Grade.THREE),
                new Student("小紫", 9, Gender.FEMALE, Grade.TWO),
                new Student("小王", 6, Gender.MALE, Grade.ONE),
                new Student("小李", 6, Gender.MALE, Grade.ONE),
                new Student("小马", 14, Gender.FEMALE, Grade.FOUR),
                new Student("小刘", 13, Gender.MALE, Grade.FOUR));

        // 得到所有学生的年龄列表
        // 推荐使用方法引用Student::getAge,不要使用s->s.getAge(),就不会多生成一个类似 lambda$0这样的函数
        List<Integer> ages = students.stream().map(Student::getAge).collect(Collectors.toList());
        System.out.println("所有学生的年龄:" + ages);


        // 统计汇总信息
        IntSummaryStatistics agesSummaryStatistics = students.stream().collect(Collectors.summarizingInt(Student::getAge));
        System.out.println("年龄汇总信息:" + agesSummaryStatistics);

        //分块
        Map<Boolean, List<Student>> genders = students.stream().collect(Collectors.partitioningBy(s -> s.getGender() == Gender.MALE));
        //System.out.println("男女学生列表:" + genders);
        MapUtils.verbosePrint(System.out, "男女学生列表", genders);

        //分组,比如分块就是分两组,返回boolean
        Map<Grade, List<Student>> grades = students.stream().collect(Collectors.groupingBy(Student::getGrade));
        MapUtils.verbosePrint(System.out, "学生班级列表", grades);

        //得到所有班级学生的个数
        Map<Grade, Long> gradesCount = students.stream().collect(Collectors.groupingBy(Student::getGrade, Collectors.counting()));
        MapUtils.verbosePrint(System.out, "学生班级个数列表", gradesCount);
    }
}

运行结果:

Connected to the target VM, address: 'javadebug', transport: 'shared memory'
所有学生的年龄:[10, 9, 8, 13, 7, 13, 13, 9, 6, 6, 14, 13]
年龄汇总信息:IntSummaryStatistics{count=12, sum=121, min=6, average=10.083333, max=14}
男女学生列表 =
{
false = [[name=小白, age=8, gender=FEMALE, grade=TWO], [name=小黑, age=13, gender=FEMALE, grade=FOUR], [name=小红, age=7, gender=FEMALE, grade=THREE], [name=小青, age=13, gender=FEMALE, grade=THREE], [name=小紫, age=9, gender=FEMALE, grade=TWO], [name=小马, age=14, gender=FEMALE, grade=FOUR]]
true = [[name=小明, age=10, gender=MALE, grade=ONE], [name=大明, age=9, gender=MALE, grade=THREE], [name=小黄, age=13, gender=MALE, grade=ONE], [name=小王, age=6, gender=MALE, grade=ONE], [name=小李, age=6, gender=MALE, grade=ONE], [name=小刘, age=13, gender=MALE, grade=FOUR]]
}
学生班级列表 =
{
ONE = [[name=小明, age=10, gender=MALE, grade=ONE], [name=小黄, age=13, gender=MALE, grade=ONE], [name=小王, age=6, gender=MALE, grade=ONE], [name=小李, age=6, gender=MALE, grade=ONE]]
FOUR = [[name=小黑, age=13, gender=FEMALE, grade=FOUR], [name=小马, age=14, gender=FEMALE, grade=FOUR], [name=小刘, age=13, gender=MALE, grade=FOUR]]
TWO = [[name=小白, age=8, gender=FEMALE, grade=TWO], [name=小紫, age=9, gender=FEMALE, grade=TWO]]
THREE = [[name=大明, age=9, gender=MALE, grade=THREE], [name=小红, age=7, gender=FEMALE, grade=THREE], [name=小青, age=13, gender=FEMALE, grade=THREE]]
}
Disconnected from the target VM, address: 'javadebug', transport: 'shared memory'
学生班级个数列表 =
{
ONE = 4
FOUR = 3
TWO = 2
THREE = 3
}

  • Stream 运行机制
    验证:
  1. 所有操作是链式调用, 一个元素只迭代一次
  2. 每一个中间操作返回一个新的流. 流里面有一个属性sourceStage
    指向同一个 地方,就是Head
  3. Head->nextStage->nextStage->... -> null
  4. 有状态操作会把无状态操作阶段,单独处理
  5. 并行环境下, 有状态的中间操作不一定能并行操作
  6. parallel/ sequetial 这2个操作也是中间操作(也是返回stream)
    但是他们不创建流, 他们只修改 Head的并行标志

第三节 reactive stream 响应式流

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/**
 * 精诚所至,金石为开。
 * 石の上にも三年;陽気の発する所金石亦透る。
 * Faith moves mountains.
 *
 * @author 马海强
 * @create 2018-07-08 0:03
 * @desc 响应式流
 **/
public class FlowDemo {
    public static void main(String[] args) throws InterruptedException {
        //1.定义发布者,发布的数据类型是Integer
        // 直接使用jdk自带的SubmissionPublisher,它实现了Publisher接口
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        //2. 定义订阅者
        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<Integer>() {

            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                // 保存订阅关系,需要用它来给发布者响应
                this.subscription = subscription;

                // 请求一个数据
                this.subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                // 接收一个数据,并处理
                System.out.println("接收到的数据:" + item);

                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                // 处理完调用request再请求一个数据
                this.subscription.request(1);

                //或者 已经达到了目标,调用cancel告诉发布者不再接收数据了
                //this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                //出现了异常(例如处理数据的时候产生了异常)
                throwable.printStackTrace();

                //我们可以告诉发布者,后面不接收数据了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部数据处理完了(发布者关闭了)
                System.out.println("处理完了!");
            }
        };

        //3. 发布者和订阅者建立订阅关系
        publisher.subscribe(subscriber);

        //4. 生产数据,并发布   这里忽略数据生产过程
        for (int i=0;i<1000;i++) {
            System.out.println("生成数据:" + i);
            //submit 是个block方法
            publisher.submit(i);
        }

        //5. 结束后  关闭发布者
        // 正式环境应该放在finally或者使用try-resource 确保关闭
        publisher.close();

        //主线程延迟停止, 否则数据没有消费就退出了
        Thread.currentThread().join(1000);

        //debug的时候,下面这行需要有断点。否则主线程结束无法debug
        System.out.println();
    }

}

运行效果

生成数据:262
接收到的数据:6
生成数据:263
接收到的数据:7
生成数据:264
接收到的数据:8
生成数据:265
接收到的数据:9
生成数据:266
接收到的数据:10
生成数据:267
接收到的数据:11
生成数据:268

另一个例子


import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/**
 * 精诚所至,金石为开。
 * 石の上にも三年;陽気の発する所金石亦透る。
 * Faith moves mountains.
 *
 * @author 马海强
 * @create 2018-07-08 23:14
 * @desc 带 process 的 flow demo
 *  Processor, 需要继承SubmissionPublisher并实现Processor接口
 *  输入源数据 integer, 过滤掉小于0的, 然后转换成字符串发布出去
 **/

class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<Integer, String> {
    private Flow.Subscription subscription;


    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // 保存订阅关系, 需要用它来给发布者响应
        this.subscription = subscription;

        // 请求一个数据
        this.subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        // 接受到一个数据, 处理
        System.out.println("处理器接受到数据: " + item);

        // 过滤掉小于0的, 然后发布出去
        if (item > 0) {
            this.submit("转换后的数据:" + item);
        }

        // 处理完调用request再请求一个数据
        this.subscription.request(1);

        // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
        // this.subscription.cancel();
    }

    @Override
    public void onError(Throwable throwable) {
        // 出现了异常(例如处理数据的时候产生了异常)
        throwable.printStackTrace();

        // 我们可以告诉发布者, 后面不接受数据了
        this.subscription.cancel();
    }

    @Override
    public void onComplete() {
        // 全部数据处理完了(发布者关闭了)
        System.out.println("处理器处理完了!");
        // 关闭发布者
        this.close();
    }
}

public class FlowDemo2 {
    public static void main(String[] args) throws Exception {
        // 1. 定义发布者, 发布的数据类型是 Integer
        // 直接使用jdk自带的SubmissionPublisher
        SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();

        // 2. 定义处理器, 对数据进行过滤, 并转换为String类型
        MyProcessor processor = new MyProcessor();

        // 3. 发布者 和 处理器 建立订阅关系
        publiser.subscribe(processor);

        // 4. 定义最终订阅者, 消费 String 类型数据
        Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {

            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                // 保存订阅关系, 需要用它来给发布者响应
                this.subscription = subscription;

                // 请求一个数据
                this.subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                // 接受到一个数据, 处理
                System.out.println("接受到数据: " + item);

                // 处理完调用request再请求一个数据
                this.subscription.request(1);

                // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出现了异常(例如处理数据的时候产生了异常)
                throwable.printStackTrace();

                // 我们可以告诉发布者, 后面不接受数据了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部数据处理完了(发布者关闭了)
                System.out.println("处理完了!");
            }

        };

        // 5. 处理器 和 最终订阅者 建立订阅关系
        processor.subscribe(subscriber);

        // 6. 生产数据, 并发布
        // 这里忽略数据生产过程
        publiser.submit(-111);
        publiser.submit(111);

        // 7. 结束后 关闭发布者
        // 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
        publiser.close();

        // 主线程延迟停止, 否则数据没有消费就退出
        Thread.currentThread().join(1000);
    }
}

运行结果:

处理器接受到数据: -111
处理器接受到数据: 111
处理器处理完了!
接受到数据: 转换后的数据:111
处理完了!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,928评论 6 509
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,748评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,282评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,065评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,101评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,855评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,521评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,414评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,931评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,053评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,191评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,873评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,529评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,074评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,188评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,491评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,173评论 2 357

推荐阅读更多精彩内容