Мы предлагаем ещё один способ использования вычислительных задач, который имитирует go-задачи, реализованные в языке Go. Использование go-task для выполнения вычислительных задач не требует определения входных и выходных данных; все данные передаются через аргументы функции.
class WFTaskFactory
{
...
public:
template<class FUNC, class... ARGS>
static WFGoTask* create_go_task(const std::string& queue_name,
FUNC&& func, ARGS&&... args);
};
Аргумент queue_name
представляет имя очереди вычислений, которое было рассмотрено ранее в примерах документов.
Функция func
может быть указателем на функцию, объектом функции, функтором, лямбда-функцией, членом класса и любым другим вызываемым объектом.
Аргументы args
представляют список аргументов для func
. Обратите внимание, что если func
является нестатическим методом класса, первый аргумент args
должен быть адресом экземпляра класса.
Предположим, мы хотим асинхронно запустить функцию сложения void add(int a, int b, int& res);
,
а также распечатать результат после завершения этой функции. Это можно сделать следующим образом:
#include <stdio.h>
#include <utility>
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"
void add(int a, int b, int& res)
{
res = a + b;
}
int main(void)
{
WFFacilities::WaitGroup wait_group(1);
int a = 1;
int b = 1;
int res;
WFGoTask* task = WFTaskFactory::create_go_task("test", add, a, b, std::ref(res));
task->set_callback([&](WFGoTask* task) {
printf("%d + %d = %d\n", a, b, res);
wait_group.done();
});
task->start();
wait_group.wait();
return 0;
}
Приведённый выше пример демонстрирует асинхронное выполнение функции сложения, печать результата и завершение программы. Использование go-task мало отличается от других задач, но имеет поле user_data
.
Единственная особенность заключается в том, что при создании go-task callback не передается, но его можно установить с помощью set_callback
.
Если какой-либо из аргументов функции является ссылкой, следует использовать std::ref
, чтобы избежать преобразования в значение, это характерно для C++11.
Пользователи могут использовать только go-task, тем самым превращая workflow в пул потоков, где количество потоков равно количеству ядер процессора машины.
Однако этот пул потоков обладает большими возможностями по сравнению с обычным пулем, такими как наличие имени очереди для каждой задачи, возможность создания различных последовательностей и параллельных зависимостей между задачами.
С помощью интерфейса create_timedgo_task
(необходимо использовать вместо create_go_task
) можно создать go-task с ограничением времени выполнения:
class WFTaskFactory
{
/* Создание 'Go'-задачи с ограничением времени выполнения в секундах и наносекундах.
* Если время выполнения превышает установленное значение, состояние будет WFT_STATE_SYS_ERROR, а ошибка - ETIMEDOUT. */
template<class FUNC, class... ARGS>
static WFGoTask* create_timedgo_task(time_t seconds, long nanoseconds,
const std::string& queue_name,
FUNC&& func, ARGS&&... args);
};
Для создания go-task с ограничением времени выполнения требуется два дополнительных аргумента: seconds
и nanoseconds
.
Если время выполнения функции достигнет значения seconds + nanoseconds
, задача вернет управление обратно через callback, и состояние будет WFT_STATE_SYS_ERROR
, а ошибка — ETIMEDOUT
.
Обратите внимание, что фреймворк не может прервать выполнение задачи пользователя. Функция продолжит свое выполнение до конца, но больше не будет вызывать callback. Также, диапазон значений nanoseconds
находится в [0, 10^9)
.
Когда мы добавляем ограничение времени выполнения к go-task, момент вызова callback может произойти раньше окончания выполнения функции func
, а также раньше окончания серии задач, к которой принадлежит эта задача.
Это может привести к ошибкам, если мы будем обращаться к серии задач внутри функции func
. Например:
void f(SeriesWork* series)
{
series->set_context(...); // Ошибка. Когда f является go-task с ограничением времени выполнения, серия может уже быть недействительной.
}
int http_callback(WFHttpTask* task)
{
SeriesWork* series = series_of(task);
WFGoTask* go = WFTaskFactory::create_timedgo_task(1, 0, "test", f, series); // Go-task с ограничением времени выполнения в 1 секунду
series_of(task)->push_back(go);
}
Поэтому мы не рекомендуем выполнять операции над серией задач внутри функции func
. Все такие операции должны выполняться внутри callback, например:
int main()
{
WFGoTask* task = WFTaskFactory::create_timedgo_task(1, 0, "test", f);
task->set_callback([](WFGoTask* task) {
SeriesWork* series = series_of(task);
void* context = series->get_context();
if (task->get_state() == WFT_STATE_SUCCESS) { // Успешное выполнение
...
} else { // state == WFT_STATE_SYS_ERROR && error == ETIMEDOUT // Превышение времени выполнения
...
}
});
}
Однако использование задачи внутри функции func
безопасно. Поэтому можно использовать task->user_data
для передачи данных между функцией func
и callback, например:
int main()
{
WFGoTask* task = WFTaskFactory::create_timedgo_task(1, 0, "test", [&task](){
task->user_data = (void*)123;
});
task->set_callback([](WFGoTask* task){
SeriesWork* series = series_of(task);
void* context = series->get_context();
if (task->get_state() == WFT_STATE_SUCCESS){ // Успешное выполнение
int result = (int)task->user_data;
} else { // state == WFT_STATE_SYS_ERROR && error == ETIMEDOUT // Превышение времени выполнения
...
}
});
task->start();
...
}
В некоторых случаях нам может потребоваться получить доступ к задаче внутри функции выполнения go-task, как показано в примере выше, записывая результат выполнения в поле user_data
задачи.
В приведённом выше примере используется захват по ссылке. Однако захват по ссылке имеет некоторые проблемы, такие как жизненный цикл задачи. Мы предпочитаем захватывать указатель на go-task напрямую внутри функции выполнения.
Значение захвата явно неверно, например:
WFGoTask* task = WFTaskFactory::create_timedgo_task(1, 0, "test", [task](){
task->user_data = (void*)123;
});
~~~Этот код не позволяет получить указатель на задачу внутри лямбда-функции, поскольку захват происходит до того, как задача была инициализирована. Однако мы можем достичь этого с помощью следующего кода:
```cpp
WFGoTask* task = WFTaskFactory::create_timedgo_task(1, 0, "test", nullptr); // Функция выполнения может быть инициализирована как nullptr
WFTaskFactory::reset_go_task(task, [task]() {
task->user_data = (void*)123;
});
```
Функция `WFTaskFactory::reset_go_task()` предназначена для изменения функции выполнения go-task. Поскольку задача уже создана, захват указателя на задачу внутри лямбда-функции теперь корректен.
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )