четверг, 29 ноября 2018 г.

Введение в Scala, параллельную разработку и Apache Spark

В этой статье я хотел бы охватить все аспекты работы с данными на языке Scala - от примитивов языка к параллельному программированию и до анализа данных на кластере Spark.

Введение в Scala

Переменные

val - константа
var - Изменяемая переменная:
var s:String = "test"
- все типы являются классами (нет встроенных типов): Byte, Char, Short, Long, Float, Double, Boolean
- String используется java.lang.String но добавляет дополнительные функции (StringOps, RichInt, RichDouble ...)
"Hello".intersect("World") //== lo

Операторы

- Все операторы, на самом деле методы:
a+b = a.+(b)
- нет инкремента (++) и декремента (--) , только i+=1
- Некоторые функции (типа sqrt) можно использовать без объявления пакета после import
scala.math.sqrt(16) //полная запись
import scala.math._
sqrt(16) //частичная запись
- s(i) - java аналог s.charAt(i)
явный вызов в scala: s.apply(i)
- () - аналог void
- ; - нужна только если в строке несколько выражений
- {} - несколько операторов в блоке, возвращает значение - это будет последнее выражение
var a = {a; b; sqrt(16)} //вернет = 4
- результатом присвоения является () - в java = присваеваемому

Условия

- Условия возвращают значение
val s = if (x > 0) 1 else -1
это удобней, т.к. может быть использовано для инициализации const
аналогом c++ является x > 0 ? 1 : -1

Вывод в консоль

print(f"text=$a1 and ${...}") //расширенный аналог printf в java
, где:
$a - переменная
${} - выражение

Циклы

- while / do - аналогично java
- цикл for работает по принципу foreach - перебора элементов последовательности:
for (i <- 1 to n) {}
1 to n - генерирует последовательность чисел, которая обходится последовательно
последовательность может быть любая - выражение или просто строка
- нет break и continue
- в for может быть несколько итераторов через ; и выражение может содержать условие:
for (i <- 1 to 3; любое выражение ; j <- 1 to 3 if i != j) print(f"${10 * i + j}%3d")
// Выведет 12 13 21 23 31 32
т.е. каждая итерация i запускает цикл j (аналог вложенного цикла с условием) и накладывает фильтр, чтобы в j не было пересечений с i
- циклы могут использоваться для генераций колекций
val coll = for (i <- 1 to 10) yield i % 3 //создание коллекции
//== scala.collection.immutable.IndexedSeq[Int] = Vector(1, 2, 0, 1, 2, 0, 1, 2, 0, 1)

Функции

- Однострочный вариант:
def abs(x: Double) = if (x >= 0) x else -x
- Многострочный:
def myfnc(x: Double) {
 var r = 10
 r
}
- Указание типа обязательно для рекурсии, иначе нет:
def fac(n: Int): Int = if (n <= 0) 1 else n * fac(n - 1)
- Можно использовать return для немедленного выхода из функции
- параметр функции может иметь значение по умолчанию:
def decorate(str: String, left: String = "[", right: String = "]") =
 left + str + right
- передавать параметры можно по имени в любом порядке:
decorate(left = "<<<", str = "Hello", right = ">>>")
- функция с переменным числом параметров:
def sumf(args: Int*) = {
 var result = 0
 for (arg <- args) result += arg
 result
}
- Чтобы передать список как несколько параметров, нужно добавить : _*
sumf(1 to 5: _*)
//res6: Int = 15
- def w = ....
логически похоже на переменную, которая каждый раз переинициализируется при обращении

Процедуры

То же, что функции, но без = после объявления:
def box(s : String) {()}
//или 
def box(s : String):Unit = {()}

Ленивые переменные

lazy val w = .....
Операция для инициализации w вызовется при первом обращении.

Исключения

Аналогично java:
throw new Exception("test") // вызов исключения
- в scala нет проверки на обязательность исключения во время компиляции как в java
так писать не нужно:
void doSomething() throws IOException, InterruptedException, ClassNotFoundException
- перехват exception идет через case:
try {
 process(url)
} catch {
 case _: MalformedURLException => println(s"Bad URL: $url")
 case ex: IOException => ex.printStackTrace()
}
finally {
 //выполнится всегда
 in.close()
}

Массивы

- Фиксированный массив:
// из 10 элементов по умолчанию = 0: 
val nums = new Array[Int](10)
- с начальной инициализацией:
 val s = Array("Hello", "World") //new не используется
- Переменной длины:
Аналог java arraylist:
import scala.collection.mutable.ArrayBuffer
val b = ArrayBuffer[Int]()
- добавить элемент в массив:
b += 1;
- несколько значений (каждое станет значением)
b += (1,2)
b += Array(1,2)
- обход массива:
-- если индекс не нужен:
for(e <- elems) {}
-- если нужен индекс:
for(i <- 0 until arr.length)
until почти то же самое, что to , создается множество -1 значение
- yeld - обход с преобразованием в новый массив (описано в циклах)
- обход и фильтрация массива:
arr.filter(_ % 2 == 0).map (2 * _ )
- arr.indices - список индексов (ключей) массива
- встроенные методы: sum, max, count(_ > 0), sorted (_ < _) //направление сравнения
- arr.mkString(",") - преобразование массива в строку или toString - дополнительно возвращает тип данных

- Многомерный массив:
val matrix = Array.ofDim[Double](3, 4) // Три строки, четыре столбца

Ассоциативные массивы

- с начальными значениями фиксированный:
val scores = Map("Alice" -> 10, "Bob" -> 3, "Cindy" -> 8)
- изменяемый массив:
val scores = new scala.collection.mutable.HashMap[String, Int]
- доступ почти аналогичен java:
val bobsScore = if (scores.contains("Bob")) scores("Bob") else 0
короткий вариант записи:
val bobsScore = scores.getOrElse("Bob", 0) // второй параметр, если не найдено
- изменение:
scores("A") = 10
- добавление, как в обычном массиве: += //если ключ уже есть, то он перетрется
- удалить ключ: -= "Key"
- обход ассоц. массива:
for((k,v) <- map) {}
- чтобы получить только значения: arr.values, ключи: arr.KeySet
- отсортированный массив хранится в виде Красно-чёрного дерева
val scores = scala.collection.immutable.SortedMap("Alice" -> 10, "Fred" -> 7, "Bob" -> 3, "Cindy" -> 8)
Красно-чёрное дерево схоже по структуре с B-деревом с параметром 2, в котором каждый узел может содержать от 1 до 3 значений и, соответственно, от 2 до 4 указателей на потомков
- хуже сбалансировано, чем btree (много уровней дерева из-за маленькой степени = 2)
+ но легче балансировать при удалении:
* ищется меньшее значение справа от удаляемого (или большее слева)
* найденый указатель перемещается в точку удаления
* ссылка от родителя на перемещаемый элементы (пред. пункт) удаляется
Дерево просто балансируется, тогда как btree при удалении элементы он лишь помечается к удалению, из-за чего оно растет в объеме и требует переиодического ребилда.
Дерево медленнй hash массива при вставке и доступе по ключу, но быстрей для поиска по диапазону.

Кортежи (tuples)

val t = (1, 3.14, "Fred")
основные отличия от массивов:
- различные типы в значениях
- переменное число элементов
- обращение к элементу:
t._2 
- замапить кортеж на переменные:
val (first, second, _) = t //_ не нужная переменная
- удобно, если нужно вернуть несколько значений из функции
- преобразование 2 кортежей в массив пар:
val k = Array("<", "-", ">")
val v = Array(2, 10, 2)
val pairs = k.zip(v) //Array(("<", 2), ("-", 10), (">", 2))
pairs.map // преобразует к асс. массиву

Классы

- аналогично java:
class Counter {
 private var value = 0 // Поля должны инициализироваться
 def increment() { value += 1 } // Методы по умолчанию общедоступные
 def current() = value
}
- Геттеры / сеттеры генерируются автоматически
class Counter {
 var value = 0 
 dev value() = value // геттер
 dev value_ //сеттер - сеттер можно переопределить
 // но обращение все также будет по value
 val conts // поле только для чтения
 @BeanProperty var name: String = _ // геттеры и сеттеры не генерятся
 
