Показаны сообщения с ярлыком parallel. Показать все сообщения
Показаны сообщения с ярлыком parallel. Показать все сообщения

четверг, 29 ноября 2018 г.

Введение в Scala и параллельную разработку

В этой статье я хотел бы охватить все аспекты работы с данными на языке Scala - от примитивов языка до параллельного программирования.

четверг, 24 марта 2016 г.

Oracle: распределение (Distrib) данных в параллельных запросах

Эта заметка осмысление статьи http://oracle-randolf.blogspot.ru/2012/12/hash-join-buffered.html по неочевидной операции HASH JOIN BUFFERED в параллельных запросах.

Основная мысль, которую надо понять при работе с параллельными запроса: "At most one data distribution can be active at the same time" - только одно распределение данных ( PQ Distrib / PX SEND ) может работать одновременно.

Это является причиной появления операций HASH JOIN BUFFERED или BUFFER SORT в параллельных планах, именно они сбивают с мысли человека привыкшего к последовательным не параллельным запросам.
Может сложиться ошибочное мнение о том что при выполнении HASH JOIN закончился hash area size и oracle свопит промежуточные данные на диск/память или делается какаято сортировка, чтобы потом выполнить merge join.

В реальности, как я уже сказал ранее, на параллельные запросы накладывается дополнительное условие: только одно распределение данных ( PQ Distrib / PX SEND ) может работать в один момент.
Это вынуждает складировать промежуточные данные других потоков до конца выполнения распределения, что является причиной появления буферных операций HASH JOIN BUFFERED и BUFFER SORT.

Рассмотрим на примере:

Создадим 2 таблицы:
create table t2
compress
as
select
        rownum as id
      , mod(rownum, 1000) + 1 as fk
      , rpad('x', 100) as filler
from
        dual
connect by
        level <= 1000000
;
exec dbms_stats.gather_table_stats(null, 't2')

-- Create a copy of T2
create table t4
compress as
select * from t2;

insert /*+ append */ into t4
select
        1000000 + rownum as id
      , 1000 + mod(rownum, 1000) + 1 as fk
      , rpad('x', 100) as filler
from
        dual
connect by
        level <= 1000000
;

commit;

exec dbms_stats.gather_table_stats(null, 't4')

alter table t2 parallel 2;

alter table t4 parallel 2;

Oracle может выполнить hash join 3 разными способами.

Способ 1 с HASH JOIN BUFFERED:
explain plan for
select * from (
select  /*+ no_merge use_hash(a b) no_cpu_costing leading(a b) pq_distribute(b, hash, hash) */
        a.filler as a_filler, b.filler as b_filler
from
        t2 a
      , t4 b
where
        a.fk = b.fk
)
where rownum > 1;
select * from table(dbms_xplan.display(format=>'ALLSTATS ALL ADVANCED'));

-----------------------------------------------------------------------------------------------------
| Id  | Operation                  | Name     | E-Rows |E-Bytes| Cost  |    TQ  |IN-OUT| PQ Distrib |
-----------------------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT           |          |   1000M|    96G| 56606 |        |      |            |
|   1 |  COUNT                     |          |        |       |       |        |      |            |
|*  2 |   FILTER                   |          |        |       |       |        |      |            |
|   3 |    PX COORDINATOR          |          |        |       |       |        |      |            |
|   4 |     PX SEND QC (RANDOM)    | :TQ10002 |   1000M|    96G| 56606 |  Q1,02 | P->S | QC (RAND)  |
|   5 |      VIEW                  |          |   1000M|    96G| 56606 |  Q1,02 | PCWP |            |
|*  6 |       HASH JOIN BUFFERED   |          |   1000M|   195G| 56606 |  Q1,02 | PCWP |            |
|   7 |        PX RECEIVE          |          |   1000K|   100M|   175 |  Q1,02 | PCWP |            |
|   8 |         PX SEND HASH       | :TQ10000 |   1000K|   100M|   175 |  Q1,00 | P->P | HASH       |
|   9 |          PX BLOCK ITERATOR |          |   1000K|   100M|   175 |  Q1,00 | PCWC |            |
|  10 |           TABLE ACCESS FULL| T2       |   1000K|   100M|   175 |  Q1,00 | PCWP |            |
|  11 |        PX RECEIVE          |          |   2000K|   200M|   359 |  Q1,02 | PCWP |            |
|  12 |         PX SEND HASH       | :TQ10001 |   2000K|   200M|   359 |  Q1,01 | P->P | HASH       |
|  13 |          PX BLOCK ITERATOR |          |   2000K|   200M|   359 |  Q1,01 | PCWC |            |
|  14 |           TABLE ACCESS FULL| T4       |   2000K|   200M|   359 |  Q1,01 | PCWP |            |
-----------------------------------------------------------------------------------------------------
Хинты заданы, чтобы точно гарантировать последовательность работы.
Через хинт pq_distribute(b, hash, hash) мы задаем способ обмена данными между потоками - в данном случае hash.

