# GOSS 算法

## 什么是 GOSS?

GOSS(Gradient-based One-Side Sampling,梯度单边采样)是 LightGBM 的核心优化算法之一。它通过保留大梯度样本(对学习重要的样本)和随机采样小梯度样本,在保持精度的同时大幅减少计算量。

### 核心思想

text
┌─────────────────────────────────────────────────────────────┐
│                    GOSS 核心思想                             │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  样本梯度大小反映了样本对模型学习的重要性:                   │
│                                                             │
│  - 大梯度样本:模型预测误差大,对学习重要                    │
│  - 小梯度样本:模型预测误差小,对学习不重要                  │
│                                                             │
│  GOSS 策略:                                                 │
│  1. 保留梯度绝对值最大的 a×100% 样本                        │
│  2. 从其余样本中随机采样占总样本 b×100% 的样本              │
│  3. 对采样到的小梯度样本乘以权重 (1-a)/b                    │
│                                                             │
│  结果:                                                      │
│  - 减少样本数量,加速训练                                   │
│  - 近似保持数据分布,精度损失很小                            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

## 算法原理

### 采样策略

python
import numpy as np

def goss_sampling(gradients, a=0.1, b=0.1):
    """
    Select a GOSS (Gradient-based One-Side Sampling) subset of the samples.

    The ``a`` fraction of samples with the largest absolute gradient is always
    kept. From the remaining samples, ``b * n_samples`` are drawn uniformly
    without replacement and assigned the weight ``(1 - a) / b``, which
    compensates for the small-gradient samples that were dropped.

    Args:
        gradients: 1-D array-like of per-sample gradients.
        a: Fraction of all samples kept for having the largest |gradient|.
        b: Fraction of all samples drawn at random from the remainder.

    Returns:
        selected_indices: Integer indices of the chosen samples; the
            large-gradient samples come first, then the random draw.
        weights: Array of length ``len(gradients)`` indexed by original sample
            position. It is 1.0 everywhere except at the randomly drawn
            samples, which carry ``(1 - a) / b``.
    """
    gradients = np.asarray(gradients)
    n_samples = len(gradients)
    abs_gradients = np.abs(gradients)

    n_top = max(0, min(int(n_samples * a), n_samples))
    # Slice from an explicit start index: ``order[-n_top:]`` would return
    # every sample when n_top == 0, because ``[-0:]`` is the full slice.
    order = np.argsort(abs_gradients)
    top_indices = order[n_samples - n_top:]

    remaining_indices = np.setdiff1d(np.arange(n_samples), top_indices)
    # ``b`` is a fraction of ALL samples; only then is (1 - a) / b the factor
    # that rescales the draw back to the size of the small-gradient set.
    n_random = max(0, min(int(n_samples * b), len(remaining_indices)))
    if n_random > 0:
        random_indices = np.random.choice(
            remaining_indices, n_random, replace=False
        )
    else:
        random_indices = remaining_indices[:0]

    selected_indices = np.concatenate([top_indices, random_indices])

    weights = np.ones(n_samples)
    if n_random > 0:
        # Guarded so that b == 0 (keep only the top samples) does not divide
        # by zero.
        weights[random_indices] = (1 - a) / b

    return selected_indices, weights

# Toy example: eight gradients, sampled with a=0.25 and b=0.25.
gradients = np.array([0.1, 0.5, 0.01, 0.8, 0.02, 0.3, 0.001, 0.6])
sampling_result = goss_sampling(gradients, 0.25, 0.25)
selected_idx, weights = sampling_result

# Look up the gradient and the weight of every selected sample.
selected_gradients = gradients[selected_idx]
selected_weights = weights[selected_idx]

print(f"原始梯度: {gradients}")
print(f"选中索引: {selected_idx}")
print(f"选中样本梯度: {selected_gradients}")
print(f"样本权重: {selected_weights}")

### 信息增益计算

