{-
Personal Assistant
np pa_discon.pi
-}
import "Nstd/Map"
type SiteL = (rec L = [e>[] l>[Site L]])
type Error = ^String
program par : [SiteL Error] =
(
val [sites err] = par
new registPA : ^[ String Agent ]
new summonPA : ^[ String Agent Site ]
new moveOn : ^Site
new notFound : ^String
new mid : ^String
new ack : ^[]
agent NameServer =
(new names : ^(Map String Agent)
def eq (a:String b:String) : Bool = (==$ a b)
( names ! (map.make eq)
| registPA ?* [descr PA] = names?m =
names!(map.add m descr PA)
| summonPA ?* [descr Su s] = names?m =
( switch (map.lookup m descr) of (
Found> PA : Agent -> ( moveOn @ PA !s
| print!(+$ descr " found."))
NotFound> _:[] -> notFound @ Su ! descr)
| names!m)))
def spawnSummoners l:SiteL =
switch l of (
e>[] -> ()
l>[s:Site r:SiteL] ->
(agent Summoner =
(migrate to s
run print!"MIGRATED!"
def loop [] =
(
val key = (sys.read "summon PA : ")
if (==$ key "DISCON") then
(do "disconnect" [] in loop![])
else if (==$ key "CON") then
(do "connect to" s in loop![])
else
(summonPA@NameServer![key Summoner s]
| notFound?key= print!(+$ key " not found!")
| loop![])
)
loop![])
spawnSummoners!r))
agent PA1 =
( registPA @ NameServer ! ["pawel" PA1]
| mid?*d = print!(+$ "pawelPA$ Incoming: " d)
| moveOn ?* s =
(migrate to s
( print!"pawelPA$ I am here, Pawel."
| (val d = (sys.read "pawelPA$ Write message for Peter: ")
mid @ PA2 ! d))))
and PA2 =
( registPA @ NameServer ! ["peter" PA2]
| mid?*d = print!(+$ "peterPA$ Incoming: " d)
| moveOn ?* s =
(migrate to s
( print!"peterPA$ I am here, Peter."
| ()
{- this below can't work since agent..and.. is currently encoded as
agent ... agent ... and PA1 isn't visible
(val d = (sys.read "peterPA$ Write message for Pawel: ")
mid @ PA1 ! d) -}
)))
run err?*status=print!status
spawnSummoners!sites
)
{-*********************************************************************
* Copyright (c) 1998-2001 Pawel T. Wojciechowski *
* Partition-aware Query Server with Caching *
*********************************************************************-}
{-
Assumptions and requirements:
An agent can disconnect their site from QSC and later reconnect
message are transparently delivered irrespective of agent
migrations and site disconnections
no messages are ever lost
no duplicate messages are ever received
agent migration and registration are not transparent - an exception
is raised if the site where the operation was invoked is disconnected;
migration to a site which is disconnected is blocked until the site
is reconnected
Restrictions of the algorithm presented:
(1)
The algorithm has serious drawback: a site disconnection blocks
*all* communication (and migration) which involves QSC (also
between agents which are not on the disconnected site..); only
one site can be disconnected at a time
a more realistic algorithm would block operations which involve
disconnected sites only and allow many sites to disconnect in the
same time - this would require "toggles" and little locks in maps
and a new map of sites to block migrations to disconnected sites
(2)
agent migration and registration can also fail if there is a slow
connection to QSC (as opposite to message delivery which is always
correct)
a local daemon has exact knowledge if there is connection to QSC
or not, so we can improve the algorithm by synchronising the agent
migration and registration with the local daemon (we omit this here
as not trivial, e.g. deadlock with disconnection requests must
be avoided)
The algorithm is to illustrate the use of :
- timeout mechanism
- replicated messages (exactly one message must be delivered)
- a rudimentary module system
-}
{Agent} = Agent
{Site} = [Site Agent]
type SiteTy = [Site Agent]
type Id = ^SiteTy
new register : ^[Agent SiteTy]
new migrating : ^[Agent ^[]]
new migrated : ^[Site Agent]
new message : ^[#X Agent Site Agent ^X X ^SiteTy]
new try_message : ^[#X Agent ^X X]
new update : ^[Agent [Site Agent]]
new buffer : ^(Map Agent SiteTy)
new disconnect : ^Agent
new connect : ^[Agent Site Agent]
new block : ^[Agent Site]
new unblock : ^[Agent Site]
new ack : ^[]
new currentloc : ^[Site Agent]
new msg : ^[#Y Id ^Y Y]
{toplevel P prog_par}[Agent Agent Site Int ^String] =
(
new daemondaemon : ^Site
new nd : ^[Site Agent]
new err : ^String
{- initial values -}
val s0 = (sys.get_site 2)
val s1 = (sys.get_site 1)
val s2 = (sys.get_site 0)
val nil:SiteL = (rec {e>[]})
val t = 5 {- timeout in sec. -}
agent a =
(
val SQ = s0
{- Query Server -}
agent Q =
(
migrate to SQ
run print!"QSC installed."
new lock : ^(Map Agent SiteTy)
def eq (a:Agent b:Agent) : Bool = (sys.== #Agent a b)
( nd![SQ Q]
| lock!(map.make eq) {- initialise the lock -}
| register?*[a [S DS]]= {- register a new agent -}
lock?m=
( lock!(map.add m a [S DS])
| ack![])
| migrating?*[a : Agent ack : ^[]] = {- lock during a migration -}
lock?m= switch (map.lookup m a) of (
Found> [S : Site DS : Agent] ->
( ack![]
| migrated?[S' DS'] =
( lock!(map.add m a [S' DS'])
| ack![]))
NotFound> _:[] -> ())
| message?*[#X DU U a:Agent c:^X v:X dack:^SiteTy]= {- deal with a lost message -}
lock?m= switch (map.lookup m a) of (
{Found> [R : Site DR : Agent]} ->
( dack![R DR]
| message![Q SQ a c v dack]
| dack?_ = lock!(map.add m a [R DR]))
{- | dack![R DR] -}
{NotFound> _:[]} -> ())
| block?*[a:Agent S:Site]=
lock?m= ( ack![]
| buffer!m )
| unblock?*[a:Agent S:Site]=
buffer?m= ( lock!m
| ack![])
))
(daemondaemon?*S:Site= {- launch a daemon D on site S -}
(agent D = {- the daemon body -}
(migrate to S
run print!"Daemon installed."
new lock : ^(Map Agent SiteTy)
def eq (a:Agent b:Agent) : Bool = (sys.== #Agent a b)
def send_message [#X Q:Agent SQ:Site D:Agent S:Site
a:Agent c:^X v:X m:(Map Agent SiteTy)] =
(new dack : ^SiteTy
( message![D S a c v dack]
| dack?s= lock!(map.add m a s )))
( nd![S D] {- ack the daemon installation -}
| lock!(map.make eq)
| try_message?*[#X a:Agent c:^X v:X]=
lock?m= switch (map.lookup m a) of (
{Found> [R : Site DR : Agent]} ->
(new dack : ^SiteTy
( message![D S a c v dack]
| wait
dack?s= lock!(map.add m a s)
timeout t ->
send_message![Q SQ D S a c v m]))
{NotFound> _:[]} -> send_message![Q SQ D S a c v m])
| message?*[#X DU:Agent U:Site a:Agent c:^X v:X dack:^SiteTy] =
iflocal msg![dack c v] then dack![S D]
else lock?m= ( message![D S a c v dack]
| dack?s= ( lock!(map.add m a s)
| dack!s ))
| disconnect?*a=
lock?m= ( buffer!m
| block![a S])
| connect?*[a _ _]= {- connect and unblock pending communications -}
buffer?m= ( unblock![a S]
| lock!m) ))
())
| nd?_=
(daemondaemon!s1 | nd?s1=
(daemondaemon!s2 | nd?s2=
(val sl : SiteL = (rec {l> [s1 {l> [s2 nil]}]})
val {prog_par} = [sl err]
val par = [a Q SQ t err]
( currentloc!s1
| {P}par)
))))
)
())
{ agent b = P in P' }par = run
currentloc?[S DS]= (val [a Q SQ t err] = par
agent B =
(val {b} = B
new msglog : ^(Map Id [])
def eq(a : Id b : Id) : Bool = (sys.== #Id a b)
( register![B [S DS]]
| wait ack?_= iflocal ack![] then
( currentloc![S DS]
| (val par = [B Q SQ t err]
{P}par))
else ()
timeout t -> ( ack![]
| err!"No connection.") {- raise exception here -}
| msglog!(map.make eq)
| msg?*[#X id c v] = msglog?m =
switch (map.lookup m id) of (
{NotFound> _:[]} -> (c!v | msglog!(map.add m id []))
{Found>_:Id} -> msglog!m))) {- ignore duplicate msg -}
in
val {b} = B
ack?_= ( currentloc![S DS]
| ({P'}par())))
{ migrate to u P }par = run
currentloc?[S DS]= (val [a Q SQ t err] = par
val [U DU] = u
new mack : ^[]
( migrating![a mack]
| wait mack?_ = (migrate to U
( migrated![U DU]
| mack?_ = ( currentloc![U DU]
| {P}par)))
timeout t ->
( currentloc![S DS]
| mack?_ = {- executed only if timeout was caused by slow
communication -}
migrated![S DS] {- release lock in QSC and ignore mack from Q -}
| err!"No connection." {- raise exception here -}
| {P}par)
))
{ c @ b ! v }par =
iflocal <{b}>{c}!{v} then ()
else currentloc?[S DS]=
iflocal try_message![b c {v}] then
currentloc![S DS]
else ()
{ "disconnect" foo in P }par = run
currentloc?[S DS]= (val [a _ _ _ _] = par
iflocal disconnect!a then
ack?_= ( currentloc![S DS]
| print!"Now, you can safely disconnect your computer."
| {P}par)
else ())
{ "connect to" s in P }par = run
currentloc?[S DS]= (val [a _ _ _ _] = par
val [SQ:Site Q:Agent] = {s}
iflocal connect![a SQ Q] then
ack?_= ( currentloc![S DS]
| print!"OK.connected."
| {P}par)
else ())