пятница, 21 августа 2020 г.

BigData анализ с помощью Spark и Scala

В этой статье я хотел бы охватить основные аспекты работы с фреймворком Spark
Общее описание архитектуры Spark было ранее тут: /search/label/bigdata#spark

RDD

Виды операций
- narrow (слабые) преобразования - создание нового RDD на основе текущего с выполнением некоторых операций
все narrow операции выполняются в 1 stage, т.к. данные не надо никуда перекачивать (обработка на месте)
- wide (сильная) действия - вычисления результата, данные 1 блока RDD переносятся в несколько других блоков (reduce, order, group, join)
wide операция создает новый stage в плане, т.к. требует shuffling - сохранение промежуточных данных и пересылку на другие сервера (партиции)
выполнение преобразований откладывается и оптимизируется до первого действия
(т.е. нагляднее сделать несколько map/filter, чем 1 громоздкий)

- если RDD сохранили в переменную, то при каждом вызове действия над ней все преобразования будут вычисляться заново
.persist / .cache - если нужно сохранить результат преобразований
-- можно задать место хранения, к примеру: (StorageLevel.DISK_ONLY) / MEMORY_ONLY/ SER - с сериализацией объекта
-- если задали MEMORY_ONLY, но после RDD был вытеснен, то при последующем обращении к нему ему нужно выполниться заново
если такая вероятность есть, то лучше сохранять с MEMORY_AND_DISK - тогда при повторном обращении блок будет сохранен и загружен с диска
-- unpersist - выгрузить переменную из памяти

Создание RDD
- из массива:
val lines = sc.parallelize(List("pandas", "i like pandas"))
- из текстового файла:
val lines = sc.textFile("/path/to/README.md")

Сохранение RDD
- .collect - в локальный массив или take(N элементов)/takeSample
- saveAsSequenceFile("path",Some(classOf[GzipCodec]) - в 1 текстовый файл
saveAsTextFile - в несколько файлов

- передача собственных функций
-- функция должна поддерживать сериализацию
def isMatch(s: String): Boolean = {
 s.contains("123")
}
file.filter(isMatch).first()

Основные операции
- input.map(x => х * х) - выполняется над каждым элементом и создает новый RDD с тем же числом преобразованных элементов
- flatMap - то же самое, но разворачивает элементы подмассивов в элементы 1 массива
- поддерживаются операции над множествами: distinct / union / intersection / substract (oracle minus) / cartesian
- sample - выборка случайных элементов
- foreach

Основные действия
def foldleft[B](z: B)(f: (B, A)=> B): B 
- не может быть распараллелена, т.к. последовательно агрегирует данные с типа A на тип B с хранением промежуточного результата в аккумуляторе типа B
- по этому этой операции нет в Spark RDD
def fold(z: A)(f: (A, A)=> A): A 
- может параллелиться, т.к. не делает преобразования типа и промежуточные результаты могут использовать свой собственный аккумулятор
- т.к. не меняет тип коллекции, то сумму можно посчитать только для числового массива

reduce - тоже самое, что fold, но без начального значения z
aggregate[B](z: => B)(seqop: (B, A)=> B, combop: (B, B) => B): B
- может менять тип и параллелиться за счет 2 операций (допустим посчитать число мелких букв - массив букв типа A, результат кол-во типа B)
-- seqop - агрегация в параллельных потоках с индивидуальным аккумулятором типа B
-- combop - агрегация промежуточных аккумуляторов в итог
++ aggregate - предпочительней filter+count, т.к. сжимает комплексные типы на входе внутри таски к простым аккумуляторам на выходе
-- reduce / fold - принимают 2 элемента и возвращает 1 элемент того же типа
-- aggregate - похоже на reduce, но используется, если нужно вернуть значение другого типа

RDD Key-Value

Сигнатура KV RDD:
RDD[(K, V)]
Создание KV RDD
где ключ - первое слово, значение = сама строка:
val pairs = lines.map(x => (x.split(" ")(0), х))
Действия над KV RDD
filter{case (key, value) => value.length < 20} //фильтр по значению
groupByKey(): RDD[(K, Iterable[V])] //группировка по ключу: ключ-массив значений
reduceByKey(func: (V, V) => V): RDD[(K, V)] //группировка по ключу с выполнение функции func над значениями: ключ - 1 итог
join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] //join двух rdd по ключу
mapValues [U] ( f: V => U) : RDD [(K, U)] //выполнение функции f над значениями RDD -- краткая запись для rdd.map { case (x, y): (x, func(y))} 
countByKey(): Map[K, Long] //подсчет кол-ва элементов в группе
keys: RDD[K] //получить ключи
- map - работает значительно медленней mapValues, т.к. сбрасывает partioner и вызывает shuffle (т.к. map может менять как ключи, так и значения)
- partitionBy перед groupBy/Join может убрать shuffle, т.к. данные уже будут разбиты по серверам на части (аналогично groupBy убирает shuffle у последующего join)
- reduceByKey - быстрей groupByKey, т.к. весь массив данных ключа не передается по сети

