码迷,mamicode.com
首页 > 编程语言 > 详细

C++并发编程 02 数据共享

时间:2018-12-22 18:39:47      阅读:146      评论:0      收藏:0      [点我收藏+]

标签:已经锁定   避免   ssi   lis   数据同步   release   lse   读取   保护   

在《C++并发编程实战》这本书中第3章主要将的是多线程之间的数据共享同步问题。在多线程之间需要进行数据同步的主要是条件竞争。

std::lock_guard<std::mutex>

#include <list>
#include <mutex>
#include <algorithm>

std::list<int> some_list;
std::mutex some_mutex;

void add_to_list(int new_value)
{
    std::lock_guard<std::mutex> guard(some_mutex);
    some_list.push_back(new_value);
}
bool list_contains(int value_to_find)
{
    std::lock_guard<std::mutex> guard(some_mutex);
    return std::find(some_list.begin(),some_list.end(),value_to_find)
        != some_list.end();
}

#include <iostream>

int main()
{
    add_to_list(42);
    std::cout<<"contains(1)="<<list_contains(1)<<", contains(42)="<<list_contains(42)<<std::endl;
}

在上述代码中使用了std::lock_guard<>模板,使用该模板定义的mutex在栈空间分配空间,在构造函数中会对传入的mutex变量进行加锁,并在函数运行结束时在其析构函数中调同mutex.unlock()来自动解锁。我们来看看lock_gard<T>的定义:

 

  /** @brief A simple scoped lock type.
   *
   * A lock_guard controls mutex ownership within a scope, releasing
   * ownership in the destructor.
   */
  template<typename _Mutex>
    class lock_guard
    {
    public:
      typedef _Mutex mutex_type;

      explicit lock_guard(mutex_type& __m) : _M_device(__m)
      { _M_device.lock(); }

      lock_guard(mutex_type& __m, adopt_lock_t) noexcept : _M_device(__m)
      { } // calling thread owns mutex

      ~lock_guard()
      { _M_device.unlock(); }   

      lock_guard(const lock_guard&) = delete;
      lock_guard& operator=(const lock_guard&) = delete;

    private:
      mutex_type&  _M_device;
    };

 2 std::lock(std::mutex,std::mutex)

对于需要一次锁定两个变量的场景,可以使用std::lock(std::mutex,std::mutex)来一次性锁定两个变量,比如在进行swap交换两个变量的值的场景,可以这么使用:

 1 #include <mutex>
 2 
 3 class some_big_object
 4 {};
 5 
 6 void swap(some_big_object& lhs,some_big_object& rhs)
 7 {}
 8 
 9 class X
10 {
11 private:
12     some_big_object some_detail;
13     mutable std::mutex m;
14 public:
15     X(some_big_object const& sd):some_detail(sd){}
16 
17     friend void swap(X& lhs, X& rhs)
18     {
19         if(&lhs==&rhs)
20             return;
21         std::lock(lhs.m,rhs.m);
22         std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock);
23         std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock);
24         swap(lhs.some_detail,rhs.some_detail);
25     }
26 };
27 
28 int main()
29 {}

例子中some_big_object类的实例是需要保护的对象,在交换时,调用 std::lock() 21行锁住两个互斥量,并且两个 std:lock_guard 实例已经创建好了22,23行,还有一个互斥量。提供 std::adopt_lock 参数除了表示 std::lock_guard 的对象已经上锁外,还表示应使用互斥量现成的锁,而非尝试创建新的互斥锁。 然后调用非锁定版本的swap方法来进行交换操作,以确保操作的原子性(中间态不可修改)

3 std::recursive_mutex可重入锁

对于一个std::mutex对象,如果你已经锁定它了,那么在unock()之前你就不可以再次对他执行lock()函数。但在有些情况下,可能希望在解锁前多次锁定一个mutex。对于这种情况,C++标准库提供了std::recursive_mutex。他与std::mutex用法一样,除了可以在一个线程中多次lock一个recursive_lock。但是它要求,在其他线程使用这个lock前,你必须保证使用的lock次数和unlock的次数是一样的。对于这点,正确使用std::lock_guard<std::recursive_mutex>std::unique_lock<std::recursive_mutex>可以达到目的。

