Jasaxion一只大雄

风打,碎琉璃; 打不碎的,是那阳光漫地。The wind strikes, shattering the glazed glass; Yet unbroken remains the sunlight spilling across the earth.

[强化学习]RL 学习以及开源 RL 框架的一些个人解读

Jasaxion / 2025-11-08


主页
少雄@Jasaxion(email: jasaxion@gmail.com)
Blog: https://jasaxion.github.io

本着想要学习一下 verl,RL 火了这么久了,一直都是零零散散学的东西,于是顺便把 RL 的一些碎片的知识整理了一下。

前置知识

前置知识,主要是一些名词&概念解释

前置知识(I):RL 相关

前置知识(II):训练&并行策略相关

前置知识(III):分布式架构相关

相关论文

verl「HybirdFlow」

HybridFlow: A Flexible and Efficient RLHF Framework

创新点:
1)HybridFlow,以混合方式结合单控制器与多控制器范式,实现RLHF数据流的灵活表征与高效执行。

2)3D-HybridEngine以实现训练与生成阶段间执行者模型的重分片,在达成零内存冗余的同时显著降低通信开销

一些常见的 RL算法的数据流图:可以划分为生成阶段–>预处理阶段–>训练阶段。

image-20251029192929433

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
for prompts in dataloader:
    #第一阶段:推理生成 response
    batch = actor.generate_sequences(prompts)
    
    #第二阶段:准备训练数据
    batch = critic.compute_values(batch) #计算价值
    batch = reference.compute_log_prob(batch) #计算参考网络损失
    batch = reward.compute_reward(batch) #计算奖励
    batch = compute_advantages(batch) #计算优势函数
    
    #第三阶段:继续 actor 和 critic 模型的训练
    critic_metrics = critic.update_critic(batch)
    actor_metrics = actor.update_actor(batch)

HybirdFlow给出了一个非常详细的 AI Infra 的各种并行的交替流程(TP,DP,PP 等),值得一提的是,目前verl 似乎没有开源 Hybird3D flow 的过程,不过当前的版本已经有足够多的优化了。

原文过多强调如何去优化这个 RL 训练流程的,对 RL 的具体算法过程表述比较少,所以推荐另外一篇OpenRLHF,论文中对如何实现 PPO 的过程讲得比较详细。

拓展阅读框架:OpenRLHF

https://arxiv.org/pdf/2405.11143,几乎跟 verl 同期发布的一个RL 框架

OpenRLHF 也是基于 Ray 来完成分布式训练,这是 AI Infra 架构图

下面这张图展示 PPO 算法的一个完整的代码实现逻辑

RL数据产生的过长,经过左侧一系列算法过程,把数据放在 Replay Buffer 内。

流程,左侧全部用于计算和返回 Advantages

  1. Prompt $x$ 输入给 Actor Model 完成一个 Response 的输出
  2. 将 Prompt 与 Response 结合起来输入给 Actor Model 和 Ref Model,分别得到 2 个 logits 值,用于计算 KL散度;
  3. 将 Prompt 与 Response 结合起来输入Critic Model,用于生成 Value 值;
  4. 将 Prompt 与 Response 结合起来输入Reward Model,用于生成 Reward 值;
  5. 优势计算采用 GAE,注意这里 RLHF 的设计方式是把 KL 散度融合到了 Reward 里面,从而影响了优势函数,换言之优势函数已经包含了 KL 惩罚
    $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$
    ​$A_t = \sum_l (\gamma \lambda)^l \delta_{t+l}$

在完成优势函数的计算之后,分别训练 Actor Model 和 Critic Model

OpenRLHF 设计了多个“引擎”分别负责不同任务:

关于 OpenRLHF 的相关代码解读,可以查阅:OpenRLHF-PPO 实践

High-level看verl

RL算法核心PPO

基础的强化学习认识

image-20251030151237633

在一个特定环境下执行动作的智能体

特定行为在策略下的概率建模为$π_θ(a_t | s_t)$

环境状态的转移函数记作为$P(s_t+1 | a_t, s_t)$ <–在LLM RL 中这一点被重新建模为$s_t = {x, a_1, a_2, …, a_t}$ ,这里的 $x$一般代表提示词

每个动作($a_t$)、奖励($r_t$)和状态($s_t$)都与时间步长t相关联。将这些时间步长组合在一起就形成了一条轨迹:

image-20251030152045738

使用链式法则,可以得到概率轨迹的完整表达式:

image-20251030152221563

强化学习的目标

题外话 1:存在一些变体,例如是否期望尽早获得奖励而非延迟获得,这其实也是一个值得平衡的问题。

探索与利用的平衡是强化学习的核心难题之一。探索(Exploration)指智能体尝试未知的动作,以期发现更优的策略;利用(Exploitation)则指智能体基于现有知识选择最优动作,从而最大化即时奖励。二者看似对立,但必须平衡,才能在有限的尝试次数内获得最大的累计奖励。

我们可以通过 ​K 臂老虎机问题(K-armed Bandit) 直观理解:在这个理论模型中,智能体面临 K 个选择,每个选择的奖励分布未知,目标是通过策略最大化奖励。

为解决这一问题,常用方法例如 ε-贪婪策略(ϵ-greedy),以一定概率 ϵ 随机探索,而以 1−ϵ 的概率选择当前最优动作。

image-20251030152654356

引入一个折现因子$\gamma$

一般情况下,强化学习的目标通常可以表达为期望的累计奖励,其中的期望值是对轨迹进行计算,可以用连续情况和离散情况进行表述。

image-20251030152818516

定义新的函数:状态、价值和优势函数,这些与强化学习目标息息相关

题外话 2:强化学习的两种范式

  1. Value-based 方法(值函数方法):

    1. 学习一个 动作价值函数 $Q(s,a)$,用于估计“在状态 $s$ 执行动作 $a$ 后能得到多少未来奖励”。

    2. 策略不是显式表示的,而是通过 Q 值间接获得

      1. $Q^*(s,a) = \mathbb{E}[r_t + \gamma \max_{a'} Q^*(s', a')]$
      2. Bellman Optimality Equation(贝尔曼最优方程)
    3. 代表算法:Q-Learning,DQN

  2. Policy-based 方法(策略梯度方法):

    1. 不再学习“值函数”,而是直接学习策略函数 $\pi_\theta(a|s)$。
    2. 策略可以是确定性的,也可以是随机的(如 softmax over logits)
    3. $\nabla_\theta J(\theta) = \mathbb{E}*{\pi*\theta} \left[ \nabla_\theta \log \pi_\theta(a_t|s_t) R_t \right]$ 即直接通过梯度上升优化策略,使得高奖励的动作概率更大
    4. 代表算法:REINFORCE,Actor-Critic,PPO(主流)

LLM 对 RL 的重新定义

一些概念的映射

image-20251030160055086

题外话 3:关于 LLM 模式中的一些状态定义

LLM通过下一个词元预测来生成输出;即通过逐个生成输出补全中的每个词元「LLM 的自回归范式」

image-20251030161635687

对于这样一个自回归过程,可以建模为 MDP 过程:

image-20251030161715435

强化学习训练

对于强化学习的训练,我们的目标是最大化目标函数,即累积(可能经过折扣的)奖励,基于 Policy-based 的范式,可以直接采用梯度上升的方法:

image-20251030161855232

备注:

那如何计算这个梯度?

几乎所有用于大语言模型训练的强化学习优化器(例如PPO 、GRPO和REINFORCE)都属于策略梯度算法,其运作方式为:i)估计策略梯度,ii)使用该估计值执行梯度上升。这些算法采用不同方法估计策略梯度,但后面的思想其实非常相似。

强化学习的目标是最大化累积奖励。如果我们尝试计算该目标相对于策略参数θ的梯度,可以推导出如下结果,最终得到策略梯度的基本表达式。

image-20251030162406788

推导过程,主要是对对数导数的使用技巧以及最后一步引入轨迹概率的定义。对于最后一步,我们可以发现:初始状态概率和状态转移函数关于策略参数的梯度始终为零,因为这两者均不依赖于策略。

因为初始状态分布 $(\rho_0(s_0)$) 和状态转移概率 ($P(s_{t+1}|s_t, a_t)$) 都由环境决定、与策略参数 ($\theta)$ 无关,所以它们关于 ($\theta$) 的梯度为 0。
因此,轨迹对数概率的梯度只依赖于策略本身:
$\nabla_\theta \log P(\tau|\theta) = \sum_t \nabla_\theta \log \pi_\theta(a_t|s_t)$

image-20251030162748692

  1. 奖励从哪来?验证器或奖励模型
  2. action 的概率从哪来?LLM 的 logits

伪代码实现策略梯度的基本表达式,我们并非直接计算策略梯度,而是构建一个损失函数,使其梯度等于策略梯度,然后借助PyTorch的自动微分功能来计算策略梯度,这一过程发生在loss.backward()执行时。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
with torch.no_grad():
    # B(batch size), G(generation per prompt), L(max sequence length)
    completions = LLM(prompts);
    rewards = RM(completions);
    
completion_mask = <... mask padding token>
llm_out = LLM(completions)
token_logp = F.log_softmax(llm_out, dim=-1) #计算logits

#计算损失,取负数的目的是将求最大值转化为求最小值的过程,这样可以直接使用 torck 的梯度下降的机制
#只计算有效 token 的 - reward * log_prob,忽略 padding
loss = (- token_logp * rewards) * completion_mask

#聚合loss(一些归一化操作)
#每个样本内部按有效 token 平均,避免长句子损失更大带来的 bias
loss = (loss.sum(axis=-1) / completions_mask.sum(axis=-1)).mean()
#按整个 batch 的有效 token 总数进行平均,确保 loss 不依赖 batch 形状,实际上跟上一步是等价的
loss = loss.sum() / completion_mask.sum()
#额外的归一化
loss = ((loss.sum(axis=-1)) / const).mean()

#更新梯度
optimizer.zero_grad()
loss.backward()
optimizer.step()

image-20251030163837472

以上是基础的策略梯度的表达式和伪代码,简单直接,但存在几个明显的问题:

  1. 高方差:梯度估计可能具有高方差,导致训练不稳定。
  2. 策略更新不稳定:缺乏防止对策略进行大规模、可能破坏稳定性的更新的机制。

题外话4:通用策略梯度,通过更通用的策略梯度表达式总结了计算策略梯度的可选方法

image-20251030164936881

PPO 以及绝大多数LLM强化学习优化器,都聚焦于将$Ψ_t$设定为优势函数$A(s_t, a_t)$ 这个称为原始策略梯度 VPG

image-20251030165023700

开始PPO(Proximal Policy Optimization)

前身TRPO

来到开胃菜,先看看PPO的前身TRPO

TRPO的核心动机是创建一种数据效率高且无需过多超参数调整的算法。为实现这一目标,研究者提出了以下约束目标函数,该函数能保证策略的单调改进。该目标通过强制策略更新处于置信域内,从而消除了可能破坏训练稳定性的大幅度策略更新风险。

在强化学习中,我们的目标是最大化累积奖励,但正如我们在VPG的讨论中所见,直接最大化这个强化学习的"真实"目标可能导致训练不稳定。TRPO通过构建替代目标来替代真实目标进行最大化。

与 VPG 的差异

  1. 当前策略中的动作概率通过该动作在旧策略(即训练前的策略)中的概率进行归一化——这构成了策略比率(也称为重要性比率)。在此公式中,我们同样使用概率而非对数概率。
  2. 目标函数上设置了一个约束条件,以确保新旧策略之间的期望KL散度小于阈值δ。

$r_t$ 策略比率:TRPO损失函数的核心是策略比率,其定义如下所示。策略比率告诉我们,在当前策略下执行某个动作的概率相对于训练过程开始前(即"旧"策略)该动作概率的比值。

image-20251030165615017

若新策略对某个动作的赋予概率高于旧策略,则该比率大于1,从而增强该动作优势值在目标函数中的影响力;反之,若新策略赋予的概率较低,比率则小于1,相应动作的影响力便会减弱。策略比率机制确保策略更新过程能够重点强化新策略更倾向于采用的动作——特别是那些具有高优势值的动作——同时抑制新策略中可能性降低的动作。通过这种方式,我们能够根据新旧策略的差异程度进行精确加权更新,从而实现稳定高效的政策优化。

从 TRPO 到 PPO

于是希望开发一种算法,既能保留TRPO的优势——如稳定性、数据效率和可靠性——又能避免其复杂性。理想情况下,该算法应具有广泛适用性,并能通过基本梯度上升方法求解。这些目标促使我们提出了PPO算法,该算法主要受TRPO启发。

PPO的目标函数借鉴了TRPO的代理目标,但通过用剪裁机制替代严格的KL约束,以更简单的方式实现了信任区域的实施。

image-20251030170232080

训练过程与TRPO类似,PPO专注于优化替代目标函数,但PPO中的目标函数没有约束条件且经过轻微修改。如上算法所示,PPO在每一步中执行多次策略更新,而非单次更新,具体交替执行以下操作:

  1. 从策略中采样新数据或轨迹。
  2. 对采样数据执行多个周期的优化。

PPO 替代目标

PPO 的替代目标也是基于当前策略与旧模型之间的策略比率,我们将策略比率记为$r_t(θ)$,这与表示时间步t奖励的$r_t$符号相似,但策略比率与奖励无关!为推导PPO目标,我们从TRPO在无KL约束条件下最大化的代理目标开始,具体如下所示:

image-20251030170913976

我们将这一公式称为"无裁剪"目标函数。由于没有约束条件,该目标函数可以轻松计算得出策略梯度,具体通过:i) 估计优势函数,ii) 计算策略比率。然而,如果我们试图最大化这个无约束目标函数,可能会导致巨大且破坏性的策略梯度更新,从而使训练过程变得不稳定。

Clip 裁剪机制「方式 1」

为解决这个问题,PPO在替代目标函数中引入了一种新颖的裁剪机制,该机制有助于维持信任区域,通过这种方式,PPO算法抑制过大的策略比率,从而确保策略在训练更新后不会与旧策略产生过大偏离。

image-20251030171000011

题外话 5:一些对 Clip 函数的理解

  1. 案例 #1 [ $A > 0 , r_t(θ) ≤ 1 + ε$]:优势值为正——这是我们希望强化的行为。策略比率低于 $1 + ε$,因此我们执行常规策略梯度更新来增加该行为的概率。
  2. 案例 #2 [ $A > 0 , r_t(θ) > 1 + ε$]:优势函数再次为正,但我们的策略比率大于$1 + ε$。这意味着相较于旧策略,该动作在新策略中已经更可能出现。此时目标函数会被截断,且策略比率进一步增大的梯度为零。这可以防止策略使该动作出现的概率继续增加。
  3. 案例三 [ $A < 0 , r_t(θ) ≥ 1 - ε$]:优势函数为负——这是我们需要负向强化的动作(即降低概率)。我们的策略比率高于$1 - ε$,因此执行常规策略梯度更新来降低该动作的概率。
  4. 案例 #4 [ $A < 0 , r_t(θ) < 1 - ε$]:优势函数再次为负,但我们的策略比率小于 $1 - ε$。这意味着相较于旧策略,该动作在新策略中已经更不可能发生。目标函数被截断,且关于策略比率进一步降低的梯度为零。这防止策略使该动作发生的可能性进一步降低。
KL散度「方式 2」

对于TRPO的替代目标的求解,这个求解过程相当复杂,还需要求二阶优化(共轭梯度近似),但是可以转换一下求解目标,例如将 KL 散度作为惩罚项加入到损失函数,这种无约束的损失更为简单,并且可以再次通过基本梯度上升法进行求解。

image-20251030165910011

在使用PPO训练LLM时,我们通常会将当前策略与参考策略(通常是强化学习训练开始前的某个策略,例如SFT模型)之间的KL散度纳入训练过程。这个附加的KL散度项会惩罚策略与参考策略差异过大的情况,从而起到正则化效果。我们通过比较两个LLM对序列中每个标记输出的概率分布来计算逐标记的KL散度。

两种加入方式:

  1. RL的奖励中减去KL散度

    image-20251030171907609

  2. 作为训练目标的惩罚项「⭐️现在用的比较多,但DAPO直接去掉了」

    image-20251030171929164

Critic价值网络

image-20251030172516497

例如,我们可以创建策略的独立副本,或者——为了获得更好的参数效率——添加一个与策略共享权重的专用价值头来预测价值函数。这种习得的价值函数通常被称为价值模型或Critic。Critic以部分响应作为输入,预测序列中每个标记位置的预期最终奖励;详见下文。

在大语言模型的语境中,我们通过奖励模型来预测奖励值。此外,大多数大语言模型采用结果监督进行训练,这意味着只有在模型生成完整回复后(即输出

image-20251030172627247

Critic training,价值函数采用 On-policy 的模式,依赖于我们策略的当前参数。与强化学习训练开始时固定的奖励模型不同,评论者在每次策略更新中与大型语言模型同步训练,以确保其预测保持同策略性,这被称为 actor-critic setup。具体实现方式是在替代目标的损失函数基础上,额外添加评论者预测奖励与实际奖励之间的均方误差损失。

⭐️伪代码实现
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import torch
import torch.nn.functional as F

# 常数定义
kl_beta = 0.1 #控制 KL 惩罚项的权重
critic_weight = 0.5 # critic 损失(价值函数)在总损失中的权重
ppo_eps = 0.2 # PPO clip 参数 ε,用于限制策略更新幅度

# 完成生成和奖励
with torch.no_grad():
    completions = LLM.generate(prompts)  # (B*G, L) 从 LLM 根据输入 prompt 生成文本序列

    rewards = RM(completions)  # (B*G, 1) 奖励模型 RM 给出每个生成序列的整体奖励分数

# 掩码生成 padding
completion_mask = <... mask out padding tokens ...>

# 计算价值函数/由critic输出
values = CRITIC(completions)  # (B*G, L) - 计算每个 token 的预测奖励,critic 输出每个 token 对应的预期回报(V 值)

# 获取每个动作的策略对数概率
llm_out = LLM(completions)
per_token_logps = F.log_softmax(llm_out, dim=-1)  # (B*G, L),得到策略在每个 token 的 log 概率

# 获取每个动作的参考对数概率
ref_out = REF(completions)
ref_per_token_logps = F.log_softmax(ref_out, dim=-1)  # (B*G, L),参考策略 log 概率

# 计算策略与参考策略之间的KL散度
kl_div = per_token_logps - ref_per_token_logps # 越大代表当前策略与参考模型差异越大

# 奖励中加入对策略偏离参考模型的惩罚项(防止模型发散)
# 注意:KL 是按 token 计算的,因此每个 token 都有一个修正后的奖励
rewards -= kl_beta * kl_div # (B*G, L)

# (Reward-value)使用 Critic 的预测值(values)作为 baseline,减少方差
advantage = rewards - values.detach()  # (B*G, L)

# compute the policy ratio
# NOTE: old_per_token_logps must be persisted during first policy
# update for this batch of data and re-used in each subsequent update
#old_per_token_logps 是更新前策略的 log 概率(用于稳定训练)
policy_ratio = torch.exp(
    per_token_logps - old_per_token_logps,
)  # (B*G, L)
# 将比率裁剪到 [1-ε, 1+ε],防止策略更新过大
clip_policy_ratio = torch.clamp(
    policy_ratio,
    min=1.0 - ppo_eps,
    max=1.0 + ppo_eps,
)

# 取最小值对应的项以限制更新(clip objective)
ppo_loss = torch.min(
    advantage * policy_ratio,
    advantage * clip_policy_ratio,
)  # (B*G, L)
#  PyTorch 默认做梯度下降,因此取负号(想要最大化目标)
ppo_loss = -ppo_loss

# 计算 Critic 的 MSE 损失
critic_loss = ((rewards - values) ** 2)  # (B*G, L),最小化预测值与真实奖励的误差
# 合并损失,总损失 = 策略损失 + critic 损失 * 权重
loss = ppo_loss + critic_weight * critic_loss

# 聚合(平均)token 级损失
# 对 padding 掩码求加权平均,得到 batch 级损失
loss = ((loss * completion_mask).sum(axis=-1) /
        completion_mask.sum(axis=-1)).mean()

# 反向传播与参数更新policy 模型
optimizer.zero_grad()
loss.backward()
optimizer.step() # 更新参数(policy + critic)
求取优势函数-GAE(Generalized Advantage Estimation

优势函数告诉我们,在特定状态下某个动作比平均动作优越多少:$A(s_t, a_t) = Q(s_t, a_t) - V(s_t)$。该公式中的价值函数由评论家估计,但我们尚未详细讨论如何计算优势函数。在PPO中,优势函数是基于每个标记(或动作)进行估计的。计算优势主要有两种方法,这些方法构成了大多数其他技术的基础。

求解优势函数的一些方法:

  1. 蒙特卡洛法(MC)。蒙特卡洛法对优势函数的估计依赖于完整轨迹中观测到的实际奖励。具体而言,优势函数计算为完整轨迹的累积奖励 $R(s_t)$与评论家预测的当前状态价值函数 $V(s_t)$之间的差值。

    1. 到目前为止,我们对PPO的讨论都假设采用蒙特卡洛方法来估计优势函数。蒙特卡洛估计具有较低的偏差,因为它依赖于轨迹观测到的实际奖励(精确信息),但蒙特卡洛估计也存在高方差的问题。因此,我们需要采集大量样本并进行足够多的观测才能获得准确的优势估计——这可能会带来高昂的计算成本。
    2. 无偏估计,方差极高
    3. 等想办法优化一下,于是就有了时序差分
  2. 时序差分。TD残差利用评论家对每个令牌的价值预测来形成一步优势估计,如下所示。

    image-20251030174312461

    1. N 步估计器,TD残差分析单步实际奖励与预期奖励之间的差异。但我们可以推广这一思路来捕捉任意步数的情况,N步优势估计器与TD残差具有相似结构,但它融合了N个状态的真实奖励,其中N可以大于1。

      image-20251030175535466

    2. 不同的 N 表示着不同的方差与偏差的权衡。

  3. GAE能更好地平衡偏差-方差权衡。传统的单步优势估计可能引入过多偏差,而使用完整轨迹又往往存在高方差问题。GAE通过结合两种思想——多步预测和加权滑动平均(或仅采用其中一种)来解决这个问题。

    1. 广义优势估计(GAE)是PPO中最常用的优势估计方法,它利用了N步优势估计。不过,GAE并非选择单一的N值,而是通过取不同N值的N步优势估计的平均值来使用所有N值。这是通过为GAE引入混合参数λ来实现的。

    2. image-20251030175916262

    3. 将λ设为0会产生单步时序差分残差,因为只有求和中的第一项获得非零权重。此外,将λ设为1则恢复了蒙特卡洛估计 MC。

    4. 展开求和中每个时序差分残差的定义,可得到累计折扣奖励与当前状态值函数之间的差值,如下:

      image-20251030180024226

    5. GAE的优势在于λ∈[0,1]的取值能够控制偏差-方差的权衡。当我们增大λ值时,优势估计中会使用更精确的奖励信息,从而降低偏差(但会增加方差)。同样地,我们可以使用较小的λ值来降低方差,但代价是更高的偏差。

    6. 结果奖励。当我们使用LLMs时,通常采用结果奖励设置,这简化了GAE。除非处于轨迹的最后一步,否则奖励始终为零。在这种情况下,我们GAE求和中的大多数TD残差项仅仅是两个时间步之间(折现后)价值函数的差值$γV(s_{t + 1}) - V(s_t)$。求和中的最后一项则包含轨迹实际观察到的结果奖励。

      时间步奖励 (r_t)TD 残差 ( \delta_t )含义
      中间步骤0$( \gamma V(s_{t+1}) - V(s_t) )$价值变化(预测更新)
      最后一步结果奖励 (R)$( R - V(s_{T-1}) )$真正的反馈信号

GAE 的代码实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import torch

# 初始化一个空列表,用于反向存储每一步的优势值
advantages_reversed = []

#GAE 是 从后往前 计算的,因为每个时间步的优势依赖下一个时间步
lastgaelam = 0
gen_length = responses.shape[1]
for t in reversed(range(gen_length)):
    if t < gen_length - 1:
        # 获取时间 t + 1 的价值模型预测
        nextvalues = values[:, t + 1]
    else:
        # 对最后一个时间步,没有 “下一个状态”,设 nextvalues = 0
        nextvalues = 0.0

    # 计算 TD 残差,在 LLM 中,大多数时间步 rewards_t=0,
    delta = rewards[:, t] + gamma * nextvalues - values[:, t]

    # 计算 GAE 递推项
    lastgaelam = delta + gamma * lam * lastgaelam
    
    #保存结果
    advantages_reversed.append(lastgaelam)

# 因为是反向遍历,所以需要之后再把顺序倒过来。
advantages = torch.stack(advantages_reversed[::-1], axis=1)

在LLM中使用PPO

两种常用于训练大语言模型的不同强化学习训练类型,区别主要在于如何获取奖励信息。

image-20251030181446588

  1. 基于人类反馈的强化学习(RLHF)通过使用源自人类偏好奖励模型的奖励,采用强化学习方法来训练大型语言模型。
  2. 基于可验证奖励的强化学习(RLVR)通过使用源自基于规则或确定性验证器的奖励,对大型语言模型进行强化学习训练。

PPO的缺点。尽管它迅速成为RLHF的默认强化学习优化器,但PPO是一种复杂的actor-critic算法,具有较高的计算和内存开销,以及许多底层实现复杂性。PPO的内存开销很高,因为我们需要在内存中保存四个LLM副本:

  1. The policy: 训练的模型 「参数需要更新」
  2. The reference policy:参考模型,原来的模型 「参数不更新」
  3. The critic:计算价值函数的模型 「参数需要更新」
  4. The reward model:可能需要

题外话6:老是容易搞混policy模型和critic模型,他们共用一套架构,但出自不同头,下面这个代码可以清晰展示,一个是输出一个概率,一个是输出一个标量。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
class ActorCriticModel(nn.Module):
    def __init__(self, base_model):
        super().__init__()
        self.shared = base_model
        self.policy_head = nn.Linear(hidden_dim, vocab_size)
        self.value_head = nn.Linear(hidden_dim, 1)

    def forward(self, input_ids):
        hidden = self.shared(input_ids)
        logits = self.policy_head(hidden)
        values = self.value_head(hidden)
        return logits, values

同时也需要注意,PPO 的开销实在是太大了,PPO是一种敏感的算法,容易产生不稳定性——我们可能会投入大量计算资源和时间训练模型,最终却因超参数设置不当而导致性能不佳。正因如此,像REINFORCE和GRPO这类更简单的强化学习算法——甚至无需强化学习的DPO等技术——已成为PPO的热门替代方案。

LLM 中应用 RLHF 的一些常见思路

阶段输入输出目的
SFT(Supervised Fine-Tuning)人类编写的高质量指令与回答初始模型(policy π₀)教模型“会说话”
奖励模型训练(Reward Model, RM)**同一提示的多种模型回答 + 人类偏好排序奖励模型 r(x, y)教模型“分辨好坏”
强化学习优化(RL with PPO/GRPO)SFT 模型 + 奖励模型 + 参考模型对齐后的最终模型 π*教模型“更讨人喜欢”

REINFORCE&RLOO 算法优化

REINFORCE算法是一种基于蒙特卡洛采样的策略梯度(Policy Gradient)算法,属于原始策略梯度(Vanilla Policy Gradient, VPG)的具体实现形式。它通过直接优化参数化的策略网络,利用完整轨迹的累积回报(Return)估计策略梯度,进而更新策略参数以最大化期望奖励。

该算法具有实现开销低、原理直观的特点,在大语言模型的强化学习训练场景中表现出有效性。其核心优化逻辑与带基线的策略梯度估计一致,特别采用训练过程中观察到的奖励平均值(如滑动奖励平均值或当前批次内奖励的算术平均值)作为基线,在保证梯度估计无偏性的前提下,有效降低了梯度估计的方差。

为了计算批次数据的梯度更新,执行以下的步骤:

  1. 使用当前的策略$\pi_\theta$为每个提示词生成回答;
  2. 存储每个回答中token 的对数概率;
  3. 为每个回答计算奖励(通常使用奖励模型);
  4. 通过计算奖励的平均值来得到基线值 Baseline;
  5. 然后从奖励中减去 Basline 来计算优势;
  6. 计算每个回答的 log 概率与优势值的乘积之和,然后对批次求平均以形成蒙特卡洛估计;

REINFORCE在其表达式中包含了学习率作为一个“非负因子”,因为我们正在进行梯度上升并试图最大化奖励。

简单来实现一下 REINFORCE算法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import torch

#常数,例如KL散度比例
kl_beta = 0.1

#两个批次,每批次包含三个token的生成结果。
per_token_logprobs = torch.tensor(
    [
        [-12.3, -8.3, -2.3],
        [-10.0, -7.0, -3.0],
    ],
    requires_grad=True,
)
reference_per_token_logprobs = torch.tensor([
    [-11.3, -8.4, -2.0],
    [-9.5, -7.2, -2.8],
])

#1.计算 KL 散度(策略模型生成的 logprob - 参考模型的 logprob)
kl_div = per_token_logprobs - reference_per_token_logprobs
kl_div = -kl_beta * kl_div

#2.计算奖励
score_from_rm = torch.tensor([1.0, 0.5)] #这里假定第一个生成奖励为 1.0,第二个生成奖励为 0.5

#3.将奖励归因到最后一个 token,一般来说是<eos>
# 与此同时 KL 散度也是直接加入到奖励里面
per_token_reward = kl_div.clone() #奖励里面集成KL散度
per_token_reward[range(per_token_reward.size(0)), -1] += score_from_rm #集成到<eos>标记

# 计算完整序列上的 REINFORCE 更新
entire_completion_reward = per_token_reward.sum(dim=1)
baseline = entire_completion_reward.mean().detach()

# 计算优势
advantage = entire_completion_reward - baseline

# 计算loss和完成梯度更新
reinforce_loss = -per_token_logprobs.sum(dim=1) * advantage # 梯度上升,取负转化为梯度下降
reinforce_loss.mean().backward()

REINFORCE 留一法 RLOO

REINFORCE 算法中通过为每个提示生成单个on-policy 的结果,然后使用这些完成结果的奖励通过移动平均「之前所有问题回答的打分平均值」或批次内奖励平均值「当前一批问题里所有回答的打分平均值」来构建基线,而RLOO 的主要优化如下:

  1. 对于每个提示完成 多次(K)采样
  2. 使用这些多次采样的结果分别计算每个单独的提示的奖励平均值

简单举例:

核心是每个问题生成K个回答,然后用“留一法”算基线:
比如对某个问题生成了3个回答(K=3),打分分别是8分、7分、9分。

每个回答的基线,都是同问题下其他所有回答的平均分(公式里的1/(K-1)就是除以剩下的K-1个,求和j≠i就是排除自己)。

a. 算总奖励=8+7+9=24 → 平均奖励=24/3=8

b. 对每个生成i:–>只需要计算一次平均值

优势$A_i$= $(K/(K-1)) × (R_i - 平均奖励)$

A₁=3/(3-1) × (8-8)= 1.5×0=0

A₂=3/2 ×(7-8)= -1.5

A₃=3/2 ×(9-8)=1.5

Putting RL back in RLHF 阐述了一些 RLOO 的一些对比。

可以发现,RLOO比PPO节省50-70%的内存,运行速度快2-3倍。随着模型规模的增大,这些优势会更加明显。除了效率提升之外,RLOO与PPO相比具有相当竞争力,并且持续优于DPO等离线算法。这些结果证明了RLOO(以及REINFORCE)的核心价值主张——这些算法在保持在线强化学习算法性能优势的同时,实现更简单且运行成本更低。

简单来实现一下 RLOO 算法「直接在 REINFORCE 上进行修改」

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import torch

#常数,例如KL散度比例
kl_beta = 0.1

#两个批次,每批次包含3个提示词,并且每个提示词生成 3 次(K=3)
per_token_logprobs = torch.tensor(
    [
        # prompt 1
        [
            [-12.3, -8.3, -2.3], # 生成 1
            [-10.0, -7.0, -3.0], # 生成 2
            [-10.5, -12.2, -9.1], # 生成 3
        ],

        # prompt 2
        [
            [-11.0, -10.3, -1.3],
            [-11.1, -11.1, -0.8],   
            [-8.2, -11.9, -0.1],        

        ],
        
        # prompt 3
        [
            [-1.8, -2.1, -0.2],
            [-0.7, -3.5, -0.1],
            [-1.0, -2.2, -1.1],
        ],
    ],
    requires_grad=True,
)
reference_per_token_logprobs = torch.tensor([
    [
        [-11.8, -8.4, -2.3], 
        [-10.1, -7.2, -3.1],
        [-10.3, -12.9, -9.1],
    ],
    [
        [-11.8, -9.7, -1.3],
        [-12.3, -11.9, -0.2],
        [-8.1, -12.0, -0.5],
    ],
    [
        [-2.7, -2.0, -1.2],
        [-0.7, -3.6, -0.2],
        [-0.7, -1.2, -0.9],
    ],
])

#1.计算 KL 散度(策略模型生成的 logprob - 参考模型的 logprob)
kl_div = per_token_logprobs - reference_per_token_logprobs
kl_div = -kl_beta * kl_div

#2.计算奖励,并按组进行分配
score_from_rm = torch.tensor([
    [1, 2, 3], # 完成提示1的奖励,分别对应各个生成结果
    [2, 3, 4], # 完成提示2的奖励
    [3, 4, 5], # 完成提示3的奖励
]).float()

#3.将奖励归因到最后一个 token,一般来说是<eos>
# 与此同时 KL 散度也是直接加入到奖励里面
per_token_reward = kl_div.clone() #奖励里面集成KL散度
per_token_reward[:, :, -1] += score_from_rm #集成到<eos>标记

# 计算完整序列上的 REINFORCE 更新
entire_completion_reward = per_token_reward.sum(dim=-1)

# RLOO 的优化,在 Baseline 的计算部分,使用如上公式,只需计算一次平均值
baseline = (
    entire_completion_reward.sum(dim=-1)[:, None]
    - entire_completion_reward
) / (K - 1)
baseline = baseline.detach()

# 计算优势
advantage = entire_completion_reward - baseline

# 计算loss和完成梯度更新
reinforce_loss = -per_token_logprobs.sum(dim=1) * advantage # 梯度上升,取负转化为梯度下降
reinforce_loss.mean().backward()

GRPO/DAPO/GSPO 系列优化器

GRPO 算法

https://arxiv.org/abs/2402.03300

原文其实写的很详细了,GRPO 与 PPO 的区别

PPO的Critic 模型其实主要作用就是在计算优势函数$A_t$时是为了降低方差而被当做baseline,但是LLM的奖励模型的性质就决定了它只会为每个回答的最后一个token分配奖励$r$而其余的token的奖励都是0,就是因为这个性质,我们很难在每个token处训练出准确的价值函数。

因此 GRPO 决定去掉了 $V$函数,也就是 Critic 模型,在旧策略$\pi_\theta$中采样多个输出,将输出的奖励平均值作为 baseline 来降低方差。

将 PPO 的优化目标改写为,KL 散度也写在了目标函数里面,这与 REINFORCE 算法也不同。

$$ \begin{align*} \mathcal{J}_{GRPO}(\theta) &= \mathbb{E}[q \sim P(Q), \{o_i\}_{i=1}^G \sim \pi_{\theta_{old}}(O|q)] \\ &\frac{1}{G} \sum_{i=1}^G \frac{1}{|o_i|} \sum_{t=1}^{|o_i|} \left\{ \min \left[ \frac{\pi_{\theta}(o_{i,t}|q, o_{i,<t})}{\pi_{\theta, id}(o_{i,t}|q, o_{i,<t})} \hat{A}_{i,t}, \text{clip} \left( \frac{\pi_{\theta}(o_{i,t}|q, o_{i,<t})}{\pi_{\theta_{old}}(o_{i,t}|q, o_{i,<t})}, 1 - \epsilon, 1 + \epsilon \right) \hat{A}_{i,t} \right] - \beta \mathbb{D}_{KL} \left[ \pi_{\theta} \| \pi_{ref} \right] \right\} \end{align*} $$

$1/G ∑_{i=1}^G$ → 把G个回答的损失「平均化」,每个回答的权重一样(不管长短,每个回答占1/G)。

$1/|o_i| ∑_{t=1}^{|o_i|}$→ 对单个回答$o_i$,把里面所有词的损失加起来,再除以词数,得到「这个回答的平均词损失」。

与此同时,在计算 KL 散度时,也有一点区别,使用的是无偏估计

$$ \mathbb{D}_{KL}[\pi_{\theta}||\pi_{ref}]=\frac{\pi_{ref}(o_{i,t}|q,o_{i,<t})}{\pi_{\theta}(o_{i,t}|q,o_{i,<t})}-\log\frac{\pi_{ref}(o_{i,t}|q,o_{i,<t})}{\pi_{\theta}(o_{i,t}|q,o_{i,<t})}-1 $$

优势函数计算:

$$ \hat{A}_{i,t} = r_i - \frac{r_i - \text{mean}(r)}{\text{std}(r)} $$

GRPO 的伪代码实现「跟之前一样,实现一轮的策略更新过程」

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import torch
import torch.nn.functional as F

kl_beta = 0.1     # 控制 KL 惩罚项的权重 β
ppo_eps = 0.2     # PPO clip 参数 ε,用于限制策略更新幅度
group_size = G    # 每个 prompt 生成 G 个样本

# 1.生成候选输出
with torch.no_grad():
    completions = POLICY.generate(prompts, num_return_sequences=group_size)  # (B*G, L)
	# 计算每个生成序列的任务奖励 r_i(非学习式、任务规则)⭐️Deepseek 用的规则奖励 RLVR
    rewards = compute_task_reward(prompts, completions)  # (B*G, 1)
    # 组内标准化奖励,得到优势,公式: A_i = r_i - (r_i - mean(r)) / std(r)
    rewards = rewards.view(-1, group_size)
    mean_r, std_r = rewards.mean(1, keepdim=True), rewards.std(1, keepdim=True) + 1e-8
    advantages = rewards - (rewards - mean_r) / std_r
    advantages = advantages.view(-1, 1)  # (B*G, 1)

# 2. 获取旧策略与参考策略 log 概率 (per-token)
old_logps = get_logps(OLD_POLICY, prompts, completions)  # (B*G, L)
ref_logps = get_logps(REF_POLICY, prompts, completions)
logps = get_logps(POLICY, prompts, completions)

# 3. KL 正则 (token级) ⭐️GRPO 采用无偏估计
p_theta, p_ref = torch.exp(logps), torch.exp(ref_logps)
kl = (p_ref / p_theta) - torch.log(p_ref / p_theta) - 1

# 4. 计算策略比率与裁剪, 这里与 PPO 一致
# clipped: clip(π_θ / π_{θ_old}, 1-ε, 1+ε)
ratio = torch.exp(logps - old_logps)                      # 每个token独立
ratio_clipped = torch.clamp(ratio, 1 - ppo_eps, 1 + ppo_eps)

# 5. 计算损失
# J_GRPO = min(ratio_unclipped * Â, ratio_clipped * Â) - β * KL
# PyTorch 默认梯度下降,因此取负号
loss_per_token = -torch.min(ratio * advantages, ratio_clipped * advantages) + kl_beta * kl

# 6. 聚合损失,同时 mask 掉 padding
completion_mask = <... mask out padding tokens ...>
mask = completion_mask
loss = ((loss_per_token * mask).sum(-1) / mask.sum(-1)).mean()

# 7. 反向传播与参数更新
optimizer.zero_grad()
loss.backward()
optimizer.step()

DAPO 算法

https://arxiv.org/abs/2503.14476

在 GRPO 的基础上,主要修改:

DAPO的目标函数

遵循更高裁剪策略,DAPO将下裁剪和上裁剪范围解耦为 $\epsilon_{\text{low}}$ 和 $\epsilon_{\text{high}}$,DAPO增加 $\epsilon_{high}$ 的值,为低概率 Token 的增加留出更多空间。在论文实验中,这种调整有效地提高了策略的熵,促进了更多样化样本的生成。与此同时DAPO选择保持 $\epsilon_{low}$ 相对较小,因为增加它会最终抑制这些 Token 的概率,导致采样空间的崩塌。

具体来说,当 $\epsilon = 0.2$(大多数算法的默认值)时,考虑两个动作,其概率分别为 $\pi_{\text{data}}(o_i | q) = 0.01$ 和 $0.9$。更新后的最大可能概率分别为 $\pi(o_i | q) = 0.012$ 和 $1.08$。这意味着对于概率较高的 token(如 $0.9$),受到的约束较少。相反,对于低概率 token,要实现概率的显著增加要困难得多。

DAPO 的伪代码实现

GSPO 算法

https://arxiv.org/pdf/2507.18071v1

相较于 GRPO,GSPO 的修改在于直接将奖励计算从 token 级别转换到了 sequence 级别,解决了 LLM 过多关注 token 导致训练不稳定的问题。

GSPO 的优化目标,「可以发现,主要改动是在策略比率(也有人叫做重要性采样)

知乎上有句话说的好:原文

GSPO本质上是把GRPO优化目标里的T个$ratio_t * a$从算术平均改为了几何平均

$$ > \text{GRPO} = \frac{1}{n} \sum_{t=1}^{n} \text{ratio}_t > $$$$ > \text{GSPO} = \left( \prod_{t=1}^{n} \text{ratio}_t \right)^{\frac{1}{n}} = \sqrt[n]{\prod_{t=1}^{n} \text{ratio}_t} > $$

GSPO 论文中还提到一个变体,GSPO-token,主要是为了能够兼顾 token 级别的细粒度

GSPO-token:

image

这里的 sg 是 stop-gradient(detach)操作,为了在梯度上结合 token-level 的优势

换言之:

因此:

$$ > \text{数值上}:\quad s_{i,t}(\theta) = s_i(\theta) \times 1 = s_i(\theta) > $$

但在梯度传播时,只让整体句子比值的梯度 $s_i(\theta)$ 发挥作用。

理解这里可能涉及到 Pytorch(或其他自动微分系统中的训练流的分配),例如在 Pytorch 中:

  • 数值流(forward):负责计算损失的具体数值。
  • 梯度流(backward):负责反向传播更新参数。

.detach()​ 会保留数值,但切断梯度

ratio_token = seq_ratio_detach * (torch.exp(logps) / torch.exp(logps.detach()))

其中:seq_ratio_detach数值上是句子的比值$s_i(\theta)$,但梯度会被切断,不会反传;

torch.exp(logps) / torch.exp(logps.detach(),数值上横为 1,但梯度能够沿着 token 的 log 概率方向传

于是就实现了:

  • 模型“看到”的比值保持整句统一(数值一致)
  • 但每个 token 都能被独立更新(梯度分开走)。

GSPO 相较于 GRPO 改动并不大,可以从伪代码层面进行理解

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import torch
import torch.nn.functional as F

kl_beta = 0.1     # 控制 KL 惩罚项的权重 β
ppo_eps = 0.2     # PPO clip 参数 ε,用于限制策略更新幅度
group_size = G    # 每个 prompt 生成 G 个样本

# 1.生成候选输出
with torch.no_grad():
    completions = POLICY.generate(prompts, num_return_sequences=group_size)  # (B*G, L)
	# 计算每个生成序列的任务奖励 r_i(非学习式、任务规则)⭐️Deepseek 用的规则奖励 RLVR
    rewards = compute_task_reward(prompts, completions)  # (B*G, 1)
    # 组内标准化奖励,得到优势,公式: A_i = r_i - (r_i - mean(r)) / std(r)
    rewards = rewards.view(-1, group_size)
    mean_r, std_r = rewards.mean(1, keepdim=True), rewards.std(1, keepdim=True) + 1e-8
    advantages = rewards - (rewards - mean_r) / std_r
    advantages = advantages.view(-1, 1)  # (B*G, 1)

# 2. 获取旧策略与参考策略 log 概率 (per-token)
old_logps = get_logps(OLD_POLICY, prompts, completions)  # (B*G, L)
ref_logps = get_logps(REF_POLICY, prompts, completions)
logps = get_logps(POLICY, prompts, completions)

# 3. KL 正则 (token级) ⭐️GRPO 采用无偏估计
p_theta, p_ref = torch.exp(logps), torch.exp(ref_logps)
kl = (p_ref / p_theta) - torch.log(p_ref / p_theta) - 1

# ==== 以上与 GRPO 一致 ====

# 4. 计算策略比率与裁剪
# 这里计算句子级别的平均log_ratio
completion_mask = <... mask out padding tokens ...>
log_ratio = logps - old_logps  # (B*G, L)
mask = completion_mask #mask掉padding 部分
seq_log_ratio = (log_ratio * mask).sum(-1) / mask.sum(-1).clamp(min=1.0)  # 平均 log 概率差
seq_ratio = torch.exp(seq_log_ratio).unsqueeze(-1)  # (B*G,1)

# clip 裁剪操作应用于整个句子
seq_ratio_clipped = torch.clamp(seq_ratio, 1 - ppo_eps, 1 + ppo_eps)

# 5. 广播回所有token,让每个 token 都具有相同的策略权重,这就是与 GRPO 的关键不同
ratio = seq_ratio.expand_as(logps)
ratio_clipped = seq_ratio_clipped.expand_as(logps)

#===== GSPO变体 GSPO-token =====

# GSPO-token variant
# seq_ratio 仍然是 (B*G, 1)
# A_token 是 (B*G, L),代表每个 token 的个体优势
seq_ratio_detach = seq_ratio.detach()  # sg[s_i(θ)]

# token-wise ratio with stop-gradient trick
ratio_token = seq_ratio_detach * (torch.exp(logps) / torch.exp(logps.detach()))
ratio_clipped = torch.clamp(ratio_token, 1 - eps, 1 + eps)

loss = -torch.min(ratio_token * A_token, ratio_clipped * A_token) + beta * KL

#===== ======== =====

# 6. 计算损失(GSPO 是sequence 级损失)
# J_GRPO = min(ratio_unclipped * Â, ratio_clipped * Â) - β * KL
# PyTorch 默认梯度下降,因此取负号
loss_per_token = -torch.min(ratio * advantages, ratio_clipped * advantages) + kl_beta * kl
loss = ((loss_per_token * mask).sum(-1) / mask.sum(-1)).mean()

# 7. 反向传播与参数更新
optimizer.zero_grad()
loss.backward()
optimizer.step()

verl代码解析

(外层库结构)代码架构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
.
├── CONTRIBUTING.md
├── docker //docker 环境下的部署文件
├── docs //相关文档
├── examples //一些官方的运行与训练示例
├── LICENSE
├── Notice.txt
├── pyproject.toml
├── README.md
├── recipe //一些官方的运行与训练示例
├── requirements-cuda.txt
├── requirements-npu.txt
├── requirements_sglang.txt
├── requirements_transferqueue.txt
├── requirements.txt
├── scripts //官方的一些脚本集合
├── setup.py
├── tests
└── verl //verl 关键代码

(内层主要结构)代码结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
.
├── base_config.py
├── experimental
├── __init__.py
├── interactions
├── model_merger
├── models
├── protocol.py
├── py.typed
├── single_controller
├── third_party
├── tools
├── trainer
├── utils
├── version
└── workers

verl 框架的训练流程

由于 verl 的框架代码infra 含量过高,我主要针对其 PPO 的流程进行梳理和分析,主要流程以及涉及代码。

一些关键参数

参数含义
data.train_batch_size用于生成一组采样轨迹/推演的全局提示批次大小。响应/轨迹数量为 data.train_batch_size * actor_rollout.ref.rollout.n
actor_rollout_ref.actor.ppo_mini_batch_size采样轨迹集被分割为多个小批次,其批次大小=ppo_mini_batch_size,用于PPO的 actor 模型的更新。该ppo_mini_batch_size是所有工作节点间统一的全局尺寸。
critic.ppo_mini_batch_size采样轨迹集被分割成多个小批次,批次大小=ppo_mini_batch_size,用于PPO critic 模型的更新。ppo_mini_batch_size是所有工作进程间的全局统一尺寸。
actor_rollout_ref.actor.clip_ratioPPO裁剪比率。默认值为0.2
actor_rollout_ref.actor.ppo_epochsactor model 在一组采样生成中多久更新一次
critic.ppo_epochscritic model 在一组采样生成中多久更新一次,默认为actor_rollout_ref.actor.ppo_epochs。
algorithm.gemma折扣因子
algorithm.lam计算 GAE 时平衡偏差和方差的 lambda 项
algorithm.adv_estimator优势估计器(计算优势函数的方式),目前verl 支持gae、grpo、reinforce_plus_plus、reinforce_plus_plus_baseline、rloo
KL 散度控制
actor_rollout_ref.actor.use_kl_loss在 actor 模型损失函数中加入 kl 散度
actor_rollout_ref.actor.kl_loss_coefkl损失的系数。默认值为0.001
actor_rollout_ref.actor.kl_loss_type支持 kl(k1)、abs、mse(k2)、low_var_kl(k3) 和 full。在末尾添加“+”(例如“k1+”和“k3+”)将应用直通估计器,无论 kl 值估计如何都使用 k2 进行无偏梯度估计
也可以在奖励中加入 kl 散度
algorithm.use_kl_in_reward是否在奖励中启用KL惩罚。默认为False
algorithm.kl_penalty支持kl(k1)、abs、mse(k2)、low_var_kl(k3)和full。这定义了计算演员策略与参考策略之间kl散度的方法。具体选项请参考core_algos.py中的kl_penalty。详细分析请参阅此博客文章:http://joschu.net/blog/kl-approx.html
algorithm.kl_ctrl.kl_coef(初始)奖励内kl惩罚系数。默认值为0.001。
algorithm.kl_ctrl.type‘fixed’ 对应 FixedKLController,‘adaptive’ 对应 AdaptiveKLController
algorithm.kl_ctrl.horizon详见AdaptiveKLController源代码
algorithm.kl_ctrl.target_kl详见AdaptiveKLController源代码

用户配置文件

./examples目录中提供了许多 PPO/GRPO等优化算法的配置文件的实现。

run_gemma.sh为例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
python3 -m verl.trainer.main_ppo \
    algorithm.adv_estimator=gae \
    data.train_files=$HOME/data/gsm8k/train.parquet \
    data.val_files=$HOME/data/gsm8k/test.parquet \
    data.train_batch_size=512 \
    data.max_prompt_length=1024 \
    data.max_response_length=512 \
    data.filter_overlong_prompts=True \
    data.truncation='error' \
    actor_rollout_ref.model.path=google/gemma-2-2b-it \
    actor_rollout_ref.actor.optim.lr=1e-6 \
    actor_rollout_ref.model.use_remove_padding=False \
    actor_rollout_ref.actor.ppo_mini_batch_size=128 \
    actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4 \
    actor_rollout_ref.actor.fsdp_config.param_offload=False \
    actor_rollout_ref.actor.fsdp_config.optimizer_offload=False \
    actor_rollout_ref.actor.use_kl_loss=False \
    actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=4 \
    actor_rollout_ref.rollout.tensor_model_parallel_size=2 \
    actor_rollout_ref.rollout.name=vllm \
    actor_rollout_ref.rollout.gpu_memory_utilization=0.4 \
    critic.optim.lr=1e-5 \
    critic.model.use_remove_padding=False \
    critic.model.path=google/gemma-2-2b-it \
    critic.model.enable_gradient_checkpointing=False \
    critic.ppo_micro_batch_size_per_gpu=4 \
    critic.model.fsdp_config.param_offload=False \
    critic.model.fsdp_config.optimizer_offload=False \
    algorithm.use_kl_in_reward=False \
    trainer.critic_warmup=0 \
    trainer.logger='["console","wandb"]' \
    trainer.project_name='verl_example' \
    trainer.experiment_name='gemma2b_function_rm' \
    trainer.n_gpus_per_node=2 \
    trainer.nnodes=1 \
    trainer.save_freq=20 \
    trainer.test_freq=10 \
    trainer.total_epochs=15 $@

入口函数 main_ppo.py

verl 把 RL 训练过程看成是一个数据流的过程,一个完整的训练需要完成这些步骤:

  1. 定义数据,这个由RLHFDataset​定义,至少包含一个字段prompt​,定义数据的教程
  2. 为数据集绑定奖励函数,规则类奖励模式还是说独立的奖励模型,verl 提供了两种定义示例,规则类方法(GSM8k),包含了一个_select_rm_score_fn方法,奖励模型类型full_hh_rlhf
  3. 定义工作类,worker 节点定义;
  4. 构建数据流/训练流,也就是定义角色与 worker 类之间的映射关系,以及角色与worker 类(这个 verl 预先实现了)
  5. 定义资源池 ID 和资源池规格;
  6. 完成奖励函数/模型,以及初始化 PPO 训练器

我们对 main_ppo.py​进行分析,完整代码

  1. Hydra 入口,VERL 使用 Hydra 管理配置。执行 python main_ppo.py​ 时,Hydra 会自动加载 config/ppo_trainer.yaml​,并注入配置对象 config
1
2
3
@hydra.main(config_path="config", config_name="ppo_trainer", version_base=None)
def main(config):
    run_ppo(config)

Hydra 的好处是:模块化配置、多层合并、命令行覆盖参数,可适配复杂分布式训练场景。

  1. verl 数据加载与采样

    • create_rl_dataset数据集的构建
    • 自定义数据集加载,load_extern_type:通过模块路径字符串加载外部 Python 类,这个在配置文件中定义 data: path_cls 和 name
    • 动态生成数据模式,当训练阶段配置了 datagen​,会启用 DynamicGenDataset,这类数据集会在每个 epoch 动态生成样本(例如模型自生成 Prompt-Response 对)
    • 都没配置,那就使用默认的 RLHF 模式
    • 最后不管使用哪种模式,都会被写入为,tokenizer 将文本转化为 token,如果是多模态数据还会使用到 processor,config 用于调控采样、清洗、最大长度等
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    
    def create_rl_dataset(data_paths, data_config, tokenizer, processor, is_train=True, max_samples: int = -1):
        from torch.utils.data import Dataset
        from verl.utils.dataset.rl_dataset import RLHFDataset #默认加载RLHF模式的数据集
    
        # 检查数据配置中是否指定了自定义数据集类,以及是否提供了自定义类的路径
        if "custom_cls" in data_config and data_config.custom_cls.get("path", None) is not None:
            # 动态加载自定义数据集类
            dataset_cls = load_extern_type(data_config.custom_cls.path, data_config.custom_cls.name)
            if not issubclass(dataset_cls, Dataset):
                raise TypeError(
                    f"The custom dataset class '{data_config.custom_cls.name}' from "
                    f"'{data_config.custom_cls.path}' must inherit from torch.utils.data.Dataset"
                )
        elif "datagen" in data_config and data_config.datagen.get("path", None) is not None and is_train:
            from verl.utils.dataset.dynamicgen_dataset import DynamicGenDataset
    
            dataset_cls = DynamicGenDataset
            print("Using DynamicGenDataset for data generation.")
        else:
            dataset_cls = RLHFDataset
        print(f"Using dataset class: {dataset_cls.__name__}")
    
        dataset = dataset_cls(
            data_files=data_paths,
            tokenizer=tokenizer,
            processor=processor,
            config=data_config,
            max_samples=max_samples,
        )
        return dataset
    

    create_rl_sampler:采样器与课程学习,主要支持三种采样方式

    策略条件实现类说明
    Curriculum Sampler指定data_config.sampler.class_path用户自定义类(继承AbstractSampler可动态调整样本顺序,如从简单到复杂
    RandomSamplerdata_config.shuffle=TruePyTorch 内置随机打乱样本顺序
    SequentialSamplerPyTorch 内置按原始顺序迭代样本
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    
    def create_rl_sampler(data_config, dataset):
        import torch
        from torch.utils.data import RandomSampler, SequentialSampler
    
        if data_config.sampler is not None and data_config.sampler.get("class_path", None) is not None: #课程学习采样,难度递进(easy->hard),要求 dataloader_num_workers == 0,防止缓存导致样本顺序混乱,通过 AbstractSampler 抽象类,将 curriculum learning 机制内置为一等功能,不再依赖外部脚本实现。
            curriculum_class = load_extern_type(
                data_config.sampler.class_path,
                data_config.sampler.class_name,
            )
            sampler = curriculum_class(
                data_source=dataset,
                data_config=data_config,
            )
            assert isinstance(sampler, AbstractSampler)
            assert data_config.get("dataloader_num_workers", 8) == 0, (
                "If using curriculum, num_workers must be 0 to prevent data caching. "
                "If the dataloader caches data before the batch is done the "
                "curriculum sampler won't have the opportunity to reorder it. "
            )
    
        # 使用采样器以方便检查点的恢复。
        # 若数据配置中启用了随机打乱功能,则创建一个随机采样器。
        elif data_config.shuffle: #随机采样
            train_dataloader_generator = torch.Generator()
            seed = data_config.get("seed")
            if seed is not None:
                train_dataloader_generator.manual_seed(seed)
            sampler = RandomSampler(data_source=dataset, generator=train_dataloader_generator)
        else: #原始顺序
            # 如果禁用随机打乱功能,则使用顺序采样器按顺序遍历数据集。
            sampler = SequentialSampler(data_source=dataset)
        return sampler
    
  2. 训练主流程入口 run_ppo(config)

    主要完成的工作有;初始化 Ray 分布式集群(单控制器架构)、构建 TaskRunner、执行远程训练任务以及完成性能日志追踪等

1
2
3
4
5
6
7
8
9
#代码鲁棒性检查比较多,将核心逻辑可以简化如下
def run_ppo(config):
    if not ray.is_initialized():
        env = get_ppo_ray_runtime_env() #get_ppo_ray_runtime_env():定义一组运行时环境变量(如 TOKENIZERS_PARALLELISM, NCCL_DEBUG, VLLM_LOGGING_LEVEL)
        ray.init(runtime_env=env)

    task_runner_class = ray.remote(num_cpus=1)(TaskRunner)
    runner = task_runner_class.remote()
    ray.get(runner.run.remote(config))

相比 OpenRLHF 的单机单脚本实现,VERL 在主函数层面已经显式切入 分布式多角色训练 模式。

  1. 核心类:TaskRunner

    TaskRunner​ 是 VERL 中的核心执行类,用于在 Ray 集群上分配各类 Worker(Actor、Critic、Reward、RefPolicy) 并调度训练任务。

    TaskRunner 通过一系列 add_*_worker 方法动态注册角色。

1
2
3
4
class TaskRunner:
    def __init__(self):
        self.role_worker_mapping = {} # role_worker_mapping: 角色名 → Worker 类的 Ray 封装(ray.remote(worker_class))
        self.mapping = {} #角色名 → GPU 资源池标识(如 global_pool, reward_pool)
  1. 定义工作类,PPO 的四大组成成分

    • 注册 Actor,支持 fsdp 和 megatron 并行策略加载,使用 ray 远程加载,由 Role 枚举管理角色类型。

      1
      2
      3
      4
      5
      6
      
      def add_actor_rollout_worker(self, config):
          if config.actor_rollout_ref.actor.strategy in {"fsdp", "fsdp2"}:
              from verl.workers.fsdp_workers import ActorRolloutRefWorker
          elif config.actor_rollout_ref.actor.strategy == "megatron":
              from verl.workers.megatron_workers import ActorRolloutRefWorker
          self.role_worker_mapping[Role.ActorRollout] = ray.remote(ActorRolloutRefWorker)
      
    • 注册 Critic

      1
      2
      3
      4
      5
      6
      
      def add_critic_worker(self, config):
          if config.critic.strategy in {"fsdp", "fsdp2"}:
              from verl.workers.fsdp_workers import CriticWorker
          elif config.critic.strategy == "megatron":
              from verl.workers.megatron_workers import CriticWorker
          self.role_worker_mapping[Role.Critic] = ray.remote(CriticWorker)
      
    • 注册 Reward Model

      1
      2
      3
      4
      
      def add_reward_model_worker(self, config):
          if config.reward_model.enable:
              from verl.workers.fsdp_workers import RewardModelWorker
              self.role_worker_mapping[Role.RewardModel] = ray.remote(RewardModelWorker)
      
    • 注册 Reference Policy

      1
      2
      3
      
      def add_ref_policy_worker(self, config, ref_policy_cls):
          if config.algorithm.use_kl_in_reward or config.actor_rollout_ref.actor.use_kl_loss:
              self.role_worker_mapping[Role.RefPolicy] = ray.remote(ref_policy_cls)
      
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    
    role_worker_mapping = {
        Role.ActorRollout: ActorRolloutRefWorker,
        Role.Critic: CriticWorker,
        Role.RefPolicy: ActorRolloutRefWorker
    }
    
    global_pool_id = 'global_pool'
    resource_pool_spec = {
        global_pool_id: [config.trainer.n_gpus_per_node] * config.trainer.nnodes,
    }
    mapping = {
        Role.ActorRollout: global_pool_id,
        Role.Critic: global_pool_id,
        Role.RefPolicy: global_pool_id,
    }
    
  2. 资源池管理与初始化

    verl 不直接绑定 GPU 设备,而是通过资源池抽象:

    这样支持多节点、多池的结构,Reward Model 可使用独立 GPU 池,ResourcePoolManager 动态分配 GPU,统一调度。

    1
    2
    3
    4
    
    def init_resource_pool_mgr(self, config):
        global_pool_id = "global_pool"
        resource_pool_spec = {global_pool_id: [config.trainer.n_gpus_per_node] * config.trainer.nnodes}
        resource_pool_manager = ResourcePoolManager(resource_pool_spec, mapping=self.mapping)
    
  3. 训练主流程 run(config)

    • 打印配置与环境信息

      1
      
      pprint(OmegaConf.to_container(config, resolve=True))
      
    • 注册全部角色

      1
      2
      3
      4
      
      actor_rollout_cls, ray_worker_group_cls = self.add_actor_rollout_worker(config)
      self.add_critic_worker(config)
      self.add_reward_model_worker(config)
      self.add_ref_policy_worker(config, actor_rollout_cls)
      
    • 配置验证

      1
      
      validate_config(config, use_reference_policy=need_reference_policy(...), use_critic=need_critic(config))
      
    • 加载模型与 tokenizer

      1
      2
      3
      
      local_path = copy_to_local(config.actor_rollout_ref.model.path)
      tokenizer = hf_tokenizer(local_path)
      processor = hf_processor(local_path)
      
    • 加载奖励函数

      1
      2
      
      reward_fn = load_reward_manager(config, tokenizer)
      val_reward_fn = load_reward_manager(config, tokenizer, num_examine=1)
      
    • 创建数据集与采样器

      1
      2
      
      train_dataset = create_rl_dataset(config.data.train_files, ...)
      train_sampler = create_rl_sampler(config.data, train_dataset)
      
    • 构建 PPO Trainer

      verl 的 Trainer 被抽象为 远程多进程任务协调器,PPO 的核心逻辑(如策略损失、价值函数、优势估计)则封装在 RayPPOTrainer 中。

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      
      trainer = RayPPOTrainer(
          config=config,
          tokenizer=tokenizer,
          processor=processor,
          role_worker_mapping=self.role_worker_mapping,
          resource_pool_manager=resource_pool_manager,
          reward_fn=reward_fn,
          ...
      )
      trainer.init_workers()
      trainer.fit()
      

训练器 ray_trainer.py

完整代码

verl 的 RayPPOTrainer​ 是一个 分布式 PPO 训练器, 囊括了PPO 训练的各个步骤,主要职责包括:

首先先看看,主要的类和函数

模块 / 函数作用
ResourcePoolManager管理 GPU 资源池(节点 → GPU 数)映射与调度
apply_kl_penalty()计算 KL 散度惩罚,用于在奖励层面控制策略更新幅度
compute_response_mask()生成响应部分的 attention mask
compute_advantage()根据不同算法计算优势函数(GAE、GRPO、REINFORCE 等)
RayPPOTrainer核心类,负责分布式 PPO 训练的完整生命周期
  1. ResourcePoolManager:GPU 资源管理器

    参数名类型说明
    resource_pool_specdict[str, list[int]]每个节点的 GPU 分布(如{ "global_pool": [8, 8] }
    mappingdict[Role, str]角色与资源池名称的映射(如ActorRollout → global_pool
    resource_pool_dictdict[str, RayResourcePool]具体的 Ray GPU 资源池对象

    核心代码

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    
    @dataclass
    class ResourcePoolManager:
        def create_resource_pool(self):
            for pool_name, process_on_nodes in self.resource_pool_spec.items():
                resource_pool = RayResourcePool(
                    process_on_nodes=process_on_nodes, use_gpu=True,
                    max_colocate_count=1, name_prefix=pool_name
                )
                self.resource_pool_dict[pool_name] = resource_pool
            self._check_resource_available()
    

    RayResourcePool​ 是 VERL 封装的 GPU 资源抽象,每个 pool 对应一组节点和 GPU。其中FSDP 模式下 max_colocate_count=1 表示所有 worker 合并为一个进程,而Megatron 模式可设置更高以支持 pipeline 并行。

  2. apply_kl_penalty KL 惩罚项

    主要参数:

    参数名类型说明
    dataDataProto包含生成序列的 logits、奖励、掩码等数据结构
    kl_ctrlAdaptiveKLController动态 KL 系数控制器
    kl_penaltystrKL 惩罚类型(默认"kl"

    核心代码

    1
    2
    3
    4
    5
    6
    
    kld = core_algos.kl_penalty(
        data.batch["old_log_probs"], data.batch["ref_log_prob"], kl_penalty=kl_penalty
    )
    token_level_rewards = token_level_scores - beta * kld
    # 这部分的代码参考自 trl: https://github.com/huggingface/trl/blob/951ca1841f29114b969b57b26c7d3e80a39f75a0/trl/trainer/ppo_trainer.py#L837
    kl_ctrl.update(current_kl=current_kl, n_steps=batch_size)
    

    计算新旧策略之间的 token-level KL 散度;根据动态系数 β(由 AdaptiveKLController 控制)惩罚奖励;

  3. compute_response_mask:响应掩码生成

    在 RLHF 中,输入序列通常为 [prompt + response]​,该函数的主要目的是提取生成部分的attention_mask,用于区分哪些 token 属于动作(需优化),哪些是 prompt(仅上下文),这样后续的 reward、advantage、loss 都只作用于 response 区域

    1
    2
    3
    4
    5
    
    def compute_response_mask(data):
        responses = data.batch["responses"]
        response_length = responses.size(1)
        attention_mask = data.batch["attention_mask"]
        return attention_mask[:, -response_length:]
    
  4. compute_advantage:优势函数估计模块

    • GAE 模式:标准 PPO 方式,通过时间差(TD-error)累计获得平滑优势:

      $$ A_t = \delta_t + (\gamma \lambda) \delta_{t+1} + ... $$
    • GRPO 模式(Group Relative Policy Optimization):基于多样本比较的相对优势:「GRPO, DAPO, GSPO, REINFORCE/RLOO 都移除了 Critic 模型」

      • 对同一 Prompt 的多个 Response 比较相对得分;
      • 不依赖 Critic;
      • 常用于无 Value Model 的偏好学习。
    • 其他模式,core_algos.get_adv_estimator_fn(adv_estimator)来自定义优势函数

    主要参数:

    参数类型说明
    dataDataProto训练样本数据
    adv_estimatorAdvantageEstimator优势估计方法(GAE、GRPO、REINFORCE 等)
    gammafloat折扣因子
    lamfloatGAE 中的 λ 参数
    num_repeatint每个 prompt 生成样本数
    norm_adv_by_std_in_grpobool是否在 GRPO 中标准化优势
    configAlgoConfig算法配置

    核心代码

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    
    if adv_estimator == AdvantageEstimator.GAE:
        advantages, returns = core_algos.compute_gae_advantage_return(
            token_level_rewards=data.batch["token_level_rewards"],
            values=data.batch["values"],
            response_mask=data.batch["response_mask"],
            gamma=gamma, lam=lam
        )
    elif adv_estimator == AdvantageEstimator.GRPO:
        advantages, returns = core_algos.compute_grpo_outcome_advantage(
            token_level_rewards=data.batch["token_level_rewards"],
            response_mask=data.batch["response_mask"],
            index=data.non_tensor_batch["uid"],
            norm_adv_by_std_in_grpo=norm_adv_by_std_in_grpo,
        )
    else:
        adv_estimator_fn = core_algos.get_adv_estimator_fn(adv_estimator)
        adv_kwargs = {
            "token_level_rewards": data.batch["token_level_rewards"],
            "response_mask": data.batch["response_mask"],
            "config": config,
        }
        if "uid" in data.non_tensor_batch:  # 可选
            adv_kwargs["index"] = data.non_tensor_batch["uid"]
        if "reward_baselines" in data.batch:  # 可选
            adv_kwargs["reward_baselines"] = data.batch["reward_baselines"]
        advantages, returns = adv_estimator_fn(**adv_kwargs)
        data.batch["advantages"] = advantages
        data.batch["returns"] = returns
    
  5. RayPPOTrainer:主训练类

    参数表(包含构造函数)

    参数名类型说明
    configOmegaConf配置文件(Hydra 格式)
    tokenizerHF Tokenizer用于文本编码与解码
    role_worker_mappingdict[Role, WorkerType]各角色对应的 Ray Worker 类
    resource_pool_managerResourcePoolManagerGPU 资源池管理器
    ray_worker_group_clsRayWorkerGroupWorkerGroup 类(默认)
    processoroptional处理多模态数据的 Processor
    reward_fncallable训练阶段奖励函数
    val_reward_fncallable验证阶段奖励函数
    train_dataset​,val_datasetDataset数据集
    collate_fncallableBatch 拼接函数
    train_samplerSampler数据采样器
    device_namestr设备名(cuda / cpu)
    • 初始化逻辑

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      
      def __init__(...):
          self.tokenizer = tokenizer
          self.processor = processor
          self.config = config
          self.reward_fn = reward_fn
          self.val_reward_fn = val_reward_fn
      
          self.hybrid_engine = config.actor_rollout_ref.hybrid_engine
          assert self.hybrid_engine, "Currently, only support hybrid engine"
      
          if self.hybrid_engine:
              assert Role.ActorRollout in role_worker_mapping, f"{role_worker_mapping.keys()=}"
      
          self.role_worker_mapping = role_worker_mapping
          self.resource_pool_manager = resource_pool_manager
          self.use_reference_policy = need_reference_policy(self.role_worker_mapping)
          self.use_rm = need_reward_model(self.role_worker_mapping)
          self.use_critic = need_critic(self.config)
          self.ray_worker_group_cls = ray_worker_group_cls
          self.device_name = device_name if device_name else self.config.trainer.device
          self.validation_generations_logger = ValidationGenerationsLogger(
              project_name=self.config.trainer.project_name,
              experiment_name=self.config.trainer.experiment_name,
          )
      
      	# 若ref_in_actor为True,则参考策略将采用未应用LoRA的原来的 actor 模型。
          self.ref_in_actor = (
              config.actor_rollout_ref.model.get("lora_rank", 0) > 0
              or config.actor_rollout_ref.model.get("lora_adapter_path") is not None
          )
      
          # 在奖励中控制 KL 散度
          # 目前不支持在损失中控制 KL 散度
          if self.config.algorithm.use_kl_in_reward:
              self.kl_ctrl_in_reward = core_algos.get_kl_controller(self.config.algorithm.kl_ctrl)
      
          self._create_dataloader(train_dataset, val_dataset, collate_fn, train_sampler)
      

      核心要点:

      1. 目前只支持 Hybrid Engine 模式,当前还没有实现论文中的 Hybrid3D flow
      2. 加载日志工具,以及完成动态确定是否加载模型,例如 Reference Model、Reward Model 和 Critic Model
    • 数据加载与分布式 DataLoader

      该部分完成数据集创建、采样器创建、DataLoader 实例化以及计算总训练步数等

      核心代码:

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      
      def _create_dataloader(self, train_dataset, val_dataset, collate_fn, train_sampler):
      
      	#调用前面的 create_rl_dataset()自动选择合适的数据集类,包含 RLHF 类型数据集以及 DynamicGenDataset 动态生成的数据集
      	train_dataset = create_rl_dataset(self.config.data.train_files, ...)
      	val_dataset = create_rl_dataset(self.config.data.val_files, ...)
      
      	#创建采样器
      	train_sampler = create_rl_sampler(self.config.data, self.train_dataset)
      
      	#Dataloader 实例化
      	self.train_dataloader = StatefulDataLoader(dataset=self.train_dataset,
          	batch_size=self.config.data.get("gen_batch_size", self.config.data.train_batch_size),
          	num_workers=num_workers,
          	drop_last=True, collate_fn=collate_fn, sampler=train_sampler)
      	#StatefulDataLoader 是 VERL 自定义版本,支持:
      	#  - 保存/加载迭代状态(断点恢复);
      	#  - 动态调整采样;
      
      	#计算总训练步数,并写回配置供优化器调度。
      	total_training_steps = len(self.train_dataloader) * self.config.trainer.total_epochs
      
    • Worker 初始化

      负责用 Ray 初始化所有分布式工作进程(Worker Groups)。为不同角色(Actor、Critic、Reference、Reward Model)分配资源池并远程启动模型实例,实现多 GPU 并行执行。

      核心代码:

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      
      def init_workers(self):
      	self.resource_pool_manager.create_resource_pool() #按配置在各节点组装 Ray 资源配额(GPU 数、放置策略)
      	# 把每个角色(ActorRollout/Critic/Ref/RM)绑定到其进程入口类和初始化参数。Ray 会在远端据此构造对象。
      	self.resource_pool_to_cls = {pool: {} for pool in self.resource_pool_manager.resource_pool_dict.values()} 
      
      	# 注册各角色到资源池
      	rp = self.resource_pool_manager.get_resource_pool(Role.ActorRollout)
      	self.resource_pool_to_cls[rp][str(Role.ActorRollout)] = RayClassWithInitArgs(
          	cls=self.role_worker_mapping[Role.ActorRollout],
          	config=self.config.actor_rollout_ref,
          	role=str(Role.ActorRollout),
      	)
      if self.use_critic:
          rp = self.resource_pool_manager.get_resource_pool(Role.Critic)
          self.resource_pool_to_cls[rp][str(Role.Critic)] = RayClassWithInitArgs(
              cls=self.role_worker_mapping[Role.Critic],
              config=omega_conf_to_dataclass(self.config.critic),
      )
      if self.use_reference_policy:
          rp = self.resource_pool_manager.get_resource_pool(Role.RefPolicy)
          self.resource_pool_to_cls[rp][str(Role.RefPolicy)] = RayClassWithInitArgs(
              self.role_worker_mapping[Role.RefPolicy],
              config=self.config.actor_rollout_ref,
              role=str(Role.RefPolicy),
      )
      if self.use_rm:
          rp = self.resource_pool_manager.get_resource_pool(Role.RewardModel)
          self.resource_pool_to_cls[rp][str(Role.RewardModel)] = RayClassWithInitArgs(
              self.role_worker_mapping[Role.RewardModel],
              config=self.config.reward_model,
      )
      
      # spawn WorkerGroup
      wg_kwargs = {"device_name": self.device_name}
      all_wg = {}
      for rp, class_dict in self.resource_pool_to_cls.items():
          worker_dict_cls = create_colocated_worker_cls(class_dict=class_dict) #把同一进程内的多个角色封装到一个“共址容器类”,减少重复 CUDA/通信上下文。Colocate 就是放在一起共享 GPU
          wg_dict = self.ray_worker_group_cls(resource_pool=rp, ray_cls_with_init=worker_dict_cls, **wg_kwargs)
          all_wg.update(wg_dict.spawn(prefix_set=class_dict.keys()))
      
      # 依次 init_model;Actor 模型是肯定有的
      # 让每个远端角色加载权重与优化器状态。Actor 放最后,同时给 vLLM 预估 KV cache 留空间。
      if self.use_critic:
          self.critic_wg = all_wg[str(Role.Critic)]; self.critic_wg.init_model()
      if self.use_reference_policy and not self.ref_in_actor:
          self.ref_policy_wg = all_wg[str(Role.RefPolicy)]; self.ref_policy_wg.init_model()
      if self.use_rm:
          self.rm_wg = all_wg[str(Role.RewardModel)]; self.rm_wg.init_model()
      self.actor_rollout_wg = all_wg[str(Role.ActorRollout)]; self.actor_rollout_wg.init_model()
      
      # 异步 rollout
      self.async_rollout_mode = (self.config.actor_rollout_ref.rollout.mode == "async")
      if self.async_rollout_mode:
          from verl.experimental.agent_loop import AgentLoopManager #异步,统筹 rollout 与 RM 调度,提升吞吐。
          self.async_rollout_manager = AgentLoopManager(config=self.config, worker_group=self.actor_rollout_wg, rm_wg=self.rm_wg)
      
    • 保存模型检查点_save_checkpoint()

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      
      def _save_checkpoint(self):
          root = os.path.join(self.config.trainer.default_local_dir, f"global_step_{self.global_steps}") #以 global_step_X 为目录组织快照。
          actor_local = os.path.join(root, "actor")
          actor_remote = (None if self.config.trainer.default_hdfs_dir is None
                          else os.path.join(self.config.trainer.default_hdfs_dir, f"global_step_{self.global_steps}", "actor"))
      
      	#调用远端 save_checkpoint,由各 Worker 在其进程内落盘(可选同时写远端存储)。
          self.actor_rollout_wg.save_checkpoint(actor_local, actor_remote, self.global_steps,
                                                max_ckpt_to_keep=self.config.trainer.get("max_actor_ckpt_to_keep"))
      
          if self.use_critic:
              critic_local = os.path.join(root, str(Role.Critic))
              critic_remote = (None if self.config.trainer.default_hdfs_dir is None
                               else os.path.join(self.config.trainer.default_hdfs_dir, f"global_step_{self.global_steps}", str(Role.Critic)))
              self.critic_wg.save_checkpoint(critic_local, critic_remote, self.global_steps,
                                             max_ckpt_to_keep=self.config.trainer.get("max_critic_ckpt_to_keep"))
      
          torch.save(self.train_dataloader.state_dict(), os.path.join(root, "data.pt")) #保存 dataloader.state_dict(),以便断点续训恢复迭代位置。
          with open(os.path.join(self.config.trainer.default_local_dir, "latest_checkpointed_iteration.txt"), "w") as f:
              f.write(str(self.global_steps)) #写 latest_checkpointed_iteration.txt 便于“自动恢复最新”
      
    • 读取模型检查点_load_checkpoint

      三种模式:disable​(不恢复)、auto​(找最新)、resume_path(指定路径)。

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      
      def _load_checkpoint(self):
          if self.config.trainer.resume_mode == "disable":
              self.actor_rollout_wg.load_checkpoint(None); return 0
      
          base = self.config.trainer.default_local_dir
          if not os.path.isabs(base): base = os.path.join(os.getcwd(), base)
      	#找最新的模型检查点
          latest = (find_latest_ckpt_path(base) if self.config.trainer.resume_mode == "auto"
                    else os.path.join(os.getcwd(), self.config.trainer.resume_from_path))
      
      	#解析 step 编号,设置 self.global_steps,分别让 Actor/Critic 远端加载。
          self.global_steps = int(str(latest).split("global_step_")[-1])
          actor_path  = os.path.join(latest, "actor")
          critic_path = os.path.join(latest, str(Role.Critic))
      
          self.actor_rollout_wg.load_checkpoint(actor_path, del_local_after_load=self.config.trainer.del_local_ckpt_after_load)
          if self.use_critic:
              self.critic_wg.load_checkpoint(critic_path, del_local_after_load=self.config.trainer.del_local_ckpt_after_load)
      
      	#本地恢复 dataloader 状态
          data_path = os.path.join(latest, "data.pt")
          if os.path.exists(data_path):
              self.train_dataloader.load_state_dict(torch.load(data_path, weights_only=False))
      
    • _start_profiling​/_stop_profiling

      统一向各远端 Worker 发送“开始/结束”性能采集命令,保证时间窗一致,便于跨进程关联分析。

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      
          def _start_profiling(self, do_profile: bool) -> None:
              """Start profiling for all worker groups if profiling is enabled."""
              if do_profile:
                  self.actor_rollout_wg.start_profile(role="e2e", profile_step=self.global_steps)
                  if self.use_reference_policy:
                      self.ref_policy_wg.start_profile(profile_step=self.global_steps)
                  if self.use_critic:
                      self.critic_wg.start_profile(profile_step=self.global_steps)
                  if self.use_rm:
                      self.rm_wg.start_profile(profile_step=self.global_steps)
      
          def _stop_profiling(self, do_profile: bool) -> None:
              """Stop profiling for all worker groups if profiling is enabled."""
              if do_profile:
                  self.actor_rollout_wg.stop_profile()
                  if self.use_reference_policy:
                      self.ref_policy_wg.stop_profile()
                  if self.use_critic:
                      self.critic_wg.stop_profile()
                  if self.use_rm:
                      self.rm_wg.stop_profile()
      
    • _balance_batch对 batch 进行优化操作

      对单个控制器上的数据进行重新排序,使得每个数据并行层级获得相近的总token数。

      这个主要是加快流水线并行的效率,普通的 batch 拆分是“按样本数量”平均分,但语言模型的样本长度差异极大。→ 结果:某些 GPU 处理很长序列,其他 GPU 很快就闲置,导致 pipeline bubble(GPU 等待)。

      所以为了减少 GPU 的并行等待(DP)呢,就需要重新调整一下 batch,让每张卡处理的 token 数尽可能相等,从而提升吞吐率,减少因序列长度差异带来的等待与 GPU 空转。

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      
      def _balance_batch(self, batch: DataProto, metrics, logging_prefix="global_seqlen", keep_minibatch=False):
          attn = batch.batch["attention_mask"]
          seqlen = attn.view(attn.shape[0], -1).sum(-1) #以 attention_mask 统计每条样本 token 数
          seqlen = calculate_workload(seqlen)              # 统计量或平滑
          ws = self.actor_rollout_wg.world_size
          parts = ( # 可选“按小批保持”或整体划分
              _decoupled_parts(seqlen, ws, self.config.actor_rollout_ref.actor.get("ppo_mini_batch_size"))
              if keep_minibatch else get_seqlen_balanced_partitions(seqlen, k_partitions=ws, equal_size=True) #get_seqlen_balanced_partitions 把样本划分到各 DP rank,使总 token接近一致
          )#keep_minibatch, 按小批次单独平衡,而不是对整个 batch 平衡
          for i, p in enumerate(parts):                    # 小的在两端,减少流水线气泡
              p.sort(key=lambda x: (seqlen[x], x))
              parts[i] = p[::2] + p[1::2][::-1] # 按长度从短到长排序,然后两端交错排列(短、长、短、长…)
      		# 这主要是流水线并行的优化,流水线并行时,让长序列与短序列交替,有助于保持稳定的 GPU 利用率(减少 bubble)
          idx = torch.tensor([j for p in parts for j in p])
          batch.reorder(idx) #根据索引重新排序。数据将通过调度函数自动进行均等分区。
          metrics.update(log_seqlen_unbalance(seqlen_list=seqlen, partitions=parts, prefix=logging_prefix))
      
    • compute_rollout_importance_weights_and_add_to_batch计算重要性采样(IS)权重,并应用拒绝采样以解决推演与训练不匹配的问题。

      前提:有 rollout_log_probs,并设置了阈值。

      该函数主要是修正策略比率(重要性比率),在优势估计前完成这个步骤,包含 1.计算 IS weights = π_new(a|s) / π_rollout(a|s) 的近似;2.拒绝极端情况(mask|veto);3.可选加入权重来修正期望;

      compute_rollout_importance_weights(...)通常$r=exp(log\pi_{rollout}-log\pi_{old})$计算比值作为重要性权重,随后按 level/mode/threshold​ 做裁剪、平滑或掩码

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      
      def compute_rollout_importance_weights_and_add_to_batch(self, batch: DataProto) -> tuple[DataProto, dict]:
          thr = self.config.algorithm.get("rollout_is_threshold", None)
          if thr is not None and thr > 0 and "rollout_log_probs" in batch.batch:
      		#产生重要性权重 w,更新后的 response_mask,以及 mismatch 统计
              w, new_mask, m = compute_rollout_importance_weights(
                  old_log_prob=batch.batch["old_log_probs"], #旧策略 prob
                  rollout_log_prob=batch.batch["rollout_log_probs"], #新策略prob
                  response_mask=batch.batch["response_mask"], #掩码去掉prompt 和 padding
                  rollout_is_level=self.config.algorithm.rollout_is_level, # token级还是序列级?
                  rollout_is_mode=self.config.algorithm.rollout_is_mode, # 仅掩码、上下截断
                  rollout_is_threshold=self.config.algorithm.rollout_is_threshold, #clip 阈值
                  rollout_is_threshold_lower=self.config.algorithm.get("rollout_is_threshold_lower", None), #clip 阈值
                  rollout_is_veto_threshold=self.config.algorithm.get("rollout_is_veto_threshold", None), #极端值的处理
              )
              batch.batch["response_mask"] = new_mask                  # 无论是否真正用权重,一律更新掩码。这保证被拒绝的 token 不参与损失与归一化,防止分母泄漏。
      		#只有 rollout_is=True 才把 w并入 batch 用于损失加权;否则仅做拒绝与监控。
              if self.config.algorithm.get("rollout_is", False):       # 仅在开启时并入权重
                  batch = batch.union(w)
              return batch, m
      		#返回 m(metrics):用于监控 rollout 与 old 的失配程度,便于先“只看指标不施加权重”的安全过渡
          return batch, {}
      
    • 训练循环 fit()-PPO 的训练循环

      其流程可以转化与 PPO 理论截断进行匹配

      PPO 理论阶段代码对应实现作用
      ① 收集样本(Rollout)generate_sequences()用当前 Actor 策略生成回复
      ② 计算奖励(Reward)compute_rm_score()​+reward_fn()使用奖励模型与规则综合奖励
      ③ KL 惩罚apply_kl_penalty()对策略偏离程度施加惩罚项
      ④ 优势估计compute_advantage()用 GAE 或其他方法计算 Advantage
      ⑤ 更新 Criticupdate_critic()拟合价值函数 (V_\phi(s))
      ⑥ 更新 Actorupdate_actor()用 PPO 目标更新策略参数
      ⑦ 验证与日志_validate()​+logger.log()在周期性迭代后评估模型

      PPO 中需要训练的模型是 Critic 模型和 Actor 模型

      • KL 惩罚项的引入 $r_t = r_t - \beta , KL[\pi_t||\pi_{ref}]$

      • GAE 计算优势:$A_t = \delta_t + (\gamma \lambda)\delta_{t+1} + \dots$

        • 系数含义控制效果
          $\gamma$ (gamma)折扣未来奖励控制长期性:未来奖励对当前决策的影响程度,回到了经典的问题,这里再重申一下 exploration(探索) 「只探索->行为随机无法收敛」和 exploitation(利用) 「只利用,陷入局部最优解,无法改进」
          $\lambda$ (lambda)GAE 平滑参数控制偏差-方差权衡:λ→1 近似 MC,λ→0 近似 TD(0)
      • 对于 Critic 模型的更新目标是:最小化 时序差分TD 误差,$L_V = \frac{1}{2} (V_\phi(s_t) - \hat{R}_t)^2$

        • 之前也讲过,Critic 的存在是为了给 Actor 提供一个基线 (baseline),从而让策略梯度更稳定、方差更低。

        • 在强化学习中,我们希望 Vϕ(s) 尽可能接近真实的回报:$R_t = r_t + \gamma r_{t+1} + \gamma^2 r_{t+2} + \dots$,由于未来回报难以直接计算,我们用“估计的目标回报” $(\hat{R}_t)$代替,例如用 GAE (Generalized Advantage Estimation) 或 n-step TD 计算得到:

          $$ \hat{R}_t = r_t + \gamma (1 - \lambda) V*\phi(s_{t+1}) + (\gamma \lambda) (r_{t+1} + \gamma V_\phi(s_{t+2})) + \dots $$

          这个 $(\hat{R}_t)$相当于 Critic 应该输出的“真值”,而 $Vϕ(s_t)$是 Critic 当前的预测。

        • 推导一下啊哈这个损失函数,我们希望让 $Vϕ$预测的值尽可能接近 $(\hat{R}_t)$,那么最直接的方式就是最小化两者的均方误差MSE。

        • Crtic 在给定状态输出更准确的价值估计对于计算 Advantage 至关重要,$A_t = \hat{R}_t - V_\phi(s_t)$

      • Actor 模型的更新目标,已经提到很多次啦~

        $$ L^{CLIP} = -\mathbb{E}_t [\min(r_t A_t, \text{clip}(r_t, 1-\epsilon, 1+\epsilon)A_t)] $$

        其中 ($r_t = \frac{\pi*\theta(a_t|s_t)}{\pi_{old}(a_t|s_t)}$)。

        1
        2
        3
        4
        5
        6
        7
        8
        9
       10
       11
       12
       13
       14
       15
       16
       17
       18
       19
       20
       21
       22
       23
       24
       25
       26
       27
       28
       29
       30
       31
       32
       33
       34
       35
       36
       37
       38
       39
       40
       41
       42
       43
       44
       45
       46
       47
       48
       49
       50
       51
       52
       53
       54
       55
       56
       57
       58
       59
       60
       61
       62
       63
       64
       65
       66
       67
       68
       69
       70
       71
       72
       73
       74
       75
       76
       77
       78
       79
       80
       81
       82
       83
       84
       85
       86
       87
       88
       89
       90
       91
       92
       93
       94
       95
       96
       97
       98
       99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      130
      131
      132
      133
      134
      
      def fit(self):
          """
          PPO 主训练循环:驱动器节点负责调度数据流,WorkerGroup 在 GPU 上执行计算。
          """
          from verl.utils.tracking import Tracking
          from omegaconf import OmegaConf
      
          logger = Tracking(project_name=self.config.trainer.project_name,
                              experiment_name=self.config.trainer.experiment_name,
                              default_backend=self.config.trainer.logger,
                              config=OmegaConf.to_container(self.config, resolve=True))
      
          global_steps = 0
      
          # perform validation before training
          # currently, we only support validation using the reward_function.
      	# 训练前先测试一下模型的性能(按照奖励函数)
          if self.val_reward_fn is not None: 
              val_metrics = self._validate() # 若配置了验证奖励函数,则先在验证集上测试当前模型性能(baseline)
              pprint(f'Initial validation metrics: {val_metrics}')
      
          for epoch in range(self.config.trainer.total_epochs):
              for batch_dict in self.train_dataloader: #每个 batch_dict 是从强化学习数据集中加载的一批 prompt
                  metrics = {}
      
                  batch: DataProto = DataProto.from_single_dict(batch_dict) #封装为一个 DataProto(verl 的数据集格式)
                  # batch = batch.to('cuda')
      
                  # pop those keys for generation
                  gen_batch = batch.pop(batch_keys=['input_ids', 'attention_mask', 'position_ids']) #提取用于生成的输入字段,形成新的 `batch gen_batch`
      
                  # 生成样本(rollout)
                  with Timer(name='gen', logger=None) as timer:
                      gen_batch_output = self.actor_rollout_wg.generate_sequences(gen_batch) #调用远程 worker group(actor model) 的 generate_sequences()。
                  metrics['timing/gen'] = timer.last
      
                  batch = batch.union(gen_batch_output) #合并生成结果,这里 actor 在GPU 上生成了一批新的回复,生成完后我们将其合并起来
      
                  if self.use_reference_policy: #如果有使用参考模型的话,那么就需要开始计算参考模型的 log_prob,这一步的主要目的是为了计算 PPO 中的 KL 散度
                      # compute reference log_prob
                      with Timer(name='ref', logger=None) as timer:
                          ref_log_prob = self.ref_policy_wg.compute_ref_log_prob(batch) #调用远程 worker(reference model) 的 compute_ref_log_prob 完成 ref模型的 log 计算
                          batch = batch.union(ref_log_prob) #合并各个 batch 结果
                      metrics['timing/ref'] = timer.last
      
                  # compute values
      			# Critic模型来计算value
                  with Timer(name='values', logger=None) as timer: 
                      values = self.critic_wg.compute_values(batch) #调用远程 worker(critic model) 的 compute_values 来计算 value
                      batch = batch.union(values) #同样合并结果
                  metrics['timing/values'] = timer.last
      
      			#=====这上面这部分,其实都是准备阶段,为了计算优势,我们需要先准备好这些结果=======
      			#因此可以并行进行计算,verl 使用 ray 来调控各个节点,各个节点内部是并行的
      			#======================================================================
      
      			#下面我们需要开始整合结果,包括计算奖励、KL 惩罚的加入还有进行优势估计了
                  with Timer(name='adv', logger=None) as timer:
                      # 计算分数。支持基于模型和基于函数的。我们首先使用奖励模型计算分数。然后,我们调用reward_fn来结合奖励模型和基于规则的结果。
                      if self.use_rm:
                          # 若有奖励模型(RM),先获得基于模型的奖励;
                          reward_tensor = self.rm_wg.compute_rm_score(batch)
                          batch = batch.union(reward_tensor)
      
                      # reward_fn 再融合规则式奖励;
                      reward_tensor = self.reward_fn(batch)
                      batch.batch['token_level_scores'] = reward_tensor
      
                      # 应用 KL 惩罚 r_t = r_t - βKL[π_t || π_ref]
                      batch, kl_metrics = apply_kl_penalty(batch,
                                                              kl_ctrl=self.kl_ctrl_in_reward,
                                                              kl_penalty=self.config.algorithm.kl_penalty)
                      metrics.update(kl_metrics)
      
                      # 调用 compute_advantage() 计算 PPO 的优势,algorithm.adv_estimator可以用为 GAE。
                      batch = compute_advantage(batch,
                                                  self.config.algorithm.gamma, #<----
                                                  self.config.algorithm.lam, #<----
                                                  adv_estimator=self.config.algorithm.adv_estimator)
                  metrics['timing/adv'] = timer.last
      
                  # 更新模型(更新 Critic)
      			# Critic 更新目标:最小化 TD 误差,
                  if self.use_critic:
                      with Timer(name='update_critic', logger=None) as timer:
                          critic_output = self.critic_wg.update_critic(batch)
                      metrics['timing/update_critic'] = timer.last
                      critic_output_metrics = reduce_metrics(critic_output.meta_info['metrics'])
                      metrics.update(critic_output_metrics)
      
                  # 当 Critic 预热完毕后才更新 Actor;
      			# 如果 Critic 不准的话,会严重影响到 Adavantage 的计算,在训练的前若干步,只更新 Critic,不更新 Actor,确保 Critic的稳定
                  if self.config.trainer.critic_warmup <= global_steps: #只有当步数 > critic_warmup 后,Actor 才开始参与训练
                      # 更新 Actor(策略优化)
                      with Timer(name='update_actor', logger=None) as timer:
                          actor_output = self.actor_rollout_wg.update_actor(batch)
                      metrics['timing/update_actor'] = timer.last
                      actor_output_metrics = reduce_metrics(actor_output.meta_info['metrics'])
                      metrics.update(actor_output_metrics)
      
                  # 下面主要是完成验证和日志记录等操作;
                  if self.val_reward_fn is not None and (global_steps + 1) % self.config.trainer.test_freq == 0:
                      with Timer(name='testing', logger=None) as timer:
                          val_metrics: dict = self._validate() #定期调用 _validate();来看看模型在测试集上的性能怎么样
                          val_metrics = {f'val/{key}': val for key, val in val_metrics.items()}
                      metrics['timing/testing'] = timer.last
                      metrics.update(val_metrics)
      
                  # collect metrics
                  data_metrics = compute_data_metrics(batch=batch)
                  metrics.update(data_metrics)
      
                  # TODO: make a canonical logger that supports various backend
                  logger.log(data=metrics, step=global_steps) #计算训练统计量并写入日志系统。
      
      			# 这里就是保存模型检查点了
                  if self.config.trainer.save_freq > 0 and (global_steps + 1) % self.config.trainer.save_freq == 0:
                      actor_local_path = os.path.join(self.config.trainer.default_local_dir, 'actor',
                                                      f'global_step_{global_steps}')
                      actor_remote_path = os.path.join(self.config.trainer.default_hdfs_dir, 'actor')
                      self.actor_rollout_wg.save_checkpoint(actor_local_path, actor_remote_path)
      
                      if self.use_critic:
                          critic_local_path = os.path.join(self.config.trainer.default_local_dir, 'critic',
                                                              f'global_step_{global_steps}')
                          critic_remote_path = os.path.join(self.config.trainer.default_hdfs_dir, 'critic')
                          self.critic_wg.save_checkpoint(critic_local_path, critic_remote_path)
      
                  global_steps += 1
      
          # perform validation after training
          if self.val_reward_fn is not None:
              val_metrics = self._validate()
              pprint(f'Final validation metrics: {val_metrics}')
      
  6. 总结 verl 实现的 PPO 的逻辑流程

    阶段主要函数关键数据数学意义
    生成generate_sequencesresponses, old_log_probs从 $pi_{old}$采样
    打分compute_rm_score​+compute_rewardtoken_level_scores获得 $r_t$
    奖励惩罚apply_kl_penaltytoken_level_rewards加入 $−β·KL$
    优势compute_advantageadvantages, returns计算 $A\_t 、 R\_t$
    Critic 更新update_criticvalues, returns最小化 MSE
    Actor 更新update_actorold_log_probs, advantages最大化 clip 目标

RL的一些 tricks

经常会问到 RL 和 SFT 有啥区别,RL 那么强,SFT 是不是没啥用了这里的,可以做一个简单的分析,首先捋清RL/SFT的 loss 逻辑:

RL 的单个 step 的 loss 是$l_t(\theta)=-A_t·log\pi_\theta(a_t)$

LLM 中的每个 step 输出一个 logits 向量,对这个 logits 做一次 softmax 函数就得到了每个 token 被选中的概率。

若记$z$为logits 向量,那么$\pi$=softmax(z),令 loss 对 logits 向量 $z$ 求导得: $\nabla_{z_{t,k}} \ell_t = A_t \cdot (\pi_\theta(k) - \mathbf{1}[k=a_t])$

换言之,RL 的优化动力可以概括为:如果 Advantage 大于零,logits 向量在当前 token 维度的梯度是负数,这个 logit 会朝增大的方向优化,logits 向量在其他所有维度的梯度都是正数,剩余的 vocab_size - 1 个 logit 都会朝着减小的方向优化;反之亦然。


那么对 SFT是怎么样的呢?SFT 的 loss 是目标分布和模型分布的交叉熵函数(极大似然估计/交叉熵)。

给定目标分布 $q$(通常是 one-hot),模型预测分布 $\pi_\theta$,

交叉熵定义为: $\ell^{\text{CE}}(q, \pi_\theta) = - \sum_{k} q(k) \, \log \pi_\theta(k)$

在 SFT 中,我们有一个 参考答案 $a_t$,所以目标分布是: $q(k) = \mathbf{1}[k=a_t]$

综上, SFT 的单个 token 的loss 是:

$\ell_t^{\text{SFT}}(\theta) = - \sum_k \mathbf{1}[k=a_t] \log \pi_\theta(k) = - \log \pi_\theta(a_t)$

令 loss 对 logits 向量 $z$ 求导得: $\nabla_{z_{t,k}} \ell_t^{\text{SFT}} = \pi_\theta(k) - \mathbf{1}[k=a_t]$


SFT loss 和 RL loss 在形式上没有区别,SFT 仅仅是 RL advantage 全为 1 时的一个特例罢了。更准确的说法是:SFT 是一种全部样本都为 off_policy,只计算正例 loss,且 advantage 均为 1 的 RL。

RL 中的 $A_t$ 并不是一个加权常数,$A_t$的“完整”写法是 $A_t(\pi)$,这是一个随着$\pi$的变化而发生变化的函数,更麻烦的是,这是一个黑箱函数,无法对 $A_t(\pi)$求导,而只能通过采样来评估,采样必然存在误差,进而导致训练不稳定。与之相对的,SFT 的 $A_t$ 实打实就是常数 1。
–> loss 形式一致但训练差异性较大,说明我们应该聚焦在两种训练方式的数据分布究竟有何差异。

那为什么 RL 训练那么不稳定?不如 SFT 稳定呢?

那如何来提高 RL 训练过程的稳定性呢?

  1. entropy collapse:训练的时候加不加 entropy loss,至今仍未达成共识

  2. CILP 裁剪,至少一半的强化工作都围绕 clip 做文章,这些工作分析的非常有道理,实际用起来却乏善可陈;

  3. Token Mask:对高熵 / 低熵 token 做特殊的逻辑,或鼓励某些 token 的学习,或阻止某些 token 的更新,也是重点雕花区域;

  4. Reward Shape:

    • 控制 reward 样本中 0 / 1 的分布在一个区间范围内;
    • 用 pass@K 代替 pass@1 作为优化目标;
    • 用 test case 的通过率作为 reward;
    • 长度惩罚;
    • 大致思路是想稳定奖励,减少奖励之间的方差;
  5. 训推一致性:目前比较热的话题,以 tis、icepop 最为火热,但可以说和算法没啥关系,全看 infra 功底,比如之前有一篇就是把 verl 里面的 vllm 推理部分和模型部分拎出来了,效果就好了很多;

    • 推荐阅读:Your Efficient RL Framework Secretly Brings You Off-Policy RL Training
    • 梯度上升计算时:$\theta \gets \theta + \mu \cdot \mathbb{E}_{\underbrace{a \sim{\pi}(\theta)}_{rollout}} [R(a)\cdot \underbrace{\nabla_\theta \log {\pi}(a, \theta)}_{\tiny{training}}].$
    • 但是这里 rollout 和 training 的模型都不一样「训练和采样的模型不一样」:$\theta \gets \theta + \mu \cdot \mathbb{E}_{a \sim \textcolor{red}{\pi_{\text{sampler}}}(\theta)} [R(a)\cdot \nabla_\theta \log \textcolor{blue}{\pi_{\text{learner}}}(a, \theta)].$
    • 导致了分布的不一样,那么怎么解决?
    • 把$\mathbb{E}_{a \sim \textcolor{red}{\pi_{\text{sampler}}}(\theta)} [R(a)\cdot \nabla_\theta \log \textcolor{blue}{\pi_{\text{learner}}}(a, \theta)]$转换为$\mathbb{E}_{a \sim \textcolor{red}{\pi_{\text{sampler}}}(\theta)} \Bigl[\frac{\textcolor{blue}{\pi_{\text{learner}}}(a, \theta)}{\textcolor{red}{\pi_{\text{sampler}}}(a, \theta)} \cdot R(a)\cdot \nabla_\theta \log \textcolor{blue}{\pi_{\text{learner}}}(a, \theta)\Bigr].$
    • $\mathbb{E}_{a \sim \textcolor{red}{\pi_{\text{sampler}}}(\theta)} \Bigl[\underbrace{\min\Bigl(\frac{\textcolor{blue}{\pi_{\text{learner}}}(a, \theta)}{\textcolor{red}{\pi_{\text{sampler}}}(a, \theta)}, C\Bigr)}_{\text{truncated importance ratio}} \cdot R(a) \cdot \nabla_\theta \log \textcolor{blue}{\pi_{\text{learner}}}(a, \theta)\Bigr]$ 增加一个这样的截断性重要性比例
  6. 为什么会熵增?按理说训练是一个确定性增加的过程应该熵减,但是模型确实在增,那就说明训练的过程中:高概率 token 常被当作负例,或者是低概率 token 被常当作正例

  7. 为什么会熵减过快?rollout 多样性差呗。是不是调整下 rollout temperature 和 rollout prompt 要比加一个 entropy loss 更合理些;

总而言之,各种技巧的本质其实都是为了帮助模型寻找一个适合它的训练数据的分布,因此,分析 rollout 数据分布的变化,优先级要始终领先于尝试引入某个稳定训练的技巧。 这些技巧在稳定某次训练的同时,也会掩盖训练崩溃的原因。但同时,若某个技巧确实有用,也可以反过来推哪种数据分布“有利于/有损于”模型的学习:例如, off_policy 和训推不一致会引起崩溃,是不是在间接说明“一个整体上与模型分布很接近,但却在个别 token 上和模型分布差异很大的样本”可能是一种不太适合模型的数据。

引入训练技巧必然会引起训练数据分布的变化,有些分布的变化是在我们预期之内的,有些分布的变化则是我们预期之外且不知情的。 CISPO 的作者就曾分享过:在 off_policy 的时候, 被 clip 掉的 token 是具有多重分布的,概率值与当前模型的分布不一致只是其所具有的一个分布,“概率低但影响 long cot涌现”则是这些 token 的另外一个分布。作为训练者,我们往往不能留意到所有分布的变化,从而总结出一些错误的结论。

那么,RL 的没有任何副作用的 trick 是什么呢?——洗数据,高质量的数据获取很重要

先说数据吧,今年大家普遍进入了 post train 深水区之后(从 math、gsm8k 进阶到 aime、imo),一个很严重的问题就是:训模型者看不懂数据了,没有办法通过肉眼看解题过程来判断数据质量了。而训模型者日常批量清洗数据的手段,往往都存在一个问题:“无法区分难题和错题”。

对于错题,不要以为错题的答案是离谱到一眼就能看出来的那种,事实上,错题往往是十分接近 ground_truth 且非常具有迷惑性的。这里举几个例子:

  1. 一张票 2 块钱,9 块钱能买几张票?我们以为错题的答案会是 356 张这种离谱的数字,其实是 4.5 张;
  2. 一个一元七次方程,错题的答案给了 3 个实数解,我们 review 的时候,反代入进去发现是对的,留下了这道题目。但在训练的时候,模型拿着 3 个实数解和 4 个复数解,高高兴兴的去找 reward_model 要奖励的时候,反手被打了 0 分,这对模型是多大的心理阴影呀。
  3. ……

目前的开源 RL 数据的质量真的是一言难尽。要么请专业的硕博理科生去标注,要么用启发式的规则去清洗,在不够干净的数据上只能得到错误的实验结论。

对于reward model,千万不要以为所谓的 rule_based reward model 真的就是靠 rule 来打分的,或者是靠 math_verify 这种规则库 。有很多情况下,靠 rule 几乎无解:

  1. 问题是盈利__%?标准答案是 96,而模型输出了“盈利96%”,模型活该拿 0 分吗?
  2. 标准答案是 3.14,模型输出了 $\pi$、圆周率、3.1415926,模型活该拿 0 分吗?
  3. ……

reward 要准,有研究者建议使用 generate reward(LLM),而且得是能力巨强的那种。这个模型需要读的懂题目要求的输出格式、 ground_truth 的等价变换,以及各种复杂的高阶公式。除了较强的知识能力外,模型还要具备很强的指令遵循能力,否则它容易自己亲自下场解题。

参考资料

[1] PPO for LLMs: A Guide for Normal People: Cameron R. Wolfe, Ph.D. https://cameronrwolfe.substack.com/p/ppo-llm

[2] RL 杂谈,作者: ybq, https://zhuanlan.zhihu.com/p/1966609550032475890

[3] Your Efficient RL Framework Secretly Brings You Off-Policy RL Training, fengyao et al. https://fengyao.notion.site/off-policy-rl

[4] 如何理解verl框架中那些Batch Size,https://zhuanlan.zhihu.com/p/1944151286984471847

[5] RL 学习笔记 #01 基本概念 https://hwcoder.top/RL-Note-1

[6] Putting RL back in RLHF:https://huggingface.co/blog/putting_rl_back_in_rlhf_with_rloo

[7] REINFORCE: Easy Online RL for LLMs: https://cameronrwolfe.substack.com/p/reinforce