
API Reference

Here you find the API reference for the major components of MOTrainer.

motrainer.splitter:

dataset_split(ds, identifier)

Split a Dataset by identifier for independent training tasks.

Parameters:

    ds : Dataset (required)
        Xarray Dataset to be split.
    identifier : dict | str (required)
        When identifier is a dictionary, its keys should be a subset of {"space", "time"}, mapping the "space" and/or "time" dimension to the corresponding separation identifier.

        When identifier is a string, the separation depends on whether identifier is a key of coords/data_vars or a dimension name ("space" or "time"). In the former case, the corresponding coords/data_vars are used as the separation identifier. In the latter case, ds is separated per entry in that dimension.

Returns:

    dask.bag
        A Dask Bag of split Datasets.

Source code in motrainer/splitter.py
def dataset_split(ds: xr.Dataset, identifier: dict | str) -> db:
    """Split a Dataset by indentifier for independent training tasks.

    Parameters
    ----------
    ds : xr.Dataset
        Xarray Dataset to be splitted.
    identifier : dict | str
        When `indentifier` is a dictionary, its keys should be a subset of
        {"space", "time"},  and map "space" and/or "time" dimension with corresponding
        separation indentifier.

        When `indentifier` is a string, the separation will depends on if `indentifier`
        is a key of coords/data_vars or a dimension name ("space" or "time").
        In the former case the corresponding coords/data_vars will be used as separation
        indentifier.
        In the latter case ds will be separated per entry in that dimension.

    Returns
    -------
    dask.bag
        A Dask Databag of splited Datasets
    """
    identifier = _regulate_identifier(ds, identifier)

    list_id = []
    for key in MOT_DIMS:
        if key in identifier.keys():
            key_id = key + "_id"
            ds = ds.assign_coords({key_id: (key, identifier[key])})
            list_id.append(key_id)

    # Get the name of attributes to groupby
    if len(list_id) > 1:
        # Use space time cls coordinates as multi index
        # Must stack on MOT_DIMS to reduce dims of data variable
        multi_idx = ds.stack(samples=list_id).samples.values
        for dim in MOT_DIMS:
            if dim in ds.indexes:
                ds = ds.reset_index(dim)
        ds = (
            ds.stack(
                samples=MOT_DIMS, create_index=False
            )  # No index creation since this will be added next.
            .assign_coords(samples=multi_idx)
            .set_xindex(list_id)
        )
        key_gb = "samples"
    else:
        key_gb = list_id[0]

    # Groupby and separate to Dask Databags
    list_db = []
    for grp in list(ds.groupby(key_gb)):
        list_db.append(grp[1])
    bags = db.from_sequence(list_db)

    return bags
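
Example (a minimal sketch, not taken from the package documentation; the dataset layout and the grouping labels are illustrative assumptions):

import numpy as np
import xarray as xr
from motrainer.splitter import dataset_split

# Hypothetical Dataset with the two MOTrainer dimensions: "space" and "time"
ds = xr.Dataset(
    data_vars={"var": (("space", "time"), np.random.rand(4, 6))},
    coords={"space": np.arange(4), "time": np.arange(6)},
)

# Group the 4 spatial entries into 2 independent training tasks
bags = dataset_split(ds, {"space": np.array([0, 0, 1, 1])})
print(bags.count().compute())  # number of split Datasets in the Dask Bag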

is_splitable(ds)

Check whether a Dataset can be split using MOTrainer.

The following checks will be applied:

- The Dataset has exactly 2 dimensions
- The 2 dims are "space" and "time"
- There are no duplicated coordinates

A UserWarning will be raised for each failed check.

Parameters:

    ds : Dataset (required)
        Xarray Dataset to be split.

Returns:

    bool
        Result of the check as a Boolean. True if all checks pass, otherwise False.

Source code in motrainer/splitter.py
def is_splitable(ds: xr.Dataset) -> bool:
    """Check if a Dataset is can be splitted using MOTrainer.

    The following checks will be applied:
        - The Dastaset has exactly 2 dimensions
        - The 2 dims are "space" and "time"
        - There are no duplicated coordinates
    A UserWarning will be raised for each failed check.

    Parameters
    ----------
    ds : xr.Dataset
        Xarray Dataset to be splitted.

    Returns
    -------
    bool
       Result of check in Boolean.
       If all checks are passed, it will be True. Otherwise False.
    """
    flag_valid = True

    # Dimension size should be 2
    if len(ds.dims) != 2:
        warnings.warn(
            'Dataset should have two dimensions: "space" and "time"',
            UserWarning,
            stacklevel=2,
        )
        flag_valid = False

    # space and time dimensions should exist
    for dim in MOT_DIMS:
        if dim not in ds.dims:
            warnings.warn(
                f"{dim} not found in the dimensions", UserWarning, stacklevel=2
            )
            flag_valid = False

    # Check duplicated coordinates
    for coord in ds.coords:
        if np.unique(ds[coord]).shape != ds[coord].shape:
            warnings.warn(
                f"Duplicated coordinates found in {coord}", UserWarning, stacklevel=2
            )
            flag_valid = False

    return flag_valid
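
Example (a minimal sketch with an assumed Dataset layout):

import numpy as np
import xarray as xr
from motrainer.splitter import is_splitable

ds = xr.Dataset(
    data_vars={"var": (("space", "time"), np.random.rand(3, 5))},
    coords={"space": np.arange(3), "time": np.arange(5)},
)

print(is_splitable(ds))  # True if all checks pass; a UserWarning is raised per failed check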

train_test_split(ds, mask=None, split=None, reverse=False)

Split data to train and test datasets.

The split is performed either 1) by specifying the training data mask (mask) where training data locations are True, or 2) by specifying a coordinate value (split) splitting the data into two.

Parameters:

    ds : Dataset (required)
        Xarray dataset to split.
    mask : DataArray, optional
        Mask, True at training data locations. By default None.
    split : dict, optional
        Coordinate dictionary in {NAME: coordinate} which splits the Dataset into two. The part smaller than the given coordinate will be training data. By default None.
    reverse : bool, optional
        Reverse the split results. By default False.

Returns:

    tuple[Dataset, Dataset]
        Split results. In (training, test).

Raises:

    ValueError
        When neither mask nor split is specified.
    ValueError
        When both mask and split are specified.

Source code in motrainer/splitter.py
def train_test_split(
    ds: xr.Dataset,
    mask: xr.DataArray = None,
    split: dict = None,
    reverse: bool = False,
) -> tuple[xr.Dataset, xr.Dataset]:
    """Split data to train and test datasets.

    The split is performed either 1) by specifying the training data mask (`mask`)
    where training data locations are True, or 2) by specifying a coordinate value
    (`split`) splitting the data into two.

    Parameters
    ----------
    ds : xr.Dataset
        Xarray dataset to split
    mask : xr.DataArray, optional
        Mask, True at training data locations. By default None
    split : dict, optional
        Coordinate dictionary in {NAME: coordinate} which splits the Dataset into two.
        The part smaller than it will be training, by default None.
    reverse : bool, optional
        Reverse the split results, by default False

    Returns
    -------
    tuple[xr.Dataset, xr.Dataset]
        Split results. In (training, test).

    Raises
    ------
    ValueError
        When neither mask nor split is specified.
    ValueError
        When both mask and split are specified.
    """
    if mask is None and split is None:
        raise ValueError("Either mask or split should be specified.")
    elif mask is not None and split is not None:
        raise ValueError("Only one of mask and split should be specified.")

    # Convert split to mask
    if split is not None:
        _validate_train_test_split(split)
        mask = ds[list(split.keys())[0]] < list(split.values())[0]

    train = ds.where(mask, drop=True)
    test = ds.where(~mask, drop=True)

    return (test, train) if reverse else (train, test)
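
Example (a minimal sketch; the coordinate values used for the split are illustrative assumptions):

import numpy as np
import pandas as pd
import xarray as xr
from motrainer.splitter import train_test_split

times = pd.date_range("2014-01-01", periods=8, freq="YS")
ds = xr.Dataset(
    data_vars={"var": (("space", "time"), np.random.rand(2, 8))},
    coords={"space": np.arange(2), "time": times},
)

# Everything with a time coordinate before 2018 becomes training data
train, test = train_test_split(ds, split={"time": np.datetime64("2018-01-01")})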

JackknifeGPI:

motrainer.jackknife.JackknifeGPI(gpi_data, val_split_year, input_list, output_list, export_all_years=True, outpath='./jackknife_results')

GPI object for neural network training using the Jackknife resampling method.

Methods:

    train
        Train the neural network with the given method.
    export_best
        Export the best results of the Jackknife process.

Initialize JackknifeGPI object.

Parameters:

    gpi_data : DataFrame (required)
        DataFrame of a single GPI. Each row represents all properties at a certain timestamp. Each column represents a time series of a property.
    val_split_year : int (required)
        Split year of validation. All data from this year onward (inclusive) will be reserved for benchmarking.
    input_list : list of str (required)
        Column names in gpi_data that will be used as input.
    output_list : list of str (required)
        Column names in gpi_data that will be used as output.
    export_all_years : bool, optional
        Switch to export the results of all years. By default True.
    outpath : str, optional
        Results export path. By default './jackknife_results'.
Source code in motrainer/jackknife.py
def __init__(self,
             gpi_data,
             val_split_year,
             input_list,
             output_list,
             export_all_years=True,
             outpath='./jackknife_results'):
    """Initialize JackknifeGPI object.

    Parameters
    ----------
    gpi_data : pandas.DataFrame
        DataFrame of a single GPI.
        Each row represents all properties at a certain timestamp.
        Each column represents a time-series of a property.
    val_split_year : int
        Split year of validation. All data from this year onward (inclusive)
        will be reserved for benchmarking.
    input_list : list of str
        Column names in gpi_data that will be used as input.
    output_list : list of str
        Column names in gpi_data that will be used as output.
    export_all_years : bool, optional
        Switch to export the results of all years, by default True
    outpath : str, optional
        Results exporting path, by default './jackknife_results'
    """
    logger.info('Initializing Jackknife training:\n'
                f'val_split_year: {val_split_year}\n'
                f'input_list: {input_list}\n'
                f'output_list: {output_list}\n')

    assert not (
        gpi_data.isnull().values.any()), 'Nan value(s) in gpi_data!'

    self.gpi_data = gpi_data
    self.input_list = input_list
    self.output_list = output_list
    self.gpi_input = gpi_data[input_list].copy()
    self.gpi_output = gpi_data[output_list].copy()
    self.val_split_year = val_split_year
    self.export_all_years = export_all_years
    self.outpath = outpath
    Path(self.outpath).parent.mkdir(parents=True, exist_ok=True)
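
Example (a construction sketch; the column names and the synthetic data are hypothetical, while the DataFrame layout follows the description above):

import numpy as np
import pandas as pd
from motrainer.jackknife import JackknifeGPI

index = pd.date_range("2010-01-01", "2017-12-31", freq="MS")
gpi_data = pd.DataFrame(
    np.random.rand(len(index), 3),
    index=index,
    columns=["precipitation", "temperature", "soil_moisture"],  # hypothetical properties
)

gpi = JackknifeGPI(
    gpi_data,
    val_split_year=2016,  # data from 2016 onward is reserved for benchmarking
    input_list=["precipitation", "temperature"],
    output_list=["soil_moisture"],
    outpath="./jackknife_results",
)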

export_best(model_name='best_optimized_model')

Export the best results of the Jackknife process.

Source code in motrainer/jackknife.py
def export_best(self, model_name='best_optimized_model'):
    """Export the best results in Jackknife process."""
    logger.info(
        'Exporting model and hyperparameters of '
        f'year {self.best_year} to {self.outpath}'
        )

    if model_name is not None:
        path_model = f'{self.outpath}/{model_name}_{self.best_year}'
    else:
        path_model = f'{self.outpath}/best_optimized_model_{self.best_year}'

    # write metadata
    metadata = {}
    metadata['input_list'] = self.input_list
    metadata['output_list'] = self.output_list
    metadata['best_year'] = int(self.best_year)

    for key in ['lat', 'lon', 'latitude', 'longitude']:
        if key in self.gpi_data.keys():
            metadata[key] = float(self.gpi_data[key].iloc[0])

    self.best_train.export(path_model=path_model, meta_data=metadata)

train(searching_space, optimize_space, normalize_method='standard', performance_method='rmse', training_method='dnn', verbose=0)

Train the neural network with the Jackknife resampling method.

Procedures:

1. Reserve in/output after self.val_split_year for later benchmarking.
2. From the remaining in/output data, leave out one year as validation data.
3. Perform neural network training.
4. Repeat steps 2 and 3 until all years except the benchmarking years have been used for validation.
5. Select the best training by best performance.
6. Perform benchmarking on the reserved data.

Parameters:

    searching_space : dict (required)
        Arguments of the searching space.
    optimize_space : dict (required)
        Arguments of the optimization space.
    normalize_method : str, optional
        Method of normalization. Choose from 'standard' and 'min_max'. By default 'standard'.
    performance_method : str, optional
        Method of computing performance. Choose from 'rmse', 'mae', 'pearson' and 'spearman'. By default 'rmse'.
    training_method : str, optional
        Training method selection. Select from 'dnn' or 'dnn_lossweights'. By default 'dnn'.
    verbose : int, optional
        Control the verbosity. By default 0, which means no screen feedback.
Source code in motrainer/jackknife.py
def train(self,
          searching_space,
          optimize_space,
          normalize_method='standard',
          performance_method='rmse',
          training_method='dnn',
          verbose=0):
    """Train neuron network with Jackknife resampling method.

    Procedures:
    1. Reserve in/output after self.val_split_year for later benchmarking.
    2. From the rest in/output data, leave out one year as validation data.
    3. Perform neuron network training.
    4. Repeat Step 2 and 3 until all years exept benchmarking years have
        been used for validation.
    5. Select the best trainning by best performance.
    6. Perform benchmarking on reserved data.

    Parameters
    ----------
    searching_space : dict
        Arguments of searching space.
    optimize_space : dict
        Arguments of optimazation space.
    normalize_method : str, optional
        Method of normalization. Choose from 'standard' and 'min_max'.
        By default 'standard'
    performance_method : str, optional
        Method of computing performance. Choose from 'rmse', 'mae',
        'pearson' and 'spearman'.
        By default 'rmse'.
    training_method : str, optional
        Traning method selection. Select from 'dnn' or 'dnn_lossweights'.
        By default 'dnn'
    verbose : int, optional
        Control the verbosity.
        By default 0, which means no screen feedback.
    """
    # Data normalization
    logger.debug(f'Normalizing input/output data. Method: {normalize_method}.')
    self.gpi_input[:], scaler_input = normalize(self.gpi_input,
                                                normalize_method)
    self.gpi_output[:], scaler_output = normalize(self.gpi_output,
                                                  normalize_method)

    # Data split
    input_years = self.gpi_input.index.year
    output_years = self.gpi_output.index.year
    logger.debug(
        f'Splitting training and validation data. \
          Split year: {self.val_split_year}.')
    jackknife_input = self.gpi_input[input_years < self.val_split_year]
    jackknife_output = self.gpi_output[output_years < self.val_split_year]
    vali_input = self.gpi_input[input_years >= self.val_split_year]
    vali_output = self.gpi_output[output_years >= self.val_split_year]
    year_list = jackknife_input.index.year.unique()

    # Jackknife in time
    loo = LeaveOneOut()
    best_perf_sum = None
    for train_index, test_index in loo.split(year_list): # noqa: B007
        this_year = year_list[test_index[0]]

        input_years = jackknife_input.index.year
        output_years = jackknife_output.index.year

        logger.info(f'Jackknife on year: {this_year}.')
        train_input = jackknife_input[input_years != this_year]
        train_output = jackknife_output[output_years != this_year]

        # check if train_input and train_output are empty, raise value error
        if train_input.empty or train_output.empty:
            raise ValueError(
                'Training data is empty. Please check the val_split_year.'
                )

        test_input = jackknife_input[input_years == this_year]
        test_output = jackknife_output[output_years == this_year]

        # check if test_input and test_output are empty, raise value error
        if test_input.empty or test_output.empty:
            raise ValueError(
                'Testing data is empty. Please check the val_split_year.'
                )

        # Execute training
        training = NNTrain(train_input, train_output)

        # Set searching space
        training.update_space(**searching_space)

        # Optimization
        training.optimize(**optimize_space,
                          training_method=training_method,
                          verbose=verbose)

        # TODO: Add warning if no model selected for the year
        if training.model is None:
            logger.warning(f'No best model was found for year: {str(this_year)}.')
            continue

        if self.export_all_years:
            path_model = f'{self.outpath}/all_years/optimized_model_{this_year}'
            training.export(path_model=path_model)

        # find minimum rmse
        # TODO: mae, pearson, spearman
        apr_perf = performance(test_input, test_output, training.model,
                               performance_method, scaler_output)
        perf_sum = np.nansum(apr_perf)
        if best_perf_sum is None:
            best_perf_sum = perf_sum
        if perf_sum <= best_perf_sum:
            self.apr_perf = apr_perf
            self.post_perf = performance(vali_input, vali_output,
                                         training.model,
                                         performance_method, scaler_output)
            self.best_train = training
            self.best_year = this_year
    logger.info(f'Found best year: {str(self.best_year)}\n'
                f'A-priori performance: {self.apr_perf}\n'
                f'Post-priori performance: {self.post_perf}')
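
Example (a workflow sketch continuing the construction example above; the keys of searching_space and optimize_space below are placeholders, not verified argument names, since they are forwarded to NNTrain.update_space and NNTrain.optimize):

# Placeholder search/optimization settings; the key names are assumptions
searching_space = {"learning_rate": [0.001, 0.01]}
optimize_space = {"epochs": 10}

gpi.train(
    searching_space=searching_space,
    optimize_space=optimize_space,
    normalize_method="standard",
    performance_method="rmse",
    training_method="dnn",
    verbose=0,
)

# Export the best model found in the leave-one-year-out loop
gpi.export_best()
print(gpi.best_year, gpi.apr_perf, gpi.post_perf)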

Utility Functions:

motrainer.util

normalize(data, method)

Pre-normalization for input/output.

Parameters:

    data : DataFrame (required)
        Data to normalize.
    method : str (required)
        Normalization method. Choose from 'standard' or 'min_max'.

Returns:

    tuple
        A tuple of (data_norm, scaler): the normalized data and the scaler used for normalization.

Source code in motrainer/util.py
def normalize(data, method):
    """Pre-normalization for input/output.

    Parameters
    ----------
    data : pandas.DataFrame
        Data to normalize.
    method : str
        Normalization method. Choose from 'standard' or 'min_max'.

    Returns
    -------
    tuple
        A tuple of (data_norm, scaler): the normalized data and the scaler
        used for normalization.

    """
    if method == 'standard':
        scaler = sklearn.preprocessing.StandardScaler()
    elif method == 'min_max':
        scaler = sklearn.preprocessing.MinMaxScaler()
    else:
        raise NotImplementedError

    data_norm = scaler.fit_transform(data)
    return data_norm, scaler
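
Example (a minimal sketch; the column names and values are illustrative):

import pandas as pd
from motrainer.util import normalize

df = pd.DataFrame({"x": [1.0, 2.0, 3.0], "y": [10.0, 20.0, 30.0]})

data_norm, scaler = normalize(df, "standard")
original = scaler.inverse_transform(data_norm)  # back to the original scale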

performance(data_input, data_label, model, method, scaler_output=None)

Compute the performance of a trained neural network.

Parameters:

    data_input : DataFrame (required)
        Input data.
    data_label : DataFrame (required)
        Label data.
    model : tf.keras.models (required)
        Trained model to compute performance with.
    method : str (required)
        Method of computing performance. Choose from 'rmse', 'mae', 'pearson' and 'spearman'.
    scaler_output : optional
        Scaler of the output, by default None. When not None, the function assumes that the output has been normalized and uses scaler_output to transform the output back to the original scale.

Returns:

    float or list of float
        Performance value. If the model gives multiple outputs, the performance will be a list.

Source code in motrainer/util.py
def performance(data_input, data_label, model, method, scaler_output=None):
    """Compute performance of trained neuron netowrk.

    Parameters
    ----------
    data_input : pandas.DataFrame
        Input data.
    data_label : pandas.DataFrame
        Label data.
    model : tf.keras.models
        Trained model to compute performance.
    method : str
        Method of computing performance. Choose from 'rmse', 'mae',
        'pearson' and 'spearman'.
    scaler_output : optional
        Scaler of output, by default None.
        When not None, function will assume that a normalization has been
        performed to output, and will use scaler_output to transform the output
        back to the original scale.

    Returns
    -------
    float or list of float
        Performance value. If the model gives multiple outputs, the performance
        will be a list.
    """
    predicted = model.predict(data_input)

    # In case multiple outputs, re-arrange to one df
    if isinstance(predicted, list):
        predicted = np.hstack(predicted)

    # Scale back if the data was normalized
    if scaler_output is not None:
        re_predicted = scaler_output.inverse_transform(predicted, 'f')
        re_label = scaler_output.inverse_transform(data_label, 'f')
    else:
        re_predicted = np.asarray(predicted)
        re_label = np.asarray(data_label)

    # Per-output difference between predictions and labels
    difference = re_predicted - re_label
    perf = np.zeros([predicted.shape[1], 1])
    if method == 'rmse':
        for j in range(predicted.shape[1]):
            perf[j, 0] = np.round(np.sqrt((difference[:, j]**2).mean()), 5)
    elif method == 'mae':
        for j in range(predicted.shape[1]):
            perf[j, 0] = np.round(np.abs(difference[:, j]).mean(), 5)
    elif method == 'pearson':
        for j in range(predicted.shape[1]):
            perf[j, 0] = np.round(pearsonr(re_predicted[:, j], re_label[:, j]),
                                  5)[0]
    elif method == 'spearman':
        for j in range(predicted.shape[1]):
            perf[j, 0] = np.round(spearmanr(re_predicted[:, j], re_label[:, j]),
                                  5)[0]

    return perf
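
Example (a minimal sketch; the stand-in object with a predict() method replaces a trained tf.keras model, and the data are synthetic):

import numpy as np
import pandas as pd
from motrainer.util import normalize, performance

class ConstantModel:
    """Stand-in for a trained model; predicts 0.5 for every sample."""
    def predict(self, x):
        return np.full((len(x), 1), 0.5)

data_input = pd.DataFrame(np.random.rand(50, 3), columns=["a", "b", "c"])
data_label = pd.DataFrame(np.random.rand(50, 1), columns=["target"])

# The label is normalized first, so the scaler is passed along to map
# predictions and labels back to the original scale.
label_norm, scaler_output = normalize(data_label, "standard")
label_norm = pd.DataFrame(label_norm, columns=data_label.columns)

rmse = performance(data_input, label_norm, ConstantModel(), "rmse", scaler_output)
print(rmse)  # one value per output column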

sklearn_load(path_model)

Load sklearn model from hdf5 file.

Parameters:

    path_model : str (required)
        Path to the model.

Returns:

    tuple
        The sklearn model and a dict of metadata stored alongside it.

Source code in motrainer/util.py
def sklearn_load(path_model):
    """Load sklearn model from hdf5 file.

    Parameters
    ----------
    path_model : str
        Path to the model.

    Returns
    -------
    tuple
        The sklearn model and a dict of metadata stored alongside it.

    """
    with h5py.File(path_model, 'r') as f:
        if 'model' not in f.attrs:
            raise ValueError("No model found in the hdf5 file.")

        model_base64 = f.attrs['model']

        # Decode the bytes
        model_bytes = base64.b64decode(model_base64)

        # Load the model
        model = pickle.loads(model_bytes)

        meta_data = {}
        for key in f.attrs.keys():
            if key != 'model':
                meta_data[key] = f.attrs[key]


    return model, meta_data

sklearn_save(model, path_model, meta_data=None)

Save sklearn model to hdf5 file.

Parameters:

    model : model (required)
        Sklearn model to save.
    path_model : str (required)
        Path to save the model.
    meta_data : dict, optional
        A dict of metadata to save. By default None.
Source code in motrainer/util.py
def sklearn_save(model, path_model, meta_data=None):
    """Save sklearn model to hdf5 file.

    Parameters
    ----------
    model : sklearn.model
        Sklearn model to save.
    path_model : str
        Path to save the model.
    meta_data : dict, optional
        A dict of metadata to save. By default None.

    """
    model_bytes = pickle.dumps(model)

    # Encode the bytes as base64
    model_base64 = base64.b64encode(model_bytes)

    with h5py.File(path_model, 'w') as f:
        f.attrs['model'] = model_base64

        if meta_data is not None:
            for key, value in meta_data.items():
                f.attrs[key] = value
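
Example (a round-trip sketch; the file name, the fitted model, and the metadata are arbitrary choices):

import numpy as np
from sklearn.linear_model import LinearRegression
from motrainer.util import sklearn_load, sklearn_save

X = np.random.rand(20, 2)
y = X @ np.array([1.5, -0.5])
model = LinearRegression().fit(X, y)

sklearn_save(model, "linear_model.h5", meta_data={"note": "demo model"})

loaded_model, meta_data = sklearn_load("linear_model.h5")
print(loaded_model.predict(X[:3]), meta_data)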