深度学习的模型训练过程往往需要大量的数据,而将这些数据一次性的读入和预处理需要大量的时间开销,所以通常采用队列与多线程的思想解决这个问题,而且TensorFlow为我们提供了完善的函数。
本文介绍了TensorFlow的线程和队列。在使用TensorFlow进行异步计算时,队列是一种强大的机制。正如TensorFlow中的其他组件一样,队列就是TensorFlow图中的节点。这是一种有状态的节点,就像变量一样:其他节点可以修改它的内容。具体来说,其他节点可以把新元素插入到队列后端(rear),也可以把队列前端(front)的元素删除。
为了感受一下队列,先来看一个简单的例子。我们先创建一个“先入先出”的队列(FIFOQueue),并将其内部所有元素初始化为零。然后,我们构建一个TensorFlow图,它从队列前端取走一个元素,加上1之后,放回队列的后端。慢慢地,队列的元素的值就会增加。
Enqueue、 EnqueueMany和Dequeue都是特殊的节点,在Python API中,它们都是队列对象的方法(例如q.enqueue(...)
)。
下面我们深入了解下细节。
概述
诸如FIFOQueue
和RandomShuffleQueue
这样的队列,在TensorFlow的tensor
异步计算时非常重要。
例如,一个典型的输入结构:使用一个RandomShuffleQueue
来作为模型训练的输入:
- 多个线程准备训练样本,并且把这些样本推入队列。
- 一个训练线程执行一个训练操作,此操作会从队列中移除最小批次的样本(mini-batches)。
TensorFlow的Session
对象是可以支持多线程的,因此多个线程可以很方便地使用同一个会话(Session)并且并行地执行操作。然而,在Python程序实现这样的并行运算却并不容易。所有线程都必须能被同步终止,异常必须能被正确捕获并报告,会话终止的时候, 队列必须能被正确地关闭。
TensorFlow提供了两个类来帮助多线程的实现:tf.Coordinator和 tf.QueueRunner
,通常来说这两个类必须被一起使用。Coordinator
类用来同时停止多个工作线程并且向那个在等待所有工作线程终止的程序报告异常。QueueRunner
类用来协调多个工作线程并将多个张量推入同一个队列中。
实现队列
在Python中是没有提供直接实现队列的函数的,所以通常会使用列表模拟队列。
而TensorFlow提供了整套实现队列的函数和方法,在TensorFlow中,队列和变量类似,都是计算图上有状态的节点。操作队列的函数主要有:
FIFOQueue():创建一个先入先出(FIFO)的队列
RandomShuffleQueue():创建一个随机出队的队列
enqueue_many():初始化队列中的元素
dequeue():出队
enqueue():入队
例如
import tensorflow as tf
q = tf.FIFOQueue(3,"int32")
init = q.enqueue_many(([0,1,2],))
x = q.dequeue()
y = x + 1
q_inc = q.enqueue([y])
with tf.Session() as sess:
init.run()
for a in range(5):
v,a = sess.run([x,q_inc])
print(v)
#output
#0
#1
#2
#1
#2
原理如下图:
Coordinator多线程协同
Coordinator类用来帮助多个线程协同工作,多个线程同步终止。 其主要方法有:
- should_stop():如果线程应该停止则返回True。
- request_stop(<exception>):请求该线程停止。
- join(<list of threads>):等待被指定的线程终止。
首先创建一个 Coordinator
对象,然后建立一些使用Coordinator
对象的线程。这些线程通常一直循环运行,每次循环前首先判断should_stop()
是否返回True,如果是的话就停止。 任何线程都可以决定什么时候应该停止,它只需要调用request_stop()
,同时其他线程的 should_stop()
将会返回True,然后就都停下来。
假设有五个线程同时在工作,每个线程自身会先判断should_stop()
的值,当其返回值为True时,则退出当前线程;如果为Flase,也继续该线程。此时如果线程3发出了request_stop()
通知,则其它4个线程的should_stop()
将全部变为True,然后线程4自身的should_stop()
也将变为True,则退出了所有线程。
import tensorflow as tf
import numpy as np
import time
import threading
def MyLoop(coord,worker_id):
while not coord.should_stop():
if np.random.rand()<0.09:
print('stoping from id:',worker_id)
coord.request_stop()
else:
print('working from id:',worker_id)
time.sleep(1)
coord = tf.train.Coordinator()
#声明5个线程
threads=[threading.Thread(target=MyLoop,args=(coord,i,)) for i in range(5)]
#遍历五个线程
for t in threads:
t.start()
coord.join(threads)
#output
#working from id: 0
#working from id: 1
#working from id: 2
#working from id: 3
#working from id: 4
#stoping from id: 0
在第一轮遍历过程中,所有进程的should_stop()
都为Flase
,且随机数都大于等于0.09,所以依次打印了working from id: 0-5,再重新回到进程0时,出现了小于0.09的随机数,即进程0发出了request_stop()请求,进程1-4的should_stop()
返回值全部为True
(进程退出),也就无法进入while,进程0的should_stop()
返回值也将为True
(退出),五个进程全部退出。
QueueRunner多线程操作队列
QueueRunner
类会创建一组线程, 这些线程可以重复的执行Enquene
操作, 他们使用同一个Coordinator
来处理线程同步终止。此外,一个QueueRunner
会运行一个用于异常处理的closer thread
,当Coordinator
收到异常报告时,这个closer thread
会自动关闭队列。
我们可以使用一个一个QueueRunner
来实现上述结构。 首先建立一个TensorFlow图表,这个图表使用队列来输入样本,处理样本并将样本推入队列中,用training操作来移除队列中的样本。
前面说到了队列的操作,多线程协同的操作,在多线程协同的代码中让每一个线程打印自己的id编号,下面我们说下如何用多线程操作一个队列。
TensorFlow提供了队列tf.QueueRunner
类处理多个线程操作同一队列,启动的线程由上面提到的tf.Coordinator
类统一管理,常用的操作有:
- QueueRunner():启动线程,第一个参数为线程需要操作的队列,第二个参数为对队列的操作,如enqueue_op,此时的enqueue_op = queue.enqueue()
- add_queue_runner():在图中的一个集合中加
QueueRunner
,如果没有指定的合集的话,会被添加到tf.GraphKeys.QUEUE_RUNNERS
合集 - start_queue_runners():启动所有被添加到图中的线程
import tensorflow as tf
#创建队列
queue = tf.FIFOQueue(100,'float')
#入队
enqueue_op = queue.enqueue(tf.random_normal([1]))
#启动5个线程,执行enqueue_op
qr = tf.train.QueueRunner( queue,[enqueue_op] * 5)
#添加线程到图
tf.train.add_queue_runner(qr)
#出队
out_tensor = queue.dequeue()
with tf.Session() as sess:
coord = tf.train.Coordinator()
threads=tf.train.start_queue_runners(sess=sess,coord=coord)
for i in range(6):
print(sess.run(out_tensor)[0])
coord.request_stop()
coord.join(threads)
#output
#-0.543751
#-0.712543
#1.32066
#0.2471
#0.313005
#-2.16349
异常处理
通过 queue runners
启动的线程不仅仅推送样本到队列。它们还捕捉和处理由队列产生的异常,包括OutOfRangeError
异常,这个异常是用于报告队列被关闭。 使用Coordinator
训练时在主循环中必须同时捕捉和报告异常。 下面是对上面训练循环的改进版本。
try:
for step in xrange(1000000):
if coord.should_stop():
break
sess.run(train_op)
except Exception as e:
# Report exceptions to the coordinator.
coord.request_stop(e)
finally:
# Terminate as usual. It is safe to call `coord.request_stop()` twice.
coord.request_stop()
coord.join(threads)
参考
TensorFlow 队列与多线程的应用
http://zangbo.me/2017/07/03/TensorFlow_8/