понедельник, 19 августа 2019 г.

Оптимизация хранения данных в Orc

В этой статье хотел бы объединить знания по оптимизации хранения данных в Hadoop для Hive с упором на Orc тип файла.
Кому лень читать, могу перейти сразу к сравнению результатов

ORC: формат файла

Документация: https://orc.apache.org/specification/ORCv1/
ORC - это колоночный формат хранения данных, но колоночность в нем ограниченная, т.к. строки таблицы разбиваются на несколько частей:
Данные делятся на файлы по размеру, внутри строки разбиваются на stripe (размер stripe задаются параметром orc.stripe.size - обычно 64МБ).

Stripe состоит из частей:
* Index Data:
 ** Min/Max значение, количество строк, наличие NULL для каждой колонки внутри stripe целиком
 ** Index c Min/Max значением колонки для каждых 10 тыс. строк (hive.exec.orc.default.row.index.stride)
 ** Bloom filter для указанных колонок
 ** Смещение до данных колонки в Row Data из индекса

* Row Data - сжатые данные колонок (сжатие по умолчанию orc.compress = ZLIB)
Способы сжатия:
 ** Числовые данные хранятся с переменной длинной: <= 128 - 1 байт, <= 16383 - 2 байт и т.д.
 ** Повторяющиеся числа сохраняются как дельта
 ** Строки кодируются справочником.
Если число уникальных значений не превышает 0,8 (hive.exec.orc.dictionary.key.size.threshold) от общего числа строк, то на основании данных строковой колонки создается справочник и упорядочивается для бинарного поиска. В Row Data же вставляется id справочника.

* Footer - данные о кодировке

Такой формат хранения позволяет отсекать как строки читая индексы файла и stripe, так и колонки используя смещения в stripe.



Посмотрим влияние параметров orc на размер, время вставки:

Параметры по умолчанию

Параметры и таблица по умолчанию для сравнения с оптимизациями:
Это будет таблица с продажами за 3 дня - 31 млн строк. Состоит из 42 столбца из которых 28 это ключевые строковые поля и 14 числовых показателей.
Все запросы будут выполняться с принудительным bucketing и sorting, также с логированием performance параметров.
set hive.server2.logging.operation.level=performance;
set hive.enforce.bucketing = true; 
set hive.enforce.sorting = true;

insert into pos_1_def
select * from pos_rec_itm
where calday between '20190601' and '20190603';

------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           11                0             0      276213.00     1,601,560        8,928     31,096,951               0
------------------------------------------------------------------------------------------------------------------

Loading data to table pos_1_def
Table pos_1_def stats: [numFiles=11, numRows=31096951, totalSize=1795231673, rawDataSize=72898554240]
Time taken: 278.238 seconds
При параметрах по умолчанию время вставки составляет 278 секунд и размер файла получается 1712 МБ

Если задампить блок, то большинство строковых колонок содержат секцию со справочником:
Stream: column 9 section DATA start: 1060399 length 203242
Stream: column 9 section LENGTH start: 1263641 length 101
Stream: column 9 section DICTIONARY_DATA start: 1263742 length 64140
Количество страйпов в 1 файле = 66
hive --orcfiledump /apps/hive/warehouse/pos_1_def/000000_0 | grep 'Stripe:' | wc -l
66

Максимальная доля уникальных значений для создания справочника

Попробуем уменьшить hive.exec.orc.dictionary.key.size.threshold с 0,8 до 0,2 и посмотреть как это повлияет:
set hive.exec.orc.dictionary.key.size.threshold = 0.2;
truncate table pos_1_def;
insert into pos_1_def
select * from pos_rec_itm
where calday between '20190601' and '20190603'
;
Table pos_1_def stats: [numFiles=11, numRows=31096951, totalSize=1983777997, rawDataSize=72898554240]
Time taken: 196.922 seconds
Время вставки ускорилось с 278 секунд до 197 секунд, а рамзер файла возрос до 1892 МБ.

Из дампа у 9 строковой колонки пропал справочник:
Stream: column 9 section DATA start: 5890885 length 1096170
Stream: column 9 section LENGTH start: 6987055 length 929

Если наоборот увеличить необходимость справочника до 1:
set hive.exec.orc.dictionary.key.size.threshold = 1;
truncate table pos_1_def;
insert into pos_1_def
select * from pos_rec_itm
where calday between '20190601' and '20190603'
;
Table pos_1_def stats: [numFiles=11, numRows=31096951, totalSize=1784757162, rawDataSize=72898554240]
Time taken: 322.133 seconds
То время вставки возрастает до 322 секунд, а размер файла значительно не меняется.

Вывод: лучше оставить этот параметр по умолчанию = 0.8, как баланс между размером файла и скоростью вставки.
Отключать справочники, видимо, имеет смысл, если в таблице множество уникальных небольших строк, т.к. в этом случае замена короткой строки на длинное число из справочника не принесет должного сжатия.

Количество строк в stride

Попробуем поварировать число строк в индексе hive.exec.orc.default.row.index.stride (по умолчанию = 10000 ).
Но проводить тест будем на сортированной таблице, т.к. на стандартной таблице индексы почти не отфильтровывают данные.
Стандартная сортированная таблица:
set hive.exec.orc.default.row.index.stride=10000;
truncate table pos_3_sort;
insert into pos_3_sort
select * from pos_rec_itm
where calday between '20190601' and '20190603'
;
Table pos_3_sort stats: [numFiles=4, numRows=31096951, totalSize=1015100427, rawDataSize=72898553064]
Time taken: 433.964 seconds
Что дает 968 МБ данных и 22 Stripe:
$ hive --orcfiledump /apps/hive/warehouse/pos_3_sort/000000_0 | grep 'Stripe:' | wc -l
$ 22
При отборе данных отфильтровываются данные индексом до 4млн:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_3_sort where `/bic/client` = '0010000273';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           23                0             0       10525.00       278,840       15,736      4,216,492              23
 Reducer 2            1                0             0        2985.00         1,850           80             23               0
------------------------------------------------------------------------------------------------------------------

