0%

大模型推理中的三种并行:数据并行、张量并行与流水线并行

这篇文章我们学习模型推理中的几个基础知识,包括数据并行、张量并行与流水线并行,并通过实际的 Demo 来加深理解。

1. 为什么需要并行?

当大语言模型的参数量达到数十亿甚至数千亿时,单张 GPU 的显存和算力已远远不够。我们需要将计算分布到多张 GPU 上,有三种核心策略:

  • 数据并行(Data Parallelism, DP):每组设备持有完整的模型副本,但处理不同的用户请求,通过增加副本数线性提升系统的总吞吐量(Throughput)
  • 张量并行(Tensor Parallelism, TP):把同一层的权重切分到多张 GPU,多卡利用高速互联(如 NVLink)并行计算,突破单卡显存带宽与容量限制,极致降低单个 Token 的生成延迟(Latency)
  • 流水线并行(Pipeline Parallelism, PP):把不同层纵向划分到不同的 GPU/机器上,数据像工厂流水线一样依次流过各 Stage,突破单台机器的显存极限以容纳超规模模型

它们解决的是完全不同的问题:

数据并行 (DP) 张量并行 (TP) 流水线并行 (PP)
解决什么问题 吞吐量不够 单卡显存带宽/容量不够 单机显存不够
切分什么 请求(数据) 权重 模型层
每卡存什么 完整模型 + 不同请求 部分权重 + 完整数据 部分层 + 完整数据
推理时通信 零通信 每层 All-Reduce stage 间 send/recv
对延迟的影响 不降低单条延迟 降低单 token 延迟 不降低(甚至增加)

一个更直观的类比——假设要翻译一本 1000 页的书,人手不够需要多人协作:

  • DP = 多人各拿一本副本,每人翻不同章节:每人做同样的翻译工作,但处理不同的页,总速度翻倍。前提:每人都能看懂整本书
  • TP = 多人合翻每一页:一个人翻一页的速度受限于阅读速度(带宽瓶颈),于是对每一页,每人都读不同段落再拼起来,相当于多人并行阅读,每一页都更快
  • PP = 多人接力:每人只翻固定范围的页(第 1-200 页、第 201-400 页…),翻完一页传给下一个人。每人只需掌握自己那部分,但前后有等待

数据并行(Data Parallelism)

核心思想

数据并行是最直觉的并行方式:每张 GPU 上跑一份完整的模型,但处理不同的请求。各卡完全独立,零通信,吞吐量线性增长。

1
2
GPU 0: 完整模型 + 请求 A, B, C   → 结果 A, B, C    (独立,零通信)
GPU 1: 完整模型 + 请求 D, E, F → 结果 D, E, F (独立,零通信)

推理场景中数据并行极其简单——不需要任何卡间通信,每张卡就是一个独立的推理服务,卡越多同时处理的请求越多。

注:训练场景下数据并行需要在反向传播后通过 All-Reduce 同步梯度,保证各卡参数一致,但本文聚焦推理,不再展开。

数据并行 Demo

接下来我们通过一个简单示例来演示数据并行的基础原理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
"""
Demo 5: 数据并行(Data Parallelism)基础原理

核心思想:每张 GPU 持有模型的完整副本,但处理不同的用户请求。
推理时各卡完全独立,无需任何通信,吞吐量线性增长。

运行方式:
CUDA_VISIBLE_DEVICES=6,7 torchrun --nproc_per_node=2 5_data_parallel.py
"""

import os
import time
import torch
import torch.nn as nn
import torch.distributed as dist


def main():
dist.init_process_group(backend="nccl")
rank = dist.get_rank()
world_size = dist.get_world_size()
local_gpu = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_gpu)
device = torch.cuda.current_device()

hidden_dim = 256
batch_size = 16
num_iters = 100
total_batch = batch_size * world_size

model = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1),
).to(device)

torch.manual_seed(42)
for p in model.parameters():
nn.init.normal_(p, std=0.02)

# warmup
with torch.no_grad():
for _ in range(10):
_ = model(torch.randn(batch_size, hidden_dim, device=device))
torch.cuda.synchronize()

