понедельник, 4 февраля 2030 г.

Скахин Алексей / pihel

Скахин Алексей, pihel Биография:
Родился в городе Вологда 9 марта 1987 года. В 2004 году завершил обучение в средней школе №12 города Вологда. В 2009 году окончил ВоГУ (бывший ВоГТУ) по специальности программное обеспечение. Тема дипломного проекта: "Синтез виртуальной среды с применением скалярных и аналитических функций возмущения и трехмерных массивов вокселей". С 2013 года проживаю в городе Санкт-Петербург.

В бывшем радиолюбитель: 3ий взрослый разряд по радиотелеграфии, радиолюбитель в кв диапазоне. Позывной ra1qkj.

О себе:
В свободное от работы время люблю играть в волейбол, кататься на горных лыжах и велосипеде, путешествовать на машине.

Места работы:
6. Лента: perfomance specialist: Sap, Oracle ( sql, pl/sql, abap ), 2016 - ...
5. Сигма: разработчик баз данных Oracle ( sql, pl/sql, Oracle BI, dwh ), 2015 - 2016
4. Tops Consulting: разработчик баз данных Oracle ( sql, pl/sql ), 2013 - 2015
3. Макси: разработка системы управления предприятием, программист (C++, Oracle, Pl/Sql) 2010-2013
2. Rstyle Softlab ОПР ДСУП: разработка системы управления предприятием, старший программист (rsl/vbs/fast report/ms sql) 2008-2010
1. ВНКЦ ЦЭМИ РАН: разработка внутренней информационной системы (php/js/mysql) 2007-2008 г.
0. Фриланс - web направление 2003-...

Навыки:
1. PHP: занимаюсь с 2004 года
 а. Bitrix - интеграция, интернет магазины, информационные порталы, государственные порталы)
 b. Drupal - интеграция, разработка модулей
2. JS: JQuery, ExtJS
3. C++: библиотека QT, C++ Builder
4. VBS: автоматизация рутины (авто сборка ПО, создание копий приложений и т.д.)
5. СУБД: Oracle PlSql, MySql, MS SQL (OLTP и OLAP), SqLite
6. Linux: пользователь Fedora и Ubuntu
7. Управление версиями: SVN, GIT, MS VSS
8. Остальное: FastReport, Excel (ActiveX, XML), Open(Libre)Office (XML, UNO), SVG/VML (raphaeljs, extjs), Flex, ITIL

Мои любимые статьи:
Внутреннее устройство Oracle: Оптимизация запрсов Oracle: Другое:

четверг, 28 июня 2018 г.

Введение в нейронные сети

Эта статья-заметка освещает основные понятия нейронных сетей и как их строить применяя python.

четверг, 24 мая 2018 г.

LSM дерево: быстрый доступ по ключу в условиях интенсивной вставки

В условиях интенсивной вставки для быстрого доступа к данным обычные btree индексы не подходят.
Во многих базах (Bigtable, HBase, LevelDB, MongoDB, SQLite4, Tarantool, RocksDB, WiredTiger, Apache Cassandra, и InfluxDB ) используется LSM-дерево (Log-structured merge-tree — журнально-структурированное дерево со слиянием)
LSM дерево служит для хранения ключ-значения.
В данной статье рассмотрим двухуровневое дерево. Первый уровень целиком находится в памяти, вторая половина на диске.
Вставка идет всегда в первую часть в памяти, а поиск из всех частей. Когда размер данных в памяти превышает определенный порог, то она записывается на диск. После чего блоки на диске объединяются в один.
Рассмотрим алгоритм по частям:

1. Все записи в дереве маркируются порядковым номером glsn , при каждой вставке этот счетчик увеличивается

Структура для хранения ключа-значения и порядкового номера:
//запись с ключ-значение
class SSItem {
 Integer key;
 String value;
 //+ идентификатор последовательности вставки
 Integer lsn;
 
 SSItem(Integer key, String value, Integer lsn) {
  this.key = key;
  this.value = value;
  this.lsn = lsn;
 }

2. Структуры объядиняются в одну таблицу в памяти.
В моем алгоритме используется хэш таблица.
В реальности чаще всего используется отсортированная по ключу структура - это дает преимущества, что для поиска и слияния используется последовательное чтение, вместо рандомного.
class MemSSTable { 
 
 //из-за хэша нет поиска по диапазону
 HashMap<Integer, SSItem> itms = new HashMap<Integer, SSItem>();
 

3. Т.к. размер памяти ограничен, то при превышении определенного порога, данные должны быть скинуты на диск.
В моей реализации - это простой вариант без фоновых заданий и асинхронных вызовов
При достижении лимита будет фриз, т.к. таблица скидывается на диск.
В реальных системах используется упреждающая запись: данные на диск скидываются асинхронно и заранее, до достижения жесткого лимита памяти.
 void SaveToDisk() throws FileNotFoundException, UnsupportedEncodingException {
  indx.path = "sstable_" + indx.max_lsn + ".dat";
  PrintWriter writer = new PrintWriter(indx.path, "UTF-8");
  Integer pad = 0;
  //последовательно пишем 10 байт с длиной значения и само значение
  for(Entry<Integer, SSItem> entry : itms.entrySet()) {
   SSItem itm = entry.getValue();
   String val = itm.get();
   writer.print( val );
   //регистрируем в индексе смещения в файле
   indx.keys.put(itm.key, pad);
   pad = pad + val.length();
  }
  writer.close();
  
  LSMTree.indexes.add(indx);
 } //SaveToDisk

4. Получается, что половина всех данных у нас хранится в хэше в памяти, а остальное в виде файлов на диске.
Чтобы не перебирать все файлы на диске создается дополнительная индексная структура, которая будет хранить метаинформацию о файлах в памяти:
//индекс над таблицей с даными
class SSTableIndex {
 //метаданные:
 //минимальный и максимальный ключи в таблице
 Integer min_key;
 Integer max_key;
 //минимальный и максимальный порядковый lsn
 Integer min_lsn;
 Integer max_lsn;
 
 //если таблица на диске, то путь до файла
 String path;
 
 //ключ - смещение ключа в файле
 HashMap<Integer, Integer> keys = new HashMap<Integer, Integer>();
 
 //добавить ключ в индекс
 void add(Integer k) {
  //также обновляем метаданные
  max_lsn = LSMTree.glsn;
  if(min_lsn == null) min_lsn = max_lsn;
  if(min_key == null || k < min_key) min_key = k;
  if(max_key == null || k > max_key) max_key = k;
  //добавление идет в память в хэш таблицу, на первом этапе смещения в файле нет
  keys.put(k, 0);
 }

5. Управляющий объект всего дерева хранит ссылку на таблицу с данными в памяти и на мета-индексы данных на диске:
public class LSMTree {
 
 static int glsn = 0;
 //максимальный размер, после которого таблица должна быть скинута на диск
 //для упрощения алгоритма = числу записей, а не размеру в байтах
 final static int max_sstable_size = 10; 
 
 //текущая таблица в памяти, куда вставляем данные
 MemSSTable MemTable;
 
 //все индексы, даже для таблиц на диске, хранятся в памяти
 static LinkedList<SSTableIndex> indexes = new LinkedList<SSTableIndex>();

6. Когда все объекты описаны, давайте посмотрим как происходит добавление нового элемента.
Добавление всегда идет в таблицу в памяти, что гарантирует быструю вставку:
public class LSMTree {
 //....
 
 //добавить запись
 public void add(Integer k, String v) {
  MemTable.add(k, v);
 }
Подробней логика добавления в таблицу в памяти:
* Увеличиваем глобальный счетчик элементов LSMTree.glsn
* Если число элментов превысило порог, то таблица скидывается на диск и очищается
В реальности это конечно не число элементов, а объем в байтах.
* Создается новый элемент SSItem
* И доблавляется в хэш массив
Причем, если элемент до этого уже существовал, то он перезаписывается.
В реальности хранятся все записи, чтобы была возможность поддержки транзакционности.
* Кроме этого в индексе обновляются метаданные indx.add
class MemSSTable { 
 //.....
 
 //добавить новый элемент
 void add(Integer k, String v) {
  //увеличиваем глобальный счетчик операций glsn
  LSMTree.glsn++;
  
  //если размер превышает, 
  if(itms.size() >= LSMTree.max_sstable_size) {
   try {
    //то сохраняем таблицу на диск
    //в реальных движках используется упреждающая фоновая запись
    //когда папять заполнена на N% (n<100), то данные скидываются на диск заранее, чтобы избежать фриза при сбросе памяти и записи на диск
    SaveToDisk();
   } catch (FileNotFoundException | UnsupportedEncodingException e) {
    e.printStackTrace();
    return;
   }
   //очищаем данные под новые значения
   indx = new SSTableIndex();
   itms = new HashMap<Integer, SSItem>();
  }
  
  //обновляем медаданные в индексе
  indx.add(k);
  
  SSItem itm = new SSItem(k, v, indx.max_lsn);
  
  //в моей реализации, при повторе ключ перезаписывается
  //т.е. транзакционность и многоверсионность тут не поддерживается
  itms.put(k,  itm);
  
 } //add
Если после вставки нового элемента старые данные оказались на диске, то в метаданных индекса прописывается ссылка на точное смещение в файле:
class SSTableIndex {
 //...
 
 //если таблица на диске, то путь до файла
 String path;
 
