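Chapter 6 Lazy Computation with Dask

This chapter covers Dask's lazy evaluation mechanism and the dask.delayed decorator. Lazy evaluation is one of Dask's core features: it lets you build parallel workloads out of ordinary Python functions.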

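6.1 Introduction to Lazy Evaluation

When a function is wrapped with dask.delayed, calling it does not run it. The call returns a Delayed object that records the function and its arguments as a node in a task graph, and the work is done only when compute() is called.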
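
To make the laziness visible, here is a minimal sketch that records every real invocation of a function. The names `record` and `calls` are made up for this illustration.

```python
from dask import delayed

calls = []  # one entry per real execution of `record`

def record(x):
    calls.append(x)
    return x + 1

lazy = delayed(record)(41)   # builds a graph node; `record` has not run yet
calls_before = list(calls)   # snapshot taken before compute()

value = lazy.compute()       # now `record` actually runs
calls_after = list(calls)

print(calls_before, value, calls_after)  # [] 42 [41]
```

The list stays empty until compute() is called, which is the behaviour the paragraph above describes.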

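Advantages of lazy evaluation

  • Whole-graph planning: Dask sees the entire task graph before running anything, so the scheduler can choose a sensible execution order
  • Lower memory use: intermediate results are computed only when needed and can be released once nothing depends on them
  • Parallel execution: tasks that do not depend on each other can run at the same time
  • Some errors surface early: certain mistakes, such as using a Delayed object in an if condition, raise while the graph is being built; exceptions raised inside a task body still appear only at compute() time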

# Example: eager vs. lazy execution
import time
import dask
from dask import delayed

# An ordinary function
def expensive_operation(x):
    time.sleep(1)  # simulate slow work
    return x * 2

# Eager evaluation (runs serially)
start = time.time()
results = [expensive_operation(i) for i in range(5)]
print(f"Eager evaluation took: {time.time() - start:.2f}s")

# Lazy evaluation (independent tasks can run in parallel)
start = time.time()
delayed_results = [delayed(expensive_operation)(i) for i in range(5)]
# `delayed` is a plain function with no compute attribute; use dask.compute
computed_results = dask.compute(*delayed_results)
print(f"Lazy evaluation took: {time.time() - start:.2f}s")
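
How much the lazy version gains depends on the scheduler. By default dask.delayed uses a thread pool, which helps here because time.sleep releases the GIL; CPU-bound pure-Python functions may need scheduler="processes" instead. The sketch below only shows how a scheduler is selected with the scheduler keyword of dask.compute; the function `square` is a made-up example.

```python
import dask
from dask import delayed

def square(x):
    return x * x

tasks = [delayed(square)(i) for i in range(5)]

# Single-threaded scheduler: runs tasks one by one, handy for debugging
serial = dask.compute(*tasks, scheduler="synchronous")

# Thread pool scheduler: the default for dask.delayed
threaded = dask.compute(*tasks, scheduler="threads")

print(serial)    # (0, 1, 4, 9, 16)
print(threaded)  # (0, 1, 4, 9, 16)
```

Both schedulers return the same results; only the way the tasks are executed differs.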
                        

6.2 Using the dask.delayed Decorator

The dask.delayed decorator is the most common way to make a function lazy. It can wrap almost any Python callable.

Basic usage

from dask import delayed

# Using the decorator
@delayed
def add(a, b):
    return a + b

@delayed
def multiply(a, b):
    return a * b

# Create delayed tasks
x = add(1, 2)  # not executed yet
y = add(3, 4)  # not executed yet
z = multiply(x, y)  # not executed yet

# Run the computation
result = z.compute()  # the work happens here
print(result)  # prints: 21
                    

Without the decorator

from dask import delayed

def add(a, b):
    return a + b

def multiply(a, b):
    return a * b

# Wrap each function at the call site
x = delayed(add)(1, 2)
y = delayed(add)(3, 4)
z = delayed(multiply)(x, y)

# Run the computation
result = z.compute()
print(result)  # prints: 21
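
Delayed objects are not limited to function calls. Indexing and arithmetic on them are recorded lazily as well, which the following sketch illustrates with made-up values.

```python
from dask import delayed

numbers = delayed([3, 1, 2])    # wrap a plain value as a Delayed object
first = numbers[0]              # indexing is recorded, not executed
total = delayed(sum)(numbers)   # a Delayed object can be passed as an argument
expr = (total + first) * 2      # operators add more nodes to the graph

answer = expr.compute()
print(answer)  # 18
```

Nothing is evaluated until the final compute() call, at which point the whole expression is run as one graph.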
                    

6.3 Building Complex Task Graphs

Composing delayed tasks produces larger task graphs:

from dask import delayed
import time

@delayed
def load_data(filename):
    time.sleep(1)  # simulate load time
    return f"Data from {filename}"

@delayed
def process_data(data):
    time.sleep(1)  # simulate processing time
    return f"Processed {data}"

@delayed
def combine_results(*results):
    time.sleep(1)  # simulate merge time
    return f"Combined: {', '.join(results)}"

# Build the task graph
data1 = load_data("file1.txt")
data2 = load_data("file2.txt")
processed1 = process_data(data1)
processed2 = process_data(data2)
final_result = combine_results(processed1, processed2)

# Visualize the graph (requires Graphviz and the python-graphviz package)
final_result.visualize(filename="computation_graph.png")

# Run the computation
result = final_result.compute()
print(result)
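
When Graphviz is not available, the graph can still be inspected as a mapping from task keys to task definitions. The sketch below lists the keys of a small graph built with hypothetical functions `load`, `clean`, and `merge`; it should show one key per delayed call, each prefixed with the function name.

```python
from dask import delayed

@delayed
def load(name):
    return f"data:{name}"

@delayed
def clean(data):
    return data.upper()

@delayed
def merge(*parts):
    return " + ".join(parts)

final = merge(clean(load("a")), clean(load("b")))

# __dask_graph__() returns the task graph; iterating it yields the task keys
task_keys = [str(key) for key in final.__dask_graph__()]
for key in sorted(task_keys):
    print(key)

result = final.compute()
print(result)  # DATA:A + DATA:B
```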
                    

6.4 Controlling Task Execution

Keyword arguments control how delayed tasks are created:

Setting the task name

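from dask import delayed

@delayed
def add(a, b):
    return a + b

# name= on the decorator names the wrapped function object, not each call.
# To name an individual task, pass dask_key_name when calling the function.
result = add(1, 2, dask_key_name="custom_add")
print(result.key)  # prints: custom_add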
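
Task keys also determine whether two calls are treated as the same task. By default every call gets a fresh random key; with pure=True the key is derived from the function and its arguments, so identical calls share one key. A small sketch of the difference:

```python
from dask import delayed

def add(a, b):
    return a + b

# Default (pure=False): each call receives a new random key
impure_a = delayed(add)(1, 2)
impure_b = delayed(add)(1, 2)

# pure=True: the key is a hash of the function and its arguments
pure_a = delayed(add, pure=True)(1, 2)
pure_b = delayed(add, pure=True)(1, 2)

same_impure = impure_a.key == impure_b.key
same_pure = pure_a.key == pure_b.key
print(same_impure, same_pure)  # False True
```

Only mark a function as pure when it always returns the same output for the same input and has no side effects.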
                    

Unpacking multiple return values

from dask import delayed

@delayed(nout=2)  # the function returns two values, so the result can be unpacked
def split_data(data):
    mid = len(data) // 2
    return data[:mid], data[mid:]

@delayed
def process_part1(part):
    return f"Part1: {part}"

@delayed
def process_part2(part):
    return f"Part2: {part}"

# Usage
data = [1, 2, 3, 4, 5, 6]
part1, part2 = split_data(data)  # unpacking works because nout=2
result1 = process_part1(part1)
result2 = process_part2(part2)
final_result = delayed(lambda x, y: f"{x} | {y}")(result1, result2)

result = final_result.compute()
print(result)
                    

6.5 Computing Many Tasks at Once

Several delayed tasks can be computed in a single call:

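import dask
from dask import delayed, compute

@delayed
def expensive_task(x):
    return x ** 2

# Create several tasks
tasks = [expensive_task(i) for i in range(10)]

# Option 1: the compute function (all tasks run in one scheduler pass)
results = compute(*tasks)
print(results)  # a tuple

# Option 2: dask.compute is the same function, reached through the module
results = dask.compute(*tasks)
print(results)

# Option 3: compute one task at a time. Each call runs on its own, so the
# tasks do not run in parallel with each other and shared work is repeated.
results = [task.compute() for task in tasks]
print(results)  # a list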
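
The difference between one combined call and several separate calls matters most when tasks share a dependency. The following sketch counts how often a shared step runs in each case; `load_shared`, `increment`, and `double` are hypothetical functions.

```python
import dask
from dask import delayed

load_calls = []  # one entry per real execution of load_shared

@delayed
def load_shared():
    load_calls.append(1)
    return 10

@delayed
def increment(x):
    return x + 1

@delayed
def double(x):
    return x * 2

shared = load_shared()
a, b = increment(shared), double(shared)

# One call: the two graphs are merged, so load_shared runs once
together = dask.compute(a, b)
calls_together = len(load_calls)

# Separate calls: each compute() re-runs the shared dependency
load_calls.clear()
separate = (a.compute(), b.compute())
calls_separate = len(load_calls)

print(together, calls_together)  # (11, 20) 1
print(separate, calls_separate)  # (11, 20) 2
```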
                    

6.6 Error Handling

An exception raised inside a task propagates out of the call that computes it:

import dask
from dask import delayed

@delayed
def risky_function(x):
    if x == 5:
        raise ValueError("simulated error")
    return x * 2

# Create the tasks
tasks = [risky_function(i) for i in range(10)]

# Computing everything at once: one failing task fails the whole call
try:
    results = dask.compute(*tasks)
except ValueError as e:
    print(f"Caught error: {e}")

# Alternatively, handle each task on its own (slower, but isolates failures)
for i, task in enumerate(tasks):
    try:
        result = task.compute()
        print(f"Task {i}: {result}")
    except ValueError as e:
        print(f"Task {i} failed: {e}")
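
Another option, sketched here as one possible pattern rather than a Dask feature, is to catch the exception inside the task and return it as a value. All tasks then run in a single scheduler pass and failures can be sorted out afterwards. The helper `capture_errors` is hypothetical.

```python
import dask
from dask import delayed

def capture_errors(func):
    """Hypothetical helper: return the exception instead of raising it."""
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            return exc
    return wrapper

def risky(x):
    if x == 5:
        raise ValueError("simulated error")
    return x * 2

tasks = [delayed(capture_errors(risky))(i) for i in range(10)]
outcomes = dask.compute(*tasks)  # one pass; no exception escapes

succeeded = [r for r in outcomes if not isinstance(r, Exception)]
failed = [r for r in outcomes if isinstance(r, Exception)]
print(succeeded)  # [0, 2, 4, 6, 8, 12, 14, 16, 18]
print(failed)
```

The trade-off is that downstream tasks receive an exception object as input and must be prepared to check for it.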
                    

6.7 Integrating with Pandas and NumPy

Delayed functions can take and return Pandas and NumPy objects directly:

import pandas as pd
import numpy as np
import dask
from dask import delayed

@delayed
def process_dataframe(df):
    # Return a new DataFrame instead of mutating the input in place
    return df.assign(new_column=df['value'] * 2)

@delayed
def process_array(arr):
    # Column sums of the array
    return np.sum(arr, axis=0)

# Create the data
df = pd.DataFrame({'value': range(1000)})
arr = np.random.random((1000, 1000))

# Set up the delayed processing
processed_df = process_dataframe(df)
processed_arr = process_array(arr)

# Compute both results in one call
result_df, result_arr = dask.compute(processed_df, processed_arr)
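
To get parallelism out of array work, the data is usually split into blocks that are processed independently and then combined. A minimal sketch with NumPy, using hypothetical helpers `block_sum` and `add_partials` and a tiny array so the result is easy to check:

```python
import numpy as np
from dask import delayed

@delayed
def block_sum(block):
    return block.sum(axis=0)          # column sums of one block of rows

@delayed
def add_partials(partials):
    return np.sum(partials, axis=0)   # combine the per-block column sums

arr = np.arange(12, dtype=float).reshape(6, 2)
blocks = np.array_split(arr, 3)       # three blocks of two rows each

partials = [block_sum(b) for b in blocks]
total = add_partials(partials).compute()

print(total)  # [30. 36.]
```

For large arrays and DataFrames, dask.array and dask.dataframe provide this kind of blocking automatically.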
                    

6.8 Performance Tuning

A few techniques for making delayed computations faster:

Avoid tasks that are too small

import dask
from dask import delayed

@delayed
def process_item(item):
    return item * 2

# Poor: 10,000 tiny tasks, so scheduling overhead dominates
items = range(10000)
tasks = [process_item(item) for item in items]
result = delayed(sum)(tasks).compute()

# Better: group items into batches so each task does more work
def process_batch(batch):
    return [item * 2 for item in batch]

items = range(10000)
batches = [items[i:i+100] for i in range(0, len(items), 100)]
batch_tasks = [delayed(process_batch)(batch) for batch in batches]
results = dask.compute(*batch_tasks)
flattened_result = [item for batch in results for item in batch]
                    

Using persist()

import dask
from dask import delayed

@delayed
def expensive_computation(x):
    return x ** 2

# Build a graph with a shared intermediate result
x = expensive_computation(10)
y = expensive_computation(20)
z = delayed(lambda a, b: a + b)(x, y)

# If z is reused several times, compute it once and keep it in memory
z_persisted = z.persist()

# Later computations start from the stored value of z
result1 = delayed(lambda x: x * 2)(z_persisted)
result2 = delayed(lambda x: x * 3)(z_persisted)
final_result = dask.compute(result1, result2)
                    

6.9 Practical Examples

The following examples show dask.delayed in realistic workloads:

Parallel file processing

import dask
from dask import delayed
import os

@delayed
def process_file(filename):
    # Count the words in one file
    with open(filename, 'r') as f:
        content = f.read()
        word_count = len(content.split())
        return {filename: word_count}

# List of files to process
files = ['file1.txt', 'file2.txt', 'file3.txt']

# Process the files that exist, in parallel
tasks = [process_file(f) for f in files if os.path.exists(f)]
results = dask.compute(*tasks)

# Merge the per-file results
total_word_count = sum(count for result in results for count in result.values())
print(f"Total words: {total_word_count}")
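
The listing above prints a total of 0 unless the three files exist. For a version that runs anywhere, the sketch below writes a few sample files to a temporary directory first; the file names and contents are invented for the demonstration.

```python
import tempfile
from pathlib import Path

import dask
from dask import delayed

@delayed
def count_words(path):
    return len(Path(path).read_text(encoding="utf-8").split())

with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical sample files, created only for this demonstration
    samples = {"a.txt": "one two three", "b.txt": "four five", "c.txt": ""}
    paths = []
    for name, text in samples.items():
        path = Path(tmp) / name
        path.write_text(text, encoding="utf-8")
        paths.append(path)

    # Compute while the temporary files still exist
    counts = dask.compute(*[count_words(p) for p in paths])

per_file = dict(zip(samples, counts))
total_words = sum(counts)
print(per_file)     # {'a.txt': 3, 'b.txt': 2, 'c.txt': 0}
print(total_words)  # 5
```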
                        

Machine learning model training

import dask
from dask import delayed
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier

# Synthetic stand-in data (an assumption: the example does not define X and y)
X, y = make_classification(n_samples=500, n_features=20, random_state=0)

@delayed
def train_and_evaluate_model(X, y, params):
    model = RandomForestClassifier(**params)
    scores = cross_val_score(model, X, y, cv=5)
    return {
        'params': params,
        'mean_score': scores.mean(),
        'std_score': scores.std()
    }

# Parameter grid
param_grid = [
    {'n_estimators': 10, 'max_depth': 3},
    {'n_estimators': 50, 'max_depth': 5},
    {'n_estimators': 100, 'max_depth': 7}
]

# Train and evaluate the models in parallel
tasks = [train_and_evaluate_model(X, y, params) for params in param_grid]
results = dask.compute(*tasks)

# Pick the best parameters
best_result = max(results, key=lambda x: x['mean_score'])
print(f"Best parameters: {best_result['params']}")
print(f"Best score: {best_result['mean_score']:.4f} (+/- {best_result['std_score']*2:.4f})")
                        

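Tip: dask.delayed is one of Dask's most flexible features and fits a wide range of parallel workloads. Avoid creating large numbers of tiny tasks, because every task adds scheduling overhead. A well-structured task graph can improve throughput significantly.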