Jasaxion一只大雄

风打,碎琉璃; 打不碎的,是那阳光漫地。The wind strikes, shattering the glazed glass; Yet unbroken remains the sunlight spilling across the earth.

[LLM 面试] Numpy手写 Top P 和 Top K 算法

2025-02-17


大模型根据给定的输入文本(比如一个开头或一个问题)生成输出文本(比如一个答案或一个结尾)。为了生成输出文本,模型会逐个预测每个 token,直到达到一个终止条件(如一个标点符号或一个最大长度)。在每一步,模型会给出一个概率分布,表示它对下一个单词的预测概率。

贪心策略

image-20250217152726514

Beam Search 集束搜索

image-20250217152904177

代码实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import numpy as np

def beam_search(logits: np.ndarray, num_beams: int, max_length: int) -> list:
    vocab_size = len(logits)
    beams = [([], 0.0)]  # (sequence, score)

    # 逐步扩展序列
    for _ in range(max_length):
        candidates = []
        
        # 扩展每个当前光束
        for sequence, cum_score in beams:
            # 第一步,或继续一个序列时
            current_logits = logits
            
            # 转换为概率分布
            # probs = np.exp(current_logits) / np.sum(np.exp(current_logits))
            probs = current_logits
            
            # 获取前num_beams个可能出现的下一个标记
            top_tokens = np.argsort(probs)[-num_beams:]
            
            # 通过扩展当前序列来创建候选对象。
            for token in top_tokens:
                new_sequence = sequence + [token]
                new_score = cum_score + np.log(probs[token])  # Log probability
                candidates.append((new_sequence, new_score))
        
        # 选择前num_beams个候选者
        candidates.sort(key=lambda x: x[1], reverse=True)  # Sort by score
        beams = candidates[:num_beams]
    return beams

if __name__ == "__main__":
    vocab_size = 5
    example_logits = np.array([0.35, 0.25, 0.20, 0.15, 0.05])
    tokens = ["猫", "狗", "兔", "鸟", "鱼"]

    print("原始概率分布:")
    for token, prob in zip(tokens, example_logits):
        print(f"{token}: {prob:.3f}")
    
    results = beam_search(logits=example_logits, num_beams=3, max_length=3)
    
    print("通过束搜索找到的顶级序列:")
    for sequence, score in results:
        words = [tokens[idx] for idx in sequence]
        print(f"序列: {' '.join(words)}, 分数: {np.exp(score):.4f}")

Top K 抽样

image-20250217152536401

代码实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import numpy as np

# 易错点:要将 top_k 以外的数值的概率置为 0,再归一化
def top_k(logits:np.ndarray, k:int = 3) -> np.ndarray:
    # 确保 k 不超过序列长度
    k = min(max(k, 1), len(logits))

    probs = logits.copy()
    top_k_idx = np.argsort(probs)[-k:]

    # 将非 top-k 的概率置为 0
    mask = np.zeros_like(probs, dtype=bool)
    mask[top_k_idx] = True
    probs[~mask] = 0

    probs = probs / np.sum(probs) #归一化调整概率
    return probs, top_k_idx

if __name__ == '__main__':
    vocab_size = 5
    candidate = np.array([0.35, 0.25, 0.20, 0.15, 0.05])
    tokens = ["猫", "狗", "兔", "鸟", "鱼"]
    
    print("原始概率分布:")
    for token, prob in zip(tokens, candidate):
        print(f"{token}: {prob:.3f}")
    
    k_list = [1, 2, 3]
    for k in k_list:
        top_k_probs, top_k_idx = top_k(candidate, k)
        print(f"\nTop-{k} 采样后的概率分布:")
        for idx, prob in enumerate(top_k_probs):
            if prob > 0:  # 只打印非零概率
                print(f"{tokens[idx]}: {prob:.3f}")
        
        print(f"保留的索引: {top_k_idx}")
        print(f"概率和: {np.sum(top_k_probs):.3f}")  # 验证概率和为 1

Top P 抽样(核采样 Nucleus Sampling)

image-20250217153235974

代码实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import numpy as np

def top_p(logits: np.ndarray, p: float) -> np.ndarray:
    p = min(max(p, 1e-8), 1) # 限制 p 的范围

    # # 1.softmax将logits转换为概率分布
    # probs = np.exp(logits) / np.sum(np.exp(logits))
    probs = logits
    # 2. 按概率从大到小排序
    sorted_indices = np.argsort(probs)[::-1]
    sorted_probs = probs[sorted_indices]
    cumsum_probs = np.cumsum(sorted_probs)

    # 3. 找到累积概率刚好超过p的位置
    cutoff_idx = np.argmax(cumsum_probs > p)

    # 4. 将概率分布中,小于该位置的概率置0
    mask = np.zeros_like(probs)
    mask[sorted_indices[:cutoff_idx]] = 1

    # 5. 应用mask并重新归一化
    probs = probs * mask
    probs = probs / np.sum(probs)
    
    return probs, sorted_indices[:cutoff_idx]

