C++下线程池的一种实现解读

C++下线程池的一种实现解读

[TOC]

本文整理了线程池使用原因以及时机, 并介绍了一种开源实现.

背景

为什么要使用线程池?
传统多线程方案是一旦接受到多线程处理的请求之后, 立即创建一个新的线程, 由该线程执行任务. 任务执行完毕后, 线程退出, 这就是是 “即时创建,即时销毁“ 的策略. 尽管与创建进程相比, 创建线程的时间已经大大的缩短, 但是如果提交给线程的任务是执行时间较短, 而且执行次数极其频繁, 那么 CPU 将处于不停的创建线程, 销毁线程的状态. 实际的一种场景就是目前的大多数网络服务器, 包括 Web 服务器, Email 服务器以及数据库服务器等都具有一个共同点, 就是单位时间内必须处理数目巨大的连接请求, 但处理时间却相对较短.

我们将传统方案中的线程执行过程分为三个过程:$T_1,T_2,T_3$.
$T_1$:线程创建时间
$T_2$:线程执行时间,包括线程的同步等时间
$T_3$:线程销毁时间
因此, 管理线程本身的开销所占的比例为$\frac{T_1 + T_3}{T_1 + T_2 + T_3}$. 如果线程执行的时间很长的话, 这比开销可能占到20%-50%左右. 如果任务执行时间很短的话, 这笔开销将是不可忽略的.
除此之外, 线程池能够减少创建的线程个数. 通常线程池所允许的并发线程是有上界的, 如果同时需要并发的线程数超过上界, 那么一部分线程将会等待. 而传统方案中, 如果同时请求数目为2000, 那么最坏情况下, 系统可能需要产生2000个线程. 尽管这不是一个很大的数目, 但是也有部分机器可能达不到这种要求.

因此线程池的出现正是着眼于减少线程本身带来的开销. 线程池采用预创建的技术, 在应用程序启动之后, 将立即创建一定数量的线程(N1), 放入空闲队列中. 这些线程都是处于阻塞(Suspended)状态, 不消耗 CPU, 但占用较小的内存空间. 当任务到来后, 缓冲池选择一个空闲线程, 把任务传入此线程中运行. 当 N1 个线程都在处理任务后, 缓冲池自动创建一定数量的新线程, 用于处理更多的任务. 在任务执行完毕后线程也不退出, 而是继续保持在池中等待下一次的任务. 当系统比较空闲时, 大部分线程都一直处于暂停状态, 线程池自动销毁一部分线程, 回收系统资源.

基于这种预创建技术, 线程池将线程创建和销毁本身所带来的开销分摊到了各个具体的任务上, 执行次数越多, 每个任务所分担到的线程本身开销则越小, 不过我们另外可能需要考虑进去线程之间同步所带来的开销.

线程池适合场景:
事实上, 线程池并不是万能的. 它有其特定的使用场合. 线程池致力于减少线程本身的开销对应用所产生的影响, 这是有前提的, 前提就是线程本身开销与线程执行任务相比不可忽略. 如果线程本身的开销相对于线程任务执行开销而言是可以忽略不计的, 那么此时线程池所带来的好处是不明显的, 比如对于 FTP 服务器以及 Telnet 服务器, 通常传送文件的时间较长, 开销较大, 那么此时, 我们采用线程池未必是理想的方法, 我们可以选择 “即时创建, 即时销毁” 的策略 .
总之线程池通常适合下面的几个场合:

  • 单位时间内处理任务频繁而且任务处理时间短.
  • 对实时性要求较高. 如果接受到任务后在创建线程, 可能满足不了实时要求, 因此必须采用线程池进行预创建.

实现

github 上有个 5.2k stars 的实现, 非常简洁. 接下来对其进行解读. 源码如下, 仅有一个头文件 ThreadPool.h.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

class ThreadPool {
public:
ThreadPool(size_t);
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
private:
// need to keep track of threads so we can join them
std::vector< std::thread > workers;
// the task queue
std::queue< std::function<void()> > tasks;

// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for(size_t i = 0;i<threads;++i)
workers.emplace_back(
[this]
{
for(;;)
{
std::function<void()> task;

{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}

task();
}
}
);
}

// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;

auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);

std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);

// don't allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");

tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(std::thread &worker: workers)
worker.join();
}

#endif

调用的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <iostream>
#include <vector>
#include <chrono>

#include "ThreadPool.h"

