пятница, 21 августа 2020 г.

BigData анализ с помощью Spark и Scala

В этой статье я хотел бы охватить основные аспекты работы с фреймворком Spark
Общее описание архитектуры Spark было ранее тут: /search/label/bigdata#spark

RDD

Виды операций
- narrow (слабые) преобразования - создание нового RDD на основе текущего с выполнением некоторых операций
все narrow операции выполняются в 1 stage, т.к. данные не надо никуда перекачивать (обработка на месте)
- wide (сильная) действия - вычисления результата, данные 1 блока RDD переносятся в несколько других блоков (reduce, order, group, join)
wide операция создает новый stage в плане, т.к. требует shuffling - сохранение промежуточных данных и пересылку на другие сервера (партиции)
выполнение преобразований откладывается и оптимизируется до первого действия
(т.е. нагляднее сделать несколько map/filter, чем 1 громоздкий)

- если RDD сохранили в переменную, то при каждом вызове действия над ней все преобразования будут вычисляться заново
.persist / .cache - если нужно сохранить результат преобразований
-- можно задать место хранения, к примеру: (StorageLevel.DISK_ONLY) / MEMORY_ONLY/ SER - с сериализацией объекта
-- если задали MEMORY_ONLY, но после RDD был вытеснен, то при последующем обращении к нему ему нужно выполниться заново
если такая вероятность есть, то лучше сохранять с MEMORY_AND_DISK - тогда при повторном обращении блок будет сохранен и загружен с диска
-- unpersist - выгрузить переменную из памяти

Создание RDD
- из массива:
val lines = sc.parallelize(List("pandas", "i like pandas"))
- из текстового файла:
val lines = sc.textFile("/path/to/README.md")

Сохранение RDD
- .collect - в локальный массив или take(N элементов)/takeSample
- saveAsSequenceFile("path",Some(classOf[GzipCodec]) - в 1 текстовый файл
saveAsTextFile - в несколько файлов

- передача собственных функций
-- функция должна поддерживать сериализацию
def isMatch(s: String): Boolean = {
 s.contains("123")
}
file.filter(isMatch).first()

Основные операции
- input.map(x => х * х) - выполняется над каждым элементом и создает новый RDD с тем же числом преобразованных элементов
- flatMap - то же самое, но разворачивает элементы подмассивов в элементы 1 массива
- поддерживаются операции над множествами: distinct / union / intersection / substract (oracle minus) / cartesian
- sample - выборка случайных элементов
- foreach

Основные действия
def foldleft[B](z: B)(f: (B, A)=> B): B 
- не может быть распараллелена, т.к. последовательно агрегирует данные с типа A на тип B с хранением промежуточного результата в аккумуляторе типа B
- по этому этой операции нет в Spark RDD
def fold(z: A)(f: (A, A)=> A): A 
- может параллелиться, т.к. не делает преобразования типа и промежуточные результаты могут использовать свой собственный аккумулятор
- т.к. не меняет тип коллекции, то сумму можно посчитать только для числового массива

reduce - тоже самое, что fold, но без начального значения z
aggregate[B](z: => B)(seqop: (B, A)=> B, combop: (B, B) => B): B
- может менять тип и параллелиться за счет 2 операций (допустим посчитать число мелких букв - массив букв типа A, результат кол-во типа B)
-- seqop - агрегация в параллельных потоках с индивидуальным аккумулятором типа B
-- combop - агрегация промежуточных аккумуляторов в итог
++ aggregate - предпочительней filter+count, т.к. сжимает комплексные типы на входе внутри таски к простым аккумуляторам на выходе
-- reduce / fold - принимают 2 элемента и возвращает 1 элемент того же типа
-- aggregate - похоже на reduce, но используется, если нужно вернуть значение другого типа

RDD Key-Value

Сигнатура KV RDD:
RDD[(K, V)]
Создание KV RDD
где ключ - первое слово, значение = сама строка:
val pairs = lines.map(x => (x.split(" ")(0), х))
Действия над KV RDD
filter{case (key, value) => value.length < 20} //фильтр по значению
groupByKey(): RDD[(K, Iterable[V])] //группировка по ключу: ключ-массив значений
reduceByKey(func: (V, V) => V): RDD[(K, V)] //группировка по ключу с выполнение функции func над значениями: ключ - 1 итог
join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] //join двух rdd по ключу
mapValues [U] ( f: V => U) : RDD [(K, U)] //выполнение функции f над значениями RDD -- краткая запись для rdd.map { case (x, y): (x, func(y))} 
countByKey(): Map[K, Long] //подсчет кол-ва элементов в группе
keys: RDD[K] //получить ключи
- map - работает значительно медленней mapValues, т.к. сбрасывает partioner и вызывает shuffle (т.к. map может менять как ключи, так и значения)
- partitionBy перед groupBy/Join может убрать shuffle, т.к. данные уже будут разбиты по серверам на части (аналогично groupBy убирает shuffle у последующего join)
- reduceByKey - быстрей groupByKey, т.к. весь массив данных ключа не передается по сети

Пример - вычисление среднего
rdd.mapValues(x => (х, 1)) //RDD -> KV RDD
.reduceByKey((х, у) => (x._l + у._1, х._2 + у._2)) //схлопывание значений в ключе: ключ=>(сумма, число элементов)

- combineByKey - аналог aggregate для KV RDD (позволяет сменить тип выходного значения, оставляя параллельность)
пример вычисления среднего:
val result = input.combineByKey(
(v) => (v, 1), //createCombiner - если ключа не было - создаем (число значения, 1)
(асс: (Int, Int), v) => (асс._1 + v, асс._2 + 1), //mergeValues - если ключ есть в этом блоке, то - (число значние + новое значние, число потовторов + 1)
(accl: (Int, Int), асс2: (Int, Int)) => (accl._1 + асс2._1, accl._2 + асс2._2) //mergeCombiners - объединяем потоки
) 
.map{ case (key, value) => (key, value._1 / value._2.toFloat) //проходимся по полученному массиву: значение / число повторов
result.collectAsMap() .map(println(_)) //collectAsMap - преобразование RDD в локальный ассоциативный массив + выводим на экран

DataFrame и Spark SQL

DataFrame - это Rdd + Схема данных

Отличия
RDD
не оптимизируется Spark, для него это blob объект из байт о содержимом которого он ничего не знает.
Для обработки используются самописанные анонимные функции, которые выглядят черным ящиком:
ds.filter ( p => p.city == "Boston" ) //high order функция не может быть перенесена на уровень хранилища и применена над блоками parquet
RDD[T] - параметризирован классом T и проверяет типы класса при компиляции

DataFrame
добавляет информацию о схеме, что дает возможность оптимизации.
Над данными производится ограниченный набор табличных преобразований, которые могут быть оптимизированы и проброшены на уровень хранилища
(вместо reduce/group/map - sql like функции join, group, sort, join, aggregate )
ds.filter ( $"city" === "Boston" ) //точное значение Boston может быть проброшено на уровень хранища и отбросить блоки данных без их чтения
DataFrame - не параметризируется пользовательским классом. Описание схемы происходит пользователем и проверятся на этапе выполнения.
type DataFrame = Dataset[Row] //именно для ROW задается число колонок и их схема

Преимущетсва DataFrame перед RDD
- Catalyst - оптимизации запроса: перестановки, преобразования, push предикатов, projection (читаем только что нужно) и т.д.
- Tungesten
-- данные в колоночном сжатом формате
-- векторные вычисления
-- формат сериализации значительно проще default java (быстрей передача по сети и сериализация и десериализация:
* java serializer: header info, hashcode, Unicode info, etc. Строка “abcd” занимает 48 байт, вместо 4
* Tungesten serializer хранит только смещения до значений (ofset) и размер данных (fiel length)

-- off-heap хранение данных за пределами heap ( нет влияния GC)
-- специализированные encoder для сжатых колоночных данных (Parquet, orc и т.д.)
-- знание схемы позволяет компактнее и хранить и бытрей извелкать данные

Создание Dataframe из RDD
- преобразование KV RDD в DataFrame требует описания названия колонок , т.к. они хранятся как ._N
val tupleRDD = ... // Assume RDD[(Int, String String, String)]
val tupleDF = tupleRDD.toDF("COL1", ... , "COLN") 
- если в RDD находится case class, то названия берутся из него
case class Person(id: Int, name: String, city: String)
val peopleRDD = ... // Assume RDD[Person]
val peopleDF = peopleRDD.toDF
- создание DF из сырых данных (пример Json) с точным описанием схемы
val schema = StructType(
    Array(
        StructField("name", StringType, nullable = true),
        StructField("age", StringType, nullable = true),
        )
    )
// сначала строки RDD нужно привести к типу ROW
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))
//потом можно навешивать схему
val peopleDF = spark.createDataFrame(rowRDD, schema)
- создание DF из структурированного источника Hive Table или Jdbc не требует описания схемы и типов (все берется из источника)
spark.read.table("rdw.pos_rec_itm")

Функции над DF
- sql like функции join, group, sort, join, aggregate и т.д.
- аналоги Pandas Dataframe:
fill(0) // заполнить null->0
fill(Map("minBalance" -> 0)) // заполнить null->0, только в minBalance
replace(Array("id"),Map(1234 -> 8923)) // поменять 1234 на 8923 в колонке ID
drop() // удалить строки, которые содержат null хотябы в 1 колонке
drop("all") // удалить полностью null строки
drop(Array("id", "name")) // удалить строки у которых в колонках id или name есть NULL

df.printSchema // описание DF

Spark SQL
- создать временную таблицу на основе DataFrame. Таблицу можно будет исползовать в SQL только в этом же контексте
DF.registerTempTable("tempTable") 
- кэширование таблицы через SQL
val c = spark.sql("CACHE TABLE rdw.bi0_pplant") 
- кастомные функции для Spark SQL:
registerFunction("strLenScala", (_: String) .length)
val tweetLength = spark.sql("SELECT strLenScala('tweet') FROM tweets LIMIT 10")
- Пример-сравнение dataframe api и Spark SQL
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val sales = spark.read.table("rdw.pos_rec_itm").where($"CALDAY".between("20200618", "20200619") && $"RT_PAYDIR" =!= "2" && $"/BIC/CLIENT" =!= " ")
val promo = spark.read.table("rdw.bi0_prt_promo").where($"rt_promoth" === "Z010" && $"rt_promo" =!= " ").select("rt_promo").groupBy("rt_promo").max()
val kso = spark.read.table("bw.kso").select("plant", "zposno").groupBy("plant", "zposno").max()

