Serwery współbieżne¶
Serwer współbieżny obsługuje klientów za pomocą wielu wątków lub procesów.
Przykład¶
Serwer:
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#define MULTITHREAD
#define ADDR "127.0.0.1"
#define PORT 9001
#define QUEUE_SIZE 16
#define BUFFER_SIZE 256
void *echo(void *arg) {
printf("> echo\n");
int client_sockfd = *(int *) arg;
char buffer[BUFFER_SIZE];
int read_bytes = read(client_sockfd, buffer, BUFFER_SIZE);
printf("received: %d\n", read_bytes);
if (read_bytes < 0) {
perror("Could not read from socket");
exit(-1);
}
printf("received: %s\n", buffer);
int written_bytes = write(client_sockfd, buffer, read_bytes);
printf("written: %d\n", written_bytes);
if (written_bytes < 0) {
perror("Could not write to socket");
exit(-1);
}
close(client_sockfd);
printf("< echo\n");
return 0;
}
int main(int argc, char *argv[]) {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
perror("Could not create socket");
exit(-1);
}
struct sockaddr_in server_sockaddr;
server_sockaddr.sin_family = AF_INET;
server_sockaddr.sin_port = htons(PORT);
server_sockaddr.sin_addr.s_addr = inet_addr(ADDR);
if (server_sockaddr.sin_addr.s_addr < 0) {
perror("Error converting IP address");
exit(-1);
}
int result = bind(sockfd, (struct sockaddr *) &server_sockaddr, sizeof(server_sockaddr));
if (result < 0) {
perror("Error binding socket");
exit(-1);
}
result = listen(sockfd, QUEUE_SIZE);
if (result < 0) {
perror("Error setting up socket to listen");
exit(-1);
}
while (1) {
struct sockaddr_in client_sockaddr;
socklen_t client_sockaddr_size;
int *client_sockfd = malloc(sizeof(int));
*client_sockfd = accept(sockfd, (struct sockaddr *) &client_sockaddr, &client_sockaddr_size);
#ifdef MULTITHREAD
pthread_t client_thread;
result = pthread_create(&client_thread, NULL, echo, (void *) client_sockfd);
if (result < 0) {
perror("Could not create thread");
exit(-1);
}
pthread_detach(client_thread);
#else
echo((void *) client_sockfd);
#endif
}
}
Klient:
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#define ADDR "127.0.0.1"
#define PORT 9001
#define BUFFER_SIZE 256
int main(int argc, char *argv[]) {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
perror("Could not create socket");
exit(-1);
}
struct sockaddr_in server_sockaddr;
server_sockaddr.sin_family = AF_INET;
server_sockaddr.sin_port = htons(PORT);
server_sockaddr.sin_addr.s_addr = inet_addr(ADDR);
int result = connect(sockfd, (struct sockaddr *) &server_sockaddr, sizeof(server_sockaddr));
if (result < 0) {
perror("Could not connect to socket");
exit(-1);
}
int written_bytes = write(sockfd, "hello", 5);
printf("written: %d\n", written_bytes);
if (written_bytes < 0) {
perror("Could not write to socket");
exit(-1);
}
char buffer[BUFFER_SIZE];
int read_bytes = read(sockfd, buffer, BUFFER_SIZE);
printf("received: %d\n", written_bytes);
if (read_bytes < 0) {
perror("Could not read from socket");
exit(-1);
}
printf("received: %s\n", buffer);
close(sockfd);
}
Monitorowanie gniazd BSD¶
“Klasyczna” metoda monitorowania deskryptorów gniazd (omijać):
#include <unistd.h> int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
fd_set *readfds
: deskryptory plików, w których przypadku czekamy na nowe dane które będzie można odczytać z desktyptora; po wykonaniu operacji argument jest modyfikowany i zawiera tylko te deskryptory które uległy zmianom.fd_set *writefds
: deskryptory plików, w których przypadku czekamy aż będzie możliwe wykonanie operacji bez blokowania; po wykonaniu operacji argument jest modyfikowany i zawiera tylko te deskryptory które uległy zmianom.fd_set *exceptfds
: deskryptory plików w których przypadku czekamy na wyjątki; po wykonaniu operacji argument jest modyfikowany i zawiera tylko te deskryptory które uległy zmianom.int nfds
: next file descriptor – najwyższy z podanych do monitorowania deskryptorów plików+ 1
.struct timeval *timeout
: maksymalny czas czekania lubNULL
.- zwraca liczbę deskryptorów które uległy odpowiednim zmianom, lub
-1
w wypadku błędu.
Poll:
#include <poll.h> int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd *fds
: zbiór deskryptorów plików do monitorowania:struct pollfd { int fd; // Desktyptor short events; // Oczekiwane zdarzenia short revents; // Zdarzenia, które wystąpiły }
zdarzenia:
POLLIN
: nowe dane do przeczytania (lub gniazdo zostało zamknięte),POLLOUT
: można zapisać dane bez blokowania.
nfds_t nfds
: liczba deskryptorów wfds
,int timeout
: maksymalny czas oczekiwania na zmiany (negatywna wartość oznacza oczekiwanie bez ograniczeń czasowych),zwraca
0
jeśli skończy się wyznaczony czas, wartość<0
w przypadku błędu,>0
w przypadku poprawnego wykonania.
Oczekiwanie na zdarzenia¶
Klient:
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <poll.h>
#define ADDR "127.0.0.1"
#define PORT 9001
#define BUFFER_SIZE 256
int main(int argc, char *argv[]) {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
perror("Could not create socket");
}
struct sockaddr_in server_sockaddr;
server_sockaddr.sin_family = AF_INET;
server_sockaddr.sin_port = htons(PORT);
server_sockaddr.sin_addr.s_addr = inet_addr(ADDR);
int result = connect(sockfd, (struct sockaddr *) &server_sockaddr, sizeof(server_sockaddr));
if (result < 0) {
perror("Could not connect to socket");
exit(-1);
}
int written_bytes = write(sockfd, "hello\n", 6);
printf("written: %d\n", written_bytes);
if (written_bytes < 0) {
perror("Could not write to socket");
exit(-1);
}
struct pollfd sock_fds[] = {
{sockfd, POLLIN}
};
while (1) {
int result = poll(sock_fds, 1, 0);
if (result < 0) {
perror("Could not poll");
exit(-1);
}
printf("poll returned %d\n", result);
if (sock_fds[0].revents & POLLERR) {
printf("error\n");
close(sockfd);
return 0;
}
if (sock_fds[0].revents & POLLHUP) {
printf("hangup\n");
close(sockfd);
return 0;
}
if (sock_fds[0].revents & POLLNVAL) {
printf("invalid\n");
close(sockfd);
return 0;
}
if (sock_fds[0].revents & POLLIN) {
printf("something to read from server\n");
char buffer[BUFFER_SIZE];
int read_bytes = read(sockfd, buffer, BUFFER_SIZE - 1);
printf("received: %d\n",read_bytes);
if (read_bytes < 0) {
perror("Could not read from socket");
exit(-1);
}
buffer[read_bytes] = '\0';
printf("received: %s\n", buffer);
sleep(2);
int written_bytes = write(sockfd, buffer, read_bytes);
printf("written: %d\n", written_bytes);
if (written_bytes < 0) {
perror("Could not write to socket");
exit(-1);
}
}
sleep(2);
}
close(sockfd);
}
Serwer:
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <poll.h>
#define ADDR "127.0.0.1"
#define PORT 9001
#define QUEUE_SIZE 16
#define BUFFER_SIZE 256
int echo(int client_sockfd) {
printf("> echo\n");
struct pollfd client_sock_fds[] = {
{client_sockfd, POLLIN}
};
while (1) {
int result = poll(client_sock_fds, 1, 0);
if (result < 0) {
perror("Could not poll");
exit(-1);
}
printf("poll returned %d\n", result);
if (client_sock_fds[0].revents & POLLERR) {
printf("error\n");
close(client_sockfd);
return 0;
}
if (client_sock_fds[0].revents & POLLHUP) {
printf("hangup\n");
close(client_sockfd);
return 0;
}
if (client_sock_fds[0].revents & POLLNVAL) {
printf("invalid\n");
close(client_sockfd);
return 0;
}
if (client_sock_fds[0].revents & POLLIN) {
printf("something to read from client\n");
char buffer[BUFFER_SIZE];
int read_bytes = read(client_sockfd, buffer, BUFFER_SIZE - 1);
printf("received: %d\n", read_bytes);
if (read_bytes < 0) {
perror("Could not read from client");
exit(-1);
}
buffer[read_bytes] = '\0';
printf("received: %s\n", buffer);
sleep(2);
int written_bytes = write(client_sockfd, buffer, read_bytes);
printf("written: %d\n", written_bytes);
if (written_bytes < 0) {
perror("Could not write to socket");
exit(-1);
}
}
sleep(2);
}
printf("< echo\n");
close(client_sockfd);
return 0;
}
int main(int argc, char *argv[]) {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
perror("Could not create socket");
exit(-1);
}
struct sockaddr_in server_sockaddr;
server_sockaddr.sin_family = AF_INET;
server_sockaddr.sin_port = htons(PORT);
server_sockaddr.sin_addr.s_addr = inet_addr(ADDR);
if (server_sockaddr.sin_addr.s_addr < 0) {
perror("Error converting IP address");
exit(-1);
}
int result = bind(sockfd, (struct sockaddr *) &server_sockaddr, sizeof(server_sockaddr));
if (result < 0) {
perror("Error binding socket");
exit(-1);
}
result = listen(sockfd, QUEUE_SIZE);
if (result < 0) {
perror("Error setting up socket to listen");
exit(-1);
}
while (1) {
struct sockaddr_in client_sockaddr;
socklen_t client_sockaddr_size;
int client_sockfd = accept(sockfd, (struct sockaddr *) &client_sockaddr, &client_sockaddr_size);
echo(client_sockfd);
}
}
Demultipleksacja gniazd¶
void echo(int client_sockfd[]) {
printf("> echo\n");
struct pollfd client_sock_fds[] = {
{client_sockfd[0], POLLIN},
{client_sockfd[1], POLLIN},
{client_sockfd[2], POLLIN}
};
while (1) {
int result = poll(client_sock_fds, 3, 0);
if (result < 0) {
perror("Could not poll");
exit(-1);
}
printf("poll returned %d\n", result);
for (int i = 0; i < 3; i++) {
// ...
}
}
}
Zadania¶
- Napisz wielowątkowy połączeniowy serwer
mt_echo
, który obsługuje każdego nowego klienta w osobnym wątku. - Napisz system
chat
z centralnym koordynatorem, gdzie wielu klientów podłącza się do centralnego serwera. Serwer rozsyła wiadomość otrzymaną od dowolnego z klientów do wszystkich innych klientów w systemie. - Napisz system
mt_chat
który działa tak jakchat
, ale każdy klient obsługiwany jest przez serwer za pomocą osobnego wątku.