|
|
马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
Python串口释放详解避免程序崩溃与设备占用问题
串口通信基础
串口通信是一种常见的设备通信方式,在嵌入式系统、工业控制、传感器数据采集等领域广泛应用。在Python中,我们通常使用pyserial库来处理串口通信,它提供了简单而强大的接口来实现串口数据的读写操作。
串口是一种系统资源,当应用程序打开串口后,该串口会被占用,其他应用程序将无法访问它,直到当前应用程序释放该串口。因此,正确地管理串口资源,特别是在程序异常终止或崩溃的情况下,对于系统的稳定性和资源的合理利用至关重要。
Python串口编程基础
在Python中,使用pyserial库进行串口编程的基本流程如下:
- import serial
- # 打开串口
- ser = serial.Serial('COM1', 9600, timeout=1)
- # 读写数据
- ser.write(b'Hello World')
- data = ser.readline()
- # 关闭串口
- ser.close()
复制代码
这种简单的串口操作在正常情况下可以工作,但如果在读写数据过程中发生异常,ser.close()可能不会被执行,导致串口资源无法释放。
串口资源未释放的问题
当串口资源未正确释放时,会出现以下问题:
1. 设备占用:串口设备保持打开状态,其他程序无法访问该串口。
2. 资源泄漏:系统资源被持续占用,可能导致系统性能下降。
3. 程序崩溃:在多次运行程序时,尝试访问已被占用但未正确释放的串口会导致程序崩溃。
4. 设备异常:某些设备在连接异常断开后可能需要重置才能正常工作。
正确的串口释放方法
最基本的串口资源管理方法是使用try-finally结构:
- import serial
- ser = None
- try:
- ser = serial.Serial('COM1', 9600, timeout=1)
- ser.write(b'Hello World')
- data = ser.readline()
- print(f"Received: {data}")
- finally:
- if ser is not None and ser.is_open:
- ser.close()
- print("Serial port closed")
复制代码
这种方法确保无论是否发生异常,finally块中的代码都会被执行,从而保证串口被正确关闭。
Python的with语句提供了一种更优雅的资源管理方式。pyserial库的Serial类支持上下文管理器协议:
- import serial
- try:
- with serial.Serial('COM1', 9600, timeout=1) as ser:
- ser.write(b'Hello World')
- data = ser.readline()
- print(f"Received: {data}")
- # 串口会在with块结束时自动关闭
- except serial.SerialException as e:
- print(f"Serial error: {e}")
- except Exception as e:
- print(f"Error: {e}")
复制代码
这种方法更加简洁,并且自动处理了资源的释放。
对于更复杂的应用场景,可以创建一个自定义的串口管理类,封装更多的功能:
- import serial
- import time
- from contextlib import contextmanager
- class SerialManager:
- def __init__(self, port, baudrate=9600, timeout=1):
- self.port = port
- self.baudrate = baudrate
- self.timeout = timeout
- self.serial_conn = None
-
- def connect(self):
- """连接串口"""
- try:
- self.serial_conn = serial.Serial(
- port=self.port,
- baudrate=self.baudrate,
- timeout=self.timeout
- )
- print(f"Connected to {self.port} at {self.baudrate} baud")
- return True
- except serial.SerialException as e:
- print(f"Failed to connect: {e}")
- return False
-
- def disconnect(self):
- """断开串口连接"""
- if self.serial_conn is not None and self.serial_conn.is_open:
- self.serial_conn.close()
- print("Serial port disconnected")
-
- def send_data(self, data):
- """发送数据"""
- if self.serial_conn is not None and self.serial_conn.is_open:
- try:
- if isinstance(data, str):
- data = data.encode('utf-8')
- self.serial_conn.write(data)
- return True
- except serial.SerialException as e:
- print(f"Failed to send data: {e}")
- return False
- return False
-
- def read_data(self, size=1):
- """读取数据"""
- if self.serial_conn is not None and self.serial_conn.is_open:
- try:
- return self.serial_conn.read(size)
- except serial.SerialException as e:
- print(f"Failed to read data: {e}")
- return None
- return None
-
- def read_line(self):
- """读取一行数据"""
- if self.serial_conn is not None and self.serial_conn.is_open:
- try:
- return self.serial_conn.readline()
- except serial.SerialException as e:
- print(f"Failed to read line: {e}")
- return None
- return None
-
- def __enter__(self):
- """上下文管理器入口"""
- self.connect()
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- """上下文管理器出口"""
- self.disconnect()
- # 不处理异常,让它们正常传播
- return False
- # 使用示例
- def send_command_and_receive_response(command, expected_response=None, timeout=5):
- try:
- with SerialManager('COM1', 9600) as ser:
- # 发送命令
- ser.send_data(command)
- print(f"Sent: {command}")
-
- # 等待并读取响应
- start_time = time.time()
- response = b""
-
- while time.time() - start_time < timeout:
- line = ser.read_line()
- if line:
- response += line
- print(f"Received: {line.decode('utf-8', errors='replace').strip()}")
-
- # 如果有期望响应,检查是否收到
- if expected_response and expected_response.encode('utf-8') in response:
- return True
-
- # 如果没有期望响应或超时,返回已接收的数据
- return response.decode('utf-8', errors='replace')
-
- except Exception as e:
- print(f"Error in send_command_and_receive_response: {e}")
- return None
- # 测试
- result = send_command_and_receive_response("AT\r\n", "OK")
- print(f"Result: {result}")
复制代码
这种封装方式提供了更高级的抽象,使代码更加模块化和可重用。
异常处理与串口释放
在串口通信中,可能会遇到各种异常情况,如设备断开连接、权限问题、参数错误等。正确处理这些异常对于确保串口资源被正确释放至关重要。
1. serial.SerialException:所有串口相关异常的基类。
2. serial.SerialTimeoutException:读写操作超时。
3. serial.PortNotOpenError:尝试在未打开的端口上进行操作。
4. serial.SerialIOException:串口I/O错误。
- import serial
- import time
- import threading
- import logging
- # 配置日志
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(levelname)s - %(message)s'
- )
- logger = logging.getLogger(__name__)
- class RobustSerialManager:
- def __init__(self, port, baudrate=9600, timeout=1, reconnect_attempts=3, reconnect_delay=1):
- self.port = port
- self.baudrate = baudrate
- self.timeout = timeout
- self.reconnect_attempts = reconnect_attempts
- self.reconnect_delay = reconnect_delay
- self.serial_conn = None
- self.lock = threading.Lock()
- self._should_reconnect = True
-
- def connect(self):
- """连接串口,带有重试机制"""
- with self.lock:
- if self.serial_conn is not None and self.serial_conn.is_open:
- return True
-
- attempts = 0
- while attempts < self.reconnect_attempts and self._should_reconnect:
- try:
- attempts += 1
- logger.info(f"Attempting to connect to {self.port} (attempt {attempts}/{self.reconnect_attempts})")
-
- self.serial_conn = serial.Serial(
- port=self.port,
- baudrate=self.baudrate,
- timeout=self.timeout
- )
-
- logger.info(f"Successfully connected to {self.port} at {self.baudrate} baud")
- return True
-
- except serial.SerialException as e:
- logger.warning(f"Connection attempt {attempts} failed: {e}")
- if attempts < self.reconnect_attempts and self._should_reconnect:
- time.sleep(self.reconnect_delay)
-
- logger.error(f"Failed to connect to {self.port} after {self.reconnect_attempts} attempts")
- return False
-
- def disconnect(self):
- """断开串口连接"""
- with self.lock:
- self._should_reconnect = False # 停止任何正在进行的重连尝试
-
- if self.serial_conn is not None and self.serial_conn.is_open:
- try:
- self.serial_conn.close()
- logger.info("Serial port disconnected")
- except Exception as e:
- logger.error(f"Error while disconnecting: {e}")
- finally:
- self.serial_conn = None
-
- def send_data(self, data):
- """发送数据,带有连接检查和重试机制"""
- with self.lock:
- if not self._ensure_connection():
- return False
-
- try:
- if isinstance(data, str):
- data = data.encode('utf-8')
-
- self.serial_conn.write(data)
- logger.debug(f"Sent data: {data}")
- return True
-
- except serial.SerialTimeoutException:
- logger.warning("Timeout while sending data")
- return False
- except serial.SerialException as e:
- logger.error(f"Serial error while sending data: {e}")
- self._handle_connection_error()
- return False
- except Exception as e:
- logger.error(f"Unexpected error while sending data: {e}")
- return False
-
- def read_data(self, size=1):
- """读取数据,带有连接检查和重试机制"""
- with self.lock:
- if not self._ensure_connection():
- return None
-
- try:
- data = self.serial_conn.read(size)
- logger.debug(f"Received data: {data}")
- return data
-
- except serial.SerialTimeoutException:
- logger.debug("Timeout while reading data (this may be normal)")
- return b"" # 返回空字节而不是None,表示超时但连接仍然正常
- except serial.SerialException as e:
- logger.error(f"Serial error while reading data: {e}")
- self._handle_connection_error()
- return None
- except Exception as e:
- logger.error(f"Unexpected error while reading data: {e}")
- return None
-
- def read_line(self):
- """读取一行数据,带有连接检查和重试机制"""
- with self.lock:
- if not self._ensure_connection():
- return None
-
- try:
- line = self.serial_conn.readline()
- if line:
- logger.debug(f"Received line: {line}")
- return line
-
- except serial.SerialTimeoutException:
- logger.debug("Timeout while reading line (this may be normal)")
- return b"" # 返回空字节而不是None,表示超时但连接仍然正常
- except serial.SerialException as e:
- logger.error(f"Serial error while reading line: {e}")
- self._handle_connection_error()
- return None
- except Exception as e:
- logger.error(f"Unexpected error while reading line: {e}")
- return None
-
- def _ensure_connection(self):
- """确保串口连接,如果断开则尝试重新连接"""
- if self.serial_conn is None or not self.serial_conn.is_open:
- return self.connect()
- return True
-
- def _handle_connection_error(self):
- """处理连接错误,关闭当前连接并准备重连"""
- try:
- if self.serial_conn is not None and self.serial_conn.is_open:
- self.serial_conn.close()
- except:
- pass
- finally:
- self.serial_conn = None
-
- def __enter__(self):
- """上下文管理器入口"""
- self._should_reconnect = True
- self.connect()
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- """上下文管理器出口"""
- self.disconnect()
- # 不处理异常,让它们正常传播
- return False
- # 使用示例
- def communicate_with_device():
- try:
- with RobustSerialManager('COM1', 9600, timeout=1, reconnect_attempts=3) as ser:
- # 发送AT命令
- if not ser.send_data("AT\r\n"):
- logger.error("Failed to send AT command")
- return False
-
- # 读取响应
- response = b""
- start_time = time.time()
- timeout = 5 # 5秒超时
-
- while time.time() - start_time < timeout:
- line = ser.read_line()
- if line:
- response += line
- logger.info(f"Received: {line.decode('utf-8', errors='replace').strip()}")
-
- # 检查是否收到OK响应
- if b"OK" in line:
- logger.info("Received OK response")
- return True
-
- # 短暂休眠,避免CPU占用过高
- time.sleep(0.1)
-
- logger.warning("No OK response received within timeout")
- return False
-
- except Exception as e:
- logger.error(f"Error in communicate_with_device: {e}")
- return False
- # 测试
- result = communicate_with_device()
- logger.info(f"Communication result: {'Success' if result else 'Failed'}")
复制代码
这个示例实现了一个健壮的串口管理器,具有以下特点:
1. 线程安全:使用锁确保多线程环境下的安全操作。
2. 自动重连:在连接断开时自动尝试重新连接。
3. 全面的异常处理:处理各种可能的异常情况。
4. 日志记录:详细记录操作过程和错误信息。
5. 资源管理:确保在任何情况下都能正确释放串口资源。
串口通信的最佳实践
• 始终使用上下文管理器:使用with语句或实现自定义的上下文管理器,确保资源被正确释放。
• 避免全局串口对象:全局对象使得资源管理变得复杂,应尽量使用局部对象或通过类封装。
• 显式关闭串口:即使使用了上下文管理器,在程序结束前也显式关闭所有打开的串口。
• 捕获特定异常:尽量捕获特定的异常类型,而不是使用通用的except:。
• 记录异常信息:使用日志记录异常信息,便于调试和问题追踪。
• 优雅降级:在发生异常时,提供合理的降级方案,而不是直接崩溃。
• 合理的超时设置:根据实际需求设置合适的超时时间,避免过长或过短。
• 批量操作:对于大量数据传输,考虑使用批量操作而非频繁的小数据传输。
• 异步通信:对于需要同时处理多个任务的应用,考虑使用异步串口通信。
• 封装串口操作:将串口操作封装在类或模块中,提供清晰的接口。
• 分离业务逻辑:将串口通信逻辑与业务逻辑分离,提高代码的可维护性。
• 单元测试:为串口通信代码编写单元测试,确保其正确性和稳定性。
实际应用案例
- import serial
- import time
- import json
- import csv
- from datetime import datetime
- import os
- import signal
- import sys
- class SensorDataCollector:
- def __init__(self, port, baudrate=9600, output_file='sensor_data.csv', max_retries=3):
- self.port = port
- self.baudrate = baudrate
- self.output_file = output_file
- self.max_retries = max_retries
- self.serial_conn = None
- self.running = False
- self._setup_signal_handlers()
-
- def _setup_signal_handlers(self):
- """设置信号处理器,确保程序退出时正确关闭资源"""
- signal.signal(signal.SIGINT, self._signal_handler)
- signal.signal(signal.SIGTERM, self._signal_handler)
-
- def _signal_handler(self, signum, frame):
- """信号处理器,优雅地关闭程序"""
- print(f"\nReceived signal {signum}, shutting down...")
- self.stop()
- sys.exit(0)
-
- def start(self):
- """启动数据采集"""
- if self.running:
- print("Data collection is already running")
- return False
-
- self.running = True
- try:
- # 初始化CSV文件
- self._init_csv_file()
-
- # 连接串口
- if not self._connect_serial():
- return False
-
- # 开始采集数据
- self._collect_data()
- return True
-
- except Exception as e:
- print(f"Error starting data collection: {e}")
- self.stop()
- return False
-
- def stop(self):
- """停止数据采集"""
- self.running = False
- self._disconnect_serial()
- print("Data collection stopped")
-
- def _init_csv_file(self):
- """初始化CSV文件,写入表头"""
- file_exists = os.path.isfile(self.output_file)
-
- with open(self.output_file, 'a', newline='') as csvfile:
- fieldnames = ['timestamp', 'temperature', 'humidity', 'pressure']
- writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
-
- if not file_exists:
- writer.writeheader()
-
- def _connect_serial(self):
- """连接串口,带有重试机制"""
- retries = 0
- while retries < self.max_retries:
- try:
- retries += 1
- print(f"Attempting to connect to {self.port} (attempt {retries}/{self.max_retries})")
-
- self.serial_conn = serial.Serial(
- port=self.port,
- baudrate=self.baudrate,
- timeout=1
- )
-
- print(f"Successfully connected to {self.port}")
- return True
-
- except serial.SerialException as e:
- print(f"Connection attempt {retries} failed: {e}")
- if retries < self.max_retries:
- time.sleep(1) # 等待1秒后重试
-
- print(f"Failed to connect to {self.port} after {self.max_retries} attempts")
- return False
-
- def _disconnect_serial(self):
- """断开串口连接"""
- if self.serial_conn is not None and self.serial_conn.is_open:
- try:
- self.serial_conn.close()
- print("Serial port disconnected")
- except Exception as e:
- print(f"Error disconnecting serial port: {e}")
- finally:
- self.serial_conn = None
-
- def _collect_data(self):
- """采集数据并写入CSV文件"""
- print("Starting data collection...")
-
- while self.running:
- try:
- # 读取一行数据
- line = self.serial_conn.readline()
- if not line:
- continue
-
- # 解析JSON数据
- try:
- data = json.loads(line.decode('utf-8').strip())
- except json.JSONDecodeError:
- print(f"Invalid JSON data: {line}")
- continue
-
- # 验证数据格式
- if not all(key in data for key in ['temperature', 'humidity', 'pressure']):
- print(f"Incomplete data: {data}")
- continue
-
- # 添加时间戳
- data['timestamp'] = datetime.now().isoformat()
-
- # 写入CSV文件
- self._write_to_csv(data)
-
- # 打印数据
- print(f"Collected: {data}")
-
- except serial.SerialException as e:
- print(f"Serial error: {e}")
- # 尝试重新连接
- self._disconnect_serial()
- if not self._connect_serial():
- print("Failed to reconnect, stopping data collection")
- break
-
- except Exception as e:
- print(f"Unexpected error: {e}")
- # 短暂休眠后继续
- time.sleep(1)
-
- def _write_to_csv(self, data):
- """将数据写入CSV文件"""
- try:
- with open(self.output_file, 'a', newline='') as csvfile:
- fieldnames = ['timestamp', 'temperature', 'humidity', 'pressure']
- writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
- writer.writerow(data)
- except Exception as e:
- print(f"Error writing to CSV file: {e}")
- # 使用示例
- if __name__ == "__main__":
- collector = SensorDataCollector('COM1', 9600, 'sensor_data.csv')
-
- try:
- collector.start()
- except KeyboardInterrupt:
- collector.stop()
复制代码
这个案例实现了一个传感器数据采集系统,具有以下特点:
1. 健壮的资源管理:使用信号处理器确保程序退出时正确关闭资源。
2. 自动重连机制:在连接断开时自动尝试重新连接。
3. 数据持久化:将采集的数据保存到CSV文件中。
4. 错误处理:处理各种可能的异常情况,确保程序稳定运行。
5. 优雅关闭:支持通过Ctrl+C或系统信号优雅地关闭程序。
- import serial
- import threading
- import queue
- import time
- import json
- from enum import Enum, auto
- from typing import Dict, List, Optional, Callable, Any
- class DeviceType(Enum):
- """设备类型枚举"""
- TEMPERATURE_SENSOR = auto()
- HUMIDITY_SENSOR = auto()
- PRESSURE_SENSOR = auto()
- GPS_MODULE = auto()
- CUSTOM_DEVICE = auto()
- class DeviceStatus(Enum):
- """设备状态枚举"""
- DISCONNECTED = auto()
- CONNECTING = auto()
- CONNECTED = auto()
- ERROR = auto()
- RECONNECTING = auto()
- class SerialDevice:
- """串口设备类"""
- def __init__(self, device_id: str, port: str, baudrate: int = 9600,
- device_type: DeviceType = DeviceType.CUSTOM_DEVICE,
- timeout: float = 1.0, reconnect_attempts: int = 3,
- reconnect_delay: float = 1.0):
- self.device_id = device_id
- self.port = port
- self.baudrate = baudrate
- self.device_type = device_type
- self.timeout = timeout
- self.reconnect_attempts = reconnect_attempts
- self.reconnect_delay = reconnect_delay
-
- self.serial_conn = None
- self.status = DeviceStatus.DISCONNECTED
- self.lock = threading.Lock()
- self.data_queue = queue.Queue()
- self.command_queue = queue.Queue()
- self.running = False
- self.worker_thread = None
- self.data_callback = None
- self.error_callback = None
-
- # 设备特定配置
- self.config = {}
-
- def set_config(self, config: Dict[str, Any]):
- """设置设备配置"""
- with self.lock:
- self.config.update(config)
-
- def get_config(self, key: str, default: Any = None) -> Any:
- """获取设备配置"""
- with self.lock:
- return self.config.get(key, default)
-
- def set_data_callback(self, callback: Callable[[str, bytes], None]):
- """设置数据回调函数"""
- self.data_callback = callback
-
- def set_error_callback(self, callback: Callable[[str, Exception], None]):
- """设置错误回调函数"""
- self.error_callback = callback
-
- def connect(self) -> bool:
- """连接设备"""
- with self.lock:
- if self.status == DeviceStatus.CONNECTED:
- return True
-
- self.status = DeviceStatus.CONNECTING
-
- attempts = 0
- while attempts < self.reconnect_attempts:
- try:
- attempts += 1
- print(f"[{self.device_id}] Attempting to connect to {self.port} (attempt {attempts}/{self.reconnect_attempts})")
-
- self.serial_conn = serial.Serial(
- port=self.port,
- baudrate=self.baudrate,
- timeout=self.timeout
- )
-
- self.status = DeviceStatus.CONNECTED
- print(f"[{self.device_id}] Successfully connected to {self.port}")
- return True
-
- except serial.SerialException as e:
- print(f"[{self.device_id}] Connection attempt {attempts} failed: {e}")
- if attempts < self.reconnect_attempts:
- time.sleep(self.reconnect_delay)
-
- self.status = DeviceStatus.ERROR
- print(f"[{self.device_id}] Failed to connect to {self.port} after {self.reconnect_attempts} attempts")
-
- if self.error_callback:
- self.error_callback(self.device_id, serial.SerialException(f"Failed to connect after {self.reconnect_attempts} attempts"))
-
- return False
-
- def disconnect(self):
- """断开设备连接"""
- with self.lock:
- self.running = False
-
- if self.serial_conn is not None and self.serial_conn.is_open:
- try:
- self.serial_conn.close()
- print(f"[{self.device_id}] Serial port disconnected")
- except Exception as e:
- print(f"[{self.device_id}] Error disconnecting serial port: {e}")
- finally:
- self.serial_conn = None
-
- self.status = DeviceStatus.DISCONNECTED
-
- def start(self):
- """启动设备通信"""
- with self.lock:
- if self.running:
- return
-
- if not self.connect():
- return
-
- self.running = True
- self.worker_thread = threading.Thread(target=self._worker, daemon=True)
- self.worker_thread.start()
- print(f"[{self.device_id}] Device communication started")
-
- def stop(self):
- """停止设备通信"""
- with self.lock:
- self.running = False
-
- if self.worker_thread and self.worker_thread.is_alive():
- self.worker_thread.join(timeout=1.0)
-
- self.disconnect()
- print(f"[{self.device_id}] Device communication stopped")
-
- def send_command(self, command: str or bytes) -> bool:
- """发送命令到设备"""
- with self.lock:
- if not self.running or self.status != DeviceStatus.CONNECTED:
- print(f"[{self.device_id}] Cannot send command, device not connected")
- return False
-
- try:
- if isinstance(command, str):
- command = command.encode('utf-8')
-
- self.command_queue.put(command)
- return True
-
- except Exception as e:
- print(f"[{self.device_id}] Error queuing command: {e}")
- return False
-
- def _worker(self):
- """工作线程,处理数据收发"""
- while self.running:
- try:
- # 处理命令发送
- try:
- while not self.command_queue.empty():
- command = self.command_queue.get_nowait()
- self._send_data(command)
- except queue.Empty:
- pass
-
- # 读取数据
- data = self._read_data()
- if data:
- # 处理数据
- self._process_data(data)
-
- # 短暂休眠,避免CPU占用过高
- time.sleep(0.01)
-
- except Exception as e:
- print(f"[{self.device_id}] Error in worker thread: {e}")
-
- if self.error_callback:
- self.error_callback(self.device_id, e)
-
- # 尝试重新连接
- self._handle_connection_error()
-
- def _send_data(self, data: bytes) -> bool:
- """发送数据到设备"""
- try:
- self.serial_conn.write(data)
- self.serial_conn.flush()
- print(f"[{self.device_id}] Sent data: {data}")
- return True
-
- except serial.SerialTimeoutException:
- print(f"[{self.device_id}] Timeout while sending data")
- return False
- except serial.SerialException as e:
- print(f"[{self.device_id}] Serial error while sending data: {e}")
- self._handle_connection_error()
- return False
- except Exception as e:
- print(f"[{self.device_id}] Unexpected error while sending data: {e}")
- return False
-
- def _read_data(self, size: int = 1) -> Optional[bytes]:
- """从设备读取数据"""
- try:
- data = self.serial_conn.read(size)
- if data:
- print(f"[{self.device_id}] Received data: {data}")
- return data
-
- except serial.SerialTimeoutException:
- # 超时是正常的,不记录错误
- return None
- except serial.SerialException as e:
- print(f"[{self.device_id}] Serial error while reading data: {e}")
- self._handle_connection_error()
- return None
- except Exception as e:
- print(f"[{self.device_id}] Unexpected error while reading data: {e}")
- return None
-
- def _read_line(self) -> Optional[bytes]:
- """从设备读取一行数据"""
- try:
- line = self.serial_conn.readline()
- if line:
- print(f"[{self.device_id}] Received line: {line}")
- return line
-
- except serial.SerialTimeoutException:
- # 超时是正常的,不记录错误
- return None
- except serial.SerialException as e:
- print(f"[{self.device_id}] Serial error while reading line: {e}")
- self._handle_connection_error()
- return None
- except Exception as e:
- print(f"[{self.device_id}] Unexpected error while reading line: {e}")
- return None
-
- def _process_data(self, data: bytes):
- """处理接收到的数据"""
- # 将数据放入队列
- self.data_queue.put(data)
-
- # 调用回调函数
- if self.data_callback:
- try:
- self.data_callback(self.device_id, data)
- except Exception as e:
- print(f"[{self.device_id}] Error in data callback: {e}")
-
- def _handle_connection_error(self):
- """处理连接错误"""
- with self.lock:
- self.status = DeviceStatus.RECONNECTING
-
- try:
- if self.serial_conn is not None and self.serial_conn.is_open:
- self.serial_conn.close()
- except:
- pass
- finally:
- self.serial_conn = None
-
- # 尝试重新连接
- if self.running:
- print(f"[{self.device_id}] Attempting to reconnect...")
- if not self.connect():
- self.status = DeviceStatus.ERROR
- print(f"[{self.device_id}] Failed to reconnect")
- else:
- print(f"[{self.device_id}] Reconnected successfully")
- class SerialDeviceManager:
- """串口设备管理器"""
- def __init__(self):
- self.devices: Dict[str, SerialDevice] = {}
- self.lock = threading.Lock()
- self.running = False
-
- def add_device(self, device_id: str, port: str, baudrate: int = 9600,
- device_type: DeviceType = DeviceType.CUSTOM_DEVICE,
- timeout: float = 1.0, reconnect_attempts: int = 3,
- reconnect_delay: float = 1.0) -> bool:
- """添加设备"""
- with self.lock:
- if device_id in self.devices:
- print(f"[Manager] Device {device_id} already exists")
- return False
-
- device = SerialDevice(
- device_id=device_id,
- port=port,
- baudrate=baudrate,
- device_type=device_type,
- timeout=timeout,
- reconnect_attempts=reconnect_attempts,
- reconnect_delay=reconnect_delay
- )
-
- # 设置回调函数
- device.set_data_callback(self._on_device_data)
- device.set_error_callback(self._on_device_error)
-
- self.devices[device_id] = device
- print(f"[Manager] Added device {device_id} on port {port}")
- return True
-
- def remove_device(self, device_id: str) -> bool:
- """移除设备"""
- with self.lock:
- if device_id not in self.devices:
- print(f"[Manager] Device {device_id} not found")
- return False
-
- device = self.devices[device_id]
- device.stop()
- del self.devices[device_id]
- print(f"[Manager] Removed device {device_id}")
- return True
-
- def start_device(self, device_id: str) -> bool:
- """启动设备"""
- with self.lock:
- if device_id not in self.devices:
- print(f"[Manager] Device {device_id} not found")
- return False
-
- device = self.devices[device_id]
- device.start()
- return True
-
- def stop_device(self, device_id: str) -> bool:
- """停止设备"""
- with self.lock:
- if device_id not in self.devices:
- print(f"[Manager] Device {device_id} not found")
- return False
-
- device = self.devices[device_id]
- device.stop()
- return True
-
- def start_all(self):
- """启动所有设备"""
- with self.lock:
- self.running = True
- for device_id, device in self.devices.items():
- device.start()
- print(f"[Manager] Started all devices")
-
- def stop_all(self):
- """停止所有设备"""
- with self.lock:
- self.running = False
- for device_id, device in self.devices.items():
- device.stop()
- print(f"[Manager] Stopped all devices")
-
- def send_command(self, device_id: str, command: str or bytes) -> bool:
- """发送命令到指定设备"""
- with self.lock:
- if device_id not in self.devices:
- print(f"[Manager] Device {device_id} not found")
- return False
-
- device = self.devices[device_id]
- return device.send_command(command)
-
- def get_device_status(self, device_id: str) -> Optional[DeviceStatus]:
- """获取设备状态"""
- with self.lock:
- if device_id not in self.devices:
- return None
-
- device = self.devices[device_id]
- return device.status
-
- def get_device_data(self, device_id: str, timeout: float = 1.0) -> Optional[bytes]:
- """获取设备数据"""
- with self.lock:
- if device_id not in self.devices:
- return None
-
- device = self.devices[device_id]
- try:
- return device.data_queue.get(timeout=timeout)
- except queue.Empty:
- return None
-
- def set_device_config(self, device_id: str, config: Dict[str, Any]) -> bool:
- """设置设备配置"""
- with self.lock:
- if device_id not in self.devices:
- return False
-
- device = self.devices[device_id]
- device.set_config(config)
- return True
-
- def get_device_config(self, device_id: str, key: str, default: Any = None) -> Any:
- """获取设备配置"""
- with self.lock:
- if device_id not in self.devices:
- return default
-
- device = self.devices[device_id]
- return device.get_config(key, default)
-
- def _on_device_data(self, device_id: str, data: bytes):
- """设备数据回调"""
- print(f"[Manager] Received data from {device_id}: {data}")
- # 这里可以添加全局数据处理逻辑
-
- def _on_device_error(self, device_id: str, error: Exception):
- """设备错误回调"""
- print(f"[Manager] Error from {device_id}: {error}")
- # 这里可以添加全局错误处理逻辑
- # 使用示例
- def main():
- # 创建设备管理器
- manager = SerialDeviceManager()
-
- try:
- # 添加设备
- manager.add_device("temp_sensor", "COM1", 9600, DeviceType.TEMPERATURE_SENSOR)
- manager.add_device("humidity_sensor", "COM2", 9600, DeviceType.HUMIDITY_SENSOR)
- manager.add_device("gps_module", "COM3", 4800, DeviceType.GPS_MODULE)
-
- # 设置设备配置
- manager.set_device_config("temp_sensor", {"sample_rate": 1.0, "unit": "celsius"})
- manager.set_device_config("humidity_sensor", {"sample_rate": 2.0, "unit": "percent"})
- manager.set_device_config("gps_module", {"update_rate": 1.0, "format": "NMEA"})
-
- # 启动所有设备
- manager.start_all()
-
- # 主循环
- running = True
- while running:
- try:
- # 获取并处理设备数据
- for device_id in ["temp_sensor", "humidity_sensor", "gps_module"]:
- data = manager.get_device_data(device_id, timeout=0.1)
- if data:
- try:
- # 尝试解析JSON数据
- parsed_data = json.loads(data.decode('utf-8'))
- print(f"[Main] Parsed data from {device_id}: {parsed_data}")
- except json.JSONDecodeError:
- print(f"[Main] Non-JSON data from {device_id}: {data}")
-
- # 定期发送命令
- current_time = time.time()
- if int(current_time) % 10 == 0: # 每10秒发送一次命令
- manager.send_command("temp_sensor", "GET_DATA\n")
- manager.send_command("humidity_sensor", "GET_DATA\n")
- manager.send_command("gps_module", "GET_LOCATION\n")
-
- # 短暂休眠,避免CPU占用过高
- time.sleep(0.1)
-
- except KeyboardInterrupt:
- print("\n[Main] Received keyboard interrupt, shutting down...")
- running = False
-
- finally:
- # 确保所有设备都停止
- manager.stop_all()
- if __name__ == "__main__":
- main()
复制代码
这个案例实现了一个多设备串口通信管理器,具有以下特点:
1. 多设备管理:可以同时管理多个串口设备,每个设备独立运行。
2. 线程安全:使用锁确保多线程环境下的安全操作。
3. 异步通信:每个设备在独立的线程中运行,不会阻塞主线程。
4. 队列机制:使用队列处理命令发送和数据接收,提高通信效率。
5. 回调机制:支持数据回调和错误回调,便于处理设备事件。
6. 设备状态管理:跟踪每个设备的状态,支持自动重连。
7. 配置管理:支持设备特定的配置。
8. 资源管理:确保在任何情况下都能正确释放串口资源。
常见问题与解决方案
现象:尝试打开串口时,提示”Access denied”或”Port is already open”。
原因:
1. 串口已被其他程序占用。
2. 之前运行的程序未正确关闭串口。
3. 没有足够的权限访问串口。
解决方案:
1. 检查串口占用:
“`python
import serial.tools.list_ports
# 列出所有可用串口
ports = serial.tools.list_ports.comports()
for port in ports:
- print(f"Device: {port.device}, Description: {port.description}")
复制代码- 2. **强制关闭串口**:
- ```python
- import serial
-
- def force_close_port(port):
- try:
- # 尝试打开并立即关闭串口
- ser = serial.Serial(port)
- ser.close()
- print(f"Port {port} closed successfully")
- return True
- except serial.SerialException as e:
- print(f"Failed to close port {port}: {e}")
- return False
复制代码
1. 使用管理员权限运行程序:在某些系统上,访问串口可能需要管理员权限。
现象:程序异常终止后,串口仍然被占用,无法重新打开。
原因:程序崩溃时未执行串口关闭代码。
解决方案:
1. 使用try-finally确保资源释放:
“`python
import serial
ser = None
try:
- ser = serial.Serial('COM1', 9600)
- # 执行操作...
复制代码
finally:
- if ser is not None and ser.is_open:
- ser.close()
复制代码- 2. **使用信号处理器**:
- ```python
- import signal
- import serial
-
- ser = None
-
- def signal_handler(signum, frame):
- global ser
- if ser is not None and ser.is_open:
- ser.close()
- exit(0)
-
- # 注册信号处理器
- signal.signal(signal.SIGINT, signal_handler)
- signal.signal(signal.SIGTERM, signal_handler)
-
- # 主程序
- try:
- ser = serial.Serial('COM1', 9600)
- # 执行操作...
- except Exception as e:
- print(f"Error: {e}")
- finally:
- if ser is not None and ser.is_open:
- ser.close()
复制代码
1. 使用atexit注册退出函数:
“`python
import atexit
import serial
ser = None
def cleanup():
- global ser
- if ser is not None and ser.is_open:
- ser.close()
复制代码
# 注册退出函数
atexit.register(cleanup)
# 主程序
try:
- ser = serial.Serial('COM1', 9600)
- # 执行操作...
复制代码
except Exception as e:
- #### 问题3:串口通信不稳定,数据丢失
- **现象**:串口通信过程中数据丢失或损坏。
- **原因**:
- 1. 缓冲区溢出。
- 2. 数据接收不及时。
- 3. 串口参数不匹配。
- 4. 硬件连接问题。
- **解决方案**:
- 1. **调整缓冲区大小**:
- ```python
- import serial
-
- ser = serial.Serial('COM1', 9600)
-
- # 设置输入缓冲区大小
- ser.set_buffer_size(rx_size=65536, tx_size=65536)
复制代码
1. 使用流控制:
“`python
import serial
ser = serial.Serial(
- 'COM1',
- 9600,
- rtscts=True, # 启用硬件流控制
- xonxoff=True # 启用软件流控制
复制代码
)
- 3. **实现数据校验和重传机制**:
- ```python
- import serial
- import time
-
- class ReliableSerial:
- def __init__(self, port, baudrate=9600, max_retries=3, retry_delay=0.1):
- self.serial = serial.Serial(port, baudrate)
- self.max_retries = max_retries
- self.retry_delay = retry_delay
-
- def send_with_ack(self, data, ack=b'ACK', timeout=1.0):
- """发送数据并等待确认"""
- retries = 0
- while retries < self.max_retries:
- try:
- # 发送数据
- self.serial.write(data)
- self.serial.flush()
-
- # 等待确认
- start_time = time.time()
- while time.time() - start_time < timeout:
- response = self.serial.read(len(ack))
- if response == ack:
- return True
- time.sleep(0.01)
-
- # 超时,增加重试计数
- retries += 1
- if retries < self.max_retries:
- time.sleep(self.retry_delay)
-
- except Exception as e:
- print(f"Error in send_with_ack: {e}")
- retries += 1
- if retries < self.max_retries:
- time.sleep(self.retry_delay)
-
- return False
-
- def close(self):
- if self.serial.is_open:
- self.serial.close()
复制代码
1. 使用多线程或异步IO:
“`python
import serial
import threading
import queue
class AsyncSerial:
- def __init__(self, port, baudrate=9600):
- self.serial = serial.Serial(port, baudrate)
- self.receive_queue = queue.Queue()
- self.running = False
- self.receive_thread = None
- def start(self):
- """启动接收线程"""
- if not self.running:
- self.running = True
- self.receive_thread = threading.Thread(target=self._receive_loop)
- self.receive_thread.daemon = True
- self.receive_thread.start()
- def stop(self):
- """停止接收线程"""
- self.running = False
- if self.receive_thread and self.receive_thread.is_alive():
- self.receive_thread.join(timeout=1.0)
- def _receive_loop(self):
- """接收循环"""
- while self.running:
- try:
- if self.serial.in_waiting > 0:
- data = self.serial.read(self.serial.in_waiting)
- self.receive_queue.put(data)
- else:
- time.sleep(0.01) # 短暂休眠,避免CPU占用过高
- except Exception as e:
- print(f"Error in receive loop: {e}")
- time.sleep(0.1)
- def send(self, data):
- """发送数据"""
- try:
- self.serial.write(data)
- self.serial.flush()
- return True
- except Exception as e:
- print(f"Error sending data: {e}")
- return False
- def receive(self, timeout=1.0):
- """接收数据"""
- try:
- return self.receive_queue.get(timeout=timeout)
- except queue.Empty:
- return None
- def close(self):
- """关闭串口"""
- self.stop()
- if self.serial.is_open:
- self.serial.close()
复制代码- #### 问题4:跨平台串口兼容性问题
- **现象**:程序在不同操作系统上表现不一致,特别是在串口命名和参数方面。
- **原因**:不同操作系统对串口的命名方式和参数支持不同。
- **解决方案**:
- 1. **使用条件判断处理不同平台**:
- ```python
- import serial
- import sys
- import platform
-
- def get_platform_compatible_port(port):
- """获取平台兼容的串口名称"""
- system = platform.system()
-
- if system == 'Windows':
- # Windows平台使用COMx格式
- if not port.upper().startswith('COM'):
- return f'COM{port}'
- return port
-
- elif system == 'Linux':
- # Linux平台使用/dev/ttyXXX格式
- if not port.startswith('/dev/'):
- return f'/dev/{port}'
- return port
-
- elif system == 'Darwin': # macOS
- # macOS平台使用/dev/cu.XXX或/dev/tty.XXX格式
- if not port.startswith('/dev/'):
- return f'/dev/cu.{port}'
- return port
-
- else:
- # 未知平台,返回原始名称
- return port
-
- def get_platform_compatible_params():
- """获取平台兼容的串口参数"""
- system = platform.system()
-
- params = {
- 'baudrate': 9600,
- 'bytesize': serial.EIGHTBITS,
- 'parity': serial.PARITY_NONE,
- 'stopbits': serial.STOPBITS_ONE,
- 'timeout': 1
- }
-
- # 根据平台调整参数
- if system == 'Windows':
- params['rtscts'] = False # Windows默认不启用RTS/CTS流控制
- elif system == 'Linux':
- params['rtscts'] = True # Linux可以启用RTS/CTS流控制
-
- return params
-
- # 使用示例
- port_name = "1" # 可以是"1", "COM1", "/dev/ttyS1"等
- compatible_port = get_platform_compatible_port(port_name)
- compatible_params = get_platform_compatible_params()
-
- try:
- ser = serial.Serial(compatible_port, **compatible_params)
- print(f"Successfully opened {compatible_port}")
- # 执行操作...
- ser.close()
- except serial.SerialException as e:
- print(f"Failed to open {compatible_port}: {e}")
复制代码
1. 使用serial.tools.list_ports检测可用串口:
“`python
import serial
import serial.tools.list_ports
def list_available_ports():
- """列出所有可用串口"""
- ports = serial.tools.list_ports.comports()
- result = []
- for port in ports:
- result.append({
- 'device': port.device,
- 'description': port.description,
- 'hwid': port.hwid,
- 'vid': port.vid if hasattr(port, 'vid') else None,
- 'pid': port.pid if hasattr(port, 'pid') else None,
- 'serial_number': port.serial_number if hasattr(port, 'serial_number') else None,
- 'manufacturer': port.manufacturer if hasattr(port, 'manufacturer') else None,
- 'product': port.product if hasattr(port, 'product') else None,
- })
- return result
复制代码
def find_port_by_vid_pid(vid, pid):
- """通过VID和PID查找串口"""
- ports = serial.tools.list_ports.comports()
- for port in ports:
- if hasattr(port, 'vid') and hasattr(port, 'pid'):
- if port.vid == vid and port.pid == pid:
- return port.device
- return None
复制代码
def find_port_by_description(description_substring):
- """通过描述查找串口"""
- ports = serial.tools.list_ports.comports()
- for port in ports:
- if description_substring.lower() in port.description.lower():
- return port.device
- return None
复制代码
# 使用示例
available_ports = list_available_ports()
print(“Available ports:”)
for port_info in available_ports:
- print(f"Device: {port_info['device']}, Description: {port_info['description']}")
复制代码
# 通过VID和PID查找特定设备
arduino_port = find_port_by_vid_pid(0x2341, 0x0043) # Arduino Uno的VID和PID
if arduino_port:
- print(f"Found Arduino at: {arduino_port}")
复制代码
# 通过描述查找设备
ftdi_port = find_port_by_description(“FTDI”)
if ftdi_port:
- print(f"Found FTDI device at: {ftdi_port}")
复制代码
”`
总结
在Python中进行串口编程时,正确管理串口资源是确保程序稳定运行的关键。本文详细介绍了串口资源的释放方法,包括使用try-finally结构、with语句(上下文管理器)以及自定义串口管理类等技术。我们还探讨了异常处理与串口释放的关系,并提供了多个实际应用案例,帮助读者理解如何在实际项目中应用这些技术。
通过遵循本文提供的最佳实践,开发者可以避免常见的串口资源管理问题,如设备占用、程序崩溃和资源泄漏等。同时,我们还讨论了常见问题的解决方案,帮助开发者快速定位和解决串口通信中的问题。
串口通信是许多嵌入式系统和工业控制应用的基础,掌握正确的资源管理技术对于开发稳定可靠的应用程序至关重要。希望本文能够帮助读者更好地理解和应用Python串口编程技术,开发出更加健壮的应用程序。 |
|