Learning the LoRA Code
Basics
Dense layer
- The Dense layer is the basic building block for constructing neural networks in Keras; it adds a fully connected layer. Its main parameters include the number of output neurons, the activation function, whether to use a bias, and so on.
```python
model.add(Dense(units,                               # output size (number of neurons)
                activation=None,                     # activation function
                use_bias=True,                       # whether to add a bias
                kernel_initializer='glorot_uniform', # weight matrix initializer
                bias_initializer='zeros',            # bias initializer
                kernel_regularizer=None,             # regularizer for the weight matrix
                bias_regularizer=None))              # regularizer for the bias
```
- When the Dense layer is used as the input layer, the extra parameter `input_dim` is also required.
- Purpose: a Dense layer adds a fully connected layer to the model via `model.add(Dense(...))`.
- Example:

```python
model.add(Dense(512, activation='sigmoid', input_dim=2))
```

`input_dim=2`: the input is an array of shape `(*, 2)`;
`units=512`: the output is an array of shape `(*, 512)`;
Since the output of a Dense layer is Out = Activation(Input · Kernel + Bias), and this layer's input has shape `(*, 2)` while its output has shape `(*, 512)`, Kernel is a `(2, 512)` matrix and Bias is a vector of length 512.
Note: when the rank of the input is at most 2, it is multiplied with the kernel matrix directly; when its rank is greater than 2, it is first flattened and then multiplied with the kernel matrix.
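As a quick check of these shapes, here is a minimal sketch (the batch size of 4 is arbitrary, and attribute printing may vary slightly between Keras versions):

```python
import numpy as np
from keras.layers import Dense
from keras.models import Sequential

model = Sequential()
model.add(Dense(512, activation='sigmoid', input_dim=2))

x = np.random.rand(4, 2)              # a batch of 4 samples, 2 features each
print(model.predict(x).shape)         # (4, 512)
print(model.layers[0].kernel.shape)   # (2, 512)
print(model.layers[0].bias.shape)     # (512,)
```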
- Dense layer parameter count: since the output of a Dense layer is Out = Activation(Input · Kernel + Bias), its parameter count is Param = (number of neurons in the previous layer) × (number of neurons in this layer) + (number of neurons in this layer). The product term counts the Kernel parameters, and the added term counts the Bias parameters.
```python
from keras.layers import Dense
from keras.models import Sequential

model = Sequential()
model.add(Dense(10, input_dim=5))
model.summary()
```
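For this model, Param = 5 × 10 + 10 = 60, so `model.summary()` should report 60 trainable parameters.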
LoRA
- LoRALayer: the base class shared by all the LoRA layers below; it stores the rank `r`, the scaling factor `lora_alpha`, an optional dropout, and the flags that track whether the LoRA weights have been merged into the main weight.
```python
import math
from typing import List

import torch
import torch.nn as nn
import torch.nn.functional as F


class LoRALayer():
    def __init__(
        self,
        r: int,
        lora_alpha: int,
        lora_dropout: float,
        merge_weights: bool,
    ):
        self.r = r
        self.lora_alpha = lora_alpha
        # Optional dropout
        if lora_dropout > 0.:
            self.lora_dropout = nn.Dropout(p=lora_dropout)
        else:
            self.lora_dropout = lambda x: x
        # Mark the weight as unmerged
        self.merged = False
        self.merge_weights = merge_weights
```
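For intuition, the update every LoRA layer below implements is h = W₀x + (α/r)·BAx, where W₀ stays frozen and only A and B are trained. A minimal PyTorch sketch of just this equation (the names and sizes here are illustrative, not part of loralib):

```python
import torch

d_in, d_out, r, alpha = 8, 16, 4, 16
W0 = torch.randn(d_out, d_in)        # frozen pre-trained weight
A = torch.randn(r, d_in) * 0.01      # LoRA down-projection, shape (r, d_in)
B = torch.zeros(d_out, r)            # LoRA up-projection, shape (d_out, r), starts at zero
scaling = alpha / r

x = torch.randn(2, d_in)             # a batch of 2 inputs
h = x @ W0.T + (x @ A.T @ B.T) * scaling   # W0 x + scaling * (B @ A) x
print(h.shape)                       # torch.Size([2, 16])
```

Because B starts at zero, the LoRA branch contributes nothing at initialization, so training starts from exactly the pre-trained model.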
- Embedding
Input embedding: the input token IDs are first turned into vectors by the embedding layer.
Positional embedding: positional encodings are added to represent each token's position in the sentence.
Shared embedding: in some models the input embedding and the output linear layer share weights (e.g. GPT).
| Role of the Embedding | Explanation | Where it appears in this code |
| --- | --- | --- |
| 1️⃣ Map token ID → continuous vector | Converts the input token IDs into dense vectors for the model to use | `nn.Embedding.forward(self, x)` is the main lookup |
| 2️⃣ Learnable parameters | The vector for every token is trainable | `self.weight` is the main embedding matrix; `lora_A` and `lora_B` are the learnable LoRA increments |
| 3️⃣ Capture semantic relations | Similar words end up with similar vectors; this is learned during training | `self.weight` plus the dynamic LoRA increment can learn a finer-grained word representation (fine-tuning rather than updating everything) |
```python
class Embedding(nn.Embedding, LoRALayer):
    # LoRA implemented in a dense layer
    # The model's embedding layer
    def __init__(
        self,
        num_embeddings: int,
        embedding_dim: int,
        r: int = 0,
        lora_alpha: int = 1,
        merge_weights: bool = True,
        **kwargs
    ):
        nn.Embedding.__init__(self, num_embeddings, embedding_dim, **kwargs)
        LoRALayer.__init__(self, r=r, lora_alpha=lora_alpha, lora_dropout=0,
                           merge_weights=merge_weights)
        # Actual trainable parameters
        if r > 0:
            self.lora_A = nn.Parameter(self.weight.new_zeros((r, num_embeddings)))
            self.lora_B = nn.Parameter(self.weight.new_zeros((embedding_dim, r)))
            self.scaling = self.lora_alpha / self.r
            # Freezing the pre-trained weight matrix
            self.weight.requires_grad = False
        self.reset_parameters()

    def reset_parameters(self):
        nn.Embedding.reset_parameters(self)
        if hasattr(self, 'lora_A'):
            # initialize A to zero and B from a normal distribution
            nn.init.zeros_(self.lora_A)
            nn.init.normal_(self.lora_B)

    def train(self, mode: bool = True):
        # Switch between training mode and evaluation mode
        nn.Embedding.train(self, mode)
        if mode:
            # Training mode
            if self.merge_weights and self.merged:
                # Make sure that the weights are not merged
                if self.r > 0:
                    self.weight.data -= (self.lora_B @ self.lora_A).transpose(0, 1) * self.scaling
                self.merged = False
        else:
            # Evaluation mode
            if self.merge_weights and not self.merged:
                # Merge the weights and mark it
                if self.r > 0:
                    self.weight.data += (self.lora_B @ self.lora_A).transpose(0, 1) * self.scaling
                self.merged = True

    def forward(self, x: torch.Tensor):
        # x holds the input token indices (usually of shape [batch_size, seq_len]);
        # it is the input to the original embedding lookup.
        if self.r > 0 and not self.merged:
            # Not merged: original embedding lookup plus the LoRA increment computed on the fly
            result = nn.Embedding.forward(self, x)
            after_A = F.embedding(
                x, self.lora_A.transpose(0, 1), self.padding_idx, self.max_norm,
                self.norm_type, self.scale_grad_by_freq, self.sparse
            )
            result += (after_A @ self.lora_B.transpose(0, 1)) * self.scaling
            return result
        else:
            # Merged (or r == 0): plain table lookup
            return nn.Embedding.forward(self, x)
```
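A quick way to exercise this class (a sketch; it assumes the `LoRALayer` and `Embedding` definitions above are in scope, and the vocabulary size, dimension, and rank are arbitrary):

```python
emb = Embedding(num_embeddings=1000, embedding_dim=64, r=8, lora_alpha=16)

# The pre-trained table is frozen; only the LoRA factors are trainable
print(emb.weight.requires_grad)   # False
print(emb.lora_A.shape)           # torch.Size([8, 1000])
print(emb.lora_B.shape)           # torch.Size([64, 8])

x = torch.randint(0, 1000, (2, 5))   # [batch_size=2, seq_len=5] token IDs
print(emb(x).shape)                  # torch.Size([2, 5, 64])

emb.eval()            # with merge_weights=True this folds B @ A into emb.weight
print(emb.merged)     # True
```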
- MergedLinear: injects trainable low-rank LoRA parameters into a standard `nn.Linear` layer, enabling efficient fine-tuning.

| Parameter | Type | Meaning | Example |
| --- | --- | --- | --- |
| `in_features` | `int` | Input dimension of the linear layer (i.e. the number of columns) | If the input is `[batch, 768]`, this is 768 |
| `out_features` | `int` | Output dimension of the linear layer (i.e. the number of rows) | If the output is `[batch, 3072]`, this is 3072 |
| `r` | `int` | The LoRA rank, which determines the size of the injected low-rank matrices; 0 means LoRA is not used | Typical values are 4 or 8 |
| `lora_alpha` | `int` | Scaling factor controlling the magnitude of the injected LoRA update; the result is multiplied by `lora_alpha / r` | With `r=4` and `alpha=16`, `scaling=4` |
| `lora_dropout` | `float` | Dropout applied before the LoRA branch, for robustness; usually 0 or 0.05 | The value is the drop probability |
| `enable_lora` | `List[bool]` | Whether to enable LoRA for each output head (used for multi-head attention). E.g. `[True, False, True]` means three heads with switches on, off, on | Optional; usually no need to touch it |
| `fan_in_fan_out` | `bool` | Whether the weight matrix is stored transposed (for compatibility with some model implementations) | `True` means the shape of W is reversed |
| `merge_weights` | `bool` | Whether to merge the weights in evaluation mode (for faster inference) | `True` merges them into `weight` |
| `**kwargs` | any | Passed on to the `nn.Linear` constructor, e.g. `bias=True` | Usually left unchanged |
```python
class MergedLinear(nn.Linear, LoRALayer):
    # LoRA implemented in a dense layer
    # The model's merged projection layer (e.g. a fused q/k/v projection)
    def __init__(
        self,
        in_features: int,
        out_features: int,
        r: int = 0,
        lora_alpha: int = 1,
        lora_dropout: float = 0.,
        enable_lora: List[bool] = [False],
        fan_in_fan_out: bool = False,
        merge_weights: bool = True,
        **kwargs
    ):
        nn.Linear.__init__(self, in_features, out_features, **kwargs)
        LoRALayer.__init__(self, r=r, lora_alpha=lora_alpha, lora_dropout=lora_dropout,
                           merge_weights=merge_weights)
        assert out_features % len(enable_lora) == 0, \
            'The length of enable_lora must divide out_features'
        self.enable_lora = enable_lora
        self.fan_in_fan_out = fan_in_fan_out
        # Actual trainable parameters
        if r > 0 and any(enable_lora):
            self.lora_A = nn.Parameter(
                self.weight.new_zeros((r * sum(enable_lora), in_features)))
            self.lora_B = nn.Parameter(
                self.weight.new_zeros((out_features // len(enable_lora) * sum(enable_lora), r))
            )  # weights for Conv1D with groups=sum(enable_lora)
            # lora_A and lora_B cannot mismatch here: the LoRA A and B factors are
            # computed per group and concatenated, and the computation is B @ A @ x
            self.scaling = self.lora_alpha / self.r
            # Freezing the pre-trained weight matrix
            self.weight.requires_grad = False
            # Compute the indices
            self.lora_ind = self.weight.new_zeros(
                (out_features, ), dtype=torch.bool
            ).view(len(enable_lora), -1)
            self.lora_ind[enable_lora, :] = True
            self.lora_ind = self.lora_ind.view(-1)
        self.reset_parameters()
        if fan_in_fan_out:
            self.weight.data = self.weight.data.transpose(0, 1)

    def reset_parameters(self):
        nn.Linear.reset_parameters(self)
        if hasattr(self, 'lora_A'):
            # initialize A the same way as the default for nn.Linear and B to zero
            nn.init.kaiming_uniform_(self.lora_A, a=math.sqrt(5))
            nn.init.zeros_(self.lora_B)

    def zero_pad(self, x):
        result = x.new_zeros((len(self.lora_ind), *x.shape[1:]))
        result[self.lora_ind] = x
        return result

    def merge_AB(self):
        def T(w):
            return w.transpose(0, 1) if self.fan_in_fan_out else w
        delta_w = F.conv1d(
            self.lora_A.unsqueeze(0),
            self.lora_B.unsqueeze(-1),
            groups=sum(self.enable_lora)
        ).squeeze(0)
        return T(self.zero_pad(delta_w))

    def train(self, mode: bool = True):
        def T(w):
            return w.transpose(0, 1) if self.fan_in_fan_out else w
        nn.Linear.train(self, mode)
        if mode:
            if self.merge_weights and self.merged:
                # Make sure that the weights are not merged
                if self.r > 0 and any(self.enable_lora):
                    self.weight.data -= self.merge_AB() * self.scaling
                self.merged = False
        else:
            if self.merge_weights and not self.merged:
                # Merge the weights and mark it
                if self.r > 0 and any(self.enable_lora):
                    self.weight.data += self.merge_AB() * self.scaling
                self.merged = True

    def forward(self, x: torch.Tensor):
        def T(w):
            return w.transpose(0, 1) if self.fan_in_fan_out else w
        if self.merged:
            return F.linear(x, T(self.weight), bias=self.bias)
        else:
            result = F.linear(x, T(self.weight), bias=self.bias)
            if self.r > 0:
                result += self.lora_dropout(x) @ T(self.merge_AB().T) * self.scaling
            return result
```
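A usage sketch for the fused-QKV case that `enable_lora` is designed for (it assumes the classes above are in scope; the 768-dimensional, GPT-2-style fused projection is just an example):

```python
# Fused q/k/v projection: 768 -> 3 * 768, LoRA applied only to the q and v slices
qkv = MergedLinear(
    in_features=768,
    out_features=3 * 768,
    r=8,
    lora_alpha=16,
    enable_lora=[True, False, True],   # q: on, k: off, v: on
)

print(qkv.lora_A.shape)   # torch.Size([16, 768])  -> r * sum(enable_lora) rows
print(qkv.lora_B.shape)   # torch.Size([1536, 8])  -> (768 per group) * 2 enabled groups

x = torch.randn(2, 10, 768)   # [batch, seq_len, hidden]
print(qkv(x).shape)           # torch.Size([2, 10, 2304])

qkv.eval()                    # merge_weights=True -> the LoRA update is folded into qkv.weight
print(qkv.merged)             # True
```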
- ConvLoRA: LoRA applied to convolution layers
```python
class ConvLoRA(nn.Module, LoRALayer):
    def __init__(self, conv_module, in_channels, out_channels, kernel_size,
                 r=0, lora_alpha=1, lora_dropout=0., merge_weights=True, **kwargs):
        super(ConvLoRA, self).__init__()
        self.conv = conv_module(in_channels, out_channels, kernel_size, **kwargs)
        for name, param in self.conv.named_parameters():
            self.register_parameter(name, param)
        LoRALayer.__init__(self, r=r, lora_alpha=lora_alpha, lora_dropout=lora_dropout,
                           merge_weights=merge_weights)
        assert isinstance(kernel_size, int)
        # Actual trainable parameters
        if r > 0:
            self.lora_A = nn.Parameter(
                self.conv.weight.new_zeros((r * kernel_size, in_channels * kernel_size))
            )
            self.lora_B = nn.Parameter(
                self.conv.weight.new_zeros((out_channels // self.conv.groups * kernel_size, r * kernel_size))
            )
            self.scaling = self.lora_alpha / self.r
            # Freezing the pre-trained weight matrix
            self.conv.weight.requires_grad = False
        self.reset_parameters()
        self.merged = False

    def reset_parameters(self):
        self.conv.reset_parameters()
        if hasattr(self, 'lora_A'):
            # initialize A the same way as the default for nn.Linear and B to zero
            nn.init.kaiming_uniform_(self.lora_A, a=math.sqrt(5))
            nn.init.zeros_(self.lora_B)

    def train(self, mode=True):
        super(ConvLoRA, self).train(mode)
        if mode:
            if self.merge_weights and self.merged:
                if self.r > 0:
                    # Make sure that the weights are not merged
                    self.conv.weight.data -= (self.lora_B @ self.lora_A).view(self.conv.weight.shape) * self.scaling
                self.merged = False
        else:
            if self.merge_weights and not self.merged:
                if self.r > 0:
                    # Merge the weights and mark it
                    self.conv.weight.data += (self.lora_B @ self.lora_A).view(self.conv.weight.shape) * self.scaling
                self.merged = True

    def forward(self, x):
        if self.r > 0 and not self.merged:
            return self.conv._conv_forward(
                x,
                self.conv.weight + (self.lora_B @ self.lora_A).view(self.conv.weight.shape) * self.scaling,
                self.conv.bias
            )
        return self.conv(x)


class Conv2d(ConvLoRA):
    def __init__(self, *args, **kwargs):
        super(Conv2d, self).__init__(nn.Conv2d, *args, **kwargs)


class Conv1d(ConvLoRA):
    def __init__(self, *args, **kwargs):
        super(Conv1d, self).__init__(nn.Conv1d, *args, **kwargs)


# Can extend to other ones like this
class Conv3d(ConvLoRA):
    def __init__(self, *args, **kwargs):
        super(Conv3d, self).__init__(nn.Conv3d, *args, **kwargs)
```
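And a sketch for the `Conv2d` wrapper (classes above assumed in scope; the channel counts and kernel size are arbitrary):

```python
conv = Conv2d(in_channels=3, out_channels=16, kernel_size=3,
              r=4, lora_alpha=8, padding=1)

# The pre-trained kernel is frozen; only the low-rank factors are trained
print(conv.conv.weight.requires_grad)   # False
print(conv.lora_A.shape)                # torch.Size([12, 9])  -> (r*k, in_channels*k)
print(conv.lora_B.shape)                # torch.Size([48, 12]) -> (out_channels*k, r*k)

x = torch.randn(2, 3, 32, 32)
print(conv(x).shape)                    # torch.Size([2, 16, 32, 32])
```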
Reference: https://github.com/microsoft/LoRA/tree/main