Big Data and Distributed Computing

Consistent snapshots of global state

dr inż. Arkadiusz Danilecki

based on lectures by prof. J. Brzeziński

The outline

  1. Definitions
  2. Why do we want to do it?
  3. Why is it difficult?
  4. Stop-and-sync
  5. Chandy-Lamport algorithm
  6. Lai-Yang algorithm

The notation

$\Sigma^i, \Sigma(\tau)$ - The $i$th global state, the global state of the system at the time $\tau$

$P_i$ - $i$th process

$S_i$ - the state of the $i$th process

$a, b, c ...$ - the events

$a \mapsto b \iff \begin{cases} \mbox{1) } a \mbox{ and } b \mbox{ are events in the same process and } a \mbox{ precedes } b \mbox{, OR}\\ \mbox{2) } a \mbox{ is the sending of a message } M \mbox{ and } b \mbox{ is the receiving of } M \mbox{, OR}\\ \mbox{3) there is a sequence of events } a, \ldots, x, y, \ldots, b \mbox{ such that for each pair of consecutive events } x, y\\ \quad \mbox{in this sequence, either case 1) or case 2) holds.}\end{cases}$
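The three cases of the definition can be turned into a small program. The following is only a sketch under assumptions: the event names, the `process_events` and `messages` inputs, and the function name `happened_before` are made up for illustration. Cases 1) and 2) seed the relation, and case 3) is computed as a transitive closure.

```python
from itertools import product

def happened_before(process_events, messages):
    """Return the set of pairs (a, b) such that a happened before b.

    process_events: dict mapping a process name to its events in local order.
    messages: iterable of (send_event, receive_event) pairs.
    """
    events = [e for evs in process_events.values() for e in evs]
    hb = set()
    # Case 1: order of events within a single process.
    for evs in process_events.values():
        for i in range(len(evs)):
            for j in range(i + 1, len(evs)):
                hb.add((evs[i], evs[j]))
    # Case 2: the sending of a message precedes its receipt.
    hb.update(messages)
    # Case 3: transitive closure (Warshall style, k is the middle event;
    # product() varies its first component slowest, so k is the outer loop).
    for k, a, b in product(events, repeat=3):
        if (a, k) in hb and (k, b) in hb:
            hb.add((a, b))
    return hb

# Hypothetical run: P1 sends to P2 (s1 -> r1), then P2 sends to P3 (s2 -> r2).
process_events = {"P1": ["s1", "x"], "P2": ["y", "r1", "s2"], "P3": ["r2"]}
messages = [("s1", "r1"), ("s2", "r2")]
hb = happened_before(process_events, messages)
print(("s1", "r2") in hb)                      # related through the message chain
print(("x", "y") in hb or ("y", "x") in hb)    # x and y are concurrent
```

Events that are related in neither direction, such as `x` and `y` here, are concurrent.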

What do we want to achieve?

Gather the state of all processes in the system, creating a "snapshot" of a computation. We want to ensure the state will be useful (in an informal sense) or, more formally, consistent.

Why do we want to do it?

  1. The post-mortem execution analysis and debugging
  2. Checking what's happening in the system
  3. System recovery using checkpoints

Unfortunately, creating a consistent state snapshot is difficult!

The first (naïve) approach

  1. The initiator sends everyone a message asking them to record their state at a specific time
  2. After getting the message, each process memorizes when to record its state

The second (still naïve) approach

  1. When the initiator chooses the time at which the state is to be recorded, it takes the maximum communication delays into account
  2. The initiator sends everyone a message asking them to record their state at a specific time
  3. After getting the message, each process memorizes when to record its state

The third approach (you guessed it, it's still naïve)

  1. The initiator determines the time to record the state, taking into account both the communication times and the current clock values of all other processes
  2. The rest is as before

Still wrong if the processes' clocks run at different speeds

Wait a minute! But do we even need a state from a specific time?

The fourth approach, where we play the "let's assume" game

Let's assume a perfectly synchronous system

  1. The initiator sends everyone a message asking them to record their state at a specific time
  2. After getting the message, each process memorizes when to record its state

Haven't we forgotten about something?

Dealing with the problem: two possibilities

  1. We record the history of communication, i.e. all sent and received messages. If $P_i$ recorded that message $M$ was sent to $P_j$, and $P_j$ has not recorded the event of receiving $M$, then the message is still in transit (in the channel)
  2. We record the channels' states, e.g. by sending special messages of the kind "I've already counted the students, so take that into account", which works nicely with FIFO students

Informal reasoning

In the states recorded in two nodes, we have the same student

The student left $P_j$ after the state was recorded - we missed the fact the student left afterwards

The student arrived at $P_i$ before the state was recorded - so the saved state had "seen" its arrival

Informal conclusion

The student left $P_j$ after the state was recorded - we missed the fact the student left afterwards

We recorded a state $S_j^l$ before the event $a$ of sending $M$

The student arrived at $P_i$ before the state was recorded - so the saved state had "seen" its arrival

In state $S_i^k$ we recorded the event $b$ of receiving the message $M$

In terms of the happened-before relation defined earlier, what is the relation between events $a$ and $b$?

In other words - we recorded a state containing event $b$, but not event $a$, which causally precedes $b$

Time for more formal specifications!

Cut and consistent cut

A cut ${\mathit C}$ of a set of events $\mathcal{E}$ is a set ${\mathit C}\subseteq \mathcal{E}$ that contains at least one event from each process. A consistent cut is a cut where $$(a\in {\mathit C}\land b\mapsto a)\Rightarrow (b\in {\mathit C})$$

If an event $a$ belongs to a consistent cut, all events causally preceding $a$ also must belong to the cut.
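The condition can be checked mechanically. Below is a minimal sketch, assuming the happened-before relation is already available as a set of pairs; the function name `is_consistent_cut` and the event names are hypothetical.

```python
def is_consistent_cut(cut, hb):
    """cut: set of events; hb: set of (b, a) pairs meaning b happened before a."""
    return all(b in cut for (b, a) in hb if a in cut)

# Hypothetical run: P_j sends M (event "send_M"), P_i receives it ("recv_M").
hb = {("j0", "send_M"), ("i0", "recv_M"),
      ("send_M", "recv_M"), ("j0", "recv_M")}

good = {"j0", "send_M", "i0", "recv_M"}   # both ends of M are recorded
also_good = {"j0", "send_M", "i0"}        # M is in transit
bad = {"j0", "i0", "recv_M"}              # receipt recorded, sending missed

print(is_consistent_cut(good, hb), is_consistent_cut(also_good, hb),
      is_consistent_cut(bad, hb))
```

The `bad` cut is exactly the situation from the informal reasoning: the receipt of $M$ is recorded without the sending of $M$.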

Configuration

The configuration ${\mathit {\Gamma }}$ is a vector of local states $\left\langle S_{1}^{k_1},S_{2}^{k_2},\cdots ,S_{n}^{k_n}\right\rangle $ of all processes $P_{1},P_{2},\ldots ,P_{n}$, such that for all $u$, $1\leq u\leq n$, $S_{u}^{k_u}\in {\mathcal {S}}_{u}$.

Cuts and configurations are equivalent, assuming that given a sequence of events, always the same final state will be reached.

The graphical representation

Consistent cut and a global state

Each consistent cut is equivalent to a global state in some theoretically possible execution of a program.

Later and earlier cuts

We say that the cut ${\mathit C}$ is later than some cut ${\mathit D}$ if ${\mathit D}\subseteq {\mathit C}$. In other words, $\mathit C$ contains all events from $\mathit D$, and $\mathit D$ does not contain events which are not in $\mathit C$.

Cuts more informally

Minimal cut

Why do we do it - checkpoints and state recovery

Obviously, for a given application we can always develop a tailored solution that recovers from a set of checkpoints representing a theoretically inconsistent cut.

Stop-and-sync

Idea - stop all processes and then record their states

... obviously we must ensure communication channels are empty.


				    message stop, ready, save, saved, continue

				    local bool ~~start_i := false~~
				    local bool ~~flushed_i[n] := false~~
				    local bool ~~ready_{\alpha}[n] := false~~
				    local set of messages ~~log_i := \emptyset~~
				    


				    when ~~{P}_{\alpha}~~ wants to take a snapshot
				       suspend application 
				       save local state 
				       ~~start_{\alpha}~~ := true
				       ~~flushed_{\alpha}[\alpha]~~ := true
				       broadcast ~~stop~~ to all ~~{P}_{j\neq\alpha}\in\mathcal{P}~~
				    end when
				    


when a message ~~stop~~ arrives at ~~{P}_{i}~~ from ~~{P}_{j}~~ do
	if not ~~start_i~~ then
	    suspend application 
	    save local state
	    ~~start_i~~ := true
	    ~~flushed_{i}[i]~~ := true
	    broadcast ~~stop~~ to all ~~{P}_{k}\in\mathcal{P}~~
	end if

	~~flushed_{i}[j]~~ := true
	if ~~\forall k: flushed_{i}[k] == \mbox{true}~~ then
	    save ~~log_{i}~~
	    send ready to ~~P_{\alpha}~~
	end if
end when
				    


				when an application message ~~m~~ arrives at ~~{P}_{i}~~ from ~~{P}_{j}~~ do
					if ~~start_i~~ and not ~~flushed_i[j]~~ then
					    ~~log_{i}~~ := ~~log_i \cup \{m\}~~
					else if ~~start_i~~ then
					    delay ~~m~~
					else
					    deliver ~~m~~
					end if
				end when
				    


				when a message ~~ready~~ arrives at ~~{P}_{\alpha}~~ from ~~{P}_{j}~~ do
					~~ready_{\alpha}[j] := \mbox{true}~~
					if ~~\forall k: ready_{\alpha}[k] == \mbox{true}~~ then
					    save ~~log_{\alpha}~~
					    broadcast ~~continue~~ to all ~~{P}_k\in\mathcal{P}~~ including itself
					end if
				end when

				when a message ~~continue~~ arrives at ~~{P}_{i}~~ do
					resume application
					~~start_i := \mbox{false}~~
					deliver delayed messages
				end when
				    

Stop-and-sync: The algorithm correctness

  1. Progress? All processes will eventually record their states and communication logs, and all will resume their execution.
  2. Safety? The recorded states will form a consistent cut (actually, a consistent configuration): if $P_i$ has recorded the event of receiving $M$ from $P_j$, then $P_j$ has recorded the event of sending $M$.
  3. Assumptions? We will assume reliable FIFO communication channels and no node crashes.

Stop-and-sync: Progress

The initiator starts by sending a special message $stop$ to all processes. Since channels are reliable and processes do not crash, that message must eventually reach all processes.

According to the algorithm, every process must then send the same message to all other processes.

Since we assumed reliable channels, those $stop$ messages will reach their destinations, so eventually $\forall k: flushed_i[k] == \mbox{true}$ for each $P_i$.

Hence, all processes will eventually record their state and log, and will send $ready$ to the initiator.

Since channels are reliable, the initiator will get $ready$ from all processes. This will cause the initiator to record its log and to send $continue$, and finally it will resume execution.

As before, $continue$ will reach its destinations, so all other processes will resume their execution too. QED.

Stop-and-sync: Safety

We would get an inconsistent cut (actually a configuration, but let's leave that) iff we included the event of receiving some message $M$ without including the event of sending $M$.

To get an inconsistent cut, $P_i$ would have to record its state after receiving a message $M$ that $P_j$ sent after recording its own state.

But after recording its state, $P_j$ won't send new messages until it resumes execution after receiving $continue$.

The initiator will send $continue$ only after it gets $ready$ from all processes.

A process can send $ready$ only when $\forall k: flushed_i[k] == \mbox{true}$ (and it records its state before that).

This condition holds only if $P_i$ got $stop$ from every other process.

A process may send $stop$ only after recording its state.

So the initiator will send $continue$ only when all processes have already recorded their states.

No new message $M$ is sent until all processes have recorded their states.

Hence, it's impossible to include the event of receiving $M$ without including the event of sending $M$.

Stop-and-sync: Do we lose any messages?

Processes are stopped, so they can't send new messages before they receive $continue$ from the initiator.

When the initiator gets $ready$ from everyone, it means all channels are empty.

Since we assume reliable FIFO channels, all messages sent by $P_j$ to $P_i$ before it sent $stop$ to $P_i$ must be received by $P_i$ before it receives $stop$.

So either the messages were received before the state was recorded, or they will be included in the log.

If a message was sent by $P_j$ after $P_j$ resumed execution, but reached $P_i$ before $P_i$ resumed, it is delayed (so it's not lost).

Stop-and-sync: communication channels' states

The channels' states can be derived from the logs (a message in the log is in transit with respect to the cut).

As an alternative, we can keep the history of all messages sent and received. In this approach the log is not necessary, and we can calculate a channel's state as the difference between the histories of the corresponding pair of processes.
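The history-based alternative can be sketched as follows. This is only an illustration under assumptions: messages carry unique identifiers, and the names `channel_states`, `sent` and `received` are hypothetical.

```python
def channel_states(sent, received):
    """sent[i][j]: messages P_i recorded as sent to P_j (in sending order).
    received[j][i]: messages P_j recorded as received from P_i.
    Returns {(i, j): messages in transit on the channel from P_i to P_j}."""
    in_transit = {}
    for i, per_destination in sent.items():
        for j, msgs in per_destination.items():
            got = set(received.get(j, {}).get(i, []))
            # In transit = recorded as sent, but not recorded as received.
            in_transit[(i, j)] = [m for m in msgs if m not in got]
    return in_transit

# Hypothetical recorded histories of two processes.
sent = {1: {2: ["m1", "m2"]}, 2: {1: ["m3"]}}
received = {2: {1: ["m1"]}, 1: {2: ["m3"]}}
states = channel_states(sent, received)
print(states)
```

Here `m2` was recorded as sent by $P_1$ but not as received by $P_2$, so it belongs to the state of the channel from $P_1$ to $P_2$.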

Stop-and-sync

  1. Easy to understand, intuitive
  2. Requires us to stop the computation

Chandy-Lamport's algorithm

We do not want to stop the computation... Fortunately the Stop-and-sync can be easily modified to avoid that! In fact, we can simply remove unnecessary instructions.

Assumptions: Reliable FIFO channels, processes do not crash


				    message marker

				    local bool ~~started_i := false~~
				    local bool ~~flushed_i[n] := false~~
				    local set of messages ~~log_i[n] := \left[\emptyset, \emptyset \ldots \emptyset \right]~~
				    


				    when ~~{P}_{\alpha}~~ wants to take a snapshot
				       save local state 
				       ~~started_{\alpha}~~ := true
				       ~~flushed_{\alpha}[\alpha]~~ := true
				       ~~\forall C_{\alpha,j} \in \mathcal{C}_{\alpha}~~ send ~~marker~~ along ~~C_{\alpha,j}~~
				    end when
				    


when a message ~~marker~~ arrives at ~~{P}_{i}~~ from ~~{P}_{j}~~ do
	if not ~~started_i~~ then
	    save local state
	    ~~started_i~~ := true
	    ~~flushed_{i}[i]~~ := true
	    ~~\forall C_{i,k} \in \mathcal{C}_i~~ send ~~marker~~ along ~~C_{i,k}~~
	end if

	~~flushed_{i}[j]~~ := true
	if ~~\forall k: flushed_{i}[k] == \mbox{true}~~ then
	    send state and ~~log_i~~ to ~~P_{\alpha}~~
	end if
end when
				    


				when an application message ~~m~~ arrives at ~~{P}_{i}~~ from ~~{P}_{j}~~ do
					if ~~started_i~~ and not ~~flushed_i[j]~~ then
					    ~~log_{i}[j]~~ := ~~log_i[j] \cup \{m\}~~
					end if 
					deliver ~~m~~
				end when
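The pseudocode above can be tried out in a small single-threaded simulation. This is a sketch under assumptions, not a reference implementation: the application state is a hypothetical integer balance, application messages are money transfers, channels are FIFO queues between every pair of processes, and the final step of sending the state and log to the initiator is left out. Names such as `Process`, `net` and `deliver` are made up for illustration.

```python
from collections import deque

MARKER = "marker"

class Process:
    def __init__(self, pid, n, balance):
        self.pid, self.n, self.balance = pid, n, balance
        self.started = False
        self.flushed = [False] * n
        self.log = [[] for _ in range(n)]   # log[j]: in-transit messages from P_j
        self.saved_state = None

    def start_snapshot(self, net):
        self.saved_state = self.balance     # save local state
        self.started = True
        self.flushed[self.pid] = True
        for k in range(self.n):             # marker along every outgoing channel
            if k != self.pid:
                net[(self.pid, k)].append(MARKER)

    def receive(self, j, msg, net):
        if msg == MARKER:
            if not self.started:
                self.start_snapshot(net)
            self.flushed[j] = True
        else:
            if self.started and not self.flushed[j]:
                self.log[j].append(msg)     # message was in transit at the cut
            self.balance += msg             # deliver the application message

    def send(self, k, amount, net):
        self.balance -= amount
        net[(self.pid, k)].append(amount)

n = 3
procs = [Process(i, n, 100) for i in range(n)]
net = {(i, j): deque() for i in range(n) for j in range(n) if i != j}

def deliver(i, j):
    """Deliver the oldest message on the FIFO channel from P_i to P_j."""
    procs[j].receive(i, net[(i, j)].popleft(), net)

procs[0].send(1, 10, net)       # still in flight when the snapshot starts
procs[2].send(0, 5, net)
procs[0].start_snapshot(net)    # P_0 acts as the initiator
procs[1].send(2, 7, net)        # sent before P_1 sees any marker
while any(net.values()):        # drain all channels
    for (i, j), channel in net.items():
        if channel:
            deliver(i, j)

recorded = sum(p.saved_state for p in procs)
in_transit = sum(sum(msgs) for p in procs for msgs in p.log)
print(recorded, in_transit, recorded + in_transit)
```

Since the snapshot is consistent, the recorded balances plus the amounts recorded as in transit add up to the 300 units the system started with, even though no process ever stopped.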
				    

Chandy-Lamport's algorithm: the complexity

Let a directed graph corresponding to the topology of distributed processing be characterized by the following parameters:

  • m – number of edges (one-direction channels),
  • d – diameter of the graph,
  • l – the length of the longest path.

The time complexity of the Chandy-Lamport algorithm is $d + 1$, while its communication complexity is $2m$.

Chandy-Lamport's algorithm: The recorded state and real global state

What we did was we created a snapshot of the system. Is it useful?

Informally - the global state $\Sigma^l$ can be reached from global state $\Sigma^k$, if there is a theoretically possible run (the program execution) starting from $\Sigma^k$ and ending with $\Sigma^l$.

We write it as $\Sigma^k \rightsquigarrow \Sigma^l$

If we started taking the snapshot at time $\tau _{b}$ and finished at $\tau _{e}$, then the snapshot is reachable from the global state at $\tau _{b}$, and the global state at $\tau _{e}$ is reachable from the snapshot. $$ {\mathit {\Sigma }}(\tau _{b})\rightsquigarrow {\mathit {\Gamma }}\rightsquigarrow {\mathit {\Sigma }}(\tau _{e})$$

Hmm... But what does that give us?

Global predicates

Predicate - a function returning True or False. The global predicate answers questions related to the whole system:

  1. Has computation ended?
  2. Do we have deadlocks?
  3. Is there someone in the critical section?
  4. Are there idle processes?

Stable and unstable predicates

The stable predicate - basically: once it's true, it will always be true

  1. Has computation ended?
  2. Do we have deadlocks?

Unstable predicates - they can become false again

  1. Is there someone in the critical section?
  2. Are there idle processes?

Chandy-Lamport's algorithm: stable predicates

If we determine that a stable predicate is true in a consistent snapshot we recorded, it's also true in all global states reachable from the snapshot (and you remember that this includes the global state at the time we finished recording the snapshot, right?).
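As an illustration, "has the computation ended?" can be evaluated on a recorded snapshot. This is a sketch under assumptions: local states are modelled as the hypothetical strings `"active"` and `"passive"`, and the snapshot includes the recorded channel states.

```python
def terminated(local_states, channel_states):
    """Stable predicate: every process is passive and no message is in transit."""
    return (all(state == "passive" for state in local_states.values())
            and all(len(msgs) == 0 for msgs in channel_states.values()))

# Hypothetical snapshots of a two-process system.
done = terminated({"P1": "passive", "P2": "passive"},
                  {("P1", "P2"): [], ("P2", "P1"): []})
not_done = terminated({"P1": "passive", "P2": "passive"},
                      {("P1", "P2"): ["work"], ("P2", "P1"): []})
print(done, not_done)
```

In the second snapshot both processes are passive, but the message in transit may still reactivate $P_2$, which is why the channel states must be part of the snapshot.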

Lai-Yang algorithm

We will tag all the messages and processes with two colours

Assumptions: reliable non-FIFO communication channels, processes do not crash.


				    enum color = { white, red }
				    message $m$ is a struct $\left\langle \mbox{data } appdata, \mbox{enum color } color \right\rangle$

				    local enum color procColor_i := white
				    local bool ~~started_i := false~~
				    local bool ~~flushed_i[n] := false~~
				    local set of messages ~~sentLog_i[n] := \left[\emptyset, \emptyset \ldots \emptyset \right]~~
				    local set of messages ~~recvLog_i[n] := \left[\emptyset, \emptyset \ldots \emptyset \right]~~
				    


				    when ~~{P}_{\alpha}~~ wants to take a snapshot
				       save local state, $sentLog_{\alpha}$, and $recvLog_{\alpha}$
				       ~~procColor_{\alpha}~~ := red

				       broadcast ~~\left\langle \emptyset, \mbox{red} \right\rangle~~ to all ~~{P}_{j\neq\alpha}\in\mathcal{P}~~
				    end when
				    


when a message ~~m~~ arrives at ~~{P}_{i}~~ from ~~{P}_{j}~~ do
	if ~~m.color = \mbox{red} \land procColor_i = \mbox{white}~~ then 
	    save local state, $sentLog_{i}$, and $recvLog_{i}$
	    ~~procColor_i~~ := red
	end if

	if ~~m.appdata \neq \emptyset~~ then 
	    ~~recvLog_i[j] := recvLog_i[j] \cup \{m\}~~
	    deliver ~~m~~
	end if
end when
				    


				when process ~~P_i~~ wants to send a message ~~m~~ to ~~{P}_{j}~~ do
					~~sentLog_{i}[j]~~ := ~~sentLog_i[j] \cup \{m\}~~
					~~m.color := procColor_i~~
					send ~~m~~ to ~~P_j~~
				end when
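The colouring rules can be exercised in a few lines. The sketch below is an illustration under assumptions: the local application state is omitted, messages are modelled as `(sender, data, colour)` tuples with `None` standing for the empty control message, and the class and method names are hypothetical. The scenario lets a red message overtake a white one, which a non-FIFO channel permits.

```python
WHITE, RED = "white", "red"

class Process:
    def __init__(self, pid, n):
        self.pid, self.color = pid, WHITE
        self.sent_log = [[] for _ in range(n)]
        self.recv_log = [[] for _ in range(n)]
        self.snapshot = None        # copies of (sent_log, recv_log) at snapshot time

    def take_snapshot(self):
        self.snapshot = ([list(log) for log in self.sent_log],
                         [list(log) for log in self.recv_log])
        self.color = RED

    def send(self, j, data):
        self.sent_log[j].append(data)
        return (self.pid, data, self.color)   # message carries the sender's colour

    def receive(self, msg):
        sender, data, color = msg
        if color == RED and self.color == WHITE:
            self.take_snapshot()              # record before delivering a red message
        if data is not None:                  # None models the empty control message
            self.recv_log[sender].append(data)

p0, p1 = Process(0, 2), Process(1, 2)
m1 = p0.send(1, "m1")           # white: sent before the snapshot
p0.take_snapshot()              # P_0 is the initiator
ctrl = (0, None, RED)           # the <empty, red> control message
m2 = p0.send(1, "m2")           # red: sent after the snapshot
for msg in (m2, m1, ctrl):      # non-FIFO: m2 overtakes m1 and the control message
    p1.receive(msg)

sent_0, _ = p0.snapshot
_, recv_1 = p1.snapshot
in_transit = [m for m in sent_0[1] if m not in recv_1[0]]
print(in_transit)
```

Because $P_1$ records its logs before delivering the red message `m2`, neither recorded log mentions `m2`, and comparing the two recorded logs leaves `m1` as the only message on the channel from $P_0$ to $P_1$.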
				    

Let's check whether you were paying attention: how do we find out about the communication channels' states?

Next two lectures

Consensus: a fundamental and arguably one of the most important problems in distributed systems