1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/sogou-workflow

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
about-resource-pool.md 9.9 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
Отправлено 01.03.2025 10:20 552780a

Ресурсный пул

При использовании workflow для написания асинхронных программ часто возникают следующие сценарии:

  • При выполнении задачи требуется получить ресурс из некоторого пула. После завершения работы задачи ресурс возвращается обратно в пул, чтобы его мог использовать следующая задача.
  • Для ограничения общего уровня параллелизма при сетевых взаимодействиях, но при этом не хотят занимать потоки для ожидания.
  • У нас есть множество случайно поступающих задач, находящихся в различных сериях. Однако эти задачи должны выполняться последовательно.

Все эти требования можно решить с помощью модуля ресурсного пула. Например, наш WFDnsResolver использует этот метод для контроля параллелизма запросов к DNS серверам.

Интерфейсы ресурсного пула

Интерфейсы ресурсного пула определены в WFResourcePool.h:

class WFResourcePool
{
public:
    WFConditional *get(SubTask *task, void **resbuf);
    WFConditional *get(SubTask *task);
    void post(void *res);
    ...

protected:
    virtual void *pop()
    {
        return this->data.res[this->data.index++];
    }

    virtual void push(void *res)
    {
        this->data.res[--this->data.index] = res;
    }
    ...

public:
    WFResourcePool(void *const *res, size_t n);
    WFResourcePool(size_t n);
    ...
};

Конструкторы

Первый конструктор принимает массив ресурсов длиной n. Каждый элемент массива имеет тип void *. Внутри создается еще один массив такого же размера, который заполняется копиями переданного массива. Если все ваши начальные ресурсы являются nullptr, вы можете использовать второй конструктор, передав только n, вместо того чтобы создавать массив указателей, состоящий из nullptr. Пример внутренней реализации:

void WFResourcePool::create(size_t n)
{
    this->data.res = new void *[n];
    this->data.value = n;
    ...
}

WFResourcePool::WFResourcePool(void *const *res, size_t n)
{
    this->create(n);
    memcpy(this->data.res, res, n * sizeof (void *));
}

WFResourcePool::WFResourcePool(size_t n)
{
    this->create(n);
    memset(this->data.res, 0, n * sizeof (void *));
}

Интерфейсы использования

Пользователи используют интерфейс get() для упаковки задачи в условную задачу. Условная задача представляет собой задачу, которая будет выполнена, когда её условие будет выполнено. Интерфейс get() может содержать второй параметр типа void **resbuf, который используется для хранения полученного ресурса. Затем пользователь просто заменяет исходную задачу на условную задачу и может запустить её или добавить в поток задач. Обратите внимание, что условная задача пытается получить ресурс при своём выполнении, а не при создании. В противном случае следующий код был бы заблокирован:

WFResourcePool pool(1);

int f()
{
    WFHttpTask *t1 = WFTaskFactory::create_http_task(..., [](void *){pool.post(nullptr);});
    WFHttpTask *t2 = WFTaskFactory::create_http_task(..., [](void *){pool.post(nullptr);});

    WFConditional *c1 = pool.get(t1, &t1->user_data);  // Использование user_data для хранения res - это удобный способ.
    WFConditional *c2 = pool.get(t2, &t2->user_data);

    c2->start();
    // ждать завершения t2 здесь.
    ...
    c1->start();
    ...
}

Вышеуказанный код создает c1 раньше, чем c2, и ожидает завершение t2 перед выполнением c1. Здесь нет возможности застрять c2, так как условная задача пытается получить ресурс при своём выполнении. Когда пользователь закончил использование ресурса (обычно в callback задачи), он должен вернуть ресурс обратно в пул через интерфейс post(). Параметр res в вызове post() не обязательно должен совпадать с тем, что было получено через get().

Наследование

Из pop() и push() мы видим, что по умолчанию использование ресурсов осуществляется по принципу FILO (первым вошел — последним вышел). Причины использования FILO заключаются в том, что в большинстве случаев недавно освобожденные ресурсы следует использовать первыми. Однако пользователи могут легко переопределить pop() и push() для создания FIFO (первым вошел — первым вышел) ресурсного пула. Если потребуется, вы также можете реализовать расширяемый и сжимаемый ресурсный пул.

Пример

Мы собираемся получить список URL, но требуем, чтобы общее количество параллельных запросов не превышало max_p. Мы можем использовать parallel для этого, но использование ресурсного пула проще:

int fetch_with_max(std::vector<std::string>& url_list, size_t max_p)
{
    WFResourcePool pool(max_p);

    for (std::string& url : url_list)
    {
        WFHttpTask *task = WFTaskFactory::create_http_task(url, [&pool](WFHttpTask *task) {
            pool.post(nullptr);
        });
        WFConditional *cond = pool.get(task);  // Не требуется сохранение res, поэтому можно не передавать параметр resbuf.
        cond->start();
    }

    // ждать здесь...
}

Очередь сообщений

Очередь сообщений является компонентом, аналогичным ресурсному пулу. Различия между ними заключаются в следующем:

  • Общее количество ресурсов в пуле фиксируется при создании. А очередь сообщений может иметь произвольную длину.
  • Ресурсный пул использует метод доступа FILO (первым вошел — последним вышел), тогда как очередь сообщений работает по принципу FIFO (первым вошел — первым вышел).
  • Ресурсный пул использует метод "получить" и "вернуть", то есть ресурс должен быть получен прежде чем его можно вернуть обратно. В очереди сообщений такой необходимости нет.
  • Реализация ресурсного пула основана на массивах, а очередь сообщений использует связные списки. В целом, очередь сообщений проще в реализации и использовании.

Интерфейсы очереди сообщений

Интерфейсы очереди сообщений определены в WFMessageQueue.h:

class WFMessageQueue
{
public:
    WFConditional *get(SubTask *task, void **msgbuf);
    WFConditional *get(SubTask *task);
    void post(void *msg);
    ...```markdown
public:
    WFMessageQueue();
    ...
};

Учитывая знание использования пула ресурсов, подробное описание использования очереди сообщений нам не требуется. Шаблон использования идентичен тому, который применяется к пулу ресурсов. Интерфейсы get() и post() очереди сообщений не требуют обязательного получения сообщения до его возврата. Любая задача может получать и отправлять сообщения в любое время. Если требуется, пользователи также могут наследовать от класса WFMessageQueue и реализовать метод FIFO чтения сообщений.


Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/sogou-workflow.git
git@api.gitlife.ru:oschina-mirror/sogou-workflow.git
oschina-mirror
sogou-workflow
sogou-workflow
master