在线观看不卡亚洲电影_亚洲妓女99综合网_91青青青亚洲娱乐在线观看_日韩无码高清综合久久

鍍金池/ 教程/ C/ 4.1 等待一個事件或其他條件
3.4 本章總結(jié)
6.3 基于鎖設計更加復雜的數(shù)據(jù)結(jié)構
6.1 為并發(fā)設計的意義何在?
5.2 <code>C++</code>中的原子操作和原子類型
A.7 自動推導變量類型
2.1 線程管理的基礎
8.5 在實踐中設計并發(fā)代碼
2.4 運行時決定線程數(shù)量
2.2 向線程函數(shù)傳遞參數(shù)
第4章 同步并發(fā)操作
2.3 轉(zhuǎn)移線程所有權
8.3 為多線程性能設計數(shù)據(jù)結(jié)構
6.4 本章總結(jié)
7.3 對于設計無鎖數(shù)據(jù)結(jié)構的指導建議
關于這本書
A.1 右值引用
2.6 本章總結(jié)
D.2 &lt;condition_variable&gt;頭文件
A.6 變參模板
6.2 基于鎖的并發(fā)數(shù)據(jù)結(jié)構
4.5 本章總結(jié)
A.9 本章總結(jié)
前言
第10章 多線程程序的測試和調(diào)試
5.4 本章總結(jié)
第9章 高級線程管理
5.1 內(nèi)存模型基礎
2.5 識別線程
第1章 你好,C++的并發(fā)世界!
1.2 為什么使用并發(fā)?
A.5 Lambda函數(shù)
第2章 線程管理
4.3 限定等待時間
D.3 &lt;atomic&gt;頭文件
10.2 定位并發(fā)錯誤的技術
附錄B 并發(fā)庫的簡單比較
5.3 同步操作和強制排序
A.8 線程本地變量
第8章 并發(fā)代碼設計
3.3 保護共享數(shù)據(jù)的替代設施
附錄D C++線程庫參考
第7章 無鎖并發(fā)數(shù)據(jù)結(jié)構設計
D.7 &lt;thread&gt;頭文件
D.1 &lt;chrono&gt;頭文件
4.1 等待一個事件或其他條件
A.3 默認函數(shù)
附錄A 對<code>C++</code>11語言特性的簡要介紹
第6章 基于鎖的并發(fā)數(shù)據(jù)結(jié)構設計
封面圖片介紹
7.2 無鎖數(shù)據(jù)結(jié)構的例子
8.6 本章總結(jié)
8.1 線程間劃分工作的技術
4.2 使用期望等待一次性事件
8.4 設計并發(fā)代碼的注意事項
D.5 &lt;mutex&gt;頭文件
3.1 共享數(shù)據(jù)帶來的問題
資源
9.3 本章總結(jié)
10.3 本章總結(jié)
10.1 與并發(fā)相關的錯誤類型
D.4 &lt;future&gt;頭文件
3.2 使用互斥量保護共享數(shù)據(jù)
9.1 線程池
1.1 何謂并發(fā)
9.2 中斷線程
4.4 使用同步操作簡化代碼
A.2 刪除函數(shù)
1.3 C++中的并發(fā)和多線程
1.4 開始入門
第5章 C++內(nèi)存模型和原子類型操作
消息傳遞框架與完整的ATM示例
8.2 影響并發(fā)代碼性能的因素
7.1 定義和意義
D.6 &lt;ratio&gt;頭文件
A.4 常量表達式函數(shù)
7.4 本章總結(jié)
1.5 本章總結(jié)
第3章 線程間共享數(shù)據(jù)

4.1 等待一個事件或其他條件

假設你在旅游,而且正在一輛在夜間運行的火車上。在夜間,如何在正確的站點下車呢?一種方法是整晚都要醒著,然后注意到了哪一站。這樣,你就不會錯過你要到達的站點,但是這樣會讓你感到很疲倦。另外,你可以看一下時間表,估計一下火車到達目的地的時間,然后在一個稍早的時間點上設置鬧鈴,然后你就可以安心的睡會了。這個方法聽起來也很不錯,也沒有錯過你要下車的站點,但是當火車晚點的時候,你就要被過早的叫醒了。當然,鬧鐘的電池也可能會沒電了,并導致你睡過站。理想的方式是,無論是早或晚,只要當火車到站的時候,有人或其他東西能把你喚醒,就好了。

這和線程有什么關系呢?好吧,讓我們來聯(lián)系一下。當一個線程等待另一個線程完成任務時,它會有很多選擇。第一,它可以持續(xù)的檢查共享數(shù)據(jù)標志(用于做保護工作的互斥量),直到另一線程完成工作時對這個標志進行重設。不過,就是一種浪費:線程消耗寶貴的執(zhí)行時間持續(xù)的檢查對應標志,并且當互斥量被等待線程上鎖后,其他線程就沒有辦法獲取鎖,這樣線程就會持續(xù)等待。因為以上方式對等待線程限制資源,并且在完成時阻礙對標識的設置。這種情況類似與,保持清醒狀態(tài)和列車駕駛員聊了一晚上:駕駛員不得不緩慢駕駛,因為你分散了他的注意力,所以火車需要更長的時間,才能到站。同樣的,等待的線程會等待更長的時間,這些線程也在消耗著系統(tǒng)資源。

第二個選擇是在等待線程在檢查間隙,使用std::this_thread::sleep_for()進行周期性的間歇(詳見4.3節(jié)):

bool flag;
std::mutex m;

void wait_for_flag()
{
  std::unique_lock<std::mutex> lk(m);
  while(!flag)
  {
    lk.unlock();  // 1 解鎖互斥量
    std::this_thread::sleep_for(std::chrono::milliseconds(100));  // 2 休眠100ms
    lk.lock();   // 3 再鎖互斥量
  }
}

這個循環(huán)中,在休眠前②,函數(shù)對互斥量進行解鎖①,并且在休眠結(jié)束后再對互斥量進行上鎖,所以另外的線程就有機會獲取鎖并設置標識。

這個實現(xiàn)就進步很多,因為當線程休眠時,線程沒有浪費執(zhí)行時間,但是很難確定正確的休眠時間。太短的休眠和沒有休眠一樣,都會浪費執(zhí)行時間;太長的休眠時間,可能會讓任務等待線程醒來。休眠時間過長是很少見的情況,因為這會直接影響到程序的行為,當在高節(jié)奏游戲中,它意味著丟幀,或在一個實時應用中超越了一個時間片。

第三個選擇(也是優(yōu)先的選擇)是,使用C++標準庫提供的工具去等待事件的發(fā)生。通過另一線程觸發(fā)等待事件的機制是最基本的喚醒方式(例如:流水線上存在額外的任務時),這種機制就稱為“條件變量”。從概念上來說,一個條件變量會與多個事件或其他條件相關,并且一個或多個線程會等待條件的達成。當某些線程被終止時,為了喚醒等待線程(允許等待線程繼續(xù)執(zhí)行)終止的線程將會向等待著的線程廣播“條件達成”的信息。

4.1.1 等待條件達成

C++標準庫對條件變量有兩套實現(xiàn):std::condition_variablestd::condition_variable_any。這兩個實現(xiàn)都包含在<condition_variable>頭文件的聲明中。兩者都需要與一個互斥量一起才能工作(互斥量是為了同步);前者僅限于與std::mutex一起工作,而后者可以和任何滿足最低標準的互斥量一起工作,從而加上了_any的后綴。因為std::condition_variable_any更加通用,這就可能從體積、性能,以及系統(tǒng)資源的使用方面產(chǎn)生額外的開銷,所以std::condition_variable一般作為首選的類型,當對靈活性有硬性要求時,我們才會去考慮std::condition_variable_any

所以,如何使用std::condition_variable去處理之前提到的情況——當有數(shù)據(jù)需要處理時,如何喚醒休眠中的線程對其進行處理?以下清單展示了一種使用條件變量做喚醒的方式。

清單4.1 使用std::condition_variable處理數(shù)據(jù)等待

std::mutex mut;
std::queue<data_chunk> data_queue;  // 1
std::condition_variable data_cond;

void data_preparation_thread()
{
  while(more_data_to_prepare())
  {
    data_chunk const data=prepare_data();
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(data);  // 2
    data_cond.notify_one();  // 3
  }
}

void data_processing_thread()
{
  while(true)
  {
    std::unique_lock<std::mutex> lk(mut);  // 4
    data_cond.wait(
         lk,[]{return !data_queue.empty();});  // 5
    data_chunk data=data_queue.front();
    data_queue.pop();
    lk.unlock();  // 6
    process(data);
    if(is_last_chunk(data))
      break;
  }
}

