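Python's requests library is a popular choice for making HTTP requests, and its concise API makes sending network requests remarkably simple. In real-world code, however, many developers overlook resource management, which leads to memory leaks and exhausted connections and hurts the stability and performance of the application. This article looks at the key techniques for releasing resources in requests, helping you avoid common pitfalls and make your networking code more efficient and more stable.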
A Quick Review of requests
requests is an HTTP library for Python with a concise API for sending HTTP requests. Here is a basic example:
```python
import requests

# Send a GET request
response = requests.get('https://api.example.com/data')
# Read the response body
print(response.text)
```
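This kind of one-liner is common in everyday code, but it hides potential resource-management problems. requests is built on urllib3, which manages HTTP connections through connection pools. When you send a request through a Session, requests takes an idle connection from that session's pool (or opens a new one if none is available), and once the response body has been fully read, the connection goes back into the pool instead of being closed. Reusing a connection is cheaper than setting up a new TCP (and TLS) connection, which is where the performance gain comes from. Note that the module-level helpers such as requests.get() create a temporary Session for each call and close it afterwards, so consecutive calls get no connection reuse.
If these connections are not managed correctly, however, they can leak or run out.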
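The difference can be observed directly. The sketch below is self-contained but makes one assumption purely so it runs offline: a throwaway HTTP/1.1 server on 127.0.0.1, built from the standard library's http.server, that records the client-side port of every incoming request. Three module-level requests.get() calls should arrive on different connections, while three calls through one Session should all share a single connection.

```python
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

import requests

client_ports = []  # client-side port of every request the server sees


class KeepAliveHandler(BaseHTTPRequestHandler):
    # Local test server (assumption, only so the example runs offline)
    protocol_version = "HTTP/1.1"  # HTTP/1.1 lets the connection stay open

    def do_GET(self):
        client_ports.append(self.client_address[1])
        body = b"ok"
        self.send_response(200)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):  # silence per-request logging
        pass


server = ThreadingHTTPServer(("127.0.0.1", 0), KeepAliveHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
url = f"http://127.0.0.1:{server.server_address[1]}/"

# Module-level calls: each builds and closes its own Session (and pool)
for _ in range(3):
    requests.get(url, timeout=5)
module_ports = client_ports[:]

# One Session: the connection goes back to the pool and is reused
client_ports.clear()
with requests.Session() as session:
    for _ in range(3):
        session.get(url, timeout=5)
session_ports = client_ports[:]

server.shutdown()
server.server_close()
print("module-level ports:", module_ports)
print("session ports:     ", session_ports)
```

The exact port numbers change from run to run; what matters is that the Session list repeats one value.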
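Common Resource Leaks
1. Not closing response objects
A requests response object wraps the underlying network connection and its socket file descriptor; if the response is not released properly, those resources leak. Consider the following code: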
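```python
import requests

def fetch_data(url):
    response = requests.get(url, stream=True)
    if response.status_code != 200:
        return None  # early return: the body is never read and the response never closed
    # Process the response data (.json() reads the whole body)
    data = response.json()
    return data
```
Here we pass stream=True, which means the body is not downloaded up front: the connection stays checked out until the body has been read completely or the response is closed. The early-return branch does neither, so the socket stays open until the response object happens to be garbage-collected, consuming system resources in the meantime.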
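2. Connection pool exhaustion
A Session reuses connections through its pool, but when many requests run at the same time without releasing their connections, the pool's limits come into play. For example:
```python
import threading

import requests

# One Session (and therefore one connection pool) shared by every thread
session = requests.Session()

def make_request(url):
    response = session.get(url)
    print(response.text)

# Start many threads that send requests at the same time
threads = []
for _ in range(100):
    thread = threading.Thread(target=make_request, args=('https://api.example.com/data',))
    threads.append(thread)
    thread.start()
for thread in threads:
    thread.join()
```
Here 100 threads send requests through one session at once. The default HTTPAdapter keeps at most 10 connections per host (pool_maxsize=10), and because pool_block defaults to False, urllib3 opens extra connections beyond that limit when all pooled ones are busy, then discards them when they are returned, logging a "Connection pool is full, discarding connection" warning. With a slow server or high network latency, up to 100 sockets can be open at the same time with little real reuse. Calling requests.get() directly in each thread would be worse still, since every call builds and tears down its own pool.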
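To put a hard cap on the number of connections, HTTPAdapter also accepts pool_block=True, which makes a thread wait for a free pooled connection instead of opening a throwaway one. In the sketch below, make_bounded_session and fetch_status are hypothetical helper names and the local server exists only so the example runs offline; 10 threads send 30 requests through a pool capped at 5 connections, and the server counts how many distinct connections it saw.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

import requests
from requests.adapters import HTTPAdapter

seen_ports = set()
ports_lock = threading.Lock()


class Handler(BaseHTTPRequestHandler):
    # Local test server (assumption, only so the example runs offline)
    protocol_version = "HTTP/1.1"

    def do_GET(self):
        with ports_lock:
            seen_ports.add(self.client_address[1])
        body = b"ok"
        self.send_response(200)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass


def make_bounded_session(max_connections):
    """Hypothetical helper: a Session whose pool never exceeds max_connections."""
    session = requests.Session()
    # pool_block=True: when every connection is busy, wait instead of opening extras
    adapter = HTTPAdapter(pool_connections=1, pool_maxsize=max_connections, pool_block=True)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session


def fetch_status(session, url):
    with session.get(url, timeout=5) as response:  # release the connection promptly
        return response.status_code


server = ThreadingHTTPServer(("127.0.0.1", 0), Handler)
threading.Thread(target=server.serve_forever, daemon=True).start()
url = f"http://127.0.0.1:{server.server_address[1]}/"

with make_bounded_session(5) as session, ThreadPoolExecutor(max_workers=10) as pool:
    statuses = list(pool.map(lambda _: fetch_status(session, url), range(30)))

server.shutdown()
server.server_close()
print(f"{len(statuses)} requests over {len(seen_ports)} connections")
```

requests does not expose a pool timeout, so a blocked thread can wait indefinitely for a connection to come back. Pair pool_block=True with request timeouts and always release responses; otherwise one leaked response can stall the other threads.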
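3. Memory problems when downloading large files
When a large file is downloaded without streaming, the entire body is loaded into memory first, which can exhaust available memory:
```python
import requests

def download_large_file(url):
    response = requests.get(url)  # no stream=True
    with open('large_file.zip', 'wb') as f:
        f.write(response.content)  # the entire file is held in memory first
```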
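Key Techniques for Releasing Resources
1. Use a context manager (the with statement)
Python's context managers (the with statement) are the most reliable way to manage resources, because they guarantee the resource is released after use. Response objects support the with statement (requests 2.18.0 and later), so we can write: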
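```python
import requests

def fetch_data(url):
    with requests.get(url, stream=True) as response:
        response.raise_for_status()  # raise on 4xx/5xx status codes
        data = response.json()
    # The response is closed automatically; no explicit response.close() needed
    return data
```
The with statement guarantees that the response is closed whether or not an exception is raised while it is being processed.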
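A related trap is returning response.iter_lines() (or iter_content()) from a helper: if the caller stops reading early, the response stays open until it is garbage-collected. Keeping the with block inside a generator closes the response as soon as the generator finishes, is closed, or is garbage-collected. A sketch, where stream_lines is a hypothetical helper name and the local server exists only so the example runs offline:

```python
import itertools
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

import requests


def stream_lines(session, url):
    """Hypothetical helper: yield text lines; the response is closed even on early exit."""
    with session.get(url, stream=True, timeout=10) as response:
        response.raise_for_status()
        for line in response.iter_lines(decode_unicode=True):
            if line:  # skip blank keep-alive lines
                yield line
    # Leaving the with block (normally or via GeneratorExit) closes the response


class LinesHandler(BaseHTTPRequestHandler):
    # Local test server (assumption, only so the example runs offline)
    protocol_version = "HTTP/1.1"

    def do_GET(self):
        body = "".join(f"line {i}\n" for i in range(1000)).encode()
        self.send_response(200)
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass


server = ThreadingHTTPServer(("127.0.0.1", 0), LinesHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
url = f"http://127.0.0.1:{server.server_address[1]}/"

with requests.Session() as session:
    lines = stream_lines(session, url)
    first_two = list(itertools.islice(lines, 2))
    lines.close()  # raises GeneratorExit inside the generator, closing the response

server.shutdown()
server.server_close()
print(first_two)  # ['line 0', 'line 1']
```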
2. Close responses explicitly
If you do not use a with statement, close the response explicitly:
```python
import requests

def fetch_data(url):
    response = requests.get(url, stream=True)
    try:
        response.raise_for_status()
        data = response.json()
        return data
    finally:
        response.close()  # always release the connection
```
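3. Manage connection pools with a Session
A requests Session owns its own connection pools and provides connection reuse and cookie persistence. Using a Session manages connections more effectively:
```python
import requests

def fetch_multiple_urls(urls):
    with requests.Session() as session:
        results = []
        for url in urls:
            # with closes each response even if raise_for_status() or json() raises;
            # a try/finally around session.get() would hit an unbound (or stale)
            # `response` whenever the request itself fails
            with session.get(url) as response:
                response.raise_for_status()
                results.append(response.json())
        return results
```
Reusing one Session lets consecutive requests to the same host share connections, avoiding the cost of new TCP (and TLS) handshakes.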
4. Stream large files
For large downloads, use streaming so the whole file is never held in memory:
```python
import requests

def download_large_file(url, file_path):
    with requests.get(url, stream=True) as response:
        response.raise_for_status()
        with open(file_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:  # skip empty keep-alive chunks
                    f.write(chunk)
```
The body is processed one chunk at a time, so memory use stays bounded by the chunk size instead of the file size.
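Streaming keeps memory flat, but a download can still fill the disk if the server sends far more than expected. One option, a sketch rather than anything requests does for you (copy_limited and download_limited are hypothetical names), is to count bytes while copying and abort once a limit is exceeded. The copying logic accepts any iterable of byte chunks, so it can be exercised without a network connection:

```python
import io

import requests


def copy_limited(chunks, out, max_bytes):
    """Hypothetical helper: write byte chunks to `out`, raising ValueError past max_bytes."""
    written = 0
    for chunk in chunks:
        if not chunk:  # skip empty keep-alive chunks
            continue
        written += len(chunk)
        if written > max_bytes:
            raise ValueError(f"response exceeds limit of {max_bytes} bytes")
        out.write(chunk)
    return written


def download_limited(url, file_path, max_bytes, chunk_size=8192):
    """Hypothetical helper: stream url to file_path, refusing bodies over max_bytes."""
    with requests.get(url, stream=True, timeout=30) as response:
        response.raise_for_status()
        with open(file_path, "wb") as f:
            return copy_limited(response.iter_content(chunk_size=chunk_size), f, max_bytes)


# Offline check: plain byte strings stand in for response.iter_content()
buf = io.BytesIO()
print(copy_limited([b"abc", b"", b"de"], buf, max_bytes=10))  # 5
```

If the limit is hit, the with block still closes the response, but the partially written file stays on disk, so the caller should delete it.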
5. Set sensible timeouts
By default requests never times out, so a request to an unresponsive server can wait indefinitely while holding its connection. Set an explicit timeout:
```python
import requests

def fetch_data_with_timeout(url, timeout=10):
    try:
        with requests.get(url, timeout=timeout) as response:
            response.raise_for_status()
            return response.json()
    except requests.Timeout:
        print(f"Request timed out: {url}")
        return None
```
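Note that timeout is not a limit on the whole request: requests applies it to establishing the connection and to each wait for data from the server, so a slow but steady stream can run much longer than the timeout value. A common pattern, sketched here (read_with_deadline and fetch_with_total_deadline are hypothetical names), is to pass a (connect, read) tuple and enforce an overall deadline yourself while iterating. The clock is injectable so the logic can be tested without a network:

```python
import time

import requests


def read_with_deadline(chunks, deadline_seconds, clock=time.monotonic):
    """Hypothetical helper: join byte chunks, raising TimeoutError past the deadline."""
    start = clock()
    parts = []
    for chunk in chunks:
        if clock() - start > deadline_seconds:
            raise TimeoutError(f"download exceeded {deadline_seconds} s")
        parts.append(chunk)
    return b"".join(parts)


def fetch_with_total_deadline(url, deadline_seconds=30):
    # (connect timeout, read timeout): at most 3.05 s to connect, 10 s between bytes
    with requests.get(url, stream=True, timeout=(3.05, 10)) as response:
        response.raise_for_status()
        return read_with_deadline(response.iter_content(chunk_size=8192), deadline_seconds)
```

The read timeout bounds each individual wait, and the deadline check bounds the total, so together they cap how long one request can hold its connection.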
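Connection Pool Management
requests manages HTTP connections through urllib3's connection pools. Knowing how to configure the pool parameters helps you manage connection resources more effectively.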
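1. Configure pool parameters
Pool parameters are set on an HTTPAdapter that is mounted on a Session:
```python
import requests
from requests.adapters import HTTPAdapter

def create_session_with_pool_settings():
    session = requests.Session()

    # Configure the connection pools
    adapter = HTTPAdapter(
        pool_connections=10,  # number of per-host pools to cache
        pool_maxsize=20,      # max reusable connections kept per host pool
        max_retries=3         # retry failed connection attempts up to 3 times
    )

    session.mount('http://', adapter)
    session.mount('https://', adapter)

    return session
```
In this example the adapter caches pools for up to 10 different hosts, keeps up to 20 reusable connections per host, and retries failed connection attempts up to 3 times. Note that pool_maxsize does not cap the number of simultaneous connections unless pool_block=True is also set; without it, extra connections are opened when needed and discarded afterwards.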
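requests documents an integer max_retries as applying only to failed DNS lookups, socket connections and connection timeouts, not to requests whose data has already reached the server, so a 503 from an overloaded server is normally returned to you rather than retried. When you also want backoff and retries on specific statuses, you can pass a urllib3 Retry object instead. A sketch, assuming urllib3 1.26 or later (where the parameter is named allowed_methods); create_retrying_session is a hypothetical name and the status list is only an example:

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def create_retrying_session(total=3, backoff_factor=0.5):
    """Hypothetical helper: a pooled Session that also retries on 502/503/504."""
    retry = Retry(
        total=total,
        backoff_factor=backoff_factor,        # growing sleep between attempts
        status_forcelist=(502, 503, 504),     # also retry on these HTTP statuses
        allowed_methods=frozenset({"GET", "HEAD", "OPTIONS"}),  # idempotent methods only
        raise_on_status=False,                # return the last response instead of raising
    )
    adapter = HTTPAdapter(pool_connections=10, pool_maxsize=20, max_retries=retry)
    session = requests.Session()
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session


with create_retrying_session() as session:
    policy = session.get_adapter("https://api.example.com/").max_retries
    print(policy.total, policy.status_forcelist)  # 3 (502, 503, 504)
```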
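2. Monitor pool state
You can inspect pool state through the adapter. The attributes used below (poolmanager.pools, num_connections, num_requests and the pool's internal queue) are urllib3 implementation details rather than public API, so treat this as a debugging aid that may break between versions:
```python
import requests

def monitor_pool_status(session):
    # NOTE: pools, num_connections and num_requests are urllib3 internals
    for prefix, adapter in session.adapters.items():
        manager = adapter.poolmanager
        for key in manager.pools.keys():
            pool = manager.pools.get(key)
            if pool is None or pool.pool is None:  # evicted or already closed
                continue
            print(f"Prefix: {prefix} -> {pool.scheme}://{pool.host}:{pool.port}")
            print(f"  Connections created: {pool.num_connections}")
            print(f"  Requests sent: {pool.num_requests}")
            # Idle connections plus unused capacity still available for checkout
            print(f"  Free slots: {pool.pool.qsize()}")
```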
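3. Clear the connection pools
In some situations you may need to clear the pools manually. PoolManager.clear() closes the pools and the idle connections they hold; session.close() does the same for every mounted adapter:
```python
import requests

def clear_connection_pool(session):
    for adapter in session.adapters.values():
        if hasattr(adapter, 'poolmanager'):
            adapter.poolmanager.clear()  # close pools and their idle connections
```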
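In a long-running program, rather than clearing pools by hand, a common pattern is one lazily created Session shared by the whole process and closed automatically at interpreter exit. A minimal sketch; get_session is a hypothetical name, and atexit is just one way to schedule the cleanup:

```python
import atexit
import threading

import requests

_session = None
_session_lock = threading.Lock()


def get_session():
    """Hypothetical helper: return the process-wide Session, creating it on first use."""
    global _session
    with _session_lock:
        if _session is None:
            _session = requests.Session()
            # Close pooled connections when the interpreter shuts down
            atexit.register(_session.close)
        return _session
```

Code anywhere in the program can then call get_session().get(url, timeout=10) and share the same pools.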
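Practical Examples
Case 1: Batch API requests
Suppose we need to fetch data from an API in bulk. Here is an implementation with proper resource management: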
```python
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter

def fetch_single_data(url, session):
    try:
        with session.get(url, timeout=10) as response:
            response.raise_for_status()
            return response.json()
    # ValueError covers invalid JSON on older requests versions
    except (requests.RequestException, ValueError) as e:
        print(f"Request failed: {url}, error: {e}")
        return None

def batch_fetch_data(urls, max_workers=5):
    results = []

    # Reuse connections through a single Session
    with requests.Session() as session:
        # Configure the pool so every worker can keep its own connection
        adapter = HTTPAdapter(
            pool_connections=max_workers,
            pool_maxsize=max_workers * 2,
            max_retries=3
        )
        session.mount('http://', adapter)
        session.mount('https://', adapter)

        # Process requests concurrently with a thread pool
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_url = {
                executor.submit(fetch_single_data, url, session): url
                for url in urls
            }

            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    data = future.result()
                    if data is not None:  # keep valid but empty results such as [] or {}
                        results.append(data)
                except Exception as e:
                    print(f"Error while processing result: {url}, error: {e}")

    return results
```
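In this example we:
1. Reuse connections through a Session
2. Configure suitable connection pool parameters
3. Process requests concurrently with a thread pool
4. Use a with statement for every request so each response is closed
5. Set a sensible timeout
6. Handle errors properly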
Case 2: A large-file downloader
Below is a downloader that streams large files efficiently:
```python
import os
import time

import requests
from requests.adapters import HTTPAdapter
from tqdm import tqdm  # third-party progress-bar library (pip install tqdm)


class FileDownloader:
    def __init__(self, chunk_size=8192, timeout=30):
        self.chunk_size = chunk_size
        self.timeout = timeout
        self.session = requests.Session()

        # Configure the connection pool: one download at a time needs one connection
        adapter = HTTPAdapter(
            pool_connections=1,
            pool_maxsize=1,
            max_retries=3
        )
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)

    def download(self, url, file_path):
        # If a partial file exists, ask the server only for the missing bytes
        resume_from = os.path.getsize(file_path) if os.path.exists(file_path) else 0
        headers = {'Range': f'bytes={resume_from}-'} if resume_from else {}

        try:
            with self.session.get(url, headers=headers, stream=True,
                                  timeout=self.timeout) as response:
                if resume_from and response.status_code == 416:
                    # Range Not Satisfiable: the local file is most likely already complete
                    return True
                response.raise_for_status()

                if response.status_code == 206:  # Partial Content: resume supported
                    mode, initial = 'ab', resume_from
                else:  # Range ignored (200): download again from scratch
                    mode, initial = 'wb', 0
                total_size = initial + int(response.headers.get('content-length', 0))

                # Download the file
                with open(file_path, mode) as f, tqdm(
                    desc=os.path.basename(file_path),
                    total=total_size or None,  # None means unknown size
                    unit='B',
                    unit_scale=True,
                    unit_divisor=1024,
                    initial=initial
                ) as progress_bar:
                    start_time = time.time()
                    downloaded = 0  # bytes received in this run only
                    for chunk in response.iter_content(chunk_size=self.chunk_size):
                        if chunk:  # skip empty keep-alive chunks
                            f.write(chunk)
                            downloaded += len(chunk)
                            progress_bar.update(len(chunk))

                            # Download speed for this run (excludes previously saved bytes)
                            elapsed_time = time.time() - start_time
                            speed = downloaded / elapsed_time if elapsed_time > 0 else 0
                            progress_bar.set_postfix(speed=f'{speed / 1024 / 1024:.2f}MB/s')

            return True
        except requests.RequestException as e:
            print(f"Download failed: {e}")
            return False
        except OSError as e:
            print(f"File operation failed: {e}")
            return False

    def close(self):
        self.session.close()
```
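This downloader:
1. Resumes interrupted downloads when the server supports Range requests
2. Shows a progress bar and the download speed
3. Streams the download to avoid running out of memory
4. Manages its connections properly (call close() when done)
5. Handles errors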
Case 3: A high-concurrency API client
The following client is designed for scenarios that involve a large volume of requests:
```python
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union

import requests
from requests.adapters import HTTPAdapter


@dataclass
class APIResponse:
    success: bool
    data: Optional[Any] = None
    error: Optional[str] = None
    status_code: Optional[int] = None
    response_time: Optional[float] = None


class HighConcurrencyAPIClient:
    def __init__(
        self,
        base_url: str,
        max_workers: int = 10,
        pool_connections: int = 10,
        pool_maxsize: int = 20,
        max_retries: int = 3,
        request_timeout: int = 30
    ):
        self.base_url = base_url.rstrip('/')
        self.max_workers = max_workers
        self.request_timeout = request_timeout

        # One Session shared by all worker threads
        self.session = requests.Session()

        # Configure the connection pool
        adapter = HTTPAdapter(
            pool_connections=pool_connections,
            pool_maxsize=pool_maxsize,
            max_retries=max_retries
        )
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)

        # Thread pool for asynchronous requests
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

        # Statistics, guarded by a lock because worker threads update them
        self.lock = threading.Lock()
        self.stats = self._empty_stats()

    @staticmethod
    def _empty_stats() -> Dict[str, float]:
        return {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'total_response_time': 0,
            'average_response_time': 0
        }

    def _record(self, success: bool, response_time: float) -> None:
        # Every attempt, including timeouts and connection errors, is counted exactly once
        with self.lock:
            self.stats['total_requests'] += 1
            self.stats['successful_requests' if success else 'failed_requests'] += 1
            self.stats['total_response_time'] += response_time
            self.stats['average_response_time'] = (
                self.stats['total_response_time'] / self.stats['total_requests']
            )

    def _make_request(self, endpoint: str, method: str = 'GET',
                      params: Optional[Dict] = None,
                      data: Optional[Dict] = None,
                      json: Optional[Dict] = None,
                      headers: Optional[Dict] = None) -> APIResponse:
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        start_time = time.time()
        status_code = None

        try:
            with self.session.request(
                method=method,
                url=url,
                params=params,
                data=data,
                json=json,
                headers=headers,
                timeout=self.request_timeout
            ) as response:
                status_code = response.status_code
                if not response.ok:
                    result = APIResponse(success=False, error=f"HTTP Error: {status_code}")
                else:
                    try:
                        result = APIResponse(success=True, data=response.json())
                    except ValueError:
                        result = APIResponse(success=False, error="Invalid JSON response")
        except requests.Timeout:
            result = APIResponse(success=False, error="Request timeout")
        except requests.ConnectionError:
            result = APIResponse(success=False, error="Connection error")
        except requests.RequestException as e:
            result = APIResponse(success=False, error=f"Request error: {e}")
        except Exception as e:
            result = APIResponse(success=False, error=f"Unexpected error: {e}")

        result.status_code = status_code
        result.response_time = time.time() - start_time
        self._record(result.success, result.response_time)
        return result

    def request(self, endpoint: str, method: str = 'GET',
                params: Optional[Dict] = None,
                data: Optional[Dict] = None,
                json: Optional[Dict] = None,
                headers: Optional[Dict] = None,
                async_request: bool = False) -> Union[APIResponse, Future]:
        kwargs = dict(endpoint=endpoint, method=method, params=params,
                      data=data, json=json, headers=headers)
        if async_request:
            # Asynchronous: return a Future immediately
            return self.executor.submit(self._make_request, **kwargs)
        # Synchronous: block until the response arrives
        return self._make_request(**kwargs)

    def batch_request(self, requests_list: List[Dict]) -> List[APIResponse]:
        futures = [
            self.request(
                endpoint=req.get('endpoint', ''),
                method=req.get('method', 'GET'),
                params=req.get('params'),
                data=req.get('data'),
                json=req.get('json'),
                headers=req.get('headers'),
                async_request=True
            )
            for req in requests_list
        ]

        # Collect in submission order so results[i] belongs to requests_list[i]
        results = []
        for future in futures:
            try:
                results.append(future.result())
            except Exception as e:
                results.append(APIResponse(success=False, error=f"Future error: {e}"))
        return results

    def get_stats(self) -> Dict[str, Any]:
        with self.lock:
            return self.stats.copy()

    def reset_stats(self):
        with self.lock:
            self.stats = self._empty_stats()

    def close(self):
        # Wait for in-flight requests, then release pooled connections
        self.executor.shutdown(wait=True)
        self.session.close()
```
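This high-concurrency API client provides:
1. Synchronous and asynchronous requests
2. Batch request processing
3. Connection pool management
4. Request statistics
5. Thread-safe statistics updates
6. Thorough error handling
7. Proper resource release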
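Performance Optimization
Proper resource management can noticeably improve the performance of requests-based code. Some optimization techniques:
1. Reuse Session objects
Reusing a Session avoids opening a new connection for every request and saves the connection setup cost: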
```python
import time

import requests

def without_session(urls):
    results = []
    for url in urls:
        response = requests.get(url)  # a new Session and connection per call
        results.append(response.json())
    return results

def with_session(urls):
    results = []
    with requests.Session() as session:
        for url in urls:
            response = session.get(url)  # reuses the pooled connection
            results.append(response.json())
    return results

# Performance comparison (placeholder URL)
urls = ['https://api.example.com/data'] * 10
start_time = time.time()
without_session(urls)
print(f"Without Session: {time.time() - start_time:.2f} s")
start_time = time.time()
with_session(urls)
print(f"With Session: {time.time() - start_time:.2f} s")
```
2. Tune the connection pool parameters
Sizing the pool to the workload can improve performance; as a rule, pool_maxsize should be at least the number of threads that talk to the same host at once:
```python
import requests
from concurrent.futures import ThreadPoolExecutor
from requests.adapters import HTTPAdapter

def create_optimized_session(max_workers):
    session = requests.Session()

    # Size the pool to the concurrency level
    adapter = HTTPAdapter(
        pool_connections=max_workers,
        pool_maxsize=max_workers * 2,
        max_retries=3
    )

    session.mount('http://', adapter)
    session.mount('https://', adapter)

    return session

def fetch_url(session, url):
    with session.get(url) as response:
        return response.json()

def concurrent_fetch(urls, max_workers=10):
    # The with statements close the session even if a request raises
    with create_optimized_session(max_workers) as session, \
            ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(lambda url: fetch_url(session, url), urls))
```
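3. Use compression
Compression reduces the amount of data sent over the network. requests already enables it by default: every request carries an Accept-Encoding header (gzip, deflate, plus further encodings such as br when urllib3's optional decoders are installed), and compressed responses are decompressed transparently. Setting the header yourself is only needed if the defaults have been overridden, and hard-coding "gzip, deflate" can drop encodings that would otherwise be offered:
```python
import requests

def fetch_with_compression(url):
    # Explicit header; requests already sends a default Accept-Encoding,
    # so this only matters if the defaults have been overridden
    headers = {
        'Accept-Encoding': 'gzip, deflate'
    }

    with requests.get(url, headers=headers) as response:
        return response.json()  # the body is decompressed transparently
```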
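Both points can be checked directly: requests.utils.default_headers() shows the Accept-Encoding value requests sends, and a gzip-encoded body is decoded before .content, .text or .json() see it. The sketch below uses a throwaway local server (an assumption, only so it runs offline) that always replies with a gzip-compressed JSON body and records the Accept-Encoding header it received:

```python
import gzip
import json
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

import requests

print("Default Accept-Encoding:", requests.utils.default_headers()["Accept-Encoding"])


class GzipHandler(BaseHTTPRequestHandler):
    # Local test server (assumption, only so the example runs offline)
    protocol_version = "HTTP/1.1"

    def do_GET(self):
        self.server.seen_accept_encoding = self.headers.get("Accept-Encoding")
        body = gzip.compress(json.dumps({"message": "hello"}).encode())
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Encoding", "gzip")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass


server = ThreadingHTTPServer(("127.0.0.1", 0), GzipHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
url = f"http://127.0.0.1:{server.server_address[1]}/"

with requests.get(url, timeout=5) as response:  # no custom headers
    data = response.json()  # already decompressed

server.shutdown()
server.server_close()
print(data, "| server saw:", server.seen_accept_encoding)
```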
4. Cache responses
For data that rarely changes, caching avoids repeated requests. Note that functools.lru_cache cannot expire entries on its own: on a cache hit the decorated function body never runs, so a TTL check placed inside it is silently skipped. A small dictionary with timestamps handles expiry explicitly:
```python
import threading
import time

import requests

_cache = {}  # url -> (fetch time, parsed JSON)
_cache_lock = threading.Lock()

def fetch_with_cache(url, ttl=60):
    now = time.monotonic()
    with _cache_lock:
        entry = _cache.get(url)
        if entry is not None and now - entry[0] < ttl:
            return entry[1]  # cache hit that is still fresh

    # Cache miss or expired entry: fetch and store
    with requests.get(url, timeout=10) as response:
        response.raise_for_status()
        data = response.json()

    with _cache_lock:
        _cache[url] = (now, data)
    return data
```
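When the server supports it, HTTP conditional requests are another way to avoid downloading unchanged data: keep the ETag from the last response, send it back in an If-None-Match header, and reuse the cached body when the server answers 304 Not Modified. The sketch assumes the server emits ETag headers; fetch_revalidated is a hypothetical helper, and the local server exists only so the example runs offline:

```python
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

import requests

_etag_cache = {}  # url -> (etag, parsed JSON body)


def fetch_revalidated(session, url):
    """Hypothetical helper: revalidate a cached body with If-None-Match."""
    headers = {}
    cached = _etag_cache.get(url)
    if cached:
        headers["If-None-Match"] = cached[0]
    with session.get(url, headers=headers, timeout=10) as response:
        if response.status_code == 304 and cached:
            return cached[1]  # unchanged: reuse the cached body
        response.raise_for_status()
        data = response.json()
        etag = response.headers.get("ETag")
        if etag:
            _etag_cache[url] = (etag, data)
        return data


class ETagHandler(BaseHTTPRequestHandler):
    # Local test server (assumption, only so the example runs offline)
    protocol_version = "HTTP/1.1"
    full_responses = 0  # how many times the body was actually sent

    def do_GET(self):
        etag = '"v1"'
        if self.headers.get("If-None-Match") == etag:
            self.send_response(304)  # no body for 304
            self.send_header("ETag", etag)
            self.end_headers()
            return
        ETagHandler.full_responses += 1
        body = b'{"value": 42}'
        self.send_response(200)
        self.send_header("ETag", etag)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass


server = ThreadingHTTPServer(("127.0.0.1", 0), ETagHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
url = f"http://127.0.0.1:{server.server_address[1]}/"

with requests.Session() as session:
    first = fetch_revalidated(session, url)
    second = fetch_revalidated(session, url)

server.shutdown()
server.server_close()
print(first, second, ETagHandler.full_responses)  # {'value': 42} {'value': 42} 1
```

Unlike a fixed TTL, revalidation still costs one round trip per call, but it never serves stale data and skips the body transfer when nothing has changed.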
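Best Practices
1. Always use a with statement or close responses explicitly
```python
import requests

# Recommended
with requests.get(url, stream=True) as response:
    data = response.json()  # process the response

# Or
response = requests.get(url, stream=True)
try:
    data = response.json()  # process the response
finally:
    response.close()
```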
2. Manage connections with a Session
```python
import requests

# Recommended
with requests.Session() as session:
    response = session.get(url)
    data = response.json()  # process the response
```
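3. Set sensible timeouts
```python
import requests

# Recommended
try:
    with requests.get(url, timeout=10) as response:
        data = response.json()  # process the response
except requests.Timeout:
    data = None  # handle the timeout (retry, log, fall back)
```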
4. Stream large files
```python
import requests

# Recommended
with requests.get(url, stream=True) as response:
    with open('file.zip', 'wb') as f:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:  # skip empty keep-alive chunks
                f.write(chunk)
```
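5. Configure the connection pool appropriately
```python
import requests
from requests.adapters import HTTPAdapter

# Recommended
session = requests.Session()
adapter = HTTPAdapter(
    pool_connections=10,  # number of per-host pools to cache
    pool_maxsize=20,      # reusable connections kept per host
    max_retries=3
)
session.mount('http://', adapter)
session.mount('https://', adapter)
```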
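6. Handle errors
```python
import requests

# Recommended
try:
    with requests.get(url, timeout=10) as response:
        response.raise_for_status()
        data = response.json()  # process the response
except requests.HTTPError as e:
    print(f"HTTP error: {e}")  # 4xx/5xx status codes
except requests.Timeout:
    # Must come before ConnectionError: ConnectTimeout subclasses both
    print("Request timed out")
except requests.ConnectionError:
    print("Connection error")
except requests.RequestException as e:
    print(f"Other request error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
```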
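The order of the except clauses matters because the exception classes overlap: requests.ConnectTimeout is a subclass of both ConnectionError and Timeout, and every requests exception derives from RequestException. A small sketch (classify_error is a hypothetical helper) that checks the most specific categories first:

```python
import requests


def classify_error(exc):
    """Hypothetical helper: map an exception to a coarse category, most specific first."""
    if isinstance(exc, requests.Timeout):  # ConnectTimeout and ReadTimeout
        return "timeout"
    if isinstance(exc, requests.HTTPError):
        return "http"
    if isinstance(exc, requests.ConnectionError):
        return "connection"
    if isinstance(exc, requests.RequestException):
        return "request"
    return "other"


print(issubclass(requests.ConnectTimeout, requests.ConnectionError))  # True
print(classify_error(requests.ConnectTimeout()))  # timeout
```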
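7. Monitor resource usage
```python
# Recommended
import os

import psutil  # third-party: pip install psutil

def monitor_resources():
    process = psutil.Process(os.getpid())
    print(f"Memory usage: {process.memory_info().rss / 1024 / 1024:.2f} MB")
    # num_fds() is only available on POSIX systems
    if hasattr(process, 'num_fds'):
        print(f"Open file descriptors: {process.num_fds()}")
    # psutil 6.0 renamed connections() to net_connections()
    get_connections = getattr(process, 'net_connections', None) or process.connections
    print(f"Network connections: {len(get_connections())}")
```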
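Conclusion
Managing resources correctly in requests is essential for building efficient, stable network applications. Using context managers and Session objects, configuring the connection pool sensibly, setting timeouts and streaming large responses together prevent most memory-leak and connection-exhaustion problems and noticeably improve performance and stability.
In day-to-day development, make good resource management a habit: keep an eye on resource usage and catch potential problems early. The techniques and best practices in this article should help make your networking code more efficient and more stable.
Remember that resource management is not a one-off task but an ongoing concern throughout development; only by staying attentive to it can you build truly high-quality network applications.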