Цена
Databricks - лидер и основной разработчик Spark, но он и выходит на 40% дороже, чем кластер HDInsight того же размера.Для сравнения выбраны "D14 v2 (16 Cores, 112 GB RAM)", т.к. они присутствуют в обоих сборках.
Кластер состоит из 1 драйвера и 6 воркеров.
Стоимосить кластера Databricks (Job кластер) - 14.83$:
Стоимосить кластера HDInsight - 10.47$:
Разница как раз складывается из DBU (databricks unit), которых нет в HDInsight. Сами железные сервера стоят одинаково.
При этом Ineractive Databricks кластер 21.8$, т.е. в 2 раза дороже HDInsight
Но справедливости ради, стоит заметить, что HDInsight не так удобен:
* Его нужно каждый раз вручную удалять и создавать (т.е. состояние кода и настроек не сохраняется из коробки)
* Периодические проблемы с падениями сервисов ambari и устаревшие версии Spark
Методика
Подготовительный этап - подключаемся к blob storage, читаем данные и задаем схему вручую.( Подключиться к metastorе Databricks из HDInsight сходу не получилось)
import org.apache.spark.sql.types._ import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ import java.time.format.DateTimeFormatter import java.time.LocalDate import java.time.Duration import org.apache.spark.sql.types.Decimal val pos_rec_path = "abfs://dbricks-fs@***.dfs.core.windows.net/warehouse/rdw.db/pos_rec_itm/" val mat_path = "abfs://dbricks-fs@***.dfs.core.windows.net/warehouse/rdw.db/bi0_pmaterial/" val calc_days = Array(("20210501", "20210631"),//simpl ("20210301", "20210431"), //complex ("20210201", "20210631"))//adhoc val pos_rec_cols = Array("check_id", "rc_rt_fdy", ..., "calday") val mat_cols = Array("material", "objvers", ..., "cont_unit") val pos_df = spark.read.orc(pos_rec_path).toDF(pos_rec_cols:_*) val mat_df = spark.read.orc(mat_path).toDF(mat_cols:_*)Важно: тестирование будет происходить на ORC файлах
Для сравнения производительности использовалось 3 теста:
* Простой запрос на 1 таблице с агрегацией
def simplQ() = { val ret = pos_df. where($"calday".between(calc_days(0)._1, calc_days(0)._2)). groupBy($"plant"). agg( sum("rtsaexcust").as("rtsaexcust"), count("*").as("cnt") ). collect ret.length }* AdHoc запросы на большом периоде, но с сильной фильтрацией по товарам и магазинам
def adHocQ(p_wgh3: String) = { val ret = pos_df.where($"calday".between(calc_days(2)._1, calc_days(2)._2) && $"plant".isin("0002", "0001", "0128")). join(mat_df.where($"rpa_wgh3" === p_wgh3), mat_df("material") === pos_df("material"), "inner"). groupBy($"rpa_wgh4"). agg( sum("rtsaexcust").as("rtsaexcust"), countDistinct("check_id").as("checks"), countDistinct("bic_client").as("clients"), count("*").as("cnt") ). orderBy($"rpa_wgh4"). collect ret.length }* Сложный запрос с join, group by, order, case, distinct и аналитической функцией
def complexQ() = { val ret = pos_df.where($"calday".between(calc_days(1)._1, calc_days(1)._2)). join(mat_df.where($"bic_apur_grp" =!= "Z01"), mat_df("material") === pos_df("material"), "inner"). groupBy($"rpa_wgh2", $"rpa_wgh3"). agg( sum("rtsaexcust").as("rtsaexcust"), sum(when($"RT_PROMO" =!= " ", "bic_zaltcost").otherwise(0)).as("bic_zaltcost_promo"), countDistinct("check_id").as("checks"), countDistinct("bic_client").as("clients"), count("*").as("cnt"), sum(sum($"rtsaexcusv")).over( Window.partitionBy( $"rpa_wgh2" )).as("rtsaexcusv_w2") ). orderBy($"rpa_wgh3"). collect ret.length }Методика тестирования: каждый запрос прогоняется 5 раз и замеряется время работы. Из результатов исключается самый быстрый и самый медленный запуск, от оставшихся 3 берется среднее:
def avgTiming(block: => Unit) = { var timing = Array[Int]() for(i <- 1 to 5) { val st_dt = java.time.LocalDateTime.now block val ed_dt = java.time.LocalDateTime.now timing = timing :+ Duration.between(st_dt, ed_dt).toMillis.toInt } (timing.sum - timing.max - timing.min) / (timing.length - 2) }adHoc запрос дополнительно прогонялся по 3 раза на разных фильтрах, а для сложного запроса отключался broadcast, чтобы проверить скорость больших shuffle при SMJ
println("simplQ = " + avgTiming(simplQ)) println("adHocQ = " + avgTiming(List("5803", "5302", "7202").map(adHocQ))) spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1") spark.conf.set("spark.sql.adaptive.enabled", "false") spark.conf.set("spark.databricks.adaptive.autoBroadcastJoinThreshold", "-1") println("complexQ = " + avgTiming(complexQ))
Производительность
Databricks
Замер | Простой запрос (сек) | adHoc запрос (сек) | Сложный запрос (сек) |
Spark 2.4.3 | 12.4 | 46.8 | 164.2 |
Spark 3.0.1 | 37.5 | 71.8 | 207.9 |
Spark 3.1.2 | 29.4 | 70.1 | 247.0 |
Странно, но Spark 2.4 показывает лучшую производительность по сравнению с 3 при работе с ORC файлами.
Если сравнить планы, то видно разницу, что в Spark 3.0 этам чтения Orc файла не включен в wholestagegen (1), что по непонятной причине приводит к отключению части фильтрации на диске и увеличению объему фильтруемых данных после считывания почти в 3 раза:
Аналогичное сравнение было проведено на parquet файлах, в них так же выиграл Spark 2.4, но разницы в скорости чтения "pos_rec_itm" уже не было:Если сравнить планы, то видно разницу, что в Spark 3.0 этам чтения Orc файла не включен в wholestagegen (1), что по непонятной причине приводит к отключению части фильтрации на диске и увеличению объему фильтруемых данных после считывания почти в 3 раза:
Spark 2.4 | Spark 3.0 |
+- *(1) Project [_col0#2 AS check_id#100, _col11#13 AS bic_client#111, _col17#19 AS material#117, _col19#21 AS rt_promo#119, _col27#29 AS rtsaexcust#127, _col28#30 AS rtsaexcusv#128] +- *(1) Filter isnotnull(_col17#19) +- *(1) FileScan orc [_col0#2,_col11#13,_col17#19,_col19#21,_col27#29,_col28#30,calday#50] Batched: true, DataFilters: [isnotnull(_col17#19)], Format: ORC, Location: InMemoryFileIndex[dbfs:/mnt/exfs/warehouse/rdw.db/pos_rec_itm], PartitionCount: 61, PartitionFilters: [isnotnull(calday#50), (calday#50 >= 20210301), (calday#50 <= 20210431)], PushedFilters: [IsNotNull(_col17)], ReadSchema: struct number of files read 244 filesystem read data size total (min, med, max) 6.0 GB (22.1 MB, 31.1 MB, 44.0 MB) scan time total (min, med, max) 15.9 m (2.9 s, 4.8 s, 10.1 s) |
+- *(1) Project [_col0#2 AS check_id#100, _col11#13 AS bic_client#111, _col17#19 AS material#117, _col19#21 AS rt_promo#119, _col27#29 AS rtsaexcust#127, _col28#30 AS rtsaexcusv#128] +- *(1) Filter isnotnull(_col17#19) +- *(1) ColumnarToRow +- FileScan orc [_col0#2,_col11#13,_col17#19,_col19#21,_col27#29,_col28#30,calday#50] Batched: true, DataFilters: [isnotnull(_col17#19)], Format: ORC, Location: InMemoryFileIndex[dbfs:/mnt/exfs/warehouse/rdw.db/pos_rec_itm], PartitionFilters: [isnotnull(calday#50), (calday#50 >= 20210301), (calday#50 <= 20210431)], PushedFilters: [IsNotNull(_col17)], ReadSchema: struct number of files read 244 filesystem read data size total (min, med, max) 16.3 GiB (60.3 MiB, 85.0 MiB, 119.2 MiB) scan time total (min, med, max) 33.9 m (6.6 s, 10.7 s, 15.2 s) |
Spark 2.4: simplQ = 8990 adHocQ = 61177 complexQ = 146193 Spark 3.0: simplQ = 8625 adHocQ = 59052 complexQ = 175663
Кластер, во время работы, был близок к полной загруженности:
HDInsight
При запуске тестов в HDInsight было 2 особенности:* Запросы не запускались в Spark 3.0 (preview) с ошибкой "org/apache/spark/sql/types/Decimal.class is broken (class java.lang.RuntimeException/error reading Scala signature of Decimal.class)"
* При создании кластера из 6 машин с 16 CPU, в Yarn было доступно 186 ядер, вместо 96.
По этому было проведено несколько тестов с разным количество воркеров.
Запуск с 93 вокрерами (6ГБ, 2 core на вокрер) должен соответствовать конфигурации Databricks.
Замер | Простой запрос (сек) | adHoc запрос (сек) | Сложный запрос (сек) |
Spark 3.0 | Error | Error | Error |
Spark 2.4.4
num-executors = 186, executor-memory=3000M, executor-cores = 1 |
12.4 | 46.8 | Error |
Spark 2.4.4
num-executors = 93, executor-memory=6000M, executor-cores = 2 |
10.1 | 46.9 | 295.8 |
Spark 2.4.4
num-executors = 47, executor-memory=12000M, executor-cores = 4 |
8.0 | 40.8 | 212.2 |
Загруженность кластера также была близка к максимуму.
Вывод
Из-за проблем с Orc у Databricks Spark 3, сравнивать будем Databricks Spark 2.4.3 и HDInsight Spark 2.4.4 (в конфигурации num-executors = 47, executor-memory=12000M, executor-cores = 4).Суммарное средних времени Databricks Spark 2.4.3 = 12.4 + 46.8 + 164.2 = 223.4 суммарных секунд
HDInsight Spark 2.4.4 = 8.0 + 40.8 + 212.2 = 261 суммарных секунд
HDInsight хорошо держался на простых запросах, но сильно просел на комплексном, что по итогу дает +17% к времени работы в сравнении с Databricks
Но учитывая +40% к стоимости Databricks, цена за единицу работы у HDInsight выходит выгодней.
К текущим серверам HDInsight можно добавить 40% мощности, чтобы получить туже стоимость с Databricks, но оценочно все равно выиграть 23% в производительности.
Даже, если на кластере будут исполняться только сложные запросы, то цена HDInsigh в -40% все равно перекрывает просадку производительности в +30%
Комментариев нет:
Отправить комментарий