系列设计
Disruptor 设计
- 环形数组结构:为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。
- 元素位置定位:数组长度
2^n ,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index 溢出的问题。index 是long 类型,即使100 万QPS 的处理速度,也需要30 万年才能用完。 - 无锁设计:每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。
核心组件
RingBuffer: 被看作Disruptor 最主要的组件,然而从3.0 开始RingBuffer 仅仅负责存储和更新在Disruptor 中流通的数据。对一些特殊的使用场景能够被用户( 使用其他数据结构) 完全替代。- Sequence:
Disruptor 使用Sequence 来表示一个特殊组件处理的序号。和Disruptor 一样,每个消费者(EventProcessor) 都维持着一个Sequence 。大部分的并发代码依赖这些Sequence 值的运转,因此Sequence 支持多种当前为AtomicLong 类的特性。 - Sequencer:这是
Disruptor 真正的核心。实现了这个接口的两种生产者(单生产者和多生产者)均实现了所有的并发算法,为了在生产者和消费者之间进行准确快速的数据传递。 SequenceBarrier: 由Sequencer 生成,并且包含了已经发布的Sequence 的引用,这些的Sequence 源于Sequencer 和一些独立的消费者的Sequence 。它包含了决定是否有供消费者来消费的Event 的逻辑。- WaitStrategy:决定一个消费者将如何等待生产者将
Event 置入Disruptor 。 - Event:从生产者到消费者过程中所处理的数据单元。
Disruptor 中没有代码表示Event ,因为它完全是由用户定义的。 - EventProcessor:主要事件循环,处理
Disruptor 中的Event ,并且拥有消费者的Sequence 。它有一个实现类是BatchEventProcessor ,包含了event loop 有效的实现,并且将回调到一个EventHandler 接口的实现对象。 - EventHandler:由用户实现并且代表了
Disruptor 中的一个消费者的接口。 - Producer:由用户实现,它调用
RingBuffer 来插入事件(Event) ,在Disruptor 中没有相应的实现代码,由用户实现。 - WorkProcessor:确保每个
sequence 只被一个processor 消费,在同一个WorkPool 中的处理多个WorkProcessor 不会消费同样的sequence 。 - WorkerPool:一个
WorkProcessor 池,其中WorkProcessor 将消费Sequence ,所以任务可以在实现WorkHandler 接口的worker 吃间移交 - LifecycleAware:当
BatchEventProcessor 启动和停止时,于实现这个接口用于接收通知。
RingBuffer
正如名字所说的一样,它是一个环(首尾相接的环

- 消费者捡的比扔的快,那么消费者要停下来。生产者扔了新的芝麻,然后消费者继续。
- 数组的长度是有限的,生产者到末尾的时候会再从数组的开始位置继续。这个时候可能会追上消费者,消费者还没从那个地方捡走芝麻,这个时候生产者要等待消费者捡走芝麻,然后继续。
随着你不停地填充这个
Java 内置队列的不足
队列 | 有界性 | 锁 | 数据结构 |
---|---|---|---|
ArrayBlockingQueue | bounded | 加锁 | arraylist |
LinkedBlockingQueue | optionally-bounded | 加锁 | linkedlist |
ConcurrentLinkedQueue | unbounded | 无锁 | linkedlist |
LinkedTransferQueue | unbounded | 无锁 | linkedlist |
PriorityBlockingQueue | unbounded | 加锁 | heap |
DelayQueue | unbounded | 加锁 | heap |
队列的底层一般分成三种:数组、链表和堆。其中,堆一般情况下是为了实现带有优先级特性的队列,暂且不考虑。我们就从数组和链表两种数据结构来看,基于数组线程安全的队列,比较典型的是
通过不加锁的方式实现的队列都是无界的(无法保证队列的长度在确定的范围内
锁的损耗
下面是
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}
对于上面的问题

- 当生产者和消费者都只有一个时,由于两个线程分别操作不同的指针,所以不需要锁。
- 当有多个消费者时,每个消费者各自控制自己的指针,依次读取每个
Slot (也就是每个消费者都会读取到所有的产品) ,这时只需要保证生产者指针不会超过最慢的消费者(超过最后一个消费者“一圈”)即可,也不需要锁。 - 当有多个生产者时,多个线程共用一个写指针,此处需要考虑多线程问题,例如两个生产者线程同时写数据,当前写指针
=0 ,运行后其中一个线程应获得缓冲区0 号Slot ,另一个应该获得1 号,写指针=2 。对于这种情况,Disruptor 使用CAS 来保证多线程安全。
伪共享
- takeIndex:需要被取走的元素下标
- putIndex:可被元素插入的位置的下标
- count:队列中元素的数量
这三个变量很容易放到一个缓存行中,但是之间修改没有太多的关联。所以每次修改,都会使之前缓存的数据失效,从而不能完全达到共享的效果。

如上图所示,当生产者线程
class LhsPadding
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding
{
protected volatile long value;
}
class RhsPadding extends Value
{
protected long p9, p10, p11, p12, p13, p14, p15;
}
/**
* <p>Concurrent sequence class used for tracking the progress of
* the ring buffer and event processors. Support a number
* of concurrent operations including CAS and order writes.
*
* <p>Also attempts to be more efficient with regards to false
* sharing by adding padding around the volatile field.
*/
public class Sequence extends RhsPadding
{
static final long INITIAL_VALUE = -1L;
private static final Unsafe UNSAFE;
private static final long VALUE_OFFSET;
static
{
UNSAFE = Util.getUnsafe();
try
{
VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
}
catch (final Exception e)
{
throw new RuntimeException(e);
}
}
// ... ...
}
在