Model Aktorów

Konrad Siek <konrad.siek@cs.put.edu.pl>

KN SKiSR <dsg.cs.put.poznan.pl/wiki>

Zadanie

Napisz program który obliczy sumę wszystkich elementów tablicy.

int[] array = new int[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
int sum = 0;

for (int i = 0; i < array.length; i++) {
   sum += array[i];
}

[java]

int[] array = new int[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
int sum = 0;

for (int e : array) {
    sum += e;
}

[java]

List<Integer> array = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
int sum = 0;

for (int e : array) {
    sum += e;
}

[java]

List<Integer> array = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
int sum = array.stream().reduce((a, b) -> a + b).get();

[java 8]

val sum = (1 to 10) reduce ((a, b)  a + b)

[scala]

  int[] array = new int[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
  int sum = 0;
  int s = 9;
  int i = 0;

FOR_LOOP:
  sum += array[i];
  i++;
  s--;
  if (s == 0) {
    goto FOR_LOOP;
  }

[pseudojava]

.data
  array dw 1,2,3,5,6,7,8,9,10

.code
  mov  ax,@data
  mov  ds,ax

  mov  si,offset array
  mov  cx,9
  xor  ax,ax

for_loop:
  mov  bx,[ si ]
  add  ax,bx
  add  si,2
  loop for_loop

[asm]

Pytanie

Abstrakcja

Zadanie

public class Server {
    private static final MessageDigest md5 = MessageDigest.getInstance("MD5");

    public static final String digest(String input) throws Exception {
        byte[] inputBytes = input.getBytes();
        return String.valueOf(md5.digest(inputBytes));
    }

    public static final void main(String[] args) throws Exception {
        ServerSocket serverSocket = new ServerSocket(6969);
        Socket clientSocket = serverSocket.accept();
        OutputStream outputSteam = clientSocket.getOutputStream();
        PrintWriter out = new PrintWriter(outputSteam, true);
        InputStream inputStream = clientSocket.getInputStream();
        InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
        BufferedReader in = new BufferedReader(inputStreamReader);

        String inputLine, outputLine;
        while ((inputLine = in.readLine()) != null) {
            out.println(digest(inputLine));
        }
    }
}

public class Client {
    public static final void main(String[] args) throws Exception {
        Socket socket = new Socket("localhost", 6969);
        OutputStream outputSteam = clientSocket.getOutputStream();
        PrintWriter out = new PrintWriter(outputSteam, true);
        InputStream inputStream = clientSocket.getInputStream();
        InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
        BufferedReader in = new BufferedReader(inputStreamReader);

        out.println("Tej!");
        System.out.println(in.readLine());
    }
}

[java]

.data
  array dw 1,2,3,5,6,7,8,9,10

.code
  mov  ax,@data
  mov  ds,ax

  mov  si,offset array
  mov  cx,9
  xor  ax,ax

for_loop:
  mov  bx,[ si ]
  add  ax,bx
  add  si,2
  loop for_loop

[asm]

public class Server {
    private static final MessageDigest md5 = MessageDigest.getInstance("MD5");

    public static final String digest(String input) throws Exception {
        byte[] inputBytes = input.getBytes();
        return String.valueOf(md5.digest(inputBytes));
    }

    private void onReceive(String item) {
        sender.reply(digest(item));
    }

    public static final void main(String[] args) throws Exception {
        start("HashServ");
    }
}

public class Client {
    public static final void main(String[] args) throws Exception {
        Server server = getServer("HashServ");
        String response = server.send("Tej!");
        System.out.println(response);
    }
}

[magic unicorm land]

class Server {
  val md5 = MessageDigest.getInstance("MD5")

  def digest(input: String) = {
    val inputBytes = input.getBytes()
    String.valueOf(md5.digest(inputBytes))
  }

  def onReceive(String item) =
    sender reply (digest(item))
}

object Server extends App {
  start("HashServ")
}

object Client extends App {
  val server = getServer("HashServ")
  val response = server send "Tej!"
  println(response)
}

[magic unicorm land]

Potrzebna abstrakcja

Zadanie

Kod:

gitlab.cs.put.poznan.pl/ksiek/Actors2016

ProducerConsumer

Potrzebna abstrakcja

class Queue {
  val queue = mutable.List()

  def onReceive = {
      case 'add, item =>
        queue = queue :+ item
      case 'get =>
        if (queue.isEmpty)
          wait
        else {
          sender reply (queue.head)
          queue = queue.tail
        }
    }
}

object Queue extends App {
  start("Q")
}

object Consumer extends App {
  while (true) {
    val response = (getQueue("Q")) send 'get
    println(response)
  }
}

object Producer extends App {
  var counter = 0
  while (true) {
    (getQueue("Q")) send ('add, counter)
    counter += 1
  }
}

[magic unicorm land]

Abstrakcje

  1. Network server (np. MINA, Netty)
  2. Message Passing (Java Message Queues)
  3. Actors (Akka Actors)
  4. Remote Procedure Calls (Java RMI)

System aktorów

Teoria z lat 70-tych: uniwersalny prymityw do obliczeń równoległych

Rosnąca popularność: Twitter.

aktory vs aktorzy -- who cares!

Aktor

Limits are Freedom

Biblioteka Akka Actors http://akka.io/

Wersja: Akka 2.4.4 (Scala 2.11, Java 1.8)

Definicja

class MyActor extends UntypedActor {
    public void onReceive(Object message) {
        // ???
    }
}

[java]

class MyActor extends Actor {
  def receive = {
    case message => ???
  }
}

[scala]

Uruchomienie

public static void main(String[] args) {
    final ActorSystem system = ActorSystem.create("MyActorSystem");
    final ActorRef myActor = system.actorOf(Props.create(MyActor.class), "MyActor1");

    system.terminate()
}

[java]

object HashActor extends App {
  val system = ActorSystem("MyActorSystem")
  val hashActor = system.actorOf(Props[MyActor], "MyActor1")

  system.terminate
}

[scala]

Komunikacja

(fire and forget)

myActor.tell("Message", self)
myActor.tell("Message", ActorRef.noSender())

[java]

myActor tell ("Message", self)
myActor tell ("Message", Actor.noSender())

myActor ! "Message"

[scala]

Wykorzystanie:

Komunikacja

(komunikacja zwrotna)

import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps
import akka.pattern.ask
import scala.concurrent.ExecutionContext.Implicits.global
implicit val timeout = Timeout(5 seconds)
val future = hashActor ? "Tej!"
val result = Await.result(future, Duration.Inf);
println(result)

future onSuccess {
  case response => println(response)
}

[scala]

Wykorzystanie:

Komunikacja

(komunikacja zwrotna)

Future future = ask(actor, "Tej!", 5000);
Object response = Await.result(future, Duration.Inf());
System.out.println(response);

OnSuccess handler = new OnSuccess() {
    public void onSuccess(Object result) {
        System.out.println(result);
    }
};
future.onSuccess(handler, system.dispatcher());

[java]

Wykorzystanie:

Obsługa wiadomości

Komunikacja zwrotna:

class MyActor extends UntypedActor {
    public void onReceive(Object message) {
        if (message instanceof String) {
            getSender().tell(((String) message).toUpperCase(), self);
        } else {
            undelivered(message);
        }
    }
}

[java]

class MyActor extends Actor {
  def receive = {
    case message : String => sender ! message.toUpper
  }
}

[scala]

Obsługa wiadomości

Przekierowanie wiadomości:

class MyActor extends UntypedActor {
    public void onReceive(Object message) {
        if (message instanceof String) {
            ActorSelection actor = getContext().actorSelection("/user/MyActor2");
            actor.tell(message, self);
        } else {
            undelivered(message);
        }
    }
}

[java]

class MyActor extends Actor {
  def receive = {
    case message : String => {
        val actor = context.actorSelection("/user/MyActor2")
        actor ! message
    }
  }
}

[scala]

Obsługa wiadomości

Transparentne przekierowanie wiadomości:

class MyActor extends UntypedActor {
    public void onReceive(Object message) {
        if (message instanceof String) {
            ActorSelection actor = getContext.actorSelection("/user/MyActor2");
            // actor.tell(message, sender);
            actor.forward(message);
        } else {
            undelivered(message);
        }
    }
}

[java]

class MyActor extends Actor {
  def receive = {
    case message : String => {
        val actor = context.actorSelection("/user/MyActor2")
        actor forward message
    }
  }
}

[scala]

Zadanie

public class HashActorJava extends UntypedActor {

    private static MessageDigest md5 = MessageDigest.getInstance("MD5");

    public static final String digest(String input) {
        byte[] inputBytes = input.getBytes();
        return String.valueOf(md5.digest(inputBytes));
    }

    public void onReceive(Object input) {
        String message = (String) input;
        getSender().tell(digest(message), getSelf());
    }

    public static void main(String[] args) throws Exception {
        final ActorSystem system = ActorSystem.create("HashSystem");
        final ActorRef hashActor = system.actorOf(Props.create(HashActorJava.class), "HashActor");

        Future future = ask(hashActor, "Tej!", 5000);
        Object response = Await.result(future, Duration.Inf());
        System.out.println(response);

        system.terminate();
    }
}
class HashActor extends Actor {

  val md5 = MessageDigest.getInstance("MD5")

  def digest(input: String) = {
    val inputBytes = input.getBytes()
    java.lang.String.valueOf(md5.digest(inputBytes))
  }

  def receive = {
    case input: String => sender ! digest(input)
  }
}

object HashActor extends App {
  val system = ActorSystem("HashSystem")
  val hashActor = system.actorOf(Props[HashActor], "HashActor")

  implicit val timeout = Timeout(5 seconds)
  val future = hashActor ? List("Tej!")
  val result = Await.result(future, Duration.Inf)
  println(result)

  system.terminate()
}

Remoting

Aktory są niezależne od lokalizacji, więc mogą być umieszczane w dowolnym wątku lub na dowolnym wierzchołku sieci.

Rozpraszanie systemu aktorów następuje przez plik konfiguracyjny application.conf.

Konfiguracja dla serwera systemu aktorów

akka {
    actor {
        provider = "akka.remote.RemoteActorRefProvider"     # default: akka.actor.LocalActorRefProvider
    }
    remote {
        enabled-transports = ["akka.remote.netty.tcp"]
        netty.tcp {
            hostname = "10.0.0.10"                          # actor system starts here
            port = 9001                                     # actor system's port, 0 for automatic
        }
    }
}

Dokumentacja: http://doc.akka.io/docs/akka/current/scala/remoting.html#remote-configuration-scala

Skąd konfiguracja?

Domyślnie używana jest konfiguracja w pliku nazwanym application.conf w korzeniu ścieżki projektu (np. w src/). Można wskazać inny plik konfiguracyjny przez argumenty JVM: -Dconfig.resource=custom.conf lub w kodzie programu jako argument przy tworzeniu systemu aktorów:

ActorSystem system = ActorSystem.create("SomethingSomething", ConfigFactory.load().getConfig("custom"));

[java]

val system = ActorSystem("SomethingSomething", ConfigFactory.load.getConfig("custom"))

[scala]

Dostęp do zdalnego aktora

public static void main(String[] args) {
    final ActorSystem system = ActorSystem.create("MyActorSystem");
    final ActorSelection myActor =
            system.actorSelection("akka.tcp://RemoteActorSystem@10.0.0.10:9010/user/HashActor1");

    Future future = ask(hashActor, "Tej!", 5000);
    Object response = Await.result(future, Duration.Inf());
    System.out.println(response);

    system.terminate()
}
object RemoteClient extends App {
  val system = ActorSystem("LocalSystem")
  val remoteActor =
          system.actorSelection("akka.tcp://RemoteActorSystem@10.0.0.10:9010/user/HashActor1")

  implicit val timeout = Timeout(5 seconds)
  val future = remoteActor ? List("Tej!")
  val result = Await.result(future, Duration.Inf)
  println(result)

  system.terminate()
}

(Wewnątrz aktora można użyć actorSelection z context.)

Tworzenie aktora na innym węźle

import akka.actor.Deploy
import akka.remote.RemoteScope

implicit val timeout = Timeout(10 seconds)
val system = ActorSystem("LocalSystem")
val actor = system.actorOf(Props[HashActor].withDeploy(
    Deploy("akka.tcp://RemoteActorSystem@10.0.0.11:9010/user/HashActor3")
))

Ścieżki

protocol://actor_system/scope/actor

protokoły: akka, akka.tcp

przestrzenie: user, system, deadletters, temp, remote

dostęp do ścieżki: self.path

wildcard: *, ?

ścieżki względne: ../mybrothertheactor1, ../*

Stash

Odkładanie wiadomości do przetworzenia później.

public class StashActor extends UntypedActorWithStash {
    // ...
}

[java]

class StashActor extends Actor with Stash {
    // ...
}

[scala]

metody: stash, unstashAll

Pełny przykład:

gitlab.cs.put.poznan.pl/ksiek/Actors2016

Examples/procrastinActor

Stan

Aktorzy jako maszyna stanów (become i unbecome):

class ABActor extends Actor {
  def receive: Receive = {
        case "msg A" => {
            // ...
            context.become(receiveA)
        }
        case "msg B" => {
            // ...
            context.become(receiveB)
        }
  }
  def receiveA: Receive = {
        case "msg A" => {
            // ...
        }
        case "msg B" => {
            // ...
            context.become(receiveB)
        }
        case _ => {
            context.unbecome()
        }
  }
  def receiveB : Receive = ???
}

[scala]

Przykład

Kod:

gitlab.cs.put.poznan.pl/ksiek/Actors2016

Examples/statefulactor

Zadanie

Kod:

gitlab.cs.put.poznan.pl/ksiek/Actors2016

Examples/producerconsumer

Co jeszcze?

SpaceForward
Right, Down, Page DownNext slide
Left, Up, Page UpPrevious slide
POpen presenter console
HToggle this help