C++并发实战-第一至第三章-总学习笔记第1

C++并发实战-第一至第三章-总学习笔记第1

[TOC]
C++ 并发实战《C++ Concurrency in Action》的学习笔记1, 记录第一章到第三章的部分.
内容包括 C++ 并发的背景, 线程管理, 线程间的数据共享等.

1. 你好, C++ 的并发世界

C++11 开始支持多线程

1.1 何谓并发

并发, 是指两个或更多独立的活动同时发生.

1.1.1 计算机系统中的并发

计算机领域的并发指的是在单个系统里同时执行多个独立的任务, 而非顺序的进行一些活动.
任务切换得太快造成”并发的假象”其实也是并发.
硬件并发(hardware concurrency): 多个核心并发.
现在的并发一般是硬件并发 + 任务切换组合.

1.1.2 并发的途径

  • 多进程并发

将应用程序分为多个独立的进程, 独立的进程可以通过进程间常规的通信渠道传递讯息(信号, 套接字, 文件, 管道等等).

缺点:

  1. 进程之间的通信通常不是设置复杂, 就是速度慢.
  2. 需要时间启动进程, 操作系统需要内部资源来管理进程.

优点:

  1. 更容易编写安全的并发代码.
  2. 可以使用远程连接(可能需要联网)的方式,在 不同的机器上运行独立的进程.
  • 多线程并发

进程中的所有线程都共享地址空间, 并且所有线程访问到大部分数据———全局变量仍然是全局的, 指针, 对象的引用或数据可以在线程之间传递.
优点: 开销小.
缺点: 安全性低.

C++ 标准并未对进程间通信提供任何原生支持, 全是多线程并发.

1.1.3 并发与并行

其区别主要在于关注点和意图方面(差距甚微).

People talk about parallelism when their primary concern is taking advantage of the available hardware to increase the performance of bulk data processing, whereas people talk about concurrency when their primary concern is separation of concerns, or responsiveness.

1.2 为什么使用并发?

1.2.1 为了分离关注点

不同任务的代码分离, 仅需关注自身代码以及任务关联信号.

1.2.2 为了性能

任务并行(task parallelism, 各个部分之间可能存在着依赖)与数据并行(data parallelism, 每个线程在不同的数据部分上执行相同的操作).
可以清楚地分割任务/数据的做法: 易并行(embarrassingly parallel)算法: have good scalability properties(随着可用的 thread 数目增加, 性能可能会线性增加).

1.2.3 什么时候不使用并发

不使用并发的唯一原因就是收益比不上成本.例如,开发维护时间成本.

性能增益:

  • 实际执行任务的时间可能要比启动线程的时间小.
  • 每个线程都需要一个独立的堆栈空间,所以运行太多的线程也会耗尽进程的可用内存或地址空间.
  • 运行越多的线程,操作系统就需要越多的上下文切换.

1.3 C++中的并发和多线程

1.3.1 C++多线程历史

起初: C 语言中流行的多线程 API———POSIX 标准中的 C 标准和 Microsoft Windows API + Boost 和 ACE 平台无关库.

1.3.2 新标准支持并发

C++11 标准: Boost 线程库作为新类库的主要模型.

1.3.3 C++14 和 C++17 对并发和并行的更多支持

1.3.4 C++ 线程库的效率

抽象代价(abstraction penalty):使用高级工具和使用低级工具的开销差.
C++ 实现了底层操作工具与抽象易用工具之间平衡.

1.3.5 平台相关的工具

native_handle() 成员函数,允许通过使用平台相关 API 直接操作底层实现.

1.4 开始入门

initial function: Thread 开始工作的入口, 比如 main 函数.

2. 线程管理

2.1 线程管理的基础

2.1.1 启动线程

构造 std::thread 对象.

可以用可调用类型构造, 将带有函数调用符类型的实例传入 std::thread 类中, 替换默认的构造函数.

注意语法解析:

1
2
3
4
5
6
7
8
std::thread my_thread(background_task());//本意是使用产生的一个临时的函数对象,
//但是可能会被编译认为是函数的声明.
//导致输入参数类型为 type pointer-to-a-function-taking-no-parameters-and-returning-a-background_task-object
//整个函数返回一个空的 thread 对象, 而不是启动一个 thread.
//应该用下面的 2 种方式.
std::thread my_thread((background_task()));//防止被误解为函数声明
std::thread my_thread{background_task()};//uniform initialization
std::thread my_thread([]{do_something();do_something_else();});//也可以使用 lambda 表达式.

2.1.2 等待线程完成

要保证线程在 main 线程 std::terminate() 之前对启动的线程作出处理, 注意也要考虑 exception 的情况下. 有 2 种处理方式.

  • 线程分离 main 线程自主运行: detach(), 只有执行的函数返回了, 分离出去的线程才会被结束. 此时注意访问已经销毁的未定义行为, 尤其是线程函数还持有函数局部变量的指针或引用的情况下.
    解决办法之一: make the thread function self-contained and copy the data into the thread rather than sharing the data. 把数据考进入 callable 里. 还有一种办法就是使用下一种处理方式.

  • main 线程等待其完成, 期间被阻塞: join().

只能对一个线程使用一次 join(). 判断是否 join 过: joinable(). 原因如下: The act of calling join() also cleans up any storage associ
ated with the thread, so the std::thread object is no longer associated with the now-finished thread; it isn’t associated with any thread.

PS. 这一切在 C++ 20 里有了很大的改善. std::jthread 替代了 std::thread 可以通过 stop source/stop token 来控制线程什么时候结束.

2.1.3 Waiting in exceptional circumstances

注意 if an exception is thrown after the thread has been started but before the call to join() 可能会导致线程的资源不能被释放. detach() 没有这个问题.

