数据管道 #
数据管道概述 #
数据管道是将数据处理步骤组织成可复用、可维护的流程,是现代数据分析的核心技能。
text
┌─────────────────────────────────────────────────────────────┐
│ 数据管道架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 原始数据 │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ 数据提取 │ Extract │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ 数据转换 │ Transform │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ 数据加载 │ Load │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ 处理后数据 │
│ │
└─────────────────────────────────────────────────────────────┘
基本管道结构 #
使用函数构建管道 #
python
import pandas as pd
import numpy as np
def extract_data(source):
"""数据提取"""
return pd.read_csv(source)
def clean_data(df):
"""数据清洗"""
df = df.drop_duplicates()
df = df.dropna()
return df
def transform_data(df):
"""数据转换"""
df['total'] = df['quantity'] * df['price']
df['date'] = pd.to_datetime(df['date'])
return df
def load_data(df, destination):
"""数据加载"""
df.to_csv(destination, index=False)
return df
# 执行管道
df = extract_data('raw_data.csv')
df = clean_data(df)
df = transform_data(df)
df = load_data(df, 'processed_data.csv')
使用 pipe 方法 #
python
def remove_duplicates(df):
return df.drop_duplicates()
def fill_missing(df, strategy='mean'):
return df.fillna(df.mean(numeric_only=True))
def add_features(df):
df = df.copy()
df['total'] = df['quantity'] * df['price']
return df
# 链式管道
df = (pd.read_csv('data.csv')
.pipe(remove_duplicates)
.pipe(fill_missing, strategy='mean')
.pipe(add_features)
)
模块化管道 #
创建管道类 #
python
class DataPipeline:
def __init__(self):
self.steps = []
def add_step(self, name, func, **kwargs):
self.steps.append({'name': name, 'func': func, 'kwargs': kwargs})
return self
def run(self, df):
for step in self.steps:
print(f"执行步骤: {step['name']}")
df = step['func'](df, **step['kwargs'])
return df
# 定义处理函数
def remove_duplicates(df):
return df.drop_duplicates()
def handle_missing(df, columns=None):
if columns:
df[columns] = df[columns].fillna(df[columns].mean())
else:
df = df.fillna(df.mean(numeric_only=True))
return df
def normalize_columns(df, columns):
for col in columns:
df[col] = (df[col] - df[col].min()) / (df[col].max() - df[col].min())
return df
# 使用管道
pipeline = DataPipeline()
pipeline.add_step('去重', remove_duplicates)
pipeline.add_step('处理缺失值', handle_missing, columns=['price', 'quantity'])
pipeline.add_step('标准化', normalize_columns, columns=['price'])
df = pd.read_csv('data.csv')
df_processed = pipeline.run(df)
使用 sklearn Pipeline #
python
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
# 创建 sklearn 管道
pipeline = Pipeline([
('imputer', SimpleImputer(strategy='mean')),
('scaler', StandardScaler())
])
# 只适用于数值数据
numeric_data = df[['price', 'quantity']]
processed = pipeline.fit_transform(numeric_data)
ETL 管道示例 #
完整 ETL 管道 #
python
import pandas as pd
import numpy as np
from datetime import datetime
class SalesETL:
def __init__(self, config):
self.config = config
self.log = []
def _log(self, message):
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
self.log.append(f"[{timestamp}] {message}")
print(f"[{timestamp}] {message}")
def extract(self):
self._log("开始数据提取")
df = pd.read_csv(self.config['source'])
self._log(f"提取 {len(df)} 条记录")
return df
def transform(self, df):
self._log("开始数据转换")
# 去重
before = len(df)
df = df.drop_duplicates()
self._log(f"去重: {before - len(df)} 条记录")
# 处理缺失值
df = df.dropna(subset=['date', 'product'])
df['quantity'] = df['quantity'].fillna(0)
df['price'] = df['price'].fillna(df['price'].median())
# 数据类型转换
df['date'] = pd.to_datetime(df['date'])
df['quantity'] = df['quantity'].astype(int)
# 计算派生字段
df['revenue'] = df['quantity'] * df['price']
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
self._log(f"转换后 {len(df)} 条记录")
return df
def load(self, df):
self._log("开始数据加载")
df.to_csv(self.config['destination'], index=False)
self._log(f"保存到 {self.config['destination']}")
return df
def run(self):
self._log("ETL 管道开始")
df = self.extract()
df = self.transform(df)
df = self.load(df)
self._log("ETL 管道完成")
return df
# 使用 ETL 管道
config = {
'source': 'raw_sales.csv',
'destination': 'processed_sales.csv'
}
etl = SalesETL(config)
df = etl.run()
数据验证管道 #
python
class DataValidator:
def __init__(self):
self.rules = []
self.errors = []
def add_rule(self, name, func, message):
self.rules.append({
'name': name,
'func': func,
'message': message
})
def validate(self, df):
self.errors = []
for rule in self.rules:
if not rule['func'](df):
self.errors.append({
'rule': rule['name'],
'message': rule['message']
})
return len(self.errors) == 0
def get_errors(self):
return self.errors
# 定义验证规则
validator = DataValidator()
validator.add_rule(
'no_nulls',
lambda df: not df['id'].isna().any(),
'ID 列不能有空值'
)
validator.add_rule(
'positive_quantity',
lambda df: (df['quantity'] >= 0).all(),
'数量必须为非负数'
)
validator.add_rule(
'valid_date',
lambda df: pd.to_datetime(df['date'], errors='coerce').notna().all(),
'日期格式无效'
)
# 执行验证
df = pd.read_csv('data.csv')
if validator.validate(df):
print("数据验证通过")
else:
print("数据验证失败:")
for error in validator.get_errors():
print(f" - {error['rule']}: {error['message']}")
增量处理管道 #
python
class IncrementalPipeline:
def __init__(self, checkpoint_file):
self.checkpoint_file = checkpoint_file
self.last_processed = self._load_checkpoint()
def _load_checkpoint(self):
try:
with open(self.checkpoint_file, 'r') as f:
return f.read().strip()
except FileNotFoundError:
return None
def _save_checkpoint(self, value):
with open(self.checkpoint_file, 'w') as f:
f.write(str(value))
def process_incremental(self, df, date_column='date'):
if self.last_processed:
df = df[df[date_column] > self.last_processed]
if len(df) > 0:
# 处理数据
processed = self._process(df)
# 更新检查点
self._save_checkpoint(df[date_column].max())
return processed
return None
def _process(self, df):
# 数据处理逻辑
return df
# 使用增量管道
pipeline = IncrementalPipeline('checkpoint.txt')
new_data = pipeline.process_incremental(df)
并行处理管道 #
python
from multiprocessing import Pool
import pandas as pd
def process_chunk(chunk):
# 处理数据块
chunk['total'] = chunk['quantity'] * chunk['price']
return chunk
def parallel_process(df, n_workers=4):
chunks = np.array_split(df, n_workers)
with Pool(n_workers) as pool:
results = pool.map(process_chunk, chunks)
return pd.concat(results)
# 使用并行处理
df = pd.read_csv('large_data.csv')
df_processed = parallel_process(df)
管道最佳实践 #
text
┌─────────────────────────────────────────────────────────────┐
│ 管道最佳实践 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 设计原则 │
│ ├── 单一职责:每个步骤只做一件事 │
│ ├── 可测试性:每个步骤可独立测试 │
│ ├── 可配置性:参数可配置 │
│ └── 可监控性:记录日志和指标 │
│ │
│ 错误处理 │
│ ├── 数据验证:处理前验证数据 │
│ ├── 异常捕获:捕获并记录异常 │
│ └── 回滚机制:支持失败回滚 │
│ │
│ 性能优化 │
│ ├── 批处理:大数据分块处理 │
│ ├── 并行化:利用多核处理 │
│ └── 增量处理:只处理新数据 │
│ │
└─────────────────────────────────────────────────────────────┘
下一步 #
掌握数据管道后,接下来学习 最佳实践,了解 Pandas 编码规范和常见陷阱!
最后更新:2026-04-04