Batch Pipeline API Usage Guide#
Modin provides an experimental batching feature that pipelines row-parallel queries. This feature is currently only supported for the PandasOnRay engine. Please note that this feature is experimental, and its behavior or interfaces may change.
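Because the feature targets the PandasOnRay engine, you may need to make sure Modin is configured to use Ray before building a pipeline. Below is a minimal sketch using Modin's configuration API; it assumes Ray is installed and that your Modin version exposes these configuration options.

import modin.config as cfg

cfg.Engine.put("ray")            # Select the Ray execution engine (assumes Ray is installed).
cfg.StorageFormat.put("pandas")  # Use the pandas storage format, i.e. PandasOnRay.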
Usage examples#
In the examples below we build and run some pipelines. It is important to note that the queries passed to the pipeline operate on Modin DataFrame partitions, which are backed by pandas. When using pandas module-level functions, please make sure to import and use pandas rather than modin.pandas.
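For instance, a query that concatenates a new column onto a partition should call pandas.concat rather than modin.pandas.concat. Below is a minimal, hypothetical sketch of such a query function (add_row_total is an illustrative name, not part of the API):

import pandas  # Vanilla pandas, for use inside query functions.

def add_row_total(partition):
    # `partition` is a plain pandas.DataFrame (one partition of the Modin dataframe),
    # so module-level functions must come from pandas, not modin.pandas.
    total = partition.sum(axis=1).rename("row_total")
    return pandas.concat([partition, total], axis=1)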
Simple Batch Pipelining#
This example walks through a simple batch pipeline in order to familiarize the user with the API.
from modin.experimental.batch import PandasQueryPipeline
import modin.pandas as pd
import numpy as np
df = pd.DataFrame(
    np.random.randint(0, 100, (100, 100)),
    columns=[f"col {i}" for i in range(1, 101)],
)  # Build the dataframe we will pipeline.
pipeline = PandasQueryPipeline(df)  # Build the pipeline.
pipeline.add_query(lambda df: df + 1, is_output=True)  # Add the first query and specify that
                                                       # it is an output query.
pipeline.add_query(
    lambda df: df.rename(columns={f"col {i}": f"col {i-1}" for i in range(1, 101)})
)  # Add a second query.
pipeline.add_query(
    lambda df: df.drop(columns=['col 99']),
    is_output=True,
)  # Add a third query and specify that it is an output query.
new_df = pd.DataFrame(
    np.ones((100, 100)),
    columns=[f"col {i}" for i in range(1, 101)],
)  # Build a second dataframe that we will pipeline now instead.
pipeline.update_df(new_df)  # Update the dataframe that we will pipeline to be `new_df`
                            # instead of `df`.
result_dfs = pipeline.compute_batch() # Begin batch processing.
# Print pipeline results
print(f"Result of Query 1:\n{result_dfs[0]}")
print(f"Result of Query 2:\n{result_dfs[1]}")
# Output IDs can also be specified
pipeline = PandasQueryPipeline(df)  # Build the pipeline.
pipeline.add_query(
    lambda df: df + 1,
    is_output=True,
    output_id=1,
)  # Add the first query, specify that it is an output query, as well as specify an output id.
pipeline.add_query(
    lambda df: df.rename(columns={f"col {i}": f"col {i-1}" for i in range(1, 101)})
)  # Add a second query.
pipeline.add_query(
    lambda df: df.drop(columns=['col 99']),
    is_output=True,
    output_id=2,
)  # Add a third query, specify that it is an output query, and specify an output_id.
result_dfs = pipeline.compute_batch() # Begin batch processing.
# Print pipeline results - should be a dictionary mapping Output IDs to resulting dataframes:
print(f"Mapping of Output ID to dataframe:\n{result_dfs}")
# Print results
for query_id, res_df in result_dfs.items():
    print(f"Query {query_id} resulted in\n{res_df}")
Batch Pipelining with Postprocessing#
A postprocessing function can also be provided when calling pipeline.compute_batch. The example below runs a pipeline similar to the one above, but the postprocessing function writes the partitions of each output dataframe to parquet files.
from modin.experimental.batch import PandasQueryPipeline
import modin.pandas as pd
import numpy as np
import os
import shutil
df = pd.DataFrame(
    np.random.randint(0, 100, (100, 100)),
    columns=[f"col {i}" for i in range(1, 101)],
)  # Build the dataframe we will pipeline.
pipeline = PandasQueryPipeline(df)  # Build the pipeline.
pipeline.add_query(
    lambda df: df + 1,
    is_output=True,
    output_id=1,
)  # Add the first query, specify that it is an output query, as well as specify an output id.
pipeline.add_query(
    lambda df: df.rename(columns={f"col {i}": f"col {i-1}" for i in range(1, 101)})
)  # Add a second query.
pipeline.add_query(
    lambda df: df.drop(columns=['col 99']),
    is_output=True,
    output_id=2,
)  # Add a third query, specify that it is an output query, and specify an output_id.

def postprocessing_func(df, output_id, partition_id):
    filepath = f"query_{output_id}/"
    os.makedirs(filepath, exist_ok=True)
    filepath += f"part-{partition_id:04d}.parquet"
    df.to_parquet(filepath)
    return df

result_dfs = pipeline.compute_batch(
    postprocessor=postprocessing_func,
    pass_partition_id=True,
    pass_output_id=True,
)  # Begin computation, pass in a postprocessing function, and specify that partition ID and
   # output ID should be passed to that postprocessing function.
print(os.system("ls query_1/")) # Should show `NPartitions.get()` parquet files - which
# correspond to partitions of the output of query 1.
print(os.system("ls query_2/")) # Should show `NPartitions.get()` parquet files - which
# correspond to partitions of the output of query 2.
for query_id, res_df in result_dfs.items():
    written_df = pd.read_parquet(f"query_{query_id}/")
    shutil.rmtree(f"query_{query_id}/")  # Clean up
    print(f"Written and Computed DF are " +
          f"{'equal' if res_df.equals(written_df) else 'not equal'} for query {query_id}")
Batch Pipelining with Fan Out#
If the input dataframe to a query is small (consisting of only one partition), it is possible to induce additional parallelism using the fan_out argument. The fan_out argument replicates the input partition, applies the query to each replica, and then coalesces all of the replicas back to one partition using the reduce_fn that must also be specified when fan_out is True.

It is possible to control the parallelism via the num_partitions parameter passed to the constructor of the PandasQueryPipeline. This parameter designates the desired number of partitions, and defaults to NPartitions.get() when not specified. During fan out, the input partition is replicated num_partitions times. In the previous examples, num_partitions was not specified, and so defaulted to NPartitions.get().
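Before the full example below, here is a minimal, self-contained sketch of the fan_out pattern on a toy single-partition dataframe; replica_query and concat_replicas are hypothetical names chosen for illustration.

import pandas
import modin.pandas as pd
from modin.experimental.batch import PandasQueryPipeline

toy_df = pd.DataFrame([[10]], columns=["seed"])  # A single-partition dataframe.

def replica_query(df, partition_id):
    # Every replica receives a copy of the same input partition; the partition id
    # distinguishes the work done by each replica.
    out = df.copy()
    out["value"] = out["seed"] + partition_id
    return out

def concat_replicas(dfs):
    # Coalesce the replicas back into one partition.
    return pandas.concat(dfs, ignore_index=True)

pipeline = PandasQueryPipeline(toy_df, 4)  # Replicate the input partition 4 times during fan out.
pipeline.add_query(
    replica_query,
    fan_out=True,
    reduce_fn=concat_replicas,
    pass_partition_id=True,
    is_output=True,
)
print(pipeline.compute_batch()[0])  # One dataframe with a row per replica.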
The example below demonstrates the usage of fan_out and num_partitions. We first show a function that would benefit from this computation pattern:
import glob
from PIL import Image
import torchvision.transforms as T
import torchvision
transforms = T.Compose([T.ToTensor()])
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
model.eval()
COCO_INSTANCE_CATEGORY_NAMES = [
    '__background__', 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus',
    'train', 'truck', 'boat', 'traffic light', 'fire hydrant', 'N/A', 'stop sign',
    'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow',
    'elephant', 'bear', 'zebra', 'giraffe', 'N/A', 'backpack', 'umbrella', 'N/A', 'N/A',
    'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball',
    'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard', 'tennis racket',
    'bottle', 'N/A', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl',
    'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza',
    'donut', 'cake', 'chair', 'couch', 'potted plant', 'bed', 'N/A', 'dining table',
    'N/A', 'N/A', 'toilet', 'N/A', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone',
    'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'N/A', 'book',
    'clock', 'vase', 'scissors', 'teddy bear', 'hair drier', 'toothbrush'
]

def contains_cat(image, model):
    image = transforms(image)
    labels = [COCO_INSTANCE_CATEGORY_NAMES[i] for i in model([image])[0]['labels']]
    return 'cat' in labels
def serial_query(df):
    """
    This function takes as input a dataframe with a single row corresponding to a folder
    containing images to parse. Each image in the folder is passed through a neural network
    that detects whether it contains a cat, in serial, and a new column is computed for the
    dataframe that counts the number of images containing cats.

    Parameters
    ----------
    df : a dataframe
        The dataframe to process

    Returns
    -------
    The same dataframe as before, with an additional column containing the count of images
    containing cats.
    """
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    model.eval()
    img_folder = df['images'][0]
    images = sorted(glob.glob(f"{img_folder}/*.jpg"))
    cats = 0
    for img in images:
        cats = cats + 1 if contains_cat(Image.open(img), model) else cats
    df['cat_count'] = cats
    return df
To download the image files to test out this code, run the following bash script, which downloads the images from the fast-ai-coco S3 bucket to a folder called images in your current working directory:
aws s3 cp s3://fast-ai-coco/coco_tiny.tgz . --no-sign-request; tar -xf coco_tiny.tgz; mkdir \
images; mv coco_tiny/train/* images/; rm -rf coco_tiny; rm -rf coco_tiny.tgz
We can pipeline that code like so:
import modin.pandas as pd
from modin.experimental.batch import PandasQueryPipeline
from time import time
df = pd.DataFrame([['images']], columns=['images'])
pipeline = PandasQueryPipeline(df)
pipeline.add_query(serial_query, is_output=True)
serial_start = time()
df_with_cat_count = pipeline.compute_batch()[0]
serial_end = time()
print(f"Result of pipeline:\n{df_with_cat_count}")
We can induce 8x parallelism into the pipeline above by combining the fan_out and num_partitions parameters like so:
import modin.pandas as pd
from modin.experimental.batch import PandasQueryPipeline
import shutil
from time import time
df = pd.DataFrame([['images']], columns=['images'])
desired_num_partitions = 8
def parallel_query(df, partition_id):
    """
    This function takes as input a dataframe with a single row corresponding to a folder
    containing images to parse. It parses `total_images/desired_num_partitions` images every
    time it is called. A new column is computed for the dataframe that counts the number of
    images containing cats.

    Parameters
    ----------
    df : a dataframe
        The dataframe to process
    partition_id : int
        The partition id of the dataframe that this function runs on.

    Returns
    -------
    The same dataframe as before, with an additional column containing the count of images
    containing cats.
    """
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    model.eval()
    img_folder = df['images'][0]
    images = sorted(glob.glob(f"{img_folder}/*.jpg"))
    total_images = len(images)
    cats = 0
    start_index = partition_id * (total_images // desired_num_partitions)
    if partition_id == desired_num_partitions - 1:  # Last partition must parse to end of list
        images = images[start_index:]
    else:
        end_index = (partition_id + 1) * (total_images // desired_num_partitions)
        images = images[start_index:end_index]
    for img in images:
        cats = cats + 1 if contains_cat(Image.open(img), model) else cats
    df['cat_count'] = cats
    return df
def reduce_fn(dfs):
    """
    Coalesce the results of fanning out the `parallel_query` query.

    Parameters
    ----------
    dfs : a list of dataframes
        The resulting dataframes from fanning out `parallel_query`

    Returns
    -------
    A new dataframe whose `cat_count` column is the sum of the `cat_count` column of all
    dataframes in `dfs`
    """
    df = dfs[0]
    cat_count = df['cat_count'][0]
    for dataframe in dfs[1:]:
        cat_count += dataframe['cat_count'][0]
    df['cat_count'] = cat_count
    return df
pipeline = PandasQueryPipeline(df, desired_num_partitions)
pipeline.add_query(
    parallel_query,
    fan_out=True,
    reduce_fn=reduce_fn,
    is_output=True,
    pass_partition_id=True,
)
parallel_start = time()
df_with_cat_count = pipeline.compute_batch()[0]
parallel_end = time()
print(f"Result of pipeline:\n{df_with_cat_count}")
print(f"Total Time in Serial: {serial_end - serial_start}")
print(f"Total time with induced parallelism: {parallel_end - parallel_start}")
shutil.rmtree("images/") # Clean up
Batch Pipelining with Dynamic Repartitioning#
Similarly, it is also possible to hint to the Pipeline API to repartition after a node completes computation. This is currently only supported if the input dataframe consists of only one partition. The number of partitions after repartitioning is controlled by the num_partitions parameter passed to the constructor of the PandasQueryPipeline.

The following example demonstrates how to use the repartition_after parameter.
import modin.pandas as pd
from modin.experimental.batch import PandasQueryPipeline
import numpy as np
small_df = pd.DataFrame([[1, 2, 3]])  # Create a small dataframe

def increase_dataframe_size(df):
    import pandas
    new_df = pandas.concat([df] * 1000)
    new_df = new_df.reset_index(drop=True)  # Get a new range index that isn't duplicated
    return new_df

desired_num_partitions = 24  # We will repartition to 24 partitions

def add_partition_id_to_df(df, partition_id):
    import pandas
    new_col = pandas.Series([partition_id] * len(df), name="partition_id", index=df.index)
    return pandas.concat([df, new_col], axis=1)
pipeline = PandasQueryPipeline(small_df, desired_num_partitions)
pipeline.add_query(increase_dataframe_size, repartition_after=True)
pipeline.add_query(add_partition_id_to_df, pass_partition_id=True, is_output=True)
result_df = pipeline.compute_batch()[0]
print(f"Number of partitions passed to second query: " +
f"{len(np.unique(result_df['partition_id'].values))}")
print(f"Result of pipeline:\n{result_df}")