首先,你擁有一個用來在兩個線程之間傳遞數(shù)據(jù)的隊列①。當數(shù)據(jù)準備好時,使用std::lock_guard對隊列上鎖,將準備好的數(shù)據(jù)壓入隊列中②,之后線程會對隊列中的數(shù)據(jù)上鎖。然后調(diào)用std::condition_variable的notify_one()成員函數(shù),對等待的線程(如果有等待線程)進行通知③。

在另外一側(cè),你有一個正在處理數(shù)據(jù)的線程,這個線程首先對互斥量上鎖,但在這里std::unique_lock要比std::lock_guard④更加合適——且聽我細細道來。線程之后會調(diào)用std::condition_variable的成員函數(shù)wait(),傳遞一個鎖和一個lambda函數(shù)表達式(作為等待的條件⑤)。Lambda函數(shù)是C++11添加的新特性,它可以讓一個匿名函數(shù)作為其他表達式的一部分,并且非常合適作為標準函數(shù)的謂詞,例如wait()函數(shù)。在這個例子中,簡單的lambda函數(shù)[]{return !data_queue.empty();}會去檢查data_queue是否不為空,當data_queue不為空——那就意味著隊列中已經(jīng)準備好數(shù)據(jù)了。附錄A的A.5節(jié)有Lambda函數(shù)更多的信息。

wait()會去檢查這些條件(通過調(diào)用所提供的lambda函數(shù)),當條件滿足(lambda函數(shù)返回true)時返回。如果條件不滿足(lambda函數(shù)返回false),wait()函數(shù)將解鎖互斥量,并且將這個線程(上段提到的處理數(shù)據(jù)的線程)置于阻塞或等待狀態(tài)。當準備數(shù)據(jù)的線程調(diào)用notify_one()通知條件變量時,處理數(shù)據(jù)的線程從睡眠狀態(tài)中蘇醒,重新獲取互斥鎖,并且對條件再次檢查,在條件滿足的情況下,從wait()返回并繼續(xù)持有鎖。當條件不滿足時,線程將對互斥量解鎖,并且重新開始等待。這就是為什么用std::unique_lock而不使用std::lock_guard——等待中的線程必須在等待期間解鎖互斥量,并在這這之后對互斥量再次上鎖,而std::lock_guard沒有這么靈活。如果互斥量在線程休眠期間保持鎖住狀態(tài),準備數(shù)據(jù)的線程將無法鎖住互斥量,也無法添加數(shù)據(jù)到隊列中;同樣的,等待線程也永遠不會知道條件何時滿足。

清單4.1使用了一個簡單的lambda函數(shù)用于等待⑤,這個函數(shù)用于檢查隊列何時不為空,不過任意的函數(shù)和可調(diào)用對象都可以傳入wait()。當你已經(jīng)寫好了一個函數(shù)去做檢查條件(或許比清單中簡單檢查要復雜很多),那就可以直接將這個函數(shù)傳入wait();不一定非要放在一個lambda表達式中。在調(diào)用wait()的過程中,一個條件變量可能會去檢查給定條件若干次;然而,它總是在互斥量被鎖定時這樣做,當且僅當提供測試條件的函數(shù)返回true時,它就會立即返回。當?shù)却€程重新獲取互斥量并檢查條件時,如果它并非直接響應另一個線程的通知,這就是所謂的偽喚醒(spurious wakeup)。因為任何偽喚醒的數(shù)量和頻率都是不確定的,這里不建議使用一個有副作用的函數(shù)做條件檢查。當你這樣做了,就必須做好多次產(chǎn)生副作用的心理準備。

解鎖std::unique_lock的靈活性,不僅適用于對wait()的調(diào)用;它還可以用于有待處理但還未處理的數(shù)據(jù)⑥。處理數(shù)據(jù)可能是一個耗時的操作,并且如你在第3章見到的,你就知道持有鎖的時間過長是一個多么糟糕的主意。

使用隊列在多個線程中轉(zhuǎn)移數(shù)據(jù)(如清單4.1)是很常見的。做得好的話,同步操作可以限制在隊列本身,同步問題和條件競爭出現(xiàn)的概率也會降低。鑒于這些好處,現(xiàn)在從清單4.1中提取出一個通用線程安全的隊列。

