В третьей главе мы рассмотрели продвинутые паттерны запрос-ответ с помощью примеров. В этой главе мы обсудим вопросы устойчивости паттерна запрос-ответ и создадим надежную систему запрос-ответ с использованием типов сокетов, предоставляемых ZMQ.
В данной главе будут рассмотрены следующие темы:
Чтобы определить устойчивость, можно сначала определить противоположное понятие — отказ. Если мы можем обрабатывать определенные типы отказов, то наша модель является надежной для этих отказов. Давайте перечислим возможные проблемы в распределенных приложениях ZMQ, начиная с наиболее вероятных отказов:* Программный код приложения является основным источником отказов. Программа может завершиться аварийно, прекратить отклик на запросы или слишком медленно отвечать, что приводит к исчерпанию памяти и другим проблемам.
Поскольку первые пять типов отказов охватывают 99,9% случаев (эта статистика основана на недавнем исследовании), мы будем подробно рассматривать именно эти случаи. Если ваша компания достаточно крупна, чтобы учитывать последние два случая, свяжитесь со мной, так как мне нужна финансовая помощь для создания бассейна на моем заднем дворе.
Кратко говоря, устойчивость означает возможность продолжения работы даже при наличии отказов. Это намного сложнее, чем создание системы передачи сообщений. Мы рассмотрим каждый из ключевых паттернов сообщений, предоставляемых ZMQ, и обсудим способы обеспечения бесперебойной работы кода. Паттерн запрос-ответ: когда сервер прерывает обработку запроса, клиент узнает об этом и прекращает получение сообщений, выбирая ожидание повторной попытки, обращение к другому серверу и т.д. В данном случае мы не будем рассматривать ситуации с проблемами клиента.* Паттерн публикация-подписка: если клиент внезапно завершает работу после получения некоторых сообщений, сервер этого не узнает. Подписчики в паттерне публикация-подписка не отправляют никаких сообщений издателю обратно. Однако подписчики могут связаться с сервером другими способами, такими как паттерн запрос-ответ, и потребовать повторной отправки сообщений. В данном случае мы не будем рассматривать ситуации с проблемами сервера. Кроме того, подписчики могут проверять, не работают ли они слишком медленно, и принимать соответствующие меры (выдавать предупреждения оператору, останавливать работу и т. д.).* Паттерн трубопровод: если worker внезапно завершает работу, задачи-распределитель не узнает об этом. Паттерн трубопровод похож на паттерн публикация-подписка тем, что отправляет сообщения только в одном направлении. Однако конечный результат-собиратель может обнаружить, какие задачи не были выполнены, и сообщить задачи-распределителю перераспределить эти задачи. Если задачи-распределитель или результат-собиратель внезапно завершают работу, то запросы от клиента должны быть обработаны по-другому. Таким образом, системному коду действительно следует минимизировать вероятность ошибок, поскольку это сложно обрабатывать.
В этой главе мы рассмотрим надежное проектирование для паттерна запрос-ответ, а остальные паттерны будут рассмотрены в последующих главах.
Самый базовый паттерн запрос-ответ состоит в том, чтобы REQ-клиент отправлял синхронный запрос к REP-серверу. Надежность такого паттерна очень низкая. Если сервер прерывает обработку запроса, клиент будет находиться в состоянии ожидания вечно.В отличие от протокола TCP, ZMQ предоставляет автоматическое переподключение, балансировку нагрузки при распространении сообщений и т.д. Однако в реальных условиях этого недостаточно. Единственный случай, когда можно полностью доверять базовому паттерну запрос-ответ, это общение между двумя потоками в одном процессе, где нет сетевых проблем или отказа сервера.Однако, добавив некоторые модификации, этот базовый паттерн запрос-ответ может хорошо работать в реальных условиях. Я люблю называть его "пиратским" паттерном.
Кратко говоря, есть три способа, которыми клиент может подключаться к серверу, каждый из которых требует различных решений по обеспечению надежности:
Мы можем достичь надежной модели запрос-ответ, выполнив простые настройки на стороне клиента. Я назову это "ленивым пиратом" (Lazy Pirate) режимом.
При получении ответа мы не ждем его синхронно, а выполняем следующие действия:
Для использования REQ сокета необходимо строго соблюдать последовательность отправки и получения сообщений, так как внутри него используется конечный автомат для управления состоянием. Это может создать проблемы при использовании "пиратского" режима. Самый простой способ — закрыть и перезапустить REQ сокет, чтобы разорвать эту связь.
lpclient: Ленивый пиратский клиент на C
//
// Lazy Pirate client
// Использует zmq_poll для безопасного запроса-ответа
// При выполнении может случайно закрываться или перезапускаться lpserver
//
#include "czmq.h"
#define REQUEST_TIMEOUT 2500 // миллисекунды, (> 1000! )
#define REQUEST_RETRIES 3 // количество попыток
#define SERVER_ENDPOINT "tcp://localhost:5555"
``````c
int main (void)
{
zctx_t *ctx = zctx_new ();
printf ("I: Подключаюсь к серверу... \n");
void *client = zsocket_new (ctx, ZMQ_REQ);
assert (client);
zsocket_connect (client, SERVER_ENDPOINT);
int sequence = 0;
int retries_left = REQUEST_RETRIES;
while (retries_left && ! zctx_interrupted) {
// Отправляем запрос и начинаем получать ответ
char request [10];
sprintf (request, "%d", ++sequence);
zstr_send (client, request);
int expect_reply = 1;
while (expect_reply) {
// Опросим сокет и установим таймаут
zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Прерывание
}
}
}
``````c
// Обрабатываем ответ, если он получен
if (items[0].revents & ZMQ_POLLIN) {
// Сервер вернул ответ, который должен совпадать с номером запроса
char *reply = zstr_recv(client);
if (!reply)
break; // Прерывание
if (atoi(reply) == sequence) {
printf("I: Сервер вернул нормальный ответ (%s)\n", reply);
retries_left = REQUEST_RETRIES;
expect_reply = 0;
} else {
printf("E: Сервер вернул ненормальный ответ: %s\n", reply);
}
free(reply);
} else {
if (--retries_left == 0) {
printf("E: Сервер недоступен, отмена операции\n");
break;
} else {
printf("W: Сервер не отвечает, повторная попытка... \n");
// Закрываем старый сокет и создаем новый
zsocket_destroy(ctx, client);
printf("I: Подключение к серверу... \n");
client = zsocket_new(ctx, ZMQ_REQ);
zsocket_connect(client, SERVER_ENDPOINT);
}
}
}
}
zctx_destroy(&ctx);
return 0;
}
## lpserver: Ленивый пират сервер на C
``````c
//
// Ленивый пират сервер
// Привязывает REQ сокет к tcp://*:5555
// Аналогично hwserver программе, за исключением следующих моментов:
// - Прямое вывод сообщения запроса
// - Случайное замедление работы или завершение программы, моделирующее сбой
//
#include "zhelpers.h"
int main (void)
{
srandom ((unsigned) time (NULL));
void *context = zmq_init (1);
void *server = zmq_socket (context, ZMQ_REP);
zmq_bind (server, "tcp://*:5555");
int cycles = 0;
while (1) {
char *request = s_recv (server);
cycles++;
// После нескольких циклов начинаем моделировать различные сбои
if (cycles > 3 && randof (3) == 0) {
printf ("I: Моделируем программный сбой\n");
break;
}
else if (cycles > 3 && randof (3) == 0) {
printf ("I: Моделируем перегрузку процессора\n");
sleep (2);
}
printf ("I: Нормальный запрос (%s)\n", request);
sleep (1); // Время затратной операции
s_send (server, request);
free (request);
}
zmq_close (server);
zmq_term (context);
return 0;
}
При запуске этого тестового примера можно открыть два терминала: сервер случайным образом будет моделировать сбои, а клиент будет реагировать на эти сбои. Типичный вывод сервера:
I: Нормальный запрос (1)
I: Нормальный запрос (2)
I: Нормальный запрос (3)
I: Моделируем перегрузку процессора
I: Нормальный запрос (4)
I: Моделируем программный сбой
Вывод клиента:``` I: подключаюсь к серверу... I: сервер ответил OK (1) I: сервер ответил OK (2) I: сервер ответил OK (3) W: нет ответа от сервера, повторяем попытку... I: подключаюсь к серверу... W: нет ответа от сервера, повторяем попытку... I: подключаюсь к серверу... E: сервер выглядит как недоступный, прекращаем работу
Клиент использует REQ сокет для отправки запросов и открывает новый сокет при возникновении проблем, чтобы обойти обязательное правило отправки/получения REQ. Возможно, вы подумали бы использовать DEALER сокет, но это плохая идея. Во-первых, DEALER не обрабатывает оболочку так, как это делает REQ (если вы не знаете, что такое оболочка, то использование DEALER еще хуже). Во-вторых, вы можете получить результат, который вам совсем не нужен.
Этот вариант имеет следующие преимущества и недостатки:
* Преимущества: прост в реализации и понимании;
* Преимущества: легко интегрировать с существующими клиентскими и серверными программами;
* Преимущества: ZMQ имеет автоматическую систему повторного подключения;
* Недостатки: при отказе одного сервера невозможно переадресовать запросы на другой доступный сервер.### Базовая надежная очередь (простой пиратский режим)
Во втором режиме мы используем устройство очереди для расширения вышеупомянутого "ленивого пирата" режима, чтобы клиент мог прозрачно взаимодействовать с несколькими серверами. В данном контексте сервер может быть определен как worker. Мы можем начать с базовой модели и поэтапно внедрять этот подход.
Во всех пиратских режимах worker является временным, то есть существует некий общедоступный состояние, такое как общая база данных. Присутствие устройства очереди позволяет worker'ам входить и выходить без ведома клиента. Когда один worker прекращает работу, другой немедленно его заменяет. Эта топология очень проста, но её единственный недостаток заключается в том, что само устройство очереди может стать узким местом и источником однопунктовых отказов.
В третьей главе базовый алгоритм устройства очереди основан на принципе "наименее часто используемый" (Least Recently Used, LRU). Какова же наша реакция, если worker выходит из строя или блокируется? Ответ: практически никакой. Мы уже внедрили механизм повторной попытки в клиенте, поэтому использование базового LRU-устройства очереди будет работать хорошо. Этот подход также соответствует логике ZMQ, поэтому мы можем расширить его, вставив простое устройство очереди в пиратский режим:
Мы можем использовать клиента из "ленивого пирата" режима, а вот код устройства очереди:**spqueue: Простое пиратское устройство очереди на C**
```c
//
// Простая пиратская очередь
//
// Этот механизм полностью совпадает с LRU очередью и не имеет никаких средств обеспечения надежности, полагаясь на повторные попытки клиентов для поддержания работы
//
#include "czmq.h"
#define LRU_READY "\001" // Сообщение: worker готов к работе
int main (void)
{
// Подготовка контекста и сокетов
zctx_t *ctx = zctx_new ();
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
void *backend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (frontend, "tcp://*:5555"); // Клиентский конец
zsocket_bind (backend, "tcp://*:5556"); // Конец worker
// Очередь доступных worker
zlist_t *workers = zlist_new ();
while (1) {
zmq_pollitem_t items [] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
// Когда есть доступный worker, опросить передний конец
int rc = zmq_poll (items, zlist_size (workers) ? 2 : 1, -1);
if (rc == -1)
break; // Прервать
// Обработка сообщений worker с заднего конца
if (items [0].revents & ZMQ_POLLIN) {
// Используйте адрес worker для LRU очереди
zmsg_t *msg = zmsg_recv (backend);
if (!msg)
break; // Прервать
zframe_t *address = zmsg_unwrap (msg);
zlist_append (workers, address);
// Если сообщение не READY, переслать клиенту
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), LRU_READY, 1) == 0)
zmsg_destroy (&msg);
else
zmsg_send (&msg, frontend);
}
if (items [1].revents & ZMQ_POLLIN) {
// Получить запрос клиента и переслать первому доступному worker
zmsg_t *msg = zmsg_recv (frontend);
if (msg) {
``````c
zmsg_wrap (msg, (zframe_t *) zlist_pop (workers));
zmsg_send (&msg, backend);
}
}
}
// Завершение программы, очистка
while (zlist_size (workers)) {
zframe_t *frame = (zframe_t *) zlist_pop (workers);
zframe_destroy (&frame);
}
zlist_destroy (&workers);
zctx_destroy (&ctx);
return 0;
}
```Вот код worker'а, который использует службу "ленивых пиратов" и настраивает его в режиме LRU (передает сигнал "готовности" через REQ сокет):
```**spworker: Простой Pirate worker на C**
```c
//
// Простой worker в режиме "ленивых пиратов"
//
// Использует REQ сокет для подключения к tcp://*:5556 и реализует worker с использованием алгоритма LRU
//
#include "czmq.h"
#define LRU_READY "\001" // Сообщение: worker готов
int main (void)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_REQ);
// Использует случайный символ для указания идентификатора сокета, что облегчает отслеживание
srandom ((unsigned) time (NULL));
char identity [10];
sprintf (identity, "%04X-%04X", randof (0x10000), randof (0x10000));
zmq_setsockopt (worker, ZMQ_IDENTITY, identity, strlen (identity));
zsocket_connect (worker, "tcp://localhost:5556");
// Уведомляет агента о том, что worker готов
printf ("I: (%s) worker готов\n", identity);
zframe_t *frame = zframe_new (LRU_READY, 1);
zframe_send (&frame, worker, 0);
int cycles = 0;
while (1) {
zmsg_t *msg = zmsg_recv (worker);
if (!msg)
break; // Прерывание
// После нескольких циклов начинает имитировать различные проблемы
cycles++;
if (cycles > 3 && randof (5) == 0) {
printf ("I: (%s) имитация отказа\n", identity);
zmsg_destroy (&msg);
break;
}
else if (cycles > 3 && randof (5) == 0) {
printf ("I: (%s) имитация перегрузки CPU\n", identity);
sleep (3);
if (zctx_interrupted)
break;
}
printf ("I: (%s) нормальный ответ\n", identity);
sleep (1); // Выполняет некоторые действия
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return 0;
}
```Запустив этот пример, вы можете запустить несколько worker'ов, одного client'а и устройство очереди в любом порядке. Вы заметите, что все worker'ы рано или поздно будут либо отказываться, либо умирать, а client будет многократно пытаться повторно подключиться и в конечном итоге прекратит попытки. Устройство никогда не остановится, и вы можете свободно перезапускать worker'ы и client'ы, чтобы эта модель могла работать с любым количеством worker'ов и client'ов.### Надежная очередь (модель "параноидальных пиратов")
Модель "простых пиратов" работает очень хорошо, главным образом потому что она представляет собой сочетание двух существующих моделей. Однако, у неё есть и некоторые недостатки:
* Этот режим не может обрабатывать сбои или перезапуск очередей. Client будет повторять попытки, но worker не будет перезапущен. Хотя ZMQ автоматически восстановит соединение сокета worker, для новых запущенных устройств очереди это означает, что worker фактически отсутствует, так как он не отправил сообщение "готов". Чтобы решить эту проблему, нам нужно отправлять пульсы от устройства очереди к worker'у, чтобы тот знал, жив ли он.
* Устройство очереди не проверяет, жив ли worker, поэтому когда worker умирает в состоянии покоя, устройство очереди удалит его из очереди только после отправки запроса. В этом случае client ничего сделать не может, кроме как ждать. Это не является критической ошибкой, но всё же недостаточно хорошо. Поэтому нам нужно отправлять пульсы от worker'а к устройству очереди, чтобы последнее знало, когда worker умер.
Мы используем метод, известный как "параноидальный пират", чтобы решить вышеупомянутые две проблемы.Ранее мы использовали REQ сокеты в качестве типа сокета worker'а, но в параноидном пиратском режиме мы будем использовать DEALER сокеты, что позволит нам отправлять и принимать сообщения произвольно, а не выполнять цикл отправки-приёма, как это делает REQ сокет. Недостатком DEALER является то, что нам придётся самостоятельно управлять оболочками сообщений. Если вы не знаете, что такое оболочка, прочитайте об этом в третьей главе.
Мы всё ещё будем использовать ленивого пирата в качестве клиента, вот код устройства очереди для параноидального пирата:**ppqueue: Параноидальный пиратский режим очереди на C**
```c
//
// Параноидальный пиратский список
//
#include "czmq.h"
#define HEARTBEAT_LIVENESS 3 // Уровень живучести, значение от 3 до 5 является разумным
#define HEARTBEAT_INTERVAL 1000 // Единица измерения: миллисекунды
// Коды сообщений протокола Paranoid Pirate Protocol
#define PPP_READY "\001" // Рабочий процесс готов
#define PPP_HEARTBEAT "\002" // Сердцебиение рабочего процесса
// Используется следующая структура для представления одного действительного рабочего процесса в списке рабочих процессов
typedef struct {
zframe_t *address; // Адрес рабочего процесса
char *identity; // Печатаемое имя сокета
int64_t expiry; // Время истечения срока действия
} worker_t;
// Создание нового рабочего процесса
static worker_t *
s_worker_new(zframe_t *address)
{
worker_t *self = (worker_t *)zmalloc(sizeof(worker_t));
self->address = address;
self->identity = zframe_strdup(address);
self->expiry = zclock_time() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
return self;
}
// Уничтожение структуры рабочего процесса, включая его идентификатор
static void
s_worker_destroy(worker_t **self_p)
{
assert(self_p);
if (*self_p) {
worker_t *self = *self_p;
zframe_destroy(&self->address);
free(self->identity);
free(self);
*self_p = NULL;
}
}
// Обработка события готовности рабочего процесса, перемещение рабочего процесса в конец списка
static void
s_worker_ready(worker_t *self, zlist_t *workers)
{
worker_t *worker = (worker_t *)zlist_first(workers);
while (worker) {
if (streq(self->identity, worker->identity)) {
zlist_remove(workers, worker);
s_worker_destroy(&worker);
break;
}
worker = (worker_t *)zlist_next(workers);
}
}
``````markdown
}
zlist_append(workers, self);
}
// Возврат адреса следующего доступного рабочего процесса
static zframe_t *
s_workers_next(zlist_t *workers)
{
worker_t *worker = zlist_pop(workers);
assert(worker);
zframe_t *frame = worker->address;
worker->address = NULL;
s_worker_destroy(&worker);
return frame;
}
// Поиск и уничтожение просроченных рабочих процессов.
// Поскольку самый старый рабочий процесс находится в начале списка, поиск прекращается при первом непросроченном рабочем процессе.
static void
s_workers_purge(zlist_t *workers)
{
worker_t *worker = (worker_t *)zlist_first(workers);
while (worker) {
if (zclock_time() < worker->expiry)
break; // Рабочий процесс не просрочен, прекращаем поиск
zlist_remove(workers, worker);
s_worker_destroy(&worker);
}
worker = (worker_t *) zlist_first(workers);
}
int main(void)
{
zctx_t *ctx = zctx_new();
void *frontend = zsocket_new(ctx, ZMQ_ROUTER);
void *backend = zsocket_new(ctx, ZMQ_ROUTER);
zsocket_bind(frontend, "tcp://*:5555"); // Клиентский конечный пункт
zsocket_bind(backend, "tcp://*:5556"); // Рабочий конечный пункт
// Список доступных рабочих процессов
zlist_t *workers = zlist_new();
// Регулярная отправка пульса
uint64_t heartbeat_at = zclock_time() + HEARTBEAT_INTERVAL;
while (1) {
zmq_pollitem_t items[] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
// Когда есть доступные рабочие процессы, опрашивать передний конечный пункт
int rc = zmq_poll(items, zlist_size(workers) ? 2 : 1,
HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Прерывание
// Обработка запросов от рабочего процесса
if (items[0].revents & ZMQ_POLLIN) {
``````markdown
// использовать адрес рабочего процесса для LRU маршрутизации
zmsg_t *msg = zmsg_recv(backend);
if (!msg)
break; // прерывание
// любой сигнал от рабочего процесса указывает на его активность
zframe_t *address = zmsg_unwrap(msg);
worker_t *worker = s_worker_new(address);
s_worker_ready(worker, workers);
// обработка команд управления или пересылка ответа клиенту
if (zmsg_size(msg) == 1) {
zframe_t *frame = zmsg_first(msg);
if (memcmp(zframe_data(frame), PPP_READY, 1)
|| memcmp(zframe_data(frame), PPP_HEARTBEAT, 1)) {
printf("E: invalid message from worker");
zmsg_dump(msg);
}
zmsg_destroy(&msg);
} else {
zmsg_send(&msg, frontend);
}
}
if (items[1].revents & ZMQ_POLLIN) {
// получение следующего запроса от клиента и передача его следующему доступному рабочему процессу
zmsg_t *msg = zmsg_recv(frontend);
if (!msg)
break; // прерывание
zmsg_push(msg, s_workers_next(workers));
zmsg_send(&msg, backend);
}
// отправка пульса свободным рабочим процессам
if (zclock_time() >= heartbeat_at) {
worker_t *worker = (worker_t *)zlist_first(workers);
while (worker) {
zframe_send(&worker->address, backend,
ZFRAME_REUSE + ZFRAME_MORE);
zframe_t *frame = zframe_new(PPP_HEARTBEAT, 1);
zframe_send(&frame, backend, 0);
worker = (worker_t *)zlist_next(workers);
}
heartbeat_at = zclock_time() + HEARTBEAT_INTERVAL;
}
s_workers_purge(workers);
}
// программа завершена, производится очистка
while (zlist_size(workers)) {
worker_t *worker = (worker_t *) zlist_pop(workers);
s_worker_destroy(&worker);
}
zlist_destroy(&workers);
zctx_destroy(&ctx);
return 0;
}
Этот модуль очереди расширяет LRU-модель с использованием механизма пульса, который кажется простым, но придумать такую идею довольно сложно. В следующем разделе будет более подробно рассмотрен механизм пульса.Вот код работника Paranoid Pirate на C:ppworker: Работник Paranoid Pirate на C```c // // Параноидальный пиратский worker // #include "czmq.h" #define HEARTBEAT_LIVENESS 3 // Разумное значение: 3-5 #define HEARTBEAT_INTERVAL 1000 // Единица измерения: миллисекунды #define INTERVAL_INIT 1000 // Интервал повторной попытки #define INTERVAL_MAX 32000 // Максимальное значение алгоритма отложенного повтора // Константы, определяющие стандарт параноидального пирата #define PPP_READY "\001" // Сообщение: worker готов #define PPP_HEARTBEAT "\002" // Сообщение: worker отправил пульс // Возвращает сокет, подключенный к устройству очереди параноидального пирата static void * s_worker_socket (zctx_t *ctx) { void *worker = zsocket_new (ctx, ZMQ_DEALER); zsocket_connect (worker, "tcp://localhost:5556"); // Уведомляет очередь о готовности worker printf ("I: worker готов\n"); zframe_t *frame = zframe_new (PPP_READY, 1); zframe_send (&frame, worker, 0); return worker; } int main (void) { zctx_t *ctx = zctx_new (); void *worker = s_worker_socket (ctx); // Если здоровье пульса равно нулю, это означает, что устройство очереди умерло size_t liveness = HEARTBEAT_LIVENESS; size_t interval = INTERVAL_INIT; // Регулярно отправляем пульс uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL; srandom ((unsigned) time (NULL)); int cycles = 0; while (1) { zmq_pollitem_t items [] = { { worker, 0, ZMQ_POLLIN, 0 } }; int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC); if (rc == -1) break; // Прерывание if (items [0].revents & ZMQ_POLLIN) { // Получаем сообщение // - 3 части сообщения, штемпель + содержимое, представляет собой запрос // - 1 часть сообщения, представляет собой пульс
zmsg_t *msg = zmsg_recv(worker);
if (!msg)
break; // Прерывание
if (zmsg_size(msg) == 3) {
// Цикл нескольких слов после моделирования различных проблем
cycles++;
if (cycles > 3 && randof(5) == 0) {
printf("I: Моделирование отказа\n");
zmsg_destroy(&msg);
break;
} else
if (cycles > 3 && randof(5) == 0) {
printf("I: Моделирование перегрузки CPU\n");
sleep(3);
if (zctx_interrupted)
break;
}
printf("I: Нормальная реакция\n");
zmsg_send(&msg, worker);
}
liveness = HEARTBEAT_LIVENESS;
sleep(1); # Выполнение некоторых операций
if (zctx_interrupted)
break;
}
else
if (zmsg_size(msg) == 1) {
frame = zmsg_first(msg);
if (memcmp(zframe_data(frame), PPP_HEARTBEAT, 1) == 0)
liveness = HEARTBEAT_LIVENESS;
else {
printf("E: Недопустимое сообщение\n");
zmsg_dump(msg);
}
zmsg_destroy(&msg);
}
else {
printf("E: Недопустимое сообщение\n");
zmsg_dump(msg);
}
interval = INTERVAL_INIT;
}
else
if (--liveness == 0) {
printf("W: Отсутствие ответа на пинг, невозможно подключиться к устройству\n");
printf("W: Повторная попытка подключения через %zd миллисекунд... \n", interval);
zclock_sleep(interval);
if (interval < INTERVAL_MAX)
interval *= 2;
zsocket_destroy(ctx, worker);
worker = s_worker_socket(ctx);
liveness = HEARTBEAT_LIVENESS;
}
# Время от времени отправляем пинг устройству
if (zclock_time() > heartbeat_at) {
heartbeat_at = zclock_time() + HEARTBEAT_INTERVAL;
printf("I: worker отправил пинг\n");
frame = zframe_new(PPP_HEARTBEAT, 1);
zframe_send(&frame, worker, 0);
}
}
zctx_destroy(&ctx);
return 0;
}
Несколько пояснений:
msg
не получено, программа прерывается.Попробуйте выполнить следующий код, чтобы пройти через процесс:
ppqueue &
for i in 1 2 3 4; do
ppworker &
sleep 1
done
lpclient &
Вы заметите, как worker один за другим выходят из строя, а клиент отказывается после нескольких попыток. Вы можете остановить и перезапустить устройство очереди, клиент и worker последовательно восстановят соединение и правильно отправят, обработают и получат запросы, сохранив порядок. Таким образом, весь процесс коммуникации имеет две возможности: успешное взаимодействие или окончательный отказ клиента.
Сердцебиение не является запросом-ответом; они асинхронно передаются между узлами, и любой узел может использовать его для определения того, что другой узел умер, и прекратить коммуникацию.
Если узел использует устойчивый сокет (то есть установил флаг сокета), это означает, что сердцебиения могут накапливаться и будут получены вместе при перезапуске. Поэтому worker не должен использовать устойчивые сокеты. Пример кода использует устойчивые сокеты для удобства отладки, и в коде используются случайные идентификаторы сокета, чтобы избежать повторного использования предыдущих идентификаторов.
При использовании сердцебиения следует запустить его до начала обработки сообщений. Вам нужно гарантировать, что при запуске любого узла сердцебиение будет корректно выполняться. Остановите и перезапустите их, чтобы тестировать заморозку, аварии и другие ситуации.* Если ваш основной цикл использует zmq_poll()
, вам следует использовать другой таймер для триггеринга сердцебиения. Не используйте основной цикл для управления отправкой сердцебиений, это может привести к чрезмерному количеству отправок сердцебиений (блокировка сети) или слишком малому количеству (что приведет к отключению узлов). Библиотека zhelpers предоставляет функцию s_clock()
, которая возвращает текущее время системы в миллисекундах, которую можно использовать для контроля интервалов отправки сердцебиений. Пример кода на C:```c
int64_t now = s_clock();
// Regular heartbeat sending
uint64_t heartbeat_at = s_clock() + HEARTBEAT_INTERVAL;
while (1) {
…
zmq_poll(items, 1, HEARTBEAT_INTERVAL * 1000);
…
// Regardless of the behavior of zmq_poll, use the following logic to determine the need for sending a heartbeat
if (s_clock() > heartbeat_at) {
… Send heartbeat to all nodes
// Set the time for the next heartbeat
heartbeat_at = s_clock() + HEARTBEAT_INTERVAL;
}
}
```
* The main loop should use the heartbeat interval as the wait time. Clearly, using an infinite wait time is unacceptable, and a value smaller than the heartbeat interval will only lead to unnecessary loop iterations.
* Use a simple tracking method, such as logging information to the console. Here are some tracking tips: use the zmsg() function to print the contents of the socket; assign message numbers to check intervals between them.
* In real applications, the heartbeat should be configurable and synchronized with nodes. Some nodes require frequent heartbeats, for example, every 10 milliseconds, while others may require heartbeats every 30 seconds.
* If you want to send heartbeats at different frequencies to different nodes, set the poll wait time to the minimum heartbeat interval.* Возможно, вы захотите использовать отдельный сокет для управления пульсами, что может показаться хорошей идеей, так как это позволит отделить синхронные запросы-ответы от асинхронных пульсов. Однако, эта идея имеет недостатки: во-первых, отправка данных не требует отправки пульсов; во-вторых, сокет может заблокироваться из-за сетевых проблем, и вам потребуется знать, почему сокет для отправки данных перестал отвечать — из-за смерти или из-за чрезмерной нагрузки. В этом случае вам потребуется отправлять пульсы для этого сокета. В-третьих, управление двумя сокетами намного сложнее, чем управление одним.* Мы не настроили пульсы от клиента до очереди, так как это слишком сложно и не имеет большой ценности.
### Соглашения и протоколы
Возможно, вы заметили, что из-за механизма пульсов режимы параноидального пирата и простого пирата не совместимы.
На самом деле, здесь нам следует написать протокол. Возможно, в тестовом режиме протокол не требуется, но он необходим для реальных приложений. Что если мы хотим писать worker на другом языке? Будем ли мы вынуждены просматривать исходный код для понимания процесса коммуникации? Что если мы хотим изменить протокол? Нормы могут быть простыми, но они не очевидны. Чем успешнее протокол, тем сложнее он становится. Программа, которая не имеет соглашений, обязательно будет неразрешимой, поэтому давайте создадим спецификацию для этого протокола. Как это сделать?
* На вики-странице [rfc.zeromq.org](http://rfc.zeromq.org/) специально выделена страница для хранения протоколов ZMQ.
* Чтобы создать новый протокол, вам нужно зарегистрироваться и следовать инструкциям. Процесс прост, но не каждый может писать техническ型转换为俄语的翻译结果如下:
* Мы не настроили пульсы от клиента до очереди, так как это слишком сложно и не имеет большой ценности.
### Соглашения и протоколы
Возможно, вы заметили, что из-за механизма пульсов режимы параноидального пирата и простого пирата не совместимы.
На самом деле, здесь нам следует написать протокол. Возможно, в тестовом режиме протокол не требуется, но он необходим для реальных приложений. Что если мы хотим писать worker на другом языке? Будем ли мы вынуждены просматривать исходный код для понимания процесса коммуникации? Что если мы хотим изменить протокол? Нормы могут быть простыми, но они не очевидны. Чем успешнее протокол, тем сложнее он становится. Программа, которая не имеет соглашений, обязательно будет неразрешимой, поэтому давайте создадим спецификацию для этого протокола. Как это сделать?
* На вики-странице [rfc.zeromq.org](http://rfc.zeromq.org/) специально выделена страница для хранения протоколов ZMQ.
* Чтобы создать новый протокол, вам нужно зарегистрироваться и следовать инструкциям. Процесс прост, но не каждый может писать техническую документацию.
Мне потребовалось около 15 минут, чтобы набросать спецификацию [Пиратского протокола PPP](http://rfc.zeromq.org/spec:6), которая, несмотря на небольшие размеры, содержит все необходимые элементы.Чтобы использовать протокол PPP в реальных условиях, вам также потребуется:
* Включить номер версии в команду READY, чтобы безопасно добавлять новые версии PPP в будущем.
* В настоящее время сигналы READY и HEARTBEAT не содержат информации о том, являются ли они запросами или ответами. Для их различия требуется новая структура сообщений, содержащая информацию о "типе сообщения".
### Услуги надежной очереди (Управляющий режим)
В мире все меняется мгновенно, и когда мы ждем лучшего протокола для решения проблем предыдущего раздела, уже кто-то его разработал:
* http://rfc.zeromq.org/spec:7
Эта спецификация занимает всего одну страницу и делает протокол PPP более надежным. При проектировании сложных архитектур следует начинать с определения соглашений, а затем реализовывать их программным обеспечением.
Управляющий режим (MDP) вводит интересный механизм при расширении PPP: каждый запрос клиента имеет "название услуги", а worker при регистрации в очереди должен указать свой тип услуги. Преимуществом MDP является то, что он основан на реальном программировании, прост в использовании и легко масштабируется.
Введение механизма "название услуги" является простым дополнением к пиратской очереди, и результатом является превращение ее в агентство услуг.
Перед внедрением управляющего режима нам нужно создать фреймворк для клиента и worker. Если программист может реализовать этот режим через простой API, нет необходимости заставлять его знать детали протокола MDP и его реализации.
Поэтому наш первый протокол (MDP) определяет, как узлы взаимодействуют в распределённой архитектуре, а второй протокол должен определить, как приложения должны использовать этот протокол через фреймворк.
Управляющий режим имеет два конца: клиентский и серверный. Поскольку нам нужно создать фреймворк для обоих клиентов и worker, нам нужно предоставить две API. Ниже приведён пример API для клиента, разработанный с использованием простого объектно-ориентированного подхода и библиотеки [ZFL](http://zfl.zeromq.org/page:read-the-manual) на C.```c
mdcli_t *mdcli_new (char *broker);
void mdcli_destroy (mdcli_t **self_p);
zmsg_t *mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p);
```
Это очень просто. Мы создаём сессию для взаимодействия с брокером, отправляем и получаем запрос, а затем закрываем соединение. Ниже представлено начальное состояние API для worker-конечной точки.
```c
mdwrk_t *mdwrk_new (char *broker, char *service);
void mdwrk_destroy (mdwrk_t **self_p);
zmsg_t *mdwrk_recv (mdwrk_t *self, zmsg_t *reply);
```
Две вышеуказанные части кода выглядят похожими, но API для worker-конечной точки имеет небольшие отличия. После первого вызова recv() в worker передаётся пустой ответ, а затем передаются текущие ответы и получены новые запросы.Оба API легко реализовать, достаточно внести изменения в код для режима "пирата". Ниже представлено API для клиента:**mdcliapi: Majordomo клиентский API на C**
```c
/* =====================================================================
mdcliapi.c
Majordomo Protocol Client API
Реализует спецификацию MDP/Worker по адресу http://rfc.zeromq.org/spec:7.
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
Этот файл является частью руководства по ZeroMQ: http://zguide.zeromq.org
Это свободное программное обеспечение; вы можете распространять его и/или изменять его в соответствии с
условиями Lesser General Public License, опубликованными Free Software Foundation; либо версию 3 лицензии,
либо (по вашему выбору) любую более позднюю версию.
Это программное обеспечение распространяется в надежде, что оно будет полезным, но
БЕЗ КАКИХ-ЛИБО ГАРАНТИЙ; даже без скрытых гарантий MERCHANTABILITY или FITNESS FOR A PARTICULAR PURPOSE. Смотрите GNU
Lesser General Public License для получения дополнительной информации.
Вы должны были получить копию Lesser General Public License вместе с этим программным обеспечением.
Если нет, смотрите <http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "mdcliapi.h"
// Класс структура
// Мы будем использовать методы членов для доступа к этим свойствам
struct _mdcli_t {
zctx_t *ctx; // Контекст
char *broker;
void *client; // Соединение с брокером
int verbose; // Использование стандартного вывода для отображения текущей активности
int timeout; // Время ожидания запроса
};
``````c
int retries; // количество попыток повторного выполнения запроса
};
// ---------------------------------------------------------------------
// Подключение или повторное подключение к агенту
void s_mdcli_connect_to_broker(mdcli_t *self)
{
if (self->client)
zsocket_destroy(self->ctx, self->client);
self->client = zsocket_new(self->ctx, ZMQ_REQ);
zmq_connect(self->client, self->broker);
if (self->verbose)
zclock_log("I: Подключаюсь к агенту %s... ", self->broker);
}
// ---------------------------------------------------------------------
// Конструктор
mdcli_t *mdcli_new(char *broker, int verbose)
{
assert(broker);
mdcli_t *self = (mdcli_t *)zmalloc(sizeof(mdcli_t));
self->ctx = zctx_new();
self->broker = strdup(broker);
self->verbose = verbose;
self->timeout = 2500; // миллисекунды
}
``````markdown
self->retries = 3; // количество попыток
s_mdcli_connect_to_broker(self);
return self;
}
#---------------------------------------------------------------------------------
// Деструктор
void
mdcli_destroy(mdcli_t **self_p)
{
assert(self_p);
if (*self_p) {
mdcli_t *self = *self_p;
zctx_destroy(&self->ctx);
free(self->broker);
free(self);
*self_p = NULL;
}
}
#---------------------------------------------------------------------------------
// Установка времени ожидания запроса
void
mdcli_set_timeout(mdcli_t *self, int timeout)
{
assert(self);
self->timeout = timeout;
}
#---------------------------------------------------------------------------------
// Установка количества попыток запроса
void
mdcli_set_retries(mdcli_t *self, int retries)
{
assert(self);
self->retries = retries;
}
#---------------------------------------------------------------------------------
// Отправка запроса агенту и попытка получения ответа;
// Сохранение права собственности на сообщение, его удаление после отправки;
``````markdown
// Возврат ответного сообщения или NULL.
zmsg_t *
mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
{
assert (self);
assert (request_p);
zmsg_t *request = *request_p;
// Обертка сообщения в протокольный префикс
// Frame 1: "MDPCxy" (шесть байтов, MDP/Client x.y)
// Frame 2: имя сервиса (печатаемая строка)
zmsg_pushstr (request, service);
zmsg_pushstr (request, MDPC_CLIENT);
if (self->verbose) {
zclock_log ("I: Отправка запроса сервису '%s':", service);
zmsg_dump (request);
}
int retries_left = self->retries;
while (retries_left && !zctx_interrupted) {
zmsg_t *msg = zmsg_dup (request);
zmsg_send (&msg, self->client);
while (TRUE) {
// Опрос сокета для получения ответа с таймаутом
zmq_pollitem_t items [] = {
{ self->client, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Прерывание
// Обработка полученного ответа
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (self->client);
if (self->verbose) {
zclock_log ("I: получен ответ:");
zmsg_dump (msg);
}
// Не пытайтесь обрабатывать ошибки, просто верните ошибку
assert(zmsg_size(msg) >= 3);
zframe_t *header = zmsg_pop(msg);
assert(zframe_streq(header, MDPC_CLIENT));
zframe_destroy(&header);
zframe_t *reply_service = zmsg_pop(msg);
assert(zframe_streq(reply_service, service));
zframe_destroy(&reply_service);
zmsg_destroy(&request);
return msg; // Успешно
}
else
if (--retries_left) {
if (self->verbose)
zclock_log("W: нет ответа, повторное подключение...");
}
}
}
}
```
```markdown
// Повторное подключение и отправка сообщения
s_mdcli_connect_to_broker(self);
zmsg_t *msg = zmsg_dup(request);
zmsg_send(&msg, self->client);
}
else {
if (self->verbose)
zclock_log("W: произошла серьезная ошибка, отмена повторной попытки. ");
break; // Отмена
}
}
}
if (zctx_interrupted)
printf("W: получено сообщение об остановке, завершение процесса client. . . \n");
zmsg_destroy(&request);
return NULL;
}
```
Ниже приведен тестовый программный код, который выполняет 100 000 запросов-ответов:
```**mdclient: Приложение клиента для протокола Majordomo на C**
```c
//
// Пример использования клиента для протокола Majordomo
// Использует API mdcli для скрытия внутренней реализации протокола
//
// Давайте скомпилируем этот код напрямую, без создания библиотеки
#include "mdcliapi.c"
int main(int argc, char *argv[])
{
int verbose = (argc > 1 && streq(argv[1], "-v"));
mdcli_t *session = mdcli_new("tcp://localhost:5555", verbose);
int count;
for (count = 0; count < 100000; count++)
{
zmsg_t *request = zmsg_new();
zmsg_pushstr(request, "Hello world");
zmsg_t *reply = mdcli_send(session, "echo", &request);
if (reply)
zmsg_destroy(&reply);
else
break; // Остановка выполнения
}
printf("Обработано %d запросов-ответов\n", count);
mdcli_destroy(&session);
return 0;
}
```
Вот API для worker'а:**mdwrkapi: API для работника Majordomo на C**
```c
/* =====================================================================
mdwrkapi.c
Majordomo Protocol Worker API
Реализует спецификацию MDP/Worker по адресу http://rfc.zeromq.org/spec:7.
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
Этот файл является частью руководства по ZeroMQ: http://zguide.zeromq.org
Это свободное программное обеспечение; вы можете распространять его и/или модифицировать его в соответствии с
условиями Lesser General Public License, опубликованными Free Software Foundation; либо версией 3 лицензии, либо (по вашему выбору) любой более поздней версией.
Это программное обеспечение распространяется в надежде, что оно будет полезным, но
БЕЗ КАКИХ-ЛИБО ГАРАНТИЙ; даже без скрытых гарантий
пригодности для продажи или пригодности для определенного назначения. Смотрите Lesser General Public License для более подробной информации.
У вас должна быть копия Lesser General Public License вместе с этой программой. Если нет, то посмотрите
<http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "mdwrkapi.h"
// Параметры надежности
#define HEARTBEAT_LIVENESS 3 // Разумное значение: 3-5
// Структура класса
// Используются члены класса для доступа к свойствам
struct _mdwrk_t {
zctx_t *ctx; // Контекст
char *broker;
char *service;
void *worker; // Соединение с агентом
int verbose; // Использование стандартного вывода для отображения активности
};
``````c
// Настройки для сердцебиения
uint64_t heartbeat_at; // Время отправки сердцебиения
size_t liveness; // Количество попыток
int heartbeat; // Задержка сердцебиения, единицы: миллисекунды
int reconnect; // Задержка повторного подключения, единицы: миллисекунды
// Внутреннее состояние
int expect_reply; // Начальное значение: 0
// Адрес ответа, если он существует
zframe_t *reply_to;
};
// ---------------------------------------------------------------------
// Отправка сообщения агенту
// Если сообщение не предоставлено, создается внутренне
static void
s_mdwrk_send_to_broker(mdwrk_t *self, char *command, char *option, zmsg_t *msg)
{
msg = msg ? zmsg_dup(msg) : zmsg_new();
// Протокольная оболочка помещается в начало сообщения
if (option)
zmsg_pushstr(msg, option);
}
```
```markdown
zmsg_pushstr(msg, command);
zmsg_pushstr(msg, MDPW_WORKER);
zmsg_pushstr(msg, "");
if (self->verbose) {
zclock_log("I: отправка %s брокеру", mdps_commands[(int)*command]);
zmsg_dump(msg);
}
zmsg_send(&msg, self->worker);
}
// ---------------------------------------------------------------------
// Подключение или повторное подключение к брокеру
void s_mdwrk_connect_to_broker(mdwrk_t *self)
{
if (self->worker)
zsocket_destroy(self->ctx, self->worker);
self->worker = zsocket_new(self->ctx, ZMQ_DEALER);
zmq_connect(self->worker, self->broker);
if (self->verbose)
zclock_log("I: подключение к брокеру %s. . . ", self->broker);
// Регистрация типа услуги у брокера
s_mdwrk_send_to_broker(self, MDPW_READY, self->service, NULL);
// Когда уровень живучести равен нулю, это означает, что соединение с брокером прервано
self->liveness = HEARTBEAT_LIVENESS;
self->heartbeat_at = zclock_time() + self->heartbeat;
}
``````markdown
// ---------------------------------------------------------------------
// Конструктор
mdwrk_t *mdwrk_new(char *broker, char *service, int verbose)
{
assert(broker);
assert(service);
mdwrk_t *self = (mdwrk_t *)zmalloc(sizeof(mdwrk_t));
self->ctx = zctx_new();
self->broker = strdup(broker);
self->service = strdup(service);
self->verbose = verbose;
self->heartbeat = 2500; // миллисекунды
self->reconnect = 2500; // миллисекунды
s_mdwrk_connect_to_broker(self);
return self;
}
// ---------------------------------------------------------------------
// Деструктор
void mdwrk_destroy(mdwrk_t **self_p)
{
assert(self_p);
if (*self_p) {
mdwrk_t *self = *self_p;
zctx_destroy(&self->ctx);
free(self->broker);
free(self->service);
free(self);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// Установка времени таймаута для сердцебиения
void mdwrk_set_heartbeat(mdwrk_t *self, int heartbeat)
{
self->heartbeat = heartbeat;
}
// ---------------------------------------------------------------------
// Установка времени таймаута для повторного подключения
void mdwrk_set_reconnect(mdwrk_t *self, int reconnect)
{
self->reconnect = reconnect;
}
// ---------------------------------------------------------------------
// Отправка ответа брокеру, если есть, и ожидание новых запросов
zmsg_t *
mdwrk_recv(mdwrk_t *self, zmsg_t **reply_p)
{
// Форматируем и отправляем запрос полученного ответа
assert(reply_p);
zmsg_t *reply = *reply_p;
assert(reply || !self->expect_reply);
if (reply) {
assert(self->reply_to);
zmsg_wrap(reply, self->reply_to);
s_mdwrk_send_to_broker(self, MDPW_REPLY, NULL, reply);
zmsg_destroy(reply_p);
}
self->expect_reply = 1;
while (TRUE) {
zmq_pollitem_t items[] = {
{ self->worker, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll(items, 1, self->heartbeat * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Прерывание
if (items[0].revents & ZMQ_POLLIN) {
```
```c
zmsg_t *msg = zmsg_recv(self->worker);
if (!msg)
break; // Прерывание
if (self->verbose) {
zclock_log("I: Получено сообщение от агента:");
zmsg_dump(msg);
}
self->liveness = HEARTBEAT_LIVENESS;
// Не следует обрабатывать ошибки, достаточно просто выдать сообщение об ошибке
assert(zmsg_size(msg) >= 3);
zframe_t *empty = zmsg_pop(msg);
assert(zframe_streq(empty, ""));
zframe_destroy(&empty);
zframe_t *header = zmsg_pop(msg);
assert(zframe_streq(header, MDPW_WORKER));
zframe_destroy(&header);
zframe_t *command = zmsg_pop(msg);
if (zframe_streq(command, MDPW_REQUEST)) {
// Здесь необходимо сохранить все адреса перед пустым фреймом в сообщении,
// но здесь мы временно сохраняем только один
self->reply_to = zmsg_unwrap(msg);
zframe_destroy(&command);
return msg; // Обработка запроса
} else
if (zframe_streq(command, MDPW_HEARTBEAT))
; // Не требуется никакой обработки для пульса
else
if (zframe_streq(command, MDPW_DISCONNECT))
s_mdwrk_connect_to_broker(self);
else {
zclock_log("E: Недопустимое сообщение");
zmsg_dump(msg);
}
zframe_destroy(&command);
zmsg_destroy(&msg);
} else
if (--self->liveness == 0) {
if (self->verbose)
zclock_log("W: Соединение с агентом потеряно - повторная попытка...");
}
```
```markdown
zclock_sleep(self->reconnect);
s_mdwrk_connect_to_broker(self);
}
// Время от времени отправляем сообщение
if (zclock_time() > self->heartbeat_at) {
s_mdwrk_send_to_broker(self, MDPW_HEARTBEAT, NULL, NULL);
}
``````c
self->heartbeat_at = zclock_time() + self->heartbeat;
}
}
if (zctx_interrupted)
printf("W: Получено сообщение об остановке, завершение работы worker... \n");
return NULL;
}
```
Ниже приведен тестовый скрипт, реализующий службу с именем echo:
```**mdworker: Пример приложения работника на C**
```c
//
// Пример работы с протоколом Majordomo
// Использует API mdwrk для скрытия внутренней реализации протокола MDP
//
// Давайте просто скомпилируем этот код, не создавая библиотеки
#include "mdwrkapi.c"
int main(int argc, char *argv[])
{
int verbose = (argc > 1 && streq(argv[1], "-v"));
mdwrk_t *session = mdwrk_new(
"tcp://localhost:5555", "echo", verbose);
zmsg_t *reply = NULL;
while (1) {
zmsg_t *request = mdwrk_recv(session, &reply);
if (request == NULL)
break; // Работник был остановлен
reply = request; // Эхо-сервис... на самом деле довольно сложный :)
}
mdwrk_destroy(&session);
return 0;
}
```
Несколько пояснений:
* API является однопоточным, поэтому работник не отправляет пакеты сердцебиения в фоновом режиме, что соответствует нашему ожиданию: если программа работника прекращает работу, сердцебиение также прекращается, и агент перестает отправлять новые запросы этому работнику.
* В API работника нет настроек алгоритмов повторной попытки, так как это не стоит затрат для использования такой сложной механики.
* API не предоставляет механизмов отслеживания ошибок, если возникают проблемы, они приводят к прерыванию программы (или исключению, в зависимости от языка). Такой подход полезен для экспериментальной разработки, так как позволяет сразу видеть результат выполнения. Однако в реальных условиях API должен быть достаточно надежным и правильно обрабатывать нелегальные сообщения.Может возникнуть вопрос, почему API работника закрывает его сокет и открывает новый? Особенно учитывая, что ZMQ имеет механизмы автоматического восстановления соединения после возврата узла. Мы можем обратиться к примерам простого пирата и параноика для лучшего понимания. ZMQ действительно осуществляет автоматическое восстановление соединения, но если агент умирает и восстанавливается, работник не регистрируется заново. Для решения этой проблемы существуют два варианта: первый, который мы используем здесь, заключается в том, чтобы работник закрывал свой сокет и начинал все сначала, когда он определяет, что агент умер. Второй вариант состоит в том, чтобы агент требовал от неизвестного работника регистрации его типа услуги при получении неизвестного пульса, что требует специального правила в протоколе.Теперь давайте рассмотрим агента протокола Majordomo, основной код которого представляет собой набор очередей, каждая из которых соответствует определённому типу услуги. Мы будем создавать соответствующие очереди при появлении работников (и удалять их при исчезновении работников, хотя мы пока не будем этим заниматься). Кроме того, мы будем поддерживать список работников для каждой услуги.
Чтобы сделать код на C более читаемым и удобным для написания, я использовал контейнеры для хеш-таблиц и связанных списков из проекта [ZFL](http://zfl.zeromq.org), названные соответственно [zhash](https://github.com/imatix/zguide/blob/master/examples/C/zhash.h) и [zlist](https://github.com/imatix/zguide/blob/master/examples/C/zlist.h). При использовании современного языка программирования можно воспользоваться встроенными контейнерами.**mdbroker: Majordomo брокер на C**
```c
//
// Majordomo протокол агент
// Минимальная реализация протоколов http://rfc.zeromq.org/spec:7 и spec:8
//
#include "czmq.h"
#include "mdp.h"
// Обычно мы получаем следующие значения из конфигурационного файла
#define HEARTBEAT_LIVENESS 3 // Разумное значение: 3-5
#define HEARTBEAT_INTERVAL 2500 // Единица измерения: миллисекунды
#define HEARTBEAT_EXPIRY HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
// Определение агента
typedef struct {
zctx_t *ctx; // Контекст
void *socket; // Сокет для соединения с клиентами и работниками
int verbose; // Использование стандартного вывода для отображения информации о работе
char *endpoint; // Конечная точка, к которой привязывается агент
zhash_t *services; // Хэш-таблица известных услуг
zhash_t *workers; // Хэш-таблица известных работников
zlist_t *waiting; // Список ожидающих работников
uint64_t heartbeat_at; // Время отправки пульса
} broker_t;
// Определение услуги
typedef struct {
char *name; // Название услуги
zlist_t *requests; // Список запросов от клиентов
zlist_t *waiting; // Список ожидающих работников
size_t workers; // Количество доступных работников
} service_t;
// Определение работника, состояние - свободен или занят
typedef struct {
char *identity; // Идентификатор работника
zframe_t *address; // Адресный фрейм
service_t *service; // Услуга, к которой принадлежит работник
int64_t expiry; // Время истечения с момента последнего пульса
} worker_t;
```// ---------------------------------------------------------------------
// Функции, используемые агентом
static broker_t *
s_broker_new (int verbose);
static void
s_broker_destroy (broker_t **self_p);
static void
s_broker_bind (broker_t *self, char *endpoint);
static void
s_broker_purge_workers (broker_t *self);
// Функции, используемые услугой
static service_t *
s_service_require (broker_t *self, zframe_t *service_frame);
static void
s_service_destroy (void *argument);
static void
s_service_dispatch (broker_t *self, service_t *service, zmsg_t *msg);
static void
s_service_internal (broker_t *self, zframe_t *service_frame, zmsg_t *msg);
// Функции, используемые работником
static worker_t *
s_worker_require (broker_t *self, zframe_t *address);
static void
s_worker_delete (broker_t *self, worker_t *worker, int disconnect);
static void
s_worker_destroy (void *argument);
static void
s_worker_process (broker_t *self, zframe_t *sender, zmsg_t *msg);
static void
s_worker_send (broker_t *self, worker_t *worker, char *command, char *option, zmsg_t *msg);
static void
s_worker_waiting (broker_t *self, worker_t *worker);
// Функции, используемые клиентами
static void
s_client_process (broker_t *self, zframe_t *sender, zmsg_t *msg);
// ---------------------------------------------------------------------
// Основная программа
int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
broker_t *self = s_broker_new (verbose);
s_broker_bind (self, "tcp://*:5555");
// Принимает и обрабатывает сообщения до тех пор, пока программа не будет завершена
while (TRUE) {
zmq_pollitem_t items [] = {
{ self->socket, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Прерывание
// Обрабатывает следующее входящее сообщение, если есть
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (self->socket);
if (!msg)
break; // Прерывание
}
}
}```markdown
if (self->verbose) {
zclock_log ("I: Received message:");
zmsg_dump (msg);
}
zframe_t *sender = zmsg_pop (msg);
zframe_t *empty = zmsg_pop (msg);
zframe_t *header = zmsg_pop (msg);
if (zframe_streq (header, MDPC_CLIENT)) {
s_client_process (self, sender, msg);
} else
if (zframe_streq (header, MDPW_WORKER)) {
s_worker_process (self, sender, msg);
} else {
zclock_log ("E: Invalid message:");
zmsg_dump (msg);
zmsg_destroy (&msg);
}
zframe_destroy (&sender);
zframe_destroy (&empty);
zframe_destroy (&header);
}
// Disconnects and removes expired workers
// Periodically sends a heartbeat to the worker
if (zclock_time () > self->heartbeat_at) {
s_broker_purge_workers (self);
worker_t *worker = (worker_t *) zlist_first (self->waiting);
while (worker) {
s_worker_send (self, worker, MDPW_HEARTBEAT, NULL, NULL);
worker = (worker_t *) zlist_next (self->waiting);
}
}
self->heartbeat_at = zclock_time() + HEARTBEAT_INTERVAL;
}
if (zctx_interrupted) {
printf("W: Received interrupt signal, shutting down... \n");
s_broker_destroy(&self);
return 0;
}
}
// ---------------------------------------------------------------------
// Constructor for the agent object
static broker_t *
s_broker_new(int verbose)
{
broker_t *self = (broker_t *) zmalloc(sizeof(broker_t));
// Initializes the agent's state
self->ctx = zctx_new();
self->socket = zsocket_new(self->ctx, ZMQ_ROUTER);
self->verbose = verbose;
self->services = zhash_new();
self->workers = zhash_new();
self->waiting = zlist_new();
self->heartbeat_at = zclock_time() + HEARTBEAT_INTERVAL;
return self;
}
// ---------------------------------------------------------------------
``````markdown
// Деструктор объекта агента
static void
s_broker_destroy(broker_t **self_p)
{
assert(self_p);
if (*self_p) {
broker_t *self = *self_p;
zctx_destroy(&self->ctx);
zhash_destroy(&self->services);
zhash_destroy(&self->workers);
zlist_destroy(&self->waiting);
free(self);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// Привязка агентского сокета к конечной точке, можно вызывать эту функцию несколько раз
// Мы используем один сокет для одновременного обслуживания клиентов и рабочих процессов
void
s_broker_bind(broker_t *self, char *endpoint)
{
zsocket_bind(self->socket, endpoint);
zclock_log("I: MDP агент/0.1.1 активен по адресу %s", endpoint);
}
// ---------------------------------------------------------------------
// Удаление просроченных рабочих процессов в состоянии ожидания
static void
s_broker_purge_workers(broker_t *self)
{
worker_t *worker = (worker_t *) zlist_first(self->waiting);
while (worker) {
if (zclock_time() < worker->expiry)
continue; // Этот рабочий процесс еще не истек, продолжаем поиск
if (self->verbose)
zclock_log("I: Удаление просроченного рабочего процесса: %s",
worker->identity);
s_worker_delete(self, worker, 0);
worker = (worker_t *) zlist_first(self->waiting);
}
}
// ---------------------------------------------------------------------
// Поиск или создание нового элемента услуги
static service_t *
s_service_require(broker_t *self, zframe_t *service_frame)
{
assert(service_frame);
}
```
char *name = zframe_strdup(service_frame);
service_t *service = (service_t *) zhash_lookup(self->services, name);
if (service == NULL) {
service = (service_t *) zmalloc(sizeof(service_t));
service->name = name;
service->requests = zlist_new();
service->waiting = zlist_new();
zhash_insert(self->services, name, service);
zhash_freefn(self->services, name, s_service_destroy);
if (self->verbose)
zclock_log("I: Received message:");
} else {
free(name);
}
return service;
}
// ---------------------------------------------------------------------
// Уничтожает объект сервиса при удалении его из broker->services
static void
s_service_destroy(void *argument)
{
service_t *service = (service_t *) argument;
// Уничтожает все элементы в очереди запросов
while (zlist_size(service->requests)) {
zmsg_t *msg = zlist_pop(service->requests);
zmsg_destroy(&msg);
}
zlist_destroy(&service->requests);
zlist_destroy(&service->waiting);
free(service->name);
free(service);
}
// ---------------------------------------------------------------------
// Распределяет запросы между ожидающими работами, если это возможно
static void
s_service_dispatch(broker_t *self, service_t *service, zmsg_t *msg)
{
assert(service);
if (msg) { // Добавляет сообщение в очередь
zlist_append(service->requests, msg);
}
s_broker_purge_workers(self);
while (zlist_size(service->waiting) && zlist_size(service->requests)) {
worker_t *worker = zlist_pop(service->waiting);
zlist_remove(self->waiting, worker);
zmsg_t *msg = zlist_pop(service->requests);
s_worker_send(self, worker, MDPW_REQUEST, NULL, msg);
zmsg_destroy(&msg);
}
}```markdown
// ---------------------------------------------------------------------
// Обрабатывает внутренние сервисы с использованием соглашения 8/MMI
static void
s_service_internal(broker_t *self, zframe_t *service_frame, zmsg_t *msg)
{
char *return_code;
if (zframe_streq(service_frame, "mmi.service")) {
char *name = zframe_strdup(zmsg_last(msg));
service_t *service = (service_t *) zhash_lookup(self->services, name);
return_code = service && service->workers ? "200" : "404";
free(name);
} else {
return_code = "501";
}
}
```markdown
zframe_reset(zmsg_last(msg), return_code, strlen(return_code));
// Удаляем и сохраняем envelope, возвращаемый клиенту, добавляем заголовок протокола и имя службы, а затем заново упаковываем envelope
zframe_t *client = zmsg_unwrap(msg);
zmsg_push(msg, zframe_dup(service_frame));
zmsg_pushstr(msg, MDPC_CLIENT);
zmsg_wrap(msg, client);
zmsg_send(&msg, self->socket);
}
// ---------------------------------------------------------------------
// Создание worker по необходимости
static worker_t *
s_worker_require(broker_t *self, zframe_t *address)
{
assert(address);
// self->workers использует идентификатор worker в качестве ключа
char *identity = zframe_strhex(address);
worker_t *worker = (worker_t *)zhash_lookup(self->workers, identity);
if (worker == NULL) {
worker = (worker_t *)zmalloc(sizeof(worker_t));
worker->identity = identity;
worker->address = zframe_dup(address);
zhash_insert(self->workers, identity, worker);
zhash_freefn(self->workers, identity, s_worker_destroy);
if (self->verbose)
zclock_log("I: Регистрация нового worker: %s", identity);
} else {
free(identity);
}
return worker;
}
// ---------------------------------------------------------------------
// Удаление worker из всех структур данных и уничтожение worker
void
s_worker_destroy(void *data)
{
worker_t *worker = (worker_t *)data;
zhash_delete(self->workers, worker->identity);
}
```
```markdown
// ---------------------------------------------------------------------
// Когда worker удаляется из broker->workers, уничтожается объект worker
static void
s_worker_destroy(void *argument)
{
worker_t *worker = (worker_t *)argument;
zframe_destroy(&worker->address);
free(worker->identity);
free(worker);
}
// ---------------------------------------------------------------------
// Обработка сообщений, отправленных worker
static void
s_worker_process(broker_t *self, zframe_t *sender, zmsg_t *msg)
{
assert(zmsg_size(msg) >= 1); // Сообщение должно содержать хотя бы одну командную рамку
zframe_t *command = zmsg_pop(msg);
char *identity = zframe_strhex(sender);
int worker_ready = (zhash_lookup(self->workers, identity) != NULL);
free(identity);
worker_t *worker = s_worker_require(self, sender);
if (zframe_streq(command, MDPW_READY)) {
// Если в очереди worker уже есть этот worker, но все же получен сигнал "готовности", удалите этого worker.
if (worker_ready)
s_worker_delete(self, worker, 1);
else
if (zframe_size(sender) >= 4 // Имя сервиса является зарезервированным сервисом
&& memcmp(zframe_data(sender), "mmi. ", 4) == 0)
s_worker_delete(self, worker, 1);
else {
// Соответствует worker сервису и устанавливает его в состояние "свободен"
zframe_t *service_frame = zmsg_pop(msg);
worker->service = s_service_require(self, service_frame);
worker->service->workers++;
s_worker_waiting(self, worker);
zframe_destroy(&service_frame);
}
}
else
if (zframe_streq(command, MDPW_REPLY)) {
if (worker_ready) {
```
```markdown
// Удаляет и сохраняет пакет для отправки клиенту, добавляет заголовок протокола и имя сервиса, а затем заново упаковывает пакет
zframe_t *client = zmsg_unwrap(msg);
zmsg_pushstr(msg, worker->service->name);
zmsg_pushstr(msg, MDPC_CLIENT);
zmsg_wrap(msg, client);
zmsg_send(&msg, self->socket);
s_worker_waiting(self, worker);
}
else
s_worker_delete(self, worker, 1);
}
else
if (zframe_streq(command, MDPW_HEARTBEAT)) {
if (worker_ready)
worker->expiry = zclock_time() + HEARTBEAT_EXPIRY;
else
s_worker_delete(self, worker, 1);
}
else
if (zframe_streq(command, MDPW_DISCONNECT))
s_worker_delete(self, worker, 0);
else {
zclock_log("E: Недопустимое сообщение");
zmsg_dump(msg);
}
free(command);
zmsg_destroy(&msg);
}
// ---------------------------------------------------------------------
// Отправка сообщения worker
// Если указатель указывает на сообщение, отправьте его, но не уничтожайте, так как это обязанность вызывающего
static void
s_worker_send(broker_t *self, worker_t *worker, char *command,
char *option, zmsg_t *msg)
{
msg = msg ? zmsg_dup(msg) : zmsg_new();
// Добавление протокольного пакета на вершину сообщения
if (option)
zmsg_pushstr (msg, option);
zmsg_pushstr (msg, command);
zmsg_pushstr (msg, MDPW_WORKER);
// Вставка маршрутизационной рамки в начало сообщения
zmsg_wrap (msg, zframe_dup (worker->address));
if (self->verbose) {
zclock_log ("I: Отправка сообщения для worker %s",
mdps_commands [(int) *command]);
zmsg_dump (msg);
}
zmsg_send (&msg, self->socket);
}
// ---------------------------------------------------------------------
```
```markdown
// Ожидание worker
static void
s_worker_waiting (broker_t *self, worker_t *worker)
{
// Добавление worker в очередь ожидания агента и службы
zlist_append (self->waiting, worker);
zlist_append (worker->service->waiting, worker);
worker->expiry = zclock_time () + HEARTBEAT_EXPIRY;
s_service_dispatch (self, worker->service, NULL);
}
// ---------------------------------------------------------------------
// Обработка запроса от client
static void
s_client_process (broker_t *self, zframe_t *sender, zmsg_t *msg)
{
assert (zmsg_size (msg) >= 2); // Название службы + содержимое запроса
zframe_t *service_frame = zmsg_pop (msg);
service_t *service = s_service_require (self, service_frame);
// Установка адреса отправителя для ответа
zmsg_wrap (msg, zframe_dup (sender));
if (zframe_size (service_frame) >= 4
&& memcmp (zframe_data (service_frame), "mmi. ", 4) == 0)
s_service_internal (self, service_frame, msg);
else
s_service_dispatch (self, service, msg);
zframe_destroy (&service_frame);
}
```
Этот пример является одним из самых сложных, которые мы видели, состоящим приблизительно из 500 строк кода. Написание этого кода и его укрепление заняло около двух дней. Однако это всего лишь часть полного агента на основе сервисов.
```Несколько пояснений:
* **Мастер-слейв протокол** требует от нас одновременного обслуживания клиентов и рабочих в одном сокете, что очень полезно для развертывания и управления агентом: он будет принимать и отправлять запросы только через один ZMQ конечный пункт, а не два.
* Агент отлично реализует содержание протокола MDP/0.1, включая механизм отключения при отправке нелегальных команд и сердцебиений.
* Этот код может быть расширен до многопоточного, где каждый поток управляет одним сокетом, группой клиентов и рабочих. Такой подход становится интересным при разделении крупных архитектур. Код на языке C уже имеет такую структуру, поэтому его легко реализовать.
* Код также может быть расширен до режима активного-активного или активного-пассивного, что повышает надежность. Поскольку агент по своей сути является бесштатным, он просто хранит информацию о существовании сервиса, клиенты и рабочие могут выбирать другой агент для связи.
* Интервал сердцебиений в примере кода составляет 5 секунд, что снижает количество выводимых данных при отладке. В реальном мире значение должно быть ниже, но процесс повторной попытки следует сделать немного длиннее, чтобы дать время для запуска сервиса, например, 10 секунд.
### Асинхронный мастер-слейвВышеупомянутый способ реализации мастер-слейв довольно прост, клиенты остаются в простом пиратском режиме, просто используя переписанный API. Я запустил программу на тестовой машине и обработал около 100 000 запросов за 14 секунд, что также зависит от кода, поскольку время копирования фреймов сообщений уменьшает время работы процессора. Но настоящая проблема заключается в том, что мы всегда обрабатываем запросы последовательно (round-trip): отправка-прием-отправка-прием... Внутри ZMQ отключен алгоритм оптимизации отправки TCP (алгоритм Nagle), но последовательная обработка все равно является неэффективной.
Теория есть теория, но она должна быть проверена практикой. Мы используем простой тестовый скрипт, чтобы проверить, действительно ли последовательная обработка занимает много времени. Этот тестовый скрипт отправляет набор сообщений, первый раз отправляя одно сообщение и получая одно, второй раз — отправляя сразу несколько и получая сразу несколько. Результаты должны быть одинаковыми, но скорость будет совершенно другой.**tripping: Round-trip демонстрация на C**
```c
//
// Round-trip симуляция
//
// В этом примере программа запускает клиент, рабочий процесс и прокси-сервер в многопоточном режиме,
// когда клиент завершает обработку, он отправляет сигнал основному процессу.
//
#include "czmq.h"
static void
client_task (void *args, zctx_t *ctx, void *pipe)
{
void *client = zsocket_new (ctx, ZMQ_DEALER);
zmq_setsockopt (client, ZMQ_IDENTITY, "C", 1);
zsocket_connect (client, "tcp://localhost:5555");
printf ("Начало тестирования... \n");
zclock_sleep (100);
int requests;
int64_t start;
printf ("Синхронный round-trip тест... \n");
start = zclock_time ();
for (requests = 0; requests < 10000; requests++) {
zstr_send (client, "hello");
char *reply = zstr_recv (client);
free (reply);
}
printf ("%d раз/с\n",
(1000 * 10000) / (int) (zclock_time () - start));
printf ("Асинхронный round-trip тест... \n");
start = zclock_time ();
for (requests = 0; requests < 100000; requests++)
zstr_send (client, "hello");
for (requests = 0; requests < 100000; requests++) {
char *reply = zstr_recv (client);
free (reply);
}
printf ("%d раз/с\n",
(1000 * 100000) / (int) (zclock_time () - start));
zstr_send (pipe, "Завершено");
}
static void *
worker_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_DEALER);
zmq_setsockopt (worker, ZMQ_IDENTITY, "W", 1);
zsocket_connect (worker, "tcp://localhost:5556");
while (1) {
zmsg_t *msg = zmsg_recv (worker);
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return NULL;
}
static void *
broker_task (void *args)
{
// Подготовка контекста и сокета
zctx_t *ctx = zctx_new ();
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
void *backend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (frontend, "tcp://*:5555");
zsocket_bind (backend, "tcp://*:5556");
// Инициализация объекта опроса
zmq_pollitem_t items [] = {
{ frontend, 0, ZMQ_POLLIN, 0 },
``` { backend, 0, ZMQ_POLLIN, 0 }
};
while (1) {
int rc = zmq_poll(items, 2, -1);
if (rc == -1)
break; // Прерывание
if (items[0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv(frontend);
zframe_t *address = zmsg_pop(msg);
zframe_destroy(&address);
``````markdown
zmsg_pushstr(msg, "W");
zmsg_send(&msg, backend);
}
if (items[1].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv(backend);
zframe_t *address = zmsg_pop(msg);
zframe_destroy(&address);
zmsg_pushstr(msg, "C");
zmsg_send(&msg, frontend);
}
}
int main(void)
{
// Создание потока
zctx_t *ctx = zctx_new();
void *client = zthread_fork(ctx, client_task, NULL);
zthread_new(ctx, worker_task, NULL);
zthread_new(ctx, broker_task, NULL);
// Ожидание сигнала от клиентского конца трубы
char *signal = zstr_recv(client);
free(signal);
zctx_destroy(&ctx);
return 0;
}
```
```В моей среде разработки результат выполнения выглядит следующим образом:``````
Настройка теста...
Проверка однократного прохождения...
9057 вызовов/секунду
Проверка асинхронного прохождения...
173010 вызовов/секунду
```
Важно отметить, что при запуске клиент приостанавливается на некоторое время. Это происходит потому, что если сокет с указанным идентификатором не подключен к ROUTER-сокету, последний просто отбрасывает сообщение. В этом примере мы не используем алгоритм LRU, поэтому при более медленной скорости подключения worker'ов могут происходить потери данных, что влияет на результаты тестирования.
Мы можем видеть, что однократное прохождение цикла выполняется в 20 раз медленнее, чем асинхронное. Давайте применим это к паттерну менеджера.
Сначала давайте модифицируем API клиента, добавив независимые методы отправки и получения:
```
mdcli_t *mdcli_new (char *broker);
void mdcli_destroy (mdcli_t **self_p);
int mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p);
zmsg_t *mdcli_recv (mdcli_t *self);
```
Затем нам потребуется небольшое количество времени, чтобы преобразовать синхронный API клиента в асинхронный:
```**mdcliapi2: Асинхронный клиентский API Majordomo на C**
```c
/* =====================================================================
mdcliapi2.c
Majordomo Protocol Client API (async version)
Реализует спецификацию MDP/Worker по адресу http://rfc.zeromq.org/spec:7.
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
Этот файл является частью руководства по ZeroMQ: http://zguide.zeromq.org
Это свободное программное обеспечение; вы можете распространять его и/или
изменять его на условиях лицензии GNU Lesser General Public License,
опубликованной Free Software Foundation; либо версия 3 лицензии, либо (по
вашему выбору) любой более поздней версии.
Это программное обеспечение распространяется в надежде, что оно будет полезным,
но БЕЗ КАКИХ-ЛИБО ГАРАНТИЙ; даже без скрытой гарантииMERCHANTABILITY или FITNESS
FOR A PARTICULAR PURPOSE. См. GNU Lesser General Public License для получения
дополнительной информации.
Вы должны были получить копию GNU Lesser General Public License вместе с этим
программным обеспечением. Если нет, см. <http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "mdcliapi2.h"
```// Класс структура
// Использование членских функций для доступа к свойствам
struct _mdcli_t {
zctx_t *ctx; // Контекст
char *broker;
void *client; // Соединение с брокером
int verbose; // Вывод состояния в стандартный вывод
int timeout; // Время ожидания запроса
};
// ---------------------------------------------------------------------
// Подключение или переподключение к брокеру
void s_mdcli_connect_to_broker(mdcli_t *self)
{
if (self->client)
zsocket_destroy(self->ctx, self->client);
self->client = zsocket_new(self->ctx, ZMQ_DEALER);
zmq_connect(self->client, self->broker);
if (self->verbose)
zclock_log("I: Подключаюсь к брокеру %s... ", self->broker);
}```markdown
// ---------------------------------------------------------------------
// Конструктор
mdcli_t *mdcli_new(char *broker, int verbose)
{
assert(broker);
mdcli_t *self = (mdcli_t *)zmalloc(sizeof(mdcli_t));
self->ctx = zctx_new();
self->broker = strdup(broker);
self->verbose = verbose;
self->timeout = 2500; // миллисекунды
s_mdcli_connect_to_broker(self);
return self;
}
```
```markdown
// ---------------------------------------------------------------------
// Деструктор
void
mdcli_destroy (mdcli_t **self_p)
{
assert (self_p);
if (*self_p) {
mdcli_t *self = *self_p;
zctx_destroy (&self->ctx);
free (self->broker);
free (self);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// Установка времени ожидания запроса
void
mdcli_set_timeout (mdcli_t *self, int timeout)
{
assert (self);
self->timeout = timeout;
}
// ---------------------------------------------------------------------
// Отправка запроса агенту
// Получение полной собственности на запрос, его отправка и последующее удаление
int
mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
{
assert (self);
assert (request_p);
zmsg_t *request = *request_p;
// Добавление в начало сообщения фреймов, согласно протоколу
// Frame 0: пустой (симулирует поведение REQ сокета)
// Frame 1: "MDPCxy" (6 байт, MDP/Client x.y)
// Frame 2: Имя сервиса (смотрите печатаемую строку)
zmsg_pushstr (request, service);
zmsg_pushstr (request, MDPC_CLIENT);
zmsg_pushstr (request, "");
if (self->verbose) {
zclock_log ("I: Отправка запроса сервису '%s':", service);
zmsg_dump (request);
}
zmsg_send (&request, self->client);
return 0;
}
// ---------------------------------------------------------------------
// Получение ответа на запрос, если ответ отсутствует, возвращается NULL;
// Эта функция не пытается восстановиться после сбоев агента,
``````c
// так как мы не храним информацию о запросах, которые не получили ответ,
// поэтому их нельзя повторно отправить.
zmsg_t *
mdcli_recv (mdcli_t *self)
{
assert (self);
// опрос сокета для получения ответа
zmq_pollitem_t items [] = { { self->client, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);
if (rc == -1)
return NULL; // прерывание
// обработка ответа при его получении
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (self->client);
if (self->verbose) {
zclock_log ("I: получен ответ:");
zmsg_dump (msg);
}
// не обрабатываем ошибки, а сразу выводим их
assert (zmsg_size (msg) >= 4);
zframe_t *empty = zmsg_pop (msg);
assert (zframe_streq (empty, ""));
zframe_destroy (&empty);
zframe_t *header = zmsg_pop (msg);
assert (zframe_streq (header, MDPC_CLIENT));
zframe_destroy (&header);
zframe_t *service = zmsg_pop (msg);
zframe_destroy (&service);
return msg; // успех
}
if (zctx_interrupted)
printf ("W: Получено сообщение об остановке, завершение работы client...\n");
else
if (self->verbose)
zclock_log ("W: Критическая ошибка, отмена запроса");
return NULL;
}
```
## Асинхронный пример клиента Majordomo
``````c
//
// Асинхронный клиент Majordomo на C
// Использует API mdcli для скрытия реализации протокола MDP
//
#include "mdcliapi2.c"
int main(int argc, char *argv[]) {
int verbose = (argc > 1 && streq(argv[1], "-v"));
mdcli_t *session = mdcli_new("tcp://localhost:5555", verbose);
int count;
for(count = 0; count < 100000; count++) {
zmsg_t *request = zmsg_new();
zmsg_pushstr(request, "Hello world");
mdcli_send(session, "echo", &request);
}
for(count = 0; count < 100000; count++) {
zmsg_t *reply = mdcli_recv(session);
if(reply) {
zmsg_destroy(&reply);
} else {
break; // Прервать с помощью Ctrl-C
}
}
printf("Получено %d ответов\n", count);
mdcli_destroy(&session);
return 0;
}
```
Код агента и worker'а остался без изменений, так как мы не изменили протокол MDP. После модификации клиента заметна значительная ускоренная обработка запросов. В следующих таблицах показано время обработки 100 000 запросов в синхронном и асинхронном режимах:
```
$ time mdclient
100000 запросов/ответов обработано
real 0m14.088s
user 0m1.310s
sys 0m2.670s
```
Асинхронный режим:
```
$ time mdclient2
100000 ответов получено
real 0m8.730s
user 0m0.920s
sys 0m1.550s
```
Теперь создадим 10 worker'ов и посмотрим на результат:
```
$ time mdclient2
100000 ответов получено
real 0m3.863s
user 0m0.730s
sys 0m0.470s
```
Теперь создадим 10 worker'ов и посмотрим на результат:
```
$ time mdclient2
100000 ответов получено
real 0m3.863s
user 0m0.730s
sys 0m0.470s
```Из-за необходимости получения сообщений через LRU-очередь, полная асинхронность не достигается. Однако, чем больше worker'ов, тем лучше производительность. На моей тестовой машине при количестве worker'ов 8 производительность перестает расти — четырёхъядерный процессор может выполнять только столько операций. Тем не менее, мы получили почти четырёхкратное увеличение скорости, затратив всего несколько минут на модификацию. Кроме того, агент ещё не был оптимизирован, он всё ещё копирует сообщения, вместо использования нулевой копии. Тем не менее, мы достигли обработки 25 000 запросов-ответов в секунду, что уже очень хорошо.Конечно, асинхронный режим Majordomo также имеет недостатки, одним из которых является невозможность восстановления после сбоя агента. В коде mdcliapi2 нет механизма восстановления соединения, и для его восстановления требуется выполнение следующих условий:
* Каждый запрос имеет уникальный номер, а каждый ответ также содержит соответствующий номер, что требует изменения протокола для четкого определения;
* API клиента должен сохранять и отслеживать все отправленные, но ещё не получившие ответы запросы;
* Если агент прекращает работу, клиент будет повторно отправлять все сообщения.
Как можно заметить, высокая надёжность часто связана с увеличением сложности, и вопрос заключается в том, стоит ли применять эту механику в режиме управления. Это зависит от конкретной ситуации. Если это служба поиска по имени, где каждое соединение вызывается один раз, то применять эту механику не обязательно; если же это веб-сервис на переднем крае, обслуживающий тысячи клиентов, то это может потребоваться.
### Поиск услугТеперь, когда у нас есть агент, ориентированный на услуги, мы не можем узнать, предоставляет ли он определённую услугу. Если запрос завершается ошибкой, это может указывать на то, что данная услуга недоступна, но какие причины? Поэтому было бы полезно иметь возможность спросить агента: "Существует ли работающая служба echo?" Самым очевидным способом было бы добавление команды в протокол MDP/Client, позволяющей клиенту спрашивать агента о доступности определённой службы. Однако, главным преимуществом MDP/Client является его простота, и добавление функции поиска услуг могло бы сделать его слишком сложным.Другой подход заключается в использовании метода обработки электронной почты, который возвращает неудачные запросы обратно. Однако это также увеличивает сложность, так как нам нужно различать полученные сообщения — являются ли они ответами или возвращенными запросами.
Давайте воспользуемся тем же подходом, создав новую механику на основе MDP, а не изменяя его. Управление само по себе является услугой, и мы можем предлагать дополнительные услуги, такие как "деактивация услуги" или "получение данных о сервисе". Нам нужна механика, которая позволяет расширять протокол, не затрагивая его основные функции.
Таким образом, появляется небольшой RFC — MMI (ManagerInterface Interface) — прикладной уровень, построенный на протоколе MDP: http://rfc.zeromq.org/spec:8. Мы уже внедрили его в агента, возможно, вы заметили это. Ниже приведен пример кода, демонстрирующий использование этой функции поиска услуг:
**mmiecho: Поиск услуг через Majordomo на C**```c
//
// MMI echo service query example program
//
// Let's compile directly without generating a library
#include "mdcliapi.c"
int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);
// The name of the service we need to query
zmsg_t *request = zmsg_new ();
zmsg_addstr (request, "echo");
// Message sent to the "service query" service
zmsg_t *reply = mdcli_send (session, "mmi.service", &request);
if (reply) {
char *reply_code = zframe_strdup (zmsg_first (reply));
printf ("Query result for echo service: %s\n", reply_code);
free (reply_code);
zmsg_destroy (&reply);
}
else
printf ("E: Agent did not respond, please ensure it is running\n");
}
``` mdcli_destroy(&session);
return 0;
}
```Агент во время выполнения проверяет имя обслуживаемого сервиса и самостоятельно обрабатывает те, которые начинаются с mmi. Предыдущие запросы не будут переданы worker'ам. Вы можете запустить вышеупомянутый код без запуска worker'ов и наблюдать, будет ли программа отображать статус 200 или 404. Реализация MMI в агенте примерной программы очень проста, например, когда какой-то worker прекращает работу, этот сервис всё ещё помечается как доступный. В реальной жизни агент должен очищать сервисы, у которых нет worker'ов, через определённые промежутки времени.
### Независимые службы
Независимость означает возможность безопасно повторять выполнение определённой операции. Например, просмотр часов является независимой операцией, но занятие деньгами у жены другого человека — нет. Некоторые клиент-серверные коммуникации являются независимыми, некоторые — нет. Примеры независимых коммуникаций:
* Безостоянное выполнение задачи распределения, то есть сервер в модели "трубопровод" является безостоятельным worker'ом, его результат зависит от состояния клиента, поэтому он может повторно обрабатывать одинаковые запросы;
* Преобразование логического адреса в реальный конечный адрес в службе именования, которое можно повторно запросить несколько раз, поэтому это также независимое действие.Примеры неразрывных коммуникаций:
* Логирование, мы не хотим, чтобы одинаковые логи записывались несколько раз;
* Любые службы, которые влияют на последующие узлы, такие как отправка информации на следующий узел, если запрос повторяется, информация на следующем узле будет дублироваться;
* Когда служба изменяет общую информацию, но не настроена на независимость. Например, если служба выполняет списание со счета банка (debit), это обязательно будет неразрывной операцией.
Если приложение предоставляет неразрывную службу, следует рассмотреть, на каком этапе оно может потерпеть неудачу. Если программа прекращает работу во время простоя или обработки запроса, это не проблема. Мы можем использовать транзакции базы данных, чтобы гарантировать одновременное выполнение списания и зачисления. Если приложение прекращает работу во время отправки запроса, это уже проблема, так как для него это означает завершение работы.
Если сеть заблокирована во время возврата ответа, клиент считает, что запрос не был отправлен, и повторно отправляет его. Это приводит к повторному выполнению того же запроса на стороне сервера. Это не то, что мы хотели бы получить.
Часто используемое решение заключается в том, чтобы сервер обнаруживал и отклонял повторные запросы, для этого требуется: * Клиент должен добавлять уникальный идентификатор каждому запросу, включая идентификатор клиента и сообщения;
* Сервер должен использовать идентификатор клиента и сообщения в качестве ключа при отправке ответа и сохранять содержимое ответа;
* Когда сервер обнаруживает, что запрос уже существует в хэш-таблице ответов, он пропускает этот запрос и сразу возвращает содержимое ответа.### Оfflайн надежность (Гигант режим)
Когда вы осознаете, что режим дворецкого является очень надежным агентом сообщений, вы можете захотеть использовать диск для промежуточного хранения сообщений, чтобы еще больше повысить надежность. Хотя этот подход широко используется во многих корпоративных системах передачи сообщений, я все же немного против него по следующим причинам:
* Мы видим, что ленивый пиратский режим клиента может работать очень хорошо, функционируя в различных архитектурных средах. Единственная проблема заключается в том, что он предполагает, что worker является бесштатным и предоставляет идемпотентные услуги. Но эту проблему можно решить другими способами, не добавляя диск.* Добавление диска создает новые проблемы, требующие дополнительного управления и обслуживания. Наиболее значимым преимуществом пиратского режима является его простота и надежность. Если вы все еще беспокоитесь о возможных сбоях оборудования, вы можете перейти к режиму peer-to-peer, который будет рассмотрен в последнем разделе этой главы.Хотя есть эти причины, существует вполне обоснованная ситуация, когда использование диска для промежуточного хранения имеет смысл — асинхронная работа в офлайн сети. У пиратского режима есть проблема: клиент отправляет запрос и продолжает ждать ответа. Если клиент и worker не поддерживают постоянное соединение (можно сравнить это с электронной почтой), мы не можем установить бесштатную сеть между клиентом и worker, поэтому нам нужно сохранять состояние.
Из этого возникает Гигант режим, при котором сообщения записываются на диск, чтобы гарантировать их сохранность. Когда мы выполняем запросы службы, мы обращаемся к уровню гиганта. Гигант построен поверх режима дворецкого, а не переопределяет протокол MDP. Преимуществом такого подхода является возможность реализации надежности в конкретном worker без необходимости добавления логики агента.
* Реализация становится проще;
* Агент пишется на одном языке, а worker — на другом;
* Возможность свободного обновления этого режима.
Единственным недостатком является наличие дополнительного уровня взаимодействия между агентом и диском, но это стоит того.У нас есть множество способов реализации устойчивой архитектуры запрос-ответ, и цель, конечно, состоит в том, чтобы сделать это как можно проще. Самым простым способом, который приходит мне в голову, является предоставление услуги "гиганта" агентом, которая не влияет на работу существующего worker. Если клиент хочет немедленного ответа, он может общаться с агентом; если он не так торопится, он может общаться с гигантом: "Привет, гигант, помоги мне обработать этот запрос, я пойду купить продуктов".Таким образом, титан становится одновременно worker и клиентом. Общение между клиентом и титаном обычно выглядит следующим образом:
* Клиент: Пожалуйста, помоги мне обработать этот запрос. Титан: Хорошо.
* Клиент: Есть ли ответ для меня? Титан: Да, есть. (или нет)
* Клиент: Хорошо, ты можешь освободить этот запрос, работа завершена. Титан: Хорошо.
Общение между титаном и агентом обычно выглядит так:
* Титан: Привет, агент, у тебя есть служба echo? Агент: Да, кажется, есть.
* Титан: Привет, служба echo, помоги мне обработать этот запрос. Echo: Хорошо, вот ответ.
* Титан: Спасибо!
Вы можете представить себе некоторые сценарии с отказами и проверить, справится ли вышеупомянутая модель с ними. Если worker отказывает при обработке запроса, титан будет постоянно перезапрашивать; если ответ теряется во время передачи, титан также будет повторять попытки; если запрос был обработан, но клиент не получил ответ, он снова спросит титана; если титан отказывает при обработке запроса или отправке ответа, клиент будет повторять попытки. Как только запрос сохраняется на диске, он не будет потерян.
Эта система включает длительный процесс установления соединения, но клиент может использовать асинхронный режим управления, отправляя несколько запросов одновременно и ожидая ответов вместе.Нам нужен способ, чтобы клиент мог запрашивать содержимое ответа. Разные клиенты могут обращаться к одной и той же службе, и они могут свободно входить и выходить, имея различные идентификаторы. Простым, разумным и безопасным решением является:
* Когда титан получает запрос, он генерирует уникальный идентификатор (UUID) для каждого запроса и возвращает его клиенту;
* Клиент должен предоставить этот UUID при запросе содержимого ответа.
Таким образом, клиент должен безопасно хранить UUID. Однако это избавляет от необходимости валидировать запросы. Есть ли другие решения? Мы можем использовать постоянные сокеты, явно указывая идентификаторы сокетов клиента. Однако это создаст проблемы управления и может привести к бесконечным проблемам, если два клиента имеют одинаковые идентификаторы сокетов.
Перед тем как мы начнем разрабатывать новый протокол, давайте подумаем о том, как клиент взаимодействует с титаном. Одним из вариантов является предоставление услуги с тремя различными командами; другой вариант — предоставление трех отдельных услуг: * **titanic.request** - Сохраняет запрос и возвращает UUID.
* **titanic.reply** - Получает содержимое ответа по UUID.
* **titanic.close** - Подтверждает, что запрос был правильно обработан.
Необходимо создать многопоточный worker, как мы это делали ранее с использованием ZMQ для многопоточного программирования. Это довольно просто. Однако, перед тем как приступить к написанию кода, давайте запишем некоторые определения, связанные с режимом Гиганта: http://rfc.zeromq.org/spec:9. Мы называем его "Гигантским сервисным протоколом" или TSP.Использование протокола TSP, конечно, требует от клиента выполнения дополнительной работы. Вот пример простого, но достаточно надёжного клиента:**ticlient: Пример клиента Titanic на C**```c
//
// Пример клиента в режиме "Гигант"
// Реализует клиентскую часть протокола http://rfc.zeromq.org/spec:9
// Давайте скомпилируем прямо здесь, без создания библиотеки
#include "mdcliapi.c"
// Вызов услуги TSP
// В случае успеха возвращает ответ (код состояния: 200), иначе возвращает NULL
static zmsg_t *
s_service_call (mdcli_t *session, char *service, zmsg_t **request_p)
{
zmsg_t *reply = mdcli_send (session, service, request_p);
if (reply) {
zframe_t *status = zmsg_pop (reply);
if (zframe_streq (status, "200")) {
zframe_destroy (&status);
return reply;
}
else
if (zframe_streq (status, "400")) {
printf ("E: Клиентская ошибка, отмена запроса\n");
exit (EXIT_FAILURE);
}
else
if (zframe_streq (status, "500")) {
printf ("E: Серверная ошибка, отмена запроса\n");
exit (EXIT_FAILURE);
}
}
else
exit (EXIT_SUCCESS); // Прерывание или ошибка
zmsg_destroy (&reply);
return NULL; // Запрос не выполнен успешно, но причина не указана
}
int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);
// 1. Отправка запроса на сервис echo гиганту
zmsg_t *request = zmsg_new ();
zmsg_addstr (request, "echo");
zmsg_addstr (request, "Hello world");
zmsg_t *reply = s_service_call (
session, "titanic.request", &request);
zframe_t *uuid = NULL;
if (reply) {
uuid = zmsg_pop (reply);
zmsg_destroy (&reply);
zframe_print (uuid, "I: request UUID ");
}
// 2. Ожидание ответа
while (! zctx_interrupted) {
zclock_sleep (100);
request = zmsg_new ();
zmsg_add (request, zframe_dup (uuid));
zmsg_t *reply = s_service_call (
session, "titanic.reply", &request);
if (reply) {
```markdown
char *reply_string = zframe_strdup(zmsg_last(reply));
printf("Ответ: %s\n", reply_string);
free(reply_string);
zmsg_destroy(&reply);
// 3. Закрытие запроса
request = zmsg_new();
zmsg_add(request, zframe_dup(uuid));
reply = s_service_call(session, "titanic.close", &request);
zmsg_destroy(&reply);
break;
}
```
else {
printf("I: Ответ еще не получен, готовимся к повторной попытке через некоторое время...\n");
zclock_sleep(5000); // Повторная попытка через 5 секунд
}
}
zframe_destroy(&uuid);
mdcli_destroy(&session);
return 0;
}Конечно, код выше может быть интегрирован в одну платформу, где программисты не должны знать всех деталей. Если бы у меня было время, я попробовал бы написать такой API, чтобы приложение снова стало состоящим из нескольких строк кода. Эта идея согласуется с концепцией MDP: не повторяйте одно и то же.
```Вот реализация Титаника. Этот сервер использует три потока для обслуживания трёх различных сервисов. Он использует самый примитивный метод сохранения данных: создание файла на диске для каждого запроса. Хотя это просто, это также довольно пугает. Более сложной частью является то, что Титаник поддерживает очередь для хранения этих запросов, что позволяет избежать повторного сканирования директорий.**titanic: Пример брокера Titanic на C**
```c
//
// Гигантский режим - сервис
//
// Реализация серверной части протокола http://rfc.zeromq.org/spec:9
// Давайте сразу скомпилируем, не создавая библиотеки
#include "mdwrkapi.c"
#include "mdcliapi.c"
#include "zfile.h"
#include <uuid/uuid.h>
// Возвращает уникальный идентификатор (UUID) в виде строки
// Вызывающий код отвечает за освобождение памяти, выделенной для UUID
static char *
s_generate_uuid (void)
{
char hex_char[] = "0123456789ABCDEF";
char *uuidstr = zmalloc(sizeof(uuid_t) * 2 + 1);
uuid_t uuid;
uuid_generate(uuid);
int byte_nbr;
for (byte_nbr = 0; byte_nbr < sizeof(uuid_t); byte_nbr++) {
uuidstr[byte_nbr * 2 + 0] = hex_char[uuid[byte_nbr] >> 4];
uuidstr[byte_nbr * 2 + 1] = hex_char[uuid[byte_nbr] & 15];
}
return uuidstr;
}
// Создает имя файла для хранения запроса на основе UUID и возвращает его
#define TITANIC_DIR ".titanic"
static char *
s_request_filename (char *uuid)
{
char *filename = malloc(256);
snprintf(filename, 256, TITANIC_DIR "/%s.req", uuid);
return filename;
}
// Создает имя файла для хранения ответа на основе UUID и возвращает его
static char *
s_reply_filename (char *uuid)
{
char *filename = malloc(256);
snprintf(filename, 256, TITANIC_DIR "/%s.rep", uuid);
return filename;
}
// ---------------------------------------------------------------------
// Гигантский режим - сервис запросов
static void
titanic_request (void *args, zctx_t *ctx, void *pipe)
{
mdwrk_t *worker = mdwrk_new("tcp://localhost:5555", "titanic.request", 0);
zmsg_t *reply = NULL;
while (TRUE)
{
// Если ответ не пустой, отправляем его и получаем новый запрос от агента
zmsg_t *request = mdwrk_recv(worker, &reply);
if (!request)
break; // Прерываем выполнение
// Убедимся, что директория существует
file_mkdir(TITANIC_DIR);
}
}
``````markdown
// Генерируем UUID и сохраняем сообщение на диск
char *uuid = s_generate_uuid();
char *filename = s_request_filename(uuid);
FILE *file = fopen(filename, "w");
assert(file);
zmsg_save(request, file);
fclose(file);
free(filename);
zmsg_destroy(&request);
// Добавляем UUID в очередь
reply = zmsg_new();
zmsg_addstr(reply, uuid);
zmsg_send(&reply, pipe);
// Возвращаем UUID клиенту
// Этот процесс завершается функцией mdwrk_recv() в начале цикла
``````markdown
reply = zmsg_new ();
zmsg_addstr (reply, "200");
zmsg_addstr (reply, uuid);
free (uuid);
}
mdwrk_destroy (&worker);
}
// ---------------------------------------------------------------------
// Гигантский режим - Ответственный сервис
static void *
titanic_reply (void *context)
{
mdwrk_t *worker = mdwrk_new (
"tcp://localhost:5555", "titanic.reply", 0);
zmsg_t *reply = NULL;
while (TRUE) {
zmsg_t *request = mdwrk_recv (worker, &reply);
if (! request)
break; // Прервать выполнение
char *uuid = zmsg_popstr (request);
char *req_filename = s_request_filename (uuid);
char *rep_filename = s_reply_filename (uuid);
if (file_exists (rep_filename)) {
FILE *file = fopen (rep_filename, "r");
assert (file);
reply = zmsg_load (file);
zmsg_pushstr (reply, "200");
fclose (file);
}
else {
reply = zmsg_new ();
if (file_exists (req_filename))
zmsg_pushstr (reply, "300"); // Ожидание
else
zmsg_pushstr (reply, "400"); // Неизвестно
}
zmsg_destroy (&request);
free (uuid);
free (req_filename);
free (rep_filename);
}
mdwrk_destroy (&worker);
return 0;
}
// ---------------------------------------------------------------------
// Гигантский режим - Запрос закрытия
static void *
titanic_close (void *context)
{
mdwrk_t *worker = mdwrk_new (
``````markdown
"tcp://localhost:5555", "titanic.close", 0);
zmsg_t *reply = NULL;
while (TRUE) {
zmsg_t *request = mdwrk_recv(worker, &reply);
if (!request)
break; // Прервать выполнение
char *uuid = zmsg_popstr(request);
char *req_filename = s_request_filename(uuid);
char *rep_filename = s_reply_filename(uuid);
file_delete(req_filename);
file_delete(rep_filename);
free(uuid);
free(req_filename);
free(rep_filename);
zmsg_destroy(&request);
reply = zmsg_new();
zmsg_addstr(reply, "200");
}
mdwrk_destroy(&worker);
return 0;
}
// Обработка запроса, успешный ответ возвращает 1
static int
s_service_success(mdcli_t *client, char *uuid)
{
// Чтение содержимого запроса, первая фрейм содержит имя сервиса
char *filename = s_request_filename(uuid);
FILE *file = fopen(filename, "r");
free(filename);
// Если клиент уже закрыл этот запрос, вернуть 1
if (!file)
return 1;
zmsg_t *request = zmsg_load(file);
fclose(file);
zframe_t *service = zmsg_pop(request);
char *service_name = zframe_strdup(service);
// Используем протокол MMI для проверки доступности сервиса
zmsg_t *mmi_request = zmsg_new();
zmsg_add(mmi_request, service);
zmsg_t *mmi_reply = mdcli_send(client, "mmi.service", &mmi_request);
int service_ok = (mmi_reply
&& zframe_streq(zmsg_first(mmi_reply), "200"));
zmsg_destroy(&mmi_reply);
if (service_ok) {
zmsg_t *reply = mdcli_send(client, service_name, &request);
if (reply) {
filename = s_reply_filename(uuid);
FILE *file = fopen(filename, "w");
assert(file);
zmsg_save(reply, file);
fclose(file);
free(filename);
return 1;
}
zmsg_destroy(&reply);
} else {
zmsg_destroy(&request);
}
free(service_name);
return 0;
}
int main(int argc, char *argv[])
{
int verbose = (argc > 1 && streq(argv[1], "-v"));
zctx_t *ctx = zctx_new();
// Создаем сессию клиента MDP
```
mdcli_t *client = mdcli_new("tcp://localhost:5555", verbose);
mdcli_set_timeout(client, 1000); // 1 секунда
mdcli_set_retries(client, 1); // Попробовать один раз
void *request_pipe = zthread_fork(ctx, titanic_request, NULL);
zthread_new(ctx, titanic_reply, NULL);
zthread_new(ctx, titanic_close, NULL);
// Основной цикл
while (TRUE) {
// Если нет активности, будем циклить каждую секунду
zmq_pollitem_t items[] = {{request_pipe, 0, ZMQ_POLLIN, 0}};
int rc = zmq_poll(items, 1, 1000 * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Прервать
if (items[0].revents & ZMQ_POLLIN) {
// Убедиться, что папка с сообщениями существует
file_mkdir(TITANIC_DIR);
// Добавляем UUID в очередь, используя "-" для обозначения ожидающего запроса
zmsg_t *msg = zmsg_recv(request_pipe);
if (!msg)
break; // Прервать
FILE *file = fopen(TITANIC_DIR "/queue", "a");
char *uuid = zmsg_popstr(msg);
fprintf(file, "-%s\n", uuid);
fclose(file);
free(uuid);
zmsg_destroy(&msg);
}
}
// Распределение
//
char entry[] = "? . . . . . . . :. . . . . . . :. . . . . . . :";
FILE *file = fopen(TITANIC_DIR "/queue", "r+");
while (file && fread(entry, 33, 1, file) == 1) {
// Обработка запросов с UUID, начинающимся с "-"
if (entry[0] == '-') {
if (verbose)
printf("I: Начало обработки запроса %s\n", entry + 1);
if (s_service_success(client, entry + 1)) {
// Отметка как обработано
fseek(file, -33, SEEK_CUR);
fwrite("+", 1, 1, file);
fseek(file, 32, SEEK_CUR);
}
}
// Пропуск последней строки
if (fgetc(file) == '\r')
fgetc(file);
if (zctx_interrupted)
break;
}
if (file)
fclose(file);
}
mdcli_destroy(&client);
return 0;
}
При тестировании откройте mdbroker и titanic, затем запустите ticlient, после чего запустите любое количество mdworkers. Вы увидите, что клиент получил ответ.Несколько пояснений:
* Мы используем протокол MMI для запроса к агенту о том, доступна ли какая-либо служба, что аналогично логике в MDP;
* Мы используем протокол inproc для связи основного цикла с сервисом titanic.request, чтобы сохранять новые запросы. Это позволяет избежать постоянного сканирования директорий основным циклом для чтения всех файлов запросов и их сортировки по времени.
Этот пример программы не должен заботиться о своей производительности (она будет очень низкой, хотя я не тестировал это), а должна демонстрировать надежный способ коммуникации. Вы можете протестировать это, открыв агента, гиганта, worker'а и клиента, используя параметр -v для вывода трассировки информации, затем открывая и закрывая агента, гиганта или worker'а (клиент нельзя закрывать), и вы увидите, что все запросы получили ответ.
Если вы хотите использовать режим гиганта в реальных условиях, вы, вероятно, захотите узнать, как сделать его быстрее. Вот мои предложения: * Используйте один файл на диске для хранения всех данных. Операционная система эффективнее обрабатывает большие файлы, чем множество маленьких.
* Используйте циклическую структуру для организации этого файла на диске, чтобы новые запросы могли быть последовательно записаны в этот файл. Одиночный поток при полной записи на диск работает достаточно эффективно.
* Сохраняйте индекс в памяти и восстанавливайте его при запуске программы. Это позволяет экономить место на диске для кэша и безопасно сохранять индекс на диске. Вам потребуется механизм fsync для сохранения каждого запроса; или вы можете ждать несколько миллисекунд, если потеря нескольких тысяч запросов вас не беспокоит.
* Если возможно, используйте SSD;
* Предварительно выделяйте пространство для файла на диске или увеличьте размер блока выделения, чтобы избежать фрагментации и обеспечить последовательное чтение и запись.Кроме того, я не рекомендую хранить сообщения в базе данных или использовать высокопроизводительные кэши ключей, так как они дороже одного файла на диске.
Если вы хотите сделать режим гиганта более надёжным, вы можете копировать запросы на другой сервер, чтобы не беспокоиться о воздействии ядерного удара на основную программу.
Если вы хотите сделать режим гиганта быстрее, но готовы пожертвовать некоторой надёжностью, вы можете хранить запросы и ответы в памяти. Это позволит использовать эту службу как автономную сеть, однако если сам сервис гиганта упадёт, я ничем не смогу помочь.
### Высоконадежные симметричные узлы (режим Gemini)
#### Обзор
Режим Gemini представляет собой пару высоконадежных узлов с механизмом главного и резервного узлов. В любой момент времени один узел выполняет роль основного узла, принимая все запросы от клиентов, а другой узел служит в роли резервного узла. Узлы постоянно мониторят друг друга, и если основной узел исчезает из сети, резервный узел немедленно заменяет его.
Режим Gemini был разработан Pieter Hintjens и Martin Sustrik и используется в сервере OpenAMQ компании iMatix. Основные идеи дизайна:
* Предоставление простого решения для обеспечения высокой надежности;
* Легкость понимания и использования;
* Возможность надежного переключения при отказе.
Предположим, что у нас есть группа серверов, работающих в режиме Gemini. Вот возможные сценарии отказа:
1. Основной узел выходит из строя из-за аппаратного сбоя (отключение питания, пожар и т.д.), и приложение немедленно переходит на подключение к резервному узлу;
2. Основной узел теряет связь из-за отказа сетевого оборудования (например, маршрутизатор был поражен молнией), и приложение немедленно переходит на подключение к резервному узлу;
3. Услуга на основном узле была случайно завершена техническим персоналом и не может автоматически восстановиться.
Процесс восстановления состоит из следующих шагов:
1. Технический персонал диагностирует причину отказа основного узла;
2. Резервный узел отключается, что приводит к кратковременному прерыванию услуги;
3. После того как все приложения переключатся на основной узел, технический персонал запускает резервный узел.
Процесс восстановления осуществляется вручную. Опыт показывает, что автоматическое восстановление может быть опасным:
* Отказ может вызвать кратковременное прерывание услуги на 10-30 секунд. Если это действительно внезапное событие, лучше всего временно прекратить обслуживание основного узла, так как немедленное восстановление может привести к еще одному прерыванию на 10-30 секунд, что лучше, чем потерять доступ к услуге.* При возникновении чрезвычайной ситуации можно записывать причины отказа во время ремонта, а не позволять системе автоматически восстанавливаться. Это позволяет администратору использовать свой опыт для защиты от следующего внезапного события.
* Наконец, если автоматическое восстановление действительно успешно, администратор не сможет определить причину отказа, поэтому не сможет провести анализ.
Процесс восстановления в режиме Gemini заключается в том, чтобы после устранения проблемы основного узла отключить резервный узел, а затем снова включить его:

Процесс отключения режима Gemini имеет два варианта:
1. Сначала отключается резервный узел, затем основной узел после некоторого времени;
1. Одновременное отключение основного и резервного узлов, при этом интервал времени между ними не превышает нескольких секунд. При отключении интервал времени должен быть короче времени переключения в случае отказа, иначе это может привести к потери соединения приложением, его повторному подключению и еще раз к потере соединения, что вызовет жалобы пользователей.
### Подробные требованияДвойной звезды (Dual Star) режим может быть очень простым, но работать эффективно. В действительности, эта реализация прошла три версии, а предыдущие были слишком сложными и пытались делать слишком много, поэтому их отвергли. Нам требуется только базовая функциональность, которая обеспечивает понятное, легко разрабатываемое и надёжное решение.Вот подробные требования для этой архитектуры: * Отказы, требующие использования режима с двойной звездой, включают катастрофические события, такие как сбой аппаратного обеспечения, пожар или случайное событие. Для других обычных отказов сервера можно использовать более простые методы.
* Время восстановления должно составлять менее 60 секунд, идеально — менее 10 секунд;
* Переключение в случае отказа (failover) должно происходить автоматически, в то время как восстановление системы (recover) должно выполняться вручную. Мы хотим, чтобы приложение автоматически переключалось с основного сервера на резервный при отказе, но не хотелось бы, чтобы оно автоматически переключалось обратно на основной сервер до тех пор, пока проблема не будет решена, так как это может привести к новому отказу основного сервера.
* Логика приложения должна быть максимально простой, легкой в использовании и лучше всего быть упакованной в API;
* Должна быть предоставлена явная индикация того, какой сервер является основным, чтобы избежать "раздвоения личности", когда оба сервера считают себя основными;
* Порядок запуска двух серверов не должен быть ограничен;
* При запуске или остановке основного и резервного серверов конфигурация клиента не должна меняться, хотя это может прервать соединение;
* Администратор должен иметь возможность одновременно мониторить оба сервера; * Между двумя серверами должно быть специальное высокоскоростное сетевое соединение, которое позволяет маршрутизировать по определенному IP-адресу.Мы делаем следующие предположения:
* Одного резервного сервера достаточно для обеспечения надежности, дополнительные механизмы резервного копирования не требуются;
* Основной и резервный серверы должны быть способны предоставлять полный набор услуг и выдерживать одинаковые нагрузки, без необходимости балансировки нагрузки;
* Бюджет позволяет иметь резервный сервер, который большую часть времени остается недействующим.
Режим двойной звезды не использует:
* Множество резервных серверов или балансировку нагрузки между основным и резервным серверами. Резервный сервер в этом режиме всегда остается недействующим, за исключением случаев, когда основной сервер выходит из строя;
* Обработку устойчивых сообщений или транзакций. Мы предполагаем, что подключенная сеть является ненадежной (или недоверенной);
* Автоматический поиск сети. Режим двойной звезды настраивается вручную, они знают о существовании друг друга, а приложение знает о существовании режима двойной звезды;
* Синхронизацию состояния между основным и резервным серверами. Все состояние сервера должно быть восстановлено приложением.
Вот несколько терминов, используемых в режиме двойных звезд: * **Хост** - обычно машина, работающая в качестве мастера;
* **Бэкап** - обычно машина, работающая в качестве слейва; она становится мастером только тогда, когда хост исчезает из сети и начинает принимать все запросы от приложений;
* **Мастер** - машина, которая принимает запросы от приложений в режиме двойных мастеров; в любой момент времени мастер может быть только один;
* **Слейв** - машина, которая заменяет мастера, если мастер исчезает.Шаги конфигурирования режима двойных звезд:
1. Убедитесь, что хост знает местоположение бэкапа;
1. Убедитесь, что бэкап знает местоположение хоста;
1. Настройте время восстановления после отказа; конфигурация двух машин должна быть одинаковой.
Одним из важнейших параметров является интервал проверки состояния другой машины и время, необходимое для выполнения действий. В нашем примере время восстановления после отказа установлено в 2000 миллисекунд, и если это время превышено, бэкап становится новым мастером. Однако, если вы запускаете сервис хоста через shell-скрипт, вам следует увеличить это время, чтобы избежать ситуации, когда бэкап становится мастером во время восстановления соединения хоста.
Чтобы сделать клиентское приложение совместимым с режимом двойных звезд, вам потребуется:
1. Знать адреса двух серверов;
1. Попытаться подключиться к хосту, если это невозможно — подключиться к бэкапу;
1. Обнаруживать потерянные соединения, обычно с использованием механизма пульса;
1. Попытаться повторно подключиться к хосту, а затем к бэкапу, интервал между которыми должен быть больше времени восстановления после отказа;
1. Перестроить все данные состояния, необходимые на стороне сервера;
1. Если требуется надежность, повторно отправить сообщения, потерянные во время отказа.Это не простая задача, поэтому мы обычно упаковываем это в API для использования программистами.
Основные ограничения режима двойных звезд:
* Процесс на стороне сервера не должен включать более одного узла симметричной пары;
* Хост может иметь только одного бэкапа;
* Когда бэкап находится в состоянии слейва, он не обрабатывает никаких запросов;
* Бэкап должен быть способен принять все запросы от приложений;
* Время восстановления после отказа не может быть изменено во время выполнения;
* Клиентское приложение должно выполнять работу по повторному подключению.
#### Предотвращение психического расщепления"Психическое расщепление" — это состояние, при котором различные части кластера одновременно считают себя мастером и прекращают обнаруживать друг друга. Алгоритмы режима двойных звёзд снижают вероятность возникновения этого состояния: хост и бэкап определяют, являются ли они мастером, проверяя, получили ли они запросы от приложений, и исчез ли другой узел из сети. Однако в некоторых случаях режим двойной звезды также может столкнуться с проблемой разделения сознания. Например, если основной и резервный серверы расположены в двух разных зданиях, где каждое здание имеет свои локальные сети, распределённые по нескольким приложениям. В этом случае, когда связь между сетями двух зданий прерывается, основной и резервный серверы будут работать независимо друг от друга, принимая и обрабатывая запросы в каждом здании.Чтобы предотвратить разделение сознания, мы должны обеспечить использование специализированной сети для соединения основного и резервного серверов. Самый простой способ — это соединить их кабелем UTP.
Мы не можем размещать режим двойной звезды на двух разных островах, обслуживая приложения на каждом острове. В этом случае мы будем использовать механизмы надежности, такие как федеральный режим.
Лучшим, но более экстремальным подходом будет полное разделение соединений между двумя машинами и соединений приложений, даже до использования разных сетевых адаптеров, а не просто разных портов. Это делается для того, чтобы упростить диагностику ошибок в будущем.
#### Реализация режима двойной звезды
Без лишних слов, вот код сервера для режима двойной звезды:**bstarsrv: Сервер двойной звезды на C**
```c
//
// Близнецовый режим - серверная часть
//
#include "czmq.h"
// Интервал отправки состояния
// Если противоположная сторона не отвечает за два сердцебиения, соединение считается разорванным
#define HEARTBEAT 1000 // В миллисекундах
// Структура состояний сервера
typedef enum {
STATE_PRIMARY = 1, // Основной сервер, ожидающий подключения резервного сервера
STATE_BACKUP = 2, // Резервный сервер, ожидающий подключения основного сервера
STATE_ACTIVE = 3, // Активное состояние, обрабатывает запросы приложения
STATE_PASSIVE = 4 // Пассивное состояние, не принимает запросы
} state_t;
// Структура событий
typedef enum {
PEER_PRIMARY = 1, // Основной сервер
PEER_BACKUP = 2, // Резервный сервер
PEER_ACTIVE = 3, // Активное состояние
PEER_PASSIVE = 4, // Пассивное состояние
CLIENT_REQUEST = 5 // Запрос клиента
} event_t;
// Конечный автомат
typedef struct {
state_t state; // Текущее состояние
event_t event; // Текущее событие
int64_t peer_expiry; // Время истечения сессии для проверки соединения
} bstar_t;
// Выполнение конечного автомата (связывание события со состоянием);
// Возвращает TRUE в случае возникновения исключения.
static bool
s_state_machine (bstar_t *fsm)
{
bool exception = false;
// Основной сервер ожидает подключения резервного сервера
// В этом состоянии принимаются события PEER_BACKUP и PEER_ACTIVE
if (fsm->state == STATE_PRIMARY) {
if (fsm->event == PEER_BACKUP) {
} else // Сервер находится в пассивном состоянии // Если партнер умер, событие CLIENT_REQUEST будет триггерить восстановление if (fsm->state == STATE_PASSIVE) { if (fsm->event == PEER_PRIMARY) { // Партнер перезапускается — переходим в активное состояние, партнер переходит в пассивное. printf ("I: Узел (slave) перезапускается, может стать мастером. \n"); fsm->state = STATE_ACTIVE; } else if (fsm->event == PEER_BACKUP) {
printf("I: Бэкап (slave) перезапускается, может стать мастером.\n");
fsm->state = STATE_ACTIVE;
}
else
if (fsm->event == PEER_PASSIVE) {
// Если есть два slave, кластер не будет отвечать
printf("E: Критическая ошибка: два slave. Выход...\n");
exception = TRUE;
}
else
if (fsm->event == CLIENT_REQUEST) {
// Если таймаут сердцебиения, партнер станет мастером;
// Это поведение вызвано запросом клиента.
assert(fsm->peer_expiry > 0);
if (zclock_time() >= fsm->peer_expiry) {
// Партнер умер, переходим в активное состояние.
printf("I: Восстановление после отказа, могу стать мастером.\n");
fsm->state = STATE_ACTIVE;
}
else
// Партнер жив, отклоняем запрос.
exception = TRUE;
}
}
return exception;
}```c
int main(int argc, char *argv[]) {
// Командные аргументы могут быть:
// -p запуск как мастера, на tcp://localhost:5001
// -b запуск как слейва, на tcp://localhost:5002
zctx_t *ctx = zctx_new();
void *statepub = zsocket_new(ctx, ZMQ_PUB);
void *statesub = zsocket_new(ctx, ZMQ_SUB);
void *frontend = zsocket_new(ctx, ZMQ_ROUTER);
bstar_t fsm = {0};
if (argc == 2 && streq(argv[1], "-p")) {
printf("I: Запущен как мастер, ожидаю подключения слейва.\n");
zsocket_bind(frontend, "tcp://*:5001");
zsocket_bind(statepub, "tcp://*:5003");
zsocket_connect(statesub, "tcp://localhost:5004");
fsm.state = STATE_PRIMARY;
} else if (argc == 2 && streq(argv[1], "-b")) {
printf("I: Запущен как слейв, ожидаю подключения мастера.\n");
zsocket_bind(frontend, "tcp://*:5002");
zsocket_bind(statepub, "tcp://*:5004");
zsocket_connect(statesub, "tcp://localhost:5003");
fsm.state = STATE_BACKUP;
} else {
printf("Использование: bstarsrv { -p | -b }\n");
zctx_destroy(&ctx);
exit(0);
}
// Устанавливаем время следующей отправки состояния
int64_t send_state_at = zclock_time() + HEARTBEAT;
while (!zctx_interrupted) {
zmq_pollitem_t items[] = {
{frontend, 0, ZMQ_POLLIN, 0},
{statesub, 0, ZMQ_POLLIN, 0}
};
int time_left = (int)((send_state_at - zclock_time()));
if (time_left < 0)
time_left = 0;
int rc = zmq_poll(items, 2, time_left * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Объект контекста был закрыт
if (items[0].revents & ZMQ_POLLIN) {
// Получили запрос от клиента
zmsg_t *msg = zmsg_recv(frontend);
fsm.event = CLIENT_REQUEST;
if (s_state_machine(&fsm) == FALSE)
// Отправляем ответ
zmsg_send(&msg, frontend);
else
zmsg_destroy(&msg);
}
if (items[1].revents & ZMQ_POLLIN) {
// Получили сообщение о состоянии, как событие
char *message = zstr_recv(statesub);
fsm.event = atoi(message);
free(message);
if (s_state_machine(&fsm))
break; // Ошибка, выходим.
}
}
}
``````markdown
fsm.peer_expiry = zclock_time() + 2 * HEARTBEAT;
}
// Время отправки состояния
if (zclock_time() >= send_state_at) {
char message[2];
sprintf(message, "%d", fsm.state);
zstr_send(statepub, message);
send_state_at = zclock_time() + HEARTBEAT;
}
}
if (zctx_interrupted)
printf("W: Interrupt\n");
// Закрываем сокеты и контекст
zctx_destroy(&ctx);
return 0;
Вот клиентский код:
Важно отметить, что фраза "Interrupt" была переведена как "Прерывание", но в контексте программирования более подходящим будет оставить её без перевода, чтобы сохранить техническую специфичность. Однако, в данном случае, было решено перевести её для полноты перевода текста.bstarcli: Клиент для двойной звезды на C
//
// Двойная звезда - клиент
//
#include "czmq.h"
#define REQUEST_TIMEOUT 1000 // миллисекунд
#define SETTLE_DELAY 2000 // время ожидания
int main (void)
{
zctx_t *ctx = zctx_new ();
char *server [] = { "tcp://localhost:5001", "tcp://localhost:5002" };
uint server_nbr = 0;
printf ("I: Подключаюсь к серверу %s...\n", server [server_nbr]);
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, server [server_nbr]);
int sequence = 0;
while (!zctx_interrupted) {
// Отправка запроса и ожидание ответа
char request [10];
sprintf (request, "%d", ++sequence);
zstr_send (client, request);
int expect_reply = 1;
while (expect_reply) {
// Опрос сокета
zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
if (rc == -1)
break; // прерывание
// Обработка ответа
if (items [0].revents & ZMQ_POLLIN) {
// Проверка номера ответа
char *reply = zstr_recv (client);
if (atoi (reply) == sequence) {
printf ("I: Сервер ответил корректно (%s)\n", reply);
expect_reply = 0;
sleep (1); // отправка одного запроса в секунду
}
else {
printf ("E: Некорректный ответ: %s\n",
reply);
}
free (reply);
}
else {
printf ("W: Сервер не отвечает, повторная попытка подключения\n");
// Уничтожение сокета
zsocket_destroy (ctx, client);
server_nbr = (server_nbr + 1) % 2;
zclock_sleep (SETTLE_DELAY);
printf ("I: Подключаюсь к серверу %s...\n",
server [server_nbr]);
client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, server [server_nbr]);
}
}
}
}
``` // Отправка запроса через новый сокет
zstr_send(client, request);
}
}
}
zctx_destroy(&ctx);
return 0;
}
```Запустите следующие команды для тестирования в произвольном порядке:
bstarsrv -p # Запуск основного сервера bstarsrv -b # Запуск резервного сервера bstarcli
Вы можете завершить процесс основного сервера, чтобы протестировать механизм восстановления после отказа; затем снова запустить основной сервер и завершить процесс резервного сервера, чтобы проверить механизм восстановления. Обратите внимание, что эти события должны быть инициированы клиентом.
Ниже приведена схема состояний служебных процессов. В зелёном состоянии принимаются запросы от клиента, в розовом — запросы отклоняются. События относятся к состоянию партнера, поэтому "активное состояние партнера" означает, что партнёрская машина сообщила нам, что она находится в активном состоянии. "Запрос от клиента" означает, что мы получили запрос от клиента, а "голосование клиента" указывает на то, что мы получили запрос от клиента и партнёр уже считается погибшим.
Важно отметить, что служебные процессы используют сокеты PUB-SUB для обмена состояниями, другие типы сокетов здесь непригодны. Например, сокеты PUSH и DEALER будут заблокированы, если нет подключённых узлов; сокеты PAIR не смогут автоматически переподключаться после разрыва связи; сокеты ROUTER требуют адреса для отправки сообщений.Это основные ограничения шаблона Двойной Звезды:
* Серверный процесс не может быть частью более чем одной пары двойной звезды.
* Основной сервер может иметь только один резервный сервер.
* Резервный сервер не может выполнять полезную работу в режиме резерва.
* Резервный сервер должен быть способен обрабатывать полную нагрузку приложения.
* Конфигурация переключения на резервный сервер не может быть изменена во время выполнения.
* Приложения-клиенты должны выполнять некоторую работу, чтобы воспользоваться преимуществами переключения на резервный сервер.
#### Реактор Двойной Звезды
Мы можем упаковать шаблон Двойной Звезды в класс, похожий на реактор, для последующего использования. В C языке мы используем класс zloop из библиотеки czmq, а другие языки должны иметь аналогичные реализации. Ниже приведён интерфейс bstar на языке C:
```c
// Создание экземпляра двойной звезды с использованием локальных (биндов) и удалённых (соединений) конечных точек для установки пары узлов.
bstar_t *bstar_new (int primary, char *local, char *remote);
// Уничтожение экземпляра
void bstar_destroy (bstar_t **self_p);
// Возвращает базовый zloop реактор для добавления таймеров, читателей, регистрации и отмены.
zloop_t *bstar_zloop (bstar_t *self);
// Регистрация читателя голосования
int bstar_voter (bstar_t *self, char *endpoint, int type, zloop_fn handler, void *arg);
```// Регистрация обработчика машины состояний
void bstar_new_master (bstar_t *self, zloop_fn handler, void *arg);
void bstar_new_slave (bstar_t *self, zloop_fn handler, void *arg);
// Запуск реактора, который прекращается, когда обратный вызов возвращает -1, или процесс получает сигнал SIGINT или SIGTERM.
int bstar_start (bstar_t *self);
```Вот реализация класса:```**bstar: Класс ядра двойной звезды на C**```c
/* =====================================================================
bstar - Бинарный звездный реактор
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
Этот файл является частью руководства по ZeroMQ: http://zguide.zeromq.org
Это свободное программное обеспечение; вы можете распространять его и/или модифицировать его в соответствии с
условиями Генеральной общественной лицензии GNU версии Yöntem 3, выпущенной Free Software Foundation,
или (по вашему выбору) любой более поздней версией этой лицензии.
Это программное обеспечение распространяется в надежде, что оно будет полезным, но
БЕЗ КАКИХ-ЛИБО ГАРАНТИЙ; даже без скрытых гарантий пригодности для использования или соответствия какому-либо назначению.
Смотрите Генеральную общественную лицензию GNU для получения дополнительной информации.
Вы должны были получить копию Генеральной общественной лицензии GNU вместе с этим программным обеспечением.
Если нет, см. <http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "bstar.h"
// Состояние сервера
typedef enum {
STATE_PRIMARY = 1, // Главный сервер, ожидает подключения от второго сервера
STATE_BACKUP = 2, // Второй сервер, ожидает подключения от главного сервера
STATE_ACTIVE = 3 // Активное состояние, обрабатывает запросы приложения
} state_t;
``````c
STATE_PASSIVE = 4 // Пассивное состояние, не принимает запросы
} state_t;
// События узла связи
typedef enum {
PEER_PRIMARY = 1, // Главный сервер
PEER_BACKUP = 2, // Второй сервер
PEER_ACTIVE = 3, // Активное состояние
PEER_PASSIVE = 4, // Пассивное состояние
CLIENT_REQUEST = 5 // Запрос клиента
} event_t;
// Интервал времени для отправки информации о состоянии
// Если противоположная сторона не отвечает после двух сердечных сокращений, считается, что соединение разорвано
#define BSTAR_HEARTBEAT 1000 // В миллисекундах
// Структура класса
struct _bstar_t {
zctx_t *ctx; // Приватный контекст
zloop_t *loop; // Цикл реактора
void *statepub; // Публикация состояния
void *statesub; // Подписка на состояние
state_t state; // Текущее состояние
event_t event; // Текущее событие
int64_t peer_expiry; // Время истечения для определения смерти узла
zloop_fn *voter_fn; // Обработчик сокета голосования
void *voter_arg; // Аргументы для обработчика голосования
}
``````markdown
zloop_fn *master_fn; // при переходе в состояние master
void *master_arg; // аргумент
zloop_fn *slave_fn; // при переходе в состояние slave
void *slave_arg; // аргумент
};
// ---------------------------------------------------------------------
// Выполняет конечный автомат (связывает события со состояниями);
// Возвращает -1 при возникновении ошибки, 0 при успешном выполнении.
static int
s_execute_fsm (bstar_t *self)
{
int rc = 0;
// Узел ожидает подключения другого узла
// В этом состоянии принимаются события CLIENT_REQUEST
if (self->state == STATE_PRIMARY) {
if (self->event == PEER_BACKUP) {
zclock_log ("I: Подключен к slave, можно работать в качестве master.");
self->state = STATE_ACTIVE;
if (self->master_fn)
(self->master_fn) (self->loop, NULL, self->master_arg);
}
else
if (self->event == PEER_ACTIVE) {
zclock_log ("I: Подключен к master, можно работать в качестве slave.");
self->state = STATE_PASSIVE;
if (self->slave_fn)
(self->slave_fn) (self->loop, NULL, self->slave_arg);
}
else
if (self->event == CLIENT_REQUEST) {
zclock_log ("I: Получено запрос от клиента, можно работать в качестве master.");
self->state = STATE_ACTIVE;
if (self->master_fn)
(self->master_fn) (self->loop, NULL, self->master_arg);
}
}
else
// Slave ожидает подключения другого узла
// В этом состоянии отклоняются события CLIENT_REQUEST
if (self->state == STATE_BACKUP) {
if (self->event == PEER_ACTIVE) {
zclock_log ("I: Подключен к master, можно работать в качестве slave.");
self->state = STATE_PASSIVE;
if (self->slave_fn)
(self->slave_fn) (self->loop, NULL, self->slave_arg);
}
else
if (self->event == CLIENT_REQUEST)
rc = -1;
}
else
// Узел находится в активном состоянии
// В этом состоянии принимаются события CLIENT_REQUEST
// Узел покидает активное состояние только при смерти
if (self->state == STATE_ACTIVE) {
if (self->event == PEER_ACTIVE) {
// При обнаружении двух masterов выбрасывается ошибка
zclock_log ("E: Критическая ошибка: двойной master. Выход.");
}```c rc = -1; } } else // Узел находится в пассивном состоянии
// Если партнер уже мертв, событие CLIENT_REQUEST запустит восстановление после отказа
if (self->state == STATE_PASSIVE) {
if (self->event == PEER_PRIMARY) {
// Партнер перезапускается — состояние меняется на активное, партнер становится пассивным.
zclock_log("I: Узел (slave) перезапускается, может стать мастером.");
self->state = STATE_ACTIVE;
} else if (self->event == PEER_BACKUP) {
// Партнер перезапускается — состояние меняется на активное, партнер становится пассивным.
zclock_log("I: Бэкап (slave) перезапускается, может стать мастером.");
self->state = STATE_ACTIVE;
} else if (self->event == PEER_PASSIVE) {
// Если есть два slave, кластер будет недоступен
zclock_log("E: Критическая ошибка: два slave. Выход.");
rc = -1;
} else if (self->event == CLIENT_REQUEST) {
// Если таймаут сердцебиения, партнер станет мастером;
// Это поведение запускается клиентским запросом.
assert(self->peer_expiry > 0);
if (zclock_time() >= self->peer_expiry) {
// Партнер мертв, состояние меняется на активное.
zclock_log("I: Восстановление после отказа, может стать мастером.");
self->state = STATE_ACTIVE;
} else {
// Партнер жив, запрос отклоняется.
rc = -1;
}
}
// Вызов функции обработки события изменения состояния
if (self->state == STATE_ACTIVE && self->master_fn)
(self->master_fn)(self->loop, NULL, self->master_arg);
}
return rc;
}
```// ---------------------------------------------------------------------
// Обработчик событий реактора
// Отправка информации о состоянии
int s_send_state(zloop_t *loop, void *socket, void *arg) {
bstar_t *self = (bstar_t *) arg;
zstr_sendf(self->statepub, "%d", self->state);
return 0;
}
// Получение информации о состоянии, запуск конечного автомата
int s_recv_state(zloop_t *loop, void *socket, void *arg) {
bstar_t *self = (bstar_t *) arg;
char *state = zstr_recv(socket);
if (state) {
self->event = atoi(state);
self->peer_expiry = zclock_time() + 2 * BSTAR_HEARTBEAT;
free(state);
}
return s_execute_fsm(self);
}```markdown
// Если запрос был обработан, вызываем функцию
self.event = CLIENT_REQUEST
if s_execute_fsm(self) == 0:
print("CLIENT REQUEST")
(self.voter_fn)(self.loop, socket, self.voter_arg)
else:
// Удаляем сообщение из очереди ожидания
msg = zmsg_recv(socket)
zmsg_destroy(&msg)
return 0
}
// ---------------------------------------------------------------------
// Конструктор
bstar_t *
bstar_new(int primary, char *local, char *remote)
{
bstar_t
*self
self = (bstar_t *) zmalloc(sizeof(bstar_t))
// Инициализируем двойную звезду
self.ctx = zctx_new()
self.loop = zloop_new()
self.state = primary ? STATE_PRIMARY : STATE_BACKUP
// Создаем публичный сокет состояния
self.statepub = zsocket_new(self.ctx, ZMQ_PUB)
zsocket_bind(self.statepub, local)
// Создаем подписочный сокет состояния
self.statesub = zsocket_new(self.ctx, ZMQ_SUB)
zsocket_connect(self.statesub, remote)
// Устанавливаем базовый обработчик событий реактора
zloop_timer(self.loop, BSTAR_HEARTBEAT, 0, s_send_state, self)
zloop_reader(self.loop, self.statesub, s_recv_state, self)
return self
}
// ---------------------------------------------------------------------
// Деструктор
void
bstar_destroy(bstar_t **self_p)
{
assert(self_p)
if *self_p:
bstar_t *self = *self_p
zloop_destroy(&self.loop)
zctx_destroy(&self.ctx)
free(self)
*self_p = NULL
}
// ---------------------------------------------------------------------
// Возвращает внутренний объект zloop для добавления дополнительных таймеров, читателей и т. д.
zloop_t *
bstar_zloop(bstar_t *self)
{
return self.loop
}
// ---------------------------------------------------------------------
``````c
// Создает сокет, подключается к локальному конечной точке, регистрируется как читатель;
// Читает сокет только если машина состояний позволяет это;
// Сообщения, полученные с этого сокета, рассматриваются как "голосование";
// Мы требуем, чтобы в режиме двойной звезды было только одно "голосующее" сокет.
int
bstar_voter(bstar_t *self, char *endpoint, int type, zloop_fn handler,
void *arg)
{
// Сохраняем оригинальную обратную функцию вызова и аргументы для использования позже
void *socket = zsocket_new(self->ctx, type);
zsocket_bind(socket, endpoint);
assert(!self->voter_fn);
self->voter_fn = handler;
self->voter_arg = arg;
return zloop_reader(self->loop, socket, s_voter_ready, self);
}
```
```markdown
## Обработчики событий изменения состояния
```c
// Регистрация обработчика событий изменения состояния
void
bstar_new_master(bstar_t *self, zloop_fn handler, void *arg)
{
assert(!self->master_fn);
self->master_fn = handler;
self->master_arg = arg;
}
// Регистрация обработчика событий изменения состояния
void
bstar_new_slave(bstar_t *self, zloop_fn handler, void *arg)
{
assert(!self->slave_fn);
self->slave_fn = handler;
self->slave_arg = arg;
}
```
## Управление трассировкой
```c
// Включение или отключение трассировки
void
bstar_set_verbose(bstar_t *self, bool verbose)
{
zloop_set_verbose(self->loop, verbose);
}
```
## Запуск реактора
```c
// Запуск реактора, который прекращает работу при возврате -1 из обратного вызова, или при получении процессом сигналов SIGINT или SIGTERM.
int
bstar_start(bstar_t *self)
{
assert(self->voter_fn);
return zloop_start(self->loop);
}
```
Таким образом, наш серверный код станет очень коротким:
```**bstarsrv2: Бинарный сервер двойной звезды, использующий основной класс на C**
```c
//
// Сервер двойной звезды, использующий bstar реактивный узел
//
// Прямая компиляция, без создания библиотеки
#include "bstar.c"
// Эхо-сервис
int s_echo(zloop_t *loop, void *socket, void *arg) {
zmsg_t *msg = zmsg_recv(socket);
zmsg_send(&msg, socket);
return 0;
}
int main(int argc, char *argv[]) {
// Командные аргументы могут быть:
// -p запуск в качестве основного сервера, на tcp://localhost:5001
// -b запуск в качестве резервного сервера, на tcp://localhost:5002
bstar_t *bstar;
if (argc == 2 && streq(argv[1], "-p")) {
printf("I: Основной сервер master, ожидает подключения резервного сервера (slave).\n");
bstar = bstar_new(BSTAR_PRIMARY, "tcp://*:5001", "tcp://localhost:5002");
bstar_voter(bstar, "tcp://*:5001", ZMQ_ROUTER, s_echo, NULL);
} else
if (argc == 2 && streq(argv[1], "-b")) {
printf("I: Резервный сервер slave, ожидает подключения основного сервера (master).\n");
bstar = bstar_new(BSTAR_BACKUP, "tcp://*:5002", "tcp://localhost:5001");
bstar_voter(bstar, "tcp://*:5002", ZMQ_ROUTER, s_echo, NULL);
} else {
printf("Использование: bstarsrvs { -p | -b }\n");
exit(0);
}
bstar_start(bstar);
bstar_destroy(&bstar);
return 0;
}
```
### Надежность без промежуточного программного обеспечения (режим Free Agent)Мы уже рассмотрели множество примеров с использованием промежуточного программного обеспечения, что может показаться противоречащим принципу "ZMQ как без промежуточного программного обеспечения". Однако стоит помнить, что в реальной жизни промежуточное программное обеспечение всегда было предметом любви и ненависти. В реальных системах многие сообщения используют промежуточное программное обеспечение для построения распределённых архитектур. Поэтому окончательное решение остаётся за вами. Это также объясняет, почему, хотя я могу доехать за 10 минут до крупного магазина и купить пять ящиков громкой музыки, я предпочитаю идти пешком за те же 10 минут до местного магазина. Такие экономические соображения (время, усилия, затраты и т.д.) являются важными как в повседневной жизни, так и в архитектуре программного обеспечения.Поэтому ZMQ не требует использования архитектуры с промежуточным программным обеспечением, но всё же предоставляет такие встроенные устройства, которые программисты могут использовать по своему усмотрению. В этом разделе мы откажемся от архитектуры, использующей промежуточные компоненты для обеспечения надёжности, и перейдём к использованию пунт-ту-пойнт архитектуры, то есть Free Agent Pattern, для надёжной передачи сообщений. Примером программы будет служба разрешения имен. Одним из распространённых вопросов в ZMQ является следующий: как нам узнать конечные точки, к которым нужно подключиться? Внедрение TCP/IP адресов напрямую в код явно неприемлемо; использование конфигурационных файлов создаёт проблемы с управлением. Представьте себе ситуацию, когда вам нужно настроить сотни компьютеров только для того, чтобы они знали, что IP-адрес google.com равен 74.125.230.82. Функции, которые должны быть реализованы в службе разрешения имен ZMQ:
* Преобразование логических имен в один или несколько адресов конечных точек, включая привязку и соединение. В реальном использовании служба имен будет предоставлять набор конечных точек.
* Возможность использования этой службы в различных окружениях, таких как среда разработки и среда производства.
* Служба должна быть надёжной, иначе приложение не сможет подключиться к сети.Предоставление службы разрешения имен для управления может быть полезным, хотя простое открытие конечных точек агента также возможно. Однако, если использовать службу разрешения имен правильно, она станет единственным внешним интерфейсом, что облегчит управление.
Типы сбоев, которые нам нужно учитывать, включают: отказ сервиса или перезапуск, перегрузка сервиса, сетевые факторы и т. д. Для обеспечения надежности нам нужно создать группу сервисов, так что клиенты могут подключаться к другим сервисам, если один из сервисов выходит из строя. На практике достаточно двух сервисов, но фактическое количество сервисов может быть любым.

В этом архитектурном решении множество клиентов взаимодействуют с небольшим количеством сервисов. Сервисы привязывают сокеты к отдельным портам, что отличается от агента в модели управления. У клиента есть несколько вариантов:
* Клиент может использовать сокет REQ и ленивый пиратский режим, но ему нужна механика, которая предотвращает постоянные запросы к уже остановленному сервису.
* Клиент может использовать сокет DEALER и отправлять запросы ко всем сервисам. Это просто, но не очень эффективно;* Клиент использует сокет ROUTER для подключения к конкретному сервису. Но как клиент узнает идентификатор сокета сервиса? Один способ — это активное подключение сервиса к клиенту (очень сложный), или закрепление идентификатора сервиса в коде (очень запутанный).#### Модель 1: Простое повторное подключение
Давайте начнем с простого подхода, перепишем ленивый пиратский режим, чтобы он мог работать с несколькими сервисами. При запуске сервиса указываем порт через командную строку. Затем запускаем несколько сервисов.
**flserver1: Freelance сервер, модель 1 на C**
```c
//
// Freelancer Mode - Server - Model 1
// Provides echo service
//
#include "czmq.h"
int main (int argc, char *argv []) {
if (argc < 2) {
printf ("I: syntax: %s <endpoint>\n", argv [0]);
exit (EXIT_SUCCESS);
}
zctx_t *ctx = zctx_new ();
void *server = zsocket_new (ctx, ZMQ_REP);
zsocket_bind (server, argv [1]);
printf ("I: echo серверный конечный пункт: %s\n", argv [1]);
while (TRUE) {
zmsg_t *msg = zmsg_recv (server);
if (!msg)
break; // Прерывание
zmsg_send (&msg, server);
}
if (zctx_interrupted)
printf ("W: прерывание\n");
zctx_destroy (&ctx);
return 0;
}
```
Запустите клиент, указав один или несколько конечных точек:
**flclient1: Freelance клиент, модель 1 на C**
```c
//
// Freelancer mode - Client - Model 1
// Uses REQ socket to request one or more servers
//
#include "czmq.h"
#define REQUEST_TIMEOUT 1000
#define MAX_RETRIES 3 // Количество попыток
static zmsg_t *
s_try_request (zctx_t *ctx, char *endpoint, zmsg_t *request) {
printf ("I: Trying to request echo service at endpoint %s...\n", endpoint);
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, endpoint);
// Отправляем запрос и ждём ответа
zmsg_t *msg = zmsg_dup (request);
zmsg_send (&msg, client);
zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
zmsg_t *reply = NULL;
if (items [0].revents & ZMQ_POLLIN)
reply = zmsg_recv (client);
// Закрываем сокет
zsocket_destroy (ctx, client);
return reply;
}
``````Для запуска используйте следующие команды:
```
flserver1 tcp://*:5555 &
flserver1 tcp://*:5556 &
flclient1 tcp://localhost:5555 tcp://localhost:5556
```
Основной механизм клиента основан на ленивом пиратском алгоритме, то есть после получения одного успешного ответа он завершает работу. Возможны два случая:
* Если существует только один сервер, клиент будет повторять попытки N раз перед тем, как остановиться, что соответствует логике ленивого пиратского алгоритма;
* Если имеется несколько серверов, клиент будет пытаться получить ответ от каждого сервера по очереди, и после получения первого ответа прекратит дальнейшие попытки.
```Этот механизм дополняет пиратский алгоритм, позволяя ему работать в случае наличия только одного сервера.
Однако, данная концепция не может быть использована в реальных приложениях: если множество клиентов подключены к серверу, а основной сервер выходит из строя, все клиенты должны будут продолжить выполнение только после истечения времени ожидания.
#### Модель 2: Батч-отправка
Теперь давайте воспользуемся сокетами DEALER. Наша цель — получить ответ за минимальное время, не завися от состояния основного сервера. Для этого можно применить следующие меры:
* Подключиться ко всем серверам;
* При получении запроса отправить его одновременно всем серверам;
* Ждать первого ответа;
* Игнорировать остальные ответы.
При такой реализации клиента, после отправки запроса все серверы получат его и вернут ответ. Если какой-то сервер отключится, ZMQ может переслать запрос другим серверам, что приведет к получению некоторых серверов двух одинаковых запросов.
Более сложной проблемой является невозможность клиента определить количество полученных ответов, что может привести к путанице.Можно пронумеровать запросы и игнорировать несоответствующие ответы. Необходимо модифицировать серверы, чтобы они возвращали сообщения, содержащие номер запроса:
**flserver2: Freelance server, Model Two in C**```c
//
// Freelancer режим - сервер - модель 2
// Возвращает информацию OK с номером запроса
//
#include "czmq.h"
int main (int argc, char *argv[])
{
if (argc < 2) {
printf("I: синтаксис: %s <конечная точка>\n", argv[0]);
exit(EXIT_SUCCESS);
}
zctx_t *ctx = zctx_new();
void *server = zsocket_new(ctx, ZMQ_REP);
zsocket_bind(server, argv[1]);
printf("I: Сервер готов %s\n", argv[1]);
while (TRUE) {
zmsg_t *request = zmsg_recv(server);
if (!request)
break; // Прерывание
// Проверка содержимого запроса
assert(zmsg_size(request) == 2);
zframe_t *address = zmsg_pop(request);
zmsg_destroy(&request);
zmsg_t *reply = zmsg_new();
zmsg_add(reply, address);
zmsg_addstr(reply, "OK");
zmsg_send(&reply, server);
}
if (zctx_interrupted)
printf("W: прервано\n");
zctx_destroy(&ctx);
return 0;
}
```Клиентский код:**flclient2: Freelance client, модель два на C**
```c
//
// Freelancer Mode - Client - Model 2
// Использует сокет DEALER для отправки пакетных сообщений
//
#include "czmq.h"
// Продолжительность таймаута
#define GLOBAL_TIMEOUT 2500
// Обертывает клиентский API в класс
#ifdef __cplusplus
extern "C" {
#endif
// Определяет структуру класса
typedef struct _flclient_t flclient_t;
flclient_t *
flclient_new(void);
void
flclient_destroy(flclient_t **self_p);
void
flclient_connect(flclient_t *self, char *endpoint);
zmsg_t *
flclient_request(flclient_t *self, zmsg_t **request_p);
#ifdef __cplusplus
}
#endif
int main(int argc, char *argv[]) {
if (argc == 1) {
printf("I: syntax: %s <endpoint> . . . \n", argv[0]);
exit(EXIT_SUCCESS);
}
// Создает клиента для режима фрилансера
flclient_t *client = flclient_new();
// Подключается к каждому конечной точке
int argn;
for (argn = 1; argn < argc; argn++) {
flclient_connect(client, argv[argn]);
}
// Отправляет набор запросов и фиксирует время
int requests = 10000;
uint64_t start = zclock_time();
while (requests--) {
zmsg_t *request = zmsg_new();
zmsg_addstr(request, "random name");
zmsg_t *reply = flclient_request(client, &request);
if (!reply) {
printf("E: Name resolution service unavailable, exiting\n");
break;
}
zmsg_destroy(&reply);
}
printf("Среднее время выполнения запроса: %d микросекунд\n",
(int)(zclock_time() - start) / 10);
flclient_destroy(&client);
return 0;
}
// --------------------------------------------------------------------
// Структура класса
struct _flclient_t {
zctx_t *ctx; // Контекст
void *socket; // Сокет DEALER для связи с сервером
size_t servers; // Количество подключенных серверов
uint sequence; // Количество отправленных запросов
};
// --------------------------------------------------------------------
// Конструктор
flclient_t *
flclient_new(void) {
flclient_t *self;
self = (flclient_t *)zmalloc(sizeof(flclient_t));
self->ctx = zctx_new();
self->socket = zsocket_new(self->ctx, ZMQ_DEALER);
return self;
}
``````markdown
// --------------------------------------------------------------------
// Деструктор
void
flclient_destroy(flclient_t **self_p) {
assert(self_p);
if (*self_p) {
flclient_t *self = *self_p;
zctx_destroy(&self->ctx);
}
free(self);
*self_p = NULL;
}
// --------------------------------------------------------------------
// Подключение к новому серверному конечной точке
void
flclient_connect(flclient_t *self, char *endpoint)
{
assert(self);
zsocket_connect(self->socket, endpoint);
self->servers++;
}
// --------------------------------------------------------------------
// Отправка запроса, получение ответа
// Уничтожение запроса после отправки
zmsg_t *
flclient_request(flclient_t *self, zmsg_t **request_p)
{
assert(self);
assert(*request_p);
zmsg_t *request = *request_p;
// Добавление номера и пустого фрейма к сообщению
char sequence_text[10];
sprintf(sequence_text, "%u", ++self->sequence);
zmsg_pushstr(request, sequence_text);
zmsg_pushstr(request, "");
// Отправка запроса всем подключенным серверам
int server;
for (server = 0; server < self->servers; server++) {
zmsg_t *msg = zmsg_dup(request);
zmsg_send(&msg, self->socket);
}
// Получение ответа от любого сервера
// Поскольку мы можем poll несколько раз, каждый раз выполняется вычисление
zmsg_t *reply = NULL;
uint64_t endtime = zclock_time() + GLOBAL_TIMEOUT;
while (zclock_time() < endtime) {
zmq_pollitem_t items[] = {{self->socket, 0, ZMQ_POLLIN, 0}};
zmq_poll(items, 1, (endtime - zclock_time()) * ZMQ_POLL_MSEC);
if (items[0].revents & ZMQ_POLLIN) {
// Содержимое ответа [пустое][последовательность][OK]
reply = zmsg_recv(self->socket);
assert(zmsg_size(reply) == 3);
free(zmsg_popstr(reply));
char *sequence = zmsg_popstr(reply);
int sequence_nbr = atoi(sequence);
free(sequence);
if (sequence_nbr == self->sequence)
break;
}
}
}
``` zmsg_destroy(request_p);
return reply;
}
```
```Несколько пояснений: * Клиентская часть упакована в класс API, который скрывает сложный код.
* Клиентская часть прекращает поиск доступного сервера через несколько секунд.
* Клиентская часть должна создать легитимный REP-конверт, поэтому требуется добавление пустого фрейма.
В программе клиент отправляет 10 000 запросов на разрешение имени (хотя они и являются вымышленными) и вычисляет среднее время выполнения. На моей тестовой машине, при наличии одного сервера, время составляет 60 микросекунд; при наличии трёх — 80 микросекунд.
Преимущества и недостатки этого подхода:
* Преимущество: простота, легкость понимания и реализации;
* Преимущество: быстрое выполнение с механизмом повторной попытки;
* Недостаток: использование дополнительной сетевой пропускной способности;
* Недостаток: невозможность установки приоритета для серверов, таких как основной и дополнительный сервисы;
* Недостаток: сервер не может одновременно обрабатывать несколько запросов.
#### Модель три - сложная и злоключная
Модель массового отправления кажется нереалистичной, поэтому давайте исследуем последнюю, крайне сложную модель. Возможно, после её реализации мы снова вернёмся к модели массового отправления, ха-ха, это моя обычная практика.Можно заменить используемые клиентом сокеты на ROUTER, чтобы иметь возможность отправлять запросы конкретному серверу, прекращать отправку запросов умершему серверу, тем самым делая процесс максимально умным. Также можно заменить сокеты сервера на ROUTER, чтобы преодолеть ограничение однопоточности.Однако соединение двух временных сокетов с помощью ROUTER-ROUTER невозможно, так как узел генерирует идентификатор сокета для другого узла только после получения первого сообщения. Единственный способ — использовать устойчивый сокет у одного из узлов; лучшим решением будет использование устойчивого сокета у клиента, который знает идентификатор сервера.
Чтобы избежать создания новых конфигурационных параметров, можно использовать конечную точку сервера в качестве идентификатора сокета.
Помните, как работают идентификаторы ZMQ-сокетов. ROUTER-сокет сервера устанавливает свой идентификатор (до привязки), когда клиент подключается, происходит обмен идентификаторами через рукопожатие. ROUTER-сокет клиента отправляет пустое сообщение, а сервер генерирует случайный UUID для клиента. Затем сервер отправляет свой идентификатор клиенту.
Таким образом, клиент может отправлять сообщения конкретному серверу. Однако остаётся проблема: неизвестно, когда сервер завершит этот процесс рукопожатия. Если сервер доступен, это может занять доли миллисекунды. Если нет, это может занять очень долгое время.Здесь есть противоречие: нам нужно знать, когда сервер успешно подключился и может начать работу. В режиме Freelancer (свободного агента) сервер должен сначала отправить запрос, чтобы получить ответ, в отличие от режима middleware. Поэтому клиент должен сначала запросить сервер, прежде чем сервер сможет отправить сообщение обратно клиенту, что кажется невозможным.У меня есть решение этой проблемы — это массовая отправка. Здесь отправляется не реальный запрос, а пробный пинг (PING-PONG). Когда получено подтверждение, значит, другой конец активен.
Давайте установим протокол, который определяет, как работает этот пинг в режиме Freelancer:
* http://rfc.zeromq.org/spec:10
Реализация этого протокола на стороне сервера довольно проста. Вот модифицированный echo-сервер:
**flserver3: Freelance server, Model Three in C**
```c
//
// Freelancer mode - Server - Model Three
// Communicates using ROUTER-ROUTER sockets; single-threaded.
//
#include "czmq.h"
int main (int argc, char *argv []) {
int verbose = (argc > 1 && streq (argv [1], "-v"));
zctx_t *ctx = zctx_new ();
// Подготовка серверного сокета, его идентификатор и конечная точка совпадают
char *bind_endpoint = "tcp://*:5555";
char *connect_endpoint = "tcp://localhost:5555";
void *server = zsocket_new (ctx, ZMQ_ROUTER);
zmq_setsockopt (server, ZMQ_IDENTITY, connect_endpoint, strlen (connect_endpoint));
zsocket_bind (server, bind_endpoint);
printf ("I: Сервер готов %s\n", bind_endpoint);
while (!zctx_interrupted) {
zmsg_t *request = zmsg_recv (server);
if (verbose && request)
zmsg_dump (request);
if (!request)
break; // Прерывание
}
``` // Frame 0: Идентификатор клиента
// Frame 1: Пинг или контрольная информация клиента
// Frame 2: Содержимое запроса
zframe_t *address = zmsg_pop(request);
zframe_t *control = zmsg_pop(request);
zmsg_t *reply = zmsg_new();
if (zframe_streq(control, "PONG")) {
zmsg_addstr(reply, "PONG");
} else {
zmsg_add(reply, control);
zmsg_addstr(reply, "OK");
}
zmsg_destroy(&request);
zmsg_push(reply, address);
if (verbose && reply) {
zmsg_dump(reply);
}
zmsg_send(&reply, server);
}
if (zctx_interrupted) {
printf("W: Прерывание\n");
} zctx_destroy(&ctx);
return 0;
}
```Однако клиентская часть в режиме фриланса будет больше. Для ясности реализацию можно разделить на два класса. Сначала рассмотрим верхний уровень программы:
**flclient3: Клиент фрилансера, модель 3 на C**
```c
//
// Режим фрилансера - клиент - модель 3
// Использует класс flcliapi для упаковки режима фрилансера
//
// Прямая компиляция, без создания библиотеки
#include "flcliapi.c"
int main(void)
{
// Создание экземпляра клиента фрилансера
flcliapi_t *client = flcliapi_new();
// Подключение к конечной точке сервера
flcliapi_connect(client, "tcp://localhost:5555");
flcliapi_connect(client, "tcp://localhost:5556");
flcliapi_connect(client, "tcp://localhost:5557");
// Отправка случайных запросов, вычисление времени
int requests = 1000;
uint64_t start = zclock_time();
while (requests--) {
zmsg_t *request = zmsg_new();
zmsg_addstr(request, "random name");
zmsg_t *reply = flcliapi_request(client, &request);
if (!reply) {
printf("E: Сервис разрешения имени недоступен, выход\n");
break;
}
zmsg_destroy(&reply);
}
printf("Среднее время выполнения: %d микросекунд\n",
(int)(zclock_time() - start) / 10);
flcliapi_destroy(&client);
return 0;
}
```
Далее представлено более сложное реализование этого режима:**flcliapi: API клиента фрилансера на C**
```c
/* =====================================================================
flcliapi - Freelance Pattern agent class
Model 3: uses ROUTER socket to address specific services
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
This is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or (at
your option) any later version.
This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "flcliapi.h"
// Timeout for requests
#define GLOBAL_TIMEOUT 3000 // milliseconds
// Interval for heartbeats
#define PING_INTERVAL 2000 // milliseconds
// Time to live for server
#define SERVER_TTL 6000 // milliseconds
// =====================================================================
// Synchronization section, runs at the application level
// ---------------------------------------------------------------------
// Class structure
struct _flcliapi_t {
zctx_t *ctx; // context
void *pipe; // socket used for communication with main thread
};
// ---------------------------------------------------------------------
// Constructor
flcliapi_t *
flcliapi_new(void)
{
flcliapi_t *self;
self = (flcliapi_t *) zmalloc(sizeof(flcliapi_t));
self->ctx = zctx_new();
self->pipe = zthread_fork(self->ctx, flcliapi_agent, NULL);
return self;
}
// ---------------------------------------------------------------------
// Destructor
void
flcliapi_destroy(flcliapi_t **self_p)
{
assert(self_p);
if (*self_p) {
flcliapi_t *self = *self_p;
zctx_destroy(&self->ctx); free(self); *self_p = NULL; } } // Подключение к новому серверному конечному пункту // Содержимое сообщения: [CONNECT][endpoint] void flcliapi_connect(flcliapi_t *self, char *endpoint) { assert(self); assert(endpoint); zmsg_t *msg = zmsg_new(); zmsg_addstr(msg, "CONNECT"); zmsg_addstr(msg, endpoint); zmsg_send(&msg, self->pipe); zclock_sleep(100); // Ожидание подключения } // --------------------------------------------------------------------- // Отправка запроса и его уничтожение, получение ответа zmsg_t * flcliapi_request(flcliapi_t *self, zmsg_t **request_p) { assert(self); assert(*request_p); zmsg_pushstr(*request_p, "REQUEST"); zmsg_send(request_p, self->pipe); zmsg_t *reply = zmsg_recv(self->pipe); if (reply) { char *status = zmsg_popstr(reply); if (streq(status, "FAILED")) zmsg_destroy(&reply); free(status); } return reply; } // ===================================================================== // Асинхронная часть, выполняется в фоновом режиме // --------------------------------------------------------------------- // Информация о единичном сервере typedef struct { char *endpoint; // Серверная конечная точка/идентификатор сокета uint alive; // В сети ли int64_t ping_at; // Время следующего пинга int64_t expires; // Время истечения } server_t; server_t * server_new(char *endpoint) { server_t *self = (server_t *)zmalloc(sizeof(server_t)); self->endpoint = strdup(endpoint); self->alive = 0; self->ping_at = zclock_time() + PING_INTERVAL; self->expires = zclock_time() + SERVER_TTL; return self; } void server_destroy(server_t **self_p) { assert(self_p); if (*self_p) { server_t *self = *self_p; free(self->endpoint); free(self); *self_p = NULL; } } int
server_ping(char *key, void *server, void *socket)
{
server_t *self = (server_t *)server;
if (zclock_time() >= self->ping_at) {
zmsg_t *ping = zmsg_new();
zmsg_addstr(ping, self->endpoint);
zmsg_addstr(ping, "PING");
zmsg_send(&ping, socket);
self->ping_at = zclock_time() + PING_INTERVAL;
}
return 0;
}
server_tickless(char *key, void *server, void *arg)
{
server_t *self = (server_t *) server;
uint64_t *tickless = (uint64_t *) arg;
if (*tickless > self->ping_at)
*tickless = self->ping_at;
return 0;
}
// ---------------------------------------------------------------------
// Information about background handlers
typedef struct {
zctx_t *ctx; // Context
void *pipe; // Socket for communication with the application
void *router; // Socket for communication with the server
zhash_t *servers; // Connected servers
zlist_t *actives; // Active servers
uint sequence; // Request number
zmsg_t *request; // Current request
zmsg_t *reply; // Current reply
int64_t expires; // Time of request expiration
} agent_t;
```agent_t *
agent_new (zctx_t *ctx, void *pipe)
{
agent_t *self = (agent_t *) zmalloc (sizeof (agent_t));
self->ctx = ctx;
self->pipe = pipe;
self->router = zsocket_new (self->ctx, ZMQ_ROUTER);
self->servers = zhash_new ();
self->actives = zlist_new ();
return self;
}
void
agent_destroy (agent_t **self_p)
{
assert (self_p);
if (*self_p) {
agent_t *self = *self_p;
zhash_destroy (&self->servers);
zlist_destroy (&self->actives);
zmsg_destroy (&self->request);
zmsg_destroy (&self->reply);
free (self);
*self_p = NULL;
}
}
// Вызывается при удалении сервера из списка.
static void
s_server_free (void *argument)
{
server_t *server = (server_t *) argument;
server_destroy (&server);
}```c
void
agent_control_message(agent_t *self)
{
zmsg_t *msg = zmsg_recv(self->pipe);
char *command = zmsg_popstr(msg);
if (streq(command, "CONNECT")) {
char *endpoint = zmsg_popstr(msg);
printf("I: connecting to %s. . . \n", endpoint);
int rc = zmq_connect(self->router, endpoint);
assert(rc == 0);
server_t *server = server_new(endpoint);
zhash_insert(self->servers, endpoint, server);
zhash_freefn(self->servers, endpoint, s_server_free);
zlist_append(self->actives, server);
server->ping_at = zclock_time() + PING_INTERVAL;
server->expires = zclock_time() + SERVER_TTL;
free(endpoint);
}
else if (streq(command, "REQUEST")) {
assert(!self->request); // Поддерживает цикл запрос-ответ
// Добавляет номер запроса и пустой кадр в начало сообщения
char sequence_text[10];
sprintf(sequence_text, "%u", ++self->sequence);
zmsg_pushstr(msg, sequence_text);
// Получает контроль над запросом
self->request = msg;
msg = NULL;
// Устанавливает время истечения запроса
self->expires = zclock_time() + GLOBAL_TIMEOUT;
}
free(command);
zmsg_destroy(&msg);
}
void
agent_router_message(agent_t *self)
{
zmsg_t *reply = zmsg_recv(self->router);
// Первый кадр содержит идентификатор сервера-отправителя
char *endpoint = zmsg_popstr(reply);
server_t *server =
(server_t *)zhash_lookup(self->servers, endpoint);
assert(server);
free(endpoint);
if (!server->alive) {
zlist_append(self->actives, server);
server->alive = 1;
}
server->ping_at = zclock_time() + PING_INTERVAL;
server->expires = zclock_time() + SERVER_TTL;
// Второй кадр содержит номер ответа
char *sequence = zmsg_popstr(reply);
if (atoi(sequence) == self->sequence) {
zmsg_pushstr(reply, "OK");
zmsg_send(&reply, self->pipe);
zmsg_destroy(&self->request);
} else {
zmsg_destroy(&reply);
}
}
// ---------------------------------------------------------------------
``````markdown
// Асинхронный агент в фоновом режиме поддерживает пулы серверов для обработки запросов и ответов.
static void
flcliapi_agent(void *args, zctx_t *ctx, void *pipe)
{
agent_t *self = agent_new(ctx, pipe);
zmq_pollitem_t items[] = {
{ self->pipe, 0, ZMQ_POLLIN, 0 },
{ self->router, 0, ZMQ_POLLIN, 0 }
};
while (!zctx_interrupted) {
// Вычисляет время до следующего тика
uint64_t tickless = zclock_time() + 1000 * 3600;
if (self->request && tickless > self->expires) {
tickless = self->expires;
}
zhash_foreach(self->servers, server_tickless, &tickless);
int rc = zmq_poll(items, 2, (tickless - zclock_time()) * ZMQ_POLL_MSEC);
if (rc == -1) {
break; // Объект контекста был закрыт
}
if (items[0].revents & ZMQ_POLLIN) {
agent_control_message(self);
}
if (items[1].revents & ZMQ_POLLIN) {
agent_router_message(self);
}
// Если нам нужно обработать запрос, отправляем его следующему доступному серверу
}
}
```
```markdown
if (self->request) {
if (zclock_time() >= self->expires) {
// Запрос истек
zstr_send(self->pipe, "FAILED");
zmsg_destroy(&self->request);
} else {
// Поиск доступного сервера
while (zlist_size(self->actives)) {
server_t *server = (server_t *)zlist_first(self->actives);
if (zclock_time() >= server->expires) {
zlist_pop(self->actives);
server->alive = 0;
} else {
zmsg_t *request = zmsg_dup(self->request);
zmsg_pushstr(request, server->endpoint);
zmsg_send(&request, self->router);
break;
}
}
}
}
// Отключение и удаление просроченных серверов
```
// Отправка пинга свободным серверам
zhash_foreach (self->servers, server_ping, self->router);
}
agent_destroy (&self);
}
```
Эта группа API использует сложные механизмы, которые мы уже использовали ранее:**Асинхронный фоновый агент**
Клиентская API состоит из двух частей: синхронного класса `flcliapi`, работающего в потоке приложения, и асинхронного класса `agent`, работающего в фоновом потоке. Классы `flcliapi` и `agent` взаимодействуют через inproc сокет. Все, что связано с ZMQ, заключено внутри API. Класс `agent` фактически выполняется как мини-агент, отвечающий за коммуникацию с сервером в фоновом режиме. Он пытается подключиться к серверу для обработки запросов каждый раз, когда мы отправляем запрос.
**Механизм ожидания соединения**
Одной из особенностей ROUTER сокета является то, что он немедленно отбрасывает сообщения, которые не могут быть маршрутизированы. Это означает, что если вы сразу отправите сообщение после установления ROUTER-ROUTER соединения с сервером, это сообщение будет потеряно. Класс `flcliapi` задерживает отправку сообщения на некоторое время. В последующих коммуникациях, поскольку серверный сокет является постоянным, клиент больше не отбрасывает сообщения.
**Пинговая тишина**0MQ будет хранить сообщения для недоступного сервера бесконечно. Поэтому, если клиент будет повторно отправлять PING-сообщения недоступному серверу, когда этот сервер снова станет доступным, он получит множество PING-сообщений одновременно. Вместо того чтобы продолжать отправлять PING-сообщения серверу, который известно, что он недоступен, мы полагаемся на обработку 0MQ постоянных сокетов для доставки старых PING-сообщений, когда сервер снова станет доступным. Как только сервер восстановит соединение, он получит PING-сообщения от всех подключенных к нему клиентов, ответит PONG, и эти клиенты узнают, что сервер снова доступен.**Настройка времени ожидания**
В предыдущих примерах программ мы обычно задавали фиксированное время ожидания для опроса (например, 1 секунду). Этот подход прост, но для устройств, чувствительных к энергопотреблению (например, ноутбуков или смартфонов), пробуждение процессора требует дополнительной энергии. Поэтому, чтобы сэкономить энергию или просто поиграться, мы настроили время ожидания так, чтобы опрос завершался только при достижении времени истечения. Это позволяет сократить количество опросов. Мы можем хранить время истечения в списке для удобства проверки.
### Заключение
В этой главе мы рассмотрели много надежных механизмов запрос-ответ, каждая из которых имеет свои преимущества и недостатки. Большинство примеров кода можно использовать непосредственно в производственной среде, хотя они могут быть ещё более оптимизированы. Есть два основных типа моделей: модель с использованием middleware и модель без использования middleware.
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )