Semaphores

What is a semaphore

A semaphore is a mechanism for controlling concurrency. It has an associated number that multiple processes / threads can increment and decrement concurrently. Incrementing a semaphore never blocks, and neither does decrementing a semaphore whose value is positive.
Decrementing a semaphore whose value is zero waits until the value becomes nonzero, that is, until another process / thread increments the semaphore.

The semaphore is an important theoretical primitive for synchronizing processes or threads. Semaphores are classified as either binary or counting: the former may have a value of 0 or 1, and the latter may have any non-negative value.
Real-world programming libraries usually build semaphores on top of other synchronization primitives, offer only counting semaphores, and leave the behaviour undefined in several cases where semaphores are used incorrectly.

POSIX standardizes two kinds of semaphores: POSIX semaphores and System V semaphores. While the C++ standard library includes semaphores (since C++20), there are no plans to include them in the standard C library (unlike other synchronisation primitives, which became part of C11). C++ semaphore semantics are defined with respect to threads only.


Semantics of the operation         Name of the operation in:
                                   theory          POSIX          C++ / Java
increment                          V (verhogen)    sem_post       release
wait until nonzero and decrement   P (proberen)    sem_wait       acquire
read the value                                     sem_getvalue   (none) / availablePermits

POSIX semaphore API

There are two ways of creating a POSIX semaphore:

  • Named semaphore: create / open a semaphore associated with an identifier using:
    Needs headers: semaphore.h and fcntl.h
    sem_t *sem_open(const char *name, int oflag);
    sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value);

    These functions work just like opening a file / opening a shared memory object.
    The mode and value arguments are required when oflag contains O_CREAT; value is the initial value of the semaphore.
  • Unnamed semaphore: create a variable of the sem_t type in (usually shared) memory and initialize it using:
    Needs header: semaphore.h
    int sem_init(sem_t *sem, int pshared, unsigned value);
    If pshared is zero, the semaphore is only guaranteed to work among threads of this process. If pshared is nonzero, the semaphore is guaranteed to work for threads of distinct processes as well, provided it is placed in memory shared by those processes. Again, value sets the initial value of the semaphore.
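As an illustration of the named variant, here is a minimal sketch of an open-or-create helper. The function name open_or_create and the 0600 permission bits are assumptions made for this example, not part of the POSIX API. If the semaphore already exists, sem_open ignores the mode and value arguments.

```c
#include <fcntl.h>
#include <semaphore.h>
#include <stdio.h>

/* Hypothetical helper: opens the named semaphore, creating it with the
 * given initial value if it does not exist yet. Returns NULL on failure. */
sem_t *open_or_create(const char *name, unsigned int initial) {
  /* O_CREAT without O_EXCL: an existing semaphore is simply opened. */
  sem_t *sem = sem_open(name, O_CREAT, 0600, initial);
  if (sem == SEM_FAILED) {
    perror("sem_open");
    return NULL;
  }
  return sem;
}
```

A call such as open_or_create("/mySem", 0) can be made by every cooperating process; only the first call determines the initial value.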

To decrement a semaphore (possibly waiting in the process), call:
Needs header: semaphore.h
int sem_wait(sem_t *sem);
POSIX semaphores also offer non-blocking (sem_trywait) and time-restricted (sem_timedwait) variants of the sem_wait function.

To increment a semaphore, call:
Needs header: semaphore.h
int sem_post(sem_t *sem);

It is possible to read the current value of a semaphore (this is useful for logging and debugging purposes) using:
Needs header: semaphore.h
int sem_getvalue(sem_t *sem, int *sval);
Notice that the value is written to *sval (like most POSIX functions, sem_getvalue returns 0 on success and -1 on error).
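A small logging helper might look like the sketch below; the name log_value is an assumption for this example. Keep in mind that another process / thread may change the semaphore right after the read, so the printed value can already be stale and should not be used to make synchronization decisions.

```c
#include <semaphore.h>
#include <stdio.h>

/* Hypothetical helper: prints the current value of the semaphore to stderr
 * and returns it, or returns -1 if sem_getvalue fails. */
int log_value(const char *label, sem_t *sem) {
  int value;
  if (sem_getvalue(sem, &value) == -1) {
    perror("sem_getvalue");
    return -1;
  }
  fprintf(stderr, "%s: semaphore value = %d\n", label, value);
  return value;
}
```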

Once an unnamed semaphore is no longer needed by any process, it shall be destroyed with sem_destroy.
Once a named semaphore is no longer needed by this process, it shall be closed with sem_close.
Once the name of a named semaphore is no longer needed, it shall be removed with sem_unlink (see footnote 1).
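The difference between closing and unlinking a named semaphore can be sketched as follows. The function name unlink_while_open is hypothetical. The sketch removes the name immediately after creating the semaphore and shows that the still-open semaphore keeps working until it is closed.

```c
#include <fcntl.h>
#include <semaphore.h>

/* Hypothetical demo: creates a named semaphore, removes its name right
 * away, and keeps using the open semaphore. Returns 0 on success. */
int unlink_while_open(const char *name) {
  /* O_EXCL makes the call fail if the name is already taken. */
  sem_t *sem = sem_open(name, O_CREAT | O_EXCL, 0600, 1);
  if (sem == SEM_FAILED)
    return -1;

  int ok = 0;
  if (sem_unlink(name) == -1) /* the name is gone from now on ... */
    ok = -1;
  if (sem_wait(sem) == -1)    /* ... but the open semaphore still works */
    ok = -1;
  if (sem_post(sem) == -1)
    ok = -1;
  if (sem_close(sem) == -1)   /* last close: the system can now free it */
    ok = -1;
  return ok;
}
```

After the call returns, opening the same name without O_CREAT fails, because the name no longer exists.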

To compile programs that use semaphores with older glibc versions, one must add -pthread to the compiler options.

Exercise 1 Write a program that creates a named semaphore with an initial value of 0 and waits on it, and a program that opens the named semaphore and posts it.

Exercise 2 Write a program that allows adding or retrieving data to/from a ring buffer located in shared memory and synchronizes processes accessing the buffer by means of named semaphores.
A non-concurrent ring buffer code is provided below:

non_concurrent_ring_buffer.c
#include <signal.h>
#include <stdio.h>
#include <string.h>
 
#define MAX_ELEMENTS_PLUS_ONE 4
typedef char item_t[256];
 
struct ring_buffer {
  item_t data[MAX_ELEMENTS_PLUS_ONE];
  size_t first;
  size_t last;
};
 
void initBuffer(struct ring_buffer *buffer) {
  memset(buffer, 0, sizeof(*buffer));
}
 
void put(struct ring_buffer *buffer, const item_t *item) {
  if ((buffer->last + 1) % MAX_ELEMENTS_PLUS_ONE == buffer->first)
    // the buffer is full, now what?
    raise(SIGUSR1);
  memcpy(&buffer->data[buffer->last], item, sizeof(*item));
  buffer->last = (buffer->last + 1) % MAX_ELEMENTS_PLUS_ONE;
}
 
void get(struct ring_buffer *buffer, item_t *item) {
  if (buffer->last == buffer->first)
    // the buffer is empty, what now?
    raise(SIGUSR2);
  memcpy(item, &buffer->data[buffer->first], sizeof(*item));
  buffer->first = (buffer->first + 1) % MAX_ELEMENTS_PLUS_ONE;
}
 
int main() {
  struct ring_buffer buffer;
  initBuffer(&buffer);
  printf("g             gets data from buffer\n"
         "ptext...      puts 'text...' to buffer\n");
  while (1) {
    printf("> ");
    fflush(stdout);
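    char cmd[2] = {0}; // stays empty if the line has no command character
    int c; // int, not char, so that EOF can be told apart from valid characters
    item_t item = {0};
    scanf("%1[^\n]%255[^\n]", cmd, item);
    do { // this reads all remaining characters in this line including '\n'
      c = getchar();
      if (c == EOF)
        return 0;
    } while (c != '\n');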
    switch (cmd[0]) {
    case 'p':
      put(&buffer, &item);
      break;
    case 'g':
      get(&buffer, &item);
      printf("%s\n", item);
      break;
    }
  }
  return 0;
}

Exercise 3 Analyze the program below. The program is not correct – when run concurrently multiple times, one of its instances can block forever. Add a sleep function (or run the program in a debugger) so that the problem manifests.

bad_idea.c
#include <fcntl.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <unistd.h>
 
struct data {
  sem_t sem;
} *data;
 
int main() {
  int fd = shm_open("/myData", O_RDWR | O_CREAT | O_EXCL, 0666);
  char createSucceeded = (fd != -1);
  if (fd == -1) fd = shm_open("/myData", O_RDWR, 0666);
  if (fd == -1) { perror("shm_open"); exit(1); }
 
  ftruncate(fd, sizeof(struct data));
  data = mmap(NULL, sizeof(struct data), PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);
 
  if (createSucceeded)
    sem_init(&data->sem, 1, 1);
 
  sem_wait(&data->sem);
  printf("I'm alone here!\n");
  sem_post(&data->sem);
 
  return 0;
}

[extra] Concurrency, parallelism

The aim of parallel programming is to take advantage of the fact that a computer can execute two or more sequences of instructions at the same time (literally).
One needs at least two physical threads (for instance, two CPU cores) for this.

Contention is the situation when two processes / threads want to use the same resource (say, the same variable) at the same time.

Concurrent programming deals with processes / threads that contend on some resources (and run either interleaved or in parallel).
Concurrency also concerns computers that have one physical thread (a single CPU core) whenever they can switch context from one process / thread to another.

When two processes / threads access the same data without synchronization and at least one of the accesses modifies the data, a race condition occurs: the result of the computation depends on the timing of the accesses.
Race conditions often lead to an incorrect state of the data and thus introduce bugs (that are usually hard to pinpoint).

Regions of code that must be executed without being interleaved with each other are called critical sections. At most one process at a time may be inside a critical section.

When a process P wants to enter a critical section, and another process Q is executing a critical section, then P must wait (aka block) until Q exits the critical section.
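A semaphore with an initial value of 1 is a common way to guard a critical section. The sketch below uses threads rather than processes for brevity; the names worker and run_counter_demo and the thread and iteration counts are assumptions made for this example. Without the sem_wait / sem_post pair the increments could interleave and some updates would be lost.

```c
#include <pthread.h>
#include <semaphore.h>

#define THREADS 4
#define INCREMENTS 50000

static sem_t mutex;  /* binary semaphore, initial value 1 */
static long counter; /* shared data, touched only inside the critical section */

static void *worker(void *arg) {
  (void)arg;
  for (int i = 0; i < INCREMENTS; ++i) {
    sem_wait(&mutex); /* enter the critical section (blocks if occupied) */
    ++counter;        /* read-modify-write that must not interleave */
    sem_post(&mutex); /* leave the critical section */
  }
  return NULL;
}

/* Hypothetical demo: runs the workers and returns the final counter value,
 * or -1 if the semaphore or a thread could not be created. */
long run_counter_demo(void) {
  pthread_t threads[THREADS];
  counter = 0;
  if (sem_init(&mutex, 0, 1) == -1)
    return -1;
  int started = 0;
  for (; started < THREADS; ++started)
    if (pthread_create(&threads[started], NULL, worker, NULL) != 0)
      break;
  for (int i = 0; i < started; ++i)
    pthread_join(threads[i], NULL);
  sem_destroy(&mutex);
  return started == THREADS ? counter : -1;
}
```

With the semaphore in place, run_counter_demo() returns THREADS * INCREMENTS on every run.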

When a number of processes / threads are waiting on synchronisation primitives (say, blocked by sem_wait), and the only processes / threads that could wake them (say, by executing sem_post) are themselves among the waiting ones, then a deadlock has occurred.
A deadlock usually means that the processing stops.

It is also possible to run into a state in which processes don't stop, but can't make progress either. This is called a livelock. Imagine the following code:

/* x, y and sem are shared */
/* the programmer believed that eventually x equals y, but now x==1, y==0 and only those two processes run: */
/* Process 1 */               │    /* Process 2 */
char loop = 1;                │    char loop = 1;
while(loop) {                 │    while(loop) {
  sem_wait(&sem);             │      sem_wait(&sem);
  if (*x==0 && *y==0) {       │      if (*x==1 && *y==1) {
    /* do business logic */   │        /* do business logic */
    loop = 0;                 │        loop = 0;
  }                           │      }
  sem_post(&sem);             │      sem_post(&sem);
}                             │    }

Fairness describes whether all processes / threads have an equal chance to enter the critical section. Sometimes priorities are intentionally given to selected processes / threads. An incorrect implementation of priorities may lead to priority inversion.
If the algorithm is completely unfair to some process / thread, that process / thread can starve, that is, never enter the critical section under contention.

Critical sections, deadlocks

Exercise 4 Which lines of code are the critical section in the code below?

Exercise 5 Spot the deadlock scenario in the code below.

Exercise 6 Fix the code so that it no longer is able to deadlock.

another_bad_idea.c
#include <fcntl.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <unistd.h>
 
struct data {
  atomic_bool initialized;
  struct {
    sem_t sem;
    char text[256];
  } item[5];
} *data;
 
int main(int argc, char **argv) {
  if (argc < 2 || argc > 3) {
    printf("Usage:\n"
           "  %s <num>             prints <text> from item <num>\n"
           "  %s <num> <text>      puts <text> to item <num>\n"
           "  %s <num1> <num2>     copies to item <num1> the text from item <num2>\n",
           argv[0], argv[0], argv[0]);
    return 1;
  }
 
  int fd = shm_open("/fiveItems", O_RDWR | O_CREAT | O_EXCL, 0666);
  char createSucceeded = (fd != -1);
  if (fd == -1) fd = shm_open("/fiveItems", O_RDWR, 0666);
  if (fd == -1) { perror("shm_open"); exit(1); }
 
  ftruncate(fd, sizeof(struct data));
  data = mmap(NULL, sizeof(struct data), PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);
 
  if (createSucceeded) {
    for (int i = 0; i < 5; ++i)
      sem_init(&data->item[i].sem, 1, 1);
    data->initialized = 1;
  } else
    while (!data->initialized);
 
  int argOne = atoi(argv[1]);
  if (argOne >= 5 || argOne < 0) return 1;
 
  if (argc == 2) {
    // print an item
    sem_wait(&data->item[argOne].sem);
    printf("%s\n", data->item[argOne].text);
    sem_post(&data->item[argOne].sem);
    return 0;
  }
 
  char *e;
  int argTwo = strtol(argv[2], &e, 10);
  if (*e == 0 && argTwo < 5 && argTwo >= 0 && argOne != argTwo) {
    // copy item to item
    sem_wait(&data->item[argOne].sem);
    sem_wait(&data->item[argTwo].sem);
    memcpy(data->item[argOne].text, data->item[argTwo].text, sizeof(data->item[argOne].text));
    sem_post(&data->item[argTwo].sem);
    sem_post(&data->item[argOne].sem);
  } else {
    // assign new value
    sem_wait(&data->item[argOne].sem);
    memset(data->item[argOne].text, 0, sizeof(data->item[argOne].text));
    strncpy(data->item[argOne].text, argv[2], sizeof(data->item[argOne].text) - 1);
    sem_post(&data->item[argOne].sem);
  }
 
  return 0;
}

1) The semaphore itself remains intact, and will be automatically destroyed as soon as no process has the semaphore open.
os_cp/semaphores.txt · Last modified: 2023/05/10 02:23 by jkonczak