Bezbolesne Programowanie Współbieżne

../_images/painless-poster.jpg

Wykład dla koła DSG, 9 maj 2012, Politechnika Poznańska

Kilka prostych technik które pozwalają w pełni wykorzystać wielordzeniowe procesory ale bez przejmowania się tymi wszystkimi problemami które powszechnie kojarzą się z programowaniem współbieżnym. W końcu w dzisiejszych czasach większość procesorów z jakimi się spotykamy jest wielordzeniowa, natomiast większość programistów nie wykorzystuje potencjału wielu rdzeni, bo programowanie współbieżne jest trudne.

Tematyka przydatna każdemu kto myśli o pisaniu programów. Nie jest wymagana żadna wiedza wstępna, poza absolutnymi podstawami programowania.

Techniki które będę pokazywał to “wątki funkcyjne” i pamięć transakcyjna. Technologia to głównie język Python, oraz przykłady w językach Java i C#.

Prezentujący

Pliki

  • Prezentacja (PDF)

Koło DSG

Więcej informacji na temat koła DSG i organizowanych przez nas wykładach znajdziesz na wiki: http://dsg.cs.put.poznan.pl/wiki .

Aby otrzymywać Dołącz do naszego http://dsg.cs.put.poznan.pl/forum lub publicznej listy dyskusyjnej koła naukowego (skisr-kolo@libra.cs.put.poznan.pl): formularz zgłoszeniowy.

Przykłady

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env python

from os import listdir
files = listdir()

from hashlib import md5
from _thread import start_new_thread # python2: thread
from threading import Semaphore, Barrier

hashes = [None] * len(files)
semaphore, barrier = Semaphore(4), Barrier(len(files) + 1)

def make_hash(i):
    # There's a problem with this semaphore, but you're going to have to figure
    # it out for yourself. Tough cookies.
    with semaphore:
        source = open(files[i], 'rb')
        data = source.read()
        source.close()
        hashes[i] = md5(data).digest()
    barrier.wait()

for i in range(0, len(files)):
    start_new_thread(make_hash, (i,))

barrier.wait()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#!/usr/bin/env python

from random import choice, randint
from threading import Thread
from Axon.STM import Store # Kamaelia (Axon 1.7)
from Axon.STM import BusyRetry, ConcurrentUpdate

class Transaction (Thread):
    def __init__(self, f, a):
        Thread.__init__(self)
        self._f = f
        self._a = a
    def run(self):
        while True:
            try:
                self._f(*self._a)
            except ConcurrentUpdate:
                continue
            except BusyRetry:
                continue
            break

ACCOUNTS = 100
TRANSFERS = 10
accounts = Store()

def init(ids, sum):
    _accounts = accounts.using(*ids)
    [_accounts[i].set(sum) for i in ids]
    _accounts.commit()

def total(ids):
    _acccounts = accounts.using(*ids)
    total = sum([_accounts[i] for i in ids])
    _accounts.commit() 

def transfer(a, b, sum):
    _accounts = accounts.using(a, b)
    _accounts[a].set(_accounts[a].value - sum)
    _accounts[b].set(_accounts[b].value + sum)
    _accounts.commit()
    
ids = list(range(0, ACCOUNTS))
init(ids, 200)

transactions = []
for i in range(0, TRANSFERS):
    a, b = choice(ids), choice(ids)
    s = randint(5, 20)
    t = Transaction(transfer, (a, b, s))          
    t.start()
    transactions.append(t)

t = Transaction(total, (ids,))
t.start()
transactions.append(t)

[t.join() for t in transactions]
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import java.io.FileInputStream;
import java.io.InputStream;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore;

public class Hashes {

    private static String createHash(String file) throws Exception {
        InputStream fis = new FileInputStream(file);
        MessageDigest d = MessageDigest.getInstance("MD5");
        byte[] buffer = new byte[1024];
        for (int n = 0; n != -1;) {
            n = fis.read(buffer);
            if (n > 0) {
                d.update(buffer, 0, n);
            }
        }
        fis.close();
        return d.digest().toString();
    }

    public static void main(String[] args) throws InterruptedException,
                                                    BrokenBarrierException {
        final String[] files = args;

        final List<String> hashes = new ArrayList<String>();
        for (int i = 0; i < files.length; i++) {
            hashes.add(null);
        }
        Collections.fill(hashes, null);
        final Semaphore semaphore = new Semaphore(4);
        final CyclicBarrier barrier = new CyclicBarrier(files.length + 1);

        for (int i = 0; i < files.length; i++) {
            final int index = i;
            new Thread() {
                public void run() {
                    try {
                        semaphore.acquire();
                        hashes.set(index, createHash(files[index]));
                        semaphore.release();
                        barrier.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }.start();
        }
        
        barrier.await();
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import java.util.LinkedList;
import java.util.List;
import java.util.Random;

// -javaagent:bin/deuceAgent.jar
public class Bank {
	public static final int TRANSFERS = 10;
	public static final int ACCOUNTS = 100;

	private final double[] accounts = new double[ACCOUNTS];

	public Bank() {
		for (int i = 0; i < accounts.length; i++) {
			accounts[i] = 200;
		}
	}

	@Atomic
	public void total() {
		double total = 0d;
		for (int i = 0; i < accounts.length; i++) {
			total += accounts[i];
		}
	}

	@Atomic
	public void transfer(int a, int b, double sum) {
		accounts[a] -= sum;
		accounts[b] += sum;
	}

	public static void main(String[] args) throws InterruptedException {
		final Random random = new Random();
		final Bank bank = new Bank();

		List<Thread> transactions = new LinkedList<Thread>();

		for (int i = 0; i < TRANSFERS; i++) {
			final int a = random.nextInt(ACCOUNTS);
			final int b = random.nextInt(ACCOUNTS);
			final double sum = random.nextDouble() % 15d + 5d;

			Thread t = new Thread() {
				public void run() {
					bank.transfer(a, b, sum);
				}
			};
			t.start();
			transactions.add(t);
		}

		{
			Thread t = new Thread() {
				public void run() {
					bank.total();
				}
			};
			t.start();
			transactions.add(t);
		}

		for (Thread t : transactions) {
			t.join();
		}
	}
}