# --- 单卡基线:仅 rank 0 处理全部 total_batch 条请求 ---
if rank == 0:
X_single = torch.randn(total_batch, hidden_dim, device=device)
t0 = time.time()
with torch.no_grad():
for _ in range(num_iters):
_ = model(X_single)
torch.cuda.synchronize()
single_time = (time.time() - t0) / num_iters * 1000
else:
single_time = 0.0

# --- 数据并行:每卡各处理 batch_size 条,总量相同 ---
torch.manual_seed(100 + rank)
X_dp = torch.randn(batch_size, hidden_dim, device=device)
dist.barrier()
t0 = time.time()
with torch.no_grad():
for _ in range(num_iters):
_ = model(X_dp)
torch.cuda.synchronize()
dp_time = (time.time() - t0) / num_iters * 1000

single_tensor = torch.tensor([single_time], device=device)
dp_tensor = torch.tensor([dp_time], device=device)
dist.all_reduce(single_tensor, op=dist.ReduceOp.MAX)
dist.all_reduce(dp_tensor, op=dist.ReduceOp.MAX)

if rank == 0:
single_throughput = total_batch / (single_tensor.item() / 1000)
dp_throughput = total_batch / (dp_tensor.item() / 1000)
print(f"=== 推理数据并行 vs 单卡 ===")
print(f"模型: {hidden_dim}-dim, {sum(p.numel() for p in model.parameters())} 参数")
print(f"总请求数: {total_batch}, 迭代次数: {num_iters}")
print()
print(f"单卡基线:")
print(f" 1 卡处理 {total_batch} 条请求, 耗时 {single_tensor.item():.2f} ms, 吞吐 {single_throughput:.0f} samples/s")
print()
print(f"数据并行 ({world_size} 卡):")
print(f" 每卡处理 {batch_size} 条请求 (共 {total_batch} 条)")
print(f" 耗时 {dp_tensor.item():.2f} ms, 吞吐 {dp_throughput:.0f} samples/s")
print()
print(f"吞吐提升: {dp_throughput / single_throughput:.2f}x (接近线性 {world_size}x)")
print(f"前提: 单卡必须放得下完整模型")

dist.destroy_process_group()


if __name__ == "__main__":
main()

测试结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# CUDA_VISIBLE_DEVICES=6,7 torchrun --nproc_per_node=2 5_data_parallel.py
o
=== 推理数据并行 vs 单卡 ===
模型: 256-dim, 197633 参数
总请求数: 32, 迭代次数: 100

单卡基线:
1 卡处理 32 条请求, 耗时 0.41 ms, 吞吐 78287 samples/s

数据并行 (2 卡):
每卡处理 16 条请求 (共 32 条)
耗时 0.16 ms, 吞吐 206222 samples/s

吞吐提升: 2.63x (接近线性 2x)
前提: 单卡必须放得下完整模型

demo 基本原理

torchrun 是 PyTorch 提供的分布式训练启动工具,用于替代旧版的 torch.distributed.launch。核心功能:

  • 一键启动多进程分布式训练(单机多卡 / 多机多卡)
  • 自动设置环境变量(LOCAL_RANK、WORLD_RANK、WORLD_SIZE、MASTER_ADDR 等)
  • 处理进程间通信(NCCL/GLOO backend)

torchrun 的本质是替代手动执行多次 python script.py,它确保所有进程同时启动、环境变量一致、并且自动管理进程组初始化所需的通信地址。

上述启动命令中:

  • CUDA_VISIBLE_DEVICES=6,7:告诉 PyTorch 只能看到第 6、7 号 GPU,这样进程内的 GPU 编号为 0 和 1,避免意外占用其他卡
  • torchrun:PyTorch 提供的多进程启动器,它会同时拉起 --nproc_per_node 个进程,每个进程运行相同的 Python 脚本,并自动设置以下环境变量:
    • RANK:全局进程编号(0, 1, …)
    • LOCAL_RANK:本机内的进程编号,对应使用的 GPU 编号(0 或 1)
    • WORLD_SIZE:总进程数(这里为 2)
    • MASTER_ADDR / MASTER_PORT:用于进程间通信的主地址和端口
  • nproc_per_node:每个节点的进程数(通常等于 GPU 数),每个进程分配一个 GPU(通过 LOCAL_RANK 区分)

