Dask Deployment and Scaling
In this chapter we cover how to deploy Dask clusters and scale applications, including local, cloud, and containerized deployments.
Local Cluster Deployment
Starting a Cluster Manually
Start a Dask cluster by hand on your local machine:
# Start the scheduler
dask scheduler
# Start a worker in a new terminal
dask worker localhost:8786
# Start several worker processes at once
dask worker localhost:8786 --nworkers 4
Connect to the cluster from Python:
from dask.distributed import Client
# Connect to the local cluster
client = Client('localhost:8786')
# Inspect the cluster
print(client)
print(f"Dashboard link: {client.dashboard_link}")
# Quick smoke test
import dask.array as da
x = da.random.random((10000, 10000), chunks=(1000, 1000))
result = x.sum().compute()
print(f"Result: {result}")
Creating a Local Cluster
Create a local cluster directly from Python:
from dask.distributed import Client, LocalCluster
# Create a local cluster
cluster = LocalCluster(
    n_workers=4,             # 4 workers
    threads_per_worker=2,    # 2 threads per worker
    memory_limit='2GB',      # 2 GB memory limit per worker
    processes=True           # use processes rather than threads
)
# Connect to the cluster
client = Client(cluster)
print(f"Cluster address: {cluster.scheduler_address}")
print(f"Dashboard link: {cluster.dashboard_link}")
# Inspect the workers (per-worker CPU and memory figures live under each worker's 'metrics' dict)
print("Worker information:")
workers = client.scheduler_info()['workers']
for worker_addr, worker_info in workers.items():
    print(f"  {worker_addr}:")
    print(f"    memory: {worker_info['metrics']['memory'] / 1024**3:.2f} GB")
    print(f"    CPU: {worker_info['metrics']['cpu']}%")
Cloud Deployment
AWS Deployment
Deploy a cluster on AWS with Dask Cloud Provider:
from dask_cloudprovider.aws import FargateCluster
from dask.distributed import Client
# Create an AWS Fargate cluster
cluster = FargateCluster(
    region_name="us-east-1",
    n_workers=4,
    worker_cpu=1024,          # 1 vCPU (ECS CPU units)
    worker_mem=2048,          # 2 GB of memory (MB)
    image="daskdev/dask:latest",
    scheduler_timeout="1 hour"
)
# Connect to the cluster
client = Client(cluster)
print(f"Cluster address: {cluster.scheduler_address}")
print(f"Dashboard link: {cluster.dashboard_link}")
# Run the application
def process_data(data_chunk):
    # data-processing logic
    return data_chunk.sum()
# Submit tasks (data_chunks is whatever collection of partitions your application provides)
futures = client.map(process_data, data_chunks)
results = client.gather(futures)
# Shut the cluster down when finished
# cluster.close()
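Clusters created with dask-cloudprovider also support adaptive scaling, so the scheduler can request or release workers based on load (a minimal sketch; the bounds are illustrative):
# Let the scheduler scale the cluster between 2 and 10 workers based on load
cluster.adapt(minimum=2, maximum=10)
# Or pin the cluster to a fixed size
cluster.scale(6)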
Google Cloud Deployment
Deploy a cluster on Google Cloud with Dask Cloud Provider:
from dask_cloudprovider.gcp import GCPCluster
from dask.distributed import Client
# Create a GCP cluster
cluster = GCPCluster(
    projectid="your-project-id",
    zone="us-central1-a",
    n_workers=4,
    machine_type="n1-standard-2",
    filesystem_size=50,       # 50 GB disk
    docker_image="daskdev/dask:latest"
)
# Connect to the cluster
client = Client(cluster)
print(f"Cluster address: {cluster.scheduler_address}")
print(f"Dashboard link: {cluster.dashboard_link}")
Containerized Deployment
Docker Deployment
Deploy a Dask cluster with Docker:
# Dockerfile
FROM daskdev/dask:latest
WORKDIR /app
# Install extra dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
# Copy the application code
COPY . .
CMD ["dask-worker", "scheduler:8786"]
Deploy with Docker Compose:
# docker-compose.yml
version: '3'
services:
  scheduler:
    image: daskdev/dask:latest
    command: ["dask-scheduler", "--host", "0.0.0.0"]
    ports:
      - "8786:8786"   # scheduler port
      - "8787:8787"   # dashboard port
    environment:
      - PYTHONPATH=/app
  worker:
    image: daskdev/dask:latest
    command: ["dask-worker", "tcp://scheduler:8786"]
    depends_on:
      - scheduler
    environment:
      - PYTHONPATH=/app
    deploy:
      replicas: 4     # 4 workers
  notebook:
    image: daskdev/dask-notebook:latest
    ports:
      - "8888:8888"
    depends_on:
      - scheduler
    environment:
      - DASK_SCHEDULER_ADDRESS=tcp://scheduler:8786
Start the cluster:
# Start the cluster
docker-compose up -d
# Check service status
docker-compose ps
# View logs
docker-compose logs scheduler
# Shut the cluster down
docker-compose down
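The number of worker containers can also be changed at runtime (newer Docker Compose releases honour deploy.replicas; --scale overrides it):
# Scale the worker service to 8 containers
docker-compose up -d --scale worker=8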
Kubernetes Deployment
Deploy Dask to Kubernetes with the official Helm chart:
# Add the Dask Helm repository
helm repo add dask https://helm.dask.org/
helm repo update
# Install a Dask cluster
helm install my-dask-cluster dask/dask \
  --set scheduler.replicas=1 \
  --set worker.replicas=4 \
  --set worker.resources.limits.cpu=2 \
  --set worker.resources.limits.memory=4Gi
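To verify the release and reach the scheduler from your machine, a sketch using kubectl (the service name depends on the release name; check kubectl get svc for the exact name):
# Check that the scheduler and worker pods are running
kubectl get pods
# Forward the scheduler and dashboard ports locally
kubectl port-forward svc/my-dask-cluster-scheduler 8786:8786 8787:8787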
Custom Helm values:
# values.yaml
scheduler:
  serviceType: LoadBalancer
  resources:
    limits:
      cpu: 1
      memory: 2Gi
worker:
  replicas: 8
  resources:
    limits:
      cpu: 2
      memory: 4Gi
  env:
    - name: EXTRA_PIP_PACKAGES
      value: "dask-ml xgboost"
jupyter:
  enabled: true
  serviceType: LoadBalancer
Deploy with the custom values:
helm install my-dask-cluster dask/dask -f values.yaml
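Configuration changes can later be applied to the running release, and the cluster removed when it is no longer needed:
# Apply updated values to an existing release
helm upgrade my-dask-cluster dask/dask -f values.yaml --set worker.replicas=16
# Tear the cluster down
helm uninstall my-dask-cluster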
Configuration Management
Environment Variable Configuration
Configure Dask through environment variables:
# Set environment variables (these particular names are application-defined; the code below reads them)
export DASK_SCHEDULER_ADDRESS="tcp://scheduler:8786"
export DASK_NUM_WORKERS=8
export DASK_THREADS_PER_WORKER=2
export DASK_MEMORY_LIMIT="4GB"
# Make the application code importable from Python
export PYTHONPATH="/app:$PYTHONPATH"
Read the environment variables from Python:
import os
from dask.distributed import Client
# Read configuration from the environment
scheduler_address = os.environ.get('DASK_SCHEDULER_ADDRESS', 'localhost:8786')
num_workers = int(os.environ.get('DASK_NUM_WORKERS', 4))
# Connect to the cluster
client = Client(scheduler_address)
print(f"Connected to cluster: {scheduler_address}")
print(f"Number of workers: {len(client.scheduler_info()['workers'])}")
Configuration Files
Use a YAML configuration file:
# dask-config.yaml
distributed:
  scheduler:
    work-stealing: True
    allowed-failures: 3
    bandwidth:
      cluster: 100000000    # 100 MB/s
      local: 5000000000     # 5 GB/s
  worker:
    memory:
      target: 0.6
      spill: 0.7
      pause: 0.8
      terminate: 0.95
    use-file-locking: False
  admin:
    tick:
      interval: 20ms
      limit: 3s
  comm:
    retry:
      count: 3
      delay:
        min: 1s
        max: 20s

logging:
  distributed: info
  distributed.client: warning
  bokeh: error
Load the configuration in code:
import dask
import yaml
# Load the configuration file
with open('dask-config.yaml') as f:
    config = yaml.safe_load(f)
# Apply the configuration
dask.config.set(config)
# Alternatively, point Dask at the configuration file via an environment variable
# export DASK_CONFIG="dask-config.yaml"
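Individual values can be inspected with dask.config.get, and settings can be applied temporarily as a context manager (a small sketch; importing distributed registers its default configuration):
import dask
import distributed  # registers the distributed.* defaults
# Read a single configuration value
print(dask.config.get('distributed.worker.memory.target'))
# Apply a setting only inside the block
with dask.config.set({'distributed.scheduler.work-stealing': False}):
    pass  # work launched here sees the temporary setting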
Monitoring and Management
Dashboard Monitoring
Monitor the cluster with the Dask dashboard:
from dask.distributed import Client
import webbrowser
# Connect to the cluster
client = Client('localhost:8786')
# Get the dashboard link
dashboard_url = client.dashboard_link
print(f"Dashboard link: {dashboard_url}")
# Open it in a browser automatically (optional)
# webbrowser.open(dashboard_url)
# Report cluster status
def get_cluster_status(client):
    info = client.scheduler_info()
    # Task states are not included in scheduler_info(); ask the scheduler directly
    n_tasks = client.run_on_scheduler(lambda dask_scheduler: len(dask_scheduler.tasks))
    print("Cluster status:")
    print(f"  scheduler address: {info['address']}")
    print(f"  number of workers: {len(info['workers'])}")
    print(f"  number of tasks: {n_tasks}")
    # Per-worker details (CPU and memory figures live under each worker's 'metrics' dict)
    print("\nWorker information:")
    for worker_addr, worker_info in info['workers'].items():
        metrics = worker_info['metrics']
        print(f"  {worker_addr}:")
        print(f"    memory used: {metrics['memory'] / 1024**3:.2f} GB")
        print(f"    CPU usage: {metrics['cpu']}%")
        print(f"    threads: {worker_info['nthreads']}")
# Print the status
get_cluster_status(client)
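Beyond watching the live dashboard, you can capture a static profile of a particular computation with performance_report, which writes a self-contained HTML page you can archive or share:
from dask.distributed import performance_report
import dask.array as da
# Everything computed inside the block is recorded into the report
with performance_report(filename="dask-report.html"):
    x = da.random.random((10000, 10000), chunks=(1000, 1000))
    x.sum().compute()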
Custom Monitoring
Implement custom monitoring logic:
from dask.distributed import Client
import time
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ClusterMonitor:
    def __init__(self, client):
        self.client = client
        self.monitoring = False

    def start_monitoring(self, interval=30):
        """Monitor the cluster until stop_monitoring() is called."""
        self.monitoring = True
        logger.info("Cluster monitoring started")
        while self.monitoring:
            self._check_cluster_health()
            time.sleep(interval)

    def stop_monitoring(self):
        """Stop monitoring."""
        self.monitoring = False
        logger.info("Cluster monitoring stopped")

    def _check_cluster_health(self):
        """Check cluster health."""
        try:
            info = self.client.scheduler_info()
            workers = info['workers']
            # Worker liveness
            active_workers = len([w for w in workers.values() if w['status'] == 'running'])
            total_workers = len(workers)
            logger.info(f"Workers running: {active_workers}/{total_workers}")
            # Memory usage (per-worker figures live under 'metrics')
            total_memory = sum(w['metrics']['memory'] for w in workers.values())
            avg_memory = total_memory / len(workers) if workers else 0
            logger.info(f"Average memory use: {avg_memory / 1024**3:.2f} GB")
            # CPU usage
            avg_cpu = sum(w['metrics']['cpu'] for w in workers.values()) / len(workers) if workers else 0
            logger.info(f"Average CPU use: {avg_cpu:.1f}%")
            # Alerts
            if active_workers < total_workers * 0.8:
                logger.warning(f"Warning: {total_workers - active_workers} workers are offline")
            if avg_memory > 0.8 * 4 * 1024**3:  # assumes 4 GB of memory per worker
                logger.warning("Warning: memory usage is high")
        except Exception as e:
            logger.error(f"Health check failed: {e}")

# Using the monitor
# client = Client('localhost:8786')
# monitor = ClusterMonitor(client)
# monitor.start_monitoring()
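Because start_monitoring blocks the calling thread, it is usually run in a daemon thread so the rest of the application keeps working (sketch):
import threading
# client = Client('localhost:8786')
# monitor = ClusterMonitor(client)
# thread = threading.Thread(target=monitor.start_monitoring, kwargs={'interval': 60}, daemon=True)
# thread.start()
# ... later ...
# monitor.stop_monitoring()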
Scaling Strategies
Horizontal Scaling
Add or remove workers dynamically:
from dask.distributed import Client, LocalCluster

def scale_cluster(client, target_workers):
    """Adjust the cluster to the target number of workers."""
    current_workers = len(client.scheduler_info()['workers'])
    if target_workers > current_workers:
        # Add workers. How depends on the deployment; cluster objects such as
        # LocalCluster and the Helm/cloud-provider clusters expose .scale().
        # Note: client.cluster is only set when the Client was created from a cluster object.
        print(f"Adding {target_workers - current_workers} workers")
        client.cluster.scale(target_workers)
    elif target_workers < current_workers:
        # Remove workers gracefully: their data is migrated before they shut down
        print(f"Removing {current_workers - target_workers} workers")
        workers = list(client.scheduler_info()['workers'].keys())
        client.retire_workers(workers[:current_workers - target_workers])

# Example
# cluster = LocalCluster(n_workers=4)
# client = Client(cluster)
# scale_cluster(client, 8)   # grow to 8 workers
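After requesting more workers it can take a moment for them to join; Client.wait_for_workers blocks until the cluster actually reaches the requested size:
# Block until at least 8 workers have registered (raises if the timeout expires)
client.wait_for_workers(n_workers=8, timeout=120)
print(len(client.scheduler_info()['workers']), "workers available")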
Vertical Scaling
Adjust the resources available to each worker:
import dask
# Adjust memory thresholds (fractions of each worker's memory limit)
dask.config.set({
    'distributed.worker.memory.target': 0.6,
    'distributed.worker.memory.spill': 0.7,
    'distributed.worker.memory.pause': 0.8,
    'distributed.worker.memory.terminate': 0.95
})
# The thread count is set when the worker starts: threads_per_worker for
# LocalCluster, or --nthreads on the dask worker command line
# Specify resources when creating the cluster
from dask.distributed import LocalCluster
cluster = LocalCluster(
    n_workers=4,
    threads_per_worker=4,   # more threads per worker
    memory_limit='8GB',     # larger memory limit per worker
    processes=True
)
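The equivalent settings when starting workers by hand from the command line (assuming a scheduler on localhost):
# Two worker processes, each with 4 threads and an 8 GB memory limit
dask worker localhost:8786 --nworkers 2 --nthreads 4 --memory-limit 8GB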
Security Configuration
TLS/SSL Encryption
Configure TLS-encrypted communication:
# Generate a self-signed certificate
openssl req -newkey rsa:4096 -nodes -keyout dask.key -x509 -days 365 -out dask.crt
# Start the scheduler with TLS
dask-scheduler --tls-ca-file dask.crt --tls-cert dask.crt --tls-key dask.key
# Start workers with TLS
dask-worker tls://scheduler-address:8786 --tls-ca-file dask.crt --tls-cert dask.crt --tls-key dask.key
Connect over TLS from Python:
from dask.distributed import Client
from distributed.security import Security
# Describe the TLS material with a Security object
# (Client expects a Security object rather than a raw ssl.SSLContext)
security = Security(
    tls_ca_file='dask.crt',
    tls_client_cert='dask.crt',
    tls_client_key='dask.key',
    require_encryption=True
)
# Connect to the TLS-enabled cluster
client = Client(
    'tls://scheduler-address:8786',
    security=security
)
Authentication and Authorization
Configure certificate-based authentication:
from dask.distributed import Client
from distributed.security import Security
# Use separate certificates for workers and clients
security = Security(
    tls_ca_file='dask.crt',
    tls_worker_cert='worker.crt',
    tls_worker_key='worker.key',
    tls_client_cert='client.crt',
    tls_client_key='client.key'
)
# Connect using the security configuration
client = Client(
    'tls://scheduler-address:8786',
    security=security
)
# The same values can be supplied through Dask's configuration environment variables, e.g.
# export DASK_DISTRIBUTED__COMM__TLS__CA_FILE=dask.crt
# export DASK_DISTRIBUTED__COMM__TLS__CLIENT__CERT=client.crt
# export DASK_DISTRIBUTED__COMM__TLS__CLIENT__KEY=client.key
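The certificates can also live in a Dask configuration file so that every component picks them up automatically (a sketch; the keys mirror the distributed.comm.tls section of the configuration):
# ~/.config/dask/tls.yaml
distributed:
  comm:
    require-encryption: true
    tls:
      ca-file: dask.crt
      client:
        cert: client.crt
        key: client.key
      worker:
        cert: worker.crt
        key: worker.key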
Backup and Recovery
Cluster State Backup
Back up and restore cluster state:
from dask.distributed import Client
import json
import time

def backup_cluster_state(client, backup_file):
    """Snapshot the current cluster state to a JSON file."""
    info = client.scheduler_info()
    # Task states are not part of scheduler_info(); fetch them from the scheduler itself
    task_states = client.run_on_scheduler(
        lambda dask_scheduler: {key: ts.state for key, ts in dask_scheduler.tasks.items()}
    )
    backup_data = {
        'timestamp': time.time(),
        'workers': {
            addr: {
                'memory': worker['metrics']['memory'],
                'cpu': worker['metrics']['cpu'],
                'nthreads': worker['nthreads'],
                'resources': worker.get('resources', {})
            }
            for addr, worker in info['workers'].items()
        },
        'tasks': task_states
    }
    with open(backup_file, 'w') as f:
        json.dump(backup_data, f, indent=2)
    print(f"Cluster state written to: {backup_file}")

def restore_cluster_state(client, backup_file):
    """Restore cluster state (conceptual example: it only reports what was saved)."""
    with open(backup_file, 'r') as f:
        backup_data = json.load(f)
    print(f"Restoring cluster state from {backup_file}")
    print(f"Backup time: {backup_data['timestamp']}")
    print(f"Workers: {len(backup_data['workers'])}")
    print(f"Tasks: {len(backup_data['tasks'])}")

# Example
# client = Client('localhost:8786')
# backup_cluster_state(client, 'cluster_backup.json')
# restore_cluster_state(client, 'cluster_backup.json')
Failover
High Availability Configuration
Configure a highly available setup:
from dask.distributed import Client
import time

class HighAvailabilityClient:
    def __init__(self, scheduler_addresses):
        self.scheduler_addresses = scheduler_addresses
        self.current_client = None
        self.connect_to_available_scheduler()

    def connect_to_available_scheduler(self):
        """Connect to the first reachable scheduler."""
        for addr in self.scheduler_addresses:
            try:
                client = Client(addr, timeout=10)
                # Verify the connection with a trivial call
                client.run_on_scheduler(lambda: "test")
                self.current_client = client
                print(f"Connected to scheduler: {addr}")
                return
            except Exception as e:
                print(f"Could not connect to {addr}: {e}")
                continue
        raise Exception("Could not connect to any scheduler")

    def execute_with_failover(self, func, *args, **kwargs):
        """Run a task, reconnecting to another scheduler on failure."""
        max_retries = 3
        for attempt in range(max_retries):
            try:
                if not self.current_client:
                    self.connect_to_available_scheduler()
                result = func(*args, **kwargs)
                return result
            except Exception as e:
                print(f"Execution failed (attempt {attempt + 1}/{max_retries}): {e}")
                if attempt < max_retries - 1:
                    print("Trying to reconnect...")
                    self.current_client = None
                    time.sleep(5)  # wait before retrying
                else:
                    raise

# Example
# ha_client = HighAvailabilityClient([
#     'tcp://scheduler1:8786',
#     'tcp://scheduler2:8786',
#     'tcp://scheduler3:8786'
# ])
#
# def my_computation():
#     import dask.array as da
#     x = da.random.random((10000, 10000), chunks=(1000, 1000))
#     return x.sum().compute()
#
# result = ha_client.execute_with_failover(my_computation)
Tip: choose the deployment method that fits your situation. Local deployment suits development and testing, cloud deployment suits production workloads, and containerized deployment suits microservice architectures. Whichever you choose, plan for security, monitoring, and failure recovery from the start.