===== Threads ===== A process can run multiple threads – multiple flows of control. \\ In general, the threads share all resources (such as memory or open files). \\ Each thread uses a separate stack, and obviously has its own state of the CPU registers. A limited set of properties might also be specific to a thread (e.g., signal mask, CPU affinity). Since 2012 the C and C%%+%%+ languages include an API for concurrency, including threads. However, it is very generic, so that it can be implemented in all operating systems. ^ ^ POSIX ^ C ^ C%%+%%+ ^ |create a new thread|''pthread_create''|''thrd_create''|''std::thread'' constructor| |join a thread|''pthread_join''|''thrd_join''|''std::thread::join''| |detach a thread|''pthread_detach''|''thrd_detach''|''std::thread::detach''| |exit from a thread|''pthread_exit''|''thrd_exit''|—| |send signal to a specific thread|''pthread_kill''|—|—| A process always starts with a single thread called the main thread. \\ When the main thread returns from the ''main'' function, or when any thread calls the ''exit'' function, all threads are forcefully terminated. ===== pthreads ===== The POSIX standard defines an API for threads called **[[https://en.wikipedia.org/wiki/Pthreads|pthreads]]**. While each operating system has its own implementation of threads, virtually any Unix-like system implements the POSIX Threads API. \\ The Linux implementation of threads is called [[https://en.wikipedia.org/wiki/Native_POSIX_Thread_Library|NPTL]]. Apart from the standard pthread functions, it offers several Linux-specific functions (whose names end with ''_np'' to indicate non-portability). **To compile any program that uses pthreads, one had to add the ''-pthread'' option when invoking the compiler and linker.** \\ This changed with version 2.34 of glibc, which moved the POSIX thread routines into the standard C library (libc). ==== Creating a thread ==== To start a new thread, one must specify the function that will be executed by the thread. 
\\ The function must take a pointer to arbitrary data as an argument and return a pointer to arbitrary data((To specify a pointer without specifying the type of the underlying data one has to use ''void *''.)):
void * start_routine (void * argument){
…
return some_pointer;
}
pthread.h
''int **pthread_create**(pthread_t *restrict //threadIdentifier//,'' \\
'' const pthread_attr_t *restrict //attr//,'' \\
'' void *(*//start_routine//)(void *),'' \\
'' void *restrict //arg//);''\\
It will write the thread identifier to the variable pointed to by ''//threadIdentifier//'',
start a new thread and run ''//start_routine//(//arg//)'' in it. If the ''//attr//''
argument is not NULL, the thread attributes it describes will be set before starting the thread.
#include
#include
#include
#include
#include
// data handed to the thread: a number and a short text
// (at most 15 characters plus the terminating NUL)
struct myStruct {
    int i;
    char s[16];
};

// thread entry point - pthread_create only accepts functions that take
// a 'void *' and return a 'void *'.
// rawArg must point to a malloc'ed 'struct myStruct'; the function prints
// its contents, releases it, and has nothing to report back (returns NULL).
void *func(void *rawArg) {
    struct myStruct *payload = (struct myStruct *)rawArg;
    int number = payload->i;
    const char *label = payload->s;
    printf("%d %s\n", number, label);
    // the creator gave the structure away, so this thread disposes of it
    free(payload);
    return NULL;
}
// Starts one thread that runs func() on a heap-allocated argument, then ends
// only the main thread, so the process lives on until the new thread finishes.
// Returns EXIT_FAILURE when the argument cannot be allocated or the thread
// cannot be created.
int main() {
    pthread_t ti; // pthread_t is a type that stores thread identifiers ('long unsigned int' in Linux)

    // to pass several data to a thread, a structure is created on heap
    struct myStruct *t1_arg = malloc(sizeof *t1_arg);
    if (t1_arg == NULL) {
        perror("malloc");
        return EXIT_FAILURE;
    }
    t1_arg->i = 42;
    strcpy(t1_arg->s, "foo baz bar");

    // the following line creates a new thread and runs func(t1_arg) in the thread.
    // pthread functions report failure through the returned error number
    int err = pthread_create(&ti, NULL, func, t1_arg);
    if (err != 0) {
        fprintf(stderr, "pthread_create: %s\n", strerror(err));
        free(t1_arg); // func never ran, so nobody else will free the argument
        return EXIT_FAILURE;
    }

    // terminating the main thread (or any other) with pthread_exit does not terminate the process
    pthread_exit(NULL);
}
++++
pthread.h
''int **pthread_join**(pthread_t //thread//, void %%**%%//value_ptr//);''
\\
Calling ''pthread_join'' waits until the thread ''//thread//'' terminates
and writes its return value to the ''void *'' pointer pointed to by ''//value_ptr//''.
The ''//value_ptr//'' can be NULL, and then the return value is discarded.
#include
#include
#include
#include
#include
// this just does computation, there's nothing about threads here - but since
// several threads call it at the same time, it must not use shared state.
//
// Draws one million random points from the unit square and adds to *hit the
// number of points that fell inside the quarter circle of radius 1, and to
// *miss the number of those that fell outside. The counters are only
// incremented, so the caller has to zero them beforehand.
void monteCarloPi(uint64_t *hit, uint64_t *miss) {
    enum { SAMPLES = 1000000 };

    struct timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);
    // srand()/rand() operate on a single generator shared by the whole
    // process; rand_r() keeps the generator state in a variable of the caller,
    // so every call of this function gets a private random sequence.
    // The address of a local variable differs between threads (each has its
    // own stack), which keeps the seeds apart even for equal timestamps.
    unsigned int seed = (unsigned int)ts.tv_nsec ^ (unsigned int)(uintptr_t)&ts;

    for (int i = 0; i < SAMPLES; ++i) {
        double x = rand_r(&seed) / (double)RAND_MAX;
        double y = rand_r(&seed) / (double)RAND_MAX;
        if ((x * x + y * y) < 1)
            (*hit)++;
        else
            (*miss)++;
    }
}
// tallies produced by one computing thread
struct theResult {
    uint64_t hit;
    uint64_t miss;
};

// Thread entry point: runs the Monte Carlo computation and hands the tallies
// back as the thread's return value. 'arg' is not used.
// The result lives in malloc'ed memory - whoever joins the thread receives
// the pointer and is responsible for freeing it.
void *runPi(void *arg) {
    struct theResult *tally = malloc(sizeof(struct theResult));
    // both counters must start from zero, as monteCarloPi only increments them
    tally->miss = 0;
    tally->hit = 0;
    monteCarloPi(&tally->hit, &tally->miss);
    // memory on heap stays valid after this thread has finished
    return tally;
}
// Estimates pi with four concurrent Monte Carlo runs: three in extra threads
// (runPi) and one in the main thread, then sums up all tallies and prints the
// estimate.
int main() {
    pthread_t ti[3];
    uint64_t hit = 0, miss = 0;
    for (int i = 0; i < 3; ++i)
        pthread_create(ti + i, NULL, runPi, NULL);
    // the main thread does its share of work while the other threads run
    monteCarloPi(&hit, &miss);
    for (int i = 0; i < 3; ++i) {
        // pthread_join stores the return value of the thread as a 'void *',
        // hence it is received in a variable of exactly that type (passing
        // the address of a 'struct theResult *' cast to 'void **' would make
        // pthread_join write through a pointer of a mismatched type)
        void *raw = NULL;
        // wait until thread ti[i] exits, and write its return value to raw
        pthread_join(ti[i], &raw);
        // a 'void *' converts to any object pointer type without a cast
        struct theResult *result = raw;
        // use the result
        hit += result->hit;
        miss += result->miss;
        // free the memory (allocated by runPi)
        free(result);
    }
    printf("%f\n", 4. * hit / (hit + miss));
    return 0;
}
++++
pthread.h
''int **pthread_detach**(pthread_t //thread//);''
\\
The ''pthread_detach'' function returns without waiting. The value returned by
the detached thread will be discarded.
\\
A thread can detach itself (i.e. call ''pthread_detach(pthread_self());''), but
typically the thread that spawned it calls the detach.
// _GNU_SOURCE is a feature test macro of the C library, not a compiler
// switch: when defined before the first #include, the library headers also
// declare their GNU-specific, non-standard functions.
// NOTE(review): presumably required here for pthread_setname_np and
// pthread_getname_np used below - confirm against their manual pages.
#ifdef __GNUC__ // whenever GNU Compiler or compatible is detected,
#define _GNU_SOURCE // request the GNU extensions of the C library.
#endif
#include
#include
#include
#include
#include
#include
// Thread entry point: writes a numbered "BZZZZZTT" line to stderr ten times
// a second, forever - the function never returns on its own. 'arg' is unused.
void *buzzer(void *arg) {
#ifdef _GNU_SOURCE // naming threads is possible only with the GNU extensions
    pthread_setname_np(pthread_self(), "buzzer"); // label this thread "buzzer"
#endif
    for (int tick = 1;; ++tick) {
        usleep(100000); // 0.1 s
        fprintf(stderr, "%06d BZZZZZTT\n", tick);
    }
}
// Handler for SIGUSR1: reports on stderr which thread received the signal and
// then terminates that thread (and only that thread). 'num' - the signal
// number - is not used.
// NOTE(review): pthread_getname_np and pthread_exit are not on the POSIX list
// of async-signal-safe functions; fine for a demonstration, but verify before
// reusing this pattern.
void usr1(int num) {
// restore the default reaction to SIGUSR1, so this handler runs only once
signal(SIGUSR1, SIG_DFL);
// 33 bytes = 17 characters of the prefix + 16 bytes reserved for the name
char name[33] = "USR1 received by ";
#ifdef _GNU_SOURCE
// append the thread's name right after the prefix (offset 17), using no more
// than the remaining 16 bytes; without GNU extensions the name is left empty
pthread_getname_np(pthread_self(), name + 17, 16);
#endif
// replace the terminating NUL with a newline; the text needs no terminator
// any more, because write() is told the exact number of bytes to output
int len = strlen(name); name[len] = '\n';
write(STDERR_FILENO, name, len + 1);
pthread_exit(NULL); // terminate this thread
}
// Runs the buzzer in a detached thread for a second, then stops that thread
// by sending it SIGUSR1 (handled by usr1), and exits a second later.
int main() {
    // start the buzzing thread
    pthread_t buzzerThread;
    pthread_create(&buzzerThread, NULL, buzzer, NULL);
    // nobody is going to join it, so let the system reclaim it automatically
    pthread_detach(buzzerThread);
    sleep(1);
    // the handler is installed for the whole process - threads share signal handlers
    signal(SIGUSR1, usr1);
    // deliver the signal to this one thread rather than to the process
    pthread_kill(buzzerThread, SIGUSR1);
    sleep(1);
    return 0;
}
++++
#include
#include
#include
// Data shared between the updating thread and main. Nothing protects it -
// this program is one of the examples that the exercise on mutexes asks to fix.
// 'version' counts how many times the text was (re)written.
unsigned long long version;
// 1019 letters plus a terminator: the last byte is never written by the
// updater, so it keeps the zero that global variables start with
char text[1020];
// Thread entry point: forever rewrites 'text' with consecutive letters
// 'a'..'z', bumping 'version' before each rewrite. Never returns; 'arg' is
// unused.
void *updater(void * arg) {
while (1)
for (char letter = 'a'; letter <= 'z'; ++letter) {
version++;
// fill the buffer byte by byte, leaving the final terminator untouched;
// a reader running at the same time can observe a half-rewritten text
for (int i = 0; i < 1020 - 1; ++i)
text[i] = letter;
// disabled alternative that fills the buffer with a single call:
// memset(text, letter, 1020);
}
}
// Starts the updater thread and, each time a character is read from standard
// input, prints the current version and text. Ends on end of input; returning
// from main terminates the updater thread as well.
int main() {
pthread_t tid;
pthread_create(&tid, NULL, updater, NULL);
while (getchar() != EOF)
// 'version' and 'text' are read here without any synchronisation while the
// updater keeps changing them - the printed pair need not be consistent
printf("version: %llu\n text: %s\n", version, text);
return 0;
}
#include
#include
// Counter shared by all threads; starts at 0 as every global variable does.
unsigned long long counter;
// Thread entry point: increases the shared counter 1000 times and returns
// NULL. 'arg' is unused.
void *incrementer(void * arg) {
for (int i = 0; i < 1000; ++i)
// an unsynchronised read-modify-write: when threads interleave here,
// increments can be lost (the exercise on mutexes asks to fix this)
counter++;
return NULL;
}
// Runs 16 incrementer threads, waits for all of them and prints the final
// value of the shared counter.
int main() {
    enum { THREADS = 16 };
    pthread_t tid[THREADS];

    // start all the threads first...
    int started = 0;
    while (started < THREADS) {
        pthread_create(&tid[started], NULL, incrementer, NULL);
        ++started;
    }
    // ...and only then wait for each of them, so that they run concurrently
    for (pthread_t *t = tid; t != tid + THREADS; ++t)
        pthread_join(*t, NULL);

    printf("%llu\n", counter);
    return 0;
}
pthread.h
''pthread_mutex_t //mutex// = **PTHREAD_MUTEX_INITIALIZER**;''
\\
to initialize a mutex with default semantics, or an initialisation function:
\\
Needs header: pthread.h
''int **pthread_mutex_init**(pthread_mutex_t *restrict //mutex//,'' \\
'' const pthread_mutexattr_t *restrict //attr//)''
When using the ''pthread_mutex_init'' function, one can select non-default semantics
for a mutex.
\\
To do so, one has to declare a ''pthread_mutexattr_t'' variable and
initialize it using:
\\
Needs header: pthread.h
''int **pthread_mutexattr_init**(pthread_mutexattr_t *//attr//)''
\\
and finally set the attributes using corresponding functions:
* ''int pthread_mutexattr_set**type**(pthread_mutexattr_t *//attr//, int //type//)'' sets mutex type (see below),
* ''int pthread_mutexattr_set**pshared**(pthread_mutexattr_t *//attr//, int //pshared//)'' allows sharing mutexes between processes (they need to reside in shared memory),
* ''int pthread_mutexattr_set**robust**(pthread_mutexattr_t *//attr//, int //robust//)'' when a thread holding a mutex terminated, a robust mutex will return appropriate error instead of waiting,
* ''[[https://pubs.opengroup.org/onlinepubs/9799919799/functions/pthread_mutexattr_setprotocol.html|pthread_mutexattr_setprotocol]]'' and ''[[https://pubs.opengroup.org/onlinepubs/9799919799/functions/pthread_mutexattr_setprioceiling.html|setprioceiling]]'' deal with priorities.
The mutex type impacts what happens when a mutex that is already locked is
locked again by the same thread:
* ''PTHREAD_MUTEX_NORMAL'' - the thread will deadlock (with itself),
* ''PTHREAD_MUTEX_ERRORCHECK'' - locking the mutex will fail (i.e., return the error number ''EDEADLK''; pthread functions report errors through their return value and do not set ''errno''),
* ''PTHREAD_MUTEX_RECURSIVE'' - the mutex counts how many times it was locked, and will unlock after this many unlocks,
* ''PTHREAD_MUTEX_DEFAULT'' - it is undefined what will happen.
Some mutex types also guarantee failing((returning the error number ''EPERM'' from ''pthread_mutex_unlock''.)) to unlock a lock not owned by the current thread – all mutexes of type ''PTHREAD_MUTEX_ERRORCHECK'' and ''PTHREAD_MUTEX_RECURSIVE'' do that, as does any mutex set to be robust.
An example of creating a recursive mutex is as follows:
// describe the desired mutex: start from the default attributes and change
// only the type
pthread_mutexattr_t recursiveAttrs;
pthread_mutexattr_init(&recursiveAttrs);
pthread_mutexattr_settype(&recursiveAttrs, PTHREAD_MUTEX_RECURSIVE);
// create the mutex according to that description
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, &recursiveAttrs);
// the attributes object is needed only during initialisation; destroying it
// does not affect the mutex that was created from it
pthread_mutexattr_destroy(&recursiveAttrs);
pthread.h
''int **pthread_mutex_lock**(pthread_mutex_t *//mutex//)'' \\
''int pthread_mutex_trylock(pthread_mutex_t *//mutex//)''
\\
''int pthread_mutex_timedlock(pthread_mutex_t *restrict //mutex//, const struct timespec *restrict //abstime//)''
\\
If the ''//mutex//'' is unlocked, the functions lock the mutex. If ''//mutex//'' is
currently locked, ''pthread_mutex_lock'' waits until the mutex becomes unlocked,
''pthread_mutex_trylock'' immediately returns the error number ''EBUSY'',
and ''pthread_mutex_timedlock'' waits for the lock to become available no longer than until the absolute time //abstime// (and then returns ''ETIMEDOUT'').
\\
When a mutex is misused, locking it may fail; like all pthread functions, the locking functions
then return an error number directly (they do not set ''errno'').
To unlock a mutex, one has to use the function:
\\
Needs header: pthread.h
''int **pthread_mutex_unlock**(pthread_mutex_t *//mutex//)''
\\
It is important to remember that only the thread that locked the mutex can unlock it.
~~Exercise.#~~
Use a mutex to fix the programs from the two examples in the section
[[#accessing_the_same_data_from_multiple_threads|accessing the same data from multiple threads]].
#include
#include
#include
#include
// type of the values stored in the list
typedef int element_t;

// one element of the doubly linked list, with links to both neighbours
struct node {
    struct node *next;
    struct node *previous;
    element_t e;
};

// the list itself: pointers to its first and its last node
struct list {
    struct node *head;
    struct node *tail;
};
// Makes *l an empty list: it has neither a first nor a last node.
void list_init(struct list *l) {
    l->tail = NULL;
    l->head = NULL;
}
// Appends a new node holding 'e' at the end of list *l.
// Returns the new node, or NULL when there is no memory for it - the list is
// then left unchanged. The node is owned by the list and is released by
// list_delete_node.
struct node *list_add_back(struct list *l, element_t e) {
    struct node *newNode = malloc(sizeof *newNode);
    if (newNode == NULL)
        return NULL;
    newNode->next = NULL;
    newNode->e = e;
    if (l->tail == NULL) {
        // empty list: the new node becomes both the first and the last one
        newNode->previous = NULL;
        l->head = l->tail = newNode;
    } else {
        // hook the node behind the current last one and make it the new tail
        newNode->previous = l->tail;
        l->tail->next = newNode;
        l->tail = newNode;
    }
    return newNode;
}
// Unlinks node 'n' from list *l and frees it. 'n' must belong to the list.
void list_delete_node(struct list *l, struct node *n) {
    struct node *before = n->previous;
    struct node *after = n->next;

    // whatever pointed forward to n (the predecessor, or the list head when
    // n was the first node) now points to n's successor
    if (before != NULL)
        before->next = after;
    else
        l->head = after;

    // and symmetrically for the backward link
    if (after != NULL)
        after->previous = before;
    else
        l->tail = before;

    free(n);
}
// Removes from list *l every node whose value equals 'e'.
void list_delete_value(struct list *l, element_t e) {
    for (struct node *cur = l->head, *following; cur != NULL; cur = following) {
        // remember the successor first - 'cur' may be freed in a moment
        following = cur->next;
        if (cur->e == e)
            list_delete_node(l, cur);
    }
}
void list_print(struct list *l, int reverse) {
struct node *n = reverse ? l->tail : l->head;
while (n != NULL) {
printf("%d\n", n->e);
n = reverse ? n->previous : n->next;
}
}
// Thread entry point: keeps appending random values from the range 0..24 to
// the list passed as 'arg', pausing 10 microseconds between additions.
// Never returns.
void *adder(void *arg) {
    struct list *target = arg;
    srand(time(0));
    for (;;) {
        list_add_back(target, rand() % 25);
        usleep(10);
    }
}
// Thread entry point: keeps picking a random value from the range 0..24 and
// deleting all its occurrences from the list passed as 'arg', pausing
// 5 microseconds between rounds. Never returns.
void *remover(void *arg) {
    struct list *target = arg;
    srand(time(0) + 2);
    for (;;) {
        list_delete_value(target, rand() % 25);
        usleep(5);
    }
}
int main() {
struct list l;
list_init(&l);
pthread_t a, r;
pthread_create(&a, NULL, adder, &l);
pthread_create(&r, NULL, remover, &l);
pthread_detach(a);
pthread_detach(r);
while (getchar() != EOF)
list_print(&l, 0);
return 0;
}
// Thread entry point: every 50 microseconds removes the first node of the
// list passed as 'arg', if there is one. Never returns.
void *beheader(void *arg) {
    struct list *victim = arg;
    srand(time(0) + 4);
    for (;; usleep(50)) {
        if (victim->head == NULL)
            continue; // nothing to remove this round
        // the head is read from the list again for the actual removal
        list_delete_node(victim, victim->head);
    }
}
/* x, y and mtx are shared */
/* the programmer believed that eventually x equals y, but now x==1, y==0 and only those two processes run: */
/* Thread 1 */ │ /* Thread 2 */
char loop = 1; │ char loop = 1;
while(loop) { │ while(loop) {
pthread_mutex_lock(&mtx); │ pthread_mutex_lock(&mtx);
if (*x==0 && *y==0) { │ if (*x==1 && *y==1) {
/* do business logic */ │ /* do business logic */
loop = 0; │ loop = 0;
} │ }
pthread_mutex_unlock(&mtx); │ pthread_mutex_unlock(&mtx);
} │ }
…
10│ struct myIO {FILE *in; FILE *out;};
11│ void* threadFunc(void *num);
…
17│ struct {
18│ pthread_mutex_t mtx;
19│ char text[256];
20│ } item[5];
21│
22│ int main(int argc, char **argv) {
…
28│ for (intptr_t i = 1; i < 5; ++i) {
29│ pthread_t tid;
30│ pthread_create(&tid, NULL, threadFunc, (void *)i);
31│ pthread_detach(tid);
32│ }
33│ threadFunc(0);
34│ }
35│
36│ void *threadFunc(void* numRaw) {
37│ intptr_t num = (intptr_t) numRaw;
38│ struct myIO win = openWin(num);
…
48│ while (1) {
49│ fprintf(win.out, "> ");
50│
51│ char line[1024];
52│ fgets(line, 1024, win.in);
53│ line[strlen(line)-1] = 0;
54│
55│ int argOne = atoi(line);
…
61│ char *argTwoTxt = strchr(line, ' ');
62│
63│ if (!argTwoTxt) {
65│ pthread_mutex_lock(&item[argOne].mtx);
66│ fprintf(win.out, "T#%ld reads %d as: %s\n", num, argOne, item[argOne].text);
67│ pthread_mutex_unlock(&item[argOne].mtx);
68│ continue;
69│ }
70│
71│ argTwoTxt++;
72│ char *e;
73│ int argTwo = strtol(argTwoTxt, &e, 10);
74│
75│ if (!*e && argTwo < 5 && argTwo >= 0 && argOne != argTwo) {
77│ pthread_mutex_lock(&item[argOne].mtx);
78│ pthread_mutex_lock(&item[argTwo].mtx);
79│ fprintf(win.out, "T#%ld copies %d to %d\n", num, argTwo, argOne);
80│ memcpy(item[argOne].text, item[argTwo].text, sizeof(item[argOne].text));
81│ pthread_mutex_unlock(&item[argTwo].mtx);
82│ pthread_mutex_unlock(&item[argOne].mtx);
83│ } else {
85│ pthread_mutex_lock(&item[argOne].mtx);
86│ fprintf(win.out, "T#%ld assigns to %d the value: %s\n", num, argOne, argTwoTxt);
87│ memset(item[argOne].text, 0, sizeof(item[argOne].text));
88│ strncpy(item[argOne].text, argTwoTxt, sizeof(item[argOne].text) - 1);
89│ pthread_mutex_unlock(&item[argOne].mtx);
90│ }
91│ }
92│ }
…
pthread.h
''int **pthread_key_create**(pthread_key_t *//key//, void (*//destructor//)(void*))''
\\
''int **pthread_setspecific**(pthread_key_t //key//, const void *//value//)''
\\
''void %%*%%**pthread_getspecific**(pthread_key_t //key//)''
\\
Each key is initially associated with a ''NULL'' for each thread.
#include
#include
#include
#include
// key under which every thread keeps its own text
pthread_key_t mySpecificVal;

// Prints the text that the calling thread has stored under the key.
void printMine() {
    const char *mine = pthread_getspecific(mySpecificVal);
    printf("I have: %s\n", mine);
}
// Thread entry point: stores this thread's own text ("baz") under the key and
// prints it. 'arg' is unused; always returns NULL.
void *routine(void *arg) {
    char *ownText = malloc(4); // three letters and the terminator
    strcpy(ownText, "baz");
    // the value is attached to the calling thread only
    pthread_setspecific(mySpecificVal, ownText);
    printMine();
    return NULL;
}
// Shows that a value stored under a key is private to a thread: the main
// thread stores "foo", a second thread stores and prints its own text, and
// afterwards the main thread still sees "foo".
int main() {
    // create the key; 'free' is registered as the destructor for stored values
    pthread_key_create(&mySpecificVal, free);

    char *mainText = malloc(4);
    strcpy(mainText, "foo");
    pthread_setspecific(mySpecificVal, mainText);

    // let another thread set and print its own value...
    pthread_t other;
    pthread_create(&other, NULL, routine, NULL);
    pthread_join(other, NULL);

    // ...and check what the main thread has under the same key
    printMine();
    return 0;
}
++++
// waiting side: the condition is examined with the mutex held
pthread_mutex_lock(&mutex);
// a loop, so that the condition is checked again after every wake-up
while (!condition)
    pthread_cond_wait(&condvar, &mutex);
logic();
pthread_mutex_unlock(&mutex);
// signalling side: the condition is changed with the mutex held
pthread_mutex_lock(&mutex);
logic_that_changes_condition();
// tell a waiting thread to check the condition again
pthread_cond_signal(&condvar);
pthread_mutex_unlock(&mutex);
pthread.h
''pthread_cond_t //cond// = **PTHREAD_COND_INITIALIZER**;''
\\
to use the default semantics, or with an initialisation function that allows
choosing the desired semantics:
\\
Needs header: pthread.h
''int pthread_cond_init(pthread_cond_t *restrict //cond//, const pthread_condattr_t *restrict //attr//)''
A thread holding mutex //mutex// can use the function:
\\
Needs header: pthread.h
''int **pthread_cond_wait**(pthread_cond_t *restrict //cond//, pthread_mutex_t *restrict //mutex//)''
\\
to atomically release the //mutex// and start waiting on //cond//.
Another thread can use one of the functions:
\\
Needs header: pthread.h
''int **pthread_cond_signal**(pthread_cond_t *//cond//)''\\
''int **pthread_cond_broadcast**(pthread_cond_t *//cond//)''
\\
to wake at least one (''pthread_cond_signal''), or all (''pthread_cond_broadcast'')
threads waiting on the condition variable ''cond''.
\\
The thread that is signalling/broadcasting does not need to hold the mutex.
~~Exercise.#~~
The program below attempts to calculate 10 items (imagine that it tries to create
ten class timetables) among four threads, and runs one thread that waits until
the ten items are ready. __Right now, the program is incorrect.__
\\ • The program is not thread-safe. Propose an interleaving that loses an element.
\\ • Use mutex(es) to amend the program.
\\ • The thread collecting items wastes CPU on busy waiting. Point out the line that wastes CPU.
\\ • Add condition variable(s) to amend the program.
#include
#include
#include
#include
// one item of the singly linked queue of computed values
struct element {
    int value;
    struct element *next;
};

// first and last item of the queue; both NULL while the queue is empty
struct element *head = NULL;
struct element *tail = NULL;
// Appends value 'v' at the end of the global queue (head/tail).
// The function uses no synchronisation; it belongs to the program that the
// exercise declares incorrect and asks to repair.
void add_element(int v) {
struct element *e = malloc(sizeof(struct element));
e->next = NULL;
e->value = v;
// an empty queue is recognised by tail being NULL
if (tail == NULL)
head = tail = e;
else
// link the new item behind the current last one, then make it the tail
tail = tail->next = e;
}
// Removes the first item of the global queue and returns its value.
// The queue must not be empty: head is dereferenced without a check.
// The function uses no synchronisation; it belongs to the program that the
// exercise declares incorrect and asks to repair.
int take_element() {
int v = head->value;
struct element *e = head;
head = head->next;
// the only item was taken - the queue is empty again, so reset tail too
if (head == NULL)
tail = NULL;
free(e);
return v;
}
// Stand-in for a long computation: sleeps for a random time below 10 seconds
// and then, thanks to the comma operator, evaluates to a random number below
// one million.
#define DO_HEAVY_COMPUTATION (usleep(rand() % 10000000), rand() % 1000000)
// Thread entry point: forever "computes" a value and appends it to the global
// queue. Never returns on its own. 'arg' is converted to an integer and added
// to the seed of the random number generator.
void *worker_thread(void *arg) {
srand(time(0) + (intptr_t)arg);
while (1) {
int v = DO_HEAVY_COMPUTATION;
// explicit cancellation point placed between computing a value and adding it
// NOTE(review): presumably meant to let the pthread_cancel call in main stop
// the thread before it adds another element - confirm
pthread_testcancel();
add_element(v);
}
}
// Starts four worker threads, collects ten values computed by them, printing
// each one, and then cancels and joins the workers.
// The unsynchronised access to the queue and the busy waiting below are the
// flaws that the exercise asks to repair - they are left in place on purpose.
int main() {
    pthread_t t[4];
    // each worker receives its index: worker_thread adds the argument to the
    // seed it gives to srand, so passing NULL would give all of them the same
    // offset of 0
    for (int i = 0; i < 4; ++i)
        pthread_create(t + i, NULL, worker_thread, (void *)(intptr_t)i);
    int computed_count = 0;
    while (computed_count < 10) {
        while (head == NULL);
        int value = take_element();
        printf("%02d: %d\n", ++computed_count, value);
    }
    // request the termination of all workers first, then wait for each of them
    for (int i = 0; i < 4; ++i)
        pthread_cancel(t[i]);
    for (int i = 0; i < 4; ++i)
        pthread_join(t[i], NULL);
    return 0;
}
#include
#include
// mutex protecting shared_data, and the condition variable used to announce
// that shared_data has been set
mtx_t mutex;
cnd_t condition_variable;
// the character passed between the threads; 0 means "nothing yet"
char shared_data = 0;

int func(void *arg);

// Reads one character from standard input and hands it to a second thread
// (func) through shared_data, then waits for that thread to finish.
int main() {
    cnd_init(&condition_variable);
    mtx_init(&mutex, mtx_plain | mtx_recursive);

    thrd_t worker;
    thrd_create(&worker, func, NULL);

    char typed = getchar();

    // publish the character under the mutex...
    mtx_lock(&mutex);
    shared_data = typed;
    mtx_unlock(&mutex);
    // ...and wake the thread waiting for it
    cnd_signal(&condition_variable);

    thrd_join(worker, NULL);
    return 0;
}
// Thread entry point: waits until shared_data becomes non-zero, then prints
// that character with bit 0x20 flipped (which swaps the case of an ASCII
// letter). 'arg' is unused; returns thrd_success.
int func(void *arg) {
    mtx_lock(&mutex);
    // cnd_wait gives the mutex up while sleeping and holds it again when it
    // returns; the loop re-checks the data after every wake-up
    while (!shared_data)
        cnd_wait(&condition_variable, &mutex);
    char received = shared_data;
    mtx_unlock(&mutex);

    printf("%c\n", received ^ 0x20);
    return thrd_success;
}