EFB 算法 #

什么是 EFB? #

EFB(Exclusive Feature Bundling,互斥特征捆绑)是 LightGBM 的核心优化算法之一。它通过将互斥特征(很少同时非零的特征)捆绑在一起,减少特征维度,从而加速训练过程。

核心思想 #

text
┌─────────────────────────────────────────────────────────────┐
│                    EFB 核心思想                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  稀疏特征空间中,很多特征是互斥的:                          │
│                                                             │
│  - 互斥特征:很少同时非零                                   │
│  - 可以合并为一个特征                                       │
│  - 零损失降维                                               │
│                                                             │
│  示例:                                                      │
│  特征 A: [1, 0, 0, 2, 0, 0]                                 │
│  特征 B: [0, 3, 0, 0, 4, 0]                                 │
│  特征 C: [0, 0, 5, 0, 0, 6]                                 │
│                                                             │
│  捆绑后:                                                    │
│  特征 A+B+C: [1, 3, 5, 2, 4, 6]                             │
│                                                             │
│  特征数量: 3 → 1                                            │
│  加速: 3 倍                                                 │
│                                                             │
└─────────────────────────────────────────────────────────────┘

算法原理 #

互斥特征检测 #

python
import numpy as np

def calculate_conflict_rate(feature_i, feature_j):
    """
    Measure how often two features are non-zero on the same sample.

    The rate is the number of samples on which both features are non-zero
    divided by the number of samples on which at least one of them is
    non-zero.

    Args:
        feature_i: Values of the first feature, one entry per sample.
        feature_j: Values of the second feature, one entry per sample.

    Returns:
        The conflict rate, or 0 when neither feature has a non-zero entry.
    """
    active_i = feature_i != 0
    active_j = feature_j != 0

    # Samples on which at least one of the two features is active.
    either_active = np.sum(active_i | active_j)
    if either_active == 0:
        return 0

    # Samples on which both are active at once, i.e. the actual conflicts.
    both_active = np.sum(active_i & active_j)
    return both_active / either_active

def build_conflict_matrix(feature_matrix):
    """
    Compute the pairwise conflict rates between all feature columns.

    Args:
        feature_matrix: Feature matrix of shape (n_samples, n_features).

    Returns:
        A symmetric (n_features, n_features) array whose [i, j] entry is
        the conflict rate between columns i and j. The diagonal is left
        at zero.
    """
    column_count = feature_matrix.shape[1]
    rates = np.zeros((column_count, column_count))

    # Visit each unordered pair once and mirror the value across the diagonal.
    for right in range(1, column_count):
        right_column = feature_matrix[:, right]
        for left in range(right):
            rate = calculate_conflict_rate(feature_matrix[:, left], right_column)
            rates[left, right] = rates[right, left] = rate

    return rates

# Demo data: each row of the literal is one feature (A, B and C from the
# example above); transposing gives the (n_samples, n_features) layout
# that build_conflict_matrix expects.
features = np.array([
    [1, 0, 0, 2, 0, 0],
    [0, 3, 0, 0, 4, 0],
    [0, 0, 5, 0, 0, 6]
]).T

# No two of these features are non-zero on the same sample.
conflict_matrix = build_conflict_matrix(features)
print("冲突矩阵:")
print(conflict_matrix)

贪婪特征捆绑 #

python
def greedy_feature_bundling(conflict_matrix, max_conflict_rate=0.1):
    """
    Group features into bundles with a single greedy pass.

    Features are considered in index order. The lowest-indexed feature
    that has not been assigned yet opens a new bundle, and every later
    unassigned feature joins it unless its conflict rate with some current
    member exceeds ``max_conflict_rate``.

    Args:
        conflict_matrix: Square matrix of pairwise conflict rates.
        max_conflict_rate: Largest pairwise conflict rate tolerated inside
            a bundle.

    Returns:
        A list of bundles, each a list of feature indices. Every feature
        index appears in exactly one bundle.
    """
    bundles = []
    # Feature indices not yet placed in a bundle, kept in ascending order.
    pending = list(range(conflict_matrix.shape[0]))

    while pending:
        seed, *candidates = pending
        members = [seed]
        leftover = []

        for candidate in candidates:
            clashes = any(
                conflict_matrix[member, candidate] > max_conflict_rate
                for member in members
            )
            if clashes:
                leftover.append(candidate)
            else:
                members.append(candidate)

        bundles.append(members)
        pending = leftover

    return bundles

