TensorFlow:如何在训练期间多次评估验证数据队列?

2024-03-01

tl;dr

如何在每 K 次训练迭代之后评估验证集,使用单独的队列进行训练和验证数据,而不需要单独使用tf.Sessions在多个进程中?考虑到我的特定问题,似乎没有一种干净的方法来实现这一目标,而我当前的解决方法(我认为可行)给了我未定义的行为。帮助!

整个故事

我想每 K 次训练迭代评估一个验证集,但我不知道如何在 TensorFlow 中正确实现这一点。这应该是最常见的操作之一,但感觉 TensorFlow 的 API/架构在这里对我不利,或者至少让事情变得不必要的困难。

我的假设是:

  • [A1] 此处描述的用于训练/验证的多进程模型https://www.tensorflow.org/how_tos/reading_data/#multiple_input_pipelines https://www.tensorflow.org/how_tos/reading_data/#multiple_input_pipelines不适用于我的问题,因为我必须假设没有足够的 GPU 内存来加载变量两次。
  • [A2] 我想每 K 次训练迭代对验证集进行评估。
  • [A3] 训练数据和验证数据都不能简单地从磁盘读取,而是即时生成的。这使得不可能提前可靠地预先计算验证集的大小。
  • [A4] 验证集太大,无法预先计算并存储到磁盘上。
  • [A5] 有效验证集大小不一定是批量大小的倍数。

训练输入管道设置如下:

  • A tf.train.slice_input_producer()生成一个(已打乱顺序的)文件名列表,每个文件名都引用原始输入数据。
  • 自定义数据生成函数从每个原始输入数据块生成可变数量的训练样本/标签。
  • 生成的训练样本/标签通过以下方式排队tf.train.shuffle_batch()在被输入网络之前。

由于[A3]、[A4]、[A5],验证输入管道以几乎相同的方式设置,除了最终输入队列是通过tf.train.batch(),因为洗牌是不可取的。由于上述假设,基于 feed_dict 的方法也是不可行的,并且似乎与使用更高级别的函数(例如tf.train.batch.

然而,使用两组不同的队列进行训练和验证的简单实现是行不通的。据我了解,我有两种选择:

  • [B1] 设置num_epochs验证的参数tf.train.slice_input_producer to None.

    在这种情况下,验证集会无限循环,但我需要提前知道验证集的大小,以明确限制每次运行验证集时要评估的批次数量。此外,如果验证集大小不能被批次大小整除,我总是会在最后一批中多拉一点。由于这会改变每次验证数据的评估顺序,因此这是不可接受的。

  • [B2] 设置num_epochs验证的参数tf.train.slice_input_producer to 1,并另外设置allow_smaller_final_batch的论点tf.train.batch功能为True.

    在这种情况下,验证集仅循环一次,之后相应的队列将永远关闭。默认情况下,这将导致无法对验证集进行两次或多次评估。由于我不知道在 TensorFlow 中重新打开队列的好方法,因此我需要解决此限制。

由于选项[B1]有更大的限制,我选择解决选项[B2]的问题。概述我当前方法的(伪)代码如下:

训练循环应该相当规范。每 K 次迭代,都会调用一个评估验证集的函数。 请注意,我只启动名称以“train_”开头的队列;这是为收集生成的训练数据而设置的队列。为了做到这一点,我创建了两个辅助函数,get_queues_by_name and start_queue_runners.

def train_loop(train_ops, vali_ops, ...):
    with tf.Session() as sess:
        coord = tf.train.Coordinator()
        sess.run([tf.initialize_all_variables(), tf.initialize_local_variables()])
        load_latest_snapshot(sess, loader, snapshot_file)

        # Launch the queue runners
        queues = get_queues_by_name("train")
        threads = start_queue_runners(sess, coord, queues)

        try:
            for step in range(start_iteration, num_train_iterations):
                # Runs the session on validation set
                if step % K == 0:
                    validation_results = run_validation(vali_ops, snapshot_file)

                # TRAINING:
                # ...

        except Exception as e:
            coord.request_stop(e)
        finally:
            coord.request_stop()
            coord.join(threads)

辅助函数如下所示:

def get_queues_by_name(name):
    """Retrieves all queues that contain the string given by 'name'"""
    all_queues = tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS)
    return [q for q in all_queues if name in q.name]


