Machine Learning with Dask
In this chapter we look at how to use Dask for large-scale machine learning and data science. Dask integrates closely with scikit-learn, providing machine-learning capabilities for datasets that do not fit in memory.
Introduction to Dask-ML
Dask-ML is Dask's machine-learning extension. It provides scikit-learn-compatible algorithms that scale to large datasets.
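Because the API follows scikit-learn's fit/transform/predict conventions, adopting Dask-ML is often just a change of import. A minimal sketch of the drop-in usage (each tool is covered in detail in the sections below):
from dask_ml.preprocessing import StandardScaler  # instead of sklearn.preprocessing
import dask.array as da
# Same estimator API, but fit/transform work chunk-by-chunk on Dask arrays
X = da.random.random((10000, 10), chunks=(1000, 10))
X_scaled = StandardScaler().fit_transform(X)  # lazy until computed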
Key features of Dask-ML
- Compatibility: the API matches scikit-learn, so there is little new to learn
- Scalability: handles large numbers of samples and features
- Parallelism: machine-learning algorithms are parallelized automatically
- Distributed: runs on distributed compute environments
Installing Dask-ML
# Install Dask-ML
pip install dask-ml
# Or with conda
conda install -c conda-forge dask-ml
Data Preprocessing
Standardizing large datasets
Standardize a large dataset with Dask-ML:
import dask.array as da
from dask_ml.preprocessing import StandardScaler

# Create a large dataset
X = da.random.random((100000, 100), chunks=(10000, 100))

# Use the Dask-ML scaler
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Compute the results
print("Original data statistics:")
print(f"  Mean: {X.mean(axis=0).compute()[:5]}")
print(f"  Std:  {X.std(axis=0).compute()[:5]}")
print("\nStandardized data statistics:")
print(f"  Mean: {X_scaled.mean(axis=0).compute()[:5]}")
print(f"  Std:  {X_scaled.std(axis=0).compute()[:5]}")
Feature encoding
Encode large-scale categorical features:
import dask.dataframe as dd
from dask_ml.preprocessing import LabelEncoder, OneHotEncoder
import pandas as pd
import numpy as np

# Create sample data
data = {
    'category': [f'Category_{i%10}' for i in range(100000)],
    'subcategory': [f'Sub_{i%100}' for i in range(100000)],
    'value': np.random.random(100000)
}
df = pd.DataFrame(data)
ddf = dd.from_pandas(df, npartitions=10)

# Label encoding (returns a Dask array aligned with the partitions)
label_encoder = LabelEncoder()
ddf['category_encoded'] = label_encoder.fit_transform(ddf['category'])

# One-hot encoding; for Dask DataFrames the columns must be known
# categoricals first, hence the categorize() call
ddf = ddf.categorize(columns=['category'])
onehot_encoder = OneHotEncoder(sparse=False)
category_onehot = onehot_encoder.fit_transform(ddf[['category']])

print("Label encoding result:")
print(ddf[['category', 'category_encoded']].head())
print("\nOne-hot encoded shape:")
print(category_onehot.shape)
Model Training
Logistic regression
Train a logistic regression model with Dask-ML:
from dask_ml.linear_model import LogisticRegression
from dask_ml.model_selection import train_test_split
from sklearn.datasets import make_classification
import dask.array as da

# Create a large classification dataset
X, y = make_classification(n_samples=100000, n_features=20, n_classes=2, random_state=42)
X_da = da.from_array(X, chunks=(10000, 20))
y_da = da.from_array(y, chunks=(10000,))

# Split the data
X_train, X_test, y_train, y_test = train_test_split(X_da, y_da, test_size=0.2, random_state=42)

# Train the logistic regression model
model = LogisticRegression()
model.fit(X_train, y_train)

# Predict
y_pred = model.predict(X_test)
y_proba = model.predict_proba(X_test)

# Evaluate the model; dask_ml.metrics.accuracy_score computes eagerly
# by default, so it already returns a plain float
from dask_ml.metrics import accuracy_score
accuracy = accuracy_score(y_test, y_pred)
print(f"Model accuracy: {accuracy:.4f}")

# Inspect the fitted parameters
print(f"Coefficient shape: {model.coef_.shape}")
print(f"Intercept: {model.intercept_}")
Random forest
Dask-ML does not ship its own random forest. The usual pattern is to train scikit-learn's RandomForestClassifier on a Dask cluster, parallelizing the individual trees through joblib's Dask backend:
import joblib
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_classification
from dask.distributed import Client

# The joblib Dask backend needs a running client
client = Client()

# Create a classification dataset (in memory; each tree sees all rows)
X, y = make_classification(n_samples=50000, n_features=20, n_informative=10,
                           n_classes=3, random_state=42)

# Split the data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train the random forest, distributing the trees across the cluster
rf_model = RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42, n_jobs=-1)
with joblib.parallel_backend('dask'):
    rf_model.fit(X_train, y_train)

# Predict and evaluate
y_pred = rf_model.predict(X_test)
from sklearn.metrics import accuracy_score
print(f"Random forest accuracy: {accuracy_score(y_test, y_pred):.4f}")

# Feature importances (a plain NumPy array)
print("Feature importances:")
print(rf_model.feature_importances_[:10])
Model Selection and Evaluation
Cross-validation
Run a cross-validated grid search with Dask-ML:
from dask_ml.model_selection import GridSearchCV
from dask_ml.linear_model import LogisticRegression
from sklearn.datasets import make_classification
import dask.array as da

# Create a dataset
X, y = make_classification(n_samples=50000, n_features=20, n_classes=2, random_state=42)
X_da = da.from_array(X, chunks=(5000, 20))
y_da = da.from_array(y, chunks=(5000,))

# Define the model and parameter grid
model = LogisticRegression()
param_grid = {
    'C': [0.1, 1.0, 10.0],
    'penalty': ['l1', 'l2']
}

# Grid search with cross-validation
grid_search = GridSearchCV(
    model,
    param_grid,
    cv=3,  # 3-fold cross-validation
    scoring='accuracy'
)

# Run the grid search
grid_search.fit(X_da, y_da)

# Report the results; best_score_ and cv_results_ hold plain
# floats and NumPy arrays, so no compute() calls are needed
print("Best parameters:")
print(grid_search.best_params_)
print(f"Best score: {grid_search.best_score_:.4f}")
print("\nResults for every parameter combination:")
results = grid_search.cv_results_
for i in range(len(results['params'])):
    params = results['params'][i]
    mean_score = results['mean_test_score'][i]
    std_score = results['std_test_score'][i]
    print(f"  {params}: {mean_score:.4f} (+/- {std_score*2:.4f})")
Model evaluation metrics
Dask-ML itself implements only a handful of metrics (accuracy_score, log_loss, and the regression errors). For the remaining classification metrics a common approach is to materialize the prediction vectors, which are small, and hand them to scikit-learn:
from dask_ml.metrics import accuracy_score
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score
from sklearn.datasets import make_classification
import dask.array as da

# Create a dataset
X, y = make_classification(n_samples=50000, n_features=20, n_classes=2, random_state=42)
X_da = da.from_array(X, chunks=(5000, 20))
y_da = da.from_array(y, chunks=(5000,))

# Train a simple model
from dask_ml.linear_model import LogisticRegression
model = LogisticRegression()
model.fit(X_da, y_da)

# Predict lazily, then pull the 1-D results into memory for scikit-learn
y_pred = model.predict(X_da)
y_proba = model.predict_proba(X_da)[:, 1]
y_pred_np = y_pred.compute()
y_proba_np = y_proba.compute()

# Compute the evaluation metrics
accuracy = accuracy_score(y_da, y_pred)  # Dask-ML version, returns a float
precision = precision_score(y, y_pred_np)
recall = recall_score(y, y_pred_np)
f1 = f1_score(y, y_pred_np)
roc_auc = roc_auc_score(y, y_proba_np)

# Report
print("Model evaluation metrics:")
print(f"  Accuracy:  {accuracy:.4f}")
print(f"  Precision: {precision:.4f}")
print(f"  Recall:    {recall:.4f}")
print(f"  F1 score:  {f1:.4f}")
print(f"  ROC AUC:   {roc_auc:.4f}")
Clustering
K-Means clustering
Run large-scale K-Means clustering with Dask-ML:
from dask_ml.cluster import KMeans
from sklearn.datasets import make_blobs
import dask.array as da
import numpy as np
import matplotlib.pyplot as plt

# Create a large clustering dataset
X, _ = make_blobs(n_samples=100000, centers=5, n_features=2,
                  random_state=42, cluster_std=1.0)
X_da = da.from_array(X, chunks=(10000, 2))

# K-Means clustering
kmeans = KMeans(n_clusters=5, random_state=42, init_max_iter=2, oversampling_factor=10)
kmeans.fit(X_da)

# Fetch the clustering results; labels_ is a Dask array,
# while cluster_centers_ is a small NumPy array
labels = kmeans.labels_
centers = kmeans.cluster_centers_

print("Clustering results:")
print(f"  Number of samples: {len(labels)}")
print(f"  Cluster centers: {centers}")

# Visualize a random sample of the points (NumPy indices, since
# dask.array's choice does not support replace=False)
sample_indices = np.random.choice(len(X_da), size=1000, replace=False)
X_sample = X_da[sample_indices].compute()
labels_sample = labels[sample_indices].compute()

plt.figure(figsize=(10, 8))
plt.scatter(X_sample[:, 0], X_sample[:, 1], c=labels_sample, alpha=0.6)
plt.scatter(centers[:, 0], centers[:, 1],
            c='red', marker='x', s=200, linewidths=3, label='Cluster centers')
plt.title('K-Means clustering result')
plt.legend()
plt.show()
Dimensionality Reduction
Principal component analysis (PCA)
Run large-scale PCA with Dask-ML:
from dask_ml.decomposition import PCA
from sklearn.datasets import make_classification
import dask
import dask.array as da

# Create a high-dimensional dataset
X, _ = make_classification(n_samples=50000, n_features=100, n_informative=20,
                           n_classes=2, random_state=42)
X_da = da.from_array(X, chunks=(5000, 100))

# PCA dimensionality reduction
pca = PCA(n_components=10)
X_reduced = pca.fit_transform(X_da)

# explained_variance_ratio_ may or may not already be materialized
# depending on the dask-ml version; dask.compute handles both cases
ratio, = dask.compute(pca.explained_variance_ratio_)
print("PCA results:")
print(f"  Original shape: {X_da.shape}")
print(f"  Reduced shape: {X_reduced.shape}")
print(f"  Explained variance ratio: {ratio[:5]}")
print(f"  Cumulative explained variance ratio: {ratio.cumsum()[:5]}")

# Reconstruction error
X_reconstructed = pca.inverse_transform(X_reduced)
reconstruction_error = ((X_da - X_reconstructed) ** 2).mean().compute()
print(f"  Reconstruction error: {reconstruction_error:.6f}")
Hyperparameter Optimization
Randomized search
Use Dask-ML's RandomizedSearchCV to sample hyperparameter combinations at random:
from dask_ml.model_selection import RandomizedSearchCV
from dask_ml.linear_model import LogisticRegression
from scipy.stats import loguniform
from sklearn.datasets import make_classification
import dask.array as da
import numpy as np

# Create a dataset
X, y = make_classification(n_samples=30000, n_features=20, n_classes=2, random_state=42)
X_da = da.from_array(X, chunks=(3000, 20))
y_da = da.from_array(y, chunks=(3000,))

# Define the model
model = LogisticRegression()

# Define the parameter distributions to sample from
param_distributions = {
    'C': loguniform(1e-4, 1e4),
    'penalty': ['l1', 'l2']
}

# Randomized search with cross-validation
random_search = RandomizedSearchCV(
    model,
    param_distributions,
    n_iter=10,  # try 10 parameter combinations
    cv=3,
    scoring='accuracy',
    random_state=42
)

# Run the randomized search
random_search.fit(X_da, y_da)

# Report the results (plain floats and NumPy arrays)
print("Randomized search results:")
print(f"  Best parameters: {random_search.best_params_}")
print(f"  Best score: {random_search.best_score_:.4f}")
print("\nTop 5 parameter combinations:")
results = random_search.cv_results_
top_indices = np.argsort(results['mean_test_score'])[::-1][:5]
for i, idx in enumerate(top_indices):
    params = results['params'][idx]
    mean_score = results['mean_test_score'][idx]
    std_score = results['std_test_score'][idx]
    print(f"  {i+1}. {params}: {mean_score:.4f} (+/- {std_score*2:.4f})")
Pipelines
Machine-learning pipelines
Build a machine-learning pipeline; scikit-learn's Pipeline works directly with Dask-ML estimators:
from sklearn.pipeline import Pipeline
from dask_ml.preprocessing import StandardScaler
from dask_ml.linear_model import LogisticRegression
from dask_ml.model_selection import train_test_split
from sklearn.datasets import make_classification
import dask.array as da

# Create a dataset
X, y = make_classification(n_samples=50000, n_features=20, n_classes=2, random_state=42)
X_da = da.from_array(X, chunks=(5000, 20))
y_da = da.from_array(y, chunks=(5000,))

# Split the data
X_train, X_test, y_train, y_test = train_test_split(X_da, y_da, test_size=0.2, random_state=42)

# Build the pipeline
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])

# Train the pipeline
pipeline.fit(X_train, y_train)

# Predict
y_pred = pipeline.predict(X_test)

# Evaluate (dask_ml.metrics.accuracy_score returns a plain float)
from dask_ml.metrics import accuracy_score
accuracy = accuracy_score(y_test, y_pred)
print(f"Pipeline model accuracy: {accuracy:.4f}")

# Inspect the pipeline steps
print("\nPipeline steps:")
for name, estimator in pipeline.named_steps.items():
    print(f"  {name}: {type(estimator).__name__}")
Practical Applications
Large-scale text classification
Classify a large text corpus with Dask-ML:
from dask_ml.feature_extraction.text import HashingVectorizer
from dask_ml.linear_model import LogisticRegression
from dask_ml.model_selection import train_test_split
import dask.bag as db
import json

# Simulate a text dataset
def create_sample_text_data():
    """Create sample text data as newline-delimited JSON."""
    for i in range(5):
        # One JSON object per line, so each line parses independently
        with open(f'text_data_{i}.json', 'w') as f:
            for j in range(i * 10000, (i + 1) * 10000):
                category = f"category_{j % 5}"
                record = {'text': f"This is text sample {j} in class {category} with some example content",
                          'label': category}
                f.write(json.dumps(record) + '\n')

# Create the sample data (run once)
# create_sample_text_data()

# Read the text data, one JSON object per line
text_files = [f'text_data_{i}.json' for i in range(5)]
all_data = db.read_text(text_files).map(json.loads)

# Convert to a dataframe to get the text and label columns
ddf = all_data.to_dataframe()
texts = ddf['text']
labels = ddf['label']

# Vectorize the text
vectorizer = HashingVectorizer(n_features=10000, alternate_sign=False)
X = vectorizer.fit_transform(texts)

# Encode the labels
from dask_ml.preprocessing import LabelEncoder
label_encoder = LabelEncoder()
y = label_encoder.fit_transform(labels)

# Row counts per chunk are unknown until computed; train_test_split
# needs them, so materialize the chunk sizes first
X.compute_chunk_sizes()
y.compute_chunk_sizes()

# Split the data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train the classifier
classifier = LogisticRegression()
classifier.fit(X_train, y_train)

# Predict and evaluate
y_pred = classifier.predict(X_test)
from dask_ml.metrics import accuracy_score
accuracy = accuracy_score(y_test, y_pred)

print("Text classification results:")
print(f"  Training samples: {len(X_train)}")
print(f"  Test samples: {len(X_test)}")
print(f"  Feature dimension: {X.shape[1]}")
print(f"  Classification accuracy: {accuracy:.4f}")
Performance Tuning
Memory management
Tune memory use for large-scale machine learning:
import dask
from dask.distributed import Client

# Configure worker memory thresholds (fractions of the memory limit)
dask.config.set({
    'distributed.worker.memory.target': 0.6,
    'distributed.worker.memory.spill': 0.7,
    'distributed.worker.memory.pause': 0.8,
    'distributed.worker.memory.terminate': 0.95
})

# Use a distributed client with a per-worker memory limit
client = Client(memory_limit='2GB', processes=True)
Tuning parallelism
Match the degree of parallelism to your hardware:
import multiprocessing

# One worker process per CPU core
n_workers = multiprocessing.cpu_count()
client = Client(n_workers=n_workers, threads_per_worker=1)

# For I/O-bound workloads, fewer processes with more threads each
# client = Client(n_workers=4, threads_per_worker=4)
Incremental learning
Use algorithms that support incremental (out-of-core) learning. Dask-ML does not provide its own SGDClassifier; scikit-learn's version supports partial_fit and can be trained batch by batch (data_batches and unique_classes below stand in for your own data):
from sklearn.linear_model import SGDClassifier

# Incremental learning suits datasets too large to fit in memory
model = SGDClassifier()

# Train in batches; partial_fit needs the full class list up front
for batch_X, batch_y in data_batches:
    model.partial_fit(batch_X, batch_y, classes=unique_classes)
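Dask-ML also provides dask_ml.wrappers.Incremental, which drives partial_fit over the blocks of a Dask array for you. A minimal sketch, assuming X_da and y_da are Dask arrays for a binary problem as in the earlier examples:
from sklearn.linear_model import SGDClassifier
from dask_ml.wrappers import Incremental

# Incremental calls the wrapped estimator's partial_fit once per block
inc = Incremental(SGDClassifier(), scoring='accuracy')
inc.fit(X_da, y_da, classes=[0, 1])  # classes is forwarded to partial_fit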
Tip: Dask-ML provides powerful tools for large-scale machine learning, but take care with memory management and the sensible allocation of compute resources. For very large datasets, a distributed compute environment will usually give the best performance, as sketched below.
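For example, the same code can run against an existing cluster by pointing the client at its scheduler (the address below is a placeholder for your own deployment):
from dask.distributed import Client

# Hypothetical scheduler address; replace with your cluster's
client = Client('tcp://scheduler-host:8786')
print(client)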