监控与漂移检测
→ 返回工程实践
模型上线后,现实世界的数据分布会随时间变化,导致模型性能悄悄下降——这就是”漂移”。没有监控的模型就像没有仪表盘的飞机,出问题时往往已经造成损失。
漂移类型
| 类型 | 定义 | 示例 |
|---|---|---|
| 数据漂移(Data Drift) | 输入特征分布 P(X) 变化 | 用户年龄分布随季节变化 |
| 概念漂移(Concept Drift) | 输入输出关系 P(Y|X) 变化 | 疫情改变了用户购物行为 |
| 标签漂移(Label Drift) | 输出分布 P(Y) 变化 | 欺诈类型出现新模式 |
| 预测漂移 | 模型输出分布变化 | 模型总是预测某一类 |
监控指标体系
系统层(基础设施)
响应时间(P50/P99)
请求错误率(4xx/5xx)
GPU/CPU 利用率
内存使用
队列积压深度
模型层(业务指标)
预测分布(各类别比例随时间变化)
输入特征统计(均值、方差、分位数)
预测置信度分布
缺失值率
效果层(Ground Truth,有延迟)
准确率 / AUC / F1(标签返回后才能算)
点击率、转化率(推荐系统)
用户反馈(👍👎)
漂移检测方法
统计检验
from scipy import stats
import numpy as np
def detect_drift_ks(reference: np.ndarray, current: np.ndarray,
threshold: float = 0.05) -> bool:
"""KS 检验:检测连续特征分布变化"""
statistic, p_value = stats.ks_2samp(reference, current)
return p_value < threshold # p 小 = 有漂移
def detect_drift_chi2(reference: np.ndarray, current: np.ndarray,
threshold: float = 0.05) -> bool:
"""卡方检验:检测类别特征分布变化"""
# 统一 bins
bins = np.unique(np.concatenate([reference, current]))
ref_counts = np.bincount(np.searchsorted(bins, reference))
cur_counts = np.bincount(np.searchsorted(bins, current))
_, p_value = stats.chisquare(cur_counts, ref_counts)
return p_value < thresholdPSI(Population Stability Index)
评估评分/概率分布的稳定性,信贷风控中最常用:
def psi(reference: np.ndarray, current: np.ndarray, bins: int = 10) -> float:
"""PSI < 0.1: 稳定;0.1-0.2: 轻微变化;> 0.2: 显著漂移"""
ref_hist, edges = np.histogram(reference, bins=bins)
cur_hist, _ = np.histogram(current, bins=edges)
# 避免除零
ref_pct = (ref_hist + 1e-6) / len(reference)
cur_pct = (cur_hist + 1e-6) / len(current)
psi_value = np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct))
return psi_valueEvidently 监控实践
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, ClassificationPreset
# 数据漂移报告
report = Report(metrics=[DataDriftPreset()])
report.run(
reference_data=reference_df, # 训练时数据
current_data=production_df, # 生产环境最近一批数据
)
report.save_html("drift_report.html")
# 分类模型效果报告
perf_report = Report(metrics=[ClassificationPreset()])
perf_report.run(
reference_data=test_df,
current_data=production_df.dropna(subset=["label"]), # 有 Ground Truth 的部分
)告警策略
class ModelMonitor:
def __init__(self, reference_data, alert_webhook):
self.reference = reference_data
self.webhook = alert_webhook
self.thresholds = {
"psi": 0.2,
"error_rate": 0.01,
"p99_latency_ms": 1000,
}
def check(self, current_data, metrics):
issues = []
# PSI 检测
psi_val = psi(self.reference["score"], current_data["score"])
if psi_val > self.thresholds["psi"]:
issues.append(f"PSI 超阈值: {psi_val:.3f}")
# 延迟检测
if metrics["p99_latency"] > self.thresholds["p99_latency_ms"]:
issues.append(f"P99 延迟过高: {metrics['p99_latency']}ms")
if issues:
self.alert(issues)
def alert(self, issues):
import requests
requests.post(self.webhook, json={
"text": f"模型监控告警:\n" + "\n".join(f"• {i}" for i in issues)
})重训触发策略
| 触发条件 | 说明 |
|---|---|
| 定时重训 | 每周/每月定期用新数据重训,成本可控 |
| 漂移触发 | PSI > 阈值时自动触发,敏感但可能误报 |
| 效果触发 | AUC 下降超过 N% 时触发,最准确但有延迟 |
| 数据量触发 | 新数据累积到 N 条时触发,适合增量学习 |
监控工具生态
| 工具 | 类型 | 特点 |
|---|---|---|
| Evidently | 开源 | 报告丰富,易集成 |
| Arize | SaaS | LLM + 传统 ML,告警完善 |
| WhyLabs | SaaS | 数据隐私友好,可本地化 |
| Grafana + Prometheus | 开源 | 系统指标监控,需自建模型指标采集 |
| MLflow | 开源 | 指标存储,但无实时告警 |