Параллельное программирование: взаимодействие процессов и нитей

Параллельное программирование: нити

Использование нитей и мьютексов из стандартной библиотеки STL языка C++

Программа-жеребьевка

Создаются две нити, первая 10 раз печатает строку "Heads", вторая 10 раз печатает "Tails", затем нити завершают работу. В перерывах между печатями каждая нить засыпает на случайное время от 1 до 500 миллисекунд. Обе нити используют общий мьютекс для синхронизации доступа к экрану терминала (чтобы исключить кашу при печати).

Выигрывает та нить, которая напечатает свое слово последней.

Исходный текст программы: "draw.cpp".

Тест генератора случайных чисел: "randDevice.cpp".

Параллельное вычисление интеграла по отрезку

Программа вычисляет интеграл по отрезку [a, b], создавая n нитей. Отрезок разбивается на n частей равной длины, каждая нить считает интеграл по своей части отрезка, используя формулу Симпсона. Нити работают параллельно. Окончательно интеграл вычисляется как сумма интегралов по частям отрезка, которые вычислены нитями.

Число используемых параллельно нитей вводится пользователем с клавиатуры, так же как и пределы и шаг интегрирования. Программа измеряет время вычисления интеграла, и можно экспериментально определить оптимальное число нитей для данного компьютера.

Исходный текст программы: "parallelIntegral.cpp".

Пример работы программы: вычисляется интеграл от функции sin(x) по отрезку [0, π] с шагом 0.00000001. Сначала интеграл вычисляется с использованием только одной нити (без распараллеливания), затем с помощью 4-х нитей.

    /home/vvb/Threads>g++ -o parallelIntegral parallelIntegral.cpp
    /home/vvb/Threads>./parallelIntegral
    Enter the number of threads: 1
    Enter integral limits a, b: 0 3.14159
    Enter integration step dx: 1e-8
    Integral = 2
    Duration 8895.12 milliseconds.

    /home/vvb/Threads>./parallelIntegral
    Enter the number of threads: 4
    Enter integral limits a, b: 0 3.14159
    Enter integration step dx: 1e-8
    Integral = 2
    Duration 3868.73 milliseconds.
В примере при распараллеливании с использованием 4-х нитей программа выполняется более чем в 2 раза быстрее.

Использование условных переменных и мьютекса в схеме Producer-Consumer

Схема Producer-Consumer (Поставщик-Обработчик) была впервые описана Е.Дейкстрой еще в 1965 г. (Е.Дейкстра был первым, кто рассматривал параллельное программирования и заложил основы его теории). В схеме взаимодействуют 2 параллельных процесса, каждый из них работает в бесконечном цикле. Процесс Producer порождает порцию информации и передает ее процессу Consumer через общий буфер. Процесс Consumer обрабатывает полученную порцию информации и ожидает следующей порции. Общий буфер может представлять собой очередь или в простейшем случае только один объект в разделяемой памяти, через который передается информация. Возможет также вариант, когда процессов Consumer несколько, каждый из которых может параллельно обрабатывать свою порцию информации, полученную из очереди (процесс Prooducer обычно только один).

Дейкстра предлагал реализацию работы этой схемы через семафоры. Используются два семафора: через первый семафор процесс Producer сигнализирует процессу Consumer о том, что подготовлена новая порция информации. Через второй семафор процесс Consumer оповещает процесс Producer о том, что он готов принять очередную порцию информации.

Современные программы чаще используют более удобную реализацию подобной схемы с помощью мьютекса m (от слова mutex) и условной переменной cv (от condition variable). Мьютекс m ограничивает доступ к общему буферу для только одного процесса, который в данный момент времени владеет мьютексом. Через условную переменную cv процессы передают сигнал друг другу об изменении статуса общего буфера — Producer о том, что в буфер помещена новая порция данных, Consumer о том, что очередная порция данных обработана и он готов принять новую порцию данных. Передающая нить оповещает принимающую нить, вызывая метод notify_one объекта "условная переменная":

    lock.unlock();
    cv.notify_one();
(перед вызовом notify_one() надо освободить мьютекс!).

Отметим, что данную схему можно реализовать с помощью одной условной переменной, через которую оповещается информация об изменении статуса буфера. В этом случае она используется как для передачи информации от Producer'а к Consumer'у (когда он добавляет очередую порцию данных в очередь), так и в обратную сторону (когда Consumer удаляет очередную порцию данных из очереди для обработки). Однако это не самая совершенная схема, поскольку при наличии нескольких Consumer'ов все они будут оповещаться не только о приходе новой порции данных, но и об удалении очередной порции из очереди — а это сообщение предназначено только Producer'у. Поэтому логичнее использовать две условные переменные: через первую Producer оповещает Consumer'ов о том, что он подготовил новую порцию данных и добавил ее в очередь для обработки; через вторую, наоборот, каждый из процессов-Consumer'ов оповещает Producer'а о том, что из очереди удалена порция данных для обрабтки. В случае, когда очередь имеет ограниченную длину (и в частном случае, когда для обмена испольуется всего лишь один объект), ее длина уменьшается и Producer имеет возможность добавить в очередь следующую порцию данных. В рассмотренном ниже примере Producer оповещает Consumer'а о новой порции данных через условную переменную

    condition_variable cv_lineReady;