val sjoin =
sales.
join(broadcast(promo), sales("rt_promo") === promo("rt_promo"), "left").
join(broadcast(kso), sales("plant") === kso("plant") && sales("/bic/zposno") === kso("zposno"), "left").
select( 
concat( $"calday", lit("_"), $"time", lit("_"), $"/bic/crecnum", lit("_"), $"doc_num", lit("_"), $"/bic/zposno", lit("_"), sales("plant")).as("check_id") ,
$"/bic/client".cast(IntegerType).as("client_id"),
sales("plant").cast(IntegerType).as("plant"),
$"material".cast(IntegerType).as("material"),
$"time".substr(1,2).cast(IntegerType).as("hh"),
when(sales("RT_PROMO") =!= " ", 1).otherwise(0).as("is_promo"),
when(promo("rt_promo").isNotNull, 1).otherwise(0).as("is_crazy"),
when(kso("plant").isNotNull, 1).otherwise(0).as("is_kso"),
when($"/bic/zinstype" === "08" || $"/bic/zinstype" === "09", 1).otherwise(0).as("is_scan"),
( $"cpsaexcubu" * when($"/bic/zinstype" === "G", 0.002).when($"/bic/zinstype" === "KG", 0.5).otherwise(1) ).as("qnt"),
$"rtsaexcust".as("amount"),
$"/bic/ZRT_PREDR".as("amount_disc"),
count("*").over( Window.partitionBy($"/bic/client") ).as("cnt_lines_by_client"),
$"calday"
)
Может быть описано обычным SQL
select
concat(i.calday, i.time, '_', i.`bic_crecnum`, '_', i.doc_num, '_', i.`bic_zposno`, '_', i.plant )  as check_id ,
case when i.`bic_client` IN( 'null', ' ') or length(i.`bic_client`) = 0 then null else i.`bic_client` end as client_id,
cast(i.plant as int) as plant,
cast(i.material as int) as material,
cast(SUBSTR(i.time,1,2) as int) as hh, 
case when i.`RT_PROMO` IN( 'null', ' ') or length(i.`RT_PROMO`) = 0 then 0 else 1 end as is_promo,
case when p.rt_promo IS NOT NULL then 1 else 0 end as is_crazy,
case when ks.plant IS NOT NULL then 1 else 0 end as is_kso,
case when i.`bic_zinstype` in ('08', '09') then 1 else 0 end as is_scan,
case when i.base_uom = 'G' then 0.002 when i.base_uom = 'KG' then 0.5 else 1 end * i.cpsaexcubu as qnt,
i.rtsaexcust as amount,
i.`bic_ZRT_PREDR` as amount_disc,
i.calday,
COUNT(*) OVER(partition by i.`bic_client`) as cnt_lines_by_client
from rdw.pos_rec_itm i
left join (select rt_promo from rdw.bi0_prt_promo p  WHERE p.rt_promoth = 'Z010' AND p.rt_promo <> ' ' group by rt_promo) p on p.rt_promo = i.rt_promo
left join (select plant, zposno from bw.kso group by plant, zposno) ks on ks.plant = i.plant AND ks.zposno = i.`bic_zposno`
where 
i.`bic_client` <> ' '
AND i.RT_PAYDIR != '2 '
and i.calday BETWEEN '20200616' AND '20200617'

DataSet

параметризированный DF, который объединяет функциональный подход RDD и Sql-like dataframe, но добавляет проверку типов данных на этапе компиляции
listingsDS.groupByKey(l => l.zip) //от RDD
.agg(avg($"price") .as[Double]) //от dataframe но с указанием типа колонки для type safe

Преобразование DF в DS
- неявно
import spark.implicits._ 
myDF.toDS //для DF или RDD
- явно указывая типы данных для схемы
val myDS = spark.read.json("people.json").as[Person] 

DF.map(r => TimeUsageRow(r.getAs("working"), r.getAs("sex"), r.getAs("age"), r.getAs("primaryNeeds"), r.getAs("work"), r.getAs("other")))

Функции над DS
DS имеет набор функции как у RDD, но groupByKey возвращает KeyValueGroupedDataset , который преобразуется обратно в DS с помощью reduceGroups или agg
groupByKey[K](f: T => K): KeyValueGroupedDataset[K, T] 
функции над KeyValueGroupedDataset:
reduceGroups(f: (V, V) => V): Dataset[(K, V)] 
agg[U](col: TypedColumn[V, U]): Dataset[(K, U)] 

mapGroups[U](f: (K, Iterator[V]) => U): Dataset[U] 

Пример:
- kv RDD:
rdd.reduceByKey(_+_)
- DS:
import org.apache.spark.sql.expressions.scalalang.typed
ds.
  groupByKey(r => r.age). //KeyValueGroupedDataset
  agg( 
    typed.sum[TimUsageRow](_.work).as(Encoders.DOUBLE),
  ).
  map(k => TimeUsageRow(k._1, k._2)).
  sort("age")
Замен mapGroups всегда желательней использовать agg
- reduceGroups - параллелится, но нет возможности сменить тип данных в массиве ключа
- agg - поддерживает ограниченное число типов данных и операций агрегации
Если нужна своя, то можно реализовать кастомный агрегатор, который будет параллелиться и не будет приводить к лишним сериализациям/десериализациям на уровне строки (только на уровне партиции):
val strConcat = new Aggregator[(Int, String), String , String]{
    def zero : String= ""
    def reduce(b: String , a : (Int, String) ) : String = b + a._2
    def merge(bl : String , b2 : String) : String = bl + b2
    def finish(r : String) : String = r
    override def bufferEncoder : Encoder[BUF] = Encoders.STRING //кастомные энкодеры для Tungesten
    override def outputEncoder : Encoder[OUT] = Encoders.STRING
}.toColumn
keyValuesDS.groupByKey(pair => pair._1)
.agg(strConcat.as[String] )

Управление распределением данных

- repartition(N) - перераспределение данных в кластере на N новых партиций
- coalesce(N) - операция уменьшает число блоков, склеивая мелкие в более большие на 1 ноде
- rdd.partitions.size() - число partition

Задание способа партицирования
- Хэш партицирование
import org.apache.spark.HashPartitioner
val userData = sc.sequenceFile[UserID, Userinfo] ("hdfs:// ... ")
.partitionBy(new HashPartioner(100)) // распределяем файл в кластере по 100 хэш партициям
.persist()

- range партицирование - разделение RDD на 10 равных последовательных частей
counts.partitionBy(new RangePartitioner(10,counts))
range партицирование происходит при сортировке, чтобы данные были по порядку, а не случайно, как в hash

- свой тип партицирования:
class DomainNamePartitioner(numParts: Int) extends Partitioner {
 override def numPartitions: Int = numParts //число партиций
 override def getPartition(key: Any): Int = { //получить номер партиции по ключу
  val domain = new Java.net.URL(key.toString) .getHost() //в данном случае: хэш код от хоста url
  val code = (domain.hashCode % numPartitions)
  if (code < 0) (
   code + numPartitions // Сделать неотрицательным
  else
   code
 }
 // Jаvа-метод equals для сравнения объектов Partitioner
 //для сравнения 2 RDD партицированных одним способом
 override def equals(other: Any): Boolean = other match {
  case dnp: DomainNamePartitioner =>
   dnp.numPartitions == numPartitions
  case _ =>
   false
 }
}

- map - создает новый RDD, который не наследует принцип партицирования (нужно исопльзовать mapValues)
- pairs.partitioner - определение метода партицирования

Shuffle

Распределение данные неразрывно связано с операцией Shuffle.
Операции join, groupby, repartition и другие требуют объединения одинаковых данных ключа в одном worker.
Для этого на стадии map данные предагрегируются и сохраняются на диск, т.е. фактически операция checkpoint. Это необходимо, чтобы в случае падения программы на следующих шагах не нужно было пересчитывать все с самого начала, а достаточно было взять данные из фс с последней операции shuffle.
На стадии reduce worker по сети обращается к всем предыдущим worker и считывает сохраненные данные с диска.
Эта архитектура имеет ряд недостатков:
* требуется пересчет при полной потере worker
* повышенная нагрузка к локальной фс worker, когда следующие шаги считывают данные
* необходимо держать worker, даже если он больше не нужен
По этому в будущем, скорей всего, для shuffle будут использоваться внешние Shuffle сервисы (подробней обсуждение по ссылке)

Дополнительные возможности

Аккумулярторы
max/filter может использовать в своем коде внешние переменные, но они будут локальными для конкретного воркера
Чтобы использовать переменные видимые в драйвере и воркере, нужно брать accumulator
val blankLines = sc.accumulator(0)
val callSigns = file.flatMap (line =>
 if (line == '"') {
  blankLines += 1 // Увеличить значение в аккумуляторе
 line.split(" ")
})
-- значение акк. можно будет получить только после действия с RDD (flatmap - преобразование не вызываемое сразу)
-- точность значения аккумулятора не гарантируется, он может быть больше, если контейнер рестратрует или если RDD несколько раз сохраняется на диск, а потом обратно кэшируется в память (по этому чаще используется для статистики или отладки)
-- можно передавать небольшие значения: Int, Double, Long, Float
-- собственный акк. можно создать отнаследовавшись от AccumulatorParam

Широковещательные переменные
предназначены для передачи большого объема данных, но только для чтения
Пример использования - левая таблица для HJ или для lookup:
val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map{case (sign, count) =>
 val country = lookupinArray (sign, signPrefixes. value)
 (country, count)
} . reduceByKey ( (х, у) => х + у)
signPrefixes - можно изменить в воркере, но изменение будет видно только там

mapPartitions
- Единичная инициализация ресурсов через mapPartitions для конкретного воркера:
val contactsContactLists = validSigns.distinct() .mapPartitions{
signs =>
    val mapper = createMapper() //единичная инициализация для воркера, а не для каждого элемента
    val client = new HttpClient()
    client.start ()
    // создать http-зaпpoc
    signs.map {sign => createExchangeForSign(sign)} //поэлементная обработка
        .map{ case (sign, exchange) => // извлечь ответы
            (sign, readExchangeCallLog(mapper, exchange))).filter(x => х._2 != null) // Удалить пустые записи
        }
}

