понедельник, 19 августа 2019 г.

Оптимизация хранения данных в bigdata

В этой статье хотел бы объединить знания по оптимизации хранения данных в Hadoop для Hive с упором на Orc тип файла.
Кому лень читать, могу перейти сразу к сравнению результатов

ORC: формат файла

Документация: https://orc.apache.org/specification/ORCv1/
ORC - это колоночный формат хранения данных, но колоночность в нем ограниченная, т.к. строки таблицы разбиваются на несколько частей:
Данные делятся на файлы по размеру, внутри строки разбиваются на stripe (размер stripe задаются параметром orc.stripe.size - обычно 64МБ).

Stripe состоит из частей:
* Index Data:
 ** Min/Max значение, количество строк, наличие NULL для каждой колонки внутри stripe целиком
 ** Index c Min/Max значением колонки для каждых 10 тыс. строк (hive.exec.orc.default.row.index.stride)
 ** Bloom filter для указанных колонок
 ** Смещение до данных колонки в Row Data из индекса

* Row Data - сжатые данные колонок (сжатие по умолчанию orc.compress = ZLIB)
Способы сжатия:
 ** Числовые данные хранятся с переменной длинной: <= 128 - 1 байт, <= 16383 - 2 байт и т.д.
 ** Повторяющиеся числа сохраняются как дельта (delta encoding)
 ** Возврастающие значения кодируются длиной неизменной дельты (run length encoding)
 ** Строки кодируются справочником (dictionary encoding).
Если число уникальных значений не превышает 0,8 (hive.exec.orc.dictionary.key.size.threshold) от общего числа строк, то на основании данных строковой колонки создается справочник и упорядочивается для бинарного поиска. В Row Data же вставляется id справочника.

* Footer - данные о кодировке

Такой формат хранения позволяет отсекать как строки читая индексы файла и stripe, так и колонки используя смещения в stripe.



Посмотрим влияние параметров orc на размер, время вставки:

Параметры по умолчанию

Параметры и таблица по умолчанию для сравнения с оптимизациями:
Это будет таблица с продажами за 3 дня - 31 млн строк. Состоит из 42 столбца из которых 28 это ключевые строковые поля и 14 числовых показателей.
Все запросы будут выполняться с принудительным bucketing и sorting, также с логированием performance параметров.
set hive.server2.logging.operation.level=performance;
set hive.enforce.bucketing = true; 
set hive.enforce.sorting = true;

insert into pos_1_def
select * from pos_rec_itm
where calday between '20190601' and '20190603';

------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           11                0             0      276213.00     1,601,560        8,928     31,096,951               0
------------------------------------------------------------------------------------------------------------------

Loading data to table pos_1_def
Table pos_1_def stats: [numFiles=11, numRows=31096951, totalSize=1795231673, rawDataSize=72898554240]
Time taken: 278.238 seconds
При параметрах по умолчанию время вставки составляет 278 секунд и размер файла получается 1712 МБ

Если задампить блок, то большинство строковых колонок содержат секцию со справочником:
Stream: column 9 section DATA start: 1060399 length 203242
Stream: column 9 section LENGTH start: 1263641 length 101
Stream: column 9 section DICTIONARY_DATA start: 1263742 length 64140
Количество страйпов в 1 файле = 66
hive --orcfiledump /apps/hive/warehouse/pos_1_def/000000_0 | grep 'Stripe:' | wc -l
66

Максимальная доля уникальных значений для создания справочника

Попробуем уменьшить hive.exec.orc.dictionary.key.size.threshold с 0,8 до 0,2 и посмотреть как это повлияет:
set hive.exec.orc.dictionary.key.size.threshold = 0.2;
truncate table pos_1_def;
insert into pos_1_def
select * from pos_rec_itm
where calday between '20190601' and '20190603'
;
Table pos_1_def stats: [numFiles=11, numRows=31096951, totalSize=1983777997, rawDataSize=72898554240]
Time taken: 196.922 seconds
Время вставки ускорилось с 278 секунд до 197 секунд, а рамзер файла возрос до 1892 МБ.

Из дампа у 9 строковой колонки пропал справочник:
Stream: column 9 section DATA start: 5890885 length 1096170
Stream: column 9 section LENGTH start: 6987055 length 929

Если наоборот увеличить необходимость справочника до 1:
set hive.exec.orc.dictionary.key.size.threshold = 1;
truncate table pos_1_def;
insert into pos_1_def
select * from pos_rec_itm
where calday between '20190601' and '20190603'
;
Table pos_1_def stats: [numFiles=11, numRows=31096951, totalSize=1784757162, rawDataSize=72898554240]
Time taken: 322.133 seconds
То время вставки возрастает до 322 секунд, а размер файла значительно не меняется.

Вывод: лучше оставить этот параметр по умолчанию = 0.8, как баланс между размером файла и скоростью вставки.
Отключать справочники, видимо, имеет смысл, если в таблице множество уникальных небольших строк, т.к. в этом случае замена короткой строки на длинное число из справочника не принесет должного сжатия.

Количество строк в stride

Попробуем поварировать число строк в индексе hive.exec.orc.default.row.index.stride (по умолчанию = 10000 ).
Но проводить тест будем на сортированной таблице, т.к. на стандартной таблице индексы почти не отфильтровывают данные.
Стандартная сортированная таблица:
set hive.exec.orc.default.row.index.stride=10000;
truncate table pos_3_sort;
insert into pos_3_sort
select * from pos_rec_itm
where calday between '20190601' and '20190603'
;
Table pos_3_sort stats: [numFiles=4, numRows=31096951, totalSize=1015100427, rawDataSize=72898553064]
Time taken: 433.964 seconds
Что дает 968 МБ данных и 22 Stripe:
$ hive --orcfiledump /apps/hive/warehouse/pos_3_sort/000000_0 | grep 'Stripe:' | wc -l
$ 22
При отборе данных отфильтровываются данные индексом до 4млн:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_3_sort where `/bic/client` = '0010000273';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           23                0             0       10525.00       278,840       15,736      4,216,492              23
 Reducer 2            1                0             0        2985.00         1,850           80             23               0
------------------------------------------------------------------------------------------------------------------

Time taken: 11.599 seconds, Fetched: 1 row(s)
Увеличим число строк в индексе в 10 раз:
set hive.exec.orc.default.row.index.stride=100000;
truncate table pos_1_def;
insert into pos_1_def
select * from pos_rec_itm
where calday between '20190601' and '20190603'
;
Table pos_3_sort stats: [numFiles=4, numRows=31096951, totalSize=1013477928, rawDataSize=72898553064]
Time taken: 475.205 seconds
Размер файла, как и время вставки осталось примерно одинаково.
Число Stripe уменьшились почти в 2 раза:
$ hive --orcfiledump /apps/hive/warehouse/pos_3_sort/000000_0 | grep 'Stripe:' | wc -l
$ 12
Но вот эффективность индекса при поиске по ключу значительно снижилась с 4 млн до 24 млн:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_3_sort where `/bic/client` = '0010000273';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           24                0             0       11169.00       308,520       17,330     24,316,951              24 
 Reducer 2            1                0             0        3525.00         1,190           52             24               0
------------------------------------------------------------------------------------------------------------------

Time taken: 12.852 seconds, Fetched: 1 row(s)
Вывод: изменение числа строк в индексе в моем примере существенно не влияет на время вставки и размер таблицы, но хорошо помогает при последующих select.
Если в таблице размер строки был меньше, то метаданные индекса могли создать дополнительную нагрузку на вставку и размер файла.
Также если значение колонки регулярно присутствует во всех продажах, то индекс не даст эффекта, т.к. значение попадет во все Stripe. Т.е. индекс Min/Max для диапазона эффективен для редковстречающихся значений (id клиента), и малоэффективен для повторяющихся значений (ид популярной акции или товара)

Партицирование

Не буду заострять внимание на этой опции. Партицирование есть во всех современных бд.
Основное преимущество HIVE партицирования перед Oracle:
* Неограниченная вложенность
* автоматическое создание на оснований уникальных значений в колонке
Стоит обратить внимание, что hive поддерживает отсечение партиций не только по ключу, но и при join через справочник

Кластеризация

Другой возможностью разбиения данных на части, помимо партицирования, является кластеризация. В отличии от партицирования, кластеризация разбивает данные на файлы.
Документация: LanguageManual+DDL+BucketedTables
Аналогом в Oracle является хэш партицирование.

Время вставки

Замерим показатели размера, время вставки и скорости select для кластеризованной таблицы.
Кластеризуем таблицу на 4 части по колонке магазина:
CLUSTERED BY(plant) INTO 4 BUCKETS
insert into pos_2_clust
select * from pos_rec_itm
where calday between '20190601' and '20190603';