python
def calculate_gain_with_goss(gradients, hessians, selected_indices, weights, 
                             split_indices, lambda_l2=0.1):
    """
    Compute the gain of one candidate split on a GOSS-sampled node.

    Each side's score is ``G ** 2 / (H + lambda_l2)``, where ``G`` and ``H``
    are the weighted gradient and Hessian sums of that side. The gain is the
    sum of the two child scores minus the score of the unsplit node.

    Args:
        gradients: Gradient array, indexed by original sample position.
        hessians: Hessian array, indexed by original sample position.
        selected_indices: Original positions of the sampled rows.
        weights: Sample weights, indexed by original sample position.
        split_indices: Pair ``(left, right)`` of positions into
            ``selected_indices`` for the two children.
        lambda_l2: L2 regularization term added to each Hessian sum.

    Returns:
        gain: The split gain.
    """
    def weighted_totals(positions):
        # Map positions within the sample back to original sample positions.
        rows = selected_indices[positions]
        row_weights = weights[rows]
        return (np.sum(gradients[rows] * row_weights),
                np.sum(hessians[rows] * row_weights))

    def score(grad_sum, hess_sum):
        return (grad_sum ** 2) / (hess_sum + lambda_l2)

    left_positions, right_positions = split_indices
    grad_left, hess_left = weighted_totals(left_positions)
    grad_right, hess_right = weighted_totals(right_positions)

    children_score = score(grad_left, hess_left) + score(grad_right, hess_right)
    parent_score = score(grad_left + grad_right, hess_left + hess_right)

    return children_score - parent_score

## 完整实现

python
import numpy as np