通常情况下,如果你想要使用std::recursive_mutex,都要改变程序的设计。一种常见的情况是,一个类需要保护共享数据,所有的公有函数都锁定一个mutex。但有时候一个公有函数需要调用另一个公有函数,这样,另一个公有函数也需要去锁定mutex,这会导致重复锁定同一个mutex,产生未定义行为。一种投机取巧的办法是将mutex换成std::recursive_mutex,使得另一个函数可以毫无顾忌的执行锁定操作。

但是,这种做法是不提倡的。因为这是一种草率的做法、不成熟的设计。保持一个锁状态去意味着类的稳定性被破坏了,也就是说第二个公有函数是在类的稳定性被破坏的前提下被调用的。更好的做法是,提取出一个被两个个公有函数调用的私有函数,这个私有函数无需锁定mutex

4. std::unique_lock

std::unique_lock相比std::lock_gard具有以下特点:

a) unique_lock本身不存储mutex实例,存储空间占用较大,运行速度要慢一些;

b)  更加丰富的API,锁定方式灵活,实现延迟锁定,随时锁定解锁,至此移动构造函数,实现所有权转移。看一下std::unique_lock的定义

 

/** @brief A movable scoped lock type.
   *
   * A unique_lock controls mutex ownership within a scope. Ownership of the
   * mutex can be delayed until after construction and can be transferred
   * to another unique_lock by move construction or move assignment. If a
   * mutex lock is owned when the destructor runs ownership will be released.
   */
  template<typename _Mutex>
    class unique_lock
    {
    public:
      typedef _Mutex mutex_type;

      unique_lock() noexcept
      : _M_device(0), _M_owns(false)
      { }

      explicit unique_lock(mutex_type& __m)
      : _M_device(std::__addressof(__m)), _M_owns(false)
      {
    lock();
    _M_owns = true;
      }

      unique_lock(mutex_type& __m, defer_lock_t) noexcept
      : _M_device(std::__addressof(__m)), _M_owns(false)
      { }

      unique_lock(mutex_type& __m, try_to_lock_t)
      : _M_device(std::__addressof(__m)), _M_owns(_M_device->try_lock())
      { }

      unique_lock(mutex_type& __m, adopt_lock_t) noexcept
      : _M_device(std::__addressof(__m)), _M_owns(true)
      {
    // XXX calling thread owns mutex
      }

      template<typename _Clock, typename _Duration>
    unique_lock(mutex_type& __m,
            const chrono::time_point<_Clock, _Duration>& __atime)
    : _M_device(std::__addressof(__m)),
      _M_owns(_M_device->try_lock_until(__atime))
    { }

      template<typename _Rep, typename _Period>
    unique_lock(mutex_type& __m,
            const chrono::duration<_Rep, _Period>& __rtime)
    : _M_device(std::__addressof(__m)),
      _M_owns(_M_device->try_lock_for(__rtime))
    { }

      ~unique_lock()
      {
    if (_M_owns)
      unlock();
      }

      unique_lock(const unique_lock&) = delete;
      unique_lock& operator=(const unique_lock&) = delete;

      unique_lock(unique_lock&& __u) noexcept
      : _M_device(__u._M_device), _M_owns(__u._M_owns)
      {
    __u._M_device = 0;
    __u._M_owns = false;
      }

      unique_lock& operator=(unique_lock&& __u) noexcept
      {
    if(_M_owns)
      unlock();

    unique_lock(std::move(__u)).swap(*this);

    __u._M_device = 0;
    __u._M_owns = false;

    return *this;
      }

      void
      lock()
      {
    if (!_M_device)
      __throw_system_error(int(errc::operation_not_permitted));
    else if (_M_owns)
      __throw_system_error(int(errc::resource_deadlock_would_occur));
    else
      {
        _M_device->lock();
        _M_owns = true;
      }
      }

      bool
      try_lock()
      {
    if (!_M_device)
      __throw_system_error(int(errc::operation_not_permitted));
    else if (_M_owns)
      __throw_system_error(int(errc::resource_deadlock_would_occur));
    else
      {
        _M_owns = _M_device->try_lock();
        return _M_owns;
      }
      }

      template<typename _Clock, typename _Duration>
    bool
    try_lock_until(const chrono::time_point<_Clock, _Duration>& __atime)
    {
      if (!_M_device)
        __throw_system_error(int(errc::operation_not_permitted));
      else if (_M_owns)
        __throw_system_error(int(errc::resource_deadlock_would_occur));
      else
        {
          _M_owns = _M_device->try_lock_until(__atime);
          return _M_owns;
        }
    }

      template<typename _Rep, typename _Period>
    bool
    try_lock_for(const chrono::duration<_Rep, _Period>& __rtime)
    {
      if (!_M_device)
        __throw_system_error(int(errc::operation_not_permitted));
      else if (_M_owns)
        __throw_system_error(int(errc::resource_deadlock_would_occur));
      else
        {
          _M_owns = _M_device->try_lock_for(__rtime);
          return _M_owns;
        }
     }

      void
      unlock()
      {
    if (!_M_owns)
      __throw_system_error(int(errc::operation_not_permitted));
    else if (_M_device)
      {
        _M_device->unlock();
        _M_owns = false;
      }
      }

      void
      swap(unique_lock& __u) noexcept
      {
    std::swap(_M_device, __u._M_device);
    std::swap(_M_owns, __u._M_owns);
      }

      mutex_type*
      release() noexcept
      {
    mutex_type* __ret = _M_device;
    _M_device = 0;
    _M_owns = false;
    return __ret;
      }

      bool
      owns_lock() const noexcept
      { return _M_owns; }

      explicit operator bool() const noexcept
      { return owns_lock(); }

      mutex_type*
      mutex() const noexcept
      { return _M_device; }

    private:
      mutex_type*    _M_device;
      bool        _M_owns; // XXX use atomic_bool
    };

  /// Swap overload for unique_lock objects.
  template<typename _Mutex>
    inline void
    swap(unique_lock<_Mutex>& __x, unique_lock<_Mutex>& __y) noexcept
    { __x.swap(__y); }

  // @} group mutexes
_GLIBCXX_END_NAMESPACE_VERSION
} // namespace

 

使用std::unique_lock进行交换的例子如下:

 1 #include <mutex>
 2 
 3 class some_big_object
 4 {};
 5 
 6 void swap(some_big_object& lhs,some_big_object& rhs)
 7 {}
 8 
 9 class X
10 {
11 private:
12     some_big_object some_detail;
13     mutable std::mutex m;
14 public:
15     X(some_big_object const& sd):some_detail(sd){}
16 
17     friend void swap(X& lhs, X& rhs)
18     {
19         if(&lhs==&rhs)
20             return;
21         std::unique_lock<std::mutex> lock_a(lhs.m,std::defer_lock);
22         std::unique_lock<std::mutex> lock_b(rhs.m,std::defer_lock);
23         std::lock(lock_a,lock_b);
24         swap(lhs.some_detail,rhs.some_detail);
25     }
26 };
27 
28 int main()
29 {}

第21,22行中std::defer_lock的意思是表明互斥量在结构上应该保持解锁状态。这样,就可以被后面调用lock()函数的 std::unique_lock 对象(不是互斥量)所获取.

 使用std::unique_lock进行灵活锁定并延迟初始化的例子如下:

 1 #include <memory>
 2 #include <mutex>
 3 
 4 struct some_resource
 5 {
 6     void do_something()
 7     {}
 8     
 9 };
10 
11 
12 std::shared_ptr<some_resource> resource_ptr;
13 std::mutex resource_mutex;
14 void foo()
15 {
16     std::unique_lock<std::mutex> lk(resource_mutex);
17     if(!resource_ptr)
18     {
19         resource_ptr.reset(new some_resource);
20     }
21     lk.unlock();
22     resource_ptr->do_something();
23 }
24 
25 int main()
26 {
27     foo();
28 }