 def this(name: String) { // Дополнительный конструктор
  this() // Главный конструктор
 ...
- параметры главного конструктора - автоматически генерируют свойства и геттеры/сеттеры к нему (поле становится приватным)
class Person(name: String, age: Int, private var age1: Int)) { //плюс можно регулировать сразу видимостью геттера/сеттера
 def description = s"$name is $age years old"
}
- можно вкладывать классы в классы и функции в функции

Объекты

- нет статичных методов, вместо этого используется object - объект этого типа может иметь только 1 экземпляр (реализует singleton)
object Accounts {
 private var lastNumber = 0
 def newUniqueNumber() = { lastNumber += 1; lastNumber }
}
var num = Accounts.newUniqueNumber()
- Объекты компаньоны - должны определяться в одном файле и имеют доступ к приватным свойствам/методам друг друга
- метод apply - служит для создания и возвращения класса компаньона
class Account private (val id: Int, initialBalance: Double) {
 private var balance = initialBalance
 ...
}
object Account { // Объект-компаньон
 def apply(initialBalance: Double) =
  new Account(newUniqueNumber(), initialBalance)
 ...
}
val acct = Account(1000.0) //без new вызовется apply
Пример применения apply:
- Array(100) - вернет объект-массив с 1 элементом = 100
- new Array(100) - выполнит конструктор и создаст пустой массив из 100 элементов

- Любая программа должна начинаться с main:
object Hello {
 def main(args: Array[String]) {
  println("Hello, World!")
 }
}
Но если унаследовать App, то это можно делать в конструкторе:
object Hello extends App {
 println("Hello, World!")
}

- перечисления - enum
Нет встроенного типа, но есть класс
object TrafficLightColor extends Enumeration {
 val Red, Yellow, Green = Value
}
теперь можно обращаться:
if( TrafficLightColor.value == TrafficLightColor.Red ) println("Stop")

Пакеты и импортирование

- пакеты служат для управления пространствами имен
package impatient {
class Employee
...
для обращения нужно указывать полный путь: impatient.Employee
- 1 пакет может быть определен в нескольких файлах
- имена пакетов могут быть относительными
- альтернатива создать пакет без фигурных скобок:
package people // в начале файла
- в пакете не может быть переменных и функций
для обхода можно использовать объект пакета (1 на пакет)
package object people {
 val defaultName = "John Q. Public"
}
внутри пакета можно обращаться просто по defaultName
снаружи с полным путем, включая имя объекта
- свойство видимое только в пакете people:
private[people] def description = s"A person with name $name"
- импортирование аналогично:
import java.awt._
-- но может располагаться в любом месте и будет видимо до конца блока
-- импорт в 1 строку нескольких пакетов:
import java.awt.{Color, Font}
-- пакет можно переименовать:
import java.util.{HashMap => JavaHashMap}

Наследование

- по умолчанию все как в java:
class Employee extends Person
- final может быть как класс, так и метод (в отличии от java)
- override - для переопределения не абстрактных методов и переменных требуется
- super.method_name - вызов родителя
- protected методы видны в классе, но не видны в пакете
чтобы стали видны нужно писать protected[пакет]
- во второстепенном конструкторе класса можно вызывать только главный конструктор класса
super конструктор может быть вызван только из главного конструктора (сигнатуры описанной при создании class)
- анонимные классы создаваемые 1 раз при определении:
val alien = new Person("Fred") {
 def greeting = "Greetings, Earthling! My name is Fred."
}
- также есть абстрактные классы:
abstract class Person(val name: String)
-- абстрактное свойство задается при создании:
val id: Int
- инициализация переменных до вызова конструктора супер класса
к примеру если в нем инициализация массива использует эту переменную
class Bug extends {
 override val range = 2
} with Creature
в этом случае сначала инициализируется переменная, потом вызывается конструктор
-- также это можно обойти, если обозначить поле как lazy, чтобы все инициализации прошли при первом обращении - наследование классов в scala:
Any <- AnyVal <- Int..Double...
    <- AnyRef <- все классы
-- в any есть isInstanceOf и hashcode
Встроенный хеш-код генерируется лишь один раз для каждого объекта при первом вызове hashCode(), после сохраняется в заголовке объекта для последующих вызовов.
Но для первого раза используется random или Xorshift:
  0 – Park-Miller RNG (по умолчанию)
  1 – f(адрес, глобальное_состояние)
  2 – константа 1
  3 – последовательный счетчик
  4 – адрес объекта
  5 – Thread-local Xorshift (псевдослучайное от threadid)
-- anyval - нет нового
-- anyref добавляет: wait, notify из Object , synchronized
-- класс Null = null
-- Nothing может использоваться для любого значения: [T] == [Nothing]
-- Unit == void в c++ == ()
- equals - сравнивает равенство адресов ссылок объектов, но его можно переопределить:
final override def equals(other: Any) = {
 other.isInstanceOf[Item] && {
  val that = other.asInstanceOf[Item]
  description == that.description && price == that.price
 }
}
-- аналогично переопределяется метод hashCode
- Классы с одной переменной - классы значения
они не создаются каждый раз, а встраиваются в место вызова:
class MilTime(val time: Int) extends AnyVal {
 def minutes = time % 100
 def hours = time / 100
 override def toString = f"$time04d"
}

Файлы и регулярные выражения

- чтение всего файла в буфер:
import scala.io.Source
val source = Source.fromFile("myfile.txt", "UTF-8")
val lineIterator = source.getLines //toArray, mkString
for (l <- lineIterator) //обработка строки
for (c <- source) //обработка посимвольно
-- если файл большой, то используем буферизированное чтение:
val iter = source.buffered
while (iter.hasNext) {
 if (iter.head is nice) //работаетм
- чтение URL
val source1 = Source.fromURL("http://horstmann.com", "UTF-8")
- двоичные файлы читаются как в java
- запись также как в java
- сериализация:
@SerialVersionUID(42L) class Person extends Serializable //SerialVersionUID - установка значения переменной
- выполнение команд командной строки:
import scala.sys.process._
val result = "ls -al /".!!
("ls -al /" #| "grep u").! // если нужно переправить данные между программами
- строки в тройных кавычках не надо экранировать:
val wsnumwsPattern = """\s+[0-9]+\s+""".r
- NIO - новая реализация отображения файлов в память, самый быстрый вариант
-- чтение данных
-- складывание в буфер, который мы читаем
+ эти процессы происходят в параллель (асинхронность), что дает повышенную производительность при чтении, так и при записи (т.к. данные обмениваются через быстрый буфер в памяти)

Трейты (интерфейсы)

- может быть несколько трейтов, но не несколько классов (все как в java)
- может потребовать наличие полей, методов, суперклассов
-- требование наличия суперкласса Exception:
trait LoggedException extends Logged {
 this: Exception =>
  def log() { log(getMessage()) }
}
-- требование наличие метода getMessage:
trait LoggedException extends Logged {
 this: { def getMessage() : String } =>
  def log() { log(getMessage()) }
}
- может быть часть реализации (как у абстрактных классов java) - называется примесью (добавляется часть новой реализации)
trait ConsoleLogger {
 def log(msg: String) { println(msg) }
}
- важен порядок - первые методы в конце списка - методы будут вызываться последовательно с конца или перетираться
trait TimestampLogger extends ConsoleLogger { //пример с расширением трейта
 override def log(msg: String) {
  super.log(s"${java.time.Instant.now()} $msg") //вызывает вышестоящий трейт ConsoleLogger
 }
}
-- но есть возможность обратиться к конкретному трейту:
super[ConsoleLogger].log(...)
- при переопредлении метода трейта не нужно указывать override
- доп трейты наследуются через with
class ConsoleLogger extends Logger with Cloneable with Serializable
- при создании объекта можно подмешать другой класс, который наследует тот же trait
val acct = new SavingsAccount with ConsoleLogger
теперь функция log будет из ConsoleLogger,а не из Log, как по умолчанию в SavingsAccount (т.е. получается как бы наследование на лету)
- трейт может иметь конструктор как обычный класс, но конструктор будет без параметров (единственное техническое отличие от классов!)
-- проблема инициализации переменных в том, что переменные инициализируются до вызова основного конструктора-класса
чтобы обойти, нужно использовать префиксную установку переменной до конструктора: //то же самое было описано выше в классах
val acct = new { 
 val filename = "myapp.log"
} with SavingsAccount with FileLogger
либо использовать lazy переменные
- трейты могут наследоваться от класса, тогда он становится суперклассом для подмешивающего

Операторы

- инфиксные операторы:
замена вызова метода: .to, -> на 1 to 10, 1 -> 10
- опрделение своего оператора как в java:
def *(other: ClassName) = new ClassName(x * other.x, y * other.y)
- f(arg1, arg2, ...) вызовет f.apply(arg1, arg2, ...) , если метода f нет в классе
- f(arg1, arg2, ...) = value - если слева от =, то вызовет update
это используется в массивах:
scores("Bob") = 100
- unapply - Обратная операция - объект разворачивается в переменные:
object Fraction {
 def unapply(input: Fraction) =
  if (input.den == 0) None else Some
использование: val Fraction(a, b) = f;
- динамические методы и переменные:
import scala.language.dynamics
-- динамический вызов транслируется в :
obj.applyDynamic("name")(arg1, arg2, ...) 
//или 
obj.updateDynamic("name")(выражение_справа) //для присвоения
-- person.lastName = "Doe"
транслируется в:
person.updateDynamic("lastName")("Doe")
реализовывать функцию нужно самим
def updateDynamic(field: String)(newValue: String) { ... } 

Функции высшего порядка

- функцию можно положить в переменную:
val fun = ceil _
_ означает необходимость параметра для функции
- метод класса:
val f = (_: String).charAt(_: Int)
переменная: (параметр: тип).функция(параметр: тип)
val f: (String, Int) => Char = _.charAt(_)
переменная: (входные параметры) => выходной параметр = вызов функции с параметрами
- анонимная функция
(x: Double) => 3 * x
можно передавать ее как параметр, без переменных:
Array(3.14, 1.42, 2.0).map((x: Double) => 3 * x)
более короткая запись:
Array(3.14, 1.42, 2.0).map(3 * _)
.reduceLeft(_ * _) //последовательное применение слева направо
- функция, параметром которой является функция:
def funcname(f: (Double) => Double) = f(0.25) // принимается любая функция с параметром = Double
funcname(sqrt _) //пример
- функция возвращает функцию:
def mulBy(factor : Double) = (x : Double) => factor * x
в функцию mulBy положена функция "(x : Double) => factor * x"
val quintuple = mulBy(5)
quintuple(20) // 100
- каррирование - преобразование ф. с 2 параметрами в 2 функции с 1 параметром:
def mulOneAtATime(x: Int) = (y: Int) => x * y
mulOneAtATime(6)(7)
- передача абстрактного кода:
def runInThread(block: => Unit) ///
runInThread { println("Hi"); Thread.sleep(10000); println("Bye") }
- return может быть использован чтобы определить тип возвращаемых данных, если одновременно используется анонимная функция без типа, которая должна посчитать результат

Коллекции

- наследование объектов:
Iterable -> Seq -> IndexedSeq //обычный массив или список (+ список с индексным доступом)
   -> Set -> SortedSet //коллекция значений (без индексов?)
   -> Map -> SortedMap //множество пар (ключ,значение)
- Преобразование типов через: .to*
- все коллекции по умолчанию неизменяемые (Immutable)
т.е. если мы изменяем коллекцию, то создается копия с изменениями
- Vector - immutable расширяемый массив (ArrayBuffer - muttable)
Технически хранится в виде b-дерева, с элементами в узле
- Range - последовательность чисел
Технически хранит: начало, конец и шаг
- на последовательностях сделано: Stack, Queue, Prior Queue, ListBuffer
- списки (List) - есть указатели на начало (.head) , .tail будет указывать на список за первым элементом
val digits = List(4, 2)
каждый элемент списка также список из 1 элемента с функциями начало и конец
-- добавить значение в начало:
9 :: List(4, 2)
- ListBuffer - имеет отдельные ссылки на начало и конец
- Множества - коллекция, где значение может присутствовать 1 раз:
Set(2, 0, 1) // порядок следования не гарантируется
для хранения используется хэш массив
-- LinkedHashSet - использует и хэш массив и связанный список, что сохраняет последовательность
- добавление элемента в неупорядоченную коллекцию: +
добавление в конец: :+ , начало: +:
++ и -- для добавления и удаления группы элементов
+= или ++= создают новую коллекцию добавляя элмент или множество
- map применяет одноместную функцию над каждым элементом создавая новую коллекцию:
names.map(_.toUpperCase)
тоже самое , что
for (n <- names) yield n.toUpperCase //это транслируется в map
-- flatmap - если функция возвращает коллекцию, то элементы развернутся в основную
-- transform - muttable map (без создания копии)
-- foreach - если не надо никуда сохранять, а только вызвать
- reduce - применяет 2х местную функцию и возвращает 1 результат (так последовательно к результату и следующему элементу, пока в конце не останется 1 элеменент)
- список1 zip список2 - объединение 2 списков в 1 список пар значений
-- обращение к элементам пар значений:
 (prices zip quantities) map { p => p._1 * p._2 }
-- zipall - объединение листов разного размера со значением по умолчанию
- iterator используется там, где поместить объект целиком в память слишком дорого
for (elem <- iter) //каждый вызов изменяет итератор
- неизменяемые итератор: #:: - потоки - ленивый вызов следующего в списке
val words = Source.fromFile("/usr/share/dict/words").getLines.toStream
- можно приводить java объекты к scala коллекциям
import scala.collection.JavaConversions._
val props: scala.collection.mutable.Map[String, String] =
System.getProperties()

Сопоставление с образцом

- аналог switch:
ch = ..
ch match {
 case '+' | '*' => sign = 1
 case '-' => sign = -1
 case _ => sign = 0
}
-- не нужен brake - он есть по умолчанию
-- _ - если не будет, то выкинется исключение при проверке
-- можно перечислять несколько значений через |
-- в case может быть любое логическое условие:
case _ if Character.isDigit(ch) => digit = Character.digit(ch, 10)
-- присвоение идет произвольной переменной за =>
-- сопоставление может идти с типом переменной:
case s: String => Integer.parseInt(s)
- сопоставление массива 2 переменным и всего остального последовательности rest:
val Array(first, second, rest @ _*) = arr
- сопоставление ключа и значения ассоц. массива может быть в цикле:
for ((k, v) <- System.getProperties())
- сопоставление может идти по типу класса:
case Dollar(v) => s"$$$v"
- объект можно скопировать с изменением параметра:
val price = amt.copy(value = 19.95)
- для сопоставления с пустым значением можно использовать тип Some
case Some(score) => println(score) //любое значение
case None => println("No score") //пустое значение (замена "" или NULL)

Аннотации

Диррективы компилятору
- определение собственной аннотации
class unchecked extends annotation.Annotation
- volatile поле, которое можно менять из разных потоков
jvm выключает оптимизации: помещение в кэше процессора и т.д.
Изменения данных всегда будет видно всем процессам программы, т.к. обновление будет идти через общее хранилище.
@volatile var done = false
- несериализуемое поле
@transient var recentLookups = new HashMap[String, String]
- методы на c++
@native def win32RegKeys(root: Int, path: String): Array[String]
- Контролируемые исключения:
@throws(classOf[IOException]) def read(filename: String) { ... }
//аналог java:
void read(String filename) throws IOException
- генерация геттеров/сеттеров:
@BeanProperty var name : String = _
- если рекурсивный вызов - просто вызов, то он может быть преобразован к циклу:
большая рекурсия может отвалиться по памяти стека:
//плохо: 
if (xs.isEmpty) 0 else xs.head + sum(xs.tail) //значение + рекурсивный вызов не может автоматически преобразоваться к циклу
//хорошо: 
@tailrec def sum2(if (xs.isEmpty) partial else sum2(xs.tail, xs.head + partial)
- @switch - трансформировать swith в таблицу переходов, что оптимальнее if
- @elidable(500) - метод после 500 сборки удалится из скомпилированного кода
- def allDifferent[@specialized(Long, Double) T](x: T, y: T, z: T) = ... - генерация специализированных функций под каждый тип
- @deprecated(message = "Use factorial(n: BigInt) instead") - устаревшая функция
@deprecatedName('sz) - устаревший параметр

Обработка XML

- создание xml переменной:
val elem = <a href="http://scala-lang.org">The <em>Scala</em> language</a>
- обработка элементов:
for (n <- elem.child) обработать n
- доступ к атрибуту:
val url = elem.attributes("href") // вернется список, даже если 1 элемент
- обход всех атрибутов:
for (attr <- elem.attributes)
- вставка переменной в xml:
<ul><li>{items(0)}</li><li>{items(1)}</li></ul>
<ul>{for (i <- items) yield <li>{i}</li>}</ul>
- xpath поиск:
-- поиск непосредственного тега body - любой тег - тег li
doc \ "body" \ "_" \ "li"
-- поиск тега img на любом уровне вложенности
doc \\ "img"
- case <img/> => ... - выберется если img с любым содержанием
- объекты неизменяемые, для изменения атрибута нужно его скопировать:
val list2 = list.copy(label = "ol")
- для трансформации можно использовать класс с case преобразования внутри:
- загрузка xml:
import scala.xml.XML
val root = XML.loadFile("myfile.xml")
- загрузка с сохранением комментариев:
ConstructingParser.fromFile(new File("myfile.xml"), preserveWS = true)
- сохранение xml:
XML.save(writer, root, "UTF-8", false, null)

Обобщенные типы

- обобщенный класс (generic):
class Pair[T, S](val first: T, val second: S)
- если хотим сравнивать:
class Pair[T <: Comparable[T]](val first: T, val second: T)
- если хотим поменять местами, то наоборот T должен быть супертипом:
def replaceFirst[R >: T](newFirst: R) = new Pair(newFirst, second)
- если класс не реализует Comparable, но может привестись к нему (к примеру Int к RichInt):
class Pair[T](val first: T, val second: T)
(implicit ev: T => Comparable[T])
- для работы с массивом обобщенных типов, нужен "ClassTag"
def makePair[T : ClassTag](first: T, second: T) = {
 val r = new Array[T](2); r(0) = first; r(1) = second; r
}
- можно одновременно указывать обе границы:
T <: Upper >: Lower
- обобщенный тип нескольких трейтов:
T <: Comparable[T] with Serializable with Cloneable
- Ограничение типов
T =:= U //равенство типу
T <:< U //подтип
T <%< U //может приводиться к типу

Дополнительные типы

- составление цепочек вызовов:
article.setTitle("Whatever Floats Your Boat").setAuthor("Cay Horstmann")
для того, чтобы она работала и для наследников базового класса нужно указывать "this.type":
def setTitle(title: String): this.type = { ...; this }
- псевдоним для длинного типа:
type Index = HashMap[String, (Int, Int)]
- необходимость передачи типа данных с методом "append"
def appendLines(target: { def append(str: String): Any },
- внедрение зависимостей:
//1. трейт:
trait Logger { def log(msg: String) }
//2. 2 реализации с выводом на экран и в файл
//3. аутентификация с поддержкой логирования
trait Auth {
 this: Logger => //трейт может подмешиваться в классы , наследующий Logger
  def login(id: String, password: String): Boolean
}
//4. трейт приложения должен подмешиваться в класс с логированием и аутентификацией
trait App {
 this: Logger with Auth =>
 ...
}
//5. приложение собирается с нужными классами для логирования:
object MyApp extends App with FileLogger("test.log") with MockAuth("users.txt")
- абстрактный тип:
trait Reader {
 type Contents
// определение абстрактного типа:
class StringReader extends Reader {
 type Contents = String
- то же самое через обобщенный тип:
trait Reader[C] {
class StringReader extends Reader[String]

Неявные преобразования

- функции с неявными преобразованием параметров Int к Fraction
implicit def int2Fraction(n: Int) = Fraction(n, 1)
val result = 3 * Fraction(4, 5) // Вызовет int2Fraction(3) * Fraction(4, 5)
- добавление своей функции в стандартный класс:
-- добавляем функцию read
class RichFile(val from: File) {
 def read = Source.fromFile(from.getPath).mkString
}
-- неявное преобразование класса File к RichFile:
implicit def file2RichFile(from: File) = new RichFile(from)
-- вместо функции можно использовать класс:
implicit class RichFile(val from: File) { ... }
дальше обычно создаем объект File
помещаем функции в файл
и обязательно импортируем без префикса:
import com.horstmann.impatient.FractionConversions._
чтобы минимизировать число неявных преобразований, вызов можно делать внутри кода класса
- необязательные параметры:
def quote(what: String)(implicit delims: Delimiters) =
//явный параметр:
quote("Bonjour le monde")(Delimiters(""", """)) 
можно заранее объявить неявный разделитель:
implicit val quoteDelimiters = Delimiters(""", """)


Конкурентное программирование Scala

Реализация многопоточности в JVM

- поток в jvm это обертка над потоком ОС (в отличии от python)
- java-style поток:
object ThreadsCreation extends App {
 class MyThread extends Thread {
  override def run(): Unit = {
   println("New thread running.")
  }
 }
 val t = new MyThread
 t.start()
 t.join() //основной поток приостанавливается до окончания работы T
 println("New thread joined.")
}
- простой способ вернуть данные из потока - это объявить глобальную переменную, но использовать ее после join
- объявление синхронизированной функции:
def getUniqueId() = this.synchronized {}
- самодельный пул потоков:
-- пул ожидающий работы (wait):
while (tasks.isEmpty) tasks.wait()
tasks.dequeue()
-- помещение программы в пул и нотификация, что нужно перестать делать wait
tasks.enqueue((
-- остановить пул при установке некого флага:
Worker.interrupt()

Особенности сборки мусора в jvm
- очень плохо относится к swap - даже если 1 поток ушел туда, т.к. виртуальная машина опрашивает всех потоки
- забрав ОЗУ, обратно не отдается
- если heap закончилось, то запускается сборщик мусора с попыткой высвободить память
Адаптивный сборщик мусора:
- Во время сбора мусора работа программы останавливается.
- Сборщик прослеживает связи от стека/статической памяти до кучи на наличие ссылок
-- Если в памяти много мелких объектов и они лежат достаточно плотно, то все активные объекты переносятся из одной кучи в другу и плотно упаковываются.
Оставшиеся объекты в старой куче уничтожаются. Этот процесс может происходит как целиком на всех данных или кусочками.
-- Если объекты меняются редко или данные сильно разбросаны / фрагментированы, то данные не переносятся. Объекты без ссылок в куче помечаются к удалению и вычищаются
В среднем этот метод медленней, но в случае сильной фрагментации он лучше.

Атомарные изменчивые переменные

@volatile var found = false
такие переменные изменяются за 1 такт, поэтому не могут меняться одновременно в нескольких потоках
на уровне процессора это операция: compareAndSet - установить значение, если compare выполнилось удачно
полный список описан тут: java.util.concurrent.atomic: AtomicBoolean, AtomicInteger, AtomicLong и AtomicReference
Пример использования:
private val uid = new AtomicLong(0L)
def getUniqueId(): Long = uid.incrementAndGet() //увеличить счетчик и получить значение
getAndSet //получить значение и установить новое
Аналог через synchronized:
def compareAndSet(ov: Long, nv: Long): Boolean =
this.synchronized {
 if (this.get == ov) false else {
  this.set(nv)
  true
 }
}
- lazy переменные инициируются 1 раз при первом обращении
для проверки инициализации используются дополнительное атомарное поле-флаг
- muttable объекты нужно использовать с блокировками
- шаблон producer / consumer
между потребителем и производителем находится конкурентная очередь с данными:
- BlockingQueue - реализация в Scala: add/remove/element - с исключениями (poll/ofer/peak - со специальным значение) (take, put - блокирующие)
-- ArrayBlockingQueue - очередь фиксированного размера
-- LinkedBlockingQueue - неограниченная очередь (когда производители гарантированно работают быстрее потребителей)
- Конкурентные объекты:
val files: concurrent.Map[String, Entry] = new concurrent.TrieMap() // хэш массив с блокировками
-- concurrent.TrieMap() - позволяет обходить массив по его копии, что не блокирует его самого и не дает получить изменяемые данные

Параллельные коллекции данных

- параллельный запуск обработки колекции = вызов функции par
coll.par.count(_ % 2 == 0)
-- можно распараллелить цикл:
for (i <- (0 until 100000).par) print(s" $i")
-- преобразование параллельной коллекции в последовательную:
coll.par.filter(p).seq
-- reduce может применяться над параллельными коллекциями, если выполняется a op b == b op a ( к примеру сложение просто парралелиться, но не вычитание, т.к. оно зависит от порядка )
взамен reduceLeft в параллельных программах лучше использовать aggregate - каждый поток будет иметь свой аккумулятор вместо одного общего
--- Установка уровня параллельности для блока:
def withParallelism[A](n : Int)(block : => A) : A = {
  import collection.parallel.ForkJoinTasks.defaultForkJoinPool._
  val defaultParLevel = getParallelism
  setParallelism(n)
  val ret = block
  setParallelism(defaultParLevel)
  ret
}
withParallelism(2) {
  (1 to 100).par.map(_ * 2)
}
--- Установка параллельности для оператора:
val fjpool = new ForkJoinPool(2)
val customTaskSupport = new parallel.ForkJoinTaskSupport(fjpool)
coll.par.tasksupport = customTaskSupport
- для преобразования последовательной коллекции к параллельной используются сплиттеры (надстройка над итератором)
-- линейноссылочные структуры List/Stream не имеют эффективного алгоритма Split
при вызове par у этих структур они сначала преобразуются к обычным массивам
- Пример создания своей параллельной коллекции (в данном случае строки):
class ParString(val str: String) extends immutable.ParSeq[Char] { //immutable.ParSeq[Char] - последовательность Char
 def apply(i: Int) = str.charAt(i)
 def length = str.length
 def splitter = new ParStringSplitter(str, 0, str.length) //в последовательных коллекциях тут был бы итератор
        //в данном случае сплиттер должен возвращать часть строки
 def seq = new collection.immutable.WrappedString(str) //преобразование параллельной коллекции в последовательную
}
сплиттер должен быть отнаследован от трейта:
trait IterableSplitter[T] extends Iterator[T] {
 def dup: IterableSplitter[T] //заглушка - создание копии сплиттера
 def remaining: Int //число элементов в сплиттере
 def split: Seq[IterableSplitter[T]] //разделение сплиттера на подсплиты
}
- необязательные функции:
psplit - ручное задание размеров для сплита
- пример сплита на 2 части:
def split = {
 val rem = remaining
 if (rem >= 2) psplit(rem / 2, rem - rem / 2) //вызываем psplit 2 раза
 else Seq(this)
}
- psplit создает сплиттер с половиной строки:
def psplit(sizes: Int*): Seq[ParStringSplitter] = {
 val ss = for (sz <- sizes) yield {
  val nlimit = (i + sz) min limit
  val ps = new ParStringSplitter(s, i, nlimit) //нет копирования, просто передача индексов по строке
  i = nlimit
  ps
 }
 if (i == limit) ss
 else ss :+ new ParStringSplitter(s, i, limit)
}
- Комбинаторы - слияние сплитов обратно при использовании методов трансформации: map, filter groupBy

Объекты Future

- создание отдельного потока из пула потоков:
import java.time._
import scala.concurrent._
import ExecutionContext.Implicits.global //пул потоков
val f = Future {
 Thread.sleep(10000)
 println(s"This is the future at ${LocalTime.now}") //отдельный поток
 42
}
val result = Await.result(f, 10.seconds) // ожидание завершение потока с блокировкой основной программы
      // 2ой параметр - это таймаут ожидания потока, если превысит, то будет TimeoutException.
-- другой вариант ожидания:
Await.ready(f, 10.seconds)
val Some(t) = f.value
-- альтернативный вариант без блокировки:
по результату работы асинхронно вызовется функция:
f.onComplete {
 case Success(v) => println(s"The answer is $v")
 case Failure(ex) => println(ex.getMessage)
}
println(s"This is the present at ${LocalTime.now}")
-- объект Future должен возвращать результат или генерировать exception в случае ошибки

- размер пула по умолчанию = числу ядер
-- если нужно другое значение, то нужно создать свой пул потоков:
val pool = Executors.newCachedThreadPool()
implicit val ec = ExecutionContext.fromExecutor(pool)
- чтобы перехватить иключение, можно использовать объект Try:
val result = Try(str.toInt)
// обработка результата:
if (t.isSuccess) println(s"The answer is ${t.get}")
if (t.isFailure) println(t.failed.get.getMessage)
- если нужно выполнить несколько потоков парраллельно, но данные нужны из обоих, то
//выполняем их параллельно:
val future1 = Future { getData1() }
val future2 = Future { getData2() }
// а результат ожидаем в последовательном oncomplete
future1 onComplete {
 case Success(n1) =>
 future2 onComplete {
  case Success(n2) => {
   val n = n1 + n2
   println(s"Result: $n")
  }
  case Failure(ex) => ...
 }
 case Failure(ex) => ...
}
- еще вариант параллельной обработки нескольких future через map:
val combined = for (n1 <- future1; n2 <- future2) yield n1 + n2 //т.к. Future имеет метод map, позволяющий работать с ним как с массивом
- или через async:
val combined = async { await(future1) + await(future2) }
- если функции нужно вызывать последовательно, то это future блоки надо определить как функции, а не переменные как ранее
def future1 = Future { getData() }
def future2 = Future { getMoreData() } // def, не val
val combined = for (n1 <- future1; n2 <- future2) yield n1 + n2 //сначала вызовется функция future1, потом future2
- обработка исключения SQLException внутри Future:
val f = Future { persist(data) } recover { case e: SQLException => 0 }
- разделение данных на части и их параллельная обработка:
val futures = parts.map(p => Future { вычислить результат в p })
//получить коллекция результатов, по окончанию всех потоков:
val result = Future.sequence(futures); //функция не блокирует программу, так что изначально тут просто ссылки на объекты без результатов
//сложение результатов, по мере их готовности:
val result = Future.reduceLeft(futures)(_ + _)
//если нужно получить первый выполнившийся поток:
Future[T] result = Future.firstCompletedOf(futures) // но другие задания продолжат работу!

- Через Promise можно вернуть несколько значений из потока:
val p1 = Promise[Int]()
val p2 = Promise[Int]()
Future {
 val n1 = getData1()
 p1.success(n1)
 val n2 = getData2()
 p2.success(n2)
}
// получение результата из первого выполнившегося потока:
val p = Promise[Int]()
Future {
 var n = workHard(arg)
 p.trySuccess(n)
}
Future {
 var n = workSmart(arg)
 p.trySuccess(n)
}

Реактивное программирование

подписка на изменения объекта
- для использования нужно подключить стороннюю библиотеку:
com.netflix.rxjava
- подписка на изменения 3 строк - 2 разных подписчика (в Future только 1 onComplete)
val o = Observable.items("Pascal", "Java", "Scala") //синхронный вызов, через timer - асинхронный
o.subscribe(name => log(s"learned the $name language"))
o.subscribe(name => log(s"forgot the $name language"))
- пример - мониторинг изменения файлов:
def modified(directory: String): Observable[String] = {
 Observable.create { observer =>
  val fileMonitor = new FileAlterationMonitor(1000) //генерация событий каждые 1000мс
  val fileObs = new FileAlterationObserver(directory)
  val fileLis = new FileAlterationListenerAdaptor {
   override def onFileChange(file: java.io.File) {
    observer.onNext(file.getName)
   }
  }
  fileObs.addListener(fileLis)
  fileMonitor.addObserver(fileObs)
  fileMonitor.start()
  Subscription { fileMonitor.stop() }
 }
}
подписка на мониторинг:
val sub = modified(".").subscribe(n => log(s"$n modified!"))

Программная транзакционная память

- в основе лежит операция atomic
для работы нужна библиотека org.scala-stm
import scala.concurrent.stm._
def swap() = atomic {
атомарные операции не подвержены взаимоблокировкам, из-за этого они используются в оптимистических блокировках:
-- перед началом запоминается состояние памяти
-- если оказывается при записи, что область была изменена, то транзакция рестартуется
- для применения оптимистичной блокировки над переменной, нужно использовать ссылочный тип:
val urls = Ref[List[String]](Nil)
val clen = Ref(0)
- минус оптимистических блокировок:
-- рестарт транзакции, так что в транзакции нельзя вызывать неоткатываемые транзакции - к примеру http запрос
-- дополнительная нагрузка из-за рестарта
- для 1 минуса - "повторный вызов неоткатываемого запроса", можно использовать событие - aftercommit:
def inc() = atomic { implicit txn =>
 val valueAtStart = myValue()
 Txn.afterCommit { _ => //вызов неоткатываемого действия после удачного вызова
  log(s"Incrementing $valueAtStart")
 }
 Txn.afterRollback { _ => //после неудачного - оно будет рестартовано?
  log(s"rollin’ back") //тут можно сделать чтото, чтобы избежать повторного исполнения
 }
 myValue() = myValue() + 1
}
- в случае исключения внутри atomic - она целиком откатывается
- повторение транзакции, но с таймаутом в 1с:
Future {
 blocking {
  atomic.withRetryTimeout(1000) { implicit txn =>
   if (message() != "") log(s"got a message - ${message()}")
   else retry
  }
 }
}
- бесконечный повтор с интервалом в 1с:
Future {
 blocking {
  atomic { implicit txn =>
   if (message() == "") {
    retryFor(1000)
    log(s"no message.")
   } else log(s"got a message - ‘${message()}’")
  }
 }
}
- для локальных переменных нужно использовать TxnLocal - она индивидуальна в каждой транзакции
val myLog = TxnLocal("")
- транзакционный массив:
val website: TArray[String] = TArray(pages)
- транзакционный словарь:
val alphabet = TMap("a" -> 1, "B" -> 2, "C" -> 3)
-- словарь имеет возможность получить неизменяемый снапшот:
val snap = alphabet.single.snapshot

Акторы

взаимодействие независимых программ посредством сообщений (модель producer / consumer )
- Общие данные хранятся в памяти актора. Новые сообщения поступают через очередь, и обрабатываются внутри последовательно.
Акторы могут даже находится на разных компах, логика не зависит от их положения
- для работы нужен фреймворк Akka
- Первое: нужно объявить шаблон-класс акторов, который описывает его поведение и порядок приема сообщений
на основе этого шаблона будут создаваться акторы:
import akka.actor._
import akka.event.Logging
class HelloActor(val hello: String) extends Actor {
 val log = Logging(context.system, this)
 def receive = {
  case 'hello' =>
   log.info(s"Received a '$hello'... $hello!")
  case msg =>
   log.info(s"Unexpected message ‘$msg’")
  context.stop(self) //остановка актора
 }
}
- создание актора по шаблону:
def props(hello: String) = Props(new HelloActor(hello))
- отсылка сообщений в акторы:
отправка происходит в очередь, не дожидаясь окончания обработки
val deafActor: ActorRef = ourSystem.actorOf(Props[DeafActor], name = "deafy")
deafActor ! "hi" // ! - оператор для отправки сообщения
deafActor ! 1234
- поведение и состояние акторов. Актор со счетчиком:
работает как конечный автомат
при достижении 0 , актор больше не принимает receive - полностью переключается на done
class CountdownActor extends Actor {
 val log = Logging(context.system, this)
 var n = 10
 def counting: Actor.Receive = {
  case "count" =>
  n -= 1
  log.info(s"n = $n")
  if (n == 0) context.become(done)
 }
 def done = PartialFunction.empty
 def receive = counting
}
-- переключать во время работы можно на любую функцию, используя:
context.become(function)
- Иерархия акторов
-- Дочерний актор:
class ChildActor extends Actor {
 val log = Logging(context.system, this)
 def receive = {
  case "sayhi" =>
   val parent = context.parent
   log.info(s"my parent $parent made me say hi!")
 }
-- Родительский вызывает дочерний:
class ParentActor extends Actor {
val log = Logging(context.system, this)
 def receive = {
  case "create" =>
   context.actorOf(Props[ChildActor])
   log.info(s"created a kid; children = ${context.children}")
  case "sayhi" =>
   log.info("Kids, say hi!")
   for (c <- context.children) c ! "sayhi"
-- получить путь в иерархии акторов:
context.actorSelection(path)
-- в случае исключения в акторе, вышестоящий его перезапускает
- события в акторе:
-- preStart - перед началом обработки -- preRestart - перед рестартом в случае ошибки -> еще раз вызывает postStop
-- postRestart - после рестарта, перед обработкой -> еще раз вызывает preStart
-- postStop - окончание обработки
пример использования:
override def preStart(): Unit = log.info(s"printer preStart.")
- отправка данных в актор с ожиданием ответа
class Pongy extends Actor {
 val log = Logging(context.system, this)
 def receive = {
  case "ping" =>
   log.info("Got a ping -- ponging back!")
   sender ! "pong" //после обработки, отправляем сендеру ответ
   context.stop(self) //и останавливаем актора
 }
сендер отправляет событие и ждет ответа:
class Pingy extends Actor {
 val log = Logging(context.system, this)
 def receive = {
  case pongyRef: ActorRef =>
   val f = pongyRef ? "ping"
   f pipeTo sender //получение ответа
 }
}
- Диспетчеризация акторов:
Дочерние акторы можно:
-- перезапустить - restart (стратегия по умолчанию)
-- возобновить без перезапуска - Resume
-- остановить - stop
-- распространить ошибку на все дочерние акторы - Escalate
override val supervisorStrategy =
 OneForOneStrategy() {
  case ake: ActorKilledException => Restart //в случае кила, будет рестарт
  case _ => Escalate //если один из акторов потерпел неудачу, она распространяется на всех
- Удаленное взаимодействие акторов:
-- настраиваем конфиг взаимодействия:
def remotingConfig(port: Int) = ConfigFactory.parseString(s""
akka {
 actor.provider = "akka.remote.RemoteActorRefProvider"
 remote {
  enabled-transports = ["akka.remote.netty.tcp"]
  netty.tcp {
   hostname = "127.0.0.1"
   port = $port
  }
 }
}
-- создаем актор:
val system = remotingSystem("PongyDimension", 24321)
val pongy = system.actorOf(Props[Pongy], "pongy")
-- получаем ссылку на удаленный актор "ActorIdentity", после посылаем сообщения:
def receive = {
 case "start" =>
 val pongySys = "akka.tcp://PongyDimension@127.0.0.1:24321"
 val pongyPath = "/user/pongy"
 val url = pongySys + pongyPath
 val selection = context.actorSelection(url)
 selection ! Identify(0)
 case ActorIdentity(0, Some(ref)) =>
  pingy ! ref
 case ActorIdentity(0, None) =>
  log.info("Something’s wrong - ain’t no pongy anywhere!")
  context.stop(self)
 case "pong" =>
  log.info("got a pong from another dimension.")
  context.stop(self)
- Минусы:
-- Нельзя посылать один и тот же тип сообщения из разных receiver. Для разграничения нужно будет включать уникальный ключ.
-- сложно настроить ожидание определенной цепочки событий (послать в 2 актора и ждать ответа)

Реакторы

расширение модели акторов (решает проблемы минусов акторов)
- подключение:
io.reactors
import io.reactors._
- создание шаблона реактора принимающего только строки
val welcomeReactor = Reactor[String] { self =>
 self.main.events onEvent { name =>
  println(s"Welcome, $name!")
  self.main.seal()
 }
}
создание реактора по шаблону
val system = ReactorSystem.default("test-system")
val ch = system.spawn(welcomeReactor)
передача события в реактор
ch ! "Alan"
реакторы получают на входе события, вместо сообщений в акторах


Молниеносный анализ Spark

Введение в Spark

Общее описание архитектуры Spark описано тут: /search/label/bigdata#spark
var lines = sc.textFile("/apps/hive/warehouse/stg/pos_rec_hdr__imp/part-m-00000")
lines.count() //число строк
lines.first() //первая строка
lines.filter(line => line.contains("20181121")).count() //фильтрация
- Программа на Spark состоит из частей:
--программа драйвер - запускает операции на кластере
-- переменная sc - SparkContext - связь с вычислительным кластером
+ переданные функции выполняются на кластере
- запуск spark shell
spark-shell --master yarn --driver-memory 8G --executor-memory 1G --num-executors 8
- подключение к Spark из Scala:
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/apps/hive/warehouse/stg/pos_rec_hdr__imp/part-m-00000"
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("20181122")).count()
    val numBs = logData.filter(line => line.contains("20181123")).count()
    println(s"Lines with 20181122: $numAs, Lines with 20181123: $numBs")
    spark.stop()
  }
}
- подсчет числа строк в файле:
val file = sc.textFile("/apps/hive/warehouse/tmp/imp__271996/hadoop_imp_807272.csv") //val - т.к. все RDD неизменяемые
file.flatMap(line => line.split("\\|")) //сплитим строку в массив, flatMap - чтобы раскрыть элементы массива строки в общий массив всего файла
.map(word => (word, 1)) //помещаем каждый элемент массива в хэш массив с числом в значении = 1
.reduceByKey{_ + _} //сворачиваем одинаковые ключи, суммируя значения (можно также countByKey)
.map(item => item.swap) //меняем ключ со значением местами
.sortByKey(true, 1) //сортируем по ключу, где у нас значение после разворота (true - для прямой сортировки)
.map(item => item.swap) //обратно меняем 
.saveAsTextFile("/apps/hive/warehouse/tmp/words/") //сохраняем в файл
- install sbt for ubuntu
echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823
sudo apt-get update
sudo apt-get install sbt
- sbt конфиг для сборки:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
- сборка jar:
export JAVA_HOME=/usr/jdk64/jdk1.8.0_112/
export PATH=$PATH:/usr/jdk64/jdk1.8.0_112/bin/
sbt package
- последующий запуск:
spark-submit \
--class "SimpleApp" \
--master "yarn" \
target/scala-2.11/simple-project_2.11-1.0.jar
-- master = local[N] - локальный запуск на текущий машине с N ядрами
-- доп параметры:
--jars depl.jar,dep2.jar,dep3.jar \ -- добавить jar к выполнению
--total-executor-cores 300 \ -- число executor
--executor-memory 10g \ --памяти под процесс
--files - список файлов для размещения в каталоге выполнения (файлы с доп. данными)
-- большинство этих параметров можно задать из кода при создании Spark Context: SparkSession.builder

RDD

- виды операций:
-- преобразования - создание нового RDD на основе текущего с выполнением некоторых операций (фильтрация)
-- действия - вычисления результата, не создавая новый RDD: first, count, save
выполнение преобразований откладывается и оптимизируется до первого действия
(т.е. нагляднее сделать несколько map/filter, чем 1 громоздкий)
- если RDD сохранили в переменную, то при каждом вызове действия над ней все преобразования будут вычисляться заново
.persist / .cache - если нужно сохранить результат преобразований
-- можно задать место хранения, к примеру: (StorageLevel.DISK_ONLY) / MEMORY_ONLY/ SER - с сериализацией объекта
-- если задали MEMORY_ONLY, но после RDD был вытеснен, то при последующем обращении к нему ему нужно выполниться заново
если такая вероятность есть, то лучше сохранять с MEMORY_AND_DISK - тогда при повторном обращении блок будет сохранен и загружен с диска
-- unpersist - выгрузить переменную из памяти
- создание RDD
-- из массива:
val lines = sc.parallelize(List("pandas", "i like pandas"))
-- из текстового файла:
val lines = sc.textFile("/path/to/README.md")
- сохранение RDD
-- .collect - в локальный массив или take(N элементов)/takeSample
-- saveAsSequenceFile("path",Some(classOf[GzipCodec]) - в 1 текстовый файл
saveAsTextFile - в несколько файлов
- передача собственных функций
-- функция должна поддерживать сериализацию
def isMatch(s: String): Boolean = {
 s.contains("123")
}
file.filter(isMatch).first()
- основные операции:
-- input.map(x => х * х) - выполняется над каждым элементом и создает новый RDD с тем же числом преобразованных элементов
-- flatMap - то же самое, но разворачивает элементы подмассивов в элементы 1 массива
-- поддерживаются операции над множествами: distinct / union / intersection / substract (oracle minus) / cartesian
-- sample - выборка случайных элементов
-- foreach
- основные действия:
-- reduce / fold - принимают 2 элемента и возвращает 1 элемент того же типа
-- aggregate - похоже на reduce, но используется, если нужно вернуть значение другого типа
val result = input.aggregate ( (0, 0)) (
(асс, value) => (асс._1 + value, асс._2 + 1), //считаем сумму и кол-во одновременно
(accl, асс2) => (accl._1 + асс2._1, accl._2 + асс2._2))
val avg = result._1 / result. 2.toDoule
-- countByValue - частота каждого элемента в массиве

Работа с парами ключ-значение

- создание массива, где ключ - первое слово, значение = сама строка:
val pairs = lines.map(x => (x.split(" ")(0), х))
- фильтр по значению:
pairs.filter{case (key, value) => value.length < 20}
- вычисление среднего по ключам:
rdd.mapValues(x => (х, 1)) .reduceByKey(
(х, у) => (x._l + у._1, х._2 + у._2))
- combineByKey - объединение значений по ключу
пример вычисления среднего:
val result = input.combineByKey(
(v) => (v, 1), //createCombiner - если ключа не было - создаем (число значения, 1)
(асс: (Int, Int), v) => (асс._1 + v, асс._2 + 1), //mergeValues - если ключ есть в этом блоке, то - (число значние + новое значние, число потовторов + 1)
(accl: (Int, Int), асс2: (Int, Int)) => (accl._1 + асс2._1, accl._2 + асс2._2) //mergeCombiners - объединяем потоки
) .map{ case (key, value) => (key, value._1 / value._2.toFloat) //проходимся по полученному массиву: значение / число повторов
result.collectAsMap() .map(println(_)) //collectAsMap - преобразование RDD в локальный ассоциативный массив + выводим на экран
- настройка уровня параллелизма:
sc.parallelize(data) .reduceByKey((x, у) => х + у, число потоков)
- repartition - перераспределение данных в кластере
- coalesce - облегченный вариант repartition с минимизацией передачи данных по сети
операция уменьшает число блоков, склеивая мелкие в более большие на 1 ноде
-- rdd.partitions.size () - чтобы узнать число блоков для репартиций
- группировка по ключу - значения становятся массивом:
groupByKey - [К, Iterable [V]]
-- расширенный вариант этой функции - groupBy
- группировка 2 RDD по 1 ключу (заготова для распредленного hash join)
cogroup () -> RDD [ (К, ( Iterable [V], Iterable [W]))]
- соединения:
-- storeAddress.join(storeRating) - inner join по ключу: вернется массив из значений левой и правой таблицы
-- аналогично есть left/right/cross join
- переопределение функции сортировки:
-- пример - сортировка чисел как строк:
implicit val sortintegersByString = new Ordering[Int] {
 override def compare(a: Int, b: Int) = a.toString.compare(b.toString)
}
rdd.sortByKey ()
- доп. действия связанные только с kv rdd:
-- countByKey - число элементов для каждого ключа
-- lookup(key) - вывести массив всех элементов ключа key

Управление распределением данных

- перед работой нужно импортировать:
import org.apache.spark.HashPartitioner
- можно распределить данные по ключу при создании, если после это распределение ключа будет использоваться несколько раз:
val userData = sc.sequenceFile[UserID, Userinfo] ("hdfs:// ... ")
.partitionBy(new HashPartioner(100)) // распределяем файл в кластере по 100 хэш партициям
. persist()
-- также 100 означает, что столько параллельных заданий будет обрабатывать будущее соединение с этой таблицей
-- map - создает новый RDD, который не наследует принцип партицирования
- определение метода партицирования:
-- pairs.partitioner - посмотреть тип партицирования у RDD
-- хэш партицирование
val partitioned = pairs.partitionBy(new spark.HashPartioner(2))
-- range партицирование - разделение RDD на 10 равных последовательных частей
counts.partitionBy(new RangePartitioner(10,counts))
-- свой тип партицирования:
class DomainNamePartitioner(numParts: Int) extends Partitioner {
 override def numPartitions: Int = numParts //число партиций
 override def getPartition(key: Any): Int = { //получить номер партиции по ключу
  val domain = new Java.net.URL(key.toString) .getHost() //в данном случае: хэш код от хоста url
  val code = (domain.hashCode % numPartitions)
  if (code < 0) (
   code + numPartitions // Сделать неотрицательным
  else
   code
 }
 // Jаvа-метод equals для сравнения объектов Partitioner
 //для сравнения 2 RDD партицированных одним способом
 override def equals(other: Any): Boolean = other match {
  case dnp: DomainNamePartitioner =>
   dnp.numPartitions == numPartitions
  case _ =>
   false
 }
}

Загрузка и сохранение данных

- загрузка текстового файла:
val input = sc.textFile("hdfs путь до файла или каталога!")
-- загрузка CSV:
import Java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
val input = sc.textFile(inputFile)
val result = input.map{ line =>
 val reader = new CSVReader(new StringReader(line));
 reader.readNext();
}
-- сохранение в csv RDD из 2 полей:
pandaLovers.map(person => List(person.name, person.favoriteAnimal).toArray)
.mapPartitions{people =>
 val stringWriter = new StringWriter();
 val csvWriter = new CSVWriter(stringWriter);
 csvWriter.writeAll(people.toList) //преобразование массива с строку с запятыми
 Iterator(stringWriter.toString)
}.saveAsTextFile(outFile)
- SequenceFiles - сериализованный файл ключей-значений, с метками положений для быстрого поиска и деления
файл с ключом = строке и числом в значении
val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable]).map(case (х, у) => (x.toString, y.get()))
сохранение проще:
val data = sc.parallelize(List(("Panda", 3), ("Кау", 6), ("Snail", 2)))
data.saveAsSequenceFile(outputFile)
- objectFile / saveAsObjectFile - объектные файлы через сериализацию
работает медленно, нет меток синхронизации
- другие типы данных:
-- Key - Value текстовый формат:
val input = sc.hadoopFile[Text, Text, KeyValueTextinputFormat] (inputFile)
.map {
 case (х, у) => (x.toString, y.toString)
}
-- загрузка lzo сжатых файлов:
val input; sc.newAPIHadoopFile(inputFile, classOf[LzoJsonlnputFormat], classOf[LongWritaЫe], classOf[MapWritaЫe], conf)
- доступ к apache hive:
import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
val rows = hiveCtx.sql("SELECT name, age FROM users")
val firstRow = rows.first ()
println (firstRow.getString(0)) // Поле О - это поле name
- обработка json посредством sql:
val tweets = hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
val results = hiveCtx.sql("SELECT user.name, text FROM tweets")
- доступ к бд по jdbc:
-- доступ к mysql:
def createConnection() = { //описание подключения
 Class.forName{"com.mysql.jdbc.Driver").newinstance();
 DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden");
}
def extractValues(r: ResultSet) { //алгоритм извлечения данных
 (r.getint(l), r.getString(2))
}
val data = new JdbcRDD(sc, createConnection, "SELECT * FROM panda WHERE ? <= id AND id <= ?", //запрос
lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues) 
println(data.collect().toList)
lowerBound/upperBound задает границы данных,
результат которых разбивается на numPartitions партиций (запрос также будет выполняться 2 потоками)
- подключение к hbase:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritaЫe
import org.apache.hadoop.hbase.mapreduce.TaЬleinputFormat
val conf = HBaseConfiguration.create ()
conf.set(TableinputFormat.INPUT_TABLE, "tablname") // определить таблицу
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableinputFormat], classOf[ImmutaЬleBytesWritable],classOf [Result])

Дополнительные возможности

- аккумулярторы
max/filter может использовать в своем коде внешние переменные, но они будут локальными для конкретного воркера
переменные видимые в драйвере и воркере (часто для отладки)
val blankLines = sc.accumulator(0)
val callSigns = file.flatMap (line =>
 if (line == '"') {
  blankLines += 1 // Увеличить значение в аккумуляторе
 line.split(" ")
})
-- значение акк. можно будет получить только после действия с RDD (flatmap - преобразование не вызываемое сразу)
-- точность значения аккумулятора не гарантируется, он может быть больше, если контейнер рестратрует или если RDD несколько раз сохраняется на диск, а потом обратно кэшируется в память
-- можно передавать небольшие значения: Int, Double, Long, Float
-- собственный акк. можно создать отнаследовавшись от AccumulatorParam
- Широковещательные переменные
-- предназначены для передачи большого объема данных, но только для чтения
(пример использования - левая таблица для HJ)
val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map{case (sign, count) =>
 val country = lookupinArray (sign, signPrefixes. value)
 (country, count)
} . reduceByKey ( (х, у) => х + у)
-- signPrefixes - можно изменить в воркере, но изменение будет видно только там
- Единичная инициализация ресурсов через mapPartitions
val contactsContactLists = validSigns.distinct() .mapPartitions{
signs =>
val mapper = createMapper() //единичная инициализация для воркера, а не для каждого элемента
val client = new HttpClient()
client.start ()
// создать http-зaпpoc
signs.map {sign => createExchangeForSign(sign)} //поэлементная обработка
.map{ case (sign, exchange) => // извлечь ответы
 (sign, readExchangeCallLog(mapper, exchange))).filter(x => х._2 != null) // Удалить пустые записи
}
- взаимодействие с внешними программами
val distScript = "./src/R/finddistance.R"
val distScriptName = "finddistance.R"
sc.addFile(distScript)
val distances = contactsContactLists.values.flatMap(x =>
x.map(y => s"$y.contactlay,$y.contactlong,$y.mylat,$y.mylong")) //передать параметры
.pipe(Seq(SparkFiles.get(distScriptName))) //вызывать скрипт через pipe
-- все вызванные скрипты сохраняются в SparkFiles. getRootDirectory - важно давать разные имена

Настройка и отладка

- input.toDebugString - посмотреть ациклический граф получения RDD
counts.toDebugString
(2) ShuffledRDD [ 296] at reduceByKey at <console>: 17
+-(2) MappedRDD[295] at rnap at <console>:17
 | FilteredRDD [ 294 ] at filter at <console>: 15
 | MappedRDD[293] at rnap at <console>:15
 | input.text MappedRDD[292] at textFile at <console>:13
 | input.text HadoopRDD[291] at textFile at <console>:13
-- объекты на одном уровне выполняются конвеерно без сохранения результата на диск
-- при применении действия ацикличный граф (DAG) преобразуется в реальный план выполнения
- web интерфейс мониторинга:
-- storage в web мониторе показывает список закешированных RDD
на каких хостах он хранится и какое рапредление озу/диск
-- executors - показывает всех исполнителей и сколько ресурсов они себе забрали
тут же можно посмотреть Thread Dump - стэктрейс работы программы
на основе этого можно делать сэмплирование, чтобы определить наиболее медленные части
-- в environment - Хранятся общие настройки
-- также лог выполнения можно посмотреть через yarn
для уже завершенного: yarn logs -applicationid <арр ID>
- число воркеров зависит от числа блоков родительского RDD
т.е. все еще начинается с файлов в hdfs, если файл маленький, то большой параллельности по умолчанию не будет
- после фильтрации RDD число воркеров наследуется от родителя, несмотря на уменьшение объема
из-за этого есть смысл делать coalesce - чтобы уменьшить число пустых блоков
- MEMORY_AND_DISK - большие RDD лучше кэшировать с этой опцией, чтобы не допускать повторный пересчет
60% всей выделенной памяти используется под кэш RDD
- использовать под кэш Spark быстрый флэш диски
- под мелкие задачи выделять мелкие контейнеры, чтобы накладные расходы на GC и передачу данных не перевешивали полезную работу

Трассиовка java программ
Делается через java visual wm:
- sampler - распределение времени, на что тратится основное время (останавливает программу периодически и смотрит дампы)
-- показывает гросс и нет вложенных объектов
-- в сэмпл могут не попасть мелкие но частые
- profiler - подмена байт кода на инструментированный - начало/конец
-- (при запуске нужно поставить ключ (vm options) = noverify
-- и указать класс
-- попадает все, но инструментирование замедляется
- еще вариант через reflexию и прокси - требуется ручное изменение программы
- jmh - проведенение тестов (@Benchmark) - проводим N раз и берем среднее (+ разогрев)
- Трассировку удобно визаулизировать черзе flamegraph (https://queue.acm.org/detail.cfm?id=2927301) :
# git clone https://github.com/brendangregg/FlameGraph 
# cd FlameGraph 
# perf record -F 99 -a -g -- sleep 60 
# perf script | ./stackcollapse-perf.pl | ./flamegraph.pl > out.svg

Spark SQL

- DataFrame - RDD для хранения данных SQL (тип данных: org.apache.spark.sql.DataFrame )
- для работы с SQL нужно в sbt добавить зависимости:
groupid = org.apache.spark
artifactid = spark-hive_2.10
version = 1.2.0
- над полученным DataFrame допустимы все RDD операции: count/filter/map/...
- создать временную таблицу на основе DataFrame. Таблицу можно будет исползовать в HIVE SQL только в этом же контексте
rows.registerTempTable("tempTable") 
- набор хранимых типов ограничен: int / double / timestamp / array
- кэширование tableName в памяти spark
val c = hiveCtx.cacheTable("rdw.bi0_pmaterial") 
- кэширование таблицы через SQL
val c = hiveCtx.sql("CACHE TABLE rdw.bi0_pplant") 
- создание таблицы на основе RDD:
case class HappyPerson(handle: String, favouriteBeverage: String) // описание структуры
// Создать набор персон и превратить его в SchemaRDD
val happyPeopleRDD = sc.parallelize(List(HappyPerson("holden", "coffee"))) //обычный RDD
// Обратите внимание: здесь действует неявное преобразование,
// эквивалентное вызову sqlCtx.createSchemaRDD(happyPeopleRDD)
happyPeopleRDD.registerTempTable("happy_people")
- кастомные функции для Spark SQL:
registerFunction("strLenScala", (_: String) .length)
val tweetLength = hiveCtx.sql("SELECT strLenScala('tweet') FROM tweets LIMIT 10")
- компиляция запроса в java байт код
conf.set("spark.sql.codegen", "true") 

Spark Streaming

- DStreams - RDD для потоковых данных. Он объединеят в себе множество RDD, каждый из которых отвественен за порцию данных.
- зависимости sbt:
groupld = org.apache.spark
artifactld = spark-streaming_2.10
version = 1.2.0
* импорты:
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext.
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
- программа:
// Создать StreamingContext с 1-секундным интервалом обработки
// и с использованием SparkConf
val ssc = new StreamingContext(conf, Seconds(l))
// Создать DStream из данных, принятых с порта 7777
// локального компьютера
val lines = ssc.socketTextStream("localhost", 7777)
// Отфильтровать поток DStream, оставив строки со словом "error"
val errorLines = lines.filter (_. contains ( "error"))
// Вывести строки со словом "error"
errorLines.print()
-- в отдельном потоке стартуем и ждем окончания
// Запустить обработку потока и дождаться ее "завершения"
ssc.start ()
// Ждать завершения задания
ssc.awaitTermination()
-- запуск:
spark-submit \
--class com.oreilly.learningsparkexamples.scala.StreamingLoginput \
$ASSEМВLY_JAR local[4]
-- отправка сообщения:
nc localhost 7777 testmsg
- отказоустойчивость:
-- данные копируются на 2 ноды
-- каждые 5-6 сообщений происходит checkpoint - запись данных в hdfs
принудительный вызов:
ssc.checkpoint( "hdfs:// ... ")
- преобразования:
-- без сохранения состояния:
map/filter/reduce/group/join - применяются над каждым RDD порции данных.
применение будет над окном заданном в StreamingContext
- transform - применение функции над каждым RDD из окна:
val outlierDStream = accessLogsDStream.transform { rdd =>
 extractOutliers(rdd)
}
-- с сохранением состояния:
при вычислении текущего окна используются данные предыдущих
- Оконные функции - например, нужно посчитать среднее за 30 секунд (размер окна - кратно шагу), при частоте обновления = 10 (шаг перемещения окна)
Кол-во значения в скользящем окне:
val accessLogsWindow = accessLogsDStream.window(Seconds(30), Seconds(10))
val windowCounts = accessLogsWindow.count()
Reduce на окне:
val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getipAddress(), 1))
val ipCountDStream = ipDStream.reduceByKeyAndWindow(
( (х, у) => х + у), // Добавить элементы из новых пакетов в окне
( (х, у) => х - у), // Удалить элементы из пакетов, покинувших окно
Seconds(30), // Размер окна
Seconds(10)) // Шаг перемешения окна
также есть: countByValueAndWindow и countByWindow
- UpdateStateByKey - поддержка состояния между пакетами
пример подсчет числа кодов http ответов:
def updateRunningSurn(values: Seq[Long], state: Option[Long]) = {
 Some(state.getOrElse(0L) + values.size)
}
val responseCodeDStrearn = accessLogsDStrearn.rnap(log => (log.getResponseCode(I, 1L) //начальный хэш массив
val responseCodeCountDStrearn = responseCodeDStrearn.updateStateByKey(updateRunningSurn _) //добавляем единичку
- операции вывода
-- сохранение потока в текстовый Sequence файл
writableipAddressRequestCount.saveAsHadoopFiles[
SequenceFileOutputFormat[Text, LongWritble]]("outputDir", "txt")
-- через перебор всех элементов:
ipAddressRequestCount.foreachRDD { rdd =>
 rdd.foreachPartition { partition =>
  // Открыть соединение с внешней системой (например, с базой данных)
  partition.foreach { item =>
   // Передать элемент через соединение
  }
  // Закрыть соединение
 }
}
- источники данных:
-- потоковое чтение файла:
val logData = ssc.textFileStream(logDirectory)
-- можно читать данные посланные из акторов Akka
-- Apache Kafka
Добавляем в sbt:
spark-streaming-kafka 2.10
подписка на стрим Kafka:
import org.apache.spark.streaming.kafka.
// Создать отображение тем в число потоков выполнения
val topics = List(("pandas", 1), ("logs", 1)) .toMap
val topicLines = KafkaUtils.createStream(ssc, zkQuorum, group, topics) //zkQuorum - данные о хостах из zookeper
StreamingLoginput.processLines(topicLines.map(_._2))
-- Apache Flume - пассивный приемник - инициатор передачи - Flume
val events = FlumeUtils.createStream(ssc, receiverHostname, receiverPort)
нет гарантии доставки
- активный приемник - Spark Streaming вычитывает данные из Flume
данные хранятся, пока Spark не подтвердит, что все забрал
извлекаем данные из промежуточного получателя Flume:
val events = FlumeUtils.createPollingStream(ssc, receiverHostname, receiverPort)
// Предполагается, что событие - это строка в кодировке UTF-8
val lines = events.map{e => new String(e.event.getBody() .array(), "UTF-8"))

Данный конспект создан на основе трех одноименных книг