среда, 28 июля 2021 г.

Azure: Databricks vs HDInsight

В облаке MS Azure есть 2 простых способа организовать кластер Spark - это Databricks и HDInsight.

Цена

Databricks - лидер и основной разработчик Spark, но он и выходит на 40% дороже, чем кластер HDInsight того же размера.
Для сравнения выбраны "D14 v2 (16 Cores, 112 GB RAM)", т.к. они присутствуют в обоих сборках.

Кластер состоит из 1 драйвера и 6 воркеров.
Стоимосить кластера Databricks (Job кластер) - 14.83$:

Стоимосить кластера HDInsight - 10.47$:

Разница как раз складывается из DBU (databricks unit), которых нет в HDInsight. Сами железные сервера стоят одинаково.
При этом Ineractive Databricks кластер 21.8$, т.е. в 2 раза дороже HDInsight
Но справедливости ради, стоит заметить, что HDInsight не так удобен:
* Его нужно каждый раз вручную удалять и создавать (т.е. состояние кода и настроек не сохраняется из коробки)
* Периодические проблемы с падениями сервисов ambari и устаревшие версии Spark

Методика

Подготовительный этап - подключаемся к blob storage, читаем данные и задаем схему вручую.
( Подключиться к metastorе Databricks из HDInsight сходу не получилось)
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import java.time.format.DateTimeFormatter
import java.time.LocalDate
import java.time.Duration
import org.apache.spark.sql.types.Decimal

val pos_rec_path = "abfs://dbricks-fs@***.dfs.core.windows.net/warehouse/rdw.db/pos_rec_itm/"
val mat_path = "abfs://dbricks-fs@***.dfs.core.windows.net/warehouse/rdw.db/bi0_pmaterial/" 


val calc_days = Array(("20210501", "20210631"),//simpl
                      ("20210301", "20210431"), //complex
                      ("20210201", "20210631"))//adhoc
                      
val pos_rec_cols = Array("check_id", "rc_rt_fdy", ..., "calday")
val mat_cols = Array("material", "objvers", ..., "cont_unit")

val pos_df = spark.read.orc(pos_rec_path).toDF(pos_rec_cols:_*)
val mat_df = spark.read.orc(mat_path).toDF(mat_cols:_*)
Важно: тестирование будет происходить на ORC файлах

Для сравнения производительности использовалось 3 теста:
* Простой запрос на 1 таблице с агрегацией

def simplQ() = {
  val ret = pos_df.
  where($"calday".between(calc_days(0)._1, calc_days(0)._2)).
  groupBy($"plant").
  agg(
    sum("rtsaexcust").as("rtsaexcust"),
    count("*").as("cnt")
  ).
  collect
  
  ret.length
}
* AdHoc запросы на большом периоде, но с сильной фильтрацией по товарам и магазинам
def adHocQ(p_wgh3: String) = {
  val ret = pos_df.where($"calday".between(calc_days(2)._1, calc_days(2)._2) && $"plant".isin("0002", "0001", "0128")).
    join(mat_df.where($"rpa_wgh3" === p_wgh3), mat_df("material") === pos_df("material"), "inner").
    groupBy($"rpa_wgh4").
    agg(
        sum("rtsaexcust").as("rtsaexcust"),
        countDistinct("check_id").as("checks"),
        countDistinct("bic_client").as("clients"),
        count("*").as("cnt")
      ).
    orderBy($"rpa_wgh4").
  collect
  
  ret.length
}
* Сложный запрос с join, group by, order, case, distinct и аналитической функцией
def complexQ() = {
  val ret = pos_df.where($"calday".between(calc_days(1)._1, calc_days(1)._2)).
  join(mat_df.where($"bic_apur_grp" =!= "Z01"), mat_df("material") === pos_df("material"), "inner").
  groupBy($"rpa_wgh2", $"rpa_wgh3").
  agg(
      sum("rtsaexcust").as("rtsaexcust"),
      sum(when($"RT_PROMO" =!= " ", "bic_zaltcost").otherwise(0)).as("bic_zaltcost_promo"),
      countDistinct("check_id").as("checks"),
      countDistinct("bic_client").as("clients"),
      count("*").as("cnt"),
      sum(sum($"rtsaexcusv")).over( Window.partitionBy( $"rpa_wgh2" )).as("rtsaexcusv_w2")
    ).
  orderBy($"rpa_wgh3").
  collect
  
  ret.length
}
Методика тестирования: каждый запрос прогоняется 5 раз и замеряется время работы. Из результатов исключается самый быстрый и самый медленный запуск, от оставшихся 3 берется среднее:
def avgTiming(block: => Unit) = {
  var timing = Array[Int]()
  for(i <- 1 to 5) {
    val st_dt = java.time.LocalDateTime.now
    block
    val ed_dt = java.time.LocalDateTime.now
    timing = timing :+ Duration.between(st_dt, ed_dt).toMillis.toInt
  }
  (timing.sum - timing.max - timing.min) / (timing.length - 2)
}
adHoc запрос дополнительно прогонялся по 3 раза на разных фильтрах, а для сложного запроса отключался broadcast, чтобы проверить скорость больших shuffle при SMJ
println("simplQ = " + avgTiming(simplQ))
println("adHocQ = " + avgTiming(List("5803", "5302", "7202").map(adHocQ)))

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.databricks.adaptive.autoBroadcastJoinThreshold", "-1")
println("complexQ = " + avgTiming(complexQ))

Производительность

Databricks

Замер Простой запрос (сек) adHoc запрос (сек) Сложный запрос (сек)
Spark 2.4.3 12.4 46.8 164.2
Spark 3.0.1 37.5 71.8 207.9
Spark 3.1.2 29.4 70.1 247.0

Странно, но Spark 2.4 показывает лучшую производительность по сравнению с 3 при работе с ORC файлами.
Если сравнить планы, то видно разницу, что в Spark 3.0 этам чтения Orc файла не включен в wholestagegen (1), что по непонятной причине приводит к отключению части фильтрации на диске и увеличению объему фильтруемых данных после считывания почти в 3 раза:
Spark 2.4 Spark 3.0
 +- *(1) Project [_col0#2 AS check_id#100, _col11#13 AS bic_client#111, _col17#19 AS material#117, _col19#21 AS rt_promo#119, _col27#29 AS rtsaexcust#127, _col28#30 AS rtsaexcusv#128]
    +- *(1) Filter isnotnull(_col17#19)
       +- *(1) FileScan orc [_col0#2,_col11#13,_col17#19,_col19#21,_col27#29,_col28#30,calday#50] Batched: true, DataFilters: [isnotnull(_col17#19)], Format: ORC, Location: InMemoryFileIndex[dbfs:/mnt/exfs/warehouse/rdw.db/pos_rec_itm], PartitionCount: 61, PartitionFilters: [isnotnull(calday#50), (calday#50 >= 20210301), (calday#50 <= 20210431)], PushedFilters: [IsNotNull(_col17)], ReadSchema: struct
number of files read	244
filesystem read data size total (min, med, max)	6.0 GB (22.1 MB, 31.1 MB, 44.0 MB)
scan time total (min, med, max)	15.9 m (2.9 s, 4.8 s, 10.1 s)
 +- *(1) Project [_col0#2 AS check_id#100, _col11#13 AS bic_client#111, _col17#19 AS material#117, _col19#21 AS rt_promo#119, _col27#29 AS rtsaexcust#127, _col28#30 AS rtsaexcusv#128]
    +- *(1) Filter isnotnull(_col17#19)
       +- *(1) ColumnarToRow
          +- FileScan orc [_col0#2,_col11#13,_col17#19,_col19#21,_col27#29,_col28#30,calday#50] Batched: true, DataFilters: [isnotnull(_col17#19)], Format: ORC, Location: InMemoryFileIndex[dbfs:/mnt/exfs/warehouse/rdw.db/pos_rec_itm], PartitionFilters: [isnotnull(calday#50), (calday#50 >= 20210301), (calday#50 <= 20210431)], PushedFilters: [IsNotNull(_col17)], ReadSchema: struct
number of files read	244
filesystem read data size total (min, med, max)	16.3 GiB (60.3 MiB, 85.0 MiB, 119.2 MiB)
scan time total (min, med, max)	33.9 m (6.6 s, 10.7 s, 15.2 s)
Аналогичное сравнение было проведено на parquet файлах, в них так же выиграл Spark 2.4, но разницы в скорости чтения "pos_rec_itm" уже не было:
Spark 2.4:
simplQ = 8990
adHocQ = 61177
complexQ = 146193

Spark 3.0:
simplQ = 8625
adHocQ = 59052
complexQ = 175663

Кластер, во время работы, был близок к полной загруженности:

HDInsight

При запуске тестов в HDInsight было 2 особенности:
* Запросы не запускались в Spark 3.0 (preview) с ошибкой "org/apache/spark/sql/types/Decimal.class is broken (class java.lang.RuntimeException/error reading Scala signature of Decimal.class)"
* При создании кластера из 6 машин с 16 CPU, в Yarn было доступно 186 ядер, вместо 96.
По этому было проведено несколько тестов с разным количество воркеров.
Запуск с 93 вокрерами (6ГБ, 2 core на вокрер) должен соответствовать конфигурации Databricks.
Замер Простой запрос (сек) adHoc запрос (сек) Сложный запрос (сек)
Spark 3.0 Error Error Error
Spark 2.4.4
num-executors = 186, executor-memory=3000M, executor-cores = 1
12.4 46.8 Error
Spark 2.4.4
num-executors = 93, executor-memory=6000M, executor-cores = 2
10.1 46.9 295.8
Spark 2.4.4
num-executors = 47, executor-memory=12000M, executor-cores = 4
8.0 40.8 212.2
У HDInsight лучше всего показала конфигурация с числом вокреров = число CPU / 2 и числом ядер на вокер x4.

Загруженность кластера также была близка к максимуму.



Вывод

Из-за проблем с Orc у Databricks Spark 3, сравнивать будем Databricks Spark 2.4.3 и HDInsight Spark 2.4.4 (в конфигурации num-executors = 47, executor-memory=12000M, executor-cores = 4).
Суммарное средних времени Databricks Spark 2.4.3 = 12.4 + 46.8 + 164.2 = 223.4 суммарных секунд
HDInsight Spark 2.4.4 = 8.0 + 40.8 + 212.2 = 261 суммарных секунд

HDInsight хорошо держался на простых запросах, но сильно просел на комплексном, что по итогу дает +17% к времени работы в сравнении с Databricks
Но учитывая +40% к стоимости Databricks, цена за единицу работы у HDInsight выходит выгодней.
К текущим серверам HDInsight можно добавить 40% мощности, чтобы получить туже стоимость с Databricks, но оценочно все равно выиграть 23% в производительности.

Даже, если на кластере будут исполняться только сложные запросы, то цена HDInsigh в -40% все равно перекрывает просадку производительности в +30%

Комментариев нет:

Отправить комментарий