torch.distributed(简称 dist)是 PyTorch 的分布式通信库,提供多进程间的协调与数据交换原语:

API 作用 本 Demo 中的用途
dist.init_process_group(backend="nccl") 初始化进程组,建立所有进程间的通信通道 启动时调用,nccl 后端针对 GPU 间高速通信(NVLink/PCIe)优化
dist.get_rank() 获取当前进程的全局编号 区分不同进程的角色(rank 0 负责打印)
dist.get_world_size() 获取总进程数 计算总 batch = batch_size × world_size
dist.barrier() 同步屏障,所有进程都到达后才继续 确保计时前所有进程都完成 warmup
dist.all_reduce(tensor, op) 对所有进程的 tensor 做规约操作(SUM/MAX 等),结果写回每个进程 用 MAX 取所有卡中最慢的耗时,作为统一的计时基准
dist.destroy_process_group() 销毁进程组,释放通信资源 程序结束前清理

NCCL 后端:NVIDIA Collective Communications Library,专门为 GPU 集合通信优化。它自动选择最快的通信路径(NVLink > PCIe > 网络),在单机多卡场景下几乎零开销。

  • rank:全局进程编号,0 到 world_size-1,用于标识"我是哪个进程"
  • local_rank:单机内的 GPU 编号,用于绑定进程到具体的 GPU

每个进程必须通过 set_device 绑定到不同的 GPU,否则多个进程会竞争同一张卡的显存和算力。例如如下例子中只提供一个卡,但进程数为 2,则运行错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# CUDA_VISIBLE_DEVICES=5 torchrun --nproc_per_node=2 5_data_parallel.py

[rank1]: Traceback (most recent call last):
[rank1]: File "/root/fuchencong/python_test/ai_demo/5_data_parallel.py", line 100, in <module>
[rank1]: main()
[rank1]: File "/root/fuchencong/python_test/ai_demo/5_data_parallel.py", line 23, in main
[rank1]: torch.cuda.set_device(local_gpu)
[rank1]: File "/root/.pyenv/versions/3.12.12/lib/python3.12/site-packages/torch/cuda/__init__.py", line 584, in set_device
[rank1]: torch._C._cuda_setDevice(device)
[rank1]: torch.AcceleratorError: CUDA error: invalid device ordinal
[rank1]: GPU device may be out of range, do you have enough GPUs?
[rank1]: CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
[rank1]: For debugging consider passing CUDA_LAUNCH_BLOCKING=1
[rank1]: Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.

接下来我们再详细分析这个 demo 中各段代码中的主要功能:

1. 初始化与 GPU 绑定

1
2
3
4
5
6
dist.init_process_group(backend="nccl")
rank = dist.get_rank()
world_size = dist.get_world_size()
local_gpu = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_gpu)
device = torch.cuda.current_device()
  • 每个进程执行相同代码,但通过 rank 和 local_rank 走不同的分支。init_process_group 让所有进程互相发现并建立通信通道

2. 模型构建——每卡一份完整副本

1
2
3
4
5
6
7
8
9
10
11
12
13
model = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1),
).to(device)

torch.manual_seed(42)
for p in model.parameters():
nn.init.normal_(p, std=0.02)
  • 所有进程使用相同的随机种子 42 初始化权重,确保每张卡上的模型参数完全一致。这就是 数据并行的前提——每张卡持有相同的模型副本。如果参数不一致,各卡的推理结果就没有可比性

3. Warmup——预热 CUDA Kernel

1
2
3
4
with torch.no_grad():
for _ in range(10):
_ = model(torch.randn(batch_size, hidden_dim, device=device))
torch.cuda.synchronize()

CUDA 的 kernel 编译和缓存是惰性的:第一次执行时会触发 JIT 编译,耗时远高于后续执行。warmup 的目的就是提前触发编译,确保正式计时时不包含编译开销。torch.cuda.synchronize() 确保所有 GPU 操作完成后再继续。

4. 单卡基线——1 张卡处理全部请求