Time taken: 11.599 seconds, Fetched: 1 row(s)
Увеличим число строк в индексе в 10 раз:
set hive.exec.orc.default.row.index.stride=100000;
truncate table pos_1_def;
insert into pos_1_def
select * from pos_rec_itm
where calday between '20190601' and '20190603'
;
Table pos_3_sort stats: [numFiles=4, numRows=31096951, totalSize=1013477928, rawDataSize=72898553064]
Time taken: 475.205 seconds
Размер файла, как и время вставки осталось примерно одинаково.
Число Stripe уменьшились почти в 2 раза:
$ hive --orcfiledump /apps/hive/warehouse/pos_3_sort/000000_0 | grep 'Stripe:' | wc -l
$ 12
Но вот эффективность индекса при поиске по ключу значительно снижилась с 4 млн до 24 млн:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_3_sort where `/bic/client` = '0010000273';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           24                0             0       11169.00       308,520       17,330     24,316,951              24 
 Reducer 2            1                0             0        3525.00         1,190           52             24               0
------------------------------------------------------------------------------------------------------------------

Time taken: 12.852 seconds, Fetched: 1 row(s)
Вывод: изменение числа строк в индексе в моем примере существенно не влияет на время вставки и размер таблицы, но хорошо помогает при последующих select.
Если в таблице размер строки был меньше, то метаданные индекса могли создать дополнительную нагрузку на вставку и размер файла.
Также если значение колонки регулярно присутствует во всех продажах, то индекс не даст эффекта, т.к. значение попадет во все Stripe. Т.е. индекс Min/Max для диапазона эффективен для редковстречающихся значений (id клиента), и малоэффективен для повторяющихся значений (ид популярной акции или товара)

Партицирование

Не буду заострять внимание на этой опции. Партицирование есть во всех современных бд.
Основное преимущество HIVE партицирования перед Oracle:
* Неограниченная вложенность
* автоматическое создание на оснований уникальных значений в колонке
Стоит обратить внимание, что hive поддерживает отсечение партиций не только по ключу, но и при join через справочник

Кластеризация

Другой возможностью разбиения данных на части, помимо партицирования, является кластеризация. В отличии от партицирования, кластеризация разбивает данные на файлы.
Документация: LanguageManual+DDL+BucketedTables
Аналогом в Oracle является хэш партицирование.

Время вставки

Замерим показатели размера, время вставки и скорости select для кластеризованной таблицы.
Кластеризуем таблицу на 4 части по колонке магазина:
CLUSTERED BY(plant) INTO 4 BUCKETS
insert into pos_2_clust
select * from pos_rec_itm
where calday between '20190601' and '20190603';

------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           11                0             0       97875.00       850,680       17,647     31,096,951      31,096,951
 Reducer 2            4                0             0      387931.00     1,139,700        4,545     31,096,951               0
------------------------------------------------------------------------------------------------------------------

Loading data to table pos_2_clust
Table pos_2_clust stats: [numFiles=4, numRows=31096951, totalSize=1754969327, rawDataSize=72898553064]
Time taken: 408.902 seconds
Отличия в вставке от обычной таблицы:
* Появляется Reducer в который отправляются данные нужной хэш секции магазина.
* По этому время вставки возросло с 278 секунд до 408 (на 50% дольше)
* Размер данных незначительно сократился с 1712 МБ до 1673 МБ за счет локальности магазинов в одном файле

Select по ключу кластеризации

Сравним время выборки по ключу магазина для обычной таблицы и кластеризованной
Выборка по ключу из обычной таблицы считывает все 31 млн строк за 12 секунд:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_1_def where plant = '0002';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           14                0             0       10740.00       173,570        7,868     31,096,951              14
 Reducer 2            1                0             0        6033.00         1,280           67             14               0
------------------------------------------------------------------------------------------------------------------
Time taken: 12.265 seconds, Fetched: 1 row(s)
Из кластеризованной же считывается половина строк (15,5 млн) за 11 секунд:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_2_clust where plant = '0002';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1            6                0             0        9935.00        63,060        3,315     15,485,169               6
 Reducer 2            1                0             0        7096.00           350           29              6               0
------------------------------------------------------------------------------------------------------------------
Time taken: 10.889 seconds, Fetched: 1 row(s)
Замечание: странно, что считалась половина строк, тогда как 1 магазин должен был попасть в 1 кластер, что должно было сократить объем чтения в 4 раза (1 / число кластеров)

Сортировка

Еще сильней сжать данные можно, если отсортировать их. Тогда Orc сможет сжать файлы максимально эффективно.
Документация: LanguageManualDDL-BucketedSortedTables
Основной сложностью при сортировке таблицы является выборк колонки по которой это сделать.
При выборе я руководствовался 2 факторами:
* Частота использования колонки в фильтрах
* Размер данных после сортировки
Для определения популярных фильтров на колонке можно распарсить логи запросов.
Для этого был написан небольшой скрипт: hive_predicate_stat.py, по результату которого стало ясно, что самые популярные фильтр по магазину (plant)

Для проверки размера данных проведем несколько замеров на 1 дне (calday):

Размер данных

Исходный размер одного дня без сортировок 688,5МБ:
CLUSTERED BY(material)        SORTED BY(material, `/bic/client`, plant)  INTO 4 BUCKETS  -- Кластеризация по товару, сортировка по товару, клиенту и магазину   == 498 МБ
CLUSTERED BY(material)        SORTED BY(`/bic/client`, plant, material)  INTO 4 BUCKETS  -- Кластеризация по товару, сортировка по клиенту и магазину           == 454 МБ
CLUSTERED BY(`/bic/client`)   SORTED BY(`/bic/client`, plant, material)  INTO 4 BUCKETS  -- Кластеризация по клиенту, сортировка по клиенту, магазину и товару  == 388 МБ
CLUSTERED BY(plant)           SORTED BY(plant, `/bic/client`, material)  INTO 4 BUCKETS  -- Кластеризация по магазину, сортировка по магазину и клиенту         == 374.5 МБ
Исходя из этого был выбран вариант сжатия, который как ускорял выборку, так и хорошо сжимал данные: CLUSTERED BY(plant) SORTED BY(plant, `/bic/client`) INTO 4 BUCKETS

Замечу, что чем больше число бакетов, тем лучше будет отсечение лишних данных, но хуже сжатие.
Так 16 бакетов сжали данные до 554 МБ, тогда как 4 бакета дали 454 МБ
CLUSTERED BY(material)        SORTED BY(`/bic/client`, plant, material)  INTO 16 BUCKETS -- Кластеризация по товару, сортировка по товару, клиенту и магазину   == 554 МБ 
так что степень = 4, выбрана как компромисс

Время вставки

Вставка данных в кластеризованно-сортированную таблицу: CLUSTERED BY(plant) SORTED BY(plant, `/bic/client`) INTO 4 BUCKETS
insert into pos_3_sort
select * from pos_rec_itm
where calday between '20190601' and '20190603'
;

Task Execution Summary
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           11                0             0       95966.00       900,790       18,164     31,096,951      31,096,951
 Reducer 2            4                0             0      412328.00     1,228,640        5,323     31,096,951               0
------------------------------------------------------------------------------------------------------------------

Loading data to table pos_3_sort
Table pos_3_sort stats: [numFiles=4, numRows=31096951, totalSize=1014407061, rawDataSize=72898553064]
Time taken: 433.964 seconds
Отличия от обычной таблицы:
* Появляется Reducer в который отправляются данные нужной хэш секции магазина и сортирует.
* По этому время вставки возросло с 278 секунд до 433 секунд, что незначительно больше 408с при просто кластеризованной таблице
* Размер данных сократился с 1712 МБ до 967 МБ, т.е. почти в 2 раза!

Select по первому ключу сортировки

Если вернуться к описанию хранения индексов, видно, что данные по ключу могут попасть в любой stripe случайным образом, что значительно ухудшает эффективность встроенных индексов Orc
Если же данные отсортировать по ключу поиска, то уникальные значения лягут в один Stripe и будут максимально эффективно отфильтровываться

<< Слева показан пример с выборкой уникального значения по обычной таблице и сортированной (красным - отсеченные stripe, зеленым - считываемые)
Проверим на реальных данных.
Выборка по ключу на обычной таблице считывает все 31 млн строк за 12 секунд:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from tmp.pos_1_def where plant = '0002';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           14                0             0       10740.00       173,570        7,868     31,096,951              14
 Reducer 2            1                0             0        6033.00         1,280           67             14               0
------------------------------------------------------------------------------------------------------------------
Time taken: 12.265 seconds, Fetched: 1 row(s)
По отсортированной таблице считывается в разы меньший объем строк за 4 секунды (в 3 раза быстрей):
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_3_sort where plant = '0002';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1            2                0             0        3039.00         8,760          872        180,000               2
 Reducer 2            1                0             0        1594.00         1,700           42              2               0
------------------------------------------------------------------------------------------------------------------
Time taken: 4.342 seconds, Fetched: 1 row(s)

Select по второму ключу сортировки

Выборка по второму ключу поиска считывает все бакеты, но все равно отбирает значительно меньше строк из Stripe:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_3_sort where `/bic/client` = '0010000273';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           23                0             0       10525.00       278,840       15,736      4,216,492              23
 Reducer 2            1                0             0        2985.00         1,850           80             23               0
------------------------------------------------------------------------------------------------------------------
Time taken: 11.599 seconds, Fetched: 1 row(s)

Select по ключу без сортировки

Select по ключу без сортировки получается одинаковый по обоим таблицам:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_1_def where rt_promo = 'A122RP94';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           14                0             0       10926.00       173,190        8,356     31,096,951              14 
 Reducer 2            1                0             0        6504.00         1,190           46             14               0
------------------------------------------------------------------------------------------------------------------
Time taken: 11.675 seconds, Fetched: 1 row(s)
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_3_sort where rt_promo = 'A122RP94';  
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1            6                0             0       10300.00        71,340        3,946     31,096,951               6
 Reducer 2            1                0             0        7234.00         1,950          111              6               0
------------------------------------------------------------------------------------------------------------------
Time taken: 11.384 seconds, Fetched: 1 row(s)

Sort Merge Bucket Join

Дополнительное преимущество которое дает сортированность таблиц - это возможность выполнения sort merge join без собственно сортировки и пересылки данных в reducer.

Отсортируем и кластеризуем все таблицы в join одиково по магазину.
Также нужно включить sortmerge и bucketmapjoin, которые выключены по умолчанию в Hive.
Документация: LanguageManualJoinOptimization-AutoConversiontoSMBMapJoin

План по умолчанию на обычных таблицах:
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.convert.join.bucket.mapjoin.tez=true; 

select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus
from pos_1_def p
join bi0_pplant m on m.plant = p.plant;
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           14                2             0       25340.00       498,640       16,367     31,096,951      31,096,951
     Map 4            1                0             0        7087.00        21,530          851          5,000           5,000
 Reducer 2          145                0             0       18011.00       988,800       15,270     31,101,951             145
 Reducer 3            1                0             0        6623.00         4,490           59            145               0
