Operations that support range-partitioning in Modin#

The following operations change their behavior once cfg.RangePartitioning variable is set to True. Go through the list find out when it could be beneficial to engage range-partitioning for a certain method.

GroupBy#

Note

When grouping on multiple columns using range-partitioning implementation, the result may not be sorted even if groupby(sort=True, ...) was passed: modin-project/modin#6875.

Range-partitioning groupby implementation is automatically engaged for groupby.apply(), groupby.transform(), groupby.rolling(). For groupby aggregations from this list, MapReduce implementation is used by default. MapReduce tends to show better performance for groupby with low-cardinality. If the cardinality of your columns to group is expected to be high, it’s recommended to engage range-partitioning implementation.

Merge#

Note

Range-partitioning approach is implemented only for “left” and “inner” merge and only when merging on a single column using on argument.

Range-partitioning merge replaces broadcast merge. It is recommended to use range-partitioning implementation if the right dataframe in merge is as big as the left dataframe. In this case, range-partitioning implementation works faster and consumes less RAM.

Under the spoiler you can find performance comparison of range-partitioning and broadcast merge in different scenarios:

Performance measurements for merge

The performance was measured on h2o join queries using Intel(R) Xeon(R) Gold 6238R CPU @ 2.20GHz (56 cores), with the number of cores allocated for Modin limited by 44 (MODIN_CPUS=44).

Measurements for small 500mb data:

../../_images/merge_h2o_500mb.jpg

Measurements for medium 5gb data:

../../_images/merge_h2o_5gb.png

.unique() and .drop_duplicates()#

Note

When range-partitioning is enabled, both .unique() and .drop_duplicates() will yield results that are sorted along rows. If range-partitioning is disabled, the original order will be maintained.

Range-partitioning implementation of .unique() / .drop_duplicates() works best when the input data size is big (more than 5_000_000 rows) and when the output size is also expected to be big (no more than 80% values are duplicates).

Under the spoiler you can find performance comparisons in different scenarios:

Performance measurements for ``.unique()``

The performance was measured on randomly generated data using Intel(R) Xeon(R) Gold 6238R CPU @ 2.20GHz (56 cores). The duplicate rate shows the procentage of duplicated rows in the dataset. You can learn more about this micro-benchmark by reading its source code:

Micro-benchmark's source code
import modin.pandas as pd
import numpy as np
import modin.config as cfg

from modin.utils import execute
from timeit import default_timer as timer
import pandas

cfg.CpuCount.put(16)

def get_data(nrows, dtype):
    if dtype == int:
        return np.arange(nrows)
    elif dtype == float:
        return np.arange(nrows).astype(float)
    elif dtype == str:
        return np.array([f"value{i}" for i in range(nrows)])
    else:
        raise NotImplementedError(dtype)

pd.DataFrame(np.arange(cfg.NPartitions.get() * cfg.MinPartitionSize.get())).to_numpy()

nrows = [1_000_000, 5_000_000, 10_000_000, 25_000_000, 50_000_000, 100_000_000]
duplicate_rate = [0, 0.1, 0.5, 0.95]
dtypes = [int, str]
use_range_part = [True, False]

columns = pandas.MultiIndex.from_product([dtypes, duplicate_rate, use_range_part], names=["dtype", "duplicate rate", "use range-part"])
result = pandas.DataFrame(index=nrows, columns=columns)

i = 0
total_its = len(nrows) * len(duplicate_rate) * len(dtypes) * len(use_range_part)

for dt in dtypes:
    for nrow in nrows:
        data = get_data(nrow, dt)
        np.random.shuffle(data)
        for dpr in duplicate_rate:
            data_c = data.copy()
            dupl_val = data_c[0]

            num_duplicates = int(dpr * nrow)
            dupl_indices = np.random.choice(np.arange(nrow), num_duplicates, replace=False)
            data_c[dupl_indices] = dupl_val

            for impl in use_range_part:
                print(f"{round((i / total_its) * 100, 2)}%")
                i += 1
                cfg.RangePartitioning.put(impl)

                sr = pd.Series(data_c)
                execute(sr)

                t1 = timer()
                # returns a list, so no need for materialization
                sr.unique()
                tm = timer() - t1
                print(nrow, dpr, dt, impl, tm)
                result.loc[nrow, (dt, dpr, impl)] = tm
                result.to_excel("unique.xlsx")