if __name__ == "__main__":
    candidate = np.array([0.35, 0.25, 0.20, 0.15, 0.05])
    tokens = ["猫", "狗", "兔", "鸟", "鱼"]

    print("原始概率分布:")
    for token, prob in zip(tokens, candidate):
        print(f"{token}: {prob:.3f}")

    p_list = [0.35, 0.65, 0.9]
    
    top_p(candidate, 0.8)

    for p in p_list:
        top_p_probs, top_index = top_p(candidate, p)
        print(f"\nTop-{p} 采样后的概率分布:")
        for idx, prob in zip(top_index, top_p_probs):
            print(f"{tokens[idx]}: {prob:.3f}")

温度 Temperature

工作原理:

代码实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import numpy as np

def apply_temperature(logits: np.ndarray, temperature: float) -> np.ndarray:
    temperature = max(temperature, 1e-8) # 除零错误
    logits = logits / temperature # 应用温度常数
    
    # 优化:Softmax 将 logits 转化为概率分布
    exp_logits = np.exp(logits - np.max(logits))
    probs = exp_logits / np.sum(exp_logits)
    return probs

# 采样计算
def sampling_with_temperature(logits: np.ndarray, temperature: float, num_samples: int) -> np.ndarray:
    # 应用温度调节
    probs = apply_temperature(logits, temperature)
    # 从概率分布中采样,采样 1000 次,其中样本为token 个数,概率为probs
    samples = np.random.choice(len(probs), num_samples, p=probs)
    return samples

if __name__ == '__main__':
    # 创建一个示例概率分布
    vocab_size = 5
    original_probs = np.array([0.35, 0.25, 0.20, 0.15, 0.05])
    vocab = ["猫", "狗", "兔", "鸟", "鱼"]

    print("原始概率分布:")
    for token, prob in zip(vocab, original_probs):
        print(f"{token}: {prob:.3f}")

    temperature_list = [0.5, 1.0, 1.5]
    num_samples = 1000

    for temperature in temperature_list:
        logits = np.log(original_probs)
        #模拟抽样的过程
        samples = sampling_with_temperature(logits, temperature, num_samples)

        unique, counts = np.unique(samples, return_counts=True) #统计采样结果
        sampled_probs = counts / num_samples #实际的采样概率

        print(f"\n温度: {temperature}")
        for token, prob in zip(vocab, sampled_probs):
            print(f"{token}: {prob:.3f}")

结果「随着温度变大,低概率的词更有可能被选中」:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
原始概率分布:
猫: 0.350
狗: 0.250
兔: 0.200
鸟: 0.150
鱼: 0.050

温度: 0.5
猫: 0.479
狗: 0.257
兔: 0.169
鸟: 0.086
鱼: 0.009

温度: 1.0
猫: 0.340
狗: 0.270
兔: 0.204
鸟: 0.136
鱼: 0.050

温度: 1.5
猫: 0.298
狗: 0.248
兔: 0.211
鸟: 0.166
鱼: 0.077

联合采样(结合 Top K + Top P + Temperature)

联合采样就像是做菜时调味一样,将多种采样方法的"调料"组合在一起,取长补短。它主要结合了三种方法:Temperature(温度)、Top K 和 Top P。

想象一下这个过程:

  1. 首先用 Temperature(温度)调节整体的"口味":
    • 就像调节火候,温度低时会让高概率的词更容易被选中
    • 温度高时则会让各个词的机会更平均
  2. 然后用 Top K 来"粗筛":
    • 相当于先选出 K 个最可能的候选词
    • 就像先挑出 K 个最好的原材料
  3. 最后用 Top P 来"细筛":
    • 在剩下的候选词中,只保留累积概率达到 P 的那些词
    • 就像在好食材中进一步筛选,确保质量

举个具体的例子: 假设模型在预测下一个词,有这些选项:

联合采样的过程:

  1. 先用温度调节这些概率(比如温度 0.7 会让高概率更高)

  2. 然后用 Top K=3 选出前三名:猫、狗、兔

  3. 最后用 Top P=0.8 确认这三个的累积概率(0.35+0.25+0.20=0.8)符合要求

最终的采样过程:

  1. 经过 Temperature、Top K、Top P 的层层过滤后,我们得到的不是一个简单的 token 列表,而是一个经过调整的概率分布。这个分布中:

    • 一些 token 的概率被设为 0(被过滤掉的)
    • 剩下的 token 保持它们的相对概率关系(但概率值会被重新归一化)
  2. 最终的采样是基于这个调整后的概率分布进行的随机采样,不是简单地从列表中随机选一个。

举个例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
原始分布
"猫": 0.35
"狗": 0.25
"兔": 0.20
"鸟": 0.15
"鱼": 0.05

经过联合采样处理后可能变成
"猫": 0.44  # (0.35 / 0.8)
"狗": 0.31  # (0.25 / 0.8)
"兔": 0.25  # (0.20 / 0.8)
"鸟": 0     # 被过滤掉
"鱼": 0     # 被过滤掉

# 最后用 np.random.choice 这样的函数基于概率采样一个token
chosen_token = np.random.choice(tokens, p=adjusted_probs)

这样,概率大的 token 仍然更可能被选中,但也给其他合理的 token 一定机会,保证了生成文本的多样性和合理性。