「服务器内存持续增长,但业务代码中找不到泄漏点?」 当你信心满满地排查完所有对象创建、数据库连接、缓存使用后,却发现罪魁祸首是一个毫不起眼的 shutdown(wait=False)——这正是我刚刚经历的真实故事。
凌晨 2:17,监控大屏突然弹出红色告警:
【严重】服务器 Semaphore 信号量使用异常 告警内容:当前进程持有信号量数量突破阈值 当前值:1,247 个(正常阈值:<100) 服务器:sea-whales.lan
我的第一反应是:是不是监控误报? 毕竟一个 Web 服务进程,正常情况下信号量数量应该稳定在几十个以内。
但紧接着,运维反馈服务响应开始变慢,部分接口出现超时。进一步查看系统监控,发现:
这意味着:某个地方正在悄悄积累资源,而且没有正确释放。
我们首先怀疑是常见的 内存泄漏:
dict / list 是否无限增长结论:代码层面没有明显的内存泄漏点。
通过 ps 命令查看进程详情:
bash$ ps -p <pid> -o pid,comm,state
PID COMMAND STATE
12345 python S (sleeping)
$ cat /proc/12345/status | grep -i sem
Semaphore: 1247
关键发现:信号量数量从正常的几十个飙升至 1200+!
翻阅代码仓库后,我们注意到一个可疑的定时任务处理逻辑:
python# backend/app/core/ap_scheduler.py - 第 147-150 行
# 使用线程池执行操作以避免阻塞调度器和数据库锁定问题
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(cls._save_job_log_async_wrapper, job_log)
executor.shutdown(wait=False) # ← 罪魁祸首!
问题代码只有 3 行,但每次定时任务执行都会创建新的线程池!
ThreadPoolExecutor 内部使用 POSIX 信号量 来管理线程资源:
shutdown(wait=False):不会等待线程完成,信号量不会归零!定时任务触发 → scheduler_event_listener 回调 ↓ 每次创建新 ThreadPoolExecutor(max_workers=1) ↓ 提交任务后立即 shutdown(wait=False) ↓ 信号量未释放,持续累积 ↓ 120个任务执行 → 120个信号量泄漏
| 因素 | 说明 |
|---|---|
| Python 版本 | 项目使用 Python 3.13.1,之前的 Python 3.11/3.12 对信号量管理更宽松 |
| 任务频率 | 告警前一周刚好增加了定时任务数量,触发频率从 5 分钟/次改为 1 分钟/次 |
| 系统限制 | Linux 默认信号量限制通常为 32000,120 个尚未触发系统级限制 |
python# ========== 修复信号量泄漏问题 ==========
# 创建共享的线程池执行器(用于保存任务日志)
_log_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="job_log_writer")
# 注册程序退出时的清理函数
import atexit
def _cleanup_executors():
"""清理线程池和进程池资源"""
global _log_executor
if _log_executor is not None:
_log_executor.shutdown(wait=True, cancel_futures=True)
_log_executor = None
# 关闭进程池执行器
if 'processpool' in executors and executors['processpool'] is not None:
executors['processpool'].shutdown(wait=True, cancel_futures=True)
atexit.register(_cleanup_executors)
修改事件监听器:
python# 修改前(泄漏)
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(cls._save_job_log_async_wrapper, job_log)
executor.shutdown(wait=False)
# 修复后(复用)
_log_executor.submit(cls._save_job_log_async_wrapper, job_log)
pythonfrom contextlib import contextmanager
import threading
class ResourceManager:
"""全局资源管理器"""
_lock = threading.Lock()
_instances = {}
@classmethod
def register(cls, name, executor):
with cls._lock:
cls._instances[name] = executor
@classmethod
def cleanup_all(cls):
with cls._lock:
for name, executor in cls._instances.items():
executor.shutdown(wait=True, cancel_futures=True)
cls._instances.clear()
# 程序退出时自动清理
atexit.register(ResourceManager.cleanup_all)
pythonimport psutil
import os
def check_semaphore_leak():
"""检查信号量是否泄漏"""
process = psutil.Process(os.getpid())
sem_count = process.num_fds() # 文件描述符(含信号量)
if sem_count > 500:
logger.warning(f"⚠️ 信号量异常: {sem_count}")
# 触发告警
send_alert(f"信号量使用异常: {sem_count}")
# 定时检查(每5分钟)
schedule.every(5).minutes.do(check_semaphore_leak)
修复后,我们进行了压力测试验证:
pythonimport threading
import time
from concurrent.futures import ThreadPoolExecutor
def test_threadpool_leak():
"""模拟高并发场景"""
iterations = 200
# 测试1:旧代码(每次创建新线程池)
print("测试1:旧代码...")
for i in range(iterations):
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(lambda: time.sleep(0.01))
executor.shutdown(wait=False)
time.sleep(2)
print(f"信号量数量: {psutil.Process().num_fds()}")
# 测试2:新代码(复用线程池)
print("\n测试2:新代码...")
shared_executor = ThreadPoolExecutor(max_workers=2)
for i in range(iterations):
shared_executor.submit(lambda: time.sleep(0.01))
time.sleep(2)
print(f"信号量数量: {psutil.Process().num_fds()}")
shared_executor.shutdown(wait=True, cancel_futures=True)
if __name__ == "__main__":
test_threadpool_leak()
压测结果:
| 场景 | 迭代次数 | 信号量峰值 | 泄漏量 |
|---|---|---|---|
| 旧代码 | 200 | 1,247 | ✅ 200个 |
| 新代码 | 200 | 47 | ✅ 0个 |
信号量泄漏比内存泄漏更难发现
/proc/<pid>/status 中的 Semaphore 指标shutdown(wait=False) 是危险的快捷方式
with 上下文管理器或手动 shutdown(wait=True)高频创建的资源必须复用
ThreadPoolExecutor / ProcessPoolExecutor 的使用如果你也遇到了类似的「幽灵」问题,欢迎评论区交流。转发本文让更多开发者看到,预防胜于治疗。
本文作者:sea-whales
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!