异常检测 #

概述 #

异常检测是识别数据中与正常模式显著不同的观测值的技术,广泛应用于欺诈检测、故障诊断等领域。

异常类型 #

类型 描述 示例
点异常 单个数据点异常 信用卡异常交易
上下文异常 特定上下文下异常 夏天穿棉袄
集体异常 一组数据异常 网络流量突增

检测方法 #

方法 类型 适用场景
IsolationForest 集成方法 通用
LocalOutlierFactor 密度方法 局部异常
One-Class SVM 边界方法 高维数据
EllipticEnvelope 统计方法 正态分布

孤立森林 #

基本使用 #

python
from sklearn.ensemble import IsolationForest
from sklearn.datasets import make_blobs
import matplotlib.pyplot as plt
import numpy as np

# Toy data set: one tight Gaussian cluster of 300 inliers ...
X, _ = make_blobs(n_samples=300, centers=1, cluster_std=0.5, random_state=42)
# ... plus 20 uniformly scattered points that act as outliers. A seeded
# generator is used so that the whole example (plot and printed count) is
# reproducible, consistent with random_state=42 used everywhere else.
rng = np.random.RandomState(42)
X_outliers = rng.uniform(low=-4, high=4, size=(20, 2))
# The outliers are stacked last, i.e. they occupy the final 20 rows of X.
X = np.vstack([X, X_outliers])

iso = IsolationForest(
    n_estimators=100,
    contamination=0.1,
    random_state=42
)
# fit_predict returns +1 for inliers and -1 for outliers.
y_pred = iso.fit_predict(X)

plt.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='coolwarm')
plt.title('Isolation Forest')
print(f"异常点数量: {np.count_nonzero(y_pred == -1)}")

重要参数 #

参数 描述 默认值
n_estimators 树的数量 100
contamination 异常比例 'auto'
max_samples 采样数量 'auto'
max_features 特征数量 1.0

异常分数 #

python
# Fit with the default contamination setting; only the raw scores are needed here.
iso = IsolationForest(random_state=42)
iso.fit(X)

# decision_function: the lower the value, the more abnormal the sample;
# negative values lie on the outlier side of the model's threshold.
scores = iso.decision_function(X)
print(f"异常分数范围: [{scores.min():.4f}, {scores.max():.4f}]")

# Histogram of the scores (outliers show up in the left tail).
plt.hist(scores, bins=50)
plt.xlabel('Anomaly Score')
plt.ylabel('Count')
plt.title('Anomaly Score Distribution')

contamination 参数 #

python
# Expected outlier fractions to compare.
contaminations = [0.01, 0.05, 0.1, 0.2]

# One panel per contamination value (2 x 2 grid).
fig, axes = plt.subplots(2, 2, figsize=(12, 10))

for ax, cont in zip(axes.ravel(), contaminations):
    # A larger contamination moves the threshold so more points are labelled -1.
    iso = IsolationForest(contamination=cont, random_state=42)
    y_pred = iso.fit_predict(X)
    ax.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='coolwarm')
    ax.set_title(f'contamination={cont}')

Local Outlier Factor (LOF) #

基本使用 #

python
from sklearn.neighbors import LocalOutlierFactor

# LOF compares each point's local density with that of its n_neighbors neighbours.
lof = LocalOutlierFactor(
    n_neighbors=20,
    contamination=0.1
)
# In the default (novelty=False) mode only fit_predict is available;
# it returns +1 for inliers and -1 for outliers.
y_pred = lof.fit_predict(X)

plt.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='coolwarm')
plt.title('Local Outlier Factor')

参数说明 #

参数 描述 默认值
n_neighbors 邻居数量 20
contamination 异常比例 'auto'
metric 距离度量 'minkowski'

LOF 分数 #

python
lof = LocalOutlierFactor(n_neighbors=20)
# Fitting populates negative_outlier_factor_; the predicted labels are not used below.
y_pred = lof.fit_predict(X)

# negative_outlier_factor_ stores the negated LOF, so flip the sign:
# values near 1 are inliers, larger values are more anomalous.
scores = -lof.negative_outlier_factor_

