WebFlux 是原生的发布订阅工具,可以很方便的构建事件总线。下面是一个监听数据变动的监听器:
package com.example.demo;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
/**
* 数据监听器
*
* @author <a href="mailto:pushu@2dfire.com">朴树</a>
* @date 2019-07-01 17:32
*/
public class ReactorDataMonitor {
private static final Map<Class, FluxSink> handlers = new ConcurrentHashMap<>();
/**
* 监控指定类型的数据
*
* @param clz 数据类型
* @param handler 数据消费方式
*/
public static void monitor(Class clz, Consumer handler) {
Flux<Object> objectFlux = Flux.create(sink -> {
handlers.put(clz, sink);
sink.onCancel(() -> handlers.remove(clz));
}, FluxSink.OverflowStrategy.LATEST);
objectFlux.subscribe(handler);
}
/**
* 取消监控数据
*
* @param clz 数据类型
*/
public static void unMonitor(Class clz) {
handlers.remove(clz);
}
/**
* 发布数据
*
* @param object
*/
public static void publish(Object object) {
handlers.forEach((key, value) -> {
if (key.equals(object.getClass())) {
value.next(object);
}
});
}
}
以上代码中
FluxSink
是一个可以持续发布数据的数据源。