# Bundle the demo features, tolerating pairwise conflict rates up to 0.1.
bundles = greedy_feature_bundling(conflict_matrix, max_conflict_rate=0.1)
print(f"特征捆绑: {bundles}")

特征合并 #

python
def merge_features(feature_matrix, bundles):
    """
    Collapse each bundle of features into a single merged column.

    Within a bundle, every feature gets an offset equal to the sum of
    ``max + 1`` over the features placed before it. The offset is added to
    that feature's non-zero entries only, so each original feature occupies
    its own value range in the merged column, while samples on which every
    bundled feature is zero stay at zero.

    Args:
        feature_matrix: Feature matrix of shape (n_samples, n_features).
            Values are assumed to be non-negative (e.g. bin indices);
            negative values would let the value ranges overlap.
        bundles: List of bundles, each a list of column indices.

    Returns:
        Array of shape (n_samples, len(bundles)) holding the merged columns.
    """
    n_samples = feature_matrix.shape[0]
    merged_features = np.zeros((n_samples, len(bundles)))

    for bundle_idx, bundle in enumerate(bundles):
        offset = 0
        for feature_idx in bundle:
            feature = feature_matrix[:, feature_idx]
            non_zero = feature != 0
            # Shift only the active entries. Zero entries must contribute
            # nothing, otherwise they would corrupt the values written by
            # the other features of the bundle.
            merged_features[:, bundle_idx] += (feature + offset) * non_zero
            # Reserve the range [offset, offset + max] for this feature.
            offset += np.max(feature) + 1

    return merged_features

# Merge the demo features according to the bundles and compare the
# matrix shapes before and after.
merged = merge_features(features, bundles)
print(f"原始特征形状: {features.shape}")
print(f"合并后特征形状: {merged.shape}")
print(f"合并后特征:\n{merged}")

完整实现 #

python
import numpy as np