Пример - вычисление среднего
rdd.mapValues(x => (х, 1)) //RDD -> KV RDD
.reduceByKey((х, у) => (x._l + у._1, х._2 + у._2)) //схлопывание значений в ключе: ключ=>(сумма, число элементов)

- combineByKey - аналог aggregate для KV RDD (позволяет сменить тип выходного значения, оставляя параллельность)
пример вычисления среднего:
val result = input.combineByKey(
(v) => (v, 1), //createCombiner - если ключа не было - создаем (число значения, 1)
(асс: (Int, Int), v) => (асс._1 + v, асс._2 + 1), //mergeValues - если ключ есть в этом блоке, то - (число значние + новое значние, число потовторов + 1)
(accl: (Int, Int), асс2: (Int, Int)) => (accl._1 + асс2._1, accl._2 + асс2._2) //mergeCombiners - объединяем потоки
) 
.map{ case (key, value) => (key, value._1 / value._2.toFloat) //проходимся по полученному массиву: значение / число повторов
result.collectAsMap() .map(println(_)) //collectAsMap - преобразование RDD в локальный ассоциативный массив + выводим на экран

DataFrame и Spark SQL

DataFrame - это Rdd + Схема данных

Отличия
RDD
не оптимизируется Spark, для него это blob объект из байт о содержимом которого он ничего не знает.
Для обработки используются самописанные анонимные функции, которые выглядят черным ящиком:
ds.filter ( p => p.city == "Boston" ) //high order функция не может быть перенесена на уровень хранилища и применена над блоками parquet
RDD[T] - параметризирован классом T и проверяет типы класса при компиляции

DataFrame
добавляет информацию о схеме, что дает возможность оптимизации.
Над данными производится ограниченный набор табличных преобразований, которые могут быть оптимизированы и проброшены на уровень хранилища
(вместо reduce/group/map - sql like функции join, group, sort, join, aggregate )
ds.filter ( $"city" === "Boston" ) //точное значение Boston может быть проброшено на уровень хранища и отбросить блоки данных без их чтения
DataFrame - не параметризируется пользовательским классом. Описание схемы происходит пользователем и проверятся на этапе выполнения.
type DataFrame = Dataset[Row] //именно для ROW задается число колонок и их схема

Преимущетсва DataFrame перед RDD
- Catalyst - оптимизации запроса: перестановки, преобразования, push предикатов, projection (читаем только что нужно) и т.д.
- Tungesten
-- данные в колоночном сжатом формате
-- векторные вычисления
-- формат сериализации значительно проще default java (быстрей передача по сети и сериализация и десериализация:
* java serializer: header info, hashcode, Unicode info, etc. Строка “abcd” занимает 48 байт, вместо 4
* Tungesten serializer хранит только смещения до значений (ofset) и размер данных (fiel length)

-- off-heap хранение данных за пределами heap ( нет влияния GC)
-- специализированные encoder для сжатых колоночных данных (Parquet, orc и т.д.)
-- знание схемы позволяет компактнее и хранить и бытрей извелкать данные

Создание Dataframe из RDD
- преобразование KV RDD в DataFrame требует описания названия колонок , т.к. они хранятся как ._N
val tupleRDD = ... // Assume RDD[(Int, String String, String)]
val tupleDF = tupleRDD.toDF("COL1", ... , "COLN") 
- если в RDD находится case class, то названия берутся из него
case class Person(id: Int, name: String, city: String)
val peopleRDD = ... // Assume RDD[Person]
val peopleDF = peopleRDD.toDF
- создание DF из сырых данных (пример Json) с точным описанием схемы
val schema = StructType(
    Array(
        StructField("name", StringType, nullable = true),
        StructField("age", StringType, nullable = true),
        )
    )
// сначала строки RDD нужно привести к типу ROW
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))
//потом можно навешивать схему
val peopleDF = spark.createDataFrame(rowRDD, schema)
- создание DF из структурированного источника Hive Table или Jdbc не требует описания схемы и типов (все берется из источника)
spark.read.table("rdw.pos_rec_itm")

