Programowanie Wielowątkowe w Języku C (powtórzenie)

Wątek to najbardziej podstawowy fragment kodu który może być zarządzany przez scheduler w sposób niezależny.

Każdy proces systemu operacyjnego wykonuje przynajmniej jeden wątek (wątek główny). Każdy proces może uruchomić kilka (równorzędnych) współbieżnych wątków.

Wątki współdzielą używają identyfikatora procesu, przestrzeni adresowej i tablicy otwartych plików procesu z którego pochodzą. Ponieważ współdzielą pamięć i otwarte pliki, to wymagają synchronizacji przy dostępie do nich.

Wątki działają tak długo jak działa proces który je uruchomił: jeśli jakikolwiek z wątków procesu zamknie proces, wszystkie inne wątki także zostaną zakończone. W szczególności: jeśli wątek główny zakończy wykonywanie funkcji main, wszystkie wątki zostają zakończone.

Tworzenie nowego wątku jest tańsze niż tworzenie nowego procesu.

Kompilacja

Należy podczas kompilacji podlinkować bibliotekę pthreads:

  • gcc -pthread source.c -o program

API

  • Utworzenie wątku

    int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
    
    • pthread_t *thread: wskaźnik do wątku
    • pthread_attr_t *attr: parametry wątku
    • *(*start_routine) (void *): wskaźnik na funkcję wykonywaną przez wątek, funkcja ta musi przyjmować 1 argument typu ``void * i zwracać wynik typu void *
    • void *arg argument przekazywany funkcji start_routine
    • zwraca 0 w wypadku poprawnego wykonania lub kod błędu

    Przykład:

    #include <stdio.h>
    #include <pthread.h>
    #include <unistd.h>
    
    void *hello_world(void *arg) {
        char *s = (char *) arg;
        printf("Hello %s!\n", s);
        return 0;
    }
    
    int main(int argc, char* argv[]) {
        pthread_t hello_world_thread;
        int result = pthread_create(&hello_world_thread, NULL, hello_world, (void *) argv[0]);
        if (result != 0) {
            perror("Could not create thread.");
        }
        //sleep(3);
        printf("Main thread exiting.\n");
        return 0;
    }
    
  • Oczekiwanie na zamknięcie wątku:

    int pthread_join(pthread_t thread, void **retval);
    
    • pthread_t thread: wątek na który czekamy (nie wskaźnik)
    • void **retval: wartość zwracana przez wątek
    • zwraca 0 w wypadku poprawnego wykonania lub kod błędu

    Przykład:

    #include <stdio.h>
    #include <pthread.h>
    #include <unistd.h>
    
    void *hello_world(void *arg) {
        char *s = (char *) arg;
        printf("Hello %s!\n", s);
        return 0;
    }
    
    int main(int argc, char* argv[]) {
        pthread_t hello_world_thread;
        int result = pthread_create(&hello_world_thread, NULL, hello_world, (void *) argv[0]);
        if (result != 0) {
            perror("Could not create thread.");
        }
    
        pthread_join(hello_world_thread, NULL);
        printf("Main thread exiting.\n");
    
        return 0;
    }
    
    #include <stdio.h>
    #include <pthread.h>
    #include <unistd.h>
    
    void *hello_world(void *arg) {
        char *s = (char *) arg;
        printf("Hello %s!\n", s);
    
        int *result = malloc(sizeof(int));
        *result = 15;
        return result;
    }
    
    int main(int argc, char* argv[]) {
        pthread_t hello_world_thread;
        int result = pthread_create(&hello_world_thread, NULL, hello_world, (void *) argv[0]);
        if (result != 0) {
            perror("Could not create thread.");
        }
    
        int *hello_world_result;
        pthread_join(hello_world_thread, (void **) &hello_world_result);
        printf("Thread returned, %d.\n", *hello_world_result);
    
        return 0;
    }
    

    Każdy wątek powinien być połączony z innym wątkiem (np głównym) ponieważ wątek zwalnia swoje zasoby (do procesu) dopiero w momencie wykonania na nim join-a.

  • Odłączenie wątku:

    int pthread_detach(pthread_t thread);
    
    • pthread_t thread: wątek

    Wątek odłączony zwraca wszystkie zasoby bez potrzby wykonania join-a. Na wątku odłączonym nie można wykonać join-a.

  • Zakończenie aktualnego wątku:

    void pthread_exit(void *retval);
    
    • void *retval: wartość zwracana przez bieżący wątek
    #include <stdio.h>
    #include <pthread.h>
    #include <unistd.h>
    
    void *hello_world(void *arg) {
        char *s = (char *) arg;
        printf("Hello %s!\n", s);
    
        int *result = malloc(sizeof(int));
        *result = 15;
        pthread_exit(result);
    
        printf("Not shown\n");
    }
    
    int main(int argc, char* argv[]) {
        pthread_t hello_world_thread;
        int result = pthread_create(&hello_world_thread, NULL, hello_world, (void *) argv[0]);
        if (result != 0) {
            perror("Could not create thread.");
        }
    
        int *hello_world_result;
        pthread_join(hello_world_thread, (void **) &hello_world_result);
        printf("Thread returned, %d.\n", *hello_world_result);
    
        return 0;
    }
    

    Linux Programmer’s Manual: To allow other threads to continue execution, the main thread should terminate by calling pthread_exit() rather than exit(3).

  • Zakończenie innego wątku:

    int pthread_cancel(pthread_t thread);
    
    • pthread_t thread: wątek do zamknięcia
  • Unimożliwienie zakończenia bieżącego wątku przez inny wątek:

    int pthread_setcancelstate(int state, int *oldstate);
    
    • int state: nowy stan wątku: PTHREAD_CANCEL_ENABLE, PTHREAD_CANCEL_DISABLE, ...
    • int *oldstate: poprzedni stan wątku

