четверг, 29 ноября 2018 г.

Введение в Scala и параллельную разработку

В этой статье я хотел бы охватить все аспекты работы с данными на языке Scala - от примитивов языка до параллельного программирования.

Введение в Scala

Переменные

val - константа
var - Изменяемая переменная:
var s:String = "test"
- все типы являются классами (нет встроенных типов): Byte, Char, Short, Long, Float, Double, Boolean
- String используется java.lang.String но добавляет дополнительные функции (StringOps, RichInt, RichDouble ...)
"Hello".intersect("World") //== lo

Операторы

- Все операторы, на самом деле методы:
a+b = a.+(b)
- нет инкремента (++) и декремента (--) , только i+=1
- Некоторые функции (типа sqrt) можно использовать без объявления пакета после import
scala.math.sqrt(16) //полная запись
import scala.math._
sqrt(16) //частичная запись
- s(i) - java аналог s.charAt(i)
явный вызов в scala: s.apply(i)
- () - аналог void
- ; - нужна только если в строке несколько выражений
- {} - несколько операторов в блоке, возвращает значение - это будет последнее выражение
var a = {a; b; sqrt(16)} //вернет = 4
- результатом присвоения является () - в java = присваеваемому

Условия

- Условия возвращают значение
val s = if (x > 0) 1 else -1
это удобней, т.к. может быть использовано для инициализации const
аналогом c++ является x > 0 ? 1 : -1

Вывод в консоль

print(f"text=$a1 and ${...}") //расширенный аналог printf в java
, где:
$a - переменная
${} - выражение

Циклы

- while / do - аналогично java
- цикл for работает по принципу foreach - перебора элементов последовательности:
for (i <- 1 to n) {}
1 to n - генерирует последовательность чисел, которая обходится последовательно
последовательность может быть любая - выражение или просто строка
- нет break и continue
- в for может быть несколько итераторов через ; и выражение может содержать условие:
for (i <- 1 to 3; любое выражение ; j <- 1 to 3 if i != j) print(f"${10 * i + j}%3d")
// Выведет 12 13 21 23 31 32
т.е. каждая итерация i запускает цикл j (аналог вложенного цикла с условием) и накладывает фильтр, чтобы в j не было пересечений с i
- циклы могут использоваться для генераций колекций
val coll = for (i <- 1 to 10) yield i % 3 //создание коллекции
//== scala.collection.immutable.IndexedSeq[Int] = Vector(1, 2, 0, 1, 2, 0, 1, 2, 0, 1)

Функции

- Однострочный вариант:
def abs(x: Double) = if (x >= 0) x else -x
- Многострочный:
def myfnc(x: Double) {
 var r = 10
 r
}
- Указание типа обязательно для рекурсии, иначе нет:
def fac(n: Int): Int = if (n <= 0) 1 else n * fac(n - 1)
- Можно использовать return для немедленного выхода из функции
- параметр функции может иметь значение по умолчанию:
def decorate(str: String, left: String = "[", right: String = "]") =
 left + str + right
- передавать параметры можно по имени в любом порядке:
decorate(left = "<<<", str = "Hello", right = ">>>")
- функция с переменным числом параметров:
def sumf(args: Int*) = {
 var result = 0
 for (arg <- args) result += arg
 result
}
- Чтобы передать список как несколько параметров, нужно добавить : _*
sumf(1 to 5: _*)
//res6: Int = 15
- def w = ....
логически похоже на переменную, которая каждый раз переинициализируется при обращении

- Способы передачи параметров в функцию:
call by value - каждый аргумент расчитывается единожды и передается в функцию
call by name (по ссылке) - аргумент не вычисляется, если он не используется в теле функции (вычисление происходит внутри функции)
т.е. если в параметре сложное выражение, то оно будет выполнено несколько раз (но если оно не испольузется внутри, то не будет ненужного вычисления как в call by value)
def fnc(x: Int, y: => Int)
// x: Int - call by value
// y: => Int - call by name ( передача y по ссылке)

- Частичные (partial) функции:
partial функция - реализуется чреез pattern matching и может работать только для нужных case:
val divide10: PartialFunction[Int, Int] = {
 case 1 => 10
 case 2 => 5
 case 5 => 2
 case 10 => 1
}
определена ли функция для значения можно проверить через isDefinedAt
divide10.isDefinedAt(2) == true
divide10.isDefinedAt(3) == false
если вызывать для несуществующего значения, то будет исключение:
divide10(3)
//scala.MatchError: 3 (of class java.lang.Integer)
//at scala.PartialFunction$$anon$1.apply(PartialFunction.scala:253)
если нужно Option вместо exception, то при вызове можно исопльзовать .lift
divide10.lift(3)
//None
может применяться в lambda функциях как параметр и будет вызываться только для применимых значений:
List.range(1, 11).collect(divide10)
// List(1, 2,..., 10).collect == List(10, 5, 2, 1)

- Возврат Option значения
обработка ситуации NULL указателя через Option и getOrElse
def divide(x: Int, y: Int): Option[Int] =
 if(y == 0) None else Some(x / y)
 
def showDivide(x: Int, y: Int): String =
 divide(x, y) match {
     case Some(d) => s"$x = $d * $y"
     case None => "null division"
 }
или
def showDivide(x: Int, y: Int): String =
 divide(x, y)
 .map(d => s"$x = $d * $y")
 .getOrElse("null division")
обработка None
divide(7, 0).getOrElse(1)
//или 
divide(7, 0).orElse(divide(7, 2)) 

- Исключающее ИЛИ в параметрах или возвращаемом значении
Either[A, B] - исключающее или
val numOrStr1: Either[Double, String] = Left(2.12) //Left вернет левый тип, Right - правый
применяется как параметр в matching :
def info(numOrStr: Either[Double, String]): String =
 numOrStr match {
 case Left(num) => s"number $num"
 case Right(str) => s"string $str"
}
как возвращаемый параметр - может использоваться как обработку ошибок без Try
"Right - это правильно". Поэтому же map и flatMap применяются к правильной ветви, то есть к правой. И "хороший" результат оборачивают в Right, а ошибку в Left.
def sqrt(x: Double): Either[String, Double] =
 if (x < 0) Left("negative number")
 else Right(Math.sqrt(x))

- чтобы перехватить иключение, можно использовать объект Try:
val result = Try(str.toInt)
// обработка результата:
if (t.isSuccess) println(s"The answer is ${t.get}")
if (t.isFailure) println(t.failed.get.getMessage)

Процедуры

То же, что функции, но без = после объявления:
def box(s : String) {()}
//или 
def box(s : String):Unit = {()}

Ленивые переменные

lazy val w = .....
Операция для инициализации w вызовется при первом обращении.

Исключения

Аналогично java:
throw new Exception("test") // вызов исключения
- в scala нет проверки на обязательность исключения во время компиляции как в java
так писать не нужно:
void doSomething() throws IOException, InterruptedException, ClassNotFoundException
- перехват exception идет через case:
try {
 process(url)
} catch {
 case _: MalformedURLException => println(s"Bad URL: $url")
 case ex: IOException => ex.printStackTrace()
}
finally {
 //выполнится всегда
 in.close()
}

Массивы

- Фиксированный массив:
// из 10 элементов по умолчанию = 0: 
val nums = new Array[Int](10)
- с начальной инициализацией:
 val s = Array("Hello", "World") //new не используется
