- ORC: формат файла
- Партицирование
- Кластеризация
- Сортировка
- ORC: Bloom filter
- Ускорение вставки в кластеризованную таблицу с динамическим партицированием
- Сравнение результатов
- Сравнение ORC с Parquet
- Hive: Predicate pushdown
- Hive: BroadCast Join
- Skew Join
- Hive 3: Materialized view
- Hive LLAP
- Iceberg
Кому лень читать, могу перейти сразу к сравнению результатов
ORC: формат файла
Документация: https://orc.apache.org/specification/ORCv1/ORC - это колоночный формат хранения данных, но колоночность в нем ограниченная, т.к. строки таблицы разбиваются на несколько частей:
Данные делятся на файлы по размеру, внутри строки разбиваются на stripe (размер stripe задаются параметром orc.stripe.size - обычно 64МБ).
Stripe состоит из частей:
* Index Data:
** Min/Max значение, количество строк, наличие NULL для каждой колонки внутри stripe целиком
** Index c Min/Max значением колонки для каждых 10 тыс. строк (hive.exec.orc.default.row.index.stride)
** Bloom filter для указанных колонок
** Смещение до данных колонки в Row Data из индекса
* Row Data - сжатые данные колонок (сжатие по умолчанию orc.compress = ZLIB)
Способы сжатия:
** Числовые данные хранятся с переменной длинной: <= 128 - 1 байт, <= 16383 - 2 байт и т.д.
** Повторяющиеся числа сохраняются как дельта (delta encoding)
** Возврастающие значения кодируются длиной неизменной дельты (run length encoding)
** Строки кодируются справочником (dictionary encoding).
Если число уникальных значений не превышает 0,8 (hive.exec.orc.dictionary.key.size.threshold) от общего числа строк, то на основании данных строковой колонки создается справочник и упорядочивается для бинарного поиска. В Row Data же вставляется id справочника.
* Footer - данные о кодировке
Такой формат хранения позволяет отсекать как строки читая индексы файла и stripe, так и колонки используя смещения в stripe.
Посмотрим влияние параметров orc на размер, время вставки:
Параметры по умолчанию
Параметры и таблица по умолчанию для сравнения с оптимизациями:Это будет таблица с продажами за 3 дня - 31 млн строк. Состоит из 42 столбца из которых 28 это ключевые строковые поля и 14 числовых показателей.
Все запросы будут выполняться с принудительным bucketing и sorting, также с логированием performance параметров.
set hive.server2.logging.operation.level=performance; set hive.enforce.bucketing = true; set hive.enforce.sorting = true; insert into pos_1_def select * from pos_rec_itm where calday between '20190601' and '20190603'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 11 0 0 276213.00 1,601,560 8,928 31,096,951 0 ------------------------------------------------------------------------------------------------------------------ Loading data to table pos_1_def Table pos_1_def stats: [numFiles=11, numRows=31096951, totalSize=1795231673, rawDataSize=72898554240] Time taken: 278.238 secondsПри параметрах по умолчанию время вставки составляет 278 секунд и размер файла получается 1712 МБ
Если задампить блок, то большинство строковых колонок содержат секцию со справочником:
Stream: column 9 section DATA start: 1060399 length 203242 Stream: column 9 section LENGTH start: 1263641 length 101 Stream: column 9 section DICTIONARY_DATA start: 1263742 length 64140Количество страйпов в 1 файле = 66
hive --orcfiledump /apps/hive/warehouse/pos_1_def/000000_0 | grep 'Stripe:' | wc -l 66
Максимальная доля уникальных значений для создания справочника
Попробуем уменьшить hive.exec.orc.dictionary.key.size.threshold с 0,8 до 0,2 и посмотреть как это повлияет:set hive.exec.orc.dictionary.key.size.threshold = 0.2; truncate table pos_1_def; insert into pos_1_def select * from pos_rec_itm where calday between '20190601' and '20190603' ; Table pos_1_def stats: [numFiles=11, numRows=31096951, totalSize=1983777997, rawDataSize=72898554240] Time taken: 196.922 secondsВремя вставки ускорилось с 278 секунд до 197 секунд, а рамзер файла возрос до 1892 МБ.
Из дампа у 9 строковой колонки пропал справочник:
Stream: column 9 section DATA start: 5890885 length 1096170 Stream: column 9 section LENGTH start: 6987055 length 929
Если наоборот увеличить необходимость справочника до 1:
set hive.exec.orc.dictionary.key.size.threshold = 1; truncate table pos_1_def; insert into pos_1_def select * from pos_rec_itm where calday between '20190601' and '20190603' ; Table pos_1_def stats: [numFiles=11, numRows=31096951, totalSize=1784757162, rawDataSize=72898554240] Time taken: 322.133 secondsТо время вставки возрастает до 322 секунд, а размер файла значительно не меняется.
Вывод: лучше оставить этот параметр по умолчанию = 0.8, как баланс между размером файла и скоростью вставки.
Отключать справочники, видимо, имеет смысл, если в таблице множество уникальных небольших строк, т.к. в этом случае замена короткой строки на длинное число из справочника не принесет должного сжатия.
Количество строк в stride
Попробуем поварировать число строк в индексе hive.exec.orc.default.row.index.stride (по умолчанию = 10000 ).Но проводить тест будем на сортированной таблице, т.к. на стандартной таблице индексы почти не отфильтровывают данные.
Стандартная сортированная таблица:
set hive.exec.orc.default.row.index.stride=10000; truncate table pos_3_sort; insert into pos_3_sort select * from pos_rec_itm where calday between '20190601' and '20190603' ; Table pos_3_sort stats: [numFiles=4, numRows=31096951, totalSize=1015100427, rawDataSize=72898553064] Time taken: 433.964 secondsЧто дает 968 МБ данных и 22 Stripe:
$ hive --orcfiledump /apps/hive/warehouse/pos_3_sort/000000_0 | grep 'Stripe:' | wc -l $ 22При отборе данных отфильтровываются данные индексом до 4млн:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_3_sort where `/bic/client` = '0010000273'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 23 0 0 10525.00 278,840 15,736 4,216,492 23 Reducer 2 1 0 0 2985.00 1,850 80 23 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 11.599 seconds, Fetched: 1 row(s)Увеличим число строк в индексе в 10 раз:
set hive.exec.orc.default.row.index.stride=100000; truncate table pos_1_def; insert into pos_1_def select * from pos_rec_itm where calday between '20190601' and '20190603' ; Table pos_3_sort stats: [numFiles=4, numRows=31096951, totalSize=1013477928, rawDataSize=72898553064] Time taken: 475.205 secondsРазмер файла, как и время вставки осталось примерно одинаково.
Число Stripe уменьшились почти в 2 раза:
$ hive --orcfiledump /apps/hive/warehouse/pos_3_sort/000000_0 | grep 'Stripe:' | wc -l $ 12Но вот эффективность индекса при поиске по ключу значительно снижилась с 4 млн до 24 млн:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_3_sort where `/bic/client` = '0010000273'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 24 0 0 11169.00 308,520 17,330 24,316,951 24 Reducer 2 1 0 0 3525.00 1,190 52 24 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 12.852 seconds, Fetched: 1 row(s)Вывод: изменение числа строк в индексе в моем примере существенно не влияет на время вставки и размер таблицы, но хорошо помогает при последующих select.
Если в таблице размер строки был меньше, то метаданные индекса могли создать дополнительную нагрузку на вставку и размер файла.
Также если значение колонки регулярно присутствует во всех продажах, то индекс не даст эффекта, т.к. значение попадет во все Stripe. Т.е. индекс Min/Max для диапазона эффективен для редковстречающихся значений (id клиента), и малоэффективен для повторяющихся значений (ид популярной акции или товара)
Партицирование
Не буду заострять внимание на этой опции. Партицирование есть во всех современных бд.Основное преимущество HIVE партицирования перед Oracle:
* Неограниченная вложенность
* автоматическое создание на оснований уникальных значений в колонке
Стоит обратить внимание, что hive поддерживает отсечение партиций не только по ключу, но и при join через справочник
Кластеризация
Другой возможностью разбиения данных на части, помимо партицирования, является кластеризация. В отличии от партицирования, кластеризация разбивает данные на файлы.Документация: LanguageManual+DDL+BucketedTables
Аналогом в Oracle является хэш партицирование.
Время вставки
Замерим показатели размера, время вставки и скорости select для кластеризованной таблицы.Кластеризуем таблицу на 4 части по колонке магазина:
CLUSTERED BY(plant) INTO 4 BUCKETS
insert into pos_2_clust select * from pos_rec_itm where calday between '20190601' and '20190603'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 11 0 0 97875.00 850,680 17,647 31,096,951 31,096,951 Reducer 2 4 0 0 387931.00 1,139,700 4,545 31,096,951 0 ------------------------------------------------------------------------------------------------------------------ Loading data to table pos_2_clust Table pos_2_clust stats: [numFiles=4, numRows=31096951, totalSize=1754969327, rawDataSize=72898553064] Time taken: 408.902 secondsОтличия в вставке от обычной таблицы:
* Появляется Reducer в который отправляются данные нужной хэш секции магазина.
* По этому время вставки возросло с 278 секунд до 408 (на 50% дольше)
* Размер данных незначительно сократился с 1712 МБ до 1673 МБ за счет локальности магазинов в одном файле
Select по ключу кластеризации
Сравним время выборки по ключу магазина для обычной таблицы и кластеризованнойВыборка по ключу из обычной таблицы считывает все 31 млн строк за 12 секунд:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_1_def where plant = '0002'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 14 0 0 10740.00 173,570 7,868 31,096,951 14 Reducer 2 1 0 0 6033.00 1,280 67 14 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 12.265 seconds, Fetched: 1 row(s)Из кластеризованной же считывается половина строк (15,5 млн) за 11 секунд:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_2_clust where plant = '0002'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 6 0 0 9935.00 63,060 3,315 15,485,169 6 Reducer 2 1 0 0 7096.00 350 29 6 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 10.889 seconds, Fetched: 1 row(s)Замечание: странно, что считалась половина строк, тогда как 1 магазин должен был попасть в 1 кластер, что должно было сократить объем чтения в 4 раза (1 / число кластеров)
Отличие Hive кластеризации от Spark
- разные алгоритмы хэширования, как следствие полная несовместимость (Spark бакетированную таблицу нельзя прочитать Hive)- Spark делает бакеты из мапперов (без стадии shuffle/reduce), что дает число файлов = число мапперов * число бакетов
Это ускоряет запись, но при чтении эти несколько файлов нужно слить в один, что замедляет чтение
Hive на стадии записи добавляет reducer, на котором бакеты из разным мапперов мержит в 1 файл
- Если к ключу бакетирования добавляется еще доп ключ, то все равно требуется полный shuffle
- если делается union одинаково бакетированных таблиц, то бакетирование теряется
Задача SPARK-19256 поддержки hive бакетирования еще не закрыта (фев.2021) .
В остальном Spark поддерживает динамическую перезапись партиций (spark.sql.sources.partitionOverwriteMode=DYNAMIC) и SMJ без Exchange hashpartitioning, но с сортировкой, т.к. в 1 бакете несколько файлов.
Сортировка
Еще сильней сжать данные можно, если отсортировать их. Тогда Orc сможет сжать файлы максимально эффективно.Документация: LanguageManualDDL-BucketedSortedTables
Основной сложностью при сортировке таблицы является выборк колонки по которой это сделать.
При выборе я руководствовался 2 факторами:
* Частота использования колонки в фильтрах
* Размер данных после сортировки
Для определения популярных фильтров на колонке можно распарсить логи запросов.
Для этого был написан небольшой скрипт: hive_predicate_stat.py, по результату которого стало ясно, что самые популярные фильтр по магазину (plant)
Для проверки размера данных проведем несколько замеров на 1 дне (calday):
Размер данных
Исходный размер одного дня без сортировок 688,5МБ:CLUSTERED BY(material) SORTED BY(material, `/bic/client`, plant) INTO 4 BUCKETS -- Кластеризация по товару, сортировка по товару, клиенту и магазину == 498 МБ CLUSTERED BY(material) SORTED BY(`/bic/client`, plant, material) INTO 4 BUCKETS -- Кластеризация по товару, сортировка по клиенту и магазину == 454 МБ CLUSTERED BY(`/bic/client`) SORTED BY(`/bic/client`, plant, material) INTO 4 BUCKETS -- Кластеризация по клиенту, сортировка по клиенту, магазину и товару == 388 МБ CLUSTERED BY(plant) SORTED BY(plant, `/bic/client`, material) INTO 4 BUCKETS -- Кластеризация по магазину, сортировка по магазину и клиенту == 374.5 МБИсходя из этого был выбран вариант сжатия, который как ускорял выборку, так и хорошо сжимал данные: CLUSTERED BY(plant) SORTED BY(plant, `/bic/client`) INTO 4 BUCKETS
Замечу, что чем больше число бакетов, тем лучше будет отсечение лишних данных, но хуже сжатие.
Так 16 бакетов сжали данные до 554 МБ, тогда как 4 бакета дали 454 МБ
CLUSTERED BY(material) SORTED BY(`/bic/client`, plant, material) INTO 16 BUCKETS -- Кластеризация по товару, сортировка по товару, клиенту и магазину == 554 МБтак что степень = 4, выбрана как компромисс
Время вставки
Вставка данных в кластеризованно-сортированную таблицу: CLUSTERED BY(plant) SORTED BY(plant, `/bic/client`) INTO 4 BUCKETSinsert into pos_3_sort select * from pos_rec_itm where calday between '20190601' and '20190603' ; Task Execution Summary ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 11 0 0 95966.00 900,790 18,164 31,096,951 31,096,951 Reducer 2 4 0 0 412328.00 1,228,640 5,323 31,096,951 0 ------------------------------------------------------------------------------------------------------------------ Loading data to table pos_3_sort Table pos_3_sort stats: [numFiles=4, numRows=31096951, totalSize=1014407061, rawDataSize=72898553064] Time taken: 433.964 secondsОтличия от обычной таблицы:
* Появляется Reducer в который отправляются данные нужной хэш секции магазина и сортирует.
* По этому время вставки возросло с 278 секунд до 433 секунд, что незначительно больше 408с при просто кластеризованной таблице
* Размер данных сократился с 1712 МБ до 967 МБ, т.е. почти в 2 раза!
Select по первому ключу сортировки
Если вернуться к описанию хранения индексов, видно, что данные по ключу могут попасть в любой stripe случайным образом, что значительно ухудшает эффективность встроенных индексов OrcЕсли же данные отсортировать по ключу поиска, то уникальные значения лягут в один Stripe и будут максимально эффективно отфильтровываться
<< Слева показан пример с выборкой уникального значения по обычной таблице и сортированной (красным - отсеченные stripe, зеленым - считываемые)
Проверим на реальных данных.
Выборка по ключу на обычной таблице считывает все 31 млн строк за 12 секунд:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from tmp.pos_1_def where plant = '0002'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 14 0 0 10740.00 173,570 7,868 31,096,951 14 Reducer 2 1 0 0 6033.00 1,280 67 14 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 12.265 seconds, Fetched: 1 row(s)По отсортированной таблице считывается в разы меньший объем строк за 4 секунды (в 3 раза быстрей):
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_3_sort where plant = '0002'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 2 0 0 3039.00 8,760 872 180,000 2 Reducer 2 1 0 0 1594.00 1,700 42 2 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 4.342 seconds, Fetched: 1 row(s)
Select по второму ключу сортировки
Выборка по второму ключу поиска считывает все бакеты, но все равно отбирает значительно меньше строк из Stripe:select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_3_sort where `/bic/client` = '0010000273'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 23 0 0 10525.00 278,840 15,736 4,216,492 23 Reducer 2 1 0 0 2985.00 1,850 80 23 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 11.599 seconds, Fetched: 1 row(s)
Select по ключу без сортировки
Select по ключу без сортировки получается одинаковый по обоим таблицам:select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_1_def where rt_promo = 'A122RP94'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 14 0 0 10926.00 173,190 8,356 31,096,951 14 Reducer 2 1 0 0 6504.00 1,190 46 14 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 11.675 seconds, Fetched: 1 row(s)
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_3_sort where rt_promo = 'A122RP94'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 6 0 0 10300.00 71,340 3,946 31,096,951 6 Reducer 2 1 0 0 7234.00 1,950 111 6 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 11.384 seconds, Fetched: 1 row(s)
Sort Merge Bucket Join
Дополнительное преимущество которое дает сортированность таблиц - это возможность выполнения sort merge join без собственно сортировки и пересылки данных в reducer.Отсортируем и кластеризуем все таблицы в join одиково по магазину.
Также нужно включить sortmerge и bucketmapjoin, которые выключены по умолчанию в Hive.
Документация: LanguageManualJoinOptimization-AutoConversiontoSMBMapJoin
План по умолчанию на обычных таблицах:
set hive.auto.convert.sortmerge.join=true; set hive.optimize.bucketmapjoin = true; set hive.optimize.bucketmapjoin.sortedmerge = true; set hive.convert.join.bucket.mapjoin.tez=true; select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus from pos_1_def p join bi0_pplant m on m.plant = p.plant; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 14 2 0 25340.00 498,640 16,367 31,096,951 31,096,951 Map 4 1 0 0 7087.00 21,530 851 5,000 5,000 Reducer 2 145 0 0 18011.00 988,800 15,270 31,101,951 145 Reducer 3 1 0 0 6623.00 4,490 59 145 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 38.847 seconds, Fetched: 1 row(s)Join обычных таблиц генерирует 2 мапера, в которых считываются и сортируются данные.
После чего порции данных с одним ключем отправляются в Reducer и в нем происходит слияние 2 таблиц и последующий подсчет count и sum
В 3ий единственный reducer отправляется промежуточный результат count и sum , где досчитывается итоговая сумма и кол-во.
Самой дорогой операцией тут является Map1 - сортировка данных и отправка промежуточных результатов в Reducer2
Stage-0 Fetch Operator limit:-1 Stage-1 Reducer 3 File Output Operator [FS_12] compressed:false Statistics:Num rows: 1 Data size: 16 Group By Operator [GBY_10] | aggregations:["count(VALUE._col0)","sum(VALUE._col1)"] | outputColumnNames:["_col0","_col1"] | Statistics:Num rows: 1 Data size: 16 |<-Reducer 2 [SIMPLE_EDGE] Reduce Output Operator [RS_9] sort order: Statistics:Num rows: 1 Data size: 16 value expressions:_col0 (type: bigint), _col1 (type: double) Group By Operator [GBY_8] aggregations:["count()","sum(_col27)"] outputColumnNames:["_col0","_col1"] Statistics:Num rows: 1 Data size: 16 Merge Join Operator [MERGEJOIN_17] | condition map:[{"":"Inner Join 0 to 1"}] | keys:{"0":"plant (type: string)","1":"plant (type: string)"} | outputColumnNames:["_col27"] | Statistics:Num rows: 17103323 Data size: 40094206990 |<-Map 1 [SIMPLE_EDGE] vectorized | Reduce Output Operator [RS_19] | key expressions:plant (type: string) | Map-reduce partition columns:plant (type: string) | sort order:+ | Statistics:Num rows: 15548476 Data size: 36449278292 | value expressions:rtsaexcust (type: double) | Filter Operator [FIL_18] | predicate:plant is not null (type: boolean) | Statistics:Num rows: 15548476 Data size: 36449278292 | TableScan [TS_0] | alias:p | Statistics:Num rows: 31096951 Data size: 72898554240 |<-Map 4 [SIMPLE_EDGE] vectorized Reduce Output Operator [RS_21] key expressions:plant (type: string) Map-reduce partition columns:plant (type: string) sort order:+ Statistics:Num rows: 2500 Data size: 4681364 Filter Operator [FIL_20] predicate:plant is not null (type: boolean) Statistics:Num rows: 2500 Data size: 4681364 TableScan [TS_1] alias:m Statistics:Num rows: 5000 Data size: 9362729Повторим тест, но на одинаково кластеризованно-сортированных таблицах:
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus from pos_3_sort p join bi0_pplant m on m.plant = p.plant; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 6 0 0 23180.00 201,040 8,189 31,096,951 6 Reducer 2 1 0 0 17839.00 1,200 46 6 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 24.346 seconds, Fetched: 1 row(s)Теперь обе таблицы читаются и соединяются в одном маппере без сортировки и пересылки по сети и запрос отработал почти в 2 раза быстрей:
Stage-0 Fetch Operator limit:-1 Stage-1 Reducer 2 File Output Operator [FS_12] compressed:false Statistics:Num rows: 1 Data size: 16 Group By Operator [GBY_10] | aggregations:["count(VALUE._col0)","sum(VALUE._col1)"] | outputColumnNames:["_col0","_col1"] | Statistics:Num rows: 1 Data size: 16 |<-Map 1 [SIMPLE_EDGE] Reduce Output Operator [RS_9] sort order: Statistics:Num rows: 1 Data size: 16 value expressions:_col0 (type: bigint), _col1 (type: double) Group By Operator [GBY_8] aggregations:["count()","sum(_col27)"] outputColumnNames:["_col0","_col1"] Statistics:Num rows: 1 Data size: 16 Merge Join Operator [MERGEJOIN_17] | condition map:[{"":"Inner Join 0 to 1"}] | keys:{"0":"plant (type: string)","1":"plant (type: string)"} | outputColumnNames:["_col27"] | Statistics:Num rows: 17103323 Data size: 40094206343 | |<-Filter Operator [FIL_16] | predicate:plant is not null (type: boolean) | Statistics:Num rows: 2500 Data size: 4681364 | TableScan [TS_1] | alias:m | Statistics:Num rows: 5000 Data size: 9362729 |<-Filter Operator [FIL_15] predicate:plant is not null (type: boolean) Statistics:Num rows: 15548476 Data size: 36449277704 TableScan [TS_0] alias:p Statistics:Num rows: 31096951 Data size: 72898553064Причем замечу, что этот подход работает и для просто одинаково кластеризованных таблиц без сортировки.
Сортировка добавляется внутрь map1, что немного медленней, но все равно экономит кучу времени на shuffling - пересылку данных по сети в reducer.
ORC: Bloom filter
Что такое Bloom фильтр описывалось уже несколько раз в этом блоге: oracle-bloom-filterBloom фильтр также может применяться в orc файлах и очень удобно подходит для фильтрации на не отсортированных колонках, т.к. индексы на них неэффективны.
Bloom filter же дает дополнительную информацию о содержании колонки в stripe
Время вставки
Протестируем работу на кластеризованно-сортированной таблице с bloom filter на 4 колонках.Замечу, что названия колонок нужно задавать без кавычек
CLUSTERED BY(plant) SORTED BY(plant, `/bic/client`) INTO 4 BUCKETS stored as orc
tblproperties ("orc.bloom.filter.columns"="material,rt_promo,/bic/client,/bic/card12,", "orc.bloom.filter.fpp"="0.05")
insert into pos_4_bloom select * from pos_rec_itm where calday between '20190601' and '20190603' ; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 11 0 0 105566.00 957,410 19,722 31,096,951 31,096,951 Reducer 2 4 0 0 428483.00 1,247,480 5,818 31,096,951 0 ------------------------------------------------------------------------------------------------------------------ Loading data to table pos_4_bloom Table pos_4_bloom stats: [numFiles=4, numRows=31096951, totalSize=1066863577, rawDataSize=72898553064] Time taken: 450.133 secondsОтличия от кластеризованно-сортированной таблицы:
* Время вставки возросло с 433 секунд до 450 секунд, что незначительно больше
* Размер данных возрос с 967 МБ до 1 017 МБ, что на 5% больше
В индексных данных колонки появляется дополнительный раздел "BLOOM_FILTER":
Stripe: offset: 805493730 data: 13149108 rows: 410000 tail: 799 index: 285445 Stream: column 0 section ROW_INDEX start: 805493730 length 20 ... Stream: column 18 section ROW_INDEX start: 805502343 length 570 Stream: column 18 section BLOOM_FILTER start: 805502913 length 260902
Select по ключу bloom filter
Возьмем из пункта про сортировку запрос и сравним с ним.Запрос к таблице с блум фильтром:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_4_bloom where rt_promo = 'A122RP94'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 8 1 0 10105.00 83,770 4,074 330,000 8 Reducer 2 1 0 0 6507.00 370 0 8 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 10.839 seconds, Fetched: 1 row(s)Отбирается 330т строк, вместо 31 млн по умолчанию
Если взять колонку на которой есть сортировка и блум фильтр, то результат еще лучше:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_4_bloom where `/bic/client` = '0010000273'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 24 0 0 10325.00 286,540 17,450 10,000 24 Reducer 2 1 0 0 1953.00 6,490 86 24 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 11.958 seconds, Fetched: 1 row(s)В просто сортированной таблице отбиралось 24 млн строк, а с bloom фильтром всего 10 т.
Увеличение False positive Bloom filter
Попробуем увеличить вероятность ложных срабатываний bloom фильтра (False positive с 0,05 до 0,2), чтобы оценить влияние на вставку и выборку:CLUSTERED BY(plant) SORTED BY(plant, `/bic/client`) INTO 4 BUCKETS stored as orc
tblproperties ("orc.bloom.filter.columns"="material,rt_promo,/bic/client,/bic/card12,", "orc.bloom.filter.fpp"="0.2")
insert into pos_4_bloom2 select * from pos_rec_itm where calday between '20190601' and '20190603'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 11 0 0 98033.00 976,480 19,705 31,096,951 31,096,951 Reducer 2 4 0 0 425676.00 1,251,980 4,976 31,096,951 0 ------------------------------------------------------------------------------------------------------------------ Loading data to table pos_4_bloom2 Table pos_4_bloom2 stats: [numFiles=4, numRows=31096951, totalSize=1042572361, rawDataSize=72898553064] Time taken: 446.657 secondsОтличия от bloom filter по умолчанию:
* Время примерно одно
* Размер данных стал меньше с 1 017 МБ до 994 МБ (из-за уменьшения битовой карты)
Но увеличение False positive не изменило размер фильтрации - те же 330т строк:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_4_bloom2 where rt_promo = 'A122RP94'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 6 0 0 9101.00 57,690 3,591 330,000 6 Reducer 2 1 0 0 5991.00 1,540 46 6 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 10.165 seconds, Fetched: 1 row(s)И по клиенту - те же 10т строк:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_4_bloom2 where `/bic/client` = '0010000273'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 21 0 0 9935.00 249,380 13,191 10,000 21 Reducer 2 1 0 0 2590.00 2,370 79 21 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 11.007 seconds, Fetched: 1 row(s)
Ускорение вставки в кластеризованную таблицу с динамическим партицированием
Существенный минус при кластеризации партицированной таблицы, что вместо большого количества reducer мы получаем число писателей = числу бакетов.И если происходит вставка большого объема данных это существенно замедляет процесс.
Запрос по умолчанию имеет 36 начальных мапперов, но всего 4 пишущих reducer.
В итоге время вставки = 1497 секунд.
insert OVERWRITE table DWH.TBL PARTITION (part_col) select * from STG.TBL; -------------------------------------------------------------------------------- VERTICES STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED -------------------------------------------------------------------------------- Map 1 .......... SUCCEEDED 36 36 0 0 0 0 Reducer 2 ...... SUCCEEDED 4 4 0 0 0 0 --------------------------------------------------------------------------------
Проблему можно частично решить, если установить параметр
SET hive.optimize.sort.dynamic.partition=true;- данные будут отсортированы по колонке партицирования и создано большее число reducer, что ускорит вставку
Этот прием дает незначительное ускорение. Общее время = 1054 секунд.
SET hive.optimize.sort.dynamic.partition=true; insert OVERWRITE table DWH.TBL PARTITION (part_col) select * from STG.TBL; -------------------------------------------------------------------------------- VERTICES STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED -------------------------------------------------------------------------------- Map 1 .......... SUCCEEDED 36 36 0 0 0 0 Reducer 2 ...... SUCCEEDED 253 253 0 0 0 0 --------------------------------------------------------------------------------Но надо помнить, что этот параметр нестабилен и может применяться только для простых вставках без группировок.
Лучший вариант, это переписать вставку таким образом, чтобы чтение исходной таблицы было 1, а вставок несколько.
По итогу исходная таблица читается 1 раз (MAP), а пишется сразу несколькими reducer в нужные партиции.
Тут получается максимальное ускорение. Общее время = 461 секунд.
from STG.TBL s insert OVERWRITE table DWH.TBL PARTITION (part_col) select s.* where s.part_col = 1 insert OVERWRITE table DWH.TBL PARTITION (part_col) select s.* where s.part_col = 2 insert OVERWRITE table DWH.TBL PARTITION (part_col) select s.* where s.part_col = 3; -------------------------------------------------------------------------------- VERTICES STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED -------------------------------------------------------------------------------- Map 1 .......... SUCCEEDED 36 36 0 0 0 0 Reducer 1 ..... SUCCEEDED 4 4 0 0 0 0 Reducer 2 RUNNING 4 0 4 0 0 0 Reducer 3 RUNNING 4 0 4 0 0 0 --------------------------------------------------------------------------------
Кроме собственно вставки, таким образом можно ускорять повторяющиеся запросы, у которых одно основание, но разные уровни агрегации.
К примеру, нам надо подсчитать число уникальны чеков в разрезе магазина + группы товара и магазина + товара.
Это нужно делать 2 запросами к одним и теми же данным, т.к. в случае count distinct нельзя переиспользовать промежуточные результаты.
1 запрос с группировкой до магазина + группы:
insert OVERWRITE table tmp.checks_wgh4 select i.plant, m.rpa_wgh4, COUNT(DISTINCT i.check_id) as pr_check from rdw.pos_rec_itm i join rdw.BI0_PMATERIAL m on m.material = i.material join rdw.bi0_pplant p on i.plant = p.plant where i.calday between '20201005' and '20201010' AND m.rpa_wgh2 between '11' and '20' AND p.`/bic/zfrmttyp` = 'H' group by i.plant, m.rpa_wgh4; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 24 0 0 30604.00 756,840 41,165 60,417,666 11,928,929 Map 3 1 0 0 2850.00 10,280 259 558,756 41,656 Map 4 2 0 0 12414.00 13,210 301 5,033 305 Reducer 2 253 0 0 31972.00 2,103,120 31,721 11,928,929 0 ------------------------------------------------------------------------------------------------------------------ Loading data to table tmp.checks_wgh4 Table tmp.checks_wgh4 stats: [numFiles=253, numRows=58079, totalSize=421037, rawDataSize=10802694] OK Time taken: 58.198 seconds2 запрос до магазина+товара, использует те же map для считывания данных и отличаются только стадией reduce
insert OVERWRITE table tmp.checks_material select i.plant, i.material, COUNT(DISTINCT i.check_id) as pr_check from rdw.pos_rec_itm i join rdw.BI0_PMATERIAL m on m.material = i.material join rdw.bi0_pplant p on i.plant = p.plant where i.calday between '20201005' and '20201010' AND m.rpa_wgh2 between '11' and '20' AND p.`/bic/zfrmttyp` = 'H' group by i.plant, i.material; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 24 0 0 20754.00 930,540 31,129 60,417,666 12,219,596 Map 3 1 0 0 1666.00 5,880 64 558,756 41,656 Map 4 2 0 0 6734.00 15,940 387 5,033 305 Reducer 2 253 0 0 23094.00 1,534,680 22,291 12,219,596 0 ------------------------------------------------------------------------------------------------------------------ Loading data to table tmp.checks_material Table tmp.checks_material stats: [numFiles=253, numRows=314669, totalSize=1919892, rawDataSize=62304462] OK Time taken: 44.262 secondsЧто дает 58+44 = 102с работы запросов.
Для оптимизации можно сделать join и сохранить полный набор данных в физическую таблицу, либо использовать функционал insert all.
insert all - предпочтительней, потому что в этом случае данные остаются в памяти и не тратится время на повторное сохранение и считывание данных с диска.
FROM ( select i.plant, i.material, m.rpa_wgh4, i.check_id from rdw.pos_rec_itm i join rdw.BI0_PMATERIAL m on m.material = i.material join rdw.bi0_pplant p on i.plant = p.plant where i.calday between '20201005' and '20201010' AND m.rpa_wgh2 between '11' and '20' AND p.`/bic/zfrmttyp` = 'H' ) i insert OVERWRITE table tmp.checks_wgh4 select i.plant, i.rpa_wgh4, COUNT(DISTINCT i.check_id) as pr_check group by i.plant, i.rpa_wgh4 insert OVERWRITE table tmp.checks_material select i.plant, i.material, COUNT(DISTINCT check_id) as pr_check group by i.plant, i.material ; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 24 0 0 28257.00 952,580 17,621 60,417,666 24,148,525 -- мапперы выполняются 1 раз Map 4 1 0 0 619.00 2,210 0 558,756 41,656 Map 5 2 0 0 5967.00 14,330 291 5,033 305 Reducer 2 253 0 0 20250.00 797,850 16,972 24,148,525 0 -- reducer от обоих запросов группировки Reducer 3 253 0 0 28019.00 1,326,100 18,884 24,148,525 0 ------------------------------------------------------------------------------------------------------------------ Loading data to table tmp.checks_wgh4 Loading data to table tmp.checks_material Table tmp.checks_wgh4 stats: [numFiles=253, numRows=58079, totalSize=421037, rawDataSize=10802694] Table tmp.checks_material stats: [numFiles=253, numRows=314669, totalSize=1919892, rawDataSize=62304462] OK Time taken: 64.394 secondsПереиспользование промежуточных данных в памяти позволяет ускорить запрос почти в 2 раза до 64 секунд.
Стоит заметить, что insert all есть почти во всех популярных бд.
Но в случае Oracle максимум, что можно сделать в insert all - это фильтрация перед вставкой в кокретную таблицу:
INSERT ALL WHEN id <= 3 THEN INTO dest_tab1 (id, description) VALUES (id, description) WHEN id BETWEEN 4 AND 7 THEN INTO dest_tab2 (id, description) VALUES (id, description) WHEN 1=1 THEN INTO dest_tab3 (id, description) VALUES (id, description) SELECT id, description FROM source_tabHive же дает полный набор синтаксиса, включая группировки, агрегации и т.д.
Сравнение результатов
Сравнивать будем время вставки, объем данных и число выбранных данных.В select сравниваем не время, т.к. слишком большая погрешность и многое зависит от числа читающих мапперов.
Лучше - меньше (выделено цветом):
Показатель | По умолчанию | Кластеризованная | Кластеризованная и сортированная | Кластеризованная, сортированная и с bloom фильтром |
Время вставки (сек) | 278 | 408 | 434 | 450 |
Размер (МБ) | 1 712 | 1 673 | 967 | 1 017 |
Выборка по ключу (Магазин) строк | 31 096 951 | 15 485 169 | 180 000 | 180 000 |
Выборка по вторичному ключу (Клиент) строк | 31 096 951 | 31 096 951 | 4 216 492 | 10 000 |
Выборка по не ключевому полю (Акция) строк | 31 096 951 | 31 096 951 | 31 096 951 | 330 000 |
Join по ключу (Магазин) сек | 38.8 | 25 | 24.3 | 24.3 |
Сравнение ORC с Parquet
Создание таблицы
Создадим 2 одинаковые партицированно-кластеризованных таблицыORC
PARTITIONED BY ( `calday` string) CLUSTERED BY(plant) SORTED BY(plant, `/bic/client`) INTO 4 BUCKETS stored as orc tblproperties ("orc.bloom.filter.columns"="rt_promo", "orc.bloom.filter.fpp"="0.05")Parquet (не поддерживает bloom filter)
PARTITIONED BY ( `calday` string) CLUSTERED BY(plant) SORTED BY(plant, `/bic/client`) INTO 4 BUCKETS stored as parquet
Вставка и размер файлов
Вставим в таблицу 1 день продаж, всего 12 363 714 строк.Размер получившихся таблиц:
ORC с ZLIB сжатием = 308.6 M
Parquet с GZIP чуть больше = 324.3 M
Orc на таблице продаж обеспечивает лучшее сжатие
Метаданные parquet файлов
Данные о данных можно посмотреть утилитой "parquet-tools", которую нужно установить отдельно.Файл также бьется на области, текстовые поля кодируются справочниками. Но по метаданным не видно минимального/максимального значения.
$ hadoop jar ./parquet-tools-1.8.2.jar meta /apps/hive/warehouse/tmp/parquet/calday=20190601/000000_0 file schema: hive_schema -------------------------------------------------------------------------------- ... sales_unit: OPTIONAL BINARY O:UTF8 R:0 D:1 cpsaexcubu: OPTIONAL FIXED_LEN_BYTE_ARRAY O:DECIMAL R:0 D:1 ... row group 1: RC:3303124 TS:682794260 OFFSET:4 -------------------------------------------------------------------------------- ... sales_unit: BINARY GZIP DO:0 FPO:34513471 SZ:467378/1316597/2.82 VC:3303124 ENC:PLAIN_DICTIONARY,BIT_PACKED,RLE cpsaexcubu: FIXED_LEN_BYTE_ARRAY GZIP DO:0 FPO:34980849 SZ:3360465/26426447/7.86 VC:3303124 ENC:BIT_PACKED,RLE,PLAIN ...Hive для parquet сохраняет названия полей в файле, а для orc хранит отдельно.
Но spark сохраняет названия полей и в ORC
Скорость полного чтения
ORC - 7 секунд , 4 маппера = 4 файламselect COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from tmp.orc where calday = '20190601'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 4 0 0 6910.00 56,290 4,443 12,363,714 4 Reducer 2 1 0 0 3201.00 2,270 56 4 0 ------------------------------------------------------------------------------------------------------------------Parquet - время чтения в 4 раза больше и почему то 4 файла читаются 22 мапперами:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from tmp.parquet where calday = '20190601'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 22 0 0 17097.00 253,100 16,257 12,363,714 22 Reducer 2 1 0 0 11810.00 6,910 103 22 0 ------------------------------------------------------------------------------------------------------------------
Скорость чтения с фильтром по полю CLUSTERED
ORC - считалось 2 из 4 бакета, но из файлов взялись только нужные stride, что дало 80т строк на входе мапперов:select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from tmp.orc where calday = '20190601' and plant = '0002'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 2 0 0 2240.00 5,090 552 80,000 2 Reducer 2 1 0 0 824.00 1,110 90 2 0 ------------------------------------------------------------------------------------------------------------------Parquet - отобралась почему то ровно половина строк, хотя есть точный фильтр по plant, который участвует в CLUSTERED и Sort
что по итогу дало время больше в 5 раз:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from tmp.parquet where calday = '20190601' and plant = '0002'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 22 0 0 10328.00 212,600 14,263 6,083,421 22 Reducer 2 1 0 0 7686.00 2,060 66 22 0 ------------------------------------------------------------------------------------------------------------------
Скорость чтения с фильтром по полю с Bloom filter
Parquet не поддерживаются настраиваемые bloom filterORC - читались все 4 файла, но благодаря bloom фильтру из 12 млн строк в мапперы попало только 130т
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from rdw.orc where calday = '20190601' and rt_promo = 'A122RP94'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 4 0 0 4062.00 22,240 2,926 130,000 4 Reducer 2 1 0 0 1632.00 730 0 4 0 ------------------------------------------------------------------------------------------------------------------Parquet - отбираются все 12М строки, как следствие запрос работает в 4 раза дольше
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from stg.parquet where calday = '20190601' and rt_promo = 'A122RP94'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 22 0 0 16239.00 243,110 12,791 12,363,714 22 Reducer 2 1 0 0 10475.00 9,760 94 22 0 ------------------------------------------------------------------------------------------------------------------
Распространенность форматов
ORC на моих замерах выигрывает выигрывает во всех тестах. В чем же тогда такая популярность parquet?Дата релиза Parquet - 13 March 2013, Orc - 20 February 2013. Практически одновременно.
Вероятная причина в том, что Parquet сразу делал клиентов под разные языки и нативные клиенты сразу могли использовать его, т.е. складывалась экосистема.
Orc долго использовался только в хайве и jvm языках. Только начиная с 16-17 годов был создан нативный c++ клиент, который могли использовать не jvm приложения.
Итоговое сравнение
Показатель | Orc | Parquet |
Размер (МБ) | 308.6 | 324.3 |
Полное сканирование (сек) | 6.9 | 17.1 |
Селективность фильтра по кластеризованному столбцу (строк) | 80 000 | 6 083 421 |
Селективность фильтра по Bloom фильтру (строк) | 130 000 | - |
Распростаненность | + | +++ |
Hive: Predicate pushdown
Кроме фильтрации в where проверим передачу фильтра через join:Join predicate pushdown
1 тест на обычных таблицах, чтобы проверить передачу фильтра в индексы Stripe.bi0_pmaterial - справочник товаров отфильтрован по штрих коду и отбирает 1 товар.
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus from pos_1_def p join bi0_pmaterial m on m.material = p.material where m.eanupc = '4615041324532'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 14 0 0 12214.00 217,980 8,513 31,096,952 14 Map 3 4 0 0 6315.00 17,350 90 480,000 1 Reducer 2 1 0 0 5590.00 1,200 0 14 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 14.558 seconds, Fetched: 1 row(s)Фильтр по товару не передался в маппер, из продаж выбрались все данные.
Попробуем повторить тест на сортированной таблице с блум фильтрами на колонках товара (material)
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus from pos_4_bloom p join bi0_pmaterial2 m on m.material = p.material where m.eanupc = '4615041324532'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 8 0 0 38212.00 110,160 3,471 31,096,952 8 Map 3 4 0 0 413.00 2,630 98 480,000 1 Reducer 2 1 0 0 34809.00 600 0 8 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 39.68 seconds, Fetched: 1 row(s)К сожалению фильтр по товару также не передался в маппер, из продаж выбрались все данные.
Похоже на текущий момент Hive не поддерживает pushdown предикатов из справочника на таблицу с фактами
Partition pruning
Проверим отсечение партиций при соединений через справочник.Очень частый кейс для BI инструментов, где фильтрация происходит через календарь:
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcust from pos_rec_itm p join calendar c on c.calday = p.calday where c.calmonth = '201906' and c.calday1 IN( '01', '02') ; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 8 0 0 17620.00 133,190 4,249 2 8 Map 3 1 0 0 1847.00 5,030 97 3,655 2 Reducer 2 1 0 0 11037.00 3,230 77 8 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 31.376 seconds, Fetched: 1 row(s)Фильтр в продажи удачно передался и ненужные партиции откинулись из продаж
Это видно по плану - сгенерированный FIL_23 забродкастился в маппер фильтрации продаж :
Stage-0 Fetch Operator limit:-1 Stage-1 Reducer 2 File Output Operator [FS_12] compressed:false Statistics:Num rows: 1 Data size: 16 Group By Operator [GBY_10] | aggregations:["count(VALUE._col0)","sum(VALUE._col1)"] | outputColumnNames:["_col0","_col1"] | Statistics:Num rows: 1 Data size: 16 |<-Map 1 [SIMPLE_EDGE] vectorized Reduce Output Operator [RS_9] sort order: Statistics:Num rows: 1 Data size: 16 value expressions:_col0 (type: bigint), _col1 (type: double) Group By Operator [OP_28] aggregations:["count()","sum(_col27)"] outputColumnNames:["_col0","_col1"] Statistics:Num rows: 1 Data size: 16 Map Join Operator [MAPJOIN_27] | condition map:[{"":"Inner Join 0 to 1"}] | HybridGraceHashJoin:true | keys:{"Map 3":"calday (type: string)","Map 1":"calday (type: string)"} | outputColumnNames:["_col27"] | Statistics:Num rows: 16551844040 Data size: 40150488709349 |<-Map 3 [BROADCAST_EDGE] vectorized | Reduce Output Operator [RS_24] | key expressions:calday (type: string) | Map-reduce partition columns:calday (type: string) | sort order:+ | Statistics:Num rows: 457 Data size: 498587 | Filter Operator [FIL_23] | predicate:(calday is not null and (calmonth = '201906') and (calday1) IN ('01', '02')) (type: boolean) | Statistics:Num rows: 457 Data size: 498587 | TableScan [TS_1] | alias:c | Statistics:Num rows: 3655 Data size: 3987605 | Dynamic Partitioning Event Operator [EVENT_20] | Statistics:Num rows: 457 Data size: 498587 | Group By Operator [OP_26] | keys:_col0 (type: string) | outputColumnNames:["_col0"] | Statistics:Num rows: 457 Data size: 498587 | Select Operator [OP_25] | outputColumnNames:["_col0"] | Statistics:Num rows: 457 Data size: 498587 | Please refer to the previous Filter Operator [FIL_23] |<-TableScan [TS_0] alias:p Statistics:Num rows: 15047130620 Data size: 36500443490101 Basic stats: COMPLETE Column stats: PARTIAL
Hive: BroadCast Join
Если таблица факта соединяется с достаточно небольшим справочником, то такой join может быть оптимизирован путем исключения фазы reducer и передачи хэш массива справочника во все мапперы, читающие таблицу факта для последующей фильтрацииДопустимый размер хэш массива задается параметром: hive.auto.convert.join.noconditionaltask.size
Документация: LanguageManualJoinOptimization-OptimizeChainsofMapJoins
Запустим запрос, но отключим broadcast, ограничив размер хэш массива:
set hive.auto.convert.join.noconditionaltask.size = 1; select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus from pos_1_def p join bi0_pmaterial m on m.material = p.material where m.eanupc = '4615041324532'; VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 15 0 0 29683.00 517,290 17,651 31,096,951 31,096,951 Map 4 1 0 0 3887.00 10,730 666 510,549 1 Reducer 2 145 0 0 31143.00 1,535,900 24,726 31,096,952 145 Reducer 3 1 0 0 8275.00 10,270 177 145 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 44.604 seconds, Fetched: 1 row(s)Запрос выполнился за 44,6 секунды.
План стандартный через merge join: таблицы по отдельности читаются, сортируются, сливаются в отдельном reducer, а потом группируются.
Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) Reducer 3 <- Reducer 2 (SIMPLE_EDGE) Stage-0 Fetch Operator limit:-1 Stage-1 Reducer 3 File Output Operator [FS_13] compressed:false Statistics:Num rows: 1 Data size: 16 Group By Operator [GBY_11] | aggregations:["count(VALUE._col0)","sum(VALUE._col1)"] | outputColumnNames:["_col0","_col1"] | Statistics:Num rows: 1 Data size: 16 |<-Reducer 2 [SIMPLE_EDGE] Reduce Output Operator [RS_10] sort order: Statistics:Num rows: 1 Data size: 16 value expressions:_col0 (type: bigint), _col1 (type: double) Group By Operator [GBY_9] aggregations:["count()","sum(_col27)"] outputColumnNames:["_col0","_col1"] Statistics:Num rows: 1 Data size: 16 Merge Join Operator [MERGEJOIN_18] | condition map:[{"":"Inner Join 0 to 1"}] | keys:{"0":"material (type: string)","1":"material (type: string)"} | outputColumnNames:["_col27"] | Statistics:Num rows: 17103323 Data size: 40110666850 |<-Map 1 [SIMPLE_EDGE] vectorized | Reduce Output Operator [RS_20] | key expressions:material (type: string) | Map-reduce partition columns:material (type: string) | sort order:+ | Statistics:Num rows: 15548476 Data size: 36464241801 | value expressions:rtsaexcust (type: double) | Filter Operator [FIL_19] | predicate:material is not null (type: boolean) | Statistics:Num rows: 15548476 Data size: 36464241801 | TableScan [TS_0] | alias:p | Statistics:Num rows: 31096951 Data size: 72928481257 |<-Map 4 [SIMPLE_EDGE] vectorized Reduce Output Operator [RS_22] key expressions:material (type: string) Map-reduce partition columns:material (type: string) sort order:+ Statistics:Num rows: 127637 Data size: 684861875 Filter Operator [FIL_21] predicate:(material is not null and (eanupc = '4615041324532')) (type: boolean) Statistics:Num rows: 127637 Data size: 684861875 TableScan [TS_1] alias:m Statistics:Num rows: 510549 Data size: 2739452867Теперь выполним без ограничений:
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus from pos_1_def p join bi0_pmaterial m on m.material = p.material where m.eanupc = '4615041324532'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 15 0 0 18323.00 228,930 7,344 31,096,952 15 Map 3 1 0 0 2444.00 9,050 136 510,549 1 Reducer 2 1 0 0 10164.00 2,250 0 15 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 22.365 seconds, Fetched: 1 row(s)Время выполнения почти в 2 раза быстрей: 22,3 секунд.
По плану видно, что хэш массив создается до основного выполнения и передается во все мапперы, что позволяет выполнить запрос без передачи большой таблицы фактов в reducer.
Map 1 <- Map 3 (BROADCAST_EDGE) Reducer 2 <- Map 1 (SIMPLE_EDGE) Stage-0 Fetch Operator limit:-1 Stage-1 Reducer 2 File Output Operator [FS_13] compressed:false Statistics:Num rows: 1 Data size: 16 Group By Operator [GBY_11] | aggregations:["count(VALUE._col0)","sum(VALUE._col1)"] | outputColumnNames:["_col0","_col1"] | Statistics:Num rows: 1 Data size: 16 |<-Map 1 [SIMPLE_EDGE] vectorized Reduce Output Operator [RS_10] sort order: Statistics:Num rows: 1 Data size: 16 value expressions:_col0 (type: bigint), _col1 (type: double) Group By Operator [OP_24] aggregations:["count()","sum(_col27)"] outputColumnNames:["_col0","_col1"] Statistics:Num rows: 1 Data size: 16 Map Join Operator [MAPJOIN_23] | condition map:[{"":"Inner Join 0 to 1"}] | HybridGraceHashJoin:true | keys:{"Map 3":"material (type: string)","Map 1":"material (type: string)"} | outputColumnNames:["_col27"] | Statistics:Num rows: 17103323 Data size: 40110666850 |<-Map 3 [BROADCAST_EDGE] vectorized | Reduce Output Operator [RS_21] | key expressions:material (type: string) | Map-reduce partition columns:material (type: string) | sort order:+ | Statistics:Num rows: 127637 Data size: 684861875 | Filter Operator [FIL_20] | predicate:(material is not null and (eanupc = '4615041324532')) (type: boolean) | Statistics:Num rows: 127637 Data size: 684861875 | TableScan [TS_1] | alias:m | Statistics:Num rows: 510549 Data size: 2739452867 |<-Filter Operator [FIL_22] predicate:material is not null (type: boolean) Statistics:Num rows: 15548476 Data size: 36464241801 TableScan [TS_0] alias:p Statistics:Num rows: 31096951 Data size: 72928481257
Skew Join
Проявление
Одна из распространенных проблем при выполнении параллельного join - перекос данных к одному из значений.Перекос - это когда основной объем данных приходятся к одному уникальному значению ключа.
Тогда при выполнение hash join или merge join одно уникальное значение уходит в 1 из потоков/reducer и запрос по итогу выполняется однопоточно.
В Spark это будет выглядеть так (медианное время = 0,4с, максимальное = 2,2 минуты ):
Проблема перекоса данных в Oracle описывалась раньше
Смоделируем перекос данных пересоздав таблицу отдав 98% данных одному значению "/bic/client" = '0010000273'
truncate table pos_1_def; insert into pos_1_def select v.*, case when calday = '20190601' and material like '%9' then coalesce(`/bic/client`,'0010000273') else '0010000273' end as client_id from pos_rec_itm v where calday between '20190601' and '20190610'Проверим распределение:
select COUNT(*) as cnt, COUNT(case when client_id = '0010000273' then 1 end) as cnt_skew, COUNT(case when client_id = '0010000273' then 1 end) / COUNT(*) as prc from pos_1_def; 103507863 102337257 0.9886906562837646Выполним запрос на таблице по умолчанию:
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus from pos_1_def p join bic_pclient m on m.client_id = p.client_id ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 40 0 0 76687.00 2,120,690 165,023 103,507,863 103,507,863 Map 4 253 0 0 70310.00 3,217,510 349,357 38,007,934 38,007,934 Reducer 2 253 0 0 493257.00 3,056,730 82,336 141,515,797 253 Reducer 3 1 0 0 459041.00 11,120 112 253 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 552.744 seconds, Fetched: 1 row(s)552 секунд и последний job "reducer 2" работал дольше всего.
Исправление через Union и Broadcast
Одним из способов устранения перекоса - это выполнение Broadcast HJ.В этом случае не происходит перераспределение (reduce / shuffle) данных по ключу и данные джойнятся прямо в мапперах.
Если таблица-справочник слишком большая, чтобы сделать Broadcast, то запрос можно разбить на 2 части:
* перекошенное значение, которое точно влезет в Broadcast
* все остальные значения (т.к. они равномерные, то тип join тут не принципиален)
select /*+ MAPJOIN(m) */ COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus from pos_1_def p join (select MAX(client_id) as client_id from bic_pclient where client_id = '0010000273' group by client_id) m on m.client_id = p.client_id where p.client_id = '0010000273' UNION ALL select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus from pos_1_def p join bic_pclient m on m.client_id = p.client_id where p.client_id != '0010000273'; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 253 0 0 82988.00 1,945,220 306,966 2,530,000 1 Map 3 40 0 0 14952.00 726,940 98,558 103,507,864 40 Map 6 4 0 0 19629.00 63,500 5,461 12,363,714 1,170,606 Map 9 253 0 0 71837.00 4,136,120 438,018 38,007,934 38,007,933 Reducer 2 6 0 0 8323.00 14,120 157 1 1 Reducer 4 1 0 0 2605.00 4,680 165 40 0 Reducer 7 253 0 0 26512.00 1,745,820 40,359 39,178,539 253 Reducer 8 1 0 0 6852.00 1,730 0 253 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 108.231 seconds, Fetched: 2 row(s)Запрос выполнился за 108 секунд, т.е. в 5 раз быстрей.
К минусам можно отнести повторное чтение фактовой таблицы.
Исправление через Salting
Второй, наиболее универсальный способ, который можно выполнить всегда, даже если нет Broadcast - это создание сурогатного ключа с равномерным распределением:* в фактовой таблице создаем колонку salt, где для перкошенного значения будут случайные числа от 0 до 15. У остальных значений будет фиксировано 0
case when v.client_id != '0010000273' then 0 else FLOOR(RAND() * 16) end* перекошенное значение в справочнике размножаем на число строк колонки salt
select v.client_id, salt from bic_pclient v LATERAL VIEW explode(ARRAY(0,1,2,3,4,5,6,7 ,8,9,10,11,12,13,14,15)) adTable as salt where v.client_id = '0010000273'Остальные значения справочника добавляем без изменений с фиксированным значеним salt = 0
* После этого выполняем соединение уже по 2 ключам client_id и salt.
В этом случае перкошенное значение разобьется на 16 тасков и выполнится пропорционально быстрей:
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus from ( select v.rtsaexcust, v.client_id, case when v.client_id != '0010000273' then 0 else FLOOR(RAND() * 16) end as salt from pos_1_def v ) p join ( select v.client_id, salt from bic_pclient v LATERAL VIEW explode(ARRAY(0,1,2,3,4,5,6,7 ,8,9,10,11,12,13,14,15)) adTable as salt where v.client_id = '0010000273' UNION ALL select v.client_id, 0 as salt from bic_pclient v where v.client_id != '0010000273' ) m on m.client_id = p.client_id AND m.salt = p.salt ; ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 40 0 0 69224.00 1,349,340 178,767 103,507,863 103,507,863 Map 4 253 0 0 105130.00 2,810,580 553,729 2,530,000 32 Map 6 253 0 0 85245.00 4,386,890 433,833 38,007,934 38,007,933 Reducer 2 253 0 0 106096.00 4,952,640 169,951 141,515,828 253 Reducer 3 1 0 0 62534.00 2,960 0 253 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 194.003 seconds, Fetched: 1 row(s)194 секунды - в 3 раза быстрей изначального варианта.
Число значений в salt можно увеличивать, улучшая тем самым параллельность, но нужно помнить, что тем самым мы увеличиваем и размер таблицы-справочника.
Данный способ наиболее универсален и применим, если нет возможности сделать BHJ.
Исправление встроенными средства Hive
Документация hive-skew: LanguageManualDDL-SkewedTables* При создании таблицы нужно указать колонку и значение с перекосом
* Пересоздадим таблицу
* Hive в этом случае разбивает данные на 2 части: подкаталог с перекошенным значением "/%2Fbic%2Fclient=0010000273/" и каталог с остальным:
SKEWED BY (`/bic/client`) ON ('0010000273') STORED AS DIRECTORIES
$ hadoop fs -du -h /apps/hive/warehouse/pos_1_def 199.7 M /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000000_0 18.2 M /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000001_0 197.9 M /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000002_0 148.3 M /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000003_0 221.1 M /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000004_0 20.6 M /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000005_0 219.8 M /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000006_0 73.7 M /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000007_0 221.3 M /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000008_0 20.5 M /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000009_0 159.3 M /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000010_0 22.9 M /apps/hive/warehouse/pos_1_def/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/000000_0 2.1 M /apps/hive/warehouse/pos_1_def/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/000001_0 22.1 M /apps/hive/warehouse/pos_1_def/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/000002_0 16.5 M /apps/hive/warehouse/pos_1_def/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/000003_0И выполним запрос с включенной оптимизацией перекосов "hive.optimize.skewjoin=true":
set hive.optimize.skewjoin=true; set hive.skewjoin.key=5000; select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus from pos_1_def p join bic_pclient m on m.`/bic/client` = p.`/bic/client`; --все также последний редюсер тормозит ------------------------------------------------------------------------------------------------------------------ VERTICES TOTAL_TASKS FAILED_ATTEMPTS KILLED_TASKS DURATION(ms) CPU_TIME(ms) GC_TIME(ms) INPUT_RECORDS OUTPUT_RECORDS ------------------------------------------------------------------------------------------------------------------ Map 1 15 0 0 25306.00 434,520 15,334 31,096,951 31,096,951 Map 4 8 0 0 28176.00 427,410 12,580 26,682,353 26,682,353 Reducer 2 253 0 0 128475.00 2,726,900 45,064 57,779,304 7,454 Reducer 3 1 0 0 100911.00 8,820 86 7,454 0 ------------------------------------------------------------------------------------------------------------------ Time taken: 142.052 seconds, Fetched: 1 row(s)Ииии.... получаем тоже самое
Либо оптимизация не работает или я делаю что-то не так.
Исправление в Spark
Абсолютно аналогично (salting/broadcast) решается проблема перекоса данных в Spark.Отмечу одну особенность: замен RAND можно использовать monotonically_increasing_id:
val df = spark.read.table("pos_1_def").select($"rtsaexcust", $"client_id"). withColumn("salt", when($"client_id" === "0010000273", monotonically_increasing_id%16).otherwise(0)) df.createOrReplaceTempView("tmp_pos_1_def")
В databricks есть встроенные оптимизации
Hive 3: Materialized view
Начиная с 3 версии Hive поддерживаем материализованные представления и соответственно rewrite запросов на select из matview (hive.materializedview.rewriting).Документация: Materialized+views
К примеру:
* Maview:
CREATE MATERIALIZED VIEW mv1 AS SELECT empid, deptname, hire_date FROM emps JOIN depts ON (emps.deptno = depts.deptno) WHERE hire_date >= '2016-01-01';При выполнении запроса:
SELECT empid, deptname FROM emps JOIN depts ON (emps.deptno = depts.deptno) WHERE hire_date >= '2018-01-01' AND hire_date <= '2018-03-31';Будет подменена на запрос из представления:
SELECT empid, deptname FROM mv1 WHERE hire_date >= '2018-01-01' AND hire_date <= '2018-03-31';
Также Hive поддерживает инкрементальное обновление. Для этого требуется:
* Включенный ACID на таблицах источниках для трекинга измененных строк
* Включенный ACID на matview для выполнения merge в представление
Если matview устарела и не была обновлена, то rewrite автоматически отключается.
Но можно принудительно включить rewrite на промежуток времени до обновления целиком в системе:
SET hive.materializedview.rewriting.time.window=10min;или этот же параметр может быть установлен на уровне конкретных таблиц.
Hive LLAP
Начиная с 2 версии Hive поддерживает LLAP (Live Long And Process) сервер запросовОсновные отличия от обычного Hive:
* постоянно запущенный демон, что уменьшает время старта запроса
* обращение к данным происходит через демона LLAP, что дает возможность кэширования и prefetching данных между запросами
- Минус, что под кэш нужно много Ram, по этому llap обычно отдается сервер целиком и на нем больше ничего не ставят.
Iceberg
Iceberg - набирающий популярность формат данных поддерживающий расширенный функционал: конкурентное обновление и полная поддержка DML, эволюция схемы, скрытое партицирование и другое о чем дальше.Схема формата данных
Составлюящие:- iceberg catalog - текущий указатель на метеданные для каждой таблицы.
Нужна поддержка атомарности и реализуется 2 способами:
* file-based catalogs (HDFS, S3) - через атомарное изменение файла version-hint.text
* service-based catalogs (Hive Metastore, Nessie) - через изменение поля location в таблице tables
В пути хранится адрес файла метаданных metadata.json.
- metadata file - содержит в себе схему, партиции, сортировки, список снапшотов и какой из них активен сейчас
* снапшот (ветка) создается после каждой dml операции
из текущего снапшота берется location файла со список manifest (manifests_lists)
* в manifests_lists содержатся статистики, по которым отфильтровываваются конкретные manifests
* внутри manifests уже содержатся детальные статистики каждого отдельного дата файла, по которым можно дофильтровать файлы
под каждый dml создается свой metadata file , по этому можно менять схему таблицы или перепатицировать на лету
если перепартицируем на том же столбце, то партишен прунинг все равно будет на уровне значений в метаданных
Конкурентный доступ
- запись конкурентная и атомарная за счет оптимистичных блокировокв случая 2 одновременных изменений, последний будет рестартован
Все это обеспечивает Serializable isolation - максимальную изоляцию транзакций при записи и чтении.
- Как реализуется частичное изменение данных на примере Delete Query
2 стратегии:
* copy-on-write
заменяются файлы с данными на новые (только подходящие под фильтр удаления)
* merge-on-read
читаются файлы по фильтру удаления, записывается файл с позициями для исключения при чтении
Улучшение партицирования
- Hidden PartitioningВ описании партиции указывается колонка и функция трансформации:
"partition-specs" : [ { "spec-id" : 0, "fields" : [ { "name" : "ts_hour", "transform" : "hour", "source-id" : 2, "field-id" : 1000 } ] } ],если типы данных в фильтре select и описании партиции сравнимы, то они будут преобразованы функцией трансформации и применены над метаданными до непосредственного обращения к файлам данным.
Кроме более гибкого обращения к колонкам партицирования это улучшает скорость построения плана запроса, т.к. уже не требуется сканировать папки партиций, как это делается в классическом подходе.
- spark поддерживает storage partition join для iceberg таблиц,
join будет без стадии шафла, если 2 таблицы имеют одинаковую спецификацию partition by
Версионирование
- Обращение к предыдущим версиям данныхSELECT * FROM table1 AS OF '2021-01-28 00:00:00'
- Работа с данными как с ветками git
бранч можно создать по времени или по снапшоту
список снапшотов:
SELECT * FROM <database>.<table name>.HISTORYсоздание бранча
ALTER TABLE <table name> CREATE BRANCH <branch name> FOR SYSTEM_VERSION AS OF <SNAPSHOT_ID>или с актуальной версии
ALTER TABLE <table name> CREATE BRANCH <branch name>дальше обращаемся к брачну по имени
FROM <database name>.<table name>.branch_<branch name>когда все изменения в бранче готовы, переключаем указатель
ALTER TABLE <table name> EXECUTE FAST-FORWARD 'x' 'z';тем самым можно незаметно для пользователей делать ряд не атомарных изменений, а когда все готово публиковать готовое
Компакция и распределение
CALL catalog.system.rewrite_data_files( table => 'db.table1', strategy => 'binpack', options => map('min-input-files','2') )компкакция периода
CALL catalog_name.system.rewrite_data_files( table => 'db.sample', where => 'ingested_at between "2022-06-30 10:00:00" and "2022-06-30 11:00:00" ' )компакция с сортировкой
CALL catalog.system.rewrite_data_files( table => 'db.teams', strategy => 'sort', sort_order => 'team ASC NULLS LAST, name DESC NULLS FIRST' )В sort_order поддерживается zorder распределение по нескольким столбцам:
- как-нибудь поговорим о нем отдельно.
Более подробно о iceberg вы можете почитать тут: https://www.dremio.com/resources/guides/apache-iceberg-an-architectural-look-under-the-covers/
Крутой материал получился! Спасибо!
ОтветитьУдалитьЕсть такой вопрос: правильно ли я понимаю, что Хайв до сих пор не умеет создавать таблицу со схемой и данными из файлов орк, паркет и как импортить данные в Хайв в таких случаях?
Hive не сохраняет названия столбцов в orc файлы, а берет их из метастора
УдалитьНасчет как импортировать: описать схему в метасторе и копировать файлы или загружать sparkом, он вытаскивает схему из файлов
В спарке не силён, буду пробовать sqoop-ом) Еще, схему то не сложно выудить, только вот при создании таблицы с указанием схемы Hive валится и говорит, что не поддерживает struct, думаю вариант только разбивать массив на дополнительные колонки.
УдалитьСпасибо огромное! Очень интересная статья!
ОтветитьУдалитьКакая версия Hive использовалась?
Hive 1.2 и 2.1
Удалить