Используя мониторинг становится ясна последовательность выполнения (Timeline):
HASH JOIN BUFFERED
1. PX SEND HASH :TQ10000 - левая таблица целиком хэшируется и отправляется в :TQ10002
2. PX SEND HASH :TQ10001 - правая таблица также целиком хэшируется и отправляется в :TQ10000.
Так же до этапа HASH JOIN происходит откидывание несоответствующих хэшей правой таблицы (Randolf подтверждает это трассировкой и рассчетами на основании количества строк в правой hash таблице)
Дополнительная затраченная память отображается в столбце Memory.
3. PX SEND QC (RANDOM) :TQ10002 - происходит соединение таблиц используя хэши левой и правой таблицы + параллельная отправка промежуточных данных на следующий этап.

Такая последовательность действий обеспечивает нам 1 рабочий PX SEND одновременно.


Способ 2: HASH JOIN альтернативный план, когда хэш правой таблицы передается постепенно и не буферизуется.
Чтобы получить его, достаточно добавить агрегирующую функцию (count/min/max) - что заблокирут передачу промежуточных данных на уровень выше.
explain plan for
select count(*) from (
select  /*+ no_merge use_hash(a b) no_cpu_costing leading(a b) pq_distribute(b, hash, hash) */
        a.filler as a_filler, b.filler as b_filler
from
        t2 a
      , t4 b
where
        a.fk = b.fk
);
select * from table(dbms_xplan.display(format=>'ALLSTATS ALL ADVANCED'));

-----------------------------------------------------------------------------------------------------
| Id  | Operation                  | Name     | E-Rows |E-Bytes| Cost  |    TQ  |IN-OUT| PQ Distrib |
-----------------------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT           |          |      1 |       | 56043 |        |      |            |
|   1 |  SORT AGGREGATE            |          |      1 |       |       |        |      |            |
|   2 |   PX COORDINATOR           |          |        |       |       |        |      |            |
|   3 |    PX SEND QC (RANDOM)     | :TQ10002 |      1 |       |       |  Q1,02 | P->S | QC (RAND)  |
|   4 |     SORT AGGREGATE         |          |      1 |       |       |  Q1,02 | PCWP |            |
|   5 |      VIEW                  |          |   1000M|       | 56043 |  Q1,02 | PCWP |            |
|*  6 |       HASH JOIN            |          |   1000M|  7629M| 56043 |  Q1,02 | PCWP |            |
|   7 |        PX RECEIVE          |          |   1000K|  3906K|   175 |  Q1,02 | PCWP |            |
|   8 |         PX SEND HASH       | :TQ10000 |   1000K|  3906K|   175 |  Q1,00 | P->P | HASH       |
|   9 |          PX BLOCK ITERATOR |          |   1000K|  3906K|   175 |  Q1,00 | PCWC |            |
|  10 |           TABLE ACCESS FULL| T2       |   1000K|  3906K|   175 |  Q1,00 | PCWP |            |
|  11 |        PX RECEIVE          |          |   2000K|  7812K|   359 |  Q1,02 | PCWP |            |
|  12 |         PX SEND HASH       | :TQ10001 |   2000K|  7812K|   359 |  Q1,01 | P->P | HASH       |
|  13 |          PX BLOCK ITERATOR |          |   2000K|  7812K|   359 |  Q1,01 | PCWC |            |
|  14 |           TABLE ACCESS FULL| T4       |   2000K|  7812K|   359 |  Q1,01 | PCWP |            |
-----------------------------------------------------------------------------------------------------
hash join
1. PX SEND HASH :TQ10000 - левая таблица целиком хэшируется и отправляется в :TQ10002
2. PX SEND HASH :TQ10001 - правая таблица построчно хэшируется и отправляется в :TQ10000.
Эта операция не требует дополнительных затрат памяти.
3. HASH JOIN - происходит построчное соединение таблиц используя хэши левой и правой таблицы.
4. PX SEND QC (RANDOM) :TQ10002 - отправка результата дальше, выполняется после полного выполнения этапа 2, т.е. после полного выполнения HASH JOIN этапа 3.