Measurements with 16 cores being allocated for Modin (MODIN_CPUS=16):

../../_images/unique_16cpus.jpg

Measurements with 44 cores being allocated for Modin (MODIN_CPUS=4):

../../_images/unique_44cpus.jpg
Performance measurements for ``.drop_duplicates()``

The performance was measured on randomly generated data using Intel(R) Xeon(R) Gold 6238R CPU @ 2.20GHz (56 cores). The duplicate rate shows the procentage of duplicated rows in the dataset. The subset size shows the number of columns being specified as a subset parameter for df.drop_duplicates(). You can learn more about this micro-benchmark by reading its source code:

Micro-benchmark's source code
import modin.pandas as pd
import numpy as np
import modin.config as cfg

from modin.utils import execute
from timeit import default_timer as timer
import pandas

cfg.CpuCount.put(16)

pd.DataFrame(np.arange(cfg.NPartitions.get() * cfg.MinPartitionSize.get())).to_numpy()

nrows = [1_000_000, 5_000_000, 10_000_000, 25_000_000]
duplicate_rate = [0, 0.1, 0.5, 0.95]
subset = [["col0"], ["col1", "col2", "col3", "col4"], None]
ncols = 15
use_range_part = [True, False]

columns = pandas.MultiIndex.from_product(
    [
        [len(sbs) if sbs is not None else ncols for sbs in subset],
        duplicate_rate,
        use_range_part
    ],
    names=["subset size", "duplicate rate", "use range-part"]
)
result = pandas.DataFrame(index=nrows, columns=columns)

i = 0
total_its = len(nrows) * len(duplicate_rate) * len(subset) * len(use_range_part)

for sbs in subset:
    for nrow in nrows:
        data = {f"col{i}": np.arange(nrow) for i in range(ncols)}
        pandas_df = pandas.DataFrame(data)

        for dpr in duplicate_rate:
            pandas_df_c = pandas_df.copy()
            dupl_val = pandas_df_c.iloc[0]

            num_duplicates = int(dpr * nrow)
            dupl_indices = np.random.choice(np.arange(nrow), num_duplicates, replace=False)
            pandas_df_c.iloc[dupl_indices] = dupl_val

            for impl in use_range_part:
                print(f"{round((i / total_its) * 100, 2)}%")
                i += 1
                cfg.RangePartitioning.put(impl)

                md_df = pd.DataFrame(pandas_df_c)
                execute(md_df)

                t1 = timer()
                res = md_df.drop_duplicates(subset=sbs)
                execute(res)
                tm = timer() - t1

                sbs_s = len(sbs) if sbs is not None else ncols
                print("len()", res.shape, nrow, dpr, sbs_s, impl, tm)
                result.loc[nrow, (sbs_s, dpr, impl)] = tm
                result.to_excel("drop_dupl.xlsx")

Measurements with 16 cores being allocated for Modin (MODIN_CPUS=16):

../../_images/drop_duplicates_16cpus.jpg

Measurements with 44 cores being allocated for Modin (MODIN_CPUS=44):

../../_images/drop_duplicates_44cpus.jpg

‘.nunique()’#

Note

Range-partitioning approach is implemented only for pd.Series.nunique() and 1-column dataframes. For multi-column dataframes .nunique() can only use full-axis reduce implementation.

Range-partitioning implementation of ‘.nunique()’’ works best when the input data size is big (more than 5_000_000 rows) and when the output size is also expected to be big (no more than 80% values are duplicates).

Under the spoiler you can find performance comparisons in different scenarios:

Performance measurements for ``.nunique()``

The performance was measured on randomly generated data using Intel(R) Xeon(R) Gold 6238R CPU @ 2.20GHz (56 cores). The duplicate rate shows the procentage of duplicated rows in the dataset. You can learn more about this micro-benchmark by reading its source code:

Micro-benchmark's source code
import modin.pandas as pd
import numpy as np
import modin.config as cfg

from modin.utils import execute
from timeit import default_timer as timer
import pandas

cfg.CpuCount.put(16)

def get_data(nrows, dtype):
    if dtype == int:
        return np.arange(nrows)
    elif dtype == float:
        return np.arange(nrows).astype(float)
    elif dtype == str:
        return np.array([f"value{i}" for i in range(nrows)])
    else:
        raise NotImplementedError(dtype)