解决办法:

  • uses a try/catch block: 不好.
  • use the standard Resource Acquisition Is Initialization (RAII) idiom and provide a class that does the join() in its destructor.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class thread_guard
{
std::thread& t;
public:
explicit thread_guard(std::thread& t_): t(t_) {}
~thread_guard()
{
if(t.joinable()) t.join(); //析构的时候 join()
}
thread_guard(thread_guard const&)=delete;//delete 保证 not automatically provided by the compiler
thread_guard& operator=(thread_guard const&)=delete;
};

struct func;//a functor

void f()
{
int some_local_state=0;
func my_func(some_local_state);
std::thread t(my_func);
thread_guard g(t);
do_something_in_current_thread();
}

2.1.4 Running threads in the background

detach() 后 ownership and control are passed over to the C++ Runtime Library. detached 线程被叫做 daemon threads.

  • detached 线程不能再 join().
  • 注意不能 detach 没有绑定线程的 std::thread 对象. you can’t call detach() on a std::thread object with no associated thread of execution.
  • 因此也可以通过 t.joinable() returns true 来判断 std::thread 是否可以 detach.

2.2 Passing arguments to a thread function

先让我们看看 std::thread接口定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
namespace std {
class thread {
public:
// types
class id;
using native_handle_type = /* implementation-defined */;

// default construct
thread() noexcept;
// universal reference as argument type
template<class F, class... Args> explicit thread(F&& f, Args&&... args);
~thread();

//movable only
thread(thread&&) noexcept;
thread& operator=(thread&&) noexcept;
thread(const thread&) = delete;
thread& operator=(const thread&) = delete;


// members
void swap(thread&) noexcept;
bool joinable() const noexcept;
void join();
void detach();
id get_id() const noexcept;
native_handle_type native_handle();

// static members
static unsigned int hardware_concurrency() noexcept;
};
}

后面的注意点可能就比较好懂一些

向线程传递函数参数注意:

  1. the arguments are copied into internal storage. 不管是什么传入的是什么东西都是被 copy 进去的, 不管是 value/pointer/reference.

  2. 参数的初始化(因为 copy)是在新开的线程内进行的, 而不是在创造新线程的线程内. 因此当传入参数是指针/引用的时候, 可能会导致深拷贝而导致从外考到 std::thread 内部时的 lifetime 不一致的情况. 有人会说我只 pass-by-value 不就行了. 但是还是有 type decay 的情况. 例如下面的字符数组到 std::string 的情形.

1
2
3
4
5
6
7
8
9
void f(int i,std::string const& s);
void oops(int some_param){
char buffer[1024];
sprintf(buffer, "%i",some_param);
//std::thread t(f,3,buffer);
//NG: 数组 decay 成指针 char * 被 copy initilization 成 std::string 的过程是在新开的线程 t 里, 万一当前线程里把 buffer 销毁了, lifetime 不一致 -> undefined behaviror
std::thread t(f,3,std::string(buffer)); //OK: 在当前线程里构造好传进去
std::threadt.detach();
}
  1. 函数参数是 non-const reference 时, pass-by-value 要注意使用 std::ref 包装一下, 否则无法编译. 原因是传进去的 value 被盲目地 copy 进入构造函数内, 成为了 rvalue(编译实现会先拷贝成 temporary value 然后在通过 implicit/explicit conversion 看看能不能与参数类型匹配), 而 non-const reference 无法匹配 rvalue. 因此我们需要将 pass-by-value 通过 std::ref 包装成引用.
1
2
3
4
5
6
7
8
9
10
void update_data_for_widget(widget_id w,widget_data& data);
void oops_again(widget_id w)
{
widget_data data;
//std::thread t(update_data_for_widget,w,data); //NG
std::thread t(update_data_for_widget,w,std::ref(data)); //OK
display_status();
t.join();
process_widget_data(data);
}
  1. 类似 std::bind,std::thread 接受成员函数作为函数指针传参.
1
2
3
4
5
6
7
8
class X{
public:
void do_lengthy_work();
};

X my_x;
std::thread t(&X::do_lengthy_work,&my_x,double para 1); //成员函数以及类实例作为thread构造函数参数.
//第三个参数起可以传递成员函数的参数para.
  1. 支持 move semantics: 对于 temporary object, move is automatic. 对于 named value, the transfer must be requested directly by invoking std::move().
1
2
3
4
void process_big_object(std::unique_ptr<big_object>);
std::unique_ptr<big_object> p(new big_object);
p->prepare_data(42);
std::thread t(process_big_object,std::move(p));//transfer ownership

instances of std::thread are movable,but not copyable. This ensures that only one object is associated with a particular thread of execution at any one time.

2.3 Transferring ownership of a thread

resource-owning types: std::ifstream, std::unique_ptr, std::thread.

  • you can’t just drop a thread by assigning a new value to the std::thread object that manages it.
1
2
3
4
5
6
7
8
void some_function();
void some_other_function();
std::thread t1(some_function);
std::thread t2=std::move(t1);
t1=std::thread(some_other_function);
std::thread t3;
t3=std::move(t2);
t1=std::move(t3);//std::terminate(), 因为 t1 的函数 some_other_function 有可能还没运行完成
  • std::thread 可以作为函数的参数以及返回值.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//as return 
std::thread f(){
void some_function();
return std::thread(some_function);
}

std::thread g(){
void some_other_function(int);
std::thread t(some_other_function,42);
return t;
}

//as argument
void f(std::thread t);
void g()
{
void some_function();
f(std::thread(some_function));
std::thread t(some_function);
f(std::move(t));
}
  • 创建一个 scoped_threadstd::thread 生命周期进行管理的类.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class scoped_thread{
std::thread t;
public:
explicit scoped_thread(std::thread t_):
t(std::move(t_)){
if(!t.joinable())
throw std::logic_error("No thread");
}
~scoped_thread(){t.join();}
scoped_thread(scoped_thread const&)=delete;
scoped_thread& operator=(scoped_thread const&)=delete;
};

