четверг, 29 ноября 2018 г.

Введение в Scala и параллельную разработку

В этой статье я хотел бы охватить все аспекты работы с данными на языке Scala - от примитивов языка до параллельного программирования.

четверг, 28 июня 2018 г.

Введение в нейронные сети

Эта статья-заметка освещает основные понятия нейронных сетей и как их строить применяя python.

четверг, 24 мая 2018 г.

LSM дерево: быстрый доступ по ключу в условиях интенсивной вставки

В условиях интенсивной вставки для быстрого доступа к данным обычные btree индексы не подходят.
Во многих базах (Bigtable, HBase, LevelDB, MongoDB, SQLite4, Tarantool, RocksDB, WiredTiger, Apache Cassandra, и InfluxDB ) используется LSM-дерево (Log-structured merge-tree — журнально-структурированное дерево со слиянием)
LSM дерево служит для хранения ключ-значения.
В данной статье рассмотрим двухуровневое дерево. Первый уровень целиком находится в памяти, вторая половина на диске.
Вставка идет всегда в первую часть в памяти, а поиск из всех частей. Когда размер данных в памяти превышает определенный порог, то она записывается на диск. После чего блоки на диске объединяются в один.
Рассмотрим алгоритм по частям:

1. Все записи в дереве маркируются порядковым номером glsn , при каждой вставке этот счетчик увеличивается

Структура для хранения ключа-значения и порядкового номера:
//запись с ключ-значение
class SSItem {
 Integer key;
 String value;
 //+ идентификатор последовательности вставки
 Integer lsn;
 
 SSItem(Integer key, String value, Integer lsn) {
  this.key = key;
  this.value = value;
  this.lsn = lsn;
 }

2. Структуры объядиняются в одну таблицу в памяти.
В моем алгоритме используется хэш таблица.
В реальности чаще всего используется отсортированная по ключу структура - это дает преимущества, что для поиска и слияния используется последовательное чтение, вместо рандомного.
class MemSSTable { 
 
 //из-за хэша нет поиска по диапазону
 HashMap<Integer, SSItem> itms = new HashMap<Integer, SSItem>();
 

3. Т.к. размер памяти ограничен, то при превышении определенного порога, данные должны быть скинуты на диск.
В моей реализации - это простой вариант без фоновых заданий и асинхронных вызовов
При достижении лимита будет фриз, т.к. таблица скидывается на диск.
В реальных системах используется упреждающая запись: данные на диск скидываются асинхронно и заранее, до достижения жесткого лимита памяти.
 void SaveToDisk() throws FileNotFoundException, UnsupportedEncodingException {
  indx.path = "sstable_" + indx.max_lsn + ".dat";
  PrintWriter writer = new PrintWriter(indx.path, "UTF-8");
  Integer pad = 0;
  //последовательно пишем 10 байт с длиной значения и само значение
  for(Entry<Integer, SSItem> entry : itms.entrySet()) {
   SSItem itm = entry.getValue();
   String val = itm.get();
   writer.print( val );
   //регистрируем в индексе смещения в файле
   indx.keys.put(itm.key, pad);
   pad = pad + val.length();
  }
  writer.close();
  
  LSMTree.indexes.add(indx);
 } //SaveToDisk

4. Получается, что половина всех данных у нас хранится в хэше в памяти, а остальное в виде файлов на диске.
Чтобы не перебирать все файлы на диске создается дополнительная индексная структура, которая будет хранить метаинформацию о файлах в памяти:
//индекс над таблицей с даными
class SSTableIndex {
 //метаданные:
 //минимальный и максимальный ключи в таблице
 Integer min_key;
 Integer max_key;
 //минимальный и максимальный порядковый lsn
 Integer min_lsn;
 Integer max_lsn;
 
 //если таблица на диске, то путь до файла
 String path;
 
 //ключ - смещение ключа в файле
 HashMap<Integer, Integer> keys = new HashMap<Integer, Integer>();
 
