Systemy współbieżne i rozproszone¶
Prezentacja: http://www.cs.put.poznan.pl/ksiek/fp/concurrency/
Wątki¶
Analogicznie do wątków w Javie:
val th = new Thread(new Runnable {
def run() {
println("new thread")
}
})
th start
println("old thread")
th join
Przykład: uruchomienie funkcji w nowym wątku:
def start[T](expression: => T) = {
val th = new Thread(new Runnable{ def run() { expression } })
th.start; th
}
start((0 until 100) map {e => println(e * e)})
Sterowanie współbieżnością¶
Współbieżny dostęp do sekwencji:
var seq = 0
class Incr extends Thread {
override def run () {
println(seq)
seq = seq + 1
}
}
(0 until 10) map ((_) => (new Incr).start)
Synchronizacja przed wejściem do sekcji krytycznej:
var seq = 0
class Incr extends Thread {
override def run () {
(this getClass) synchronized {
println(seq)
seq = seq + 1
}
}
}
(0 until 10) map ((_) => (new Incr).start)
Zmienna ulotna (volatile) – zmienna używana przez wiele wątków nie podlegająca cache-owaniu.
@volatile var seq = 0
Prymitywy synchronizacji z java.util.concurrent._
:
import java.util.concurrent.atomic.AtomicInteger
val aint = new AtomicInteger(0)
aint set 1
aint get
import java.util.concurrent.locks.ReentrantReadWriteLock
val l = new ReentrantReadWriteLock
val rl = l readLock
val wl = l writeLock
rl lock
rl unlock
ExecutorService
ForkJoinPool
// etc
Pula wątków:
import java.util.concurrent.Executors
val cores = Runtime.getRuntime.availableProcessors
val pool = Executors.newFixedThreadPool(cores)
def toRunnable[T] (expression: => T) = { new Runnable{ def run() { expression } } }
pool.execute{ toRunnable { Thread sleep 1000; println("Hello") } }
pool.execute{ toRunnable { Thread sleep 10; println("World") } }
pool.shutdown
Wyniki obliczeń w puli wątków (i konwersja między Javą i Scalą):
def toCallable[T] (expression: => T) = {
new java.util.concurrent.Callable[T]{ def call() = { expression } }
}
val tasks = ((0 until 100) toList) map (e => toCallable {e * e})
import scala.collection.JavaConversions._
val jTasks = asJavaCollection(tasks)
val jResult = pool.invokeAll( jTasks )
val result = (asScalaBuffer(jResult) toList) map (e => e.get)
Embarassingly Parallel Problems¶
Kolekcje mają wersje współbieżne.
ParArray
ParVector
mutable.ParHashMap
mutable.ParHashSet
immutable.ParHashMap
immutable.ParHashSet
ParRange
ParTrieMap
Użycie:
val list = List(0,1,2,3,4,5,6,7,8,9)
list foreach { e => println(e) }
(list.par) foreach { e => println(e) }
Przyszłość¶
Future to obiekt który przechowuje jakąś wartość, która kiedyś w przyszłości stanie się dostępna. Funkcja może być nie zakończona (not completed) lub zakończona (completed), gdzie zakończenie może być pomyślne (succesful) lub błędne (failed – np. zakończone wyjątkiem).
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
val f: Future[List[Int]] = Future { ((0 until 100) toList) map (e => e * e) }
val f: Future[List[Int]] = Future { Thread sleep 1000; ((0 until 100) toList) map (e => e * e) }
Asynchroniczne odebranie wyniku z przyszłosci.
f onSuccess { case result => println("done!" + result.last) }
f onFailure { case exception => println("fail!" + exception) }
Synchroniczne odebranie wyniku z przyszłości.
f value
import scala.concurrent.Await
import scala.concurrent.duration._
Await.result(f, 100 seconds)
Await.result(f, Duration.Inf)
Przykład zrównoleglenia obliczeń za pomocą przyszłości:
val l = (0 until 100) toList
val f = (x:Int) => println(x)
l map { e => Future { f(e) } }
val f = (x:Int) => { println(x); x * x }
l map { e => Future { f(e) } } map { e => Await.result(e, Duration.Inf) }
Redukcja: Future.reduce
, Future.fold
.
Systemy aktorskie¶
Aktor to uniwersalny prymityw do obliczeń równoległych. Aktory mogą wysyłać i odbierać komunikaty od innych aktorów, podejmować decyzje i tworzyć innych aktorów. Aktory mogą być stanowe lub bezstanowe, ale powinny być hermetyczne i nie wymieniać informacji ze światem w sposób inny niż przez niezmienne (immutable) komunikaty. Aktory można traktowac jak bardzo purystyczne obiekty.
System aktorów w języku Scala jest dostępny przez bilbiotekę Akka. Konfiguracja wymaga umieszczenia config-*.jar
i
akka-actor_*.jar
w ścieżce projektu.
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Status.Success
import akka.actor.ActorRef
// Actor definition.
class FinalCountdown(id:Int) extends Actor {
def receive = {
case (actor: ActorRef, number: Int) => {
println(id + " sending " + number)
// send message: operator '!' equivalent to method 'tell'
actor ! number
}
case number: Int => {
println(id + " received " + number)
if (number > 0)
// sender is implicitly defined for each message
sender ! (number - 1)
else
println(id + " done " + number)
}
}
}
object finalCountdown extends App {
// Create actor system.
val system = ActorSystem("ItsTheFinalCountdown")
// Create new actor reference (proxy).
val one = system.actorOf(Props(new FinalCountdown(1)), "ActorOne")
val two = system.actorOf(Props(new FinalCountdown(2)), "ActorTwo")
one ! (two, 10)
system.shutdown
}
Komunikacja tell
:
one ! 10
one tell 10
Wykonana asynchronicznie (fire and forget), komunikat ostatecznie dotrze do
celu, nie zwraca wyniku (zakłada, że aktor skomunikuje się z wysyłającym
aktorem przez sender
).
Komunikacja ask
:
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
implicit val timeout = Timeout(10 seconds)
val future = one ? 10
val future = one ask 10
future onSuccess { case x => println("result=" + x) }
Wykonana asynchronicznie ale zwraca Future
, które będzie zawierało
wiadomość wysłaną przez aktora do obiektu sender
. Future
daje możliwość
synchronizacji.
Komunikacja forward
:
abstract class Message
case class Request(needle: Int, visited: List[Int])
case class Success(id: Int)
case class Failure(id: Int)
case class Connect(next: ActorRef)
// Actor definition.
class ChainLink(id: Int) extends Actor {
var next: ActorRef = null
def receive = {
case Request(needle, visited) => {
println(id + " request " + needle + " visited " + visited)
if (needle == id) {
println(id + " success " + needle)
sender ! Success(id)
} else if (next == null) {
println(id + " failure " + needle)
sender ! Failure(id)
} else {
println(id + " forwarding " + needle)
next forward Request(needle, id :: visited)
}
}
case Connect(actor) => { println(self + " connecting to " + actor); next = actor }
}
}
object chain extends App {
implicit val timeout = Timeout(10 seconds)
// Create actor system.
val system = ActorSystem("Chain")
// Create new actor reference (proxy).
val actors = (0 to 5) map (e => system.actorOf(Props(new ChainLink(e)), e.toString))
actors.reduceLeft((acc: ActorRef, next: ActorRef) => {acc ! Connect(next); next})
val f4 = actors.head ? Request(4, List())
val r4 = Await.result(f4, Duration.Inf)
println(r4)
val f6 = actors.head ? Request(6, List())
val r6 = Await.result(f6, Duration.Inf)
println(r6)
system.shutdown
}
Własności aktorów: sender
, self
, context
(np. referencje do innych
aktorów actorSelection
przez ID wzoru
PROTOCOL://ACTOR_SYSTEM/SCOPE/ACTOR/
). Przykładowe protokoły: akka
,
akka.tcp
. Przykładowe przestrzenie: user
, system
, deadLetters
.
Aktorzy jako maszyna stanów (become
i unbecome
):
class Parser extends Actor {
def receive = {
case "{" => { sender ! true; context.become(comment) }
case "}" => sender ! false
case _ => sender ! true
}
def comment: Receive = {
case "{" => { sender ! false; context.unbecome() }
case "}" => { sender ! true; context.unbecome() }
case _ => sender ! true
}
}
object parser extends App {
implicit val timeout = Timeout(10 seconds)
val system = ActorSystem("Parser")
val parser = system.actorOf(Props(new Parser()))
List("{", "x", "}", "}") foreach { e =>
val future = parser ? e;
val result = Await.result(future, 10 seconds)
println(e + " -> " + result)
}
system.shutdown
}
Opóźnianie obsługi wiadomości (trait Stash
: stash
, unstashAll
).
Remoting¶
Aktory są niezależne od lokalizacji, więc mogą być umieszczane w dowolnym wątku
lub na dowolnym wierzchołku sieci. Rozpraszanie systemu aktorów następuje przez
plik konfiguracyjny application.conf
. Dodatkowo rozpraszanie wymaga
akka-remote_*.jar
, protobuf-*.jar
, netty-*.jar
.
Konfiguracja dla serwera systemu aktorów.
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider" # default: akka.actor.LocalActorRefProvider
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "10.0.0.10" # actor system starts here
port = 9001 # actor system's port, 0 for automatic
}
}
}
Więcej opcji konfiguracji opisanych w dokumentacji.
Domyślnie używana jest konfiguracja w pliku nazwanym application.conf
w
korzeniu ścieżki projektu (np. w src/
). Można wskazać inny plik
konfiguracyjny przez argumenty JVM: -Dconfig.resource=custom.conf
lub w
kodzie programu jako argument przy tworzeniu systemu aktorów:
val system = ActorSystem("SomethingSomething", ConfigFactory.load.getConfig("custom"))
Kod zdalnego aktora:
package remoting
import akka.actor._
class RemoteActor extends Actor {
def receive = {
case msg: Int =>
println("received " + msg)
if (msg > 0)
sender ! (msg - 1)
}
}
object remote extends App {
val system = ActorSystem("RemoteActorSystem")
system.actorOf(Props(new RemoteActor), "RemoteActor") // Creates actor instance.
}
Wywołanie zdalnego aktora:
object local extends App {
implicit val timeout = Timeout(10 seconds)
val system = ActorSystem("LocalSystem")
val actor = system.actorSelection("akka.tcp://RemoteActorSystem@127.0.0.1:9010/user/RemoteActor")
val future = actor ? 1
val result = Await.result(future, 10 seconds)
println(result)
}
(Wewnątrz aktora można użyć actorSelection
z context
.)
Tworzenie aktora na innym węźle:
import akka.actor.Deploy
import akka.remote.RemoteScope
object local extends App {
implicit val timeout = Timeout(10 seconds)
val system = ActorSystem("LocalSystem")
val actor = system.actorOf(Props[RemoteActor].withDeploy(Deploy("akka.tcp://RemoteActorSystem@127.0.0.1:9010/user/RemoteActor")))
val future = actor ? 1
val result = Await.result(future, 10 seconds)
println(result)
}
Warning: not properly tested
Transparentne rozmieszczanie aktorów na węzłach: clustering
Ćwiczenia¶
- Rozwiąż problem producent/konsument używając (a) wątków i (b) aktorów.
- Rozwiąż problem z 1 nie używając zmiennych.
- Rozszerz rozwiązanie problemu producent/konsument do n konsumentów, zapewniając, że rozwiązanie będzie sprawiedliwe.
- Rozwiąż problem czytelników i pisarzy używając (a) wątków i (b) aktorów.
- Zaimplementuj funkcję \(f(X) = \sum_{x\in X} x^2\) używając aktorów i techniki map-reduce (map dla \(x^2\) i reduce dla sumy).