I didn't know where to start with your multiple questions, so I decided to do so with a single statement:
Your code definitely should not look like that and is nowhere near current Tensorflow best practices
Sorry, debugging it step by step would be a waste of everyone's time and would not benefit either of us.
Now, moving to your third point:
- Is there anything else in my code below that I can optimize further?
Yes, you could use tensorflow2.0 functionality, and it seems like you are running away from it (the tf.function decorator is actually of no use here, leave it be for the time being).
Following the new guidelines would also alleviate the problem from your fifth point, namely:
- I also need help in writing this code in a more generalized way
as the structure below is designed specifically for that. After a short introduction, I will try to walk you through those concepts in a few steps:
1. Divide your program into logical parts
Tensorflow did a lot of harm when it comes to code readability; everything in tf1.x was usually crunched in one place: globals, followed by function definitions, followed by more globals, or maybe data loading; all in all, a mess. It's not really the developers' fault, as the system's design encouraged those practices.
Now, in tf2.0, programmers are encouraged to divide their work similarly to the structure one can see in pytorch, chainer and other more user-friendly frameworks.
1.1 Data loading
You were on a good path with Tensorflow Datasets, yet you turned away from it for no apparent reason.
Here is your code, with commentary:
# You already have tf.data.Dataset objects after load
(x_train, y_train), (x_test, y_test) = tfds.load(
    'mnist', split=['train', 'test'], batch_size=-1, as_supervised=True
)
# But you are reshaping them in a strange manner...
x_train = tf.reshape(x_train, shape=(x_train.shape[0], 784))
x_test = tf.reshape(x_test, shape=(x_test.shape[0], 784))
# And building from slices...
ds_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
# Unreadable rescaling (there are built-ins for that)
You could easily generalize this idea for any dataset; place it in a separate module, say datasets.py:
import tensorflow as tf
import tensorflow_datasets as tfds


class ImageDatasetCreator:
    @classmethod
    # More portable and readable than dividing by 255
    def _convert_image_dtype(cls, dataset):
        return dataset.map(
            lambda image, label: (
                tf.image.convert_image_dtype(image, tf.float32),
                label,
            )
        )

    def __init__(self, name: str, batch: int, cache: bool = True, split=None):
        # Load dataset; every dataset has a default train/test split
        dataset = tfds.load(name, as_supervised=True, split=split)
        # Convert to float range
        try:
            self.train = ImageDatasetCreator._convert_image_dtype(dataset["train"])
            self.test = ImageDatasetCreator._convert_image_dtype(dataset["test"])
        except KeyError as exception:
            raise ValueError(
                f"Dataset {name} does not have train and test, write your own custom dataset handler."
            ) from exception

        if cache:
            self.train = self.train.cache()  # speed things up considerably
            self.test = self.test.cache()

        self.batch: int = batch

    def get_train(self):
        # shuffle requires an explicit buffer size
        return self.train.shuffle(buffer_size=10_000).batch(self.batch).repeat()

    def get_test(self):
        return self.test.batch(self.batch).repeat()
Now you can load mnist using a simple command:
from datasets import ImageDatasetCreator

if __name__ == "__main__":
    dataloader = ImageDatasetCreator("mnist", batch=64, cache=True)
    train, test = dataloader.get_train(), dataloader.get_test()
From now on, you can load any dataset using a name other than mnist.
Please, stop making everything deep-learning related into one-off scripts; you are a programmer as well.
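If the rescaling done by tf.image.convert_image_dtype is unclear, the idea can be sketched without TensorFlow at all. A minimal NumPy illustration of the same behavior (my own sketch, not TensorFlow's implementation): integer images are divided by the maximum value representable in their dtype, landing in [0, 1].

```python
import numpy as np

def convert_image_dtype(image: np.ndarray) -> np.ndarray:
    """Mimic tf.image.convert_image_dtype for uint8 -> float32:
    scale by the maximum representable value of the input dtype."""
    max_value = np.iinfo(image.dtype).max  # 255 for uint8
    return image.astype(np.float32) / max_value

image = np.array([[0, 128, 255]], dtype=np.uint8)
print(convert_image_dtype(image))  # values now lie in [0.0, 1.0]
```

This is why it reads better than a bare `/ 255` sprinkled through the pipeline: the scale factor follows the dtype instead of being a magic number.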
1.2 Model creation
Depending on the model's complexity, there are two advised approaches:
- Use tensorflow.keras.models.Sequential. This way was shown by @Stewart_R, no need to reiterate his points; it is used for the simplest models (you should use this one with your feedforward network).
- Inherit tensorflow.keras.Model and write a custom model. This one should be used when you have some kind of logic inside your module or it's more complicated (things like ResNets, multipath networks, etc.). Altogether it's more readable and customizable.
Your Model class tried to resemble something like that, but it went south again; backprop definitely is not part of the model itself, and neither are loss or accuracy. Separate them into another module or function, definitely not a member!
That said, let's code the network using the second approach (you should place this code in model.py for brevity). Before that, I will code the YourDense feedforward layer from scratch by inheriting from tf.keras.layers.Layer (this one might go into the layers.py module):
import tensorflow as tf


class YourDense(tf.keras.layers.Layer):
    def __init__(self, units):
        # It's Python 3, you don't have to specify super parents explicitly
        super().__init__()
        self.units = units

    # Use build to create variables, as shape can be inferred from previous layers
    # If you were to create layers in __init__, one would have to provide input_shape
    # (same as it occurs in PyTorch for example)
    def build(self, input_shape):
        # You could use different initializers here as well
        self.kernel = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal",
            trainable=True,
        )
        # You could define bias in __init__ as well as it's not input dependent
        self.bias = self.add_weight(shape=(self.units,), initializer="random_normal")
        # Oh, trainable=True is default

    def call(self, inputs):
        # Use overloaded operators instead of tf.add, better readability
        return tf.matmul(inputs, self.kernel) + self.bias
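What YourDense.call computes is just an affine map. A dependency-free NumPy sketch of the same arithmetic (the shapes — batch of 2, 3 input features, 4 units — are arbitrary, chosen only for illustration):

```python
import numpy as np

# Equivalent of YourDense.call: inputs @ kernel + bias.
rng = np.random.default_rng(0)
inputs = rng.normal(size=(2, 3)).astype(np.float32)   # (batch, features)
kernel = rng.normal(size=(3, 4)).astype(np.float32)   # (input_shape[-1], units)
bias = np.zeros(4, dtype=np.float32)                  # (units,)

outputs = inputs @ kernel + bias
print(outputs.shape)  # (2, 4)
```

This also shows why build can defer the kernel's creation: its first dimension is simply the last dimension of whatever input arrives.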
Regarding your question:
- How to add a Dropout and Batch Normalization layer in this custom implementation?
I suppose you would create a custom implementation of those layers.
class CustomDropout(tf.keras.layers.Layer):
    def __init__(self, rate, **kwargs):
        super().__init__(**kwargs)
        self.rate = rate

    def call(self, inputs, training=None):
        if training:
            # You could simply create binary mask and multiply here
            return tf.nn.dropout(inputs, rate=self.rate)
        # You would need to multiply by dropout rate if you were to do that
        return inputs
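The binary-mask idea mentioned in the comment can be written out in a few lines. Below is my own NumPy sketch of inverted dropout (the scheme tf.nn.dropout uses): zero a fraction `rate` of elements during training and scale the survivors by 1 / (1 - rate), so inference is a plain identity.

```python
import numpy as np

def dropout(inputs: np.ndarray, rate: float, training: bool, seed: int = 0) -> np.ndarray:
    """Inverted dropout: mask out ~rate of the elements and rescale the rest
    during training; pass inputs through unchanged at inference time."""
    if not training:
        return inputs  # identity, just like CustomDropout above
    rng = np.random.default_rng(seed)
    mask = rng.random(inputs.shape) >= rate  # binary keep-mask
    return inputs * mask / (1.0 - rate)      # survivors scaled up

x = np.ones((2, 4))
print(dropout(x, rate=0.5, training=False))  # unchanged
```

The upscaling during training keeps the expected activation magnitude constant, which is why nothing needs to be rescaled at inference.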
This layer was taken from here and modified to better fit showcasing purposes.
Now you can finally create your model (simple double feedforward):
import tensorflow as tf

from layers import YourDense


class Model(tf.keras.Model):
    def __init__(self):
        super().__init__()
        # Use Sequential here for readability
        self.network = tf.keras.Sequential(
            [YourDense(100), tf.keras.layers.ReLU(), YourDense(10)]
        )

    def call(self, inputs):
        # You can use non-parametric layers inside call as well
        flattened = tf.keras.layers.Flatten()(inputs)
        return self.network(flattened)
Ofc, you should use built-ins as much as possible in general implementations.
This structure is pretty extensible, so generalization to convolutional nets, resnets, senets, whatever should be done via this module. You can read more about it here.
I think it fulfills your fifth point:
- I also need help in writing this code in a more generalized way
One last thing: you may have to use model.build(shape) in order to build your model's graph.
model.build((None, 28, 28, 1))
This would be for MNIST's 28x28x1 input shape, where None stands for the batch dimension.
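To see why that shape is enough, note what Flatten does to it; a small NumPy check (the batch size 32 below is arbitrary, chosen only for illustration):

```python
import numpy as np

# model.build((None, 28, 28, 1)) suffices because Flatten collapses everything
# but the batch axis: the first YourDense then sees 28 * 28 * 1 = 784 features
# and builds a (784, 100) kernel in its build() method.
batch = np.zeros((32, 28, 28, 1), dtype=np.float32)
flattened = batch.reshape(batch.shape[0], -1)
print(flattened.shape)  # (32, 784)
```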
1.3 Training
Once again, training could be done in two separate ways:
- standard Keras model.fit, useful for simple tasks like classification
- a custom tf.GradientTape loop, for more complicated training schemes (described in section 3.2 below)
As pointed out by @Leevo once again, if you were to use the second way, you won't be able to simply use the callbacks provided by Keras, hence I'd advise sticking with the first option whenever possible.
In theory, you could call the callbacks' functions manually, like on_batch_begin() and the others where needed, but it would be cumbersome and I'm not sure how that would work.
When it comes to the first option, you can use tf.data.Dataset objects directly with fit. Here it is, presented inside another module (preferably train.py):
import datetime
import pathlib

import tensorflow as tf


def train(
    model: tf.keras.Model,
    path: str,
    train: tf.data.Dataset,
    epochs: int,
    steps_per_epoch: int,
    validation: tf.data.Dataset,
    steps_per_validation: int,
    stopping_epochs: int,
    optimizer=tf.optimizers.Adam(),
):
    model.compile(
        optimizer=optimizer,
        # I used logits as output from the last layer, hence this
        loss=tf.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[tf.metrics.SparseCategoricalAccuracy()],
    )

    model.fit(
        train,
        epochs=epochs,
        steps_per_epoch=steps_per_epoch,
        validation_data=validation,
        validation_steps=steps_per_validation,
        callbacks=[
            # Tensorboard logging
            tf.keras.callbacks.TensorBoard(
                pathlib.Path("logs")
                / pathlib.Path(datetime.datetime.now().strftime("%Y%m%d-%H%M%S")),
                histogram_freq=1,
            ),
            # Early stopping with best weights preserving
            tf.keras.callbacks.EarlyStopping(
                monitor="val_sparse_categorical_accuracy",
                patience=stopping_epochs,
                restore_best_weights=True,
            ),
        ],
    )
    model.save(path)
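One practical detail: because the datasets returned by get_train()/get_test() call repeat(), they are infinite, so fit needs the explicit step counts above. A common choice (my assumption here, not the only one) is one full pass over the data per epoch, which for MNIST-sized splits and a hypothetical batch of 64 works out as:

```python
import math

# One pass over the data per epoch on a repeating dataset:
# steps = ceil(number_of_examples / batch_size)
n_train, n_test, batch = 60_000, 10_000, 64  # MNIST split sizes, example batch
steps_per_epoch = math.ceil(n_train / batch)
steps_per_validation = math.ceil(n_test / batch)
print(steps_per_epoch, steps_per_validation)  # 938 157
```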
The more complicated approach is very similar (almost copy and paste) to PyTorch training loops, so if you are familiar with those, they should not pose much of a problem.
You can find examples in the tf2.0 docs, e.g. here or here.
2. Other things
2.1 Unanswered questions
- Is there anything else in the code I can optimize further?
The code above already transforms the Model into a graph, so I don't think you would benefit from calling tf.function explicitly in this case. Premature optimization is the root of all evil; remember to measure your code before doing it.
You would gain much more with proper data caching (as described at the beginning of #1.1) and a good pipeline than from those things.
- I also need a way to extract the final weights of all layers
As pointed out by @Leevo above,
weights = model.get_weights()
will get you the weights. You may transform them into np.array, plot them using seaborn or matplotlib, analyze them, inspect them, or whatever else you want.
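As a sketch of what comes back from get_weights() and how to prepare it for plotting (the arrays below are fake stand-ins matching the double-feedforward shapes above; with a real model you would pass `flat` to e.g. matplotlib's `plt.hist(flat, bins=50)`):

```python
import numpy as np

# model.get_weights() returns a list of np.ndarray: kernel, bias, kernel, ...
# Faked here with random arrays shaped like the YourDense(100) / YourDense(10) model.
weights = [np.random.randn(784, 100), np.random.randn(100), np.random.randn(100, 10)]

# Flatten everything into one long vector, ready for a histogram.
flat = np.concatenate([w.ravel() for w in weights])
print(flat.shape)  # (79500,)
```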
2.2 Taking it altogether
All in all, your main.py (or entrypoint or something similar) would consist of this (more or less):
from datasets import ImageDatasetCreator
from model import Model
from train import train

# You could use argparse for things like batch, epochs etc.
if __name__ == "__main__":
    dataloader = ImageDatasetCreator("mnist", batch=64, cache=True)
    # renamed so they don't shadow the imported train function
    train_data, test_data = dataloader.get_train(), dataloader.get_test()

    model = Model()
    model.build((None, 28, 28, 1))
    train(
        model, path, train_data, epochs, test_data, ...
    )  # provide necessary arguments appropriately
    # Do whatever you want with those
    weights = model.get_weights()
Oh, and remember: the functions above are not meant for copy-pasting and should be treated more like a guideline. Hit me up if you have any questions.
3. Questions from comments
3.1 How to initialize custom and built-in layers
3.1.1 TLDR of what you are about to read
- a custom Poisson initialization function is defined, but it takes three arguments
- the tf.keras.initializers API needs two arguments (see the last point in their docs), hence one argument is fixed via Python's lambda inside the custom layer written before
- an optional bias for the layer is added, which can be turned off with a boolean
Why is it so complex? To show that in tf2.0 you can finally use Python's functionality: no more graph hassle, if instead of tf.cond, and so on.
3.1.2 From TLDR to implementation
Keras initializers can be found here and Tensorflow's flavor here.
Please note the API inconsistencies (capital letters class-like, lowercase with underscores function-like), especially in tf2.0, but that's beside the point.
You can use them either by passing a string (as done in YourDense above) or during object creation.
To allow for custom initialization in your custom layers, simply add an additional argument to the constructor (the tf.keras.Model class is still a Python class, and its __init__ should be used the same way as any Python class's).
Before that, I will show you how to create a custom initialization:
# Poisson custom initialization because why not.
def my_dumb_init(shape, lam, dtype=None):
    return tf.squeeze(tf.random.poisson(shape, lam, dtype=dtype))
Notice that its signature takes three arguments, while it should take (shape, dtype) only. Still, one can "fix" this easily while creating their own layer, like the one below (an extended YourDense):
import typing

import tensorflow as tf


class YourDense(tf.keras.layers.Layer):
    # It's still Python, use it as Python, that's the point of tf.2.0
    @classmethod
    def register_initialization(cls, initializer):
        # Set defaults if init not provided by user
        if initializer is None:
            # let's make the signature proper for init in tf.keras
            return lambda shape, dtype: my_dumb_init(shape, 1, dtype)
        return initializer

    def __init__(
        self,
        units: int,
        bias: bool = True,
        # can be string or callable, some typing info added as well...
        kernel_initializer: typing.Union[str, typing.Callable] = None,
        bias_initializer: typing.Union[str, typing.Callable] = None,
    ):
        super().__init__()
        self.units: int = units
        self.kernel_initializer = YourDense.register_initialization(kernel_initializer)
        if bias:
            self.bias_initializer = YourDense.register_initialization(bias_initializer)
        else:
            self.bias_initializer = None

    def build(self, input_shape):
        # Simply pass your init here
        self.kernel = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer=self.kernel_initializer,
            trainable=True,
        )
        if self.bias_initializer is not None:
            self.bias = self.add_weight(
                shape=(self.units,), initializer=self.bias_initializer
            )
        else:
            self.bias = None

    def call(self, inputs):
        weights = tf.matmul(inputs, self.kernel)
        if self.bias is not None:
            return weights + self.bias
        return weights
I have added my_dumb_init as the default (used if the user does not provide an initializer) and made the bias optional with the bias argument. Note that you can use if freely as long as it is not data-dependent. If it is (or depends on tf.Tensor somehow), one has to use the @tf.function decorator, which changes Python's flow to its tensorflow counterpart (e.g. if to tf.cond).
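The arity trick inside register_initialization can be shown standalone. Below is my own NumPy stand-in for my_dumb_init (so the sketch runs without TensorFlow): an initializer taking three arguments is wrapped in a lambda that fixes `lam`, producing a proper (shape, dtype) callable of the kind tf.keras expects.

```python
import numpy as np

# Three-argument initializer, analogous to my_dumb_init above (NumPy stand-in).
def my_dumb_init(shape, lam, dtype=None):
    rng = np.random.default_rng(0)
    return rng.poisson(lam, size=shape).astype(dtype or np.float32)

# Fix lam=1 via a lambda; the result matches the (shape, dtype) signature.
initializer = lambda shape, dtype: my_dumb_init(shape, 1, dtype)

kernel = initializer((3, 4), np.float32)
print(kernel.shape)  # (3, 4)
```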
See here for more on autograph; it's very easy to follow.
If you want to incorporate the above initializer changes into your model, you have to create the appropriate objects, and that's it.
... # Previous part of Model code here
self.network = tf.keras.Sequential(
    [
        YourDense(100, bias=False, kernel_initializer="lecun_uniform"),
        tf.keras.layers.ReLU(),
        YourDense(10, bias_initializer=tf.initializers.Ones()),
    ]
)
... # and the same afterwards
You can do the same with the built-in tf.keras.layers.Dense layer (the argument names differ, but the idea holds).
3.2 Automatic Differentiation using tf.GradientTape
3.2.1 Introduction
The point of tf.GradientTape is to allow users normal Python control flow and gradient calculation of variables with respect to other variables.
The example below is taken from here but broken into separate parts:
def f(x, y):
    output = 1.0
    for i in range(y):
        if i > 1 and i < 5:
            output = tf.multiply(output, x)
    return output
A regular Python function with for and if flow control statements.
def grad(x, y):
    with tf.GradientTape() as t:
        t.watch(x)
        out = f(x, y)
    return t.gradient(out, x)
Using a gradient tape you can record all operations on Tensors (including their intermediate states) and "play" them backwards (perform automatic backward differentiation using the chain rule).
Every Tensor within the tf.GradientTape() context manager is recorded automatically. If some Tensor is out of scope, use the watch() method as seen above.
Finally, the gradient of output with respect to x (the input) is returned.
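What grad(x, y) computes for the f above can be verified by hand, without TensorFlow: with y >= 5 the loop multiplies output by x exactly three times (i = 2, 3, 4), so f(x, y) = x**3 and the tape's gradient is 3 * x**2. A pure-Python check:

```python
# Pure-Python version of f from the tape example.
def f(x, y):
    output = 1.0
    for i in range(y):
        if i > 1 and i < 5:
            output = output * x
    return output

x = 2.0
analytic_grad = 3 * x ** 2  # derivative of x**3
# Central finite difference as a sanity check of the analytic gradient.
numeric_grad = (f(x + 1e-6, 6) - f(x - 1e-6, 6)) / 2e-6
print(round(analytic_grad, 3), round(numeric_grad, 3))  # 12.0 12.0
```

This is exactly the number t.gradient(out, x) would return for x = 2.0, y = 6.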
3.2.2 Connection with deep learning
What was described above is the backpropagation algorithm. Gradients w.r.t. (with respect to) outputs are calculated for each node in the network (or rather for every layer). Those gradients are then used by various optimizers to make corrections, and so it repeats.
Let's continue and assume you have your tf.keras.Model, an optimizer instance, a tf.data.Dataset and a loss function already set up.
One can define a Trainer class which will perform training for us. Please read the comments in the code if in doubt:
class Trainer:
    def __init__(self, model, optimizer, loss_function):
        self.model = model
        self.loss_function = loss_function
        self.optimizer = optimizer
        # You could pass custom metrics in constructor
        # and adjust train_step and test_step accordingly
        self.train_loss = tf.keras.metrics.Mean(name="train_loss")
        self.test_loss = tf.keras.metrics.Mean(name="test_loss")

    def train_step(self, x, y):
        # Setup tape
        with tf.GradientTape() as tape:
            # Get current predictions of network
            y_pred = self.model(x)
            # Calculate loss generated by predictions
            loss = self.loss_function(y, y_pred)
        # Get gradients of loss w.r.t. EVERY trainable variable (iterable returned)
        gradients = tape.gradient(loss, self.model.trainable_variables)
        # Change trainable variable values according to gradient by applying optimizer policy
        self.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))
        # Record loss of current step
        self.train_loss(loss)

    def train(self, dataset):
        # Iterate over dataset and perform a train step for each batch
        for x, y in dataset:
            self.train_step(x, y)

    def test_step(self, x, y):
        # Record test loss separately
        self.test_loss(self.loss_function(y, self.model(x)))

    def test(self, dataset):
        # Iterate over whole dataset
        for x, y in dataset:
            self.test_step(x, y)

    def __str__(self):
        # You need Python 3.7 with f-string support
        # Just return metrics
        return f"Loss: {self.train_loss.result()}, Test Loss: {self.test_loss.result()}"
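The essence of train_step can also be shown without TensorFlow. Below is my own dependency-free sketch (not the Trainer code itself): for a linear model with MSE loss, the gradient is computed by hand and applied with a plain SGD "optimizer policy", which is exactly the role tape.gradient plus optimizer.apply_gradients play above.

```python
import numpy as np

def train_step(w, x, y, lr=0.1):
    """One hand-rolled step: forward pass, gradient, SGD update."""
    y_pred = x @ w                          # model forward pass
    grad = 2 * x.T @ (y_pred - y) / len(x)  # gradient of MSE loss w.r.t. w
    return w - lr * grad                    # optimizer.apply_gradients analogue

x = np.array([[1.0], [2.0]])
y = np.array([2.0, 4.0])  # generated by the true weight 2
w = np.zeros(1)
for _ in range(100):
    w = train_step(w, x, y)
print(w.round(3))  # converges toward [2.]
```

Each Trainer.train_step does the same three things, only with autodiff supplying the gradient instead of the hand-derived formula.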
Now you can use this class in your code really simply, like this:
EPOCHS = 5

# model, optimizer, loss defined beforehand
trainer = Trainer(model, optimizer, loss)
for epoch in range(EPOCHS):
    trainer.train(train_dataset)  # Same for training and test datasets
    trainer.test(test_dataset)
    print(f"Epoch {epoch}: {trainer}")
The print would tell you the training and test loss for each epoch. You can mix training and testing any way you want (e.g. 5 epochs of training and 1 of testing), and you can add different metrics etc.
See here if you want a non-OOP-oriented approach (IMO less readable, but to each their own).