class GOSSTree:
    """Single regression tree whose splits are searched on a GOSS sample.

    At every internal node the split is chosen on a Gradient-based One-Side
    Sampling (GOSS) subset of the node's samples. The samples are then
    partitioned, and the leaf values computed, on the node's full data.

    Args:
        max_depth: Depth at which nodes are turned into leaves.
        min_samples: Nodes with fewer samples than this become leaves.
        top_rate: Fraction of a node's samples kept for having the largest
            absolute gradient.
        other_rate: Fraction of a node's samples drawn at random from the
            remaining (small-gradient) samples.
        lambda_l2: L2 regularization added to the Hessian sum in split gains
            and leaf values.
    """

    def __init__(self, max_depth=3, min_samples=10,
                 top_rate=0.2, other_rate=0.1, lambda_l2=0.1):
        self.max_depth = max_depth
        self.min_samples = min_samples
        self.top_rate = top_rate
        self.other_rate = other_rate
        self.lambda_l2 = lambda_l2
        # Nested dict of nodes; stays None until fit() is called.
        self.tree = None

    def fit(self, X, gradients, hessians):
        """Build the tree from a feature matrix and per-sample derivatives.

        Args:
            X: 2-D array-like of shape (n_samples, n_features).
            gradients: Per-sample first derivatives of the loss.
            hessians: Per-sample second derivatives of the loss.
        """
        # Coerce once so that list inputs support the mask and fancy indexing
        # used while building the tree.
        X = np.asarray(X)
        gradients = np.asarray(gradients)
        hessians = np.asarray(hessians)
        self.tree = self._build_tree(X, gradients, hessians, depth=0)

    def _make_leaf(self, gradients, hessians):
        """Return a leaf node holding the value -G / (H + lambda_l2)."""
        return {
            'leaf': True,
            'value': -np.sum(gradients) / (np.sum(hessians) + self.lambda_l2)
        }

    def _build_tree(self, X, gradients, hessians, depth):
        """Recursively build the subtree for the samples of one node.

        Returns:
            A dict that is either a leaf (``'leaf'``, ``'value'``) or an
            internal node (``'leaf'``, ``'feature'``, ``'threshold'``,
            ``'left'``, ``'right'``).
        """
        if depth >= self.max_depth or len(gradients) < self.min_samples:
            return self._make_leaf(gradients, hessians)

        # The split is searched on the GOSS sample only ...
        selected_idx, weights = self._goss_sample(gradients)

        best_feature, best_threshold, best_gain = self._find_best_split(
            X[selected_idx], gradients[selected_idx],
            hessians[selected_idx], weights[selected_idx]
        )

        # Also covers the "no valid split" case, where best_gain is -inf.
        if best_gain <= 0:
            return self._make_leaf(gradients, hessians)

        # ... but all samples of the node are routed to the children.
        left_mask = X[:, best_feature] <= best_threshold
        right_mask = ~left_mask

        return {
            'leaf': False,
            'feature': best_feature,
            'threshold': best_threshold,
            'left': self._build_tree(
                X[left_mask], gradients[left_mask],
                hessians[left_mask], depth + 1
            ),
            'right': self._build_tree(
                X[right_mask], gradients[right_mask],
                hessians[right_mask], depth + 1
            )
        }

    def _goss_sample(self, gradients):
        """Draw the GOSS sample for one node.

        Returns:
            selected_indices: Positions of the chosen samples, large-gradient
                samples first, then the random draw.
            weights: Array of length ``len(gradients)``; 1.0 except at the
                randomly drawn samples, which carry
                ``(1 - top_rate) / other_rate``.
        """
        n_samples = len(gradients)
        abs_gradients = np.abs(gradients)

        n_top = max(0, min(int(n_samples * self.top_rate), n_samples))
        # Slice from an explicit start index: ``[-n_top:]`` would return every
        # sample when n_top == 0, because ``[-0:]`` is the full slice.
        top_indices = np.argsort(abs_gradients)[n_samples - n_top:]

        remaining_indices = np.setdiff1d(np.arange(n_samples), top_indices)
        # other_rate is a fraction of ALL samples of the node; only then does
        # the weight (1 - top_rate) / other_rate rescale the draw correctly.
        n_random = max(
            0, min(int(n_samples * self.other_rate), len(remaining_indices))
        )
        if n_random > 0:
            random_indices = np.random.choice(
                remaining_indices, n_random, replace=False
            )
        else:
            random_indices = remaining_indices[:0]

        selected_indices = np.concatenate([top_indices, random_indices])

        weights = np.ones(n_samples)
        if n_random > 0:
            # Guarded so that other_rate == 0 does not divide by zero.
            weights[random_indices] = (1 - self.top_rate) / self.other_rate

        return selected_indices, weights

    def _find_best_split(self, X, gradients, hessians, weights):
        """Exhaustively search every feature and observed value for a split.

        Returns:
            (best_feature, best_threshold, best_gain). If no threshold leaves
            samples on both sides, this is ``(None, None, -inf)``.
        """
        best_gain = -np.inf
        best_feature = None
        best_threshold = None

        n_features = X.shape[1]

        for feature in range(n_features):
            column = X[:, feature]

            for threshold in np.unique(column):
                left_mask = column <= threshold
                right_mask = ~left_mask

                # A split must leave samples on both sides.
                if not left_mask.any() or not right_mask.any():
                    continue

                gain = self._calculate_gain(
                    gradients, hessians, weights, left_mask, right_mask
                )

                if gain > best_gain:
                    best_gain = gain
                    best_feature = feature
                    best_threshold = threshold

        return best_feature, best_threshold, best_gain

    def _calculate_gain(self, gradients, hessians, weights, left_mask, right_mask):
        """Return the weighted split gain: child scores minus parent score."""
        left_grad = np.sum(gradients[left_mask] * weights[left_mask])
        left_hess = np.sum(hessians[left_mask] * weights[left_mask])

        right_grad = np.sum(gradients[right_mask] * weights[right_mask])
        right_hess = np.sum(hessians[right_mask] * weights[right_mask])

        total_grad = left_grad + right_grad
        total_hess = left_hess + right_hess

        left_score = (left_grad ** 2) / (left_hess + self.lambda_l2)
        right_score = (right_grad ** 2) / (right_hess + self.lambda_l2)
        total_score = (total_grad ** 2) / (total_hess + self.lambda_l2)

        return left_score + right_score - total_score

    def predict(self, X):
        """Return the leaf value reached by every row of ``X``.

        Raises:
            RuntimeError: If the tree has not been fitted yet.
        """
        if self.tree is None:
            raise RuntimeError("GOSSTree.predict() called before fit()")
        return np.array(
            [self._predict_single(row, self.tree) for row in X], dtype=float
        )

    def _predict_single(self, x, node):
        """Walk from ``node`` down to a leaf for one sample; return its value."""
        while not node['leaf']:
            if x[node['feature']] <= node['threshold']:
                node = node['left']
            else:
                node = node['right']
        return node['value']

## 性能分析

### 采样效果

python
import matplotlib.pyplot as plt

