EFB 算法 #
什么是 EFB? #
EFB(Exclusive Feature Bundling,互斥特征捆绑)是 LightGBM 的核心优化算法之一。它通过将互斥特征(很少同时非零的特征)捆绑在一起,减少特征维度,从而加速训练过程。
核心思想 #
text
┌─────────────────────────────────────────────────────────────┐
│ EFB 核心思想 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 稀疏特征空间中,很多特征是互斥的: │
│ │
│ - 互斥特征:很少同时非零 │
│ - 可以合并为一个特征 │
│ - 零损失降维 │
│ │
│ 示例: │
│ 特征 A: [1, 0, 0, 2, 0, 0] │
│ 特征 B: [0, 3, 0, 0, 4, 0] │
│ 特征 C: [0, 0, 5, 0, 0, 6] │
│ │
│ 捆绑后: │
│ 特征 A+B+C: [1, 3, 5, 2, 4, 6] │
│ │
│ 特征数量: 3 → 1 │
│ 加速: 3 倍 │
│ │
└─────────────────────────────────────────────────────────────┘
算法原理 #
互斥特征检测 #
python
import numpy as np
def calculate_conflict_rate(feature_i, feature_j):
    """Return the share of "active" rows on which both features are non-zero.

    A row is active when at least one of the two features is non-zero; the
    result is (#rows where both are non-zero) / (#active rows).

    Args:
        feature_i: Values of the first feature, one entry per sample.
        feature_j: Values of the second feature, one entry per sample.

    Returns:
        The conflict rate, or 0 when neither feature has a non-zero entry.
    """
    active_i = feature_i != 0
    active_j = feature_j != 0
    either_active = np.sum(np.logical_or(active_i, active_j))
    # No active rows at all: avoid dividing by zero.
    if either_active == 0:
        return 0
    both_active = np.sum(np.logical_and(active_i, active_j))
    return both_active / either_active
def build_conflict_matrix(feature_matrix):
    """Compute the pairwise conflict rate between every pair of feature columns.

    Args:
        feature_matrix: 2-D array indexed as (n_samples, n_features).

    Returns:
        A symmetric (n_features, n_features) array whose [i, j] entry is the
        conflict rate of columns i and j; the diagonal stays 0.
    """
    n_features = feature_matrix.shape[1]
    conflict_matrix = np.zeros((n_features, n_features))
    # Visit each unordered pair once (i < j) and mirror the result.
    column_pairs = (
        (i, j)
        for i in range(n_features)
        for j in range(i + 1, n_features)
    )
    for i, j in column_pairs:
        rate = calculate_conflict_rate(feature_matrix[:, i], feature_matrix[:, j])
        conflict_matrix[i, j] = conflict_matrix[j, i] = rate
    return conflict_matrix
# Three example features, written one per row and then transposed so that
# rows are samples and columns are features.
features = np.transpose(
    np.array(
        [
            [1, 0, 0, 2, 0, 0],
            [0, 3, 0, 0, 4, 0],
            [0, 0, 5, 0, 0, 6],
        ]
    )
)
conflict_matrix = build_conflict_matrix(features)
print("冲突矩阵:")
print(conflict_matrix)
贪婪特征捆绑 #
python
def greedy_feature_bundling(conflict_matrix, max_conflict_rate=0.1):
    """Greedily group features whose pairwise conflict stays within a threshold.

    Features are scanned in index order. Each not-yet-assigned feature starts
    a new bundle, and every later unassigned feature joins that bundle when
    its conflict rate with all current members does not exceed the threshold.

    Args:
        conflict_matrix: Square array of pairwise conflict rates.
        max_conflict_rate: Largest conflict rate tolerated between any two
            members of the same bundle.

    Returns:
        A list of bundles, each a list of feature indices.
    """
    n_features = conflict_matrix.shape[0]
    assigned = set()
    bundles = []
    for seed in range(n_features):
        if seed in assigned:
            continue
        members = [seed]
        assigned.add(seed)
        for candidate in range(seed + 1, n_features):
            if candidate in assigned:
                continue
            # A single over-threshold pair is enough to reject the candidate.
            clashes = any(
                conflict_matrix[member, candidate] > max_conflict_rate
                for member in members
            )
            if not clashes:
                members.append(candidate)
                assigned.add(candidate)
        bundles.append(members)
    return bundles
