Parallel training of sklearn models with dask-ml
This notebook demonstrates how to execute parallel machine learning training using dask-ml and motrainer.
The example dataset ./example1_data.zarr/ can be generated with a separate Jupyter notebook that prepares the example data.
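By default dask runs the computations below on a local thread pool. If you want the per-grid-cell training to run in parallel across several processes (or on a cluster), you can optionally start a dask.distributed client first. This is a minimal sketch assuming a local machine; the worker and thread counts are illustrative and should be tuned to your hardware.

# Optional: start a local dask cluster so the bag computations below run in parallel.
# The worker/thread counts are illustrative, not a recommendation.
from dask.distributed import Client

client = Client(n_workers=4, threads_per_worker=2)
client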
In [1]:
import motrainer
import numpy as np
import xarray as xr
Load data
In [2]:
data_path = "./example1_data.zarr"
ds = xr.open_zarr(data_path)
ds
Out[2]:
<xarray.Dataset>
Dimensions:    (space: 5, time: 8506)
Coordinates:
    latitude   (space) float64 dask.array<chunksize=(5,), meta=np.ndarray>
    longitude  (space) float64 dask.array<chunksize=(5,), meta=np.ndarray>
  * time       (time) datetime64[ns] 2007-01-02 ... 2020-01-01T01:00:00
Dimensions without coordinates: space
Data variables:
    BIOMA1     (space, time) float64 dask.array<chunksize=(3, 8506), meta=np.ndarray>
    BIOMA2     (space, time) float64 dask.array<chunksize=(3, 8506), meta=np.ndarray>
    TG1        (space, time) float64 dask.array<chunksize=(3, 8506), meta=np.ndarray>
    TG2        (space, time) float64 dask.array<chunksize=(3, 8506), meta=np.ndarray>
    TG3        (space, time) float64 dask.array<chunksize=(3, 8506), meta=np.ndarray>
    WG1        (space, time) float64 dask.array<chunksize=(3, 8506), meta=np.ndarray>
    WG2        (space, time) float64 dask.array<chunksize=(3, 8506), meta=np.ndarray>
    WG3        (space, time) float64 dask.array<chunksize=(3, 8506), meta=np.ndarray>
    curv       (space, time) float64 dask.array<chunksize=(3, 8506), meta=np.ndarray>
    sig        (space, time) float64 dask.array<chunksize=(3, 8506), meta=np.ndarray>
    slop       (space, time) float64 dask.array<chunksize=(3, 8506), meta=np.ndarray>
Attributes:
    license:  data license
    source:   data source
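The variables are opened lazily as dask arrays, so loading the store is cheap. As an optional check, the dimension sizes and variable names can be inspected with plain xarray attributes:

# Optional: inspect dimension sizes and data variables of the lazily loaded dataset
print(dict(ds.sizes))      # {'space': 5, 'time': 8506}
print(list(ds.data_vars))  # ['BIOMA1', 'BIOMA2', 'TG1', ..., 'slop']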
Split per grid cell
In [3]:
# Check if the dataset is splittable
motrainer.is_splitable(ds)
Out[3]:
True
In [4]:
# split the dataset per grid cell
bags = motrainer.dataset_split(ds, "space")
bags
Out[4]:
dask.bag<from_sequence, npartitions=5>
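The bag has one partition per grid cell, and each element is an independent xarray.Dataset. To peek at a single element without computing the whole bag, dask's take can be used (an optional check, not required for the workflow):

# Optional: look at the first element of the bag, a single-grid-cell dataset
first_cell = bags.take(1)[0]
first_cell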
Train-test split
In [5]:
# Helper functions that will be mapped over the bag elements below
def to_dataframe(ds):
    return ds.to_dask_dataframe()


def chunk(ds, chunks):
    return ds.chunk(chunks)
In [6]:
# Train-test split, mapped to each element of the bag
train_test_bags = bags.map(
    motrainer.train_test_split, split={"time": np.datetime64("2016-01-01")}
)

# # Or split by a boolean mask instead:
# mask = ds["time"] < np.datetime64("2016-01-01")
# train_test_bags = bags.map(motrainer.train_test_split, mask=mask)
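Each element of train_test_bags is now a (train, test) pair of datasets, split at 2016-01-01 along the time dimension. As an optional sanity check, the first pair can be pulled out and its time ranges compared; this is a small sketch using plain dask and xarray calls:

# Optional sanity check: time ranges of the first (train, test) pair
train_ds, test_ds = train_test_bags.take(1)[0]
print("train:", train_ds["time"].values.min(), "to", train_ds["time"].values.max())
print("test: ", test_ds["time"].values.min(), "to", test_ds["time"].values.max())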
In [7]:
# Retrieve the train and test bags
train_bags = train_test_bags.pluck(0).map(chunk, {"space": 500}).map(to_dataframe)
test_bags = train_test_bags.pluck(1).map(chunk, {"space": 500}).map(to_dataframe)
Set up training
In [8]:
# Set up the grid search
from sklearn.svm import SVR
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import RepeatedKFold
from dask_ml.preprocessing import MinMaxScaler
from dask_ml.model_selection import GridSearchCV

# Pipeline: min-max scaling followed by a support vector regressor
regSVR = make_pipeline(MinMaxScaler(), SVR())

# Hyperparameter grid and cross-validation scheme
kernel = ["poly", "rbf", "sigmoid"]
C = [1, 0.1]
gamma = ["scale"]
grid = dict(svr__kernel=kernel, svr__C=C, svr__gamma=gamma)
cv = RepeatedKFold(n_splits=4, n_repeats=2, random_state=1)

grid_search = GridSearchCV(
    estimator=regSVR,
    param_grid=grid,
    cv=cv,
    scoring=["r2", "neg_mean_squared_error"],
    refit="r2",
)
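With 3 kernels, 2 values of C and 1 gamma, the grid contains 6 parameter candidates, and RepeatedKFold with 4 splits and 2 repeats produces 8 folds, so the search performs 6 x 8 = 48 fits per grid cell (plus one refit on the full training set). A quick check using only the objects defined above:

# Number of fits the grid search will schedule per grid cell: candidates x CV folds
n_candidates = len(kernel) * len(C) * len(gamma)
n_folds = cv.get_n_splits()
print(f"{n_candidates} candidates x {n_folds} folds = {n_candidates * n_folds} fits per grid cell")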
Model optimization
In [9]:
# Set up the optimization function
import warnings

from sklearn.exceptions import DataConversionWarning


def optimize(df, grid_search, input_list, output_list):
    """Customized optimization function."""
    df = df.dropna()
    # Because a dask dataframe is a delayed object, the fit call raises a
    # DataConversionWarning; the warning is suppressed here.
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=DataConversionWarning)
        grid_result = grid_search.fit(df[input_list], df[output_list])
    return grid_result


# Map the optimization function to the train bags
input_list = ["BIOMA1", "BIOMA2", "TG1", "TG2", "TG3"]
output_list = ["slop"]
optimized_estimators = train_bags.map(
    optimize, grid_search=grid_search, input_list=input_list, output_list=output_list
)
In [10]:
# Execute the training
optimized_estimators_realized = optimized_estimators.compute()
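compute() triggers the actual training and returns a plain Python list with one fitted GridSearchCV per grid cell. As an optional follow-up, and assuming the dask-ml search exposes the same best_params_ / best_score_ attributes as the scikit-learn API, the winning hyperparameters can be summarized like this:

# Optional: summarize the best hyperparameters and the refit (r2) score per grid cell
for i, est in enumerate(optimized_estimators_realized):
    print(f"grid cell {i}: {est.best_params_}, best r2 = {est.best_score_:.3f}")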
Save models
In [11]:
from motrainer import util
In [12]:
from pathlib import Path

# Save each fitted model to ./results together with metadata about the data source
Path('./results').mkdir(exist_ok=True)
for id, model in enumerate(optimized_estimators_realized):
    util.sklearn_save(model, f"./results/{id}_model.h5", meta_data={"data_source": data_path})
    print(f"{id}_model.h5 is saved")
0_model.h5 is saved
1_model.h5 is saved
2_model.h5 is saved
3_model.h5 is saved
4_model.h5 is saved
Model performance evaluation
In [13]:
# Load the models back, sorted so that model i corresponds to grid cell i
from pathlib import Path

list_model = []
for file in sorted(Path("./results").glob("*.h5")):
    model, metadata = util.sklearn_load(file)
    list_model.append(model)
list_model
Out[13]:
[GridSearchCV(cv=RepeatedKFold(n_repeats=2, n_splits=4, random_state=1),
              estimator=Pipeline(steps=[('minmaxscaler', MinMaxScaler()),
                                        ('svr', SVR())]),
              param_grid={'svr__C': [1, 0.1], 'svr__gamma': ['scale'],
                          'svr__kernel': ['poly', 'rbf', 'sigmoid']},
              refit='r2', scoring=['r2', 'neg_mean_squared_error']),
 GridSearchCV(cv=RepeatedKFold(n_repeats=2, n_splits=4, random_state=1),
              estimator=Pipeline(steps=[('minmaxscaler', MinMaxScaler()),
                                        ('svr', SVR())]),
              param_grid={'svr__C': [1, 0.1], 'svr__gamma': ['scale'],
                          'svr__kernel': ['poly', 'rbf', 'sigmoid']},
              refit='r2', scoring=['r2', 'neg_mean_squared_error']),
 GridSearchCV(cv=RepeatedKFold(n_repeats=2, n_splits=4, random_state=1),
              estimator=Pipeline(steps=[('minmaxscaler', MinMaxScaler()),
                                        ('svr', SVR())]),
              param_grid={'svr__C': [1, 0.1], 'svr__gamma': ['scale'],
                          'svr__kernel': ['poly', 'rbf', 'sigmoid']},
              refit='r2', scoring=['r2', 'neg_mean_squared_error']),
 GridSearchCV(cv=RepeatedKFold(n_repeats=2, n_splits=4, random_state=1),
              estimator=Pipeline(steps=[('minmaxscaler', MinMaxScaler()),
                                        ('svr', SVR())]),
              param_grid={'svr__C': [1, 0.1], 'svr__gamma': ['scale'],
                          'svr__kernel': ['poly', 'rbf', 'sigmoid']},
              refit='r2', scoring=['r2', 'neg_mean_squared_error']),
 GridSearchCV(cv=RepeatedKFold(n_repeats=2, n_splits=4, random_state=1),
              estimator=Pipeline(steps=[('minmaxscaler', MinMaxScaler()),
                                        ('svr', SVR())]),
              param_grid={'svr__C': [1, 0.1], 'svr__gamma': ['scale'],
                          'svr__kernel': ['poly', 'rbf', 'sigmoid']},
              refit='r2', scoring=['r2', 'neg_mean_squared_error'])]
In [14]:
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error

# This loop could be turned into a user-defined evaluation function
list_metrics = []
for model, test_data in zip(list_model, test_bags.compute()):
    test_data = test_data.dropna()
    X_test = test_data[input_list]
    Y_test = test_data[output_list]
    Y_eval = model.predict(X_test)
    metrics = {
        "MSE_SVR": mean_squared_error(Y_test, Y_eval),
        "MAE_SVR": mean_absolute_error(Y_test, Y_eval),
        "R_2": r2_score(Y_test, Y_eval),
    }
    list_metrics.append(metrics)
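To compare the grid cells at a glance, the collected dictionaries can optionally be turned into a small pandas DataFrame; this is just a presentation step on top of the metrics computed above.

# Optional: tabulate the per-grid-cell test metrics
import pandas as pd

pd.DataFrame(list_metrics)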