{- Personal Assistant np pa_discon.pi -}

{- Demo program: a name server keeps a map from textual names to agents;
   one Summoner agent per site reads commands from the console and either
   summons a registered personal assistant (PA) to its site, or issues the
   "disconnect" / "connect to" operations. -}

import "Nstd/Map"

{- List of sites: e> is the empty list, l> is a site followed by the rest. -}
type SiteL = (rec L = [e>[] l>[Site L]])
{- Channel on which error strings are delivered to the program. -}
type Error = ^String

{- par is the pair [sites err]: the site list and the error channel. -}
program par : [SiteL Error] =
(
  val [sites err] = par

  {- registPA : [name agent]            register an agent under a name
     summonPA : [name summoner site]    ask for the named agent to move to site
     moveOn   : site                    tells a PA which site to migrate to
     notFound : name                    reply to a summoner when lookup fails
     mid      : text                    text message delivered to a PA -}
  new registPA : ^[ String Agent ]
  new summonPA : ^[ String Agent Site ]
  new moveOn : ^Site
  new notFound : ^String
  new mid : ^String
  {- NOTE(review): ack is not referenced anywhere else in this program
     body - confirm whether it is still needed. -}
  new ack : ^[]

  {- The name server. The current map is held as the single message on the
     channel names; each request takes it and puts a map back. -}
  agent NameServer =
    (new names : ^(Map String Agent)
     def eq (a:String b:String) : Bool = (==$ a b)
     ( names ! (map.make eq)
       {- registration: store PA under descr -}
     | registPA ?* [descr PA] =
         names?m = names!(map.add m descr PA)
       {- summon: on Found send the site s to the agent on moveOn and print a
          notice; on NotFound send descr back to the summoner Su on notFound.
          The unchanged map m is put back in parallel. -}
     | summonPA ?* [descr Su s] =
         names?m =
           ( switch (map.lookup m descr) of
               ( Found> PA : Agent ->
                   ( moveOn @ PA !s
                   | print!(+$ descr " found."))
                 NotFound> _:[] ->
                   notFound @ Su ! descr)
           | names!m)))

  {- Walks the site list; for every site s creates an agent Summoner that
     migrates to s and then runs a console loop, then recurses on the rest r. -}
  def spawnSummoners l:SiteL =
    switch l of
      ( e>[] -> ()
        l>[s:Site r:SiteL] ->
          (agent Summoner =
             (migrate to s
              run print!"MIGRATED!"
              {- One iteration reads a line: DISCON and CON trigger the
                 "disconnect" / "connect to" operations and then loop again;
                 any other input is treated as the name of a PA to summon. -}
              def loop [] =
                ( val key = (sys.read "summon PA : ")
                  if (==$ key "DISCON") then
                    (do "disconnect" [] in loop![])
                  else if (==$ key "CON") then
                    (do "connect to" s in loop![])
                  else
                    (summonPA@NameServer![key Summoner s]
                     {- one-shot listener for a failed lookup; note that it
                        rebinds key to the name carried by the reply -}
                    | notFound?key= print!(+$ key " not found!")
                    | loop![]) )
              loop![])
           spawnSummoners!r))

  {- PA1 registers as "pawel", prints every text received on mid, and on
     each moveOn migrates to the given site, greets, reads a line from the
     console and sends it to PA2 on mid. -}
  agent PA1 =
    ( registPA @ NameServer ! ["pawel" PA1]
    | mid?*d = print!(+$ "pawelPA$ Incoming: " d)
    | moveOn ?* s =
        (migrate to s
           ( print!"pawelPA$ I am here, Pawel."
           | (val d = (sys.read "pawelPA$ Write message for Peter: ")
              mid @ PA2 ! d))))
  {- PA2 registers as "peter" and mirrors PA1, except that the reply path
     back to PA1 is disabled (see the original remark below). -}
  and PA2 =
    ( registPA @ NameServer ! ["peter" PA2]
    | mid?*d = print!(+$ "peterPA$ Incoming: " d)
    | moveOn ?* s =
        (migrate to s
           ( print!"peterPA$ I am here, Peter."
           | ()
             {- this below can't work since agent..and.. is currently encoded as
                agent ... agent ... and PA1 isn't visible
                (val d = (sys.read "peterPA$ Write message for Pawel: ")
                mid @ PA1 ! d) -}
           )))

  {- Print every status string that arrives on the error channel. -}
  run err?*status=print!status

  spawnSummoners!sites
)

