系列设计

Disruptor设计

Disruptor通过以下设计来解决队列速度慢的问题:

  • 环形数组结构:为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。
  • 元素位置定位:数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。indexlong类型,即使100QPS的处理速度,也需要30万年才能用完。
  • 无锁设计:每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。

核心组件

  • RingBuffer:被看作Disruptor最主要的组件,然而从3.0开始RingBuffer仅仅负责存储和更新在Disruptor中流通的数据。对一些特殊的使用场景能够被用户(使用其他数据结构)完全替代。
  • Sequence:Disruptor使用Sequence来表示一个特殊组件处理的序号。和Disruptor一样,每个消费者(EventProcessor)都维持着一个Sequence。大部分的并发代码依赖这些Sequence值的运转,因此Sequence支持多种当前为AtomicLong类的特性。
  • Sequencer:这是Disruptor真正的核心。实现了这个接口的两种生产者(单生产者和多生产者)均实现了所有的并发算法,为了在生产者和消费者之间进行准确快速的数据传递。
  • SequenceBarrier:Sequencer生成,并且包含了已经发布的Sequence的引用,这些的Sequence源于Sequencer和一些独立的消费者的Sequence。它包含了决定是否有供消费者来消费的Event的逻辑。
  • WaitStrategy:决定一个消费者将如何等待生产者将Event置入Disruptor
  • Event:从生产者到消费者过程中所处理的数据单元。Disruptor中没有代码表示Event,因为它完全是由用户定义的。
  • EventProcessor:主要事件循环,处理Disruptor中的Event,并且拥有消费者的Sequence。它有一个实现类是BatchEventProcessor,包含了event loop有效的实现,并且将回调到一个EventHandler接口的实现对象。
  • EventHandler:由用户实现并且代表了Disruptor中的一个消费者的接口。
  • Producer:由用户实现,它调用RingBuffer来插入事件(Event),在Disruptor中没有相应的实现代码,由用户实现。
  • WorkProcessor:确保每个sequence只被一个processor消费,在同一个WorkPool中的处理多个WorkProcessor不会消费同样的sequence
  • WorkerPool:一个WorkProcessor池,其中WorkProcessor将消费Sequence,所以任务可以在实现WorkHandler接口的worker吃间移交
  • LifecycleAware:当BatchEventProcessor启动和停止时,于实现这个接口用于接收通知。

RingBuffer