Synchronizacja

Zamki wyłączne

  • Inicjalizacja i usunięcie zamka:

    int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr);
    int pthread_mutex_destroy(pthread_mutex_t *mutex);
    
    • pthread_mutex_t *mutex: zamek
    • pthread_mutexattr_t *mutexattr: atrybuty zamka:
  • Operacje na zamkach

    int pthread_mutex_lock(pthread_mutex_t *mutex);
    int pthread_mutex_unlock(pthread_mutex_t *mutex);
    
    • pthread_mutex_t *mutex: zamek
    • zwraca 0 w wypadku poprawnego wykonania, lub numer błędu w przeciwnym wypadku
    int pthread_mutex_trylock(pthread_mutex_t *mutex);
    
    • pthread_mutex_t *mutex: zamek
    • zwraca 0 w wypadku pobrania zamka, lub inną wartość jeśli zamek nie został pobrany lub jeśli wystąpił błąd.
    #include <stdio.h>
    #include <pthread.h>
    #include <unistd.h>
    
    #define THREAD_COUNT 16
    
    int counter = 0;
    
    void *incrementer_thread(void *arg) {
        printf("Start thread.");
        int temp = counter;
        sleep(1);
        counter = temp + 1;
        printf("Done.");
    }
    
    int main(int argc, char* argv[]) {
        pthread_t incrementer_threads[THREAD_COUNT];
    
        for (int i = 0; i < THREAD_COUNT; i++) {
            int result = pthread_create(&(incrementer_threads[i]), NULL, incrementer_thread, NULL);
                if (result != 0) {
                    perror("Could not create thread.");
            }
        }
    
        for (int i = 0; i < THREAD_COUNT; i++) {
                pthread_join(incrementer_threads[i], NULL);
        }
    
        printf("\nCounter value after executing %d threads is %d.\n", THREAD_COUNT, counter);
        return 0;
    }
    
    #include <stdio.h>
    #include <pthread.h>
    #include <unistd.h>
    
    #define THREAD_COUNT 16
    
    int counter = 0;
    pthread_mutex_t counter_lock;
    
    void *incrementer_thread(void *arg) {
        printf("Start thread.");
        pthread_mutex_lock(&counter_lock);
        int temp = counter;
        sleep(1);
        counter = temp + 1;
        pthread_mutex_unlock(&counter_lock);
        printf("Done.");
    }
    
    int main(int argc, char* argv[]) {
        pthread_t incrementer_threads[THREAD_COUNT];
    
        pthread_mutex_init(&counter_lock, NULL);
    
        for (int i = 0; i < THREAD_COUNT; i++) {
            int result = pthread_create(&(incrementer_threads[i]), NULL, incrementer_thread, NULL);
                if (result != 0) {
                    perror("Could not create thread.");
            }
        }
    
        for (int i = 0; i < THREAD_COUNT; i++) {
            pthread_join(incrementer_threads[i], NULL);
        }
    
        pthread_mutex_destroy(&counter_lock);
    
        printf("\nCounter value after executing %d threads is %d.\n", THREAD_COUNT, counter);
        return 0;
    }
    