Способ 3: BUFFER SORT альтернативный план, в целом похожий на HASH JOIN BUFFERED
select * from (
select  /*+ no_merge use_hash(a b) no_cpu_costing leading(a b) pq_distribute(b, none, broadcast) */
        a.filler as a_filler, b.filler as b_filler
from
        t2 a
      , t4 b
where
        a.fk = b.fk
)
where rownum > 1;

select * from table(dbms_xplan.display(format=>'ALLSTATS ALL ADVANCED'));

------------------------------------------------------------------------------------------------------
| Id  | Operation                   | Name     | E-Rows |E-Bytes| Cost  |    TQ  |IN-OUT| PQ Distrib |
------------------------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT            |          |   1000M|    96G| 56606 |        |      |            |
|   1 |  COUNT                      |          |        |       |       |        |      |            |
|*  2 |   FILTER                    |          |        |       |       |        |      |            |
|   3 |    PX COORDINATOR           |          |        |       |       |        |      |            |
|   4 |     PX SEND QC (RANDOM)     | :TQ10001 |   1000M|    96G| 56606 |  Q1,01 | P->S | QC (RAND)  |
|   5 |      VIEW                   |          |   1000M|    96G| 56606 |  Q1,01 | PCWP |            |
|*  6 |       HASH JOIN             |          |   1000M|   195G| 56606 |  Q1,01 | PCWP |            |
|   7 |        PX BLOCK ITERATOR    |          |   1000K|   100M|   175 |  Q1,01 | PCWC |            |
|   8 |         TABLE ACCESS FULL   | T2       |   1000K|   100M|   175 |  Q1,01 | PCWP |            |
|   9 |        BUFFER SORT          |          |        |       |       |  Q1,01 | PCWC |            |
|  10 |         PX RECEIVE          |          |   2000K|   200M|   359 |  Q1,01 | PCWP |            |
|  11 |          PX SEND BROADCAST  | :TQ10000 |   2000K|   200M|   359 |  Q1,00 | P->P | BROADCAST  |
|  12 |           PX BLOCK ITERATOR |          |   2000K|   200M|   359 |  Q1,00 | PCWC |            |
|  13 |            TABLE ACCESS FULL| T4       |   2000K|   200M|   359 |  Q1,00 | PCWP |            |
------------------------------------------------------------------------------------------------------
Меняем метод рассылки данных на BROADCAST - копирование данных во все потоки.
В этом случае хэши правой таблицы не создаются, вместо этого данные помещаются в BUFFER SORT и используются при выполнении HASH JOIN
buffer sort
1. PX SEND BROADCAST :TQ10000 - правая таблица целиком пересылается в BUFFER SORT
Надо заметить, что BUFFER SORT в этом случае ничего не сортирует, а только буферизирует данны.
Объем потребленной памяти = размер правой таблицы * параллелизм (из-за broadcast во все потоки).
2. HASH JOIN - происходит полное соединение таблиц.
4. PX SEND QC (RANDOM) :TQ10001 - отправка результата дальше по мере готовности.

Объем памяти в BUFFER SORT не растет во время выполнения, что подтверждает предположение - правая таблица целиком помещается в буфер сразу на первом этапе.
buffer sort 106sec


Вывод: обычный hash join лучше, когда памяти мало, т.к. нет операций с temp/памятью.
Если памяти достаточно, то hash join buffered предпочтительней, т.к. oracle может сразу перейти к следующему этапу запроса и получить большую степерь параллелизма.

пятница, 29 января 2016 г.

Oracle: оптимизация параллельных запросов

Один из простых способов ускорения запросов - это их расспараллеливание.
Это делается достаточно просто через:
* Хинт
/*+ parallel(N) */

* Через установки сессий:
alter session enable parallel dml; --для insert
ALTER SESSION FORCE PARALLEL QUERY PARALLEL N;

 - N число потоков

В теории этого достаточно, чтобы ускорить запрос в разы.
Но есть ряд ситуаций в которых параллельность наоборот мешает.

Для начала разберемся с терминологией - посмотрим на параллельный план с join 2 таблиц:
create table t_1
compress
as
select  /*+ use_nl(a b) */
        rownum as id
      , rpad('x', 100) as filler
from
        (select /*+ cardinality(1e5) */ * from dual
connect by
        level <= 1e5) a, (select /*+ cardinality(20) */ * from dual connect by level <= 20) b
;

create table t_2
compress
as
select
        rownum as id
      , case when rownum <= 5e5 then mod(rownum, 2e6) + 1 else 1 end as fk_id_skew
      , rownum as fk_id_uniform
      , rpad('x', 100) as filler
from
        (select /*+ cardinality(1e5) */ * from dual
connect by
        level <= 1e5) a, (select /*+ cardinality(20) */ * from dual connect by level <= 20) b
;

--соберем статистику
begin
  dbms_stats.gather_table_stats(user, 't_1');
  dbms_stats.gather_table_stats(user, 't_2');
end;
/
- Запрос 1

Таблица T2 имеет особенность: fk_id_skew неравномерно заполнен и имеет перекос в сторону 1 - она встречается значительно чаще других.
select COUNT(*) cnt, fk_id_skew  from t_2 GROUP BY fk_id_skew ORDER BY cnt desc;

       CNT FK_ID_SKEW
---------- ----------
   1500000          1
         1         22
         1         30
         1         34
....
- Запрос 2

Итак, выполнил простой запрос:
select count(t_2_filler) from (
select  /*+ monitor
            no_parallel
            leading(t_1 t_2)
            use_hash(t_2)
            no_swap_join_inputs(t_2)
        */
        t_1.id as t_1_id
      , t_1.filler as t_1_filler
      , t_2.id as t_2_id
      , t_2.filler as t_2_filler
from    t_1
      , t_2
where
        t_2.fk_id_uniform = t_1.id
and     regexp_replace(t_2.filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') >= regexp_replace(t_1.filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c')
);
- Запрос 3

* regexp_replace в этом запросе нужен, чтобы данные отбирались не мгновенно и были видны в статистике затраты CPU.
* Хинты вставлены, чтобы запрос в плане выглядел также как написан тут.

Время выполнения выполнения запроса = 49сек.

Добавим хинт parallel(8) замен no_parallel.
Время выполнения = , что в 6 раз быстрей.
Разберем для понимания план запроса:
SELECT * FROM TABLE(DBMS_XPLAN.DISPLAY(format=>'PARALLEL'));

-------------------------------------------------------------------------------------------------------------------
| Id  | Operation                 | Name     | Rows  | Bytes | Cost (%CPU)| Time     |    TQ  |IN-OUT| PQ Distrib |
-------------------------------------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT          |          |     1 |   213 |   619   (2)| 00:00:08 |        |      |            |
|   1 |  SORT AGGREGATE           |          |     1 |   213 |            |          |        |      |            |
|   2 |   PX COORDINATOR          |          |       |       |            |          |        |      |            |
|   3 |    PX SEND QC (RANDOM)    | :TQ10002 |     1 |   213 |            |          |  Q1,02 | P->S | QC (RAND)  |
|   4 |     SORT AGGREGATE        |          |     1 |   213 |            |          |  Q1,02 | PCWP |            |
|*  5 |      HASH JOIN            |          |   100K|    20M|   619   (2)| 00:00:08 |  Q1,02 | PCWP |            |
|   6 |       PX RECEIVE          |          |  2000K|   202M|   245   (2)| 00:00:03 |  Q1,02 | PCWP |            |
|   7 |        PX SEND HASH       | :TQ10000 |  2000K|   202M|   245   (2)| 00:00:03 |  Q1,00 | P->P | HASH       |
|   8 |         PX BLOCK ITERATOR |          |  2000K|   202M|   245   (2)| 00:00:03 |  Q1,00 | PCWC |            |
|   9 |          TABLE ACCESS FULL| T_1      |  2000K|   202M|   245   (2)| 00:00:03 |  Q1,00 | PCWP |            |
|  10 |       PX RECEIVE          |          |  2000K|   204M|   371   (1)| 00:00:05 |  Q1,02 | PCWP |            |
|  11 |        PX SEND HASH       | :TQ10001 |  2000K|   204M|   371   (1)| 00:00:05 |  Q1,01 | P->P | HASH       |
|  12 |         PX BLOCK ITERATOR |          |  2000K|   204M|   371   (1)| 00:00:05 |  Q1,01 | PCWC |            |
|  13 |          TABLE ACCESS FULL| T_2      |  2000K|   204M|   371   (1)| 00:00:05 |  Q1,01 | PCWP |            |
-------------------------------------------------------------------------------------------------------------------
- План 1

Основопологающие фазы:
* PX BLOCK ITERATOR - чтение таблицы частями в несколько потоков
* PX SEND - 1 поток посылает данные другому. Важно знать, что только один producer (PX SEND) может быть активен в одно время, что накладывает ограничения на параллельный план выполнения, подробней: Вторая часть по распределению данных в параллельных запросах
 ** RANGE - данные будут разбиты на диапазоны (часто при сортировке)
 ** HASH - диапазон данных на основе их хэша (hash join, group by)
 ** RANDOM - случайная отправка
 ** BROADCAST - отправка таблицы во все потоки (часто на маленькой таблице, совместно с последующей ROUND ROBIN правой таблицы. Может быть проблемой производительности, если левая таблица значительно больше, чем указано в статистике, т.к. данные дублируются во все потоки)
 ** ROUND ROBIN - данные отправляются в потоки по кругу

Про способы распределения данных по потокам нужно поговорить отдельно:

Стоит заметить, что данные бьются по значениям в столбцах строк, а не просто по строкам.
Это нужно, чтобы один и тотже диапозон данных из разных таблиц попал в один поток для join.
Если бы Oracle делал не так, то в 1 поток могли бы попасть совершенно разные данные и join нельзя было бы совершить.
На это стоит обратить внимание, т.к. это может являться и причиной замедлений выполнения параллельного запроса при сильном перекосе данных (О причинах замделенния параллельных запросов дальше)

 ** P->P - данные из одной параллельной группы передаются в другую параллельную группу
 ** P->S - параллельность в последовательное выполнение (узкое место или конец запроса - вторая из основных причин замедления параллельного запроса)
 ** PCWP - параллельность с родителем: сканируем таблицу и сразу делаем join с другой
 ** PCWC - наоборот: передаем фильтр из внешнего потока и применяем при сканировании
* PX RECEIVE - получение данных из одного параллельного потока в другой
* PX SEND QC - отправка данных координатору
* PX COORDINATOR - приемник всех параллельных запросов
* TQ - Номер потока

Мы рассмотрели идеальный случай распараллеленого запроса. Остановимся подробней на причинах замеделений:
1. Событие "P->S - параллельность в последовательное выполнение"
2. PX SEND skew - Перекос данных
и ускорения:
3. Bloom filter
4. Partition wise