聚类算法 #
概述 #
聚类是一种无监督学习方法,将相似的数据点分组到同一簇中,发现数据的内在结构。
聚类类型 #
| 类型 | 描述 | 代表算法 |
|---|---|---|
| 划分聚类 | 将数据划分为 K 个簇 | K-Means |
| 层次聚类 | 构建层次化的簇结构 | Agglomerative |
| 密度聚类 | 基于密度发现簇 | DBSCAN |
| 谱聚类 | 基于图论的方法 | Spectral Clustering |
K-Means #
基本使用 #
python
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs

# Synthetic data: 300 points drawn from 4 Gaussian blobs.
X, y_true = make_blobs(n_samples=300, centers=4, cluster_std=0.60, random_state=42)

# Fit the model and get one cluster label per sample.
kmeans = KMeans(n_clusters=4, random_state=42)
y_pred = kmeans.fit_predict(X)

# Points colored by assigned cluster; centroids drawn as large red crosses.
plt.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='viridis')
centers = kmeans.cluster_centers_
plt.scatter(centers[:, 0], centers[:, 1], c='red', marker='X', s=200)
plt.title('K-Means Clustering')
重要参数 #
| 参数 | 描述 | 默认值 |
|---|---|---|
| n_clusters | 簇的数量 | 8 |
| init | 初始化方法 | 'k-means++' |
| n_init | 运行次数 | 10 |
| max_iter | 最大迭代次数 | 300 |
| random_state | 随机种子 | None |
初始化方法 #
python
# Three ways to seed the centroids.
kmeans_pp = KMeans(n_clusters=4, init='k-means++')   # spread-out seeding (default)
kmeans_random = KMeans(n_clusters=4, init='random')  # uniform random seeding
# Explicit centers: scikit-learn performs only a single initialization in
# this case, so pass n_init=1 to avoid the "explicit initial center" warning.
kmeans_custom = KMeans(n_clusters=4, init=X[:4], n_init=1)
选择最佳 K 值 #
肘部法则 #
python
# Elbow method: inertia (within-cluster sum of squares) always shrinks
# as K grows; the "elbow" where the curve flattens is a good K choice.
inertias = []
K_range = range(1, 11)
for k in K_range:
    kmeans = KMeans(n_clusters=k, random_state=42).fit(X)
    inertias.append(kmeans.inertia_)
plt.plot(K_range, inertias, 'bo-')
plt.xlabel('K')
plt.ylabel('Inertia')
plt.title('Elbow Method')
轮廓系数 #
python
from sklearn.metrics import silhouette_score

# The silhouette score is undefined for a single cluster, so K starts at 2.
# Higher is better; pick the K with the largest score.
silhouette_scores = []
for k in range(2, 11):
    labels = KMeans(n_clusters=k, random_state=42).fit_predict(X)
    silhouette_scores.append(silhouette_score(X, labels))
plt.plot(range(2, 11), silhouette_scores, 'bo-')
plt.xlabel('K')
plt.ylabel('Silhouette Score')
Mini-Batch K-Means #
python
from sklearn.cluster import MiniBatchKMeans

# Mini-batch variant: updates centroids from small random batches,
# trading a little accuracy for much faster training on large data.
mbk = MiniBatchKMeans(n_clusters=4, batch_size=100, random_state=42)
mbk.fit(X)
print(f"簇中心: {mbk.cluster_centers_.shape}")
层次聚类 #
凝聚聚类 #
python
from sklearn.cluster import AgglomerativeClustering

# Bottom-up clustering: every point starts as its own cluster and the
# closest pair (Ward criterion) is merged repeatedly until 4 remain.
agg = AgglomerativeClustering(linkage='ward', n_clusters=4)
y_pred = agg.fit_predict(X)
plt.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='viridis')
链接方式 #
| 方式 | 描述 |
|---|---|
| ward | 最小方差合并 |
| complete | 最大距离 |
| average | 平均距离 |
| single | 最小距离 |
python
# Compare the four merge criteria side by side on the same data.
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
panels = zip(axes.ravel(), ['ward', 'complete', 'average', 'single'])
for ax, linkage in panels:
    labels = AgglomerativeClustering(n_clusters=4, linkage=linkage).fit_predict(X)
    ax.scatter(X[:, 0], X[:, 1], c=labels, cmap='viridis')
    ax.set_title(f'Linkage: {linkage}')