正如名字所说的一样,它是一个环(首尾相接的环,你可以把它用做在不同上下文(线程)间传递数据的buffer。基本来说,ringbuffer拥有一个序号,这个序号指向数组中下一个可用元素。

环示意图

Disruptor说的是生产者和消费者的故事。有一个数组,生产者往里面扔芝麻;消费者从里面捡芝麻。但是扔芝麻和捡芝麻也要考虑速度的问题:

  • 消费者捡的比扔的快,那么消费者要停下来。生产者扔了新的芝麻,然后消费者继续。
  • 数组的长度是有限的,生产者到末尾的时候会再从数组的开始位置继续。这个时候可能会追上消费者,消费者还没从那个地方捡走芝麻,这个时候生产者要等待消费者捡走芝麻,然后继续。

随着你不停地填充这个buffer(可能也会有相应的读取,这个序号会一直增长,直到绕过这个环。要找到数组中当前序号指向的元素,可以通过mod操作:sequence mod array length = array index(取模操作)以上面的ringbuffer为例(javamod语法:12 % 10 = 2。很简单吧。

Java内置队列的不足

PCP又称Bounded-Buffer问题,其核心就是保证对一个Buffer的存取操作在多线程环境下不会出错。使用Java中的ArrayBlockingQueueLinkedBlockingQueue类能轻松的完成PCP模型,这对于一般程序已经没问题了,但是对于并发度高、TPS要求较大的系统则不然。

Java的内置队列如下表所示。

队列 有界性 数据结构
ArrayBlockingQueue bounded 加锁 arraylist
LinkedBlockingQueue optionally-bounded 加锁 linkedlist
ConcurrentLinkedQueue unbounded 无锁 linkedlist
LinkedTransferQueue unbounded 无锁 linkedlist
PriorityBlockingQueue unbounded 加锁 heap
DelayQueue unbounded 加锁 heap

队列的底层一般分成三种:数组、链表和堆。其中,堆一般情况下是为了实现带有优先级特性的队列,暂且不考虑。我们就从数组和链表两种数据结构来看,基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全;基于链表的线程安全队列分成LinkedBlockingQueueConcurrentLinkedQueue两大类,前者也通过锁的方式来实现线程安全,而后者以及上面表格中的LinkedTransferQueue都是通过原子变量compare and swap(以下简称“CAS”)这种不加锁的方式来实现的。

通过不加锁的方式实现的队列都是无界的(无法保证队列的长度在确定的范围内;而加锁的方式,可以实现有界队列。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列;同时,为了减少Java的垃圾回收对系统性能的影响,会尽量选择array/heap格式的数据结构。这样筛选下来,符合条件的队列就只有ArrayBlockingQueue

锁的损耗

BlockingQueue使用的是package java.util.concurrent.locks中实现的锁,当多个线程(例如生产者)同时写入Queue时,锁的争抢会导致只有一个生产者可以执行,其他线程都中断了,也就是线程的状态从RUNNING切换到BLOCKED,直到某个生产者线程使用完Buffer后释放锁,其他线程状态才从BLOCKED切换到RUNNABLE,然后时间片到其他线程后再进行锁的争抢。上述过程中,一般来说生产者存放一个数据到Buffer中所需时间是非常短的,操作系统切换线程上下文的速度也是非常快的,但是当线程数量增多后,OS切换线程所带来的开销逐渐增多,锁的反复申请和释放成为性能瓶颈。BlockingQueue除了使用锁带来的性能损失外,还可能因为线程争抢的顺序问题造成性能再次损失:实际使用中发现线程的调度顺序并不理想,可能出现短时间内OS频繁调度出生产者或消费者的情况,这样造成缓冲区可能短时间内被填满或被清空的极端情况(理想情况应该是缓冲区长度适中,生产和消费速度基本一致)

下面是ArrayBlockingQueue通过加锁的方式实现的offer方法,保证线程安全。

public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            insert(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

对于上面的问题Disruptor的解决方案是:不用锁。

环示意图

Disruptor使用一个Ring Buffer存放生产者的产品,环形缓冲区实际上还是一段连续内存,之所以称作环形是因为它对数据存放位置的处理,生产者和消费者各有一个指针(数组下标,消费者的指针指向下一个要读取的Slot,生产者指针指向下一个要放入的Slot,消费或生产后,各自的指针值p = (p +1) % nn是缓冲区长度,这样指针在缓冲区上反复游走,故可以将缓冲区看成环状Ring Buffer并非Disruptor原创,Linux内核中就有环形缓冲区的实现)使用Ring Buffer时:

  • 当生产者和消费者都只有一个时,由于两个线程分别操作不同的指针,所以不需要锁。
  • 当有多个消费者时,每个消费者各自控制自己的指针,依次读取每个Slot(也就是每个消费者都会读取到所有的产品,这时只需要保证生产者指针不会超过最慢的消费者(超过最后一个消费者“一圈”)即可,也不需要锁。
  • 当有多个生产者时,多个线程共用一个写指针,此处需要考虑多线程问题,例如两个生产者线程同时写数据,当前写指针=0,运行后其中一个线程应获得缓冲区0Slot,另一个应该获得1号,写指针=2。对于这种情况,Disruptor使用CAS来保证多线程安全。

伪共享

ArrayBlockingQueue有三个成员变量:

  • takeIndex:需要被取走的元素下标
  • putIndex:可被元素插入的位置的下标
  • count:队列中元素的数量

这三个变量很容易放到一个缓存行中,但是之间修改没有太多的关联。所以每次修改,都会使之前缓存的数据失效,从而不能完全达到共享的效果。

ArrayBlockingQueue 伪共享示意图

如上图所示,当生产者线程put一个元素到ArrayBlockingQueue时,putIndex会修改,从而导致消费者线程的缓存中的缓存行无效,需要从主存中重新读取。这种无法充分使用缓存行特性的现象,称为伪共享。解决伪共享问题,可以在变量的前后都占据一定的填充位置,尽量让变量占用一个完整的缓存行。CPU1上的线程更新了X,则CPU2上的Y则不会失效。同样地,CPU2上的线程更新了Y,则CPU1的不会失效。

class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding
{
    protected volatile long value;
}

class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;
}

/**
 * <p>Concurrent sequence class used for tracking the progress of
 * the ring buffer and event processors.  Support a number
 * of concurrent operations including CAS and order writes.
 *
 * <p>Also attempts to be more efficient with regards to false
 * sharing by adding padding around the volatile field.
 */
public class Sequence extends RhsPadding
{
    static final long INITIAL_VALUE = -1L;
    private static final Unsafe UNSAFE;
    private static final long VALUE_OFFSET;

    static
    {
        UNSAFE = Util.getUnsafe();
        try
        {
            VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
        }
        catch (final Exception e)
        {
            throw new RuntimeException(e);
        }
    }
    // ... ...
}

Sequence的实现中,主要使用的是Value,但通过LhsPaddingRhsPaddingValue的前后填充了一些空间,使Value无冲突的存在于缓存行中。

上一页