1
2
3
4
5
6
7
8
9
10
if rank == 0:
X_single = torch.randn(total_batch, hidden_dim, device=device)
t0 = time.time()
with torch.no_grad():
for _ in range(num_iters):
_ = model(X_single)
torch.cuda.synchronize()
single_time = (time.time() - t0) / num_iters * 1000
else:
single_time = 0.0
  • 只有 rank 0 执行单卡基线测试:用 1 张卡处理 total_batch = batch_size × world_size = 32 条请求。torch.cuda.synchronize() 确保计时覆盖完整的 GPU 执行时间:
  • GPU 操作是异步的,time.time() 只度量 CPU 侧发起调用的时间,必须 synchronize 等待 GPU 完成才能得到真实耗时
  • num_iters=100 次迭代取平均,消除单次执行的波动

5. 数据并行——每卡处理部分请求

1
2
3
4
5
6
7
8
9
torch.manual_seed(100 + rank)
X_dp = torch.randn(batch_size, hidden_dim, device=device)
dist.barrier()
t0 = time.time()
with torch.no_grad():
for _ in range(num_iters):
_ = model(X_dp)
torch.cuda.synchronize()
dp_time = (time.time() - t0) / num_iters * 1000

与单卡基线的关键区别:

  • torch.manual_seed(100 + rank):不同 rank 使用不同种子,模拟各卡处理不同的用户请求
  • 每卡只处理 batch_size = 16 条(而非 total_batch = 32),2 卡合计处理 32 条,与单卡基线的总请求量相同
  • dist.barrier():同步所有进程,确保计时起点一致
  • 推理过程本身零通信——每张卡独立前向传播,不需要与其他卡交换任何数据

6. 汇总结果——all_reduce 仅用于统计

1
2
3
4
single_tensor = torch.tensor([single_time], device=device)
dp_tensor = torch.tensor([dp_time], device=device)
dist.all_reduce(single_tensor, op=dist.ReduceOp.MAX)
dist.all_reduce(dp_tensor, op=dist.ReduceOp.MAX)

all_reduce(MAX) 取所有卡中的最大耗时。这是性能统计的手段,不是推理计算的一部分。实际推理服务中不需要任何 all_reduce。用 MAX 而非 SUM 的原因:数据并行中各卡并行执行,总耗时取决于最慢的那张卡。

7. 吞吐量计算

1
2
single_throughput = total_batch / (single_tensor.item() / 1000)
dp_throughput = total_batch / (dp_tensor.item() / 1000)

吞吐量 = 总请求数 / 总耗时。两组实验处理的总请求数相同(都是 32 条),但数据并行的耗时更短,因此吞吐量更高。

数据并行的限制

  • 数据并行有一个前提条件:单卡必须能放下完整的模型。如果一个 175B 参数的模型需要 350GB 显存,而单张 GPU 只有 80GB,数据并行根本无法使用——模型都放不进去,更谈不上处理请求。
  • 而且推理场景下,数据并行只能提升吞吐量(QPS),不能降低单条请求的延迟——每条请求仍然只由一张卡处理,耗时不变
  • 这就是张量并行和流水线并行存在的意义:它们不是替代数据并行,而是解决数据并行解决不了的问题

张量并行(Tensor Parallelism)

核心思想

大模型自回归推理的性能瓶颈通常不是算力不足,而是显存带宽不足——每生成一个 token,都需要从显存加载全部权重,而计算只用了很少的周期。张量并行把每一层的权重都切到多张 GPU 上,每卡只需加载和计算自己那份分片,通过 NVLink 高速互联同步中间结果,从而突破单卡带宽和容量限制,降低单 token 延迟。

对于矩阵乘法 Y = X @ W,权重 W 可以按列切分:

1
2
3
4
5
6
W = [W₀ | W₁]    (按列切为两块)

GPU 0: Y₀ = X @ W₀ (只算左半部分)
GPU 1: Y₁ = X @ W₁ (只算右半部分)

最终: Y = [Y₀ | Y₁] (All-Gather 合并)

需要注意的是,每一层的权重都会被切分,而不是只有某一层。也就是说,每张卡存的是所有层各自的一部分权重:

