package com.example.disruptor;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
/**
 * Demo entry point: three producer tasks publish messages of different types
 * through the shared {@link MessageReporter} singleton.
 */
public class DisruptorTest {

    /**
     * Submits one producer task per message type, waits until all of them have
     * completed, and then shuts the reporter down.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting
     *                              for the producer pool to terminate
     */
    public static void main(String[] args) throws InterruptedException {
        final MessageReporter messageReporter = MessageReporter.INSTANCE;
        final ExecutorService producers = Executors.newFixedThreadPool(3);

        // Each producer sends a different number of messages of its own type.
        producers.submit(() -> messageReporter.sendMessage(MessageType.A, 120000));
        producers.submit(() -> messageReporter.sendMessage(MessageType.B, 240000));
        producers.submit(() -> messageReporter.sendMessage(MessageType.C, 360000));
        producers.shutdown();

        // Poll in 1 ms slices; awaitTermination returns true once every task has completed.
        boolean allDone = producers.isTerminated();
        while (!allDone) {
            allDone = producers.awaitTermination(1, TimeUnit.MILLISECONDS);
        }

        messageReporter.shutdown();
    }
}
/**
 * Callback that {@code MessageReporter} runs as the last step of its shutdown.
 */
@FunctionalInterface
interface ShutdownHook {
/** Invoked by {@code MessageReporter.shutdown()} after the disruptor's own shutdown call has returned. */
void onShutdown();
}
/**
 * Enum-based singleton that owns the {@link Disruptor} and publishes messages into it.
 * <p>
 * {@code shutdownHook} is a callback run at shutdown; it is used to print
 * verification results.
 */
enum MessageReporter {
    INSTANCE;

    private final Disruptor<MessageEvent> disruptor;
    private ShutdownHook shutdownHook;

    /**
     * Builds a multi-producer disruptor with a ring buffer of {@code 1 << 16} slots,
     * wires the consumer topology and starts it.
     */
    MessageReporter() {
        disruptor = new Disruptor<>(
                MessageEvent::new,
                1 << 16,
                DaemonThreadFactory.INSTANCE,
                ProducerType.MULTI,
                new YieldingWaitStrategy());
        // To use the worker-pool topology instead (multiple consumers, each event
        // consumed only once), call multiConsumerOnce() here in place of multiHandlerSelf().
        multiHandlerSelf(); // multiple consumers, each event consumed repeatedly
        disruptor.start();
    }

    /**
     * Multiple consumers, no duplicate consumption: registers three
     * {@link MessageEventConsumer} instances as a worker pool. They share one
     * total counter and one per-type counter map, both of which the shutdown
     * hook prints.
     */
    private void multiConsumerOnce() {
        final LongAdder processedTotal = new LongAdder();
        final Map<MessageType, LongAdder> perTypeCounts = new ConcurrentHashMap<>();

        final MessageEventConsumer[] workers = new MessageEventConsumer[3];
        for (int idx = 0; idx < workers.length; idx++) {
            workers[idx] = new MessageEventConsumer(processedTotal, perTypeCounts);
        }
        disruptor.handleEventsWithWorkerPool(workers);

        shutdownHook = () -> {
            System.out.println(processedTotal);
            System.out.println(perTypeCounts);
        };
    }

    /**
     * Multiple consumers, duplicate consumption: registers one
     * {@link MessageEventHandlerA} and one {@link MessageEventHandlerB} as
     * parallel handlers. The shutdown hook is a no-op in this mode.
     */
    private void multiHandlerSelf() {
        disruptor.handleEventsWith(new MessageEventHandlerA(), new MessageEventHandlerB());
        shutdownHook = () -> { };
    }

    /**
     * Publishes {@code n} messages of the given type, retrying each one until
     * the ring buffer accepts it.
     *
     * @param messageType type of the messages to send
     * @param n           number of messages to send
     */
    void sendMessage(MessageType messageType, int n) {
        for (int seq = 0; seq < n; seq++) {
            final Message payload = new Message(messageType, String.valueOf(seq));
            boolean published = false;
            while (!published) {
                published = disruptor.getRingBuffer().tryPublishEvent(
                        (event, sequence, msg) -> event.setMessage(msg), payload);
                if (!published) {
                    // Publishing fails when the queue is full: let this sender thread yield and
                    // retry (tune the producer/consumer counts to reach a balance).
                    Thread.yield();
                }
            }
        }
    }

    /**
     * Shuts the disruptor down and then runs the shutdown hook.
     */
    void shutdown() {
        disruptor.shutdown();
        shutdownHook.onShutdown();
    }
}
/**
 * Consumer for the "multiple consumers, no duplicate consumption" mode.
 * Counts handled events into a total counter and a per-type counter map that
 * are both supplied by the caller.
 */
class MessageEventConsumer implements WorkHandler<MessageEvent>, LifecycleAware {
    LongAdder total;
    Map<MessageType, LongAdder> map;

