Disruptor高级(三)菱形消费

示例代码基础框架

Event
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Disruptor中的 Event
 */
public class Trade {

    private String id;
    private String name;
    private double price;
    private AtomicInteger count = new AtomicInteger(0);

    public Trade() {}
    
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public double getPrice() {
        return price;
    }
    public void setPrice(double price) {
        this.price = price;
    }
    public AtomicInteger getCount() {
        return count;
    }
    public void setCount(AtomicInteger count) {
        this.count = count;
    }
    
}
生产者
  • 使用disruptor.publishEvent(eventTranslator)提交Event到容器中,区别于ringBuffer.publish(sequence);
  • disruptor.publishEvent(eventTranslator)的参数是com.lmax.disruptor.EventTranslator的实现类,其提供了一个Event的空对象,具体实现将填充空Event对象,完成Event的生产;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.dsl.Disruptor;

public class TradePushlisher implements Runnable {

    private Disruptor<Trade> disruptor;
    private CountDownLatch latch;
    
    private static int PUBLISH_COUNT = 1;
    
    public TradePushlisher(CountDownLatch latch, Disruptor<Trade> disruptor) {
        this.disruptor = disruptor;
        this.latch = latch;
    }

    public void run() {
        TradeEventTranslator eventTranslator = new TradeEventTranslator();
        for(int i =0; i < PUBLISH_COUNT; i ++){
            //新的提交任务的方式
            disruptor.publishEvent(eventTranslator);            
        }
        latch.countDown();
    }
}


class TradeEventTranslator implements EventTranslator<Trade> {

    private Random random = new Random();

    public void translateTo(Trade event, long sequence) {
        this.generateTrade(event);
    }

    private void generateTrade(Trade event) {
        event.setPrice(random.nextDouble() * 9999);
    }
    
}
消费者

注:消费者既可以通过实现EventHandler成为消费者,也可以通过实现WorkHandler成为消费者;

  • 消费者1:重置Event的name;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade>{

    //EventHandler
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        this.onEvent(event);
    }

    //WorkHandler
    public void onEvent(Trade event) throws Exception {
        System.err.println("handler 1 : SET NAME");
        Thread.sleep(1000);
        event.setName("H1");
    }

}
  • 消费者2:设置ID
import java.util.UUID;

import com.lmax.disruptor.EventHandler;

public class Handler2 implements EventHandler<Trade> {

    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.err.println("handler 2 : SET ID");
        Thread.sleep(2000);
        event.setId(UUID.randomUUID().toString());
    }

}
  • 消费者3:输出Event信息
import com.lmax.disruptor.EventHandler;

public class Handler3 implements EventHandler<Trade> {

    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.err.println("handler 3 : NAME: " 
                                + event.getName() 
                                + ", ID: " 
                                + event.getId()
                                + ", PRICE: " 
                                + event.getPrice()
                                + " INSTANCE : " + event.toString());
    }

}
  • 消费者4:设置Event价格;
import com.lmax.disruptor.EventHandler;

public class Handler4 implements EventHandler<Trade> {

    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.err.println("handler 4 : SET PRICE");
        Thread.sleep(1000);
        event.setPrice(17.0);
    }

}
  • 消费者5:原价格基础上加3;
import com.lmax.disruptor.EventHandler;

public class Handler5 implements EventHandler<Trade> {

    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.err.println("handler 5 : GET PRICE: " +  event.getPrice());
        Thread.sleep(1000);
        event.setPrice(event.getPrice() + 3.0);
    }

}

菱形消费示例(一)

  • 可变参数 + 链式调用;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;

public class Main {

    
    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
            
        //构建一个线程池用于提交任务
        ExecutorService es1 = Executors.newFixedThreadPool(1);
        ExecutorService es2 = Executors.newFixedThreadPool(5);
        //1 构建Disruptor
        Disruptor<Trade> disruptor = new Disruptor<Trade>(
                new EventFactory<Trade>() {
                    public Trade newInstance() {
                        return new Trade();
                    }
                },
                1024*1024,
                es2,
                ProducerType.SINGLE,
                new BusySpinWaitStrategy());
        
        
        //2 把消费者设置到Disruptor中 handleEventsWith
        
        //2.3 菱形操作 (一)
        disruptor.handleEventsWith(new Handler1(), new Handler2())
        .handleEventsWith(new Handler3());
        
        //3 启动disruptor
        RingBuffer<Trade> ringBuffer = disruptor.start();
        
        CountDownLatch latch = new CountDownLatch(1);
        
        long begin = System.currentTimeMillis();
        
        es1.submit(new TradePushlisher(latch, disruptor));
        
        latch.await();  //进行向下
        
        disruptor.shutdown();
        es1.shutdown();
        es2.shutdown();
        System.err.println("总耗时: " + (System.currentTimeMillis() - begin));
        
    }

}

输出:

handler 2 : SET ID
handler 1 : SET NAME
handler 3 : NAME: H1, ID: 85b6dead-c0e8-49be-a31a-fe4a692cc1f9, PRICE: 2034.563514751098 INSTANCE : com.bfxy.disruptor.heigh.chain.Trade@5a92c7f
总耗时: 3818

菱形消费示例(二)

  • 使用EventHandlerGroup接收并行结果,在用其串行调用;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;

public class Main {

    
    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
            
        //构建一个线程池用于提交任务
        ExecutorService es1 = Executors.newFixedThreadPool(1);
        ExecutorService es2 = Executors.newFixedThreadPool(5);
        //1 构建Disruptor
        Disruptor<Trade> disruptor = new Disruptor<Trade>(
                new EventFactory<Trade>() {
                    public Trade newInstance() {
                        return new Trade();
                    }
                },
                1024*1024,
                es2,
                ProducerType.SINGLE,
                new BusySpinWaitStrategy());
        
        
        //2 把消费者设置到Disruptor中 handleEventsWith
        
        EventHandlerGroup<Trade> ehGroup 
                = disruptor.handleEventsWith(new Handler1(), new Handler2());
        ehGroup.then(new Handler3());
        
        //3 启动disruptor
        RingBuffer<Trade> ringBuffer = disruptor.start();
        
        CountDownLatch latch = new CountDownLatch(1);
        
        long begin = System.currentTimeMillis();
        
        es1.submit(new TradePushlisher(latch, disruptor));
        
        latch.await();  //进行向下
        
        disruptor.shutdown();
        es1.shutdown();
        es2.shutdown();
        System.err.println("总耗时: " + (System.currentTimeMillis() - begin));
        
    }

}

输出:

handler 1 : SET NAME
handler 2 : SET ID
handler 3 : NAME: H1, ID: 011475b2-c78b-4c04-94a6-b7dcf9fe140f, PRICE: 898.8458023889823 INSTANCE : com.bfxy.disruptor.heigh.chain.Trade@edba98c
总耗时: 3731

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容