------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           11                0             0       97875.00       850,680       17,647     31,096,951      31,096,951
 Reducer 2            4                0             0      387931.00     1,139,700        4,545     31,096,951               0
------------------------------------------------------------------------------------------------------------------

Loading data to table pos_2_clust
Table pos_2_clust stats: [numFiles=4, numRows=31096951, totalSize=1754969327, rawDataSize=72898553064]
Time taken: 408.902 seconds
Отличия в вставке от обычной таблицы:
* Появляется Reducer в который отправляются данные нужной хэш секции магазина.
* По этому время вставки возросло с 278 секунд до 408 (на 50% дольше)
* Размер данных незначительно сократился с 1712 МБ до 1673 МБ за счет локальности магазинов в одном файле

Select по ключу кластеризации

Сравним время выборки по ключу магазина для обычной таблицы и кластеризованной
Выборка по ключу из обычной таблицы считывает все 31 млн строк за 12 секунд:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_1_def where plant = '0002';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           14                0             0       10740.00       173,570        7,868     31,096,951              14
 Reducer 2            1                0             0        6033.00         1,280           67             14               0
------------------------------------------------------------------------------------------------------------------
Time taken: 12.265 seconds, Fetched: 1 row(s)
Из кластеризованной же считывается половина строк (15,5 млн) за 11 секунд:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_2_clust where plant = '0002';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1            6                0             0        9935.00        63,060        3,315     15,485,169               6
 Reducer 2            1                0             0        7096.00           350           29              6               0
------------------------------------------------------------------------------------------------------------------
Time taken: 10.889 seconds, Fetched: 1 row(s)
Замечание: странно, что считалась половина строк, тогда как 1 магазин должен был попасть в 1 кластер, что должно было сократить объем чтения в 4 раза (1 / число кластеров)

Отличие Hive кластеризации от Spark

- разные алгоритмы хэширования, как следствие полная несовместимость (Spark бакетированную таблицу нельзя прочитать Hive)
- Spark делает бакеты из мапперов (без стадии shuffle/reduce), что дает число файлов = число мапперов * число бакетов
Это ускоряет запись, но при чтении эти несколько файлов нужно слить в один, что замедляет чтение
Hive на стадии записи добавляет reducer, на котором бакеты из разным мапперов мержит в 1 файл
- Если к ключу бакетирования добавляется еще доп ключ, то все равно требуется полный shuffle
- если делается union одинаково бакетированных таблиц, то бакетирование теряется
Задача SPARK-19256 поддержки hive бакетирования еще не закрыта (фев.2021) .
В остальном Spark поддерживает динамическую перезапись партиций (spark.sql.sources.partitionOverwriteMode=DYNAMIC) и SMJ без Exchange hashpartitioning, но с сортировкой, т.к. в 1 бакете несколько файлов.

Сортировка

Еще сильней сжать данные можно, если отсортировать их. Тогда Orc сможет сжать файлы максимально эффективно.
Документация: LanguageManualDDL-BucketedSortedTables
Основной сложностью при сортировке таблицы является выборк колонки по которой это сделать.
При выборе я руководствовался 2 факторами:
* Частота использования колонки в фильтрах
* Размер данных после сортировки
Для определения популярных фильтров на колонке можно распарсить логи запросов.
Для этого был написан небольшой скрипт: hive_predicate_stat.py, по результату которого стало ясно, что самые популярные фильтр по магазину (plant)

Для проверки размера данных проведем несколько замеров на 1 дне (calday):

Размер данных

Исходный размер одного дня без сортировок 688,5МБ:
CLUSTERED BY(material)        SORTED BY(material, `/bic/client`, plant)  INTO 4 BUCKETS  -- Кластеризация по товару, сортировка по товару, клиенту и магазину   == 498 МБ
CLUSTERED BY(material)        SORTED BY(`/bic/client`, plant, material)  INTO 4 BUCKETS  -- Кластеризация по товару, сортировка по клиенту и магазину           == 454 МБ
CLUSTERED BY(`/bic/client`)   SORTED BY(`/bic/client`, plant, material)  INTO 4 BUCKETS  -- Кластеризация по клиенту, сортировка по клиенту, магазину и товару  == 388 МБ
CLUSTERED BY(plant)           SORTED BY(plant, `/bic/client`, material)  INTO 4 BUCKETS  -- Кластеризация по магазину, сортировка по магазину и клиенту         == 374.5 МБ
Исходя из этого был выбран вариант сжатия, который как ускорял выборку, так и хорошо сжимал данные: CLUSTERED BY(plant) SORTED BY(plant, `/bic/client`) INTO 4 BUCKETS

Замечу, что чем больше число бакетов, тем лучше будет отсечение лишних данных, но хуже сжатие.
Так 16 бакетов сжали данные до 554 МБ, тогда как 4 бакета дали 454 МБ
CLUSTERED BY(material)        SORTED BY(`/bic/client`, plant, material)  INTO 16 BUCKETS -- Кластеризация по товару, сортировка по товару, клиенту и магазину   == 554 МБ 
так что степень = 4, выбрана как компромисс

Время вставки

Вставка данных в кластеризованно-сортированную таблицу: CLUSTERED BY(plant) SORTED BY(plant, `/bic/client`) INTO 4 BUCKETS
insert into pos_3_sort
select * from pos_rec_itm
where calday between '20190601' and '20190603'
;

Task Execution Summary
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           11                0             0       95966.00       900,790       18,164     31,096,951      31,096,951
 Reducer 2            4                0             0      412328.00     1,228,640        5,323     31,096,951               0
------------------------------------------------------------------------------------------------------------------

Loading data to table pos_3_sort
Table pos_3_sort stats: [numFiles=4, numRows=31096951, totalSize=1014407061, rawDataSize=72898553064]
Time taken: 433.964 seconds
Отличия от обычной таблицы:
* Появляется Reducer в который отправляются данные нужной хэш секции магазина и сортирует.
* По этому время вставки возросло с 278 секунд до 433 секунд, что незначительно больше 408с при просто кластеризованной таблице
* Размер данных сократился с 1712 МБ до 967 МБ, т.е. почти в 2 раза!

Select по первому ключу сортировки

Если вернуться к описанию хранения индексов, видно, что данные по ключу могут попасть в любой stripe случайным образом, что значительно ухудшает эффективность встроенных индексов Orc
Если же данные отсортировать по ключу поиска, то уникальные значения лягут в один Stripe и будут максимально эффективно отфильтровываться

<< Слева показан пример с выборкой уникального значения по обычной таблице и сортированной (красным - отсеченные stripe, зеленым - считываемые)
Проверим на реальных данных.
Выборка по ключу на обычной таблице считывает все 31 млн строк за 12 секунд:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from tmp.pos_1_def where plant = '0002';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           14                0             0       10740.00       173,570        7,868     31,096,951              14
 Reducer 2            1                0             0        6033.00         1,280           67             14               0
------------------------------------------------------------------------------------------------------------------
Time taken: 12.265 seconds, Fetched: 1 row(s)
По отсортированной таблице считывается в разы меньший объем строк за 4 секунды (в 3 раза быстрей):
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_3_sort where plant = '0002';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1            2                0             0        3039.00         8,760          872        180,000               2
 Reducer 2            1                0             0        1594.00         1,700           42              2               0
------------------------------------------------------------------------------------------------------------------
Time taken: 4.342 seconds, Fetched: 1 row(s)

Select по второму ключу сортировки

Выборка по второму ключу поиска считывает все бакеты, но все равно отбирает значительно меньше строк из Stripe:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_3_sort where `/bic/client` = '0010000273';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           23                0             0       10525.00       278,840       15,736      4,216,492              23
 Reducer 2            1                0             0        2985.00         1,850           80             23               0
------------------------------------------------------------------------------------------------------------------
Time taken: 11.599 seconds, Fetched: 1 row(s)

Select по ключу без сортировки

Select по ключу без сортировки получается одинаковый по обоим таблицам:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_1_def where rt_promo = 'A122RP94';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           14                0             0       10926.00       173,190        8,356     31,096,951              14 
 Reducer 2            1                0             0        6504.00         1,190           46             14               0
------------------------------------------------------------------------------------------------------------------
Time taken: 11.675 seconds, Fetched: 1 row(s)
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_3_sort where rt_promo = 'A122RP94';  
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1            6                0             0       10300.00        71,340        3,946     31,096,951               6
 Reducer 2            1                0             0        7234.00         1,950          111              6               0
------------------------------------------------------------------------------------------------------------------
Time taken: 11.384 seconds, Fetched: 1 row(s)

Sort Merge Bucket Join

Дополнительное преимущество которое дает сортированность таблиц - это возможность выполнения sort merge join без собственно сортировки и пересылки данных в reducer.

Отсортируем и кластеризуем все таблицы в join одиково по магазину.
Также нужно включить sortmerge и bucketmapjoin, которые выключены по умолчанию в Hive.
Документация: LanguageManualJoinOptimization-AutoConversiontoSMBMapJoin

План по умолчанию на обычных таблицах:
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.convert.join.bucket.mapjoin.tez=true; 

select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus
from pos_1_def p
join bi0_pplant m on m.plant = p.plant;
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           14                2             0       25340.00       498,640       16,367     31,096,951      31,096,951
     Map 4            1                0             0        7087.00        21,530          851          5,000           5,000
 Reducer 2          145                0             0       18011.00       988,800       15,270     31,101,951             145
 Reducer 3            1                0             0        6623.00         4,490           59            145               0
------------------------------------------------------------------------------------------------------------------
Time taken: 38.847 seconds, Fetched: 1 row(s)
Join обычных таблиц генерирует 2 мапера, в которых считываются и сортируются данные.
После чего порции данных с одним ключем отправляются в Reducer и в нем происходит слияние 2 таблиц и последующий подсчет count и sum
В 3ий единственный reducer отправляется промежуточный результат count и sum , где досчитывается итоговая сумма и кол-во.
Самой дорогой операцией тут является Map1 - сортировка данных и отправка промежуточных результатов в Reducer2
Stage-0
   Fetch Operator
      limit:-1
      Stage-1
         Reducer 3
         File Output Operator [FS_12]
            compressed:false
            Statistics:Num rows: 1 Data size: 16
            Group By Operator [GBY_10]
            |  aggregations:["count(VALUE._col0)","sum(VALUE._col1)"]
            |  outputColumnNames:["_col0","_col1"]
            |  Statistics:Num rows: 1 Data size: 16
            |<-Reducer 2 [SIMPLE_EDGE]
               Reduce Output Operator [RS_9]
                  sort order:
                  Statistics:Num rows: 1 Data size: 16
                  value expressions:_col0 (type: bigint), _col1 (type: double)
                  Group By Operator [GBY_8]
                     aggregations:["count()","sum(_col27)"]
                     outputColumnNames:["_col0","_col1"]
                     Statistics:Num rows: 1 Data size: 16
                     Merge Join Operator [MERGEJOIN_17]
                     |  condition map:[{"":"Inner Join 0 to 1"}]
                     |  keys:{"0":"plant (type: string)","1":"plant (type: string)"}
                     |  outputColumnNames:["_col27"]
                     |  Statistics:Num rows: 17103323 Data size: 40094206990
                     |<-Map 1 [SIMPLE_EDGE] vectorized
                     |  Reduce Output Operator [RS_19]
                     |     key expressions:plant (type: string)
                     |     Map-reduce partition columns:plant (type: string)
                     |     sort order:+
                     |     Statistics:Num rows: 15548476 Data size: 36449278292
                     |     value expressions:rtsaexcust (type: double)
                     |     Filter Operator [FIL_18]
                     |        predicate:plant is not null (type: boolean)
                     |        Statistics:Num rows: 15548476 Data size: 36449278292
                     |        TableScan [TS_0]
                     |           alias:p
                     |           Statistics:Num rows: 31096951 Data size: 72898554240
                     |<-Map 4 [SIMPLE_EDGE] vectorized
                        Reduce Output Operator [RS_21]
                           key expressions:plant (type: string)
                           Map-reduce partition columns:plant (type: string)
                           sort order:+
                           Statistics:Num rows: 2500 Data size: 4681364
                           Filter Operator [FIL_20]
                              predicate:plant is not null (type: boolean)
                              Statistics:Num rows: 2500 Data size: 4681364
                              TableScan [TS_1]
                                 alias:m
                                 Statistics:Num rows: 5000 Data size: 9362729
Повторим тест, но на одинаково кластеризованно-сортированных таблицах:
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus
from pos_3_sort p
join bi0_pplant m on m.plant = p.plant;
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1            6                0             0       23180.00       201,040        8,189     31,096,951               6
 Reducer 2            1                0             0       17839.00         1,200           46              6               0
------------------------------------------------------------------------------------------------------------------
Time taken: 24.346 seconds, Fetched: 1 row(s)
Теперь обе таблицы читаются и соединяются в одном маппере без сортировки и пересылки по сети и запрос отработал почти в 2 раза быстрей:
Stage-0
   Fetch Operator
      limit:-1
      Stage-1
         Reducer 2
         File Output Operator [FS_12]
            compressed:false
            Statistics:Num rows: 1 Data size: 16
            Group By Operator [GBY_10]
            |  aggregations:["count(VALUE._col0)","sum(VALUE._col1)"]
            |  outputColumnNames:["_col0","_col1"]
            |  Statistics:Num rows: 1 Data size: 16
            |<-Map 1 [SIMPLE_EDGE]
               Reduce Output Operator [RS_9]
                  sort order:
                  Statistics:Num rows: 1 Data size: 16
                  value expressions:_col0 (type: bigint), _col1 (type: double)
                  Group By Operator [GBY_8]
                     aggregations:["count()","sum(_col27)"]
                     outputColumnNames:["_col0","_col1"]
                     Statistics:Num rows: 1 Data size: 16
                     Merge Join Operator [MERGEJOIN_17]
                     |  condition map:[{"":"Inner Join 0 to 1"}]
                     |  keys:{"0":"plant (type: string)","1":"plant (type: string)"}
                     |  outputColumnNames:["_col27"]
                     |  Statistics:Num rows: 17103323 Data size: 40094206343
                     |
                     |<-Filter Operator [FIL_16]
                     |     predicate:plant is not null (type: boolean)
                     |     Statistics:Num rows: 2500 Data size: 4681364
                     |     TableScan [TS_1]
                     |        alias:m
                     |        Statistics:Num rows: 5000 Data size: 9362729
                     |<-Filter Operator [FIL_15]
                           predicate:plant is not null (type: boolean)
                           Statistics:Num rows: 15548476 Data size: 36449277704
                           TableScan [TS_0]
                              alias:p
                              Statistics:Num rows: 31096951 Data size: 72898553064
Причем замечу, что этот подход работает и для просто одинаково кластеризованных таблиц без сортировки.
Сортировка добавляется внутрь map1, что немного медленней, но все равно экономит кучу времени на shuffling - пересылку данных по сети в reducer.

ORC: Bloom filter

Что такое Bloom фильтр описывалось уже несколько раз в этом блоге: oracle-bloom-filter
Bloom фильтр также может применяться в orc файлах и очень удобно подходит для фильтрации на не отсортированных колонках, т.к. индексы на них неэффективны.
Bloom filter же дает дополнительную информацию о содержании колонки в stripe

Время вставки

Протестируем работу на кластеризованно-сортированной таблице с bloom filter на 4 колонках.
Замечу, что названия колонок нужно задавать без кавычек
CLUSTERED BY(plant) SORTED BY(plant, `/bic/client`) INTO 4 BUCKETS stored as orc
tblproperties ("orc.bloom.filter.columns"="material,rt_promo,/bic/client,/bic/card12,", "orc.bloom.filter.fpp"="0.05")


insert into pos_4_bloom
select * from pos_rec_itm
where calday between '20190601' and '20190603'
;

------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           11                0             0      105566.00       957,410       19,722     31,096,951      31,096,951
 Reducer 2            4                0             0      428483.00     1,247,480        5,818     31,096,951               0
------------------------------------------------------------------------------------------------------------------

Loading data to table pos_4_bloom
Table pos_4_bloom stats: [numFiles=4, numRows=31096951, totalSize=1066863577, rawDataSize=72898553064]
Time taken: 450.133 seconds
Отличия от кластеризованно-сортированной таблицы:
* Время вставки возросло с 433 секунд до 450 секунд, что незначительно больше
* Размер данных возрос с 967 МБ до 1 017 МБ, что на 5% больше

В индексных данных колонки появляется дополнительный раздел "BLOOM_FILTER":
Stripe: offset: 805493730 data: 13149108 rows: 410000 tail: 799 index: 285445
Stream: column 0 section ROW_INDEX start: 805493730 length 20
...
Stream: column 18 section ROW_INDEX start: 805502343 length 570
Stream: column 18 section BLOOM_FILTER start: 805502913 length 260902

Select по ключу bloom filter

Возьмем из пункта про сортировку запрос и сравним с ним.

Запрос к таблице с блум фильтром:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_4_bloom where rt_promo = 'A122RP94'; 
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1            8                1             0       10105.00        83,770        4,074        330,000               8
 Reducer 2            1                0             0        6507.00           370            0              8               0
------------------------------------------------------------------------------------------------------------------
Time taken: 10.839 seconds, Fetched: 1 row(s)
Отбирается 330т строк, вместо 31 млн по умолчанию