 //добавить ключ в индекс
 void add(Integer k) {
  //также обновляем метаданные
  max_lsn = LSMTree.glsn;
  if(min_lsn == null) min_lsn = max_lsn;
  if(min_key == null || k < min_key) min_key = k;
  if(max_key == null || k > max_key) max_key = k;
  //добавление идет в память в хэш таблицу, на первом этапе смещения в файле нет
  keys.put(k, 0);
 }

5. Управляющий объект всего дерева хранит ссылку на таблицу с данными в памяти и на мета-индексы данных на диске:
public class LSMTree {
 
 static int glsn = 0;
 //максимальный размер, после которого таблица должна быть скинута на диск
 //для упрощения алгоритма = числу записей, а не размеру в байтах
 final static int max_sstable_size = 10; 
 
 //текущая таблица в памяти, куда вставляем данные
 MemSSTable MemTable;
 
 //все индексы, даже для таблиц на диске, хранятся в памяти
 static LinkedList<SSTableIndex> indexes = new LinkedList<SSTableIndex>();

6. Когда все объекты описаны, давайте посмотрим как происходит добавление нового элемента.
Добавление всегда идет в таблицу в памяти, что гарантирует быструю вставку:
public class LSMTree {
 //....
 
 //добавить запись
 public void add(Integer k, String v) {
  MemTable.add(k, v);
 }
Подробней логика добавления в таблицу в памяти:
* Увеличиваем глобальный счетчик элементов LSMTree.glsn
* Если число элментов превысило порог, то таблица скидывается на диск и очищается
В реальности это конечно не число элементов, а объем в байтах.
* Создается новый элемент SSItem
* И доблавляется в хэш массив
Причем, если элемент до этого уже существовал, то он перезаписывается.
В реальности хранятся все записи, чтобы была возможность поддержки транзакционности.
* Кроме этого в индексе обновляются метаданные indx.add
class MemSSTable { 
 //.....
 
 //добавить новый элемент
 void add(Integer k, String v) {
  //увеличиваем глобальный счетчик операций glsn
  LSMTree.glsn++;
  
  //если размер превышает, 
  if(itms.size() >= LSMTree.max_sstable_size) {
   try {
    //то сохраняем таблицу на диск
    //в реальных движках используется упреждающая фоновая запись
    //когда папять заполнена на N% (n<100), то данные скидываются на диск заранее, чтобы избежать фриза при сбросе памяти и записи на диск
    SaveToDisk();
   } catch (FileNotFoundException | UnsupportedEncodingException e) {
    e.printStackTrace();
    return;
   }
   //очищаем данные под новые значения
   indx = new SSTableIndex();
   itms = new HashMap<Integer, SSItem>();
  }
  
  //обновляем медаданные в индексе
  indx.add(k);
  
  SSItem itm = new SSItem(k, v, indx.max_lsn);
  
  //в моей реализации, при повторе ключ перезаписывается
  //т.е. транзакционность и многоверсионность тут не поддерживается
  itms.put(k,  itm);
  
 } //add
Если после вставки нового элемента старые данные оказались на диске, то в метаданных индекса прописывается ссылка на точное смещение в файле:
class SSTableIndex {
 //...
 
 //если таблица на диске, то путь до файла
 String path;
 
 //ключ - смещение
 HashMap<Integer, Integer> keys = new HashMap<Integer, Integer>();

7. С вставкой разобрались, теперь обратная операция - поиск элемента в дереве:
* Сперва мы проверяем наличие ключа в хэш таблице в памяти. Если элемент нашелся, то на этом все.
* Если элемента нет, то мы пробегамся по всем метаиндексам в поиске нашего ключа.
* проверяем, что ключ входит в диапазон indx.min_key - indx.max_key индекса
* И для полной точности проверяем наличие ключа в хэше ключей indx.keys.containsKey
* Если элемент нашелся, то это еще не конец, цикл продолжается, пока мы не переберем все индексы.
Т.к. ключ может добавляться несколько раз в разные промежутки времени.
* Из всех совпадений выбираем индекс с максимальным счетчиком вставки - это последнее изменение ключа
public class LSMTree {
 //.....
 
