环形无锁队列,某些情况下又叫做 ring buffer。
在底层代码的消息收发过程中,收和发通常是分开处理的,即一个读线程、一个写线程。对于这种逻辑上只有两个线程(一读一写)的场景,可以用环形队列来实现线程安全,从而达到无锁的目的。需要注意的是,这种队列在多个读写线程并发时无法保证线程安全,只适用于单一读线程加单一写线程的模型。
基本思路是:用 read 和 write 两个下标分别表示当前的读写位置,两个线程各自只操作自己的下标。当环形队列 buff 写满后,新数据先缓存到 std::queue(即 out_cache)里;为了保持数据一致性,push 操作会在每次调用时尝试将 out_cache 中的数据刷入 buff。这个设计带来了一个隐患:
两个线程分别操作各自的 push / pop_front。out_cache 是环形队列写满后的溢出缓冲区,里面的数据只有在 push 触发 dump 时才会被写入 buff。这意味着只要 push 的调用次数不超过 pop,out_cache 中的数据就始终无法被消费——尤其是程序退出时,out_cache 里可能还残留着未处理的数据。
这种无锁处理方式去掉了 mutex,效率较高,但也引入了一个新问题:缺少 mutex / condition 机制后,线程很容易空转(CPU 空跑)。例如在队列为空时,读线程会满载空转。为了充分利用这段 CPU 时间,可以把 encode/decode 等附加操作分配给它处理,或者加上 Sleep——当然一旦加了 Sleep,在极端高频场景下无锁本身的优势就会大打折扣。
当 write 与 read 相等时,既可以表示队列满,也可以表示队列空(两者都指向下一个待写入/读取的 index)。此时 buff[index] 是两个线程共享的。如果能在这个临界状态下保证安全,整个队列就是安全的——其他情况下 read 和 write 根本不会指向同一个 index,不存在交集。
在 read == write 的临界情况下,读线程先取走数据再将标志置为 false,写线程先写入数据再将标志置为 true。这个"先读写数据、再改标志"的顺序不能颠倒,否则会出现标志已就绪但数据尚未写入的 BUG。为什么在单读单写的场景下这种操作是安全的?传统多读多写不安全的根本原因在于:可能读到旧数据、多个线程同时写入导致数据错乱、或者写到一半就被读走。而这里严格限定了一个线程读、一个线程写,通过 bool 标志控制避免了上述并发冲突(mov 对 bool 赋值是否原子无关紧要,因为只有在满/空的临界情况下读写才会出现先后顺序,其余情况下两者可以完全并行;同时,read 只写 false,write 只写 true,不会对同一个 bool 交叉写入不同的值)。
内存安全性:如果无限制地接受写入,极端情况下 out_cache 可能爆满。
以下代码包含自检逻辑(注释中的 verify 部分):如果运行过程中自检失败,程序会退出;反之则说明在安全性上没有 BUG。
#include <fstream>
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <iostream>
#include <windows.h>
#include <atomic>
#include <memory>
#include <queue>
using namespace std;
struct Data
{
int id = 3;
};
using QueuePair11 = std::pair<volatile std::atomic<bool>, void*>;
using QueuePair = std::pair<volatile bool, void*>;
#define MAX_SIZE 1024
#define MAX_OUT_CACHE_SIZE 1024000
class Queue
{
public:
void* front()
{
if (buff[read].first)
{
return buff[read].second;
}
return nullptr;
}
void pop()
{
buff[read].first = false;
read = ++read %MAX_SIZE;
}
bool push(void* value)
{
bool hasDumpFull = false;
hasDumpFull = dump();
if (hasDumpFull == false && buff[write].first == false)
{
buff[write].second = value;
buff[write].first = true;
write = ++write % MAX_SIZE;
}
else
{
if (out_cache.size() > MAX_OUT_CACHE_SIZE)return false;
out_cache.push(value);
}
return true;
}
//return is dump full ?
bool dump()
{
while (out_cache.empty() == false)
{
if (buff[write].first)
{// 写满为止
return true;
}
auto &v = out_cache.front();
out_cache.pop();
buff[write].second = v;
buff[write].first = true;
write = ++write%MAX_SIZE;
}
return false;
}
QueuePair buff[MAX_SIZE];
int read = 0;
int write = 0;
std::queue<void*> out_cache;
};
int main(int argc, char* argv[])
{
Queue _queue;
/*int ss = 1024;
int t = 1020;
cout << (t&(ss - 1));
*/
std::thread t_write([&]()
{
for (int i = 0;; i++)
{
if (i > 0xffff)
{
i = 0;
}
auto v = new Data();
(v->id) = i;
while (!_queue.push(v)){ Sleep(0); };
}
});
t_write.detach();
std::thread t_read([&]()
{
int last = -1;
while (true)
{
auto v = (Data*)_queue.front();
if (v == nullptr)
{
continue;
}
cout << v->id << endl;
if (v->id >= 0xffff)
{
last = -1;
v->id = 0xffffff;
delete v;
_queue.pop();
continue;;
}
if (*(int*)v - last != 1)
{// verify
exit(0);
}
last = v->id;
v->id = 0xffffff;
delete v;
_queue.pop();
}
});
t_read.detach();
while (true)
{
Sleep(1);
}
system("pause");
return 0;
}
实测自检两个多小时程序无退出,没有测出 BUG。
在实现上述版本时,有一个 BUG 引发了一些思考:如果把上面的 dump 换成下面这个版本,就会出现下图中那个诡异的 BUG。
bool dump33()
{
bool hasWrite = false;
while (out_cache.empty() == false)
{
if (buff[write].first)
{
return hasWrite;
}
auto & v = out_cache.front();
out_cache.pop();
hasWrite = true;
buff[write].second = v;
buff[write].first = true;
write = ++write%MAX_SIZE;
}
return hasWrite;
}
这个 BUG 的表象是输出顺序错乱,但相对顺序并没有错——例如 48170 到 48179 的序列,按正常顺序走到 48179 是对的,只是中间插入了其他数字。
根本原因在于:当 dump33 返回 false 时,表示"队列满、本次未写入",此时会触发 push 里的第二个条件判断——&& buff[write].first == false。由于 write 指向的是下一个位置,某些情况下(恰好该位置已被 read 消费并置为 false)这个条件会成立,导致新数据被直接写入 buff 而不是先进 out_cache 排队。正常情况下新 push 的数据应当排在 out_cache 所有数据之后,而这段有 BUG 的代码会让它提前写入,从而打乱了顺序。