c++并行与并发

追求适度,才能走向成功;人在顶峰,迈步就是下坡;身在低谷,抬足既是登高;弦,绷得太紧会断;人,思虑过度会疯;水至清无鱼,人至真无友,山至高无树;适度,不是中庸,而是一种明智的生活态度。

导读:本篇文章讲解 c++并行与并发,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

1.并行基础

std::thread 用于创建一个执行的线程实例

get_id() 来获取所创建线程的线程 ID

使用 join() 来加入一个线程等等

使用detach()拆下线程

代码如下:

#include <iostream>
#include <thread>
using namespace std;

int main()
{
   auto func1 = []()
   {
      cout << "is func1 running" << endl;
   };
   std::thread t(func1);
   if (t.joinable()) // 检查线程可否被join
   {
      t.detach();
   }
   auto func2 = [](int value)
   {
      cout << "is func2 running, "
           << "value is " << value << endl;
   };
   std::thread tt(func2, 20);
   if (tt.joinable())
   {
      tt.join();
   }
   return 0;
}

2 互斥量与临界区

我们在操作系统、亦或是数据库的相关知识中已经了解过了有关并发技术的基本知识,mutex 就是 其中的核心之一。C++11 引入了 mutex 相关的类,其所有相关的函数都放在 头文件中。 std::mutex 是 C++11 中最基本的 mutex 类,通过实例化 std::mutex 可以创建互斥量,而通过 其成员函数 lock() 可以进行上锁,unlock() 可以进行解锁。但是在在实际编写代码的过程中,最好不 去直接调用成员函数,因为调用成员函数就需要在每个临界区的出口处调用 unlock(),当然,还包括异 常。这时候 C++11 还为互斥量提供了一个 RAII 语法的模板类 std::lock_guard。RAII 在不失代码简 洁性的同时,很好的保证了代码的异常安全性。

代码:

#include<iostream>
#include<mutex>
#include<thread>

int v=1;
void critical_section(int change_v)
{
    static std::mutex mtx;
    std::lock_guard<std::mutex>lock(mtx);
    v=change_v;
     离开此作用域后 mtx 会被释放

}
int main()
{
    std::thread t1(critical_section, 2),t2(critical_section,3);
    t1.join();
    t2.join();
    std::cout<<v<<std::endl;
}

c++并行与并发

由于 C++ 保证了所有栈对象在声明周期结束时会被销毁,所以这样的代码也是异常安全的。无论 critical_section() 正常返回、还是在中途抛出异常,都会引发堆栈回退,也就自动调用了 unlock()。

 std::unique_lock 则相对于 std::lock_guard 出现的,std::unique_lock 更加灵活, std::unique_lock 的对象会以独占所有权(没有其他的 unique_lock 对象同时拥有某个 mutex 对象的所有权)的方式管理 mutex 对象上的上锁和解锁的操作。所以在并发编程中,推荐使用 std::unique_lock。 

代码:

#include<iostream>
#include<mutex>
#include<thread>

int v=1;
void critical_section(int change_v)
{
    static std::mutex mtx;
    std::unique_lock<std::mutex>lock(mtx);
    v=change_v;
     离开此作用域后 mtx 会被释放
    lock.unlock();
    // 在此期间,任何人都可以抢夺 v 的持有权

    // 开始另一组竞争操作,再次加锁
    lock.lock();
    v += 1;
    std::cout << v << std::endl;


}
int main()
{
    std::thread t1(critical_section, 2),t2(critical_section,3);
    t1.join();
    t2.join();
    std::cout<<v<<std::endl;
}

结果:

c++并行与并发

3 期物

如果我们的主线程 A 希望新开辟一个线程 B 去执行某个我们预期的任务,并返回我一个结果。而这时候,线程 A 可能正在忙其他的事情,无暇顾及 B 的结果,所以我们会很自然的希望能够在某 个特定的时间获得线程 B 的结果。

C++11 提供的 std::future 简化了这个流程,可以用来获取异步任务的结果。自然地,我们很 容易能够想象到把它作为一种简单的线程同步手段,即屏障(barrier)。

为了看一个例子,我们这里额外使用 std::packaged_task,它可以用来封装任何可以调用的目标, 从而用于实现异步的调用。举例来说:

#include<iostream>
#include<future>
#include<thread>
int main()
{
    // 将一个返回值为 7 的 lambda 表达式封装到 task 中
    // std::packaged_task 的模板参数为要封装函数的类型
    std::packaged_task<int()> task([](){return 7;});
    // 在一个线程中执行 task
    std::future<int> result=task.get_future();
    std::thread(std::move(task)).detach();
    std::cout << "waiting...";
    result.wait();// 在此设置屏障,阻塞到期物的完成
    // 输出执行结果
    std::cout << "done!" << std:: endl << "future result is " << result.get() << std::endl;
    return 0;
}

4 条件变量

条件变量 std::condition_variable 是为了解决死锁而生,

当互斥操作不够用而引入的。比如,线程 可能需要等待某个条件为真才能继续执行,而一个忙等待循环中可能会导致所有其他线程都无法进入临 界区使得条件为真时,就会发生死锁。所condition_variable 实例被创建出现主要就是用于唤醒等 待线程从而避免死锁。std::condition_variable 的 notify_one() 用于唤醒一个线程;notify_all() 则是通知所有线程。下面是一个生产者和消费者模型的例子:

#include<queue>
#include<mutex>
#include<iostream>
#include<condition_variable>
#include<thread>

int main()
{
    std::queue<int> produced_nums;
    std::mutex mtx;
    std::condition_variable cv;
    bool notified=false;

    auto producer=[&]()
    {
        for(int i=0;;i++)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(900));
            std::unique_lock<std::mutex>lock(mtx);
            std::cout<<"producing"<<i<<std::endl;
            produced_nums.push(i);
            notified=true;
            cv.notify_all();//


        }
    };
    auto consumer=[&]()
    {
        while(true)
        {
            std::unique_lock<std::mutex>lock(mtx);
            while(!notified)// 避免虚假唤醒
            {
                cv.wait(lock);
            }
            //暂取消锁,使得生产者有机会在消费者消费空前继续生产
            lock.unlock();
            std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // 消费者慢于生产者
            lock.lock();
            while(!produced_nums.empty())
            {
                std::cout << "consuming: " << produced_nums.front() << std::endl;
                 produced_nums.pop();
            }
            notified=false;

        }
    };
    // 分别在不同的线程中运行
    std::thread p(producer);
    std::thread cs[2];
    for(int i=0;i<2;i++)
    {
        cs[i]=std::thread(consumer);
    }
    p.join();
    for(int i=0;i<2;i++)
    {
        cs[i].join();
    }
    return 0;
}

结果:

c++并行与并发

 5 原子操作和内存模型

细心的读者可能会对前一小节中生产者消费者模型的例子可能存在编译器优化导致程序出错的情况 产生疑惑。例如,布尔值 notified 没有被 volatile 修饰,编译器可能对此变量存在优化,例如将其作 为一个寄存器的值,从而导致消费者线程永远无法观察到此值的变化。这是一个好问题,为了解释清楚 这个问题,我们需要进一步讨论从 C++ 11 起引入的内存模型这一概念


int main()
{
    int a=0;
    int flag=0;
    std::thread t1([&](){
        while(flag!=1);
        int b=a;
        std::cout<<"b="<<std::endl;
    });
    std::thread t2([&]{
        a=5;
        flag=1;

    });
    t1.join();
    t2.join();
    return 0;

}

从直观上看,t2 中 a = 5; 这一条语句似乎总在 flag = 1; 之前得到执行,而 t1 中 while (flag != 1) 似乎保证了 std::cout << “b = ” << b << std::endl;不会再标记被改变前执行。从逻辑上 看,似乎 b 的值应该等于 5。但实际情况远比此复杂得多,或者说这段代码本身属于未定义的行为,因 为对于 a 和 flag 而言,他们在两个并行的线程中被读写,出现了竞争。除此之外,即便我们忽略竞争 读写,仍然可能受 CPU 的乱序执行,编译器对指令的重排的影响,导致 a = 5 发生在 flag = 1 之后。 从而 b 可能输出 0。

5.1原子操作

std::mutex 可以解决上面出现的并发读写的问题,但互斥锁是操作系统级的功能,这是因为一个互 斥锁的实现通常包含两条基本原理:

1. 提供线程间自动的状态转换,即『锁住』这个状态

2. 保障在互斥锁操作期间,所操作变量的内存与临界区外进行隔离