# Bundle the example features with a 0.1 conflict-rate threshold and print the
# resulting groups of feature indices.
bundles = greedy_feature_bundling(conflict_matrix, max_conflict_rate=0.1)
print(f"特征捆绑: {bundles}")
特征合并 #
python
def merge_features(feature_matrix, bundles):
    """Merge each bundle of features into a single column.

    Inside a bundle every feature gets its own value range: the non-zero
    values of a feature are shifted by a cumulative offset (the sum of
    ``max + 1`` over the features that precede it in the bundle). Rows on
    which every feature of the bundle is zero stay 0 in the merged column.

    The offset scheme assumes non-negative feature values (for example
    histogram bin indices); with negative values the per-feature ranges can
    overlap. When two features of a bundle are both non-zero on the same row,
    the feature that appears later in the bundle wins.

    Args:
        feature_matrix: 2-D array indexed as (n_samples, n_features).
        bundles: List of bundles, each a list of column indices.

    Returns:
        Float array of shape (n_samples, len(bundles)).
    """
    n_samples = feature_matrix.shape[0]
    merged_features = np.zeros((n_samples, len(bundles)))
    for bundle_idx, bundle in enumerate(bundles):
        # Basic slicing returns a view, so writes land in merged_features.
        column = merged_features[:, bundle_idx]
        offset = 0
        for feature_idx in bundle:
            feature = feature_matrix[:, feature_idx]
            mask = feature != 0
            # Shift only the NON-zero entries into this feature's range;
            # adding the offset to zero entries would make features collide.
            column[mask] = feature[mask] + offset
            # Guard against an empty sample axis, where np.max would raise.
            feature_max = np.max(feature) if feature.size else 0
            offset += feature_max + 1
    return merged_features
# Merge the example features according to the bundles computed earlier, then
# print the shapes before and after merging together with the merged matrix.
merged = merge_features(features, bundles)
print(f"原始特征形状: {features.shape}")
print(f"合并后特征形状: {merged.shape}")
print(f"合并后特征:\n{merged}")
完整实现 #
python
import numpy as np
class ExclusiveFeatureBundler:
    """Bundle mutually exclusive (rarely co-non-zero) features into columns.

    ``fit`` measures pairwise conflict rates, greedily groups features whose
    conflict stays within ``max_conflict_rate``, and records one offset per
    feature. ``transform`` then writes each feature's non-zero values, shifted
    by that offset, into the column of its bundle.

    The offset scheme assumes non-negative feature values (for example
    histogram bin indices); with negative values the per-feature ranges can
    overlap.
    """

    def __init__(self, max_conflict_rate=0.1):
        """Store the threshold; ``bundles`` and ``offsets`` are set by ``fit``."""
        self.max_conflict_rate = max_conflict_rate
        # List of bundles (lists of feature indices); None until fitted.
        self.bundles = None
        # {bundle index: {feature index: offset}}; None until fitted.
        self.offsets = None

    def fit(self, feature_matrix):
        """Learn the bundles and per-feature offsets from the data.

        Args:
            feature_matrix: 2-D array indexed as (n_samples, n_features).

        Returns:
            This bundler, so calls can be chained.
        """
        conflict_matrix = self._build_conflict_matrix(feature_matrix)
        self.bundles = self._greedy_bundling(conflict_matrix)
        self.offsets = self._calculate_offsets(feature_matrix, self.bundles)
        return self

    def transform(self, feature_matrix):
        """Merge the columns of ``feature_matrix`` using the fitted bundles.

        Rows on which every feature of a bundle is zero stay 0. When two
        features of a bundle are both non-zero on the same row, the feature
        that appears later in the bundle wins. Offsets come from the data
        seen in ``fit``, so values larger than the fitted maximum can spill
        into the next feature's range.

        Args:
            feature_matrix: 2-D array with the same column layout used in
                ``fit``.

        Returns:
            Float array of shape (n_samples, number of bundles).

        Raises:
            RuntimeError: If called before ``fit``.
        """
        if self.bundles is None or self.offsets is None:
            raise RuntimeError(
                "ExclusiveFeatureBundler must be fitted before calling transform()"
            )
        n_samples = feature_matrix.shape[0]
        merged_features = np.zeros((n_samples, len(self.bundles)))
        for bundle_idx, bundle in enumerate(self.bundles):
            # Basic slicing returns a view, so writes land in merged_features.
            column = merged_features[:, bundle_idx]
            for feature_idx in bundle:
                feature = feature_matrix[:, feature_idx]
                offset = self.offsets[bundle_idx][feature_idx]
                mask = feature != 0
                # Shift only the NON-zero entries into this feature's range;
                # adding the offset to zero entries would make features collide.
                column[mask] = feature[mask] + offset
        return merged_features

    def fit_transform(self, feature_matrix):
        """Fit on ``feature_matrix`` and return its merged representation."""
        self.fit(feature_matrix)
        return self.transform(feature_matrix)

    def _build_conflict_matrix(self, feature_matrix):
        """Return the symmetric matrix of pairwise column conflict rates."""
        n_features = feature_matrix.shape[1]
        conflict_matrix = np.zeros((n_features, n_features))
        for i in range(n_features):
            for j in range(i + 1, n_features):
                conflict = self._calculate_conflict(
                    feature_matrix[:, i],
                    feature_matrix[:, j],
                )
                conflict_matrix[i, j] = conflict
                conflict_matrix[j, i] = conflict
        return conflict_matrix

    def _calculate_conflict(self, feature_i, feature_j):
        """Return (#rows where both are non-zero) / (#rows where either is)."""
        non_zero_i = feature_i != 0
        non_zero_j = feature_j != 0
        conflict = np.sum(non_zero_i & non_zero_j)
        total = np.sum(non_zero_i | non_zero_j)
        return conflict / total if total > 0 else 0

    def _greedy_bundling(self, conflict_matrix):
        """Group features in index order, honouring ``max_conflict_rate``."""
        n_features = conflict_matrix.shape[0]
        bundles = []
        used = set()
        for i in range(n_features):
            if i in used:
                continue
            bundle = [i]
            used.add(i)
            for j in range(i + 1, n_features):
                if j in used:
                    continue
                # Reject j as soon as it clashes with any current member.
                clashes = any(
                    conflict_matrix[k, j] > self.max_conflict_rate
                    for k in bundle
                )
                if not clashes:
                    bundle.append(j)
                    used.add(j)
            bundles.append(bundle)
        return bundles

    def _calculate_offsets(self, feature_matrix, bundles):
        """Return ``{bundle index: {feature index: offset}}``.

        Within a bundle, a feature's offset is the sum of ``max + 1`` over the
        features that precede it, so the first feature always has offset 0.
        """
        offsets = {}
        for bundle_idx, bundle in enumerate(bundles):
            offsets[bundle_idx] = {}
            cumulative_offset = 0
            for feature_idx in bundle:
                offsets[bundle_idx][feature_idx] = cumulative_offset
                column = feature_matrix[:, feature_idx]
                # Guard against an empty sample axis, where np.max would raise.
                max_val = np.max(column) if column.size else 0
                cumulative_offset += max_val + 1
        return offsets
性能分析 #
特征压缩效果 #
python
import matplotlib.pyplot as plt
def analyze_feature_compression(n_samples=10000, n_features=100, sparsity=0.9):
    """Fit a bundler on random sparse data and report how much it compresses.

    Prints the feature count before and after bundling plus the compression
    percentage, then shows a histogram of bundle sizes.

    Args:
        n_samples: Number of rows of the generated matrix.
        n_features: Number of columns of the generated matrix.
        sparsity: An entry is kept only where a uniform draw exceeds this
            value; every other entry becomes zero.
    """
    shape = (n_samples, n_features)
    # Gaussian values first, then the keep-mask, in that draw order.
    dense_values = np.random.randn(*shape)
    keep_mask = np.random.rand(*shape) > sparsity
    feature_matrix = dense_values * keep_mask

    bundler = ExclusiveFeatureBundler(max_conflict_rate=0.1)
    bundler.fit(feature_matrix)

    original_features = n_features
    compressed_features = len(bundler.bundles)
    compression_ratio = (1 - compressed_features / original_features) * 100
    print(f"原始特征数: {original_features}")
    print(f"压缩后特征数: {compressed_features}")
    print(f"压缩率: {compression_ratio:.1f}%")

    bundle_sizes = list(map(len, bundler.bundles))
    plt.figure(figsize=(10, 5))
    plt.hist(bundle_sizes, bins=20, edgecolor='black')
    plt.xlabel('捆绑大小')
    plt.ylabel('频数')
    plt.title(f'特征捆绑分布 (平均捆绑大小: {np.mean(bundle_sizes):.1f})')
    plt.grid(True, alpha=0.3)
    plt.show()
