TensorFlow7:图像数据处理

在上一章中详细介绍了卷积神经网络,并提到通过卷积神经网络给图像识别技术带来了突破性进展。这一章将从另外一个维度来进一步提升图像识别的精度以及训练的速度。喜欢摄影的读者都知道图像的亮度、对比度等属性对图像的影响是非常大的,相同物体在不同亮度、对比度下差别非常大。然而在很多图像识别问题中,这些因素都不应该影响最后的识别结果。所以本章将介绍如何对图像数据进行预处理使训练得到的神经网络模型尽可能小地减少预处理对于训练速度的影响,在本章中也将详细地介绍TensorFlow中多线程处理输入数据的解决方案。
本章将根据数据预处理的先后顺序来组织不同的小节。
1.首先将介绍如何统一输入数据的格式,使得在之后系统中可以更加方便地处理。来自实际问题的数据往往有很多格式和属性,这一节将介绍的TFRecord格式可以统一不同的原始数据格式,并更加有效地管理不同的属性。
2.介绍如何对图像数据进行预处理。这一节将列举TensorFlow支持的图像处理函数,并介绍如何使用这些处理方式来弱化与图像识别无关的因素。复杂的图像处理函数有可能降低训练的速度。
3.为了加速数据预处理过程,将完整地介绍TensorFlow多线程数据预处理流程。在这一节中将首先介绍TensorFlow中多线程和队列的概念,这是TensorFlow多线程数据预处理的基本组成部分。
4.具体介绍数据预处理流程中的每个部分。
5.给出一个完整的多线程数据预处理流程图和程序框架。

1.TFRecord输入数据格式

TensorFlow提供了一种统一的格式来存储数据,这个格式就是TFRecord。

1.1TFRecord格式介绍

TFRecord文件中的数据都是通过tf.train.Example Protocol的格式存储的。以下代码给出了tf.train.Example的定义:


tf.train.Example定义的代码

从以上代码可以看出tf.train.Example的数据结构是比较简洁的。tf.train.Example中包含了一个从属性名称到取值的字典。其中属性名称为一个字符串,属性的取值可以为字符串,实数列表或者整数列表。比如将一张解码前的图像存为一个字符串,图像所对应的类别编号存为整数列表。下面给出一个使用TFRecord的具体样例

1.2TFRecord样例程序

本小节将给出具体的样例程序来读写TFRecord文件。下面的程序给出了如何将MNIST输入数据转化为TFRecord的格式:

import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
import numpy as np

# 定义函数转化变量类型。
def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

# 将数据转化为tf.train.Example格式。
def _make_example(pixels, label, image):
    image_raw = image.tostring()
    example = tf.train.Example(features=tf.train.Features(feature={
        'pixels': _int64_feature(pixels),
        'label': _int64_feature(np.argmax(label)),
        'image_raw': _bytes_feature(image_raw)
    }))
    return example

# 读取mnist训练数据。
mnist = input_data.read_data_sets("../../datasets/MNIST_data",dtype=tf.uint8, one_hot=True)
images = mnist.train.images
labels = mnist.train.labels
pixels = images.shape[1]
num_examples = mnist.train.num_examples

# 输出包含训练数据的TFRecord文件。
with tf.python_io.TFRecordWriter("output.tfrecords") as writer:
    for index in range(num_examples):
        example = _make_example(pixels, labels[index], images[index])
        writer.write(example.SerializeToString())
print("TFRecord训练文件已保存。")

# 读取mnist测试数据。
images_test = mnist.test.images
labels_test = mnist.test.labels
pixels_test = images_test.shape[1]
num_examples_test = mnist.test.num_examples

# 输出包含测试数据的TFRecord文件。
with tf.python_io.TFRecordWriter("output_test.tfrecords") as writer:
    for index in range(num_examples_test):
        example = _make_example(
            pixels_test, labels_test[index], images_test[index])
        writer.write(example.SerializeToString())
print("TFRecord测试文件已保存。")

运行代码,如下:
Extracting ../../datasets/MNIST_data/train-images-idx3-ubyte.gz
Extracting ../../datasets/MNIST_data/train-labels-idx1-ubyte.gz
Extracting ../../datasets/MNIST_data/t10k-images-idx3-ubyte.gz
Extracting ../../datasets/MNIST_data/t10k-labels-idx1-ubyte.gz
TFRecord训练文件已保存。
TFRecord测试文件已保存。

以上程序可以实现将MNIST数据集中所有的训练数据存储到一个TFRecord文件中。当数据量较大时,也可以将数据写入多个TFRecord文件。TensorFlow对从文件列表中读取数据提供了很好的支持,以下程序给出了如何读取TFRecord文件中的数据:

# 读取文件。
reader = tf.TFRecordReader()
filename_queue = tf.train.string_input_producer(["output.tfrecords"])
_,serialized_example = reader.read(filename_queue)

# 解析读取的样例。
features = tf.parse_single_example(
    serialized_example,
    features={
        'image_raw':tf.FixedLenFeature([],tf.string),
        'pixels':tf.FixedLenFeature([],tf.int64),
        'label':tf.FixedLenFeature([],tf.int64)
    })

images = tf.decode_raw(features['image_raw'],tf.uint8)
labels = tf.cast(features['label'],tf.int32)
pixels = tf.cast(features['pixels'],tf.int32)

sess = tf.Session()

# 启动多线程处理输入数据。
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess,coord=coord)

