Analogicznie do wątków w Javie:
val th = new Thread(new Runnable { def run() { println("new thread") } }) th start println("old thread") th join
Przykład: uruchomienie funkcji w nowym wątku:
def start[T](expression: => T) = { val th = new Thread(new Runnable{ def run() { expression } }) th.start; th } start((0 until 100) map {e => println(e * e)})
Współbieżny dostęp do sekwencji:
var seq = 0 class Incr extends Thread { override def run () { println(seq) seq = seq + 1 } } (0 until 10) map ((_) => (new Incr).start)
Synchronizacja przed wejściem do sekcji krytycznej:
var seq = 0 class Incr extends Thread { override def run () { (this getClass) synchronized { println(seq) seq = seq + 1 } } } (0 until 10) map ((_) => (new Incr).start)
Zmienna ulotna (volatile) -- zmienna używana przez wiele wątków nie podlegająca cache-owaniu.
@volatile var seq = 0
Prymitywy synchronizacji z java.util.concurrent._:
import java.util.concurrent.atomic.AtomicInteger val aint = new AtomicInteger(0) aint set 1 aint get import java.util.concurrent.locks.ReentrantReadWriteLock val l = new ReentrantReadWriteLock val rl = l readLock val wl = l writeLock rl lock rl unlock ExecutorService ForkJoinPool // etc
Pula wątków:
import java.util.concurrent.Executors val cores = Runtime.getRuntime.availableProcessors val pool = Executors.newFixedThreadPool(cores) def toRunnable[T] (expression: => T) = { new Runnable{ def run() { expression } } } pool.execute{ toRunnable { Thread sleep 1000; println("Hello") } } pool.execute{ toRunnable { Thread sleep 10; println("World") } } pool.shutdown
Wyniki obliczeń w puli wątków (i konwersja między Javą i Scalą):
def toCallable[T] (expression: => T) = { new java.util.concurrent.Callable[T]{ def call() = { expression } } } val tasks = ((0 until 100) toList) map (e => toCallable {e * e}) import scala.collection.JavaConversions._ val jTasks = asJavaCollection(tasks) val jResult = pool.invokeAll( jTasks ) val result = (asScalaBuffer(jResult) toList) map (e => e.get)
Kolekcje mają wersje współbieżne.
ParArray ParVector mutable.ParHashMap mutable.ParHashSet immutable.ParHashMap immutable.ParHashSet ParRange ParTrieMap
Użycie:
val list = List(0,1,2,3,4,5,6,7,8,9) list foreach { e => println(e) } (list.par) foreach { e => println(e) }
Future to obiekt który przechowuje jakąś wartość, która kiedyś w przyszłości stanie się dostępna. Funkcja może być nie zakończona (not completed) lub zakończona (completed), gdzie zakończenie może być pomyślne (succesful) lub błędne (failed -- np. zakończone wyjątkiem).
import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future val f: Future[List[Int]] = Future { ((0 until 100) toList) map (e => e * e) } val f: Future[List[Int]] = Future { Thread sleep 1000; ((0 until 100) toList) map (e => e * e) }
Asynchroniczne odebranie wyniku z przyszłosci.
f onSuccess { case result => println("done!" + result.last) } f onFailure { case exception => println("fail!" + exception) }
Synchroniczne odebranie wyniku z przyszłości.
f value import scala.concurrent.Await import scala.concurrent.duration._ Await.result(f, 100 seconds) Await.result(f, Duration.Inf)
Przykład zrównoleglenia obliczeń za pomocą przyszłości:
val l = (0 until 100) toList val f = (x:Int) => println(x) l map { e => Future { f(e) } } val f = (x:Int) => { println(x); x * x } l map { e => Future { f(e) } } map { e => Await.result(e, Duration.Inf) }
Redukcja: Future.reduce, Future.fold.
Aktor to uniwersalny prymityw do obliczeń równoległych. Aktory mogą wysyłać i odbierać komunikaty od innych aktorów, podejmować decyzje i tworzyć innych aktorów. Aktory mogą być stanowe lub bezstanowe, ale powinny być hermetyczne i nie wymieniać informacji ze światem w sposób inny niż przez niezmienne (immutable) komunikaty. Aktory można traktowac jak bardzo purystyczne obiekty.
System aktorów w języku Scala jest dostępny przez bilbiotekę Akka. Konfiguracja wymaga umieszczenia config-*.jar i akka-actor_*.jar w ścieżce projektu.
import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props import akka.actor.Status.Success import akka.actor.ActorRef // Actor definition. class FinalCountdown(id:Int) extends Actor { def receive = { case (actor: ActorRef, number: Int) => { println(id + " sending " + number) // send message: operator '!' equivalent to method 'tell' actor ! number } case number: Int => { println(id + " received " + number) if (number > 0) // sender is implicitly defined for each message sender ! (number - 1) else println(id + " done " + number) } } }
object finalCountdown extends App { // Create actor system. val system = ActorSystem("ItsTheFinalCountdown") // Create new actor reference (proxy). val one = system.actorOf(Props(new FinalCountdown(1)), "ActorOne") val two = system.actorOf(Props(new FinalCountdown(2)), "ActorTwo") one ! (two, 10) system.shutdown }
Komunikacja tell:
one ! 10 one tell 10
Wykonana asynchronicznie (fire and forget), komunikat ostatecznie dotrze do celu, nie zwraca wyniku (zakłada, że aktor skomunikuje się z wysyłającym aktorem przez sender).
Komunikacja ask:
import akka.pattern.ask import akka.util.Timeout import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global implicit val timeout = Timeout(10 seconds) val future = one ? 10 val future = one ask 10 future onSuccess { case x => println("result=" + x) }
Wykonana asynchronicznie ale zwraca Future, które będzie zawierało wiadomość wysłaną przez aktora do obiektu sender. Future daje możliwość synchronizacji.
Komunikacja forward:
abstract class Message case class Request(needle: Int, visited: List[Int]) case class Success(id: Int) case class Failure(id: Int) case class Connect(next: ActorRef) // Actor definition. class ChainLink(id: Int) extends Actor { var next: ActorRef = null def receive = { case Request(needle, visited) => { println(id + " request " + needle + " visited " + visited) if (needle == id) { println(id + " success " + needle) sender ! Success(id) } else if (next == null) { println(id + " failure " + needle) sender ! Failure(id) } else { println(id + " forwarding " + needle) next forward Request(needle, id :: visited) } } case Connect(actor) => { println(self + " connecting to " + actor); next = actor } } }
object chain extends App { implicit val timeout = Timeout(10 seconds) // Create actor system. val system = ActorSystem("Chain") // Create new actor reference (proxy). val actors = (0 to 5) map (e => system.actorOf(Props(new ChainLink(e)), e.toString)) actors.reduceLeft((acc: ActorRef, next: ActorRef) => {acc ! Connect(next); next}) val f4 = actors.head ? Request(4, List()) val r4 = Await.result(f4, Duration.Inf) println(r4) val f6 = actors.head ? Request(6, List()) val r6 = Await.result(f6, Duration.Inf) println(r6) system.shutdown }
Własności aktorów: sender, self, context (np. referencje do innych aktorów actorSelection przez ID wzoru PROTOCOL://ACTOR_SYSTEM/SCOPE/ACTOR/). Przykładowe protokoły: akka, akka.tcp. Przykładowe przestrzenie: user, system, deadLetters.
Aktorzy jako maszyna stanów (become i unbecome):
class Parser extends Actor { def receive = { case "{" => { sender ! true; context.become(comment) } case "}" => sender ! false case _ => sender ! true } def comment: Receive = { case "{" => { sender ! false; context.unbecome() } case "}" => { sender ! true; context.unbecome() } case _ => sender ! true } } object parser extends App { implicit val timeout = Timeout(10 seconds) val system = ActorSystem("Parser") val parser = system.actorOf(Props(new Parser())) List("{", "x", "}", "}") foreach { e => val future = parser ? e; val result = Await.result(future, 10 seconds) println(e + " -> " + result) } system.shutdown }
Bezstanowy aktor przez become:
class Counter extends Actor { class F(i : Int) extends PartialFunction[scala.Any, scala.Unit] { def apply(v : Any): Unit = { println(i); context.become(new F(i+1)) } def isDefinedAt(x: Any) = true } def receive() = { case _ => { println(0); context.become(new F(1)) } } } object Counter extends App { val system = ActorSystem("Counters") val counter = system.actorOf(Props[Counter], "Counter") counter ! () counter ! () counter ! () }
Opóźnianie obsługi wiadomości (trait Stash: stash, unstashAll).
Aktory są niezależne od lokalizacji, więc mogą być umieszczane w dowolnym wątku lub na dowolnym wierzchołku sieci. Rozpraszanie systemu aktorów następuje przez plik konfiguracyjny application.conf. Dodatkowo rozpraszanie wymaga akka-remote_*.jar, protobuf-*.jar, netty-*.jar.
Konfiguracja dla serwera systemu aktorów.
Cannot analyze code. No Pygments lexer found for "json".
.. code-block:: json akka { actor { provider = "akka.remote.RemoteActorRefProvider" # default: akka.actor.LocalActorRefProvider } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "10.0.0.10" # actor system starts here port = 9001 # actor system's port, 0 for automatic } } }
Więcej opcji konfiguracji opisanych w dokumentacji.
Domyślnie używana jest konfiguracja w pliku nazwanym application.conf w korzeniu ścieżki projektu (np. w src/). Można wskazać inny plik konfiguracyjny przez argumenty JVM: -Dconfig.resource=custom.conf lub w kodzie programu jako argument przy tworzeniu systemu aktorów:
val system = ActorSystem("SomethingSomething", ConfigFactory.load.getConfig("custom"))
Kod zdalnego aktora:
package remoting import akka.actor._ class RemoteActor extends Actor { def receive = { case msg: Int => println("received " + msg) if (msg > 0) sender ! (msg - 1) } } object remote extends App { val system = ActorSystem("RemoteActorSystem") system.actorOf(Props(new RemoteActor), "RemoteActor") // Creates actor instance. }
Wywołanie zdalnego aktora:
object local extends App { implicit val timeout = Timeout(10 seconds) val system = ActorSystem("LocalSystem") val actor = system.actorSelection("akka.tcp://RemoteActorSystem@127.0.0.1:9010/user/RemoteActor") val future = actor ? 1 val result = Await.result(future, 10 seconds) println(result) }
(Wewnątrz aktora można użyć actorSelection z context.)
Tworzenie aktora na innym węźle:
import akka.actor.Deploy import akka.remote.RemoteScope object local extends App { implicit val timeout = Timeout(10 seconds) val system = ActorSystem("LocalSystem") val actor = system.actorOf(Props[RemoteActor]. withDeploy(Deploy("akka.tcp://RemoteActorSystem@127.0.0.1:9010/user/RemoteActor"))) val future = actor ? 1 val result = Await.result(future, 10 seconds) println(result) }
Warning: not properly tested
Transparentne rozmieszczanie aktorów na węzłach: clustering
Space | Forward |
---|---|
Right, Down, Page Down | Next slide |
Left, Up, Page Up | Previous slide |
P | Open presenter console |
H | Toggle this help |