第5章 Dask数据框操作

在本章中,我们将学习如何使用Dask DataFrame处理大型表格数据。Dask DataFrame是Pandas DataFrame的并行化版本,支持大型数据集的分块处理。

5.1 Dask DataFrame简介

Dask DataFrame是Pandas DataFrame的分布式版本,它将大型数据集分割成多个小块(partitions),每块可以独立处理。Dask DataFrame提供与Pandas几乎相同的API,使得用户可以轻松地将Pandas代码迁移到Dask。

Dask DataFrame的特点

  • 熟悉的API:与Pandas DataFrame具有相同的API
  • 并行处理:不同的分区可以并行处理
  • 内存效率:只在需要时加载数据分区到内存
  • 可扩展性:可以处理超出内存限制的大型数据集
# 示例:创建Dask DataFrame
import dask.dataframe as dd
import pandas as pd

# 从Pandas DataFrame创建Dask DataFrame
df_pandas = pd.DataFrame({'x': range(1000), 'y': range(1000)})
df_dask = dd.from_pandas(df_pandas, npartitions=4)

# 直接从CSV文件创建
df = dd.read_csv('data/*.csv')

print(f"DataFrame形状: {df.shape}")
print(f"分区数量: {df.npartitions}")
                        

5.2 创建Dask DataFrame

有多种方式可以创建Dask DataFrame:

从Pandas DataFrame创建

import pandas as pd
import dask.dataframe as dd

# 创建Pandas DataFrame
df_pandas = pd.DataFrame({
    'name': ['Alice', 'Bob', 'Charlie'],
    'age': [25, 30, 35],
    'city': ['New York', 'London', 'Tokyo']
})

# 转换为Dask DataFrame
df_dask = dd.from_pandas(df_pandas, npartitions=2)
                    

从CSV文件创建

# 从单个CSV文件创建
df = dd.read_csv('data.csv')

# 从多个CSV文件创建
df = dd.read_csv('data/*.csv')

# 指定数据类型以提高性能
df = dd.read_csv('data.csv', dtype={'id': 'int64', 'name': 'object', 'value': 'float64'})

# 只读取特定列
df = dd.read_csv('data.csv', usecols=['id', 'name', 'value'])
                    

从其他数据源创建

# 从Parquet文件创建
df = dd.read_parquet('data.parquet')

# 从HDF5文件创建
df = dd.read_hdf('data.h5', key='table')

# 从JSON文件创建
df = dd.read_json('data/*.json')
                    

5.3 DataFrame基本操作

Dask DataFrame支持大部分Pandas DataFrame操作:

数据查看

import dask.dataframe as dd

# 读取数据
df = dd.read_csv('data.csv')

# 查看前几行(注意:这会触发计算)
print(df.head())

# 查看数据信息
print(df.columns)
print(df.dtypes)
print(df.index)

# 查看数据形状(延迟计算)
print(df.shape)
                    

数据选择

# 选择列
df_names = df['name']
df_subset = df[['name', 'age']]

# 选择行
df_first_100 = df.head(100)  # 前100行
df_sample = df.sample(frac=0.1)  # 随机采样10%

# 条件选择
df_filtered = df[df['age'] > 30]
df_complex = df[(df['age'] > 25) & (df['city'] == 'New York')]
                    

数据处理

# 添加新列
df['age_group'] = df['age'].apply(lambda x: 'Young' if x < 30 else 'Old', meta=('age_group', 'object'))

# 数据转换
df['age_squared'] = df['age'] ** 2

# 缺失值处理
df_cleaned = df.dropna()
df_filled = df.fillna(0)

# 删除列
df_reduced = df.drop('unnecessary_column', axis=1)
                    

5.4 分组和聚合操作

Dask DataFrame支持强大的分组和聚合功能:

import dask.dataframe as dd

# 读取数据
df = dd.read_csv('sales_data.csv')

# 基本分组聚合
grouped = df.groupby('category')['sales'].sum()

# 多列分组
multi_grouped = df.groupby(['category', 'region'])['sales'].mean()

# 多种聚合操作
agg_result = df.groupby('category').agg({
    'sales': ['sum', 'mean', 'count'],
    'quantity': ['sum', 'max', 'min']
})

