WHCSRL 技术网

03 重修C++之并发实战3.3-3.4_baidu

上一篇:03 重修C++之并发实战3.1-3.2

03 重修C++之并发实战3.3-3.4

3.3 一个线程安全堆栈的示范定义

下面将使用上述的1、3选项实现一个线程安全的堆栈,其中pop()有两个重载,一个介绍存储该值的位置引用,另一个返回一个std::shared_ptr<>的指针,简单设计仅包含两个函数接口push()pop()

//File:threadsafe_stack.h
#ifndef _THREADSAFE_STACK_
#define _THREADSAFE_STACK_

#include <stack>
#include <mutex>
#include <exception>
#include <typeinfo>
#include <memory> //For std::shared_ptr<>

struct empty_stack: std::exception //异常函数
{
    const char* what() const throw(){return "This is a empty stack!
";};
};

//模板类(模板类的实现一定要都放在头文件中,否则链接时不识别cpp中的实现)
//模板类需要在使用到的地方利用声明模板的typename或者class参数的时候,才会即时生成代码。
//那么当我把模板声明和实现分开的时候,这个即时过程因为编译器只能通过代码include“看到”
template<typename T> //头文件而找不到模板实现代码,所以会产生链接问题。
class threadsafe_stack 
{					   
private:              
    std::stack<T> data;
    mutable std::mutex m;
public:
    threadsafe_stack();
    threadsafe_stack(const threadsafe_stack &);
    threadsafe_stack &operator=(threadsafe_stack &&) = delete;
    threadsafe_stack &operator=(const threadsafe_stack &) = delete;
    virtual ~threadsafe_stack();

