Dask Code Examples

In this chapter we provide practical Dask code examples and application scenarios to help you better understand and apply Dask.

Basic Examples

Hello World Example

The simplest possible Dask program:

from dask import delayed

@delayed
def greet(name):
    return f"Hello, {name}!"

@delayed
def add_exclamation(text):
    return text + " Welcome to Dask!"

# Build lazy (delayed) tasks
greeting = greet("World")
final_greeting = add_exclamation(greeting)

# Trigger the computation
result = final_greeting.compute()
print(result)  # Output: Hello, World! Welcome to Dask!
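
Before calling compute(), the underlying task graph can also be inspected. The line below is a minimal sketch and assumes the optional graphviz system library and python-graphviz package are installed; the output filename is only illustrative:

# Render the task graph that compute() would execute (writes a PNG file)
final_greeting.visualize(filename="greeting_graph.png")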

Parallel Computing Example

Using Dask for parallel computation:

from dask import delayed, compute
import time

@delayed
def expensive_task(n):
    """Simulate a time-consuming task"""
    time.sleep(1)
    return n ** 2

# Serial execution: each task is computed one at a time
start = time.time()
serial_results = [expensive_task(i).compute() for i in range(5)]
serial_time = time.time() - start
print(f"Serial execution took: {serial_time:.2f}s")

# Parallel execution: hand all tasks to the scheduler at once
start = time.time()
parallel_tasks = [expensive_task(i) for i in range(5)]
parallel_results = compute(*parallel_tasks)
parallel_time = time.time() - start
print(f"Parallel execution took: {parallel_time:.2f}s")
print(f"Speedup: {serial_time/parallel_time:.2f}x")

Array Processing Examples

Large Array Computation

Using Dask Array to process large numerical data:

import dask.array as da

# Create a large random array
x = da.random.random((20000, 20000), chunks=(2000, 2000))
print(f"Array shape: {x.shape}")
print(f"Chunk size: {x.chunksize}")
print(f"Number of chunks: {x.npartitions}")

# Build up a more complex computation
y = x + x.T           # add the array to its transpose
z = y.sum(axis=0)     # column sums (reduce over axis 0)
mean_val = z.mean()   # mean of the column sums

# Trigger the computation
result = mean_val.compute()
print(f"Result: {result}")

Image Processing Example

Using Dask to process large image data:

import dask.array as da
from scipy import ndimage

# Simulate a large colour image (height, width, channels);
# uint8 keeps the full image small enough to hold in memory
image = da.random.randint(0, 256, size=(10000, 10000, 3), chunks=(1000, 1000, 3)).astype('uint8')

# Image processing operation
def process_channel(channel):
    """Apply a simple mean (box) blur to a single colour channel"""
    # map_overlap shares a 1-pixel halo between chunks so the blur
    # is correct at chunk boundaries (uses scipy.ndimage per block)
    return da.map_overlap(
        lambda block: ndimage.uniform_filter(block, size=3),
        channel,
        depth=1,
        boundary='reflect'
    )

# Process each colour channel independently
processed_channels = [process_channel(image[:, :, i]) for i in range(3)]

# Stack the channels back into a single image
processed_image = da.stack(processed_channels, axis=-1)

# Trigger the computation
result = processed_image.compute()
print(f"Processed image shape: {result.shape}")

DataFrame Operation Examples

Sales Data Analysis

Using Dask DataFrame to analyze a large sales dataset:

import dask.dataframe as dd
import pandas as pd
import numpy as np

# Create sample sales data
def create_sample_data():
    """Create sample sales data and split it across several CSV files"""
    data = {
        'date': pd.date_range('2020-01-01', periods=1000000, freq='H'),
        'product': [f'Product_{i%100}' for i in range(1000000)],
        'category': [f'Category_{i%10}' for i in range(1000000)],
        'quantity': np.random.randint(1, 100, 1000000),
        'price': np.random.uniform(10, 1000, 1000000),
        'customer_id': np.random.randint(1, 100000, 1000000)
    }
    df = pd.DataFrame(data)
    # Save as multiple CSV files
    for i in range(10):
        df.iloc[i*100000:(i+1)*100000].to_csv(f'sales_data_{i}.csv', index=False)

# Create the sample data (run once)
# create_sample_data()

# Read the large sales dataset
sales = dd.read_csv('sales_data_*.csv', parse_dates=['date'])

# Data cleaning
sales_clean = sales.dropna()

# Add a computed column
sales_clean['total_amount'] = sales_clean['quantity'] * sales_clean['price']

# Sales analysis by category
category_analysis = sales_clean.groupby('category').agg({
    'total_amount': ['sum', 'mean', 'count'],
    'quantity': 'sum'
}).compute()

print("Sales analysis by category:")
print(category_analysis)

# Monthly sales trend
monthly_sales = sales_clean.groupby(sales_clean['date'].dt.to_period('M'))['total_amount'].sum().compute()
print("\nMonthly sales trend:")
print(monthly_sales)

# Top products
top_products = sales_clean.groupby('product')['total_amount'].sum().nlargest(10).compute()
print("\nTop products:")
print(top_products)
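
For repeated analysis of the same data it is usually worth converting the CSV files to Parquet once, since Parquet is column-oriented and preserves dtypes. A small sketch, continuing the example above and assuming the optional pyarrow package is installed; the output directory name is only illustrative:

# Write the cleaned data to a Parquet dataset and read it back lazily
sales_clean.to_parquet('sales_parquet/')
sales_fast = dd.read_parquet('sales_parquet/')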

Log Data Analysis

Using Dask to analyze web server logs:

import dask.bag as db
import numpy as np
import json
from datetime import datetime

# Simulate log data
def create_sample_logs():
    """Create sample log data split across several files"""
    log_entries = []
    for i in range(100000):
        log_entry = {
            'ip': f'192.168.1.{i%255}',
            'timestamp': datetime.now().isoformat(),
            'method': 'GET' if i%2==0 else 'POST',
            'url': f'/page/{i%1000}',
            'status': 200 if i%100<95 else 404,
            # cast to a plain int so json.dumps can serialize it
            'size': int(np.random.randint(100, 10000))
        }
        log_entries.append(json.dumps(log_entry))
    
    # Save to multiple files
    for i in range(10):
        with open(f'access_logs_{i}.json', 'w') as f:
            f.write('\n'.join(log_entries[i*10000:(i+1)*10000]))

# Create the sample logs (run once)
# create_sample_logs()

# Read the log files
logs = db.read_text('access_logs_*.json').map(json.loads)

# Analyse error requests
error_logs = logs.filter(lambda x: x['status'] >= 400)
error_count = error_logs.count().compute()
print(f"Number of error requests: {error_count}")

# Analyse request methods
method_counts = logs.map(lambda x: x['method']).frequencies().compute()
print(f"Request method counts: {method_counts}")

# Analyse popular pages
popular_pages = logs.map(lambda x: x['url']).frequencies().topk(10, lambda x: x[1]).compute()
print(f"Popular pages: {popular_pages}")

# Traffic statistics
total_bytes = logs.pluck('size').sum().compute()
avg_bytes = logs.pluck('size').mean().compute()
print(f"Total traffic: {total_bytes} bytes")
print(f"Average response size: {avg_bytes:.2f} bytes")

Machine Learning Examples

Parallel Model Training

Using Dask to train multiple machine learning models in parallel:

from dask import delayed, compute
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression

# Generate sample data
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=10000, n_features=20, n_classes=2, random_state=42)

# Define the models and parameter sets to compare
models_params = [
    (RandomForestClassifier, {'n_estimators': 10, 'max_depth': 3}),
    (RandomForestClassifier, {'n_estimators': 50, 'max_depth': 5}),
    (RandomForestClassifier, {'n_estimators': 100, 'max_depth': 7}),
    (SVC, {'C': 1.0, 'kernel': 'rbf'}),
    (SVC, {'C': 10.0, 'kernel': 'rbf'}),
    (LogisticRegression, {'C': 1.0}),
    (LogisticRegression, {'C': 10.0})
]

@delayed
def train_and_evaluate(model_class, params, X, y):
    """Train and evaluate one model with cross-validation"""
    model = model_class(**params)
    scores = cross_val_score(model, X, y, cv=5)
    return {
        'model': model_class.__name__,
        'params': params,
        'mean_score': scores.mean(),
        'std_score': scores.std()
    }

# Train and evaluate all models in parallel
tasks = [train_and_evaluate(model_class, params, X, y) 
         for model_class, params in models_params]

# Trigger the computation
results = compute(*tasks)

# Find the best model
best_result = max(results, key=lambda x: x['mean_score'])
print("Best model:")
print(f"  Model: {best_result['model']}")
print(f"  Params: {best_result['params']}")
print(f"  Score: {best_result['mean_score']:.4f} (+/- {best_result['std_score']*2:.4f})")

# Rank all models by performance
sorted_results = sorted(results, key=lambda x: x['mean_score'], reverse=True)
print("\nAll models ranked by performance:")
for i, result in enumerate(sorted_results, 1):
    print(f"{i}. {result['model']}: {result['mean_score']:.4f}")

Hyperparameter Optimization

Using Dask for hyperparameter optimization:

from dask import delayed, compute
from sklearn.model_selection import cross_val_score, ParameterGrid
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification

# Generate sample data
X, y = make_classification(n_samples=10000, n_features=20, n_classes=2, random_state=42)

# Define the parameter grid
param_grid = {
    'n_estimators': [10, 50, 100],
    'max_depth': [3, 5, 7, None],
    'min_samples_split': [2, 5, 10]
}

@delayed
def grid_search_evaluation(params, X, y):
    """Evaluate one parameter combination with cross-validation"""
    model = RandomForestClassifier(random_state=42)
    # Apply the parameters
    model.set_params(**params)
    
    # Simple cross-validation
    scores = cross_val_score(model, X, y, cv=3)
    
    return {
        'params': params,
        'mean_score': scores.mean(),
        'std_score': scores.std()
    }

# Generate all parameter combinations
param_combinations = list(ParameterGrid(param_grid))
print(f"Number of parameter combinations: {len(param_combinations)}")

# Evaluate parameter combinations in parallel (only the first 20 to keep the demo short)
tasks = [grid_search_evaluation(params, X, y) for params in param_combinations[:20]]

# Trigger the computation
results = compute(*tasks)

# Find the best parameters
best_result = max(results, key=lambda x: x['mean_score'])
print("Best parameters:")
for param, value in best_result['params'].items():
    print(f"  {param}: {value}")
print(f"Score: {best_result['mean_score']:.4f} (+/- {best_result['std_score']*2:.4f})")

Advanced Application Examples

Custom Parallel Processing

Building a custom parallel processing pipeline:

from dask import delayed, compute
import time
import hashlib

# Simulated data-processing tasks
@delayed
def load_data(file_path):
    """Load data"""
    time.sleep(0.1)  # simulate I/O latency
    return f"Data from {file_path}"

@delayed
def process_data(data):
    """Process data"""
    time.sleep(0.2)  # simulate CPU work
    # Simple data transformation
    processed = data.upper()
    return processed

@delayed
def validate_data(data):
    """Validate data"""
    time.sleep(0.05)  # simulate validation
    # Simple validation: check the data length
    is_valid = len(data) > 10
    return {'data': data, 'valid': is_valid}

@delayed
def save_data(validated_data):
    """Save data"""
    time.sleep(0.1)  # simulate a save operation
    # Generate a fingerprint for the data
    fingerprint = hashlib.md5(validated_data['data'].encode()).hexdigest()
    return {
        'fingerprint': fingerprint,
        'status': 'saved' if validated_data['valid'] else 'skipped'
    }

# Build the processing pipeline
def build_processing_pipeline(file_paths):
    """Build a parallel processing pipeline"""
    pipelines = []
    
    for file_path in file_paths:
        # Load data
        data = load_data(file_path)
        
        # Process data
        processed = process_data(data)
        
        # Validate data
        validated = validate_data(processed)
        
        # Save data
        saved = save_data(validated)
        
        pipelines.append(saved)
    
    return pipelines

# Example file paths
file_paths = [f"data_file_{i}.txt" for i in range(10)]

# Build the processing pipeline
pipelines = build_processing_pipeline(file_paths)

# Run the pipelines in parallel
print("Starting parallel processing...")
start_time = time.time()
results = compute(*pipelines)
end_time = time.time()

print(f"Processing finished in {end_time - start_time:.2f}s")
print("Results:")
for i, result in enumerate(results):
    print(f"  File {i}: {result['status']} (fingerprint: {result['fingerprint'][:8]}...)")

Real-Time Data Processing

Using Dask to process a real-time data stream:

from dask import delayed, compute
import time
import random
from collections import deque

# Simulated real-time data source
class RealTimeDataSource:
    def __init__(self):
        self.data_buffer = deque()
        self.counter = 0
    
    def generate_data(self):
        """Generate a new record"""
        self.counter += 1
        data = {
            'id': self.counter,
            'value': random.random() * 100,
            'timestamp': time.time(),
            'category': f'Category_{random.randint(1, 5)}'
        }
        self.data_buffer.append(data)
        return data
    
    def get_batch(self, batch_size=10):
        """Fetch a batch of records"""
        batch = []
        for _ in range(min(batch_size, len(self.data_buffer))):
            if self.data_buffer:
                batch.append(self.data_buffer.popleft())
        return batch

# Data-processing functions
@delayed
def process_record(record):
    """Process a single record"""
    # Simulate processing time
    time.sleep(0.01)
    
    # Data-processing logic
    processed = {
        'id': record['id'],
        'processed_value': record['value'] * 2,
        'category': record['category'],
        'timestamp': record['timestamp']
    }
    return processed

@delayed
def aggregate_batch(records):
    """Aggregate a batch of processed records"""
    if not records:
        return {}
    
    # Compute summary statistics
    values = [r['processed_value'] for r in records]
    categories = [r['category'] for r in records]
    
    aggregation = {
        'count': len(records),
        'avg_value': sum(values) / len(values),
        'max_value': max(values),
        'min_value': min(values),
        'category_distribution': {cat: categories.count(cat) for cat in set(categories)}
    }
    return aggregation

# Real-time processing system
def real_time_processing_system():
    """Real-time data-processing system"""
    data_source = RealTimeDataSource()
    
    print("Starting the real-time processing system...")
    
    for batch_num in range(5):  # process 5 batches
        print(f"\nProcessing batch {batch_num + 1}:")
        
        # Generate new data
        new_records = []
        for _ in range(20):
            record = data_source.generate_data()
            new_records.append(record)
        
        # Process the records in parallel
        processed_records = [process_record(record) for record in new_records]
        
        # Trigger the computation
        processed_results = compute(*processed_records)
        
        # Aggregate the batch
        batch_aggregation = aggregate_batch(list(processed_results))
        aggregation_result = batch_aggregation.compute()
        
        # Print the results
        print(f"  Records processed: {aggregation_result.get('count', 0)}")
        print(f"  Average value: {aggregation_result.get('avg_value', 0):.2f}")
        print(f"  Max value: {aggregation_result.get('max_value', 0):.2f}")
        print(f"  Min value: {aggregation_result.get('min_value', 0):.2f}")
        print(f"  Category distribution: {aggregation_result.get('category_distribution', {})}")
        
        # Simulate an interval between batches
        time.sleep(1)
    
    print("\nReal-time processing complete!")

# Run the real-time processing system
# real_time_processing_system()
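
For a genuinely streaming workload, the futures interface of dask.distributed is usually a better fit than batched delayed calls, because work can be submitted as soon as records arrive and results collected as they complete. The following is only a rough sketch, reusing RealTimeDataSource from above, assuming dask.distributed is installed; double_value is a hypothetical stand-in for real per-record processing:

from dask.distributed import Client, as_completed

client = Client()  # local cluster
source = RealTimeDataSource()

def double_value(record):
    """Hypothetical stand-in for real per-record processing"""
    return record['value'] * 2

# Submit each record as soon as it is generated
futures = [client.submit(double_value, source.generate_data()) for _ in range(20)]

# Collect results in completion order
for future in as_completed(futures):
    print(future.result())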

Performance Testing Example

Benchmarking

Comparing the performance of different approaches:

import time
import numpy as np
import dask.array as da
from dask.diagnostics import ProgressBar

def benchmark_array_operations():
    """Benchmark array operations"""
    print("Array operation benchmark")
    print("=" * 50)
    
    # Array sizes to test
    sizes = [(1000, 1000), (5000, 5000), (10000, 10000)]
    
    for size in sizes:
        print(f"\nArray size: {size}")
        
        # NumPy benchmark
        print("  NumPy:")
        np_array = np.random.random(size)
        start_time = time.time()
        np_result = np_array.sum(axis=0).mean()
        np_time = time.time() - start_time
        print(f"    Time: {np_time:.4f}s")
        
        # Dask benchmark (built from the same data so the results are comparable)
        print("  Dask:")
        chunk_size = min(size[0]//10, 1000)  # pick a chunk size based on the array size
        da_array = da.from_array(np_array, chunks=(chunk_size, chunk_size))
        
        with ProgressBar():
            start_time = time.time()
            da_result = da_array.sum(axis=0).mean().compute()
            da_time = time.time() - start_time
            print(f"    Time: {da_time:.4f}s")
        
        # Compare performance
        speedup = np_time / da_time if da_time > 0 else float('inf')
        print(f"    Speedup over NumPy: {speedup:.2f}x")
        print(f"    Results agree: {np.isclose(np_result, da_result)}")

# Run the benchmark
# benchmark_array_operations()
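
Beyond wall-clock timing, Dask's local profilers can show how time and resources are spent during a compute() call. A small sketch (ResourceProfiler additionally needs the optional psutil package):

from dask.diagnostics import Profiler, ResourceProfiler

x = da.random.random((5000, 5000), chunks=(1000, 1000))
with Profiler() as prof, ResourceProfiler() as rprof:
    x.sum(axis=0).mean().compute()

# prof.results holds per-task timings, rprof.results holds CPU/memory samples
print(len(prof.results))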

Tip: these examples show how Dask can be applied in different scenarios. In practice, adjust the parameters and logic to your specific needs. It is best to test the code on small data first and only apply it to large-scale processing once you have confirmed it is correct.