活动公告

系统通知
05-18 21:22
系统通知
通知:本站资源由网友上传分享,如有违规等问题请到版务模块进行投诉,资源失效请在帖子内回复要求补档,会尽快处理!
10-23 09:31

Python串口释放详解避免程序崩溃与设备占用问题

SunJu_FaceMall

3万

主题

2860

科技点

3万

积分

白金月票

碾压王

积分
32872

塔罗立华奏

<font color=白金月票" /> 发表于 2025-9-21 16:30:00 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
Python串口释放详解避免程序崩溃与设备占用问题

串口通信基础

串口通信是一种常见的设备通信方式,在嵌入式系统、工业控制、传感器数据采集等领域广泛应用。在Python中,我们通常使用pyserial库来处理串口通信,它提供了简单而强大的接口来实现串口数据的读写操作。

串口是一种系统资源,当应用程序打开串口后,该串口会被占用,其他应用程序将无法访问它,直到当前应用程序释放该串口。因此,正确地管理串口资源,特别是在程序异常终止或崩溃的情况下,对于系统的稳定性和资源的合理利用至关重要。

Python串口编程基础

在Python中,使用pyserial库进行串口编程的基本流程如下:
  1. import serial
  2. # 打开串口
  3. ser = serial.Serial('COM1', 9600, timeout=1)
  4. # 读写数据
  5. ser.write(b'Hello World')
  6. data = ser.readline()
  7. # 关闭串口
  8. ser.close()
复制代码

这种简单的串口操作在正常情况下可以工作,但如果在读写数据过程中发生异常,ser.close()可能不会被执行,导致串口资源无法释放。

串口资源未释放的问题

当串口资源未正确释放时,会出现以下问题:

1. 设备占用:串口设备保持打开状态,其他程序无法访问该串口。
2. 资源泄漏:系统资源被持续占用,可能导致系统性能下降。
3. 程序崩溃:在多次运行程序时,尝试访问已被占用但未正确释放的串口会导致程序崩溃。
4. 设备异常:某些设备在连接异常断开后可能需要重置才能正常工作。

正确的串口释放方法

最基本的串口资源管理方法是使用try-finally结构:
  1. import serial
  2. ser = None
  3. try:
  4.     ser = serial.Serial('COM1', 9600, timeout=1)
  5.     ser.write(b'Hello World')
  6.     data = ser.readline()
  7.     print(f"Received: {data}")
  8. finally:
  9.     if ser is not None and ser.is_open:
  10.         ser.close()
  11.         print("Serial port closed")
复制代码

这种方法确保无论是否发生异常,finally块中的代码都会被执行,从而保证串口被正确关闭。

Python的with语句提供了一种更优雅的资源管理方式。pyserial库的Serial类支持上下文管理器协议:
  1. import serial
  2. try:
  3.     with serial.Serial('COM1', 9600, timeout=1) as ser:
  4.         ser.write(b'Hello World')
  5.         data = ser.readline()
  6.         print(f"Received: {data}")
  7.     # 串口会在with块结束时自动关闭
  8. except serial.SerialException as e:
  9.     print(f"Serial error: {e}")
  10. except Exception as e:
  11.     print(f"Error: {e}")
复制代码

这种方法更加简洁,并且自动处理了资源的释放。

对于更复杂的应用场景,可以创建一个自定义的串口管理类,封装更多的功能:
  1. import serial
  2. import time
  3. from contextlib import contextmanager
  4. class SerialManager:
  5.     def __init__(self, port, baudrate=9600, timeout=1):
  6.         self.port = port
  7.         self.baudrate = baudrate
  8.         self.timeout = timeout
  9.         self.serial_conn = None
  10.    
  11.     def connect(self):
  12.         """连接串口"""
  13.         try:
  14.             self.serial_conn = serial.Serial(
  15.                 port=self.port,
  16.                 baudrate=self.baudrate,
  17.                 timeout=self.timeout
  18.             )
  19.             print(f"Connected to {self.port} at {self.baudrate} baud")
  20.             return True
  21.         except serial.SerialException as e:
  22.             print(f"Failed to connect: {e}")
  23.             return False
  24.    
  25.     def disconnect(self):
  26.         """断开串口连接"""
  27.         if self.serial_conn is not None and self.serial_conn.is_open:
  28.             self.serial_conn.close()
  29.             print("Serial port disconnected")
  30.    
  31.     def send_data(self, data):
  32.         """发送数据"""
  33.         if self.serial_conn is not None and self.serial_conn.is_open:
  34.             try:
  35.                 if isinstance(data, str):
  36.                     data = data.encode('utf-8')
  37.                 self.serial_conn.write(data)
  38.                 return True
  39.             except serial.SerialException as e:
  40.                 print(f"Failed to send data: {e}")
  41.                 return False
  42.         return False
  43.    
  44.     def read_data(self, size=1):
  45.         """读取数据"""
  46.         if self.serial_conn is not None and self.serial_conn.is_open:
  47.             try:
  48.                 return self.serial_conn.read(size)
  49.             except serial.SerialException as e:
  50.                 print(f"Failed to read data: {e}")
  51.                 return None
  52.         return None
  53.    
  54.     def read_line(self):
  55.         """读取一行数据"""
  56.         if self.serial_conn is not None and self.serial_conn.is_open:
  57.             try:
  58.                 return self.serial_conn.readline()
  59.             except serial.SerialException as e:
  60.                 print(f"Failed to read line: {e}")
  61.                 return None
  62.         return None
  63.    
  64.     def __enter__(self):
  65.         """上下文管理器入口"""
  66.         self.connect()
  67.         return self
  68.    
  69.     def __exit__(self, exc_type, exc_val, exc_tb):
  70.         """上下文管理器出口"""
  71.         self.disconnect()
  72.         # 不处理异常,让它们正常传播
  73.         return False
  74. # 使用示例
  75. def send_command_and_receive_response(command, expected_response=None, timeout=5):
  76.     try:
  77.         with SerialManager('COM1', 9600) as ser:
  78.             # 发送命令
  79.             ser.send_data(command)
  80.             print(f"Sent: {command}")
  81.             
  82.             # 等待并读取响应
  83.             start_time = time.time()
  84.             response = b""
  85.             
  86.             while time.time() - start_time < timeout:
  87.                 line = ser.read_line()
  88.                 if line:
  89.                     response += line
  90.                     print(f"Received: {line.decode('utf-8', errors='replace').strip()}")
  91.                     
  92.                     # 如果有期望响应,检查是否收到
  93.                     if expected_response and expected_response.encode('utf-8') in response:
  94.                         return True
  95.             
  96.             # 如果没有期望响应或超时,返回已接收的数据
  97.             return response.decode('utf-8', errors='replace')
  98.    
  99.     except Exception as e:
  100.         print(f"Error in send_command_and_receive_response: {e}")
  101.         return None
  102. # 测试
  103. result = send_command_and_receive_response("AT\r\n", "OK")
  104. print(f"Result: {result}")
