活动公告

系统通知
06-18 23:43
系统通知
06-14 00:00
系统通知
通知:本站资源由网友上传分享,如有违规等问题请到版务模块进行投诉,资源失效请在帖子内回复要求补档,会尽快处理!
10-23 09:31

Python释放显存实用技术在深度学习项目开发中如何有效管理GPU内存资源防止显存泄漏和程序崩溃的完整指南包括手动释放方法自动内存管理策略和常见问题解决方案

SunJu_FaceMall

3万

主题

3077

科技点

3万

积分

执行版主

碾压王

积分
32876

塔罗立华奏

执行版主 发表于 2025-9-29 15:30:00 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
引言

在深度学习项目开发中,GPU内存(显存)管理是一个至关重要的环节。随着模型规模的不断扩大和数据集的日益增长,GPU显存往往成为限制模型训练和推理的瓶颈。不当的内存管理不仅会导致程序运行效率低下,还可能引发显存泄漏,最终导致程序崩溃。本文将全面介绍Python中释放显存的实用技术,帮助开发者有效管理GPU内存资源,防止显存泄漏和程序崩溃。

GPU内存基础

GPU显存的工作原理

GPU显存(Video RAM,简称VRAM)是专门用于存储GPU计算所需数据的内存。与系统内存(RAM)不同,GPU显存具有更高的带宽和更低的延迟,专门为并行计算优化。在深度学习中,模型参数、中间激活值、梯度以及批处理数据都需要存储在GPU显存中。

Python与GPU内存的交互

Python通过深度学习框架(如PyTorch、TensorFlow)与GPU进行交互。这些框架提供了高级API,使开发者能够轻松地将计算任务和数据分配到GPU上。然而,这种抽象也意味着开发者对底层内存管理的控制有限,容易导致内存问题。
  1. # 检查GPU可用性
  2. import torch
  3. if torch.cuda.is_available():
  4.     print(f"可用GPU数量: {torch.cuda.device_count()}")
  5.     print(f"当前GPU: {torch.cuda.current_device()}")
  6.     print(f"GPU名称: {torch.cuda.get_device_name(torch.cuda.current_device())}")
  7.     print(f"GPU显存总量: {torch.cuda.get_device_properties(torch.cuda.current_device()).total_memory / 1024**3:.2f} GB")
复制代码

深度学习框架中的内存管理

PyTorch内存管理机制

PyTorch使用缓存分配器(Caching Allocator)来管理GPU内存。这种机制通过维护一个内存池来减少内存分配和释放的开销。当PyTorch需要分配内存时,它会首先检查内存池中是否有足够大的空闲块;如果没有,它会向操作系统申请新的内存。当释放内存时,PyTorch不会立即将内存返回给操作系统,而是将其保留在内存池中以备将来使用。
  1. import torch
  2. # 查看PyTorch内存分配统计
  3. def print_memory_stats():
  4.     allocated = torch.cuda.memory_allocated() / 1024**3  # GB
  5.     cached = torch.cuda.memory_reserved() / 1024**3  # GB
  6.     print(f"已分配显存: {allocated:.2f} GB")
  7.     print(f"缓存显存: {cached:.2f} GB")
  8. # 创建张量并查看内存使用
  9. a = torch.randn(10000, 10000, device='cuda')
  10. print_memory_stats()
  11. # 删除张量
  12. del a
  13. torch.cuda.empty_cache()  # 清空缓存
  14. print_memory_stats()
复制代码

TensorFlow内存管理机制

TensorFlow也使用类似的内存管理策略。在TensorFlow 2.x中,默认启用即时执行(Eager Execution),内存管理更加动态。TensorFlow会根据需要自动分配和释放内存,但也提供了手动控制内存分配的选项。
  1. import tensorflow as tf
  2. # 配置GPU内存增长选项,而不是一次性分配所有内存
  3. gpus = tf.config.experimental.list_physical_devices('GPU')
  4. if gpus:
  5.     try:
  6.         for gpu in gpus:
  7.             tf.config.experimental.set_memory_growth(gpu, True)
  8.     except RuntimeError as e:
  9.         print(e)
  10. # 查看内存使用情况
  11. def print_tf_memory_stats():
  12.     for gpu in gpus:
  13.         memory_info = tf.config.experimental.get_memory_info(gpu.name)
  14.         print(f"GPU {gpu.name}:")
  15.         print(f"  当前使用: {memory_info['current'] / 1024**3:.2f} GB")
  16.         print(f"  峰值使用: {memory_info['peak'] / 1024**3:.2f} GB")
  17. # 创建张量并查看内存使用
  18. a = tf.random.normal((10000, 10000))
  19. print_tf_memory_stats()
  20. # 删除张量
  21. del a
  22. print_tf_memory_stats()
复制代码

手动释放显存的方法

使用垃圾回收机制