因此在 C++11 中多线程下共享变量的读写这一问题上,还引 入了 std::atomic 模板,使得我们实例化一个原子类型,将一个原子类型读写操作从一组指令,最小化 到单个 CPU 指令。

例:std::atomic<int> counter;

并为整数或浮点数的原子类型提供了基本的数值成员函数,举例来说,包括 fetch_add, fetch_sub 等,同时通过重载方便的提供了对应的 +,- 版本。比如下面的例子:

#include <atomic>
#include <thread>
#include <iostream>
std::atomic<int> count = {0};

int main() {
std::thread t1([](){
count.fetch_add(1);
});
std::thread t2([](){
count++; // 等价于 fetch_add
count += 1; // 等价于 fetch_add
});
t1.join();
t2.join();
std::cout << count << std::endl;
return 0;
}

当然,并非所有的类型都能提供原子操作,这是因为原子操作的可行性取决于 CPU 的架构以及所实 例化的类型结构是否满足该架构对内存对齐条件的要求,因而我们总是可以通过 std::atomic::is_lock_free 来检查该原子类型是否需支持原子操作,例如:

#include <atomic>
#include <iostream>
struct A {
float x;
int y;
long long z;
};
int main() {
std::atomic<A> a;
std::cout << std::boolalpha << a.is_lock_free() << std::endl;
return 0;
}

5.2一致性模型

如果我们强行将一个变量 v 在多个线程之间的操作设为原子操作,即任何一个线程在操作完 v 后, 其他线程均能同步感知到 v 的变化,则对于变量 v 而言,表现为顺序执行的程序,它并没有由于引入多 线程而得到任何效率上的收益。对此有什么办法能够适当的加速呢?

答案便是削弱原子操作的在进程间 的同步条件。

1. 线性一致性:又称强一致性或原子一致性。它要求任何一次读操作都能读到某个数据的最近一次写 的数据,并且所有线程的操作顺序与全局时钟下的顺序是一致的。

          x.store(1)          x.load()

T1 ———+—————-+——>

T2 ——————-+————->

                   x.store(2)

线程 T1, T2 对 x 的两次写操作是原子的,且 x.store(1) 是严格的发生在 x.store(2) 之前,x.store(2) 严格的发生在 x.load() 之前 

2. 顺序一致性:同样要求任何一次读操作都能读到数据最近一次写入的数据,但未要求与全局时钟的 顺序一致。

x.store(1)   x.store(3)   x.load()

T1 ———+———–+———-+—–>

T2 —————+———————->

                     x.store(2)

或者

x.store(1)     x.store(3)     x.load()

T1 ———+———–+———-+—–>

T2 ——+——————————->

       x.store(2)

在顺序一致性的要求下,x.load() 必须读到最近一次写入的数据,因此 x.store(2) 与 x.store(1) 并无任何先后保障,即只要 T2 的 x.store(2) 发生在 x.store(3) 之前即可。

3. 因果一致性:它的要求进一步降低,只需要有因果关系的操作顺序得到保障,而非因果关系的操作 顺序则不做要求。

    a = 1              b = 2

T1 —-+———–+—————————->

T2 ——+——————–+——–+——–>

             x.store(3)       c = a + b   y.load()

或者

   a = 1             b = 2

T1 —-+———–+—————————->

T2 ——+——————–+——–+——–>

   x.store(3)              y.load()     c = a + b

亦或者

         b = 2               a = 1

T1 —-+———–+—————————->

T2 ——+——————–+——–+——–>

      y.load()             c = a + b     x.store(3)

上面给出的三种例子都是属于因果一致的,因为整个过程中,只有 c 对 a 和 b 产生依赖,而 x 和 y 在此例子中表现为没有关系

5.3内存的模型

为了追求极致的性能,实现各种强度要求的一致性,C++11 为原子操作定义了六种不同的内存顺序 std::memory_order 的选项,表达了四种多线程间的同步模型:

1. 宽松模型:

在此模型下,单个线程内的原子操作都是顺序执行的,不允许指令重排,但不同线程间 原子操作的顺序是任意的。类型通过 std::memory_order_relaxed 指定。我们来看一个例子:

#include<atomic>
#include<iostream>
#include<vector>
#include<thread>

