Threads

A process can run multiple threads – multiple flows of control.
In general, the threads share all resources (such as memory or open files).
Each thread uses a separate stack, and obviously has its own state of the CPU registers. A limited set of properties might also be specific to a thread (e.g., signal mask, CPU affinity).

Since 2011 the C and C++ language standards (C11 and C++11) include an API for concurrency, including threads. However, it is very generic, so that it can be implemented on all operating systems.

                                     POSIX            C             C++
create a new thread                  pthread_create   thrd_create   std::thread constructor
join a thread                        pthread_join     thrd_join     std::thread::join
detach a thread                      pthread_detach   thrd_detach   std::thread::detach
exit from a thread                   pthread_exit     thrd_exit
send a signal to a specific thread   pthread_kill

A process always starts with a single thread called the main thread.
When the main thread returns from the main function, or when any thread calls the exit function, all threads are forcefully terminated.

pthreads

The POSIX standard defines an API for threads called pthreads. While each operating system has its own implementation of threads, virtually every Unix-like system implements the POSIX Threads API.
The Linux implementation of threads is called NPTL. Apart from the standard pthread functions, it offers several Linux-specific functions (whose names end with _np to indicate non-portability).

To compile a program that uses pthreads, one used to have to add the -pthread option when invoking the compiler and linker.
This changed with version 2.34 of glibc, which moved the POSIX thread routines into the standard C library (libc).

Creating a thread

To start a new thread, one must provide a pointer to the function that will be executed by the thread.
The function must take a pointer to arbitrary data as an argument and return a pointer to arbitrary data1):

void *start_routine(void *argument) {
  return some_pointer;
}

In C, such a function is of type void *(*)(void *), and a variable called func of this type would be declared as void *(*func)(void *).
A new thread will get an identifier of type pthread_t that can be used later on to refer to the thread.

The function starting a new thread has the following signature:
Needs header:
pthread.h
int pthread_create(pthread_t *restrict threadIdentifier,
                   const pthread_attr_t *restrict attr,
                   void *(*start_routine)(void *),
                   void *restrict arg);
It will write the thread identifier to the variable pointed to by threadIdentifier, start a new thread and run start_routine(arg) in it. If the attr argument is not NULL, the thread attributes will be set before the thread starts.

An example of creating a thread and passing it some data
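A minimal sketch of what such code might look like – the struct job type and the worker function are made up for illustration; note that the data passed to the thread must stay alive for as long as the thread may use it:

#include <pthread.h>
#include <stdio.h>

struct job {              /* illustrative payload passed to the new thread */
  int id;
  const char *text;
};

void *worker(void *arg) {
  struct job *j = arg;                     /* recover the pointer passed to pthread_create */
  printf("job %d: %s\n", j->id, j->text);
  return NULL;
}

int main() {
  struct job j = {42, "hello from another thread"};
  pthread_t tid;
  if (pthread_create(&tid, NULL, worker, &j) != 0)  /* pass the address of the data as arg */
    return 1;
  pthread_join(tid, NULL);                 /* j lives on main's stack, so wait for the thread */
  return 0;
}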

Caring for a thread

Just like a child process is remembered by the operating system until the parent process reaps it, a thread that has terminated is remembered by the operating system until another thread joins it. Alternatively, a thread can be explicitly detached so that it is immediately reaped upon termination and cannot be joined anymore.

By default, a thread starts in joinable state. One can create a set of attributes (using pthread_attr_init and pthread_attr_setdetachstate) that tells the OS to create a thread in the detached state.

To join a thread and collect its result one has to call the function:
Needs header: pthread.h int pthread_join(pthread_t thread, void **value_ptr);
Calling pthread_join waits until the thread thread terminates and writes its return value to the void * variable pointed to by value_ptr. The value_ptr may be NULL, in which case the return value is discarded.

An example code that creates 3 threads and joins them, collecting their result.
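A sketch of how such code could look – the square function and the trick of passing/returning small integers through the void * values are illustrative choices, not the only way to do it:

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

void *square(void *arg) {
  intptr_t n = (intptr_t)arg;      /* the argument carries a small integer, not a real pointer */
  return (void *)(n * n);          /* likewise, smuggle the result back in the pointer value */
}

int main() {
  pthread_t tid[3];
  for (intptr_t i = 0; i < 3; ++i)
    pthread_create(&tid[i], NULL, square, (void *)i);
  for (int i = 0; i < 3; ++i) {
    void *res;
    pthread_join(tid[i], &res);    /* blocks until thread i terminates, then collects its result */
    printf("thread %d returned %ld\n", i, (long)(intptr_t)res);
  }
  return 0;
}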

To detach a thread one has to call the function:
Needs header: pthread.h int pthread_detach(pthread_t thread);
The pthread_detach function returns without waiting. The value returned by the detached thread will be discarded.
A thread can detach itself (i.e. call pthread_detach(pthread_self());), but typically the thread that spawned it performs the detach.

An example code that creates a thread and detaches it.
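A possible shape of such code (the background function is illustrative; the sleep at the end only keeps the process alive long enough for the detached thread to finish):

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

void *background(void *arg) {
  (void)arg;
  puts("background work done");
  return NULL;                     /* the return value of a detached thread is discarded */
}

int main() {
  pthread_t tid;
  pthread_create(&tid, NULL, background, NULL);
  pthread_detach(tid);             /* the thread will be reaped automatically when it terminates */
  sleep(1);                        /* returning from main would kill all threads, so wait a bit */
  return 0;
}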

POSIX offers a mechanism called cancellation: one can send a cancellation request to a thread using the pthread_cancel function. Each thread can select its behaviour on receiving such a request using the pthread_setcancelstate and pthread_setcanceltype functions. One may either allow or disallow cancelling the thread, and one may choose whether the cancellation should occur immediately after a request, or at the next cancellation point. Cancellation points are the execution points of several pthread functions, among which pthread_testcancel notably exists solely for this purpose. Once the thread complies with the cancellation request, it performs roughly the same actions as if pthread_exit(PTHREAD_CANCELED) was called. The cancellation mechanism allows deferring thread termination until a logically consistent state (e.g. of data shared with other threads) is reached.
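A rough sketch of the mechanism described above, assuming a worker that only agrees to be cancelled at an explicit cancellation point (the crunch function is made up for illustration):

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

void *crunch(void *arg) {
  (void)arg;
  int oldtype;
  pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &oldtype);  /* deferred is the default anyway */
  while (1) {
    /* ... work on a chunk, leaving shared data in a consistent state ... */
    pthread_testcancel();          /* cancellation point: a safe place to comply with a request */
  }
  return NULL;
}

int main() {
  pthread_t tid;
  pthread_create(&tid, NULL, crunch, NULL);
  sleep(1);
  pthread_cancel(tid);             /* send the cancellation request */
  void *res;
  pthread_join(tid, &res);
  printf("cancelled: %s\n", res == PTHREAD_CANCELED ? "yes" : "no");
  return 0;
}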

Exercises

Exercise 1 In the main thread create a new thread, join it and then return from main.
In the newly created thread write Hello World to the standard output.

Exercise 2 Remove from the code you wrote for the previous exercise the pthread_join and re-run the code. What has changed in the behaviour of the program?

Exercise 3 In the main thread create 10 new threads, passing to each of them an ordinal number. Then, join each thread and exit.
In the newly created threads write the number to the standard output.

Exercise 4 Add to the program from the previous exercise a global variable. In each thread read the variable and increment it. Output the obtained value together with the ordinal number, and return it from the thread entry routine. In the main thread, collect the returned numbers and display them.

Accessing the same data from multiple threads

The following examples show what might happen upon accessing the same data from multiple threads with no synchronisation.

First, let's read/write "linearly" from/to an array from two threads:

#include <pthread.h>
#include <stdio.h>
#include <string.h>
 
unsigned long long version;
char text[1020];
 
void *updater(void * arg) {
  while (1)
    for (char letter = 'a'; letter <= 'z'; ++letter) {
      version++;
      for (int i = 0; i < 1020 - 1; ++i)
        text[i] = letter;
      // memset(text, letter, 1020);
    }
}
 
int main() {
  pthread_t tid;
  pthread_create(&tid, NULL, updater, NULL);
  while (getchar() != EOF)
    printf("version: %llu\n   text: %s\n", version, text);
  return 0;
}

Next, let's read-modify-write the same variable from multiple threads:

#include <pthread.h>
#include <stdio.h>
 
unsigned long long counter;
 
void *incrementer(void * arg) {
  for (int i = 0; i < 1000; ++i)
    counter++;
  return NULL;
}
 
int main() {
  pthread_t tid[16];
  for (int i = 0; i < 16; ++i)
    pthread_create(tid + i, NULL, incrementer, NULL);
  for (int i = 0; i < 16; ++i)
    pthread_join(tid[i], NULL);
  printf("%llu\n", counter);
  return 0;
}

If the code above always returns the right answer, try to run it a million times:
for X in `seq 1000000`; do RES=$(./progname); test "$RES" -ne 16000 && echo -e "\n$RES" && break || echo -n '.'; done

Exercise 5 Read, understand the code, compile and run the two above examples. What is wrong with the results? What problem do these examples show?

POSIX mutexes and readers-writers locks

Mutexes

A mutex is a synchronisation primitive that has two operations: lock that locks (acquires) the mutex and unlock that unlocks (releases) the mutex. A thread that invoked lock and did not yet invoke unlock holds the mutex. A mutex guarantees that at most one thread holds it at a time. The name mutex comes from mutual exclusion. Mutexes are sometimes called locks, however some APIs use the name lock for an object that locks a mutex upon creation and unlocks it upon destruction.

To create a POSIX mutex, one has to create a variable of pthread_mutex_t type and initialize it, using either an initializer macro:
Needs header: pthread.h pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
to initialize a mutex with default semantics, or an initialisation function:
Needs header: pthread.h int pthread_mutex_init(pthread_mutex_t *restrict mutex,
                       const pthread_mutexattr_t *restrict attr)

When using the pthread_mutex_init function, one can select non-default semantics for a mutex.
To do so, one has to declare a pthread_mutexattr_t variable and initialize it using:
Needs header: pthread.h int pthread_mutexattr_init(pthread_mutexattr_t *attr)
and finally set the attributes using the corresponding functions (e.g., pthread_mutexattr_settype):

The mutex type impacts what happens when a mutex that is already locked is locked again by the same thread:

  • PTHREAD_MUTEX_NORMAL - the thread will deadlock (with itself),
  • PTHREAD_MUTEX_ERRORCHECK - locking the mutex will fail (returning the error code EDEADLK),
  • PTHREAD_MUTEX_RECURSIVE - the mutex counts how many times it was locked, and will unlock after this many unlocks,
  • PTHREAD_MUTEX_DEFAULT - it is undefined what will happen.

Some mutex types also guarantee that an attempt to unlock a mutex not owned by the current thread fails2) – all PTHREAD_MUTEX_ERRORCHECK and PTHREAD_MUTEX_RECURSIVE mutexes do that, as does any mutex set to be robust.

An example of creating a recursive mutex is as follows:

pthread_mutexattr_t recursiveAttrs;
pthread_mutexattr_init(&recursiveAttrs);
pthread_mutexattr_settype(&recursiveAttrs, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, &recursiveAttrs);

To lock a mutex one can use either of:
Needs header: pthread.h int pthread_mutex_lock(pthread_mutex_t *mutex)
int pthread_mutex_trylock(pthread_mutex_t *mutex)

int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime)
If the mutex is unlocked, each of these functions locks it. If the mutex is currently locked, pthread_mutex_lock waits until the mutex becomes unlocked, pthread_mutex_trylock immediately returns the error code EBUSY, and pthread_mutex_timedlock waits until the absolute time abstime for the mutex to become available.
When a mutex is misused, locking it may fail and return an appropriate error code.

To unlock a mutex, one has to use the function:
Needs header: pthread.h int pthread_mutex_unlock(pthread_mutex_t *mutex)
It is important to remember that only the thread that locked the mutex can unlock it.

Exercise 6 Use a mutex to fix the programs from the two examples in the section accessing the same data from multiple threads.

Exercise 7 The program below accesses a linked list from multiple threads. Add mutexes to the functions whose names start with list_… so that the program no longer crashes.

Source code for this exercise:

Exercise 8 Add to the program from the previous exercise a thread executing the following function, and amend the code so that it neither crashes nor deadlocks:

linkedlist.c
void *beheader(void *arg) {
  struct list *l = (struct list *)arg;
  srand(time(0) + 4);
  while (1) {
    if (l->head != NULL)
      list_delete_node(l, l->head);
    usleep(50);
  }
}

Readers-writers locks [extra]

A readers-writers lock (rwlock) is a synchronisation primitive similar to a mutex, but it offers two modes of locking: shared (readers) and exclusive (writers). At a time, either any number of threads can lock the rwlock in the shared (read) mode, or at most one thread can lock the rwlock in the exclusive (write) mode.

The POSIX read-write lock object API is similar to the mutex API.
To create a rwlock one has to create a pthread_rwlock_t variable and initialize it with either PTHREAD_RWLOCK_INITIALIZER macro or pthread_rwlock_init function.
A read-write lock can be locked with:
Needs header: pthread.h int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock)
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock)
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock)
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock)

The pthread_rwlock_timedrdlock / pthread_rwlock_timedwrlock variants are also available.
To unlock a rwlock (locked in any mode), pthread_rwlock_unlock is used.
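A minimal sketch of guarding a shared variable with a rwlock (the variable and function names are illustrative):

#include <pthread.h>

pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
int shared_value;

int read_value(void) {
  pthread_rwlock_rdlock(&rwlock);  /* many readers may hold the lock at the same time */
  int v = shared_value;
  pthread_rwlock_unlock(&rwlock);
  return v;
}

void write_value(int v) {
  pthread_rwlock_wrlock(&rwlock);  /* a writer proceeds only when it is the sole lock holder */
  shared_value = v;
  pthread_rwlock_unlock(&rwlock);
}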

[extra] Concurrency, parallelism

The aim of parallel programming is to take advantage of the fact that a computer can execute two or more sequences of instructions at the same time (literally).
One needs at least two physical threads (for instance, two CPU cores) for this.

Contention is the situation when two processes / threads want to use the same resource (say, the same variable) at the same time.

Concurrent programming deals with processes / threads that contend on some resources (and run either interleaved or in parallel).
Concurrency also concerns computers that have one physical thread (a single CPU core) whenever they can switch context from one process / thread to another.

When two processes / threads access the same data and at least one of the accesses modifies the data, a race condition occurs – the result of the computation depends on chance (e.g., on how the threads happen to be scheduled).
Race conditions often lead to an incorrect state of the data and thus introduce bugs (that are usually hard to pinpoint).

Regions of code that must be executed without being interleaved with each other are called critical sections. At most one process may be inside a critical section at a time.

When a process P wants to enter a critical section, and another process Q is executing a critical section, then P must wait (aka block) until Q exits the critical section.

When a number of processes / threads are waiting on synchronisation primitives (say, blocked by pthread_mutex_lock), and the only processes / threads that could wake the waiting ones (say, by executing pthread_mutex_unlock) are among the waiting ones themselves, then a deadlock has occurred.
A deadlock usually means that the processing stops.

It is also possible to run into a state when processes don't stop, but can't progress either. This is called a livelock. Imagine the following code:

/* x, y and mtx are shared */
/* the programmer believed that eventually x equals y,
   but now x==1, y==0 and only those two threads run: */

/* Thread 1 */                │ /* Thread 2 */
char loop = 1;                │ char loop = 1;
while (loop) {                │ while (loop) {
  pthread_mutex_lock(&mtx);   │   pthread_mutex_lock(&mtx);
  if (*x==0 && *y==0) {       │   if (*x==1 && *y==1) {
    /* do business logic */   │     /* do business logic */
    loop = 0;                 │     loop = 0;
  }                           │   }
  pthread_mutex_unlock(&mtx); │   pthread_mutex_unlock(&mtx);
}                             │ }

Fairness describes whether all processes / threads have an equal chance to enter the critical section. Sometimes priorities are intentionally given to selected processes / threads. An incorrect implementation of priorities may lead to priority inversion.
If the algorithm is completely unfair to some process / thread, it can starve by never entering the critical section under contention.

Critical sections, deadlocks

Exercise 9 Which lines of code are the critical section in the code below?

Exercise 10 Spot the deadlock scenario in the code below.

Exercise 11 Fix the code so that it can no longer deadlock.

another_bad_idea.c
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
 
struct {
  pthread_mutex_t mtx;
  char text[256];
} item[5];
 
const char *arg0;
 
void *threadFunc(intptr_t num) {
  char name[1024], cmd[1024];
  sprintf(name, "%s.pipe.%ld", arg0, num);
  sprintf(cmd, "rm -f %s; mkfifo %s", name, name);
  system(cmd);
  FILE *myPipe = fopen(name, "r+");
 
  while (1) {
    char line[1024], nl;
    fscanf(myPipe, "%1023[^\n]%c", line, &nl);
 
    int argOne = atoi(line);
    if (argOne >= 5 || argOne < 0)
      continue;
 
    char *argTwoTxt = strchr(line, ' ');
 
    if (!argTwoTxt) {
      pthread_mutex_lock(&item[argOne].mtx);
      printf("T%ld reads %d as: %s\n", num, argOne, item[argOne].text);
      pthread_mutex_unlock(&item[argOne].mtx);
      continue;
    }
 
    argTwoTxt++;
    char *e;
    int argTwo = strtol(argTwoTxt, &e, 10);
 
    if (!*e && argTwo < 5 && argTwo >= 0 && argOne != argTwo) {
      pthread_mutex_lock(&item[argOne].mtx);
      pthread_mutex_lock(&item[argTwo].mtx);
      printf("T%ld copies %d to %d\n", num, argTwo, argOne);
      memcpy(item[argOne].text, item[argTwo].text, sizeof(item[argOne].text));
      pthread_mutex_unlock(&item[argTwo].mtx);
      pthread_mutex_unlock(&item[argOne].mtx);
    } else {
      pthread_mutex_lock(&item[argOne].mtx);
      printf("T%ld assigns to %d the value: %s\n", num, argOne, argTwoTxt);
      memset(item[argOne].text, 0, sizeof(item[argOne].text));
      strncpy(item[argOne].text, argTwoTxt, sizeof(item[argOne].text) - 1);
      pthread_mutex_unlock(&item[argOne].mtx);
    }
  }
}
 
int main(int argc, char **argv) {
  arg0 = argv[0];
 
  printf("To use this program, write to one of the %s.pipe.<thread_id> the "
         "following:\n"
         "  <num>             prints <text> from item <num>\n"
         "  <num> <text>      puts <text> to item <num>\n"
         "  <num1> <num2>     copies to item <num1> the text from item <num2>\n"
         "Valid pipe numbers are 0-4, valid item numbers are 0-4.",
         arg0);
 
  for (int i = 0; i < 5; ++i)
    pthread_mutex_init(&item[i].mtx, NULL);
 
  for (intptr_t i = 1; i < 5; ++i) {
    pthread_t tid;
    pthread_create(&tid, NULL, (void *(*)(void *))threadFunc, (void *)i);
    pthread_detach(tid);
  }
  threadFunc(0);
}

Thread-specific data, thread local storage [extra]

In C, data items can have different storage durations and different linkage. Until C11, there existed only static, automatic and allocated storage durations. Roughly:
 • static = global variables and static variables defined inside functions,
 • automatic = items on the stack, automatically allocated inside functions ('local variables'),
 • allocated = items on the heap, explicitly allocated and freed.
None of these covers the case when one wants a static ("global") variable with a separate value for each thread. Such storage duration is called thread local, and a couple of ways to create thread-local items are presented in this section.

POSIX Thread-specific data

One can create keys (that may be understood as identifiers of variables) and then associate, for each thread separately, a value for a key. To this end the following functions can be used:
Need header:
pthread.h
int pthread_key_create(pthread_key_t *key, void (*destructor)(void*))
int pthread_setspecific(pthread_key_t key, const void *value)
void *pthread_getspecific(pthread_key_t key)

Each key is initially associated with a NULL for each thread.

An example code that creates a key, stores and retrieves the value from two threads.
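A sketch of what such code might look like (the value strings and the worker function are illustrative):

#include <pthread.h>
#include <stdio.h>

pthread_key_t key;

void *worker(void *arg) {
  pthread_setspecific(key, arg);   /* each thread associates its own value with the same key */
  /* ... possibly much later, deep inside other functions ... */
  printf("my value: %s\n", (char *)pthread_getspecific(key));
  return NULL;
}

int main() {
  pthread_key_create(&key, NULL);  /* NULL: no destructor for the stored values */
  pthread_t t1, t2;
  pthread_create(&t1, NULL, worker, "first");
  pthread_create(&t2, NULL, worker, "second");
  pthread_join(t1, NULL);
  pthread_join(t2, NULL);
  pthread_key_delete(key);
  return 0;
}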

C thread-local storage

Starting with C11, a new storage duration _Thread_local or thread_local3) is defined. Variables defined with this storage duration have a separate value in each thread. Notice that thread_local variables can have initial values (unlike in POSIX), but there is no way to provide a custom destructor (unlike in POSIX).
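A tiny illustration of the difference, using the C11 thread API for the second thread (a sketch, not the only way to demonstrate it):

#include <stdio.h>
#include <threads.h>

thread_local int counter = 0;      /* every thread gets its own copy, initialised to 0 */

int worker(void *arg) {
  (void)arg;
  for (int i = 0; i < 5; ++i)
    ++counter;                     /* touches only this thread's copy */
  printf("worker's counter: %d\n", counter);   /* prints 5 */
  return 0;
}

int main() {
  thrd_t t;
  thrd_create(&t, worker, NULL);
  thrd_join(t, NULL);
  printf("main's counter: %d\n", counter);     /* still 0: the worker incremented its own copy */
  return 0;
}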

POSIX condition variables

Condition variables are strictly related to mutexes. They allow a thread that holds a mutex to release the mutex and at the same time start waiting for a wakeup signal from another thread. When the other thread signals the condition variable, either one or all of the threads waiting on that variable are woken, and each of them proceeds to re-acquire the mutex.

Using condition variables is the most common way for threads to communicate.

When one thread needs to execute some logic once a specific condition is fulfilled, it should:

  1. lock a mutex,
  2. check the condition,
  3. if the condition is false:
    1. wait on a condition variable,
    2. go back to step 2,
  4. do the logic,
  5. unlock the mutex.

A thread that may change the state and thus affect the condition should:

  1. lock the mutex,
  2. do its logic,
  3. signal the condition variable,
  4. unlock the mutex4).

The example conditions include:

  • a boolean flag is set; the flag indicates that another thread finished a part of the computation and the results can be used,
  • a list of items is not empty; the list contains tasks to be done by this thread,
  • a number crossed some threshold; the number is, e.g., the amount of data that could potentially be garbage collected.

To use a condition variable, one has to declare a pthread_cond_t variable, and then initialize it with:
Needs header: pthread.h pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
to use the default semantics, or with an initialisation function that allows choosing the desired semantics:
Needs header: pthread.h int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr)

A thread holding mutex mutex can use the function:
Needs header: pthread.h int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex)
to atomically release the mutex and start waiting on cond.

Another thread can use one of the functions:
Needs header: pthread.h int pthread_cond_signal(pthread_cond_t *restrict cond)
int pthread_cond_broadcast(pthread_cond_t *restrict cond)

to wake at least one (pthread_cond_signal), or all (pthread_cond_broadcast) threads waiting on the condition variable cond.
The thread that is signalling/broadcasting does not need to hold the mutex.
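A minimal sketch of the two procedures described above, using a boolean flag as the condition (the names are illustrative):

#include <pthread.h>
#include <stdio.h>

pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int data_ready = 0;                /* the condition: a boolean flag */

void *consumer(void *arg) {
  (void)arg;
  pthread_mutex_lock(&mtx);
  while (!data_ready)              /* re-check the condition after every wakeup */
    pthread_cond_wait(&cond, &mtx);    /* atomically releases mtx and waits; re-locks on wakeup */
  puts("data is ready");           /* do the logic while holding the mutex */
  pthread_mutex_unlock(&mtx);
  return NULL;
}

int main() {
  pthread_t tid;
  pthread_create(&tid, NULL, consumer, NULL);
  pthread_mutex_lock(&mtx);
  data_ready = 1;                  /* change the state the condition depends on */
  pthread_cond_signal(&cond);      /* wake up at least one waiting thread */
  pthread_mutex_unlock(&mtx);
  pthread_join(tid, NULL);
  return 0;
}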

Exercise 12 The program below attempts to calculate 10 items (imagine that it tries to create ten class timetables) among four threads, and runs one thread that waits until the ten items are ready. Right now, the program is incorrect.
      • The program is not thread-safe. Propose an interleaving that loses an element.
      • Use mutex(es) to amend the program.
      • The thread collecting items wastes CPU on busy waiting. Point out the line that wastes CPU.
      • Add condition variable(s) to amend the program.

linkedlist.c
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
 
struct element {
  int value;
  struct element *next;
} *head = NULL, *tail = NULL;
 
void add_element(int v) {
  struct element *e = malloc(sizeof(struct element));
  e->next = NULL;
  e->value = v;
  if (tail == NULL)
    head = tail = e;
  else
    tail = tail->next = e;
}
 
int take_element() {
  int v = head->value;
  struct element *e = head;
  head = head->next;
  if (head == NULL)
    tail = NULL;
  free(e);
  return v;
}
 
#define DO_HEAVY_COMPUTATION (usleep(rand() % 10000000), rand() % 1000000)
 
void *worker_thread(void *arg) {
  srand(time(0) + (intptr_t)arg);
  while (1) {
    int v = DO_HEAVY_COMPUTATION;
    pthread_testcancel();
    add_element(v);
  }
}
 
int main() {
  pthread_t t[4];
  for (int i = 0; i < 4; ++i)
    pthread_create(t + i, NULL, worker_thread, NULL);
 
  int computed_count = 0;
  while (computed_count < 10) {
    while (head == NULL);
    int value = take_element();
    printf("%02d: %d\n", ++computed_count, value);
  }
 
  for (int i = 0; i < 4; ++i)
    pthread_cancel(t[i]);
  for (int i = 0; i < 4; ++i)
    pthread_join(t[i], NULL);
 
  return 0;
}

Exercise 13 Write a program that starts N worker threads that wait for tasks to do. In the main thread read commands from standard input. Once a command is read, one of the worker threads should execute the command and once the command is executed, the worker thread should print the result to the standard output.
For simplicity let the command be an integer, and let the execution procedure be sleeping for this many seconds.
This idea of creating worker threads and passing them tasks is very popular and bears the name thread pool in software engineering.

C mutexes and condition variables [extra]

C11 language standard brought mutexes and condition variables to C.
The API for these synchronisation constructs looks like a simplified POSIX API:

#include <stdio.h>
#include <threads.h>
 
mtx_t mutex;
cnd_t condition_variable;
 
char shared_data = 0;
 
int func(void *arg);
 
int main() {
  mtx_init(&mutex, mtx_plain | mtx_recursive);
  cnd_init(&condition_variable);
  thrd_t t;
  thrd_create(&t, func, NULL);
  char c = getchar();
 
  mtx_lock(&mutex);
  shared_data = c;
  mtx_unlock(&mutex);
  cnd_signal(&condition_variable);
 
  thrd_join(t, NULL);
  return 0;
}
 
int func(void *arg) {
  char c;
 
  mtx_lock(&mutex);
  while (shared_data == 0)
    cnd_wait(&condition_variable, &mutex);
  c = shared_data;
  mtx_unlock(&mutex);
 
  printf("%c\n", c ^ 0x20);
 
  return thrd_success;
}

For a full reference, consult for instance: https://en.cppreference.com/w/c/thread#Mutual_exclusion

Atomic variables [extra]

Modern instruction set architectures offer atomic instructions. An atomic instruction observes every other instruction as either fully executed or not executed at all, and is itself observed by other instructions in the same way. Atomic instructions by themselves might not impose any ordering among instructions.

Some examples of atomic instructions are: test-and-set, atomic exchange (swap), fetch-and-add, and compare-and-swap (compare-and-exchange).

Such instructions are usually the base building blocks of synchronisation primitives (e.g., mutexes), but can be useful themselves to the end users. For instance, imagine that you need to assign unique identifiers from multiple threads – then a single atomic fetch-and-add (or even atomic increment) suffices.

C11 standardized invoking the atomic instructions. To use them, the variable has to be declared with _Atomic type specifier or atomic_… convenience type alias (e.g. _Atomic int i; or the equivalent atomic_int i;).
On architectures that do not support one of the atomic operations used on a variable, a lock is automatically acquired for the duration of each operation on that variable. One can learn whether the operation is lock-free by checking the value of appropriate macros.
The list of available atomic operations includes: load, store, swap, test-and-set, compare-and-swap, fetch-and-add.
For details, consult e.g., https://en.cppreference.com/w/c/thread#Atomic_operations.
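A small sketch of the unique-identifier use case mentioned above, written with C11 atomics and C11 threads (the names are illustrative):

#include <stdatomic.h>
#include <stdio.h>
#include <threads.h>

atomic_int next_id = 1;            /* shared counter; every operation on it is atomic */

int worker(void *arg) {
  (void)arg;
  int my_id = atomic_fetch_add(&next_id, 1);  /* atomically read and increment: each caller gets a distinct value */
  printf("got id %d\n", my_id);
  return 0;
}

int main() {
  thrd_t t[4];
  for (int i = 0; i < 4; ++i)
    thrd_create(&t[i], worker, NULL);
  for (int i = 0; i < 4; ++i)
    thrd_join(t[i], NULL);
  printf("next unused id: %d\n", atomic_load(&next_id));  /* 5 */
  return 0;
}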

Each atomic operation in C can also be combined with a memory ordering constraint. A memory ordering constraint can also be issued explicitly using atomic_thread_fence. The memory model of C, as well as the memory ordering enum values, are described among others here: https://en.cppreference.com/w/c/atomic/memory_order.

1) To specify a pointer without specifying the type of underlying data one has to use void *.
2) pthread_mutex_unlock returns the error code EPERM in that case.
3) Until C23 thread_local was a macro defined in threads.h; since C23 it is a keyword with the same meaning as the _Thread_local keyword.
4) One can also signal the condition variable after unlocking the mutex.