microgpt.py Andrej Karpathy (AK)源码详解GPT训练和推理艺术过程(附中文注释)

microgpt.py Andrej Karpathy (AK)源码详解GPT训练和推理艺术过程(附中文注释)

Andrej Karpathy (AK) 发布的 microgpt.py。它的伟大之处在于,除了 Python 内置的数学库,它不依赖于任何深度学习框架(如 PyTorch 或 TensorFlow),却完整实现了 GPT 的核心组件:自动求导、Transformer 架构、Adam 优化器以及推理逻辑。

源码:https://gist.github.com/karpathy/8627fe009c40f57531cb18360106ce95

仅用约一百多行逻辑代码就完成了一个 GPT 模型的全生命周期:从原始数据输入,到神经网络的梯度更新,再到最后的文本生成。👍

Karpathy 的这段代码本质上是将 深度学习的物理实现 进行了扁平化。

  • 解决了“神经网络如何记住错误”(自动求导)。
  • 解决了“模型如何处理序列”(Transformer 结构)。
  • 解决了“如何最高效地修正错误”(Adam 优化)。

核心代码中最精华的三个算法模块

用极简代码还原复杂数学的过程解构:

1. 自动求导引擎 (Autograd)

这是模型能够“学习”的根基。它通过 Value 类将普通的数学运算包装成一个计算图

  • 链式法则的闭包实现:每个运算符(如 __mul__)都定义了一个 _backward 函数。它捕捉当前运算的梯度,并根据导数公式(如乘法法则 )将其分配给父节点。
  • 拓扑排序backward() 函数通过深度优先遍历确保梯度从损失函数(Loss)开始,严格按照运算逆序回溯,避免了顺序错误。

2. 多头自注意力机制 (Self-Attention)

gpt 函数内部,Karpathy 用最原始的列表推导式实现了这个 Transformer 的核心:

1
2
3
4
5
# 核心逻辑简述
attn_logits = [sum(q_h[j] * k_h[t][j] for j in range(head_dim)) / head_dim**0.5 for t in range(len(k_h))]
attn_weights = softmax(attn_logits)
head_out = [sum(attn_weights[t] * v_h[t][j] for t in range(len(v_h))) for j in range(head_dim)]

  • 点积注意力:通过 sum(q * k) 计算当前字符与历史所有字符的相关性。
  • 缩放因子:除以 head_dim**0.5(即 )来防止梯度消失。
  • KV Cache 的雏形:通过 keys[li].append(k) 手动维护历史记录,这正是 GPT 实现高效自回归生成的关键。

3. Adam 优化器

位于训练循环的最底部,这是目前大模型训练的标准配置。

  • 一阶与二阶动量m[i] 记录梯度的方向(惯性),v[i] 记录梯度的波动(环境自适应)。
  • **偏差修正 (Bias Correction)**:m_hatv_hat 的计算消除了训练初期的冷启动偏差。
  • 参数更新:仅用一行 p.data -= lr_t * m_hat / (v_hat ** 0.5 + eps_adam) 就完成了复杂的参数调整。

microgpt.py 的意义

microgpt.py代码之所以被称为“原子级”,是因为它把 PyTorch 里的 loss.backward()optimizer.step() 这种“黑盒”彻底拆解成了基础的加减乘除。

microgpt.py (附中文注释)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
"""
这是以最原子(atomic)的方式在纯 Python 中实现 GPT 的训练和推理。
该文件包含完整的算法,其余所有工业级库(如 PyTorch)只是为了提高效率。
"""

import os # 用于处理文件路径
import math # 提供 log, exp 等基础数学运算
import random # 提供随机数初始化和数据打乱

# 固定随机种子,确保实验可重复
random.seed(42)

# --- 1. 数据准备 (Dataset) ---
# 下载并读取 names.txt,这是一个包含数千个名字的数据集
if not os.path.exists('input.txt'):
import urllib.request
names_url = 'https://raw.githubusercontent.com/karpathy/makemore/refs/heads/master/names.txt'
urllib.request.urlretrieve(names_url, 'input.txt')

# 将数据集处理成字符串列表,每个名字是一个 document
docs = [l.strip() for l in open('input.txt').read().strip().split('\n') if l.strip()]
random.shuffle(docs)
print(f"num docs: {len(docs)}")

# --- 2. 分词器 (Tokenizer) ---
# 构建字符级词表。<BOS> (Beginning of String) 是起始占位符
chars = ['<BOS>'] + sorted(set(''.join(docs)))
vocab_size = len(chars)
# 映射表:字符 <-> 索引 (stoi: string to index, itos: index to string)
stoi = { ch:i for i, ch in enumerate(chars) }
itos = { i:ch for i, ch in enumerate(chars) }
BOS = stoi['<BOS>']
print(f"vocab size: {vocab_size}")

# --- 3. 自动求导引擎 (Autograd) ---
# 这是微型版的 Micrograd。它追踪每一个标量的运算,用于后续通过链式法则计算梯度。
class Value:
""" 存储单个标量及其梯度 """

