您只需要编写一个自定义训练循环,但其中的每个操作都要重复 3 次(如果还有一个连续变量,则再加 1 次)。下面是一个使用四输出架构的示例:
import tensorflow as tf
import numpy as np
# Load MNIST: image arrays plus integer digit labels for both splits.
(xtrain, train_target), (xtest, test_target) = \
    tf.keras.datasets.mnist.load_data()


def _make_targets(digits):
    """Derive the four per-sample targets from the integer digit labels.

    Returns, in order:
      * one-hot digit class (10 categories, one per digit);
      * one-hot parity class (2 categories; index 1 means the digit is even);
      * one-hot interval class (4 categories; buckets given by
        ``np.digitize`` with edges [3, 6, 8]);
      * regression target: the square of the digit as float32.
    """
    one_hot = tf.keras.utils.to_categorical
    digit_class = one_hot(digits, num_classes=10)
    is_even = (digits % 2 == 0).astype(int)
    parity_class = one_hot(is_even, num_classes=2)
    bucket = np.digitize(digits, [3, 6, 8])
    interval_class = one_hot(bucket, num_classes=4)
    squared = tf.square(tf.cast(digits, tf.float32))
    return digit_class, parity_class, interval_class, squared


def _scale_images(images, *targets):
    """Append a channel axis and divide by 255; keep the targets grouped.

    The targets are returned together as ONE tuple, so every dataset element
    has the structure ``(images, (y1, y2, y3, y4))``.
    """
    return tf.divide(images[..., None], 255), targets


def _make_dataset(images, targets):
    """Slice, shuffle (buffer 32), batch (32), rescale and prefetch."""
    dataset = tf.data.Dataset.from_tensor_slices((images, *targets))
    dataset = dataset.shuffle(32)
    dataset = dataset.batch(32)
    dataset = dataset.map(_scale_images)
    return dataset.prefetch(tf.data.experimental.AUTOTUNE)


# Targets for the four heads (digit, parity, interval, squared digit).
ytrain1, ytrain2, ytrain3, ytrain4 = _make_targets(train_target)
ytest1, ytest2, ytest3, ytest4 = _make_targets(test_target)

# train dataset
train_ds = _make_dataset(xtrain, (ytrain1, ytrain2, ytrain3, ytrain4))
# test dataset
test_ds = _make_dataset(xtest, (ytest1, ytest2, ytest3, ytest4))
# architecture
class Net(tf.keras.Model):
    """Convolutional network with a shared trunk and four output heads.

    Trunk: three (Conv2D 3x3, ReLU) + (MaxPool 2x2) stages with 16, 32 and 64
    filters, followed by Flatten and a 64-unit ReLU dense layer.

    Heads, all fed by the trunk's dense features:
      * ``output1``: 10-way softmax
      * ``output2``: 2-way softmax
      * ``output3``: 4-way softmax
      * ``output4``: single linear unit (regression)
    """

    def __init__(self):
        super().__init__()
        layers = tf.keras.layers

        def conv(filters, **extra):
            # All convolutions share kernel size, stride and activation.
            return layers.Conv2D(filters=filters, kernel_size=(3, 3),
                                 strides=(1, 1), activation='relu', **extra)

        def pool():
            return layers.MaxPool2D(pool_size=(2, 2))

        # Shared trunk (layers are created in the order they are applied).
        self.conv1 = conv(16, input_shape=(28, 28, 1))
        self.maxp1 = pool()
        self.conv2 = conv(32)
        self.maxp2 = pool()
        self.conv3 = conv(64)
        self.maxp3 = pool()
        # Despite the attribute name, this is a Flatten layer.
        self.gap = layers.Flatten()
        self.dense = layers.Dense(64, activation='relu')

        # One head per task.
        self.output1 = layers.Dense(10, activation='softmax')
        self.output2 = layers.Dense(2, activation='softmax')
        self.output3 = layers.Dense(4, activation='softmax')
        self.output4 = layers.Dense(1, activation='linear')

    def call(self, inputs, training=False, **kwargs):
        """Return the four head outputs ``(out1, out2, out3, out4)``.

        ``training`` and ``**kwargs`` are accepted but not used by any layer
        call in this method.
        """
        trunk = (self.conv1, self.maxp1,
                 self.conv2, self.maxp2,
                 self.conv3, self.maxp3,
                 self.gap, self.dense)
        features = inputs
        for layer in trunk:
            features = layer(features)
        return (self.output1(features),
                self.output2(features),
                self.output3(features),
                self.output4(features))
