- Проблемы Scala UDF
- Создание Native Scala UDF для Catalyst
- Использование
- Тестовые данные
- Сравнение планов, Codegen и производительности
- Использование в SQL
Проблемы Scala UDF
1. UDF - черный ящик для Codegen: представляет из себя вызов java функции и не встраивается в wholestagegen2. Нет возможности not null оптимизации
3. UDF не может быть спущена на уровень файлов (predicate pushdown)
4. Конвертация UTF-8 строк Spark в UTF-16 строки JVM при передаче параметров и получении результата.
Создание Native Scala UDF для Catalyst
- Создаем udf в org.apache.spark.sql.catalyst.expressions:* При создании указываем trait NullIntolerant, который говорит, что функция применима только для not null колонок
* Реализуем doGenCode - текст java программы с логикой udf.
В нашем случае просто "имя колонки + 777"
package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.types._
case class Mynativefunc(child: Expression)
extends UnaryExpression with ExpectsInputTypes with NullIntolerant {
override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)
override def dataType: DataType = child.dataType
private lazy val numeric = TypeUtils.getNumeric(dataType)
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = dataType match {
case _ =>
defineCodeGen(ctx, ev, c => s"$c + 777")
}
protected override def nullSafeEval(input: Any): Any = numeric.plus(input, 777)
}
- Регистриуем функцию в org.apache.spark.sql
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.expressions._
object custom_functions {
def mynativefunc(e: Column): Column = Column { Mynativefunc(e.expr) }
}
- в sbt подключаем spark-core, spark-sql, spark-catalyst и собираем jar
name := "NativeUdfTest" version := "1.0" scalaVersion := "2.12.10" libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.0" libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.1.0" libraryDependencies += "org.apache.spark" %% "spark-catalyst" % "3.1.0"Полный пример можно посмотреть в Github: SparkNativeUDF
Использование
- При запуске указываем jar с нашей Native UDFbin\spark-shell --jars file://target/scala-2.12/nativeudftest_2.12-1.0.jar- Имортим mynativefunc UDF
import org.apache.spark.sql.custom_functions.mynativefunc- далее испольузем, как обычную UDF
Тестовые данные
- Тестовый parquet файл и DF из него
(1 to 1000000).toDF("id").write.mode("overwrite").parquet("test")
for(i <- 1 to 20) {
(1 to 1000000).toDF("id").write.mode("append").parquet("test")
}
val df = spark.read.parquet("test")
- Обычная UDF "myudffunc" с которой будем сравнивать Native UDF "mynativefunc"
val myudffunc = udf((x: Int) => x + 777)
spark.udf.register("myudffunc", myudffunc)
- Замерять время будем функцией "time", которая берет среднее от 5 запусков, кроме 1 и последнего:
def time[R](block: => R): Long = {
var ret: Long = 0
for (i <- 1 to 5) {
val t0 = System.nanoTime()
block
if(i > 1 && i < 5) {
ret = ret + (System.nanoTime() - t0)
}
}
ret / 3
}
- Проверим, что прямое сложение, UDF и Native UDF дают одинаковый результат:
df.agg(sum($"id" + 777)).collect //Array[org.apache.spark.sql.Row] = Array([10516327500000]) df.agg(sum(myudffunc($"id"))).collect //Array[org.apache.spark.sql.Row] = Array([10516327500000]) df.agg(sum(mynativefunc($"id"))).collect //Array[org.apache.spark.sql.Row] = Array([10516327500000])
Сравнение планов, Codegen и производительности
Агрегация чисел
- Прямое сложение дает в java codegen ожидаемо сложениеdf.agg(sum($"id" + 777)).queryExecution.debug.codegen /* 063 */ agg_value_8 = agg_expr_0_0 + 777;- Вызов native функции дает также сложение в java коде:
df.agg(sum(mynativefunc($"id"))).queryExecution.debug.codegen /* 061 */ agg_value_8 = agg_expr_0_0 + 777;- Вызов же UDF порождаем вызов java функции с оберткой в try+catch:
df.agg(sum(myudffunc($"id"))).queryExecution.debug.codegen
/* 066 */ Integer agg_result_0 = null;
/* 067 */ try {
/* 068 */ agg_result_0 = (Integer)((scala.Function1) references[4] /* udf */).apply(agg_arg_0);
/* 069 */ } catch (Exception e) {
/* 070 */ throw new org.apache.spark.SparkException(((java.lang.String) references[3] /* errMsg */), e);
/* 071 */ }
- Нативное сложение и вызов нативной "mynativefunc" работает на 14% быстрей, чем вызов простой UDF
val native_res = time { df.agg(sum($"id" + 777)).collect }
val udf_res = time { df.agg(sum(myudffunc($"id"))).collect }
val native_func_res = time { df.agg(sum(mynativefunc($"id"))).collect }
native_res.toDouble/udf_res.toDouble
//= Native / Normal UDF = 0.8607967494045435
native_res.toDouble/native_func_res.toDouble
//= Native / Native UDF = 1.0923988307860604
Надо понимать, что такая разница вызвана простой логикой UDF. Если UDF будет сложней, то разница будет менее заметна.
Фильтрация по числовому столбцу
- При прямой фильтрации по числу есть фильтр по сложению и pushdown not null ограничения
df.where($"id" + 777 === 1000).explain()
*(1) Filter (isnotnull(id#281) AND ((id#281 + 777) = 1000))
+- *(1) ColumnarToRow
+- FileScan parquet [id#281] Batched: true, DataFilters: [isnotnull(id#281), ((id#281 + 777) = 1000)],
Format: Parquet, Location: InMemoryFileIndex[file:/C:/spark-master/test], PartitionFilters: [],
PushedFilters: [IsNotNull(id)], ReadSchema: struct
- Аналогично в native вызове:
df.where(mynativefunc($"id") === 1000).explain()
*(1) Filter (isnotnull(id#288) AND (mynativefunc(id#288) = 1000))
+- *(1) ColumnarToRow
+- FileScan parquet [id#288] Batched: true, DataFilters: [isnotnull(id#288), (mynativefunc(id#288) = 1000)],
Format: Parquet, Location: InMemoryFileIndex[file:/C:/spark-master/test], PartitionFilters: [],
PushedFilters: [IsNotNull(id)], ReadSchema: struct
- При вызове обычной UDF неизвестно, вернет ли она знаение или NULL, по этому в плане есть проверка на NULL и нет NOT NULL pushdown на уровень файлов:
df.where(myudffunc($"id") === 1000).explain()
*(1) Filter (if (isnull(id#283)) null else UDF(knownnotnull(id#283)) = 1000)
+- *(1) ColumnarToRow
+- FileScan parquet [id#283] Batched: true, DataFilters: [(if (isnull(id#283)) null else UDF(knownnotnull(id#283)) = 1000)],
Format: Parquet, Location: InMemoryFileIndex[file:/C:/spark-master/test], PartitionFilters: [],
PushedFilters: [], ReadSchema: struct
- сгенерированный java код у прямой фильтрации и native udf одинаково простой:
df.where($"id" + 777 === 1000).queryExecution.debug.codegen /* 058 */ filter_value_4 = columnartorow_value_0 + 777; /* 059 */ /* 060 */ boolean filter_value_3 = false; /* 061 */ filter_value_3 = filter_value_4 == 1000; /* 062 */ if (!filter_value_3) continue;
df.where(mynativefunc($"id") === 1000).queryExecution.debug.codegen /* 057 */ filter_value_4 = columnartorow_value_0 + 777; /* 058 */ /* 059 */ boolean filter_value_3 = false; /* 060 */ filter_value_3 = filter_value_4 == 1000; /* 061 */ if (!filter_value_3) continue;- В обычной UDF код значительно запутанней из-за вызова java функции UDF и проверок на NULL:
df.where(myudffunc($"id") === 1000).queryExecution.debug.codegen
/* 064 */ Integer filter_result_0 = null;
/* 065 */ try {
/* 066 */ filter_result_0 = (Integer)((scala.Function1) references[5] /* udf */).apply(filter_arg_0);
/* 067 */ } catch (Exception e) {
/* 068 */ throw new org.apache.spark.SparkException(((java.lang.String) references[4] /* errMsg */), e);
/* 069 */ }
/* 070 */
/* 071 */ boolean filter_isNull_5 = filter_result_0 == null;
/* 072 */ int filter_value_5 = -1;
/* 073 */ if (!filter_isNull_5) {
/* 074 */ filter_value_5 = filter_result_0;
/* 075 */ }
/* 076 */ filter_isNull_1 = filter_isNull_5;
/* 077 */ filter_value_1 = filter_value_5;
/* 078 */ }
/* 079 */ if (!filter_isNull_1) {
/* 080 */ filter_isNull_0 = false; // resultCode could change nullability.
/* 081 */ filter_value_0 = filter_value_1 == 1000;
/* 082 */
/* 083 */ }
/* 084 */ if (filter_isNull_0 || !filter_value_0) continue;
Агрегация строк
Spark хранит данные строк в UTF-8, а java в UTF-16, по этому при передаче параметров и возвращении результатов происходит двойная конвертация, которая замедляет работу UDF- Создадим string DF "sdf" из исходного df как конкатенацию "Ф + id + Я"
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
val sdf = df.select(concat(lit("Ф"), $"id".cast(StringType), lit("Я")).as("id"))
- Сравним работу нативной обработки строк с обработкой строк через UDF "subOne":
val subOne = udf((x: String) => x.substring(1,2).toInt)
spark.udf.register("subOne", subOne)
- Сгенерированный java код для нативного substr содержит обычный substr и приведение к int
sdf.select($"id".substr(2,1).cast(IntegerType).as("id")).queryExecution.debug.codegen
/* 052 */ UTF8String columnartorow_value_0 = columnartorow_isNull_0 ? null : (columnartorow_mutableStateArray_2[0].getUTF8String(columnartorow_rowIdx_0));
/* 053 */ if (!columnartorow_isNull_0) {
/* 054 */ project_isNull_1 = false; // resultCode could change nullability.
/* 055 */ project_value_1 = columnartorow_value_0.substringSQL(2, 1);
/* 056 */
/* 057 */ }
/* 058 */ boolean project_isNull_0 = project_isNull_1;
/* 059 */ int project_value_0 = -1;
/* 060 */ if (!project_isNull_1) {
/* 061 */ UTF8String.IntWrapper project_intWrapper_0 = new UTF8String.IntWrapper();
/* 062 */ if (project_value_1.toInt(project_intWrapper_0)) {
/* 063 */ project_value_0 = project_intWrapper_0.value;
/* 064 */ } else {
/* 065 */ project_isNull_0 = true;
/* 066 */ }
/* 067 */ project_intWrapper_0 = null;
/* 068 */ }
- Сгенерированный java код для subOne UDF содержит вызов java функции:
sdf.select(subOne($"id").as("id")).queryExecution.debug.codegen
/* 050 */ UTF8String columnartorow_value_0 = columnartorow_isNull_0 ? null : (columnartorow_mutableStateArray_2[0].getUTF8String(columnartorow_rowIdx_0));
/* 051 */
/* 052 */ Object project_arg_0 = null;
/* 053 */ if (columnartorow_isNull_0) {
/* 054 */ project_arg_0 = ((scala.Function1[]) references[2] /* converters */)[0].apply(null);
/* 055 */ } else {
/* 056 */ project_arg_0 = ((scala.Function1[]) references[2] /* converters */)[0].apply(columnartorow_value_0);
/* 057 */ }
/* 058 */
/* 059 */ Integer project_result_0 = null;
/* 060 */ try {
/* 061 */ project_result_0 = (Integer)((scala.Function1) references[4] /* udf */).apply(project_arg_0);
/* 062 */ } catch (Exception e) {
/* 063 */ throw new org.apache.spark.SparkException(((java.lang.String) references[3] /* errMsg */), e);
/* 064 */ }
/* 065 */
/* 066 */ boolean project_isNull_0 = project_result_0 == null;
/* 067 */ int project_value_0 = -1;
/* 068 */ if (!project_isNull_0) {
/* 069 */ project_value_0 = project_result_0;
/* 070 */ }
- UDF на 28% медленней нативного варианта за счет дополнительных конвертаций строк:
val native_res = time { sdf.select($"id".substr(2,1).cast(IntegerType).as("id")).agg(sum($"id")).collect }
val udf_res = time { sdf.select(subOne($"id").as("id")).agg(sum($"id")).collect }
native_res.toDouble/udf_res.toDouble
//= Native / string UDF = 0.7206604510597114
Использование в SQL
К сожалению, не нашел способа добавить свою нативную udf в sql без прямой модификации кода Spark.Для этого нужно:
- Добавить свой jar в зависимости проекта catalyst
- заимпортить org.apache.spark.sql.custom_functions.mynativefunc в sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
- добавить маппинг функции по аналогии в массив expressions объекта FunctionRegistry того же файла
после этого функцию можно будет использовать в sql
Статья вдохновлена докладом на databricks summit 2020
Комментариев нет:
Отправить комментарий