整体的机制: Conceptually, a condition variable is associated with an event or other condition, and one or more threads can wait(不像 blocked那样, wait 不占用什么资源) for that condition to be satisfied. When a thread has determined that the condition is satisfied, it can then notifyone or more of the threads waiting on the condition variable in order to wake them up and allow them to continue processing.
4.1.1 Waiting for a condition with condition variables
两种 std::condition_variable and std::condition_variable_any. 区别: 前者只适用于 std::mutex, 后者适用于所有的 Lockable 类型, 代价是 in terms of size, performance, or OS resources.
// noninterruptible waits template<class Lock> voidwait(Lock& lock); template<class Lock, class Pred> voidwait(Lock& lock, Pred pred);
template<class Lock, class Clock, class Duration> cv_status wait_until(Lock& lock, const chrono::time_point<Clock, Duration>& abs_time); template<class Lock, class Clock, class Duration, class Pred> boolwait_until(Lock& lock, const chrono::time_point<Clock, Duration>& abs_time, Pred pred); template<class Lock, class Rep, class Period> cv_status wait_for(Lock& lock, const chrono::duration<Rep, Period>& rel_time); template<class Lock, class Rep, class Period, class Pred> boolwait_for(Lock& lock, const chrono::duration<Rep, Period>& rel_time, Pred pred);
// interruptible waits template<class Lock, class Pred> boolwait(Lock& lock, stop_token stoken, Pred pred); template<class Lock, class Clock, class Duration, class Pred> boolwait_until(Lock& lock, stop_token stoken, const chrono::time_point<Clock, Duration>& abs_time, Pred pred); template<class Lock, class Rep, class Period, class Pred> boolwait_for(Lock& lock, stop_token stoken, const chrono::duration<Rep, Period>& rel_time, Pred pred); }; }
假醒的副作用: 不要去依赖条件变量的 function object 里的状态(也就是 side effects 例如 static data member), 因为假醒的次数以及频率是无法预测的. 原文如下: Because the number and frequency of any such spurious wakes are by definition indeterminate, it isn’t advisable to use a function with side effects for the condition check. If you do so, you must be prepared for the side effects to occur multiple times.
4.1.2 Building a thread-safe queue with condition variables
std::launch::deferred: the task is executed on the calling thread the first time its result is requested (lazy evaluation). 控制任务什么时候开始执行. 默认是立即执行, 传入此策略可以延迟执行时机, 同时不会新开线程只在本线程里同步执行.
std::launch::async: the task is executed on a different thread, potentially by creating and launching it first. 必须新开线程进行异步执行任务. 默认是不一定新开, 看 implementation 的策略.
define additional bits and bitmasks to specify restrictions on task interactions applicable to a subset of launch policies, and
enable those additional bitmasks for the first (default) overload of std::async.
1 2 3 4 5
auto f6=std::async(std::launch::async,Y(),1.2); //立即开始新线程 auto f7=std::async(std::launch::deferred,baz,std::ref(x));//表示延迟执行任务, 调用 get 或者 wait 时才会执行, 不会创建线程. auto f8=std::async(std::launch::deferred | std::launch::async, baz,std::ref(x));//新线程取决于实现, 延迟执行 auto f9=std::async(baz,std::ref(x));//默认如上取决于具体实现 f7.wait();
std::packaged_task<> 的缺点: What about those tasks that can’t be expressed as a simple function call or those tasks where the result may come from more than one place? 引出 std::promise.
std::async 通过 std::async 不仅仅可以传递任务执行的结果, 还可以传递 exception, 这个 exception 也被放入 future 里. exception is stored in the future in place of a stored value, the future becomes ready, and a call to get() rethrows that stored exception. 注意, future 返回的 exception 有可能是之前的对象, 也有可能是重新抛出的重新生成的对象. Note: the standard leaves it unspecified whether it is the original exception object that’s rethrown or a copy; different compilers and libraries make different choices on this matter.
The type of the value used to represent the times obtained from the clock–时间类型
The tick period of the clock–时间单位
Whether or not the clock ticks at a uniform rate (steady clock)–稳定时间(a clock ticks at a uniform rate (whether or not that rate matches the period) and can’t be adjusted)
std::chrono::system_clock::now() 返回现在的 colck 下的时刻, 返回类型 some_clock::time_point, tick period of the clock 用秒的分数表示 std::ratio<1,25> 即 1 秒 25 次.
std::chrono::steady_clock 使用steady clock(is_steady 可以用来确认). std::chrono::system_clock 获取系统时间. 不为 steady 的, an adjustment may cause a call to now() to return a value earlier than that returned by a prior call to now(). std::chrono::high_resolution_clock 最小分辨率的 clock.
4.3.2 Durations
std::chrono::duration<> class template 规定间隔, std::chrono::duration<double,std::ratio<1,1000>> 表示用 double 表达 1/1000 秒.
std::chrono 命名空间里已经预定义好了 nanoseconds, microseconds, milliseconds, seconds, minutes, and hours.
C++14 std::chrono_literals 命名空间甚至可以直接使用时间的字面量. 注意一般只用在整数类型后面, 浮点表示: 2.5min will be std::chrono::duration<some-floating-point-type,std::ratio<60,1>>.
1 2 3 4
usingnamespace std::chrono_literals; auto one_day=24h; auto half_an_hour=30min; auto max_time_between_messages=30ms;
Conversion between durations is implicit, 向上转换会 truncation.
std::chrono::time_point 是相对于一个 epoch(由特定的 clock 决定此时刻) 经过时间的时间差, 例如 00:00 on January 1, 1970. 注意: Although you can’t find out when the epoch is, you can get the time_since_epoch() for a given time_point.
将 time_point 应用到 wait() 的机制: the wait tracks the clock change and won’t return until the clock’s now() function returns a value later than the specified timeout. 但是注意 now() 是根据不同的 clock 不同的.
各个类与时刻/间隔结合的使用情况, 要求 Parameters listed as duration must be an instance of std::duration<>, and those listed as time_point must be an instance of std::time_point<>:
4.4 Using synchronization of operations to simplify code
4.4.2 Synchronizing operations with message passing
CSP 模式是不依靠 shared data 而是线程间的 message 来实现分工, 最典型的设计就是每个线程设计成 infinite state machine, 线程间通过 message 相互改变 state 完成复杂任务, 例子是 ATM 的人机交互线程+机器硬件线程+银行系统交互线程的合作, 被叫做 Actor model —there are several discrete actors in the system (each running on a separate thread), which send messages to each other to perform the task at hand, and there’s no shared state except that which is directly passed via messages. .
状态机的结构图如下:
作者在书中完整地完成了一个此系统的设计, 在附录 C 中.
4.4.3 Continuation-style concurrency with the Concurrency TS
造成 block 的原因是函数执行的先后顺序与依赖顺序的天然矛盾产生的. 函数 A 里调用 B 依赖于 B 的计算结果, 造成的结果是 A 先执行然后 block/wait B 在另一个线程里进行运算, 运算结束了将其返回到 A, 唤醒 A 继续运行. 如果这个执行链条里有多个函数/依赖任务的话, 会有大量的 block/wait 线程. continuation 通过 future-unwrapping 的机制通过传入返回均是 continuation 实现 no blocking in your asynchronous function chain.
同样地还存在 std::experimental::shared_future can have more than one continuation 而不是多个任务共享一个 future, 这与常规的 shared 的设计不一致. 目的是避免 multiple objects can refer to the same shared state, if only one continuation was allowed, there would be a race condition between two threads that were each trying to add continuations to their own std::experimental::shared_future objects.
1 2 3 4 5 6
auto fut=spawn_async(some_function).share(); auto fut2=fut.then([](std::experimental::shared_future<some_data> data){ do_stuff(data);}); //fut 可以对应多个 continuations auto fut3=fut.then([](std::experimental::shared_future<some_data> data){ returndo_other_stuff(data);});
注意如果不用 share() 的话, 返回值是 a plain std::experimental:: future.
这种做法的缺点: 最终收集的线程会被执行计算的线程一次次叫醒, 浪费资源. 能不能所有的计算都完成了再去叫醒收集的线程呢? 这其实是个 futures 之间的 AND 的逻辑关系.
because it waits for each task individually, it will repeatedly be woken by the scheduler at B as each result becomes available, and then go back to sleep again when it finds another result that is not yet ready. Not only does this occupy the thread doing the waiting, but it adds additional context switches as each future becomes ready, which adds additional overhead.
std::latch: 多个线程把它的 counter 减到0, 它就会变成 ready 直到析构, 并且不关心哪个/哪些线程用什么形式把 counter 减到 0(包括一个线程多次减小 counter). A lightweight facility for waiting for a series of events to occur.
std::barrier: reusable synchronization component used for internal synchronization between a set of threads. with barriers, each thread can only arrive at the barrier once per cycle. 当一个 thread 到达 barrier, 所有已经到达 barrier 的 threads 都会被 block, 直到所有的 thread 都到达. 一旦都到达, 也可以 reuse: 重置 barrier, 进入下一个循环.
4.4.8 A basic latch type: std::experimental::latch
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
namespace std { classlatch { public: staticconstexprptrdiff_tmax()noexcept; constexprexplicitlatch(ptrdiff_t expected); ~latch(); latch(const latch&) = delete; latch& operator=(const latch&) = delete; voidcount_down(ptrdiff_t update = 1);//默认为 1, 到达一个 event 减 1. booltry_wait()constnoexcept;//确认 ready 与否. voidwait()const;//等待此 latch counter 为 0, 即 ready. voidarrive_and_wait(ptrdiff_t update = 1);//both count down the counter and then wait for the counter to reach zero private: ptrdiff_t counter; // exposition only }; }
namespace std { template<classCompletionFunction =/* a function object type, must meet the requirements of MoveConstructible and Destructible. */> classbarrier { public: using arrival_token = /* an unspecified object type meeting requirements of MoveConstructible, MoveAssignable and Destructible */; staticconstexprptrdiff_tmax()noexcept;//the maximum value of expected count supported by the implementation constexprexplicitbarrier(ptrdiff_t expected, CompletionFunction f = CompletionFunction()); ~barrier(); //not assignable barrier(const barrier&) = delete; barrier& operator=(const barrier&) = delete; [[nodiscard]] arrival_token arrive(ptrdiff_t update = 1);//arrives at barrier and decrements the expected count voidwait(arrival_token&& arrival)const;//blocks at the phase synchronization point until its phase completion step is run voidarrive_and_wait();//arrives at barrier and decrements the expected count by one, then blocks until current phase completes voidarrive_and_drop();//decrements both the initial expected count for subsequent phases and the expected count for current phase by one //thread 退出此 barrier 的接口 private: CompletionFunction completion; // exposition only }; }