Systemy współbieżne i rozproszone

Prezentacja: http://www.cs.put.poznan.pl/ksiek/fp/concurrency/

Wątki

Analogicznie do wątków w Javie:

val th = new Thread(new Runnable {
  def run() {
    println("new thread")
  }
})

th start
println("old thread")
th join

Przykład: uruchomienie funkcji w nowym wątku:

def start[T](expression: => T) = {
    val th = new Thread(new Runnable{ def run() { expression } })
    th.start; th
}

start((0 until 100) map {e => println(e * e)})

Sterowanie współbieżnością

Współbieżny dostęp do sekwencji:

var seq = 0

class Incr extends Thread {
    override def run () {
        println(seq)
        seq = seq + 1
    }
}

(0 until 10) map ((_) => (new Incr).start)

Synchronizacja przed wejściem do sekcji krytycznej:

var seq = 0

class Incr extends Thread {
    override def run () {
        (this getClass) synchronized {
            println(seq)
            seq = seq + 1
        }
    }
}

(0 until 10) map ((_) => (new Incr).start)

Zmienna ulotna (volatile) – zmienna używana przez wiele wątków nie podlegająca cache-owaniu.

@volatile var seq = 0

Prymitywy synchronizacji z java.util.concurrent._:

import java.util.concurrent.atomic.AtomicInteger
val aint = new AtomicInteger(0)
aint set 1
aint get

import java.util.concurrent.locks.ReentrantReadWriteLock
val l = new ReentrantReadWriteLock

val rl = l readLock
val wl = l writeLock

rl lock
rl unlock

ExecutorService
ForkJoinPool

// etc

Pula wątków:

import java.util.concurrent.Executors

val cores = Runtime.getRuntime.availableProcessors
val pool = Executors.newFixedThreadPool(cores)

def toRunnable[T] (expression: => T) = { new Runnable{ def run() { expression } } }

pool.execute{ toRunnable { Thread sleep 1000; println("Hello") } }
pool.execute{ toRunnable { Thread sleep 10;   println("World") } }

pool.shutdown

Wyniki obliczeń w puli wątków (i konwersja między Javą i Scalą):

def toCallable[T] (expression: => T) = {
    new java.util.concurrent.Callable[T]{ def call() = { expression } }
}
val tasks = ((0 until 100) toList) map (e => toCallable {e * e})

import scala.collection.JavaConversions._
val jTasks = asJavaCollection(tasks)

val jResult = pool.invokeAll( jTasks )
val result = (asScalaBuffer(jResult) toList) map (e => e.get)

Embarassingly Parallel Problems

Kolekcje mają wersje współbieżne.

ParArray
ParVector
mutable.ParHashMap
mutable.ParHashSet
immutable.ParHashMap
immutable.ParHashSet
ParRange
ParTrieMap

Użycie:

val list = List(0,1,2,3,4,5,6,7,8,9)

list foreach { e => println(e) }

(list.par) foreach { e => println(e) }

Przyszłość

Future to obiekt który przechowuje jakąś wartość, która kiedyś w przyszłości stanie się dostępna. Funkcja może być nie zakończona (not completed) lub zakończona (completed), gdzie zakończenie może być pomyślne (succesful) lub błędne (failed – np. zakończone wyjątkiem).

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

val f: Future[List[Int]] = Future { ((0 until 100) toList) map (e => e * e) }
val f: Future[List[Int]] = Future { Thread sleep 1000; ((0 until 100) toList) map (e => e * e) }

Asynchroniczne odebranie wyniku z przyszłosci.

f onSuccess { case result => println("done!" + result.last) }
f onFailure { case exception => println("fail!" + exception) }

Synchroniczne odebranie wyniku z przyszłości.

f value

import scala.concurrent.Await
import scala.concurrent.duration._

Await.result(f, 100 seconds)
Await.result(f, Duration.Inf)

Przykład zrównoleglenia obliczeń za pomocą przyszłości:

val l = (0 until 100) toList
val f = (x:Int) => println(x)
l map { e => Future { f(e) } }

val f = (x:Int) => { println(x); x * x }
l map { e => Future { f(e) } } map { e => Await.result(e, Duration.Inf) }

