控制复杂性是计算机编程的本质。
just
- 创建只生产一个产品的Publisher
public static <T> Flux<T> just(T data)
public void test() {
Flux<Integer> just = Flux.just(11);
just.subscribe(System.out::println); // 输出11
}
或者在多线程中进行request,就像中奖一样,只有一个1等奖,抽奖的人有很多.
举例如下:
public class JustTest1 {
public void test() {
Flux<Integer> just = Flux.just(11);
BaseSubscriber<Integer> subscriber = new BaseSubscriber<Integer>() {
@Override
protected void hookOnNext(Integer value) {
System.out.println("我在三亚玩完了,现在带我离开:" + value);
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
}
};
just.subscribe(integer -> {
Common.log("我立即需要你,来吧:", integer);
});
just.subscribe(subscriber);
new Thread(() -> {
try {
Thread.sleep(3000);
Common.log("我在这里", Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
// 现在需要你了,给我10个
subscriber.request(10);
}, "三亚").start();
}
public static void main(String[] args) throws InterruptedException {
JustTest1 test = new JustTest1();
test.test();
Thread.sleep(5000);
}
}
-
创建多个固定产品的just
public static <T> Flux<T> just(T... data)
public class JustTest {
private int count;
private final int size = 20000000;
int threadCount = 40;
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
public void run() {
BaseSubscriber<Integer> base = new BaseSubscriber<Integer>() {
@Override
protected void hookOnNext(Integer value) {
count += value;
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
}
};
Integer[] list = new Integer[size];
for (int i = 0; i < size; i++) {
list[i] = 1;
}
Flux<Integer> integerFlux = Flux.just(list);
integerFlux.subscribe(base);
for (int i = 0; i < threadCount; i++) {
final int k = i;
new Thread(() -> {
for (int j = 0; j < size / threadCount; j++) {
base.request(1000);
}
countDownLatch.countDown();
}).start();
}
}
public static void main(String[] args) throws InterruptedException {
JustTest test = new JustTest();
test.run();
test.countDownLatch.await();
System.out.println(test.count);
System.out.println("end");
}
}
join
根据2个生产者合成一个元素,并给消费者消费
public final <TRight, TLeftEnd, TRightEnd, R> Flux<R> join(
Publisher<? extends TRight> other,
Function<? super T, ? extends Publisher<TLeftEnd>> leftEnd,
Function<? super TRight, ? extends Publisher<TRightEnd>> rightEnd,
BiFunction<? super T, ? super TRight, ? extends R> resultSelector
)
看看这方法签名,实在是啰嗦.
在上面的图中, 绿色产品是source发出, 红色产品是other发出.
下面是具体流程
- 生产1, 输出无
- 生产A, 输出
resultSelector.apply(1,A)
-
leftEnd
取消 1 - 生产2, 输出
resultSelector.apply(2,A)
-
rightEnd
取消A - 生产3
-
leftEnd
取消 2 - 生产B, 输出
resultSelector.apply(3,B)
-
leftEnd
取消 3 - 生产4 输出
resultSelector.apply(4,B)
-
rightEnd
取消B - 生产C, 输出
resultSelector.apply(4,C)
- source结束生产
- other结束生产
例:
public class JoinTest {
static final BiFunction<Integer, Integer, Integer> add = (t1, t2) -> t1 + t2;
static <T> Function<Integer, Flux<T>> just(final Flux<T> publisher) {
return t1 -> publisher;
}
public static void main(String[] args) {
DirectProcessor<Integer> source1 = DirectProcessor.create();
DirectProcessor<String> source2 = DirectProcessor.create();
DirectProcessor<Integer> duration1 = DirectProcessor.create();
DirectProcessor<Integer> duration2 = DirectProcessor.create();
Flux<String> m =
source1.join(source2, just(duration1), t1 -> duration2, (integer, tRight) -> integer + "," + tRight);
m.subscribe(System.out::println);
source1.onNext(1);
source2.onNext("A");
duration1.onNext(1);
source1.onNext(2);
duration2.onNext(1);
duration1.onNext(1);
source1.onNext(3);
source2.onNext("B");
duration1.onNext(1);
source1.onNext(4);
source2.onNext("C");
source1.onComplete();
duration1.onNext(1);
source2.onComplete();
duration2.onNext(1);
}
}