понедельник, 4 февраля 2030 г.

Скахин Алексей / pihel

Скахин Алексей, pihel Биография:
Родился в Вологде 9 марта 1987 года. В 2009 году окончил ВоГУ по специальности программное обеспечение. Сейчас проживаю в Санкт-Петербурге.
В свободное от работы время люблю играть в волейбол, кататься на горных лыжах и велосипеде, путешествовать на машине.

Места работы:
6. Лента: performance specialist: Sap Abap, Oracle, Hadoop, 2016 - ...
5. Сигма: разработчик баз данных Oracle ( sql, pl/sql, Oracle BI, dwh ), 2015 - 2016
4. Tops Consulting: разработчик баз данных Oracle ( sql, pl/sql ), 2013 - 2015
3. Макси: разработка системы управления предприятием, программист (C++, Oracle, Pl/Sql) 2010-2013
2. Rstyle Softlab ОПР ДСУП: разработка системы управления предприятием, старший программист (rsl/vbs/fast report/ms sql) 2008-2010
1. ВНКЦ ЦЭМИ РАН: разработка внутренней информационной системы (php/js/mysql) 2007-2008 г.
0. Фриланс - web направление 2003-...

Skills:
1. Oracle development and performace tuning (OLTP and DWH)
2. Hadoop BigData Engineer: Hadoop deploy; Spark, Scala, Hive develop, Oozie workflow
Python, Bash for small scripts
3. Sap, Abap perfomance managment: tracing, root cause analyses
4. Sometimes Web/php development

Мои любимые статьи:
Внутреннее устройство Oracle: Оптимизация запросов Oracle: Другое:

суббота, 18 мая 2019 г.

Oracle 18: новые возможности для разработчика

Private Temporary Tables
Private temporary tables удаляются вконце транзакции или сессии. Такие таблицы хранятся в памяти и видимы только внутри одной сессии (аналог mssql временных таблиц).
CREATE PRIVATE TEMPORARY TABLE ORA$PTT_sales_ptt_transaction
    (time_id      DATE,
     amount_sold  NUMBER(10,2))
   ON COMMIT DROP DEFINITION;

User-Defined Sharding Method
LIST и RANGE партиции теперь можно вручнуть положить на нужный шард.
CREATE TABLESPACE ts1 IN SHARDSPACE west;
CREATE TABLESPACE ts2 IN SHARDSPACE central;

CREATE SHARDED TABLE accounts
( id             NUMBER
, account_number NUMBER
, customer_id    NUMBER
, state          VARCHAR(2) NOT NULL
)
PARTITION BY LIST (state)
( PARTITION p_northwest VALUES ('OR', 'WA') TABLESPACE ts1
, PARTITION p_southwest VALUES ('AZ', 'UT', 'NM') TABLESPACE ts2
)
;

Уровни консистенции для запросов к нескольким шардам
Можно установить уровень консистенции для запроса с нескольких шардов.
* STRONG - полная консистенция до SCN (по умолчанию)
* SHARD_LOCAL - консистенция SCN на уровне шарда. Если допустимо, то должно ускорить запрос.
* DELAYED_STANDBY_ALLOWED - возможность забирать данные из STANDBY , где данные могут быть с задержкой

Oracle RAC Sharding
Oracle RAC Sharding может связывать партиции таблицы с инстанциями Oracle RAC. И запрос с указанным ключем партицирования перенаправляется на нужную инстанцию, которая связанна с определенным шардом.

Analytic View FILTER FACT and ADD MEASURE Keywords
Запросы, которые читают аналитические представления, могут включать ключевые слова FILTER FACT для фильтрации данных до любых вычислений и ADD MEASURES для определения дополнительных вычисляемых показателей запроса.
https://docs.oracle.com/en/database/oracle/oracle-database/18/dwhsg/overview-analytic-views.html
https://docs.oracle.com/en/database/oracle/oracle-database/18/dwhsg/analytic-view-objects.html
CREATE OR REPLACE ANALYTIC VIEW sales_av
USING sales_fact
DIMENSION BY
  (time_attr_dim
    KEY month_id REFERENCES month_id
    HIERARCHIES (
      time_hier DEFAULT,
      time_season_hier),
   product_attr_dim
    KEY category_id REFERENCES category_id
    HIERARCHIES (
      product_hier DEFAULT),
   geography_attr_dim
    KEY state_province_id 
    REFERENCES state_province_id
    HIERARCHIES (
      geography_hier DEFAULT)
   )
MEASURES
 (sales FACT sales,
  units FACT units
  )
DEFAULT MEASURE SALES;

SELECT time_hier.member_name, TO_CHAR(sales, '999,999,999,999') AS sales 
  FROM sales_av HIERARCHIES(time_hier)
  WHERE time_hier.level_name = 'YEAR'
  ORDER BY time_hier.hier_order;
  
 MEMBER_NAME          SALES
-----------  -------------
CY2011       6,755,115,981
CY2012       6,901,682,399
CY2013       7,240,938,718
CY2014       7,579,746,353
CY2015       7,941,102,885

