transformer语言翻译!🌞

transformer语言翻译

1.数据来源和处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
SRC_LANGUAGE = 'de'
TGT_LANGUAGE = 'en'

token_transform = {}
vocab_transform = {}

token_transform[SRC_LANGUAGE] = get_tokenizer('spacy', language='de_core_news_md-3.7.0')
token_transform[TGT_LANGUAGE] = get_tokenizer('spacy', language='en_core_web_md-3.7.1')


def yield_tokens(data_iter: Iterable, language: str) -> List[str]:
language_index = {SRC_LANGUAGE: 0, TGT_LANGUAGE: 1}

for data_sample in data_iter:
tokenized_text = token_transform[language](data_sample[language_index[language]])
# print(tokenized_text)
yield tokenized_text


UNK_IDX, PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2, 3

special_symbols = ['<unk>', '<pad>', '<bos>', '<eos>']

train_path_en = 'train.en'
train_path_de = 'train.de'
data_pipe_train_en = dp.iter.IterableWrapper([train_path_en])
data_pipe_train_de = dp.iter.IterableWrapper([train_path_de])

train_src_data_dp = dp.iter.FileOpener(data_pipe_train_de, encoding="utf-8").readlines(
return_path=False, strip_newline=True
)

train_tgt_data_dp = dp.iter.FileOpener(data_pipe_train_en, encoding="utf-8").readlines(
return_path=False, strip_newline=True
)

train_iter = train_src_data_dp.zip(train_tgt_data_dp).shuffle().set_shuffle(False).sharding_filter()

# for sample in train_iter:
# print(sample)

for ln in [SRC_LANGUAGE, TGT_LANGUAGE]:
vocab_transform[ln] = build_vocab_from_iterator(yield_tokens(train_iter, ln),
min_freq=1,
specials=special_symbols,
special_first=True)

for ln in [SRC_LANGUAGE, TGT_LANGUAGE]:
vocab_transform[ln].set_default_index(UNK_IDX)

# print(vocab_transform[TGT_LANGUAGE].get_stoi())

2.使用 Transformer 的 Seq2Seq 网络

2.1 位置编码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


# print(DEVICE.type)

class PositionalEncoding(nn.Module):
def __init__(self,
emb_size: int,
dropout: float,
maxlen: int = 5000):
super().__init__()
# (emb_size/2)的一维张量
den = torch.exp(- torch.arange(0, emb_size, 2) * math.log(10000) / emb_size)
# (5000, 1)的二维张量
pos = torch.arange(0, maxlen).reshape(maxlen, 1)
# (5000, emb_size)的二维全0张量
pos_embedding = torch.zeros((maxlen, emb_size))
pos_embedding[:, 0::2] = torch.sin(pos * den)
pos_embedding[:, 1::2] = torch.cos(pos * den)
pos_embedding = pos_embedding.unsqueeze(-2)

self.dropout = nn.Dropout(dropout)
self.register_buffer('pos_embedding', pos_embedding)

def forward(self, token_embedding: Tensor):
# pos_embedding[:token_embedding.size(0), :],其中token_embedding.size(0)是获取输入字符串的长度,所以在初始pos_embedding时会用maxlen的大小
return self.dropout(token_embedding + self.pos_embedding[:token_embedding.size(0), :])

tips:

1.一旦使用 self.register_buffer('pos_embedding', pos_embedding)pos_embedding 注册为模型的缓冲区,就可以使用 self.pos_embedding 来调用它。

2.为什么要注册到缓冲区?在 PyTorch 中,模型的参数通常是通过 nn.Parameter 对象进行管理的,这些参数会随着模型的训练而更新。然而,并非所有的模型参数都需要在训练过程中被更新。有些参数是在模型的初始化阶段就固定下来的,比如在 Transformer 模型中的位置编码。

