Programowanie Wielowątkowe w Języku C (powtórzenie)¶
Wątek to najbardziej podstawowy fragment kodu który może być zarządzany przez scheduler w sposób niezależny.
Każdy proces systemu operacyjnego wykonuje przynajmniej jeden wątek (wątek główny). Każdy proces może uruchomić kilka (równorzędnych) współbieżnych wątków.
Wątki współdzielą używają identyfikatora procesu, przestrzeni adresowej i tablicy otwartych plików procesu z którego pochodzą. Ponieważ współdzielą pamięć i otwarte pliki, to wymagają synchronizacji przy dostępie do nich.
Wątki działają tak długo jak działa proces który je uruchomił: jeśli jakikolwiek z wątków procesu zamknie proces, wszystkie inne wątki także zostaną zakończone.
W szczególności: jeśli wątek główny zakończy wykonywanie funkcji main
, wszystkie wątki zostają zakończone.
Tworzenie nowego wątku jest tańsze niż tworzenie nowego procesu.
Kompilacja¶
Należy podczas kompilacji podlinkować bibliotekę pthreads
:
gcc -pthread source.c -o program
Pomoc¶
man 7 pthreads
- http://www.cs.put.poznan.pl/akobusinska/sop2.html
API¶
Utworzenie wątku
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
pthread_t *thread
: wskaźnik do wątkupthread_attr_t *attr
: parametry wątku*(*start_routine) (void *): wskaźnik na funkcję wykonywaną przez wątek, funkcja ta musi przyjmować 1 argument typu ``void *
i zwracać wynik typuvoid *
void *arg
argument przekazywany funkcjistart_routine
- zwraca
0
w wypadku poprawnego wykonania lub kod błędu
Przykład:
#include <stdio.h> #include <pthread.h> #include <unistd.h> void *hello_world(void *arg) { char *s = (char *) arg; printf("Hello %s!\n", s); return 0; } int main(int argc, char* argv[]) { pthread_t hello_world_thread; int result = pthread_create(&hello_world_thread, NULL, hello_world, (void *) argv[0]); if (result != 0) { perror("Could not create thread."); } //sleep(3); printf("Main thread exiting.\n"); return 0; }
Oczekiwanie na zamknięcie wątku:
int pthread_join(pthread_t thread, void **retval);
pthread_t thread
: wątek na który czekamy (nie wskaźnik)void **retval
: wartość zwracana przez wątek- zwraca
0
w wypadku poprawnego wykonania lub kod błędu
Przykład:
#include <stdio.h> #include <pthread.h> #include <unistd.h> void *hello_world(void *arg) { char *s = (char *) arg; printf("Hello %s!\n", s); return 0; } int main(int argc, char* argv[]) { pthread_t hello_world_thread; int result = pthread_create(&hello_world_thread, NULL, hello_world, (void *) argv[0]); if (result != 0) { perror("Could not create thread."); } pthread_join(hello_world_thread, NULL); printf("Main thread exiting.\n"); return 0; }
#include <stdio.h> #include <pthread.h> #include <unistd.h> void *hello_world(void *arg) { char *s = (char *) arg; printf("Hello %s!\n", s); int *result = malloc(sizeof(int)); *result = 15; return result; } int main(int argc, char* argv[]) { pthread_t hello_world_thread; int result = pthread_create(&hello_world_thread, NULL, hello_world, (void *) argv[0]); if (result != 0) { perror("Could not create thread."); } int *hello_world_result; pthread_join(hello_world_thread, (void **) &hello_world_result); printf("Thread returned, %d.\n", *hello_world_result); return 0; }
Każdy wątek powinien być połączony z innym wątkiem (np głównym) ponieważ wątek zwalnia swoje zasoby (do procesu) dopiero w momencie wykonania na nim
join
-a.Odłączenie wątku:
int pthread_detach(pthread_t thread);
pthread_t thread
: wątek
Wątek odłączony zwraca wszystkie zasoby bez potrzby wykonania
join
-a. Na wątku odłączonym nie można wykonaćjoin
-a.Zakończenie aktualnego wątku:
void pthread_exit(void *retval);
void *retval
: wartość zwracana przez bieżący wątek
#include <stdio.h> #include <pthread.h> #include <unistd.h> void *hello_world(void *arg) { char *s = (char *) arg; printf("Hello %s!\n", s); int *result = malloc(sizeof(int)); *result = 15; pthread_exit(result); printf("Not shown\n"); } int main(int argc, char* argv[]) { pthread_t hello_world_thread; int result = pthread_create(&hello_world_thread, NULL, hello_world, (void *) argv[0]); if (result != 0) { perror("Could not create thread."); } int *hello_world_result; pthread_join(hello_world_thread, (void **) &hello_world_result); printf("Thread returned, %d.\n", *hello_world_result); return 0; }
Linux Programmer’s Manual: To allow other threads to continue execution, the main thread should terminate by calling
pthread_exit()
rather thanexit(3)
.Zakończenie innego wątku:
int pthread_cancel(pthread_t thread);
pthread_t thread
: wątek do zamknięcia
Unimożliwienie zakończenia bieżącego wątku przez inny wątek:
int pthread_setcancelstate(int state, int *oldstate);
int state
: nowy stan wątku:PTHREAD_CANCEL_ENABLE
,PTHREAD_CANCEL_DISABLE
, ...int *oldstate
: poprzedni stan wątku
Synchronizacja¶
Zamki wyłączne¶
Inicjalizacja i usunięcie zamka:
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr); int pthread_mutex_destroy(pthread_mutex_t *mutex);
pthread_mutex_t *mutex
: zamekpthread_mutexattr_t *mutexattr
: atrybuty zamka:
Operacje na zamkach
int pthread_mutex_lock(pthread_mutex_t *mutex); int pthread_mutex_unlock(pthread_mutex_t *mutex);
pthread_mutex_t *mutex
: zamek- zwraca
0
w wypadku poprawnego wykonania, lub numer błędu w przeciwnym wypadku
int pthread_mutex_trylock(pthread_mutex_t *mutex);
pthread_mutex_t *mutex
: zamek- zwraca
0
w wypadku pobrania zamka, lub inną wartość jeśli zamek nie został pobrany lub jeśli wystąpił błąd.
#include <stdio.h> #include <pthread.h> #include <unistd.h> #define THREAD_COUNT 16 int counter = 0; void *incrementer_thread(void *arg) { printf("Start thread."); int temp = counter; sleep(1); counter = temp + 1; printf("Done."); } int main(int argc, char* argv[]) { pthread_t incrementer_threads[THREAD_COUNT]; for (int i = 0; i < THREAD_COUNT; i++) { int result = pthread_create(&(incrementer_threads[i]), NULL, incrementer_thread, NULL); if (result != 0) { perror("Could not create thread."); } } for (int i = 0; i < THREAD_COUNT; i++) { pthread_join(incrementer_threads[i], NULL); } printf("\nCounter value after executing %d threads is %d.\n", THREAD_COUNT, counter); return 0; }
#include <stdio.h> #include <pthread.h> #include <unistd.h> #define THREAD_COUNT 16 int counter = 0; pthread_mutex_t counter_lock; void *incrementer_thread(void *arg) { printf("Start thread."); pthread_mutex_lock(&counter_lock); int temp = counter; sleep(1); counter = temp + 1; pthread_mutex_unlock(&counter_lock); printf("Done."); } int main(int argc, char* argv[]) { pthread_t incrementer_threads[THREAD_COUNT]; pthread_mutex_init(&counter_lock, NULL); for (int i = 0; i < THREAD_COUNT; i++) { int result = pthread_create(&(incrementer_threads[i]), NULL, incrementer_thread, NULL); if (result != 0) { perror("Could not create thread."); } } for (int i = 0; i < THREAD_COUNT; i++) { pthread_join(incrementer_threads[i], NULL); } pthread_mutex_destroy(&counter_lock); printf("\nCounter value after executing %d threads is %d.\n", THREAD_COUNT, counter); return 0; }
Zamki zapis/odczyt¶
Analogiczne operacje do zamków wyłacznych, podział na zapis i odczyt:
int pthread_rwlock_init(pthread_rwlock_t *rwlock, const pthread_rwlockattr_t *rwlockattr); int pthread_rwlock_destroy(pthread_rwlock_t *rwlock); int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); int pthread_rwlock_unlock(pthread_rwlock_t *rwlock); int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock); int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
#include <stdio.h> #include <pthread.h> #include <unistd.h> #define THREAD_COUNT 16 int counter = 0; pthread_mutex_t counter_lock; void *reader_thread(void *arg) { printf("Start reader.\n"); pthread_mutex_lock(&counter_lock); int temp = counter; sleep(1); pthread_mutex_unlock(&counter_lock); printf("Read %d.\n", temp); } void *writer_thread(void *arg) { printf("Start writer.\n"); pthread_mutex_lock(&counter_lock); int temp = counter; temp += 1; sleep(1); counter = temp; pthread_mutex_unlock(&counter_lock); printf("Written %d.\n", temp); } int main(int argc, char* argv[]) { pthread_t threads[THREAD_COUNT]; pthread_mutex_init(&counter_lock, NULL); for (int i = 0; i < THREAD_COUNT; i++) { int result; if ((i == 4) || (i == 12)) { result = pthread_create(&(threads[i]), NULL, writer_thread, NULL); } else { result = pthread_create(&(threads[i]), NULL, reader_thread, NULL); } if (result != 0) { perror("Could not create thread."); } } for (int i = 0; i < THREAD_COUNT; i++) { pthread_join(threads[i], NULL); } pthread_mutex_destroy(&counter_lock); printf("\nCounter value after executing %d threads is %d.\n", THREAD_COUNT, counter); return 0; }
#include <stdio.h> #include <pthread.h> #include <unistd.h> #define THREAD_COUNT 16 int counter = 0; pthread_rwlock_t counter_lock; void *reader_thread(void *arg) { printf("Start reader.\n"); pthread_rwlock_rdlock(&counter_lock); int temp = counter; sleep(1); pthread_rwlock_unlock(&counter_lock); printf("Read %d.\n", temp); } void *writer_thread(void *arg) { printf("Start writer.\n"); pthread_rwlock_wrlock(&counter_lock); int temp = counter; temp += 1; sleep(1); counter = temp; pthread_rwlock_unlock(&counter_lock); printf("Written %d.\n", temp); } int main(int argc, char* argv[]) { pthread_t threads[THREAD_COUNT]; pthread_rwlock_init(&counter_lock, NULL); for (int i = 0; i < THREAD_COUNT; i++) { int result; if ((i == 4) || (i == 12)) { result = pthread_create(&(threads[i]), NULL, writer_thread, NULL); } else { result = pthread_create(&(threads[i]), NULL, reader_thread, NULL); } if (result != 0) { perror("Could not create thread."); } } for (int i = 0; i < THREAD_COUNT; i++) { pthread_join(threads[i], NULL); } pthread_rwlock_destroy(&counter_lock); printf("\nCounter value after executing %d threads is %d.\n", THREAD_COUNT, counter); return 0; }
(Kompilacja
-std=gnu99
)
Wait-notify¶
Inicjalizacja i usunięcie zmiennej warunkowej:
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *condattr); int pthread_cond_destroy(pthread_cond_t *cond);
pthread_cond_t *cond
: zmienna warunkowapthread_condattr_t *condattr
: konfiguracja zmiennej warunkowej
Czekanie na zmiennej warunkowej:
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
pthread_cond_t *cond
: zmienna warunkowapthread_mutex_t *mutex
: zamek
Czekanie na zmiennej warunkowej zawsze występuje w sekcji krytycznej zamka (tzn. wątek który rozpoczyna czekanie uprzednio pobrał zamek).
Sygnalizowanie zmiennej warunkowej:
int pthread_cond_signal(pthread_cond_t *cond); int pthread_cond_broadcast(pthread_cond_t *cond);
pthread_cond_t *cond
: zmienna warunkowa
Sygnalizowanie kończy przywraca do aktywności jeden (dowolny) lub wszystkie (kolejno) czekające wątki.
Przykład:
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <unistd.h> #define PUSH_THREAD_COUNT 8 #define POP_THREAD_COUNT 2 #define STACK_SIZE 6 int stack[STACK_SIZE]; int stack_pointer = 0; pthread_mutex_t stack_lock; pthread_cond_t stack_condition; void *push_thread(void *arg) { int value = *((int *) arg); pthread_mutex_lock(&stack_lock); // Stack is full: wait (release mutex for the duration) while (stack_pointer >= STACK_SIZE) { printf("Waiting to push value %d.\n", value); pthread_cond_wait(&stack_condition, &stack_lock); } // There's room in the stack: put more stuff on and notify all printf("Pushing value %d.\n", value); stack[stack_pointer++] = value; sleep(1); pthread_cond_broadcast(&stack_condition); pthread_mutex_unlock(&stack_lock); } void *pop_thread(void *arg) { pthread_mutex_lock(&stack_lock); // Stack is empty: wait (release mutex for the duration) while (stack_pointer <= 0) { printf("Waiting to pop value.\n"); pthread_cond_wait(&stack_condition, &stack_lock); } // There's room in the stack: put more stuff on and notify all int value = stack[--stack_pointer]; stack[stack_pointer] = 0; printf("Popping value %d.\n", value); sleep(1); pthread_cond_broadcast(&stack_condition); pthread_mutex_unlock(&stack_lock); } int main(int argc, char* argv[]) { pthread_t pop_threads[POP_THREAD_COUNT]; pthread_t push_threads[PUSH_THREAD_COUNT]; pthread_mutex_init(&stack_lock, NULL); pthread_cond_init(&stack_condition, NULL); for (int i = 0; i < POP_THREAD_COUNT; i++) { int result = pthread_create(&(pop_threads[i]), NULL, pop_thread, NULL); if (result != 0) { perror("Could not create pop thread."); } } for (int i = 0; i < PUSH_THREAD_COUNT; i++) { int *value = (int *) malloc(sizeof(int)); (*value) = i + 1; int result = pthread_create(&(push_threads[i]), NULL, push_thread, value); if (result != 0) { perror("Could not create push thread."); } } for (int i = 0; i < PUSH_THREAD_COUNT; i++) { pthread_join(push_threads[i], NULL); } for (int i = 0; i < POP_THREAD_COUNT; i++) { pthread_join(pop_threads[i], NULL); } pthread_mutex_destroy(&stack_lock); printf("\nStack after execution of %d push and %d pop threads.\n", PUSH_THREAD_COUNT, POP_THREAD_COUNT); for (int i = 0; i < stack_pointer; i++) { printf("[%d] => %d\n", i, stack[i]); } return 0; }
Zadania¶
1. Napisz program map-reduce-len
który rozkłada wykonanie obliczeń
na zbiorze danych na wiele wątków zgodnie z techniką map-reduce.
Konkretnie, program oblicza sumę długość jakiegoś tekstu w sposób współbieżny.
Użytkownik podaje przez parametr/argumenty tekst \(S\). Program dzieli ten
tekst na frac{n} ~równych fragmentów. Dla każdego takiego fragmentu
\(S_i\) program tworzy wątek \(T_i\), który oblicza długość \(S_i\)
i zwraca wynik do wątku głównego. Następnie wątek główny sumuje wyniki z
poszczególnych wątków i wypisuje sumę na ekran.
2. Napisz program gl-histogram
który tworzy histogram liter dla jakiegoś
tekstu w sposób wielowątkowy. Użytkownik podaje przez parametr/argumenty tekst
\(S\). Program dzieli ten tekst na frac{n} ~równych fragmentów. Dla
każdego takiego fragmentu \(S_i\) program tworzy wątek \(T_i\). Wątek
\(T_i\) liczy liczbę wystąpień poszczególnych liter w \(S_i\) i wpisuje
wyniki dla poszczególnych liter do globalnej współdzieloną tablicę \(H\).
Po zakończeniu wykonania wszystkich wątków, wątek główny wypisuje na ekran
zawartość \(H\). Zapewnij synchronizację wątków używając zamka wyłącznego.
3. Napisz program fgl-histogram
który dodaje do gl-histogram
wydajną
synchronizację: zamiast używać jednego zamka wyłącznego zastosuj tablicę zamków
współdzielonych. Porównaj wydajność systemu.
4. Napisz program prod-cons
który rozwiązuje problem producent-konsument.
W pamięci istnieje kolejka FIFO \(Q\). Wątek pełniący rolę producenta
generuje kolejne liczby całkowite i dopisuje je na koniec \(Q\). Jeśli w
kolejce nie ma miejsca na kolejną wartość, wątek czeka aż miejsce się zwolni.
Wątek pełniący rolę konsumenta odczytuje kolejne liczby z początku \(Q\) i
wypisuje na ekran. Konsument usuwa odczytane wartości z \(Q\). Jeśli
kolejka jest pusta, konsument czeka aż w kolejce znajdą się nowe wartości.
Sprawdź zachowanie systemu w przypadku jeśli konsument jest dużo szybszy niż
producent i vice versa.
5. Napisz program n-prod-m-cons
który jest rozszerzeniem programu
prod-cons
, gdzie jednocześnie istnieje \(n\) producentów i \(m\)
konsumentów. Wartości \(n\) i \(m\) są podawane przez argumenty.