воскресенье, 26 марта 2017 г.

Решение bigdata задач на Hadoop mapreduce

Hadoop

Набор утилит, библиотек и фреймворк для разработки и выполнения распределённых программ, работающих на кластерах из сотен и тысяч узлов.

Ссылка на установку: http://www.cloudera.com/content/www/en-us/downloads.html
На основе курса

hadoop
Hadoop состоит из 2 основных частей hdfs и map reduce. Рассмотрим подробней.

Hdfs

- распределенная файловая система. Это значит, что данные файла распределены по множеству серверов.

* hdfs лучше работает с небольшим числом больших файлов/блоков
* один раз записали, много раз считали
* можно только целиком считать, целиком очистить или дописать в конец (нельзя с середины)
* файлы бьются на блоки split (к примеру 64мб)
 ** размер блока выбирается так, чтобы нивилировать время передачи блока по сети (если сеть быстрая, то блок можно поменьше, если медленная, то больше)
* все блоки реплицируются с фактором 3 поумолчанию (хранятся в 3 копиях на разных серверах)
 ** это обеспечивает высокую пропускную способность, но не скорость реакции

Hdfs java api

Пример программы копирующей один файл в другой:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.InputStream;

Path source_path = new Path(args[0]);
Path target_path = new Path(args[1]);

Configuration conf = new Configuration();
FileSystem source_fs = FileSystem.get(source_path.toUri(), conf);
FileSystem target_fs = FileSystem.get(target_path.toUri(), conf);

//доступ к нескольким файлам по маскам
//FileStatus [] files = fs.globStatus(glob); 
//?, *, [abc], [^a], {ab, cd}

FSDataOutputStream output = target_fs.create(target_path);
InputStream input = null;
try{
    input = source_fs.open(source_path);
    IOUtils.copyBytes(input, output, conf);
} finally {
    IOUtils.closeStream(input);
    IOUtils.closeStream(output);
}

Парадигма map reduce

1. input file: split 1 + split 2 + split 3 (блоки файла). Каждый сплит хранится на своем сервере кластера.
2. mapper:
 * на каждый split N создается worker для обработки
 * worker выполяется на сервере, где находится блок
 * workerы не могут обмениваются между собой данными
  ** т.е. mapper подходит для независимых данных, которые легко побить на части.
   зависимые данные (типа архива zip) нельзя побить на части в mapper:
    все блоки (split) будут переданы в один mapper
 * в случае ошибки процесс рестратует на другой копии блока hdfs
 * combine - (необязательный шаг) - reducer внутри mappera агрегирующие данные в меньшее число данных (ключ менять нельзя)
  ** агрегирует только данные одного маппера
 * partition - (необязательный шаг) - определение куда отправится ключ (поумолчанию hash(k) mod N , где N - число reducer )
3. результат mapper записывается в виде ключ значение {k->v} в циклический буфер в памяти, конец списка пишется на локальный диск сервера с worker
  ** это сделано в целях производительности, чтобы не реплицировать
  ** но в случае ошибки, данные нельзя будет восстановить и потребует рестартовать mapper
4. данные из {k->v} расходятся на сервера reduccer на основе ключа.
Т.е. данные с разных mapperов но с одним key попадут в один reducer
5. Результат reducer складываются в hdfs. Число файлов = число reducer

Hadoop streaming

Инструмент для быстрого прототипирования map reduce задач.
можно писать python программы заготовки, общение маппера с редусером проиходит чреез stdin/out.
Данные передаются как текст разделенный табом

word count
#!/usr/bin/python
import sys
for line in sys.stdin:
  for token in line.strip().split(" "):
    if token: print(token + '\t1')
 
#!/usr/bin/python
import sys
(lastKey, sum)=(None, 0)
for line in sys.stdin:
  (key, value) = line.strip().split("\t")
  if lastKey and lastKey != key:
    print (lastKey + '\t' + str(sum))
    (lastKey, sum) = (key, int(value))
  else:
    (lastKey, sum) = (key, sum + int(value))
if lastKey:
  print (lastKey + '\t' + str(sum))

Запуск:
hadoop jar $HADOOP_HOME/hadoop/hadoop-streaming.jar \
-D mapred.job.name="WordCount Job via Streaming" \
-files countMap.py, countReduce.py \
-input text.txt \
-output /tmp/wordCount/ \
-mapper countMap.py \
-combiner countReduce.py \
-reducer countReduce.py

Java source code example

Программа подсчитывающая число слов в hdfs файле.

word count

public class WordCountJob extends Configured implements Tool {
 static public class WordCountMapper extends Mapper < LongWritable, Text, Text, IntWritable > {
  private final static IntWritable one = new IntWritable(1);
  private final Text word = new Text();
  
  @Override
  protected void map(LongWritable key, Text value, Context context)  throws IOException,  InterruptedException {
     //разбиваем строку (key) на слова и передаем пару: слово, 1
     StringTokenizertokenizer = new StringTokenizer(value.toString());
     while (tokenizer.hasMoreTokens()) {
      text.set(tokenizer.nextToken());
      context.write(text, one);
     }
  } //map
 } //WordCountMapper
  
 static public class WordCountReducer  extends Reducer < Text,  IntWritable,  Text,  IntWritable > {
   @Override
   protected void reduce(Text key, Iterable < IntWritable > values, Context context)   throws IOException,  InterruptedException {
     //в редусер приходят все единицы от одного слова
    intsum = 0;
    for (IntWritable value: values) {
     sum += value.get();
    }
    context.write(key, new IntWritable(sum));
   } //reduce
 } //WordCountReducer
  
  @Override
  public int run(String[] args) throws Exception {
   Job job = Job.getInstance(getConf(), "WordCount");
   //класс обработчик
   job.setJarByClass(getClass());
   //путь до файла
   TextInputFormat.addInputPath(job, new Path(args[0]));
   //формат входных данных (можно отнаследовать и сделать собственный)
   job.setInputFormatClass(TextInputFormat.class);
   //классы мапперов, редусееров, комбайнеров и т.д.
   job.setMapperClass(WordCountMapper.class);
   job.setReducerClass(WordCountReducer.class);
   job.setCombinerClass(WordCountReducer.class);
   //выходной файл
   TextOutputFormat.setOutputPath(job, new Path(args[1]));
   job.setOutputFormatClass(TextOutputFormat.class);
   job.setOutputKeyClass(Text.class);
   job.setOutputValueClass(IntWritable.class);
   return job.waitForCompletion(true) ? 0 : 1;
  } //run
  
  public static void main(String[] args) throws Exception {
   int exitCode = ToolRunner.run( new WordCountJob(), args);
   System.exit(exitCode);
  } //main
} //WordCountJob

Такая программа будет тратить все время на передачу данных от маппера к редусеру.

Её можно усовершенствовать несколькими способами:
* Сделать комбайнер - чтобы данные из одного маппера были сагрегированы там.
 - не обязательно вызывается
 +/- агрегирует данные только одного вызова маппера (одной строки)
* хэш аггегировать внутри маппера:
 + обязательно выполнится
 +/- агрегирует данные только одного вызова маппера (одной строки)
 - нужно следить за размером хэш массива
* хэш массив сверху маппера, а внутри маппера обрабатывать
 + обязательно выполнится
 + агрегирует все вызовы маппера (сплита)
 - нужно следить за размером хэш массива

Sql операции

Разность (minus) и другое по аналогии
// на вход подаются элементы из двух множеств A и B
class Mapper
method Map(rowkey key, value t)
 Emit(value t, string t.SetName) // t.SetName либо ‘A‘ либо ‘B'
class Reducer
// массив n может быть [‘A'], [‘B'], [‘A' ‘B‘] или [‘B', ‘A']
method Reduce(value t, array n)
 if n.size() = 1 and n[1] = 'A'
  Emit(value t, null)
* hash join
class Mapper
method Initialize
 H = new AssociativeArray : join_key -> tuple from A //хэшируем меньшую таблицу целиком в память
 A = load()
 for all [ join_key k, tuple [a1, a2,...] ] in A
  H{k} = H{k}.append( [a1, a2,...] )

method Map(join_key k, tuple B)
 for all tuple a in H{k} //ищем в A соответствие ключа из B
  Emit(null, tuple [k a B] ) //если нашли, то записываем

Операции на графах

оптимальное представление в виде списка смежности, т.к. матрица смежности занимает слишком много места
* поиск кратчайшего пути
 ** обычный алгоритм - дейкстра
 ** поиск в ширину (bfs)
  на невзвешенном графе (нет связей назад):
  - Останавливаемся, как только расстояния до каждой вершины стало известно
  - Останавливаемся, когда пройдет число итераций, равное диаметру графа
  на взвешенном графе (есть обратные связи):
  - Останавливаемся, после того, как расстояния перестают меняться
* PR = a (1/N) + (1-a)* SUM( PR(i) / C(i) )
 ** N - начальное число точек в графе
 ** C - число исходящих точек
 ** a - вероятность случайного перехода
 ** pr решается только приблизительно:
  - останавливаем, когда значения перестают меняться
  - фиксированное число итерации
  - когда изменение значение меньше определенного числа
* проблемы
 ** необходимость передавать весь граф между всеми задачами
 ** итеративный режим, т.е. переход на следующий этап, когда все параллельные задачи выполнятся
* оптимизации:
 ** inmemory combining - агрегируем сообщения в общем массиве
 ** партицирование - смежные точки оказались на одном маппере
 ** структуру графа читать из hdfs, а не передавать + партицирование

Высокоуровневые языки поверх hdfs или hadoop:

Pig

высокоуровневый язык + компилятор в mapreduce
используется для быстрого написания типовых map-reduce задач

основные составляющие:
* field - поле
* tuple - кортеж (1, "a", 2)
* bag - коллекция кортежей {(), ()}
 ** коллекции могут содержать разное кол-во полей в кортеже (также они могут быть разных типов)

word count:
input_lines = LOAD 'file-with-text' AS (line:chararray);
words = FOREACH input_lines GENERATE FLATTEN(TOKENIZE(line)) AS word;
filtered_words = FILTER words BY word MATCHES '\\w+';
word_groups = GROUP filtered_words BY word;
word_count = FOREACH word_groups GENERATE COUNT(filtered_words) AS count,
group AS word;
ordered_word_count = ORDER word_count BY count DESC;
STORE ordered_word_count INTO ‘number-of-words-file';

join
//Загрузить записи в bag #1
posts = LOAD 'data/user-posts.txt' USING PigStorage(',') AS (user:chararray,post:chararray,date:long);
//Загрузить записи в bag #2
likes = LOAD 'data/user-likes.txt' USING PigStorage(',') AS (user:chararray,likes:int,date:long);
userInfo = JOIN posts BY user, likes BY user;
DUMP userInfo;

Hive

sql подобный язык hivesql поверх hadoop
основные составляющие - как в бд
метаданные могут храниться в mysql /derby

* создание таблицы
hive> CREATE TABLE posts (user STRING, post STRING, time BIGINT)
 > ROW FORMAT DELIMITED
 > FIELDS TERMINATED BY ','
 > STORED AS TEXTFILE;

* при нарушении схемы ошибки не будет, вставится NULL

join
hive> INSERT OVERWRITE TABLE posts_likes
 > SELECT p.user, p.post, l.count
 > FROM posts p JOIN likes l ON (p.user = l.user);

word count
CREATE TABLE docs (line STRING);
LOAD DATA INPATH 'docs' OVERWRITE INTO TABLE docs;
CREATE TABLE word_counts AS
SELECT word, count(1) AS count FROM
(SELECT explode(split(line, '\s')) AS word FROM docs) w
GROUP BY word
ORDER BY word;

Nosql

* Масштабирование:
 ** мастер-слейв: распределение нагрузки cpu, io остается общим
 ** шардинг: деление данных таблицы по серверам: распределение io, cpu Общее
* cap theorem - достигается только 2 из 3
 ** consistency - непротиворечивость
  Чаще всего жертвует строгой целостностью в текущий момент, на целостность в конечном счете
 ** aviability - доступность
 ** partioning - разделение данных на части
* типы:
 ** ключ-значение
 ** колоночная
 ** документоориентировання
 ** графовая

Hbase

- поколоночная база
 ** строки в таблицах шардируются на сервера
  *** строки хранятся в отстортированном виде, чтобы можно было искать по серверам
  *** список шардов хранится в HMaster
   через него определяется сервер для чтений, а дальше чтение идет уже напряму с него миную hmaster
   ! возможная точка отказа
 ** столбцы объединяются в column family
  *** каждая такая группа сохраняется также в отдельный файл
  *** для всей группы задается степень сжатия и другие физические настройки
 ** изменение/удаление происходит через вставку нового значения в колонку с TS
  *** при select выбираются последние 3 изменения (если в колонке не было изменений, то берется предыдущая максимальная версия)
 ** промежуточные данные хранятся в memstore сервера и логе изменений
  *** периодически данные из памяти скидываются на hdfs
 ** hfile - мапится в файл hdfs, который также splitit'ится на разные сервера
 ** сжатие файлов хранилища:
  *** объединение файлов CF в один
  *** очистка от удаленных записей

 Массовость - деление данных на сервера по:
 * по семейству колонок
 * по диапазону ключей
 * по версии строки
 * сам файл разбивается в hdfs на сплиты

Cassandra

отличия:
** есть sql, а не только java api
** CF имеют иерархичность и могут также объединяться в группы по какомуто признаку
** настраевая consistency от самой строгой - ожидание отклика всех серверов, то записи без проверок
** нет единой точки отказа hmaster, данные делятся по диапазону
 ** но усложняет деление и поиск данных

* greenplum - классическая колоночная бд, с шардирование данных на основе pgsql
 ** есть поддержка sql

Spark

Альтернативный вариант реализации hadoop.

* Преимущества перед hadoop:
1. Необязательно сохранять промежуточные результаты в HDFS при итеративной разработке. Данные можно хранить в памяти.
2. Необязательно чередовать map-reduce, можно делать несколько reducer подряд
3. Операции в памяти (вкллючая синхронизацию между серверами), а не через hdfs
4. Возможность загрузить данные в распределенную память, а потом делать обработку (каждому mapper уже не надо считывать данные из hdfs)

* В основе может лежать любой вид хранилища данных
* Архитектура, как в hadoop: name node (driver) + worker

* Основная составляющая вещь - RDD:
 ** объект партицируется
 ** каждая партиция имеет произвольное хранение: память, диск, hdfs и т.д.
  *** Для хранения данных в памяти используется LRU буфер.
   Старые, редко используемые данны вытесняются в медленное хранилище
 ** объект неизменяем (изменение создает новый объект)
 ** rdd распределена по серверам

* Программа состоит из однонаправленного графа операций без циклов
 Виды зависимостей в графе:
 ** Narrow (узкая) - переход 1 партииции в 1 другую
 ** wide (широкая) - переход 1 (N) партиций в N (1)

* Восстановление в случае падения:
 - нет репликации как в hdfs
 + для воостановления используется перезапуск
  ** если потерялись данные из одной партиции в Narrow связи, то восстанавливается только она из предыдущей партиции
  ** если в wide то перезапускаются все партиции (т.к. связ не 1 к 1).
   Результат wide поумолчанию сохраняется на диск.
 ** Если есть шанс потерять данные, а цепочка длинная, то лучше самим самостоятельно сохранять промежуточные результаты на диск.

* Типы операции состоят из:
 ** Трансформация - не запускают работу
  map, filer, group, union, join ....
 ** Действий - запускают работу (все трансформации в графе до этого графа)
  count, collect, save, reduce

* spark поддерживает:
 ** java
 ** scala
 ** python

Join на scala
  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._
  val sc = new SparkContext(master, appName, [sparkHome], [jars])
 
  //transform
  //или из текстового файла  sc.textFile("file.txt")
  val visits = sc.parallelize(
   Seq(("index.html", "1.2.3.4"),
   ("about.html", "3.4.5.6"),
   ("index.html", "1.3.3.1")))
  val pageNames = sc.parallelize(
   Seq(("index.html", "Home"),
   ("about.html", "About")))
  visits.join(pageNames)
  
  //action
  visits.saveAsTextFile("hdfs://file.txt")
  // ("index.html", ("1.2.3.4", "Home"))
  // ("index.html", ("1.3.3.1", "Home"))
  // ("about.html", ("3.4.5.6", "About"))
word count
val file = sc.textFile("hdfs://...")
val sics = file.filter(_.contains("MAIL"))
val cached = sics.cache()
val ones = cached.map(_ => 1)
val count = ones.reduce(_+_)
val file = sc.textFile("hdfs://...")
val count = file.filter(_.contains(“MAIL")).count()
или
val file = spark.textFile("hdfs://...")
val counts = file.flatMap(line => line.split(" "))
 .map(word => (word, 1))
.reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

* Переменные для обмена данными между серверами
 ** broadcast variables - переменная только для чтения
  val broadcastVar = sc.broadcast(Array(1, 2, 3))
  ...
  broadcastVar.value 
 ** Accumulators - счетчики
  val accum = sc.accumulator(0)
  accum += x
  accum.value

Yarn

планировщик ресурсов, которые избавляет от недостатков классического MR:
* жесткое разделенеи ресурсов: можно освободить ресурсы от mapper после их полного выполнения и отдать все ресурсы reducer
* возможность разделять ресурсы не только между MR Задачами, но и другими процессами

* разделение происходит в понятии: озу, cpu, диск, сеть и т.д.
* он же производит запуск задач (не только MR) и восстанавливает в случае падения
* для работы нужны вспомогательные процессы:
  ** appserver - получение комманд от клиента
  ** resource server - север для выделяющий ресурсы
  ** node server - сервер для запуска задач и отслеживания за ними (на каждом сервере)