复制代码

这种封装方式提供了更高级的抽象,使代码更加模块化和可重用。

异常处理与串口释放

在串口通信中,可能会遇到各种异常情况,如设备断开连接、权限问题、参数错误等。正确处理这些异常对于确保串口资源被正确释放至关重要。

1. serial.SerialException:所有串口相关异常的基类。
2. serial.SerialTimeoutException:读写操作超时。
3. serial.PortNotOpenError:尝试在未打开的端口上进行操作。
4. serial.SerialIOException:串口I/O错误。
  1. import serial
  2. import time
  3. import threading
  4. import logging
  5. # 配置日志
  6. logging.basicConfig(
  7.     level=logging.INFO,
  8.     format='%(asctime)s - %(levelname)s - %(message)s'
  9. )
  10. logger = logging.getLogger(__name__)
  11. class RobustSerialManager:
  12.     def __init__(self, port, baudrate=9600, timeout=1, reconnect_attempts=3, reconnect_delay=1):
  13.         self.port = port
  14.         self.baudrate = baudrate
  15.         self.timeout = timeout
  16.         self.reconnect_attempts = reconnect_attempts
  17.         self.reconnect_delay = reconnect_delay
  18.         self.serial_conn = None
  19.         self.lock = threading.Lock()
  20.         self._should_reconnect = True
  21.    
  22.     def connect(self):
  23.         """连接串口,带有重试机制"""
  24.         with self.lock:
  25.             if self.serial_conn is not None and self.serial_conn.is_open:
  26.                 return True
  27.             
  28.             attempts = 0
  29.             while attempts < self.reconnect_attempts and self._should_reconnect:
  30.                 try:
  31.                     attempts += 1
  32.                     logger.info(f"Attempting to connect to {self.port} (attempt {attempts}/{self.reconnect_attempts})")
  33.                     
  34.                     self.serial_conn = serial.Serial(
  35.                         port=self.port,
  36.                         baudrate=self.baudrate,
  37.                         timeout=self.timeout
  38.                     )
  39.                     
  40.                     logger.info(f"Successfully connected to {self.port} at {self.baudrate} baud")
  41.                     return True
  42.                
  43.                 except serial.SerialException as e:
  44.                     logger.warning(f"Connection attempt {attempts} failed: {e}")
  45.                     if attempts < self.reconnect_attempts and self._should_reconnect:
  46.                         time.sleep(self.reconnect_delay)
  47.             
  48.             logger.error(f"Failed to connect to {self.port} after {self.reconnect_attempts} attempts")
  49.             return False
  50.    
  51.     def disconnect(self):
  52.         """断开串口连接"""
  53.         with self.lock:
  54.             self._should_reconnect = False  # 停止任何正在进行的重连尝试
  55.             
  56.             if self.serial_conn is not None and self.serial_conn.is_open:
  57.                 try:
  58.                     self.serial_conn.close()
  59.                     logger.info("Serial port disconnected")
  60.                 except Exception as e:
  61.                     logger.error(f"Error while disconnecting: {e}")
  62.                 finally:
  63.                     self.serial_conn = None
  64.    
  65.     def send_data(self, data):
  66.         """发送数据,带有连接检查和重试机制"""
  67.         with self.lock:
  68.             if not self._ensure_connection():
  69.                 return False
  70.             
  71.             try:
  72.                 if isinstance(data, str):
  73.                     data = data.encode('utf-8')
  74.                
  75.                 self.serial_conn.write(data)
  76.                 logger.debug(f"Sent data: {data}")
  77.                 return True
  78.             
  79.             except serial.SerialTimeoutException:
  80.                 logger.warning("Timeout while sending data")
  81.                 return False
  82.             except serial.SerialException as e:
  83.                 logger.error(f"Serial error while sending data: {e}")
  84.                 self._handle_connection_error()
  85.                 return False
  86.             except Exception as e:
  87.                 logger.error(f"Unexpected error while sending data: {e}")
  88.                 return False
  89.    
  90.     def read_data(self, size=1):
  91.         """读取数据,带有连接检查和重试机制"""
  92.         with self.lock:
  93.             if not self._ensure_connection():
  94.                 return None
  95.             
  96.             try:
  97.                 data = self.serial_conn.read(size)
  98.                 logger.debug(f"Received data: {data}")
  99.                 return data
  100.             
  101.             except serial.SerialTimeoutException:
  102.                 logger.debug("Timeout while reading data (this may be normal)")
  103.                 return b""  # 返回空字节而不是None,表示超时但连接仍然正常
  104.             except serial.SerialException as e:
  105.                 logger.error(f"Serial error while reading data: {e}")
  106.                 self._handle_connection_error()
  107.                 return None
  108.             except Exception as e:
  109.                 logger.error(f"Unexpected error while reading data: {e}")
  110.                 return None
  111.    
  112.     def read_line(self):
  113.         """读取一行数据,带有连接检查和重试机制"""
  114.         with self.lock:
  115.             if not self._ensure_connection():
  116.                 return None
  117.             
  118.             try:
  119.                 line = self.serial_conn.readline()
  120.                 if line:
  121.                     logger.debug(f"Received line: {line}")
  122.                 return line
  123.             
  124.             except serial.SerialTimeoutException:
  125.                 logger.debug("Timeout while reading line (this may be normal)")
  126.                 return b""  # 返回空字节而不是None,表示超时但连接仍然正常
  127.             except serial.SerialException as e:
  128.                 logger.error(f"Serial error while reading line: {e}")
  129.                 self._handle_connection_error()
  130.                 return None
  131.             except Exception as e:
  132.                 logger.error(f"Unexpected error while reading line: {e}")
  133.                 return None
  134.    
  135.     def _ensure_connection(self):
  136.         """确保串口连接,如果断开则尝试重新连接"""
  137.         if self.serial_conn is None or not self.serial_conn.is_open:
  138.             return self.connect()
  139.         return True
  140.    
  141.     def _handle_connection_error(self):
  142.         """处理连接错误,关闭当前连接并准备重连"""
  143.         try:
  144.             if self.serial_conn is not None and self.serial_conn.is_open:
  145.                 self.serial_conn.close()
  146.         except:
  147.             pass
  148.         finally:
  149.             self.serial_conn = None
  150.    
  151.     def __enter__(self):
  152.         """上下文管理器入口"""
  153.         self._should_reconnect = True
  154.         self.connect()
  155.         return self
  156.    
  157.     def __exit__(self, exc_type, exc_val, exc_tb):
  158.         """上下文管理器出口"""
  159.         self.disconnect()
  160.         # 不处理异常,让它们正常传播
  161.         return False
  162. # 使用示例
  163. def communicate_with_device():
  164.     try:
  165.         with RobustSerialManager('COM1', 9600, timeout=1, reconnect_attempts=3) as ser:
  166.             # 发送AT命令
  167.             if not ser.send_data("AT\r\n"):
  168.                 logger.error("Failed to send AT command")
  169.                 return False
  170.             
  171.             # 读取响应
  172.             response = b""
  173.             start_time = time.time()
  174.             timeout = 5  # 5秒超时
  175.             
  176.             while time.time() - start_time < timeout:
  177.                 line = ser.read_line()
  178.                 if line:
  179.                     response += line
  180.                     logger.info(f"Received: {line.decode('utf-8', errors='replace').strip()}")
  181.                     
  182.                     # 检查是否收到OK响应
  183.                     if b"OK" in line:
  184.                         logger.info("Received OK response")
  185.                         return True
  186.                
  187.                 # 短暂休眠,避免CPU占用过高
  188.                 time.sleep(0.1)
  189.             
  190.             logger.warning("No OK response received within timeout")
  191.             return False
  192.    
  193.     except Exception as e:
  194.         logger.error(f"Error in communicate_with_device: {e}")
  195.         return False
  196. # 测试
  197. result = communicate_with_device()
  198. logger.info(f"Communication result: {'Success' if result else 'Failed'}")