------------------------------------------------------------------------------------------------------------------
Time taken: 38.847 seconds, Fetched: 1 row(s)
Join обычных таблиц генерирует 2 мапера, в которых считываются и сортируются данные.
После чего порции данных с одним ключем отправляются в Reducer и в нем происходит слияние 2 таблиц и последующий подсчет count и sum
В 3ий единственный reducer отправляется промежуточный результат count и sum , где досчитывается итоговая сумма и кол-во.
Самой дорогой операцией тут является Map1 - сортировка данных и отправка промежуточных результатов в Reducer2
Stage-0
   Fetch Operator
      limit:-1
      Stage-1
         Reducer 3
         File Output Operator [FS_12]
            compressed:false
            Statistics:Num rows: 1 Data size: 16
            Group By Operator [GBY_10]
            |  aggregations:["count(VALUE._col0)","sum(VALUE._col1)"]
            |  outputColumnNames:["_col0","_col1"]
            |  Statistics:Num rows: 1 Data size: 16
            |<-Reducer 2 [SIMPLE_EDGE]
               Reduce Output Operator [RS_9]
                  sort order:
                  Statistics:Num rows: 1 Data size: 16
                  value expressions:_col0 (type: bigint), _col1 (type: double)
                  Group By Operator [GBY_8]
                     aggregations:["count()","sum(_col27)"]
                     outputColumnNames:["_col0","_col1"]
                     Statistics:Num rows: 1 Data size: 16
                     Merge Join Operator [MERGEJOIN_17]
                     |  condition map:[{"":"Inner Join 0 to 1"}]
                     |  keys:{"0":"plant (type: string)","1":"plant (type: string)"}
                     |  outputColumnNames:["_col27"]
                     |  Statistics:Num rows: 17103323 Data size: 40094206990
                     |<-Map 1 [SIMPLE_EDGE] vectorized
                     |  Reduce Output Operator [RS_19]
                     |     key expressions:plant (type: string)
                     |     Map-reduce partition columns:plant (type: string)
                     |     sort order:+
                     |     Statistics:Num rows: 15548476 Data size: 36449278292
                     |     value expressions:rtsaexcust (type: double)
                     |     Filter Operator [FIL_18]
                     |        predicate:plant is not null (type: boolean)
                     |        Statistics:Num rows: 15548476 Data size: 36449278292
                     |        TableScan [TS_0]
                     |           alias:p
                     |           Statistics:Num rows: 31096951 Data size: 72898554240
                     |<-Map 4 [SIMPLE_EDGE] vectorized
                        Reduce Output Operator [RS_21]
                           key expressions:plant (type: string)
                           Map-reduce partition columns:plant (type: string)
                           sort order:+
                           Statistics:Num rows: 2500 Data size: 4681364
                           Filter Operator [FIL_20]
                              predicate:plant is not null (type: boolean)
                              Statistics:Num rows: 2500 Data size: 4681364
                              TableScan [TS_1]
                                 alias:m
                                 Statistics:Num rows: 5000 Data size: 9362729
Повторим тест, но на одинаково кластеризованно-сортированных таблицах:
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus
from pos_3_sort p
join bi0_pplant m on m.plant = p.plant;
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1            6                0             0       23180.00       201,040        8,189     31,096,951               6
 Reducer 2            1                0             0       17839.00         1,200           46              6               0
------------------------------------------------------------------------------------------------------------------
Time taken: 24.346 seconds, Fetched: 1 row(s)
Теперь обе таблицы читаются и соединяются в одном маппере без сортировки и пересылки по сети и запрос отработал почти в 2 раза быстрей:
Stage-0
   Fetch Operator
      limit:-1
      Stage-1
         Reducer 2
         File Output Operator [FS_12]
            compressed:false
            Statistics:Num rows: 1 Data size: 16
            Group By Operator [GBY_10]
            |  aggregations:["count(VALUE._col0)","sum(VALUE._col1)"]
            |  outputColumnNames:["_col0","_col1"]
            |  Statistics:Num rows: 1 Data size: 16
            |<-Map 1 [SIMPLE_EDGE]
               Reduce Output Operator [RS_9]
                  sort order:
                  Statistics:Num rows: 1 Data size: 16
                  value expressions:_col0 (type: bigint), _col1 (type: double)
                  Group By Operator [GBY_8]
                     aggregations:["count()","sum(_col27)"]
                     outputColumnNames:["_col0","_col1"]
                     Statistics:Num rows: 1 Data size: 16
                     Merge Join Operator [MERGEJOIN_17]
                     |  condition map:[{"":"Inner Join 0 to 1"}]
                     |  keys:{"0":"plant (type: string)","1":"plant (type: string)"}
                     |  outputColumnNames:["_col27"]
                     |  Statistics:Num rows: 17103323 Data size: 40094206343
                     |
                     |<-Filter Operator [FIL_16]
                     |     predicate:plant is not null (type: boolean)
                     |     Statistics:Num rows: 2500 Data size: 4681364
                     |     TableScan [TS_1]
                     |        alias:m
                     |        Statistics:Num rows: 5000 Data size: 9362729
                     |<-Filter Operator [FIL_15]
                           predicate:plant is not null (type: boolean)
                           Statistics:Num rows: 15548476 Data size: 36449277704
                           TableScan [TS_0]
                              alias:p
                              Statistics:Num rows: 31096951 Data size: 72898553064
Причем замечу, что этот подход работает и для просто одинаково кластеризованных таблиц без сортировки.
Сортировка добавляется внутрь map1, что немного медленней, но все равно экономит кучу времени на shuffling - пересылку данных по сети в reducer.

ORC: Bloom filter

Что такое Bloom фильтр описывалось уже несколько раз в этом блоге: oracle-bloom-filter
Bloom фильтр также может применяться в orc файлах и очень удобно подходит для фильтрации на не отсортированных колонках, т.к. индексы на них неэффективны.
Bloom filter же дает дополнительную информацию о содержании колонки в stripe

Время вставки

Протестируем работу на кластеризованно-сортированной таблице с bloom filter на 4 колонках.
Замечу, что названия колонок нужно задавать без кавычек
CLUSTERED BY(plant) SORTED BY(plant, `/bic/client`) INTO 4 BUCKETS stored as orc
tblproperties ("orc.bloom.filter.columns"="material,rt_promo,/bic/client,/bic/card12,", "orc.bloom.filter.fpp"="0.05")


insert into pos_4_bloom
select * from pos_rec_itm
where calday between '20190601' and '20190603'
;

------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           11                0             0      105566.00       957,410       19,722     31,096,951      31,096,951
 Reducer 2            4                0             0      428483.00     1,247,480        5,818     31,096,951               0
------------------------------------------------------------------------------------------------------------------

