Given a matrix $X \in \mathbb{R}^{n \times d}$ and an indicator vector $m \in \{0,1\}^n$: rows with $m_i = 1$ are constants and must not be trained (i.e. requires_grad=False), while rows with $m_i = 0$ are learnable variables with requires_grad=True, e.g. missing data to be filled in. This note implements a wrapper class for such a constant/variable mixture so that it can be used much like an ordinary tensor.
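One way to write the mixed matrix the wrapper should produce (the notation here is mine, chosen to match the implementation below): let $W \in \mathbb{R}^{n_{\mathrm{var}} \times d}$ hold the learnable rows and $\pi(i)$ map a full row index to its position in $W$; then

$$\tilde{X}_i = m_i\,X_i + (1 - m_i)\,W_{\pi(i)}, \qquad i = 1, \dots, n,$$

and gradients of any loss on $\tilde{X}$ reach only $W$.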
preliminaries
- Verify: with constants and variables stored in the same tensor, gradients behave as expected, i.e. the constant part gets no gradient while the variable part does.
- Two ways to build the mixed tensor: concatenating, or pre-allocating space and copying in (a loop-free variant is sketched after the code below).
- Conclusion: both work.
import torch

X = torch.arange(12).view(4, 3).float()
print(X)
mask = torch.tensor([1, 0, 1, 0]).int()
n_var = int((0 == mask).sum())  # number of variable (trainable) rows

# constant part
X_const = X[mask > 0]
print(X_const)

# variable part
X_var = torch.normal(0, 1, size=[n_var, X.size(1)])
X_var.requires_grad_(True)
print(X_var)

print("Approach 1. grad_fn=<CatBackward>")
ic, ip = 0, 0
X_mix = []
for i in range(X.size(0)):
    if mask[i] > 0:
        X_mix.append(X_const[ic:ic+1])
        ic += 1
    else:
        X_mix.append(X_var[ip:ip+1])
        ip += 1
X_mix = torch.cat(X_mix, dim=0)
print(X_mix)

"""print("Approach 2. grad_fn=<CopySlices>")
ic, ip = 0, 0
X_mix = torch.zeros_like(X)
for i in range(X.size(0)):
    if mask[i] > 0:
        X_mix[i] = X_const[ic:ic+1]
        ic += 1
    else:
        X_mix[i] = X_var[ip:ip+1]
        ip += 1
print(X_mix)
"""

loss = ((X - X_mix) ** 2).sum()
loss.backward()

print("--- grad ---")
print(X_const.grad)  # None: the constant part receives no gradient
print(X_var.grad)

print("--- update ---")
X_var.data -= X_var.grad  # plain gradient-descent step with learning rate 1
print(X_var)
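For reference, the same mixed tensor can also be assembled without the Python loop. This is a small sketch of my own (not part of the original note) that reuses `X`, `mask`, `X_const` and `X_var` from above; boolean-mask assignment is the same copy-into-slices mechanism as Approach 2, so gradients still reach only `X_var`:

# Loop-free variant (sketch): fill constant and variable rows by boolean masks.
X_mix = torch.empty_like(X)
X_mix[mask > 0] = X_const    # constant rows: no gradient recorded
X_mix[mask == 0] = X_var     # variable rows: gradient flows back to X_var
print(X_mix.grad_fn)         # a copy-into-slices node, e.g. <IndexPutBackward0>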
wrapper class & sample
- `MixVar` is the wrapper class
- a small reconstruction example follows
import math
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.parameter import Parameter


class MixVar(nn.Module):
    """mixture of constants & trainable variables"""

    def __init__(self, X, const_mask, init_val=None, process_fn=None):
        """
        Input:
            X: [n, d], FULL matrix including both constants & (placeholders of) variables
            const_mask: [n], in {0, 1}, indicating whether the i-th item is constant
            init_val: constant initializer of variables
            process_fn: something to do before returning the var,
                e.g. normalization, activation, etc.
        """
        super(MixVar, self).__init__()
        self.X = X
        self.const_mask = const_mask
        self.process_fn = process_fn
        self.full_indices = np.arange(X.size(0))

        assert X.size(0) == const_mask.size(0)
        n = X.size(0)                    # total number of rows, constants + variables
        n_const = int(const_mask.sum())  # number of constant rows
        n_var = n - n_const              # number of variable rows
        assert n_var > 0, "* constant only, no need to use this class"
        size = [n_var, X.size(1)]

        # The variables are stored separately in `self.weight`.
        # NOTE: their indexing now DIFFERS from that of the full matrix,
        # hence the id map below.
        if init_val is None:
            self.weight = Parameter(torch.Tensor(*size))
            self.reset_parameters()
        else:
            self.weight = Parameter(init_val * torch.ones(*size, dtype=torch.float))

        # map the full id in `X` to the relative one in `weight`
        _cnt = 0
        self.id_map = {}
        for i in range(n):
            if 0 == const_mask[i]:
                self.id_map[i] = _cnt
                _cnt += 1
        assert _cnt == n_var

    def reset_parameters(self):
        nn.init.kaiming_uniform_(self.weight, a=math.sqrt(5))

    def forward(self, index=None):
        """MUST use this function for slicing instead of slicing manually"""
        if index is None:
            index = self.full_indices
        res = torch.zeros(index.shape[0], self.X.size(1),
                          dtype=self.X.dtype).to(self.weight.device)
        for i in range(index.shape[0]):
            _idx = int(index[i])  # plain int so it can key into `id_map`
            if self.const_mask[_idx] > 0:
                res[i] = self.X[_idx].to(self.weight.device)
            else:
                res[i] = self.weight[self.id_map[_idx]]
        if self.process_fn:
            res = self.process_fn(res)
        return res

    def extra_repr(self):
        return 'size={}'.format(self.X.size())
# a usage example
X = torch.arange(12).view(6, 2).float()
print("original:\n", X)
mask = torch.tensor([1, 0, 1, 0, 0, 1]).int()

X_mix = MixVar(X, mask)
indices = np.arange(X.size(0))
optimizer = torch.optim.SGD(X_mix.parameters(), lr=0.1)

batch_size = 2
for epoch in range(100):
    for i in range(0, X.size(0), batch_size):
        index = indices[i: i + batch_size]
        loss = F.mse_loss(X_mix(index), X[index])
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

print("reconstructed:\n", X_mix().data)
original:
 tensor([[ 0.,  1.],
        [ 2.,  3.],
        [ 4.,  5.],
        [ 6.,  7.],
        [ 8.,  9.],
        [10., 11.]])
reconstructed:
 tensor([[ 0.0000,  1.0000],
        [ 1.9914,  2.9829],
        [ 4.0000,  5.0000],
        [ 5.9679,  6.9619],
        [ 7.9526,  8.9491],
        [10.0000, 11.0000]])
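As a closing note, the `process_fn` hook of the constructor is not exercised above. Here is a minimal sketch of my own (the normalization choice is an assumption, not from the original example) showing how a post-processing function can be plugged in:

# Hypothetical use of process_fn: L2-normalize every returned row.
X_mix_norm = MixVar(X, mask, process_fn=lambda t: F.normalize(t, p=2, dim=1))
out = X_mix_norm()        # full matrix; every row has unit L2 norm
print(out.norm(dim=1))    # ~all ones; gradients still reach only the variable rows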