复制代码

这个示例实现了一个健壮的串口管理器,具有以下特点:

1. 线程安全:使用锁确保多线程环境下的安全操作。
2. 自动重连:在连接断开时自动尝试重新连接。
3. 全面的异常处理:处理各种可能的异常情况。
4. 日志记录:详细记录操作过程和错误信息。
5. 资源管理:确保在任何情况下都能正确释放串口资源。

串口通信的最佳实践

• 始终使用上下文管理器:使用with语句或实现自定义的上下文管理器,确保资源被正确释放。
• 避免全局串口对象:全局对象使得资源管理变得复杂,应尽量使用局部对象或通过类封装。
• 显式关闭串口:即使使用了上下文管理器,在程序结束前也显式关闭所有打开的串口。

• 捕获特定异常:尽量捕获特定的异常类型,而不是使用通用的except:。
• 记录异常信息:使用日志记录异常信息,便于调试和问题追踪。
• 优雅降级:在发生异常时,提供合理的降级方案,而不是直接崩溃。

• 合理的超时设置:根据实际需求设置合适的超时时间,避免过长或过短。
• 批量操作:对于大量数据传输,考虑使用批量操作而非频繁的小数据传输。
• 异步通信:对于需要同时处理多个任务的应用,考虑使用异步串口通信。

• 封装串口操作:将串口操作封装在类或模块中,提供清晰的接口。
• 分离业务逻辑:将串口通信逻辑与业务逻辑分离,提高代码的可维护性。
• 单元测试:为串口通信代码编写单元测试,确保其正确性和稳定性。

实际应用案例
  1. import serial
  2. import time
  3. import json
  4. import csv
  5. from datetime import datetime
  6. import os
  7. import signal
  8. import sys
  9. class SensorDataCollector:
  10.     def __init__(self, port, baudrate=9600, output_file='sensor_data.csv', max_retries=3):
  11.         self.port = port
  12.         self.baudrate = baudrate
  13.         self.output_file = output_file
  14.         self.max_retries = max_retries
  15.         self.serial_conn = None
  16.         self.running = False
  17.         self._setup_signal_handlers()
  18.    
  19.     def _setup_signal_handlers(self):
  20.         """设置信号处理器,确保程序退出时正确关闭资源"""
  21.         signal.signal(signal.SIGINT, self._signal_handler)
  22.         signal.signal(signal.SIGTERM, self._signal_handler)
  23.    
  24.     def _signal_handler(self, signum, frame):
  25.         """信号处理器,优雅地关闭程序"""
  26.         print(f"\nReceived signal {signum}, shutting down...")
  27.         self.stop()
  28.         sys.exit(0)
  29.    
  30.     def start(self):
  31.         """启动数据采集"""
  32.         if self.running:
  33.             print("Data collection is already running")
  34.             return False
  35.         
  36.         self.running = True
  37.         try:
  38.             # 初始化CSV文件
  39.             self._init_csv_file()
  40.             
  41.             # 连接串口
  42.             if not self._connect_serial():
  43.                 return False
  44.             
  45.             # 开始采集数据
  46.             self._collect_data()
  47.             return True
  48.         
  49.         except Exception as e:
  50.             print(f"Error starting data collection: {e}")
  51.             self.stop()
  52.             return False
  53.    
  54.     def stop(self):
  55.         """停止数据采集"""
  56.         self.running = False
  57.         self._disconnect_serial()
  58.         print("Data collection stopped")
  59.    
  60.     def _init_csv_file(self):
  61.         """初始化CSV文件,写入表头"""
  62.         file_exists = os.path.isfile(self.output_file)
  63.         
  64.         with open(self.output_file, 'a', newline='') as csvfile:
  65.             fieldnames = ['timestamp', 'temperature', 'humidity', 'pressure']
  66.             writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
  67.             
  68.             if not file_exists:
  69.                 writer.writeheader()
  70.    
  71.     def _connect_serial(self):
  72.         """连接串口,带有重试机制"""
  73.         retries = 0
  74.         while retries < self.max_retries:
  75.             try:
  76.                 retries += 1
  77.                 print(f"Attempting to connect to {self.port} (attempt {retries}/{self.max_retries})")
  78.                
  79.                 self.serial_conn = serial.Serial(
  80.                     port=self.port,
  81.                     baudrate=self.baudrate,
  82.                     timeout=1
  83.                 )
  84.                
  85.                 print(f"Successfully connected to {self.port}")
  86.                 return True
  87.             
  88.             except serial.SerialException as e:
  89.                 print(f"Connection attempt {retries} failed: {e}")
  90.                 if retries < self.max_retries:
  91.                     time.sleep(1)  # 等待1秒后重试
  92.         
  93.         print(f"Failed to connect to {self.port} after {self.max_retries} attempts")
  94.         return False
  95.    
  96.     def _disconnect_serial(self):
  97.         """断开串口连接"""
  98.         if self.serial_conn is not None and self.serial_conn.is_open:
  99.             try:
  100.                 self.serial_conn.close()
  101.                 print("Serial port disconnected")
  102.             except Exception as e:
  103.                 print(f"Error disconnecting serial port: {e}")
  104.             finally:
  105.                 self.serial_conn = None
  106.    
  107.     def _collect_data(self):
  108.         """采集数据并写入CSV文件"""
  109.         print("Starting data collection...")
  110.         
  111.         while self.running:
  112.             try:
  113.                 # 读取一行数据
  114.                 line = self.serial_conn.readline()
  115.                 if not line:
  116.                     continue
  117.                
  118.                 # 解析JSON数据
  119.                 try:
  120.                     data = json.loads(line.decode('utf-8').strip())
  121.                 except json.JSONDecodeError:
  122.                     print(f"Invalid JSON data: {line}")
  123.                     continue
  124.                
  125.                 # 验证数据格式
  126.                 if not all(key in data for key in ['temperature', 'humidity', 'pressure']):
  127.                     print(f"Incomplete data: {data}")
  128.                     continue
  129.                
  130.                 # 添加时间戳
  131.                 data['timestamp'] = datetime.now().isoformat()
  132.                
  133.                 # 写入CSV文件
  134.                 self._write_to_csv(data)
  135.                
  136.                 # 打印数据
  137.                 print(f"Collected: {data}")
  138.                
  139.             except serial.SerialException as e:
  140.                 print(f"Serial error: {e}")
  141.                 # 尝试重新连接
  142.                 self._disconnect_serial()
  143.                 if not self._connect_serial():
  144.                     print("Failed to reconnect, stopping data collection")
  145.                     break
  146.             
  147.             except Exception as e:
  148.                 print(f"Unexpected error: {e}")
  149.                 # 短暂休眠后继续
  150.                 time.sleep(1)
  151.    
  152.     def _write_to_csv(self, data):
  153.         """将数据写入CSV文件"""
  154.         try:
  155.             with open(self.output_file, 'a', newline='') as csvfile:
  156.                 fieldnames = ['timestamp', 'temperature', 'humidity', 'pressure']
  157.                 writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
  158.                 writer.writerow(data)
  159.         except Exception as e:
  160.             print(f"Error writing to CSV file: {e}")
  161. # 使用示例
  162. if __name__ == "__main__":
  163.     collector = SensorDataCollector('COM1', 9600, 'sensor_data.csv')
  164.    
  165.     try:
  166.         collector.start()
  167.     except KeyboardInterrupt:
  168.         collector.stop()