Loading data to table pos_4_bloom
Table pos_4_bloom stats: [numFiles=4, numRows=31096951, totalSize=1066863577, rawDataSize=72898553064]
Time taken: 450.133 seconds
Отличия от кластеризованно-сортированной таблицы:
* Время вставки возросло с 433 секунд до 450 секунд, что незначительно больше
* Размер данных возрос с 967 МБ до 1 017 МБ, что на 5% больше

В индексных данных колонки появляется дополнительный раздел "BLOOM_FILTER":
Stripe: offset: 805493730 data: 13149108 rows: 410000 tail: 799 index: 285445
Stream: column 0 section ROW_INDEX start: 805493730 length 20
...
Stream: column 18 section ROW_INDEX start: 805502343 length 570
Stream: column 18 section BLOOM_FILTER start: 805502913 length 260902

Select по ключу bloom filter

Возьмем из пункта про сортировку запрос и сравним с ним.

Запрос к таблице с блум фильтром:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_4_bloom where rt_promo = 'A122RP94'; 
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1            8                1             0       10105.00        83,770        4,074        330,000               8
 Reducer 2            1                0             0        6507.00           370            0              8               0
------------------------------------------------------------------------------------------------------------------
Time taken: 10.839 seconds, Fetched: 1 row(s)
Отбирается 330т строк, вместо 31 млн по умолчанию

Если взять колонку на которой есть сортировка и блум фильтр, то результат еще лучше:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_4_bloom where `/bic/client` = '0010000273';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           24                0             0       10325.00       286,540       17,450         10,000              24
 Reducer 2            1                0             0        1953.00         6,490           86             24               0
------------------------------------------------------------------------------------------------------------------
Time taken: 11.958 seconds, Fetched: 1 row(s)
В просто сортированной таблице отбиралось 24 млн строк, а с bloom фильтром всего 10 т.

Увеличение Bloom filter

Попробуем увеличить размер bloom фильтра, чтобы оценить влияние на вставку и выборку:
CLUSTERED BY(plant) SORTED BY(plant, `/bic/client`) INTO 4 BUCKETS stored as orc
tblproperties ("orc.bloom.filter.columns"="material,rt_promo,/bic/client,/bic/card12,", "orc.bloom.filter.fpp"="0.2")

insert into pos_4_bloom2
select * from pos_rec_itm
where calday between '20190601' and '20190603';

------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           11                0             0       98033.00       976,480       19,705     31,096,951      31,096,951
 Reducer 2            4                0             0      425676.00     1,251,980        4,976     31,096,951               0
------------------------------------------------------------------------------------------------------------------

Loading data to table pos_4_bloom2
Table pos_4_bloom2 stats: [numFiles=4, numRows=31096951, totalSize=1042572361, rawDataSize=72898553064]
Time taken: 446.657 seconds
Отличия от bloom filter по умолчанию:
* Время примерно одно
* Размер данных почему то стал меньше с 1 017 МБ до 994 МБ

Увеличение не дало лучшей фильтрации - те же 330т строк:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_4_bloom2 where rt_promo = 'A122RP94'; 
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1            6                0             0        9101.00        57,690        3,591        330,000               6
 Reducer 2            1                0             0        5991.00         1,540           46              6               0
------------------------------------------------------------------------------------------------------------------
Time taken: 10.165 seconds, Fetched: 1 row(s)
И по клиенту - те же 10т строк:
select COUNT(*) as cnt, SUM(rtsaexcust) as rtsaexcust from pos_4_bloom2 where `/bic/client` = '0010000273';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           21                0             0        9935.00       249,380       13,191         10,000              21
 Reducer 2            1                0             0        2590.00         2,370           79             21               0
------------------------------------------------------------------------------------------------------------------
Time taken: 11.007 seconds, Fetched: 1 row(s)

Сравнение результатов

Сравнивать будем время вставки, объем данных и число выбранных данных.
В select сравниваем не время, т.к. слишком большая погрешность и многое зависит от числа читающих мапперов.
Лучше - меньше (выделено цветом):
Показатель По умолчанию Кластеризованная Кластеризованная и сортированная Кластеризованная, сортированная и с bloom фильтром
Время вставки (сек) 278 408 434 450
Размер (МБ) 1 712 1 673 967 1 017
Выборка по ключу (Магазин) строк 31 096 951 15 485 169 180 000 180 000
Выборка по вторичному ключу (Клиент) строк 31 096 951 31 096 951 4 216 492 10 000
Выборка по не ключевому полю (Акция) строк 31 096 951 31 096 951 31 096 951 330 000
Join по ключу (Магазин) сек 38.8 25 24.3 24.3

Hive: Predicate pushdown

Кроме фильтрации в where проверим передачу фильтра через join:

Join predicate pushdown

1 тест на обычных таблицах, чтобы проверить передачу фильтра в индексы Stripe.
bi0_pmaterial - справочник товаров отфильтрован по штрих коду и отбирает 1 товар.
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus
from pos_1_def p
join bi0_pmaterial m on m.material = p.material
where m.eanupc = '4615041324532';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           14                0             0       12214.00       217,980        8,513     31,096,952              14
     Map 3            4                0             0        6315.00        17,350           90        480,000               1
 Reducer 2            1                0             0        5590.00         1,200            0             14               0
------------------------------------------------------------------------------------------------------------------
Time taken: 14.558 seconds, Fetched: 1 row(s)
Фильтр по товару не передался в маппер, из продаж выбрались все данные.

Попробуем повторить тест на сортированной таблице с блум фильтрами на колонках товара (material)
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus
from pos_4_bloom p
join bi0_pmaterial2 m on m.material = p.material
where m.eanupc = '4615041324532';
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1            8                0             0       38212.00       110,160        3,471     31,096,952               8
     Map 3            4                0             0         413.00         2,630           98        480,000               1
 Reducer 2            1                0             0       34809.00           600            0              8               0
------------------------------------------------------------------------------------------------------------------
Time taken: 39.68 seconds, Fetched: 1 row(s)
К сожалению фильтр по товару также не передался в маппер, из продаж выбрались все данные.
Похоже на текущий момент Hive не поддерживает pushdown предикатов из справочника на таблицу с фактами

Partition pruning

Проверим отсечение партиций при соединений через справочник.
Очень частый кейс для BI инструментов, где фильтрация происходит через календарь:
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcust
from pos_rec_itm p
join calendar c on c.calday = p.calday
where c.calmonth = '201906' and c.calday1 IN( '01', '02')
;
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1            8                0             0       17620.00       133,190        4,249              2               8
     Map 3            1                0             0        1847.00         5,030           97          3,655               2
 Reducer 2            1                0             0       11037.00         3,230           77              8               0
------------------------------------------------------------------------------------------------------------------
Time taken: 31.376 seconds, Fetched: 1 row(s)
Фильтр в продажи удачно передался и ненужные партиции откинулись из продаж
Это видно по плану - сгенерированный FIL_23 забродкастился в маппер фильтрации продаж :
Stage-0
   Fetch Operator
      limit:-1
      Stage-1
         Reducer 2
         File Output Operator [FS_12]
            compressed:false
            Statistics:Num rows: 1 Data size: 16
            Group By Operator [GBY_10]
            |  aggregations:["count(VALUE._col0)","sum(VALUE._col1)"]
            |  outputColumnNames:["_col0","_col1"]
            |  Statistics:Num rows: 1 Data size: 16
            |<-Map 1 [SIMPLE_EDGE] vectorized
               Reduce Output Operator [RS_9]
                  sort order:
                  Statistics:Num rows: 1 Data size: 16
                  value expressions:_col0 (type: bigint), _col1 (type: double)
                  Group By Operator [OP_28]
                     aggregations:["count()","sum(_col27)"]
                     outputColumnNames:["_col0","_col1"]
                     Statistics:Num rows: 1 Data size: 16
                     Map Join Operator [MAPJOIN_27]
                     |  condition map:[{"":"Inner Join 0 to 1"}]
                     |  HybridGraceHashJoin:true
                     |  keys:{"Map 3":"calday (type: string)","Map 1":"calday (type: string)"}
                     |  outputColumnNames:["_col27"]
                     |  Statistics:Num rows: 16551844040 Data size: 40150488709349
                     |<-Map 3 [BROADCAST_EDGE] vectorized
                     |  Reduce Output Operator [RS_24]
                     |     key expressions:calday (type: string)
                     |     Map-reduce partition columns:calday (type: string)
                     |     sort order:+
                     |     Statistics:Num rows: 457 Data size: 498587
                     |     Filter Operator [FIL_23]
                     |        predicate:(calday is not null and (calmonth = '201906') and (calday1) IN ('01', '02')) (type: boolean)
                     |        Statistics:Num rows: 457 Data size: 498587
                     |        TableScan [TS_1]
                     |           alias:c
                     |           Statistics:Num rows: 3655 Data size: 3987605
                     |  Dynamic Partitioning Event Operator [EVENT_20]
                     |     Statistics:Num rows: 457 Data size: 498587
                     |     Group By Operator [OP_26]
                     |        keys:_col0 (type: string)
                     |        outputColumnNames:["_col0"]
                     |        Statistics:Num rows: 457 Data size: 498587
                     |        Select Operator [OP_25]
                     |           outputColumnNames:["_col0"]
                     |           Statistics:Num rows: 457 Data size: 498587
                     |            Please refer to the previous Filter Operator [FIL_23]
                     |<-TableScan [TS_0]
                           alias:p
                           Statistics:Num rows: 15047130620 Data size: 36500443490101 Basic stats: COMPLETE Column stats: PARTIAL

Hive: BroadCast Join

Если таблица факта соединяется с достаточно небольшим справочником, то такой join может быть оптимизирован путем исключения фазы reducer и передачи хэш массива справочника во все мапперы, читающие таблицу факта для последующей фильтрации
Допустимый размер хэш массива задается параметром: hive.auto.convert.join.noconditionaltask.size
Документация: LanguageManualJoinOptimization-OptimizeChainsofMapJoins

Запустим запрос, но отключим broadcast, ограничив размер хэш массива:
set hive.auto.convert.join.noconditionaltask.size = 1;
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus
from pos_1_def p
join bi0_pmaterial m on m.material = p.material
where m.eanupc = '4615041324532';

  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           15                0             0       29683.00       517,290       17,651     31,096,951      31,096,951
     Map 4            1                0             0        3887.00        10,730          666        510,549               1
 Reducer 2          145                0             0       31143.00     1,535,900       24,726     31,096,952             145
 Reducer 3            1                0             0        8275.00        10,270          177            145               0
------------------------------------------------------------------------------------------------------------------
Time taken: 44.604 seconds, Fetched: 1 row(s)
Запрос выполнился за 44,6 секунды.
План стандартный через merge join: таблицы по отдельности читаются, сортируются, сливаются в отдельном reducer, а потом группируются.
Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
Reducer 3 <- Reducer 2 (SIMPLE_EDGE)