对于这种不需要更新的固定参数,将它们注册为模型的缓冲区是一个比较好的做法,这样做有几个好处:

  1. 状态保存和加载:注册为缓冲区的参数会被包含在模型的状态字典(state_dict)中,因此在保存模型时,这些参数会自动保存。在加载模型时,这些参数也会被自动加载,而不需要额外的处理。
  2. GPU/CPU 转移:当模型移动到 GPU 或者 CPU 上时,注册为缓冲区的参数也会自动跟着移动,而不需要额外的处理。这样可以使得代码更具有通用性,不需要针对不同设备编写不同的逻辑。
  3. 代码可读性和可维护性:通过将不需要更新的参数注册为缓冲区,可以更加清晰地表达模型结构。这样可以使得代码更易于理解和维护。

综上所述,将不需要更新的固定参数注册为模型的缓冲区是一种良好的实践,能够提高代码的可读性、可维护性,并且能够自动处理状态保存、加载和设备转移等问题。

2.2 字符编码

1
2
3
4
5
6
7
8
9
class TokenEmbedding(nn.Module):
def __init__(self, vocab_size: int, emb_size):
super().__init__()
self.embedding = nn.Embedding(vocab_size, emb_size)
self.emb_size = emb_size

def forward(self, tokens: Tensor):
# embedding的输入是一个张量
return self.embedding(tokens.long()) * math.sqrt(self.emb_size)

tips:

1.为什么要用tokens.long()?在PyTorch中,nn.Embedding层的输入需要是长整型(LongTensor)类型的数据。tokens.long() 的作用就是将输入的 tokens 张量中的数据类型转换为长整型。这是因为在实际应用中,tokens 往往是表示词汇表中某个词的索引,索引一般是整数类型,因此需要将其转换为长整型,以便与 nn.Embedding 层兼容。

2.为什么要乘math.sqrt(self.emb_size)?math.sqrt(self.emb_size) 被用来对嵌入向量进行缩放操作,可能是为了控制嵌入向量的数值范围或方差。

2.3 Transformer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class Seq2SeqTransformer(nn.Module):
def __init__(self,
num_encoder_layers: int,
num_decoder_layers: int,
emb_size: int,
nhead: int,
src_vocab_size: int,
tgt_vocab_size: int,
dim_feedforward: int = 512,
dropout: float = 0.1):
super().__init__()
self.transformer = Transformer(d_model=emb_size,
nhead=nhead,
num_encoder_layers=num_encoder_layers,
num_decoder_layers=num_decoder_layers,
dim_feedforward=dim_feedforward,
dropout=dropout)
# 初始化一个线性层作为生成器,用于将 Transformer 输出转换为目标语言词汇表的大小。
self.generator = nn.Linear(emb_size, tgt_vocab_size)
self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)
self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)
self.positional_encoding = PositionalEncoding(
emb_size, dropout=dropout)

def forward(self,
src: Tensor,
trg: Tensor,
src_mask: Tensor,
tgt_mask: Tensor,
src_padding_mask: Tensor,
tgt_padding_mask: Tensor,
memory_key_padding_mask: Tensor):
src_emb = self.positional_encoding(self.src_tok_emb(src))
tgt_emb = self.positional_encoding(self.tgt_tok_emb(trg))
outs = self.transformer(src_emb, tgt_emb, src_mask, tgt_mask, None,
src_padding_mask, tgt_padding_mask, memory_key_padding_mask)
return self.generator(outs)

def encode(self, src: Tensor, src_mask: Tensor):
return self.transformer.encoder(self.positional_encoding(
self.src_tok_emb(src)), src_mask)

def decode(self, tgt: Tensor, memory: Tensor, tgt_mask: Tensor):
return self.transformer.decoder(self.positional_encoding(
self.tgt_tok_emb(tgt)), memory,
tgt_mask)

tips:

在 Seq2Seq 模型中,通常分为编码器 (Encoder) 和解码器 (Decoder) 两个部分。编码器负责将输入序列编码为一个语义空间中的表示,而解码器则根据这个表示生成输出序列。