Zamki zapis/odczyt

  • Analogiczne operacje do zamków wyłacznych, podział na zapis i odczyt:

    int pthread_rwlock_init(pthread_rwlock_t *rwlock, const pthread_rwlockattr_t *rwlockattr);
    int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
    int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
    int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
    int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
    int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
    int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
    
    #include <stdio.h>
    #include <pthread.h>
    #include <unistd.h>
    
    #define THREAD_COUNT 16
    
    int counter = 0;
    pthread_mutex_t counter_lock;
    
    void *reader_thread(void *arg) {
        printf("Start reader.\n");
        pthread_mutex_lock(&counter_lock);
        int temp = counter;
        sleep(1);
        pthread_mutex_unlock(&counter_lock);
        printf("Read %d.\n", temp);
    }
    
    void *writer_thread(void *arg) {
        printf("Start writer.\n");
        pthread_mutex_lock(&counter_lock);
        int temp = counter;
        temp += 1;
        sleep(1);
        counter = temp;
        pthread_mutex_unlock(&counter_lock);
        printf("Written %d.\n", temp);
    }
    
    int main(int argc, char* argv[]) {
        pthread_t threads[THREAD_COUNT];
    
        pthread_mutex_init(&counter_lock, NULL);
    
        for (int i = 0; i < THREAD_COUNT; i++) {
            int result;
            if ((i == 4) || (i == 12)) {
              result = pthread_create(&(threads[i]), NULL, writer_thread, NULL);
            } else {
              result = pthread_create(&(threads[i]), NULL, reader_thread, NULL);
            }
            if (result != 0) {
                perror("Could not create thread.");
            }
        }
    
        for (int i = 0; i < THREAD_COUNT; i++) {
            pthread_join(threads[i], NULL);
        }
    
        pthread_mutex_destroy(&counter_lock);
    
        printf("\nCounter value after executing %d threads is %d.\n", THREAD_COUNT, counter);
        return 0;
    }
    
    #include <stdio.h>
    #include <pthread.h>
    #include <unistd.h>
    
    #define THREAD_COUNT 16
    
    int counter = 0;
    pthread_rwlock_t counter_lock;
    
    void *reader_thread(void *arg) {
        printf("Start reader.\n");
        pthread_rwlock_rdlock(&counter_lock);
        int temp = counter;
        sleep(1);
        pthread_rwlock_unlock(&counter_lock);
        printf("Read %d.\n", temp);
    }
    
    void *writer_thread(void *arg) {
        printf("Start writer.\n");
        pthread_rwlock_wrlock(&counter_lock);
        int temp = counter;
        temp += 1;
        sleep(1);
        counter = temp;
        pthread_rwlock_unlock(&counter_lock);
        printf("Written %d.\n", temp);
    }
    
    int main(int argc, char* argv[]) {
        pthread_t threads[THREAD_COUNT];
    
        pthread_rwlock_init(&counter_lock, NULL);
    
        for (int i = 0; i < THREAD_COUNT; i++) {
            int result;
            if ((i == 4) || (i == 12)) {
              result = pthread_create(&(threads[i]), NULL, writer_thread, NULL);
            } else {
              result = pthread_create(&(threads[i]), NULL, reader_thread, NULL);
            }
            if (result != 0) {
                perror("Could not create thread.");
            }
        }
    
        for (int i = 0; i < THREAD_COUNT; i++) {
            pthread_join(threads[i], NULL);
        }
    
        pthread_rwlock_destroy(&counter_lock);
    
        printf("\nCounter value after executing %d threads is %d.\n", THREAD_COUNT, counter);
        return 0;
    }
    

    (Kompilacja -std=gnu99)

