线程死锁一引起的系列思考
从一个实际遇到的死锁 Bug 出发,梳理死锁的产生条件、典型模式、以及 C++ 标准库提供的防御手段。
死锁产生的四个必要条件
要让死锁真正发生,必须同时满足下面四个条件,缺一不可:
01
互斥条件
资源在一段时间内只能由一个进程占用,其他进程若要访问只能排队等待。
02
请求和保持条件
进程已持有至少一个资源,又去申请被其他进程占用的新资源,被阻塞时仍不释放已有资源。
03
不剥夺条件
进程获得的资源在用完之前不能被强行夺走,只能由自己释放。
04
环路等待条件
存在进程-资源的环形等待链:P0 等 P1 的资源,P1 等 P2 的…… Pn 等 P0 的。像抢筷子,谁都凑不齐。
一个典型的死锁示例
两把互斥量 _mutex1 和 _mutex2,主线程和子线程以相反顺序加锁,构成环路等待:
环路等待 — 时序分析
子线程 t
lock(_mutex1) ✓
Sleep(12)
lock(_mutex2) — 阻塞!
×
DEADLOCK
主线程 main
lock(_mutex2) ✓
Sleep(12)
lock(_mutex1) — 阻塞!
两个线程各持一把锁,又各自等对方释放另一把。四个条件全部满足:互斥(mutex)、请求保持(持有一把还要另一把)、不可剥夺(lock 后只能自己 unlock)、环路等待(t→mutex2→main→mutex1→t)。
经典死锁示例C++
static std::recursive_mutex _mutex1;
static std::recursive_mutex _mutex2;
int hp;
int main(int argc, char *argv[])
{
thread t ( []()
{
_mutex1.lock();
Sleep(12);
_mutex2.lock();
cout << "t:"<< hp << endl;
_mutex1.unlock();
_mutex2.unlock();
});
t.detach();
_mutex2.lock();
Sleep(12);
_mutex1.lock();
cout << "main:" << hp << endl;
_mutex1.unlock();
_mutex2.unlock();
system("pause");
return 0;
}
递归加锁带来的另一种死锁
同一把锁被同一个线程反复加锁也会锁死自己——std::mutex 不是递归锁:
递归死锁 — 调用链
thread t: _mutex1.lock() ✓
↓
cout << "t:" << hp
↓
调用 doWith()
↓
doWith: _mutex1.lock() — 同一线程再次加锁,阻塞!
↓
外层 unlock 永远不会执行 → DEADLOCK
递归死锁示例C++
void doWith()
{
_mutex1.lock();
cout << "doWith:" << hp << endl;
_mutex1.unlock();
}
int main(int argc, char *argv[])
{
thread t ( [=]()
{
_mutex1.lock();
cout << "t:"<< hp << endl;
doWith();
_mutex1.unlock();
});
t.detach();
system("pause");
return 0;
}
解决办法:换用 std::recursive_mutex,或自己实现递归锁。
自己实现一把递归锁
核心思路:如果当前线程就是上次 lock 成功的线程,只增加引用计数,不真正 lock;否则才去竞争底层 mutex。
Re_mutex::lock() 逻辑
获取 this_thread::get_id()
↓
当前线程 == _last_lock_id 且 _islock?
↓
是 → ++count(不真正 lock)
↓
否 → _mutex.lock(),记录 _last_lock_id
_mutex
底层真正的 std::mutex
_islock
是否有线程持有锁
_last_lock_id
上一次成功 lock 的线程 ID
count
递归加锁计数,unlock 时 --count,归零才真正 unlock
Re_mutex 实现C++
class Re_mutex
{
public:
void lock()
{
cout << "lock id "<<this_thread::get_id() << endl;
if (this_thread::get_id() == _last_lock_id &&_islock == true)
{
++count;
}
else
{
_last_lock_id = this_thread::get_id();
_islock = true;
_mutex.lock();
}
}
void unlock()
{
--count;
if (count == 0 && _islock)
{
_mutex.unlock();
}
}
private:
bool _islock = false;
mutex _mutex;
int count = 0;
thread::id _last_lock_id;
};
static Re_mutex _mutex1;
//static std::recursive_mutex _mutex2;
int hp;
void doWith()
{
_mutex1.lock();
cout << "doWith:" << hp << endl;
_mutex1.unlock();
}
int main(int argc, char *argv[])
{
thread t ( [=]()
{
_mutex1.lock();
cout << "t:"<< hp << endl;
doWith();
_mutex1.unlock();
});
t.detach();
system("pause");
return 0;
}
用标准库的工具避免死锁
| 工具 | 语义 | 解决的问题 |
|---|---|---|
std::lock(m1, m2, ...) | 要么全部锁上,要么一个都不锁 | 破坏"请求保持"条件,避免锁住一半的中间态 |
std::lock_guard + adopt_lock | RAII 接管已锁好的 mutex | 避免忘记 unlock / 重复加锁 |
std::recursive_mutex | 允许同一线程多次 lock | 递归调用场景的死锁 |
std::lock 实现原理C++
template<class _Lock0,
class _Lock1,
class... _LockN> inline
void lock(_Lock0& _Lk0, _Lock1& _Lk1, _LockN&... _LkN)
{ // lock N mutexes
int _Res = 0;
while (_Res != -1)
_Res = _Try_lock(_Lk0, _Lk1, _LkN...);
}
lock_guard + adopt_lockC++
std::lock_guard<std::mutex> l(m,std::adopt_lock);
std::lock 内部使用 try-lock 循环:尝试依次锁所有 mutex,任何一个失败就全部释放重试。这从根本上避免了"持有一半等另一半"的中间态。
避免死锁的经验法则
避免嵌套锁
一个线程已经获得了一个锁时,别去获取第二个。如果确实需要多把锁,用
std::lock 一次性获取。避免在持有锁时调用用户代码
你不知道用户代码内部会做什么——可能去获取另一把锁,可能阻塞,可能回调回来递归加锁。
使用固定顺序获取锁
所有线程都按相同顺序加锁(如先 A 后 B),就不可能形成环路等待。
std::lock 的原理正是如此。