Показаны сообщения с ярлыком parallel. Показать все сообщения
Показаны сообщения с ярлыком parallel. Показать все сообщения
четверг, 20 июня 2024 г.
четверг, 29 ноября 2018 г.
Введение в Scala и параллельную разработку
Метки:
bigdata,
functional,
parallel,
scala
В этой статье я хотел бы охватить все аспекты работы с данными на языке Scala - от примитивов языка до параллельного программирования.
-
Введение в Scala
- Переменные
- Операторы
- Условия
- Вывод в консоль
- Циклы
- Функции
- Процедуры
- Ленивые переменные
- Исключения
- Массивы
- Ассоциативные массивы
- Кортежи (tuples)
- Классы
- Объекты
- Пакеты и импортирование
- Наследование
- Файлы и регулярные выражения
- Трейты (интерфейсы)
- Операторы
- Функции высшего порядка
- Коллекции
- Сопоставление с образцом
- Аннотации
- Обработка XML
- Обобщенные типы
- Дополнительные типы
- Неявные преобразования
- Динамическое программирование
- Конкурентное программирование Scala
- Молниеносный анализ Spark
понедельник, 30 мая 2016 г.
Том Кайт: Oracle для профессионалов
Третье издание книги Тома Кайта: Oracle для профессионалов, включающее особенности Oracle 12.
1-9. 1-9 главы - внутренняя работа субд oracle
10. Глава 10 - таблицы
11. Глава 11 - индексы
12. Глава 12 - типы данных
13. Глава 13 - секционирование
14. Глава 14 - параллельное выполнение
15. Глава 15 - загрузка и выгрузка данных
1-9. 1-9 главы - внутренняя работа субд oracle
10. Глава 10 - таблицы
11. Глава 11 - индексы
12. Глава 12 - типы данных
13. Глава 13 - секционирование
14. Глава 14 - параллельное выполнение
15. Глава 15 - загрузка и выгрузка данных
четверг, 24 марта 2016 г.
Oracle: распределение (Distrib) данных в параллельных запросах
Основная мысль, которую надо понять при работе с параллельными запроса: "At most one data distribution can be active at the same time" - только одно распределение данных ( PQ Distrib / PX SEND ) может работать одновременно.
Это является причиной появления операций HASH JOIN BUFFERED или BUFFER SORT в параллельных планах, именно они сбивают с мысли человека привыкшего к последовательным не параллельным запросам.
Может сложиться ошибочное мнение о том что при выполнении HASH JOIN закончился hash area size и oracle свопит промежуточные данные на диск/память или делается какаято сортировка, чтобы потом выполнить merge join.
В реальности, как я уже сказал ранее, на параллельные запросы накладывается дополнительное условие: только одно распределение данных ( PQ Distrib / PX SEND ) может работать в один момент.
Это вынуждает складировать промежуточные данные других потоков до конца выполнения распределения, что является причиной появления буферных операций HASH JOIN BUFFERED и BUFFER SORT.
Рассмотрим на примере:
Создадим 2 таблицы:
create table t2 compress as select rownum as id , mod(rownum, 1000) + 1 as fk , rpad('x', 100) as filler from dual connect by level <= 1000000 ; exec dbms_stats.gather_table_stats(null, 't2') -- Create a copy of T2 create table t4 compress as select * from t2; insert /*+ append */ into t4 select 1000000 + rownum as id , 1000 + mod(rownum, 1000) + 1 as fk , rpad('x', 100) as filler from dual connect by level <= 1000000 ; commit; exec dbms_stats.gather_table_stats(null, 't4') alter table t2 parallel 2; alter table t4 parallel 2;
Oracle может выполнить hash join 3 разными способами.
Способ 1 с HASH JOIN BUFFERED:
explain plan for select * from ( select /*+ no_merge use_hash(a b) no_cpu_costing leading(a b) pq_distribute(b, hash, hash) */ a.filler as a_filler, b.filler as b_filler from t2 a , t4 b where a.fk = b.fk ) where rownum > 1; select * from table(dbms_xplan.display(format=>'ALLSTATS ALL ADVANCED')); ----------------------------------------------------------------------------------------------------- | Id | Operation | Name | E-Rows |E-Bytes| Cost | TQ |IN-OUT| PQ Distrib | ----------------------------------------------------------------------------------------------------- | 0 | SELECT STATEMENT | | 1000M| 96G| 56606 | | | | | 1 | COUNT | | | | | | | | |* 2 | FILTER | | | | | | | | | 3 | PX COORDINATOR | | | | | | | | | 4 | PX SEND QC (RANDOM) | :TQ10002 | 1000M| 96G| 56606 | Q1,02 | P->S | QC (RAND) | | 5 | VIEW | | 1000M| 96G| 56606 | Q1,02 | PCWP | | |* 6 | HASH JOIN BUFFERED | | 1000M| 195G| 56606 | Q1,02 | PCWP | | | 7 | PX RECEIVE | | 1000K| 100M| 175 | Q1,02 | PCWP | | | 8 | PX SEND HASH | :TQ10000 | 1000K| 100M| 175 | Q1,00 | P->P | HASH | | 9 | PX BLOCK ITERATOR | | 1000K| 100M| 175 | Q1,00 | PCWC | | | 10 | TABLE ACCESS FULL| T2 | 1000K| 100M| 175 | Q1,00 | PCWP | | | 11 | PX RECEIVE | | 2000K| 200M| 359 | Q1,02 | PCWP | | | 12 | PX SEND HASH | :TQ10001 | 2000K| 200M| 359 | Q1,01 | P->P | HASH | | 13 | PX BLOCK ITERATOR | | 2000K| 200M| 359 | Q1,01 | PCWC | | | 14 | TABLE ACCESS FULL| T4 | 2000K| 200M| 359 | Q1,01 | PCWP | | -----------------------------------------------------------------------------------------------------Хинты заданы, чтобы точно гарантировать последовательность работы.
Через хинт pq_distribute(b, hash, hash) мы задаем способ обмена данными между потоками - в данном случае hash.
Используя мониторинг становится ясна последовательность выполнения (Timeline):

1. PX SEND HASH :TQ10000 - левая таблица целиком хэшируется и отправляется в :TQ10002
2. PX SEND HASH :TQ10001 - правая таблица также целиком хэшируется и отправляется в :TQ10000.
Так же до этапа HASH JOIN происходит откидывание несоответствующих хэшей правой таблицы (Randolf подтверждает это трассировкой и рассчетами на основании количества строк в правой hash таблице)
Дополнительная затраченная память отображается в столбце Memory.
3. PX SEND QC (RANDOM) :TQ10002 - происходит соединение таблиц используя хэши левой и правой таблицы + параллельная отправка промежуточных данных на следующий этап.
Такая последовательность действий обеспечивает нам 1 рабочий PX SEND одновременно.
Способ 2: HASH JOIN альтернативный план, когда хэш правой таблицы передается постепенно и не буферизуется.
Чтобы получить его, достаточно добавить агрегирующую функцию (count/min/max) - что заблокирут передачу промежуточных данных на уровень выше.
explain plan for select count(*) from ( select /*+ no_merge use_hash(a b) no_cpu_costing leading(a b) pq_distribute(b, hash, hash) */ a.filler as a_filler, b.filler as b_filler from t2 a , t4 b where a.fk = b.fk ); select * from table(dbms_xplan.display(format=>'ALLSTATS ALL ADVANCED')); ----------------------------------------------------------------------------------------------------- | Id | Operation | Name | E-Rows |E-Bytes| Cost | TQ |IN-OUT| PQ Distrib | ----------------------------------------------------------------------------------------------------- | 0 | SELECT STATEMENT | | 1 | | 56043 | | | | | 1 | SORT AGGREGATE | | 1 | | | | | | | 2 | PX COORDINATOR | | | | | | | | | 3 | PX SEND QC (RANDOM) | :TQ10002 | 1 | | | Q1,02 | P->S | QC (RAND) | | 4 | SORT AGGREGATE | | 1 | | | Q1,02 | PCWP | | | 5 | VIEW | | 1000M| | 56043 | Q1,02 | PCWP | | |* 6 | HASH JOIN | | 1000M| 7629M| 56043 | Q1,02 | PCWP | | | 7 | PX RECEIVE | | 1000K| 3906K| 175 | Q1,02 | PCWP | | | 8 | PX SEND HASH | :TQ10000 | 1000K| 3906K| 175 | Q1,00 | P->P | HASH | | 9 | PX BLOCK ITERATOR | | 1000K| 3906K| 175 | Q1,00 | PCWC | | | 10 | TABLE ACCESS FULL| T2 | 1000K| 3906K| 175 | Q1,00 | PCWP | | | 11 | PX RECEIVE | | 2000K| 7812K| 359 | Q1,02 | PCWP | | | 12 | PX SEND HASH | :TQ10001 | 2000K| 7812K| 359 | Q1,01 | P->P | HASH | | 13 | PX BLOCK ITERATOR | | 2000K| 7812K| 359 | Q1,01 | PCWC | | | 14 | TABLE ACCESS FULL| T4 | 2000K| 7812K| 359 | Q1,01 | PCWP | | -----------------------------------------------------------------------------------------------------

