Partition API in Modin

When you are working with a Modin Dataframe, you can unwrap its remote partitions to get the raw futures objects compatible with the execution engine (e.g. ray.ObjectRef for Ray). In addition to unwrapping of the remote partitions we also provide an API to construct a modin.pandas.DataFrame from raw futures objects.

Partition IPs

For finer grained placement control, Modin also provides an API to get the IP addresses of the nodes that hold each partition. You can pass the partitions having needed IPs to your function. It can help with minimazing of data movement between nodes.

unwrap_partitions

modin.distributed.dataframe.pandas.unwrap_partitions(api_layer_object, axis=None, get_ip=False)

Unwrap partitions of the api_layer_object.

Parameters:
  • api_layer_object (DataFrame or Series) – The API layer object.
  • axis (None, 0 or 1. Default is None) – The axis to unwrap partitions for (0 - row partitions, 1 - column partitions). If axis is None, the partitions are unwrapped as they are currently stored.
  • get_ip (boolean. Default is False) – Whether to get node ip address to each partition or not.
Returns:

A list of Ray.ObjectRef/Dask.Future to partitions of the api_layer_object if Ray/Dask is used as an engine.

Return type:

list

Notes

If get_ip=True, a list of tuples of Ray.ObjectRef/Dask.Future to node ip addresses and partitions of the api_layer_object, respectively, is returned if Ray/Dask is used as an engine (i.e. [(Ray.ObjectRef/Dask.Future, Ray.ObjectRef/Dask.Future), ...]).

from_partitions

modin.distributed.dataframe.pandas.from_partitions(partitions, axis)

Create DataFrame from remote partitions.

Parameters:
  • partitions (list) – A list of Ray.ObjectRef/Dask.Future to partitions depending on the engine used. Or a list of tuples of Ray.ObjectRef/Dask.Future to node ip addresses and partitions depending on the engine used (i.e. [(Ray.ObjectRef/Dask.Future, Ray.ObjectRef/Dask.Future), ...]).
  • axis (None, 0 or 1) –

    The axis parameter is used to identify what are the partitions passed. You have to set:

    • axis=0 if you want to create DataFrame from row partitions
    • axis=1 if you want to create DataFrame from column partitions
    • axis=None if you want to create DataFrame from 2D list of partitions
Returns:

DataFrame instance created from remote partitions.

Return type:

DataFrame

Example

import modin.pandas as pd
from modin.distributed.dataframe.pandas import unwrap_partitions, from_partitions
import numpy as np
data = np.random.randint(0, 100, size=(2 ** 10, 2 ** 8))
df = pd.DataFrame(data)
partitions = unwrap_partitions(df, axis=0, get_ip=True)
print(partitions)
new_df = from_partitions(partitions, axis=0)
print(new_df)

Ray engine

However, it is worth noting that for Modin on Ray engine with pandas backend IPs of the remote partitions may not match actual locations if the partitions are lower than 100 kB. Ray saves such objects (<= 100 kB, by default) in in-process store of the calling process (please, refer to Ray documentation for more information). We can’t get IPs for such objects while maintaining good performance. So, you should keep in mind this for unwrapping of the remote partitions with their IPs. Several options are provided to handle the case in How to handle Ray objects that are lower 100 kB section.

Dask engine

There is no mentioned above issue for Modin on Dask engine with pandas backend because Dask saves any objects in the worker process that processes a function (please, refer to Dask documentation for more information).

How to handle Ray objects that are lower than 100 kB

  • If you are sure that each of the remote partitions being unwrapped is higher than 100 kB, you can just import Modin or perform ray.init() manually.
  • If you don’t know partition sizes you can pass the option _system_config={"max_direct_call_object_size": <nbytes>,}, where nbytes is threshold for objects that will be stored in in-process store, to ray.init().
  • You can also start Ray as follows: ray start --head --system-config='{"max_direct_call_object_size":<nbytes>}'.

Note that when specifying the threshold the performance of some Modin operations may change.