性能优化 #
性能优化概述 #
Pandas 性能优化是处理大数据集的关键技能,可以显著提升代码执行效率。
text
┌─────────────────────────────────────────────────────────────┐
│ 性能优化策略 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 向量化操作 │
│ ├── 避免循环 │
│ ├── 使用内置方法 │
│ └── 使用 NumPy 函数 │
│ │
│ 内存优化 │
│ ├── 选择合适的数据类型 │
│ ├── 使用 category 类型 │
│ └── 分块处理大文件 │
│ │
│ 查询优化 │
│ ├── 使用 loc 替代链式索引 │
│ ├── 使用 query 方法 │
│ └── 使用 isin 替代多个 or │
│ │
│ 并行处理 │
│ ├── 使用 swifter │
│ ├── 使用 Dask │
│ └── 多进程处理 │
│ │
└─────────────────────────────────────────────────────────────┘
向量化操作 #
避免循环 #
python
import pandas as pd
import numpy as np
# 创建测试数据
df = pd.DataFrame({'value': np.random.rand(100000)})
# 不推荐:使用循环
def slow_method():
result = []
for i in range(len(df)):
result.append(df.iloc[i]['value'] * 2)
return result
# 推荐:向量化操作
def fast_method():
return df['value'] * 2
# 性能对比
%timeit slow_method() # 较慢
%timeit fast_method() # 快很多
使用内置方法 #
python
# 不推荐
df['value'].apply(lambda x: x * 2)
# 推荐
df['value'] * 2
# 不推荐
df['value'].apply(np.sqrt)
# 推荐
np.sqrt(df['value'])
使用 NumPy 函数 #
python
# NumPy 函数通常比 Pandas 方法更快
%timeit df['value'].sum()
%timeit np.sum(df['value'].values)
%timeit df['value'].mean()
%timeit np.mean(df['value'].values)
内存优化 #
选择合适的数据类型 #
python
# 创建测试数据
df = pd.DataFrame({
'small_int': np.random.randint(0, 100, 100000),
'big_int': np.random.randint(0, 1000000, 100000),
'float': np.random.randn(100000),
'text': ['text'] * 100000
})
# 查看内存使用
print(df.memory_usage(deep=True))
# 优化数据类型
df['small_int'] = df['small_int'].astype('int8')
df['big_int'] = df['big_int'].astype('int32')
df['float'] = df['float'].astype('float32')
print('优化后:')
print(df.memory_usage(deep=True))
使用 category 类型 #
python
# 分类数据使用 category 类型
df = pd.DataFrame({
'category': np.random.choice(['A', 'B', 'C', 'D'], 100000)
})
print('字符串内存:', df['category'].memory_usage(deep=True))
df['category'] = df['category'].astype('category')
print('分类内存:', df['category'].memory_usage(deep=True))
读取时指定类型 #
python
# 读取时指定类型,避免自动推断
dtypes = {
'col1': 'int32',
'col2': 'float32',
'col3': 'category'
}
df = pd.read_csv('data.csv', dtype=dtypes)
分块处理大文件 #
python
# 分块读取大文件
chunks = pd.read_csv('large.csv', chunksize=100000)
results = []
for chunk in chunks:
# 处理每个块
result = chunk.groupby('key').sum()
results.append(result)
# 合并结果
final = pd.concat(results).groupby(level=0).sum()
只读取需要的列 #
python
# 只读取需要的列
df = pd.read_csv('large.csv', usecols=['col1', 'col2', 'col3'])
查询优化 #
使用 loc 替代链式索引 #
python
# 不推荐:链式索引
df[df['value'] > 0]['result'] = 1 # SettingWithCopyWarning
# 推荐:loc
df.loc[df['value'] > 0, 'result'] = 1
使用 query 方法 #
python
# 复杂条件使用 query
%timeit df[(df['A'] > 0) & (df['B'] < 100) & (df['C'] == 'value')]
%timeit df.query('A > 0 and B < 100 and C == "value"')
使用 isin 替代多个 or #
python
# 不推荐
df[(df['category'] == 'A') | (df['category'] == 'B') | (df['category'] == 'C')]
# 推荐
df[df['category'].isin(['A', 'B', 'C'])]
使用 numpy.where #
python
# 条件赋值
# 不推荐
df['result'] = df['value'].apply(lambda x: 'high' if x > 0 else 'low')
# 推荐
df['result'] = np.where(df['value'] > 0, 'high', 'low')
使用 numpy.select #
python
# 多条件赋值
conditions = [
df['value'] > 100,
df['value'] > 50,
df['value'] > 0
]
choices = ['high', 'medium', 'low']
df['level'] = np.select(conditions, choices, default='very low')
并行处理 #
使用 swifter #
python
# 安装: pip install swifter
import swifter
# 并行 apply
df['result'] = df['value'].swifter.apply(lambda x: x * 2)
# 进度条
df['result'] = df['value'].swifter.progress_bar(True).apply(lambda x: x * 2)
使用 Dask #
python
# 安装: pip install dask
import dask.dataframe as dd
# 读取大数据
ddf = dd.read_csv('very_large.csv')
# 操作(惰性执行)
result = ddf.groupby('key').value.sum()
# 计算
result.compute()
多进程处理 #
python
from multiprocessing import Pool
def process_chunk(chunk):
return chunk.groupby('key').sum()
# 分块并行处理
chunks = np.array_split(df, 4)
with Pool(4) as pool:
results = pool.map(process_chunk, chunks)
final = pd.concat(results).groupby(level=0).sum()
索引优化 #
使用唯一索引 #
python
# 唯一索引查找更快
df = df.set_index('unique_id')
# 检查索引是否唯一
print(df.index.is_unique)
使用排序索引 #
python
# 排序索引查找更快
df = df.sort_index()
# 检查索引是否排序
print(df.index.is_monotonic_increasing)
避免重复索引 #
python
# 检查重复索引
print(df.index.duplicated().sum())
# 删除重复索引
df = df[~df.index.duplicated(keep='first')]
性能分析 #
使用 timeit #
python
%timeit df['value'].mean()
%timeit df['value'].sum()
使用 %prun #
python
%prun df.groupby('key').agg({'value': ['mean', 'sum']})
使用 memory_profiler #
python
# 安装: pip install memory_profiler
%load_ext memory_profiler
%memit df.groupby('key').sum()
性能优化清单 #
text
┌─────────────────────────────────────────────────────────────┐
│ 性能优化检查清单 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 数据类型 │
│ □ 使用最小数据类型 │
│ □ 分类数据使用 category │
│ □ 字符串使用 string 类型 │
│ │
│ 操作方式 │
│ □ 避免循环,使用向量化 │
│ □ 使用内置方法 │
│ □ 使用 NumPy 函数 │
│ │
│ 索引优化 │
│ □ 使用唯一索引 │
│ □ 保持索引排序 │
│ □ 避免重复索引 │
│ │
│ 查询优化 │
│ □ 使用 loc 替代链式索引 │
│ □ 使用 isin 替代多个 or │
│ □ 使用 query 处理复杂条件 │
│ │
│ 大数据处理 │
│ □ 分块读取 │
│ □ 只读取需要的列 │
│ □ 考虑使用 Dask │
│ │
└─────────────────────────────────────────────────────────────┘
下一步 #
掌握了性能优化后,接下来学习 高级操作,了解 Pandas 的高级功能和技巧!
最后更新:2026-04-04