From c485d442fa8f8a2f300f39ad4ab8f9b2291ad569 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?mq=E7=99=BD?= <3326284481@qq.com> Date: Wed, 17 Jul 2024 21:13:13 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90=20`async`=20=E4=B8=8E=20`fut?= =?UTF-8?q?ure`=20=E6=BA=90=E7=A0=81=E8=A7=A3=E6=9E=90=E7=9A=84=E6=89=80?= =?UTF-8?q?=E6=9C=89=E5=86=85=E5=AE=B9=20#12?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...20\347\240\201\350\247\243\346\236\220.md" | 204 +++++++++++++++++- 1 file changed, 202 insertions(+), 2 deletions(-) diff --git "a/md/\350\257\246\347\273\206\345\210\206\346\236\220/03async\344\270\216future\346\272\220\347\240\201\350\247\243\346\236\220.md" "b/md/\350\257\246\347\273\206\345\210\206\346\236\220/03async\344\270\216future\346\272\220\347\240\201\350\247\243\346\236\220.md" index c219e861..85fb2db5 100644 --- "a/md/\350\257\246\347\273\206\345\210\206\346\236\220/03async\344\270\216future\346\272\220\347\240\201\350\247\243\346\236\220.md" +++ "b/md/\350\257\246\347\273\206\345\210\206\346\236\220/03async\344\270\216future\346\272\220\347\240\201\350\247\243\346\236\220.md" @@ -273,6 +273,8 @@ _NODISCARD_ASYNC future<_Invoke_result_t, decay_t<_ArgTypes>...>> `_Get_associated_state` 函数返回一个 `_Associated_state` 指针,该指针指向一个新的 `_Deferred_async_state` 或 `_Task_async_state` 对象。这两个类分别对应于异步任务的两种不同执行策略:**延迟执行**和**异步执行**。 + > 其实就是父类指针指向了子类对象,注意 **`_Associated_state` 是有虚函数的**,子类进行覆盖,这很重要。比如在后续聊 `std::future` 的 `get()` 成员函数的时候就会讲到 + > 这段代码也很好的说明在 MSVC STL 中,`launch::async | launch::deferred` 和 `launch::async` 的行为是相同的,即都是异步执行。 --- @@ -580,7 +582,7 @@ virtual void _Wait() { // wait for signal } ``` -先使用锁进行保护,然后调用函数,再循环等待任务执行完毕。关键实际上在于 `_Maybe_run_deferred_function`: +先使用锁进行保护,然后调用函数,再循环等待任务执行完毕。`_Maybe_run_deferred_function`: ```cpp void _Maybe_run_deferred_function(unique_lock& _Lock) { // run a deferred function if not already done @@ -591,4 +593,202 @@ void _Maybe_run_deferred_function(unique_lock& _Lock) { // run a deferred } ``` -`_Run_deferred_function` 相信你不会陌生,在讲述 `std::async` 源码中其实已经提到了。 +`_Run_deferred_function` 相信你不会陌生,在讲述 `std::async` 源码中其实已经提到了,就是解锁然后调用 `_Call_immediate` 罢了。 + +```cpp +void _Run_deferred_function(unique_lock& _Lock) override { // run the deferred function + _Lock.unlock(); + _Packaged_state<_Rx()>::_Call_immediate(); + _Lock.lock(); +} +``` + +> `_Call_immediate` 就是执行我们实际传入的函数对象,先前已经提过。 + +在 `_Wait` 函数中调用 `_Maybe_run_deferred_function` 是为了确保延迟执行(`launch::deferred`)的任务能够在等待前被启动并执行完毕。这样,在调用 `wait` 时可以正确地等待任务完成。 + +至于下面的循环等待部分: + +```cpp +while (!_Ready) { + _Cond.wait(_Lock); +} +``` + +这段代码使用了条件变量、互斥量、以及一个状态对象,主要目的有两个: + +1. **避免虚假唤醒**: + - 条件变量的 `wait` 函数在被唤醒后,会重新检查条件(即 `_Ready` 是否为 `true`),确保只有在条件满足时才会继续执行。这防止了由于虚假唤醒导致的错误行为。 +2. **等待 `launch::async` 的任务在其它线程执行完毕**: + - 对于 `launch::async` 模式的任务,这段代码确保当前线程会等待任务在另一个线程中执行完毕,并接收到任务完成的信号。只有当任务完成并设置 `_Ready` 为 `true` 后,条件变量才会被通知,从而结束等待。 + +这样,当调用 `wait` 函数时,可以保证无论任务是 `launch::deferred` 还是 `launch::async` 模式,当前线程都会正确地等待任务的完成信号,然后继续执行。 + +--- + +`wait()` 介绍完了,那么接下来就是 `get()` : + +```cpp +// std::future +void get() { + // block until ready then return or throw the stored exception + future _Local{_STD move(*this)}; + _Local._Get_value(); +} +// std::future +_Ty get() { + // block until ready then return the stored result or throw the stored exception + future _Local{_STD move(*this)}; + return _STD move(_Local._Get_value()); +} +// std::future +_Ty& get() { + // block until ready then return the stored result or throw the stored exception + future _Local{_STD move(*this)}; + return *_Local._Get_value(); +} +``` + +在第四章的 “*future 的状态变化*”一节中我们也详细聊过 `get()` 成员函数。由于 future 本身有三个特化,`get()` 成员函数自然那也有三个版本,不过总体并无多大区别。 + +它们都是将当前对象(`*this`)的共享状态转移给了这个局部对象 `_Local`,然后再去调用父类`_State_manager` 的成员函数 `_Get_value()` 获取值并返回。而局部对象 `_Local` 在函数结束时析构。这意味着当前对象(`*this`)失去共享状态,并且状态被完全销毁。 + +`_Get_value()` : + +```cpp +_Ty& _Get_value() const { + if (!valid()) { + _Throw_future_error2(future_errc::no_state); + } + + return _Assoc_state->_Get_value(_Get_only_once); +} +``` + +先进行一下状态判断,如果拥有共享状态则继续,调用 `_Assoc_state` 的成员函数 `_Get_value` ,传递 `_Get_only_once` 参数,其实就是代表这个成员函数只能调用一次,次参数是里面进行状态判断的而已。 + +`_Assoc_state` 的类型是 ` _Associated_state<_Ty>* ` ,是一个指针类型,它实际会指向自己的子类对象,我们在讲 `std::async` 源码的时候提到了,它必然指向 `_Deferred_async_state` 或者 `_Task_async_state`。 + +`_Assoc_state->_Get_value` 这其实是个多态调用,父类有这个虚函数: + +```cpp +virtual _Ty& _Get_value(bool _Get_only_once) { + unique_lock _Lock(_Mtx); + if (_Get_only_once && _Retrieved) { + _Throw_future_error2(future_errc::future_already_retrieved); + } + + if (_Exception) { + _STD rethrow_exception(_Exception); + } + + // TRANSITION: `_Retrieved` should be assigned before `_Exception` is thrown so that a `future::get` + // that throws a stored exception invalidates the future (N4950 [futures.unique.future]/17) + _Retrieved = true; + _Maybe_run_deferred_function(_Lock); + while (!_Ready) { + _Cond.wait(_Lock); + } + + if (_Exception) { + _STD rethrow_exception(_Exception); + } + + if constexpr (is_default_constructible_v<_Ty>) { + return _Result; + } else { + return _Result._Held_value; + } +} + +``` + +但是子类 `_Task_async_state` 进行了重写,以 `launch::async` 策略创建的 future,那么实际会调用 `_Task_async_state::_Get_value` : + +```cpp +_State_type& _Get_value(bool _Get_only_once) override { + // return the stored result or throw stored exception + _Task.wait(); + return _Mybase::_Get_value(_Get_only_once); +} +``` + +> `_Deferred_async_state` 则没有进行重写,就是直接调用父类虚函数。 + +`_Task` 就是 `::Concurrency::task _Task;`,调用 `wait()` 成员函数确保任务执行完毕。 + +`_Mybase::_Get_value(_Get_only_once)` 其实又是回去调用父类的虚函数了。 + +> ### `_Get_value` 方法详细解释 + +1. **状态检查**: + - 如果`_Get_only_once`为真并且结果已被检索过,则抛出`future_already_retrieved`异常。 +2. **异常处理**: + - 如果存在存储的异常,重新抛出该异常。 +3. **标记结果已被检索**: + - 将`_Retrieved`设置为`true`。 +4. **执行延迟函数**: + - 调用`_Maybe_run_deferred_function`来运行可能的延迟任务。这个函数很简单,就是单纯的执行延时任务而已,在讲述 `wait` 成员函数的时候已经讲完了。 +5. **等待结果就绪**: + - 如果结果尚未准备好,等待条件变量通知结果已就绪。(这里和 `std::async` 和 `std::future` 的组合无关,因为如果是 `launch::async` 模式创建的任务,重写的 `_Get_value` 是先调用了 `_Task.wait();` 确保异步任务执行完毕,此处根本无需等待它) +6. **再次检查异常**: + - 再次检查是否有存储的异常,并重新抛出它。 +7. **返回结果**: + - 如果`_Ty`是默认可构造的,返回结果`_Result`。 + - 否则,返回`_Result._Held_value`。 + +`_Result` 是通过执行 `_Call_immediate` 函数,然后 `_Call_immediate` 再执行 `_Set_value` ,`_Set_value` 再执行 `_Emplace_result`,`_Emplace_result` 再执行 `_Emplace_result` 获取到我们执行任务的值的。以 `Ty` 的偏特化为例: + +```cpp +// _Packaged_state +void _Call_immediate(_ArgTypes... _Args) { + _TRY_BEGIN + // 调用函数对象并捕获异常 传递返回值 + this->_Set_value(_Fn(_STD forward<_ArgTypes>(_Args)...), false); + _CATCH_ALL + // 函数对象抛出异常就记录 + this->_Set_exception(_STD current_exception(), false); + _CATCH_END +} + +// _Asscoiated_state +void _Set_value(const _Ty& _Val, bool _At_thread_exit) { // store a result + unique_lock _Lock(_Mtx); + _Set_value_raw(_Val, &_Lock, _At_thread_exit); +} +void _Set_value_raw(const _Ty& _Val, unique_lock* _Lock, bool _At_thread_exit) { + // store a result while inside a locked block + if (_Already_has_stored_result()) { + _Throw_future_error2(future_errc::promise_already_satisfied); + } + + _Emplace_result(_Val); + _Do_notify(_Lock, _At_thread_exit); +} +template +void _Emplace_result(_Ty2&& _Val) { + // TRANSITION, incorrectly assigns _Result when _Ty is default constructible + if constexpr (is_default_constructible_v<_Ty>) { + _Result = _STD forward<_Ty2>(_Val); // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + } else { + ::new (static_cast(_STD addressof(_Result._Held_value))) _Ty(_STD forward<_Ty2>(_Val)); + _Has_stored_result = true; + } +} +``` + +## 总结 + +好了,到此也就可以了。 + +你不会期待我们将每一个成员函数都分析一遍吧?首先是没有必要,其次是篇幅限制。 + +`std::future` 的继承关系让人感到头疼,但是如果耐心的看了一遍,全部搞明白了继承关系, `std::async` 如何创建的 `std::future` 也就没有问题了。 + +其实各位不用着急完全理解,可以慢慢看,至少有许多的显著的信息,比如: + +1. `sttd::future` 的很多部分,如 `get()` 成员函数实现中,实际使用了虚函数。 +2. `std::async` 创建 `std::future` 对象中,内部其实也有父类指针指向子类对象,以及多态调用。 +3. `std::async` 的非延迟执行策略,使用到了自家的 PPL 库。 +4. 微软的 `std::async` 策略实现并不符合标准,不区分 `launch::async | launch::deferred` 和 `launch::async`。 +5. `std::future` 内部使用到了互斥量、条件变量、异常指针等设施。