# 自定义聚合函数
def custom_agg(series):
    return series.max() - series.min()

custom_result = df.groupby('category')['sales'].apply(custom_agg, meta=('sales', 'float64'))
                    

5.5 数据连接和合并

Dask DataFrame支持多种数据连接操作:

import dask.dataframe as dd

# 创建示例数据
df1 = dd.read_csv('data1.csv')
df2 = dd.read_csv('data2.csv')

# 内连接
inner_join = dd.merge(df1, df2, on='id', how='inner')

# 左连接
left_join = dd.merge(df1, df2, on='id', how='left')

# 外连接
outer_join = dd.merge(df1, df2, on='id', how='outer')

# 按索引连接
index_join = dd.merge(df1, df2, left_index=True, right_index=True)
                    

5.6 数据排序和去重

Dask DataFrame支持数据排序和去重操作:

import dask.dataframe as dd

# 读取数据
df = dd.read_csv('data.csv')

# 排序
df_sorted = df.sort_values('age')
df_multi_sorted = df.sort_values(['age', 'name'])

# 去重
df_unique = df.drop_duplicates()
df_unique_subset = df.drop_duplicates(subset=['name'])

# 获取最大/最小值
max_age = df['age'].max().compute()
min_age = df['age'].min().compute()
                    

5.7 数据写入

Dask DataFrame支持将处理结果写入多种格式:

import dask.dataframe as dd

# 处理数据
df = dd.read_csv('input.csv')
processed_df = df[df['age'] > 25]

# 写入CSV文件
processed_df.to_csv('output/*.csv', index=False)

# 写入Parquet文件
processed_df.to_parquet('output.parquet')

# 写入HDF5文件
processed_df.to_hdf('output.h5', key='data')

# 写入JSON文件
processed_df.to_json('output/*.json')
                    

5.8 性能优化技巧

为了提高Dask DataFrame的性能,可以采用以下优化策略:

合理设置分区数

# 重新分区
df_repartitioned = df.repartition(npartitions=10)

# 根据列值分区
df_partitioned = df.repartition(columns=['date'])

# 根据文件分区
df_from_files = dd.read_csv('data/*.csv')  # 每个文件一个分区
                    

持久化常用数据

# 持久化DataFrame到内存
df_persisted = df.persist()

# 后续操作将更快
result = df_persisted.groupby('category').sales.sum()
                    

优化数据类型

# 优化数据类型以节省内存
df['id'] = df['id'].astype('int32')  # 如果数据范围允许
df['category'] = df['category'].astype('category')  # 对于重复值多的字符串列
                    

5.9 实际应用示例

以下是一些Dask DataFrame在实际应用中的示例:

日志数据分析

# 分析Web服务器日志
import dask.dataframe as dd

# 读取日志文件
logs = dd.read_csv(
    'access_logs/*.log',
    sep=' ',
    names=['ip', 'timestamp', 'method', 'url', 'status', 'size'],
    dtype={'size': 'float64'}
)

# 数据清洗
logs_clean = logs.dropna()

# 分析请求模式
requests_by_hour = logs_clean.groupby(logs_clean['timestamp'].dt.hour)['ip'].count()

# 分析热门页面
popular_pages = logs_clean.groupby('url')['ip'].count().nlargest(10)

# 计算错误率
error_rate = (logs_clean['status'] >= 400).mean()
                        

销售数据分析

# 分析销售数据
import dask.dataframe as dd

# 读取销售数据
sales = dd.read_csv('sales_data/*.csv', parse_dates=['date'])

# 月度销售统计
monthly_sales = sales.groupby(sales['date'].dt.to_period('M'))['amount'].sum()

# 产品类别分析
category_analysis = sales.groupby('category').agg({
    'amount': ['sum', 'mean', 'count'],
    'quantity': 'sum'
})

# 客户价值分析
customer_value = sales.groupby('customer_id')['amount'].sum().nlargest(100)
                        

提示:在使用Dask DataFrame时,要注意延迟计算的特点。许多操作(如head()、shape等)会触发实际计算,而大多数转换操作(如filter、groupby等)是延迟的。合理使用compute()和persist()方法可以优化性能。