Функции над DF
- sql like функции join, group, sort, join, aggregate и т.д.
- аналоги Pandas Dataframe:
fill(0) // заполнить null->0
fill(Map("minBalance" -> 0)) // заполнить null->0, только в minBalance
replace(Array("id"),Map(1234 -> 8923)) // поменять 1234 на 8923 в колонке ID
drop() // удалить строки, которые содержат null хотябы в 1 колонке
drop("all") // удалить полностью null строки
drop(Array("id", "name")) // удалить строки у которых в колонках id или name есть NULL

df.printSchema // описание DF

Spark SQL
- создать временную таблицу на основе DataFrame. Таблицу можно будет исползовать в SQL только в этом же контексте
DF.registerTempTable("tempTable") 
- кэширование таблицы через SQL
val c = spark.sql("CACHE TABLE rdw.bi0_pplant") 
- кастомные функции для Spark SQL:
registerFunction("strLenScala", (_: String) .length)
val tweetLength = spark.sql("SELECT strLenScala('tweet') FROM tweets LIMIT 10")
- Пример-сравнение dataframe api и Spark SQL
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val sales = spark.read.table("rdw.pos_rec_itm").where($"CALDAY".between("20200618", "20200619") && $"RT_PAYDIR" =!= "2" && $"/BIC/CLIENT" =!= " ")
val promo = spark.read.table("rdw.bi0_prt_promo").where($"rt_promoth" === "Z010" && $"rt_promo" =!= " ").select("rt_promo").groupBy("rt_promo").max()
val kso = spark.read.table("bw.kso").select("plant", "zposno").groupBy("plant", "zposno").max()

