Во второй главе мы познакомились с основами использования ØMQ, создав несколько небольших приложений, каждый из которых вводил новые возможности. В этой главе мы продолжим этот подход, чтобы исследовать более продвинутые запрос-ответные модели, построенные на основе ØMQ.
Основные темы, рассматриваемые в этой главе:
В запрос-ответной модели оболочка хранит информацию о местоположении объекта, отвечающего за запрос. Именно поэтому ØMQ-сети являются бессостоятельными, но все же могут выполнять запрос-ответные операции.В обычном использовании вам не обязательно знать принцип работы оболочек запрос-ответ. При использовании REQ и REP ØMQ автоматически управляет оболочками сообщений. В следующей главе, где будут рассмотрены устройства (device), вам потребуется только читать и записывать все данные. ØMQ использует многосегментные сообщения для хранения оболочек, поэтому они также копируются при копировании сообщений.Однако перед использованием продвинутых запрос-ответных моделей важно понять механизм оболочек. Вот как работает механизм оболочек в ROUTER:
Если вы запишете сообщение, полученное из ROUTER A (включая оболочку), в ROUTER B (то есть отправите сообщение DEALER, подключенному к ROUTER), то при получении этого сообщения из ROUTER B будет содержать две оболочки.
Основная роль механизма оболочек заключается в том, чтобы ROUTER знал, как передать сообщение правильному объекту-ответчику. Вам следует сохранять эти оболочки в своем коде. Вспомните REP сокет, который последовательно распаковывает оболочки полученного сообщения и передает само сообщение приложению. При отправке сообщения он снова оборачивает его в оболочку и отправляет ROUTER, обеспечивая доставку сообщения правильному объекту-ответчику.
Используя вышеупомянутые принципы, можно создать устройство ROUTER-DEALER:
[ЗАПРОС] <--> [ОТВЕТ]
[ЗАПРОС] <--> [ШЛЯПА--ДЕЛЕР] <--> [ОТВЕТ]
[ЗАПРОС] <--> [ШЛЯПА--ДЕЛЕР] <--> [ШЛЯПА--ДЕЛЕР] <--> [ОТВЕТ]
...и т.д.
Когда вы используете сокет ЗАПРОС для подключения к сокету ШЛЯПА и отправляете запросовое сообщение, вы получите следующее сообщение от ШЛЯПЫ:
Если мы передаем это сообщение через цепочку устройств, в конечном итоге мы получим сообщение с несколькими конвертами. Новый конверт будет находиться в верхней части сообщения.
Далее рассмотрим четыре типа сокетов, используемых нами в режиме запрос-ответ:
ДЕЛЕР является балансировщиком нагрузки, который распределяет сообщения между подключенными узлами и использует механизм справедливого очередного обслуживания для обработки принятых сообщений. ДЕЛЕР действует как комбинация PUSH и PULL.
ЗАПРОС добавляет пустую фрейм в начало сообщения при отправке и удаляет её при приеме. ЗАПРОС основан на ДЕЛЕРЕ, но может продолжать работу только после получения ответа на отправленное сообщение.
ШЛЯПА добавляет конверт в начало сообщения при его получении, указывая источник сообщения. При отправке сообщения ШЛЯПА использует этот конверт для определения узла, которому сообщение должно быть отправлено.* ОТВЕТ сохраняет все данные до первого пустого фрейма при получении сообщения и передает исходные данные приложению. При отправке ответа ОТВЕТ использует сохраненные данные для обертывания ответного сообщения. ОТВЕТ основан на ШЛЯПЕ, но, как и ЗАПРОС, должен завершить оба действия — прием и отправку — прежде чем продолжить работу.
ОТВЕТ требует, чтобы конверт сообщения заканчивался пустым фреймом, поэтому если вы не отправляете сообщение с помощью ЗАПРОС, вам потребуется добавить этот пустой фрейм самостоятельно.
Вы, конечно, спросите, как ШЛЯПА идентифицирует источник сообщения? Ответ, конечно, идентификатор сокета. Мы уже говорили, что сокет может быть временным, и соединённый с ним сокет (например, ШЛЯПА) создаёт идентификатор для него. Сокет также может явно определить свой идентификатор, который затем может использоваться другими сокетами.
Это временный сокет, и ШЛЯПА автоматически создаёт UUID для идентификации источника сообщения.
Это устойчивый сокет, идентификатор которого задаётся самим источником сообщения.
Давайте рассмотрим эти два действия в примере ниже. Программа ниже выводит сообщения, полученные ROUTER-сокетом от двух REP-сокетов: один из которых не имеет заданного идентификатора, а другой имеет заданный идентификатор "Hello".identity.c
//
// Ниже приведён пример использования идентификаторов сокетов в режиме запрос-ответ.
// Важно отметить, что функции, начинающиеся с s_, предоставлены библиотекой zhelpers.h.
// Нам не требуется повторно реализовывать этот код.
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
void *sink = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (sink, "inproc://example");
// Первый сокет использует автоматически сгенерированный идентификатор
void *anonymous = zmq_socket (context, ZMQ_REQ);
zmq_connect (anonymous, "inproc://example");
s_send (anonymous, "ROUTER использует сгенерированный UUID");
s_dump (sink);
// Второй сокет использует заданный идентификатор
void *identified = zmq_socket (context, ZMK_REQ);
zmq_setsockopt (identified, ZMK_IDENTITY, "Hello", 5);
zmq_connect (identified, "inproc://example");
s_send (identified, "ROUTER сокет использует идентификатор REQ сокета");
s_dump (sink);
zmq_close (sink);
zmq_close (anonymous);
zmq_close (identified);
zmq_term (context);
return 0;
}
Результат выполнения:
----------------------------------------
[017] 00314F043F46C441E28DD0AC54BE8DA727
[000]
[026] ROUTER использует сгенерированный UUID
----------------------------------------
[005] Hello
[000]
[038] ROUTER сокет использует идентификатор REQ сокета
Мы уже видели, как ROUTER-сокет использует обёртку для отправки сообщений правильному ответчику. Давайте теперь рассмотрим ROUTER с точки зрения маршрутизации: если использовать определённый формат обёртки при отправке сообщений, ROUTER сможет асинхронно направлять сообщение к соответствующему узлу.Таким образом, поведение ROUTER полностью контролируемо. Перед тем как углубиться в эту тему, давайте внимательно рассмотрим REQ и REP сокеты и придадим им конкретные роли:
"Мамин" сокет позволяет общаться только тогда, когда сама начнет разговор. Мама не так открыта, как папа, и не принимает двусмысленных ответов, как DEALER сокет. Поэтому, чтобы общаться с REQ сокетом, нужно ждать его запроса, после которого он будет ждать ответа, независимо от того, сколько времени это займет.
"Папин" сокет производит впечатление силы и холодности. Он делает одно дело: независимо от вопроса, всегда дает точный ответ. Не стоит ожидать, что REP сокет начнет диалог сам или передаст ваш разговор другому, он этого не сделает.Обычно мы считаем, что запрос-ответ модель обязательно должна быть симметричной, но на самом деле этот процесс может быть асинхронным. Нам достаточно знать адрес нужного нам узла, чтобы отправлять сообщения асинхронно через ROUTER сокет. ROUTER — это единственный сокет в ZMQ, который может определить источник сообщения.Давайте сделаем небольшой обзор маршрутизации в запрос-ответ модели:
Минимум три способа существуют для подключения к ROUTER:
В каждом из этих режимов мы можем полностью контролировать маршрутизацию сообщений, но разные режимы имеют свои специфические применения и потоки сообщений, которые мы будем объяснять по порядку.
Самостоятельная маршрутизация также имеет свои особенности:* Самостоятельная маршрутизация позволяет узлам контролировать направление сообщений, что противоречит правилам ØMQ. Единственная причина использования самостоятельной маршрутизации заключается в том, что ØMQ не предоставляет больше алгоритмов маршрутизации;
ROUTER-DEALER — это самая простая форма маршрутизации. Соединяется ROUTER с несколькими DEALER, используя подходящий алгоритм для распределения сообщений между DEALER. DEALER может быть чёрной дырой (обрабатывает сообщения, но не отправляет ответ), прокси (пересылает сообщения другим узлам) или сервисом (отправляющим обратные сообщения).
Если требуется, чтобы DEALER мог отвечать, то должно быть подключено только одно ROUTER к DEALER, так как DEALER не знает, какой конкретный узел его вызывает. Если несколько узлов вызывают DEALER, он будет выполнять балансировку нагрузки, распределяя сообщения. Однако если DEALER является чёрной дырой, то можно подключить любое количество узлов.
Для чего используется ROUTER-DEALER маршрутизация? Если DEALER возвращает время выполнения задачи ROUTER, то ROUTER сможет определить скорость обработки данных DEALER. Поскольку ROUTER и DEALER являются асинхронными сокетами, мы должны использовать zmq_poll() для обработки этой ситуации.
В следующем примере два DEALER не отправляют сообщения обратно ROUTER. Наш маршрут использует взвешенный случайный алгоритм: отправка вдвое большего количества сообщений одному из DEALER.
rtdealer.c```c // // Пользовательская ROUTER-DEALER маршрутизация // // Этот пример представляет собой однопроцессное приложение для удобства запуска. // Каждый поток имеет свое собственное ZMQ-окружение, поэтому можно рассматривать это как несколько процессов. // #include "zhelpers.h" #include <pthread.h>
// Здесь определены два рабочих потока, код которых одинаков.
static void * worker_task_a (void *args) { void *context = zmq_init (1); void *worker = zmq_socket (context, ZMQ_DEALER); zmq_setsockopt (worker, ZMQ_IDENTITY, "A", 1); zmq_connect (worker, "ipc://routing.ipc");
int total = 0;
while (1) {
// Мы принимаем только вторую часть сообщения
char *request = s_recv (worker);
int finished = (strcmp (request, "END") == 0);
free (request);
if (finished) {
printf ("A received: %d\n", total);
break;
}
total++;
}
zmq_close (worker);
zmq_term (context);
return NULL;
}
static void * worker_task_b (void *args) { void *context = zmq_init (1); void *worker = zmq_socket (context, ZMQ_DEALER); zmq_setsockopt (worker, ZMQ_IDENTITY, "B", 1); zmq_connect (worker, "ipc://routing.ipc");
int total = 0;
while (1) {
// Мы принимаем только вторую часть сообщения
char *request = s_recv (worker);
int finished = (strcmp (request, "END") == 0);
free (request);
if (finished) {
printf ("B received: %d\n", total);
break;
}
total++;
}
zmq_close (worker);
zmq_term (context);
return NULL;
}
int main (void) { void *context = zmq_init (1); void *client = zmq_socket (context, ZMQ_ROUTER); zmq_bind (client, "ipc://routing.ipc");
pthread_t worker;
pthread_create (&worker, NULL, worker_task_a, NULL);
pthread_create (&worker, NULL, worker_task_b, NULL);
s_send(client, "КОНЕЦ");
s_sendmore(client, "B");
s_send(client, "КОНЕЦ");
zmq_close(client);
zmq_term(context);
return 0;
}
```
Вот два пояснения к вышеуказанному коду:
* ROUTER не знает, когда DEALER будет готов, поэтому мы можем использовать сигналы для решения этой проблемы, но чтобы не усложнять пример, используем `sleep(1)` для обработки. Без этого вызова сообщения, отправленные ROUTER сразу после запуска, будут игнорированы ØMQ, и эти сообщения будут отброшены.
* Важно отметить, что, помимо ROUTER, который отбрасывает нераспределенные сообщения, PUB сокет также отбрасывает отправленные сообщения, если нет SUB, подключенного к нему. Другие сокеты хранят нераспределённые сообщения до тех пор, пока они не будут обработаны.
При маршрутизации сообщений к DEALER мы вручную создаем такой конверт:

