环形无锁队列,某些情况下又叫做 ring buffer。

在底层代码的消息收发过程中,收和发通常是分开处理的,即一个读线程、一个写线程。对于这种逻辑上只有两个线程(一读一写)的场景,可以用环形队列来实现线程安全,从而达到无锁的目的。需要注意的是,这种队列在多个读写线程并发时无法保证线程安全,只适用于单一读线程加单一写线程的模型。

end=write     start = read

基本思路是:用 readwrite 两个下标分别表示当前的读写位置,两个线程各自只操作自己的下标。当环形队列 buff 写满后,新数据先缓存到 std::queue(即 out_cache)里;为了保持数据一致性,push 操作会在每次调用时尝试将 out_cache 中的数据刷入 buff。这个设计带来了一个隐患:

两个线程分别操作各自的 push / pop_frontout_cache 是环形队列写满后的溢出缓冲区,里面的数据只有在 push 触发 dump 时才会被写入 buff。这意味着只要 push 的调用次数不超过 popout_cache 中的数据就始终无法被消费——尤其是程序退出时,out_cache 里可能还残留着未处理的数据。

这种无锁处理方式去掉了 mutex,效率较高,但也引入了一个新问题:缺少 mutex / condition 机制后,线程很容易空转(CPU 空跑)。例如在队列为空时,读线程会满载空转。为了充分利用这段 CPU 时间,可以把 encode/decode 等附加操作分配给它处理,或者加上 Sleep——当然一旦加了 Sleep,在极端高频场景下无锁本身的优势就会大打折扣。

writeread 相等时,既可以表示队列满,也可以表示队列空(两者都指向下一个待写入/读取的 index)。此时 buff[index] 是两个线程共享的。如果能在这个临界状态下保证安全,整个队列就是安全的——其他情况下 readwrite 根本不会指向同一个 index,不存在交集。

read == write 的临界情况下,读线程先取走数据再将标志置为 false,写线程先写入数据再将标志置为 true。这个"先读写数据、再改标志"的顺序不能颠倒,否则会出现标志已就绪但数据尚未写入的 BUG。为什么在单读单写的场景下这种操作是安全的?传统多读多写不安全的根本原因在于:可能读到旧数据、多个线程同时写入导致数据错乱、或者写到一半就被读走。而这里严格限定了一个线程读、一个线程写,通过 bool 标志控制避免了上述并发冲突(movbool 赋值是否原子无关紧要,因为只有在满/空的临界情况下读写才会出现先后顺序,其余情况下两者可以完全并行;同时,read 只写 falsewrite 只写 true,不会对同一个 bool 交叉写入不同的值)。

内存安全性:如果无限制地接受写入,极端情况下 out_cache 可能爆满。

以下代码包含自检逻辑(注释中的 verify 部分):如果运行过程中自检失败,程序会退出;反之则说明在安全性上没有 BUG。

#include <fstream>
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <iostream>
#include <windows.h>
#include <atomic>
#include <memory>
#include <queue>
using namespace std;

struct Data
{
	int id = 3;
};

using QueuePair11 = std::pair<volatile std::atomic<bool>, void*>;
using QueuePair = std::pair<volatile bool, void*>;

#define  MAX_SIZE 1024
#define  MAX_OUT_CACHE_SIZE 1024000

class Queue
{
public:
	void* front()
	{
		if (buff[read].first)
		{
			return buff[read].second;
		}
		return nullptr;
	}
	void pop()
	{
		buff[read].first = false;
		read = ++read %MAX_SIZE;
	}
	bool push(void* value)
	{
		bool hasDumpFull = false;
		hasDumpFull = dump();
		if (hasDumpFull == false && buff[write].first == false)
		{
			buff[write].second = value;
			buff[write].first = true;
			write = ++write % MAX_SIZE;
		}
		else
		{
			if (out_cache.size() > MAX_OUT_CACHE_SIZE)return false;
			out_cache.push(value);
		}
		return true;
	}
	//return is dump full ?
	bool dump()
	{
		while (out_cache.empty() == false)
		{
			if (buff[write].first)
			{//  写满为止
				return true;
			}
			auto &v = out_cache.front();
			out_cache.pop();
			buff[write].second = v;
			buff[write].first = true;
			write = ++write%MAX_SIZE;
		}
		return false;
	}
	QueuePair buff[MAX_SIZE];
	int read = 0;
	int write = 0;
	std::queue<void*> out_cache;
};


int main(int argc, char* argv[])
{
	Queue _queue;
	/*int ss = 1024;
	int t = 1020;
	cout << (t&(ss - 1));
	*/
	std::thread t_write([&]()
	{
		for (int i = 0;; i++)
		{
			if (i > 0xffff)
			{
				i = 0;
			}

			auto v = new Data();
			(v->id) = i;
			while (!_queue.push(v)){ Sleep(0); };
		}
	});
	t_write.detach();

	std::thread t_read([&]()
	{
		int last = -1;
		while (true)
		{
			auto v = (Data*)_queue.front();
			if (v == nullptr)
			{
				continue;
			}
			cout << v->id << endl;
			if (v->id >= 0xffff)
			{
				last = -1;
				v->id = 0xffffff;
				delete v;
				_queue.pop();
				continue;;
			}
			if (*(int*)v - last != 1)
			{// verify
				exit(0);
			}
			last = v->id;
			v->id = 0xffffff;
			delete v;
			_queue.pop();
		}
	});
	t_read.detach();

	while (true)
	{
		Sleep(1);
	}

	system("pause");
	return 0;
}

实测自检两个多小时程序无退出,没有测出 BUG。

在实现上述版本时,有一个 BUG 引发了一些思考:如果把上面的 dump 换成下面这个版本,就会出现下图中那个诡异的 BUG。

bool dump33()
	{
		bool hasWrite = false;

		while (out_cache.empty() == false)
		{
			if (buff[write].first)
			{
				return hasWrite;
			}
			auto & v = out_cache.front();
			out_cache.pop();
			hasWrite = true;
			buff[write].second = v;
			buff[write].first = true;
			write = ++write%MAX_SIZE;
		}
		return hasWrite;
	}

这个 BUG 的表象是输出顺序错乱,但相对顺序并没有错——例如 48170 到 48179 的序列,按正常顺序走到 48179 是对的,只是中间插入了其他数字。

根本原因在于:当 dump33 返回 false 时,表示"队列满、本次未写入",此时会触发 push 里的第二个条件判断——&& buff[write].first == false。由于 write 指向的是下一个位置,某些情况下(恰好该位置已被 read 消费并置为 false)这个条件会成立,导致新数据被直接写入 buff 而不是先进 out_cache 排队。正常情况下新 push 的数据应当排在 out_cache 所有数据之后,而这段有 BUG 的代码会让它提前写入,从而打乱了顺序。