Stage-0
   Fetch Operator
      limit:-1
      Stage-1
         Reducer 3
         File Output Operator [FS_13]
            compressed:false
            Statistics:Num rows: 1 Data size: 16
            Group By Operator [GBY_11]
            |  aggregations:["count(VALUE._col0)","sum(VALUE._col1)"]
            |  outputColumnNames:["_col0","_col1"]
            |  Statistics:Num rows: 1 Data size: 16
            |<-Reducer 2 [SIMPLE_EDGE]
               Reduce Output Operator [RS_10]
                  sort order:
                  Statistics:Num rows: 1 Data size: 16
                  value expressions:_col0 (type: bigint), _col1 (type: double)
                  Group By Operator [GBY_9]
                     aggregations:["count()","sum(_col27)"]
                     outputColumnNames:["_col0","_col1"]
                     Statistics:Num rows: 1 Data size: 16
                     Merge Join Operator [MERGEJOIN_18]
                     |  condition map:[{"":"Inner Join 0 to 1"}]
                     |  keys:{"0":"material (type: string)","1":"material (type: string)"}
                     |  outputColumnNames:["_col27"]
                     |  Statistics:Num rows: 17103323 Data size: 40110666850
                     |<-Map 1 [SIMPLE_EDGE] vectorized
                     |  Reduce Output Operator [RS_20]
                     |     key expressions:material (type: string)
                     |     Map-reduce partition columns:material (type: string)
                     |     sort order:+
                     |     Statistics:Num rows: 15548476 Data size: 36464241801
                     |     value expressions:rtsaexcust (type: double)
                     |     Filter Operator [FIL_19]
                     |        predicate:material is not null (type: boolean)
                     |        Statistics:Num rows: 15548476 Data size: 36464241801
                     |        TableScan [TS_0]
                     |           alias:p
                     |           Statistics:Num rows: 31096951 Data size: 72928481257
                     |<-Map 4 [SIMPLE_EDGE] vectorized
                        Reduce Output Operator [RS_22]
                           key expressions:material (type: string)
                           Map-reduce partition columns:material (type: string)
                           sort order:+
                           Statistics:Num rows: 127637 Data size: 684861875
                           Filter Operator [FIL_21]
                              predicate:(material is not null and (eanupc = '4615041324532')) (type: boolean)
                              Statistics:Num rows: 127637 Data size: 684861875
                              TableScan [TS_1]
                                 alias:m
                                 Statistics:Num rows: 510549 Data size: 2739452867
Теперь выполним без ограничений:
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus
from pos_1_def p
join bi0_pmaterial m on m.material = p.material
where m.eanupc = '4615041324532';

------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           15                0             0       18323.00       228,930        7,344     31,096,952              15
     Map 3            1                0             0        2444.00         9,050          136        510,549               1
 Reducer 2            1                0             0       10164.00         2,250            0             15               0
------------------------------------------------------------------------------------------------------------------
Time taken: 22.365 seconds, Fetched: 1 row(s)
Время выполнения почти в 2 раза быстрей: 22,3 секунд.
По плану видно, что хэш массив создается до основного выполнения и передается во все мапперы, что позволяет выполнить запрос без передачи большой таблицы фактов в reducer.
Map 1 <- Map 3 (BROADCAST_EDGE)
Reducer 2 <- Map 1 (SIMPLE_EDGE)

Stage-0
   Fetch Operator
      limit:-1
      Stage-1
         Reducer 2
         File Output Operator [FS_13]
            compressed:false
            Statistics:Num rows: 1 Data size: 16
            Group By Operator [GBY_11]
            |  aggregations:["count(VALUE._col0)","sum(VALUE._col1)"]
            |  outputColumnNames:["_col0","_col1"]
            |  Statistics:Num rows: 1 Data size: 16
            |<-Map 1 [SIMPLE_EDGE] vectorized
               Reduce Output Operator [RS_10]
                  sort order:
                  Statistics:Num rows: 1 Data size: 16
                  value expressions:_col0 (type: bigint), _col1 (type: double)
                  Group By Operator [OP_24]
                     aggregations:["count()","sum(_col27)"]
                     outputColumnNames:["_col0","_col1"]
                     Statistics:Num rows: 1 Data size: 16
                     Map Join Operator [MAPJOIN_23]
                     |  condition map:[{"":"Inner Join 0 to 1"}]
                     |  HybridGraceHashJoin:true
                     |  keys:{"Map 3":"material (type: string)","Map 1":"material (type: string)"}
                     |  outputColumnNames:["_col27"]
                     |  Statistics:Num rows: 17103323 Data size: 40110666850
                     |<-Map 3 [BROADCAST_EDGE] vectorized
                     |  Reduce Output Operator [RS_21]
                     |     key expressions:material (type: string)
                     |     Map-reduce partition columns:material (type: string)
                     |     sort order:+
                     |     Statistics:Num rows: 127637 Data size: 684861875
                     |     Filter Operator [FIL_20]
                     |        predicate:(material is not null and (eanupc = '4615041324532')) (type: boolean)
                     |        Statistics:Num rows: 127637 Data size: 684861875
                     |        TableScan [TS_1]
                     |           alias:m
                     |           Statistics:Num rows: 510549 Data size: 2739452867
                     |<-Filter Operator [FIL_22]
                           predicate:material is not null (type: boolean)
                           Statistics:Num rows: 15548476 Data size: 36464241801
                           TableScan [TS_0]
                              alias:p
                              Statistics:Num rows: 31096951 Data size: 72928481257

Hive: Skew Join

Одна из распространенных проблем при выполнении параллельного join - перекос данных к одному из значений.
Перекос - это когда основной объем данных приходятся к одному уникальному значению ключа.
Тогда при выполнение hash join или merge join одно уникальное значение уходит в 1 из потоков/reducer и запрос по итогу выполняется однопоточно.
Проблема перекоса данных в Oracle описывалась раньше
Документация hive-skew: LanguageManualDDL-SkewedTables

Смоделируем перекос данных пересоздав таблицу отдав 96% данных одному значению "/bic/client" = '0010000273'
truncate table pos_1_def;
insert into pos_1_def
select ..., 
case when calday = '20190601' and material like '%9' then `/bic/client` else '0010000273' end as `/bic/client`
from pos_rec_itm
where calday between '20190601' and '20190603';
Проверим распределение:
select COUNT(*) as cnt, COUNT(case when `/bic/client` = '0010000273' then 1 end) as cnt_skew, COUNT(case when `/bic/client` = '0010000273' then 1 end) / COUNT(*) as prc 
from pos_1_def;
31096951        29926345        0.9623562451508509
Выполним запрос на таблице по умолчанию:
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus
from pos_1_def p
join bic_pclient m on m.`/bic/client` = p.`/bic/client`;
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           11                0             0       23738.00       394,730       13,020     31,096,951      31,096,951
     Map 4            8                0             0       28492.00       456,770       13,997     26,682,353      26,682,353
 Reducer 2          253                0             0      130887.00     2,667,430       47,128     57,779,304           8,541
 Reducer 3            1                0             0      104979.00         3,770            0          8,541               0