struct func;
void f(){
int some_local_state;
scoped_thread t{std::thread(func(some_local_state))};
do_something_in_current_thread();} //f运行结束t也自动结束
  • C++17 未能得到通过的 joining_thread class 实现了上述 scoped_thread 的功能, 也许会在 C++20 看到(可惜还是没有得到完全实现).
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class joining_thread{
std::thread t;
public:
joining_thread() noexcept=default;

template<typename Callable,typename ... Args>
explicit joining_thread(Callable&& func,Args&& ... args):
t(std::forward<Callable>(func),std::forward<Args>(args)...){}

explicit joining_thread(std::thread t_) noexcept:
t(std::move(t_)){}

joining_thread(joining_thread&& other) noexcept:
t(std::move(other.t)){}

joining_thread& operator=(joining_thread&& other) noexcept{
if(joinable())
join();}

joining_thread& operator=(std::thread other) noexcept{
if(joinable())
join();
t=std::move(other);
return *this;}

~joining_thread() noexcept{
if(joinable())
join();}

void swap(joining_thread& other) noexcept{t.swap(other.t);}

std::thread::id get_id() const noexcept{return t.get_id();}

bool joinable() const noexcept{return t.joinable();}

void join(){t.join();}

void detach(){t.detach();}

std::thread& as_thread() noexcept{return t;}

const std::thread& as_thread() const noexcept{return t;}
};
  • thread 可以放入容器中, 例如 std::vector<>.
1
2
3
4
5
6
7
8
void do_work(unsigned id);
void f(){}
std::vector<std::thread> threads;
for(unsigned i=0;i<20;++i){
threads.emplace_back(do_work,i);//直接用构造函数的参数构造 std::thread.
}
for(auto& entry: threads)
entry.join();}

2.4 Choosing the number of threads at runtime

std::thread::hardware_concurrency() 成员变量给出能够实现并发的 thread 数目的 hint.

A naïve parallel version of std::accumulate:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
template<typename Iterator,typename T>
struct accumulate_block{
void operator()(Iterator first,Iterator last,T& result){
result=std::accumulate(first,last,result);}
};
template<typename Iterator,typename T> //Iterator至少有forward iterators.T: default-constructible
T parallel_accumulate(Iterator first,Iterator last,T init){
unsigned long const length=std::distance(first,last);
if(!length) return init;

unsigned long const min_per_thread=25;
unsigned long const max_threads=(length+min_per_thread-1)/min_per_thread;
unsigned long const hardware_threads= std::thread::hardware_concurrency(); //返回平台核心数目
unsigned long const num_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);
unsigned long const block_size=length/num_threads;
std::vector<T> results(num_threads);
std::vector<std::thread> threads(num_threads-1);
Iterator block_start=first;

for(unsigned long i=0;i<(num_threads-1);++i){ //这里减1的原因是main本身已经占了一个线程
Iterator block_end=block_start;
std::advance(block_end,block_size);
threads[i]=std::thread(
accumulate_block<Iterator,T>(),
block_start,block_end,std::ref(results[i]));//you can’t return a value directly from a thread, you must pass in a eference
block_start=block_end;
}

accumulate_block<Iterator,T>()(block_start,last,results[num_threads-1]);//最后一个单独拿出来是考虑分不均匀的情况
for(auto& entry: threads)
entry.join();

return std::accumulate(results.begin(),results.end(),init);//最后把所有线程的计算结果累加起来返回结果
}

2.5 Identifying threads

thread 的 ID 使用 std::thread::id 类型.
两种获取 id 的方式:

  1. 成员函数 get_id().
  2. std::this_thread:: get_id().当前 thread 的 id.

接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
namespace std {
class thread::id {
public:
id() noexcept;
};

bool operator==(thread::id x, thread::id y) noexcept;
strong_ordering operator<=>(thread::id x, thread::id y) noexcept; //c++20 里新增的 spaceship operator 与 ordering type

template<class charT, class traits>
basic_ostream<charT, traits>&
operator<<(basic_ostream<charT, traits>& out, thread::id id);

// hash support
template<class T> struct hash;
template<> struct hash<thread::id>;
}

If the std::thread object doesn’t have an associated thread of execution, the call to get_id() returns a default-constructed std::thread::id object, which indicates “not any thread.”

下面的例子作为匹配任务与 thread 的示意.

1
2
3
4
5
6
7
std::thread::id master_thread;
void some_core_part_of_algorithm(){
if(std::this_thread::get_id()==master_thread){
do_master_thread_work();
}
do_common_work();
}

thread 虽然没有语义信息(还是可以 std::cout 打印出来)但可以作为关联容器的 key, 也可以用来比较.

比较时,如果不同代表不同的 thread, 相同代表相同的 thread 或者空 thread.
std::hash<std::thread::id> 可以让 id 参与比较, 排序.

3. Sharing data between threads

3.1 Problems with sharing data between threads

  • reading 不会导致问题.
  • invariants 概念: 如何保证 shared 数据结构在所有时刻是不变的?
    作者举的例子是双向链表做删除一个中间节点时, 有些时刻不是 invariants 的. 这是对值改变的进一步抽象.

3.1.1 Race conditions

problematic race condition: 跟多窗口买票一样抢电影座. 尤其是这个问题不好复现, 也具有偶发性(timing 强相关, debugger affects the timing of the program), 因此必须重视(hard to find and hard to duplicate because the window of opportunity is small.). 也可以被叫做 data races => the dreaded undefined behavior.

定义:
In concurrency, a race condition is anything where the outcome depends on the relative ordering of execution of operations on two or more threads; the threads race to perform their respective operations.

3.1.2 Avoiding problematic race conditions

  1. 方法1: wrap your data structure with a protection mechanism. 对既有的数据结构套一层保护机制(例如锁). 这个应用更加普遍.
  2. 方法2: lock-free programming, modify the design of your data structure and its invariants. 从根本上改变数据结构, 不用锁也可以的数据结构.
  3. 方法3: software transactional memory (STM), 不是标准内容, 本书不介绍(但作为 TS 在发展: Technical Specification for C++ Extensions for Transactional Memory). The basic idea of doing something privately and then committing in a single step.