class ExclusiveFeatureBundler:
    """Bundle mutually exclusive features into merged columns (EFB).

    Call ``fit`` on training data and then ``transform`` on any matrix with
    the same columns, or ``fit_transform`` to do both at once.

    Feature values are assumed to be non-negative (e.g. bin indices): the
    per-feature offsets are derived from column maxima, so negative values
    would let the value ranges of bundled features overlap.
    """

    def __init__(self, max_conflict_rate=0.1):
        """
        Args:
            max_conflict_rate: Largest pairwise conflict rate tolerated
                between two features of the same bundle.
        """
        self.max_conflict_rate = max_conflict_rate
        # Both are populated by fit(); None means "not fitted yet".
        self.bundles = None  # list of bundles, each a list of column indices
        self.offsets = None  # {bundle index: {column index: offset}}

    def fit(self, feature_matrix):
        """
        Learn the bundles and the per-feature offsets.

        Args:
            feature_matrix: Feature matrix of shape (n_samples, n_features).

        Returns:
            The fitted bundler itself, so calls can be chained.
        """
        conflict_matrix = self._build_conflict_matrix(feature_matrix)
        self.bundles = self._greedy_bundling(conflict_matrix)
        self.offsets = self._calculate_offsets(feature_matrix, self.bundles)
        return self

    def transform(self, feature_matrix):
        """
        Merge the columns of a matrix according to the learned bundles.

        Each feature's learned offset is added to its non-zero entries
        only, so samples on which every bundled feature is zero stay zero.

        Args:
            feature_matrix: Feature matrix with the same columns as the
                matrix passed to ``fit``.

        Returns:
            Array of shape (n_samples, n_bundles) with the merged columns.

        Raises:
            RuntimeError: If the bundler has not been fitted yet.
        """
        if self.bundles is None or self.offsets is None:
            raise RuntimeError(
                "ExclusiveFeatureBundler must be fitted before calling transform()"
            )

        n_samples = feature_matrix.shape[0]
        merged_features = np.zeros((n_samples, len(self.bundles)))

        for bundle_idx, bundle in enumerate(self.bundles):
            bundle_offsets = self.offsets[bundle_idx]
            for feature_idx in bundle:
                feature = feature_matrix[:, feature_idx]
                mask = feature != 0
                # Shift only the active entries. Zero entries must add
                # nothing, otherwise they would corrupt the values written
                # by the other features of the bundle.
                merged_features[:, bundle_idx] += (
                    feature + bundle_offsets[feature_idx]
                ) * mask

        return merged_features

    def fit_transform(self, feature_matrix):
        """Fit on ``feature_matrix`` and return its merged form."""
        self.fit(feature_matrix)
        return self.transform(feature_matrix)

    def _build_conflict_matrix(self, feature_matrix):
        """Return the symmetric matrix of pairwise conflict rates.

        The diagonal is left at zero.
        """
        n_features = feature_matrix.shape[1]
        conflict_matrix = np.zeros((n_features, n_features))

        for i in range(n_features):
            for j in range(i + 1, n_features):
                conflict = self._calculate_conflict(
                    feature_matrix[:, i],
                    feature_matrix[:, j],
                )
                conflict_matrix[i, j] = conflict
                conflict_matrix[j, i] = conflict

        return conflict_matrix

    def _calculate_conflict(self, feature_i, feature_j):
        """Return (#samples both non-zero) / (#samples either non-zero).

        Returns 0 when neither feature has a non-zero entry.
        """
        non_zero_i = feature_i != 0
        non_zero_j = feature_j != 0

        conflict = np.sum(non_zero_i & non_zero_j)
        total = np.sum(non_zero_i | non_zero_j)

        return conflict / total if total > 0 else 0

    def _greedy_bundling(self, conflict_matrix):
        """Greedily group features whose pairwise conflicts stay in budget.

        Features are scanned in index order; each unassigned feature opens
        a bundle and later unassigned features join it when their conflict
        rate with every current member is at most ``max_conflict_rate``.
        """
        n_features = conflict_matrix.shape[0]
        bundles = []
        used = set()

        for i in range(n_features):
            if i in used:
                continue

            bundle = [i]
            used.add(i)

            for j in range(i + 1, n_features):
                if j in used:
                    continue

                fits = not any(
                    conflict_matrix[k, j] > self.max_conflict_rate
                    for k in bundle
                )
                if fits:
                    bundle.append(j)
                    used.add(j)

            bundles.append(bundle)

        return bundles

    def _calculate_offsets(self, feature_matrix, bundles):
        """Return ``{bundle index: {column index: offset}}``.

        Inside a bundle, a feature's offset is the sum of ``max + 1`` over
        the features that precede it, so the first feature has offset 0.
        """
        offsets = {}

        for bundle_idx, bundle in enumerate(bundles):
            offsets[bundle_idx] = {}
            cumulative_offset = 0

            for feature_idx in bundle:
                offsets[bundle_idx][feature_idx] = cumulative_offset
                max_val = np.max(feature_matrix[:, feature_idx])
                cumulative_offset += max_val + 1

        return offsets

性能分析 #

特征压缩效果 #

python
import matplotlib.pyplot as plt