 //ключ - смещение
 HashMap<Integer, Integer> keys = new HashMap<Integer, Integer>();

7. С вставкой разобрались, теперь обратная операция - поиск элемента в дереве:
* Сперва мы проверяем наличие ключа в хэш таблице в памяти. Если элемент нашелся, то на этом все.
* Если элемента нет, то мы пробегамся по всем метаиндексам в поиске нашего ключа.
* проверяем, что ключ входит в диапазон indx.min_key - indx.max_key индекса
* И для полной точности проверяем наличие ключа в хэше ключей indx.keys.containsKey
* Если элемент нашелся, то это еще не конец, цикл продолжается, пока мы не переберем все индексы.
Т.к. ключ может добавляться несколько раз в разные промежутки времени.
* Из всех совпадений выбираем индекс с максимальным счетчиком вставки - это последнее изменение ключа
public class LSMTree {
 //.....
 
 //получить значение по ключу
 String getKey(Integer key) {
  //сперва смотрим в памяти
  String val = MemTable.getKey(key);
  
  if(val == null) {
   SSTableIndex indx_with_max_lsn = null;
   //потом таблицу по индексам, которая содержит наш ключ
   //если содержится в нескольких, то берем с максимальным lsn, т.к. это последнее изменение
   for (SSTableIndex indx : indexes) {
    Integer max_lsn = 0;
    if(key >= indx.min_key && key <= indx.max_key && max_lsn < indx.max_lsn ) {
     if(indx.keys.containsKey(key)) {
      max_lsn = indx.max_lsn;
      indx_with_max_lsn = indx;
     }
    }
   }
   //читаем из таблицы с диска
   if(indx_with_max_lsn != null) {
    try {
     return indx_with_max_lsn.getByPath(key);
    } catch (IOException e) {
     e.printStackTrace();
    }
   }
  }
  
  return val;
 }
* Определим нужный индекс обращаемся по нему к таблице на диске:
** Открываем нужный файл
В реальности, скорей всего, файы постоянно держатся открытыми для экономии времени.
** Следуя смещенями из индекса переходим к нужной точке файла file.seek
** И считываем значение file.read
class SSTableIndex {
 //...
 
 //получить значение ключа из открытого файла
 String getByPath(Integer key, RandomAccessFile file) throws IOException {
  //получаем смещение в файле для ключа из индекса
  Integer offset = keys.get(key);
  
  //смещаемся в файле
  file.seek(offset);
  
  //резервируем 10 байт под переменную с длинной значения
  byte[] lenb = new byte[10];
  file.read(lenb, 0, 10);
  
  Integer len = Integer.parseInt(new String(lenb, StandardCharsets.UTF_8));
 
  file.seek(offset + 10);
  
  //считываем значение
  byte[] valb = new byte[len];
  file.read(valb, 0, len);
  
  return new String(valb, StandardCharsets.UTF_8);
 } //getByPath
 
 //получить значение ключа с диска
 String getByPath(Integer key) throws IOException {
  if(!keys.containsKey(key)) return null;
  
  RandomAccessFile file = new RandomAccessFile(path, "r");
  
  String val = getByPath(key, file);
  
  file.close();

  return val;
 }

8. Т.к. ключ может вставляться неограниченное число раз, то со временем он может находится в нескольких файлах на диске одновременно.
Также, если бы LSM дерево поддерживало транзакционность, то в файлах также хранились бы разные версии одного ключа (изменения или удаления).
Для оптимизации последующего поиска применяется операция слияния нескольких файлов в один:
* Сортируем индексы по убыванию lsn
* Последовательно считываем элементы из первого индекса и записываем в новый
* Если элемент уже присутствует в объединенном индексе, то он пропускается
* Создается новый единственный файл и индекс со всеми ключами и значениями
* Старые файлы и индексы удаляются
public class LSMTree {
 //....
 
 //объединить несколько таблиц на диске в 1 большую
 void merge() throws IOException {
  Integer min_key = null;
  Integer max_key = null;
  Integer min_lsn = null;
  Integer max_lsn = null;
  
  //сортируем таблицы по убыванию lsn
  //чтобы вначале были самые свежие ключи
  Collections.sort(indexes, new Comparator<SSTableIndex>() {
   @Override
   public int compare(SSTableIndex o1, SSTableIndex o2) {
    if(o1.max_lsn > o2.max_lsn) {
     return -1;
    } else if(o1.max_lsn < o2.max_lsn) {
     return 1;
    }
    return 0;
   }
  });
  
  SSTableIndex merge_indx = new SSTableIndex();
  
  Integer pad = 0;
  merge_indx.path = "sstable_merge.dat";
  PrintWriter writer = new PrintWriter(merge_indx.path, "UTF-8");
  
  //пробегаемся по всем индексам, чтобы слить в 1
  for (SSTableIndex indx : indexes) {
   if(min_lsn == null || indx.min_lsn < min_lsn) min_lsn = indx.min_lsn;
   if(min_key == null || indx.min_key < min_key) min_key = indx.min_key;
   if(max_key == null || indx.max_key > max_key) max_key = indx.max_key;
   if(max_lsn == null || indx.max_lsn > max_lsn) max_lsn = indx.max_lsn;
   
   RandomAccessFile file = new RandomAccessFile(indx.path, "r");
   
   //т.к. данные в таблицах не упорядочены, это приводит к рандомным чтениям с диска
   //в реальности делают упорядочнный по ключу массив, чтобы делать быстрые последовательные чтения
   for(Entry<Integer, Integer> entry : indx.keys.entrySet()) {
    //оставляем запись только с максимальным lsn
    Integer key = entry.getKey();
    if(!merge_indx.keys.containsKey(key)) {
     String val = indx.getByPath(key, file);
     
     SSItem itm = new SSItem(key, val, 0);
     String itmval = itm.get();
     
     writer.print( itmval );
     merge_indx.keys.put(key, pad);
     pad = pad + itmval.length();     
    }
   }
   //записываем и удаляем старые файлы
   file.close();
   //delete
   File fd = new File(indx.path);
   fd.delete();
  }  
  
  merge_indx.min_lsn = min_lsn;
  merge_indx.min_key = min_key;
  merge_indx.max_key = max_key;
  merge_indx.max_lsn = max_lsn;
  
  writer.close();
  
  //переименовываем к обычному имени
  File fd = new File(merge_indx.path);
  merge_indx.path = "sstable_" + merge_indx.max_lsn + ".dat";
  File fdn = new File(merge_indx.path);
  fd.renameTo(fdn);
  
  indexes = new LinkedList<SSTableIndex>();
  indexes.add(merge_indx);
  
 } //merge
В моей реализации используется хэш массив, что дает большое число случайных чтений с диска.
В реальности данные хранят в отсотированной структуре, что позволят выполнять слияние 2 отсортированных массивов в один за линейное время используя только быстрое последовательное чтение.


Полный код класса LSM дерева можно посмотреть на github

пятница, 5 января 2018 г.

Hive: авторизация и аутентификация

Сборка Cloudera Hadoop не содержит никаких настроек авторизации и аутентификации, что конечно плохо, т.к. не дает возможности разделить права среди пользователей.
Опишу один из вариантов настройки авторизации и аутентификации.

Custom аутентификация:

1. Создаем linux пользователя на сервере hadoop:
sudo useradd user1
#создаем домашнюю директорию в hdfs
sudo -u hdfs hadoop fs -mkdir /user/user1
#задаем пользователя владельцем
sudo -u hdfs hadoop fs -chown user1 /user/user1
sudo passwd user1
#лочим пользователя в linux
sudo passwd -l user1

2. Создаем пользователя в hive
sudo -u hive hive
CREATE SCHEMA user1 LOCATION "/user/user1";

3. Создаем java класс, через который будет происходить проверка пользователя:
package hive.lenta;

import java.util.Hashtable;
import javax.security.sasl.AuthenticationException;
import org.apache.hive.service.auth.PasswdAuthenticationProvider;
import java.io.*;


public class LentaAuthenticator implements PasswdAuthenticationProvider {

  Hashtable<String, String> store = null;

  public LentaAuthenticator () {
    store = new Hashtable<String, String>();
 
 try {
  readPwd();  
 } catch (IOException e) {
  e.printStackTrace();
 }
 
  } //LentaAuthenticator
  
  public void readPwd () throws IOException {
 BufferedReader br = new BufferedReader(new FileReader("/u01/jar/pwd.ini"));
 String line;
 while ((line = br.readLine()) != null) {
  String login[] = line.split(";");
  if(login.length == 2) {
   store.put(login[0].trim(), login[1].trim());
  }
 }
 br.close();

  } //readPwd

