Serwery współbieżne

Serwer współbieżny obsługuje klientów za pomocą wielu wątków lub procesów.

Przykład

Serwer:

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>

#define MULTITHREAD

#define ADDR "127.0.0.1"
#define PORT 9001
#define QUEUE_SIZE 16
#define BUFFER_SIZE 256

void *echo(void *arg) {
    printf("> echo\n");
    int client_sockfd = *(int *) arg;

    char buffer[BUFFER_SIZE];
    int read_bytes = read(client_sockfd, buffer, BUFFER_SIZE);
    printf("received: %d\n", read_bytes);
    if (read_bytes < 0) {
        perror("Could not read from socket");
        exit(-1);
    }
    printf("received: %s\n", buffer);

    int written_bytes = write(client_sockfd, buffer, read_bytes);
    printf("written: %d\n", written_bytes);
    if (written_bytes < 0) {
        perror("Could not write to socket");
        exit(-1);
    }

    close(client_sockfd);
    printf("< echo\n");

    return 0;
}

int main(int argc, char *argv[]) {

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("Could not create socket");
        exit(-1);
    }

    struct sockaddr_in server_sockaddr;
    server_sockaddr.sin_family = AF_INET;
    server_sockaddr.sin_port = htons(PORT);
    server_sockaddr.sin_addr.s_addr = inet_addr(ADDR);

    if (server_sockaddr.sin_addr.s_addr < 0) {
        perror("Error converting IP address");
        exit(-1);
    }

    int result = bind(sockfd, (struct sockaddr *) &server_sockaddr, sizeof(server_sockaddr));
    if (result < 0) {
        perror("Error binding socket");
        exit(-1);
    }

    result = listen(sockfd, QUEUE_SIZE);
    if (result < 0) {
        perror("Error setting up socket to listen");
        exit(-1);
    }

    while (1) {
        struct sockaddr_in client_sockaddr;
        socklen_t client_sockaddr_size;
        int *client_sockfd = malloc(sizeof(int));

        *client_sockfd = accept(sockfd, (struct sockaddr *) &client_sockaddr, &client_sockaddr_size);
        #ifdef MULTITHREAD
        pthread_t client_thread;
        result = pthread_create(&client_thread, NULL, echo, (void *) client_sockfd);
        if (result < 0) {
            perror("Could not create thread");
            exit(-1);
        }
        pthread_detach(client_thread);

        #else
        echo((void *) client_sockfd);

        #endif
    }
}

Klient:

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>

#define ADDR "127.0.0.1"
#define PORT 9001
#define BUFFER_SIZE 256

int main(int argc, char *argv[]) {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("Could not create socket");
        exit(-1);
    }

    struct sockaddr_in server_sockaddr;
    server_sockaddr.sin_family = AF_INET;
    server_sockaddr.sin_port = htons(PORT);
    server_sockaddr.sin_addr.s_addr = inet_addr(ADDR);

    int result = connect(sockfd, (struct sockaddr *) &server_sockaddr, sizeof(server_sockaddr));
    if (result < 0) {
        perror("Could not connect to socket");
        exit(-1);
    }

    int written_bytes = write(sockfd, "hello", 5);
    printf("written: %d\n", written_bytes);
    if (written_bytes < 0) {
        perror("Could not write to socket");
        exit(-1);
    }

    char buffer[BUFFER_SIZE];
    int read_bytes = read(sockfd, buffer, BUFFER_SIZE);
    printf("received: %d\n", written_bytes);
    if (read_bytes < 0) {
        perror("Could not read from socket");
        exit(-1);
    }
    printf("received: %s\n", buffer);

    close(sockfd);
}