def analyze_feature_compression(n_samples=10000, n_features=100, sparsity=0.9):
    """Bundle a random sparse matrix and report how much it shrinks.

    Draws standard-normal values, zeroes every entry whose uniform draw is
    not above ``sparsity``, fits an ExclusiveFeatureBundler with
    max_conflict_rate=0.1, prints the feature counts and the compression
    percentage, and shows a histogram of the bundle sizes.

    Args:
        n_samples: Number of rows in the generated matrix.
        n_features: Number of columns in the generated matrix.
        sparsity: Threshold for the uniform mask; higher means more zeros.
    """
    shape = (n_samples, n_features)
    # Values are drawn before the mask, matching the global RNG call order.
    values = np.random.randn(*shape)
    keep = np.random.rand(*shape) > sparsity
    feature_matrix = values * keep

    bundler = ExclusiveFeatureBundler(max_conflict_rate=0.1)
    bundler.fit(feature_matrix)

    bundle_count = len(bundler.bundles)
    saved_percent = (1 - bundle_count / n_features) * 100

    print(f"原始特征数: {n_features}")
    print(f"压缩后特征数: {bundle_count}")
    print(f"压缩率: {saved_percent:.1f}%")

    bundle_sizes = list(map(len, bundler.bundles))

    plt.figure(figsize=(10, 5))
    plt.hist(bundle_sizes, bins=20, edgecolor='black')
    plt.xlabel('捆绑大小')
    plt.ylabel('频数')
    plt.title(f'特征捆绑分布 (平均捆绑大小: {np.mean(bundle_sizes):.1f})')
    plt.grid(True, alpha=0.3)
    plt.show()

# Run the analysis with the default size and sparsity settings.
analyze_feature_compression()

计算复杂度 #