1. PX SEND HASH :TQ10000 - левая таблица целиком хэшируется и отправляется в :TQ10002
2. PX SEND HASH :TQ10001 - правая таблица построчно хэшируется и отправляется в :TQ10000.
Эта операция не требует дополнительных затрат памяти.
3. HASH JOIN - происходит построчное соединение таблиц используя хэши левой и правой таблицы.
4. PX SEND QC (RANDOM) :TQ10002 - отправка результата дальше, выполняется после полного выполнения этапа 2, т.е. после полного выполнения HASH JOIN этапа 3.
Способ 3: BUFFER SORT альтернативный план, в целом похожий на HASH JOIN BUFFERED
select * from ( select /*+ no_merge use_hash(a b) no_cpu_costing leading(a b) pq_distribute(b, none, broadcast) */ a.filler as a_filler, b.filler as b_filler from t2 a , t4 b where a.fk = b.fk ) where rownum > 1; select * from table(dbms_xplan.display(format=>'ALLSTATS ALL ADVANCED')); ------------------------------------------------------------------------------------------------------ | Id | Operation | Name | E-Rows |E-Bytes| Cost | TQ |IN-OUT| PQ Distrib | ------------------------------------------------------------------------------------------------------ | 0 | SELECT STATEMENT | | 1000M| 96G| 56606 | | | | | 1 | COUNT | | | | | | | | |* 2 | FILTER | | | | | | | | | 3 | PX COORDINATOR | | | | | | | | | 4 | PX SEND QC (RANDOM) | :TQ10001 | 1000M| 96G| 56606 | Q1,01 | P->S | QC (RAND) | | 5 | VIEW | | 1000M| 96G| 56606 | Q1,01 | PCWP | | |* 6 | HASH JOIN | | 1000M| 195G| 56606 | Q1,01 | PCWP | | | 7 | PX BLOCK ITERATOR | | 1000K| 100M| 175 | Q1,01 | PCWC | | | 8 | TABLE ACCESS FULL | T2 | 1000K| 100M| 175 | Q1,01 | PCWP | | | 9 | BUFFER SORT | | | | | Q1,01 | PCWC | | | 10 | PX RECEIVE | | 2000K| 200M| 359 | Q1,01 | PCWP | | | 11 | PX SEND BROADCAST | :TQ10000 | 2000K| 200M| 359 | Q1,00 | P->P | BROADCAST | | 12 | PX BLOCK ITERATOR | | 2000K| 200M| 359 | Q1,00 | PCWC | | | 13 | TABLE ACCESS FULL| T4 | 2000K| 200M| 359 | Q1,00 | PCWP | | ------------------------------------------------------------------------------------------------------Меняем метод рассылки данных на BROADCAST - копирование данных во все потоки.
В этом случае хэши правой таблицы не создаются, вместо этого данные помещаются в BUFFER SORT и используются при выполнении HASH JOIN

1. PX SEND BROADCAST :TQ10000 - правая таблица целиком пересылается в BUFFER SORT
Надо заметить, что BUFFER SORT в этом случае ничего не сортирует, а только буферизирует данны.
Объем потребленной памяти = размер правой таблицы * параллелизм (из-за broadcast во все потоки).
2. HASH JOIN - происходит полное соединение таблиц.
4. PX SEND QC (RANDOM) :TQ10001 - отправка результата дальше по мере готовности.
Объем памяти в BUFFER SORT не растет во время выполнения, что подтверждает предположение - правая таблица целиком помещается в буфер сразу на первом этапе.