# Run the compression analysis with its default arguments.
analyze_feature_compression()
计算复杂度 #
text
┌─────────────────────────────────────────────────────────────┐
│ 计算复杂度对比 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 传统方法: │
│ - 分裂搜索: O(#data × #feature) │
│ - 总计: O(#data × #feature × #iterations) │
│ │
│ EFB 方法: │
│ - 特征捆绑: O(#feature²) (一次性) │
│ - 分裂搜索: O(#data × #bundle) │
│ - 总计: O(#data × #bundle × #iterations) │
│ │
│ 加速比: #feature / #bundle │
│ 例如: 100 特征 → 20 捆绑,加速 5 倍 │
│ │
└─────────────────────────────────────────────────────────────┘
精度影响 #
python
def compare_efb_accuracy(X, y, conflict_rates=(0.0, 0.05, 0.1, 0.2)):
    """Compare a depth-5 regression tree on raw features and on EFB bundles.

    The data is split 80/20 once. A baseline tree is trained on the raw
    features; then, for every conflict-rate threshold, a bundler is fitted on
    the training split and a fresh tree is trained on the bundled features.
    The test-set mean squared error and the feature count of each run are
    collected, printed and returned.

    Args:
        X: Feature matrix indexed as (n_samples, n_features).
        y: Regression target, one value per sample.
        conflict_rates: Iterable of ``max_conflict_rate`` values to evaluate.
            The default is a tuple so no mutable object is shared between
            calls.

    Returns:
        A pandas DataFrame with the columns ``method``, ``mse`` and
        ``features``: the baseline row first, then one row per conflict rate.
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.tree import DecisionTreeRegressor

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
    results = []

    # Baseline: the same tree trained on the unbundled features.
    tree = DecisionTreeRegressor(max_depth=5)
    tree.fit(X_train, y_train)
    baseline_mse = np.mean((y_test - tree.predict(X_test)) ** 2)
    results.append({'method': 'Baseline', 'mse': baseline_mse, 'features': X.shape[1]})

    for conflict_rate in conflict_rates:
        bundler = ExclusiveFeatureBundler(max_conflict_rate=conflict_rate)
        # Bundles are learned on the training split only and reused for test.
        X_train_bundled = bundler.fit_transform(X_train)
        X_test_bundled = bundler.transform(X_test)
        tree = DecisionTreeRegressor(max_depth=5)
        tree.fit(X_train_bundled, y_train)
        mse = np.mean((y_test - tree.predict(X_test_bundled)) ** 2)
        results.append({
            'method': f'EFB({conflict_rate})',
            'mse': mse,
            'features': X_train_bundled.shape[1],
        })

    results_df = pd.DataFrame(results)
    print(results_df)
    return results_df
# Synthetic sparse data: Gaussian values are kept only where a uniform draw
# exceeds 0.8, and every other entry becomes zero. The target is a linear
# combination of the first two columns plus Gaussian noise scaled by 0.1.
X = np.random.randn(1000, 50) * (np.random.rand(1000, 50) > 0.8)
y = 2 * X[:, 0] + 3 * X[:, 1] + np.random.randn(1000) * 0.1
compare_efb_accuracy(X, y)
参数调优 #
max_conflict_rate 参数 #
python
def tune_conflict_rate(X, y):
    """Plot test MSE and bundled feature count for several conflict thresholds.

    The data is split 80/20 once. For each candidate ``max_conflict_rate`` a
    bundler is fitted on the training split, a depth-5 regression tree is
    trained on the bundled features, and the test-set mean squared error and
    the number of bundled features are recorded. Both series are drawn side
    by side against the threshold.

    Args:
        X: Feature matrix indexed as (n_samples, n_features).
        y: Regression target, one value per sample.
    """
    from sklearn.model_selection import train_test_split
    from sklearn.tree import DecisionTreeRegressor

    conflict_rates = [0.0, 0.01, 0.05, 0.1, 0.15, 0.2]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    def evaluate(rate):
        """Return (test MSE, bundled feature count) for one threshold."""
        bundler = ExclusiveFeatureBundler(max_conflict_rate=rate)
        train_bundled = bundler.fit_transform(X_train)
        test_bundled = bundler.transform(X_test)
        model = DecisionTreeRegressor(max_depth=5)
        model.fit(train_bundled, y_train)
        residuals = y_test - model.predict(test_bundled)
        return np.mean(residuals ** 2), train_bundled.shape[1]

    outcomes = [evaluate(rate) for rate in conflict_rates]
    mses = [mse for mse, _ in outcomes]
    feature_counts = [count for _, count in outcomes]

    fig, axes = plt.subplots(1, 2, figsize=(12, 5))
    mse_ax, count_ax = axes[0], axes[1]

    mse_ax.plot(conflict_rates, mses, 'o-')
    mse_ax.set_xlabel('max_conflict_rate')
    mse_ax.set_ylabel('MSE')
    mse_ax.set_title('MSE vs max_conflict_rate')

    count_ax.plot(conflict_rates, feature_counts, 'o-')
    count_ax.set_xlabel('max_conflict_rate')
    count_ax.set_ylabel('特征数量')
    count_ax.set_title('特征数量 vs max_conflict_rate')

    plt.tight_layout()
    plt.show()
EFB 的优缺点 #
优点 #
text
✅ 减少特征维度
- 捆绑互斥特征
- 加速训练过程
✅ 近乎零损失压缩
- 完全互斥的特征可无损合并
- 冲突率较低时基本保持模型精度
✅ 适合稀疏特征
- 独热编码特征
- 文本特征
✅ 自动化处理
- 无需手动特征选择
- 自动发现互斥特征
缺点 #
text
⚠️ 仅适合稀疏特征
- 密集特征效果有限
- 可能增加计算量
⚠️ 参数调优
- max_conflict_rate 需要调优
- 不同数据最优值不同
⚠️ 冲突处理
- 允许一定冲突
- 可能影响精度
与其他优化组合 #
EFB + 直方图 #
python
class HistogramWithEFB:
    """Skeleton of a decision tree that combines histograms with EFB.

    Only the feature-bundling step is wired up; ``fit`` and ``predict``
    bundle their input and then return ``None`` without building or
    evaluating a tree.
    """

    def __init__(self, max_bin=255, max_depth=3, max_conflict_rate=0.1):
        """Store the hyper-parameters and create the feature bundler."""
        self.max_bin = max_bin
        self.max_depth = max_depth
        self.max_conflict_rate = max_conflict_rate
        self.bundler = ExclusiveFeatureBundler(max_conflict_rate)

    def fit(self, X, y):
        """Fit the bundler on ``X``; tree construction is not implemented."""
        # The bundled matrix is computed but not used yet.
        self.bundler.fit_transform(X)
        return None

    def predict(self, X):
        """Bundle ``X`` with the fitted bundler; prediction is not implemented."""
        # The bundled matrix is computed but not used yet.
        self.bundler.transform(X)
        return None
下一步 #
现在你已经理解了 EFB 算法,接下来学习 特征工程,了解如何在实践中应用 LightGBM 的各种特性!
最后更新:2026-04-04