第7章 Dask并行循环

在本章中,我们将学习如何使用Dask并行化循环和函数调用。Dask提供了多种方式来并行化Python代码,包括dask.bag、map函数等。

7.1 Dask Bag简介

Dask Bag是用于处理大型Python序列的并行化工具,适用于处理JSON记录、日志文件等非结构化数据。它提供了一种类似于函数式编程的接口。

Dask Bag的特点

  • 函数式接口:提供map、filter、groupby等函数式操作
  • 并行处理:自动并行化序列操作
  • 内存效率:流式处理,不需要将所有数据加载到内存
  • 灵活性:适用于各种非结构化数据
# 示例:创建和使用Dask Bag
import dask.bag as db

# 从序列创建Bag
bag = db.from_sequence(range(1000), npartitions=4)

# 基本操作
result = bag.map(lambda x: x * 2).filter(lambda x: x > 100).take(5)
print(list(result))
                        

7.2 创建Dask Bag

有多种方式可以创建Dask Bag:

从序列创建

import dask.bag as db

# 从Python序列创建
bag = db.from_sequence([1, 2, 3, 4, 5], npartitions=2)

# 从range创建
bag = db.from_sequence(range(1000), npartitions=10)

# 从生成器创建
def number_generator():
    for i in range(1000):
        yield i

bag = db.from_sequence(number_generator(), npartitions=5)
                    

从文件创建

# 从文本文件创建
bag = db.read_text('data/*.txt')

# 从JSON文件创建
bag = db.read_text('data/*.json').map(json.loads)

# 从CSV文件创建(需要额外处理)
bag = db.read_text('data.csv', blocksize=1024*1024)  # 1MB块
                    

从URL创建

# 从URL列表创建
urls = ['http://example.com/file1.txt', 'http://example.com/file2.txt']
bag = db.read_text(urls)
                    

7.3 Bag基本操作

Dask Bag提供丰富的函数式操作:

映射操作

import dask.bag as db

# 创建Bag
bag = db.from_sequence(range(100), npartitions=4)

# 基本映射
squared = bag.map(lambda x: x ** 2)

# 带参数的映射
def power(x, n):
    return x ** n

powered = bag.map(power, 3)  # 立方

# 映射多个参数
def add(x, y):
    return x + y

bag1 = db.from_sequence(range(10))
bag2 = db.from_sequence(range(10, 20))
added = db.map(add, bag1, bag2)
                    

过滤操作

# 过滤操作
bag = db.from_sequence(range(100), npartitions=4)

# 过滤偶数
evens = bag.filter(lambda x: x % 2 == 0)

# 复杂过滤
filtered = bag.filter(lambda x: x > 10 and x < 50)
                    

聚合操作

# 聚合操作
bag = db.from_sequence(range(100), npartitions=4)

# 求和
total = bag.sum().compute()

# 计数
count = bag.count().compute()

# 最大值/最小值
maximum = bag.max().compute()
minimum = bag.min().compute()

# 平均值
mean = bag.mean().compute()
                    

7.4 分组和归约操作

Dask Bag支持强大的分组和归约功能:

import dask.bag as db

# 创建示例数据
data = [('apple', 3), ('banana', 2), ('apple', 5), ('banana', 1), ('cherry', 4)]
bag = db.from_sequence(data, npartitions=2)

# 按键分组
grouped = bag.groupby(lambda x: x[0])

# 归约操作
def sum_values(group):
    key, values = group
    total = sum(value[1] for value in values)
    return (key, total)

result = grouped.map(sum_values).compute()
print(result)  # [('apple', 8), ('banana', 3), ('cherry', 4)]

# 使用内置的pluck方法
bag = db.from_sequence([{'name': 'Alice', 'age': 25}, {'name': 'Bob', 'age': 30}])
names = bag.pluck('name').compute()
ages = bag.pluck('age').compute()
                    

7.5 并行化普通循环

使用Dask可以轻松并行化普通的Python循环:

from dask import delayed, compute
import time

# 普通串行循环
def expensive_task(x):
    time.sleep(0.1)  # 模拟耗时操作
    return x ** 2

# 串行执行
start = time.time()
results_serial = [expensive_task(i) for i in range(20)]
print(f"串行执行耗时: {time.time() - start:.2f}秒")

# 并行执行
start = time.time()
tasks = [delayed(expensive_task)(i) for i in range(20)]
results_parallel = compute(*tasks)
print(f"并行执行耗时: {time.time() - start:.2f}秒")
                    

7.6 使用map函数并行化

