~~Exercise.#~~ #include #include void *thread_routine(void *arg) { puts("Hello world"); return NULL; } int main() { pthread_t myThread; if (-1 == pthread_create(&myThread, NULL, thread_routine, NULL)) { perror("Creating a thread failed"); return 1; } pthread_join(myThread, NULL); return 0; } ~~Exercise.#~~ Main thread exits before the newly spawned thread, and by default when the main thread exits, all other threads are terminated. Thus, the newly spawned thread is terminated before it prints the text. ~~Exercise.#~~ #include #include #include void *thread_routine(void *arg) { int i = *(int *)arg; free(arg); printf("Hello %d\n", i); return NULL; } int main() { pthread_t myThread[10]; for (int i = 0; i < 10; ++i) { int *arg = malloc(sizeof(int)); *arg = i; if (-1 == pthread_create(myThread + i, NULL, thread_routine, arg)) { perror("Creating a thread failed"); return 1; } } for (int i = 0; i < 10; ++i) pthread_join(myThread[i], NULL); return 0; } ~~Exercise.#~~ #include #include #include long global = 0; void *thread_routine(void *arg) { int i = *(int *)arg; free(arg); long *k = malloc(sizeof(long)); *k = global; // Reading and updating global is separated printf("Hello %d: %ld\n", i, *k); // on purpose with a printf to increase global = global + 1; // chances of concurrent access to it return k; } int main() { pthread_t myThread[10]; for (int i = 0; i < 10; ++i) { int *arg = malloc(sizeof(int)); *arg = i; if (-1 == pthread_create(myThread + i, NULL, thread_routine, arg)) { perror("Creating a thread failed"); return 1; } } long *result[10]; for (int i = 0; i < 10; ++i) pthread_join(myThread[i], (void **)result + i); for (int i = 0; i < 10; ++i) { printf("Thread %d: %ld\n", i, *result[i]); free(result[i]); } return 0; } ~~Exercise.#~~ The examples show that accessing data with no synchronisation may lead to inconsistent results. 
\\ In the first code example one thread updates a text (character array) to a new version, whereas the second thread attempts to read the text. However, the thread reading the text may see some characters of the text belonging to one version and other characters belonging to another version. \\ While the first code example shows that distinct data items can be out of sync, in the second code example the same 64-bit wide integer is used by multiple threads. Each of the 16 threads increments the integer one thousand times, but the result happens to be less than 16 thousand. This shows that even accesses to a single main memory location must be handled with care in concurrent programs. \\ In general, the examples motivate the need for mutexes. ~~Exercise.#~~ #include #include #include pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; unsigned long long version; char text[1020]; void *updater(void *arg) { while (1) for (char letter = 'a'; letter <= 'z'; ++letter) { pthread_mutex_lock(&mutex); version++; for (int i = 0; i < 1020 - 1; ++i) text[i] = letter; pthread_mutex_unlock(&mutex); } } int main() { pthread_t tid; pthread_create(&tid, NULL, updater, NULL); while (getchar() != EOF) { pthread_mutex_lock(&mutex); // the data is copied locally to make the critical section short // and to avoid an I/O operation (printf) in the critical section unsigned long long version_copy; char text_copy[1020]; version_copy = version; memcpy(text_copy, text, 1020); pthread_mutex_unlock(&mutex); printf("version: %llu\n text: %s\n", version_copy, text_copy); } return 0; } #include #include pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; unsigned long long counter; void *incrementer(void *arg) { for (int i = 0; i < 1000; ++i) { pthread_mutex_lock(&mutex); counter++; pthread_mutex_unlock(&mutex); } return NULL; } int main() { pthread_t tid[16]; for (int i = 0; i < 16; ++i) pthread_create(tid + i, NULL, incrementer, NULL); for (int i = 0; i < 16; ++i) pthread_join(tid[i], NULL); 
printf("%llu\n", counter); return 0; } Notice that the second example can also be amended more efficiently by using atomic operations: #include #include #include _Atomic unsigned long long counter; void *incrementer(void *arg) { for (int i = 0; i < 1000; ++i) atomic_fetch_add(&counter, 1); return NULL; } int main() { pthread_t tid[16]; for (int i = 0; i < 16; ++i) pthread_create(tid + i, NULL, incrementer, NULL); for (int i = 0; i < 16; ++i) pthread_join(tid[i], NULL); printf("%llu\n", counter); return 0; } ~~Exercise.#~~ ... struct list { struct node { struct node *next, *previous; element_t e; } *head, *tail; pthread_mutex_t coarseLock; }; void list_init(struct list *l) { l->head = l->tail = NULL; pthread_mutex_init(&l->coarseLock, NULL); } struct node *list_add_back(struct list *l, element_t e) { struct node *newNode = malloc(sizeof(struct node)); newNode->next = 0; newNode->e = e; pthread_mutex_lock(&l->coarseLock); if (l->tail == NULL) { newNode->previous = NULL; l->head = l->tail = newNode; } else { newNode->previous = l->tail; l->tail->next = newNode; l->tail = newNode; } pthread_mutex_unlock(&l->coarseLock); return newNode; } void list_delete_node(struct list *l, struct node *n) { if (n->previous == NULL) l->head = n->next; else n->previous->next = n->next; if (n->next == NULL) l->tail = n->previous; else n->next->previous = n->previous; free(n); } void list_delete_value(struct list *l, element_t e) { pthread_mutex_lock(&l->coarseLock); struct node *n = l->head, *next; while (n != NULL) { next = n->next; if (n->e == e) list_delete_node(l, n); n = next; } pthread_mutex_unlock(&l->coarseLock); } void list_print(struct list *l, int reverse) { pthread_mutex_lock(&l->coarseLock); struct node *n = reverse ? l->tail : l->head; while (n != NULL) { printf("%d\n", n->e); n = reverse ? n->previous : n->next; } pthread_mutex_unlock(&l->coarseLock); } ... 
~~Exercise.#~~ Notice that ''beheader'' must lock the mutex so that no other thread deletes the head between ''l->head != NULL'' and ''list_delete_node(l, l->head)''. ... void *beheader(void *arg) { struct list *l = (struct list *)arg; srand(time(0) + 4); while (1) { pthread_mutex_lock(&l->coarseLock); if (l->head != NULL) list_delete_node(l, l->head); pthread_mutex_unlock(&l->coarseLock); usleep(50); } } int main() { struct list l; list_init(&l); pthread_t a, r, b; pthread_create(&a, NULL, adder, &l); pthread_create(&r, NULL, remover, &l); pthread_create(&b, NULL, beheader, &l); pthread_detach(a); pthread_detach(r); pthread_detach(b); while (getchar() != EOF) list_print(&l, 0); return 0; } ~~Exercise.#~~ The critical sections in this particular example are the lines between each ''pthread_mutex_lock'' call and the matching ''pthread_mutex_unlock'' call. ~~Exercise.#~~ For instance, an attempt to exchange item 0 with item 1 in thread #2 concurrently with an attempt to exchange item 1 with item 0 in thread #3 can lead to a situation in which thread #2 has successfully locked the mutex of item 0 and thread #3 has successfully locked the mutex of item 1. Then neither thread can proceed, because each of them waits for the mutex held by the other. ~~Exercise.#~~ A common solution to prevent deadlocks is to always lock mutexes in a predefined order. For instance, the lines locking the mutexes upon exchanging items could look like this: pthread_mutex_lock(&item[argOne < argTwo ? argOne : argTwo].mtx); pthread_mutex_lock(&item[argOne < argTwo ? argTwo : argOne].mtx); ~~Exercise.#~~ An example interleaving that loses an element: two worker threads execute ''add_element'' concurrently, both find ''tail == NULL'', and both execute ''head = tail = e;''; the element stored first is overwritten and lost. A mutex that amends the program: ... 
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; void add_element(int v) { pthread_mutex_lock(&mutex); struct element *e = malloc(sizeof(struct element)); e->next = NULL; e->value = v; if (tail == NULL) head = tail = e; else tail = tail->next = e; pthread_mutex_unlock(&mutex); } ... while (computed_count < 10) { int value; while (1) { pthread_mutex_lock(&mutex); if (head != NULL) { value = take_element(); break; } pthread_mutex_unlock(&mutex); } pthread_mutex_unlock(&mutex); printf("%02d: %d\n", ++computed_count, value); } ... The line that wastes CPU is the ''while (head == NULL)'' (and respectively the ''while (1)'' loop in the code above). Condition variable that amends the program: ... pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t condvar = PTHREAD_COND_INITIALIZER; void add_element(int v) { pthread_mutex_lock(&mutex); struct element *e = malloc(sizeof(struct element)); e->next = NULL; e->value = v; if (tail == NULL) head = tail = e; else tail = tail->next = e; pthread_mutex_unlock(&mutex); pthread_cond_signal(&condvar); } ... while (computed_count < 10) { int value; pthread_mutex_lock(&mutex); while (head == NULL) pthread_cond_wait(&condvar, &mutex); value = take_element(); pthread_mutex_unlock(&mutex); printf("%02d: %d\n", ++computed_count, value); } ... 
~~Exercise.#~~ #include #include #include #include #include #define N 4 typedef int task_specification_t; #define EXECUTE_TASK(arg) sleep(arg) struct list { task_specification_t task; struct list *next; } *head; pthread_mutex_t list_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t list_cv = PTHREAD_COND_INITIALIZER; void *worker(void *arg) { while (1) { pthread_mutex_lock(&list_mutex); while (head == NULL) pthread_cond_wait(&list_cv, &list_mutex); struct list *e = head; head = e->next; if (head != NULL) pthread_cond_signal(&list_cv); pthread_mutex_unlock(&list_mutex); printf("Worker %ld executing task %d\n", (intptr_t)arg, e->task); EXECUTE_TASK(e->task); printf("Worker %ld finished task %d\n", (intptr_t)arg, e->task); free(e); } } int main() { for (intptr_t i = 0; i < N; ++i) { pthread_t tid; pthread_create(&tid, NULL, worker, (void *)i); pthread_detach(tid); } int td; while (scanf("%d", &td) != EOF) { printf("Dispatching task %d\n", td); struct list *e = malloc(sizeof(struct list)); *e = (struct list){.task = td, .next = NULL}; pthread_mutex_lock(&list_mutex); if (head == NULL) { head = e; pthread_cond_signal(&list_cv); } else { struct list *it = head; while (it->next != NULL) it = it->next; it->next = e; } pthread_mutex_unlock(&list_mutex); } }
~~META: language = en ~~