четверг, 29 ноября 2018 г.

Введение в Scala, параллельную разработку и Apache Spark

В этой статье я хотел бы охватить все аспекты работы с данными на языке Scala - от примитивов языка к параллельному программированию и до анализа данных на кластере Spark.

Введение в Scala

Переменные

val - константа
var - Изменяемая переменная:
var s:String = "test"
- все типы являются классами (нет встроенных типов): Byte, Char, Short, Long, Float, Double, Boolean
- String используется java.lang.String но добавляет дополнительные функции (StringOps, RichInt, RichDouble ...)
"Hello".intersect("World") //== lo

Операторы

- Все операторы, на самом деле методы:
a+b = a.+(b)
- нет инкремента (++) и декремента (--) , только i+=1
- Некоторые функции (типа sqrt) можно использовать без объявления пакета после import
scala.math.sqrt(16) //полная запись
import scala.math._
sqrt(16) //частичная запись
- s(i) - java аналог s.charAt(i)
явный вызов в scala: s.apply(i)
- () - аналог void
- ; - нужна только если в строке несколько выражений
- {} - несколько операторов в блоке, возвращает значение - это будет последнее выражение
var a = {a; b; sqrt(16)} //вернет = 4
- результатом присвоения является () - в java = присваеваемому

Условия

- Условия возвращают значение
val s = if (x > 0) 1 else -1
это удобней, т.к. может быть использовано для инициализации const
аналогом c++ является x > 0 ? 1 : -1

Вывод в консоль

print(f"text=$a1 and ${...}") //расширенный аналог printf в java
, где:
$a - переменная
${} - выражение

Циклы

- while / do - аналогично java
- цикл for работает по принципу foreach - перебора элементов последовательности:
for (i <- 1 to n) {}
1 to n - генерирует последовательность чисел, которая обходится последовательно
последовательность может быть любая - выражение или просто строка
- нет break и continue
- в for может быть несколько итераторов через ; и выражение может содержать условие:
for (i <- 1 to 3; любое выражение ; j <- 1 to 3 if i != j) print(f"${10 * i + j}%3d")
// Выведет 12 13 21 23 31 32
т.е. каждая итерация i запускает цикл j (аналог вложенного цикла с условием) и накладывает фильтр, чтобы в j не было пересечений с i
- циклы могут использоваться для генераций колекций
val coll = for (i <- 1 to 10) yield i % 3 //создание коллекции
//== scala.collection.immutable.IndexedSeq[Int] = Vector(1, 2, 0, 1, 2, 0, 1, 2, 0, 1)

Функции

- Однострочный вариант:
def abs(x: Double) = if (x >= 0) x else -x
- Многострочный:
def myfnc(x: Double) {
 var r = 10
 r
}
- Указание типа обязательно для рекурсии, иначе нет:
def fac(n: Int): Int = if (n <= 0) 1 else n * fac(n - 1)
- Можно использовать return для немедленного выхода из функции
- параметр функции может иметь значение по умолчанию:
def decorate(str: String, left: String = "[", right: String = "]") =
 left + str + right
- передавать параметры можно по имени в любом порядке:
decorate(left = "<<<", str = "Hello", right = ">>>")
- функция с переменным числом параметров:
def sumf(args: Int*) = {
 var result = 0
 for (arg <- args) result += arg
 result
}
- Чтобы передать список как несколько параметров, нужно добавить : _*
sumf(1 to 5: _*)
//res6: Int = 15
- def w = ....
логически похоже на переменную, которая каждый раз переинициализируется при обращении
- функцию можно передать параметром другую функции, тогда она будет вызываться только при обращений к переменной-функции, а не при передаче:
def v(x: => Int)
v(f()) //f() выполнится при первом обращении к x

- Частичные (partial) функции:
partial функция - реализуется чреез pattern matching и может работать только для нужных case:
val divide10: PartialFunction[Int, Int] = {
 case 1 => 10
 case 2 => 5
 case 5 => 2
 case 10 => 1
}
определена ли функция для значения можно проверить через isDefinedAt
divide10.isDefinedAt(2) == true
divide10.isDefinedAt(3) == false
если вызывать для несуществующего значения, то будет исключение:
divide10(3)
//scala.MatchError: 3 (of class java.lang.Integer)
//at scala.PartialFunction$$anon$1.apply(PartialFunction.scala:253)
может применяться в lambda функциях как параметр и будет вызываться только для применимых значений:
List.range(1, 11).collect(divide10)
// List(1, 2,..., 10).collect == List(10, 5, 2, 1)

- Возврат Option значения
обработка ситуации NULL указателя через Option и getOrElse
def divide(x: Int, y: Int): Option[Int] =
 if(y == 0) None else Some(x / y)
 
def showDivide(x: Int, y: Int): String =
 divide(x, y) match {
     case Some(d) => s"$x = $d * $y"
     case None => "null division"
 }
или
def showDivide(x: Int, y: Int): String =
 divide(x, y)
 .map(d => s"$x = $d * $y")
 .getOrElse("null division")
обработка None
divide(7, 0).getOrElse(1)
//или 
divide(7, 0).orElse(divide(7, 2)) 

- Исключающее ИЛИ в параметрах или возвращаемом значении
Either[A, B] - исключающее или
val numOrStr1: Either[Double, String] = Left(2.12) //Left вернет левый тип, Right - правый
применяется как параметр в matching :
def info(numOrStr: Either[Double, String]): String =
 numOrStr match {
 case Left(num) => s"number $num"
 case Right(str) => s"string $str"
}
как возвращаемый параметр - может использоваться как обработку ошибок без Try
"Right - это правильно". Поэтому же map и flatMap применяются к правильной ветви, то есть к правой. И "хороший" результат оборачивают в Right, а ошибку в Left.
def sqrt(x: Double): Either[String, Double] =
 if (x < 0) Left("negative number")
 else Right(Math.sqrt(x))

Процедуры

То же, что функции, но без = после объявления:
def box(s : String) {()}
//или 
def box(s : String):Unit = {()}

Ленивые переменные

lazy val w = .....
Операция для инициализации w вызовется при первом обращении.

Исключения

Аналогично java:
throw new Exception("test") // вызов исключения
- в scala нет проверки на обязательность исключения во время компиляции как в java
так писать не нужно:
void doSomething() throws IOException, InterruptedException, ClassNotFoundException
- перехват exception идет через case:
try {
 process(url)
} catch {
 case _: MalformedURLException => println(s"Bad URL: $url")
 case ex: IOException => ex.printStackTrace()
}
finally {
 //выполнится всегда
 in.close()
}

Массивы

- Фиксированный массив:
// из 10 элементов по умолчанию = 0: 
val nums = new Array[Int](10)
- с начальной инициализацией:
 val s = Array("Hello", "World") //new не используется
- Переменной длины:
Аналог java arraylist:
import scala.collection.mutable.ArrayBuffer
val b = ArrayBuffer[Int]()
- добавить элемент в массив:
b += 1;
- несколько значений (каждое станет значением)
b += (1,2)
b += Array(1,2)
- обход массива:
-- если индекс не нужен:
for(e <- elems) {}
-- если нужен индекс:
for(i <- 0 until arr.length)
until почти то же самое, что to , создается множество -1 значение
- yeld - обход с преобразованием в новый массив (описано в циклах)
- обход и фильтрация массива:
arr.filter(_ % 2 == 0).map (2 * _ )
- arr.indices - список индексов (ключей) массива
- встроенные методы: sum, max, count(_ > 0), sorted (_ < _) //направление сравнения
- arr.mkString(",") - преобразование массива в строку или toString - дополнительно возвращает тип данных

- Многомерный массив:
val matrix = Array.ofDim[Double](3, 4) // Три строки, четыре столбца

Ассоциативные массивы

- с начальными значениями фиксированный:
val scores = Map("Alice" -> 10, "Bob" -> 3, "Cindy" -> 8)
- изменяемый массив:
val scores = new scala.collection.mutable.HashMap[String, Int]
- доступ почти аналогичен java:
val bobsScore = if (scores.contains("Bob")) scores("Bob") else 0
короткий вариант записи:
val bobsScore = scores.getOrElse("Bob", 0) // второй параметр, если не найдено
- изменение:
scores("A") = 10
- добавление, как в обычном массиве: += //если ключ уже есть, то он перетрется
- удалить ключ: -= "Key"
- обход ассоц. массива:
for((k,v) <- map) {}
- чтобы получить только значения: arr.values, ключи: arr.KeySet
- отсортированный массив хранится в виде Красно-чёрного дерева
val scores = scala.collection.immutable.SortedMap("Alice" -> 10, "Fred" -> 7, "Bob" -> 3, "Cindy" -> 8)
Красно-чёрное дерево схоже по структуре с B-деревом с параметром 2, в котором каждый узел может содержать от 1 до 3 значений и, соответственно, от 2 до 4 указателей на потомков
- хуже сбалансировано, чем btree (много уровней дерева из-за маленькой степени = 2)
+ но легче балансировать при удалении:
* ищется меньшее значение справа от удаляемого (или большее слева)
* найденый указатель перемещается в точку удаления
* ссылка от родителя на перемещаемый элементы (пред. пункт) удаляется
Дерево просто балансируется, тогда как btree при удалении элементы он лишь помечается к удалению, из-за чего оно растет в объеме и требует переиодического ребилда.
Дерево медленнй hash массива при вставке и доступе по ключу, но быстрей для поиска по диапазону.

Кортежи (tuples)

val t = (1, 3.14, "Fred")
основные отличия от массивов:
- различные типы в значениях
- переменное число элементов
- обращение к элементу:
t._2 
- замапить кортеж на переменные:
val (first, second, _) = t //_ не нужная переменная
- удобно, если нужно вернуть несколько значений из функции
- преобразование 2 кортежей в массив пар:
val k = Array("<", "-", ">")
val v = Array(2, 10, 2)
val pairs = k.zip(v) //Array(("<", 2), ("-", 10), (">", 2))
pairs.map // преобразует к асс. массиву

Классы

- аналогично java:
class Counter {
 private var value = 0 // Поля должны инициализироваться
 def increment() { value += 1 } // Методы по умолчанию общедоступные
 def current() = value
}
- Геттеры / сеттеры генерируются автоматически
class Counter {
 var value = 0 
 dev value() = value // геттер
 dev value_ //сеттер - сеттер можно переопределить
 // но обращение все также будет по value
 val conts // поле только для чтения
 @BeanProperty var name: String = _ // геттеры и сеттеры не генерятся
 
