Dask Deployment and Scaling

This chapter covers how to deploy Dask clusters and scale applications, including local, cloud, and containerized deployment.

Local Cluster Deployment

Starting a Cluster Manually

To start a Dask cluster by hand in a local environment:

# Start the scheduler
dask scheduler

# Start a worker in a new terminal
dask worker localhost:8786

# Start several workers at once
dask worker localhost:8786 --nworkers 4

Connecting to the cluster from Python:

from dask.distributed import Client

# Connect to the local cluster
client = Client('localhost:8786')

# Inspect the cluster
print(client)
print(f"Dashboard link: {client.dashboard_link}")

# A quick smoke test
import dask.array as da
x = da.random.random((10000, 10000), chunks=(1000, 1000))
result = x.sum().compute()
print(f"Result: {result}")

Creating a Local Cluster

Creating a local cluster directly from Python:

from dask.distributed import Client, LocalCluster

# Create a local cluster
cluster = LocalCluster(
    n_workers=4,           # 4 workers
    threads_per_worker=2,  # 2 threads per worker
    memory_limit='2GB',    # 2 GB memory limit per worker
    processes=True         # use processes rather than threads
)

# Connect to the cluster
client = Client(cluster)

print(f"Cluster address: {cluster.scheduler_address}")
print(f"Dashboard link: {cluster.dashboard_link}")

# Inspect the workers; live memory/CPU readings sit in each worker's
# 'metrics' sub-dict (exact keys vary slightly across distributed versions)
print("Worker details:")
workers = client.scheduler_info()['workers']
for worker_addr, worker_info in workers.items():
    metrics = worker_info.get('metrics', {})
    print(f"  {worker_addr}:")
    print(f"    Memory used: {metrics.get('memory', 0) / 1024**3:.2f} GB")
    print(f"    CPU: {metrics.get('cpu', 0)}%")

Cloud Platform Deployment

AWS Deployment

Deploying a cluster on AWS with Dask Cloud Provider:

from dask.distributed import Client
from dask_cloudprovider.aws import FargateCluster

# Create an AWS Fargate cluster
cluster = FargateCluster(
    region="us-east-1",
    n_workers=4,
    worker_cpu=1024,      # 1 vCPU
    worker_mem=2048,      # 2 GB of memory
    image="daskdev/dask:latest",
    scheduler_timeout="1 hour"
)

# Connect to the cluster
client = Client(cluster)

print(f"Cluster address: {cluster.scheduler_address}")
print(f"Dashboard link: {cluster.dashboard_link}")

# Run the application
def process_data(data_chunk):
    # Data-processing logic goes here
    return data_chunk.sum()

# Submit tasks; data_chunks is a placeholder for your own sequence of
# inputs (e.g. a list of NumPy arrays or DataFrame partitions)
futures = client.map(process_data, data_chunks)
results = client.gather(futures)

# Shut the cluster down when finished
# cluster.close()

Google Cloud Deployment

Deploying a cluster on Google Cloud with Dask Cloud Provider:

from dask.distributed import Client
from dask_cloudprovider.gcp import GCPCluster

# Create a GCP cluster
cluster = GCPCluster(
    projectid="your-project-id",
    zone="us-central1-a",
    n_workers=4,
    machine_type="n1-standard-2",
    filesystem_size=50,  # 50 GB disk
    docker_image="daskdev/dask:latest"
)

# Connect to the cluster
client = Client(cluster)

print(f"Cluster address: {cluster.scheduler_address}")
print(f"Dashboard link: {cluster.dashboard_link}")

Containerized Deployment

Docker Deployment

Deploying a Dask cluster with Docker:

# Dockerfile
FROM daskdev/dask:latest

WORKDIR /app

# Install extra dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt

# Copy the application code
COPY . .

CMD ["dask", "worker", "tcp://scheduler:8786"]
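
To build and try the image, something like the following works; the image tag my-dask-worker and the scheduler host are placeholders:

# Build the image
docker build -t my-dask-worker .

# Run a one-off worker container against an existing scheduler
docker run --rm my-dask-worker dask worker tcp://scheduler-host:8786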

Deploying with Docker Compose:

# docker-compose.yml
version: '3'
services:
  scheduler:
    image: daskdev/dask:latest
    command: ["dask", "scheduler", "--host", "0.0.0.0"]
    ports:
      - "8786:8786"  # scheduler port
      - "8787:8787"  # dashboard port
    environment:
      - PYTHONPATH=/app

  worker:
    image: daskdev/dask:latest
    command: ["dask", "worker", "tcp://scheduler:8786"]
    depends_on:
      - scheduler
    environment:
      - PYTHONPATH=/app
    deploy:
      # 4 workers; honored by Compose v2 (older docker-compose needs
      # `docker-compose up --scale worker=4` instead)
      replicas: 4

  notebook:
    image: daskdev/dask-notebook:latest
    ports:
      - "8888:8888"
    depends_on:
      - scheduler
    environment:
      - DASK_SCHEDULER_ADDRESS=tcp://scheduler:8786

Starting the cluster:

# Start the cluster
docker-compose up -d

# Scale the number of workers on the fly
docker-compose up -d --scale worker=8

# Check service status
docker-compose ps

# Inspect the logs
docker-compose logs scheduler

# Shut the cluster down
docker-compose down

Kubernetes Deployment

Deploying Dask to Kubernetes with the Helm chart:

# Add the Dask Helm repository
helm repo add dask https://helm.dask.org/
helm repo update

# Install a Dask cluster
helm install my-dask-cluster dask/dask \
  --set scheduler.replicas=1 \
  --set worker.replicas=4 \
  --set worker.resources.limits.cpu=2 \
  --set worker.resources.limits.memory=4Gi
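
Once installed, you can verify the pods and reach the dashboard with kubectl. A sketch, assuming naming derived from the release name; the exact service name and ports depend on the chart version, so check `kubectl get svc` first:

# Check that the scheduler and worker pods are running
kubectl get pods

# List services to find the scheduler service and its ports
kubectl get svc

# Forward the dashboard locally (service name and port are assumptions;
# adjust to what `kubectl get svc` reports)
kubectl port-forward svc/my-dask-cluster-scheduler 8787:8787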

Customizing the Helm values:

# values.yaml
scheduler:
  serviceType: LoadBalancer
  resources:
    limits:
      cpu: 1
      memory: 2Gi

worker:
  replicas: 8
  resources:
    limits:
      cpu: 2
      memory: 4Gi
  env:
    - name: EXTRA_PIP_PACKAGES
      value: "dask-ml xgboost"

jupyter:
  enabled: true
  serviceType: LoadBalancer

Deploying with the custom values:

helm install my-dask-cluster dask/dask -f values.yaml
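
Scaling an existing release later is a helm upgrade away, for example to move to 16 workers:

helm upgrade my-dask-cluster dask/dask -f values.yaml --set worker.replicas=16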

Configuration Management

Environment Variable Configuration

Configuring Dask through environment variables:

# Application-level variables read by our own launch code (see below);
# these are not built-in Dask settings
export DASK_SCHEDULER_ADDRESS="tcp://scheduler:8786"
export DASK_NUM_WORKERS=8
export DASK_THREADS_PER_WORKER=2
export DASK_MEMORY_LIMIT="4GB"

# Make the application code importable from Python
export PYTHONPATH="/app:$PYTHONPATH"

Reading the environment variables from Python:

import os
from dask.distributed import Client

# Read configuration from environment variables
scheduler_address = os.environ.get('DASK_SCHEDULER_ADDRESS', 'localhost:8786')
num_workers = int(os.environ.get('DASK_NUM_WORKERS', 4))

# Connect to the cluster
client = Client(scheduler_address)

print(f"Connected to cluster: {scheduler_address}")
print(f"Expected workers: {num_workers}")
print(f"Current workers: {len(client.scheduler_info()['workers'])}")
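
Dask also reads its own configuration tree from environment variables of the form DASK_SECTION__SUBSECTION__KEY, where double underscores mark nesting. A small sketch:

import os

# Must be set before dask is imported (dask reads the environment at import
# time); if dask is already imported, call dask.config.refresh() afterwards
os.environ['DASK_DISTRIBUTED__WORKER__MEMORY__TARGET'] = '0.5'

import dask

# Double underscores become dots, so the variable above maps onto this key
print(dask.config.get('distributed.worker.memory.target'))  # 0.5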

Configuration Files

Using a YAML configuration file:

# dask-config.yaml
distributed:
  scheduler:
    work-stealing: True
    allowed-failures: 3
    bandwidth:
      cluster: 100000000  # 100 MB/s
      local: 5000000000   # 5 GB/s
  worker:
    memory:
      target: 0.6
      spill: 0.7
      pause: 0.8
      terminate: 0.95
    use-file-locking: False
  admin:
    tick:
      interval: 20ms
      limit: 3s
  comm:
    retry:
      count: 3
      delay:
        min: 1s
        max: 20s

logging:
  distributed: info
  distributed.client: warning
  bokeh: error
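
Instead of loading the file by hand, you can drop it into Dask's default configuration directory (~/.config/dask/), where it is picked up automatically on import:

# Install the config file where Dask looks for it by default
mkdir -p ~/.config/dask
cp dask-config.yaml ~/.config/dask/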

Loading the configuration in code:

import dask
import yaml

# Load the configuration file
with open('dask-config.yaml') as f:
    config = yaml.safe_load(f)

# Apply the configuration
dask.config.set(config)

# Alternatively, point Dask at the file via an environment variable
# export DASK_CONFIG="dask-config.yaml"

Monitoring and Management

Dashboard Monitoring

Monitoring cluster state with the Dask dashboard:

from dask.distributed import Client
import webbrowser

# Connect to the cluster
client = Client('localhost:8786')

# Get the dashboard link
dashboard_url = client.dashboard_link
print(f"Dashboard link: {dashboard_url}")

# Optionally open it in a browser
# webbrowser.open(dashboard_url)

# Report cluster status
def get_cluster_status(client):
    info = client.scheduler_info()

    print("Cluster status:")
    print(f"  Scheduler address: {info['address']}")
    print(f"  Number of workers: {len(info['workers'])}")

    # scheduler_info() carries no task list; count tasks on the scheduler itself
    n_tasks = client.run_on_scheduler(
        lambda dask_scheduler: len(dask_scheduler.tasks)
    )
    print(f"  Number of tasks: {n_tasks}")

    # Per-worker details: live readings sit in the 'metrics' sub-dict
    print("\nWorker details:")
    for worker_addr, worker_info in info['workers'].items():
        metrics = worker_info.get('metrics', {})
        print(f"  {worker_addr}:")
        print(f"    Memory used: {metrics.get('memory', 0) / 1024**3:.2f} GB")
        print(f"    CPU: {metrics.get('cpu', 0)}%")
        print(f"    Executing tasks: {metrics.get('executing', 0)}")

# Print the status
get_cluster_status(client)
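
For monitoring a specific computation rather than the cluster as a whole, performance_report saves a standalone HTML copy of the dashboard covering the wrapped block:

from dask.distributed import Client, performance_report
import dask.array as da

client = Client('localhost:8786')

# Everything computed inside the block is captured in report.html
with performance_report(filename="report.html"):
    x = da.random.random((10000, 10000), chunks=(1000, 1000))
    x.sum().compute()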

Custom Monitoring

Implementing custom monitoring logic:

from dask.distributed import Client
import time
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ClusterMonitor:
    def __init__(self, client):
        self.client = client
        self.monitoring = False

    def start_monitoring(self, interval=30):
        """Start monitoring the cluster."""
        self.monitoring = True
        logger.info("Cluster monitoring started")

        while self.monitoring:
            self._check_cluster_health()
            time.sleep(interval)

    def stop_monitoring(self):
        """Stop monitoring."""
        self.monitoring = False
        logger.info("Cluster monitoring stopped")

    def _check_cluster_health(self):
        """Check cluster health."""
        try:
            info = self.client.scheduler_info()
            workers = info['workers']

            # Check worker status
            active_workers = len([w for w in workers.values()
                                  if w.get('status') == 'running'])
            total_workers = len(workers)

            logger.info(f"Worker status: {active_workers}/{total_workers}")

            # Live memory readings sit in each worker's 'metrics' sub-dict
            total_memory = sum(w.get('metrics', {}).get('memory', 0)
                               for w in workers.values())
            avg_memory = total_memory / len(workers) if workers else 0

            logger.info(f"Average memory used: {avg_memory / 1024**3:.2f} GB")

            # Check CPU usage
            avg_cpu = (sum(w.get('metrics', {}).get('cpu', 0)
                           for w in workers.values()) / len(workers)
                       if workers else 0)
            logger.info(f"Average CPU usage: {avg_cpu:.1f}%")

            # Alerting checks
            if active_workers < total_workers * 0.8:
                logger.warning(f"Warning: {total_workers - active_workers} workers offline")

            # Compare each worker against its own configured memory limit
            for addr, w in workers.items():
                limit = w.get('memory_limit') or 0
                used = w.get('metrics', {}).get('memory', 0)
                if limit and used > 0.8 * limit:
                    logger.warning(f"Warning: high memory usage on {addr}")

        except Exception as e:
            logger.error(f"Health check failed: {e}")

# Using the monitor
# client = Client('localhost:8786')
# monitor = ClusterMonitor(client)
# monitor.start_monitoring()
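
The scheduler also exposes Prometheus-format metrics through the dashboard's HTTP server (when the prometheus_client package is installed), which is usually a better fit for production alerting than hand-rolled polling:

# Scrape scheduler metrics from the dashboard's HTTP server
curl http://localhost:8787/metrics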

Scaling Strategies

Horizontal Scaling

Adding workers dynamically:

from dask.distributed import Client

async def scale_cluster(client, target_workers):
    """Adjust the cluster size dynamically."""
    current_workers = len(client.scheduler_info()['workers'])

    if target_workers > current_workers:
        # Add workers
        workers_to_add = target_workers - current_workers
        print(f"Adding {workers_to_add} workers")

        # How to add nodes depends on the deployment; for a cluster object
        # attached to the client (e.g. a LocalCluster):
        # client.cluster.scale(target_workers)

    elif target_workers < current_workers:
        # Remove workers
        workers_to_remove = current_workers - target_workers
        print(f"Removing {workers_to_remove} workers")

        # Retire workers gracefully (their data is moved elsewhere first)
        # workers = list(client.scheduler_info()['workers'].keys())
        # await client.retire_workers(workers[:workers_to_remove])

# Example usage
# client = Client('localhost:8786')
# await scale_cluster(client, 8)  # scale to 8 workers
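
Cluster managers that support scaling (LocalCluster, the dask-cloudprovider clusters, dask-kubernetes, and so on) can also do this automatically through Dask's adaptive mode; a minimal sketch:

from dask.distributed import Client, LocalCluster

# Start small and let the scheduler grow or shrink the cluster with load
cluster = LocalCluster(n_workers=2)
cluster.adapt(minimum=2, maximum=8)

client = Client(cluster)

# Manual scaling uses the same interface
# cluster.scale(4)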

Vertical Scaling

Adjusting per-worker resource settings:

import dask

# Tune worker memory-management thresholds (fractions of the memory limit)
dask.config.set({
    'distributed.worker.memory.target': 0.6,     # start spilling to disk
    'distributed.worker.memory.spill': 0.7,      # spill more aggressively
    'distributed.worker.memory.pause': 0.8,      # pause accepting new tasks
    'distributed.worker.memory.terminate': 0.95  # restart the worker
})

# Note: the per-worker thread count is not a dask.config key; it is set when
# workers start (threads_per_worker for LocalCluster, --nthreads on the CLI).

# Specify resources when creating the cluster
from dask.distributed import LocalCluster

cluster = LocalCluster(
    n_workers=4,
    threads_per_worker=4,    # more threads per worker
    memory_limit='8GB',      # larger per-worker memory limit
    processes=True
)

Security Configuration

TLS/SSL Encryption

Configuring TLS-encrypted communication:

# Generate a self-signed certificate
openssl req -newkey rsa:4096 -nodes -keyout dask.key -x509 -days 365 -out dask.crt

# Start the scheduler with TLS enabled
dask scheduler --tls-ca-file dask.crt --tls-cert dask.crt --tls-key dask.key

# Start a worker with TLS enabled
dask worker tls://scheduler-address:8786 --tls-ca-file dask.crt --tls-cert dask.crt --tls-key dask.key

Connecting over TLS from Python:

from dask.distributed import Client
from distributed.security import Security

# Client expects a distributed Security object rather than a raw ssl.SSLContext
security = Security(
    tls_ca_file='dask.crt',
    tls_client_cert='dask.crt',
    tls_client_key='dask.key',
    require_encryption=True
)

# Connect to the encrypted cluster
client = Client(
    'tls://scheduler-address:8786',
    security=security
)

Authentication and Authorization

Dask's built-in security model is certificate-based mutual TLS rather than username/password authentication; each role (scheduler, worker, client) can present its own certificate:

from dask.distributed import Client
from distributed.security import Security

# Configure per-role certificates
security = Security(
    tls_ca_file='dask.crt',
    tls_worker_cert='worker.crt',
    tls_worker_key='worker.key',
    tls_client_cert='client.crt',
    tls_client_key='client.key'
)

# Connect using the certificates
client = Client(
    'tls://scheduler-address:8786',
    security=security
)

# Or configure the same settings via environment variables
# export DASK_DISTRIBUTED__COMM__TLS__CA_FILE=dask.crt
# export DASK_DISTRIBUTED__COMM__TLS__CLIENT__CERT=client.crt
# export DASK_DISTRIBUTED__COMM__TLS__CLIENT__KEY=client.key

Backup and Recovery

Cluster State Backup

Backing up and restoring a snapshot of cluster state (Dask has no built-in state backup, so this records diagnostics for later inspection):

from dask.distributed import Client
import json
import time

def backup_cluster_state(client, backup_file):
    """Snapshot cluster diagnostics to a JSON file."""
    info = client.scheduler_info()

    # scheduler_info() has no task list; collect task states on the scheduler
    task_states = client.run_on_scheduler(
        lambda dask_scheduler: {
            key: ts.state for key, ts in dask_scheduler.tasks.items()
        }
    )

    backup_data = {
        'timestamp': time.time(),
        'workers': {
            addr: {
                # live readings sit in the 'metrics' sub-dict
                'memory': worker.get('metrics', {}).get('memory', 0),
                'cpu': worker.get('metrics', {}).get('cpu', 0),
                'resources': worker.get('resources', {}),
            }
            for addr, worker in info['workers'].items()
        },
        'tasks': task_states,
    }

    with open(backup_file, 'w') as f:
        json.dump(backup_data, f, indent=2)

    print(f"Cluster state backed up to: {backup_file}")

def restore_cluster_state(client, backup_file):
    """Restore cluster state (conceptual example)."""
    with open(backup_file, 'r') as f:
        backup_data = json.load(f)

    print(f"Restoring cluster state from {backup_file}")
    print(f"Backup timestamp: {backup_data['timestamp']}")
    print(f"Workers: {len(backup_data['workers'])}")
    print(f"Tasks: {len(backup_data['tasks'])}")

# Example usage
# client = Client('localhost:8786')
# backup_cluster_state(client, 'cluster_backup.json')
# restore_cluster_state(client, 'cluster_backup.json')

Failover

High Availability Configuration

Dask runs a single active scheduler, so high availability here is implemented as client-side failover across independently run schedulers:

from dask.distributed import Client
import time

class HighAvailabilityClient:
    def __init__(self, scheduler_addresses):
        self.scheduler_addresses = scheduler_addresses
        self.current_client = None
        self.connect_to_available_scheduler()

    def connect_to_available_scheduler(self):
        """Connect to the first reachable scheduler."""
        for addr in self.scheduler_addresses:
            try:
                client = Client(addr, timeout=10)
                # Probe the connection
                client.run_on_scheduler(lambda: "test")
                self.current_client = client
                print(f"Connected to scheduler: {addr}")
                return
            except Exception as e:
                print(f"Could not connect to {addr}: {e}")
                continue

        raise Exception("Could not connect to any scheduler")

    def execute_with_failover(self, func, *args, **kwargs):
        """Run a task, reconnecting to another scheduler on failure."""
        max_retries = 3
        for attempt in range(max_retries):
            try:
                if not self.current_client:
                    self.connect_to_available_scheduler()

                result = func(*args, **kwargs)
                return result
            except Exception as e:
                print(f"Execution failed (attempt {attempt + 1}/{max_retries}): {e}")
                if attempt < max_retries - 1:
                    print("Trying to reconnect...")
                    self.current_client = None
                    time.sleep(5)  # back off before retrying
                else:
                    raise

# Example usage
# ha_client = HighAvailabilityClient([
#     'tcp://scheduler1:8786',
#     'tcp://scheduler2:8786',
#     'tcp://scheduler3:8786'
# ])
#
# def my_computation():
#     import dask.array as da
#     x = da.random.random((10000, 10000), chunks=(1000, 1000))
#     return x.sum().compute()
#
# result = ha_client.execute_with_failover(my_computation)

Tip: when deploying a Dask cluster, choose the approach that fits your actual needs: local deployment suits development and testing, cloud deployment suits production workloads, and containerized deployment suits microservice architectures. Whichever you choose, plan for security, monitoring, and failure recovery.