Machine Learning Applications with Dask

In this chapter we cover how to use Dask for large-scale machine learning and data science. Dask integrates closely with scikit-learn and brings machine-learning capabilities to datasets that are too large for a single machine's memory.

Introduction to Dask-ML

Dask-ML is Dask's machine-learning extension. It provides scalable machine-learning algorithms that are compatible with the scikit-learn API.

Features of Dask-ML

  • Compatibility: follows the scikit-learn API, so the learning curve is low (see the sketch after this list)
  • Scalability: handles datasets and feature counts that exceed single-machine memory
  • Parallelism: parallelizes training and preprocessing automatically
  • Distributed: runs on distributed clusters as well as on a single machine
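To make the compatibility point concrete, here is a minimal sketch (assuming only that dask-ml is installed): a Dask-ML estimator is constructed and used exactly like its scikit-learn counterpart, except that it accepts Dask collections and returns lazy results:

import dask.array as da
from dask_ml.preprocessing import StandardScaler

# A Dask array stands in for a NumPy array; the estimator API is unchanged
X = da.random.random((10000, 5), chunks=(1000, 5))

scaler = StandardScaler()           # same constructor as scikit-learn
X_scaled = scaler.fit_transform(X)  # the result is a lazy Dask array

print(X_scaled.mean(axis=0).compute())  # approximately 0 per column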

Installing Dask-ML

# Install Dask-ML with pip
pip install dask-ml

# Or with conda
conda install -c conda-forge dask-ml
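A quick sanity check after installing (this assumes nothing beyond the two packages themselves):

import dask
import dask_ml

print(dask.__version__)
print(dask_ml.__version__)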
                        

Data Preprocessing

Standardizing Large Datasets

Scaling a large dataset with Dask-ML's StandardScaler:

import dask.array as da
from dask_ml.preprocessing import StandardScaler
import numpy as np

# Create a large random dataset
X = da.random.random((100000, 100), chunks=(10000, 100))

# Fit and apply the Dask-ML scaler; the transformed array stays lazy
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Compute and inspect the statistics
print("Raw data statistics:")
print(f"  mean: {X.mean(axis=0).compute()[:5]}")
print(f"  std: {X.std(axis=0).compute()[:5]}")

print("\nStatistics after standardization:")
print(f"  mean: {X_scaled.mean(axis=0).compute()[:5]}")
print(f"  std: {X_scaled.std(axis=0).compute()[:5]}")
                        

Feature Encoding

Handling large-scale categorical features:

import dask.dataframe as dd
from dask_ml.preprocessing import LabelEncoder, OneHotEncoder
import numpy as np
import pandas as pd

# Build an example DataFrame
data = {
    'category': [f'Category_{i%10}' for i in range(100000)],
    'subcategory': [f'Sub_{i%100}' for i in range(100000)],
    'value': np.random.random(100000)
}
df = pd.DataFrame(data)
ddf = dd.from_pandas(df, npartitions=10)

# Dask-ML's encoders need dask columns with a known categorical dtype,
# so convert the string column first
ddf = ddf.categorize(columns=['category'])

# Label encoding
label_encoder = LabelEncoder()
ddf['category_encoded'] = label_encoder.fit_transform(ddf['category'])

# One-hot encoding (sparse=False requests a dense result)
onehot_encoder = OneHotEncoder(sparse=False)
category_onehot = onehot_encoder.fit_transform(ddf[['category']])

print("Label encoding result:")
print(ddf[['category', 'category_encoded']].head())

print("\nOne-hot encoded shape:")
print(category_onehot.shape)
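The categorize() step matters for one-hot encoding: Dask builds its task graph before looking at the data, so the encoder must know the full set of categories, and hence the number of output columns, ahead of time. Converting to a known categorical dtype supplies exactly that information.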
                        

Model Training

Logistic Regression

Training a logistic regression model with Dask-ML:

from dask_ml.linear_model import LogisticRegression
from dask_ml.model_selection import train_test_split
from sklearn.datasets import make_classification
import dask.array as da

# Create a large classification dataset (built in memory, then chunked)
X, y = make_classification(n_samples=100000, n_features=20, n_classes=2, random_state=42)
X_da = da.from_array(X, chunks=(10000, 20))
y_da = da.from_array(y, chunks=(10000,))

# Split the data
X_train, X_test, y_train, y_test = train_test_split(X_da, y_da, test_size=0.2, random_state=42)

# Train the logistic regression model
model = LogisticRegression()
model.fit(X_train, y_train)

# Predict
y_pred = model.predict(X_test)
y_proba = model.predict_proba(X_test)

# Evaluate; dask_ml metrics compute eagerly by default and return a plain float
from dask_ml.metrics import accuracy_score
accuracy = accuracy_score(y_test, y_pred)
print(f"Model accuracy: {accuracy:.4f}")

# Inspect the fitted parameters
print(f"Coefficient shape: {model.coef_.shape}")
print(f"Intercept: {model.intercept_}")
                        

Random Forest

Dask-ML does not ship a random forest of its own. The usual pattern is to train scikit-learn's RandomForestClassifier and distribute the per-tree work across a Dask cluster through joblib's Dask backend:

from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from dask.distributed import Client
import joblib

# The joblib Dask backend needs a running client
client = Client()

# Create a classification dataset (the forest needs the data in memory)
X, y = make_classification(n_samples=50000, n_features=20, n_informative=10,
                           n_classes=3, random_state=42)

# Split the data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train the random forest, farming the per-tree work out to the cluster
rf_model = RandomForestClassifier(n_estimators=100, max_depth=10,
                                  random_state=42, n_jobs=-1)
with joblib.parallel_backend('dask'):
    rf_model.fit(X_train, y_train)

# Predict and evaluate
y_pred = rf_model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"Random forest accuracy: {accuracy:.4f}")

# Feature importances (a plain NumPy array in scikit-learn)
print("Feature importances:")
print(rf_model.feature_importances_[:10])
                        

Model Selection and Evaluation

Cross-Validation

Running grid-search cross-validation with Dask-ML:

from dask_ml.model_selection import GridSearchCV
from dask_ml.linear_model import LogisticRegression
from sklearn.datasets import make_classification
import dask.array as da

# Create the dataset
X, y = make_classification(n_samples=50000, n_features=20, n_classes=2, random_state=42)
X_da = da.from_array(X, chunks=(5000, 20))
y_da = da.from_array(y, chunks=(5000,))

# Define the model and the parameter grid
model = LogisticRegression()
param_grid = {
    'C': [0.1, 1.0, 10.0],
    'penalty': ['l1', 'l2']
}

# Grid search with cross-validation
grid_search = GridSearchCV(
    model,
    param_grid,
    cv=3,  # 3-fold cross-validation
    scoring='accuracy'
)

# Run the search
grid_search.fit(X_da, y_da)

# Report results (cv_results_ holds plain NumPy values, no .compute() needed)
print("Best parameters:")
print(grid_search.best_params_)
print(f"Best score: {grid_search.best_score_:.4f}")

print("\nAll parameter combinations:")
results = grid_search.cv_results_
for i, params in enumerate(results['params']):
    mean_score = results['mean_test_score'][i]
    std_score = results['std_test_score'][i]
    print(f"  {params}: {mean_score:.4f} (+/- {std_score*2:.4f})")
                        

Model Evaluation Metrics

Computing evaluation metrics for a Dask model. dask_ml.metrics implements only a handful of metrics, so for the rest we materialize the arrays and fall back to scikit-learn:

from dask_ml.metrics import accuracy_score
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score
from sklearn.datasets import make_classification
import dask.array as da

# Create the dataset
X, y = make_classification(n_samples=50000, n_features=20, n_classes=2, random_state=42)
X_da = da.from_array(X, chunks=(5000, 20))
y_da = da.from_array(y, chunks=(5000,))

# Train a simple model
from dask_ml.linear_model import LogisticRegression
model = LogisticRegression()
model.fit(X_da, y_da)

# Predict
y_pred = model.predict(X_da)
y_proba = model.predict_proba(X_da)[:, 1]

# dask_ml's accuracy_score computes eagerly and returns a plain float;
# precision/recall/F1/ROC AUC are not in dask_ml, so compute the arrays
# and use scikit-learn on the in-memory results
accuracy = accuracy_score(y_da, y_pred)
y_true_np = y_da.compute()
y_pred_np = y_pred.compute()
y_proba_np = y_proba.compute()
precision = precision_score(y_true_np, y_pred_np)
recall = recall_score(y_true_np, y_pred_np)
f1 = f1_score(y_true_np, y_pred_np)
roc_auc = roc_auc_score(y_true_np, y_proba_np)

# Report the results
print("Evaluation metrics:")
print(f"  accuracy:  {accuracy:.4f}")
print(f"  precision: {precision:.4f}")
print(f"  recall:    {recall:.4f}")
print(f"  F1 score:  {f1:.4f}")
print(f"  ROC AUC:   {roc_auc:.4f}")
                        

Cluster Analysis

K-Means Clustering

Large-scale K-Means clustering with Dask-ML:

from dask_ml.cluster import KMeans
from sklearn.datasets import make_blobs
import dask.array as da
import numpy as np
import matplotlib.pyplot as plt

# Create a large clustering dataset
X, _ = make_blobs(n_samples=100000, centers=5, n_features=2,
                  random_state=42, cluster_std=1.0)
X_da = da.from_array(X, chunks=(10000, 2))

# K-Means; init_max_iter and oversampling_factor tune the initialization
kmeans = KMeans(n_clusters=5, random_state=42, init_max_iter=2, oversampling_factor=10)
kmeans.fit(X_da)

# labels_ is a lazy Dask array; cluster_centers_ is a small NumPy array
labels = kmeans.labels_
centers = kmeans.cluster_centers_

print("Clustering results:")
print(f"  number of samples: {len(labels)}")
print(f"  cluster centers: {centers}")

# Visualize a random sample of 1000 points
rng = np.random.default_rng(42)
sample_indices = rng.choice(len(X_da), size=1000, replace=False)
X_sample = X_da[sample_indices].compute()
labels_sample = labels[sample_indices].compute()

plt.figure(figsize=(10, 8))
plt.scatter(X_sample[:, 0], X_sample[:, 1], c=labels_sample, alpha=0.6)
plt.scatter(centers[:, 0], centers[:, 1],
            c='red', marker='x', s=200, linewidths=3, label='cluster centers')
plt.title('K-Means clustering result')
plt.legend()
plt.show()
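Dask-ML's KMeans defaults to k-means|| initialization, a parallel variant of k-means++ that needs only a few passes over the whole dataset; the init_max_iter and oversampling_factor arguments above tune exactly that initialization stage.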
                        

Dimensionality Reduction

Principal Component Analysis (PCA)

Large-scale PCA with Dask-ML:

from dask_ml.decomposition import PCA
from sklearn.datasets import make_classification
import dask.array as da

# Create a high-dimensional dataset
X, _ = make_classification(n_samples=50000, n_features=100, n_informative=20,
                           n_classes=2, random_state=42)
X_da = da.from_array(X, chunks=(5000, 100))

# PCA down to 10 components
pca = PCA(n_components=10)
X_reduced = pca.fit_transform(X_da)

# The fitted attributes are plain NumPy arrays; only the transformed data stays lazy
print("PCA results:")
print(f"  original shape: {X_da.shape}")
print(f"  reduced shape: {X_reduced.shape}")
print(f"  explained variance ratio: {pca.explained_variance_ratio_[:5]}")
print(f"  cumulative explained variance: {pca.explained_variance_ratio_.cumsum()[:5]}")

# Reconstruction error
X_reconstructed = pca.inverse_transform(X_reduced)
reconstruction_error = ((X_da - X_reconstructed) ** 2).mean().compute()
print(f"  reconstruction error: {reconstruction_error:.6f}")
                        

Hyperparameter Optimization

Randomized Search

Randomized hyperparameter search with Dask-ML, sampling parameter values from distributions instead of exhaustively trying a grid:

from dask_ml.model_selection import RandomizedSearchCV
from dask_ml.linear_model import LogisticRegression
from scipy.stats import loguniform
from sklearn.datasets import make_classification
import dask.array as da
import numpy as np

# Create the dataset
X, y = make_classification(n_samples=30000, n_features=20, n_classes=2, random_state=42)
X_da = da.from_array(X, chunks=(3000, 20))
y_da = da.from_array(y, chunks=(3000,))

# Define the model
model = LogisticRegression()

# Parameter distributions to sample from
param_distributions = {
    'C': loguniform(1e-4, 1e4),
    'penalty': ['l1', 'l2']
}

# Randomized search with cross-validation
random_search = RandomizedSearchCV(
    model,
    param_distributions,
    n_iter=10,  # try 10 parameter combinations
    cv=3,
    scoring='accuracy',
    random_state=42
)

# Run the search
random_search.fit(X_da, y_da)

# Report results (cv_results_ entries are NumPy arrays)
print("Randomized search results:")
print(f"  best parameters: {random_search.best_params_}")
print(f"  best score: {random_search.best_score_:.4f}")

print("\nTop 5 parameter combinations:")
results = random_search.cv_results_
top_indices = np.argsort(results['mean_test_score'])[::-1][:5]
for rank, idx in enumerate(top_indices, start=1):
    params = results['params'][idx]
    mean_score = results['mean_test_score'][idx]
    std_score = results['std_test_score'][idx]
    print(f"  {rank}. {params}: {mean_score:.4f} (+/- {std_score*2:.4f})")
                        

Pipelines

Machine Learning Pipelines

Building a machine-learning pipeline. Dask-ML estimators plug directly into scikit-learn's Pipeline:

from sklearn.pipeline import Pipeline
from dask_ml.preprocessing import StandardScaler
from dask_ml.linear_model import LogisticRegression
from dask_ml.model_selection import train_test_split
from sklearn.datasets import make_classification
import dask.array as da

# Create the dataset
X, y = make_classification(n_samples=50000, n_features=20, n_classes=2, random_state=42)
X_da = da.from_array(X, chunks=(5000, 20))
y_da = da.from_array(y, chunks=(5000,))

# Split the data
X_train, X_test, y_train, y_test = train_test_split(X_da, y_da, test_size=0.2, random_state=42)

# Build the pipeline: scikit-learn's Pipeline works with Dask-ML estimators
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])

# Fit the whole pipeline
pipeline.fit(X_train, y_train)

# Predict
y_pred = pipeline.predict(X_test)

# Evaluate (dask_ml metrics return a plain float by default)
from dask_ml.metrics import accuracy_score
accuracy = accuracy_score(y_test, y_pred)
print(f"Pipeline accuracy: {accuracy:.4f}")

# Inspect the pipeline steps
print("\nPipeline steps:")
for name, estimator in pipeline.named_steps.items():
    print(f"  {name}: {type(estimator).__name__}")
                        

Practical Applications

Large-Scale Text Classification

An end-to-end sketch of large-scale text classification with Dask-ML:

from dask_ml.feature_extraction.text import HashingVectorizer
from dask_ml.linear_model import LogisticRegression
from dask_ml.model_selection import train_test_split
from dask_ml.preprocessing import LabelEncoder
import dask.bag as db
import json

# Simulate text data on disk
def create_sample_text_data():
    """Write sample text data as JSON Lines (one record per line)."""
    for i in range(5):
        with open(f'text_data_{i}.json', 'w') as f:
            for j in range(i * 10000, (i + 1) * 10000):
                category = f"category_{j % 5}"
                record = {'text': f"This is text sample {j} from {category} with some example content",
                          'label': category}
                f.write(json.dumps(record) + '\n')

# Create the sample data (run once)
# create_sample_text_data()

# Read the text data; read_text yields one line at a time, which is why
# the files are written as JSON Lines rather than a single JSON array
all_data = db.read_text('text_data_*.json').map(json.loads)

# Convert the bag to a dask DataFrame so the transformers receive Series
ddf = all_data.to_dataframe()
texts = ddf['text']
labels = ddf['label']

# Vectorize the text; hashing needs no vocabulary pass over the data
vectorizer = HashingVectorizer(n_features=10000, alternate_sign=False)
X = vectorizer.fit_transform(texts)
X = X.compute_chunk_sizes()  # chunk sizes are unknown after vectorizing a Series

# Encode the string labels as integers
label_encoder = LabelEncoder()
y = label_encoder.fit_transform(labels)
y = y.compute_chunk_sizes()

# Split the data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train the classifier
classifier = LogisticRegression()
classifier.fit(X_train, y_train)

# Predict and evaluate (score returns a plain float)
y_pred = classifier.predict(X_test)
accuracy = classifier.score(X_test, y_test)

print("Text classification results:")
print(f"  training samples: {len(X_train)}")
print(f"  test samples: {len(X_test)}")
print(f"  feature dimension: {X.shape[1]}")
print(f"  accuracy: {accuracy:.4f}")
                        

Performance Optimization Tips

Memory Management

Tuning memory use for large-scale machine learning:

import dask
from dask.distributed import Client

# Worker memory thresholds, as fractions of the per-worker memory limit
dask.config.set({
    'distributed.worker.memory.target': 0.6,     # start spilling managed data to disk
    'distributed.worker.memory.spill': 0.7,      # spill based on total process memory
    'distributed.worker.memory.pause': 0.8,      # pause the worker's task execution
    'distributed.worker.memory.terminate': 0.95  # kill and restart the worker
})

# Start a local distributed client with a per-worker memory limit
client = Client(memory_limit='2GB', processes=True)
                            

Tuning Parallelism

Adjust parallelism to match the hardware:

import multiprocessing
from dask.distributed import Client

# One single-threaded worker process per CPU core suits CPU-bound training
n_workers = multiprocessing.cpu_count()
client = Client(n_workers=n_workers, threads_per_worker=1)

# For I/O-bound workloads, fewer processes with more threads each can be better
# client = Client(n_workers=4, threads_per_worker=4)
                            

Incremental Learning

Dask-ML does not provide SGDClassifier itself; incremental learning is done by wrapping a scikit-learn estimator that supports partial_fit in dask_ml.wrappers.Incremental:

from sklearn.linear_model import SGDClassifier
from dask_ml.wrappers import Incremental

# Wrap a scikit-learn estimator that implements partial_fit;
# Incremental feeds it one Dask chunk at a time
model = Incremental(SGDClassifier(), scoring='accuracy')

# fit kwargs such as classes are forwarded to partial_fit;
# X_da / y_da are Dask arrays as in the earlier examples
model.fit(X_da, y_da, classes=[0, 1])
                            

Tip: Dask-ML provides powerful tools for large-scale machine learning, but take care to manage memory and allocate compute resources sensibly. For very large datasets, a distributed computing environment usually gives the best performance.
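As a minimal sketch of that recommendation (the scheduler address below is a placeholder for your own cluster):

from dask.distributed import Client

# Connect to an existing distributed scheduler instead of starting a
# local cluster; the address is hypothetical
client = Client('tcp://scheduler-address:8786')
print(client)  # reports the workers, threads and memory available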