复制代码

这个案例实现了一个传感器数据采集系统,具有以下特点:

1. 健壮的资源管理:使用信号处理器确保程序退出时正确关闭资源。
2. 自动重连机制:在连接断开时自动尝试重新连接。
3. 数据持久化:将采集的数据保存到CSV文件中。
4. 错误处理:处理各种可能的异常情况,确保程序稳定运行。
5. 优雅关闭:支持通过Ctrl+C或系统信号优雅地关闭程序。
  1. import serial
  2. import threading
  3. import queue
  4. import time
  5. import json
  6. from enum import Enum, auto
  7. from typing import Dict, List, Optional, Callable, Any
  8. class DeviceType(Enum):
  9.     """设备类型枚举"""
  10.     TEMPERATURE_SENSOR = auto()
  11.     HUMIDITY_SENSOR = auto()
  12.     PRESSURE_SENSOR = auto()
  13.     GPS_MODULE = auto()
  14.     CUSTOM_DEVICE = auto()
  15. class DeviceStatus(Enum):
  16.     """设备状态枚举"""
  17.     DISCONNECTED = auto()
  18.     CONNECTING = auto()
  19.     CONNECTED = auto()
  20.     ERROR = auto()
  21.     RECONNECTING = auto()
  22. class SerialDevice:
  23.     """串口设备类"""
  24.     def __init__(self, device_id: str, port: str, baudrate: int = 9600,
  25.                  device_type: DeviceType = DeviceType.CUSTOM_DEVICE,
  26.                  timeout: float = 1.0, reconnect_attempts: int = 3,
  27.                  reconnect_delay: float = 1.0):
  28.         self.device_id = device_id
  29.         self.port = port
  30.         self.baudrate = baudrate
  31.         self.device_type = device_type
  32.         self.timeout = timeout
  33.         self.reconnect_attempts = reconnect_attempts
  34.         self.reconnect_delay = reconnect_delay
  35.         
  36.         self.serial_conn = None
  37.         self.status = DeviceStatus.DISCONNECTED
  38.         self.lock = threading.Lock()
  39.         self.data_queue = queue.Queue()
  40.         self.command_queue = queue.Queue()
  41.         self.running = False
  42.         self.worker_thread = None
  43.         self.data_callback = None
  44.         self.error_callback = None
  45.         
  46.         # 设备特定配置
  47.         self.config = {}
  48.    
  49.     def set_config(self, config: Dict[str, Any]):
  50.         """设置设备配置"""
  51.         with self.lock:
  52.             self.config.update(config)
  53.    
  54.     def get_config(self, key: str, default: Any = None) -> Any:
  55.         """获取设备配置"""
  56.         with self.lock:
  57.             return self.config.get(key, default)
  58.    
  59.     def set_data_callback(self, callback: Callable[[str, bytes], None]):
  60.         """设置数据回调函数"""
  61.         self.data_callback = callback
  62.    
  63.     def set_error_callback(self, callback: Callable[[str, Exception], None]):
  64.         """设置错误回调函数"""
  65.         self.error_callback = callback
  66.    
  67.     def connect(self) -> bool:
  68.         """连接设备"""
  69.         with self.lock:
  70.             if self.status == DeviceStatus.CONNECTED:
  71.                 return True
  72.             
  73.             self.status = DeviceStatus.CONNECTING
  74.             
  75.             attempts = 0
  76.             while attempts < self.reconnect_attempts:
  77.                 try:
  78.                     attempts += 1
  79.                     print(f"[{self.device_id}] Attempting to connect to {self.port} (attempt {attempts}/{self.reconnect_attempts})")
  80.                     
  81.                     self.serial_conn = serial.Serial(
  82.                         port=self.port,
  83.                         baudrate=self.baudrate,
  84.                         timeout=self.timeout
  85.                     )
  86.                     
  87.                     self.status = DeviceStatus.CONNECTED
  88.                     print(f"[{self.device_id}] Successfully connected to {self.port}")
  89.                     return True
  90.                
  91.                 except serial.SerialException as e:
  92.                     print(f"[{self.device_id}] Connection attempt {attempts} failed: {e}")
  93.                     if attempts < self.reconnect_attempts:
  94.                         time.sleep(self.reconnect_delay)
  95.             
  96.             self.status = DeviceStatus.ERROR
  97.             print(f"[{self.device_id}] Failed to connect to {self.port} after {self.reconnect_attempts} attempts")
  98.             
  99.             if self.error_callback:
  100.                 self.error_callback(self.device_id, serial.SerialException(f"Failed to connect after {self.reconnect_attempts} attempts"))
  101.             
  102.             return False
  103.    
  104.     def disconnect(self):
  105.         """断开设备连接"""
  106.         with self.lock:
  107.             self.running = False
  108.             
  109.             if self.serial_conn is not None and self.serial_conn.is_open:
  110.                 try:
  111.                     self.serial_conn.close()
  112.                     print(f"[{self.device_id}] Serial port disconnected")
  113.                 except Exception as e:
  114.                     print(f"[{self.device_id}] Error disconnecting serial port: {e}")
  115.                 finally:
  116.                     self.serial_conn = None
  117.             
  118.             self.status = DeviceStatus.DISCONNECTED
  119.    
  120.     def start(self):
  121.         """启动设备通信"""
  122.         with self.lock:
  123.             if self.running:
  124.                 return
  125.             
  126.             if not self.connect():
  127.                 return
  128.             
  129.             self.running = True
  130.             self.worker_thread = threading.Thread(target=self._worker, daemon=True)
  131.             self.worker_thread.start()
  132.             print(f"[{self.device_id}] Device communication started")
  133.    
  134.     def stop(self):
  135.         """停止设备通信"""
  136.         with self.lock:
  137.             self.running = False
  138.             
  139.             if self.worker_thread and self.worker_thread.is_alive():
  140.                 self.worker_thread.join(timeout=1.0)
  141.             
  142.             self.disconnect()
  143.             print(f"[{self.device_id}] Device communication stopped")
  144.    
  145.     def send_command(self, command: str or bytes) -> bool:
  146.         """发送命令到设备"""
  147.         with self.lock:
  148.             if not self.running or self.status != DeviceStatus.CONNECTED:
  149.                 print(f"[{self.device_id}] Cannot send command, device not connected")
  150.                 return False
  151.             
  152.             try:
  153.                 if isinstance(command, str):
  154.                     command = command.encode('utf-8')
  155.                
  156.                 self.command_queue.put(command)
  157.                 return True
  158.             
  159.             except Exception as e:
  160.                 print(f"[{self.device_id}] Error queuing command: {e}")
  161.                 return False
  162.    
  163.     def _worker(self):
  164.         """工作线程,处理数据收发"""
  165.         while self.running:
  166.             try:
  167.                 # 处理命令发送
  168.                 try:
  169.                     while not self.command_queue.empty():
  170.                         command = self.command_queue.get_nowait()
  171.                         self._send_data(command)
  172.                 except queue.Empty:
  173.                     pass
  174.                
  175.                 # 读取数据
  176.                 data = self._read_data()
  177.                 if data:
  178.                     # 处理数据
  179.                     self._process_data(data)
  180.                
  181.                 # 短暂休眠,避免CPU占用过高
  182.                 time.sleep(0.01)
  183.             
  184.             except Exception as e:
  185.                 print(f"[{self.device_id}] Error in worker thread: {e}")
  186.                
  187.                 if self.error_callback:
  188.                     self.error_callback(self.device_id, e)
  189.                
  190.                 # 尝试重新连接
  191.                 self._handle_connection_error()
  192.    
  193.     def _send_data(self, data: bytes) -> bool:
  194.         """发送数据到设备"""
  195.         try:
  196.             self.serial_conn.write(data)
  197.             self.serial_conn.flush()
  198.             print(f"[{self.device_id}] Sent data: {data}")
  199.             return True
  200.         
  201.         except serial.SerialTimeoutException:
  202.             print(f"[{self.device_id}] Timeout while sending data")
  203.             return False
  204.         except serial.SerialException as e:
  205.             print(f"[{self.device_id}] Serial error while sending data: {e}")
  206.             self._handle_connection_error()
  207.             return False
  208.         except Exception as e:
  209.             print(f"[{self.device_id}] Unexpected error while sending data: {e}")
  210.             return False
  211.    
  212.     def _read_data(self, size: int = 1) -> Optional[bytes]:
  213.         """从设备读取数据"""
  214.         try:
  215.             data = self.serial_conn.read(size)
  216.             if data:
  217.                 print(f"[{self.device_id}] Received data: {data}")
  218.             return data
  219.         
  220.         except serial.SerialTimeoutException:
  221.             # 超时是正常的,不记录错误
  222.             return None
  223.         except serial.SerialException as e:
  224.             print(f"[{self.device_id}] Serial error while reading data: {e}")
  225.             self._handle_connection_error()
  226.             return None
  227.         except Exception as e:
  228.             print(f"[{self.device_id}] Unexpected error while reading data: {e}")
  229.             return None
  230.    
  231.     def _read_line(self) -> Optional[bytes]:
  232.         """从设备读取一行数据"""
  233.         try:
  234.             line = self.serial_conn.readline()
  235.             if line:
  236.                 print(f"[{self.device_id}] Received line: {line}")
  237.             return line
  238.         
  239.         except serial.SerialTimeoutException:
  240.             # 超时是正常的,不记录错误
  241.             return None
  242.         except serial.SerialException as e:
  243.             print(f"[{self.device_id}] Serial error while reading line: {e}")
  244.             self._handle_connection_error()
  245.             return None
  246.         except Exception as e:
  247.             print(f"[{self.device_id}] Unexpected error while reading line: {e}")
  248.             return None
  249.    
  250.     def _process_data(self, data: bytes):
  251.         """处理接收到的数据"""
  252.         # 将数据放入队列
  253.         self.data_queue.put(data)
  254.         
  255.         # 调用回调函数
  256.         if self.data_callback:
  257.             try:
  258.                 self.data_callback(self.device_id, data)
  259.             except Exception as e:
  260.                 print(f"[{self.device_id}] Error in data callback: {e}")
  261.    
  262.     def _handle_connection_error(self):
  263.         """处理连接错误"""
  264.         with self.lock:
  265.             self.status = DeviceStatus.RECONNECTING
  266.             
  267.             try:
  268.                 if self.serial_conn is not None and self.serial_conn.is_open:
  269.                     self.serial_conn.close()
  270.             except:
  271.                 pass
  272.             finally:
  273.                 self.serial_conn = None
  274.             
  275.             # 尝试重新连接
  276.             if self.running:
  277.                 print(f"[{self.device_id}] Attempting to reconnect...")
  278.                 if not self.connect():
  279.                     self.status = DeviceStatus.ERROR
  280.                     print(f"[{self.device_id}] Failed to reconnect")
  281.                 else:
  282.                     print(f"[{self.device_id}] Reconnected successfully")
  283. class SerialDeviceManager:
  284.     """串口设备管理器"""
  285.     def __init__(self):
  286.         self.devices: Dict[str, SerialDevice] = {}
  287.         self.lock = threading.Lock()
  288.         self.running = False
  289.    
  290.     def add_device(self, device_id: str, port: str, baudrate: int = 9600,
  291.                    device_type: DeviceType = DeviceType.CUSTOM_DEVICE,
  292.                    timeout: float = 1.0, reconnect_attempts: int = 3,
  293.                    reconnect_delay: float = 1.0) -> bool:
  294.         """添加设备"""
  295.         with self.lock:
  296.             if device_id in self.devices:
  297.                 print(f"[Manager] Device {device_id} already exists")
  298.                 return False
  299.             
  300.             device = SerialDevice(
  301.                 device_id=device_id,
  302.                 port=port,
  303.                 baudrate=baudrate,
  304.                 device_type=device_type,
  305.                 timeout=timeout,
  306.                 reconnect_attempts=reconnect_attempts,
  307.                 reconnect_delay=reconnect_delay
  308.             )
  309.             
  310.             # 设置回调函数
  311.             device.set_data_callback(self._on_device_data)
  312.             device.set_error_callback(self._on_device_error)
  313.             
  314.             self.devices[device_id] = device
  315.             print(f"[Manager] Added device {device_id} on port {port}")
  316.             return True
  317.    
  318.     def remove_device(self, device_id: str) -> bool:
  319.         """移除设备"""
  320.         with self.lock:
  321.             if device_id not in self.devices:
  322.                 print(f"[Manager] Device {device_id} not found")
  323.                 return False
  324.             
  325.             device = self.devices[device_id]
  326.             device.stop()
  327.             del self.devices[device_id]
  328.             print(f"[Manager] Removed device {device_id}")
  329.             return True
  330.    
  331.     def start_device(self, device_id: str) -> bool:
  332.         """启动设备"""
  333.         with self.lock:
  334.             if device_id not in self.devices:
  335.                 print(f"[Manager] Device {device_id} not found")
  336.                 return False
  337.             
  338.             device = self.devices[device_id]
  339.             device.start()
  340.             return True
  341.    
  342.     def stop_device(self, device_id: str) -> bool:
  343.         """停止设备"""
  344.         with self.lock:
  345.             if device_id not in self.devices:
  346.                 print(f"[Manager] Device {device_id} not found")
  347.                 return False
  348.             
  349.             device = self.devices[device_id]
  350.             device.stop()
  351.             return True
  352.    
  353.     def start_all(self):
  354.         """启动所有设备"""
  355.         with self.lock:
  356.             self.running = True
  357.             for device_id, device in self.devices.items():
  358.                 device.start()
  359.             print(f"[Manager] Started all devices")
  360.    
  361.     def stop_all(self):
  362.         """停止所有设备"""
  363.         with self.lock:
  364.             self.running = False
  365.             for device_id, device in self.devices.items():
  366.                 device.stop()
  367.             print(f"[Manager] Stopped all devices")
  368.    
  369.     def send_command(self, device_id: str, command: str or bytes) -> bool:
  370.         """发送命令到指定设备"""
  371.         with self.lock:
  372.             if device_id not in self.devices:
  373.                 print(f"[Manager] Device {device_id} not found")
  374.                 return False
  375.             
  376.             device = self.devices[device_id]
  377.             return device.send_command(command)
  378.    
  379.     def get_device_status(self, device_id: str) -> Optional[DeviceStatus]:
  380.         """获取设备状态"""
  381.         with self.lock:
  382.             if device_id not in self.devices:
  383.                 return None
  384.             
  385.             device = self.devices[device_id]
  386.             return device.status
  387.    
  388.     def get_device_data(self, device_id: str, timeout: float = 1.0) -> Optional[bytes]:
  389.         """获取设备数据"""
  390.         with self.lock:
  391.             if device_id not in self.devices:
  392.                 return None
  393.             
  394.             device = self.devices[device_id]
  395.             try:
  396.                 return device.data_queue.get(timeout=timeout)
  397.             except queue.Empty:
  398.                 return None
  399.    
  400.     def set_device_config(self, device_id: str, config: Dict[str, Any]) -> bool:
  401.         """设置设备配置"""
  402.         with self.lock:
  403.             if device_id not in self.devices:
  404.                 return False
  405.             
  406.             device = self.devices[device_id]
  407.             device.set_config(config)
  408.             return True
  409.    
  410.     def get_device_config(self, device_id: str, key: str, default: Any = None) -> Any:
  411.         """获取设备配置"""
  412.         with self.lock:
  413.             if device_id not in self.devices:
  414.                 return default
  415.             
  416.             device = self.devices[device_id]
  417.             return device.get_config(key, default)
  418.    
  419.     def _on_device_data(self, device_id: str, data: bytes):
  420.         """设备数据回调"""
  421.         print(f"[Manager] Received data from {device_id}: {data}")
  422.         # 这里可以添加全局数据处理逻辑
  423.    
  424.     def _on_device_error(self, device_id: str, error: Exception):
  425.         """设备错误回调"""
  426.         print(f"[Manager] Error from {device_id}: {error}")
  427.         # 这里可以添加全局错误处理逻辑
  428. # 使用示例
  429. def main():
  430.     # 创建设备管理器
  431.     manager = SerialDeviceManager()
  432.    
  433.     try:
  434.         # 添加设备
  435.         manager.add_device("temp_sensor", "COM1", 9600, DeviceType.TEMPERATURE_SENSOR)
  436.         manager.add_device("humidity_sensor", "COM2", 9600, DeviceType.HUMIDITY_SENSOR)
  437.         manager.add_device("gps_module", "COM3", 4800, DeviceType.GPS_MODULE)
  438.         
  439.         # 设置设备配置
  440.         manager.set_device_config("temp_sensor", {"sample_rate": 1.0, "unit": "celsius"})
  441.         manager.set_device_config("humidity_sensor", {"sample_rate": 2.0, "unit": "percent"})
  442.         manager.set_device_config("gps_module", {"update_rate": 1.0, "format": "NMEA"})
  443.         
  444.         # 启动所有设备
  445.         manager.start_all()
  446.         
  447.         # 主循环
  448.         running = True
  449.         while running:
  450.             try:
  451.                 # 获取并处理设备数据
  452.                 for device_id in ["temp_sensor", "humidity_sensor", "gps_module"]:
  453.                     data = manager.get_device_data(device_id, timeout=0.1)
  454.                     if data:
  455.                         try:
  456.                             # 尝试解析JSON数据
  457.                             parsed_data = json.loads(data.decode('utf-8'))
  458.                             print(f"[Main] Parsed data from {device_id}: {parsed_data}")
  459.                         except json.JSONDecodeError:
  460.                             print(f"[Main] Non-JSON data from {device_id}: {data}")
  461.                
  462.                 # 定期发送命令
  463.                 current_time = time.time()
  464.                 if int(current_time) % 10 == 0:  # 每10秒发送一次命令
  465.                     manager.send_command("temp_sensor", "GET_DATA\n")
  466.                     manager.send_command("humidity_sensor", "GET_DATA\n")
  467.                     manager.send_command("gps_module", "GET_LOCATION\n")
  468.                
  469.                 # 短暂休眠,避免CPU占用过高
  470.                 time.sleep(0.1)
  471.             
  472.             except KeyboardInterrupt:
  473.                 print("\n[Main] Received keyboard interrupt, shutting down...")
  474.                 running = False
  475.    
  476.     finally:
  477.         # 确保所有设备都停止
  478.         manager.stop_all()
  479. if __name__ == "__main__":
  480.     main()