虽然在 forward 方法中已经调用了 Transformer 模型进行编码器-解码器的处理,但是在某些情况下,我们可能需要分别对编码器和解码器进行操作,这就是为什么额外定义了 encodedecode 方法的原因。

  1. 编码器操作encode 方法允许我们单独对输入序列进行编码,而不需要进行解码器的操作。这在某些情况下是有用的,比如对于一些无需生成输出的任务,只需要输入序列的表示即可。
  2. 解码器操作decode 方法允许我们在给定编码后的记忆的情况下,单独对目标序列进行解码。这在一些场景下也是有用的,比如基于已有的语义表示生成一些补充信息,或者在解码器训练中进行推理。

通过将编码器和解码器的操作分别定义成方法,可以使模型更加灵活,可以根据需求进行更细粒度的操作。

2.4 单词掩码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def generate_square_subsequent_mask(sz):
mask = (torch.triu(torch.ones((sz, sz), device=DEVICE)) == 1).transpose(0, 1)
mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
return mask


def create_mask(src, tgt):
src_seq_len = src.shape[0]
tgt_seq_len = tgt.shape[0]

tgt_mask = generate_square_subsequent_mask(tgt_seq_len)
src_mask = torch.zeros((src_seq_len, src_seq_len),device=DEVICE).type(torch.bool)

src_padding_mask = (src == PAD_IDX).transpose(0, 1)
tgt_padding_mask = (tgt == PAD_IDX).transpose(0, 1)
return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask

假如:

1
2
src = torch.tensor([[2, 3, 5, 1, 0], [4, 2, 1, 0, 0], [3, 2, 1, 5, 0]])
tgt = torch.tensor([[2, 4, 3, 1, 0], [3, 1, 4, 0, 0]])

则输出结果为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
tensor([[0., -inf, -inf, -inf],
[0., 0., -inf, -inf],
[0., 0., 0., -inf],
[0., 0., 0., 0.]], device='cuda:0')
src_mask:
tensor([[False, False, False],
[False, False, False],
[False, False, False]], device='cuda:0')

tgt_mask:
tensor([[0., -inf],
[0., 0.]], device='cuda:0')

src_padding_mask:
tensor([[False, False, False],
[False, False, False],
[False, True, True],
[ True, False, False],
[False, False, False]])

tgt_padding_mask:
tensor([[False, False],
[False, True],
[False, False],
[ True, False],
[False, False]])

2.5 实例化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
torch.manual_seed(0)

SRC_VOCAB_SIZE = len(vocab_transform[SRC_LANGUAGE])
TGT_VOCAB_SIZE = len(vocab_transform[TGT_LANGUAGE])
EMB_SIZE = 512
NHEAD = 8
FFN_HID_DIM = 512
BATCH_SIZE = 128
NUM_ENCODER_LAYERS = 3
NUM_DECODER_LAYERS = 3

transformer = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, EMB_SIZE,
NHEAD, SRC_VOCAB_SIZE, TGT_VOCAB_SIZE, FFN_HID_DIM)

# 权重初始化
for p in transformer.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)

transformer = transformer.to(DEVICE)

loss_fn = torch.nn.CrossEntropyLoss(ignore_index=PAD_IDX)

optimizer = torch.optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

tips:

权重初始化时,这段代码遍历了 Transformer 模型中的所有参数,如果参数的维度大于 1(即不是偏置项),则使用 Xavier 均匀分布进行初始化。Xavier 初始化旨在使得每一层的输出方差保持相等,从而避免梯度消失或爆炸问题,有助于加速模型的收敛。

3.整理

3.1 数据转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

from torch.nn.utils.rnn import pad_sequence

# 对输入的字符串进行转化,*transforms分别表示进行分词,数字化并且添加开始符和结束符
def sequential_transforms(*transforms):
def func(txt_input):
for transform in transforms:
txt_input = transform(txt_input)
return txt_input
return func

