解决 Ctrl+C 后的 resource_tracker 警告和信号量泄漏问题
当使用 multiprocessing 编写 Python 程序并按 Ctrl+C (SIGINT) 中断时,会看到类似警告:
/usr/lib/python3.13/multiprocessing/resource_tracker.py:264: UserWarning: resource_tracker: There appear to be 120 leaked semaphore objects to clean up at shutdown.
Ctrl+C (SIGINT)
↓
主进程收到信号 → 立即终止
↓
子进程未收到通知 → 成为孤儿进程(orphaned)
↓
子进程中的信号量/共享内存未释放
↓
resource_tracker 检测到泄漏 → 警告
| 问题 | 原因 |
|---|---|
| 子进程未清理 | 主进程直接退出,未发送终止信号 |
| 线程池未关闭 | ThreadPoolExecutor 的 shutdown() 未被调用 |
| 信号量泄漏 | shutdown(wait=False) 不等待线程完成 |
| 共享内存未释放 | multiprocessing.Value / Array 未正确清理 |
python#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
多进程优雅退出示例
"""
import signal
import sys
import time
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Process, Manager, Value
import atexit
import threading
from typing import Optional
class GracefulShutdownManager:
    """Coordinate a graceful shutdown of one thread pool and several child processes.

    ``shutdown`` has the ``(signum, frame)`` signature of a ``signal.signal``
    handler and is meant to be installed for SIGINT/SIGTERM.
    """

    def __init__(self):
        self._running = True
        self._shutting_down = False
        self._executor: Optional[ThreadPoolExecutor] = None
        self._processes: list[Process] = []
        # Reentrant lock: shutdown() runs as a signal handler on the main
        # thread and can interrupt that same thread while it is inside
        # register_process() holding the lock; a plain Lock would deadlock.
        self._lock = threading.RLock()
        # PID of the process that created this manager. A forked child
        # inherits a copy of this object (and possibly the signal handler),
        # but it is not the parent of the registered processes.
        self._owner_pid = os.getpid()

    def register_executor(self, executor: ThreadPoolExecutor):
        """Register the thread pool to shut down (replaces any previous one)."""
        self._executor = executor

    def register_process(self, process: Process):
        """Register a child process to terminate on shutdown."""
        with self._lock:
            self._processes.append(process)

    def shutdown(self, signum=None, frame=None):
        """Shut down the thread pool and child processes, then exit with status 0.

        Args:
            signum: signal number when used as a signal handler, else None.
            frame: current stack frame (unused), as passed by ``signal``.

        A call made while a shutdown is already in progress (for example a
        second Ctrl+C arriving during the first shutdown) returns immediately
        so that the first call can finish.

        Raises:
            SystemExit: with code 0, unless returning early as described above.
        """
        if os.getpid() != self._owner_pid:
            # Running in a forked child that inherited this handler:
            # Process.is_alive() only works from the parent process, so do
            # not touch the registered processes; just stop this process.
            sys.exit(0)

        with self._lock:
            if self._shutting_down:
                return
            self._shutting_down = True

        print(f"\n收到信号 {signum},开始优雅关闭...")
        self._running = False

        # 1. Shut down the thread pool: cancel queued tasks, wait for running ones.
        if self._executor is not None:
            print("关闭线程池...")
            self._executor.shutdown(wait=True, cancel_futures=True)

        # 2. Stop child processes: terminate(), then kill() if still alive after 5 s.
        with self._lock:
            for proc in self._processes:
                if proc.is_alive():
                    print(f"终止子进程 {proc.pid}...")
                    proc.terminate()
                    proc.join(timeout=5)
                    if proc.is_alive():
                        proc.kill()
                        # Reap the killed child so it does not linger as a zombie.
                        proc.join()

        print("优雅关闭完成")
        sys.exit(0)

    def is_running(self) -> bool:
        """Return False once shutdown() has started."""
        return self._running
# Module-level manager used by main(): main() installs its shutdown() as the
# SIGINT/SIGTERM handler and registers the thread pool and child processes on it.
shutdown_manager = GracefulShutdownManager()
def worker(task_id: int, shared_counter: Value):
    """Bump *shared_counter* ten times, sleeping 0.1 s after each bump.

    Each increment is done while holding the counter's own lock. Any
    ``Exception`` raised along the way is printed instead of propagated.
    """
    try:
        for _step in range(10):
            counter_lock = shared_counter.get_lock()
            with counter_lock:
                shared_counter.value += 1
            time.sleep(0.1)
        print(f"Worker {task_id} 完成")
    except Exception as exc:
        print(f"Worker {task_id} 错误: {exc}")
def main():
    """Run three counter workers with graceful Ctrl+C / SIGTERM handling.

    SIGINT/SIGTERM are routed to ``shutdown_manager.shutdown``, which shuts
    the thread pool down, terminates the children and exits. The ``finally``
    block repeats the pool/child cleanup for the normal exit path.
    """
    # Exit message printed by atexit.
    atexit.register(lambda: print("程序退出"))

    # Shared memory: mp.Value is a ctypes int in shared memory with its own
    # lock. No Manager() is needed here; a Manager would start an extra
    # server process that also has to be shut down.
    shared_counter = Value('i', 0)

    # Thread pool, registered so the shutdown handler closes it.
    executor = ThreadPoolExecutor(max_workers=4)
    shutdown_manager.register_executor(executor)

    # Ignore SIGINT while starting the children. With the fork start method a
    # child copies the parent's signal dispositions, so these children ignore
    # Ctrl+C (which a terminal sends to the whole foreground process group)
    # instead of running this script's shutdown handler. The parent stops
    # them itself via terminate().
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    processes = []
    try:
        for i in range(3):
            p = Process(target=worker, args=(i, shared_counter))
            p.start()
            processes.append(p)
            shutdown_manager.register_process(p)
    finally:
        # Install the real handlers only after all children exist.
        signal.signal(signal.SIGINT, shutdown_manager.shutdown)
        signal.signal(signal.SIGTERM, shutdown_manager.shutdown)

    try:
        # Main loop: report progress until shutdown starts or all children finish.
        while shutdown_manager.is_running():
            time.sleep(1)
            print(f"计数器: {shared_counter.value}")
            if not any(p.is_alive() for p in processes):
                break
    except KeyboardInterrupt:
        # Defensive fallback in case the default SIGINT handler is active.
        shutdown_manager.shutdown()
    finally:
        # Release resources on every exit path (shutdown() exits via SystemExit).
        executor.shutdown(wait=True)
        for p in processes:
            if p.is_alive():
                p.terminate()
            # Join every child (a no-op for ones already reaped) so none is left behind.
            p.join()


if __name__ == "__main__":
    main()
python#!/usr/bin/env python3
"""使用上下文管理器确保资源清理"""
from contextlib import contextmanager
from concurrent.futures import ThreadPoolExecutor
import atexit
import threading
class ResourcePool:
    """Process-wide singleton that owns one ThreadPoolExecutor (4 workers).

    The instance is fully built (executor created, ``atexit`` cleanup
    registered) under the class lock before it is published in
    ``_instance``. Concurrent first calls therefore always see the same,
    fully initialised object, and only one executor is ever created.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # Fast path without the lock; re-check under the lock before creating.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    instance._executor = ThreadPoolExecutor(max_workers=4)
                    instance._initialized = True
                    # Shut the executor down when the interpreter exits.
                    atexit.register(instance._cleanup)
                    # Publish only after initialisation is complete.
                    cls._instance = instance
        return cls._instance

    def _cleanup(self):
        """Shut the executor down (cancel queued tasks, wait for running ones) once."""
        if self._executor is not None:
            print("清理线程池...")
            self._executor.shutdown(wait=True, cancel_futures=True)
            self._executor = None

    @property
    def executor(self):
        """The shared ThreadPoolExecutor, or None after ``_cleanup()`` ran."""
        return self._executor
@contextmanager
def managed_executor(max_workers=4):
    """Yield a fresh ThreadPoolExecutor that is always shut down on exit.

    When the ``with`` block exits, normally or via an exception, tasks that
    have not started are cancelled and running ones are waited for.
    """
    pool = ThreadPoolExecutor(max_workers=max_workers)
    try:
        yield pool
    finally:
        pool.shutdown(cancel_futures=True, wait=True)
def main():
    """Demonstrate two cleanup styles: a shared singleton pool and a scoped executor."""
    # Approach 1: the singleton pool; its executor is shut down by the
    # atexit hook that ResourcePool registers.
    shared_pool = ResourcePool()
    total = shared_pool.executor.submit(lambda: sum(range(100))).result()
    print(f"结果: {total}")

    # Approach 2: a scoped executor that is shut down when the with-block ends.
    with managed_executor(max_workers=2) as scoped:
        outcome = scoped.submit(lambda: "完成").result()
        print(f"结果: {outcome}")
    # Remaining resources are released automatically at interpreter exit.


if __name__ == "__main__":
    main()
python#!/usr/bin/env python3
"""
multiprocessing 启动方法对比和选择
"""
import multiprocessing as mp
import os
import sys
# ============================================
# 启动方法说明:
# ============================================
# 1. spawn (默认 Windows, Python 3.8+ macOS)
# - 启动全新 Python 解释器
# - 最安全,但最慢
# - 适合 Windows 和 macOS
#
# 2. fork (Python 3.13 及以前 Linux 等 POSIX 平台的默认值;macOS 自 3.8 起默认 spawn)
# - 子进程继承父进程状态
# - 速度快,但可能有状态问题
# - 适合 Linux (Python < 3.14)
#
# 3. forkserver (POSIX 平台可用;Python 3.14 起成为 Linux 等非 macOS POSIX 平台的默认值)
# - 预启动专用服务器
# - 介于两者之间
# ============================================
def set_start_method():
    """Force the multiprocessing start method to ``'spawn'`` on every platform.

    With ``spawn`` each child starts a fresh interpreter and only inherits
    what it needs to run its target, avoiding state-inheritance surprises.
    """
    if sys.platform != 'win32':
        # Linux / macOS / other POSIX: spawn is preferred over fork for safety.
        # force=True overrides an earlier choice instead of raising, so this
        # handler is only a defensive fallback.
        try:
            mp.set_start_method('spawn', force=True)
        except RuntimeError:
            pass
        return
    # Windows: spawn is the only available start method there.
    mp.set_start_method('spawn', force=True)
class CleanProcess(mp.Process):
    """Process subclass that runs registered cleanup callbacks after ``run()``.

    The callbacks run inside the child, in registration order, after the
    target returns or raises. A failing callback is reported and does not
    stop the remaining ones.

    Notes:
        * Register callbacks before ``start()``. Under the ``spawn`` start
          method the process object, including the callback list, is pickled
          to the child, so callbacks must be picklable (no lambdas there).
        * ``terminate()`` kills the child without running ``finally``
          clauses, so the callbacks do not run in that case.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._cleanup_callbacks = []

    def register_cleanup(self, callback):
        """Register a zero-argument *callback* to run after the target finishes."""
        self._cleanup_callbacks.append(callback)

    def run(self):
        """Run the target, then every cleanup callback, even if the target raised."""
        try:
            super().run()
        finally:
            for callback in self._cleanup_callbacks:
                try:
                    callback()
                except Exception as e:
                    print(f"清理失败: {e}")
def child_process(shared_value):
    """Increment *shared_value* 100 times, one step every 10 ms, under its lock.

    A ``KeyboardInterrupt`` stops the loop early and is reported with a short
    message instead of a traceback.
    """
    # `time` is not among this script's module-level imports; import it once
    # here rather than on every loop iteration.
    import time

    try:
        for _ in range(100):
            with shared_value.get_lock():
                shared_value.value += 1
            time.sleep(0.01)
    except KeyboardInterrupt:
        print("子进程被中断")
def main():
    """Run three CleanProcess children that share one counter and wait for them.

    On Ctrl+C, even while children are still being started, any child that is
    still alive is terminated. In every case all started children are joined
    before returning.
    """
    # Choose the start method before creating any multiprocessing objects.
    set_start_method()
    # mp.Value is a ctypes int in shared memory with its own lock
    # (not a Manager-managed proxy).
    shared_value = mp.Value('i', 0)

    processes = []
    try:
        for _ in range(3):
            p = CleanProcess(target=child_process, args=(shared_value,))
            p.start()
            processes.append(p)
        for p in processes:
            p.join()
    except KeyboardInterrupt:
        print("主进程被中断,终止子进程...")
        for p in processes:
            if p.is_alive():
                p.terminate()
    finally:
        # Reap every started child; joining one that already exited is a no-op.
        for p in processes:
            p.join()


if __name__ == "__main__":
    main()
python#!/usr/bin/env python3
"""
生产环境推荐的多进程程序模板
"""
import signal
import sys
import atexit
import threading
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Process, Manager, Value
from typing import Optional, Callable
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class Application:
    """Application base class with graceful shutdown on SIGINT/SIGTERM.

    ``_cleanup()`` can be reached from the signal handler, from the
    ``finally`` in ``run()`` and from the ``atexit`` hook installed by
    ``setup()``. Only the first call releases resources and runs the
    registered callbacks; later calls return immediately.
    """

    def __init__(self):
        self._executor: Optional[ThreadPoolExecutor] = None
        self._process_executor: Optional[ProcessPoolExecutor] = None
        self._processes: list[Process] = []
        self._running = False
        # Reentrant lock: the signal handler runs _cleanup() on the main
        # thread, possibly while that same thread already holds the lock in
        # add_process(); a plain Lock would deadlock there.
        self._lock = threading.RLock()
        self._shutdown_callbacks: list[Callable] = []
        # Set when cleanup starts; wakes the heartbeat thread immediately.
        self._stop_event = threading.Event()
        self._cleaned_up = False

    def register_shutdown_callback(self, callback: Callable):
        """Register a zero-argument *callback* to be called during cleanup."""
        self._shutdown_callbacks.append(callback)

    def _cleanup(self):
        """Release every resource; only the first call does any work.

        Order:
            1. Stop the loops.
            2. Run the shutdown callbacks.
            3. Shut down the thread pool, then the process pool.
            4. Terminate registered child processes (``kill()`` if still
               alive 5 s after ``terminate()``).
        """
        with self._lock:
            if self._cleaned_up:
                return
            self._cleaned_up = True
        logger.info("执行清理...")

        # Stop the main loop and wake the heartbeat *before* waiting on the
        # thread pool. Otherwise shutdown(wait=True) below would block on a
        # heartbeat that keeps looping, e.g. when run() exits via an
        # exception while _running is still True.
        self._running = False
        self._stop_event.set()

        # User callbacks; one failing must not prevent the others.
        for callback in self._shutdown_callbacks:
            try:
                callback()
            except Exception as e:
                logger.error("清理回调失败: %s", e)

        if self._executor is not None:
            logger.info("关闭线程池...")
            self._executor.shutdown(wait=True, cancel_futures=True)
            self._executor = None

        if self._process_executor is not None:
            logger.info("关闭进程池...")
            self._process_executor.shutdown(wait=True, cancel_futures=True)
            self._process_executor = None

        with self._lock:
            for proc in self._processes:
                if proc.is_alive():
                    logger.info("终止进程 %s...", proc.pid)
                    proc.terminate()
                    proc.join(timeout=5)
                    if proc.is_alive():
                        proc.kill()
                        proc.join()
            self._processes.clear()
        logger.info("清理完成")

    def _signal_handler(self, signum, frame):
        """SIGINT/SIGTERM handler: clean up, then exit with status 0."""
        logger.info(f"收到信号 {signum}")
        self._running = False
        self._cleanup()
        sys.exit(0)

    def setup(self):
        """Install signal handlers and the atexit hook, then start the heartbeat."""
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)
        atexit.register(self._cleanup)

        self._executor = ThreadPoolExecutor(max_workers=4)
        # Mark the app as running *before* starting the heartbeat, so the
        # heartbeat never observes a stale False and exits immediately.
        self._running = True
        self._executor.submit(self._heartbeat)
        logger.info("应用初始化完成")

    def _heartbeat(self):
        """Log a debug heartbeat every 10 s until cleanup starts or the app stops."""
        # Event.wait returns True as soon as _cleanup() sets the event, so
        # shutdown does not have to wait out a full 10 s sleep.
        while not self._stop_event.wait(10):
            if not self._running:
                break
            logger.debug("心跳...")

    def run(self):
        """Main loop: run until ``_running`` is cleared, then always clean up."""
        # `time` is not among this module's top-level imports.
        import time

        logger.info("应用启动")
        try:
            while self._running:
                # Business logic goes here.
                time.sleep(1)
        except Exception as e:
            logger.exception("错误: %s", e)
        finally:
            self._cleanup()

    def add_process(self, proc: Process):
        """Register a child process to be terminated during cleanup."""
        with self._lock:
            self._processes.append(proc)
# Usage example
if __name__ == "__main__":
    app = Application()
    # Cleanup callbacks, invoked by the application's cleanup step.
    for shutdown_hook in (
        lambda: print("回调1: 自定义清理"),
        lambda: print("回调2: 关闭数据库连接"),
    ):
        app.register_shutdown_callback(shutdown_hook)
    # Initialise (signal handlers, atexit, thread pool), then enter the main loop.
    app.setup()
    app.run()
python#!/usr/bin/env python3
"""验证脚本 - 测试信号量清理"""
import subprocess
import time
import os
import signal
def test_graceful_shutdown():
    """Run a small child script, send it SIGINT and check stderr for leak warnings.

    The child's SIGINT handler asks its worker thread to stop and shuts its
    ThreadPoolExecutor down, so it should exit well within the 5 s timeout.
    The script is written to a private temporary file, which is removed
    afterwards.
    """
    import sys
    import tempfile
    import textwrap

    test_code = textwrap.dedent('''
        import signal
        import sys
        import threading
        import time
        from concurrent.futures import ThreadPoolExecutor
        import atexit

        executor = ThreadPoolExecutor(max_workers=2)
        stop_event = threading.Event()

        def worker():
            for i in range(100):
                if stop_event.is_set():
                    break
                print(f"Working {i}")
                time.sleep(0.1)

        # 注册退出清理
        def cleanup():
            print("执行清理...")
            # cancel_futures only cancels tasks that have not started yet;
            # the running worker must be told to stop explicitly.
            stop_event.set()
            executor.shutdown(wait=True, cancel_futures=True)
            print("清理完成")

        atexit.register(cleanup)

        # 信号处理
        def handler(signum, frame):
            print("收到信号")
            cleanup()
            sys.exit(0)

        signal.signal(signal.SIGINT, handler)

        # 运行
        executor.submit(worker)
        time.sleep(2)
        executor.shutdown(wait=True)
    ''')

    fd, script_path = tempfile.mkstemp(suffix=".py", prefix="test_shutdown_")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(test_code)

        # Run with the current interpreter, then send SIGINT. Force UTF-8 on
        # both ends so the child's non-ASCII output decodes on any locale.
        proc = subprocess.Popen(
            [sys.executable, script_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding="utf-8",
            errors="replace",
            env=dict(os.environ, PYTHONIOENCODING="utf-8"),
        )
        time.sleep(1)
        proc.send_signal(signal.SIGINT)
        try:
            stdout, stderr = proc.communicate(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
            # Reap the child and collect what it wrote so far; otherwise
            # stdout/stderr would be unbound for the checks below.
            stdout, stderr = proc.communicate()
            print("超时")
        print("STDOUT:", stdout)
        print("STDERR:", stderr)
    finally:
        os.remove(script_path)

    # Look for resource_tracker leak warnings.
    if "leaked semaphore" in stderr or "resource_tracker" in stderr:
        print("❌ 仍存在泄漏")
    else:
        print("✅ 无泄漏警告")


if __name__ == "__main__":
    test_graceful_shutdown()
| 问题 | 解决方案 |
|---|---|
| Ctrl+C 后子进程未退出 | 注册 SIGINT/SIGTERM 信号处理器 |
| 线程池未关闭 | 使用 atexit 或上下文管理器 |
| 信号量泄漏 | 不使用 shutdown(wait=False) |
| 进程资源未释放 | 在关闭时调用 terminate() + join() |
| 状态继承问题 | 使用 spawn 启动方法 |
核心原则:始终确保在程序退出时释放所有资源,注册多个清理点(信号处理器 + atexit),并使用 wait=True 等待任务完成。
本文作者:sea-whales
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!