    /**
     * @param total counter incremented once per handled event
     * @param map   per-type counters; an entry is created on first use of a type
     */
    public MessageEventConsumer(LongAdder total, Map<MessageType, LongAdder> map) {
        this.map = map;
        this.total = total;
    }

    /**
     * Logs the message carried by the event and bumps both counters.
     *
     * @param messageEvent event whose message is consumed
     */
    @Override
    public void onEvent(MessageEvent messageEvent) {
        final Message received = messageEvent.getMessage();
        System.out.println(Thread.currentThread().getName() + " 消费消息 " + received);
        map.computeIfAbsent(received.getType(), type -> new LongAdder()).increment();
        total.increment();
    }

    /** Logs that this consumer has been started. */
    @Override
    public void onStart() {
        final String self = String.valueOf(this);
        System.out.println(self + "创建了!");
    }

    /** Logs that this consumer is about to shut down. */
    @Override
    public void onShutdown() {
        final String self = String.valueOf(this);
        System.out.println(self + "要关闭了!");
    }
}
/**
 * Parallel handler A: logs every event it receives and keeps its own total and
 * per-type counts, which it prints when it shuts down.
 */
class MessageEventHandlerA implements EventHandler<MessageEvent>, LifecycleAware {
    long total;
    Map<MessageType, AtomicInteger> map = new ConcurrentHashMap<>();

    /**
     * Logs the message carried by the event and updates this handler's counters.
     *
     * @param messageEvent event whose message is consumed
     * @param l            second callback argument; not used here
     * @param b            third callback argument; not used here
     */
    @Override
    public void onEvent(MessageEvent messageEvent, long l, boolean b) {
        final Message received = messageEvent.getMessage();
        System.out.println(Thread.currentThread().getName() + " 消费消息 " + received);
        map.computeIfAbsent(received.getType(), type -> new AtomicInteger(0)).incrementAndGet();
        total += 1;
    }

    /** Logs that this handler has been started. */
    @Override
    public void onStart() {
        final String self = String.valueOf(this);
        System.out.println(self + "创建了!");
    }

    /** Prints the total, the per-type counts and a closing notice. */
    @Override
    public void onShutdown() {
        final String self = String.valueOf(this);
        System.out.println(self + "处理了" + total);
        System.out.println(self + map.toString());
        System.out.println(self + "要关闭了!");
    }
}
/**
 * Parallel handler B: logs every event it receives and keeps its own total and
 * per-type counts, which it prints when it shuts down.
 */
class MessageEventHandlerB implements EventHandler<MessageEvent>, LifecycleAware {
    long total;
    Map<MessageType, AtomicInteger> map = new ConcurrentHashMap<>();

    /**
     * Logs the message carried by the event and updates this handler's counters.
     *
     * @param messageEvent event whose message is consumed
     * @param l            second callback argument; not used here
     * @param b            third callback argument; not used here
     */
    @Override
    public void onEvent(MessageEvent messageEvent, long l, boolean b) {
        final Message received = messageEvent.getMessage();
        System.out.println(Thread.currentThread().getName() + "消费消息" + received);
        map.computeIfAbsent(received.getType(), type -> new AtomicInteger()).incrementAndGet();
        total += 1;
    }

    /** Logs that this handler has been started. */
    @Override
    public void onStart() {
        final String self = String.valueOf(this);
        System.out.println(self + "创建了!");
    }

    /** Prints the total, the per-type counts and a closing notice. */
    @Override
    public void onShutdown() {
        final String self = String.valueOf(this);
        System.out.println(self + "处理了" + total);
        System.out.println(self + map.toString());
        System.out.println(self + "要关闭了!");
    }
}
/**
 * Type tag attached to every {@code Message}.
 */
enum MessageType {
    A,
    B,
    C
}
/**
 * Immutable message: a type tag plus a text body.
 */
class Message {
    private final MessageType type;
    private final String message;

    /**
     * @param type    type tag of the message
     * @param message text body of the message
     */
    public Message(MessageType type, String message) {
        this.message = message;
        this.type = type;
    }

    /**
     * @return the type tag given at construction
     */
    public MessageType getType() {
        return this.type;
    }

    /**
     * @return a string of the form {@code Message{type=<type>, message='<body>'}}
     */
    @Override
    public String toString() {
        return "Message{type=" + type + ", message='" + message + "'}";
    }
}
/**
 * Mutable event object stored in the ring buffer; it carries one {@link Message}.
 * Instances are created through {@code MessageEvent::new} and filled in by the
 * publisher via {@link #setMessage(Message)}.
 */
class MessageEvent {
// Message currently carried by this event; null until setMessage is first called.
private Message message;
/**
 * @return the message currently held by this event, or null if none has been set
 */
public Message getMessage() {
return message;
}
/**
 * Replaces the message held by this event.
 *
 * @param message the message to carry
 */
public void setMessage(Message message) {
this.message = message;
}
}
Disruptor框架用作生产者消费者模型
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 生产者消费者模式介绍 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接...