Цена
Databricks - лидер и основной разработчик Spark, но он и выходит на 40% дороже, чем кластер HDInsight того же размера.Для сравнения выбраны "D14 v2 (16 Cores, 112 GB RAM)", т.к. они присутствуют в обоих сборках.
Кластер состоит из 1 драйвера и 6 воркеров.
Стоимосить кластера Databricks (Job кластер) - 14.83$:

Стоимосить кластера HDInsight - 10.47$:

Разница как раз складывается из DBU (databricks unit), которых нет в HDInsight. Сами железные сервера стоят одинаково.
При этом Ineractive Databricks кластер 21.8$, т.е. в 2 раза дороже HDInsight
Но справедливости ради, стоит заметить, что HDInsight не так удобен:
* Его нужно каждый раз вручную удалять и создавать (т.е. состояние кода и настроек не сохраняется из коробки)
* Периодические проблемы с падениями сервисов ambari и устаревшие версии Spark
Методика
Подготовительный этап - подключаемся к blob storage, читаем данные и задаем схему вручую.( Подключиться к metastorе Databricks из HDInsight сходу не получилось)
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import java.time.format.DateTimeFormatter
import java.time.LocalDate
import java.time.Duration
import org.apache.spark.sql.types.Decimal
val pos_rec_path = "abfs://dbricks-fs@***.dfs.core.windows.net/warehouse/rdw.db/pos_rec_itm/"
val mat_path = "abfs://dbricks-fs@***.dfs.core.windows.net/warehouse/rdw.db/bi0_pmaterial/"
val calc_days = Array(("20210501", "20210631"),//simpl
("20210301", "20210431"), //complex
("20210201", "20210631"))//adhoc
val pos_rec_cols = Array("check_id", "rc_rt_fdy", ..., "calday")
val mat_cols = Array("material", "objvers", ..., "cont_unit")
val pos_df = spark.read.orc(pos_rec_path).toDF(pos_rec_cols:_*)
val mat_df = spark.read.orc(mat_path).toDF(mat_cols:_*)
Важно: тестирование будет происходить на ORC файлахДля сравнения производительности использовалось 3 теста:
* Простой запрос на 1 таблице с агрегацией
def simplQ() = {
val ret = pos_df.
where($"calday".between(calc_days(0)._1, calc_days(0)._2)).
groupBy($"plant").
agg(
sum("rtsaexcust").as("rtsaexcust"),
count("*").as("cnt")
).
collect
ret.length
}
* AdHoc запросы на большом периоде, но с сильной фильтрацией по товарам и магазинам
def adHocQ(p_wgh3: String) = {
val ret = pos_df.where($"calday".between(calc_days(2)._1, calc_days(2)._2) && $"plant".isin("0002", "0001", "0128")).
join(mat_df.where($"rpa_wgh3" === p_wgh3), mat_df("material") === pos_df("material"), "inner").
groupBy($"rpa_wgh4").
agg(
sum("rtsaexcust").as("rtsaexcust"),
countDistinct("check_id").as("checks"),
countDistinct("bic_client").as("clients"),
count("*").as("cnt")
).
orderBy($"rpa_wgh4").
collect
ret.length
}
* Сложный запрос с join, group by, order, case, distinct и аналитической функцией
def complexQ() = {
val ret = pos_df.where($"calday".between(calc_days(1)._1, calc_days(1)._2)).
join(mat_df.where($"bic_apur_grp" =!= "Z01"), mat_df("material") === pos_df("material"), "inner").
groupBy($"rpa_wgh2", $"rpa_wgh3").
agg(
sum("rtsaexcust").as("rtsaexcust"),
sum(when($"RT_PROMO" =!= " ", "bic_zaltcost").otherwise(0)).as("bic_zaltcost_promo"),
countDistinct("check_id").as("checks"),
countDistinct("bic_client").as("clients"),
count("*").as("cnt"),
sum(sum($"rtsaexcusv")).over( Window.partitionBy( $"rpa_wgh2" )).as("rtsaexcusv_w2")
).
orderBy($"rpa_wgh3").
collect
ret.length
}
Методика тестирования: каждый запрос прогоняется 5 раз и замеряется время работы. Из результатов исключается самый быстрый и самый медленный запуск, от оставшихся 3 берется среднее:
def avgTiming(block: => Unit) = {
var timing = Array[Int]()
for(i <- 1 to 5) {
val st_dt = java.time.LocalDateTime.now
block
val ed_dt = java.time.LocalDateTime.now
timing = timing :+ Duration.between(st_dt, ed_dt).toMillis.toInt
}
(timing.sum - timing.max - timing.min) / (timing.length - 2)
}
adHoc запрос дополнительно прогонялся по 3 раза на разных фильтрах, а для сложного запроса отключался broadcast, чтобы проверить скорость больших shuffle при SMJ
println("simplQ = " + avgTiming(simplQ))
println("adHocQ = " + avgTiming(List("5803", "5302", "7202").map(adHocQ)))
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.databricks.adaptive.autoBroadcastJoinThreshold", "-1")
println("complexQ = " + avgTiming(complexQ))
Производительность
Databricks
| Замер | Простой запрос (сек) | adHoc запрос (сек) | Сложный запрос (сек) |
| Spark 2.4.3 | 12.4 | 46.8 | 164.2 |
| Spark 3.0.1 | 37.5 | 71.8 | 207.9 |
| Spark 3.1.2 | 29.4 | 70.1 | 247.0 |
Странно, но Spark 2.4 показывает лучшую производительность по сравнению с 3 при работе с ORC файлами.
Если сравнить планы, то видно разницу, что в Spark 3.0 этам чтения Orc файла не включен в wholestagegen (1), что по непонятной причине приводит к отключению части фильтрации на диске и увеличению объему фильтруемых данных после считывания почти в 3 раза:
Аналогичное сравнение было проведено на parquet файлах, в них так же выиграл Spark 2.4, но разницы в скорости чтения "pos_rec_itm" уже не было:Если сравнить планы, то видно разницу, что в Spark 3.0 этам чтения Orc файла не включен в wholestagegen (1), что по непонятной причине приводит к отключению части фильтрации на диске и увеличению объему фильтруемых данных после считывания почти в 3 раза:
| Spark 2.4 | Spark 3.0 |
+- *(1) Project [_col0#2 AS check_id#100, _col11#13 AS bic_client#111, _col17#19 AS material#117, _col19#21 AS rt_promo#119, _col27#29 AS rtsaexcust#127, _col28#30 AS rtsaexcusv#128]
+- *(1) Filter isnotnull(_col17#19)
+- *(1) FileScan orc [_col0#2,_col11#13,_col17#19,_col19#21,_col27#29,_col28#30,calday#50] Batched: true, DataFilters: [isnotnull(_col17#19)], Format: ORC, Location: InMemoryFileIndex[dbfs:/mnt/exfs/warehouse/rdw.db/pos_rec_itm], PartitionCount: 61, PartitionFilters: [isnotnull(calday#50), (calday#50 >= 20210301), (calday#50 <= 20210431)], PushedFilters: [IsNotNull(_col17)], ReadSchema: struct
number of files read 244 filesystem read data size total (min, med, max) 6.0 GB (22.1 MB, 31.1 MB, 44.0 MB) scan time total (min, med, max) 15.9 m (2.9 s, 4.8 s, 10.1 s) |
+- *(1) Project [_col0#2 AS check_id#100, _col11#13 AS bic_client#111, _col17#19 AS material#117, _col19#21 AS rt_promo#119, _col27#29 AS rtsaexcust#127, _col28#30 AS rtsaexcusv#128]
+- *(1) Filter isnotnull(_col17#19)
+- *(1) ColumnarToRow
+- FileScan orc [_col0#2,_col11#13,_col17#19,_col19#21,_col27#29,_col28#30,calday#50] Batched: true, DataFilters: [isnotnull(_col17#19)], Format: ORC, Location: InMemoryFileIndex[dbfs:/mnt/exfs/warehouse/rdw.db/pos_rec_itm], PartitionFilters: [isnotnull(calday#50), (calday#50 >= 20210301), (calday#50 <= 20210431)], PushedFilters: [IsNotNull(_col17)], ReadSchema: struct
number of files read 244 filesystem read data size total (min, med, max) 16.3 GiB (60.3 MiB, 85.0 MiB, 119.2 MiB) scan time total (min, med, max) 33.9 m (6.6 s, 10.7 s, 15.2 s) |
Spark 2.4: simplQ = 8990 adHocQ = 61177 complexQ = 146193 Spark 3.0: simplQ = 8625 adHocQ = 59052 complexQ = 175663
Кластер, во время работы, был близок к полной загруженности:

HDInsight
При запуске тестов в HDInsight было 2 особенности:* Запросы не запускались в Spark 3.0 (preview) с ошибкой "org/apache/spark/sql/types/Decimal.class is broken (class java.lang.RuntimeException/error reading Scala signature of Decimal.class)"
* При создании кластера из 6 машин с 16 CPU, в Yarn было доступно 186 ядер, вместо 96.
По этому было проведено несколько тестов с разным количество воркеров.
Запуск с 93 вокрерами (6ГБ, 2 core на вокрер) должен соответствовать конфигурации Databricks.
| Замер | Простой запрос (сек) | adHoc запрос (сек) | Сложный запрос (сек) |
| Spark 3.0 | Error | Error | Error |
| Spark 2.4.4
num-executors = 186, executor-memory=3000M, executor-cores = 1 |
12.4 | 46.8 | Error |
| Spark 2.4.4
num-executors = 93, executor-memory=6000M, executor-cores = 2 |
10.1 | 46.9 | 295.8 |
| Spark 2.4.4
num-executors = 47, executor-memory=12000M, executor-cores = 4 |
8.0 | 40.8 | 212.2 |
Загруженность кластера также была близка к максимуму.


Вывод
Из-за проблем с Orc у Databricks Spark 3, сравнивать будем Databricks Spark 2.4.3 и HDInsight Spark 2.4.4 (в конфигурации num-executors = 47, executor-memory=12000M, executor-cores = 4).Суммарное средних времени Databricks Spark 2.4.3 = 12.4 + 46.8 + 164.2 = 223.4 суммарных секунд
HDInsight Spark 2.4.4 = 8.0 + 40.8 + 212.2 = 261 суммарных секунд
HDInsight хорошо держался на простых запросах, но сильно просел на комплексном, что по итогу дает +17% к времени работы в сравнении с Databricks
Но учитывая +40% к стоимости Databricks, цена за единицу работы у HDInsight выходит выгодней.
К текущим серверам HDInsight можно добавить 40% мощности, чтобы получить туже стоимость с Databricks, но оценочно все равно выиграть 23% в производительности.
Даже, если на кластере будут исполняться только сложные запросы, то цена HDInsigh в -40% все равно перекрывает просадку производительности в +30%
Комментариев нет:
Отправить комментарий