例子中第12行代表某种共享的昂贵初始化资源,我们可以在真正使用到这个被变量的时候,使用std::unique_lock对共享变量进行保护 16行,在检验对象没有初始化17行的时候,动态初始化该资源,并在初始化完成后立刻解锁 21行。后面的22行就可以实现并发处理了。

注意:此案例仅用于说明std::unique_lock的灵活解锁模式,实际上不建议这么使用来作延迟初始化。更好的方式见后面条款6 std::once_flag与std::call_once.

 

5.死锁

线程在占有了一个资源时,还需要另外的资源才能完成一个操作。而两个以上的线程,互相在等待着别的线程占有的资源,整个系统陷入停顿状态。死锁产生的原因很多:
a). 可能发生在多线程中每个线程需要锁定两个及以上的互斥元的情况,所以叫死锁。

b). 可能与mutex、lock无关。比如,两个线程互相join对方,等待对方终止,而自己不会先终止,这就会陷入死锁。

 作者在书中给出了避免死锁的方法有:
a) 同时锁定std::lock(mutex,mutex),

b) 避免重复加锁(如果实在有这种需求,可以使用std::recursive_mutex互斥量,或重新设计接口).

c) 多线程之间以固定顺序获取锁,

d) 使用层次锁,在锁建立的时候,为每种锁确定一个层次值,这个值到后面会允许,低的层次可以在高层次的基础上继续上锁,但反之不行。同时同层的不能再锁。

并且作者在书中给出了一个层次锁的代码案例:

 1 #include <mutex>
 2 #include <stdexcept>
 3 
 4 class hierarchical_mutex
 5 {
 6     std::mutex internal_mutex;
 7     unsigned long const hierarchy_value;
 8     unsigned long previous_hierarchy_value;
 9     static thread_local unsigned long this_thread_hierarchy_value;
10 
11     void check_for_hierarchy_violation()
12     {
13         if(this_thread_hierarchy_value <= hierarchy_value)
14         {
15             throw std::logic_error("mutex hierarchy violated");
16         }
17     }
18     void update_hierarchy_value()
19     {
20         previous_hierarchy_value=this_thread_hierarchy_value;
21         this_thread_hierarchy_value=hierarchy_value;
22     }
23 public:
24     explicit hierarchical_mutex(unsigned long value):
25         hierarchy_value(value),
26         previous_hierarchy_value(0)
27     {}
28     void lock()
29     {
30         check_for_hierarchy_violation();
31         internal_mutex.lock();
32         update_hierarchy_value();
33     }
34     void unlock()
35     {
36         this_thread_hierarchy_value=previous_hierarchy_value;
37         internal_mutex.unlock();
38     }
39     bool try_lock()
40     {
41         check_for_hierarchy_violation();
42         if(!internal_mutex.try_lock())
43             return false;
44         update_hierarchy_value();
45         return true;
46     }
47 };
48 thread_local unsigned long
49     hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX); 

说明如下:

a) 使用了thread_local的值来代表当前线程的层级值:this_thread_hierarchy_value。它被初始话为最大值49行,所以最初所有线程都能被锁住。因为其声明中有thread_local,所以每个线程都有其拷贝副本,这样在线程中变量的状态就完全独立了,当从另一个线程进行读取时,变量的状态也是完全独立的。

b) 第一次线程锁住一个hierarchical_mutex时,this_thread_hierarchy_value的值是ULONG_MAX。由于其本身的性质,这个值会大于其他任何值,所以会通过check_for_hierarchy_vilation() 13行的检查。在这种检查方式下,lock()代表内部互斥锁已被锁住31行。一旦成功锁住,你可以更新层级值了32行。