def analyze_goss_sampling(n_samples=10000, top_rate=0.1, other_rate=0.1):
    """Plot how GOSS sampling changes the gradient distribution and sample count.

    Standard-normal gradients are generated and sampled with GOSS, then three
    panels are shown: the |gradient| histogram of all samples, the histograms
    of the kept large-gradient samples and of the random draw, and a bar chart
    of the sample count before and after sampling.

    Args:
        n_samples: Number of synthetic gradients to generate.
        top_rate: Fraction of all samples kept for having the largest
            |gradient|.
        other_rate: Fraction of all samples drawn at random from the rest.
    """
    gradients = np.random.randn(n_samples)

    abs_gradients = np.abs(gradients)
    n_top = max(0, min(int(n_samples * top_rate), n_samples))
    # Slice from an explicit start index: ``[-n_top:]`` would return every
    # sample when n_top == 0, because ``[-0:]`` is the full slice.
    top_indices = np.argsort(abs_gradients)[n_samples - n_top:]

    remaining_indices = np.setdiff1d(np.arange(n_samples), top_indices)
    # other_rate is a fraction of ALL samples, so roughly
    # (top_rate + other_rate) of the data is kept in total.
    n_random = max(
        0, min(int(n_samples * other_rate), len(remaining_indices))
    )
    if n_random > 0:
        random_indices = np.random.choice(
            remaining_indices, n_random, replace=False
        )
    else:
        random_indices = remaining_indices[:0]

    selected_indices = np.concatenate([top_indices, random_indices])

    fig, axes = plt.subplots(1, 3, figsize=(15, 4))

    # Panel 1: |gradient| distribution of the full data set.
    axes[0].hist(abs_gradients, bins=50, alpha=0.7, label='所有样本')
    axes[0].set_xlabel('|梯度|')
    axes[0].set_ylabel('频数')
    axes[0].set_title('梯度分布')
    axes[0].legend()

    # Panel 2: the two parts of the GOSS sample, overlaid.
    axes[1].hist(abs_gradients[top_indices], bins=20, alpha=0.7, 
                label=f'大梯度样本 (top {top_rate*100}%)')
    axes[1].hist(abs_gradients[random_indices], bins=20, alpha=0.7, 
                label=f'随机采样 ({other_rate*100}%)')
    axes[1].set_xlabel('|梯度|')
    axes[1].set_ylabel('频数')
    axes[1].set_title('采样分布')
    axes[1].legend()

    # Panel 3: sample count before and after sampling.
    original_size = n_samples
    sampled_size = len(selected_indices)
    # Avoid dividing by zero when no samples were requested.
    if original_size:
        reduction = (1 - sampled_size / original_size) * 100
    else:
        reduction = 0.0

    axes[2].bar(['原始样本', '采样后'], [original_size, sampled_size])
    axes[2].set_ylabel('样本数量')
    axes[2].set_title(f'样本减少: {reduction:.1f}%')

    plt.tight_layout()
    plt.show()

# Run the analysis with its default arguments
# (n_samples=10000, top_rate=0.1, other_rate=0.1).
analyze_goss_sampling()

### 计算复杂度