model = Net()
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

# The four losses, one per output head: categorical cross-entropy for the
# three classification heads, mean absolute error for the regression head.
loss_1, loss_2, loss_3 = (
    tf.losses.CategoricalCrossentropy() for _ in range(3))
loss_4 = tf.losses.MeanAbsoluteError()

# Running means of the per-batch train losses (named tr_loss_1..tr_loss_4).
loss_1_train, loss_2_train, loss_3_train, loss_4_train = (
    tf.metrics.Mean(name=f'tr_loss_{head}') for head in range(1, 5))

# Running means of the per-batch test losses (named ts_loss_1..ts_loss_4).
loss_1_test, loss_2_test, loss_3_test, loss_4_test = (
    tf.metrics.Mean(name=f'ts_loss_{head}') for head in range(1, 5))

# Train accuracies for the three classification heads (for the printout).
acc_1_train, acc_2_train, acc_3_train = (
    tf.metrics.CategoricalAccuracy(name=f'tr_acc_{head}')
    for head in range(1, 4))

# Test accuracies for the three classification heads (for the printout).
acc_1_test, acc_2_test, acc_3_test = (
    tf.metrics.CategoricalAccuracy(name=f'ts_acc_{head}')
    for head in range(1, 4))
# custom training loop
@tf.function
def train_step(x, y1, y2, y3, y4):
    """Run one optimisation step on a batch and update the train metrics.

    Args:
        x: batch of model inputs.
        y1, y2, y3: targets for the three classification heads.
        y4: targets for the regression head.

    Side effects: applies one optimizer update to ``model`` and updates the
    ``loss_*_train`` and ``acc_*_train`` metric objects.
    """
    # The tape is queried exactly once, so the default (non-persistent) tape
    # is sufficient and lets its resources be released after gradient().
    with tf.GradientTape() as tape:
        out1, out2, out3, out4 = model(x, training=True)
        loss_1_value = loss_1(y1, out1)
        loss_2_value = loss_2(y2, out2)
        loss_3_value = loss_3(y3, out3)
        loss_4_value = loss_4(y4, out4)
    losses = [loss_1_value, loss_2_value, loss_3_value, loss_4_value]
    # A list of losses is passed as the target: the gradient returned for
    # each variable is the sum of the gradients of the individual losses.
    grads = tape.gradient(losses, model.trainable_variables)
    # gradients are applied
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    # running loss means are updated
    loss_1_train(loss_1_value)
    loss_2_train(loss_2_value)
    loss_3_train(loss_3_value)
    loss_4_train(loss_4_value)
    # accuracies are updated (classification heads only)
    acc_1_train.update_state(y1, out1)
    acc_2_train.update_state(y2, out2)
    acc_3_train.update_state(y3, out3)
@tf.function
def test_step(x, y1, y2, y3, y4):
    """Evaluate one batch without training and update the test metrics.

    Args:
        x: batch of model inputs.
        y1, y2, y3: targets for the three classification heads.
        y4: targets for the regression head.

    Side effects: updates the ``loss_*_test`` and ``acc_*_test`` metrics.
    """
    predictions = model(x, training=False)
    targets = (y1, y2, y3, y4)

    # One loss function and one running-mean tracker per head.
    loss_fns = (loss_1, loss_2, loss_3, loss_4)
    trackers = (loss_1_test, loss_2_test, loss_3_test, loss_4_test)
    for loss_fn, tracker, target, pred in zip(loss_fns, trackers,
                                              targets, predictions):
        tracker(loss_fn(target, pred))

    # Only the three classification heads have an accuracy metric; zip stops
    # after the third pair, so the regression head is skipped here.
    accuracies = (acc_1_test, acc_2_test, acc_3_test)
    for accuracy, target, pred in zip(accuracies, targets, predictions):
        accuracy.update_state(target, pred)
