C++下线程池的一种实现解读
[TOC]
本文整理了线程池使用原因以及时机, 并介绍了一种开源实现.
背景
为什么要使用线程池?
传统多线程方案是一旦接受到多线程处理的请求之后, 立即创建一个新的线程, 由该线程执行任务. 任务执行完毕后, 线程退出, 这就是是 “即时创建,即时销毁“ 的策略. 尽管与创建进程相比, 创建线程的时间已经大大的缩短, 但是如果提交给线程的任务是执行时间较短, 而且执行次数极其频繁, 那么 CPU 将处于不停的创建线程, 销毁线程的状态. 实际的一种场景就是目前的大多数网络服务器, 包括 Web 服务器, Email 服务器以及数据库服务器等都具有一个共同点, 就是单位时间内必须处理数目巨大的连接请求, 但处理时间却相对较短.
我们将传统方案中的线程执行过程分为三个过程:$T_1,T_2,T_3$.
$T_1$:线程创建时间
$T_2$:线程执行时间,包括线程的同步等时间
$T_3$:线程销毁时间
因此, 管理线程本身的开销所占的比例为$\frac{T_1 + T_3}{T_1 + T_2 + T_3}$. 如果线程执行的时间很长的话, 这比开销可能占到20%-50%左右. 如果任务执行时间很短的话, 这笔开销将是不可忽略的.
除此之外, 线程池能够减少创建的线程个数. 通常线程池所允许的并发线程是有上界的, 如果同时需要并发的线程数超过上界, 那么一部分线程将会等待. 而传统方案中, 如果同时请求数目为2000, 那么最坏情况下, 系统可能需要产生2000个线程. 尽管这不是一个很大的数目, 但是也有部分机器可能达不到这种要求.
因此线程池的出现正是着眼于减少线程本身带来的开销. 线程池采用预创建的技术, 在应用程序启动之后, 将立即创建一定数量的线程(N1), 放入空闲队列中. 这些线程都是处于阻塞(Suspended)状态, 不消耗 CPU, 但占用较小的内存空间. 当任务到来后, 缓冲池选择一个空闲线程, 把任务传入此线程中运行. 当 N1 个线程都在处理任务后, 缓冲池自动创建一定数量的新线程, 用于处理更多的任务. 在任务执行完毕后线程也不退出, 而是继续保持在池中等待下一次的任务. 当系统比较空闲时, 大部分线程都一直处于暂停状态, 线程池自动销毁一部分线程, 回收系统资源.
基于这种预创建技术, 线程池将线程创建和销毁本身所带来的开销分摊到了各个具体的任务上, 执行次数越多, 每个任务所分担到的线程本身开销则越小, 不过我们另外可能需要考虑进去线程之间同步所带来的开销.
线程池适合场景:
事实上, 线程池并不是万能的. 它有其特定的使用场合. 线程池致力于减少线程本身的开销对应用所产生的影响, 这是有前提的, 前提就是线程本身开销与线程执行任务相比不可忽略. 如果线程本身的开销相对于线程任务执行开销而言是可以忽略不计的, 那么此时线程池所带来的好处是不明显的, 比如对于 FTP 服务器以及 Telnet 服务器, 通常传送文件的时间较长, 开销较大, 那么此时, 我们采用线程池未必是理想的方法, 我们可以选择 “即时创建, 即时销毁” 的策略 .
总之线程池通常适合下面的几个场合:
- 单位时间内处理任务频繁而且任务处理时间短.
- 对实时性要求较高. 如果接受到任务后在创建线程, 可能满足不了实时要求, 因此必须采用线程池进行预创建.
实现
github 上有个 5.2k stars 的实现, 非常简洁. 接下来对其进行解读. 源码如下, 仅有一个头文件 ThreadPool.h
.
1 |
|
调用的例子:
1 |
|
解读
ThreadPool 类
成员变量:
std::vector< std::thread > workers
: 用 vector 容器存放线程组, 线程个数 size_t threads
.
std::queue< std::function<void()> > tasks
: 用 queue 队列容器存放任务. 任务类型为std::function<void()>
函数对象, 函数对象的类型为 void()
也就是任意 callable 类型. 这个 queue 也是不同线程间共享的数据结构, 也就是说函数对象里自己需要处理的数据, 需要自己解决(例如包含进其参数里).
std::mutex queue_mutex
: 一个访问任务队列 queue 的互斥锁, 在插入任务或者线程取出任务都需要借助互斥锁进行安全访问.
std::condition_variable condition
: 一个用于通知线程任务队列状态的条件变量, 若有任务则通知线程池中一个线程去执行, 否则进入 wait 状态.
bool stop
: 标识线程池的状态, 线程池停止使用则置为 true, 这样 ThreadPool 类才能析构成功.
成员函数:
ThreadPool(size_t)
: 线程池的构造函数.
auto enqueue(F&& f, Args&&... args)
: 将任务添加到线程池的任务队列中, 供线程调用执行.
~ThreadPool()
: 线程池的析构函数.
下面对三个成员函数进行解读:
构造函数
构造函数中的 for(;;)
行表明了, 一旦构造出了 size_t threads = n
个 threads, 这些 threads(假设共有 $t_1,t_2…t_n$, $n$ 个 thread)会立即开启并且在 ThreadPool
类析构之前低成本地不停空转, 直到他们被分配到 task 然后执行 task.
这里用到了一些细节总结一下:
对于 thread 的 vector, 不使用
push_back()
而是使用emplace_back()
, 因为后者是支持 movable 对象的, 而前者只支持 copyable 对象. 而 thread 仅支持 movable 对象.通过 lambda 函数初始化 thread 对象. 接下来用 thread $t_1$ 为例子.
[this]
用来捕捉当前类中的this->stop
,this->tasks.empty()
等成员变量.[this]
后的{}
为 lambda 函数的函数体, 即 $t_1$ 需要进行的计算/运行内容.for(;;)
表明 $t_1$ 一直在运行(析构 detach 前).$t_1$ 需要执行 task, 也就是需要去访问 tasks 的 queue 给自己找活干. 为了防止 race condition(即便是对 queue 进行
empty()
这样的查询操作), 先用std::unique_lock
加锁, 使用std::unique_lock
而不是std::lock_guard
的原因是std::unique_lock
对 lock 与 unlock 的使用更自由. 下面则是std::unique_lock
wait()
函数原型, 其谓词判断p
(这里为return this->stop || !this->tasks.empty();
, 即线程池被停止了或者任务队列不为空) 为 false 时才会进入wait(lock, p)
状态. 当前线程阻塞直至条件变量被通知. 对wait()
函数中又一次使用了 lambda 函数, 形式与上一个 lambda 函数类似, 这里不再赘述.1
2
3
4
5
6
7
8
9
10// wait 函数的原型
template< class Predicate >
void wait( std::unique_lock<std::mutex>& lock, Predicate pred );
// 简单版
void wait( std::unique_lock<std::mutex>& lock );
// 复杂版等效于
while (!pred()) {
wait(lock);
}
// 实现功能:wait 导致当前线程阻塞直至条件变量被通知,或虚假唤醒发生,可选地循环直至满足某谓词.若后续条件变量来了通知(notify), 线程就会继续往下进行: 若线程池已经被停止了且任务队列为空,则线程返回结束, 没必要进行死循环浪费资源.
1
2if(this->stop && this->tasks.empty())
return;task = std::move(this->tasks.front());
用之前建好的 task 取出队列中的最前面的 task. 这里使用std::move
匹配std::packaged_task
的构造函数参数类型.this->tasks.pop();
然后再将被领走的 task pop 出去, 这里的设计可以避免 《C++ 并发实战》中所说的悬空问题(也就是先取出接稳了,再删除顶部元素).task();
,执行 task. 注意task();
并没有离开for(;;)
作用域, 也就是一旦 task 执行完, $t_1$ 会进行下一次空转/领活/结束的判断/循环中.
添加任务
下面5行其实是模板编程的常规操作, 主要是类型推断与适配. 具体地
F
为 函数对象,... Args
为F
的不定参数, 通过std::result_of<F(Args...)>::type
推断执行F(Args...)
后返回的类型, 用using
将其定义为return_type
.ThreadPool::enqueue()
函数的返回值类型为future
,future
需要被特化成std::future<typename std::result_of<F(Args...)>::type>
. 通过auto ->
可以推断出ThreadPool::enqueue()
函数的返回值类型.1
2
3
4
5template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;下面代码需要一步步拆解.
1
2
3
4
5
6
7
8
9
10auto task =
std::make_shared
<
std::packaged_task<return_type()>
>
(
std::bind(
std::forward<F>(f),
std::forward<Args>(args)...)
);考虑内存安全性, 效率, 我们选择对 task 进行智能指针的包装. 在使用智能指针之前对 task 使用
std::packaged_task
包装(把输入的F
与... Args
变为 C++ 认识的规范并发任务对象), 获得其能导出 future 的特性.std::packaged_task
对象的构建需要一个函数对象, 然而F
与... Args
分别为函数对象及其参数, 不符合std::packaged_task
构造函数要求. 因此使用std::bind
将其绑定为一个函数对象. 但是考虑到F
与... Args
是右值引用&&
, 直接绑定的话可能导致引用折叠导致类型匹配失败, 因此又对F
与... Args
进行了std::forward
完美转发.std::packaged_task
构造也需要知道函数对象的类型, 也就是return_type()
类型. 至此std::make_shared
之后的部分已经拆解完毕,std::make_shared
让std::packaged_task
成为智能指针指向的对象, 最终得到 task .下面是
std::packaged_task
的使用示例. 可以看到std::packaged_task
构造后通过get_future()
获取 future 对象(也就是代码中的std::future<return_type> res = task->get_future();
), 这也是整个ThreadPool::enqueue
函数返回的对象(一个 future ), 然后通过.get()
函数即可获取 $t_1$ 的 future 里的计算结果.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26int countdown (int from, int to) {
for (int i=from; i!=to; --i) {
std::cout << i << '\n';
std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::cout << "Lift off!\n";
return from-to;
}
int main ()
{
std::packaged_task<int(int,int)> tsk (countdown); // set up packaged_task
std::future<int> ret = tsk.get_future(); // get future
std::thread th (std::move(tsk),10,0); // spawn thread to count down from 10 to 0
// ...
int value = ret.get(); // wait for the task to finish and get result
std::cout << "The countdown lasted for " << value << " seconds.\n";
th.join();
return 0;
}下面是对 task 的队列进行线程安全的
emplace
操作. 先使用std::unique_lock
加锁. 然后进行stop
与否的判断. 但是这时的 task 还是个std::packaged_task
对象, 不是一个可执行的函数对象, 这里代码中通过一个 lambda 函数进行了转换,(*task)()
变成F(...Args)
的形式.1
2
3
4
5std::unique_lock<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });condition.notify_one();
唤醒下一个线程.
析构函数
- 锁住队列后(很警惕), 把
stop
标志位设置为 true. condition.notify_all();
把这个变化告知所有的 threads.- 然后销毁所有 threads, 剩下的交给 RAII 完成析构.
例子运行
对例子编译后得到 a.out
, 运行后的可能的一个结果:
1 | g++ -std=c++11 -pthread main.cpp |
1 | hello 1 |
参考链接:
https://blog.csdn.net/caoshangpa/article/details/80374651
https://www.cnblogs.com/chenleideblog/p/12915534.html