我试图优化我的数据输入管道.

以下是管道:

def build_dataset(file_pattern):
    return tf.data.Dataset.list_files(
        file_pattern
    ).interleave(
        tf.data.TFRecordDataset,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).shuffle(
        buffer_size=2048
    ).batch(
        batch_size=2048,
        drop_remainder=True,
    ).cache(
    ).repeat(
    ).map(
        map_func=_parse_example_batch,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).prefetch(
        buffer_size=1
    )

使用映射函数:

def _bit_to_float(string_batch: tf.Tensor):
    return tf.reshape(tf.math.floormod(tf.dtypes.cast(tf.bitwise.right_shift(
        tf.expand_dims(tf.io.decode_raw(string_batch, tf.uint8), 2),
        tf.reshape(tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8), (1, 1, 8))
    ), tf.float32), 2), (tf.shape(string_batch)[0], -1))


def _parse_example_batch(example_batch):
    preprocessed_sample_columns = {
        "features": tf.io.VarLenFeature(tf.float32),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_example(example_batch, preprocessed_sample_columns)
    dense_float = tf.sparse.to_dense(samples["features"])
    bits_to_float = _bit_to_float(samples["booleanFeatures"])
    return (
        tf.concat([dense_float, bits_to_float], 1),
        tf.reshape(samples["label"], (-1, 1))
    )

我试图遵循data pipeline tutorial的最佳实践,并将我的映射函数矢量化(正如mrry建议的那样).

在这种设置下,当数据以高速(带宽约为200MB/s)下载时,CPU使用不足(14%),训练非常缓慢(一个历元超过1小时).

我try 了一些参数配置,改变了interleave()个参数,比如num_parallel_callscycle_length,或者TFRecordDataset个参数,比如num_parallel_calls.

最快的配置使用这组参数:

  • interleave.num_parallel_calls: 1
  • interleave.cycle_length: 8
  • TFRecordDataset.num_parallel_calls: 8

有了这个,一个历元只需要20分钟就可以运行.However, CPU usage is only at 50% while bandwidth consumption is around 55MB/s

问题:

  1. 如何优化流水线以达到100%的CPU使用率(以及大约100MB/s的带宽消耗)?
  2. 为什么tf.data.experimental.AUTOTUNE没有找到加速训练的最佳价值?

友善的


编辑

经过更多的实验,我得出了以下解决方案.

  1. 如果num_parallel_calls大于0,则删除TFRecordDataset已经处理的interleave步.
  2. 将映射函数更新为仅执行parse_exampledecode_raw,返回元组"(,),())
  3. mapcache
  4. _bit_to_float功能作为模型的一个组件移动

最后,这里是数据管道代码:

def build_dataset(file_pattern):
    return tf.data.TFRecordDataset(
        tf.data.Dataset.list_files(file_pattern),
        num_parallel_reads=multiprocessing.cpu_count(),
        buffer_size=70*1000*1000
    ).shuffle(
        buffer_size=2048
    ).map(
        map_func=split,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).batch(
        batch_size=2048,
        drop_remainder=True,
    ).cache(
    ).repeat(
    ).prefetch(
        buffer_size=32
    )


def split(example):
    preprocessed_sample_columns = {
        "features": tf.io.VarLenFeature(tf.float32),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_single_example(example, preprocessed_sample_columns)
    dense_float = tf.sparse.to_dense(samples["features"])
    bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
    return (
        (dense_float, bits_to_float),
        tf.reshape(samples["label"], (1,))
    )


def build_model(input_shape):
    feature = keras.Input(shape=(N,))
    bool_feature = keras.Input(shape=(M,), dtype="uint8")
    one_hot = dataset._bit_to_float(bool_feature)
    dense_input = tf.reshape(
        keras.backend.concatenate([feature, one_hot], 1),
        input_shape)
    output = actual_model(dense_input)

    model = keras.Model([feature, bool_feature], output)
    return model

def _bit_to_float(string_batch: tf.Tensor):
    return tf.dtypes.cast(tf.reshape(
        tf.bitwise.bitwise_and(
            tf.bitwise.right_shift(
                tf.expand_dims(string_batch, 2),
                tf.reshape(
                    tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8),
                    (1, 1, 8)
                ),
            ),
            tf.constant(0x01, dtype=tf.uint8)
        ),
        (tf.shape(string_batch)[0], -1)
    ), tf.float32)

感谢所有这些优化:

  • 带宽消耗约为90MB/s
  • CPU使用率约为20%
  • 第一纪元花了20分钟
  • Successives历险记每次花费5分钟

所以这似乎是一个很好的第一次设置.但是CPU和BW仍然没有被过度使用,所以任何建议都是受欢迎的!


编辑 Bis

所以,在进行了一些基准测试之后,我了解了我认为最好的输入管道:

def build_dataset(file_pattern):
    tf.data.Dataset.list_files(
        file_pattern
    ).interleave(
        TFRecordDataset,
        cycle_length=tf.data.experimental.AUTOTUNE,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).shuffle(
        2048
    ).batch(
        batch_size=64,
        drop_remainder=True,
    ).map(
        map_func=parse_examples_batch,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).cache(
    ).prefetch(
        tf.data.experimental.AUTOTUNE
    )

def parse_examples_batch(examples):
    preprocessed_sample_columns = {
        "features": tf.io.FixedLenSequenceFeature((), tf.float32, allow_missing=True),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_example(examples, preprocessed_sample_columns)
    bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
    return (
        (samples['features'], bits_to_float),
        tf.expand_dims(samples["label"], 1)
    )

那么,最新消息是:

  • 根据这个GitHub issueTFRecordDataset交织是传统的,所以interleave功能更好.
  • map之前的batch是一个好习惯(vectorizing your function),并减少调用映射函数的次数.
  • 不再需要repeat了.自TF2以来.0时,Keras模型API支持dataset API并可以使用缓存(请参阅SO post)
  • VarLenFeature切换到FixedLenSequenceFeature,取消对tf.sparse.to_dense的无用呼叫.

希望这能有所帮助.建议仍受欢迎.

推荐答案

为了社区的利益,在回答部分提到@AlexisBRENON的解决方案和重要观察结果.

以下是重要的观察结果:

  1. 根据这GitHub issueTFRecordDataset interleaving是一个传统的,所以interleave的功能更好.
  2. map之前的batch是一个好习惯(vectorizing your function),并减少调用映射函数的次数.
  3. 不再需要repeat了.自TF2以来.0时,Keras模型API支持dataset API并可以使用缓存(请参阅SO post)
  4. VarLenFeature切换到FixedLenSequenceFeature,取消对tf.sparse.to_dense的无用呼叫.

根据上述观察结果,性能得到改善的管道代码如下所述:

def build_dataset(file_pattern):
    tf.data.Dataset.list_files(
        file_pattern
    ).interleave(
        TFRecordDataset,
        cycle_length=tf.data.experimental.AUTOTUNE,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).shuffle(
        2048
    ).batch(
        batch_size=64,
        drop_remainder=True,
    ).map(
        map_func=parse_examples_batch,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).cache(
    ).prefetch(
        tf.data.experimental.AUTOTUNE
    )

def parse_examples_batch(examples):
    preprocessed_sample_columns = {
        "features": tf.io.FixedLenSequenceFeature((), tf.float32, allow_missing=True),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_example(examples, preprocessed_sample_columns)
    bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
    return (
        (samples['features'], bits_to_float),
        tf.expand_dims(samples["label"], 1)
    )

Python-3.x相关问答推荐

如何有效地计算Kernel/Matrix

数据类对象列表的字典获取方法-在数据类列表中查找具有特定变量值的数据类

使用数据库将文件从Sharepoint下载到文件系统

如何使用python将pdf文件的页面合并为单个垂直组合页面

如何使用 Selenium Python 连续单击一个按钮直到另一个元素出现?

Pandas 转换为日期时间

python 分代垃圾收集:get_count 没有报告正确的对象创建数?

如何转置和 Pandas DataFrame 并命名新列?

如何在 Python 中 cv2 的窗口标题上动态更新 FPS

Pandas 按值和索引对 DF 进行排序

使用条件参数为 super() 调用 __init__

如何在python 3.10中将列表项(字符串类型)转换为模块函数

删除重复项,但将值相加为一

在 Python 3.5 中使用 aiohttp 获取多个 url

try 注释散列变量时,ABCMeta对象不可下标

PIL 在图像上绘制半透明方形覆盖

是否可以在每个路由的基础上限制 Flask POST 数据大小?

Windows 下 Python 3.x 的 OpenCV

如何将文档字符串放在 Enums 上?

三个参数的reduce函数