SELECT time_hier.member_name, TO_CHAR(sales, '999,999,999,999') AS sales
  FROM ANALYTIC VIEW (                      -- inline analytic view
  USING sales_av HIERARCHIES(time_hier)
  FILTER FACT (time_hier TO level_name = 'MONTH'
               AND TO_CHAR(month_end_date, 'Q') IN (1, 2)
               )
  )
  WHERE time_hier.level_name = 'YEAR')
  ORDER BY time_hier.hier_order;
  
MEMBER_NAME          SALES
-----------  -------------
CY2011       6,755,115,981
CY2012       6,901,682,399
CY2013       7,240,938,718
CY2014       7,579,746,353
CY2015       7,941,102,885

ALTER SYSTEM CANCEL SQL
Отключение SQL запроса без отключения сессии.

Inline External Tables
Возможность читать external данные без создания таблицы:
SELECT * FROM   EXTERNAL (   
    (time_id        DATE NOT NULL,     
     prod_id        INTEGER NOT NULL,
     quantity_sold  NUMBER(10,2),
     amount_sold    NUMBER(10,2))     
    TYPE ORACLE_LOADER     
    DEFAULT DIRECTORY data_dir1
    ACCESS PARAMETERS (
      RECORDS DELIMITED BY NEWLINE
      FIELDS TERMINATED BY '|')     
   LOCATION ('sales_9.csv') REJECT LIMIT UNLIMITED) sales_external;
Также external таблицы теперь можно партицировать
External таблицы теперь можно помещать в In-Memory