Monitorowanie gniazd BSD

  • “Klasyczna” metoda monitorowania deskryptorów gniazd (omijać):

    #include <unistd.h>
    int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
    
    • fd_set *readfds: deskryptory plików, w których przypadku czekamy na nowe dane które będzie można odczytać z desktyptora; po wykonaniu operacji argument jest modyfikowany i zawiera tylko te deskryptory które uległy zmianom.
    • fd_set *writefds: deskryptory plików, w których przypadku czekamy aż będzie możliwe wykonanie operacji bez blokowania; po wykonaniu operacji argument jest modyfikowany i zawiera tylko te deskryptory które uległy zmianom.
    • fd_set *exceptfds: deskryptory plików w których przypadku czekamy na wyjątki; po wykonaniu operacji argument jest modyfikowany i zawiera tylko te deskryptory które uległy zmianom.
    • int nfds: next file descriptor – najwyższy z podanych do monitorowania deskryptorów plików + 1.
    • struct timeval *timeout: maksymalny czas czekania lub NULL.
    • zwraca liczbę deskryptorów które uległy odpowiednim zmianom, lub -1 w wypadku błędu.
  • Poll:

    #include <poll.h>
    int poll(struct pollfd *fds, nfds_t nfds, int timeout);
    
    • struct pollfd *fds: zbiór deskryptorów plików do monitorowania:

      struct pollfd {
          int fd;         // Desktyptor
          short events;   // Oczekiwane zdarzenia
          short revents;  // Zdarzenia, które wystąpiły
      }
      

      zdarzenia:

      • POLLIN: nowe dane do przeczytania (lub gniazdo zostało zamknięte),
      • POLLOUT: można zapisać dane bez blokowania.
    • nfds_t nfds: liczba deskryptorów w fds,

    • int timeout: maksymalny czas oczekiwania na zmiany (negatywna wartość oznacza oczekiwanie bez ograniczeń czasowych),

    • zwraca 0 jeśli skończy się wyznaczony czas, wartość <0 w przypadku błędu, >0 w przypadku poprawnego wykonania.

Oczekiwanie na zdarzenia

Klient:

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <poll.h>

#define ADDR "127.0.0.1"
#define PORT 9001
#define BUFFER_SIZE 256

int main(int argc, char *argv[]) {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("Could not create socket");
    }

    struct sockaddr_in server_sockaddr;
    server_sockaddr.sin_family = AF_INET;
    server_sockaddr.sin_port = htons(PORT);
    server_sockaddr.sin_addr.s_addr = inet_addr(ADDR);

    int result = connect(sockfd, (struct sockaddr *) &server_sockaddr, sizeof(server_sockaddr));
    if (result < 0) {
        perror("Could not connect to socket");
        exit(-1);
    }

    int written_bytes = write(sockfd, "hello\n", 6);
    printf("written: %d\n", written_bytes);
    if (written_bytes < 0) {
        perror("Could not write to socket");
        exit(-1);
    }

    struct pollfd sock_fds[] = {
        {sockfd, POLLIN}
    };

    while (1) {
    int result = poll(sock_fds, 1, 0);
    if (result < 0) {
        perror("Could not poll");
        exit(-1);
    }
    printf("poll returned %d\n", result);

    if (sock_fds[0].revents & POLLERR) {
        printf("error\n");
        close(sockfd);
        return 0;
    }

    if (sock_fds[0].revents & POLLHUP) {
        printf("hangup\n");
        close(sockfd);
        return 0;
    }

    if (sock_fds[0].revents & POLLNVAL) {
        printf("invalid\n");
        close(sockfd);
        return 0;
    }

    if (sock_fds[0].revents & POLLIN) {
        printf("something to read from server\n");

        char buffer[BUFFER_SIZE];
        int read_bytes = read(sockfd, buffer, BUFFER_SIZE - 1);
        printf("received: %d\n",read_bytes);
        if (read_bytes < 0) {
        perror("Could not read from socket");
        exit(-1);
        }

        buffer[read_bytes] = '\0';
        printf("received: %s\n", buffer);

        sleep(2);

        int written_bytes = write(sockfd, buffer, read_bytes);
        printf("written: %d\n", written_bytes);
        if (written_bytes < 0) {
            perror("Could not write to socket");
            exit(-1);
        }
    }

    sleep(2);
    }

    close(sockfd);
}

