Big Data and Distributed Computing

Consistent snapshots of global state

dr inż. Arkadiusz Danilecki

W oparciu o wykłady prof. J. Brzezińskiego

The outline wykładu

  1. Definitions
  2. Why we want to do it?
  3. Why it's difficult?
  4. Stop-and-sync
  5. Lamport algorithm
  6. Lai-Yang algorithm

The notation

$\Sigma^i, \Sigma(\tau)$ - The $i$th global state, the global state of the system at the time $\tau$

$P_i$ - $i$th process

$S_i$ - the state of the $i$th process

$a, b, c ...$ - the events

The notation

$a, b, c ...$ - the events

${\bf a}\mapsto {\bf b}\iff { \begin{cases} {\mbox{1)}} {\bf a} {\mbox{ and }} b {\mbox{ are events in the same process and }} a {\mbox{ precedes }} b {\mbox{ OR }}\\ {\mbox{2)}} {\bf a} {\mbox{ is sending of message }}M,\\ {\mbox{ and }} {\bf b} {\mbox{ is receivin of }} M, {\mbox{ OR }}\\ {\mbox{3)}}{\mbox{ there is sequence of events }} {\bf a}, i, j \ldots x, y, z\ldots {\bf b} {\mbox{, such as for each }} x, y \\{\mbox{ in this sequence, we have either case 1) or 2) above.}}\end{cases}}$

Why we want to do it?

  1. The post-mortem execution analysis and debugging
  2. Checking what's happening in the system
  3. System recovery using checkpoints

Unfortunately, creating a consistent state snapshot is difficult!

The first (naïve) approach

  1. Initiator sends to everyone a message to record their state at specific time
  2. Each process after getting a message memorizes when to record its time

Podejście drugie

  1. Proces inicjator ustala czas wyznaczenia stanu uwzględniając maksymalny czas przesłania wiadomości
  2. Proces inicjator wysyła do wszystkich wiadomość, by wyznaczyli stan o konkretnej godzinie
  3. Każdy proces po otrzymaniu wiadomości zapamiętuje kiedy zapisuje stan

Podejście trzecie

  1. Proces inicjator ustala czas wyznaczenia stanu uwzględniając maksymalny czas przesłania wiadomości... osobno dla każdego procesu, uwzględniając jego lokalny zegar
  2. Proces inicjator wysyła do wszystkich wiadomość, by wyznaczyli stan o konkretnej godzinie

Wszystko i tak się zepsuje, jeżeli zegary chodzą z różną prędkością

Zaraz! Czy my naprawdę musimy wyznaczyć stan o konkretnej godzinie??

Podejście czwarte

A co jeżeli zegary i czasy komunikacji byłyby idealnie zsynchronizowane?

  1. Proces inicjator wysyła do wszystkich wiadomość, by zapisali stan o konkretnej godzinie
  2. Procesy po otrzymaniu wiadomości zapamiętują kiedy zapisać stan

Czy o czymś nie zapomnieliśmy?

Dwa możliwe podejścia

  1. Zapisujemy historię komunikacji, a więc log wszystkich wysłanych i odebranych wiadomości
  2. Wyznaczamy stan kanałów

Dwa możliwe podejścia

  1. Zapisujemy historię komunikacji, a więc log wszystkich wysłanych i odebranych wiadomości
  2. Jeżeli w logu procesu $P_i$ mamy zdarzenie wysłania wiadomości do procesu $P_j$, a w logu procesu $P_j$ nie zdarzenia odebrania tej wiadomości... to wiadomość jest w kanale

  3. Wyznaczamy stan kanałów

Dwa możliwe podejścia

  1. Zapisujemy historię komunikacji, a więc log wszystkich wysłanych i odebranych wiadomości
  2. Jeżeli w logu procesu $P_i$ mamy zdarzenie wysłania wiadomości do procesu $P_j$, a w logu procesu $P_j$ nie zdarzenia odebrania tej wiadomości... to wiadomość jest w kanale

  3. Wyznaczamy stan kanałów
  4. Moglibyśmy wysłać wiadomość typu "ja już zapisałem liczbę studentów, więc jeżeli policzyłeś u siebie i ktoś do Ciebie doszedł, to u mnie nie są policzeni"