3.2 Protecting shared data with mutexes

mutex: (mutual exclusion) 互斥锁.

3.2.1 Using mutexes in C++

虽然 std::mutex 有成员函数可以实现 lock()unlock(), 但需要手动准确指定有时不准确/方便.

std::lock_guard 管理 std::mutex 的周期(使用 RAII 方式, 即其在构造函数中对 std::mutex 变量进行锁定, 在其析构函数中对 std::mutex 变量进行解锁, 整个类没有对 std::mutex 进行解锁和加锁的对外接口).

最好把 std::mutex 以及要锁的数据放到一个 class 里,并且注意接口设计不要留出 stray pointer or reference 的可访问 backdoor(std::mutex 也锁不住指针访问).

Any code that has access to that pointer or reference can now access (and potentially modify) the protected data without locking the mutex.

3.2.2 Structuring code for protecting shared data

对 shared data 不要暴露任何形式的指针/引用接口.

Don’t pass pointers and references to protected data outside the scope of the lock, whether by returning them from a function, storing them in externally visible memory, or passing them as arguments to user-supplied functions.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class some_data{
int a;
std::string b;
public:
void do_something(){}
};
class data_wrapper
{
private:
some_data data;
std::mutex m;
public:
template<typename Function>
void process_data(Function func){
std::lock_guard<std::mutex> l(m);
func(data);
}
};

some_data* unprotected;

void malicious_function(some_data& protected_data){
unprotected=&protected_data; //这里恶意函数把 data_wrapper 类里的 data 从被保护变成了不被保护, 存在风险.
}
data_wrapper x;

void foo(){
x.process_data(malicious_function);
unprotected->do_something();//unprotected 指针可以访问被锁起来的数据
}

3.2.3 Spotting race conditions inherent in interfaces

即便加上锁, 并不意味着万事大吉. 还有两种情况也有可能出现 race condition.

假设一个 stack adaptor 的接口设计如下, 并且保证每次对 stack 的访问都是加锁的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
template<typename T,typename Container=std::deque<T> >
class stack
{
public:
explicit stack(const Container&);
explicit stack(Container&& = Container());
template <class Alloc> explicit stack(const Alloc&);
template <class Alloc> stack(const Container&, const Alloc&);
template <class Alloc> stack(Container&&, const Alloc&);
template <class Alloc> stack(stack&&, const Alloc&);
bool empty() const;
size_t size() const;
T& top();
T const& top() const;
void push(T const&);
void push(T&&);
void pop();
void swap(stack&&);
template <class... Args> void emplace(Args&&... args);
};
  1. 不同成员函数之间可能存在依赖: 例如 size(),empty()pop().
1
2
3
4
5
6
7
stack<int> s;
if(!s.empty())
{
int const value=s.top();
s.pop();
do_something(value);
}

一个线程判断 empty()false 的时候, 进入处理程序, 但是另一个程序先进入后 pop() 了唯一的 node 导致第一个线程访问 top() 是空的. 如果你不嫌麻烦/增加成本的话, 可以加 exception.

  1. 进一步加锁, 不止是对数据加速, 对所有可能访问数据的成员函数也进行加锁. 也有可能出现问题: 执行过程的 interleave(交错).

如下图, 同一个元素被处理了 2 遍, 一个元素被跳过, 并且系统不会报错, 很难查到原因.
A_possible_ordering_of_operations_on_a_stack_from_two_threads.png

很自然地大家就想到了, 那把 pop()top() 函数结合成一个函数不就可以避免这个问题了吗? sometimes naive!

大牛们(Herb Sutter 在设计 exception-safety 时)早就想过这一点,std::stackpop()top() 的分离是有合理的原因的.
比如 std::stack<std::vector<int>>, 顺序是 pop() 删除 stack 的 top 元素 => copy top 元素给 top() 函数的返回值, 但是如果返回过程中有 exception 的话, 出现了悬空的 copy temporary 元素. 至于是为啥是这个顺序, 书里没介绍, 至少说明这个顺序是可能的.

那有啥办法解决这个悬空的元素的问题?

  • OPTION 1: PASS IN A REFERENCE
1
2
std::vector<int> result;
some_stack.pop(result);

​缺点: 浪费构造资源, 也许没有足够的参数在进行 pop() 之前就能构造出对象, 也许有些自定义的类根本就没有这个构造函数(stored type be assignable).

  • OPTION 2: REQUIRE A NO-THROW COPY CONSTRUCTOR OR MOVE CONSTRUCTOR

表面上的原因就是 pop 扔了异常出来导致中断悬空, 那我不扔不就完了. 虽然现在很多对象/type 在移动构造函数不会扔异常, 虽然拷贝构造函数还会扔, 但是这样的 type 还是少(通过 std::is_nothrow_copy_constructible and std::is_nothrow_move_constructible查看), 很多自定义的对象更不用说了, 不能保证没有.

  • OPTION 3: RETURN A POINTER TO THE POPPED ITEM

返回指针呢? 对于简单的对象/type 指针的代价比 value 高.
智能指针 std::shared_ptr 可以采用.

  • OPTION 4: PROVIDE BOTH OPTION 1 AND EITHER OPTION 2 OR 3

终极办法: 把上面三个方法揉到一起取长补短. 例如以下例子(option 1 + 3):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include <exception>
#include <stack>
#include <mutex>
#include <memory>

struct empty_stack: std::exception{
const char* what() const throw(){
return "empty stack";
}
};