1
2
GPU 0: Layer0_W₀, Layer1_W₀, Layer2_W₀, ...
GPU 1: Layer0_W₁, Layer1_W₁, Layer2_W₁, ...

每个 token 生成时,数据依次经过所有层,每一层都在多卡之间并行计算和通信。

张量并行的优势

大模型的推理分为两个阶段:

  • Prefill(首字延迟阶段):输入一整个 Prompt,可以进行矩阵乘法(Batching),此时是算力受限(Compute-bound)。
  • Decoding(后续 Token 生成阶段):每生成一个新 Token,都需要把整个模型的权重从 HBM(显存)里读出来,与这一个 Token 进行计算

可以用计算强度(Operational Intensity)来量化这个过程:

计算强度=计算量 (FLOPs)访存量 (Bytes)\text{计算强度} = \frac{\text{计算量 (FLOPs)}}{\text{访存量 (Bytes)}}

在 Decoding 阶段,对于一个大小为 PP 的模型,生成一个 Token:

  • 访存量:约 2×P2 \times P 字节(FP16 格式下,每个参数 2 字节)
  • 计算量:约 2×P2 \times P FLOPs(每个参数做一次乘加法,即 2 次浮点运算)
  • 此时的计算强度:2P/2P=1 FLOP/Byte2P / 2P = 1 \text{ FLOP/Byte}

现代 GPU(如 A100 或 H100)的算力与带宽比通常在 100+ FLOP/Byte 级别。这意味着,GPU 每读取 1 字节的数据,其算力足够支撑做 100 多次计算。而在 Decoding 阶段,GPU 每读 1 字节只做 1 次计算,99% 的时间算力都在痛苦地等待显存数据的读取。这就是典型的访存受限(Memory-bound)

而张量并行(TP)通过如下方式来突破这个限制:

  • 显存带宽翻倍(最核心原因):假设单卡显存带宽为 BB。切分到 NN 张卡后,每张卡只需要加载 1N\frac{1}{N} 的权重,花费的时间变为原来的 1N\frac{1}{N}。更重要的是,多张 GPU 是并行读取各自显存的,整个系统的总显存带宽变成了 N×BN \times B。既然瓶颈在带宽,带宽翻了 NN 倍,读取时间自然大幅缩短
  • 突破容量限制:超大模型(如 70B 甚至更大)单卡根本装不下。TP 把模型打散,利用了多卡的显存总容量
  • NVLink 高速同步:由于每一层都被切开了,两张卡算出的都是 部分结果(Partial Sum),在进入下一层之前,必须通过 All-Reduce 或 All-Gather 操作把结果拼起来。这就极其依赖 NVLink 这种百 GB/s 级别的机内互联带宽

张量并行 demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
"""
Demo 1: 张量并行(Tensor Parallelism)基础原理

核心思想:将同一个矩阵乘法 Y = XW 的权重 W 按列切分到多张 GPU 上,
每张 GPU 独立计算自己那份分块,最后通过 All-Gather 合并得到完整结果。

运行方式:
CUDA_VISIBLE_DEVICES=6,7 torchrun --nproc_per_node=2 1_tensor_parallel.py
"""

import os
import torch
import torch.distributed as dist


def main():
dist.init_process_group(backend="nccl")
rank = dist.get_rank()
world_size = dist.get_world_size()
local_gpu = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_gpu)
device = torch.cuda.current_device()

torch.manual_seed(42)

seq_len = 4
hidden_dim = 8
X = torch.randn(seq_len, hidden_dim, device=device)

W_full = torch.randn(hidden_dim, hidden_dim, device=device)

# --- 列切分:W_full 按列切成 world_size 份 ---
W_chunks = list(W_full.chunk(world_size, dim=1))
W_local = W_chunks[rank]

col_dim = W_local.shape[1]

# --- 单卡完整计算(ground truth) ---
Y_full = X @ W_full

# --- 张量并行计算:每卡只算自己那部分 ---
Y_local = X @ W_local # shape: (seq_len, col_dim)

# --- All-Gather 合并所有分块结果 ---
gathered = [torch.zeros(seq_len, col_dim, device=device) for _ in range(world_size)]
dist.all_gather(gathered, Y_local)
Y_tp = torch.cat(gathered, dim=1)