 def this(name: String) { // Дополнительный конструктор
  this() // Главный конструктор
 ...
- параметры главного конструктора - автоматически генерируют свойства и геттеры/сеттеры к нему (поле становится приватным)
class Person(name: String, age: Int, private var age1: Int)) { //плюс можно регулировать сразу видимостью геттера/сеттера
 def description = s"$name is $age years old"
}
- можно вкладывать классы в классы и функции в функции

Объекты

- нет статичных методов, вместо этого используется object - объект этого типа может иметь только 1 экземпляр (реализует singleton)
object Accounts {
 private var lastNumber = 0
 def newUniqueNumber() = { lastNumber += 1; lastNumber }
}
var num = Accounts.newUniqueNumber()
- Объекты компаньоны - должны определяться в одном файле и имеют доступ к приватным свойствам/методам друг друга
- метод apply - служит для создания и возвращения класса компаньона
class Account private (val id: Int, initialBalance: Double) {
 private var balance = initialBalance
 ...
}
object Account { // Объект-компаньон
 def apply(initialBalance: Double) =
  new Account(newUniqueNumber(), initialBalance)
 ...
}
val acct = Account(1000.0) //без new вызовется apply
Пример применения apply:
- Array(100) - вернет объект-массив с 1 элементом = 100
- new Array(100) - выполнит конструктор и создаст пустой массив из 100 элементов

- Любая программа должна начинаться с main:
object Hello {
 def main(args: Array[String]) {
  println("Hello, World!")
 }
}
Но если унаследовать App, то это можно делать в конструкторе:
object Hello extends App {
 println("Hello, World!")
}

- перечисления - enum
Нет встроенного типа, но есть класс
object TrafficLightColor extends Enumeration {
 val Red, Yellow, Green = Value
}
теперь можно обращаться:
if( TrafficLightColor.value == TrafficLightColor.Red ) println("Stop")

Пакеты и импортирование

- пакеты служат для управления пространствами имен
package impatient {
class Employee
...
для обращения нужно указывать полный путь: impatient.Employee
- 1 пакет может быть определен в нескольких файлах
- имена пакетов могут быть относительными
- альтернатива создать пакет без фигурных скобок:
package people // в начале файла
- в пакете не может быть переменных и функций
для обхода можно использовать объект пакета (1 на пакет)
package object people {
 val defaultName = "John Q. Public"
}
внутри пакета можно обращаться просто по defaultName
снаружи с полным путем, включая имя объекта
- свойство видимое только в пакете people:
private[people] def description = s"A person with name $name"
- импортирование аналогично:
import java.awt._
-- но может располагаться в любом месте и будет видимо до конца блока
-- импорт в 1 строку нескольких пакетов:
import java.awt.{Color, Font}
-- пакет можно переименовать:
import java.util.{HashMap => JavaHashMap}

Наследование

- по умолчанию все как в java:
class Employee extends Person
- final может быть как класс, так и метод (в отличии от java)
- override - для переопределения не абстрактных методов и переменных требуется
- super.method_name - вызов родителя
- protected методы видны в классе, но не видны в пакете
чтобы стали видны нужно писать protected[пакет]
- во второстепенном конструкторе класса можно вызывать только главный конструктор класса
super конструктор может быть вызван только из главного конструктора (сигнатуры описанной при создании class)
- анонимные классы создаваемые 1 раз при определении:
val alien = new Person("Fred") {
 def greeting = "Greetings, Earthling! My name is Fred."
}
- также есть абстрактные классы:
abstract class Person(val name: String)
-- абстрактное свойство задается при создании:
val id: Int
- инициализация переменных до вызова конструктора супер класса
к примеру если в нем инициализация массива использует эту переменную
class Bug extends {
 override val range = 2
} with Creature
в этом случае сначала инициализируется переменная, потом вызывается конструктор
-- также это можно обойти, если обозначить поле как lazy, чтобы все инициализации прошли при первом обращении - наследование классов в scala:
Any <- AnyVal <- Int..Double...
    <- AnyRef <- все классы
-- в any есть isInstanceOf и hashcode
Встроенный хеш-код генерируется лишь один раз для каждого объекта при первом вызове hashCode(), после сохраняется в заголовке объекта для последующих вызовов.
Но для первого раза используется random или Xorshift:
  0 – Park-Miller RNG (по умолчанию)
  1 – f(адрес, глобальное_состояние)
  2 – константа 1
  3 – последовательный счетчик
  4 – адрес объекта
  5 – Thread-local Xorshift (псевдослучайное от threadid)
-- anyval - нет нового
-- anyref добавляет: wait, notify из Object , synchronized
-- класс Null = null
-- Nothing может использоваться для любого значения: [T] == [Nothing]
-- Unit == void в c++ == ()
- equals - сравнивает равенство адресов ссылок объектов, но его можно переопределить:
final override def equals(other: Any) = {
 other.isInstanceOf[Item] && {
  val that = other.asInstanceOf[Item]
  description == that.description && price == that.price
 }
}
-- аналогично переопределяется метод hashCode
- Классы с одной переменной - классы значения
они не создаются каждый раз, а встраиваются в место вызова:
class MilTime(val time: Int) extends AnyVal {
 def minutes = time % 100
 def hours = time / 100
 override def toString = f"$time04d"
}

Файлы и регулярные выражения

- чтение всего файла в буфер:
import scala.io.Source
val source = Source.fromFile("myfile.txt", "UTF-8")
val lineIterator = source.getLines //toArray, mkString
for (l <- lineIterator) //обработка строки
for (c <- source) //обработка посимвольно
-- если файл большой, то используем буферизированное чтение:
val iter = source.buffered
while (iter.hasNext) {
 if (iter.head is nice) //работаетм
- чтение URL
val source1 = Source.fromURL("http://horstmann.com", "UTF-8")
- двоичные файлы читаются как в java
- запись также как в java
- сериализация:
@SerialVersionUID(42L) class Person extends Serializable //SerialVersionUID - установка значения переменной
- выполнение команд командной строки:
import scala.sys.process._
val result = "ls -al /".!!
("ls -al /" #| "grep u").! // если нужно переправить данные между программами
- строки в тройных кавычках не надо экранировать:
val wsnumwsPattern = """\s+[0-9]+\s+""".r
- NIO - новая реализация отображения файлов в память, самый быстрый вариант
-- чтение данных
-- складывание в буфер, который мы читаем
+ эти процессы происходят в параллель (асинхронность), что дает повышенную производительность при чтении, так и при записи (т.к. данные обмениваются через быстрый буфер в памяти)

Трейты (интерфейсы)

- может быть несколько трейтов, но не несколько классов (все как в java)
- может потребовать наличие полей, методов, суперклассов
-- требование наличия суперкласса Exception:
trait LoggedException extends Logged {
 this: Exception =>
  def log() { log(getMessage()) }
}
-- требование наличие метода getMessage:
trait LoggedException extends Logged {
 this: { def getMessage() : String } =>
  def log() { log(getMessage()) }
}
- может быть часть реализации (как у абстрактных классов java) - называется примесью (добавляется часть новой реализации)
trait ConsoleLogger {
 def log(msg: String) { println(msg) }
}
- важен порядок - первые методы в конце списка - методы будут вызываться последовательно с конца или перетираться
trait TimestampLogger extends ConsoleLogger { //пример с расширением трейта
 override def log(msg: String) {
  super.log(s"${java.time.Instant.now()} $msg") //вызывает вышестоящий трейт ConsoleLogger
 }
}
-- но есть возможность обратиться к конкретному трейту:
super[ConsoleLogger].log(...)
- при переопредлении метода трейта не нужно указывать override
- доп трейты наследуются через with
class ConsoleLogger extends Logger with Cloneable with Serializable
- при создании объекта можно подмешать другой класс, который наследует тот же trait
val acct = new SavingsAccount with ConsoleLogger
теперь функция log будет из ConsoleLogger,а не из Log, как по умолчанию в SavingsAccount (т.е. получается как бы наследование на лету)
- трейт может иметь конструктор как обычный класс, но конструктор будет без параметров (единственное техническое отличие от классов!)
-- проблема инициализации переменных в том, что переменные инициализируются до вызова основного конструктора-класса
чтобы обойти, нужно использовать префиксную установку переменной до конструктора: //то же самое было описано выше в классах
val acct = new { 
 val filename = "myapp.log"
} with SavingsAccount with FileLogger
либо использовать lazy переменные
- трейты могут наследоваться от класса, тогда он становится суперклассом для подмешивающего

Операторы

- инфиксные операторы:
замена вызова метода: .to, -> на 1 to 10, 1 -> 10
- опрделение своего оператора как в java:
def *(other: ClassName) = new ClassName(x * other.x, y * other.y)
- f(arg1, arg2, ...) вызовет f.apply(arg1, arg2, ...) , если метода f нет в классе
- f(arg1, arg2, ...) = value - если слева от =, то вызовет update
это используется в массивах:
scores("Bob") = 100
- unapply - Обратная операция - объект разворачивается в переменные:
object Fraction {
 def unapply(input: Fraction) =
  if (input.den == 0) None else Some
использование: val Fraction(a, b) = f;
- динамические методы и переменные:
import scala.language.dynamics
-- динамический вызов транслируется в :
obj.applyDynamic("name")(arg1, arg2, ...) 
//или 
obj.updateDynamic("name")(выражение_справа) //для присвоения
-- person.lastName = "Doe"
транслируется в:
person.updateDynamic("lastName")("Doe")
реализовывать функцию нужно самим
def updateDynamic(field: String)(newValue: String) { ... } 

Функции высшего порядка

- функцию можно положить в переменную:
val fun = ceil _
_ означает необходимость параметра для функции
- метод класса:
val f = (_: String).charAt(_: Int)
переменная: (параметр: тип).функция(параметр: тип)
val f: (String, Int) => Char = _.charAt(_)
переменная: (входные параметры) => выходной параметр = вызов функции с параметрами
- анонимная функция
(x: Double) => 3 * x
можно передавать ее как параметр, без переменных:
Array(3.14, 1.42, 2.0).map((x: Double) => 3 * x)
более короткая запись:
Array(3.14, 1.42, 2.0).map(3 * _)
.reduceLeft(_ * _) //последовательное применение слева направо
- функция, параметром которой является функция:
def funcname(f: (Double) => Double) = f(0.25) // принимается любая функция с параметром = Double
funcname(sqrt _) //пример
- функция возвращает функцию:
def mulBy(factor : Double) = (x : Double) => factor * x
в функцию mulBy положена функция "(x : Double) => factor * x"
val quintuple = mulBy(5)
quintuple(20) // 100
- каррирование - преобразование ф. с 2 параметрами в 2 функции с 1 параметром:
def mulOneAtATime(x: Int) = (y: Int) => x * y
mulOneAtATime(6)(7)
- передача абстрактного кода:
def runInThread(block: => Unit) ///
runInThread { println("Hi"); Thread.sleep(10000); println("Bye") }
- return может быть использован чтобы определить тип возвращаемых данных, если одновременно используется анонимная функция без типа, которая должна посчитать результат

Коллекции

- наследование объектов:
Iterable -> Seq -> IndexedSeq //обычный массив или список (+ список с индексным доступом)
   -> Set -> SortedSet //коллекция значений (без индексов?)
   -> Map -> SortedMap //множество пар (ключ,значение)
- Преобразование типов через: .to*
- все коллекции по умолчанию неизменяемые (Immutable)
т.е. если мы изменяем коллекцию, то создается копия с изменениями
- Vector - immutable расширяемый массив (ArrayBuffer - muttable)
Технически хранится в виде b-дерева, с элементами в узле
- Range - последовательность чисел
Технически хранит: начало, конец и шаг
- на последовательностях сделано: Stack, Queue, Prior Queue, ListBuffer
- списки (List) - есть указатели на начало (.head) , .tail будет указывать на список за первым элементом
val digits = List(4, 2)
каждый элемент списка также список из 1 элемента с функциями начало и конец
-- добавить значение в начало:
9 :: List(4, 2)
- ListBuffer - имеет отдельные ссылки на начало и конец
- Множества - коллекция, где значение может присутствовать 1 раз:
Set(2, 0, 1) // порядок следования не гарантируется
для хранения используется хэш массив
-- LinkedHashSet - использует и хэш массив и связанный список, что сохраняет последовательность
- добавление элемента в неупорядоченную коллекцию: +
добавление в конец: :+ , начало: +:
++ и -- для добавления и удаления группы элементов
+= или ++= создают новую коллекцию добавляя элмент или множество
- map применяет одноместную функцию над каждым элементом создавая новую коллекцию:
names.map(_.toUpperCase)
тоже самое , что
for (n <- names) yield n.toUpperCase //это транслируется в map
-- flatmap - если функция возвращает коллекцию, то элементы развернутся в основную
-- transform - muttable map (без создания копии)
-- foreach - если не надо никуда сохранять, а только вызвать
- reduce - применяет 2х местную функцию и возвращает 1 результат (так последовательно к результату и следующему элементу, пока в конце не останется 1 элеменент)
- список1 zip список2 - объединение 2 списков в 1 список пар значений
-- обращение к элементам пар значений:
 (prices zip quantities) map { p => p._1 * p._2 }
-- zipall - объединение листов разного размера со значением по умолчанию
- iterator используется там, где поместить объект целиком в память слишком дорого
for (elem <- iter) //каждый вызов изменяет итератор
- неизменяемые итератор: #:: - потоки - ленивый вызов следующего в списке
val words = Source.fromFile("/usr/share/dict/words").getLines.toStream

- for yield - более короткая замена для Map/flatmap/withfilter
val nums = List(2, 5, 1, 7, 4)
val nums2 = for {
 x <- nums
 y <- 1 to x if y > 3 //вложенный цикл
 y2 = y * 2
 z <- nums if z < y2
} yield z + y2 - y
это аналог более длинной записи:
val nums2 =
nums.flatMap(x =>
 (1 to x).withFilter(y => y > 3) //вложенный цикл
 .map(y => (y, y * 2))
 .flatMap { case (y, y2) =>
 nums.withFilter(z => z < y)
 .map(z => z + y2 - y)
 })
- можно приводить java объекты к scala коллекциям
import scala.collection.JavaConversions._
val props: scala.collection.mutable.Map[String, String] =
System.getProperties()

Сопоставление с образцом

- аналог switch:
ch = ..
ch match {
 case '+' | '*' => sign = 1
 case '-' => sign = -1
 case _ => sign = 0
}
-- не нужен brake - он есть по умолчанию
-- _ - если не будет, то выкинется исключение при проверке
-- можно перечислять несколько значений через |
-- в case может быть любое логическое условие:
case _ if Character.isDigit(ch) => digit = Character.digit(ch, 10)
-- присвоение идет произвольной переменной за =>
-- сопоставление может идти с типом переменной:
case s: String => Integer.parseInt(s)
- сопоставление массива 2 переменным и всего остального последовательности rest:
val Array(first, second, rest @ _*) = arr
- сопоставление ключа и значения ассоц. массива может быть в цикле:
for ((k, v) <- System.getProperties())
- сопоставление может идти по типу класса:
case Dollar(v) => s"$$$v"
- объект можно скопировать с изменением параметра:
val price = amt.copy(value = 19.95)
- для сопоставления с пустым значением можно использовать тип Some
case Some(score) => println(score) //любое значение
case None => println("No score") //пустое значение (замена "" или NULL)
- сравнение с коллекциями:
def sum(xs: List[Int], start: Int = 0 ) = xs match {
 case List() => start
 case List(x) => start + x
 case List(x, y) => start + x + y
- для сопоставления с образцом класса, нужен специальный case класс:
case class Address(country: String, city: String)
def addressInfo(address: Address): String = address match{
 case Address("Russia", _) => "russian"
 case Address("Japan", _) => "japanese"
 case _ => "no info"
}

Аннотации

Диррективы компилятору
- определение собственной аннотации
class unchecked extends annotation.Annotation
- volatile поле, которое можно менять из разных потоков
jvm выключает оптимизации: помещение в кэше процессора и т.д.
Изменения данных всегда будет видно всем процессам программы, т.к. обновление будет идти через общее хранилище.
@volatile var done = false
- несериализуемое поле
@transient var recentLookups = new HashMap[String, String]
- методы на c++
@native def win32RegKeys(root: Int, path: String): Array[String]
- Контролируемые исключения:
@throws(classOf[IOException]) def read(filename: String) { ... }
//аналог java:
void read(String filename) throws IOException
- генерация геттеров/сеттеров:
@BeanProperty var name : String = _
- если рекурсивный вызов - просто вызов, то он может быть преобразован к циклу:
большая рекурсия может отвалиться по памяти стека:
//плохо: 
if (xs.isEmpty) 0 else xs.head + sum(xs.tail) //значение + рекурсивный вызов не может автоматически преобразоваться к циклу
//хорошо: 
@tailrec def sum2(if (xs.isEmpty) partial else sum2(xs.tail, xs.head + partial)
- @switch - трансформировать swith в таблицу переходов, что оптимальнее if
- @elidable(500) - метод после 500 сборки удалится из скомпилированного кода
- def allDifferent[@specialized(Long, Double) T](x: T, y: T, z: T) = ... - генерация специализированных функций под каждый тип
- @deprecated(message = "Use factorial(n: BigInt) instead") - устаревшая функция
@deprecatedName('sz) - устаревший параметр

Обработка XML

- создание xml переменной:
val elem = <a href="http://scala-lang.org">The <em>Scala</em> language</a>
- обработка элементов:
for (n <- elem.child) обработать n
- доступ к атрибуту:
val url = elem.attributes("href") // вернется список, даже если 1 элемент
- обход всех атрибутов:
for (attr <- elem.attributes)
- вставка переменной в xml:
<ul><li>{items(0)}</li><li>{items(1)}</li></ul>
<ul>{for (i <- items) yield <li>{i}</li>}</ul>
- xpath поиск:
-- поиск непосредственного тега body - любой тег - тег li
doc \ "body" \ "_" \ "li"
-- поиск тега img на любом уровне вложенности
doc \\ "img"
- case <img/> => ... - выберется если img с любым содержанием
- объекты неизменяемые, для изменения атрибута нужно его скопировать:
val list2 = list.copy(label = "ol")
- для трансформации можно использовать класс с case преобразования внутри:
- загрузка xml:
import scala.xml.XML
val root = XML.loadFile("myfile.xml")
- загрузка с сохранением комментариев:
ConstructingParser.fromFile(new File("myfile.xml"), preserveWS = true)
- сохранение xml:
XML.save(writer, root, "UTF-8", false, null)

Обобщенные типы

- обобщенный класс (generic):
class Pair[T, S](val first: T, val second: S)
- если хотим сравнивать:
class Pair[T <: Comparable[T]](val first: T, val second: T)
- если хотим поменять местами, то наоборот T должен быть супертипом:
def replaceFirst[R >: T](newFirst: R) = new Pair(newFirst, second)
- если класс не реализует Comparable, но может привестись к нему (к примеру Int к RichInt):
class Pair[T](val first: T, val second: T)
(implicit ev: T => Comparable[T])
- для работы с массивом обобщенных типов, нужен "ClassTag"
def makePair[T : ClassTag](first: T, second: T) = {
 val r = new Array[T](2); r(0) = first; r(1) = second; r
}
- можно одновременно указывать обе границы:
T <: Upper >: Lower
- обобщенный тип нескольких трейтов:
T <: Comparable[T] with Serializable with Cloneable
- Ограничение типов
T =:= U //равенство типу
T <:< U //подтип
T <%< U //может приводиться к типу

- вариантность и род:
-- ковариантность - тип A производится (может находится только в возвращаемых результатах)
trait Coll[+A]{
 def apply(i: Int): A
}
т.е. +A может быть любым детализирующим подтипом A (doc/cat от Animal)
-- контрвариантность - наоборот - тип только во входных параметрах
trait Printer[-A]{
 def print(a: A): String
}
т.е. -A может быть любым более общим типом A (может принимать Animal , как doc/cat)
но может использоваться и как тип для подтипа в результате:
def prefixed(s: String): Printer[A]
-- вариантность:
все функции варианты:
потребляет A => и возвращает R
trait Function1[-A, +R]
-- род - тип типов
Int, String, List[Int]: T - простой обобщенный тип
List, Vector : T[_] - обобщенный тип, который принимает параметр подтипа
Map, Function : T[_, _]
пример:
case class IntContainer[F[_]](value: F[Int]) 
контейнер для int - F может быть List, Vector
также можно накладывать ограничения:
case class Dict[K, V, T[X] <: Seq[X]](items: T[(K, V)])
T[X] - контейнер Xов должен быть подтипом Seq[X]
-- если хотим использовать ковариантый результат в параметре, то нужно определить подтип от типа:
B >: ковариантый тип

- псевдоним для типа:
type IntList = List[Int]
и обобщенного:
type DenseMatrix[A] = Vector[Vector[A]]
также можно указывать ограничения:
type Matrix[F[X] <: Iterable[X], A] = F[F[A]]
type IntMap[A] = Map[Int, A]
type DenseMatrix[A] = Matrix[IntMap, A]
типы компоненты - псевдонимы без определения:
trait Item {
 type Key
 type Value
 def key: Key
 def value: Value
}
компонентный тип с уточнением:
trait Container {
 type Item
 def values: List[Item]
}
val ints: Container {type Item = Int} =
 new Container {
 type Item = Int
 val values = List(1, 2, 3)
 }
val xs: List[Int] = ints.values // List[Int]

Дополнительные типы

- составление цепочек вызовов:
article.setTitle("Whatever Floats Your Boat").setAuthor("Cay Horstmann")
для того, чтобы она работала и для наследников базового класса нужно указывать "this.type":
def setTitle(title: String): this.type = { ...; this }
- псевдоним для длинного типа:
type Index = HashMap[String, (Int, Int)]
- необходимость передачи типа данных с методом "append"
def appendLines(target: { def append(str: String): Any },
- внедрение зависимостей:
//1. трейт:
trait Logger { def log(msg: String) }
//2. 2 реализации с выводом на экран и в файл
//3. аутентификация с поддержкой логирования
trait Auth {
 this: Logger => //трейт может подмешиваться в классы , наследующий Logger
  def login(id: String, password: String): Boolean
}
//4. трейт приложения должен подмешиваться в класс с логированием и аутентификацией
trait App {
 this: Logger with Auth =>
 ...
}
//5. приложение собирается с нужными классами для логирования:
object MyApp extends App with FileLogger("test.log") with MockAuth("users.txt")
- абстрактный тип:
trait Reader {
 type Contents
// определение абстрактного типа:
class StringReader extends Reader {
 type Contents = String
- то же самое через обобщенный тип:
trait Reader[C] {
class StringReader extends Reader[String]

Неявные преобразования

- функции с неявными преобразованием параметров Int к Fraction
implicit def int2Fraction(n: Int) = Fraction(n, 1)
val result = 3 * Fraction(4, 5) // Вызовет int2Fraction(3) * Fraction(4, 5)
- добавление своей функции в стандартный класс:
-- добавляем функцию read
class RichFile(val from: File) {
 def read = Source.fromFile(from.getPath).mkString
}
-- неявное преобразование класса File к RichFile:
implicit def file2RichFile(from: File) = new RichFile(from)
-- вместо функции можно использовать класс:
implicit class RichFile(val from: File) { ... }
дальше обычно создаем объект File
помещаем функции в файл
и обязательно импортируем без префикса:
import com.horstmann.impatient.FractionConversions._
чтобы минимизировать число неявных преобразований, вызов можно делать внутри кода класса
- необязательные параметры:
def quote(what: String)(implicit delims: Delimiters) =
//явный параметр:
quote("Bonjour le monde")(Delimiters(""", """)) 
можно заранее объявить неявный разделитель:
implicit val quoteDelimiters = Delimiters(""", """)


Конкурентное программирование Scala

Реализация многопоточности в JVM

- поток в jvm это обертка над потоком ОС (в отличии от python)
- java-style поток:
object ThreadsCreation extends App {
 class MyThread extends Thread {
  override def run(): Unit = {
   println("New thread running.")
  }
 }
 val t = new MyThread
 t.start()
 t.join() //основной поток приостанавливается до окончания работы T
 println("New thread joined.")
}
- простой способ вернуть данные из потока - это объявить глобальную переменную, но использовать ее после join
- объявление синхронизированной функции:
def getUniqueId() = this.synchronized {}
- самодельный пул потоков:
-- пул ожидающий работы (wait):
while (tasks.isEmpty) tasks.wait()
tasks.dequeue()
-- помещение программы в пул и нотификация, что нужно перестать делать wait
tasks.enqueue((
-- остановить пул при установке некого флага:
Worker.interrupt()

Особенности сборки мусора в jvm
- очень плохо относится к swap - даже если 1 поток ушел туда, т.к. виртуальная машина опрашивает всех потоки
- забрав ОЗУ, обратно не отдается
- если heap закончилось, то запускается сборщик мусора с попыткой высвободить память
Адаптивный сборщик мусора:
- Во время сбора мусора работа программы останавливается.
- Сборщик прослеживает связи от стека/статической памяти до кучи на наличие ссылок
-- Если в памяти много мелких объектов и они лежат достаточно плотно, то все активные объекты переносятся из одной кучи в другу и плотно упаковываются.
Оставшиеся объекты в старой куче уничтожаются. Этот процесс может происходит как целиком на всех данных или кусочками.
-- Если объекты меняются редко или данные сильно разбросаны / фрагментированы, то данные не переносятся. Объекты без ссылок в куче помечаются к удалению и вычищаются
В среднем этот метод медленней, но в случае сильной фрагментации он лучше.

Атомарные изменчивые переменные

@volatile var found = false
такие переменные изменяются за 1 такт, поэтому не могут меняться одновременно в нескольких потоках
на уровне процессора это операция: compareAndSet - установить значение, если compare выполнилось удачно
полный список описан тут: java.util.concurrent.atomic: AtomicBoolean, AtomicInteger, AtomicLong и AtomicReference
Пример использования:
private val uid = new AtomicLong(0L)
def getUniqueId(): Long = uid.incrementAndGet() //увеличить счетчик и получить значение
getAndSet //получить значение и установить новое
Аналог через synchronized:
def compareAndSet(ov: Long, nv: Long): Boolean =
this.synchronized {
 if (this.get == ov) false else {
  this.set(nv)
  true
 }
}
- lazy переменные инициируются 1 раз при первом обращении
для проверки инициализации используются дополнительное атомарное поле-флаг
- muttable объекты нужно использовать с блокировками
- шаблон producer / consumer
между потребителем и производителем находится конкурентная очередь с данными:
- BlockingQueue - реализация в Scala: add/remove/element - с исключениями (poll/ofer/peak - со специальным значение) (take, put - блокирующие)
-- ArrayBlockingQueue - очередь фиксированного размера
-- LinkedBlockingQueue - неограниченная очередь (когда производители гарантированно работают быстрее потребителей)
- Конкурентные объекты:
val files: concurrent.Map[String, Entry] = new concurrent.TrieMap() // хэш массив с блокировками
-- concurrent.TrieMap() - позволяет обходить массив по его копии, что не блокирует его самого и не дает получить изменяемые данные

Параллельные коллекции данных

- параллельный запуск обработки колекции = вызов функции par
coll.par.count(_ % 2 == 0)
-- можно распараллелить цикл:
for (i <- (0 until 100000).par) print(s" $i")
-- преобразование параллельной коллекции в последовательную:
coll.par.filter(p).seq
-- reduce может применяться над параллельными коллекциями, если выполняется a op b == b op a ( к примеру сложение просто парралелиться, но не вычитание, т.к. оно зависит от порядка )
взамен reduceLeft в параллельных программах лучше использовать aggregate - каждый поток будет иметь свой аккумулятор вместо одного общего
--- Установка уровня параллельности для блока:
def withParallelism[A](n : Int)(block : => A) : A = {
  import collection.parallel.ForkJoinTasks.defaultForkJoinPool._
  val defaultParLevel = getParallelism
  setParallelism(n)
  val ret = block
  setParallelism(defaultParLevel)
  ret
}
withParallelism(2) {
  (1 to 100).par.map(_ * 2)
}
--- Установка параллельности для оператора:
val fjpool = new ForkJoinPool(2)
val customTaskSupport = new parallel.ForkJoinTaskSupport(fjpool)
coll.par.tasksupport = customTaskSupport
- для преобразования последовательной коллекции к параллельной используются сплиттеры (надстройка над итератором)
-- линейноссылочные структуры List/Stream не имеют эффективного алгоритма Split
при вызове par у этих структур они сначала преобразуются к обычным массивам
- Пример создания своей параллельной коллекции (в данном случае строки):
class ParString(val str: String) extends immutable.ParSeq[Char] { //immutable.ParSeq[Char] - последовательность Char
 def apply(i: Int) = str.charAt(i)
 def length = str.length
 def splitter = new ParStringSplitter(str, 0, str.length) //в последовательных коллекциях тут был бы итератор
        //в данном случае сплиттер должен возвращать часть строки
 def seq = new collection.immutable.WrappedString(str) //преобразование параллельной коллекции в последовательную
}
сплиттер должен быть отнаследован от трейта:
trait IterableSplitter[T] extends Iterator[T] {
 def dup: IterableSplitter[T] //заглушка - создание копии сплиттера
 def remaining: Int //число элементов в сплиттере
 def split: Seq[IterableSplitter[T]] //разделение сплиттера на подсплиты
}
- необязательные функции:
psplit - ручное задание размеров для сплита
- пример сплита на 2 части:
def split = {
 val rem = remaining
 if (rem >= 2) psplit(rem / 2, rem - rem / 2) //вызываем psplit 2 раза
 else Seq(this)
}
- psplit создает сплиттер с половиной строки:
def psplit(sizes: Int*): Seq[ParStringSplitter] = {
 val ss = for (sz <- sizes) yield {
  val nlimit = (i + sz) min limit
  val ps = new ParStringSplitter(s, i, nlimit) //нет копирования, просто передача индексов по строке
  i = nlimit
  ps
 }
 if (i == limit) ss
 else ss :+ new ParStringSplitter(s, i, limit)
}
- Комбинаторы - слияние сплитов обратно при использовании методов трансформации: map, filter groupBy

Объекты Future

- создание отдельного потока из пула потоков:
import java.time._
import scala.concurrent._
import ExecutionContext.Implicits.global //пул потоков
val f = Future {
 Thread.sleep(10000)
 println(s"This is the future at ${LocalTime.now}") //отдельный поток
 42
}
val result = Await.result(f, 10.seconds) // ожидание завершение потока с блокировкой основной программы
      // 2ой параметр - это таймаут ожидания потока, если превысит, то будет TimeoutException.
-- другой вариант ожидания:
Await.ready(f, 10.seconds)
val Some(t) = f.value
-- альтернативный вариант без блокировки:
по результату работы асинхронно вызовется функция:
f.onComplete {
 case Success(v) => println(s"The answer is $v")
 case Failure(ex) => println(ex.getMessage)
}
println(s"This is the present at ${LocalTime.now}")
-- объект Future должен возвращать результат или генерировать exception в случае ошибки

- размер пула по умолчанию = числу ядер
-- если нужно другое значение, то нужно создать свой пул потоков:
val pool = Executors.newCachedThreadPool()
implicit val ec = ExecutionContext.fromExecutor(pool)
- чтобы перехватить иключение, можно использовать объект Try:
val result = Try(str.toInt)
// обработка результата:
if (t.isSuccess) println(s"The answer is ${t.get}")
if (t.isFailure) println(t.failed.get.getMessage)
- если нужно выполнить несколько потоков парраллельно, но данные нужны из обоих, то
//выполняем их параллельно:
val future1 = Future { getData1() }
val future2 = Future { getData2() }
// а результат ожидаем в последовательном oncomplete
future1 onComplete {
 case Success(n1) =>
 future2 onComplete {
  case Success(n2) => {
   val n = n1 + n2
   println(s"Result: $n")
  }
  case Failure(ex) => ...
 }
 case Failure(ex) => ...
}
- еще вариант параллельной обработки нескольких future через map:
val combined = for (n1 <- future1; n2 <- future2) yield n1 + n2 //т.к. Future имеет метод map, позволяющий работать с ним как с массивом
- или через async:
val combined = async { await(future1) + await(future2) }
- если функции нужно вызывать последовательно, то это future блоки надо определить как функции, а не переменные как ранее
def future1 = Future { getData() }
def future2 = Future { getMoreData() } // def, не val
val combined = for (n1 <- future1; n2 <- future2) yield n1 + n2 //сначала вызовется функция future1, потом future2
- обработка исключения SQLException внутри Future:
val f = Future { persist(data) } recover { case e: SQLException => 0 }
- разделение данных на части и их параллельная обработка:
val futures = parts.map(p => Future { вычислить результат в p })
//получить коллекция результатов, по окончанию всех потоков:
val result = Future.sequence(futures); //функция не блокирует программу, так что изначально тут просто ссылки на объекты без результатов
//сложение результатов, по мере их готовности:
val result = Future.reduceLeft(futures)(_ + _)
//если нужно получить первый выполнившийся поток:
Future[T] result = Future.firstCompletedOf(futures) // но другие задания продолжат работу!

- Через Promise можно вернуть несколько значений из потока:
val p1 = Promise[Int]()
val p2 = Promise[Int]()
Future {
 val n1 = getData1()
 p1.success(n1)
 val n2 = getData2()
 p2.success(n2)
}
// получение результата из первого выполнившегося потока:
val p = Promise[Int]()
Future {
 var n = workHard(arg)
 p.trySuccess(n)
}
Future {
 var n = workSmart(arg)
 p.trySuccess(n)
}

Реактивное программирование

подписка на изменения объекта
- для использования нужно подключить стороннюю библиотеку:
com.netflix.rxjava
- подписка на изменения 3 строк - 2 разных подписчика (в Future только 1 onComplete)
val o = Observable.items("Pascal", "Java", "Scala") //синхронный вызов, через timer - асинхронный
o.subscribe(name => log(s"learned the $name language"))
o.subscribe(name => log(s"forgot the $name language"))
- пример - мониторинг изменения файлов:
def modified(directory: String): Observable[String] = {
 Observable.create { observer =>
  val fileMonitor = new FileAlterationMonitor(1000) //генерация событий каждые 1000мс
  val fileObs = new FileAlterationObserver(directory)
  val fileLis = new FileAlterationListenerAdaptor {
   override def onFileChange(file: java.io.File) {
    observer.onNext(file.getName)
   }
  }
  fileObs.addListener(fileLis)
  fileMonitor.addObserver(fileObs)
  fileMonitor.start()
  Subscription { fileMonitor.stop() }
 }
}
подписка на мониторинг:
val sub = modified(".").subscribe(n => log(s"$n modified!"))

Программная транзакционная память

- в основе лежит операция atomic
для работы нужна библиотека org.scala-stm
import scala.concurrent.stm._
def swap() = atomic {
атомарные операции не подвержены взаимоблокировкам, из-за этого они используются в оптимистических блокировках:
-- перед началом запоминается состояние памяти
-- если оказывается при записи, что область была изменена, то транзакция рестартуется
- для применения оптимистичной блокировки над переменной, нужно использовать ссылочный тип:
val urls = Ref[List[String]](Nil)
val clen = Ref(0)
- минус оптимистических блокировок:
-- рестарт транзакции, так что в транзакции нельзя вызывать неоткатываемые транзакции - к примеру http запрос
-- дополнительная нагрузка из-за рестарта
- для 1 минуса - "повторный вызов неоткатываемого запроса", можно использовать событие - aftercommit:
def inc() = atomic { implicit txn =>
 val valueAtStart = myValue()
 Txn.afterCommit { _ => //вызов неоткатываемого действия после удачного вызова
  log(s"Incrementing $valueAtStart")
 }
 Txn.afterRollback { _ => //после неудачного - оно будет рестартовано?
  log(s"rollin’ back") //тут можно сделать чтото, чтобы избежать повторного исполнения
 }
 myValue() = myValue() + 1
}
- в случае исключения внутри atomic - она целиком откатывается
- повторение транзакции, но с таймаутом в 1с:
Future {
 blocking {
  atomic.withRetryTimeout(1000) { implicit txn =>
   if (message() != "") log(s"got a message - ${message()}")
   else retry
  }
 }
}
- бесконечный повтор с интервалом в 1с:
Future {
 blocking {
  atomic { implicit txn =>
   if (message() == "") {
    retryFor(1000)
    log(s"no message.")
   } else log(s"got a message - ‘${message()}’")
  }
 }
}
- для локальных переменных нужно использовать TxnLocal - она индивидуальна в каждой транзакции
val myLog = TxnLocal("")
- транзакционный массив:
val website: TArray[String] = TArray(pages)
- транзакционный словарь:
val alphabet = TMap("a" -> 1, "B" -> 2, "C" -> 3)
-- словарь имеет возможность получить неизменяемый снапшот:
val snap = alphabet.single.snapshot

Акторы

взаимодействие независимых программ посредством сообщений (модель producer / consumer )
- Общие данные хранятся в памяти актора. Новые сообщения поступают через очередь, и обрабатываются внутри последовательно.
Акторы могут даже находится на разных компах, логика не зависит от их положения
- для работы нужен фреймворк Akka
- Первое: нужно объявить шаблон-класс акторов, который описывает его поведение и порядок приема сообщений
на основе этого шаблона будут создаваться акторы:
import akka.actor._
import akka.event.Logging
class HelloActor(val hello: String) extends Actor {
 val log = Logging(context.system, this)
 def receive = {
  case 'hello' =>
   log.info(s"Received a '$hello'... $hello!")
  case msg =>
   log.info(s"Unexpected message ‘$msg’")
  context.stop(self) //остановка актора
 }
}
- создание актора по шаблону:
def props(hello: String) = Props(new HelloActor(hello))
- отсылка сообщений в акторы:
отправка происходит в очередь, не дожидаясь окончания обработки
val deafActor: ActorRef = ourSystem.actorOf(Props[DeafActor], name = "deafy")
deafActor ! "hi" // ! - оператор для отправки сообщения
deafActor ! 1234
- поведение и состояние акторов. Актор со счетчиком:
работает как конечный автомат
при достижении 0 , актор больше не принимает receive - полностью переключается на done
class CountdownActor extends Actor {
 val log = Logging(context.system, this)
 var n = 10
 def counting: Actor.Receive = {
  case "count" =>
  n -= 1
  log.info(s"n = $n")
  if (n == 0) context.become(done)
 }
 def done = PartialFunction.empty
 def receive = counting
}
-- переключать во время работы можно на любую функцию, используя:
context.become(function)
- Иерархия акторов
-- Дочерний актор:
class ChildActor extends Actor {
 val log = Logging(context.system, this)
 def receive = {
  case "sayhi" =>
   val parent = context.parent
   log.info(s"my parent $parent made me say hi!")
 }
-- Родительский вызывает дочерний:
class ParentActor extends Actor {
val log = Logging(context.system, this)
 def receive = {
  case "create" =>
   context.actorOf(Props[ChildActor])
   log.info(s"created a kid; children = ${context.children}")
  case "sayhi" =>
   log.info("Kids, say hi!")
   for (c <- context.children) c ! "sayhi"
-- получить путь в иерархии акторов:
context.actorSelection(path)
-- в случае исключения в акторе, вышестоящий его перезапускает
- события в акторе:
-- preStart - перед началом обработки -- preRestart - перед рестартом в случае ошибки -> еще раз вызывает postStop
-- postRestart - после рестарта, перед обработкой -> еще раз вызывает preStart
-- postStop - окончание обработки
пример использования:
override def preStart(): Unit = log.info(s"printer preStart.")
- отправка данных в актор с ожиданием ответа
class Pongy extends Actor {
 val log = Logging(context.system, this)
 def receive = {
  case "ping" =>
   log.info("Got a ping -- ponging back!")
   sender ! "pong" //после обработки, отправляем сендеру ответ
   context.stop(self) //и останавливаем актора
 }
сендер отправляет событие и ждет ответа:
class Pingy extends Actor {
 val log = Logging(context.system, this)
 def receive = {
  case pongyRef: ActorRef =>
   val f = pongyRef ? "ping"
   f pipeTo sender //получение ответа
 }
}
- Диспетчеризация акторов:
Дочерние акторы можно:
-- перезапустить - restart (стратегия по умолчанию)
-- возобновить без перезапуска - Resume
-- остановить - stop
-- распространить ошибку на все дочерние акторы - Escalate
override val supervisorStrategy =
 OneForOneStrategy() {
  case ake: ActorKilledException => Restart //в случае кила, будет рестарт
  case _ => Escalate //если один из акторов потерпел неудачу, она распространяется на всех
- Удаленное взаимодействие акторов:
-- настраиваем конфиг взаимодействия:
def remotingConfig(port: Int) = ConfigFactory.parseString(s""
akka {
 actor.provider = "akka.remote.RemoteActorRefProvider"
 remote {
  enabled-transports = ["akka.remote.netty.tcp"]
  netty.tcp {
   hostname = "127.0.0.1"
   port = $port
  }
 }
}
-- создаем актор:
val system = remotingSystem("PongyDimension", 24321)
val pongy = system.actorOf(Props[Pongy], "pongy")
-- получаем ссылку на удаленный актор "ActorIdentity", после посылаем сообщения:
def receive = {
 case "start" =>
 val pongySys = "akka.tcp://PongyDimension@127.0.0.1:24321"
 val pongyPath = "/user/pongy"
 val url = pongySys + pongyPath
 val selection = context.actorSelection(url)
 selection ! Identify(0)
 case ActorIdentity(0, Some(ref)) =>
  pingy ! ref
 case ActorIdentity(0, None) =>
  log.info("Something’s wrong - ain’t no pongy anywhere!")
  context.stop(self)
 case "pong" =>
  log.info("got a pong from another dimension.")
  context.stop(self)
- Минусы:
-- Нельзя посылать один и тот же тип сообщения из разных receiver. Для разграничения нужно будет включать уникальный ключ.
-- сложно настроить ожидание определенной цепочки событий (послать в 2 актора и ждать ответа)

Реакторы

расширение модели акторов (решает проблемы минусов акторов)
- подключение:
io.reactors
import io.reactors._
- создание шаблона реактора принимающего только строки
val welcomeReactor = Reactor[String] { self =>
 self.main.events onEvent { name =>
  println(s"Welcome, $name!")
  self.main.seal()
 }
}
создание реактора по шаблону
val system = ReactorSystem.default("test-system")
val ch = system.spawn(welcomeReactor)
передача события в реактор
ch ! "Alan"
реакторы получают на входе события, вместо сообщений в акторах


Молниеносный анализ Spark

Hive

Для начала небольшое отступление про HIVE.

- HIVE STREAMING:
hive запрос может вызывать скрипты для трансформации во время своей работы:
ADD FILE /path/to/mapper.py;
ADD FILE /path/to/reducer.py;
FROM (
 FROM wikipedia_sample
 SELECT TRANSFORM (line)
 USING "./mapper.py" AS word, counts
 DISTRIBUTE BY word SORT BY word --чтобы сгуппировать и отсортировать данные слов по нодам (одни слова на одной ноде)
) word_pairs
SELECT TRANSFORM (word_pairs.word, word_pairs.counts)
USING "./reducer.py"
AS word, counts

- HIVE optimization:
-- distribution создано для удобного сэмплирования
также если 2 таблицы одинаково distributed, то join будет идти внутри каждого бакета (уменьшение shuffling)
а sort облегчит group by и join (ну нужно будет делать merge sort)
-- data skew -
create table TABLE ...
SKEWED BY (user_id) ON ("unknown", "1")
запрос будет разбит на 2 части:
select * from T where user_id = 1
и
select * from T where user_id <> 1
Либо нужно устанять перкосы вручную - генерируя уникальные ключи из неуникальных перекосов:
DISTRIBUTE BY (
 hash(user_id)
 + IF(user_id IS NULL, my_salt_UDF(), 0)
 )
 

Введение в Spark

Общее описание архитектуры Spark описано тут: /search/label/bigdata#spark
var lines = sc.textFile("/apps/hive/warehouse/stg/pos_rec_hdr__imp/part-m-00000")
lines.count() //число строк
lines.first() //первая строка
lines.filter(line => line.contains("20181121")).count() //фильтрация
- Программа на Spark состоит из частей:
--программа драйвер - запускает операции на кластере
-- переменная sc - SparkContext - связь с вычислительным кластером
+ переданные функции выполняются на кластере
- запуск spark shell
spark-shell --master yarn --driver-memory 8G --executor-memory 1G --num-executors 8
- подключение к Spark из Scala:
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/apps/hive/warehouse/stg/pos_rec_hdr__imp/part-m-00000"
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("20181122")).count()
    val numBs = logData.filter(line => line.contains("20181123")).count()
    println(s"Lines with 20181122: $numAs, Lines with 20181123: $numBs")
    spark.stop()
  }
}
- подсчет числа строк в файле:
val file = sc.textFile("/apps/hive/warehouse/tmp/imp__271996/hadoop_imp_807272.csv") //val - т.к. все RDD неизменяемые
file.flatMap(line => line.split("\\|")) //сплитим строку в массив, flatMap - чтобы раскрыть элементы массива строки в общий массив всего файла
.map(word => (word, 1)) //помещаем каждый элемент массива в хэш массив с числом в значении = 1
.reduceByKey{_ + _} //сворачиваем одинаковые ключи, суммируя значения (можно также countByKey)
.map(item => item.swap) //меняем ключ со значением местами
.sortByKey(true, 1) //сортируем по ключу, где у нас значение после разворота (true - для прямой сортировки)
.map(item => item.swap) //обратно меняем 
.saveAsTextFile("/apps/hive/warehouse/tmp/words/") //сохраняем в файл
- install sbt for ubuntu
echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823
sudo apt-get update
sudo apt-get install sbt
- sbt конфиг для сборки:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
- сборка jar:
export JAVA_HOME=/usr/jdk64/jdk1.8.0_112/
export PATH=$PATH:/usr/jdk64/jdk1.8.0_112/bin/
sbt package
- последующий запуск:
spark-submit \
--class "SimpleApp" \
--master "yarn" \
target/scala-2.11/simple-project_2.11-1.0.jar
-- master = local[N] - локальный запуск на текущий машине с N ядрами
-- доп параметры:
--jars depl.jar,dep2.jar,dep3.jar \ -- добавить jar к выполнению
--total-executor-cores 300 \ -- число executor
--executor-memory 10g \ --памяти под процесс
--files - список файлов для размещения в каталоге выполнения (файлы с доп. данными)
-- большинство этих параметров можно задать из кода при создании Spark Context: SparkSession.builder

RDD

- виды операций:
-- narrow (слабые) преобразования - создание нового RDD на основе текущего с выполнением некоторых операций
все narrow операции выполняются в 1 stage, т.к. данные не надо никуда перекачивать (обработка на месте)
-- wide (сильная) действия - вычисления результата, данные 1 блока RDD переносятся в несколько других блоков (reduce, order, group, join)
wide операция создает новый stage в плане, т.к. требует shuffling - сохранение промежуточных данных и пересылку на другие сервера (партиции)
выполнение преобразований откладывается и оптимизируется до первого действия
(т.е. нагляднее сделать несколько map/filter, чем 1 громоздкий)
- если RDD сохранили в переменную, то при каждом вызове действия над ней все преобразования будут вычисляться заново
.persist / .cache - если нужно сохранить результат преобразований
-- можно задать место хранения, к примеру: (StorageLevel.DISK_ONLY) / MEMORY_ONLY/ SER - с сериализацией объекта
-- если задали MEMORY_ONLY, но после RDD был вытеснен, то при последующем обращении к нему ему нужно выполниться заново
если такая вероятность есть, то лучше сохранять с MEMORY_AND_DISK - тогда при повторном обращении блок будет сохранен и загружен с диска
-- unpersist - выгрузить переменную из памяти
- создание RDD
-- из массива:
val lines = sc.parallelize(List("pandas", "i like pandas"))
-- из текстового файла:
val lines = sc.textFile("/path/to/README.md")
- сохранение RDD
-- .collect - в локальный массив или take(N элементов)/takeSample
-- saveAsSequenceFile("path",Some(classOf[GzipCodec]) - в 1 текстовый файл
saveAsTextFile - в несколько файлов
- передача собственных функций
-- функция должна поддерживать сериализацию
def isMatch(s: String): Boolean = {
 s.contains("123")
}
file.filter(isMatch).first()
- основные операции:
-- input.map(x => х * х) - выполняется над каждым элементом и создает новый RDD с тем же числом преобразованных элементов
-- flatMap - то же самое, но разворачивает элементы подмассивов в элементы 1 массива
-- поддерживаются операции над множествами: distinct / union / intersection / substract (oracle minus) / cartesian
-- sample - выборка случайных элементов
-- foreach
- основные действия:
-- reduce / fold - принимают 2 элемента и возвращает 1 элемент того же типа
-- aggregate - похоже на reduce, но используется, если нужно вернуть значение другого типа
val result = input.aggregate ( (0, 0)) (
(асс, value) => (асс._1 + value, асс._2 + 1), //считаем сумму и кол-во одновременно
(accl, асс2) => (accl._1 + асс2._1, accl._2 + асс2._2))
val avg = result._1 / result. 2.toDoule
-- countByValue - частота каждого элемента в массиве

Spark SQL (DataFrame)

- DataFrame - обязательно имеет схему (аналог view над данными) - тип данных: org.apache.spark.sql.DataFrame
- RDD можно преобразовать в DF, если указать схему (CreateDataFrame( RDD, schema) )
- код DF выполняется на стороне кластера инструкциями Scala (optimizer генерирует код), что в 3 раза быстрей, чем фильтровать на python RDD (и код нужно писать самому)
- для работы с SQL нужно в sbt добавить зависимости:
groupid = org.apache.spark
artifactid = spark-hive_2.10
version = 1.2.0
- над полученным DataFrame допустимы все RDD операции: count/filter/map/...
- создать временную таблицу на основе DataFrame. Таблицу можно будет исползовать в HIVE SQL только в этом же контексте
rows.registerTempTable("tempTable") 
- набор хранимых типов ограничен: int / double / timestamp / array
- кэширование tableName в памяти spark
val c = hiveCtx.cacheTable("rdw.bi0_pmaterial") 
- кэширование таблицы через SQL
val c = hiveCtx.sql("CACHE TABLE rdw.bi0_pplant") 
- создание таблицы на основе RDD:
case class HappyPerson(handle: String, favouriteBeverage: String) // описание структуры
// Создать набор персон и превратить его в SchemaRDD
val happyPeopleRDD = sc.parallelize(List(HappyPerson("holden", "coffee"))) //обычный RDD
// Обратите внимание: здесь действует неявное преобразование,
// эквивалентное вызову sqlCtx.createSchemaRDD(happyPeopleRDD)
happyPeopleRDD.registerTempTable("happy_people")
- кастомные функции для Spark SQL:
registerFunction("strLenScala", (_: String) .length)
val tweetLength = hiveCtx.sql("SELECT strLenScala('tweet') FROM tweets LIMIT 10")
- window func:
f.count(*).over(Window.partitionBy("ip"))
- компиляция запроса в java байт код
conf.set("spark.sql.codegen", "true") 

Работа с парами ключ-значение

- создание массива, где ключ - первое слово, значение = сама строка:
val pairs = lines.map(x => (x.split(" ")(0), х))
- фильтр по значению:
pairs.filter{case (key, value) => value.length < 20}
- вычисление среднего по ключам:
rdd.mapValues(x => (х, 1)) .reduceByKey(
(х, у) => (x._l + у._1, х._2 + у._2))
- combineByKey - объединение значений по ключу
пример вычисления среднего:
val result = input.combineByKey(
(v) => (v, 1), //createCombiner - если ключа не было - создаем (число значения, 1)
(асс: (Int, Int), v) => (асс._1 + v, асс._2 + 1), //mergeValues - если ключ есть в этом блоке, то - (число значние + новое значние, число потовторов + 1)
(accl: (Int, Int), асс2: (Int, Int)) => (accl._1 + асс2._1, accl._2 + асс2._2) //mergeCombiners - объединяем потоки
) .map{ case (key, value) => (key, value._1 / value._2.toFloat) //проходимся по полученному массиву: значение / число повторов
result.collectAsMap() .map(println(_)) //collectAsMap - преобразование RDD в локальный ассоциативный массив + выводим на экран
- настройка уровня параллелизма:
sc.parallelize(data) .reduceByKey((x, у) => х + у, число потоков)
- repartition - перераспределение данных в кластере
- coalesce - облегченный вариант repartition с минимизацией передачи данных по сети
операция уменьшает число блоков, склеивая мелкие в более большие на 1 ноде
-- rdd.partitions.size () - чтобы узнать число блоков для репартиций
- группировка по ключу - значения становятся массивом:
groupByKey - [К, Iterable [V]]
-- расширенный вариант этой функции - groupBy
- группировка 2 RDD по 1 ключу (заготова для распредленного hash join)
cogroup () -> RDD [ (К, ( Iterable [V], Iterable [W]))]
- соединения:
-- storeAddress.join(storeRating) - inner join по ключу: вернется массив из значений левой и правой таблицы
-- аналогично есть left/right/cross join
- переопределение функции сортировки:
-- пример - сортировка чисел как строк:
implicit val sortintegersByString = new Ordering[Int] {
 override def compare(a: Int, b: Int) = a.toString.compare(b.toString)
}
rdd.sortByKey ()
- доп. действия связанные только с kv rdd:
-- countByKey - число элементов для каждого ключа
-- lookup(key) - вывести массив всех элементов ключа key

Управление распределением данных

- перед работой нужно импортировать:
import org.apache.spark.HashPartitioner
- можно распределить данные по ключу при создании, если после это распределение ключа будет использоваться несколько раз:
val userData = sc.sequenceFile[UserID, Userinfo] ("hdfs:// ... ")
.partitionBy(new HashPartioner(100)) // распределяем файл в кластере по 100 хэш партициям
. persist()
-- также 100 означает, что столько параллельных заданий будет обрабатывать будущее соединение с этой таблицей
-- map - создает новый RDD, который не наследует принцип партицирования
- определение метода партицирования:
-- pairs.partitioner - посмотреть тип партицирования у RDD
-- хэш партицирование
val partitioned = pairs.partitionBy(new spark.HashPartioner(2))
-- range партицирование - разделение RDD на 10 равных последовательных частей
counts.partitionBy(new RangePartitioner(10,counts))
range партицирование происходит при сортировке, чтобы данные были по порядку, а не случайно, как в hash
для передачи данных используется Serializer (java - default / Kryo - более быстрый, но не поддерживает все типы)
-- свой тип партицирования:
class DomainNamePartitioner(numParts: Int) extends Partitioner {
 override def numPartitions: Int = numParts //число партиций
 override def getPartition(key: Any): Int = { //получить номер партиции по ключу
  val domain = new Java.net.URL(key.toString) .getHost() //в данном случае: хэш код от хоста url
  val code = (domain.hashCode % numPartitions)
  if (code < 0) (
   code + numPartitions // Сделать неотрицательным
  else
   code
 }
 // Jаvа-метод equals для сравнения объектов Partitioner
 //для сравнения 2 RDD партицированных одним способом
 override def equals(other: Any): Boolean = other match {
  case dnp: DomainNamePartitioner =>
   dnp.numPartitions == numPartitions
  case _ =>
   false
 }
}

Загрузка и сохранение данных

- загрузка текстового файла:
val input = sc.textFile("hdfs путь до файла или каталога!")
-- загрузка CSV:
import Java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
val input = sc.textFile(inputFile)
val result = input.map{ line =>
 val reader = new CSVReader(new StringReader(line));
 reader.readNext();
}
-- сохранение в csv RDD из 2 полей:
pandaLovers.map(person => List(person.name, person.favoriteAnimal).toArray)
.mapPartitions{people =>
 val stringWriter = new StringWriter();
 val csvWriter = new CSVWriter(stringWriter);
 csvWriter.writeAll(people.toList) //преобразование массива с строку с запятыми
 Iterator(stringWriter.toString)
}.saveAsTextFile(outFile)
- SequenceFiles - сериализованный файл ключей-значений, с метками положений для быстрого поиска и деления
файл с ключом = строке и числом в значении
val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable]).map(case (х, у) => (x.toString, y.get()))
сохранение проще:
val data = sc.parallelize(List(("Panda", 3), ("Кау", 6), ("Snail", 2)))
data.saveAsSequenceFile(outputFile)
- objectFile / saveAsObjectFile - объектные файлы через сериализацию
работает медленно, нет меток синхронизации
- другие типы данных:
-- Key - Value текстовый формат:
val input = sc.hadoopFile[Text, Text, KeyValueTextinputFormat] (inputFile)
.map {
 case (х, у) => (x.toString, y.toString)
}
-- загрузка lzo сжатых файлов:
val input; sc.newAPIHadoopFile(inputFile, classOf[LzoJsonlnputFormat], classOf[LongWritaЫe], classOf[MapWritaЫe], conf)
- доступ к apache hive:
import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
val rows = hiveCtx.sql("SELECT name, age FROM users")
val firstRow = rows.first ()
println (firstRow.getString(0)) // Поле О - это поле name
- обработка json посредством sql:
val tweets = hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
val results = hiveCtx.sql("SELECT user.name, text FROM tweets")
- доступ к бд по jdbc:
-- доступ к mysql:
def createConnection() = { //описание подключения
 Class.forName{"com.mysql.jdbc.Driver").newinstance();
 DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden");
}
def extractValues(r: ResultSet) { //алгоритм извлечения данных
 (r.getint(l), r.getString(2))
}
val data = new JdbcRDD(sc, createConnection, "SELECT * FROM panda WHERE ? <= id AND id <= ?", //запрос
lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues) 
println(data.collect().toList)
lowerBound/upperBound задает границы данных,
результат которых разбивается на numPartitions партиций (запрос также будет выполняться 2 потоками)
- подключение к hbase:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritaЫe
import org.apache.hadoop.hbase.mapreduce.TaЬleinputFormat
val conf = HBaseConfiguration.create ()
conf.set(TableinputFormat.INPUT_TABLE, "tablname") // определить таблицу
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableinputFormat], classOf[ImmutaЬleBytesWritable],classOf [Result])

Дополнительные возможности

- аккумулярторы
max/filter может использовать в своем коде внешние переменные, но они будут локальными для конкретного воркера
переменные видимые в драйвере и воркере (часто для отладки)
val blankLines = sc.accumulator(0)
val callSigns = file.flatMap (line =>
 if (line == '"') {
  blankLines += 1 // Увеличить значение в аккумуляторе
 line.split(" ")
})
-- значение акк. можно будет получить только после действия с RDD (flatmap - преобразование не вызываемое сразу)
-- точность значения аккумулятора не гарантируется, он может быть больше, если контейнер рестратрует или если RDD несколько раз сохраняется на диск, а потом обратно кэшируется в память
-- можно передавать небольшие значения: Int, Double, Long, Float
-- собственный акк. можно создать отнаследовавшись от AccumulatorParam
- Широковещательные переменные
-- предназначены для передачи большого объема данных, но только для чтения
(пример использования - левая таблица для HJ)
val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map{case (sign, count) =>
 val country = lookupinArray (sign, signPrefixes. value)
 (country, count)
} . reduceByKey ( (х, у) => х + у)
-- signPrefixes - можно изменить в воркере, но изменение будет видно только там
- Единичная инициализация ресурсов через mapPartitions
val contactsContactLists = validSigns.distinct() .mapPartitions{
signs =>
val mapper = createMapper() //единичная инициализация для воркера, а не для каждого элемента
val client = new HttpClient()
client.start ()
// создать http-зaпpoc
signs.map {sign => createExchangeForSign(sign)} //поэлементная обработка
.map{ case (sign, exchange) => // извлечь ответы
 (sign, readExchangeCallLog(mapper, exchange))).filter(x => х._2 != null) // Удалить пустые записи
}
- взаимодействие с внешними программами
val distScript = "./src/R/finddistance.R"
val distScriptName = "finddistance.R"
sc.addFile(distScript)
val distances = contactsContactLists.values.flatMap(x =>
x.map(y => s"$y.contactlay,$y.contactlong,$y.mylat,$y.mylong")) //передать параметры
.pipe(Seq(SparkFiles.get(distScriptName))) //вызывать скрипт через pipe
-- все вызванные скрипты сохраняются в SparkFiles. getRootDirectory - важно давать разные имена

Оптимизаиция

- map* функции не сохраняют партицирование (preservePartition = false), т.к. мап может создавать/изменять как значения, так и ключи
если точно знаем, что ключи не меняются, то можно указать preservePartition = true
тогда эта операция станет слабой (narrow)

- function optimization
-- если нужно использовать большой объект в функции, то лучше не делать замыкание - объявлять глобальную переменную в драйвере и использовать ее во внутренней функции.
Т.к. spark в этом случае сереализует эту переменную и разошлет всем executor.
Лучше создать broadcast переменную, которая разошлется только по нодам.
-- применение функции к группе строк, а не 1 (что уменьшим число смен контекста)
rdd.mapPartitions(fnc)
def fnc(rows) = rows.foreach(r => {some logic on row} 

- просмотр плана выполнения Catalyst
spark.sql(q).explain(true) - несколько сгеренированных физических планов с разной стоимостью
план представляет из себя дерево: литералов (константы), атрибутов (переменные), поддеревьев содержащих рекурсивную функцию
- происходит несколько трансформаций, для снижения стоимости плана: схлопывание литералов, filter pushdown, projection pushdown (column pruning - исключение колонок)
На этом этапе можно определить, что вероятно план был сгенерирован неверно.

- типы join в sparksql:
-- broadcast join
если таблица целиком влезет в autoBroadcastJoinThreadshold, тогда не будет shuffling/reduce
-- shuffle hash join
для запросов через равно
одна таблица меньше другой: минимум в 3 раза
достаточно памяти для шафлинга партиции хэш массива:
shuffle.partitions (кол-во партиция для shuddle/reduce) * autoBroadcastJoinThreadshold (максимальный размер, который может быть заброадкащен = ~10mb)
-- sort-merge join
алгоритм поумолчанию
может выполняться частями на таблице любого размера на любом условии
Этот запрос можно ускорить, если кластеризовать таблицы по колонкам соединения.
-- broadcast NL
почти не используется, т.к. нет индексов. Но наиболее универаслен, т.к. принимает любое условие соединения

- уровень параллелизма можно повысить, если переразбить данные на большее число партиций (repartitions)
или решить проблему перекоса данных, если разбить данные более равномерно через repartitions
- checkpoint - если есть шанс падения, то можно сохранить данные в hdfs: df.rdd.checkpoint()

- Модель памяти:
-- execution - программный кэш
если тут заканчивается память, то данные скидываются на диск, а потом читаются оттуда
-- storage - разделяемый кэш
если тут заканчивается память, то старые данные вытесняеются LRU (и не факт что они пригодились бы)
память разделяется между этими секциями, но execution может динамически отжать почти все, если ему надо (с Spark 1.6 )
- resources best practice:
-- 5 CPU на 1 executor - это лучше для ускорения чтения HDFS
из всех ресурсов нужно вычесть 1 контейнер для нужд ОС/Hadoop, а доступную память * 0,9
- dynamic allocation - может выключать неиспользуемые executor и передавать их другим программам
- speculative-execution - если время выполнения executor больше медианы, то он будет перепланирован в другом месте

Настройка и отладка

- input.toDebugString - посмотреть ациклический граф получения RDD
counts.toDebugString
(2) ShuffledRDD [ 296] at reduceByKey at <console>: 17
+-(2) MappedRDD[295] at rnap at <console>:17
 | FilteredRDD [ 294 ] at filter at <console>: 15
 | MappedRDD[293] at rnap at <console>:15
 | input.text MappedRDD[292] at textFile at <console>:13
 | input.text HadoopRDD[291] at textFile at <console>:13
-- объекты на одном уровне выполняются конвеерно без сохранения результата на диск
-- при применении действия ацикличный граф (DAG) преобразуется в реальный план выполнения
- web интерфейс мониторинга:
-- storage в web мониторе показывает список закешированных RDD
на каких хостах он хранится и какое рапредление озу/диск
-- executors - показывает всех исполнителей и сколько ресурсов они себе забрали
тут же можно посмотреть Thread Dump - стэктрейс работы программы
на основе этого можно делать сэмплирование, чтобы определить наиболее медленные части
-- в environment - Хранятся общие настройки
-- также лог выполнения можно посмотреть через yarn
для уже завершенного: yarn logs -applicationid <арр ID>
- число воркеров зависит от числа блоков родительского RDD
т.е. все еще начинается с файлов в hdfs, если файл маленький, то большой параллельности по умолчанию не будет
- после фильтрации RDD число воркеров наследуется от родителя, несмотря на уменьшение объема
из-за этого есть смысл делать coalesce - чтобы уменьшить число пустых блоков
- MEMORY_AND_DISK - большие RDD лучше кэшировать с этой опцией, чтобы не допускать повторный пересчет
60% всей выделенной памяти используется под кэш RDD
- использовать под кэш Spark быстрый флэш диски
- под мелкие задачи выделять мелкие контейнеры, чтобы накладные расходы на GC и передачу данных не перевешивали полезную работу

Трассиовка java программ
Делается через java visual wm:
- sampler - распределение времени, на что тратится основное время (останавливает программу периодически и смотрит дампы)
-- показывает гросс и нет вложенных объектов
-- в сэмпл могут не попасть мелкие но частые
- profiler - подмена байт кода на инструментированный - начало/конец
-- (при запуске нужно поставить ключ (vm options) = noverify
-- и указать класс
-- попадает все, но инструментирование замедляется
- еще вариант через reflexию и прокси - требуется ручное изменение программы
- jmh - проведенение тестов (@Benchmark) - проводим N раз и берем среднее (+ разогрев)
- Трассировку удобно визаулизировать черзе flamegraph (https://queue.acm.org/detail.cfm?id=2927301) :
# git clone https://github.com/brendangregg/FlameGraph 
# cd FlameGraph 
# perf record -F 99 -a -g -- sleep 60 
# perf script | ./stackcollapse-perf.pl | ./flamegraph.pl > out.svg

Spark Streaming

- DStreams - RDD для потоковых данных. Он объединеят в себе множество RDD, каждый из которых отвественен за порцию данных.
- зависимости sbt:
groupld = org.apache.spark
artifactld = spark-streaming_2.10
version = 1.2.0
* импорты:
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext.
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
- программа:
// Создать StreamingContext с 1-секундным интервалом обработки
// и с использованием SparkConf
val ssc = new StreamingContext(conf, Seconds(l))
// Создать DStream из данных, принятых с порта 7777
// локального компьютера
val lines = ssc.socketTextStream("localhost", 7777)
// Отфильтровать поток DStream, оставив строки со словом "error"
val errorLines = lines.filter (_. contains ( "error"))
// Вывести строки со словом "error"
errorLines.print()
-- в отдельном потоке стартуем и ждем окончания
// Запустить обработку потока и дождаться ее "завершения"
ssc.start ()
// Ждать завершения задания
ssc.awaitTermination()
-- запуск:
spark-submit \
--class com.oreilly.learningsparkexamples.scala.StreamingLoginput \
$ASSEМВLY_JAR local[4]
-- отправка сообщения:
nc localhost 7777 testmsg
- отказоустойчивость:
-- данные копируются на 2 ноды
-- каждые 5-6 сообщений происходит checkpoint - запись данных в hdfs
принудительный вызов:
ssc.checkpoint( "hdfs:// ... ")
- преобразования:
-- без сохранения состояния:
map/filter/reduce/group/join - применяются над каждым RDD порции данных.
применение будет над окном заданном в StreamingContext
- transform - применение функции над каждым RDD из окна:
val outlierDStream = accessLogsDStream.transform { rdd =>
 extractOutliers(rdd)
}
-- с сохранением состояния:
при вычислении текущего окна используются данные предыдущих
- Оконные функции - например, нужно посчитать среднее за 30 секунд (размер окна - кратно шагу), при частоте обновления = 10 (шаг перемещения окна)
Кол-во значения в скользящем окне:
val accessLogsWindow = accessLogsDStream.window(Seconds(30), Seconds(10))
val windowCounts = accessLogsWindow.count()
Reduce на окне:
val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getipAddress(), 1))
val ipCountDStream = ipDStream.reduceByKeyAndWindow(
( (х, у) => х + у), // Добавить элементы из новых пакетов в окне
( (х, у) => х - у), // Удалить элементы из пакетов, покинувших окно
Seconds(30), // Размер окна
Seconds(10)) // Шаг перемешения окна
также есть: countByValueAndWindow и countByWindow
- UpdateStateByKey - поддержка состояния между пакетами
пример подсчет числа кодов http ответов:
def updateRunningSurn(values: Seq[Long], state: Option[Long]) = {
 Some(state.getOrElse(0L) + values.size)
}
val responseCodeDStrearn = accessLogsDStrearn.rnap(log => (log.getResponseCode(I, 1L) //начальный хэш массив
val responseCodeCountDStrearn = responseCodeDStrearn.updateStateByKey(updateRunningSurn _) //добавляем единичку
- операции вывода
-- сохранение потока в текстовый Sequence файл
writableipAddressRequestCount.saveAsHadoopFiles[
SequenceFileOutputFormat[Text, LongWritble]]("outputDir", "txt")
-- через перебор всех элементов:
ipAddressRequestCount.foreachRDD { rdd =>
 rdd.foreachPartition { partition =>
  // Открыть соединение с внешней системой (например, с базой данных)
  partition.foreach { item =>
   // Передать элемент через соединение
  }
  // Закрыть соединение
 }
}
- источники данных:
-- потоковое чтение файла:
val logData = ssc.textFileStream(logDirectory)
-- можно читать данные посланные из акторов Akka
-- Apache Kafka
Добавляем в sbt:
spark-streaming-kafka 2.10
подписка на стрим Kafka:
import org.apache.spark.streaming.kafka.
// Создать отображение тем в число потоков выполнения
val topics = List(("pandas", 1), ("logs", 1)) .toMap
val topicLines = KafkaUtils.createStream(ssc, zkQuorum, group, topics) //zkQuorum - данные о хостах из zookeper
StreamingLoginput.processLines(topicLines.map(_._2))
-- Apache Flume - пассивный приемник - инициатор передачи - Flume
val events = FlumeUtils.createStream(ssc, receiverHostname, receiverPort)
нет гарантии доставки
- активный приемник - Spark Streaming вычитывает данные из Flume
данные хранятся, пока Spark не подтвердит, что все забрал
извлекаем данные из промежуточного получателя Flume:
val events = FlumeUtils.createPollingStream(ssc, receiverHostname, receiverPort)
// Предполагается, что событие - это строка в кодировке UTF-8
val lines = events.map{e => new String(e.event.getBody() .array(), "UTF-8"))

Данный конспект создан на основе трех одноименных книг

1 комментарий: