接上一篇:https://www.jianshu.com/p/113b4e22dda5
第三节 Stream流编程
一、概念
Stream流编程是一个高级的迭代器,不是一个数据结构、不是一个集合、不会存放数据、关注的是怎么把数据高效处理。
二、内部迭代和外部迭代
我们之前的串行编码方法,叫外部迭代,关注点是怎样做;Stream流编程的写法叫内部迭代,关注点是做什么,不关注怎么得到的。内部迭代代码更加可读更加优雅。
public class StreamDemo1
{
public static void main(String[] args)
{
int[] nums = {1, 2, 3};
// 外部迭代
int sum = 0;
for (int i : nums)
{
sum += i;
}
System.out.println("结果为:" + sum);
// 使用stream的内部迭代
System.out.println("结果为:" + IntStream.of(nums).sum());
// map就是中间操作(返回stream的操作)
// sum就是终止操作
int sum2 = IntStream.of(nums).map(i -> i * 2).sum();
System.out.println("结果为:" + sum2);
System.out.println("惰性求值就是终止没有调用的情况下,中间操作不会执行");
IntStream.of(nums).map(StreamDemo1::doubleNum); //没有终止操作,map不会执行,所以不会执行doubleNum里的打印内容
}
public static int doubleNum(int i)
{
System.out.println("执行了乘以2");
return i * 2;
}
}
惰性求值:jdk8的stream流编程里面,没有调用终止操作的时候,中间操作的方法都不会执行。能提高性能,节省开支。
三、Stream流编程相关操作
1. Stream流编程-创建
public class StreamDemo2
{
public static void main(String[] args)
{
List<String> list = new ArrayList<>();
//从集合创建
list.stream();
list.parallelStream();
//从数组创建
Arrays.stream(new int[]{2, 3, 5});
// 创建数字流(不需要自动装箱,性能更高)
IntStream.of(1, 2, 3);
IntStream.rangeClosed(1, 10);
// 使用random创建一个无限流
new Random().ints().limit(10);
// 自己产生流
Random random = new Random();
//Stream.generate(random::nextInt).limit(20);
Stream.generate(() -> random.nextInt()).limit(20);
}
}
2. Stream流编程-中间操作
1)什么是中间操作?
返回stream流的就是中间操作,可以继续链式调用下去。
2)中间操作分类
状态:和其他元素是否有依赖关系
a)有状态操作:当前的操作需要依赖其他元素。如distinct/sorted,去重和排序都需要与其他数据比较。
b)无状态操作:当前的操作与其他元素的前后没有依赖关系。如map/filter方法,只操作自己即可,不与其他数据产生交互。
3)区分有无状态的意义
在多个操作的时候,我们需要把无状态操作写在一起,有状态操作放到最后,这样效率会更加高。
public class StreamDemo3
{
public static void main(String[] args)
{
String str = "my name is 007";
//把每个单词的长度调用出来
//Map:把A对象转换为B对象或得到对象里面的一些属性。例如mapToInt(把输入对象转换为Int)
Stream.of(str.split(" ")).filter(s -> s.length() > 2).map(s -> s.length()).forEach(System.out::println);
//flatMap 适合A元素下面有B属性,B属性是集合,最终得到所有A元素里面的所有B属性的列表
// intStream/longStream 并不是Stream的子类,所以要进行装箱 boxed
Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed()).forEach(i -> System.out.println((char) i.intValue()));
//peek 用于debug,是个中间操作,与foreach不同,foreach是终止操作
System.out.println("--------------peek------------");
Stream.of(str.split(" ")).peek(System.out::println).forEach(System.out::println);
//limit的使用(限流)
new Random().ints().filter(i -> i > 100 && i < 1000).limit(10).forEach(System.out::println);
}
}
3. Stream流编程-终止操作
1)含义:产生一个结果的操作。
2)终止操作分类
a)短路操作:不需要等所有结果处理完,找到符合条件的就终止的操作,如limit/findxxx/xxxMatch。
b)非短路操作:不终止。
public class StreamDemo4
{
public static void main(String[] args)
{
String str = "my name is 007";
//使用并行流(char是int)
str.chars().parallel().forEach(i -> System.out.print((char) i));
System.out.println();
// 使用 forEachOrdered 保证顺序
str.chars().parallel().forEachOrdered(i -> System.out.print((char) i));
System.out.println();
//收集到list/set set时最后使用toSet方法
List<String> list = Stream.of(str.split(" ")).collect(Collectors.toList());
System.out.println(list);
//使用reduce拼接字符串
Optional<String> letters = Stream.of(str.split(" ")).reduce((s1, s2) -> s1 + "|" + s2);
//返回值为空时不报错
System.out.println(letters.orElse(""));
//带初始值的reduce
String reduce = Stream.of(str.split(" ")).reduce("hi", (s1, s2) -> s1 + "|" + s2);
System.out.println(reduce);
//计算所有单词总长度
Integer length = Stream.of(str.split(" ")).map(s -> s.length()).reduce(0, (s1, s2) -> s1 + s2);
System.out.println(length);
//使用max函数,长度最长的单词
Optional<String> max1 = Stream.of(str.split(" ")).max((s1,s2)->s1.length()-s2.length());
System.out.println("max1:" + max1.get());
Optional<String> max = Stream.of(str.split(" ")).max(Comparator.comparingInt(String::length));
System.out.println(max.get());
//使用 findFirst 短路操作
OptionalInt findFirst = new Random().ints().filter(i -> i > 10000).findFirst();
System.out.println(findFirst.getAsInt());
}
}
执行结果:
is 070 anemy m
my name is 007
[my, name, is, 007]
my|name|is|007
hi|my|name|is|007
11
name
2073596289
- 区分中间操作和终止操作:看返回结果,返回的是stream,就是中间操作;否则是终止操作。
3)并行流 & 串行流知识点
a)调用parallel 产生一个并行流.
b)调用sequential 产生串行流.
c)多次调用 parallel / sequential, 以最后一次调用为准.
d)并行流使用的默认线程池名称: ForkJoinPool.commonPool,默认的线程数是 当前机器的cpu个数.
e)使用这个属性可以修改默认的线程数:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
f)一般地,推荐使用自己的线程池, 不使用默认线程池, 防止任务被阻塞。注意在main函数中调用,要让main函数不退出,等会儿再退出。自己新起的线程要用shutdown关闭,不然会一直执行着。
public class StreamDemo5
{
public static void main(String[] args)
{
//串行,单线程,一行一行打印
IntStream.range(1, 100).peek(StreamDemo5::debug).count();
// 调用parallel 产生一个并行流
// 并行流使用的线程池: ForkJoinPool.commonPool
// 默认的线程数是 当前机器的cpu个数
IntStream.range(1, 100).parallel().peek(StreamDemo5::debug).count();
// 使用这个属性可以修改默认的线程数
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
// 现在要实现一个这样的效果: 先并行,再串行
// 多次调用 parallel / sequential, 以最后一次调用为准.
IntStream.range(1, 100)
// 调用parallel产生并行流
.parallel().peek(StreamDemo5::debug)
// 调用sequential 产生串行流
.sequential().peek(StreamDemo5::debug2)
.count();
// 使用自己的线程池, 不使用默认线程池, 防止任务被阻塞
// 线程名字 : ForkJoinPool-1
ForkJoinPool pool = new ForkJoinPool(20);
pool.submit(() -> IntStream.range(1, 100).parallel().peek(StreamDemo5::debug).count());
pool.shutdown();
synchronized (pool)
{
try
{
pool.wait();
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
public static void debug(int i)
{
System.out.println(Thread.currentThread().getName() + " debug " + i);
try
{
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
public static void debug2(int i)
{
System.err.println(Thread.currentThread().getName() + " debug2 " + i);
try
{
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
4. Stream 运行机制
a)所有操作是链式调用, 一个元素只迭代一次
b)每一个中间操作返回一个新的流. 流里面有一个属性sourceStage指向同一个 地方,就是Head
c)Head->nextStage->nextStage->... -> null
d)有状态操作会把无状态操作截断,单独处理
e)并行环境下, 有状态的中间操作不一定能并行操作(下面的filter操作没并行)
a)parallel/ sequetial 这2个操作也是中间操作(也是返回stream),但是他们不创建流, 他们只修改 Head的并行标志
public class RunStream
{
public static void main(String[] args)
{
Random random = new Random();
// 随机产生数据
Stream<Integer> stream = Stream.generate(random::nextInt)
// 产生500个 ( 无限流需要短路操作. )
.limit(8)
// 第1个无状态操作
.peek(s -> print("peek: " + s))
// 第2个无状态操作
.filter(s -> {
print("filter: " + s);
return s > 1000000;
})
// 有状态操作
.sorted((i1, i2) -> {
print("排序: " + i1 + ", " + i2);
return i1.compareTo(i2);
})
// 又一个无状态操作
.filter(s -> {
print("filter2222:" + s);
return s > 10000000;
})
.parallel();
// 终止操作
stream.count();
}
/**
* 打印日志并sleep 5 毫秒
* @param s
*/
public static void print(String s)
{
// System.out.println(s);
// 带线程名(测试并行情况)
System.out.println(Thread.currentThread().getName() + " > " + s);
try
{
TimeUnit.MILLISECONDS.sleep(5);
} catch (InterruptedException e)
{
}
}
}
执行结果:
main > peek: 1191871179
main > filter: 1191871179
main > peek: 1777409810
ForkJoinPool.commonPool-worker-3 > peek: -908318104
ForkJoinPool.commonPool-worker-5 > peek: 68893290
main > filter: 1777409810
ForkJoinPool.commonPool-worker-5 > filter: 68893290
ForkJoinPool.commonPool-worker-3 > filter: -908318104
main > peek: 1252114771
ForkJoinPool.commonPool-worker-5 > peek: 942196465
ForkJoinPool.commonPool-worker-3 > peek: 1361634987
ForkJoinPool.commonPool-worker-7 > peek: -1676465186
main > filter: 1252114771
ForkJoinPool.commonPool-worker-3 > filter: 1361634987
ForkJoinPool.commonPool-worker-5 > filter: 942196465
ForkJoinPool.commonPool-worker-7 > filter: -1676465186
main > 排序: 942196465, 68893290
main > 排序: 1191871179, 942196465
main > 排序: 1777409810, 1191871179
main > 排序: 1252114771, 1777409810
main > 排序: 1252114771, 1191871179
main > 排序: 1252114771, 1777409810
main > 排序: 1361634987, 1191871179
main > 排序: 1361634987, 1777409810
main > 排序: 1361634987, 1252114771
ForkJoinPool.commonPool-worker-7 > filter2222:942196465
ForkJoinPool.commonPool-worker-3 > filter2222:1361634987
ForkJoinPool.commonPool-worker-5 > filter2222:1777409810
main > filter2222:1252114771
ForkJoinPool.commonPool-worker-3 > filter2222:68893290
ForkJoinPool.commonPool-worker-7 > filter2222:119187117
5. 收集器
class Student
{
private String name;
private int age;
private Gender gender;
private Grade grade;
public Student()
{
}
public Student(String name, int age, Gender gender, Grade grade)
{
this.name = name;
this.age = age;
this.gender = gender;
this.grade = grade;
}
public String getName()
{
return name;
}
public void setName(String name)
{
this.name = name;
}
public int getAge()
{
return age;
}
public void setAge(int age)
{
this.age = age;
}
public Gender getGender()
{
return gender;
}
public void setGender(Gender gender)
{
this.gender = gender;
}
public Grade getGrade()
{
return grade;
}
public void setGrade(Grade grade)
{
this.grade = grade;
}
@Override
public String toString()
{
return "CollectorDemo{" +
"[name=" + name + ", age=" + age + ", gender=" + gender
+ ", grade=" + grade + "]" +
'}';
}
}
/**
* 性别
*/
enum Gender
{
MALE, FEMALE
}
/**
* 班级
*/
enum Grade
{
ONE, TWO, THREE, FOUR;
}
public class CollectorDemo
{
public static void main(String[] args)
{
List<Student> students = Arrays.asList(
new Student("小明", 10, Gender.MALE, Grade.ONE),
new Student("大明", 9, Gender.MALE, Grade.THREE),
new Student("小白", 8, Gender.FEMALE, Grade.TWO),
new Student("小黑", 13, Gender.FEMALE, Grade.FOUR),
new Student("小红", 7, Gender.FEMALE, Grade.THREE),
new Student("小黄", 13, Gender.MALE, Grade.ONE),
new Student("小青", 13, Gender.FEMALE, Grade.THREE),
new Student("小紫", 9, Gender.FEMALE, Grade.TWO),
new Student("小王", 6, Gender.MALE, Grade.ONE),
new Student("小李", 6, Gender.MALE, Grade.ONE),
new Student("小马", 14, Gender.FEMALE, Grade.FOUR),
new Student("小刘", 13, Gender.MALE, Grade.FOUR)
);
// 得到所有学生的年龄列表
// 推荐使用方法引用Student::getAge,不要使用s->s.getAge(),就不会多生成一个类似 lambda$main$0这样的函数
//List<Integer> ages = students.stream().map(s->s.getAge()).collect(Collectors.toList());
List<Integer> ages = students.stream().map(Student::getAge).collect(Collectors.toList());
System.out.println("所有学生的年龄:" + ages);
//去重复
Set<Integer> ages2 = students.stream().map(Student::getAge).collect(Collectors.toSet());
System.out.println(ages2);
//转换为指定类型的集合
Set<Integer> ages3 = students.stream().map(Student::getAge).collect(Collectors.toCollection(TreeSet::new));
System.out.println(ages3);
//统计信息
//IntSummaryStatistics{count=3, sum=39, min=12, average=13.000000, max=14}
IntSummaryStatistics agesSummaryStatistics = students.stream().collect(Collectors.summarizingInt(Student::getAge));
System.out.println("年龄汇总信息:" + agesSummaryStatistics);
//分块
Map<Boolean, List<Student>> genders = students.stream().collect(Collectors.partitioningBy(s -> s.getGender() == Gender.MALE));
MapUtils.verbosePrint(System.out, "男女学生列表", genders);
//分组
Map<Grade, List<Student>> grades = students.stream().collect(Collectors.groupingBy(Student::getGrade));
MapUtils.verbosePrint(System.out, "学生班级列表", grades);
//列表分组统计,得到所有班级学生的个数
Map<Integer, Long> gradesCount = students.stream().collect(Collectors.groupingBy(Student::getAge, Collectors.counting()));
MapUtils.verbosePrint(System.out, "学生班级个数列表", gradesCount);
}
}
运行结果:
所有学生的年龄:[10, 9, 8, 13, 7, 13, 13, 9, 6, 6, 14, 13]
去重复所有学生的年龄:[6, 7, 8, 9, 10, 13, 14]
转换为指定类型的集合所有学生的年龄:[6, 7, 8, 9, 10, 13, 14]
年龄汇总信息:IntSummaryStatistics{count=12, sum=121, min=6, average=10.083333, max=14}
男女学生分块列表 =
{
false = [CollectorDemo{[name=小白, age=8, gender=FEMALE, grade=TWO]}, CollectorDemo{[name=小黑, age=13, gender=FEMALE, grade=FOUR]}, CollectorDemo{[name=小红, age=7, gender=FEMALE, grade=THREE]}, CollectorDemo{[name=小青, age=13, gender=FEMALE, grade=THREE]}, CollectorDemo{[name=小紫, age=9, gender=FEMALE, grade=TWO]}, CollectorDemo{[name=小马, age=14, gender=FEMALE, grade=FOUR]}]
true = [CollectorDemo{[name=小明, age=10, gender=MALE, grade=ONE]}, CollectorDemo{[name=大明, age=9, gender=MALE, grade=THREE]}, CollectorDemo{[name=小黄, age=13, gender=MALE, grade=ONE]}, CollectorDemo{[name=小王, age=6, gender=MALE, grade=ONE]}, CollectorDemo{[name=小李, age=6, gender=MALE, grade=ONE]}, CollectorDemo{[name=小刘, age=13, gender=MALE, grade=FOUR]}]
}
学生班级分组列表 =
{
TWO = [CollectorDemo{[name=小白, age=8, gender=FEMALE, grade=TWO]}, CollectorDemo{[name=小紫, age=9, gender=FEMALE, grade=TWO]}]
FOUR = [CollectorDemo{[name=小黑, age=13, gender=FEMALE, grade=FOUR]}, CollectorDemo{[name=小马, age=14, gender=FEMALE, grade=FOUR]}, CollectorDemo{[name=小刘, age=13, gender=MALE, grade=FOUR]}]
THREE = [CollectorDemo{[name=大明, age=9, gender=MALE, grade=THREE]}, CollectorDemo{[name=小红, age=7, gender=FEMALE, grade=THREE]}, CollectorDemo{[name=小青, age=13, gender=FEMALE, grade=THREE]}]
ONE = [CollectorDemo{[name=小明, age=10, gender=MALE, grade=ONE]}, CollectorDemo{[name=小黄, age=13, gender=MALE, grade=ONE]}, CollectorDemo{[name=小王, age=6, gender=MALE, grade=ONE]}, CollectorDemo{[name=小李, age=6, gender=MALE, grade=ONE]}]
}
学生班级个数分组统计列表 =
{
6 = 2
7 = 1
8 = 1
9 = 2
10 = 1
13 = 4
14 = 1
}
第四节 Reactive Stream响应式流
一、Reactive stream概念
Reactive stream是jdk9新特性,提供了一套API,就是一种基于发布订阅者模式的数据处理规范(机制),与Stream流编程没有任何关系。
二、背压
背压是指在异步场景中,发布者发送事件速度远快于订阅者的处理速度的情况下,一种告诉上游的发布者降低发送速度的策略,简而言之,背压就是一种流速控制的策略,用于调节发布者、订阅者之间的关系,防止压垮订阅者。
举个例子:假设以前是没有水龙头的,只能自来水厂主动的往用户输送水,但是不知道用户需要多少水,有了Reactive stream,就相当于有了水龙头,用户可以主动的请求用水,而自来水厂也知道了用户的需求。(例2:手机生产商与手机经销商的调节关系)
示例代码(需要jdk9以上版本的支持)
public class FlowDemo
{
public static void main(String[] args) throws Exception
{
// 1. 定义发布者, 发布的数据类型是 Integer
// 直接使用jdk自带的SubmissionPublisher, 它实现了 Publisher 接口
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<Integer>();
// 2. 定义订阅者
Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<Integer>()
{
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription)
{
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(Integer item)
{
// 接受到一个数据, 处理
System.out.println("接受到数据: " + item);
try
{
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e)
{
e.printStackTrace();
}
// 处理完调用request再请求一个数据
this.subscription.request(1);
// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable)
{
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}
@Override
public void onComplete()
{
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");
}
};
// 3. 发布者和订阅者 建立订阅关系
publisher.subscribe(subscriber);
// 4. 生产数据, 并发布
// 这里忽略数据生产过程
for (int i = 0; i < 10; i++)
{
System.out.println("生成数据:" + i);
// submit是个block方法
publisher.submit(i);
}
//publisher.submit(1111);
//publisher.submit(2222);
//publisher.submit(3333);
// 5. 结束后 关闭发布者
// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
publisher.close();
// 主线程延迟停止, 否则数据没有消费就退出
Thread.currentThread().join(1000);
// debug的时候, 下面这行需要有断点
// 否则主线程结束无法debug
System.out.println();
}
}
class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<Integer, String>
{
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription)
{
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(Integer item)
{
// 接受到一个数据, 处理
System.out.println("处理器接受到数据: " + item);
// 过滤掉小于0的, 然后发布出去
if (item > 0)
{
this.submit("转换后的数据:" + item);
}
// 处理完调用request再请求一个数据
this.subscription.request(1);
// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable)
{
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}
@Override
public void onComplete()
{
// 全部数据处理完了(发布者关闭了)
System.out.println("处理器处理完了!");
// 关闭发布者
this.close();
}
}
public class FlowDemo2
{
public static void main(String[] args) throws Exception
{
// 1. 定义发布者, 发布的数据类型是 Integer
// 直接使用jdk自带的SubmissionPublisher
SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
// 2. 定义处理器, 对数据进行过滤, 并转换为String类型
MyProcessor processor = new MyProcessor();
// 3. 发布者 和 处理器 建立订阅关系
publiser.subscribe(processor);
// 4. 定义最终订阅者, 消费 String 类型数据
Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>()
{
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription)
{
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(String item)
{
// 接受到一个数据, 处理
System.out.println("接受到数据: " + item);
// 处理完调用request再请求一个数据
this.subscription.request(1);
// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable)
{
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}
@Override
public void onComplete()
{
// 全部数据处理完了(发布者关闭了),publisher.close();的时候会被触发
System.out.println("处理完了!");
}
};
// 5. 处理器 和 最终订阅者 建立订阅关系
processor.subscribe(subscriber);
// 6. 生产数据, 并发布
// 这里忽略数据生产过程
publiser.submit(-111);
publiser.submit(111);
// 7. 结束后 关闭发布者
// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
publiser.close();
// 主线程延迟停止, 否则数据没有消费就退出
Thread.currentThread().join(1000);
}
}
- Processor:实现了Subscriber和Publisher,既可以做发布者又可以做订阅者,是个中间角色。数据从发布者到订阅者,如果中间进行处理或过滤,就可以在中间插入processor,它可以从这边接收数据,处理完之后再把数据发送到下一个订阅者,可以理解成一个过滤器或中转站,用于过滤和加工数据。Publisher与Processor建立订阅关系,Processor与Subscriber建立订阅关系。
- 订阅者的subscription里有缓存,会把发布者的数据先缓存起来,消费的时候直接从subscription里取。
- 在jdk9里面是如何实现反馈控流的。关键在于发布者Publisher的实现类SubmissionPublisher的submit方法是阻塞方法。订阅者会有一个缓冲池,默认为Flow.defaultBufferSize() = 256。当订阅者的缓冲池满了之后,发布者调用submit方法发布数据就会被阻塞,发布者就会停(慢)下来,不会再生产数据了。订阅者消费了数据之后(调用Subscription.request方法,起到调节发布速率的作用),缓冲池有位置了,submit方法就会继续执行下去,就是通过这样的机制,实现了调节发布者发布数据的速率,消费得快,生成就快,消费得慢,发布者就会被阻塞,当然就会慢下来了。例如:FlowDemo生产完256条数据后submit就被阻塞了,等消费者消费一条数据之后会生产一条数据。
- 怎么样实现发布者和多个订阅者之间的阻塞和同步呢?使用的jdk7的Fork/Join的ManagedBlocker。
未完待续......