# 添加开始符和结束符
def tensor_transform(token_ids: List[int]):
return torch.cat((torch.tensor([BOS_IDX]),
torch.tensor(token_ids),
torch.tensor([EOS_IDX])))

# 对源字符串和目标字符串进行转换
text_transform = {}
for ln in [SRC_LANGUAGE, TGT_LANGUAGE]:
text_transform[ln] = sequential_transforms(token_transform[ln], # 分词
vocab_transform[ln], # 数字化
tensor_transform) # 添加开始和终止符


# 对批量的数据进行填充,填充的长度按批次中最大长度进行处理
def collate_fn(batch):
src_batch, tgt_batch = [], []
for src_sample, tgt_sample in batch:
src_batch.append(text_transform[SRC_LANGUAGE](src_sample.rstrip("\n")))
tgt_batch.append(text_transform[TGT_LANGUAGE](tgt_sample.rstrip("\n")))

src_batch = pad_sequence(src_batch, padding_value=PAD_IDX)
tgt_batch = pad_sequence(tgt_batch, padding_value=PAD_IDX)
return src_batch, tgt_batch

# 将数据集用数据加载器包裹
train_dataloader = DataLoader(train_iter, batch_size=BATCH_SIZE, collate_fn=collate_fn)

# 输出第一个批次的结果
for src, tgt in train_dataloader:
print(src)
print(tgt)
print(src.shape, tgt.shape)
break

输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
tensor([[ 2,  2,  2,  ...,  2,  2,  2],
[21, 84, 5, ..., 21, 14, 14],
[85, 31, 69, ..., 46, 38, 17],
...,
[ 1, 1, 1, ..., 1, 1, 1],
[ 1, 1, 1, ..., 1, 1, 1],
[ 1, 1, 1, ..., 1, 1, 1]])
tensor([[ 2, 2, 2, ..., 2, 2, 2],
[ 19, 165, 6, ..., 19, 6, 6],
[ 25, 36, 61, ..., 52, 39, 16],
...,
[ 1, 1, 1, ..., 1, 1, 1],
[ 1, 1, 1, ..., 1, 1, 1],
[ 1, 1, 1, ..., 1, 1, 1]])
torch.Size([27, 128]) torch.Size([24, 128])

tips:

如果padded_sequences中没加 batch_first=True,则填充过程中张量会发生转置。

举例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import torch
from torch.nn.utils.rnn import pad_sequence

# 假设我们有一组序列张量
sequences = [torch.tensor([1, 2, 3]),
torch.tensor([4, 5]),
torch.tensor([6, 7, 8, 9])]

# 使用pad_sequence进行填充
padded_sequences = pad_sequence(sequences, padding_value=0)

print(padded_sequences)

# 没有 batch_first=True 的输出
tensor([[1, 4, 6],
[2, 5, 7],
[3, 0, 8],
[0, 0, 9]])

# 有 batch_first=True 的输出
padded_sequences = pad_sequence(sequences, batch_first=True, padding_value=0)

tensor([[1, 2, 3, 0],
[4, 5, 0, 0],
[6, 7, 8, 9]])

3.2 训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def train_epoch(model, optimizer):
model.train()
losses = 0
train_iter = Multi30k(split='train', language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))
train_dataloader = DataLoader(train_iter, batch_size=BATCH_SIZE, collate_fn=collate_fn)

for src, tgt in train_dataloader:
src = src.to(DEVICE)
tgt = tgt.to(DEVICE)

# 解码器的输入,输入序列是包括开始符但不包括结束符
tgt_input = tgt[:-1, :]

src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)

logits = model(src, tgt_input, src_mask, tgt_mask,src_padding_mask, tgt_padding_mask, src_padding_mask)

optimizer.zero_grad()

# 解码器的输出,输出包括终止符但不包括开始符
tgt_out = tgt[1:, :]

# 将模型的输出和应该有的输出进行损失的计算
loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
loss.backward()

optimizer.step()
losses += loss.item()

