C++并发实战-第四章-总学习笔记第2

[TOC]
C++ 并发实战《C++ Concurrency in Action》的学习笔记2, 记录第四章的部分.
内容是 C++ 线程间的同步: condition variable, std::future,std::async, std::promise, std::packaged_task, Functional Programming 与 Communicating Sequential Processes 简介, latch, barrier 等.

4. Synchronizing concurrent operations

通过反复检查 shared flag 的方式来实现线程间通信, 既浪费宝贵的 CPU 时间, 又会在检查时持有锁, 阻塞本该去更新 flag 的线程. 如何不靠轮询也能实现线程间通信呢?

4.1 Waiting for an event or other condition

事件等待上作者举了一个特别生动的例子: 坐夜班卧铺, 如果整夜不睡等到车到站就太累; 设置一个闹钟的话, 可能火车晚点而白白早起, 也可能闹钟坏了坐过站; 最合理的是由列车员在到站时立即把自己叫醒.

事件等待也是如此.

  • 解决方案1: shared data 里设置一个 flag, 等待的线程不停地 check 这个 flag. 这个方案一方面浪费了等待期间的 CPU 时间, 另一方面检查时还要不停地 lock, 拖累本该早点结束的线程, 恰如不停地问列车员有没有到站, 反而让列车员不能专心, 导致晚点.

  • 解决方案2: 周期性地休眠再检查, 在两次检查之间用 std::this_thread::sleep_for 休眠固定时长. 但休眠周期不好定: 太短仍然浪费 CPU, 太长则响应延迟.

bool flag;
std::mutex m;
void wait_for_flag()
{
    std::unique_lock<std::mutex> lk(m);
    while(!flag)
    {
        lk.unlock();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        lk.lock();
    }
}
  • 解决方案3: 使用 C++ 标准库的 condition variable, 见下节.

整体的机制: Conceptually, a condition variable is associated with an event or other condition, and one or more threads can wait(不像 blocked那样, wait 不占用什么资源) for that condition to be satisfied. When a thread has determined that the condition is satisfied, it can then notify one or more of the threads waiting on the condition variable in order to wake them up and allow them to continue processing.

4.1.1 Waiting for a condition with condition variables

有两种: std::condition_variable 与 std::condition_variable_any.
区别: 前者只适用于 std::mutex, 后者适用于所有满足 Lockable 的类型, 代价是可能在 size, performance 或 OS resources 上有额外开销.

接口如下, 可以看到前者 wait 系列成员函数的参数类型固定为 unique_lock<mutex>, 后者是任意的 Lock 模板参数, 这就解释了上面说到的两者的不同:

namespace std {
class condition_variable {
public:
condition_variable();
~condition_variable();

condition_variable(const condition_variable&) = delete;
condition_variable& operator=(const condition_variable&) = delete;

void notify_one() noexcept;
void notify_all() noexcept;
void wait(unique_lock<mutex>& lock);
template<class Pred>
void wait(unique_lock<mutex>& lock, Pred pred);
template<class Clock, class Duration>
cv_status wait_until(unique_lock<mutex>& lock,
const chrono::time_point<Clock, Duration>& abs_time);
template<class Clock, class Duration, class Pred>
bool wait_until(unique_lock<mutex>& lock,
const chrono::time_point<Clock, Duration>& abs_time, Pred pred);
template<class Rep, class Period>
cv_status wait_for(unique_lock<mutex>& lock,
const chrono::duration<Rep, Period>& rel_time);
template<class Rep, class Period, class Pred>
bool wait_for(unique_lock<mutex>& lock,
const chrono::duration<Rep, Period>& rel_time, Pred pred);

using native_handle_type = /* implementation-defined */;
native_handle_type native_handle();
};

class condition_variable_any {
public:
condition_variable_any();
~condition_variable_any();

condition_variable_any(const condition_variable_any&) = delete;
condition_variable_any& operator=(const condition_variable_any&) = delete;

void notify_one() noexcept;
void notify_all() noexcept;

// noninterruptible waits
template<class Lock>
void wait(Lock& lock);
template<class Lock, class Pred>
void wait(Lock& lock, Pred pred);

template<class Lock, class Clock, class Duration>
cv_status wait_until(Lock& lock,
const chrono::time_point<Clock, Duration>& abs_time);
template<class Lock, class Clock, class Duration, class Pred>
bool wait_until(Lock& lock, const chrono::time_point<Clock, Duration>& abs_time,
Pred pred);
template<class Lock, class Rep, class Period>
cv_status wait_for(Lock& lock, const chrono::duration<Rep, Period>& rel_time);
template<class Lock, class Rep, class Period, class Pred>
bool wait_for(Lock& lock, const chrono::duration<Rep, Period>& rel_time, Pred pred);

// interruptible waits
template<class Lock, class Pred>
bool wait(Lock& lock, stop_token stoken, Pred pred);
template<class Lock, class Clock, class Duration, class Pred>
bool wait_until(Lock& lock, stop_token stoken,
const chrono::time_point<Clock, Duration>& abs_time, Pred pred);
template<class Lock, class Rep, class Period, class Pred>
bool wait_for(Lock& lock, stop_token stoken,
const chrono::duration<Rep, Period>& rel_time, Pred pred);
};
}

应用代码例子:

std::mutex mut;
std::queue<data_chunk> data_queue;
std::condition_variable data_cond;
void data_preparation_thread()
{
while(more_data_to_prepare())
{
data_chunk const data=prepare_data();
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data);
}
data_cond.notify_one();//此处先 unlock 再 notify, 防止被唤醒的线程醒来后还要阻塞等待 unlock.
}
}
void data_processing_thread()
{
while(true)
{
std::unique_lock<std::mutex> lk(mut);//使用 unique_lock 而不是 lock_guard 为了后面自由地 unlock.
data_cond.wait(lk,[]{return !data_queue.empty();});//检查 lambda 是否为 true, 是 false 的情况下才会 blocked or waiting state, 等待 notify_one.
//被 notify 后还要再检查 lambda condition 如果不满足 thread unlocks the mutex and resumes waiting
data_chunk data=data_queue.front();
data_queue.pop();
lk.unlock();
process(data);
if(is_last_chunk(data))
break;
}
}