- Переменной длины:
Аналог java arraylist:
import scala.collection.mutable.ArrayBuffer
val b = ArrayBuffer[Int]()
- добавить элемент в массив:
b += 1;
- несколько значений (каждое станет значением)
b += (1,2)
b += Array(1,2)
- обход массива:
-- если индекс не нужен:
for(e <- elems) {}
-- если нужен индекс:
for(i <- 0 until arr.length)
until почти то же самое, что to , создается множество -1 значение
- yeld - обход с преобразованием в новый массив (описано в циклах)
- обход и фильтрация массива:
arr.filter(_ % 2 == 0).map (2 * _ )
withFilter - ленивый filter, который возвращает итератор
это лучше filter, т.к. нет 2 прохода коллекции, withFilter + map будет выполнять по 1 элементу за раз и сделается за 1 проход

- arr.indices - список индексов (ключей) массива
- встроенные методы: sum, max, count(_ > 0), sorted (_ < _) //направление сравнения
- arr.mkString(",") - преобразование массива в строку или toString - дополнительно возвращает тип данных

- Многомерный массив:
val matrix = Array.ofDim[Double](3, 4) // Три строки, четыре столбца
- lastOption / headOption - безопасное получение конца и начала без риска на пустой коллекции

Ассоциативные массивы

- с начальными значениями фиксированный:
val scores = Map("Alice" -> 10, "Bob" -> 3, "Cindy" -> 8)
- изменяемый массив:
val scores = new scala.collection.mutable.HashMap[String, Int]
- доступ почти аналогичен java:
val bobsScore = if (scores.contains("Bob")) scores("Bob") else 0
короткий вариант записи:
val bobsScore = scores.getOrElse("Bob", 0) // второй параметр, если не найдено
- изменение:
scores("A") = 10
- добавление, как в обычном массиве: += //если ключ уже есть, то он перетрется
- удалить ключ: -= "Key"
- обход ассоц. массива:
for((k,v) <- map) {}
- чтобы получить только значения: arr.values, ключи: arr.KeySet
- отсортированный массив хранится в виде Красно-чёрного дерева
val scores = scala.collection.immutable.SortedMap("Alice" -> 10, "Fred" -> 7, "Bob" -> 3, "Cindy" -> 8)
Красно-чёрное дерево схоже по структуре с B-деревом с параметром 2, в котором каждый узел может содержать от 1 до 3 значений и, соответственно, от 2 до 4 указателей на потомков
- хуже сбалансировано, чем btree (много уровней дерева из-за маленькой степени = 2)
+ но легче балансировать при удалении:
* ищется меньшее значение справа от удаляемого (или большее слева)
* найденый указатель перемещается в точку удаления
* ссылка от родителя на перемещаемый элементы (пред. пункт) удаляется
Дерево просто балансируется, тогда как btree при удалении элементы он лишь помечается к удалению, из-за чего оно растет в объеме и требует переиодического ребилда.
Дерево медленнй hash массива при вставке и доступе по ключу, но быстрей для поиска по диапазону.

Кортежи (tuples)

val t = (1, 3.14, "Fred")
основные отличия от массивов:
- различные типы в значениях
- переменное число элементов
- по своей сути это case class
- обращение к элементу:
t._2 
- замапить кортеж на переменные:
val (first, second, _) = t //_ не нужная переменная
- удобно, если нужно вернуть несколько значений из функции
- преобразование 2 кортежей в массив пар:
val k = Array("<", "-", ">")
val v = Array(2, 10, 2)
val pairs = k.zip(v) //Array(("<", 2), ("-", 10), (">", 2))
pairs.map // преобразует к асс. массиву

Классы

- аналогично java:
class Counter {
 private var value = 0 // Поля должны инициализироваться
 def increment() { value += 1 } // Методы по умолчанию общедоступные
 def current() = value
}
- Геттеры / сеттеры генерируются автоматически
class Counter {
 var value = 0 
 dev value() = value // геттер
 dev value_ //сеттер - сеттер можно переопределить
 // но обращение все также будет по value
 val conts // поле только для чтения
 @BeanProperty var name: String = _ // геттеры и сеттеры не генерятся
 
 def this(name: String) { // Дополнительный конструктор
  this() // Главный конструктор
 ...
- параметры главного конструктора - автоматически генерируют свойства и геттеры/сеттеры к нему (поле становится приватным)
class Person(name: String, age: Int, private var age1: Int)) { //плюс можно регулировать сразу видимостью геттера/сеттера
 def description = s"$name is $age years old"
}
- параметр конструктора становитя public методом класса, если к нему добавить val/var
- указание требований к параметрам класса:
class Rational(x: Int, y: Int) {
    require(y > 0, "denominator must be positive")
- можно вкладывать классы в классы и функции в функции

Case class
* всегда имеет apply
* перегруженный toString
* все переменные public
* перегруженный equals, который сравнивает поэлементно
* copy(someVal = "ads") - может копировать и менять одно из значений


Объекты

- нет статичных методов, вместо этого используется object - объект этого типа может иметь только 1 экземпляр (реализует singleton)
object Accounts {
 private var lastNumber = 0
 def newUniqueNumber() = { lastNumber += 1; lastNumber }
}
var num = Accounts.newUniqueNumber()
- Объекты компаньоны - должны определяться в одном файле и имеют доступ к приватным свойствам/методам друг друга
- метод apply - служит для создания и возвращения класса компаньона
class Account private (val id: Int, initialBalance: Double) {
 private var balance = initialBalance
 ...
}
object Account { // Объект-компаньон
 def apply(initialBalance: Double) =
  new Account(newUniqueNumber(), initialBalance)
 ...
}
val acct = Account(1000.0) //без new вызовется apply
Пример применения apply:
- Array(100) - вернет объект-массив с 1 элементом = 100
- new Array(100) - выполнит конструктор и создаст пустой массив из 100 элементов

- Любая программа должна начинаться с main:
object Hello {
 def main(args: Array[String]) {
  println("Hello, World!")
 }
}
Но если унаследовать App, то это можно делать в конструкторе:
object Hello extends App {
 println("Hello, World!")
}

- перечисления - enum
Нет встроенного типа, но есть класс
object TrafficLightColor extends Enumeration {
 val Red, Yellow, Green = Value
}
теперь можно обращаться:
if( TrafficLightColor.value == TrafficLightColor.Red ) println("Stop")

Пакеты и импортирование

- пакеты служат для управления пространствами имен
package impatient {
class Employee
...
для обращения нужно указывать полный путь: impatient.Employee
- 1 пакет может быть определен в нескольких файлах
- имена пакетов могут быть относительными
- альтернатива создать пакет без фигурных скобок:
package people // в начале файла
- в пакете не может быть переменных и функций
для обхода можно использовать объект пакета (1 на пакет)
package object people {
 val defaultName = "John Q. Public"
}
внутри пакета можно обращаться просто по defaultName
снаружи с полным путем, включая имя объекта
- свойство видимое только в пакете people:
private[people] def description = s"A person with name $name"
- импортирование аналогично:
import java.awt._
-- но может располагаться в любом месте и будет видимо до конца блока
-- импорт в 1 строку нескольких пакетов:
import java.awt.{Color, Font}
-- пакет можно переименовать:
import java.util.{HashMap => JavaHashMap}

Наследование

- по умолчанию все как в java:
class Employee extends Person
- final может быть как класс, так и метод (в отличии от java)
- sealed для класса - это более мягкая версия final, поэтому допускается extends в текущем файле, но воспрещается вне этого файла
- override - для переопределения не абстрактных методов и переменных требуется
- super.method_name - вызов родителя
- protected методы видны в классе, но не видны в пакете
чтобы стали видны нужно писать protected[пакет]
- во второстепенном конструкторе класса можно вызывать только главный конструктор класса
super конструктор может быть вызван только из главного конструктора (сигнатуры описанной при создании class)
- анонимные классы создаваемые 1 раз при определении:
val alien = new Person("Fred") {
 def greeting = "Greetings, Earthling! My name is Fred."
}
- также есть абстрактные классы:
abstract class Person(val name: String)
-- абстрактное свойство задается при создании:
val id: Int
- инициализация переменных до вызова конструктора супер класса
к примеру если в нем инициализация массива использует эту переменную
class Bug extends {
 override val range = 2
} with Creature
в этом случае сначала инициализируется переменная, потом вызывается конструктор
-- также это можно обойти, если обозначить поле как lazy, чтобы все инициализации прошли при первом обращении
- стандартное наследование классов в scala:
Any <- AnyVal <- Int..Double...
    <- AnyRef <- все классы
-- в any есть isInstanceOf и hashcode
Встроенный хеш-код генерируется лишь один раз для каждого объекта при первом вызове hashCode(), после сохраняется в заголовке объекта для последующих вызовов.
Но для первого раза используется random или Xorshift (псевдослучайное от threadid)
-- anyval - нет нового
-- anyref добавляет: wait, notify из Object , synchronized
-- класс Null = null
-- Nothing может использоваться для любого значения: [T] == [Nothing]
-- Unit == void в c++ == ()
- equals - сравнивает равенство адресов ссылок объектов, но его можно переопределить:
final override def equals(other: Any) = {
 other.isInstanceOf[Item] && {
  val that = other.asInstanceOf[Item]
  description == that.description && price == that.price
 }
}
-- аналогично переопределяется метод hashCode
- Классы с одной переменной - классы значения
они не создаются каждый раз, а встраиваются в место вызова:
class MilTime(val time: Int) extends AnyVal {
 def minutes = time % 100
 def hours = time / 100
 override def toString = f"$time04d"
}

Файлы и регулярные выражения

- чтение всего файла в буфер:
import scala.io.Source
val source = Source.fromFile("myfile.txt", "UTF-8")
val lineIterator = source.getLines //toArray, mkString
for (l <- lineIterator) //обработка строки
for (c <- source) //обработка посимвольно
-- если файл большой, то используем буферизированное чтение:
val iter = source.buffered
while (iter.hasNext) {
 if (iter.head is nice) //работаетм
- чтение URL
val source1 = Source.fromURL("http://horstmann.com", "UTF-8")
- двоичные файлы читаются как в java
- запись также как в java
- сериализация:
@SerialVersionUID(42L) class Person extends Serializable //SerialVersionUID - установка значения переменной
- выполнение команд командной строки:
import scala.sys.process._
val result = "ls -al /".!!
("ls -al /" #| "grep u").! // если нужно переправить данные между программами
- строки в тройных кавычках не надо экранировать:
val wsnumwsPattern = """\s+[0-9]+\s+""".r
- NIO - новая реализация отображения файлов в память, самый быстрый вариант
-- чтение данных
-- складывание в буфер, который мы читаем
+ эти процессы происходят в параллель (асинхронность), что дает повышенную производительность при чтении, так и при записи (т.к. данные обмениваются через быстрый буфер в памяти)

Трейты (интерфейсы)

- может быть несколько трейтов, но не несколько классов (все как в java)
- может потребовать наличие полей, методов, суперклассов
-- требование наличия суперкласса Exception:
trait LoggedException extends Logged {
 this: Exception =>
  def log() { log(getMessage()) }
}
-- требование наличие метода getMessage:
trait LoggedException extends Logged {
 this: { def getMessage() : String } =>
  def log() { log(getMessage()) }
}
- может быть часть реализации (как у абстрактных классов java) - называется примесью (добавляется часть новой реализации)
trait ConsoleLogger {
 def log(msg: String) { println(msg) }
}
- важен порядок - первые методы в конце списка - методы будут вызываться последовательно с конца или перетираться
trait TimestampLogger extends ConsoleLogger { //пример с расширением трейта
 override def log(msg: String) {
  super.log(s"${java.time.Instant.now()} $msg") //вызывает вышестоящий трейт ConsoleLogger
 }
}
-- но есть возможность обратиться к конкретному трейту:
super[ConsoleLogger].log(...)
- при переопредлении метода трейта не нужно указывать override
- доп трейты наследуются через with
class ConsoleLogger extends Logger with Cloneable with Serializable
- при создании объекта можно подмешать другой класс, который наследует тот же trait
val acct = new SavingsAccount with ConsoleLogger
теперь функция log будет из ConsoleLogger,а не из Log, как по умолчанию в SavingsAccount (т.е. получается как бы наследование на лету)
- трейт может иметь конструктор как обычный класс, но конструктор будет без параметров (единственное техническое отличие от классов!)
-- проблема инициализации переменных в том, что переменные инициализируются до вызова основного конструктора-класса
чтобы обойти, нужно использовать префиксную установку переменной до конструктора: //то же самое было описано выше в классах
val acct = new { 
 val filename = "myapp.log"
} with SavingsAccount with FileLogger
либо использовать lazy переменные
- трейты могут наследоваться от класса, тогда он становится суперклассом для подмешивающего

Операторы

- инфиксные операторы:
замена вызова метода: .to, -> на 1 to 10, 1 -> 10
- опрделение своего оператора как в java:
def *(other: ClassName) = new ClassName(x * other.x, y * other.y)
- f(arg1, arg2, ...) вызовет f.apply(arg1, arg2, ...) , если метода f нет в классе
- f(arg1, arg2, ...) = value - если слева от =, то вызовет update
это используется в массивах:
scores("Bob") = 100
- unapply - Обратная операция - объект разворачивается в переменные:
object Fraction {
 def unapply(input: Fraction) =
  if (input.den == 0) None else Some
использование: val Fraction(a, b) = f;
- динамические методы и переменные:
import scala.language.dynamics
-- динамический вызов транслируется в :
obj.applyDynamic("name")(arg1, arg2, ...) 
//или 
obj.updateDynamic("name")(выражение_справа) //для присвоения
-- person.lastName = "Doe"
транслируется в:
person.updateDynamic("lastName")("Doe")
реализовывать функцию нужно самим
def updateDynamic(field: String)(newValue: String) { ... } 

Функции высшего порядка

- функцию можно положить в переменную:
val fun = ceil _
_ означает необходимость параметра для функции

- Общая сигнатура функции высшего порядка:
f - тип функции, которая принимает A и возвращает B
f: A => B
- метод класса:
val f = (_: String).charAt(_: Int)
переменная: (параметр: тип).функция(параметр: тип)
val f: (String, Int) => Char = _.charAt(_)
переменная: (входные параметры) => выходной параметр = вызов функции с параметрами
- анонимная функция
(x: Double) => 3 * x
можно передавать ее как параметр, без переменных:
Array(3.14, 1.42, 2.0).map((x: Double) => 3 * x)
более короткая запись:
Array(3.14, 1.42, 2.0).map(3 * _)
.reduceLeft(_ * _) //последовательное применение слева направо
- функция, параметром которой является функция:
def funcname(f: (Double) => Double) = f(0.25) // принимается любая функция с параметром = Double
funcname(sqrt _) //пример
- функция возвращает функцию:
def mulBy(factor : Double) = (x : Double) => factor * x
в функцию mulBy положена функция "(x : Double) => factor * x"
val quintuple = mulBy(5)
quintuple(20) // 100
- каррирование - преобразование ф. с 2 параметрами в 2 функции с 1 параметром:
каррирование удобно для создания новых функций:
val cubeSum(f: Double => Double)(x) = f(x => x*x*x) //f - функция преобразования, x - статичное значение, внутри реализация с сочетанием f и x
cubeSum(sum)(x) //вызов - передается функция f - в данном случае сложение и само знаение x
т.е. это цепочка из 2 функций вложенных друг в друга(если параметров каррирования 2)
первый описывает преобразование
второй статичные параметры, которое использует действие (преобразование)
(Int => Int) => (Int=>Int) => Int
(Int => Int) => ( (Int=>Int) => Int )

- передача абстрактного кода:
def runInThread(block: => Unit) ///
runInThread { println("Hi"); Thread.sleep(10000); println("Bye") }
- return может быть использован чтобы определить тип возвращаемых данных, если одновременно используется анонимная функция без типа, которая должна посчитать результат

- monad - Монада
имеет 2 обязательных метода:
-- flatMap (примеменяет функцию f(T): монада(T) для всех элементов)
-- unit (T->монада[T]) - помещение элемента во внутренний контейнер (int в List/Set/Map)
К примеру, List - монада, т.к. помещает элементы во внутренний контейнер-список над которым можно применить flatmap
Правила монады:
-- ассоциативность: m.flatMap(f).flatMap(g) == m.flatMap(x=> f(x).flatMap(g))
-- левый unit: unit(x).flatMap(f) == f(x) //вызов flatmap c функцией f над list из 1 элемента = вызову f(один элемент)
-- правый unit: m.flatMap(unit) == m (m - уже монада = unit(x))

Коллекции

- наследование объектов:
Iterable -> Seq -> IndexedSeq //обычный массив или список (+ список с индексным доступом)
   -> Set -> SortedSet //коллекция значений (без индексов?)
   -> Map -> SortedMap //множество пар (ключ,значение)
- Преобразование типов через: .to*
- все коллекции по умолчанию неизменяемые (Immutable)
т.е. если мы изменяем коллекцию, то создается копия с изменениями
- Vector - immutable расширяемый массив (ArrayBuffer - muttable)
Технически хранится в виде b-дерева, с элементами в узле (32 элемента в блоке)
- Range - последовательность чисел
Технически хранит: начало, конец и шаг
- на последовательностях сделано: Stack, Queue, Prior Queue, ListBuffer

- List - список: рекурсивная структура: 1 -> 2 -> 3 -> Nil (head - текущий элемент, tail - следующий элемент)
A :: B :: C // добавить элемент в начало
A :: List(B) // prepend
xs + ys // объединение 2 листов
xs.update(n, val) // создание нового листа, где на позиции n будет значение val
Операции reduce над списком:
def foldLeft[U](z: U)(op: (U, T) => U): U = this match {
    case Nil => z
    case x :: xs => (xs foldLeft op(z, x))(op)
}
op(z, x) - резальтат операции op над аккумулятором z и x - первым элементом списка
рекурсивный вызов для оставшейся части списка и результатом op (аккумулятором)
когда дошли до конца списка (Nil) - возвращаем аккумулятор: z
reduceLeft делается чрезе foldLeft, но аккумулятор = первому элементу списка
У списков нет вспомогательных указателей на last/init - т.е. для вызова их нужно пройтись целиком по списку
- Развернуть список в Scala стиле:
к пустому списку добавляем элементы основного списка
добавление начинается сконца, т.к. рекурсия сначала доходит до конца foldLeft, а потом начинает раскручивать в обратную сторону
def reverse[a](xs: List[T]): List[T] = (xs foldLeft List[T]())((xs, x) => x :: xs)
- ListBuffer - имеет отдельные ссылки на начало и конец
- Множества - коллекция, где значение может присутствовать 1 раз:
Set(2, 0, 1) // порядок следования не гарантируется
для хранения используется хэш массив
-- LinkedHashSet - использует и хэш массив и связанный список, что сохраняет последовательность
- добавление элемента в неупорядоченную коллекцию: +
добавление в конец: :+ , начало: +:
++ и -- для добавления и удаления группы элементов
+= или ++= создают новую коллекцию добавляя элмент или множество
- map применяет одноместную функцию над каждым элементом создавая новую коллекцию:
names.map(_.toUpperCase)
тоже самое , что
for (n <- names) yield n.toUpperCase //это транслируется в map
-- mapValues - применение map только над значениями массива
-- flatmap - если функция возвращает коллекцию, то элементы развернутся в основную
-- transform - muttable map (без создания копии)
-- foreach - если не надо никуда сохранять, а только вызвать
- reduce - применяет 2х местную функцию и возвращает 1 результат (так последовательно к результату и следующему элементу, пока в конце не останется 1 элеменент)
- список1 zip список2 - объединение 2 списков в 1 список пар значений
-- обращение к элементам пар значений:
 (prices zip quantities) map { p => p._1 * p._2 }
-- zipall - объединение листов разного размера со значением по умолчанию
- iterator используется там, где поместить объект целиком в память слишком дорого
for (elem <- iter) //каждый вызов изменяет итератор
val words = Source.fromFile("/usr/share/dict/words").getLines.toStream

- Streams - бесконечные последовательности
(1 to 100).toStream //Range  может быть преобразована в стрим:
val words = Source.fromFile("/usr/share/dict/words").getLines.toStream //ленивое чтение бесконечного файла
это будет список, который знает только о текущем значении, а хвост lazy val, которая не вычисляется, пока к ней не обратиться явно
(после обращения туда помещается реальное значение, чтобы не делать перерасчет каждый раз)
пример:
((1000 to 10000).toStream.filter(isPrime(_))(1) //стрим дойдет до второго (0->1) простого числа и остановится
//при обычном запуске просчиталось бы все, а потом вязлся второй
Добавление элемента в стрим: el #:: stream
Бесконечный стрим:
def from(n: Int): Stream[Int] = n #:: from(n+1)
Пример расчет простых чисел:
def sieve(s: Stream[Int]): Stream[Int] = s.head #:: sieve(s.tail filter (_ % s.head != 0))
val primes = sieve(from(2))
// 2 -- 2 простое в хеад --> рекурс вызов - стрим с 3 отфилььтрованный по %2 -- List(2, 3, 4, 5, 6)
//3 не фильтруется на 2  и простое -- рекурсивный вызов стрима без делитя 2, теперь без делителя на  3 --List(3, 5, 7, 9, 11)
//и т.д. s все обрастает фильтрами
такой подход можно использовать для приблизительных расчетов (бесконечных рекурсий), которые не имеют точного решения

- for yield - более короткая замена для Map/flatmap/withfilter
val nums = List(2, 5, 1, 7, 4)
val nums2 = for {
 x <- nums
 y <- 1 to x if y > 3 //вложенный цикл
 y2 = y * 2
 z <- nums if z < y2
} yield z + y2 - y
это аналог более длинной записи:
val nums2 =
nums.flatMap(x =>
 (1 to x).withFilter(y => y > 3) //вложенный цикл
 .map(y => (y, y * 2))
 .flatMap { case (y, y2) =>
 nums.withFilter(z => z < y)
 .map(z => z + y2 - y)
 })
- можно приводить java объекты к scala коллекциям
import scala.collection.JavaConversions._
val props: scala.collection.mutable.Map[String, String] =
System.getProperties()

Сопоставление с образцом

- аналог switch (pattern matching):
ch = ..
ch match {
 case '+' | '*' => sign = 1
 case '-' => sign = -1
 case _ => sign = 0
}
-- не нужен brake - он есть по умолчанию
-- _ - если не будет, то выкинется исключение при проверке
-- можно перечислять несколько значений через |
-- в case может быть любое логическое условие:
case _ if Character.isDigit(ch) => digit = Character.digit(ch, 10)
-- присвоение идет произвольной переменной за =>
-- сопоставление может идти с типом переменной:
case s: String => Integer.parseInt(s)
- сопоставление массива 2 переменным и всего остального последовательности rest:
val Array(first, second, rest @ _*) = arr
- сопоставление ключа и значения ассоц. массива может быть в цикле:
for ((k, v) <- System.getProperties())
- сопоставление может идти по типу класса:
case Dollar(v) => s"$$$v"
- объект можно скопировать с изменением параметра:
val price = amt.copy(value = 19.95)
- для сопоставления с пустым значением можно использовать тип Some
case Some(score) => println(score) //любое значение
case None => println("No score") //пустое значение (замена "" или NULL)
- сравнение с коллекциями:
def sum(xs: List[Int], start: Int = 0 ) = xs match {
 case List() => start
 case List(x) => start + x
 case List(x, y) => start + x + y
- для сопоставления с образцом класса, нужен специальный case класс:
case class Address(country: String, city: String)
def addressInfo(address: Address): String = address match{
 case Address("Russia", _) => "russian"
 case Address("Japan", _) => "japanese"
 case _ => "no info"
}

Аннотации

Диррективы компилятору
- определение собственной аннотации
class unchecked extends annotation.Annotation
- volatile поле, которое можно менять из разных потоков
jvm выключает оптимизации: помещение в кэше процессора и т.д.
Изменения данных всегда будет видно всем процессам программы, т.к. обновление будет идти через общее хранилище.
@volatile var done = false
- несериализуемое поле
@transient var recentLookups = new HashMap[String, String]
- методы на c++
@native def win32RegKeys(root: Int, path: String): Array[String]
- Контролируемые исключения:
@throws(classOf[IOException]) def read(filename: String) { ... }
//аналог java:
void read(String filename) throws IOException
- генерация геттеров/сеттеров:
@BeanProperty var name : String = _

- рекурсивный вызов - может быть преобразован к циклу комплиятором
Рекурсивный вызов самостоятелен (не является частью выражения) - может быть преобразован к циклу:
@tailrec def sum2(if (xs.isEmpty) partial else sum2(xs.tail, xs.head + partial) // рекурсия заменяется на постепенный проход по циклу
Значение + рекурсивный вызов не может автоматически преобразоваться к циклу, т.к. требуется дойти до конца рекурсии, а потом раскручивать вызовы в обратном порядке:
if (xs.isEmpty) 0 else xs.head + sum(xs.tail) //может привести к переполнению стека при большом числе вызовов

- @switch - трансформировать swith в таблицу переходов, что оптимальнее if
- @elidable(500) - метод после 500 сборки удалится из скомпилированного кода
- def allDifferent[@specialized(Long, Double) T](x: T, y: T, z: T) = ... - генерация специализированных функций под каждый тип
- @deprecated(message = "Use factorial(n: BigInt) instead") - устаревшая функция
@deprecatedName('sz) - устаревший параметр

Обработка XML

- создание xml переменной:
val elem = <a href="http://scala-lang.org">The <em>Scala</em> language</a>
- обработка элементов:
for (n <- elem.child) обработать n
- доступ к атрибуту:
val url = elem.attributes("href") // вернется список, даже если 1 элемент
- обход всех атрибутов:
for (attr <- elem.attributes)
- вставка переменной в xml:
<ul><li>{items(0)}</li><li>{items(1)}</li></ul>
<ul>{for (i <- items) yield <li>{i}</li>}</ul>
- xpath поиск:
-- поиск непосредственного тега body - любой тег - тег li
doc \ "body" \ "_" \ "li"
-- поиск тега img на любом уровне вложенности
doc \\ "img"
- case <img/> => ... - выберется если img с любым содержанием
- объекты неизменяемые, для изменения атрибута нужно его скопировать:
val list2 = list.copy(label = "ol")
- для трансформации можно использовать класс с case преобразования внутри:
- загрузка xml:
import scala.xml.XML
val root = XML.loadFile("myfile.xml")
- загрузка с сохранением комментариев:
ConstructingParser.fromFile(new File("myfile.xml"), preserveWS = true)
- сохранение xml:
XML.save(writer, root, "UTF-8", false, null)

Обобщенные типы

- обобщенный класс (generic):
class Pair[T, S](val first: T, val second: S)
- если хотим сравнивать:
class Pair[T <: Comparable[T]](val first: T, val second: T)
- если хотим поменять местами, то наоборот T должен быть супертипом:
def replaceFirst[R >: T](newFirst: R) = new Pair(newFirst, second)
- класс можно обозначить Comparable неявно через implicit
class Pair[T](val first: T, val second: T)
(implicit ev: T => Comparable[T])
- для работы с массивом обобщенных типов, нужен "ClassTag"
def makePair[T : ClassTag](first: T, second: T) = {
 val r = new Array[T](2); r(0) = first; r(1) = second; r
}
- можно одновременно указывать обе границы:
T <: Upper >: Lower
- обобщенный тип нескольких трейтов:
T <: Comparable[T] with Serializable with Cloneable
- Ограничение типов
T =:= U //равенство типу
T <:< U //T - подтип U
T >:> U //T - более общий тип от U
T <%< U //может приводиться к типу

- вариантность и род:
-- ковариантность - тип A производится (может находится только в возвращаемых результатах)
коваринтыми типами не могут быть мутабельные структуры, т.к. элемент и его тип можно поменять
Так, если класс Cat наследуется от класса Animal, то естественно полагать, что перечисление IEnumerable<Cat> будет потомком перечисления IEnumerable<Animal>
trait Coll[+A]{
 def apply(i: Int): A
}
т.е. +A может быть любым детализирующим подтипом A (doc/cat от Animal)
-- контрвариантность - наоборот - тип только во входных параметрах
trait Printer[-A]{
 def print(a: A): String
}
Принцип subtype по Liskov: обязательно тип и его подтим должны реализовывать одни и теже действия (не должно быть разницы в методах)
-A может быть любым более общим типом A (может принимать Animal , как doc/cat)
но может использоваться и как тип для подтипа в результате:
def prefixed(s: String): Printer[A]
-- вариантность:
все функции варианты:
потребляет A => и возвращает R
trait Function1[-A, +R]
-- род - тип типов
Int, String, List[Int]: T - простой обобщенный тип
List, Vector : T[_] - обобщенный тип, который принимает параметр подтипа
Map, Function : T[_, _]
пример:
case class IntContainer[F[_]](value: F[Int]) 
контейнер для int - F может быть List, Vector
также можно накладывать ограничения:
case class Dict[K, V, T[X] <: Seq[X]](items: T[(K, V)])
T[X] - контейнер Xов должен быть подтипом Seq[X]
-- если хотим использовать ковариантый результат в параметре, то нужно определить подтип от типа:
B >: ковариантый тип

- псевдоним для типа:
type IntList = List[Int]
и обобщенного:
type DenseMatrix[A] = Vector[Vector[A]]
также можно указывать ограничения:
type Matrix[F[X] <: Iterable[X], A] = F[F[A]]
type IntMap[A] = Map[Int, A]
type DenseMatrix[A] = Matrix[IntMap, A]
типы компоненты - псевдонимы без определения:
trait Item {
 type Key
 type Value
 def key: Key
 def value: Value
}
компонентный тип с уточнением:
trait Container {
 type Item
 def values: List[Item]
}
val ints: Container {type Item = Int} =
 new Container {
 type Item = Int
 val values = List(1, 2, 3)
 }
val xs: List[Int] = ints.values // List[Int]

Дополнительные типы

- составление цепочек вызовов:
article.setTitle("Whatever Floats Your Boat").setAuthor("Cay Horstmann")
для того, чтобы она работала и для наследников базового класса нужно указывать "this.type":
def setTitle(title: String): this.type = { ...; this }
- псевдоним для длинного типа:
type Index = HashMap[String, (Int, Int)]
- необходимость передачи типа данных с методом "append"
def appendLines(target: { def append(str: String): Any },
- внедрение зависимостей:
//1. трейт:
trait Logger { def log(msg: String) }
//2. 2 реализации с выводом на экран и в файл
//3. аутентификация с поддержкой логирования
trait Auth {
 this: Logger => //трейт может подмешиваться в классы , наследующий Logger
  def login(id: String, password: String): Boolean
}
//4. трейт приложения должен подмешиваться в класс с логированием и аутентификацией
trait App {
 this: Logger with Auth =>
 ...
}
//5. приложение собирается с нужными классами для логирования:
object MyApp extends App with FileLogger("test.log") with MockAuth("users.txt")
- абстрактный тип:
trait Reader {
 type Contents
// определение абстрактного типа:
class StringReader extends Reader {
 type Contents = String
- то же самое через обобщенный тип:
trait Reader[C] {
class StringReader extends Reader[String]

Неявные преобразования

- функции с неявными преобразованием параметров Int к Fraction
implicit def int2Fraction(n: Int) = Fraction(n, 1)
val result = 3 * Fraction(4, 5) // Вызовет int2Fraction(3) * Fraction(4, 5)
- добавление своей функции в стандартный класс:
-- добавляем функцию read
class RichFile(val from: File) {
 def read = Source.fromFile(from.getPath).mkString
}
-- неявное преобразование класса File к RichFile:
implicit def file2RichFile(from: File) = new RichFile(from)
-- вместо функции можно использовать класс:
implicit class RichFile(val from: File) { ... }
дальше обычно создаем объект File
помещаем функции в файл
и обязательно импортируем без префикса:
import com.horstmann.impatient.FractionConversions._
чтобы минимизировать число неявных преобразований, вызов можно делать внутри кода класса
- необязательные параметры:
def quote(what: String)(implicit delims: Delimiters) =
//явный параметр:
quote("Bonjour le monde")(Delimiters(""", """)) 
можно заранее объявить неявный разделитель:
implicit val quoteDelimiters = Delimiters(""", """)
- Неявная передача параметра ord в зависимости от типа другого параметра (A)
def sort[A](xs: List[A])(implicit ord: Ordering[A]): List[A]
Параметр можно задать явно: sort(xs)(Ordering.Int.reverse)
- из 2 реализаций implicit выбирается наиболее точная:
-- по типу
implicit def universal[A]: A = ??? //общая функция, котора может принимать и возвращать Int
implicit def int: Int = ??? //детальная реализация
-- по вложенности
- можно посмотреть, что подставится в implicit для заданного типа вызовом:
implicitly[Ordering[Int]]
>> res0: Ordering[Int] = scala.math.Ordering$Int$@73564ab0
- порядок поиска implicit: локально -> глобальный код импортированного файла -> объект компаньон

Динамическое программирование

//динамическое программирование
libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value
import scala.reflect.runtime.{universe => ru}
val mir = ru.runtimeMirror(getClass.getClassLoader)
val courseClass = mir.staticClass("advanced.Reflections.Course") //метаинфа о классе
val classMirror = mir.reflectClass(courseClass) //получаем все методы
val constructor = courseClass.primaryConstructor.asMethod //метаинфа о конструкторе
val constructorMirror = classMirror.reflectConstructor(constructor) // сам конструктор
val instance = constructorMirror.apply("Scala") //вызов

def determineClass(instance: Object): String = instance.getClass().getSimpleName() //вызов метода из объекта


Конкурентное программирование Scala

Реализация многопоточности в JVM

- поток в jvm это обертка над потоком ОС (в отличии от python)
- java-style поток:
object ThreadsCreation extends App {
 class MyThread extends Thread {
  override def run(): Unit = {
   println("New thread running.")
  }
 }
 val t = new MyThread
 t.start()
 t.join() //основной поток приостанавливается до окончания работы T
 println("New thread joined.")
}
- более scala way подход:
//создание потока 
val runnable = new Runnable {
    override def run(): Unit = println("I run in parallel")
}
val thread = new Thread(runnable)
firstThread.start() //запуск нити
firstThread.join() //ожидание завершения
- простой способ вернуть данные из потока - это объявить глобальную переменную, но использовать ее после join
- объявление синхронизированной функции:
def getUniqueId() = this.synchronized {}

- создание пула потоков:
import java.util.concurrent.Executors
val pool = Executors.newFixedThreadPool(3) // указываем количество нитей, которые хотим переиспользовать

//передача кода для выполнения
val runnable = new Runnable {
    override def run(): Unit = println("I run in parallel")
}
pool.execute(runnable)
pool.shutdown() //остановка пула (существующие задания доработают)

- самодельный пул потоков:
-- пул ожидающий работы (wait):
while (tasks.isEmpty) tasks.wait()
tasks.dequeue()
-- помещение программы в пул и нотификация, что нужно перестать делать wait
tasks.enqueue((
-- остановить пул при установке некого флага:
Worker.interrupt()
- один из способов побороть deadlock - синхронизировать ресурсы в 1 последовательности, чтобы не получилось перекрестной взаимоблокировки
если ресурсы блокируются в 1 последовательности, то 2 поток просто дождется окончания 1
Способ: перед трансфером получить атомарный счетчик в объект
val uid = getUniqueUid()
и перевод всегдя делается с объекта у которого меньший счетчик, чтобы обеспечить одинаковую последовательность блокировок:
if (this.uid < target.uid) this.lockAndTransfer(target, n)
else target.lockAndTransfer(this, -n)

Атомарные изменчивые переменные

@volatile var found = false
такие переменные изменяются за 1 такт, поэтому не могут меняться одновременно в нескольких потоках
на уровне процессора это операция: compareAndSet - установить значение, если compare выполнилось удачно
полный список описан тут: java.util.concurrent.atomic: AtomicBoolean, AtomicInteger, AtomicLong и AtomicReference
Пример использования:
private val uid = new AtomicLong(0L)
def getUniqueId(): Long = uid.incrementAndGet() //увеличить счетчик и получить значение
getAndSet //получить значение и установить новое
Аналог через synchronized:
def compareAndSet(ov: Long, nv: Long): Boolean =
this.synchronized {
 if (this.get == ov) false else {
  this.set(nv)
  true
 }
}
- блоки синхронизации могут вкладываться друг в друга - это лучше, чем делать 1 глобальный лок:
def tr(tar: Obj, n: Int) = {
    this.synchronized {
        tar.synchronized {
            //код над this и tar
        }
    }
}
- lazy переменные инициируются 1 раз при первом обращении
для проверки инициализации используются дополнительное атомарное поле-флаг
- muttable объекты нужно использовать с блокировками
- шаблон producer / consumer
между потребителем и производителем находится конкурентная очередь с данными:
- BlockingQueue - реализация в Scala: add/remove/element - с исключениями (poll/ofer/peak - со специальным значение) (take, put - блокирующие)
-- ArrayBlockingQueue - очередь фиксированного размера
-- LinkedBlockingQueue - неограниченная очередь (когда производители гарантированно работают быстрее потребителей)

- Конкурентные объекты:
val files: concurrent.Map[String, Entry] = new concurrent.TrieMap() // хэш массив с блокировками
-- concurrent.TrieMap() - позволяет обходить массив по его копии, что не блокирует его самого и не дает получить изменяемые данные
-- ConcurrentSkipListSet[Int] //lock free структуры (оптимистическая блокировка с атомарной проверкой)

Параллельные коллекции данных

- параллельный запуск обработки колекции = вызов функции par
coll.par.count(_ % 2 == 0)
-- можно распараллелить цикл:
for (i <- (0 until 100000).par) print(s" $i")
-- преобразование параллельной коллекции в последовательную:
coll.par.filter(p).seq

- Left операторы не могут обрабатываться параллельно, т.к. используют результат предыдущего выполнения
- Reduce операции сохраняющий тип данных : (A, A) => A могут быть распараллены
-- Дополнительно должно выполняться условие: a op b == b op a
( к примеру сложение просто парралелиться, но не вычитание, т.к. оно зависит от порядка )
- Если хочется результат другого типа, отличного от параметров, то нужно использовать aggregate
Array('E', 'P', 'F', 'L').par.aggregate(0)(
    (count, c) => if (isVowel(c)) count + 1 else count, //аккумулятор - часть выполняется в паралельных потоках
    _ + _ //комбайнер - складывает результат потоков
)
//def aggregate[S](z: => S)(seqop: (S, Int) => S,combop: (S, S) => S): S
- Если нужно, чтобы функция принимала или обычную или параллельную коллекцию, то нужно использовать тип GenSeq это интерфейс обоих контейнеров
def largestPalindrome(xs: GenSeq[Int]): Int = { ...

- Установка уровня параллельности для блока:
def withParallelism[A](n : Int)(block : => A) : A = {
  import collection.parallel.ForkJoinTasks.defaultForkJoinPool._
  val defaultParLevel = getParallelism
  setParallelism(n)
  val ret = block
  setParallelism(defaultParLevel)
  ret
}
withParallelism(2) {
  (1 to 100).par.map(_ * 2)
}
--- Установка параллельности для оператора:
val fjpool = new ForkJoinPool(2)
val customTaskSupport = new parallel.ForkJoinTaskSupport(fjpool)
coll.par.tasksupport = customTaskSupport
- для преобразования последовательной коллекции к параллельной используются сплиттеры (надстройка над итератором)
-- линейноссылочные структуры List/Stream не имеют эффективного алгоритма Split
при вызове par у этих структур они сначала преобразуются к обычным массивам
- Пример создания своей параллельной коллекции (в данном случае строки):
class ParString(val str: String) extends immutable.ParSeq[Char] { //immutable.ParSeq[Char] - последовательность Char
 def apply(i: Int) = str.charAt(i)
 def length = str.length
 def splitter = new ParStringSplitter(str, 0, str.length) //в последовательных коллекциях тут был бы итератор
        //в данном случае сплиттер должен возвращать часть строки
 def seq = new collection.immutable.WrappedString(str) //преобразование параллельной коллекции в последовательную
}
сплиттер должен быть отнаследован от трейта:
trait IterableSplitter[T] extends Iterator[T] {
 def dup: IterableSplitter[T] //заглушка - создание копии сплиттера
 def remaining: Int //число элементов в сплиттере
 def split: Seq[IterableSplitter[T]] //разделение сплиттера на подсплиты
}
- необязательные функции:
psplit - ручное задание размеров для сплита
- пример сплита на 2 части:
def split = {
 val rem = remaining
 if (rem >= 2) psplit(rem / 2, rem - rem / 2) //вызываем psplit 2 раза
 else Seq(this)
}
- psplit создает сплиттер с половиной строки:
def psplit(sizes: Int*): Seq[ParStringSplitter] = {
 val ss = for (sz <- sizes) yield {
  val nlimit = (i + sz) min limit
  val ps = new ParStringSplitter(s, i, nlimit) //нет копирования, просто передача индексов по строке
  i = nlimit
  ps
 }
 if (i == limit) ss
 else ss :+ new ParStringSplitter(s, i, limit)
}
- Комбинаторы - слияние сплитов обратно при использовании методов трансформации: map, filter groupBy

Объекты Future

- создание отдельного потока из пула потоков:
import java.time._
import scala.concurrent._
import ExecutionContext.Implicits.global //пул потоков
val f = Future {
 Thread.sleep(10000)
 println(s"This is the future at ${LocalTime.now}") //отдельный поток
 42
}
val result = Await.result(f, 10.seconds) // ожидание завершение потока с блокировкой основной программы
      // 2ой параметр - это таймаут ожидания потока, если превысит, то будет TimeoutException.
-- другой вариант ожидания:
Await.ready(f, 10.seconds)
val Some(t) = f.value
-- альтернативный вариант без блокировки:
по результату работы асинхронно вызовется функция:
f.onComplete {
 case Success(v) => println(s"The answer is $v")
 case Failure(ex) => println(ex.getMessage)
}
println(s"This is the present at ${LocalTime.now}")
-- объект Future должен возвращать результат или генерировать exception в случае ошибки

- размер пула по умолчанию = числу ядер
-- если нужно другое значение, то нужно создать свой пул потоков:
val pool = Executors.newCachedThreadPool()
implicit val ec = ExecutionContext.fromExecutor(pool)
- если нужно выполнить несколько потоков парраллельно, но данные нужны из обоих, то
//выполняем их параллельно:
val future1 = Future { getData1() }
val future2 = Future { getData2() }
// а результат ожидаем в последовательном oncomplete
future1 onComplete {
 case Success(n1) =>
 future2 onComplete {
  case Success(n2) => {
   val n = n1 + n2
   println(s"Result: $n")
  }
  case Failure(ex) => ...
 }
 case Failure(ex) => ...
}
- еще вариант параллельной обработки нескольких future через map:
val combined = for (n1 <- future1; n2 <- future2) yield n1 + n2 //т.к. Future имеет метод map, позволяющий работать с ним как с массивом
- или через async:
val combined = async { await(future1) + await(future2) }
- если функции нужно вызывать последовательно, то это future блоки надо определить как функции, а не переменные как ранее
def future1 = Future { getData() }
def future2 = Future { getMoreData() } // def, не val
val combined = for (n1 <- future1; n2 <- future2) yield n1 + n2 //сначала вызовется функция future1, потом future2
- обработка исключения SQLException внутри Future:
val f = Future { persist(data) } recover { case e: SQLException => 0 }
- разделение данных на части и их параллельная обработка:
val futures = parts.map(p => Future { вычислить результат в p })
//получить коллекция результатов, по окончанию всех потоков:
val result = Future.sequence(futures); //функция не блокирует программу, так что изначально тут просто ссылки на объекты без результатов
//сложение результатов, по мере их готовности:
val result = Future.reduceLeft(futures)(_ + _)
//если нужно получить первый выполнившийся поток:
Future[T] result = Future.firstCompletedOf(futures) // но другие задания продолжат работу!

- Через Promise можно вернуть несколько значений из потока:
val p1 = Promise[Int]()
val p2 = Promise[Int]()
Future {
 val n1 = getData1()
 p1.success(n1)
 val n2 = getData2()
 p2.success(n2)
}
// получение результата из первого выполнившегося потока:
val p = Promise[Int]()
Future {
 var n = workHard(arg)
 p.trySuccess(n)
}
Future {
 var n = workSmart(arg)
 p.trySuccess(n)
}
- Пример параллельной merge сортировки через future в GitHub

Реактивное программирование

События хранятся в сигнале - это иммутабельный объект
- значение в объекте можно менять
- оно будет автоматически проброшено на все зависимые сигналы
Signal = значение , выражение , список наблюдателей + update, который обновит значение используя выражение и оповестит всех наблюдателей о необходимости update
A() = 2
B() = A()*2 //==4
A() = 3 // автоматически изменит B() == 6
- Сравнение классического и реактивного подхода:
-- Классический подход:
var balance = 0
balance = balance + amount
subscribers.foreach(_.handler(this)) //оповещаем всех подписчиков, что изменилось значение
-- reactive подход:
val balance = Var(0)
val bal = balance() //отдельный вызов, т.к. опрашивается сигнал
balance() = bal + amount //потом устанавливается (не все сразу)
Код подписчика, который считает сумму всех балансов:
def cons(ac: List): Signal[Int] = Signal(ac.map(_.balance).sum()) 
//где в ac - помещаем список объектов с balance, за которыми будем следить
- зависимое событие:
val xchange = Signal(246.00)
vak inDollar = Signal(cons() * xchange())

Программная транзакционная память

- в основе лежит операция atomic
для работы нужна библиотека org.scala-stm
import scala.concurrent.stm._
def swap() = atomic {
атомарные операции не подвержены взаимоблокировкам, из-за этого они используются в оптимистических блокировках:
-- перед началом запоминается состояние памяти
-- если оказывается при записи, что область была изменена, то транзакция рестартуется
- для применения оптимистичной блокировки над переменной, нужно использовать ссылочный тип:
val urls = Ref[List[String]](Nil)
val clen = Ref(0)
- минус оптимистических блокировок:
-- рестарт транзакции, так что в транзакции нельзя вызывать неоткатываемые транзакции - к примеру http запрос
-- дополнительная нагрузка из-за рестарта
- для 1 минуса - "повторный вызов неоткатываемого запроса", можно использовать событие - aftercommit:
def inc() = atomic { implicit txn =>
 val valueAtStart = myValue()
 Txn.afterCommit { _ => //вызов неоткатываемого действия после удачного вызова
  log(s"Incrementing $valueAtStart")
 }
 Txn.afterRollback { _ => //после неудачного - оно будет рестартовано?
  log(s"rollin’ back") //тут можно сделать чтото, чтобы избежать повторного исполнения
 }
 myValue() = myValue() + 1
}
- в случае исключения внутри atomic - она целиком откатывается
- повторение транзакции, но с таймаутом в 1с:
Future {
 blocking {
  atomic.withRetryTimeout(1000) { implicit txn =>
   if (message() != "") log(s"got a message - ${message()}")
   else retry
  }
 }
}
- бесконечный повтор с интервалом в 1с:
Future {
 blocking {
  atomic { implicit txn =>
   if (message() == "") {
    retryFor(1000)
    log(s"no message.")
   } else log(s"got a message - ‘${message()}’")
  }
 }
}
- для локальных переменных нужно использовать TxnLocal - она индивидуальна в каждой транзакции
val myLog = TxnLocal("")
- транзакционный массив:
val website: TArray[String] = TArray(pages)
- транзакционный словарь:
val alphabet = TMap("a" -> 1, "B" -> 2, "C" -> 3)
-- словарь имеет возможность получить неизменяемый снапшот:
val snap = alphabet.single.snapshot

Акторы

взаимодействие независимых программ посредством сообщений (модель producer / consumer )
- Общие данные хранятся в памяти актора. Новые сообщения поступают через очередь, и обрабатываются внутри последовательно.
Акторы могут даже находится на разных компах, логика не зависит от их положения
- для работы нужен фреймворк Akka
- Первое: нужно объявить шаблон-класс акторов, который описывает его поведение и порядок приема сообщений
на основе этого шаблона будут создаваться акторы:
import akka.actor._
import akka.event.Logging
class HelloActor(val hello: String) extends Actor {
 val log = Logging(context.system, this)
 def receive = {
  case 'hello' =>
   log.info(s"Received a '$hello'... $hello!")
  case msg =>
   log.info(s"Unexpected message ‘$msg’")
  context.stop(self) //остановка актора
 }
}
- создание актора по шаблону:
def props(hello: String) = Props(new HelloActor(hello))
- отсылка сообщений в акторы:
отправка происходит в очередь, не дожидаясь окончания обработки
val deafActor: ActorRef = ourSystem.actorOf(Props[DeafActor], name = "deafy")
deafActor ! "hi" // ! - оператор для отправки сообщения
deafActor ! 1234
- поведение и состояние акторов. Актор со счетчиком:
работает как конечный автомат
при достижении 0 , актор больше не принимает receive - полностью переключается на done
class CountdownActor extends Actor {
 val log = Logging(context.system, this)
 var n = 10
 def counting: Actor.Receive = {
  case "count" =>
  n -= 1
  log.info(s"n = $n")
  if (n == 0) context.become(done)
 }
 def done = PartialFunction.empty
 def receive = counting
}
-- переключать во время работы можно на любую функцию, используя:
context.become(function)
- Иерархия акторов
-- Дочерний актор:
class ChildActor extends Actor {
 val log = Logging(context.system, this)
 def receive = {
  case "sayhi" =>
   val parent = context.parent
   log.info(s"my parent $parent made me say hi!")
 }
-- Родительский вызывает дочерний:
class ParentActor extends Actor {
val log = Logging(context.system, this)
 def receive = {
  case "create" =>
   context.actorOf(Props[ChildActor])
   log.info(s"created a kid; children = ${context.children}")
  case "sayhi" =>
   log.info("Kids, say hi!")
   for (c <- context.children) c ! "sayhi"
-- получить путь в иерархии акторов:
context.actorSelection(path)
-- в случае исключения в акторе, вышестоящий его перезапускает
- события в акторе:
-- preStart - перед началом обработки -- preRestart - перед рестартом в случае ошибки -> еще раз вызывает postStop
-- postRestart - после рестарта, перед обработкой -> еще раз вызывает preStart
-- postStop - окончание обработки
пример использования:
override def preStart(): Unit = log.info(s"printer preStart.")
- отправка данных в актор с ожиданием ответа
class Pongy extends Actor {
 val log = Logging(context.system, this)
 def receive = {
  case "ping" =>
   log.info("Got a ping -- ponging back!")
   sender ! "pong" //после обработки, отправляем сендеру ответ
   context.stop(self) //и останавливаем актора
 }
сендер отправляет событие и ждет ответа:
class Pingy extends Actor {
 val log = Logging(context.system, this)
 def receive = {
  case pongyRef: ActorRef =>
   val f = pongyRef ? "ping"
   f pipeTo sender //получение ответа
 }
}
- Диспетчеризация акторов:
Дочерние акторы можно:
-- перезапустить - restart (стратегия по умолчанию)
-- возобновить без перезапуска - Resume
-- остановить - stop
-- распространить ошибку на все дочерние акторы - Escalate
override val supervisorStrategy =
 OneForOneStrategy() {
  case ake: ActorKilledException => Restart //в случае кила, будет рестарт
  case _ => Escalate //если один из акторов потерпел неудачу, она распространяется на всех
- Удаленное взаимодействие акторов:
-- настраиваем конфиг взаимодействия:
def remotingConfig(port: Int) = ConfigFactory.parseString(s""
akka {
 actor.provider = "akka.remote.RemoteActorRefProvider"
 remote {
  enabled-transports = ["akka.remote.netty.tcp"]
  netty.tcp {
   hostname = "127.0.0.1"
   port = $port
  }
 }
}
-- создаем актор:
val system = remotingSystem("PongyDimension", 24321)
val pongy = system.actorOf(Props[Pongy], "pongy")
-- получаем ссылку на удаленный актор "ActorIdentity", после посылаем сообщения:
def receive = {
 case "start" =>
 val pongySys = "akka.tcp://PongyDimension@127.0.0.1:24321"
 val pongyPath = "/user/pongy"
 val url = pongySys + pongyPath
 val selection = context.actorSelection(url)
 selection ! Identify(0)
 case ActorIdentity(0, Some(ref)) =>
  pingy ! ref
 case ActorIdentity(0, None) =>
  log.info("Something’s wrong - ain’t no pongy anywhere!")
  context.stop(self)
 case "pong" =>
  log.info("got a pong from another dimension.")
  context.stop(self)
- Минусы:
-- Нельзя посылать один и тот же тип сообщения из разных receiver. Для разграничения нужно будет включать уникальный ключ.
-- сложно настроить ожидание определенной цепочки событий (послать в 2 актора и ждать ответа)

Молниеносный анализ Spark

Вынесено в отдельную статью

1 комментарий: