seq2seq和注意力机制的翻译 1.加载数据文件 与字符级 RNN 教程中使用的字符编码类似,我们将语言中的每个单词表示为一个热向量,或除单个 0 之外的巨大向量(在单词的索引处)。与一种语言中可能存在的数十个字符相比,单词要多得多,因此编码向量要大得多。但是,我们将进行一些作弊,并将数据修剪为每种语言仅使用几千个单词。
我们需要每个单词的唯一索引,以便稍后用作网络的输入和目标。为了跟踪所有这些,我们将使用一个名为的 Lang
帮助程序类,该类具有单词→索引 ( word2index
) 和索引 → 单词 ( index2word
) 字典,以及每个单词 word2count
的计数,稍后将用于替换生僻词。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 SOS_token = 0 EOS_token = 1 class Lang : def __init__ (self, name ): self.name = name self.word2index = {} self.word2count = {} self.index2word = {0 : "SOS" , 1 : "EOS" } self.n_words = 2 def addSentence (self, sentence ): for word in sentence.split(' ' ): self.addWord(word) def addWord (self, word ): if word not in self.word2index: self.word2index[word] = self.n_words self.word2count[word] = 1 self.index2word[self.n_words] = word self.n_words += 1 else : self.word2count[word] += 1
这些文件都是Unicode格式,为了简化起见,我们将Unicode字符转换为ASCII,使所有内容都小写,并修剪大多数标点符号。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def unicodeToAscii (s ): return '' .join( c for c in unicodedata.normalize('NFD' , s) if unicodedata.category(c) != 'Mn' )def normalizeString (s ): s = unicodeToAscii(s.lower().strip()) s = re.sub(r"([.!?])" , r" \1" , s) s = re.sub(r"[^a-zA-Z!?]+" , r" " , s) return s.strip()
要读取数据文件,我们将文件拆分为行,然后将行拆分为成对。这些文件都是英语→其他语言,所以如果我们想从其他语言翻译→英语,我添加了 reverse
标志来反转对。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 def readLangs (lang1, lang2, reverse=False ): print ("Reading lines..." ) lines = open ('data/%s-%s.txt' % (lang1, lang2), encoding='utf-8' ).\ read().strip().split('\n' ) pairs = [[normalizeString(s) for s in l.split('\t' )] for l in lines] if reverse: pairs = [list (reversed (p)) for p in pairs] input_lang = Lang(lang2) output_lang = Lang(lang1) else : input_lang = Lang(lang1) output_lang = Lang(lang2) return input_lang, output_lang, pairs
由于有很多例句,并且我们想快速训练一些东西,因此我们将数据集修剪为仅相对简短和简单的句子。这里的最大长度是 10 个单词(包括结束标点符号),我们正在过滤到翻译为“我是”或“他是”等形式的句子(考虑到前面替换的撇号)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 MAX_LENGTH = 10 eng_prefixes = ( "i am " , "i m " , "he is" , "he s " , "she is" , "she s " , "you are" , "you re " , "we are" , "we re " , "they are" , "they re " )def filterPair (p ): return len (p[0 ].split(' ' )) < MAX_LENGTH and \ len (p[1 ].split(' ' )) < MAX_LENGTH and \ p[1 ].startswith(eng_prefixes)def filterPairs (pairs ): return [pair for pair in pairs if filterPair(pair)]
准备数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def prepareData (lang1, lang2, reverse=False ): input_lang, output_lang, pairs = readLangs(lang1, lang2, reverse) print ("Read %s sentence pairs" % len (pairs)) pairs = filterPairs(pairs) print ("Trimmed to %s sentence pairs" % len (pairs)) print ("Counting words..." ) for pair in pairs: input_lang.addSentence(pair[0 ]) output_lang.addSentence(pair[1 ]) print ("Counted words:" ) print (input_lang.name, input_lang.n_words) print (output_lang.name, output_lang.n_words) return input_lang, output_lang, pairs input_lang, output_lang, pairs = prepareData('eng' , 'fra' , True )print (random.choice(pairs))
结果:
1 2 3 4 5 6 7 8 Reading lines... Read 135842 sentence pairs Trimmed to 11445 sentence pairs Counting words... Counted words: fra 4601 eng 2991 ['tu preches une convaincue' , 'you re preaching to the choir' ]
2.Seq2Seq模型 2.1 编码器 seq2seq 网络的编码器是一个 RNN,它为输入句子中的每个单词输出一些值。对于每个输入字,编码器输出一个向量和一个隐藏状态,并将隐藏状态用于下一个输入字。
1 2 3 4 5 6 7 8 9 10 11 12 13 class EncoderRNN (nn.Module): def __init__ (self, input_size, hidden_size, dropout_p=0.1 ): super (EncoderRNN, self).__init__() self.hidden_size = hidden_size self.embedding = nn.Embedding(input_size, hidden_size) self.gru = nn.GRU(hidden_size, hidden_size, batch_first=True ) self.dropout = nn.Dropout(dropout_p) def forward (self, input ): embedded = self.dropout(self.embedding(input )) output, hidden = self.gru(embedded) return output, hidden
tips:
nn.Embedding
中第一个参数代表词表的个数范围,比如有1000个中文词语,那么第一个参数就为1000,接下来的input中就不能有超过1000的值,因为input代表的是词表的索引列表,列表的长度是需要嵌入的词语个数。第二个参数是每一个词映射后的维度。例如embedding = nn.Embedding(8, 10)
如果input为torch.LongTensor([1, 2, 3])
,输出size是torch.Size([3, 10])
如果input为torch.LongTensor([[1, 2, 3], [4, 5, 6]])
,输出size是torch.Size([2, 3, 10])
2.2 解码器 2.2.1 简单解码器 在最简单的 seq2seq 解码器中,我们只使用编码器的最后一个输出。最后一个输出有时称为上下文向量,因为它对整个序列中的上下文进行编码。此上下文向量用作解码器的初始隐藏状态。
在解码的每一步中,解码器都会被赋予输入令牌和隐藏状态。初始输入标记是字符串 <SOS>
开头标记,第一个隐藏状态是上下文向量(编码器的最后一个隐藏状态)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 class DecoderRNN (nn.Module): def __init__ (self, hidden_size, output_size ): super (DecoderRNN, self).__init__() self.embedding = nn.Embedding(output_size, hidden_size) self.gru = nn.GRU(hidden_size, hidden_size, batch_first=True ) self.out = nn.Linear(hidden_size, output_size) def forward (self, encoder_outputs, encoder_hidden, target_tensor=None ): batch_size = encoder_outputs.size(0 ) decoder_input = torch.empty(batch_size, 1 , dtype=torch.long, device=device).fill_(SOS_token) decoder_hidden = encoder_hidden decoder_outputs = [] for i in range (MAX_LENGTH): decoder_output, decoder_hidden = self.forward_step(decoder_input, decoder_hidden) decoder_outputs.append(decoder_output) if target_tensor is not None : decoder_input = target_tensor[:, i].unsqueeze(1 ) else : _, topi = decoder_output.topk(1 ) decoder_input = topi.squeeze(-1 ).detach() decoder_outputs = torch.cat(decoder_outputs, dim=1 ) decoder_outputs = F.log_softmax(decoder_outputs, dim=-1 ) return decoder_outputs, decoder_hidden, None def forward_step (self, input , hidden ): output = self.embedding(input ) output = F.relu(output) output, hidden = self.gru(output, hidden) output = self.out(output) return output, hidden
tips:
假设batch_size为3,decoder_input = torch.empty(3, 1, dtype=torch.long).fill_(0)
的输出结果是tensor([[0],[0], [0]])
,decoder_input = target_tensor[:, 0].unsqueeze(1)
的目的是为了保证size一致。
测试代码如下:
1 2 3 4 5 6 7 8 target_tensor = torch.LongTensor([[1 , 2 , 3 ], [4 , 5 , 6 ], [7 , 8 , 9 ]]) decoder_input = target_tensor[:, 0 ].unsqueeze(1 )print (decoder_input) decoder_input = torch.empty(3 , 1 , dtype=torch.long).fill_(0 )print (decoder_input)
结果:
1 2 3 4 5 6 tensor([[1 ], [4 ], [7 ]]) tensor([[0 ], [0 ], [0 ]])
2.2.2 注意力解码器 如果仅在编码器和解码器之间传递上下文向量,则该单个向量承担对整个句子进行编码的负担。
注意力允许解码器网络“聚焦”在编码器输出的不同部分,用于解码器自身输出的每一步。首先,我们计算一组注意力权重。这些将乘以编码器输出向量以创建加权组合。结果(在代码中调用 attn_applied
)应包含有关输入序列的特定部分的信息,从而帮助解码器选择正确的输出字。
注意力权重的计算是用另一个前馈层 attn
完成的,使用解码器的输入和隐藏状态作为输入。由于训练数据中有各种大小的句子,因此要实际创建和训练此层,我们必须选择它可以应用的最大句子长度(编码器输出的输入长度)。最大长度的句子将使用所有注意力权重,而较短的句子将仅使用前几个。
Bahdanau 注意力,也称为加法注意力,是序列到序列模型中常用的注意力机制,尤其是在神经机器翻译任务中。Bahdanau等人在他们的论文中介绍了它,题为“通过共同学习对齐和翻译进行神经机器翻译”。这种注意力机制采用学习对齐模型来计算编码器和解码器隐藏状态之间的注意力分数。它利用前馈神经网络来计算对齐分数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 class BahdanauAttention (nn.Module): def __init__ (self, hidden_size ): super (BahdanauAttention, self).__init__() self.Wa = nn.Linear(hidden_size, hidden_size) self.Ua = nn.Linear(hidden_size, hidden_size) self.Va = nn.Linear(hidden_size, 1 ) def forward (self, query, keys ): scores = self.Va(torch.tanh(self.Wa(query) + self.Ua(keys))) scores = scores.squeeze(2 ).unsqueeze(1 ) weights = F.softmax(scores, dim=-1 ) context = torch.bmm(weights, keys) return context, weightsclass AttnDecoderRNN (nn.Module): def __init__ (self, hidden_size, output_size, dropout_p=0.1 ): super (AttnDecoderRNN, self).__init__() self.embedding = nn.Embedding(output_size, hidden_size) self.attention = BahdanauAttention(hidden_size) self.gru = nn.GRU(2 * hidden_size, hidden_size, batch_first=True ) self.out = nn.Linear(hidden_size, output_size) self.dropout = nn.Dropout(dropout_p) def forward (self, encoder_outputs, encoder_hidden, target_tensor=None ): batch_size = encoder_outputs.size(0 ) decoder_input = torch.empty(batch_size, 1 , dtype=torch.long, device=device).fill_(SOS_token) decoder_hidden = encoder_hidden decoder_outputs = [] attentions = [] for i in range (MAX_LENGTH): decoder_output, decoder_hidden, attn_weights = self.forward_step( decoder_input, decoder_hidden, encoder_outputs ) decoder_outputs.append(decoder_output) attentions.append(attn_weights) if target_tensor is not None : decoder_input = target_tensor[:, i].unsqueeze(1 ) else : _, topi = decoder_output.topk(1 ) decoder_input = topi.squeeze(-1 ).detach() decoder_outputs = torch.cat(decoder_outputs, dim=1 ) decoder_outputs = F.log_softmax(decoder_outputs, dim=-1 ) attentions = torch.cat(attentions, dim=1 ) return decoder_outputs, decoder_hidden, attentions def forward_step (self, input , hidden, encoder_outputs ): embedded = self.dropout(self.embedding(input )) query = hidden.permute(1 , 0 , 2 ) context, attn_weights = self.attention(query, encoder_outputs) input_gru = torch.cat((embedded, context), dim=2 ) output, hidden = self.gru(input_gru, hidden) output = self.out(output) return output, hidden, attn_weights
tips:
为什么要使用attention :在解码的过程中每个被解码的目标所需要的上下文信息是不一样的,对原文中每个词的需求程度不一样,因此需要一个加权的权重来进行注意力的分配。
为什么不直接将encoder_output加一个线性变换进行梯度优化 :这样整个模型只有一个优化向量,无法满足针对不同的解码目标进行不同的注意力分配,因此需要根据decoder_hidden和encoder_outputs进行共同的梯度优化。
为什么不直接将query和key相加进行变换再梯度优化 :通过将查询向量和键向量分别通过线性变换映射到一个共享的注意力空间,可以增加模型的表达能力 ,使其能够更好地捕捉到它们之间的相关性,且学习到的注意力权重更具灵活性 。
为什么注意力分数不能直接作为上下文向量 :因为注意力分数通常是一个标量值,它仅仅反映了查询向量与单个键向量之间的相似性,而无法反映与其他键向量的关系。因此,使用注意力分数作为上下文向量可能会丢失其他键向量的信息,导致模型无法充分利用所有键向量的信息。
3.训练 3.1 准备训练数据 为了训练,对于每一对,我们需要一个输入张量(输入句子中单词的索引 )和目标张量(目标句子中单词的索引 )。在创建这些向量时,我们将EOS令牌附加到两个序列中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 def indexesFromSentence (lang, sentence ): return [lang.word2index[word] for word in sentence.split(' ' )]def tensorFromSentence (lang, sentence ): indexes = indexesFromSentence(lang, sentence) indexes.append(EOS_token) return torch.tensor(indexes, dtype=torch.long, device=device).view(1 , -1 )def tensorsFromPair (pair ): input_tensor = tensorFromSentence(input_lang, pair[0 ]) target_tensor = tensorFromSentence(output_lang, pair[1 ]) return (input_tensor, target_tensor)def get_dataloader (batch_size ): input_lang, output_lang, pairs = prepareData('eng' , 'fra' , True ) n = len (pairs) input_ids = np.zeros((n, MAX_LENGTH), dtype=np.int32) target_ids = np.zeros((n, MAX_LENGTH), dtype=np.int32) for idx, (inp, tgt) in enumerate (pairs): inp_ids = indexesFromSentence(input_lang, inp) tgt_ids = indexesFromSentence(output_lang, tgt) inp_ids.append(EOS_token) tgt_ids.append(EOS_token) input_ids[idx, :len (inp_ids)] = inp_ids target_ids[idx, :len (tgt_ids)] = tgt_ids train_data = TensorDataset(torch.LongTensor(input_ids).to(device), torch.LongTensor(target_ids).to(device)) train_sampler = RandomSampler(train_data) train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=batch_size) return input_lang, output_lang, train_dataloader
上例中获取到的索引值先是用np包裹成一个n*MAX_LENGTH
的二维矩阵,然后转换成张量,并生成TensorDataset实例,最后用DataLoader加载。
3.2 训练模型 为了训练,我们通过编码器运行输入句子,并跟踪每个输出和最新的隐藏状态。然后,为解码器提供 <SOS>
令牌作为其第一个输入,并将编码器的最后一个隐藏状态作为其第一个隐藏状态。
“Teacher forcing”
是使用真实目标输出作为每个下一个输入的概念,而不是使用解码器的猜测作为下一个输入。使用Teacher forcing
会使其收敛得更快,但当训练好的网络被利用时,它可能会表现出不稳定 。
于 PyTorch 的 autograd 为我们提供了自由,我们可以通过简单的 if 语句随机选择是否使用Teacher forcing
。打开 teacher_forcing_ratio
以使用更多。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 def train_epoch (dataloader, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion ): total_loss = 0 for data in dataloader: input_tensor, target_tensor = data encoder_optimizer.zero_grad() decoder_optimizer.zero_grad() encoder_outputs, encoder_hidden = encoder(input_tensor) decoder_outputs, _, _ = decoder(encoder_outputs, encoder_hidden, target_tensor) loss = criterion( decoder_outputs.view(-1 , decoder_outputs.size(-1 )), target_tensor.view(-1 ) ) loss.backward() encoder_optimizer.step() decoder_optimizer.step() total_loss += loss.item() return total_loss / len (dataloader)
整个训练过程如下所示:
启动计时器
初始化优化器和条件
创建一组训练对
启动用于绘图的空损失数组
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 def train (train_dataloader, encoder, decoder, n_epochs, learning_rate=0.001 , print_every=100 , plot_every=100 ): start = time.time() plot_losses = [] print_loss_total = 0 plot_loss_total = 0 encoder_optimizer = optim.Adam(encoder.parameters(), lr=learning_rate) decoder_optimizer = optim.Adam(decoder.parameters(), lr=learning_rate) criterion = nn.NLLLoss() for epoch in range (1 , n_epochs + 1 ): loss = train_epoch(train_dataloader, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion) print_loss_total += loss plot_loss_total += loss if epoch % print_every == 0 : print_loss_avg = print_loss_total / print_every print_loss_total = 0 print ('%s (%d %d%%) %.4f' % (timeSince(start, epoch / n_epochs), epoch, epoch / n_epochs * 100 , print_loss_avg)) if epoch % plot_every == 0 : plot_loss_avg = plot_loss_total / plot_every plot_losses.append(plot_loss_avg) plot_loss_total = 0 showPlot(plot_losses)
4.评估 评估与训练基本相同,但没有目标,因此我们只是将解码器的预测反馈给每个步骤。每当它预测一个单词时,我们都会将其添加到输出字符串中,如果它预测了EOS,我们就停在那里。我们还存储解码器的注意力输出,以便以后显示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def evaluate (encoder, decoder, sentence, input_lang, output_lang ): with torch.no_grad(): input_tensor = tensorFromSentence(input_lang, sentence) encoder_outputs, encoder_hidden = encoder(input_tensor) decoder_outputs, decoder_hidden, decoder_attn = decoder(encoder_outputs, encoder_hidden) _, topi = decoder_outputs.topk(1 ) decoded_ids = topi.squeeze() decoded_words = [] for idx in decoded_ids: if idx.item() == EOS_token: decoded_words.append('<EOS>' ) break decoded_words.append(output_lang.index2word[idx.item()]) return decoded_words, decoder_attn
我们可以评估训练集中的随机句子,并打印出输入、目标和输出,以做出一些主观的质量判断:
1 2 3 4 5 6 7 8 9 def evaluateRandomly (encoder, decoder, n=10 ): for i in range (n): pair = random.choice(pairs) print ('>' , pair[0 ]) print ('=' , pair[1 ]) output_words, _ = evaluate(encoder, decoder, pair[0 ], input_lang, output_lang) output_sentence = ' ' .join(output_words) print ('<' , output_sentence) print ('' )
5.最终训练 1 2 3 4 5 6 7 8 9 hidden_size = 128 batch_size = 32 input_lang, output_lang, train_dataloader = get_dataloader(batch_size) encoder = EncoderRNN(input_lang.n_words, hidden_size).to(device) decoder = AttnDecoderRNN(hidden_size, output_lang.n_words).to(device) train(train_dataloader, encoder, decoder, 80 , print_every=5 , plot_every=5 )