Dask的map函数提供了另一种并行化方式:

from dask import map as dask_map

def process_item(item):
    return item * 2

# 并行映射
items = range(100)
results = list(dask_map(process_item, items))
                    

7.7 处理复杂数据结构

Dask Bag可以处理复杂的嵌套数据结构:

import dask.bag as db
import json

# 处理JSON数据
json_data = [
    '{"name": "Alice", "age": 25, "skills": ["Python", "Dask"]}',
    '{"name": "Bob", "age": 30, "skills": ["Java", "Spark"]}',
    '{"name": "Charlie", "age": 35, "skills": ["Python", "TensorFlow"]}'
]

bag = db.from_sequence(json_data).map(json.loads)

# 提取特定字段
names = bag.pluck('name').compute()
ages = bag.pluck('age').compute()

# 复杂处理
python_users = bag.filter(lambda x: 'Python' in x['skills']).pluck('name').compute()
avg_age = bag.pluck('age').mean().compute()
                    

7.8 错误处理和调试

在并行处理中处理错误和调试:

import dask.bag as db

def risky_function(x):
    if x == 5:
        raise ValueError("模拟错误")
    return x * 2

# 创建Bag
bag = db.from_sequence(range(10), npartitions=3)

# 处理可能的错误
def safe_process(x):
    try:
        return risky_function(x)
    except Exception as e:
        return None

result = bag.map(safe_process).filter(lambda x: x is not None).compute()
print(result)

# 调试:查看部分结果
bag = db.from_sequence(range(1000), npartitions=10)
partial_result = bag.map(lambda x: x * 2).take(5)
print(list(partial_result))
                    

7.9 性能优化技巧

优化Dask Bag性能的技巧:

合理设置分区数

import dask.bag as db

# 数据量大时增加分区数
large_bag = db.from_sequence(range(1000000), npartitions=100)

# 数据量小时减少分区数
small_bag = db.from_sequence(range(100), npartitions=2)
                    

使用persist()优化

import dask.bag as db

# 复杂的多步骤处理
bag = db.from_sequence(range(10000), npartitions=50)
processed = bag.map(lambda x: x ** 2).filter(lambda x: x > 1000)

# 如果后续会多次使用,可以先持久化
persisted = processed.persist()

# 后续操作会更快
result1 = persisted.sum().compute()
result2 = persisted.count().compute()
                    

7.10 实际应用示例

以下是一些Dask Bag在实际应用中的示例:

日志分析

import dask.bag as db
import re
from datetime import datetime

# 分析Web服务器日志
def parse_log_line(line):
    # 解析日志行
    pattern = r'(\S+) - - \[(.*?)\] "(.*?)" (\d+) (\d+)'
    match = re.match(pattern, line)
    if match:
        ip, timestamp, request, status, size = match.groups()
        return {
            'ip': ip,
            'timestamp': timestamp,
            'request': request,
            'status': int(status),
            'size': int(size)
        }
    return None

# 读取日志文件
logs = db.read_text('access_logs/*.log').map(parse_log_line).filter(lambda x: x is not None)

# 分析错误请求
error_logs = logs.filter(lambda x: x['status'] >= 400)
error_count = error_logs.count().compute()

# 分析热门页面
popular_pages = logs.map(lambda x: x['request']).frequencies().topk(10, lambda x: x[1]).compute()

# 分析流量统计
total_bytes = logs.pluck('size').sum().compute()
print(f"总流量: {total_bytes} bytes")
                        

数据清洗

import dask.bag as db
import json

# 清洗用户数据
def clean_user_data(user_json):
    try:
        user = json.loads(user_json)
        # 数据验证和清洗
        if 'email' in user and '@' in user['email']:
            user['email'] = user['email'].lower().strip()
            user['valid'] = True
        else:
            user['valid'] = False
        return user
    except:
        return {'valid': False}

# 读取用户数据
users = db.read_text('users/*.json').map(clean_user_data)

# 统计有效用户
valid_users = users.filter(lambda x: x['valid']).count().compute()
invalid_users = users.filter(lambda x: not x['valid']).count().compute()

print(f"有效用户: {valid_users}")
print(f"无效用户: {invalid_users}")

# 保存清洗后的数据
valid_users_data = users.filter(lambda x: x['valid']).map(json.dumps).to_textfiles('cleaned_users/*.json')
                    

提示:Dask Bag适用于处理非结构化或半结构化数据,特别是当数据可以表示为序列时。对于结构化数据(如表格数据),建议使用Dask DataFrame;对于数值计算,建议使用Dask Array。