template<typename T>
class threadsafe_stack{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack(){};
threadsafe_stack(const threadsafe_stack& other){
// You do the copy in the constructor body B rather than the member initializer list
// in order to ensure that the mutex is held across the copy.
std::lock_guard<std::mutex> lock(other.m);
data=other.data;
}

threadsafe_stack& operator=(const threadsafe_stack&) = delete;

void push(T new_value){
std::lock_guard<std::mutex> lock(m);
data.push(new_value);
}

std::shared_ptr<T> pop(){ //返回智能指针对象
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack();
std::shared_ptr<T> const res(std::make_shared<T>(data.top())); //先用智能指针把元素接住, 避免悬空
data.pop(); //悬空避免了, 再进行实际的元素删除工作
return res;
}
void pop(T& value){ //返回引用, 如果返回对象比较大, 这不是个好选择
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack();
value=data.top();
data.pop();
}
bool empty() const{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};

int main()
{
threadsafe_stack<int> si;
si.push(5);
si.pop();
if(!si.empty()){
int x;
si.pop(x);}
}

这个 stack 几乎只有 push()pop() 功能了, 虽然能保证这两个操作是安全的.
那要是直接做个 global 的 mutex 锁住所有的操作就没这么多考虑了, 但是粒度太粗. 我们需要写出 fine-grained locking schemes.

3.2.4 Deadlock: the problem and a solution

互等都不执行即为死锁. 不要以为把锁与解锁的顺序对上了就一定不会产生死锁.
例如两个类都锁住自己等着 swap. 下面的代码是解决这个问题的例子. 使用 std::lock(): Locks the given Lockable objects lock1, lock2, …, lockn using a deadlock avoidance algorithm to avoid deadlock. 接口如下.

1
2
template< class Lockable1, class Lockable2, class... LockableN >
void lock( Lockable1& lock1, Lockable2& lock2, LockableN&... lockn );
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class some_big_object{};
void swap(some_big_object& lhs,some_big_object& rhs){}
class X{
private:
some_big_object some_detail;
mutable std::mutex m;
public:
X(some_big_object const& sd):some_detail(sd){}
friend void swap(X& lhs, X& rhs){
if(&lhs==&rhs) //这个判断很有必要,attempting to acquire a lock on std::mutex when you already hold it is undefined behavior.
return;
std::lock(lhs.m,rhs.m);
std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock);
std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock);
// std::adopt_lock 的用法: 告诉 std::lock_guard 对象, 传入的 mutex 对象已经被锁过了,
// 直接寻找锁即可, 而不用在构造 std::lock_guard 对象的过程中对 mutex 对象重新上锁
swap(lhs.some_detail,rhs.some_detail);
}
};

std::adopt_lock 是一种传入 std::lock_guard, std::unique_lock and std::shared_lock 等加锁对象的策略. 除了这种以外还有 std::defer_lock, std::try_to_lock. 三者从前往后分别代表加锁对象已经有锁的控制权了, 直接 adopt 即可, 不用重复初始化; 延迟获取锁的控制权, 后面通过 lock() 再锁; 如果没有被阻塞的话尝试获取锁的控制权. 它们都是 empty struct tag types.

对于 exception, If std::lock has successfully acquired a lock on one mutex and an exception is thrown when it tries to acquire a lock on the other mutex, this first lock is released automatically: std::lock provides all-or-nothing semantics with regard to locking the supplied mutexes.

C++17 中有 std::scoped_lock(RAII 对象) 来代替 std::lockstd::lock_guard 的操作. 其接口如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
namespace std {
template<class... MutexTypes>
class scoped_lock {
public:
using mutex_type = Mutex; // If MutexTypes... consists of the single type Mutex

explicit scoped_lock(MutexTypes&... m);
explicit scoped_lock(adopt_lock_t, MutexTypes&... m);
~scoped_lock();

//move only
scoped_lock(const scoped_lock&) = delete;
scoped_lock& operator=(const scoped_lock&) = delete;

private:
// use tuple as container
tuple<MutexTypes&...> pm; // exposition only
};
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class X{
private:
some_big_object some_detail;
mutable std::mutex m;
public:
X(some_big_object const& sd):some_detail(sd){}

friend void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs)
return;
std::scoped_lock guard(lhs.m,rhs.m);//C++17还带类型推断,等效写成下面
//std::scoped_lock<std::mutex,std::mutex> guard(lhs.m,rhs.m);
swap(lhs.some_detail,rhs.some_detail);
}
};

本节做法的限制: need to acquire two or more locks together, it doesn’t help if they’re acquired separately. 分开锁的情况下, 需要自己去避免死锁.

3.2.5 Further guidelines for avoiding deadlock

死锁不止会发生在 std::mutex 上也会发生在 thread 的 join() 上. 甚至不仅仅发生一对之间: a cycle of three or more threads will still cause deadlock.
通用原则:

Don’t wait for another thread if there’s a chance it’s waiting for you.

  • AVOID NESTED LOCKS 避免嵌套锁
    don’t acquire a lock if you already hold one.

  • AVOID CALLING USER-SUPPLIED CODE(未知的 code) WHILE HOLDING A LOCK

你无法确定用户代码里的情况,最好不要在有锁的时候调用用户代码, 但是很多时候这么做不太现实.

  • ACQUIRE LOCKS IN A FIXED ORDER
    acquire them in the same order in every thread.

我觉得这里作者的意思如果更抽象一点的话是根据数据结构已经目的, 所有线程上的操作都是采用一致的加锁策略. 例如双向链表的删除操作可以每次 lock 三个操作包含的 node. 对于 reverse 操作, 每次 lock 住前后 2 个 node 即可. 当然这里要对每个 node 分别加锁. 必须注意 nodes must always be locked in the same order(否则有可能 they could deadlock with each other in the middle of the list), 可能导致的问题如下图(删除中间 node).

Deadlock_with_threads_traversing_a_list_in_opposite_orders.png

  • USE A LOCK HIERARCHY
    设置 lock 等级, 高级的必须先锁住低级的. 对于同级别的: This does mean that you can’t hold two locks at the same time if they’re the same level in the hierarchy.

