Prior to Spark 3.0, to optimize for performance and make use of vectorized operations, you generally had to repartition the dataset and invoke mapPartitions.
The major drawback of this approach was the performance hit incurred by repartitioning the DataFrame, since repartitioning triggers a shuffle.
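For context, here is a minimal sketch of that older pattern, assuming a Spark DataFrame df with a numeric prob column (the partition count is arbitrary, and the full benchmark version appears further down):

import pandas as pd

# Hypothetical pre-3.0 style sketch: repartition first (which shuffles), then
# rebuild a pandas DataFrame inside each partition so vectorized pandas code can run.
def per_partition(rows, cols):
    pdf = pd.DataFrame(list(rows), columns=cols)
    pdf["prob"] = pdf["prob"] + 0.05 - 0.05   # placeholder pandas work
    return pdf.itertuples(index=False)        # hand the rows back to Spark

cols = df.columns
processed = df.repartition(100).rdd.mapPartitions(lambda rows: per_partition(rows, cols))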
With Spark 3.0+, if your underlying function operates on a pandas DataFrame, you should use mapInPandas instead.
The reasons for doing so:
- No repartitioning is needed, so you avoid the shuffle and gain performance. Instead, you can simply set the spark.sql.execution.arrow.maxRecordsPerBatch config to control the batch size (see the sketch after this list).
- The underlying PyArrow optimizations for converting to pandas DataFrames are baked in.
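A minimal sketch of that usage, assuming spark is an active SparkSession and df is a Spark DataFrame with a numeric prob column (the batch size and the transform are placeholders, not recommendations):

# Cap how many rows each Arrow batch (and hence each pandas DataFrame) receives.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 10000)

def transform(batches):
    # mapInPandas hands us an iterator of pandas DataFrames, one per Arrow batch.
    for pdf in batches:
        pdf["prob"] = pdf["prob"] + 0.05 - 0.05   # placeholder pandas work
        yield pdf

out = df.mapInPandas(transform, schema=df.schema)  # no repartition, no shuffle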
Let's benchmark with a simple example below:
import os
import sys

import numpy as np
import pandas as pd
import pyspark
import pytest
from pyspark import SparkConf, sql


@pytest.fixture(scope='session')
def spark_session() -> sql.SparkSession:
    """
    Yields a local Spark session with 1g of driver memory and Arrow enabled.
    :return: yields the spark session
    """
    conf = SparkConf()
    os.environ['PYSPARK_PYTHON'] = sys.executable
    os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
    conf.setAll([('spark.driver.memory', '1g')])
    spark_session = (sql.SparkSession.builder.master('local[*]')
                     .config(conf=conf).enableHiveSupport().getOrCreate())
    spark_session.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')
    yield spark_session
    spark_session.stop()
def pdf(size: int) -> pd.DataFrame:
    """
    Creates a pandas dataframe of specified size
    :param size: the number of rows in the dataframe
    :return: the created pandas dataframe
    """
    data = {"client_id": np.arange(size), "prob": np.random.random(size)}
    return pd.DataFrame(data)
def add_subtract_do_map_partitions(df: pd.DataFrame) -> pd.DataFrame:
    # Dummy per-row pandas work: add and then subtract a constant.
    df["prob"] = df["prob"].apply(lambda x: x + 0.05 - 0.05)
    return df


def do_map_partitions(df, parts):
    # Pre-3.0 style: repartition (a shuffle), rebuild a pandas DataFrame per
    # partition, run the pandas work, then yield the rows back to Spark so
    # that count() forces the full computation over every row.
    df = df.repartition(parts)
    cols = df.columns
    return df.rdd.mapPartitions(
        lambda part: add_subtract_do_map_partitions(
            pd.DataFrame(list(part), columns=cols)
        ).itertuples(index=False),
        preservesPartitioning=True,
    ).count()
def map_add_subtract_do_map_partitions(dfs):
    # mapInPandas passes an iterator of pandas DataFrames (one per Arrow batch)
    # and expects an iterator of pandas DataFrames back.
    for df in dfs:
        yield add_subtract_do_map_partitions(df)


def do_map_in_pandas(df):
    return df.mapInPandas(map_add_subtract_do_map_partitions, df.schema).count()
def create_df(spark_session, pdf) -> pyspark.sql.DataFrame:
    """
    Creates a spark dataframe from a pandas dataframe
    :param spark_session: the spark session
    :param pdf: the pandas dataframe to use
    :return: the created spark dataframe
    """
    return spark_session.createDataFrame(pdf)
@pytest.mark.parametrize("size,partitions", [(10000, 1), (100000, 10), (1000000, 100)])
@pytest.mark.benchmark_do_map_partitions
def test_benchmark_do_map_partitions(benchmark, spark_session, size, partitions):
df = create_df(spark_session, pdf(size))
benchmark.pedantic(do_map_partitions, args=(df, partitions), iterations=3)
@pytest.mark.parametrize("size", [10000, 100000, 1000000])
@pytest.mark.benchmark_do_map_in_pandas
def test_benchmark_do_map_in_pandas(benchmark, spark_session, size):
df = create_df(spark_session, pdf(size))
benchmark.pedantic(do_map_in_pandas, args=(df,), iterations=3)
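Assuming pytest-benchmark is installed and the two custom marks (benchmark_do_map_partitions, benchmark_do_map_in_pandas) are registered in pytest.ini, each suite can be run on its own with pytest -m and the corresponding mark name.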
And the results are shown in the image below:
So, in conclusion, you are better off with mapInPandas.