===== Threads =====

A process can run multiple threads – multiple flows of control. \\
In general, the threads share all resources (such as memory or open files). \\
Each thread uses a separate stack and, obviously, has its own state of the CPU registers. A limited set of properties may also be specific to a thread (e.g., the signal mask or CPU affinity).

Since 2011 (the C11 and C%%+%%+11 standards) the C and C%%+%%+ languages include an API for concurrency, including threads. It is, however, deliberately generic, so that it can be implemented on all operating systems.

^ ^ POSIX ^ C ^ C%%+%%+ ^
|create a new thread|''pthread_create''|''thrd_create''|''std::thread'' constructor|
|join a thread|''pthread_join''|''thrd_join''|''std::thread::join''|
|detach a thread|''pthread_detach''|''thrd_detach''|''std::thread::detach''|
|exit from a thread|''pthread_exit''|''thrd_exit''|—|
|send signal to a specific thread|''pthread_kill''|—|—|

A process always starts with a single thread called the main thread. \\
When the main thread returns from the ''main'' function, or when any thread calls the ''exit'' function, all threads are forcefully terminated.

===== pthreads =====

The POSIX standard defines an API for threads called **[[https://en.wikipedia.org/wiki/Pthreads|pthreads]]**. While each operating system has its own implementation of threads, virtually every Unix-like system implements the POSIX Threads API. \\
The Linux implementation of threads is called [[https://en.wikipedia.org/wiki/Native_POSIX_Thread_Library|NPTL]]. Apart from the standard pthread functions, it offers several Linux-specific functions (whose names end with ''_np'' to indicate non-portability).

**To compile a program that uses pthreads, one had to add the ''-pthread'' option when invoking the compiler and linker.** \\
This changed with version 2.34 of glibc, which moved the POSIX thread routines into the standard C library (libc).

==== Creating a thread ====

To start a new thread, one must provide a pointer to the function that the thread will execute. \\
The function must take a pointer to arbitrary data as its argument and return a pointer to arbitrary data((To specify a pointer without specifying the type of the underlying data one has to use ''void *''.)):
<code c>
void * start_routine(void * argument) {
    …
    return some_pointer;
}
</code>
In C such a function is of type ''void *(*)(void *)'', and a variable called ''func'' of this type is declared as ''void *(*func)(void *)''. \\
A new thread gets an identifier of type ''pthread_t'' that can be used later on to refer to the thread.

The function starting a new thread has the following signature: \\
Needs header:
pthread.h
''int **pthread_create**(pthread_t *restrict //threadIdentifier//,'' \\
''                   const pthread_attr_t *restrict //attr//,'' \\
''                   void *(*//start_routine//)(void *),'' \\
''                   void *restrict //arg//);'' \\
It writes the thread identifier to the variable pointed to by ''//threadIdentifier//'', starts a new thread and runs ''//start_routine//(//arg//)'' in it. If the ''//attr//'' argument is not NULL, the thread attributes it points to are applied before the thread is started.
++++An example of creating a thread and passing it some data|
<code c>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

struct myStruct {
    int i;
    char s[16];
};

// function to be run by a thread; it must accept and return a 'void *'
void *func(void *rawArg) {
    struct myStruct *arg = rawArg;
    printf("%d %s\n", arg->i, arg->s);
    free(arg);
    return NULL;
}

int main() {
    pthread_t ti; // pthread_t is a type that stores thread identifiers ('long unsigned int' in Linux)

    // to pass several pieces of data to a thread, a structure is allocated on the heap
    struct myStruct *t1_arg = malloc(sizeof(struct myStruct));
    t1_arg->i = 42;
    strcpy(t1_arg->s, "foo baz bar");

    // the following line creates a new thread and runs func(t1_arg) in that thread
    pthread_create(&ti, NULL, func, t1_arg);

    // terminating the main thread (or any other) with pthread_exit does not terminate the process
    pthread_exit(NULL);
}
</code>
++++
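For comparison with the table at the beginning of this section, here is a minimal sketch of the same idea written with the C11 ''<threads.h>'' API (assuming the C library provides this header; not all do):

<code c>
#include <threads.h>
#include <stdio.h>

// a C11 thread function returns int (not void * as in POSIX)
int func(void *arg) {
    printf("hello from a C11 thread, arg = %s\n", (char *)arg);
    return 0;
}

int main(void) {
    thrd_t ti;
    if (thrd_create(&ti, func, "some data") != thrd_success)
        return 1;
    int result;
    thrd_join(ti, &result); // joining also collects the int returned by func
    printf("the thread returned %d\n", result);
    return 0;
}
</code>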
==== Caring for a thread ====

Just like a child process is remembered by the operating system until the parent process reaps it, a thread that has terminated is remembered by the operating system until another thread **join**s it. Alternatively, a thread can be explicitly **detach**ed, so that it is reaped immediately upon termination and can no longer be joined.

By default, a thread starts in the joinable state. One can create a set of attributes (using ''pthread_attr_init'' and ''pthread_attr_setdetachstate'') that tells the OS to create a thread directly in the detached state (a sketch of this approach follows the detach example below).

To join a thread and collect its result one has to call the function: \\
Needs header: pthread.h

''int **pthread_join**(pthread_t //thread//, void %%**%%//value_ptr//);'' \\
A call to ''pthread_join'' waits until the thread ''//thread//'' terminates and writes its return value to the ''void *'' variable pointed to by ''//value_ptr//''. The ''//value_ptr//'' argument can be NULL, in which case the return value is discarded.
++++An example code that creates 3 threads and joins them, collecting their results.|
<code c>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <time.h>

// this just does computation, there's nothing about threads here
void monteCarloPi(uint64_t *hit, uint64_t *miss) {
    struct timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);
    srand(ts.tv_nsec);
    for (int i = 0; i < 1e6; ++i) {
        double x = rand() / (double)RAND_MAX;
        double y = rand() / (double)RAND_MAX;
        if ((x*x + y*y) < 1)
            (*hit)++;
        else
            (*miss)++;
    }
}

struct theResult {
    uint64_t hit;
    uint64_t miss;
};

void *runPi(void *arg) {
    // preparing the return value - a structure 'theResult'
    struct theResult *ret = malloc(sizeof(struct theResult));
    // filling it with data
    ret->hit = ret->miss = 0;
    monteCarloPi(&ret->hit, &ret->miss);
    // returning a pointer to memory allocated on the heap
    return ret;
}

int main() {
    pthread_t ti[3];
    uint64_t hit = 0, miss = 0;
    for (int i = 0; i < 3; ++i)
        pthread_create(ti + i, NULL, runPi, NULL);
    monteCarloPi(&hit, &miss);
    for (int i = 0; i < 3; ++i) {
        // create a variable that will store the location of the result
        struct theResult *result;
        // wait until thread ti[i] exits, and write its return value to result
        pthread_join(ti[i], (void **)&result);
        // use the result
        hit += result->hit;
        miss += result->miss;
        // free the memory
        free(result);
    }
    printf("%f\n", 4. * hit / (hit + miss));
    return 0;
}
</code>
++++
To detach a thread one has to call the function: \\
Needs header: pthread.h

''int **pthread_detach**(pthread_t //thread//);'' \\
The ''pthread_detach'' function returns without waiting. The value returned by the detached thread will be discarded. \\
A thread can detach itself (i.e., call ''pthread_detach(pthread_self());''), but typically the thread that spawned it performs the detach.
++++An example code that creates a thread and detaches it.|
<code c>
#ifdef __GNUC__      // whenever GNU Compiler or compatible is detected,
#define _GNU_SOURCE  // enable GNU Compiler extensions.
#endif
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

void *buzzer(void *arg) {
#ifdef _GNU_SOURCE                                // when GNU Compiler extensions are available,
    pthread_setname_np(pthread_self(), "buzzer"); // assign a name for the thread
#endif
    int i = 0;
    while (1) {
        usleep(1e5);
        fprintf(stderr, "%06d BZZZZZTT\n", ++i);
    }
}

void usr1(int num) {
    signal(SIGUSR1, SIG_DFL);
    char name[33] = "USR1 received by ";
#ifdef _GNU_SOURCE
    pthread_getname_np(pthread_self(), name + 17, 16);
#endif
    int len = strlen(name);
    name[len] = '\n';
    write(STDERR_FILENO, name, len + 1);
    pthread_exit(NULL); // terminate this thread
}

int main() {
    // create a thread
    pthread_t ti;
    pthread_create(&ti, NULL, buzzer, NULL);
    // and tell the operating system that the thread will never be joined
    pthread_detach(ti);

    sleep(1);
    signal(SIGUSR1, usr1);     // notice that threads share signal handlers
    pthread_kill(ti, SIGUSR1); // sends a signal to a specified thread
    sleep(1);
    return 0;
}
</code>
++++
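As mentioned in the //Caring for a thread// subsection, a thread can also be created directly in the detached state by passing an attribute object to ''pthread_create''. A minimal sketch of that approach (error checking omitted):

<code c>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

void *worker(void *arg) {
    printf("detached worker running\n");
    return NULL;
}

int main() {
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    // ask for the thread to start already detached - it can never be joined
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

    pthread_t ti;
    pthread_create(&ti, &attr, worker, NULL);

    pthread_attr_destroy(&attr); // the attribute object may be destroyed right after use

    sleep(1); // give the detached thread a chance to run before the process exits
    return 0;
}
</code>

This avoids the short window between ''pthread_create'' and ''pthread_detach'' during which the thread is still joinable.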
POSIX offers a mechanism called //cancellation//: one can send a thread a cancellation request using the ''[[https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_cancel.html|pthread_cancel]]'' function. Each thread can select its behaviour on receiving such a request using the ''[[https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_setcancelstate.html|pthread_setcancelstate]]'' and ''[[https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_setcancelstate.html|pthread_setcanceltype]]'' functions. One may either allow or disallow cancelling the thread, and one may choose whether the cancellation should occur immediately after a request, or at the next cancellation point. Cancellation points occur during the execution of several pthread functions, among which ''[[https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_setcancelstate.html|pthread_testcancel]]'' notably exists just for this purpose. Once the thread complies with the cancellation request, it performs roughly the same actions as if ''pthread_exit(PTHREAD_CANCELED)'' had been called. The cancellation mechanism allows deferring thread termination until a logically consistent state (e.g. of data shared with other threads) is reached.

=== Exercises ===

~~Exercise.#~~ In the main thread create a new thread, join it and then return from main. \\ In the newly created thread write ''Hello World'' to the standard output.

~~Exercise.#~~ Remove the ''pthread_join'' call from the code you wrote for the previous exercise and re-run it. What has changed in the behaviour of the program?

~~Exercise.#~~ In the main thread create 10 new threads, passing to each of them an ordinal number. Then, join each thread and exit. \\ In the newly created threads write the number to the standard output.

~~Exercise.#~~ Add a global variable to the program from the previous exercise. In each thread read the variable and increment it. Output the obtained value together with the ordinal number, and return it from the thread entry routine. In the main thread, collect the returned numbers and display them.

===== Accessing the same data from multiple threads =====

The following examples show what might happen when the same data is accessed from multiple threads with no synchronisation.

First, let's read/write "linearly" from/to an array from two threads:
<code c>
#include <pthread.h>
#include <stdio.h>
#include <string.h>

unsigned long long version;
char text[1020];

void *updater(void * arg) {
    while (1)
        for (char letter = 'a'; letter <= 'z'; ++letter) {
            version++;
            for (int i = 0; i < 1020 - 1; ++i)
                text[i] = letter;
            // memset(text, letter, 1020);
        }
}

int main() {
    pthread_t tid;
    pthread_create(&tid, NULL, updater, NULL);
    while (getchar() != EOF)
        printf("version: %llu\n text: %s\n", version, text);
    return 0;
}
</code>
Next, let's read-modify-write the same variable from multiple threads:
<code c>
#include <pthread.h>
#include <stdio.h>

unsigned long long counter;

void *incrementer(void * arg) {
    for (int i = 0; i < 1000; ++i)
        counter++;
    return NULL;
}

int main() {
    pthread_t tid[16];
    for (int i = 0; i < 16; ++i)
        pthread_create(tid + i, NULL, incrementer, NULL);
    for (int i = 0; i < 16; ++i)
        pthread_join(tid[i], NULL);
    printf("%llu\n", counter);
    return 0;
}
</code>
If the code above always returns the right answer, try to run it a million times: \\ ''for X in `seq 1000000`; do RES=$(%%./%%//progname//); test "$RES" -ne 16000 && echo -e "\n$RES" && break || echo -n '.'; done''
~~Exercise.#~~ Read and understand the code, then compile and run the two examples above. What is wrong with the results? What problem do these examples illustrate?

===== POSIX mutexes and readers-writers locks =====

==== Mutexes ====

A [[https://en.wikipedia.org/wiki/Lock_(computer_science)|mutex]] is a synchronisation primitive that has two operations: //lock//, which locks (acquires) the mutex, and //unlock//, which unlocks (releases) it. A thread that invoked lock and has not yet invoked unlock //holds// the mutex. A mutex guarantees that at most one thread holds it at a time. The name mutex comes from //mutual exclusion//.

Mutexes are sometimes called //locks//; however, some APIs use the name //lock// for an object that locks a mutex upon creation and unlocks it upon destruction.

To create a POSIX mutex, one has to create a variable of the ''pthread_mutex_t'' type and initialize it, using either an initializer macro: \\
Needs header: pthread.h

''pthread_mutex_t //mutex// = **PTHREAD_MUTEX_INITIALIZER**;'' \\
to initialize a mutex with default semantics, or an initialisation function: \\
Needs header: pthread.h

''int **pthread_mutex_init**(pthread_mutex_t *restrict //mutex//,'' \\
''                       const pthread_mutexattr_t *restrict //attr//)''

When using the ''pthread_mutex_init'' function, one can select non-default semantics for a mutex. \\
To do so, one has to declare a ''pthread_mutexattr_t'' variable and initialize it using: \\
Needs header: pthread.h

''int **pthread_mutexattr_init**(pthread_mutexattr_t *//attr//)'' \\
and finally set the attributes using the corresponding functions:
  * ''int pthread_mutexattr_set**type**(pthread_mutexattr_t *//attr//, int //type//)'' sets the mutex type (see below),
  * ''int pthread_mutexattr_set**pshared**(pthread_mutexattr_t *//attr//, int //pshared//)'' allows sharing mutexes between processes (they need to reside in shared memory),
  * ''int pthread_mutexattr_set**robust**(pthread_mutexattr_t *//attr//, int //robust//)'' makes the mutex robust: when a thread terminates while holding a robust mutex, locking it returns an appropriate error (''EOWNERDEAD'') instead of waiting forever,
  * ''[[https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_mutexattr_setprotocol.html|pthread_mutexattr_setprotocol]]'' and ''[[https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_mutexattr_setprioceiling.html|setprioceiling]]'' deal with priorities.

The mutex type determines what happens when a mutex that is already locked is locked again by the same thread:
  * ''PTHREAD_MUTEX_NORMAL'' - the thread will deadlock (with itself),
  * ''PTHREAD_MUTEX_ERRORCHECK'' - locking the mutex will fail (i.e., return the error ''EDEADLK''),
  * ''PTHREAD_MUTEX_RECURSIVE'' - the mutex counts how many times it was locked, and unlocks only after the same number of unlocks,
  * ''PTHREAD_MUTEX_DEFAULT'' - the behaviour is undefined.

Some mutex types also guarantee that unlocking a mutex not owned by the current thread fails((''pthread_mutex_unlock'' returns the error ''EPERM''.)) – all mutexes of the ''PTHREAD_MUTEX_ERRORCHECK'' and ''PTHREAD_MUTEX_RECURSIVE'' types do that, as does any mutex set to be robust.

An example of creating a recursive mutex is as follows:
<code c>
pthread_mutexattr_t recursiveAttrs;
pthread_mutexattr_init(&recursiveAttrs);
pthread_mutexattr_settype(&recursiveAttrs, PTHREAD_MUTEX_RECURSIVE);

pthread_mutex_t mutex;
pthread_mutex_init(&mutex, &recursiveAttrs);
</code>
To lock a mutex one can use any of: \\
Needs header: pthread.h

''int **pthread_mutex_lock**(pthread_mutex_t *//mutex//)'' \\
''int pthread_mutex_trylock(pthread_mutex_t *//mutex//)'' \\
''int pthread_mutex_timedlock(pthread_mutex_t *restrict //mutex//, const struct timespec *restrict //abstime//)'' \\
If ''//mutex//'' is unlocked, these functions lock it. If ''//mutex//'' is currently locked, ''pthread_mutex_lock'' waits until the mutex becomes unlocked, ''pthread_mutex_trylock'' immediately returns the error ''EBUSY'', and ''pthread_mutex_timedlock'' waits at most until the absolute time ''//abstime//'' for the mutex to become available. \\
When a mutex is misused, locking it may fail by returning an appropriate error number.

To unlock a mutex, one has to use the function: \\
Needs header: pthread.h

''int **pthread_mutex_unlock**(pthread_mutex_t *//mutex//)'' \\
It is important to remember that only the thread that locked a mutex can unlock it.

~~Exercise.#~~ Use a mutex to fix the programs from the two examples in the section [[#accessing_the_same_data_from_multiple_threads|accessing the same data from multiple threads]].
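For reference when doing the exercises, here is a minimal sketch of the typical lock / access / unlock pattern (the ''account'' structure and the function names are only illustrative):

<code c>
#include <pthread.h>
#include <stdio.h>

// two fields that must always be updated together
struct account { long balance; long operations; };

pthread_mutex_t accountMutex = PTHREAD_MUTEX_INITIALIZER;
struct account account = {0, 0};

void deposit(long amount) {
    pthread_mutex_lock(&accountMutex);   // enter the critical section
    account.balance += amount;           // no other thread can observe the
    account.operations += 1;             // structure in a half-updated state
    pthread_mutex_unlock(&accountMutex); // leave the critical section
}

void *worker(void *arg) {
    for (int i = 0; i < 1000; ++i)
        deposit(1);
    return NULL;
}

int main() {
    pthread_t t1, t2;
    pthread_create(&t1, NULL, worker, NULL);
    pthread_create(&t2, NULL, worker, NULL);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    printf("balance=%ld operations=%ld\n", account.balance, account.operations);
    return 0;
}
</code>

Every piece of code that touches ''account'' must take the same mutex, otherwise the protection is void.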
~~Exercise.#~~ The program below accesses a linked list from multiple threads. Add mutexes to the functions whose names start with ''list_…'' so that the program no longer crashes.
++++ Source code for this exercise: |
<code c>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

typedef int element_t;

struct list {
    struct node {
        struct node *next, *previous;
        element_t e;
    } *head, *tail;
};

void list_init(struct list *l) { l->head = l->tail = NULL; }

struct node *list_add_back(struct list *l, element_t e) {
    struct node *newNode = malloc(sizeof(struct node));
    newNode->next = 0;
    newNode->e = e;
    if (l->tail == NULL) {
        newNode->previous = NULL;
        l->head = l->tail = newNode;
    } else {
        newNode->previous = l->tail;
        l->tail->next = newNode;
        l->tail = newNode;
    }
    return newNode;
}

void list_delete_node(struct list *l, struct node *n) {
    if (n->previous == NULL)
        l->head = n->next;
    else
        n->previous->next = n->next;
    if (n->next == NULL)
        l->tail = n->previous;
    else
        n->next->previous = n->previous;
    free(n);
}

void list_delete_value(struct list *l, element_t e) {
    struct node *n = l->head, *next;
    while (n != NULL) {
        next = n->next;
        if (n->e == e)
            list_delete_node(l, n);
        n = next;
    }
}

void list_print(struct list *l, int reverse) {
    struct node *n = reverse ? l->tail : l->head;
    while (n != NULL) {
        printf("%d\n", n->e);
        n = reverse ? n->previous : n->next;
    }
}

void *adder(void *arg) {
    struct list *l = (struct list *)arg;
    srand(time(0));
    while (1) {
        list_add_back(l, rand() % 25);
        usleep(10);
    }
}

void *remover(void *arg) {
    struct list *l = (struct list *)arg;
    srand(time(0) + 2);
    while (1) {
        list_delete_value(l, rand() % 25);
        usleep(5);
    }
}

int main() {
    struct list l;
    list_init(&l);
    pthread_t a, r;
    pthread_create(&a, NULL, adder, &l);
    pthread_create(&r, NULL, remover, &l);
    pthread_detach(a);
    pthread_detach(r);
    while (getchar() != EOF)
        list_print(&l, 0);
    return 0;
}
</code>
++++

~~Exercise.#~~ Add to the program from the previous exercise a thread executing the following function, and amend the code so that it neither crashes nor deadlocks:
<code c>
void *beheader(void *arg) {
    struct list *l = (struct list *)arg;
    srand(time(0) + 4);
    while (1) {
        if (l->head != NULL)
            list_delete_node(l, l->head);
        usleep(50);
    }
}
</code>
==== Readers-writers locks [extra] ====

A [[https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock|readers-writers lock]] (//rwlock//) is a synchronisation primitive similar to a mutex, but it offers two modes of locking: shared (//readers//) and exclusive (//writers//). At any time, either any number of threads can hold the rwlock in the shared (read) mode, or at most one thread can hold it in the exclusive (write) mode.

The POSIX read-write lock API is similar to the mutex API. \\
To create a rwlock one has to create a ''pthread_rwlock_t'' variable and initialize it with either the ''[[https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_rwlock_init.html|PTHREAD_RWLOCK_INITIALIZER]]'' macro or the ''[[https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_rwlock_init.html|pthread_rwlock_init]]'' function. \\
A read-write lock can be locked with:
  * ''[[https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_rwlock_rdlock.html|pthread_rwlock_rdlock]]'' and ''[[https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_rwlock_rdlock.html|pthread_rwlock_tryrdlock]]'' in shared (read) mode,
  * ''[[https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_rwlock_wrlock.html|pthread_rwlock_wrlock]]'' and ''[[https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_rwlock_wrlock.html|pthread_rwlock_trywrlock]]'' in exclusive (write) mode.
''pthread_rwlock_timedrdlock'' / ''pthread_rwlock_timedwrlock'' variants are also available. \\
To unlock a rwlock (locked in either mode), ''[[https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_rwlock_unlock.html|pthread_rwlock_unlock]]'' is used.

===== [extra] Concurrency, parallelism =====

The aim of __[[https://en.wikipedia.org/wiki/Parallel_computing|parallel programming]]__ is to take advantage of the fact that a computer can execute two or more sequences of instructions at the same time (literally). \\
One needs at least two physical threads (for instance, two CPU cores) for this.

__[[https://en.wikipedia.org/wiki/Resource_contention|Contention]]__ is the situation when two processes / threads want to use the same resource (say, the same variable) at the same time. __Concurrent programming__ deals with processes / threads that contend for some resources (and run either interleaved or in parallel). \\
Concurrency also concerns computers that have a single physical thread (a single CPU core), since they can switch context from one process / thread to another.

When two processes / threads access the same data and at least one of the accesses modifies the data, a __[[https://en.wikipedia.org/wiki/Race_condition#In_software|race condition]]__ occurs – the result of the computation depends on chance. \\
Race conditions often lead to an incorrect state of the data and thus introduce bugs (that are usually hard to pinpoint).

Regions of code that must be executed without being interleaved with each other are called __[[https://en.wikipedia.org/wiki/Critical_section|critical sections]]__. At most one process may be inside a critical section at a time. When a process P wants to enter a critical section while another process Q is executing a critical section, P must __wait__ (aka __block__) until Q exits the critical section.
When a number of processes / threads are waiting on synchronisation primitives (say, blocked by ''pthread_mutex_lock''), and the only processes / threads that could wake the waiting ones (say, by executing ''pthread_mutex_unlock'') are among the waiting ones themselves, then a __[[https://en.wikipedia.org/wiki/Deadlock|deadlock]]__ has occurred. \\
A deadlock usually means that processing stops.
It is also possible to run into a state in which processes don't stop, but cannot make progress either. This is called a __livelock__. Imagine the following code:
<code c>
/* x, y and mtx are shared */
/* the programmer believed that eventually x equals y,
   but now x==1, y==0 and only those two processes run: */

/* Thread 1 */                │ /* Thread 2 */
char loop = 1;                │ char loop = 1;
while(loop) {                 │ while(loop) {
  pthread_mutex_lock(&mtx);   │   pthread_mutex_lock(&mtx);
  if (*x==0 && *y==0) {       │   if (*x==1 && *y==1) {
    /* do business logic */   │     /* do business logic */
    loop = 0;                 │     loop = 0;
  }                           │   }
  pthread_mutex_unlock(&mtx); │   pthread_mutex_unlock(&mtx);
}                             │ }
</code>
__Fairness__ describes whether all processes / threads have an equal chance to enter the critical section. Sometimes priorities are intentionally given to selected processes / threads. An incorrect implementation of priorities may lead to __[[https://en.wikipedia.org/wiki/Priority_inversion|priority inversion]]__. \\
If the algorithm is completely unfair to some process / thread, that process / thread can __[[https://en.wikipedia.org/wiki/Starvation_(computer_science)|starve]]__ by never entering the critical section under contention.

===== Critical sections, deadlocks =====

~~Exercise.#~~ Which lines of code form the critical sections in the code below?

~~Exercise.#~~ Spot the deadlock scenario in the code below.

~~Exercise.#~~ Fix the code so that it can no longer deadlock.
<code c>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

struct {
    pthread_mutex_t mtx;
    char text[256];
} item[5];

const char *arg0;

void *threadFunc(intptr_t num) {
    char name[1024], cmd[1024];
    sprintf(name, "%s.pipe.%ld", arg0, num);
    sprintf(cmd, "rm -f %s; mkfifo %s", name, name);
    system(cmd);
    FILE *myPipe = fopen(name, "r+");
    while (1) {
        char line[1024], nl;
        fscanf(myPipe, "%1023[^\n]%c", line, &nl);
        int argOne = atoi(line);
        if (argOne >= 5 || argOne < 0)
            continue;
        char *argTwoTxt = strchr(line, ' ');
        if (!argTwoTxt) {
            pthread_mutex_lock(&item[argOne].mtx);
            printf("T%ld reads %d as: %s\n", num, argOne, item[argOne].text);
            pthread_mutex_unlock(&item[argOne].mtx);
            continue;
        }
        argTwoTxt++;
        char *e;
        int argTwo = strtol(argTwoTxt, &e, 10);
        if (!*e && argTwo < 5 && argTwo >= 0 && argOne != argTwo) {
            pthread_mutex_lock(&item[argOne].mtx);
            pthread_mutex_lock(&item[argTwo].mtx);
            printf("T%ld copies %d to %d\n", num, argTwo, argOne);
            memcpy(item[argOne].text, item[argTwo].text, sizeof(item[argOne].text));
            pthread_mutex_unlock(&item[argTwo].mtx);
            pthread_mutex_unlock(&item[argOne].mtx);
        } else {
            pthread_mutex_lock(&item[argOne].mtx);
            printf("T%ld assigns to %d the value: %s\n", num, argOne, argTwoTxt);
            memset(item[argOne].text, 0, sizeof(item[argOne].text));
            strncpy(item[argOne].text, argTwoTxt, sizeof(item[argOne].text) - 1);
            pthread_mutex_unlock(&item[argOne].mtx);
        }
    }
}

int main(int argc, char **argv) {
    arg0 = argv[0];
    printf("To use this program, write to one of the %s.pipe.<N> pipes the "
           "following:\n"
           "  <i>         prints from item <i>\n"
           "  <i> <text>  puts <text> to item <i>\n"
           "  <i> <j>     copies to item <i> the text from item <j>\n"
           "Valid pipe numbers are 0-4, valid item numbers are 0-4.",
           arg0);
    for (int i = 0; i < 5; ++i)
        pthread_mutex_init(&item[i].mtx, NULL);
    for (intptr_t i = 1; i < 5; ++i) {
        pthread_t tid;
        pthread_create(&tid, NULL, (void *(*)(void *))threadFunc, (void *)i);
        pthread_detach(tid);
    }
    threadFunc(0);
}
</code>
===== Thread-specific data, thread local storage [extra] =====

In C, data items can have different storage durations and different linkage. Until C11, there existed only static, automatic and allocated storage durations. Roughly: \\
 • static = global variables and static variables defined inside functions, \\
 • automatic = items on the stack, automatically allocated inside functions ('local variables'), \\
 • allocated = items on the heap, explicitly allocated and freed. \\
None of these covers the case where one wants a static ("global") variable with a separate value for each thread. Such storage duration is called [[https://en.wikipedia.org/wiki/Thread-local_storage|thread local]], and a couple of ways to create thread-local items are presented in this section.

==== POSIX Thread-specific data ====

One can create //keys// (which may be understood as identifiers of variables) and then associate, for each thread separately, a value with a key. To this end the following functions can be used: \\
Needs header:
pthread.h
''int **pthread_key_create**(pthread_key_t *//key//, void (*//destructor//)(void*))'' \\
''int **pthread_setspecific**(pthread_key_t //key//, const void *//value//)'' \\
''void %%*%%**pthread_getspecific**(pthread_key_t //key//)'' \\
Each key is initially associated with ''NULL'' in every thread.
++++An example code that creates a key, stores and retrieves the value from two threads.|
<code c>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

pthread_key_t mySpecificVal;

void printMine() {
    printf("I have: %s\n", (char *)pthread_getspecific(mySpecificVal));
}

void *routine(void *arg) {
    char *text = malloc(4);
    strcpy(text, "baz");
    pthread_setspecific(mySpecificVal, text);
    printMine();
    return NULL;
}

int main() {
    char *text = malloc(4);
    strcpy(text, "foo");
    pthread_key_create(&mySpecificVal, free);
    pthread_setspecific(mySpecificVal, text);

    pthread_t ti;
    pthread_create(&ti, NULL, routine, NULL);
    pthread_join(ti, NULL);

    printMine();
    return 0;
}
</code>
++++
==== C thread-local storage ====

Starting with C11, a new storage duration, declared with the ''_Thread_local'' or ''[[https://en.cppreference.com/w/c/thread/thread_local|thread_local]]''((Until C23 ''thread_local'' was a macro defined in ''threads.h''; since C23 it is a keyword with the same meaning as the ''_Thread_local'' keyword.)) specifier, is defined. Variables with this storage duration have a separate value in each thread.

Notice that ''thread_local'' variables can have initial values (unlike in POSIX), but there is no way to provide a custom destructor (unlike in POSIX).

===== POSIX condition variables =====

[[https://en.wikipedia.org/wiki/Monitor_(synchronization)|Condition variables]] are closely tied to mutexes. They allow a thread that holds a mutex to release the mutex and at the same time start //wait//ing for a wakeup signal from another thread. When the other thread //signal//s the condition variable, either one or all threads waiting on that variable are woken, and each of them then re-acquires the mutex (before returning from the wait).

**Using condition variables is the most common way for threads to communicate.**

When one thread needs to execute some logic once a specific condition is fulfilled, it should:
  - lock a mutex,
  - check the condition,
  - if the condition is false:
    - wait on a condition variable,
    - go back to step 2,
  - do the logic,
  - unlock the mutex.

A thread that may change the state and thus affect the condition should:
  - lock the mutex,
  - do its logic,
  - signal the condition variable,
  - unlock the mutex((One can also signal the condition variable after unlocking the mutex.)).

Example conditions include:
  * a boolean flag is set; the flag indicates that another thread finished a part of the computation and the results can be used,
  * a list of items is not empty; the list contains tasks to be done by this thread,
  * a number crossed some threshold; the number is, e.g., the amount of data that could be garbage collected.

To use a condition variable, one has to declare a ''pthread_cond_t'' variable and initialize it with: \\
Needs header: pthread.h

''pthread_cond_t //cond// = **PTHREAD_COND_INITIALIZER**;'' \\
to use the default semantics, or with an initialisation function that allows choosing the desired semantics: \\
Needs header: pthread.h

''int pthread_cond_init(pthread_cond_t *restrict //cond//, const pthread_condattr_t *restrict //attr//)''

A thread holding the mutex //mutex// can use the function: \\
Needs header: pthread.h

''int **pthread_cond_wait**(pthread_cond_t *restrict //cond//, pthread_mutex_t *restrict //mutex//)'' \\
to atomically release the //mutex// and start waiting on //cond//. Another thread can use one of the functions: \\
Needs header: pthread.h

''int **pthread_cond_signal**(pthread_cond_t *//cond//)''\\
''int **pthread_cond_broadcast**(pthread_cond_t *//cond//)'' \\
to wake at least one (''pthread_cond_signal''), or all (''pthread_cond_broadcast''), threads waiting on the condition variable ''cond''. \\
The thread that is signalling/broadcasting does not need to hold the mutex. A generic sketch of this wait/signal pattern is shown after the exercise below.

~~Exercise.#~~ The program below attempts to calculate 10 items (imagine that it tries to create ten class timetables) among four threads, and runs one thread that waits until the ten items are ready. __Right now, the program is incorrect.__ \\       • The program is not thread-safe. Propose an interleaving that loses an element. \\       • Use mutex(es) to amend the program. \\       • The thread collecting items wastes CPU on busy waiting. Point out the line that wastes CPU. 
\\       • Add condition variable(s) to amend the program.
<code c>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

struct element {
    int value;
    struct element *next;
} *head = NULL, *tail = NULL;

void add_element(int v) {
    struct element *e = malloc(sizeof(struct element));
    e->next = NULL;
    e->value = v;
    if (tail == NULL)
        head = tail = e;
    else
        tail = tail->next = e;
}

int take_element() {
    int v = head->value;
    struct element *e = head;
    head = head->next;
    if (head == NULL)
        tail = NULL;
    free(e);
    return v;
}

#define DO_HEAVY_COMPUTATION (usleep(rand() % 10000000), rand() % 1000000)

void *worker_thread(void *arg) {
    srand(time(0) + (intptr_t)arg);
    while (1) {
        int v = DO_HEAVY_COMPUTATION;
        pthread_testcancel();
        add_element(v);
    }
}

int main() {
    pthread_t t[4];
    for (int i = 0; i < 4; ++i)
        pthread_create(t + i, NULL, worker_thread, NULL);

    int computed_count = 0;
    while (computed_count < 10) {
        while (head == NULL)
            ;
        int value = take_element();
        printf("%02d: %d\n", ++computed_count, value);
    }

    for (int i = 0; i < 4; ++i)
        pthread_cancel(t[i]);
    for (int i = 0; i < 4; ++i)
        pthread_join(t[i], NULL);
    return 0;
}
</code>
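For reference, here is the generic wait/signal pattern described above, reduced to a minimal sketch with an illustrative boolean flag as the condition:

<code c>
#include <pthread.h>
#include <stdio.h>

pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int ready = 0; // the condition: another thread has produced a result

void *producer(void *arg) {
    pthread_mutex_lock(&mtx);
    ready = 1;                  // change the state the condition depends on
    pthread_cond_signal(&cond); // wake (at least) one waiting thread
    pthread_mutex_unlock(&mtx);
    return NULL;
}

int main() {
    pthread_t ti;
    pthread_create(&ti, NULL, producer, NULL);

    pthread_mutex_lock(&mtx);
    while (!ready)                      // re-check the condition after every wakeup
        pthread_cond_wait(&cond, &mtx); // atomically unlocks mtx and waits
    printf("the result is ready\n");    // do the logic while holding the mutex
    pthread_mutex_unlock(&mtx);

    pthread_join(ti, NULL);
    return 0;
}
</code>

The ''while'' loop (rather than a single ''if'') matters: ''pthread_cond_wait'' may wake up spuriously, so the condition must be re-checked before proceeding.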
~~Exercise.#~~ Write a program that starts N worker threads that wait for tasks to do. In the main thread read commands from standard input. Once a command is read, one of the worker threads should execute it; when the command has been executed, that worker thread should print the result to the standard output. \\
For simplicity let the command be an integer, and let the execution procedure be sleeping for that many seconds. \\
This idea of creating worker threads and passing tasks to them is very popular and bears the name [[https://en.wikipedia.org/wiki/Thread_pool|thread pool]] in software engineering.

===== C mutexes and condition variables [extra] =====

The C11 language standard brought mutexes and condition variables to C. \\
The API for these synchronisation constructs looks like a simplified POSIX API:
<code c>
#include <stdio.h>
#include <threads.h>

mtx_t mutex;
cnd_t condition_variable;
char shared_data = 0;

int func(void *arg);

int main() {
    mtx_init(&mutex, mtx_plain | mtx_recursive);
    cnd_init(&condition_variable);

    thrd_t t;
    thrd_create(&t, func, NULL);

    char c = getchar();
    mtx_lock(&mutex);
    shared_data = c;
    mtx_unlock(&mutex);
    cnd_signal(&condition_variable);

    thrd_join(t, NULL);
    return 0;
}

int func(void *arg) {
    char c;
    mtx_lock(&mutex);
    while (shared_data == 0)
        cnd_wait(&condition_variable, &mutex);
    c = shared_data;
    mtx_unlock(&mutex);
    printf("%c\n", c ^ 0x20);
    return thrd_success;
}
</code>
For a full reference, consult for instance: [[https://en.cppreference.com/w/c/thread#Mutual_exclusion]]

===== Atomic variables [extra] =====

Current [[https://en.wikipedia.org/wiki/Instruction_set_architecture|instruction set architectures]] offer atomic instructions. An atomic instruction observes every other instruction as either fully executed or not executed at all, and is itself observed by other instructions as either fully executed or not executed at all. Atomic instructions by themselves might not impose any order among instructions.

Some examples of atomic instructions are:
  * [[https://en.wikipedia.org/wiki/Test-and-set|test-and-set]],
  * [[https://en.wikipedia.org/wiki/Compare-and-swap|compare-and-swap]],
  * [[https://en.wikipedia.org/wiki/Fetch-and-add|fetch-and-add]].
Such instructions are usually the basic building blocks of synchronisation primitives (e.g., mutexes), but they can also be useful to end users in their own right. For instance, imagine that you need to assign unique identifiers from multiple threads – then a single atomic fetch-and-add (or even an atomic increment) suffices.

C11 standardized invoking the atomic instructions. To use them, the variable has to be declared with the ''[[https://en.cppreference.com/w/c/language/atomic|_Atomic]]'' type specifier or an ''[[https://en.cppreference.com/w/c/thread#Convenience_type_aliases|atomic_…]]'' convenience type alias (e.g. ''_Atomic int i;'' or the equivalent ''atomic_int i;''). \\
On architectures that do not support one of the atomic operations used on a variable, a lock is automatically acquired for the duration of each operation on that variable. One can learn whether an operation is lock-free by checking the value of the [[https://en.cppreference.com/w/c/atomic/ATOMIC_LOCK_FREE_consts|appropriate macros]]. \\
The list of available atomic operations includes: load, store, swap, test-and-set, compare-and-swap, fetch-and-add. \\
For details, consult e.g., [[https://en.cppreference.com/w/c/thread#Atomic_operations]].

Each atomic operation in C can also be combined with a memory ordering requirement. A memory ordering can also be imposed explicitly using ''[[https://en.cppreference.com/w/c/atomic/atomic_thread_fence|atomic_thread_fence]]''. The memory model of C, as well as the memory ordering enum values, are described, among other places, here: [[https://en.cppreference.com/w/c/atomic/memory_order]].

~~META: language = en ~~
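A rough sketch of the unique-identifier use case mentioned above, assuming a C11 compiler with ''<stdatomic.h>'' (the function name is illustrative):

<code c>
#include <stdatomic.h>
#include <stdio.h>

atomic_int next_id = 0; // equivalent to: _Atomic int next_id = 0;

// can be called from any number of threads without additional locking
int get_unique_id(void) {
    // atomically returns the old value and increments the counter
    return atomic_fetch_add(&next_id, 1);
}

int main(void) {
    for (int i = 0; i < 3; ++i)
        printf("%d\n", get_unique_id());
    return 0;
}
</code>

Since ''atomic_fetch_add'' returns the previous value, each caller gets a distinct identifier even when many threads call ''get_unique_id'' at once.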