int main()
{
   std::atomic<int> counter={0};
   std::vector<std::thread> vt;
   for(int i=0;i<100;i++)
   {
    vt.emplace_back([&]()
    {
     counter.fetch_add(1,std::memory_order_relaxed);   
    });

   }
   for(auto& t:vt)
   {
    t.join();
   }
   std::cout<<"current counter:" << counter << std::endl;

}
//result:
100

2.释放/消费模型

在此模型中,我们开始限制进程间的操作顺序,如果某个线程需要修改某个值,但 另一个线程会对该值的某次操作产生依赖,即后者依赖前者。具体而言,线程 A 完成了三次对 x 的 写操作,线程 B 仅依赖其中第三次 x 的写操作,与 x 的前两次写行为无关,则当 A 主动 x.release() 时候(即使用 std::memory_order_release),选项 std::memory_order_consume 能够确保 B 在 调用 x.load() 时候观察到 A 中第三次对 x 的写操作。我们来看一个例子:

#include<atomic>
#include<iostream>
#include<vector>
#include<thread>

int main()
{
   std::atomic<int*>ptr(nullptr);
   int v;
   std::thread producer([&]() {
       int* p = new int(42);
       v = 1024;
       ptr.store(p, std::memory_order_release);});
    std::thread consumer([&]()
    {
        int* p;
       while(!(p=ptr.load(std::memory_order_consume)));
        std::cout << "p: " << *p << std::endl;
        std::cout << "v: " << v << std::endl;
    });
    producer.join();
    consumer.join();


}
没//while(!(p=ptr.load(std::memory_order_consume)));
p: 42
v: 1024
while(!(p=ptr.load(std::memory_order_consume)));
p: -281502060
v: 0

3. 释放/获取模型

在此模型下,我们可以进一步加紧对不同线程间原子操作的顺序的限制,在释放 std::memory_order_release 和获取 std::memory_order_acquire 之间规定时序,即发生在释放 操作之前的所有写操作,对其他线程的任何获取操作都是可见的,亦即发生顺序(happens-before)。 可以看到,std::memory_order_release 确保了它之后的写行为不会发生在释放操作之前,是一 个向后的屏障,而 std::memory_order_acquire 确保了它之前的写行为,不会发生在该获取操作 之后,是一个向前的屏障。对于选项 std::memory_order_acq_rel 而言,则结合了这两者的特点, 唯一确定了一个内存屏障,使得当前线程对内存的读写不会被重排到此操作的前后。

#include<atomic>
#include<iostream>
#include<vector>
#include<thread>

int main()
{
   std::vector<int> v;
   std::atomic<int> flag={0};
   std::thread release(
    [&](){
        v.push_back(42);
        flag.store(1,std::memory_order_release);
    }
   );
   std::thread acqrel([&]() {
   int expected = 1; // must before compare_exchange_strong
   while(!flag.compare_exchange_strong(expected, 2, std::memory_order_acq_rel)) 
   {
     expected = 1; // must after compare_exchange_strong
   }
// flag has changed to  2
   });
   std::thread acquire([&]() {
   while(flag.load(std::memory_order_acquire) < 2);
   std::cout << v.at(0) << std::endl; // must be 42
   });
   release.join();
   acqrel.join();
   acquire.join();



}

在此例中我们使用了 compare_exchange_strong,它便是比较交换原语(Compare-and-swap primitive),它有一个更弱的版本,即 compare_exchange_weak,它允许即便交换成功,也仍然返回 false 失败。其原因是因为在某些平台上虚假故障导致的

4. 顺序一致模型

在此模型下,原子操作满足顺序一致性,进而可能对性能产生损耗。可显式的通过 std::memory_order_seq_cst 进行指定。最后来看一个例子:

#include<atomic>
#include<iostream>
#include<vector>
#include<thread>

int main()
{
   std::atomic<int> counter = {0};
std::vector<std::thread> vt;
for (int i = 0; i < 100; ++i) {

vt.emplace_back([&](){
counter.fetch_add(1, std::memory_order_seq_cst);
});
}
for (auto& t : vt) {
t.join();
}
std::cout << "current counter:" << counter << std::endl;


}

这个例子与第一个宽松模型的例子本质上没有区别,仅仅只是将原子操作的内存顺序修改为了 memory_order_seq_cst,有兴趣的读者可以自行编写程序测量这两种不同内存顺序导致的性能差 异。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/129683.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!