    void push(T new_value);
    void pop(T& value);
    std::shared_ptr<T> pop();
    bool empty() const;

};

template<typename T>
threadsafe_stack<T>::threadsafe_stack() { }

template<typename T>
threadsafe_stack<T>::threadsafe_stack(const threadsafe_stack &other)
{
    std::lock_guard<std::mutex> lock(other.m);
    data = other.data;
}

template<typename T>
threadsafe_stack<T>::~threadsafe_stack() { }

template<typename T>
void threadsafe_stack<T>::push(T new_value)
{
    std::lock_guard<std::mutex> lock(m);
    data.push(new_value);
}

template<typename T>
void threadsafe_stack<T>::pop(T& value)
{
    std::lock_guard<std::mutex> lock(m);
    if (data.empty()) throw empty_stack();
    value = data.top();
    data.pop();
}

template<typename T>
std::shared_ptr<T> threadsafe_stack<T>::pop()
{
    std::lock_guard<std::mutex> lock(m);
    if (data.empty()) throw empty_stack();
    std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
    data.pop();
    return res;
}

template<typename T>
bool threadsafe_stack<T>::empty() const
{
    std::lock_guard<std::mutex> lock(m);
    return data.empty();
}

#endif // !_THREADSAFE_STACK_
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85

main_test.c

#include <iostream>
#include <thread>
#include <cstdlib>
#include <unistd.h>

#include "threadsafe_stack.h"

#define MAXNUM 20
int main(int argc, const char** argv) {
    threadsafe_stack<int> stack;
    bool state1 = true;
    bool state2 = true;

    std::thread add1([&]() //线程add1
    {
        for (int i = 0; i < MAXNUM / 2; i++)
        {
            stack.push(i);
            std::cout << "thread add1::push " << i << " in the stack.
" << std::endl;
            sleep(1);
        }
        state1 = false;

    });
    
    std::thread add2([&]() //线程add2
    {
        for (int i = MAXNUM / 2; i < MAXNUM; i++)
        {
            stack.push(i);
            std::cout << "thread add2::push " << i << " in the stack.
" << std::endl;
            sleep(1);
        }
        state2 = false;
    });

    std::thread del([&]() //线程del 包含两种方式
    {
        std::shared_ptr<int> p;
        int value;
        int i = 0;
        while (state1 || state2)
        {
            if (stack.empty()) continue;
            if (i++ / 2 == 1)
            {
                p = stack.pop();
                std::cout << "std::shared_ptr<int> p = " << *p << std::endl;
            }
            else
            {
                stack.pop(value);
                std::cout << "value is " << value << std::endl;
            }
        }
        
    });

    add1.join();
    add2.join();
    del.join();

    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64

上述方法通过削减接口,考虑了最大安全性,甚至对整个堆栈的操作都受限。这里堆栈本身不能被赋值,因为删除了赋值运算符的操作,然而堆栈可以被复制。如果栈是空的,pop()将引发一个empty_stack异常,如果需要,std::shared_ptr的使用允许栈来处理内存分配问题同时避免对newdelete的过多使用。五个堆栈操作现在变成三个pop()push()empty(),甚至不需要empty(),接口的简化可以更好的控制数据,并且使用互斥锁保证整体操作是在锁定的情况下进行的。

上述的讨论表明,接口中有问题的竞争条件基本上因为锁定的粒度过小而引起的。保护没有覆盖期望操作的整体。当然互斥锁的锁定粒度过大也是不合适的,在一个有大量共享数据的系统中大粒度的互斥锁会大大削弱并发性能。但是细粒度的锁定方案有一个问题,就是有时为了保护操作中的所有数据,需要不止一个互斥锁。然而这是不合适的,因为互斥锁保护一个类的各个实例,在这种情况下,在下个级别进行锁定将意味着,要么将锁丢给用户,要么就让单个互斥锁保护该类的所有实例,这些都不是很理想。

3.4 死锁:问题和解决方案

3.4.1 简单死锁

如果对于一个给定的操作最终需要两个或者更多的互斥锁,那么就还有可能出现另一个潜在问题:死锁。一对线程中的每个都需要同时锁定两个互斥元来执行一些操作,并且每个线程都拥有一个互斥元,同时等待另一个,那么这两个线程都无法继续,这种情况被称为死锁。举两个例子:

例1:有1、2两个线程都需要同时锁定A、B两个互斥元才能操作,但是此时1拿到A,2拿到B,且两者都在等待另一个锁被释放,那么就会产生死锁。

例2:有三个线程1、2、3和三个互斥元A、B、C。1需要同时锁定A、B;2需要同时锁定B、C;3需要同时锁定C、A。此时1拿到A,2拿到B,3拿到C,且都等待另一把锁被释放,那么这三个线程都将陷入死锁。

为了避免死锁,常见的建议是始终按照相同的顺序锁定这两个互斥元。比如例1,如果总在锁定B之前锁定A,那么这两个线程永远不会死锁。这是理想条件,但是事实上大多互斥元服务于不同的目的,很难保证每次锁定的顺序相同,如果一味选择固定顺序锁定可能会使整个程序出现错误。暂时不考虑顺序的问题,C++标准库中的std::lock()允许用户同时锁定两个或更多的互斥元,同时不产生死锁。使用方法如下:

void swap(int& ldata, int& rdata)
{
    int temp;
    temp = ldata;
    ldata = rdata;
    rdata = temp;
}


class X
{
private:
    int _data;
    std::mutex m;
public:
    X(const int& data):_data(data){ }
    virtual ~X() {}

    friend void swap(X& lhs, X& rhs)
    {	
        //检查参数是不是相同的实例
        if (&lhs == &rhs) //试图在已经锁定的 std::mutex 上获取锁是未定义行为
        {
            return; //允许在同一线程中多重锁定的互斥元为 std::recursive_mutex
        }
        std::lock(lhs.m, rhs.m); //同时锁定两个互斥元
        
        //额外参数 std::adopt_lock 告知该方法,锁对象已被锁定,
        //并沿用已有锁的所有权而不是试图在构造函数中锁定互斥元。
        std::lock_guard<std::mutex> lock_l(lhs.m, std::adopt_lock);
        std::lock_guard<std::mutex> lock_r(rhs.m, std::adopt_lock);
        
        swap(lhs._data, rhs._data);
    }
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

在对std::lock的调用中,获取任何一个锁都可能引发异常,一旦在获取锁的过程中失败就会引发异常,之前获取的所有锁都会被释放。

3.4.2 避免死锁的进一步指南

死锁不仅仅产生与锁定,虽然这是最常见的诱因。还有一种情况,通过两个线程来制造死锁,不用锁定,只需要每个线程在 std::thread对象尚未另一个线程调用join()。在这种情况下,两个线程在互相等待,都无法获取进展。这种简单的问题可以发生在很多地方,比如一个线程在等待另一个线程完成,而另一个线程同时又在等待第一个线程,又或是三个线程互相等待,同上锁定造成死锁的两个例子。避免死锁的准则全都可以归结为一个思路,如果另外有一个线程可能在等待,那就别让它等。虽然这种方法有一些特殊情况不适用,但是能够解决大多数死锁的问题。

1.避免嵌套锁

第一个思路是最简单,如果已经持有一个锁那就别再获取其它锁。光凭单个锁是不可能造成死锁的。如果要获取多个锁最好使用std::lock,当然还有线程互相等待造成死锁的风险,但是至少在互斥元这方面不会造成死锁。

2.在持有锁时避免调用用户提供代码’

因为在用户提供的代码中,你不知道会做什么,如果在持有一个锁时下调用用户代码,就有可能获取其它锁进而产生死锁。有时候这种情况是无法避免的。如果在泛型编程中,在参数类型上的每一个操作都是用户提供的,这种情况下就需要新的准则。

3.以固定次序获取锁

如果绝对需要两个甚至多个锁,并且不能以std::lock单个操作获取锁,次优的做法是在每个线程中以固定的次序获取它们,在这种情况下无论有多少锁,由于获取次序是固定的,所以无论中间那个线程卡住了锁的获取流程,总会有一个线程拿够所需的锁,并在执行结束后释放,然后其它线程会慢慢”解锁“;如果后面没有其它线程,那么卡住锁的获取流程的线程很快就能拿全锁并开始执行,也就不会产生死锁。

4.使用层次锁

层次锁实际上是定义顺序锁的一个特例,但锁层次能够提供一种方法来检查在运行时是否遵守了约定。其思路是将应用程序分层,并确认所有能够在任意给定的层级上的互斥锁是否被锁定。当代码试图锁定一个互斥元时,如果它在较低层已经持有锁定,那么就不允许它锁定该互斥元。通过给每一个互斥元分配一个层号,并记录下每个线程都锁定了那些互斥元,就可以在运行时检查了。

#include <iostream>
#include <thread>
#include <string>
#include <mutex>
#include <stack>

#include "hierarchical_mutex.h" //自己定义头文件

using namespace std;

hierarchical_mutex hight_level_mutex(10000); //高层次锁
hierarchical_mutex low_level_mutex(5000); //低层次锁

int do_low_level_stuff()
{
    return 0;
}
int low_leve_func() //获取低层次锁 执行相应方法
{
    std::lock_guard<hierarchical_mutex> lk(low_level_mutex);
    return do_low_level_stuff();
}

void do_high_level_stuff(int data)
{
    if (data == 0)
        std::cout << "do_high_level_stuff(int data):data == 0" << std::endl;
}

void high_level_func() //获取高层次锁 并执行相应方法调用低层次方法
{
    std::lock_guard<hierarchical_mutex> lk(hight_level_mutex);
    do_high_level_stuff(low_leve_func());
}

void thread_a() //测试线程a
{
    high_level_func(); //由高到低 依次获取锁没有问题可以正确执行
}

hierarchical_mutex other_level_mutex(100); //其他层次锁
void do_other_level_stuff()
{
    std::cout << "do_other_level_stuff()" << std::endl;
}

void other_level_func() //先执行高层次函数 在调用 do_other_level_stuff
{
    high_level_func(); //目前没有问题
    do_other_level_stuff();
}

void thread_b() //测试线程b
{
    //获取其他层次锁
    std::lock_guard<hierarchical_mutex> lk(other_level_mutex);
    //调用other_level_func
    other_level_func(); //这里出现问题先拿到了低级锁(500)再要高层锁是非法的
}

int main(int argc, const char** argv) {

    thread ta(thread_a); //ta能正常执行
    thread tb(thread_b); //tb不能正常执行

    ta.join();
    tb.join();

    return 0;
}
/*************************************************
运行结果:
[wangs7@localhost 3rd_chapter]$ ./exec
do_high_level_stuff(int data):data == 0
terminate called after throwing an instance of 'std::logic_error'
  what():  mutex hierarchy violated
Aborted (core dumped)
*************************************************/
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78

这种层次锁是严格要求索取锁的层次要逐节向下的,不能越级向上拿锁,hierarchical_mutex.h的实现如下。比较简单不做分析。

#ifndef _HIERARCHICAL_MUTEX_
#define _HIERARCHICAL_MUTEX_

#include <mutex>
#include <climits>

class hierarchical_mutex
{
    std::mutex internal_mutex;
    unsigned long const hierarchy_value;
    unsigned long previous_hierarchy_value;
    static thread_local unsigned long this_thread_hierarchy_value;
    void check_for_hierarchy_violation()
    {
        if (this_thread_hierarchy_value <= hierarchy_value)
        {
            throw std::logic_error("mutex hierarchy violated");
        }
    }
    void update_hierarchy_value()
    {
        previous_hierarchy_value = this_thread_hierarchy_value;
        this_thread_hierarchy_value = hierarchy_value;
    }
public:
    explicit hierarchical_mutex(unsigned long value) :
        hierarchy_value(value),
        previous_hierarchy_value(0)
    {}

    void lock()
    {
        check_for_hierarchy_violation();
        internal_mutex.lock();
        update_hierarchy_value();
    }
    void unlock()
    {
        this_thread_hierarchy_value = previous_hierarchy_value;
        internal_mutex.unlock();
    }
    bool try_lock()
    {
        check_for_hierarchy_violation();
        if (!internal_mutex.try_lock())
            return false;
        update_hierarchy_value();
        return true;
    }

    static unsigned long get_thread_hierarchy_value()
    {
        return this_thread_hierarchy_value;
    }
};

thread_local unsigned long
hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);

#endif // !_HIERARCHICAL_MUTEX_
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
5.将这些设计准则扩展到锁之外

由于死锁不只是出现再锁定中,它可以发生再任何可以导致循环等待的同步结构中。因此扩展上面所述的准则来涵盖哪些情况也是值得的。举个例子,正如应该避免在持有所的时候等待另一个线程。因为该线程可能就会因为这个锁难以向下运行。类似的,如果要等待一个线程完成,指定线程的层次结构可能也是可行的,这样线程就只需要等待低层次上的线程。一个简单的做法就是,确保线程在启动它们的同一个函数中被join()【哪个函数启动的哪个函数join】。

【2021.11.01】

3.5 用 std::unique_lock 灵活锁定

推荐阅读