Ernie模型实现口语理解任务
一、构建词表
def word2id_func(raw_dataset):
# returns a dictionary of words and their ids
# print('raw_dataset:' , raw_dataset)
words = []
for entry in raw_dataset:
words.extend(entry['utterance'].split())
words = list(set(words))
words_dict = {'[PAD]': PAD_TOKEN}
words_dict.update({w:i+1 for i, w in enumerate(words)})
words_dict['[UNK]'] = len(words_dict)
print('words_dict:', words_dict)
return words_dict
将训练数据、验证数据、测试数据全部传入,输出一个字典,字典的键为词,值为对应序号。
第一个为'[PAD]': 0
,最后一个为 '[UNK]': 12135
words_dict.update({w:i+1 for i, w in enumerate(words)})
这段代码使用了列表推导式(list comprehension)来生成一个字典,其中键是列表 words 中的元素,值是对应元素在列表中的索引加一。
- enumerate(words) 遍历列表 words 中的元素,并同时返回元素的索引和值;
- i+1 表示将索引值加一,从而使索引从1开始而不是默认的0。w 是当前元素的值;
- w:i+1 for i, w in enumerate(words) 指定了字典的键和值的生成方式;
举例来说,如果 words = [‘I’, ‘love’, ‘you’],则该代码会生成一个字典 { ‘I’: 1, ‘love’: 2, ‘you’: 3 }。
构建槽位索引序列
def slot2id_func(raw_dataset):
# returns a dictionary of slots and their ids
slots = ['[PAD]']
for entry in raw_dataset:
slots.extend(entry['slots'].split())
slots = list(set(slots))
slots_dict = {s:i for i, s in enumerate(slots)}
print('slots_dict:', slots_dict)
return slots_dict
根据给定的原始数据集,返回一个包含槽位及其ID的字典。
- 定义了一个名为slots的列表,并将
[PAD]
作为初始元素添加到该列表中;
- 对于原始数据集中的每个条目,将其槽位信息通过空格分割拆分为多个槽位,并extend到slots列表中;
- 使用
set()
函数将slots列表转换为集合,在此过程中去除重复的槽位,并再次将其转换为列表;
- 使用字典推导式创建一个字典slots_dict,其中槽位作为键,它们的索引作为值。索引值通过
enumerate(slots)
获取,从0开始递增。
构建意图标签索引
def intent2id_func(raw_dataset):
# returns a dictionary of intents and their ids
intents = [entry['intent'] for entry in raw_dataset]
intents = list(set(intents))
intents_dict = {inte:i for i, inte in enumerate(intents)}
# print('intents_dict:', intents_dict)
return intents_dict
构建词表
def vocab_func(raw_dataset):
vocab = set()
for entry in raw_dataset:
vocab = vocab.union(set(entry['utterance'].split()))
print('list(vocab):', list(vocab))
return ['[PAD]'] + list(vocab) + ['[UNK]']
二、定义IntentsAndSlots类
class IntentsAndSlots(data.Dataset):
# Mandatory methods are __init__, __len__ and __getitem__
def __init__(self, dataset, lang, unk='[UNK]'):
self.utterances = []
self.intents = []
self.slots = []
self.unk = unk
for x in dataset:
self.utterances.append(x['utterance'])
self.slots.append(x['slots'])
self.intents.append(x['intent'])
self.utt_ids = self.mapping_seq(self.utterances, lang.word2id)
self.slot_ids = self.mapping_seq(self.slots, lang.slot2id)
self.intent_ids = self.mapping_lab(self.intents, lang.intent2id)
def __len__(self):
return len(self.utterances)
def __getitem__(self, idx):
utt = torch.Tensor(self.utt_ids[idx])
slots = torch.Tensor(self.slot_ids[idx])
intent = self.intent_ids[idx]
sample = {'utterance': utt, 'slots': slots, 'intent': intent}
return sample
# Auxiliary methods
def mapping_lab(self, data, mapper):
return [mapper[x] if x in mapper else mapper[self.unk] for x in data]
def mapping_seq(self, data, mapper): # Map sequences to number
res = []
for seq in data:
tmp_seq = []
for x in seq.split():
if x in mapper:
tmp_seq.append(mapper[x])
else:
tmp_seq.append(mapper[self.unk])
res.append(tmp_seq)
return res
调用了mapping_seq
方法,将utterances、slots和intents转换为相应的ID序列,并保存在utt_ids、slot_ids和intent_ids中。
__getitem__
方法:给定的索引idx,获取相应位置上的utt_ids、slot_ids和intent_ids,并使用torch.Tensor进行转换。然后创建一个字典sample,将转换后的数据作为键值对存储在字典中。
mapping_lab
将给定的数据列表data根据映射字典mapper进行映射转换。该方法使用列表推导式,遍历数据列表data中的元素x。如果x存在于映射字典mapper中,则将x映射为对应的值;否则,将x映射为指定的未知标记unk对应的值。
数据加载器
def collate_fn(data):
def merge(sequences):
'''
merge from batch * sent_len to batch * max_len
'''
lengths = [len(seq) for seq in sequences]
max_len = 1 if max(lengths) == 0 else max(lengths)
# Pad token is zero in our case
# So we create a matrix full of PAD_TOKEN (i.e. 0) with the shape
# batch_size X maximum length of a sequence
padded_seqs = torch.LongTensor(len(sequences), max_len).fill_(PAD_TOKEN)
for i, seq in enumerate(sequences):
end = lengths[i]
padded_seqs[i, :end] = seq # We copy each sequence into the matrix
# print(padded_seqs)
padded_seqs = padded_seqs.detach() # We remove these tensors from the computational graph
return padded_seqs, lengths
# Sort data by seq lengths
data.sort(key=lambda x: len(x['utterance']), reverse=True)
new_item = {}
for key in data[0].keys():
new_item[key] = [d[key] for d in data]
# We just need one length for packed pad seq, since len(utt) == len(slots)
src_utt, _ = merge(new_item['utterance'])
y_slots, y_lengths = merge(new_item["slots"])
intent = torch.LongTensor(new_item["intent"])
src_utt = src_utt.to(device) # We load the Tensor on our seleceted device
y_slots = y_slots.to(device)
intent = intent.to(device)
y_lengths = torch.LongTensor(y_lengths).to(device)
new_item["utterances"] = src_utt
new_item["intents"] = intent
new_item["y_slots"] = y_slots
new_item["slots_len"] = y_lengths
return new_item
collate_fn函数的作用是对数据进行填充和合并操作,以便于模型训练过程中对不同长度的序列进行批次化处理。
三、定义参数
hid_size = 200
emb_size = 300
lr = 0.0001 # learning rate
clip = 5 # gradient clipping
out_slot = len(lang.slot2id)
out_int = len(lang.intent2id)
vocab_len = len(lang.word2id)
train_raw = load_data(os.path.join('data', dataset, 'train.json'))
test_raw = load_data(os.path.join('data', dataset, 'test.json'))
dev_raw = load_data(os.path.join('data', dataset, 'valid.json'))
四、定义模型
class JERNIE(nn.Module):
def __init__(self, out_int, out_slot):
super(JERNIE, self).__init__()
self.tokenizer = AutoTokenizer.from_pretrained("nghuyong/ernie-2.0-base-en")
self.ERNIE = AutoModel.from_pretrained("nghuyong/ernie-2.0-base-en")
self.ERNIE.to(device)
# 定义两个线性层,用于意图分类与槽位填充的输出
self.intent_classifier = nn.Linear(768, out_int)
self.slot_classifier = nn.Linear(768, out_slot)
def forward(self, input, lang):
# get back the input sentence
utterance = []
for element in input:
utterance.append(' '.join(lang.vocab[i] for i in element if i > 0))
tokenized = self.tokenizer(utterance, return_tensors='pt', add_special_tokens=True, padding=True).to(device)
output = self.ERNIE(**tokenized)
intent = output.pooler_output
slots = output.last_hidden_state[:, :input.size(1), :]
intent = self.intent_classifier(intent)
slots = self.slot_classifier(slots)
slots = slots.permute(0, 2, 1)
return intent, slots
- 前向传播函数中,输入数据被表示为一个列表input,每个元素代表一个句子。通过遍历输入句子,将索引转换为对应的词汇,形成句子列表utterance;
- 然后,使用tokenizer对utterance进行编码处理,得到tokenized对象,其中包含编码后的词汇索引和特殊标记等信息。将其移动到指定的设备上;
- 接下来,将编码后的句子输入ERNIE模型,获得模型的输出。其中,意图部分直接使用output.pooler_output作为意图分类的输入,槽部分则使用output.last_hidden_state进行处理;
- 最后,通过意图分类器和槽分类器分别对意图和槽进行分类,并对槽分类结果进行维度变换。返回意图分类结果intent和槽分类结果slots。
slots = slots.permute(0, 2, 1)
这行代码使用了PyTorch的permute函数,用于交换张量的维度顺序。在这里,slots.permute(0, 2, 1)将slots张量的维度从(批大小, 序列长度, 隐藏单元数)变为(批大小, 隐藏单元数, 序列长度)。通过这个操作,槽分类结果的维度被重新排列,以便后续处理或计算的需要。
五、循环评估函数
from conll import evaluate
from sklearn.metrics import classification_report
def evaluation_loop(data, criterion_slots, criterion_intents, model, lang):
model.eval()
loss_array = []
ref_intents = []
hyp_intents = []
ref_slots = []
hyp_slots = []
with torch.no_grad(): # It used to avoid the creation of computational graph
for sample in data:
intents, slots = model(sample['utterances'], lang)
loss_intent = criterion_intents(intents, sample['intents'])
loss_slot = criterion_slots(slots, sample['y_slots'])
loss = loss_intent + loss_slot
loss_array.append(loss.item())
# Intent inference
# Get the highest probable class
out_intents = [lang.id2intent[x]
for x in torch.argmax(intents, dim=1).tolist()]
gt_intents = [lang.id2intent[x] for x in sample['intents'].tolist()]
ref_intents.extend(gt_intents)
hyp_intents.extend(out_intents)
# Slot inference
output_slots = torch.argmax(slots, dim=1)
for id_seq, seq in enumerate(output_slots):
length = sample['slots_len'].tolist()[id_seq]
utt_ids = sample['utterance'][id_seq][:length].tolist()
gt_ids = sample['y_slots'][id_seq].tolist()
gt_slots = [lang.id2slot[elem] for elem in gt_ids[:length]]
utterance = [lang.id2word[elem] for elem in utt_ids]
to_decode = seq[:length].tolist()
ref_slots.append([(utterance[id_el], elem) for id_el, elem in enumerate(gt_slots)])
tmp_seq = []
for id_el, elem in enumerate(to_decode):
tmp_seq.append((utterance[id_el], lang.id2slot[elem]))
hyp_slots.append(tmp_seq)
try:
results = evaluate(ref_slots, hyp_slots)
except Exception as ex:
# Sometimes the model predics a class that is not in REF
print(ex)
ref_s = set([x[1] for x in ref_slots])
hyp_s = set([x[1] for x in hyp_slots])
print(hyp_s.difference(ref_s))
report_intent = classification_report(ref_intents, hyp_intents,
zero_division=False, output_dict=True)
return results, report_intent, loss_array
评估循环函数。它用于对模型在给定数据集上进行评估,并返回评估结果。
- 将模型设为评估模式(model.eval())
- 初始化损失数组(loss_array)以及存储参考意图、预测意图、参考槽值和预测槽值的列表(ref_intents、hyp_intents、ref_slots、hyp_slots)
- 使用torch.no_grad()块,禁止梯度计算,以加快评估过程
- 针对数据集中的每个样本,使用模型输入句子的编码表示并获得输出的意图和槽表示
- 计算意图分类损失和槽分类损失,并将它们相加得到总损失,将总损失添加到损失数组中
- 根据意图输出,将索引转换为对应的意图标签,分别存储参考意图和预测意图
- 对槽输出进行处理,将索引转换为对应的槽标签,并将参考槽和预测槽分别存储在ref_slots和hyp_slots中
- 在evaluate函数中评估参考槽和预测槽之间的性能,得到评估结果。如果出现异常(例如模型预测的类别不在参考槽中),则打印异常信息和差异部分
- 对意图分类的预测结果和参考结果进行评估,得到分类报告
- 返回评估结果、意图分类报告和损失数组。
def training_loop(data, optimizer, criterion_slots, criterion_intents, model, lang):
model.train()
loss_array = []
for sample in data:
optimizer.zero_grad() # Zeroing the gradient
intent, slots = model(sample['utterances'], lang)
loss_intent = criterion_intents(intent, sample['intents'])
loss_slot = criterion_slots(slots, sample['y_slots'])
loss = loss_intent + loss_slot # In joint training we sum the losses.
# Is there another way to do that?
loss_array.append(loss.item())
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
optimizer.step() # Update the weights
return loss_array
- 代码首先将模型设置为训练模式,然后创建一个空的损失数组用于保存每个训练样本的损失值。
- 接下来,通过一个循环遍历训练数据集中的每个样本。在每次迭代中,首先将优化器的梯度清零(optimizer.zero_grad()),以便计算新的梯度。
- 通过调用模型(model)传入输入序列(sample[‘utterances’])和lang,获取模型对意图(intent)和槽(slots)的预测结果。
- 分别计算意图损失(loss_intent)和槽损失(loss_slot)。在联合训练中,将两者的损失相加得到总损失(loss)。
- 将当前样本的总损失值添加到损失数组(loss_array)中,以便后续分析和可视化。
- 通过调用 loss.backward() 计算损失相对于模型参数的梯度,并通过 torch.nn.utils.clip_grad_norm_() 对梯度进行裁剪,以避免梯度爆炸的问题。
- 最后,调用 optimizer.step() 来更新模型的权重参数,使其朝着损失减小的方向更新。
- 函数返回损失数组,用于后续分析和可视化训练过程中的损失变化。
六、开始训练
from tqdm import tqdm
for x in tqdm(range(1, n_epochs)):
loss = training_loop(train_loader, optimizer, criterion_slots,
criterion_intents, model, lang)
if x % 5 == 0:
sampled_epochs.append(x)
losses_train.append(np.asarray(loss).mean())
results_dev, intent_res, loss_dev = evaluation_loop(dev_loader, criterion_slots,
criterion_intents, model, lang)
losses_dev.append(np.asarray(loss_dev).mean())
f1 = results_dev['total']['f']
if f1 > best_f1:
best_f1 = f1
else:
# halve optimizer learning rate
if patience % 3 == 0:
for param_group in optimizer.param_groups:
param_group['lr'] = param_group['lr'] / 2
patience -= 1
if patience <= 0: # Early stopping with patience
break # Not nice but it keeps the code clean
results_test, intent_test, _ = evaluation_loop(test_loader, criterion_slots,
criterion_intents, model, lang)
完整的训练过程:
- 首先定义了一个循环,循环次数是n_epochs(训练轮数)。在每一轮训练中,调用training_loop函数对模型进行训练,并将返回的损失值保存在变量loss中。
- if条件语句检查当前轮数x是否可以被5整除。如果满足条件,则进行以下操作:
- 将当前轮数x添加到sampled_epochs列表中;
- 计算训练集的平均损失,并将其添加到losses_train列表中;
- 调用evaluation_loop函数对开发集进行评估,计算评估结果results_dev、意图结果intent_res和损失值loss_dev;
- 计算验证集评估结果中的总体F1分数f1;
- 如果当前的F1分数大于最佳F1分数best_f1,则将最佳F1分数更新为当前F1分数;
- 否则,减少优化器的学习率(learning rate);
- 如果patience(耐心)被3整除,则将所有参数组的学习率除以2,即将学习率减半;
- 如果耐心(patience)小于等于0,则结束训练循环,即提前停止训练。
- 对测试集进行评估,计算评估结果results_test、意图结果intent_test和损失值。
七、运行结果
num |
measure |
model |
score |
1 |
Slot F1 |
ERNIE |
0.9454038997214484 |
1 |
Intent Accuracy |
ERNIE |
0.8628571428571429 |
2 |
Slot F1 |
ERNIE |
0.9372222222222222 |
2 |
Intent Accuracy |
ERNIE |
0.8642857142857143 |