Wprowadzenie

  • Dotychczas uruchamiane programy działały sekwencyjnie tzn. instrukcja po instrukcji
  • Działały one w jednym wątku tzw. wątku głównym
  • Można napisać jednak program tak, żeby wątek główny tworzył inne
  • W efekcie różne części kodu będą wykonywały się równolegle (ang. parallel)
  • Obecnie właściwie każdy komputer (ze smartfonami włącznie) posiada kilka rdzeni CPU, zatem równoległość zamienia się nawet we współbieżność (jednoczesne wykonanie różnego kodu w tej samej jednostce czasu, ang. concurrency)
  • Jak w systemach operacyjnych zaimplementowana jest równoległość?
    • Istnieje lista wszystkich utworzonych wątków
    • System operacyjny wybiera pierwszy element i przydziela mu kwant czasu (rzędu milisekund)
    • Wątek wykonuje swój kod przez określony czas
    • Stan wątku zostaje zapisany (wszystkie wartości rejestrów, zmiennych lokalnych, itp.)
    • Wątek przechodzi na koniec listy w trybie nieaktywnym i oczekuje na kolejny przydział kwantu czasu
  • Odpowiedni przydział długości kwantu czasu daje efekt, że wątki wykonują się "stale" (nieprzerwanie z punktu widzenia użytkownika), a przy tym każdy wątek otrzyma swój przydział i wykona swój kod
  • Powyższe stanowi BARDZO zgrubny opis, bo w rzeczywistości pod uwagę brane są takie rzeczy jak:
    • Operacje I/O (np. czekanie kilkaset milisekund na odczyt pliku z dysku magnetycznego - wówczas wątek i tak nie wykona kodu)
    • Priorytety (np. pewne systemowe wątki są ważniejsze niż odtwarzanie filmów w przeglądarce)
    • i wiele, wiele innych...

Zalety i wady równoległości

  • Jeśli program jest dekomponowalny i mamy np. 2 rdzenie, można osiągnąć 2-krotne przyspieszenie
  • Przykład: dana jest tablica 1000 liczb całkowitych, należy każdą z nich podnieść do kwadratu
    • Sekwencyjny kod, musi wykonać pętlę 1000 razy
    • Współbieżny kod na 2 CPU wykona się szybciej (można podzielić tablicę na indeksy 0..499 oraz 500..999)
    • W ogólności (i czysto teoretycznie), można by nawet użyć 1000 CPU by każdy wyliczył jedną tylko wartość
  • Ale wiele rzeczywistych problemów jest bardziej skomplikowana, wymaga komunikacji między wątkami i/lub synchronizacji
  • Sposób przydziału kwantów czasu jest w pewnym sensie chaotyczny tzn. mimo, że jest to znany z góry algorytm działający na znanym sprzęcie, itp. niemożliwe jest w ogólności wyznaczenie dokładnie który wątek kiedy się wykona (to bardzo ważna uwaga!)
  • Wątki mogą mieć własną przestrzeń zmiennych ("widoczną" tylko dla nich samych) lub globalną i współdzieloną
  • Biorąc pod uwagę chaotyczność przydziału czasu CPU do wątków, brak elementów synchronizacji dostępu do danych współdzielonych powoduje zupełnie różne wyniki przetwarzania, w różnych uruchomieniach tego samego programu

Wątki w C

Szablon kodu

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

void *hello_world(void *data) {
    printf("Hello World from thread %d\n", (int) data);
}

int main() {
    int i, n;
    pthread_t *threads;

    scanf("%d", &n);
    threads = (pthread_t*) malloc(n * sizeof(pthread_t));

    for (i = 0; i < n; i++) {
        pthread_create(&threads[i], NULL, hello_world, (void*) i);
    }

    for (i = 0; i < n; i++) {
        pthread_join(threads[i], NULL);
    }

    free(threads);
    return 0;
}
Hello World from thread 0
Hello World from thread 3
Hello World from thread 2
Hello World from thread 1

Hello World from thread 1
Hello World from thread 0
Hello World from thread 2
Hello World from thread 3

