数据管道 #

数据管道概述 #

数据管道是将数据处理步骤组织成可复用、可维护的流程,是现代数据分析的核心技能。

text
┌─────────────────────────────────────────────────────────────┐
│                    数据管道架构                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  原始数据                                                   │
│     │                                                       │
│     ▼                                                       │
│  ┌─────────────┐                                            │
│  │  数据提取    │  Extract                                  │
│  └─────────────┘                                            │
│     │                                                       │
│     ▼                                                       │
│  ┌─────────────┐                                            │
│  │  数据转换    │  Transform                                │
│  └─────────────┘                                            │
│     │                                                       │
│     ▼                                                       │
│  ┌─────────────┐                                            │
│  │  数据加载    │  Load                                     │
│  └─────────────┘                                            │
│     │                                                       │
│     ▼                                                       │
│  处理后数据                                                 │
│                                                             │
└─────────────────────────────────────────────────────────────┘

基本管道结构 #

使用函数构建管道 #

python
import pandas as pd
import numpy as np

def extract_data(source):
    """数据提取"""
    return pd.read_csv(source)

def clean_data(df):
    """数据清洗"""
    df = df.drop_duplicates()
    df = df.dropna()
    return df

def transform_data(df):
    """数据转换"""
    df['total'] = df['quantity'] * df['price']
    df['date'] = pd.to_datetime(df['date'])
    return df

def load_data(df, destination):
    """数据加载"""
    df.to_csv(destination, index=False)
    return df

# 执行管道
df = extract_data('raw_data.csv')
df = clean_data(df)
df = transform_data(df)
df = load_data(df, 'processed_data.csv')

使用 pipe 方法 #

python
def remove_duplicates(df):
    return df.drop_duplicates()

def fill_missing(df, strategy='mean'):
    return df.fillna(df.mean(numeric_only=True))

def add_features(df):
    df = df.copy()
    df['total'] = df['quantity'] * df['price']
    return df

# 链式管道
df = (pd.read_csv('data.csv')
    .pipe(remove_duplicates)
    .pipe(fill_missing, strategy='mean')
    .pipe(add_features)
)

模块化管道 #

创建管道类 #

python
class DataPipeline:
    def __init__(self):
        self.steps = []
    
    def add_step(self, name, func, **kwargs):
        self.steps.append({'name': name, 'func': func, 'kwargs': kwargs})
        return self
    
    def run(self, df):
        for step in self.steps:
            print(f"执行步骤: {step['name']}")
            df = step['func'](df, **step['kwargs'])
        return df

# 定义处理函数
def remove_duplicates(df):
    return df.drop_duplicates()

def handle_missing(df, columns=None):
    if columns:
        df[columns] = df[columns].fillna(df[columns].mean())
    else:
        df = df.fillna(df.mean(numeric_only=True))
    return df

def normalize_columns(df, columns):
    for col in columns:
        df[col] = (df[col] - df[col].min()) / (df[col].max() - df[col].min())
    return df

# 使用管道
pipeline = DataPipeline()
pipeline.add_step('去重', remove_duplicates)
pipeline.add_step('处理缺失值', handle_missing, columns=['price', 'quantity'])
pipeline.add_step('标准化', normalize_columns, columns=['price'])

df = pd.read_csv('data.csv')
df_processed = pipeline.run(df)

使用 sklearn Pipeline #

python
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer

# 创建 sklearn 管道
pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler', StandardScaler())
])

# 只适用于数值数据
numeric_data = df[['price', 'quantity']]
processed = pipeline.fit_transform(numeric_data)

ETL 管道示例 #

完整 ETL 管道 #

python
import pandas as pd
import numpy as np
from datetime import datetime

class SalesETL:
    def __init__(self, config):
        self.config = config
        self.log = []
    
    def _log(self, message):
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.log.append(f"[{timestamp}] {message}")
        print(f"[{timestamp}] {message}")
    
    def extract(self):
        self._log("开始数据提取")
        df = pd.read_csv(self.config['source'])
        self._log(f"提取 {len(df)} 条记录")
        return df
    
    def transform(self, df):
        self._log("开始数据转换")
        
        # 去重
        before = len(df)
        df = df.drop_duplicates()
        self._log(f"去重: {before - len(df)} 条记录")
        
        # 处理缺失值
        df = df.dropna(subset=['date', 'product'])
        df['quantity'] = df['quantity'].fillna(0)
        df['price'] = df['price'].fillna(df['price'].median())
        
        # 数据类型转换
        df['date'] = pd.to_datetime(df['date'])
        df['quantity'] = df['quantity'].astype(int)
        
        # 计算派生字段
        df['revenue'] = df['quantity'] * df['price']
        df['year'] = df['date'].dt.year
        df['month'] = df['date'].dt.month
        
        self._log(f"转换后 {len(df)} 条记录")
        return df
    
    def load(self, df):
        self._log("开始数据加载")
        df.to_csv(self.config['destination'], index=False)
        self._log(f"保存到 {self.config['destination']}")
        return df
    
    def run(self):
        self._log("ETL 管道开始")
        df = self.extract()
        df = self.transform(df)
        df = self.load(df)
        self._log("ETL 管道完成")
        return df

