2026-03-10
Python
00
请注意,本文编写于 37 天前,最后修改于 37 天前,其中某些信息可能已经过时。

目录

Python 多进程程序优雅退出指南
一、问题现象
二、根本原因分析
2.1 信号处理机制
2.2 问题链条
三、完整解决方案
3.1 方案一:使用信号处理器(推荐)
3.2 方案二:使用上下文管理器
3.3 方案三:调整 multiprocessing 启动方法
3.4 方案四:综合最佳实践
四、验证修复效果
五、总结

Python 多进程程序优雅退出指南

解决 Ctrl+C 后的 resource_tracker 警告和信号量泄漏问题


一、问题现象

当使用 multiprocessing 编写 Python 程序并按 Ctrl+C (SIGINT) 中断时,会看到类似警告:

/usr/lib/python3.13/multiprocessing/resource_tracker.py:264: UserWarning: resource_tracker: There appear to be 120 leaked semaphore objects to clean up at shutdown.

二、根本原因分析

2.1 信号处理机制

Ctrl+C (SIGINT)
  ↓
主进程收到信号 → 立即终止
  ↓
子进程未收到退出通知 → 成为孤儿进程 (orphaned)
  ↓
子进程中的信号量/共享内存未释放
  ↓
resource_tracker 检测到泄漏 → 输出警告

2.2 问题链条

| 问题 | 原因 |
| --- | --- |
| 子进程未清理 | 主进程直接退出,未向子进程发送终止信号 |
| 线程池未关闭 | ThreadPoolExecutor.shutdown() 未被调用 |
| 信号量泄漏 | shutdown(wait=False) 不等待线程完成 |
| 共享内存未释放 | multiprocessing.Value / Array 未正确清理 |

三、完整解决方案

3.1 方案一:使用信号处理器(推荐)

python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Graceful-shutdown example for a program that mixes a thread pool with
child processes.

A single ``GracefulShutdownManager`` owns the thread pool and the child
processes. SIGINT/SIGTERM are routed to ``shutdown()``, which releases
everything exactly once and then exits the interpreter.
"""
import atexit
import os
import signal
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Process, Manager, Value
from multiprocessing.sharedctypes import Synchronized
from typing import Optional


class GracefulShutdownManager:
    """Track a thread pool and child processes and release them on exit.

    The instance remembers the PID of the process that created it. With the
    ``fork`` start method children inherit both this object and the signal
    handlers that point at it; the PID check keeps such a child from trying
    to manage its siblings (``Process.is_alive()`` raises when called from a
    process that is not the parent).
    """

    def __init__(self):
        self._running = True
        self._executor: Optional[ThreadPoolExecutor] = None
        self._processes: list[Process] = []
        # Re-entrant on purpose: the signal handler runs on the main thread
        # and may interrupt register_process() while it holds the lock.
        self._lock = threading.RLock()
        self._owner_pid = os.getpid()
        self._cleanup_started = False
        self._cleanup_done = False

    def register_executor(self, executor: ThreadPoolExecutor):
        """Register the thread pool that must be shut down on exit."""
        self._executor = executor

    def register_process(self, process: Process):
        """Register a child process that must be stopped on exit."""
        with self._lock:
            self._processes.append(process)

    def cleanup(self) -> None:
        """Release the thread pool and stop all children, without exiting.

        Safe to call several times and from several places (signal handler,
        ``finally`` block); only the first call in the owning process does
        any work.
        """
        if os.getpid() != self._owner_pid:
            return
        with self._lock:
            if self._cleanup_started:
                return
            self._cleanup_started = True
            self._running = False
            executor, self._executor = self._executor, None
            processes = list(self._processes)

        try:
            # 1. Shut down the thread pool.
            if executor is not None:
                print("关闭线程池...")
                executor.shutdown(wait=True, cancel_futures=True)

            # 2. Ask every live child to stop first, then wait for them, so
            #    the worst-case wait is one timeout instead of one per child.
            alive = [proc for proc in processes if proc.is_alive()]
            for proc in alive:
                print(f"终止子进程 {proc.pid}...")
                proc.terminate()
            for proc in alive:
                proc.join(timeout=5)
                if proc.is_alive():
                    proc.kill()
                    proc.join()  # reap the killed child

            print("优雅关闭完成")
        finally:
            self._cleanup_done = True

    def shutdown(self, signum=None, frame=None):
        """Signal handler: clean up and exit the process with status 0.

        Args:
            signum: Signal number passed by the ``signal`` module, or None
                when called directly.
            frame: Interrupted stack frame (unused).

        Raises:
            SystemExit: Always, except when a cleanup is already in progress
                (repeated Ctrl+C); in that case the call returns so the
                interrupted cleanup can finish.
        """
        if os.getpid() != self._owner_pid:
            # Forked child that inherited the handler: just exit; the parent
            # owns the shared resources.
            sys.exit(0)
        if self._cleanup_started and not self._cleanup_done:
            return
        print(f"\n收到信号 {signum},开始优雅关闭...")
        self.cleanup()
        sys.exit(0)

    def is_running(self) -> bool:
        """Return True until a shutdown has been requested."""
        return self._running


# Global instance
shutdown_manager = GracefulShutdownManager()


def worker(task_id: int, shared_counter: Synchronized):
    """Child-process task: increment the shared counter ten times.

    Args:
        task_id: Identifier used only in log output.
        shared_counter: Integer created with ``multiprocessing.Value``.
    """
    # Ctrl+C is delivered to the whole foreground process group. Ignore it
    # here so that only the parent decides when the children stop.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        for _ in range(10):
            with shared_counter.get_lock():
                shared_counter.value += 1
            time.sleep(0.1)
        print(f"Worker {task_id} 完成")
    except Exception as e:
        print(f"Worker {task_id} 错误: {e}")


def main():
    """Start the workers and poll until they finish or a signal arrives."""
    # Register signal handlers
    signal.signal(signal.SIGINT, shutdown_manager.shutdown)
    signal.signal(signal.SIGTERM, shutdown_manager.shutdown)

    # Register an exit hook
    atexit.register(lambda: print("程序退出"))

    # Shared counter. No Manager() is created: a manager starts an extra
    # server process that would itself have to be shut down.
    shared_counter = Value('i', 0)

    # Thread pool
    executor = ThreadPoolExecutor(max_workers=4)
    shutdown_manager.register_executor(executor)

    # Child processes
    processes = []
    for i in range(3):
        p = Process(target=worker, args=(i, shared_counter))
        p.start()
        processes.append(p)
        shutdown_manager.register_process(p)

    try:
        # Main loop
        while shutdown_manager.is_running():
            time.sleep(1)
            print(f"计数器: {shared_counter.value}")

            # Stop once every child has exited
            if not any(p.is_alive() for p in processes):
                break
    except KeyboardInterrupt:
        shutdown_manager.shutdown()
    finally:
        # No-op if shutdown() already ran
        shutdown_manager.cleanup()


if __name__ == "__main__":
    main()

3.2 方案二:使用上下文管理器

python
#!/usr/bin/env python3
"""Use a singleton and a context manager to guarantee thread-pool cleanup."""
import atexit
import threading
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from typing import Iterator, Optional


class ResourcePool:
    """Process-wide singleton that owns one shared ``ThreadPoolExecutor``.

    Every ``ResourcePool()`` call returns the same object. The executor is
    created on the first call and shut down by an ``atexit`` hook.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    # Fully build the instance before publishing it, so a
                    # concurrent caller never sees it without its attributes.
                    instance = super().__new__(cls)
                    instance._executor = None
                    instance._initialized = False
                    cls._instance = instance
        return cls._instance

    def __init__(self):
        # __init__ runs on every ResourcePool() call; the check is done under
        # the lock so that two concurrent first calls cannot both create an
        # executor and register the exit hook twice.
        with self._lock:
            if self._initialized:
                return
            self._executor = ThreadPoolExecutor(max_workers=4)
            self._initialized = True
            # Register exit-time cleanup
            atexit.register(self._cleanup)

    def _cleanup(self):
        """Shut the executor down; does nothing if it is already gone."""
        with self._lock:
            executor, self._executor = self._executor, None
        if executor is not None:
            print("清理线程池...")
            executor.shutdown(wait=True, cancel_futures=True)

    @property
    def executor(self) -> Optional[ThreadPoolExecutor]:
        """The shared executor, or None once cleanup has run."""
        return self._executor


@contextmanager
def managed_executor(max_workers=4) -> Iterator[ThreadPoolExecutor]:
    """Yield a thread pool that is always shut down when the block exits.

    Args:
        max_workers: Maximum number of worker threads.

    Yields:
        A ``ThreadPoolExecutor``. On exit, queued futures that have not
        started are cancelled and running ones are waited for.
    """
    executor = ThreadPoolExecutor(max_workers=max_workers)
    try:
        yield executor
    finally:
        executor.shutdown(wait=True, cancel_futures=True)


def main():
    """Demonstrate both ways of obtaining a managed executor."""
    # Option 1: the singleton resource pool
    pool = ResourcePool()
    future = pool.executor.submit(lambda: sum(range(100)))
    print(f"结果: {future.result()}")

    # Option 2: the context manager
    with managed_executor(max_workers=2) as executor:
        future = executor.submit(lambda: "完成")
        print(f"结果: {future.result()}")

    # The singleton's executor is cleaned up automatically at exit


if __name__ == "__main__":
    main()

3.3 方案三:调整 multiprocessing 启动方法

python
#!/usr/bin/env python3
"""Comparison of multiprocessing start methods, and how to choose one."""
import multiprocessing as mp
import os
import sys
import time

# ============================================
# Start methods:
# ============================================
# 1. spawn (default on Windows, and on macOS since Python 3.8)
#    - starts a brand-new Python interpreter
#    - safest, but slowest
#    - suited to Windows and macOS
#
# 2. fork (default on Linux/Unix)
#    - the child inherits the parent's state
#    - fast, but inherited state can cause problems
#    - suited to Linux (Python < 3.14)
#
# 3. forkserver (Linux)
#    - a dedicated server process is started up front
#    - sits between the other two
# ============================================


def set_start_method():
    """Select ``spawn`` as the global start method on every platform.

    ``spawn`` avoids inheriting locks, threads and signal handlers from the
    parent. ``force=True`` replaces a previously chosen method instead of
    raising ``RuntimeError``, so no exception handling is needed here.
    """
    mp.set_start_method('spawn', force=True)


