本文是笔者在研究Disruptor过程中对Disruptor官方介绍与入门指南的翻译,有些部分做了适当编辑和增减。
官方介绍原文地址:https://github.com/LMAX-Exchange/disruptor/wiki/Introduction
官方入门指南:https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started
简介
理解Disruptor的最佳方式是,将其与一些容易理解和目的相似的东西比较。这里的参照物就是java里的阻塞队列(BlockingQueue)。
与BlockingQueue的异同:
同:目的相同,都是为了在同一进程的线程间传输数据。
异:对消费者多播事件;预分配事件内存;可选无锁。
核心概念
- Ring Buffer : 曾经的核心。自从3.0以上,环形缓冲器只作为Disruptor存储和更新数据(事件)的容器。对于一些高级用法,可以完全替换为用户提供的容器。
- Sequence:Disruptor使用Sequence作为一种确定特定组件位置的方法。每个消费者(EventProcessor)都维护一个Sequence,Disruptor自己也是一样。大部分并发代码以来这些Sequence值的移动,因此Sequence支持AtomicLong的当前许多特性。事实上,两者唯一的区别是Sequence包含了附加功能来防止Sequence和其他值的伪共享。
- Sequencer:Disruptor的真正核心。此接口的两个实现(单生产者和多生产者)都实现了用于在生产者和消费者间快速准确传递数据的并发算法。
- Sequence Barrier:由Sequencer产生,持有Sequencer的主要发布Sequence和任意独立消费者的Sequence的索引。它包含判断是否有可供消费者处理的可用事件的逻辑。
- Wait Strategy:等待策略决定了一个消费者如何等待生产者发布到Disruptor的事件。
- Event:生产者传递给消费者的数据单元。用户自定义。
- EventProcessor:处理Disruptor事件的主要循环,拥有消费者的Sequence。有一个BatchEventProcessor包含了一个事件循环的高效实现,会在事件可用时回调用户提供的EventHandler接口实现。
- EventHandler:用户实现接口,代表Disruptor的一个消费者。
- Producer:用户调用Disruptor进行入队的代码。在框架中没有代码表示。
多播事件
这是queue和Disruptor最大的行为区别。队列中的一个事件只能被一个消费者消费,而Disruptor中的时间会发布给所有消费者。这是由于Disruptor意图处理同一数据的独立并行处理操作(译注:类似JMS的topic模式)。比如LMAX中同一数据需要进行记录日志、复制和业务逻辑操作。当然,在Disruptor中同时并行处理不同事件可以使用WorkerPool(译注:类似JMS的queue模式中的多消费者实现)。但需要注意的是,由于这种特性并非是Disruptor的首要工作,所以使用WorkerPool可能并不是最高效的做法。
查看上图,三个消费者JournalConsumer、ReplicationConsumer和ApplicationConsumer将会以相同顺序接收Disruptor所有可用消息。这实现了这些消费者的并行工作。
消费者依赖图
为了支持并发处理的真实世界应用,很有必要支持消费者间的协调工作。回顾上图,在日志记录和复制消费者完成工作前,有必要阻止业务逻辑消费者的进一步工作。我们称这个概念为gating,更准确的说是这种行为的超集称为gating。Gating发生在两个地方:第一用来保证生产者不能超过消费者。这通过调用RingBuffer.addGatingConsumers()把相关消费者添加到Disruptor实现。第二,先前提到的情况是通过从必须首先完成其处理的组件构造包含序列的SequenceBarrier来实现的。
回顾图1,有三个消费者监听RingBuffer的事件。在这个例子中,有一个依赖图。ApplicationConsumer依赖JournalConsumer和ReplicationConsumer。这意味着JournalConsumer和ReplicationConsumer可以相互并行运行。依赖关系可以从ApplicationConsumer的SequenceBarrier和JournalConsumer及ReplicationConsumer的Sequence观察到。同时引起注意的是Sequencer和下游消费者的关系。它的一个角色是保证发布不会环绕RingBuffer。为了做到这点,下游消费者的Sequence不能小于RingBuffer的Sequence。然而,使用依赖图会发生一个有趣的优化。由于ApplicationConsumer Sequence保证小于等于JournalConsumer和ReplicationConsumer(由依赖关系保证),Sequencer只需要观察ApplicationConsumer的Sequence。从广义上来说,Sequencer只需要关注依赖树种叶子节点的消费者Sequence。
事件预分配
Disruptor的一个目标是可以用于低延迟环境中。在低延迟系统中,有必要减少或消除内存分配。在Java系统中,目标是减少由于垃圾回收造成的停顿次数(在低延迟的C/C++系统中,重内存分配会由于内存分配器的征用也可能导致问题)。
为了支持这个目标,用户可以预分配Disruptor中事件的存储。用户提供的EventFactory会在Disruptor中RingBuffer每个条目构建时调用。当发布新数据到Disruptor时,有API供用户调用来持有构建出的对象,这样可以调用对象的方法或更新对象属性。在正确实现下,Disruptor保证这些操作操作是并发安全的。
可选的无锁
对低延迟的渴望造就的另一个实现细节是无锁算法在Disruptor中的大量使用。所有内存可见性和正确性保证使用内存屏障和/或CAS操作实现。真正使用锁的场景只有一个,那就是使用BlockingWatiStrategy。这样做只为了使用Condition让消费线程可以在等待新事件到达前进行park操作。许多低延迟系统使用忙等待(busy-wait)来避免使用Condition可能导致的抖动,然而一些系统的忙等待操作会导致性能的急剧下降,尤其是CPU资源被严重制约时。比方说在虚拟环境下的web服务器。
入门指南
基本的事件生产和消费
从简单的事件开始:
public class LongEvent
{
private long value;
public void set(long value)
{
this.value = value;
}
}
为了让Disruptor能够预分配事件,我们需要提供一个EventFactory完成构建:
import com.lmax.disruptor.EventFactory;
public class LongEventFactory implements EventFactory<LongEvent>
{
public LongEvent newInstance()
{
return new LongEvent();
}
}
事件定义好后,需要创建消费者处理这些事件。这里只做简单的打印:
import com.lmax.disruptor.EventHandler;
public class LongEventHandler implements EventHandler<LongEvent>
{
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("Event: " + event);
}
}
我们还需要一个事件的生产源,举个例子,假定数据是来自某种I/O设备,如网络或文件的字节缓冲(ByteBuffer)。
import com.lmax.disruptor.RingBuffer;
public class LongEventProducer
{
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
{
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer bb)
{
long sequence = ringBuffer.next(); // Grab the next sequence
try
{
LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
// for the sequence
event.set(bb.getLong(0)); // Fill with data
}
finally
{
ringBuffer.publish(sequence);
}
}
}
可以发现,相比使用简单的queue,事件的发布更具有相关性。这是由于需要事件预分配。事件发布需要(最低)2阶段方式,先声明环形缓冲器中的槽位,再发布可用数据。同时也需要把发布过程使用try/finally块包裹起来。如果声明了环形缓冲的一个槽位(通过调用RingBuffer.next())然后必须发布这个序列。如果没有这么做,会导致Disruptor的状态损坏(corruption)。特别地,在多生产者的情况下,这将会导致消费者阻塞,只能通过重启解决。
使用3.x版本的Translator
Disruptor3.0提供了一种富Lambda风格的API,旨在帮助开发者屏蔽直接操作RingBuffer的复杂性,所以3.0以上版本发布消息更好的办法是通过事件发布者(Event Publisher)或事件翻译器(Event Translator)API。如下
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.EventTranslatorOneArg;
public class LongEventProducerWithTranslator
{
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer)
{
this.ringBuffer = ringBuffer;
}
private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
new EventTranslatorOneArg<LongEvent, ByteBuffer>()
{
public void translateTo(LongEvent event, long sequence, ByteBuffer bb)
{
event.set(bb.getLong(0));
}
};
public void onData(ByteBuffer bb)
{
ringBuffer.publishEvent(TRANSLATOR, bb);
}
}
这种方法另一个好处是翻译器代码可以放到一个单独的类中,以便于更容易进行单元测试。Disruptor提供了一些用于翻译器的不同的接口(EventTranslator,EventTranslatorOneArg,EventTranslatorTwoArg,等)。这样做的原因是,允许翻译器表示为静态类,或以非捕获lambda表达式(使用java8时)作为翻译方法参数,通过调用RingBuffer上的翻译器进行传递。
最后一步是把上面这些步骤统一到一起。可以手工把这些组件都组装到一起,但还是有点复杂,所以引入了DSL来简化构建。尽管通过DSL的方式不能使用有些复杂选项,但这种方式还是适合绝大多数场景。
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class LongEventMain
{
public static void main(String[] args) throws Exception
{
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();
// The factory for the event
LongEventFactory factory = new LongEventFactory();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, executor);
// Connect the handler
disruptor.handleEventsWith(new LongEventHandler());
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
producer.onData(bb);
Thread.sleep(1000);
}
}
}
使用Java8
Disruptor API的设计影响之一是Java 8将依靠功能接口的概念作为Java Lambdas的类型声明。 Disruptor API中的大多数接口定义符合功能接口的要求,因此可以使用Lambda而不是自定义类,这样可以减少所需的重复代码(boiler place)。
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class LongEventMain
{
public static void main(String[] args) throws Exception
{
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);
// Connect the handler
disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
Thread.sleep(1000);
}
}
}
注意有一些类(如handler,translator)不再需要了。还要注意用于publishEvent()的lambda是如何引用传入的参数的。如果使用如下代码代替:
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));
Thread.sleep(1000);
}
这会创建一个capturing lambda,意味着需要实例化一个对象来持有ByteBuffer bb
变量,通过调用publishEvent()来传递lambda。这样会创建额外不必须的垃圾,所以如果需要低GC压力就需要传递参数给lambda。
使用这种方法引用可以代替匿名的lamdba,以这种方式重写这个例子是可能的。
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class LongEventMain
{
public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println(event);
}
public static void translate(LongEvent event, long sequence, ByteBuffer buffer)
{
event.set(buffer.getLong(0));
}
public static void main(String[] args) throws Exception
{
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);
// Connect the handler
disruptor.handleEventsWith(LongEventMain::handleEvent);
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent(LongEventMain::translate, bb);
Thread.sleep(1000);
}
}
}
基本调优选项
使用上述的方法可以在最广泛的部署场景中工作正常。然而,如果你能够确定Disruptor将要运行的硬件和软件环境,就可以调整参数提升性能。主要有以下两种调优方式:单vs.多生产者和替换等待策略。
单vs.多生产者
提高并发系统性能的最佳方法之一就是遵守单作者原则(Single Writer Principle https://mechanical-sympathy.blogspot.tw/2011/09/single-writer-principle.html,这适用于Disruptor。如果你的情况是只有一个线程会在Disruptor中发布事件,那就可以利用此功能获得额外的性能提升。
public class LongEventMain
{
public static void main(String[] args) throws Exception
{
//.....
// Construct the Disruptor with a SingleProducerSequencer
Disruptor<LongEvent> disruptor = new Disruptor(
factory, bufferSize, ProducerType.SINGLE, new BlockingWaitStrategy(), executor);
//.....
}
}
OneToOne 性能测试(https://github.com/LMAX-Exchange/disruptor/blob/master/src/perftest/java/com/lmax/disruptor/sequenced/OneToOneSequencedThroughputTest.java)可以说明这种技术能够提升多少性能。以下测试使用i7 Sandy Bridge MacBook Air。
多生产者
Run 0, Disruptor=26,553,372 ops/sec
Run 1, Disruptor=28,727,377 ops/sec
Run 2, Disruptor=29,806,259 ops/sec
Run 3, Disruptor=29,717,682 ops/sec
Run 4, Disruptor=28,818,443 ops/sec
Run 5, Disruptor=29,103,608 ops/sec
Run 6, Disruptor=29,239,766 ops/sec
单生产者
Run 0, Disruptor=89,365,504 ops/sec
Run 1, Disruptor=77,579,519 ops/sec
Run 2, Disruptor=78,678,206 ops/sec
Run 3, Disruptor=80,840,743 ops/sec
Run 4, Disruptor=81,037,277 ops/sec
Run 5, Disruptor=81,168,831 ops/sec
Run 6, Disruptor=81,699,346 ops/sec
替换等待策略
BlockingWaitStategy
Disruptor默认的等待策略是BlockingWaitStategy。在BlockingWaitStategy内部使用一个典型的锁和条件(a typical lock and condition)变量处理线程唤醒。BlockingWaitStategy是可用等待策略中最慢的,但也是在CPU使用上最保守的,同时也将在最广泛的部署选项中提供最一致的行为。然而,再说一次,了解部署系统可以获得额外的性能提升。
SleepingWaitStrategy
类似BlockingWaitStategy,SleepingWaitStrategy也试图在CPU使用上保持保守,这通过一个忙等待(busy wait loop)循环实现,但在循环中间会调用LockSupport.parkNanos(1)。在一个典型的Linux系统,这样会暂停线程大概60µs(译注1µs=1000ns)。但它的好处是生产者线程除了增加响应的计数器外,不需要采取任何行动,而且不需要给条件变量发信号的成本(cost of signalling a condition variable)。然而,生产者和消费者转移事件的平均延迟会增加。这种方式最好工作在不需要低延迟,但对生产者线程影响最小的情况下。一个常见的使用场景是异步日志。
YieldingWaitStrategy
可用于低延迟系统的两种等待策略其中之一,这种策略通过消耗CPU时钟周期来达到优化延迟的目的。这种策略使用忙循环(busy spin)等待正确的序号到达。在循环内部,Thread.yield()将被调用,来允许其他排队中的线程运行。当需要很高的性能,而且事件处理者EventHandler的线程数少于CPU逻辑核心数时(比如使用超线程时),推荐使用这种策略。
BusySpinWaitStrategy
这种策略有最高的性能,但也有最高的部署边境限制。这种等待策略应该只用于事件处理者线程小于CPU物理核心数。
清除环形缓冲的对象
使用Disruptor传输数据时,对象的存活周期有可能比预期更长。为了避免发生这种情况,有必要在事件处理完毕后做清理。如果有一个事件处理器,在这个事件处理器中做清理就足够了。如果有一个事件处理链,那就可能会在链尾需要一个特定的处理器来清理这个对象。
class ObjectEvent<T>
{
T val;
void clear()
{
val = null;
}
}
public class ClearingEventHandler<T> implements EventHandler<ObjectEvent<T>>
{
public void onEvent(ObjectEvent<T> event, long sequence, boolean endOfBatch)
{
// Failing to call clear here will result in the
// object associated with the event to live until
// it is overwritten once the ring buffer has wrapped
// around to the beginning.
event.clear();
}
}
public static void main(String[] args)
{
Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>(
() -> ObjectEvent<String>(), bufferSize, executor);
disruptor
.handleEventsWith(new ProcessingEventHandler())
.then(new ClearingObjectHandler());
}