  @Override
  public void Authenticate(String user, String  password)
      throws AuthenticationException {

    String storedPasswd = store.get(user);

    if (storedPasswd != null && storedPasswd.equals(password))
      return;
     
    throw new AuthenticationException("LentaAuthenticator: Error validating user");
  }

}
В файле /u01/jar/pwd.ini лежит логин и пароль, разделенные точкой с запятой:
user1;passwd

4. Компилируем класс в объект и собираем jar файл:
компилировать нужно именно той java под которой работает hadoop:
/usr/java/jdk1.7.0_67-cloudera/bin/javac -classpath /u01/cloudera/parcels/CDH-5.11.1-1.cdh5.11.1.p0.4/lib/hive/lib/hive-service.jar -d /home/askahin/auth /home/askahin/auth/LentaAuthenticator.java
/usr/java/jdk1.7.0_67-cloudera/bin/jar -cf /home/askahin/auth/jar/lentauth.jar ./hive
sudo cp jar/lentauth.jar /u01/cloudera/parcels/CDH-5.11.1-1.cdh5.11.1.p0.4/lib/hive/lib/
sudo chmod 777 /u01/cloudera/parcels/CDH-5.11.1-1.cdh5.11.1.p0.4/lib/hive/lib/lentauth.jar
sudo cp jar/lentauth.jar /u01/jar/lentauth.jar
sudo chmod 777 /u01/jar/lentauth.jar

5. Включаем CUSTOM аутентификацию в настройках Cloudera Manager:
прописываем наш аутентификатор в настройках:
* Ищем все настройки по паттерну "hive-site.xml" в настройках Cloudera.
Делать нужно именно через cloudera, а не напрямую в файле "hive-site.xml", т.к. менеджер переписывает файл из памяти процесса.

Нажимаем view as xml и во всех окнах и вводим
<property>
    <name>hive.server2.authentication</name>
    <value>CUSTOM</value>
  </property>
  <property>
    <name>hive.server2.custom.authentication.class</name>
    <value>hive.lenta.LentaAuthenticator</value>
</property>

Также ставим флажки suppress parameter validation
Иначе настройки не попадут в итоговый XML файл
* Для работы также нужна включенная опция hive.server2.enable.doAs = true
Она означает, что все запросы будут идти из под пользователя, который прошел аутентификацию.

5. Задаем путь до кастомной папки с jar файлами:
Настройка: «HIVE_AUX_JARS_PATH»
Значение «/u01/jar»,
Также ставим suppress parameter validation


6. Все готово, перезапускам HIVE из консоли Cloudera

7. Теперь при аутентификации нужно указывать способ:
AuthMech=3


После того как все настроили, мы уверены, что пользователь прошел аутентификацию.
Войти без пароля больше не получится.

Дальше настраиваем авторизацию - выдачу прав пользователям на конкретные объекты.
Проще всего это сделать через расширенные права linux:


ACL авторизация


1. Включаем ACL права через Cloudera Manager
настройка "dfs.namenode.acls.enabled" = true


2. После этого можно распределять права на папки:
Сброс всех прав:
sudo -u hdfs hdfs dfs -setfacl -R -b /

Даем пользователю user1 права на создание таблиц в своей схеме
sudo -u hdfs hdfs dfs -setfacl -R --set user::rwx,user:hadoop:rwx,user:hive:rwx,user:user1:rwx,group::r-x,other::--- /user/user1
к остальному хранилищу hive даем права только на чтение:
sudo -u hdfs hdfs dfs -setfacl -R --set user::rwx,user:hadoop:rwx,user:hive:rwx,user:user1:r-x,group::r-x,other::--- /user/hive/warehouse
к какой-то из таблиц закрываем доступ:
sudo -u hdfs hdfs dfs -setfacl -R --set user::rwx,user:hadoop:rwx,user:hive:rwx,user:user1:---,group::r-x,other::--- /user/hive/warehouse/table1
Один минус у ACL - это остается возможность выполнить DROP table, даже если нет прав доступа на чтение или запись.
Как победить эту проблему я не понял.

Так же стоит не забывать, что таким образом мы защитили только HIVE, возможность читать данные через HDFS по прежнему остались.
(Как читать данные из HDFS и HIVE через java можно посмотреть в моем github: https://github.com/pihel/java/blob/master/bigdata/Hdfs2Hive.java )

Чтобы закрыть доступ чтения через HDFS мы используем закрытие всех портов на сервере, кроме 10000 , который используется службой HIVE.

суббота, 19 августа 2017 г.

Мониторинг нагрузки Sap Hana и Mssql

Предоставления для мониторинга нагрузки на субд sap hana и mssql:

SAP HANA

Представлю основные запросы для мониторинга нагрузки на sap hana в разрезе времени (чтото похожее на awr отчеты в oracle):

1. Использование CPU, Озу и физической памяти в разрезе 10 минут:
select concat( SUBSTRING (TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24:mi'),1,15), '0') as hh,
round(100*SUM(TOTAL_CPU_USER_TIME_DELTA) / ( SUM(TOTAL_CPU_IDLE_TIME_DELTA)+SUM(TOTAL_CPU_SYSTEM_TIME_DELTA)+SUM(TOTAL_CPU_USER_TIME_DELTA)+SUM(TOTAL_CPU_WIO_TIME_DELTA) ),2) as cpu_prc,
round(AVG(INSTANCE_TOTAL_MEMORY_USED_SIZE)/1024/1024/1024,2) MEMORY_USED,
round(AVG(INSTANCE_TOTAL_MEMORY_ALLOCATED_SIZE)/1024/1024/1024,2) MEMORY_ALLOCATED,
round(100*SUM(INSTANCE_TOTAL_MEMORY_USED_SIZE) / ( SUM(INSTANCE_TOTAL_MEMORY_ALLOCATED_SIZE) ),2) MEMORY_USED_PRC,
round(AVG(FREE_PHYSICAL_MEMORY)/1024/1024/1024,2) FREE_PHYSICAL_MEMORY,
round(AVG(USED_PHYSICAL_MEMORY)/1024/1024/1024,2) USED_PHYSICAL_MEMORY,
round(100*SUM(USED_PHYSICAL_MEMORY) / ( SUM(FREE_PHYSICAL_MEMORY)+SUM(USED_PHYSICAL_MEMORY) ),2) USED_PHYSICAL_MEMORY_PRC,
round(AVG(FREE_SWAP_SPACE)/1024/1024/1024,2) FREE_SWAP_SPACE,
round(AVG(USED_SWAP_SPACE)/1024/1024/1024,2) USED_SWAP,
round(100*SUM(USED_SWAP_SPACE) / ( SUM(FREE_SWAP_SPACE)+SUM(USED_SWAP_SPACE) ),2) USED_SWAP_PRC
from _SYS_STATISTICS.HOST_RESOURCE_UTILIZATION_STATISTICS
where TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyymmddhh24mi') between '2017081410' and '201708142359'
group by SUBSTRING (TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24:mi'),1,15)
order by SUBSTRING (TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24:mi'),1,15) desc
В графическом виде это будет выглядеть так:


2. Интенсивность чтения и записи:
select concat( SUBSTRING (TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24:mi'),1,15), '0') as hh,
SUM(TOTAL_IO_TIME_DELTA)/1000/1000 as io_sec, SUM(TOTAL_READ_SIZE_DELTA) read_size, SUM(TOTAL_READ_TIME_DELTA)/1000/1000 as read_sec,
SUM(TOTAL_WRITE_SIZE_DELTA) as write_size, SUM(TOTAL_WRITE_TIME_DELTA/1000/1000) as write_sec,
SUM(TOTAL_FAILED_READS_DELTA) as failed_reads, SUM(TOTAL_FAILED_WRITES_DELTA) as failed_writes
from _SYS_STATISTICS.HOST_VOLUME_IO_TOTAL_STATISTICS
where TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyymmddhh24mi') between '2017081410' and '201708142359'
group by SUBSTRING (TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24:mi'),1,15)
order by SUBSTRING (TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24:mi'),1,15) desc
В графическом виде это будет выглядеть так:
Время чтения в 10 минут:

Размер чтений в 10 минут:


3. Топовые sql запросы по времени выполнения:
select * from (
       select v.* , ROW_NUMBER() OVER(pARTITION BY hh ORDER BY DELTA_TIME desc) as rn
       from (
             select TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24') as hh,
             AVG(AVG_EXECUTION_TIME)/1000/1000, SUM(EXECUTION_COUNT),
             AVG(AVG_EXECUTION_TIME)/1000/1000*SUM(EXECUTION_COUNT) as DELTA_TIME,
             STRING_AGG(USER_NAME), TO_VARCHAR(STATEMENT_STRING) as txt, MAX(index) as index
             from _SYS_STATISTICS.HOST_SQL_PLAN_CACHE
             where SERVER_TIMESTAMP between to_date('01.08.2017', 'dd.mm.yyyy') and to_date('02.08.2017', 'dd.mm.yyyy')
             group by TO_NVARCHAR(SERVER_TIMESTAMP, 'yyyy.mm.dd hh24'), TO_VARCHAR(STATEMENT_STRING)
       ) v
)
where rn <= 5
order by hh, rn;
Получится что-то вроде такого топа:


4. Использование сети с момента сброса (system reset):
ALTER SYSTEM RESET MONITORING VIEW SYS. M_SERVICE_NETWORK_IO_RESET;
select SENDER_HOST, RECEIVER_HOST, sum(SEND_SIZE), sum(RECEIVE_SIZE), sum(SEND_DURATION)/1000/1000, sum(RECEIVE_DURATION)/1000/1000, sum(REQUEST_COUNT)
from SYS.M_SERVICE_NETWORK_IO_RESET Group BY SENDER_HOST, RECEIVER_HOST


MSSQL

топ sql запросов в разрезе показателей с последней перезагрузки бд:
SELECT  TOP 20 creation_time 
        ,last_execution_time
        ,total_physical_reads
        ,total_logical_reads 
        ,total_logical_writes
        , execution_count
        , total_worker_time
        , total_elapsed_time
        , total_elapsed_time / execution_count / 1000 avg_elapsed_time_ms
        ,SUBSTRING(st.text, (qs.statement_start_offset/2) + 1,
         ((CASE statement_end_offset
          WHEN -1 THEN DATALENGTH(st.text)
          ELSE qs.statement_end_offset END
            - qs.statement_start_offset)/2) + 1) AS statement_text,
   qp.query_plan,
   st.text
FROM sys.dm_exec_query_stats AS qs
CROSS APPLY sys.dm_exec_sql_text(qs.sql_handle) st
CROSS APPLY sys.dm_exec_query_plan(qs.plan_handle) qp
ORDER BY total_worker_time desc;

вторник, 15 августа 2017 г.

HIVE: Своя быстрая функция замен встроенной

Сегодня расскажу об одном способе ускорения запросов с аналитическими функциями в субд HIVE, работающей поверх Hadoop.

Один из вариантов ускорить HIVEQL запрос - это переписать встроенную аналитическую функцию на свой упрощенный вариант.

К примеру функция ROW_NUMBER(OVER PARTITION BY c1 ORDER BY c2) имеет достаточно сложную реализацию (github) только для того чтобы посчитать номер строки в группе.

Пример запроса с row_number:
create table tmp_table stored as orc as
select v.material, v.client_id, row_number() over (partition by v.client_id order by v.clientsum desc, v.checkcount desc) as rn
from pos_rec_itm_tst v;

Можно реализовать значительно упрощенную версию подсчета номера строки в группе.
На вход функции Rank.evaluate подаем значение группы key (то что было в partition by) и инкрементируем значение счетчика counter.
Если приходит новая группа, то счетчик сбрасывается на 0, а в переменную группы "this.last_key" записывается значение новой группы:
package com.example.hive.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
public final class Rank extends UDF{
    private int  counter;
    private String last_key;
    public int evaluate(final String key){
      if ( !key.equals(this.last_key) ) {
         this.counter = 0;
         this.last_key = key;
      }
      return (++this.counter);
    }
}
Понятно, что для правильной работы этой функции набор данных нужно предварительно отсортировать по группе партицирования "partition by", а потом по остальным полям "order by".

Пример запроса с собственной функцией:
create table tmp_table stored as orc as
select v.material, v.client_id, myrank(v.client_id) as rn from (
  SELECT client_id, clientsum, checkcount, material FROM pos_rec_itm_tst
  DISTRIBUTE BY client_id SORT BY clientsum desc, checkcount desc
) v;
Дополнительно сортировку в HIVE можно ускорить, если распараллелить мапперы по полю партицирования "partition by", а внутри этих групп сортировать по полям из "order by".
За счет параллельности мы опять же ускорим сортировку.


Чтобы создать такую функцию в HIVE нужно скомпилировать ее из java исходников:

$> /путь_до_java_который_используется_в_hive/bin/javac -classpath /путь_до_hive/lib/hive/lib/hive-serde-1.7.jar:/путь_до_hive/lib/hive/lib/hive-exec.jar:/путь_до_hadoop/lib/hadoop/client-0.20/hadoop-core.jar -d /путь_куда_компилим /путь_до_программы.java
$> /путь_до_java_который_используется_в_hive/bin/jar -cf название_jar_программы.jar com/example/hive/udf/название_класса.class

запускаем hive и регистрируем наш jar , как функцию с произвольным названием:
hive> add jar Rank.jar;
hive> create temporary function myrank as 'com.example.hive.udf.Rank';

Такое небольшое изменение ускорит выполнение запроса на 10-20%.

среда, 5 апреля 2017 г.

Oracle: columnar compression в exadata и inmemory

В связи с развитием баз данных в области колоночных inmemory хранилищ, хотел бы осветить развитие опции компрессии в субд Oracle.

Строчные сжатия

Basic compression – работает только при direct path (append) вставке
Представляет из себя дедублицирование данных.
При Update строка становится мигрированной и хранится уже не сжатой.

Oltp compression – работает при любых вставках и требует наличия опции advanced compression.
Представляет из себя дедублицирование данных.
При не «direct path» строка сначала вставляется несжатой, при накоплении % несжатых записей в блоке = PCTFREE, блок сжимается и т.д.:

При Update строка становится мигрированной и хранится сначала не сжатой, но потом при накоплении PCTFREE % несжатых записей, блок аналогично сжимается.


Колоночное сжатие

HCC - hybrid columnar compression
работает только с «Exadata или Oracle ZFS Storage Appliance или either the Pillar Axiom или Oracle FS1 storage array».
Oracle хранит компрессированную колонку в виде связки блоков (как мигрированная строка), что оставляет возможность быстрого доступа к колонке по rowid и создание индексов!.
Также требуется direct path (append) вставка.

Внутреннее устройство:
Строки данных бьются на compression unit (cu) – это достаточно большая сущность (32 КБ) и со стороны Oracle рассматривается как 1 блок.


Детальная структура CU:

CU в заголовке содержит указатели на колонки. А блоки внутри CU указатели на строки.
Т.к. все строки поделены на CU, внутри которых последовательно хранятся колонки, то добавление колонок к запросу не увеличивает число чтений (даже если строки из разных CU, число чтений будет кратно число блоков в CU * кол-во запрашиваемых CU):

Чтение 1 блока дает 417 чтений:
select /*+ MONITOR */ id, num_1000000 from t_qh where id in( 5123456);

И чтение из 2 разных блоков тоже дает 417 чтений, т.к. оба этих блока входят в 1 CU:
select /*+ MONITOR */ id, num_1000000 from t_qh where id in( 5123456, 6114557 );


Поддержка DML:
- Блокировка по умолчанию происходит на уровне CU. Если нужна блокировка на уровне строки, то таблицу нужно создавать с директивой «ROW LEVEL LOCKING», которая расширяет заголовок CU под флаги блокировок всех строк всех блоков.
- При обновлении строки, она мигрирует в другой блок и помечается сжатой как OLTP, в не зависимости от степени сжатия раньше (см. описание выше)


Сжатие колонок
Обычные способы сжатия, а не алгоритмы дедубликации:
  • Query Low – LZO (4x)
    Данные перед вставкой не сортируются
  • Query High – gzip (6x)
    Во всех других случаях данных сортируются в рамках одного CU, чтобы достичь большей компрессии (по одному из столбцов?)
  • Archive Low – gzip high (7x)
  • Archive High – bzip2 (12x)

Т.к. HCC заточено на хранилища данных, то максимальный выигрыш дают смарт сканы, т.к. разархивация происходит на стороне exadata cells. В случае индексного доступа разархивация происходит уже в БД (причем всего CU!):

Смарт скан, отрицательный offloading - возвращено больше, чем считано:
select /*+ monitor */ count(DISTINCT num_1000), count(DISTINCT num_10), count(DISTINCT num_1000000) from t_qh;


Чтение по индексу - считывается весь CU, а разархивация происходит внутри субд:
select /*+ MONITOR */ id, num_1000000 from t_qh where id in( 5123456);


Inmemory HCC

Oracle хранит данные сразу в 2 форматах: строковый и колоночный. Это могут быть как таблицы, так и matview на наборе таблиц.
Загонять данные в колонки нужно вручную.
В inmemory можно поместить часть столбцов или разную степень для разных колонок.

Размер IMCU = 1МБ ( blogs.oracle.com ) см. imcu_addr.v$im_header



Данные в IMCU всегда хранятся в порядке вставки (rowid) - docs.oracle.com
Пример:
SELECT cust_id, time_id, channel_id
FROM sales
WHERE prod_id =5;

Данные из колонки «prod_id» фильтруются, в результате чего определяются позиции на которых находятся искомые данные. Т.к. данные в колонках лежат в том же порядке (включая null), то по этим номерам позиций забираются соответствующие значения из связанных колонок (time_id, chanel_id)

Основные понятия IM HCC

Local dictionary CU
хэш массив уникальных значений в CU (в виде чисел)
min/max значение в колонке CU

«IM storage index» - для отфильтровывания CU
IMCU index хранятся прямо в заголовке CU.

SMU (Snapshot Metadata Unit) = 64КБ – метаданные о IMCU - информация о инвалидации данных в IMCU при DML (транзакционный журнал).
Для DML по прежнему используется буферный кэш, но в SMU помещается информация, что rowid был инвалидирован.
Т.е. чем больше DML, тем хуже работает IM

Repopulation – периодическое обновление измененных данных в IMCU из буферного кэша (при накоплении определенного объема). На время этой операции IMCU отключаются, используется стандартный механизм доступа.



Ручной запуск:
EXEC DBMS_INMEMORY.POPULATE('SH', 'CUSTOMERS');

При exchange partition чтобы не потерять данные IM нужно собрать данные src таблицы в IM. Тогда обмен произойдет как у таблицы, так и у IM хранилища.
При direct path (+append) вставке происходит автоматическая фоновая репопуляция.

IM expression unit
В IM кроме обычных колонок могут хранится виртуальные выражения (как колонки, там и предрасчитанные на основе статистики вызова - «IM expression statistic store»), они будут также обновляться при repopulation.

IM expression statistic store
Дополнительное сохранение агрегированных данных SELECT, WHERE, GROUP BY если они часто вызываются и требуют больших расходов на расчет.

SIMD – векторная обработка колонок
Т.к. данные для фильтрации хранятся в виде колонки-вектора, то для фильтрации можно использовать векторные возможности CPU.
Векторная операция – это когда за 1 такт процессора происходит сравнение не одной переменной, а целого вектора значений (к примеру 8)



Раньше эта технология часто использовалась в графике, например при работе с RGB значениями, цвет пикселя можно было сменить за 1 такт CPU, теперь эти технологии нашли применение и в СУБД.

Компрессия:
* «MEMCOMPRESS FOR QUERY LOW» - алгоритм по умолчанию
Похоже на дедублицирование: словарь + обнаружение повторов + замена повтором на бит ссылку
(oracle.com/technetwork )
* «MEMCOMPRESS FOR CAPACITY LOW» - Используется проприетарный алгоритм OZIP, как расширение поверх «MEMCOMPRESS FOR QUERY LOW». Нужно разжатие перед выполнением WHERE, но может выполняться прямо на CPU (нужны специальные сервера с SQL in Silicon)


IM JOIN
SELECT v.year, v.name, s.sales_price
FROM   vehicles v, sales s
WHERE  v.name = s.name;

Обычный джойн выполняется так:


Т.е. большая часть работы — это преобразование колоночного вида в строковое хранилище pga.

Над join можно произвести первую оптимизацию — bloom filter, т. е. испольлзуя данные 1 таблицы откинуть лишние CU еще до join.

Для устранения необходимости преобразования колоночного вида в строковый можно подготовить что-то типа join индекса в IM:
CREATE INMEMORY JOIN GROUP deptid_jg (hr.employees(department_id),hr.departments(department_id));

В индексе сохраняются сочетания значений «Local dictionary CU» нужных колонок таблиц, где значения = указатели на нужные CU.

В итоге JOIN будет выполняться как фильтр большей таблицы по данным меньшей:
* фильтруем колонку левой таблицы
* получаем цифровые значения строк левой таблицы из “Local dictionary CU”
* сохраняем этот массив в PGA
* применяем фильтр к правой таблице на основе нашего «INMEMORY JOIN GROUP» где хранится связка левых CU с правыми CU



В итоге получается выполнение join без преобразования колонок в строки и фильтрация идет по обычному массиву (не хэш), т. е. без потребления дополнительного CPU для хэширования и проблемы хэш массива: устранения коллизий.


IM Vector Group BY
SELECT c.customer_id, s.quantity_sold, s.amount_sold 
FROM   customers c, sales s
WHERE  c.customer_id = s.customer_id 
AND    c.country_id = 'FR';

IM Vector group by есть общее со star transformation – преобразование JOIN в фильтрацию фактовой таблицы измерениями:
* фильтруем колонку левой таблицы
* Создаем массив уникальных значений левой таблицы: 0 — нет значения, 1,2,N существующие уникальные значения
* созданные массивы в векторном режиме применяются над фактовой таблицой.

План будет выглядеть так:
SQL_ID  0yxqj2nq8p9kt, child number 0
-------------------------------------
SELECT t.calendar_year, p.prod_category, SUM(quantity_sold) FROM
times t, products p, sales f WHERE  t.time_id = f.time_id AND
p.prod_id   = f.prod_id GROUP BY t.calendar_year, p.prod_category

Plan hash value: 2377225738
------------------------------------------------------------------------------------------------------
|Id| Operation                           | Name              |Rows|Bytes|Cost(%CPU)|Time|Pstart|Pstop|
------------------------------------------------------------------------------------------------------
| 0|SELECT STATEMENT                     |                         |    |     |285(100)|        | |  |
| 1| TEMP TABLE TRANSFORMATION           |                         |    |     |        |        | |  |
| 2|  LOAD AS SELECT                     |SYS_TEMP_0FD9D6644_11CBE8|    |     |        |        | |  |
| 3|   VECTOR GROUP BY                   |                         |   5|  80 |  3(100)|00:00:01| |  |
| 4|    KEY VECTOR CREATE BUFFERED       | :KV0000                 |1826|29216|  3(100)|00:00:01| |  |
| 5|     TABLE ACCESS INMEMORY FULL      | TIMES                   |1826|21912|  1(100)|00:00:01| |  |
| 6|  LOAD AS SELECT                     |SYS_TEMP_0FD9D6645_11CBE8|    |     |        |        | |  |
| 7|   VECTOR GROUP BY                   |                         |   5| 125 |  1(100)|00:00:01| |  |
| 8|    KEY VECTOR CREATE BUFFERED       | :KV0001                 |  72| 1800|  1(100)|00:00:01| |  |
| 9|     TABLE ACCESS INMEMORY FULL      | PRODUCTS                |  72| 1512|  0  (0)|        | |  |
|10|  HASH GROUP BY                      |                         |  18| 1440|282 (99)|00:00:01| |  |
|11|   HASH JOIN                         |                         |  18| 1440|281 (99)|00:00:01| |  |
|12|    HASH JOIN                        |                         |  18| 990 |278(100)|00:00:01| |  |
|13|     TABLE ACCESS FULL               |SYS_TEMP_0FD9D6644_11CBE8|   5|  80 |  2  (0)|00:00:01| |  |
|14|     VIEW                            | VW_VT_AF278325          |  18| 702 |276(100)|00:00:01| |  |
|15|      VECTOR GROUP BY                |                         |  18| 414 |276(100)|00:00:01| |  |
|16|       HASH GROUP BY                 |                         |  18| 414 |276(100)|00:00:01| |  |
|17|        KEY VECTOR USE               | :KV0000                 |918K|  20M|276(100)|00:00:01| |  |
|18|         KEY VECTOR USE              | :KV0001                 |918K|  16M|272(100)|00:00:01| |  |
|19|          PARTITION RANGE ALL        |                         |918K|  13M|257(100)|00:00:01|1|28|
|20|           TABLE ACCESS INMEMORY FULL| SALES                   |918K|  13M|257(100)|00:00:01|1|28|
|21|    TABLE ACCESS FULL                |SYS_TEMP_0FD9D6645_11CBE8|  5 |  125|  2  (0)|00:00:01| |  |
------------------------------------------------------------------------------------------------------

Predicate Information (identified by operation id):
---------------------------------------------------
  11 - access("ITEM_10"=INTERNAL_FUNCTION("C0") AND "ITEM_11"="C2")
  12 - access("ITEM_8"=INTERNAL_FUNCTION("C0") AND "ITEM_9"="C2")

Note
-----
   - vector transformation used for this statement


IM RAC

Можно продублировать IMCU на всех нодах «DUPLICATE ALL» или разделить таблицу (DISTRIBUTE ) — часть на 1 ноде, часть на другой:
* по партициям «DISTRIBUTE BY PARTITION »
* по диапазону rowid “DISTRIBUTE BY ROWID RANGE”

воскресенье, 26 марта 2017 г.

Решение bigdata задач на Hadoop mapreduce

Hadoop

Набор утилит, библиотек и фреймворк для разработки и выполнения распределённых программ, работающих на кластерах из сотен и тысяч узлов.

Ссылка на установку: http://www.cloudera.com/content/www/en-us/downloads.html
На основе курса

hadoop
Hadoop состоит из 2 основных частей hdfs и map reduce. Рассмотрим подробней.

Hdfs

- распределенная файловая система. Это значит, что данные файла распределены по множеству серверов.

* hdfs лучше работает с небольшим числом больших файлов/блоков
* один раз записали, много раз считали
* можно только целиком считать, целиком очистить или дописать в конец (нельзя с середины)
* файлы бьются на блоки split (к примеру 64мб)
 ** размер блока выбирается так, чтобы нивилировать время передачи блока по сети (если сеть быстрая, то блок можно поменьше, если медленная, то больше)
* все блоки реплицируются с фактором 3 поумолчанию (хранятся в 3 копиях на разных серверах)
 ** это обеспечивает высокую пропускную способность, но не скорость реакции

Hdfs java api

Пример программы копирующей один файл в другой:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.InputStream;

Path source_path = new Path(args[0]);
Path target_path = new Path(args[1]);

Configuration conf = new Configuration();
FileSystem source_fs = FileSystem.get(source_path.toUri(), conf);
FileSystem target_fs = FileSystem.get(target_path.toUri(), conf);

//доступ к нескольким файлам по маскам
//FileStatus [] files = fs.globStatus(glob); 
//?, *, [abc], [^a], {ab, cd}

FSDataOutputStream output = target_fs.create(target_path);
InputStream input = null;
try{
    input = source_fs.open(source_path);
    IOUtils.copyBytes(input, output, conf);
} finally {
    IOUtils.closeStream(input);
    IOUtils.closeStream(output);
}

Парадигма map reduce

1. input file: split 1 + split 2 + split 3 (блоки файла). Каждый сплит хранится на своем сервере кластера.
2. mapper:
 * на каждый split N создается worker для обработки
 * worker выполяется на сервере, где находится блок
 * workerы не могут обмениваются между собой данными
  ** т.е. mapper подходит для независимых данных, которые легко побить на части.
   зависимые данные (типа архива zip) нельзя побить на части в mapper:
    все блоки (split) будут переданы в один mapper
 * в случае ошибки процесс рестратует на другой копии блока hdfs
 * combine - (необязательный шаг) - reducer внутри mappera агрегирующие данные в меньшее число данных (ключ менять нельзя)
  ** агрегирует только данные одного маппера
 * partition - (необязательный шаг) - определение куда отправится ключ (поумолчанию hash(k) mod N , где N - число reducer )
3. результат mapper записывается в виде ключ значение {k->v} в циклический буфер в памяти, конец списка пишется на локальный диск сервера с worker
  ** это сделано в целях производительности, чтобы не реплицировать
  ** но в случае ошибки, данные нельзя будет восстановить и потребует рестартовать mapper
4. данные из {k->v} расходятся на сервера reduccer на основе ключа.
Т.е. данные с разных mapperов но с одним key попадут в один reducer
5. Результат reducer складываются в hdfs. Число файлов = число reducer

Hadoop streaming

Инструмент для быстрого прототипирования map reduce задач.
можно писать python программы заготовки, общение маппера с редусером проиходит чреез stdin/out.
Данные передаются как текст разделенный табом

word count
#!/usr/bin/python
import sys
for line in sys.stdin:
  for token in line.strip().split(" "):
    if token: print(token + '\t1')
 
#!/usr/bin/python
import sys
(lastKey, sum)=(None, 0)
for line in sys.stdin:
  (key, value) = line.strip().split("\t")
  if lastKey and lastKey != key:
    print (lastKey + '\t' + str(sum))
    (lastKey, sum) = (key, int(value))
  else:
    (lastKey, sum) = (key, sum + int(value))
if lastKey:
  print (lastKey + '\t' + str(sum))

Запуск:
hadoop jar $HADOOP_HOME/hadoop/hadoop-streaming.jar \
-D mapred.job.name="WordCount Job via Streaming" \
-files countMap.py, countReduce.py \
-input text.txt \
-output /tmp/wordCount/ \
-mapper countMap.py \
-combiner countReduce.py \
-reducer countReduce.py

Java source code example

Программа подсчитывающая число слов в hdfs файле.

word count

public class WordCountJob extends Configured implements Tool {
 static public class WordCountMapper extends Mapper < LongWritable, Text, Text, IntWritable > {
  private final static IntWritable one = new IntWritable(1);
  private final Text word = new Text();
  
  @Override
  protected void map(LongWritable key, Text value, Context context)  throws IOException,  InterruptedException {
     //разбиваем строку (key) на слова и передаем пару: слово, 1
     StringTokenizertokenizer = new StringTokenizer(value.toString());
     while (tokenizer.hasMoreTokens()) {
      text.set(tokenizer.nextToken());
      context.write(text, one);
     }
  } //map
 } //WordCountMapper
  
 static public class WordCountReducer  extends Reducer < Text,  IntWritable,  Text,  IntWritable > {
   @Override
   protected void reduce(Text key, Iterable < IntWritable > values, Context context)   throws IOException,  InterruptedException {
     //в редусер приходят все единицы от одного слова
    intsum = 0;
    for (IntWritable value: values) {
     sum += value.get();
    }
    context.write(key, new IntWritable(sum));
   } //reduce
 } //WordCountReducer
  
  @Override
  public int run(String[] args) throws Exception {
   Job job = Job.getInstance(getConf(), "WordCount");
   //класс обработчик
   job.setJarByClass(getClass());
   //путь до файла
   TextInputFormat.addInputPath(job, new Path(args[0]));
   //формат входных данных (можно отнаследовать и сделать собственный)
   job.setInputFormatClass(TextInputFormat.class);
   //классы мапперов, редусееров, комбайнеров и т.д.
   job.setMapperClass(WordCountMapper.class);
   job.setReducerClass(WordCountReducer.class);
   job.setCombinerClass(WordCountReducer.class);
   //выходной файл
   TextOutputFormat.setOutputPath(job, new Path(args[1]));
   job.setOutputFormatClass(TextOutputFormat.class);
   job.setOutputKeyClass(Text.class);
   job.setOutputValueClass(IntWritable.class);
   return job.waitForCompletion(true) ? 0 : 1;
  } //run
  
  public static void main(String[] args) throws Exception {
   int exitCode = ToolRunner.run( new WordCountJob(), args);
   System.exit(exitCode);
  } //main
} //WordCountJob

Такая программа будет тратить все время на передачу данных от маппера к редусеру.

Её можно усовершенствовать несколькими способами:
* Сделать комбайнер - чтобы данные из одного маппера были сагрегированы там.
 - не обязательно вызывается
 +/- агрегирует данные только одного вызова маппера (одной строки)
* хэш аггегировать внутри маппера:
 + обязательно выполнится
 +/- агрегирует данные только одного вызова маппера (одной строки)
 - нужно следить за размером хэш массива
* хэш массив сверху маппера, а внутри маппера обрабатывать
 + обязательно выполнится
 + агрегирует все вызовы маппера (сплита)
 - нужно следить за размером хэш массива

Sql операции

Разность (minus) и другое по аналогии
// на вход подаются элементы из двух множеств A и B
class Mapper
method Map(rowkey key, value t)
 Emit(value t, string t.SetName) // t.SetName либо ‘A‘ либо ‘B'
class Reducer
// массив n может быть [‘A'], [‘B'], [‘A' ‘B‘] или [‘B', ‘A']
method Reduce(value t, array n)
 if n.size() = 1 and n[1] = 'A'
  Emit(value t, null)
* hash join
class Mapper
method Initialize
 H = new AssociativeArray : join_key -> tuple from A //хэшируем меньшую таблицу целиком в память
 A = load()
 for all [ join_key k, tuple [a1, a2,...] ] in A
  H{k} = H{k}.append( [a1, a2,...] )

method Map(join_key k, tuple B)
 for all tuple a in H{k} //ищем в A соответствие ключа из B
  Emit(null, tuple [k a B] ) //если нашли, то записываем

Операции на графах

оптимальное представление в виде списка смежности, т.к. матрица смежности занимает слишком много места
* поиск кратчайшего пути
 ** обычный алгоритм - дейкстра
 ** поиск в ширину (bfs)
  на невзвешенном графе (нет связей назад):
  - Останавливаемся, как только расстояния до каждой вершины стало известно
  - Останавливаемся, когда пройдет число итераций, равное диаметру графа
  на взвешенном графе (есть обратные связи):
  - Останавливаемся, после того, как расстояния перестают меняться
* PR = a (1/N) + (1-a)* SUM( PR(i) / C(i) )
 ** N - начальное число точек в графе
 ** C - число исходящих точек
 ** a - вероятность случайного перехода
 ** pr решается только приблизительно:
  - останавливаем, когда значения перестают меняться
  - фиксированное число итерации
  - когда изменение значение меньше определенного числа
* проблемы
 ** необходимость передавать весь граф между всеми задачами
 ** итеративный режим, т.е. переход на следующий этап, когда все параллельные задачи выполнятся
* оптимизации:
 ** inmemory combining - агрегируем сообщения в общем массиве
 ** партицирование - смежные точки оказались на одном маппере
 ** структуру графа читать из hdfs, а не передавать + партицирование

Высокоуровневые языки поверх hdfs или hadoop:

Pig

высокоуровневый язык + компилятор в mapreduce
используется для быстрого написания типовых map-reduce задач

основные составляющие:
* field - поле
* tuple - кортеж (1, "a", 2)
* bag - коллекция кортежей {(), ()}
 ** коллекции могут содержать разное кол-во полей в кортеже (также они могут быть разных типов)

word count:
input_lines = LOAD 'file-with-text' AS (line:chararray);
words = FOREACH input_lines GENERATE FLATTEN(TOKENIZE(line)) AS word;
filtered_words = FILTER words BY word MATCHES '\\w+';
word_groups = GROUP filtered_words BY word;
word_count = FOREACH word_groups GENERATE COUNT(filtered_words) AS count,
group AS word;
ordered_word_count = ORDER word_count BY count DESC;
STORE ordered_word_count INTO ‘number-of-words-file';

join
//Загрузить записи в bag #1
posts = LOAD 'data/user-posts.txt' USING PigStorage(',') AS (user:chararray,post:chararray,date:long);
//Загрузить записи в bag #2
likes = LOAD 'data/user-likes.txt' USING PigStorage(',') AS (user:chararray,likes:int,date:long);
userInfo = JOIN posts BY user, likes BY user;
DUMP userInfo;

Hive

sql подобный язык hivesql поверх hadoop
основные составляющие - как в бд
метаданные могут храниться в mysql /derby

* создание таблицы
hive> CREATE TABLE posts (user STRING, post STRING, time BIGINT)
 > ROW FORMAT DELIMITED
 > FIELDS TERMINATED BY ','
 > STORED AS TEXTFILE;

* при нарушении схемы ошибки не будет, вставится NULL

join
hive> INSERT OVERWRITE TABLE posts_likes
 > SELECT p.user, p.post, l.count
 > FROM posts p JOIN likes l ON (p.user = l.user);

word count
CREATE TABLE docs (line STRING);
LOAD DATA INPATH 'docs' OVERWRITE INTO TABLE docs;
CREATE TABLE word_counts AS
SELECT word, count(1) AS count FROM
(SELECT explode(split(line, '\s')) AS word FROM docs) w
GROUP BY word
ORDER BY word;

Nosql

* Масштабирование:
 ** мастер-слейв: распределение нагрузки cpu, io остается общим
 ** шардинг: деление данных таблицы по серверам: распределение io, cpu Общее
* cap theorem - достигается только 2 из 3
 ** consistency - непротиворечивость
  Чаще всего жертвует строгой целостностью в текущий момент, на целостность в конечном счете
 ** aviability - доступность
 ** partioning - разделение данных на части
* типы:
 ** ключ-значение
 ** колоночная
 ** документоориентировання
 ** графовая

Hbase

- поколоночная база
 ** строки в таблицах шардируются на сервера
  *** строки хранятся в отстортированном виде, чтобы можно было искать по серверам
  *** список шардов хранится в HMaster
   через него определяется сервер для чтений, а дальше чтение идет уже напряму с него миную hmaster
   ! возможная точка отказа
 ** столбцы объединяются в column family
  *** каждая такая группа сохраняется также в отдельный файл
  *** для всей группы задается степень сжатия и другие физические настройки
 ** изменение/удаление происходит через вставку нового значения в колонку с TS
  *** при select выбираются последние 3 изменения (если в колонке не было изменений, то берется предыдущая максимальная версия)
 ** промежуточные данные хранятся в memstore сервера и логе изменений
  *** периодически данные из памяти скидываются на hdfs
 ** hfile - мапится в файл hdfs, который также splitit'ится на разные сервера
 ** сжатие файлов хранилища:
  *** объединение файлов CF в один
  *** очистка от удаленных записей

 Массовость - деление данных на сервера по:
 * по семейству колонок
 * по диапазону ключей
 * по версии строки
 * сам файл разбивается в hdfs на сплиты

Cassandra

отличия:
** есть sql, а не только java api
** CF имеют иерархичность и могут также объединяться в группы по какомуто признаку
** настраевая consistency от самой строгой - ожидание отклика всех серверов, то записи без проверок
** нет единой точки отказа hmaster, данные делятся по диапазону
 ** но усложняет деление и поиск данных

* greenplum - классическая колоночная бд, с шардирование данных на основе pgsql
 ** есть поддержка sql

Spark

Альтернативный вариант реализации hadoop.

* Преимущества перед hadoop:
1. Необязательно сохранять промежуточные результаты в HDFS при итеративной разработке. Данные можно хранить в памяти.
2. Необязательно чередовать map-reduce, можно делать несколько reducer подряд
3. Операции в памяти (вкллючая синхронизацию между серверами), а не через hdfs
4. Возможность загрузить данные в распределенную память, а потом делать обработку (каждому mapper уже не надо считывать данные из hdfs)

* В основе может лежать любой вид хранилища данных
* Архитектура, как в hadoop: name node (driver) + worker

* Основная составляющая вещь - RDD:
 ** объект партицируется
 ** каждая партиция имеет произвольное хранение: память, диск, hdfs и т.д.
  *** Для хранения данных в памяти используется LRU буфер.
   Старые, редко используемые данны вытесняются в медленное хранилище
 ** объект неизменяем (изменение создает новый объект)
 ** rdd распределена по серверам

* Программа состоит из однонаправленного графа операций без циклов
 Виды зависимостей в графе:
 ** Narrow (узкая) - переход 1 партииции в 1 другую
 ** wide (широкая) - переход 1 (N) партиций в N (1)

* Восстановление в случае падения:
 - нет репликации как в hdfs
 + для воостановления используется перезапуск
  ** если потерялись данные из одной партиции в Narrow связи, то восстанавливается только она из предыдущей партиции
  ** если в wide то перезапускаются все партиции (т.к. связ не 1 к 1).
   Результат wide поумолчанию сохраняется на диск.
 ** Если есть шанс потерять данные, а цепочка длинная, то лучше самим самостоятельно сохранять промежуточные результаты на диск.

* Типы операции состоят из:
 ** Трансформация - не запускают работу
  map, filer, group, union, join ....
 ** Действий - запускают работу (все трансформации в графе до этого графа)
  count, collect, save, reduce

* spark поддерживает:
 ** java
 ** scala
 ** python

Join на scala
  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._
  val sc = new SparkContext(master, appName, [sparkHome], [jars])
 
  //transform
  //или из текстового файла  sc.textFile("file.txt")
  val visits = sc.parallelize(
   Seq(("index.html", "1.2.3.4"),
   ("about.html", "3.4.5.6"),
   ("index.html", "1.3.3.1")))
  val pageNames = sc.parallelize(
   Seq(("index.html", "Home"),
   ("about.html", "About")))
  visits.join(pageNames)
  
  //action
  visits.saveAsTextFile("hdfs://file.txt")
  // ("index.html", ("1.2.3.4", "Home"))
  // ("index.html", ("1.3.3.1", "Home"))
  // ("about.html", ("3.4.5.6", "About"))
word count
val file = sc.textFile("hdfs://...")
val sics = file.filter(_.contains("MAIL"))
val cached = sics.cache()
val ones = cached.map(_ => 1)
val count = ones.reduce(_+_)
val file = sc.textFile("hdfs://...")
val count = file.filter(_.contains(“MAIL")).count()
или
val file = spark.textFile("hdfs://...")
val counts = file.flatMap(line => line.split(" "))
 .map(word => (word, 1))
.reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

* Переменные для обмена данными между серверами
 ** broadcast variables - переменная только для чтения
  val broadcastVar = sc.broadcast(Array(1, 2, 3))
  ...
  broadcastVar.value 
 ** Accumulators - счетчики
  val accum = sc.accumulator(0)
  accum += x
  accum.value

Yarn

планировщик ресурсов, которые избавляет от недостатков классического MR:
* жесткое разделенеи ресурсов: можно освободить ресурсы от mapper после их полного выполнения и отдать все ресурсы reducer
* возможность разделять ресурсы не только между MR Задачами, но и другими процессами

* разделение происходит в понятии: озу, cpu, диск, сеть и т.д.
* он же производит запуск задач (не только MR) и восстанавливает в случае падения
* для работы нужны вспомогательные процессы:
  ** appserver - получение комманд от клиента
  ** resource server - север для выделяющий ресурсы
  ** node server - сервер для запуска задач и отслеживания за ними (на каждом сервере)

пятница, 17 февраля 2017 г.

Oracle: Hash агрегация и приблизительный DISTINCT

Список статей о внутреннем устройстве Oracle:
Нововведение Oracle 12.1 - приблизительный подсчет уникальных значений ( APPROX_COUNT_DISTINCT ~= count distinct ).
Классический подход к расчету числа уникальных значений предполагает создание хэш массива, где ключ = колонке таблицы, а в значении число совпадений.
Число элемнтов в хэш массиве будет равно числу уникальных данных в колонке таблицы:
class RealDist {
 HashMap words;

 public RealDist() {
  words = new HashMap();
 }

 public void set(String word) {
  if (!words.containsKey(word)) {
   words.put(word, 1);
  } else {
   words.put(word, words.get(word) + 1);
  }
 } //set

 public int get() {
  return words.size();
 } //get
} // RealDist

Этот способ всем хорош, пока у нас достаточно памяти под хранение хэш массива.
При росте размера данных время и потребляемые ресурсы начинают катастрофически расти.

Пример со страницы antognini с разным числом уникальных значений на 10 млн. строк:
CREATE TABLE t
AS
WITH
  t1000 AS (SELECT /*+ materialize */ rownum AS n
   FROM dual
   CONNECT BY level <= 1E3)
SELECT rownum AS id,
    mod(rownum,2) AS n_2,
    mod(rownum,4) AS n_4,
   mod(rownum,8) AS n_8,
   mod(rownum,16) AS n_16,
   mod(rownum,32) AS n_32,
   mod(rownum,64) AS n_64,
   mod(rownum,128) AS n_128,
   mod(rownum,256) AS n_256,
   mod(rownum,512) AS n_512,
   mod(rownum,1024) AS n_1024,
   mod(rownum,2048) AS n_2048,
   mod(rownum,4096) AS n_4096,
   mod(rownum,8192) AS n_8192,
   mod(rownum,16384) AS n_16384,
   mod(rownum,32768) AS n_32768,
   mod(rownum,65536) AS n_65536,
   mod(rownum,131072) AS n_131072,
   mod(rownum,262144) AS n_262144,
   mod(rownum,524288) AS n_524288,
   mod(rownum,1048576) AS n_1048576,
   mod(rownum,2097152) AS n_2097152,
   mod(rownum,4194304) AS n_4194304,
   mod(rownum,8388608) AS n_8388608,
   mod(rownum,16777216) AS n_16777216
FROM t1000, t1000, t1000
WHERE rownum <= 1E8;

Скорость вычислений классическим способом падает вместе с ростом числа уникальных значений:


Пропорционально времени работы растет и потребление PGA памяти под хранение хэш массива:


Чтобы решить проблему производительности/памяти и при этом получить приемлемый результат, можно применить приблизительный расчет числа уникальных значений.
Как и bloom filter он основан на вероятностных хэш значениях от данных.

Общий смысл приблизительного расчета (habr):

1. От каждого значения берется произвольная хэш функция
public static int fnv1a(String text) {
 int hash = 0x811c9dc5;
 for (int i = 0; i < text.length(); ++i) {
  hash ^= (text.charAt(i) & 0xff);
  hash *= 16777619;
 }
 return hash >>> 0;
} //fnv1a

2. У каждого хэш значения определяется ранк первого ненулевого бита справа.
// позиция первого ненулевого бита справа
public static int rank(int hash, int max_rank) {
 int r = 1;
 while ((hash & 1) == 0 && r <= max_rank) {
  r++;
  // смещаем вправо, пока не дойдет до hash & 1 == 1
  hash >>>= 1;
 }
 return r;
} //rank

Вероятность того, что мы встретим хеш с рангом 1 равна 0.5, с рангом 2 — 0.25, с рангом r — 1 / 2ранг
Если запоминать наибольший обнаруженный ранг R, то 2R сгодится в качестве грубой оценки количества уникальных элементов среди уже просмотренных.

Такой подход даст нам приблизительное число уникальных значений, но с сильной ошибкой.

Для данного алгоритма есть усовершенствованный вариант HyperLogLog:

а. От битового представления хэш берется 8 первых левых бит. Они становятся ключом массива (2^8 = 256 значений)
public void set(String word) {
 int hash = fnv1a(word);
 // убираем 24 бита из 32 справа - остается 8 левых
 // (=256 разных значений)
 int k = hash >>> 24;
 
 // если коллизия, то берем наибольший ранк
 hashes[k] = Math.max(hashes[k], rank(hash, 24)); 
} // count

б. Над массивов вычисляется формула:

где am - корректирующий коэффициент, m - размер массива = 256, M[] - массив ранков (hashes)

Для погрешности в 6.5% числитель этой формулы будет равен константе = 47072.7126712022335488.
Остается в цикле возвести 2 в -степень максимального ранка из каждого элемента массива:
public double get_loglog() {
 double count = 0;

 for (int i = 0; i < hashes.length; i++) {
  count += 1 / Math.pow(2, hashes[i]);
 }

 return 47072.7126712022335488 / count;
} //get_loglog

Данный алгоритм уже дает приемлемый результат на больших данных, но серьезно ошибается на небольшом наборе.
Для небольших значение применяется дополнительная математическая корректировка:
public int get() { 
 long pow_2_32 = 4294967296L;

 double E = get_loglog();

 // коррекция
 if (E <= 640) {
  int V = 0;
  for (int i = 0; i < 256; i++) {
   if (hashes[i] == 0) {
    V++;
   }
  }
  if (V > 0) {
   E = 256 * Math.log((256 / (double) V));
  }

 } else if (E > 1 / 30 * pow_2_32) {
  E = -pow_2_32 * Math.log(1 - E / pow_2_32);
 }
 // конец коррекции

 return (int) Math.round(E);
} //get

Проведенных тест на 1-2 томах войны и мира показал результаты:
* реальное количество уникальных слов = 3737
* приблизительное число уникальных слов основываясь только на максимальном ранке первого ненулевого бита справа = 16384
* приблизительное число уникальных слов по формуле из п. б на массиве из 256 элементов = 3676
* приблизительное число уникальных слов по формуле и корректировке = 3676

При ошибке числа уникальных значений в 6,1% мы затратили константное число памяти под массив из 256 элементов, что в разы меньше, чем при обычном хэш массиве.

Полный исходный код на java можно посмотреть на github

вторник, 27 декабря 2016 г.

Oracle: Lru буферный кэш

Список статей о внутреннем устройстве Oracle:

Следующий аспект больших бд, у которых все данные не способны поместиться в памяти - кеширование частоиспользуемых данных в памяти.

В Oracle для кэширования используется модифицированный алгоритм LRU.

Lru (least recent used) кэш — буфер в памяти для быстрого доступа к часто используемым элементам.
Lru кэш представляет из себя двусвязный список (2Linked List), вначале которого наиболее часто используемые элементы, а в конце - редко.
Для ускорения произвольного доступа над списком создается хэш массив (Hash Map) ссылок на элементы двусвязанного списка:

Раньше я уже описывал lru кэш со стороны разработчика бд: Ядро oracle, сегодня копнем немного глубже, с кодом lru кэша на java.


Двусвязный список с дополнительным счетчиком обращений для хранения LRU:
//двухсвязный список
class Node<Value> {
 //ключ списка
 int key;
 
 //абстрактное значение
 Value value;
 
 //счетчик обращений к элементу
 int cnt;
 
 //указатель на предыдущий элемент
 Node prev;
 
 //указатель на следующий элемент
 Node next;
 
 //элемент был перемещен из конца списка в начало
 boolean swaped;
 
 public Node(int key, Value value){
  this.key = key;
  this.value = value;
  this.cnt = 1;
  this.swaped = false;
 }

При обращении к элементу списка увеличиваем счетчик обращений "cnt":
 //получить значение из списка
 public Value getValue() {
  //увеличиваем счетчик обращений
  cnt++;
  //сбрасываем признак смещений, если элемент был прочитан
  this.swaped = false;
  
  return value;
 }
 
 //установить значение
 public void setValue(Value val) {
  this.value = val;
  //также увеличиваем счетчик
  cnt++;
  //сбрасываем признак смещений, если элемент был перезаписан
  this.swaped = false;
 } //setValue
} //Node

Класс Lru дополняется хэш массивом "map" в котором для ускорения доступа количество хэш секций (capacity) >= числу элемнтов в списке;
И стандартные указатель на начало "head" и конец "end" списка.
Дополнительное изменение в Oracle - указатель на середину списка "cold", где начинаются холодные данные.
public class Lru<Value> {
 
 //доступной число элементов в кэше
 int capacity;
 
 //хэш массив элементов для быстрого доступа
 HashMap<Integer, Node> map;
 
 //указатель на начало (горячие элементы)
 Node head = null;
 
 //указатель на середину (начало холодных элементов)
 Node cold = null;
 
 //указатель на конец (самый редкоиспользуемый)
 Node end = null;
 
 //число элементов в кэше
 int cnt;
 
 
 //конкструктор с числом элементов в кэше
 public Lru (int capacity) {
  this.capacity = capacity;
  
  //хэш массив создаем с нужным числом секций = загруженности
  map = new HashMap<Integer, Node>(capacity);
 }

Для получения элемента используется хэш массив, без последовательного просмотра всего списка:
 //получить элемент из кэша
 public Value get(int key) {
  //быстрое извлечение их хэш массива
  if(map.containsKey(key)) {
   //и инкремент счетчика обращений
   return (Value) map.get(key).getValue();
  }
  
  return null;
 } //get

Если элементов в списке меньше размер буфера, то используется обычный алгоритм LRU - новые элементы добавляются в начало списка, а старые вытесняются вправо, пока не дойдут до конца списка.
 //места достаточно, добавляем вначало
 protected void addHead(Node n) {
  
  //первый элемент
  if(this.head == null) {
   //устанавливаем начало и конец = элементу
   this.head = n;
   this.end = n;
  } else {
   
   //вставляем вначало
   
   //следующий для нового элемента = начало списка
   n.next = this.head;
   
   //предыдущий для начала списка = новый элемент
   this.head.prev = n;     
   this.head = n;
   
   //второй элемент
   if(this.end.prev == null) {
    //предыдущий для конца = новый элемент
    this.end.prev = n;
   }
  }    
  
  //устанавливаем середину
  if(cnt == capacity / 2) {
   this.cold = n;
  }
  
  //счетчик элементов + 1
  cnt++;
 } //addHead

У этого подхода есть существенный минус: редкоиспользуемый элемент может случайно вытеснить из списка популярный блок, который не вызывался небольшой период времени, за который он успел сместиться до конца.
Для решения этой проблемы Oracle применяет 2 подхода:
1. Если в конце элемент со счетчиком обращений = 1, то он открепляется от списка, а новый блок помещается в среднюю точку cold.
Для этого ссылки соседей открепляются друг от друга и перенацеливаются на новый блок, который становится новой серединой.
 //удаляем конца списка
 private void delEnd() {
  //из хэш массива
  map.remove(this.end.key);
  
  //и делаем концом списка = предыдущий элемент
  this.end = this.end.prev;
  this.end.next = null;
 } //delEnd
 
 //вконце малопопулярный блок
 protected void addColdUnPop(Node n) {
  //удаляем конец
  delEnd();
  
  //у старой середины изменяем счетчик на 1
  if(this.cold.swaped) {
   //если смещенный элемент не был ни разу считан 
   //и дошел до середины, 
   //то сбрасываем счетчик в 1
   this.cold.cnt = 1;
   this.cold.swaped = false;
  }
  
  //новый блок в середину = cold
  
  //проставляем ссылки у нового элемента
  n.prev = this.cold.prev;
  n.next = this.cold;
  
  //и разрываем связи и соседей
  n.prev.next = n;
  n.next.prev = n;
  this.cold = n;
 } //addColdUnPop

2. Если в конце популярный элемент со счетчиком > 1 , тогда последний блок открепляется, счетчик обращений делится на 2 и этот блок перемещается в начало списка.
Такой элемент помечается флагом swaped = true, который сбрасывается при любом последующем обращении к элементу.
Это предотвращает случайное вытеснение популярного блока из памяти.
Середина списка также смещается влево, одновременно делая число обращений = 1 у новой элемента-середины, если к ней никто не обращался за время смещения от начала до середины (swaped = true).
Новый блок помещается в центр списка cold.
 //вконце популярный блок
 protected void addColdPop(int key, Value value) {
  //делим счетчик на пополам
  this.end.cnt = this.end.cnt / 2;  
  
  //открепляем конец
  Node n = this.end;
  
  //удаляем конец
  delEnd();
  
  //конец перемещаем в начало
  n.prev = null;
  n.next = this.head;
  this.head.prev = n;     
  this.head = n;
  
  //помечаем, что элемент был перемещен из конца в начало
  this.head.swaped = true;
  
  //смещаем середину на 1 влево
  if(this.cold.swaped) {
   //если смещенный элемент не был ни разу считан 
   //и дошел до середины, 
   //то сбрасываем счетчик в 1
   this.cold.cnt = 1;
   this.cold.swaped = false;
  }
  this.cold = this.cold.prev;
  
  //рекурсивно пытаемся вставить вконец
  //TODO: если все популярные? то вставка будет идти очень долго
  this.set(key, value);
 } //addColdPop


События ожиданий связанные с наполнением буферного кэша:
* Buffer busy waits / read by other session - сессия пытается считать блок, который сейчас читается в кэш или модифицируется в кэше другой сессией.

Полный исходный код можно посмотреть тут: https://github.com/pihel/java/blob/master/cache/lru.java