Batch Pipeline API#

This API exposes the ability to pipeline row-parallel batch queries on a Modin DataFrame. Currently, this feature is only supported for the PandasOnRay execution.

API#

Module housing the PandasQueryPipeline and PandasQuery classes, which implement a batch pipeline protocol for Modin DataFrames.

class modin.experimental.batch.pipeline.PandasQuery(func: Callable, is_output: bool = False, repartition_after: bool = False, fan_out: bool = False, pass_partition_id: bool = False, reduce_fn: Optional[Callable] = None, output_id: Optional[int] = None)#

Internal representation of a single query in a pipeline.

This object represents a single function to be pipelined in a batch pipeline.

Parameters:
  • func (Callable) – The function to apply to the dataframe.

  • is_output (bool, default: False) – Whether this query is an output query and should be passed both to the next query, and directly to postprocessing.

  • repartition_after (bool, default: False) – Whether to repartition after this query is computed. Currently, repartitioning is only supported if there is 1 partition prior to repartitioning.

  • fan_out (bool, default: False) – Whether to fan out this node. If True and only 1 partition is passed as input, the partition is replicated PandasQueryPipeline.num_partitions (default: NPartitions.get) times, and the function is called on each. The reduce_fn must also be specified.

  • pass_partition_id (bool, default: False) – Whether to pass the numerical partition id to the query.

  • reduce_fn (Callable, default: None) – The reduce function to apply if fan_out is set to True. This takes the PandasQueryPipeline.num_partitions (default: NPartitions.get) partitions that result from this query, and combines them into 1 partition.

  • output_id (int, default: None) – An id to assign to this node if it is an output.

Notes

func must be a function that is applied along an axis of the dataframe.

Use pandas for any module level functions inside func since it operates directly on partitions.
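The note above can be made concrete with a small sketch. Each partition handed to func is a plain pandas.DataFrame, so any module-level calls inside the function must come from pandas, not modin.pandas. The function and data below are illustrative, not part of the API:

```python
import pandas

# Each partition passed to `func` is a plain pandas.DataFrame, so
# module-level calls inside the function must use `pandas`, not
# `modin.pandas`.
def add_totals(part):
    # pandas.concat, not modin.pandas.concat: `part` is a raw partition.
    totals = part.sum(axis=1).rename("total")
    return pandas.concat([part, totals], axis=1)

# Simulate applying the function to one row partition.
part = pandas.DataFrame({"a": [1, 2], "b": [3, 4]})
result = add_totals(part)
```

In a pipeline, add_totals would be passed to PandasQueryPipeline.add_query rather than called directly.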

class modin.experimental.batch.pipeline.PandasQueryPipeline(df, num_partitions: Optional[int] = None)#

Internal representation of a query pipeline.

This object keeps track of the functions that compose to form a query pipeline.

Parameters:
  • df (modin.pandas.DataFrame) – The dataframe to perform this pipeline on.

  • num_partitions (int, optional) – The number of partitions to maintain for the batched dataframe. If not specified, the value is assumed equal to NPartitions.get().

Notes

Only row-parallel pipelines are supported. All queries will be applied along the row axis.
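To illustrate what "applied along the row axis" means, the sketch below simulates a row-parallel pipeline in plain pandas: the dataframe is split into row partitions, each query is applied to every partition independently, and the partitions are reassembled. The splitting via np.array_split is an illustration only, not Modin's actual partitioning mechanism:

```python
import numpy as np
import pandas

df = pandas.DataFrame({"x": range(6)})

# Two queries that will be applied, in order, to every row partition.
queries = [lambda part: part * 2, lambda part: part + 1]

# Simulate a row-parallel pipeline: split into row partitions, apply
# each query to every partition independently, then reassemble.
parts = np.array_split(df, 3)
for q in queries:
    parts = [q(p) for p in parts]
result = pandas.concat(parts)
```

In the real pipeline, the same per-partition application happens in parallel across Ray workers rather than in a Python loop.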

add_query(func: Callable, is_output: bool = False, repartition_after: bool = False, fan_out: bool = False, pass_partition_id: bool = False, reduce_fn: Optional[Callable] = None, output_id: Optional[int] = None)#

Add a query to the current pipeline.

Parameters:
  • func (Callable) – DataFrame query to perform.

  • is_output (bool, default: False) – Whether this query should be designated as an output query. If True, the output of this query is passed both to the next query and directly to postprocessing.

  • repartition_after (bool, default: False) – Whether the dataframe should be repartitioned after this query. Currently, repartitioning is only supported if there is 1 partition prior to repartitioning.

  • fan_out (bool, default: False) – Whether to fan out this node. If True and only 1 partition is passed as input, the partition is replicated self.num_partitions (default: NPartitions.get) times, and the function is called on each. The reduce_fn must also be specified.

  • pass_partition_id (bool, default: False) – Whether to pass the numerical partition id to the query.

  • reduce_fn (Callable, default: None) – The reduce function to apply if fan_out is set to True. This takes the self.num_partitions (default: NPartitions.get) partitions that result from this query, and combines them into 1 partition.

  • output_id (int, default: None) – An id to assign to this node if it is an output.

Notes

Use pandas for any module level functions inside func since it operates directly on partitions.
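The interaction of fan_out, pass_partition_id, and reduce_fn can be sketched in plain pandas. This simulates only the replication and reduction semantics described above, not Modin's actual scheduling; in particular, the assumption that reduce_fn receives the fanned-out results as a list is an illustration:

```python
import pandas

num_partitions = 4

# With fan_out=True, a single input partition is replicated
# `num_partitions` times and `func` runs once per replica; with
# pass_partition_id=True, each call also receives its partition id.
def tag(part, partition_id):
    out = part.copy()
    out["worker"] = partition_id
    return out

# reduce_fn combines the fanned-out results back into one partition.
def reduce_fn(parts):
    return pandas.concat(parts, ignore_index=True)

# Simulate the fan-out/reduce cycle on one partition.
part = pandas.DataFrame({"a": [1, 2]})
fanned = [tag(part, i) for i in range(num_partitions)]
combined = reduce_fn(fanned)
```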

compute_batch(postprocessor: Optional[Callable] = None, pass_partition_id: Optional[bool] = False, pass_output_id: Optional[bool] = False)#

Run the completed pipeline and any postprocessing steps end to end.

Parameters:
  • postprocessor (Callable, default: None) – A postprocessing function to be applied to each output partition. The order of arguments passed is df (the partition), output_id (if pass_output_id=True), and partition_id (if pass_partition_id=True).

  • pass_partition_id (bool, default: False) – Whether or not to pass the numerical partition id to the postprocessing function.

  • pass_output_id (bool, default: False) – Whether or not to pass the output ID associated with output queries to the postprocessing function.

Returns:

If output ids are specified, a dictionary mapping output id to the resulting dataframe is returned; otherwise, a list of the resulting dataframes is returned.

Return type:

list or dict or DataFrame
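The argument order the postprocessor receives can be sketched as follows. This simulates the calls compute_batch would make to a postprocessor on each output partition; the actual invocation is internal, and the output_id and partition_id values here are illustrative:

```python
import pandas

# With pass_output_id=True and pass_partition_id=True, the
# postprocessor receives its arguments in this order: the partition,
# then the output id, then the partition id.
def postprocessor(df, output_id, partition_id):
    out = df.copy()
    out["output_id"] = output_id
    out["partition_id"] = partition_id
    return out

# Simulate postprocessing two partitions of the output query whose
# hypothetical output_id is 7.
parts = [pandas.DataFrame({"a": [1]}), pandas.DataFrame({"a": [2]})]
processed = [postprocessor(p, 7, i) for i, p in enumerate(parts)]
```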

update_df(df)#

Update the dataframe to perform this pipeline on.

Parameters:

df (modin.pandas.DataFrame) – The new dataframe to perform this pipeline on.