{-*********************************************************************
 * Copyright (c) 1998-2001 Pawel T. 
Wojciechowski *
 * Partition-aware Query Server with Caching *
 *********************************************************************-}
{- Assumptions and requirements:
   - An agent can disconnect their site from QSC and later reconnect.
   - Messages are transparently delivered irrespective of agent migrations
     and site disconnections.
   - No messages are ever lost.
   - No duplicate messages are ever received.
   - Agent migration and registration are not transparent: an exception is
     raised if the site where the operation was invoked is disconnected;
     migration to a site which is disconnected is blocked until the site is
     reconnected.
   Restrictions of the algorithm presented:
   (1) The algorithm has a serious drawback: a site disconnection blocks *all*
       communication (and migration) which involves QSC (even between agents
       which are not on the disconnected site); only one site can be
       disconnected at a time. A more realistic algorithm would block only the
       operations which involve disconnected sites and allow many sites to
       disconnect at the same time - this would require "toggles" and little
       locks in maps and a new map of sites to block migrations to
       disconnected sites.
   (2) Agent migration and registration can also fail if there is a slow
       connection to QSC (as opposed to message delivery, which is always
       correct). A local daemon knows exactly whether there is a connection to
       QSC or not, so we can improve the algorithm by synchronising the agent
       migration and registration with the local daemon (we omit this here as
       it is not trivial, e.g. 
deadlock with disconnection requests must be avoided)
   The algorithm is to illustrate the use of:
   - timeout mechanism
   - replicated messages (exactly one message must be delivered)
   - a rudimentary module system -}

{- Type translation: a source-level Agent stays an Agent, while a source-level
   Site becomes a [Site Agent] pair. Presumably the Agent component is the
   daemon running on that site (daemons announce themselves with nd![S D]) -
   confirm. -}
{Agent} = Agent
{Site} = [Site Agent]

type SiteTy = [Site Agent]
type Id = ^SiteTy

{- Channels shared by the query server Q, the per-site daemons and the
   translation rules below. -}
