第八部分-并行和并发

并行和并发

该部分内容在并发编程里有详细讲解,这里就简单概括一下

1.并行基础

std::thread 用于创建一个执行的线程实例,所以它是一切并发编程的基础,使用时需要包含 头文件, 它提供了很多基本的线程操作,例如 get_id() 来获取所创建线程的线程 ID,使用 join() 来等待一个线程结束(与该线程汇合)等等

2.互斥区和临界区

std::mutex 是 C++11 中最基本的 mutex 类,通过实例化 std::mutex 可以创建互斥量, 而通过其成员函数 lock() 可以进行上锁,unlock() 可以进行解锁。 但是在实际编写代码的过程中,最好不去直接调用成员函数, 因为调用成员函数就需要在每个临界区的出口处调用 unlock(),当然,还包括异常。 这时候 C++11 还为互斥量提供了一个 RAII 语法的模板类 std::lock_guard。 RAII 在不失代码简洁性的同时,很好的保证了代码的异常安全性。

3.期物

期物(Future)表现为 std::future,它提供了一个访问异步操作结果的途径,这句话很不好理解。 为了理解这个特性,我们需要先理解一下在 C++11 之前的多线程行为。

试想,如果我们的主线程 A 希望新开辟一个线程 B 去执行某个我们预期的任务,并返回我一个结果。 而这时候,线程 A 可能正在忙其他的事情,无暇顾及 B 的结果, 所以我们会很自然的希望能够在某个特定的时间获得线程 B 的结果。

在 C++11 的 std::future 被引入之前,通常的做法是: 创建一个线程 A,在线程 A 里启动任务 B,当准备完毕后发送一个事件,并将结果保存在全局变量中。 而主函数线程 A 里正在做其他的事情,当需要结果的时候,调用一个线程等待函数来获得执行的结果。

而 C++11 提供的 std::future 简化了这个流程,可以用来获取异步任务的结果。 自然地,我们很容易能够想象到把它作为一种简单的线程同步手段,即屏障(barrier)。

4.条件变量

条件变量 std::condition_variable 是为了解决死锁而生,当互斥操作不够用而引入的。 比如,线程可能需要等待某个条件为真才能继续执行, 而一个忙等待循环中可能会导致所有其他线程都无法进入临界区使得条件为真时,就会发生死锁。 所以,condition_variable 实例被创建出现主要就是用于唤醒等待线程从而避免死锁。 std::condition_variable的 notify_one() 用于唤醒一个线程; notify_all() 则是通知所有线程

5.原子操作和内存模型

std::mutex 可以解决并发读写的问题,但互斥锁是操作系统级的功能, 这是因为一个互斥锁的实现通常包含两条基本原理:
1.提供线程间自动的状态转换,即『锁住』这个状态
2.保障在互斥锁操作期间,所操作变量的内存与临界区外进行隔离

这是一组非常强的同步条件,换句话说当最终编译为 CPU 指令时会表现为非常多的指令(我们之后再来看如何实现一个简单的互斥锁)。 这对于一个仅需原子级操作(没有中间态)的变量,似乎太苛刻了。

原子操作

现代 CPU 体系结构提供了 CPU 指令级的原子操作, 因此在 C++11 中多线程下共享变量的读写这一问题上,还引入了 std::atomic 模板,使得我们实例化一个原子类型,将一个 原子类型读写操作从一组指令,最小化到单个 CPU 指令。

当然,并非所有的类型都能提供原子操作,这是因为原子操作的可行性取决于具体的 CPU 架构,以及所实例化的类型结构是否能够满足该 CPU 架构对内存对齐 条件的要求,因而我们总是可以通过 std::atomic::is_lock_free 来检查该原子类型是否需支持原子操作

一致性模型

并行执行的多个线程,从某种宏观层面上讨论,可以粗略的视为一种分布式系统。 在分布式系统中,任何通信乃至本地操作都需要消耗一定时间,甚至出现不可靠的通信。

如果我们强行将一个变量 v 在多个线程之间的操作设为原子操作,即任何一个线程在操作完 v 后, 其他线程均能同步感知到 v 的变化,则对于变量 v 而言,表现为顺序执行的程序,它并没有由于引入多线程 而得到任何效率上的收益。对此有什么办法能够适当的加速呢?答案便是削弱原子操作的在进程间的同步条件。

从原理上看,每个线程可以对应为一个集群节点,而线程间的通信也几乎等价于集群节点间的通信。 削弱进程间的同步条件,通常我们会考虑四种不同的一致性模型:
1.线性一致性:又称强一致性或原子一致性。它要求任何一次读操作都能读到某个数据的最近一次写的数据,并且所有线程的操作顺序与全局时钟下的顺序是一致的。
2.顺序一致性:同样要求任何一次读操作都能读到数据最近一次写入的数据,但未要求与全局时钟的顺序一致。
3.因果一致性:它的要求进一步降低,只需要有因果关系的操作顺序得到保障,而非因果关系的操作顺序则不做要求。
4.最终一致性:是最弱的一致性要求,它只保障某个操作在未来的某个时间节点上会被观察到,但并未要求被观察到的时间。因此我们甚至可以对此条件稍作加强,例如规定某个操作被观察到的时间总是有界的。当然这已经不在我们的讨论范围之内了。

