понедельник, 4 февраля 2030 г.

Скахин Алексей / pihel

Скахин Алексей, pihel Биография:
Родился в городе Вологда 9 марта 1987 года. В 2004 году завершил обучение в средней школе №12 города Вологда. В 2009 году окончил ВоГУ (бывший ВоГТУ) по специальности программное обеспечение. Тема дипломного проекта: "Синтез виртуальной среды с применением скалярных и аналитических функций возмущения и трехмерных массивов вокселей". С 2013 года проживаю в городе Санкт-Петербург.

В бывшем радиолюбитель: 3ий взрослый разряд по радиотелеграфии, радиолюбитель в кв диапазоне. Позывной ra1qkj.

О себе:
В свободное от работы время люблю играть в волейбол, кататься на горных лыжах и велосипеде, путешествовать на машине.

Места работы:
6. Лента: perfomance specialist: Sap, Oracle ( sql, pl/sql, abap ), 2016 - ...
5. Сигма: разработчик баз данных Oracle ( sql, pl/sql, Oracle BI, dwh ), 2015 - 2016
4. Tops Consulting: разработчик баз данных Oracle ( sql, pl/sql ), 2013 - 2015
3. Макси: разработка системы управления предприятием, программист (C++, Oracle, Pl/Sql) 2010-2013
2. Rstyle Softlab ОПР ДСУП: разработка системы управления предприятием, старший программист (rsl/vbs/fast report/ms sql) 2008-2010
1. ВНКЦ ЦЭМИ РАН: разработка внутренней информационной системы (php/js/mysql) 2007-2008 г.
0. Фриланс - web направление 2003-...

Навыки:
1. PHP: занимаюсь с 2004 года
 а. Bitrix - интеграция, интернет магазины, информационные порталы, государственные порталы)
 b. Drupal - интеграция, разработка модулей
2. JS: JQuery, ExtJS
3. C++: библиотека QT, C++ Builder
4. VBS: автоматизация рутины (авто сборка ПО, создание копий приложений и т.д.)
5. СУБД: Oracle PlSql, MySql, MS SQL (OLTP и OLAP), SqLite
6. Linux: пользователь Fedora и Ubuntu
7. Управление версиями: SVN, GIT, MS VSS
8. Остальное: FastReport, Excel (ActiveX, XML), Open(Libre)Office (XML, UNO), SVG/VML (raphaeljs, extjs), Flex, ITIL

Мои любимые статьи:
Внутреннее устройство Oracle: Оптимизация запрсов Oracle: Другое:

четверг, 24 мая 2018 г.

LSM дерево: быстрый доступ по ключу в условиях интенсивной вставки

В условиях интенсивной вставки для быстрого доступа к данным обычные btree индексы не подходят.
Во многих базах (Bigtable, HBase, LevelDB, MongoDB, SQLite4, Tarantool, RocksDB, WiredTiger, Apache Cassandra, и InfluxDB ) используется LSM-дерево (Log-structured merge-tree — журнально-структурированное дерево со слиянием)
LSM дерево служит для хранения ключ-значения.
В данной статье рассмотрим двухуровневое дерево. Первый уровень целиком находится в памяти, вторая половина на диске.
Вставка идет всегда в первую часть в памяти, а поиск из всех частей. Когда размер данных в памяти превышает определенный порог, то она записывается на диск. После чего блоки на диске объединяются в один.
Рассмотрим алгоритм по частям:

1. Все записи в дереве маркируются порядковым номером glsn , при каждой вставке этот счетчик увеличивается

Структура для хранения ключа-значения и порядкового номера:
//запись с ключ-значение
class SSItem {
 Integer key;
 String value;
 //+ идентификатор последовательности вставки
 Integer lsn;
 
 SSItem(Integer key, String value, Integer lsn) {
  this.key = key;
  this.value = value;
  this.lsn = lsn;
 }

2. Структуры объядиняются в одну таблицу в памяти.
В моем алгоритме используется хэш таблица.
В реальности чаще всего используется отсортированная по ключу структура - это дает преимущества, что для поиска и слияния используется последовательное чтение, вместо рандомного.
class MemSSTable { 
 
 //из-за хэша нет поиска по диапазону
 HashMap<Integer, SSItem> itms = new HashMap<Integer, SSItem>();
 

3. Т.к. размер памяти ограничен, то при превышении определенного порога, данные должны быть скинуты на диск.
В моей реализации - это простой вариант без фоновых заданий и асинхронных вызовов
При достижении лимита будет фриз, т.к. таблица скидывается на диск.
В реальных системах используется упреждающая запись: данные на диск скидываются асинхронно и заранее, до достижения жесткого лимита памяти.
 void SaveToDisk() throws FileNotFoundException, UnsupportedEncodingException {
  indx.path = "sstable_" + indx.max_lsn + ".dat";
  PrintWriter writer = new PrintWriter(indx.path, "UTF-8");
  Integer pad = 0;
  //последовательно пишем 10 байт с длиной значения и само значение
  for(Entry<Integer, SSItem> entry : itms.entrySet()) {
   SSItem itm = entry.getValue();
   String val = itm.get();
   writer.print( val );
   //регистрируем в индексе смещения в файле
   indx.keys.put(itm.key, pad);
   pad = pad + val.length();
  }
  writer.close();
  
  LSMTree.indexes.add(indx);
 } //SaveToDisk

4. Получается, что половина всех данных у нас хранится в хэше в памяти, а остальное в виде файлов на диске.
Чтобы не перебирать все файлы на диске создается дополнительная индексная структура, которая будет хранить метаинформацию о файлах в памяти:
//индекс над таблицей с даными
class SSTableIndex {
 //метаданные:
 //минимальный и максимальный ключи в таблице
 Integer min_key;
 Integer max_key;
 //минимальный и максимальный порядковый lsn
 Integer min_lsn;
 Integer max_lsn;
 
 //если таблица на диске, то путь до файла
 String path;
 
 //ключ - смещение ключа в файле
 HashMap<Integer, Integer> keys = new HashMap<Integer, Integer>();
 
 //добавить ключ в индекс
 void add(Integer k) {
  //также обновляем метаданные
  max_lsn = LSMTree.glsn;
  if(min_lsn == null) min_lsn = max_lsn;
  if(min_key == null || k < min_key) min_key = k;
  if(max_key == null || k > max_key) max_key = k;
  //добавление идет в память в хэш таблицу, на первом этапе смещения в файле нет
  keys.put(k, 0);
 }

5. Управляющий объект всего дерева хранит ссылку на таблицу с данными в памяти и на мета-индексы данных на диске:
public class LSMTree {
 
 static int glsn = 0;
 //максимальный размер, после которого таблица должна быть скинута на диск
 //для упрощения алгоритма = числу записей, а не размеру в байтах
 final static int max_sstable_size = 10; 
 
 //текущая таблица в памяти, куда вставляем данные
 MemSSTable MemTable;
 
 //все индексы, даже для таблиц на диске, хранятся в памяти
 static LinkedList<SSTableIndex> indexes = new LinkedList<SSTableIndex>();

6. Когда все объекты описаны, давайте посмотрим как происходит добавление нового элемента.
Добавление всегда идет в таблицу в памяти, что гарантирует быструю вставку:
public class LSMTree {
 //....
 
 //добавить запись
 public void add(Integer k, String v) {
  MemTable.add(k, v);
 }
Подробней логика добавления в таблицу в памяти:
* Увеличиваем глобальный счетчик элементов LSMTree.glsn
* Если число элментов превысило порог, то таблица скидывается на диск и очищается
В реальности это конечно не число элементов, а объем в байтах.
* Создается новый элемент SSItem
* И доблавляется в хэш массив
Причем, если элемент до этого уже существовал, то он перезаписывается.
В реальности хранятся все записи, чтобы была возможность поддержки транзакционности.
* Кроме этого в индексе обновляются метаданные indx.add
class MemSSTable { 
 //.....
 
 //добавить новый элемент
 void add(Integer k, String v) {
  //увеличиваем глобальный счетчик операций glsn
  LSMTree.glsn++;
  
  //если размер превышает, 
  if(itms.size() >= LSMTree.max_sstable_size) {
   try {
    //то сохраняем таблицу на диск
    //в реальных движках используется упреждающая фоновая запись
    //когда папять заполнена на N% (n<100), то данные скидываются на диск заранее, чтобы избежать фриза при сбросе памяти и записи на диск
    SaveToDisk();
   } catch (FileNotFoundException | UnsupportedEncodingException e) {
    e.printStackTrace();
    return;
   }
   //очищаем данные под новые значения
   indx = new SSTableIndex();
   itms = new HashMap<Integer, SSItem>();
  }
  
  //обновляем медаданные в индексе
  indx.add(k);
  
  SSItem itm = new SSItem(k, v, indx.max_lsn);
  
  //в моей реализации, при повторе ключ перезаписывается
  //т.е. транзакционность и многоверсионность тут не поддерживается
  itms.put(k,  itm);
  
 } //add
Если после вставки нового элемента старые данные оказались на диске, то в метаданных индекса прописывается ссылка на точное смещение в файле:
class SSTableIndex {
 //...
 
 //если таблица на диске, то путь до файла
 String path;
 
 //ключ - смещение
 HashMap<Integer, Integer> keys = new HashMap<Integer, Integer>();

7. С вставкой разобрались, теперь обратная операция - поиск элемента в дереве:
* Сперва мы проверяем наличие ключа в хэш таблице в памяти. Если элемент нашелся, то на этом все.
* Если элемента нет, то мы пробегамся по всем метаиндексам в поиске нашего ключа.
* проверяем, что ключ входит в диапазон indx.min_key - indx.max_key индекса
* И для полной точности проверяем наличие ключа в хэше ключей indx.keys.containsKey
* Если элемент нашелся, то это еще не конец, цикл продолжается, пока мы не переберем все индексы.
Т.к. ключ может добавляться несколько раз в разные промежутки времени.
* Из всех совпадений выбираем индекс с максимальным счетчиком вставки - это последнее изменение ключа
public class LSMTree {
 //.....
 
 //получить значение по ключу
 String getKey(Integer key) {
  //сперва смотрим в памяти
  String val = MemTable.getKey(key);
  
  if(val == null) {
   SSTableIndex indx_with_max_lsn = null;
   //потом таблицу по индексам, которая содержит наш ключ
   //если содержится в нескольких, то берем с максимальным lsn, т.к. это последнее изменение
   for (SSTableIndex indx : indexes) {
    Integer max_lsn = 0;
    if(key >= indx.min_key && key <= indx.max_key && max_lsn < indx.max_lsn ) {
     if(indx.keys.containsKey(key)) {
      max_lsn = indx.max_lsn;
      indx_with_max_lsn = indx;
     }
    }
   }
   //читаем из таблицы с диска
   if(indx_with_max_lsn != null) {
    try {
     return indx_with_max_lsn.getByPath(key);
    } catch (IOException e) {
     e.printStackTrace();
    }
   }
  }
  
  return val;
 }
* Определим нужный индекс обращаемся по нему к таблице на диске:
** Открываем нужный файл
В реальности, скорей всего, файы постоянно держатся открытыми для экономии времени.
** Следуя смещенями из индекса переходим к нужной точке файла file.seek
** И считываем значение file.read
class SSTableIndex {
 //...
 
 //получить значение ключа из открытого файла
 String getByPath(Integer key, RandomAccessFile file) throws IOException {
  //получаем смещение в файле для ключа из индекса
  Integer offset = keys.get(key);
  
  //смещаемся в файле
  file.seek(offset);
  
  //резервируем 10 байт под переменную с длинной значения
  byte[] lenb = new byte[10];
  file.read(lenb, 0, 10);
  
  Integer len = Integer.parseInt(new String(lenb, StandardCharsets.UTF_8));
 
  file.seek(offset + 10);
  
  //считываем значение
  byte[] valb = new byte[len];
  file.read(valb, 0, len);
  
  return new String(valb, StandardCharsets.UTF_8);
 } //getByPath
 
 //получить значение ключа с диска
 String getByPath(Integer key) throws IOException {
  if(!keys.containsKey(key)) return null;
  
  RandomAccessFile file = new RandomAccessFile(path, "r");
  
  String val = getByPath(key, file);
  
  file.close();

  return val;
 }

8. Т.к. ключ может вставляться неограниченное число раз, то со временем он может находится в нескольких файлах на диске одновременно.
Также, если бы LSM дерево поддерживало транзакционность, то в файлах также хранились бы разные версии одного ключа (изменения или удаления).
Для оптимизации последующего поиска применяется операция слияния нескольких файлов в один:
* Сортируем индексы по убыванию lsn
* Последовательно считываем элементы из первого индекса и записываем в новый
* Если элемент уже присутствует в объединенном индексе, то он пропускается
* Создается новый единственный файл и индекс со всеми ключами и значениями
* Старые файлы и индексы удаляются
public class LSMTree {
 //....
 
 //объединить несколько таблиц на диске в 1 большую
 void merge() throws IOException {
  Integer min_key = null;
  Integer max_key = null;
  Integer min_lsn = null;
  Integer max_lsn = null;
  
  //сортируем таблицы по убыванию lsn
  //чтобы вначале были самые свежие ключи
  Collections.sort(indexes, new Comparator<SSTableIndex>() {
   @Override
   public int compare(SSTableIndex o1, SSTableIndex o2) {
    if(o1.max_lsn > o2.max_lsn) {
     return -1;
    } else if(o1.max_lsn < o2.max_lsn) {
     return 1;
    }
    return 0;
   }
  });
  
  SSTableIndex merge_indx = new SSTableIndex();
  
  Integer pad = 0;
  merge_indx.path = "sstable_merge.dat";
  PrintWriter writer = new PrintWriter(merge_indx.path, "UTF-8");
  
  //пробегаемся по всем индексам, чтобы слить в 1
  for (SSTableIndex indx : indexes) {
   if(min_lsn == null || indx.min_lsn < min_lsn) min_lsn = indx.min_lsn;
   if(min_key == null || indx.min_key < min_key) min_key = indx.min_key;
   if(max_key == null || indx.max_key > max_key) max_key = indx.max_key;
   if(max_lsn == null || indx.max_lsn > max_lsn) max_lsn = indx.max_lsn;
   
   RandomAccessFile file = new RandomAccessFile(indx.path, "r");
   
   //т.к. данные в таблицах не упорядочены, это приводит к рандомным чтениям с диска
   //в реальности делают упорядочнный по ключу массив, чтобы делать быстрые последовательные чтения
   for(Entry<Integer, Integer> entry : indx.keys.entrySet()) {
    //оставляем запись только с максимальным lsn
    Integer key = entry.getKey();
    if(!merge_indx.keys.containsKey(key)) {
     String val = indx.getByPath(key, file);
     
     SSItem itm = new SSItem(key, val, 0);
     String itmval = itm.get();
     
     writer.print( itmval );
     merge_indx.keys.put(key, pad);
     pad = pad + itmval.length();     
    }
   }
   //записываем и удаляем старые файлы
   file.close();
   //delete
   File fd = new File(indx.path);
   fd.delete();
  }  
  
  merge_indx.min_lsn = min_lsn;
  merge_indx.min_key = min_key;
  merge_indx.max_key = max_key;
  merge_indx.max_lsn = max_lsn;
  
  writer.close();
  
  //переименовываем к обычному имени
  File fd = new File(merge_indx.path);
  merge_indx.path = "sstable_" + merge_indx.max_lsn + ".dat";
  File fdn = new File(merge_indx.path);
  fd.renameTo(fdn);
  
  indexes = new LinkedList<SSTableIndex>();
  indexes.add(merge_indx);
  
 } //merge
В моей реализации используется хэш массив, что дает большое число случайных чтений с диска.
В реальности данные хранят в отсотированной структуре, что позволят выполнять слияние 2 отсортированных массивов в один за линейное время используя только быстрое последовательное чтение.


Полный код класса LSM дерева можно посмотреть на github

пятница, 5 января 2018 г.

Hive: авторизация и аутентификация

Сборка Cloudera Hadoop не содержит никаких настроек авторизации и аутентификации, что конечно плохо, т.к. не дает возможности разделить права среди пользователей.
Опишу один из вариантов настройки авторизации и аутентификации.

Custom аутентификация:

1. Создаем linux пользователя на сервере hadoop:
sudo useradd user1
#создаем домашнюю директорию в hdfs
sudo -u hdfs hadoop fs -mkdir /user/user1
#задаем пользователя владельцем
sudo -u hdfs hadoop fs -chown user1 /user/user1
sudo passwd user1
#лочим пользователя в linux
sudo passwd -l user1

2. Создаем пользователя в hive
sudo -u hive hive
CREATE SCHEMA user1 LOCATION "/user/user1";

3. Создаем java класс, через который будет происходить проверка пользователя:
package hive.lenta;

import java.util.Hashtable;
import javax.security.sasl.AuthenticationException;
import org.apache.hive.service.auth.PasswdAuthenticationProvider;
import java.io.*;


public class LentaAuthenticator implements PasswdAuthenticationProvider {

  Hashtable<String, String> store = null;

  public LentaAuthenticator () {
    store = new Hashtable<String, String>();
 
 try {
  readPwd();  
 } catch (IOException e) {
  e.printStackTrace();
 }
 
  } //LentaAuthenticator
  
  public void readPwd () throws IOException {
 BufferedReader br = new BufferedReader(new FileReader("/u01/jar/pwd.ini"));
 String line;
 while ((line = br.readLine()) != null) {
  String login[] = line.split(";");
  if(login.length == 2) {
   store.put(login[0].trim(), login[1].trim());
  }
 }
 br.close();

  } //readPwd