Python的垃圾回收机制可以自动释放不再使用的对象,但在GPU内存管理中,仅依靠垃圾回收可能不够及时。我们可以手动触发垃圾回收来释放内存。
  1. import gc
  2. import torch
  3. # 创建一些大的张量
  4. a = torch.randn(10000, 10000, device='cuda')
  5. b = torch.randn(10000, 10000, device='cuda')
  6. print(f"分配前: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
  7. # 删除引用
  8. del a
  9. del b
  10. # 手动触发垃圾回收
  11. gc.collect()
  12. # 清空PyTorch的缓存分配器
  13. torch.cuda.empty_cache()
  14. print(f"释放后: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
复制代码

使用上下文管理器

上下文管理器可以确保资源在使用后被正确释放,即使在发生异常的情况下也是如此。我们可以创建自定义的上下文管理器来管理GPU内存。
  1. import torch
  2. from contextlib import contextmanager
  3. @contextmanager
  4. def gpu_memory_context(device='cuda'):
  5.     try:
  6.         # 记录初始内存状态
  7.         initial_memory = torch.cuda.memory_allocated(device=device)
  8.         yield
  9.     finally:
  10.         # 清理并释放内存
  11.         gc.collect()
  12.         torch.cuda.empty_cache()
  13.         final_memory = torch.cuda.memory_allocated(device=device)
  14.         print(f"内存变化: {(initial_memory - final_memory) / 1024**3:.2f} GB")
  15. # 使用上下文管理器
  16. with gpu_memory_context():
  17.     a = torch.randn(5000, 5000, device='cuda')
  18.     b = torch.randn(5000, 5000, device='cuda')
  19.     c = torch.matmul(a, b)
  20.     # 在这里进行计算
  21.     # 退出上下文时,内存会被自动释放
复制代码

显式释放模型和张量

在深度学习中,模型和张量是占用显存的主要对象。显式地删除不再需要的模型和张量,并释放其占用的显存,可以有效管理内存。
  1. import torch
  2. import torch.nn as nn
  3. # 定义一个简单的模型
  4. class SimpleModel(nn.Module):
  5.     def __init__(self):
  6.         super(SimpleModel, self).__init__()
  7.         self.linear = nn.Linear(1000, 1000)
  8.    
  9.     def forward(self, x):
  10.         return self.linear(x)
  11. # 创建模型并移至GPU
  12. model = SimpleModel().cuda()
  13. input = torch.randn(100, 1000).cuda()
  14. # 查看内存使用
  15. print(f"模型创建后: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
  16. # 使用模型进行前向传播
  17. output = model(input)
  18. loss = output.sum()
  19. loss.backward()
  20. # 查看内存使用
  21. print(f"反向传播后: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
  22. # 释放模型和梯度
  23. del model, input, output, loss
  24. torch.cuda.empty_cache()
  25. # 查看内存使用
  26. print(f"释放后: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
复制代码

使用内存映射文件处理大数据

对于非常大的数据集,可以考虑使用内存映射文件(Memory-mapped files)技术,这样可以避免一次性将所有数据加载到GPU显存中。
  1. import torch
  2. import numpy as np
  3. # 创建一个大的numpy数组并保存到磁盘
  4. large_array = np.random.rand(10000, 10000)
  5. np.save('large_array.npy', large_array)
  6. # 使用内存映射方式加载数组
  7. memmap_array = np.load('large_array.npy', mmap_mode='r')
  8. # 创建PyTorch张量,但不立即复制到GPU
  9. tensor = torch.from_numpy(memmap_array)
  10. # 分批处理数据
  11. batch_size = 1000
  12. for i in range(0, len(tensor), batch_size):
  13.     batch = tensor[i:i+batch_size].cuda()  # 只将当前批次加载到GPU
  14.     # 在这里处理批次数据
  15.     result = torch.mean(batch)
  16.     print(f"批次 {i//batch_size}: {result.item()}")
  17.    
  18.     # 释放当前批次的GPU内存
  19.     del batch
  20.     torch.cuda.empty_cache()
复制代码

自动内存管理策略

PyTorch的自动内存管理

PyTorch提供了几种自动内存管理的策略,包括内存池、梯度检查点和自动混合精度等。

PyTorch的缓存分配器会自动管理内存池,但我们可以通过调整环境变量来优化其行为。
  1. import os
  2. import torch
  3. # 设置PyTorch内存分配器的环境变量
  4. # PYTORCH_CUDA_ALLOC_CONF可以控制内存分配器的行为
  5. # 例如,可以设置最大内存池大小
  6. os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:128'
  7. # 现在PyTorch会使用更小的内存块,减少内存碎片
  8. a = torch.randn(5000, 5000, device='cuda')
  9. b = torch.randn(5000, 5000, device='cuda')
  10. c = torch.matmul(a, b)
  11. # 查看内存使用情况
  12. print(f"内存使用: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
复制代码

梯度检查点是一种用计算换内存的技术,它在前向传播时不保存所有中间激活值,而是在反向传播时重新计算它们。
  1. import torch
  2. import torch.nn as nn
  3. from torch.utils.checkpoint import checkpoint
  4. # 定义一个深度模型
  5. class DeepModel(nn.Module):
  6.     def __init__(self):
  7.         super(DeepModel, self).__init__()
  8.         self.layers = nn.ModuleList([nn.Linear(1000, 1000) for _ in range(10)])
  9.    
  10.     def forward(self, x):
  11.         for layer in self.layers:
  12.             # 使用checkpoint包装每一层
  13.             x = checkpoint(layer, x)
  14.         return x
  15. # 创建模型
  16. model = DeepModel().cuda()
  17. input = torch.randn(100, 1000).cuda()
  18. # 查看内存使用
  19. print(f"使用梯度检查点前: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
  20. # 前向传播
  21. output = model(input)
  22. loss = output.sum()
  23. loss.backward()
  24. # 查看内存使用
  25. print(f"使用梯度检查点后: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
复制代码

自动混合精度可以减少内存使用,同时保持模型性能。它使用半精度浮点数(FP16)进行计算,同时在必要时使用全精度浮点数(FP32)保持数值稳定性。
  1. import torch
  2. import torch.nn as nn
  3. from torch.cuda.amp import autocast, GradScaler
  4. # 定义模型
  5. model = nn.Sequential(
  6.     nn.Linear(1000, 1000),
  7.     nn.ReLU(),
  8.     nn.Linear(1000, 1000),
  9.     nn.ReLU(),
  10.     nn.Linear(1000, 10)
  11. ).cuda()
  12. # 创建GradScaler用于梯度缩放
  13. scaler = GradScaler()
  14. # 生成输入数据
  15. input = torch.randn(100, 1000).cuda()
  16. target = torch.randint(0, 10, (100,)).cuda()
  17. # 查看内存使用
  18. print(f"AMP前: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
  19. # 使用自动混合精度进行训练
  20. optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
  21. with autocast():
  22.     output = model(input)
  23.     loss = nn.CrossEntropyLoss()(output, target)
  24. # 缩放损失并反向传播
  25. scaler.scale(loss).backward()
  26. # 缩放梯度并更新参数
  27. scaler.step(optimizer)
  28. scaler.update()
  29. # 查看内存使用
  30. print(f"AMP后: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
复制代码

TensorFlow的自动内存管理

TensorFlow也提供了多种自动内存管理策略,包括内存增长选项、虚拟设备和内存优化器等。

默认情况下,TensorFlow会一次性分配GPU上几乎所有可用的内存。我们可以启用内存增长选项,使TensorFlow只在需要时分配内存。
  1. import tensorflow as tf
  2. # 获取所有GPU设备
  3. gpus = tf.config.experimental.list_physical_devices('GPU')
  4. if gpus:
  5.     try:
  6.         # 启用内存增长
  7.         for gpu in gpus:
  8.             tf.config.experimental.set_memory_growth(gpu, True)
  9.         print("内存增长已启用")
  10.     except RuntimeError as e:
  11.         print(f"内存增长设置失败: {e}")
  12. # 查看内存使用情况
  13. for gpu in gpus:
  14.     memory_info = tf.config.experimental.get_memory_info(gpu.name)
  15.     print(f"GPU {gpu.name}:")
  16.     print(f"  当前使用: {memory_info['current'] / 1024**3:.2f} GB")
  17.     print(f"  峰值使用: {memory_info['peak'] / 1024**3:.2f} GB")
复制代码

我们可以配置虚拟设备,限制TensorFlow可以使用的GPU内存量。
  1. import tensorflow as tf
  2. # 获取所有GPU设备
  3. gpus = tf.config.experimental.list_physical_devices('GPU')
  4. if gpus:
  5.     try:
  6.         # 限制第一个GPU只能使用4GB内存
  7.         tf.config.experimental.set_virtual_device_configuration(
  8.             gpus[0],
  9.             [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=4096)]
  10.         )
  11.         print("虚拟设备配置已设置")
  12.     except RuntimeError as e:
  13.         print(f"虚拟设备配置失败: {e}")
  14. # 查看内存使用情况
  15. for gpu in gpus:
  16.     memory_info = tf.config.experimental.get_memory_info(gpu.name)
  17.     print(f"GPU {gpu.name}:")
  18.     print(f"  当前使用: {memory_info['current'] / 1024**3:.2f} GB")
  19.     print(f"  峰值使用: {memory_info['peak'] / 1024**3:.2f} GB")
复制代码

TensorFlow提供了多种内存优化技术,包括模型优化、梯度累积和混合精度训练等。
  1. import tensorflow as tf
  2. # 启用混合精度训练
  3. policy = tf.keras.mixed_precision.Policy('mixed_float16')
  4. tf.keras.mixed_precision.set_global_policy(policy)
  5. # 定义模型
  6. model = tf.keras.Sequential([
  7.     tf.keras.layers.Dense(1000, activation='relu'),
  8.     tf.keras.layers.Dense(1000, activation='relu'),
  9.     tf.keras.layers.Dense(10)
  10. ])
  11. # 编译模型
  12. model.compile(
  13.     optimizer='adam',
  14.     loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
  15.     metrics=['accuracy']
  16. )
  17. # 生成数据
  18. x_train = tf.random.normal((1000, 100))
  19. y_train = tf.random.uniform((1000,), maxval=10, dtype=tf.int32)
  20. # 查看内存使用
  21. gpus = tf.config.experimental.list_physical_devices('GPU')
  22. if gpus:
  23.     memory_info = tf.config.experimental.get_memory_info(gpus[0].name)
  24.     print(f"训练前内存使用: {memory_info['current'] / 1024**3:.2f} GB")
  25. # 训练模型
  26. model.fit(x_train, y_train, epochs=2)
  27. # 查看内存使用
  28. if gpus:
  29.     memory_info = tf.config.experimental.get_memory_info(gpus[0].name)
  30.     print(f"训练后内存使用: {memory_info['current'] / 1024**3:.2f} GB")
复制代码

常见问题及解决方案

显存泄漏的识别与解决

显存泄漏是指程序在运行过程中持续占用越来越多的显存,而不释放不再使用的内存。这最终会导致程序因显存不足而崩溃。
  1. import torch
  2. import time
  3. import matplotlib.pyplot as plt
  4. def check_memory_leak():
  5.     memory_usage = []
  6.     timestamps = []
  7.    
  8.     # 记录初始内存
  9.     initial_memory = torch.cuda.memory_allocated()
  10.     print(f"初始内存使用: {initial_memory / 1024**3:.2f} GB")
  11.    
  12.     # 模拟可能泄漏的操作
  13.     tensors = []
  14.     for i in range(10):
  15.         # 创建一些张量并添加到列表中
  16.         a = torch.randn(1000, 1000, device='cuda')
  17.         b = torch.randn(1000, 1000, device='cuda')
  18.         c = torch.matmul(a, b)
  19.         tensors.append(c)
  20.         
  21.         # 记录内存使用
  22.         current_memory = torch.cuda.memory_allocated()
  23.         memory_usage.append(current_memory / 1024**3)
  24.         timestamps.append(i)
  25.         
  26.         print(f"步骤 {i}: {current_memory / 1024**3:.2f} GB")
  27.         time.sleep(0.5)
  28.    
  29.     # 绘制内存使用图
  30.     plt.figure(figsize=(10, 6))
  31.     plt.plot(timestamps, memory_usage, 'b-')
  32.     plt.xlabel('步骤')
  33.     plt.ylabel('内存使用 (GB)')
  34.     plt.title('内存使用趋势')
  35.     plt.grid(True)
  36.     plt.show()
  37.    
  38.     # 释放张量
  39.     del tensors
  40.     torch.cuda.empty_cache()
  41.    
  42.     final_memory = torch.cuda.memory_allocated()
  43.     print(f"最终内存使用: {final_memory / 1024**3:.2f} GB")
  44. # 运行检查
  45. check_memory_leak()
复制代码
  1. import torch
  2. import gc
  3. import weakref
  4. # 解决方案1: 确保删除不再需要的对象
  5. def memory_leak_solution_1():
  6.     tensors = []
  7.     for i in range(10):
  8.         a = torch.randn(1000, 1000, device='cuda')
  9.         b = torch.randn(1000, 1000, device='cuda')
  10.         c = torch.matmul(a, b)
  11.         tensors.append(c)
  12.         
  13.         # 定期清理不再需要的张量
  14.         if i % 3 == 0:
  15.             # 删除前几个张量
  16.             for j in range(min(3, len(tensors))):
  17.                 del tensors[j]
  18.             tensors = tensors[3:]
  19.             
  20.             # 手动触发垃圾回收和清空缓存
  21.             gc.collect()
  22.             torch.cuda.empty_cache()
  23.             
  24.             print(f"步骤 {i}: 清理后内存使用 {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
  25. # 解决方案2: 使用弱引用避免循环引用
  26. def memory_leak_solution_2():
  27.     class TensorHolder:
  28.         def __init__(self, tensor):
  29.             self.tensor = tensor
  30.    
  31.     holders = []
  32.     for i in range(10):
  33.         a = torch.randn(1000, 1000, device='cuda')
  34.         b = torch.randn(1000, 1000, device='cuda')
  35.         c = torch.matmul(a, b)
  36.         
  37.         # 使用弱引用
  38.         holder = TensorHolder(c)
  39.         weak_holder = weakref.ref(holder)
  40.         holders.append(weak_holder)
  41.         
  42.         print(f"步骤 {i}: 内存使用 {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
  43.    
  44.     # 清理
  45.     del holders
  46.     gc.collect()
  47.     torch.cuda.empty_cache()
  48.     print(f"清理后内存使用 {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
  49. # 运行解决方案
  50. print("解决方案1:")
  51. memory_leak_solution_1()
  52. print("\n解决方案2:")
  53. memory_leak_solution_2()
复制代码

CUDA out of memory错误处理

“CUDA out of memory”是深度学习开发中常见的错误,通常发生在尝试分配的内存超过可用显存时。
  1. import torch
  2. import torch.nn as nn
  3. class DynamicBatchTrainer:
  4.     def __init__(self, model, initial_batch_size=32, min_batch_size=1):
  5.         self.model = model
  6.         self.current_batch_size = initial_batch_size
  7.         self.min_batch_size = min_batch_size
  8.         
  9.     def train_step(self, data, target, optimizer, criterion):
  10.         # 尝试使用当前批处理大小
  11.         while self.current_batch_size >= self.min_batch_size:
  12.             try:
  13.                 # 分割数据以适应当前批处理大小
  14.                 for i in range(0, len(data), self.current_batch_size):
  15.                     batch_data = data[i:i+self.current_batch_size]
  16.                     batch_target = target[i:i+self.current_batch_size]
  17.                     
  18.                     # 前向传播
  19.                     output = self.model(batch_data)
  20.                     loss = criterion(output, batch_target)
  21.                     
  22.                     # 反向传播
  23.                     optimizer.zero_grad()
  24.                     loss.backward()
  25.                     optimizer.step()
  26.                
  27.                 # 如果成功,增加批处理大小以获得更好的性能
  28.                 self.current_batch_size = min(self.current_batch_size * 2, len(data))
  29.                 return loss.item()
  30.                
  31.             except RuntimeError as e:
  32.                 if 'out of memory' in str(e):
  33.                     # 如果内存不足,减少批处理大小并重试
  34.                     torch.cuda.empty_cache()
  35.                     self.current_batch_size = max(self.current_batch_size // 2, self.min_batch_size)
  36.                     print(f"内存不足,减少批处理大小至 {self.current_batch_size}")
  37.                 else:
  38.                     # 如果是其他错误,重新抛出
  39.                     raise e
  40.         
  41.         # 如果达到最小批处理大小仍然失败,抛出错误
  42.         raise RuntimeError(f"即使使用最小批处理大小 {self.min_batch_size} 也无法训练")
  43. # 使用示例
  44. model = nn.Sequential(
  45.     nn.Linear(1000, 1000),
  46.     nn.ReLU(),
  47.     nn.Linear(1000, 10)
  48. ).cuda()
  49. optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
  50. criterion = nn.CrossEntropyLoss()
  51. # 生成大数据集
  52. data = torch.randn(1000, 1000).cuda()
  53. target = torch.randint(0, 10, (1000,)).cuda()
  54. # 创建动态批处理训练器
  55. trainer = DynamicBatchTrainer(model, initial_batch_size=256, min_batch_size=1)
  56. # 训练
  57. for epoch in range(3):
  58.     loss = trainer.train_step(data, target, optimizer, criterion)
  59.     print(f"Epoch {epoch}, Loss: {loss:.4f}, Batch Size: {trainer.current_batch_size}")
复制代码

梯度累积是一种技术,允许我们使用较小的批处理大小模拟较大的批处理大小,从而减少内存使用。
  1. import torch
  2. import torch.nn as nn
  3. class GradientAccumulator:
  4.     def __init__(self, model, accumulation_steps=4):
  5.         self.model = model
  6.         self.accumulation_steps = accumulation_steps
  7.         self.current_step = 0
  8.         
  9.     def train_step(self, data, target, optimizer, criterion):
  10.         # 前向传播
  11.         output = self.model(data)
  12.         loss = criterion(output, target)
  13.         
  14.         # 归一化损失以考虑累积
  15.         loss = loss / self.accumulation_steps
  16.         
  17.         # 反向传播
  18.         loss.backward()
  19.         
  20.         # 增加步骤计数
  21.         self.current_step += 1
  22.         
  23.         # 如果达到累积步骤,更新参数
  24.         if self.current_step % self.accumulation_steps == 0:
  25.             optimizer.step()
  26.             optimizer.zero_grad()
  27.             return True, loss.item() * self.accumulation_steps
  28.         
  29.         return False, loss.item() * self.accumulation_steps
  30. # 使用示例
  31. model = nn.Sequential(
  32.     nn.Linear(1000, 1000),
  33.     nn.ReLU(),
  34.     nn.Linear(1000, 10)
  35. ).cuda()
  36. optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
  37. criterion = nn.CrossEntropyLoss()
  38. # 生成大数据集
  39. data = torch.randn(1000, 1000).cuda()
  40. target = torch.randint(0, 10, (1000,)).cuda()
  41. # 创建梯度累积器
  42. accumulator = GradientAccumulator(model, accumulation_steps=4)
  43. # 训练
  44. batch_size = 32
  45. for epoch in range(3):
  46.     total_loss = 0
  47.     update_count = 0
  48.    
  49.     for i in range(0, len(data), batch_size):
  50.         batch_data = data[i:i+batch_size]
  51.         batch_target = target[i:i+batch_size]
  52.         
  53.         updated, loss = accumulator.train_step(batch_data, batch_target, optimizer, criterion)
  54.         total_loss += loss
  55.         
  56.         if updated:
  57.             update_count += 1
  58.    
  59.     avg_loss = total_loss / (len(data) // batch_size)
  60.     print(f"Epoch {epoch}, Loss: {avg_loss:.4f}, Updates: {update_count}")
复制代码

多GPU环境下的内存管理

在多GPU环境中,内存管理变得更加复杂,需要考虑数据并行、模型并行和负载均衡等问题。
  1. import torch
  2. import torch.nn as nn
  3. from torch.nn.parallel import DistributedDataParallel as DDP
  4. import torch.distributed as dist
  5. import torch.multiprocessing as mp
  6. import os
  7. def setup(rank, world_size):
  8.     os.environ['MASTER_ADDR'] = 'localhost'
  9.     os.environ['MASTER_PORT'] = '12355'
  10.    
  11.     # 初始化进程组
  12.     dist.init_process_group(
  13.         backend='nccl',
  14.         rank=rank,
  15.         world_size=world_size
  16.     )
  17. def cleanup():
  18.     dist.destroy_process_group()
  19. class ToyModel(nn.Module):
  20.     def __init__(self):
  21.         super(ToyModel, self).__init__()
  22.         self.net1 = nn.Linear(1000, 1000)
  23.         self.relu = nn.ReLU()
  24.         self.net2 = nn.Linear(1000, 10)
  25.         
  26.     def forward(self, x):
  27.         return self.net2(self.relu(self.net1(x)))
  28. def train(rank, world_size):
  29.     print(f"在GPU {rank} 上运行 DDP 示例")
  30.     setup(rank, world_size)
  31.    
  32.     # 创建模型并移至当前GPU
  33.     model = ToyModel().to(rank)
  34.    
  35.     # 使用DDP包装模型
  36.     ddp_model = DDP(model, device_ids=[rank])
  37.    
  38.     # 创建优化器
  39.     optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.001)
  40.    
  41.     # 生成数据
  42.     inputs = torch.randn(2000, 1000).to(rank)
  43.     labels = torch.randn(2000, 10).to(rank)
  44.    
  45.     # 训练循环
  46.     for epoch in range(2):
  47.         # 在每个GPU上处理数据的一部分
  48.         inputs_shard = inputs[rank::world_size]
  49.         labels_shard = labels[rank::world_size]
  50.         
  51.         # 前向传播
  52.         outputs = ddp_model(inputs_shard)
  53.         loss = torch.nn.functional.mse_loss(outputs, labels_shard)
  54.         
  55.         # 反向传播
  56.         optimizer.zero_grad()
  57.         loss.backward()
  58.         optimizer.step()
  59.         
  60.         # 打印内存使用情况
  61.         allocated = torch.cuda.memory_allocated(rank) / 1024**3
  62.         print(f"GPU {rank}, Epoch {epoch}, Loss: {loss.item()}, Memory: {allocated:.2f} GB")
  63.    
  64.     # 清理
  65.     cleanup()
  66. def main():
  67.     world_size = torch.cuda.device_count()
  68.     print(f"发现 {world_size} 个GPU")
  69.     if world_size > 1:
  70.         mp.spawn(train, args=(world_size,), nprocs=world_size, join=True)
  71.     else:
  72.         print("需要多个GPU才能运行此示例")
  73. if __name__ == '__main__':
  74.     main()
复制代码
  1. import torch
  2. import torch.nn as nn
  3. from torch.nn.parallel import DistributedDataParallel as DDP
  4. import torch.distributed as dist
  5. import torch.multiprocessing as mp
  6. import os
  7. def setup(rank, world_size):
  8.     os.environ['MASTER_ADDR'] = 'localhost'
  9.     os.environ['MASTER_PORT'] = '12355'
  10.    
  11.     # 初始化进程组
  12.     dist.init_process_group(
  13.         backend='nccl',
  14.         rank=rank,
  15.         world_size=world_size
  16.     )
  17. def cleanup():
  18.     dist.destroy_process_group()
  19. class ParallelModel(nn.Module):
  20.     def __init__(self, rank, world_size):
  21.         super(ParallelModel, self).__init__()
  22.         self.rank = rank
  23.         self.world_size = world_size
  24.         
  25.         # 将模型的不同层分配到不同的GPU
  26.         if rank == 0:
  27.             # 第一个GPU处理前半部分
  28.             self.part1 = nn.Sequential(
  29.                 nn.Linear(1000, 2000),
  30.                 nn.ReLU(),
  31.                 nn.Linear(2000, 2000),
  32.                 nn.ReLU()
  33.             )
  34.         else:
  35.             # 第二个GPU处理后半部分
  36.             self.part2 = nn.Sequential(
  37.                 nn.Linear(2000, 2000),
  38.                 nn.ReLU(),
  39.                 nn.Linear(2000, 10)
  40.             )
  41.    
  42.     def forward(self, x):
  43.         # 第一个GPU处理前向传播的第一部分
  44.         if self.rank == 0:
  45.             x = self.part1(x)
  46.             # 发送到下一个GPU
  47.             dist.send(x, dst=1)
  48.             # 接收最终结果
  49.             dist.recv(x, src=1)
  50.             return x
  51.         else:
  52.             # 第二个GPU接收中间结果
  53.             dist.recv(x, src=0)
  54.             # 处理后半部分
  55.             x = self.part2(x)
  56.             # 发送回第一个GPU
  57.             dist.send(x, dst=0)
  58.             return x
  59. def train(rank, world_size):
  60.     print(f"在GPU {rank} 上运行模型并行示例")
  61.     setup(rank, world_size)
  62.    
  63.     # 创建模型
  64.     model = ParallelModel(rank, world_size).to(rank)
  65.    
  66.     # 只在主进程创建优化器
  67.     if rank == 0:
  68.         optimizer = torch.optim.SGD(model.parameters(), lr=0.001)
  69.    
  70.     # 生成数据
  71.     if rank == 0:
  72.         inputs = torch.randn(64, 1000).to(rank)
  73.         labels = torch.randn(64, 10).to(rank)
  74.    
  75.     # 训练循环
  76.     for epoch in range(2):
  77.         if rank == 0:
  78.             # 前向传播
  79.             outputs = model(inputs)
  80.             loss = torch.nn.functional.mse_loss(outputs, labels)
  81.             
  82.             # 反向传播
  83.             optimizer.zero_grad()
  84.             loss.backward()
  85.             optimizer.step()
  86.             
  87.             # 打印内存使用情况
  88.             allocated = torch.cuda.memory_allocated(rank) / 1024**3
  89.             print(f"GPU {rank}, Epoch {epoch}, Loss: {loss.item()}, Memory: {allocated:.2f} GB")
  90.         else:
  91.             # 其他GPU只参与前向和反向传播
  92.             outputs = model(inputs)
  93.             # 反向传播会自动处理
  94.             outputs.sum().backward()
  95.             
  96.             # 打印内存使用情况
  97.             allocated = torch.cuda.memory_allocated(rank) / 1024**3
  98.             print(f"GPU {rank}, Epoch {epoch}, Memory: {allocated:.2f} GB")
  99.    
  100.     # 清理
  101.     cleanup()
  102. def main():
  103.     world_size = 2  # 使用2个GPU
  104.     if torch.cuda.device_count() >= world_size:
  105.         mp.spawn(train, args=(world_size,), nprocs=world_size, join=True)
  106.     else:
  107.         print(f"需要至少 {world_size} 个GPU才能运行此示例")
  108. if __name__ == '__main__':
  109.     main()
复制代码

最佳实践

监控GPU内存使用

有效管理GPU内存的第一步是了解内存的使用情况。以下是一些监控GPU内存使用的方法:
  1. import torch
  2. import time
  3. import psutil
  4. import GPUtil
  5. from datetime import datetime
  6. class GPUMemoryMonitor:
  7.     def __init__(self, interval=1.0):
  8.         self.interval = interval
  9.         self.running = False
  10.         self.log_data = []
  11.         
  12.     def start(self):
  13.         self.running = True
  14.         self.log_data = []
  15.         print("开始监控GPU内存使用...")
  16.         
  17.         while self.running:
  18.             timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  19.             
  20.             # 获取GPU信息
  21.             gpus = GPUtil.getGPUs()
  22.             for i, gpu in enumerate(gpus):
  23.                 memory_used = gpu.memoryUsed
  24.                 memory_total = gpu.memoryTotal
  25.                 memory_percent = gpu.memoryUtil * 100
  26.                 gpu_load = gpu.load * 100
  27.                
  28.                 log_entry = {
  29.                     'timestamp': timestamp,
  30.                     'gpu_id': i,
  31.                     'memory_used': memory_used,
  32.                     'memory_total': memory_total,
  33.                     'memory_percent': memory_percent,
  34.                     'gpu_load': gpu_load
  35.                 }
  36.                
  37.                 self.log_data.append(log_entry)
  38.                
  39.                 print(f"[{timestamp}] GPU {i}: 内存 {memory_used}/{memory_total} MB ({memory_percent:.1f}%), 负载 {gpu_load:.1f}%")
  40.             
  41.             time.sleep(self.interval)
  42.    
  43.     def stop(self):
  44.         self.running = False
  45.         print("停止监控GPU内存使用")
  46.    
  47.     def get_memory_usage_plot(self):
  48.         import matplotlib.pyplot as plt
  49.         import pandas as pd
  50.         
  51.         if not self.log_data:
  52.             print("没有可用的监控数据")
  53.             return
  54.         
  55.         # 转换为DataFrame
  56.         df = pd.DataFrame(self.log_data)
  57.         
  58.         # 绘制内存使用图
  59.         plt.figure(figsize=(12, 6))
  60.         
  61.         for gpu_id in df['gpu_id'].unique():
  62.             gpu_data = df[df['gpu_id'] == gpu_id]
  63.             plt.plot(gpu_data['timestamp'], gpu_data['memory_percent'], label=f'GPU {gpu_id}')
  64.         
  65.         plt.xlabel('时间')
  66.         plt.ylabel('内存使用率 (%)')
  67.         plt.title('GPU内存使用率随时间变化')
  68.         plt.legend()
  69.         plt.grid(True)
  70.         plt.xticks(rotation=45)
  71.         plt.tight_layout()
  72.         plt.show()
  73. # 使用示例
  74. monitor = GPUMemoryMonitor(interval=0.5)
  75. # 在单独的线程中启动监控
  76. import threading
  77. monitor_thread = threading.Thread(target=monitor.start)
  78. monitor_thread.start()
  79. # 模拟一些GPU操作
  80. try:
  81.     a = torch.randn(5000, 5000, device='cuda')
  82.     time.sleep(2)
  83.     b = torch.randn(5000, 5000, device='cuda')
  84.     time.sleep(2)
  85.     c = torch.matmul(a, b)
  86.     time.sleep(2)
  87.     del a, b, c
  88.     torch.cuda.empty_cache()
  89.     time.sleep(2)
  90. finally:
  91.     # 停止监控
  92.     monitor.stop()
  93.     monitor_thread.join()
  94.    
  95.     # 显示内存使用图
  96.     monitor.get_memory_usage_plot()
复制代码

内存优化策略

以下是一些有效的内存优化策略:
  1. import torch
  2. import torch.nn as nn
  3. from torch.utils.checkpoint import checkpoint
  4. from torch.cuda.amp import autocast, GradScaler
  5. class MemoryOptimizedTrainer:
  6.     def __init__(self, model, device='cuda'):
  7.         self.model = model.to(device)
  8.         self.device = device
  9.         self.scaler = GradScaler()
  10.         
  11.     def train_with_gradient_checkpointing(self, data_loader, optimizer, criterion, epochs=1):
  12.         """使用梯度检查点进行训练,减少内存使用"""
  13.         self.model.train()
  14.         
  15.         for epoch in range(epochs):
  16.             total_loss = 0
  17.             for batch_idx, (data, target) in enumerate(data_loader):
  18.                 data, target = data.to(self.device), target.to(self.device)
  19.                
  20.                 def custom_forward(*inputs):
  21.                     """自定义前向函数,用于梯度检查点"""
  22.                     return self.model(inputs[0])
  23.                
  24.                 # 使用梯度检查点
  25.                 output = checkpoint(custom_forward, data)
  26.                 loss = criterion(output, target)
  27.                
  28.                 # 反向传播
  29.                 optimizer.zero_grad()
  30.                 loss.backward()
  31.                 optimizer.step()
  32.                
  33.                 total_loss += loss.item()
  34.                
  35.                 # 打印内存使用情况
  36.                 if batch_idx % 10 == 0:
  37.                     memory_used = torch.cuda.memory_allocated(self.device) / 1024**3
  38.                     print(f'Epoch: {epoch}, Batch: {batch_idx}, Loss: {loss.item():.4f}, Memory: {memory_used:.2f} GB')
  39.             
  40.             avg_loss = total_loss / len(data_loader)
  41.             print(f'Epoch {epoch}, Average Loss: {avg_loss:.4f}')
  42.    
  43.     def train_with_mixed_precision(self, data_loader, optimizer, criterion, epochs=1):
  44.         """使用混合精度进行训练,减少内存使用并提高速度"""
  45.         self.model.train()
  46.         
  47.         for epoch in range(epochs):
  48.             total_loss = 0
  49.             for batch_idx, (data, target) in enumerate(data_loader):
  50.                 data, target = data.to(self.device), target.to(self.device)
  51.                
  52.                 # 使用自动混合精度
  53.                 with autocast():
  54.                     output = self.model(data)
  55.                     loss = criterion(output, target)
  56.                
  57.                 # 缩放损失并反向传播
  58.                 optimizer.zero_grad()
  59.                 self.scaler.scale(loss).backward()
  60.                
  61.                 # 缩放梯度并更新参数
  62.                 self.scaler.step(optimizer)
  63.                 self.scaler.update()
  64.                
  65.                 total_loss += loss.item()
  66.                
  67.                 # 打印内存使用情况
  68.                 if batch_idx % 10 == 0:
  69.                     memory_used = torch.cuda.memory_allocated(self.device) / 1024**3
  70.                     print(f'Epoch: {epoch}, Batch: {batch_idx}, Loss: {loss.item():.4f}, Memory: {memory_used:.2f} GB')
  71.             
  72.             avg_loss = total_loss / len(data_loader)
  73.             print(f'Epoch {epoch}, Average Loss: {avg_loss:.4f}')
  74.    
  75.     def train_with_gradient_accumulation(self, data_loader, optimizer, criterion, accumulation_steps=4, epochs=1):
  76.         """使用梯度累积进行训练,模拟更大的批处理大小"""
  77.         self.model.train()
  78.         
  79.         for epoch in range(epochs):
  80.             total_loss = 0
  81.             optimizer.zero_grad()
  82.             
  83.             for batch_idx, (data, target) in enumerate(data_loader):
  84.                 data, target = data.to(self.device), target.to(self.device)
  85.                
  86.                 # 前向传播
  87.                 with autocast():
  88.                     output = self.model(data)
  89.                     loss = criterion(output, target)
  90.                
  91.                 # 归一化损失
  92.                 loss = loss / accumulation_steps
  93.                
  94.                 # 反向传播
  95.                 self.scaler.scale(loss).backward()
  96.                
  97.                 total_loss += loss.item() * accumulation_steps
  98.                
  99.                 # 如果达到累积步骤,更新参数
  100.                 if (batch_idx + 1) % accumulation_steps == 0:
  101.                     # 缩放梯度并更新参数
  102.                     self.scaler.step(optimizer)
  103.                     self.scaler.update()
  104.                     optimizer.zero_grad()
  105.                
  106.                 # 打印内存使用情况
  107.                 if batch_idx % 10 == 0:
  108.                     memory_used = torch.cuda.memory_allocated(self.device) / 1024**3
  109.                     print(f'Epoch: {epoch}, Batch: {batch_idx}, Loss: {loss.item() * accumulation_steps:.4f}, Memory: {memory_used:.2f} GB')
  110.             
  111.             # 处理剩余的批次
  112.             if len(data_loader) % accumulation_steps != 0:
  113.                 self.scaler.step(optimizer)
  114.                 self.scaler.update()
  115.                 optimizer.zero_grad()
  116.             
  117.             avg_loss = total_loss / len(data_loader)
  118.             print(f'Epoch {epoch}, Average Loss: {avg_loss:.4f}')
  119. # 使用示例
  120. # 创建一个简单的模型
  121. model = nn.Sequential(
  122.     nn.Linear(1000, 2000),
  123.     nn.ReLU(),
  124.     nn.Linear(2000, 2000),
  125.     nn.ReLU(),
  126.     nn.Linear(2000, 10)
  127. )
  128. # 创建优化器和损失函数
  129. optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
  130. criterion = nn.CrossEntropyLoss()
  131. # 创建数据加载器
  132. from torch.utils.data import DataLoader, TensorDataset
  133. data = torch.randn(1000, 1000)
  134. target = torch.randint(0, 10, (1000,))
  135. dataset = TensorDataset(data, target)
  136. data_loader = DataLoader(dataset, batch_size=32, shuffle=True)
  137. # 创建内存优化训练器
  138. trainer = MemoryOptimizedTrainer(model)
  139. # 使用不同的优化策略进行训练
  140. print("使用梯度检查点训练:")
  141. trainer.train_with_gradient_checkpointing(data_loader, optimizer, criterion, epochs=1)
  142. print("\n使用混合精度训练:")
  143. trainer.train_with_mixed_precision(data_loader, optimizer, criterion, epochs=1)
  144. print("\n使用梯度累积训练:")
  145. trainer.train_with_gradient_accumulation(data_loader, optimizer, criterion, accumulation_steps=4, epochs=1)
复制代码

内存泄漏预防

预防内存泄漏比解决内存泄漏更容易。以下是一些预防内存泄漏的最佳实践:
  1. import torch
  2. import gc
  3. import weakref
  4. from contextlib import contextmanager
  5. class MemoryLeakPrevention:
  6.     @staticmethod
  7.     @contextmanager
  8.     def memory_context(device='cuda'):
  9.         """上下文管理器,确保在退出时清理内存"""
  10.         try:
  11.             # 记录初始内存状态
  12.             initial_memory = torch.cuda.memory_allocated(device=device)
  13.             yield
  14.         finally:
  15.             # 清理内存
  16.             gc.collect()
  17.             torch.cuda.empty_cache(device=device)
  18.             final_memory = torch.cuda.memory_allocated(device=device)
  19.             print(f"内存变化: {(initial_memory - final_memory) / 1024**3:.2f} GB")
  20.    
  21.     @staticmethod
  22.     def safe_model_loading(model_path, device='cuda'):
  23.         """安全加载模型,避免内存泄漏"""
  24.         with MemoryLeakPrevention.memory_context(device):
  25.             # 首先在CPU上加载模型
  26.             model = torch.load(model_path, map_location='cpu')
  27.             
  28.             # 然后移动到目标设备
  29.             model = model.to(device)
  30.             
  31.             return model
  32.    
  33.     @staticmethod
  34.     def safe_tensor_creation(size, device='cuda', dtype=torch.float32):
  35.         """安全创建张量,避免内存泄漏"""
  36.         with MemoryLeakPrevention.memory_context(device):
  37.             return torch.randn(size, device=device, dtype=dtype)
  38.    
  39.     @staticmethod
  40.     def safe_training_loop(model, data_loader, optimizer, criterion, epochs=1, device='cuda'):
  41.         """安全的训练循环,避免内存泄漏"""
  42.         model = model.to(device)
  43.         
  44.         for epoch in range(epochs):
  45.             model.train()
  46.             total_loss = 0
  47.             
  48.             for batch_idx, (data, target) in enumerate(data_loader):
  49.                 data, target = data.to(device), target.to(device)
  50.                
  51.                 # 前向传播
  52.                 output = model(data)
  53.                 loss = criterion(output, target)
  54.                
  55.                 # 反向传播
  56.                 optimizer.zero_grad()
  57.                 loss.backward()
  58.                 optimizer.step()
  59.                
  60.                 total_loss += loss.item()
  61.                
  62.                 # 定期清理内存
  63.                 if batch_idx % 100 == 0:
  64.                     with MemoryLeakPrevention.memory_context(device):
  65.                         pass  # 上下文管理器会自动清理内存
  66.             
  67.             avg_loss = total_loss / len(data_loader)
  68.             print(f'Epoch {epoch}, Average Loss: {avg_loss:.4f}')
  69.    
  70.     @staticmethod
  71.     def safe_inference(model, data_loader, device='cuda'):
  72.         """安全的推理过程,避免内存泄漏"""
  73.         model = model.to(device)
  74.         model.eval()
  75.         
  76.         results = []
  77.         
  78.         with torch.no_grad():
  79.             for batch_idx, (data, _) in enumerate(data_loader):
  80.                 data = data.to(device)
  81.                
  82.                 # 前向传播
  83.                 output = model(data)
  84.                
  85.                 # 保存结果
  86.                 results.append(output.cpu())
  87.                
  88.                 # 定期清理内存
  89.                 if batch_idx % 100 == 0:
  90.                     with MemoryLeakPrevention.memory_context(device):
  91.                         pass  # 上下文管理器会自动清理内存
  92.         
  93.         return torch.cat(results)
  94. # 使用示例
  95. # 创建一个简单的模型
  96. model = torch.nn.Sequential(
  97.     torch.nn.Linear(1000, 100),
  98.     torch.nn.ReLU(),
  99.     torch.nn.Linear(100, 10)
  100. )
  101. # 创建数据加载器
  102. from torch.utils.data import DataLoader, TensorDataset
  103. data = torch.randn(1000, 1000)
  104. target = torch.randint(0, 10, (1000,))
  105. dataset = TensorDataset(data, target)
  106. data_loader = DataLoader(dataset, batch_size=32, shuffle=True)
  107. # 创建优化器和损失函数
  108. optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
  109. criterion = torch.nn.CrossEntropyLoss()
  110. # 使用安全的训练循环
  111. print("使用安全的训练循环:")
  112. MemoryLeakPrevention.safe_training_loop(model, data_loader, optimizer, criterion, epochs=1)
  113. # 使用安全的推理过程
  114. print("\n使用安全的推理过程:")
  115. results = MemoryLeakPrevention.safe_inference(model, data_loader)
  116. print(f"推理结果形状: {results.shape}")
  117. # 安全创建张量
  118. print("\n安全创建张量:")
  119. tensor = MemoryLeakPrevention.safe_tensor_creation((1000, 1000))
  120. print(f"张量形状: {tensor.shape}")
复制代码

结论

在深度学习项目开发中,有效管理GPU内存资源是确保程序稳定运行的关键。本文全面介绍了Python中释放显存的实用技术,包括手动释放方法、自动内存管理策略和常见问题解决方案。

通过掌握这些技术,开发者可以:

1. 更有效地管理GPU内存资源,避免内存不足的问题
2. 防止显存泄漏,提高程序的稳定性
3. 在有限的硬件资源上训练更大的模型
4. 优化内存使用,提高程序运行效率

在实际应用中,建议开发者结合多种技术,根据具体项目需求选择最适合的内存管理策略。同时,定期监控内存使用情况,及时发现和解决内存问题,也是确保程序稳定运行的重要措施。

随着深度学习技术的不断发展,GPU内存管理技术也在不断进步。开发者应保持学习,关注最新的内存管理技术和最佳实践,以应对日益复杂的深度学习项目需求。
「七転び八起き(ななころびやおき)」
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则