使用条件变量有个问题: spurious wakeup (假醒).
条件变量只是用来"通知"其他线程的, 被通知的线程仍需在持有互斥锁的情况下自己检查条件, 才能决定是执行任务还是接着 wait. 尤其是多个线程都被 notify(broadcast) 时, 通常只有一个线程能真正执行任务(取走数据的同时把条件置回 false), 其他线程相当于被白白唤醒, 需要继续 wait.

还有另外一种假醒: 理想状况下, 线程在等到条件变量的通知之前一直处于 wait/sleep 状态, 收到通知后才去检查条件并决定是否执行任务. 但在某些系统中, 即便没有任何通知, 线程也可能被唤醒, 这是更典型的假醒. 因此线程醒来后必须重新检查: 只有在持有互斥锁的情况下 Pred 函数返回 true, 才真正继续执行任务.

此处的假醒可以参考链接1链接2.

因此 std::condition_variable::wait 只是一个优化 an optimization over a busy-wait.

wait() 函数的实现示意:

template<typename Predicate>
void minimal_wait(std::unique_lock<std::mutex>& lk, Predicate pred)
{
    while(!pred())
    {
        lk.unlock();
        //在此处等待 notify(也可能被假醒), 然后重新 lock 并再次检查 pred
        lk.lock();
    }
}

假醒的副作用:
不要在条件检查的 function object 里依赖或产生 side effects(例如修改 static data member), 因为假醒的次数以及频率是无法预测的, 副作用可能发生多次. 原文如下:
Because the number and frequency of any such spurious wakes are by definition indeterminate, it isn’t advisable to use a function with side effects for the condition check. If you do so, you must be prepared for the side effects to occur multiple times.

4.1.2 Building a thread-safe queue with condition variables

设计好的 queue adaptor(interface)如下:

#include <mutex>
#include <condition_variable>
#include <queue>
#include <memory>