Serwer:

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <poll.h>

#define ADDR "127.0.0.1"
#define PORT 9001
#define QUEUE_SIZE 16
#define BUFFER_SIZE 256

int echo(int client_sockfd) {
    printf("> echo\n");

    struct pollfd client_sock_fds[] = {
        {client_sockfd, POLLIN}
    };

    while (1) {
    int result = poll(client_sock_fds, 1, 0);
    if (result < 0) {
        perror("Could not poll");
        exit(-1);
    }
    printf("poll returned %d\n", result);

    if (client_sock_fds[0].revents & POLLERR) {
        printf("error\n");
        close(client_sockfd);
        return 0;
    }

    if (client_sock_fds[0].revents & POLLHUP) {
        printf("hangup\n");
        close(client_sockfd);
        return 0;
    }

    if (client_sock_fds[0].revents & POLLNVAL) {
        printf("invalid\n");
        close(client_sockfd);
        return 0;
    }

    if (client_sock_fds[0].revents & POLLIN) {
        printf("something to read from client\n");

        char buffer[BUFFER_SIZE];
        int read_bytes = read(client_sockfd, buffer, BUFFER_SIZE - 1);
        printf("received: %d\n", read_bytes);
        if (read_bytes < 0) {
            perror("Could not read from client");
            exit(-1);
        }

        buffer[read_bytes] = '\0';
        printf("received: %s\n", buffer);

        sleep(2);

        int written_bytes = write(client_sockfd, buffer, read_bytes);
        printf("written: %d\n", written_bytes);
        if (written_bytes < 0) {
            perror("Could not write to socket");
            exit(-1);
        }
    }

    sleep(2);
    }

    printf("< echo\n");

    close(client_sockfd);
    return 0;
}

int main(int argc, char *argv[]) {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("Could not create socket");
        exit(-1);
    }

    struct sockaddr_in server_sockaddr;
    server_sockaddr.sin_family = AF_INET;
    server_sockaddr.sin_port = htons(PORT);
    server_sockaddr.sin_addr.s_addr = inet_addr(ADDR);

    if (server_sockaddr.sin_addr.s_addr < 0) {
        perror("Error converting IP address");
        exit(-1);
    }

    int result = bind(sockfd, (struct sockaddr *) &server_sockaddr, sizeof(server_sockaddr));
    if (result < 0) {
        perror("Error binding socket");
        exit(-1);
    }

    result = listen(sockfd, QUEUE_SIZE);
    if (result < 0) {
        perror("Error setting up socket to listen");
        exit(-1);
    }

    while (1) {
    struct sockaddr_in client_sockaddr;
    socklen_t client_sockaddr_size;

    int client_sockfd = accept(sockfd, (struct sockaddr *) &client_sockaddr, &client_sockaddr_size);
    echo(client_sockfd);
    }
}

Demultipleksacja gniazd

void echo(int client_sockfd[]) {
     printf("> echo\n");


     struct pollfd client_sock_fds[] = {
         {client_sockfd[0], POLLIN},
         {client_sockfd[1], POLLIN},
         {client_sockfd[2], POLLIN}
     };

     while (1) {
         int result = poll(client_sock_fds, 3, 0);
         if (result < 0) {
             perror("Could not poll");
             exit(-1);
         }
         printf("poll returned %d\n", result);

         for (int i = 0; i < 3; i++) {
             // ...
         }
     }
 }

Zadania

  1. Napisz wielowątkowy połączeniowy serwer mt_echo, który obsługuje każdego nowego klienta w osobnym wątku.
  2. Napisz system chat z centralnym koordynatorem, gdzie wielu klientów podłącza się do centralnego serwera. Serwer rozsyła wiadomość otrzymaną od dowolnego z klientów do wszystkich innych klientów w systemie.
  3. Napisz system mt_chat który działa tak jak chat, ale każdy klient obsługiwany jest przez serwer za pomocą osobnego wątku.