1.导航
Disruptor核心链路应用场景
并行计算 - 串 (并) 行操作
并行计算 - 多边形高端操作
Disruptor - 多生产者模型讲解
Disruptor - 多消费者模型讲解
2.Disruptor核心链路应用场景
- 核心链路特点:至关重要且业务复杂。
- 实现方式一:传统的完全解耦模式
- 实现方式二:模板模式
- 解决手段:
- 领域模型的高度抽象
- 寻找更好的框架帮助我们进行编码
- 使用框架:
- 有限状态机框架,如Spring-StateMachine
- 使用Disruptor
3.并行计算 - 串、并行操作
- EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
- 串行操作:使用链式调用的方式
- 并行操作:使用单独调用的方式
- 多边形操作
package disruptor2;

import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Demo entry point that wires {@link TradeEvent} handlers into one of several
 * dependency topologies (serial, parallel, diamond, hexagon), publishes events
 * through a {@link TradePublisher}, and prints the elapsed wall-clock time.
 *
 * <p>Exactly one topology is wired per run. Registering several topologies on
 * the same Disruptor would run each handler more than once per event on
 * independent consumer threads, and the handlers mutate the shared event
 * without synchronization.
 *
 * <p>Lives in package {@code disruptor2} because it references
 * {@code TradeEvent}, {@code TradePublisher} and {@code Handler1..5} without
 * imports, and those classes are declared in that package.
 */
public class Main2 {

    /** Topology used when no command-line argument is given. */
    private static final String DEFAULT_TOPOLOGY = "hexagon";

    /**
     * Runs the demo.
     *
     * @param args optional; {@code args[0]} selects the topology:
     *             {@code serial}, {@code parallel}, {@code diamond} or
     *             {@code hexagon} (the default)
     * @throws IllegalArgumentException if the topology name is not recognized
     * @throws Exception if waiting for the publisher is interrupted
     */
    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        String topology = args.length > 0 ? args[0] : DEFAULT_TOPOLOGY;

        Disruptor<TradeEvent> disruptor = new Disruptor<>(
                TradeEvent::new,
                1024 * 1024,
                (Runnable r) -> new Thread(r, "TradeThreadGroup"),
                ProducerType.SINGLE,
                new YieldingWaitStrategy()
        );
        // Wire before anything is started so a bad topology name fails fast
        // with no threads left running.
        wireHandlers(disruptor, topology);

        ExecutorService es = Executors.newFixedThreadPool(4);
        disruptor.start();
        try {
            CountDownLatch latch = new CountDownLatch(1);
            es.submit(new TradePublisher(latch, disruptor));
            // The publisher counts the latch down once it has finished publishing.
            latch.await();
        } finally {
            disruptor.shutdown();
            es.shutdown();
        }

        System.out.println("总耗时:" + (System.currentTimeMillis() - startTime));
    }

    /**
     * Registers the five demo handlers on {@code disruptor} in the requested shape.
     *
     * @param disruptor the not-yet-started Disruptor to configure
     * @param topology  one of {@code serial}, {@code parallel}, {@code diamond}, {@code hexagon}
     * @throws IllegalArgumentException if {@code topology} is not one of the names above
     */
    private static void wireHandlers(Disruptor<TradeEvent> disruptor, String topology) {
        Handler1 h1 = new Handler1();
        Handler2 h2 = new Handler2();
        Handler3 h3 = new Handler3();
        Handler4 h4 = new Handler4();
        Handler5 h5 = new Handler5();

        switch (topology) {
            case "serial": {
                // Chained calls: h1 -> h2 -> h3 -> h4 -> h5.
                disruptor.handleEventsWith(h1)
                        .handleEventsWith(h2)
                        .handleEventsWith(h3)
                        .handleEventsWith(h4)
                        .handleEventsWith(h5);
                break;
            }
            case "parallel": {
                // All five handlers registered as one group with no ordering between them.
                // Calling disruptor.handleEventsWith(hN) once per handler is the
                // other spelling shown in the notes.
                disruptor.handleEventsWith(h1, h2, h3, h4, h5);
                break;
            }
            case "diamond": {
                // h1 and h2 form one group; h3 is registered after that group.
                // disruptor.handleEventsWith(h1, h2).handleEventsWith(h3) is the
                // chained spelling of the same shape.
                EventHandlerGroup<TradeEvent> ehGroup = disruptor.handleEventsWith(h1, h2);
                ehGroup.then(h3);
                break;
            }
            case "hexagon": {
                // Two branches, h1 -> h2 and h4 -> h5, joined by h3 after h2 and h5.
                disruptor.handleEventsWith(h1, h4);
                disruptor.after(h1).handleEventsWith(h2);
                disruptor.after(h4).handleEventsWith(h5);
                disruptor.after(h2, h5).handleEventsWith(h3);
                break;
            }
            default:
                throw new IllegalArgumentException(
                        "Unknown topology '" + topology
                                + "'; expected serial, parallel, diamond or hexagon");
        }
    }
}
package disruptor2;

import lombok.Data;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Mutable event object carried through the Disruptor ring buffer in this demo.
 *
 * <p>The accessors the handlers and publisher call ({@code setName},
 * {@code setId}, {@code getPrice}, {@code setPrice}, ...) are not written out
 * here; they are expected to come from Lombok's {@code @Data}.
 */
@Data
public class TradeEvent {

    // Set by Handler2 to a random UUID string.
    private String id;

    // Set by Handler1 to "H1".
    private String name;

    // Set by TradePublisher when publishing; Handler4 overwrites it and Handler5 adds to it.
    private double price;

    // Not read or written by any of the handlers in this file; purpose not visible here.
    private AtomicInteger count = new AtomicInteger();
}
package disruptor2;

