Published Mon 11 July 2022


mapPartitions vs mapInPandas

Prior to Spark 3.0, to optimize for performance and take advantage of vectorized operations, you would generally have to repartition the dataset and invoke mapPartitions.

This had a major drawback: repartitioning the DataFrame triggers a shuffle, which itself carries a significant performance cost.

With Spark 3.0+, if your underlying function operates on a pandas DataFrame, you should use mapInPandas instead.

The reasons for doing so?

  1. There is no need to repartition, so the shuffle is avoided and performance improves. Instead, you can simply tune the spark.sql.execution.arrow.maxRecordsPerBatch config to control batch sizes (see the sketch after this list).
  2. The underlying pyarrow optimizations for converting to pandas DataFrames are baked in.
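
As a rough illustration, here is a minimal sketch (not part of the benchmark below) of setting the Arrow batch size and applying a function with mapInPandas; the function name add_noise and the sample columns are made up for this example:

from typing import Iterator

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Control how many rows Arrow hands to Python per batch (the default is 10000).
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "50000")

df = spark.createDataFrame([(1, 0.2), (2, 0.9)], ["client_id", "prob"])

# mapInPandas receives an iterator of pandas DataFrames (one per Arrow batch)
# and must yield pandas DataFrames that match the declared schema.
def add_noise(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for batch in batches:
        batch["prob"] = batch["prob"] + 0.05 - 0.05
        yield batch

df.mapInPandas(add_noise, df.schema).show()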

Let's benchmark with a simple example below:

import os
import sys

import numpy as np
import pandas as pd
import pyspark
import pytest
from pyspark import SparkConf, sql


@pytest.fixture(scope='session')
def spark_session() -> sql.SparkSession:
    """
    Yields a spark session with 1g of driver memory
    :return: yields the spark session
    """
    conf = SparkConf()
    os.environ['PYSPARK_PYTHON'] = sys.executable
    os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
    conf.setAll([('spark.driver.memory', '1g')])
    spark_session = (sql.SparkSession.builder.master('local[*]').
                     config(conf=conf).enableHiveSupport().getOrCreate())
    spark_session.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')
    yield spark_session
    spark_session.stop()

def pdf(size: int) -> pd.DataFrame:
    """
    Creates a pandas dataframe of specified size
    :param size: the length of the dataframe
    :return: the created pandas dataframe
    """
    data = {"client_id": np.arange(size), "prob": np.random.random(size)}
    return pd.DataFrame(data)


def add_subtract_do_map_partitions(df: pd.DataFrame) -> pd.DataFrame:
    """Applies a trivial per-row transformation so there is some work to measure."""
    df["prob"] = df["prob"].apply(lambda x: x + 0.05 - 0.05)
    return df


def do_map_partitions(df, parts):
    # Repartitioning triggers a shuffle before any of the real work starts.
    df = df.repartition(parts)
    cols = df.columns
    # Each partition is materialized into a pandas DataFrame; count() forces execution.
    return df.rdd.mapPartitions(
        lambda part: add_subtract_do_map_partitions(
            pd.DataFrame(list(part), columns=cols)
        ),
        preservesPartitioning=True,
    ).count()


def map_add_subtract_do_map_partitions(dfs):
    # mapInPandas passes an iterator of pandas DataFrames, one per Arrow batch.
    for df in dfs:
        yield add_subtract_do_map_partitions(df)


def do_map_in_pandas(df):
    # No repartition needed; count() forces execution of the lazy transformation.
    return df.mapInPandas(map_add_subtract_do_map_partitions, df.schema).count()


def create_df(spark_session, pdf) -> pyspark.sql.DataFrame:
    """
    Creates a spark dataframe from a pandas dataframe
    :param spark_session: the spark session
    :param pdf: the pandas dataframe to use
    :return: the created spark dataframe
    """
    return spark_session.createDataFrame(pdf)


@pytest.mark.parametrize("size,partitions", [(10000, 1), (100000, 10), (1000000, 100)])
@pytest.mark.benchmark_do_map_partitions
def test_benchmark_do_map_partitions(benchmark, spark_session, size, partitions):
    df = create_df(spark_session, pdf(size))
    benchmark.pedantic(do_map_partitions, args=(df, partitions), iterations=3)


@pytest.mark.parametrize("size", [10000, 100000, 1000000])
@pytest.mark.benchmark_do_map_in_pandas
def test_benchmark_do_map_in_pandas(benchmark, spark_session, size):
    df = create_df(spark_session, pdf(size))
    benchmark.pedantic(do_map_in_pandas, args=(df,), iterations=3)
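
To reproduce the comparison (assuming pytest-benchmark is installed and the custom markers above are registered in your pytest configuration), the two groups of benchmarks can be invoked by marker, for example:

pytest -m benchmark_do_map_partitions --benchmark-only
pytest -m benchmark_do_map_in_pandas --benchmark-only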

And the results are shown in the image below:

[Benchmark chart: duration per trial in milliseconds (ms) for the test_benchmark_do_map_partitions and test_benchmark_do_map_in_pandas runs at each input size.]

So, in conclusion, you are better off with mapInPandas.