Nieformalny wniosek

W obu stanach mamy zapisanego tego samego studenta.

Student wyszedł od $P_j$ po zapisaniu stanu - nie zapisaliśmy faktu jego wyjścia

Student przyszedł do $P_i$ przed zapisaniem stanu - zapisaliśmy fakt jego przyjścia

Nieformalny wniosek

Student wyszedł od $P_j$ po zapisaniu stanu - nie zapisaliśmy faktu jego wyjścia

W stanie $S_j^l$ nie ma zapisanego zdarzenia wysłania wiadomości $m$ (to byłby jakiś stan $S_j^{(l+1)}$

Student przyszedł do $P_i$ przed zapisaniem stanu - zapisaliśmy fakt jego przyjścia

Nieformalny wniosek

Student wyszedł od $P_j$ po zapisaniu stanu - nie zapisaliśmy faktu jego wyjścia

W stanie $S_j^l$ nie ma zapisanego zdarzenia wysłania wiadomości $m$ (to byłby jakiś stan $S_j^{(l+1)}$

Student przyszedł do $P_i$ przed zapisaniem stanu - zapisaliśmy fakt jego przyjścia

W stanie $S_i^k$ jest zapisane zdarzenia odbioru wiadomości $m$

Albo inaczej - zapisaliśmy stan $S_i^k$, a nie zapisaliśmy stanu $S_j^{(l+1)}$, od którego $S_i^k$ zależy

Czas na ujęcie tego w formalne definicje

Konfiguracja

Konfiguracja ${\boldsymbol {\mathit {\Gamma }}}$ jest wektorem $\left\langle S_{1}^{k1},S_{2}^{k2},\cdots ,S_{n}^{kn}\right\rangle $ stanów lokalnych (historii lokalnych) wszystkich procesów $P_{1},P_{2},\ldots ,P_{n}$, takim że dla każdego $u,1\leq u\leq n,S_{u}^{ku}\in {\mathcal {S}}_{u}$.

Konfiguracja spójna

Konfigurację $ \Gamma $ nazwiemy konfiguracją spójną lub obrazem spójnym jeżeli $\forall E\forall E'$ zachodzi: $$(E'\in \Gamma \land E\mapsto E')\Rightarrow (E\in \Gamma )$$ Warunek ten oznacza, że jeśli jakieś zdarzenie $E'$ jest elementem konfiguracji spójnej $ \Gamma $ (to znaczy, jeżeli $ E'\in \Gamma $), to również wszystkie zdarzenia $E$ , od których $E'$ przyczynowo zależy (czyli $E\mapsto E'$) również są elementami tej konfiguracji spójnej.

Odcięcie spójne

Odcięciem ${\boldsymbol {\mathit {\Psi }}}$ (albo: obcięciem) zbioru zdarzeń $\mathcal{E}$ nazwiemy skończony zbiór ${\mathit {\Psi }}\subseteq \mathcal{E}$, taki że: $$(E'\in {\mathit {\Psi }}\land E\mapsto _{i}E')\Rightarrow (E\in {\mathit {\Psi }})$$ Definicja ta mówi, że jeżeli jakieś zdarzenie $E'$ należy do odcięcia, to także wszystkie zdarzenia $E$ które są lokalnie poprzedzane przez $E'$ należą do tego odcięcia.

Konfiguracja a odcięcie

Konfiguracja - wektor stanów, odcięcie - zbiór zdarzeń.

W praktyce używamy często wymiennie i piszemy "zdarzenie należące do konfiguracji" (jak w definicji wyżej).

Linia odcięcia

Odcięcie spójne a stan globalny

Każdemu odcięciu spójnemu (wzgl. konfiguracji spójnej) wyznaczonym w wykonaniu $\Upsilon ^i$ odpowiada stan globalny $\Sigma(\tau)$ w jakimś teoretycznie możliwym wykonaniu $\Upsilon ^j$.

Odcięcie spójne a stan globalny

Odcięcie późniejsze lub wcześniejsza

Powiemy, że odcięcie ${\mathit {\Psi }}_{2}$ jest późniejsze od odcięcia ${\mathit {\Psi }}_{1}$, jeżeli ${\mathit {\Psi }}_{1}\subseteq {\mathit {\Psi }}_{2}$. Oznacza to, że odcięcie ${\mathit {\Psi }}_{2}$ nie zawiera żadnego takiego zdarzenia $E'$ nie należącego do ${\mathit {\Psi }}_{1}$, które by lokalnie poprzedzało jakiekolwiek zdarzenie $E$ znajdujące się w odcięciu ${\mathit {\Psi }}_{1}$.

Linia odcięcia nieformalnie

Minimalne odcięcie

Dygresja - odtwarzanie stanu

Dygresja - odtwarzanie stanu

Posiadając wiedzę na temat działania konkretnej aplikacji, można oczywiście zapisywać stan formalnie niespójny

Dygresja - odtwarzanie stanu

Stop-and-sync

Stop-and-sync

Idea - zatrzymujemy wszystkie procesy

... oczywiście musimy przepłukać kanały.

Stop-and-sync


				    message stop, ready, save, saved, continue

				    local bool ~~start_i := false~~
				    local bool ~~flushed_i[n] := false~~
				    local bool ~~ready_{\alpha}[n] := false~~
				    local set of messages ~~log_i := \emptyset~~
				    

Stop-and-sync


				    when ~~{P}_{\alpha}~~ wants to take a snapshot
				       suspend application 
				       save local state 
				       ~~start_i~~ := true
				       ~~flushed_{\alpha}[\alpha]~~ := true
				       broadcast ~~stop~~ to all ~~{P}_{j\neq\alpha}\in\mathcal{P}~~
				    end when
				    

Stop-and-sync


when a message ~~stop~~ arrives at ~~{P}_{i}~~ from ~~{P}_{j}~~ do
	if not ~~start_i~~ then
	    suspend application 
	    save local state
	    ~~start_i~~ := true
	    ~~flushed_{i}[i]~~ := true
	    broadcast ~~stop~~ to all ~~{P}_{k}\in\mathcal{P}~~
	end if

	~~flushed_{i}[j]~~ := true
	if ~~\forall k: flushed_{i}[k] == \mbox{true}~~ then
	    save ~~log_{i}~~
	    send ready to ~~P_{\alpha}~~
	end if
end when
				    

Stop-and-sync


				when an application message ~~m~~ arrives at ~~{P}_{i}~~ from ~~{P}_{j}~~ do
					if ~~start_i~~ and not ~~flushed_i[j]~~ then
					    ~~log_{i}~~ := ~~log_i \cup m~~
					else if ~~start_i~~
					    delay ~~m~~
					else
					    deliver ~~m~~
					end if
				end when
				    

Stop-and-sync


				when a message ~~ready~~ arrives at ~~{P}_{\alpha}~~ from ~~{P}_{j}~~ do
					~~ready_{\alpha}[j] := \mbox{true}~~
					if ~~\forall k: ready_{\alpha}[k] == \mbox{true}~~ then
					    save ~~log_{\alpha}~~
					    broadcast ~~continue~~ to all ~~{P}_k\in\mathcal{P}~~ including itself
					end if
				end when

				when a message continue arrives at ~~{P}_{i}~~ do
					resume application
					~~start_i := \mbox{false}~~
					deliver delayed messages
				end when
				    

Stop-and-sync: Poprawność algorytmu

  1. Postęp?
  2. Bezpieczeństwo?
  3. Założenia?

Stop-and-sync: Poprawność algorytmu

  1. Postęp?
  2. Wszystkie procesy ostatecznie zapiszą stan i logi, wszystkie procesy ostatecznie wznowią działanie.

  3. Bezpieczeństwo?
  4. Założenia?

Stop-and-sync: Poprawność algorytmu

  1. Bezpieczeństwo?
  2. Zapisane stany utworzą konfigurację spójną.

  3. Postęp?
  4. Założenia?

Stop-and-sync: Poprawność algorytmu

  1. Bezpieczeństwo?
  2. Zapisane stany utworzą konfigurację spójną.

    Jeżeli $P_i$ zapisał zdarzenie odbioru wiadomości $m$ od $P_j$, to $P_j$ zapisał zdarzenie wysłania tej wiadomości

  3. Postęp?
  4. Założenia?

Stop-and-sync: Poprawność algorytmu

  1. Założenia?
  2. Kanały niezawodne oraz FIFO, procesy nie ulegają awariom

  3. Bezpieczeństwo?
  4. Postęp?

Stop-and-sync: Postęp

Inicjator wysyła wiadomość do wszystkich procesów. Skoro kanały nie gubią wiadomości, ostatecznie wiadomości stop dojdą do wszystkich procesów.

Wszystkie procesy zgodnie z algorytmem wyślą tę samą wiadomość do wszystkich innych procesów.

Skoro kanały są niezawodne, te wiadomości $stop$ też dojdą do adresatów, a więc ostatecznie warunek $\forall k: flushed_i[k] == \mbox{true}$ będzie spełniony dla każdego $P_i$

Stop-and-sync: Postęp

Wszystkie procesy więc ostatecznie zapiszą stan i wyślą $ready$ do inicjatora.

Skoro kanały są niezawodne, te wiadomości dotrą do inicjatora, który także zapisze stan, wyśle innym zgodę na wznowienie działania $continue$ i sam wznowi działania.

Wiadomości $continue$ na pewno dotrą (skoro kanały są niezawodne) a więc wszystkie procesy także wznowią działanie. CND

Stop-and-sync: Bezpieczeństwo

Konfiguracja niespójna byłaby tylko, gdybyśmy mieli zapisane zdarzenie odbioru wiadomości, a nie mieli zapisanego zdarzenia wysłania wiadomości.

Proces $P_i$ musiałby więc zapisać stan uwzględniający zdarzenie odbioru wiadomości wysłanej przez $P_j$ już po zapisaniu przez niego stanu.

Proces $P_j$ po zapisaniu stanu nie wyśle nowych wiadomości, dopóki nie otrzyma wiadomości $continue$ od inicjatora

Stop-and-sync: Bezpieczeństwo

Inicjator wyśle $continue$ dopiero, gdy otrzyma $ready$ od wszystkich procesów.

Każdy proces zapisze stan dopiero i wyśle $ready$ do inicjatora dopiero, gdy warunek $\forall k: flushed_i[k] == \mbox{true}$ będzie spełniony.

Warunek ten spełniony będzie dopiero, gdy proces $P_i$ otrzyma wiadomość $stop$ od każdego procesu.

Stop-and-sync: Bezpieczeństwo

Proces wysyła wiadomość $stop$ dopiero, gdy zapisze stan.

A więc inicjator wyśle $continue$ dopiero, gdy wszystkie procesy zapiszą swój stan...

A więc niemożliwe jest by któryś zapisany stan obejmował zdarzenie odbioru wiadomości, wysłanej przez inny proces dopiero po zapisaniu przez niego stanu.

Stop-and-sync: Czy gubimy wiadomości?

Procesy są zawieszone, a więc nie zaczną wysyłać żadnych nowych wiadomości póki nie otrzymają $continue$ od inicjatora.

Kiedy inicjator otrzyma $ready$ od wszystkich procesów, wszystkie kanały są puste.

Stop-and-sync: Czy gubimy wiadomości?

Skoro kanały są FIFO, to wszystkie wiadomości wysłane przed $P_j$ do $P_i$ przed wysłaniem przez niego wiadomości $stop$ do $P_i$, muszą być odebrane przez $P_i$ przed odebraniem wiadomości $stop$.

Albo więc wiadomość była odebrana przed zapisaniem stanu, albo zostanie zapisana w logu.

Jeżeli zaś wiadomość była wysłana już po wznowieniu pracy przez jakiś $P_j$ i dotarła do $P_i$ przed wznowieniem przez niego pracy, jest wstrzymywana do czasu aż ten wznowi pracę (nie jest gubiona).

Stop-and-sync: Stan kanałów

Stan kanałów można wyznaczyć z logów zapisanych przez procesy.

Alternatywnie, stan procesów może obejmować historię wiadomości wysłanych i odebranych. Nie trzeba wówczas tworzenia logu podczas działania algorytmu, de facto jest trzymany cały czas. Stan kanałów wyliczamy jako różnicę wiadomości wysłanych/odebranych.

Stop-and-sync

  1. Łatwy w zrozumieniu, intuicyjny
  2. Wymaga wstrzymywania przetwarzania

Algorytm Lamporta

Algorytm Lamporta

Nie chcemy wstrzymywać przetwarzania... Na szczęście Stop-and-sync można łatwo zmodyfikować!

Założenia: Kanały niezawodne oraz FIFO, procesy nie ulegają awariom

Algorytm Lamporta


				    message marker

				    local bool ~~started_i := false~~
				    local bool ~~flushed_i[n] := false~~
				    local set of messages ~~log_i[n] := \left[\emptyset, \emptyset \ldots \emptyset \right]~~
				    

Algorytm Lamporta


				    when ~~{P}_{\alpha}~~ wants to take a snapshot
				       save local state 
				       ~~started_i~~ := true
				       ~~flushed_{\alpha}[\alpha]~~ := true
				       ~~\forall C_{i,j} \in \mathcal{C}_i~~ broadcast ~~marker~~ alongside ~~C_{i,j}~~
				    end when
				    

Algorytm Lamporta


when a message ~~marker~~ arrives at ~~{P}_{i}~~ from ~~{P}_{j}~~ do
	if not ~~start_i~~ then
	    save local state
	    ~~started_i~~ := true
	    ~~flushed_{i}[i]~~ := true
	    ~~\forall C_{i,j} \in \mathcal{C}_i~~ broadcast ~~marker~~ alongside ~~C_{i,j}~~
	end if

	~~flushed_{i}[j]~~ := true
	if ~~\forall k: flushed_{i}[k] == \mbox{true}~~ then
	    send state and ~~log_i~~ to ~~P_{\alpha}~~
	end if
end when
				    

Algorytm Lamporta


				when an application message ~~m~~ arrives at ~~{P}_{i}~~ from ~~{P}_{j}~~ do
					if ~~started_i~~ and not ~~flushed_i[j]~~ then
					    ~~log_{i}~~ := ~~log_i \cup m~~
					end if 
					deliver ~~m~~
				end when
				    

Algorytm Lamporta: Złożoność

Niech graf zorientowany odpowiadający topologii przetwarzania rozproszonego scharakteryzowany jest przez następujące parametry:

  • m –liczba krawędzi grafu (jednokierunkowych kanałów komunikacyjnych),
  • d –średnica grafu,
  • l –długość najdłuższej ścieżki w grafie.

Złożoność czasowa algorytmu Chandy-Lamporta wynosi $d + 1$, a złożoność komunikacyjna, w sensie liczby przesyłanych znaczników i pomijając zbieranie stanu przez inicjatora, wynosi $2m$.

Algorytm Lamporta: Stan wyznaczony a faktyczny

Nieformalnie - stan globalny $\Sigma^l$ jest osiągalny ze stanu $\Sigma^k$, jeżeli teoretycznie istnieje wykonanie zaczynające się od $\Sigma^k$ a kończące się $\Sigma^l$.

Zapisujemy to $\Sigma^k \rightsquigarrow \Sigma^l$

Algorytm Lamporta: Stan wyznaczony a faktyczny

Jeżeli proces konstrukcji konfiguracji spójnej rozpoczął się w chwili $\tau _{b}$ - a zakończył w chwili $\tau _{e}$, to wyznaczona konfiguracja ${\mathit {\Gamma }}$, reprezentująca pewien stan globalny ${\mathit {\Sigma }}$, jest osiągalna ze stanu ${\mathit {\Sigma }}(\tau _{b})$, a stan globalny ${\mathit {\Sigma }}(\tau _{e})$ jest osiągalny ze stanu ${\mathit {\Sigma }}={\mathit {\Gamma }}$. Tym samym: $$ {\mathit {\Sigma }}(\tau _{b})\rightsquigarrow {\mathit {\Gamma }}\rightsquigarrow {\mathit {\Sigma }}(\tau _{e})$$

Hmm... Ale co to nam daje?

Predykaty globalne

Predykat - funkcja zwracająca jako odpowiedź Prawda lub Fałsz. Predykat globalny odpowiada na pytania dotyczące całego systemu, np:

  1. Czy przetwarzanie uległo zakończeniu?
  2. Czy procesy się zakleszczyły?
  3. Czy ktoś jest w sekcji krytycznej?
  4. Czy istnieją bezczynne procesy?

Predykaty stabilne i niestabilne

Predykat stabilny - gdy raz stanie się prawdziwy, już zawsze będzie prawdziwy

  1. Czy przetwarzanie uległo zakończeniu?
  2. Czy procesy się zakleszczyły?

Predykat niestabilny - gdy raz stanie się prawdziwy, może z powrotem stać się fałszywy

  1. Czy ktoś jest w sekcji krytycznej?
  2. Czy istnieją bezczynne procesy?

Algorytm Lamporta: predykaty stabilne

Jeżeli w danej konfiguracji spójnej $\Gamma$ predykat stabilny $\Upsilon(\Gamma)$ jest prawdziwy, to $$\forall \Sigma ^k \in\mathit{\Sigma}:: \Gamma \rightsquigarrow \Sigma ^k \Rightarrow \Upsilon(\Sigma ^k)$$

Algorytm Lai Yanga

Algorytm Lai-Yanga

Będziemy dodatkowo kolorować procesy oraz wiadomości

Założenia: Kanały niezawodne oraz NONFIFO, procesy nie ulegają awariom

Algorytm Lai Yanga


				    enum color = { white, red }
				    message $m$ is a struct $\left\langle \mbox{data } appdata, \mbox{enum color} color \right\rangle$

				    local enum color procColor_i := white
				    local bool ~~started_i := false~~
				    local bool ~~flushed_i[n] := false~~
				    local set of messages ~~sentLog_i[n] := \left[\emptyset, \emptyset \ldots \emptyset \right]~~
				    local set of messages ~~recvLog_i[n] := \left[\emptyset, \emptyset \ldots \emptyset \right]~~
				    

Algorytm Lai Yanga


				    when ~~{P}_{\alpha}~~ wants to take a snapshot
				       save local state, $sentLog_{\alpha}$, and $recvLog_{\alpha}$
				       ~~procColor~~ := red

				       broadcast ~~\left\langle \emptyset, \mbox{red} \right\rangle~~ to all ~~P_{j\neq\alpha,j}\in\mathcal{P}~~
				    end when
				    

Algorytm Lai Yanga


when a message ~~m~~ arrives at ~~{P}_{i}~~ from ~~{P}_{j}~~ do
	if ~~m.color = \mbox{red} \land procColor_i = \mbox{white}~~ then 
	    save local state, $sentLog_{\alpha}$, and $recvLog_{\alpha}$ 
	    ~~procColor~~ := red
	end if

	if ~~m.appdata \neq \emptyset~~ then 
	    ~~recvLog_i := recvLog_i \cup m~~
	    deliver ~~m~~
	end if
end when
				    

Algorytm Lai Yanga


				when process ~~P_i~~ wants to sent a message ~~m~~ to ~~{P}_{j}~~ do
					~~sentLog_{i}~~ := ~~sentLog_i \cup m~~
					~~m.color := procColor_i~~
					sent ~~m~~ to ~~P_j~~
				end when
				    

Algorytm Lai Yanga

Pytanie sprawdzające - jak wyznaczamy stan kanałów?

W domu!

Dowody poprawności oraz złożoność komunikacyjna i obliczeniowa dla algorytmów Lamporta oraz Lai-Yanga!