------------------------------------------------------------------------------------------------------------------
Time taken: 149.813 seconds, Fetched: 1 row(s)
150 секунд и последний job reducer 2 работал дольше всего.

Одним из способов устранения перекоса - это разбиение запроса на 2 части: перекошенное значение и все остальное:
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus
from pos_1_def p
join bic_pclient m on m.`/bic/client` = p.`/bic/client`
where p.`/bic/client` = '0010000273' AND m.`/bic/client` = '0010000273'
UNION ALL
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus
from pos_1_def p
join bic_pclient m on m.`/bic/client` = p.`/bic/client`
where p.`/bic/client` != '0010000273';

------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           11                0             0       24969.00       389,220       13,581     31,096,951      29,926,345
     Map 5            8                0             0       12357.00       121,110        8,002        400,000               1
     Map 6            4                0             0       14176.00        93,470        3,818     12,363,714       1,170,606
     Map 9            8                0             0       32404.00       469,320       15,902     26,682,353      26,682,352
 Reducer 2          253                0             0      133918.00       915,360       27,289     29,926,346             253
 Reducer 3            1                0             0      102038.00        12,060            0            253               0
 Reducer 7          253                0             0       24520.00     2,429,700       32,257     27,852,958             253
 Reducer 8            1                0             0        9178.00         1,880            0            253               0
------------------------------------------------------------------------------------------------------------------
Time taken: 152.275 seconds, Fetched: 2 row(s)
К сожалению это не дало какого то значимого результата - те же 152 секунды

Попробуем встроенную в Hive оптимизацию:
* При создании таблицы нужно указать колонку и значение с перекосом
* Пересоздадим таблицу
* Hive в этом случае разбивает данные на 2 части: подкаталог с перекошенным значением "/%2Fbic%2Fclient=0010000273/" и каталог с остальным:
SKEWED BY (`/bic/client`) ON ('0010000273') STORED AS DIRECTORIES
$ hadoop fs -du -h /apps/hive/warehouse/pos_1_def
199.7 M  /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000000_0
18.2 M   /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000001_0
197.9 M  /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000002_0
148.3 M  /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000003_0
221.1 M  /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000004_0
20.6 M   /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000005_0
219.8 M  /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000006_0
73.7 M   /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000007_0
221.3 M  /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000008_0
20.5 M   /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000009_0
159.3 M  /apps/hive/warehouse/pos_1_def/%2Fbic%2Fclient=0010000273/000010_0
22.9 M  /apps/hive/warehouse/pos_1_def/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/000000_0
2.1 M   /apps/hive/warehouse/pos_1_def/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/000001_0
22.1 M  /apps/hive/warehouse/pos_1_def/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/000002_0
16.5 M  /apps/hive/warehouse/pos_1_def/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/000003_0
И выполним запрос с включенной оптимизацией перекосов "hive.optimize.skewjoin=true":
set hive.optimize.skewjoin=true;
set hive.skewjoin.key=5000;
select COUNT(*) as cnt, SUM(p.rtsaexcust) as rtsaexcus
from pos_1_def p
join bic_pclient m on m.`/bic/client` = p.`/bic/client`;

--все также последний редюсер тормозит
------------------------------------------------------------------------------------------------------------------
  VERTICES  TOTAL_TASKS  FAILED_ATTEMPTS  KILLED_TASKS   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS  OUTPUT_RECORDS
------------------------------------------------------------------------------------------------------------------
     Map 1           15                0             0       25306.00       434,520       15,334     31,096,951      31,096,951
     Map 4            8                0             0       28176.00       427,410       12,580     26,682,353      26,682,353
 Reducer 2          253                0             0      128475.00     2,726,900       45,064     57,779,304           7,454
 Reducer 3            1                0             0      100911.00         8,820           86          7,454               0
------------------------------------------------------------------------------------------------------------------
Time taken: 142.052 seconds, Fetched: 1 row(s)
Ииии.... получаем тоже самое
Либо оптимизация не работает или я делаю что-то не так.

3ий вариант оптимизации перекоса - это генерация суррогатного ключа для перекошенного значения и делать join по суррогатному ключу, чтобы данные распределились равномерно по процессам (так делает Oracle автоматически с 12 версии)
Т.к. в моих задачах этой проблемы нет, то дальше эксперименты не проводились.

Hive 3: Materialized view

Начиная с 3 версии Hive поддерживаем материализованные представления и соответственно rewrite запросов на select из matview (hive.materializedview.rewriting).
Документация: Materialized+views

К примеру:
* Maview:
CREATE MATERIALIZED VIEW mv1
AS
SELECT empid, deptname, hire_date
FROM emps JOIN depts
  ON (emps.deptno = depts.deptno)
WHERE hire_date >= '2016-01-01';
При выполнении запроса:
SELECT empid, deptname
FROM emps
JOIN depts
  ON (emps.deptno = depts.deptno)
WHERE hire_date >= '2018-01-01'
    AND hire_date <= '2018-03-31';
Будет подменена на запрос из представления:
SELECT empid, deptname
FROM mv1
WHERE hire_date >= '2018-01-01'
    AND hire_date <= '2018-03-31';

Также Hive поддерживает инкрементальное обновление. Для этого требуется:
* Включенный ACID на таблицах источниках для трекинга измененных строк
* Включенный ACID на matview для выполнения merge в представление

Если matview устарела и не была обновлена, то rewrite автоматически отключается.
Но можно принудительно включить rewrite на промежуток времени до обновления целиком в системе:
SET hive.materializedview.rewriting.time.window=10min;
или этот же параметр может быть установлен на уровне конкретных таблиц.

Hive LLAP

Начиная с 2 версии Hive поддерживает LLAP (Live Long And Process) сервер запросов
Основные отличия от обычного Hive:
* постоянно запущенный демон, что уменьшает время старта запроса
* обращение к данным происходит через демона LLAP, что дает возможность кэширования и prefetching данных между запросами
- Минус, что под кэш нужно много Ram, по этому llap обычно отдается сервер целиком и на нем больше ничего не ставят.

Комментариев нет:

Отправить комментарий