Dydaktyka:
Feedback
Wyobraź sobie, że masz do napisania grę "kto szybciej pisze na klawiaturze" z następującymi zasadami:
Przygotowujesz logikę — bazę tekstów do przepisywania, licznik rund, listę klientów, funkcję rozpoczynającą nową rundę i kończącą grę jeśli rozegrano już trzy rundy, funkcję oceniającą odpowiedź od gracza:
#include <algorithm> #include <cstdlib> #include <iostream> #include <string> #include <thread> #include <vector> #include <netinet/in.h> #include <sys/socket.h> #include <unistd.h> std::vector<std::string> randomTexts = {"Colors may fade.", "Type louder, please.", "Natural laws have no pity.", "If in doubt, mumble.", "Place stamp here."}; int currRound = -1; int clients[3]; void nextRound() { if (++currRound == 3) { for (int i = 0; i < 3; ++i) { write(clients[i], "Game ended. Thanks for playing.\n", 33); shutdown(clients[i], SHUT_RDWR); close(clients[i]); } exit(0); } std::string msg = "\nNew round begins! Please type:\n" + randomTexts[currRound] + "\n"; for (int i = 0; i < 3; ++i) write(clients[i], msg.c_str(), msg.length()); } void checkAnswer(int cliFd, const char *ansBytes, int ansLength) { if ((randomTexts[currRound] + "\n") != std::string(ansBytes, ansLength)) { write(cliFd, "Wrong! Try again.\n", 18); return; } for (int i = 0; i < 3; ++i) if (clients[i] == cliFd) write(clients[i], "Correct!\n", 9); else write(clients[i], "Somone else was faster!\n", 25); nextRound(); }
Następnie zaczynasz pisać funkcję main – rozpoczynasz nasłuchiwanie,
przyjmujesz trzech klientów, rozpoczynasz pierwszą rundę:
int main(int argc, char **argv) { std::random_shuffle(randomTexts.begin(), randomTexts.end()); if (argc < 2) { std::cerr << "Missing port number!" << std::endl; return 1; } sockaddr_in sa{}; sa.sin_family = AF_INET; sa.sin_port = htons(atoi(argv[1])); int servFd = socket(AF_INET, SOCK_STREAM, 0); if (-1 == bind(servFd, (sockaddr *)&sa, sizeof(sa))) { perror("bind failed"); return 1; } listen(servFd, 1); for (int i = 0; i < 3; ++i) { clients[i] = accept(servFd, 0, 0); if (i != 2) write(clients[i], "Wait for others...\n", 19); } close(servFd); nextRound();
I teraz musisz odebrać wiadomość od tego klienta, który pierwszy wyśle przepisany tekst. Wiesz że można zrobić to kodem:
char buf[256]; int cnt = read(cliFd, buf, 256); if (cnt <= 0) exit(1); checkAnswer(cliFd, buf, cnt);
Ale zaraz, nie wiesz przecież który gracz pierwszy wyśle przepisany tekst!
Typowo aplikacje (jakiekolwiek, włączając sieciowe) muszą jednocześnie obsługiwać
wiele źródeł zdarzeń – np. z sieci przyszła wiadomość, użytkownik kliknął na menu
czy wpisał coś z klawiatury, minął czas oczekiwania na coś.
Często funkcje obsługujące takie zdarzenia po prostu blokują się czekając aż
oczekiwane zdarzenie wystąpi.
Przykładowo domyślnie operacje na gniazdach, np. connect, accept czy
read, blokują przetwarzanie (odpowiednio czekając na nawiązanie połączenia,
przyjście nowego klienta i przyjście wiadomości).
Wcześniej na zajęciach była mowa że można to zmienić na zachowanie nieblokujące,
ale oczywiście aktywne czekanie (busy waiting)
jest bardzo głupim pomysłem1), bo niepotrzebnie zużywa czas procesora.
Do jednoczesnej obsługi wielu źródeł zdarzeń stworzono dedykowane metody, można też używać podejścia wielowątkowego. Tworząc aplikację sieciową można ją napisać:
Wiesz, że procesy (uruchomione programy) wykonują, jedna po drugiej, następujące
po sobie instrukcje programu. Takie wykonywanie po kolei instrukcji programu
można nazwać wątkiem.
Jedną z instrukcji którą programista może umieścić w swoim programie jest
poproszenie systemu operacyjnego o uruchomienie nowego wątku przetwarzania,
wskazując którą instrukcję nowy wątek ma wykonać jako pierwszą.
Po stworzeniu drugiego wątku (zakładając że w procesie był jeden wątek) proces
robi dwie rzeczy naraz – jeden wątek wykonuje kolejną instrukcję po tej
żądającej tworzenia nowego wątku, drugi wykonuje kolejne instrukcje zaczynając
od wskazanej.
O tym który wątek używa w danej chwili którego procesora decyduje system
operacyjny – dwa wątki tego samego procesu mogą np. naprzemiennie dostawać
czas na tym samym procesorze, albo jednocześnie dostać czas (każdy wątek na
innym procesorze).
System operacyjny dba o to, żeby każdy proces miał własną pamięć operacyjną
(w której między innymi trzyma wartości zmiennych), własną listę otwartych
plików, własny katalog roboczy, etc.
Wątki tego samego procesu używają tej samej pamięci, tych samych plików, tego
samego katalogu roboczego etc.
Każdy wątek ma oddzielnie od innych tylko te rzeczy, które są związane
z informacjami o kolejno wykonywanych instrukcjach i ze stanem procesora (np.
informację czy rezultat poprzedniej operacji matematycznej był ujemny).
Poza instrukcją która prosi system operacyjny o uruchomienie nowego wątku, jest
też instrukcja która prosi o poczekanie na zakończenie wskazanego wątku.
Można dzięki temu np. wykonać część obliczeń w bieżącym wątku, a część w nowo
do tego celu utworzonym, a kiedy potrzeba już wyniku poczekać aż ten dodatkowy
wątek się zakończy.
Systemy operacyjne wymagają od programisty albo żeby poinformował go że nigdy
nie wywoła funkcji czekające na zakończenie wskazanego wątku, albo żeby
ostatecznie ją wykonał – do tego czasu (po zakończeniu wątku) system musi wciąż
pamiętać że ten się zakończył.
Wątki w języku C++:
std::thread, czyli przy tworzeniu zmiennej typu std::thread,std::thread nie można kopiować (ale można je przenosić),std::thread trzeba wykonać albo join albo detach przed wywołaniem destruktora.Przykładowy kod tworzący jeden wątek i czekający aż ten się zakończy:
#include <iostream> #include <thread> #include <unistd.h> void funkcja() { for (int i = 0; i < 5; ++i) { std::cout << "drugi wątek, iteracja " << i << std::endl; usleep(100000); } } int main() { std::thread wątek(funkcja); for (int i = 0; i < 2; ++i) { std::cout << "główny wątek, iteracja " << i << std::endl; usleep(100000); } wątek.join(); return 0; }
Przykładowy kod tworzący jeden wątek i NIE czekający aż ten się zakończy:
#include <iostream> #include <thread> #include <unistd.h> void funkcja() { for (int i = 0; i < 5; ++i) { std::cout << "drugi wątek, iteracja " << i << std::endl; usleep(100000); } } int main() { std::thread wątek(funkcja); wątek.detach(); for (int i = 0; i < 2; ++i) { std::cout << "główny wątek, iteracja " << i << std::endl; usleep(100000); } return 0; }
Przekazywanie argumentów do funkcji którą ma wykonywać wątek odbywa się następująco:
#include <iostream> #include <string> #include <thread> #include <unistd.h> void funkcja(int ilość, std::string tekst) { for (int i = 0; i < ilość; ++i) { std::cout << tekst << ", iteracja " << i << std::endl; usleep(100000); } } int main() { std::thread wątek(funkcja, 3, "drugi wątek"); wątek.detach(); for (int i = 0; i < 3; ++i) { std::cout << "główny wątek, iteracja " << i << std::endl; usleep(100000); } return 0; }
Dowolny wątek może stworzyć nowy wątek; poniżej przykład kodu tworzący wiele wątków:
#include <iostream> #include <string> #include <thread> #include <unistd.h> void funkcja(int ilość, std::string tekst) { for (int i = 0; i < ilość; /*nic*/) { std::cout << tekst << ", iteracja " << i << std::endl; usleep(100000); ++i; std::thread(funkcja, ilość - i, tekst + "_" + std::to_string(i)).detach(); } } int main() { std::thread(funkcja, 5, "wątek 1").detach(); for (int i = 0; i < 6; ++i) { std::cout << "wątek 0, iteracja " << i << std::endl; usleep(100000); } return 0; }
Wątki tego samego procesu pracują na tej samej pamięci, tych samych plikach etc.
Jednoczesna praca na tych samych zasobach (np. tej samej zmiennej) z dwóch
wątków może doprowadzić do problemów.
Poniżej 2 wątki, każdy 100000 razy, zwiększają wartość zmiennej a i b
o 1.001 raza.
#include <iostream> #include <thread> double a = 1.0; double b = 1.0; void funkcja() { for (int i = 0; i < 100000; ++i) { a = a * 1.001; b = b * 1.001; } } int main() { std::thread wątek(funkcja); funkcja(); wątek.join(); std::cout << a << std::endl << b << std::endl; return 0; }
Oczekiwany wynik zarówno dla zmiennej a jak i b to
1.0·1.0012*100000 ≈ 6.54e86, ale po uruchomieniu programu można
zobaczyć inne wyniki, np. a≈2.9e52 i b≈1.5e49, czy a≈3.2e62
i b≈4.2e46.
Procesor wykonując operację a = a * 1.001; najpierw odczytuje wartość a,
potem mnoży ją przez 1.001, i na końcu zapisuje wynik z powrotem do zmiennej
a – w trzech oddzielnych krokach.
Jeden z możliwych scenariuszy jest taki, że dwa wątki działające na dwóch
różnych procesorach równocześnie odczytają wartkość a, każdy pomnoży ją
przez 1.001, a na końcu każdy zapisze wynik z powrotem do zmiennej a.
Wtedy mimo wykonania dwóch powiększeń a, wartość zmiennej a zostanie
powiększona tylko jednokrotnie.
Do rozwiązania tego i podobnych problemów potrzebne jest dodanie przez
programistę operacji synchronizujących pracę wątków tak, by nie korzystały
naraz z tych samych rzeczy.
To jak należy poprawnie pisać współbieżne programy jest tematem na osobny
przedmiot (w programie bioinformatyki to np. przedmiot obieralny z trzeciego
semestru podstawy programowania współbieżnego).
Wracając do przykładu z początku zajęć: jeżeli więc trzeba czekać na wiadomość od trzech klientów naraz, można przygotować funkcję którą na każdym takim gnieździe trzeba wykonywać:
void readFromClient(int cliFd) { while (true) { char buf[256]; int cnt = read(cliFd, buf, 256); if (cnt <= 0) exit(1); checkAnswer(cliFd, buf, cnt); } }
A następnie dokończyć funkcję main liniami:
...
nextRound();
std::thread(readFromClient, clients[0]).detach();
std::thread(readFromClient, clients[1]).detach();
readFromClient(clients[2]);
}
Standard POSIX określa czy i jak można ją wykonywać funkcje współbieżnie z
innymi. Dla gniazd TCP można współbieżne wywoływać operacje wejścia/wyjścia
na tym samym gnieździe i są one atomowe, tzn. wywołanie współbieżnie dwóch
send (lub dwóch recv) wykona najpierw jedno z tych wywołań, potem
drugie.
Przy czym trzeba pamiętać, że niektóre operacje mogą wysłać czy odczytać mniej
danych niż żądał programista, więc zwykle i tak synchronizacja (przynajmniej
odbierania) na tym samym gnieździe jest konieczna.
Zamknięcie połączenia TCP z jednego wątku przerywa operacje czekające na
odebranie wiadomości na innych wątkach (zwracają 0 sygnalizujące że zamknięto
połączenie) i przerywa operacje wysyłania danych (zgłaszają sygnał SIGPIPE
i zwracają -1 sygnalizujące błąd, ustawiając wcześniej errno na EPIPE).
Zadanie 1 Połącz powyższe fragmenty kodu z odpowiednimi fragmentami z początku materiałów. Skompiluj program i przetestuj jego działanie.
Zadanie 2
Zastanów się co złego może się stać jeżeli program jednocześnie zacznie
wykonywać funkcję checkAnswer z dwóch wątków (dla dwóch różnych graczy).
Zadanie 3 Napisz program który połączy się, używając TCP, pod wskazany adres, a następnie będzie równocześnie:
Zadanie 4 Napisz serwer TCP, który każdą otrzymaną wiadomość przekaże wszystkim połączonym klientom. Zauważ że serwer musi jednocześnie czekać na nowych klientów i jednocześnie odbierać wiadomości od każdego z już połączonych.