val sjoin =
sales.
join(broadcast(promo), sales("rt_promo") === promo("rt_promo"), "left").
join(broadcast(kso), sales("plant") === kso("plant") && sales("/bic/zposno") === kso("zposno"), "left").
select( 
concat( $"calday", lit("_"), $"time", lit("_"), $"/bic/crecnum", lit("_"), $"doc_num", lit("_"), $"/bic/zposno", lit("_"), sales("plant")).as("check_id") ,
$"/bic/client".cast(IntegerType).as("client_id"),
sales("plant").cast(IntegerType).as("plant"),
$"material".cast(IntegerType).as("material"),
$"time".substr(1,2).cast(IntegerType).as("hh"),
when(sales("RT_PROMO") =!= " ", 1).otherwise(0).as("is_promo"),
when(promo("rt_promo").isNotNull, 1).otherwise(0).as("is_crazy"),
when(kso("plant").isNotNull, 1).otherwise(0).as("is_kso"),
when($"/bic/zinstype" === "08" || $"/bic/zinstype" === "09", 1).otherwise(0).as("is_scan"),
( $"cpsaexcubu" * when($"/bic/zinstype" === "G", 0.002).when($"/bic/zinstype" === "KG", 0.5).otherwise(1) ).as("qnt"),
$"rtsaexcust".as("amount"),
$"/bic/ZRT_PREDR".as("amount_disc"),
count("*").over( Window.partitionBy($"/bic/client") ).as("cnt_lines_by_client"),
$"calday"
)
Может быть описано обычным SQL
select
concat(i.calday, i.time, '_', i.`bic_crecnum`, '_', i.doc_num, '_', i.`bic_zposno`, '_', i.plant )  as check_id ,
case when i.`bic_client` IN( 'null', ' ') or length(i.`bic_client`) = 0 then null else i.`bic_client` end as client_id,
cast(i.plant as int) as plant,
cast(i.material as int) as material,
cast(SUBSTR(i.time,1,2) as int) as hh, 
case when i.`RT_PROMO` IN( 'null', ' ') or length(i.`RT_PROMO`) = 0 then 0 else 1 end as is_promo,
case when p.rt_promo IS NOT NULL then 1 else 0 end as is_crazy,
case when ks.plant IS NOT NULL then 1 else 0 end as is_kso,
case when i.`bic_zinstype` in ('08', '09') then 1 else 0 end as is_scan,
case when i.base_uom = 'G' then 0.002 when i.base_uom = 'KG' then 0.5 else 1 end * i.cpsaexcubu as qnt,
i.rtsaexcust as amount,
i.`bic_ZRT_PREDR` as amount_disc,
i.calday,
COUNT(*) OVER(partition by i.`bic_client`) as cnt_lines_by_client
from rdw.pos_rec_itm i
left join (select rt_promo from rdw.bi0_prt_promo p  WHERE p.rt_promoth = 'Z010' AND p.rt_promo <> ' ' group by rt_promo) p on p.rt_promo = i.rt_promo
left join (select plant, zposno from bw.kso group by plant, zposno) ks on ks.plant = i.plant AND ks.zposno = i.`bic_zposno`
where 
i.`bic_client` <> ' '
AND i.RT_PAYDIR != '2 '
and i.calday BETWEEN '20200616' AND '20200617'

DataSet

параметризированный DF, который объединяет функциональный подход RDD и Sql-like dataframe, но добавляет проверку типов данных на этапе компиляции
listingsDS.groupByKey(l => l.zip) //от RDD
.agg(avg($"price") .as[Double]) //от dataframe но с указанием типа колонки для type safe

Преобразование DF в DS
- неявно
import spark.implicits._ 
myDF.toDS //для DF или RDD
- явно указывая типы данных для схемы
val myDS = spark.read.json("people.json").as[Person] 

DF.map(r => TimeUsageRow(r.getAs("working"), r.getAs("sex"), r.getAs("age"), r.getAs("primaryNeeds"), r.getAs("work"), r.getAs("other")))

Функции над DS
DS имеет набор функции как у RDD, но groupByKey возвращает KeyValueGroupedDataset , который преобразуется обратно в DS с помощью reduceGroups или agg
groupByKey[K](f: T => K): KeyValueGroupedDataset[K, T] 
функции над KeyValueGroupedDataset:
reduceGroups(f: (V, V) => V): Dataset[(K, V)] 
agg[U](col: TypedColumn[V, U]): Dataset[(K, U)] 

mapGroups[U](f: (K, Iterator[V]) => U): Dataset[U] 

Пример:
- kv RDD:
rdd.reduceByKey(_+_)
- DS:
import org.apache.spark.sql.expressions.scalalang.typed
ds.
  groupByKey(r => r.age). //KeyValueGroupedDataset
  agg( 
    typed.sum[TimUsageRow](_.work).as(Encoders.DOUBLE),
  ).
  map(k => TimeUsageRow(k._1, k._2)).
  sort("age")
Замен mapGroups всегда желательней использовать agg
- reduceGroups - параллелится, но нет возможности сменить тип данных в массиве ключа
- agg - поддерживает ограниченное число типов данных и операций агрегации
Если нужна своя, то можно реализовать кастомный агрегатор, который будет параллелиться и не будет приводить к лишним сериализациям/десериализациям на уровне строки (только на уровне партиции):
val strConcat = new Aggregator[(Int, String), String , String]{
    def zero : String= ""
    def reduce(b: String , a : (Int, String) ) : String = b + a._2
    def merge(bl : String , b2 : String) : String = bl + b2
    def finish(r : String) : String = r
    override def bufferEncoder : Encoder[BUF] = Encoders.STRING //кастомные энкодеры для Tungesten
    override def outputEncoder : Encoder[OUT] = Encoders.STRING
}.toColumn
keyValuesDS.groupByKey(pair => pair._1)
.agg(strConcat.as[String] )

Управление распределением данных

- repartition(N) - перераспределение данных в кластере на N новых партиций
- coalesce(N) - операция уменьшает число блоков, склеивая мелкие в более большие на 1 ноде
- rdd.partitions.size() - число partition

Задание способа партицирования
- Хэш партицирование
import org.apache.spark.HashPartitioner
val userData = sc.sequenceFile[UserID, Userinfo] ("hdfs:// ... ")
.partitionBy(new HashPartioner(100)) // распределяем файл в кластере по 100 хэш партициям
.persist()

- range партицирование - разделение RDD на 10 равных последовательных частей
counts.partitionBy(new RangePartitioner(10,counts))
range партицирование происходит при сортировке, чтобы данные были по порядку, а не случайно, как в hash

- свой тип партицирования:
class DomainNamePartitioner(numParts: Int) extends Partitioner {
 override def numPartitions: Int = numParts //число партиций
 override def getPartition(key: Any): Int = { //получить номер партиции по ключу
  val domain = new Java.net.URL(key.toString) .getHost() //в данном случае: хэш код от хоста url
  val code = (domain.hashCode % numPartitions)
  if (code < 0) (
   code + numPartitions // Сделать неотрицательным
  else
   code
 }
 // Jаvа-метод equals для сравнения объектов Partitioner
 //для сравнения 2 RDD партицированных одним способом
 override def equals(other: Any): Boolean = other match {
  case dnp: DomainNamePartitioner =>
   dnp.numPartitions == numPartitions
  case _ =>
   false
 }
}

- map - создает новый RDD, который не наследует принцип партицирования (нужно исопльзовать mapValues)
- pairs.partitioner - определение метода партицирования

Дополнительные возможности

Аккумулярторы
max/filter может использовать в своем коде внешние переменные, но они будут локальными для конкретного воркера
Чтобы использовать переменные видимые в драйвере и воркере, нужно брать accumulator
val blankLines = sc.accumulator(0)
val callSigns = file.flatMap (line =>
 if (line == '"') {
  blankLines += 1 // Увеличить значение в аккумуляторе
 line.split(" ")
})
-- значение акк. можно будет получить только после действия с RDD (flatmap - преобразование не вызываемое сразу)
-- точность значения аккумулятора не гарантируется, он может быть больше, если контейнер рестратрует или если RDD несколько раз сохраняется на диск, а потом обратно кэшируется в память (по этому чаще используется для статистики или отладки)
-- можно передавать небольшие значения: Int, Double, Long, Float
-- собственный акк. можно создать отнаследовавшись от AccumulatorParam

Широковещательные переменные
предназначены для передачи большого объема данных, но только для чтения
Пример использования - левая таблица для HJ или для lookup:
val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map{case (sign, count) =>
 val country = lookupinArray (sign, signPrefixes. value)
 (country, count)
} . reduceByKey ( (х, у) => х + у)
signPrefixes - можно изменить в воркере, но изменение будет видно только там

mapPartitions
- Единичная инициализация ресурсов через mapPartitions для конкретного воркера:
val contactsContactLists = validSigns.distinct() .mapPartitions{
signs =>
    val mapper = createMapper() //единичная инициализация для воркера, а не для каждого элемента
    val client = new HttpClient()
    client.start ()
    // создать http-зaпpoc
    signs.map {sign => createExchangeForSign(sign)} //поэлементная обработка
        .map{ case (sign, exchange) => // извлечь ответы
            (sign, readExchangeCallLog(mapper, exchange))).filter(x => х._2 != null) // Удалить пустые записи
        }
}

