|
|
马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
引言
在深度学习项目开发中,GPU内存(显存)管理是一个至关重要的环节。随着模型规模的不断扩大和数据集的日益增长,GPU显存往往成为限制模型训练和推理的瓶颈。不当的内存管理不仅会导致程序运行效率低下,还可能引发显存泄漏,最终导致程序崩溃。本文将全面介绍Python中释放显存的实用技术,帮助开发者有效管理GPU内存资源,防止显存泄漏和程序崩溃。
GPU内存基础
GPU显存的工作原理
GPU显存(Video RAM,简称VRAM)是专门用于存储GPU计算所需数据的内存。与系统内存(RAM)不同,GPU显存具有更高的带宽和更低的延迟,专门为并行计算优化。在深度学习中,模型参数、中间激活值、梯度以及批处理数据都需要存储在GPU显存中。
Python与GPU内存的交互
Python通过深度学习框架(如PyTorch、TensorFlow)与GPU进行交互。这些框架提供了高级API,使开发者能够轻松地将计算任务和数据分配到GPU上。然而,这种抽象也意味着开发者对底层内存管理的控制有限,容易导致内存问题。
- # 检查GPU可用性
- import torch
- if torch.cuda.is_available():
- print(f"可用GPU数量: {torch.cuda.device_count()}")
- print(f"当前GPU: {torch.cuda.current_device()}")
- print(f"GPU名称: {torch.cuda.get_device_name(torch.cuda.current_device())}")
- print(f"GPU显存总量: {torch.cuda.get_device_properties(torch.cuda.current_device()).total_memory / 1024**3:.2f} GB")
复制代码
深度学习框架中的内存管理
PyTorch内存管理机制
PyTorch使用缓存分配器(Caching Allocator)来管理GPU内存。这种机制通过维护一个内存池来减少内存分配和释放的开销。当PyTorch需要分配内存时,它会首先检查内存池中是否有足够大的空闲块;如果没有,它会向操作系统申请新的内存。当释放内存时,PyTorch不会立即将内存返回给操作系统,而是将其保留在内存池中以备将来使用。
- import torch
- # 查看PyTorch内存分配统计
- def print_memory_stats():
- allocated = torch.cuda.memory_allocated() / 1024**3 # GB
- cached = torch.cuda.memory_reserved() / 1024**3 # GB
- print(f"已分配显存: {allocated:.2f} GB")
- print(f"缓存显存: {cached:.2f} GB")
- # 创建张量并查看内存使用
- a = torch.randn(10000, 10000, device='cuda')
- print_memory_stats()
- # 删除张量
- del a
- torch.cuda.empty_cache() # 清空缓存
- print_memory_stats()
复制代码
TensorFlow内存管理机制
TensorFlow也使用类似的内存管理策略。在TensorFlow 2.x中,默认启用即时执行(Eager Execution),内存管理更加动态。TensorFlow会根据需要自动分配和释放内存,但也提供了手动控制内存分配的选项。
- import tensorflow as tf
- # 配置GPU内存增长选项,而不是一次性分配所有内存
- gpus = tf.config.experimental.list_physical_devices('GPU')
- if gpus:
- try:
- for gpu in gpus:
- tf.config.experimental.set_memory_growth(gpu, True)
- except RuntimeError as e:
- print(e)
- # 查看内存使用情况
- def print_tf_memory_stats():
- for gpu in gpus:
- memory_info = tf.config.experimental.get_memory_info(gpu.name)
- print(f"GPU {gpu.name}:")
- print(f" 当前使用: {memory_info['current'] / 1024**3:.2f} GB")
- print(f" 峰值使用: {memory_info['peak'] / 1024**3:.2f} GB")
- # 创建张量并查看内存使用
- a = tf.random.normal((10000, 10000))
- print_tf_memory_stats()
- # 删除张量
- del a
- print_tf_memory_stats()
复制代码
手动释放显存的方法
使用垃圾回收机制
Python的垃圾回收机制可以自动释放不再使用的对象,但在GPU内存管理中,仅依靠垃圾回收可能不够及时。我们可以手动触发垃圾回收来释放内存。
- import gc
- import torch
- # 创建一些大的张量
- a = torch.randn(10000, 10000, device='cuda')
- b = torch.randn(10000, 10000, device='cuda')
- print(f"分配前: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
- # 删除引用
- del a
- del b
- # 手动触发垃圾回收
- gc.collect()
- # 清空PyTorch的缓存分配器
- torch.cuda.empty_cache()
- print(f"释放后: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
复制代码
使用上下文管理器
上下文管理器可以确保资源在使用后被正确释放,即使在发生异常的情况下也是如此。我们可以创建自定义的上下文管理器来管理GPU内存。
- import torch
- from contextlib import contextmanager
- @contextmanager
- def gpu_memory_context(device='cuda'):
- try:
- # 记录初始内存状态
- initial_memory = torch.cuda.memory_allocated(device=device)
- yield
- finally:
- # 清理并释放内存
- gc.collect()
- torch.cuda.empty_cache()
- final_memory = torch.cuda.memory_allocated(device=device)
- print(f"内存变化: {(initial_memory - final_memory) / 1024**3:.2f} GB")
- # 使用上下文管理器
- with gpu_memory_context():
- a = torch.randn(5000, 5000, device='cuda')
- b = torch.randn(5000, 5000, device='cuda')
- c = torch.matmul(a, b)
- # 在这里进行计算
- # 退出上下文时,内存会被自动释放
复制代码
显式释放模型和张量
在深度学习中,模型和张量是占用显存的主要对象。显式地删除不再需要的模型和张量,并释放其占用的显存,可以有效管理内存。
- import torch
- import torch.nn as nn
- # 定义一个简单的模型
- class SimpleModel(nn.Module):
- def __init__(self):
- super(SimpleModel, self).__init__()
- self.linear = nn.Linear(1000, 1000)
-
- def forward(self, x):
- return self.linear(x)
- # 创建模型并移至GPU
- model = SimpleModel().cuda()
- input = torch.randn(100, 1000).cuda()
- # 查看内存使用
- print(f"模型创建后: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
- # 使用模型进行前向传播
- output = model(input)
- loss = output.sum()
- loss.backward()
- # 查看内存使用
- print(f"反向传播后: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
- # 释放模型和梯度
- del model, input, output, loss
- torch.cuda.empty_cache()
- # 查看内存使用
- print(f"释放后: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
复制代码
使用内存映射文件处理大数据
对于非常大的数据集,可以考虑使用内存映射文件(Memory-mapped files)技术,这样可以避免一次性将所有数据加载到GPU显存中。
- import torch
- import numpy as np
- # 创建一个大的numpy数组并保存到磁盘
- large_array = np.random.rand(10000, 10000)
- np.save('large_array.npy', large_array)
- # 使用内存映射方式加载数组
- memmap_array = np.load('large_array.npy', mmap_mode='r')
- # 创建PyTorch张量,但不立即复制到GPU
- tensor = torch.from_numpy(memmap_array)
- # 分批处理数据
- batch_size = 1000
- for i in range(0, len(tensor), batch_size):
- batch = tensor[i:i+batch_size].cuda() # 只将当前批次加载到GPU
- # 在这里处理批次数据
- result = torch.mean(batch)
- print(f"批次 {i//batch_size}: {result.item()}")
-
- # 释放当前批次的GPU内存
- del batch
- torch.cuda.empty_cache()
复制代码
自动内存管理策略
PyTorch的自动内存管理
PyTorch提供了几种自动内存管理的策略,包括内存池、梯度检查点和自动混合精度等。
PyTorch的缓存分配器会自动管理内存池,但我们可以通过调整环境变量来优化其行为。
- import os
- import torch
- # 设置PyTorch内存分配器的环境变量
- # PYTORCH_CUDA_ALLOC_CONF可以控制内存分配器的行为
- # 例如,可以设置最大内存池大小
- os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:128'
- # 现在PyTorch会使用更小的内存块,减少内存碎片
- a = torch.randn(5000, 5000, device='cuda')
- b = torch.randn(5000, 5000, device='cuda')
- c = torch.matmul(a, b)
- # 查看内存使用情况
- print(f"内存使用: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
复制代码
梯度检查点是一种用计算换内存的技术,它在前向传播时不保存所有中间激活值,而是在反向传播时重新计算它们。
- import torch
- import torch.nn as nn
- from torch.utils.checkpoint import checkpoint
- # 定义一个深度模型
- class DeepModel(nn.Module):
- def __init__(self):
- super(DeepModel, self).__init__()
- self.layers = nn.ModuleList([nn.Linear(1000, 1000) for _ in range(10)])
-
- def forward(self, x):
- for layer in self.layers:
- # 使用checkpoint包装每一层
- x = checkpoint(layer, x)
- return x
- # 创建模型
- model = DeepModel().cuda()
- input = torch.randn(100, 1000).cuda()
- # 查看内存使用
- print(f"使用梯度检查点前: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
- # 前向传播
- output = model(input)
- loss = output.sum()
- loss.backward()
- # 查看内存使用
- print(f"使用梯度检查点后: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
复制代码
自动混合精度可以减少内存使用,同时保持模型性能。它使用半精度浮点数(FP16)进行计算,同时在必要时使用全精度浮点数(FP32)保持数值稳定性。
- import torch
- import torch.nn as nn
- from torch.cuda.amp import autocast, GradScaler
- # 定义模型
- model = nn.Sequential(
- nn.Linear(1000, 1000),
- nn.ReLU(),
- nn.Linear(1000, 1000),
- nn.ReLU(),
- nn.Linear(1000, 10)
- ).cuda()
- # 创建GradScaler用于梯度缩放
- scaler = GradScaler()
- # 生成输入数据
- input = torch.randn(100, 1000).cuda()
- target = torch.randint(0, 10, (100,)).cuda()
- # 查看内存使用
- print(f"AMP前: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
- # 使用自动混合精度进行训练
- optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
- with autocast():
- output = model(input)
- loss = nn.CrossEntropyLoss()(output, target)
- # 缩放损失并反向传播
- scaler.scale(loss).backward()
- # 缩放梯度并更新参数
- scaler.step(optimizer)
- scaler.update()
- # 查看内存使用
- print(f"AMP后: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
复制代码
TensorFlow的自动内存管理
TensorFlow也提供了多种自动内存管理策略,包括内存增长选项、虚拟设备和内存优化器等。
默认情况下,TensorFlow会一次性分配GPU上几乎所有可用的内存。我们可以启用内存增长选项,使TensorFlow只在需要时分配内存。
- import tensorflow as tf
- # 获取所有GPU设备
- gpus = tf.config.experimental.list_physical_devices('GPU')
- if gpus:
- try:
- # 启用内存增长
- for gpu in gpus:
- tf.config.experimental.set_memory_growth(gpu, True)
- print("内存增长已启用")
- except RuntimeError as e:
- print(f"内存增长设置失败: {e}")
- # 查看内存使用情况
- for gpu in gpus:
- memory_info = tf.config.experimental.get_memory_info(gpu.name)
- print(f"GPU {gpu.name}:")
- print(f" 当前使用: {memory_info['current'] / 1024**3:.2f} GB")
- print(f" 峰值使用: {memory_info['peak'] / 1024**3:.2f} GB")
复制代码
我们可以配置虚拟设备,限制TensorFlow可以使用的GPU内存量。
- import tensorflow as tf
- # 获取所有GPU设备
- gpus = tf.config.experimental.list_physical_devices('GPU')
- if gpus:
- try:
- # 限制第一个GPU只能使用4GB内存
- tf.config.experimental.set_virtual_device_configuration(
- gpus[0],
- [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=4096)]
- )
- print("虚拟设备配置已设置")
- except RuntimeError as e:
- print(f"虚拟设备配置失败: {e}")
- # 查看内存使用情况
- for gpu in gpus:
- memory_info = tf.config.experimental.get_memory_info(gpu.name)
- print(f"GPU {gpu.name}:")
- print(f" 当前使用: {memory_info['current'] / 1024**3:.2f} GB")
- print(f" 峰值使用: {memory_info['peak'] / 1024**3:.2f} GB")
复制代码
TensorFlow提供了多种内存优化技术,包括模型优化、梯度累积和混合精度训练等。
- import tensorflow as tf
- # 启用混合精度训练
- policy = tf.keras.mixed_precision.Policy('mixed_float16')
- tf.keras.mixed_precision.set_global_policy(policy)
- # 定义模型
- model = tf.keras.Sequential([
- tf.keras.layers.Dense(1000, activation='relu'),
- tf.keras.layers.Dense(1000, activation='relu'),
- tf.keras.layers.Dense(10)
- ])
- # 编译模型
- model.compile(
- optimizer='adam',
- loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
- metrics=['accuracy']
- )
- # 生成数据
- x_train = tf.random.normal((1000, 100))
- y_train = tf.random.uniform((1000,), maxval=10, dtype=tf.int32)
- # 查看内存使用
- gpus = tf.config.experimental.list_physical_devices('GPU')
- if gpus:
- memory_info = tf.config.experimental.get_memory_info(gpus[0].name)
- print(f"训练前内存使用: {memory_info['current'] / 1024**3:.2f} GB")
- # 训练模型
- model.fit(x_train, y_train, epochs=2)
- # 查看内存使用
- if gpus:
- memory_info = tf.config.experimental.get_memory_info(gpus[0].name)
- print(f"训练后内存使用: {memory_info['current'] / 1024**3:.2f} GB")
复制代码
常见问题及解决方案
显存泄漏的识别与解决
显存泄漏是指程序在运行过程中持续占用越来越多的显存,而不释放不再使用的内存。这最终会导致程序因显存不足而崩溃。
- import torch
- import time
- import matplotlib.pyplot as plt
- def check_memory_leak():
- memory_usage = []
- timestamps = []
-
- # 记录初始内存
- initial_memory = torch.cuda.memory_allocated()
- print(f"初始内存使用: {initial_memory / 1024**3:.2f} GB")
-
- # 模拟可能泄漏的操作
- tensors = []
- for i in range(10):
- # 创建一些张量并添加到列表中
- a = torch.randn(1000, 1000, device='cuda')
- b = torch.randn(1000, 1000, device='cuda')
- c = torch.matmul(a, b)
- tensors.append(c)
-
- # 记录内存使用
- current_memory = torch.cuda.memory_allocated()
- memory_usage.append(current_memory / 1024**3)
- timestamps.append(i)
-
- print(f"步骤 {i}: {current_memory / 1024**3:.2f} GB")
- time.sleep(0.5)
-
- # 绘制内存使用图
- plt.figure(figsize=(10, 6))
- plt.plot(timestamps, memory_usage, 'b-')
- plt.xlabel('步骤')
- plt.ylabel('内存使用 (GB)')
- plt.title('内存使用趋势')
- plt.grid(True)
- plt.show()
-
- # 释放张量
- del tensors
- torch.cuda.empty_cache()
-
- final_memory = torch.cuda.memory_allocated()
- print(f"最终内存使用: {final_memory / 1024**3:.2f} GB")
- # 运行检查
- check_memory_leak()
复制代码- import torch
- import gc
- import weakref
- # 解决方案1: 确保删除不再需要的对象
- def memory_leak_solution_1():
- tensors = []
- for i in range(10):
- a = torch.randn(1000, 1000, device='cuda')
- b = torch.randn(1000, 1000, device='cuda')
- c = torch.matmul(a, b)
- tensors.append(c)
-
- # 定期清理不再需要的张量
- if i % 3 == 0:
- # 删除前几个张量
- for j in range(min(3, len(tensors))):
- del tensors[j]
- tensors = tensors[3:]
-
- # 手动触发垃圾回收和清空缓存
- gc.collect()
- torch.cuda.empty_cache()
-
- print(f"步骤 {i}: 清理后内存使用 {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
- # 解决方案2: 使用弱引用避免循环引用
- def memory_leak_solution_2():
- class TensorHolder:
- def __init__(self, tensor):
- self.tensor = tensor
-
- holders = []
- for i in range(10):
- a = torch.randn(1000, 1000, device='cuda')
- b = torch.randn(1000, 1000, device='cuda')
- c = torch.matmul(a, b)
-
- # 使用弱引用
- holder = TensorHolder(c)
- weak_holder = weakref.ref(holder)
- holders.append(weak_holder)
-
- print(f"步骤 {i}: 内存使用 {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
-
- # 清理
- del holders
- gc.collect()
- torch.cuda.empty_cache()
- print(f"清理后内存使用 {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
- # 运行解决方案
- print("解决方案1:")
- memory_leak_solution_1()
- print("\n解决方案2:")
- memory_leak_solution_2()
复制代码
CUDA out of memory错误处理
“CUDA out of memory”是深度学习开发中常见的错误,通常发生在尝试分配的内存超过可用显存时。
- import torch
- import torch.nn as nn
- class DynamicBatchTrainer:
- def __init__(self, model, initial_batch_size=32, min_batch_size=1):
- self.model = model
- self.current_batch_size = initial_batch_size
- self.min_batch_size = min_batch_size
-
- def train_step(self, data, target, optimizer, criterion):
- # 尝试使用当前批处理大小
- while self.current_batch_size >= self.min_batch_size:
- try:
- # 分割数据以适应当前批处理大小
- for i in range(0, len(data), self.current_batch_size):
- batch_data = data[i:i+self.current_batch_size]
- batch_target = target[i:i+self.current_batch_size]
-
- # 前向传播
- output = self.model(batch_data)
- loss = criterion(output, batch_target)
-
- # 反向传播
- optimizer.zero_grad()
- loss.backward()
- optimizer.step()
-
- # 如果成功,增加批处理大小以获得更好的性能
- self.current_batch_size = min(self.current_batch_size * 2, len(data))
- return loss.item()
-
- except RuntimeError as e:
- if 'out of memory' in str(e):
- # 如果内存不足,减少批处理大小并重试
- torch.cuda.empty_cache()
- self.current_batch_size = max(self.current_batch_size // 2, self.min_batch_size)
- print(f"内存不足,减少批处理大小至 {self.current_batch_size}")
- else:
- # 如果是其他错误,重新抛出
- raise e
-
- # 如果达到最小批处理大小仍然失败,抛出错误
- raise RuntimeError(f"即使使用最小批处理大小 {self.min_batch_size} 也无法训练")
- # 使用示例
- model = nn.Sequential(
- nn.Linear(1000, 1000),
- nn.ReLU(),
- nn.Linear(1000, 10)
- ).cuda()
- optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
- criterion = nn.CrossEntropyLoss()
- # 生成大数据集
- data = torch.randn(1000, 1000).cuda()
- target = torch.randint(0, 10, (1000,)).cuda()
- # 创建动态批处理训练器
- trainer = DynamicBatchTrainer(model, initial_batch_size=256, min_batch_size=1)
- # 训练
- for epoch in range(3):
- loss = trainer.train_step(data, target, optimizer, criterion)
- print(f"Epoch {epoch}, Loss: {loss:.4f}, Batch Size: {trainer.current_batch_size}")
复制代码
梯度累积是一种技术,允许我们使用较小的批处理大小模拟较大的批处理大小,从而减少内存使用。
- import torch
- import torch.nn as nn
- class GradientAccumulator:
- def __init__(self, model, accumulation_steps=4):
- self.model = model
- self.accumulation_steps = accumulation_steps
- self.current_step = 0
-
- def train_step(self, data, target, optimizer, criterion):
- # 前向传播
- output = self.model(data)
- loss = criterion(output, target)
-
- # 归一化损失以考虑累积
- loss = loss / self.accumulation_steps
-
- # 反向传播
- loss.backward()
-
- # 增加步骤计数
- self.current_step += 1
-
- # 如果达到累积步骤,更新参数
- if self.current_step % self.accumulation_steps == 0:
- optimizer.step()
- optimizer.zero_grad()
- return True, loss.item() * self.accumulation_steps
-
- return False, loss.item() * self.accumulation_steps
- # 使用示例
- model = nn.Sequential(
- nn.Linear(1000, 1000),
- nn.ReLU(),
- nn.Linear(1000, 10)
- ).cuda()
- optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
- criterion = nn.CrossEntropyLoss()
- # 生成大数据集
- data = torch.randn(1000, 1000).cuda()
- target = torch.randint(0, 10, (1000,)).cuda()
- # 创建梯度累积器
- accumulator = GradientAccumulator(model, accumulation_steps=4)
- # 训练
- batch_size = 32
- for epoch in range(3):
- total_loss = 0
- update_count = 0
-
- for i in range(0, len(data), batch_size):
- batch_data = data[i:i+batch_size]
- batch_target = target[i:i+batch_size]
-
- updated, loss = accumulator.train_step(batch_data, batch_target, optimizer, criterion)
- total_loss += loss
-
- if updated:
- update_count += 1
-
- avg_loss = total_loss / (len(data) // batch_size)
- print(f"Epoch {epoch}, Loss: {avg_loss:.4f}, Updates: {update_count}")
复制代码
多GPU环境下的内存管理
在多GPU环境中,内存管理变得更加复杂,需要考虑数据并行、模型并行和负载均衡等问题。
- import torch
- import torch.nn as nn
- from torch.nn.parallel import DistributedDataParallel as DDP
- import torch.distributed as dist
- import torch.multiprocessing as mp
- import os
- def setup(rank, world_size):
- os.environ['MASTER_ADDR'] = 'localhost'
- os.environ['MASTER_PORT'] = '12355'
-
- # 初始化进程组
- dist.init_process_group(
- backend='nccl',
- rank=rank,
- world_size=world_size
- )
- def cleanup():
- dist.destroy_process_group()
- class ToyModel(nn.Module):
- def __init__(self):
- super(ToyModel, self).__init__()
- self.net1 = nn.Linear(1000, 1000)
- self.relu = nn.ReLU()
- self.net2 = nn.Linear(1000, 10)
-
- def forward(self, x):
- return self.net2(self.relu(self.net1(x)))
- def train(rank, world_size):
- print(f"在GPU {rank} 上运行 DDP 示例")
- setup(rank, world_size)
-
- # 创建模型并移至当前GPU
- model = ToyModel().to(rank)
-
- # 使用DDP包装模型
- ddp_model = DDP(model, device_ids=[rank])
-
- # 创建优化器
- optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.001)
-
- # 生成数据
- inputs = torch.randn(2000, 1000).to(rank)
- labels = torch.randn(2000, 10).to(rank)
-
- # 训练循环
- for epoch in range(2):
- # 在每个GPU上处理数据的一部分
- inputs_shard = inputs[rank::world_size]
- labels_shard = labels[rank::world_size]
-
- # 前向传播
- outputs = ddp_model(inputs_shard)
- loss = torch.nn.functional.mse_loss(outputs, labels_shard)
-
- # 反向传播
- optimizer.zero_grad()
- loss.backward()
- optimizer.step()
-
- # 打印内存使用情况
- allocated = torch.cuda.memory_allocated(rank) / 1024**3
- print(f"GPU {rank}, Epoch {epoch}, Loss: {loss.item()}, Memory: {allocated:.2f} GB")
-
- # 清理
- cleanup()
- def main():
- world_size = torch.cuda.device_count()
- print(f"发现 {world_size} 个GPU")
- if world_size > 1:
- mp.spawn(train, args=(world_size,), nprocs=world_size, join=True)
- else:
- print("需要多个GPU才能运行此示例")
- if __name__ == '__main__':
- main()
复制代码- import torch
- import torch.nn as nn
- from torch.nn.parallel import DistributedDataParallel as DDP
- import torch.distributed as dist
- import torch.multiprocessing as mp
- import os
- def setup(rank, world_size):
- os.environ['MASTER_ADDR'] = 'localhost'
- os.environ['MASTER_PORT'] = '12355'
-
- # 初始化进程组
- dist.init_process_group(
- backend='nccl',
- rank=rank,
- world_size=world_size
- )
- def cleanup():
- dist.destroy_process_group()
- class ParallelModel(nn.Module):
- def __init__(self, rank, world_size):
- super(ParallelModel, self).__init__()
- self.rank = rank
- self.world_size = world_size
-
- # 将模型的不同层分配到不同的GPU
- if rank == 0:
- # 第一个GPU处理前半部分
- self.part1 = nn.Sequential(
- nn.Linear(1000, 2000),
- nn.ReLU(),
- nn.Linear(2000, 2000),
- nn.ReLU()
- )
- else:
- # 第二个GPU处理后半部分
- self.part2 = nn.Sequential(
- nn.Linear(2000, 2000),
- nn.ReLU(),
- nn.Linear(2000, 10)
- )
-
- def forward(self, x):
- # 第一个GPU处理前向传播的第一部分
- if self.rank == 0:
- x = self.part1(x)
- # 发送到下一个GPU
- dist.send(x, dst=1)
- # 接收最终结果
- dist.recv(x, src=1)
- return x
- else:
- # 第二个GPU接收中间结果
- dist.recv(x, src=0)
- # 处理后半部分
- x = self.part2(x)
- # 发送回第一个GPU
- dist.send(x, dst=0)
- return x
- def train(rank, world_size):
- print(f"在GPU {rank} 上运行模型并行示例")
- setup(rank, world_size)
-
- # 创建模型
- model = ParallelModel(rank, world_size).to(rank)
-
- # 只在主进程创建优化器
- if rank == 0:
- optimizer = torch.optim.SGD(model.parameters(), lr=0.001)
-
- # 生成数据
- if rank == 0:
- inputs = torch.randn(64, 1000).to(rank)
- labels = torch.randn(64, 10).to(rank)
-
- # 训练循环
- for epoch in range(2):
- if rank == 0:
- # 前向传播
- outputs = model(inputs)
- loss = torch.nn.functional.mse_loss(outputs, labels)
-
- # 反向传播
- optimizer.zero_grad()
- loss.backward()
- optimizer.step()
-
- # 打印内存使用情况
- allocated = torch.cuda.memory_allocated(rank) / 1024**3
- print(f"GPU {rank}, Epoch {epoch}, Loss: {loss.item()}, Memory: {allocated:.2f} GB")
- else:
- # 其他GPU只参与前向和反向传播
- outputs = model(inputs)
- # 反向传播会自动处理
- outputs.sum().backward()
-
- # 打印内存使用情况
- allocated = torch.cuda.memory_allocated(rank) / 1024**3
- print(f"GPU {rank}, Epoch {epoch}, Memory: {allocated:.2f} GB")
-
- # 清理
- cleanup()
- def main():
- world_size = 2 # 使用2个GPU
- if torch.cuda.device_count() >= world_size:
- mp.spawn(train, args=(world_size,), nprocs=world_size, join=True)
- else:
- print(f"需要至少 {world_size} 个GPU才能运行此示例")
- if __name__ == '__main__':
- main()
复制代码
最佳实践
监控GPU内存使用
有效管理GPU内存的第一步是了解内存的使用情况。以下是一些监控GPU内存使用的方法:
- import torch
- import time
- import psutil
- import GPUtil
- from datetime import datetime
- class GPUMemoryMonitor:
- def __init__(self, interval=1.0):
- self.interval = interval
- self.running = False
- self.log_data = []
-
- def start(self):
- self.running = True
- self.log_data = []
- print("开始监控GPU内存使用...")
-
- while self.running:
- timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-
- # 获取GPU信息
- gpus = GPUtil.getGPUs()
- for i, gpu in enumerate(gpus):
- memory_used = gpu.memoryUsed
- memory_total = gpu.memoryTotal
- memory_percent = gpu.memoryUtil * 100
- gpu_load = gpu.load * 100
-
- log_entry = {
- 'timestamp': timestamp,
- 'gpu_id': i,
- 'memory_used': memory_used,
- 'memory_total': memory_total,
- 'memory_percent': memory_percent,
- 'gpu_load': gpu_load
- }
-
- self.log_data.append(log_entry)
-
- print(f"[{timestamp}] GPU {i}: 内存 {memory_used}/{memory_total} MB ({memory_percent:.1f}%), 负载 {gpu_load:.1f}%")
-
- time.sleep(self.interval)
-
- def stop(self):
- self.running = False
- print("停止监控GPU内存使用")
-
- def get_memory_usage_plot(self):
- import matplotlib.pyplot as plt
- import pandas as pd
-
- if not self.log_data:
- print("没有可用的监控数据")
- return
-
- # 转换为DataFrame
- df = pd.DataFrame(self.log_data)
-
- # 绘制内存使用图
- plt.figure(figsize=(12, 6))
-
- for gpu_id in df['gpu_id'].unique():
- gpu_data = df[df['gpu_id'] == gpu_id]
- plt.plot(gpu_data['timestamp'], gpu_data['memory_percent'], label=f'GPU {gpu_id}')
-
- plt.xlabel('时间')
- plt.ylabel('内存使用率 (%)')
- plt.title('GPU内存使用率随时间变化')
- plt.legend()
- plt.grid(True)
- plt.xticks(rotation=45)
- plt.tight_layout()
- plt.show()
- # 使用示例
- monitor = GPUMemoryMonitor(interval=0.5)
- # 在单独的线程中启动监控
- import threading
- monitor_thread = threading.Thread(target=monitor.start)
- monitor_thread.start()
- # 模拟一些GPU操作
- try:
- a = torch.randn(5000, 5000, device='cuda')
- time.sleep(2)
- b = torch.randn(5000, 5000, device='cuda')
- time.sleep(2)
- c = torch.matmul(a, b)
- time.sleep(2)
- del a, b, c
- torch.cuda.empty_cache()
- time.sleep(2)
- finally:
- # 停止监控
- monitor.stop()
- monitor_thread.join()
-
- # 显示内存使用图
- monitor.get_memory_usage_plot()
复制代码
内存优化策略
以下是一些有效的内存优化策略:
- import torch
- import torch.nn as nn
- from torch.utils.checkpoint import checkpoint
- from torch.cuda.amp import autocast, GradScaler
- class MemoryOptimizedTrainer:
- def __init__(self, model, device='cuda'):
- self.model = model.to(device)
- self.device = device
- self.scaler = GradScaler()
-
- def train_with_gradient_checkpointing(self, data_loader, optimizer, criterion, epochs=1):
- """使用梯度检查点进行训练,减少内存使用"""
- self.model.train()
-
- for epoch in range(epochs):
- total_loss = 0
- for batch_idx, (data, target) in enumerate(data_loader):
- data, target = data.to(self.device), target.to(self.device)
-
- def custom_forward(*inputs):
- """自定义前向函数,用于梯度检查点"""
- return self.model(inputs[0])
-
- # 使用梯度检查点
- output = checkpoint(custom_forward, data)
- loss = criterion(output, target)
-
- # 反向传播
- optimizer.zero_grad()
- loss.backward()
- optimizer.step()
-
- total_loss += loss.item()
-
- # 打印内存使用情况
- if batch_idx % 10 == 0:
- memory_used = torch.cuda.memory_allocated(self.device) / 1024**3
- print(f'Epoch: {epoch}, Batch: {batch_idx}, Loss: {loss.item():.4f}, Memory: {memory_used:.2f} GB')
-
- avg_loss = total_loss / len(data_loader)
- print(f'Epoch {epoch}, Average Loss: {avg_loss:.4f}')
-
- def train_with_mixed_precision(self, data_loader, optimizer, criterion, epochs=1):
- """使用混合精度进行训练,减少内存使用并提高速度"""
- self.model.train()
-
- for epoch in range(epochs):
- total_loss = 0
- for batch_idx, (data, target) in enumerate(data_loader):
- data, target = data.to(self.device), target.to(self.device)
-
- # 使用自动混合精度
- with autocast():
- output = self.model(data)
- loss = criterion(output, target)
-
- # 缩放损失并反向传播
- optimizer.zero_grad()
- self.scaler.scale(loss).backward()
-
- # 缩放梯度并更新参数
- self.scaler.step(optimizer)
- self.scaler.update()
-
- total_loss += loss.item()
-
- # 打印内存使用情况
- if batch_idx % 10 == 0:
- memory_used = torch.cuda.memory_allocated(self.device) / 1024**3
- print(f'Epoch: {epoch}, Batch: {batch_idx}, Loss: {loss.item():.4f}, Memory: {memory_used:.2f} GB')
-
- avg_loss = total_loss / len(data_loader)
- print(f'Epoch {epoch}, Average Loss: {avg_loss:.4f}')
-
- def train_with_gradient_accumulation(self, data_loader, optimizer, criterion, accumulation_steps=4, epochs=1):
- """使用梯度累积进行训练,模拟更大的批处理大小"""
- self.model.train()
-
- for epoch in range(epochs):
- total_loss = 0
- optimizer.zero_grad()
-
- for batch_idx, (data, target) in enumerate(data_loader):
- data, target = data.to(self.device), target.to(self.device)
-
- # 前向传播
- with autocast():
- output = self.model(data)
- loss = criterion(output, target)
-
- # 归一化损失
- loss = loss / accumulation_steps
-
- # 反向传播
- self.scaler.scale(loss).backward()
-
- total_loss += loss.item() * accumulation_steps
-
- # 如果达到累积步骤,更新参数
- if (batch_idx + 1) % accumulation_steps == 0:
- # 缩放梯度并更新参数
- self.scaler.step(optimizer)
- self.scaler.update()
- optimizer.zero_grad()
-
- # 打印内存使用情况
- if batch_idx % 10 == 0:
- memory_used = torch.cuda.memory_allocated(self.device) / 1024**3
- print(f'Epoch: {epoch}, Batch: {batch_idx}, Loss: {loss.item() * accumulation_steps:.4f}, Memory: {memory_used:.2f} GB')
-
- # 处理剩余的批次
- if len(data_loader) % accumulation_steps != 0:
- self.scaler.step(optimizer)
- self.scaler.update()
- optimizer.zero_grad()
-
- avg_loss = total_loss / len(data_loader)
- print(f'Epoch {epoch}, Average Loss: {avg_loss:.4f}')
- # 使用示例
- # 创建一个简单的模型
- model = nn.Sequential(
- nn.Linear(1000, 2000),
- nn.ReLU(),
- nn.Linear(2000, 2000),
- nn.ReLU(),
- nn.Linear(2000, 10)
- )
- # 创建优化器和损失函数
- optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
- criterion = nn.CrossEntropyLoss()
- # 创建数据加载器
- from torch.utils.data import DataLoader, TensorDataset
- data = torch.randn(1000, 1000)
- target = torch.randint(0, 10, (1000,))
- dataset = TensorDataset(data, target)
- data_loader = DataLoader(dataset, batch_size=32, shuffle=True)
- # 创建内存优化训练器
- trainer = MemoryOptimizedTrainer(model)
- # 使用不同的优化策略进行训练
- print("使用梯度检查点训练:")
- trainer.train_with_gradient_checkpointing(data_loader, optimizer, criterion, epochs=1)
- print("\n使用混合精度训练:")
- trainer.train_with_mixed_precision(data_loader, optimizer, criterion, epochs=1)
- print("\n使用梯度累积训练:")
- trainer.train_with_gradient_accumulation(data_loader, optimizer, criterion, accumulation_steps=4, epochs=1)
复制代码
内存泄漏预防
预防内存泄漏比解决内存泄漏更容易。以下是一些预防内存泄漏的最佳实践:
- import torch
- import gc
- import weakref
- from contextlib import contextmanager
- class MemoryLeakPrevention:
- @staticmethod
- @contextmanager
- def memory_context(device='cuda'):
- """上下文管理器,确保在退出时清理内存"""
- try:
- # 记录初始内存状态
- initial_memory = torch.cuda.memory_allocated(device=device)
- yield
- finally:
- # 清理内存
- gc.collect()
- torch.cuda.empty_cache(device=device)
- final_memory = torch.cuda.memory_allocated(device=device)
- print(f"内存变化: {(initial_memory - final_memory) / 1024**3:.2f} GB")
-
- @staticmethod
- def safe_model_loading(model_path, device='cuda'):
- """安全加载模型,避免内存泄漏"""
- with MemoryLeakPrevention.memory_context(device):
- # 首先在CPU上加载模型
- model = torch.load(model_path, map_location='cpu')
-
- # 然后移动到目标设备
- model = model.to(device)
-
- return model
-
- @staticmethod
- def safe_tensor_creation(size, device='cuda', dtype=torch.float32):
- """安全创建张量,避免内存泄漏"""
- with MemoryLeakPrevention.memory_context(device):
- return torch.randn(size, device=device, dtype=dtype)
-
- @staticmethod
- def safe_training_loop(model, data_loader, optimizer, criterion, epochs=1, device='cuda'):
- """安全的训练循环,避免内存泄漏"""
- model = model.to(device)
-
- for epoch in range(epochs):
- model.train()
- total_loss = 0
-
- for batch_idx, (data, target) in enumerate(data_loader):
- data, target = data.to(device), target.to(device)
-
- # 前向传播
- output = model(data)
- loss = criterion(output, target)
-
- # 反向传播
- optimizer.zero_grad()
- loss.backward()
- optimizer.step()
-
- total_loss += loss.item()
-
- # 定期清理内存
- if batch_idx % 100 == 0:
- with MemoryLeakPrevention.memory_context(device):
- pass # 上下文管理器会自动清理内存
-
- avg_loss = total_loss / len(data_loader)
- print(f'Epoch {epoch}, Average Loss: {avg_loss:.4f}')
-
- @staticmethod
- def safe_inference(model, data_loader, device='cuda'):
- """安全的推理过程,避免内存泄漏"""
- model = model.to(device)
- model.eval()
-
- results = []
-
- with torch.no_grad():
- for batch_idx, (data, _) in enumerate(data_loader):
- data = data.to(device)
-
- # 前向传播
- output = model(data)
-
- # 保存结果
- results.append(output.cpu())
-
- # 定期清理内存
- if batch_idx % 100 == 0:
- with MemoryLeakPrevention.memory_context(device):
- pass # 上下文管理器会自动清理内存
-
- return torch.cat(results)
- # 使用示例
- # 创建一个简单的模型
- model = torch.nn.Sequential(
- torch.nn.Linear(1000, 100),
- torch.nn.ReLU(),
- torch.nn.Linear(100, 10)
- )
- # 创建数据加载器
- from torch.utils.data import DataLoader, TensorDataset
- data = torch.randn(1000, 1000)
- target = torch.randint(0, 10, (1000,))
- dataset = TensorDataset(data, target)
- data_loader = DataLoader(dataset, batch_size=32, shuffle=True)
- # 创建优化器和损失函数
- optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
- criterion = torch.nn.CrossEntropyLoss()
- # 使用安全的训练循环
- print("使用安全的训练循环:")
- MemoryLeakPrevention.safe_training_loop(model, data_loader, optimizer, criterion, epochs=1)
- # 使用安全的推理过程
- print("\n使用安全的推理过程:")
- results = MemoryLeakPrevention.safe_inference(model, data_loader)
- print(f"推理结果形状: {results.shape}")
- # 安全创建张量
- print("\n安全创建张量:")
- tensor = MemoryLeakPrevention.safe_tensor_creation((1000, 1000))
- print(f"张量形状: {tensor.shape}")
复制代码
结论
在深度学习项目开发中,有效管理GPU内存资源是确保程序稳定运行的关键。本文全面介绍了Python中释放显存的实用技术,包括手动释放方法、自动内存管理策略和常见问题解决方案。
通过掌握这些技术,开发者可以:
1. 更有效地管理GPU内存资源,避免内存不足的问题
2. 防止显存泄漏,提高程序的稳定性
3. 在有限的硬件资源上训练更大的模型
4. 优化内存使用,提高程序运行效率
在实际应用中,建议开发者结合多种技术,根据具体项目需求选择最适合的内存管理策略。同时,定期监控内存使用情况,及时发现和解决内存问题,也是确保程序稳定运行的重要措施。
随着深度学习技术的不断发展,GPU内存管理技术也在不断进步。开发者应保持学习,关注最新的内存管理技术和最佳实践,以应对日益复杂的深度学习项目需求。 |
|