TensorFlow程序讀取數(shù)據(jù)一共有3種方法:
TensorFlow的數(shù)據(jù)供給機(jī)制允許你在TensorFlow運算圖中將數(shù)據(jù)注入到任一張量中。因此,python運算可以把數(shù)據(jù)直接設(shè)置到TensorFlow圖中。
通過給run()或者eval()函數(shù)輸入feed_dict參數(shù), 可以啟動運算過程。
with tf.Session():
input = tf.placeholder(tf.float32)
classifier = ...
print classifier.eval(feed_dict={input: my_python_preprocessing_fn()})
雖然你可以使用常量和變量來替換任何一個張量, 但是最好的做法應(yīng)該是使用placeholder op節(jié)點。設(shè)計placeholder節(jié)點的唯一的意圖就是為了提供數(shù)據(jù)供給(feeding)的方法。placeholder節(jié)點被聲明的時候是未初始化的, 也不包含數(shù)據(jù), 如果沒有為它供給數(shù)據(jù), 則TensorFlow運算的時候會產(chǎn)生錯誤, 所以千萬不要忘了為placeholder提供數(shù)據(jù)。
可以在tensorflow/g3doc/tutorials/mnist/fully_connected_feed.py找到使用placeholder和MNIST訓(xùn)練的例子,MNIST tutorial也講述了這一例子。
一共典型的文件讀取管線會包含下面這些步驟:
可以使用字符串張量(比如["file0", "file1"], [("file%d" % i) for i in range(2)], [("file%d" % i) for i in range(2)]) 或者tf.train.match_filenames_once 函數(shù)來產(chǎn)生文件名列表。
將文件名列表交給tf.train.string_input_producer 函數(shù).string_input_producer來生成一個先入先出的隊列, 文件閱讀器會需要它來讀取數(shù)據(jù)。
string_input_producer 提供的可配置參數(shù)來設(shè)置文件名亂序和最大的訓(xùn)練迭代數(shù), QueueRunner會為每次迭代(epoch)將所有的文件名加入文件名隊列中, 如果shuffle=True的話, 會對文件名進(jìn)行亂序處理。這一過程是比較均勻的,因此它可以產(chǎn)生均衡的文件名隊列。
這個QueueRunner的工作線程是獨立于文件閱讀器的線程, 因此亂序和將文件名推入到文件名隊列這些過程不會阻塞文件閱讀器運行。
根據(jù)你的文件格式, 選擇對應(yīng)的文件閱讀器, 然后將文件名隊列提供給閱讀器的read方法。閱讀器的read方法會輸出一個key來表征輸入的文件和其中的紀(jì)錄(對于調(diào)試非常有用),同時得到一個字符串標(biāo)量, 這個字符串標(biāo)量可以被一個或多個解析器,或者轉(zhuǎn)換操作將其解碼為張量并且構(gòu)造成為樣本。
從CSV文件中讀取數(shù)據(jù), 需要使用TextLineReader和decode_csv 操作, 如下面的例子所示:
filename_queue = tf.train.string_input_producer(["file0.csv", "file1.csv"])
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
# Default values, in case of empty columns. Also specifies the type of the
# decoded result.
record_defaults = [[1], [1], [1], [1], [1]]
col1, col2, col3, col4, col5 = tf.decode_csv(
value, record_defaults=record_defaults)
features = tf.concat(0, [col1, col2, col3, col4])
with tf.Session() as sess:
# Start populating the filename queue.
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
for i in range(1200):
# Retrieve a single instance:
example, label = sess.run([features, col5])
coord.request_stop()
coord.join(threads)
每次read的執(zhí)行都會從文件中讀取一行內(nèi)容, decode_csv 操作會解析這一行內(nèi)容并將其轉(zhuǎn)為張量列表。如果輸入的參數(shù)有缺失,record_default參數(shù)可以根據(jù)張量的類型來設(shè)置默認(rèn)值。
在調(diào)用run或者eval去執(zhí)行read之前, 你必須調(diào)用tf.train.start_queue_runners來將文件名填充到隊列。否則read操作會被阻塞到文件名隊列中有值為止。
從二進(jìn)制文件中讀取固定長度紀(jì)錄, 可以使用tf.FixedLengthRecordReader的tf.decode_raw操作。decode_raw操作可以講一個字符串轉(zhuǎn)換為一個uint8的張量。
舉例來說,the CIFAR-10 dataset的文件格式定義是:每條記錄的長度都是固定的,一個字節(jié)的標(biāo)簽,后面是3072字節(jié)的圖像數(shù)據(jù)。uint8的張量的標(biāo)準(zhǔn)操作就可以從中獲取圖像片并且根據(jù)需要進(jìn)行重組。 例子代碼可以在tensorflow/models/image/cifar10/cifar10_input.py找到,具體講述可參見教程.
另一種保存記錄的方法可以允許你講任意的數(shù)據(jù)轉(zhuǎn)換為TensorFlow所支持的格式, 這種方法可以使TensorFlow的數(shù)據(jù)集更容易與網(wǎng)絡(luò)應(yīng)用架構(gòu)相匹配。這種建議的方法就是使用TFRecords文件,TFRecords文件包含了tf.train.Example 協(xié)議內(nèi)存塊(protocol buffer)(協(xié)議內(nèi)存塊包含了字段
Features)。你可以寫一段代碼獲取你的數(shù)據(jù), 將數(shù)據(jù)填入到Example協(xié)議內(nèi)存塊(protocol buffer),將協(xié)議內(nèi)存塊序列化為一個字符串, 并且通過tf.python_io.TFRecordWriter class寫入到TFRecords文件。tensorflow/g3doc/how_tos/reading_data/convert_to_records.py就是這樣的一個例子。
從TFRecords文件中讀取數(shù)據(jù), 可以使用tf.TFRecordReader的tf.parse_single_example解析器。這個parse_single_example操作可以將Example協(xié)議內(nèi)存塊(protocol buffer)解析為張量。 MNIST的例子就使用了convert_to_records 所構(gòu)建的數(shù)據(jù)。 請參看tensorflow/g3doc/how_tos/reading_data/fully_connected_reader.py, 您也可以將這個例子跟fully_connected_feed的版本加以比較。
你可以對輸入的樣本進(jìn)行任意的預(yù)處理, 這些預(yù)處理不依賴于訓(xùn)練參數(shù), 你可以在tensorflow/models/image/cifar10/cifar10.py找到數(shù)據(jù)歸一化, 提取隨機(jī)數(shù)據(jù)片,增加噪聲或失真等等預(yù)處理的例子。
在數(shù)據(jù)輸入管線的末端, 我們需要有另一個隊列來執(zhí)行輸入樣本的訓(xùn)練,評價和推理。因此我們使用tf.train.shuffle_batch 函數(shù)來對隊列中的樣本進(jìn)行亂序處理
示例:
def read_my_file_format(filename_queue):
reader = tf.SomeReader()
key, record_string = reader.read(filename_queue)
example, label = tf.some_decoder(record_string)
processed_example = some_processing(example)
return processed_example, label
def input_pipeline(filenames, batch_size, num_epochs=None):
filename_queue = tf.train.string_input_producer(
filenames, num_epochs=num_epochs, shuffle=True)
example, label = read_my_file_format(filename_queue)
# min_after_dequeue defines how big a buffer we will randomly sample
# from -- bigger means better shuffling but slower start up and more
# memory used.
# capacity must be larger than min_after_dequeue and the amount larger
# determines the maximum we will prefetch. Recommendation:
# min_after_dequeue + (num_threads + a small safety margin) * batch_size
min_after_dequeue = 10000
capacity = min_after_dequeue + 3 * batch_size
example_batch, label_batch = tf.train.shuffle_batch(
[example, label], batch_size=batch_size, capacity=capacity,
min_after_dequeue=min_after_dequeue)
return example_batch, label_batch
如果你需要對不同文件中的樣子有更強(qiáng)的亂序和并行處理,可以使用tf.train.shuffle_batch_join 函數(shù).
示例:
def read_my_file_format(filename_queue):
# Same as above
def input_pipeline(filenames, batch_size, read_threads, num_epochs=None):
filename_queue = tf.train.string_input_producer(
filenames, num_epochs=num_epochs, shuffle=True)
example_list = [read_my_file_format(filename_queue)
for _ in range(read_threads)]
min_after_dequeue = 10000
capacity = min_after_dequeue + 3 * batch_size
example_batch, label_batch = tf.train.shuffle_batch_join(
example_list, batch_size=batch_size, capacity=capacity,
min_after_dequeue=min_after_dequeue)
return example_batch, label_batch
在這個例子中, 你雖然只使用了一個文件名隊列, 但是TensorFlow依然能保證多個文件閱讀器從同一次迭代(epoch)的不同文件中讀取數(shù)據(jù),知道這次迭代的所有文件都被開始讀取為止。(通常來說一個線程來對文件名隊列進(jìn)行填充的效率是足夠的)
另一種替代方案是: 使用tf.train.shuffle_batch 函數(shù),設(shè)置num_threads的值大于1。 這種方案可以保證同一時刻只在一個文件中進(jìn)行讀取操作(但是讀取速度依然優(yōu)于單線程),而不是之前的同時讀取多個文件。這種方案的優(yōu)點是:
你一共需要多少個讀取線程呢? 函數(shù)tf.train.shuffle_batch*為TensorFlow圖提供了獲取文件名隊列中的元素個數(shù)之和的方法。 如果你有足夠多的讀取線程, 文件名隊列中的元素個數(shù)之和應(yīng)該一直是一個略高于0的數(shù)。具體可以參考TensorBoard:可視化學(xué)習(xí).
QueueRunner對象來預(yù)取 簡單來說:使用上面列出的許多tf.train函數(shù)添加QueueRunner到你的數(shù)據(jù)流圖中。在你運行任何訓(xùn)練步驟之前,需要調(diào)用tf.train.start_queue_runners函數(shù),否則數(shù)據(jù)流圖將一直掛起。tf.train.start_queue_runners 這個函數(shù)將會啟動輸入管道的線程,填充樣本到隊列中,以便出隊操作可以從隊列中拿到樣本。這種情況下最好配合使用一個tf.train.Coordinator,這樣可以在發(fā)生錯誤的情況下正確地關(guān)閉這些線程。如果你對訓(xùn)練迭代數(shù)做了限制,那么需要使用一個訓(xùn)練迭代數(shù)計數(shù)器,并且需要被初始化。推薦的代碼模板如下:
# Create the graph, etc.
init_op = tf.initialize_all_variables()
# Create a session for running operations in the Graph.
sess = tf.Session()
# Initialize the variables (like the epoch counter).
sess.run(init_op)
# Start input enqueue threads.
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
try:
while not coord.should_stop():
# Run training steps or whatever
sess.run(train_op)
except tf.errors.OutOfRangeError:
print 'Done training -- epoch limit reached'
finally:
# When done, ask the threads to stop.
coord.request_stop()
# Wait for threads to finish.
coord.join(threads)
sess.close()
首先,我們先創(chuàng)建數(shù)據(jù)流圖,這個數(shù)據(jù)流圖由一些流水線的階段組成,階段間用隊列連接在一起。第一階段將生成文件名,我們讀取這些文件名并且把他們排到文件名隊列中。第二階段從文件中讀取數(shù)據(jù)(使用Reader),產(chǎn)生樣本,而且把樣本放在一個樣本隊列中。根據(jù)你的設(shè)置,實際上也可以拷貝第二階段的樣本,使得他們相互獨立,這樣就可以從多個文件中并行讀取。在第二階段的最后是一個排隊操作,就是入隊到隊列中去,在下一階段出隊。因為我們是要開始運行這些入隊操作的線程,所以我們的訓(xùn)練循環(huán)會使得樣本隊列中的樣本不斷地出隊。

