和异步相似的概念,最初是在读书时从计算机原理中关于 CPU 时间最优分配算法的讲解里接触到的。
问题
假设有三段代码需要顺序执行,执行时间分别为:代码壹 20ms、代码贰 30ms、代码叁 5ms。执行顺序为壹 → 贰 → 叁,三者之间的相对顺序可以打乱。
按顺序执行时,各段代码的完成时间依次为:壹 20ms、贰 50ms、叁 55ms,平均等待时间为 41ms,全部任务完成需要 55ms。
问题是:这种方式在 CPU 密集场景下会带来较高的平均等待时间(响应时间/延迟),有没有办法降低?
方案分析
将各段代码拆分为更细的子任务:壹 → A1、A2(各 10ms),贰 → B1、B2、B3(各 10ms),叁 → C1。
原顺序为 A1 A2 B1 B2 B3 C1,若调整为 A1 B1 C1 A2 B2 B3,则壹贰叁的完成时间变为 25ms + 35ms + 55ms,平均等待时间降至 38ms,任务总完成时间仍为 55ms。平均等待时间因此有所减少。
其背后的守恒定律是:通过增加内耗(调度开销)换取平均等待时间的减少。拆分粒度需要根据实际情况调整——一般只对特别耗时或需要等待的操作进行异步处理,比如 IO 操作。异步代码的底层实现可以结合多线程甚至系统级手法来实现非阻塞运行。
这里提供的异步机制以回调形式暴露给使用者,因此理论上任何可通过回调执行的逻辑都可以利用该原理进行异步化。
实现原理
- 基本前提:壹、贰、叁之间没有依赖关系,但各自拆分出的子任务相对顺序不能改变。
- 用队列作为子任务的容器,子任务通过闭包实现。
- 这是异步的核心思想,大量异步手法都是该思想的具体应用。
异步的应用
TODO
示例代码
- 示例1 展示了基本运作原理,但 C++ 代码稍显晦涩。
- 示例2 通过语法糖让写法更为清晰。
- 如果语言层面原生支持该种手法,书写起来将会非常优雅,例如 Go。
示例代码1
class AsyncQueue
{
public:
inline void Post(const std::function<void(AsyncQueue*)> & cb)
{
_queue.push(cb);
}
void Run()
{
while (true)
{
InternalYield();
while (_queue.empty() == false)
{
auto cb = _queue.front();
_queue.pop();
cb(this);
}
}
}
private:
inline void InternalYield()
{
std::this_thread::sleep_for(std::chrono::nanoseconds(1));
}
private:
std::queue<std::function<void(AsyncQueue*)>> _queue;
};
AsyncQueue async;
void func1()
{
async.Post([=](AsyncQueue*async)
{
cout << "11111111" << endl;
});
async.Post([=](AsyncQueue*async)
{
cout << "22222222" << endl;
async->Post([=](AsyncQueue*async)
{
cout << "3333333333" << endl;
});
});
}
void func2()
{
async.Post([=](AsyncQueue*async)
{
cout << "AAAAAAA" << endl;
async->Post([=](AsyncQueue*async)
{
cout << "BBBBBBBBBBB" << endl;
func1();
async->Post([=](AsyncQueue*async)
{
cout << "CCCCCCCC" << endl;
});
});
});
}
int main(int argc, char* argv[])
{
func2();
async.Run();
::system("pause");
return 0;
}
示例代码2
class AsyncQueue
{
public:
inline void Post(const std::function<void(AsyncQueue*)> & cb)
{
_queue.push(cb);
}
void Run()
{
while (true)
{
InternalYield();
while (_queue.empty() == false)
{
auto cb = _queue.front();
_queue.pop();
cb(this);
}
}
}
private:
inline void InternalYield()
{
std::this_thread::sleep_for(std::chrono::nanoseconds(1));
}
private:
std::queue<std::function<void(AsyncQueue*)>> _queue;
};
class Async
{
public:
static Async*Create()
{
return new Async;
}
public:
Async* Add(const std::function<void(Async*)>&cb)
{
_queue.push(cb);//pass by copy
return this;
}
void Continue()
{
if (_queue.empty() == false)
{
auto cb = _queue.front();
_queue.pop();
cb(this);
if (_queue.empty())
{
delete this;
}
}
else
{
delete this;
}
}
void Go()
{
this->Continue();
}
private:
std::queue<std::function<void(Async *)>> _queue;
};
AsyncQueue async;
void func1()
{
Async::Create()->Add([=](Async*as)
{
//step 1
cout << "11111111" << endl;
as->Continue();
})->Add([=](Async*as)
{
//step 2
cout << "2222222222222" << endl;
async.Post([=](AsyncQueue*async)
{
cout << "3333333333" << endl;
});
})->Go();
}
void func2()
{
//step 1
Async::Create()->Add([=](Async*as)
{
//step 1
cout << "AAAAAAA" << endl;
as->Continue();
})->Add([=](Async*as)
{
//step 2
cout << "BBBBBBBBBBB" << endl;
async.Post([=](AsyncQueue*async)
{
func1();
as->Continue();
});
})->Add([=](Async*as)
{
//strp3
async.Post([=](AsyncQueue*async)
{
cout << "CCCCCCCC" << endl;
});
})->Go();
}
int main(int argc, char* argv[])
{
func2();
async.Run();
_CrtDumpMemoryLeaks();
::system("pause");
return 0;
}