第9课: 输入流程与风格迁移
在看完GANs后,课程回到TensorFlow的正题上来。
队列(Queue)和协调器(Coordinator)
我们简要提到过队列但是从没有详细讨论它,在TensorFlow文档中,队列被描述为:“在计算图中实现异步计算的重要对象”。
如果你做过任何深度学习的项目,可能就不需要我来说服你为什么需要异步编程。在输入管道中,多个线程可以帮助我们减轻读取数据阶段的瓶颈,因为读取数据需要等很长时间。例如用队列来准备训练模型所需的数据,我们需要:
- 用多线程准备训练样本,然后将它们推入队列中。
- 一个训练线程从队列中取出mini-batch执行一个训练计算。
TensorFlow的Session对象被设计为支持多线程的,所以多个线程可以简单的用同一个Session并行的执行运算。然而,实现一个Python程序像上面描述那样驾驭线程并不那么容易。所有线程必须能够一起停止,异常必须被捕获并报出,而且停止时队列必须被合理的关闭。
文档上看起来像是在使用队列时多线程是可选的,但是实际上不用多线程,你的队列很可能会阻塞然后使程序崩溃。幸运的是,TensorFlow提供了两个类帮助使用多线程队列:tf.Coordinator
和tf.QueueRunner
,这两个类需要一起使用。Coordinator类帮助多个线程一起停止并报告异常,QueueRunner类用来创建一定数量的线程推送tensors到一个队列中。
这里有两个主要的队列类:tf.FIFOQueue
和tf.RandomShuffleQueue
。FIFOQueue创建一个先入先出的队列,而RandomShuffleQueue创建随机出队的队列。这两个队列支持enqueue
、enqueue_many
和dequeue
的操作。一种常见的做法是在读取数据时enqueue
多个样本,然后一个个的dequeue
它们,dequeue_many
是不允许的。如果你需要为mini-batch训练读取多个样本时,可以使用tf.train.batch
或者tf.train.shuffle_batch
。
import tensorflow as tf
q = tf.FIFOQueue(3, "float")
init = q.enqueue_many(([0., 0., 0.],))
x = q.dequeue()
y = x + 1
q_inc = q.enqueue([y])
with tf.Session() as sess:
init.run()
q_inc.run()
q_inc.run()
q_inc.run()
q_inc.run()
print(sess.run(x))
print(sess.run(x))
print(sess.run(x))
tf.PaddingFIFOQueue
是支持批量大小可变的FIFOQueue,它还支持dequeue_many
。有时候你需要灌入不同大小batch,例如NLP中的序列到序列模型,大多数时候你希望以一句话作为一个batch,但是句子的长度是不同的。除此之外还有tf.PriorityQueue
,它的进队和出队参考另一个参数:权重(priority)。
我不知道为什么只有PaddingFIFOQueue支持dequeue_many
,但是从TensorFlow的GitHub上报告的问题来看似乎是因为其它队列在使用'dequeue_many'时出现了很多问题所以被禁止了。
你可以不使用参数创建一个队列,例如参数:min_after_dequeue
(出队列后队列中元素的最小数量)、bounded capacity
(队列中的最大元素个数)和队列中元素的形状(如果形状为None,
元素可以是任何形状)。然而在实践中很少单独使用队列,而总是和string_input_producer
一起使用,因此我们将简要地过一下这一节,而在string_input_producer中详细介绍。
tf . RandomShuffleQueue ( capacity , min_after_dequeue , dtypes , shapes = None , names = None ,
seed = None , shared_name = None , name = 'random_shuffle_queue')
在课程GitHub中的09_queue_example.py中可以看到这样一个列子。
N_SAMPLES = 1000
NUM_THREADS = 4
# Generating some simple data
# create 1000 random samples, each is a 1D array from the normal distribution (10, 1)
data = 10 * np . random . randn ( N_SAMPLES , 4 ) + 1
# create 1000 random labels of 0 and 1
target = np . random . randint ( 0 , 2 , size = N_SAMPLES )
queue = tf . FIFOQueue ( capacity = 50 , dtypes =[ tf . float32 , tf . int32 ], shapes =[[ 4 ], []])
enqueue_op = queue . enqueue_many ([ data , target ])
dequeue_op = queue . dequeue ()
# create NUM_THREADS to do enqueue
qr = tf . train . QueueRunner ( queue , [ enqueue_op ] * NUM_THREADS)
with tf . Session () as sess:
# Create a coordinator, launch the queue runner threads.
coord = tf . train . Coordinator ()
enqueue_threads = qr . create_threads ( sess , coord = coord , start = True)
for step in xrange ( 100 ): # do to 100 iterations
if coord . should_stop ():
break
data_batch , label_batch = sess . run ( dequeue_op)
coord . request_stop ()
coord . join ( enqueue_threads)
在TensorFlow队列中你也可以不使用tf.Coordinator
,但是可以使用它来管理你创建的任何线程。例如你使用Python线程包创建线程做一些事,你仍然可以使用tf.Coordinator
来管理这些线程。(译者:在1.8版中为tf.train.Coordinator)
import threading
# thread body: loop until the coordinator indicates a stop was requested.
# if some condition becomes true, ask the coordinator to stop.
def my_loop ( coord ):
while not coord . should_stop ():
... do something ...
if ... some condition ...:
coord . request_stop ()
# main code: create a coordinator.
coord = tf . Coordinator ()
# create 10 threads that run 'my_loop()'
# you can also create threads using QueueRunner as the example above
threads = [ threading . Thread ( target = my_loop , args =( coord ,)) for _ in xrange ( 10 )]
# start the threads and wait for all of them to stop.
for t in threads :
t . start ()
coord . join ( threads)
数据读取器(Data Reader)
我们已经学习了3种TensorFlow读取数据的方法,第一种是从常量(Constant)中读取,第二种是用feed_dict读取,第三种也是最常用的做法是用DataReader直接从存储中读取数据。
TensorFlow为常见的数据类型内建了一些Reader,最通用的一个是TextLineReader
,它每次读取一行。除此之外还有读定长数据的Reader,读取整个文件的Reader,读取TFRecord类型数据(下面要讲到)的Reader。
tf . TextLineReader
Outputs the lines of a file delimited by newlines
E . g . text files , CSV files
tf . FixedLengthRecordReader
Outputs the entire file when all files have same fixed lengths
E . g . each MNIST file has 28 x 28 pixels , CIFAR - 10 32 x 32 x 3
tf . WholeFileReader
Outputs the entire file content. This is useful when each file contains a sample
tf . TFRecordReader
Reads samples from TensorFlow ' s own binary format ( TFRecord)
tf . ReaderBase
Allows you to create your own readers
要使用Data Reader,我们首先要建立一个队列并用tf.train.string_input_producer
获得所有你要读取的文件名。
filename_queue = tf . train . string_input_producer ([ "heart.csv" ])
reader = tf . TextLineReader (skip_header_lines=1)
# it means you choose to skip the first line for every file in the queue
你可以将Reader想象成你每调用一次只返回一个值的运算 - 类似于Python的生成器(generator)。所以当你调用reader.read()
的时候它返回一个键值对,其中的键是能够标识文件和数据的字符串。
key , value = reader . read ( filename_queue)
例如上面的语句可能返回:
key = data / heart . csv :2
value = 144 , 0.01 , 4.41 , 28.61 , Absent , 55 , 28.87 , 2.06 , 63 ,1
表示数据144 , 0.01 , 4.41 , 28.61 , Absent , 55 , 28.87 , 2.06 , 63 ,1
是文件data/heart.csv的第2航。
tf.train.string_input_producer在后台创建了一个FIFOQueue,所以要用队列,我们需要同时使用tf.Coordinator
和tf.QueueRunner
。
filename_queue = tf . train . string_input_producer ( filenames)
reader = tf . TextLineReader ( skip_header_lines = 1 ) # skip the first line in the file
key , value = reader . read ( filename_queue)
with tf . Session () as sess:
coord = tf . train . Coordinator ()
threads = tf . train . start_queue_runners ( coord = coord)
print sess . run ( key) # data / heart . csv :2
print sess . run ( value) # 144 , 0.01 , 4.41 , 28.61 , Absent , 55 , 28.87 , 2.06 , 63 ,1
coord . request_stop ()
coord . join ( threads)
我们获得的是value是字符串tensor,接下来用TensorFlow的CSV解码器将value转换为向量。
content = tf . decode_csv ( value , record_defaults = record_defaults )
上面的record_defaults需要我们自己建立,它表示两点内容:
- 告诉解码器每一列都是什么数据类型。
- 在数值为空的情况下每一列的默认值是多少。
定义所有列的类型和初始值。
record_defaults = [[ 1.0 ] for _ in range ( N_FEATURES )] # define all features to be floats
record_defaults [ 4 ] = [ ''] # make the fifth feature string
record_defaults . append ([ 1 ])
content = tf . decode_csv ( value , record_defaults = record_defaults )
你可以在灌数据之前做所有你想做的数据预处理。例如我们的数据有8个浮点值,1个字符串和1个整数值,我们将把字符串转换为浮点数,然后将9个特征转换为一个tensor以便灌入模型。
# convert the 5th column (present/absent) to the binary value 0 and 1
condition = tf . equal ( content [ 4 ], tf . constant ( 'Present' ))
content [ 4 ] = tf . select ( condition , tf . constant ( 1.0 ), tf . constant ( 0.0 ))
# pack all 9 features into a tensor
features = tf . pack ( content [: N_FEATURES ])
# assign the last column to label
label = content [- 1]
于是每一次Reader从CSV文件中读取一行,就会被转换为特征向量和标签。
但是你一般不想灌入一个单独的样本到模型中,而是希望灌入一个batch数据。你可以用tf.train.batch
或者tf.train.shuffle_batch
做这个。
# minimum number elements in the queue after a dequeue, used to ensure
# that the samples are sufficiently mixed
# I think 10 times the BATCH_SIZE is sufficient
min_after_dequeue = 10 * BATCH_SIZE
# the maximum number of elements in the queue
capacity = 20 * BATCH_SIZE
# shuffle the data to generate BATCH_SIZE sample pairs
data_batch , label_batch = tf . train . shuffle_batch ([ features , label ], batch_size = BATCH_SIZE ,
capacity = capacity , min_after_dequeue = min_after_dequeue)
这样就做完了,你可以简单的像在以前的模型中使用input_placeholder
和label_placeholder
中那样使用data_batch
和label_batch
,除非你不需要通过feed_dict
灌数据。完整的代码在课程GitHub中的05_csv_reader.py中。
TFRecord
二进制文件非常有用,虽然我遇到过很多人都不喜欢用,因为他们认为二进制文件很麻烦。如果你是他们中的一员,我希望通过这节课能够帮助你克服对二进制文件的非理性恐惧。它们能更好的利用磁盘缓存,它们能很快的迁移,它们能存储不同类型的数据。(所以你可以把图片和标签放在一个地方)
像很多机器学习框架一样,TensorFlow有自己的二进制数据格式,名叫TFRecord。TFRecord是一个序列化的tf.train.Example
类型的Protobuf对象,可以用简单几行代码进行创建。下面是将一幅图片转换为TFRecord的例子。
首先,我们需要读取图片然后将它转换为二进制字节流。
def get_image_binary ( filename ):
image = Image . open ( filename)
image = np . asarray ( image , np . uint8)
shape = np . array ( image . shape , np . int32)
return shape . tobytes (), image . tobytes () # convert image to raw data bytes in the array.
下一步,用tf.python_io.TFRecordWriter
和tf.train.Feature
将这些字节写入TFRecord。你需要shape信息去重建图片。
def write_to_tfrecord ( label , shape , binary_image , tfrecord_file ):
""" This example is to write a sample to TFRecord file. If you want to write
more samples , just use a loop.
"""
writer = tf . python_io . TFRecordWriter ( tfrecord_file)
# write label, shape, and image content to the TFRecord file
example = tf . train . Example ( features = tf . train . Features ( feature ={
'label' : tf . train . Feature ( bytes_list = tf . train . BytesList ( value =[ label ])),
'shape' : tf . train . Feature ( bytes_list = tf . train . BytesList ( value =[ shape ])),
'image' : tf . train . Feature ( bytes_list = tf . train . BytesList (
value =[ binary_image ]))
}))
writer . write ( example . SerializeToString ())
writer . close ()
要读取一个TFRecord文件,可以用TFRecordReader
和tf.decode_raw
。
def read_from_tfrecord ( filenames ):
tfrecord_file_queue = tf . train . string_input_producer ( filenames , name = 'queue' )
reader = tf . TFRecordReader ()
_ , tfrecord_serialized = reader . read ( tfrecord_file_queue )
# label and image are stored as bytes but could be stored as # int64 or float64 values in a serialized tf.Example protobuf.
tfrecord_features = tf . parse_single_example ( tfrecord_serialized ,
features ={
'label' : tf . FixedLenFeature ([], tf . string ),
'shape' : tf . FixedLenFeature ([], tf . string ),
'image' : tf . FixedLenFeature ([], tf . string ),
}, name = 'features' )
# image was saved as uint8, so we have to decode as uint8.
image = tf . decode_raw ( tfrecord_features [ 'image' ], tf . uint8 )
shape = tf . decode_raw ( tfrecord_features [ 'shape' ], tf . int32 )
# the image tensor is flattened out, so we have to reconstruct the shape
image = tf . reshape ( image , shape )
label = tf . cast ( tfrecord_features [ 'label' ], tf . string )
return label , shape , image
记住这些标签、图片还原成一些tensor对象,要获得它们的值你必须使用tf.Session()
计算。
风格迁移
这部分内容在第二次作业中。