text
┌─────────────────────────────────────────────────────────────┐
│                    计算复杂度对比                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  传统方法:                                                   │
│  - 分裂搜索: O(#data × #feature)                            │
│  - 总计: O(#data × #feature × #iterations)                  │
│                                                             │
│  EFB 方法:                                                   │
│  - 特征捆绑: O(#feature²) (一次性)                          │
│  - 分裂搜索: O(#data × #bundle)                             │
│  - 总计: O(#data × #bundle × #iterations)                   │
│                                                             │
│  加速比: #feature / #bundle                                 │
│  例如: 100 特征 → 20 捆绑,加速 5 倍                        │
│                                                             │
└─────────────────────────────────────────────────────────────┘

精度影响 #

python
def compare_efb_accuracy(X, y, conflict_rates=(0.0, 0.05, 0.1, 0.2)):
    """Compare a decision tree's test MSE with and without EFB bundling.

    The data is split 80/20 once (unseeded, so results vary between runs).
    A depth-5 regression tree is trained on the raw features as the
    baseline, then once per conflict rate on features bundled by an
    ExclusiveFeatureBundler fitted on the training split only.

    Args:
        X: Feature matrix of shape (n_samples, n_features).
        y: Regression targets, one per sample.
        conflict_rates: Iterable of ``max_conflict_rate`` values to try.

    Returns:
        A pandas DataFrame with one row per run and the columns
        ``method``, ``mse`` and ``features``. The table is also printed.
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.tree import DecisionTreeRegressor

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    def _tree_test_mse(train_features, test_features):
        # Fit a fresh depth-5 tree and score it on the held-out split.
        tree = DecisionTreeRegressor(max_depth=5)
        tree.fit(train_features, y_train)
        return np.mean((y_test - tree.predict(test_features)) ** 2)

    results = [{
        'method': 'Baseline',
        'mse': _tree_test_mse(X_train, X_test),
        'features': X.shape[1],
    }]

    for conflict_rate in conflict_rates:
        bundler = ExclusiveFeatureBundler(max_conflict_rate=conflict_rate)
        # Bundles and offsets are learned from the training split only.
        X_train_bundled = bundler.fit_transform(X_train)
        X_test_bundled = bundler.transform(X_test)

        results.append({
            'method': f'EFB({conflict_rate})',
            'mse': _tree_test_mse(X_train_bundled, X_test_bundled),
            'features': X_train_bundled.shape[1],
        })

    results_df = pd.DataFrame(results)
    print(results_df)
    return results_df

# Synthetic sparse data: an entry is kept only where its uniform draw
# exceeds 0.8, all other entries are zeroed.
X = np.random.randn(1000, 50) * (np.random.rand(1000, 50) > 0.8)
# The target depends on the first two columns only, plus small Gaussian noise.
y = 2 * X[:, 0] + 3 * X[:, 1] + np.random.randn(1000) * 0.1
compare_efb_accuracy(X, y)

参数调优 #

max_conflict_rate 参数 #

python
def tune_conflict_rate(X, y):
    """Plot test MSE and bundle count against ``max_conflict_rate``.

    Splits the data 80/20 once, then for each candidate rate bundles the
    features with an ExclusiveFeatureBundler fitted on the training split,
    trains a depth-5 regression tree and records its test MSE together
    with the number of bundled features. The two series are shown side by
    side.

    Args:
        X: Feature matrix of shape (n_samples, n_features).
        y: Regression targets, one per sample.
    """
    from sklearn.model_selection import train_test_split
    from sklearn.tree import DecisionTreeRegressor

    candidate_rates = [0.0, 0.01, 0.05, 0.1, 0.15, 0.2]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    errors = []
    bundle_counts = []
    for rate in candidate_rates:
        bundler = ExclusiveFeatureBundler(max_conflict_rate=rate)
        train_bundled = bundler.fit_transform(X_train)
        test_bundled = bundler.transform(X_test)

        model = DecisionTreeRegressor(max_depth=5)
        model.fit(train_bundled, y_train)
        residuals = y_test - model.predict(test_bundled)

        errors.append(np.mean(residuals ** 2))
        bundle_counts.append(train_bundled.shape[1])

    _, (error_ax, count_ax) = plt.subplots(1, 2, figsize=(12, 5))

    # One entry per panel: axis, plotted series, y-label, title.
    panels = (
        (error_ax, errors, 'MSE', 'MSE vs max_conflict_rate'),
        (count_ax, bundle_counts, '特征数量', '特征数量 vs max_conflict_rate'),
    )
    for axis, series, y_label, title in panels:
        axis.plot(candidate_rates, series, 'o-')
        axis.set_xlabel('max_conflict_rate')
        axis.set_ylabel(y_label)
        axis.set_title(title)

    plt.tight_layout()
    plt.show()

EFB 的优缺点 #

优点 #

text
✅ 减少特征维度
   - 捆绑互斥特征
   - 加速训练过程

✅ 零损失压缩
   - 互斥特征可无损合并
   - 保持模型精度

✅ 适合稀疏特征
   - 独热编码特征
   - 文本特征

✅ 自动化处理
   - 无需手动特征选择
   - 自动发现互斥特征

缺点 #

text
⚠️ 仅适合稀疏特征
   - 密集特征效果有限
   - 可能增加计算量

⚠️ 参数调优
   - max_conflict_rate 需要调优
   - 不同数据最优值不同

⚠️ 冲突处理
   - 允许一定冲突
   - 可能影响精度

与其他优化组合 #

EFB + 直方图 #

python
class HistogramWithEFB:
    """Skeleton of a decision tree that combines histograms with EFB.

    Only the EFB preprocessing step is wired up: fit() and predict()
    bundle their input and then do nothing further, so both return None.
    """
    
    def __init__(self, max_bin=255, max_depth=3, max_conflict_rate=0.1):
        """Store the settings and create the feature bundler.

        max_bin and max_depth are stored but not read anywhere else in
        this class; max_conflict_rate is passed on to the bundler.
        """
        self.max_bin = max_bin
        self.max_depth = max_depth
        self.max_conflict_rate = max_conflict_rate
        self.bundler = ExclusiveFeatureBundler(max_conflict_rate)
    
    def fit(self, X, y):
        """Fit the bundler on X and bundle it; tree training is not implemented."""
        X_bundled = self.bundler.fit_transform(X)
        
        # Placeholder: nothing is built from X_bundled and y yet.
        pass
    
    def predict(self, X):
        """Bundle X with the fitted bundler; prediction is not implemented."""
        X_bundled = self.bundler.transform(X)
        
        # Placeholder: no prediction is computed, so the method returns None.
        pass

下一步 #

现在你已经理解了 EFB 算法,接下来学习特征工程,了解如何在实践中应用 LightGBM 的各种特性!

最后更新:2026-04-04