(означающую "строка готова для обработки"). Consumer оповещает Producer'а о том, что строка обработана и он готов принять следующую строку, через условную переменную
    condition_variable cv_readyToReceive;

Каждая нить ждет оповещения об изменении статуса буфера с помощью метода wait объекта "условная переменная" cv:

    cv.wait(lock);
перед этим нить должна захватить мьютекс. Метод wait приостанавливает нить и временно освобождает мьютекс до того, как придет оповещение от другой нити; по завершении wait нить пробудится и ей снова будет возвращен мьютекс.

В примере ниже нить-Producer вводит строку с клавиатуры и передает ее нити-Consumer'у, которая инвертирует принятую строку и печатает ее, это повторяется в бесконечном цикле. Для окончания надо ввести пустую строку. Исходный текст программы:
"prodCons.cpp".

Пример работы программы:

    /home/vvb/Threads>g++ -o prodCons prodCons.cpp -lpthread
    /home/vvb/Threads>./prodCons
    Producer -- enter a string (empty for quit):
    Race car
    Consumer -- line Inverted:
    rac ecaR

    Producer -- enter a string (empty for quit):
    12345
    Consumer -- line Inverted: 
    54321

    Producer -- enter a string (empty for quit):

    Producer thread is finishing
    Consumer thread finished

Приведение матрицы к ступенчатому виду с использованием параллельного алгоритма, нитей, мьютексов и условных переменных

Программа приведения матрицы к ступенчатому виду использует параллельный алгоритм Гаусса с помощью нитей, а также мьютексы и условные переменные в качестве объектов синхронизации. Матрица читается из файла "input.txt", полученная ступенчатая матрица записывается в файл "output.txt". Программа просит человека ввести число нитей и в конце печатает время выполнения алгоритма в миллисекундах. Исходный код программы:
"parallelMatrixSTL.cpp"

Программа генерации случайной матрицы и записи ее в файл:
"generateMatrix.cpp"

Пример выполнения:

/home/vvb/Threads>g++ -o generateMatrix generateMatrix.cpp
/home/vvb/Threads>./generateMatrix
Generating a Random Matrix
Enter the number of matrix rows and columns: 8 8
Enter the range of matrix elements (minValue, maxValue): -1 1
-0.124 0.446 0.105 0.165 0.234 0.988 -0.998 -0.401
-0.169 0.469 0.425 0.534 -0.430 0.054 0.116 -0.066
0.197 -0.868 0.549 0.170 0.555 -0.768 0.081 0.833
-0.837 0.447 0.235 0.416 0.688 -0.306 0.416 0.841
0.658 -0.533 -0.937 0.598 -0.326 0.425 -0.262 0.023
0.966 -0.072 -0.162 -0.452 -0.744 -0.954 0.580 0.473
-0.881 0.165 0.102 0.823 -0.268 0.993 -0.554 0.746
-0.268 0.823 -0.369 -0.936 -0.803 -0.060 0.513 0.532
Enter the name of file to write a matrix in:
input.txt
Writing the random matrix to the file input.txt

/home/vvb/Threads>g++ -o parallelMatrixSTL parallelMatrixSTL.cpp
/home/vvb/Threads>./parallelMatrixSTL
Matrix of size 8*8:
8 8
    -0.124     0.446     0.105     0.165     0.234     0.988    -0.998    -0.401
    -0.169     0.469     0.425     0.534    -0.430     0.054     0.116    -0.066
     0.197    -0.868     0.549     0.170     0.555    -0.768     0.081     0.833
    -0.837     0.447     0.235     0.416     0.688    -0.306     0.416     0.841
     0.658    -0.533    -0.937     0.598    -0.326     0.425    -0.262     0.023
     0.966    -0.072    -0.162    -0.452    -0.744    -0.954     0.580     0.473
    -0.881     0.165     0.102     0.823    -0.268     0.993    -0.554     0.746
    -0.268     0.823    -0.369    -0.936    -0.803    -0.060     0.513     0.532
Enter number of threads: 4
Row echelon form of matrix:
8 8
     0.966    -0.072    -0.162    -0.452    -0.744    -0.954     0.580     0.473
     0.000    -0.853     0.582     0.262     0.707    -0.573    -0.037     0.737
     0.000     0.000    -1.157     0.757    -0.220     1.400    -0.636    -0.717
     0.000     0.000     0.000     1.059    -0.317     0.437    -0.192    -0.028
     0.000     0.000     0.000     0.000    -0.732    -0.105     0.041     1.262
     0.000     0.000     0.000     0.000     0.000    -1.173     0.796     2.072
     0.000     0.000     0.000     0.000     0.000     0.000    -0.523     2.126
     0.000     0.000     0.000     0.000     0.000     0.000     0.000     0.433
Rank of matrix: 8
Determinant of matrix: -0.196517
Computation time: 1.011 ms

Список задач.


Использование Unix-библиотеки pthreads (POSIX Threads)

Два простейших примера на создание и синхронизацию нитей

  • Пример 1: синхронизация с помощью мьютекса. Создаются две нити, каждая из них 10 раз печатает строку "Thread N", где N -- номер нити, и затем завершает работу. В перерывах между печатями нить засыпает на случайное время от одной до трех секунд. Обе нити используют общий мьютекс для синхронизации доступа к экрану терминала (чтобы исключить кашу при печати).
  • Пример 2: синхронизация с помощью семафоров. Создаются две нити. Первая вводит строку с клавиатуры терминала и помещает ее в массив в статической памяти. После этого вторая нить инвертирует введенную строку и печатает перевернутую строку на экран. Эти действия повторяются в бесконечном цикле. Первая нить сообщает второй о том, что строка введена, с помощью семафора "inputReady". Вторая нить информирует первую о том, что она готова принять следующую строку, используя семафор "invertReady"

Обе программы содержатся в архиве Threads.zip


Использование программного канала для передачи данных между процессами: pipe

Для иллюстрации передачи данных между процессами рассматривается простейшая игра крестики-нолики, реализованная в виде оконной программы над X-Window, тексты проекта в архиве "XO.zip". Исходный процесс создает два односторонних программных канала (канал крестиков и канал ноликов) с помощью двух вызовов pipe:

int fdX[2], fdO[2]; if (pipe(fdX) < 0) { perror("Pipe X error"); exit(-1); } if (pipe(fdO) < 0) { perror("Pipe O error"); exit(-1); } Затем процесс раздваивается с помощью вызова fork: int pid = fork(); if (pid < 0) { perror("Fork error"); exit(-1); } Родительский процесс, отвечающий за игру крестиками, в дальнейшем использует первый канал fdX для чтения, а второй канал fdO для передачи ходов партнеру по игре. Соответственно процесс-ребенок, отвечающий за игру ноликами, использует первый канал fdX для передачи, а второй канал fdO для чтения ходов. Поэтому процессы первым делом закрывают неиспользуемые концы каждой трубы, а номера каналов для чтения и записи запоминаются в переменных-членах readChannel и writeChannel класса MyWindow, представляющего окно программы на экране: if (pid != 0) { // Parent process close(fdO[0]); close(fdX[1]); w.readChannel = fdX[0]; w.writeChannel = fdO[1]; w.IAmX = true; // I am X } else { // Child process (I am O) close(fdO[1]); close(fdX[0]); w.readChannel = fdO[0]; w.writeChannel = fdX[1]; } Затем каждый из процессов создает окно, в котором отображается поле игры char field[3][3] в крестики-нолики размером 3x3. При щелчке мыши в родительском процессе в соответствующую клетку ставится крестик, и информация о сделанном ходе передается парному процессу. В процессе-ребенке в клетку ставится нолик. Логическая переменная-член IAmX графического окна указывает, является ли данный процесс родительским, т.е. играет ли он крестиками. // Process mouse click void MyWindow::onButtonPress(XEvent& event) { int x = event.xbutton.x; int y = event.xbutton.y; int ix = x / DX; // Coordinates of field cell int iy = y / DY; int c = 1; // Cross if (!IAmX) c = 2; // or Null field[ix][iy] = c; // Put cross/null into a cell char line[3]; line[0] = (char) ix; line[1] = (char) iy; line[2] = (char) c; if (!finished) { // Send data to peer process int res = write(writeChannel, line, 3); if (res < 0) { finished = true; printf("Write: res=%d (connection broken)\n", res); } } redraw(); } Для приема сделанного хода от парного процесса используется функция select, которая вызывается в цикле обработки событий, когда очередь оконных событий пуста. Функция select реализует псевдо-асинхронный режим ввода-вывода (если данные еще не пришли, функция завершается по таймауту и, таким образом, не блокирует работу графической программы). Используемый таймаут равен 0.05 сек: // Message loop XEvent e; while (GWindow::m_NumCreatedWindows > 0) { if (GWindow::getNextEvent(e)) { GWindow::dispatchEvent(e); } else { int maxFD = w.readChannel; fd_set readSet; FD_ZERO(&readSet); FD_SET(w.readChannel, &readSet); fd_set* r = &readSet; if (w.finished) { maxFD = 0; r = NULL; } timeval dt; // Timeout dt.tv_sec = 0; dt.tv_usec = 50000; // Maximal sleeping time 0.05 sec int res = select(maxFD + 1, r, 0, 0, &dt); if (res > 0) { printf("Select: res=%d\n", res); w.readMove(); // Read information sent by peer process } } } Если фунция select возвращает положительное число, то это означает, что пришли данные от парного процесса, т.е. информация о сделанном ходе. Эти данные считываются в методе readMove, в игровом поле ставится соответствующий знак и окно перерисовывается: void MyWindow::readMove() { if (!finished) { char move[3]; int res = read(readChannel, move, 3); if (res == 3) { int x = move[0]; int y = move[1]; int c = move[2]; field[x][y] = (char) c; redraw(); } else if (res <= 0) { // End of file or read error finished = true; printf("Read: res=%d (connection broken).\n", res); } } }