int main()
{

ThreadPool pool(4);
std::vector< std::future<int> > results;

for(int i = 0; i < 8; ++i) {
results.emplace_back(
pool.enqueue([i] {
std::cout << "hello " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "world " << i << std::endl;
return i*i;
})
);
}

for(auto && result: results)
std::cout << result.get() << ' ';
std::cout << std::endl;

return 0;
}

解读

ThreadPool 类

成员变量:

std::vector< std::thread > workers: 用 vector 容器存放线程组, 线程个数 size_t threads.

std::queue< std::function<void()> > tasks: 用 queue 队列容器存放任务. 任务类型为std::function<void()>函数对象, 函数对象的类型为 void() 也就是任意 callable 类型. 这个 queue 也是不同线程间共享的数据结构, 也就是说函数对象里自己需要处理的数据, 需要自己解决(例如包含进其参数里).

std::mutex queue_mutex: 一个访问任务队列 queue 的互斥锁, 在插入任务或者线程取出任务都需要借助互斥锁进行安全访问.

std::condition_variable condition: 一个用于通知线程任务队列状态的条件变量, 若有任务则通知线程池中一个线程去执行, 否则进入 wait 状态.

bool stop: 标识线程池的状态, 线程池停止使用则置为 true, 这样 ThreadPool 类才能析构成功.

成员函数:

ThreadPool(size_t): 线程池的构造函数.

auto enqueue(F&& f, Args&&... args): 将任务添加到线程池的任务队列中, 供线程调用执行.

~ThreadPool(): 线程池的析构函数.

下面对三个成员函数进行解读:

构造函数

构造函数中的 for(;;) 行表明了, 一旦构造出了 size_t threads = n 个 threads, 这些 threads(假设共有 $t_1,t_2…t_n$, $n$ 个 thread)会立即开启并且在 ThreadPool 类析构之前低成本地不停空转, 直到他们被分配到 task 然后执行 task.

这里用到了一些细节总结一下:

  1. 对于 thread 的 vector, 不使用 push_back() 而是使用 emplace_back(), 因为后者是支持 movable 对象的, 而前者只支持 copyable 对象. 而 thread 仅支持 movable 对象.

  2. 通过 lambda 函数初始化 thread 对象. 接下来用 thread $t_1$ 为例子. [this] 用来捕捉当前类中的 this->stop, this->tasks.empty() 等成员变量.[this]后的 {} 为 lambda 函数的函数体, 即 $t_1$ 需要进行的计算/运行内容.

  3. for(;;) 表明 $t_1$ 一直在运行(析构 detach 前).

  4. $t_1$ 需要执行 task, 也就是需要去访问 tasks 的 queue 给自己找活干. 为了防止 race condition(即便是对 queue 进行 empty() 这样的查询操作), 先用 std::unique_lock 加锁, 使用 std::unique_lock 而不是 std::lock_guard 的原因是 std::unique_lock 对 lock 与 unlock 的使用更自由. 下面则是 std::unique_lock wait() 函数原型, 其谓词判断 p(这里为 return this->stop || !this->tasks.empty();, 即线程池被停止了或者任务队列不为空) 为 false 时才会进入 wait(lock, p) 状态. 当前线程阻塞直至条件变量被通知. 对 wait() 函数中又一次使用了 lambda 函数, 形式与上一个 lambda 函数类似, 这里不再赘述.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // wait 函数的原型
    template< class Predicate >
    void wait( std::unique_lock<std::mutex>& lock, Predicate pred );
    // 简单版
    void wait( std::unique_lock<std::mutex>& lock );
    // 复杂版等效于
    while (!pred()) {
    wait(lock);
    }
    // 实现功能:wait 导致当前线程阻塞直至条件变量被通知,或虚假唤醒发生,可选地循环直至满足某谓词.
  5. 若后续条件变量来了通知(notify), 线程就会继续往下进行: 若线程池已经被停止了且任务队列为空,则线程返回结束, 没必要进行死循环浪费资源.

    1
    2
    if(this->stop && this->tasks.empty())
    return;
  6. task = std::move(this->tasks.front()); 用之前建好的 task 取出队列中的最前面的 task. 这里使用 std::move 匹配 std::packaged_task 的构造函数参数类型. this->tasks.pop(); 然后再将被领走的 task pop 出去, 这里的设计可以避免 《C++ 并发实战》中所说的悬空问题(也就是先取出接稳了,再删除顶部元素).

  7. task();,执行 task. 注意 task(); 并没有离开 for(;;) 作用域, 也就是一旦 task 执行完, $t_1$ 会进行下一次空转/领活/结束的判断/循环中.

添加任务

  1. 下面5行其实是模板编程的常规操作, 主要是类型推断与适配. 具体地 F 为 函数对象,... ArgsF 的不定参数, 通过 std::result_of<F(Args...)>::type 推断执行 F(Args...)后返回的类型, 用 using 将其定义为 return_type .ThreadPool::enqueue() 函数的返回值类型为 future, future 需要被特化成 std::future<typename std::result_of<F(Args...)>::type>. 通过 auto -> 可以推断出 ThreadPool::enqueue() 函数的返回值类型.

    1
    2
    3
    4
    5
    template<class F, class... Args>
    auto ThreadPool::enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>
    {
    using return_type = typename std::result_of<F(Args...)>::type;
  2. 下面代码需要一步步拆解.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    auto task = 
    std::make_shared
    <
    std::packaged_task<return_type()>
    >
    (
    std::bind(
    std::forward<F>(f),
    std::forward<Args>(args)...)
    );

    考虑内存安全性, 效率, 我们选择对 task 进行智能指针的包装. 在使用智能指针之前对 task 使用 std::packaged_task 包装(把输入的 F... Args 变为 C++ 认识的规范并发任务对象), 获得其能导出 future 的特性. std::packaged_task 对象的构建需要一个函数对象, 然而 F... Args 分别为函数对象及其参数, 不符合 std::packaged_task 构造函数要求. 因此使用 std::bind 将其绑定为一个函数对象. 但是考虑到 F... Args 是右值引用 &&, 直接绑定的话可能导致引用折叠导致类型匹配失败, 因此又对 F... Args进行了 std::forward 完美转发. std::packaged_task 构造也需要知道函数对象的类型, 也就是 return_type() 类型. 至此 std::make_shared 之后的部分已经拆解完毕, std::make_sharedstd::packaged_task 成为智能指针指向的对象, 最终得到 task .

  3. 下面是 std::packaged_task 的使用示例. 可以看到 std::packaged_task 构造后通过 get_future()获取 future 对象(也就是代码中的 std::future<return_type> res = task->get_future();), 这也是整个 ThreadPool::enqueue 函数返回的对象(一个 future ), 然后通过 .get() 函数即可获取 $t_1$ 的 future 里的计算结果.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    int countdown (int from, int to) {
    for (int i=from; i!=to; --i) {
    std::cout << i << '\n';
    std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    std::cout << "Lift off!\n";
    return from-to;
    }

    int main ()
    {
    std::packaged_task<int(int,int)> tsk (countdown); // set up packaged_task
    std::future<int> ret = tsk.get_future(); // get future

    std::thread th (std::move(tsk),10,0); // spawn thread to count down from 10 to 0

    // ...

    int value = ret.get(); // wait for the task to finish and get result

    std::cout << "The countdown lasted for " << value << " seconds.\n";

    th.join();

    return 0;
    }
  4. 下面是对 task 的队列进行线程安全的 emplace 操作. 先使用 std::unique_lock 加锁. 然后进行 stop 与否的判断. 但是这时的 task 还是个 std::packaged_task 对象, 不是一个可执行的函数对象, 这里代码中通过一个 lambda 函数进行了转换, (*task)() 变成 F(...Args) 的形式.

    1
    2
    3
    4
    5
    std::unique_lock<std::mutex> lock(queue_mutex);
    // don't allow enqueueing after stopping the pool
    if(stop)
    throw std::runtime_error("enqueue on stopped ThreadPool");
    tasks.emplace([task](){ (*task)(); });
  5. condition.notify_one(); 唤醒下一个线程.

析构函数

  1. 锁住队列后(很警惕), 把 stop 标志位设置为 true.
  2. condition.notify_all(); 把这个变化告知所有的 threads.
  3. 然后销毁所有 threads, 剩下的交给 RAII 完成析构.

例子运行

对例子编译后得到 a.out, 运行后的可能的一个结果:

1
g++ -std=c++11 -pthread main.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
hello 1
hello hello 20

hello 3
world world world world 1302

hello 4

hello
5
hello hello 7
6
0 1 4 9 world 4
world 5
16 25 world 7
world 6
36 49

参考链接:
https://blog.csdn.net/caoshangpa/article/details/80374651
https://www.cnblogs.com/chenleideblog/p/12915534.html

作者

cx

发布于

2022-02-28

更新于

2022-07-16

许可协议