tl;dr
如何在每 K 次训练迭代之后评估验证集,使用单独的队列进行训练和验证数据,而不需要单独使用tf.Sessions
在多个进程中?考虑到我的特定问题,似乎没有一种干净的方法来实现这一目标,而我当前的解决方法(我认为可行)给了我未定义的行为。帮助!
整个故事
我想每 K 次训练迭代评估一个验证集,但我不知道如何在 TensorFlow 中正确实现这一点。这应该是最常见的操作之一,但感觉 TensorFlow 的 API/架构在这里对我不利,或者至少让事情变得不必要的困难。
我的假设是:
- [A1] 此处描述的用于训练/验证的多进程模型https://www.tensorflow.org/how_tos/reading_data/#multiple_input_pipelines https://www.tensorflow.org/how_tos/reading_data/#multiple_input_pipelines不适用于我的问题,因为我必须假设没有足够的 GPU 内存来加载变量两次。
- [A2] 我想每 K 次训练迭代对验证集进行评估。
- [A3] 训练数据和验证数据都不能简单地从磁盘读取,而是即时生成的。这使得不可能提前可靠地预先计算验证集的大小。
- [A4] 验证集太大,无法预先计算并存储到磁盘上。
- [A5] 有效验证集大小不一定是批量大小的倍数。
训练输入管道设置如下:
- A
tf.train.slice_input_producer()
生成一个(已打乱顺序的)文件名列表,每个文件名都引用原始输入数据。
- 自定义数据生成函数从每个原始输入数据块生成可变数量的训练样本/标签。
- 生成的训练样本/标签通过以下方式排队
tf.train.shuffle_batch()
在被输入网络之前。
由于[A3]、[A4]、[A5],验证输入管道以几乎相同的方式设置,除了最终输入队列是通过tf.train.batch()
,因为洗牌是不可取的。由于上述假设,基于 feed_dict 的方法也是不可行的,并且似乎与使用更高级别的函数(例如tf.train.batch
.
然而,使用两组不同的队列进行训练和验证的简单实现是行不通的。据我了解,我有两种选择:
-
[B1] 设置num_epochs
验证的参数tf.train.slice_input_producer
to None
.
在这种情况下,验证集会无限循环,但我需要提前知道验证集的大小,以明确限制每次运行验证集时要评估的批次数量。此外,如果验证集大小不能被批次大小整除,我总是会在最后一批中多拉一点。由于这会改变每次验证数据的评估顺序,因此这是不可接受的。
-
[B2] 设置num_epochs
验证的参数tf.train.slice_input_producer
to 1
,并另外设置allow_smaller_final_batch
的论点tf.train.batch
功能为True
.
在这种情况下,验证集仅循环一次,之后相应的队列将永远关闭。默认情况下,这将导致无法对验证集进行两次或多次评估。由于我不知道在 TensorFlow 中重新打开队列的好方法,因此我需要解决此限制。
由于选项[B1]有更大的限制,我选择解决选项[B2]的问题。概述我当前方法的(伪)代码如下:
训练循环应该相当规范。每 K 次迭代,都会调用一个评估验证集的函数。
请注意,我只启动名称以“train_”开头的队列;这是为收集生成的训练数据而设置的队列。为了做到这一点,我创建了两个辅助函数,get_queues_by_name
and start_queue_runners
.
def train_loop(train_ops, vali_ops, ...):
with tf.Session() as sess:
coord = tf.train.Coordinator()
sess.run([tf.initialize_all_variables(), tf.initialize_local_variables()])
load_latest_snapshot(sess, loader, snapshot_file)
# Launch the queue runners
queues = get_queues_by_name("train")
threads = start_queue_runners(sess, coord, queues)
try:
for step in range(start_iteration, num_train_iterations):
# Runs the session on validation set
if step % K == 0:
validation_results = run_validation(vali_ops, snapshot_file)
# TRAINING:
# ...
except Exception as e:
coord.request_stop(e)
finally:
coord.request_stop()
coord.join(threads)
辅助函数如下所示:
def get_queues_by_name(name):
"""Retrieves all queues that contain the string given by 'name'"""
all_queues = tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS)
return [q for q in all_queues if name in q.name]
def start_queue_runners(session, coordinator, queues):
"""Similar to tf.train.start_queue_runners but now accepts a list of queues instead of a graph collection"""
with session.graph.as_default():
threads = []
for queue in queues:
log("Queue", "Starting queue '%s'" % queue.name, level=2)
threads.extend(queue.create_threads(session, coordinator, daemon=True, start=True))
return threads
In the run_validation
函数,我选择的解决封闭队列问题的解决方法是创建一个新的tf.Session
。我也只启动与收集验证集数据的队列关联的线程。
def run_validation(ops, snapshot_file): # Called inside train_loop()
results = None
loader = tf.train.Saver()
with tf.Session() as sess:
coord = tf.train.Coordinator()
sess.run([tf.initialize_local_variables()])
load_latest_snapshot(sess, loader, snapshot_file)
# Launch the queue runners
queues = get_queues_by_name("eval")
threads = start_queue_runners(sess, coord, queues)
# Performs the inference in batches
try:
# Evaluate validation set:
results = eval_in_batches(ops, sess)
except Exception as e:
coord.request_stop(e)
finally:
coord.request_stop()
coord.join(threads)
return results
不知道是否要新建一个tf.Session
这是一个好主意,但这似乎是完成重新启动验证队列的唯一方法。理想情况下,我也不想重新加载模型快照,因为这在概念上似乎是不必要的。
这段代码的问题是,我在运行期间看到不稳定/未定义的行为,例如在验证集评估期间网络内部出现 NaN 或 Inf。这种情况似乎主要发生在验证集队列与训练集队列同时被填充的情况下(因为训练队列在验证集评估期间是打开的)。例如,如果我在迭代 0 处评估验证集(当两个队列仍需要填充时),这种情况经常发生。尽管训练/验证队列在不同的会话中运行,但它们似乎共享一些全局状态。
有人可以解释为什么会发生这种情况,以及如何在考虑到上述假设 [A1]-[A5] 的情况下更干净地解决这个问题?