Wait-notify

  • Inicjalizacja i usunięcie zmiennej warunkowej:

    int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *condattr);
    int pthread_cond_destroy(pthread_cond_t *cond);
    
    • pthread_cond_t *cond: zmienna warunkowa
    • pthread_condattr_t *condattr: konfiguracja zmiennej warunkowej
  • Czekanie na zmiennej warunkowej:

    int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
    
    • pthread_cond_t *cond: zmienna warunkowa
    • pthread_mutex_t *mutex: zamek

    Czekanie na zmiennej warunkowej zawsze występuje w sekcji krytycznej zamka (tzn. wątek który rozpoczyna czekanie uprzednio pobrał zamek).

  • Sygnalizowanie zmiennej warunkowej:

    int pthread_cond_signal(pthread_cond_t *cond);
    int pthread_cond_broadcast(pthread_cond_t *cond);
    
    • pthread_cond_t *cond: zmienna warunkowa

    Sygnalizowanie kończy przywraca do aktywności jeden (dowolny) lub wszystkie (kolejno) czekające wątki.

  • Przykład:

    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>
    #include <unistd.h>
    
    #define PUSH_THREAD_COUNT 8
    #define POP_THREAD_COUNT 2
    #define STACK_SIZE 6
    
    int stack[STACK_SIZE];
    int stack_pointer = 0;
    
    pthread_mutex_t stack_lock;
    pthread_cond_t stack_condition;
    
    void *push_thread(void *arg) {
        int value = *((int *) arg);
    
        pthread_mutex_lock(&stack_lock);
    
        // Stack is full: wait (release mutex for the duration)
        while (stack_pointer >= STACK_SIZE) {
            printf("Waiting to push value %d.\n", value);
            pthread_cond_wait(&stack_condition, &stack_lock);
        }
    
        // There's room in the stack: put more stuff on and notify all
        printf("Pushing value %d.\n", value);
        stack[stack_pointer++] = value;
        sleep(1);
        pthread_cond_broadcast(&stack_condition);
    
        pthread_mutex_unlock(&stack_lock);
    }
    
    void *pop_thread(void *arg) {
        pthread_mutex_lock(&stack_lock);
    
        // Stack is empty: wait (release mutex for the duration)
        while (stack_pointer <= 0) {
            printf("Waiting to pop value.\n");
            pthread_cond_wait(&stack_condition, &stack_lock);
        }
    
        // There's room in the stack: put more stuff on and notify all
        int value = stack[--stack_pointer];
        stack[stack_pointer] = 0;
        printf("Popping value %d.\n", value);
        sleep(1);
        pthread_cond_broadcast(&stack_condition);
    
        pthread_mutex_unlock(&stack_lock);
    }
    
    int main(int argc, char* argv[]) {
        pthread_t pop_threads[POP_THREAD_COUNT];
        pthread_t push_threads[PUSH_THREAD_COUNT];
    
        pthread_mutex_init(&stack_lock, NULL);
        pthread_cond_init(&stack_condition, NULL);
    
        for (int i = 0; i < POP_THREAD_COUNT; i++) {
            int result = pthread_create(&(pop_threads[i]), NULL, pop_thread, NULL);
                if (result != 0) {
                    perror("Could not create pop thread.");
                }
        }
    
        for (int i = 0; i < PUSH_THREAD_COUNT; i++) {
            int *value = (int *) malloc(sizeof(int));
            (*value) = i + 1;
            int result = pthread_create(&(push_threads[i]), NULL, push_thread, value);
                if (result != 0) {
                    perror("Could not create push thread.");
                }
        }
    
        for (int i = 0; i < PUSH_THREAD_COUNT; i++) {
                pthread_join(push_threads[i], NULL);
        }
    
        for (int i = 0; i < POP_THREAD_COUNT; i++) {
                pthread_join(pop_threads[i], NULL);
        }
    
        pthread_mutex_destroy(&stack_lock);
    
        printf("\nStack after execution of %d push and %d pop threads.\n", PUSH_THREAD_COUNT, POP_THREAD_COUNT);
        for (int i = 0; i < stack_pointer; i++) {
            printf("[%d] => %d\n", i, stack[i]);
        }
    
        return 0;
    }
    