  @Override
  public void Authenticate(String user, String  password)
      throws AuthenticationException {

    String storedPasswd = store.get(user);

    if (storedPasswd != null && storedPasswd.equals(password))
      return;
     
    throw new AuthenticationException("LentaAuthenticator: Error validating user");
  }

}
В файле /u01/jar/pwd.ini лежит логин и пароль, разделенные точкой с запятой:
user1;passwd

4. Компилируем класс в объект и собираем jar файл:
компилировать нужно именно той java под которой работает hadoop:
/usr/java/jdk1.7.0_67-cloudera/bin/javac -classpath /u01/cloudera/parcels/CDH-5.11.1-1.cdh5.11.1.p0.4/lib/hive/lib/hive-service.jar -d /home/askahin/auth /home/askahin/auth/LentaAuthenticator.java
/usr/java/jdk1.7.0_67-cloudera/bin/jar -cf /home/askahin/auth/jar/lentauth.jar ./hive
sudo cp jar/lentauth.jar /u01/cloudera/parcels/CDH-5.11.1-1.cdh5.11.1.p0.4/lib/hive/lib/
sudo chmod 777 /u01/cloudera/parcels/CDH-5.11.1-1.cdh5.11.1.p0.4/lib/hive/lib/lentauth.jar
sudo cp jar/lentauth.jar /u01/jar/lentauth.jar
sudo chmod 777 /u01/jar/lentauth.jar

5. Включаем CUSTOM аутентификацию в настройках Cloudera Manager:
прописываем наш аутентификатор в настройках:
* Ищем все настройки по паттерну "hive-site.xml" в настройках Cloudera.
Делать нужно именно через cloudera, а не напрямую в файле "hive-site.xml", т.к. менеджер переписывает файл из памяти процесса.

Нажимаем view as xml и во всех окнах и вводим
<property>
    <name>hive.server2.authentication</name>
    <value>CUSTOM</value>
  </property>
  <property>
    <name>hive.server2.custom.authentication.class</name>
    <value>hive.lenta.LentaAuthenticator</value>
</property>

Также ставим флажки suppress parameter validation
Иначе настройки не попадут в итоговый XML файл
* Для работы также нужна включенная опция hive.server2.enable.doAs = true
Она означает, что все запросы будут идти из под пользователя, который прошел аутентификацию.

5. Задаем путь до кастомной папки с jar файлами:
Настройка: «HIVE_AUX_JARS_PATH»
Значение «/u01/jar»,
Также ставим suppress parameter validation


6. Все готово, перезапускам HIVE из консоли Cloudera

7. Теперь при аутентификации нужно указывать способ:
AuthMech=3


После того как все настроили, мы уверены, что пользователь прошел аутентификацию.
Войти без пароля больше не получится.

Дальше настраиваем авторизацию - выдачу прав пользователям на конкретные объекты.
Проще всего это сделать через расширенные права linux:


ACL авторизация


1. Включаем ACL права через Cloudera Manager
настройка "dfs.namenode.acls.enabled" = true


2. После этого можно распределять права на папки:
Сброс всех прав:
sudo -u hdfs hdfs dfs -setfacl -R -b /

Даем пользователю user1 права на создание таблиц в своей схеме
sudo -u hdfs hdfs dfs -setfacl -R --set user::rwx,user:hadoop:rwx,user:hive:rwx,user:user1:rwx,group::r-x,other::--- /user/user1
к остальному хранилищу hive даем права только на чтение:
sudo -u hdfs hdfs dfs -setfacl -R --set user::rwx,user:hadoop:rwx,user:hive:rwx,user:user1:r-x,group::r-x,other::--- /user/hive/warehouse
к какой-то из таблиц закрываем доступ:
sudo -u hdfs hdfs dfs -setfacl -R --set user::rwx,user:hadoop:rwx,user:hive:rwx,user:user1:---,group::r-x,other::--- /user/hive/warehouse/table1
Один минус у ACL - это остается возможность выполнить DROP table, даже если нет прав доступа на чтение или запись.
Как победить эту проблему я не понял.

Так же стоит не забывать, что таким образом мы защитили только HIVE, возможность читать данные через HDFS по прежнему остались.
(Как читать данные из HDFS и HIVE через java можно посмотреть в моем github: https://github.com/pihel/java/blob/master/bigdata/Hdfs2Hive.java )

Чтобы закрыть доступ чтения через HDFS мы используем закрытие всех портов на сервере, кроме 10000 , который используется службой HIVE.

суббота, 19 августа 2017 г.

Мониторинг нагрузки Sap Hana и Mssql

Предоставления для мониторинга нагрузки на субд sap hana и mssql:

SAP HANA

Представлю основные запросы для мониторинга нагрузки на sap hana в разрезе времени (чтото похожее на awr отчеты в oracle):

1. Использование CPU, Озу и физической памяти в разрезе 10 минут:
select concat( SUBSTRING (TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24:mi'),1,15), '0') as hh,
round(100*SUM(TOTAL_CPU_USER_TIME_DELTA) / ( SUM(TOTAL_CPU_IDLE_TIME_DELTA)+SUM(TOTAL_CPU_SYSTEM_TIME_DELTA)+SUM(TOTAL_CPU_USER_TIME_DELTA)+SUM(TOTAL_CPU_WIO_TIME_DELTA) ),2) as cpu_prc,
round(AVG(INSTANCE_TOTAL_MEMORY_USED_SIZE)/1024/1024/1024,2) MEMORY_USED,
round(AVG(INSTANCE_TOTAL_MEMORY_ALLOCATED_SIZE)/1024/1024/1024,2) MEMORY_ALLOCATED,
round(100*SUM(INSTANCE_TOTAL_MEMORY_USED_SIZE) / ( SUM(INSTANCE_TOTAL_MEMORY_ALLOCATED_SIZE) ),2) MEMORY_USED_PRC,
round(AVG(FREE_PHYSICAL_MEMORY)/1024/1024/1024,2) FREE_PHYSICAL_MEMORY,
round(AVG(USED_PHYSICAL_MEMORY)/1024/1024/1024,2) USED_PHYSICAL_MEMORY,
round(100*SUM(USED_PHYSICAL_MEMORY) / ( SUM(FREE_PHYSICAL_MEMORY)+SUM(USED_PHYSICAL_MEMORY) ),2) USED_PHYSICAL_MEMORY_PRC,
round(AVG(FREE_SWAP_SPACE)/1024/1024/1024,2) FREE_SWAP_SPACE,
round(AVG(USED_SWAP_SPACE)/1024/1024/1024,2) USED_SWAP,
round(100*SUM(USED_SWAP_SPACE) / ( SUM(FREE_SWAP_SPACE)+SUM(USED_SWAP_SPACE) ),2) USED_SWAP_PRC
from _SYS_STATISTICS.HOST_RESOURCE_UTILIZATION_STATISTICS
where TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyymmddhh24mi') between '2017081410' and '201708142359'
group by SUBSTRING (TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24:mi'),1,15)
order by SUBSTRING (TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24:mi'),1,15) desc
В графическом виде это будет выглядеть так:


2. Интенсивность чтения и записи:
select concat( SUBSTRING (TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24:mi'),1,15), '0') as hh,
SUM(TOTAL_IO_TIME_DELTA)/1000/1000 as io_sec, SUM(TOTAL_READ_SIZE_DELTA) read_size, SUM(TOTAL_READ_TIME_DELTA)/1000/1000 as read_sec,
SUM(TOTAL_WRITE_SIZE_DELTA) as write_size, SUM(TOTAL_WRITE_TIME_DELTA/1000/1000) as write_sec,
SUM(TOTAL_FAILED_READS_DELTA) as failed_reads, SUM(TOTAL_FAILED_WRITES_DELTA) as failed_writes
from _SYS_STATISTICS.HOST_VOLUME_IO_TOTAL_STATISTICS
where TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyymmddhh24mi') between '2017081410' and '201708142359'
group by SUBSTRING (TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24:mi'),1,15)
order by SUBSTRING (TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24:mi'),1,15) desc
В графическом виде это будет выглядеть так:
Время чтения в 10 минут:

Размер чтений в 10 минут:


3. Топовые sql запросы по времени выполнения:
select * from (
       select v.* , ROW_NUMBER() OVER(pARTITION BY hh ORDER BY DELTA_TIME desc) as rn
       from (
             select TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24') as hh,
             AVG(AVG_EXECUTION_TIME)/1000/1000, SUM(EXECUTION_COUNT),
             AVG(AVG_EXECUTION_TIME)/1000/1000*SUM(EXECUTION_COUNT) as DELTA_TIME,
             STRING_AGG(USER_NAME), TO_VARCHAR(STATEMENT_STRING) as txt, MAX(index) as index
             from _SYS_STATISTICS.HOST_SQL_PLAN_CACHE
             where SERVER_TIMESTAMP between to_date('01.08.2017', 'dd.mm.yyyy') and to_date('02.08.2017', 'dd.mm.yyyy')
             group by TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24'), TO_VARCHAR(STATEMENT_STRING)
       ) v
)
where rn <= 5
order by hh, rn;
Получится что-то вроде такого топа:


4. Использование сети с момента сброса (system reset):
ALTER SYSTEM RESET MONITORING VIEW SYS. M_SERVICE_NETWORK_IO_RESET;
select SENDER_HOST, RECEIVER_HOST, sum(SEND_SIZE), sum(RECEIVE_SIZE), sum(SEND_DURATION)/1000/1000, sum(RECEIVE_DURATION)/1000/1000, sum(REQUEST_COUNT)
from SYS.M_SERVICE_NETWORK_IO_RESET Group BY SENDER_HOST, RECEIVER_HOST


MSSQL

топ sql запросов в разрезе показателей с последней перезагрузки бд:
SELECT  TOP 20 creation_time 
        ,last_execution_time
        ,total_physical_reads
        ,total_logical_reads 
        ,total_logical_writes
        , execution_count
        , total_worker_time
        , total_elapsed_time
        , total_elapsed_time / execution_count / 1000 avg_elapsed_time_ms
        ,SUBSTRING(st.text, (qs.statement_start_offset/2) + 1,
         ((CASE statement_end_offset
          WHEN -1 THEN DATALENGTH(st.text)
          ELSE qs.statement_end_offset END
            - qs.statement_start_offset)/2) + 1) AS statement_text,
   qp.query_plan,
   st.text
FROM sys.dm_exec_query_stats AS qs
CROSS APPLY sys.dm_exec_sql_text(qs.sql_handle) st
CROSS APPLY sys.dm_exec_query_plan(qs.plan_handle) qp
ORDER BY total_worker_time desc;

вторник, 15 августа 2017 г.

HIVE: Своя быстрая функция замен встроенной

Сегодня расскажу об одном способе ускорения запросов с аналитическими функциями в субд HIVE, работающей поверх Hadoop.

Один из вариантов ускорить HIVEQL запрос - это переписать встроенную аналитическую функцию на свой упрощенный вариант.

К примеру функция ROW_NUMBER(OVER PARTITION BY c1 ORDER BY c2) имеет достаточно сложную реализацию (github) только для того чтобы посчитать номер строки в группе.

Пример запроса с row_number:
create table tmp_table stored as orc as
select v.material, v.client_id, row_number() over (partition by v.client_id order by v.clientsum desc, v.checkcount desc) as rn
from pos_rec_itm_tst v;

Можно реализовать значительно упрощенную версию подсчета номера строки в группе.
На вход функции Rank.evaluate подаем значение группы key (то что было в partition by) и инкрементируем значение счетчика counter.
Если приходит новая группа, то счетчик сбрасывается на 0, а в переменную группы "this.last_key" записывается значение новой группы:
package com.example.hive.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
public final class Rank extends UDF{
    private int  counter;
    private String last_key;
    public int evaluate(final String key){
      if ( !key.equals(this.last_key) ) {
         this.counter = 0;
         this.last_key = key;
      }
      return (++this.counter);
    }
}
Понятно, что для правильной работы этой функции набор данных нужно предварительно отсортировать по группе партицирования "partition by", а потом по остальным полям "order by".

Пример запроса с собственной функцией:
create table tmp_table stored as orc as
select v.material, v.client_id, myrank(v.client_id) as rn from (
  SELECT client_id, clientsum, checkcount, material FROM pos_rec_itm_tst
  DISTRIBUTE BY client_id SORT BY clientsum desc, checkcount desc
) v;
Дополнительно сортировку в HIVE можно ускорить, если распараллелить мапперы по полю партицирования "partition by", а внутри этих групп сортировать по полям из "order by".
За счет параллельности мы опять же ускорим сортировку.


Чтобы создать такую функцию в HIVE нужно скомпилировать ее из java исходников:

$> /путь_до_java_который_используется_в_hive/bin/javac -classpath /путь_до_hive/lib/hive/lib/hive-serde-1.7.jar:/путь_до_hive/lib/hive/lib/hive-exec.jar:/путь_до_hadoop/lib/hadoop/client-0.20/hadoop-core.jar -d /путь_куда_компилим /путь_до_программы.java
$> /путь_до_java_который_используется_в_hive/bin/jar -cf название_jar_программы.jar com/example/hive/udf/название_класса.class

запускаем hive и регистрируем наш jar , как функцию с произвольным названием:
hive> add jar Rank.jar;
hive> create temporary function myrank as 'com.example.hive.udf.Rank';

Такое небольшое изменение ускорит выполнение запроса на 10-20%.

среда, 5 апреля 2017 г.

Oracle: columnar compression в exadata и inmemory

В связи с развитием баз данных в области колоночных inmemory хранилищ, хотел бы осветить развитие опции компрессии в субд Oracle.

Строчные сжатия

Basic compression – работает только при direct path (append) вставке
Представляет из себя дедублицирование данных.
При Update строка становится мигрированной и хранится уже не сжатой.

Oltp compression – работает при любых вставках и требует наличия опции advanced compression.
Представляет из себя дедублицирование данных.
При не «direct path» строка сначала вставляется несжатой, при накоплении % несжатых записей в блоке = PCTFREE, блок сжимается и т.д.:

При Update строка становится мигрированной и хранится сначала не сжатой, но потом при накоплении PCTFREE % несжатых записей, блок аналогично сжимается.


Колоночное сжатие

HCC - hybrid columnar compression
работает только с «Exadata или Oracle ZFS Storage Appliance или either the Pillar Axiom или Oracle FS1 storage array».
Oracle хранит компрессированную колонку в виде связки блоков (как мигрированная строка), что оставляет возможность быстрого доступа к колонке по rowid и создание индексов!.
Также требуется direct path (append) вставка.

Внутреннее устройство:
Строки данных бьются на compression unit (cu) – это достаточно большая сущность (32 КБ) и со стороны Oracle рассматривается как 1 блок.


Детальная структура CU:

CU в заголовке содержит указатели на колонки. А блоки внутри CU указатели на строки.
Т.к. все строки поделены на CU, внутри которых последовательно хранятся колонки, то добавление колонок к запросу не увеличивает число чтений (даже если строки из разных CU, число чтений будет кратно число блоков в CU * кол-во запрашиваемых CU):

Чтение 1 блока дает 417 чтений:
select /*+ MONITOR */ id, num_1000000 from t_qh where id in( 5123456);

И чтение из 2 разных блоков тоже дает 417 чтений, т.к. оба этих блока входят в 1 CU:
select /*+ MONITOR */ id, num_1000000 from t_qh where id in( 5123456, 6114557 );


Поддержка DML:
- Блокировка по умолчанию происходит на уровне CU. Если нужна блокировка на уровне строки, то таблицу нужно создавать с директивой «ROW LEVEL LOCKING», которая расширяет заголовок CU под флаги блокировок всех строк всех блоков.
- При обновлении строки, она мигрирует в другой блок и помечается сжатой как OLTP, в не зависимости от степени сжатия раньше (см. описание выше)


Сжатие колонок
Обычные способы сжатия, а не алгоритмы дедубликации:
  • Query Low – LZO (4x)
    Данные перед вставкой не сортируются
  • Query High – gzip (6x)
    Во всех других случаях данных сортируются в рамках одного CU, чтобы достичь большей компрессии (по одному из столбцов?)
  • Archive Low – gzip high (7x)
  • Archive High – bzip2 (12x)

Т.к. HCC заточено на хранилища данных, то максимальный выигрыш дают смарт сканы, т.к. разархивация происходит на стороне exadata cells. В случае индексного доступа разархивация происходит уже в БД (причем всего CU!):

Смарт скан, отрицательный offloading - возвращено больше, чем считано:
select /*+ monitor */ count(DISTINCT num_1000), count(DISTINCT num_10), count(DISTINCT num_1000000) from t_qh;


Чтение по индексу - считывается весь CU, а разархивация происходит внутри субд:
select /*+ MONITOR */ id, num_1000000 from t_qh where id in( 5123456);


Inmemory HCC

Oracle хранит данные сразу в 2 форматах: строковый и колоночный. Это могут быть как таблицы, так и matview на наборе таблиц.
Загонять данные в колонки нужно вручную.
В inmemory можно поместить часть столбцов или разную степень для разных колонок.

Размер IMCU = 1МБ ( blogs.oracle.com ) см. imcu_addr.v$im_header



Данные в IMCU всегда хранятся в порядке вставки (rowid) - docs.oracle.com
Пример:
SELECT cust_id, time_id, channel_id
FROM sales
WHERE prod_id =5;

Данные из колонки «prod_id» фильтруются, в результате чего определяются позиции на которых находятся искомые данные. Т.к. данные в колонках лежат в том же порядке (включая null), то по этим номерам позиций забираются соответствующие значения из связанных колонок (time_id, chanel_id)

Основные понятия IM HCC

Local dictionary CU
хэш массив уникальных значений в CU (в виде чисел)
min/max значение в колонке CU

«IM storage index» - для отфильтровывания CU
IMCU index хранятся прямо в заголовке CU.

SMU (Snapshot Metadata Unit) = 64КБ – метаданные о IMCU - информация о инвалидации данных в IMCU при DML (транзакционный журнал).
Для DML по прежнему используется буферный кэш, но в SMU помещается информация, что rowid был инвалидирован.
Т.е. чем больше DML, тем хуже работает IM

Repopulation – периодическое обновление измененных данных в IMCU из буферного кэша (при накоплении определенного объема). На время этой операции IMCU отключаются, используется стандартный механизм доступа.



Ручной запуск:
EXEC DBMS_INMEMORY.POPULATE('SH', 'CUSTOMERS');

При exchange partition чтобы не потерять данные IM нужно собрать данные src таблицы в IM. Тогда обмен произойдет как у таблицы, так и у IM хранилища.
При direct path (+append) вставке происходит автоматическая фоновая репопуляция.

IM expression unit
В IM кроме обычных колонок могут хранится виртуальные выражения (как колонки, там и предрасчитанные на основе статистики вызова - «IM expression statistic store»), они будут также обновляться при repopulation.

IM expression statistic store
Дополнительное сохранение агрегированных данных SELECT, WHERE, GROUP BY если они часто вызываются и требуют больших расходов на расчет.

SIMD – векторная обработка колонок
Т.к. данные для фильтрации хранятся в виде колонки-вектора, то для фильтрации можно использовать векторные возможности CPU.
Векторная операция – это когда за 1 такт процессора происходит сравнение не одной переменной, а целого вектора значений (к примеру 8)



Раньше эта технология часто использовалась в графике, например при работе с RGB значениями, цвет пикселя можно было сменить за 1 такт CPU, теперь эти технологии нашли применение и в СУБД.

Компрессия:
* «MEMCOMPRESS FOR QUERY LOW» - алгоритм по умолчанию
Похоже на дедублицирование: словарь + обнаружение повторов + замена повтором на бит ссылку
(oracle.com/technetwork )
* «MEMCOMPRESS FOR CAPACITY LOW» - Используется проприетарный алгоритм OZIP, как расширение поверх «MEMCOMPRESS FOR QUERY LOW». Нужно разжатие перед выполнением WHERE, но может выполняться прямо на CPU (нужны специальные сервера с SQL in Silicon)


IM JOIN
SELECT v.year, v.name, s.sales_price
FROM   vehicles v, sales s
WHERE  v.name = s.name;

Обычный джойн выполняется так:


Т.е. большая часть работы — это преобразование колоночного вида в строковое хранилище pga.

Над join можно произвести первую оптимизацию — bloom filter, т. е. испольлзуя данные 1 таблицы откинуть лишние CU еще до join.

Для устранения необходимости преобразования колоночного вида в строковый можно подготовить что-то типа join индекса в IM:
CREATE INMEMORY JOIN GROUP deptid_jg (hr.employees(department_id),hr.departments(department_id));

В индексе сохраняются сочетания значений «Local dictionary CU» нужных колонок таблиц, где значения = указатели на нужные CU.

В итоге JOIN будет выполняться как фильтр большей таблицы по данным меньшей:
* фильтруем колонку левой таблицы
* получаем цифровые значения строк левой таблицы из “Local dictionary CU”
* сохраняем этот массив в PGA
* применяем фильтр к правой таблице на основе нашего «INMEMORY JOIN GROUP» где хранится связка левых CU с правыми CU



В итоге получается выполнение join без преобразования колонок в строки и фильтрация идет по обычному массиву (не хэш), т. е. без потребления дополнительного CPU для хэширования и проблемы хэш массива: устранения коллизий.


IM Vector Group BY
SELECT c.customer_id, s.quantity_sold, s.amount_sold 
FROM   customers c, sales s
WHERE  c.customer_id = s.customer_id 
AND    c.country_id = 'FR';

IM Vector group by есть общее со star transformation – преобразование JOIN в фильтрацию фактовой таблицы измерениями:
* фильтруем колонку левой таблицы
* Создаем массив уникальных значений левой таблицы: 0 — нет значения, 1,2,N существующие уникальные значения
* созданные массивы в векторном режиме применяются над фактовой таблицой.

План будет выглядеть так:
SQL_ID  0yxqj2nq8p9kt, child number 0
-------------------------------------
SELECT t.calendar_year, p.prod_category, SUM(quantity_sold) FROM
times t, products p, sales f WHERE  t.time_id = f.time_id AND
p.prod_id   = f.prod_id GROUP BY t.calendar_year, p.prod_category

Plan hash value: 2377225738
------------------------------------------------------------------------------------------------------
|Id| Operation                           | Name              |Rows|Bytes|Cost(%CPU)|Time|Pstart|Pstop|
------------------------------------------------------------------------------------------------------
| 0|SELECT STATEMENT                     |                         |    |     |285(100)|        | |  |
| 1| TEMP TABLE TRANSFORMATION           |                         |    |     |        |        | |  |
| 2|  LOAD AS SELECT                     |SYS_TEMP_0FD9D6644_11CBE8|    |     |        |        | |  |
| 3|   VECTOR GROUP BY                   |                         |   5|  80 |  3(100)|00:00:01| |  |
| 4|    KEY VECTOR CREATE BUFFERED       | :KV0000                 |1826|29216|  3(100)|00:00:01| |  |
| 5|     TABLE ACCESS INMEMORY FULL      | TIMES                   |1826|21912|  1(100)|00:00:01| |  |
| 6|  LOAD AS SELECT                     |SYS_TEMP_0FD9D6645_11CBE8|    |     |        |        | |  |
| 7|   VECTOR GROUP BY                   |                         |   5| 125 |  1(100)|00:00:01| |  |
| 8|    KEY VECTOR CREATE BUFFERED       | :KV0001                 |  72| 1800|  1(100)|00:00:01| |  |
| 9|     TABLE ACCESS INMEMORY FULL      | PRODUCTS                |  72| 1512|  0  (0)|        | |  |
|10|  HASH GROUP BY                      |                         |  18| 1440|282 (99)|00:00:01| |  |
|11|   HASH JOIN                         |                         |  18| 1440|281 (99)|00:00:01| |  |
|12|    HASH JOIN                        |                         |  18| 990 |278(100)|00:00:01| |  |
|13|     TABLE ACCESS FULL               |SYS_TEMP_0FD9D6644_11CBE8|   5|  80 |  2  (0)|00:00:01| |  |
|14|     VIEW                            | VW_VT_AF278325          |  18| 702 |276(100)|00:00:01| |  |
|15|      VECTOR GROUP BY                |                         |  18| 414 |276(100)|00:00:01| |  |
|16|       HASH GROUP BY                 |                         |  18| 414 |276(100)|00:00:01| |  |
|17|        KEY VECTOR USE               | :KV0000                 |918K|  20M|276(100)|00:00:01| |  |
|18|         KEY VECTOR USE              | :KV0001                 |918K|  16M|272(100)|00:00:01| |  |
|19|          PARTITION RANGE ALL        |                         |918K|  13M|257(100)|00:00:01|1|28|
|20|           TABLE ACCESS INMEMORY FULL| SALES                   |918K|  13M|257(100)|00:00:01|1|28|
|21|    TABLE ACCESS FULL                |SYS_TEMP_0FD9D6645_11CBE8|  5 |  125|  2  (0)|00:00:01| |  |
------------------------------------------------------------------------------------------------------

Predicate Information (identified by operation id):
---------------------------------------------------
  11 - access("ITEM_10"=INTERNAL_FUNCTION("C0") AND "ITEM_11"="C2")
  12 - access("ITEM_8"=INTERNAL_FUNCTION("C0") AND "ITEM_9"="C2")

Note
-----
   - vector transformation used for this statement


IM RAC

Можно продублировать IMCU на всех нодах «DUPLICATE ALL» или разделить таблицу (DISTRIBUTE ) — часть на 1 ноде, часть на другой:
* по партициям «DISTRIBUTE BY PARTITION »
* по диапазону rowid “DISTRIBUTE BY ROWID RANGE”

воскресенье, 26 марта 2017 г.

Решение bigdata задач на Hadoop mapreduce

Hadoop

Набор утилит, библиотек и фреймворк для разработки и выполнения распределённых программ, работающих на кластерах из сотен и тысяч узлов.

Ссылка на установку: http://www.cloudera.com/content/www/en-us/downloads.html
На основе курса

hadoop
Hadoop состоит из 2 основных частей hdfs и map reduce. Рассмотрим подробней.

Hdfs

- распределенная файловая система. Это значит, что данные файла распределены по множеству серверов.

* hdfs лучше работает с небольшим числом больших файлов/блоков
* один раз записали, много раз считали
* можно только целиком считать, целиком очистить или дописать в конец (нельзя с середины)
* файлы бьются на блоки split (к примеру 64мб)
 ** размер блока выбирается так, чтобы нивилировать время передачи блока по сети (если сеть быстрая, то блок можно поменьше, если медленная, то больше)
* все блоки реплицируются с фактором 3 поумолчанию (хранятся в 3 копиях на разных серверах)
 ** это обеспечивает высокую пропускную способность, но не скорость реакции

Hdfs java api

Пример программы копирующей один файл в другой:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.InputStream;

Path source_path = new Path(args[0]);
Path target_path = new Path(args[1]);

Configuration conf = new Configuration();
FileSystem source_fs = FileSystem.get(source_path.toUri(), conf);
FileSystem target_fs = FileSystem.get(target_path.toUri(), conf);

//доступ к нескольким файлам по маскам
//FileStatus [] files = fs.globStatus(glob); 
//?, *, [abc], [^a], {ab, cd}

FSDataOutputStream output = target_fs.create(target_path);
InputStream input = null;
try{
    input = source_fs.open(source_path);
    IOUtils.copyBytes(input, output, conf);
} finally {
    IOUtils.closeStream(input);
    IOUtils.closeStream(output);
}

Парадигма map reduce

1. input file: split 1 + split 2 + split 3 (блоки файла). Каждый сплит хранится на своем сервере кластера.
2. mapper:
 * на каждый split N создается worker для обработки
 * worker выполяется на сервере, где находится блок
 * workerы не могут обмениваются между собой данными
  ** т.е. mapper подходит для независимых данных, которые легко побить на части.
   зависимые данные (типа архива zip) нельзя побить на части в mapper:
    все блоки (split) будут переданы в один mapper
 * в случае ошибки процесс рестратует на другой копии блока hdfs
 * combine - (необязательный шаг) - reducer внутри mappera агрегирующие данные в меньшее число данных (ключ менять нельзя)
  ** агрегирует только данные одного маппера
 * partition - (необязательный шаг) - определение куда отправится ключ (поумолчанию hash(k) mod N , где N - число reducer )
3. результат mapper записывается в виде ключ значение {k->v} в циклический буфер в памяти, конец списка пишется на локальный диск сервера с worker
  ** это сделано в целях производительности, чтобы не реплицировать
  ** но в случае ошибки, данные нельзя будет восстановить и потребует рестартовать mapper
4. данные из {k->v} расходятся на сервера reduccer на основе ключа.
Т.е. данные с разных mapperов но с одним key попадут в один reducer
5. Результат reducer складываются в hdfs. Число файлов = число reducer

Hadoop streaming

Инструмент для быстрого прототипирования map reduce задач.
можно писать python программы заготовки, общение маппера с редусером проиходит чреез stdin/out.
Данные передаются как текст разделенный табом

word count
#!/usr/bin/python
import sys
for line in sys.stdin:
  for token in line.strip().split(" "):
    if token: print(token + '\t1')
 
#!/usr/bin/python
import sys
(lastKey, sum)=(None, 0)
for line in sys.stdin:
  (key, value) = line.strip().split("\t")
  if lastKey and lastKey != key:
    print (lastKey + '\t' + str(sum))
    (lastKey, sum) = (key, int(value))
  else:
    (lastKey, sum) = (key, sum + int(value))
if lastKey:
  print (lastKey + '\t' + str(sum))

Запуск:
hadoop jar $HADOOP_HOME/hadoop/hadoop-streaming.jar \
-D mapred.job.name="WordCount Job via Streaming" \
-files countMap.py, countReduce.py \
-input text.txt \
-output /tmp/wordCount/ \
-mapper countMap.py \
-combiner countReduce.py \
-reducer countReduce.py

Java source code example

Программа подсчитывающая число слов в hdfs файле.

word count

public class WordCountJob extends Configured implements Tool {
 static public class WordCountMapper extends Mapper < LongWritable, Text, Text, IntWritable > {
  private final static IntWritable one = new IntWritable(1);
  private final Text word = new Text();
  
  @Override
  protected void map(LongWritable key, Text value, Context context)  throws IOException,  InterruptedException {
     //разбиваем строку (key) на слова и передаем пару: слово, 1
     StringTokenizertokenizer = new StringTokenizer(value.toString());
     while (tokenizer.hasMoreTokens()) {
      text.set(tokenizer.nextToken());
      context.write(text, one);
     }
  } //map
 } //WordCountMapper
  
 static public class WordCountReducer  extends Reducer < Text,  IntWritable,  Text,  IntWritable > {
   @Override
   protected void reduce(Text key, Iterable < IntWritable > values, Context context)   throws IOException,  InterruptedException {
     //в редусер приходят все единицы от одного слова
    intsum = 0;
    for (IntWritable value: values) {
     sum += value.get();
    }
    context.write(key, new IntWritable(sum));
   } //reduce
 } //WordCountReducer
  
  @Override
  public int run(String[] args) throws Exception {
   Job job = Job.getInstance(getConf(), "WordCount");
   //класс обработчик
   job.setJarByClass(getClass());
   //путь до файла
   TextInputFormat.addInputPath(job, new Path(args[0]));
   //формат входных данных (можно отнаследовать и сделать собственный)
   job.setInputFormatClass(TextInputFormat.class);
   //классы мапперов, редусееров, комбайнеров и т.д.
   job.setMapperClass(WordCountMapper.class);
   job.setReducerClass(WordCountReducer.class);
   job.setCombinerClass(WordCountReducer.class);
   //выходной файл
   TextOutputFormat.setOutputPath(job, new Path(args[1]));
   job.setOutputFormatClass(TextOutputFormat.class);
   job.setOutputKeyClass(Text.class);
   job.setOutputValueClass(IntWritable.class);
   return job.waitForCompletion(true) ? 0 : 1;
  } //run
  
  public static void main(String[] args) throws Exception {
   int exitCode = ToolRunner.run( new WordCountJob(), args);
   System.exit(exitCode);
  } //main
} //WordCountJob

Такая программа будет тратить все время на передачу данных от маппера к редусеру.

Её можно усовершенствовать несколькими способами:
* Сделать комбайнер - чтобы данные из одного маппера были сагрегированы там.
 - не обязательно вызывается
 +/- агрегирует данные только одного вызова маппера (одной строки)
* хэш аггегировать внутри маппера:
 + обязательно выполнится
 +/- агрегирует данные только одного вызова маппера (одной строки)
 - нужно следить за размером хэш массива
* хэш массив сверху маппера, а внутри маппера обрабатывать
 + обязательно выполнится
 + агрегирует все вызовы маппера (сплита)
 - нужно следить за размером хэш массива

Sql операции

Разность (minus) и другое по аналогии
// на вход подаются элементы из двух множеств A и B
class Mapper
method Map(rowkey key, value t)
 Emit(value t, string t.SetName) // t.SetName либо ‘A‘ либо ‘B'
class Reducer
// массив n может быть [‘A'], [‘B'], [‘A' ‘B‘] или [‘B', ‘A']
method Reduce(value t, array n)
 if n.size() = 1 and n[1] = 'A'
  Emit(value t, null)
* hash join
class Mapper
method Initialize
 H = new AssociativeArray : join_key -> tuple from A //хэшируем меньшую таблицу целиком в память
 A = load()
 for all [ join_key k, tuple [a1, a2,...] ] in A
  H{k} = H{k}.append( [a1, a2,...] )

method Map(join_key k, tuple B)
 for all tuple a in H{k} //ищем в A соответствие ключа из B
  Emit(null, tuple [k a B] ) //если нашли, то записываем

Операции на графах

оптимальное представление в виде списка смежности, т.к. матрица смежности занимает слишком много места
* поиск кратчайшего пути
 ** обычный алгоритм - дейкстра
 ** поиск в ширину (bfs)
  на невзвешенном графе (нет связей назад):
  - Останавливаемся, как только расстояния до каждой вершины стало известно
  - Останавливаемся, когда пройдет число итераций, равное диаметру графа
  на взвешенном графе (есть обратные связи):
  - Останавливаемся, после того, как расстояния перестают меняться
* PR = a (1/N) + (1-a)* SUM( PR(i) / C(i) )
 ** N - начальное число точек в графе
 ** C - число исходящих точек
 ** a - вероятность случайного перехода
 ** pr решается только приблизительно:
  - останавливаем, когда значения перестают меняться
  - фиксированное число итерации
  - когда изменение значение меньше определенного числа
* проблемы
 ** необходимость передавать весь граф между всеми задачами
 ** итеративный режим, т.е. переход на следующий этап, когда все параллельные задачи выполнятся
* оптимизации:
 ** inmemory combining - агрегируем сообщения в общем массиве
 ** партицирование - смежные точки оказались на одном маппере
 ** структуру графа читать из hdfs, а не передавать + партицирование

Высокоуровневые языки поверх hdfs или hadoop:

Pig

высокоуровневый язык + компилятор в mapreduce
используется для быстрого написания типовых map-reduce задач

основные составляющие:
* field - поле
* tuple - кортеж (1, "a", 2)
* bag - коллекция кортежей {(), ()}
 ** коллекции могут содержать разное кол-во полей в кортеже (также они могут быть разных типов)

word count:
input_lines = LOAD 'file-with-text' AS (line:chararray);
words = FOREACH input_lines GENERATE FLATTEN(TOKENIZE(line)) AS word;
filtered_words = FILTER words BY word MATCHES '\\w+';
word_groups = GROUP filtered_words BY word;
word_count = FOREACH word_groups GENERATE COUNT(filtered_words) AS count,
group AS word;
ordered_word_count = ORDER word_count BY count DESC;
STORE ordered_word_count INTO ‘number-of-words-file';

join
//Загрузить записи в bag #1
posts = LOAD 'data/user-posts.txt' USING PigStorage(',') AS (user:chararray,post:chararray,date:long);
//Загрузить записи в bag #2
likes = LOAD 'data/user-likes.txt' USING PigStorage(',') AS (user:chararray,likes:int,date:long);
userInfo = JOIN posts BY user, likes BY user;
DUMP userInfo;

Hive

sql подобный язык hivesql поверх hadoop
основные составляющие - как в бд
метаданные могут храниться в mysql /derby

* создание таблицы
hive> CREATE TABLE posts (user STRING, post STRING, time BIGINT)
 > ROW FORMAT DELIMITED
 > FIELDS TERMINATED BY ','
 > STORED AS TEXTFILE;

* при нарушении схемы ошибки не будет, вставится NULL

join
hive> INSERT OVERWRITE TABLE posts_likes
 > SELECT p.user, p.post, l.count
 > FROM posts p JOIN likes l ON (p.user = l.user);

word count
CREATE TABLE docs (line STRING);
LOAD DATA INPATH 'docs' OVERWRITE INTO TABLE docs;
CREATE TABLE word_counts AS
SELECT word, count(1) AS count FROM
(SELECT explode(split(line, '\s')) AS word FROM docs) w
GROUP BY word
ORDER BY word;

Nosql

* Масштабирование:
 ** мастер-слейв: распределение нагрузки cpu, io остается общим
 ** шардинг: деление данных таблицы по серверам: распределение io, cpu Общее
* cap theorem - достигается только 2 из 3
 ** consistency - непротиворечивость
  Чаще всего жертвует строгой целостностью в текущий момент, на целостность в конечном счете
 ** aviability - доступность
 ** partioning - разделение данных на части
* типы:
 ** ключ-значение
 ** колоночная
 ** документоориентировання
 ** графовая

Hbase

- поколоночная база
 ** строки в таблицах шардируются на сервера
  *** строки хранятся в отстортированном виде, чтобы можно было искать по серверам
  *** список шардов хранится в HMaster
   через него определяется сервер для чтений, а дальше чтение идет уже напряму с него миную hmaster
   ! возможная точка отказа
 ** столбцы объединяются в column family
  *** каждая такая группа сохраняется также в отдельный файл
  *** для всей группы задается степень сжатия и другие физические настройки
 ** изменение/удаление происходит через вставку нового значения в колонку с TS
  *** при select выбираются последние 3 изменения (если в колонке не было изменений, то берется предыдущая максимальная версия)
 ** промежуточные данные хранятся в memstore сервера и логе изменений
  *** периодически данные из памяти скидываются на hdfs
 ** hfile - мапится в файл hdfs, который также splitit'ится на разные сервера
 ** сжатие файлов хранилища:
  *** объединение файлов CF в один
  *** очистка от удаленных записей

 Массовость - деление данных на сервера по:
 * по семейству колонок
 * по диапазону ключей
 * по версии строки
 * сам файл разбивается в hdfs на сплиты

Cassandra

отличия:
** есть sql, а не только java api
** CF имеют иерархичность и могут также объединяться в группы по какомуто признаку
** настраевая consistency от самой строгой - ожидание отклика всех серверов, то записи без проверок
** нет единой точки отказа hmaster, данные делятся по диапазону
 ** но усложняет деление и поиск данных

* greenplum - классическая колоночная бд, с шардирование данных на основе pgsql
 ** есть поддержка sql

Spark

Альтернативный вариант реализации hadoop.

* Преимущества перед hadoop:
1. Необязательно сохранять промежуточные результаты в HDFS при итеративной разработке. Данные можно хранить в памяти.
2. Необязательно чередовать map-reduce, можно делать несколько reducer подряд
3. Операции в памяти (вкллючая синхронизацию между серверами), а не через hdfs
4. Возможность загрузить данные в распределенную память, а потом делать обработку (каждому mapper уже не надо считывать данные из hdfs)

* В основе может лежать любой вид хранилища данных
* Архитектура, как в hadoop: name node (driver) + worker

* Основная составляющая вещь - RDD:
 ** объект партицируется
 ** каждая партиция имеет произвольное хранение: память, диск, hdfs и т.д.
  *** Для хранения данных в памяти используется LRU буфер.
   Старые, редко используемые данны вытесняются в медленное хранилище
 ** объект неизменяем (изменение создает новый объект)
 ** rdd распределена по серверам

* Программа состоит из однонаправленного графа операций без циклов
 Виды зависимостей в графе:
 ** Narrow (узкая) - переход 1 партииции в 1 другую
 ** wide (широкая) - переход 1 (N) партиций в N (1)

* Восстановление в случае падения:
 - нет репликации как в hdfs
 + для воостановления используется перезапуск
  ** если потерялись данные из одной партиции в Narrow связи, то восстанавливается только она из предыдущей партиции
  ** если в wide то перезапускаются все партиции (т.к. связ не 1 к 1).
   Результат wide поумолчанию сохраняется на диск.
 ** Если есть шанс потерять данные, а цепочка длинная, то лучше самим самостоятельно сохранять промежуточные результаты на диск.

* Типы операции состоят из:
 ** Трансформация - не запускают работу
  map, filer, group, union, join ....
 ** Действий - запускают работу (все трансформации в графе до этого графа)
  count, collect, save, reduce

* spark поддерживает:
 ** java
 ** scala
 ** python

Join на scala
  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._
  val sc = new SparkContext(master, appName, [sparkHome], [jars])
 
  //transform
  //или из текстового файла  sc.textFile("file.txt")
  val visits = sc.parallelize(
   Seq(("index.html", "1.2.3.4"),
   ("about.html", "3.4.5.6"),
   ("index.html", "1.3.3.1")))
  val pageNames = sc.parallelize(
   Seq(("index.html", "Home"),
   ("about.html", "About")))
  visits.join(pageNames)
  
  //action
  visits.saveAsTextFile("hdfs://file.txt")
  // ("index.html", ("1.2.3.4", "Home"))
  // ("index.html", ("1.3.3.1", "Home"))
  // ("about.html", ("3.4.5.6", "About"))
word count
val file = sc.textFile("hdfs://...")
val sics = file.filter(_.contains("MAIL"))
val cached = sics.cache()
val ones = cached.map(_ => 1)
val count = ones.reduce(_+_)
val file = sc.textFile("hdfs://...")
val count = file.filter(_.contains(“MAIL")).count()
или
val file = spark.textFile("hdfs://...")
val counts = file.flatMap(line => line.split(" "))
 .map(word => (word, 1))
.reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

* Переменные для обмена данными между серверами
 ** broadcast variables - переменная только для чтения
  val broadcastVar = sc.broadcast(Array(1, 2, 3))
  ...
  broadcastVar.value 
 ** Accumulators - счетчики
  val accum = sc.accumulator(0)
  accum += x
  accum.value

Yarn

планировщик ресурсов, которые избавляет от недостатков классического MR:
* жесткое разделенеи ресурсов: можно освободить ресурсы от mapper после их полного выполнения и отдать все ресурсы reducer
* возможность разделять ресурсы не только между MR Задачами, но и другими процессами

* разделение происходит в понятии: озу, cpu, диск, сеть и т.д.
* он же производит запуск задач (не только MR) и восстанавливает в случае падения
* для работы нужны вспомогательные процессы:
  ** appserver - получение комманд от клиента
  ** resource server - север для выделяющий ресурсы
  ** node server - сервер для запуска задач и отслеживания за ними (на каждом сервере)

пятница, 17 февраля 2017 г.

Oracle: Hash агрегация и приблизительный DISTINCT

Список статей о внутреннем устройстве Oracle:
Нововведение Oracle 12.1 - приблизительный подсчет уникальных значений ( APPROX_COUNT_DISTINCT ~= count distinct ).
Классический подход к расчету числа уникальных значений предполагает создание хэш массива, где ключ = колонке таблицы, а в значении число совпадений.
Число элемнтов в хэш массиве будет равно числу уникальных данных в колонке таблицы:
class RealDist {
 HashMap words;

 public RealDist() {
  words = new HashMap();
 }

 public void set(String word) {
  if (!words.containsKey(word)) {
   words.put(word, 1);
  } else {
   words.put(word, words.get(word) + 1);
  }
 } //set

 public int get() {
  return words.size();
 } //get
} // RealDist

Этот способ всем хорош, пока у нас достаточно памяти под хранение хэш массива.
При росте размера данных время и потребляемые ресурсы начинают катастрофически расти.

Пример со страницы antognini с разным числом уникальных значений на 10 млн. строк:
CREATE TABLE t
AS
WITH
  t1000 AS (SELECT /*+ materialize */ rownum AS n
   FROM dual
   CONNECT BY level <= 1E3)
SELECT rownum AS id,
    mod(rownum,2) AS n_2,
    mod(rownum,4) AS n_4,
   mod(rownum,8) AS n_8,
   mod(rownum,16) AS n_16,
   mod(rownum,32) AS n_32,
   mod(rownum,64) AS n_64,
   mod(rownum,128) AS n_128,
   mod(rownum,256) AS n_256,
   mod(rownum,512) AS n_512,
   mod(rownum,1024) AS n_1024,
   mod(rownum,2048) AS n_2048,
   mod(rownum,4096) AS n_4096,
   mod(rownum,8192) AS n_8192,
   mod(rownum,16384) AS n_16384,
   mod(rownum,32768) AS n_32768,
   mod(rownum,65536) AS n_65536,
   mod(rownum,131072) AS n_131072,
   mod(rownum,262144) AS n_262144,
   mod(rownum,524288) AS n_524288,
   mod(rownum,1048576) AS n_1048576,
   mod(rownum,2097152) AS n_2097152,
   mod(rownum,4194304) AS n_4194304,
   mod(rownum,8388608) AS n_8388608,
   mod(rownum,16777216) AS n_16777216
FROM t1000, t1000, t1000
WHERE rownum <= 1E8;

Скорость вычислений классическим способом падает вместе с ростом числа уникальных значений:


Пропорционально времени работы растет и потребление PGA памяти под хранение хэш массива:


Чтобы решить проблему производительности/памяти и при этом получить приемлемый результат, можно применить приблизительный расчет числа уникальных значений.
Как и bloom filter он основан на вероятностных хэш значениях от данных.

Общий смысл приблизительного расчета (habr):

1. От каждого значения берется произвольная хэш функция
public static int fnv1a(String text) {
 int hash = 0x811c9dc5;
 for (int i = 0; i < text.length(); ++i) {
  hash ^= (text.charAt(i) & 0xff);
  hash *= 16777619;
 }
 return hash >>> 0;
} //fnv1a

2. У каждого хэш значения определяется ранк первого ненулевого бита справа.
// позиция первого ненулевого бита справа
public static int rank(int hash, int max_rank) {
 int r = 1;
 while ((hash & 1) == 0 && r <= max_rank) {
  r++;
  // смещаем вправо, пока не дойдет до hash & 1 == 1
  hash >>>= 1;
 }
 return r;
} //rank

Вероятность того, что мы встретим хеш с рангом 1 равна 0.5, с рангом 2 — 0.25, с рангом r — 1 / 2ранг
Если запоминать наибольший обнаруженный ранг R, то 2R сгодится в качестве грубой оценки количества уникальных элементов среди уже просмотренных.

Такой подход даст нам приблизительное число уникальных значений, но с сильной ошибкой.

Для данного алгоритма есть усовершенствованный вариант HyperLogLog:

а. От битового представления хэш берется 8 первых левых бит. Они становятся ключом массива (2^8 = 256 значений)
public void set(String word) {
 int hash = fnv1a(word);
 // убираем 24 бита из 32 справа - остается 8 левых
 // (=256 разных значений)
 int k = hash >>> 24;
 
 // если коллизия, то берем наибольший ранк
 hashes[k] = Math.max(hashes[k], rank(hash, 24)); 
} // count

б. Над массивов вычисляется формула:

где am - корректирующий коэффициент, m - размер массива = 256, M[] - массив ранков (hashes)

Для погрешности в 6.5% числитель этой формулы будет равен константе = 47072.7126712022335488.
Остается в цикле возвести 2 в -степень максимального ранка из каждого элемента массива:
public double get_loglog() {
 double count = 0;

 for (int i = 0; i < hashes.length; i++) {
  count += 1 / Math.pow(2, hashes[i]);
 }

 return 47072.7126712022335488 / count;
} //get_loglog

Данный алгоритм уже дает приемлемый результат на больших данных, но серьезно ошибается на небольшом наборе.
Для небольших значение применяется дополнительная математическая корректировка:
public int get() { 
 long pow_2_32 = 4294967296L;

 double E = get_loglog();

 // коррекция
 if (E <= 640) {
  int V = 0;
  for (int i = 0; i < 256; i++) {
   if (hashes[i] == 0) {
    V++;
   }
  }
  if (V > 0) {
   E = 256 * Math.log((256 / (double) V));
  }

 } else if (E > 1 / 30 * pow_2_32) {
  E = -pow_2_32 * Math.log(1 - E / pow_2_32);
 }
 // конец коррекции

 return (int) Math.round(E);
} //get

Проведенных тест на 1-2 томах войны и мира показал результаты:
* реальное количество уникальных слов = 3737
* приблизительное число уникальных слов основываясь только на максимальном ранке первого ненулевого бита справа = 16384
* приблизительное число уникальных слов по формуле из п. б на массиве из 256 элементов = 3676
* приблизительное число уникальных слов по формуле и корректировке = 3676

При ошибке числа уникальных значений в 6,1% мы затратили константное число памяти под массив из 256 элементов, что в разы меньше, чем при обычном хэш массиве.

Полный исходный код на java можно посмотреть на github

вторник, 27 декабря 2016 г.

Oracle: Lru буферный кэш

Список статей о внутреннем устройстве Oracle:

Следующий аспект больших бд, у которых все данные не способны поместиться в памяти - кеширование частоиспользуемых данных в памяти.

В Oracle для кэширования используется модифицированный алгоритм LRU.

Lru (least recent used) кэш — буфер в памяти для быстрого доступа к часто используемым элементам.
Lru кэш представляет из себя двусвязный список (2Linked List), вначале которого наиболее часто используемые элементы, а в конце - редко.
Для ускорения произвольного доступа над списком создается хэш массив (Hash Map) ссылок на элементы двусвязанного списка:

Раньше я уже описывал lru кэш со стороны разработчика бд: Ядро oracle, сегодня копнем немного глубже, с кодом lru кэша на java.


Двусвязный список с дополнительным счетчиком обращений для хранения LRU:
//двухсвязный список
class Node<Value> {
 //ключ списка
 int key;
 
 //абстрактное значение
 Value value;
 
 //счетчик обращений к элементу
 int cnt;
 
 //указатель на предыдущий элемент
 Node prev;
 
 //указатель на следующий элемент
 Node next;
 
 //элемент был перемещен из конца списка в начало
 boolean swaped;
 
 public Node(int key, Value value){
  this.key = key;
  this.value = value;
  this.cnt = 1;
  this.swaped = false;
 }

При обращении к элементу списка увеличиваем счетчик обращений "cnt":
 //получить значение из списка
 public Value getValue() {
  //увеличиваем счетчик обращений
  cnt++;
  //сбрасываем признак смещений, если элемент был прочитан
  this.swaped = false;
  
  return value;
 }
 
 //установить значение
 public void setValue(Value val) {
  this.value = val;
  //также увеличиваем счетчик
  cnt++;
  //сбрасываем признак смещений, если элемент был перезаписан
  this.swaped = false;
 } //setValue
} //Node

Класс Lru дополняется хэш массивом "map" в котором для ускорения доступа количество хэш секций (capacity) >= числу элемнтов в списке;
И стандартные указатель на начало "head" и конец "end" списка.
Дополнительное изменение в Oracle - указатель на середину списка "cold", где начинаются холодные данные.
public class Lru<Value> {
 
 //доступной число элементов в кэше
 int capacity;
 
 //хэш массив элементов для быстрого доступа
 HashMap<Integer, Node> map;
 
 //указатель на начало (горячие элементы)
 Node head = null;
 
 //указатель на середину (начало холодных элементов)
 Node cold = null;
 
 //указатель на конец (самый редкоиспользуемый)
 Node end = null;
 
 //число элементов в кэше
 int cnt;
 
 
 //конкструктор с числом элементов в кэше
 public Lru (int capacity) {
  this.capacity = capacity;
  
  //хэш массив создаем с нужным числом секций = загруженности
  map = new HashMap<Integer, Node>(capacity);
 }

Для получения элемента используется хэш массив, без последовательного просмотра всего списка:
 //получить элемент из кэша
 public Value get(int key) {
  //быстрое извлечение их хэш массива
  if(map.containsKey(key)) {
   //и инкремент счетчика обращений
   return (Value) map.get(key).getValue();
  }
  
  return null;
 } //get

Если элементов в списке меньше размер буфера, то используется обычный алгоритм LRU - новые элементы добавляются в начало списка, а старые вытесняются вправо, пока не дойдут до конца списка.
 //места достаточно, добавляем вначало
 protected void addHead(Node n) {
  
  //первый элемент
  if(this.head == null) {
   //устанавливаем начало и конец = элементу
   this.head = n;
   this.end = n;
  } else {
   
   //вставляем вначало
   
   //следующий для нового элемента = начало списка
   n.next = this.head;
   
   //предыдущий для начала списка = новый элемент
   this.head.prev = n;     
   this.head = n;
   
   //второй элемент
   if(this.end.prev == null) {
    //предыдущий для конца = новый элемент
    this.end.prev = n;
   }
  }    
  
  //устанавливаем середину
  if(cnt == capacity / 2) {
   this.cold = n;
  }
  
  //счетчик элементов + 1
  cnt++;
 } //addHead

У этого подхода есть существенный минус: редкоиспользуемый элемент может случайно вытеснить из списка популярный блок, который не вызывался небольшой период времени, за который он успел сместиться до конца.
Для решения этой проблемы Oracle применяет 2 подхода:
1. Если в конце элемент со счетчиком обращений = 1, то он открепляется от списка, а новый блок помещается в среднюю точку cold.
Для этого ссылки соседей открепляются друг от друга и перенацеливаются на новый блок, который становится новой серединой.
 //удаляем конца списка
 private void delEnd() {
  //из хэш массива
  map.remove(this.end.key);
  
  //и делаем концом списка = предыдущий элемент
  this.end = this.end.prev;
  this.end.next = null;
 } //delEnd
 
 //вконце малопопулярный блок
 protected void addColdUnPop(Node n) {
  //удаляем конец
  delEnd();
  
  //у старой середины изменяем счетчик на 1
  if(this.cold.swaped) {
   //если смещенный элемент не был ни разу считан 
   //и дошел до середины, 
   //то сбрасываем счетчик в 1
   this.cold.cnt = 1;
   this.cold.swaped = false;
  }
  
  //новый блок в середину = cold
  
  //проставляем ссылки у нового элемента
  n.prev = this.cold.prev;
  n.next = this.cold;
  
  //и разрываем связи и соседей
  n.prev.next = n;
  n.next.prev = n;
  this.cold = n;
 } //addColdUnPop

2. Если в конце популярный элемент со счетчиком > 1 , тогда последний блок открепляется, счетчик обращений делится на 2 и этот блок перемещается в начало списка.
Такой элемент помечается флагом swaped = true, который сбрасывается при любом последующем обращении к элементу.
Это предотвращает случайное вытеснение популярного блока из памяти.
Середина списка также смещается влево, одновременно делая число обращений = 1 у новой элемента-середины, если к ней никто не обращался за время смещения от начала до середины (swaped = true).
Новый блок помещается в центр списка cold.
 //вконце популярный блок
 protected void addColdPop(int key, Value value) {
  //делим счетчик на пополам
  this.end.cnt = this.end.cnt / 2;  
  
  //открепляем конец
  Node n = this.end;
  
  //удаляем конец
  delEnd();
  
  //конец перемещаем в начало
  n.prev = null;
  n.next = this.head;
  this.head.prev = n;     
  this.head = n;
  
  //помечаем, что элемент был перемещен из конца в начало
  this.head.swaped = true;
  
  //смещаем середину на 1 влево
  if(this.cold.swaped) {
   //если смещенный элемент не был ни разу считан 
   //и дошел до середины, 
   //то сбрасываем счетчик в 1
   this.cold.cnt = 1;
   this.cold.swaped = false;
  }
  this.cold = this.cold.prev;
  
  //рекурсивно пытаемся вставить вконец
  //TODO: если все популярные? то вставка будет идти очень долго
  this.set(key, value);
 } //addColdPop


События ожиданий связанные с наполнением буферного кэша:
* Buffer busy waits / read by other session - сессия пытается считать блок, который сейчас читается в кэш или модифицируется в кэше другой сессией.

Полный исходный код можно посмотреть тут: https://github.com/pihel/java/blob/master/cache/lru.java

среда, 14 декабря 2016 г.

Oracle: вероятностный Bloom filter

Список статей о внутреннем устройстве Oracle:

Более редкая разновидность хэширования данных и вероятностной фильтрации данных - bloom filter
Раньше я уже описывал bloom filter в параллельных запросах со стороны разработчика бд: Oracle: оптимизация параллельных запросов, сегодня копнем немного глубже, с кодом bloom filter на java.


Bloom filter — это вероятностная структура данных, позволяющая компактно хранить множество элементов и проверять принадлежность заданного элемента к множеству. При этом существует возможность получить ложноположительное срабатывание (элемента в множестве нет, но структура данных сообщает, что он есть), но не ложноотрицательное.
Фильтр Блума может использовать любой объём памяти, заранее заданный пользователем, причём чем он больше, тем меньше вероятность ложного срабатывания.

Пример фильтра Блума с 18 бит в карте и 3 функциям хэширования, хранящего множество {x, y, z}. Цветные стрелки указывают на места в битовом массиве, соответствующие каждому элементу множества. Этот фильтр Блума определит, что элемент w не входит в множество, так как один из соответствующих ему битов равен нулю.

Основное предназначение - предфильтрация данных в источнике до передачи их в базу данных.
Принцип применения блум фильтра при join:
1. Левая таблица читается с диска и фильтруется (plan line = 3)
2. На основе левой таблицы создается битовая карта блум фильтра (:BF0000 plan line = 2)
3. Блум фильтр передается в источник правой таблицы (:BF0000 plan line = 4)
4. Строки правой таблицы фильтруются через блум фильтр (:BF0000 plan line = 5 - storage(SYS_OP_BLOOM_FILTER(:BF0000,"R"."L_ID")))
5. В бд передается уже частично отфильтрованная правая таблица
6. Выполняется join (plan line = 1)
create table l as select level as id, 'name_' || level as title, rpad('*', level) as pad from dual connect by level <= 50;
create table r as select rownum as id, mod(rownum, 50) as l_id, rpad('*', 20) as pad 
from (select * from dual connect by level <= 1000 ) join (select * from dual connect by level <= 1000 ) on 1=1;

begin
DBMS_STATS.GATHER_TABLE_STATS(USER, 'L');
DBMS_STATS.GATHER_TABLE_STATS(USER, 'R');
end;

explain plan for
select * 
from l
join r on l.id = r.l_id
WHERE l.title = 'name_5';

select * from table(dbms_xplan.display(format=>'ALLSTATS ALL ADVANCED'));

Plan hash value: 3967001914
 
----------------------------------------------------------------------------------------
| Id  | Operation                   | Name    | E-Rows |E-Bytes| Cost (%CPU)| E-Time   |
----------------------------------------------------------------------------------------
|   0 | SELECT STATEMENT            |         |  20000 |  1308K|   184  (20)| 00:00:01 |
|*  1 |  HASH JOIN                  |         |  20000 |  1308K|   184  (20)| 00:00:01 |
|   2 |   JOIN FILTER CREATE        | :BF0000 |      1 |    38 |     2   (0)| 00:00:01 |
|*  3 |    TABLE ACCESS STORAGE FULL| L       |      1 |    38 |     2   (0)| 00:00:01 |
|   4 |   JOIN FILTER USE           | :BF0000 |   1000K|    27M|   171  (15)| 00:00:01 |
|*  5 |    TABLE ACCESS STORAGE FULL| R       |   1000K|    27M|   171  (15)| 00:00:01 |
----------------------------------------------------------------------------------------
 
Query Block Name / Object Alias (identified by operation id):
-------------------------------------------------------------
 
   1 - SEL$58A6D7F6
   3 - SEL$58A6D7F6 / L@SEL$1
   5 - SEL$58A6D7F6 / R@SEL$1
 
Predicate Information (identified by operation id):
---------------------------------------------------
 
   1 - access("L"."ID"="R"."L_ID")
   3 - storage("L"."TITLE"='name_5')
       filter("L"."TITLE"='name_5')
   5 - storage(SYS_OP_BLOOM_FILTER(:BF0000,"R"."L_ID"))
       filter(SYS_OP_BLOOM_FILTER(:BF0000,"R"."L_ID"))

Детальное описание с кодом на java:
1. Блум фильтр состоит из массива бит произвольной длинны. В моем случае под битовый массив выделена переменная long в 64 бита:
package Hash;

import java.util.concurrent.ThreadLocalRandom;

public class BloomFilter {

  //long переменная в 64бита под битовый массив
  private long data;
  
  //битов в битовой карте = числу битов в long
  private int bit_array_size = Long.SIZE;

2. Определяется hash_num функций хэширования. На входе функции произвольная строка, на выходе номер бита в битовой карте для установки в 1.
  //примесь для случайного хэширования
  private int seed = ThreadLocalRandom.current().nextInt(1, bit_array_size);
  
  //хэшировани = номер бита в битовом массиве
  public long hashCode(String s, int hash_num) {
    long result = 1;
    
    //для каждого байта в строке
    for (int i = 0; i < s.length(); ++i) {
      //применяем хэш функцию под номером hash_num и обрезаем по маске
     
     //простая хэш функция = ascii значение буквы * примесь * номер функции * хэш от предыдущей функции & обрезка по маске
      //1 = (1 * 1 + 58)
      //1 = ( 0001 * 0001 + 11 0001 ) & 1111 1111 1111 1111 
      result = ((hash_num + seed) * result + s.charAt(i)) & this.hashMask;
    }

3. Устанавливаем биты в битовой карте для hash_nums хэш функций
  //установить index бит в битовой карте
  public void setBit(long index) {
   //= битовая карта OR 1 смещенное влево на index
   this.data = this.data | (1L << index );
  } //setbit
  
  //добавить элемент в блум фильтр
  public void add(String s) {
    //++ счетчик элементов
    cnt++;
    //для каждой хэш функции
    for(int i = 1; i <= hash_nums; i++) {
      //расчитаем номер индекса в битовой карте и установим его
      long index = hashCode(s, i);
      setBit(index);
    }
  } //add

4. Обратная операция - тестирование строки на вероятное наличие элемента в блум фильтре: хэшируем и проверяем бит в блум фильтре
  //получить значение бита на index месте
  public long getBit(long index) {
   //=битовая карта смещенная вправо на index мест (>>> пустые места справа заполняются 0)
   // & 01 - проверка только крайнего правого бита (все остальные игнорируются)
   return ( this.data >>> index ) & 1;
  } //getBit
  
  //проверка наличия элемента в блум фильтре
  public boolean test(String s) {
 //для каждой хэш функции
    for(int i = 1; i <= hash_nums; i++) {
      //определяем номер бита в битовой карте
      long index = hashCode(s, i);
      
      //если хотябы одна проверка не прошла - элемента нет
      if( getBit(index) == 0L ) return false;
    }
    
    //иначе элемент вероятно есть
    return true;
  } //test

Очевидно, чем больше битовая карта в блум фильтре, тем точней результат - меньше ложных срабатываний.
Зависимость определяется формулой:

, где n — предполагаемое количество элементов хранящихся в фильтре-множестве, p — вероятность ложного срабатывания, m - число бит в карте

  //вероятность ложного срабатывания
  public double getFalsePossb() {
   if(cnt == 0) return 0;
   return 1 / Math.pow(Math.E, bit_array_size * Math.log(2) * Math.log(2) / cnt );
  } //getFalsePossb

Вероятность ложного срабатывания от числа бит в карте и кол-ва элементов:


Также использование нескольких функции хэширования дает преимущество при достаточном размере блум фильтра:
  //оптимальное число функций хэширования
  public int getOptimalFncCnt() {
   if(cnt == 0) return 1;
   return (int)Math.ceil( bit_array_size / cnt * Math.log(2) );
  } //getOptimalFncCnt

График вероятности ложного срабатывания от размера массива и числа функций хэширования:

Практическое применение Bloom фильтра в Oracle:
* Partition pruning - усечение просматриваемых партиций
 На основе фильтра левой таблицы выбираются секции для сканирования из правой, что позволит не сканировать таблицу целиком.
* RAC - уменьшение сетевого трафика между кластерными нодами
 Для уменьшения сетевого взаимодействия между кластерными нодами, если левую таблицу читает 1 нода, а правую нода 2.
 Применение блум фильтра уменьшает размер пересылаемых данных правой таблицы из ноды 2 в ноду 1 до выполнения соединения.
* Exadata или Map reduce движок в качестве источника данных
 Массовая параллельная обработка внутри exadata проходит быстрей, чем аналогичная операция внутри бд Oracle.
 Из-за этого при наличии exadata Oracle стремится произвести максимальное число фильтрации в источнике до передачи в базу.
* In Memory хранилище данных
 Аналогично exadata, inmemory движок может производить фильтрацию внутри себя и это будет быстрей, чем выполнять join на полном наборе данных.
 При наличии таких данных Oracle стремится произвести максимум фильтрации до передачи данных в бд.
* Parallel - параллельные запросы
 Взаимодействие параллеьлных поток происходит через координатор и если 1 поток читает левую таблицу, а 2 поток - правую, то выгодней и быстрей отфильтровать правую таблицу блум фильтром и передать в первый поток уже отфильтрованные данные.
* Гетерогенные запросы через дблинк (возможно)
 Сетевое взаимодействие очень дорогое по сравнению с работой с памятью и даже диском. Из-за этого было бы выгодно отфильтровать правую таблицу на удаленной бд до ее передачи в нашу.

Полную реализацию блум фильтра можно видеть здесь: https://github.com/pihel/java/blob/master/Hash/BloomFilter.java