无锁环形队列
稍微接触过并发编程的人就知道队列在多线程中是多么广泛地被使用。最常见的使用模式是构建生产者-消费者模型。普通的队列,比如C++ STL提供的std::queue,不是线程安全的。因此,为了安全地读写队列,需要对队列加锁。于是,队列很可能成了系统的性能瓶颈。事实上,锁只是一个表象,最本质的原因是这种方式下队列是串行操作的。 无锁队列的想法早就存在,但是最常见的是以链表方式实现的。链表实现中,需要频繁地new/delete节点。我希望使用环形队列实现,这样存储空间可以被重用。以存储类型为int、最大容量为1000的无锁环形队列为例。基本思想很简单,首先预先分配好一个数组 +++code int* slots = new int[1000]; ---code 使用两个原子变量分别表示head和tail: +++code std::atomic<size_t> head, tail; ---code 且一开始都初始化为0。 另外,为了能够实现阻塞式操作(当队列为空时,Pop操作被阻塞直到Push操作被执行;当队列为满时,Push操作被阻塞直到Pop操作被执行),需要使用两个信号量(PV量): +++code sem_t data_available; sem_t slot_available; ---code 且一开始data_available初始化为0,而slot_available初始化为1000: +++code sem_init(&data_available, 0, 0); sem_init(&slot_available, 0, 1000); ---code ============================阶段一:直观的想法=========================== 当需要Push时,首先需要得到一个空槽(即对slot_available做V操作),然后把数据放到tail所指向的slot中,然后tail++,并对data_available做P操作(即表示增加了一个有效数据)。 当需要Pop时,首先需要得到一个数据(即对data_available做V操作),然后从head所指向的slot中取出数据,然后head++,并对slot_available做P操作(即表示增加了一个空槽)。 所以代码应该是这样: +++code void Push(int data) { sem_wait(&slot_available); auto index = (tail++) % 1000; slots[index] = data; sem_post(&data_available); } int Pop() { sem_wait(&data_available); auto index = (head++) % 1000; int data = slots[index]; sem_post(&slot_available); return data; } ---code 这样对吗?年轻人你还是太年轻啊。如果任何时刻只有至多一个线程Push且至多一个线程Pop,那么是对的,否则就会崩溃。我举个简单的例子:假设有一个刚刚初始化完的空队列。某个瞬间,有两个线程同时Push,线程1抢到了slots[0],线程2抢到了slots[1]。另外有一个线程3试图Pop,它阻塞在sem_wait(&data_available)上: 1.png 结果,T1由于被OS抢占了,卡顿了一会儿,T2先完成了Push,于是调用了sem_post(&data_available),这唤醒了T3,此时head是0,于是T3去读slots[0]: 2.png 结果是,T3读到了一个垃圾值。 ============================阶段二:稍微成熟些的想法=========================== 于是你意识到,每一个slot需要有一个状态flag,标记当前slot是否包含了有效数据。于是你额外引入了一个数组 +++code bool* valids = new bool[1000]; ---code 并且都初始化为false。代码改写为: +++code void yield() { sleep(0); } void Push(int data) { sem_wait(&slot_available); auto index = (tail++) % 1000; while(valids[index]) yield(); slots[index] = data; valids[index] = true; sem_post(&data_available); } int Pop() { sem_wait(&data_available); auto index = (head++) % 1000; while(!valids[index]) yield(); int data = slots[index]; valids[index] = false; sem_post(&slot_available); return data; } ---code 这样,读线程会等到slot包含了有效数据才读,而写线程会等到slot不再包含有效数据才写。乍一眼看都是对的,而且你在实际项目中使用也没出现问题。 直到有一天,程序莫名其妙地崩溃了!!!你苦思冥想,怀疑了编译器的优化,于是加了volatile,还考虑了是不是CPU乱序执行,导致valids[index] = true先于slots[index] = data执行,于是加了__builtin_ia32_sfence(),可是依旧是运行一段时间后崩溃。 但,你毕竟智商高人一等(这个自夸也是没谁了),不出半天你就想到了原因。 现在考察一个容量为4的环形队列。一开始为空队列。某个瞬间有4个线程T1~T4同时Push,T1~T4分别抢到了slots[0]~slots[3]: 3.png 可是,T1由于OS调度关系,卡顿了很久,当T2~T4都返回时,T1还没开始真正写入数据: 4.png 这时,又来了2个线程T5和T6试图Pop,很自然地,他们都能无阻塞地通过sem_wait(&data_available),分别读slots[0]和slots[1]: 5.png 由于T1还是没有写入slots[0],于是valids[0]一直是false,于是T5只能等着,而T6取到数据后返回: 6.png 此时,又来了一个线程T7,试图做Push。很自然地,他无阻碍地通过sem_wait(&slot_available),写slots[0],于是诡异的事情发生了: 7.png 有两个线程T1和T7都试图往slots[0]里写数据。而且,由于Push()的逻辑中,只是判断true/false,因此T1和T7都认为自己是可以写进去的,于是就脏写了,从而bug现形! ============================阶段三:深思熟虑=========================== 之所以会出现这种问题,还是归咎于“空间折叠”,听起来很高大上,其实很简单。假设这个队列是建立在一个无穷长的数组上的,那么上面的方法没有问题。但是当数组会“绕回来”时,导致无数个逻辑上的slot对应到了同一个物理上的slot。 因此。每一个slot就得是一个“交易所”,能够撮合多个买家和多个卖家。如果只用一个bool表示当前状态,那么虽然能够协调买家和卖家,却无法协调多个卖家, 也不能协调多个买家。因此,得使用一个有四种状态的变量,四个状态分别是IDLE, PUTTING, VALID, TAKING。IDLE表示当前slot为空,可以写入数据;PUTTING表示有一个线程正在写入数据;VALID表示当前slot有数据,可以读出;TAKING表示有一个线程正在取出数据。Push线程必须原子性地把state从IDLE改为PUTTING,才能写入数据,然后把state设为VALID。Pop线程必须原子性地把state从VALID改为TAKING,才能取出数据,然后把state设为IDLE。这样,既隔离了Push与Pop的争抢,又隔离了多个Push/Pop之间的争抢。 最终代码如下: ring_buffer.hpp +++code #ifndef RING_BUFFER #define RING_BUFFER #include <common.h> #include <unistd.h> #include <semaphore.h> template <typename T> class RingBuffer { private: enum State { IDLE = 0, PUTTING, VALID, TAKING, }; size_t capacity; T* slots; State* states; std::atomic<size_t> head; std::atomic<size_t> tail; sem_t data_available; sem_t slot_available; private: void yield() { sleep(0); } public: RingBuffer(size_t _capacity) : capacity(_capacity), head(0), tail(0) { assert(capacity > 0); slots = new T[capacity]; states = new State[capacity]; for(size_t i = 0; i < capacity; ++i) states[i] = State::IDLE; DO_WITH_ASSERT(sem_init(&data_available, 0, 0), _ret_ == 0); DO_WITH_ASSERT(sem_init(&slot_available, 0, capacity), _ret_ == 0); } ~RingBuffer() { delete[] slots; delete[] states; DO_WITH_ASSERT(sem_destroy(&data_available), _ret_ == 0); DO_WITH_ASSERT(sem_destroy(&slot_available), _ret_ == 0); } void Push(T data) { DO_WITH_ASSERT(sem_wait(&slot_available), _ret_ == 0); auto index = (tail++) % capacity; auto* slot = slots + index; auto* state = states + index; while(!__sync_bool_compare_and_swap(state, State::IDLE, State::PUTTING)) yield(); *slot = data; DO_WITH_ASSERT(__sync_lock_test_and_set(state, State::VALID), _ret_ == State::PUTTING); DO_WITH_ASSERT(sem_post(&data_available), _ret_ == 0); } T Pop() { DO_WITH_ASSERT(sem_wait(&data_available), _ret_ == 0); auto index = (head++) % capacity; auto* slot = slots + index; auto* state = states + index; while(!__sync_bool_compare_and_swap(state, State::VALID, State::TAKING)) yield(); T data = *slot; DO_WITH_ASSERT(__sync_lock_test_and_set(state, State::IDLE), _ret_ == State::TAKING); DO_WITH_ASSERT(sem_post(&slot_available), _ret_ == 0); return data; } size_t Length() { size_t head_snap = head.load(); size_t tail_snap = tail.load(); if(head_snap < tail_snap) return tail_snap - head_snap; else return 0; } }; #endif ---code ===============================最终============================= 然而,在实际的应用中,有可能只需要Pop阻塞,而不需要Push阻塞(比如使用ring_buffer实现某种资源管理器,当需要资源时,使用Pop获得一个空闲资源,用完后使用Push归还。那么可以肯定的是,归还时一定是有“空槽”可以直接归还成功的)。这种情况下,slot_available就是多余的,只会增加性能开销。因此,我需要能够使用条件编译、或者更灵活的模板来实现“定制化”功能。最终的代码如下: ring_buffer.h +++code #pragma once #include <common.h> #include <unistd.h> #include <semaphore.h> // T: 负载类型 // block_pop: 若为true,当队列为空时,会阻塞等待。若为false,请确保pop时队列一定不为空 // block_push: 若为true,当队列为满时,会阻塞等待。若为false,请确保push时队列一定不为满 // busy_wait: 若slot存在争抢,是否忙等(否则放弃时间片) template <typename T, bool block_pop, bool block_push, bool busy_wait = false> class RingBuffer { private: enum State { IDLE = 0, // 该slot没有数据 PUTTING, // 有一个线程正在放入数据 VALID, // 该slot有数据 TAKING, // 有一个线程正在取出数据 }; size_t capacity; // 容量 T* slots; // 所有的slot State* states; // 每一个slot的状态 std::atomic<size_t> head; // 逻辑上的头(不回滚) std::atomic<size_t> tail; // 逻辑上的尾(不回滚) sem_t data_available; // 可以取得的数据个数 sem_t slot_available; // 可以使用的slot个数 private: void wait() { if(!busy_wait) sleep(0); } public: // capacity: 最大容量 RingBuffer(size_t _capacity) : capacity(_capacity), head(0), tail(0) { assert(capacity > 0); slots = new T[capacity]; states = new State[capacity]; for(size_t i = 0; i < capacity; i++) states[i] = State::IDLE; if(block_pop) DO_WITH_ASSERT(sem_init(&data_available, 0, 0), _ret_ == 0); if(block_push) DO_WITH_ASSERT(sem_init(&slot_available, 0, capacity), _ret_ == 0); } ~RingBuffer() { delete[] slots; delete[] states; if(block_pop) DO_WITH_ASSERT(sem_destroy(&data_available), _ret_ == 0); if(block_push) DO_WITH_ASSERT(sem_destroy(&slot_available), _ret_ == 0); } // 向队列塞入(多线程安全) void Push(T data) { if(block_push) DO_WITH_ASSERT(sem_wait(&slot_available), _ret_ == 0); auto index = (tail++) % capacity; auto* slot = slots + index; auto* state = states + index; // 争抢放入权 while(!__sync_bool_compare_and_swap(state, State::IDLE, State::PUTTING)) wait(); *slot = data; // 必然有且只有一个线程(即当前线程)会修改state,所以必然一次性成功 DO_WITH_ASSERT(__sync_lock_test_and_set(state, State::VALID), _ret_ == State::PUTTING); if(block_pop) DO_WITH_ASSERT(sem_post(&data_available), _ret_ == 0); } // 从队列取出(多线程安全) T Pop() { if(block_pop) DO_WITH_ASSERT(sem_wait(&data_available), _ret_ == 0); auto index = (head++) % capacity; auto* slot = slots + index; auto* state = states + index; // 争抢取出权 while(!__sync_bool_compare_and_swap(state, State::VALID, State::TAKING)) wait(); T data = *slot; // 必然有且只有一个线程(即当前线程)会修改state,所以必然一次性成功 DO_WITH_ASSERT(__sync_lock_test_and_set(state, State::IDLE), _ret_ == State::TAKING); if(block_push) DO_WITH_ASSERT(sem_post(&slot_available), _ret_ == 0); return data; } // 获取当前队列长度(多线程安全) size_t Length() { size_t head_snap = head.load(); size_t tail_snap = tail.load(); if(head_snap < tail_snap) return tail_snap - head_snap; else return 0; } }; ---code 依赖的common.h如下: +++code #pragma once #include <errno.h> #include <stdio.h> #include <assert.h> #include <stdint.h> #include <stdlib.h> #include <string.h> #define ERROR(cleanup, ret, show_errstr, msgs...) \ ({ \ const char* _errstr_ = (show_errstr) ? strerror(errno) : ""; \ (cleanup); \ fprintf(stderr, "[<%s> @ %s: %d]: ", __FUNCTION__, __FILE__, __LINE__); \ fprintf(stderr, ##msgs); \ fprintf(stderr, "%s\n", _errstr_); \ return (ret); \ }) #ifdef NDEBUG #define DO_WITH_ASSERT(statement, expection) \ (statement) #else #define DO_WITH_ASSERT(statement, expection) \ ({ \ auto _ret_ = (statement); \ assert(expection); \ }) #endif #define DIV_CEIL(n, p) \ ({ \ assert((n) >= 0); \ assert((p) > 0); \ ((n) == 0 ? 0 : ((n) - 1) / (p) + 1); \ }) #define DIV_FLOOR(n, p) \ ({ \ assert((n) >= 0); \ assert((p) > 0); \ (n) / (p); \ }) ---code