Modin XGBoost module description

High-level Module Overview

This module holds the classes, public interface, and internal functions for distributed XGBoost in Modin.

The public classes Booster and DMatrix and the function train() provide the user with familiar XGBoost interfaces. They are located in the modin.experimental.xgboost.xgboost module.

The internal module modin.experimental.xgboost.xgboost_ray contains the implementation of Modin XGBoost for the Ray backend. It mainly consists of the Ray actor class ModinXGBoostActor, the function _assign_row_partitions_to_actors(), which distributes Modin’s partitions between actors, the internal _train() and _predict() functions used by the public interfaces, and additional utility functions for computing cluster resources, creating actors, etc.

Public interfaces

DMatrix inherits from the original class xgboost.DMatrix and overrides its constructor, which currently supports only the data and label parameters. data must be a modin.pandas.DataFrame and label a modin.pandas.DataFrame or modin.pandas.Series; both are internally unwrapped into lists of delayed objects representing Modin’s row partitions using the function unwrap_partitions().

class modin.experimental.xgboost.DMatrix(data, label)

DMatrix holds references to the partitions of a Modin DataFrame.

Unwrapping of the Modin DataFrame partitions starts at the init stage.

Parameters
  • data (modin.pandas.DataFrame) – Data source of DMatrix.

  • label (modin.pandas.DataFrame or modin.pandas.Series) – Labels used for training.

Notes

Currently DMatrix supports only data and label parameters.
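The execution flow further down extracts partitions with tuple unpacking (X_row_parts, y_row_parts = dtrain), which only requires that a DMatrix-like object yield its X partitions and then its y partitions when iterated. The class below is a hypothetical, dependency-free illustration of that protocol, not Modin’s DMatrix: the name PartitionedDMatrixSketch and its attributes are assumptions, and strings stand in for the ray.ObjectRef partitions that unwrap_partitions() would return.

```python
from typing import Iterator, List


class PartitionedDMatrixSketch:
    """Hypothetical stand-in for modin.experimental.xgboost.DMatrix.

    Holds lists of row-partition references for X and y and supports
    tuple unpacking, e.g. ``X_parts, y_parts = dmatrix``.
    """

    def __init__(self, data_parts: List[object], label_parts: List[object]):
        # In Modin these would be ray.ObjectRef objects obtained from
        # unwrap_partitions(); here any placeholder objects will do.
        self.data = data_parts
        self.label = label_parts

    def __iter__(self) -> Iterator[List[object]]:
        # Yield the X partitions first, then the y partitions.
        yield self.data
        yield self.label


dtrain = PartitionedDMatrixSketch(["x_part0", "x_part1"], ["y_part0", "y_part1"])
X_row_parts, y_row_parts = dtrain
print(X_row_parts, y_row_parts)  # prints: ['x_part0', 'x_part1'] ['y_part0', 'y_part1']
```

Implementing __iter__ as a generator keeps the object cheap to unpack: no partition is copied, only the two reference lists are handed out.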

Booster inherits from the original class xgboost.Booster and overrides the predict method. The main differences from the original predict interface are: (1) the type of the data parameter is changed to DMatrix, and (2) a new parameter, num_actors, specifies the number of actors to run for prediction.

class modin.experimental.xgboost.Booster(params=None, cache=(), model_file=None)

A Modin Booster of XGBoost.

Booster is the XGBoost model, which contains low-level routines for training, prediction, and evaluation.

Parameters
  • params (dict, optional) – Parameters for boosters.

  • cache (list, default: empty) – List of cache items.

  • model_file (string/os.PathLike/xgb.Booster/bytearray, optional) – Model to load: a path to the model file (string or os.PathLike), an xgb.Booster, or a bytearray.

predict(data: modin.experimental.xgboost.xgboost.DMatrix, num_actors: Optional[int] = None, **kwargs)

Run distributed prediction with a trained booster.

It distributes the data evenly between workers, runs xgb.predict on each worker for its subset of the data, and creates a Modin DataFrame with the prediction results.

Parameters
  • data (modin.experimental.xgboost.DMatrix) – Input data used for prediction.

  • num_actors (int, optional) – Number of actors for prediction. If unspecified, this value will be computed automatically.

  • **kwargs (dict) – Other parameters are the same as xgboost.Booster.predict.

Returns

Modin DataFrame with prediction results.

Return type

modin.pandas.DataFrame

Like the predict method of Booster, the train() function has two differences from the original xgboost.train function: (1) the data type of the dtrain parameter is DMatrix, and (2) there is a new parameter, num_actors.

modin.experimental.xgboost.train(params: Dict, dtrain: modin.experimental.xgboost.xgboost.DMatrix, *args, evals=(), num_actors: Optional[int] = None, evals_result: Optional[Dict] = None, **kwargs)

Run distributed training of XGBoost model.

It distributes dtrain evenly between workers according to the IP addresses of its partitions (if dtrain is not distributed evenly across nodes, some partitions are redistributed between nodes), runs xgb.train on each worker for its subset of dtrain, and reduces the training results of all workers using the Rabit context.

Parameters
  • params (dict) – Booster params.

  • dtrain (modin.experimental.xgboost.DMatrix) – Data to be trained against.

  • *args (iterable) – Other parameters for xgboost.train.

  • evals (list of pairs (modin.experimental.xgboost.DMatrix, str), default: empty) – List of validation sets for which metrics will be evaluated during training. Validation metrics help track the performance of the model.

  • num_actors (int, optional) – Number of actors for training. If unspecified, this value will be computed automatically.

  • evals_result (dict, optional) – Dict to store evaluation results in.

  • **kwargs (dict) – Other parameters are the same as xgboost.train.

Returns

A trained booster.

Return type

modin.experimental.xgboost.Booster

Internal execution flow on Ray backend

The internal functions _train() and _predict() work similarly to xgboost. The approximate execution flow of the internal implementation is as follows:

  1. The data is passed to the _train()/_predict() function as a DMatrix object. Lists of ray.ObjectRef objects holding the row partitions of the Modin DataFrame are extracted by iterating over the DMatrix. Example:

    # Extract lists of row partitions from dtrain (DMatrix object)
    X_row_parts, y_row_parts = dtrain
    
  2. In this step, the num_actors parameter is processed. The internal function _get_num_actors() examines the value provided by the user and checks whether it is one of the expected values (an int, “default_train”, or “default_predict”).

    • int - the value is used unchanged as num_actors.

    • “default_train” - num_actors is computed so that each actor uses at most 2 CPUs. This condition was chosen to maximize the number of parallel workers while keeping XGBoost training multithreaded (2 threads per worker).

    • “default_predict” - num_actors is computed so that each actor uses at most 8 CPUs. This condition was chosen to combine two parallelization techniques: parallel actors and parallel threads.

Note

The num_actors parameter is exposed in the public train() function and the predict method of the Booster class to allow fine-tuning for the best performance in specific use cases.
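The num_actors rules of step 2 can be sketched as a small pure function. This is a hypothetical re-implementation, not Modin’s _get_num_actors(): the name get_num_actors_sketch and the total_cpus argument are assumptions (the real function obtains cluster resources from Ray), and rounding up is this sketch’s reading of “at most N CPUs per actor”; the real code may round differently or count CPUs per node.

```python
import math
import os
from typing import Optional, Union

# Maximum CPUs per actor for the two default modes described above.
CPUS_PER_ACTOR = {"default_train": 2, "default_predict": 8}


def get_num_actors_sketch(
    num_actors: Union[int, str], total_cpus: Optional[int] = None
) -> int:
    """Hypothetical sketch of the num_actors resolution rules.

    total_cpus stands in for the cluster CPU count that the real code
    would query from Ray; it defaults to the local CPU count here.
    """
    if isinstance(num_actors, int):
        # An explicit integer is used unchanged.
        return num_actors
    if num_actors not in CPUS_PER_ACTOR:
        raise ValueError(f"Unexpected num_actors value: {num_actors!r}")
    if total_cpus is None:
        total_cpus = os.cpu_count() or 1
    # Round up so that no actor gets more than the allowed CPUs,
    # and always create at least one actor.
    return max(1, math.ceil(total_cpus / CPUS_PER_ACTOR[num_actors]))
```

For example, on a hypothetical 16-CPU cluster “default_train” yields 8 actors and “default_predict” yields 2.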

  3. A ray.util.placement_group is created to reserve all available Ray resources. ModinXGBoostActor objects are then created using the resources of this placement group.

  4. The data (dtrain for _train(), data for _predict()) is split evenly between actors. The internal function _split_data_across_actors() assigns row partitions to actors using the internal function _assign_row_partitions_to_actors(). This function creates a dictionary of the form {actor_rank: ([part_i0, part_i3, ..], [0, 3, ..]), ..} for training and {actor_rank: [part_i0, part_i1, ..], ..} for prediction.

Note

_assign_row_partitions_to_actors() takes into account the IP addresses of the row partitions of dtrain to minimize excess data transfer.

  5. For each ModinXGBoostActor, the set_train_data or set_predict_data method is called remotely. These methods load row partitions into the actor according to the partition distribution dictionary from the previous step. When the data is passed to the actor, the row partitions are automatically materialized (ray.ObjectRef -> pandas.DataFrame).

  6. The train or predict method of each ModinXGBoostActor object is called remotely.

    • train: the method runs XGBoost training on the actor’s local data, connects to the Rabit Tracker to share the training state between actors, and returns a dictionary with the booster and evaluation results.

    • predict: the method runs XGBoost prediction on the actor’s local data and returns the actor’s IP address and the partial prediction (pandas.DataFrame).

  7. In the final stage, the results from the actors are returned.

    • train: the booster and evals_result are retrieved from the remote actor using ray.get. The placement group created in step 3 is removed to free resources. A Booster object is created and returned to the user.

    • predict: ray.wait is used to wait until all actors finish computing their local predictions. The placement group created in step 3 is removed to free resources. A modin.pandas.DataFrame is created from the ray.ObjectRef objects returned by the actors and is returned to the user.
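The prediction half of this flow (loading assigned partitions, running local prediction, collecting partial results in rank order) can be simulated locally. This is a hypothetical sketch, not Modin’s implementation: ThreadPoolExecutor stands in for Ray actors, waiting on every future plays the role of ray.wait, a plain Python callable stands in for xgboost.Booster, lists of floats stand in for pandas.DataFrame partitions, and there is no placement group. ActorSketch and distributed_predict_sketch are illustrative names.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Tuple


class ActorSketch:
    """Hypothetical local stand-in for ModinXGBoostActor (no Ray, no XGBoost)."""

    def __init__(self, rank: int, ip: str):
        self.rank = rank
        self.ip = ip
        self._data: List[float] = []

    def set_predict_data(self, *X: List[float]) -> None:
        # Concatenate the row partitions assigned to this actor.
        self._data = [row for part in X for row in part]

    def predict(self, booster: Callable[[float], float]) -> Tuple[str, List[float]]:
        # Return the actor's IP together with its partial prediction.
        return self.ip, [booster(row) for row in self._data]


def distributed_predict_sketch(
    booster: Callable[[float], float],
    assignment: Dict[int, List[List[float]]],
    actors: List[ActorSketch],
) -> List[float]:
    # Step 5: hand every actor its assigned partitions.
    for rank, parts in assignment.items():
        actors[rank].set_predict_data(*parts)
    # Step 6: run local predictions concurrently.
    with ThreadPoolExecutor(max_workers=len(actors)) as pool:
        futures = [pool.submit(actor.predict, booster) for actor in actors]
        # Step 7: wait for every actor to finish.
        results = [future.result() for future in futures]
    # Stitch the partial predictions back together in actor-rank order.
    return [value for _, partial in results for value in partial]
```

Concatenating by rank restores the original row order only because prediction partitions are assigned to actors contiguously and in order.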

Internal API

class modin.experimental.xgboost.xgboost_ray.ModinXGBoostActor(rank, nthread)

Ray actor class that runs training and prediction on remote workers.

Parameters
  • rank (int) – Rank of this actor.

  • nthread (int) – Number of threads used by XGBoost in this actor.

_get_dmatrix(X_y)

Create xgboost.DMatrix from a sequence of pandas.DataFrame objects.

The first half of X_y should contain objects for X, the second half objects for y.

Parameters

X_y (list) – List of pandas.DataFrame objects.

Returns

An XGBoost DMatrix.

Return type

xgb.DMatrix
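The “first half X, second half y” convention used by _get_dmatrix (and by the *X_y arguments of the set methods below) can be sketched as follows. This is a hypothetical helper, not Modin code: the real method presumably concatenates pandas.DataFrame objects and builds an xgb.DMatrix from them, whereas here plain lists of rows stand in so the sketch runs without pandas or XGBoost.

```python
from typing import List, Sequence, Tuple

Rows = List[List[float]]


def split_x_y_sketch(X_y: Sequence[Rows]) -> Tuple[Rows, Rows]:
    """Hypothetical helper mirroring the X_y convention of _get_dmatrix.

    The first half of X_y holds X partitions and the second half holds
    the matching y partitions; each half is concatenated row-wise.
    """
    if len(X_y) % 2 != 0:
        raise ValueError("X_y must contain an even number of partitions")
    half = len(X_y) // 2
    X = [row for part in X_y[:half] for row in part]
    y = [row for part in X_y[half:] for row in part]
    return X, y
```

Passing X and y as one flat sequence lets a single remote call carry both halves, at the cost of requiring the caller to keep their partitions in matching order.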

add_eval_data(*X_y, eval_method)

Add evaluation data for the actor.

Parameters
  • *X_y (iterable) – Sequence of ray.ObjectRef objects. The first half of the sequence is for X data, the second for y. When the sequence is passed to the actor, its objects are automatically materialized (ray.ObjectRef -> pandas.DataFrame).

  • eval_method (str) – Name of eval data.

get_ip()

Get the IP address of the actor.

Returns

The IP address of the node used by the actor.

Return type

str

predict(booster: xgboost.core.Booster, **kwargs)

Run local XGBoost prediction.

Parameters
  • booster (xgboost.Booster) – A trained booster.

  • **kwargs (dict) – Other parameters for xgboost.Booster.predict.

Returns

A pair of the caller’s IP address and a pandas.DataFrame with the partial prediction result.

Return type

tuple

set_predict_data(*X)

Set prediction data for the actor.

Parameters

*X (iterable) – Sequence of ray.ObjectRef objects. When the sequence is passed to the actor, its objects are automatically materialized (ray.ObjectRef -> pandas.DataFrame).

set_train_data(*X_y, add_as_eval_method=None)

Set training data for the actor.

Parameters
  • *X_y (iterable) – Sequence of ray.ObjectRef objects. The first half of the sequence is for X data, the second for y. When the sequence is passed to the actor, its objects are automatically materialized (ray.ObjectRef -> pandas.DataFrame).

  • add_as_eval_method (str, optional) – Name of eval data. Used when the training data is also used for evaluation.

train(rabit_args, params, *args, **kwargs)

Run local XGBoost training.

Connects to the Rabit Tracker environment to share the training state between actors and trains an XGBoost booster using self._dtrain.

Parameters
  • rabit_args (list) – List with environment variables for Rabit Tracker.

  • params (dict) – Booster params.

  • *args (iterable) – Other parameters for xgboost.train.

  • **kwargs (dict) – Other parameters for xgboost.train.

Returns

A dictionary with trained booster and dict of evaluation results as {“booster”: xgb.Booster, “history”: dict}.

Return type

dict

modin.experimental.xgboost.xgboost_ray._assign_row_partitions_to_actors(actors: List, row_partitions, data_for_aligning=None, is_predict=False)

Assign row_partitions to actors.

If is_predict == False, row_partitions are assigned to actors according to their IPs. If the distribution isn’t even, partitions are moved from actors with excess partitions to actors that lack them.

If is_predict == True, row_partitions are assigned evenly to actors in order.
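The is_predict == True branch amounts to cutting the partition list into contiguous chunks of nearly equal size. The function below is a hypothetical sketch of that rule (assign_in_order_sketch is an illustrative name, and giving the remainder to the lowest ranks is an assumption); it returns the {actor_rank: [part_i0, part_i1, ..]} shape described in the execution flow.

```python
from typing import Dict, List, Sequence


def assign_in_order_sketch(
    num_actors: int, row_partitions: Sequence[object]
) -> Dict[int, List[object]]:
    """Hypothetical sketch of the is_predict=True branch.

    Splits row_partitions into num_actors contiguous chunks whose sizes
    differ by at most one, keeping the original partition order.
    """
    base, extra = divmod(len(row_partitions), num_actors)
    assignment: Dict[int, List[object]] = {}
    start = 0
    for rank in range(num_actors):
        # The first `extra` actors receive one additional partition.
        size = base + (1 if rank < extra else 0)
        assignment[rank] = list(row_partitions[start:start + size])
        start += size
    return assignment
```

Because the chunks are contiguous and follow rank order, concatenating per-actor predictions by rank reproduces the original row order without any extra bookkeeping.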

Parameters
  • actors (list) – List of used actors.

  • row_partitions (list) – Row partitions of data to assign.

  • data_for_aligning (dict, optional) – Data whose partition order row_partitions should follow when distributed. Used to align y with X.

  • is_predict (bool, default: False) – Whether the data is split for prediction.

Returns

Dictionary of partitions assigned to actors, as {actor_rank: (partitions, order)}.

Return type

dict
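The training branch (IP-aware assignment followed by rebalancing) and the alignment of y with X can be sketched in pure Python. This is a hypothetical illustration, not Modin’s algorithm: the real function works on ray.ObjectRef partitions and looks up their node IPs through Ray, whereas here each partition is paired with an IP string, and the capacity rule (lower ranks absorb the remainder) is an assumption. assign_by_ip_sketch and align_sketch are illustrative names; both return the {actor_rank: (partitions, order)} shape documented above.

```python
from collections import defaultdict
from typing import Dict, List, Sequence, Tuple

Assignment = Dict[int, Tuple[List[object], List[int]]]


def assign_by_ip_sketch(
    actor_ips: Sequence[str], row_partitions: Sequence[Tuple[object, str]]
) -> Assignment:
    """Hypothetical sketch of the is_predict=False branch.

    actor_ips[rank] is the node IP of each actor; row_partitions holds
    (partition, node_ip) pairs. Returns {actor_rank: (partitions, order)}.
    """
    n_actors = len(actor_ips)
    base, extra = divmod(len(row_partitions), n_actors)
    capacity = [base + (1 if rank < extra else 0) for rank in range(n_actors)]

    ranks_by_ip: Dict[str, List[int]] = defaultdict(list)
    for rank, ip in enumerate(actor_ips):
        ranks_by_ip[ip].append(rank)

    assigned: Dict[int, List[int]] = {rank: [] for rank in range(n_actors)}
    leftovers: List[int] = []
    # Pass 1: keep each partition on an actor on the same node, if it has room.
    for idx, (_, ip) in enumerate(row_partitions):
        for rank in ranks_by_ip.get(ip, []):
            if len(assigned[rank]) < capacity[rank]:
                assigned[rank].append(idx)
                break
        else:
            leftovers.append(idx)
    # Pass 2: move the excess partitions to actors that still lack partitions.
    # Total capacity equals the number of partitions, so leftovers run out exactly.
    for rank in range(n_actors):
        while len(assigned[rank]) < capacity[rank]:
            assigned[rank].append(leftovers.pop(0))

    return {
        rank: ([row_partitions[i][0] for i in sorted(idxs)], sorted(idxs))
        for rank, idxs in assigned.items()
    }


def align_sketch(data_for_aligning: Assignment, row_partitions: Sequence[object]) -> Assignment:
    """Give every actor the y partitions whose indices match its X partitions."""
    return {
        rank: ([row_partitions[i] for i in order], list(order))
        for rank, (_, order) in data_for_aligning.items()
    }
```

Only partitions that do not fit on a local actor are moved, which is the sketch’s version of minimizing excess data transfer; the order lists are what let y follow X exactly.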

modin.experimental.xgboost.xgboost_ray._train(dtrain, num_actors, params: Dict, *args, evals=(), **kwargs)

Run distributed training of XGBoost model on Ray backend.

It distributes dtrain evenly between workers according to the IP addresses of its partitions (if dtrain is not distributed evenly across nodes, some partitions are redistributed between nodes), runs xgb.train on each worker for its subset of dtrain, and reduces the training results of all workers using the Rabit context.

Parameters
  • dtrain (modin.experimental.xgboost.DMatrix) – Data to be trained against.

  • num_actors (int, optional) – Number of actors for training. If unspecified, this value will be computed automatically.

  • params (dict) – Booster params.

  • *args (iterable) – Other parameters for xgboost.train.

  • evals (list of pairs (modin.experimental.xgboost.DMatrix, str), default: empty) – List of validation sets for which metrics will be evaluated during training. Validation metrics will help us track the performance of the model.

  • **kwargs (dict) – Other parameters are the same as xgboost.train.

Returns

A dictionary with trained booster and dict of evaluation results as {“booster”: xgboost.Booster, “history”: dict}.

Return type

dict

modin.experimental.xgboost.xgboost_ray._predict(booster, data, num_actors, **kwargs)

Run distributed prediction with a trained booster on Ray backend.

It distributes the data evenly between workers, runs xgb.predict on each worker for its subset of the data, and creates a Modin DataFrame with the prediction results.

Parameters
  • booster (xgboost.Booster) – A trained booster.

  • data (modin.experimental.xgboost.DMatrix) – Input data used for prediction.

  • num_actors (int, optional) – Number of actors for prediction. If unspecified, this value will be computed automatically.

  • **kwargs (dict) – Other parameters are the same as xgboost.Booster.predict.

Returns

Modin DataFrame with prediction results.

Return type

modin.pandas.DataFrame

modin.experimental.xgboost.xgboost_ray._get_num_actors(num_actors)

Get the number of actors to create.

If num_actors is the string ‘default_train’, the number of actors is computed as one actor per 2 CPUs; if it is ‘default_predict’, as one actor per 8 CPUs.

Parameters

num_actors (int or {'default_train', 'default_predict'}) – Number of actors to create or str to set defaults.

Returns

Number of actors to create.

Return type

int

modin.experimental.xgboost.xgboost_ray._split_data_across_actors(actors: List, set_func, X_parts, y_parts=None, is_predict=False)

Split row partitions of data between actors.

Parameters
  • actors (list) – List of used actors.

  • set_func (callable) – The function for setting data in actor.

  • X_parts (list) – Row partitions of X data.

  • y_parts (list, optional) – Row partitions of y data.

  • is_predict (bool, default: False) – Whether the data is split for prediction.
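The hand-off performed by _split_data_across_actors() can be sketched as a loop that passes each actor its X partitions followed by the aligned y partitions, matching the *X_y convention of set_train_data (prediction passes X only). This is a hypothetical simplification: in the real function set_func wraps a remote call and the assignment is computed internally from X_parts and y_parts via _assign_row_partitions_to_actors(), whereas here precomputed per-actor partition lists are passed in, and split_data_sketch, x_assignment, and y_assignment are illustrative names.

```python
from typing import Callable, Dict, List, Optional


def split_data_sketch(
    actors: List[object],
    set_func: Callable[..., object],
    x_assignment: Dict[int, List[object]],
    y_assignment: Optional[Dict[int, List[object]]] = None,
) -> None:
    """Hypothetical sketch: hand each actor its partitions via set_func.

    For training, X partitions come first and the aligned y partitions
    second; for prediction (y_assignment is None) only X is passed.
    """
    for rank, x_parts in x_assignment.items():
        y_parts = y_assignment[rank] if y_assignment is not None else []
        # One call per actor carries all of its partitions.
        set_func(actors[rank], *x_parts, *y_parts)
```

A recording callable such as lambda actor, *args: calls.append((actor, args)) is enough to observe which partitions each actor would receive.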