def __init__(self, data, _children=(), _op=''):
self.data = data
self.grad = 0 # 存储损失函数对该变量的偏导数
self._backward = lambda: None # 反向传播的闭包函数
self._prev = set(_children) # 记录生成该节点的父节点(用于回溯)
self._op = _op

def __add__(self, other):
other = other if isinstance(other, Value) else Value(other)
out = Value(self.data + other.data, (self, other), '+')
def _backward():
self.grad += out.grad # 加法门:梯度直接传递给两个操作数
other.grad += out.grad
out._backward = _backward
return out

def __mul__(self, other):
other = other if isinstance(other, Value) else Value(other)
out = Value(self.data * other.data, (self, other), '*')
def _backward():
# 乘法门:每个操作数的梯度 = 另一个操作数的值 * 输出梯度
self.grad += other.data * out.grad
other.grad += self.data * out.grad
out._backward = _backward
return out

def __pow__(self, other):
assert isinstance(other, (int, float))
out = Value(self.data**other, (self,), f'**{other}')
def _backward():
# 幂函数导数:d(x^n)/dx = n * x^(n-1)
self.grad += (other * self.data**(other-1)) * out.grad
out._backward = _backward
return out

def log(self):
out = Value(math.log(self.data), (self,), 'log')
def _backward():
# log(x) 导数:1/x
self.grad += (1 / self.data) * out.grad
out._backward = _backward
return out

def exp(self):
out = Value(math.exp(self.data), (self,), 'exp')
def _backward():
# exp(x) 导数:exp(x)
self.grad += out.data * out.grad
out._backward = _backward
return out

def relu(self):
out = Value(0 if self.data < 0 else self.data, (self,), 'ReLU')
def _backward():
# ReLU 导数:大于0为1,小于0为0
self.grad += (out.data > 0) * out.grad
out._backward = _backward
return out

def backward(self):
# 拓扑排序:确保计算梯度时,先处理子节点,再处理父节点
topo = []
visited = set()
def build_topo(v):
if v not in visited:
visited.add(v)
for child in v._prev:
build_topo(child)
topo.append(v)
build_topo(self)
self.grad = 1 # 损失函数自身的梯度是 1
for v in reversed(topo):
v._backward()

# 运算符重载,让 Value 像普通浮点数一样参与运算
def __neg__(self): return self * -1
def __radd__(self, other): return self + other
def __sub__(self, other): return self + (-other)
def __rsub__(self, other): return other + (-self)
def __rmul__(self, other): return self * other
def __truediv__(self, other): return self * other**-1
def __rtruediv__(self, other): return other * self**-1
def __repr__(self): return f"Value(data={self.data}, grad={self.grad})"

# --- 4. 模型超参数与参数初始化 ---
n_embd = 16 # 每个字符的向量维度
n_head = 4 # 注意力头数
n_layer = 1 # 变压器层数
block_size = 8 # 上下文窗口最大长度
head_dim = n_embd // n_head # 每个头的维度

# 初始化工具:生成包含随机 Value 的矩阵
matrix = lambda nout, nin, std=0.02: [[Value(random.gauss(0, std)) for _ in range(nin)] for _ in range(nout)]

# 状态字典:存储模型权重
state_dict = {
'wte': matrix(vocab_size, n_embd), # 词嵌入矩阵 (Token Embedding)
'wpe': matrix(block_size, n_embd), # 位置嵌入矩阵 (Positional Embedding)
'lm_head': matrix(vocab_size, n_embd) # 输出层 (Language Model Head)
}
for i in range(n_layer):
state_dict[f'layer{i}.attn_wq'] = matrix(n_embd, n_embd) # Q权重
state_dict[f'layer{i}.attn_wk'] = matrix(n_embd, n_embd) # K权重
state_dict[f'layer{i}.attn_wv'] = matrix(n_embd, n_embd) # V权重
state_dict[f'layer{i}.attn_wo'] = matrix(n_embd, n_embd, std=0) # 注意力输出投影
state_dict[f'layer{i}.mlp_fc1'] = matrix(4 * n_embd, n_embd) # FFN 第一层 (4x 扩张)
state_dict[f'layer{i}.mlp_fc2'] = matrix(n_embd, 4 * n_embd, std=0) # FFN 第二层

# 将所有 Value 扁平化存入列表,方便优化器处理
params = [p for mat in state_dict.values() for row in mat for p in row]
print(f"num params: {len(params)}")

# --- 5. GPT 架构实现 ---

def linear(x, w):
""" 线性层:计算矩阵相乘 w * x """
return [sum(wi * xi for wi, xi in zip(wo, x)) for wo in w]

def softmax(logits):
""" 归一化操作:将分数转化为概率分布 """
max_val = max(val.data for val in logits) # 减去最大值防止溢出
exps = [(val - max_val).exp() for val in logits]
total = sum(exps)
return [e / total for e in exps]

def rmsnorm(x):
""" 均方根归一化:用于稳定网络训练 """
ms = sum(xi * xi for xi in x) / len(x)
scale = (ms + 1e-5) ** -0.5
return [xi * scale for xi in x]