new register : ^[Agent SiteTy]
new migrating : ^[Agent ^[]]
new migrated : ^[Site Agent]
new message : ^[#X Agent Site Agent ^X X ^SiteTy]
new try_message : ^[#X Agent ^X X]
{- NOTE(review): update is declared here but not referenced anywhere else in
   this section. -}
new update : ^[Agent [Site Agent]]
new buffer : ^(Map Agent SiteTy)
new disconnect : ^Agent
new connect : ^[Agent Site Agent]
new block : ^[Agent Site]
new unblock : ^[Agent Site]
new ack : ^[]
new currentloc : ^[Site Agent]
new msg : ^[#Y Id ^Y Y]

{- Top-level translation. Installs the query server Q on site s0, then a
   daemon on s1 and one on s2 (each installation is acknowledged on nd), and
   finally starts the translated user program P with the site list
   [s1 s2] and the error channel err. -}
{toplevel P prog_par}[Agent Agent Site Int ^String] =
(
  new daemondaemon : ^Site
  new nd : ^[Site Agent]
  new err : ^String
  {- initial values -}
  val s0 = (sys.get_site 2)
  val s1 = (sys.get_site 1)
  val s2 = (sys.get_site 0)
  val nil:SiteL = (rec {e>[]})
  val t = 5 {- timeout in sec. -}
  agent a =
  (
    val SQ = s0 {- Query Server -}
    {- Q keeps a map from agents to [Site Agent] pairs as the single message
       on the channel lock; a handler takes the map with lock?m and is
       expected to put a map back with lock!... when it is done. -}
    agent Q =
    ( migrate to SQ
      run print!"QSC installed."
      new lock : ^(Map Agent SiteTy)
      def eq (a:Agent b:Agent) : Bool = (sys.== #Agent a b)
      ( nd![SQ Q]
      | lock!(map.make eq) {- initialise the lock -}
      | register?*[a [S DS]]= {- register a new agent -}
          lock?m=
            ( lock!(map.add m a [S DS])
            | ack![])
      | migrating?*[a : Agent ack : ^[]] = {- lock during a migration -}
          lock?m=
            switch (map.lookup m a) of
              ( Found> [S : Site DS : Agent] ->
                  {- acknowledge, keep the lock until the new location
                     arrives on migrated, then store it and acknowledge again -}
                  ( ack![]
                  | migrated?[S' DS'] =
                      ( lock!(map.add m a [S' DS'])
                      | ack![]))
                {- NOTE(review): this branch does not put m back on lock -
                   confirm an unregistered agent cannot reach this point,
                   otherwise every later lock?m would block. -}
                NotFound> _:[] -> ())
      | message?*[#X DU U a:Agent c:^X v:X dack:^SiteTy]= {- deal with a lost message -}
          lock?m=
            switch (map.lookup m a) of
              ( {Found> [R : Site DR : Agent]} ->
                  {- report the recorded location on dack, re-send the
                     message, and restore the lock once dack is answered -}
                  ( dack![R DR]
                  | message![Q SQ a c v dack]
                  | dack?_ = lock!(map.add m a [R DR]))
                {- | dack![R DR] -}
                {- NOTE(review): as above, m is not put back on lock here. -}
                {NotFound> _:[]} -> ())
      {- block: take the map off lock and park it on buffer -}
      | block?*[a:Agent S:Site]=
          lock?m=
            ( ack![]
            | buffer!m )
      {- unblock: move the parked map from buffer back to lock -}
      | unblock?*[a:Agent S:Site]=
          buffer?m=
            ( lock!m
            | ack![])
      ))
    (daemondaemon?*S:Site= {- launch a daemon D on site S -}
      (agent D = {- the daemon body -}
        (migrate to S
         run print!"Daemon installed."
         {- the daemon has its own lock channel holding its own map -}
         new lock : ^(Map Agent SiteTy)
         def eq (a:Agent b:Agent) : Bool = (sys.== #Agent a b)
         {- Sends the message with a fresh dack channel and, when a location s
            comes back on dack, stores it for a and puts the map on lock. -}
         def send_message [#X Q:Agent SQ:Site D:Agent S:Site a:Agent c:^X v:X m:(Map Agent SiteTy)] =
           (new dack : ^SiteTy
            ( message![D S a c v dack]
            | dack?s= lock!(map.add m a s )))
         ( nd![S D] {- ack the daemon installation -}
         | lock!(map.make eq)
         {- try_message: if a location for a is known, send and wait up to t
            for dack, falling back to send_message on timeout; if it is not
            known, call send_message directly. -}
         | try_message?*[#X a:Agent c:^X v:X]=
             lock?m=
               switch (map.lookup m a) of
                 ( {Found> [R : Site DR : Agent]} ->
                     (new dack : ^SiteTy
                      ( message![D S a c v dack]
                      | wait dack?s= lock!(map.add m a s)
                        timeout t -> send_message![Q SQ D S a c v m]))
                   {NotFound> _:[]} ->
                     send_message![Q SQ D S a c v m])
         {- message: try a local delivery on msg; if that succeeds answer
            dack with this daemon's [S D], otherwise re-send the message and
            record and forward whatever location comes back on dack. -}
         | message?*[#X DU:Agent U:Site a:Agent c:^X v:X dack:^SiteTy] =
             iflocal msg![dack c v] then
               dack![S D]
             else
               lock?m=
                 ( message![D S a c v dack]
                 | dack?s=
                     ( lock!(map.add m a s)
                     | dack!s ))
         {- disconnect: park the local map on buffer and send block -}
         | disconnect?*a=
             lock?m=
               ( buffer!m
               | block![a S])
         | connect?*[a _ _]= {- connect and unblock pending communications -}
             buffer?m=
               ( unblock![a S]
               | lock!m)
         ))
       ())
    {- first nd is the one sent by Q; the next two come from the daemons and
       rebind s1 and s2 to the [Site Agent] pairs they carry -}
    | nd?_=
        (daemondaemon!s1
        | 
nd?s1=
            (daemondaemon!s2
            | nd?s2=
                (val sl : SiteL = (rec {l> [s1 {l> [s2 nil]}]})
                 val {prog_par} = [sl err]
                 val par = [a Q SQ t err]
                 ( currentloc!s1
                 | {P}par) ))))
  )
  ())

{- Translation of agent creation. Takes the current [S DS] from currentloc
   and creates agent B, which sends [B [S DS]] on register and then waits up
   to t for ack. B also keeps msglog, a map keyed by message Id, and passes a
   value received on msg on to c only when its id is not yet in msglog. -}
{ agent b = P in P' }par =
  run
  currentloc?[S DS]=
    (val [a Q SQ t err] = par
     agent B =
       (val {b} = B
        new msglog : ^(Map Id [])
        def eq(a : Id b : Id) : Bool = (sys.== #Id a b)
        ( register![B [S DS]]
        | wait ack?_=
            iflocal ack![] then
              ( currentloc![S DS]
              | (val par = [B Q SQ t err]
                 {P}par))
            else ()
          timeout t ->
            ( ack![]
            | err!"No connection.") {- raise exception here -}
        | msglog!(map.make eq)
        | msg?*[#X id c v] =
            msglog?m =
              switch (map.lookup m id) of
                ( {NotFound> _:[]} ->
                    (c!v
                    | msglog!(map.add m id []))
                  {- NOTE(review): msglog is declared as Map Id [], yet the
                     found value is annotated as Id - confirm. -}
                  {Found>_:Id} -> msglog!m))) {- ignore duplicate msg -}
     in
     val {b} = B
     ack?_=
       ( currentloc![S DS]
       | ({P'}par())))

{- Translation of migration. Sends [a mack] on migrating and waits up to t
   for mack. On success it migrates to U, sends [U DU] on migrated, waits for
   a second mack and then publishes [U DU] on currentloc before running P.
   On timeout it restores currentloc to [S DS], sends "No connection." on err
   and runs P without migrating. -}
{ migrate to u P }par =
  run
  currentloc?[S DS]=
    (val [a Q SQ t err] = par
     val [U DU] = u
     new mack : ^[]
     ( migrating![a mack]
     | wait mack?_ =
         (migrate to U
            ( migrated![U DU]
            | mack?_ =
                ( currentloc![U DU]
                | {P}par)))
       timeout t ->
         ( currentloc![S DS]
         | mack?_ = {- executed only if timeout was caused by slow communication -}
             migrated![S DS] {- release lock in QSC and ignore mack from Q -}
         | err!"No connection." {- raise exception here -}
         | {P}par) ))

{- Translation of output to an agent. First tries a local delivery to b; if
   that fails, takes currentloc, hands [b c v] to try_message locally and, if
   that succeeds, puts [S DS] back on currentloc. -}
{ c @ b ! v }par =
  iflocal <{b}>{c}!{v} then ()
  else
    currentloc?[S DS]=
      iflocal try_message![b c {v}] then
        currentloc![S DS]
      else ()

{- Translation of the disconnect operation. Takes currentloc, sends the
   first component of par on disconnect locally, and after ack restores
   currentloc, prints a notice and continues with P. -}
{ "disconnect" foo in P }par =
  run
  currentloc?[S DS]=
    (val [a _ _ _ _] = par
     iflocal disconnect!a then
       ack?_=
         ( currentloc![S DS]
         | print!"Now, you can safely disconnect your computer."
         | {P}par)
     else ())

{- Translation of the connect operation. Takes currentloc, sends [a SQ Q] on
   connect locally, where [SQ Q] is the translated site argument s, and after
   ack restores currentloc, prints a notice and continues with P. -}
{ "connect to" s in P }par =
  run
  currentloc?[S DS]=
    (val [a _ _ _ _] = par
     val [SQ:Site Q:Agent] = {s}
     iflocal connect![a SQ Q] then
       ack?_=
         ( currentloc![S DS]
         | print!"OK.connected."
         | {P}par)
     else ())