Redukcja: Future.reduce, Future.fold.

Systemy aktorskie

Aktor to uniwersalny prymityw do obliczeń równoległych. Aktory mogą wysyłać i odbierać komunikaty od innych aktorów, podejmować decyzje i tworzyć innych aktorów. Aktory mogą być stanowe lub bezstanowe, ale powinny być hermetyczne i nie wymieniać informacji ze światem w sposób inny niż przez niezmienne (immutable) komunikaty. Aktory można traktowac jak bardzo purystyczne obiekty.

System aktorów w języku Scala jest dostępny przez bilbiotekę Akka. Konfiguracja wymaga umieszczenia config-*.jar i akka-actor_*.jar w ścieżce projektu.

import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Status.Success
import akka.actor.ActorRef

// Actor definition.
class FinalCountdown(id:Int) extends Actor {
  def receive = {
    case (actor: ActorRef, number: Int) => {
      println(id + " sending " + number)
      // send message: operator '!' equivalent to method 'tell'
      actor ! number
    }
    case number: Int => {
      println(id + " received " + number)
      if (number > 0)
        // sender is implicitly defined for each message
        sender ! (number - 1)
      else
        println(id + " done " + number)
    }
  }
}

object finalCountdown extends App {
  // Create actor system.
  val system = ActorSystem("ItsTheFinalCountdown")

  // Create new actor reference (proxy).
  val one = system.actorOf(Props(new FinalCountdown(1)), "ActorOne")
  val two = system.actorOf(Props(new FinalCountdown(2)), "ActorTwo")

  one ! (two, 10)

  system.shutdown
}

Komunikacja tell:

one ! 10
one tell 10

Wykonana asynchronicznie (fire and forget), komunikat ostatecznie dotrze do celu, nie zwraca wyniku (zakłada, że aktor skomunikuje się z wysyłającym aktorem przez sender).

Komunikacja ask:

import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

implicit val timeout = Timeout(10 seconds)

val future = one ? 10
val future = one ask 10

future onSuccess { case x => println("result=" + x) }

Wykonana asynchronicznie ale zwraca Future, które będzie zawierało wiadomość wysłaną przez aktora do obiektu sender. Future daje możliwość synchronizacji.

Komunikacja forward:

abstract class Message
case class Request(needle: Int, visited: List[Int])
case class Success(id: Int)
case class Failure(id: Int)
case class Connect(next: ActorRef)

// Actor definition.
class ChainLink(id: Int) extends Actor {
  var next: ActorRef = null
  def receive = {
    case Request(needle, visited) => {
      println(id + " request " + needle + " visited " + visited)
      if (needle == id) {
        println(id + " success " + needle)
        sender ! Success(id)
      } else if (next == null) {
        println(id + " failure " + needle)
        sender ! Failure(id)
      } else {
        println(id + " forwarding " + needle)
        next forward Request(needle, id :: visited)
      }
    }
    case Connect(actor) => { println(self + " connecting to " + actor); next = actor }
  }
}

object chain extends App {
  implicit val timeout = Timeout(10 seconds)

  // Create actor system.
  val system = ActorSystem("Chain")

  // Create new actor reference (proxy).
  val actors = (0 to 5) map (e => system.actorOf(Props(new ChainLink(e)), e.toString))
  actors.reduceLeft((acc: ActorRef, next: ActorRef) => {acc ! Connect(next); next})

  val f4 = actors.head ? Request(4, List())
  val r4 = Await.result(f4, Duration.Inf)
  println(r4)

  val f6 = actors.head ? Request(6, List())
  val r6 = Await.result(f6, Duration.Inf)
  println(r6)

  system.shutdown
}