template<typename T>
class threadsafe_queue{
private:
mutable std::mutex mut; // 注意必须为 mutable, 像 empty() 函数是 const 的
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue(){}
threadsafe_queue(threadsafe_queue const& other){
std::lock_guard<std::mutex> lk(other.mut);
data_queue=other.data_queue;
}

void push(T new_value){
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}

//等待push结束后再pop
void wait_and_pop(T& value){
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=data_queue.front();
data_queue.pop();
}

//使用shared_ptr而不是引用得到pop元素
std::shared_ptr<T> wait_and_pop(){
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}

//不判断push锁住后直接pop
bool try_pop(T& value){
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return false;
value=data_queue.front();
data_queue.pop();
return true;
}

std::shared_ptr<T> try_pop(){
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}

bool empty() const{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};

notify_all() 的引入:
notify_one() 的缺点是无法控制会唤醒哪一个等待线程, 也不关心当前有没有线程在等待(没有线程 waiting 时, 这次通知会被直接丢弃). notify_all() 能通知所有正在等待的线程去检查 condition(如果这个机制只需运行一次的话, 可以考虑用 std::call_once).

future 的引入:
如果等待的条件只会满足一次(one-off event), 到目前为止的 condition variable 方案就有些冗余了, 这时就会用到 future.

4.2 Waiting for one-off events with futures

对于 one-off event(the future can't be reset), C++ 标准库提供 std::future 与 std::shared_future 模板: 前者的结果只能由唯一的一个实例等待/获取, 后者允许多个实例共享等待同一个结果. 直观理解可以类比 std::unique_ptr(ownership move only) 与 std::shared_ptr(ownership shared to be read). 需要注意的是, 对同一个 future 对象本身的并发访问不提供 synchronization, 依旧需要通过 mutex 等手段保护; 但可以把 std::shared_future<> 复制多份, 每个线程单独访问自己的副本(只读共享结果, 跟 std::shared_ptr 一样不支持并发写).

接口如下:

namespace std {
template<class R>
class future {
public:
future() noexcept;
future(future&&) noexcept;
future& operator=(future&&) noexcept;
shared_future<R> share() noexcept;

// move only
future(const future&) = delete;
future& operator=(const future&) = delete;
~future();

// retrieving the value
R get();

// functions to check state
bool valid() const noexcept;

void wait() const;
template<class Rep, class Period>
future_status wait_for(const chrono::duration<Rep, Period>& rel_time) const;
template<class Clock, class Duration>
future_status wait_until(const chrono::time_point<Clock, Duration>& abs_time) const;
};

template<class R>
class shared_future {
public:
shared_future() noexcept;
shared_future(const shared_future& rhs) noexcept;
shared_future(future<R>&&) noexcept;
shared_future(shared_future&& rhs) noexcept;
~shared_future();
shared_future& operator=(const shared_future& rhs) noexcept;
shared_future& operator=(shared_future&& rhs) noexcept;

// retrieving the value
R get() const;

// functions to check state
bool valid() const noexcept;

void wait() const;
template<class Rep, class Period>
future_status wait_for(const chrono::duration<Rep, Period>& rel_time) const;
template<class Clock, class Duration>
future_status wait_until(const chrono::time_point<Clock, Duration>& abs_time) const;
};
}

后面发现一个图对future的理解有帮助,原网址
async_future.png

如何利用返回以及接受 future 对象进行线程间的通信呢? 有三种方式:

  1. std::async
  2. std::packaged_task<>
  3. std::promise

4.2.1 Returning values from background tasks

std::async 是一个模板函数, 原型如下:

//(since C++11) (until C++17)
template< class Function, class... Args >
std::future<typename std::result_of<typename std::decay<Function>::type(
typename std::decay<Args>::type...)>::type>
async( Function&& f, Args&&... args );

//(since C++17) (until C++20)
template< class Function, class... Args >
std::future<std::invoke_result_t<std::decay_t<Function>, std::decay_t<Args>...>>
async( Function&& f, Args&&... args );

//(since C++20)
template< class Function, class... Args >
[[nodiscard]]
std::future<std::invoke_result_t<std::decay_t<Function>, std::decay_t<Args>...>>
async( Function&& f, Args&&... args );

//(since C++11) (until C++17)
template< class Function, class... Args >
std::future<typename std::result_of<typename std::decay<Function>::type(
typename std::decay<Args>::type...)>::type>
async( std::launch policy, Function&& f, Args&&... args );

//(since C++17) (until C++20)
template< class Function, class... Args >
std::future<std::invoke_result_t<std::decay_t<Function>, std::decay_t<Args>...>>
async( std::launch policy, Function&& f, Args&&... args );

//(since C++20)
template< class Function, class... Args >
[[nodiscard]]
std::future<std::invoke_result_t<std::decay_t<Function>, std::decay_t<Args>...>>
async( std::launch policy, Function&& f, Args&&... args );

可以看到就是对函数对象及其参数的包装, 并有一个接受 std::launch policy 的 overload.
在此模板函数内部创建 thread 异步完成函数对象的任务(也可能是不创建 thread 同步地完成任务), 并返回 std::future 对象作为任务完成的 flag 以及完成后的数据结果.

it’s up to the implementation whether std::async starts a new thread, or whether the task runs synchronously when the future is waited for.

基本用法如下:

#include <future>
#include <iostream>
int find_the_answer_to_ltuae();
void do_other_stuff();
int main(){
std::future<int> the_answer=std::async(find_the_answer_to_ltuae);//开启新线程异步计算
do_other_stuff();
std::cout<<"The answer is "<<the_answer.get()<<std::endl;//在合适的时间点 get() 获取计算值
}

std::async 的参数传递方式如同 std::thread(可以看到模板参数都是 universal reference &&), 传引用时注意要用 std::ref 进行包装.

std::launch policy 策略参数的使用:

  • std::launch::deferred: the task is executed on the calling thread the first time its result is requested (lazy evaluation). 任务被推迟到第一次对 future 调用 wait() 或 get() 时, 才在调用线程里同步执行, 不会新开线程.
  • std::launch::async: the task is executed on a different thread. 必须新开线程异步执行任务. 不传 policy 时默认等价于两者的组合, 是否新开线程由 implementation 决定.

这两个策略可以组合使用: std::launch::deferred | std::launch::async, 可以看出与 I/O 库的一些参数一样是 bit mask 策略. 具体地:

In addition, implementations are allowed to:

  • define additional bits and bitmasks to specify restrictions on task interactions applicable to a subset of launch policies, and
  • enable those additional bitmasks for the first (default) overload of std::async.
auto f6=std::async(std::launch::async,Y(),1.2); //立即开始新线程
auto f7=std::async(std::launch::deferred,baz,std::ref(x));//表示延迟执行任务, 调用 get 或者 wait 时才会执行, 不会创建线程.
auto f8=std::async(std::launch::deferred | std::launch::async, baz,std::ref(x));//新线程取决于实现, 延迟执行
auto f9=std::async(baz,std::ref(x));//默认如上取决于具体实现
f7.wait();

表面来看 std::thread 与 std::async 都可以创建新线程, 那区别在哪里?

  1. std::thread 一定会创建新线程; std::async 不一定, 取决于 policy 参数与具体实现.
  2. std::thread 创建新线程失败会抛出异常(未处理则导致崩溃); std::async 在不强制新开线程时, 即便线程资源紧张也不会崩溃, 任务会退化为由调用 get()/wait() 的线程同步执行.

4.2.2 Associating a task with a future

std::packaged_task<> 模板类的接口如下:

namespace std {
template<class> class packaged_task;

template<class R, class... ArgTypes>
class packaged_task<R(ArgTypes...)> {
public:
// construction and destruction
packaged_task() noexcept;
template<class F> explicit packaged_task(F&& f);
~packaged_task();

// no copy
packaged_task(const packaged_task&) = delete;
packaged_task& operator=(const packaged_task&) = delete;

// move support
packaged_task(packaged_task&& rhs) noexcept;
packaged_task& operator=(packaged_task&& rhs) noexcept;
void swap(packaged_task& other) noexcept;

bool valid() const noexcept;

// result retrieval
future<R> get_future();

// execution
void operator()(ArgTypes... );//callable object
void make_ready_at_thread_exit(ArgTypes...);

void reset();
};

template<class R, class... ArgTypes>
packaged_task(R (*)(ArgTypes...)) -> packaged_task<R(ArgTypes...)>;

template<class F> packaged_task(F) -> packaged_task</* see description */>;

template<class R, class... ArgTypes>
void swap(packaged_task<R(ArgTypes...)>& x, packaged_task<R(ArgTypes...)>& y) noexcept;
}

std::packaged_task<> 是对象, 因此可以更方便地用来管理任务; 裸函数与 lambda 毕竟没有那么方便实现封装与状态管理. 同时它重载了 operator(), 是一个 functor, 本身也是 callable 类型.

This abstracts out the details of the tasks; the scheduler just deals with std::packaged_task<> instances rather than individual functions.

一个模板特化后的(部分)类定义示意:

template<>
class packaged_task<std::string(std::vector<char>*,int)>
{
public:
template<typename Callable>
explicit packaged_task(Callable&& f);
std::future<std::string> get_future();
void operator()(std::vector<char>*,int);//callable
};

应用的示例代码.

#include <deque>
#include <mutex>
#include <future>
#include <thread>
#include <utility>
template<>
class packaged_task<std::string(std::vector<char>*,int)>{
public:
template<typename Callable>
explicit packaged_task(Callable&& f);
std::future<std::string> get_future();//get_future
void operator()(std::vector<char>*,int);//call operator()规定了task的入参的类型
};

std::mutex m;
std::deque<std::packaged_task<void()> > tasks;

bool gui_shutdown_message_received();
void get_and_process_gui_message();

void gui_thread(){
while(!gui_shutdown_message_received()){
get_and_process_gui_message();
std::packaged_task<void()> task;//先定义一个 task 用来接 deque pop 出来的元素
{
std::lock_guard<std::mutex> lk(m); //对于 deque 的检查以及 pop_front 加锁
if(tasks.empty()) continue;
task=std::move(tasks.front());
tasks.pop_front();
}
task();//执行 task
}
}

std::thread gui_bg_thread(gui_thread);

template<typename Func>
std::future<void> post_task_for_gui_thread(Func f){
std::packaged_task<void()> task(f); //初始化一个 task
std::future<void> res=task.get_future();//初始化 task 对应的 future 对象
std::lock_guard<std::mutex> lk(m); //对 deque 添加元素的操作加锁
tasks.push_back(std::move(task));
return res; //返回的 future 对象里面可能含有想要的数据(虽然这里是 void 没有数据), 由后续的操作通过 get() 函数抽取, 执行任务(可能)
}

std::packaged_task<> 的缺点: What about those tasks that can’t be expressed as a simple function call or those tasks where the result may come from more than one place? 引出 std::promise.

4.2.3 Making (std::)promises

std::promise 更加灵活, 只要有一对 std::promise 与 std::future 即可, 通过 set_value() 与 get() 实现通信与同步.

接口如下:

namespace std {
template<class R>
class promise {
public:
promise();
template<class Allocator>
promise(allocator_arg_t, const Allocator& a);
promise(promise&& rhs) noexcept;
//move only
promise(const promise&) = delete;
~promise();

// assignment
promise& operator=(promise&& rhs) noexcept;
promise& operator=(const promise&) = delete;
void swap(promise& other) noexcept;

// retrieving the result
future<R> get_future();

// setting the result
void set_value(/* see description */);
void set_exception(exception_ptr p);

// setting the result with deferred notification
void set_value_at_thread_exit(/* see description */);
void set_exception_at_thread_exit(exception_ptr p);
};

template<class R>
void swap(promise<R>& x, promise<R>& y) noexcept;

template<class R, class Alloc>
struct uses_allocator<promise<R>, Alloc>;
}

一个直白的例子如下:

#include <iostream>
#include <cstdio>
#include <thread>
#include <future>
#include <chrono>

int main(int argc, char * argv[])
{
using namespace std::literals;
auto promise = std::promise<int>();
auto t1 = std::thread([&promise]{
std::cout << "thread 1 running\n";
// 模拟耗时操作
std::this_thread::sleep_for(2s);
// 写入数据,这会唤醒正在等待数据的线程
promise.set_value(42);
std::cout << "thread 1 end\n";
});
//两个线程都想得到 t1 的结果
auto shared_future = std::shared_future<int>(promise.get_future());
auto t2 = std::thread([shared_future]{
std::cout << "thread 2 running\n";
// 获取数据,如果数据还没准备好就会阻塞
// 这里使用 std::printf 而不是 std::cout,是为了保证输出在同一行
std::printf("thread 2: %d\n", shared_future.get());
std::cout << "thread 2 end\n";
});
// 获取数据,如果数据还没准备好就会阻塞
std::printf("main: %d\n", shared_future.get());
t1.join();
t2.join();
return 0;
}

总结一下 std::async, std::packaged_task 与 std::promise 的区别.

std::async, std::packaged_task 与 std::promise 三者有一个共同点: 它们都可以得到一个 future 对象, 用户可以通过这个 future 的 get() 方法获取最终的结果.

get future 的方式稍有不同:

  • std::async:返回值即 std::future.
  • std::packaged_task:调用 get_future() 方法获得 std::future.
  • std::promise:调用 get_future()方法获得 std::future.

区别:

  • std::async:提供最高层次的抽象. 它自动创建并管理线程. (一般)会立即开始执行任务, 也可以通过 std::launch::deferred 推迟到 get()/wait() 时执行.

  • std::packaged_task:抽象层次比 std::async 低. 需要自己安排执行(通常是自己创建 thread). 使用分为 3 步: 1. 创建 std::packaged_task 对象, 传入函数对象; 2. 通过 get_future() 获得关联的 future; 3. 显式调用 task 的 operator() 触发任务运行(future 的 get() 只会阻塞等待结果, 不会触发执行).

  • std::promise:抽象层次最低. 需要自己创建 thread, 并在其中通过 set_value() 把值传给等待方. 粒度是最细的.

4.2.4 Saving an exception for the future

  1. std::async
    通过 std::async 不仅仅可以传递任务执行的结果, 还可以传递 exception, 这个 exception 也被放入 future 里. exception is stored in the future in place of a stored value, the future becomes ready, and a call to get() rethrows that stored exception.
    注意, future 返回的 exception 有可能是之前的对象, 也有可能是重新抛出的重新生成的对象. Note: the standard leaves it unspecified whether it is the original exception object that’s rethrown or a copy; different compilers and libraries make different choices on this matter.

  2. std::promisestd::packaged_task
    有 2 种方式实现 exception 的捕获与转发

  • 需要我们自己显式地存入与传递 exception: an explicit function call => 在 catch 块里调用 set_exception() 存入. 过程如下
extern std::promise<double> some_promise;
try
{some_promise.set_value(calculate_value());}
catch(...){
some_promise.set_exception(std::current_exception());//to retrieve the thrown exception
//或者使用如下方式直接存储 exception 而不是 再次抛出.
//some_promise.set_exception(std::make_exception_ptr(std::logic_error("foo ")));
}
  • promise/packaged_task 在既没有设置值也没有设置 exception 的情况下被销毁时, 关联的 future 会存入 std::future_error exception, error code 为 std::future_errc::broken_promise. 因此只要前面的 exception 导致 promise 未能兑现, 等待方就能得到这个 exception; 缺点是无法传递准确的原始 exception.

4.2.5 Waiting from multiple threads

std::future 是 move only 的, 多个线程访问同一个实例没有同步保障; 而 std::shared_future 是 copyable 的. 只需让每个有访问需要的线程各自拷贝一份 std::shared_future, 通过自己的副本读取同一个 shared state(只读, 不写), 就不会产生 race condition.

下图是 std::shared_future 的原理

原书提出了一个非常有意思的例子来使用 std::shared_future: 例如在一个 excel 表里面每个单元格都有公式相互计算, 并且单元格之间有相互依赖关系, 我们可以通过不满足依赖的单元格一直被阻塞/等待, 满足依赖的单元格可以先执行, 然后导致依赖于这个已经完成计算的单元格又可以继续执行下去, 这样的链式爆炸的形式可以最大化地利用并发. 这个例子中 std::shared_future 的使用是一个单元格可能会被多个单元格依赖.

std::shared_future 的初始化有三种形式:

  1. std::shared_future<int> sf(std::move(f));std::future 实例移动, 与此同时原 std::future 实例 f 失效.
  2. std::promise<std::string> p;std::shared_future<std::string> sf(p.get_future()); get_future() 隐式把 std::future 转换成 std::shared_future.
  3. std::promise< std::map< SomeIndexType, SomeDataType, SomeComparator,SomeAllocator>::iterator> p; auto sf=p.get_future().share(); 使用 get_future()share() 成员函数显式转换.

4.3 Waiting with a time limit

如何设置等待时间? 超时有两种: duration-based timeout 与 absolute timeout, 前者对应带 _for 后缀的函数, 后者对应带 _until 后缀的函数.

4.3.1 Clocks

第一个问题是如何获得时间. 时间有四个要素:

  1. The time now–现在时刻
  2. The type of the value used to represent the times obtained from the clock–时间类型
  3. The tick period of the clock–时间单位
  4. Whether or not the clock ticks at a uniform rate (steady clock)–稳定时间(a clock ticks at a uniform rate (whether or not that rate matches the period) and can’t be adjusted)

std::chrono::system_clock::now() 返回该 clock 下的当前时刻, 返回类型是 some_clock::time_point; clock 的 tick period 用秒的分数表示, 例如 std::ratio<1,25> 表示 1 秒 tick 25 次.

std::chrono::steady_clock 使用steady clock(is_steady 可以用来确认).
std::chrono::system_clock 获取系统时间. 不为 steady 的, an adjustment may cause a call to now() to return a value earlier than that returned by a prior call to now().
std::chrono::high_resolution_clock 是 tick period 最小(即分辨率最高)的 clock.

4.3.2 Durations

std::chrono::duration<> class template 规定间隔, std::chrono::duration<double,std::ratio<1,1000>> 表示用 double 表达 1/1000 秒.

std::chrono 命名空间里已经预定义好了 nanoseconds, microseconds, milliseconds, seconds, minutes, and hours.

C++14 的 std::chrono_literals 命名空间提供时间字面量后缀. 用在整数字面量后得到预定义的整型 duration; 用在浮点字面量后则得到浮点表示: 2.5min will be std::chrono::duration<some-floating-point-type,std::ratio<60,1>>.

using namespace std::chrono_literals;
auto one_day=24h;
auto half_an_hour=30min;
auto max_time_between_messages=30ms;

Conversion between durations is implicit 仅限不需要截断的方向(如 seconds 转 milliseconds); 转换到更粗的单位需要显式的 std::chrono::duration_cast, 结果向零截断(truncation).

间隔支持四则运算, 例如 minutes(1) - seconds(55) 的结果是 seconds(5).

下面是对 std::future 设置 wait 时间的简单例子:

std::future<int> f=std::async(some_task);
if(f.wait_for(std::chrono::milliseconds(35))==std::future_status::ready)
do_something_with(f.get());

调用 wait_for 设置等待时间后有 3 种可能的返回结果:

  1. 等待超时: 返回 std::future_status::timeout.
  2. std::future 变成 ready: 返回 std::future_status::ready.
  3. 任务被推迟(deferred)尚未开始执行: 返回 std::future_status::deferred.

最后补充 std::chrono::duration 的接口:

namespace std::chrono {
template<class Rep, class Period = ratio<1>>
class duration {
public:
using rep = Rep;
using period = typename Period::type;

private:
rep rep_; // exposition only

public:
// construct/copy/destroy
constexpr duration() = default;
template<class Rep2>
constexpr explicit duration(const Rep2& r);
template<class Rep2, class Period2>
constexpr duration(const duration<Rep2, Period2>& d);
~duration() = default;
duration(const duration&) = default;
duration& operator=(const duration&) = default;

// observer
constexpr rep count() const;

// arithmetic
constexpr common_type_t<duration> operator+() const;
constexpr common_type_t<duration> operator-() const;
constexpr duration& operator++();
constexpr duration operator++(int);
constexpr duration& operator--();
constexpr duration operator--(int);

constexpr duration& operator+=(const duration& d);
constexpr duration& operator-=(const duration& d);

constexpr duration& operator*=(const rep& rhs);
constexpr duration& operator/=(const rep& rhs);
constexpr duration& operator%=(const rep& rhs);
constexpr duration& operator%=(const duration& rhs);

// special values
static constexpr duration zero() noexcept;
static constexpr duration min() noexcept;
static constexpr duration max() noexcept;
};
}

4.3.3 Time points

std::chrono::time_point 的接口如下:

namespace std::chrono {
template<class Clock, class Duration = typename Clock::duration>
class time_point {
public:
using clock = Clock;
using duration = Duration;
using rep = typename duration::rep;
using period = typename duration::period;

private:
duration d_; // exposition only

public:
// construct
constexpr time_point(); // has value epoch
constexpr explicit time_point(const duration& d); // same as time_point() + d
template<class Duration2>
constexpr time_point(const time_point<clock, Duration2>& t);

// observer
constexpr duration time_since_epoch() const;

// arithmetic
constexpr time_point& operator++();
constexpr time_point operator++(int);
constexpr time_point& operator--();
constexpr time_point operator--(int);
constexpr time_point& operator+=(const duration& d);
constexpr time_point& operator-=(const duration& d);

// special values
static constexpr time_point min() noexcept;
static constexpr time_point max() noexcept;
};
}

此模板类的第一个模板参数是 clock, 第二个是 std::chrono::duration<>.

std::chrono::time_point 是相对于一个 epoch(由特定的 clock 决定此时刻) 经过时间的时间差, 例如 00:00 on January 1, 1970. 注意: Although you can’t find out when the epoch is, you can get the time_since_epoch() for a given time_point.

time_point 应用到 wait() 的机制: the wait tracks the clock change and won’t return until the clock’s now() function returns a value later than the specified timeout. 但是注意 now() 是根据不同的 clock 不同的.

std::condition_variable cv;
bool done;
std::mutex m;
bool wait_loop()
{
auto const timeout= std::chrono::steady_clock::now()+
std::chrono::milliseconds(500);
std::unique_lock<std::mutex> lk(m);
while(!done)
{
if(cv.wait_until(lk,timeout)==std::cv_status::timeout)
//其中的 == 判断很有必要, 考虑到假醒的可能性, 如果不判断每次都会等待完整的 duration, 甚至可能导致 total wait time unbounded.
break;
}
return done;
}

其中 enum class cv_status { no_timeout, timeout }; 为 scoped enumeration 表征状态.

4.3.4 Functions that accept timeouts

各个类与时刻/间隔结合的使用情况, 要求 duration 参数必须是 std::chrono::duration<> 的实例, time_point 参数必须是 std::chrono::time_point<> 的实例:

(见原书 Table 4.1: table41_1.JPG, table41_2.JPG)

4.4 Using synchronization of operations to simplify code

4.4.1 Functional programming with futures

函数式编程(FP)的核心是 pure function: 输出只取决于输入, 同样的输入总是得到同样的输出, 既不依赖也不修改外部状态, 因此天然没有 race condition.

下面是单线程(注释里有基于其的多线程版本)下的函数式编程快排实现:

template<typename T>
std::list<T> sequential_quick_sort(std::list<T> input){
if(input.empty()){
return input;
}
std::list<T> result;
result.splice(result.begin(),input,input.begin());
T const& pivot=*result.begin();
auto divide_point=std::partition(input.begin(),input.end(),
[&](T const& t){return t<pivot;});
std::list<T> lower_part;
lower_part.splice(lower_part.end(),input,input.begin(),
divide_point);
auto new_lower(sequential_quick_sort(std::move(lower_part)));
//std::future<std::list<T> > new_lower(
// std::async(&parallel_quick_sort<T>,std::move(lower_part)));
//创建异步线程解决下半部分,线程数=2^递归数
auto new_higher(sequential_quick_sort(std::move(input)));
//多线程版本中为: auto new_higher(parallel_quick_sort(std::move(input)));
result.splice(result.end(),new_higher);
result.splice(result.begin(),new_lower);
return result;
}

自己创建线程的 FP 并行算法:

template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
if(input.empty())
{
return input;
}
std::list<T> result;
result.splice(result.begin(),input,input.begin());
T const& pivot=*result.begin();
auto divide_point=std::partition(input.begin(),input.end(),
[&](T const& t){return t<pivot;});
std::list<T> lower_part;
lower_part.splice(lower_part.end(),input,input.begin(),
divide_point);
std::future<std::list<T> > new_lower(std::async(&parallel_quick_sort<T>,std::move(lower_part)));
auto new_higher(parallel_quick_sort(std::move(input)));
result.splice(result.end(),new_higher);
result.splice(result.begin(),new_lower.get());
return result;
}

除了函数式编程 FP 以外还有 CSP (Communicating Sequential Processes) paradigm.

4.4.2 Synchronizing operations with message passing

CSP 模式不依靠 shared data, 而是靠线程间的 message 来实现分工. 最典型的设计是把每个线程设计成独立的 state machine, 线程间通过 message 相互改变 state 来完成复杂任务; 例子是 ATM 的人机交互线程 + 机器硬件线程 + 银行系统交互线程的合作. 这也被叫做 Actor model —there are several discrete actors in the system (each running on a separate thread), which send messages to each other to perform the task at hand, and there's no shared state except that which is directly passed via messages.

状态机的结构图如下:

作者在书中完整地完成了一个此系统的设计, 在附录 C 中.

4.4.3 Continuation-style concurrency with the Concurrency TS

std::experimental namespace 下的 future 可以实现 continuation: 简单理解就是把需要执行的任务函数与其等待的数据绑定在一起, 而不是像 std::future 那样只承载等待的数据. 通过 then() 成员函数实现, 同样也支持 move 语义:

std::experimental::future<int> find_the_answer();
std::string find_the_question(std::experimental::future<int> the_answer);
auto fut=find_the_answer();
auto fut2=fut.then(find_the_question);//find_the_answer 一旦 ready 就会执行 find_the_question
assert(!fut.valid()); //then 会 move future 实例, 原 fut 失效.
assert(fut2.valid());

引入这一特性是希望实现 When the data is ready, then do this processing. 而不是 block/wait 浪费资源, 具体如何实现的可以看下一节 chained future.

下面的例子是设计一个 spawn_async 接受一个函数作为入参(类似于 std::async), 生成一个 thread 后通过返回 std::experimental::future 返回任务函数的计算结果.

#include <experimental/future>
template<typename Func>
std::experimental::future<decltype(std::declval<Func>()())>
spawn_async(Func&& func){
std::experimental::promise<
decltype(std::declval<Func>()())> p;
auto res=p.get_future();
std::thread t(
[p=std::move(p),f=std::decay_t<Func>(func)]()
mutable{
try{
p.set_value_at_thread_exit(f());
} catch(...){
p.set_exception_at_thread_exit(std::current_exception());//既可以返回值也可以返回 exception
}
});
t.detach();
return res;
}

4.4.4 Chaining continuations

造成 block 的原因是函数执行顺序与数据依赖顺序的天然矛盾: 函数 A 依赖 B 的计算结果, 于是 A 先执行再 block/wait, B 在另一个线程里运算, 结束后把结果交回 A 并唤醒 A 继续运行. 如果执行链条里有多个依赖任务, 就会有大量线程处于 block/wait. continuation 通过 future-unwrapping 机制, 让链条中每一环的传入与返回都是 continuation, 实现 no blocking in your asynchronous function chain.

#include <experimental/future>
std::experimental::future<void> process_login(
std::string const& username,std::string const& password)
{
return spawn_async([=](){
return backend.authenticate_user(username,password);
}).then([](std::experimental::future<user_id> id){
// }).then([](auto id){
//C++14可以省略成auto
return backend.request_current_info(id.get());
}).then([](std::experimental::future<user_data> info_to_display){
try{
update_display(info_to_display.get());//final continuation
} catch(std::exception& e){
display_error(e);
}
});
}

同样地, std::experimental::shared_future can have more than one continuation(普通 future 只允许一个), 这与常规 shared 语义的设计初衷一致. 因为 multiple objects can refer to the same shared state, 如果只允许一个 continuation, 两个线程各自向自己的 std::experimental::shared_future 副本添加 continuation 时就会产生 race condition.

auto fut=spawn_async(some_function).share();
auto fut2=fut.then([](std::experimental::shared_future<some_data> data){
    do_stuff(data);});
// fut can have more than one continuation
auto fut3=fut.then([](std::experimental::shared_future<some_data> data){
    return do_other_stuff(data);});

Note that without share(), the return value is a plain std::experimental::future.

4.4.5 Waiting for more than one future

What if completing one task depends on multiple futures? A typical case: split the data into chunks, compute each chunk on its own thread, then have one thread gather all the results.

std::future<FinalResult> process_data(std::vector<MyData>& vec)
{
    size_t const chunk_size=whatever;
    std::vector<std::future<ChunkResult>> results;
    for(auto begin=vec.begin(),end=vec.end();begin!=end;){
        size_t const remaining_size=end-begin;
        size_t const this_chunk_size=std::min(remaining_size,chunk_size);
        results.push_back(
            std::async(process_chunk,begin,begin+this_chunk_size));
        begin+=this_chunk_size;
    }
    return std::async([all_results=std::move(results)]() mutable {
        std::vector<ChunkResult> v;
        v.reserve(all_results.size());
        for(auto& f: all_results)
        {
            v.push_back(f.get()); // B: woken once for every future that becomes ready
        }
        return gather_results(v);
    });
}

The drawback of this approach: the gathering thread is woken again and again by the computing threads, wasting resources. Can we instead wake the gatherer only once, after all computations have finished? This is really an AND relationship between futures.

because it waits for each task individually, it will repeatedly be woken by the scheduler at B as each result becomes available, and then go back to sleep again when it finds another result that is not yet ready. Not only does this occupy the thread doing the waiting, but it adds additional context switches as each future becomes ready, which adds additional overhead.

std::experimental::when_all accepts a set of futures and combines them into a single future that it returns.

std::experimental::future<FinalResult> process_data(
    std::vector<MyData>& vec)
{
    size_t const chunk_size=whatever;
    std::vector<std::experimental::future<ChunkResult>> results;
    for(auto begin=vec.begin(),end=vec.end();begin!=end;){
        size_t const remaining_size=end-begin;
        size_t const this_chunk_size=std::min(remaining_size,chunk_size);
        results.push_back(
            spawn_async(
                process_chunk,begin,begin+this_chunk_size));
        begin+=this_chunk_size;
    }
    // then() runs only once every future produced by spawn_async is ready
    return std::experimental::when_all(
        results.begin(),results.end()).then(
        [](std::experimental::future<std::vector<
               std::experimental::future<ChunkResult>>> ready_results)
        {
            std::vector<std::experimental::future<ChunkResult>>
                all_results=ready_results.get();
            std::vector<ChunkResult> v;
            v.reserve(all_results.size());
            for(auto& f: all_results)
            {
                v.push_back(f.get());
            }
            return gather_results(v);
        });
}

Besides the AND combination of futures there is also when_any for OR logic. This creates a future that becomes ready when any of the supplied futures becomes ready.

4.4.6 Waiting for the first future in a set with when_any

The example below searches a chunked data set with multiple threads for one value matching some criteria; as soon as any thread finds it, the next step can proceed and the remaining search threads are told to stop.

#include <atomic>
#include <experimental/future>
#include <memory>
#include <vector>
struct MyData {};
struct FinalResult {};

bool matches_find_criteria(MyData const &);
FinalResult process_found_value(MyData const &);

std::experimental::future<FinalResult>
find_and_process_value(std::vector<MyData> &data) {
unsigned const concurrency = std::thread::hardware_concurrency();
unsigned const num_tasks = (concurrency > 0) ? concurrency : 2;
std::vector<std::experimental::future<MyData *>> results;
auto const chunk_size = (data.size() + num_tasks - 1) / num_tasks;
auto chunk_begin = data.begin();
std::shared_ptr<std::atomic<bool>> done_flag =
std::make_shared<std::atomic<bool>>(false); // a shared_ptr flag: every thread can set it, with no lifetime management needed
for (unsigned i = 0; i < num_tasks; ++i) {
auto chunk_end =
(i < (num_tasks - 1)) ? chunk_begin + chunk_size : data.end();
results.push_back(std::experimental::async([=] { // each task captures its own chunk's bounds by value, so it cannot stray
for (auto entry = chunk_begin; !*done_flag && (entry != chunk_end);
++entry) {
if (matches_find_criteria(*entry)) {
*done_flag = true;
return &*entry;
}
}
return (MyData *)nullptr; // explicit conversion to the pointer result type
}));
chunk_begin = chunk_end;
}
std::shared_ptr<std::experimental::promise<FinalResult>> final_result =
std::make_shared<std::experimental::promise<FinalResult>>();
struct DoneCheck {
std::shared_ptr<std::experimental::promise<FinalResult>> final_result;

DoneCheck(
std::shared_ptr<std::experimental::promise<FinalResult>>
final_result_)
: final_result(std::move(final_result_)) {}

void operator()(
std::experimental::future<std::experimental::when_any_result<
std::vector<std::experimental::future<MyData *>>>>
results_param) {
auto results = results_param.get();
MyData *const ready_result = results.futures[results.index].get(); // when_any_result carries the index of the ready future
if (ready_result) // a match was found: set the final value
final_result->set_value(process_found_value(*ready_result));
else { // no match from this task: drop its future and keep waiting on the rest with when_any
results.futures.erase(results.futures.begin() + results.index);
if (!results.futures.empty()) {
std::experimental::when_any(
results.futures.begin(), results.futures.end())
.then(std::move(*this)); // re-arm DoneCheck recursively
} else { // every future exhausted with no match: report an error
final_result->set_exception(
std::make_exception_ptr(
std::runtime_error("Not found")));
}
}
}
};

std::experimental::when_any(results.begin(), results.end())
.then(DoneCheck(final_result)); // DoneCheck runs as soon as any search task finishes
return final_result->get_future();
}

Note that the search in this example is confirmed in two stages: first matches_find_criteria() inside each task, then DoneCheck() checks whether the returned pointer is non-null.

The when_any call here uses the iterator-range overloads.
If the futures have differing types (and so cannot be stored in one container), the variadic, tuple-like overloads can be used instead:

std::experimental::future<int> f1=spawn_async(func1);
std::experimental::future<std::string> f2=spawn_async(func2);
std::experimental::future<double> f3=spawn_async(func3);
std::experimental::future<std::tuple<
std::experimental::future<int>,
std::experimental::future<std::string>,
std::experimental::future<double>>>
result=
std::experimental::when_all(std::move(f1),std::move(f2),std::move(f3));

4.4.7 Latches and barriers in the Concurrency TS

Both have since entered the standard in C++20:

std::latch: threads decrement its counter; once it reaches zero the latch becomes ready and stays ready until destruction. It does not care which threads decrement the counter or how (one thread may decrement it several times). "A lightweight facility for waiting for a series of events to occur."

std::barrier: a reusable synchronization component used for internal synchronization between a set of threads. With barriers, each thread can only arrive at the barrier once per cycle.
When a thread reaches the barrier, every thread that has already arrived blocks until all of the threads have arrived. Once they all have, the barrier resets and can be reused for the next cycle.

4.4.8 A basic latch type: std::experimental::latch

namespace std {
class latch {
public:
static constexpr ptrdiff_t max() noexcept;

constexpr explicit latch(ptrdiff_t expected);
~latch();

latch(const latch&) = delete;
latch& operator=(const latch&) = delete;

void count_down(ptrdiff_t update = 1); // decrement the counter by update (default 1) as each event occurs
bool try_wait() const noexcept; // test whether the counter has reached zero, i.e. the latch is ready
void wait() const; // block until the counter reaches zero
void arrive_and_wait(ptrdiff_t update = 1); // both count down the counter and then wait for the counter to reach zero
private:
ptrdiff_t counter; // exposition only
};
}

An example:

void foo(){
    unsigned const thread_count=...;
    latch done(thread_count);
    my_data data[thread_count];
    std::vector<std::future<void> > threads;
    for(unsigned i=0;i<thread_count;++i)
        // only i is captured by value, avoiding a race condition on i
        threads.push_back(std::async(std::launch::async,[&,i]{
            data[i]=make_data(i);
            done.count_down();
            // these tasks keep running concurrently with foo(); nothing
            // guarantees they finish before foo() returns
            do_more_stuff();
        }));
    done.wait();
    process_data(data,thread_count);
}

4.4.9 std::experimental::barrier: a basic barrier

The standardized C++20 interface is shown below; it is actually closer to std::experimental::flex_barrier:

namespace std {
template<class CompletionFunction = /* a function object type, must meet the requirements of MoveConstructible and Destructible. */>
class barrier {
public:
using arrival_token = /* an unspecified object type meeting requirements of MoveConstructible, MoveAssignable and Destructible */;

static constexpr ptrdiff_t max() noexcept;//the maximum value of expected count supported by the implementation

constexpr explicit barrier(ptrdiff_t expected,
CompletionFunction f = CompletionFunction());
~barrier();
//not assignable
barrier(const barrier&) = delete;
barrier& operator=(const barrier&) = delete;

[[nodiscard]] arrival_token arrive(ptrdiff_t update = 1);//arrives at barrier and decrements the expected count
void wait(arrival_token&& arrival) const;//blocks at the phase synchronization point until its phase completion step is run

void arrive_and_wait();//arrives at barrier and decrements the expected count by one, then blocks until current phase completes
void arrive_and_drop();//decrements both the initial expected count for subsequent phases and the expected count for current phase by one
//i.e. how a thread leaves the barrier's group

private:
CompletionFunction completion; // exposition only
};
}

A usage example:

result_chunk process(data_chunk);
std::vector<data_chunk> divide_into_chunks(data_block data, unsigned num_threads);
void process_data(data_source &source, data_sink &sink) {
unsigned const concurrency = std::thread::hardware_concurrency();
unsigned const num_threads = (concurrency > 0) ? concurrency : 2;
std::experimental::barrier sync(num_threads);
std::vector<joining_thread> threads(num_threads);
std::vector<data_chunk> chunks;
result_block result;
for (unsigned i = 0; i < num_threads; ++i) {
threads[i] = joining_thread([&, i] {
while (!source.done()) {
if (!i) {
data_block current_block =
source.get_next_data_block();
chunks = divide_into_chunks(
current_block, num_threads);
} // thread 0 pre-splits the block into per-thread chunks
sync.arrive_and_wait(); // everyone arrives and waits for thread 0 to finish splitting
result.set_chunk(i, num_threads, process(chunks[i])); // parallel per-chunk computation
sync.arrive_and_wait(); // the same barrier is reused for the next phase
if (!i) {
sink.write_data(std::move(result));
}
}
});
}
}

4.4.10 std::experimental::flex_barrier: std::experimental::barrier's flexible friend

Compared with std::experimental::barrier, std::experimental::flex_barrier can run a user-supplied function on one thread to handle tidy-up work, and it can change the number of participating threads (up or down) before the next cycle.

void process_data(data_source &source, data_sink &sink) {
unsigned const concurrency = std::thread::hardware_concurrency();
unsigned const num_threads = (concurrency > 0) ? concurrency : 2;

std::vector<data_chunk> chunks;

auto split_source = [&] {
if (!source.done()) {
data_block current_block = source.get_next_data_block();
chunks = divide_into_chunks(current_block, num_threads);
}
};

split_source();

result_block result;

std::experimental::flex_barrier sync(num_threads, [&] { // the supplied function runs on exactly one of the threads
sink.write_data(std::move(result));
split_source();
return -1; // -1 keeps the thread count unchanged for the next cycle; 0..N would set a new count
});
std::vector<joining_thread> threads(num_threads);


for (unsigned i = 0; i < num_threads; ++i) {
threads[i] = joining_thread([&, i] {
while (!source.done()) {
result.set_chunk(i, num_threads, process(chunks[i]));
sync.arrive_and_wait(); // a single arrive_and_wait per cycle suffices here
}
});
}
}
Author: cx. Published 2021-11-09, updated 2022-10-20.