Bezbolesne Programowanie Współbieżne¶

Wykład dla koła DSG, 9 maj 2012, Politechnika Poznańska
Kilka prostych technik które pozwalają w pełni wykorzystać wielordzeniowe procesory ale bez przejmowania się tymi wszystkimi problemami które powszechnie kojarzą się z programowaniem współbieżnym. W końcu w dzisiejszych czasach większość procesorów z jakimi się spotykamy jest wielordzeniowa, natomiast większość programistów nie wykorzystuje potencjału wielu rdzeni, bo programowanie współbieżne jest trudne.
Tematyka przydatna każdemu kto myśli o pisaniu programów. Nie jest wymagana żadna wiedza wstępna, poza absolutnymi podstawami programowania.
Techniki które będę pokazywał to “wątki funkcyjne” i pamięć transakcyjna. Technologia to głównie język Python, oraz przykłady w językach Java i C#.
Prezentujący¶
Koło DSG¶
Więcej informacji na temat koła DSG i organizowanych przez nas wykładach znajdziesz na wiki: http://dsg.cs.put.poznan.pl/wiki .
Aby otrzymywać Dołącz do naszego http://dsg.cs.put.poznan.pl/forum lub publicznej listy dyskusyjnej koła naukowego (skisr-kolo@libra.cs.put.poznan.pl): formularz zgłoszeniowy.
Przykłady¶
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | #!/usr/bin/env python
from os import listdir
files = listdir()
from hashlib import md5
from _thread import start_new_thread # python2: thread
from threading import Semaphore, Barrier
hashes = [None] * len(files)
semaphore, barrier = Semaphore(4), Barrier(len(files) + 1)
def make_hash(i):
# There's a problem with this semaphore, but you're going to have to figure
# it out for yourself. Tough cookies.
with semaphore:
source = open(files[i], 'rb')
data = source.read()
source.close()
hashes[i] = md5(data).digest()
barrier.wait()
for i in range(0, len(files)):
start_new_thread(make_hash, (i,))
barrier.wait()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 | #!/usr/bin/env python
from random import choice, randint
from threading import Thread
from Axon.STM import Store # Kamaelia (Axon 1.7)
from Axon.STM import BusyRetry, ConcurrentUpdate
class Transaction (Thread):
def __init__(self, f, a):
Thread.__init__(self)
self._f = f
self._a = a
def run(self):
while True:
try:
self._f(*self._a)
except ConcurrentUpdate:
continue
except BusyRetry:
continue
break
ACCOUNTS = 100
TRANSFERS = 10
accounts = Store()
def init(ids, sum):
_accounts = accounts.using(*ids)
[_accounts[i].set(sum) for i in ids]
_accounts.commit()
def total(ids):
_acccounts = accounts.using(*ids)
total = sum([_accounts[i] for i in ids])
_accounts.commit()
def transfer(a, b, sum):
_accounts = accounts.using(a, b)
_accounts[a].set(_accounts[a].value - sum)
_accounts[b].set(_accounts[b].value + sum)
_accounts.commit()
ids = list(range(0, ACCOUNTS))
init(ids, 200)
transactions = []
for i in range(0, TRANSFERS):
a, b = choice(ids), choice(ids)
s = randint(5, 20)
t = Transaction(transfer, (a, b, s))
t.start()
transactions.append(t)
t = Transaction(total, (ids,))
t.start()
transactions.append(t)
[t.join() for t in transactions]
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 | import java.io.FileInputStream;
import java.io.InputStream;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore;
public class Hashes {
private static String createHash(String file) throws Exception {
InputStream fis = new FileInputStream(file);
MessageDigest d = MessageDigest.getInstance("MD5");
byte[] buffer = new byte[1024];
for (int n = 0; n != -1;) {
n = fis.read(buffer);
if (n > 0) {
d.update(buffer, 0, n);
}
}
fis.close();
return d.digest().toString();
}
public static void main(String[] args) throws InterruptedException,
BrokenBarrierException {
final String[] files = args;
final List<String> hashes = new ArrayList<String>();
for (int i = 0; i < files.length; i++) {
hashes.add(null);
}
Collections.fill(hashes, null);
final Semaphore semaphore = new Semaphore(4);
final CyclicBarrier barrier = new CyclicBarrier(files.length + 1);
for (int i = 0; i < files.length; i++) {
final int index = i;
new Thread() {
public void run() {
try {
semaphore.acquire();
hashes.set(index, createHash(files[index]));
semaphore.release();
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
}
barrier.await();
}
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 | import java.util.LinkedList;
import java.util.List;
import java.util.Random;
// -javaagent:bin/deuceAgent.jar
public class Bank {
public static final int TRANSFERS = 10;
public static final int ACCOUNTS = 100;
private final double[] accounts = new double[ACCOUNTS];
public Bank() {
for (int i = 0; i < accounts.length; i++) {
accounts[i] = 200;
}
}
@Atomic
public void total() {
double total = 0d;
for (int i = 0; i < accounts.length; i++) {
total += accounts[i];
}
}
@Atomic
public void transfer(int a, int b, double sum) {
accounts[a] -= sum;
accounts[b] += sum;
}
public static void main(String[] args) throws InterruptedException {
final Random random = new Random();
final Bank bank = new Bank();
List<Thread> transactions = new LinkedList<Thread>();
for (int i = 0; i < TRANSFERS; i++) {
final int a = random.nextInt(ACCOUNTS);
final int b = random.nextInt(ACCOUNTS);
final double sum = random.nextDouble() % 15d + 5d;
Thread t = new Thread() {
public void run() {
bank.transfer(a, b, sum);
}
};
t.start();
transactions.add(t);
}
{
Thread t = new Thread() {
public void run() {
bank.total();
}
};
t.start();
transactions.add(t);
}
for (Thread t : transactions) {
t.join();
}
}
}
|