# Colour every sample by its LOF score.
plt.scatter(X[:, 0], X[:, 1], c=scores, cmap='coolwarm', s=10)
plt.colorbar(label='LOF Score')
plt.title('LOF Scores')

邻居数影响 #

python
# Neighbourhood sizes to compare.
neighbors_range = [5, 10, 20, 50]

# One panel per n_neighbors value (2 x 2 grid).
fig, axes = plt.subplots(2, 2, figsize=(12, 10))

for ax, n in zip(axes.ravel(), neighbors_range):
    # contamination is held fixed so only the neighbourhood size varies.
    lof = LocalOutlierFactor(n_neighbors=n, contamination=0.1)
    y_pred = lof.fit_predict(X)
    ax.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='coolwarm')
    ax.set_title(f'n_neighbors={n}')

One-Class SVM #

基本使用 #

python
from sklearn.svm import OneClassSVM

# nu is an upper bound on the fraction of training errors and a lower bound
# on the fraction of support vectors.
ocsvm = OneClassSVM(
    kernel='rbf',
    gamma='scale',
    nu=0.1
)
# fit_predict returns +1 for inliers and -1 for outliers.
y_pred = ocsvm.fit_predict(X)

plt.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='coolwarm')
plt.title('One-Class SVM')

参数说明 #

参数 描述 默认值
kernel 核函数 'rbf'
gamma 核系数 'scale'
nu 异常比例上界 0.5

核函数选择 #

python
# Kernels to compare.
kernels = ['linear', 'poly', 'rbf', 'sigmoid']

# One panel per kernel (2 x 2 grid).
fig, axes = plt.subplots(2, 2, figsize=(12, 10))

for ax, kernel in zip(axes.ravel(), kernels):
    # nu is held fixed so only the kernel varies.
    ocsvm = OneClassSVM(kernel=kernel, nu=0.1)
    y_pred = ocsvm.fit_predict(X)
    ax.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='coolwarm')
    ax.set_title(f'Kernel: {kernel}')

决策边界 #

python
ocsvm = OneClassSVM(kernel='rbf', gamma=0.5, nu=0.1)
ocsvm.fit(X)

# 100 x 100 evaluation grid covering the data range padded by 1 on every side.
xx, yy = np.meshgrid(
    np.linspace(X[:, 0].min() - 1, X[:, 0].max() + 1, 100),
    np.linspace(X[:, 1].min() - 1, X[:, 1].max() + 1, 100)
)
# Signed distance to the separating surface for every grid point
# (positive = inlier side, negative = outlier side).
Z = ocsvm.decision_function(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)

# Shade only the outlier side (from Z.min() up to 0) ...
plt.contourf(xx, yy, Z, levels=np.linspace(Z.min(), 0, 7), cmap='Blues_r')
# ... and draw the decision boundary itself (the level set Z == 0) in red.
plt.contour(xx, yy, Z, levels=[0], linewidths=2, colors='red')
plt.scatter(X[:, 0], X[:, 1], c='black', s=10)
plt.title('One-Class SVM Decision Boundary')

Elliptic Envelope #

基本使用 #

python
from sklearn.covariance import EllipticEnvelope

# Fits a robust covariance estimate; intended for roughly Gaussian data.
ee = EllipticEnvelope(
    contamination=0.1,
    random_state=42
)
# fit_predict returns +1 for inliers and -1 for outliers.
y_pred = ee.fit_predict(X)

plt.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='coolwarm')
plt.title('Elliptic Envelope')

马氏距离 #

python
# random_state pins the randomized robust covariance fit so the plot is
# reproducible, matching the basic-usage example.
ee = EllipticEnvelope(contamination=0.1, random_state=42)
ee.fit(X)

# EllipticEnvelope.mahalanobis returns *squared* Mahalanobis distances;
# take the square root so the values match the colorbar label and title.
mahal_dist = np.sqrt(ee.mahalanobis(X))

plt.scatter(X[:, 0], X[:, 1], c=mahal_dist, cmap='coolwarm')
plt.colorbar(label='Mahalanobis Distance')
plt.title('Mahalanobis Distance')

