Skip to content

Commit

Permalink
完成 asyncfuture 源码解析的所有内容 #12
Browse files Browse the repository at this point in the history
  • Loading branch information
Mq-b committed Jul 17, 2024
1 parent e191580 commit c485d44
Showing 1 changed file with 202 additions and 2 deletions.
204 changes: 202 additions & 2 deletions md/详细分析/03async与future源码解析.md
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ _NODISCARD_ASYNC future<_Invoke_result_t<decay_t<_Fty>, decay_t<_ArgTypes>...>>
`_Get_associated_state` 函数返回一个 `_Associated_state` 指针,该指针指向一个新的 `_Deferred_async_state` 或 `_Task_async_state` 对象。这两个类分别对应于异步任务的两种不同执行策略:**延迟执行**和**异步执行**。
> 其实就是父类指针指向了子类对象,注意 **`_Associated_state` 是有虚函数的**,子类进行覆盖,这很重要。比如在后续聊 `std::future` 的 `get()` 成员函数的时候就会讲到
> 这段代码也很好的说明在 MSVC STL 中,`launch::async | launch::deferred` 和 `launch::async` 的行为是相同的,即都是异步执行。
---
Expand Down Expand Up @@ -580,7 +582,7 @@ virtual void _Wait() { // wait for signal
}
```

先使用锁进行保护,然后调用函数,再循环等待任务执行完毕。关键实际上在于 `_Maybe_run_deferred_function`
先使用锁进行保护,然后调用函数,再循环等待任务执行完毕。`_Maybe_run_deferred_function`

```cpp
void _Maybe_run_deferred_function(unique_lock<mutex>& _Lock) { // run a deferred function if not already done
Expand All @@ -591,4 +593,202 @@ void _Maybe_run_deferred_function(unique_lock<mutex>& _Lock) { // run a deferred
}
```
`_Run_deferred_function` 相信你不会陌生,在讲述 `std::async` 源码中其实已经提到了。
`_Run_deferred_function` 相信你不会陌生,在讲述 `std::async` 源码中其实已经提到了,就是解锁然后调用 `_Call_immediate` 罢了。
```cpp
void _Run_deferred_function(unique_lock<mutex>& _Lock) override { // run the deferred function
_Lock.unlock();
_Packaged_state<_Rx()>::_Call_immediate();
_Lock.lock();
}
```

> `_Call_immediate` 就是执行我们实际传入的函数对象,先前已经提过。
`_Wait` 函数中调用 `_Maybe_run_deferred_function` 是为了确保延迟执行(`launch::deferred`)的任务能够在等待前被启动并执行完毕。这样,在调用 `wait` 时可以正确地等待任务完成。

至于下面的循环等待部分:

```cpp
while (!_Ready) {
_Cond.wait(_Lock);
}
```

这段代码使用了条件变量、互斥量、以及一个状态对象,主要目的有两个:

1. **避免虚假唤醒**
- 条件变量的 `wait` 函数在被唤醒后,会重新检查条件(即 `_Ready` 是否为 `true`),确保只有在条件满足时才会继续执行。这防止了由于虚假唤醒导致的错误行为。
2. **等待 `launch::async` 的任务在其它线程执行完毕**
- 对于 `launch::async` 模式的任务,这段代码确保当前线程会等待任务在另一个线程中执行完毕,并接收到任务完成的信号。只有当任务完成并设置 `_Ready``true` 后,条件变量才会被通知,从而结束等待。

这样,当调用 `wait` 函数时,可以保证无论任务是 `launch::deferred` 还是 `launch::async` 模式,当前线程都会正确地等待任务的完成信号,然后继续执行。

---

`wait()` 介绍完了,那么接下来就是 `get()`

```cpp
// std::future<void>
void get() {
// block until ready then return or throw the stored exception
future _Local{_STD move(*this)};
_Local._Get_value();
}
// std::future<T>
_Ty get() {
// block until ready then return the stored result or throw the stored exception
future _Local{_STD move(*this)};
return _STD move(_Local._Get_value());
}
// std::future<T&>
_Ty& get() {
// block until ready then return the stored result or throw the stored exception
future _Local{_STD move(*this)};
return *_Local._Get_value();
}
```

在第四章的 “*future 的状态变化*”一节中我们也详细聊过 `get()` 成员函数。由于 future 本身有三个特化,`get()` 成员函数自然那也有三个版本,不过总体并无多大区别。

它们都是将当前对象(`*this`)的共享状态转移给了这个局部对象 `_Local`,然后再去调用父类`_State_manager` 的成员函数 `_Get_value()` 获取值并返回。而局部对象 `_Local` 在函数结束时析构。这意味着当前对象(`*this`)失去共享状态,并且状态被完全销毁。

`_Get_value()`

```cpp
_Ty& _Get_value() const {
if (!valid()) {
_Throw_future_error2(future_errc::no_state);
}

return _Assoc_state->_Get_value(_Get_only_once);
}
```

先进行一下状态判断,如果拥有共享状态则继续,调用 `_Assoc_state` 的成员函数 `_Get_value` ,传递 `_Get_only_once` 参数,其实就是代表这个成员函数只能调用一次,次参数是里面进行状态判断的而已。

`_Assoc_state` 的类型是 ` _Associated_state<_Ty>* ` ,是一个指针类型,它实际会指向自己的子类对象,我们在讲 `std::async` 源码的时候提到了,它必然指向 `_Deferred_async_state` 或者 `_Task_async_state`

`_Assoc_state->_Get_value` 这其实是个多态调用,父类有这个虚函数:

```cpp
virtual _Ty& _Get_value(bool _Get_only_once) {
unique_lock<mutex> _Lock(_Mtx);
if (_Get_only_once && _Retrieved) {
_Throw_future_error2(future_errc::future_already_retrieved);
}

if (_Exception) {
_STD rethrow_exception(_Exception);
}

// TRANSITION: `_Retrieved` should be assigned before `_Exception` is thrown so that a `future::get`
// that throws a stored exception invalidates the future (N4950 [futures.unique.future]/17)
_Retrieved = true;
_Maybe_run_deferred_function(_Lock);
while (!_Ready) {
_Cond.wait(_Lock);
}

if (_Exception) {
_STD rethrow_exception(_Exception);
}

if constexpr (is_default_constructible_v<_Ty>) {
return _Result;
} else {
return _Result._Held_value;
}
}

```
但是子类 `_Task_async_state` 进行了重写,以 `launch::async` 策略创建的 future,那么实际会调用 `_Task_async_state::_Get_value` :
```cpp
_State_type& _Get_value(bool _Get_only_once) override {
// return the stored result or throw stored exception
_Task.wait();
return _Mybase::_Get_value(_Get_only_once);
}
```

> `_Deferred_async_state` 则没有进行重写,就是直接调用父类虚函数。
`_Task` 就是 `::Concurrency::task<void> _Task;`,调用 `wait()` 成员函数确保任务执行完毕。

`_Mybase::_Get_value(_Get_only_once)` 其实又是回去调用父类的虚函数了。

> ### `_Get_value` 方法详细解释
1. **状态检查**
- 如果`_Get_only_once`为真并且结果已被检索过,则抛出`future_already_retrieved`异常。
2. **异常处理**
- 如果存在存储的异常,重新抛出该异常。
3. **标记结果已被检索**
-`_Retrieved`设置为`true`
4. **执行延迟函数**
- 调用`_Maybe_run_deferred_function`来运行可能的延迟任务。这个函数很简单,就是单纯的执行延时任务而已,在讲述 `wait` 成员函数的时候已经讲完了。
5. **等待结果就绪**
- 如果结果尚未准备好,等待条件变量通知结果已就绪。(这里和 `std::async``std::future` 的组合无关,因为如果是 `launch::async` 模式创建的任务,重写的 `_Get_value` 是先调用了 `_Task.wait();` 确保异步任务执行完毕,此处根本无需等待它)
6. **再次检查异常**
- 再次检查是否有存储的异常,并重新抛出它。
7. **返回结果**
- 如果`_Ty`是默认可构造的,返回结果`_Result`
- 否则,返回`_Result._Held_value`

`_Result` 是通过执行 `_Call_immediate` 函数,然后 `_Call_immediate` 再执行 `_Set_value``_Set_value` 再执行 `_Emplace_result``_Emplace_result` 再执行 `_Emplace_result` 获取到我们执行任务的值的。以 `Ty` 的偏特化为例:

```cpp
// _Packaged_state
void _Call_immediate(_ArgTypes... _Args) {
_TRY_BEGIN
// 调用函数对象并捕获异常 传递返回值
this->_Set_value(_Fn(_STD forward<_ArgTypes>(_Args)...), false);
_CATCH_ALL
// 函数对象抛出异常就记录
this->_Set_exception(_STD current_exception(), false);
_CATCH_END
}

// _Asscoiated_state
void _Set_value(const _Ty& _Val, bool _At_thread_exit) { // store a result
unique_lock<mutex> _Lock(_Mtx);
_Set_value_raw(_Val, &_Lock, _At_thread_exit);
}
void _Set_value_raw(const _Ty& _Val, unique_lock<mutex>* _Lock, bool _At_thread_exit) {
// store a result while inside a locked block
if (_Already_has_stored_result()) {
_Throw_future_error2(future_errc::promise_already_satisfied);
}

_Emplace_result(_Val);
_Do_notify(_Lock, _At_thread_exit);
}
template <class _Ty2>
void _Emplace_result(_Ty2&& _Val) {
// TRANSITION, incorrectly assigns _Result when _Ty is default constructible
if constexpr (is_default_constructible_v<_Ty>) {
_Result = _STD forward<_Ty2>(_Val); // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
} else {
::new (static_cast<void*>(_STD addressof(_Result._Held_value))) _Ty(_STD forward<_Ty2>(_Val));
_Has_stored_result = true;
}
}
```
## 总结
好了,到此也就可以了。
你不会期待我们将每一个成员函数都分析一遍吧?首先是没有必要,其次是篇幅限制。
`std::future` 的继承关系让人感到头疼,但是如果耐心的看了一遍,全部搞明白了继承关系, `std::async` 如何创建的 `std::future` 也就没有问题了。
其实各位不用着急完全理解,可以慢慢看,至少有许多的显著的信息,比如:
1. `sttd::future` 的很多部分,如 `get()` 成员函数实现中,实际使用了虚函数。
2. `std::async` 创建 `std::future` 对象中,内部其实也有父类指针指向子类对象,以及多态调用。
3. `std::async` 的非延迟执行策略,使用到了自家的 PPL 库。
4. 微软的 `std::async` 策略实现并不符合标准,不区分 `launch::async | launch::deferred` 和 `launch::async`。
5. `std::future` 内部使用到了互斥量、条件变量、异常指针等设施。

0 comments on commit c485d44

Please sign in to comment.