tf.data.Dataset是tensorflow从1.4版本开始正式引入的中等级API,比起手动管理多线程数据输入,更加便捷,尤其是在读取TFRecord上基本都使用该api。
官方guide
https://zhuanlan.zhihu.com/p/30751039
https://zhuanlan.zhihu.com/p/33223782
总的来说分三步:
- 创建数据源:Creating a source (e.g. Dataset.from_tensor_slices()) 后,应用变换构建新数据集:Applying a transformation (e.g. Dataset.batch()) constructs a dataset。
- 定义遍历器:从数据集Dataset使用
make_one_shot_iterator()
方法构建遍历器tf.data.Iterator
。 - 使用get_next()方法从遍历器中读取数据tensor,作为graph的输入。
一、Dataset
https://blog.csdn.net/qq_21579045/article/details/86904202
# 从TFRecord文件创建数据集。
input_files = ['./input_file1', './input_file2']
dataset = tf.data.TFRecordDataset(input_files)
# map()函数表示对数据集中的每一条数据进行调用相应方法。
# 使用TFRecordDataset读出的是二进制的数据。
# 这里需要通过map()函数来调用parser()对二进制数据进行解析。
# 类似的,map()函数也可以用来完成其他的数据预处理工作如数据增强。
dataset = dataset.map(parser)
主要就各种Dataset的构建,以及map函数的应用。
map函数可以是解析tfrecords,也可以是数据增强的一些方法。
同时,Dataset还有shuffle(), batch(), repeat()等方法以创建数据输入的pipeline。这几个方法都是计算图中的节点而已,并不会记录结果,因此shuffle后再repeat,两次输出的结果还是会不同的。
Q:shuffle是会在一定容量的数据中预先shuffle然后取出batch个,因此若连续10w个数据的label都是0,则无法做到全局shuffle。
解决方法:在制作tfrecord的时候就对(图片文件名,label)预先shuffle两次,之后再写入tfrecords。避免连续图片是同一类。
二、Iterator
# 定义遍历数据集的迭代器
iterator = dataset.make_one_shot_iterator()
# feat1, feat2是parser()返回的一维int64型张量,可以作为输入用于进一步的计算。
feat1, feat2 = iterator.get_next()
取出的次数也是有限制的,具体来说,若图片数为M,batch为B,repeat为R,则取出次数为 ceil(M * R / B)。超过次数再取,则会发生tf.errors.OutOfRangeError
报错。repeat若无参数则代表无限循环,不会报错。
注意:
此处构建的tf.data.Iterator
遍历器并非符合python传统定义中的iterator。
print(next(iterator))
TypeError: 'Iterator' object is not an iterator
那为何这个Iterator不去实现成python定义中的迭代器形式呢?因为在这里的Iterator也只是一个graph中的operation,并非直接能获得结果,输出依旧是tensor,需要运行session去获取输出值。
with tf.Session() as sess:
for i in range(10):
f1, f2 = sess.run([feat1, feat2])
print(f1, f2)
官方解释:
https://github.com/tensorflow/tensorflow/issues/15273#issuecomment-350835022
因此,我们不能将这个tf.data.Iterator
直接传入keras.model.fit_generator(),keras的函数中需要的生成器每次yield的是实际真实的数据值。
三、纯tf+Dataset API
通过iterator.get_next()
获取到的tensor值,是准备喂入graph中的输入。只有当session中run了train_step后,才会真正地从Dataset中取出数据并被喂入模型进行训练。
完整案例:
https://blog.csdn.net/qq_21579045/article/details/87092969
案例中使用Dataset.repeat()方法间接确定了epoch数目。
四、Keras结合Dataset API
4.1 使用光fit
官方example:https://github.com/keras-team/keras/blob/2.2.4/examples/mnist_dataset_api.py
对应版本tf1.7,此写法中,keras.Model使用Dataset迭代器输出的节点内容。即:Model creation using tensors from the get_next() graph node.
# Model creation using tensors from the get_next() graph node.
inputs, targets = iterator.get_next()
model_input = layers.Input(tensor=inputs)
model_output = cnn_layers(model_input)
train_model = keras.models.Model(inputs=model_input, outputs=model_output)
train_model.compile(optimizer=keras.optimizers.RMSprop(lr=2e-3, decay=1e-5),
loss='categorical_crossentropy',
metrics=['accuracy'],
target_tensors=[targets])
train_model.summary()
train_model.fit(epochs=epochs,
steps_per_epoch=steps_per_epoch)
此写法中,keras.model.fit()方法 中的x,y置空。x
can be None
(default) if feeding from framework-native tensors (e.g. TensorFlow data tensors).
4.2 直接fit(dataset)
【兼容性很差,不推荐!】
新版tf>1.9中,可直接使用tf.keras.model.fit(dataset)
https://stackoverflow.com/questions/46135499/how-to-properly-combine-tensorflows-dataset-api-and-keras
https://github.com/keras-team/keras-io/blob/master/guides/md/customizing_what_happens_in_fit.md
多输入与多输出(需要tf1.11):
https://blog.csdn.net/Murdock_C/article/details/83832033
4.3 fit_generator()传入自行构建一个python的生成器
python生成器只需要一个方法+yield即可。
即将Dataset改造为生成器,每次yield真实数据。以方便Keras使用。
if __name__ == '__main__':
a = Input(shape=(368, 368, 3))
conv1 = layers.Conv2D(64, 3, 1)(a)
conv2 = layers.Conv2D(64, 3, 1)(conv1)
maxpool = layers.MaxPooling2D(pool_size=8, strides=8, padding='same')(conv2)
conv3 = layers.Conv2D(5, 1, 1)(maxpool)
model = keras.Model(inputs=a, outputs=conv3)
model.compile(optimizer=keras.optimizers.SGD(lr=0.05),
loss=keras.losses.mean_squared_error)
import numpy as np
data = np.random.rand(10, 368, 368, 3)
label = np.random.rand(10, 46, 46, 5)
dataset = tf.data.Dataset.from_tensor_slices((data, label)).batch(5).repeat()
iterator = dataset.make_one_shot_iterator()
# print(next(iterator)) # 会报错
# print(K.get_session().run(iterator.get_next())[1][0])
def mannual_iter(iter_):
next_batch = iter_.get_next()
while True:
yield K.get_session().run(next_batch)
# 必须加这一句,否则自定义生成器的graph和之后keras模型训练的graph就不是一个了会报错。
with K.get_session() as sess:
model.fit_generator(mannual_iter(iterator), epochs=3, steps_per_epoch=2,
workers=1, verbose=1)
可以看到,数据量为10,batch为5,因此steps_per_epoch设为2。epoch随便。
此处,我们就自定义创建了一个符合python定义的生成器,封装了Dataset。但是这种做法略带风险,当多进程的时候可能对数据重复引用。
4.4 fit_generator()传入自行构建一个Keras.Sequence的生成器
【推荐】:https://mp.weixin.qq.com/s/vl2-qHV0a80eMYReX9M8vQ 该文章中因为没用到Dataset API,直接读取对应文件名的图片,因此无需session.run。
fit_generator()传入不光可以是python迭代器,还能是Keras.Sequence.
Keras Sequence方法用于拟合一个数据序列,每一个Sequence必须提供getitem和len方法,这跟Torch的Dataset模块类似。Sequence是进行多进程处理的更安全的方法,这种结构保证网络在每个时期每个样本只训练一次,这与生成器不同。
class TfrecordGenerator(Sequence):
def __init__():
self.batch = self.set_batch()
self.sess = K.get_session()
def set_batch(self):
dataset = tf.data.TFRecordDataset(self.files).map(lambda x: self.parse_record_func(x, class_num=self.class_num))
if self.shuffle:
self.dataset = dataset.shuffle(self.batch_size * 4).batch(self.batch_size).repeat()
else:
self.dataset = dataset.batch(self.batch_size).repeat()
iterator = self.dataset.make_initializable_iterator()
batch = iterator.get_next()
self.sess.run(iterator.initializer)
return batch
def __len__(self):
return (self.record_num + self.batch_size - 1) // self.batch_size
def __getitem__(self, index):
img, lbl = self.sess.run(self.batch)
if self.augment:
# 图像增强
... ...
img = preprocess_input(img)
return img, lbl
def on_epoch_end(self):
pass
自定义生成器构建完毕后,训练的时候直接初始化,并作为fit_generator()的参数即可。