Использование Python reduce() для объединения нескольких PySpark DataFrames

Кто-нибудь знает, почему использование Python3 functools.reduce() приведет к снижению производительности при объединении нескольких PySpark DataFrames, чем просто итеративное объединение одних и тех же DataFrames с использованием цикла for? В частности, это приводит к значительному замедлению работы, за которым следует ошибка нехватки памяти:

def join_dataframes(list_of_join_columns, left_df, right_df):
    return left_df.join(right_df, on=list_of_join_columns)

joined_df = functools.reduce(
    functools.partial(join_dataframes, list_of_join_columns), list_of_dataframes,
)

тогда как этот нет:

joined_df = list_of_dataframes[0]
joined_df.cache()
for right_df in list_of_dataframes[1:]:
    joined_df = joined_df.join(right_df, on=list_of_join_columns)

Любые идеи очень приветствуются. Спасибо!


person Eric Smith    schedule 07.07.2017    source источник


Ответы (2)


arrow_upward
1
arrow_downward

Одна из причин заключается в том, что редукция или свертка обычно функционально чисты: результат каждой операции накопления записывается не в одну и ту же часть памяти, а в новый блок памяти.

В принципе, сборщик мусора может освобождать предыдущий блок после каждого накопления, но если этого не произойдет, вы будете выделять память для каждой обновленной версии накопителя.

person Alex    schedule 07.07.2017

arrow_upward
1
arrow_downward

Пока вы используете CPython (разные реализации могут, но на самом деле не должны демонстрировать существенно различное поведение в этом конкретном случае). Если вы посмотрите на reduce реализацию вы увидите, что это просто цикл for с минимальной обработкой исключений.

Ядро точно эквивалентно циклу, который вы используете

for element in it:
    value = function(value, element)

и нет никаких доказательств, подтверждающих заявления о каком-либо особом поведении.

Кроме того, простые тесты с количеством кадров, практические ограничения соединений Spark (соединения являются одними из самых затратных операций в Spark)

dfs = [
    spark.range(10000).selectExpr(
        "rand({}) AS id".format(i), "id AS value",  "{} AS loop ".format(i)
    )
    for i in range(200)
]

Не показывать существенной разницы во времени между прямым циклом for

def f(dfs):
    df1 = dfs[0]
    for df2 in dfs[1:]:
        df1 = df1.join(df2, ["id"])
    return df1

%timeit -n3 f(dfs)
## 6.25 s ± 257 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)

и reduce вызов

from functools import reduce

def g(dfs):
    return reduce(lambda x, y: x.join(y, ["id"]), dfs)

%timeit -n3 g(dfs)
### 6.47 s ± 455 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)

Точно так же общие шаблоны поведения JVM сопоставимы между циклами for

Для циклического использования ЦП и памяти — VisualVM

и reduce

уменьшить использование ЦП и памяти — VisualVM

Наконец, оба генерируют идентичные планы выполнения

g(dfs)._jdf.queryExecution().optimizedPlan().equals(
    f(dfs)._jdf.queryExecution().optimizedPlan()
)
## True

что указывает на отсутствие разницы при оценке планов и вероятность возникновения OOM.

Другими словами, ваша корреляция не подразумевает причинно-следственную связь, и наблюдаемые проблемы с производительностью вряд ли связаны с методом, который вы используете для объединения DataFrames.

person user11024414    schedule 06.02.2019
comment
Можно ли использовать левые соединения или внешние соединения с этим методом, например: return reduce(lambda x, y: x.join(y, ["id"]), how="left", dfs) ? - person thentangler; 12.05.2021