 //получить значение по ключу
 String getKey(Integer key) {
  //сперва смотрим в памяти
  String val = MemTable.getKey(key);
  
  if(val == null) {
   SSTableIndex indx_with_max_lsn = null;
   //потом таблицу по индексам, которая содержит наш ключ
   //если содержится в нескольких, то берем с максимальным lsn, т.к. это последнее изменение
   for (SSTableIndex indx : indexes) {
    Integer max_lsn = 0;
    if(key >= indx.min_key && key <= indx.max_key && max_lsn < indx.max_lsn ) {
     if(indx.keys.containsKey(key)) {
      max_lsn = indx.max_lsn;
      indx_with_max_lsn = indx;
     }
    }
   }
   //читаем из таблицы с диска
   if(indx_with_max_lsn != null) {
    try {
     return indx_with_max_lsn.getByPath(key);
    } catch (IOException e) {
     e.printStackTrace();
    }
   }
  }
  
  return val;
 }
* Определим нужный индекс обращаемся по нему к таблице на диске:
** Открываем нужный файл
В реальности, скорей всего, файы постоянно держатся открытыми для экономии времени.
** Следуя смещенями из индекса переходим к нужной точке файла file.seek
** И считываем значение file.read
class SSTableIndex {
 //...
 
 //получить значение ключа из открытого файла
 String getByPath(Integer key, RandomAccessFile file) throws IOException {
  //получаем смещение в файле для ключа из индекса
  Integer offset = keys.get(key);
  
  //смещаемся в файле
  file.seek(offset);
  
  //резервируем 10 байт под переменную с длинной значения
  byte[] lenb = new byte[10];
  file.read(lenb, 0, 10);
  
  Integer len = Integer.parseInt(new String(lenb, StandardCharsets.UTF_8));
 
  file.seek(offset + 10);
  
  //считываем значение
  byte[] valb = new byte[len];
  file.read(valb, 0, len);
  
  return new String(valb, StandardCharsets.UTF_8);
 } //getByPath
 
 //получить значение ключа с диска
 String getByPath(Integer key) throws IOException {
  if(!keys.containsKey(key)) return null;
  
  RandomAccessFile file = new RandomAccessFile(path, "r");
  
  String val = getByPath(key, file);
  
  file.close();

  return val;
 }

8. Т.к. ключ может вставляться неограниченное число раз, то со временем он может находится в нескольких файлах на диске одновременно.
Также, если бы LSM дерево поддерживало транзакционность, то в файлах также хранились бы разные версии одного ключа (изменения или удаления).
Для оптимизации последующего поиска применяется операция слияния нескольких файлов в один:
* Сортируем индексы по убыванию lsn
* Последовательно считываем элементы из первого индекса и записываем в новый
* Если элемент уже присутствует в объединенном индексе, то он пропускается
* Создается новый единственный файл и индекс со всеми ключами и значениями
* Старые файлы и индексы удаляются
public class LSMTree {
 //....
 