Взаимодействие с внешними программами
val distScript = "./src/R/finddistance.R"
val distScriptName = "finddistance.R"
sc.addFile(distScript)
val distances = contactsContactLists.values.flatMap(x =>
    x.map(y => s"$y.contactlay,$y.contactlong,$y.mylat,$y.mylong")) //передать параметры
.pipe(Seq(SparkFiles.get(distScriptName))) //вызывать скрипт через pipe
Все вызванные скрипты сохраняются в SparkFiles.getRootDirectory - важно давать разные имена

Spark Streaming

Подключение к Kafka с помощью Spark Structure Streaming было описано раньше в блоге

- каждые 5-6 сообщений происходит checkpoint - запись данных в hdfs
принудительный вызов:
ssc.checkpoint( "hdfs:// ... ")
Преобразования
- без сохранения состояния (отдельно над каждой новой порцией данных):
-- map/filter/reduce/group/join - применяются над каждым RDD порции данных.
применение будет над окном заданном в StreamingContext
-- transform - применение функции над каждым RDD из окна:
val outlierDStream = accessLogsDStream.transform { rdd =>
 extractOutliers(rdd)
}
- с сохранением состояния (при вычислении текущего окна используются данные предыдущих):
-- Оконные функции - например, нужно посчитать среднее за 30 секунд (размер окна - кратно шагу), при частоте обновления = 10 (шаг перемещения окна)
Кол-во значения в скользящем окне:
val accessLogsWindow = accessLogsDStream.window(Seconds(30), Seconds(10))
val windowCounts = accessLogsWindow.count()
-- Reduce на окне:
val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getipAddress(), 1))
val ipCountDStream = ipDStream.reduceByKeyAndWindow(
( (х, у) => х + у), // Добавить элементы из новых пакетов в окне
( (х, у) => х - у), // Удалить элементы из пакетов, покинувших окно
Seconds(30), // Размер окна
Seconds(10)) // Шаг перемешения окна
также есть: countByValueAndWindow и countByWindow

Настройка и отладка

- input.toDebugString - посмотреть ациклический граф получения RDD
counts.toDebugString
(2) ShuffledRDD [ 296] at reduceByKey at <console>: 17
+-(2) MappedRDD[295] at rnap at <console>:17
 | FilteredRDD [ 294 ] at filter at <console>: 15
 | MappedRDD[293] at rnap at <console>:15
 | input.text MappedRDD[292] at textFile at <console>:13
 | input.text HadoopRDD[291] at textFile at <console>:13
Web интерфейс мониторинга
- Stages - основная информация о работе шага
-- сравнить min с max - определить наличие перекоса (нужен другой repartition или coalesce маленьких партций)
-- сравнить время работы task в временем GC - возможно нехватает памяти
-- также если большой spill на диск - надо увеличить память или выдать больше партиций
- storage в web мониторе показывает список закешированных RDD
на каких хостах он хранится и какое рапредление озу/диск
- executors - показывает всех исполнителей и сколько ресурсов они себе забрали
тут же можно посмотреть Thread Dump - стэктрейс работы программы
на основе этого можно делать сэмплирование, чтобы определить наиболее медленные части или определить проблемный стек при зависании
- в environment - Хранятся общие настройки
- также лог выполнения можно посмотреть через yarn
для уже завершенного: yarn logs -applicationid <арр ID>

- просмотр плана выполнения Catalyst
spark.sql("select plant, SUM(rtsaexcust) rtsaexcust from tst where plant = '002' group by plant order by plant").explain(true)