复制代码

这个案例实现了一个多设备串口通信管理器,具有以下特点:

1. 多设备管理:可以同时管理多个串口设备,每个设备独立运行。
2. 线程安全:使用锁确保多线程环境下的安全操作。
3. 异步通信:每个设备在独立的线程中运行,不会阻塞主线程。
4. 队列机制:使用队列处理命令发送和数据接收,提高通信效率。
5. 回调机制:支持数据回调和错误回调,便于处理设备事件。
6. 设备状态管理:跟踪每个设备的状态,支持自动重连。
7. 配置管理:支持设备特定的配置。
8. 资源管理:确保在任何情况下都能正确释放串口资源。

常见问题与解决方案

现象:尝试打开串口时,提示”Access denied”或”Port is already open”。

原因:

1. 串口已被其他程序占用。
2. 之前运行的程序未正确关闭串口。
3. 没有足够的权限访问串口。

解决方案:

1. 检查串口占用:
“`python
import serial.tools.list_ports

# 列出所有可用串口
   ports = serial.tools.list_ports.comports()
   for port in ports:
  1. print(f"Device: {port.device}, Description: {port.description}")
复制代码
  1. 2. **强制关闭串口**:
  2.    ```python
  3.    import serial
  4.    
  5.    def force_close_port(port):
  6.        try:
  7.            # 尝试打开并立即关闭串口
  8.            ser = serial.Serial(port)
  9.            ser.close()
  10.            print(f"Port {port} closed successfully")
  11.            return True
  12.        except serial.SerialException as e:
  13.            print(f"Failed to close port {port}: {e}")
  14.            return False