树状图 #
python
from scipy.cluster.hierarchy import dendrogram, linkage

# Full merge tree: the height of each junction is the Ward distance at
# which its two sub-clusters were merged.
Z = linkage(X, method='ward')
plt.figure(figsize=(12, 6))
dendrogram(Z)
plt.title('Hierarchical Clustering Dendrogram')
plt.ylabel('Distance')
plt.xlabel('Sample Index')
DBSCAN #
基本使用 #
python
from sklearn.cluster import DBSCAN

# Density-based clustering: clusters grow from points with at least
# min_samples neighbors inside radius eps; outliers receive label -1.
dbscan = DBSCAN(eps=0.5, min_samples=5)
y_pred = dbscan.fit_predict(X)
plt.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='viridis')
plt.title('DBSCAN Clustering')

# Label -1 marks noise, so exclude it from the cluster count.
unique_labels = set(y_pred)
n_clusters = len(unique_labels) - (1 if -1 in unique_labels else 0)
n_noise = list(y_pred).count(-1)
print(f"簇数量: {n_clusters}")
print(f"噪声点: {n_noise}")
参数说明 #
| 参数 | 描述 | 默认值 |
|---|---|---|
| eps | 邻域半径 | 0.5 |
| min_samples | 最小样本数 | 5 |
| metric | 距离度量 | 'euclidean' |
参数选择 #
python
from sklearn.neighbors import NearestNeighbors
import numpy as np

# K-distance plot for choosing eps: sort every point's distance to its
# k-th nearest neighbor and look for the "knee" — a good eps candidate.
# NOTE: kneighbors() queried on the training set returns each point
# itself at distance 0 in column 0, so column 4 is the 4th *true*
# neighbor. With sklearn's DBSCAN the point itself counts toward
# min_samples, so the (min_samples - 1)-th neighbor is exactly the
# distance that matters for min_samples=5.
neighbors = NearestNeighbors(n_neighbors=5)
neighbors.fit(X)
distances, indices = neighbors.kneighbors(X)
distances = np.sort(distances[:, 4])
plt.plot(distances)
plt.xlabel('Points')
plt.ylabel('Distance to 4th Nearest Neighbor')
plt.title('K-distance Graph')
核心点、边界点、噪声点 #
python
# Classify every sample as core, border, or noise after a DBSCAN fit.
dbscan = DBSCAN(eps=0.5, min_samples=5)
dbscan.fit(X)
# Boolean mask that is True exactly at the core samples.
core_samples_mask = np.zeros_like(dbscan.labels_, dtype=bool)
core_samples_mask[dbscan.core_sample_indices_] = True
# Core: dense interior points. Border: in a cluster but not core.
# Noise: labelled -1, belongs to no cluster.
print(f"核心点数量: {sum(core_samples_mask)}")
print(f"边界点数量: {sum(~core_samples_mask & (dbscan.labels_ != -1))}")
print(f"噪声点数量: {sum(dbscan.labels_ == -1)}")
谱聚类 #
基本使用 #
python
from sklearn.cluster import SpectralClustering

# Spectral clustering: embeds the points using eigenvectors of a
# similarity graph, then clusters in that embedding space.
sc = SpectralClustering(n_clusters=4, affinity='rbf', random_state=42)
y_pred = sc.fit_predict(X)
plt.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='viridis')
plt.title('Spectral Clustering')
亲和矩阵 #
| 参数 | 描述 |
|---|---|
| rbf | RBF 核 |
| nearest_neighbors | K 近邻 |
| precomputed | 预计算 |
python
# RBF affinity: dense similarity matrix exp(-gamma * ||x - y||^2).
sc_rbf = SpectralClustering(n_clusters=4, affinity='rbf', gamma=1.0)
# KNN affinity: sparse graph linking each point to its 10 nearest neighbors.
sc_knn = SpectralClustering(n_clusters=4, affinity='nearest_neighbors', n_neighbors=10)
高斯混合模型 #
GMM 聚类 #
python
from sklearn.mixture import GaussianMixture