return losses / len(list(train_dataloader))

tips:

1.logits的shape为torch.Size([23, 128, 10837]),也就是经过一个线性层之后,会输出10837的维度,也就是输出词表的长度。

2.为什么最后一层不用softmax?通常,在使用交叉熵损失函数时,softmax 操作会被包含在损失函数中。因此,在训练过程中,我们通常不需要显式地在模型中加入 softmax 操作。

举例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import torch
import torch.nn as nn

# 假设模型输出的 logits 是一个包含三个类别的向量
logits = torch.tensor([[2.0, 1.0, 0.1], [0.9, 2.0, 1.0]])

# 假设目标标签是 [0, 2]
targets = torch.tensor([0, 2])

# 创建交叉熵损失函数
loss_function = nn.CrossEntropyLoss()

# 计算损失
loss = loss_function(logits, targets)

print(loss)

在上述示例中,logits 是模型的输出,targets 是目标标签。在计算损失时,我们只需要将 logits 和 targets 传递给交叉熵损失函数 nn.CrossEntropyLoss(),而不需要显式地在模型中使用 softmax 激活函数。交叉熵损失函数内部将 logits 应用 softmax 操作,并计算损失。

3.3 评估

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def evaluate(model):
model.eval()
losses = 0

val_iter = Multi30k(split='valid', language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))
val_dataloader = DataLoader(val_iter, batch_size=BATCH_SIZE, collate_fn=collate_fn)

for src, tgt in val_dataloader:
src = src.to(DEVICE)
tgt = tgt.to(DEVICE)

tgt_input = tgt[:-1, :]

src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)

logits = model(src, tgt_input, src_mask, tgt_mask,src_padding_mask, tgt_padding_mask, src_padding_mask)

tgt_out = tgt[1:, :]
loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
losses += loss.item()

return losses / len(list(val_dataloader))

4.开始

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from timeit import default_timer as timer
NUM_EPOCHS = 18

for epoch in range(1, NUM_EPOCHS+1):
start_time = timer()
train_loss = train_epoch(transformer, optimizer)
end_time = timer()
val_loss = evaluate(transformer)
print((f"Epoch: {epoch}, Train loss: {train_loss:.3f}, Val loss: {val_loss:.3f}, "f"Epoch time = {(end_time - start_time):.3f}s"))


# function to generate output sequence using greedy algorithm
def greedy_decode(model, src, src_mask, max_len, start_symbol):
src = src.to(DEVICE)
src_mask = src_mask.to(DEVICE)

memory = model.encode(src, src_mask)
ys = torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(DEVICE)
for i in range(max_len-1):
memory = memory.to(DEVICE)
tgt_mask = (generate_square_subsequent_mask(ys.size(0))
.type(torch.bool)).to(DEVICE)
out = model.decode(ys, memory, tgt_mask)
out = out.transpose(0, 1)
prob = model.generator(out[:, -1])
_, next_word = torch.max(prob, dim=1)
next_word = next_word.item()

ys = torch.cat([ys,
torch.ones(1, 1).type_as(src.data).fill_(next_word)], dim=0)
if next_word == EOS_IDX:
break
return ys


# actual function to translate input sentence into target language
def translate(model: torch.nn.Module, src_sentence: str):
model.eval()
src = text_transform[SRC_LANGUAGE](src_sentence).view(-1, 1)
num_tokens = src.shape[0]
src_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool)
tgt_tokens = greedy_decode(
model, src, src_mask, max_len=num_tokens + 5, start_symbol=BOS_IDX).flatten()
return " ".join(vocab_transform[TGT_LANGUAGE].lookup_tokens(list(tgt_tokens.cpu().numpy()))).replace("<bos>", "").replace("<eos>", "")

transformer语言翻译!🌞
https://yangchuanzhi20.github.io/2024/03/19/人工智能/Pytorch/项目实战/transformer语言翻译/
作者
白色很哇塞
发布于
2024年3月19日
许可协议