# 使用 ETL 管道
config = {
    'source': 'raw_sales.csv',
    'destination': 'processed_sales.csv'
}

etl = SalesETL(config)
df = etl.run()

数据验证管道 #

python
class DataValidator:
    def __init__(self):
        self.rules = []
        self.errors = []
    
    def add_rule(self, name, func, message):
        self.rules.append({
            'name': name,
            'func': func,
            'message': message
        })
    
    def validate(self, df):
        self.errors = []
        for rule in self.rules:
            if not rule['func'](df):
                self.errors.append({
                    'rule': rule['name'],
                    'message': rule['message']
                })
        return len(self.errors) == 0
    
    def get_errors(self):
        return self.errors

# 定义验证规则
validator = DataValidator()

validator.add_rule(
    'no_nulls',
    lambda df: not df['id'].isna().any(),
    'ID 列不能有空值'
)

validator.add_rule(
    'positive_quantity',
    lambda df: (df['quantity'] >= 0).all(),
    '数量必须为非负数'
)

validator.add_rule(
    'valid_date',
    lambda df: pd.to_datetime(df['date'], errors='coerce').notna().all(),
    '日期格式无效'
)

# 执行验证
df = pd.read_csv('data.csv')
if validator.validate(df):
    print("数据验证通过")
else:
    print("数据验证失败:")
    for error in validator.get_errors():
        print(f"  - {error['rule']}: {error['message']}")

增量处理管道 #

python
class IncrementalPipeline:
    def __init__(self, checkpoint_file):
        self.checkpoint_file = checkpoint_file
        self.last_processed = self._load_checkpoint()
    
    def _load_checkpoint(self):
        try:
            with open(self.checkpoint_file, 'r') as f:
                return f.read().strip()
        except FileNotFoundError:
            return None
    
    def _save_checkpoint(self, value):
        with open(self.checkpoint_file, 'w') as f:
            f.write(str(value))
    
    def process_incremental(self, df, date_column='date'):
        if self.last_processed:
            df = df[df[date_column] > self.last_processed]
        
        if len(df) > 0:
            # 处理数据
            processed = self._process(df)
            
            # 更新检查点
            self._save_checkpoint(df[date_column].max())
            return processed
        return None
    
    def _process(self, df):
        # 数据处理逻辑
        return df

# 使用增量管道
pipeline = IncrementalPipeline('checkpoint.txt')
new_data = pipeline.process_incremental(df)

并行处理管道 #

python
from multiprocessing import Pool
import pandas as pd

def process_chunk(chunk):
    # 处理数据块
    chunk['total'] = chunk['quantity'] * chunk['price']
    return chunk

def parallel_process(df, n_workers=4):
    chunks = np.array_split(df, n_workers)
    
    with Pool(n_workers) as pool:
        results = pool.map(process_chunk, chunks)
    
    return pd.concat(results)

# 使用并行处理
df = pd.read_csv('large_data.csv')
df_processed = parallel_process(df)

管道最佳实践 #

text
┌─────────────────────────────────────────────────────────────┐
│                    管道最佳实践                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  设计原则                                                   │
│  ├── 单一职责:每个步骤只做一件事                           │
│  ├── 可测试性:每个步骤可独立测试                           │
│  ├── 可配置性:参数可配置                                   │
│  └── 可监控性:记录日志和指标                               │
│                                                             │
│  错误处理                                                   │
│  ├── 数据验证:处理前验证数据                               │
│  ├── 异常捕获:捕获并记录异常                               │
│  └── 回滚机制:支持失败回滚                                 │
│                                                             │
│  性能优化                                                   │
│  ├── 批处理:大数据分块处理                                 │
│  ├── 并行化:利用多核处理                                   │
│  └── 增量处理:只处理新数据                                 │
│                                                             │
└─────────────────────────────────────────────────────────────┘

下一步 #

掌握数据管道后,接下来学习 最佳实践,了解 Pandas 编码规范和常见陷阱!

最后更新:2026-04-04