复制代码

1. 使用管理员权限运行程序:在某些系统上,访问串口可能需要管理员权限。

现象:程序异常终止后,串口仍然被占用,无法重新打开。

原因:程序崩溃时未执行串口关闭代码。

解决方案:

1. 使用try-finally确保资源释放:
“`python
import serial

ser = None
   try:
  1. ser = serial.Serial('COM1', 9600)
  2.    # 执行操作...
复制代码

finally:
  1. if ser is not None and ser.is_open:
  2.        ser.close()
复制代码
  1. 2. **使用信号处理器**:
  2.    ```python
  3.    import signal
  4.    import serial
  5.    
  6.    ser = None
  7.    
  8.    def signal_handler(signum, frame):
  9.        global ser
  10.        if ser is not None and ser.is_open:
  11.            ser.close()
  12.        exit(0)
  13.    
  14.    # 注册信号处理器
  15.    signal.signal(signal.SIGINT, signal_handler)
  16.    signal.signal(signal.SIGTERM, signal_handler)
  17.    
  18.    # 主程序
  19.    try:
  20.        ser = serial.Serial('COM1', 9600)
  21.        # 执行操作...
  22.    except Exception as e:
  23.        print(f"Error: {e}")
  24.    finally:
  25.        if ser is not None and ser.is_open:
  26.            ser.close()
