ThreadPool
线程池项目
基本概念
线程池的概念:
线程池是一种用于优化线程管理和任务调度的并发编程机制,其核心在于复用线程、控制资源使用,并提升系统性能。
说白了就是,为了防止这么构造线程的浪费太多的时间,先把线程创建好,放在那等着,有任务提交的话就直接处理,省去了构造线程的过程
那么我们就可以想到一个线程池的最基础的构造
变量:
- 一个vector 用来存储构造好的线程
- 一个queue 用来存储task (利用先进先出的思想,先来先服务 嘛 )
- mutex 有了queue自然得考虑到互斥的问题
- condition_variable 条件变量 同mutex理
- flag 停止标识
函数:
- 首先得有初始化函数,用来构造线程 (此处可以使用类的构造函数)
- 其次得有线程的释放join 的函数 (此处可以使用类的析构函数)
- 还有得有添加任务的函数
ThreadPool声明
通过前面描述来声明一下CLASS
class ThreadPool{ |
这里其他的都比较好理解,主要是enqueue函数难以理解,这个先放一放,后面再详细解释
ThreadPool构造函数,初始化函数的实现
这个其实没啥说的,很常规的东西
inline ThreadPool::ThreadPool(size_t threadsNum):stop(false){ |
~ThreadPool析构函数,函数的释放
emmm 这个也常规
inline ThreadPool::~ThreadPool() |
enqueue 添加任务功能的实现
这里就是最为复杂的地方,本来也是挺常规的,正常的传入(无参void函数)就行了,但是确实会出现问题,这样的话就只能处理 无参void函数 这一种类型了,这显然很有局限性。因此便有了这样一种书写方式
基本思想:将一个带参数的有返回值的函数,转化一下,转化为 无参void函数 就行了
先把代码发放上来,稍后一句一句分析
template <class F, class... Args> |
首先看一下函数的声明
template <class F, class... Args> |
定义的是一个模板,template <class F, class… Args> 两个参数
- class F:接受任意类型的可调用对象(函数、Lambda、函数对象等)。
- class… Args:接受任意数量和类型的参数包。
函数参数
- F&& f:万能引用(不是右值引用!),可以绑定到任意类型的可调用对象(左值或右值)。
- Args&&… args:参数包的万能引用,保留参数的原始值类别(左值/右值)。
注意:这里的返回值是auto 是根据箭头后面的来推理出来的
- -> std::future<typename std::result_of<F(Args…)>::type>
- future用于异步获取进程的返回值
- result_of获取函数 F(Args…) 的返回值
using return_type = typename std::result_of<F(Args...)>::type; |
auto task = std::make_shared<std::packaged_task<return_type()>>( |
packaged_task包装,这个下面解释,先解释一下bind函数
bind函数,这里使用的很巧妙
这里可以看一下bind的用法,这个挺重要的 https://blog.csdn.net/qq_38410730/article/details/103637778
这里就介绍一下bind在这里的用法
对于一个函数
void fun_1(int x,int y,int z) { |
这里是有三个参数的,x y z ,我们通过 bind 来做处理
//表示绑定函数 fun 的第三个参数为 3,而fun 的第一,二个参数分别由调用 f2 的第一,二个参数指定 |
这里的用法是一样的:
std::bind(std::forward<F>(f), std::forward<Args>(args)...) |
实现了从有参到无参数的转化
继续:
//从 packaged_task 中提取 future,用户可通过 res.get() 获取最终结果。 |
这里解释一下上面说的 packaged_task
std::packaged_task<return_type()> 的核心目的是将任意函数 包装成异步任务,并自动关联一个 std::future 用于获取结果。
- std::packaged_task<return_type()> task(func); // 包装为 packaged_task
- 此时,task 的签名是 return_type(),即调用 task() 会执行 func 并返回结果。
- packaged_task 内部维护一个 共享状态(shared state),当调用 task() 执行函数时:
- 执行绑定的函数 func
- 将函数返回值存入共享状态
- 唤醒通过 future 等待该结果的线程
为什么上面的语句 未执行就能获取 future
std::future
- future 是一个 结果的占位符,它的有效性从 packaged_task 创建时就开始,与任务执行时机无关。调用 res.get() 时:
- 若任务已执行:立即返回结果
- 若任务未执行:阻塞等待结果
为什么要用make_shared管理
auto task = std::make_shared<std::packaged_task<return_type()>>(…);
- 确保 packaged_task 存活至任务被执行
- 不用手动释放(只能指针的妙用嘛)
继续:
{ |
前面不用多说,主要是最后一行
task经过之前的处理,已经相当于一个 函数指针 了,这里直接调用 (*task)() 他的返回值由 packaged_task 管理
我们整体来看:Lambda 表达式 task { (*task)(); } 就是一个无参数,无返回值的函数 因此 可以正常的 放到 this->tasks里面
继续:
// 唤醒一个正在等待的 worker 线程来处理新任务。 |
没啥好说的
我们最后再整体看一下代码:
template <class F, class... Args> |
整理流程:
- 用户调用 enqueue
- 提交函数 f 和参数 args,要求返回 future。
- 创建 packaged_task
- 将 f(args…) 绑定为 return_type() 类型的任务。
- 获取 future
- 立即通过 task->get_future() 获取结果占位符。
- 封装为 void() 任务
- 将任务包装为 task{ (*task)(); },符合队列类型。
- 任务入队与通知
- 唤醒线程池中的工作线程。
- 任务执行
- 工作线程调用 (*task)(),执行用户函数并存储结果到共享状态。
- 用户获取结果
- 用户通过 future.get() 等待或获取结果。
main测试
#include <iostream>
#include "include/threadPool.h"
using namespace std;
int main(){
// 测试1:基本任务执行和返回值
{
ThreadPool pool(2);
auto future_add = pool.enqueue([](int a, int b) {
return a + b;
}, 2, 3);
int result = future_add.get();
std::cout << "Test 1: 2 + 3 = " << result << std::endl; // 预期输出5
}
// 测试2:并行执行多个任务
{
const int num_tasks = 8;
ThreadPool pool(4);
std::vector<std::future<int>> futures;
for (int i = 0; i < num_tasks; ++i) {
futures.emplace_back(pool.enqueue([i] {
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟耗时操作
return i * i;
}));
}
std::cout << "Test 2: Results: ";
for (auto& fut : futures) {
std::cout << fut.get() << " "; // 预期输出0 1 4 9 16 25 36 49(顺序可能不同)
}
std::cout << "\n";
}
// 测试3:异常传播
{
ThreadPool pool(2);
auto future_exception = pool.enqueue([] {
throw std::runtime_error("Intentional exception");
return 0;
});
try {
future_exception.get();
} catch (const std::exception& e) {
std::cout << "Test 3: Caught exception: " << e.what() << std::endl;
}
}
// 测试4:线程池自动回收(通过作用域生命周期)
{
std::cout << "Test 4: Creating nested thread pool..." << std::endl;
{
ThreadPool inner_pool(2);
inner_pool.enqueue([] {
std::cout << "Task executed in inner pool" << std::endl;
});
} // 此处inner_pool析构,自动等待任务完成
std::cout << "Inner pool destroyed" << std::endl;
}
std::cout << "All tests completed successfully." << std::endl;
return 0;
}
cd build
cmake ..
make
./threadPool