```Сокет ROUTER удаляет первую часть и передаёт вторую часть соответствующему DEALER. Когда DEALER отправляет сообщение ROUTER, он отправляет только одну часть, а ROUTER добавляет первую часть как конверт и возвращает её обратно.Если вы определяете недействительный адрес конверта, ROUTER просто отбрасывает это сообщение без каких-либо уведомлений. С этим ничего нельзя сделать, так как это может произойти только в двух случаях: либо целевой узел больше не существует, либо адрес был указан неверно в программе. Как узнать, что сообщение было правильно маршрутизировано? Единственный способ — получить обратную связь от маршрутизируемого узла. Это будет подробно объяснено в последующих главах.
Работа DEALER аналогична тому, как работает комбинация PUSH и PULL. Однако, невозможно использовать PUSH или PULL для создания запрос-ответ модели.
### Маршрутизация на основе алгоритма LRU
Ранее мы говорили, что сокет REQ всегда является инициатором диалога и затем ждет ответа. Эта особенность позволяет поддерживать несколько сокетов REQ в режиме ожидания. Другими словами, сокет REQ сообщает, что он готов.
Вы можете соединить ROUTER с несколькими REQ, и процесс запрос-ответ будет следующим:
* REQ отправляет сообщение ROUTER
* ROUTER возвращает сообщение REQ
* REQ отправляет сообщение ROUTER
* ROUTER возвращает сообщение REQ
* ...
Как и DEALER, REQ может быть связан только с одним ROUTER, за исключением случаев, когда вы хотите сделать что-то вроде многоуровневого маршрутизации (я даже не хочу объяснять это здесь), что приведет к чрезмерной сложности и вынудит вас отказаться от этого.Для чего используется режим ROUTER-REQ? Самым распространенным применением является маршрутизация на основе алгоритма LRU (Least Recently Used), где запросы ROUTER направляются к тому REQ, который дольше всего находится в ожидании.
Вот пример:
```c
//
// Пользовательский ROUTER-REQ маршрут
//
#include "zhelpers.h"
#include <pthread.h>
#define NBR_WORKERS 10
static void *
worker_task(void *args)
{
void *context = zmq_init(1);
void *worker = zmq_socket(context, ZMQ_REQ);
// Функция s_set_id() генерирует печатаемую строку на основе сокета,
// которая используется как идентификатор для этого сокета.
s_set_id(worker);
zmq_connect(worker, "ipc://routing.ipc");
int total = 0;
while (1) {
// Уведомляем ROUTER, что мы готовы к работе
s_send(worker, "ready");
// Получаем работу от ROUTER до тех пор, пока не получим сообщение о завершении
char *workload = s_recv(worker);
int finished = (strcmp(workload, "END") == 0);
free(workload);
if (finished) {
printf("Обработано: %d задач\n", total);
break;
}
total++;
// Случайное ожидание некоторого времени
s_sleep(randof(1000) + 1);
}
zmq_close(worker);
zmq_term(context);
return NULL;
}
int main(void)
{
void *context = zmq_init(1);
void *client = zmq_socket(context, ZMQ_ROUTER);
zmq_bind(client, "ipc://routing.ipc");
srandom((unsigned) time(NULL));
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
pthread_t worker;
pthread_create(&worker, NULL, worker_task, NULL);
}
int task_nbr;
for (task_nbr = 0; task_nbr < NBR_WORKERS * 10; task_nbr++) {
// Наименее используемый worker находится в очереди сообщений
char *address = s_recv(client);
char *empty = s_recv(client);
free(empty);
char *ready = s_recv(client);
free(ready);
``````markdown
s_sendmore(client, address);
s_sendmore(client, "");
s_send(client, "Это рабочая нагрузка");
free(address);
}
// Уведомляем все REQ сокеты о завершении работы
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
char *address = s_recv(client);
char *empty = s_recv(client);
free(empty);
char *ready = s_recv(client);
free(ready);
s_sendmore(client, address);
s_sendmore(client, "");
s_send(client, "Завершение работы");
free(address);
}
zmq_close(client);
zmq_term(context);
return 0;
}
```В этом примере реализация алгоритма LRU не требует использования специальных данных структур, так как механизм очередей сообщений ØMQ уже предоставляет эквивалентную реализацию. Более реалистичная реализация алгоритма LRU должна была бы собирать подготовленных worker'ов в очередь для распределения. Мы рассмотрим этот пример позже.
```Результат выполнения программы выводит количество задач, обработанных каждым worker'ом. Поскольку REQ сокет случайным образом ждет некоторое время, а мы не выполнили балансировку нагрузки, мы ожидаем, что каждый worker будет иметь приблизительно равное количество обработанных задач. Это и есть результат выполнения программы.
```
Обработано: 8 задач
Обработано: 8 задач
Обработано: 11 задач
Обработано: 7 задач
Обработано: 9 задач
Обработано: 11 задач
Обработано: 14 задач
Обработано: 11 задач
Обработано: 11 задач
Обработано: 10 задач
```
Несколько замечаний относительно приведенного выше кода:
* В отличие от предыдущего примера, нам не нужно было ждать некоторое время, так как REQ сокет явно сообщает ROUTER, что он готов.
* Мы использовали функцию `s_set_id()` из заголовочного файла `zhelpers.h`, чтобы создать печатаемый строковый идентификатор для сокета. Это сделано для упрощения примера. В реальном мире REQ сокеты являются анонимными, и вам придется использовать `zmq_recv()` и `zmq_send()` для обработки сообщений, так как `s_recv()` и `s_send()` поддерживают только строковые идентификаторы сокетов.
* Хуже того, мы использовали случайные идентификаторы, что недопустимо в реальном мире для постоянных сокетов, так как это может привести к исчерпанию узлов.* Если вы просто скопировали приведённый выше код без полного понимания его работы, то вы можете столкнуться с последствиями, аналогичными тем, которые испытает человек, который увидел, как Человек-Паук прыгнул с крыши, и решил сделать то же самое.При маршрутизации сообщений к REQ сокетам следует учитывать определённый формат: адрес-пустой фрейм-сообщение:

### Маршрутизация по адресу
В классической модели запрос-ответ ROUTER обычно не взаимодействует с REP сокетами, а DEALER взаимодействует с REP. DEALER случайным образом распределяет сообщения между несколькими REP и получает результаты. ROUTER лучше всего работает с REQ сокетами.
Нам следует помнить, что классическая модель ØMQ часто является наиболее эффективной, поскольку пути, по которым ходят люди, обычно являются хорошими. Если вы решите действовать вопреки общепринятым правилам, вы можете оказаться в очень глубокой яме. Давайте соединим ROUTER с REP и посмотрим, что произойдёт.
REP сокет имеет два основных свойства: *Он должен завершить полный цикл запрос-ответ;*
*Он может принимать пакеты любого размера и возвращать их полностью.*
В обычном режиме запрос-ответ (REQ-REP), REP является анонимным и может быть заменён в любое время. Поскольку мы здесь будем использовать кастомный маршрут, нам нужно будет отправлять сообщение конкретному REP A, а не REP B. Это гарантирует, что одна сторона сети принадлежит вам, а другая — конкретному REP.Одной из ключевых концепций ØMQ является то, что периферийные узлы должны быть максимально умными и многочисленными, в то время как middleware должен быть фиксированным и простым. Это означает, что периферийные узлы могут отправлять сообщения конкретным узлам, таким как конкретному REP. В данном случае мы пока не будем рассматривать маршрутизацию между несколькими узлами, а сосредоточимся на последнем шаге, где ROUTER взаимодействует с конкретным REP.
Эта диаграмма описывает следующие события:
* Клиент имеет сообщение, которое в будущем будет отправлено обратно через другой ROUTER. Это сообщение содержит два адреса, пустую рамку и содержание;
* Клиент отправляет это сообщение ROUTER, указывая адрес REP;
* ROUTER удаляет этот адрес и решает, какой из REP получит это сообщение;
* REP получает сообщение, содержащее адрес, пустую рамку и содержание;
* REP удаляет все содержимое до пустой рамки и передает его рабочему для обработки сообщения;
* Рабочий завершает обработку и отправляет ответ REP;
* REP обертывает этот ответ в сохраненный ранее пакет и отправляет его ROUTER;
* ROUTER добавляет новую рамку с адресом REP в начало этого ответа.
Процесс кажется сложным, но важно понять его. Просто запомните, что REP сокеты возвращают пакеты в том же виде, в котором они были получены.**rtpapa.c**```c
//
// Пользовательский ROUTER-REP маршрут
//
#include "zhelpers.h"
// Здесь используется один процесс для акцента на последовательность событий
int main (void)
{
void *context = zmq_init (1);
void *client = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (client, "ipc://routing.ipc");
void *worker = zmq_socket (context, ZMQ_REP);
zmq_setsockopt (worker, ZMQ_IDENTITY, "A", 1);
zmq_connect (worker, "ipc://routing.ipc");
// Ожидание подключения worker
sleep (1);
// Отправка идентификатора REP, адреса, пустого фрейма и содержимого сообщения
s_sendmore (client, "A");
s_sendmore (client, "address 3");
s_sendmore (client, "address 2");
s_sendmore (client, "address 1");
s_sendmore (client, "");
s_send (client, "Это рабочий объем");
// Worker получает только содержимое сообщения
s_dump (worker);
// Worker не должен обрабатывать конверт
s_send (worker, "Это ответ");
// Посмотрим, что получено в ROUTER
s_dump (client);
zmq_close (client);
zmq_close (worker);
zmq_term (context);
return 0;
}
```Результат выполнения```
----------------------------------------
[020] Это нагрузка
----------------------------------------
[001] A
[009] адрес 3
[009] адрес 2
[009] адрес 1
[000]
[017] Это ответ
```
Несколько замечаний по данному коду:
* В реальных условиях ROUTER и REP сокеты находятся на разных узлах. В данном примере многопроцессинг не включен, чтобы сделать последовательность событий более понятной.
* `zmq_connect()` не завершается мгновенно; соединение между REP и ROUTER может занять некоторое время. В реальных условиях ROUTER не может знать, успешно ли подключился REP, пока не получит от него какого-либо ответа. В данном примере используется `sleep(1)` для решения этой проблемы; если этого не сделать, REP не сможет получить сообщение (попробуйте сами).
* Мы используем идентификатор сокета REP для маршрутизации; если вы сомневаетесь, попробуйте отправить сообщение B и посмотрите, сможет ли A его получить.
* Функции, такие как `s_dump()`, взяты из файла `zhelpers.h`. Вы можете заметить, что код для подключения сокетов одинаков, поэтому мы можем строить надстройки над ØMQ API. Подробнее об этом будет рассказано при обсуждении сложных приложений.
Чтобы маршрутизировать сообщения к REP, нам нужно создать оболочку, которую он сможет распознать:

### Прокси-сервер для режима запрос-ответ
```В этом разделе мы вспомним, как использовать оболочки ØMQ, и попробуем создать универсальный прокси-сервер. Мы создадим очередь, которая будет соединять несколько клиентов и рабочих процессов, а алгоритм маршрутизации можно будет выбирать самостоятельно. В данном случае мы выбрали алгоритм наименьшего недавнего использования, так как это практично для балансировки нагрузки.Сначала вспомним классический режим запрос-ответ, попробуем использовать его для создания постоянно растущей сети услуг. Самый базовый запрос-ответ модель выглядит следующим образом:

Эта модель поддерживает несколько REP сокетов, но если мы хотим поддерживать несколько REQ сокетов, нам потребуется промежуточный слой, который обычно состоит из ROUTER и DEALER. Этот слой просто перемещает информацию между двумя сокетами, поэтому мы можем использовать готовую ZMQ_QUEUE для этого.
```
+--------+ +--------+ +--------+
| Клиент | | Клиент | | Клиент |
+--------+ +--------+ +--------+
| REQ | | REQ | | REQ |
+---+----+ +---+----+ +---+----+
| | |
+-----------+-----------+
|
+---+----+
| ROUTER |
+--------+
| Устройство |
+--------+
| DEALER |
+---+----+
|
+-----------+-----------+
| | |
+---+----+ +---+----+ +---+----+
| REP | | REP | | REP |
+--------+ +--------+ +--------+
| Рабочий | | Рабочий | | Рабочий |
+--------+ +--------+ +--------+
```
Рисунок # - Расширенный запрос-ответ
Основная идея заключается в том, что ROUTER запоминает, от какого REQ пришло сообщение, создавая при этом конверт. Сокеты DEALER и REP не меняют содержимое конверта при передаче сообщений. Когда сообщение возвращается ROUTER, он знает, куда его отправить. В данной модели REP сокет анонимен и не имеет конкретного адреса, поэтому может предоставлять только один тип услуги.В приведённой структуре для маршрутизации REP используется встроенный алгоритм балансировки нагрузки DEALER. Однако, мы хотим использовать алгоритм LRU для маршрутизации, что требует использования режима ROUTER-REP:

Эта очередь LRU между двумя ROUTER сокетами не просто перемещает сообщения, что делает следующий код более сложным, но тем не менее, она широко используется в модели запрос-ответ.**lruqueue.c**```c
//
// Устройство, использующее алгоритм LRU
// Client и worker находятся в разных потоках
//
#include "zhelpers.h"
#include <pthread.h>
#define NBR_CLIENTS 10
#define NBR_WORKERS 3
// Операция выталкивания, реализованная с использованием массива, который может хранить любой тип данных
#define DEQUEUE(q) memmove(&(q)[0], &(q)[1], sizeof(q) - sizeof(q[0]))
// Реализация базового паттерна запрос-ответ с использованием REQ сокета
// Поскольку s_send() и s_recv() не могут обрабатывать бинарные метки сокетов 0MQ,
// здесь генерируется печатаемая строка-идентификатор.
//
static void *
client_task (void *args)
{
void *context = zmq_init (1);
void *client = zmq_socket (context, ZMQ_REQ);
s_set_id (client); // Установка печатаемого идентификатора
zmq_connect (client, "ipc://frontend.ipc");
// Отправка запроса и получение ответа
s_send (client, "HELLO");
char *reply = s_recv (client);
printf ("Client: %s\n", reply);
free (reply);
zmq_close (client);
zmq_term (context);
return NULL;
}
// Worker использует REQ сокет для реализации алгоритма LRU
//
static void *
worker_task (void *args)
{
void *context = zmq_init (1);
void *worker = zmq_socket (context, ZMQ_REQ);
s_set_id (worker); // Установка печатаемого идентификатора
zmq_connect (worker, "ipc://backend.ipc");
// Уведомление агента о готовности worker
s_send (worker, "READY");
while (1) {
// Сохранение всех данных перед пустым фреймом (оболочки),
// в данном примере перед пустым фреймом находится один фрейм, но может быть больше.
char *address = s_recv (worker);
char *empty = s_recv (worker);
assert (*empty == 0);
free (empty);
// Получение запроса и отправка ответа
``` char *request = s_recv(worker);
printf("Worker: %s\n", request);
free(request);
s_sendmore(worker, address);
s_sendmore(worker, "");
s_send(worker, "OK");
free(address);
}
zmq_close(worker);
zmq_term(context);
return NULL;
}
int main(void)
{
// Подготовка контекста и сокетов 0MQ
void *context = zmq_init(1);
void *frontend = zmq_socket(context, ZMQ_ROUTER);
void *backend = zmq_socket(context, ZMQ_ROUTER);
zmq_bind(frontend, "ipc://frontend.ipc");
zmq_bind(backend, "ipc://backend.ipc");
int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++) {
pthread_t client;
pthread_create(&client, NULL, client_task, NULL);
}
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
pthread_t worker;
pthread_create(&worker, NULL, worker_task, NULL);
}
// Логика LRU
// - Всегда получает сообщения из backend; начинает получать сообщения из frontend, когда более одного worker свободен.
// - Когда worker отвечает, он помечается как готовый, и ответ worker передаётся клиенту.
// - Если клиент отправляет запрос, его запрос передаётся следующему worker.
// Хранит список доступных workers
int available_workers = 0;
char *worker_queue[10];
while (1) {
zmq_pollitem_t items[] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
zmq_poll(items, available_workers ? 2 : 1, -1);
// Обрабатывает очередь workers из backend
if (items[0].revents & ZMQ_POLLIN) {
// Добавляет адрес worker в очередь
char *worker_addr = s_recv(backend);
assert(available_workers < NBR_WORKERS);
worker_queue[available_workers++] = worker_addr;
// Пропускает пустую рамку
char *empty = s_recv(backend);
assert(empty[0] == 0);
free(empty);
// Третья рамка содержит "READY" или адрес клиента
char *client_addr = s_recv(backend);
// Если это ответное сообщение, передаёт его клиенту
if (strcmp(client_addr, "READY") != 0) {
char *empty = s_recv(backend);
assert(empty[0] == 0);
free(empty);
char *reply = s_recv(backend);
s_sendmore(frontend, client_addr);
s_sendmore(frontend, "");
s_send(frontend, reply);
free(reply);
if (--client_nbr == 0)
break; // Выходит после обработки N сообщений
}
free(client_addr);
}
if (items[1].revents & ZMQ_POLLIN) {
// Получает запрос следующего клиента и передаёт его свободному worker
// Формат запроса клиента: [адрес клиента][пустая рамка][содержание запроса]
char *client_addr = s_recv(frontend);
char *empty = s_recv(frontend);
assert(empty[0] == 0);
free(empty);
char *request = s_recv(frontend);
s_sendmore(backend, worker_queue[0]);
s_sendmore(backend, "");
s_sendmore(backend, client_addr);
s_sendmore(backend, "");
s_send(backend, request);
free(client_addr);
free(request);
// освобождает адрес данного worker
free(worker_queue[0]);
DEQUEUE(worker_queue);
available_workers--;
}
}
}
zmq_close(frontend);
zmq_close(backend);
zmq_term(context);
return 0;
```
```Этот программный код имеет два ключевых аспекта: 1) обработка пакетов каждым сокетом; 2) алгоритм LRU. Сначала рассмотрим формат пакетов.Мы знаем, что сокет REQ при отправке сообщений добавляет в начало сообщения пустую рамку и автоматически удаляет её при получении. Наша задача — удовлетворять требованиям REQ при передаче сообщений, правильно обрабатывая пустые рамки. Также следует учесть, что сокет ROUTER добавляет адрес источника в начало каждого полученного сообщения.
Давайте пройдёмся по полному циклу запроса-ответа. Мы установим метку "CLIENT" для сокета клиента и "WORKER" для рабочего. Вот сообщение, отправленное клиентом:

Формат сообщения, полученного агентом из ROUTER, выглядит следующим образом:

Агент извлекает адрес свободного рабочего из очереди LRU и добавляет его как пакет к сообщению, передавая его ROUTER. Не забудьте добавить пустую рамку.

При получении сообщения сокетом REQ (рабочий), пакет и пустая рамка будут удалены:

Как видно, сообщение, полученное рабочим, совпадает с сообщением, полученным клиентом. Рабочий должен сохранить пакет и работать только с содержанием сообщения.
При обратной передаче:* рабочий через REQ передаёт устройству сообщение \[адрес клиента\]\[пустая рамка\]\[ответное содержание\];
* устройство извлекает из рабочего ROUTER сообщение \[адрес рабочего\]\[пустая рамка\]\[адрес клиента\]\[пустая рамка\]\[ответное содержание\];
* устройство сохраняет адрес рабочего и передаёт \[адрес клиента\]\[пустая рамка\]\[ответное содержание\] клиентскому ROUTER;
* клиент извлекает из REQ \[ответное содержание\].Теперь рассмотрим алгоритм LRU, который требует использования сокетов REQ для клиента и рабочего, а также правильного хранения и возврата пакетов сообщений. В частности:
* создайте группу poll, которая постоянно извлекает сообщения из backend (ROUTER рабочего); сообщения извлекаются из frontend (ROUTER клиента) только когда есть свободный рабочий;
* повторяйте процесс poll
* если в backend есть сообщение, это может быть либо сообщение READY (рабочий готов к распределению), либо ответное сообщение (необходимо передать клиенту). В обоих случаях адрес рабочего сохраняется в очереди LRU, и если есть ответное содержание, оно передаётся соответствующему клиенту.
* Если у frontend есть сообщение, мы извлекаем следующего worker из LRU-очереди и отправляем ему запрос. Для этого необходимо отправить [адрес worker-a][пустой фрейм][адрес клиента][пустой фрейм][содержимое запроса] на ROUTER-конечную точку worker-a.
Можно расширить этот алгоритм, выполнив самотестирование worker-a при запуске, чтобы вычислить его скорость обработки и передать её вместе с сообщением READY агенту. Это позволит агенту учитывать эту информацию при распределении задач.
### Обёртка над верхним уровнем API ØMQ
Использование API ØMQ для работы с многосегментными сообщениями может быть сложным, как показано в следующем коде:```c
while (1) {
// Сохраняем все содержимое фрейма (обёртки) перед пустым фреймом,
// в данном примере перед пустым фреймом находится только один фрейм, но может быть больше.
zmq_msg_t address;
zmq_msg_init(&address);
zmq_recv(worker, &address, 0);
zmq_msg_t empty;
zmq_msg_init(&empty);
zmq_recv(worker, &empty, 0);
// Получаем запрос и отправляем ответ
zmq_msg_t payload;
zmq_msg_init(&payload);
zmq_recv(worker, &payload, 0);
int char_nbr;
printf("Worker: ");
for (char_nbr = 0; char_nbr < zmq_msg_size(&payload); char_nbr++)
printf("%c", *(char *)(zmq_msg_data(&payload) + char_nbr));
printf("\n");
zmq_msg_init_size(&payload, 2);
memcpy(zmq_msg_data(&payload), "OK", 2);
zmq_send(worker, &address, ZMQ_SNDMORE);
zmq_close(&address);
zmq_send(worker, &empty, ZMQ_SNDMORE);
zmq_close(&empty);
zmq_send(worker, &payload, 0);
zmq_close(&payload);
}
```
Этот код не удовлетворяет требованиям переиспользования, так как он может обрабатывать только один фрейм обёртки. На самом деле, этот код уже представляет собой некоторую обёртку, и если использовать низкоуровневые API ØMQ, код будет ещё более длинным:
```c
while (1) {
// Сохраняем все содержимое фрейма (обёртки) перед пустым фреймом,
// в данном примере перед пустым фреймом находится только один фрейм, но может быть больше.
zmq_msg_t address;
zmq_msg_init(&address);
zmq_recv(worker, &address, 0);
zmq_msg_t empty;
zmq_msg_init(&empty);
zmq_recv(worker, &empty, 0);
// Получаем запрос и отправляем ответ
zmq_msg_t payload;
zmq_msg_init(&payload);
zmq_recv(worker, &payload, 0);
int char_nbr;
printf("Worker: ");
for (char_nbr = 0; char_nbr < zmq_msg_size(&payload); char_nbr++)
printf("%c", *(char *)(zmq_msg_data(&payload) + char_nbr));
printf("\n");
zmq_msg_init_size(&payload, 2);
memcpy(zmq_msg_data(&payload), "OK", 2);
zmq_send(worker, &address, ZMQ_SNDMORE);
zmq_close(&address);
zmq_send(worker, &empty, ZMQ_SNDMORE);
zmq_close(&empty);
zmq_send(worker, &payload, 0);
zmq_close(&payload);
}
```Наш идеальный API должен был бы принимать и обрабатывать полное сообщение за один шаг, включая обёртку. Низкоуровневые API ØMQ не предназначены для этой цели, но мы можем создать дополнительную обёртку над ними, что является важной частью изучения ØMQ. Создание такого API представляет определённую сложность, поскольку необходимо избегать чрезмерной частоты копирования данных. Кроме того, ØMQ использует термин «сообщение» для определения отдельных сегментов сообщений и частей этих сегментов, при этом сообщение может быть как строковым, так и двоичным, что увеличивает сложность разработки API.
Одним из решений является использование новых названий: строка (s_send() и s_recv() уже используются), кадр (часть сообщения), сообщение (один или несколько кадров). Ниже приведён пример переписанного worker'а с использованием нового API:
```c
while (1) {
zmsg_t *zmsg = zmsg_recv(worker);
zframe_print(zmsg_last(zmsg), "Worker: ");
zframe_reset(zmsg_last(zmsg), "OK", 2);
zmsg_send(&zmsg, worker);
}
```
Замена 22 строк кода на 4 строки значительно упрощает понимание кода. Можно использовать этот подход для дальнейшего развития других API, чтобы достичь следующих целей:
* Автоматическое управление сокетами. Ручное закрытие каждого сокета является утомительной задачей, а установка времени жизни также не всегда требуется. Поэтому было бы полезно автоматически закрывать сокеты при завершении контекста.* Удобное управление потоками. Большинство приложений ØMQ используют многопоточность, но POSIX API для работы с потоками не всегда удобен. Поэтому можно создать более удобный интерфейс.
* Удобное управление временем. Получение значения в миллисекундах или пауза на несколько миллисекунд может быть затруднительным. Наш API должен предоставлять такие возможности.
* Реактор для замены zmq_poll(). Цикл опроса прост, но громоздок и создаёт повторяющийся код: вычисление времени, обработка информации в сокетах и т.д. Реактор для управления чтением/записью сокетов и контролем времени был бы очень полезен.
* Правильная обработка клавиши Ctrl+C. Мы уже видели, как обрабатывать прерывания, и было бы полезно иметь такое же поведение для всех программ.
Эти требования можно реализовать с помощью расширения czmq. Это расширение существует давно и предоставляет множество верхнеуровневых обёрток для ØMQ, а также некоторые данные структуры (хэши, списки и т.д.).
Ниже представлен пример переписанного LRU-агента с использованием czmq:**lruqueue2.c**
```c
//
// ЛRU сообщение очереди устройства, использует библиотеку czmq
//
#include "czmq.h"
#define NBR_CLIENTS 10
#define NBR_WORKERS 3
#define LRU_READY "\001" // Информация о готовности worker
//
static void *
client_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, "ipc://frontend.ipc");
//
// Отправка запроса и получение ответа
//
while (1) {
zstr_send (client, "HELLO");
char *reply = zstr_recv (client);
if (!reply)
break;
printf ("Client: %s\n", reply);
free (reply);
sleep (1);
}
zctx_destroy (&ctx);
return NULL;
}
//
// Worker использует REQ сокет, реализует ЛRU маршрутизацию
//
static void *
worker_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (worker, "ipc://backend.ipc");
//
// Уведомление агента о готовности worker
//
zframe_t *frame = zframe_new (LRU_READY, 1);
zframe_send (&frame, worker, 0);
//
// Получение сообщения и его обработка
//
while (1) {
zmsg_t *msg = zmsg_recv (worker);
if (!msg)
break; // Завершение
//zframe_print (zmsg_last (msg), "Worker: ");
zframe_reset (zmsg_last (msg), "OK", 2);
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return NULL;
}
int main (void)
{
zctx_t *ctx = zctx_new ();
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
void *backend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (frontend, "ipc://frontend.ipc");
zsocket_bind (backend, "ipc://backend.ipc");
int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
zthread_new (ctx, client_task, NULL);
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
zthread_new (ctx, worker_task, NULL);
//
// ЛRU логика
}
``````markdown
// - всегда получает сообщения из backend; получает сообщения из frontend только когда есть более одного свободного worker.
// - когда worker отвечает, он помечается как готовый и ответ worker передается клиенту
// - если клиент отправляет запрос, он передается следующему worker
//
// очередь доступных worker
zlist_t *workers = zlist_new ();
while (1) {
// инициализация poll
zmq_pollitem_t items[] = {
{backend, 0, ZMQ_POLLIN, 0},
{frontend, 0, ZMQ_POLLIN, 0}
};
// Когда есть доступный worker, получаем сообщение с frontend
int rc = zmq_poll(items, zlist_size(workers) ? 2 : 1, -1);
if (rc == -1)
break; // Прерывание
// Обрабатываем сообщение, полученное с backend
if (items[0].revents & ZMQ_POLLIN) {
// Используем адрес worker для LRU маршрутизации
zmsg_t *msg = zmsg_recv(backend);
if (!msg)
break; // Прерывание
zframe_t *address = zmsg_unwrap(msg);
zlist_append(workers, address);
// Если это не сообщение READY, передаем его клиенту
zframe_t *frame = zmsg_first(msg);
if (memcmp(zframe_data(frame), LRU_READY, 1) == 0)
zmsg_destroy(&msg);
else
zmsg_send(&msg, frontend);
}
if (items[1].revents & ZMQ_POLLIN) {
// Получаем запрос от клиента и передаем его worker
zmsg_t *msg = zmsg_recv(frontend);
if (msg) {
zmsg_wrap(msg, (zframe_t *)zlist_pop(workers));
zmsg_send(&msg, backend);
}
}
}
// Если все завершено, выполняем очистку
while (zlist_size(workers)) {
zframe_t *frame = (zframe_t *)zlist_pop(workers);
zframe_destroy(&frame);
}
zlist_destroy(&workers);
```
zctx_destroy(&ctx);
return 0;
}
```
```czmq предоставляет простую систему прерываний: при нажатии Ctrl-C программа завершает выполнение ØMQ и возвращает -1, errno устанавливается в EINTR. При прерывании программы метод recv класса czmq возвращает NULL, поэтому можно использовать следующий код для проверки:```c
while (1) {
zstr_send (client, "HELLO");
char *reply = zstr_recv (client);
if (!reply)
break; // Interrupt
printf ("Client: %s\n", reply);
free (reply);
sleep (1);
}
```
Если используется функция `zmq_poll()`, то проверка может выглядеть так:
```
int rc = zmq_poll (items, zlist_size (workers) ? 2 : 1, -1);
if (rc == -1)
break; // Interrupt
```
В примере выше используется оригинальная функция `zmq_poll()`. Также можно использовать реактор `zloop`, предоставленный `czmq`, который позволяет:
* Получать сообщения с любого сокета, то есть если на сокете есть сообщение, это вызывает функцию;
* Остановить чтение сообщений с сокета;
* Установить таймеры, чтобы периодически читать сообщения.
Внутри `zloop` используется функция `zmq_poll()`, но он позволяет динамически добавлять и удалять прослушиватели с сокетов, перестраивать пул `zmq_poll` и вычислять следующее событие таймера на основе времени ожидания в `zmq_poll`.
Используя этот паттерн реактора, наш код становится более компактным:
```c
zloop_t *reactor = zloop_new ();
zloop_reader (reactor, self->backend, s_handle_backend, self);
zloop_start (reactor);
zloop_destroy (&reactor);
```
На самом деле обработка сообщений происходит в других частях программы, и не все могут предпочесть такой стиль, но `zloop` действительно объединяет поведение таймеров и сокетов. В последующих примерах мы будем использовать `zmq_poll()` для простых примеров и `zloop` для сложных.
Теперь мы перепишем устройство LRU очереди с использованием `zloop`:**lruqueue3.c**
```c
//
// ЛРУ очередь, реализованная с использованием czmq и паттерна реактора
//
#include "czmq.h"
#define NBR_CLIENTS 10
#define NBR_WORKERS 3
#define LRU_READY "\001" // сообщение о готовности worker
//
// Использует REQ для реализации базовой модели запрос-ответ
//
static void *
client_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, "ipc://frontend.ipc");
// Отправляет запрос и получает ответ
while (1) {
zstr_send (client, "HELLO");
char *reply = zstr_recv (client);
if (!reply)
break;
printf ("Client: %s\n", reply);
free (reply);
sleep (1);
}
zctx_destroy (&ctx);
return NULL;
}
//
// Worker использует REQ сокет для реализации маршрутизации
//
static void *
worker_task (void *arg_ptr)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (worker, "ipc://backend.ipc");
// Уведомляет агента о готовности worker
zframe_t *frame = zframe_new (LRU_READY, 1);
zframe_send (&frame, worker, 0);
// Получает сообщение и обрабатывает его
while (1) {
zmsg_t *msg = zmsg_recv (worker);
if (!msg)
break; // прерывание
//zframe_print (zmsg_last (msg), "Worker: ");
zframe_reset (zmsg_last (msg), "OK", 2);
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return NULL;
}
//
// Структура ЛРУ очереди, передается в реактор
typedef struct {
void *frontend; // слушает client
void *backend; // слушает worker
zlist_t *workers; // список доступных worker
} lruqueue_t;
//
// Обработка сообщений от frontend
int s_handle_frontend (zloop_t *loop, void *socket, void *arg)
{
lruqueue_t *self = (lruqueue_t *) arg;
zmsg_t *msg = zmsg_recv (self->frontend);
if (msg) {
zmsg_wrap (msg, (zframe_t *) zlist_pop (self->workers));
zmsg_send (&msg, self->backend);
}
``````markdown
zframe_t *address = zmsg_unwrap(msg);
zlist_append(self->workers, address);
// Когда есть доступный worker, увеличиваем слушатель frontend
if (zlist_size(self->workers) == 1)
zloop_reader(loop, self->frontend, s_handle_frontend, self);
// Если ответ пришел от worker, передаем его клиенту
zframe_t *frame = zmsg_first(msg);
if (memcmp(zframe_data(frame), LRU_READY, 1) == 0)
zmsg_destroy(&msg);
else
zmsg_send(&msg, self->frontend);
}
int main(void)
{
zctx_t *ctx = zctx_new();
lruqueue_t *self = (lruqueue_t *)zmalloc(sizeof(lruqueue_t));
self->frontend = zsocket_new(ctx, ZMQ_ROUTER);
self->backend = zsocket_new(ctx, ZMQ_ROUTER);
zsocket_bind(self->frontend, "ipc://frontend.ipc");
zsocket_bind(self->backend, "ipc://backend.ipc");
int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
zthread_new(ctx, client_task, NULL);
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
zthread_new(ctx, worker_task, NULL);
// Список доступных worker
self->workers = zlist_new();
// Подготовка и запуск реактора
zloop_t *reactor = zloop_new();
zloop_reader(reactor, self->backend, s_handle_backend, self);
zloop_start(reactor);
zloop_destroy(&reactor);
// Очистка после завершения
while (zlist_size(self->workers))
{
zframe_t *frame = (zframe_t *)zlist_pop(self->workers);
zframe_destroy(&frame);
}
zlist_destroy(&self->workers);
zctx_destroy(&ctx);
free(self);
return 0;
}
```
```Правильная обработка Ctrl+C всё же представляет определённые трудности. Если вы используете класс `zctx`, он автоматически будет её выполнять, но потребуется также соответствующий код. Если `zmq_poll()` вернула `-1`, или метод `recv` (`zstr_recv`, `zframe_recv`, `zmsg_recv`) вернул `NULL`, то следует завершить все циклы. Кроме того, полезно добавить проверку `!zctx_interrupted` в главный внешний цикл.```### Асинхронная структура C/S
В предыдущей модели ROUTER-DEALER мы видели, как клиент асинхронно взаимодействует с несколькими рабочими процессами. Мы можем перевернуть эту структуру, чтобы несколько клиентов асинхронно взаимодействовали с одним сервером:

* Клиент подключается к серверу и отправляет запрос;
* При каждом получении запроса сервер отправляет 0 до N ответов;
* Клиент может одновременно отправлять несколько запросов без необходимости ждать ответа;
* Сервер может одновременно отправлять несколько ответов без необходимости получения новых запросов.
**asyncsrd.c**```c
//
// Асинхронная модель C/S (DEALER-ROUTER)
//
#include "czmq.h"
// ---------------------------------------------------------------------
// Это клиентская задача, которая подключается к серверу, отправляет запрос каждую секунду и собирает/печатает ответы.
// Мы будем запускать несколько клиентских задач с использованием случайных идентификаторов.
static void *client_task(void *args)
{
zctx_t *ctx = zctx_new();
void *client = zsocket_new(ctx, ZMQ_DEALER);
// Устанавливаем случайный идентификатор для удобства отслеживания
char identity[10];
sprintf(identity, "%04X-%04X", randof(0x10000), randof(0x10000));
zsockopt_set_identity(client, identity);
zsocket_connect(client, "tcp://localhost:5570");
zmq_pollitem_t items[] = {{client, 0, ZMQ_POLLIN, 0}};
int request_nbr = 0;
``` while (1)
{
// Получаем сообщение из poll каждую секунду
int centitick;
for (centitick = 0; centitick < 100; centitick++)
{
zmq_poll(items, 1, 10 * ZMQ_POLL_MSEC);
if (items[0].revents & ZMQ_POLLIN)
{
zmsg_t *msg = zmsg_recv(client);
zframe_print(zmsg_last(msg), identity);
zmsg_destroy(&msg);
}
}```md
// Передача сообщений между фронтендом и бэкендом
while (1) {
zmq_pollitem_t items[] = {
{ frontend, 0, ZMQ_POLLIN, 0 },
{ backend, 0, ZMQ_POLLIN, 0 }
};
zmq_poll(items, 2, -1);
if (items[0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv(frontend);
//puts("Запрос от клиента:");
//zmsg_dump(msg);
zmsg_send(&msg, backend);
}
if (items[1].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv(backend);
//puts("Ответ от рабочего:");
//zmsg_dump(msg);
zmsg_send(&msg, frontend);
}
}
zctx_destroy(&ctx);
return NULL;
}
// ---------------------------------------------------------------------
// Это серверная задача, которая использует многопоточность для распределения запросов между несколькими worker'ами и правильного возврата ответов.
// Один worker может обрабатывать только один запрос, но клиент может отправлять несколько запросов одновременно.
static void server_worker(void *args, zctx_t *ctx, void *pipe)
{
while (1) {
zmsg_t *msg = zmsg_recv(pipe);
if (!msg)
break;
// Обработка сообщения
char *address = zmsg_popstr(msg);
char *empty = zmsg_popstr(msg);
free(empty);
int request_nbr = atoi(zmsg_popstr(msg));
free(address);
// Отправка ответа обратно клиенту
zmsg_t *reply = zmsg_new();
zmsg_addstr(reply, address);
zmsg_addstr(reply, "");
zmsg_addstr(reply, zclock_time());
zmsg_send(&reply, pipe);
free(address);
}
}
``````markdown
// Обработка запроса и случайное возвращение нескольких одинаковых ответов с задержкой
static void
server_worker(void *args, zctx_t *ctx, void *pipe)
{
void *worker = zsocket_new(ctx, ZMQ_DEALER);
zsocket_connect(worker, "inproc://backend");
while (1) {
// DEALER сокет возвращает нам оболочку и содержимое сообщения
zmsg_t *msg = zmsg_recv(worker);
zframe_t *address = zmsg_pop(msg);
zframe_t *content = zmsg_pop(msg);
assert(content);
zmsg_destroy(&msg);
// Случайное возвращение от 0 до 4 ответов
int reply, replies = randof(5);
for (reply = 0; reply < replies; reply++) {
// Пауза на некоторое время
zclock_sleep(randof(1000) + 1);
zframe_send(&address, worker, ZFRAME_REUSE + ZFRAME_MORE);
zframe_send(&content, worker, ZFRAME_REUSE);
}
zframe_destroy(&address);
zframe_destroy(&content);
}
}
// Основная программа для запуска нескольких клиентов и одного сервера
int main(void)
{
zctx_t *ctx = zctx_new();
zthread_new(ctx, client_task, NULL);
zthread_new(ctx, client_task, NULL);
zthread_new(ctx, client_task, NULL);
zthread_new(ctx, server_task, NULL);
// Завершение программы через 5 секунд
zclock_sleep(5 * 1000);
zctx_destroy(&ctx);
return 0;
}
```
Запустив вышеуказанный код, можно заметить, что три клиента имеют свои случайные идентификаторы, и каждый запрос получает от 0 до четырёх ответов.
```* Клиент отправляет запрос каждую секунду и получает от 0 до нескольких ответов. Это реализуется с помощью zmq_poll(), но мы не можем поллинговать только один раз в секунду, так как это не позволит своевременно обрабатывать ответы. В программе мы поллим 100 раз в секунду, что позволяет серверу использовать это как своего рода сердцебиение для проверки наличия клиента.* Сервер использует пул рабочих процессов, каждый из которых синхронно обрабатывает один запрос. Мы можем использовать встроенные очереди для перемещения сообщений, но для удобства отладки реализуем этот процесс самостоятельно. Вы можете удалить закомментированные строки и посмотреть на вывод программы.
Общая архитектура этого кода представлена ниже:

Как видно, соединение между клиентом и сервером осуществляется через DEALER-ROUTER, а соединение между сервером и рабочими процессами — через DEALER-DEALER. Если рабочий процесс является синхронным потоком, мы могли бы использовать REP. Однако в данном примере рабочие процессы должны иметь возможность отправлять несколько ответов, поэтому используется асинхронный сокет типа DEALER. Здесь нам не требуется маршрутизация ответов, так как все рабочие процессы подключены к одному серверу.
Рассмотрим использование конвертов для маршрутизации. Клиент отправляет сообщение, и сервер получает информацию, содержащую адрес клиента. Таким образом, у нас есть два возможных варианта связи между сервером и рабочими процессами:* Рабочий процесс получает незамеченное сообщение. Мы используем явно объявленные идентификаторы вместе с ROUTER-сокетом для соединения рабочего процесса и сервера. Такое решение требует, чтобы рабочий процесс заранее сообщил ROUTER о своём существовании, что соответствует LRU-алгоритму, который мы рассматривали ранее.* Рабочий процесс получает сообщение с идентификатором и возвращает ответ с идентификатором. Это требует, чтобы рабочий процесс корректно обрабатывал конверты.
Второй вариант более прост:
```
клиент сервер фронтенд рабочий процесс
[ DEALER ]<---->[ ROUTER <----> DEALER <----> DEALER ]
1 часть 2 части 2 части
```
Когда нам нужно поддерживать диалог между клиентом и сервером, мы сталкиваемся с классической проблемой: клиенты могут быть нефиксированы, и если сохранять сообщения для каждого клиента, система быстро исчерпывает свои ресурсы. Даже если поддерживается постоянное соединение с одним и тем же клиентом, использование временных сокетов (без явно объявленных идентификаторов) приводит к тому, что каждое соединение воспринимается как новое. Чтобы сохранять информацию о клиенте в асинхронных запросах, следует учесть следующие моменты:
* Клиент должен отправлять пульсацию серверу. В данном примере клиент каждую секунду отправляет запрос на сервер, что является надежной системой пульсации.
* Использование сокетного идентификатора клиента для хранения информации, что работает как для временных, так и для постоянных сокетов;
* Обнаружение клиентов, которые прекратили пульсацию. Например, если за две секунды не было получено пульсации от определенного клиента, состояние этого клиента можно удалить.### Практика: маршрутизация через прокси
Давайте объединим все знания и применим их на практике. Наш крупный клиент позвонил с экстренным запросом, чтобы построить крупную облачную инфраструктуру. Он требует, чтобы эта облачная архитектура могла работать через несколько центров обработки данных, каждый из которых будет содержать группу клиентов и рабочих, которые могут эффективно взаимодействовать.
Мы верим, что практика важнее теории, поэтому предложили использовать ZMQ для создания такой системы. Клиент согласился, возможно, потому что он действительно хотел снизить затраты на разработку, или потому что видел много преимуществ ZMQ на Twitter.
#### Подробное описание
После нескольких чашек концентрированного кофе мы готовы начать работу, но здравый смысл подсказывает нам, что стоит сначала проанализировать проблему, прежде чем искать решения. Что должна делать облако? Мы спросили, и клиент ответил:
* Рабочие выполняют задачи на различных устройствах, но способны обрабатывать все типы задач. В каждом кластере может быть сотни рабочих, умноженные на количество кластеров, что делает общее число очень большим.
* Клиенты назначают задачи рабочим, каждая задача является независимой, и каждый клиент хочет найти подходящего рабочего для выполнения задачи как можно быстрее. Клиенты являются нестационарными и часто меняются.* Основная сложность заключается в том, что архитектура должна быть способна легко добавлять и удалять кластеры, вместе с клиентами и рабочими внутри этих кластеров.
* Если в кластере нет доступных рабочих, он передаст задачу доступному рабочему в другом кластере.
* Клиенты отправляют запросы по одному и ждут ответа. Если они не получают ответ в течение X секунд, они повторно отправляют запрос. Это уже реализовано в API клиента.
* Рабочие обрабатывают запросы по одному, их поведение очень простое. Если рабочий выходит из строя, другой скрипт запустит его снова.
После получения ответа мы задали ещё один вопрос:
* Между кластерами есть более высокий уровень сети для соединения их, верно? Клиент подтвердил это. Какую пропускную способность нам нужно обрабатывать? Клиент говорит, что в каждом кластере около тысячи клиентов, а каждый клиент отправляет 10 запросов в секунду. Запросы содержат небольшое количество данных, а ответы также небольшие, не превышая 1КБ каждым.
Мы провели простые расчёты: 2500 клиентов х 10 запросов/секунду х 1000 байт х двустороннюю связь = 50 МБ/секунду, или 400 Мбит/секунду, что не является проблемой для сети 1 Гбит/с, и можно использовать протокол TCP.Таким образом, требования становятся ясными, и нет необходимости в дополнительном оборудовании или протоколах для выполнения этой задачи. Нам просто нужно предоставить эффективный алгоритм маршрутизации и тщательно спроектировать его. Мы начнем с одного кластера (дата-центра) и затем будем думать о том, как их соединять.#### Архитектура одного кластера
Работники и клиенты являются синхронными, мы используем алгоритм LRU для распределения задач между работниками. Каждый работник равен другому, поэтому нам не нужно беспокоиться о сервисах. Работники анонимны, клиенты не будут общаться с конкретным работником, поэтому нам не нужно гарантировать доставку сообщений и повторную отправку при неудаче.
Как было упомянуто выше, клиенты и работники не будут взаимодействовать напрямую, что делает невозможным динамическое добавление и удаление узлов. Поэтому наша базовая модель будет использовать структуру агента, использованную в модели запрос-ответ:

#### Архитектура нескольких кластеров
Теперь мы расширяем кластеры до нескольких, каждый кластер имеет свою группу клиентов и работников, и используют агентов для соединения:

Проблема заключается в следующем: как сделать так, чтобы клиенты одного кластера могли взаимодействовать с работниками другого кластера? Есть несколько решений, давайте рассмотрим их преимущества и недостатки:* Клиенты напрямую соединены с несколькими агентами. Преимущество заключается в том, что нам не нужно менять агентов и сотрудников, но клиенты станут сложнее и должны знать всю структуру. Если мы хотим добавить третий или четвертый кластер, все клиенты потребуют изменений. Таким образом, мы фактически пишем маршрутизацию и функции отказоустойчивости в клиентов, что не является хорошей идеей.* Работники напрямую соединены с несколькими агентами. Однако работники типа REQ не могут этого делать, они могут отвечать только одному агенту. Если использовать REP сокеты, то мы не сможем использовать алгоритм LRU для очередей агентов. Это невозможно, поскольку в нашей структуре мы должны использовать алгоритм LRU для управления работниками. Еще один метод — использование ROUTER сокетов, давайте назовем это схемой 1.* Агенты могут между собой взаимодействовать, что выглядит хорошо, так как не требует добавления большого количества дополнительных соединений. Хотя мы не можем произвольно добавлять агентов, но это можно отложить на потом. В этом случае рабочие и клиенты в кластере не должны беспокоиться о глобальной архитектуре; когда у агента есть свободная производительность, он будет взаимодействовать с другими агентами. Это вариант 2.
Давайте сначала рассмотрим вариант 1, где рабочий одновременно взаимодействует с несколькими агентами:
Это выглядит гибко, но не предоставляет нужных нам возможностей: клиент обращается к удалённому рабочему только тогда, когда рабочий в текущем кластере недоступен. Кроме того, сигнал "готовности" рабочего будет отправлен двум агентам одновременно, что может привести к получению двух одинаковых задач. Этот вариант также имеет ещё одну причину неудачи: мы снова переместили логику маршрутизации на границу сети.Теперь рассмотрим вариант 2, где мы создаём соединения между агентами, не изменяя рабочих и клиентов:

Преимущество этого подхода заключается в том, что мы решаем проблему в одном месте, не затрагивая остальные части системы. Это как секретные переговоры между агентами: парень, у меня есть свободная рабочая сила, если у тебя работы больше, скажи мне, цена договорная.
На самом деле, нам просто нужно создать более сложный алгоритм маршрутизации: агенты становятся подрядчиками друг для друга. У этого подхода есть и другие преимущества:
* В обычной ситуации (например, при наличии одного кластера) этот подход работает так же, как и раньше, а при наличии нескольких кластеров выполняются дополнительные действия.
* Для различных типов работ можно использовать различные модели потока сообщений, такие как использование разных сетевых соединений.
* Расширение архитектуры кажется простым: если необходимо, можно добавить суперагент для управления распределением задач.Теперь начнем писать код. Мы создадим полный кластер в одном процессе, чтобы было удобнее демонстрировать и легко адаптировать для реального использования. Это и есть красота ZMQ: вы можете экспериментировать с минимальными модулями разработки, а затем легко перенести это в реальное приложение. Потоки становятся процессами, модели сообщений и логика остаются без изменений. Каждый "кластерный" процесс будет содержать поток клиента, поток рабочего и поток агента.Мы уже достаточно знакомы с базовой моделью:
* клиент-потоки используют REQ сокеты для отправки запросов агенту-потоку (ROUTER сокет);
* рабочие потоки используют REQ сокеты для обработки и ответа на запросы, полученные от агента-потока (ROUTER сокет);
* агент использует LRU очередь и маршрутизацию для управления запросами.
#### Федеративный режим и режим коллег
Существует множество способов подключения агента, и нам нужно будет выбрать наиболее подходящий. Нам требуется функциональность, которая позволяет другим агентам знать, что "у меня есть свободные рабочие потоки", и начинать принимать и обрабатывать некоторые задачи; также нам нужно сообщать другим агентам, что "хватит, у меня уже достаточно работы". Этот процесс не обязательно должен быть идеальным, иногда мы действительно принимаем больше задач, чем можем обработать, но всё равно постепенно завершаем их.
Наиболее простой способ называется федеративным, при котором агент выступает в роли клиента и рабочего потока для других агентов. Мы можем соединить передний сокет агента с задним сокетом других агентов, и наоборот. Подсказка: в ZMQ можно привязать один сокет к одному конечному адресу, а затем подключиться к другому конечному адресу.
Эта архитектура имеет простую логику: когда у агента нет клиентов, он сообщает другим агентам, что он готов, и начинает обрабатывать задачу. Однако проблема заключается в том, что этот механизм слишком прост, и агент в федеративном режиме может обрабатывать только один запрос за раз. Если клиент и worker строго синхронизированы, то остальные свободные worker в агенте не будут получать задачи. Агент, который мы хотим использовать, должен иметь полностью асинхронную природу.Однако федеративный режим отлично подходит для некоторых приложений, таких как сервисная архитектура (SOA). Поэтому не стоит сразу отказываться от федеративного режима; он просто не подходит для алгоритма LRU и балансировки нагрузки в кластере.
Ещё один способ подключения агентов — режим коллег. Агенты знают о существовании друг друга и используют специальный канал для связи. Давайте рассмотрим это по частям: предположим, что у нас есть N агентов, каждый из которых имеет N-1 коллег, и все агенты используют сообщения одного и того же формата для общения. Есть две вещи, которые следует отметить относительно сообщений между агентами:* Каждый агент должен сообщать всем своим коллегам, сколько у него свободных worker'ов. Это простое сообщение, которое представляет собой постоянно обновляемое число. Очевидно, мы будем использовать PUB-SUB сокеты. Таким образом, каждый агент будет открывать PUB сокет, чтобы постоянно сообщать о своём состоянии; и SUB сокет, чтобы получать информацию от других агентов. Каждый агент должен каким-то образом передавать задачи другим агентам и получать ответы, при этом процесс должен быть асинхронным. Для этого мы будем использовать ROUTER-ROUTER сокеты, других вариантов нет. Каждый агент будет использовать два таких ROUTER сокета: один для получения задач, другой для их распределения. Если бы не использовались два сокета, потребовалось бы дополнительное логическое различие между запросами и ответами, что потребовало бы добавления больше информации в сообщения.Также следует учитывать коммуникацию между агентами и локальными клиентами и рабочими процессами.
#### Церемония именования
Внутри каждого агента есть три потока сообщений, каждый из которых использует два сокета, поэтому всего требуется шесть сокетов. Важно выбрать хорошее имя для этих сокетов, чтобы не запутаться при переходах между ними. У каждого сокета есть определенная задача, которую он выполняет, и это может быть частью его имени. Таким образом, когда мы будем перечитывать этот код позже, он не покажется нам таким незнакомым.
Вот три потока сообщений, которые мы используем:
* Локальный поток запросов-ответов, обеспечивающий коммуникацию между агентами и клиентами, а также между агентами и рабочими процессами;
* Облачный поток запросов-ответов, обеспечивающий коммуникацию между агентами и их коллегами;
* Поток состояния, обеспечивающий обмен информацией между агентами и их коллегами.
Найдение значимых и одинаково длинных имен поможет сделать наш код более аккуратным. Возможно, они не будут иметь прямого отношения друг к другу, но со временем вы привыкнете к ним.Каждый поток сообщений имеет два сокета, которые мы ранее называли "фронтенд" и "бэкенд". Эти названия мы уже использовали много раз: фронтенд отвечает за получение информации или задач; бэкенд отправляет информацию или задачи коллегам. Концептуально, потоки сообщений всегда идут с фронтенда на бэкенд, а ответы — с бэкенда на фронтенд.Поэтому мы решили использовать следующие названия:
* localfe / localbe
* cloudfe / cloudbe
* statefe / statebe
Что касается протокола связи, мы используем IPC. Преимущество использования этого протокола заключается в том, что он работает как протокол офлайн-коммуникации, подобно TCP, но без необходимости использования IP-адресов или DNS-сервисов. Для конечных точек протокола IPC мы будем называть xxx-localfe/be, xxx-cloud, xxx-state, где xxx представляет собой имя кластера. Возможно, вы считаете, что такой способ названия слишком длинный, и лучше просто называть их s1, s2, s3... Однако ваш мозг не является машиной, и он не может сразу понять значение переменной при чтении кода. Использование способа "три потока сообщений, два направления" для запоминания информации проще, чем попытка запомнить "шесть различных сокетов". Вот схема распределения сокетов агента:

Обратите внимание, что мы будем соединять cloudbe с cloudfe других агентов, а также statebe с statefe других агентов.
#### Прототип потока состояния
Поскольку каждый сообщение имеет свои особенности, мы не будем сразу писать весь код, а будем разрабатывать и тестировать его по частям. Когда каждый поток сообщений будет работать корректно, мы соберём их в одно целое. Мы начнем с прототипа потока состояния:
Код, представленный ниже:**peering1: Прототип потока состояния на C**```c
//
// Прототип агента-партнера (часть 1)
// Прототип потока состояния
//
#include "czmq.h"
int main(int argc, char *argv[]) {
// Первый параметр — имя агента
// Остальные параметры — имена различных партнеров
//
if (argc < 2) {
printf("синтаксис: peering1 me {you}...\n");
exit(EXIT_FAILURE);
}
char *self = argv[1];
printf("I: Подготовка агента %s...\n", self);
srandom((unsigned)time(NULL));
// Подготовка контекста и сокета
zctx_t *ctx = zctx_new();
void *statebe = zsocket_new(ctx, ZMQ_PUB);
zsocket_bind(statebe, "ipc://%s-state.ipc", self);
// Подключение statefe-сокета к каждому партнеру
void *statefe = zsocket_new(ctx, ZMQ_SUB);
int argn;
for (argn = 2; argn < argc; argn++) {
char *peer = argv[argn];
printf("I: Соединение со статусным потоком агента-партнера '%s'\n", peer);
zsocket_connect(statefe, "ipc://%s-state.ipc", peer);
}
// Отправка и получение сообщений о состоянии
// Время ожидания в функции zmq_poll() равно времени сердцебиения
//
while (1) {
// Инициализация списка объектов poll
zmq_pollitem_t items[] = {
{ statefe, 0, ZMQ_POLLIN, 0 }
};
// Опрос активности сокетов, время ожидания — 1 секунда
int rc = zmq_poll(items, 1, 1000 * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Прерывание
// Обработка полученных сообщений о состоянии
if (items[0].revents & ZMQ_POLLIN) {
char *peer_name = zstr_recv(statefe);
char *available = zstr_recv(statefe);
printf("Агент-партнер %s имеет %s свободных worker-ов\n", peer_name, available);
free(peer_name);
free(available);
}
else {
``` // Отправка случайного числа, представляющего количество свободных worker-ов
zstr_sendm (statebe, self);
zstr_sendf (statebe, "%d", randof (10));
}
}
zctx_destroy (&ctx);
return EXIT_SUCCESS;
}
```Несколько пояснений:* Каждому агенту требуется уникальный идентификатор для генерации соответствующей IPC конечной точки. В реальных условиях агенты должны использовать протокол TCP для подключения, что требует более полной системы конфигурации. Мы рассмотрим это в последующих разделах.
* Ядром программы является цикл zmq_poll(), который обрабатывает полученные сообщения и отправляет состояние агента. Состояние агента отправляется только тогда, когда zmq_poll() выходит из-под воздействия отсутствующих сообщений от партнера. Если бы мы отправляли своё состояние после каждого полученного сообщения, это привело бы к избыточности сообщений.
* Сообщение состояния состоит из двух фреймов: первый фрейм содержит адрес агента, второй — количество свободных рабочих процессов. Нам необходимо сообщать партнёрам свой адрес, чтобы они могли принимать запросы. Единственный способ сделать это — указать его в сообщении.
* Мы не установили идентификатор на SUB сокете, чтобы избежать получения устаревшей информации о состоянии партнёра при подключении.
* Мы не установили пороговое значение (HWM) на PUB сокете, так как подписчики являются мгновенными. Мы также можем установить пороговое значение в 1, но это не обязательно.
Давайте скомпилируем этот код и используем его для моделирования трёх кластеров: DC1, DC2 и DC3. Мы будем запускать следующие команды в разных окнах:```
peering1 DC1 DC2 DC3 # Запускаем DC1 и подключаемся к DC2 и DC3
peering1 DC2 DC1 DC3 # Запускаем DC2 и подключаемся к DC1 и DC3
peering1 DC3 DC1 DC2 # Запускаем DC3 и подключаемся к DC1 и DC2
```
Каждый кластер будет отслеживать состояние своих партнеров и каждую секунду выводить свое текущее состояние.
В реальном программировании мы не будем отправлять свое состояние по расписанию, а будем делать это при изменении состояния. Это может показаться затратным по ширине канала, но фактическое содержание сообщений состояния обычно небольшое, а соединения между кластерами очень быстрые.
Если нам нужно отправлять информацию о состоянии с более точной периодичностью, можно создать дополнительный поток, открыть сокет `statebe` и передавать нерегулярную информацию о состоянии основному потоку, который затем будет отправлять эти сообщения по расписанию. Однако это потребует дополнительного программирования.
#### Прототипы локального и облачного потоков
Теперь давайте создадим прототипы локального и облачного потоков. Этот код будет получать запросы от клиента и случайным образом распределять их между рабочими процессами внутри кластера или между кластерами.
Перед тем как начать писать код, давайте сначала опишем основную логику маршрутизации и составим простой и надёжный дизайн. Нам нужны две очереди: одна для хранения запросов, полученных от локального кластера клиентов, и другая для хранения запросов, полученных от других кластеров. Один из способов — это получение сообщений из передних сокетов локальной машины и облака и помещение их в соответствующие очереди. Однако это кажется излишним, так как ZMQ сокеты сами являются очередями. Поэтому мы можем использовать кэши, предоставляемые ZMQ сокетами, как очереди.Эта техника была использована нами в LRU очередях и работала хорошо. Мы получаем запросы из сокета только тогда, когда есть свободные рабочие процессы или другие кластеры, готовые принять запросы. Мы можем постоянно получать ответы с заднего конца и маршрутизировать их обратно. Если задний конец ничего не отвечает, нет необходимости принимать запросы с переднего конца.
Поэтому наш основной цикл будет выполнять следующие действия:
* Проверка заднего конечного сокета для получения сообщений "готовности" от рабочих процессов или ответов. Если это ответ, он будет маршрутизирован обратно к кластерному клиенту или другому кластеру.
* После получения ответа рабочий процесс будет помечен как доступный, помещен в очередь и счетчик увеличен;
* Если есть доступные рабочие процессы, запрос будет получен, который может прийти от локального клиента или другого кластера. Затем запрос будет передан рабочему процессу в кластере или случайным образом передан другому кластеру.
Здесь мы просто случайным образом отправляем запросы другим кластерам, а не создаем рабочий процесс в агенте для распределения задач между кластерами. Это выглядит глупо, но пока работает.Мы используем идентификатор агента для маршрутизации сообщений до агента. У каждого агента есть своё имя, которое задаётся в командной строке. Если эти имена не совпадают с UUID, автоматически сгенерированными ZMQ для клиентов, мы знаем, что ответ должен быть отправлен обратно клиенту или другому кластеру.Вот код, интересные части отмечены в программе:**peering2: прототип локального и облачного потока на C**
```c
//
// Прокси-партнерское моделирование (часть вторая)
// Прототип потока запросов-ответов
//
// Пример программы использует один процесс, чтобы сделать программу простой,
// каждый поток имеет свой контекст, поэтому можно рассматривать их как несколько процессов.
//
#include "czmq.h"
#define NBR_CLIENTS 10
#define NBR_WORKERS 3
#define LRU_READY "\001" // Сообщение: worker готов
// Имя прокси; в реальном мире это имя должно быть настроено
static char *self;
// Клиент использует REQ сокет для запросов-ответов
//
static void *
client_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, "ipc://%s-localfe.ipc", self);
while (1) {
// Отправка запроса, получение ответа
zstr_send (client, "HELLO");
char *reply = zstr_recv (client);
if (! reply)
break; // Прерывание
printf ("Клиент: %s\n", reply);
free (reply);
sleep (1);
}
zctx_destroy (&ctx);
return NULL;
}
// Worker использует REQ сокет и выполняет LRU маршрутизацию
//
static void *
worker_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (worker, "ipc://%s-localbe.ipc", self);
// Уведомление прокси, что worker готов
zframe_t *frame = zframe_new (LRU_READY, 1);
zframe_send (&frame, worker, 0);
// Обработка сообщений
while (1) {
zmsg_t *msg = zmsg_recv (worker);
if (! msg)
break; // Прерывание
zframe_print (zmsg_last (msg), "Worker: ");
zframe_reset (zmsg_last (msg), "OK", 2);
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return NULL;
}
int main (int argc, char *argv [])
{
// Первый аргумент - имя прокси
``` // Остальные аргументы - имена партнерских прокси
//
if (argc < 2) {
printf ("синтаксис: peering2 me {you}. . . \n");
exit (EXIT_FAILURE);
}
self = argv[1];
printf ("Я: Подготовка прокси %s. . . \n", self);
srandom((unsigned)time(NULL));
// Подготовка контекста и сокетов
zctx_t *ctx = zctx_new();
char endpoint[256];
// Привязка cloudfe к конечной точке
void *cloudfe = zsocket_new(ctx, ZMQ_ROUTER);
zsockopt_set_identity(cloudfe, self);
zsocket_bind(cloudfe, "ipc://%s-cloud.ipc", self);
// Соединение cloudbe с конечными точками партнерских прокси
void *cloudbe = zsocket_new(ctx, ZMQ_ROUTER);
zsockopt_set_identity(cloudbe, self);
int argn;
for (argn = 2; argn < argc; argn++) {
char *peer = argv[argn];
printf("Я: Подключаюсь к агенту-партнеру '%s' через cloudfe\n", peer);
zsocket_connect(cloudbe, "ipc://%s-cloud.ipc", peer);
}
// Подготовка локального фронтенда и бэкенда
void *localfe = zsocket_new(ctx, ZMQ_ROUTER);
zsocket_bind(localfe, "ipc://%s-localfe.ipc", self);
void *localbe = zsocket_new(ctx, ZMQ_ROUTER);
zsocket_bind(localbe, "ipc://%s-localbe.ipc", self);
// Дождаться подтверждения от пользователя
printf("Подтвердите запуск всех агентов, нажмите любую клавишу для продолжения: ");
getchar();
// Запуск локальных worker'ов
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
zthread_new(ctx, worker_task, NULL);
// Запуск локальных клиентов
int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
zthread_new(ctx, client_task, NULL);
// Интересная часть
// -------------------------------------------------------------
// Поток запросов-ответов
// - Если есть доступные worker'ы, получаем запросы с локальной или облачной стороны;```markdown
// - Передаем запросы доступному worker'у или другому кластеру.
// Очередь доступных worker'ов
int capacity = 0;
zlist_t *workers = zlist_new ();
while (1) {
zmq_pollitem_t backends [] = {
{ localbe, 0, ZMQ_POLLIN, 0 },
{ cloudbe, 0, ZMQ_POLLIN, 0 }
};
// Если нет доступных worker'ов, ждем
int rc = zmq_poll (backends, 2,
capacity ? 1000 * ZMQ_POLL_MSEC : -1);
if (rc == -1)
break; // Прерывание
// Обработка ответов от локальных worker'ов
zmsg_t *msg = NULL;
if (backends [0].revents & ZMQ_POLLIN) {
msg = zmsg_recv (localbe);
if (!msg)
break; // Прерывание
zframe_t *address = zmsg_unwrap (msg);
zlist_append (workers, address);
capacity++;
// Если это сигнал "готовности", больше не маршрутизировать
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), LRU_READY, 1) == 0)
zmsg_destroy (&msg);
}
// Обработка ответов от агента-партнера
else
if (backends [1].revents & ZMQ_POLLIN) {
msg = zmsg_recv(cloudbe);
if (!msg)
break; // прервать
// нам не нужен адрес агента-партнера
zframe_t *address = zmsg_unwrap(msg);
zframe_destroy(&address);
}
// если адрес в ответственном сообщении принадлежит агенту-партнеру, отправляем ему
for (int argn = 2; msg && argn < argc; argn++) {
char *data = (char *) zframe_data(zmsg_first(msg));
size_t size = zframe_size(zmsg_first(msg));
if (size == strlen(argv[argn])
&& memcmp(data, argv[argn], size) == 0)
zmsg_send(&msg, cloudfe);
}
// маршрутизируем ответное сообщение к локальному клиенту
if (msg)
```
zmsg_send(&msg, localfe);
// начинаем обработку запросов от клиента
//
while (capacity) {
zmq_pollitem_t frontends[] = {
{ localfe, 0, ZMQ_POLLIN, 0 },
{ cloudfe, 0, ZMQ_POLLIN, 0 }
};
rc = zmq_poll(frontends, 2, 0);
assert(rc >= 0);
int reroutable = 0;
// приоритетное обслуживание запросов от агента-партнера, чтобы избежать исчерпания ресурсов
if (frontends[1].revents & ZMQ_POLLIN) {
msg = zmsg_recv(cloudfe);
reroutable = 0;
} else
if (frontends[0].revents & ZMQ_POLLIN) {
msg = zmsg_recv(localfe);
reroutable = 1;
} else
break; // нет запросов
// направляем 20% запросов в другие кластеры
//
if (reroutable && argc > 2 && randof(5) == 0) {
// случайное маршрутизирование запросов к агенту-партнеру
int random_peer = randof(argc - 2) + 2;
zmsg_pushmem(msg, argv[random_peer], strlen(argv[random_peer]));
zmsg_send(&msg, cloudbe);
} else {
zframe_t *frame = (zframe_t *) zlist_pop(workers);
zmsg_wrap(msg, frame);
zmsg_send(&msg, localbe);
capacity--;
}
}
}
// завершающая очистка после завершения программы
while (zlist_size(workers)) {
zframe_t *frame = (zframe_t *) zlist_pop(workers);
zframe_destroy(&frame);
}
zlist_destroy(&workers);
zctx_destroy(&ctx);
return EXIT_SUCCESS;
}
```Пожалуйста, предоставьте текст для перевода. Для выполнения вышеуказанного кода в двух окнах используйте следующие команды:```
peering2 me you
peering2 you me
```
Несколько замечаний:
* Библиотека `zmsg` значительно упрощает программу, и такие программы должны стать неотъемлемой частью нашего набора инструментов как ZMQ-программистов;
* Поскольку в программе нет реализации получения состояния соседнего агента, предположим, что все они имеют свободных работников. В реальности мы не будем отправлять запросы агентам, которые не существуют.
* Вы можете запустить эту программу на длительное время, чтобы проверить, появляются ли сообщения об ошибках маршрутизации, так как при возникновении ошибки клиент будет заблокирован. Вы можете попробовать закрыть один из агентов, чтобы увидеть, как агенты не могут маршрутизировать запросы к другим агентам в облаке, клиенты по очереди блокируются, и программа прекращает выводить отладочную информацию.
#### Сборка
Давайте объединим все это в одну программу. Как и раньше, мы выполним всё это в одном процессе. Мы объединим два примера программы из вышеприведённого текста, чтобы создать программу, которая может имитировать любое количество кластеров.
Программа состоит из 270 строк кода и идеально подходит для моделирования полной системы кластера, включая клиентов, работников, агентов и механизм распределения задач в облаке.**peering3: Полное моделирование кластера на C**```c
//
// Подруга агент-симулятор (третья часть)
// Прототип сообщений состояния и задач
//
// Пример программы использует один процесс, чтобы сделать программу простой,
// каждый поток имеет свой собственный объект контекста, поэтому можно считать их многими процессами.
//
#include "czmq.h"
#define NBR_CLIENTS 10
#define NBR_WORKERS 5
#define LRU_READY "\001" // Сообщение: worker готов
// Агент имя; в реальности это имя должно быть настроено каким-то образом
static char *self;
// Клиент запрос-ответ использует REQ сокет
// Для моделирования стресс-тестирования клиент отправляет множество запросов одновременно
//
static void *
client_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, "ipc://%s-localfe.ipc", self);
void *monitor = zsocket_new (ctx, ZMQ_PUSH);
zsocket_connect (monitor, "ipc://%s-monitor.ipc", self);
while (1) {
sleep (randof (5));
int burst = randof (15);
while (burst--) {
char task_id [5];
sprintf (task_id, "%04X", randof (0x10000));
// Использует случайный шестнадцатеричный ID для идентификации задачи
zstr_send (client, task_id);
// Ждет максимум 10 секунд
zmq_pollitem_t pollset [1] = { { client, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (pollset, 1, 10 * 1000 * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Прерывание
if (pollset [0].revents & ZMQ_POLLIN) {
char *reply = zstr_recv (client);
if (!reply)
break; // Прерывание
// Ответ worker должен содержать ID задачи
puts (reply);
assert (streq (reply, task_id));
free (reply);
``````markdown
}
else {
zstr_sendf(monitor,
"E: Клиент завершил работу, потеряна задача: %s", task_id);
return NULL;
}
}
}
zctx_destroy(&ctx);
return NULL;
}
// Worker использует REQ сокет и выполняет LRU маршрутизацию
//
static void *
worker_task(void *args)
{
zctx_t *ctx = zctx_new();
void *worker = zsocket_new(ctx, ZMQ_REQ);
zsocket_connect(worker, "ipc://%s-localbe.ipc", self);
// Уведомляет агента, что worker готов
zframe_t *frame = zframe_new(LRU_READY, 1);
zframe_send(&frame, worker, 0);
while (1) {
// Worker будет случайным образом задерживаться на несколько секунд
zmsg_t *msg = zmsg_recv(worker);
sleep(randof(2));
zmsg_send(&msg, worker);
}
zctx_destroy(&ctx);
return NULL;
}
int main(int argc, char *argv[])
{
// Первый аргумент — это имя агента
// Остальные аргументы — это имена агентов-партнеров
//
if (argc < 2) {
printf("Синтаксис: peering3 me {you}...\n");
exit(EXIT_FAILURE);
}
self = argv[1];
printf("Я: Подготовка агента %s...\n", self);
srandom((unsigned)time(NULL));
// Подготовка контекста и сокетов
zctx_t *ctx = zctx_new();
char endpoint[256];
// Привязка cloudfe к конечной точке
void *cloudfe = zsocket_new(ctx, ZMQ_ROUTER);
zsockopt_set_identity(cloudfe, self);
zsocket_bind(cloudfe, "ipc://%s-cloud.ipc", self);
// Привязка statebe к конечной точке
void *statebe = zsocket_new(ctx, ZMQ_PUB);
zsocket_bind(statebe, "ipc://%s-state.ipc", self);
// Привязка cloudbe к конечной точке агентов-партнеров
void *cloudbe = zsocket_new(ctx, ZMQ_ROUTER);
zsockopt_set_identity(cloudbe, self);
int argn;
for (argn = 2; argn < argc; argn++) {
char *peer = argv[argn];
printf("Я: Соединение с агентом-партнером '%s' через cloudfe\n", peer);
}
``` zsocket_connect(cloudbe, "ipc://%s-cloud.ipc", peer);
}
// Привязка statefe к конечной точке агентов-партнеров
void *statefe = zsocket_new(ctx, ZMQ_SUB);
for (argn = 2; argn < argc; argn++) {
char *peer = argv[argn];
printf("Я: Соединение с агентом-партнером '%s' через statefe\n", peer);
zsocket_connect(statefe, "ipc://%s-state.ipc", peer);
}
// Подготовка локального фронтенда и бэкенда
void *localfe = zsocket_new(ctx, ZMQ_ROUTER);
zsocket_bind(localfe, "ipc://%s-localfe.ipc", self);
void *localbe = zsocket_new(ctx, ZMQ_ROUTER);
zsocket_bind(localbe, "ipc://%s-localbe.ipc", self);
// Подготовка мониторингового сокета
void *monitor = zsocket_new(ctx, ZMQ_PULL);
zsocket_bind(monitor, "ipc://%s-monitor.ipc", self);
// Запуск локального worker
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
zthread_new(ctx, worker_task, NULL);
// Запуск локального клиента
int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
zthread_new(ctx, client_task, NULL);
// Интересная часть
// -------------------------------------------------------------
// Публикация-подписка сообщений
// - Проверка состояния соседнего агента;
// - Отправка широковещательных сообщений при изменении собственного состояния.
// Запрос-ответ сообщений
// - Если есть доступные worker'ы, то проверка наличия запросов локально или в облаке;
// - Передача запросов локальным worker'ам или другим кластерам.
// Очередь доступных worker'ов
int local_capacity = 0;
int cloud_capacity = 0;
zlist_t *workers = zlist_new();
while (1) {
zmq_pollitem_t primary[] = {
{ localbe, 0, ZMQ_POLLIN, 0 },
{ cloudbe, 0, ZMQ_POLLIN, 0 },
{ statefe, 0, ZMQ_POLLIN, 0 }, { monitor, 0, ZMQ_POLLIN, 0 }
};
// Если нет доступных worker'ов, то ожидание продолжается
int rc = zmq_poll(primary, 4,
local_capacity ? 1000 * ZMQ_POLL_MSEC : -1);
if (rc == -1)
break; // Прерывание
// Отслеживание изменений собственного состояния
int previous = local_capacity;
// Обработка ответов от локальных worker'ов
zmsg_t *msg = NULL;
if (primary[0].revents & ZMQ_POLLIN) {
msg = zmsg_recv(localbe);
if (!msg)
break; // Прерывание
zframe_t *address = zmsg_unwrap(msg);
zlist_append(workers, address);
local_capacity++;
// Если это сигнал "готовности", то дальнейшая маршрутизация прекращается
zframe_t *frame = zmsg_first(msg);
if (memcmp(zframe_data(frame), LRU_READY, 1) == 0)
zmsg_destroy(&msg);
}
// Обработка ответов от соседнего агента
else if (primary[1].revents & ZMQ_POLLIN) {
msg = zmsg_recv(cloudbe);
if (!msg)
break; // Прерывание
// Адрес соседнего агента нам не требуется
zframe_t *address = zmsg_unwrap(msg);
zframe_destroy(&address);
}
// Если адрес в ответе совпадает с адресом соседнего агента, то отправляем ему сообщение
for (int argn = 2; msg && argn < argc; argn++) {
char *data = (char *)zframe_data(zmsg_first(msg));
size_t size = zframe_size(zmsg_first(msg));
if (size == strlen(argv[argn]) &&
memcmp(data, argv[argn], size) == 0)
zmsg_send(&msg, cloudfe);
}
// Передаем ответ клиенту локально
if (msg)
zmsg_send(&msg, localfe); // Обрабатываем обновление состояния peer proxy
if (primary[2].revents & ZMQ_POLLIN) {
char *status = zstr_recv(statefe);
cloud_capacity = atoi(status);
free(status);
}
// Обрабатываем мониторинговые сообщения
if (primary[3].revents & ZMQ_POLLIN) {
char *status = zstr_recv(monitor);
printf("%s\n", status);
free(status);
}
// Начинаем обработку запросов от клиентов
// - Если есть свободные worker'ы локально, принимаем запросы от локального клиента и облачного сервера;
// - Если есть только свободные peer proxy, принимаем запросы только от локального клиента;
// - Передаем запросы локальным worker'ам или peer proxy.
//
while (local_capacity + cloud_capacity) {
zmq_pollitem_t secondary[] = {
{ localfe, 0, ZMQ_POLLIN, 0 },
{ cloudfe, 0, ZMQ_POLLIN, 0 }
};
if (local_capacity)
rc = zmq_poll(secondary, 2, 0);
else
rc = zmq_poll(secondary, 1, 0);
assert(rc >= 0);
if (secondary[0].revents & ZMQ_POLLIN)
msg = zmsg_recv(localfe);
else
if (secondary[1].revents & ZMQ_POLLIN)
msg = zmsg_recv(cloudfe);
else
break; // Нет задач
if (local_capacity) {
zframe_t *frame = (zframe_t *) zlist_pop(workers);
zmsg_wrap(msg, frame);
zmsg_send(&msg, localbe);
local_capacity--;
} else {
// Случайным образом передаем запросы peer proxy
int random_peer = randof(argc - 2) + 2;
zmsg_pushmem(msg, argv[random_peer], strlen(argv[random_peer]));
zmsg_send(&msg, cloudbe);
}
} if (local_capacity != previous) {
// Добавляем адрес нашего proxy в сообщение
zstr_sendm(statebe, self);
// Отправляем новое состояние
zstr_sendf(statebe, "%d", local_capacity);
}
}
// Очистка после завершения программы
while (zlist_size(workers)) {
zframe_t *frame = (zframe_t *) zlist_pop(workers);
zframe_destroy(&frame);
}
zlist_destroy(&workers);
zctx_destroy(&ctx);
return EXIT_SUCCESS;
} Этот код не слишком длинный, но его отладка заняла примерно один день. Ниже приведены некоторые пояснения:
* `client`-поток будет обнаруживать и сообщать о неудачных запросах, он будет полифонировать агентский сокет, проверяя наличие ответов, с таймаутом в 10 секунд.
* `client`-поток не будет самостоятельно выводить информацию, а будет отправлять сообщения PUSH-потоку мониторинга, который будет выводить сообщения. Это наш первый опыт использования ZMQ для мониторинга и логирования, и мы будем использовать это больше в будущем.
* `client` будет имитировать различные ситуации нагрузки, чтобы кластер работал при различных уровнях нагрузки, поэтому запросы могут быть обработаны локально или отправлены в облако. Количество `client` и `worker` в кластере, количество других кластеров и время задержки будут влиять на результат. Вы можете настроить различные параметры для тестирования.* В основном цикле есть две группы наборов полифонирования, фактически мы можем использовать три: поток информации, бэкенд и фронтенд. Поскольку в предыдущих примерах нет необходимости полифонировать запросы фронтенда, если бэкенд не имеет свободных worker.Вот несколько проблем, с которыми мы столкнулись во время написания:
* Если запрос или ответ потеряются где-то, `client` будет заблокирован. Вспомните, что ROUTER-ROUTER сокеты просто игнорируют сообщения, если они не могут быть маршрутизированы. Одним из подходов здесь является изменение `client`-потока для обнаружения и отчета об этих ошибках. Кроме того, я использую `zmsg_dump()` после каждого `recv()` и перед каждым `send()`, чтобы быстрее находить сообщения.
* Основной цикл может неправильно получать сообщения из нескольких готовых сокетов, что приводит к потере первого сообщения. Решением является получение сообщений только из первого готового сокета.
* Библиотека `zmsg` плохо кодирует UUID в C-строках, что приводит к ошибкам при наличии байта 0 в UUID. Решением является преобразование UUID в печатаемое шестнадцатеричное представление.
Этот симулятор не проверяет наличие агента-партнера. Если вы запустите агента, он отправит состояние другим агентам, а затем завершит работу, другие агенты продолжат отправлять запросы этому агенту. В результате `client` других агентов будут сообщать множество ошибок. Решение состоит из двух частей: установка времени жизни для состояния, чтобы запросы не отправлялись к агенту-партнеру, который отсутствует некоторое время, и повышение надежности запросов-ответов, что будет обсуждаться в следующей главе.
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )