Это статья краткая заметка по курсу Data Engineering on AWS Learning Plans (includes labs)В некотором роде - это аналог курса Инженер облачных сервисов Yandex, но, по ощущениям, имеет меньшую техническую составляющую.
Личный блог. Заметки о программировании и не только
Это статья краткая заметка по курсу Data Engineering on AWS Learning Plans (includes labs)
Вводная статья о основных возможностях DBT.
Памятка на основе прохождения курса Инженер облачных сервисов (Сертификат)
A list of new features in Spark that I think are important for a developer.
Статья заметка, на основании прохождения курса
Grokking Modern System Design Interview for Engineers & Managers (Сертификат)
//запись с ключ-значение
class SSItem {
Integer key;
String value;
//+ идентификатор последовательности вставки
Integer lsn;
SSItem(Integer key, String value, Integer lsn) {
this.key = key;
this.value = value;
this.lsn = lsn;
}
class MemSSTable {
//из-за хэша нет поиска по диапазону
HashMap<Integer, SSItem> itms = new HashMap<Integer, SSItem>();
void SaveToDisk() throws FileNotFoundException, UnsupportedEncodingException {
indx.path = "sstable_" + indx.max_lsn + ".dat";
PrintWriter writer = new PrintWriter(indx.path, "UTF-8");
Integer pad = 0;
//последовательно пишем 10 байт с длиной значения и само значение
for(Entry<Integer, SSItem> entry : itms.entrySet()) {
SSItem itm = entry.getValue();
String val = itm.get();
writer.print( val );
//регистрируем в индексе смещения в файле
indx.keys.put(itm.key, pad);
pad = pad + val.length();
}
writer.close();
LSMTree.indexes.add(indx);
} //SaveToDisk
//индекс над таблицей с даными
class SSTableIndex {
//метаданные:
//минимальный и максимальный ключи в таблице
Integer min_key;
Integer max_key;
//минимальный и максимальный порядковый lsn
Integer min_lsn;
Integer max_lsn;
//если таблица на диске, то путь до файла
String path;
//ключ - смещение ключа в файле
HashMap<Integer, Integer> keys = new HashMap<Integer, Integer>();
//добавить ключ в индекс
void add(Integer k) {
//также обновляем метаданные
max_lsn = LSMTree.glsn;
if(min_lsn == null) min_lsn = max_lsn;
if(min_key == null || k < min_key) min_key = k;
if(max_key == null || k > max_key) max_key = k;
//добавление идет в память в хэш таблицу, на первом этапе смещения в файле нет
keys.put(k, 0);
}
public class LSMTree {
static int glsn = 0;
//максимальный размер, после которого таблица должна быть скинута на диск
//для упрощения алгоритма = числу записей, а не размеру в байтах
final static int max_sstable_size = 10;
//текущая таблица в памяти, куда вставляем данные
MemSSTable MemTable;
//все индексы, даже для таблиц на диске, хранятся в памяти
static LinkedList<SSTableIndex> indexes = new LinkedList<SSTableIndex>();
public class LSMTree {
//....
//добавить запись
public void add(Integer k, String v) {
MemTable.add(k, v);
}
Подробней логика добавления в таблицу в памяти:
class MemSSTable {
//.....
//добавить новый элемент
void add(Integer k, String v) {
//увеличиваем глобальный счетчик операций glsn
LSMTree.glsn++;
//если размер превышает,
if(itms.size() >= LSMTree.max_sstable_size) {
try {
//то сохраняем таблицу на диск
//в реальных движках используется упреждающая фоновая запись
//когда папять заполнена на N% (n<100), то данные скидываются на диск заранее, чтобы избежать фриза при сбросе памяти и записи на диск
SaveToDisk();
} catch (FileNotFoundException | UnsupportedEncodingException e) {
e.printStackTrace();
return;
}
//очищаем данные под новые значения
indx = new SSTableIndex();
itms = new HashMap<Integer, SSItem>();
}
//обновляем медаданные в индексе
indx.add(k);
SSItem itm = new SSItem(k, v, indx.max_lsn);
//в моей реализации, при повторе ключ перезаписывается
//т.е. транзакционность и многоверсионность тут не поддерживается
itms.put(k, itm);
} //add
Если после вставки нового элемента старые данные оказались на диске, то в метаданных индекса прописывается ссылка на точное смещение в файле:
class SSTableIndex {
//...
//если таблица на диске, то путь до файла
String path;
//ключ - смещение
HashMap<Integer, Integer> keys = new HashMap<Integer, Integer>();
public class LSMTree {
//.....
//получить значение по ключу
String getKey(Integer key) {
//сперва смотрим в памяти
String val = MemTable.getKey(key);
if(val == null) {
SSTableIndex indx_with_max_lsn = null;
//потом таблицу по индексам, которая содержит наш ключ
//если содержится в нескольких, то берем с максимальным lsn, т.к. это последнее изменение
for (SSTableIndex indx : indexes) {
Integer max_lsn = 0;
if(key >= indx.min_key && key <= indx.max_key && max_lsn < indx.max_lsn ) {
if(indx.keys.containsKey(key)) {
max_lsn = indx.max_lsn;
indx_with_max_lsn = indx;
}
}
}
//читаем из таблицы с диска
if(indx_with_max_lsn != null) {
try {
return indx_with_max_lsn.getByPath(key);
} catch (IOException e) {
e.printStackTrace();
}
}
}
return val;
}
* Определим нужный индекс обращаемся по нему к таблице на диске:
class SSTableIndex {
//...
//получить значение ключа из открытого файла
String getByPath(Integer key, RandomAccessFile file) throws IOException {
//получаем смещение в файле для ключа из индекса
Integer offset = keys.get(key);
//смещаемся в файле
file.seek(offset);
//резервируем 10 байт под переменную с длинной значения
byte[] lenb = new byte[10];
file.read(lenb, 0, 10);
Integer len = Integer.parseInt(new String(lenb, StandardCharsets.UTF_8));
file.seek(offset + 10);
//считываем значение
byte[] valb = new byte[len];
file.read(valb, 0, len);
return new String(valb, StandardCharsets.UTF_8);
} //getByPath
//получить значение ключа с диска
String getByPath(Integer key) throws IOException {
if(!keys.containsKey(key)) return null;
RandomAccessFile file = new RandomAccessFile(path, "r");
String val = getByPath(key, file);
file.close();
return val;
}
public class LSMTree {
//....
//объединить несколько таблиц на диске в 1 большую
void merge() throws IOException {
Integer min_key = null;
Integer max_key = null;
Integer min_lsn = null;
Integer max_lsn = null;
//сортируем таблицы по убыванию lsn
//чтобы вначале были самые свежие ключи
Collections.sort(indexes, new Comparator<SSTableIndex>() {
@Override
public int compare(SSTableIndex o1, SSTableIndex o2) {
if(o1.max_lsn > o2.max_lsn) {
return -1;
} else if(o1.max_lsn < o2.max_lsn) {
return 1;
}
return 0;
}
});
SSTableIndex merge_indx = new SSTableIndex();
Integer pad = 0;
merge_indx.path = "sstable_merge.dat";
PrintWriter writer = new PrintWriter(merge_indx.path, "UTF-8");
//пробегаемся по всем индексам, чтобы слить в 1
for (SSTableIndex indx : indexes) {
if(min_lsn == null || indx.min_lsn < min_lsn) min_lsn = indx.min_lsn;
if(min_key == null || indx.min_key < min_key) min_key = indx.min_key;
if(max_key == null || indx.max_key > max_key) max_key = indx.max_key;
if(max_lsn == null || indx.max_lsn > max_lsn) max_lsn = indx.max_lsn;
RandomAccessFile file = new RandomAccessFile(indx.path, "r");
//т.к. данные в таблицах не упорядочены, это приводит к рандомным чтениям с диска
//в реальности делают упорядочнный по ключу массив, чтобы делать быстрые последовательные чтения
for(Entry<Integer, Integer> entry : indx.keys.entrySet()) {
//оставляем запись только с максимальным lsn
Integer key = entry.getKey();
if(!merge_indx.keys.containsKey(key)) {
String val = indx.getByPath(key, file);
SSItem itm = new SSItem(key, val, 0);
String itmval = itm.get();
writer.print( itmval );
merge_indx.keys.put(key, pad);
pad = pad + itmval.length();
}
}
//записываем и удаляем старые файлы
file.close();
//delete
File fd = new File(indx.path);
fd.delete();
}
merge_indx.min_lsn = min_lsn;
merge_indx.min_key = min_key;
merge_indx.max_key = max_key;
merge_indx.max_lsn = max_lsn;
writer.close();
//переименовываем к обычному имени
File fd = new File(merge_indx.path);
merge_indx.path = "sstable_" + merge_indx.max_lsn + ".dat";
File fdn = new File(merge_indx.path);
fd.renameTo(fdn);
indexes = new LinkedList<SSTableIndex>();
indexes.add(merge_indx);
} //merge
В моей реализации используется хэш массив, что дает большое число случайных чтений с диска.