Вывод: обычный hash join лучше, когда памяти мало, т.к. нет операций с temp/памятью.
Если памяти достаточно, то hash join buffered предпочтительней, т.к. oracle может сразу перейти к следующему этапу запроса и получить большую степерь параллелизма.
пятница, 29 января 2016 г.
Oracle: оптимизация параллельных запросов
Один из простых способов ускорения запросов - это их расспараллеливание.
Это делается достаточно просто через:
* Хинт
* Через установки сессий:
- N число потоков
В теории этого достаточно, чтобы ускорить запрос в разы.
Но есть ряд ситуаций в которых параллельность наоборот мешает.
Для начала разберемся с терминологией - посмотрим на параллельный план с join 2 таблиц:
Таблица T2 имеет особенность: fk_id_skew неравномерно заполнен и имеет перекос в сторону 1 - она встречается значительно чаще других.
Итак, выполнил простой запрос:
* regexp_replace в этом запросе нужен, чтобы данные отбирались не мгновенно и были видны в статистике затраты CPU.
* Хинты вставлены, чтобы запрос в плане выглядел также как написан тут.
Время выполнения выполнения запроса = 49сек.
Добавим хинт parallel(8) замен no_parallel.
Время выполнения = 8с, что в 6 раз быстрей.
Разберем для понимания план запроса:
Основопологающие фазы:
* PX BLOCK ITERATOR - чтение таблицы частями в несколько потоков
* PX SEND - 1 поток посылает данные другому. Важно знать, что только один producer (PX SEND) может быть активен в одно время, что накладывает ограничения на параллельный план выполнения, подробней: Вторая часть по распределению данных в параллельных запросах
** RANGE - данные будут разбиты на диапазоны (часто при сортировке)
** HASH - диапазон данных на основе их хэша (hash join, group by)
** RANDOM - случайная отправка
** BROADCAST - отправка таблицы во все потоки (часто на маленькой таблице, совместно с последующей ROUND ROBIN правой таблицы. Может быть проблемой производительности, если левая таблица значительно больше, чем указано в статистике, т.к. данные дублируются во все потоки)
** ROUND ROBIN - данные отправляются в потоки по кругу
Про способы распределения данных по потокам нужно поговорить отдельно:
** P->S - параллельность в последовательное выполнение (узкое место или конец запроса - вторая из основных причин замедления параллельного запроса)
** PCWP - параллельность с родителем: сканируем таблицу и сразу делаем join с другой
** PCWC - наоборот: передаем фильтр из внешнего потока и применяем при сканировании
* PX RECEIVE - получение данных из одного параллельного потока в другой
* PX SEND QC - отправка данных координатору
* PX COORDINATOR - приемник всех параллельных запросов
* TQ - Номер потока
Мы рассмотрели идеальный случай распараллеленого запроса. Остановимся подробней на причинах замеделений:
1. Событие "P->S - параллельность в последовательное выполнение"
2. PX SEND skew - Перекос данных
и ускорения:
3. Bloom filter
4. Partition wise
Это делается достаточно просто через:
* Хинт
/*+ parallel(N) */
* Через установки сессий:
alter session enable parallel dml; --для insert ALTER SESSION FORCE PARALLEL QUERY PARALLEL N;
- N число потоков
В теории этого достаточно, чтобы ускорить запрос в разы.
Но есть ряд ситуаций в которых параллельность наоборот мешает.
Для начала разберемся с терминологией - посмотрим на параллельный план с join 2 таблиц:
create table t_1 compress as select /*+ use_nl(a b) */ rownum as id , rpad('x', 100) as filler from (select /*+ cardinality(1e5) */ * from dual connect by level <= 1e5) a, (select /*+ cardinality(20) */ * from dual connect by level <= 20) b ; create table t_2 compress as select rownum as id , case when rownum <= 5e5 then mod(rownum, 2e6) + 1 else 1 end as fk_id_skew , rownum as fk_id_uniform , rpad('x', 100) as filler from (select /*+ cardinality(1e5) */ * from dual connect by level <= 1e5) a, (select /*+ cardinality(20) */ * from dual connect by level <= 20) b ; --соберем статистику begin dbms_stats.gather_table_stats(user, 't_1'); dbms_stats.gather_table_stats(user, 't_2'); end; /- Запрос 1
Таблица T2 имеет особенность: fk_id_skew неравномерно заполнен и имеет перекос в сторону 1 - она встречается значительно чаще других.
select COUNT(*) cnt, fk_id_skew from t_2 GROUP BY fk_id_skew ORDER BY cnt desc; CNT FK_ID_SKEW ---------- ---------- 1500000 1 1 22 1 30 1 34 ....- Запрос 2
Итак, выполнил простой запрос:
select count(t_2_filler) from ( select /*+ monitor no_parallel leading(t_1 t_2) use_hash(t_2) no_swap_join_inputs(t_2) */ t_1.id as t_1_id , t_1.filler as t_1_filler , t_2.id as t_2_id , t_2.filler as t_2_filler from t_1 , t_2 where t_2.fk_id_uniform = t_1.id and regexp_replace(t_2.filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') >= regexp_replace(t_1.filler, '^\s+([[:alnum:]]+)\s+$', lpad('\1', 10), 1, 1, 'c') );- Запрос 3
* regexp_replace в этом запросе нужен, чтобы данные отбирались не мгновенно и были видны в статистике затраты CPU.
* Хинты вставлены, чтобы запрос в плане выглядел также как написан тут.
Время выполнения выполнения запроса = 49сек.
Добавим хинт parallel(8) замен no_parallel.
Время выполнения = 8с, что в 6 раз быстрей.
Разберем для понимания план запроса:
SELECT * FROM TABLE(DBMS_XPLAN.DISPLAY(format=>'PARALLEL')); ------------------------------------------------------------------------------------------------------------------- | Id | Operation | Name | Rows | Bytes | Cost (%CPU)| Time | TQ |IN-OUT| PQ Distrib | ------------------------------------------------------------------------------------------------------------------- | 0 | SELECT STATEMENT | | 1 | 213 | 619 (2)| 00:00:08 | | | | | 1 | SORT AGGREGATE | | 1 | 213 | | | | | | | 2 | PX COORDINATOR | | | | | | | | | | 3 | PX SEND QC (RANDOM) | :TQ10002 | 1 | 213 | | | Q1,02 | P->S | QC (RAND) | | 4 | SORT AGGREGATE | | 1 | 213 | | | Q1,02 | PCWP | | |* 5 | HASH JOIN | | 100K| 20M| 619 (2)| 00:00:08 | Q1,02 | PCWP | | | 6 | PX RECEIVE | | 2000K| 202M| 245 (2)| 00:00:03 | Q1,02 | PCWP | | | 7 | PX SEND HASH | :TQ10000 | 2000K| 202M| 245 (2)| 00:00:03 | Q1,00 | P->P | HASH | | 8 | PX BLOCK ITERATOR | | 2000K| 202M| 245 (2)| 00:00:03 | Q1,00 | PCWC | | | 9 | TABLE ACCESS FULL| T_1 | 2000K| 202M| 245 (2)| 00:00:03 | Q1,00 | PCWP | | | 10 | PX RECEIVE | | 2000K| 204M| 371 (1)| 00:00:05 | Q1,02 | PCWP | | | 11 | PX SEND HASH | :TQ10001 | 2000K| 204M| 371 (1)| 00:00:05 | Q1,01 | P->P | HASH | | 12 | PX BLOCK ITERATOR | | 2000K| 204M| 371 (1)| 00:00:05 | Q1,01 | PCWC | | | 13 | TABLE ACCESS FULL| T_2 | 2000K| 204M| 371 (1)| 00:00:05 | Q1,01 | PCWP | | -------------------------------------------------------------------------------------------------------------------- План 1
Основопологающие фазы:
* PX BLOCK ITERATOR - чтение таблицы частями в несколько потоков
* PX SEND - 1 поток посылает данные другому. Важно знать, что только один producer (PX SEND) может быть активен в одно время, что накладывает ограничения на параллельный план выполнения, подробней: Вторая часть по распределению данных в параллельных запросах
** RANGE - данные будут разбиты на диапазоны (часто при сортировке)
** HASH - диапазон данных на основе их хэша (hash join, group by)
** RANDOM - случайная отправка
** BROADCAST - отправка таблицы во все потоки (часто на маленькой таблице, совместно с последующей ROUND ROBIN правой таблицы. Может быть проблемой производительности, если левая таблица значительно больше, чем указано в статистике, т.к. данные дублируются во все потоки)
** ROUND ROBIN - данные отправляются в потоки по кругу
Про способы распределения данных по потокам нужно поговорить отдельно:
Стоит заметить, что данные бьются по значениям в столбцах строк, а не просто по строкам.
Это нужно, чтобы один и тотже диапозон данных из разных таблиц попал в один поток для join.
Если бы Oracle делал не так, то в 1 поток могли бы попасть совершенно разные данные и join нельзя было бы совершить.
На это стоит обратить внимание, т.к. это может являться и причиной замедлений выполнения параллельного запроса при сильном перекосе данных (О причинах замделенния параллельных запросов дальше)
** P->S - параллельность в последовательное выполнение (узкое место или конец запроса - вторая из основных причин замедления параллельного запроса)
** PCWP - параллельность с родителем: сканируем таблицу и сразу делаем join с другой
** PCWC - наоборот: передаем фильтр из внешнего потока и применяем при сканировании
* PX RECEIVE - получение данных из одного параллельного потока в другой
* PX SEND QC - отправка данных координатору
* PX COORDINATOR - приемник всех параллельных запросов
* TQ - Номер потока
Мы рассмотрели идеальный случай распараллеленого запроса. Остановимся подробней на причинах замеделений:
1. Событие "P->S - параллельность в последовательное выполнение"
2. PX SEND skew - Перекос данных
и ускорения:
3. Bloom filter
4. Partition wise
Подписаться на:
Сообщения (Atom)