class CleanProcess(mp.Process):
    """A ``Process`` that runs registered callbacks when ``run()`` finishes.

    The callbacks execute in the child process, after the target returns or
    raises. With the ``spawn`` start method the process object is pickled,
    so callbacks must be picklable (module-level functions, not lambdas).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._cleanup_callbacks = []

    def register_cleanup(self, callback):
        """Add a zero-argument callable to run after the target finishes."""
        self._cleanup_callbacks.append(callback)

    def run(self):
        """Run the target, then every cleanup callback in registration order.

        A failing callback is reported and does not prevent the remaining
        callbacks from running.
        """
        try:
            super().run()
        finally:
            for callback in self._cleanup_callbacks:
                try:
                    callback()
                except Exception as e:
                    print(f"清理失败: {e}")


def child_process(shared_value):
    """Child task: increment the shared integer 100 times.

    Args:
        shared_value: Integer created with ``multiprocessing.Value``.
    """
    try:
        for _ in range(100):
            with shared_value.get_lock():
                shared_value.value += 1
            time.sleep(0.01)
    except KeyboardInterrupt:
        print("子进程被中断")


def main():
    """Start three children and wait for them, stopping them on Ctrl+C."""
    # Select the start method
    set_start_method()

    # Lock-protected integer in shared memory (not a Manager proxy)
    shared_value = mp.Value('i', 0)

    # Create the processes
    processes = []
    for _ in range(3):
        p = CleanProcess(target=child_process, args=(shared_value,))
        p.start()
        processes.append(p)

    try:
        # Wait for the children
        for p in processes:
            p.join()
    except KeyboardInterrupt:
        print("主进程被中断,终止子进程...")
        # Signal every live child first, then wait for all of them
        alive = [p for p in processes if p.is_alive()]
        for p in alive:
            p.terminate()
        for p in alive:
            p.join()


if __name__ == "__main__":
    main()

3.4 方案四:综合最佳实践

python
#!/usr/bin/env python3
"""Template for a multi-process program that shuts down gracefully."""
import atexit
import logging
import signal
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Process, Manager, Value
from typing import Optional, Callable

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class Application:
    """Application base class with graceful-exit support.

    Cleanup can be triggered from three places (signal handler, the
    ``finally`` in ``run()``, and ``atexit``); it is guarded so that the
    shutdown callbacks and resource teardown happen only once.
    """

    def __init__(self):
        self._executor: Optional[ThreadPoolExecutor] = None
        self._process_executor: Optional[ProcessPoolExecutor] = None
        self._processes: list[Process] = []
        self._running = False
        # Re-entrant: the signal handler runs on the main thread and may
        # interrupt add_process() while it holds the lock.
        self._lock = threading.RLock()
        self._shutdown_callbacks: list[Callable] = []
        # Lets the heartbeat thread wake immediately instead of finishing a
        # 10 s sleep before the thread pool can shut down.
        self._stop_event = threading.Event()
        self._cleanup_started = False
        self._cleanup_done = False

    def register_shutdown_callback(self, callback: Callable):
        """Register a zero-argument callable to run once during cleanup."""
        self._shutdown_callbacks.append(callback)

    def _cleanup(self):
        """Run callbacks and release pools and child processes, once."""
        with self._lock:
            if self._cleanup_started:
                return
            self._cleanup_started = True

        try:
            logger.info("执行清理...")

            # Stop background loops first; otherwise shutdown(wait=True)
            # below would wait on a heartbeat that never ends.
            self._running = False
            self._stop_event.set()

            # Run callbacks; one failure must not block the others
            for callback in self._shutdown_callbacks:
                try:
                    callback()
                except Exception as e:
                    logger.error("清理回调失败: %s", e)

            # Shut down the thread pool
            if self._executor:
                logger.info("关闭线程池...")
                self._executor.shutdown(wait=True, cancel_futures=True)
                self._executor = None

            # Shut down the process pool
            if self._process_executor:
                logger.info("关闭进程池...")
                self._process_executor.shutdown(wait=True, cancel_futures=True)
                self._process_executor = None

            # Stop child processes: signal all of them, then wait
            with self._lock:
                alive = [proc for proc in self._processes if proc.is_alive()]
                for proc in alive:
                    logger.info("终止进程 %s...", proc.pid)
                    proc.terminate()
                for proc in alive:
                    proc.join(timeout=5)
                    if proc.is_alive():
                        proc.kill()
                        proc.join()
                self._processes.clear()

            logger.info("清理完成")
        finally:
            self._cleanup_done = True

    def _signal_handler(self, signum, frame):
        """Handle SIGINT/SIGTERM: clean up, then exit with status 0."""
        logger.info("收到信号 %s", signum)
        if self._cleanup_started and not self._cleanup_done:
            # Repeated signal while cleanup is running: let it finish rather
            # than raising SystemExit in the middle of it.
            return
        self._running = False
        self._cleanup()
        sys.exit(0)

    def setup(self):
        """Install signal handlers and the exit hook, and start the pool."""
        # Allow a fresh setup() after an earlier cleanup
        self._cleanup_started = False
        self._cleanup_done = False
        self._stop_event.clear()

        # Register signal handlers
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

        # Register exit-time cleanup
        atexit.register(self._cleanup)

        # The heartbeat loop checks _running, so set it before submitting
        self._running = True
        self._executor = ThreadPoolExecutor(max_workers=4)
        self._executor.submit(self._heartbeat)

        logger.info("应用初始化完成")

    def _heartbeat(self):
        """Heartbeat: log every 10 s until shutdown is requested."""
        while self._running and not self._stop_event.wait(10):
            logger.debug("心跳...")

    def run(self):
        """Main loop; always performs cleanup on the way out."""
        logger.info("应用启动")
        try:
            while self._running:
                # Business logic goes here
                time.sleep(1)
        except Exception as e:
            logger.error("错误: %s", e)
        finally:
            self._cleanup()

    def add_process(self, proc: Process):
        """Track a child process so cleanup can terminate it."""
        with self._lock:
            self._processes.append(proc)


# Usage example
if __name__ == "__main__":
    app = Application()

    # Register cleanup callbacks
    app.register_shutdown_callback(lambda: print("回调1: 自定义清理"))
    app.register_shutdown_callback(lambda: print("回调2: 关闭数据库连接"))

    # Initialise and run
    app.setup()
    app.run()

四、验证修复效果

python
#!/usr/bin/env python3
"""Verification script: check that no semaphore-leak warning is printed."""
import os
import signal
import subprocess
import sys
import tempfile
import time

# Script executed in the child interpreter. The worker polls a stop event so
# that cleanup() returns promptly; without it shutdown(wait=True) would block
# for the full ~10 s loop and the parent's 5 s timeout would always fire.
TEST_CODE = '''
import signal
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor
import atexit

executor = ThreadPoolExecutor(max_workers=2)
stop = threading.Event()

def worker():
    for i in range(100):
        if stop.is_set():
            break
        print(f"Working {i}")
        time.sleep(0.1)

# 注册退出清理
def cleanup():
    print("执行清理...")
    stop.set()
    executor.shutdown(wait=True, cancel_futures=True)
    print("清理完成")

atexit.register(cleanup)

# 信号处理
def handler(signum, frame):
    print("收到信号")
    cleanup()
    sys.exit(0)

signal.signal(signal.SIGINT, handler)

# 运行
executor.submit(worker)
time.sleep(2)
executor.shutdown(wait=True)
'''


def _run_and_interrupt(script_path, delay=1.0, timeout=5.0):
    """Run the script, send SIGINT after ``delay`` seconds, collect output.

    Returns:
        ``(stdout, stderr)``, or None if the child did not exit within
        ``timeout`` seconds of the signal (it is killed in that case).
    """
    proc = subprocess.Popen(
        # Same interpreter as this script, not whatever "python" is on PATH
        [sys.executable, script_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        encoding="utf-8",
        errors="replace",
        # The child prints non-ASCII text; pin its stream encoding
        env=dict(os.environ, PYTHONIOENCODING="utf-8"),
    )
    time.sleep(delay)
    proc.send_signal(signal.SIGINT)
    try:
        return proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.communicate()  # reap the child and drain its pipes
        return None


def test_graceful_shutdown():
    """Interrupt a thread-pool script and report any leak warning."""
    # Private temporary directory instead of a fixed, shared /tmp path
    with tempfile.TemporaryDirectory() as tmp_dir:
        script_path = os.path.join(tmp_dir, "test_shutdown.py")
        with open(script_path, "w", encoding="utf-8") as f:
            f.write(TEST_CODE)
        result = _run_and_interrupt(script_path)

    if result is None:
        # No output to inspect, so no verdict can be given
        print("超时")
        return

    stdout, stderr = result
    print("STDOUT:", stdout)
    print("STDERR:", stderr)

    # Check for leak warnings
    if "leaked semaphore" in stderr or "resource_tracker" in stderr:
        print("❌ 仍存在泄漏")
    else:
        print("✅ 无泄漏警告")


if __name__ == "__main__":
    test_graceful_shutdown()

五、总结

| 问题 | 解决方案 |
| --- | --- |
| Ctrl+C 后子进程未退出 | 注册 SIGINT/SIGTERM 信号处理器 |
| 线程池未关闭 | 使用 atexit 或上下文管理器 |
| 信号量泄漏 | 不使用 shutdown(wait=False) |
| 进程资源未释放 | 在关闭时调用 terminate() + join() |
| 状态继承问题 | 使用 spawn 启动方法 |

核心原则:始终确保在程序退出时释放所有资源,注册多个清理点(信号处理器 + atexit),并使用 wait=True 等待任务完成。

本文作者:sea-whales

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!