Własności aktorów: sender, self, context (np. referencje do innych aktorów actorSelection przez ID wzoru PROTOCOL://ACTOR_SYSTEM/SCOPE/ACTOR/). Przykładowe protokoły: akka, akka.tcp. Przykładowe przestrzenie: user, system, deadLetters.

Aktorzy jako maszyna stanów (become i unbecome):

class Parser extends Actor {
  def receive = {
        case "{" => { sender ! true; context.become(comment) }
        case "}" => sender ! false
        case _ => sender ! true
  }

  def comment: Receive = {
        case "{" => { sender ! false; context.unbecome() }
        case "}" => { sender ! true; context.unbecome() }
        case _ => sender ! true
  }
}

object parser extends App {
  implicit val timeout = Timeout(10 seconds)

  val system = ActorSystem("Parser")
  val parser = system.actorOf(Props(new Parser()))

  List("{", "x", "}", "}") foreach { e =>
      val future = parser ? e;
      val result = Await.result(future, 10 seconds)
      println(e + " -> " + result)
  }

  system.shutdown
}

Opóźnianie obsługi wiadomości (trait Stash: stash, unstashAll).

Remoting

Aktory są niezależne od lokalizacji, więc mogą być umieszczane w dowolnym wątku lub na dowolnym wierzchołku sieci. Rozpraszanie systemu aktorów następuje przez plik konfiguracyjny application.conf. Dodatkowo rozpraszanie wymaga akka-remote_*.jar, protobuf-*.jar, netty-*.jar.

Konfiguracja dla serwera systemu aktorów.

akka {
    actor {
        provider = "akka.remote.RemoteActorRefProvider"     # default: akka.actor.LocalActorRefProvider
    }
    remote {
        enabled-transports = ["akka.remote.netty.tcp"]
        netty.tcp {
            hostname = "10.0.0.10"                          # actor system starts here
            port = 9001                                     # actor system's port, 0 for automatic
        }
    }
}

Więcej opcji konfiguracji opisanych w dokumentacji.

Domyślnie używana jest konfiguracja w pliku nazwanym application.conf w korzeniu ścieżki projektu (np. w src/). Można wskazać inny plik konfiguracyjny przez argumenty JVM: -Dconfig.resource=custom.conf lub w kodzie programu jako argument przy tworzeniu systemu aktorów:

val system = ActorSystem("SomethingSomething", ConfigFactory.load.getConfig("custom"))

Kod zdalnego aktora:

package remoting

import akka.actor._

class RemoteActor extends Actor {
  def receive = {
    case msg: Int =>
        println("received " + msg)
        if (msg > 0)
            sender ! (msg - 1)
  }
}

object remote extends App  {
  val system = ActorSystem("RemoteActorSystem")
  system.actorOf(Props(new RemoteActor), "RemoteActor") // Creates actor instance.
}

Wywołanie zdalnego aktora:

object local extends App {
    implicit val timeout = Timeout(10 seconds)
    val system = ActorSystem("LocalSystem")
    val actor = system.actorSelection("akka.tcp://RemoteActorSystem@127.0.0.1:9010/user/RemoteActor")
    val future = actor ? 1
    val result = Await.result(future, 10 seconds)
    println(result)
}

(Wewnątrz aktora można użyć actorSelection z context.)

Tworzenie aktora na innym węźle:

import akka.actor.Deploy
import akka.remote.RemoteScope

object local extends App {
    implicit val timeout = Timeout(10 seconds)
    val system = ActorSystem("LocalSystem")
    val actor = system.actorOf(Props[RemoteActor].withDeploy(Deploy("akka.tcp://RemoteActorSystem@127.0.0.1:9010/user/RemoteActor")))
    val future = actor ? 1
    val result = Await.result(future, 10 seconds)
    println(result)
}

Warning: not properly tested

Transparentne rozmieszczanie aktorów na węzłach: clustering

Ankieta

Ćwiczenia

  1. Rozwiąż problem producent/konsument używając (a) wątków i (b) aktorów.
  2. Rozwiąż problem z 1 nie używając zmiennych.
  3. Rozszerz rozwiązanie problemu producent/konsument do n konsumentów, zapewniając, że rozwiązanie będzie sprawiedliwe.
  4. Rozwiąż problem czytelników i pisarzy używając (a) wątków i (b) aktorów.
  5. Zaimplementuj funkcję \(f(X) = \sum_{x\in X} x^2\) używając aktorów i techniki map-reduce (map dla \(x^2\) i reduce dla sumy).