Взаимодействие с внешними программами
val distScript = "./src/R/finddistance.R"
val distScriptName = "finddistance.R"
sc.addFile(distScript)
val distances = contactsContactLists.values.flatMap(x =>
    x.map(y => s"$y.contactlay,$y.contactlong,$y.mylat,$y.mylong")) //передать параметры
.pipe(Seq(SparkFiles.get(distScriptName))) //вызывать скрипт через pipe
Все вызванные скрипты сохраняются в SparkFiles.getRootDirectory - важно давать разные имена

Spark Streaming

Подключение к Kafka с помощью Spark Structure Streaming было описано раньше в блоге

- каждые 5-6 сообщений происходит checkpoint - запись данных в hdfs
принудительный вызов:
ssc.checkpoint( "hdfs:// ... ")
Преобразования
- без сохранения состояния (отдельно над каждой новой порцией данных):
-- map/filter/reduce/group/join - применяются над каждым RDD порции данных.
применение будет над окном заданном в StreamingContext
-- transform - применение функции над каждым RDD из окна:
val outlierDStream = accessLogsDStream.transform { rdd =>
 extractOutliers(rdd)
}
- с сохранением состояния (при вычислении текущего окна используются данные предыдущих):
-- Оконные функции - например, нужно посчитать среднее за 30 секунд (размер окна - кратно шагу), при частоте обновления = 10 (шаг перемещения окна)
Кол-во значения в скользящем окне:
val accessLogsWindow = accessLogsDStream.window(Seconds(30), Seconds(10))
val windowCounts = accessLogsWindow.count()
-- Reduce на окне:
val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getipAddress(), 1))
val ipCountDStream = ipDStream.reduceByKeyAndWindow(
( (х, у) => х + у), // Добавить элементы из новых пакетов в окне
( (х, у) => х - у), // Удалить элементы из пакетов, покинувших окно
Seconds(30), // Размер окна
Seconds(10)) // Шаг перемешения окна
также есть: countByValueAndWindow и countByWindow

Настройка и отладка

- input.toDebugString - посмотреть ациклический граф получения RDD
counts.toDebugString
(2) ShuffledRDD [ 296] at reduceByKey at <console>: 17
+-(2) MappedRDD[295] at rnap at <console>:17
 | FilteredRDD [ 294 ] at filter at <console>: 15
 | MappedRDD[293] at rnap at <console>:15
 | input.text MappedRDD[292] at textFile at <console>:13
 | input.text HadoopRDD[291] at textFile at <console>:13
Web интерфейс мониторинга
- Stages - основная информация о работе шага
-- сравнить min с max - определить наличие перекоса (нужен другой repartition или coalesce маленьких партций)
-- сравнить время работы task в временем GC - возможно нехватает памяти
-- также если большой spill на диск - надо увеличить память или выдать больше партиций
- storage в web мониторе показывает список закешированных RDD
на каких хостах он хранится и какое рапредление озу/диск
- executors - показывает всех исполнителей и сколько ресурсов они себе забрали
тут же можно посмотреть Thread Dump - стэктрейс работы программы
на основе этого можно делать сэмплирование, чтобы определить наиболее медленные части или определить проблемный стек при зависании
- в environment - Хранятся общие настройки
- также лог выполнения можно посмотреть через yarn
для уже завершенного: yarn logs -applicationid <арр ID>

- просмотр плана выполнения Catalyst
spark.sql("select plant, SUM(rtsaexcust) rtsaexcust from tst where plant = '002' group by plant order by plant").explain(true)

== Physical Plan == 
Sort [plant#160 ASC NULLS FIRST], true, 0 
+- Exchange rangepartitioning(plant#160 ASC NULLS FIRST, 4), [id=#110] 
 +- *(1) HashAggregate(keys=[plant#160], functions=[sum(rtsaexcust#185)]) 
  +- *(1) Project [plant#160, rtsaexcust#185] 
   +- *(1) Filter (isnotnull(plant#160) && (plant#160 = 002)) 
    +- *(1) FileScan parquet tst[plant#160,rtsaexcust#185,calday#206] Batched: true, DataFilters: [isnotnull(plant#160), (plant#160 = 002)], Format: Parquet, 
Location: CatalogFileIndex[dbfs:/mnt/exfs/warehouse/tst], 
PartitionCount: 3, PartitionFilters: [], 
PushedFilters: [IsNotNull(plant), EqualTo(plant,002)], ReadSchema: struct<plant:string,rtsaexcust:decimal(17,2)>, 
SelectedBucketsCount: 1 out of 4
план представляет из себя дерево: литералов (константы), атрибутов (переменные), поддеревьев содержащих рекурсивную функцию
- происходит несколько трансформаций, для снижения стоимости плана: схлопывание литералов, filter pushdown, projection pushdown (column pruning - исключение колонок)
На этом этапе можно определить, что вероятно план был сгенерирован неверно.

- Для мониторинга java приложений есть отдельная статья в блоге.

Оптимизация

Число воркеров при чтении с диска
Число воркеров зависит от числа блоков родительского RDD
т.е. все еще начинается с файлов в hdfs, если файл маленький, то большой параллельности по умолчанию не будет
Для решения этой проблемы есть 2 подхода:
- при записи сделать repartition на большее число партиций
- увеличить число файлов с помощью настройки
dw.write.option("maxRecordsPerFile", N")
При чтении задать maxPartitionBytes, который разобьет блоки на партиции нужного размера
spark.sql.files.maxPartitionBytes = bytes
Уточненная формула на основании исходного кода Spark
val sumBytes = files.map(_.getLen + spark.sql.files.openCostInBytes).sum
val bytesPerCore = sumBytes / (spark.sql.files.minPartitionNum или spark.sql.leafNodeDefaultParallelism)
sumBytes / min(spark.sql.files.maxPartitionBytes, max(spark.sql.files.openCostInBytes, bytesPerCore))
т.е. к размеру файла (sumBytes) добавляется минимальная стоимость его открытия (spark.sql.files.openCostInBytes = 4МБ)
число партиций не может быть меньше spark.sql.files.minPartitionNum или spark.sql.leafNodeDefaultParallelism
- после фильтрации RDD число воркеров наследуется от родителя, несмотря на уменьшение объема
из-за этого есть смысл делать coalesce - чтобы уменьшить число пустых блоков

Число воркеров на фазе shuffle
- Чаще всего рекомендуется, чтобы размер данных не превышал 200МБ
Пример: размер входных данных в shuffle = 210ГБ
Число shuffle partition = 210000MB / 200MB = 1050
Но если достпно большее число ресурсов, то ставим доступный максимум
spark.conf.set("spark.sql.shuffle.partitions", 1050)

Признаки недостаточности числа worker или ОЗУ для partition
- Shuffle spill (disk) - сохранение сериализованных данных на диск
- Большая доля GC time

Проблема перекоса данных
Можно определить по Min / Max времени работы тасков в рамках одного job (будут значительно отличаться) - В databricks можно решить автоматически хинтом:
df.join(
    skewDF.hint("SKEW", "skewKey", List("0", "9999") ), Seq("keyCol"), "inner")
)
- дополнительный генеренный столбец, который добавляется в соединение/группировку и разбрасывает данные одинаково равномерно оба DF.
- так же можно сделать coalesce, чтобы объединить мелкие партиции в более крупные

Почему может быть OOM, если в Spark есть механизм Spill?
Проверка необходимости Spill происходит с помощью сэмплирования данных.
Если в данных есть перекос, то метод сэмплирования может дать ошибку и привести к OOM из-за неверной оценки необходимой памяти под процесс и Spark не успеет выполнить Spill.

Типы join
sort-merge join
алгоритм поумолчанию
На стадии чтения происходит сортировка и группировка данных, стадия shuffle объединяет данные в партиции с общим ключем, тут же делается слияние сортированных данных за линейное время.
Запрос может быть выполнен без стадии shuffle 2 способами (map-side join):
- Одинаково партицировать оба DF
даст преимуществно только при повторных выполнениях, т.к. не сохраняется физически на диск.
- Одинаково кластеризовать обе таблицы источники для DF
SET spark.sql.shuffle.partitions = 4;
create table tst_clust
using parquet
PARTITIONED BY (calday)
CLUSTERED BY(plant) SORTED BY (plant) INTO 4 BUCKETS
as 
select * from tst;
Особенности:
-- каждый воркер пишет свой набор файлов бакетов, по этому общее число файлов = число воркеров * бакетов (но, т.к. нет объеднения файлов, то нет шафла)
-- как следствие при выполнении join нужна дополнительная сортировка, которая сольет файлы в 1 бакет (но без шафла!)
-- не поддерживается Join с union all, даже если одинаковое число бакетов
-- не совместимо с hive (другой алгорит хэширования: murmur3 продив hivehash, в hive бакеты сливаются в 1 при вставке - требует шафла)

Broadcast join
Второй по популярности тип соединения.
Левая таблица должна быть меньше 10МБ
spark.sql.autoBroadcatsJoinThreshold = def 10MB -- создается на драйвере и рассылается во все executor
За счет Broadcats join выполнится без стадии shuffle
- Возможная будущая оптимизация:
broadcast hash join executor side, чуть эффективней, чем работать через драйвер:
-- через драйвер: K/N данных отправить с N exec, K данных отправить на N exec == K/N * N + K*N = K * N + K
-- обменяться данными между exec: K/N Данных отправить на N-1 exec и так на N exec == K/N *(N - 1) * N = K * N - K
т.е. на 2 полные пересылки эффективней и данные пересылаются между executor, а не все через diver (может стать бутылочным горлышком)

shuffle hash join
для запросов только через равно
одна таблица меньше другой: минимум в 3 раза
достаточно памяти для шафлинга партиции хэш массива
практически не используется из-за более общего алгоритма sort-merge join без ограничение HJ
Принудительно можно активировать хинтом и 2 настройками:
set spark.sql.autoBroadcastJoinThreshold = -1;
set spark.sql.join.preferSortMergeJoin = false;
select /*+ SHUFFLE_HASH(m, i) */ m.rpa_wgh3, SUM(rtsaexcust) rtsaexcust
from rdw.pos_rec_itm i
join rdw.bi0_pmaterial m on m.material = i.material
where i.calday between '20201005' and '20201010'
group by m.rpa_wgh3
;

*(4) HashAggregate(keys=[rpa_wgh3#856], functions=[finalmerge_sum(merge sum#947) AS sum(rtsaexcust#30)#943], output=[rpa_wgh3#856, rtsaexcust#838])
+- Exchange hashpartitioning(rpa_wgh3#856, 200), true, [id=#1590]
   +- *(3) HashAggregate(keys=[rpa_wgh3#856], functions=[partial_sum(rtsaexcust#30) AS sum#947], output=[rpa_wgh3#856, sum#947])
      +- *(3) Project [rtsaexcust#30, rpa_wgh3#856]
         +- ShuffledHashJoin [material#20], [material#839], Inner, BuildRight, false
            :- Exchange hashpartitioning(material#20, 200), true, [id=#1578]
            :  +- *(1) Project [material#20, rtsaexcust#30]
            :     +- *(1) Filter isnotnull(material#20)
            :        +- *(1) ColumnarToRow
            :           +- FileScan orc rdw.pos_rec_itm[material#20,rtsaexcust#30,calday#51] Batched: true, DataFilters: [isnotnull(material#20)], Format: ORC, Location: InMemoryFileIndex[dbfs:/mnt/exfs/warehouse/rdw.db/pos_rec_itm/calday=20201005, dbfs:/mnt/exfs/war..., PartitionFilters: [isnotnull(calday#51), (calday#51 >= 20201005), (calday#51 <= 20201010)], PushedFilters: [IsNotNull(material)], ReadSchema: struct<material:string,rtsaexcust:decimal(17,2)>
            +- Exchange hashpartitioning(material#839, 200), true, [id=#1584]
               +- *(2) Project [material#839, rpa_wgh3#856]
                  +- *(2) Filter isnotnull(material#839)
                     +- *(2) ColumnarToRow
                        +- FileScan orc rdw.bi0_pmaterial[material#839,rpa_wgh3#856] Batched: true, DataFilters: [isnotnull(material#839)], Format: ORC, Location: InMemoryFileIndex[dbfs:/mnt/exfs/warehouse/rdw.db/bi0_pmaterial], PartitionFilters: [], PushedFilters: [IsNotNull(material)], ReadSchema: struct<material:string,rpa_wgh3:string>
На моих замерах такой запрос выполнился на 10% медленней, чем sort merge join. Что странно, т.к. при достаточности памяти HJ должен быть дешевле SMJ из-за отсутствия сортировки.

broadcast NL
почти не используется, но наиболее универаслен, т.к. принимает любое условие соединения
для соединений, где для усвловий отличных от = (обычно NOT EXISTS с условием <>)

Range join
Соединение таблицы по диапазону between:
SELECT * FROM r1 JOIN r2 ON r1.start < r2.end AND r2.start < r1.end
поддерживается только databricks через bin join
интервальный справочник разбивает на подинтервалы (bin) , факт шафлится по bin и выполняется за счет этого в несколько потоков на части данных (bin)
шаг bin задается хинтом: /*+ RANGE_JOIN(points, 10) */

Оптимизация функций
- map не сохраняют партицирование (preservePartition = false), т.к. может создавать/изменять как значения, так и ключи
если точно знаем, что ключи не меняются, то можно указать preservePartition = true или использовать замен mapValues
- если нужно использовать большой объект в функции, то лучше не делать замыкание - объявлять глобальную переменную в драйвере и использовать ее во внутренней функции.
Т.к. spark в этом случае сереализует эту переменную и разошлет всем executor.
Лучше создать broadcast переменную, которая разошлется только по нодам.
- Если нужна построчная обработка, то нужно использовать mapPartitions, чтобы уменьшить число смен контекста между партциями
rdd.mapPartitions(rows => rows.foreach(r => {some logic on row} )
- если 100% точность не нужна, можно использовать апроксимационные функции:
--функция                               результат время(с)
count(distinct check_id)                --141490 24.599
approx_count_distinct(check_id,0.01)    --139874 19.795
approx_count_distinct(check_id)         --136133 16.956
approx_count_distinct(check_id,0.001)   --141490 17.786
- если в коде есть большие запросы (pivot в ML), то лучше увеличить perm size для JVM
- для heap меньше 32 ГБ следует использовать 4 байтовые указатели - UseCompressedOOps
Увеличение heap до 32ГБ активирует ипользование 64битных указателей, что увеличивает потребление памяти на несколько ГБ.
Так что выделть нужно сразу больше на несколько ГБ или пытаться уложиться в 32.
- дополнительные опции при создании таблицы через JDBC
.option("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Spark SQL test table'")
- быстрая генерация сурогатных ключей
* monotonicali_increasing_id - поддерживается sql/df, но получаются очень большие числа (занимают много места)
* md5 - 128 бит - по всем полям таблицы - могут быть коллизии
* row_number - выполняется на дарйвере (не параллелится) , если сделать parittion by, то будет параллелится, но будет большой шафл
* zipwithindex - подходит только для rdd (нужно преобразовывать df в rdd, выполнять и обратно пересоздать df) - наиболее быстрый вариант

Уменьшение Shuffle
- Один из способов уменьшить Shuffle, это выполнить агрегацию данные как можно раньше на стадии map.
К примеру запрос поумолчанию:
select  m.rpa_wgh3, SUM(rtsaexcust) rtsaexcust
from rdw.pos_rec_itm i
join rdw.bi0_pmaterial m on m.material = i.material
where i.calday between '20201005' and '20201010'
group by m.rpa_wgh3
Выполняется 1,8 минут и вызывает 2,7ГБ shuffle по сети (Exchange data size total (min, med, max): 2.7 GB (89.2 MB, 105.5 MB, 164.9 MB)

Но если выполнить агрегации фактовой таблицы как можно раньше, то объем данных для shuffle получится сократить еще на этапе локального map
select  m.rpa_wgh3, SUM(rtsaexcust) rtsaexcust
from (select material, SUM(rtsaexcust) as rtsaexcust from rdw.pos_rec_itm i2 where i2.calday between '20201005' and '20201010' group by material) i
join rdw.bi0_pmaterial m on m.material = i.material
group by m.rpa_wgh3
Подзапрос с группировокой "group by material" агрегируют данные там где они находятся и отправит в shuffle значительно меньший объем = 87.4 MB (Exchange data size total (min, med, max): 87.4 MB (3.4 MB, 3.6 MB, 3.9 MB)
Время запроса сокращается более чем в 3 раза до 34 секунд.
Данный прием имеет мало пользы в обычных (не кластерных) бд, т.к. там нет проблем с передачей данных по сети между параллельными процессами.

- count distinct 1 из операций, которая порождает большой shuffle из-за передачи хэш массива уникальных значений из map в reduce
Если count distinct считается на уникальной колонке, то сжатия данных при отправке в reduce вообще не произойдет.
По этому для расчета уникальных значений иногда лучше сделать все в рамках 1 ноды, но с большим объемом озу, чем на нескольких небольших машинах:
Запуск на 1 машине с 7 ядрами и 28ГБ - 27 секунд выполнения:
$ spark-shell --master yarn --driver-memory 8G --executor-memory 28G --num-executors 1 --executor-cores 7
val df = spark.sql("select check_id, client_id, plant, material, calday from rdw.pos_rec_itm where calday between '20200901' and '20201101'").persist
df.count()
df.agg(countDistinct($"client_id")).show() //27 sec
Запуск на 7 машинах с выделенным 1 ядром и 4ГБ - 45 секунд (почти в 2 раза дольше за счет низкой локальности выполнения count distinct - большого shuffle):
spark-shell --master yarn --driver-memory 8G --executor-memory 4G --num-executors 7
val df = spark.sql("select check_id, client_id, plant, material, calday from rdw.pos_rec_itm where calday between '20200901' and '20201101'").persist
df.count()
df.agg(countDistinct($"client_id")).show() //45s
План выполнения показыает, что через сеть было отправлено 1532.3 MB
map:
aggregate time total (min, med, max): 2.3 m (290 ms, 512 ms, 1.6 s)
peak memory total (min, med, max): 17.4 GB (72.0 MB, 72.0 MB, 72.0 MB)
number of output rows: 50,210,554
shuffle:
Exchange data size total (min, med, max): 1532.3 MB (4.9 MB, 6.1 MB, 8.0 MB)

- Также одним из best practics является выделение большего числа CPU на 1 worker - это должно уменьшить число Shuffle операций и overhead на подъем JVM в каждом worker

Уменьшение размера данных
- Выбирайте или подготавливайте наименьший необходимый тип данных под данные, если планируете их несколько раз переиспользовать в запросах.
Лучше 1 раз пронумеровать данные последовательностью Int строковые ключи, чем потом выполнять group/distinct на длинных строках исходных данных
- Если позволяет точность, то используйте встроенные типы данных с плавающей запятой double и float, вместо decimal.
Т.к. decimal программный тип данных и не может быть выполнен математическим сопроцессором CPU
Сравним скорость работы одни их тех же данных в decimal:
select sum(amount), AVG(amount/qnt) as price from tmp.pos_dec;
18316088570.13  78.15631680131521235406
Time taken: 112.448 seconds
и в double
select cast(sum(amount) as decimal(18,2)), AVG(amount/qnt) as price from tmp.pos_dbl;

18316088570.1   78.15631680091742
Time taken: 14.531 seconds, Fetched: 1 row(s)
Скорость несколько раз в лучше, если нет никаких дополнительных действий.
Но стоит помнить о неточностях при преобразовании чисел в двоичной системе счисления (double, float) в десятичную, т.к. плавающая часть данных после запятой приводится к десятичной посредством дробей:
0 или 1 / 2 ^ позиция в знаменателе
что может дать недопустимую погрешность при необходимости точных расчетов

Оптимизации в Spark 3
адптивные запросы
spark.sql.adaptive.enabled = true
- автоматическое опредление кол-ва shuffle partition
- автоматический coalesce мелких shuffle part в большие
- смена типа соединения sort merge - broadcast
Другие оптимизации:
- dynmic partition pruning - broadcast таблицы справочника и динамичекое определние нужных партиций фактовой таблицы
- определение перкоса - coalesce для мелких перекошенных партиций в большие или наоборот разбитие больших на мелкие
- push предикатов на уровень хранилища для parquet
- Catalyst - разбивает длинные методы на несколько мелких (set SplitAggregateFunc.enabled = true)

Типы планировщиков Spark
- Fair Scheduller
можно задать лимит числа контейнеров для параллельных потоков внутри 1 submit
- spark.dynamicAllocation.enabled - динамическое определение ресурсов для onprem Spark в зависимости от числа партиций

1 комментарий:

  1. Отличная статья! Много ценной информации. Спасибо, Алексей

    ОтветитьУдалить