def gpt(token_id, pos_id, keys, values):
""" GPT 单步前向传播逻辑 """
tok_emb = state_dict['wte'][token_id] # 获取字符向量
pos_emb = state_dict['wpe'][pos_id] # 获取位置向量
x = [t + p for t, p in zip(tok_emb, pos_emb)] # 融合语义和位置
x = rmsnorm(x)

for li in range(n_layer):
# 1) 多头自注意力机制 (Multi-head Attention)
x_residual = x # 残差连接
x = rmsnorm(x)
q = linear(x, state_dict[f'layer{li}.attn_wq'])
k = linear(x, state_dict[f'layer{li}.attn_wk'])
v = linear(x, state_dict[f'layer{li}.attn_wv'])
# 缓存 K 和 V 向量(KV Cache 逻辑,用于自回归生成)
keys[li].append(k)
values[li].append(v)

x_attn = []
for h in range(n_head):
hs = h * head_dim
q_h = q[hs:hs+head_dim]
k_h = [ki[hs:hs+head_dim] for ki in keys[li]]
v_h = [vi[hs:hs+head_dim] for vi in values[li]]
# 计算注意力分数: Dot(Q, K) / sqrt(d)
attn_logits = [sum(q_h[j] * k_h[t][j] for j in range(head_dim)) / head_dim**0.5 for t in range(len(k_h))]
attn_weights = softmax(attn_logits)
# 计算加权后的 V: Sum(Weight * V)
head_out = [sum(attn_weights[t] * v_h[t][j] for t in range(len(v_h))) for j in range(head_dim)]
x_attn.extend(head_out)

x = linear(x_attn, state_dict[f'layer{li}.attn_wo'])
x = [a + b for a, b in zip(x, x_residual)] # 残差相加

# 2) 前馈网络 (MLP Block)
x_residual = x
x = rmsnorm(x)
x = linear(x, state_dict[f'layer{li}.mlp_fc1'])
x = [xi.relu() ** 2 for xi in x] # ReLU^2 激活函数
x = linear(x, state_dict[f'layer{li}.mlp_fc2'])
x = [a + b for a, b in zip(x, x_residual)]

# 输出层投影到词表大小的 logits
logits = linear(x, state_dict['lm_head'])
return logits

# --- 6. 训练循环 (Training) ---

# Adam 优化器参数
learning_rate, beta1, beta2, eps_adam = 1e-2, 0.9, 0.95, 1e-8
m = [0.0] * len(params) # 一阶动量缓存
v = [0.0] * len(params) # 二阶动量缓存

num_steps = 500
for step in range(num_steps):

# 随机取一个名字作为训练样本,在首尾添加 <BOS>
doc = docs[step % len(docs)]
tokens = [BOS] + [stoi[ch] for ch in doc] + [BOS]
n = min(block_size, len(tokens) - 1)

# 前向传播:计算 Loss
keys, values = [[] for _ in range(n_layer)], [[] for _ in range(n_layer)]
losses = []
for pos_id in range(n):
token_id, target_id = tokens[pos_id], tokens[pos_id + 1]
logits = gpt(token_id, pos_id, keys, values)
probs = softmax(logits)
# 交叉熵损失:-log(预测正确 token 的概率)
loss_t = -probs[target_id].log()
losses.append(loss_t)
loss = (1 / n) * sum(losses) # 计算整句的平均损失

# 反向传播:计算所有参数的 grad
loss.backward()

# Adam 优化器参数更新
lr_t = learning_rate * (1 - step / num_steps) # 学习率衰减
for i, p in enumerate(params):
m[i] = beta1 * m[i] + (1 - beta1) * p.grad
v[i] = beta2 * v[i] + (1 - beta2) * p.grad ** 2
m_hat = m[i] / (1 - beta1 ** (step + 1))
v_hat = v[i] / (1 - beta2 ** (step + 1))
p.data -= lr_t * m_hat / (v_hat ** 0.5 + eps_adam)
p.grad = 0 # !!!非常重要:更新后将梯度清零,否则会累加到下一次

print(f"step {step+1:4d} / {num_steps:4d} | loss {loss.data:.4f}")

# --- 7. 推理生成 (Inference) ---
temperature = 0.6 # 控制采样的随机性,值越低生成越保守
print("\n--- 推理结果 ---")
for sample_idx in range(20):
keys, values = [[] for _ in range(n_layer)], [[] for _ in range(n_layer)]
token_id = BOS
print(f"sample {sample_idx+1}: ", end="")
for pos_id in range(block_size):
logits = gpt(token_id, pos_id, keys, values)
# 应用温度系数,重新计算概率分布
probs = softmax([l / temperature for l in logits])
# 根据概率随机采样下一个字符
token_id = random.choices(range(vocab_size), weights=[p.data for p in probs])[0]
if token_id == BOS: # 如果遇到结束符则停止该句生成
break
print(itos[token_id], end="")
print()

如 Andrej Karpathy (AK) 所言:AI 既是天才也是智障。🤔

microgpt.py Andrej Karpathy (AK)源码详解GPT训练和推理艺术过程(附中文注释)

https://www.wdft.com/90a300cf.html

Author

Jaco Liu

Posted on

2025-02-12

Updated on

2026-02-12

Licensed under