4.1.2 使用條件變量構建線程安全隊列

當你正在設計一個通用隊列時,花一些時間想想有哪些操作需要添加到隊列實現(xiàn)中去,就如之前在3.2.3節(jié)看到的線程安全的棧??梢钥匆幌翪++標準庫提供的實現(xiàn),找找靈感;std::queue<>容器的接口展示如下:

清單4.2 std::queue接口

template <class T, class Container = std::deque<T> >
class queue {
public:
  explicit queue(const Container&);
  explicit queue(Container&& = Container());
  template <class Alloc> explicit queue(const Alloc&);
  template <class Alloc> queue(const Container&, const Alloc&);
  template <class Alloc> queue(Container&&, const Alloc&);
  template <class Alloc> queue(queue&&, const Alloc&);

  void swap(queue& q);

  bool empty() const;
  size_type size() const;

  T& front();
  const T& front() const;
  T& back();
  const T& back() const;

  void push(const T& x);
  void push(T&& x);
  void pop();
  template <class... Args> void emplace(Args&&... args);
};

當你忽略構造、賦值以及交換操作時,你就剩下了三組操作:1. 對整個隊列的狀態(tài)進行查詢(empty()和size());2.查詢在隊列中的各個元素(front()和back());3.修改隊列的操作(push(), pop()和emplace())。這就和3.2.3中的棧一樣了,因此你也會遇到在固有接口上的條件競爭。因此,你需要將front()和pop()合并成一個函數(shù)調(diào)用,就像之前在棧實現(xiàn)時合并top()和pop()一樣。與清單4.1中的代碼不同的是:當使用隊列在多個線程中傳遞數(shù)據(jù)時,接收線程通常需要等待數(shù)據(jù)的壓入。這里我們提供pop()函數(shù)的兩個變種:try_pop()和wait_and_pop()。try_pop() ,嘗試從隊列中彈出數(shù)據(jù),總會直接返回(當有失敗時),即使沒有值可檢索;wait_and_pop(),將會等待有值可檢索的時候才返回。當你使用之前棧的方式來實現(xiàn)你的隊列,你實現(xiàn)的隊列接口就可能會是下面這樣:

清單4.3 線程安全隊列的接口

#include <memory> // 為了使用std::shared_ptr

template<typename T>
class threadsafe_queue
{
public:
  threadsafe_queue();
  threadsafe_queue(const threadsafe_queue&);
  threadsafe_queue& operator=(
      const threadsafe_queue&) = delete;  // 不允許簡單的賦值

  void push(T new_value);

  bool try_pop(T& value);  // 1
  std::shared_ptr<T> try_pop();  // 2

  void wait_and_pop(T& value);
  std::shared_ptr<T> wait_and_pop();

  bool empty() const;
};

就像之前對棧做的那樣,在這里你將很多構造函數(shù)剪掉了,并且禁止了對隊列的簡單賦值。和之前一樣,你也需要提供兩個版本的try_pop()和wait_for_pop()。第一個重載的try_pop()①在引用變量中存儲著檢索值,所以它可以用來返回隊列中值的狀態(tài);當檢索到一個變量時,他將返回true,否則將返回false(詳見A.2節(jié))。第二個重載②就不能做這樣了,因為它是用來直接返回檢索值的。當沒有值可檢索時,這個函數(shù)可以返回NULL指針。

那么問題來了,如何將以上這些和清單4.1中的代碼相關聯(lián)呢?好吧,我們現(xiàn)在就來看看怎么去關聯(lián)。你可以從之前的代碼中提取push()和wait_and_pop(),如以下清單所示。

清單4.4 從清單4.1中提取push()和wait_and_pop()

#include <queue>
#include <mutex>
#include <condition_variable>

template<typename T>
class threadsafe_queue
{
private:
  std::mutex mut;
  std::queue<T> data_queue;
  std::condition_variable data_cond;
public:
  void push(T new_value)
  {
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(new_value);
    data_cond.notify_one();
  }

  void wait_and_pop(T& value)
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk,[this]{return !data_queue.empty();});
    value=data_queue.front();
    data_queue.pop();
  }
};
threadsafe_queue<data_chunk> data_queue;  // 1

void data_preparation_thread()
{
  while(more_data_to_prepare())
  {
    data_chunk const data=prepare_data();
    data_queue.push(data);  // 2
  }
}