for i in range(10):
    image, label, pixel = sess.run([images, labels, pixels])

2.图像数据处理

在之前的几章中多次使用到了图像识别数据集。然而在之前的章节中都是直接使用图像原始的像素矩阵。这一节将介绍图像预处理过程。通过对图像的预处理,可以尽量避免模型受到无关因素的影响。在大部分图像识别问题中,通过图像预处理过程可以提高模型的准确率。在下一小节中将介绍TensorFlow提供的主要图像处理函数,并给出具体图像在处理前和处理后的变化。然后将给出一个完整的图像处理流程

2.1TensorFlow图像处理函数

TensorFlow提供了几类图像处理函数,在本小节中将一一介绍这些图像处理函数

图像编码处理

在之前的章节中提到一张RGB色彩模式的图像可以看成一个三维矩阵,矩阵中的每一个数表示了图像上不同位置,不同颜色的亮度。然而图像在存储时不是直接记录这些矩阵中的数字,而是记录经过压缩编码之后的结果。所以要将一张图像还原成一个三维矩阵,需要解码的过程。TensorFlow提供了对jpeg和png格图像的编码/解码函数。以下代码示范了如何使用TensorFlow中对jpeg格式的编码/解码函数:

import matplotlib.pyplot as plt
import tensorflow as tf   
import numpy as np

# 读取图片
image_raw_data = tf.gfile.FastGFile("C:\\Users\\1003342\\Desktop\\work\\picture\\cat.jpg",'rb').read()
with tf.Session() as sess:
    img_data = tf.image.decode_jpeg(image_raw_data)
    
    # 输出解码之后的三维矩阵。
    print(img_data.eval())
    img_data.set_shape([1797, 2673, 3])
    print(img_data.get_shape())

