При использовании workflow для написания асинхронных программ часто возникают следующие сценарии:
Все эти требования можно решить с помощью модуля ресурсного пула. Например, наш WFDnsResolver использует этот метод для контроля параллелизма запросов к DNS серверам.
Интерфейсы ресурсного пула определены в WFResourcePool.h:
class WFResourcePool
{
public:
WFConditional *get(SubTask *task, void **resbuf);
WFConditional *get(SubTask *task);
void post(void *res);
...
protected:
virtual void *pop()
{
return this->data.res[this->data.index++];
}
virtual void push(void *res)
{
this->data.res[--this->data.index] = res;
}
...
public:
WFResourcePool(void *const *res, size_t n);
WFResourcePool(size_t n);
...
};
Первый конструктор принимает массив ресурсов длиной n. Каждый элемент массива имеет тип void *. Внутри создается еще один массив такого же размера, который заполняется копиями переданного массива. Если все ваши начальные ресурсы являются nullptr, вы можете использовать второй конструктор, передав только n, вместо того чтобы создавать массив указателей, состоящий из nullptr. Пример внутренней реализации:
void WFResourcePool::create(size_t n)
{
this->data.res = new void *[n];
this->data.value = n;
...
}
WFResourcePool::WFResourcePool(void *const *res, size_t n)
{
this->create(n);
memcpy(this->data.res, res, n * sizeof (void *));
}
WFResourcePool::WFResourcePool(size_t n)
{
this->create(n);
memset(this->data.res, 0, n * sizeof (void *));
}
Пользователи используют интерфейс get()
для упаковки задачи в условную задачу. Условная задача представляет собой задачу, которая будет выполнена, когда её условие будет выполнено.
Интерфейс get()
может содержать второй параметр типа void **resbuf
, который используется для хранения полученного ресурса.
Затем пользователь просто заменяет исходную задачу на условную задачу и может запустить её или добавить в поток задач.
Обратите внимание, что условная задача пытается получить ресурс при своём выполнении, а не при создании. В противном случае следующий код был бы заблокирован:
WFResourcePool pool(1);
int f()
{
WFHttpTask *t1 = WFTaskFactory::create_http_task(..., [](void *){pool.post(nullptr);});
WFHttpTask *t2 = WFTaskFactory::create_http_task(..., [](void *){pool.post(nullptr);});
WFConditional *c1 = pool.get(t1, &t1->user_data); // Использование user_data для хранения res - это удобный способ.
WFConditional *c2 = pool.get(t2, &t2->user_data);
c2->start();
// ждать завершения t2 здесь.
...
c1->start();
...
}
Вышеуказанный код создает c1
раньше, чем c2
, и ожидает завершение t2
перед выполнением c1
. Здесь нет возможности застрять c2
, так как условная задача пытается получить ресурс при своём выполнении.
Когда пользователь закончил использование ресурса (обычно в callback задачи), он должен вернуть ресурс обратно в пул через интерфейс post()
.
Параметр res
в вызове post()
не обязательно должен совпадать с тем, что было получено через get()
.
Из pop()
и push()
мы видим, что по умолчанию использование ресурсов осуществляется по принципу FILO (первым вошел — последним вышел).
Причины использования FILO заключаются в том, что в большинстве случаев недавно освобожденные ресурсы следует использовать первыми.
Однако пользователи могут легко переопределить pop()
и push()
для создания FIFO (первым вошел — первым вышел) ресурсного пула.
Если потребуется, вы также можете реализовать расширяемый и сжимаемый ресурсный пул.
Мы собираемся получить список URL, но требуем, чтобы общее количество параллельных запросов не превышало max_p
. Мы можем использовать parallel
для этого, но использование ресурсного пула проще:
int fetch_with_max(std::vector<std::string>& url_list, size_t max_p)
{
WFResourcePool pool(max_p);
for (std::string& url : url_list)
{
WFHttpTask *task = WFTaskFactory::create_http_task(url, [&pool](WFHttpTask *task) {
pool.post(nullptr);
});
WFConditional *cond = pool.get(task); // Не требуется сохранение res, поэтому можно не передавать параметр resbuf.
cond->start();
}
// ждать здесь...
}
Очередь сообщений является компонентом, аналогичным ресурсному пулу. Различия между ними заключаются в следующем:
Интерфейсы очереди сообщений определены в WFMessageQueue.h:
class WFMessageQueue
{
public:
WFConditional *get(SubTask *task, void **msgbuf);
WFConditional *get(SubTask *task);
void post(void *msg);
...```markdown
public:
WFMessageQueue();
...
};
Учитывая знание использования пула ресурсов, подробное описание использования очереди сообщений нам не требуется. Шаблон использования идентичен тому, который применяется к пулу ресурсов.
Интерфейсы get()
и post()
очереди сообщений не требуют обязательного получения сообщения до его возврата. Любая задача может получать и отправлять сообщения в любое время.
Если требуется, пользователи также могут наследовать от класса WFMessageQueue
и реализовать метод FIFO чтения сообщений.
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )