生产与消费流程

生产者

下面忽略数组的环形结构,介绍一下如何实现无锁设计。整个过程通过原子变量 CAS,保证操作的线程安全。

单个生产者

生产者单线程写数据的流程比较简单:

  • 申请写入 m 个元素;
  • 若是有 m 个元素可以入,则返回最大的序列号。这儿主要判断是否会覆盖未读的元素;
  • 若是返回的正确,则生产者开始写入元素。

单个生产者生产过程示意图

多个生产者

多个生产者的情况下,会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor 的解决方法是,每个线程获取不同的一段数组空间进行操作。这个通过 CAS 很容易达到。只需要在分配元素的时候,通过 CAS 判断一下这段空间是否已经分配出去即可。

但是会遇到一个新问题:如何防止读取的时候,读到还未写的元素。Disruptor 在多个生产者的情况下,引入了一个与 Ring Buffer 大小相同的 buffer:available Buffer。当某个位置写入成功的时候,便把 availble Buffer 相应的位置置位,标记为写入成功。读取的时候,会遍历 available Buffer,来判断元素是否已经就绪。

读数据

消费者多线程读取的情况会复杂很多:

  • 申请读取到序号 n;
  • 若 writer cursor >= n,这时仍然无法确定连续可读的最大下标。从 reader cursor 开始读取 available Buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置;
  • 消费者读取元素。

如下图所示,读线程读到下标为 2 的元素,三个线程 Writer1/Writer2/Writer3 正在向 RingBuffer 相应位置写数据,写线程被分配到的最大元素下标是 11。读线程申请读取到下标从 3 到 11 的元素,判断 writer cursor>=11。然后开始读取 availableBuffer,从 3 开始,往后读取,发现下标为 7 的元素没有生产成功,于是 WaitFor(11)返回 6。

然后,消费者读取下标从 3 到 6 共计 4 个元素。

多个生产者情况下,消费者消费过程示意图

写数据

多个生产者写入的时候:

  • 申请写入 m 个元素;
  • 若是有 m 个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间;
  • 生产者写入元素,写入元素的同时设置 available Buffer 里面相应的位置,以标记自己哪些位置是已经写入成功的。

如下图所示,Writer1 和 Writer2 两个线程写入数组,都申请可写的数组空间。Writer1 被分配了下标 3 到下表 5 的空间,Writer2 被分配了下标 6 到下标 9 的空间。Writer1 写入下标 3 位置的元素,同时把 available Buffer 相应位置置位,标记已经写入成功,往后移一位,开始写下标 4 位置的元素。Writer2 同样的方式。最终都写入完成。

多个生产者情况下,生产者生产过程示意图

防止不同生产者对同一段空间写入的代码,如下所示:

public long tryNext(int n) throws InsufficientCapacityException
{
    if (n < 1)
    {
        throw new IllegalArgumentException("n must be > 0");
    }

    long current;
    long next;

    do
    {
        current = cursor.get();
        next = current + n;

        if (!hasAvailableCapacity(gatingSequences, n, current))
        {
            throw InsufficientCapacityException.INSTANCE;
        }
    }
    while (!cursor.compareAndSet(current, next));

    return next;
}

通过 do/while 循环的条件 cursor.compareAndSet(current, next),来判断每次申请的空间是否已经被其他生产者占据。假如已经被占据,该函数会返回失败,While 循环重新执行,申请写入空间。

下一页