GrabDuck

Считаем Пи параллельно. Часть 1

:


В этой серии постов мы попробуем решить одну простую задачу с помощью более-менее актуальных технологий параллельного программирования (Нативные потоки, OpenMP, TBB, MPI, CUDA, OpenCL, OpenACC, Chapel может быть еще что-нить экзотическое. Как бы сравнительно и в hands-on ключе.

Чтобы все влезло в один пост (в нескольких частях) и вообще могло быть совокупно охвачено вниманием проблема была выбрана умышленно тривиальная. Поэтому многие аспекты упомянутых технологий и главное трудности которые возникают при реальном прграммировании современных массивно-параллельных систем останутся за кадром. Также не стоит пользоваться приведённым и примерами в качестве бенчмарка «GPU против CPU» или что-то в этом духе. Единственная цель — показать «как оно работает». Пост рассчитан на людей с параллельным программированием не очень знакомых. Под катом будет много кода. Собственно считать мы будем число Пи путем численного интегрирования image

Исходные коды также доступны на github.com/undertherain/pi (будут пополнятся по мере написания следующих частей поста).

Последовательная версия.


Давайте для начала сделаем последовательную версию. Т.е. такую, которая использует одно ядро обычного центрального процессора. Возьмем самый простой из методов численного инетгрирования — метод прямоугольников и закодим его на языке Си (вообще дальше будем пользоваться Си\С++-ным суржиком в виду ряда причин.
Объявим некоторое количество cntSteps прямоугольников на которые мы разобъем нашу площадь под интегралом, посчитаем основание:
step = 1./static_cast<double>(cntSteps);

и всю площадь, посчитав значение функии в каждом прямоугольнике и домножив на основание.

for (unsigned long i=0; i<cntSteps; i++)
{
    x = ( i + .5 ) *step;
    sum = sum + 4.0/(1.+ x*x);
}

Впрочем, домножим на основание step мы лучше «за скобками», т.е. за циклом — вот в принципе и все.

Вот полный код программы:

#include <iostream>
#include <iomanip>
#include <sys/times.h>
#include <cmath>

int main(int argc, char** argv)
{
    const unsigned long cntSteps=500000000;    /* # of rectangles */
    double step; 
    const double PI25DT = 3.141592653589793238462643; //reference Pi value
    double pi=0;
    double sum=0.0; 
    double x;
    std::cout<< "calculating pi on CPU single-threaded\n";
    //засекаем время
    clock_t clockStart, clockStop;  
    tms tmsStart, tmsStop; 
    clockStart = times(&tmsStart);
    //вычисляем шаг интегрирования и интегральную сумму
    step = 1./static_cast<double>(cntSteps);
    for (unsigned long i=0; i<cntSteps; i++)
    {
        x = (i + .5)*step;
        sum = sum + 4.0/(1.+ x*x);
    }
    pi = sum*step;
    //снова засекаем время и печатаем результат
    clockStop = times(&tmsStop);
    std::cout << "The value of PI is " << pi << " Error is " << fabs(pi - PI25DT) << "\n";
    std::cout << "The time to calculate PI was " ;
    double secs= (clockStop - clockStart)/static_cast<double>(sysconf(_SC_CLK_TCK));
    std::cout << secs << " seconds\n";
    return 0;
}

Копия по ссылке http://dumpz.org/195276/.

Native Threads


Самая, пожалуй, доступная простым пользователям параллельная архитектура — это обычный многоядерный процессор (кажется сейчас уже трудно найти процессор с одним ядром) или несколько процессоров на одной материнской плате. В принципе и одно ядро способно исполнять параллельно несоколько потоков — такой режим называется псевдо-параллельным или конкурентным. Ядро переключается между процессами (процесс — это грубо говоря находящаяся в памяти программа), выделяя каждому квант времени. В принципе такой режим выполнения уже может привести к росту производительности за счет сокрытия латентности памяти, если не на обычных «домашних» процессорах, то на специализированных многопоточных, но об этом позже. В нашем случае избыточное количество потоков привело бы к замедлению из-за накладных расходов на переключение между потоками.

Самый «исторический» способ исползьовать сразу несколько ядер на процессоре — это механизм потоков операционной системы, который сущесвовал задолго до истинно-параллельных процессоров для конкуренности хотя бы ради более удобного написания программ. С точки зрения программиста важно то, что параллельные потоки, исполняемые на разных ядрах или процессорах видят одно и то же адресное пространство, т.е нет нужды явно передавать данные между потоками. Зато если вдруг разные потоки пишут\читают одну и ту же переменную — то придётся озаботиться синхронизацией.

Ладно, давайти ближе к коду: с точки зрения языка Си поток — это обычная функция или метод класса, удовлетворяющая определённому прототипу. Давайте назовём ее static void * worker ( void * ptrArgs ), аргументом она получает указатель на структуру, в которой можно сделать полей сколько нужно чтобы передать потоку его аргументы. В нашем случае мы скажем потоковой функии в каком диапазоне считать наш интеграл. В теле потоковой функции — уже известный нам цикл, собсно и считающий по диапазону который мы ему передали в параметрах.

for (long long i=args->left; i<args->right; i++)
{
    x = (i + .5)*step;
    sum = sum + 4.0/(1.+ x*x);
}

Интервал интегрирования для каждого потока мы заранее вычислим изходя из его порядкового номера. Если один из потоков посчитает свою часть раньше — то соответсвующее ядро будет простаивать, т.е. мы потеряем производительность. Идеально было бы разделить интервал на много маленьих участков и раздавать потокам по мере того как они справляются со своей работой. Но пока оставим как есть.

arrArgsThread[idThread].left  = idThread*cntStepsPerThread;
arrArgsThread[idThread].right = (idThread+1)*cntStepsPerThread;

На исполнение отдельным потоком функция запускается через системный вызов pthread_create в случае POSIX (в линуксе, например) или в случае windows этто будет аналогичный вызов из Win32 API, вуглядеть будет немного по-другому, но в целом похоже.

Результат будем из каждого потока прибавлять к общем перменной pi += sum * step ; (помним что мы находимся в общем адресном пространстве).

Чтобы память не испортилась в случае если два потока будут одновременно изменять одну ячейко нам надо как-то гарантировать что в один момент времени только один поток получает доступ к переменной pi, т.е. создать так называемую «критическую секцию». Для это можно воспользовться специальным механизмом операционной системы под названием мьютекс (от слова mutual exclusion) — если один поток заблокировал мьютекс, другой поток будет ждать (на попытке получить мьютекс самому) пока первый поток его не «освободит».

pthread_mutex_lock(&mutexReduction);
pi += sum*step;
pthread_mutex_unlock(&mutexReduction);

Итого выходит как-то так:
#include <iostream>
#include <iomanip>
#include <cmath>
#include <cstdlib>
#include <pthread.h>
#include <sys/times.h>

#define cntThreads 4

pthread_mutex_t mutexReduction;
double pi=0.;
//параметры которые мы передадим потоковой функции
struct ArgsThread
{
    long long left,right;
    double step;
};
//потоковая функция
static void *worker(void *ptrArgs)
{
    ArgsThread * args = reinterpret_cast<ArgsThread *>(ptrArgs);
    double x;
    double sum=0.;
    double step=args->step;
    for (long long i=args->left; i<args->right; i++)
    {
        x = (i + .5)*step;
        sum = sum + 4.0/(1.+ x*x);
    }
    pthread_mutex_lock(&mutexReduction);
    pi += sum*step;
    pthread_mutex_unlock(&mutexReduction);
    return NULL;
}

int main(int argc, char** argv)
{
    const unsigned long num_steps=500000000;    
    const double PI25DT = 3.141592653589793238462643;
    pthread_t threads[cntThreads];
    ArgsThread arrArgsThread[cntThreads];
    std::cout<<"POSIX threads. number of threads = "<<cntThreads<<std::endl;
    clock_t clockStart, clockStop;
    tms tmsStart, tmsStop;
    clockStart = times(&tmsStart);
    double step = 1./(double)num_steps;
    long long cntStepsPerThread= num_steps / cntThreads;
    //для каждого потока вычисляем его диапазон интегрирования и запускаем поток
    for (unsigned int idThread=0; idThread<<cntThreads; idThread++)
    {
        arrArgsThread[idThread].left  = idThread*cntStepsPerThread;
        arrArgsThread[idThread].right = (idThread+1)*cntStepsPerThread;
        arrArgsThread[idThread].step = step;
        if (pthread_create(&threads[idThread], NULL, worker, &arrArgsThread[idThread]) != 0)
        {
            return EXIT_FAILURE;
        }
    }
    //ждем когда все потоки завершаться с помощью функции join
    for (unsigned int idThread=0; idThread<cntThreads; idThread++)
    {
        if (pthread_join(threads[idThread], NULL) != 0)
        {
            return EXIT_FAILURE;
        }
    }
    //печатаем!
    clockStop = times(&tmsStop);
    std::cout << "The value of PI is " << pi << " Error is " <<  fabs(pi - PI25DT) << std::endl;
    std::cout << "The time to calculate PI was " ;
    double secs= (clockStop - clockStart)/static_cast<double>(sysconf(_SC_CLK_TCK));
    std::cout << secs << " seconds\n" << std::endl;
    return 0;
}

Копия на http://dumpz.org/195404/ на случай если у кото-то моё адово форматироваие неровно отображается.

На самом деле конкретно в этом примере (но так везти будет не всегда) можно было обойтись без мьютексов вообще если сохранять в каждом потоке результат в отдельную переменную (элемент массива ArgsThread arrArgsThread [ cntThreads ];
) а потом, дождавшись завершения всех потоков — просуммировать что получилось.

Вот код без мьютексов:

#include <iostream>
#include <iomanip>
#include <cmath>
#include <cstdlib>
#include <pthread.h>
#include <sys/times.h>

#define cntThreads 4

struct ArgsThread
{
    long long left,right;
    double step;
    double partialSum;
};

static void *worker(void *ptrArgs)
{
    ArgsThread * args = reinterpret_cast<ArgsThread *>(ptrArgs);
    double x;
    double sum=0.;
    double step=args->step;
    for (long long i=args->left; i<args->right; i++)
    {
        x = (i + .5)*step;
        sum = sum + 4.0/(1.+ x*x);
    }
    args->partialSum=sum*step;
    return NULL;
}

int main(int argc, char** argv)
{
    const unsigned long num_steps=500000000;
    const double PI25DT = 3.141592653589793238462643;
    pthread_t threads[cntThreads];
    ArgsThread arrArgsThread[cntThreads];
    std::cout<<"POSIX threads. number of threads = "<<cntThreads<<std::endl;
    clock_t clockStart, clockStop;
    tms tmsStart, tmsStop;
    clockStart = times(&tmsStart);
    double step = 1./(double)num_steps;
    long long cntStepsPerThread= num_steps / cntThreads;
    for (unsigned int idThread=0; idThread<cntThreads; idThread++)
    {
        arrArgsThread[idThread].left  = idThread*cntStepsPerThread;
        arrArgsThread[idThread].right = (idThread+1)*cntStepsPerThread;
        arrArgsThread[idThread].step = step;
        if (pthread_create(&threads[idThread], NULL, worker, &arrArgsThread[idThread]) != 0)
        {
            return EXIT_FAILURE;
        }
    }
    double pi=0.;
    for (unsigned int idThread=0; idThread<cntThreads; idThread++)
    {
        if (pthread_join(threads[idThread], NULL) != 0)
        {
            return EXIT_FAILURE;
        }
        pi +=arrArgsThread[idThread].partialSum;
    }
    clockStop = times(&tmsStop);
    std::cout << "The value of PI is " << pi << " Error is " << fabs(pi - PI25DT) << std::endl;
    std::cout << "The time to calculate PI was " ;
    double secs= (clockStop - clockStart)/static_cast<double>(sysconf(_SC_CLK_TCK));
    std::cout << secs << " seconds\n" << std::endl;
    return 0;
}

И копия на http://dumpz.org/195415/.

Как видите, код получилася довольно громоздкий и некросплатформенный. Если последнее решается отчасти с помощью boost::threads(но не все хотят ставить boost) или в новом C++11 потоки вообще стали частью языка (очень здорово вышло на самом деле) — но большая часть софта пока еще использует старый C++. Но проблема громоздкости кода всё равно остается.

OpenMP


OpenMP — это расширение языка (C/C++/Fortran) позволяющее делать примерно то же самое что мы делали с помощью API потоков операционной системы — но гораздо проще и лаконичней с помощью так называемых прагм. Прагма как бы говорит компилатору «взять вот этот код и исполнять параллельно» — и копмилятор делает все остальное.
Для распараллеливания цикла for в нашем первом последовательном примере достаточно добавить туда одну строчку:
#pragma omp parallel for private (x), reduction (+:sum)
for (int i=0; i<numSteps; i++)
{
    x = (i + .5)*step;
    sum = sum + 4.0/(1.+ x*x);
}

эта прагма говорит, что нужно распараллелить витки цикла, переменную x сделать приватной для каждого потока, по переменной sum потом провести редукцию (или как это по-русски?) по суммированию. Т.е. сначала создать для каждого потока по копии — а потом их сложить. Примерно то же самое, что мы сделали в прошлом примере без мьютекосв. Также OpenMP предоставляет небольшой API для сервисных нужд.
#include <iostream>
#include <iomanip>
#include <sys/times.h>
#include <cmath>
#include <omp.h>

int main(int argc, char** argv)
{
    const unsigned long numSteps=500000000;                     /* default # of rectangles */
    double step;
    double PI25DT = 3.141592653589793238462643;
    double pi=0;
    double sum=0.0;
    double x;

    #pragma omp parallel
    {
    #pragma omp master
        {
            int cntThreads=omp_get_num_threads();
            std::cout<<"OpenMP. number of threads = "<<cntThreads<<std::endl;
        }
    }

    clock_t clockStart, clockStop;
    tms tmsStart, tmsStop;
    step = 1./static_cast<double>(numSteps);
    clockStart = times(&tmsStart);
    #pragma omp parallel for private (x), reduction (+:sum)
    for (int i=0; i<numSteps; i++)
    {
        x = (i + .5)*step;
        sum = sum + 4.0/(1.+ x*x);
    }
    pi = sum*step;
    clockStop = times(&tmsStop);
    std::cout << "The value of PI is " << pi << " Error is " << fabs(pi - PI25DT) << std::endl;
    std::cout << "The time to calculate PI was " ;
    double secs= (clockStop - clockStart)/static_cast<double>(sysconf(_SC_CLK_TCK));
    std::cout << secs << " seconds\n" << std::endl;
    return 0;
}

Копия http://dumpz.org/195550/.
Компилировать с опцией -fopenmp в случае g++, с случае другого компилятора обратитесь к руководству пользователя.

Вопросы и комментарии приветсвуются, продолжение — следует!