Wprowadzenie
- Dotychczas uruchamiane programy działały sekwencyjnie tzn. instrukcja po instrukcji
- Działały one w jednym wątku tzw. wątku głównym
- Można napisać jednak program tak, żeby wątek główny tworzył inne
- W efekcie różne części kodu będą wykonywały się równolegle (ang. parallel)
- Obecnie właściwie każdy komputer (ze smartfonami włącznie) posiada kilka rdzeni CPU, zatem równoległość zamienia się nawet we współbieżność (jednoczesne wykonanie różnego kodu w tej samej jednostce czasu, ang. concurrency)
- Jak w systemach operacyjnych zaimplementowana jest równoległość?
- Istnieje lista wszystkich utworzonych wątków
- System operacyjny wybiera pierwszy element i przydziela mu kwant czasu (rzędu milisekund)
- Wątek wykonuje swój kod przez określony czas
- Stan wątku zostaje zapisany (wszystkie wartości rejestrów, zmiennych lokalnych, itp.)
- Wątek przechodzi na koniec listy w trybie nieaktywnym i oczekuje na kolejny przydział kwantu czasu
- Odpowiedni przydział długości kwantu czasu daje efekt, że wątki wykonują się "stale" (nieprzerwanie z punktu widzenia użytkownika), a przy tym każdy wątek otrzyma swój przydział i wykona swój kod
- Powyższe stanowi BARDZO zgrubny opis, bo w rzeczywistości pod uwagę brane są takie rzeczy jak:
- Operacje I/O (np. czekanie kilkaset milisekund na odczyt pliku z dysku magnetycznego - wówczas wątek i tak nie wykona kodu)
- Priorytety (np. pewne systemowe wątki są ważniejsze niż odtwarzanie filmów w przeglądarce)
- i wiele, wiele innych...
Zalety i wady równoległości
- Jeśli program jest dekomponowalny i mamy np. 2 rdzenie, można osiągnąć 2-krotne przyspieszenie
- Przykład: dana jest tablica 1000 liczb całkowitych, należy każdą z nich podnieść do kwadratu
- Sekwencyjny kod, musi wykonać pętlę 1000 razy
- Współbieżny kod na 2 CPU wykona się szybciej (można podzielić tablicę na indeksy 0..499 oraz 500..999)
- W ogólności (i czysto teoretycznie), można by nawet użyć 1000 CPU by każdy wyliczył jedną tylko wartość
- Ale wiele rzeczywistych problemów jest bardziej skomplikowana, wymaga komunikacji między wątkami i/lub synchronizacji
- Sposób przydziału kwantów czasu jest w pewnym sensie chaotyczny tzn. mimo, że jest to znany z góry algorytm działający na znanym sprzęcie, itp. niemożliwe jest w ogólności wyznaczenie dokładnie który wątek kiedy się wykona (to bardzo ważna uwaga!)
- Wątki mogą mieć własną przestrzeń zmiennych ("widoczną" tylko dla nich samych) lub globalną i współdzieloną
- Biorąc pod uwagę chaotyczność przydziału czasu CPU do wątków, brak elementów synchronizacji dostępu do danych współdzielonych powoduje zupełnie różne wyniki przetwarzania, w różnych uruchomieniach tego samego programu
Wątki w C
Szablon kodu
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
void *hello_world(void *data) {
printf("Hello World from thread %d\n", (int) data);
}
int main() {
int i, n;
pthread_t *threads;
scanf("%d", &n);
threads = (pthread_t*) malloc(n * sizeof(pthread_t));
for (i = 0; i < n; i++) {
pthread_create(&threads[i], NULL, hello_world, (void*) i);
}
for (i = 0; i < n; i++) {
pthread_join(threads[i], NULL);
}
free(threads);
return 0;
}
#include <stdio.h>
#include <stdlib.h>
void *hello_world(void *data) {
printf("Hello World from thread %d\n", (int) data);
}
int main() {
int i, n;
pthread_t *threads;
scanf("%d", &n);
threads = (pthread_t*) malloc(n * sizeof(pthread_t));
for (i = 0; i < n; i++) {
pthread_create(&threads[i], NULL, hello_world, (void*) i);
}
for (i = 0; i < n; i++) {
pthread_join(threads[i], NULL);
}
free(threads);
return 0;
}
Hello World from thread 0
Hello World from thread 3
Hello World from thread 2
Hello World from thread 1
Hello World from thread 1
Hello World from thread 0
Hello World from thread 2
Hello World from thread 3
Hello World from thread 3
Hello World from thread 2
Hello World from thread 1
Hello World from thread 1
Hello World from thread 0
Hello World from thread 2
Hello World from thread 3
Demonstracja liniowego przyspieszenia
#include <limits.h>
#include <math.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
void *iterate(void *data) {
int i;
int m = (int) data;
printf("Counting to %d\n", m);
for (i = 0; i < m; i++) {
sqrt(i);
}
}
int main() {
int i, n;
pthread_t *threads;
scanf("%d", &n);
threads = (pthread_t*) malloc(n * sizeof(pthread_t));
for (i = 0; i < n; i++) {
pthread_create(&threads[i], NULL, iterate, INT_MAX / n);
}
for (i = 0; i < n; i++) {
pthread_join(threads[i], NULL);
}
free(threads);
return 0;
}
#include <math.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
void *iterate(void *data) {
int i;
int m = (int) data;
printf("Counting to %d\n", m);
for (i = 0; i < m; i++) {
sqrt(i);
}
}
int main() {
int i, n;
pthread_t *threads;
scanf("%d", &n);
threads = (pthread_t*) malloc(n * sizeof(pthread_t));
for (i = 0; i < n; i++) {
pthread_create(&threads[i], NULL, iterate, INT_MAX / n);
}
for (i = 0; i < n; i++) {
pthread_join(threads[i], NULL);
}
free(threads);
return 0;
}
Demonstracja problemów z brakiem synchronizacji
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
int sum = 0;
void *compute(void *data) {
int i;
for (i = 0; i < 1000; i++) {
sum++;
}
}
int main() {
int i, n;
pthread_t *threads;
scanf("%d", &n);
threads = (pthread_t*) malloc(n * sizeof(pthread_t));
for (i = 0; i < n; i++) {
pthread_create(&threads[i], NULL, compute, NULL);
}
for (i = 0; i < n; i++) {
pthread_join(threads[i], NULL);
}
printf("%d\n", sum);
free(threads);
return 0;
}
#include <stdio.h>
#include <stdlib.h>
int sum = 0;
void *compute(void *data) {
int i;
for (i = 0; i < 1000; i++) {
sum++;
}
}
int main() {
int i, n;
pthread_t *threads;
scanf("%d", &n);
threads = (pthread_t*) malloc(n * sizeof(pthread_t));
for (i = 0; i < n; i++) {
pthread_create(&threads[i], NULL, compute, NULL);
}
for (i = 0; i < n; i++) {
pthread_join(threads[i], NULL);
}
printf("%d\n", sum);
free(threads);
return 0;
}
- Jaki będzie wynik dla np. 32000 wątków?
- Należy użyć semafora. Jest to specjalny licznik:
- Jest widziany przez wiele wątków jednocześnie
- Ma pewną początkową wartość
- Ma operację zmniejszenia lub zwiększenia wartości o 1
- Jeśli aktualnie ma wartość 0, to nie można go zmniejszyć i wątek, który by to próbował zrobić będzie czekał dopóki inny nie podniesie wartości; wówczas wątek czekający przechodzi dalej, pozostawiając wartość na 0
- W efekcie, semafor z wartością początkową np. 1 będzie "przepuszczał" tylko jeden wątek dalej i na koniec sygnalizując zakończenie swojego działania może dopiero wpuścić następny
- Ważne: semafory działają atomowo tzn. że operacja zmniejszenia lub zwiększenia wartości o 1 odbywa się "natychmiastowo" i efekt jest widoczny dla wszystkich wątków
- Semafor w C dla powyższego zadania:
- Nagłówek:
#include <semaphore.h>
- Zmienna globalna:
sem_t semaphore;
- Inicjalizacja (w funkcji
main()
):sem_init(&semaphore, 0, 1); - Sekcja krytyczna (w funkcji
compute()
):sem_wait(&semaphore);
sum++;
sem_post(&semaphore);
- Nagłówek:
- Jeżeli semafor ma wartość 1, to wygodniejsze jest użycie blokady typu mutex:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
...
pthread_mutex_lock(&mutex);
sum++;
pthread_mutex_unlock(&mutex);
Obliczanie liczby {$\pi$} metodą Monte Carlo
- Pole powierzchni dowolnej figury można w przybliżeniu obliczyć przy użyciu metody Monte Carlo:
- Załóżmy, że figura cała mieści się w innej, której pole znamy (np. w kwadracie o znanym boku)
- Losując punkt w obrębie figury o znanym polu (np. tego kwadratu), możemy sprawdzić czy należy on także do figury, której pole chcemy obliczyć
- Jeśli wykonamy to dostatecznie wiele razy i losowanie przebiegało poprawnie (dokładniej mówiąc: rozkład prawdopodobieństwa był równomierny), to stosunek trafień do ogólnej liczby prób stanowić będzie przybliżenie pola powierzchni
- Innymi słowy i przez przykład: jeśli mamy kwadrat o boku 1 (pole = 1) oraz kwadrat o boku 2 (pole = 4), to umieszczenie pierwszego w drugim i losowe próbkowanie punktów daje prawdopodobieństwo {$\frac{1}{4}$}, że punkt znajdzie się w kwadracie mniejszym
- Oczywiście, im więcej losowych prób, tym bliżej do prawidłowej odpowiedzi (i mimo, że jak podaję jest to "oczywiste", sprawdzimy dziś tę hipotezę!)
- Możemy poczynić takie obserwacje:
- Pole powierzchni koła o promieniu równym 1 ma wartość {$\pi$}
- Jeśli założymy, że środkiem tego koła jest punkt {$(0,0)$}, to w pierwszej ćwiartce układu współrzędnych znajduje się wycinek koła o polu {$\frac{\pi}{4}$}
- Możemy też w punktach {$(0,0), (0,1), (1,1), (1,0)$} umieścić kwadrat (czyli o boku długości 1, czyli o polu równym 1)
- Zgodnie z metodą Monte Carlo, możemy losować punkt {$(x,y)$}, gdzie {$0 \le x \le 1$} oraz {$0 \le y \le 1$} i korzystając z nierówności koła sprawdzić czy należy ono do rozważanego wycinka koła
- ZADANIE: Napisz kod wątku, który sprawdzi losowo 1000 punktów zgodnie z powyższym opisem. Wątek główny powinien utworzyć pożądaną liczbę wątków (podawaną przez użytkownika), poczekać aż zakończy się ich działanie, następnie niech wypisze przewidywaną wartość liczby {$\pi$}. Hipoteza brzmi: im więcej wątków weźmie udział w liczeniu, tym dokładniejszy wynik otrzymamy
- Pomocne informacje:
- Nierówność koła o promieniu 1 ze środkiem w punkcie {$(0,0)$}): {$x^2 + y^2 \le 1$}
- Wylosowanie wartości z przedziału [0, 1]:
double x = (double) rand() / (double) RAND_MAX;
Rozwiązanie problemu producent-konsument
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
sem_t can_produce;
sem_t can_consume;
void *producer(void *data) {
while (1) {
sem_wait(&can_produce);
puts("PRODUCING!");
sem_post(&can_consume);
}
}
void *consumer(void *data) {
while (1) {
sem_wait(&can_consume);
puts("CONSUMING!");
sem_post(&can_produce);
}
}
int main(int argc, char **argv) {
pthread_t thread_prod, thread_cons;
sem_init(&can_produce, 0, 1);
sem_init(&can_consume, 0, 0);
pthread_create(&thread_prod, NULL, producer, NULL);
pthread_create(&thread_cons, NULL, consumer, NULL);
pthread_join(thread_cons, NULL);
pthread_join(thread_prod, NULL);
return 0;
}
#include <semaphore.h>
#include <stdio.h>
sem_t can_produce;
sem_t can_consume;
void *producer(void *data) {
while (1) {
sem_wait(&can_produce);
puts("PRODUCING!");
sem_post(&can_consume);
}
}
void *consumer(void *data) {
while (1) {
sem_wait(&can_consume);
puts("CONSUMING!");
sem_post(&can_produce);
}
}
int main(int argc, char **argv) {
pthread_t thread_prod, thread_cons;
sem_init(&can_produce, 0, 1);
sem_init(&can_consume, 0, 0);
pthread_create(&thread_prod, NULL, producer, NULL);
pthread_create(&thread_cons, NULL, consumer, NULL);
pthread_join(thread_cons, NULL);
pthread_join(thread_prod, NULL);
return 0;
}
Rozwiązanie problemu czytelników-pisarzy
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#define MAX_READERS 5
int readers;
sem_t can_read;
sem_t can_write;
sem_t rcount;
void *writer(void *data) {
while (1) {
sem_wait(&can_write);
puts("WRITING!");
sem_post(&can_read);
}
}
void *reader(void *data) {
while (1) {
sem_wait(&can_read);
sem_wait(&rcount);
readers++;
if (readers < MAX_READERS) {
sem_post(&can_read);
}
sem_post(&rcount);
printf("READING! (id=%d)\n", (int) data);
sem_wait(&rcount);
readers--;
if (readers == 0) {
sem_post(&can_write);
} else {
sem_post(&can_read);
}
sem_post(&rcount);
}
}
int main(int argc, char **argv) {
int i;
pthread_t thread_reader[MAX_READERS];
pthread_t thread_writer;
readers = 0;
sem_init(&can_write, 0, 1);
sem_init(&can_read, 0, 0);
sem_init(&rcount, 0, 1);
pthread_create(&thread_writer, NULL, writer, NULL);
for (i = 0; i < MAX_READERS; i++) {
pthread_create(&thread_reader[i], NULL, reader, (void*) i);
}
pthread_join(thread_writer, NULL);
for (i = 0; i < MAX_READERS; i++) {
pthread_join(thread_reader[i], NULL);
}
return 0;
}
#include <semaphore.h>
#include <stdio.h>
#define MAX_READERS 5
int readers;
sem_t can_read;
sem_t can_write;
sem_t rcount;
void *writer(void *data) {
while (1) {
sem_wait(&can_write);
puts("WRITING!");
sem_post(&can_read);
}
}
void *reader(void *data) {
while (1) {
sem_wait(&can_read);
sem_wait(&rcount);
readers++;
if (readers < MAX_READERS) {
sem_post(&can_read);
}
sem_post(&rcount);
printf("READING! (id=%d)\n", (int) data);
sem_wait(&rcount);
readers--;
if (readers == 0) {
sem_post(&can_write);
} else {
sem_post(&can_read);
}
sem_post(&rcount);
}
}
int main(int argc, char **argv) {
int i;
pthread_t thread_reader[MAX_READERS];
pthread_t thread_writer;
readers = 0;
sem_init(&can_write, 0, 1);
sem_init(&can_read, 0, 0);
sem_init(&rcount, 0, 1);
pthread_create(&thread_writer, NULL, writer, NULL);
for (i = 0; i < MAX_READERS; i++) {
pthread_create(&thread_reader[i], NULL, reader, (void*) i);
}
pthread_join(thread_writer, NULL);
for (i = 0; i < MAX_READERS; i++) {
pthread_join(thread_reader[i], NULL);
}
return 0;
}