Skip to content

Commit

Permalink
修改完善线程池的内容 #12
Browse files Browse the repository at this point in the history
1. fix 线程池的实现中存在错误 #24
2. 修改示例的打印输出,增加线程 id
  • Loading branch information
Mq-b committed Aug 3, 2024
1 parent f2d31f4 commit 0b7e481
Showing 1 changed file with 48 additions and 64 deletions.
112 changes: 48 additions & 64 deletions md/详细分析/04线程池.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,16 @@ graph TD
#include <boost/asio.hpp>
#include <iostream>

std::mutex m;

void print_task(int n) {
std::cout << "Task " << n << " is running." << std::endl;
std::lock_guard<std::mutex> lc{ m };
std::cout << "Task " << n << " is running on thr: " <<
std::this_thread::get_id() << '\n';
}

int main() {
boost::asio::thread_pool pool{4}; // 创建一个包含 4 个线程的线程池
boost::asio::thread_pool pool{ 4 }; // 创建一个包含 4 个线程的线程池

for (int i = 0; i < 10; ++i) {
boost::asio::post(pool, [i] { print_task(i); });
Expand All @@ -111,7 +115,7 @@ int main() {
}
```
> [运行](https://godbolt.org/z/Pa3z1oYej)测试。
> [运行](https://godbolt.org/z/41445Kab5)测试。
- 创建线程池时,指定线程数量,线程池会创建对应数量的线程。
Expand Down Expand Up @@ -163,11 +167,11 @@ thread_pool::~thread_pool()
boost::asio::thread_pool pool{ 4 };

for (int i = 0; i < 10; ++i) {
boost::asio::post(pool, [i]() { print_task(i); });
boost::asio::post(pool, [i] { print_task(i); });
}
```

> [运行](https://godbolt.org/z/haPqKb1h7)测试。
> [运行](https://godbolt.org/z/MPoxrY9Yo)测试。
因为析构函数并不是阻塞直到执行完所有任务,而是先**停止**,再 `join()` 以及 `shutdown()`

Expand Down Expand Up @@ -345,17 +349,13 @@ public:
start();
}

~ThreadPool(){
~ThreadPool() {
stop();
join();
}

void stop() {
stop_.store(true);
cv_.notify_all();
}

void join(){
for (auto& thread : pool_) {
if (thread.joinable()) {
thread.join();
Expand Down Expand Up @@ -383,16 +383,16 @@ public:
return ret;
}

void start(){
for (std::size_t i = 0; i < num_threads_; ++i){
void start() {
for (std::size_t i = 0; i < num_threads_; ++i) {
pool_.emplace_back([this] {
while (!stop_) {
Task task;
{
std::unique_lock<std::mutex> lc{ mutex_ };
cv_.wait(lc, [this] {return stop_ || !tasks_.empty(); });
if (tasks_.empty())
return;
cv_.wait(lc, [this] {return stop_ || !tasks_.empty(); });
task = std::move(tasks_.front());
tasks_.pop();
}
Expand All @@ -415,89 +415,77 @@ private:
**测试 demo**

```cpp
int print_task(int n) {
std::osyncstream{ std::cout } << "Task " << n << " is running." << std::endl;
return n;
}
int print_task2(int n) {
std::osyncstream{ std::cout } << "🐢🐢🐢 " << n << " 🐉🐉🐉" << std::endl;
return n;
}

int main() {
ThreadPool pool{ 4 }; // 创建一个有 4 个线程的线程池 构造函数自动启动线程池
ThreadPool pool{ 4 }; // 创建一个有 4 个线程的线程池
std::vector<std::future<int>> futures; // future 集合,获取返回值

for (int i = 0; i < 10; ++i) {
futures.emplace_back(pool.submit(print_task, i));
}
pool.join(); // 阻塞,让任务全部执行完毕

std::puts("---------------------");

pool.start(); // 重新启动线程池

for (int i = 0; i < 10; ++i) {
futures.emplace_back(pool.submit(print_task2, i));
}
pool.join(); // 阻塞,让任务全部执行完毕

int sum = 0;
for(auto& future : futures){
sum += future.get();
for (auto& future : futures) {
sum += future.get(); // get() 成员函数 阻塞到任务执行完毕,获取返回值
}
std::cout << "sum: " << sum << '\n';
} // 析构自动 stop() join()
} // 析构自动 stop()
```

**可能的[运行结果](https://godbolt.org/z/3rbExqbb7)**:
**可能的[运行结果](https://godbolt.org/z/n7Tana59x)**

```shell
Task 0 is running.
Task 4 is running.
Task 5 is running.
Task 6 is running.
Task 7 is running.
Task 8 is running.
Task 9 is running.
Task 2 is running.
Task 3 is running.
Task 1 is running.
---------------------
🐢🐢🐢 0 🐉🐉🐉
Task 0 is running on thr: 6900
Task 1 is running on thr: 36304
Task 5 is running on thr: 36304
Task 3 is running on thr: 6900
Task 7 is running on thr: 6900
Task 2 is running on thr: 29376
Task 6 is running on thr: 36304
Task 4 is running on thr: 31416
🐢🐢🐢 1 🐉🐉🐉
Task 9 is running on thr: 29376
🐢🐢🐢 0 🐉🐉🐉
Task 8 is running on thr: 6900
🐢🐢🐢 2 🐉🐉🐉
🐢🐢🐢 5 🐉🐉🐉
🐢🐢🐢 6 🐉🐉🐉
🐢🐢🐢 4 🐉🐉🐉
🐢🐢🐢 5 🐉🐉🐉
🐢🐢🐢 3 🐉🐉🐉
🐢🐢🐢 6 🐉🐉🐉
🐢🐢🐢 7 🐉🐉🐉
🐢🐢🐢 8 🐉🐉🐉
🐢🐢🐢 9 🐉🐉🐉
sum: 90
```

> 如果不自己显式调用 `join()` ,而是等待线程池对象调用析构函数,那么效果如同 `asio::thread_pool`,会先进行 `stop`导致一些任务无法执行
> 如果等待线程池对象调用析构函数,那么效果如同 `asio::thread_pool`,会先进行 `stop`这可能导致一些任务无法执行。不过我们在最后**循环遍历了 `futures`**,调用 `get()` 成员函数,不存在这个问题
它支持**任意可调用类型**,当然也包括非静态成员函数。我们使用了 [`std::decay_t`](https://zh.cppreference.com/w/cpp/types/decay),所以参数的传递其实是**按值复制**,而不是引用传递,这一点和大部分库的设计一致。示例如下:

```cpp
struct X{
void f(const int& n)const{
std::cout << &n << '\n';
struct X {
void f(const int& n) const {
std::osyncstream{ std::cout } << &n << '\n';
}
};

X x;
int n = 6;
std::cout << &n << '\n';
pool.start();
pool.submit(&X::f, &x, n); // 默认复制,地址不同
pool.submit(&X::f, &x, std::ref(n));
pool.join();
int main() {
ThreadPool pool{ 4 }; // 创建一个有 4 个线程的线程池

X x;
int n = 6;
std::cout << &n << '\n';
auto t = pool.submit(&X::f, &x, n); // 默认复制,地址不同
auto t2 = pool.submit(&X::f, &x, std::ref(n));
t.wait();
t2.wait();
} // 析构自动 stop()
```
> [运行](https://godbolt.org/z/vTc7M8Kov)测试。
> [运行](https://godbolt.org/z/vY458T44e)测试。
我们的线程池的 `submit` 成员函数在传递参数的行为上,与先前介绍的 `std::thread` 和 `std::async` 等设施基本一致。
Expand All @@ -512,11 +500,7 @@ pool.join();
**外部接口:**
- **`stop()`**:停止线程池,通知所有线程退出(不会等待所有任务执行完毕)。

- **`join()`**:等待所有线程完成任务。

- **`submit()`**:将任务提交到任务队列,并返回一个`std::future`对象用于获取任务结果。

- **`submit()`**:将任务提交到任务队列,并返回一个`std::future`对象用于获取任务结果以及确保任务执行完毕。
- **`start()`**:启动线程池,创建并启动指定数量的线程。
我们并没有提供一个功能强大的所谓的“***调度器***”,我们只是利用条件变量和互斥量,让操作系统自行调度而已,它并不具备设置任务优先级之类的调度功能。
Expand Down

0 comments on commit 0b7e481

Please sign in to comment.