c) 当你现在锁住另一个hierarchical_mutex时,还持有第一个锁,this_thread_hierarchy_value的值将会显示第一个互斥量的层级值。第二个互斥量的层级值必须小于已经持有互斥量检查函数2才能通过。
d) 当前线程存储之前的层级值,所以你可以调用unlock() 36行对层级值进行保存;否则,你就锁不住任何互斥量(第二个互斥量的层级数高于第一个互斥量),即使线程没有持有任何锁。因为你保存了之前的层级值,只有当你持有internal_mutex 20行,并且在解锁内部互斥量 36行之前存储它的层级值,你才能安全的将hierarchical_mutex自身进行存储。这是因为hierarchical_mutex被内部互斥量的锁所保护着。
e) try_lock()与lock()的功能相似,除非在调用internal_mutex的try_lock() 42行失败时,然后你就不能持有对应锁了,所以不必更新层级值,并直接返回false就好。


虽然是运行时检测,但是它至少没有时间依赖性——不必去等待那些导致死锁出现的罕见条件。同时,设计过程需要去拆分应用,互斥量在这样的情况下可以帮助消除很多可能导致死锁的情况。

 

6. std:once_flag 与std::call_once

使用std::once_flag与std::call_once一起配合来完成延迟初始化的过程。

在《C++并发编程实战》这本书中,作者狠狠批评了大家在使用多线程开发中经常使用的一个双重锁定的代码写法,按照我的印象,好像很多单例模式都是这么写的吧,哈哈!!作者对这种写法使用的形容词是“声名狼藉”!!!直接上案例代码片段:

 1 struct some_resource
 2 {
 3     void do_something()
 4     {
 5     }
 6 };
 7 
 8 std::shared_ptr<some_resource> resource_ptr;
 9 std::mutex resource_mutex;
10 void undefined_behaviour_with_double_checked_locking()
11 {
12     if (!resource_ptr) // 1
13     {
14         std::lock_guard<std::mutex> lk(resource_mutex);
15         if (!resource_ptr) // 2
16         {
17             resource_ptr.reset(new some_resource); // 3
18         }
19     }
20     resource_ptr->do_something();
21     // 4
22 }

 在此案例中,第12行检查共享变量resource_ptr智能指针是否初始化,如果没有初始化,则14行构造出一个std:lock_gard锁住互斥量,并在15行对共享变量进行再次,在共享变量被保护的情况下,如果共享变量依然没有被初始化,则在17行完成对共享资源的初始化操作。看起来好像是对的,但是内里玄机暗藏。解释说明如下:

因为外部的读取锁12行没有与内部的写入锁进行同步17行。因此就会产生条件竞争,这个条件竞争不仅覆盖指针本身,还会影响到其指向的对象;即使一个线程知道另一个线程完成对指针进行写入,它可能没有看到新创建的some_resource实例,然后调用do_something() 20行后,得到不正确的结果。这个例子是在一种典型的条件竞争——数据竞争,C++标准中这就会被指定为“未定义行为”(underfined behavior)。这种竞争肯定是可以避免的。

 

解决办法为使用std::once_flag和std::once_call,直接上代码:

 1 #include <mutex>
 2 
 3 struct connection_info
 4 {};
 5 
 6 struct data_packet
 7 {};
 8 
 9 struct connection_handle
10 {
11     void send_data(data_packet const&)
12     {}
13     data_packet receive_data()
14     {
15         return data_packet();
16     }
17 };
18 
19 struct remote_connection_manager
20 {
21     connection_handle open(connection_info const&)
22     {
23         return connection_handle();
24     }
25 } connection_manager;
26 
27 
28 class X
29 {
30 private:
31     connection_info connection_details;
32     connection_handle connection;
33     std::once_flag connection_init_flag;
34 
35     void open_connection()
36     {
37         connection=connection_manager.open(connection_details);
38     }
39 public:
40     X(connection_info const& connection_details_):
41         connection_details(connection_details_)
42     {}
43     void send_data(data_packet const& data)
44     {
45         std::call_once(connection_init_flag,&X::open_connection,this);
46         connection.send_data(data);
47     }
48     data_packet receive_data()
49     {
50         std::call_once(connection_init_flag,&X::open_connection,this);
51         return connection.receive_data();
52     }
53 };

