вторник, 5 января 2021 г.

Оптимизация Spark Scala UDF


Проблемы Scala UDF

1. UDF - черный ящик для Codegen: представляет из себя вызов java функции и не встраивается в wholestagegen
2. Нет возможности not null оптимизации
3. UDF не может быть спущена на уровень файлов (predicate pushdown)
4. Конвертация UTF-8 строк Spark в UTF-16 строки JVM при передаче параметров и получении результата.

Создание Native Scala UDF для Catalyst

- Создаем udf в org.apache.spark.sql.catalyst.expressions:
* При создании указываем trait NullIntolerant, который говорит, что функция применима только для not null колонок
* Реализуем doGenCode - текст java программы с логикой udf.
В нашем случае просто "имя колонки + 777"
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.types._

case class Mynativefunc(child: Expression)
  extends UnaryExpression with ExpectsInputTypes with NullIntolerant {

  override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)

  override def dataType: DataType = child.dataType

  private lazy val numeric = TypeUtils.getNumeric(dataType)

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = dataType match {
    case _ =>
      defineCodeGen(ctx, ev, c => s"$c + 777")
  }

  protected override def nullSafeEval(input: Any): Any = numeric.plus(input, 777)
}
- Регистриуем функцию в org.apache.spark.sql
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.expressions._

object custom_functions {
  def mynativefunc(e: Column): Column = Column { Mynativefunc(e.expr) }
}
- в sbt подключаем spark-core, spark-sql, spark-catalyst и собираем jar
name := "NativeUdfTest"
version := "1.0"
scalaVersion := "2.12.10"

libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.1.0"
libraryDependencies += "org.apache.spark" %% "spark-catalyst" % "3.1.0" 
Полный пример можно посмотреть в Github: SparkNativeUDF

Использование

- При запуске указываем jar с нашей Native UDF
bin\spark-shell --jars file://target/scala-2.12/nativeudftest_2.12-1.0.jar
- Имортим mynativefunc UDF
import org.apache.spark.sql.custom_functions.mynativefunc
- далее испольузем, как обычную UDF

Тестовые данные

- Тестовый parquet файл и DF из него
(1 to 1000000).toDF("id").write.mode("overwrite").parquet("test")
for(i <- 1 to 20) {
    (1 to 1000000).toDF("id").write.mode("append").parquet("test")
}
val df = spark.read.parquet("test")

- Обычная UDF "myudffunc" с которой будем сравнивать Native UDF "mynativefunc"
val myudffunc = udf((x: Int) => x + 777)
spark.udf.register("myudffunc", myudffunc)

- Замерять время будем функцией "time", которая берет среднее от 5 запусков, кроме 1 и последнего:
def time[R](block: => R): Long = {
    var ret: Long = 0
    for (i <- 1 to 5) {
        val t0 = System.nanoTime()
        block 
        if(i > 1 && i < 5) {
            ret = ret + (System.nanoTime() - t0)
        }
    }
    ret / 3
}

- Проверим, что прямое сложение, UDF и Native UDF дают одинаковый результат:
df.agg(sum($"id" + 777)).collect
//Array[org.apache.spark.sql.Row] = Array([10516327500000])

df.agg(sum(myudffunc($"id"))).collect
//Array[org.apache.spark.sql.Row] = Array([10516327500000])

df.agg(sum(mynativefunc($"id"))).collect
//Array[org.apache.spark.sql.Row] = Array([10516327500000])

Сравнение планов, Codegen и производительности

Агрегация чисел

- Прямое сложение дает в java codegen ожидаемо сложение
df.agg(sum($"id" + 777)).queryExecution.debug.codegen 
/* 063 */         agg_value_8 = agg_expr_0_0 + 777;
- Вызов native функции дает также сложение в java коде:
df.agg(sum(mynativefunc($"id"))).queryExecution.debug.codegen 
/* 061 */         agg_value_8 = agg_expr_0_0 + 777;
- Вызов же UDF порождаем вызов java функции с оберткой в try+catch:
df.agg(sum(myudffunc($"id"))).queryExecution.debug.codegen 
/* 066 */         Integer agg_result_0 = null;
/* 067 */         try {
/* 068 */           agg_result_0 = (Integer)((scala.Function1) references[4] /* udf */).apply(agg_arg_0);
/* 069 */         } catch (Exception e) {
/* 070 */           throw new org.apache.spark.SparkException(((java.lang.String) references[3] /* errMsg */), e);
/* 071 */         }

