在TensorFlow中使用pipeline加载数据

前面对TensorFlow的多线程做了测试,接下来就利用多线程和Queue pipeline地加载数据。数据流如下图所示:


首先,A、B、C三个文件通过RandomShuffle进程被随机加载到FilenameQueue里,然后Reader1和Reader2进程同FilenameQueue里取文件名读取文件,读取的内容再被放到ExampleQueue里。最后,计算进程会从ExampleQueue里取数据。各个进程独立操作,互不影响,这样可以加快程序速度。
我们简单地生成3个样本文件。

#生成三个样本文件,每个文件包含5列,假设前4列为特征,最后1列为标签
data = np.zeros([20,5])
np.savetxt('file0.csv', data, fmt='%d', delimiter=',')
data += 1
np.savetxt('file1.csv', data, fmt='%d', delimiter=',')
data += 1
np.savetxt('file2.csv', data, fmt='%d', delimiter=',')

然后,创建pipeline数据流。

#定义FilenameQueue
filename_queue = tf.train.string_input_producer(["file%d.csv"%i for i in range(3)])
#定义ExampleQueue
example_queue = tf.RandomShuffleQueue(
    capacity=1000,
    min_after_dequeue=0,
    dtypes=[tf.int32,tf.int32],
    shapes=[[4],[1]]
)
#读取CSV文件,每次读一行
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
#对一行数据进行解码
record_defaults = [[1], [1], [1], [1], [1]]
col1, col2, col3, col4, col5 = tf.decode_csv(
    value, record_defaults=record_defaults)
features = tf.stack([col1, col2, col3, col4])
#将特征和标签push进ExampleQueue
enq_op = example_queue.enqueue([features, [col5]])
#使用QueueRunner创建两个进程加载数据到ExampleQueue
qr = tf.train.QueueRunner(example_queue, [enq_op]*2)
#使用此方法方便后面tf.train.start_queue_runner统一开始进程
tf.train.add_queue_runner(qr)
xs = example_queue.dequeue()
with tf.Session() as sess:
    coord = tf.train.Coordinator()
#开始所有进程
    threads = tf.train.start_queue_runners(coord=coord)
    for i in range(200):
        x = sess.run(xs)
        print(x)
    coord.request_stop()
    coord.join(threads)

以上我们采用for循环step_num次来控制训练迭代次数。我们也可以通过tf.train.string_input_producer的num_epochs参数来设置FilenameQueue循环次数来控制训练,当达到num_epochs时,TensorFlow会抛出OutOfRangeError异常,通过捕获该异常,停止训练。

filename_queue = tf.train.string_input_producer(["file%d.csv"%i for i in range(3)], num_epochs=6)
...
with tf.Session() as sess:
    sess.run(tf.initialize_local_variables()) #必须加上这句话,否则报错!
    coord = tf.train.Coordinator()
#开始所有进程
    threads = tf.train.start_queue_runners(coord=coord)
    try:
        while not coord.should_stop():
            x = sess.run(xs)
            print(x)
    except tf.errors.OutOfRangeError:
        print('Done training -- epch limit reached')
    finally:
        coord.request_stop()

捕获到异常时,请求结束所有进程。

原文: 在TensorFlow中使用pipeline加载数据

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 生成检查点文件(chekpoint file),扩展名.ckpt,tf.train.Saver对象调用Saver....
    利炳根阅读 1,711评论 0 4
  • 西月坠,藏林间惹流云追。 徒笑些,星河转把归人接。 虽病累,人头痛笑中带谑。 看是谁,荆门承影影成玦。
    流连丶阅读 272评论 0 1
  • 1、确定院校及专业(2016年考研院校选择、专业选择) 2、查询目标院校的招生简章(2016年xxx学校考研招生简...
    瘋子0o阅读 889评论 2 18
  • 今天,就刚才,几分钟前还笑意盈盈,现在就留下滩血!从小一起长大的伙伴,走高高的房顶上摔下来了!
    大妈yy阅读 209评论 0 1
  • 如今的你也算一个大人了,可是你真的了解自己?你认为自己是个还算不错的人,虽然算不上是社会好公民,可也并不丧尽天良...
    xber0o阅读 299评论 0 2