== Physical Plan == 
Sort [plant#160 ASC NULLS FIRST], true, 0 
+- Exchange rangepartitioning(plant#160 ASC NULLS FIRST, 4), [id=#110] 
 +- *(1) HashAggregate(keys=[plant#160], functions=[sum(rtsaexcust#185)]) 
  +- *(1) Project [plant#160, rtsaexcust#185] 
   +- *(1) Filter (isnotnull(plant#160) && (plant#160 = 002)) 
    +- *(1) FileScan parquet tst[plant#160,rtsaexcust#185,calday#206] Batched: true, DataFilters: [isnotnull(plant#160), (plant#160 = 002)], Format: Parquet, 
Location: CatalogFileIndex[dbfs:/mnt/exfs/warehouse/tst], 
PartitionCount: 3, PartitionFilters: [], 
PushedFilters: [IsNotNull(plant), EqualTo(plant,002)], ReadSchema: struct<plant:string,rtsaexcust:decimal(17,2)>, 
SelectedBucketsCount: 1 out of 4
план представляет из себя дерево: литералов (константы), атрибутов (переменные), поддеревьев содержащих рекурсивную функцию
- происходит несколько трансформаций, для снижения стоимости плана: схлопывание литералов, filter pushdown, projection pushdown (column pruning - исключение колонок)
На этом этапе можно определить, что вероятно план был сгенерирован неверно.

- Для мониторинга java приложений есть отдельная статья в блоге.

Оптимизация

Число воркеров при чтении с диска
Число воркеров зависит от числа блоков родительского RDD
т.е. все еще начинается с файлов в hdfs, если файл маленький, то большой параллельности по умолчанию не будет
Для решения этой проблемы есть 2 подхода:
- при записи сделать repartition на большее число партиций
- увеличить число файлов с помощью настройки
dw.write.option("maxRecordsPerFile", N")
При чтении задать maxPartitionBytes, который разобьет блоки на партиции нужного размера
spark.sql.files.maxPartitionBytes = bytes
- после фильтрации RDD число воркеров наследуется от родителя, несмотря на уменьшение объема
из-за этого есть смысл делать coalesce - чтобы уменьшить число пустых блоков

Число воркеров на фазе shuffle
- Чаще всего рекомендуется, чтобы размер данных не превышал 200МБ
Пример: размер входных данных в shuffle = 210ГБ
Число shuffle partition = 210000MB / 200MB = 1050
Но если достпно большее число ресурсов, то ставим доступный максимум
spark.conf.set("spark.sql.shuffle.partitions", 1050)

Признаки недостаточности числа worker или ОЗУ для partition
- Shuffle spill (disk) - сохранение сериализованных данных на диск
- Большая доля GC time

Проблема перекоса данных
Можно определить по Min / Max времени работы тасков в рамках одного job (будут значительно отличаться) - В databricks можно решить автоматически хинтом:
df.join(
    skewDF.hint("SKEW", "skewKey", List("0", "9999") ), Seq("keyCol"), "inner")
)
- дополнительный генеренный столбец, который добавляется в соединение/группировку и разбрасывает данные одинаково равномерно оба DF.
- так же можно сделать coalesce, чтобы объединить мелкие партиции в более крупные

Типы join
sort-merge join
алгоритм поумолчанию
На стадии чтения происходит сортировка и группировка данных, стадия shuffle объединяет данные в партиции с общим ключем, тут же делается слияние сортированных данных за линейное время.
Запрос может быть выполнен без стадии shuffle 2 способами (map-side join):
- Одинаково партицировать оба DF
даст преимуществно только при повторных выполнениях, т.к. не сохраняется физически на диск.
- Одинаково кластеризовать обе таблицы источники для DF
SET spark.sql.shuffle.partitions = 4;
create table tst_clust
using parquet
PARTITIONED BY (calday)
CLUSTERED BY(plant) SORTED BY (plant) INTO 4 BUCKETS
as 
select * from tst;
Особенности:
-- каждый воркер пишет свой набор файлов бакетов, по этому общее число файлов = число воркеров * бакетов (но, т.к. нет объеднения файлов, то нет шафла)
-- как следствие при выполнении join нужна дополнительная сортировка, которая сольет файлы в 1 бакет (но без шафла!)
-- не поддерживается Join с union all, даже если одинаковое число бакетов
-- не совместимо с hive (другой алгорит хэширования: murmur3 продив hivehash, в hive бакеты сливаются в 1 при вставке - требует шафла)

Broadcast join
Второй по популярности тип соединения.
Левая таблица должна быть меньше 10МБ
spark.sql.autoBroadcatsJoinThreshold = def 10MB -- создается на драйвере и рассылается во все executor
За счет Broadcats join выполнится без стадии shuffle
- Возможная будущая оптимизация:
broadcast hash join executor side, чуть эффективней, чем работать через драйвер:
-- через драйвер: K/N данных отправить с N exec, K данных отправить на N exec == K/N * N + K*N = K * N + K
-- обменяться данными между exec: K/N Данных отправить на N-1 exec и так на N exec == K/N *(N - 1) * N = K * N - K
т.е. на 2 полные пересылки эффективней и данные пересылаются между executor, а не все через diver (может стать бутылочным горлышком)

shuffle hash join
для запросов только через равно
одна таблица меньше другой: минимум в 3 раза
достаточно памяти для шафлинга партиции хэш массива
практически не используется из-за более общего алгоритма sort-merge join без ограничение HJ

broadcast NL
почти не используется, но наиболее универаслен, т.к. принимает любое условие соединения
для соединений, где для усвловий отличных от = (обычно NOT EXISTS с условием <>)

Range join
Соединение таблицы по диапазону between:
SELECT * FROM r1 JOIN r2 ON r1.start < r2.end AND r2.start < r1.end
поддерживается только databricks через bin join
интервальный справочник разбивает на подинтервалы (bin) , факт шафлится по bin и выполняется за счет этого в несколько потоков на части данных (bin)
шаг bin задается хинтом: /*+ RANGE_JOIN(points, 10) */

Оптимизация функций
- map не сохраняют партицирование (preservePartition = false), т.к. может создавать/изменять как значения, так и ключи
если точно знаем, что ключи не меняются, то можно указать preservePartition = true или использовать замен mapValues
- если нужно использовать большой объект в функции, то лучше не делать замыкание - объявлять глобальную переменную в драйвере и использовать ее во внутренней функции.
Т.к. spark в этом случае сереализует эту переменную и разошлет всем executor.
Лучше создать broadcast переменную, которая разошлется только по нодам.
- Если нужна построчная обработка, то нужно использовать mapPartitions, чтобы уменьшить число смен контекста между партциями
rdd.mapPartitions(rows => rows.foreach(r => {some logic on row} )
- если 100% точность не нужна, можно использовать апроксимационные функции:
--функция                               результат время(с)
count(distinct check_id)                --141490 24.599
approx_count_distinct(check_id,0.01)    --139874 19.795
approx_count_distinct(check_id)         --136133 16.956
approx_count_distinct(check_id,0.001)   --141490 17.786
- если в коде есть большие запросы (pivot в ML), то лучше увеличить perm size для JVM
- для heap меньше 32 ГБ следует использовать 4 байтовые указатели - UseCompressedOOps
- дополнительные опции при создании таблицы через JDBC
.option("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Spark SQL test table'")

Оптимизации в Spark 3
адптивные запросы
spark.sql.adaptive.enabled = true
- автоматическое опредление кол-ва shuffle partition
- автоматический coalesce мелких shuffle part в большие
- смена типа соединения sort merge - broadcast
Другие оптимизации:
- dynmic partition pruning - broadcast таблицы справочника и динамичекое определние нужных партиций фактовой таблицы
- определение перкоса - coalesce для мелких перекошенных партиций в большие или наоборот разбитие больших на мелкие
- push предикатов на уровень хранилища для parquet
- Catalyst - разбивает длинные методы на несколько мелких (set SplitAggregateFunc.enabled = true)

Типы планировщиков Spark
- Fair Scheduller
можно задать лимит числа контейнеров для параллельных потоков внутри 1 submit
- spark.dynamicAllocation.enabled - динамическое определение ресурсов для onprem Spark в зависимости от числа партиций

Комментариев нет:

Отправка комментария