线程死锁一引起的系列思考

从一个实际遇到的死锁 Bug 出发,梳理死锁的产生条件、典型模式、以及 C++ 标准库提供的防御手段。

死锁产生的四个必要条件

要让死锁真正发生,必须同时满足下面四个条件,缺一不可:

01
互斥条件
资源在一段时间内只能由一个进程占用,其他进程若要访问只能排队等待。
02
请求和保持条件
进程已持有至少一个资源,又去申请被其他进程占用的新资源,被阻塞时仍不释放已有资源。
03
不剥夺条件
进程获得的资源在用完之前不能被强行夺走,只能由自己释放。
04
环路等待条件
存在进程-资源的环形等待链:P0 等 P1 的资源,P1 等 P2 的…… Pn 等 P0 的。像抢筷子,谁都凑不齐。

一个典型的死锁示例

两把互斥量 _mutex1_mutex2,主线程和子线程以相反顺序加锁,构成环路等待:

环路等待 — 时序分析
子线程 t
lock(_mutex1) ✓
Sleep(12)
lock(_mutex2) — 阻塞!
×
DEADLOCK
主线程 main
lock(_mutex2) ✓
Sleep(12)
lock(_mutex1) — 阻塞!
两个线程各持一把锁,又各自等对方释放另一把。四个条件全部满足:互斥(mutex)、请求保持(持有一把还要另一把)、不可剥夺(lock 后只能自己 unlock)、环路等待(t→mutex2→main→mutex1→t)。
经典死锁示例C++
 static  std::recursive_mutex _mutex1;
static  std::recursive_mutex _mutex2;
int hp;

 int main(int argc, char *argv[])
{
  thread t ( []()
  {
    _mutex1.lock();
    Sleep(12);
    _mutex2.lock();
   cout << "t:"<< hp << endl;

    _mutex1.unlock();
   _mutex2.unlock();
  });
  t.detach();

  _mutex2.lock();
  Sleep(12);
  _mutex1.lock();
  cout << "main:" << hp << endl;


  _mutex1.unlock();
  _mutex2.unlock();
 system("pause");
 return 0;
}

递归加锁带来的另一种死锁

同一把锁被同一个线程反复加锁也会锁死自己——std::mutex 不是递归锁:

递归死锁 — 调用链
thread t: _mutex1.lock() ✓
cout << "t:" << hp
调用 doWith()
doWith: _mutex1.lock() — 同一线程再次加锁,阻塞!
外层 unlock 永远不会执行 → DEADLOCK
递归死锁示例C++
void doWith()
{
 _mutex1.lock();
 cout << "doWith:" << hp << endl;
  _mutex1.unlock();

}
  int main(int argc, char *argv[])
 {

  thread t ( [=]()
  {
    _mutex1.lock();

   cout << "t:"<< hp << endl;
   doWith();
     _mutex1.unlock();

  });
  t.detach();
  system("pause");
 return 0;
}

解决办法:换用 std::recursive_mutex,或自己实现递归锁。

自己实现一把递归锁

核心思路:如果当前线程就是上次 lock 成功的线程,只增加引用计数,不真正 lock;否则才去竞争底层 mutex。

Re_mutex::lock() 逻辑
获取 this_thread::get_id()
当前线程 == _last_lock_id 且 _islock?
是 → ++count(不真正 lock)
否 → _mutex.lock(),记录 _last_lock_id
_mutex
底层真正的 std::mutex
_islock
是否有线程持有锁
_last_lock_id
上一次成功 lock 的线程 ID
count
递归加锁计数,unlock 时 --count,归零才真正 unlock
Re_mutex 实现C++
class Re_mutex
{
public:
 void lock()
 {
  cout << "lock id "<<this_thread::get_id() << endl;
  if (this_thread::get_id() == _last_lock_id &&_islock == true)
  {
   ++count;
  }
  else
  {
   _last_lock_id = this_thread::get_id();
   _islock = true;
   _mutex.lock();
  }
 }

 void unlock()
 {
  --count;
  if (count == 0 && _islock)
  {
   _mutex.unlock();
  }
 }
private:
 bool _islock = false;
 mutex _mutex;
 int count = 0;
 thread::id _last_lock_id;
};
static  Re_mutex _mutex1;
//static  std::recursive_mutex _mutex2;
int hp;
void doWith()
{
 _mutex1.lock();
 cout << "doWith:" << hp << endl;
 _mutex1.unlock();

}

 int main(int argc, char *argv[])
 {

  thread t ( [=]()
  {
    _mutex1.lock();

   cout << "t:"<< hp << endl;
   doWith();
    _mutex1.unlock();

  });
  t.detach();


 system("pause");
 return 0;
}

用标准库的工具避免死锁

工具语义解决的问题
std::lock(m1, m2, ...)要么全部锁上,要么一个都不锁破坏"请求保持"条件,避免锁住一半的中间态
std::lock_guard + adopt_lockRAII 接管已锁好的 mutex避免忘记 unlock / 重复加锁
std::recursive_mutex允许同一线程多次 lock递归调用场景的死锁
std::lock 实现原理C++
template<class _Lock0,
	class _Lock1,
	class... _LockN> inline
	void lock(_Lock0& _Lk0, _Lock1& _Lk1, _LockN&... _LkN)
	{	// lock N mutexes
	int _Res = 0;
	while (_Res != -1)
		_Res = _Try_lock(_Lk0, _Lk1, _LkN...);
	}
lock_guard + adopt_lockC++
	std::lock_guard<std::mutex>  l(m,std::adopt_lock);
std::lock 内部使用 try-lock 循环:尝试依次锁所有 mutex,任何一个失败就全部释放重试。这从根本上避免了"持有一半等另一半"的中间态。

避免死锁的经验法则

避免嵌套锁
一个线程已经获得了一个锁时,别去获取第二个。如果确实需要多把锁,用 std::lock 一次性获取。
避免在持有锁时调用用户代码
你不知道用户代码内部会做什么——可能去获取另一把锁,可能阻塞,可能回调回来递归加锁。
使用固定顺序获取锁
所有线程都按相同顺序加锁(如先 A 后 B),就不可能形成环路等待。std::lock 的原理正是如此。