运行代码,得到部分结果如下:
[[[162 161 140]
[162 162 138]
[161 161 137]
...,
[106 140 46]
[101 137 47]
[102 141 52]]

[[164 162 139]
[163 161 136]
[163 161 138]
...,

打印图片代码如下:

with tf.Session() as sess:
    plt.imshow(img_data.eval())
    plt.show()

运行结果如下:


本样例代码中使用到的原始图像
图像大小调整

一般来说,网络上获取的图像大小是不固定,但神经网络输入节点的个数是固定的。所以在将图像的像素作为输入提供给神经网络之前,需要先将图像的大小统一。这就是图像大小调整需要完成的任务。图像大小调整有两种方式,第一种是通过算法使得新的图像尽量保存原始图像上的所有信息。TensorFlow提供了四种不同的方法,并且将它们封装到了tf.image.resize_images函数。


tf.image.resize_images函数使用

下表给出了tf.image.resize_images函数的method参数取值对应的图像大小调整算法:


tf.image.resize_images函数的method参数取值对应的图像大小调整算法

下图对比了不同大小调整算法得到的结果:
使用tf.image.resize_images函数中不同图像大小调整算法的效果对比图

从上图中可以看出,不同算法调整出来的结果会有细微差别,但不会相差太远。除了将整张图像信息完整保存,TensorFlow还提供了API对图像进行剪裁或者填充。以下代码展示了通过tf.image.resize_image_with_crop_or_pad函数来调整图像大小的功能:

with tf.Session() as sess:    
    croped = tf.image.resize_image_with_crop_or_pad(img_data, 1000, 1000)
    padded = tf.image.resize_image_with_crop_or_pad(img_data, 3000, 3000)
    plt.imshow(croped.eval())
    plt.show()
    plt.imshow(padded.eval())
    plt.show()

运行代码,得到如下图所示的结果:


使用tf.image.resize_image_with_crop_or_pad函数调整图像大小结果对比图

TensorFlow还支持通过比例调整图像大小,以下代码给出了一个样例:

with tf.Session() as sess:   
    central_cropped = tf.image.central_crop(img_data, 0.5)
    plt.imshow(central_cropped.eval())
    plt.show()

运行代码,如下:


截取中间50%的图像

上面介绍的图像剪裁函数都是截取或者填充图像中间的部分。TensorFlow也提供了tf.image.crop_to_bounding_box函数和tf.image.pad_to_bounding_box函数来剪裁或者填充给定区域的图像。这两个函数都要求给出的尺寸满足一定的要求,否则程序会报错。比如在使用tf.image.crop_to_bounding_box函数时,TensorFlow要求提供的图像尺寸要大于目标尺寸,也就是要求原始图像能够剪裁出目标图像的大小。这里就不再给出每个函数的具体样例,有兴趣的读者可以自行参考TensorFlow的API文档。

图像翻转

TensorFlow提供了一些函数来支持对图像的翻转。以下代码实现了将图像上下反转、左右翻转以及沿对角线翻转的功能:

with tf.Session() as sess: 
    # 上下翻转
    flipped1 = tf.image.flip_up_down(img_data)
    plt.imshow(flipped1.eval())
    plt.show()
    # 左右翻转
    flipped2 = tf.image.flip_left_right(img_data)
    plt.imshow(flipped2.eval())
    plt.show()
    
    #对角线翻转
    transposed = tf.image.transpose_image(img_data)
    plt.imshow(transposed.eval())
    plt.show()
    
    # 以一定概率上下翻转图片。
    flipped3 = tf.image.random_flip_up_down(img_data)
    plt.imshow(flipped3.eval())
    plt.show()
    # 以一定概率左右翻转图片。
    flipped4 = tf.image.random_flip_left_right(img_data)
    plt.imshow(flipped4.eval())
    plt.show()

运行代码,得到如下结果:


图片翻转效果图

在很多图像识别问题中,图像的翻转不会影响到识别的结果。于是在训练图像识别的神经网络模型时,可以随机地翻转训练图像,这样训练得到的模型可以识别不同角度的实体。比如假设在训练数据中所有的猫头都是向右的,那么训练出来的模型就无法很好的识别猫头向左的猫。虽然这个问题可以通过收集更多的训练数据来解决,但是通过随机翻转训练图像的方式可以在零成本的情况下很大程度地缓解该问题。所以随即翻转训练图像是一种很常用的图像预处理方式。TensorFlow提供了方便的API完成随机图像翻转的过程。如上代码中最后的几行就是图像翻转的样例代码。

图像色彩调整

和图像翻转类似,调整图像的亮度、对比度、饱和度和色相在很多图像识别应用中都不会影响到识别结果。所以在训练神经网络模型时,可以随机调整训练图像的这些属性,从而使得训练得到的模型尽可能小地受到无关因素的影响。TensorFlow提供了调整这些色彩相关属性的API。以下代码显示了如何修改图像的亮度:

with tf.Session() as sess:
    # 在进行一系列图片调整前,先将图片转换为实数形式,有利于保持计算精度。
    image_float = tf.image.convert_image_dtype(img_data, tf.float32)
    
    # 将图片的亮度-0.5。
    #adjusted = tf.image.adjust_brightness(image_float, -0.5)
    
    # 将图片的亮度-0.5
    #adjusted = tf.image.adjust_brightness(image_float, 0.5)
    
    # 在[-max_delta, max_delta)的范围随机调整图片的亮度。
    adjusted = tf.image.random_brightness(image_float, max_delta=0.5)
    
    # 将图片的对比度-5
    #adjusted = tf.image.adjust_contrast(image_float, -5)
    
    # 将图片的对比度+5
    #adjusted = tf.image.adjust_contrast(image_float, 5)
    
    # 在[lower, upper]的范围随机调整图的对比度。
    #adjusted = tf.image.random_contrast(image_float, lower, upper)

    # 在最终输出前,将实数取值截取到0-1范围内。
    adjusted = tf.clip_by_value(adjusted, 0.0, 1.0)
    plt.imshow(adjusted.eval())
    plt.show()

依次运行每段代码,得到如下结果:


亮度-0.5
亮度+0.5

![在-max_delta, max_delta)的范围随机调整图片的亮度。

图片对比度-5
图片对比度+5
对比度在lower=1和upper=3之间的图像

以下代码显示了如何调整图像的饱和度:

with tf.Session() as sess:
    # 在进行一系列图片调整前,先将图片转换为实数形式,有利于保持计算精度。
    image_float = tf.image.convert_image_dtype(img_data, tf.float32)
    
    adjusted = tf.image.adjust_hue(image_float, 0.1)
    #adjusted = tf.image.adjust_hue(image_float, 0.3)
    #adjusted = tf.image.adjust_hue(image_float, 0.6)
    #adjusted = tf.image.adjust_hue(image_float, 0.9)
    
    # 在[-max_delta, max_delta]的范围随机调整图片的色相。max_delta的取值在[0, 0.5]之间。
    #adjusted = tf.image.random_hue(image_float, max_delta)
    
    # 将图片的饱和度-5。
    #adjusted = tf.image.adjust_saturation(image_float, -5)
    # 将图片的饱和度+5。
    #adjusted = tf.image.adjust_saturation(image_float, 5)
    # 在[lower, upper]的范围随机调整图的饱和度。
    #adjusted = tf.image.random_saturation(image_float, lower, upper)
    
    # 将代表一张图片的三维矩阵中的数字均值变为0,方差变为1。
    #adjusted = tf.image.per_image_whitening(image_float)
    
    # 在最终输出前,将实数取值截取到0-1范围内。
    adjusted = tf.clip_by_value(adjusted, 0.0, 1.0)
    plt.imshow(adjusted.eval())
    plt.show()

依次运行每行代码,得到如下图结果:


色调调整为0.1的图像
色调调整为0.3的图像
色调调整为0.6的图像
色调调整为0.9的图像
色彩调整为0~0.5之间随机调整图像的色相
将图片的饱和度-5
将图片的饱和度+5
在[lower=1, upper=5]的范围随机调整图的饱和度

除了调整图像的亮度、对比度、饱和度和色相,TensorFlow还提供API来完成图像标准化的过程。这个操作就是将图像上的亮度均值变为0,方差变为1.以下代码实现了这个功能:

# 将代表一张图片的三维矩阵中的数字均值变为0,方差变为1。
    #adjusted = tf.image.per_image_whitening(image_float)
处理标注框

在很多图像识别的数据集中,图像中需要关注的物体通常会被标注框圈出来。TensorFlow提供了一些工具来处理标注框。下面这段代码展示了如何通过tf.image.draw_bounding_boxes函数在图像中加入标注框:

with tf.Session() as sess:         
    boxes = tf.constant([[[0.05, 0.05, 0.9, 0.7], [0.35, 0.47, 0.5, 0.56]]])
    
    # sample_distorted_bounding_box要求输入图片必须是实数类型。
    image_float = tf.image.convert_image_dtype(img_data, tf.float32)
    
    begin, size, bbox_for_draw = tf.image.sample_distorted_bounding_box(
        tf.shape(image_float), bounding_boxes=boxes, min_object_covered=0.4)
    
    # 截取后的图片
    distorted_image = tf.slice(image_float, begin, size)
    plt.imshow(distorted_image.eval())
    plt.show()

    # 在原图上用标注框画出截取的范围。由于原图的分辨率较大(2673x1797),生成的标注框 
    # 在Jupyter Notebook上通常因边框过细而无法分辨,这里为了演示方便先缩小分辨率。
    image_small = tf.image.resize_images(image_float, [180, 267], method=0)
    batchced_img = tf.expand_dims(image_small, 0)
    image_with_box = tf.image.draw_bounding_boxes(batchced_img, bbox_for_draw)
    print(bbox_for_draw.eval())
    plt.imshow(image_with_box[0].eval())
    plt.show()

运行代码,得到结果如下图:


添加标注框并剪裁
2.2图像预处理完整样例

在上一小节详细讲解了TensorFlow提供的主要的图像处理函数。在解决真实的图像识别问题时,一般会同时使用多种处理方法。这一小节将给出一个完整的样例程序展示如何将不同的图像处理函数结合成一个完成的图像预处理流程。以下TensorFlow程序完成了从图像片段截取,到图像大小调整再到图像翻转及色彩调整的整个图像预处理过程:

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
  1. 随机调整图片的色彩,定义两种顺序。
def distort_color(image, color_ordering=0):
    if color_ordering == 0:
        image = tf.image.random_brightness(image, max_delta=32./255.)
        image = tf.image.random_saturation(image, lower=0.5, upper=1.5)
        image = tf.image.random_hue(image, max_delta=0.2)
        image = tf.image.random_contrast(image, lower=0.5, upper=1.5)
    else:
        image = tf.image.random_saturation(image, lower=0.5, upper=1.5)
        image = tf.image.random_brightness(image, max_delta=32./255.)
        image = tf.image.random_contrast(image, lower=0.5, upper=1.5)
        image = tf.image.random_hue(image, max_delta=0.2)

    return tf.clip_by_value(image, 0.0, 1.0)
  1. 对图片进行预处理,将图片转化成神经网络的输入层数据。
def preprocess_for_train(image, height, width, bbox):
    # 查看是否存在标注框。
    if bbox is None:
        bbox = tf.constant([0.0, 0.0, 1.0, 1.0], dtype=tf.float32, shape=[1, 1, 4])
    if image.dtype != tf.float32:
        image = tf.image.convert_image_dtype(image, dtype=tf.float32)
        
    # 随机的截取图片中一个块。
    bbox_begin, bbox_size, _ = tf.image.sample_distorted_bounding_box(
        tf.shape(image), bounding_boxes=bbox)
    bbox_begin, bbox_size, _ = tf.image.sample_distorted_bounding_box(
        tf.shape(image), bounding_boxes=bbox)
    distorted_image = tf.slice(image, bbox_begin, bbox_size)

    # 将随机截取的图片调整为神经网络输入层的大小。
    distorted_image = tf.image.resize_images(distorted_image, [height, width], method=np.random.randint(4))
    distorted_image = tf.image.random_flip_left_right(distorted_image)
    distorted_image = distort_color(distorted_image, np.random.randint(2))
    return distorted_image
  1. 读取图片。
image_raw_data = tf.gfile.FastGFile("../../datasets/cat.jpg", "r").read()
with tf.Session() as sess:
    img_data = tf.image.decode_jpeg(image_raw_data)
    boxes = tf.constant([[[0.05, 0.05, 0.9, 0.7], [0.35, 0.47, 0.5, 0.56]]])
    for i in range(9):
        result = preprocess_for_train(img_data, 299, 299, boxes)
        plt.imshow(result.eval())
        plt.show()

运行代码,依次弹出如下图片:


图1

图2

图3

图4

图5

图6

图7

图8

图9

从以上程序可以看到,通过一张训练图像衍生出很多训练样本。通过将训练图像进行预处理,训练得到的神经网络模型可以识别不同大小、方位、色彩等方面的实体。

3.多线程输入数据处理框架

在上一节中介绍了使用TensorFlow对图像进行预处理的方法。虽然使用这些图像数据预处理的方法可以减小无关因素对图像识别模型效果的影响,但这些复杂的预处理过程也会减慢整个训练过程。为了避免图像预处理成为神经网络模型训练效率的瓶颈,TensorFlow提供了一套多线程处理输入数据的框架。在本节中将详细介绍这个框架。下图总结了一个经典的输入数据处理的流程,在以下的各个小节中,将依次介绍这个流程的不同部分。


经典数据数据处理流程图

下面我们将介绍:
1.TensorFlow中队列的概念。在TensorFlow中,队列不仅是一种数据结构,它更提供了多线程机制。队列也是TensorFlow多线程输入数据处理框架的基础。
2.TensorFlow实现上图中的前三步。TensorFlow提供了tf.train.string_input_producer函数来有效管理原始输入文件列表。重点介绍如何使用这个函数。上图中数据预处理的部分已经在之前有过详细介绍,本节不再重复。接着下一小节将介绍4.上图中最后一个流程。这个流程将处理好的单个训练数据整理成训练数据batch,这些batch就可以作为神经网络的输入。
5.介绍tf.train.shuffle_batch_join和tf.train.shuffle_batch函数,并比较不同函数的多线程并行方式
6.最后将给出一个完整的TensorFlow程序来展示整个输入数据处理框架。

3.1队列与多线程

在TensorFlow中,队列和变量类似,都是计算图上有状态的节点。其他的计算节点可以修改它们的状态。对于变量,可以通过赋值操作修改变量的取值。对于队列,修改队列状态的操作主要有Enqueue、EnqueneMany和Dequeue.以下程序展示了如何使用这些函数来操作一个队列:

# 创建一个先进先出的队列,指定队列中最多可以保存两个元素,并指定类型为整数
q = tf.FIFOQueue(2, "int32")
# 使用enqueue_many函数来初始化队列中的元素。和变量初始化类似,在使用队列之前需要明确的调用这个初始化过程。
init = q.enqueue_many(([0, 10],))
# 使用Dequeue函数将队列中的第一个元素出队列。这个元素的值将被存在变量x中。
x = q.dequeue()
# 将得到的值加1.
y = x + 1
# 将加1后的值在重新加入队列
q_inc = q.enqueue([y])

with tf.Session() as sess:
    # 运行初始化队列的操作。
    init.run()
    for _ in range(5):
        # 运行q_inc将执行数据出队列、出队的元素+1、重新加入队列的整个过程。
        v, _ = sess.run([x, q_inc])
        # 打印出队元素的取值
        print v

运行代码,结果如下:
0
10
1
11
2
结果分析:
队列开始有[0,10]两个元素,第一个出队的为0,加1之后再次入队得到的队列为[10,1];第二次出队的为10,加1之后入队的为11,得到的队列为[1,11];以此类推,最后得到的输出为以上结果。
TensorFlow中提供了FIFOQueue和RandomShuffleQueue两种队列。在上面的程序中已经展示了如何使用FIFOQue,它的实现的是一个先进先出队列。RandomShffleQueue会将队列中的元素打乱,每次出队列操作得到的是从当前队列所有元素中随机选择的一个。在训练神经网络时希望每次使用的训练数据尽量随机,RandomShuffleQueue就提供了这样的功能。
在TensorFlow中,队列不仅仅是一种数据结构,还是异步计算张量取值的一个重要机制。比如多个线程可以同时向一个队列中写元素,或者同时读取一个队列中的元素。在后面的小节中将具体介绍TensorFlow是如何利用队列来实现多线程输入数据处理的。在本小节之后的内容中将先介绍TensorFlow提供的辅助函数来更好地协同不同的线程。
TensorFlow提供了tf.Coordinator和tf.QueueRunner两个类来完成多线程协同的功能。tf.Coordinator主要用于协同多个线程一起停止,并提供了should_stop、request_stop和join三个函数。在启动线程之前,需要先声明一个tf.Coordinator类,并将这个类传入每一个创建的线程中。启动的线程需要一直查询tf.Coordinator类中提供的should_stop函数,当这个函数的返回值为True时,则当前线程也需要退出。每一个启动的线程都可以通过调用request_stop函数来通知其他线程退出。当某一个线程调用request_stop函数之后,should_stop函数的返回值将被设置为True,这样其他的线程就可以同时终止了。以下程序展示了如何使用tf.Coordinator:

import tensorflow as tf
import numpy as np
import threading
import time

# 线程中运行的程序,这个程序每隔1秒判断是否需要停止并打印自己的ID。
def MyLoop(coord, worker_id):
        # 使用tf.Coordinator类提供的协同工具判断当前线程是否需要停止
        while not coord.should_stop():
            # 随机停止所有的线程
            if np.random.rand() <0.1:
                print("stoping from id:{}".format(worker_id))
                # 调用coord.request_stop()函数来通知其他线程停止
                coord.request_stop()
            else:
                # 打印当前线程的Id
                print("working on id:{}".format(worker_id))
            time.sleep(1)
            
# 声明一个tf.train.Coordinator类来协同多个线程
coord = tf.train.Coordinator()
# 声明创建5个线程
threads = [threading.Thread(target=MyLoop, args=(coord,i,)) for i in range(5)]
# 启动所有的线程
for t in threads:t.start()
# 等待所有线程退出
coord.join(threads)

运行代码,得到结果如下:
working on id:0
working on id:1
working on id:2
stoping from id:4
working on id:3

当所有线程启动之后,每个线程会打印各自的ID,于是前面4行打印出了它们的ID。然后在暂停1秒之后,所有线程又开始第二遍打印ID。在这个时候有一个线程退出的条件达到,于是调用了coord.request_stop函数来停止所有其他的线程。然而在打印Stoping from id:4之后可以看到有线程仍然在输出。这是因为这些线程已经执行完coord.should_stop的判断,于是仍然会继续输出自己的ID。但在下一轮判断是否需要停止时将退出线程。于是在打印一次ID之后就不会再有输出了。
tf.QueueRunner主要用于启动多个线程来操作同一个队列,启动的这些线程可以通过上面介绍的tf.Coordinator类来统一管理。以下代码展示了如何使用tf.QueueRunner和tf.Coordinator来管理多线程队列操作:
1.定义队列及其操作

import tensorflow as tf
# 声明一个先进先出的队列,队列中最多100个元素,类型为实数
queue = tf.FIFOQueue(100,"float")
# 定义队列的入队操作
enqueue_op = queue.enqueue([tf.random_normal([1])])
# 使用tf.train.QueueRunner来创建多个线程运行队列的入队操作。
# tf.train.QueueRunner的第一个参数给出了被操作的队列,[enqueue_op]*5表示需要启动5个线程,
# 每个县城中运行的是enqueue_op操作
qr = tf.train.QueueRunner(queue, [enqueue_op] * 5)
# 将定义过的QueueRunner加入TensorFlow计算图上指定的集合。
tf.train.add_queue_runner(qr)
# 定义出队操作
out_tensor = queue.dequeue()

2.启动线程

with tf.Session() as sess:
    # 使用tf.train.Coordinator()来协同启动的线程
    coord = tf.train.Coordinator()
    # 启动所有线程
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    # 获取队列中的值
    for _ in range(3): print(sess.run(out_tensor)[0])
    # 使用tf.train.Coordinator来停止所有的线程
    coord.request_stop()
    coord.join(threads)

运行代码,结果如下:
-0.494588
-0.31537
0.153534
上面的程序将启动五个线程来执行队列入队的操作,其中每一个线程都是将随机数写入队列。于是在每次运行出队操作时,可以得到一个随机数。

3.2输入文件队列

本小节将介绍如何使用TensorFlow中的队列管理输入文件列表。在这一小节中,假设所有的输入数据都已经整理成TFRecord格式。虽然一个TFRecord文件中可以存储多个训练样例,但是当训练数据量较大时,可以将数据分成多个TFRecord文件来提高处理效率。TensorFlow提供了tf.train.match_filenames_once函数来获取符合一个正则表达式的所有文件,得到的文件列表可以通过tf.train.string_input_producer函数进行有效的管理。
tf.train.string_input_producer函数会使用初始化时提供的文件列表创建一个输入队列,输入队列中原始的像素为文件列表中的所有文件。如上一节中的样例代码所示,创建好的输入队列可以作为文件读取函数的参数。每次调用文件读取函数时,该函数会先判断当前是否已有打开的文件可读,如果没有或者打开的文件已经读完,这个函数会从输入队列中出队一个文件并从这个文件中读取数据。
通过设置shuffle参数,tf.train.string_input_producer函数支持随机打乱文件列表中文件出队顺序。当shuffle参数为True时,文件在加入队列之前会被打乱顺序,所以出队的顺序也是随机的。随机打乱文件顺序以及加入输入队列的过程会跑在一个单独的线程上,这样不会影响获取文件的速度。tf.train.string_input_producer生成的输入队列可以同时被多个文件读取线程操作,而且输入队列会将队列中的文件均匀地分给不同的线程,不出现有些文件被处理过多次而有些文件还没有被处理过的情况。
当一个输入队列中的所有文件都被处理完后,它会将初始化时提供的文件列表中的文件全部重新加入队列。tf.train.string_input_producer函数可以设置num_epochs参数来限制加载初始文件列表的最大轮数。当所有文件都已经被使用了设定的轮数后,如果继续尝试读取新的文件,输入队列会报OutOfRange的错误。在测试神经网络模型时,因为所有测试数据只需要使用一次,所以可以将num_epochs参数设置为1.这样在计算完一轮之后程序将自动停止。在展示tf.train.match_filenames_once和tf.train.string_input_producer函数的使用方法之前,下面先给出一个简单的程序来生成样例数据:

import tensorflow as tf
# 创建TFRecord文件的帮助函数
def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
# 模拟海量数据情况下将数据写入不同的文件。num_shards定义了总共写入多少文件
# instances_per_shard定义了每个文件中有多少个数据
num_shards = 2
instances_per_shard = 2
for i in range(num_shards):
    # 将数据分为多个文件时,可以将不同文件以类似0000n-of-0000m的后缀区分。其中m表示了数据总共被存在了多少个文件中,
    # n表示当前文件的编号。式样的方式既方便了通过正则表达式获取文件列表,又在文件名中加入了更多的信息。
    filename = ('data.tfrecords-%.5d-of-%.5d' % (i, num_shards)) 
    # 将Example结构写入TFRecord文件。
    writer = tf.python_io.TFRecordWriter(filename)
    for j in range(instances_per_shard):
    # Example结构仅包含当前样例属于第几个文件以及是当前文件的第几个样本。
        example = tf.train.Example(features=tf.train.Features(feature={
            'i': _int64_feature(i),
            'j': _int64_feature(j)}))
        writer.write(example.SerializeToString())
    writer.close()  

程序运行之后,在指定的目录下将生成两个文件,如下图:


生成样例数据

每一个文件中存储了两个样例。在生成了样例数据之后,以下代码展示了tf.train.match_filenames_once函数和tf.train.string_input_producer函数的使用方法。

# 使用tf.train.match_filenames_once函数获取文件列表
files = tf.train.match_filenames_once("data.tfrecords-*")
# 通过tf.train.string_input_producer函数创建输入队列,输入队列中的文件列表为上面步骤获取的文件列表。
# 这里shuffle参数设为false来避免随机打乱读文件的顺序。但一般在解决真实问题时,会将shuffle参数设置为True。
filename_queue = tf.train.string_input_producer(files, shuffle=False) 
# 如上一节所示读取并解析一个样本
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(
      serialized_example,
      features={
          'i': tf.FixedLenFeature([], tf.int64),
          'j': tf.FixedLenFeature([], tf.int64),
      })
with tf.Session() as sess:
    # 虽然在本段程序中没有声明任何变量,但使用tf.train.match_filenames_once函数时需要初始化一些变量。
    sess.run([tf.global_variables_initializer(), tf.local_variables_initializer()])
    print(sess.run(files))
    # 声明tf.train.Coordinator类来协同不同线程,并启动线程。
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    # 多次执行获取数据的操作
    for i in range(6):
        print(sess.run([features['i'], features['j']]))
    coord.request_stop()
    coord.join(threads)

运行代码,打印输出如下:


读取文件的结果

在不打乱文件列表的情况下,会依次读出样例数据中的每一个样例。而且当所有样例都被读完之后,程序会自动从头开始。如果限制num_epochs为1,那么程序将会报错,如下图所示:


限制num_epochs为1的报错
3.3组合训练数据(batching)

在上一小节中已经介绍了如何从文件列表中读取单个样例,将这些单个样例通过上节介绍的预处理方法进行处理,就可以得到提供给神经网络输入层的训练数据了。前面介绍过,将多个输入样例组织成一个batch可以提高模型训练的效率。所以在得到单个样例的与处理结果之后,还需要将它们组织成batch,然后再提供给神经网络的输入层。TensorFlow提供了tf.train.batch和tf.train.shuffle_batch函数来将单个的样例组织成batch的形式输出。这两个函数都会生成一个队列,队列的入队操作是生成单个样例的方法,而每次出队得到的是一个batch的样例。它们唯一的区别在于是否会将数据顺序打乱。以下代码展示了这两个函数的使用方法:

# 读取并解析得到样例。这里假设Example结构中i表示一个样例的特征向量,j表示该样例对应的标签
example, label = features['i'], features['j']
# 一个batch中样例的个数
batch_size = 2
# 组合样例的队列中最多可以存储的样例个数。这个队列如果太大,那么需要占用很多内存资源;
# 如果太小,那么出队操作可能会因为没有数据而被阻碍(block),从而导致训练效率降低。一般来说
# 这个队列的大小会和每一个batch的大小相关,下面一行代码给出了设置队列大小的一种方式:
capacity = 1000 + 3 * batch_size
# 使用tf.train.batch函数来组合样例。example, label参数给出了需要组合的元素,
# 一般example和label分别代表训练样本和样本对应的正确标签。batch_size参数给出TensorFlow
# 将暂停入队操作,而只是等待元素出队。当元素个数小于容量时,TensorFlow将暂停入队操作,而只是等待元素出队。当
# 元素小于容量时,TensorFlow将自动重新启动入队操作。
example_batch, label_batch = tf.train.batch([example, label], batch_size=batch_size, capacity=capacity)

with tf.Session() as sess:
    tf.global_variables_initializer().run()
    tf.local_variables_initializer().run()
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    # 获取并打印组合之后的样例。在真是问题中,这个输出一般会作为神经网络的输入。
    for i in range(3):
        cur_example_batch, cur_label_batch = sess.run([example_batch, label_batch])
        print cur_example_batch, cur_label_batch
    coord.request_stop()
    coord.join(threads)

运行上面的程序可以得到下面的输出:
[0 0 1] [0 1 0]
[1 0 0] [1 0 1]

这个输出可以看到tf.train.batch函数可以将单个的数据组织成3个一组的batch。在example,label中读到的数据依次为:
example:0,example:0
example:0,example:1
example:1,example:0
example:1,example:1
这是因为tf.train.batch函数不会随机打乱顺序,所以组合之后得到的数据组合成了上面给出的输出。
下面一段代码展示了tf.train.shuffle_batch函数的使用方法:

example, label = features['i'], features['j']

# 使用tf.train.shuffle_batch函数来组合样例。tf.train.shuffle_batch函数的参数大部分都和tf.train.batch函数相似
# 但是min_after_dequeue参数是tf.train.shuffle_batch函数特有的。min_after_dequeue参数限制了出队时队列中元素的最少个数。
# 当队列中元素太少时,随机打乱样例顺序的作用就不大了。所以tf.train.shuffle_batch函数提供了限制出队时最少元素的个数
# 来保证随机打乱顺序的作用。当出队函数被调用但是队列中元素不够时,出队操作将等待更多的元素入队才会完成。
# 如果min_after_dequeue参数被设定,capacity也应该相应调整来满足性能需求。
example_batch, label_batch = tf.train.shuffle_batch([example, label], batch_size=batch_size, capacity=capacity,min_after_dequeue=30)

with tf.Session() as sess:
    tf.global_variables_initializer().run()
    tf.local_variables_initializer().run()
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    # 获取并打印组合之后的样例。在真是问题中,这个输出一般会作为神经网络的输入。
    for i in range(2):
        cur_example_batch, cur_label_batch = sess.run([example_batch, label_batch])
        print(cur_example_batch, cur_label_batch)
    coord.request_stop()
    coord.join(threads)

第一次运行代码结果:
[0 0 1] [0 1 0]
[1 0 1] [0 0 0]
第二次运行代码结果:
[1 0 1] [0 1 0]
[1 1 1] [1 1 1]
从输出中可以看到,得到的样例顺序已经被打乱了。
tf.train.batch函数和tf.train.shuffle_batch函数除了可以将单个训练数据整理成输入batch,也提供了并行化处理输入数据的方法。tf.train.batch函数和tf.train.shuffle_batch函数并行化的方式一致。所以在本小节中仅以应用得更多的tf.train.shuffle_batch函数为例。通过设置tf.train.shuffle_batch函数中的num_threads参数,可以指定多个线程同时执行入队操作。tf.train.shuffle_batch函数的入队操作就是数据读取以及预处理的过程。当num_threads参数大于1时,多个线程会同时读取一个文件中的不同样例并进行预处理。如果需要多个线程处理不同文件中的样例时,可以使用tf.train.shuffle_batch_join函数。此函数会从输入文件队列中获取不同的文件分配给不同的线程。一般来说,输入文件队列是通过前面介绍的tf.train.string_input_producer函数生成的。这个函数会平均分配文件以保证不同文件中的数据会被尽量平均地使用。

3.4输入数据处理框架
经典输入数据处理流程图

在前面的小节中已经介绍了上图所展示的流程图中的所有步骤。在这一小节将把这些步骤串成一个完整的TensorFlow来处理输入数据。以下代码给出了这个完整的程序:

  1. 创建文件列表,通过文件列表创建输入文件队列,读取文件为本章第一节创建的文件。
import tensorflow as tf
# 创建文件列表,并通过文件列表创建输入文件队列。在调用输入数据处理流程前,需要统一所有原始数据的格式并
# 将它们存储到TFRecord文件中。下面给出的文件列表应该包含所有提供训练数据的TFRecord文件
files = tf.train.match_filenames_once("output.tfrecords")
filename_queue = tf.train.string_input_producer(files, shuffle=False) 

2.解析TFRecord文件里的数据

# 读取文件。

reader = tf.TFRecordReader()
_,serialized_example = reader.read(filename_queue)

# 解析读取的样例。
features = tf.parse_single_example(
    serialized_example,
    features={
        'image_raw':tf.FixedLenFeature([],tf.string),
        'pixels':tf.FixedLenFeature([],tf.int64),
        'label':tf.FixedLenFeature([],tf.int64)
    })

decoded_images = tf.decode_raw(features['image_raw'],tf.uint8)
retyped_images = tf.cast(decoded_images, tf.float32)
labels = tf.cast(features['label'],tf.int32)
#pixels = tf.cast(features['pixels'],tf.int32)
images = tf.reshape(retyped_images, [784])

3.将文件以100个为一组打包。

min_after_dequeue = 10000
batch_size = 100
capacity = min_after_dequeue + 3 * batch_size

image_batch, label_batch = tf.train.shuffle_batch([images, labels], 
                                                    batch_size=batch_size, 
                                                    capacity=capacity, 
                                                    min_after_dequeue=min_after_dequeue)
  1. 训练模型
def inference(input_tensor, weights1, biases1, weights2, biases2):
    layer1 = tf.nn.relu(tf.matmul(input_tensor, weights1) + biases1)
    return tf.matmul(layer1, weights2) + biases2

# 模型相关的参数
INPUT_NODE = 784
OUTPUT_NODE = 10
LAYER1_NODE = 500
REGULARAZTION_RATE = 0.0001   
TRAINING_STEPS = 5000        

weights1 = tf.Variable(tf.truncated_normal([INPUT_NODE, LAYER1_NODE], stddev=0.1))
biases1 = tf.Variable(tf.constant(0.1, shape=[LAYER1_NODE]))

weights2 = tf.Variable(tf.truncated_normal([LAYER1_NODE, OUTPUT_NODE], stddev=0.1))
biases2 = tf.Variable(tf.constant(0.1, shape=[OUTPUT_NODE]))

y = inference(image_batch, weights1, biases1, weights2, biases2)
    
# 计算交叉熵及其平均值
cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=label_batch)
cross_entropy_mean = tf.reduce_mean(cross_entropy)
    
# 损失函数的计算
regularizer = tf.contrib.layers.l2_regularizer(REGULARAZTION_RATE)
regularaztion = regularizer(weights1) + regularizer(weights2)
loss = cross_entropy_mean + regularaztion

# 优化损失函数
train_step = tf.train.GradientDescentOptimizer(0.01).minimize(loss)
    
# 初始化会话,并开始训练过程。
with tf.Session() as sess:
    # tf.global_variables_initializer().run()
    sess.run((tf.global_variables_initializer(),
              tf.local_variables_initializer()))
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    # 循环的训练神经网络。
    for i in range(TRAINING_STEPS):
        if i % 1000 == 0:
            print("After %d training step(s), loss is %g " % (i, sess.run(loss)))
                  
        sess.run(train_step) 
    coord.request_stop()
    coord.join(threads)       

运行代码,结果如下:
After 0 training step(s), loss is 504.133
After 1000 training step(s), loss is 1.45948
After 2000 training step(s), loss is 1.85053
After 3000 training step(s), loss is 1.56273
After 4000 training step(s), loss is 1.65542

输入数据处理流程示意图

上图展示了输入数据处理的整个流程。从图上可以看出,输入数据处理的第一步为获取存储训练数据的文件列表。在图中,这个文件列表为{A,B,C}。通过tf.train.string_input_producer函数,可以选择性地将文件列表中文件的顺序打乱,并加入输入队列。因为是否打乱的顺序是可选的,所以在图中通过虚线表示。tf.train.string_input_producer函数会生成并维护一个输入文件队列,不同线程中的文件读取函数可以共享这个输入文件队列。在读取样例数据之后,需要将图像进行预处理。图像预处理的过程也会通过tf.train.shuffle_batch提供的机制并行地跑在多个线程中。输入数据处理流程的最后通过tf.train.shuffle_batch函数将处理好的单个输入样例整理成batch提供给神经网络的输入层。通过这种方式,可以有效地提高数据预处理的效率,避免数据预处理称为神经网络模型训练过程中的性能瓶颈。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354

推荐阅读更多精彩内容