PandasDataframePartitionManager
The class is the base for any partition manager class of the pandas storage format and serves as an intermediate level between the Modin PandasDataframe and the conforming partition class.
The class is responsible for manipulating partitions and applying a function to individual partitions: block partitions, row partitions, or column partitions. That is, the class can form axis partitions from block partitions to apply a function when an operation requires access to an entire column or row.
The class translates the frame API into the partition API and can also perform some preprocessing operations, depending on the partition type, to improve performance (for example, preprocess_func()).
The main task of the partition manager is to keep knowledge of how partitions are stored and managed internal to itself, so that surrounding code can use it through a lean API without worrying about implementation details.
The partition manager can apply a user-passed (arbitrary) function in different modes:
- block-wise (apply a function to individual block partitions):
  - optionally accepting partition indices along each axis
  - optionally accepting an item to be split so that parts of it are sent to each partition
- along a full axis (apply a function to an entire column or row made up of block partitions when the user function needs information about the whole axis)
It can also broadcast partitions from right to left when executing certain operations, making the right partitions available to functions executed where the left partitions live.
The partition manager is also used to create “logical” partitions, or axis partitions, by joining existing partitions along a specified axis (either rows or columns), and to concatenate different partition sets along a given axis.
It also maintains the mapping from “external” (user-visible) indices along all axes to internal indices, which are pairs of a partition index and an index inside that partition. It also manages conversion to NumPy and pandas representations.
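A minimal sketch of the external-to-internal index mapping in plain Python, assuming only the row lengths of the partitions are known. The helpers global_to_internal and group_by_partition are hypothetical illustrations, not part of Modin's API.

```python
from bisect import bisect_right
from itertools import accumulate


# Hypothetical toy helpers (not Modin's API)
def global_to_internal(row_lengths, global_idx):
    """Map a user-visible row position to (partition index, position inside it)."""
    # Cumulative end offsets of the partitions, e.g. [3, 2, 4] -> [3, 5, 9]
    ends = list(accumulate(row_lengths))
    if not 0 <= global_idx < ends[-1]:
        raise IndexError(f"position {global_idx} is out of bounds")
    # The first partition whose end offset is greater than global_idx owns it
    part = bisect_right(ends, global_idx)
    start = ends[part - 1] if part > 0 else 0
    return part, global_idx - start


def group_by_partition(row_lengths, global_indices):
    """Group user-visible positions into {partition index: [internal positions]}."""
    grouped = {}
    for global_idx in global_indices:
        part, internal = global_to_internal(row_lengths, global_idx)
        grouped.setdefault(part, []).append(internal)
    return grouped


print(global_to_internal([3, 2, 4], 5))             # (2, 0)
print(group_by_partition([3, 2, 4], [0, 4, 5, 8]))  # {0: [0], 1: [1], 2: [0, 3]}
```

The grouped form mirrors the shape of the per-partition indices consumed by methods such as apply_func_to_select_indices.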
Public API
- class modin.core.dataframe.pandas.partitioning.partition_manager.PandasDataframePartitionManager
Base class for managing the dataframe data layout and operators across the distribution of partitions.
The partition class is the class used to store each partition. Each partition must extend the PandasDataframePartition class.
- classmethod apply_func_to_indices_both_axis(partitions, func, row_partitions_list, col_partitions_list, item_to_distribute=_NoDefault.no_default, row_lengths=None, col_widths=None)
Apply a function along both axes.
- Parameters:
partitions (np.ndarray) – The partitions to which the func will apply.
func (callable) – The function to apply.
row_partitions_list (iterable of tuples) – Iterable of tuples, each containing 2 values:
  1. Integer row partition index.
  2. Internal row indexer of this partition.
col_partitions_list (iterable of tuples) – Iterable of tuples, each containing 2 values:
  1. Integer column partition index.
  2. Internal column indexer of this partition.
item_to_distribute (np.ndarray or scalar, default: no_default) – The item to split up so it can be applied over both axes.
row_lengths (list of ints, optional) – Lengths of partitions for every row. If not specified, this information is extracted from the partitions themselves.
col_widths (list of ints, optional) – Widths of partitions for every column. If not specified, this information is extracted from the partitions themselves.
- Returns:
A NumPy array with partitions.
- Return type:
np.ndarray
Notes
For your func to operate directly on the indices provided, it must accept row_internal_indices and col_internal_indices as keyword arguments.
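A minimal sketch of the both-axis application in plain Python, assuming a toy model where a block is a list of rows and the partition grid is a list of lists of blocks. The names apply_to_indices_both_axis and negate_cells are hypothetical, and item_to_distribute, row_lengths and col_widths are omitted. It shows how the selected blocks receive their internal indexers through the row_internal_indices and col_internal_indices keyword arguments, while unselected blocks are kept.

```python
# Hypothetical toy helpers (not Modin's API)
def apply_to_indices_both_axis(grid, func, row_partitions_list, col_partitions_list):
    """Apply func to the intersection of the selected row and column partitions."""
    # Shallow-copy the grid so that blocks that are not selected are kept unchanged
    result = [list(grid_row) for grid_row in grid]
    for row_idx, row_internal in row_partitions_list:
        for col_idx, col_internal in col_partitions_list:
            result[row_idx][col_idx] = func(
                grid[row_idx][col_idx],
                row_internal_indices=row_internal,
                col_internal_indices=col_internal,
            )
    return result


def negate_cells(block, row_internal_indices, col_internal_indices):
    """User function: negate only the requested cells of one block."""
    out = [list(row) for row in block]
    for r in row_internal_indices:
        for c in col_internal_indices:
            out[r][c] = -out[r][c]
    return out


# A 2x2 grid of 2x2 blocks
grid = [
    [[[1, 2], [3, 4]], [[5, 6], [7, 8]]],
    [[[9, 10], [11, 12]], [[13, 14], [15, 16]]],
]
result = apply_to_indices_both_axis(
    grid, negate_cells, row_partitions_list=[(0, [1]), (1, [0])], col_partitions_list=[(1, [0])]
)
print(result[0][1], result[1][1])  # [[5, 6], [-7, 8]] [[-13, 14], [15, 16]]
```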
- classmethod apply_func_to_select_indices(axis, partitions, func, indices, keep_remaining=False)
Apply a function to select indices.
- Parameters:
axis ({0, 1}) – Axis to apply the func over.
partitions (np.ndarray) – The partitions to which the func will apply.
func (callable) – The function to apply to these indices of partitions.
indices (dict) – The indices to apply the function to.
keep_remaining (bool, default: False) – Whether or not to keep the other partitions. Some operations may want to drop the remaining partitions and keep only the results.
- Returns:
A NumPy array with partitions.
- Return type:
np.ndarray
Notes
Your internal function must accept an internal_indices keyword argument for this to work correctly. This prevents information leakage of the internal index to the external representation.
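A minimal sketch of selective application along one axis in plain Python, assuming that indices maps a partition index to the internal indices inside that partition, and that each block is a flat list of values. The names apply_to_select_indices and double_at are hypothetical. The sketch shows the effect of keep_remaining: with True the untouched blocks are passed through, with False only the results for the selected partitions are returned.

```python
# Hypothetical toy helpers (not Modin's API)
def apply_to_select_indices(blocks, func, indices, keep_remaining=False):
    """blocks: partitions along one axis; indices: {partition index: internal indices}."""
    if keep_remaining:
        # Touched blocks are replaced, the others are passed through as-is
        return [
            func(block, internal_indices=indices[i]) if i in indices else block
            for i, block in enumerate(blocks)
        ]
    # Only the results for the selected partitions are kept
    return [func(blocks[i], internal_indices=internal) for i, internal in indices.items()]


def double_at(block, internal_indices):
    """User function: double the values at the given positions inside one block."""
    return [v * 2 if pos in internal_indices else v for pos, v in enumerate(block)]


blocks = [[1, 2, 3], [4, 5], [6, 7, 8]]
selected = {0: [2], 2: [0, 1]}
print(apply_to_select_indices(blocks, double_at, selected, keep_remaining=True))
# [[1, 2, 6], [4, 5], [12, 14, 8]]
print(apply_to_select_indices(blocks, double_at, selected))
# [[1, 2, 6], [12, 14, 8]]
```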
- classmethod apply_func_to_select_indices_along_full_axis(axis, partitions, func, indices, keep_remaining=False)
Apply a function to a select subset of full columns/rows.
- Parameters:
axis ({0, 1}) – The axis to apply the function over.
partitions (np.ndarray) – The partitions to which the func will apply.
func (callable) – The function to apply.
indices (list-like) – The global indices to apply the func to.
keep_remaining (bool, default: False) – Whether or not to keep the other partitions. Some operations may want to drop the remaining partitions and keep only the results.
- Returns:
A NumPy array with partitions.
- Return type:
np.ndarray
Notes
This should be used when you need to apply a function that relies on some global information for the entire column/row but only need to apply it to a subset. For your func to operate directly on the indices provided, it must accept internal_indices as a keyword argument.
- classmethod axis_partition(partitions, axis, full_axis: bool = True)
Logically partition along the given axis (columns or rows).
- Parameters:
partitions (list-like) – List of partitions to be combined.
axis ({0, 1}) – 0 for column partitions, 1 for row partitions.
full_axis (bool, default: True) – Whether or not this partition contains the entire column axis.
- Returns:
A list of BaseDataframeAxisPartition objects.
- Return type:
list
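A minimal sketch of forming axis partitions from a 2D grid of blocks in plain Python, assuming a block is a list of rows. A column partition groups the blocks of one grid column and a row partition groups the blocks of one grid row. The names axis_partition, stack_vertically and join_horizontally are hypothetical, and the axis partitions here are plain lists rather than BaseDataframeAxisPartition objects.

```python
# Hypothetical toy helpers (not Modin's API)
def axis_partition(grid, axis):
    """axis=0 -> column partitions (one per grid column); axis=1 -> row partitions."""
    if axis == 0:
        return [[grid_row[j] for grid_row in grid] for j in range(len(grid[0]))]
    return [list(grid_row) for grid_row in grid]


def stack_vertically(blocks):
    """Materialize a column partition: concatenate the rows of its blocks."""
    return [row for block in blocks for row in block]


def join_horizontally(blocks):
    """Materialize a row partition: glue the blocks' rows side by side."""
    return [[v for block in blocks for v in block[r]] for r in range(len(blocks[0]))]


grid = [
    [[[1, 2], [3, 4]], [[5, 6], [7, 8]]],
    [[[9, 10], [11, 12]], [[13, 14], [15, 16]]],
]
print(stack_vertically(axis_partition(grid, 0)[1]))   # [[5, 6], [7, 8], [13, 14], [15, 16]]
print(join_horizontally(axis_partition(grid, 1)[0]))  # [[1, 2, 5, 6], [3, 4, 7, 8]]
```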
- classmethod base_broadcast_apply(axis, apply_func, left, right)
Broadcast the right partitions to the left partitions and apply apply_func.
- Parameters:
axis ({0, 1}) – Axis to apply and broadcast over.
apply_func (callable) – Function to apply.
left (np.ndarray) – NumPy array of left partitions.
right (np.ndarray) – NumPy array of right partitions.
- Returns:
NumPy array of result partition objects.
- Return type:
np.ndarray
Notes
This will often be overridden by implementations. It materializes the entire right partitions and applies them to the left partitions through apply.
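A minimal sketch of this broadcast in plain Python, assuming a block is a list of rows. For axis=0, every left block in grid row i receives the whole right row partition i, and for axis=1, every left block in grid column j receives the whole right column partition j. The helper names and the subtract_row_value user function are hypothetical, and everything runs eagerly in a single process.

```python
# Hypothetical toy helpers (not Modin's API)
def join_horizontally(blocks):
    """Glue the rows of blocks that sit next to each other in one grid row."""
    return [[v for block in blocks for v in block[r]] for r in range(len(blocks[0]))]


def stack_vertically(blocks):
    """Concatenate the rows of blocks that sit in one grid column."""
    return [row for block in blocks for row in block]


def broadcast_apply(axis, apply_func, left, right):
    if axis == 0:
        # Left blocks in grid row i receive right's whole row partition i
        others = [join_horizontally(right_row) for right_row in right]
        return [[apply_func(block, others[i]) for block in left_row]
                for i, left_row in enumerate(left)]
    # axis == 1: left blocks in grid column j receive right's whole column partition j
    others = [stack_vertically([right_row[j] for right_row in right])
              for j in range(len(right[0]))]
    return [[apply_func(block, others[j]) for j, block in enumerate(left_row)]
            for left_row in left]


def subtract_row_value(block, other):
    """User function: subtract a per-row scalar (held by `other`) from every cell."""
    return [[v - other[r][0] for v in row] for r, row in enumerate(block)]


left = [[[[11, 12], [21, 22]], [[13], [23]]]]  # one grid row made of two blocks
right = [[[[10], [20]]]]                         # one block with a single column
print(broadcast_apply(0, subtract_row_value, left, right))
# [[[[1, 2], [1, 2]], [[3], [3]]]]
```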
- classmethod base_map_partitions(partitions, map_func, func_args=None, func_kwargs=None)
Apply map_func to every partition in partitions.
- Parameters:
partitions (NumPy 2D array) – Partitions housing the data of Modin Frame.
map_func (callable) – Function to apply.
func_args (iterable, optional) – Positional arguments for the ‘map_func’.
func_kwargs (dict, optional) – Keyword arguments for the ‘map_func’.
- Returns:
An array of partitions
- Return type:
NumPy array
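A minimal sketch of block-wise mapping in plain Python, assuming a block is a list of rows. It shows how func_args and func_kwargs are forwarded to map_func for every block. The names map_partitions and scale are hypothetical, and the blocks are processed sequentially here rather than on a distributed engine.

```python
# Hypothetical toy helpers (not Modin's API)
def map_partitions(grid, map_func, func_args=None, func_kwargs=None):
    """Apply map_func independently to every block of the grid."""
    args = tuple(func_args) if func_args is not None else ()
    kwargs = dict(func_kwargs) if func_kwargs is not None else {}
    # Blocks do not see each other, so each call could run in parallel
    return [[map_func(block, *args, **kwargs) for block in grid_row] for grid_row in grid]


def scale(block, factor, offset=0):
    """User function: multiply every cell by factor and add offset."""
    return [[v * factor + offset for v in row] for row in block]


grid = [[[[1, 2]], [[3]]], [[[4, 5]], [[6]]]]
print(map_partitions(grid, scale, func_args=[10], func_kwargs={"offset": 1}))
# [[[[11, 21]], [[31]]], [[[41, 51]], [[61]]]]
```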
- classmethod broadcast_apply(axis, apply_func, left, right)
Broadcast the right partitions to the left partitions and apply apply_func, using different approaches to achieve the best performance.
- Parameters:
axis ({0, 1}) – Axis to apply and broadcast over.
apply_func (callable) – Function to apply.
left (np.ndarray) – NumPy array of left partitions.
right (np.ndarray) – NumPy array of right partitions.
- Returns:
NumPy array of result partition objects.
- Return type:
np.ndarray
- classmethod broadcast_apply_select_indices(axis, apply_func, left, right, left_indices, right_indices, keep_remaining=False)
Broadcast the right partitions to the left partitions and apply apply_func to the selected indices.
- Parameters:
axis ({0, 1}) – Axis to apply and broadcast over.
apply_func (callable) – Function to apply.
left (NumPy 2D array) – Left partitions.
right (NumPy 2D array) – Right partitions.
left_indices (list-like) – Indices to apply function to.
right_indices (dictionary of indices of right partitions) – Indices of the right partitions to bring to each specified left partition. For example, the dict {key: {key1: [0, 1], key2: [5]}} means that partitions [right[key1], right[key2]] are broadcast to left[key], and the internal indices for right must be [[0, 1], [5]].
keep_remaining (bool, default: False) – Whether or not to keep the other partitions. Some operations may want to drop the remaining partitions and keep only the results.
- Returns:
An array of partition objects.
- Return type:
NumPy array
Notes
Your internal function must accept the keyword arguments internal_indices, other, and internal_other_indices to work correctly.
- classmethod broadcast_axis_partitions(axis, apply_func, left, right, keep_partitioning=False, num_splits=None, apply_indices=None, broadcast_all=True, enumerate_partitions=False, lengths=None, apply_func_args=None, **kwargs)
Broadcast the right partitions to the left partitions and apply apply_func along the full axis.
- Parameters:
axis ({0, 1}) – Axis to apply and broadcast over.
apply_func (callable) – Function to apply.
left (NumPy 2D array) – Left partitions.
right (NumPy 2D array) – Right partitions.
keep_partitioning (boolean, default: False) – The flag to keep partition boundaries for Modin Frame if possible. Setting it to True disables shuffling data from one partition to another in case the resulting number of splits is equal to the initial number of splits.
num_splits (int, optional) – The number of partitions to split the result into across the axis. If None, then the number of splits will be inferred automatically. If num_splits is None and keep_partitioning=True, then the number of splits is preserved.
apply_indices (list of ints, default: None) – Indices along the other axis (axis ^ 1) to apply the function over.
broadcast_all (bool, default: True) – Whether or not to pass all right axis partitions to each of the left axis partitions.
enumerate_partitions (bool, default: False) – Whether or not to pass partition index into apply_func. Note that apply_func must be able to accept partition_idx kwarg.
lengths (list of ints, default: None) – The list of lengths to shuffle the object. Note:
  1. Passing lengths causes num_splits to be ignored, as the number of splits is inferred from the number of integers in lengths.
  2. When passing lengths, you must explicitly specify keep_partitioning=False.
apply_func_args (list-like, optional) – Positional arguments to pass to the func.
**kwargs (dict) – Additional options that could be used by different engines.
- Returns:
An array of partition objects.
- Return type:
NumPy array
- classmethod column_partitions(partitions, full_axis=True)
Get the list of BaseDataframeAxisPartition objects representing column-wise partitions.
- Parameters:
partitions (list-like) – List of (smaller) partitions to be combined to column-wise partitions.
full_axis (bool, default: True) – Whether or not this partition contains the entire column axis.
- Returns:
A list of BaseDataframeAxisPartition objects.
- Return type:
list
Notes
Each value in this list will be a BaseDataframeAxisPartition object. BaseDataframeAxisPartition is located in axis_partition.py.
- classmethod combine(partitions, new_index=None, new_columns=None)
Convert a NumPy 2D array of partitions to a NumPy 2D array of a single partition.
- Parameters:
partitions (np.ndarray) – The partitions which have to be converted to a single partition.
new_index (pandas.Index, optional) – Index for propagation into internal partitions. This optimization allows doing it in one remote kernel.
new_columns (pandas.Index, optional) – Columns for propagation into internal partitions. This optimization allows doing it in one remote kernel.
- Returns:
A NumPy 2D array of a single partition.
- Return type:
np.ndarray
- classmethod concat(axis, left_parts, right_parts)
Concatenate the blocks of partitions with another set of blocks.
- Parameters:
axis (int) – The axis to concatenate along.
left_parts (np.ndarray) – NumPy array of partitions to concatenate with.
right_parts (np.ndarray or list) – NumPy array of partitions to be concatenated.
- Returns:
np.ndarray – A new NumPy array with concatenated partitions.
list[int] or None – Row lengths if possible to compute it.
Notes
Assumes that the blocks are already the same shape on the dimension being concatenated. A ValueError is raised if this condition is not met.
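A minimal sketch of grid concatenation in plain Python, assuming right_parts is a list of grids and each block is an opaque placeholder (a string here). It shows the shape check described above: along axis 0 all grids need the same number of column partitions, along axis 1 the same number of row partitions. The concat function is a hypothetical stand-in, and it does not compute the row lengths that the real method may return.

```python
# Hypothetical toy helper (not Modin's API)
def concat(axis, left_parts, right_parts):
    """Concatenate partition grids; right_parts is a list of grids in this sketch."""
    grids = [left_parts] + list(right_parts)
    if axis == 0:
        # Stacking grid rows requires the same number of column partitions
        if len({len(grid[0]) for grid in grids}) != 1:
            raise ValueError("grids differ in the number of column partitions")
        return [list(grid_row) for grid in grids for grid_row in grid]
    # Appending blocks to each grid row requires the same number of row partitions
    if len({len(grid) for grid in grids}) != 1:
        raise ValueError("grids differ in the number of row partitions")
    return [[block for grid in grids for block in grid[i]] for i in range(len(left_parts))]


print(concat(0, [["a", "b"]], [[["c", "d"]]]))  # [['a', 'b'], ['c', 'd']]
print(concat(1, [["a", "b"]], [[["c", "d"]]]))  # [['a', 'b', 'c', 'd']]
```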
- classmethod create_partition_from_metadata(dtypes: Optional[Series] = None, **metadata)
Create NumPy array of partitions that holds an empty dataframe with given metadata.
- Parameters:
dtypes (pandas.Series, optional) – Column dtypes. When creating a pandas DataFrame from metadata, astype is called, since pandas doesn’t allow passing a list of dtypes directly to the constructor.
**metadata (dict) – Metadata that has to be wrapped in a partition.
- Returns:
A NumPy 2D array of a single partition which contains the data.
- Return type:
np.ndarray
- classmethod finalize(partitions)
Perform all deferred calls on partitions.
- Parameters:
partitions (np.ndarray) – Partitions of Modin Dataframe on which all deferred calls should be performed.
- classmethod from_arrow(at, return_dims=False)
Return the partitions from Apache Arrow (PyArrow).
- Parameters:
at (pyarrow.table) – Arrow Table.
return_dims (bool, default: False) – If it’s True, return as (np.ndarray, row_lengths, col_widths), else np.ndarray.
- Returns:
A NumPy array with partitions (with dimensions or not).
- Return type:
(np.ndarray, backend) or (np.ndarray, backend, row_lengths, col_widths)
- classmethod from_pandas(df, return_dims=False)
Return the partitions from pandas.DataFrame.
- Parameters:
df (pandas.DataFrame) – A pandas.DataFrame.
return_dims (bool, default: False) – If it’s True, return as (np.ndarray, row_lengths, col_widths), else np.ndarray.
- Returns:
A NumPy array with partitions (with dimensions or not).
- Return type:
(np.ndarray, backend) or (np.ndarray, backend, row_lengths, col_widths)
- classmethod get_indices(axis, partitions, index_func=None)
Get the internal indices stored in the partitions.
- Parameters:
axis ({0, 1}) – Axis to extract the labels over.
partitions (np.ndarray) – NumPy array with PandasDataframePartition’s.
index_func (callable, default: None) – The function to be used to extract the indices.
- Returns:
pandas.Index – A pandas Index object.
list of pandas.Index – The list of internal indices for each partition.
Notes
These are the global indices of the object. This is mostly useful when you have deleted rows/columns internally, but do not know which ones were deleted.
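A minimal sketch of label extraction in plain Python, assuming each block is a dict holding its own "index" and "columns" labels. Row labels are read from the blocks of the first grid column and column labels from the blocks of the first grid row, then concatenated into the global labels. The names get_indices and make_block are hypothetical, and plain lists stand in for pandas.Index objects.

```python
# Hypothetical toy helpers (not Modin's API)
def get_indices(axis, grid, index_func=None):
    """Return (global labels, per-partition labels) along the given axis."""
    if index_func is None:
        key = "index" if axis == 0 else "columns"
        index_func = lambda block: block[key]
    # Row labels come from the first grid column, column labels from the first grid row
    target = [grid_row[0] for grid_row in grid] if axis == 0 else grid[0]
    internal = [list(index_func(block)) for block in target]
    total = [label for labels in internal for label in labels]
    return total, internal


def make_block(index, columns):
    return {"index": index, "columns": columns}


grid = [
    [make_block(["r0", "r1"], ["a"]), make_block(["r0", "r1"], ["b", "c"])],
    [make_block(["r2"], ["a"]), make_block(["r2"], ["b", "c"])],
]
print(get_indices(0, grid))  # (['r0', 'r1', 'r2'], [['r0', 'r1'], ['r2']])
print(get_indices(1, grid))  # (['a', 'b', 'c'], [['a'], ['b', 'c']])
```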
- classmethod get_objects_from_partitions(partitions)
Get the objects wrapped by partitions (in parallel if supported).
- Parameters:
partitions (np.ndarray) – NumPy array with PandasDataframePartition-s.
- Returns:
The objects wrapped by partitions.
- Return type:
list
- classmethod groupby_reduce(axis, partitions, by, map_func, reduce_func, apply_indices=None)
Group data using the provided map_func along the axis over the partitions, then reduce using reduce_func.
- Parameters:
axis ({0, 1}) – Axis to groupby over.
partitions (NumPy 2D array) – Partitions of the ModinFrame to groupby.
by (NumPy 2D array) – Partitions of ‘by’ to broadcast.
map_func (callable) – Map function.
reduce_func (callable) – Reduce function.
apply_indices (list of ints, default: None) – Indices along the other axis (axis ^ 1) to apply the function over.
- Returns:
Partitions with applied groupby.
- Return type:
NumPy array
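A minimal sketch of the map-then-reduce pattern behind a groupby, in plain Python. It assumes each row block is a list of (key, value) pairs and computes a per-key sum. The map phase runs on every block independently and the reduce phase combines the partial results across the whole axis. The names map_sum, reduce_sum and groupby_reduce are hypothetical, and the broadcasting of by partitions is left out.

```python
from collections import Counter


# Hypothetical toy helpers (not Modin's API)
def map_sum(block):
    """Map phase: partial sums per key, computed on one block independently."""
    partial = Counter()
    for key, value in block:
        partial[key] += value
    return partial


def reduce_sum(partials):
    """Reduce phase: merge the partial results of the whole axis."""
    total = Counter()
    for partial in partials:
        total.update(partial)  # Counter.update adds counts instead of replacing them
    return dict(total)


def groupby_reduce(row_blocks, map_func, reduce_func):
    return reduce_func([map_func(block) for block in row_blocks])


row_blocks = [[("a", 1), ("b", 2)], [("a", 3)], [("b", 4), ("c", 5)]]
print(groupby_reduce(row_blocks, map_sum, reduce_sum))  # {'a': 4, 'b': 6, 'c': 5}
```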
- classmethod lazy_map_partitions(partitions, map_func, func_args=None, func_kwargs=None, enumerate_partitions=False)
Apply map_func to every partition in partitions lazily.
- Parameters:
partitions (NumPy 2D array) – Partitions of Modin Frame.
map_func (callable) – Function to apply.
func_args (iterable, optional) – Positional arguments for the ‘map_func’.
func_kwargs (dict, optional) – Keyword arguments for the ‘map_func’.
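enumerate_partitions (bool, default: False) – Whether or not to pass the partition index into map_func. Note that map_func must be able to accept the partition_idx kwarg.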
- Returns:
An array of partitions
- Return type:
NumPy array
- classmethod map_axis_partitions(axis, partitions, map_func, keep_partitioning=False, num_splits=None, lengths=None, enumerate_partitions=False, **kwargs)
Apply map_func to every partition in partitions along given axis.
- Parameters:
axis ({0, 1}) – Axis to perform the map across (0 - index, 1 - columns).
partitions (NumPy 2D array) – Partitions of Modin Frame.
map_func (callable) – Function to apply.
keep_partitioning (boolean, default: False) – The flag to keep partition boundaries for Modin Frame if possible. Setting it to True disables shuffling data from one partition to another in case the resulting number of splits is equal to the initial number of splits.
num_splits (int, optional) – The number of partitions to split the result into across the axis. If None, then the number of splits will be inferred automatically. If num_splits is None and keep_partitioning=True, then the number of splits is preserved.
lengths (list of ints, default: None) – The list of lengths to shuffle the object. Note:
  1. Passing lengths causes num_splits to be ignored, as the number of splits is inferred from the number of integers in lengths.
  2. When passing lengths, you must explicitly specify keep_partitioning=False.
enumerate_partitions (bool, default: False) – Whether or not to pass partition index into map_func. Note that map_func must be able to accept partition_idx kwarg.
**kwargs (dict) – Additional options that could be used by different engines.
- Returns:
An array of new partitions for Modin Frame.
- Return type:
NumPy array
Notes
This method should be used in the case when map_func relies on some global information about the axis.
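A minimal sketch of a full-axis map in plain Python, covering axis=0 only and assuming a block is a list of rows. Each column partition is materialized by stacking its blocks, the user function (a running total, which needs the whole column) is applied, and the result is split back into num_splits row blocks. The helper names are hypothetical. In this sketch num_splits=None keeps the original number of row partitions, and chunks are split as evenly as possible, which may differ from Modin's own chunking.

```python
# Hypothetical toy helpers (not Modin's API)
def split_rows(rows, num_splits):
    """Split rows into num_splits chunks whose sizes differ by at most one."""
    q, r = divmod(len(rows), num_splits)
    chunks, start = [], 0
    for i in range(num_splits):
        size = q + (1 if i < r else 0)
        chunks.append(rows[start:start + size])
        start += size
    return chunks


def cumsum(rows):
    """User function that needs the whole column: running totals per column."""
    out, running = [], None
    for row in rows:
        running = list(row) if running is None else [a + b for a, b in zip(running, row)]
        out.append(running)
    return out


def map_axis_partitions_axis0(grid, map_func, num_splits=None):
    """Sketch of axis=0 only: apply map_func to each full column partition."""
    if num_splits is None:
        num_splits = len(grid)  # keep the original number of row partitions
    result = [[None] * len(grid[0]) for _ in range(num_splits)]
    for j in range(len(grid[0])):
        # Materialize the full column partition by stacking its blocks
        full_column = [row for grid_row in grid for row in grid_row[j]]
        for i, chunk in enumerate(split_rows(map_func(full_column), num_splits)):
            result[i][j] = chunk
    return result


grid = [[[[1], [2]]], [[[3]]]]  # two row partitions of one column partition
print(map_axis_partitions_axis0(grid, cumsum))                # [[[[1], [3]]], [[[6]]]]
print(map_axis_partitions_axis0(grid, cumsum, num_splits=3))  # [[[[1]]], [[[3]]], [[[6]]]]
```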
- classmethod map_partitions(partitions, map_func, func_args=None, func_kwargs=None)
Apply map_func to partitions using different approaches to achieve the best performance.
- Parameters:
partitions (NumPy 2D array) – Partitions housing the data of Modin Frame.
map_func (callable) – Function to apply.
func_args (iterable, optional) – Positional arguments for the ‘map_func’.
func_kwargs (dict, optional) – Keyword arguments for the ‘map_func’.
- Returns:
An array of partitions
- Return type:
NumPy array
- classmethod map_partitions_joined_by_column(partitions, column_splits, map_func, map_func_args=None, map_func_kwargs=None)
Combine several blocks by column into one virtual partition and apply “map_func” to them.
- Parameters:
partitions (NumPy 2D array) – Partitions of Modin Frame.
column_splits (int) – The number of splits by column.
map_func (callable) – Function to apply.
map_func_args (iterable, optional) – Positional arguments for the ‘map_func’.
map_func_kwargs (dict, optional) – Keyword arguments for the ‘map_func’.
- Returns:
An array of new partitions for Modin Frame.
- Return type:
NumPy array
- classmethod materialize_futures(input_list)
Materialize all futures in the input list.
- Parameters:
input_list (list) – The list that has to be manipulated.
- Returns:
A new list with materialized objects.
- Return type:
list
- classmethod n_ary_operation(left, func, right: list)
Apply an n-ary operation to multiple PandasDataframe objects.
This method assumes that all the partitions of the dataframes in left and right have the same dimensions. For each position i, j in each dataframe’s partitions, the result has a partition at (i, j) whose data is func(left_partitions[i,j], *each_right_partitions[i,j]).
- Parameters:
left (np.ndarray) – The partitions of the left PandasDataframe.
func (callable) – The function to apply.
right (list of np.ndarray) – The list of partitions of the other PandasDataframe objects.
- Returns:
A NumPy array with new partitions.
- Return type:
np.ndarray
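A minimal sketch of the positional n-ary combination in plain Python, assuming all grids have the same shape. For brevity each "block" is a single number rather than a partition. The function n_ary_operation below is a hypothetical stand-in that calls func(left[i][j], *(other[i][j] for other in right)) at every position, matching the formula above.

```python
# Hypothetical toy helper (not Modin's API)
def n_ary_operation(left, func, right):
    """Combine same-shaped partition grids position by position."""
    for other in right:
        if len(other) != len(left) or any(len(a) != len(b) for a, b in zip(left, other)):
            raise ValueError("all partition grids must have the same shape")
    return [
        [func(left[i][j], *(other[i][j] for other in right)) for j in range(len(left[i]))]
        for i in range(len(left))
    ]


# Each "block" is a single number here to keep the output short
left = [[1, 2], [3, 4]]
right = [[[10, 20], [30, 40]], [[100, 200], [300, 400]]]
print(n_ary_operation(left, lambda a, *others: a + sum(others), right))
# [[111, 222], [333, 444]]
```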
- classmethod preprocess_func(map_func)
Preprocess a function to be applied to PandasDataframePartition objects.
- Parameters:
map_func (callable) – The function to be preprocessed.
- Returns:
The preprocessed version of the map_func provided.
- Return type:
callable
Notes
Preprocessing does not require any specific format, only that the PandasDataframePartition.apply method will recognize it (for the subclass being used).
If your PandasDataframePartition objects assume that a provided function is serialized, wrapped, or in some other format, this is the place to add that logic. This method can also simply return map_func if the apply method of the PandasDataframePartition object you are using does not require any modification to a given function.
- classmethod rebalance_partitions(partitions)
Rebalance a 2-d array of partitions if we are using PandasOnRay or PandasOnDask executions.
For all other executions, the partitions are returned unchanged.
Rebalance the partitions by building a new array of partitions out of the original ones so that:
If all partitions have a length, each new partition has roughly the same number of rows.
Otherwise, each new partition spans roughly the same number of old partitions.
- Parameters:
partitions (np.ndarray) – The 2-d array of partitions to rebalance.
- Returns:
np.ndarray – A NumPy array with either the same or new, rebalanced partitions, depending on the execution engine and storage format.
list[int] or None – Row lengths if possible to compute it.
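A minimal sketch of the first rebalancing rule (all lengths known, so each new partition gets roughly the same number of rows), in plain Python. It assumes each row block is a list of rows and that the target number of partitions is passed in as num_partitions, which is a hypothetical parameter. The rebalance function is illustrative and does not reproduce how Modin chooses the target count or handles unknown lengths.

```python
# Hypothetical toy helper (not Modin's API)
def rebalance(row_blocks, num_partitions):
    """Rebuild row blocks so that each holds roughly the same number of rows."""
    rows = [row for block in row_blocks for row in block]
    q, r = divmod(len(rows), num_partitions)
    new_blocks, start = [], 0
    for i in range(num_partitions):
        size = q + (1 if i < r else 0)  # sizes differ by at most one row
        new_blocks.append(rows[start:start + size])
        start += size
    # Also return the new row lengths, as the real method does when it can
    return new_blocks, [len(block) for block in new_blocks]


skewed = [[1], [2], [3], [4, 5, 6, 7, 8, 9, 10]]
print(rebalance(skewed, 3))
# ([[1, 2, 3, 4], [5, 6, 7], [8, 9, 10]], [4, 3, 3])
```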
- classmethod row_partitions(partitions)
Get the list of BaseDataframeAxisPartition objects representing row-wise partitions.
- Parameters:
partitions (list-like) – List of (smaller) partitions to be combined to row-wise partitions.
- Returns:
A list of BaseDataframeAxisPartition objects.
- Return type:
list
Notes
Each value in this list will be a BaseDataframeAxisPartition object. BaseDataframeAxisPartition is located in axis_partition.py.
- classmethod shuffle_partitions(partitions, index, shuffle_functions: ShuffleFunctions, final_shuffle_func, right_partitions=None)
Return shuffled partitions.
- Parameters:
partitions (np.ndarray) – The 2-d array of partitions to shuffle.
index (int or list of ints) – The index(es) of the column partitions corresponding to the partitions that contain the column to sample.
shuffle_functions (ShuffleFunctions) – An object implementing the functions that we will be using to perform this shuffle.
final_shuffle_func (Callable(pandas.DataFrame) -> pandas.DataFrame) – Function that shuffles the data within each new partition.
right_partitions (np.ndarray, optional) – Partitions to broadcast to self partitions. If specified, the method builds range-partitioning for right_partitions based on the bins calculated for partitions, then performs broadcasting.
- Returns:
A list of row-partitions that have been shuffled.
- Return type:
np.ndarray
- classmethod split_pandas_df_into_partitions(df, row_chunksize, col_chunksize, update_bar)
Split the given pandas DataFrame into distributed partitions according to the row/column chunk sizes.
- Parameters:
df (pandas.DataFrame) – The pandas DataFrame to split.
row_chunksize (int) – The number of rows per partition.
col_chunksize (int) – The number of columns per partition.
update_bar (callable(x) -> x) – Function that updates a progress bar.
- Return type:
2D np.ndarray[PandasDataframePartition]
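A minimal sketch of chunk-based splitting in plain Python, assuming the table is a list of rows instead of a pandas DataFrame and the resulting blocks stay in local memory rather than being put into distributed partitions. The function split_into_partitions is hypothetical, and the update_bar progress callback is omitted. Partitions at the bottom and right edges may be smaller than the chunk sizes.

```python
# Hypothetical toy helper (not Modin's API)
def split_into_partitions(rows, row_chunksize, col_chunksize):
    """Cut a table (list of rows) into a 2D grid of blocks of at most the given sizes."""
    n_rows = len(rows)
    n_cols = len(rows[0]) if rows else 0
    grid = []
    for r in range(0, n_rows, row_chunksize):
        grid_row = []
        for c in range(0, n_cols, col_chunksize):
            # One block: the row slice [r, r + row_chunksize) of columns [c, c + col_chunksize)
            grid_row.append([row[c:c + col_chunksize] for row in rows[r:r + row_chunksize]])
        grid.append(grid_row)
    return grid


table = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
print(split_into_partitions(table, 2, 2))
# [[[[1, 2], [4, 5]], [[3], [6]]], [[[7, 8]], [[9]]]]
```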
- classmethod to_numpy(partitions, **kwargs)
Convert NumPy array of PandasDataframePartition to NumPy array of data stored within partitions.
- Parameters:
partitions (np.ndarray) – NumPy array of PandasDataframePartition.
**kwargs (dict) – Keyword arguments for PandasDataframePartition.to_numpy function.
- Returns:
A NumPy array.
- Return type:
np.ndarray
- classmethod to_pandas(partitions)
Convert NumPy array of PandasDataframePartition to pandas DataFrame.
- Parameters:
partitions (np.ndarray) – NumPy array of PandasDataframePartition.
- Returns:
A pandas DataFrame
- Return type:
pandas.DataFrame
- classmethod wait_partitions(partitions)
Wait on the objects wrapped by partitions, without materializing them.
This method will block until all computations in the list have completed.
- Parameters:
partitions (np.ndarray) – NumPy array with PandasDataframePartition-s.
Notes
This method should be implemented in a more efficient way for engines that support waiting on objects in parallel.