import com.lmax.disruptor.dsl.Disruptor;

import java.util.Random;
import java.util.concurrent.CountDownLatch;

/**
 * Publishes a fixed number of {@link TradeEvent}s, each with a random price,
 * to a Disruptor and then releases a latch so the caller knows publishing
 * has finished.
 */
public class TradePublisher implements Runnable {

    /**
     * Number of events published per run: 65536.
     *
     * <p>This was previously spelled {@code 1 << 30000}. For an {@code int}
     * left operand Java only uses the low five bits of the shift distance
     * ({@code 30000 & 31 == 16}), so that expression was already equal to
     * {@code 1 << 16}; it is written explicitly here so the value is not
     * mistaken for something astronomically large.
     */
    private static final int PUBLISH_COUNT = 1 << 16;

    /** Upper bound (exclusive) of the random price assigned to each event. */
    private static final double MAX_PRICE = 9999;

    private final CountDownLatch latch;
    private final Disruptor<TradeEvent> disruptor;

    /**
     * @param latch     counted down exactly once when {@link #run()} ends
     * @param disruptor the Disruptor events are published to
     */
    public TradePublisher(CountDownLatch latch, Disruptor<TradeEvent> disruptor) {
        this.latch = latch;
        this.disruptor = disruptor;
    }

    /**
     * Publishes {@code PUBLISH_COUNT} events and counts the latch down.
     * The latch is released in a {@code finally} block so a failure while
     * publishing cannot leave the waiting thread blocked forever.
     */
    @Override
    public void run() {
        // One generator for the whole run instead of a new Random per event.
        final Random random = new Random();
        try {
            for (int i = 0; i < PUBLISH_COUNT; i++) {
                disruptor.publishEvent(
                        (event, sequence) -> event.setPrice(random.nextDouble() * MAX_PRICE));
            }
        } finally {
            latch.countDown();
        }
    }
}
package disruptor2;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * Handler that stamps each {@link TradeEvent} with the name {@code "H1"}.
 * Implements both {@link EventHandler} and {@link WorkHandler}; the
 * three-argument callback simply forwards to the single-argument one.
 */
@Slf4j
public class Handler1 implements EventHandler<TradeEvent>, WorkHandler<TradeEvent> {

    /**
     * {@link WorkHandler} callback: logs and sets the event name.
     *
     * @param event the event to update
     */
    @Override
    public void onEvent(TradeEvent event) throws Exception {
        log.info("handler 1 : SET NAME");
        event.setName("H1");
    }

    /**
     * {@link EventHandler} callback: delegates to {@link #onEvent(TradeEvent)};
     * the sequence and end-of-batch flag are not used.
     */
    @Override
    public void onEvent(TradeEvent event, long sequence, boolean endOfBatch) throws Exception {
        onEvent(event);
    }
}
package disruptor2;

import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;

import java.util.UUID;

/**
 * Handler that assigns a freshly generated random UUID string as the
 * id of each {@link TradeEvent}.
 */
@Slf4j
public class Handler2 implements EventHandler<TradeEvent> {

    /**
     * Logs, then overwrites the event id with a new random UUID.
     * The sequence and end-of-batch flag are not used.
     */
    @Override
    public void onEvent(TradeEvent event, long sequence, boolean endOfBatch) throws Exception {
        log.info("handler 2 : SET ID");
        final String generatedId = UUID.randomUUID().toString();
        event.setId(generatedId);
    }
}
package disruptor2;

import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * Read-only handler that logs the name, id and string form of each
 * {@link TradeEvent}. It does not modify the event.
 */
@Slf4j
public class Handler3 implements EventHandler<TradeEvent> {

    /**
     * Logs the event's current name, id and {@code toString()} value.
     * The sequence and end-of-batch flag are not used.
     */
    @Override
    public void onEvent(TradeEvent event, long sequence, boolean endOfBatch) throws Exception {
        // Parameterized logging: the message (including event.toString()) is
        // only rendered when INFO is enabled, and renders the same text as the
        // previous string concatenation.
        log.info("handler 3 : NAME: {},ID: {},INSTANCE: {}",
                event.getName(), event.getId(), event);
    }
}
package disruptor2;

import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * Handler that overwrites the price of each {@link TradeEvent} with a
 * fixed value.
 */
@Slf4j
public class Handler4 implements EventHandler<TradeEvent> {

    /** Price written to every event this handler sees. */
    private static final double FIXED_PRICE = 17.0;

    /**
     * Logs, then sets the event price to {@code FIXED_PRICE}.
     * The sequence and end-of-batch flag are not used.
     */
    @Override
    public void onEvent(TradeEvent event, long sequence, boolean endOfBatch) throws Exception {
        log.info("handler 4 : SET PRICE");
        event.setPrice(FIXED_PRICE);
    }
}
package disruptor2;

import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * Handler that reads the current price of each {@link TradeEvent},
 * raises it by a fixed amount, and logs the price before and after.
 */
@Slf4j
public class Handler5 implements EventHandler<TradeEvent> {

    /** Amount added to the event's price on every invocation. */
    private static final double PRICE_INCREMENT = 3.0;

    /**
     * Logs the price found on the event, adds {@code PRICE_INCREMENT} to it,
     * and logs the resulting price. The sequence and end-of-batch flag are
     * not used.
     */
    @Override
    public void onEvent(TradeEvent event, long sequence, boolean endOfBatch) throws Exception {
        double lastPrice = event.getPrice();
        // Parameterized logging renders the same text as the previous
        // concatenation but skips formatting when INFO is disabled.
        log.info("handler 5 : GET LAST PRICE: {}", lastPrice);
        event.setPrice(lastPrice + PRICE_INCREMENT);
        log.info("handler 5 : GET PRICE: {}", event.getPrice());
    }
}