Napisz program, który obliczy sumę wszystkich elementów tablicy.
// Sum every element of the array with a classic index-based loop.
int[] array = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
int sum = 0;
for (int i = 0; i < array.length; ++i) {
    sum = sum + array[i];
}
[java]
// Sum the array with an enhanced for loop; no index bookkeeping is needed.
int[] array = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
int sum = 0;
for (int value : array) {
    sum = sum + value;
}
[java]
// Same summation over a List<Integer>; each boxed element is unboxed when added.
List<Integer> array = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
int sum = 0;
for (Integer boxed : array) {
    sum = sum + boxed;
}
[java]
// Sum the list with a stream reduction.
// Reducing from the identity 0 avoids Optional.get(), which throws
// NoSuchElementException when the list is empty.
List<Integer> array = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
int sum = array.stream().reduce(0, Integer::sum);
[java 8]
// Fold the range 1..10 pairwise with addition.
val sum = (1 to 10).reduce(_ + _)
[scala]
// Pseudo-Java: the same summation written with an explicit jump, mirroring
// what the assembly version does.
int[] array = new int[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
int sum = 0;
int s = 10;  // number of elements still to be added
int i = 0;
FOR_LOOP:
sum += array[i];
i++;
s--;
// Jump back while elements remain; the loop ends when the counter hits zero.
if (s != 0) { goto FOR_LOOP; }
[pseudojava]
.data
array dw 1,2,3,4,5,6,7,8,9,10   ; ten 16-bit words to be summed
.code
        mov ax,@data
        mov ds,ax               ; DS := data segment
        mov si,offset array     ; SI points at the first element
        mov cx,10               ; loop counter = number of elements
        xor ax,ax               ; AX := 0, the running sum
for_loop:
        mov bx,[si]             ; load current element
        add ax,bx               ; accumulate into AX
        add si,2                ; advance by one word
        loop for_loop           ; decrement CX and repeat while CX != 0
[asm]
public class Server { private static final MessageDigest md5 = MessageDigest.getInstance("MD5"); public static final String digest(String input) throws Exception { byte[] inputBytes = input.getBytes(); return String.valueOf(md5.digest(inputBytes)); } public static final void main(String[] args) throws Exception { ServerSocket serverSocket = new ServerSocket(6969); Socket clientSocket = serverSocket.accept(); OutputStream outputSteam = clientSocket.getOutputStream(); PrintWriter out = new PrintWriter(outputSteam, true); InputStream inputStream = clientSocket.getInputStream(); InputStreamReader inputStreamReader = new InputStreamReader(inputStream); BufferedReader in = new BufferedReader(inputStreamReader); String inputLine, outputLine; while ((inputLine = in.readLine()) != null) { out.println(digest(inputLine)); } } } public class Client { public static final void main(String[] args) throws Exception { Socket socket = new Socket("localhost", 6969); OutputStream outputSteam = clientSocket.getOutputStream(); PrintWriter out = new PrintWriter(outputSteam, true); InputStream inputStream = clientSocket.getInputStream(); InputStreamReader inputStreamReader = new InputStreamReader(inputStream); BufferedReader in = new BufferedReader(inputStreamReader); out.println("Tej!"); System.out.println(in.readLine()); } }
[java]
.data
array dw 1,2,3,4,5,6,7,8,9,10   ; ten 16-bit words to be summed
.code
        mov ax,@data
        mov ds,ax               ; DS := data segment
        mov si,offset array     ; SI points at the first element
        mov cx,10               ; loop counter = number of elements
        xor ax,ax               ; AX := 0, the running sum
for_loop:
        mov bx,[si]             ; load current element
        add ax,bx               ; accumulate into AX
        add si,2                ; advance by one word
        loop for_loop           ; decrement CX and repeat while CX != 0
[asm]
/** Idealised ("magic") server: the transport is hidden behind start/sender/getServer. */
public class Server {

    /**
     * Returns the MD5 digest of {@code input} as a lowercase hex string.
     *
     * @throws Exception if the MD5 algorithm is unavailable
     */
    public static final String digest(String input) throws Exception {
        // getInstance throws a checked exception, so it cannot initialise a
        // static field directly.
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        // String.valueOf(byte[]) would only print the array identity, so
        // hex-encode the bytes explicitly.
        StringBuilder hex = new StringBuilder();
        for (byte b : md5.digest(input.getBytes())) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    /** Replies to the sender with the digest of the received item. */
    private void onReceive(String item) throws Exception {
        sender.reply(digest(item));
    }

    public static final void main(String[] args) throws Exception {
        start("HashServ");
    }
}

/** Idealised client: looks the server up by name and sends one message. */
public class Client {
    public static final void main(String[] args) throws Exception {
        Server server = getServer("HashServ");
        String response = server.send("Tej!");
        System.out.println(response);
    }
}
[magic unicorn land]
/** Idealised ("magic") server: the transport is hidden behind start/sender/getServer. */
class Server {
  val md5 = MessageDigest.getInstance("MD5")

  /** MD5 digest of `input` rendered as a lowercase hex string. */
  def digest(input: String) =
    // Hex-encode the bytes; String.valueOf on a byte array would only print
    // the array identity.
    md5.digest(input.getBytes()).map("%02x".format(_)).mkString

  /** Replies to the sender with the digest of the received item. */
  def onReceive(item: String) = sender reply digest(item)
}

object Server extends App {
  start("HashServ")
}

object Client extends App {
  val server = getServer("HashServ")
  val response = server send "Tej!"
  println(response)
}
[magic unicorn land]
Kod:
gitlab.cs.put.poznan.pl/ksiek/Actors2016
ProducerConsumer
/** Idealised ("magic") FIFO queue served over messages. */
class Queue {
  // Immutable Vector held in a var: every message replaces the whole queue.
  var queue = Vector.empty[Any]

  def onReceive = {
    // ('add, item) appends an item at the end of the queue.
    case ('add, item) => queue = queue :+ item
    // 'get hands the oldest item to the sender, or waits when there is none.
    case 'get =>
      if (queue.isEmpty) wait
      else {
        sender reply (queue.head)
        queue = queue.tail
      }
  }
}

object Queue extends App {
  start("Q")
}

/** Repeatedly asks the queue for the next item and prints it. */
object Consumer extends App {
  while (true) {
    val response = (getQueue("Q")) send 'get
    println(response)
  }
}

/** Feeds the queue with consecutive integers. */
object Producer extends App {
  var counter = 0
  while (true) {
    (getQueue("Q")) send ('add, counter)
    counter += 1
  }
}
[magic unicorn land]
Teoria z lat siedemdziesiątych: uniwersalny prymityw do obliczeń równoległych
Rosnąca popularność: Twitter.
aktory vs aktorzy -- who cares!
Biblioteka Akka Actors http://akka.io/
Wersja: Akka 2.4.4 (Scala 2.11, Java 1.8)
class MyActor extends UntypedActor { public void onReceive(Object message) { // ??? } }
[java]
/** Minimal Scala actor skeleton: `receive` is the message handler. */
class MyActor extends Actor {
  def receive = {
    case message => ???
  }
}
[scala]
public static void main(String[] args) { final ActorSystem system = ActorSystem.create("MyActorSystem"); final ActorRef myActor = system.actorOf(Props.create(MyActor.class), "MyActor1"); system.terminate() }
[java]
/** Creates an actor system, starts one MyActor in it, then shuts the system down. */
object HashActor extends App {
  val system = ActorSystem("MyActorSystem")
  val hashActor = system.actorOf(Props[MyActor], name = "MyActor1")
  system.terminate()
}
[scala]
(fire and forget)
// Fire and forget, naming this actor as the sender (inside an actor).
myActor.tell("Message", getSelf());
// Fire and forget with no sender, e.g. from outside any actor.
myActor.tell("Message", ActorRef.noSender());
[java]
// Fire and forget, naming this actor as the sender (inside an actor).
myActor.tell("Message", self)
// Fire and forget with no sender; noSender is a value, not a method call.
myActor.tell("Message", Actor.noSender)
// Operator form: the sender is picked up implicitly.
myActor ! "Message"
[scala]
Wykorzystanie:
(komunikacja zwrotna)
import akka.util.Timeout import scala.concurrent.Await import scala.concurrent.duration._ import scala.language.postfixOps import akka.pattern.ask import scala.concurrent.ExecutionContext.Implicits.global
// The ask pattern (?) takes an implicit timeout and returns a Future.
implicit val timeout: Timeout = Timeout(5.seconds)
val future = hashActor ? "Tej!"

// Variant 1: block the current thread until the reply arrives.
val result = Await.result(future, Duration.Inf)
println(result)

// Variant 2: register a callback that runs once the reply arrives.
future.onSuccess {
  case response => println(response)
}
[scala]
Wykorzystanie:
(komunikacja zwrotna)
// The ask pattern returns a Future; 5000 is the timeout in milliseconds.
Future<Object> future = ask(actor, "Tej!", 5000);

// Variant 1: block the current thread until the reply arrives.
Object response = Await.result(future, Duration.Inf());
System.out.println(response);

// Variant 2: register a callback that runs on the system dispatcher.
OnSuccess<Object> handler = new OnSuccess<Object>() {
    @Override
    public void onSuccess(Object result) {
        System.out.println(result);
    }
};
future.onSuccess(handler, system.dispatcher());
[java]
Wykorzystanie:
Komunikacja zwrotna:
class MyActor extends UntypedActor { public void onReceive(Object message) { if (message instanceof String) { getSender().tell(((String) message).toUpperCase(), self); } else { undelivered(message); } } }
[java]
/** Replies to every String message with its upper-cased form. */
class MyActor extends Actor {
  def receive = {
    case message: String => sender ! message.toUpperCase
  }
}
[scala]
Przekierowanie wiadomości:
class MyActor extends UntypedActor { public void onReceive(Object message) { if (message instanceof String) { ActorSelection actor = getContext().actorSelection("/user/MyActor2"); actor.tell(message, self); } else { undelivered(message); } } }
[java]
/** Redirects every String message to the actor at /user/MyActor2. */
class MyActor extends Actor {
  def receive = {
    case message: String =>
      val actor = context.actorSelection("/user/MyActor2")
      actor ! message
  }
}
[scala]
Transparentne przekierowanie wiadomości:
class MyActor extends UntypedActor { public void onReceive(Object message) { if (message instanceof String) { ActorSelection actor = getContext.actorSelection("/user/MyActor2"); // actor.tell(message, sender); actor.forward(message); } else { undelivered(message); } } }
[java]
/** Forwards every String message to /user/MyActor2, keeping the original sender. */
class MyActor extends Actor {
  def receive = {
    case message: String =>
      val actor = context.actorSelection("/user/MyActor2")
      actor.forward(message)
  }
}
[scala]
public class HashActorJava extends UntypedActor { private static MessageDigest md5 = MessageDigest.getInstance("MD5"); public static final String digest(String input) { byte[] inputBytes = input.getBytes(); return String.valueOf(md5.digest(inputBytes)); } public void onReceive(Object input) { String message = (String) input; getSender().tell(digest(message), getSelf()); } public static void main(String[] args) throws Exception { final ActorSystem system = ActorSystem.create("HashSystem"); final ActorRef hashActor = system.actorOf(Props.create(HashActorJava.class), "HashActor"); Future future = ask(hashActor, "Tej!", 5000); Object response = Await.result(future, Duration.Inf()); System.out.println(response); system.terminate(); } }
/** Actor that replies to each String message with its MD5 digest in hex. */
class HashActor extends Actor {
  val md5 = MessageDigest.getInstance("MD5")

  /** MD5 digest of `input` rendered as a lowercase hex string. */
  def digest(input: String) =
    // Hex-encode the bytes; String.valueOf on a byte array would only print
    // the array identity.
    md5.digest(input.getBytes()).map("%02x".format(_)).mkString

  def receive = {
    case input: String => sender ! digest(input)
  }
}

/** Starts the actor, asks it for one digest, prints the reply and shuts down. */
object HashActor extends App {
  val system = ActorSystem("HashSystem")
  val hashActor = system.actorOf(Props[HashActor], "HashActor")

  implicit val timeout = Timeout(5 seconds)
  // Send a plain String: receive only matches String, so any other message
  // type would go unanswered and the ask would time out.
  val future = hashActor ? "Tej!"
  val result = Await.result(future, Duration.Inf)
  println(result)

  system.terminate()
}
Aktory są niezależne od lokalizacji, więc mogą być umieszczane w dowolnym wątku lub na dowolnym węźle sieci.
Rozpraszanie systemu aktorów następuje przez plik konfiguracyjny application.conf.
akka {
  actor {
    # default: akka.actor.LocalActorRefProvider
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "10.0.0.10"  # actor system starts here
      port = 9001             # actor system's port, 0 for automatic
    }
  }
}
Dokumentacja: http://doc.akka.io/docs/akka/current/scala/remoting.html#remote-configuration-scala
Domyślnie używana jest konfiguracja w pliku nazwanym application.conf w korzeniu ścieżki projektu (np. w src/). Można wskazać inny plik konfiguracyjny przez argumenty JVM: -Dconfig.resource=custom.conf lub w kodzie programu jako argument przy tworzeniu systemu aktorów:
// Load the default configuration and hand its "custom" section to the actor system.
Config customConfig = ConfigFactory.load().getConfig("custom");
ActorSystem system = ActorSystem.create("SomethingSomething", customConfig);
[java]
// Load the default configuration and hand its "custom" section to the actor system.
val customConfig = ConfigFactory.load().getConfig("custom")
val system = ActorSystem("SomethingSomething", customConfig)
[scala]
public static void main(String[] args) { final ActorSystem system = ActorSystem.create("MyActorSystem"); final ActorSelection myActor = system.actorSelection("akka.tcp://RemoteActorSystem@10.0.0.10:9010/user/HashActor1"); Future future = ask(hashActor, "Tej!", 5000); Object response = Await.result(future, Duration.Inf()); System.out.println(response); system.terminate() }
/** Looks up a remote actor by its full path, asks it for one digest and prints the reply. */
object RemoteClient extends App {
  val system = ActorSystem("LocalSystem")
  val remoteActor = system.actorSelection(
    "akka.tcp://RemoteActorSystem@10.0.0.10:9010/user/HashActor1")

  implicit val timeout = Timeout(5 seconds)
  // Send a plain String, the message type the HashActor examples handle;
  // an unhandled message type would leave the ask to time out.
  val future = remoteActor ? "Tej!"
  val result = Await.result(future, Duration.Inf)
  println(result)

  system.terminate()
}
(Wewnątrz aktora można użyć actorSelection z context.)
import akka.actor.{AddressFromURIString, Deploy}
import akka.remote.RemoteScope

implicit val timeout = Timeout(10 seconds)
val system = ActorSystem("LocalSystem")

// Remote deployment is selected through the deploy scope: RemoteScope takes
// the address of the target actor system, and the actor's name is passed to
// actorOf as usual.
val remoteAddress = AddressFromURIString("akka.tcp://RemoteActorSystem@10.0.0.11:9010")
val actor = system.actorOf(
  Props[HashActor].withDeploy(Deploy(scope = RemoteScope(remoteAddress))),
  "HashActor3")
protokoły: akka, akka.tcp
przestrzenie: user, system, deadletters, temp, remote
dostęp do ścieżki: self.path
wildcard: *, ?
ścieżki względne: ../mybrothertheactor1, ../*
Odkładanie wiadomości do przetworzenia później.
public class StashActor extends UntypedActorWithStash { // ... }
[java]
/** Scala actor with stash support (the stash and unstashAll methods). */
class StashActor extends Actor with Stash {
  // ...
}
[scala]
metody: stash, unstashAll
Pełny przykład:
gitlab.cs.put.poznan.pl/ksiek/Actors2016
Examples/procrastinActor
Aktorzy jako maszyna stanów (become i unbecome):
/** Actor acting as a state machine: each state is a separate Receive handler. */
class ABActor extends Actor {

  /** Initial state: the first message selects which state to enter. */
  def receive: Receive = {
    case "msg A" =>
      // ...
      context.become(receiveA)
    case "msg B" =>
      // ...
      context.become(receiveB)
  }

  /** State A: stays on "msg A", switches on "msg B", otherwise reverts the behaviour. */
  def receiveA: Receive = {
    case "msg A" =>
      // ...
    case "msg B" =>
      // ...
      context.become(receiveB)
    case _ =>
      context.unbecome()
  }

  /** State B: left unimplemented in this example. */
  def receiveB: Receive = ???
}
[scala]
Kod:
gitlab.cs.put.poznan.pl/ksiek/Actors2016
Examples/statefulactor
Kod:
gitlab.cs.put.poznan.pl/ksiek/Actors2016
Examples/producerconsumer
Space | Forward |
---|---|
Right, Down, Page Down | Next slide |
Left, Up, Page Up | Previous slide |
P | Open presenter console |
H | Toggle this help |