Dydaktyka:
FeedbackTo jest stara wersja strony!
Do budowy aplikacji sieciowych opartych o pętlę zdarzeń potrzebna jest funkcja
która określi na którym ze wskazanych gniazd jest możliwość odczytu/zapisu
bez blokowania, jeśli trzeba czekając aż taka możliwość się pojawi.
Standard POSIX definiuje dwie takie funkcje: select i poll. Pozwalają
one na monitorowanie dowolnych deskryptorów plików, w tym gniazd.
Obie są też dostępne w systemach z rodziny Microsoft Windows, ale tam
można nimi monitorować tylko gniazda.
Do budowy serwerów zwykle wybiera się funkcję poll (w Windowsie
występującą pod zmienioną nazwą
WSAPoll).
W systemach uniksopodobnych funkcja select ma ograniczenia które limitują
jej przydatność w programach sieciowych, np. zwykle nie obsługuje deskryptorów
plików o numerach większych niż 1023.
Implementacja w Windowsie
pozwala dowolnie podnieść ten limit.
Poza ustandaryzowanymi funkcjami poll i select każdy system operacyjny
dostarcza też własnych interfejsów programistycznych które pozwalają budować
jednowątkowe aplikacje używające jednocześnie wielu gniazd (i te interfejsy
często oferują wyższą wydajność).
Przed wykonaniem funkcji poll trzeba przygotować listę informacji na co program
chce czekać.
Taka lista musi mieć postać tablicy struktur pollfd, w których trzeba
wypełnić pole fd deskryptorem pliku, oraz pole events listą tego na
jakie zdarzenia program chce czekać:
pollfd pfds[2]; pfds[0].fd = cli1; pfds[0].events = POLLIN; pfds[1].fd = cli2; pfds[1].events = POLLIN;
POLLIN określa że nastąpiło coś, co pozwoli wykonać funkcje odbierające
dane bez blokowania. Może to oznaczać że przyszła nowa wiadomość, ale może też
przykładowo oznaczać że połączenie zostało zamknięte (wtedy funkcje czytające
dane z sieci też zakończą się bez czekania).
Po przygotowaniu tych informacji, można rozpocząć oczekiwanie funkcją poll:
poll(pfds, 2, -1);
W momencie wywołania tej funkcji system operacyjny odczytuje z tablicy struktur
pollfd przekazanej jako pierwszy argument tyle struktur ile podano w drugim
argumencie.
Następnie zatrzymuje (blokuje) działanie wątku który wywołał poll
do czasu aż którąś ze wskazanych w tych strukturach operacji będzie się dało
wykonać bez blokowania.
Trzeci argument funkcji poll służy do ustawiania maksymalnego czas blokowania;
ustawienie go na wartość -1 określi że poll ma czekać do skutku.
Kiedy można już wykonać którąś ze wskazanych operacji wejścia/wyjścia bez
blokowania, system operacyjny nadpisuje pola revents struktur wskazanych
w argumentach na te zdarzenia które wystąpiły (dla struktur opisujących
deskryptory na których nie da się bez blokowania wykonać żadnej operacji
ustawi 0).
Programista musi następnie przeanalizować ustawione wartości pól revents, np:
for (int i = 0; i < 2; ++i) { if (pfds[i].revents & POLLIN) { int cnt = recv(pfds[i].fd, buf, 256, 0); if (cnt <= 0) termianteConnection(pfds[i].fd); else handleMessage(pfds[i].fd, buf, cnt); } }
Załóżmy że od klienta przyszła wiadomość od drugiego klienta z prośbą o wysłanie mu dużego pliku. Serwer próbuje więc wykonać:
int cnt = send(pfds[1].fd, bigFile, 16777216, MSG_DONTWAIT);
Zwróć uwagę że wysyłając ustawiono MSG_DONTWAIT. Pamiętaj że funkcje
wysyłające w normalnym, blokującym trybie, czekają tak długo jak trzeba żeby
wysłać całą wiadomość. W jednowątkowym serwerze to zatrzymałoby cały program
na (potencjalnie bardzo długi) czas wysyłania tego pliku.
Załóżmy że funkcja send zwróciła wartość 2097152, czyli mniej niż
żądano. Oznacza to że bez czekania nie da się wysłać więcej.
Program chce więc teraz też czekać na wysłanie danych, dlatego zmienia listę
zdarzeń na którą czeka:
pfds[1].events = POLLIN | POLLOUT;
POLLOUT określa że albo można wysłać przynajmniej jeden bajt, albo gniazdo
znalazło się w innym stanie w którym funkcje wysyłające dane wykonają się bez
czekania (np. połączenie zostało zamknięte bądź zerwane).
Zauważ jak wewnątrz pliku nagłówkowego #include <poll.h> są zdefiniowane
kolejne stałe określające zdarzenia:
#define POLLIN 0x001 // 0b00000001 #define POLLPRI 0x002 // 0b00000010 #define POLLOUT 0x004 // 0b00000100 #define POLLERR 0x008 // 0b00001000 #define POLLHUP 0x010 // 0b00010000
Jak widać, każdy bit określa inne zdarzenie. To pozwala w polu events czy
revents (które jest np. typu short int) zapisać osobno informację
o każdym zdarzeniu.
Dlatego, żeby sprawdzić czy dane zdarzenie jest pośród ustawionych, musisz użyć
operatora &(np. if(pfds[i].revents & POLLIN)).
Dodanie zdarzenia np. POLLIN to pfd.events = pfd.events | POLLIN;,
a usunięcie to pfd.events = pfd.events & ~POLLIN;
Następnie można ponownie wykonać funkcję poll (czekając już teraz na
możliwość odczyty z obu gniazd i/lub możliwość zapisu do drugiego gniazda).
Załóżmy że została teraz wykonana z argumentami:
int count = poll(pfds, 2, 5000);
Ustawienie ostatniego argumentu, maksymalnego czasu czekania, na 5000 oznacza
5000ms, czyli 5s.
Jako wynik poll zwraca na ilu deskryptorach ustawiono niezerową listę zdarzeń.
Dla przypomnienia, pfds[0].events ma wartość POLLIN, a pfds[1].events ma wartość POLLIN | POLLOUT.
Wybrane możliwe wyniki po wykonaniu funkcji to teraz:
count | pfds[0].revents | pfds[1].revents | taki wynik wystąpi między innymi jeżeli: |
|---|---|---|---|
0 | 0 | 0 | przez 5s nic się nie stało |
1 | 0 | POLLIN | przyszła wiadomość od drugiego klienta |
1 | 0 | POLLOUT | systemowi operacyjnemu udało się coś wysłać do drugiego klienta i można nadać kolejna porcję danych |
1 | 0 | POLLIN | POLLOUT | wystąpiły naraz dwa powyższe zdarzenia |
2 | POLLIN | POLLIN | przyszła wiadomość i od pierwszego, i od drugiego klienta |
1 | POLLIN | POLLERR | POLLHUP | 0 | połączenie z pierwszym klientem zostało zerwane1) |
Zwróć uwagę że mimo tego że program nie czekał ani na zdarzenie
POLLERR,
ani na zdarzenie POLLHUP, to zostały mu one zwrócone – jeśli któreś z tych
zdarzeń wystąpi, to funkcja poll zawsze je zgłosi.
Program musi teraz sprawdzić które zdarzenia nastąpiły. Zwróć uwagę, że jeżeli jest możliwość wysłania danych do klienta drugiego, to program potrzebuje teraz wykonać:
int cnt = send(fd, bigFile + 2097152, 16777216 - 2097152, MSG_DONTWAIT);
gdzie 16777216 to rozmiar pliku który ma być wysłany, a 2097152 to
liczba już wysłanych bajtów.
Zwróć uwagę, że zwykle w programie trzeba dla każdego klienta przechowywać po stronie aplikacji częściowo odebrane logiczne wiadomości jak i dane które czekają na wysłanie (tj. przekazanie do wysłania do systemu operacyjnego).
Zwykle w programach pełniących rolę serwera trzeba dynamicznie tworzyć tablicę
struktur pollfd – z każdym nowym klientem potrzeba przecież zwiększyć
rozmiar tej tablicy. (W C++ można do tego wykorzystać np. std::vector,
przy czym trzeba pamiętać o tym jak i które modyfikacje struktur danych można
zrobić w trakcie przechodzenia po nich.)
Dla gniazda nasłuchującego POLLIN określa że pojawił się nowy klient.
Zadanie 1
Weź z materiałów do poprzedniego tematu dwa pierwsze
fragmenty kodu, a następnie dokończ opisaną tam grę "kto szybciej pisze na
klawiaturze" tak żeby program działał na jednym wątku z użyciem funkcji
poll.
Pamiętaj żeby dodać do kodu #include <poll.h>, które dołączy plik
z deklaracją potrzebnych funkcji, struktur i stałych.
Zadanie 2 Napisz jednowątkowy program który połączy się, używając TCP, pod wskazany adres, a następnie będzie równocześnie:
Zadanie 3 Napisz jednowątkowy serwer TCP, który każdą otrzymaną wiadomość przekaże
wszystkim połączonym klientom. Zauważ że serwer musi jednocześnie czekać na
nowych klientów i jednocześnie odbierać wiadomości od każdego z już połączonych.
Pisząc serwer załóż, że jeżeli nieblokujące wysyłanie do kogoś wiadomości nie
wyśle całej wiadomości bez czekania, to należy zakończyć połączenie z tym
klientem uznając że zostało ono zerwane.
Zadanie 4 Protokół TCP nie gwarantuje że dane są odbierane w takich samych
porcjach w jakich były wysłane.
Zauważ że dla programu z poprzedniego zadania logiczna wiadomość to jedna linia
– tekst kończący się znakiem '\n'.
Jeżeli jeden klient wysłał linię Hello world! a drugi wysłał linię
Witaj świecie!, a pierwszy tekst zostanie odebrany w dwóch wywołaniach
funkcji read, to pozostałe osoby mogą zobaczyć na swoim ekranie np. linię
Hello Witaj świecie! oraz linię world!.
Zmień program z poprzedniego zadania tak, żeby logiczne wiadomości były
przesyłane dalej poprawnie. Zauważ że musisz do tego zbierać dane od każdego
klienta do osobnego bufora.
Do testów tego programu użyj jako klienta komendy
socat tcp:adres:port stdio,ignoreeof
która pozwoli tobie, wciskając Ctrl + d, wysłać dotychczas wpisane
znaki.
Serwer wysyłający żądane pliki (Linux)
Serwer od każdego połączonego klienta czyta linię tekstu, następnie wysyła do niego plik wskazany w tej linii.
Zauważ że wysyłając plik trzeba zakładać że nie uda się go wysłać w całości
bez czekania, stąd program pamięta dla każdego klienta co ma mu wysłać i
czeka też na zdarzenie POLLOUT jeśli trzeba.
Dla uproszczenia program wysyłając jeden plik nie próbuje odczytać od klienta danych (jeżeli klient wyśle nazwę kolejnego pliku, zostanie ona odebrana dopiero po wysłaniu całego poprzedniego).
Czytając kod szczególnie zwróć uwagę na linie zaczynające się od /*!*/.
#include <filesystem> #include <fstream> #include <iostream> #include <unordered_map> #include <vector> #include <netdb.h> #include <poll.h> #include <signal.h> #include <sys/socket.h> #include <unistd.h> int main(int ac, char **av) { if (ac != 2) { std::cerr << "need arg" << std::endl; return 1; } signal(SIGPIPE, SIG_IGN); addrinfo hints{.ai_flags = AI_PASSIVE, .ai_protocol = IPPROTO_TCP}, *ret; if (getaddrinfo(nullptr, av[1], &hints, &ret)) { std::cerr << "bad port " << std::endl; return 1; } int serv = socket(ret->ai_family, ret->ai_socktype, ret->ai_protocol); if (bind(serv, ret->ai_addr, ret->ai_addrlen)) { perror("bind failed"); return 1; } freeaddrinfo(ret); listen(serv, 1); struct Client { std::string buffer; std::fstream file; }; std::unordered_map<int, Client> cliMap; // prepare information on which file descriptors pool should wait, and for which I/O operations std::vector<pollfd> pfds; /* !*/ pfds.push_back({.fd = serv, .events = POLLIN}); while (1) { // wait until any of the listed file descriptors is ready to do the requested I/O operation /*!*/ poll(pfds.data(), pfds.size(), -1); // loop over all pollfd structures in the list auto it = pfds.begin(); /*!*/ while (it != pfds.end()) { pollfd &pfd = *it; // if there are no ready events, skip this fd /*!*/ if (pfd.revents == 0) { it++; continue; } // if this is the server socket, accept a new client /*!*/ if (pfd.fd == serv) { int cli = accept(serv, nullptr, nullptr); if (cli == -1) { perror("accept failed"); it++; continue; } std::cout << "new client " << cli << std::endl; // add the newly accepted client to the list of descriptors that should be polled pfds.push_back({.fd = cli, .events = POLLIN}); it = ++pfds.begin(); // vector iterator is invalidated on push_back continue; } // not the sever socket, so it's a client socket; look up this client's buffers auto &cli = cliMap[pfd.fd]; // check if poll indicated that read can be called without blocking /*!*/ if (pfd.revents & POLLIN) { char buf[256]; int cnt = read(pfd.fd, buf, 256); // if poll returned POLLIN, it can mean that: data arrived, socket closed, error occurred /*!*/ if (cnt <= 0) { std::cout << "client " << pfd.fd << " gone" << std::endl; cliMap.erase(pfd.fd); close(pfd.fd); it = pfds.erase(it); // erasing from vector returns iterator to next element continue; } cli.buffer.append(buf, cnt); if ('\n' != *cli.buffer.rbegin()) // check if full line was received { it++; continue; } cli.buffer.resize(cli.buffer.size() - 1); std::cout << "client " << pfd.fd << " requested " << cli.buffer << std::endl; cliMap[pfd.fd].file.open(cli.buffer); // construct message to be sent: either error, or the file size if (!cli.file.good()) cli.buffer = "ERROR\n"; else cli.buffer = std::to_string(std::filesystem::file_size(cli.buffer)) + "\n"; // indicate that now poll must wait for possibility of writing data to that client /*!*/ pfd.events = POLLOUT; it++; continue; } // if previous loop iteration sent whole buffer, replenish it if (cli.buffer.empty()) { char buf[4096]; cli.file.read(buf, 4096); cli.buffer.append(buf, cli.file.gcount()); if (cli.buffer.empty()) { cli.file.close(); std::cout << "sent file to client " << pfd.fd << std::endl; // indicate that now poll must wait for data from the client (subsequent file name) pfd.events = POLLIN; it++; continue; } } // poll indicated that it's possible to write at least one byte or error occurred /*!*/ // this does not mean that any amount of data can be sent, hence non-blocking send int cnt = send(pfd.fd, cli.buffer.data(), cli.buffer.length(), MSG_DONTWAIT); if (cnt == -1) { std::cout << "client " << pfd.fd << " write error" << std::endl; cliMap.erase(pfd.fd); close(pfd.fd); it = pfds.erase(it); continue; } cli.buffer.erase(0, cnt); it++; continue; } } }
Serwer wysyłający żądane pliki (Windows)
Serwer od każdego połączonego klienta czyta linię tekstu, następnie wysyła do niego plik wskazany w tej linii.
Zauważ że wysyłając plik trzeba zakładać że nie uda się go wysłać w całości
bez czekania, stąd program pamięta dla każdego klienta co ma mu wysłać i
czeka też na zdarzenie POLLOUT jeśli trzeba.
Dla uproszczenia program wysyłając jeden plik nie próbuje odczytać od klienta danych (jeżeli klient wyśle nazwę kolejnego pliku, zostanie ona odebrana dopiero po wysłaniu całego poprzedniego).
Czytając kod szczególnie zwróć uwagę na linie zaczynające się od /*!*/.
#include <filesystem> #include <fstream> #include <iostream> #include <unordered_map> #include <vector> #include <WinSock2.h> #include <ws2tcpip.h> #pragma comment(lib, "Ws2_32.lib") int main(int ac, char **av) { if (ac != 2) { std::cerr << "need arg" << std::endl; return 1; } WSADATA winSockInfos; if (int error = WSAStartup(MAKEWORD(2, 2), &winSockInfos)) { fprintf(stderr, "WinSock2 startup failed: %d\n", error); return 1; } addrinfo hints{.ai_flags = AI_PASSIVE, .ai_family = AF_INET, .ai_protocol = IPPROTO_TCP}, *ret; if (getaddrinfo(nullptr, av[1], &hints, &ret)) { std::cerr << "bad port " << std::endl; return 1; } SOCKET serv = socket(ret->ai_family, ret->ai_socktype, ret->ai_protocol); if (bind(serv, ret->ai_addr, ret->ai_addrlen)) { perror("bind failed"); return 1; } freeaddrinfo(ret); listen(serv, 1); struct Client { std::string buffer; std::fstream file; }; std::unordered_map<SOCKET, Client> cliMap; // prepare information on which file descriptors pool should wait, and for which I/O operations std::vector<WSAPOLLFD> pfds; /* !*/ pfds.push_back({.fd = serv, .events = POLLIN}); while (1) { // wait until any of the listed file descriptors is ready to do the requested I/O operation /*!*/ WSAPoll(pfds.data(), pfds.size(), -1); // loop over all pollfd structures in the list auto it = pfds.begin(); /*!*/ while (it != pfds.end()) { WSAPOLLFD &pfd = *it; // if there are no ready events, skip this fd /*!*/ if (pfd.revents == 0) { it++; continue; } // if this is the server socket, accept a new client /*!*/ if (pfd.fd == serv) { SOCKET cli = accept(serv, nullptr, nullptr); if (cli == -1) { perror("accept failed"); it++; continue; } std::cout << "new client " << cli << std::endl; // add the newly accepted client to the list of descriptors that should be polled pfds.push_back({.fd = cli, .events = POLLIN}); // windows does not support MSG_DONTWAIT (which is needed on send in this program) // threfore the entire socket is put into non-blocking mode unsigned long mode = 1; /*!*/ ioctlsocket(cli, FIONBIO, &mode); it = ++pfds.begin(); // vector iterator is invalidated on push_back continue; } // not the sever socket, so it's a client socket; look up this client's buffers auto &cli = cliMap[pfd.fd]; // check if poll indicated that read can be called without blocking /*!*/ if (pfd.revents & POLLIN) { char buf[256]; int cnt = recv(pfd.fd, buf, 256, 0); // if poll returned POLLIN, it can mean that: data arrived, socket closed, error occurred /*!*/ if (cnt <= 0) { std::cout << "client " << pfd.fd << " gone" << std::endl; cliMap.erase(pfd.fd); closesocket(pfd.fd); it = pfds.erase(it); // erasing from vector returns iterator to next element continue; } cli.buffer.append(buf, cnt); if ('\n' != *cli.buffer.rbegin()) // check if full line was received { it++; continue; } cli.buffer.resize(cli.buffer.size() - 1); std::cout << "client " << pfd.fd << " requested " << cli.buffer << std::endl; cliMap[pfd.fd].file.open(cli.buffer); // construct message to be sent: either error, or the file size if (!cli.file.good()) cli.buffer = "ERROR\n"; else cli.buffer = std::to_string(std::filesystem::file_size(cli.buffer)) + "\n"; // indicate that now poll must wait for possibility of writing data to that client /*!*/ pfd.events = POLLOUT; it++; continue; } // if previous loop iteration sent whole buffer, replenish it if (cli.buffer.empty()) { char buf[4096]; cli.file.read(buf, 4096); cli.buffer.append(buf, cli.file.gcount()); if (cli.buffer.empty()) { cli.file.close(); std::cout << "sent file to client " << pfd.fd << std::endl; // indicate that now poll must wait for data from the client (subsequent file name) pfd.events = POLLIN; it++; continue; } } // poll indicated that it's possible to write at least one byte or error occurred /*!*/ // this does not mean that any amount of data can be sent int cnt = send(pfd.fd, cli.buffer.data(), cli.buffer.length(), 0); if (cnt == -1) { std::cout << "client " << pfd.fd << " write error" << std::endl; cliMap.erase(pfd.fd); closesocket(pfd.fd); it = pfds.erase(it); continue; } cli.buffer.erase(0, cnt); it++; continue; } } }
read najpierw zwrócą wszystkie już odebrane dane, potem zakończą się wartością -1 ustawiając errno na wartość ECONNRESET (Połączenie zerwane przez drugą stronę).