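Chapter 6 Lazy Computation with Dask
In this chapter we look at Dask's lazy computation mechanism and the dask.delayed decorator. Lazy computation is one of Dask's core features: it lets us run arbitrary Python functions in parallel.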
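6.1 Introduction to Lazy Computation
Lazy evaluation is the mechanism behind dask.delayed. Wrapping a function with dask.delayed means that calling it no longer runs the function. The call instead returns a Delayed object, which records the call as a node in a task graph. The actual work happens only when compute() (or dask.compute()) is called.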
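The following minimal sketch shows this behavior. It uses a hypothetical `call_log` list to record when the function body actually runs, and the log stays empty until compute() is called:

```python
from dask import delayed

call_log = []  # hypothetical helper: records every real execution of the function body

@delayed
def square(x):
    call_log.append(x)
    return x * x

task = square(4)                      # only builds a task; the body has not run
calls_before_compute = len(call_log)  # 0
print(type(task).__name__)            # Delayed

value = task.compute()                # the function body runs now
print(value, call_log)                # 16 [4]
```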
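Advantages of lazy computation
- Optimized execution plan: Dask sees the whole task graph before running it, so it can choose the execution order and skip tasks that the requested result does not need
- Lower memory use: intermediate results are computed only when needed, and they are released once no downstream task depends on them
- Parallel execution: tasks that do not depend on each other can run in parallel
- Some errors surface early: certain mistakes are caught while the graph is being built, such as unpacking a Delayed object whose number of outputs is unknown. Exceptions raised inside a function body, however, only appear at compute time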
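The sketch below separates the two kinds of error described in the last point. The functions `pair` and `fail` are hypothetical examples. Unpacking a Delayed object of unknown length fails as soon as the graph is built. An exception inside the function only shows up when compute() runs:

```python
from dask import delayed

def pair(x):
    return x, x + 1

def fail(x):
    raise ValueError(f"bad input: {x}")

# Caught while building the graph: the number of outputs is unknown,
# so the Delayed object cannot be unpacked
try:
    a, b = delayed(pair)(1)
    build_error = None
except TypeError as e:
    build_error = e
print("build time:", build_error)

# Declaring nout=2 makes the same unpacking valid
a, b = delayed(pair, nout=2)(1)

# An exception inside the function body appears only at compute time
task = delayed(fail)(3)  # no error yet
try:
    task.compute()
    run_error = None
except ValueError as e:
    run_error = e
print("compute time:", run_error)
```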
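# Example: lazy vs. eager computation
import time
import dask
from dask import delayed
# An ordinary function
def expensive_operation(x):
    time.sleep(1)  # simulate a slow operation
    return x * 2
# Eager computation (runs serially, about 5 seconds)
start = time.time()
results = [expensive_operation(i) for i in range(5)]
print(f"Eager computation took: {time.time() - start:.2f}s")
# Lazy computation (runs in parallel on the default threaded scheduler;
# time.sleep releases the GIL, so the five calls overlap)
start = time.time()
delayed_results = [delayed(expensive_operation)(i) for i in range(5)]
computed_results = dask.compute(*delayed_results)
print(f"Lazy computation took: {time.time() - start:.2f}s")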
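6.2 Using the dask.delayed Decorator
The dask.delayed decorator is the most common way to create lazy computations. It turns any Python function into one whose calls are deferred until compute() is called.
Basic usage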
from dask import delayed
# Apply the decorator
@delayed
def add(a, b):
    return a + b
@delayed
def multiply(a, b):
    return a * b
# Build lazy tasks
x = add(1, 2)        # not executed yet
y = add(3, 4)        # not executed yet
z = multiply(x, y)   # not executed yet
# Run the computation
result = z.compute()  # the work happens only now
print(result)         # Output: 21
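Delayed objects also support most Python operators, so simple arithmetic between them builds new lazy tasks without an explicit wrapper function. The following minimal sketch reuses the `add` function from above:

```python
from dask import delayed

@delayed
def add(a, b):
    return a + b

x = add(1, 2)
y = add(3, 4)
z = x * y + 1             # operators on Delayed objects create new lazy tasks
print(type(z).__name__)   # Delayed
print(z.compute())        # 22
```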
Without the decorator
from dask import delayed
def add(a, b):
    return a + b
def multiply(a, b):
    return a * b
# Wrap the functions at call time instead
x = delayed(add)(1, 2)
y = delayed(add)(3, 4)
z = delayed(multiply)(x, y)
# Run the computation
result = z.compute()
print(result)  # Output: 21
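By default, delayed treats each call as impure and gives it a fresh random key. Passing `pure=True` makes the key a hash of the function and its arguments, so identical calls share a key and Dask can reuse their results. The following sketch uses the standard library's `operator.add` as the wrapped function:

```python
from operator import add
from dask import delayed

# Default (impure): each call gets its own random key
k1 = delayed(add)(1, 2).key
k2 = delayed(add)(1, 2).key

# pure=True: the key is derived from the function and its arguments
p1 = delayed(add, pure=True)(1, 2).key
p2 = delayed(add, pure=True)(1, 2).key
p3 = delayed(add, pure=True)(2, 3).key

print(k1 == k2)  # False
print(p1 == p2)  # True
print(p1 == p3)  # False
```

Use `pure=True` only for functions whose output depends solely on their inputs, with no randomness, I/O, or hidden state.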
6.3 Building Complex Task Graphs
Combining several delayed tasks produces a more complex task graph:
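from dask import delayed
import time
@delayed
def load_data(filename):
    time.sleep(1)  # simulate loading time
    return f"Data from {filename}"
@delayed
def process_data(data):
    time.sleep(1)  # simulate processing time
    return f"Processed {data}"
@delayed
def combine_results(*results):
    time.sleep(1)  # simulate merge time
    return f"Combined: {', '.join(results)}"
# Build the task graph
data1 = load_data("file1.txt")
data2 = load_data("file2.txt")
processed1 = process_data(data1)
processed2 = process_data(data2)
final_result = combine_results(processed1, processed2)
# Visualize the task graph (requires the Graphviz system package and the graphviz Python package)
final_result.visualize(filename="computation_graph.png")
# Run the computation: the two load/process branches run in parallel,
# so this takes roughly 3 seconds instead of 5
result = final_result.compute()
print(result)  # Output: Combined: Processed Data from file1.txt, Processed Data from file2.txt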
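If Graphviz is not installed, you can still inspect the graph through the Dask collection protocol. `__dask_graph__()` returns a mapping from task keys to tasks. For delayed calls, each key has the form "function name-token". The sketch below rebuilds the same graph without the sleeps and lists one key per task:

```python
from dask import delayed

@delayed
def load_data(filename):
    return f"Data from {filename}"

@delayed
def process_data(data):
    return f"Processed {data}"

@delayed
def combine_results(*results):
    return f"Combined: {', '.join(results)}"

processed = [process_data(load_data(name)) for name in ["file1.txt", "file2.txt"]]
final_result = combine_results(*processed)

# One entry per task: 2 loads, 2 processing steps, 1 combine
graph = dict(final_result.__dask_graph__())
prefixes = sorted(key.split("-", 1)[0] for key in graph)
print(len(graph))  # 5
print(prefixes)    # ['combine_results', 'load_data', 'load_data', 'process_data', 'process_data']
```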
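6.4 Controlling Task Execution
Arguments to delayed, and to the calls it produces, control how tasks are named and how their results are returned:
Setting task names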
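from dask import delayed
@delayed
def add(a, b):
    return a + b
# dask_key_name sets the key of this particular call in the task graph.
# (delayed(..., name=...) names the wrapped object itself, not the results of calling it.)
result = add(1, 2, dask_key_name="custom_add")
print(result.key)  # Output: custom_add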
Functions with multiple return values
from dask import delayed
@delayed(nout=2)  # declare that the function returns 2 values, so the result can be unpacked
def split_data(data):
    mid = len(data) // 2
    return data[:mid], data[mid:]
@delayed
def process_part1(part):
    return f"Part1: {part}"
@delayed
def process_part2(part):
    return f"Part2: {part}"
# Usage
data = [1, 2, 3, 4, 5, 6]
part1, part2 = split_data(data)
result1 = process_part1(part1)
result2 = process_part2(part2)
final_result = delayed(lambda x, y: f"{x} | {y}")(result1, result2)
result = final_result.compute()
print(result)  # Output: Part1: [1, 2, 3] | Part2: [4, 5, 6]
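Execution can also be controlled when computing. `compute()` and `dask.compute()` accept a `scheduler` argument. For delayed objects the default is `"threads"`. `"sync"` runs every task in the calling thread, which makes tracebacks and debuggers easier to follow. `"processes"` uses a process pool, but it requires picklable functions and, on some platforms, an `if __name__ == "__main__":` guard. The minimal sketch below uses a hypothetical `which_thread` function and compares the first two schedulers:

```python
import threading
import dask
from dask import delayed

@delayed
def which_thread(x):
    # Return the result together with the name of the thread that ran the task
    return x * 10, threading.current_thread().name

tasks = [which_thread(i) for i in range(4)]

threaded = dask.compute(*tasks, scheduler="threads")
sequential = dask.compute(*tasks, scheduler="sync")

# The values are identical; only the execution strategy differs
print([value for value, _ in threaded])    # [0, 10, 20, 30]
print([value for value, _ in sequential])  # [0, 10, 20, 30]
print({name for _, name in sequential})    # a single thread: the caller's
```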
6.5 Computing Several Tasks at Once
Multiple delayed tasks can be computed together:
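import dask
from dask import delayed, compute
@delayed
def expensive_task(x):
    return x ** 2
# Create several tasks
tasks = [expensive_task(i) for i in range(10)]
# Option 1: the compute function (one scheduler run for all tasks; returns a tuple)
results = compute(*tasks)
print(results)  # Output: (0, 1, 4, 9, 16, 25, 36, 49, 64, 81)
# Option 2: compute the tasks one at a time (one scheduler run per task,
# so no parallelism across tasks and no sharing of intermediate results)
results = [task.compute() for task in tasks]
print(results)
# Option 3: dask.compute, the same function as in option 1
results = dask.compute(*tasks)
print(results)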
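Computing everything in one call matters most when tasks share inputs. The sketch below uses a hypothetical `calls` counter to record how often the shared `load()` task runs. In a single `dask.compute` call the shared task runs once. Separate `compute()` calls each run their own copy of the graph:

```python
import dask
from dask import delayed

calls = []  # hypothetical counter: one entry per execution of load()

@delayed
def load():
    calls.append(1)
    return list(range(5))

@delayed
def total(data):
    return sum(data)

@delayed
def largest(data):
    return max(data)

data = load()
s, m = total(data), largest(data)

shared_result = dask.compute(s, m)  # (10, 4)
shared_runs = len(calls)            # 1: load() ran once for both results

calls.clear()
s.compute()
m.compute()
print(shared_result, shared_runs, len(calls))  # (10, 4) 1 2
```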
6.6 Error Handling
If a task raises an exception during computation, compute() re-raises it in the caller:
import dask
from dask import delayed
@delayed
def risky_function(x):
    if x == 5:
        raise ValueError("simulated error")
    return x * 2
# Create the tasks
tasks = [risky_function(i) for i in range(10)]
# Computing everything at once: a single failing task makes the whole call fail
try:
    results = dask.compute(*tasks)
except ValueError as e:
    print(f"Caught error: {e}")
# Alternatively, compute each task separately and handle failures individually
for i, task in enumerate(tasks):
    try:
        result = task.compute()
        print(f"Task {i}: {result}")
    except ValueError as e:
        print(f"Task {i} failed: {e}")
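A middle ground keeps a single parallel `dask.compute` call and still isolates failures: the task catches the exception itself and returns it as a value. The sketch below assumes a hypothetical wrapper `safe_call` that returns `("ok", value)` or `("error", message)`:

```python
import dask
from dask import delayed

def risky_function(x):
    if x == 5:
        raise ValueError("simulated error")
    return x * 2

@delayed
def safe_call(func, x):
    # Hypothetical wrapper: turn an exception into a value so that one failure
    # does not abort the whole dask.compute call
    try:
        return ("ok", func(x))
    except ValueError as e:
        return ("error", str(e))

outcomes = dask.compute(*[safe_call(risky_function, i) for i in range(10)])
succeeded = {i: v for i, (status, v) in enumerate(outcomes) if status == "ok"}
failed = {i: v for i, (status, v) in enumerate(outcomes) if status == "error"}
print(failed)  # {5: 'simulated error'}
```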
6.7 Integration with Pandas and NumPy
Delayed functions can take and return Pandas and NumPy objects directly:
import pandas as pd
import numpy as np
import dask
from dask import delayed
@delayed
def process_dataframe(df):
    # Work on a copy: delayed functions should not mutate their inputs
    df = df.copy()
    df['new_column'] = df['value'] * 2
    return df
@delayed
def process_array(arr):
    # Column sums of the array
    return np.sum(arr, axis=0)
# Create the data
df = pd.DataFrame({'value': range(1000)})
arr = np.random.random((1000, 1000))
# Lazy processing
processed_df = process_dataframe(df)
processed_arr = process_array(arr)
# Compute both results in one call
result_df, result_arr = dask.compute(processed_df, processed_arr)
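A common pattern is to produce one DataFrame per chunk, process each chunk in its own task, and concatenate the results at the end. delayed traverses lists, so the list of Delayed chunks can be passed straight to `pd.concat`. The sketch below uses a hypothetical `make_chunk` loader as a stand-in for reading a file or running a query:

```python
import pandas as pd
from dask import delayed

@delayed
def make_chunk(start, stop):
    # Hypothetical loader: in practice this might read one file or one query result
    return pd.DataFrame({'value': range(start, stop)})

@delayed
def add_column(df):
    df = df.copy()  # do not mutate the input
    df['doubled'] = df['value'] * 2
    return df

chunks = [add_column(make_chunk(i, i + 250)) for i in range(0, 1000, 250)]
combined = delayed(pd.concat)(chunks, ignore_index=True)
result = combined.compute()
print(result.shape)  # (1000, 2)
```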
6.8 Performance Optimization
Some techniques for making lazy computations faster:
Avoid overly fine-grained tasks
import dask
from dask import delayed
@delayed
def process_item(item):
    return item * 2
# Poor practice: 10,000 tiny tasks, each doing almost no work,
# so scheduling overhead dominates
items = range(10000)
tasks = [process_item(item) for item in items]
result = delayed(sum)(tasks).compute()
# Better practice: group the items into batches (100 tasks of 100 items each)
def process_batch(batch):
    return [item * 2 for item in batch]
items = range(10000)
batches = [items[i:i+100] for i in range(0, len(items), 100)]
batch_tasks = [delayed(process_batch)(batch) for batch in batches]
results = dask.compute(*batch_tasks)
flattened_result = [item for batch in results for item in batch]
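When only an aggregate is needed, each batch can be reduced inside its own task. This avoids sending all 10,000 items back to the caller. The sketch below uses a hypothetical `batched` helper and reproduces the total from the fine-grained version with 20 batch tasks plus one final sum:

```python
from dask import delayed

def process_batch(batch):
    return [item * 2 for item in batch]

def batched(seq, size):
    # Hypothetical helper: split a sequence into consecutive slices of at most `size` items
    return [seq[i:i + size] for i in range(0, len(seq), size)]

@delayed
def batch_sum(batch):
    # Reduce inside the task so only one number per batch is returned
    return sum(process_batch(batch))

items = range(10000)
partials = [batch_sum(b) for b in batched(items, 500)]
total = delayed(sum)(partials)
print(len(partials), total.compute())  # 20 99990000
```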
Optimizing with persist()
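import dask
from dask import delayed
@delayed
def expensive_computation(x):
    return x ** 2
# Build a task graph
x = expensive_computation(10)
y = expensive_computation(20)
z = delayed(lambda a, b: a + b)(x, y)
# If z will be reused across several separate compute calls, persist it first:
# persist() runs the graph once and returns a Delayed that holds the result
z_persisted = z.persist()
# Later computations start from the stored result instead of recomputing x, y and z
result1 = delayed(lambda v: v * 2)(z_persisted)
result2 = delayed(lambda v: v * 3)(z_persisted)
final_result = dask.compute(result1, result2)
print(final_result)  # Output: (1000, 1500)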
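The benefit of persist() shows up across separate compute calls. The sketch below uses a hypothetical `runs` counter to record how often `expensive_computation` executes. After persisting, two further computations reuse the stored result:

```python
from dask import delayed

runs = []  # hypothetical counter: one entry per execution of expensive_computation

@delayed
def expensive_computation(x):
    runs.append(x)
    return x ** 2

z = delayed(lambda a, b: a + b)(expensive_computation(10), expensive_computation(20))

z_persisted = z.persist()   # runs both expensive calls once
after_persist = len(runs)   # 2

r1 = delayed(lambda v: v * 2)(z_persisted).compute()
r2 = delayed(lambda v: v * 3)(z_persisted).compute()
print(r1, r2, len(runs))    # 1000 1500 2
```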
6.9 Practical Examples
The following examples show lazy computation in typical applications:
Parallel file processing
import os
import dask
from dask import delayed
@delayed
def process_file(filename):
    # Read the file and count its words
    with open(filename, 'r') as f:
        content = f.read()
    word_count = len(content.split())
    return {filename: word_count}
# List of files
files = ['file1.txt', 'file2.txt', 'file3.txt']
# Process the files that exist, in parallel
tasks = [process_file(f) for f in files if os.path.exists(f)]
results = dask.compute(*tasks)
# Combine the per-file counts
total_word_count = sum(count for result in results for count in result.values())
print(f"Total words: {total_word_count}")
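The example above depends on files that already exist. The self-contained sketch below writes three hypothetical sample files to a temporary directory first. Because nothing runs before compute(), `dask.compute` must be called while the directory still exists, that is, inside the `with` block:

```python
import os
import tempfile
import dask
from dask import delayed

@delayed
def count_words(path):
    with open(path, "r", encoding="utf-8") as f:
        return {os.path.basename(path): len(f.read().split())}

with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical sample files created only for this demonstration
    samples = {"file1.txt": "one two three", "file2.txt": "four five", "file3.txt": "six"}
    for name, text in samples.items():
        with open(os.path.join(tmp, name), "w", encoding="utf-8") as f:
            f.write(text)
    tasks = [count_words(os.path.join(tmp, name)) for name in samples]
    results = dask.compute(*tasks)  # must run before the directory is deleted

per_file = {k: v for r in results for k, v in r.items()}
total = sum(per_file.values())
print(per_file, total)  # {'file1.txt': 3, 'file2.txt': 2, 'file3.txt': 1} 6
```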
Parallel model training and evaluation
import dask
from dask import delayed
from sklearn.datasets import load_iris
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier
# Example data: the Iris dataset bundled with scikit-learn
X, y = load_iris(return_X_y=True)
@delayed
def train_and_evaluate_model(X, y, params):
    model = RandomForestClassifier(**params)
    scores = cross_val_score(model, X, y, cv=5)
    return {
        'params': params,
        'mean_score': scores.mean(),
        'std_score': scores.std()
    }
# Parameter grid
param_grid = [
    {'n_estimators': 10, 'max_depth': 3},
    {'n_estimators': 50, 'max_depth': 5},
    {'n_estimators': 100, 'max_depth': 7}
]
# Train and evaluate the models in parallel
tasks = [train_and_evaluate_model(X, y, params) for params in param_grid]
results = dask.compute(*tasks)
# Find the best parameters
best_result = max(results, key=lambda r: r['mean_score'])
print(f"Best parameters: {best_result['params']}")
print(f"Best score: {best_result['mean_score']:.4f} (+/- {best_result['std_score']*2:.4f})")
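Tip: dask.delayed is one of Dask's most flexible tools and suits many kinds of parallel workloads. Avoid creating large numbers of tiny tasks, though. Every task carries a fixed scheduling overhead, so each task should do substantially more work than that overhead. A well-structured task graph can improve performance significantly.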