text
┌─────────────────────────────────────────────────────────────┐
│                    计算复杂度对比                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  传统方法:                                                   │
│  - 每次分裂: O(#data × #feature)                            │
│  - 总计: O(#data × #feature × #iterations)                  │
│                                                             │
│  GOSS 方法:                                                  │
│  - 采样: O(#data × log(#data))                              │
│  - 每次分裂: O((a+b) × #data × #feature)                    │
│  - 总计: O((a+b) × #data × #feature × #iterations)          │
│                                                             │
│  加速比: 1 / (a + b)                                        │
│  例如: a=0.1, b=0.1 → 加速 5 倍                             │
│                                                             │
└─────────────────────────────────────────────────────────────┘

### 精度影响

python
def compare_goss_accuracy(X, y, top_rates=(0.1, 0.2, 0.3),
                         other_rates=(0.1, 0.2, 0.3)):
    """Compare the test MSE of GOSS trees against a plain regression tree.

    The data is split 80/20. The baseline is a depth-5
    ``DecisionTreeRegressor`` fitted on ``y`` directly. Each GOSS tree is one
    squared-error boosting step from the constant prediction
    ``mean(y_train)``.

    Args:
        X: Feature matrix of shape (n_samples, n_features).
        y: Regression targets.
        top_rates: ``top_rate`` values to try.
        other_rates: ``other_rate`` values to try.

    Returns:
        A pandas DataFrame with one row per model and the columns ``method``
        and ``mse``. The same table is also printed.
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.tree import DecisionTreeRegressor

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    # Derivatives of the loss 0.5 * (pred - y)^2 at the initial prediction
    # base_score: gradient = pred - y, hessian = 1. The sign matters because
    # the tree's leaf value is -G / (H + lambda).
    base_score = np.mean(y_train)
    gradients = base_score - y_train
    hessians = np.ones(len(y_train))

    results = []

    tree = DecisionTreeRegressor(max_depth=5)
    tree.fit(X_train, y_train)
    baseline_mse = np.mean((y_test - tree.predict(X_test)) ** 2)
    results.append({'method': 'Baseline', 'mse': baseline_mse})

    for top_rate in top_rates:
        for other_rate in other_rates:
            goss_tree = GOSSTree(
                max_depth=5,
                top_rate=top_rate,
                other_rate=other_rate
            )
            goss_tree.fit(X_train, gradients, hessians)
            # The tree predicts a correction to the initial prediction, so
            # the base score has to be added back.
            y_pred = base_score + goss_tree.predict(X_test)
            mse = np.mean((y_test - y_pred) ** 2)

            results.append({
                'method': f'GOSS({top_rate},{other_rate})',
                'mse': mse
            })

    results_df = pd.DataFrame(results)
    print(results_df)
    return results_df

# Synthetic regression data: y depends linearly on the first two of ten
# features, plus Gaussian noise scaled by 0.1.
n_rows, n_cols = 1000, 10
X = np.random.randn(n_rows, n_cols)
noise = 0.1 * np.random.randn(n_rows)
y = 2 * X[:, 0] + 3 * X[:, 1] + noise

compare_goss_accuracy(X, y)

## 参数调优

### top_rate 参数

python
def tune_top_rate(X, y):
    """Plot training MSE and fit time of a GOSS tree for several top_rate values.

    For every ``top_rate`` a depth-5 ``GOSSTree`` with ``other_rate=0.1`` is
    fitted as one squared-error boosting step from the constant prediction
    ``mean(y)``. The MSE on the training data and the wall-clock fit time are
    recorded, and both are plotted against ``top_rate``.

    Args:
        X: Feature matrix of shape (n_samples, n_features).
        y: Regression targets.
    """
    # Imported locally: no snippet of this file imports ``time`` at module level.
    import time

    top_rates = [0.05, 0.1, 0.15, 0.2, 0.25, 0.3]
    mses = []
    times = []

    # Derivatives of the loss 0.5 * (pred - y)^2 at the initial prediction
    # base_score: gradient = pred - y, hessian = 1. The sign matters because
    # the tree's leaf value is -G / (H + lambda).
    base_score = np.mean(y)
    gradients = base_score - y
    hessians = np.ones(len(y))

    for top_rate in top_rates:
        start = time.perf_counter()
        tree = GOSSTree(max_depth=5, top_rate=top_rate, other_rate=0.1)
        tree.fit(X, gradients, hessians)
        elapsed = time.perf_counter() - start

        # The tree predicts a correction to the initial prediction, so the
        # base score has to be added back.
        y_pred = base_score + tree.predict(X)
        mse = np.mean((y - y_pred) ** 2)

        mses.append(mse)
        times.append(elapsed)

    fig, axes = plt.subplots(1, 2, figsize=(12, 5))

    axes[0].plot(top_rates, mses, 'o-')
    axes[0].set_xlabel('top_rate')
    axes[0].set_ylabel('MSE')
    axes[0].set_title('MSE vs top_rate')

    axes[1].plot(top_rates, times, 'o-')
    axes[1].set_xlabel('top_rate')
    axes[1].set_ylabel('训练时间 (s)')
    axes[1].set_title('训练时间 vs top_rate')

    plt.tight_layout()
    plt.show()

## GOSS 的优缺点

### 优点

text
✅ 减少计算量
   - 采样减少样本数量
   - 加速训练过程

✅ 保持精度
   - 保留重要样本
   - 权重补偿采样

✅ 自适应采样
   - 根据梯度动态调整
   - 关注难样本

✅ 理论保证
   - 有理论误差分析
   - 保证精度损失可控

### 缺点

text
⚠️ 参数调优
   - top_rate 和 other_rate 需要调优
   - 不同数据集最优参数不同

⚠️ 随机性
   - 随机采样引入随机性
   - 结果可能不稳定

⚠️ 小数据集效果有限
   - 数据量小时优势不明显
   - 可能降低精度

## 下一步

现在你已经理解了 GOSS 算法,接下来学习 EFB 算法,了解 LightGBM 如何通过特征捆绑进一步优化!

最后更新:2026-04-04