Dask Code Examples
In this chapter we provide practical Dask code examples and application scenarios to help you better understand and apply Dask.
Basic Examples
Hello World Example
The simplest possible Dask program:
from dask import delayed

@delayed
def greet(name):
    return f"Hello, {name}!"

@delayed
def add_exclamation(text):
    return text + " Welcome to Dask!"

# Build the delayed tasks
greeting = greet("World")
final_greeting = add_exclamation(greeting)

# Run the computation
result = final_greeting.compute()
print(result)  # Output: Hello, World! Welcome to Dask!
Parallel Computing Example
Parallel computation with Dask:
from dask import compute, delayed
import time

@delayed
def expensive_task(n):
    """Simulate an expensive task"""
    time.sleep(1)
    return n ** 2

# Serial execution: compute each task one at a time
start = time.time()
serial_results = [expensive_task(i).compute() for i in range(5)]
serial_time = time.time() - start
print(f"Serial execution time: {serial_time:.2f} s")

# Parallel execution: build all tasks first, then compute them together
start = time.time()
parallel_tasks = [expensive_task(i) for i in range(5)]
parallel_results = compute(*parallel_tasks)
parallel_time = time.time() - start
print(f"Parallel execution time: {parallel_time:.2f} s")
print(f"Speedup: {serial_time/parallel_time:.2f}x")
Array Processing Examples
Large Array Computation
Processing large numerical data with Dask Array:
import dask.array as da
import numpy as np

# Create a large random array
x = da.random.random((20000, 20000), chunks=(2000, 2000))
print(f"Array shape: {x.shape}")
print(f"Chunk size: {x.chunksize}")
print(f"Number of chunks: {x.npartitions}")

# Perform a more complex computation
y = x + x.T          # add the array to its transpose
z = y.sum(axis=0)    # sum along axis 0 (column totals)
mean_val = z.mean()  # mean of the column sums

# Compute the result
result = mean_val.compute()
print(f"Result: {result}")
Image Processing Example
Processing large image data with Dask:
import dask.array as da
import numpy as np
from scipy.ndimage import uniform_filter

# Simulate a large colour image (height, width, channels)
image = da.random.randint(0, 255, size=(10000, 10000, 3), chunks=(1000, 1000, 3))

# Image processing operation
def process_channel(channel):
    """Process a single colour channel"""
    # Simple blur: a 3x3 mean filter (scipy.ndimage.uniform_filter) applied
    # block-wise, with a 1-pixel overlap between blocks to avoid edge artefacts
    return da.map_overlap(
        lambda block: uniform_filter(block, size=3),
        channel,
        depth=1,
        boundary='reflect'
    )

# Process each colour channel separately
processed_channels = [process_channel(image[:, :, i]) for i in range(3)]

# Stack the processed channels back together
processed_image = da.stack(processed_channels, axis=-1)

# Compute the result
result = processed_image.compute()
print(f"Processed image shape: {result.shape}")
DataFrame Examples
Sales Data Analysis
Analyzing large sales data with Dask DataFrame:
import dask.dataframe as dd
import pandas as pd
import numpy as np

# Create sample sales data
def create_sample_data():
    """Create sample sales data"""
    data = {
        'date': pd.date_range('2020-01-01', periods=1000000, freq='H'),
        'product': [f'Product_{i%100}' for i in range(1000000)],
        'category': [f'Category_{i%10}' for i in range(1000000)],
        'quantity': np.random.randint(1, 100, 1000000),
        'price': np.random.uniform(10, 1000, 1000000),
        'customer_id': np.random.randint(1, 100000, 1000000)
    }
    df = pd.DataFrame(data)
    # Split the data across multiple CSV files
    for i in range(10):
        df.iloc[i*100000:(i+1)*100000].to_csv(f'sales_data_{i}.csv', index=False)

# Create the sample data (run once)
# create_sample_data()

# Read the sales data
sales = dd.read_csv('sales_data_*.csv', parse_dates=['date'])

# Data cleaning
sales_clean = sales.dropna()

# Add a computed column
sales_clean['total_amount'] = sales_clean['quantity'] * sales_clean['price']

# Analyze sales by category
category_analysis = sales_clean.groupby('category').agg({
    'total_amount': ['sum', 'mean', 'count'],
    'quantity': 'sum'
}).compute()
print("Sales analysis by category:")
print(category_analysis)

# Monthly sales trend
monthly_sales = sales_clean.groupby(sales_clean['date'].dt.to_period('M'))['total_amount'].sum().compute()
print("\nMonthly sales trend:")
print(monthly_sales)

# Top products
top_products = sales_clean.groupby('product')['total_amount'].sum().nlargest(10).compute()
print("\nTop products:")
print(top_products)
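For repeated analysis of data this size, CSV is a slow format. A minimal sketch of persisting the cleaned data to Parquet and reading it back (assumes a Parquet engine such as pyarrow or fastparquet is installed; the directory name is arbitrary):
# Write the cleaned data to a partitioned Parquet dataset
sales_clean.to_parquet('sales_parquet/')

# Later analyses can read the Parquet files much faster than the original CSVs
sales_fast = dd.read_parquet('sales_parquet/')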
Log Data Analysis
Analyzing web server logs with Dask:
import dask.bag as db
import json
import random
from datetime import datetime

# Simulate log data
def create_sample_logs():
    """Create sample log data"""
    log_entries = []
    for i in range(100000):
        log_entry = {
            'ip': f'192.168.1.{i%255}',
            'timestamp': datetime.now().isoformat(),
            'method': 'GET' if i % 2 == 0 else 'POST',
            'url': f'/page/{i%1000}',
            'status': 200 if i % 100 < 95 else 404,
            'size': random.randint(100, 10000)
        }
        log_entries.append(json.dumps(log_entry))
    # Split the entries across multiple files
    for i in range(10):
        with open(f'access_logs_{i}.json', 'w') as f:
            f.write('\n'.join(log_entries[i*10000:(i+1)*10000]))

# Create the sample logs (run once)
# create_sample_logs()

# Read the log files
logs = db.read_text('access_logs_*.json').map(json.loads)

# Analyze error requests
error_logs = logs.filter(lambda x: x['status'] >= 400)
error_count = error_logs.count().compute()
print(f"Number of error requests: {error_count}")

# Analyze request methods
method_counts = logs.map(lambda x: x['method']).frequencies().compute()
print(f"Request method counts: {method_counts}")

# Analyze popular pages
popular_pages = logs.map(lambda x: x['url']).frequencies().topk(10, key=lambda x: x[1]).compute()
print(f"Popular pages: {popular_pages}")

# Traffic statistics
total_bytes = logs.pluck('size').sum().compute()
avg_bytes = logs.pluck('size').mean().compute()
print(f"Total traffic: {total_bytes} bytes")
print(f"Average response size: {avg_bytes:.2f} bytes")
Machine Learning Examples
Parallel Model Training
Training multiple machine learning models in parallel with Dask:
from dask import compute, delayed
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import make_classification

# Generate sample data
X, y = make_classification(n_samples=10000, n_features=20, n_classes=2, random_state=42)

# Define the models and parameter sets to compare
models_params = [
    (RandomForestClassifier, {'n_estimators': 10, 'max_depth': 3}),
    (RandomForestClassifier, {'n_estimators': 50, 'max_depth': 5}),
    (RandomForestClassifier, {'n_estimators': 100, 'max_depth': 7}),
    (SVC, {'C': 1.0, 'kernel': 'rbf'}),
    (SVC, {'C': 10.0, 'kernel': 'rbf'}),
    (LogisticRegression, {'C': 1.0}),
    (LogisticRegression, {'C': 10.0})
]

@delayed
def train_and_evaluate(model_class, params, X, y):
    """Train and evaluate one model"""
    model = model_class(**params)
    scores = cross_val_score(model, X, y, cv=5)
    return {
        'model': model_class.__name__,
        'params': params,
        'mean_score': scores.mean(),
        'std_score': scores.std()
    }

# Train and evaluate all models in parallel
tasks = [train_and_evaluate(model_class, params, X, y)
         for model_class, params in models_params]

# Compute the results
results = compute(*tasks)

# Find the best model
best_result = max(results, key=lambda x: x['mean_score'])
print("Best model:")
print(f"  Model: {best_result['model']}")
print(f"  Params: {best_result['params']}")
print(f"  Score: {best_result['mean_score']:.4f} (+/- {best_result['std_score']*2:.4f})")

# Rank all models by performance
sorted_results = sorted(results, key=lambda x: x['mean_score'], reverse=True)
print("\nAll models ranked by performance:")
for i, result in enumerate(sorted_results, 1):
    print(f"{i}. {result['model']}: {result['mean_score']:.4f}")
Hyperparameter Optimization
Hyperparameter optimization with Dask:
from dask import compute, delayed
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score, ParameterGrid

# Generate sample data
X, y = make_classification(n_samples=10000, n_features=20, n_classes=2, random_state=42)

# Define the parameter grid
param_grid = {
    'n_estimators': [10, 50, 100],
    'max_depth': [3, 5, 7, None],
    'min_samples_split': [2, 5, 10]
}

@delayed
def grid_search_evaluation(params, X, y):
    """Evaluate one parameter combination"""
    model = RandomForestClassifier(random_state=42)
    # Set the parameters
    model.set_params(**params)
    # Simple cross-validation
    scores = cross_val_score(model, X, y, cv=3)
    return {
        'params': params,
        'mean_score': scores.mean(),
        'std_score': scores.std()
    }

# Generate all parameter combinations
param_combinations = list(ParameterGrid(param_grid))
print(f"Number of parameter combinations: {len(param_combinations)}")

# Evaluate the first 20 combinations in parallel (drop the slice to evaluate all of them)
tasks = [grid_search_evaluation(params, X, y) for params in param_combinations[:20]]

# Compute the results
results = compute(*tasks)

# Find the best parameters
best_result = max(results, key=lambda x: x['mean_score'])
print("Best parameters:")
for param, value in best_result['params'].items():
    print(f"  {param}: {value}")
print(f"Score: {best_result['mean_score']:.4f} (+/- {best_result['std_score']*2:.4f})")
Advanced Examples
Custom Parallel Processing
Building a custom parallel processing pipeline:
from dask import compute, delayed
import time
import hashlib

# Simulated data processing tasks
@delayed
def load_data(file_path):
    """Load data"""
    time.sleep(0.1)  # simulate I/O latency
    return f"Data from {file_path}"

@delayed
def process_data(data):
    """Process data"""
    time.sleep(0.2)  # simulate CPU work
    # A simple transformation
    processed = data.upper()
    return processed

@delayed
def validate_data(data):
    """Validate data"""
    time.sleep(0.05)  # simulate validation
    # Simple validation: check the data length
    is_valid = len(data) > 10
    return {'data': data, 'valid': is_valid}

@delayed
def save_data(processed_data):
    """Save data"""
    time.sleep(0.1)  # simulate a save operation
    # Generate a fingerprint of the data
    fingerprint = hashlib.md5(processed_data['data'].encode()).hexdigest()
    return {
        'fingerprint': fingerprint,
        'status': 'saved' if processed_data['valid'] else 'skipped'
    }

# Build the processing pipeline
def build_processing_pipeline(file_paths):
    """Build a parallel processing pipeline"""
    pipelines = []
    for file_path in file_paths:
        # Load the data
        data = load_data(file_path)
        # Process the data
        processed = process_data(data)
        # Validate the data
        validated = validate_data(processed)
        # Save the data
        saved = save_data(validated)
        pipelines.append(saved)
    return pipelines

# Example file paths
file_paths = [f"data_file_{i}.txt" for i in range(10)]

# Build the pipelines
pipelines = build_processing_pipeline(file_paths)

# Run all pipelines in parallel
print("Starting parallel processing...")
start_time = time.time()
results = compute(*pipelines)
end_time = time.time()
print(f"Processing finished in {end_time - start_time:.2f} s")
print("Results:")
for i, result in enumerate(results):
    print(f"  File {i}: {result['status']} (fingerprint: {result['fingerprint'][:8]}...)")
Real-Time Data Processing
Processing a real-time data stream with Dask:
from dask import compute, delayed
import time
import random
from collections import deque

# Simulated real-time data source
class RealTimeDataSource:
    def __init__(self):
        self.data_buffer = deque()
        self.counter = 0

    def generate_data(self):
        """Generate a new record"""
        self.counter += 1
        data = {
            'id': self.counter,
            'value': random.random() * 100,
            'timestamp': time.time(),
            'category': f'Category_{random.randint(1, 5)}'
        }
        self.data_buffer.append(data)
        return data

    def get_batch(self, batch_size=10):
        """Fetch a batch of records"""
        batch = []
        for _ in range(min(batch_size, len(self.data_buffer))):
            if self.data_buffer:
                batch.append(self.data_buffer.popleft())
        return batch

# Data processing functions
@delayed
def process_record(record):
    """Process a single record"""
    # Simulate processing time
    time.sleep(0.01)
    # Processing logic
    processed = {
        'id': record['id'],
        'processed_value': record['value'] * 2,
        'category': record['category'],
        'timestamp': record['timestamp']
    }
    return processed

@delayed
def aggregate_batch(records):
    """Aggregate a batch of records"""
    if not records:
        return {}
    # Compute summary statistics
    values = [r['processed_value'] for r in records]
    categories = [r['category'] for r in records]
    aggregation = {
        'count': len(records),
        'avg_value': sum(values) / len(values),
        'max_value': max(values),
        'min_value': min(values),
        'category_distribution': {cat: categories.count(cat) for cat in set(categories)}
    }
    return aggregation

# Real-time processing system
def real_time_processing_system():
    """Real-time data processing system"""
    data_source = RealTimeDataSource()
    print("Starting the real-time data processing system...")
    for batch_num in range(5):  # process 5 batches
        print(f"\nProcessing batch {batch_num + 1}:")
        # Generate data
        new_records = []
        for _ in range(20):
            record = data_source.generate_data()
            new_records.append(record)
        # Process the records in parallel
        processed_records = [process_record(record) for record in new_records]
        # Compute the results
        processed_results = compute(*processed_records)
        # Aggregate the batch
        batch_aggregation = aggregate_batch(list(processed_results))
        aggregation_result = batch_aggregation.compute()
        # Report the results
        print(f"  Records processed: {aggregation_result.get('count', 0)}")
        print(f"  Average value: {aggregation_result.get('avg_value', 0):.2f}")
        print(f"  Maximum value: {aggregation_result.get('max_value', 0):.2f}")
        print(f"  Minimum value: {aggregation_result.get('min_value', 0):.2f}")
        print(f"  Category distribution: {aggregation_result.get('category_distribution', {})}")
        # Simulate the interval between batches
        time.sleep(1)
    print("\nReal-time processing finished!")

# Run the real-time processing system
# real_time_processing_system()
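For a genuinely streaming workload, the futures interface in dask.distributed is often a better fit than rebuilding a delayed graph per batch, because results can be handled as soon as each task finishes. A minimal sketch, assuming dask.distributed is installed; process_plain is a hypothetical plain (undecorated) version of the per-record processing:
from dask.distributed import Client, as_completed

def process_plain(record):
    # Plain (not delayed) version of the per-record processing
    return {'id': record['id'], 'processed_value': record['value'] * 2}

client = Client()  # start a local cluster

source = RealTimeDataSource()
futures = [client.submit(process_plain, source.generate_data()) for _ in range(20)]

# Handle each result as soon as its task completes
for future in as_completed(futures):
    print(future.result()['processed_value'])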
Performance Testing Example
Benchmarking
Comparing the performance of different approaches:
import time
import numpy as np
import dask.array as da
from dask.diagnostics import ProgressBar

def benchmark_array_operations():
    """Benchmark array operations"""
    print("Array operation benchmark")
    print("=" * 50)
    # Array sizes to test
    sizes = [(1000, 1000), (5000, 5000), (10000, 10000)]
    for size in sizes:
        print(f"\nArray size: {size}")

        # NumPy baseline
        print("  NumPy:")
        np_array = np.random.random(size)
        start_time = time.time()
        np_result = np_array.sum(axis=0).mean()
        np_time = time.time() - start_time
        print(f"    Time: {np_time:.4f} s")

        # Dask version, built from the same data so the results are comparable
        print("  Dask:")
        chunk_size = min(size[0] // 10, 1000)  # adapt the chunk size to the array
        da_array = da.from_array(np_array, chunks=(chunk_size, chunk_size))
        with ProgressBar():
            start_time = time.time()
            da_result = da_array.sum(axis=0).mean().compute()
            da_time = time.time() - start_time
        print(f"    Time: {da_time:.4f} s")

        # Compare performance
        speedup = np_time / da_time if da_time > 0 else float('inf')
        print(f"    Speedup: {speedup:.2f}x")
        print(f"    Results match: {np.allclose(np_result, da_result)}")

# Run the benchmark
# benchmark_array_operations()
Tip: these examples show how Dask can be applied in different scenarios. In practice, adjust the parameters and logic to your specific needs. It is a good idea to test the code on a small dataset first and only move to large-scale processing once you have confirmed it is correct.
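One practical way to do that small-scale testing is to run the graph on Dask's single-threaded synchronous scheduler, which keeps everything in one process so ordinary debuggers and exceptions behave normally. A minimal sketch, reusing the final_greeting object from the Hello World example:
import dask

# Run a computation single-threaded for debugging; switch back to the default afterwards
with dask.config.set(scheduler='synchronous'):
    debug_result = final_greeting.compute()

# Or per call:
# final_greeting.compute(scheduler='synchronous')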