Disruptor据说是最快的并发消息处理器。我今天学习了下,有了一个初步的了解。
Disruptor可以当做队列使用,不过可以支持多消费者,既一条生产者的消息可以提供给多个消费者同时使用。
gethub 安装 Install-Package Disruptor -Version 3.3.7
我们添加一个实体类,这个类的主要是生产者和消费者的信息载体
public sealed class CmdEntry
{
public string Cmd { get; set; }
}
我们增加一个类实现IEventHandler接口,这个类的作用相当于消费者触发口,也可以当作消费者。
public class ExecuteCmdHandler : IEventHandler<CmdEntry>
{
public void OnNext(CmdEntry data, long sequence, bool endOfBatch)
{
Console.WriteLine("this is CmdHandler0 Cmd = {0} (index:{1})", data.Cmd, sequence);
}
}
然后我们添加一个窗体界面。并生成生产者信息。
public partial class MainWindow : Window
{
RingBuffer<CmdEntry> _ringBuffer;
public MainWindow()
{
InitializeComponent();
var disruptor = new Disruptor<CmdEntry>(() => new CmdEntry(), 512, TaskScheduler.Default);
disruptor.HandleEventsWith(new ExecuteCmdHandler());
//disruptor.HandleEventsWith(new ExecuteCmdHandler1());
_ringBuffer = disruptor.Start();
Thread tr = new Thread(Product);
tr.Start();
}
private void Product(object o)
{
int i = 0;
while (i<500)
{
long index = _ringBuffer.Next();
var entity = _ringBuffer[index];
entity.Cmd = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss fff");
_ringBuffer.Publish(index);
Thread.Sleep(1);
i = i + 1;
}
}
}
运行
如果我们有多个消费者,只需要在实现IEventHandler接口
public class ExecuteCmdHandler1 : IEventHandler<CmdEntry>
{
public void OnNext(CmdEntry data, long sequence, bool endOfBatch)
{
Console.WriteLine("this is CmdHandler1 Cmd = {0} (index:{1})", data.Cmd, sequence);
}
}
然后再通过Disruptor对象的HandleEventsWith方法添加进去
运行,从结果可以看出,两个消费者是并行处理。