Zadania

1. Napisz program map-reduce-len który rozkłada wykonanie obliczeń na zbiorze danych na wiele wątków zgodnie z techniką map-reduce. Konkretnie, program oblicza sumę długość jakiegoś tekstu w sposób współbieżny. Użytkownik podaje przez parametr/argumenty tekst \(S\). Program dzieli ten tekst na frac{n} ~równych fragmentów. Dla każdego takiego fragmentu \(S_i\) program tworzy wątek \(T_i\), który oblicza długość \(S_i\) i zwraca wynik do wątku głównego. Następnie wątek główny sumuje wyniki z poszczególnych wątków i wypisuje sumę na ekran.

2. Napisz program gl-histogram który tworzy histogram liter dla jakiegoś tekstu w sposób wielowątkowy. Użytkownik podaje przez parametr/argumenty tekst \(S\). Program dzieli ten tekst na frac{n} ~równych fragmentów. Dla każdego takiego fragmentu \(S_i\) program tworzy wątek \(T_i\). Wątek \(T_i\) liczy liczbę wystąpień poszczególnych liter w \(S_i\) i wpisuje wyniki dla poszczególnych liter do globalnej współdzieloną tablicę \(H\). Po zakończeniu wykonania wszystkich wątków, wątek główny wypisuje na ekran zawartość \(H\). Zapewnij synchronizację wątków używając zamka wyłącznego.

3. Napisz program fgl-histogram który dodaje do gl-histogram wydajną synchronizację: zamiast używać jednego zamka wyłącznego zastosuj tablicę zamków współdzielonych. Porównaj wydajność systemu.

4. Napisz program prod-cons który rozwiązuje problem producent-konsument. W pamięci istnieje kolejka FIFO \(Q\). Wątek pełniący rolę producenta generuje kolejne liczby całkowite i dopisuje je na koniec \(Q\). Jeśli w kolejce nie ma miejsca na kolejną wartość, wątek czeka aż miejsce się zwolni. Wątek pełniący rolę konsumenta odczytuje kolejne liczby z początku \(Q\) i wypisuje na ekran. Konsument usuwa odczytane wartości z \(Q\). Jeśli kolejka jest pusta, konsument czeka aż w kolejce znajdą się nowe wartości. Sprawdź zachowanie systemu w przypadku jeśli konsument jest dużo szybszy niż producent i vice versa.

5. Napisz program n-prod-m-cons który jest rozszerzeniem programu prod-cons, gdzie jednocześnie istnieje \(n\) producentów i \(m\) konsumentów. Wartości \(n\) i \(m\) są podawane przez argumenty.

  1. https://deadlockempire.github.io/