1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/sogou-workflow

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
about-go-task.md 11 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
Отправлено 01.03.2025 10:20 552780a

О go-task

Мы предлагаем ещё один способ использования вычислительных задач, который имитирует go-задачи, реализованные в языке Go. Использование go-task для выполнения вычислительных задач не требует определения входных и выходных данных; все данные передаются через аргументы функции.

Создание go-task

class WFTaskFactory
{
    ...
public:
    template<class FUNC, class... ARGS>
    static WFGoTask* create_go_task(const std::string& queue_name,
                                    FUNC&& func, ARGS&&... args);
};

Аргумент queue_name представляет имя очереди вычислений, которое было рассмотрено ранее в примерах документов.
Функция func может быть указателем на функцию, объектом функции, функтором, лямбда-функцией, членом класса и любым другим вызываемым объектом.
Аргументы args представляют список аргументов для func. Обратите внимание, что если func является нестатическим методом класса, первый аргумент args должен быть адресом экземпляра класса.

Пример

Предположим, мы хотим асинхронно запустить функцию сложения void add(int a, int b, int& res);,
а также распечатать результат после завершения этой функции. Это можно сделать следующим образом:

#include <stdio.h>
#include <utility>
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"

void add(int a, int b, int& res)
{
    res = a + b;
}

int main(void)
{
    WFFacilities::WaitGroup wait_group(1);
    int a = 1;
    int b = 1;
    int res;

    WFGoTask* task = WFTaskFactory::create_go_task("test", add, a, b, std::ref(res));
    task->set_callback([&](WFGoTask* task) {
        printf("%d + %d = %d\n", a, b, res);
        wait_group.done();
    });

    task->start();
    wait_group.wait();
    return 0;
}

Приведённый выше пример демонстрирует асинхронное выполнение функции сложения, печать результата и завершение программы. Использование go-task мало отличается от других задач, но имеет поле user_data.

Единственная особенность заключается в том, что при создании go-task callback не передается, но его можно установить с помощью set_callback.

Если какой-либо из аргументов функции является ссылкой, следует использовать std::ref, чтобы избежать преобразования в значение, это характерно для C++11.

Использование workflow как пула потоков

Пользователи могут использовать только go-task, тем самым превращая workflow в пул потоков, где количество потоков равно количеству ядер процессора машины.
Однако этот пул потоков обладает большими возможностями по сравнению с обычным пулем, такими как наличие имени очереди для каждой задачи, возможность создания различных последовательностей и параллельных зависимостей между задачами.

Go-task с ограничением времени выполнения

С помощью интерфейса create_timedgo_task (необходимо использовать вместо create_go_task) можно создать go-task с ограничением времени выполнения:

class WFTaskFactory
{
    /* Создание 'Go'-задачи с ограничением времени выполнения в секундах и наносекундах.
     * Если время выполнения превышает установленное значение, состояние будет WFT_STATE_SYS_ERROR, а ошибка - ETIMEDOUT. */
    template<class FUNC, class... ARGS>
    static WFGoTask* create_timedgo_task(time_t seconds, long nanoseconds,
                                         const std::string& queue_name,
                                         FUNC&& func, ARGS&&... args);
};

Для создания go-task с ограничением времени выполнения требуется два дополнительных аргумента: seconds и nanoseconds.
Если время выполнения функции достигнет значения seconds + nanoseconds, задача вернет управление обратно через callback, и состояние будет WFT_STATE_SYS_ERROR, а ошибка — ETIMEDOUT.
Обратите внимание, что фреймворк не может прервать выполнение задачи пользователя. Функция продолжит свое выполнение до конца, но больше не будет вызывать callback. Также, диапазон значений nanoseconds находится в [0, 10^9).

Когда мы добавляем ограничение времени выполнения к go-task, момент вызова callback может произойти раньше окончания выполнения функции func, а также раньше окончания серии задач, к которой принадлежит эта задача.

Это может привести к ошибкам, если мы будем обращаться к серии задач внутри функции func. Например:

void f(SeriesWork* series)
{
    series->set_context(...);   // Ошибка. Когда f является go-task с ограничением времени выполнения, серия может уже быть недействительной.
}

int http_callback(WFHttpTask* task)
{
    SeriesWork* series = series_of(task);
    WFGoTask* go = WFTaskFactory::create_timedgo_task(1, 0, "test", f, series);  // Go-task с ограничением времени выполнения в 1 секунду
    series_of(task)->push_back(go);
}

Поэтому мы не рекомендуем выполнять операции над серией задач внутри функции func. Все такие операции должны выполняться внутри callback, например:

int main()
{
    WFGoTask* task = WFTaskFactory::create_timedgo_task(1, 0, "test", f);
    task->set_callback([](WFGoTask* task) {
        SeriesWork* series = series_of(task);
        void* context = series->get_context();
        if (task->get_state() == WFT_STATE_SUCCESS) { // Успешное выполнение
            ...
        } else { // state == WFT_STATE_SYS_ERROR && error == ETIMEDOUT  // Превышение времени выполнения
            ...
        }
    });
}

Однако использование задачи внутри функции func безопасно. Поэтому можно использовать task->user_data для передачи данных между функцией func и callback, например:

int main()
{
    WFGoTask* task = WFTaskFactory::create_timedgo_task(1, 0, "test", [&task](){
        task->user_data = (void*)123;
    });
    task->set_callback([](WFGoTask* task){
        SeriesWork* series = series_of(task);
        void* context = series->get_context();
        if (task->get_state() == WFT_STATE_SUCCESS){ // Успешное выполнение
            int result = (int)task->user_data;
        } else { // state == WFT_STATE_SYS_ERROR && error == ETIMEDOUT  // Превышение времени выполнения
            ...
        }
    });
    task->start();
    ...
}

Изменение функции выполнения go-task

В некоторых случаях нам может потребоваться получить доступ к задаче внутри функции выполнения go-task, как показано в примере выше, записывая результат выполнения в поле user_data задачи.
В приведённом выше примере используется захват по ссылке. Однако захват по ссылке имеет некоторые проблемы, такие как жизненный цикл задачи. Мы предпочитаем захватывать указатель на go-task напрямую внутри функции выполнения.
Значение захвата явно неверно, например:

WFGoTask* task = WFTaskFactory::create_timedgo_task(1, 0, "test", [task](){
        task->user_data = (void*)123;
});
~~~Этот код не позволяет получить указатель на задачу внутри лямбда-функции, поскольку захват происходит до того, как задача была инициализирована. Однако мы можем достичь этого с помощью следующего кода:

```cpp
WFGoTask* task = WFTaskFactory::create_timedgo_task(1, 0, "test", nullptr);  // Функция выполнения может быть инициализирована как nullptr
WFTaskFactory::reset_go_task(task, [task]() {
    task->user_data = (void*)123;
});
```

Функция `WFTaskFactory::reset_go_task()` предназначена для изменения функции выполнения go-task. Поскольку задача уже создана, захват указателя на задачу внутри лямбда-функции теперь корректен.

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/sogou-workflow.git
git@api.gitlife.ru:oschina-mirror/sogou-workflow.git
oschina-mirror
sogou-workflow
sogou-workflow
master