 //объединить несколько таблиц на диске в 1 большую
 void merge() throws IOException {
  Integer min_key = null;
  Integer max_key = null;
  Integer min_lsn = null;
  Integer max_lsn = null;
  
  //сортируем таблицы по убыванию lsn
  //чтобы вначале были самые свежие ключи
  Collections.sort(indexes, new Comparator<SSTableIndex>() {
   @Override
   public int compare(SSTableIndex o1, SSTableIndex o2) {
    if(o1.max_lsn > o2.max_lsn) {
     return -1;
    } else if(o1.max_lsn < o2.max_lsn) {
     return 1;
    }
    return 0;
   }
  });
  
  SSTableIndex merge_indx = new SSTableIndex();
  
  Integer pad = 0;
  merge_indx.path = "sstable_merge.dat";
  PrintWriter writer = new PrintWriter(merge_indx.path, "UTF-8");
  
  //пробегаемся по всем индексам, чтобы слить в 1
  for (SSTableIndex indx : indexes) {
   if(min_lsn == null || indx.min_lsn < min_lsn) min_lsn = indx.min_lsn;
   if(min_key == null || indx.min_key < min_key) min_key = indx.min_key;
   if(max_key == null || indx.max_key > max_key) max_key = indx.max_key;
   if(max_lsn == null || indx.max_lsn > max_lsn) max_lsn = indx.max_lsn;
   
   RandomAccessFile file = new RandomAccessFile(indx.path, "r");
   
   //т.к. данные в таблицах не упорядочены, это приводит к рандомным чтениям с диска
   //в реальности делают упорядочнный по ключу массив, чтобы делать быстрые последовательные чтения
   for(Entry<Integer, Integer> entry : indx.keys.entrySet()) {
    //оставляем запись только с максимальным lsn
    Integer key = entry.getKey();
    if(!merge_indx.keys.containsKey(key)) {
     String val = indx.getByPath(key, file);
     
     SSItem itm = new SSItem(key, val, 0);
     String itmval = itm.get();
     
     writer.print( itmval );
     merge_indx.keys.put(key, pad);
     pad = pad + itmval.length();     
    }
   }
   //записываем и удаляем старые файлы
   file.close();
   //delete
   File fd = new File(indx.path);
   fd.delete();
  }  
  
  merge_indx.min_lsn = min_lsn;
  merge_indx.min_key = min_key;
  merge_indx.max_key = max_key;
  merge_indx.max_lsn = max_lsn;
  
  writer.close();
  
  //переименовываем к обычному имени
  File fd = new File(merge_indx.path);
  merge_indx.path = "sstable_" + merge_indx.max_lsn + ".dat";
  File fdn = new File(merge_indx.path);
  fd.renameTo(fdn);
  
  indexes = new LinkedList<SSTableIndex>();
  indexes.add(merge_indx);
  
 } //merge
В моей реализации используется хэш массив, что дает большое число случайных чтений с диска.
В реальности данные хранят в отсотированной структуре, что позволят выполнять слияние 2 отсортированных массивов в один за линейное время используя только быстрое последовательное чтение.


Полный код класса LSM дерева можно посмотреть на github

пятница, 5 января 2018 г.

Hive: авторизация и аутентификация

Сборка Cloudera Hadoop не содержит никаких настроек авторизации и аутентификации, что конечно плохо, т.к. не дает возможности разделить права среди пользователей.
Опишу один из вариантов настройки авторизации и аутентификации.

Custom аутентификация:

1. Создаем linux пользователя на сервере hadoop:
sudo useradd user1
#создаем домашнюю директорию в hdfs
sudo -u hdfs hadoop fs -mkdir /user/user1
#задаем пользователя владельцем
sudo -u hdfs hadoop fs -chown user1 /user/user1
sudo passwd user1
#лочим пользователя в linux
sudo passwd -l user1

2. Создаем пользователя в hive
sudo -u hive hive
CREATE SCHEMA user1 LOCATION "/user/user1";

3. Создаем java класс, через который будет происходить проверка пользователя:
package hive.lenta;

import java.util.Hashtable;
import javax.security.sasl.AuthenticationException;
import org.apache.hive.service.auth.PasswdAuthenticationProvider;
import java.io.*;


public class LentaAuthenticator implements PasswdAuthenticationProvider {

  Hashtable<String, String> store = null;

  public LentaAuthenticator () {
    store = new Hashtable<String, String>();
 
 try {
  readPwd();  
 } catch (IOException e) {
  e.printStackTrace();
 }
 
  } //LentaAuthenticator
  
  public void readPwd () throws IOException {
 BufferedReader br = new BufferedReader(new FileReader("/u01/jar/pwd.ini"));
 String line;
 while ((line = br.readLine()) != null) {
  String login[] = line.split(";");
  if(login.length == 2) {
   store.put(login[0].trim(), login[1].trim());
  }
 }
 br.close();

  } //readPwd