def start_queue_runners(session, coordinator, queues):
    """Similar to tf.train.start_queue_runners but now accepts a list of queues instead of a graph collection"""
    with session.graph.as_default():
        threads = []
        for queue in queues:
            log("Queue", "Starting queue '%s'" % queue.name, level=2)
            threads.extend(queue.create_threads(session, coordinator, daemon=True, start=True))
    return threads

In the run_validation函数,我选择的解决封闭队列问题的解决方法是创建一个新的tf.Session。我也只启动与收集验证集数据的队列关联的线程。

def run_validation(ops, snapshot_file):  # Called inside train_loop()
    results = None
    loader = tf.train.Saver()

    with tf.Session() as sess:
        coord = tf.train.Coordinator()
        sess.run([tf.initialize_local_variables()])
        load_latest_snapshot(sess, loader, snapshot_file)

        # Launch the queue runners
        queues = get_queues_by_name("eval")
        threads = start_queue_runners(sess, coord, queues)

        # Performs the inference in batches
        try:
            # Evaluate validation set:
            results = eval_in_batches(ops, sess)
        except Exception as e:
            coord.request_stop(e)
        finally:
            coord.request_stop()
            coord.join(threads)

    return results

不知道是否要新建一个tf.Session这是一个好主意,但这似乎是完成重新启动验证队列的唯一方法。理想情况下,我也不想重新加载模型快照,因为这在概念上似乎是不必要的。

这段代码的问题是,我在运行期间看到不稳定/未定义的行为,例如在验证集评估期间网络内部出现 NaN 或 Inf。这种情况似乎主要发生在验证集队列与训练集队列同时被填充的情况下(因为训练队列在验证集评估期间是打开的)。例如,如果我在迭代 0 处评估验证集(当两个队列仍需要填充时),这种情况经常发生。尽管训练/验证队列在不同的会话中运行,但它们似乎共享一些全局状态。

有人可以解释为什么会发生这种情况,以及如何在考虑到上述假设 [A1]-[A5] 的情况下更干净地解决这个问题?


我目前面临类似的问题。到目前为止,我完全避免了任何队列,只是通过feed_dict但由于不使用队列和并行性,我显然失去了一些性能(尽管我仍然对当前的速度感到满意,因为我之前在 Theano 中做了同样的事情)。现在我想重新设计它并使用队列并偶然发现了这个问题。有this https://github.com/tensorflow/tensorflow/issues/2514, this https://github.com/tensorflow/tensorflow/issues/4535, this https://github.com/tensorflow/tensorflow/issues/7951相关问题。

我目前正在考虑这样做:

  • 在训练中,我想使用RandomShuffleQueue这使得事情变得更加复杂。我想我会忽略这个问题,一旦将张量排入队列的读取器线程完成,我会让训练停止,所以我失去了剩余的 up-tocapacity该时期的项目,并将其用于下一个时期。也许为了使其具有确定性,我检查了仍然从队列中读取的火车线程,直到只有min_after_dequeue留下的物品。

  • 在评估中,我想使用相同的图表和相同的会话。我可以用tf.cond从另一个单独的队列读取而不是RandomShuffleQueue。或者我可以使用feed_dict在评价中。如果我要使用单独的队列,我会使用FIFOQueue并仔细跟踪我是否执行了正确的步数。我还可以引入另一个虚拟张量,我将其排入队列,这给了我一个end_of_epoch标志左右,这样我就知道评估线程何时停止。


在 TensorFlow 1.2 中,将会有tf.contrib.data界面 (发表评论 https://github.com/tensorflow/tensorflow/issues/4535#issuecomment-302713431, 文档概述 https://github.com/tensorflow/tensorflow/blob/master/tensorflow/contrib/data/README.md#randomly-shuffling-input-data, API文档 https://www.tensorflow.org/versions/r1.2/api_docs/python/tf/contrib/data),它提供了tf.contrib.data.DatasetAPI也支持类似的洗牌tf.RandomShuffleQueue以及在多个纪元上进行批处理和循环。此外,您还可以通过在数据上创建迭代器来访问数据,并且可以重置迭代器。一些相关的 StackOverflow 问题是here https://stackoverflow.com/questions/44132307/tf-contrib-data-dataset-repeat-with-shuffle-notice-epoch-end-mixed-epochs and here https://stackoverflow.com/questions/44132579/feed-data-into-a-tf-contrib-data-dataset-like-a-queue.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

TensorFlow:如何在训练期间多次评估验证数据队列? 的相关文章

随机推荐