C++并发实战-第八章-总学习笔记第6

[TOC]
C++ 并发实战《C++ Concurrency in Action》的学习笔记 6, 记录第八章 Designing concurrent code 的部分内容.
内容是:

  • 线程间如何划分数据. Techniques for dividing data between threads.
  • 影响并发代码性能的因素. Factors that affect the performance of concurrent code.
  • 性能因素如何影响数据结构的设计. How performance factors affect the design of data structures.
  • 多线程代码中的异常安全. Exception safety in multithreaded code.
  • 可扩展性. Scalability.
  • 几个并行算法的实现示例. Example implementations of several parallel algorithms.

Chapter 8 Designing concurrent code

8.1 Techniques for dividing work between threads

类比于建房子请工人 You need to decide whether to have “generalist” threads that do whatever work is necessary at any point in time or “specialist” threads that do one thing well, or some combination.

8.1.1 Dividing data between threads before processing begins

map-reduce 开局就均分法:

图 8.1 (f8_1.png): 开局把连续的数据块均分给各个线程.

实际应用举例: Message Passing Interface (MPI, http://www.mpi-forum.org/) or OpenMP (http://www.openmp.org/) frameworks.

对最后的 reduction step 也可以考虑并行化: the worker threads could be made to perform some of the reduction steps as each one completes its task, rather than spawning new threads each time.
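
先给一个最基础的"开局均分 + 主线程做最终 reduction"的假设性 sketch (parallel_sum 为自拟名字, 非书中代码):

#include <thread>
#include <vector>
#include <numeric>

// 假设性的 sketch: 开局把数据均分成连续的块, 每个线程算一块的部分和, 主线程做最终 reduction
long long parallel_sum(std::vector<int> const& data, unsigned num_threads)
{
    std::vector<long long> partial(num_threads, 0);
    std::vector<std::thread> workers;
    std::size_t const block_size = data.size() / num_threads;

    for (unsigned i = 0; i < num_threads; ++i)
    {
        std::size_t const begin = i * block_size;
        std::size_t const end =
            (i == num_threads - 1) ? data.size() : begin + block_size; // 最后一块吃掉余数
        workers.emplace_back([&, i, begin, end]
        {
            partial[i] = std::accumulate(data.begin() + begin,
                                         data.begin() + end, 0LL);
        });
    }
    for (auto& t : workers)
        t.join();
    return std::accumulate(partial.begin(), partial.end(), 0LL); // 最终的 reduction step
}

注意 partial[i] 相邻存放, 多个线程同时写会有 false sharing, 这正是 8.2.3 与 8.3 要讨论的问题.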

8.1.2 Dividing data recursively

实际例子: Quicksort algorithm, These recursive calls are entirely independent.

图 8.2 (f8_2.png): 递归地划分数据.

决定线程数分配的策略:

  • 交给 std::async() 自动决定线程的分配 (参见列表后基于 std::async 的 sketch).
  • use the std::thread::hardware_concurrency() function to choose the number of threads, 并把待排序的 chunk 放到一个线程安全栈上:
    If a thread has nothing else to do, either because it has finished processing all its chunks or because it’s waiting for a chunk to be sorted, it can take a chunk from the stack and sort that.
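
针对第一种策略, 下面是一个基于 std::async 的递归划分 sketch (与书中第 4 章的做法类似, async_quick_sort 为自拟名字):

#include <future>
#include <list>
#include <algorithm>

// 基于 std::async 的递归划分 sketch: 由标准库决定是新开线程还是在 get() 时同步执行
template <typename T>
std::list<T> async_quick_sort(std::list<T> input)
{
    if (input.empty())
        return input;
    std::list<T> result;
    result.splice(result.begin(), input, input.begin()); // 取第一个元素作 pivot
    T const& pivot = *result.begin();

    auto divide_point = std::partition(input.begin(), input.end(),
                                       [&](T const& t) { return t < pivot; });

    std::list<T> lower_part;
    lower_part.splice(lower_part.end(), input, input.begin(), divide_point);

    std::future<std::list<T>> new_lower =
        std::async(&async_quick_sort<T>, std::move(lower_part)); // 低半部分交给 async
    std::list<T> new_higher(async_quick_sort(std::move(input))); // 高半部分在本线程递归

    result.splice(result.end(), new_higher);
    result.splice(result.begin(), new_lower.get());
    return result;
}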

针对第二种策略, 下面是一个非 production-ready 的示例 (书中 Listing 8.1):

#include <thread>
#include <vector>
#include <future>
#include <list>
#include <algorithm>
#include <atomic>
#include <memory>
#include <chrono>
// thread_safe_stack 为前面章节实现的线程安全栈, 其 pop() 返回 std::shared_ptr
template <typename T>
struct sorter
{
    struct chunk_to_sort
    {
        std::list<T> data;
        std::promise<std::list<T>> promise;
    };

    thread_safe_stack<chunk_to_sort> chunks;
    std::vector<std::thread> threads;
    unsigned const max_thread_count;
    std::atomic<bool> end_of_data;

    sorter() : max_thread_count(std::thread::hardware_concurrency() - 1),
               end_of_data(false)
    {
    }

    ~sorter()
    {
        end_of_data = true;
        for (unsigned i = 0; i < threads.size(); ++i)
        {
            threads[i].join();
        }
    }

    void try_sort_chunk()
    {
        std::shared_ptr<chunk_to_sort> chunk = chunks.pop();
        if (chunk)
        {
            sort_chunk(chunk);
        }
    }

    std::list<T> do_sort(std::list<T> &chunk_data)
    {
        if (chunk_data.empty())
        {
            return chunk_data;
        }

        std::list<T> result;
        result.splice(result.begin(), chunk_data, chunk_data.begin());
        T const &partition_val = *result.begin();

        typename std::list<T>::iterator divide_point =
            std::partition(chunk_data.begin(), chunk_data.end(),
                           [&](T const &val)
                           { return val < partition_val; });
        chunk_to_sort new_lower_chunk;
        new_lower_chunk.data.splice(new_lower_chunk.data.end(),
                                    chunk_data, chunk_data.begin(),
                                    divide_point);

        std::future<std::list<T>> new_lower =
            new_lower_chunk.promise.get_future();
        chunks.push(std::move(new_lower_chunk)); // rather than spawning a new thread for one chunk, it pushes it onto the stack.
        if (threads.size() < max_thread_count)   // spawns a new thread while you still have processors to spare.
        {
            threads.push_back(std::thread(&sorter<T>::sort_thread, this));
        }

        std::list<T> new_higher(do_sort(chunk_data));

        result.splice(result.end(), new_higher);
        while (new_lower.wait_for(std::chrono::seconds(0)) !=
               std::future_status::ready) // Because the lower chunk might be handled by another thread, you then have to wait for it to be ready.
        {
            try_sort_chunk(); // try to process chunks from the stack on this thread while you’re waiting.
        }

        result.splice(result.begin(), new_lower.get());
        return result;
    }

    void sort_chunk(std::shared_ptr<chunk_to_sort> const &chunk)
    {
        chunk->promise.set_value(do_sort(chunk->data)); // ready to be picked up by the thread that posted the chunk on the stack
    }

    void sort_thread()
    {
        while (!end_of_data)
        {
            try_sort_chunk();
            std::this_thread::yield(); // In between checking, they yield to other threads to give them a chance to put some more work on the stack.
        }
    }
};

template <typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
    if (input.empty())
    {
        return input;
    }
    sorter<T> s;
    return s.do_sort(input);
}

上面做法的问题:

  • the management of these threads and the communication between them add quite a lot of complexity to the code.
  • 虽然各 threads are processing separate data elements, 但 they all access the stack to add new chunks and to remove chunks for processing ==> stack 上的 contention 会 reduce performance.

这种做法其实是线程池: This approach is a specialized version of a thread pool.

到目前为止介绍的两种划分数据的方式都建立在一个假设之上: the data itself is fixed beforehand.

对于 dynamically generated or is coming from external input 如何做? ==> it might make more sense to divide the work by task type rather than dividing based on the data.

8.1.3 Dividing work by task type

好处: Occasionally other threads may give it data or trigger events that it needs to handle, but in general each thread focuses on doing one thing well. In itself, this is basic good design; each piece of code should have a single responsibility.

DIVIDING WORK BY TASK TYPE TO SEPARATE CONCERNS

对于单线程下的多任务系统:

  • In the single-threaded world you end up manually writing code that performs a bit of task A, a bit of task B, checks for key presses, checks for incoming network packets, and then loops back to perform another bit of task A.
  • This means that the code for task A ends up being complicated by the need to save its state and return control to the main loop periodically.
  • If you add too many tasks to the loop, things might slow down too much, and the user may find it takes too long to respond to the key press.

多线程下, 按任务类型把工作划分到不同的线程上, 可以简化代码并提升效率:

  • The operating system will automatically save the state and switch to task B or C when appropriate, and if the target system has multiple cores or processors, tasks A and B may be able to run concurrently.
  • simpler code because each thread can focus on doing operations related directly to its responsibilities, rather than getting mixed up with control flow and user interaction.

虽然各线程无法实现完全的独立, 但通过线程间的通讯与同步可以解决这个问题.

通信的问题: too much communication between threads, 两种表现:

  1. there is a lot of data shared between the threads or
  2. the different threads end up waiting for each other.

解决思路:

  1. 拆出专门线程 If all the communication relates to the same issue, maybe that should be the key responsibility of a single thread and extracted from all the threads that refer to it.
  2. 合并线程 if two threads are communicating a lot with each other but much less with other threads, maybe they should be combined into a single thread.

DIVIDING A SEQUENCE OF TASKS BETWEEN THREADS

pipeline: data flows in at one end through a series of operations (pipes) and out at the other end.

通信的方式: the data element is put in a queue to be picked up by the next thread.

这种方式的好处: 除了流水线刚启动时需要一段填充时间, 之后 You now get 1 item processed every 3 seconds rather than having the items processed in batches of 4 every 12 seconds; 单个 item 的总处理时间虽然不变, 但结果是一个个持续产出而不是一个 batch 一起拿到.
But smoother, more regular processing can be beneficial in some circumstances. 例如看视频时, 与其每 100 帧集中输出一次再停顿, using a pipeline that outputs frames at a nice steady rate is probably preferable.
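
下面是一个假设性的两级 pipeline sketch (非书中代码): stage1 与 stage2 分别跑在自己的线程上, 中间用 std::queue + std::mutex + std::condition_variable 充当传递数据的"管道"; 真实代码中应换成前面章节实现的线程安全队列.

#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <iostream>

// 两级 pipeline: stage1 生产数据放进队列, stage2 从队列取出继续处理
std::queue<int> pipe_queue;
std::mutex pipe_mutex;
std::condition_variable pipe_cv;
bool pipe_done = false;

void stage1()
{
    for (int i = 0; i < 10; ++i)
    {
        int item = i * i; // 第一道工序
        {
            std::lock_guard<std::mutex> lk(pipe_mutex);
            pipe_queue.push(item);
        }
        pipe_cv.notify_one();
    }
    {
        std::lock_guard<std::mutex> lk(pipe_mutex);
        pipe_done = true; // 通知下游: 上游不再产出
    }
    pipe_cv.notify_one();
}

void stage2()
{
    for (;;)
    {
        std::unique_lock<std::mutex> lk(pipe_mutex);
        pipe_cv.wait(lk, [] { return !pipe_queue.empty() || pipe_done; });
        if (pipe_queue.empty())
            break; // 上游已结束且队列为空
        int item = pipe_queue.front();
        pipe_queue.pop();
        lk.unlock();
        std::cout << item << '\n'; // 第二道工序
    }
}

int main()
{
    std::thread t1(stage1), t2(stage2);
    t1.join();
    t2.join();
}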

8.2 Factors affecting the performance of concurrent code

每个因素的细微的变化都有可能导致重大性能影响: even something as simple as changing which data elements are processed by each thread (while keeping everything else identical) can have a dramatic effect on performance.

8.2.1 How many processors?

To allow applications to scale the number of threads in line with the number of threads the hardware can run concurrently, the C++11 Standard Thread Library provides std::thread::hardware_concurrency().

consult your system documentation to see what options are available to you.
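
后面多个 listing 会反复出现同一个选择线程数的模式, 这里先抽成一个小 sketch (choose_num_threads 为自拟的假设性辅助函数):

#include <thread>
#include <algorithm>

// 以 hardware_concurrency() 为上限, 同时考虑任务量, 避免 oversubscription
unsigned choose_num_threads(unsigned long num_items, unsigned long min_per_thread)
{
    unsigned long const max_threads =
        (num_items + min_per_thread - 1) / min_per_thread; // 按工作量给出的上限
    unsigned long const hardware_threads =
        std::thread::hardware_concurrency();               // 可能返回 0
    return static_cast<unsigned>(
        std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads));
}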

并行任务的复杂度与计算单元数目之间的关系: One additional twist to this situation is that the ideal algorithm for a problem can depend on the size of the problem compared to the number of processing units.
具体地, If you have a massively parallel system with many processing units, an algorithm that performs more operations overall may finish more quickly than one that performs fewer operations, because each processor performs only a few operations.

8.2.2 Data contention and cache ping-pong

cache 同步问题: the data will be copied into their respective caches, and both processors can proceed. But if one of the threads modifies the data, this change then has to propagate to the cache on the other core, which takes time.

如下例子:

std::atomic<unsigned long> counter(0);
void processing_loop()
{
    while(counter.fetch_add(1,std::memory_order_relaxed)<100000000)
    {
        do_something();
    }
}

high contention: If do_something() is short enough, or if there are too many processors running this code(前提), the processors might find themselves waiting for each other(注意此处的主体对象是 processors 而不是线程); one processor is ready to update the value, but another processor is currently doing that, so it has to wait until the second processor has completed its update and the change has propagated.

In a loop like this one, the data for counter will be passed back and forth between the caches many times. This is called cache ping-pong. 对性能的影响: If a processor stalls because it has to wait for a cache transfer, it can’t do any work in the meantime, even if there are other threads waiting that could do useful work. 整个 Processor 的停顿.

这种现象也会出现在 mutex 情境下: extra transfer time: In order to lock the mutex, another thread must transfer the data that makes up the mutex to its processor and modify it. ==> if the data and mutex are accessed by more than one thread, then as you add more cores and processors to the system, it becomes more likely that you will get high contention and one processor having to wait for another.

mutex 下比 atomic operation 情况要好一些的原因: The effects of contention with mutexes are usually different from the effects of contention with atomic operations for the simple reason that the use of a mutex naturally serializes threads at the operating system level rather than at the processor level. If you have enough threads ready to run, the operating system can schedule another thread to run while one thread is waiting for the mutex, whereas a processor stall prevents any threads from running on that processor. 最多阻碍线程跑在一个特定的 processor 上, 但是 OS 还是可以安排一些线程跑在其他 processor 上.

即便是读写锁 a single-writer, multiple-reader mutex 也会在 reading 过程中受到 cache ping-pong 现象的影响.
具体地: Even reader threads still have to modify the mutex itself. As the number of processors accessing the data goes up, the contention on the mutex itself increases, and the cache line holding the mutex must be transferred between cores, potentially increasing the time taken to acquire and release locks to undesirable levels.

解决 cache ping-pong 的原则: do what you can to reduce the potential for two threads competing for the same memory location.

但是也不是那么容易的, 即便 follow 这个原则, Even if a particular memory location is only ever accessed by one thread, you can still get cache ping-pong due to an effect known as false sharing.

8.2.3 False sharing

cache lines: 缓存硬件以固定大小的内存块 (cache line) 为单位工作, typically 32 or 64 bytes in size.

Because the cache hardware only deals in cache-line-sized blocks of memory, small data items in adjacent memory locations will be in the same cache line. But if the data items in a cache line are unrelated and need to be accessed by different threads, this can be a major cause of performance problems.

例如 an array of int values: 每个线程只访问自己的那个数组元素, 但相邻元素落在同一条 cache line 上, 所以 even though each thread only accesses its own array entry, the cache hardware still has to play cache ping-pong.

定义: The cache line is shared, even though none of the data is, hence the term false sharing.
解决思路: 只把同一个线程处理的数据放在相邻的位置上(cache line里). The solution here is to structure the data so that data items to be accessed by the same thread are close together in memory (and thus more likely to be in the same cache line).

C++ 17 提供了解决工具: The C++17 standard defines std::hardware_destructive_interference_size in the header <new>, which specifies the maximum number of consecutive bytes that may be subject to false sharing for the current compilation target. If you ensure that your data is at least this number of bytes apart, then there will be no false sharing.
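
下面是一个假设性的 sketch (非书中代码), 演示用 alignas(std::hardware_destructive_interference_size) 把每个线程各自频繁更新的计数器隔离到不同的 cache line 上; 该常量需要 C++17, 个别编译器/标准库可能尚未提供.

#include <new>
#include <atomic>
#include <thread>
#include <vector>

// 每个计数器独占一条 cache line, 避免相邻计数器之间的 false sharing
struct alignas(std::hardware_destructive_interference_size) padded_counter
{
    std::atomic<unsigned long> value{0};
};

void count_in_parallel(unsigned num_threads)
{
    std::vector<padded_counter> counters(num_threads);
    std::vector<std::thread> workers;
    for (unsigned i = 0; i < num_threads; ++i)
    {
        workers.emplace_back([&counters, i]
        {
            for (int j = 0; j < 1000000; ++j)
                counters[i].value.fetch_add(1, std::memory_order_relaxed); // 每个线程只写自己的计数器
        });
    }
    for (auto& t : workers)
        t.join();
}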

8.2.4 How close is your data?

data proximity: if the data accessed by a single thread is spread out in memory, it’s likely that it lies on separate cache lines. On the flip side, if the data accessed by a single thread is close together in memory, it’s more likely to lie on the same cache line. 空间相邻性.
空间相邻性差的结果:

  • increase memory access latency
  • cache miss up

从单线程的此问题到多线程情景下: task switching: when the processor switches threads, it’s more likely to have to reload the cache lines if each thread uses data spread across multiple cache lines than if each thread’s data is close together in the same cache line.(前提 If there are more threads than cores in the system).

还有一种情况下, 不同 processors 之间 threads 切换导致的 cache line transfering: the operating system might also choose to schedule a thread on one core for one time slice and then on another core for the next time slice. This will therefore require transferring the cache lines for that thread’s data from the cache for the first core to the cache for the second; the more cache lines that need transferring, the more time-consuming this will be.

8.2.5 Oversubscription and excessive task switching

Oversubscription can arise when you have a task that repeatedly spawns new threads without limits.
由于存在此现象, 如何划分任务就显得很重要. choosing the appropriate division may require more knowledge of the target platform than you have available and is only worth doing if performance is unacceptable and it can be demonstrated that changing the division of work does improve performance. 有时需要通过实际测试来最终确定.

8.3 Designing data structures for multithreaded performance

三个注意考虑因素: contention, false sharing, and data proximity.

8.3.1 Dividing array elements for complex operations

例子, 矩阵相乘运算如何分配: It may therefore be better to divide the result matrix into small, square or almost square blocks rather than have each thread compute the entirety of a small number of rows. You can adjust the size of each block at runtime, depending on the size of the matrices and the available number of processors.

原则除了细心检查别无他法: The same principles apply to any situation where you have large blocks of data to divide between threads; look at all the aspects of the data access patterns carefully, and identify the potential causes of performance hits.
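
下面是一个按方块划分结果矩阵的假设性 sketch (parallel_matrix_multiply 为自拟名字, 非书中代码): 假设 N×N 方阵按行优先存成 std::vector<double>, 每个 block 交给一个 std::async 任务, 各任务只写自己负责的那块结果区域.

#include <future>
#include <vector>
#include <algorithm>
#include <cstddef>

// 把结果矩阵 c 划分为 block_size x block_size 的方块, 每块一个任务;
// block_size 可按矩阵规模和可用处理器数在运行期调整
void parallel_matrix_multiply(std::vector<double> const& a,
                              std::vector<double> const& b,
                              std::vector<double>& c,
                              std::size_t n, std::size_t block_size)
{
    std::vector<std::future<void>> tasks;
    for (std::size_t row0 = 0; row0 < n; row0 += block_size)
    {
        for (std::size_t col0 = 0; col0 < n; col0 += block_size)
        {
            tasks.push_back(std::async(std::launch::async, [&, row0, col0]
            {
                std::size_t const row_end = std::min(row0 + block_size, n);
                std::size_t const col_end = std::min(col0 + block_size, n);
                for (std::size_t i = row0; i < row_end; ++i)
                    for (std::size_t j = col0; j < col_end; ++j)
                    {
                        double sum = 0.0;
                        for (std::size_t k = 0; k < n; ++k)
                            sum += a[i * n + k] * b[k * n + j]; // 只读 a 的第 i 行与 b 的第 j 列
                        c[i * n + j] = sum;                     // 只写本 block 负责的区域
                    }
            }));
        }
    }
    for (auto& t : tasks)
        t.get();
}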

8.3.2 Data access patterns in other data structures

原则:

  • Try to adjust the data distribution between threads so that data that’s close together is worked on by the same thread.
  • Try to minimize the data required by any given thread.
  • Try to ensure that data accessed by separate threads is sufficiently far apart to avoid false sharing using std::hardware_destructive_interference_size as a guide.

一些思路:

  • 用指针作为线程处理的第一层数据
    if the tree nodes only contain pointers to the real data held at the node, then the processor only has to load the data from memory if it’s needed.
  • 把 mutex 与其处理的数据放在一起
    If the mutex and the data items are close together in memory, this is ideal for a thread that acquires the mutex; the data it needs may already be in the processor cache, because it was loaded in order to modify the mutex.
    But there’s also a downside: if other threads try to lock the mutex while it’s held by the first thread, they’ll need access to that memory. Mutex locks are typically implemented as a read-modify-write atomic operation on a memory location within the mutex to try to acquire the mutex, followed by a call to the operating system kernel if the mutex is already locked. This read-modify-write operation may cause the data held in the cache by the thread that owns the mutex to be invalidated. 例如 DCAS 会增加一些数据进去, 改变原先的数据.

检查方法: One way to test whether this kind of false sharing is a problem is to add huge blocks of padding between the data elements that can be concurrently accessed by different threads.

struct protected_data
{
    std::mutex m;
    char padding[std::hardware_destructive_interference_size];
    my_data data_to_protect;
};

// or
struct my_data
{
    data_item1 d1;
    data_item2 d2;
    char padding[std::hardware_destructive_interference_size];
};
my_data some_array[256];

If this improves the performance, you know that false sharing was a problem, and you can either leave the padding in or work to eliminate the false sharing in another way by rearranging the data accesses.

8.4 Additional considerations when designing for concurrency

除了上面的因素还有 2 个方面需要考虑:

  1. scalability: Code is said to be scalable if the performance (whether in terms of reduced speed of execution or increased throughput) increases as more processing cores are added to the system.

  2. exception safety is a matter of correctness.

8.4.1 Exception safety in parallel algorithms

与单线程不同, 多线程下的异常无法传递到 caller. By contrast, in a parallel algorithm many of the operations will be running on separate threads. In this case, the exception can’t be allowed to propagate because it’s on the wrong call stack. If a function spawned on a new thread exits with an exception, the application is terminated.

#include <vector>
#include <thread>
#include <algorithm>
#include <numeric>
#include <functional>
template<typename Iterator,typename T>
struct accumulate_block
{
    void operator()(Iterator first,Iterator last,T& result)
    {
        result=std::accumulate(first,last,result);
    }
};

template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
    unsigned long const length=std::distance(first,last);

    if(!length)
        return init;

    unsigned long const min_per_thread=25;
    unsigned long const max_threads=
        (length+min_per_thread-1)/min_per_thread;

    unsigned long const hardware_threads=
        std::thread::hardware_concurrency();

    unsigned long const num_threads=
        std::min(hardware_threads!=0?hardware_threads:2,max_threads);

    unsigned long const block_size=length/num_threads;

    std::vector<T> results(num_threads);
    std::vector<std::thread> threads(num_threads-1);

    Iterator block_start=first;
    for(unsigned long i=0;i<(num_threads-1);++i)
    {
        Iterator block_end=block_start;
        std::advance(block_end,block_size);
        threads[i]=std::thread(
            accumulate_block<Iterator,T>(),
            block_start,block_end,std::ref(results[i]));
        block_start=block_end;
    }
    accumulate_block<Iterator,T>()(block_start,last,results[num_threads-1]);

    std::for_each(threads.begin(),threads.end(),
                  std::mem_fn(&std::thread::join));

    return std::accumulate(results.begin(),results.end(),init);
}

主要在 2 个地方不是异常安全的:

  1. 在创建线程的循环中途抛出异常, 已经创建的 std::thread 对象会在未 join 的情况下析构, 直接导致 std::terminate.
  2. the calls to accumulate_block on the new threads might throw. 新线程上没有任何 catch, 异常离开线程函数同样会导致 std::terminate, 而且无法传播回调用方.

ADDING EXCEPTION SAFETY

使用 std::packaged_task and std::future 的组合:

Listing 8.3 A parallel version of std::accumulate using std::packaged_task

#include <algorithm>
#include <numeric>
#include <functional>
#include <thread>
#include <vector>
#include <future>
template<typename Iterator,typename T>
struct accumulate_block
{
    T operator()(Iterator first,Iterator last) //returns the result directly, rather than taking a reference to somewhere to store it (transfers exceptions as well)
    {
        return std::accumulate(first,last,T()); //explicitly pass a default-constructed T
    }
};

template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
    unsigned long const length=std::distance(first,last);

    if(!length)
        return init;

    unsigned long const min_per_thread=25;
    unsigned long const max_threads=
        (length+min_per_thread-1)/min_per_thread;

    unsigned long const hardware_threads=
        std::thread::hardware_concurrency();

    unsigned long const num_threads=
        std::min(hardware_threads!=0?hardware_threads:2,max_threads);

    unsigned long const block_size=length/num_threads;

    std::vector<std::future<T> > futures(num_threads-1);
    std::vector<std::thread> threads(num_threads-1);

    Iterator block_start=first;
    for(unsigned long i=0;i<(num_threads-1);++i)
    {
        Iterator block_end=block_start;
        std::advance(block_end,block_size);
        std::packaged_task<T(Iterator,Iterator)> task(
            accumulate_block<Iterator,T>());
        futures[i]=task.get_future();
        threads[i]=std::thread(std::move(task),block_start,block_end);
        block_start=block_end;
    }
    T last_result=accumulate_block<Iterator,T>()(block_start,last);

    std::for_each(threads.begin(),threads.end(),
                  std::mem_fn(&std::thread::join));

    T result=init;
    //get the values out of the futures, it’s now simpler to use a basic for loop rather than std::accumulate
    for(unsigned long i=0;i<(num_threads-1);++i)
    {
        result+=futures[i].get();
    }
    result += last_result;
    return result;
}

结果: If more than one of the worker threads throws an exception, only one will be propagated (第一个抛异常的 get() 所对应的那个). 如果这是个问题, 可以 use something like std::nested_exception to capture all the exceptions and throw that instead.
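
一个假设性的辅助函数 sketch (sum_futures 为自拟名字, 非书中代码), 演示先把各 future 的异常收集成 std::exception_ptr, 全部取完后再处理, 而不是在第一个抛异常的 get() 处就中断:

#include <future>
#include <vector>
#include <exception>

// 依次取出所有 future 的结果; 异常先收集, 最后只重抛其中一个
// (也可以在这里改成打包为一个聚合异常)
template <typename T>
T sum_futures(std::vector<std::future<T>>& futures, T init)
{
    std::vector<std::exception_ptr> errors;
    T result = init;
    for (auto& f : futures)
    {
        try
        {
            result += f.get();
        }
        catch (...)
        {
            errors.push_back(std::current_exception());
        }
    }
    if (!errors.empty())
        std::rethrow_exception(errors.front());
    return result;
}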

另一个问题: The remaining problem is the leaking threads if an exception is thrown between when you spawn the first thread and when you’ve joined with them all.
方法: The simplest solution is to catch any exceptions, join with the threads that are still joinable(), and rethrow the exception.

try
{
    for(unsigned long i=0;i<(num_threads-1);++i)
    {
        // ... as before
    }
    T last_result=accumulate_block<Iterator,T>()(block_start,last);
    std::for_each(threads.begin(),threads.end(),
                  std::mem_fn(&std::thread::join));
}
catch(...)
{
    for(unsigned long i=0;i<(num_threads-1);++i)
    {
        if(threads[i].joinable())
            threads[i].join();
    }
    throw;
}

这种做法会产生 duplicate code, 更好的方式是 extract this out into the destructor of an object (RAII):

class join_threads
{
    std::vector<std::thread>& threads;
public:
    explicit join_threads(std::vector<std::thread>& threads_):
        threads(threads_)
    {}
    ~join_threads()
    {
        for(unsigned long i=0;i<threads.size();++i)
        {
            if(threads[i].joinable())
                threads[i].join();
        }
    }
};

修改后的代码, Listing 8.4 An exception-safe parallel version of std::accumulate:

#include <iterator>
#include <vector>
#include <thread>
#include <future>
#include <numeric>
#include <algorithm>

// 占位 stub, 仅为让本片段独立编译; 要真正保证异常安全,
// 应使用前文给出的带析构 join 的完整 join_threads 类
struct join_threads
{
    join_threads(std::vector<std::thread>&)
    {}
};

template<typename Iterator,typename T>
struct accumulate_block
{
    T operator()(Iterator first,Iterator last)
    {
        return std::accumulate(first,last,T());
    }
};
template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
    unsigned long const length=std::distance(first,last);

    if(!length)
        return init;

    unsigned long const min_per_thread=25;
    unsigned long const max_threads=
        (length+min_per_thread-1)/min_per_thread;

    unsigned long const hardware_threads=
        std::thread::hardware_concurrency();

    unsigned long const num_threads=
        std::min(hardware_threads!=0?hardware_threads:2,max_threads);

    unsigned long const block_size=length/num_threads;

    std::vector<std::future<T> > futures(num_threads-1);
    std::vector<std::thread> threads(num_threads-1);
    join_threads joiner(threads); //container of threads, thus we can remove the explicit join loop

    Iterator block_start=first;
    for(unsigned long i=0;i<(num_threads-1);++i)
    {
        Iterator block_end=block_start;
        std::advance(block_end,block_size);
        std::packaged_task<T(Iterator,Iterator)> task(
            accumulate_block<Iterator,T>());
        futures[i]=task.get_future();
        threads[i]=std::thread(std::move(task),block_start,block_end);
        block_start=block_end;
    }
    T last_result=accumulate_block<Iterator,T>()(block_start,last);
    T result=init;
    for(unsigned long i=0;i<(num_threads-1);++i)
    {
        result+=futures[i].get();
    }
    result += last_result;
    return result;
}

EXCEPTION SAFETY WITH STD::ASYNC()

The key thing to note for exception safety is that if you destroy the future without waiting for it, the destructor will wait for the thread to complete. This neatly avoids the problem of leaked threads that are still executing and holding references to the data.

Listing 8.5 An exception-safe parallel version of std::accumulate using std::async

#include <future>
#include <algorithm>
#include <numeric>
template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
    unsigned long const length=std::distance(first,last);
    unsigned long const max_chunk_size=25;
    if(length<=max_chunk_size)
    {
        return std::accumulate(first,last,init);
    }
    else
    {
        Iterator mid_point=first;
        std::advance(mid_point,length/2);
        std::future<T> first_half_result=
            std::async(parallel_accumulate<Iterator,T>,
                       first,mid_point,init);
        T second_half_result=parallel_accumulate(mid_point,last,T()); //The second half of the range is handled with a direct recursive call
        return first_half_result.get()+second_half_result; //Some of the “asynchronous” calls will be executed synchronously in the call to get()
    }
}

使用的是 a recursive division of the data.

两个地方可能抛出异常需要关注:

  • If an exception is thrown by the recursive call, the future created from the call to std::async will be destroyed as the exception propagates. This will in turn wait for the asynchronous task to finish, avoiding a dangling thread.
  • On the other hand, if the asynchronous call throws, this is captured by the future, and the call to get() will rethrow the exception.

8.4.2 Scalability and Amdahl’s law

Scalability 受损的根源: Every time one thread has to wait for something (whatever that something is), unless there’s another thread ready to take its place on the processor, you have a processor sitting idle that could be doing useful work.

对任务进行分类: A simplified way of looking at this is to divide the program into “serial” sections where only one thread is doing any useful work and “parallel” sections where all the available processors are doing useful work.
Amdahl’s law: 设 “serial” sections 占整个程序的比例为 $f_s$, 则使用 $N$ 个 processors 时的 performance gain $P$ 为:

$$
P=\frac{1}{f_s+\frac{1-f_s}{N}}
$$
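
代入具体数字感受一下: 若串行部分占 $f_s=0.1$, 用 $N=8$ 个处理器, 则 $P=\frac{1}{0.1+0.9/8}\approx 4.7$; 即使 $N \to \infty$, 加速比也被串行部分限制在 $1/f_s=10$ 以内.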

得到定义: Scalability is about reducing the time it takes to perform an action or increasing the amount of data that can be processed in a given time as more processors are added.

2 种思路:

  1. reduce the size of the “serial” sections or reduce the potential for threads to wait,
  2. provide more data for the system to process, and thus keep the parallel sections primed with work.

8.4.3 Hiding latency with multiple threads

一些削减 wait 的思路:
if a thread is blocked because it’s waiting for an I/O operation to complete, it might make sense to use asynchronous I/O if that’s available, and then the thread can perform other useful work while the I/O is performed in the background.

In other cases, if a thread is waiting for another thread to perform an operation, then rather than blocking, the waiting thread might be able to perform that operation itself.
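
一个假设性的 sketch (slow_read 与 do_other_useful_work 均为自拟, 非书中代码): 用 std::async 把类似 I/O 的等待放到后台, 当前线程先做别的有用工作, 需要结果时再 get():

#include <future>
#include <string>
#include <chrono>
#include <thread>

// 模拟一次缓慢的 I/O 读操作
std::string slow_read()
{
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
    return "data";
}

// 假设存在的其他有用工作, 这里给个空实现
void do_other_useful_work() {}

void process()
{
    std::future<std::string> data = std::async(std::launch::async, slow_read); // "I/O" 在后台进行
    do_other_useful_work();                                                    // 本线程同时做别的事
    std::string result = data.get();                                           // 真正需要时再等待
    // ... 使用 result
}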

8.4.4 Improving responsiveness with concurrency

event-driven frameworks, 例如 modern graphical user interface.

while(true)
{
    event_data event=get_event();
    if(event.type==quit)
        break;
    process(event);
}

单线程下为了让界面保持响应, 直观的做法有两种:

  1. This means that either the task must periodically suspend itself and return control to the event loop,
  2. or the get_event()/process() code must be called from within the code at convenient points.

把执行任务的耗时较长的部分单独分离为 thread, 通过 event/message 与 interface 的部分通信: you can put the lengthy task on a whole new thread and leave a dedicated GUI thread to process the events. The threads can then communicate through simple mechanisms rather than having to somehow mix the event-handling code in with the task code.

Listing 8.6 Separating GUI thread from task thread

#include <future>
#include <algorithm>
#include <thread>
#include <atomic>
std::thread task_thread;
std::atomic<bool> task_cancelled(false);

void gui_thread()
{
    while(true)
    {
        event_data event=get_event();
        if(event.type==quit)
            break;
        process(event);
    }
}

void task()
{
    while(!task_complete() && !task_cancelled)
    {
        do_next_operation();
    }
    if(task_cancelled)
    {
        perform_cleanup();
    }
    else
    {
        post_gui_event(task_complete);
    }
}

void process(event_data const& event)
{
    switch(event.type)
    {
    case start_task:
        task_cancelled=false;
        task_thread=std::thread(task);
        break;
    case stop_task:
        task_cancelled=true;
        task_thread.join();
        break;
    case task_complete:
        task_thread.join();
        display_results();
        break;
    default:
        //...
        break;
    }
}

8.5 Designing concurrent code in practice

8.5.1 A parallel implementation of std::for_each

Listing 8.7 A parallel version of std::for_each

#include <future>
#include <algorithm>
#include <vector>
#include <thread>

// 占位 stub, 仅为让片段独立编译; 实际应使用前文的完整 join_threads
struct join_threads
{
    join_threads(std::vector<std::thread>&)
    {}
};

template<typename Iterator,typename Func>
void parallel_for_each(Iterator first,Iterator last,Func f)
{
    unsigned long const length=std::distance(first,last);

    if(!length)
        return;

    unsigned long const min_per_thread=25;
    unsigned long const max_threads=
        (length+min_per_thread-1)/min_per_thread;

    unsigned long const hardware_threads=
        std::thread::hardware_concurrency();

    unsigned long const num_threads=
        std::min(hardware_threads!=0?hardware_threads:2,max_threads);

    unsigned long const block_size=length/num_threads;

    std::vector<std::future<void> > futures(num_threads-1); //futures vector stores std::future<void>, because the worker threads don’t return a value.
    std::vector<std::thread> threads(num_threads-1);
    join_threads joiner(threads);

    Iterator block_start=first;
    for(unsigned long i=0;i<(num_threads-1);++i)
    {
        Iterator block_end=block_start;
        std::advance(block_end,block_size);
        std::packaged_task<void(void)> task(
            [=]()
            {
                std::for_each(block_start,block_end,f);
            });
        futures[i]=task.get_future();
        threads[i]=std::thread(std::move(task));
        block_start=block_end;
    }
    std::for_each(block_start,last,f);
    for(unsigned long i=0;i<(num_threads-1);++i)
    {
        futures[i].get();
    }
}

Listing 8.8 A parallel version of std::for_each using std::async

#include <future>
#include <algorithm>
template<typename Iterator,typename Func>
void parallel_for_each(Iterator first,Iterator last,Func f)
{
    unsigned long const length=std::distance(first,last);

    if(!length)
        return;

    unsigned long const min_per_thread=25;

    if(length<(2*min_per_thread))
    {
        std::for_each(first,last,f);
    }
    else
    {
        Iterator const mid_point=first+length/2;
        std::future<void> first_half=
            std::async(&parallel_for_each<Iterator,Func>,
                       first,mid_point,f);
        parallel_for_each(mid_point,last,f);
        first_half.get();
    }
}

8.5.2 A parallel implementation of std::find

特点: 它可以 complete without every element having been processed ===> 需要在 the answer is known 时以某种方式 interrupt the other tasks.
Other algorithms in this category include std::equal and std::any_of.

思路: atomic variable as a flag and checking the flag after processing every element. ===> downside to this is that atomic loads can be slow operations.

遇到异常时, 需不需要在其他线程上继续完成搜索?

  1. 终止全部搜索: use std::promise to set the final result directly from the worker threads.
  2. 异常的线程终止, 其他线程继续: use std::packaged_task, store all the exceptions, and then rethrow one of them if a match isn’t found.

有一点需要注意的是 You need to wait for all the threads to finish before getting the result from the future. If you block on the future, you’ll be waiting forever if the value isn’t there.

Listing 8.9 An implementation of a parallel find algorithm

#include <future>
#include <algorithm>
#include <atomic>
#include <vector>
#include <thread>

// 占位 stub, 仅为让片段独立编译; 实际应使用前文的完整 join_threads
struct join_threads
{
    join_threads(std::vector<std::thread>&)
    {}
};

template<typename Iterator,typename MatchType>
Iterator parallel_find(Iterator first,Iterator last,MatchType match)
{
    struct find_element
    {
        void operator()(Iterator begin,Iterator end,
                        MatchType match,
                        std::promise<Iterator>* result,
                        std::atomic<bool>* done_flag)
        {
            try
            {
                for(;(begin!=end) && !done_flag->load();++begin)
                {
                    if(*begin==match)
                    {
                        result->set_value(begin);
                        done_flag->store(true);
                        return;
                    }
                }
            }
            catch(...)
            {
                try
                {
                    result->set_exception(std::current_exception());
                    done_flag->store(true);
                }
                //Setting the value on the promise might throw an exception if the promise is already set,
                //so you catch and discard any exceptions that happen here
                catch(...)
                {}
            }
        }
    };

    unsigned long const length=std::distance(first,last);

    if(!length)
        return last;

    unsigned long const min_per_thread=25;
    unsigned long const max_threads=
        (length+min_per_thread-1)/min_per_thread;

    unsigned long const hardware_threads=
        std::thread::hardware_concurrency();

    unsigned long const num_threads=
        std::min(hardware_threads!=0?hardware_threads:2,max_threads);

    unsigned long const block_size=length/num_threads;

    std::promise<Iterator> result;
    std::atomic<bool> done_flag(false);
    std::vector<std::thread> threads(num_threads-1);
    { //enclosing the thread launching-and-joining code in a block
        join_threads joiner(threads);

        Iterator block_start=first;
        for(unsigned long i=0;i<(num_threads-1);++i)
        {
            Iterator block_end=block_start;
            std::advance(block_end,block_size);
            threads[i]=std::thread(find_element(),
                                   block_start,block_end,match,
                                   &result,&done_flag);
            block_start=block_end;
        }
        find_element()(block_start,last,match,&result,&done_flag);
    }
    if(!done_flag.load())
    {
        return last;
    }
    return result.get_future().get();
}

一个良性的 data race: If multiple threads find a match or throw at the same time, they’ll race to set the result in the promise. But this is a benign race condition; whichever succeeds is nominally “first” and therefore an acceptable result.

注意中间有一段 {} 的 block: you need to wait for all threads to finish before you check the result, because there might not be any matching elements. You do this by enclosing the thread launching-and-joining code in a block.

Listing 8.10 An implementation of a parallel find algorithm using std::async

#include <atomic>
#include <future>
#include <functional>
template<typename Iterator,typename MatchType>
Iterator parallel_find_impl(Iterator first,Iterator last,MatchType match,
                            std::atomic<bool>& done)
{
    try
    {
        unsigned long const length=std::distance(first,last);
        unsigned long const min_per_thread=25;
        if(length<(2*min_per_thread))
        {
            for(;(first!=last) && !done.load();++first)
            {
                if(*first==match)
                {
                    done=true; //If you do find a match, the done flag is set before returning
                    return first;
                }
            }
            return last; //return last to indicate that no match was found here
        }
        else
        {
            Iterator const mid_point=first+(length/2);
            std::future<Iterator> async_result=
                std::async(&parallel_find_impl<Iterator,MatchType>,
                           mid_point,last,match,std::ref(done)); //being careful to use std::ref to pass a reference to the done flag
            Iterator const direct_result=
                parallel_find_impl(first,mid_point,match,done);
            return (direct_result==mid_point)?
                async_result.get():direct_result; //the destructor of the async_result variable will wait for the thread to complete, so you don’t have any leaking threads.
        }
    }
    catch(...)
    {
        done=true;
        throw; //ensure that all threads terminate quickly if an exception is thrown
    }
}

template<typename Iterator,typename MatchType>
Iterator parallel_find(Iterator first,Iterator last,MatchType match)
{
    std::atomic<bool> done(false);
    return parallel_find_impl(first,last,match,done);
}

8.5.3 A parallel implementation of std::partial_sum

特点: you can’t just divide the range into chunks and calculate each chunk independently.

书中展示了一个算法: 总共 $N$ 个元素, 共进行约 $\log_2 N$ 轮; 对第 $n$ 个元素, 第一轮计算 $v_n = v_n + v_{n-1}$, 第二轮计算 $v_n = v_n + v_{n-2}$, 第三轮计算 $v_n = v_n + v_{n-4}$, 以此类推第 $i$ 轮计算 $v_n = v_n + v_{n-2^{i-1}}$, 直到步长 $2^{i-1}$ 超过 $n$.

算法复杂度: Overall, the second approach takes $\log_2 N$ steps of approximately $N$ operations (one per processor), where $N$ is the number of elements in the list; 总操作量为 $O(N \log N)$.

第一种算法 $O(N)$, 分段分别计算并且把上一段的结果加到下一段上(因此存在嵌套的等待关系).
Listing 8.11 Calculating partial sums in parallel by dividing the problem

#include <future>
#include <algorithm>
#include <numeric>
#include <vector>
#include <thread>

// 占位 stub, 仅为让片段独立编译; 实际应使用前文的完整 join_threads
struct join_threads
{
    join_threads(std::vector<std::thread>&)
    {}
};

template<typename Iterator>
void parallel_partial_sum(Iterator first,Iterator last)
{
    typedef typename Iterator::value_type value_type;
    struct process_chunk
    {
        void operator()(Iterator begin,Iterator last,
                        std::future<value_type>* previous_end_value,
                        std::promise<value_type>* end_value)
        {
            try
            {
                Iterator end=last;
                ++end;
                std::partial_sum(begin,end,begin);
                if(previous_end_value)
                {
                    //not the first chunk, need to wait for previous_end_value
                    value_type addend=previous_end_value->get();
                    *last+=addend; //In order to maximize the parallelism of the algorithm,
                                   //you then update the last element first,
                                   //so you can pass the value on to the next chunk (if there is one)
                    if(end_value)
                    {
                        end_value->set_value(*last);
                    }
                    std::for_each(begin,last,[addend](value_type& item)
                                  {
                                      item+=addend;
                                  });
                }
                else if(end_value)
                {
                    end_value->set_value(*last); //update the end_value for the next chunk (again, if there is one)
                }
            }
            catch(...)
            {
                if(end_value)
                {
                    end_value->set_exception(std::current_exception()); // store exception in the promise
                }
                else
                {
                    throw; //propagate all exceptions into the final chunk, which then rethrows
                }
            }
        }
    };

    unsigned long const length=std::distance(first,last);

    if(!length)
        return;

    unsigned long const min_per_thread=25;
    unsigned long const max_threads=
        (length+min_per_thread-1)/min_per_thread;

    unsigned long const hardware_threads=
        std::thread::hardware_concurrency();

    unsigned long const num_threads=
        std::min(hardware_threads!=0?hardware_threads:2,max_threads);

    unsigned long const block_size=length/num_threads;

    std::vector<std::thread> threads(num_threads-1);
    std::vector<std::promise<value_type> >
        end_values(num_threads-1); //to store the value of the last element in the chunk
    std::vector<std::future<value_type> >
        previous_end_values; //to retrieve the last value from the previous chunk.
    previous_end_values.reserve(num_threads-1); //reserve the space for the futures to avoid a reallocation
    join_threads joiner(threads);

    Iterator block_start=first;
    for(unsigned long i=0;i<(num_threads-1);++i)
    {
        Iterator block_last=block_start;
        std::advance(block_last,block_size-1); //points to the last element in each block
        threads[i]=std::thread(process_chunk(),
                               block_start,block_last,
                               (i!=0)?&previous_end_values[i-1]:0,
                               &end_values[i]);
        block_start=block_last;
        ++block_start;
        previous_end_values.push_back(end_values[i].get_future()); // store the future for the last value in the current chunk into the vector of futures
    }
    Iterator final_element=block_start;
    std::advance(final_element,std::distance(block_start,last)-1);
    process_chunk()(block_start,final_element,
                    (num_threads>1)?&previous_end_values.back():0,
                    0);
}

IMPLEMENTING THE INCREMENTAL PAIRWISE ALGORITHM FOR PARTIAL SUMS

第二种算法 (逐轮加倍步长的 pairwise 做法). 先实现一个简单的 barrier:

Listing 8.12 A simple barrier class

#include <thread>
#include <atomic>
class barrier
{
    unsigned const count; //capacity of threads
    std::atomic<unsigned> spaces;
    std::atomic<unsigned> generation;
public:
    explicit barrier(unsigned count_):
        count(count_),spaces(count),generation(0)
    {}
    void wait()
    {
        unsigned const my_generation=generation;
        if(!--spaces) //As each thread waits, the number of spaces is decremented
        {
            spaces=count; //When it reaches zero, the number of spaces is reset back to count
            ++generation; //generation is increased to signal to the other threads that they can continue
        }
        else
        {
            while(generation==my_generation) //a simple spin lock
                std::this_thread::yield(); //Because the generation is only updated when all the threads have reached the barrier, you yield() while waiting
        }
    }
};

这个实现很简陋, 有以下局限:

  • uses a spin wait.
  • it doesn’t work if there’s more than count threads that can potentially call wait() at any one time.
  • stuck to sequentially consistent operations.
  • possible cache ping-pong.

Listing 8.13 A parallel implementation of partial_sum by pairwise updates

#include <atomic>
#include <thread>
#include <vector>
#include <functional>

// 占位 stub, 仅为让片段独立编译; 实际应使用前文的完整 join_threads
struct join_threads
{
    join_threads(std::vector<std::thread>&)
    {}
};

struct barrier
{
    std::atomic<unsigned> count;
    std::atomic<unsigned> spaces;
    std::atomic<unsigned> generation;
    barrier(unsigned count_):
        count(count_),spaces(count_),generation(0)
    {}
    void wait()
    {
        unsigned const gen=generation.load();
        if(!--spaces)
        {
            spaces=count.load();
            ++generation;
        }
        else
        {
            while(generation.load()==gen)
            {
                std::this_thread::yield();
            }
        }
    }

    void done_waiting()
    {
        --count; // 该线程已完成自己的全部工作, 退出同步组; 当 count 减到 0, 最后一轮 barrier 结束, 所有计算完成
        if(!--spaces)
        {
            spaces=count.load();
            ++generation;
        }
    }
};

template<typename Iterator>
void parallel_partial_sum(Iterator first,Iterator last)
{
    typedef typename Iterator::value_type value_type;

    struct process_element
    {
        void operator()(Iterator first,Iterator last,
                        std::vector<value_type>& buffer,
                        unsigned i,barrier& b)
        {
            value_type& ith_element=*(first+i);
            bool update_source=false;
            for(unsigned step=0,stride=1;stride<=i;++step,stride*=2)
            {
                value_type const& source=(step%2)?
                    buffer[i]:ith_element;
                value_type& dest=(step%2)?
                    ith_element:buffer[i];
                value_type const& addend=(step%2)?
                    buffer[i-stride]:*(first+i-stride);
                dest=source+addend;
                update_source=!(step%2);
                b.wait();
            }
            if(update_source)
            {
                ith_element=buffer[i];
            }
            b.done_waiting();
        }
    };

    unsigned long const length=std::distance(first,last);

    if(length<=1)
        return;

    std::vector<value_type> buffer(length);
    barrier b(length);
    std::vector<std::thread> threads(length-1);
    join_threads joiner(threads);

    for(unsigned long i=0;i<(length-1);++i)
    {
        threads[i]=std::thread(process_element(),first,last,
                               std::ref(buffer),i,std::ref(b));
    }
    process_element()(first,last,buffer,length-1,b); //main thread
}

Note that this solution isn’t exception-safe.