  @Override
  public void Authenticate(String user, String  password)
      throws AuthenticationException {

    String storedPasswd = store.get(user);

    if (storedPasswd != null && storedPasswd.equals(password))
      return;
     
    throw new AuthenticationException("LentaAuthenticator: Error validating user");
  }

}
В файле /u01/jar/pwd.ini лежит логин и пароль, разделенные точкой с запятой:
user1;passwd

4. Компилируем класс в объект и собираем jar файл:
компилировать нужно именно той java под которой работает hadoop:
/usr/java/jdk1.7.0_67-cloudera/bin/javac -classpath /u01/cloudera/parcels/CDH-5.11.1-1.cdh5.11.1.p0.4/lib/hive/lib/hive-service.jar -d /home/askahin/auth /home/askahin/auth/LentaAuthenticator.java
/usr/java/jdk1.7.0_67-cloudera/bin/jar -cf /home/askahin/auth/jar/lentauth.jar ./hive
sudo cp jar/lentauth.jar /u01/cloudera/parcels/CDH-5.11.1-1.cdh5.11.1.p0.4/lib/hive/lib/
sudo chmod 777 /u01/cloudera/parcels/CDH-5.11.1-1.cdh5.11.1.p0.4/lib/hive/lib/lentauth.jar
sudo cp jar/lentauth.jar /u01/jar/lentauth.jar
sudo chmod 777 /u01/jar/lentauth.jar

5. Включаем CUSTOM аутентификацию в настройках Cloudera Manager:
прописываем наш аутентификатор в настройках:
* Ищем все настройки по паттерну "hive-site.xml" в настройках Cloudera.
Делать нужно именно через cloudera, а не напрямую в файле "hive-site.xml", т.к. менеджер переписывает файл из памяти процесса.

Нажимаем view as xml и во всех окнах и вводим
<property>
    <name>hive.server2.authentication</name>
    <value>CUSTOM</value>
  </property>
  <property>
    <name>hive.server2.custom.authentication.class</name>
    <value>hive.lenta.LentaAuthenticator</value>
</property>

Также ставим флажки suppress parameter validation
Иначе настройки не попадут в итоговый XML файл
* Для работы также нужна включенная опция hive.server2.enable.doAs = true
Она означает, что все запросы будут идти из под пользователя, который прошел аутентификацию.

5. Задаем путь до кастомной папки с jar файлами:
Настройка: «HIVE_AUX_JARS_PATH»
Значение «/u01/jar»,
Также ставим suppress parameter validation


6. Все готово, перезапускам HIVE из консоли Cloudera

7. Теперь при аутентификации нужно указывать способ:
AuthMech=3


После того как все настроили, мы уверены, что пользователь прошел аутентификацию.
Войти без пароля больше не получится.

Дальше настраиваем авторизацию - выдачу прав пользователям на конкретные объекты.
Проще всего это сделать через расширенные права linux:


ACL авторизация


1. Включаем ACL права через Cloudera Manager
настройка "dfs.namenode.acls.enabled" = true


2. После этого можно распределять права на папки:
Сброс всех прав:
sudo -u hdfs hdfs dfs -setfacl -R -b /

Даем пользователю user1 права на создание таблиц в своей схеме
sudo -u hdfs hdfs dfs -setfacl -R --set user::rwx,user:hadoop:rwx,user:hive:rwx,user:user1:rwx,group::r-x,other::--- /user/user1
к остальному хранилищу hive даем права только на чтение:
sudo -u hdfs hdfs dfs -setfacl -R --set user::rwx,user:hadoop:rwx,user:hive:rwx,user:user1:r-x,group::r-x,other::--- /user/hive/warehouse
к какой-то из таблиц закрываем доступ:
sudo -u hdfs hdfs dfs -setfacl -R --set user::rwx,user:hadoop:rwx,user:hive:rwx,user:user1:---,group::r-x,other::--- /user/hive/warehouse/table1
Один минус у ACL - это остается возможность выполнить DROP table, даже если нет прав доступа на чтение или запись.
Как победить эту проблему я не понял.

Так же стоит не забывать, что таким образом мы защитили только HIVE, возможность читать данные через HDFS по прежнему остались.
(Как читать данные из HDFS и HIVE через java можно посмотреть в моем github: https://github.com/pihel/java/blob/master/bigdata/Hdfs2Hive.java )

Чтобы закрыть доступ чтения через HDFS мы используем закрытие всех портов на сервере, кроме 10000 , который используется службой HIVE.