I'm using the Transformer Keras documentation example here for multiple-instance classification, where each instance's class depends on the other instances in its bag. I chose the Transformer model because:

it makes no assumptions about temporal/spatial relationships between the data points, which is ideal for processing a set of objects.

For example, each bag can have up to 5 instances, each with 3 features.
```python
import numpy as np

# Generate data
max_length = 5
x_lst = []
y_lst = []
for _ in range(10):
    num_instances = np.random.randint(2, max_length + 1)
    x_bag = np.random.randint(0, 9, size=(num_instances, 3))
    y_bag = np.random.randint(0, 2, size=num_instances)
    x_lst.append(x_bag)
    y_lst.append(y_bag)
```
Features and labels of the first 2 bags (with 5 and 2 instances, respectively):
```python
>>> x_lst[:2]
[array([[8, 0, 3],
        [8, 1, 0],
        [4, 6, 8],
        [1, 6, 4],
        [7, 4, 6]]),
 array([[5, 8, 4],
        [2, 1, 1]])]
>>> y_lst[:2]
[array([0, 1, 1, 1, 0]), array([0, 0])]
```
Next, I pad the features with zeros and the targets with -1:
```python
x_padded = []
y_padded = []
for x, y in zip(x_lst, y_lst):
    x_p = np.zeros((max_length, 3))
    x_p[:x.shape[0], :x.shape[1]] = x
    x_padded.append(x_p)

    y_p = np.negative(np.ones(max_length))
    y_p[:y.shape[0]] = y
    y_padded.append(y_p)

X = np.stack(x_padded)
y = np.stack(y_padded)
```
where `X.shape` equals `(10, 5, 3)` and `y.shape` equals `(10, 5)`.
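The padding step can be checked end-to-end; a minimal NumPy sketch (the single toy bag below is my own, not the generated data above) confirming that padded feature rows are zero and padded targets are -1:

```python
import numpy as np

max_length = 5
x_lst = [np.array([[8, 0, 3], [2, 1, 1]])]  # one bag with 2 instances
y_lst = [np.array([0, 1])]

x_padded, y_padded = [], []
for x, y in zip(x_lst, y_lst):
    x_p = np.zeros((max_length, 3))
    x_p[:x.shape[0], :x.shape[1]] = x
    x_padded.append(x_p)
    y_p = np.negative(np.ones(max_length))
    y_p[:y.shape[0]] = y
    y_padded.append(y_p)

X = np.stack(x_padded)
y = np.stack(y_padded)

assert X.shape == (1, 5, 3) and y.shape == (1, 5)
assert (X[0, 2:] == 0).all()   # padded feature rows are all zeros
assert (y[0, 2:] == -1).all()  # padded targets are -1
```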
I made two changes to the original model: I added a Masking layer after the input layer, and I set the number of units in the last Dense layer to the maximum bag size (with a `sigmoid` activation):
```python
from tensorflow import keras
from tensorflow.keras import layers

def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    # Attention and Normalization
    x = layers.MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )(inputs, inputs)
    x = layers.Dropout(dropout)(x)
    x = layers.LayerNormalization(epsilon=1e-6)(x)
    res = x + inputs

    # Feed Forward Part
    x = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(res)
    x = layers.Dropout(dropout)(x)
    x = layers.Conv1D(filters=inputs.shape[-1], kernel_size=1)(x)
    x = layers.LayerNormalization(epsilon=1e-6)(x)
    return x + res


def build_model(
    input_shape,
    head_size,
    num_heads,
    ff_dim,
    num_transformer_blocks,
    mlp_units,
    dropout=0,
    mlp_dropout=0,
):
    inputs = keras.Input(shape=input_shape)
    x = layers.Masking(mask_value=0)(inputs)  # ADDED MASKING LAYER
    for _ in range(num_transformer_blocks):
        x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout)
    x = layers.GlobalAveragePooling1D(data_format="channels_first")(x)
    for dim in mlp_units:
        x = layers.Dense(dim, activation="relu")(x)
        x = layers.Dropout(mlp_dropout)(x)
    outputs = layers.Dense(5, activation="sigmoid")(x)  # CHANGED ACCORDING TO MY OUTPUT
    return keras.Model(inputs, outputs)
```
```python
input_shape = (5, 3)

model = build_model(
    input_shape,
    head_size=256,
    num_heads=4,
    ff_dim=4,
    num_transformer_blocks=4,
    mlp_units=[128],
    mlp_dropout=0.4,
    dropout=0.25,
)

model.compile(
    loss="binary_crossentropy",
    optimizer=keras.optimizers.Adam(learning_rate=1e-4),
    metrics=["binary_accuracy"],
)
model.summary()
```
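For context on the output shape: with `data_format="channels_first"`, `GlobalAveragePooling1D` treats the second axis as channels and averages over the last axis, so a `(batch, 5, 3)` encoder output becomes `(batch, 5)` (one pooled value per instance slot), which is why `Dense(5)` lines up with `y`'s shape. A NumPy sketch of just that pooling step:

```python
import numpy as np

batch, n_instances, n_features = 10, 5, 3
encoder_out = np.random.rand(batch, n_instances, n_features)

# channels_first pooling averages over the last axis (the 3 features here),
# leaving one value per instance slot
pooled = encoder_out.mean(axis=-1)

assert pooled.shape == (batch, n_instances)
```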
It looks like my model doesn't learn much. If I use the number of true values per bag (`y.sum(axis=1)` and `Dense(1)`) as the target instead of classifying each instance, the model learns well. Where is my mistake? How should I build the output layer in this case? Do I need a custom loss function?
UPDATE:
I made a custom loss function:
```python
import tensorflow as tf

def my_loss_fn(y_true, y_pred):
    mask = tf.cast(tf.math.not_equal(y_true, tf.constant(-1.)), tf.float32)
    y_true, y_pred = tf.expand_dims(y_true, axis=-1), tf.expand_dims(y_pred, axis=-1)
    bce = tf.keras.losses.BinaryCrossentropy(reduction='none')
    return tf.reduce_sum(tf.cast(bce(y_true, y_pred), tf.float32) * mask)
```
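As a sanity check, here is a NumPy re-implementation of what this loss computes (a sketch; the epsilon clipping is an assumption mirroring Keras's default behaviour): per-element binary cross-entropy, zeroed out at padded (-1) positions, then summed:

```python
import numpy as np

def masked_bce_numpy(y_true, y_pred, eps=1e-7):
    y_true = np.asarray(y_true, dtype=np.float64)
    y_pred = np.clip(np.asarray(y_pred, dtype=np.float64), eps, 1 - eps)
    mask = (y_true != -1).astype(np.float64)
    # elementwise binary cross-entropy; padded positions contribute nothing
    bce = -(y_true * np.log(y_pred) + (1 - y_true) * np.log(1 - y_pred))
    return np.sum(bce * mask)

# one bag: real labels [1, 0], padded with -1
loss = masked_bce_numpy([[1., 0., -1.]], [[0.9, 0.2, 0.7]])
assert np.isclose(loss, -np.log(0.9) - np.log(0.8))
```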
```python
import pandas as pd

mask = (y_test != -1).astype(int)
pd.DataFrame(
    {'n_labels': mask.sum(axis=1), 'preds': ((preds * mask) >= .5).sum(axis=1)}
).plot(figsize=(20, 5))
```
And it looks like the model learns:
But it predicts all non-masked labels as 1.
@thushv89 Here is my problem. I take two points in time, t1 and t2, and find all vehicles that are in maintenance at time t1 and all vehicles that are scheduled for maintenance at time t2. These make up my bag. Then I compute features, such as how much time a t1 vehicle has already spent in maintenance, how long until the scheduled start for a t2 vehicle, etc. If I try to predict the number of vehicles in maintenance at time t2, my model learns well, but I want to predict which of them will leave and which will come in (3 vs [True, False, True, True] for a bag of 4 vehicles).