- Нативное сложение и вызов нативной "mynativefunc" работает на 14% быстрей, чем вызов простой UDF
val native_res = time { df.agg(sum($"id" + 777)).collect }
val udf_res = time { df.agg(sum(myudffunc($"id"))).collect }
val native_func_res = time { df.agg(sum(mynativefunc($"id"))).collect }

native_res.toDouble/udf_res.toDouble
//= Native / Normal UDF = 0.8607967494045435

native_res.toDouble/native_func_res.toDouble
//= Native / Native UDF = 1.0923988307860604
Надо понимать, что такая разница вызвана простой логикой UDF. Если UDF будет сложней, то разница будет менее заметна.

Фильтрация по числовому столбцу

- При прямой фильтрации по числу есть фильтр по сложению и pushdown not null ограничения
df.where($"id" + 777 === 1000).explain()
*(1) Filter (isnotnull(id#281) AND ((id#281 + 777) = 1000))
+- *(1) ColumnarToRow
   +- FileScan parquet [id#281] Batched: true, DataFilters: [isnotnull(id#281), ((id#281 + 777) = 1000)], 
       Format: Parquet, Location: InMemoryFileIndex[file:/C:/spark-master/test], PartitionFilters: [], 
       PushedFilters: [IsNotNull(id)], ReadSchema: struct
- Аналогично в native вызове:
df.where(mynativefunc($"id") === 1000).explain() 
*(1) Filter (isnotnull(id#288) AND (mynativefunc(id#288) = 1000))
+- *(1) ColumnarToRow
   +- FileScan parquet [id#288] Batched: true, DataFilters: [isnotnull(id#288), (mynativefunc(id#288) = 1000)], 
        Format: Parquet, Location: InMemoryFileIndex[file:/C:/spark-master/test], PartitionFilters: [], 
        PushedFilters: [IsNotNull(id)], ReadSchema: struct
- При вызове обычной UDF неизвестно, вернет ли она знаение или NULL, по этому в плане есть проверка на NULL и нет NOT NULL pushdown на уровень файлов:
df.where(myudffunc($"id") === 1000).explain()
*(1) Filter (if (isnull(id#283)) null else UDF(knownnotnull(id#283)) = 1000)
+- *(1) ColumnarToRow
   +- FileScan parquet [id#283] Batched: true, DataFilters: [(if (isnull(id#283)) null else UDF(knownnotnull(id#283)) = 1000)], 
        Format: Parquet, Location: InMemoryFileIndex[file:/C:/spark-master/test], PartitionFilters: [], 
        PushedFilters: [], ReadSchema: struct

- сгенерированный java код у прямой фильтрации и native udf одинаково простой:
df.where($"id" + 777 === 1000).queryExecution.debug.codegen 
/* 058 */           filter_value_4 = columnartorow_value_0 + 777;
/* 059 */
/* 060 */           boolean filter_value_3 = false;
/* 061 */           filter_value_3 = filter_value_4 == 1000;
/* 062 */           if (!filter_value_3) continue;
df.where(mynativefunc($"id") === 1000).queryExecution.debug.codegen 
/* 057 */           filter_value_4 = columnartorow_value_0 + 777;
/* 058 */
/* 059 */           boolean filter_value_3 = false;
/* 060 */           filter_value_3 = filter_value_4 == 1000;
/* 061 */           if (!filter_value_3) continue;
- В обычной UDF код значительно запутанней из-за вызова java функции UDF и проверок на NULL:
df.where(myudffunc($"id") === 1000).queryExecution.debug.codegen 
/* 064 */             Integer filter_result_0 = null;
/* 065 */             try {
/* 066 */               filter_result_0 = (Integer)((scala.Function1) references[5] /* udf */).apply(filter_arg_0);
/* 067 */             } catch (Exception e) {
/* 068 */               throw new org.apache.spark.SparkException(((java.lang.String) references[4] /* errMsg */), e);
/* 069 */             }
/* 070 */
/* 071 */             boolean filter_isNull_5 = filter_result_0 == null;
/* 072 */             int filter_value_5 = -1;
/* 073 */             if (!filter_isNull_5) {
/* 074 */               filter_value_5 = filter_result_0;
/* 075 */             }
/* 076 */             filter_isNull_1 = filter_isNull_5;
/* 077 */             filter_value_1 = filter_value_5;
/* 078 */           }
/* 079 */           if (!filter_isNull_1) {
/* 080 */             filter_isNull_0 = false; // resultCode could change nullability.
/* 081 */             filter_value_0 = filter_value_1 == 1000;
/* 082 */
/* 083 */           }
/* 084 */           if (filter_isNull_0 || !filter_value_0) continue;

Агрегация строк

Spark хранит данные строк в UTF-8, а java в UTF-16, по этому при передаче параметров и возвращении результатов происходит двойная конвертация, которая замедляет работу UDF

- Создадим string DF "sdf" из исходного df как конкатенацию "Ф + id + Я"
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
val sdf = df.select(concat(lit("Ф"), $"id".cast(StringType), lit("Я")).as("id"))
- Сравним работу нативной обработки строк с обработкой строк через UDF "subOne":
val subOne = udf((x: String) => x.substring(1,2).toInt)
spark.udf.register("subOne", subOne)
- Сгенерированный java код для нативного substr содержит обычный substr и приведение к int
sdf.select($"id".substr(2,1).cast(IntegerType).as("id")).queryExecution.debug.codegen 
/* 052 */         UTF8String columnartorow_value_0 = columnartorow_isNull_0 ? null : (columnartorow_mutableStateArray_2[0].getUTF8String(columnartorow_rowIdx_0));
/* 053 */         if (!columnartorow_isNull_0) {
/* 054 */           project_isNull_1 = false; // resultCode could change nullability.
/* 055 */           project_value_1 = columnartorow_value_0.substringSQL(2, 1);
/* 056 */
/* 057 */         }
/* 058 */         boolean project_isNull_0 = project_isNull_1;
/* 059 */         int project_value_0 = -1;
/* 060 */         if (!project_isNull_1) {
/* 061 */           UTF8String.IntWrapper project_intWrapper_0 = new UTF8String.IntWrapper();
/* 062 */           if (project_value_1.toInt(project_intWrapper_0)) {
/* 063 */             project_value_0 = project_intWrapper_0.value;
/* 064 */           } else {
/* 065 */             project_isNull_0 = true;
/* 066 */           }
/* 067 */           project_intWrapper_0 = null;
/* 068 */         }
- Сгенерированный java код для subOne UDF содержит вызов java функции:
sdf.select(subOne($"id").as("id")).queryExecution.debug.codegen 
/* 050 */         UTF8String columnartorow_value_0 = columnartorow_isNull_0 ? null : (columnartorow_mutableStateArray_2[0].getUTF8String(columnartorow_rowIdx_0));
/* 051 */
/* 052 */         Object project_arg_0 = null;
/* 053 */         if (columnartorow_isNull_0) {
/* 054 */           project_arg_0 = ((scala.Function1[]) references[2] /* converters */)[0].apply(null);
/* 055 */         } else {
/* 056 */           project_arg_0 = ((scala.Function1[]) references[2] /* converters */)[0].apply(columnartorow_value_0);
/* 057 */         }
/* 058 */
/* 059 */         Integer project_result_0 = null;
/* 060 */         try {
/* 061 */           project_result_0 = (Integer)((scala.Function1) references[4] /* udf */).apply(project_arg_0);
/* 062 */         } catch (Exception e) {
/* 063 */           throw new org.apache.spark.SparkException(((java.lang.String) references[3] /* errMsg */), e);
/* 064 */         }
/* 065 */
/* 066 */         boolean project_isNull_0 = project_result_0 == null;
/* 067 */         int project_value_0 = -1;
/* 068 */         if (!project_isNull_0) {
/* 069 */           project_value_0 = project_result_0;
/* 070 */         }

- UDF на 28% медленней нативного варианта за счет дополнительных конвертаций строк:
val native_res = time { sdf.select($"id".substr(2,1).cast(IntegerType).as("id")).agg(sum($"id")).collect }
val udf_res = time { sdf.select(subOne($"id").as("id")).agg(sum($"id")).collect }

native_res.toDouble/udf_res.toDouble
//= Native / string UDF = 0.7206604510597114

Использование в SQL

К сожалению, не нашел способа добавить свою нативную udf в sql без прямой модификации кода Spark.
Для этого нужно:
- Добавить свой jar в зависимости проекта catalyst
- заимпортить org.apache.spark.sql.custom_functions.mynativefunc в sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
- добавить маппинг функции по аналогии в массив expressions объекта FunctionRegistry того же файла
после этого функцию можно будет использовать в sql

Статья вдохновлена докладом на databricks summit 2020

Комментариев нет:

Отправить комментарий