# Probabilistic clustering: model the data as a mixture of 4 Gaussians
# and assign each point to its most likely component.
gmm = GaussianMixture(n_components=4, covariance_type='full', random_state=42)
y_pred = gmm.fit_predict(X)
plt.scatter(X[:, 0], X[:, 1], c=y_pred, cmap='viridis')
协方差类型 #
| 类型 | 描述 |
|---|---|
| full | 每个分量有自己的协方差矩阵 |
| tied | 所有分量共享协方差矩阵 |
| diag | 对角协方差矩阵 |
| spherical | 球形协方差矩阵 |
概率预测 #
python
# Soft assignments: one membership probability per component;
# each row of the (n_samples, n_components) matrix sums to 1.
proba = gmm.predict_proba(X)
print(f"概率形状: {proba.shape}")
print(f"第一个样本概率: {proba[0]}")
选择分量数 #
python
# Fit GMMs with 1..10 components and track both information criteria;
# lower is better, and BIC penalizes extra components more strongly.
n_components_range = range(1, 11)
bics, aics = [], []
for n in n_components_range:
    gmm = GaussianMixture(n_components=n, random_state=42).fit(X)
    bics.append(gmm.bic(X))
    aics.append(gmm.aic(X))
plt.plot(n_components_range, bics, label='BIC')
plt.plot(n_components_range, aics, label='AIC')
plt.legend()
plt.xlabel('n_components')
聚类评估 #
内部评估指标 #
python
from sklearn.metrics import silhouette_score, calinski_harabasz_score, davies_bouldin_score
# Internal metrics: judge the clustering from the data alone, without
# ground-truth labels. Assumes a fitted-or-fittable `kmeans` from the
# earlier examples is in scope.
labels = kmeans.fit_predict(X)
print(f"轮廓系数: {silhouette_score(X, labels):.4f}")
print(f"Calinski-Harabasz: {calinski_harabasz_score(X, labels):.4f}")
print(f"Davies-Bouldin: {davies_bouldin_score(X, labels):.4f}")
指标说明 #
| 指标 | 范围 | 最佳值 |
|---|---|---|
| 轮廓系数 | [-1, 1] | 越大越好 |
| Calinski-Harabasz | [0, ∞) | 越大越好 |
| Davies-Bouldin | [0, ∞) | 越小越好 |
外部评估指标 #
python
from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score, homogeneity_score
# External metrics: compare predicted labels against the known ground
# truth y_true (available here because make_blobs returned it).
print(f"调整兰德指数: {adjusted_rand_score(y_true, labels):.4f}")
print(f"标准化互信息: {normalized_mutual_info_score(y_true, labels):.4f}")
print(f"同质性: {homogeneity_score(y_true, labels):.4f}")
算法对比 #
python
from sklearn.cluster import KMeans, AgglomerativeClustering, DBSCAN, SpectralClustering
from sklearn.mixture import GaussianMixture

# Run five algorithms on the same data and plot them side by side.
algorithms = {
    'K-Means': KMeans(n_clusters=4, random_state=42),
    'Agglomerative': AgglomerativeClustering(n_clusters=4),
    'DBSCAN': DBSCAN(eps=0.5, min_samples=5),
    'Spectral': SpectralClustering(n_clusters=4, random_state=42),
    'GMM': GaussianMixture(n_components=4, random_state=42)
}
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
for ax, (name, algo) in zip(axes.ravel(), algorithms.items()):
    labels = algo.fit_predict(X)
    ax.scatter(X[:, 0], X[:, 1], c=labels, cmap='viridis')
    ax.set_title(name)