pd.DataFrame(np.arange(cfg.NPartitions.get() * cfg.MinPartitionSize.get())).to_numpy()

nrows = [1_000_000, 5_000_000, 10_000_000, 25_000_000, 50_000_000, 100_000_000]
duplicate_rate = [0, 0.1, 0.5, 0.95]
dtypes = [int, str]
use_range_part = [True, False]

columns = pandas.MultiIndex.from_product([dtypes, duplicate_rate, use_range_part], names=["dtype", "duplicate rate", "use range-part"])
result = pandas.DataFrame(index=nrows, columns=columns)

i = 0
total_its = len(nrows) * len(duplicate_rate) * len(dtypes) * len(use_range_part)

for dt in dtypes:
    for nrow in nrows:
        data = get_data(nrow, dt)
        np.random.shuffle(data)
        for dpr in duplicate_rate:
            data_c = data.copy()
            dupl_val = data_c[0]

            num_duplicates = int(dpr * nrow)
            dupl_indices = np.random.choice(np.arange(nrow), num_duplicates, replace=False)
            data_c[dupl_indices] = dupl_val

            for impl in use_range_part:
                print(f"{round((i / total_its) * 100, 2)}%")
                i += 1
                cfg.RangePartitioning.put(impl)

                sr = pd.Series(data_c)
                execute(sr)

                t1 = timer()
                # returns a scalar, so no need for materialization
                res = sr.nunique()
                tm = timer() - t1
                print(nrow, dpr, dt, impl, tm)
                result.loc[nrow, (dt, dpr, impl)] = tm
                result.to_excel("nunique.xlsx")

Measurements with 16 cores being allocated for Modin (MODIN_CPUS=16):

../../_images/nunique_16cpus.jpg

Resample#

Note

Range-partitioning approach doesn’t support transform-like functions (like .interpolate(), .ffill(), .bfill(), …)

It is recommended to use range-partitioning for resampling if you’re dealing with a dataframe that has more than 5_000_000 rows and the expected output is also expected to be big (more than 500_000 rows).

Under the spoiler you can find performance comparisons in different scenarios:

Performance measurements for ``.resample()``

The script below measures performance of df.resample(rule).sum() using Intel(R) Xeon(R) Gold 6238R CPU @ 2.20GHz (56 cores). You can learn more about this micro-benchmark by reading its source code:

Micro-benchmark's source code
import pandas
import numpy as np
import modin.pandas as pd
import modin.config as cfg

from timeit import default_timer as timer

from modin.utils import execute

cfg.CpuCount.put(16)

nrows = [1_000_000, 5_000_000, 10_000_000]
ncols = [5, 33]
rules = [
    "500ms", # doubles nrows
    "30s", # decreases nrows in 30 times
    "5min", # decreases nrows in 300
]
use_rparts = [True, False]

cols = pandas.MultiIndex.from_product([rules, ncols, use_rparts], names=["rule", "ncols", "USE RANGE PART"])
rres = pandas.DataFrame(index=nrows, columns=cols)

total_nits = len(nrows) * len(ncols) * len(rules) * len(use_rparts)
i = 0

for nrow in nrows:
    for ncol in ncols:
        index = pandas.date_range("31/12/2000", periods=nrow, freq="s")
        data = {f"col{i}": np.arange(nrow) for i in range(ncol)}
        pd_df = pandas.DataFrame(data, index=index)
        for rule in rules:
            for rparts in use_rparts:
                print(f"{round((i / total_nits) * 100, 2)}%")
                i += 1
                cfg.RangePartitioning.put(rparts)

                df = pd.DataFrame(data, index=index)
                execute(df)

                t1 = timer()
                res = df.resample(rule).sum()
                execute(res)
                ts = timer() - t1
                print(nrow, ncol, rule, rparts, ts)

                rres.loc[nrow, (rule, ncol, rparts)] = ts
                rres.to_excel("resample.xlsx")

Measurements with 16 cores being allocated for Modin (MODIN_CPUS=16):

../../_images/resample_16cpus.jpg

pivot_table#

Range-partitioning implementation is automatically applied for df.pivot_table whenever possible, users can’t control this.

sort_values#

Range-partitioning implementation is automatically applied for df.sort_values whenever possible, users can’t control this.