复制代码

1. 使用atexit注册退出函数:
“`python
import atexit
import serial

ser = None

def cleanup():
  1. global ser
  2.    if ser is not None and ser.is_open:
  3.        ser.close()
复制代码

# 注册退出函数
   atexit.register(cleanup)

# 主程序
   try:
  1. ser = serial.Serial('COM1', 9600)
  2.    # 执行操作...
复制代码

except Exception as e:
  1. print(f"Error: {e}")
复制代码
  1. #### 问题3:串口通信不稳定,数据丢失
  2. **现象**:串口通信过程中数据丢失或损坏。
  3. **原因**:
  4. 1. 缓冲区溢出。
  5. 2. 数据接收不及时。
  6. 3. 串口参数不匹配。
  7. 4. 硬件连接问题。
  8. **解决方案**:
  9. 1. **调整缓冲区大小**:
  10.    ```python
  11.    import serial
  12.    
  13.    ser = serial.Serial('COM1', 9600)
  14.    
  15.    # 设置输入缓冲区大小
  16.    ser.set_buffer_size(rx_size=65536, tx_size=65536)
复制代码

1. 使用流控制:
“`python
import serial

ser = serial.Serial(
  1. 'COM1',
  2.    9600,
  3.    rtscts=True,  # 启用硬件流控制
  4.    xonxoff=True  # 启用软件流控制
复制代码

)
  1. 3. **实现数据校验和重传机制**:
  2.    ```python
  3.    import serial
  4.    import time
  5.    
  6.    class ReliableSerial:
  7.        def __init__(self, port, baudrate=9600, max_retries=3, retry_delay=0.1):
  8.            self.serial = serial.Serial(port, baudrate)
  9.            self.max_retries = max_retries
  10.            self.retry_delay = retry_delay
  11.       
  12.        def send_with_ack(self, data, ack=b'ACK', timeout=1.0):
  13.            """发送数据并等待确认"""
  14.            retries = 0
  15.            while retries < self.max_retries:
  16.                try:
  17.                    # 发送数据
  18.                    self.serial.write(data)
  19.                    self.serial.flush()
  20.                   
  21.                    # 等待确认
  22.                    start_time = time.time()
  23.                    while time.time() - start_time < timeout:
  24.                        response = self.serial.read(len(ack))
  25.                        if response == ack:
  26.                            return True
  27.                        time.sleep(0.01)
  28.                   
  29.                    # 超时,增加重试计数
  30.                    retries += 1
  31.                    if retries < self.max_retries:
  32.                        time.sleep(self.retry_delay)
  33.                
  34.                except Exception as e:
  35.                    print(f"Error in send_with_ack: {e}")
  36.                    retries += 1
  37.                    if retries < self.max_retries:
  38.                        time.sleep(self.retry_delay)
  39.            
  40.            return False
  41.       
  42.        def close(self):
  43.            if self.serial.is_open:
  44.                self.serial.close()
复制代码

1. 使用多线程或异步IO:
“`python
import serial
import threading
import queue

class AsyncSerial:
  1. def __init__(self, port, baudrate=9600):
  2.        self.serial = serial.Serial(port, baudrate)
  3.        self.receive_queue = queue.Queue()
  4.        self.running = False
  5.        self.receive_thread = None
  6.    def start(self):
  7.        """启动接收线程"""
  8.        if not self.running:
  9.            self.running = True
  10.            self.receive_thread = threading.Thread(target=self._receive_loop)
  11.            self.receive_thread.daemon = True
  12.            self.receive_thread.start()
  13.    def stop(self):
  14.        """停止接收线程"""
  15.        self.running = False
  16.        if self.receive_thread and self.receive_thread.is_alive():
  17.            self.receive_thread.join(timeout=1.0)
  18.    def _receive_loop(self):
  19.        """接收循环"""
  20.        while self.running:
  21.            try:
  22.                if self.serial.in_waiting > 0:
  23.                    data = self.serial.read(self.serial.in_waiting)
  24.                    self.receive_queue.put(data)
  25.                else:
  26.                    time.sleep(0.01)  # 短暂休眠,避免CPU占用过高
  27.            except Exception as e:
  28.                print(f"Error in receive loop: {e}")
  29.                time.sleep(0.1)
  30.    def send(self, data):
  31.        """发送数据"""
  32.        try:
  33.            self.serial.write(data)
  34.            self.serial.flush()
  35.            return True
  36.        except Exception as e:
  37.            print(f"Error sending data: {e}")
  38.            return False
  39.    def receive(self, timeout=1.0):
  40.        """接收数据"""
  41.        try:
  42.            return self.receive_queue.get(timeout=timeout)
  43.        except queue.Empty:
  44.            return None
  45.    def close(self):
  46.        """关闭串口"""
  47.        self.stop()
  48.        if self.serial.is_open:
  49.            self.serial.close()
复制代码
  1. #### 问题4:跨平台串口兼容性问题
  2. **现象**:程序在不同操作系统上表现不一致,特别是在串口命名和参数方面。
  3. **原因**:不同操作系统对串口的命名方式和参数支持不同。
  4. **解决方案**:
  5. 1. **使用条件判断处理不同平台**:
  6.    ```python
  7.    import serial
  8.    import sys
  9.    import platform
  10.    
  11.    def get_platform_compatible_port(port):
  12.        """获取平台兼容的串口名称"""
  13.        system = platform.system()
  14.       
  15.        if system == 'Windows':
  16.            # Windows平台使用COMx格式
  17.            if not port.upper().startswith('COM'):
  18.                return f'COM{port}'
  19.            return port
  20.       
  21.        elif system == 'Linux':
  22.            # Linux平台使用/dev/ttyXXX格式
  23.            if not port.startswith('/dev/'):
  24.                return f'/dev/{port}'
  25.            return port
  26.       
  27.        elif system == 'Darwin':  # macOS
  28.            # macOS平台使用/dev/cu.XXX或/dev/tty.XXX格式
  29.            if not port.startswith('/dev/'):
  30.                return f'/dev/cu.{port}'
  31.            return port
  32.       
  33.        else:
  34.            # 未知平台,返回原始名称
  35.            return port
  36.    
  37.    def get_platform_compatible_params():
  38.        """获取平台兼容的串口参数"""
  39.        system = platform.system()
  40.       
  41.        params = {
  42.            'baudrate': 9600,
  43.            'bytesize': serial.EIGHTBITS,
  44.            'parity': serial.PARITY_NONE,
  45.            'stopbits': serial.STOPBITS_ONE,
  46.            'timeout': 1
  47.        }
  48.       
  49.        # 根据平台调整参数
  50.        if system == 'Windows':
  51.            params['rtscts'] = False  # Windows默认不启用RTS/CTS流控制
  52.        elif system == 'Linux':
  53.            params['rtscts'] = True   # Linux可以启用RTS/CTS流控制
  54.       
  55.        return params
  56.    
  57.    # 使用示例
  58.    port_name = "1"  # 可以是"1", "COM1", "/dev/ttyS1"等
  59.    compatible_port = get_platform_compatible_port(port_name)
  60.    compatible_params = get_platform_compatible_params()
  61.    
  62.    try:
  63.        ser = serial.Serial(compatible_port, **compatible_params)
  64.        print(f"Successfully opened {compatible_port}")
  65.        # 执行操作...
  66.        ser.close()
  67.    except serial.SerialException as e:
  68.        print(f"Failed to open {compatible_port}: {e}")
复制代码

1. 使用serial.tools.list_ports检测可用串口:
“`python
import serial
import serial.tools.list_ports

def list_available_ports():
  1. """列出所有可用串口"""
  2.    ports = serial.tools.list_ports.comports()
  3.    result = []
  4.    for port in ports:
  5.        result.append({
  6.            'device': port.device,
  7.            'description': port.description,
  8.            'hwid': port.hwid,
  9.            'vid': port.vid if hasattr(port, 'vid') else None,
  10.            'pid': port.pid if hasattr(port, 'pid') else None,
  11.            'serial_number': port.serial_number if hasattr(port, 'serial_number') else None,
  12.            'manufacturer': port.manufacturer if hasattr(port, 'manufacturer') else None,
  13.            'product': port.product if hasattr(port, 'product') else None,
  14.        })
  15.    return result
复制代码

def find_port_by_vid_pid(vid, pid):
  1. """通过VID和PID查找串口"""
  2.    ports = serial.tools.list_ports.comports()
  3.    for port in ports:
  4.        if hasattr(port, 'vid') and hasattr(port, 'pid'):
  5.            if port.vid == vid and port.pid == pid:
  6.                return port.device
  7.    return None
复制代码

def find_port_by_description(description_substring):
  1. """通过描述查找串口"""
  2.    ports = serial.tools.list_ports.comports()
  3.    for port in ports:
  4.        if description_substring.lower() in port.description.lower():
  5.            return port.device
  6.    return None
复制代码

# 使用示例
   available_ports = list_available_ports()
   print(“Available ports:”)
   for port_info in available_ports:
  1. print(f"Device: {port_info['device']}, Description: {port_info['description']}")
复制代码

# 通过VID和PID查找特定设备
   arduino_port = find_port_by_vid_pid(0x2341, 0x0043)  # Arduino Uno的VID和PID
   if arduino_port:
  1. print(f"Found Arduino at: {arduino_port}")
复制代码

# 通过描述查找设备
   ftdi_port = find_port_by_description(“FTDI”)
   if ftdi_port:
  1. print(f"Found FTDI device at: {ftdi_port}")
复制代码

”`

总结

在Python中进行串口编程时,正确管理串口资源是确保程序稳定运行的关键。本文详细介绍了串口资源的释放方法,包括使用try-finally结构、with语句(上下文管理器)以及自定义串口管理类等技术。我们还探讨了异常处理与串口释放的关系,并提供了多个实际应用案例,帮助读者理解如何在实际项目中应用这些技术。

通过遵循本文提供的最佳实践,开发者可以避免常见的串口资源管理问题,如设备占用、程序崩溃和资源泄漏等。同时,我们还讨论了常见问题的解决方案,帮助开发者快速定位和解决串口通信中的问题。

串口通信是许多嵌入式系统和工业控制应用的基础,掌握正确的资源管理技术对于开发稳定可靠的应用程序至关重要。希望本文能够帮助读者更好地理解和应用Python串口编程技术,开发出更加健壮的应用程序。
「七転び八起き(ななころびやおき)」
回复

使用道具 举报

0

主题

1304

科技点

654

积分

候风辨气

积分
654
候风辨气 发表于 2025-9-21 17:08:01 | 显示全部楼层
感謝分享
温馨提示:看帖回帖是一种美德,您的每一次发帖、回帖都是对论坛最大的支持,谢谢! [这是默认签名,点我更换签名]
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则