异常检测 #
概述 #
异常检测是识别数据中与正常模式显著不同的观测值的技术,广泛应用于欺诈检测、故障诊断等领域。
异常类型 #
| 类型 | 描述 | 示例 |
|---|---|---|
| 点异常 | 单个数据点异常 | 信用卡异常交易 |
| 上下文异常 | 特定上下文下异常 | 夏天穿棉袄 |
| 集体异常 | 一组数据异常 | 网络流量突增 |
检测方法 #
| 方法 | 类型 | 适用场景 |
|---|---|---|
| IsolationForest | 集成方法 | 通用 |
| LocalOutlierFactor | 密度方法 | 局部异常 |
| One-Class SVM | 边界方法 | 高维数据 |
| EllipticEnvelope | 统计方法 | 正态分布 |
孤立森林 #
基本使用 #
python
from sklearn.ensemble import IsolationForest
from sklearn.datasets import make_blobs
import matplotlib.pyplot as plt
import numpy as np
# Synthetic data: one tight Gaussian cluster of 300 inliers
X, _ = make_blobs(n_samples=300, centers=1, cluster_std=0.5, random_state=42)
# Plus 20 uniformly scattered points acting as outliers
# NOTE(review): no seed is set for this call, so outliers differ between runs
X_outliers = np.random.uniform(low=-4, high=4, size=(20, 2))
X = np.vstack([X, X_outliers])
iso = IsolationForest(
    n_estimators=100,    # number of isolation trees
    contamination=0.1,   # expected fraction of anomalies in the data
    random_state=42
)
# fit_predict returns 1 for inliers and -1 for outliers
y_pred = iso.fit_predict(X)
plt.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='coolwarm')
plt.title('Isolation Forest')
print(f"异常点数量: {sum(y_pred == -1)}")
重要参数 #
| 参数 | 描述 | 默认值 |
|---|---|---|
| n_estimators | 树的数量 | 100 |
| contamination | 异常比例 | 'auto' |
| max_samples | 采样数量 | 'auto' |
| max_features | 特征数量 | 1.0 |
异常分数 #
python
iso = IsolationForest(random_state=42)
iso.fit(X)
scores = iso.decision_function(X)
print(f"异常分数范围: [{scores.min():.4f}, {scores.max():.4f}]")
plt.hist(scores, bins=50)
plt.xlabel('Anomaly Score')
plt.ylabel('Count')
plt.title('Anomaly Score Distribution')
contamination 参数 #
python
contaminations = [0.01, 0.05, 0.1, 0.2]
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
for ax, cont in zip(axes.ravel(), contaminations):
iso = IsolationForest(contamination=cont, random_state=42)
y_pred = iso.fit_predict(X)
ax.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='coolwarm')
ax.set_title(f'contamination={cont}')
Local Outlier Factor (LOF) #
基本使用 #
python
from sklearn.neighbors import LocalOutlierFactor
lof = LocalOutlierFactor(
n_neighbors=20,
contamination=0.1
)
y_pred = lof.fit_predict(X)
plt.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='coolwarm')
plt.title('Local Outlier Factor')
参数说明 #
| 参数 | 描述 | 默认值 |
|---|---|---|
| n_neighbors | 邻居数量 | 20 |
| contamination | 异常比例 | 'auto' |
| metric | 距离度量 | 'minkowski' |
LOF 分数 #
python
lof = LocalOutlierFactor(n_neighbors=20)
y_pred = lof.fit_predict(X)
scores = -lof.negative_outlier_factor_
plt.scatter(X[:, 0], X[:, 1], c=scores, cmap='coolwarm', s=10)
plt.colorbar(label='LOF Score')
plt.title('LOF Scores')
邻居数影响 #
python
neighbors_range = [5, 10, 20, 50]
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
for ax, n in zip(axes.ravel(), neighbors_range):
lof = LocalOutlierFactor(n_neighbors=n, contamination=0.1)
y_pred = lof.fit_predict(X)
ax.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='coolwarm')
ax.set_title(f'n_neighbors={n}')
One-Class SVM #
基本使用 #
python
from sklearn.svm import OneClassSVM
ocsvm = OneClassSVM(
kernel='rbf',
gamma='scale',
nu=0.1
)
y_pred = ocsvm.fit_predict(X)
plt.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='coolwarm')
plt.title('One-Class SVM')
参数说明 #
| 参数 | 描述 | 默认值 |
|---|---|---|
| kernel | 核函数 | 'rbf' |
| gamma | 核系数 | 'scale' |
| nu | 异常比例上界 | 0.5 |
核函数选择 #
python
kernels = ['linear', 'poly', 'rbf', 'sigmoid']
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
for ax, kernel in zip(axes.ravel(), kernels):
ocsvm = OneClassSVM(kernel=kernel, nu=0.1)
y_pred = ocsvm.fit_predict(X)
ax.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='coolwarm')
ax.set_title(f'Kernel: {kernel}')
决策边界 #
python
ocsvm = OneClassSVM(kernel='rbf', gamma=0.5, nu=0.1)
ocsvm.fit(X)
xx, yy = np.meshgrid(
np.linspace(X[:, 0].min() - 1, X[:, 0].max() + 1, 100),
np.linspace(X[:, 1].min() - 1, X[:, 1].max() + 1, 100)
)
Z = ocsvm.decision_function(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)
plt.contourf(xx, yy, Z, levels=np.linspace(Z.min(), 0, 7), cmap='Blues_r')
plt.contour(xx, yy, Z, levels=[0], linewidths=2, colors='red')
plt.scatter(X[:, 0], X[:, 1], c='black', s=10)
plt.title('One-Class SVM Decision Boundary')
Elliptic Envelope #
基本使用 #
python
from sklearn.covariance import EllipticEnvelope
ee = EllipticEnvelope(
contamination=0.1,
random_state=42
)
y_pred = ee.fit_predict(X)
plt.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='coolwarm')
plt.title('Elliptic Envelope')
马氏距离 #
python
ee = EllipticEnvelope(contamination=0.1)
ee.fit(X)
mahal_dist = ee.mahalanobis(X)
plt.scatter(X[:, 0], X[:, 1], c=mahal_dist, cmap='coolwarm')
plt.colorbar(label='Mahalanobis Distance')
plt.title('Mahalanobis Distance')
方法对比 #
python
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from sklearn.covariance import EllipticEnvelope
methods = {
'Isolation Forest': IsolationForest(contamination=0.1, random_state=42),
'Local Outlier Factor': LocalOutlierFactor(n_neighbors=20, contamination=0.1),
'One-Class SVM': OneClassSVM(nu=0.1),
'Elliptic Envelope': EllipticEnvelope(contamination=0.1, random_state=42)
}
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
for ax, (name, method) in zip(axes.ravel(), methods.items()):
y_pred = method.fit_predict(X)
ax.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='coolwarm')
ax.set_title(f'{name}\nOutliers: {sum(y_pred == -1)}')
异常检测评估 #
有标签评估 #
python
from sklearn.metrics import classification_report, confusion_matrix
# Ground truth: the last 20 rows of X are the injected outliers.
# Scikit-learn anomaly detectors use 1 = inlier, -1 = outlier.
y_true = np.ones(len(X))
y_true[-20:] = -1
iso = IsolationForest(contamination=0.1, random_state=42)
y_pred = iso.fit_predict(X)
# classification_report assigns target_names in order of the sorted
# labels (-1, 1), so the anomaly class (-1) must be named first —
# the original ['Normal', 'Anomaly'] labeled the anomalies as "Normal".
print(classification_report(y_true, y_pred, target_names=['Anomaly', 'Normal']))
print(confusion_matrix(y_true, y_pred))
ROC 曲线 #
python
from sklearn.metrics import roc_curve, auc
iso = IsolationForest(random_state=42)
iso.fit(X)
scores = -iso.decision_function(X)
fpr, tpr, thresholds = roc_curve(y_true, scores)
roc_auc = auc(fpr, tpr)
plt.plot(fpr, tpr, label=f'ROC curve (AUC = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.legend()
plt.title('ROC Curve for Anomaly Detection')
时间序列异常检测 #
滑动窗口方法 #
python
def detect_anomalies_rolling(X, window_size=10, threshold=3):
    """Flag points deviating from a trailing-window mean by more than
    `threshold` standard deviations.

    For each index `end` from `window_size` to the end of the series,
    the point X[end] is compared against the mean/std of the preceding
    `window_size` values; indices that exceed the band are collected.

    Returns a list of the flagged indices (the first `window_size`
    points are never checked, since they have no full history window).
    """
    flagged = []
    for end in range(window_size, len(X)):
        history = X[end - window_size:end]
        mu = np.mean(history)
        sigma = np.std(history)
        if abs(X[end] - mu) > threshold * sigma:
            flagged.append(end)
    return flagged
ts = np.random.randn(100)
ts[50] = 10
ts[80] = -8
anomalies = detect_anomalies_rolling(ts)
print(f"检测到的异常位置: {anomalies}")
实战示例 #
信用卡欺诈检测 #
python
from sklearn.ensemble import IsolationForest
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
iso = IsolationForest(contamination=0.01, random_state=42)
iso.fit(X_train[y_train == 0])
y_pred = iso.predict(X_test)
y_pred = np.where(y_pred == 1, 0, 1)
print(classification_report(y_test, y_pred))
网络入侵检测 #
python
from sklearn.neighbors import LocalOutlierFactor
lof = LocalOutlierFactor(n_neighbors=20, contamination=0.05, novelty=True)
lof.fit(X_train)
y_pred = lof.predict(X_test)
设备故障预测 #
python
from sklearn.svm import OneClassSVM
ocsvm = OneClassSVM(kernel='rbf', gamma='auto', nu=0.05)
ocsvm.fit(X_normal)
anomaly_scores = ocsvm.decision_function(X_new)
threshold = np.percentile(anomaly_scores, 5)
anomalies = anomaly_scores < threshold
新颖性检测 #
设置 novelty=True #
python
lof = LocalOutlierFactor(n_neighbors=20, novelty=True)
lof.fit(X_train)
y_pred_test = lof.predict(X_test)
方法对比 #
| 方法 | novelty 支持 | fit_predict | predict |
|---|---|---|---|
| IsolationForest | ✅ | ✅ | ✅ |
| LocalOutlierFactor | ✅ | ✅ (novelty=False) | ✅ (novelty=True) |
| OneClassSVM | ✅ | ✅ | ✅ |
| EllipticEnvelope | ✅ | ✅ | ✅ |
最佳实践 #
1. 数据预处理 #
python
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
2. 选择方法 #
| 数据特点 | 推荐方法 |
|---|---|
| 高维数据 | IsolationForest |
| 局部异常 | LocalOutlierFactor |
| 正态分布 | EllipticEnvelope |
| 复杂边界 | One-Class SVM |
3. 设置 contamination #
python
if known_anomaly_rate:
contamination = known_anomaly_rate
else:
contamination = 'auto'
4. Pipeline 使用 #
python
from sklearn.pipeline import Pipeline
pipe = Pipeline([
('scaler', StandardScaler()),
('detector', IsolationForest(contamination=0.1))
])
5. 阈值调整 #
python
iso = IsolationForest(random_state=42)
iso.fit(X)
scores = iso.decision_function(X)
threshold = np.percentile(scores, 10)
y_pred = np.where(scores < threshold, -1, 1)
下一步 #
掌握异常检测后,继续学习 交叉验证 了解模型评估技术!
最后更新:2026-04-04