代码描述的是一个数据访问的例子,我们以数据库访问为例吧,类X封装了数据库访问逻辑,其中的connection_handle connection 创建是比较昂贵的资源。这里使用了std::once_flag connection_init_flag来标识connection是否已经初始化。并在send_data,receive_data

中需要和数据库进行通讯操作时才进行具体的初始化,配对std::call_once来调用具体的初始化代码,并在成功后设置std::once_flag。每个线程只需要使用 std::call_once ,在 std::call_once 的结束时,就能安全的知道指针已经被其他的线程初始化了。使用 std::call_once 比显式使用互斥量消耗的资源更少,特别是当初始化完成后。

 

当然作者也提出了一个延迟加载的替代方案,具体说明请参见Effective C++ 3nd.

class my_class;
my_class& get_my_class_instance()
{
static my_class instance; // 线程安全的初始化过程
return instance;
}

 

 

7. 接口设计导致的竞争条件

 先来看一个存在条件竞争的stack的接口设计

 1 #include <deque>
 2 template<typename T,typename Container=std::deque<T> >
 3 class stack
 4 {
 5 public:
 6     explicit stack(const Container&);
 7     explicit stack(Container&& = Container());
 8     template <class Alloc> explicit stack(const Alloc&);
 9     template <class Alloc> stack(const Container&, const Alloc&);
10     template <class Alloc> stack(Container&&, const Alloc&);
11     template <class Alloc> stack(stack&&, const Alloc&);
12 
13     bool empty() const;
14     size_t size() const;
15     T& top();
16     T const& top() const;
17     void push(T const&);
18     void push(T&&);
19     void pop();
20     void swap(stack&&);
21 };

在这个stack的接口中,是存在竞争条件存在的。 主要发生在size(),empty(),top()和pop()接口中。

1. 在多线程情况下,即使使用了mutex对stack进行保护,在empty()和size()函数调用完成后,另外的线程可能进行了push()或pop()操作,这个就会导致empty(),size()调用返回的值只能保证在返回那一刻是正确的,后续就不对了。

2. 从stack弹出一个值的操作分为两步进行,即先调用top获取栈顶元素,再调用pop删除元素。用于是采用了两个api来操作的,这个可能会破环中间的不可变态。

解决办法是从接口层面进行重新设计,将size() api去掉,并将top和pop 进行合并.

 1 #include <exception>
 2 #include <stack>
 3 #include <mutex>
 4 #include <memory>
 5 
 6 struct empty_stack: std::exception
 7 {
 8     const char* what() const throw()
 9     {
10         return "empty stack";
11     }
12     
13 };
14 
15 template<typename T>
16 class threadsafe_stack
17 {
18 private:
19     std::stack<T> data;
20     mutable std::mutex m;
21 public:
22     threadsafe_stack(){}
23     threadsafe_stack(const threadsafe_stack& other)
24     {
25         std::lock_guard<std::mutex> lock(other.m);
26         data=other.data;
27     }
28     threadsafe_stack& operator=(const threadsafe_stack&) = delete;
29 
30     void push(T new_value)
31     {
32         std::lock_guard<std::mutex> lock(m);
33         data.push(new_value);
34     }
35     std::shared_ptr<T> pop()
36     {
37         std::lock_guard<std::mutex> lock(m);
38         if(data.empty()) throw empty_stack();
39         std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
40         data.pop();
41         return res;
42     }
43     void pop(T& value)
44     {
45         std::lock_guard<std::mutex> lock(m);
46         if(data.empty()) throw empty_stack();
47         value=data.top();
48         data.pop();
49     }
50     bool empty() const
51     {
52         std::lock_guard<std::mutex> lock(m);
53         return data.empty();
54     }
55 };

 

C++并发编程 02 数据共享

标签:已经锁定   避免   ssi   lis   数据同步   release   lse   读取   保护   

原文地址:https://www.cnblogs.com/lenmom/p/10161092.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!