# Only 5 algorithms in a 2x3 grid: blank out the unused sixth panel.
axes.ravel()[-1].set_axis_off()
特殊形状聚类 #
月牙形数据 #
python
from sklearn.datasets import make_moons

# Two interleaving half-circles: a classic non-convex benchmark.
X, y = make_moons(n_samples=300, noise=0.05, random_state=42)

# K-Means assumes compact, roughly spherical clusters and struggles
# here; DBSCAN and spectral clustering can follow arbitrary shapes.
kmeans = KMeans(n_clusters=2, random_state=42)
dbscan = DBSCAN(eps=0.2, min_samples=5)
spectral = SpectralClustering(n_clusters=2, affinity='nearest_neighbors', random_state=42)

pairs = [('K-Means', kmeans), ('DBSCAN', dbscan), ('Spectral', spectral)]
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
for ax, (name, algo) in zip(axes, pairs):
    labels = algo.fit_predict(X)
    ax.scatter(X[:, 0], X[:, 1], c=labels, cmap='viridis')
    ax.set_title(name)
实战示例 #
客户细分 #
python
from sklearn.preprocessing import StandardScaler
import pandas as pd

# Toy customer table: age, annual income and a 0-100 spending score.
df = pd.DataFrame({
    'age': [25, 30, 35, 40, 45, 50, 55, 60],
    'income': [30000, 45000, 50000, 60000, 75000, 80000, 90000, 100000],
    'spending_score': [40, 50, 60, 70, 80, 75, 85, 90],
})

# Standardize the features first — otherwise income, with its much
# larger scale, would dominate the Euclidean distances.
scaler = StandardScaler()
X_scaled = scaler.fit_transform(df)

# Segment the customers and profile each segment by its column means.
kmeans = KMeans(n_clusters=3, random_state=42)
df['cluster'] = kmeans.fit_predict(X_scaled)
print(df.groupby('cluster').mean())
图像压缩 #
python
from sklearn.datasets import load_sample_image
import numpy as np

# Color quantization: treat every pixel as a 3-D RGB point, cluster
# into 16 colors, then repaint each pixel with its cluster centroid.
china = load_sample_image('china.jpg')
X = china.reshape(-1, 3)
kmeans = KMeans(n_clusters=16, random_state=42)
kmeans.fit(X)
compressed = kmeans.cluster_centers_[kmeans.labels_]
compressed = compressed.reshape(china.shape).astype(np.uint8)

# Original vs. 16-color version, side by side.
fig, axes = plt.subplots(1, 2, figsize=(12, 6))
for ax, (img, title) in zip(axes, [(china, 'Original'), (compressed, 'Compressed (16 colors)')]):
    ax.imshow(img)
    ax.set_title(title)
最佳实践 #
1. 数据预处理 #
python
from sklearn.preprocessing import StandardScaler
# Distance-based clustering is scale-sensitive: standardize features to
# zero mean / unit variance before clustering.
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
2. 选择算法 #
| 数据特点 | 推荐算法 |
|---|---|
| 球形簇 | K-Means |
| 任意形状 | DBSCAN |
| 层次结构 | 层次聚类 |
| 概率分布 | GMM |
3. 确定簇数 #
python
from sklearn.metrics import silhouette_score

# Pick the K in 2..10 whose clustering maximizes the silhouette score.
best_k, best_score = 2, -1
for k in range(2, 11):
    labels = KMeans(n_clusters=k, random_state=42).fit_predict(X)
    score = silhouette_score(X, labels)
    if score > best_score:
        best_k, best_score = k, score
4. 处理噪声 #
python
# DBSCAN marks outliers with label -1; drop them to keep only the
# samples that were assigned to a real cluster.
dbscan = DBSCAN(eps=0.5, min_samples=5)
labels = dbscan.fit_predict(X)
keep = labels != -1
X_clean = X[keep]
labels_clean = labels[keep]
下一步 #
掌握聚类算法后,继续学习 降维技术 了解如何处理高维数据!
最后更新:2026-04-04