1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/sogou-workflow

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
tutorial-08-matrix_multiply.md 13 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
Отправлено 01.03.2025 10:20 552780a

Определение вычислительной задачи: matrix_multiply

Пример кода

tutorial-08-matrix_multiply.cc

Обзор matrix_multiply

Программа выполняет умножение двух матриц в коде выполнения и выводит результат на экран.
Основная цель примера — показать, как реализовать пользовательскую вычислительную задачу для ЦПУ.

Определение вычислительной задачи

Для определения вычислительной задачи требуется предоставить три основных элемента: INPUT, OUTPUT и routine.
INPUT и OUTPUT являются двумя шаблонными параметрами, которые могут быть любого типа. Routine представляет собой процесс преобразования от INPUT до OUTPUT, определяется следующим образом:

template <typename INPUT, typename OUTPUT>
class __WFThreadTask
{
    ...
    std::function<void(INPUT*, OUTPUT*)> routine;
    ...
};

Из этого можно видеть, что routine представляет собой простой процесс вычисления от INPUT до OUTPUT. Указатель на INPUT не является константным, но пользователи также могут передавать функции с указателем на const INPUT*.
Например, задача сложения может быть определена так:

struct add_input
{
    int x;
    int y;
};

struct add_output
{
    int res;
};

void add_routine(const add_input* input, add_output* output)
{
    output->res = input->x + input->y;
}

typedef WFThreadTask<add_input, add_output> add_task;

В нашем примере умножения матриц INPUT состоит из двух матриц, а OUTPUT — из одной матрицы. Это определено следующим образом:

namespace algorithm
{
    
using Matrix = std::vector<std::vector<double>>;

struct MMInput
{
    Matrix a;
    Matrix b;
};

struct MMOutput
{
    int error;
    size_t m, n, k;
    Matrix c;
};

void matrix_multiply(const MMInput* in, MMOutput* out)
{
    ...
}

}

Умножение матриц может иметь проблемы с незаконными входными данными, поэтому в OUTPUT добавлено поле error для представления ошибок.

Создание вычислительной задачи

После определения типов INPUT и OUTPUT, а также алгоритма, можно использовать фабрику WFThreadTaskFactory для создания вычислительной задачи.
Определение фабрики вычислительных задач приведено ниже:

template <typename INPUT, typename OUTPUT>
class WFThreadTaskFactory
{
private:
    using T = WFThreadTask<INPUT, OUTPUT>;
    
public:
    static T* create_thread_task(const std::string& queue_name,
                                 std::function<void(INPUT*, OUTPUT*)> routine,
                                 std::function<void(T*)> callback);
    
    static T* create_thread_task(time_t seconds, long nanoseconds,
                                 const std::string& queue_name,
                                 std::function<void(INPUT*, OUTPUT*)> routine,
                                 std::function<void(T*)> callback);
    ...
};

Здесь представлены два метода для создания задачи. Второй метод позволяет пользователям задавать ограничения времени выполнения задачи, который будет рассмотрен далее.
Этот класс требует двух шаблонных параметров INPUT и OUTPUT.
Сведения о queue_name были уже рассмотрены в предыдущих примерах. Routine представляет ваш процесс вычисления, а callback — обратный вызов.
В нашем примере мы видим использование этих методов:

using MMTask = WFThreadTask<algorithm::MMInput,
                             algorithm::MMOutput>;

using namespace algorithm;

int main()
{
    typedef WFThreadTaskFactory<MMInput, MMOutput> MMFactory;
    MMTask* task = MMFactory::create_thread_task("matrix_multiply_task",
                                                 matrix_multiply,
                                                 callback);
    
    MMInput* input = task->get_input();
    
    input->a = {{1, 2, 3}, {4, 5, 6}};
    input->b = {{7, 8}, {9, 10}, {11, 12}};
    ...
}

После создания задачи, используйте get_input() для получения указателя на входные данные. Это аналогично get_req() для сетевых задач.
Старт и завершение задачи ничем не отличаются от сетевых задач. Также обратный вызов очень прост:

void callback(MMTask* task)     // MMtask = WFThreadTask<MMInput, MMOutput>
{
    MMInput* input = task->get_input();
    MMOutput* output = task->get_output();
    
    assert(task->get_state() == WFT_STATE_SUCCESS);
    
    if (output->error)
        printf("Ошибка: %d %s\n", output->error, strerror(output->error));
    else
    {
        printf("Матрица A\n");
        print_matrix(input->a, output->m, output->k);
        printf("Матрица B\n");
        print_matrix(input->b, output->k, output->n);
        printf("Матрица A * Матрица B =>\n");
        print_matrix(output->c, output->m, output->n);
    }
}

Обычная вычислительная задача может игнорировать возможность неудачи, состояние завершения всегда SUCCESS.
В обратном вызове просто печатаются входные и выходные данные. Если входные данные некорректны, то выводится сообщение об ошибке.

Вычислительная задача с ограничением времени выполнения

Конечно, наша система не может прерывать выполнение пользовательской вычислительной задачи, поскольку это функция, которую пользователь должен самостоятельно обеспечивать корректное завершение.
Однако мы поддерживаем возможность установки временного лимита для задачи. Интерфейс с временем выполнения определён следующим образом:

template <typename INPUT, typename OUTPUT>
class WFThreadTaskFactory
{
private:
    using T = WFThreadTask<INPUT, OUTPUT>;
    
public:
    static T* create_thread_task(time_t seconds, long nanoseconds,
                                 const std::string& queue_name,
                                 std::function<void(INPUT*, OUTPUT*)> routine,
                                 std::function<void(T*)> callback);
    ...
};

Аргументы seconds и nanoseconds представляют временной лимит. Здесь значение nanoseconds находится в диапазоне [0,1000000000).
Если задача не завершается в течение установленного времени, она немедленно переходит в обратный вызов со статусом WFT_STATE_SYS_ERROR и кодом ошибки ETIMEDOUT.
Пример использования matrix_multiply с ограничением времени:

void callback(MMTask* task)     // MMtask = WFThreadTask<MMInput, MMOutput>
{
    MMInput* input = task->get_input();
    MMOutput* output = task->get_output();
    
    if (task->get_state() == WFT_STATE_SYS_ERROR && task->get_error() == ETIMEDOUT)
    {
        printf("Вышло время.\n");
        return;
    }
    
    assert(task->get_state() == WFT_STATE_SUCCESS);
    
    if (output->error)
        printf("Ошибка: %d %s\n", output->error, strerror(output->error));
    else
    {
        printf("Матрица A\n");
        print_matrix(input->a, output->m, output->k);
        printf("Матрица B\n");
        print_matrix(input->b, output->k, output->n);
        printf("Матрица A * Матрица B =>\n");
        print_matrix(output->c, output->m, output->n);
    }
}

using namespace algorithm;

int main()
{
    typedef WFThreadTaskFactory<MMInput, MMOutput> MMFactory;
    MMTask* task = MMFactory::create_thread_task(0, 1000000,
                                                 "matrix_multiply_task",
                                                 matrix_multiply,
                                                 callback);
    
    MMInput* input = task->get_input();
    
    input->a = {{1, 2, 3}, {4, 5, 6}};
    input->b = {{7, 8}, {9, 10}, {11, 12}};
    ...
}
```В этом примере время выполнения задачи ограничивается одной миллисекундой; в противном случае задача завершается со статусом WFT_STATE_SYS_ERROR.  
Еще раз напомним, что мы не прерываем выполнение пользовательской функции. Когда задача превышает время выполнения и переходит в обратный вызов, функция продолжает выполнять свои действия до завершения.  
Если пользователь хочет прекратить выполнение функции, ему следует добавить проверочные точки в коде. Например:

```cpp
void callback(MMTask* task)     // MMtask = WFThreadTask<MMInput, MMOutput>
{
    if (task->get_state() == WFT_STATE_SYS_ERROR && task->get_error() == ETIMEDOUT)
    {
        task->get_input()->flag = true;
        printf("Вышло время.\n");
        return;
    }
    ...
}

void matrix_multiply(const MMInput* in, MMOutput* out)
{
    while (!in->flag)
    {
        ....
    }
}

Симметричность алгоритмов и протоколов

В нашей системе алгоритмы и протоколы имеют высокую степень симметрии на очень абстрактном уровне.
Существуют пользовательские вычислительные задачи, а значит, существуют и пользовательские сетевые протоколы.
Пользовательская задача требует предоставления процесса вычисления, а пользовательский протокол требует предоставления процессов сериализации и десериализации данных.
Как в случае с пользовательским алгоритмом, так и в случае с пользовательским протоколом, важно подчеркнуть, что они должны быть чистыми и не содержать информации о задачах или сериях.

Композиционность вычислительных и сетевых задач

В данном примере мы создали вычислительную задачу с помощью фабрики WFThreadTaskFactory. Это самый простой способ создания вычислительной задачи, который обычно достаточно для большинства случаев.
Точно так же пользователи могут легко определить свой собственный протокол сервера и клиента.
Однако в предыдущем примере мы видели, что можем создать параллельную задачу сортировки с использованием фабрики алгоритмов, что невозможно сделать с помощью одного рутайна.
Также для сетевой задачи, такой как задача Kafka, может потребоваться взаимодействие с несколькими машинами, чтобы получить результат, однако для пользователя это полностью прозрачно.
Поэтому все наши задачи являются композиционными, если вы хорошо знакомы с нашей системой, вы можете создать множество сложных компонентов.

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/sogou-workflow.git
git@api.gitlife.ru:oschina-mirror/sogou-workflow.git
oschina-mirror
sogou-workflow
sogou-workflow
master