Demonstracja liniowego przyspieszenia

#include <limits.h>
#include <math.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

void *iterate(void *data) {
    int i;
    int m = (int) data;

    printf("Counting to %d\n", m);

    for (i = 0; i < m; i++) {
        sqrt(i);
    }
}

int main() {
    int i, n;
    pthread_t *threads;

    scanf("%d", &n);
    threads = (pthread_t*) malloc(n * sizeof(pthread_t));

    for (i = 0; i < n; i++) {
        pthread_create(&threads[i], NULL, iterate, INT_MAX / n);
    }

    for (i = 0; i < n; i++) {
        pthread_join(threads[i], NULL);
    }

    free(threads);
    return 0;
}

Demonstracja problemów z brakiem synchronizacji

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

int sum = 0;

void *compute(void *data) {
    int i;

    for (i = 0; i < 1000; i++) {
        sum++;
    }
}

int main() {
    int i, n;
    pthread_t *threads;

    scanf("%d", &n);
    threads = (pthread_t*) malloc(n * sizeof(pthread_t));

    for (i = 0; i < n; i++) {
        pthread_create(&threads[i], NULL, compute, NULL);
    }

    for (i = 0; i < n; i++) {
        pthread_join(threads[i], NULL);
    }

    printf("%d\n", sum);

    free(threads);
    return 0;
}
  • Jaki będzie wynik dla np. 32000 wątków?
  • Należy użyć semafora. Jest to specjalny licznik:
    • Jest widziany przez wiele wątków jednocześnie
    • Ma pewną początkową wartość
    • Ma operację zmniejszenia lub zwiększenia wartości o 1
    • Jeśli aktualnie ma wartość 0, to nie można go zmniejszyć i wątek, który by to próbował zrobić będzie czekał dopóki inny nie podniesie wartości; wówczas wątek czekający przechodzi dalej, pozostawiając wartość na 0
    • W efekcie, semafor z wartością początkową np. 1 będzie "przepuszczał" tylko jeden wątek dalej i na koniec sygnalizując zakończenie swojego działania może dopiero wpuścić następny
    • Ważne: semafory działają atomowo tzn. że operacja zmniejszenia lub zwiększenia wartości o 1 odbywa się "natychmiastowo" i efekt jest widoczny dla wszystkich wątków
  • Semafor w C dla powyższego zadania:
    • Nagłówek:
      #include <semaphore.h>
    • Zmienna globalna:
      sem_t semaphore;
    • Inicjalizacja (w funkcji main()):
      sem_init(&semaphore, 0, 1);
    • Sekcja krytyczna (w funkcji compute()):
      sem_wait(&semaphore);
      sum++;
      sem_post(&semaphore);
  • Jeżeli semafor ma wartość 1, to wygodniejsze jest użycie blokady typu mutex:
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    ...
    pthread_mutex_lock(&mutex);
    sum++;
    pthread_mutex_unlock(&mutex);

