
torch.nn&LoRALayer

Working through the code behind LoRA.

Basics

Dense layer

  • The Dense layer is a basic building block for neural networks in Keras; it adds a fully connected layer. Its main parameters include:

    the number of output neurons, the activation function, whether to use a bias, and so on.

    
    model.add(Dense(units,                  # output size (number of neurons)
                    activation=None,        # activation function
                    use_bias=True,          # whether to add a bias
                    kernel_initializer='glorot_uniform',  # weight-matrix initializer
                    bias_initializer='zeros',             # bias initializer
                    kernel_regularizer=None,              # regularizer for the weight matrix
                    bias_regularizer=None,)               # regularizer for the bias
              )
    

    When Dense is used as the input layer, an additional input_dim argument is required.

  • Purpose

    The Dense layer adds a neural-network layer to a model via model.add(Dense()).

  • Example

    
    model.add(Dense(512, activation='sigmoid', input_dim=2))
    

    input_dim=2: the input is an array of shape (*, 2);

    units=512: the output is an array of shape (*, 512);

    Since the Dense layer computes Out = Activation(Input · Kernel + Bias), and here Input has shape (*, 2) while Out has shape (*, 512), the Kernel is a (2, 512) matrix and the Bias is a vector of length 512.

    Note: when the rank of the input is at most 2, it is multiplied directly with the weight matrix; when the rank is greater than 2, it is first flattened and then multiplied with the weight matrix.
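
    The same computation expressed with torch.nn, as a quick shape check (a minimal sketch of my own; note that nn.Linear stores its weight as (out_features, in_features), i.e. the transpose of the Keras Kernel):

    import torch
    import torch.nn as nn

    layer = nn.Linear(2, 512)           # Keras equivalent of Dense(512, input_dim=2)
    x = torch.randn(8, 2)               # a batch of 8 samples, shape (*, 2)
    y = torch.sigmoid(layer(x))         # Activation(Input · Kernel + Bias)
    print(layer.weight.shape, layer.bias.shape, y.shape)
    # torch.Size([512, 2]) torch.Size([512]) torch.Size([8, 512])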

  • Dense layer parameter count: since the Dense layer computes Out = Activation(Input · Kernel + Bias), the parameter count is Param = (number of neurons in the previous layer) × (number of neurons in this layer) + (number of neurons in this layer). The first term is the number of Kernel parameters, and the added term (the number of neurons in this layer) is the number of Bias parameters.

    
    from keras.layers import Dense
    from keras.models import Sequential
    model=Sequential()
    model.add(Dense(10,input_dim=5))
    model.summary()
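
    For the example above the formula gives 5 × 10 + 10 = 60, so model.summary() should report 60 trainable parameters for this Dense layer.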
    

LoRA

  • LoRALayer

    
    # Shared imports needed by the code blocks below (the classes come from loralib/layers.py in microsoft/LoRA)
    import math
    from typing import List

    import torch
    import torch.nn as nn
    import torch.nn.functional as F


    class LoRALayer():
        def __init__(
            self, 
            r: int, 
            lora_alpha: int, 
            lora_dropout: float,
            merge_weights: bool,
        ):
            self.r = r
            self.lora_alpha = lora_alpha
            # Optional dropout
            if lora_dropout > 0.:
                self.lora_dropout = nn.Dropout(p=lora_dropout)
            else:
                self.lora_dropout = lambda x: x
            # Mark the weight as unmerged
            self.merged = False
            self.merge_weights = merge_weights
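
    LoRALayer itself is only a mixin that stores the hyper-parameters; the subclasses below all apply the same low-rank update weight + (B @ A) * scaling with scaling = lora_alpha / r. A minimal shape sketch of that idea (the names W, A, B, d_in, d_out are mine, not from the repo):

    import torch

    d_out, d_in, r, lora_alpha = 512, 512, 8, 16
    W = torch.randn(d_out, d_in)        # frozen pretrained weight
    A = torch.randn(r, d_in)            # plays the role of lora_A
    B = torch.zeros(d_out, r)           # plays the role of lora_B (zero-init, so the delta starts at 0)
    scaling = lora_alpha / r

    W_eff = W + (B @ A) * scaling       # what "merging" writes into weight.data
    x = torch.randn(4, d_in)
    y = x @ W_eff.T                     # same as x @ W.T + ((x @ A.T) @ B.T) * scaling
    print(y.shape)                      # torch.Size([4, 512])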
    
  • Embedding

    Input embeddings: the input token IDs are first turned into vectors by the embedding layer.

    Positional embeddings: positional encodings are added to represent each token's position in the sentence.

    Tied embeddings: some models share the input embedding weights with the output projection layer (e.g., GPT).

    What an embedding does, and where it shows up in this code (a usage sketch follows the class definition below):
    1. Maps token IDs to continuous vectors: the input token IDs are converted into dense vectors for the model; nn.Embedding.forward(self, x) is the main lookup operation.
    2. Learnable parameters: the vector for every token is trainable; self.weight is the main embedding matrix, while lora_A and lora_B are the learnable LoRA increments.
    3. Captures semantic relations: similar words end up with similar vectors through training; self.weight plus the dynamic LoRA delta can learn a finer-grained representation (fine-tuning instead of updating everything).
    
    class Embedding(nn.Embedding, LoRALayer):
        # LoRA implemented in a dense layer
        # The model's embedding layer
        def __init__(
            self,
            num_embeddings: int,
            embedding_dim: int,
            r: int = 0,
            lora_alpha: int = 1,
            merge_weights: bool = True,
            **kwargs
        ):
            nn.Embedding.__init__(self, num_embeddings, embedding_dim, **kwargs)
            LoRALayer.__init__(self, r=r, lora_alpha=lora_alpha, lora_dropout=0,
                               merge_weights=merge_weights)
            # Actual trainable parameters
            if r > 0:
                self.lora_A = nn.Parameter(self.weight.new_zeros((r, num_embeddings)))
                self.lora_B = nn.Parameter(self.weight.new_zeros((embedding_dim, r)))
                self.scaling = self.lora_alpha / self.r
                # Freezing the pre-trained weight matrix
                self.weight.requires_grad = False
            self.reset_parameters()
    
        def reset_parameters(self):
            nn.Embedding.reset_parameters(self)
            if hasattr(self, 'lora_A'):
                # for the Embedding layer, A is initialized to zero and B from a normal
                # distribution (the reverse of the Linear case, where A is random and B is zero)
                nn.init.zeros_(self.lora_A)
                nn.init.normal_(self.lora_B)
    
        def train(self, mode: bool = True):
            # switch between training mode and evaluation mode
            nn.Embedding.train(self, mode)
            if mode:
                # training mode: make sure the LoRA delta is not merged into the weight
                if self.merge_weights and self.merged:
                    # Make sure that the weights are not merged
                    if self.r > 0:
                        self.weight.data -= (self.lora_B @ self.lora_A).transpose(0, 1) * self.scaling
                    self.merged = False
            else:
                # evaluation mode: merge the LoRA delta into the weight for faster inference
                if self.merge_weights and not self.merged:
                    # Merge the weights and mark it
                    if self.r > 0:
                        self.weight.data += (self.lora_B @ self.lora_A).transpose(0, 1) * self.scaling
                    self.merged = True
    
        def forward(self, x: torch.Tensor):
            # x holds the input token indices (typically of shape [batch_size, seq_len]),
            # i.e. the input to the original embedding lookup
            if self.r > 0 and not self.merged:  # not merged: original lookup plus a LoRA delta computed on the fly
                result = nn.Embedding.forward(self, x)
                after_A = F.embedding(
                    x, self.lora_A.transpose(0, 1), self.padding_idx, self.max_norm,
                    self.norm_type, self.scale_grad_by_freq, self.sparse
                )
                result += (after_A @ self.lora_B.transpose(0, 1)) * self.scaling
                return result
            else:  # merged (or r == 0): plain table lookup using the already-merged weight
                return nn.Embedding.forward(self, x)
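
    A quick usage sketch for this Embedding class (my own example, not from the repo; it assumes the imports and the LoRALayer class shown earlier):

    emb = Embedding(num_embeddings=1000, embedding_dim=64, r=4, lora_alpha=8)
    x = torch.randint(0, 1000, (2, 10))   # token ids, shape [batch_size, seq_len]
    out = emb(x)                          # shape [2, 10, 64]

    emb.eval()                            # merge_weights=True, so B @ A is folded into weight here
    merged_out = emb(x)                   # now a plain lookup; same values as before
    print(out.shape, torch.allclose(out, merged_out, atol=1e-6))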
    
  • MergedLinear: injects trainable low-rank LoRA parameters into a standard nn.Linear layer, enabling efficient fine-tuning. Its constructor arguments are listed below (a usage sketch follows the code):

    in_features (int): input dimension of the linear layer. For inputs of shape [batch, 768] this is 768.
    out_features (int): output dimension of the linear layer. For outputs of shape [batch, 3072] this is 3072.
    r (int): the LoRA rank, which sets the size of the injected low-rank matrices; 0 disables LoRA. Typical values are 4 or 8.
    lora_alpha (int): scaling factor controlling the magnitude of the LoRA update; the delta is multiplied by lora_alpha / r. With r=4 and alpha=16, scaling=4.
    lora_dropout (float): dropout applied on the LoRA path for robustness; usually 0 or 0.05.
    enable_lora (List[bool]): whether LoRA is enabled for each output group (used for fused multi-head projections). For example, [True, False, True] means three groups with LoRA on/off/on.
    fan_in_fan_out (bool): whether the stored weight matrix is transposed (for compatibility with implementations such as Conv1D-style layers); True means W is stored transposed.
    merge_weights (bool): whether to merge the LoRA weights into weight in evaluation mode, for faster inference.
    **kwargs: passed through to the nn.Linear constructor, e.g. bias=True.
    
    class MergedLinear(nn.Linear, LoRALayer):
        # LoRA implemented in a dense layer
        # typically used for a fused projection (e.g. the attention Q, K, V layer)
        def __init__(
            self, 
            in_features: int, 
            out_features: int, 
            r: int = 0, 
            lora_alpha: int = 1, 
            lora_dropout: float = 0.,
            enable_lora: List[bool] = [False],
            fan_in_fan_out: bool = False,
            merge_weights: bool = True,
            **kwargs
        ):
            nn.Linear.__init__(self, in_features, out_features, **kwargs)
            LoRALayer.__init__(self, r=r, lora_alpha=lora_alpha, lora_dropout=lora_dropout,
                               merge_weights=merge_weights)
            assert out_features % len(enable_lora) == 0, \
                'The length of enable_lora must divide out_features'
            self.enable_lora = enable_lora
            self.fan_in_fan_out = fan_in_fan_out
            # Actual trainable parameters
            if r > 0 and any(enable_lora):
                self.lora_A = nn.Parameter(
                    self.weight.new_zeros((r * sum(enable_lora), in_features)))
                self.lora_B = nn.Parameter(
                    self.weight.new_zeros((out_features // len(enable_lora) * sum(enable_lora), r))
                ) # weights for Conv1D with groups=sum(enable_lora)
                # lora_A and lora_B stay shape-compatible because A and B are computed per
                # enabled group and then concatenated; the effective computation is B @ A @ x
                self.scaling = self.lora_alpha / self.r
                # Freezing the pre-trained weight matrix
                self.weight.requires_grad = False
                # Compute the indices
                self.lora_ind = self.weight.new_zeros(
                    (out_features, ), dtype=torch.bool
                ).view(len(enable_lora), -1)
                self.lora_ind[enable_lora, :] = True
                self.lora_ind = self.lora_ind.view(-1)
            self.reset_parameters()
            if fan_in_fan_out:
                self.weight.data = self.weight.data.transpose(0, 1)
    
        def reset_parameters(self):
            nn.Linear.reset_parameters(self)
            if hasattr(self, 'lora_A'):
                # initialize A the same way as the default for nn.Linear and B to zero
                nn.init.kaiming_uniform_(self.lora_A, a=math.sqrt(5))
                nn.init.zeros_(self.lora_B)
    
        def zero_pad(self, x):
            result = x.new_zeros((len(self.lora_ind), *x.shape[1:]))
            result[self.lora_ind] = x
            return result
    
        def merge_AB(self):
            def T(w):
                return w.transpose(0, 1) if self.fan_in_fan_out else w
            delta_w = F.conv1d(
                self.lora_A.unsqueeze(0), 
                self.lora_B.unsqueeze(-1), 
                groups=sum(self.enable_lora)
            ).squeeze(0)
            return T(self.zero_pad(delta_w))
    
        def train(self, mode: bool = True):
            def T(w):
                return w.transpose(0, 1) if self.fan_in_fan_out else w
            nn.Linear.train(self, mode)
            if mode:
                if self.merge_weights and self.merged:
                    # Make sure that the weights are not merged
                    if self.r > 0 and any(self.enable_lora):
                        self.weight.data -= self.merge_AB() * self.scaling
                    self.merged = False
            else:
                if self.merge_weights and not self.merged:
                    # Merge the weights and mark it
                    if self.r > 0 and any(self.enable_lora):
                        self.weight.data += self.merge_AB() * self.scaling
                    self.merged = True        
    
        def forward(self, x: torch.Tensor):
            def T(w):
                return w.transpose(0, 1) if self.fan_in_fan_out else w
            if self.merged:
                return F.linear(x, T(self.weight), bias=self.bias)
            else:
                result = F.linear(x, T(self.weight), bias=self.bias)
                if self.r > 0:
                    result += self.lora_dropout(x) @ T(self.merge_AB().T) * self.scaling
                return result
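
    A usage sketch for a fused attention projection (my own example; the 768 / 3·768 sizes are purely illustrative):

    # LoRA on the Q and V slices of a fused QKV projection; the K slice stays frozen
    qkv = MergedLinear(
        in_features=768,
        out_features=3 * 768,
        r=8,
        lora_alpha=16,
        enable_lora=[True, False, True],
    )
    x = torch.randn(2, 10, 768)
    print(qkv(x).shape)                             # torch.Size([2, 10, 2304])
    print([n for n, p in qkv.named_parameters() if p.requires_grad])
    # ['bias', 'lora_A', 'lora_B'] -- the pretrained weight is frozen

    (The repo additionally provides lora.mark_only_lora_as_trainable(model) to freeze everything except the LoRA parameters across a whole model.)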
    
  • ConvLoRA: LoRA applied to convolutional layers. The base class wraps a convolution module, and Conv1d/Conv2d/Conv3d are thin subclasses (see the usage sketch after the code).

    
    class ConvLoRA(nn.Module, LoRALayer):
        def __init__(self, conv_module, in_channels, out_channels, kernel_size, r=0, lora_alpha=1, lora_dropout=0., merge_weights=True, **kwargs):
            super(ConvLoRA, self).__init__()
            self.conv = conv_module(in_channels, out_channels, kernel_size, **kwargs)
            for name, param in self.conv.named_parameters():
                self.register_parameter(name, param)
            LoRALayer.__init__(self, r=r, lora_alpha=lora_alpha, lora_dropout=lora_dropout, merge_weights=merge_weights)
            assert isinstance(kernel_size, int)
            # Actual trainable parameters
            if r > 0:
                self.lora_A = nn.Parameter(
                    self.conv.weight.new_zeros((r * kernel_size, in_channels * kernel_size))
                )
                self.lora_B = nn.Parameter(
                  self.conv.weight.new_zeros((out_channels//self.conv.groups*kernel_size, r*kernel_size))
                )
                self.scaling = self.lora_alpha / self.r
                # Freezing the pre-trained weight matrix
                self.conv.weight.requires_grad = False
            self.reset_parameters()
            self.merged = False
    
        def reset_parameters(self):
            self.conv.reset_parameters()
            if hasattr(self, 'lora_A'):
                # initialize A the same way as the default for nn.Linear and B to zero
                nn.init.kaiming_uniform_(self.lora_A, a=math.sqrt(5))
                nn.init.zeros_(self.lora_B)
    
        def train(self, mode=True):
            super(ConvLoRA, self).train(mode)
            if mode:
                if self.merge_weights and self.merged:
                    if self.r > 0:
                        # Make sure that the weights are not merged
                        self.conv.weight.data -= (self.lora_B @ self.lora_A).view(self.conv.weight.shape) * self.scaling
                    self.merged = False
            else:
                if self.merge_weights and not self.merged:
                    if self.r > 0:
                        # Merge the weights and mark it
                        self.conv.weight.data += (self.lora_B @ self.lora_A).view(self.conv.weight.shape) * self.scaling
                    self.merged = True
    
        def forward(self, x):
            if self.r > 0 and not self.merged:
                return self.conv._conv_forward(
                    x, 
                    self.conv.weight + (self.lora_B @ self.lora_A).view(self.conv.weight.shape) * self.scaling,
                    self.conv.bias
                )
            return self.conv(x)
    
    class Conv2d(ConvLoRA):
        def __init__(self, *args, **kwargs):
            super(Conv2d, self).__init__(nn.Conv2d, *args, **kwargs)
    
    class Conv1d(ConvLoRA):
        def __init__(self, *args, **kwargs):
            super(Conv1d, self).__init__(nn.Conv1d, *args, **kwargs)
    
    # Can be extended to other conv types in the same way
    
    class Conv3d(ConvLoRA):
        def __init__(self, *args, **kwargs):
            super(Conv3d, self).__init__(nn.Conv3d, *args, **kwargs)
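
    A usage sketch for the Conv2d wrapper (my own example):

    conv = Conv2d(in_channels=3, out_channels=16, kernel_size=3, r=4, lora_alpha=8)
    x = torch.randn(1, 3, 32, 32)
    print(conv(x).shape)    # torch.Size([1, 16, 30, 30]); uses conv.weight + (B @ A).view(...) * scaling
    conv.eval()             # merges the LoRA delta into conv.weight for inference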
    

Reference: https://github.com/microsoft/LoRA/tree/main