下面是自定义的 hierarchical_mutex 类型, 只要满足 lock(), unlock(), and try_lock() 接口就可以成为 Lockable 类型.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include <mutex>
#include <stdexcept>
#include <climits>

class hierarchical_mutex{
std::mutex internal_mutex;
unsigned long const hierarchy_value;
unsigned long previous_hierarchy_value; //方便 unlock() 的时候 restore hierarchy_value
static thread_local unsigned long this_thread_hierarchy_value;

void check_for_hierarchy_violation(){
if(this_thread_hierarchy_value <= hierarchy_value)
{
throw std::logic_error("mutex hierarchy violated");
}
}
void update_hierarchy_value()
{
previous_hierarchy_value=this_thread_hierarchy_value;
this_thread_hierarchy_value=hierarchy_value;
}
public:
explicit hierarchical_mutex(unsigned long value):
hierarchy_value(value),
previous_hierarchy_value(0){}
void lock()
{
check_for_hierarchy_violation();
internal_mutex.lock();
update_hierarchy_value();
}
void unlock(){
this_thread_hierarchy_value=previous_hierarchy_value;
internal_mutex.unlock();}
bool try_lock(){
check_for_hierarchy_violation();
if(!internal_mutex.try_lock())
return false;
update_hierarchy_value();
return true;
}
};
thread_local unsigned long
hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);//max number

int main()
{
hierarchical_mutex m1(42);
hierarchical_mutex m2(2000);
}

这种实现 Although detection is a runtime check, it’s at least not timing-dependent.

使用等级 mutex.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
hierarchical_mutex high_level_mutex(10000);
hierarchical_mutex low_level_mutex(5000);

int do_low_level_stuff(){
return 42;}
int low_level_func()
{
std::lock_guard<hierarchical_mutex> lk(low_level_mutex);
return do_low_level_stuff();}

void high_level_stuff(int some_param){}
void high_level_func()
{
std::lock_guard<hierarchical_mutex> lk(high_level_mutex);
high_level_stuff(low_level_func());}
void thread_a()
{
high_level_func();}
hierarchical_mutex other_mutex(100);
void do_other_stuff()
{}
void other_stuff()
{
high_level_func();
do_other_stuff();}
void thread_b()
{
std::lock_guard<hierarchical_mutex> lk(other_mutex);
other_stuff();} //run-time error, 不能通过先高等级再低等级
  • EXTENDING THESE GUIDELINES BEYOND LOCKS

thread 与 thread 之间也要注意死锁.

3.2.6 Flexible locking with std::unique_lock

接口如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
namespace std {
template<class Mutex>
class unique_lock {
public:
using mutex_type = Mutex;

// construct/copy/destroy
unique_lock() noexcept;
explicit unique_lock(mutex_type& m);
unique_lock(mutex_type& m, defer_lock_t) noexcept;
unique_lock(mutex_type& m, try_to_lock_t);
unique_lock(mutex_type& m, adopt_lock_t);
template<class Clock, class Duration>
unique_lock(mutex_type& m, const chrono::time_point<Clock, Duration>& abs_time);
template<class Rep, class Period>
unique_lock(mutex_type& m, const chrono::duration<Rep, Period>& rel_time);
~unique_lock();

unique_lock(const unique_lock&) = delete;
unique_lock& operator=(const unique_lock&) = delete;

unique_lock(unique_lock&& u) noexcept;
unique_lock& operator=(unique_lock&& u);

// locking
void lock();
bool try_lock();

template<class Rep, class Period>
bool try_lock_for(const chrono::duration<Rep, Period>& rel_time);
template<class Clock, class Duration>
bool try_lock_until(const chrono::time_point<Clock, Duration>& abs_time);

void unlock();

// modifiers
void swap(unique_lock& u) noexcept;
mutex_type* release() noexcept;

// observers
bool owns_lock() const noexcept;
explicit operator bool () const noexcept;
mutex_type* mutex() const noexcept;

private:
mutex_type* pm; // exposition only
bool owns; // exposition only
};

template<class Mutex>
void swap(unique_lock<Mutex>& x, unique_lock<Mutex>& y) noexcept;
}

std::unique_lockstd::lock_guard 更灵活, 可以控制对锁加锁的时机. 通过 std::std::defer_lock, std::std::adopt_lock, std::std::try_to_lock, 并且多出了一个 flag 控制 mutex 是否属于当前实例(owns_lock() 查询),是的话 destructor 必须 unlock, 不是的话不能 unlock.

为了维护这个 flag 的代价就是 std::unique_lock 更消耗资源(overhead).

进一步地 std::unique_lock 提供了对 mutex 的加锁(lock()try_lock())和解锁(unlock())操作, 同时可以配合条件变量 condition variable 使用, 使之自由度变高很多. 与 std::scoped_lock 不同的是 std::unique_lock 更严格: If the instance does own the mutex, the destructor must call unlock(), and if the instance does not own the mutex, it must not call unlock(). 同时, 也没有 flag 这个成员变量的 overhead. 因此可以考虑在满足需求的前提下, 使用 C++17 的 std::scoped_lock 代替它(毕竟 varaic 也包括 1). 需要使用 std::unique_lock 的两种场景的例子:

  • deferred locking.
1
2
3
std::unique_lock<std::mutex> lock_a(lhs.m,std::defer_lock);
std::unique_lock<std::mutex> lock_b(rhs.m,std::defer_lock);
std::lock(lock_a,lock_b);
  • the ownership of the lock needs to be transferred from one scope to another.

3.2.7 Transferring mutex ownership between scopes

std::unique_lock 也是 moveable but not copyable(这也许就是 unique 的出处, 与 std::unique_ptr 类似), 因此传递其值时要么 std::move(), 要么本身是 rvalue.