void data_processing_thread()
{
  while(true)
  {
    data_chunk data;
    data_queue.wait_and_pop(data);  // 3
    process(data);
    if(is_last_chunk(data))
      break;
  }
}

線程隊列的實例中包含有互斥量和條件變量,所以獨立的變量就不需要了①,并且調(diào)用push()也不需要外部同步②。當然,wait_and_pop()還要兼顧條件變量的等待③。

另一個wait_and_pop()函數(shù)的重載寫起來就很瑣碎了,剩下的函數(shù)就像從清單3.5實現(xiàn)的棧中一個個的粘過來一樣。最終的隊列實現(xiàn)如下所示。

清單4.5 使用條件變量的線程安全隊列(完整版)

#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>

template<typename T>
class threadsafe_queue
{
private:
  mutable std::mutex mut;  // 1 互斥量必須是可變的 
  std::queue<T> data_queue;
  std::condition_variable data_cond;
public:
  threadsafe_queue()
  {}
  threadsafe_queue(threadsafe_queue const& other)
  {
    std::lock_guard<std::mutex> lk(other.mut);
    data_queue=other.data_queue;
  }

  void push(T new_value)
  {
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(new_value);
    data_cond.notify_one();
  }

  void wait_and_pop(T& value)
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk,[this]{return !data_queue.empty();});
    value=data_queue.front();
    data_queue.pop();
  }

  std::shared_ptr<T> wait_and_pop()
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk,[this]{return !data_queue.empty();});
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
    data_queue.pop();
    return res;
  }

  bool try_pop(T& value)
  {
    std::lock_guard<std::mutex> lk(mut);
    if(data_queue.empty())
      return false;
    value=data_queue.front();
    data_queue.pop();
    return true;
  }

  std::shared_ptr<T> try_pop()
  {
    std::lock_guard<std::mutex> lk(mut);
    if(data_queue.empty())
      return std::shared_ptr<T>();
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
    data_queue.pop();
    return res;
  }

  bool empty() const
  {
    std::lock_guard<std::mutex> lk(mut);
    return data_queue.empty();
  }
};

empty()是一個const成員函數(shù),并且傳入拷貝構造函數(shù)的other形參是一個const引用;因為其他線程可能有這個類型的非const引用對象,并調(diào)用變種成員函數(shù),所以這里有必要對互斥量上鎖。如果鎖住互斥量是一個可變操作,那么這個互斥量對象就會標記為可變的①,之后他就可以在empty()和拷貝構造函數(shù)中上鎖了。

條件變量在多個線程等待同一個事件時,也是很有用的。當線程用來分解工作負載,并且只有一個線程可以對通知做出反應,與清單4.1中使用的結(jié)構完全相同;運行多個數(shù)據(jù)實例——處理線程(processing thread)。當新的數(shù)據(jù)準備完成,調(diào)用notify_one()將會觸發(fā)一個正在執(zhí)行wait()的線程,去檢查條件和wait()函數(shù)的返回狀態(tài)(因為你僅是向data_queue添加一個數(shù)據(jù)項)。 這里不保證線程一定會被通知到,即使只有一個等待線程被通知時,所有處線程也有可能都在處理數(shù)據(jù)。

另一種可能是,很多線程等待同一事件,對于通知他們都需要做出回應。這會發(fā)生在共享數(shù)據(jù)正在初始化的時候,當處理線程可以使用同一數(shù)據(jù)時,就要等待數(shù)據(jù)被初始化(有不錯的機制可用來應對;可見第3章,3.3.1節(jié)),或等待共享數(shù)據(jù)的更新,比如,定期重新初始化(periodic reinitialization)。在這些情況下,準備線程準備數(shù)據(jù)數(shù)據(jù)時,就會通過條件變量調(diào)用notify_all()成員函數(shù),而非直接調(diào)用notify_one()函數(shù)。顧名思義,這就是全部線程在都去執(zhí)行wait()(檢查他們等待的條件是否滿足)的原因。

當?shù)却€程只等待一次,當條件為true時,它就不會再等待條件變量了,所以一個條件變量可能并非同步機制的最好選擇。尤其是,條件在等待一組可用的數(shù)據(jù)塊時。在這樣的情況下,期望(future)就是一個適合的選擇。