内存顺序

为了追求极致的性能,实现各种强度要求的一致性,C++11 为原子操作定义了六种不同的内存顺序 std::memory_order 的选项,表达了四种多线程间的同步模型:
1.宽松模型:在此模型下,单个线程内的原子操作都是顺序执行的,不允许指令重排,但不同线程间原子操作的顺序是任意的。类型通过std::memory_order_relaxed 指定

1
2
3
4
5
6
7
8
9
10
11
12
std::atomic<int> counter = {0};
std::vector<std::thread> vt;
for (int i = 0; i < 100; ++i) {
vt.emplace_back([&](){
counter.fetch_add(1, std::memory_order_relaxed);
});
}

for (auto& t : vt) {
t.join();
}
std::cout << "current counter:" << counter << std::endl;

2.释放/消费模型:在此模型中,我们开始限制进程间的操作顺序,如果某个线程需要修改某个值,但另一个线程会对该值的某次操作产生依赖,即后者依赖前者。具体而言,线程 A 完成了三次对 x 的写操作,线程 B 仅依赖其中第三次 x 的写操作,与 x 的前两次写行为无关,则当 A 主动 x.release() 时候(即使用 std::memory_order_release),选项 std::memory_order_consume 能够确保 B 在调用 x.load() 时候观察到 A 中第三次对 x 的写操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 初始化为 nullptr 防止 consumer 线程从野指针进行读取
std::atomic<int*> ptr(nullptr);
int v;
std::thread producer([&]() {
int* p = new int(42);
v = 1024;
ptr.store(p, std::memory_order_release);
});
std::thread consumer([&]() {
int* p;
while(!(p = ptr.load(std::memory_order_consume)));

std::cout << "p: " << *p << std::endl;
std::cout << "v: " << v << std::endl;
});
producer.join();
consumer.join();

3.释放/获取模型:在此模型下,我们可以进一步加紧对不同线程间原子操作的顺序的限制,在释放 std::memory_order_release 和获取 std::memory_order_acquire 之间规定时序,即发生在释放(release)操作之前的所有写操作,对其他线程的任何获取(acquire)操作都是可见的,亦即发生顺序(happens-before)。

可以看到,std::memory_order_release 确保了它之前的写操作不会发生在释放操作之后,是一个向后的屏障(backward),而 std::memory_order_acquire 确保了它之前的写行为不会发生在该获取操作之后,是一个向前的屏障(forward)。对于选项 std::memory_order_acq_rel 而言,则结合了这两者的特点,唯一确定了一个内存屏障,使得当前线程对内存的读写不会被重排并越过此操作的前后

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
std::vector<int> v;
std::atomic<int> flag = {0};
std::thread release([&]() {
v.push_back(42);
flag.store(1, std::memory_order_release);
});
std::thread acqrel([&]() {
int expected = 1; // must before compare_exchange_strong
while(!flag.compare_exchange_strong(expected, 2, std::memory_order_acq_rel))
expected = 1; // must after compare_exchange_strong
// flag has changed to 2
});
std::thread acquire([&]() {
while(flag.load(std::memory_order_acquire) < 2);

std::cout << v.at(0) << std::endl; // must be 42
});
release.join();
acqrel.join();
acquire.join();

在此例中我们使用了 compare_exchange_strong 比较交换原语(Compare-and-swap primitive),它有一个更弱的版本,即 compare_exchange_weak,它允许即便交换成功,也仍然返回 false 失败。其原因是因为在某些平台上虚假故障导致的,具体而言,当 CPU 进行上下文切换时,另一线程加载同一地址产生的不一致。除此之外,compare_exchange_strong 的性能可能稍差于 compare_exchange_weak,但大部分情况下,鉴于其使用的复杂度而言,compare_exchange_weak 应该被有限考虑

4.顺序一致模型:在此模型下,原子操作满足顺序一致性,进而可能对性能产生损耗。可显式的通过 std::memory_order_seq_cst 进行指定

1
2
3
4
5
6
7
8
9
10
11
12
std::atomic<int> counter = {0};
std::vector<std::thread> vt;
for (int i = 0; i < 100; ++i) {
vt.emplace_back([&](){
counter.fetch_add(1, std::memory_order_seq_cst);
});
}

for (auto& t : vt) {
t.join();
}
std::cout << "current counter:" << counter << std::endl;