# Metrics shown at the end of every epoch: the three accuracies and the
# regression loss, each for train and test.
metrics = [acc_1_train, acc_1_test,
           acc_2_train, acc_2_test,
           acc_3_train, acc_3_test,
           loss_4_train, loss_4_test]

# Every stateful metric, printed or not. All of them are reset after each
# epoch; previously loss_4_train / loss_4_test were left out, so the printed
# regression loss was a running average over all epochs so far.
_all_metrics = [loss_1_train, loss_2_train, loss_3_train, loss_4_train,
                loss_1_test, loss_2_test, loss_3_test, loss_4_test,
                acc_1_train, acc_2_train, acc_3_train,
                acc_1_test, acc_2_test, acc_3_test]

for epoch in range(5):
    # train step
    # The datasets' map step yields (images, (y1, y2, y3, y4)), i.e. the
    # labels arrive nested in one tuple. Starred unpacking plus
    # tf.nest.flatten hands train_step four separate label tensors whether
    # the labels are nested or already flat.
    for inputs, *targets in train_ds:
        train_step(inputs, *tf.nest.flatten(targets))

    # test step
    for inputs, *targets in test_ds:
        test_step(inputs, *tf.nest.flatten(targets))

    # printing metrics
    for metric in metrics:
        print(f'{metric.name}:{metric.result():=6.4f}', end=' ')
    print()

    # resetting the states of the metrics so each epoch is reported on its own
    # NOTE(review): newer Keras releases appear to name this method
    # reset_state(); confirm against the installed version.
    for metric in _all_metrics:
        metric.reset_states()
ts_acc_1:0.9495 ts_acc_2:0.9685 ts_acc_3:0.9589 ts_loss_4:5.5617
ts_acc_1:0.9628 ts_acc_2:0.9747 ts_acc_3:0.9697 ts_loss_4:4.8953
ts_acc_1:0.9697 ts_acc_2:0.9758 ts_acc_3:0.9733 ts_loss_4:4.5209
ts_acc_1:0.9715 ts_acc_2:0.9796 ts_acc_3:0.9745 ts_loss_4:4.2175
ts_acc_1:0.9742 ts_acc_2:0.9834 ts_acc_3:0.9775 ts_loss_4:3.9825
我不知道如何在自定义训练循环中使用 Keras 回调,关于这个话题最热门的问题 https://stackoverflow.com/questions/59438904/applying-callbacks-in-a-custom-training-loop-in-tensorflow-2-0 也没有给出办法。如果您想实现 EarlyStopping,我个人的做法是使用 collections.deque https://stackoverflow.com/a/63458302/10908375,当最小损失是倒数第 n 个值时中断训练。这是一个例子:
from collections import deque
import numpy as np
# Maximum number of epochs to simulate, and the size of the loss window
# inspected by the early-stopping rule.
epochs = 100
early_stopping = 5

# Sliding window holding the most recent losses, oldest on the left.
loss_hist = deque(maxlen=early_stopping)

for epoch in range(epochs):
    # Stand-in for a real validation loss.
    loss_value = np.random.rand()
    loss_hist.append(loss_value)
    print('Last 5 values: ', *np.round(loss_hist, 3))

    # The rule is only evaluated once the window is full.
    if len(loss_hist) != early_stopping:
        continue

    # Remove the oldest loss and compare it with everything that came after
    # it: if it is still the smallest, none of the later epochs improved.
    oldest = loss_hist.popleft()
    if oldest < min(loss_hist):
        print('Early stopping. No loss decrease in %i epochs.\n' % early_stopping)
        break
Last 5 values: 0.456
Last 5 values: 0.456 0.153
Last 5 values: 0.456 0.153 0.2
Last 5 values: 0.456 0.153 0.2 0.433
Last 5 values: 0.456 0.153 0.2 0.433 0.528
Last 5 values: 0.153 0.2 0.433 0.528 0.349
Early stopping. No loss decrease in 5 epochs.
可以看到,在最后一次迭代中,窗口里最早(最左侧)的值 0.153 是其中最小的,也就是说此后的验证损失再也没有降到它以下(没有继续下降)。这就是停止条件。