方法对比 #

python
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from sklearn.covariance import EllipticEnvelope

# All four detectors are configured for the same expected outlier fraction (10%)
# so their outputs are comparable.
methods = {
    'Isolation Forest': IsolationForest(contamination=0.1, random_state=42),
    'Local Outlier Factor': LocalOutlierFactor(n_neighbors=20, contamination=0.1),
    'One-Class SVM': OneClassSVM(nu=0.1),
    'Elliptic Envelope': EllipticEnvelope(contamination=0.1, random_state=42)
}

# One panel per detector (2 x 2 grid).
fig, axes = plt.subplots(2, 2, figsize=(12, 10))

for ax, (name, method) in zip(axes.ravel(), methods.items()):
    # Every estimator exposes fit_predict (+1 inlier / -1 outlier).
    y_pred = method.fit_predict(X)
    ax.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='coolwarm')
    # The title also shows how many points were flagged.
    ax.set_title(f'{name}\nOutliers: {sum(y_pred == -1)}')

异常检测评估 #

有标签评估 #

python
from sklearn.metrics import classification_report, confusion_matrix

# Ground truth in sklearn's convention: +1 = normal, -1 = anomaly.
# The 20 synthetic outliers were stacked last when X was built.
y_true = np.ones(len(X))
y_true[-20:] = -1

iso = IsolationForest(contamination=0.1, random_state=42)
y_pred = iso.fit_predict(X)

# target_names are matched to the labels in order. The labels are passed
# explicitly as [-1, 1] (which is also the default sorted order), so the
# anomaly name must come first; otherwise the two report rows are swapped.
labels = [-1, 1]
print(classification_report(y_true, y_pred, labels=labels, target_names=['Anomaly', 'Normal']))
# Rows/columns of the confusion matrix follow the same [-1, 1] order.
print(confusion_matrix(y_true, y_pred, labels=labels))

ROC 曲线 #

python
from sklearn.metrics import roc_curve, auc

iso = IsolationForest(random_state=42)
iso.fit(X)
# decision_function is low for anomalies; negate it so that a HIGHER score
# means MORE anomalous.
scores = -iso.decision_function(X)

# y_true uses +1 = normal, -1 = anomaly. roc_curve would default to
# pos_label=1 for {-1, 1} labels, which treats *normal* points as the positive
# class and inverts the curve (AUC near 0). The anomaly class is the one we
# are trying to detect, so it must be declared as the positive label.
fpr, tpr, thresholds = roc_curve(y_true, scores, pos_label=-1)
roc_auc = auc(fpr, tpr)

plt.plot(fpr, tpr, label=f'ROC curve (AUC = {roc_auc:.2f})')
# Diagonal = performance of a random scorer.
plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.legend()
plt.title('ROC Curve for Anomaly Detection')

时间序列异常检测 #

滑动窗口方法 #

python
def detect_anomalies_rolling(X, window_size=10, threshold=3):
    """Flag points that deviate strongly from their trailing window.

    Each position ``pos`` is compared with the ``window_size`` values
    immediately before it: it is reported when its absolute distance from
    the window mean exceeds ``threshold`` times the window's standard
    deviation.

    Args:
        X: One-dimensional sequence of numbers (list or NumPy array).
        window_size: Number of preceding samples used as the reference window.
        threshold: Multiplier applied to the window standard deviation.

    Returns:
        List of integer indices into ``X``, in increasing order. The first
        ``window_size`` positions are never reported because they have no
        full window before them.
    """

    def _deviates(pos):
        # Reference statistics come only from the samples preceding ``pos``.
        history = X[pos - window_size:pos]
        center = np.mean(history)
        spread = np.std(history)
        return abs(X[pos] - center) > threshold * spread

    return [pos for pos in range(window_size, len(X)) if _deviates(pos)]

# Synthetic series: 100 standard-normal samples (unseeded, so it differs per run) ...
ts = np.random.randn(100)
# ... with two injected spikes that the detector is expected to flag.
ts[50] = 10
ts[80] = -8