Obliczanie liczby {$\pi$} metodą Monte Carlo

  • Pole powierzchni dowolnej figury można w przybliżeniu obliczyć przy użyciu metody Monte Carlo:
    • Załóżmy, że figura cała mieści się w innej, której pole znamy (np. w kwadracie o znanym boku)
    • Losując punkt w obrębie figury o znanym polu (np. tego kwadratu), możemy sprawdzić czy należy on także do figury, której pole chcemy obliczyć
    • Jeśli wykonamy to dostatecznie wiele razy i losowanie przebiegało poprawnie (dokładniej mówiąc: rozkład prawdopodobieństwa był równomierny), to stosunek trafień do ogólnej liczby prób stanowić będzie przybliżenie pola powierzchni
    • Innymi słowy i przez przykład: jeśli mamy kwadrat o boku 1 (pole = 1) oraz kwadrat o boku 2 (pole = 4), to umieszczenie pierwszego w drugim i losowe próbkowanie punktów daje prawdopodobieństwo {$\frac{1}{4}$}, że punkt znajdzie się w kwadracie mniejszym
    • Oczywiście, im więcej losowych prób, tym bliżej do prawidłowej odpowiedzi (i mimo, że jak podaję jest to "oczywiste", sprawdzimy dziś tę hipotezę!)
  • Możemy poczynić takie obserwacje:
    • Pole powierzchni koła o promieniu równym 1 ma wartość {$\pi$}
    • Jeśli założymy, że środkiem tego koła jest punkt {$(0,0)$}, to w pierwszej ćwiartce układu współrzędnych znajduje się wycinek koła o polu {$\frac{\pi}{4}$}
    • Możemy też w punktach {$(0,0), (0,1), (1,1), (1,0)$} umieścić kwadrat (czyli o boku długości 1, czyli o polu równym 1)
  • Zgodnie z metodą Monte Carlo, możemy losować punkt {$(x,y)$}, gdzie {$0 \le x \le 1$} oraz {$0 \le y \le 1$} i korzystając z nierówności koła sprawdzić czy należy ono do rozważanego wycinka koła
  • ZADANIE: Napisz kod wątku, który sprawdzi losowo 1000 punktów zgodnie z powyższym opisem. Wątek główny powinien utworzyć pożądaną liczbę wątków (podawaną przez użytkownika), poczekać aż zakończy się ich działanie, następnie niech wypisze przewidywaną wartość liczby {$\pi$}. Hipoteza brzmi: im więcej wątków weźmie udział w liczeniu, tym dokładniejszy wynik otrzymamy
  • Pomocne informacje:
    • Nierówność koła o promieniu 1 ze środkiem w punkcie {$(0,0)$}): {$x^2 + y^2 \le 1$}
    • Wylosowanie wartości z przedziału [0, 1]:
      double x = (double) rand() / (double) RAND_MAX;

Rozwiązanie problemu producent-konsument

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>

sem_t can_produce;
sem_t can_consume;

void *producer(void *data) {
    while (1) {
        sem_wait(&can_produce);
        puts("PRODUCING!");
        sem_post(&can_consume);
    }
}

void *consumer(void *data) {
    while (1) {
        sem_wait(&can_consume);
        puts("CONSUMING!");
        sem_post(&can_produce);
    }
}

int main(int argc, char **argv) {
    pthread_t thread_prod, thread_cons;

    sem_init(&can_produce, 0, 1);
    sem_init(&can_consume, 0, 0);

    pthread_create(&thread_prod, NULL, producer, NULL);
    pthread_create(&thread_cons, NULL, consumer, NULL);

    pthread_join(thread_cons, NULL);
    pthread_join(thread_prod, NULL);

    return 0;
}

Rozwiązanie problemu czytelników-pisarzy

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>

#define MAX_READERS 5

int readers;
sem_t can_read;
sem_t can_write;
sem_t rcount;

void *writer(void *data) {
    while (1) {
        sem_wait(&can_write);
        puts("WRITING!");
        sem_post(&can_read);
    }
}

void *reader(void *data) {
    while (1) {
        sem_wait(&can_read);

        sem_wait(&rcount);
        readers++;
        if (readers < MAX_READERS) {
            sem_post(&can_read);
        }
        sem_post(&rcount);

        printf("READING! (id=%d)\n", (int) data);

        sem_wait(&rcount);
        readers--;
        if (readers == 0) {
            sem_post(&can_write);
        } else {
            sem_post(&can_read);
        }
        sem_post(&rcount);
    }
}

int main(int argc, char **argv) {
    int i;
    pthread_t thread_reader[MAX_READERS];
    pthread_t thread_writer;

    readers = 0;
    sem_init(&can_write, 0, 1);
    sem_init(&can_read, 0, 0);
    sem_init(&rcount, 0, 1);

    pthread_create(&thread_writer, NULL, writer, NULL);
    for (i = 0; i < MAX_READERS; i++) {
        pthread_create(&thread_reader[i], NULL, reader, (void*) i);
    }

    pthread_join(thread_writer, NULL);
    for (i = 0; i < MAX_READERS; i++) {
        pthread_join(thread_reader[i], NULL);
    }
    return 0;
}