Если взять колонку на которой есть сортировка и блум фильтр, то результат еще лучше:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_4_bloom where `/bic/client` = '0010000273';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           24                0             0       10325.00       286,540       17,450         10,000              24
 Reducer 2            1                0             0        1953.00         6,490           86             24               0
------------------------------------------------------------------------------------------------------------------
Time taken: 11.958 seconds, Fetched: 1 row(s)
В просто сортированной таблице отбиралось 24 млн строк, а с bloom фильтром всего 10 т.

Увеличение False positive Bloom filter

Попробуем увеличить вероятность ложных срабатываний bloom фильтра (False positive с 0,05 до 0,2), чтобы оценить влияние на вставку и выборку:
CLUSTERED BY(plant) SORTED BY(plant, `/bic/client`) INTO 4 BUCKETS stored as orc
tblproperties ("orc.bloom.filter.columns"="material,rt_promo,/bic/client,/bic/card12,", "orc.bloom.filter.fpp"="0.2")

insert into pos_4_bloom2
select * from pos_rec_itm
where calday between '20190601' and '20190603';

------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           11                0             0       98033.00       976,480       19,705     31,096,951      31,096,951
 Reducer 2            4                0             0      425676.00     1,251,980        4,976     31,096,951               0
------------------------------------------------------------------------------------------------------------------

Loading data to table pos_4_bloom2
Table pos_4_bloom2 stats: [numFiles=4, numRows=31096951, totalSize=1042572361, rawDataSize=72898553064]
Time taken: 446.657 seconds
Отличия от bloom filter по умолчанию:
* Время примерно одно
* Размер данных стал меньше с 1 017 МБ до 994 МБ (из-за уменьшения битовой карты)

Но увеличение False positive не изменило размер фильтрации - те же 330т строк:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_4_bloom2 where rt_promo = 'A122RP94'; 
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1            6                0             0        9101.00        57,690        3,591        330,000               6
 Reducer 2            1                0             0        5991.00         1,540           46              6               0
------------------------------------------------------------------------------------------------------------------
Time taken: 10.165 seconds, Fetched: 1 row(s)
И по клиенту - те же 10т строк:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_4_bloom2 where `/bic/client` = '0010000273';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           21                0             0        9935.00       249,380       13,191         10,000              21
 Reducer 2            1                0             0        2590.00         2,370           79             21               0
------------------------------------------------------------------------------------------------------------------
Time taken: 11.007 seconds, Fetched: 1 row(s)

Ускорение вставки в кластеризованную таблицу с динамическим партицированием

Существенный минус при кластеризации партицированной таблицы, что вместо большого количества reducer мы получаем число писателей = числу бакетов.
И если происходит вставка большого объема данных это существенно замедляет процесс.

Запрос по умолчанию имеет 36 начальных мапперов, но всего 4 пишущих reducer.
В итоге время вставки = 1497 секунд.

insert OVERWRITE table DWH.TBL PARTITION (part_col)
select * from STG.TBL;

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     36         36        0        0       0       0
Reducer 2 ......   SUCCEEDED      4          4        0        0       0       0
--------------------------------------------------------------------------------

Проблему можно частично решить, если установить параметр
SET hive.optimize.sort.dynamic.partition=true; 
- данные будут отсортированы по колонке партицирования и создано большее число reducer, что ускорит вставку
Этот прием дает незначительное ускорение. Общее время = 1054 секунд.
SET hive.optimize.sort.dynamic.partition=true; 
insert OVERWRITE table DWH.TBL PARTITION (part_col)
select * from STG.TBL;

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     36         36        0        0       0       0
Reducer 2 ......   SUCCEEDED    253        253        0        0       0       0
--------------------------------------------------------------------------------
Но надо помнить, что этот параметр нестабилен и может применяться только для простых вставках без группировок.

Лучший вариант, это переписать вставку таким образом, чтобы чтение исходной таблицы было 1, а вставок несколько.
По итогу исходная таблица читается 1 раз (MAP), а пишется сразу несколькими reducer в нужные партиции.
Тут получается максимальное ускорение. Общее время = 461 секунд.
from STG.TBL s
insert OVERWRITE table DWH.TBL PARTITION (part_col)
select s.* where s.part_col = 1
insert OVERWRITE table DWH.TBL PARTITION (part_col)
select s.* where s.part_col = 2
insert OVERWRITE table DWH.TBL PARTITION (part_col)
select s.* where s.part_col = 3;

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     36         36        0        0       0       0
Reducer 1 .....   SUCCEEDED      4          4        0        0       0       0
Reducer 2            RUNNING      4          0        4        0       0       0
Reducer 3            RUNNING      4          0        4        0       0       0
--------------------------------------------------------------------------------


Кроме собственно вставки, таким образом можно ускорять повторяющиеся запросы, у которых одно основание, но разные уровни агрегации.
К примеру, нам надо подсчитать число уникальны чеков в разрезе магазина + группы товара и магазина + товара.
Это нужно делать 2 запросами к одним и теми же данным, т.к. в случае count distinct нельзя переиспользовать промежуточные результаты.
1 запрос с группировкой до магазина + группы:
insert OVERWRITE table tmp.checks_wgh4
select 
i.plant, m.rpa_wgh4,
COUNT(DISTINCT i.check_id) as pr_check
from rdw.pos_rec_itm i
join rdw.BI0_PMATERIAL m on m.material = i.material
join rdw.bi0_pplant p on i.plant = p.plant
where i.calday between '20201005' and '20201010'
AND m.rpa_wgh2 between '11' and '20'
AND p.`/bic/zfrmttyp` = 'H'
group by 
i.plant, m.rpa_wgh4;

------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           24                0             0       30604.00       756,840       41,165     60,417,666      11,928,929
     Map 3            1                0             0        2850.00        10,280          259        558,756          41,656
     Map 4            2                0             0       12414.00        13,210          301          5,033             305
 Reducer 2          253                0             0       31972.00     2,103,120       31,721     11,928,929               0
------------------------------------------------------------------------------------------------------------------

Loading data to table tmp.checks_wgh4
Table tmp.checks_wgh4 stats: [numFiles=253, numRows=58079, totalSize=421037, rawDataSize=10802694]
OK
Time taken: 58.198 seconds
2 запрос до магазина+товара, использует те же map для считывания данных и отличаются только стадией reduce
insert OVERWRITE table tmp.checks_material
select 
i.plant, i.material,
COUNT(DISTINCT i.check_id) as pr_check
from rdw.pos_rec_itm i
join rdw.BI0_PMATERIAL m on m.material = i.material
join rdw.bi0_pplant p on i.plant = p.plant
where i.calday between '20201005' and '20201010'
AND m.rpa_wgh2 between '11' and '20'
AND p.`/bic/zfrmttyp` = 'H'
group by 
i.plant, i.material;

------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           24                0             0       20754.00       930,540       31,129     60,417,666      12,219,596
     Map 3            1                0             0        1666.00         5,880           64        558,756          41,656
     Map 4            2                0             0        6734.00        15,940          387          5,033             305
 Reducer 2          253                0             0       23094.00     1,534,680       22,291     12,219,596               0
------------------------------------------------------------------------------------------------------------------

Loading data to table tmp.checks_material
Table tmp.checks_material stats: [numFiles=253, numRows=314669, totalSize=1919892, rawDataSize=62304462]
OK
Time taken: 44.262 seconds
Что дает 58+44 = 102с работы запросов.

Для оптимизации можно сделать join и сохранить полный набор данных в физическую таблицу, либо использовать функционал insert all.
insert all - предпочтительней, потому что в этом случае данные остаются в памяти и не тратится время на повторное сохранение и считывание данных с диска.
FROM (
    select 
    i.plant, i.material, m.rpa_wgh4, i.check_id
    from rdw.pos_rec_itm i
    join rdw.BI0_PMATERIAL m on m.material = i.material
    join rdw.bi0_pplant p on i.plant = p.plant
    where i.calday between '20201005' and '20201010'
    AND m.rpa_wgh2 between '11' and '20'
    AND p.`/bic/zfrmttyp` = 'H'
) i
insert OVERWRITE table tmp.checks_wgh4
select 
i.plant, i.rpa_wgh4,
COUNT(DISTINCT i.check_id) as pr_check
group by 
i.plant, i.rpa_wgh4
insert OVERWRITE table tmp.checks_material
select 
i.plant, i.material,
COUNT(DISTINCT check_id) as pr_check
group by 
i.plant, i.material
;

------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           24                0             0       28257.00       952,580       17,621     60,417,666      24,148,525 -- мапперы выполняются 1 раз
     Map 4            1                0             0         619.00         2,210            0        558,756          41,656
     Map 5            2                0             0        5967.00        14,330          291          5,033             305
 Reducer 2          253                0             0       20250.00       797,850       16,972     24,148,525               0 -- reducer от обоих запросов группировки
 Reducer 3          253                0             0       28019.00     1,326,100       18,884     24,148,525               0
------------------------------------------------------------------------------------------------------------------

Loading data to table tmp.checks_wgh4
Loading data to table tmp.checks_material
Table tmp.checks_wgh4 stats: [numFiles=253, numRows=58079, totalSize=421037, rawDataSize=10802694]
Table tmp.checks_material stats: [numFiles=253, numRows=314669, totalSize=1919892, rawDataSize=62304462]
OK
Time taken: 64.394 seconds
Переиспользование промежуточных данных в памяти позволяет ускорить запрос почти в 2 раза до 64 секунд.

Стоит заметить, что insert all есть почти во всех популярных бд.
Но в случае Oracle максимум, что можно сделать в insert all - это фильтрация перед вставкой в кокретную таблицу:
INSERT ALL
  WHEN id <= 3 THEN
    INTO dest_tab1 (id, description) VALUES (id, description)
  WHEN id BETWEEN 4 AND 7 THEN
    INTO dest_tab2 (id, description) VALUES (id, description)
  WHEN 1=1 THEN
    INTO dest_tab3 (id, description) VALUES (id, description)
SELECT id, description
FROM   source_tab
Hive же дает полный набор синтаксиса, включая группировки, агрегации и т.д.

Сравнение результатов

Сравнивать будем время вставки, объем данных и число выбранных данных.
В select сравниваем не время, т.к. слишком большая погрешность и многое зависит от числа читающих мапперов.
Лучше - меньше (выделено цветом):
Показатель По умолчанию Кластеризованная Кластеризованная и сортированная Кластеризованная, сортированная и с bloom фильтром
Время вставки (сек) 278 408 434 450
Размер (МБ) 1 712 1 673 967 1 017
Выборка по ключу (Магазин) строк 31 096 951 15 485 169 180 000 180 000
Выборка по вторичному ключу (Клиент) строк 31 096 951 31 096 951 4 216 492 10 000
Выборка по не ключевому полю (Акция) строк 31 096 951 31 096 951 31 096 951 330 000
Join по ключу (Магазин) сек 38.8 25 24.3 24.3

Сравнение ORC с Parquet

Создание таблицы

Создадим 2 одинаковые партицированно-кластеризованных таблицы
ORC
PARTITIONED BY (   `calday` string)
CLUSTERED BY(plant) SORTED BY(plant, `/bic/client`) INTO 4 BUCKETS 
stored as orc
tblproperties ("orc.bloom.filter.columns"="rt_promo", "orc.bloom.filter.fpp"="0.05")
Parquet (не поддерживает bloom filter)
PARTITIONED BY (   `calday` string)
CLUSTERED BY(plant) SORTED BY(plant, `/bic/client`) INTO 4 BUCKETS 
stored as parquet

Вставка и размер файлов

Вставим в таблицу 1 день продаж, всего 12 363 714 строк.

Размер получившихся таблиц:
ORC с ZLIB сжатием = 308.6 M
Parquet с GZIP чуть больше = 324.3 M
Orc на таблице продаж обеспечивает лучшее сжатие

Метаданные parquet файлов

Данные о данных можно посмотреть утилитой "parquet-tools", которую нужно установить отдельно.
Файл также бьется на области, текстовые поля кодируются справочниками. Но по метаданным не видно минимального/максимального значения.
$ hadoop jar ./parquet-tools-1.8.2.jar meta /apps/hive/warehouse/tmp/parquet/calday=20190601/000000_0

file schema:    hive_schema
--------------------------------------------------------------------------------
...
sales_unit:     OPTIONAL BINARY O:UTF8 R:0 D:1
cpsaexcubu:     OPTIONAL FIXED_LEN_BYTE_ARRAY O:DECIMAL R:0 D:1
...

row group 1:    RC:3303124 TS:682794260 OFFSET:4
--------------------------------------------------------------------------------
...
sales_unit:      BINARY GZIP DO:0 FPO:34513471 SZ:467378/1316597/2.82 VC:3303124 ENC:PLAIN_DICTIONARY,BIT_PACKED,RLE
cpsaexcubu:      FIXED_LEN_BYTE_ARRAY GZIP DO:0 FPO:34980849 SZ:3360465/26426447/7.86 VC:3303124 ENC:BIT_PACKED,RLE,PLAIN
...
Hive для parquet сохраняет названия полей в файле, а для orc хранит отдельно.
Но spark сохраняет названия полей и в ORC

Скорость полного чтения

ORC - 7 секунд , 4 маппера = 4 файлам
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from tmp.orc where calday = '20190601';

------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1            4                0             0        6910.00        56,290        4,443     12,363,714               4
 Reducer 2            1                0             0        3201.00         2,270           56              4               0
------------------------------------------------------------------------------------------------------------------
Parquet - время чтения в 4 раза больше и почему то 4 файла читаются 22 мапперами:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from tmp.parquet where calday = '20190601';

------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           22                0             0       17097.00       253,100       16,257     12,363,714              22
 Reducer 2            1                0             0       11810.00         6,910          103             22               0
------------------------------------------------------------------------------------------------------------------

Скорость чтения с фильтром по полю CLUSTERED

ORC - считалось 2 из 4 бакета, но из файлов взялись только нужные stride, что дало 80т строк на входе мапперов:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from tmp.orc where calday = '20190601' and plant = '0002';

------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1            2                0             0        2240.00         5,090          552         80,000               2
 Reducer 2            1                0             0         824.00         1,110           90              2               0
------------------------------------------------------------------------------------------------------------------
Parquet - отобралась почему то ровно половина строк, хотя есть точный фильтр по plant, который участвует в CLUSTERED и Sort
что по итогу дало время больше в 5 раз:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from tmp.parquet where calday = '20190601' and plant = '0002';

------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           22                0             0       10328.00       212,600       14,263      6,083,421              22
 Reducer 2            1                0             0        7686.00         2,060           66             22               0
------------------------------------------------------------------------------------------------------------------

Скорость чтения с фильтром по полю с Bloom filter

Parquet не поддерживаются настраиваемые bloom filter

ORC - читались все 4 файла, но благодаря bloom фильтру из 12 млн строк в мапперы попало только 130т
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from rdw.orc where calday = '20190601' and rt_promo = 'A122RP94'; 

------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1            4                0             0        4062.00        22,240        2,926        130,000               4
 Reducer 2            1                0             0        1632.00           730            0              4               0
------------------------------------------------------------------------------------------------------------------
Parquet - отбираются все 12М строки, как следствие запрос работает в 4 раза дольше
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from stg.parquet where calday = '20190601' and rt_promo = 'A122RP94'; 

------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           22                0             0       16239.00       243,110       12,791     12,363,714              22
 Reducer 2            1                0             0       10475.00         9,760           94             22               0
------------------------------------------------------------------------------------------------------------------

Распространенность форматов

ORC на моих замерах выигрывает выигрывает во всех тестах. В чем же тогда такая популярность parquet?
Дата релиза Parquet - 13 March 2013, Orc - 20 February 2013. Практически одновременно.

Вероятная причина в том, что Parquet сразу делал клиентов под разные языки и нативные клиенты сразу могли использовать его, т.е. складывалась экосистема.
Orc долго использовался только в хайве и jvm языках. Только начиная с 16-17 годов был создан нативный c++ клиент, который могли использовать не jvm приложения.

Итоговое сравнение

Показатель Orc Parquet
Размер (МБ) 308.6 324.3
Полное сканирование (сек) 6.9 17.1
Селективность фильтра по кластеризованному столбцу (строк) 80 000 6 083 421
Селективность фильтра по Bloom фильтру (строк) 130 000 -
Распростаненность + +++
Дополнительно: benchmark ORC vs Parquet vs Avro vs Json

Hive: Predicate pushdown

Кроме фильтрации в where проверим передачу фильтра через join:

Join predicate pushdown

1 тест на обычных таблицах, чтобы проверить передачу фильтра в индексы Stripe.
bi0_pmaterial - справочник товаров отфильтрован по штрих коду и отбирает 1 товар.
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus
from pos_1_def p
join bi0_pmaterial m on m.material = p.material
where m.eanupc = '4615041324532';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           14                0             0       12214.00       217,980        8,513     31,096,952              14
     Map 3            4                0             0        6315.00        17,350           90        480,000               1
 Reducer 2            1                0             0        5590.00         1,200            0             14               0
------------------------------------------------------------------------------------------------------------------
Time taken: 14.558 seconds, Fetched: 1 row(s)
Фильтр по товару не передался в маппер, из продаж выбрались все данные.

Попробуем повторить тест на сортированной таблице с блум фильтрами на колонках товара (material)
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus
from pos_4_bloom p
join bi0_pmaterial2 m on m.material = p.material
where m.eanupc = '4615041324532';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1            8                0             0       38212.00       110,160        3,471     31,096,952               8
     Map 3            4                0             0         413.00         2,630           98        480,000               1
 Reducer 2            1                0             0       34809.00           600            0              8               0
------------------------------------------------------------------------------------------------------------------
Time taken: 39.68 seconds, Fetched: 1 row(s)
К сожалению фильтр по товару также не передался в маппер, из продаж выбрались все данные.
Похоже на текущий момент Hive не поддерживает pushdown предикатов из справочника на таблицу с фактами

Partition pruning

Проверим отсечение партиций при соединений через справочник.
Очень частый кейс для BI инструментов, где фильтрация происходит через календарь:
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcust
from pos_rec_itm p
join calendar c on c.calday = p.calday
where c.calmonth = '201906' and c.calday1 IN( '01', '02')
;
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1            8                0             0       17620.00       133,190        4,249              2               8
     Map 3            1                0             0        1847.00         5,030           97          3,655               2
 Reducer 2            1                0             0       11037.00         3,230           77              8               0
------------------------------------------------------------------------------------------------------------------
Time taken: 31.376 seconds, Fetched: 1 row(s)
Фильтр в продажи удачно передался и ненужные партиции откинулись из продаж
Это видно по плану - сгенерированный FIL_23 забродкастился в маппер фильтрации продаж :
Stage-0
   Fetch Operator
      limit:-1
      Stage-1
         Reducer 2
         File Output Operator [FS_12]
            compressed:false
            Statistics:Num rows: 1 Data size: 16
            Group By Operator [GBY_10]
            |  aggregations:["count(VALUE._col0)","sum(VALUE._col1)"]
            |  outputColumnNames:["_col0","_col1"]
            |  Statistics:Num rows: 1 Data size: 16
            |<-Map 1 [SIMPLE_EDGE] vectorized
               Reduce Output Operator [RS_9]
                  sort order:
                  Statistics:Num rows: 1 Data size: 16
                  value expressions:_col0 (type: bigint), _col1 (type: double)
                  Group By Operator [OP_28]
                     aggregations:["count()","sum(_col27)"]
                     outputColumnNames:["_col0","_col1"]
                     Statistics:Num rows: 1 Data size: 16
                     Map Join Operator [MAPJOIN_27]
                     |  condition map:[{"":"Inner Join 0 to 1"}]
                     |  HybridGraceHashJoin:true
                     |  keys:{"Map 3":"calday (type: string)","Map 1":"calday (type: string)"}
                     |  outputColumnNames:["_col27"]
                     |  Statistics:Num rows: 16551844040 Data size: 40150488709349
                     |<-Map 3 [BROADCAST_EDGE] vectorized
                     |  Reduce Output Operator [RS_24]
                     |     key expressions:calday (type: string)
                     |     Map-reduce partition columns:calday (type: string)
                     |     sort order:+
                     |     Statistics:Num rows: 457 Data size: 498587
                     |     Filter Operator [FIL_23]
                     |        predicate:(calday is not null and (calmonth = '201906') and (calday1) IN ('01', '02')) (type: boolean)
                     |        Statistics:Num rows: 457 Data size: 498587
                     |        TableScan [TS_1]
                     |           alias:c
                     |           Statistics:Num rows: 3655 Data size: 3987605
                     |  Dynamic Partitioning Event Operator [EVENT_20]
                     |     Statistics:Num rows: 457 Data size: 498587
                     |     Group By Operator [OP_26]
                     |        keys:_col0 (type: string)
                     |        outputColumnNames:["_col0"]
                     |        Statistics:Num rows: 457 Data size: 498587
                     |        Select Operator [OP_25]
                     |           outputColumnNames:["_col0"]
                     |           Statistics:Num rows: 457 Data size: 498587
                     |            Please refer to the previous Filter Operator [FIL_23]
                     |<-TableScan [TS_0]
                           alias:p
                           Statistics:Num rows: 15047130620 Data size: 36500443490101 Basic stats: COMPLETE Column stats: PARTIAL

Hive: BroadCast Join

Если таблица факта соединяется с достаточно небольшим справочником, то такой join может быть оптимизирован путем исключения фазы reducer и передачи хэш массива справочника во все мапперы, читающие таблицу факта для последующей фильтрации
Допустимый размер хэш массива задается параметром: hive.auto.convert.join.noconditionaltask.size
Документация: LanguageManualJoinOptimization-OptimizeChainsofMapJoins

Запустим запрос, но отключим broadcast, ограничив размер хэш массива:
set hive.auto.convert.join.noconditionaltask.size = 1;
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus
from pos_1_def p
join bi0_pmaterial m on m.material = p.material
where m.eanupc = '4615041324532';

  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           15                0             0       29683.00       517,290       17,651     31,096,951      31,096,951
     Map 4            1                0             0        3887.00        10,730          666        510,549               1
 Reducer 2          145                0             0       31143.00     1,535,900       24,726     31,096,952             145
 Reducer 3            1                0             0        8275.00        10,270          177            145               0
------------------------------------------------------------------------------------------------------------------
Time taken: 44.604 seconds, Fetched: 1 row(s)
Запрос выполнился за 44,6 секунды.
План стандартный через merge join: таблицы по отдельности читаются, сортируются, сливаются в отдельном reducer, а потом группируются.
Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
Reducer 3 <- Reducer 2 (SIMPLE_EDGE)

Stage-0
   Fetch Operator
      limit:-1
      Stage-1
         Reducer 3
         File Output Operator [FS_13]
            compressed:false
            Statistics:Num rows: 1 Data size: 16
            Group By Operator [GBY_11]
            |  aggregations:["count(VALUE._col0)","sum(VALUE._col1)"]
            |  outputColumnNames:["_col0","_col1"]
            |  Statistics:Num rows: 1 Data size: 16
            |<-Reducer 2 [SIMPLE_EDGE]
               Reduce Output Operator [RS_10]
                  sort order:
                  Statistics:Num rows: 1 Data size: 16
                  value expressions:_col0 (type: bigint), _col1 (type: double)
                  Group By Operator [GBY_9]
                     aggregations:["count()","sum(_col27)"]
                     outputColumnNames:["_col0","_col1"]
                     Statistics:Num rows: 1 Data size: 16
                     Merge Join Operator [MERGEJOIN_18]
                     |  condition map:[{"":"Inner Join 0 to 1"}]
                     |  keys:{"0":"material (type: string)","1":"material (type: string)"}
                     |  outputColumnNames:["_col27"]
                     |  Statistics:Num rows: 17103323 Data size: 40110666850
                     |<-Map 1 [SIMPLE_EDGE] vectorized
                     |  Reduce Output Operator [RS_20]
                     |     key expressions:material (type: string)
                     |     Map-reduce partition columns:material (type: string)
                     |     sort order:+
                     |     Statistics:Num rows: 15548476 Data size: 36464241801
                     |     value expressions:rtsaexcust (type: double)
                     |     Filter Operator [FIL_19]
                     |        predicate:material is not null (type: boolean)
                     |        Statistics:Num rows: 15548476 Data size: 36464241801
                     |        TableScan [TS_0]
                     |           alias:p
                     |           Statistics:Num rows: 31096951 Data size: 72928481257
                     |<-Map 4 [SIMPLE_EDGE] vectorized
                        Reduce Output Operator [RS_22]
                           key expressions:material (type: string)
                           Map-reduce partition columns:material (type: string)
                           sort order:+
                           Statistics:Num rows: 127637 Data size: 684861875
                           Filter Operator [FIL_21]
                              predicate:(material is not null and (eanupc = '4615041324532')) (type: boolean)
                              Statistics:Num rows: 127637 Data size: 684861875
                              TableScan [TS_1]
                                 alias:m
                                 Statistics:Num rows: 510549 Data size: 2739452867
Теперь выполним без ограничений:
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus
from pos_1_def p
join bi0_pmaterial m on m.material = p.material
where m.eanupc = '4615041324532';

------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           15                0             0       18323.00       228,930        7,344     31,096,952              15
     Map 3            1                0             0        2444.00         9,050          136        510,549               1
 Reducer 2            1                0             0       10164.00         2,250            0             15               0
------------------------------------------------------------------------------------------------------------------
Time taken: 22.365 seconds, Fetched: 1 row(s)
Время выполнения почти в 2 раза быстрей: 22,3 секунд.
По плану видно, что хэш массив создается до основного выполнения и передается во все мапперы, что позволяет выполнить запрос без передачи большой таблицы фактов в reducer.
Map 1 <- Map 3 (BROADCAST_EDGE)
Reducer 2 <- Map 1 (SIMPLE_EDGE)

Stage-0
   Fetch Operator
      limit:-1
      Stage-1
         Reducer 2
         File Output Operator [FS_13]
            compressed:false
            Statistics:Num rows: 1 Data size: 16
            Group By Operator [GBY_11]
            |  aggregations:["count(VALUE._col0)","sum(VALUE._col1)"]
            |  outputColumnNames:["_col0","_col1"]
            |  Statistics:Num rows: 1 Data size: 16
            |<-Map 1 [SIMPLE_EDGE] vectorized
               Reduce Output Operator [RS_10]
                  sort order:
                  Statistics:Num rows: 1 Data size: 16
                  value expressions:_col0 (type: bigint), _col1 (type: double)
                  Group By Operator [OP_24]
                     aggregations:["count()","sum(_col27)"]
                     outputColumnNames:["_col0","_col1"]
                     Statistics:Num rows: 1 Data size: 16
                     Map Join Operator [MAPJOIN_23]
                     |  condition map:[{"":"Inner Join 0 to 1"}]
                     |  HybridGraceHashJoin:true
                     |  keys:{"Map 3":"material (type: string)","Map 1":"material (type: string)"}
                     |  outputColumnNames:["_col27"]
                     |  Statistics:Num rows: 17103323 Data size: 40110666850
                     |<-Map 3 [BROADCAST_EDGE] vectorized
                     |  Reduce Output Operator [RS_21]
                     |     key expressions:material (type: string)
                     |     Map-reduce partition columns:material (type: string)
                     |     sort order:+
                     |     Statistics:Num rows: 127637 Data size: 684861875
                     |     Filter Operator [FIL_20]
                     |        predicate:(material is not null and (eanupc = '4615041324532')) (type: boolean)
                     |        Statistics:Num rows: 127637 Data size: 684861875
                     |        TableScan [TS_1]
                     |           alias:m
                     |           Statistics:Num rows: 510549 Data size: 2739452867
                     |<-Filter Operator [FIL_22]
                           predicate:material is not null (type: boolean)
                           Statistics:Num rows: 15548476 Data size: 36464241801
                           TableScan [TS_0]
                              alias:p
                              Statistics:Num rows: 31096951 Data size: 72928481257

Skew Join

Проявление

Одна из распространенных проблем при выполнении параллельного join - перекос данных к одному из значений.
Перекос - это когда основной объем данных приходятся к одному уникальному значению ключа.
Тогда при выполнение hash join или merge join одно уникальное значение уходит в 1 из потоков/reducer и запрос по итогу выполняется однопоточно.
В Spark это будет выглядеть так (медианное время = 0,4с, максимальное = 2,2 минуты ):


Проблема перекоса данных в Oracle описывалась раньше

Смоделируем перекос данных пересоздав таблицу отдав 98% данных одному значению "/bic/client" = '0010000273'
truncate table pos_1_def;
insert into pos_1_def
select v.*,
case when calday = '20190601' and material like '%9' then coalesce(`/bic/client`,'0010000273') else '0010000273' end as client_id
from pos_rec_itm v
where calday between '20190601' and '20190610'
Проверим распределение:
select COUNT(*) as cnt, COUNT(case when client_id = '0010000273' then 1 end) as cnt_skew, COUNT(case when client_id = '0010000273' then 1 end) / COUNT(*) as prc 
from pos_1_def;
103507863       102337257       0.9886906562837646
Выполним запрос на таблице по умолчанию:
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus
from pos_1_def p
join bic_pclient m on m.client_id = p.client_id
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           40                0             0       76687.00     2,120,690      165,023    103,507,863     103,507,863
     Map 4          253                0             0       70310.00     3,217,510      349,357     38,007,934      38,007,934
 Reducer 2          253                0             0      493257.00     3,056,730       82,336    141,515,797             253
 Reducer 3            1                0             0      459041.00        11,120          112            253               0
------------------------------------------------------------------------------------------------------------------
Time taken: 552.744 seconds, Fetched: 1 row(s)
552 секунд и последний job "reducer 2" работал дольше всего.

Исправление через Union и Broadcast

Одним из способов устранения перекоса - это выполнение Broadcast HJ.
В этом случае не происходит перераспределение (reduce / shuffle) данных по ключу и данные джойнятся прямо в мапперах.
Если таблица-справочник слишком большая, чтобы сделать Broadcast, то запрос можно разбить на 2 части:
* перекошенное значение, которое точно влезет в Broadcast
* все остальные значения (т.к. они равномерные, то тип join тут не принципиален)
select /*+ MAPJOIN(m) */ COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus
from pos_1_def p
join (select MAX(client_id) as client_id from bic_pclient where client_id = '0010000273' group by client_id) m on m.client_id = p.client_id
where p.client_id = '0010000273' 
UNION ALL
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus
from pos_1_def p
join bic_pclient m on m.client_id = p.client_id
where p.client_id != '0010000273';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1          253                0             0       82988.00     1,945,220      306,966      2,530,000               1
     Map 3           40                0             0       14952.00       726,940       98,558    103,507,864              40
     Map 6            4                0             0       19629.00        63,500        5,461     12,363,714       1,170,606
     Map 9          253                0             0       71837.00     4,136,120      438,018     38,007,934      38,007,933
 Reducer 2            6                0             0        8323.00        14,120          157              1               1
 Reducer 4            1                0             0        2605.00         4,680          165             40               0
 Reducer 7          253                0             0       26512.00     1,745,820       40,359     39,178,539             253
 Reducer 8            1                0             0        6852.00         1,730            0            253               0
------------------------------------------------------------------------------------------------------------------
Time taken: 108.231 seconds, Fetched: 2 row(s)
Запрос выполнился за 108 секунд, т.е. в 5 раз быстрей.

К минусам можно отнести повторное чтение фактовой таблицы.

Исправление через Salting

Второй, наиболее универсальный способ, который можно выполнить всегда, даже если нет Broadcast - это создание сурогатного ключа с равномерным распределением:
* в фактовой таблице создаем колонку salt, где для перкошенного значения будут случайные числа от 0 до 15. У остальных значений будет фиксировано 0
case when v.client_id != '0010000273' then 0 else FLOOR(RAND() * 16) end
* перекошенное значение в справочнике размножаем на число строк колонки salt
select v.client_id, salt from bic_pclient v LATERAL VIEW explode(ARRAY(0,1,2,3,4,5,6,7 ,8,9,10,11,12,13,14,15)) adTable as salt where v.client_id = '0010000273'
Остальные значения справочника добавляем без изменений с фиксированным значеним salt = 0
* После этого выполняем соединение уже по 2 ключам client_id и salt.
В этом случае перкошенное значение разобьется на 16 тасков и выполнится пропорционально быстрей:
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus
from (
    select v.rtsaexcust, v.client_id, case when v.client_id != '0010000273' then 0 else FLOOR(RAND() * 16) end as salt from pos_1_def v
) p
join (
    select v.client_id, salt from bic_pclient v LATERAL VIEW explode(ARRAY(0,1,2,3,4,5,6,7 ,8,9,10,11,12,13,14,15)) adTable as salt where v.client_id = '0010000273'
    UNION ALL
    select v.client_id, 0 as salt from bic_pclient v where v.client_id != '0010000273'
) m on m.client_id = p.client_id AND m.salt = p.salt
;
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           40                0             0       69224.00     1,349,340      178,767    103,507,863     103,507,863
     Map 4          253                0             0      105130.00     2,810,580      553,729      2,530,000              32
     Map 6          253                0             0       85245.00     4,386,890      433,833     38,007,934      38,007,933
 Reducer 2          253                0             0      106096.00     4,952,640      169,951    141,515,828             253
 Reducer 3            1                0             0       62534.00         2,960            0            253               0
------------------------------------------------------------------------------------------------------------------
Time taken: 194.003 seconds, Fetched: 1 row(s)
194 секунды - в 3 раза быстрей изначального варианта.
Число значений в salt можно увеличивать, улучшая тем самым параллельность, но нужно помнить, что тем самым мы увеличиваем и размер таблицы-справочника.
Данный способ наиболее универсален и применим, если нет возможности сделать BHJ.

Исправление встроенными средства Hive

Документация hive-skew: LanguageManualDDL-SkewedTables
* При создании таблицы нужно указать колонку и значение с перекосом
* Пересоздадим таблицу
* Hive в этом случае разбивает данные на 2 части: подкаталог с перекошенным значением "/%2Fbic%2Fclient=0010000273/" и каталог с остальным:
SKEWED BY (`/bic/client`) ON ('0010000273') STORED AS DIRECTORIES
$ hadoop fs -du -h /apps/hive/warehouse/pos_1_def
199.7 M  /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000000_0
18.2 M   /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000001_0
197.9 M  /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000002_0
148.3 M  /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000003_0
221.1 M  /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000004_0
20.6 M   /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000005_0
219.8 M  /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000006_0
73.7 M   /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000007_0
221.3 M  /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000008_0
20.5 M   /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000009_0
159.3 M  /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000010_0
22.9 M  /apps/hive/warehouse/pos_1_def/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/000000_0
2.1 M   /apps/hive/warehouse/pos_1_def/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/000001_0
22.1 M  /apps/hive/warehouse/pos_1_def/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/000002_0
16.5 M  /apps/hive/warehouse/pos_1_def/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/000003_0
И выполним запрос с включенной оптимизацией перекосов "hive.optimize.skewjoin=true":
set hive.optimize.skewjoin=true;
set hive.skewjoin.key=5000;
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus
from pos_1_def p
join bic_pclient m on m.`/bic/client` = p.`/bic/client`;

--все также последний редюсер тормозит
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           15                0             0       25306.00       434,520       15,334     31,096,951      31,096,951
     Map 4            8                0             0       28176.00       427,410       12,580     26,682,353      26,682,353
 Reducer 2          253                0             0      128475.00     2,726,900       45,064     57,779,304           7,454
 Reducer 3            1                0             0      100911.00         8,820           86          7,454               0
------------------------------------------------------------------------------------------------------------------
Time taken: 142.052 seconds, Fetched: 1 row(s)
Ииии.... получаем тоже самое
Либо оптимизация не работает или я делаю что-то не так.

Исправление в Spark

Абсолютно аналогично (salting/broadcast) решается проблема перекоса данных в Spark.
Отмечу одну особенность: замен RAND можно использовать monotonically_increasing_id:
val df = spark.read.table("pos_1_def").select($"rtsaexcust", $"client_id").
withColumn("salt", when($"client_id" === "0010000273", monotonically_increasing_id%16).otherwise(0))
df.createOrReplaceTempView("tmp_pos_1_def")

В databricks есть встроенные оптимизации

Hive 3: Materialized view

Начиная с 3 версии Hive поддерживаем материализованные представления и соответственно rewrite запросов на select из matview (hive.materializedview.rewriting).
Документация: Materialized+views

К примеру:
* Maview:
CREATE MATERIALIZED VIEW mv1
AS
SELECT empid, deptname, hire_date
FROM emps JOIN depts
  ON (emps.deptno = depts.deptno)
WHERE hire_date >= '2016-01-01';
При выполнении запроса:
SELECT empid, deptname
FROM emps
JOIN depts
  ON (emps.deptno = depts.deptno)
WHERE hire_date >= '2018-01-01'
    AND hire_date <= '2018-03-31';
Будет подменена на запрос из представления:
SELECT empid, deptname
FROM mv1
WHERE hire_date >= '2018-01-01'
    AND hire_date <= '2018-03-31';

Также Hive поддерживает инкрементальное обновление. Для этого требуется:
* Включенный ACID на таблицах источниках для трекинга измененных строк
* Включенный ACID на matview для выполнения merge в представление

Если matview устарела и не была обновлена, то rewrite автоматически отключается.
Но можно принудительно включить rewrite на промежуток времени до обновления целиком в системе:
SET hive.materializedview.rewriting.time.window=10min;
или этот же параметр может быть установлен на уровне конкретных таблиц.

Hive LLAP

Начиная с 2 версии Hive поддерживает LLAP (Live Long And Process) сервер запросов
Основные отличия от обычного Hive:
* постоянно запущенный демон, что уменьшает время старта запроса
* обращение к данным происходит через демона LLAP, что дает возможность кэширования и prefetching данных между запросами
- Минус, что под кэш нужно много Ram, по этому llap обычно отдается сервер целиком и на нем больше ничего не ставят.

Iceberg

Iceberg - набирающий популярность формат данных поддерживающий расширенный функционал: конкурентное обновление и полная поддержка DML, эволюция схемы, скрытое партицирование и другое о чем дальше.

Схема формата данных

Составлюящие:
- iceberg catalog - текущий указатель на метеданные для каждой таблицы.
Нужна поддержка атомарности и реализуется 2 способами:
* file-based catalogs (HDFS, S3) - через атомарное изменение файла version-hint.text
* service-based catalogs (Hive Metastore, Nessie) - через изменение поля location в таблице tables
В пути хранится адрес файла метаданных metadata.json.

- metadata file - содержит в себе схему, партиции, сортировки, список снапшотов и какой из них активен сейчас
* снапшот (ветка) создается после каждой dml операции
из текущего снапшота берется location файла со список manifest (manifests_lists)
* в manifests_lists содержатся статистики, по которым отфильтровываваются конкретные manifests
* внутри manifests уже содержатся детальные статистики каждого отдельного дата файла, по которым можно дофильтровать файлы
под каждый dml создается свой metadata file , по этому можно менять схему таблицы или перепатицировать на лету
если перепартицируем на том же столбце, то партишен прунинг все равно будет на уровне значений в метаданных

Конкурентный доступ

- запись конкурентная и атомарная за счет оптимистичных блокировок
в случая 2 одновременных изменений, последний будет рестартован
Все это обеспечивает Serializable isolation - максимальную изоляцию транзакций при записи и чтении.

- Как реализуется частичное изменение данных на примере Delete Query
2 стратегии:
* copy-on-write
заменяются файлы с данными на новые (только подходящие под фильтр удаления)
* merge-on-read
читаются файлы по фильтру удаления, записывается файл с позициями для исключения при чтении

Улучшение партицирования

- Hidden Partitioning
В описании партиции указывается колонка и функция трансформации:
"partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ {
        "name" : "ts_hour",
        "transform" : "hour",
        "source-id" : 2,
        "field-id" : 1000
    } ]
} ],
если типы данных в фильтре select и описании партиции сравнимы, то они будут преобразованы функцией трансформации и применены над метаданными до непосредственного обращения к файлам данным.
Кроме более гибкого обращения к колонкам партицирования это улучшает скорость построения плана запроса, т.к. уже не требуется сканировать папки партиций, как это делается в классическом подходе.

- spark поддерживает storage partition join для iceberg таблиц,
join будет без стадии шафла, если 2 таблицы имеют одинаковую спецификацию partition by

Версионирование

- Обращение к предыдущим версиям данных
SELECT *
FROM table1 AS OF '2021-01-28 00:00:00'

- Работа с данными как с ветками git
бранч можно создать по времени или по снапшоту
список снапшотов:
SELECT * FROM <database>.<table name>.HISTORY
создание бранча
ALTER TABLE <table name> CREATE BRANCH <branch name> FOR SYSTEM_VERSION AS OF <SNAPSHOT_ID>
или с актуальной версии
ALTER TABLE <table name> CREATE BRANCH <branch name>
дальше обращаемся к брачну по имени
FROM <database name>.<table name>.branch_<branch name>
когда все изменения в бранче готовы, переключаем указатель
ALTER TABLE <table name> EXECUTE FAST-FORWARD 'x' 'z';
тем самым можно незаметно для пользователей делать ряд не атомарных изменений, а когда все готово публиковать готовое

Компакция и распределение

CALL catalog.system.rewrite_data_files(
  table => 'db.table1', 
  strategy => 'binpack', 
  options => map('min-input-files','2')
)
компкакция периода
CALL catalog_name.system.rewrite_data_files(
  table => 'db.sample', 
  where => 'ingested_at between 
  	"2022-06-30 10:00:00" 
  	and "2022-06-30 11:00:00" '
)
компакция с сортировкой
CALL catalog.system.rewrite_data_files(
    table => 'db.teams', 
    strategy => 'sort', 
    sort_order => 'team ASC NULLS LAST, name DESC NULLS FIRST'
  )
В sort_order поддерживается zorder распределение по нескольким столбцам:

- как-нибудь поговорим о нем отдельно.

Более подробно о iceberg вы можете почитать тут: https://www.dremio.com/resources/guides/apache-iceberg-an-architectural-look-under-the-covers/

5 комментариев:

  1. Крутой материал получился! Спасибо!
    Есть такой вопрос: правильно ли я понимаю, что Хайв до сих пор не умеет создавать таблицу со схемой и данными из файлов орк, паркет и как импортить данные в Хайв в таких случаях?

    ОтветитьУдалить
    Ответы
    1. Hive не сохраняет названия столбцов в orc файлы, а берет их из метастора
      Насчет как импортировать: описать схему в метасторе и копировать файлы или загружать sparkом, он вытаскивает схему из файлов

      Удалить
    2. В спарке не силён, буду пробовать sqoop-ом) Еще, схему то не сложно выудить, только вот при создании таблицы с указанием схемы Hive валится и говорит, что не поддерживает struct, думаю вариант только разбивать массив на дополнительные колонки.

      Удалить
  2. Спасибо огромное! Очень интересная статья!
    Какая версия Hive использовалась?

    ОтветитьУдалить