disruptor是一个高性能的内存队列,之所以高性能,是因为有以下几个特点:
1 整个disruptor的实现在并发处理中没有使用锁,而是使用CAS操作(这也是disruptor被称为无锁队列的原因)
2 disruptor的内部实现采用预先分配的环形数组(RingBuffer),数组中的事件对象可以重复使用,这样可以避免JVM频繁进行垃圾回收
3 通过缓存行填充解决了伪共享问题,避免不同线程各自修改的变量落在同一个缓存行上而导致缓存反复失效
disruptor的使用场景:
disruptor内部基于生产者-消费者模型设计,目前log4j2的异步日志就是基于disruptor实现的,storm等很多开源项目也依赖disruptor。下面先来看几个demo:
package com.guoxiong.disruptor;
/**
 * Mutable payload used by the Disruptor demos: a numeric id paired with a text value.
 *
 * <p>Created 2018/7/15 2:42 PM.
 *
 * @author Jungler
 */
public class MyData {

    /** Numeric identifier of this payload. */
    private int id;

    /** Text content of this payload; may be {@code null}. */
    private String value;

    /**
     * Creates a payload with the given id and value.
     *
     * @param id    numeric identifier
     * @param value text content
     */
    public MyData(int id, String value) {
        this.id = id;
        this.value = value;
    }

    /**
     * @return the numeric identifier
     */
    public int getId() {
        return id;
    }

    /**
     * @param id the new numeric identifier
     */
    public void setId(int id) {
        this.id = id;
    }

    /**
     * @return the text content
     */
    public String getValue() {
        return value;
    }

    /**
     * @param value the new text content
     */
    public void setValue(String value) {
        this.value = value;
    }

    /**
     * Renders the payload as {@code MyData{id=<id>, value='<value>'}}.
     *
     * @return the textual form of this payload
     */
    @Override
    public String toString() {
        return "MyData{"
                + "id=" + id
                + ", value='" + value + '\''
                + '}';
    }
}
package com.guoxiong.disruptor;
/**
* 2018/7/15 下午2:42
*
* @author Jungler
* @since
*/
public class MyDataEvent {
public MyDataEvent(){
}
private MyDatadata;
public MyDatagetData() {
return data;
}
public void setData(MyData data) {
this.data = data;
}
}
package com.guoxiong.disruptor;
import com.lmax.disruptor.EventFactory;
/**
* 2018/7/15 下午2:43
*
* @author Jungler
* @since
*/
public class MyDataEventFactoryimplements EventFactory {
@Override
public MyDataEventnewInstance() {
return new MyDataEvent();
}
}
package com.guoxiong.disruptor;
import com.lmax.disruptor.*;
/**
* 2018/7/22 下午3:18
*
* 消费者处理
*
* @author Jungler
* @since
*/
public class MsgBatchConsumerimplements EventHandler {
private Stringname;
public MsgBatchConsumer(String name){
this.name = name;
}
@Override
public void onEvent(MyDataEvent myDataEvent, long l, boolean b)throws Exception {
System.out.println("name = " +name +" data = " + myDataEvent.getData().toString());
}
}
package com.guoxiong.disruptor;
import com.lmax.disruptor.WorkHandler;
/**
* 2018/7/22 下午3:42
*
* @author Jungler
* @since
*/
public class MsgWorkConsumerimplements WorkHandler {
private Stringname;
public MsgWorkConsumer(String name){
this.name = name;
}
@Override
public void onEvent(MyDataEvent myDataEvent)throws Exception {
System.out.println("name = " +name +" data = " + myDataEvent.getData().toString());
}
}
package com.guoxiong.disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.List;
/**
* 2018/7/22 下午3:22
*
* @author Jungler
* @since
*/
public class MsgProducer {
private Disruptordisruptor;
public MsgProducer(Disruptor disruptor){
this.disruptor = disruptor;
}
public void send(MyData data){
RingBuffer ringBuffer =this.disruptor.getRingBuffer();
long next = ringBuffer.next();
try{
MyDataEvent event = ringBuffer.get(next);
event.setData(data);
}finally {
if(next ==5){
return;
}
ringBuffer.publish(next);
}
}
public void send(List dataList){
for(MyData data : dataList){
this.send(data);
}
}
}
package com.guoxiong.disruptor;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
/**
 * Demo: single producer, multiple consumers, where every consumer receives all of the data.
 *
 * <p>Created 2018/7/22 3:25 PM.
 *
 * @author Jungler
 */
public class DisruptorDemo1 {

    /**
     * Registers three handlers on one disruptor, publishes five payloads and shuts down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Disruptor<MyDataEvent> disruptor =
                new Disruptor<>(new MyDataEventFactory(), 1024, Executors.defaultThreadFactory());

        // Define the consumers; all three are registered together in one call.
        MsgBatchConsumer msg1 = new MsgBatchConsumer("1");
        MsgBatchConsumer msg2 = new MsgBatchConsumer("2");
        MsgBatchConsumer msg3 = new MsgBatchConsumer("3");
        disruptor.handleEventsWith(msg1, msg2, msg3);
        disruptor.start();

        // Define the data to send.
        List<MyData> myDataList = new ArrayList<>();
        myDataList.add(new MyData(2, "2222"));
        myDataList.add(new MyData(3, "3333"));
        myDataList.add(new MyData(1, "1111"));
        myDataList.add(new MyData(4, "4444"));
        myDataList.add(new MyData(5, "5555"));

        MsgProducer msgProducer = new MsgProducer(disruptor);
        msgProducer.send(myDataList);

        // Wait for the published events to be handled, then stop the handler threads
        // so the JVM can exit.
        disruptor.shutdown();
    }
}
package com.guoxiong.disruptor;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
/**
 * Demo: worker-pool ("grouped") consumption, where the workers of one pool share the data
 * so that the pool as a whole consumes every event.
 *
 * <p>As written, a single worker ("aa") is registered and two {@link MsgProducer} instances
 * publish ids 1-5 and 6-10 into the same disruptor from the main thread.
 *
 * <p>Created 2018/7/22 3:45 PM.
 *
 * @author Jungler
 */
public class DisruptorDemo2 {