# Uses the defaults window_size=10 and threshold=3.
anomalies = detect_anomalies_rolling(ts)
print(f"检测到的异常位置: {anomalies}")

实战示例 #

信用卡欺诈检测 #

python
from sklearn.ensemble import IsolationForest
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# NOTE(review): X and y are not defined in this snippet; presumably the
# transaction features and 0/1 fraud labels (0 = legitimate) loaded elsewhere.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

iso = IsolationForest(contamination=0.01, random_state=42)
# Train on the rows labelled 0 only, so the forest models that class alone.
iso.fit(X_train[y_train == 0])

y_pred = iso.predict(X_test)
# Convert sklearn's +1 (inlier) / -1 (outlier) output to 0 / 1 to match y_test.
y_pred = np.where(y_pred == 1, 0, 1)

print(classification_report(y_test, y_pred))

网络入侵检测 #

python
from sklearn.neighbors import LocalOutlierFactor

# novelty=True enables predict() on unseen data (and disables fit_predict).
# NOTE(review): X_train / X_test are not defined in this snippet.
lof = LocalOutlierFactor(n_neighbors=20, contamination=0.05, novelty=True)
lof.fit(X_train)

# +1 = inlier, -1 = outlier for each test sample.
y_pred = lof.predict(X_test)

设备故障预测 #

python
from sklearn.svm import OneClassSVM

# NOTE(review): X_normal / X_new are not defined in this snippet; presumably
# healthy-operation training data and fresh readings to be scored.
ocsvm = OneClassSVM(kernel='rbf', gamma='auto', nu=0.05)
ocsvm.fit(X_normal)

# Lower decision_function values are more anomalous.
anomaly_scores = ocsvm.decision_function(X_new)
# Data-driven cut-off: the 5th percentile of the new scores, so roughly the
# lowest 5% of X_new is always flagged regardless of the sign of the scores.
threshold = np.percentile(anomaly_scores, 5)
anomalies = anomaly_scores < threshold

新颖性检测 #

设置 novelty=True #

python
# With novelty=True, LOF is fitted on training data and then scores unseen samples.
lof = LocalOutlierFactor(n_neighbors=20, novelty=True)
lof.fit(X_train)

# predict() is only available in novelty mode; returns +1 (inlier) / -1 (novelty).
y_pred_test = lof.predict(X_test)

方法对比 #

方法 novelty 支持 fit_predict predict
IsolationForest 无此参数 ✅ ✅
LocalOutlierFactor ✅ ✅ (novelty=False) ✅ (novelty=True)
OneClassSVM 无此参数 ✅ ✅
EllipticEnvelope 无此参数 ✅ ✅

最佳实践 #

1. 数据预处理 #

python
from sklearn.preprocessing import StandardScaler

# Standardize every feature to zero mean and unit variance.
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

2. 选择方法 #

数据特点 推荐方法
高维数据 IsolationForest
局部异常 LocalOutlierFactor
正态分布 EllipticEnvelope
复杂边界 One-Class SVM

3. 设置 contamination #

python
# NOTE(review): known_anomaly_rate is a placeholder not defined in this snippet;
# presumably the expected outlier fraction, or a falsy value when unknown.
if known_anomaly_rate:
    contamination = known_anomaly_rate
else:
    # Fall back to the estimator's built-in heuristic.
    contamination = 'auto'

4. Pipeline 使用 #

python
from sklearn.pipeline import Pipeline

# Chain scaling and detection so the scaler fitted on the training data is
# reused for every later call on the pipeline.
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('detector', IsolationForest(contamination=0.1))
])

5. 阈值调整 #

python
iso = IsolationForest(random_state=42)
iso.fit(X)
# Lower decision_function values are more anomalous.
scores = iso.decision_function(X)

# Custom cut-off: flag the 10% of samples with the lowest scores instead of
# relying on the model's built-in threshold.
threshold = np.percentile(scores, 10)
# Same +1 (inlier) / -1 (outlier) convention as predict().
y_pred = np.where(scores < threshold, -1, 1)

下一步 #

掌握异常检测后,继续学习 交叉验证 了解模型评估技术!

最后更新:2026-04-04