Polymorphic Tables
Функция принимает на вход таблицу и возвращает таблицу
Передача таблицы в параметр:
FUNCTION skip_col(tab TABLE, 
чтение данных из функции:
SELECT * FROM noop(emp);
синтаксис

Parallel Partition-Wise SQL Operations
SELECT DISTINCT, Window - теперь могут параллелиться, если идут в по колонке партиции

Modifying the Partitioning
Теперь стратегию партицирования можно менять налету: ALTER TABLE MODIFY PARTITION SQL

Улучшение SQL Tuning Advisor для Exadata
SQL Tuning Advisor получил дополнительные алгоритмы для оптимизации запросов для Oracle Exadata.

Concurrent SQL Execution with SQL Performance Analyzer
Теперь можно имитировать параллельную нагрузку от запросов в tuning set (раньше они всегда выполнялись последовательно)

Automatic In-Memory
Данные в In-Memory используют тепловые карты, статистику колонок и прочее для загрузки или выгрузки данных из Inmem.

Scalable Sequences
В начало последовательности добавляется префикс:
6 digit scalable sequence offset number = 3 digit instance offset number || 3 digit session offset number.
что разбрасывает данные равномерно и решает проблему конкуренции за крайнии блоки индекса.

Memoptimized Rowstore
inmemory k-v хранилище, для быстрого доступа по PK к очень популярным таблицам
ALTER TABLE sh.sales MEMOPTIMIZE FOR READ;
execute DBMS_MEMOPTIMIZE.POPULATE('OE','ORDERS'); 

понедельник, 18 февраля 2019 г.

RBTree - красно-черное дерево

Красно-чёрное дерево (Red-black tree, RB-Tree) — самобалансирующееся двоичное дерево поиска, гарантирующих логарифмический рост высоты от числа узлов.

Дополнительные требования к двоичному дереву:
1. Узел либо красный, либо чёрный
2. Корень — чёрный
3. Все листья (NIL) — чёрные (листья не содержат данных)
4. Оба потомка каждого красного узла — чёрные.
5. Всякий простой путь от данного узла до любого листового узла, являющегося его потомком, содержит одинаковое число чёрных узлов.
Эти ограничения реализуют главное свойство красно-чёрных деревьев: путь от корня до самого дальнего листа не более чем в два раза длиннее пути от корня до ближайшего листа.
Результатом является то, что дерево примерно сбалансировано.

Реализацию красно-черного дерева на java можно посмотреть тут: RBTree.java

Но в такой реализации оно сложно для понимания и программистам бд более близка реализация через B-дерево с фактором = 4.


В таком дереве каждый узел может содержать от 1 до 3 значений и от 2 до 4 указателей на потомков.

Если страница В-дерева содержит только одно значение, данный узел чёрный и имеет двух потомков.
Если страница содержит три значения, то центральный узел является чёрным, а каждый его сосед — красным.
Однако, если страница содержит два значения, любой узел может стать чёрным в красно-чёрном дереве (и тогда второй будет красным).

Пример реализации красно-черного дерева на основе B-дерева с размером блока = 3 можно посмотреть тут: BTree.scala

Для программистов БД нужно помнить 2 отличия B-дерева от B+дерева:
* при разделение элемент переносится наверх (не остается в листе)
* листы не связаны друг с другом ссылками


Красно-чёрные деревья являются одними из наиболее активно используемых на практике самобалансирующихся деревьев поиска.
В частности, контейнеры set и map в большинстве реализаций библиотеки STL языка C++, класс TreeMap языка Java, ассоциативный массив, когда реализация через списки дает слишком много коллизий
( подробное описание хэш массива тут: 3ий вариант реализации Oracle HashMap )

Популярность красно-чёрных деревьев связана с тем, что на них часто достигается подходящий баланс между степенью сбалансированности и сложностью поддержки сбалансированности при удалении.

четверг, 29 ноября 2018 г.

Введение в Scala, параллельную разработку и Apache Spark

В этой статье я хотел бы охватить все аспекты работы с данными на языке Scala - от примитивов языка к параллельному программированию и до анализа данных на кластере Spark.

четверг, 28 июня 2018 г.

Введение в нейронные сети

Эта статья-заметка освещает основные понятия нейронных сетей и как их строить применяя python.

четверг, 24 мая 2018 г.

LSM дерево: быстрый доступ по ключу в условиях интенсивной вставки

В условиях интенсивной вставки для быстрого доступа к данным обычные btree индексы не подходят.
Во многих базах (Bigtable, HBase, LevelDB, MongoDB, SQLite4, Tarantool, RocksDB, WiredTiger, Apache Cassandra, и InfluxDB ) используется LSM-дерево (Log-structured merge-tree — журнально-структурированное дерево со слиянием)
LSM дерево служит для хранения ключ-значения.
В данной статье рассмотрим двухуровневое дерево. Первый уровень целиком находится в памяти, вторая половина на диске.
Вставка идет всегда в первую часть в памяти, а поиск из всех частей. Когда размер данных в памяти превышает определенный порог, то она записывается на диск. После чего блоки на диске объединяются в один.
Рассмотрим алгоритм по частям:

1. Все записи в дереве маркируются порядковым номером glsn , при каждой вставке этот счетчик увеличивается

Структура для хранения ключа-значения и порядкового номера:
//запись с ключ-значение
class SSItem {
 Integer key;
 String value;
 //+ идентификатор последовательности вставки
 Integer lsn;
 
 SSItem(Integer key, String value, Integer lsn) {
  this.key = key;
  this.value = value;
  this.lsn = lsn;
 }

2. Структуры объядиняются в одну таблицу в памяти.
В моем алгоритме используется хэш таблица.
В реальности чаще всего используется отсортированная по ключу структура - это дает преимущества, что для поиска и слияния используется последовательное чтение, вместо рандомного.
class MemSSTable { 
 
 //из-за хэша нет поиска по диапазону
 HashMap<Integer, SSItem> itms = new HashMap<Integer, SSItem>();
 

3. Т.к. размер памяти ограничен, то при превышении определенного порога, данные должны быть скинуты на диск.
В моей реализации - это простой вариант без фоновых заданий и асинхронных вызовов
При достижении лимита будет фриз, т.к. таблица скидывается на диск.
В реальных системах используется упреждающая запись: данные на диск скидываются асинхронно и заранее, до достижения жесткого лимита памяти.
 void SaveToDisk() throws FileNotFoundException, UnsupportedEncodingException {
  indx.path = "sstable_" + indx.max_lsn + ".dat";
  PrintWriter writer = new PrintWriter(indx.path, "UTF-8");
  Integer pad = 0;
  //последовательно пишем 10 байт с длиной значения и само значение
  for(Entry<Integer, SSItem> entry : itms.entrySet()) {
   SSItem itm = entry.getValue();
   String val = itm.get();
   writer.print( val );
   //регистрируем в индексе смещения в файле
   indx.keys.put(itm.key, pad);
   pad = pad + val.length();
  }
  writer.close();
  
  LSMTree.indexes.add(indx);
 } //SaveToDisk

4. Получается, что половина всех данных у нас хранится в хэше в памяти, а остальное в виде файлов на диске.
Чтобы не перебирать все файлы на диске создается дополнительная индексная структура, которая будет хранить метаинформацию о файлах в памяти:
//индекс над таблицей с даными
class SSTableIndex {
 //метаданные:
 //минимальный и максимальный ключи в таблице
 Integer min_key;
 Integer max_key;
 //минимальный и максимальный порядковый lsn
 Integer min_lsn;
 Integer max_lsn;
 
 //если таблица на диске, то путь до файла
 String path;
 
 //ключ - смещение ключа в файле
 HashMap<Integer, Integer> keys = new HashMap<Integer, Integer>();
 
 //добавить ключ в индекс
 void add(Integer k) {
  //также обновляем метаданные
  max_lsn = LSMTree.glsn;
  if(min_lsn == null) min_lsn = max_lsn;
  if(min_key == null || k < min_key) min_key = k;
  if(max_key == null || k > max_key) max_key = k;
  //добавление идет в память в хэш таблицу, на первом этапе смещения в файле нет
  keys.put(k, 0);
 }

5. Управляющий объект всего дерева хранит ссылку на таблицу с данными в памяти и на мета-индексы данных на диске:
public class LSMTree {
 
 static int glsn = 0;
 //максимальный размер, после которого таблица должна быть скинута на диск
 //для упрощения алгоритма = числу записей, а не размеру в байтах
 final static int max_sstable_size = 10; 
 
 //текущая таблица в памяти, куда вставляем данные
 MemSSTable MemTable;
 
 //все индексы, даже для таблиц на диске, хранятся в памяти
 static LinkedList<SSTableIndex> indexes = new LinkedList<SSTableIndex>();

6. Когда все объекты описаны, давайте посмотрим как происходит добавление нового элемента.
Добавление всегда идет в таблицу в памяти, что гарантирует быструю вставку:
public class LSMTree {
 //....
 
 //добавить запись
 public void add(Integer k, String v) {
  MemTable.add(k, v);
 }
Подробней логика добавления в таблицу в памяти:
* Увеличиваем глобальный счетчик элементов LSMTree.glsn
* Если число элментов превысило порог, то таблица скидывается на диск и очищается
В реальности это конечно не число элементов, а объем в байтах.
* Создается новый элемент SSItem
* И доблавляется в хэш массив
Причем, если элемент до этого уже существовал, то он перезаписывается.
В реальности хранятся все записи, чтобы была возможность поддержки транзакционности.
* Кроме этого в индексе обновляются метаданные indx.add
class MemSSTable { 
 //.....
 
 //добавить новый элемент
 void add(Integer k, String v) {
  //увеличиваем глобальный счетчик операций glsn
  LSMTree.glsn++;
  
  //если размер превышает, 
  if(itms.size() >= LSMTree.max_sstable_size) {
   try {
    //то сохраняем таблицу на диск
    //в реальных движках используется упреждающая фоновая запись
    //когда папять заполнена на N% (n<100), то данные скидываются на диск заранее, чтобы избежать фриза при сбросе памяти и записи на диск
    SaveToDisk();
   } catch (FileNotFoundException | UnsupportedEncodingException e) {
    e.printStackTrace();
    return;
   }
   //очищаем данные под новые значения
   indx = new SSTableIndex();
   itms = new HashMap<Integer, SSItem>();
  }
  
  //обновляем медаданные в индексе
  indx.add(k);
  
  SSItem itm = new SSItem(k, v, indx.max_lsn);
  
  //в моей реализации, при повторе ключ перезаписывается
  //т.е. транзакционность и многоверсионность тут не поддерживается
  itms.put(k,  itm);
  
 } //add
Если после вставки нового элемента старые данные оказались на диске, то в метаданных индекса прописывается ссылка на точное смещение в файле:
class SSTableIndex {
 //...
 
 //если таблица на диске, то путь до файла
 String path;
 
 //ключ - смещение
 HashMap<Integer, Integer> keys = new HashMap<Integer, Integer>();

7. С вставкой разобрались, теперь обратная операция - поиск элемента в дереве:
* Сперва мы проверяем наличие ключа в хэш таблице в памяти. Если элемент нашелся, то на этом все.
* Если элемента нет, то мы пробегамся по всем метаиндексам в поиске нашего ключа.
* проверяем, что ключ входит в диапазон indx.min_key - indx.max_key индекса
* И для полной точности проверяем наличие ключа в хэше ключей indx.keys.containsKey
* Если элемент нашелся, то это еще не конец, цикл продолжается, пока мы не переберем все индексы.
Т.к. ключ может добавляться несколько раз в разные промежутки времени.
* Из всех совпадений выбираем индекс с максимальным счетчиком вставки - это последнее изменение ключа
public class LSMTree {
 //.....
 
 //получить значение по ключу
 String getKey(Integer key) {
  //сперва смотрим в памяти
  String val = MemTable.getKey(key);
  
  if(val == null) {
   SSTableIndex indx_with_max_lsn = null;
   //потом таблицу по индексам, которая содержит наш ключ
   //если содержится в нескольких, то берем с максимальным lsn, т.к. это последнее изменение
   for (SSTableIndex indx : indexes) {
    Integer max_lsn = 0;
    if(key >= indx.min_key && key <= indx.max_key && max_lsn < indx.max_lsn ) {
     if(indx.keys.containsKey(key)) {
      max_lsn = indx.max_lsn;
      indx_with_max_lsn = indx;
     }
    }
   }
   //читаем из таблицы с диска
   if(indx_with_max_lsn != null) {
    try {
     return indx_with_max_lsn.getByPath(key);
    } catch (IOException e) {
     e.printStackTrace();
    }
   }
  }
  
  return val;
 }
* Определим нужный индекс обращаемся по нему к таблице на диске:
** Открываем нужный файл
В реальности, скорей всего, файы постоянно держатся открытыми для экономии времени.
** Следуя смещенями из индекса переходим к нужной точке файла file.seek
** И считываем значение file.read
class SSTableIndex {
 //...
 
 //получить значение ключа из открытого файла
 String getByPath(Integer key, RandomAccessFile file) throws IOException {
  //получаем смещение в файле для ключа из индекса
  Integer offset = keys.get(key);
  
  //смещаемся в файле
  file.seek(offset);
  
  //резервируем 10 байт под переменную с длинной значения
  byte[] lenb = new byte[10];
  file.read(lenb, 0, 10);
  
  Integer len = Integer.parseInt(new String(lenb, StandardCharsets.UTF_8));
 
  file.seek(offset + 10);
  
  //считываем значение
  byte[] valb = new byte[len];
  file.read(valb, 0, len);
  
  return new String(valb, StandardCharsets.UTF_8);
 } //getByPath
 
 //получить значение ключа с диска
 String getByPath(Integer key) throws IOException {
  if(!keys.containsKey(key)) return null;
  
  RandomAccessFile file = new RandomAccessFile(path, "r");
  
  String val = getByPath(key, file);
  
  file.close();

  return val;
 }

8. Т.к. ключ может вставляться неограниченное число раз, то со временем он может находится в нескольких файлах на диске одновременно.
Также, если бы LSM дерево поддерживало транзакционность, то в файлах также хранились бы разные версии одного ключа (изменения или удаления).
Для оптимизации последующего поиска применяется операция слияния нескольких файлов в один:
* Сортируем индексы по убыванию lsn
* Последовательно считываем элементы из первого индекса и записываем в новый
* Если элемент уже присутствует в объединенном индексе, то он пропускается
* Создается новый единственный файл и индекс со всеми ключами и значениями
* Старые файлы и индексы удаляются
public class LSMTree {
 //....
 
 //объединить несколько таблиц на диске в 1 большую
 void merge() throws IOException {
  Integer min_key = null;
  Integer max_key = null;
  Integer min_lsn = null;
  Integer max_lsn = null;
  
  //сортируем таблицы по убыванию lsn
  //чтобы вначале были самые свежие ключи
  Collections.sort(indexes, new Comparator<SSTableIndex>() {
   @Override
   public int compare(SSTableIndex o1, SSTableIndex o2) {
    if(o1.max_lsn > o2.max_lsn) {
     return -1;
    } else if(o1.max_lsn < o2.max_lsn) {
     return 1;
    }
    return 0;
   }
  });
  
  SSTableIndex merge_indx = new SSTableIndex();
  
  Integer pad = 0;
  merge_indx.path = "sstable_merge.dat";
  PrintWriter writer = new PrintWriter(merge_indx.path, "UTF-8");
  
  //пробегаемся по всем индексам, чтобы слить в 1
  for (SSTableIndex indx : indexes) {
   if(min_lsn == null || indx.min_lsn < min_lsn) min_lsn = indx.min_lsn;
   if(min_key == null || indx.min_key < min_key) min_key = indx.min_key;
   if(max_key == null || indx.max_key > max_key) max_key = indx.max_key;
   if(max_lsn == null || indx.max_lsn > max_lsn) max_lsn = indx.max_lsn;
   
   RandomAccessFile file = new RandomAccessFile(indx.path, "r");
   
   //т.к. данные в таблицах не упорядочены, это приводит к рандомным чтениям с диска
   //в реальности делают упорядочнный по ключу массив, чтобы делать быстрые последовательные чтения
   for(Entry<Integer, Integer> entry : indx.keys.entrySet()) {
    //оставляем запись только с максимальным lsn
    Integer key = entry.getKey();
    if(!merge_indx.keys.containsKey(key)) {
     String val = indx.getByPath(key, file);
     
     SSItem itm = new SSItem(key, val, 0);
     String itmval = itm.get();
     
     writer.print( itmval );
     merge_indx.keys.put(key, pad);
     pad = pad + itmval.length();     
    }
   }
   //записываем и удаляем старые файлы
   file.close();
   //delete
   File fd = new File(indx.path);
   fd.delete();
  }  
  
  merge_indx.min_lsn = min_lsn;
  merge_indx.min_key = min_key;
  merge_indx.max_key = max_key;
  merge_indx.max_lsn = max_lsn;
  
  writer.close();
  
  //переименовываем к обычному имени
  File fd = new File(merge_indx.path);
  merge_indx.path = "sstable_" + merge_indx.max_lsn + ".dat";
  File fdn = new File(merge_indx.path);
  fd.renameTo(fdn);
  
  indexes = new LinkedList<SSTableIndex>();
  indexes.add(merge_indx);
  
 } //merge
В моей реализации используется хэш массив, что дает большое число случайных чтений с диска.
В реальности данные хранят в отсотированной структуре, что позволят выполнять слияние 2 отсортированных массивов в один за линейное время используя только быстрое последовательное чтение.


Полный код класса LSM дерева можно посмотреть на github

пятница, 5 января 2018 г.

Hive: авторизация и аутентификация

Сборка Cloudera Hadoop не содержит никаких настроек авторизации и аутентификации, что конечно плохо, т.к. не дает возможности разделить права среди пользователей.
Опишу один из вариантов настройки авторизации и аутентификации.

Custom аутентификация:

1. Создаем linux пользователя на сервере hadoop:
sudo useradd user1
#создаем домашнюю директорию в hdfs
sudo -u hdfs hadoop fs -mkdir /user/user1
#задаем пользователя владельцем
sudo -u hdfs hadoop fs -chown user1 /user/user1
sudo passwd user1
#лочим пользователя в linux
sudo passwd -l user1

2. Создаем пользователя в hive
sudo -u hive hive
CREATE SCHEMA user1 LOCATION "/user/user1";

3. Создаем java класс, через который будет происходить проверка пользователя:
package hive.lenta;

import java.util.Hashtable;
import javax.security.sasl.AuthenticationException;
import org.apache.hive.service.auth.PasswdAuthenticationProvider;
import java.io.*;


public class LentaAuthenticator implements PasswdAuthenticationProvider {

  Hashtable<String, String> store = null;

  public LentaAuthenticator () {
    store = new Hashtable<String, String>();
 
 try {
  readPwd();  
 } catch (IOException e) {
  e.printStackTrace();
 }
 
  } //LentaAuthenticator
  
  public void readPwd () throws IOException {
 BufferedReader br = new BufferedReader(new FileReader("/u01/jar/pwd.ini"));
 String line;
 while ((line = br.readLine()) != null) {
  String login[] = line.split(";");
  if(login.length == 2) {
   store.put(login[0].trim(), login[1].trim());
  }
 }
 br.close();

  } //readPwd

  @Override
  public void Authenticate(String user, String  password)
      throws AuthenticationException {

    String storedPasswd = store.get(user);

    if (storedPasswd != null && storedPasswd.equals(password))
      return;
     
    throw new AuthenticationException("LentaAuthenticator: Error validating user");
  }

}
В файле /u01/jar/pwd.ini лежит логин и пароль, разделенные точкой с запятой:
user1;passwd

4. Компилируем класс в объект и собираем jar файл:
компилировать нужно именно той java под которой работает hadoop:
/usr/java/jdk1.7.0_67-cloudera/bin/javac -classpath /u01/cloudera/parcels/CDH-5.11.1-1.cdh5.11.1.p0.4/lib/hive/lib/hive-service.jar -d /home/askahin/auth /home/askahin/auth/LentaAuthenticator.java
/usr/java/jdk1.7.0_67-cloudera/bin/jar -cf /home/askahin/auth/jar/lentauth.jar ./hive
sudo cp jar/lentauth.jar /u01/cloudera/parcels/CDH-5.11.1-1.cdh5.11.1.p0.4/lib/hive/lib/
sudo chmod 777 /u01/cloudera/parcels/CDH-5.11.1-1.cdh5.11.1.p0.4/lib/hive/lib/lentauth.jar
sudo cp jar/lentauth.jar /u01/jar/lentauth.jar
sudo chmod 777 /u01/jar/lentauth.jar

5. Включаем CUSTOM аутентификацию в настройках Cloudera Manager:
прописываем наш аутентификатор в настройках:
* Ищем все настройки по паттерну "hive-site.xml" в настройках Cloudera.
Делать нужно именно через cloudera, а не напрямую в файле "hive-site.xml", т.к. менеджер переписывает файл из памяти процесса.

Нажимаем view as xml и во всех окнах и вводим
<property>
    <name>hive.server2.authentication</name>
    <value>CUSTOM</value>
  </property>
  <property>
    <name>hive.server2.custom.authentication.class</name>
    <value>hive.lenta.LentaAuthenticator</value>
</property>

Также ставим флажки suppress parameter validation
Иначе настройки не попадут в итоговый XML файл
* Для работы также нужна включенная опция hive.server2.enable.doAs = true
Она означает, что все запросы будут идти из под пользователя, который прошел аутентификацию.

5. Задаем путь до кастомной папки с jar файлами:
Настройка: «HIVE_AUX_JARS_PATH»
Значение «/u01/jar»,
Также ставим suppress parameter validation


6. Все готово, перезапускам HIVE из консоли Cloudera

7. Теперь при аутентификации нужно указывать способ:
AuthMech=3


После того как все настроили, мы уверены, что пользователь прошел аутентификацию.
Войти без пароля больше не получится.

Дальше настраиваем авторизацию - выдачу прав пользователям на конкретные объекты.
Проще всего это сделать через расширенные права linux:


ACL авторизация


1. Включаем ACL права через Cloudera Manager
настройка "dfs.namenode.acls.enabled" = true


2. После этого можно распределять права на папки:
Сброс всех прав:
sudo -u hdfs hdfs dfs -setfacl -R -b /

Даем пользователю user1 права на создание таблиц в своей схеме
sudo -u hdfs hdfs dfs -setfacl -R --set user::rwx,user:hadoop:rwx,user:hive:rwx,user:user1:rwx,group::r-x,other::--- /user/user1
к остальному хранилищу hive даем права только на чтение:
sudo -u hdfs hdfs dfs -setfacl -R --set user::rwx,user:hadoop:rwx,user:hive:rwx,user:user1:r-x,group::r-x,other::--- /user/hive/warehouse
к какой-то из таблиц закрываем доступ:
sudo -u hdfs hdfs dfs -setfacl -R --set user::rwx,user:hadoop:rwx,user:hive:rwx,user:user1:---,group::r-x,other::--- /user/hive/warehouse/table1
Один минус у ACL - это остается возможность выполнить DROP table, даже если нет прав доступа на чтение или запись.
Как победить эту проблему я не понял.

Так же стоит не забывать, что таким образом мы защитили только HIVE, возможность читать данные через HDFS по прежнему остались.
(Как читать данные из HDFS и HIVE через java можно посмотреть в моем github: https://github.com/pihel/java/blob/master/bigdata/Hdfs2Hive.java )

Чтобы закрыть доступ чтения через HDFS мы используем закрытие всех портов на сервере, кроме 10000 , который используется службой HIVE.

суббота, 19 августа 2017 г.

Мониторинг нагрузки Sap Hana и Mssql

Предоставления для мониторинга нагрузки на субд sap hana и mssql:

SAP HANA

Представлю основные запросы для мониторинга нагрузки на sap hana в разрезе времени (чтото похожее на awr отчеты в oracle):

1. Использование CPU, Озу и физической памяти в разрезе 10 минут:
select concat( SUBSTRING (TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24:mi'),1,15), '0') as hh,
round(100*SUM(TOTAL_CPU_USER_TIME_DELTA) / ( SUM(TOTAL_CPU_IDLE_TIME_DELTA)+SUM(TOTAL_CPU_SYSTEM_TIME_DELTA)+SUM(TOTAL_CPU_USER_TIME_DELTA)+SUM(TOTAL_CPU_WIO_TIME_DELTA) ),2) as cpu_prc,
round(AVG(INSTANCE_TOTAL_MEMORY_USED_SIZE)/1024/1024/1024,2) MEMORY_USED,
round(AVG(INSTANCE_TOTAL_MEMORY_ALLOCATED_SIZE)/1024/1024/1024,2) MEMORY_ALLOCATED,
round(100*SUM(INSTANCE_TOTAL_MEMORY_USED_SIZE) / ( SUM(INSTANCE_TOTAL_MEMORY_ALLOCATED_SIZE) ),2) MEMORY_USED_PRC,
round(AVG(FREE_PHYSICAL_MEMORY)/1024/1024/1024,2) FREE_PHYSICAL_MEMORY,
round(AVG(USED_PHYSICAL_MEMORY)/1024/1024/1024,2) USED_PHYSICAL_MEMORY,
round(100*SUM(USED_PHYSICAL_MEMORY) / ( SUM(FREE_PHYSICAL_MEMORY)+SUM(USED_PHYSICAL_MEMORY) ),2) USED_PHYSICAL_MEMORY_PRC,
round(AVG(FREE_SWAP_SPACE)/1024/1024/1024,2) FREE_SWAP_SPACE,
round(AVG(USED_SWAP_SPACE)/1024/1024/1024,2) USED_SWAP,
round(100*SUM(USED_SWAP_SPACE) / ( SUM(FREE_SWAP_SPACE)+SUM(USED_SWAP_SPACE) ),2) USED_SWAP_PRC
from _SYS_STATISTICS.HOST_RESOURCE_UTILIZATION_STATISTICS
where TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyymmddhh24mi') between '2017081410' and '201708142359'
group by SUBSTRING (TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24:mi'),1,15)
order by SUBSTRING (TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24:mi'),1,15) desc
В графическом виде это будет выглядеть так:


2. Интенсивность чтения и записи:
select concat( SUBSTRING (TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24:mi'),1,15), '0') as hh,
SUM(TOTAL_IO_TIME_DELTA)/1000/1000 as io_sec, SUM(TOTAL_READ_SIZE_DELTA) read_size, SUM(TOTAL_READ_TIME_DELTA)/1000/1000 as read_sec,
SUM(TOTAL_WRITE_SIZE_DELTA) as write_size, SUM(TOTAL_WRITE_TIME_DELTA/1000/1000) as write_sec,
SUM(TOTAL_FAILED_READS_DELTA) as failed_reads, SUM(TOTAL_FAILED_WRITES_DELTA) as failed_writes
from _SYS_STATISTICS.HOST_VOLUME_IO_TOTAL_STATISTICS
where TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyymmddhh24mi') between '2017081410' and '201708142359'
group by SUBSTRING (TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24:mi'),1,15)
order by SUBSTRING (TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24:mi'),1,15) desc
В графическом виде это будет выглядеть так:
Время чтения в 10 минут:

Размер чтений в 10 минут:


3. Топовые sql запросы по времени выполнения:
select * from (
       select v.* , ROW_NUMBER() OVER(pARTITION BY hh ORDER BY DELTA_TIME desc) as rn
       from (
             select TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24') as hh,
             AVG(AVG_EXECUTION_TIME)/1000/1000, SUM(EXECUTION_COUNT),
             AVG(AVG_EXECUTION_TIME)/1000/1000*SUM(EXECUTION_COUNT) as DELTA_TIME,
             STRING_AGG(USER_NAME), TO_VARCHAR(STATEMENT_STRING) as txt, MAX(index) as index
             from _SYS_STATISTICS.HOST_SQL_PLAN_CACHE
             where SERVER_TIMESTAMP between to_date('01.08.2017', 'dd.mm.yyyy') and to_date('02.08.2017', 'dd.mm.yyyy')
             group by TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24'), TO_VARCHAR(STATEMENT_STRING)
       ) v
)
where rn <= 5
order by hh, rn;
Получится что-то вроде такого топа:


4. Использование сети с момента сброса (system reset):
ALTER SYSTEM RESET MONITORING VIEW SYS. M_SERVICE_NETWORK_IO_RESET;
select SENDER_HOST, RECEIVER_HOST, sum(SEND_SIZE), sum(RECEIVE_SIZE), sum(SEND_DURATION)/1000/1000, sum(RECEIVE_DURATION)/1000/1000, sum(REQUEST_COUNT)
from SYS.M_SERVICE_NETWORK_IO_RESET Group BY SENDER_HOST, RECEIVER_HOST


MSSQL

топ sql запросов в разрезе показателей с последней перезагрузки бд:
SELECT  TOP 20 creation_time 
        ,last_execution_time
        ,total_physical_reads
        ,total_logical_reads 
        ,total_logical_writes
        , execution_count
        , total_worker_time
        , total_elapsed_time
        , total_elapsed_time / execution_count / 1000 avg_elapsed_time_ms
        ,SUBSTRING(st.text, (qs.statement_start_offset/2) + 1,
         ((CASE statement_end_offset
          WHEN -1 THEN DATALENGTH(st.text)
          ELSE qs.statement_end_offset END
            - qs.statement_start_offset)/2) + 1) AS statement_text,
   qp.query_plan,
   st.text
FROM sys.dm_exec_query_stats AS qs
CROSS APPLY sys.dm_exec_sql_text(qs.sql_handle) st
CROSS APPLY sys.dm_exec_query_plan(qs.plan_handle) qp
ORDER BY total_worker_time desc;

вторник, 15 августа 2017 г.

HIVE: Своя быстрая функция замен встроенной

Сегодня расскажу об одном способе ускорения запросов с аналитическими функциями в субд HIVE, работающей поверх Hadoop.

Один из вариантов ускорить HIVEQL запрос - это переписать встроенную аналитическую функцию на свой упрощенный вариант.

К примеру функция ROW_NUMBER(OVER PARTITION BY c1 ORDER BY c2) имеет достаточно сложную реализацию (github) только для того чтобы посчитать номер строки в группе.

Пример запроса с row_number:
create table tmp_table stored as orc as
select v.material, v.client_id, row_number() over (partition by v.client_id order by v.clientsum desc, v.checkcount desc) as rn
from pos_rec_itm_tst v;

Можно реализовать значительно упрощенную версию подсчета номера строки в группе.
На вход функции Rank.evaluate подаем значение группы key (то что было в partition by) и инкрементируем значение счетчика counter.
Если приходит новая группа, то счетчик сбрасывается на 0, а в переменную группы "this.last_key" записывается значение новой группы:
package com.example.hive.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
public final class Rank extends UDF{
    private int  counter;
    private String last_key;
    public int evaluate(final String key){
      if ( !key.equals(this.last_key) ) {
         this.counter = 0;
         this.last_key = key;
      }
      return (++this.counter);
    }
}
Понятно, что для правильной работы этой функции набор данных нужно предварительно отсортировать по группе партицирования "partition by", а потом по остальным полям "order by".

Пример запроса с собственной функцией:
create table tmp_table stored as orc as
select v.material, v.client_id, myrank(v.client_id) as rn from (
  SELECT client_id, clientsum, checkcount, material FROM pos_rec_itm_tst
  DISTRIBUTE BY client_id SORT BY clientsum desc, checkcount desc
) v;
Дополнительно сортировку в HIVE можно ускорить, если распараллелить мапперы по полю партицирования "partition by", а внутри этих групп сортировать по полям из "order by".
За счет параллельности мы опять же ускорим сортировку.


Чтобы создать такую функцию в HIVE нужно скомпилировать ее из java исходников:

$> /путь_до_java_который_используется_в_hive/bin/javac -classpath /путь_до_hive/lib/hive/lib/hive-serde-1.7.jar:/путь_до_hive/lib/hive/lib/hive-exec.jar:/путь_до_hadoop/lib/hadoop/client-0.20/hadoop-core.jar -d /путь_куда_компилим /путь_до_программы.java
$> /путь_до_java_который_используется_в_hive/bin/jar -cf название_jar_программы.jar com/example/hive/udf/название_класса.class

запускаем hive и регистрируем наш jar , как функцию с произвольным названием:
hive> add jar Rank.jar;
hive> create temporary function myrank as 'com.example.hive.udf.Rank';

Такое небольшое изменение ускорит выполнение запроса на 10-20%.