- RDD
- RDD Key-Value
- DataFrame и Spark SQL
- DataSet
- Управление распределением данных
- Shuffle
- Дополнительные возможности
- Spark Streaming
- Настройка и отладка
- Оптимизация
Общее описание архитектуры Spark было ранее тут: /search/label/bigdata#spark
RDD
Виды операций- narrow (слабые) преобразования - создание нового RDD на основе текущего с выполнением некоторых операций
все narrow операции выполняются в 1 stage, т.к. данные не надо никуда перекачивать (обработка на месте)
- wide (сильная) действия - вычисления результата, данные 1 блока RDD переносятся в несколько других блоков (reduce, order, group, join)
wide операция создает новый stage в плане, т.к. требует shuffling - сохранение промежуточных данных и пересылку на другие сервера (партиции)
выполнение преобразований откладывается и оптимизируется до первого действия
(т.е. нагляднее сделать несколько map/filter, чем 1 громоздкий)
- если RDD сохранили в переменную, то при каждом вызове действия над ней все преобразования будут вычисляться заново
.persist / .cache - если нужно сохранить результат преобразований
-- можно задать место хранения, к примеру: (StorageLevel.DISK_ONLY) / MEMORY_ONLY/ SER - с сериализацией объекта
-- если задали MEMORY_ONLY, но после RDD был вытеснен, то при последующем обращении к нему ему нужно выполниться заново
если такая вероятность есть, то лучше сохранять с MEMORY_AND_DISK - тогда при повторном обращении блок будет сохранен и загружен с диска
-- unpersist - выгрузить переменную из памяти
Создание RDD
- из массива:
val lines = sc.parallelize(List("pandas", "i like pandas"))- из текстового файла:
val lines = sc.textFile("/path/to/README.md")
Сохранение RDD
- .collect - в локальный массив или take(N элементов)/takeSample
- saveAsSequenceFile("path",Some(classOf[GzipCodec]) - в 1 текстовый файл
saveAsTextFile - в несколько файлов
- передача собственных функций
-- функция должна поддерживать сериализацию
def isMatch(s: String): Boolean = { s.contains("123") } file.filter(isMatch).first()
Основные операции
- input.map(x => х * х) - выполняется над каждым элементом и создает новый RDD с тем же числом преобразованных элементов
- flatMap - то же самое, но разворачивает элементы подмассивов в элементы 1 массива
- поддерживаются операции над множествами: distinct / union / intersection / substract (oracle minus) / cartesian
- sample - выборка случайных элементов
- foreach
Основные действия
def foldleft[B](z: B)(f: (B, A)=> B): B- не может быть распараллелена, т.к. последовательно агрегирует данные с типа A на тип B с хранением промежуточного результата в аккумуляторе типа B
- по этому этой операции нет в Spark RDD
def fold(z: A)(f: (A, A)=> A): A- может параллелиться, т.к. не делает преобразования типа и промежуточные результаты могут использовать свой собственный аккумулятор
- т.к. не меняет тип коллекции, то сумму можно посчитать только для числового массива
reduce - тоже самое, что fold, но без начального значения z
aggregate[B](z: => B)(seqop: (B, A)=> B, combop: (B, B) => B): B- может менять тип и параллелиться за счет 2 операций (допустим посчитать число мелких букв - массив букв типа A, результат кол-во типа B)
-- seqop - агрегация в параллельных потоках с индивидуальным аккумулятором типа B
-- combop - агрегация промежуточных аккумуляторов в итог
++ aggregate - предпочительней filter+count, т.к. сжимает комплексные типы на входе внутри таски к простым аккумуляторам на выходе
-- reduce / fold - принимают 2 элемента и возвращает 1 элемент того же типа
-- aggregate - похоже на reduce, но используется, если нужно вернуть значение другого типа
RDD Key-Value
Сигнатура KV RDD:RDD[(K, V)]Создание KV RDD
где ключ - первое слово, значение = сама строка:
val pairs = lines.map(x => (x.split(" ")(0), х))Действия над KV RDD
filter{case (key, value) => value.length < 20} //фильтр по значению groupByKey(): RDD[(K, Iterable[V])] //группировка по ключу: ключ-массив значений reduceByKey(func: (V, V) => V): RDD[(K, V)] //группировка по ключу с выполнение функции func над значениями: ключ - 1 итог join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] //join двух rdd по ключу mapValues [U] ( f: V => U) : RDD [(K, U)] //выполнение функции f над значениями RDD -- краткая запись для rdd.map { case (x, y): (x, func(y))} countByKey(): Map[K, Long] //подсчет кол-ва элементов в группе keys: RDD[K] //получить ключи- map - работает значительно медленней mapValues, т.к. сбрасывает partioner и вызывает shuffle (т.к. map может менять как ключи, так и значения)
- partitionBy перед groupBy/Join может убрать shuffle, т.к. данные уже будут разбиты по серверам на части (аналогично groupBy убирает shuffle у последующего join)
- reduceByKey - быстрей groupByKey, т.к. весь массив данных ключа не передается по сети
Пример - вычисление среднего
rdd.mapValues(x => (х, 1)) //RDD -> KV RDD .reduceByKey((х, у) => (x._l + у._1, х._2 + у._2)) //схлопывание значений в ключе: ключ=>(сумма, число элементов)
- combineByKey - аналог aggregate для KV RDD (позволяет сменить тип выходного значения, оставляя параллельность)
пример вычисления среднего:
val result = input.combineByKey( (v) => (v, 1), //createCombiner - если ключа не было - создаем (число значения, 1) (асс: (Int, Int), v) => (асс._1 + v, асс._2 + 1), //mergeValues - если ключ есть в этом блоке, то - (число значние + новое значние, число потовторов + 1) (accl: (Int, Int), асс2: (Int, Int)) => (accl._1 + асс2._1, accl._2 + асс2._2) //mergeCombiners - объединяем потоки ) .map{ case (key, value) => (key, value._1 / value._2.toFloat) //проходимся по полученному массиву: значение / число повторов result.collectAsMap() .map(println(_)) //collectAsMap - преобразование RDD в локальный ассоциативный массив + выводим на экран
DataFrame и Spark SQL
DataFrame - это Rdd + Схема данныхОтличия
RDD
не оптимизируется Spark, для него это blob объект из байт о содержимом которого он ничего не знает.
Для обработки используются самописанные анонимные функции, которые выглядят черным ящиком:
ds.filter ( p => p.city == "Boston" ) //high order функция не может быть перенесена на уровень хранилища и применена над блоками parquetRDD[T] - параметризирован классом T и проверяет типы класса при компиляции
DataFrame
добавляет информацию о схеме, что дает возможность оптимизации.
Над данными производится ограниченный набор табличных преобразований, которые могут быть оптимизированы и проброшены на уровень хранилища
(вместо reduce/group/map - sql like функции join, group, sort, join, aggregate )
ds.filter ( $"city" === "Boston" ) //точное значение Boston может быть проброшено на уровень хранища и отбросить блоки данных без их чтенияDataFrame - не параметризируется пользовательским классом. Описание схемы происходит пользователем и проверятся на этапе выполнения.
type DataFrame = Dataset[Row] //именно для ROW задается число колонок и их схема
Преимущетсва DataFrame перед RDD
- Catalyst - оптимизации запроса: перестановки, преобразования, push предикатов, projection (читаем только что нужно) и т.д.
- Tungesten
-- данные в колоночном сжатом формате
-- векторные вычисления
-- формат сериализации значительно проще default java (быстрей передача по сети и сериализация и десериализация:
* java serializer: header info, hashcode, Unicode info, etc. Строка “abcd” занимает 48 байт, вместо 4
* Tungesten serializer хранит только смещения до значений (ofset) и размер данных (fiel length)
-- off-heap хранение данных за пределами heap ( нет влияния GC)
-- специализированные encoder для сжатых колоночных данных (Parquet, orc и т.д.)
-- знание схемы позволяет компактнее и хранить и бытрей извелкать данные
Создание Dataframe из RDD
- преобразование KV RDD в DataFrame требует описания названия колонок , т.к. они хранятся как ._N
val tupleRDD = ... // Assume RDD[(Int, String String, String)] val tupleDF = tupleRDD.toDF("COL1", ... , "COLN")- если в RDD находится case class, то названия берутся из него
case class Person(id: Int, name: String, city: String) val peopleRDD = ... // Assume RDD[Person] val peopleDF = peopleRDD.toDF- создание DF из сырых данных (пример Json) с точным описанием схемы
val schema = StructType( Array( StructField("name", StringType, nullable = true), StructField("age", StringType, nullable = true), ) ) // сначала строки RDD нужно привести к типу ROW val rowRDD = peopleRDD .map(_.split(",")) .map(attributes => Row(attributes(0), attributes(1).trim)) //потом можно навешивать схему val peopleDF = spark.createDataFrame(rowRDD, schema)- создание DF из структурированного источника Hive Table или Jdbc не требует описания схемы и типов (все берется из источника)
spark.read.table("rdw.pos_rec_itm")
Функции над DF
- sql like функции join, group, sort, join, aggregate и т.д.
- аналоги Pandas Dataframe:
fill(0) // заполнить null->0 fill(Map("minBalance" -> 0)) // заполнить null->0, только в minBalance replace(Array("id"),Map(1234 -> 8923)) // поменять 1234 на 8923 в колонке ID drop() // удалить строки, которые содержат null хотябы в 1 колонке drop("all") // удалить полностью null строки drop(Array("id", "name")) // удалить строки у которых в колонках id или name есть NULL df.printSchema // описание DF
Spark SQL
- создать временную таблицу на основе DataFrame. Таблицу можно будет исползовать в SQL только в этом же контексте
DF.registerTempTable("tempTable")- кэширование таблицы через SQL
val c = spark.sql("CACHE TABLE rdw.bi0_pplant")- кастомные функции для Spark SQL:
registerFunction("strLenScala", (_: String) .length) val tweetLength = spark.sql("SELECT strLenScala('tweet') FROM tweets LIMIT 10")- Пример-сравнение dataframe api и Spark SQL
import org.apache.spark.sql.types._ import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ val sales = spark.read.table("rdw.pos_rec_itm").where($"CALDAY".between("20200618", "20200619") && $"RT_PAYDIR" =!= "2" && $"/BIC/CLIENT" =!= " ") val promo = spark.read.table("rdw.bi0_prt_promo").where($"rt_promoth" === "Z010" && $"rt_promo" =!= " ").select("rt_promo").groupBy("rt_promo").max() val kso = spark.read.table("bw.kso").select("plant", "zposno").groupBy("plant", "zposno").max() val sjoin = sales. join(broadcast(promo), sales("rt_promo") === promo("rt_promo"), "left"). join(broadcast(kso), sales("plant") === kso("plant") && sales("/bic/zposno") === kso("zposno"), "left"). select( concat( $"calday", lit("_"), $"time", lit("_"), $"/bic/crecnum", lit("_"), $"doc_num", lit("_"), $"/bic/zposno", lit("_"), sales("plant")).as("check_id") , $"/bic/client".cast(IntegerType).as("client_id"), sales("plant").cast(IntegerType).as("plant"), $"material".cast(IntegerType).as("material"), $"time".substr(1,2).cast(IntegerType).as("hh"), when(sales("RT_PROMO") =!= " ", 1).otherwise(0).as("is_promo"), when(promo("rt_promo").isNotNull, 1).otherwise(0).as("is_crazy"), when(kso("plant").isNotNull, 1).otherwise(0).as("is_kso"), when($"/bic/zinstype" === "08" || $"/bic/zinstype" === "09", 1).otherwise(0).as("is_scan"), ( $"cpsaexcubu" * when($"/bic/zinstype" === "G", 0.002).when($"/bic/zinstype" === "KG", 0.5).otherwise(1) ).as("qnt"), $"rtsaexcust".as("amount"), $"/bic/ZRT_PREDR".as("amount_disc"), count("*").over( Window.partitionBy($"/bic/client") ).as("cnt_lines_by_client"), $"calday" )Может быть описано обычным SQL
select concat(i.calday, i.time, '_', i.`bic_crecnum`, '_', i.doc_num, '_', i.`bic_zposno`, '_', i.plant ) as check_id , case when i.`bic_client` IN( 'null', ' ') or length(i.`bic_client`) = 0 then null else i.`bic_client` end as client_id, cast(i.plant as int) as plant, cast(i.material as int) as material, cast(SUBSTR(i.time,1,2) as int) as hh, case when i.`RT_PROMO` IN( 'null', ' ') or length(i.`RT_PROMO`) = 0 then 0 else 1 end as is_promo, case when p.rt_promo IS NOT NULL then 1 else 0 end as is_crazy, case when ks.plant IS NOT NULL then 1 else 0 end as is_kso, case when i.`bic_zinstype` in ('08', '09') then 1 else 0 end as is_scan, case when i.base_uom = 'G' then 0.002 when i.base_uom = 'KG' then 0.5 else 1 end * i.cpsaexcubu as qnt, i.rtsaexcust as amount, i.`bic_ZRT_PREDR` as amount_disc, i.calday, COUNT(*) OVER(partition by i.`bic_client`) as cnt_lines_by_client from rdw.pos_rec_itm i left join (select rt_promo from rdw.bi0_prt_promo p WHERE p.rt_promoth = 'Z010' AND p.rt_promo <> ' ' group by rt_promo) p on p.rt_promo = i.rt_promo left join (select plant, zposno from bw.kso group by plant, zposno) ks on ks.plant = i.plant AND ks.zposno = i.`bic_zposno` where i.`bic_client` <> ' ' AND i.RT_PAYDIR != '2 ' and i.calday BETWEEN '20200616' AND '20200617'
DataSet
параметризированный DF, который объединяет функциональный подход RDD и Sql-like dataframe, но добавляет проверку типов данных на этапе компиляцииlistingsDS.groupByKey(l => l.zip) //от RDD .agg(avg($"price") .as[Double]) //от dataframe но с указанием типа колонки для type safe
Преобразование DF в DS
- неявно
import spark.implicits._ myDF.toDS //для DF или RDD- явно указывая типы данных для схемы
val myDS = spark.read.json("people.json").as[Person] DF.map(r => TimeUsageRow(r.getAs("working"), r.getAs("sex"), r.getAs("age"), r.getAs("primaryNeeds"), r.getAs("work"), r.getAs("other")))
Функции над DS
DS имеет набор функции как у RDD, но groupByKey возвращает KeyValueGroupedDataset , который преобразуется обратно в DS с помощью reduceGroups или agg
groupByKey[K](f: T => K): KeyValueGroupedDataset[K, T]функции над KeyValueGroupedDataset:
reduceGroups(f: (V, V) => V): Dataset[(K, V)] agg[U](col: TypedColumn[V, U]): Dataset[(K, U)] mapGroups[U](f: (K, Iterator[V]) => U): Dataset[U]
Пример:
- kv RDD:
rdd.reduceByKey(_+_)- DS:
import org.apache.spark.sql.expressions.scalalang.typed ds. groupByKey(r => r.age). //KeyValueGroupedDataset agg( typed.sum[TimUsageRow](_.work).as(Encoders.DOUBLE), ). map(k => TimeUsageRow(k._1, k._2)). sort("age")Замен mapGroups всегда желательней использовать agg
- reduceGroups - параллелится, но нет возможности сменить тип данных в массиве ключа
- agg - поддерживает ограниченное число типов данных и операций агрегации
Если нужна своя, то можно реализовать кастомный агрегатор, который будет параллелиться и не будет приводить к лишним сериализациям/десериализациям на уровне строки (только на уровне партиции):
val strConcat = new Aggregator[(Int, String), String , String]{ def zero : String= "" def reduce(b: String , a : (Int, String) ) : String = b + a._2 def merge(bl : String , b2 : String) : String = bl + b2 def finish(r : String) : String = r override def bufferEncoder : Encoder[BUF] = Encoders.STRING //кастомные энкодеры для Tungesten override def outputEncoder : Encoder[OUT] = Encoders.STRING }.toColumn keyValuesDS.groupByKey(pair => pair._1) .agg(strConcat.as[String] )
Управление распределением данных
- repartition(N) - перераспределение данных в кластере на N новых партиций- coalesce(N) - операция уменьшает число блоков, склеивая мелкие в более большие на 1 ноде
- rdd.partitions.size() - число partition
Задание способа партицирования
- Хэш партицирование
import org.apache.spark.HashPartitioner val userData = sc.sequenceFile[UserID, Userinfo] ("hdfs:// ... ") .partitionBy(new HashPartioner(100)) // распределяем файл в кластере по 100 хэш партициям .persist()
- range партицирование - разделение RDD на 10 равных последовательных частей
counts.partitionBy(new RangePartitioner(10,counts))range партицирование происходит при сортировке, чтобы данные были по порядку, а не случайно, как в hash
- свой тип партицирования:
class DomainNamePartitioner(numParts: Int) extends Partitioner { override def numPartitions: Int = numParts //число партиций override def getPartition(key: Any): Int = { //получить номер партиции по ключу val domain = new Java.net.URL(key.toString) .getHost() //в данном случае: хэш код от хоста url val code = (domain.hashCode % numPartitions) if (code < 0) ( code + numPartitions // Сделать неотрицательным else code } // Jаvа-метод equals для сравнения объектов Partitioner //для сравнения 2 RDD партицированных одним способом override def equals(other: Any): Boolean = other match { case dnp: DomainNamePartitioner => dnp.numPartitions == numPartitions case _ => false } }
- map - создает новый RDD, который не наследует принцип партицирования (нужно исопльзовать mapValues)
- pairs.partitioner - определение метода партицирования
Shuffle
Распределение данные неразрывно связано с операцией Shuffle.Операции join, groupby, repartition и другие требуют объединения одинаковых данных ключа в одном worker.
Для этого на стадии map данные предагрегируются и сохраняются на диск, т.е. фактически операция checkpoint. Это необходимо, чтобы в случае падения программы на следующих шагах не нужно было пересчитывать все с самого начала, а достаточно было взять данные из фс с последней операции shuffle.
На стадии reduce worker по сети обращается к всем предыдущим worker и считывает сохраненные данные с диска.
Эта архитектура имеет ряд недостатков:
* требуется пересчет при полной потере worker
* повышенная нагрузка к локальной фс worker, когда следующие шаги считывают данные
* необходимо держать worker, даже если он больше не нужен
По этому в будущем, скорей всего, для shuffle будут использоваться внешние Shuffle сервисы (подробней обсуждение по ссылке)
Дополнительные возможности
Аккумулярторыmax/filter может использовать в своем коде внешние переменные, но они будут локальными для конкретного воркера
Чтобы использовать переменные видимые в драйвере и воркере, нужно брать accumulator
val blankLines = sc.accumulator(0) val callSigns = file.flatMap (line => if (line == '"') { blankLines += 1 // Увеличить значение в аккумуляторе line.split(" ") })-- значение акк. можно будет получить только после действия с RDD (flatmap - преобразование не вызываемое сразу)
-- точность значения аккумулятора не гарантируется, он может быть больше, если контейнер рестратрует или если RDD несколько раз сохраняется на диск, а потом обратно кэшируется в память (по этому чаще используется для статистики или отладки)
-- можно передавать небольшие значения: Int, Double, Long, Float
-- собственный акк. можно создать отнаследовавшись от AccumulatorParam
Широковещательные переменные
предназначены для передачи большого объема данных, но только для чтения
Пример использования - левая таблица для HJ или для lookup:
val signPrefixes = sc.broadcast(loadCallSignTable()) val countryContactCounts = contactCounts.map{case (sign, count) => val country = lookupinArray (sign, signPrefixes. value) (country, count) } . reduceByKey ( (х, у) => х + у)signPrefixes - можно изменить в воркере, но изменение будет видно только там
mapPartitions
- Единичная инициализация ресурсов через mapPartitions для конкретного воркера:
val contactsContactLists = validSigns.distinct() .mapPartitions{ signs => val mapper = createMapper() //единичная инициализация для воркера, а не для каждого элемента val client = new HttpClient() client.start () // создать http-зaпpoc signs.map {sign => createExchangeForSign(sign)} //поэлементная обработка .map{ case (sign, exchange) => // извлечь ответы (sign, readExchangeCallLog(mapper, exchange))).filter(x => х._2 != null) // Удалить пустые записи } }
Взаимодействие с внешними программами
val distScript = "./src/R/finddistance.R" val distScriptName = "finddistance.R" sc.addFile(distScript) val distances = contactsContactLists.values.flatMap(x => x.map(y => s"$y.contactlay,$y.contactlong,$y.mylat,$y.mylong")) //передать параметры .pipe(Seq(SparkFiles.get(distScriptName))) //вызывать скрипт через pipeВсе вызванные скрипты сохраняются в SparkFiles.getRootDirectory - важно давать разные имена
Spark Streaming
Подключение к Kafka с помощью Spark Structure Streaming было описано раньше в блоге- каждые 5-6 сообщений происходит checkpoint - запись данных в hdfs
принудительный вызов:
ssc.checkpoint( "hdfs:// ... ")Преобразования
- без сохранения состояния (отдельно над каждой новой порцией данных):
-- map/filter/reduce/group/join - применяются над каждым RDD порции данных.
применение будет над окном заданном в StreamingContext
-- transform - применение функции над каждым RDD из окна:
val outlierDStream = accessLogsDStream.transform { rdd => extractOutliers(rdd) }- с сохранением состояния (при вычислении текущего окна используются данные предыдущих):
-- Оконные функции - например, нужно посчитать среднее за 30 секунд (размер окна - кратно шагу), при частоте обновления = 10 (шаг перемещения окна)
Кол-во значения в скользящем окне:
val accessLogsWindow = accessLogsDStream.window(Seconds(30), Seconds(10)) val windowCounts = accessLogsWindow.count()-- Reduce на окне:
val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getipAddress(), 1)) val ipCountDStream = ipDStream.reduceByKeyAndWindow( ( (х, у) => х + у), // Добавить элементы из новых пакетов в окне ( (х, у) => х - у), // Удалить элементы из пакетов, покинувших окно Seconds(30), // Размер окна Seconds(10)) // Шаг перемешения окнатакже есть: countByValueAndWindow и countByWindow
Настройка и отладка
- input.toDebugString - посмотреть ациклический граф получения RDDcounts.toDebugString (2) ShuffledRDD [ 296] at reduceByKey at <console>: 17 +-(2) MappedRDD[295] at rnap at <console>:17 | FilteredRDD [ 294 ] at filter at <console>: 15 | MappedRDD[293] at rnap at <console>:15 | input.text MappedRDD[292] at textFile at <console>:13 | input.text HadoopRDD[291] at textFile at <console>:13Web интерфейс мониторинга
- Stages - основная информация о работе шага
-- сравнить min с max - определить наличие перекоса (нужен другой repartition или coalesce маленьких партций)
-- сравнить время работы task в временем GC - возможно нехватает памяти
-- также если большой spill на диск - надо увеличить память или выдать больше партиций
- storage в web мониторе показывает список закешированных RDD
на каких хостах он хранится и какое рапредление озу/диск
- executors - показывает всех исполнителей и сколько ресурсов они себе забрали
тут же можно посмотреть Thread Dump - стэктрейс работы программы
на основе этого можно делать сэмплирование, чтобы определить наиболее медленные части или определить проблемный стек при зависании
- в environment - Хранятся общие настройки
- также лог выполнения можно посмотреть через yarn
для уже завершенного: yarn logs -applicationid <арр ID>
- просмотр плана выполнения Catalyst
spark.sql("select plant, SUM(rtsaexcust) rtsaexcust from tst where plant = '002' group by plant order by plant").explain(true) == Physical Plan == Sort [plant#160 ASC NULLS FIRST], true, 0 +- Exchange rangepartitioning(plant#160 ASC NULLS FIRST, 4), [id=#110] +- *(1) HashAggregate(keys=[plant#160], functions=[sum(rtsaexcust#185)]) +- *(1) Project [plant#160, rtsaexcust#185] +- *(1) Filter (isnotnull(plant#160) && (plant#160 = 002)) +- *(1) FileScan parquet tst[plant#160,rtsaexcust#185,calday#206] Batched: true, DataFilters: [isnotnull(plant#160), (plant#160 = 002)], Format: Parquet, Location: CatalogFileIndex[dbfs:/mnt/exfs/warehouse/tst], PartitionCount: 3, PartitionFilters: [], PushedFilters: [IsNotNull(plant), EqualTo(plant,002)], ReadSchema: struct<plant:string,rtsaexcust:decimal(17,2)>, SelectedBucketsCount: 1 out of 4план представляет из себя дерево: литералов (константы), атрибутов (переменные), поддеревьев содержащих рекурсивную функцию
- происходит несколько трансформаций, для снижения стоимости плана: схлопывание литералов, filter pushdown, projection pushdown (column pruning - исключение колонок)
На этом этапе можно определить, что вероятно план был сгенерирован неверно.
- Для мониторинга java приложений есть отдельная статья в блоге.
Оптимизация
Число воркеров при чтении с дискаЧисло воркеров зависит от числа блоков родительского RDD
т.е. все еще начинается с файлов в hdfs, если файл маленький, то большой параллельности по умолчанию не будет
Для решения этой проблемы есть 2 подхода:
- при записи сделать repartition на большее число партиций
- увеличить число файлов с помощью настройки
dw.write.option("maxRecordsPerFile", N")При чтении задать maxPartitionBytes, который разобьет блоки на партиции нужного размера
spark.sql.files.maxPartitionBytes = bytesУточненная формула на основании исходного кода Spark
val sumBytes = files.map(_.getLen + spark.sql.files.openCostInBytes).sum val bytesPerCore = sumBytes / (spark.sql.files.minPartitionNum или spark.sql.leafNodeDefaultParallelism) sumBytes / min(spark.sql.files.maxPartitionBytes, max(spark.sql.files.openCostInBytes, bytesPerCore))т.е. к размеру файла (sumBytes) добавляется минимальная стоимость его открытия (spark.sql.files.openCostInBytes = 4МБ)
число партиций не может быть меньше spark.sql.files.minPartitionNum или spark.sql.leafNodeDefaultParallelism
- после фильтрации RDD число воркеров наследуется от родителя, несмотря на уменьшение объема
из-за этого есть смысл делать coalesce - чтобы уменьшить число пустых блоков
Число воркеров на фазе shuffle
- Чаще всего рекомендуется, чтобы размер данных не превышал 200МБ
Пример: размер входных данных в shuffle = 210ГБ
Число shuffle partition = 210000MB / 200MB = 1050
Но если достпно большее число ресурсов, то ставим доступный максимум
spark.conf.set("spark.sql.shuffle.partitions", 1050)
Признаки недостаточности числа worker или ОЗУ для partition
- Shuffle spill (disk) - сохранение сериализованных данных на диск
- Большая доля GC time
Проблема перекоса данных
Можно определить по Min / Max времени работы тасков в рамках одного job (будут значительно отличаться) - В databricks можно решить автоматически хинтом:
df.join( skewDF.hint("SKEW", "skewKey", List("0", "9999") ), Seq("keyCol"), "inner") )- дополнительный генеренный столбец, который добавляется в соединение/группировку и разбрасывает данные одинаково равномерно оба DF.
- так же можно сделать coalesce, чтобы объединить мелкие партиции в более крупные
Почему может быть OOM, если в Spark есть механизм Spill?
Проверка необходимости Spill происходит с помощью сэмплирования данных.
Если в данных есть перекос, то метод сэмплирования может дать ошибку и привести к OOM из-за неверной оценки необходимой памяти под процесс и Spark не успеет выполнить Spill.
Типы join
sort-merge join
алгоритм поумолчанию
На стадии чтения происходит сортировка и группировка данных, стадия shuffle объединяет данные в партиции с общим ключем, тут же делается слияние сортированных данных за линейное время.
Запрос может быть выполнен без стадии shuffle 2 способами (map-side join):
- Одинаково партицировать оба DF
даст преимуществно только при повторных выполнениях, т.к. не сохраняется физически на диск.
- Одинаково кластеризовать обе таблицы источники для DF
SET spark.sql.shuffle.partitions = 4; create table tst_clust using parquet PARTITIONED BY (calday) CLUSTERED BY(plant) SORTED BY (plant) INTO 4 BUCKETS as select * from tst;Особенности:
-- каждый воркер пишет свой набор файлов бакетов, по этому общее число файлов = число воркеров * бакетов (но, т.к. нет объеднения файлов, то нет шафла)
-- как следствие при выполнении join нужна дополнительная сортировка, которая сольет файлы в 1 бакет (но без шафла!)
-- не поддерживается Join с union all, даже если одинаковое число бакетов
-- не совместимо с hive (другой алгорит хэширования: murmur3 продив hivehash, в hive бакеты сливаются в 1 при вставке - требует шафла)
Broadcast join
Второй по популярности тип соединения.
Левая таблица должна быть меньше 10МБ
spark.sql.autoBroadcatsJoinThreshold = def 10MB -- создается на драйвере и рассылается во все executorЗа счет Broadcats join выполнится без стадии shuffle
- Возможная будущая оптимизация:
broadcast hash join executor side, чуть эффективней, чем работать через драйвер:
-- через драйвер: K/N данных отправить с N exec, K данных отправить на N exec == K/N * N + K*N = K * N + K
-- обменяться данными между exec: K/N Данных отправить на N-1 exec и так на N exec == K/N *(N - 1) * N = K * N - K
т.е. на 2 полные пересылки эффективней и данные пересылаются между executor, а не все через diver (может стать бутылочным горлышком)
shuffle hash join
для запросов только через равно
одна таблица меньше другой: минимум в 3 раза
достаточно памяти для шафлинга партиции хэш массива
практически не используется из-за более общего алгоритма sort-merge join без ограничение HJ
Принудительно можно активировать хинтом и 2 настройками:
set spark.sql.autoBroadcastJoinThreshold = -1; set spark.sql.join.preferSortMergeJoin = false; select /*+ SHUFFLE_HASH(m, i) */ m.rpa_wgh3, SUM(rtsaexcust) rtsaexcust from rdw.pos_rec_itm i join rdw.bi0_pmaterial m on m.material = i.material where i.calday between '20201005' and '20201010' group by m.rpa_wgh3 ; *(4) HashAggregate(keys=[rpa_wgh3#856], functions=[finalmerge_sum(merge sum#947) AS sum(rtsaexcust#30)#943], output=[rpa_wgh3#856, rtsaexcust#838]) +- Exchange hashpartitioning(rpa_wgh3#856, 200), true, [id=#1590] +- *(3) HashAggregate(keys=[rpa_wgh3#856], functions=[partial_sum(rtsaexcust#30) AS sum#947], output=[rpa_wgh3#856, sum#947]) +- *(3) Project [rtsaexcust#30, rpa_wgh3#856] +- ShuffledHashJoin [material#20], [material#839], Inner, BuildRight, false :- Exchange hashpartitioning(material#20, 200), true, [id=#1578] : +- *(1) Project [material#20, rtsaexcust#30] : +- *(1) Filter isnotnull(material#20) : +- *(1) ColumnarToRow : +- FileScan orc rdw.pos_rec_itm[material#20,rtsaexcust#30,calday#51] Batched: true, DataFilters: [isnotnull(material#20)], Format: ORC, Location: InMemoryFileIndex[dbfs:/mnt/exfs/warehouse/rdw.db/pos_rec_itm/calday=20201005, dbfs:/mnt/exfs/war..., PartitionFilters: [isnotnull(calday#51), (calday#51 >= 20201005), (calday#51 <= 20201010)], PushedFilters: [IsNotNull(material)], ReadSchema: struct<material:string,rtsaexcust:decimal(17,2)> +- Exchange hashpartitioning(material#839, 200), true, [id=#1584] +- *(2) Project [material#839, rpa_wgh3#856] +- *(2) Filter isnotnull(material#839) +- *(2) ColumnarToRow +- FileScan orc rdw.bi0_pmaterial[material#839,rpa_wgh3#856] Batched: true, DataFilters: [isnotnull(material#839)], Format: ORC, Location: InMemoryFileIndex[dbfs:/mnt/exfs/warehouse/rdw.db/bi0_pmaterial], PartitionFilters: [], PushedFilters: [IsNotNull(material)], ReadSchema: struct<material:string,rpa_wgh3:string>На моих замерах такой запрос выполнился на 10% медленней, чем sort merge join. Что странно, т.к. при достаточности памяти HJ должен быть дешевле SMJ из-за отсутствия сортировки.
broadcast NL
почти не используется, но наиболее универаслен, т.к. принимает любое условие соединения
для соединений, где для усвловий отличных от = (обычно NOT EXISTS с условием <>)
Range join
Соединение таблицы по диапазону between:
SELECT * FROM r1 JOIN r2 ON r1.start < r2.end AND r2.start < r1.endподдерживается только databricks через bin join
интервальный справочник разбивает на подинтервалы (bin) , факт шафлится по bin и выполняется за счет этого в несколько потоков на части данных (bin)
шаг bin задается хинтом: /*+ RANGE_JOIN(points, 10) */
Оптимизация функций
- map не сохраняют партицирование (preservePartition = false), т.к. может создавать/изменять как значения, так и ключи
если точно знаем, что ключи не меняются, то можно указать preservePartition = true или использовать замен mapValues
- если нужно использовать большой объект в функции, то лучше не делать замыкание - объявлять глобальную переменную в драйвере и использовать ее во внутренней функции.
Т.к. spark в этом случае сереализует эту переменную и разошлет всем executor.
Лучше создать broadcast переменную, которая разошлется только по нодам.
- Если нужна построчная обработка, то нужно использовать mapPartitions, чтобы уменьшить число смен контекста между партциями
rdd.mapPartitions(rows => rows.foreach(r => {some logic on row} )- если 100% точность не нужна, можно использовать апроксимационные функции:
--функция результат время(с) count(distinct check_id) --141490 24.599 approx_count_distinct(check_id,0.01) --139874 19.795 approx_count_distinct(check_id) --136133 16.956 approx_count_distinct(check_id,0.001) --141490 17.786- если в коде есть большие запросы (pivot в ML), то лучше увеличить perm size для JVM
- для heap меньше 32 ГБ следует использовать 4 байтовые указатели - UseCompressedOOps
Увеличение heap до 32ГБ активирует ипользование 64битных указателей, что увеличивает потребление памяти на несколько ГБ.
Так что выделть нужно сразу больше на несколько ГБ или пытаться уложиться в 32.
- дополнительные опции при создании таблицы через JDBC
.option("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Spark SQL test table'")- быстрая генерация сурогатных ключей
* monotonicali_increasing_id - поддерживается sql/df, но получаются очень большие числа (занимают много места)
* md5 - 128 бит - по всем полям таблицы - могут быть коллизии
* row_number - выполняется на дарйвере (не параллелится) , если сделать parittion by, то будет параллелится, но будет большой шафл
* zipwithindex - подходит только для rdd (нужно преобразовывать df в rdd, выполнять и обратно пересоздать df) - наиболее быстрый вариант
Уменьшение Shuffle
- Один из способов уменьшить Shuffle, это выполнить агрегацию данные как можно раньше на стадии map.
К примеру запрос поумолчанию:
select m.rpa_wgh3, SUM(rtsaexcust) rtsaexcust from rdw.pos_rec_itm i join rdw.bi0_pmaterial m on m.material = i.material where i.calday between '20201005' and '20201010' group by m.rpa_wgh3Выполняется 1,8 минут и вызывает 2,7ГБ shuffle по сети (Exchange data size total (min, med, max): 2.7 GB (89.2 MB, 105.5 MB, 164.9 MB)
Но если выполнить агрегации фактовой таблицы как можно раньше, то объем данных для shuffle получится сократить еще на этапе локального map
select m.rpa_wgh3, SUM(rtsaexcust) rtsaexcust from (select material, SUM(rtsaexcust) as rtsaexcust from rdw.pos_rec_itm i2 where i2.calday between '20201005' and '20201010' group by material) i join rdw.bi0_pmaterial m on m.material = i.material group by m.rpa_wgh3Подзапрос с группировокой "group by material" агрегируют данные там где они находятся и отправит в shuffle значительно меньший объем = 87.4 MB (Exchange data size total (min, med, max): 87.4 MB (3.4 MB, 3.6 MB, 3.9 MB)
Время запроса сокращается более чем в 3 раза до 34 секунд.
Данный прием имеет мало пользы в обычных (не кластерных) бд, т.к. там нет проблем с передачей данных по сети между параллельными процессами.
- count distinct 1 из операций, которая порождает большой shuffle из-за передачи хэш массива уникальных значений из map в reduce
Если count distinct считается на уникальной колонке, то сжатия данных при отправке в reduce вообще не произойдет.
По этому для расчета уникальных значений иногда лучше сделать все в рамках 1 ноды, но с большим объемом озу, чем на нескольких небольших машинах:
Запуск на 1 машине с 7 ядрами и 28ГБ - 27 секунд выполнения:
$ spark-shell --master yarn --driver-memory 8G --executor-memory 28G --num-executors 1 --executor-cores 7 val df = spark.sql("select check_id, client_id, plant, material, calday from rdw.pos_rec_itm where calday between '20200901' and '20201101'").persist df.count() df.agg(countDistinct($"client_id")).show() //27 secЗапуск на 7 машинах с выделенным 1 ядром и 4ГБ - 45 секунд (почти в 2 раза дольше за счет низкой локальности выполнения count distinct - большого shuffle):
spark-shell --master yarn --driver-memory 8G --executor-memory 4G --num-executors 7 val df = spark.sql("select check_id, client_id, plant, material, calday from rdw.pos_rec_itm where calday between '20200901' and '20201101'").persist df.count() df.agg(countDistinct($"client_id")).show() //45sПлан выполнения показыает, что через сеть было отправлено 1532.3 MB
map: aggregate time total (min, med, max): 2.3 m (290 ms, 512 ms, 1.6 s) peak memory total (min, med, max): 17.4 GB (72.0 MB, 72.0 MB, 72.0 MB) number of output rows: 50,210,554 shuffle: Exchange data size total (min, med, max): 1532.3 MB (4.9 MB, 6.1 MB, 8.0 MB)
- Также одним из best practics является выделение большего числа CPU на 1 worker - это должно уменьшить число Shuffle операций и overhead на подъем JVM в каждом worker
Уменьшение размера данных
- Выбирайте или подготавливайте наименьший необходимый тип данных под данные, если планируете их несколько раз переиспользовать в запросах.
Лучше 1 раз пронумеровать данные последовательностью Int строковые ключи, чем потом выполнять group/distinct на длинных строках исходных данных
- Если позволяет точность, то используйте встроенные типы данных с плавающей запятой double и float, вместо decimal.
Т.к. decimal программный тип данных и не может быть выполнен математическим сопроцессором CPU
Сравним скорость работы одни их тех же данных в decimal:
select sum(amount), AVG(amount/qnt) as price from tmp.pos_dec; 18316088570.13 78.15631680131521235406 Time taken: 112.448 secondsи в double
select cast(sum(amount) as decimal(18,2)), AVG(amount/qnt) as price from tmp.pos_dbl; 18316088570.1 78.15631680091742 Time taken: 14.531 seconds, Fetched: 1 row(s)Скорость несколько раз в лучше, если нет никаких дополнительных действий.
Но стоит помнить о неточностях при преобразовании чисел в двоичной системе счисления (double, float) в десятичную, т.к. плавающая часть данных после запятой приводится к десятичной посредством дробей:
0 или 1 / 2 ^ позиция в знаменателе
что может дать недопустимую погрешность при необходимости точных расчетов
Оптимизации в Spark 3
адптивные запросы
spark.sql.adaptive.enabled = true- автоматическое опредление кол-ва shuffle partition
- автоматический coalesce мелких shuffle part в большие
- смена типа соединения sort merge - broadcast
Другие оптимизации:
- dynmic partition pruning - broadcast таблицы справочника и динамичекое определние нужных партиций фактовой таблицы
- определение перкоса - coalesce для мелких перекошенных партиций в большие или наоборот разбитие больших на мелкие
- push предикатов на уровень хранилища для parquet
- Catalyst - разбивает длинные методы на несколько мелких (set SplitAggregateFunc.enabled = true)
Типы планировщиков Spark
- Fair Scheduller
можно задать лимит числа контейнеров для параллельных потоков внутри 1 submit
- spark.dynamicAllocation.enabled - динамическое определение ресурсов для onprem Spark в зависимости от числа партиций
Отличная статья! Много ценной информации. Спасибо, Алексей
ОтветитьУдалить