常用于锁住预处理数据后返回一个 std::unique_lock 的实例给 caller 进行进一步的处理(当然还是在锁住的状态下). 例子如下:

1
2
3
4
5
6
7
8
9
10
std::unique_lock<std::mutex> get_lock(){
extern std::mutex some_mutex; //确保 lifetime 与 scope visible
std::unique_lock<std::mutex> lk(some_mutex);
prepare_data();
return lk;
}
void process_data(){
std::unique_lock<std::mutex> lk(get_lock());//rvalue 初始化
do_something();
}

这样 std::unique_lock 可以作为一个 gateway, 即访问的大门, 一切想要访问共享数据的操作都可以通过这个 gateway 进行. 并且支持 move, 当然还有销毁.

std::unique_lock 支持在自己被析构之前放弃 mutex (成员函数 unlock() 或者 std::lock() ), 因为有些场景确定需要解锁, 否则可能一直阻塞可以运行的其他线程.

3.2.8 Locking at an appropriate granularity

Lock a mutex only while accessing the shared data; try to do any processing of the data outside the lock.

例子: In particular, don’t do any time-consuming activities like file I/O while holding a lock.

总结下来就是

  • 只对需要加锁的数据加锁.
  • 尽量保证访问 lock 的时间短. 如果访问有间隔可以考虑使用 std::unique_lock, 用完 unlock 再次用 lock.
1
2
3
4
5
6
7
8
9
void get_and_process_data()
{
std::unique_lock<std::mutex> my_lock(the_mutex);
some_class data_to_process=get_next_data_chunk();
my_lock.unlock(); //不必要的时候, unlock()
result_type result=process(data_to_process);
my_lock.lock();
write_result(data_to_process,result);
}

还有如果将 int 这种 copy 出来代价不高的操作, 可以考出来操作, 而不是一直锁住所有操作都在锁里进行, 下面是例子代码.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Y{
private:
int some_detail;

mutable std::mutex m;

int get_detail() const{
std::lock_guard<std::mutex> lock_a(m);
return some_detail; //只对取值的部分加锁
}

public:
Y(int sd):some_detail(sd){}

friend bool operator==(Y const& lhs, Y const& rhs){
if(&lhs==&rhs)
return true;
int const lhs_value=lhs.get_detail();
int const rhs_value=rhs.get_detail();
return lhs_value==rhs_value; //在锁的外面进行比较操作
}
};

但是这个策略也有 drawback, 那就是语义上可能没意义了. 因为可能在平行的时间点上两个值从来没有相等过, 但是由于 lock 时间太短导致比较的时间点是不一致的.

3.3 Alternative facilities for protecting shared data

进针对初始化时期的机制: C++ Standard provides a mechanism purely for protecting shared data during initialization.

3.3.1 Protecting shared data during initialization

只初始化时可能造成 race condition 的场景. 例如多个线程都在等一个数据结构初始化, 初始化结束后变成 read-only 不用考虑后面的 race condition 了. 再接着锁没有必要.

Lazy initialization 的单线程思路:

1
2
3
4
5
6
7
8
9
std::shared_ptr<some_resource> resource_ptr;
void foo()
{
if(!resource_ptr)
{
resource_ptr.reset(new some_resource);
}
resource_ptr->do_something();
}

多线程下的直接转换, 也就是粗暴全部锁住, 但是缺点是所有的线程都要锁住并检查, 造成大量的阻塞:

1
2
3
4
5
6
7
8
9
10
11
12
std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;
void foo()
{
std::unique_lock<std::mutex> lk(resource_mutex);
if(!resource_ptr)
{
resource_ptr.reset(new some_resource);
}
lk.unlock();
resource_ptr->do_something();
}

想当然的改进: double-checked locking pattern, 在锁住之前先 check 有没有被 release, 如果其他线程已经帮他初始化好了, 那就不需要锁住阻塞其他线程了.

1
2
3
4
5
6
7
8
9
void undefined_behaviour_with_double_checked_locking(){
if(!resource_ptr){
std::lock_guard<std::mutex> lk(resource_mutex);
if(!resource_ptr){
resource_ptr.reset(new some_resource);
}
}
resource_ptr->do_something();
}

但是这是个 Anti-pattern, 表面上看起来 work, 其实是 broken 的. 点在于 resource_ptr.reset(new some_resource); 这个过程不是原子性的. 具体分析可以参考 Scott Meyers 与 Andrei Alexandrescu 关于 DCLP 论述的 paper.

针对这种情况, C++ 标准库添加了 std::once_flag(状态类, 作为一个 flag 比锁要效率很多) 和 std::call_once(模板函数) 专门解决这个问题. std::call_once 里通过入参里的 Callable 与 … Args 实现一次性的初始化, 然后将已经完成初始化的状态传给 std::once_flag 实例, 这样其他线程只要去查询这个 std::once_flag 的状态就好了.

接口如下:

1
2
3
4
5
6
7
8
9
10
11
namespace std {
template< class Callable, class... Args >
void call_once( std::once_flag& flag, Callable&& f, Args&&... args );

struct once_flag {
constexpr once_flag() noexcept;

once_flag(const once_flag&) = delete;
once_flag& operator=(const once_flag&) = delete;
};
}

改进后的代码例子:

1
2
3
4
5
6
7
8
9
10
11
std::shared_ptr<some_resource> resource_ptr;
std::once_flag resource_flag;
void init_resource()
{
resource_ptr.reset(new some_resource);
}
void foo()
{
std::call_once(resource_flag,init_resource);//初始化只进行一次
resource_ptr->do_something();
}

C++11 还解决了一个可能的问题, static 数据初始化多线程都去做这件事导致 race condition, C++11 会保证只有 1 个线程能初始化它: the initialization is defined to happen on exactly one thread, and no other threads will proceed until that initialization is complete. 也就是被标准化了.