在tf.train中要創(chuàng)建這些隊列和執(zhí)行入隊操作,就要添加tf.train.QueueRunner到一個使用tf.train.add_queue_runner函數(shù)的數(shù)據(jù)流圖中。每個QueueRunner負(fù)責(zé)一個階段,處理那些需要在線程中運行的入隊操作的列表。一旦數(shù)據(jù)流圖構(gòu)造成功,tf.train.start_queue_runners函數(shù)就會要求數(shù)據(jù)流圖中每個QueueRunner去開始它的線程運行入隊操作。
如果一切順利的話,你現(xiàn)在可以執(zhí)行你的訓(xùn)練步驟,同時隊列也會被后臺線程來填充。如果您設(shè)置了最大訓(xùn)練迭代數(shù),在某些時候,樣本出隊的操作可能會得到一個tf.OutOfRangeError的錯誤。這其實是TensorFlow的“文件結(jié)束”(EOF) ———— 這就意味著已經(jīng)達(dá)到了最大訓(xùn)練迭代數(shù),已經(jīng)沒有更多可用的樣本了。
最后一個因素是Coordinator。這是負(fù)責(zé)在收到任何關(guān)閉信號的時候,讓所有的線程都知道。最常用的是在發(fā)生異常時這種情況就會呈現(xiàn)出來,比如說其中一個線程在運行某些操作時出現(xiàn)錯誤(或一個普通的Python異常)。
想要了解更多的關(guān)于threading, queues, QueueRunners, and Coordinators的內(nèi)容可以看這里.
想象一下,你有一個模型并且設(shè)置了最大訓(xùn)練迭代數(shù)。這意味著,生成文件的那個線程將只會在產(chǎn)生OutOfRange錯誤之前運行許多次。該QueueRunner會捕獲該錯誤,并且關(guān)閉文件名的隊列,最后退出線程。關(guān)閉隊列做了兩件事情:
OutOfRange錯誤)。它們不會防止等待更多的元素被添加到隊列中,因為上面的一點已經(jīng)保證了這種情況不會發(fā)生。關(guān)鍵是,當(dāng)在文件名隊列被關(guān)閉時候,有可能還有許多文件名在該隊列中,這樣下一階段的流水線(包括reader和其它預(yù)處理)還可以繼續(xù)運行一段時間。 一旦文件名隊列空了之后,如果后面的流水線還要嘗試從文件名隊列中取出一個文件名(例如,從一個已經(jīng)處理完文件的reader中),這將會觸發(fā)OutOfRange錯誤。在這種情況下,即使你可能有一個QueueRunner關(guān)聯(lián)著多個線程。如果這不是在QueueRunner中的最后那個線程,OutOfRange錯誤僅僅只會使得一個線程退出。這使得其他那些正處理自己的最后一個文件的線程繼續(xù)運行,直至他們完成為止。 (但如果假設(shè)你使用的是tf.train.Coordinator,其他類型的錯誤將導(dǎo)致所有線程停止)。一旦所有的reader線程觸發(fā)OutOfRange錯誤,然后才是下一個隊列,再是樣本隊列被關(guān)閉。
同樣,樣本隊列中會有一些已經(jīng)入隊的元素,所以樣本訓(xùn)練將一直持續(xù)直到樣本隊列中再沒有樣本為止。如果樣本隊列是一個RandomShuffleQueue,因為你使用了shuffle_batch 或者 shuffle_batch_join,所以通常不會出現(xiàn)以往那種隊列中的元素會比min_after_dequeue 定義的更少的情況。 然而,一旦該隊列被關(guān)閉,min_after_dequeue設(shè)置的限定值將失效,最終隊列將為空。在這一點來說,當(dāng)實際訓(xùn)練線程嘗試從樣本隊列中取出數(shù)據(jù)時,將會觸發(fā)OutOfRange錯誤,然后訓(xùn)練線程會退出。一旦所有的培訓(xùn)線程完成,tf.train.Coordinator.join會返回,你就可以正常退出了。
舉個例子,有形式為[x, y, z]的樣本,我們可以生成一批形式為[batch, x, y, z]的樣本。 如果你想濾除這個記錄(或許不需要這樣的設(shè)置),那么可以設(shè)置batch的大小為0;但如果你需要每個記錄產(chǎn)生多個樣本,那么batch的值可以大于1。 然后很簡單,只需調(diào)用批處理函數(shù)(比如: shuffle_batch or shuffle_batch_join)去設(shè)置enqueue_many=True就可以實現(xiàn)。
SparseTensors這種數(shù)據(jù)類型使用隊列來處理不是太好。如果要使用SparseTensors你就必須在批處理之后使用tf.parse_example 去解析字符串記錄 (而不是在批處理之前使用 tf.parse_single_example) 。
這僅用于可以完全加載到存儲器中的小的數(shù)據(jù)集。有兩種方法:
使用常數(shù)更簡單一些,但是會使用更多的內(nèi)存(因為常數(shù)會內(nèi)聯(lián)的存儲在數(shù)據(jù)流圖數(shù)據(jù)結(jié)構(gòu)中,這個結(jié)構(gòu)體可能會被復(fù)制幾次)。
training_data = ...
training_labels = ...
with tf.Session():
input_data = tf.constant(training_data)
input_labels = tf.constant(training_labels)
...
要改為使用變量的方式,您就需要在數(shù)據(jù)流圖建立后初始化這個變量。
training_data = ...
training_labels = ...
with tf.Session() as sess:
data_initializer = tf.placeholder(dtype=training_data.dtype,
shape=training_data.shape)
label_initializer = tf.placeholder(dtype=training_labels.dtype,
shape=training_labels.shape)
input_data = tf.Variable(data_initalizer, trainable=False, collections=[])
input_labels = tf.Variable(label_initalizer, trainable=False, collections=[])
...
sess.run(input_data.initializer,
feed_dict={data_initializer: training_data})
sess.run(input_labels.initializer,
feed_dict={label_initializer: training_lables})
設(shè)定trainable=False 可以防止該變量被數(shù)據(jù)流圖的 GraphKeys.TRAINABLE_VARIABLES 收集, 這樣我們就不會在訓(xùn)練的時候嘗試更新它的值; 設(shè)定 collections=[] 可以防止GraphKeys.VARIABLES 收集后做為保存和恢復(fù)的中斷點。
無論哪種方式,tf.train.slice_input_producer function函數(shù)可以被用來每次產(chǎn)生一個切片。這樣就會讓樣本在整個迭代中被打亂,所以在使用批處理的時候不需要再次打亂樣本。所以我們不使用shuffle_batch函數(shù),取而代之的是純tf.train.batch 函數(shù)。 如果要使用多個線程進(jìn)行預(yù)處理,需要將num_threads參數(shù)設(shè)置為大于1的數(shù)字。
在tensorflow/g3doc/how_tos/reading_data/fully_connected_preloaded.py 中可以找到一個MNIST例子,使用常數(shù)來預(yù)加載。 另外使用變量來預(yù)加載的例子在tensorflow/g3doc/how_tos/reading_data/fully_connected_preloaded_var.py,你可以用上面 fully_connected_feed 和 fully_connected_reader 的描述來進(jìn)行比較。
通常你會在一個數(shù)據(jù)集上面訓(xùn)練,然后在另外一個數(shù)據(jù)集上做評估計算(或稱為 "eval")。 這樣做的一種方法是,實際上包含兩個獨立的進(jìn)程:
這兩個進(jìn)程在下面的例子中已經(jīng)完成了:the example CIFAR-10 model,有以下幾個好處:
您可以在同一個進(jìn)程的相同的數(shù)據(jù)流圖中有訓(xùn)練和eval,并分享他們的訓(xùn)練后的變量。參考the shared variables tutorial.
原文地址:Reading data 翻譯:volvet and zhangkom 校對: