扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
下面的线程池是基于C++11的线程池。编写的思路跟前文的pthread实现的线程池基本一致。
#include#include#include#include#include#include#include
templateclass ThreadPool
{public:
ThreadPool(size_t thread_nums = std::thread::hardware_concurrency())
:_thread_nums(thread_nums)
, _stop(false)
{for (size_t i = 0; i< _thread_nums; ++i)
{ _vt.emplace_back(std::thread(&ThreadPool::LoopWork, this));//将this指针也传过去
}
}
//禁用拷贝构造和operator=
ThreadPool(const ThreadPool &) = delete;
ThreadPool& operator=(const ThreadPool &) = delete;
private:
//线程的执行函数
void LoopWork()
{std::unique_lockul(_mtx);
for (;;)
{ while (_taskQueue.empty() && !_stop)
{ _isEmpty.wait(ul);
}
//线程从wait中出来有2种情况: 1.有任务了 2.线程池stop为true
if (_taskQueue.empty()) { ul.unlock(); //退出前要先解锁
break;
}
else { T* task = std::move(_taskQueue.front());
_taskQueue.pop();
ul.unlock();
(*task)(); //任务类需要重载operator()()
ul.lock();
}
}
}
public:
void PushTask(T& task)
{//通过{}控制lock_guard的作用域和生命周期
{ std::lock_guardlg(_mtx);
_taskQueue.push(&task); //任务队列是临界资源(其他地方会修改)
}
_isEmpty.notify_one(); //条件变量的通知并不会因为多线程而影响结果(因此可以不加锁)
}
~ThreadPool()
{_stop = true;
_isEmpty.notify_all();
for (size_t i = 0; i< _thread_nums; ++i)
{ if (_vt[i].joinable()) { _vt[i].join();
}
}
}
private:
size_t _thread_nums;
std::vector_vt;
std::queue_taskQueue;
std::mutex _mtx;
std::condition_variable _isEmpty;
std::atomic_stop;
};
struct Task {Task(int x, int y)
:_x(x),
_y(y)
{}
void operator()() {std::cout<< _x<< " + "<< _y<< " = "<< _x + _y<< std::endl;
}
int _x;
int _y;
};
int main()
{std::shared_ptr>tp(new ThreadPool());
Task t(1, 2);
tp->PushTask(t);
}
输出结果:
1 + 2 = 3
2. 优化1 - 支持任意类型任务
在前文的pthread线程池以及根据pthread改写的C++11线程池都是基于模板实现的。因为线程池需要能够接收不同类型的任务。但是将整个ThreadPool
类设为模板其实不是最优解,因为当ThreadPool需要处理其它类型的任务时,还需要再实例化出一个新的ThreadPool类。它并没有实现一个线程池实例接收多种类型任务的功能。它实现的是多个线程池实例接收多种类型任务,每个线程池本质是只处理一种任务。
为了解决上述问题,我们不应该将整个ThreadPool都设为模板类,我们的思路是让任务队列能够存放不同类型的任务。这里使用到了C++11中的function
包装器,让任务队列里存放function
类型的任务。而在PushTask()
方法里,我们需要让该函数能够接收任意类型的任务,因此需要单独将PushTask()
方法设为模板函数。
#include#include#include#include#include#include#include
class ThreadPool
{public:
ThreadPool(size_t thread_nums = std::thread::hardware_concurrency())
:_thread_nums(thread_nums)
, _stop(false)
{for (size_t i = 0; i< _thread_nums; ++i)
{ _vt.emplace_back(std::thread(&ThreadPool::LoopWork, this));//将this指针也传过去
}
}
//禁用拷贝构造和operator=
ThreadPool(const ThreadPool &) = delete;
ThreadPool& operator=(const ThreadPool &) = delete;
private:
//线程的执行函数
void LoopWork()
{std::unique_lockul(_mtx);
for (;;)
{ while (_taskQueue.empty() && !_stop)
{ _isEmpty.wait(ul);
}
//线程从wait中出来有2种情况: 1.有任务了 2.线程池stop为true
if (_taskQueue.empty()) { ul.unlock(); //退出前要先解锁
break;
}
else { auto task = std::move(_taskQueue.front());
_taskQueue.pop();
ul.unlock();
task();
ul.lock();
}
}
}
public:
templatevoid PushTask(F&& task)
{{ std::lock_guardlg(_mtx);
_taskQueue.push(std::forward(task)); //任务队列是临界资源(其他地方会修改)
}
_isEmpty.notify_one(); //条件变量的通知并不会因为多线程而影响结果(因此可以不加锁)
}
~ThreadPool()
{_stop = true;
_isEmpty.notify_all();
for (size_t i = 0; i< _thread_nums; ++i)
{ if (_vt[i].joinable()) { _vt[i].join();
}
}
}
private:
size_t _thread_nums;
std::vector_vt;
std::queue>_taskQueue;//每个任务必须保证是: void返回值、无参
std::mutex _mtx;
std::condition_variable _isEmpty;
std::atomic_stop;
};
• 优化1 - 测试
std::mutex gb_mtx;
void Add(int x, int y)
{std::lock_guardlg(gb_mtx); //为了保证输出结果不会打印错乱
std::cout<< x<< " + "<< y<< " = "<< x + y<< std::endl;
}
int main()
{std::shared_ptrtp(new ThreadPool());
for (int i = 0; i< 10; ++i)
{auto f = bind(Add, i, i + 1); //我们需要手动绑定一下函数参数
tp->PushTask(f);
}
return 0;
}
输出结果:
0 + 1 = 1
1 + 2 = 3
2 + 3 = 5
3 + 4 = 7
4 + 5 = 9
5 + 6 = 11
6 + 7 = 13
7 + 8 = 15
8 + 9 = 17
9 + 10 = 19
3. 优化2 - 支持可变参数
前一个版本的线程池是支持任意类型的任务的,但是我们在使用线程池的时候必须要手动bind一个函数对象,然后再传过去(这是因为任务队列要求任务必须是void返回值且无参数)。为了优化这个问题,我们可以使用可变参数模板+bind解决。下面给出PushTask()
方法的优化:(除了PushTask, 其它地方没有任何修改)
templatevoid PushTask(F&& task, Args&&... args)
{//将可变参数包装起来, 包装后func的类型满足了"void返回值+无参"
auto func = std::bind(std::forward(task), std::forward(args)...);
{ std::lock_guardlg(_mtx);
_taskQueue.push(std::forward(func)); //传入我们包装好的函数对象
}
_isEmpty.notify_one(); //条件变量的通知并不会因为多线程而影响结果(因此可以不加锁)
}
• 优化2 - 测试
std::mutex gb_mtx;
void Add(int x, int y)
{std::lock_guardlg(gb_mtx); //为了保证输出结果不会打印错乱
std::cout<< x<< " + "<< y<< " = "<< x + y<< std::endl;
}
int main()
{std::shared_ptrtp(new ThreadPool());
for (int i = 0; i< 10; ++i)
{//auto f = bind(Add, i, i + 1);
//tp->PushTask(f);
tp->PushTask(Add, i, i + 1); //我们不需要手动绑定了, 直接传参即可
}
return 0;
}
输出结果与测试1的相同。
4. 优化3 - 通过future获取任务函数的返回值
在优化2的基础上,为了能够接收任务函数的返回值,并且还不能让线程阻塞。这里需要使用线程异步。我们在PushTask()
中需要返回一个future
类型的对象。下面是对PushTask()
方法的修改。(除了PushTask, 其它地方没有任何修改)
注意: 记得包含一下
头文件。
templateauto PushTask(F&& task, Args&&... args) ->std::future{//将可变参数包装起来
auto func = std::bind(std::forward(task), std::forward(args)...);
auto task_ptr = std::make_shared>(func); //(1)
std::functionwrapper_func = [task_ptr] { (*task_ptr)();
};//(2)
{ std::lock_guardlg(_mtx);
_taskQueue.push(wrapper_func);
}
_isEmpty.notify_one(); //条件变量的通知并不会因为多线程而影响结果(因此可以不加锁)
return task_ptr->get_future();//(3)
}
在优化2的基础上,我们从(1)开始看。我们使用make_shared
构造了一个share_ptr对象, 方便我们管理。这个智能指针的类型是package_task
。详细介绍看这里: C++11 线程异步。这个package_task
中包装的函数对象的类型为decltype(task(args...))()
。这个类型的含义为: 该函数对象的返回值的类型为task(args...)
,函数参数为空。
(2)是对智能指针再次进行了一层封装,目的为了能够将其作为参数传给任务队列。实际上这里可以省略不写的,然后更改_taskQueue.push(wrapper_func);
为
_taskQueue.push([task_ptr]{(*task_ptr)();
});
(3)返回智能指针管理的package_task的future对象。
• 优化3 - 测试
int Add(int x, int y)
{return x + y;
}
int main()
{std::shared_ptrtp(new ThreadPool());
std::vector>res; //future保存的就是函数的返回值
for (int i = 0; i< 10; ++i)
{res.push_back(tp->PushTask(Add, i, i + 1));
}
for (auto& e : res)
{std::cout<< e.get()<< std::endl;
}
return 0;
}
输出结果:
1
3
5
7
9
11
13
15
17
19
5. 总结
我们实现的pthread的线程池是基于模板的,因此当有不同类型的任务想要添加到线程池处理时,我们就必须额外创建一个对应类型的线程池对象,这个过程往往不是我们想看到的。为了解决这种问题,我们给出了三种递进式优化方案,每种都是基于前面一种的基础上作的进一步优化。
优化1: 支持任意类型的任务函数。它的本质是让任务队列能够接收任意类型的任务函数,我们这里使用了function
作为任务队列的类型,也就是说我们在PushTask()
时需要事前包装好任务函数,保证其类型为void()
。(返回值为void, 参数为空)
优化2: 支持可变参数。这是通过可变参数模板+bind实现的。我们允许使用者直接传任务函数以及它的任意个参数。我们需要在push任务前包装好任务函数,保证任务函数为void()
类型。而如何包装? 这里就需要使用bind进行包装了。auto func = std::bind(std::forward
优化3: 通过future获取任务函数的返回值。这是通过线程异步实现的。获取子线程执行完毕的函数的返回值的方法就是使用异步。因此我们将func
任务函数 包装到了package_task
任务包当中,package_task
中保存了future对象。这里我们使用了智能指针去管理package_task
。由于我们最终的目的是需要将一个类型为void()
的任务添加到任务队列。因此这里我们使用了lambda表达式对智能指针再次进行了一层封装,里面包装了智能指针去调用任务函数的过程。lambada表达式的类型就是void()
,因此我们可以将其添加到任务队列当中。后续取出任务后,直接像调用函数一样执行任务即可。(因为任务队列当中存的都是一个个的function
杂谈:
个人觉得能够掌握前两种优化就够了。第三种相对比较难懂一些,并且没有太大的必要。我们之所以会实现第三种是因为想要获取任务函数的返回值! 因为我们在优化2
的状态下也能够获取到任务的返回值。我们可以传入一个输出型参数即可。示例如下:
void Add(int x, int y, int& res)
{res = x + y;
}
int main()
{std::shared_ptr>tp(new ThreadPool());
int res;
tp->PushTask(Add, 1, 2, std::ref(res)); //注意, 这里必须要使用``ref()``引用传参!!!
std::cout<< res<< std::endl;
return 0;
}
输出结果: 3
注: 还有很多人会将taskQueue
任务队列封装成一个类*(SafeQueue)*,然后封装任务队列的各种接口,将各种加锁、解锁操作都封装到了该类里面。外界在使用该类时,不需要再额外考虑锁的问题了。
参考:
Github: mtrebi/thread-pool: Thread pool implementation using c++11 threads
https://zhuanlan.zhihu.com/p/367309864
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流