1
2
3
4
5
6
class my_class;
my_class& get_my_class_instance()
{
static my_class instance; //多个线程 call get_my_class_instance, 标准保证只有一个线程能初始化 instance
return instance;
}

3.3.2 Protecting rarely updated data structures

上面解决的是只需要初始化 1 次就不会变的数据结构, 还有一种情况, 大多数情况下都是 read, 偶尔 write. 典型的例子是 DNS 解析表. 如果能区分读与写, 只在写的时候进行锁, 读的时候不锁就好了. 下面是实现这样功能的 mutex(与 std::mutex 一样的 Lockable 类型).

C++11: boost shared_mutex.

C++14: std::shared_timed_mutex std::shared_lock(不是 Lockable 类型, 与 std::unique_lock 类似, 用于控制 shared_mutex).

C++17: std::shared_mutex and std::shared_timed_mutex(在 std::shared_mutex 增加了一些时间控制选项).

接口如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
namespace std {
class shared_mutex {
public:
shared_mutex();
~shared_mutex();

// move only
shared_mutex(const shared_mutex&) = delete;
shared_mutex& operator=(const shared_mutex&) = delete;

// exclusive ownership
void lock(); // blocking
bool try_lock();
void unlock();

// shared ownership
void lock_shared(); // blocking
bool try_lock_shared();
void unlock_shared();

using native_handle_type = /* implementation-defined */;
native_handle_type native_handle();
};

class shared_timed_mutex {
public:
shared_timed_mutex();
~shared_timed_mutex();

// move only
shared_timed_mutex(const shared_timed_mutex&) = delete;
shared_timed_mutex& operator=(const shared_timed_mutex&) = delete;

// exclusive ownership
void lock(); // blocking
bool try_lock();
template<class Rep, class Period>
bool try_lock_for(const chrono::duration<Rep, Period>& rel_time);
template<class Clock, class Duration>
bool try_lock_until(const chrono::time_point<Clock, Duration>& abs_time);
void unlock();

// shared ownership
void lock_shared(); // blocking
bool try_lock_shared();
template<class Rep, class Period>
bool try_lock_shared_for(const chrono::duration<Rep, Period>& rel_time);
template<class Clock, class Duration>
bool try_lock_shared_until(const chrono::time_point<Clock, Duration>& abs_time);
void unlock_shared();
};

template<class Mutex>
class shared_lock {
public:
using mutex_type = Mutex; //当只传入一个锁时, 可以用

// construct/copy/destroy
shared_lock() noexcept;
explicit shared_lock(mutex_type& m); // blocking
shared_lock(mutex_type& m, defer_lock_t) noexcept;
shared_lock(mutex_type& m, try_to_lock_t);
shared_lock(mutex_type& m, adopt_lock_t);
template<class Clock, class Duration>
shared_lock(mutex_type& m, const chrono::time_point<Clock, Duration>& abs_time);
template<class Rep, class Period>
shared_lock(mutex_type& m, const chrono::duration<Rep, Period>& rel_time);
~shared_lock();

shared_lock(const shared_lock&) = delete;
shared_lock& operator=(const shared_lock&) = delete;

shared_lock(shared_lock&& u) noexcept;
shared_lock& operator=(shared_lock&& u) noexcept;

// locking
void lock(); // blocking
bool try_lock();
template<class Rep, class Period>
bool try_lock_for(const chrono::duration<Rep, Period>& rel_time);
template<class Clock, class Duration>
bool try_lock_until(const chrono::time_point<Clock, Duration>& abs_time);
void unlock();

// modifiers
void swap(shared_lock& u) noexcept;
mutex_type* release() noexcept;

// observers
bool owns_lock() const noexcept;
explicit operator bool () const noexcept;
mutex_type* mutex() const noexcept;

private:
mutex_type* pm; // exposition only
bool owns; // exposition only
};

template<class Mutex>
void swap(shared_lock<Mutex>& x, shared_lock<Mutex>& y) noexcept;
}

动作机制: Any thread has a shared lock, a thread that tries to acquire an exclusive lock will block until all other threads have released their locks, and likewise if any thread has an exclusive lock, no other thread may acquire a shared or exclusive lock until the first thread has released its lock.

例子代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class dns_cache{
std::map<std::string,dns_entry> entries;
std::shared_mutex entry_mutex;
public:
dns_entry find_entry(std::string const& domain){
std::shared_lock<std::shared_mutex> lk(entry_mutex);//std::shared_lock 用于 shared, read-only access
std::map<std::string,dns_entry>::const_iterator const it=
entries.find(domain);
return (it==entries.end())?dns_entry():it->second;
}

void update_or_add_entry(std::string const& domain,
dns_entry const& dns_details){
std::lock_guard<std::shared_mutex> lk(entry_mutex);//std::lock_guard 用于 updates
entries[domain]=dns_details;
}
};

3.3.3 Recursive locking

如果想要在一个实例里多次 lock, unlock 该怎么办? 首先直接用用 mutex 是非法的. C++ 标准提供了 std::recursive_mutex 可以让一个实例多次 lock. 限制是 You must release all your locks before the mutex can be locked by another thread, so if you call lock() three times, you must also call unlock() three times.

应用这个的一个典型场景: 类中一个成员函数已经 lock 了成员数据结构, 但是它里面调用的另一个成员函数也会去 lock, 重复地 lock 是 undefined behavior. 简单直接的解决办法是把 std::mutex 之类的 mutex 换成 std::recursive_mutex. 但是作者比较排斥使用 std::recursive_mutex , 因为这会让人停止思考从根本解决这个问题的动力, 例如提供一个 private 成员函数专门用来 lock 其他的成员函数如果想要 lock 就都去调用它即可.

C++并发实战-第一至第三章-总学习笔记第1

https://www.chuxin911.com/Notes_On_C++_Concurrency_in_Action_1_20211108/

作者

cx

发布于

2021-11-08

更新于

2022-10-20

许可协议