软件开发公司 实施教程|从零起初用pytorch搭建Transformer模子
图片
作家丨梁云1991开首丨算法好意思食屋剪辑丨极市平台极市导读
超凝视易懂的Transformer初学教程。 >>加入极市CV技巧交流群,走在狡计机视觉的最前沿
前列干货预警:这可能是你概况找到的最容易懂的最具实操性的最系统的学习transformer模子的初学教程。咱们从零起初用pytorch搭建Transformer模子(汉文不错翻译成变形金刚)。
执行它来完了一个道理道理的实例:两数之和。
输入输出肖似如下:
输入:'12345+54321' 输出:'66666'
咱们把这个任务看成念一个机器翻译任务来进行。输入是一个字符序列,输出亦然一个字符序列(seq-to-seq).
这和机器翻译的输入输出结构是肖似的,是以不错用Transformer来作念。
参考贵府:
论文《Attention is All you needed》: https://arxiv.org/pdf/1706.03762.pdf
哈佛博客:https://github.com/harvardnlp/annotated-transformer/
一,准备数据import random import numpy as np import torch from torch.utils.data import Dataset,DataLoader # 界说字典 words_x = '<PAD>,1,2,3,4,5,6,7,8,9,0,<SOS>,<EOS>,+' vocab_x = {word: i for i, word in enumerate(words_x.split(','))} vocab_xr = [k for k, v in vocab_x.items()] #反查辞书 words_y = '<PAD>,1,2,3,4,5,6,7,8,9,0,<SOS>,<EOS>' vocab_y = {word: i for i, word in enumerate(words_y.split(','))} vocab_yr = [k for k, v in vocab_y.items()] #反查辞书#两数相加数据集 def get_data(): # 界说词结合 words = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9'] # 每个词被选中的概率 p = np.array([7, 5, 5, 7, 6, 5, 7, 6, 5, 7]) p = p / p.sum() # 立时采样n1个词作为s1 n1 = random.randint(10, 20) s1 = np.random.choice(words, size=n1, replace=True, p=p) s1 = s1.tolist() # 立时采样n2个词作为s2 n2 = random.randint(10, 20) s2 = np.random.choice(words, size=n2, replace=True, p=p) s2 = s2.tolist() # x等于s1和s2字符上的相加 x = s1 + ['+'] + s2 # y等于s1和s2数值上的相加 y = int(''.join(s1)) + int(''.join(s2)) y = list(str(y)) # 加上首尾象征 x = ['<SOS>'] + x + ['<EOS>'] y = ['<SOS>'] + y + ['<EOS>'] # 补pad到固定长度 x = x + ['<PAD>'] * 50 y = y + ['<PAD>'] * 51 x = x[:50] y = y[:51] # 编码成token token_x = [vocab_x[i] for i in x] token_y = [vocab_y[i] for i in y] # 转tensor tensor_x = torch.LongTensor(token_x) tensor_y = torch.LongTensor(token_y) return tensor_x, tensor_y def show_data(tensor_x,tensor_y) ->'str': words_x = ''.join([vocab_xr[i] for i in tensor_x.tolist()]) words_y = ''.join([vocab_yr[i] for i in tensor_y.tolist()]) return words_x,words_y x,y = get_data() print(x,y,'\n') print(show_data(x,y))
图片
# 界说数据集 class TwoSumDataset(torch.utils.data.Dataset): def __init__(self,size = 100000): super(Dataset, self).__init__() self.size = size def __len__(self): return self.size def __getitem__(self, i): return get_data() ds_train = TwoSumDataset(size = 100000) ds_val = TwoSumDataset(size = 10000) # 数据加载器 dl_train = DataLoader(dataset=ds_train, batch_size=200, drop_last=True, shuffle=True) dl_val = DataLoader(dataset=ds_val, batch_size=200, drop_last=True, shuffle=False)for src,tgt in dl_train: print(src.shape) print(tgt.shape) breaktorch.Size([200, 50])torch.Size([200, 51])二,界说模子
底下,咱们会像搭积木建城堡那样从低往高地构建Transformer模子。
先构建6个基础组件:多头贯注力、前馈采集、层归一化、残差结合、单词镶嵌、位置编码。肖似用最基础的积木块搭建了 墙壁,屋顶,竹篱,厅柱,大门,窗户 这么的模块。
然后用这6个基础组件构建了3个中间制品: 编码器,解码器,产生器。肖似用基础组件构建了城堡的主楼,塔楼,花坛。
终末用这3个中间制品拼装成Tranformer完整模子。肖似用主楼,塔楼,花坛这么的中间制品拼集出一座完整美丽的城堡。
1, 多头贯注力: MultiHeadAttention (用于交融不同单词之间的信息, 三处使用场景,①Encoder self-attention, ② Decoder masked-self-attention, ③ Encoder-Decoder cross-attention)
2, 前馈采集: PositionwiseFeedForward (用于逐位置将多头贯注力交融后的信息进行高维映射变换,简称FFN)
3, 层归一化: LayerNorm (用于踏实输入,每个样本在Sequece和Feature维度归一化,比较BatchNorm更能适合NLP畛域变长序列)
4, 残差结合: ResConnection (用于增强梯度流动以裁汰采集学习难度, 不错先LayerNorm再Add,LayerNorm也不错放在残差Add之后)
5, 单词镶嵌: WordEmbedding (用于编码单词信息,权进犯学习,输出乘了sqrt(d_model)来和位置编码保执相称量级)
6, 位置编码: PositionEncoding (用于编码位置信息,使用sin和cos函数告成编码齐全位置)
7, 编码器: TransformerEncoder (用于将输入Sequence编码成与Sequence等长的memory向量序列, 由N个TransformerEncoderLayer堆叠而成)
8, 解码器: TransformerDecoder (用于将编码器编码的memory向量解码成另一个不定长的向量序列, 由N个TransformerDecoderLayer堆叠而成)
9, 生成器: Generator (用于将解码器解码的向量序列中的每个向量映射成为输出辞书中的词,一般由一个Linear层组成)
10, 变形金刚: Transformer (用于Seq2Seq转码,举例用于机器翻译,接管EncoderDecoder架构,由Encoder, Decoder 和 Generator组成)
import torch from torch import nn import torch.nn.functional as F import copy import math import numpy as np import pandas as pd def clones(module, N): 'Produce N identical layers.' return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])1,多头贯注力 MultiHeadAttention
需要任性承接 ScaledDotProductAttention->MultiHeadAttention->MaskedMultiHeadAttention
先承接什么是 ScaledDotProductAttention,再承接MultiHeadAttention, 然后承接MaskedMultiHeadAttention
class ScaledDotProductAttention(nn.Module): 'Compute 'Scaled Dot Product Attention'' def __init__(self): super(ScaledDotProductAttention, self).__init__() def forward(self,query, key, value, mask=None, dropout=None): d_k = query.size(-1) scores = query@key.transpose(-2,-1) / math.sqrt(d_k) if mask is not None: scores = scores.masked_fill(mask == 0, -1e20) p_attn = F.softmax(scores, dim = -1) if dropout is not None: p_attn = dropout(p_attn) return p_attn@value, p_attn class MultiHeadAttention(nn.Module): def __init__(self, h, d_model, dropout=0.1): 'Take in model size and number of heads.' super(MultiHeadAttention, self).__init__() assert d_model % h == 0 # We assume d_v always equals d_k self.d_k = d_model // h self.h = h self.linears = clones(nn.Linear(d_model, d_model), 4) self.attn = None #纪录 attention矩阵甘休 self.dropout = nn.Dropout(p=dropout) self.attention = ScaledDotProductAttention() def forward(self, query, key, value, mask=None): if mask is not None: # Same mask applied to all h heads. mask = mask.unsqueeze(1) nbatches = query.size(0) # 1) Do all the linear projections in batch from d_model => h x d_k query, key, value = [ l(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2) for l, x in zip(self.linears, (query, key, value)) ] # 2) Apply attention on all the projected vectors in batch. x, self.attn = self.attention(query, key, value, mask=mask, dropout=self.dropout) # 3) 'Concat' using a view and apply a final linear. x = x.transpose(1, 2).contiguous() \ .view(nbatches, -1, self.h * self.d_k) return self.linears[-1](x) #为了让执行过程与解码过程信息流一致,障翳tgt序列背面元素,斥地其贯注力为0 def tril_mask(data): 'Mask out future positions.' size = data.size(-1) #size为序列长度 full = torch.full((1,size,size),1,dtype=torch.int,device=data.device) mask = torch.tril(full).bool() return mask #斥地对<PAD>的贯注力为0 def pad_mask(data, pad=0): 'Mask out pad positions.' mask = (data!=pad).unsqueeze(-2) return mask #狡计一个batch数据的src_mask和tgt_mask class MaskedBatch: 'Object for holding a batch of data with mask during training.' def __init__(self, src, tgt=None, pad=0): self.src = src self.src_mask = pad_mask(src,pad) if tgt is not None: self.tgt = tgt[:,:-1] #执行时,拿tgt的每一个词输入,去忖度下一个词,是以终末一个词无需输入 self.tgt_y = tgt[:, 1:] #第一个老是<SOS>无需忖度,忖度从第二个词起初 self.tgt_mask = \ self.make_tgt_mask(self.tgt, pad) self.ntokens = (self.tgt_y!= pad).sum() @staticmethod def make_tgt_mask(tgt, pad): 'Create a mask to hide padding and future words.' tgt_pad_mask = pad_mask(tgt,pad) tgt_tril_mask = tril_mask(tgt) tgt_mask = tgt_pad_mask & (tgt_tril_mask) return tgt_maskimport plotly.express as px # 测试tril_mask mask = tril_mask(torch.zeros(1,10)) #序列长度为10 #sns.heatmap(mask[0],cmap=sns.cm.rocket); px.imshow(mask[0],color_continuous_scale='blues',height=600,width=600)
图片
纵观大乐透上市至今累计2594期开奖,开出前区跨度10的次数不多,算上本次共计开出22次,该形态占比只有0.84%。
#测试 ScaledDotProductAttention query = torch.tensor([[[0.0,1.414],[1.414,0.0],[1.0,1.0],[-1.0,1.0],[1.0,-1.0]]]) key = query.clone() value = query.clone() attention = ScaledDotProductAttention() #莫得mask out,p_att = attention(query, key, value) fig = px.imshow(p_att[0],color_continuous_scale='blues', title='without mask',height=600,width=600) fig.show()图片
#推敲mask out,p_att = attention(query, key, value, mask = tril_mask(torch.zeros(3,5))) fig = px.imshow(p_att[0],color_continuous_scale='blues', height=600,width=600, title='with mask') fig.show()
图片
# 测试MultiHeadAttention cross_attn = MultiHeadAttention(h=2, d_model=4) cross_attn.eval() q1 = torch.tensor([[[0.1,0.1,0.1,0.1],[0.1,0.3,0.1,0.3]]]) k1 = q1.clone() v1 = q1.clone() tgt_mask = tril_mask(torch.zeros(2,2)) out1 = cross_attn.forward(q1,k1,v1,mask = tgt_mask) print('out1:\n',out1) #篡改序列的第2个元素取值,由于有mask的障翳,不会影响第1个输出 q2 = torch.tensor([[[0.1,0.1,0.1,0.1],[0.4,0.5,0.5,0.8]]]) k2 = q2.clone() v2 = q2.clone() tgt_mask = tril_mask(torch.zeros(2,2)) out2 = cross_attn.forward(q2,k2,v2,mask = tgt_mask) print('out2:\n',out2)图片
# 测试MaskedBatch mbatch = MaskedBatch(src = src,tgt = tgt, pad = 0) print(mbatch.src.shape) print(mbatch.tgt.shape) print(mbatch.tgt_y.shape) print(mbatch.src_mask.shape) print(mbatch.tgt_mask.shape) px.imshow(mbatch.tgt_mask[0],color_continuous_scale='blues',width=600,height=600)
图片
对于Transformer的多头贯注力机制,有几个重点问题,此处作念一些梳理:
(1),Transformer是怎样惩办长距离依赖的问题的?
Transformer是通过引入Scale-Dot-Product贯注力机制来交融序列上不同位置的信息,从而惩办长距离依赖问题。以文本数据为例,在轮回神经采集LSTM结构中,输入序列上相距很远的两个单词无法告成发生交互,只可通过避讳层输出或者细胞景况按照时候要领一个一个向后进行传递。对于两个在序列上相距至极远的单词,中间经过的其它单词让避讳层输出和细胞景况混入了太多的信息,很难灵验地捕捉这种长距离依赖特征。然而在Scale-Dot-Product贯注力机制中,序列上的每个单词都会和其它悉数单词作念一次点积狡计贯注力得分,这种贯注力机制中单词之间的交互是强制的不受距离影响的,是以不错惩办长距离依赖问题。
(2),Transformer在执行和测试阶段不错在时候(序列)维度上进行并行吗?
在执行阶段,Encoder和Decoder在时候(序列)维度都是并行的,在测试阶段,Encoder在序列维度是并行的,Decoder是串行的。
领先,Encoder部分在执行阶段和忖度阶段都不错并行比较好承接,不论在执行如故忖度阶段,它干的事情都是把已知的完整输入编码成memory,在序列维度不错并行。
对于Decoder部分有些玄妙。在忖度阶段Decoder确定是不成并行的,因为Decoder骨子上是一个自纪念,它前边k-1位置的输出会形成第k位的输入的。前边莫得狡计完,背面是拿不到输入的,确定不不错并行。那么执行阶段能否并行呢?诚然执行阶段知说念了全部的解码甘休,然而执行阶段要和忖度阶段一致啊,前边的解码输出不成受到背面解码甘休的影响啊。但Transformer通过在Decoder中深奥地引入Mask手段,使得在用Attention机制作念序列特征交融的时候,每个单词对位于它之后的单词的贯注力得分都为0,这么就保证了前边的解码输出不会受到背面解码甘休的影响,因此Decoder在执行阶段不错在序列维度作念并行。
(3), Scaled-Dot Product Attention为什么要除以 ?
为了幸免 变得很大时 softmax函数的梯度趋于0。假定 和 中的取出的两个向量 和 的每个元素值都是正态立时散播, 数学上不错评释两个零丁的正态立时变量的积依然是一个正态立时变量, 那么两个向量作念点积, 会得到 个正态立时变量的和, 数学上 个正态立时变量的和依然是一个正态立时变量, 其方差是原本的 倍, 法度差是原本的 倍。如若不作念 scale, 当 很大时, 求得的 元素的齐全值容易很大, 导致落在 softmax的极点区域(趋于 0 或者 1 ), 极点区域softmax函数的梯度值趋于 0 , 不利于模子学习。除以 , 正值作念了归一, 不受 变化影响。
(4),MultiHeadAttention的参数数目和head数目有何相干?
MultiHeadAttention的参数数目和head数目无关。多头贯注力的参数来自对QKV的三个变换矩阵以及多头甘休concat后的输出变换矩阵。假定镶嵌向量的长度是d_model, 一共有h个head. 对每个head,这三个变换矩阵的尺寸都是 d_model×(d_model/h),是以h个head总的参数数目等于3×d_model×(d_model/h)×h = 3×d_model×d_model。它们的输出向量长度都形成 d_model/h,经过attention作用后向量长度保执,h个head的输出拼接到一皆后向量长度如故d_model,是以终末输出变换矩阵的尺寸是d_model×d_model。因此,MultiHeadAttention的参数数目为 4×d_model×d_model,和head数目无关。
2,前馈采集: PositionwiseFeedForward用于逐位置将多头贯注力交融后的信息进行高维映射变换,简称FFN。
FFN仅有两个线性层,第一层将模子向量维度 从 d_model(512) 升到 d_ff(2048), 第二层再降回 d_model(512)
两个线性层之间加了一个0.1的Dropout
class PositionwiseFeedForward(nn.Module): 'Implements FFN equation.' def __init__(self, d_model, d_ff, dropout=0.1): super(PositionwiseFeedForward, self).__init__() self.linear1 = nn.Linear(d_model, d_ff) #线性层默许作用在终末一维度 self.linear2 = nn.Linear(d_ff, d_model) self.dropout = nn.Dropout(dropout) def forward(self, x): return self.linear2(self.dropout(F.relu(self.linear1(x)))) 3,层归一化:LayerNorm在视觉畛域,归一化一般用BatchNorm,然而在NLP畛域,归一化一般用LayerNorm。
这是由于NLP畛域的输入时时是不等长的Sequence,使用BatchNorm会让较长的Sequence输入的背面特征概况使用的参与归一化的样本数太少,让输入变得不踏实。
同期合并个Sequence的被PADDING填充的特征也会因BatchNorm得到不同的非零值,这对模子至极不友好。
比较之下,LayerNorm老是对一个样本我方的特征进行归一化,莫得上述问题。
class LayerNorm(nn.Module): 'Construct a layernorm module (similar to torch.nn.LayerNorm).' def __init__(self, features, eps=1e-6): super(LayerNorm, self).__init__() self.weight = nn.Parameter(torch.ones(features)) self.bias = nn.Parameter(torch.zeros(features)) self.eps = eps def forward(self, x): mean = x.mean(-1, keepdim=True) std = x.std(-1, keepdim=True) return self.weight * (x - mean) / (std + self.eps) + self.bias4,残差结合:ResConnection
用于增强梯度流动以裁汰采集学习难度。
ResConnection 包括LayerNorm和Add残差结合操作, LayerNorm不错放在最起初(norm_first=True),也不错放在终末(norm_first=False)。
《Attention is All you needed》论文原文是残差结合之后再 LayerNorm,但背面的一些研究发现最起初的时候就LayerNorm更好一些。
残差结合对于执行深度采集至关进犯。有好多研究残差结合(ResNet)作用机制,解释它为什么灵验的著作,主要的一些不雅点如下。
1,残差结合增强了梯度流动。直不雅上看,loss端的梯度概况通过进步结合快速传递到不同深度的各个层,增强了梯度流动,裁汰了采集的学习难度。数学上看,残差块的导数 f(x)=x+h(x) 为 f'(x)=1+h'(x) 在1.0隔邻,幸免了梯度隐匿问题。
2,残差结合缩小了采集退化。一个采集层h(x)不错用一个变换矩阵H来示意,由于好多神经元有交流的反馈气象,h(x)等价的变换矩阵H可能有好多行是线性接洽的,这使得H的行列式为0,H为非可逆矩阵,h(x)会导致采集的退化和信息丢失。但增多了残差结合之后,开发软件需要多少钱呢f(x)=x+h(x)对应的变换矩阵F=H+I,单元阵I排斥了H中接洽行的线性接洽性,缩小了退化的可能。
3,残差结合完了了模子集成。如若将执行好的ResNet的一些block移除,模子的忖度精度并不会崩溃式着落,然而如若将执行好的VGG的一些block移除,模子的忖度精度会雪崩。这证实ResNet中的各个Block肖似基模子,ResNet通过残差结合将它们整合成了一个ensemble集成模子,增强了泛化智商。
4,残差结合增强了抒发智商。使用残差块构建的深层采集所代表的函数簇结合是浅层采集所代表的的函数簇结合的超集,抒发智商更强,是以不错通过添加残差块不停推论模子抒发智商。如若不使用残差结合,一个一层的采集f(x) = h1(x) 所能示意的函数簇 不一定能被一个二层的采集 f(x) = h2(h1(x))所覆盖,然而使用残差结合后,f(x) = h1(x)+h2(h1(x))一定不错覆盖一层的采集所示意的函数簇,只须h2的全部权重取0即可。
参考:https://zhuanlan.zhihu.com/p/165350103
class ResConnection(nn.Module): ''' A residual connection with a layer norm. Note the norm is at last according to the paper, but it may be better at first. ''' def __init__(self, size, dropout, norm_first=True): super(ResConnection, self).__init__() self.norm = LayerNorm(size) self.dropout = nn.Dropout(dropout) self.norm_first = norm_first def forward(self, x, sublayer): 'Apply residual connection to any sublayer with the same size.' if self.norm_first: return x + self.dropout(sublayer(self.norm(x))) else: return self.norm(x + self.dropout(sublayer(x))) 5,单词镶嵌: WordEmbedding(权进犯学习)用于编码单词信息,权进犯学习,输出乘了sqrt(d_model)来和位置编码保执相称量级。
当d_model越大的时候,字据 nn.init.xavier_uniform 运行化计谋运行化的权重取值会越小。
# 单词镶嵌 class WordEmbedding(nn.Module): def __init__(self, d_model, vocab): super(WordEmbedding, self).__init__() self.embedding = nn.Embedding(vocab, d_model) self.d_model = d_model def forward(self, x): return self.embedding(x) * math.sqrt(self.d_model) #note here, multiply sqrt(d_model)6,位置编码:PositionEncoding(告成编码)
PositionEncoding用于编码位置信息,使用sin和cos函数告成编码齐全位置。
单词和单词次第对谈话意旨都至极进犯。
'你欠我1000块钱'和'我欠你1000块钱'是由完全交流的单词组成,但由于词的次第不同,含义截然不同。
在Transformer之前,一般用RNN模子来处理句子序列。
RNN模子自己蕴含了对次第的建模,单词是按照它们在句子中的当然次第一个个地被RNN单元处理,逐一地被编码。
但Transformer是并行地处理句子中的单词的,穷乏单词的位置信息表征。
为了灵验地表征单词的位置信息,Transformer瞎想了位置编码 PositionalEncoding,并添加到模子的输入中。
于是,Transformer 用单词镶嵌(权进犯学习)向量 和位置编码(告成编码)向量 之和 来示意输入。
怎样构造位置编码呢?即怎样 把 pos = 0,1,2,3,4,5,... 这么的位置序列映射成为 一个一个的向量呢?
Transformer瞎想了基于正弦函数和余弦函数的位置编码方法。
图片
这种编码方法有以下几个优点:
1,编码值散播在[-1,1]之间,这么的散播对神经采集是比较友好的。
2,编码了齐全位置信息,对于0<=pos<=2_pi_10000,每个pos的位置编码向量都是不雷同的。
更多位置编码的规划参考如下博客:《
让研究东说念主员静思默想的Transformer位置编码》
https://kexue.fm/archives/8130
# 位置编码 class PositionEncoding(nn.Module): 'Implement the PE function.' def __init__(self, d_model, dropout, max_len=5000): super(PositionEncoding, self).__init__() self.dropout = nn.Dropout(p=dropout) # Compute the positional encodings once in log space. pe = torch.zeros(max_len, d_model) position = torch.arange(0, max_len).unsqueeze(1) div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model)) pe[:, 0::2] = torch.sin(position * div_term) pe[:, 1::2] = torch.cos(position * div_term) pe = pe.unsqueeze(0) self.register_buffer('pe', pe) def forward(self, x): x = x + self.pe[:, :x.size(1)] return self.dropout(x)pe = PositionEncoding(120, 0) z = pe.forward(torch.zeros(1, 100, 120)) df = pd.DataFrame(z[0, :, [0,20,60,110]].data.numpy(),columns = ['dim'+c for c in ['0','20','60','110']]) df.insert(0,'x',np.arange(100)) px.line(df, x = 'x',y = ['dim'+c for c in ['0','20','60','110']]).show()
图片
px.imshow(np.squeeze(z.data.numpy()) ,color_continuous_scale='blues',width=1000,height=800)图片
7, 编码器: TransformerEncoder用于将输入Sequence编码成与Sequence等长的memory向量序列, 由N个TransformerEncoderLayer堆叠而成
class TransformerEncoderLayer(nn.Module): 'TransformerEncoderLayer is made up of self-attn and feed forward (defined below)' def __init__(self, size, self_attn, feed_forward, dropout): super(TransformerEncoderLayer, self).__init__() self.self_attn = self_attn self.feed_forward = feed_forward self.res_layers = clones(ResConnection(size, dropout), 2) self.size = size def forward(self, x, mask): 'Follow Figure 1 (left) for connections.' x = self.res_layers[0](x, lambda x: self.self_attn(x, x, x, mask)) return self.res_layers[1](x, self.feed_forward) class TransformerEncoder(nn.Module): 'TransformerEncoder is a stack of N TransformerEncoderLayer' def __init__(self, layer, N): super(TransformerEncoder, self).__init__() self.layers = clones(layer, N) self.norm = LayerNorm(layer.size) def forward(self, x, mask): 'Pass the input (and mask) through each layer in turn.' for layer in self.layers: x = layer(x, mask) return self.norm(x) @classmethod def from_config(cls,N=6,d_model=512, d_ff=2048, h=8, dropout=0.1): attn = MultiHeadAttention(h, d_model) ff = PositionwiseFeedForward(d_model, d_ff, dropout) layer = TransformerEncoderLayer(d_model, attn, ff, dropout) return cls(layer,N)from torchkeras import summary src_embed = nn.Sequential(WordEmbedding(d_model=32, vocab = len(vocab_x)), PositionEncoding(d_model=32, dropout=0.1)) encoder = TransformerEncoder.from_config(N=3,d_model=32, d_ff=128, h=8, dropout=0.1) src_mask = pad_mask(src) memory = encoder(*[src_embed(src),src_mask]) summary(encoder,input_data_args = [src_embed(src),src_mask]); 8,解码器:TransformerDecoder
用于将编码器编码的memory向量解码成另一个不定长的向量序列, 由N个TransformerDecoderLayer堆叠而成
class TransformerDecoderLayer(nn.Module): 'TransformerDecoderLayer is made of self-attn, cross-attn, and feed forward (defined below)' def __init__(self, size, self_attn, cross_attn, feed_forward, dropout): super(TransformerDecoderLayer, self).__init__() self.size = size self.self_attn = self_attn self.cross_attn = cross_attn self.feed_forward = feed_forward self.res_layers = clones(ResConnection(size, dropout), 3) def forward(self, x, memory, src_mask, tgt_mask): 'Follow Figure 1 (right) for connections.' m = memory x = self.res_layers[0](x, lambda x: self.self_attn(x, x, x, tgt_mask)) x = self.res_layers[1](x, lambda x: self.cross_attn(x, m, m, src_mask)) return self.res_layers[2](x, self.feed_forward)class TransformerDecoder(nn.Module): 'Generic N layer decoder with masking.' def __init__(self, layer, N): super(TransformerDecoder, self).__init__() self.layers = clones(layer, N) self.norm = LayerNorm(layer.size) def forward(self, x, memory, src_mask, tgt_mask): for layer in self.layers: x = layer(x, memory, src_mask, tgt_mask) return self.norm(x) @classmethod def from_config(cls,N=6,d_model=512, d_ff=2048, h=8, dropout=0.1): self_attn = MultiHeadAttention(h, d_model) cross_attn = MultiHeadAttention(h, d_model) ff = PositionwiseFeedForward(d_model, d_ff, dropout) layer = TransformerDecoderLayer(d_model, self_attn, cross_attn, ff, dropout) return cls(layer,N)
from torchkeras import summary mbatch = MaskedBatch(src=src,tgt=tgt,pad=0) src_embed = nn.Sequential(WordEmbedding(d_model=32, vocab = len(vocab_x)), PositionEncoding(d_model=32, dropout=0.1)) encoder = TransformerEncoder.from_config(N=3,d_model=32, d_ff=128, h=8, dropout=0.1) memory = encoder(src_embed(src),mbatch.src_mask) tgt_embed = nn.Sequential(WordEmbedding(d_model=32, vocab = len(vocab_y)), PositionEncoding(d_model=32, dropout=0.1)) decoder = TransformerDecoder.from_config(N=3,d_model=32, d_ff=128, h=8, dropout=0.1) result = decoder.forward(tgt_embed(mbatch.tgt),memory,mbatch.src_mask,mbatch.tgt_mask) summary(decoder,input_data_args = [tgt_embed(mbatch.tgt),memory, mbatch.src_mask,mbatch.tgt_mask]);decoder.eval() mbatch.tgt[0][1]=8 result = decoder.forward(tgt_embed(mbatch.tgt),memory,mbatch.src_mask,mbatch.tgt_mask) print(torch.sum(result[0][0])) mbatch.tgt[0][1]=7 result = decoder.forward(tgt_embed(mbatch.tgt),memory,mbatch.src_mask,mbatch.tgt_mask) print(torch.sum(result[0][0])) 9,生成器: Generator
用于将解码器解码输出的向量序列中的每个向量逐一映射成为输出辞书中各个词的取词概率。一般由一个Linear层接F.log_softmax组成,比较简单。接F.log_softmax而不接F.softmax的原因是对于一些荒芜小的概率如1e-100,在精度遏抑要求下,F.log_softmax概况愈加准确地示意其大小。
class Generator(nn.Module): 'Define standard linear + softmax generation step.' def __init__(self, d_model, vocab): super(Generator, self).__init__() self.proj = nn.Linear(d_model, vocab) def forward(self, x): return F.log_softmax(self.proj(x), dim=-1)generator = Generator(d_model = 32, vocab = len(vocab_y)) log_probs = generator(result) probs = torch.exp(log_probs) print('output_probs.shape:',probs.shape) print('sum(probs)=1:') print(torch.sum(probs,dim = -1)[0]) summary(generator,input_data = result); 10,变形金刚:Transformer
用于Seq2Seq转码,举例用于机器翻译,接管EncoderDecoder架构,由Encoder, Decoder 和 Generator组成
from torch import nn class Transformer(nn.Module): ''' A standard Encoder-Decoder architecture. Base for this and many other models. ''' def __init__(self, encoder, decoder, src_embed, tgt_embed, generator): super(Transformer, self).__init__() self.encoder = encoder self.decoder = decoder self.src_embed = src_embed self.tgt_embed = tgt_embed self.generator = generator self.reset_parameters() def forward(self, src, tgt, src_mask, tgt_mask): 'Take in and process masked src and target sequences.' return self.generator(self.decode(self.encode(src, src_mask), src_mask, tgt, tgt_mask)) def encode(self, src, src_mask): return self.encoder(self.src_embed(src), src_mask) def decode(self, memory, src_mask, tgt, tgt_mask): return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask) @classmethod def from_config(cls,src_vocab,tgt_vocab,N=6,d_model=512, d_ff=2048, h=8, dropout=0.1): encoder = TransformerEncoder.from_config(N=N,d_model=d_model, d_ff=d_ff, h=h, dropout=dropout) decoder = TransformerDecoder.from_config(N=N,d_model=d_model, d_ff=d_ff, h=h, dropout=dropout) src_embed = nn.Sequential(WordEmbedding(d_model, src_vocab), PositionEncoding(d_model, dropout)) tgt_embed = nn.Sequential(WordEmbedding(d_model, tgt_vocab), PositionEncoding(d_model, dropout)) generator = Generator(d_model, tgt_vocab) return cls(encoder, decoder, src_embed, tgt_embed, generator) def reset_parameters(self): for p in self.parameters(): if p.dim() > 1: nn.init.xavier_uniform_(p)from torchkeras import summary net = Transformer.from_config(src_vocab = len(vocab_x),tgt_vocab = len(vocab_y), N=2, d_model=32, d_ff=128, h=8, dropout=0.1) mbatch = MaskedBatch(src=src,tgt=tgt,pad=0) summary(net,input_data_args = [mbatch.src,mbatch.tgt,mbatch.src_mask,mbatch.tgt_mask]);
图片
图片
图片
三,执行模子Transformer的执行主要用到了以下两个手段:
1,学习率调解: Learning Rate Scheduler (用于升迁模子学习踏实性。作念法是学习率先warm up线性增长,再按照 1/sqrt(step) 章程任性着落)
2,标签平滑: Label Smoothing. (用于让模子愈加齐集在对分类造作的样本的学习,而不是扩大照旧分类正确样本中正负样本忖度差距。作念法是将正例标签由1改成0.1,负例标签由0改成0.9/vocab_size)
先容了用这两个方法封装的 Optimizer和 Loss 后,咱们进一步完了完整执行代码。
3,完整执行代码。
1,学习率调解:Learning Rate Scheduler用于升迁模子学习踏实性。
作念法是学习率先warm up线性增长,再按照 1/sqrt(step) 章程任性着落。
学习率的warm up为何灵验呢?
一种解释性不雅点是以为这概况让模子运行学习时参数牢固变化并幸免对起初的几个batch数据过拟合堕入局部最优。
由于刚学习时,loss比较大,梯度会很大,如若学习率也很大,两者相乘会更大,那么模子参数会跟着不同batch数据的各异剧烈抖动,无法灵验地学习,也容易对起初的几个batch数据过拟合,后期很难拉回来。
比及模子学习了一些时候,loss变小了,梯度也会小,学习率调大,两者相乘也不会很大,模子依然不错牢固灵验地学习。
后期为何又要让调低学习率呢?
这是因为后期模子loss照旧很小了,在最优参数隔邻了,如若学习率过大,容易在最优参数隔邻动荡,无法面对最优参数。
参考:https://www.zhihu.com/question/338066667
#注1:此处通过接纳方法将学习率调解计谋融入Optimizer #注2:NoamOpt中的Noam是论文作家之一的名字 #注3:学习率是按照step而非epoch去篡改的 class NoamOpt(torch.optim.AdamW): def __init__(self, params, model_size=512, factor=1.0, warmup=4000, lr=0, betas=(0.9, 0.98), eps=1e-9, weight_decay=0, amsgrad=False): super(NoamOpt,self).__init__(params, lr=lr, betas=betas, eps=eps, weight_decay=weight_decay, amsgrad=amsgrad) self._step = 0 self.warmup = warmup self.factor = factor self.model_size = model_size def step(self,closure=None): 'Update parameters and rate' self._step += 1 rate = self.rate() for p in self.param_groups: p['lr'] = rate super(NoamOpt,self).step(closure=closure) def rate(self, step = None): 'Implement `lrate` above' if step is None: step = self._step return self.factor * \ (self.model_size ** (-0.5) * min(step * self.warmup ** (-1.5),step ** (-0.5))) optimizer = NoamOpt(net.parameters(), &nbnbsp;model_size=net.src_embed[0].d_model, factor=1.0, warmup=400)import plotly.express as px opts = [NoamOpt(net.parameters(),model_size=512, factor =1, warmup=4000), NoamOpt(net.parameters(),model_size=512, factor=1, warmup=8000), NoamOpt(net.parameters(),model_size=256, factor=1, warmup=4000)] steps = np.arange(1, 20000) rates = [[opt.rate(i) for opt in opts] for i in steps] dfrates = pd.DataFrame(rates,columns = ['512:4000', '512:8000', '256:4000']) dfrates['steps'] = steps fig = px.line(dfrates,x='steps',y=['512:4000', '512:8000', '256:4000']) fig.layout.yaxis.title = 'lr' fig
图片
2,标签平滑:Label Smoothing用于让模子愈加齐集在对分类造作的样本的学习,而不是扩大照旧分类正确样本中正负样本忖度差距。
作念法是将正例标签由1改成0.1,负例标签由0改成0.9/vocab_size
多分类一般用softmax激活函数,要让模子对正例标签忖度值为1横蛮常窒碍的,那需要输出正无限才不错.
对负例标签忖度值为0也横蛮常窒碍的,那需要输出负无限才不错。
但骨子上咱们不需要模子那么信服,只须正例标签的忖度值比负例标签大就行了。
因此不错作念标签平滑,让模子无谓汉典地无限扩大分类正确样本中正负样本之间的忖度差距,而是齐集在对分类造作的样本的学习。
由于在激活函数中照旧接管了F.log_softmax, 是以耗损函数不成用nn.CrossEntropyLoss,而需要使用 nn.NLLoss.
(注:nn.LogSoftmax + nn.NLLLoss = nn.CrossEntropyLoss)
同期由于使用了标签平滑,接管nn.NLLoss时耗损的最小值无法形成0,需要扣除标签散播自己的熵,耗损函数进一步形成 nn.KLDivLoss.
在接管标签平滑的时候,nn.KLDivLoss和nn.NLLoss的梯度交流,优化着力交流,但其最小值是0,更允洽咱们对耗损的直不雅承接。
class LabelSmoothingLoss(nn.Module): 'Implement label smoothing.' def __init__(self, size, padding_idx, smoothing=0.0): #size为辞书大小 super(LabelSmoothingLoss, self).__init__() self.criterion = nn.KLDivLoss(reduction='sum') self.padding_idx = padding_idx self.confidence = 1.0 - smoothing self.smoothing = smoothing self.size = size self.true_dist = None def forward(self, x, target): assert x.size(1) == self.size true_dist = x.data.clone() true_dist.fill_(self.smoothing / (self.size - 2)) #忖度甘休不会是<SOS> #和<PAD> true_dist.scatter_(1, target.data.unsqueeze(1), self.confidence) true_dist[:, self.padding_idx] = 0 mask = torch.nonzero((target.data == self.padding_idx).int()) if mask.dim() > 0: true_dist.index_fill_(0, mask.squeeze(), 0.0) self.true_dist = true_dist return self.criterion(x, true_dist)# Example of label smoothing. smooth_loss = LabelSmoothingLoss(5, 0, 0.4) predict = torch.FloatTensor([[1e-10, 0.2, 0.7, 0.1, 1e-10], [1e-10, 0.2, 0.7, 0.1, 1e-10], [1e-10, 0.2, 0.7, 0.1, 1e-10]]) loss = smooth_loss(predict.log(), torch.LongTensor([2, 1, 0])) print('smoothed target:\n',smooth_loss.true_dist,'\n') print('loss:',loss) px.imshow(smooth_loss.true_dist,color_continuous_scale='blues',height=600,width=1000)
smoothed target: tensor([[0.0000, 0.1333, 0.6000, 0.1333, 0.1333], [0.0000, 0.6000, 0.1333, 0.1333, 0.1333], [0.0000, 0.0000, 0.0000, 0.0000, 0.0000]]) loss: tensor(5.9712)3,完整执行代码
有了优化器和Loss后,咱们便不错执行模子了。
咱们先全体试算loss和metric,然后再套上torchkeras的执行模版。
#全体历程试算 for src,tgt in dl_train: break mbatch = MaskedBatch(src=src,tgt=tgt,pad = 0) net = Transformer.from_config(src_vocab = len(vocab_x),tgt_vocab = len(vocab_y), N=3, d_model=64, d_ff=128, h=8, dropout=0.1) #loss loss_fn = LabelSmoothingLoss(size=len(vocab_y), padding_idx=0, smoothing=0.2) preds = net.forward(mbatch.src, mbatch.tgt, mbatch.src_mask, mbatch.tgt_mask) preds = preds.reshape(-1, preds.size(-1)) labels = mbatch.tgt_y.reshape(-1) loss = loss_fn(preds, labels)/mbatch.ntokens print('loss=',loss.item()) #metric preds = preds.argmax(dim=-1).view(-1)[labels!=0] labels = labels[labels!=0] acc = (preds==labels).sum()/(labels==labels).sum() print('acc=',acc.item())loss= 2.1108953952789307acc= 0.08041179925203323from torchmetrics import Accuracy #使用torchmetrics中的办法 accuracy = Accuracy(task='multiclass',num_classes=len(vocab_y)) accuracy.update(preds,labels) print('acc=',accuracy.compute().item())
acc= 0.08041179925203323
底下使用咱们的梦中情炉来完了最优雅的执行轮回~
from torchkeras import KerasModel class StepRunner: def __init__(self, net, loss_fn, accelerator=None, stage = 'train', metrics_dict = None, optimizer = None, lr_scheduler = None ): self.net,self.loss_fn,self.metrics_dict,self.stage = net,loss_fn,metrics_dict,stage self.optimizer,self.lr_scheduler = optimizer,lr_scheduler self.accelerator = accelerator if self.stage=='train': self.net.train() else: self.net.eval() def __call__(self, batch): src,tgt = batch mbatch = MaskedBatch(src=src,tgt=tgt,pad = 0) #loss with self.accelerator.autocast(): preds = net.forward(mbatch.src, mbatch.tgt, mbatch.src_mask, mbatch.tgt_mask) preds = preds.reshape(-1, preds.size(-1)) labels = mbatch.tgt_y.reshape(-1) loss = loss_fn(preds, labels)/mbatch.ntokens #filter padding preds = preds.argmax(dim=-1).view(-1)[labels!=0] labels = labels[labels!=0] #backward() if self.stage=='train' and self.optimizer is not None: self.accelerator.backward(loss) if self.accelerator.sync_gradients: self.accelerator.clip_grad_norm_(self.net.parameters(), 1.0) self.optimizer.step() if self.lr_scheduler is not None: self.lr_scheduler.step() self.optimizer.zero_grad() all_loss = self.accelerator.gather(loss).sum() all_preds = self.accelerator.gather(preds) all_labels = self.accelerator.gather(labels) #losses (or plain metrics that can be averaged) step_losses = {self.stage+'_loss':all_loss.item()} step_metrics = {self.stage+'_'+name:metric_fn(all_preds, all_labels).item() for name,metric_fn in self.metrics_dict.items()} if self.stage=='train': if self.optimizer is not None: step_metrics['lr'] = self.optimizer.state_dict()['param_groups'][0]['lr'] else: step_metrics['lr'] = 0.0 return step_losses,step_metrics KerasModel.StepRunner = StepRunnerfrom torchmetrics import Accuracy net = Transformer.from_config(src_vocab = len(vocab_x),tgt_vocab = len(vocab_y), N=5, d_model=64, d_ff=128, h=8, dropout=0.1)loss_fn = LabelSmoothingLoss(size=len(vocab_y), padding_idx=0, smoothing=0.1) metrics_dict = {'acc':Accuracy(task='multiclass',num_classes=len(vocab_y))} optimizer = NoamOpt(net.parameters(),model_size=64) model = KerasModel(net, loss_fn=loss_fn, metrics_dict=metrics_dict, optimizer = optimizer) model.fit( train_data=dl_train, val_data=dl_val, epochs=100, ckpt_path='checkpoint', patience=10, monitor='val_acc', mode='max', callbacks=None, plot=True )
图片
四,使用模子底下使用有打算法进行翻译推理过程。
和执行过程不错通过掩码障翳翌日token,从而完了一个句子在序列长度场合并行执行不同。
翻译推理过程唯一先翻译了前边的内容,添加到输出中,智力够翻译背面的内容,这个过程是无法在序列维度并行的。
Decoder&Generator第k位的输出骨子上对应的是 已知 输入编码后的memory和前k位Deocder输入(解码序列)
的情况下解码序列第k+1位取 输出辞书中各个词的概率。
有打算法是获取解码甘休的简化决策,工程实施当中一般使用束搜索方法(Beam Search)
参考:《十分钟读懂Beam Search》 https://zhuanlan.zhihu.com/p/114669778
def greedy_decode(net, src, src_mask, max_len, start_symbol): net.eval() memory = net.encode(src, src_mask) ys = torch.full((len(src),max_len),start_symbol,dtype = src.dtype).to(src.device) for i in range(max_len-1): out = net.generator(net.decode(memory, src_mask, ys, tril_mask(ys))) ys[:,i+1]=out.argmax(dim=-1)[:,i] return ys def get_raw_words(tensor,vocab_r) ->'str': words = [vocab_r[i] for i in tensor.tolist()] return words def get_words(tensor,vocab_r) ->'str': s = ''.join([vocab_r[i] for i in tensor.tolist()]) words = s[:s.find('<EOS>')].replace('<SOS>','') return words def prepare(x,accelerator=model.accelerator): return x.to(accelerator.device)##解码翻译甘休 net = model.net net.eval() net = prepare(net) src,tgt = get_data() src,tgt = prepare(src),prepare(tgt) mbatch = MaskedBatch(src=src.unsqueeze(dim=0),tgt=tgt.unsqueeze(dim=0)) y_pred = greedy_decode(net,mbatch.src,mbatch.src_mask,50,vocab_y['<SOS>']) print('input:') print(get_words(mbatch.src[0],vocab_xr),'\n') #标签甘休 print('ground truth:') print(get_words(mbatch.tgt[0],vocab_yr),'\n') #标签甘休 print('prediction:') print(get_words(y_pred[0],vocab_yr)) #解码忖度甘休,原始标签中<PAD>位置的忖度不错忽略
input: 744905345112863593+7323038062936802655
小程序开发ground truth: 8067943408049666248
prediction: 8067943408049666248
五,评估模子咱们执行过程中监控的acc骨子上是字符级别的acc,当今咱们来狡计样本级别的准确率。
from tqdm.auto import tqdm net = prepare(net) loop = tqdm(range(1,201)) correct = 0 for i in loop: src,tgt = get_data() src,tgt = prepare(src),prepare(tgt) mbatch = MaskedBatch(src=src.unsqueeze(dim=0),tgt=tgt.unsqueeze(dim=0)) y_pred = greedy_decode(net,mbatch.src,mbatch.src_mask,50,vocab_y['<SOS>']) inputs = get_words(mbatch.src[0],vocab_xr) #标签甘休 gt = get_words(mbatch.tgt[0],vocab_yr) #标签甘休 preds = get_words(y_pred[0],vocab_yr) #解码忖度甘休,原始标签中<PAD>位置的忖度不错忽略 if preds==gt: correct+=1 loop.set_postfix(acc = correct/i) print('acc=',correct/len(loop))
图片
perfect软件开发公司,基本齐备完了两数之和。
本站仅提供存储管事,悉数内容均由用户发布,如发现存害或侵权内容,请点击举报。