假設你在旅游,而且正在一輛在夜間運行的火車上。在夜間,如何在正確的站點下車呢?一種方法是整晚都要醒著,然后注意到了哪一站。這樣,你就不會錯過你要到達的站點,但是這樣會讓你感到很疲倦。另外,你可以看一下時間表,估計一下火車到達目的地的時間,然后在一個稍早的時間點上設置鬧鈴,然后你就可以安心的睡會了。這個方法聽起來也很不錯,也沒有錯過你要下車的站點,但是當火車晚點的時候,你就要被過早的叫醒了。當然,鬧鐘的電池也可能會沒電了,并導致你睡過站。理想的方式是,無論是早或晚,只要當火車到站的時候,有人或其他東西能把你喚醒,就好了。
這和線程有什么關系呢?好吧,讓我們來聯(lián)系一下。當一個線程等待另一個線程完成任務時,它會有很多選擇。第一,它可以持續(xù)的檢查共享數(shù)據(jù)標志(用于做保護工作的互斥量),直到另一線程完成工作時對這個標志進行重設。不過,就是一種浪費:線程消耗寶貴的執(zhí)行時間持續(xù)的檢查對應標志,并且當互斥量被等待線程上鎖后,其他線程就沒有辦法獲取鎖,這樣線程就會持續(xù)等待。因為以上方式對等待線程限制資源,并且在完成時阻礙對標識的設置。這種情況類似與,保持清醒狀態(tài)和列車駕駛員聊了一晚上:駕駛員不得不緩慢駕駛,因為你分散了他的注意力,所以火車需要更長的時間,才能到站。同樣的,等待的線程會等待更長的時間,這些線程也在消耗著系統(tǒng)資源。
第二個選擇是在等待線程在檢查間隙,使用std::this_thread::sleep_for()進行周期性的間歇(詳見4.3節(jié)):
bool flag;
std::mutex m;
void wait_for_flag()
{
std::unique_lock<std::mutex> lk(m);
while(!flag)
{
lk.unlock(); // 1 解鎖互斥量
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 2 休眠100ms
lk.lock(); // 3 再鎖互斥量
}
}
這個循環(huán)中,在休眠前②,函數(shù)對互斥量進行解鎖①,并且在休眠結(jié)束后再對互斥量進行上鎖,所以另外的線程就有機會獲取鎖并設置標識。
這個實現(xiàn)就進步很多,因為當線程休眠時,線程沒有浪費執(zhí)行時間,但是很難確定正確的休眠時間。太短的休眠和沒有休眠一樣,都會浪費執(zhí)行時間;太長的休眠時間,可能會讓任務等待線程醒來。休眠時間過長是很少見的情況,因為這會直接影響到程序的行為,當在高節(jié)奏游戲中,它意味著丟幀,或在一個實時應用中超越了一個時間片。
第三個選擇(也是優(yōu)先的選擇)是,使用C++標準庫提供的工具去等待事件的發(fā)生。通過另一線程觸發(fā)等待事件的機制是最基本的喚醒方式(例如:流水線上存在額外的任務時),這種機制就稱為“條件變量”。從概念上來說,一個條件變量會與多個事件或其他條件相關,并且一個或多個線程會等待條件的達成。當某些線程被終止時,為了喚醒等待線程(允許等待線程繼續(xù)執(zhí)行)終止的線程將會向等待著的線程廣播“條件達成”的信息。
C++標準庫對條件變量有兩套實現(xiàn):std::condition_variable和std::condition_variable_any。這兩個實現(xiàn)都包含在<condition_variable>頭文件的聲明中。兩者都需要與一個互斥量一起才能工作(互斥量是為了同步);前者僅限于與std::mutex一起工作,而后者可以和任何滿足最低標準的互斥量一起工作,從而加上了_any的后綴。因為std::condition_variable_any更加通用,這就可能從體積、性能,以及系統(tǒng)資源的使用方面產(chǎn)生額外的開銷,所以std::condition_variable一般作為首選的類型,當對靈活性有硬性要求時,我們才會去考慮std::condition_variable_any。
所以,如何使用std::condition_variable去處理之前提到的情況——當有數(shù)據(jù)需要處理時,如何喚醒休眠中的線程對其進行處理?以下清單展示了一種使用條件變量做喚醒的方式。
清單4.1 使用std::condition_variable處理數(shù)據(jù)等待
std::mutex mut;
std::queue<data_chunk> data_queue; // 1
std::condition_variable data_cond;
void data_preparation_thread()
{
while(more_data_to_prepare())
{
data_chunk const data=prepare_data();
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data); // 2
data_cond.notify_one(); // 3
}
}
void data_processing_thread()
{
while(true)
{
std::unique_lock<std::mutex> lk(mut); // 4
data_cond.wait(
lk,[]{return !data_queue.empty();}); // 5
data_chunk data=data_queue.front();
data_queue.pop();
lk.unlock(); // 6
process(data);
if(is_last_chunk(data))
break;
}
}
首先,你擁有一個用來在兩個線程之間傳遞數(shù)據(jù)的隊列①。當數(shù)據(jù)準備好時,使用std::lock_guard對隊列上鎖,將準備好的數(shù)據(jù)壓入隊列中②,之后線程會對隊列中的數(shù)據(jù)上鎖。然后調(diào)用std::condition_variable的notify_one()成員函數(shù),對等待的線程(如果有等待線程)進行通知③。
在另外一側(cè),你有一個正在處理數(shù)據(jù)的線程,這個線程首先對互斥量上鎖,但在這里std::unique_lock要比std::lock_guard④更加合適——且聽我細細道來。線程之后會調(diào)用std::condition_variable的成員函數(shù)wait(),傳遞一個鎖和一個lambda函數(shù)表達式(作為等待的條件⑤)。Lambda函數(shù)是C++11添加的新特性,它可以讓一個匿名函數(shù)作為其他表達式的一部分,并且非常合適作為標準函數(shù)的謂詞,例如wait()函數(shù)。在這個例子中,簡單的lambda函數(shù)[]{return !data_queue.empty();}會去檢查data_queue是否不為空,當data_queue不為空——那就意味著隊列中已經(jīng)準備好數(shù)據(jù)了。附錄A的A.5節(jié)有Lambda函數(shù)更多的信息。
wait()會去檢查這些條件(通過調(diào)用所提供的lambda函數(shù)),當條件滿足(lambda函數(shù)返回true)時返回。如果條件不滿足(lambda函數(shù)返回false),wait()函數(shù)將解鎖互斥量,并且將這個線程(上段提到的處理數(shù)據(jù)的線程)置于阻塞或等待狀態(tài)。當準備數(shù)據(jù)的線程調(diào)用notify_one()通知條件變量時,處理數(shù)據(jù)的線程從睡眠狀態(tài)中蘇醒,重新獲取互斥鎖,并且對條件再次檢查,在條件滿足的情況下,從wait()返回并繼續(xù)持有鎖。當條件不滿足時,線程將對互斥量解鎖,并且重新開始等待。這就是為什么用std::unique_lock而不使用std::lock_guard——等待中的線程必須在等待期間解鎖互斥量,并在這這之后對互斥量再次上鎖,而std::lock_guard沒有這么靈活。如果互斥量在線程休眠期間保持鎖住狀態(tài),準備數(shù)據(jù)的線程將無法鎖住互斥量,也無法添加數(shù)據(jù)到隊列中;同樣的,等待線程也永遠不會知道條件何時滿足。
清單4.1使用了一個簡單的lambda函數(shù)用于等待⑤,這個函數(shù)用于檢查隊列何時不為空,不過任意的函數(shù)和可調(diào)用對象都可以傳入wait()。當你已經(jīng)寫好了一個函數(shù)去做檢查條件(或許比清單中簡單檢查要復雜很多),那就可以直接將這個函數(shù)傳入wait();不一定非要放在一個lambda表達式中。在調(diào)用wait()的過程中,一個條件變量可能會去檢查給定條件若干次;然而,它總是在互斥量被鎖定時這樣做,當且僅當提供測試條件的函數(shù)返回true時,它就會立即返回。當?shù)却€程重新獲取互斥量并檢查條件時,如果它并非直接響應另一個線程的通知,這就是所謂的偽喚醒(spurious wakeup)。因為任何偽喚醒的數(shù)量和頻率都是不確定的,這里不建議使用一個有副作用的函數(shù)做條件檢查。當你這樣做了,就必須做好多次產(chǎn)生副作用的心理準備。
解鎖std::unique_lock的靈活性,不僅適用于對wait()的調(diào)用;它還可以用于有待處理但還未處理的數(shù)據(jù)⑥。處理數(shù)據(jù)可能是一個耗時的操作,并且如你在第3章見到的,你就知道持有鎖的時間過長是一個多么糟糕的主意。
使用隊列在多個線程中轉(zhuǎn)移數(shù)據(jù)(如清單4.1)是很常見的。做得好的話,同步操作可以限制在隊列本身,同步問題和條件競爭出現(xiàn)的概率也會降低。鑒于這些好處,現(xiàn)在從清單4.1中提取出一個通用線程安全的隊列。
當你正在設計一個通用隊列時,花一些時間想想有哪些操作需要添加到隊列實現(xiàn)中去,就如之前在3.2.3節(jié)看到的線程安全的棧??梢钥匆幌翪++標準庫提供的實現(xiàn),找找靈感;std::queue<>容器的接口展示如下:
清單4.2 std::queue接口
template <class T, class Container = std::deque<T> >
class queue {
public:
explicit queue(const Container&);
explicit queue(Container&& = Container());
template <class Alloc> explicit queue(const Alloc&);
template <class Alloc> queue(const Container&, const Alloc&);
template <class Alloc> queue(Container&&, const Alloc&);
template <class Alloc> queue(queue&&, const Alloc&);
void swap(queue& q);
bool empty() const;
size_type size() const;
T& front();
const T& front() const;
T& back();
const T& back() const;
void push(const T& x);
void push(T&& x);
void pop();
template <class... Args> void emplace(Args&&... args);
};
當你忽略構造、賦值以及交換操作時,你就剩下了三組操作:1. 對整個隊列的狀態(tài)進行查詢(empty()和size());2.查詢在隊列中的各個元素(front()和back());3.修改隊列的操作(push(), pop()和emplace())。這就和3.2.3中的棧一樣了,因此你也會遇到在固有接口上的條件競爭。因此,你需要將front()和pop()合并成一個函數(shù)調(diào)用,就像之前在棧實現(xiàn)時合并top()和pop()一樣。與清單4.1中的代碼不同的是:當使用隊列在多個線程中傳遞數(shù)據(jù)時,接收線程通常需要等待數(shù)據(jù)的壓入。這里我們提供pop()函數(shù)的兩個變種:try_pop()和wait_and_pop()。try_pop() ,嘗試從隊列中彈出數(shù)據(jù),總會直接返回(當有失敗時),即使沒有值可檢索;wait_and_pop(),將會等待有值可檢索的時候才返回。當你使用之前棧的方式來實現(xiàn)你的隊列,你實現(xiàn)的隊列接口就可能會是下面這樣:
清單4.3 線程安全隊列的接口
#include <memory> // 為了使用std::shared_ptr
template<typename T>
class threadsafe_queue
{
public:
threadsafe_queue();
threadsafe_queue(const threadsafe_queue&);
threadsafe_queue& operator=(
const threadsafe_queue&) = delete; // 不允許簡單的賦值
void push(T new_value);
bool try_pop(T& value); // 1
std::shared_ptr<T> try_pop(); // 2
void wait_and_pop(T& value);
std::shared_ptr<T> wait_and_pop();
bool empty() const;
};
就像之前對棧做的那樣,在這里你將很多構造函數(shù)剪掉了,并且禁止了對隊列的簡單賦值。和之前一樣,你也需要提供兩個版本的try_pop()和wait_for_pop()。第一個重載的try_pop()①在引用變量中存儲著檢索值,所以它可以用來返回隊列中值的狀態(tài);當檢索到一個變量時,他將返回true,否則將返回false(詳見A.2節(jié))。第二個重載②就不能做這樣了,因為它是用來直接返回檢索值的。當沒有值可檢索時,這個函數(shù)可以返回NULL指針。
那么問題來了,如何將以上這些和清單4.1中的代碼相關聯(lián)呢?好吧,我們現(xiàn)在就來看看怎么去關聯(lián)。你可以從之前的代碼中提取push()和wait_and_pop(),如以下清單所示。
清單4.4 從清單4.1中提取push()和wait_and_pop()
#include <queue>
#include <mutex>
#include <condition_variable>
template<typename T>
class threadsafe_queue
{
private:
std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=data_queue.front();
data_queue.pop();
}
};
threadsafe_queue<data_chunk> data_queue; // 1
void data_preparation_thread()
{
while(more_data_to_prepare())
{
data_chunk const data=prepare_data();
data_queue.push(data); // 2
}
}
void data_processing_thread()
{
while(true)
{
data_chunk data;
data_queue.wait_and_pop(data); // 3
process(data);
if(is_last_chunk(data))
break;
}
}
線程隊列的實例中包含有互斥量和條件變量,所以獨立的變量就不需要了①,并且調(diào)用push()也不需要外部同步②。當然,wait_and_pop()還要兼顧條件變量的等待③。
另一個wait_and_pop()函數(shù)的重載寫起來就很瑣碎了,剩下的函數(shù)就像從清單3.5實現(xiàn)的棧中一個個的粘過來一樣。最終的隊列實現(xiàn)如下所示。
清單4.5 使用條件變量的線程安全隊列(完整版)
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>
template<typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut; // 1 互斥量必須是可變的
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue()
{}
threadsafe_queue(threadsafe_queue const& other)
{
std::lock_guard<std::mutex> lk(other.mut);
data_queue=other.data_queue;
}
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=data_queue.front();
data_queue.pop();
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool try_pop(T& value)
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return false;
value=data_queue.front();
data_queue.pop();
return true;
}
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};
empty()是一個const成員函數(shù),并且傳入拷貝構造函數(shù)的other形參是一個const引用;因為其他線程可能有這個類型的非const引用對象,并調(diào)用變種成員函數(shù),所以這里有必要對互斥量上鎖。如果鎖住互斥量是一個可變操作,那么這個互斥量對象就會標記為可變的①,之后他就可以在empty()和拷貝構造函數(shù)中上鎖了。
條件變量在多個線程等待同一個事件時,也是很有用的。當線程用來分解工作負載,并且只有一個線程可以對通知做出反應,與清單4.1中使用的結(jié)構完全相同;運行多個數(shù)據(jù)實例——處理線程(processing thread)。當新的數(shù)據(jù)準備完成,調(diào)用notify_one()將會觸發(fā)一個正在執(zhí)行wait()的線程,去檢查條件和wait()函數(shù)的返回狀態(tài)(因為你僅是向data_queue添加一個數(shù)據(jù)項)。 這里不保證線程一定會被通知到,即使只有一個等待線程被通知時,所有處線程也有可能都在處理數(shù)據(jù)。
另一種可能是,很多線程等待同一事件,對于通知他們都需要做出回應。這會發(fā)生在共享數(shù)據(jù)正在初始化的時候,當處理線程可以使用同一數(shù)據(jù)時,就要等待數(shù)據(jù)被初始化(有不錯的機制可用來應對;可見第3章,3.3.1節(jié)),或等待共享數(shù)據(jù)的更新,比如,定期重新初始化(periodic reinitialization)。在這些情況下,準備線程準備數(shù)據(jù)數(shù)據(jù)時,就會通過條件變量調(diào)用notify_all()成員函數(shù),而非直接調(diào)用notify_one()函數(shù)。顧名思義,這就是全部線程在都去執(zhí)行wait()(檢查他們等待的條件是否滿足)的原因。
當?shù)却€程只等待一次,當條件為true時,它就不會再等待條件變量了,所以一個條件變量可能并非同步機制的最好選擇。尤其是,條件在等待一組可用的數(shù)據(jù)塊時。在這樣的情況下,期望(future)就是一個適合的選擇。