第7章 Dask并行循环
在本章中,我们将学习如何使用Dask并行化循环和函数调用。Dask提供了多种方式来并行化Python代码,包括dask.bag、map函数等。
7.1 Dask Bag简介
Dask Bag是用于处理大型Python序列的并行化工具,适用于处理JSON记录、日志文件等非结构化数据。它提供了一种类似于函数式编程的接口。
Dask Bag的特点
- 函数式接口:提供map、filter、groupby等函数式操作
- 并行处理:自动并行化序列操作
- 内存效率:流式处理,不需要将所有数据加载到内存
- 灵活性:适用于各种非结构化数据
# 示例:创建和使用Dask Bag
import dask.bag as db
# 从序列创建Bag
bag = db.from_sequence(range(1000), npartitions=4)
# 基本操作
result = bag.map(lambda x: x * 2).filter(lambda x: x > 100).take(5)
print(list(result))
7.2 创建Dask Bag
有多种方式可以创建Dask Bag:
从序列创建
import dask.bag as db
# 从Python序列创建
bag = db.from_sequence([1, 2, 3, 4, 5], npartitions=2)
# 从range创建
bag = db.from_sequence(range(1000), npartitions=10)
# 从生成器创建
def number_generator():
for i in range(1000):
yield i
bag = db.from_sequence(number_generator(), npartitions=5)
从文件创建
# 从文本文件创建
bag = db.read_text('data/*.txt')
# 从JSON文件创建
bag = db.read_text('data/*.json').map(json.loads)
# 从CSV文件创建(需要额外处理)
bag = db.read_text('data.csv', blocksize=1024*1024) # 1MB块
从URL创建
# 从URL列表创建
urls = ['http://example.com/file1.txt', 'http://example.com/file2.txt']
bag = db.read_text(urls)
7.3 Bag基本操作
Dask Bag提供丰富的函数式操作:
映射操作
import dask.bag as db
# 创建Bag
bag = db.from_sequence(range(100), npartitions=4)
# 基本映射
squared = bag.map(lambda x: x ** 2)
# 带参数的映射
def power(x, n):
return x ** n
powered = bag.map(power, 3) # 立方
# 映射多个参数
def add(x, y):
return x + y
bag1 = db.from_sequence(range(10))
bag2 = db.from_sequence(range(10, 20))
added = db.map(add, bag1, bag2)
过滤操作
# 过滤操作
bag = db.from_sequence(range(100), npartitions=4)
# 过滤偶数
evens = bag.filter(lambda x: x % 2 == 0)
# 复杂过滤
filtered = bag.filter(lambda x: x > 10 and x < 50)
聚合操作
# 聚合操作
bag = db.from_sequence(range(100), npartitions=4)
# 求和
total = bag.sum().compute()
# 计数
count = bag.count().compute()
# 最大值/最小值
maximum = bag.max().compute()
minimum = bag.min().compute()
# 平均值
mean = bag.mean().compute()
7.4 分组和归约操作
Dask Bag支持强大的分组和归约功能:
import dask.bag as db
# 创建示例数据
data = [('apple', 3), ('banana', 2), ('apple', 5), ('banana', 1), ('cherry', 4)]
bag = db.from_sequence(data, npartitions=2)
# 按键分组
grouped = bag.groupby(lambda x: x[0])
# 归约操作
def sum_values(group):
key, values = group
total = sum(value[1] for value in values)
return (key, total)
result = grouped.map(sum_values).compute()
print(result) # [('apple', 8), ('banana', 3), ('cherry', 4)]
# 使用内置的pluck方法
bag = db.from_sequence([{'name': 'Alice', 'age': 25}, {'name': 'Bob', 'age': 30}])
names = bag.pluck('name').compute()
ages = bag.pluck('age').compute()
7.5 并行化普通循环
使用Dask可以轻松并行化普通的Python循环:
from dask import delayed, compute
import time
# 普通串行循环
def expensive_task(x):
time.sleep(0.1) # 模拟耗时操作
return x ** 2
# 串行执行
start = time.time()
results_serial = [expensive_task(i) for i in range(20)]
print(f"串行执行耗时: {time.time() - start:.2f}秒")
# 并行执行
start = time.time()
tasks = [delayed(expensive_task)(i) for i in range(20)]
results_parallel = compute(*tasks)
print(f"并行执行耗时: {time.time() - start:.2f}秒")
7.6 使用map函数并行化
Dask的map函数提供了另一种并行化方式:
from dask import map as dask_map
def process_item(item):
return item * 2
# 并行映射
items = range(100)
results = list(dask_map(process_item, items))
7.7 处理复杂数据结构
Dask Bag可以处理复杂的嵌套数据结构:
import dask.bag as db
import json
# 处理JSON数据
json_data = [
'{"name": "Alice", "age": 25, "skills": ["Python", "Dask"]}',
'{"name": "Bob", "age": 30, "skills": ["Java", "Spark"]}',
'{"name": "Charlie", "age": 35, "skills": ["Python", "TensorFlow"]}'
]
bag = db.from_sequence(json_data).map(json.loads)
# 提取特定字段
names = bag.pluck('name').compute()
ages = bag.pluck('age').compute()
# 复杂处理
python_users = bag.filter(lambda x: 'Python' in x['skills']).pluck('name').compute()
avg_age = bag.pluck('age').mean().compute()
7.8 错误处理和调试
在并行处理中处理错误和调试:
import dask.bag as db
def risky_function(x):
if x == 5:
raise ValueError("模拟错误")
return x * 2
# 创建Bag
bag = db.from_sequence(range(10), npartitions=3)
# 处理可能的错误
def safe_process(x):
try:
return risky_function(x)
except Exception as e:
return None
result = bag.map(safe_process).filter(lambda x: x is not None).compute()
print(result)
# 调试:查看部分结果
bag = db.from_sequence(range(1000), npartitions=10)
partial_result = bag.map(lambda x: x * 2).take(5)
print(list(partial_result))
7.9 性能优化技巧
优化Dask Bag性能的技巧:
合理设置分区数
import dask.bag as db
# 数据量大时增加分区数
large_bag = db.from_sequence(range(1000000), npartitions=100)
# 数据量小时减少分区数
small_bag = db.from_sequence(range(100), npartitions=2)
使用persist()优化
import dask.bag as db
# 复杂的多步骤处理
bag = db.from_sequence(range(10000), npartitions=50)
processed = bag.map(lambda x: x ** 2).filter(lambda x: x > 1000)
# 如果后续会多次使用,可以先持久化
persisted = processed.persist()
# 后续操作会更快
result1 = persisted.sum().compute()
result2 = persisted.count().compute()
7.10 实际应用示例
以下是一些Dask Bag在实际应用中的示例:
日志分析
import dask.bag as db
import re
from datetime import datetime
# 分析Web服务器日志
def parse_log_line(line):
# 解析日志行
pattern = r'(\S+) - - \[(.*?)\] "(.*?)" (\d+) (\d+)'
match = re.match(pattern, line)
if match:
ip, timestamp, request, status, size = match.groups()
return {
'ip': ip,
'timestamp': timestamp,
'request': request,
'status': int(status),
'size': int(size)
}
return None
# 读取日志文件
logs = db.read_text('access_logs/*.log').map(parse_log_line).filter(lambda x: x is not None)
# 分析错误请求
error_logs = logs.filter(lambda x: x['status'] >= 400)
error_count = error_logs.count().compute()
# 分析热门页面
popular_pages = logs.map(lambda x: x['request']).frequencies().topk(10, lambda x: x[1]).compute()
# 分析流量统计
total_bytes = logs.pluck('size').sum().compute()
print(f"总流量: {total_bytes} bytes")
数据清洗
import dask.bag as db
import json
# 清洗用户数据
def clean_user_data(user_json):
try:
user = json.loads(user_json)
# 数据验证和清洗
if 'email' in user and '@' in user['email']:
user['email'] = user['email'].lower().strip()
user['valid'] = True
else:
user['valid'] = False
return user
except:
return {'valid': False}
# 读取用户数据
users = db.read_text('users/*.json').map(clean_user_data)
# 统计有效用户
valid_users = users.filter(lambda x: x['valid']).count().compute()
invalid_users = users.filter(lambda x: not x['valid']).count().compute()
print(f"有效用户: {valid_users}")
print(f"无效用户: {invalid_users}")
# 保存清洗后的数据
valid_users_data = users.filter(lambda x: x['valid']).map(json.dumps).to_textfiles('cleaned_users/*.json')
提示:Dask Bag适用于处理非结构化或半结构化数据,特别是当数据可以表示为序列时。对于结构化数据(如表格数据),建议使用Dask DataFrame;对于数值计算,建议使用Dask Array。