max_diff = (Y_full - Y_tp).abs().max().item()

if rank == 0:
print(f"=== 张量并行基础 Demo ===")
print(f"权重形状: {W_full.shape}, 列切分后每卡: {W_local.shape}")
print(f"完整计算 Y[:2,:4]:\n{Y_full[:2, :4]}")
print(f"TP 计算 Y[:2,:4]:\n{Y_tp[:2, :4]}")
print(f"最大误差: {max_diff:.2e}")
print(f"结果一致: {'YES' if max_diff < 1e-5 else 'NO'}")

dist.destroy_process_group()


if __name__ == "__main__":
main()

测试结果:

1
2
3
4
5
6
7
8
9
10
11
12
# CUDA_VISIBLE_DEVICES=6,7 torchrun --nproc_per_node=2 5_data_parallel.py

=== 张量并行基础 Demo ===
权重形状: torch.Size([8, 8]), 列切分后每卡: torch.Size([8, 4])
完整计算 Y[:2,:4]:
tensor([[ 3.3126, -2.5128, -0.9426, -0.1065],
[ 1.6430, 2.9247, 1.1576, 3.2384]], device='cuda:0')
TP 计算 Y[:2,:4]:
tensor([[ 3.3126, -2.5128, -0.9426, -0.1065],
[ 1.6430, 2.9247, 1.1576, 3.2384]], device='cuda:0')
最大误差: 0.00e+00
结果一致: YES
  • X(输入矩阵):形状为 (4, 8)
  • W_full (完整权重):形状为 (8, 8)
  • W_full.chunk(2, dim=1):沿着列(dim=1)将 8×88 \times 8 的矩阵平分成两个 8×48 \times 4 的子矩阵
  • W_local = W_chunks[rank]
    • Rank 0 拿走前半部分列,形状为 (8, 4)
    • Rank 1 拿走后半部分列,形状为 (8, 4)
  • dist.all_gather(gathered, Y_local) 聚合所有卡的结果

流水线并行(Pipeline Parallelism)

核心思想

当模型太大单卡放不下时,最直接的办法是把不同层纵向划分到不同 GPU/机器上:

1
2
3
4
GPU 0: Layer 0, 1, 2, 3    (Stage 0)
GPU 1: Layer 4, 5, 6, 7 (Stage 1)

数据流: Input → Stage 0 → Stage 1 → Output

PP 的代价是流水线气泡——同一时刻只有一个 stage 在工作,其他 stage 在等待,导致硬件利用率下降。同时单条请求需要串行经过所有 stage,延迟不降反增。微批次技术可以缓解气泡,但无法完全消除。

微批次技术:降低推理首条结果延迟

推理场景只有前向传播。流水线并行的核心问题是:Stage 1 必须等 Stage 0 算完才能开始,导致首条结果的延迟等于 Stage 0 的完整处理时间。

朴素方式:整个 batch 在 Stage 0 一次性处理完,再一次性传给 Stage 1。

1
2
3
Stage 0: |======== 处理全部 32 条 ========| → 一次 send
Stage 1: |======== 处理全部 32 条 ========|
↑── Stage 1 长时间空闲等待 ──↑ 首条结果延迟 = Stage 0 耗时 + Stage 1 耗时

微批次方式:拆成多个 micro-batch,Stage 0 算完一个就立刻传给 Stage 1。

1
2
3
Stage 0: |mb0|mb1|mb2|mb3|  (逐个处理,逐个传递)
Stage 1: |mb0|mb1|mb2|mb3| (更早开始)
↑ 首条结果延迟大幅降低

流水线 Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""
Demo 2: 流水线并行(Pipeline Parallelism)基础原理

核心思想:将模型的不同层分配到不同 GPU 上,数据像流水线一样依次流过各 stage。
朴素方式下,同一时刻只有一个 stage 在工作,效率很低。
微批次(micro-batch)技术将一个大 batch 拆成多个小 batch,
让各 stage 可以交错执行,大幅减少流水线气泡(bubble)。