    /**
     * Publishes ten payloads through a worker pool, printing the ring buffer before and after
     * a five-second pause.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Disruptor<MyDataEvent> disruptor =
                new Disruptor<>(new MyDataEventFactory(), 16, Executors.defaultThreadFactory());

        // One worker in one pool. To try real work sharing, pass further MsgWorkConsumer
        // instances to this call; a second handleEventsWithWorkerPool(...) call would
        // register a second, independent pool.
        MsgWorkConsumer consumer1 = new MsgWorkConsumer("aa");
        disruptor.handleEventsWithWorkerPool(consumer1);
        disruptor.start();

        List<MyData> myDataList1 = new ArrayList<>();
        for (int i = 1; i < 6; i++) {
            myDataList1.add(new MyData(i, "data" + i));
        }
        List<MyData> myDataList2 = new ArrayList<>();
        for (int i = 6; i < 11; i++) {
            myDataList2.add(new MyData(i, "data" + i));
        }

        MsgProducer msgProducer1 = new MsgProducer(disruptor);
        MsgProducer msgProducer2 = new MsgProducer(disruptor);
        msgProducer1.send(myDataList1);
        msgProducer2.send(myDataList2);
        System.out.println(disruptor.getRingBuffer());

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever runs this thread instead of swallowing it.
            Thread.currentThread().interrupt();
        }

        // Explicitly publish sequence 5 after the pause. This releases consumers that are
        // stalled on that slot if the producer claimed it without publishing it.
        // NOTE(review): if sequence 5 was already published by the producer, this call is
        // redundant - confirm whether it is still needed.
        disruptor.getRingBuffer().publish(5);
        System.out.println(disruptor.getRingBuffer());

        // Wait for the published events to be handled, then stop the worker threads
        // so the JVM can exit.
        disruptor.shutdown();
    }
}
package com.guoxiong.disruptor;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
/**
 * Demo: multiple consumers that consume in a fixed order - handlers "1" and "3" process each
 * event first, and handler "2" processes it only after both of them.
 *
 * <p>Created 2018/7/22 4:05 PM.
 *
 * @author Jungler
 */
public class DisruptorDemo3 {

    /**
     * Builds the two-stage handler chain, publishes five payloads and shuts down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Disruptor<MyDataEvent> disruptor =
                new Disruptor<>(new MyDataEventFactory(), 1024, Executors.defaultThreadFactory());

        // Define the consumers: msg1 and msg3 form the first stage, msg2 the second.
        MsgBatchConsumer msg1 = new MsgBatchConsumer("1");
        MsgBatchConsumer msg2 = new MsgBatchConsumer("2");
        MsgBatchConsumer msg3 = new MsgBatchConsumer("3");
        disruptor.handleEventsWith(msg1, msg3).then(msg2);
        disruptor.start();

        // Define the data to send.
        List<MyData> myDataList = new ArrayList<>();
        myDataList.add(new MyData(2, "2222"));
        myDataList.add(new MyData(3, "3333"));
        myDataList.add(new MyData(1, "1111"));
        myDataList.add(new MyData(4, "4444"));
        myDataList.add(new MyData(5, "5555"));

        MsgProducer msgProducer = new MsgProducer(disruptor);
        msgProducer.send(myDataList);

        // Wait for the published events to be handled, then stop the handler threads
        // so the JVM can exit.
        disruptor.shutdown();
    }
}
package com.guoxiong.disruptor;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
/**
 * Demo: a handler that depends on handlers from two separately registered groups - handlers
 * "1"/"3" and "2"/"4" are registered in two calls, and handler "5" is registered to run
 * after both "3" and "4".
 *
 * <p>Created 2018/7/22 4:13 PM.
 *
 * @author Jungler
 */
public class DisruptorDemo4 {

    /**
     * Builds the handler graph, publishes five payloads and shuts down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Disruptor<MyDataEvent> disruptor =
                new Disruptor<>(new MyDataEventFactory(), 1024, Executors.defaultThreadFactory());

        // Define the consumers.
        MsgBatchConsumer msg1 = new MsgBatchConsumer("1");
        MsgBatchConsumer msg2 = new MsgBatchConsumer("2");
        MsgBatchConsumer msg3 = new MsgBatchConsumer("3");
        MsgBatchConsumer msg4 = new MsgBatchConsumer("4");
        MsgBatchConsumer msg5 = new MsgBatchConsumer("5");

        // Two groups registered directly on the ring buffer, then msg5 gated on msg3 and msg4.
        disruptor.handleEventsWith(msg1, msg3);
        disruptor.handleEventsWith(msg2, msg4);
        disruptor.after(msg3, msg4).handleEventsWith(msg5);
        disruptor.start();

        // Define the data to send.
        List<MyData> myDataList = new ArrayList<>();
        myDataList.add(new MyData(2, "2222"));
        myDataList.add(new MyData(3, "3333"));
        myDataList.add(new MyData(1, "1111"));
        myDataList.add(new MyData(4, "4444"));
        myDataList.add(new MyData(5, "5555"));

        MsgProducer msgProducer = new MsgProducer(disruptor);
        msgProducer.send(myDataList);

        // Wait for the published events to be handled, then stop the handler threads
        // so the JVM can exit.
        disruptor.shutdown();
    }
}