运行方式:
CUDA_VISIBLE_DEVICES=6,7 torchrun --nproc_per_node=2 2_pipeline_parallel.py
"""

import os
import time
import torch
import torch.nn as nn
import torch.distributed as dist


class Stage(nn.Module):
def __init__(self, layers_list):
super().__init__()
self.layers = nn.Sequential(*layers_list)

def forward(self, x):
return self.layers(x)


def simulate_naive_pipeline(stage, x_batches, rank, world_size, device):
"""朴素流水线:整个 batch 在 stage 0 算完,再传给 stage 1 算完"""
if rank == 0:
outputs = []
for mb in x_batches:
out = stage(mb)
dist.send(out, dst=1)
outputs.append(out)
return outputs
else:
outputs = []
for _ in x_batches:
buf = torch.empty_like(x_batches[0]).to(device)
dist.recv(buf, src=0)
out = stage(buf)
outputs.append(out)
return outputs


def simulate_microbatch_pipeline(stage, x_batches, rank, world_size, device):
"""微批次流水线:每个 micro-batch 算完立刻传给下一个 stage"""
if rank == 0:
outputs = []
for i, mb in enumerate(x_batches):
out = stage(mb)
dist.send(out, dst=1)
outputs.append(out)
return outputs
else:
outputs = []
for i in range(len(x_batches)):
buf = torch.empty_like(x_batches[0]).to(device)
dist.recv(buf, src=0)
out = stage(buf)
outputs.append(out)
return outputs


def main():
dist.init_process_group(backend="nccl")
rank = dist.get_rank()
world_size = dist.get_world_size()
local_gpu = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_gpu)
device = torch.cuda.current_device()

torch.manual_seed(42)

hidden_dim = 256
num_layers = 8
num_microbatches = 4
batch_per_mb = 8

layers = [nn.Linear(hidden_dim, hidden_dim) for _ in range(num_layers)]
for l in layers:
nn.init.eye_(l.weight)
nn.init.zeros_(l.bias)

layers_per_stage = num_layers // world_size
my_layers = layers[rank * layers_per_stage : (rank + 1) * layers_per_stage]
stage = Stage(my_layers).to(device)

x_batches = [torch.randn(batch_per_mb, hidden_dim, device=device) for _ in range(num_microbatches)]

# --- 朴素流水线 ---
dist.barrier()
t0 = time.time()
naive_outs = simulate_naive_pipeline(stage, x_batches, rank, world_size, device)
torch.cuda.synchronize()
dist.barrier()
naive_time = time.time() - t0

# --- 微批次流水线 ---
dist.barrier()
t0 = time.time()
mb_outs = simulate_microbatch_pipeline(stage, x_batches, rank, world_size, device)
torch.cuda.synchronize()
dist.barrier()
mb_time = time.time() - t0

# --- 单卡完整计算作为 ground truth ---
full_model = Stage(layers).to(device)
full_outs = [full_model(mb) for mb in x_batches]

if rank == 1:
max_diff = max(
(naive_outs[i] - full_outs[i]).abs().max().item()
for i in range(num_microbatches)
)
print(f"=== 流水线并行基础 Demo ===")
print(f"模型: {num_layers} 层, {world_size} 个 stage, 每个 stage {layers_per_stage} 层")
print(f"微批次: {num_microbatches} 个, 每个 {batch_per_mb} 条样本")
print(f"朴素流水线耗时: {naive_time*1000:.1f} ms")
print(f"微批次流水线耗时: {mb_time*1000:.1f} ms")
print(f"最大误差 (vs 单卡): {max_diff:.2e}")
print(f"结果一致: {'YES' if max_diff < 1e-3 else 'NO'}")

dist.destroy_process_group()


if __name__ == "__main__":
main()

测试结果:

1
2
3
4
5
6
7
=== 流水线并行基础 Demo ===
模型: 8 层, 2 个 stage, 每个 stage 4 层
微批次: 4 个, 每个 8 条样本
朴素流水线耗时: 319.4 ms
微批次流水线耗时: 1.5 ms
最大误差 (vs 单卡): 0.00e+00
结果一致: YES

Demo 详解

流水线并行的 stage 间通信使用点对点(P2P)原语,而非集合通信:

API 作用 本 Demo 中的用途
dist.send(tensor, dst) 将 tensor 发送到目标 rank Stage 0 将前向结果发送给 Stage 1
dist.recv(tensor, src) 从源 rank 接收 tensor 到预分配缓冲区 Stage 1 接收 Stage 0 的中间结果

与张量并行中使用的集合通信(All-Gather、All-Reduce)不同,send/recv 是两个进程之间的直接通信,只涉及一对 stage,不需要所有进程同时参与。这也意味着流水线并行的通信带宽需求较低,即使跨机器的普通网络也能胜任。

1. 模型切分——按层划分 stage

1
2
3
layers_per_stage = num_layers // world_size
my_layers = layers[rank * layers_per_stage : (rank + 1) * layers_per_stage]
stage = Stage(my_layers).to(device)

8 层模型、2 个 stage:rank 0 持有第 0-3 层,rank 1 持有第 4-7 层。这就是流水线并行的核心切分方式——纵向切分模型,与张量并行的横向切分权重完全不同。

2. 朴素流水线——整个 batch 一次性处理

1
2
3
4
5
6
7
8
9
10
# Stage 0: 一次性处理全部 32 条,一次性发送
if rank == 0:
out = stage(full_batch) # shape: (32, 256)
dist.send(out, dst=1) # 一次 send

# Stage 1: 必须等 Stage 0 全部完成
else:
buf = torch.empty_like(full_batch, device=device)
dist.recv(buf, src=0) # recv 是阻塞的,必须等 send 完成
naive_out = stage(buf) # 一次性处理全部 32 条

代码结构简洁:一次 stage() + 一次 send/recv。但 recv 是阻塞的——Stage 1 必须等 Stage 0 把全部 32 条请求都处理完才能开始。首条结果的延迟等于 Stage 0 的完整处理时间。

3. 微批次流水线——逐个处理、逐个传递

1
2
3
4
5
6
7
8
9
10
11
12
13
# Stage 0: 循环处理,算完一个就发
if rank == 0:
for mb in micro_batches: # 4 个 × 8 条
out = stage(mb) # 每个 shape: (8, 256)
dist.send(out, dst=1) # 立刻发送

# Stage 1: 循环接收,收到一个就处理
else:
for i in range(num_microbatches):
buf = torch.empty_like(micro_batches[0], device=device)
dist.recv(buf, src=0) # 收到第一个 micro-batch 就开始
out = stage(buf)
mb_outputs.append(out)

代码结构与朴素流水线截然不同:循环 stage() + 循环 send/recv。Stage 0 算完 8 条就立刻传给 Stage 1,Stage 1 可以更早开始计算。关键指标是首条结果延迟——Stage 1 处理完第一个 micro-batch的时间,而非全部完成的时间。

4. 正确性验证与跨进程数据传递

1
2
3
4
5
6
7
if rank == 1:
full_model = Stage(layers).to(device) # 单卡完整 8 层模型
naive_diff = (naive_out - ref_out).abs().max().item()
mb_diff = (mb_all - ref_mb_all).abs().max().item()
timing = torch.tensor([naive_first, naive_total, mb_first, mb_total, ...], device=device)

dist.broadcast(timing, src=1) # 将 rank 1 的计时数据广播给 rank 0

正确性验证和计时都在 rank 1 上完成(因为最终结果在 Stage 1 产生),然后通过 dist.broadcast 传给 rank 0 打印。broadcastall_reduce 不同——它是单向的:只有 src 进程的数据被发送,其他进程接收。

6. 总结

  • 数据并行解决吞吐量问题——推理时各卡独立处理不同请求(零通信),训练时 All-Reduce 同步梯度,前提是单卡放得下模型
  • 张量并行解决单卡显存带宽与容量瓶颈——在同一层内切分权重,每卡加载更少权重、通过 NVLink 并行计算,降低单 token 生成延迟
  • 流水线并行解决单机显存瓶颈——按层纵向切分模型以容纳超规模模型,代价是流水线气泡和延迟增加,属于"能用但不快"的方案
  • 三种并行是正交的,解决不同维度的问题,实际部署中常组合使用