В главах 3 и 4 были рассмотрены некоторые продвинутые методы использования модели запрос-ответ в ZMQ. Если вы уже полностью поняли эти концепции, поздравляем вас. В этой главе мы будем сосредоточены на паб-саб модели, используя верхнеуровневые модели для улучшения производительности, надежности, синхронизации состояния и безопасности в ZMQ.
Основные темы, рассматриваемые в этой главе:
При использовании паб-саб модели одной из наиболее распространенных проблем является обработка медленных подписчиков. Идеально, если издатель может отправлять сообщения подписчикам со скоростью, которую они могут обрабатывать, но на практике подписчики могут требовать значительное время для обработки сообщений или просто недостаточно эффективны, чтобы следовать за издателем.
Как обрабатывать медленных подписчиков? Лучшим подходом было бы сделать подписчика более эффективным, но это потребует дополнительных усилий. Вот несколько способов решения проблемы медленных подписчиков:* Хранение сообщений на стороне издателя. Это то, что делает Gmail, если вы не прочитали свои электронные письма в течение нескольких часов, они будут сохранены. Однако, в высокопроизводительных приложениях накопление сообщений издателем часто приводит к переполнению памяти и последующему отказу. Особенно это становится проблемой, когда есть несколько подписчиков или невозможно использовать диск как буфер.
Хранение сообщений на стороне подписчика. Этот подход лучше, так как это по умолчанию поведение ZMQ. Если кто-то столкнется с переполнением памяти, это будет подписчик, а не издатель, что кажется справедливым. Однако этот подход имеет смысл только для приложений, где мгновенная нагрузка очень велика, подписчики временно не могут обработать все сообщения, но в конечном итоге догонят. Но это не решает проблему медленных подписчиков.
Приостановка отправки сообщений. Это также то, что делает Gmail, если объем вашего почтового ящика превышает 7,554 ГБ, новые письма будут отклонены или потеряны. Этот подход полезен для издателя, ZMQ также по умолчанию использует пороговое значение (HWM) для ограничения количества сообщений. Однако это не решает проблему медленных подписчиков, мы просто делаем сообщения нерегулярными.* Отключение связи с переполненным подписчиком. Это то, что делает Hotmail, если вы не входили в систему в течение двух недель, ваш аккаунт будет отключен. Однако такой подход невозможен в ZMQ, так как подписчики невидимы для издателя, и нет возможности для издателя реагировать на это. Кажется, что ни один из классических подходов не удовлетворяет наши потребности, поэтому нам придется придумать что-то новое. Мы можем заставить подписчика "самоубиться", а не просто отключиться. Это называется "самоубийца-улитка" режим. Когда подписчик обнаруживает, что работает слишком медленно (определение "слишком медленно" должно быть конфигурационной опцией, и когда это происходит, он должен сообщить программисту об этом), он издает стон и затем самоубивается.
Как подписчик может обнаружить, что он работает слишком медленно? Один способ — присваивать номера сообщениям и устанавливать пороговое значение на стороне публикации. Когда подписчик замечает, что номера сообщений не являются последовательными, он понимает, что что-то не так. Этот пороговый уровень будет значением, при котором подписчик решится самоубиться.Этот подход имеет два недостатка: 1) если мы подключены к нескольким публикаторам, как мы будем присваивать номера сообщениям? Решение состоит в том, чтобы присваивать уникальный номер каждому публикатору, который станет частью номера сообщения. 2) Если подписчик использует опцию ZMQ_SUBSCRIBE для фильтрации сообщений, наш тщательно спроектированный механизм присвоения номеров сообщений окажется бесполезным.В некоторых случаях сообщения не будут фильтроваться, поэтому присвоение номеров сообщений всё ещё будет работать. Однако более универсальное решение заключается в том, чтобы публикаторы помечали свои сообщения временными метками, а подписчики проверяли эти метки времени при получении сообщений. Если разница между этими метками времени достигнет определённого значения, подписчик должен издать предупреждение и самоуничтжиться.
Режим "самоубийца-улитка" будет особенно полезен, когда подписчики имеют собственные клиентские или сервисные соглашения, требующие максимального допустимого времени задержки. Отключение подписчика может не быть идеальным решением, но по крайней мере, это не вызовет дополнительных проблем. Если подписчик получит устаревшие данные, это может привести к дальнейшему повреждению данных и сделать их труднодоступными для обнаружения.
Вот простейшая реализация режима "самоубийца-улитка":suisnail: Самоубийца-улитка на Cc // // Самоубийца-улитка режим // #include "czmq.h" // --------------------------------------------------------------------- // Подписчик подключается к публишеру, получает все сообщения, // во время работы он временно приостанавливается, имитируя сложные вычисления, // когда он обнаруживает задержку в получении сообщений более 1 секунды, он самоуничтожается. #define MAX_ALLOWED_DELAY 1000 // миллисекунд static void subscriber (void *args, zctx_t *ctx, void *pipe) { // Подписаться на все сообщения void *subscriber = zsocket_new (ctx, ZMQ_SUB); zsocket_connect (subscriber, "tcp://localhost:5556"); // Получение и обработка сообщений while (1) { char *string = zstr_recv (subscriber); int64_t clock; int terms = sscanf (string, "%" PRId64, &clock); assert (terms == 1); free (string); // Логика самоуничтожения if (zclock_time () - clock > MAX_ALLOWED_DELAY) { fprintf (stderr, "E: Подписчик не может следовать за публишером, самоуничтожение\n"); break; } // Работа некоторое время zclock_sleep (1 + randof (2)); } zstr_send (pipe, "Подписчик остановлен"); } // --------------------------------------------------------------------- // Публишер отправляет сообщение с меткой времени каждую миллисекунду static void publisher (void *args, zctx_t *ctx, void *pipe) { // Подготовка публишера void *publisher = zsocket_new (ctx, ZMQ_PUB); zsocket_bind (publisher, "tcp://*:5556"); while (1) { // Отправка текущего времени (миллисекунды) подписчику char string [20]; sprintf (string, "%" PRId64, zclock_time ()); zstr_send (publisher, string); char *signal = zstr_recv_nowait (pipe); if (signal) { free (signal); break; } }
zclock_sleep(1); // Ожидание 1 миллисекунды
}
}
// Ниже приведён код для запуска подписчика и публишера, который останавливается, когда подписчик умирает
//
int main(void)
{
zctx_t *ctx = zctx_new();
void *pubpipe = zthread_fork(ctx, publisher, NULL);
void *subpipe = zthread_fork(ctx, subscriber, NULL);
free(zstr_recv(subpipe));
zstr_send(pubpipe, "break");
zclock_sleep(100);
zctx_destroy(&ctx);
return 0;
}
* В примере программа публикатор и подписчик являются двумя потоками одного процесса. В реальных приложениях они должны быть двумя отдельными процессами. В примере это сделано для удобства демонстрации.
### Быстрый подписчик (чёрный ящик)
Одним из типичных сценариев применения паттерна публикация-подписка является масштабируемое распределённое обработание данных. Например, если требуется обрабатывать данные, собранные с бирж, можно установить публикатора в системе торговли ценными бумагами для получения информации о ценах и отправки её группе подписчиков. Если у нас много подписчиков, мы можем использовать TCP. Если количество подписчиков достигнет определённого уровня, следует использовать надёжные протоколы широковещательной передачи, такие как PGM.Предположим, наш публикатор генерирует 100 000 сообщений размером 100 байт каждую секунду. Даже после удаления ненужной информации рынка, этот темп остаётся вполне разумным. Теперь нам нужно записать данные за день (около 8 часов, что составляет около 250 ГБ) и передать их в симуляцию сети, то есть в группу подписчиков. Хотя 100 000 сообщений в секунду легко обрабатывает ZMQ, нам требуется более высокая скорость.Предположим, у нас есть несколько машин, одна из которых работает как публикатор, а остальные — как подписчики. Все машины имеют 8 ядер, а у машины-публиликатора — 12 ядер.
Когда мы начинаем отправлять сообщения, стоит обратить внимание на следующие моменты:
1. Даже обрабатывая небольшие объемы данных, подписчики могут не успевать за скоростью публикатора;
1. При обработке данных со скоростью 6 Мбит/с публикатор и подписчики могут достигнуть своих пределов.
Сначала нам нужно спроектировать подписчика как многопоточное приложение, чтобы мы могли читать сообщения в одном потоке и использовать другие потоки для их обработки. Обычно для каждого типа сообщений используется свой способ обработки. Таким образом, подписчик может фильтровать полученные сообщения, используя заголовочные данные. Когда сообщение удовлетворяет определённым условиям, подписчик передаёт его рабочему потоку для обработки. В терминологии ZMQ подписчик передаёт сообщение рабочему потоку для обработки.
Таким образом, подписчик выглядит как устройство очереди, которое можно подключить к устройству очереди и рабочим потокам различными способами. Например, мы можем создать одностороннюю связь, где каждый рабочий поток одинаков, используя PUSH и PULL сокеты, а распределение работы оставляем на ZMQ. Это самый простой и быстрый способ.Связь между подписчиками и публикациями осуществляется с помощью протоколов TCP или PGM, а связь между подписчиками и worker'ами происходит в рамках одного процесса, поэтому используется протокол inproc.
Давайте рассмотрим, как преодолеть ограничения производительности. Поскольку подписчики являются однопоточными, они не могут использовать другие ядра процессора, когда загрузка ЦП достигает 100%. Однопоточные программы всегда сталкиваются с ограничениями, независимо от того, сколько данных они обрабатывают. Нам нужно распределить нагрузку по нескольким потокам и выполнять её параллельно.
Многие высокопроизводительные системы используют метод шардинга, то есть разделение работы на независимые параллельные потоки. Например, половина тематических данных может передаваться одним потоком, а другая половина — другим. Мы можем создать больше потоков, но если количество ядер процессора остаётся неизменным, это не имеет смысла.
Рассмотрим, как разделить работу на два потока:

Чтобы обеспечить полную загрузку двух потоков, ZMQ следует настроить следующим образом: * Использование двух I/O потоков вместо одного;
* Использование двух независимых сетевых интерфейсов;
* Каждый I/O поток привязывается к одному сетевому интерфейсу;
* Два потока подписчиков, каждый из которых привязывается к одному ядру процессора;
* Использование двух SUB сокетов;
* Оставшиеся ядра процессора предназначены для worker'ов;
* Worker потоки привязываются к PUSH сокетам двух потоков подписчиков.Количество создаваемых потоков должно совпадать с количеством ядер процессора. Если количество потоков превышает количество ядер, скорость обработки будет снижена. Кроме того, использование нескольких I/O потоков также не требуется.
### Общий ключ-значение кэш (клонирование)
Публикация-подписка похожа на радиовещание: вы не знаете, какие сообщения были отправлены до того, как начали слушать, и количество полученных сообщений зависит от вашей способности принимать их. Удивительно, что эта модель, которая соответствует требованиям многих инженеров, стала лучшим механизмом для распространения сообщений в реальной жизни. Подумайте о таких приложениях, как Weibo, Twitter, BBS новости, спортивные новости и т.д.
Однако в некоторых случаях надёжная публикация-подписка также имеет значение. Как мы обсуждали запрос-ответ модели, мы будем определять "надёжность" по "сбоям", вот некоторые из возможных сбоев в публикации-подписке модели:
* Подписчики подключаются слишком медленно, чтобы получить первоначальные сообщения от публикатора;
* Подписчики работают слишком медленно, чтобы не потерять сообщения;
* Подписчики могут отключиться, и сообщения, отправленные во время отключения, будут потеряны.
Ещё有一些不太常见的场景,但我们仍然会遇到它们: * Подписчик может выйти из строя и перезапуститься, потеряв все полученные сообщения;
* Подписчик может обрабатывать сообщения слишком медленно, что приводит к накоплению и переполнению очереди;
* Сообщения могут теряться из-за перегрузки сети (особенно при использовании протокола PGM);
* Низкая скорость интернета может привести к переполнению очереди на стороне публикатора, что в свою очередь вызовет его сбой.
На самом деле, существуют и другие возможные проблемы, но вышеупомянутые являются наиболее типичными в реальных приложениях.
У нас уже есть решения для некоторых из этих проблем, например, для медленных подписчиков можно использовать режим "самоубийства" улитки. Однако для других проблем нам потребуется повторно используемая архитектура для создания надежной модели публикации-подписки.
Основная сложность заключается в том, что мы не знаем, как конкретное приложение будет обрабатывать эти данные. Оно может фильтровать сообщения, обрабатывать только часть из них, записывать их для будущего использования или передавать дальше своим подчиненным worker'ам. Ситуаций слишком много, и каждая из них требует своей формы надежности.Поэтому мы абстрагируем проблему таким образом, чтобы она могла использоваться различными приложениями. Эту абстракцию мы называем общедоступным ключевым кэшем, который хранит бинарные данные по уникальным ключам.Не путайте эту абстракцию с распределенными хеш-таблицами, которые используются для соединения узлов в распределенной сети; также не путайте её с распределенными ключ-значение таблицами, которые больше похожи на NoSQL базы данных. Мы создаём приложение, которое надёжно передаёт состояние из памяти группе клиентов, и это должно выполнять следующие задачи:
* Клиенты могут присоединяться к сети в любое время и получать текущее состояние сервера;
* Любой клиент может изменять ключевой кэш (добавлять, обновлять, удалять);
* Эти изменения должны надёжно и с минимальной задержкой распространяться ко всем клиентам;
* Обработка сотен и тысяч клиентов должна быть возможной.
Ключевой момент в режиме клонирования состоит в том, что клиенты будут обратно взаимодействовать с сервером, что не является обычным для простой модели публикации-подписки. Поэтому здесь я использую слова "сервер" и "клиент", а не "публикатор" и "подписчик". Мы будем использовать модель публикации-подписки как основную модель сообщений, но также будем использовать и другие модели.
#### Распределение событий обновления ключейМы будем внедрять режим клонирования поэтапно. Сначала рассмотрим, как отправлять события обновления ключей от сервера ко всем клиентам. Мы модифицируем модель погодного сервиса из первой главы, чтобы отправлять информацию в виде пар ключ-значение и позволяем клиентам хранить её в виде хэш-таблицы:Вот код сервера:
**clonesrv1: Клон-сервер, модель 1 на C**
```c
//
// Клон-сервер, модель 1
//
// Начнем с компиляции без создания библиотеки
#include "kvsimple.c"
int main (void)
{
// Подготовка контекста и публикационного сокета
zctx_t *ctx = zctx_new ();
void *publisher = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (publisher, "tcp://*:5556");
zclock_sleep (200);
zhash_t *kvmap = zhash_new ();
int64_t sequence = 0;
srandom ((unsigned) time (NULL));
while (!zctx_interrupted) {
// Распределение сообщений с помощью ключ-значение пар
kvmsg_t *kvmsg = kvmsg_new (++sequence);
kvmsg_fmt_key (kvmsg, "%d", randof (10000));
kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
kvmsg_send (kvmsg, publisher);
kvmsg_store (&kvmsg, kvmap);
}
printf ("Прервано\nОтправлено %lld сообщений\n", (long long) sequence);
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
return 0;
}
Вот код клиента:
clonecli1: Клон-клиент, модель 1 на C
//
// Клон-клиент, модель 1
//
// Начнем с компиляции без создания библиотеки
#include "kvsimple.c"
int main (void)
{
// Подготовка контекста и подписного сокета
zctx_t *ctx = zctx_new ();
void *updates = zsocket_new (ctx, ZMQ_SUB);
zsocket_connect (updates, "tcp://localhost:5556");
zhash_t *kvmap = zhash_new ();
int64_t sequence = 0;
while (TRUE) {
kvmsg_t *kvmsg = kvmsg_recv (updates);
if (!kvmsg)
break; // Прерывание
kvmsg_store (&kvmsg, kvmap);
sequence++;
}
printf ("Прервано\nПолучено %lld сообщений\n", (long long) sequence);
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
return 0;
}
```Несколько замечаний:
* Все сложные операции выполняются в классе kvmsg, который обрабатывает объекты сообщений типа ключ-значение. На самом деле это ZMQ-сообщение с несколькими фреймами, содержащее три фрейма: ключ (ZMQ-строка), номер (64 бита, в порядке байт) и двоичное тело (хранящее все дополнительные данные).
* Сервер случайным образом генерирует сообщения, используя четырёхзначные числа в качестве ключей, что позволяет имитировать большое количество, но не избыточное количество записей хеш-таблицы (10 000 записей).
* После того как сервер привязывает сокет, он ожидает 200 миллисекунд, чтобы избежать потери данных из-за задержек подключения подписчиков. Мы решаем эту проблему в последующих моделях.
* Мы используем названия "публикация" и "подписка" для сокетов, используемых в программе, чтобы избежать путаницы с другими сокетами в последующих моделях. Вот упрощённый код класса kvmsg на C:
**kvsimple: Класс key-value message в C**```c
/* =====================================================================
kvsimple - простой класс для работы с ключ-значением в примерах приложений
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
Этот файл является частью руководства по ZeroMQ: http://zguide.zeromq.org
Это свободное программное обеспечение; вы можете распространять его и/или изменять его в соответствии с
``` условиями Генеральной общественной лицензии GNU версии 3, либо (по вашему выбору) любой более поздней версии.
Это программное обеспечение распространяется в надежде, что оно будет полезным, но
БЕЗ КАКИХ-ЛИБО ГАРАНТИЙ; даже без скрытых гарантий товарной пригодности или пригодности для определенного назначения. См. Генеральную общественную лицензию GNU для более подробной информации.
Вы должны были получить копию Генеральной общественной лицензии GNU вместе с этим программным обеспечением. Если нет, см.
<http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "kvsimple.h"
#include "zlist.h"
// Ключ представляет собой короткую строку
#define KVMSG_KEY_MAX 255
// Сообщение форматируется в три фрейма
// frame bk 0: ключ (ZMQ строка)
// frame 1: номер (8 байт, в порядке возрастания)
// frame 2: содержимое (блок двоичных данных)
#define FRAME_KEY 0
#define FRAME_SEQ 1
#define FRAME_BODY 2
#define KVMSG_FRAMES 3
// Структура класса
struct _kvmsg {
// Фрейм существует в сообщении
int present[KVMSG_FRAMES];
// Соответствующий ZMQ фрейм сообщения
zmq_msg_t frame[KVMSG_FRAMES];
// Преобразование ключа в C строку
char key[KVMSG_KEY_MAX + 1];
};
// ---------------------------------------------------------------------
// Конструктор, устанавливает номер
kvmsg_t *
kvmsg_new(int64_t sequence)
{
kvmsg_t
*self;
self = (kvmsg_t *) zmalloc(sizeof(kvmsg_t));
kvmsg_set_sequence(self, sequence);
return self;
}
// ---------------------------------------------------------------------
// Деструктор```markdown
// Освобождает фреймы сообщения, доступен для zhash_freefn() функции
void
kvmsg_free(void *ptr)
{
if (ptr) {
kvmsg_t *self = (kvmsg_t *) ptr;
// Уничтожает фреймы сообщения
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++)
if (self->present[frame_nbr])
zmq_msg_close(&self->frame[frame_nbr]);
// Освобождает объект
free(self);
}
}
void
kvmsg_destroy(kvmsg_t **self_p)
{
assert(self_p);
if (*self_p) {
kvmsg_free(*self_p);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// Читает ключ-значение сообщение с сокета, возвращает экземпляр kvmsg
kvmsg_t *
kvmsg_recv(void *socket)
{
assert(socket);
kvmsg_t *self = kvmsg_new(0);
// Читает все кадры, при ошибке уничтожает объект
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
if (self->present[frame_nbr])
zmq_msg_close(&self->frame[frame_nbr]);
zmq_msg_init(&self->frame[frame_nbr]);
self->present[frame_nbr] = 1;
if (zmq_recvmsg(socket, &self->frame[frame_nbr], 0) == -1) {
kvmsg_destroy(&self);
break;
}
// Проверяет мультифреймовое сообщение
int rcvmore = (frame_nbr < KVMSG_FRAMES - 1) ? 1 : 0;
if (zsockopt_rcvmore(socket) != rcvmore) {
kvmsg_destroy(&self);
break;
}
}
return self;
}
// ---------------------------------------------------------------------
// Отправляет ключ-значение сообщение на сокет, не проверяет содержимое кадров
void
kvmsg_send(kvmsg_t *self, void *socket)
{
assert(self);
assert(socket);
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
zmq_msg_t copy;
zmq_msg_init(©);
if (self->present[frame_nbr])
zmq_msg_copy (©, &self->frame [frame_nbr]);
zmq_sendmsg (socket, ©,
(frame_nbr < KVMSG_FRAMES - 1) ? ZMQ_SNDMORE : 0);
zmq_msg_close (©);
}
}
// ---------------------------------------------------------------------
// Получает ключ-значение из сообщения, если отсутствует, возвращает NULL
char *
kvmsg_key (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_KEY]) {
if (! *self->key) {
size_t size = zmq_msg_size (&self->frame [FRAME_KEY]);
if (size > KVMSG_KEY_MAX)
size = KVMSG_KEY_MAX;
memcpy (self->key,
zmq_msg_data (&self->frame [FRAME_KEY]), size);
self->key [size] = 0;
}
return self->key;
} else {
return NULL;
}
}
// ---------------------------------------------------------------------
// Возвращает номер сообщения
int64_t
kvmsg_sequence (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_SEQ]) {
assert (zmq_msg_size (&self->frame [FRAME_SEQ]) == 8);
byte *source = zmq_msg_data (&self->frame [FRAME_SEQ]);
int64_t sequence = ((int64_t) (source [0]) << 56)
+ ((int64_t) (source [1]) << 48)
+ ((int64_t) (source [2]) << 40)
+ ((int64_t) (source [3]) << 32)
+ ((int64_t) (source [4]) << 24)
+ ((int64_t) (source [5]) << 16)
+ ((int64_t) (source [6]) << 8)
+ (int64_t) (source [7]);
return sequence;
} else {
return 0;
}
}
// ---------------------------------------------------------------------
// Возвращает содержимое сообщения, если оно отсутствует, возвращает NULL
byte *
kvmsg_body (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_BODY])
return (byte *) zmq_msg_data (&self->frame [FRAME_BODY]);
else {
return NULL;
}
}
// ---------------------------------------------------------------------
// Возвращает размер содержимого сообщения
size_t
kvmsg_size (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_BODY])
return zmq_msg_size (&self->frame [FRAME_BODY]);
else
return 0;
}
// ---------------------------------------------------------------------
// Sets the message key
void
kvmsg_set_key (kvmsg_t *self, char *key)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_KEY];
if (self->present [FRAME_KEY])
zmq_msg_close (msg);
zmq_msg_init_size (msg, strlen (key));
memcpy (zmq_msg_data (msg), key, strlen (key));
self->present [FRAME_KEY] = 1;
}
// ---------------------------------------------------------------------
// Sets the message sequence number
void
kvmsg_set_sequence (kvmsg_t *self, int64_t sequence)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_SEQ];
if (self->present [FRAME_SEQ])
zmq_msg_close (msg);
zmq_msg_init_size (msg, 8);
byte *source = zmq_msg_data (msg);
source [0] = (byte) ((sequence >> 56) & 255);
source [1] = (byte) ((sequence >> 48) & 255);
source [2] = (byte) ((sequence >> 40) & 255);
source [3] = (byte) ((sequence >> 32) & 255);
source [4] = (byte) ((sequence >> 24) & 255);
source [5] = (byte) ((sequence >> 16) & 255);
source [6] = (byte) ((sequence >> 8) & 255);
source [7] = (byte) ((sequence) & 255);
self->present [FRAME_SEQ] = 1;
}
// ---------------------------------------------------------------------
// Sets the message body
void
kvmsg_set_body (kvmsg_t *self, byte *body, size_t size)
{
assert (self);
zmq_msg_t *message = &self->frame [FRAME_BODY];
if (self->present [FRAME_BODY])
zmq_msg_close (message);
self->present [FRAME_BODY] = 1;
zmq_msg_init_size (message, size);
memcpy (zmq_msg_data (message), body, size);
}
// ---------------------------------------------------------------------
// Использует printf() для форматирования ключа сообщения
void
kvmsg_fmt_key (kvmsg_t *self, char *формат, ...)
{
char значение [KVMSG_KEY_MAX + 1];
va_list аргументы;
assert (self);
va_start (аргументы, формат);
vsnprintf (значение, KVMSG_KEY_MAX, формат, аргументы);
va_end (аргументы);
kvmsg_set_key (self, значение);
}
// ---------------------------------------------------------------------
// Использует sprintf() для форматирования содержимого сообщения
void
kvmsg_fmt_body (kvmsg_t *self, char *формат, ...)
{
char значение [255 + 1];
va_list аргументы;
assert (self);
va_start (аргументы, формат);
vsnprintf (значение, 255, формат, аргументы);
va_end (аргументы);
kvmsg_set_body (self, (byte *) значение, strlen (значение));
}
// ---------------------------------------------------------------------
// Если ключ и содержимое kvmsg структуры существуют, то они сохраняются в хеш-таблице;
// если kvmsg структура больше не используется, она автоматически уничтожается и освобождается.
void
kvmsg_store (kvmsg_t **self_p, zhash_t *хеш)
{
assert (self_p);
if (*self_p) {
kvmsg_t *self = *self_p;
assert (self);
if (self->присутствует [FRAME_KEY]
&& self->присутствует [FRAME_BODY]) {
zhash_update (хеш, kvmsg_key (self), self);
zhash_freefn (хеш, kvmsg_key (self), kvmsg_free);
}
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// Выводит содержимое сообщения на стандартный поток ошибок для отладки и отслеживания
void
kvmsg_dump (kvmsg_t *self)
{
if (self) {
if (!self) {
fprintf (stderr, "NULL");
return;
}
}
}
size_t размер = kvmsg_size (self);
byte *тело = kvmsg_body (self);
fprintf(stderr, "[seq:%" PRId64 "]", kvmsg_sequence(self));
fprintf(stderr, "[key:%s]", kvmsg_key(self));
fprintf(stderr, "[size:%zd] ", размер);
int char_nbr;
for (char_nbr = 0; char_nbr < размер; char_nbr++) {
fprintf(stderr, "%02X", тело[char_nbr]);
}
fprintf(stderr, "\n");
} else {
fprintf(stderr, "NULL message\n");
}
}
// ---------------------------------------------------------------------
// Тестовые примеры
int
kvmsg_test(int verbose) {
kvmsg_t *сообщение;
printf(" * сообщение: ");
// Подготовка контекста и сокета
zctx_t *контекст = zctx_new();
void *выход = zsocket_new(контекст, ZMQ_DEALER);
int rc = zmq_bind(выход, "ipc://kvmsg_selftest.ipc");
assert(rc == 0);
void *вход = zsocket_new(контекст, ZMQ_DEALER);
rc = zmq_connect(вход, "ipc://kvmsg_selftest.ipc");
assert(rc == 0);
zhash_t *карта_сообщений = zhash_new();
// Тестирование отправки и получения простого сообщения
сообщение = kvmsg_new(1);
kvmsg_set_key(сообщение, "ключ");
kvmsg_set_body(сообщение, (byte *)"тело", 4);
if (verbose) {
kvmsg_dump(сообщение);
}
kvmsg_send(сообщение, выход);
kvmsg_store(&сообщение, карта_сообщений);
сообщение = kvmsg_recv(вход);
if (verbose) {
kvmsg_dump(сообщение);
}
assert(streq(kvmsg_key(сообщение), "ключ"));
kvmsg_store(&сообщение, карта_сообщений);
// Закрытие и удаление всех объектов
zhash_destroy(&карта_сообщений);
zctx_destroy(&контекст);
printf("OK\n");
return 0;
}
Мы создадим более полный класс `kvmsg`, который можно использовать в реальных условиях.
```Клиенты и серверы будут поддерживать хэш-таблицы, но эта модель требует, чтобы все клиенты запускались раньше сервера и не могли потерпеть сбои, что очевидно не соответствует требованиям надёжности.
#### Создание снимков
Чтобы последующие подключения (или восстановление после сбоев) клиентов могли получить информацию о состоянии сервера, необходимо, чтобы они получали снимок при подключении. Как мы упростили понятие "сообщение" до "номерованной пары ключ-значение", так и можно упростить понятие "состояния" до "хэш-таблицы". Для получения состояния сервера клиент открывает REQ сокет для запроса:

Нам следует учитывать время, поскольку создание снимка занимает некоторое время, и нам нужно знать, с какого события обновления следует начать обновление снимка. Сервер не знает, когда происходит событие обновления. Один способ — сначала начать подписываться на сообщения, а затем, получив первое сообщение, запросить серверу "отправить все до этого события обновления". Однако это означает, что сервер должен сохранять снимок для каждого события обновления, что явно нереалистично.
Поэтому, мы будем использовать следующий метод синхронизации на стороне клиента:* Клиент начинает подписываться на события обновления сервера, а затем запрашивает снимок. Это гарантирует, что снимок был создан после последнего обновления.* Клиент начинает ожидать снимок от сервера и сохраняет события обновления в очереди. Это просто означает, что не следует читать сообщения из сокета; ZMQ автоматически сохраняет эти сообщения, и здесь не следует устанавливать пороговое значение (HWM).
* Когда клиент получает снимок, он снова начинает читать события обновления, но должен игнорировать те, которые произошли до времени создания снимка. Например, если снимок был создан после 200 событий обновления, клиент будет читать события обновления, начиная с 201.
* Затем клиент использует события обновления для обновления своего состояния.
Это довольно простая модель, так как она использует механизм очередей сообщений ZMQ. Код сервера выглядит следующим образом:
**clonesrv2: Клонирующий сервер, Модель 2 на C**```c
//
// Клонирование модели - серверная часть - модель 2
//
// Давайте сразу скомпилируем, не создавая библиотеки
#include "kvsimple.c"
static int s_send_single(char *key, void *data, void *args);
static void state_manager(void *args, zctx_t *ctx, void *pipe);
int main(void)
{
// Подготовка сокетов и контекста
zctx_t *ctx = zctx_new();
void *publisher = zsocket_new(ctx, ZMQ_PUB);
zsocket_bind(publisher, "tcp://*:5557");
int64_t sequence = 0;
srandom((unsigned)time(NULL));
// Запуск менеджера состояния и ожидание сигнала готовности
void *updates = zthread_fork(ctx, state_manager, NULL);
free(zstr_recv(updates));
}
``` while (!zctx_interrupted)
{
// Distribution of key-value messages
kvmsg_t *kvmsg = kvmsg_new(++sequence);
kvmsg_fmt_key(kvmsg, "%d", randof(10000));
kvmsg_fmt_body(kvmsg, "%d", randof(1000000));
kvmsg_send(kvmsg, publisher);
kvmsg_send(kvmsg, updates);
kvmsg_destroy(&kvmsg);
} printf("Прервано\nОтправлено %d сообщений\n", (int)sequence);
zctx_destroy(&ctx);
return 0;
}
// Информация о стороннем запросе снимка
typedef struct
{
void *socket; // Сокет ROUTER для отправки снимков
zframe_t *identity; // Идентификатор запросчика
} kvroute_t;
// Отправка отдельной пары ключ-значение в снимке
// Используется объект kvmsg как носитель
static int
s_send_single(char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *)args;
// Сначала отправляем идентификатор получателя
zframe_send(&kvroute->identity,
kvroute->socket, ZFRAME_MORE | ZFRAME_REUSE);
kvmsg_t *kvmsg = (kvmsg_t *)data;
kvmsg_send(kvmsg, kvroute->socket);
return 0;
}
// Этот поток поддерживает состояние сервера и обрабатывает запросы на снимки.
static void
state_manager(void *args, zctx_t *ctx, void *pipe)
{
zhash_t *kvmap = zhash_new();
zstr_send(pipe, "READY");
void *snapshot = zsocket_new(ctx, ZMQ_ROUTER);
zsocket_bind(snapshot, "tcp://*:5556");
zmq_pollitem_t items[] = {
{ pipe, 0, ZMQ_POLLIN, 0 },
{ snapshot, 0, ZMQ_POLLIN, 0 }
};```markdown
int64_t sequence = 0; // Текущий номер версии снимка
while (!zctx_interrupted)
{
int rc = zmq_poll(items, 2, -1);
if (rc == -1 && errno == ETERM)
break; // Прерывание контекста
// Ожидание событий обновления от основного потока
if (items[0].revents & ZMQ_POLLIN)
{
kvmsg_t *kvmsg = kvmsg_recv(pipe);
if (!kvmsg)
break; // Прерывание
sequence = kvmsg_sequence(kvmsg);
kvmsg_store(&kvmsg, kvmap);
}
// Выполнение запроса на снимок
if (items[1].revents & ZMQ_POLLIN)
{
zframe_t *identity = zframe_recv(snapshot);
if (!identity)
break; // Прерывание
// Содержимое запроса находится во второй фрейме
char *request = zstr_recv(snapshot);
if (streq(request, "ICANHAZ? "))
{
free(request);
}
else
{
printf("E: Неправильный запрос, программа прервана\n");
break;
}
// Отправка снимка клиенту
kvroute_t routing = { snapshot, identity };
// Отправка каждого элемента отдельно
zhash_foreach(kvmap, s_send_single, &routing);
// Отправка завершающего идентификатора, содержащего номер версии снимка
printf("Отправка снимка, номер версии %d\n", (int)sequence);
zframe_send(&identity, snapshot, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new(sequence);
kvmsg_set_key(kvmsg, "KTHXBAI");
kvmsg_set_body(kvmsg, (byte *)"", 0);
kvmsg_send(kvmsg, snapshot);
kvmsg_destroy(&kvmsg);
}
}
zhash_destroy(&kvmap);
}
Вот клиентский код:
```c
//
// Клонирование - клиент - модель 2
//
// Давайте скомпилируем напрямую, не создавая библиотеки
#include "kvsimple.c"
int main (void)
{
// Подготовка контекста и SUB сокета
zctx_t *ctx = zctx_new ();
void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (snapshot, "tcp://localhost:5556");
void *subscriber = zsocket_new (ctx, ZMQ_SUB);
zsocket_connect (subscriber, "tcp://localhost:5557");
zhash_t *kvmap = zhash_new ();
// Получение снимка
int64_t sequence = 0;
zstr_send (snapshot, "ICANHAZ?");
while (TRUE) {
kvmsg_t *kvmsg = kvmsg_recv (snapshot);
if (!kvmsg)
break; // Прерывание
if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
sequence = kvmsg_sequence (kvmsg);
printf ("Снимок получен, версия=%ld\n", (long) sequence);
kvmsg_destroy (&kvmsg);
break; // Завершение
}
kvmsg_store (&kvmsg, kvmap);
}
// Применение событий обновления из очереди, игнорирование устаревших событий
while (!zctx_interrupted) {
kvmsg_t *kvmsg = kvmsg_recv (subscriber);
if (!kvmsg)
break; // Прерывание
if (kvmsg_sequence (kvmsg) > sequence) {
sequence = kvmsg_sequence (kvmsg);
kvmsg_store (&kvmsg, kvmap);
}
else
kvmsg_destroy (&kvmsg);
}
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
return 0;
}
Несколько замечаний:
Клиент использует два потока: один для генерации случайных событий обновления, другой для управления состоянием. Между ними используется PAIR сокет для связи. Возможно, вы подумали бы использовать SUB сокет, но проблема "slow joiner" могла бы повлиять на работу программы. PAIR сокет обеспечивает строгое синхронное взаимодействие между двумя потоками.* Мы установили пороговое значение (HWM) для обновлений на обновляющем сокете, чтобы избежать переполнения памяти в службе обновлений. В соединении протокола inproc пороговое значение является суммой пороговых значений обоих концов соединения, поэтому его следует устанавливать отдельно для каждого конца.
Клиент довольно прост, написан на C и состоит примерно из 60 строк кода. Большая часть работы выполняется в классе kvmsg, хотя в целом реализация модели клонирования достаточно проста.
Мы не использовали специальные методы сериализации содержимого состояния. Пары ключ-значение представлены объектами kvmsg и хранятся в хэш-таблице. При различных временных запросах к состоянию получается различные снимки. Предполагается, что клиент взаимодействует только с одним сервисом, и этот сервис должен быть в рабочем состоянии. В данный момент мы не рассматриваем восстановление после сбоев сервиса.
В настоящее время эти две программы ещё не работают, но уже способны правильно синхронизировать состояние. Это смесь различных моделей сообщений: пары PAIR внутри процесса, публикация-подписка, ROUTER-DEALER и другие.
События обновления ключей на клиентской стороне передаются через сокеты PUSH-PULL на сервер:
Почему бы нам не позволить клиенту напрямую отправлять обновления другим клиентам? Хотя это могло бы уменьшить задержку, но тогда невозможно было бы добавить уникальный и увеличивающийся номер для каждого события обновления. Многие приложения требуют, чтобы события обновления были отсортированы определённым образом, и только отправка сообщений на сервер и распределение обновлений сервером гарантируют последовательность событий обновления.Имея уникальный номер, клиент также может обнаружить больше проблем: засорение сети или переполнение очереди. Если клиент заметит отсутствие входящих сообщений на некоторое время, он может принять меры. Возможно, вы подумаете, что уведомление сервера клиентом о необходимости повторной отправки потерянных сообщений решит проблему. Но на самом деле это не всегда так. Пропуск сообщений указывает на плохое состояние сети, и дополнительные запросы только ухудшают ситуацию. Поэтому обычно клиенты отправляют предупреждения и останавливаются до тех пор, пока специалисты по обслуживанию не возобновят работу.Теперь мы начинаем создание модели для обновления состояния на клиентской стороне. Ниже представлен код клиента:
## Клиентский код
```python
def update_state(client_state):
# Логика обновления состояния
pass
```c
//
// Клонирование режима серверной модели 3
//
// Прямая компиляция, без создания библиотеки
#include "kvsimple.c"
static int s_send_single(char *key, void *data, void *args);
// Информация о стороннем запросе снимка
typedef struct {
void *socket; // ROUTER сокет
zframe_t *identity; // Идентификатор запросчика
} kvroute_t;
int main(void)
{
// Подготовка контекста и сокетов
zctx_t *ctx = zctx_new();
void *snapshot = zsocket_new(ctx, ZMQ_ROUTER);
zsocket_bind(snapshot, "tcp://*:5556");
void *publisher = zsocket_new(ctx, ZMQ_PUB);
zsocket_bind(publisher, "tcp://*:5557");
void *collector = zsocket_new(ctx, ZMQ_PULL);
zsocket_bind(collector, "tcp://*:5558");
int64_t sequence = 0;
zhash_t *kvmap = zhash_new();
zmq_pollitem_t items[] = {
{ collector, 0, ZMQ_POLLIN, 0 },
{ snapshot, 0, ZMQ_POLLIN, 0 }
};
while (!zctx_interrupted) {
int rc = zmq_poll(items, 2, 1000 * ZMQ_POLL_MSEC);
// Выполнение событий обновления от клиентов
if (items[0].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv(collector);
if (!kvmsg)
break; // Прерывание
kvmsg_set_sequence(kvmsg, ++sequence);
kvmsg_send(kvmsg, publisher);
kvmsg_store(&kvmsg, kvmap);
printf("I: Опубликовано событие обновления %5d\n", (int)sequence);
}
// Ответ на запрос снимка
if (items[1].revents & ZMQ_POLLIN) {
zframe_t *identity = zframe_recv(snapshot);
if (!identity)
break; // Прерывание
// Запрос содержится во второй части сообщения
char *request = zstr_recv(snapshot);
if (streq(request, "ICANHAZ?"))
free(request);
else {
printf("E: Некорректный запрос, программа завершена\n");
break;
}
// Отправка снимка
``````c
kvroute_t routing = { snapshot, identity };
// Отправка построчно
zhash_foreach(kvmap, s_send_single, &routing);
// Отправка завершающего идентификатора и номера
printf("I: Отправка снимка, версия: %d\n", (int)sequence);
zframe_send(&identity, snapshot, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new(sequence);
kvmsg_set_key(kvmsg, "KTHXBAI");
kvmsg_set_body(kvmsg, (byte *)"", 0);
}
kvmsg_send(kvmsg, snapshot);
kvmsg_destroy(&kvmsg);
}
}
printf("Прервано\nОбработано %d сообщений\n", (int)sequence);
zhash_destroy(&kvmap);
zctx_destroy(&ctx);
return 0;
}
// Отправка состояния ключ-значение в сокет с использованием объекта kvmsg для хранения ключ-значение
static int
s_send_single(char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *)args;
// Отправка идентификатора получателя вначале
zframe_send(&kvroute->identity,
kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
kvmsg_t *kvmsg = (kvmsg_t *)data;
kvmsg_send(kvmsg, kvroute->socket);
return 0;
}
Вот клиентский код:clonecli3: Клиент для клонирования, модель Three на C
//
// Клонирование модели - клиент - модель 3
//
// Прямая компиляция, без создания библиотеки классов
#include "kvsimple.c"
int main(void)
{
// Подготовка контекста и SUB сокета
zctx_t *ctx = zctx_new();
void *snapshot = zsocket_new(ctx, ZMQ_DEALER);
zsocket_connect(snapshot, "tcp://localhost:5556");
void *subscriber = zsocket_new(ctx, ZMQ_SUB);
zsocket_connect(subscriber, "tcp://localhost:5557");
void *publisher = zsocket_new(ctx, ZMQ_PUSH);
zsocket_connect(publisher, "tcp://localhost:5558");
zhash_t *kvmap = zhash_new();
srandom((unsigned)time(NULL));
// Получение состояния снимка
int64_t sequence = 0;
zstr_send(snapshot, "ICANHAZ?");
while (TRUE)
{
kvmsg_t *kvmsg = kvmsg_recv(snapshot);
if (!kvmsg)
break; // Прерывание
if (streq(kvmsg_key(kvmsg), "KTHXBAI"))
{
sequence = kvmsg_sequence(kvmsg);
printf("I: Получено состояние снимка, версия: %ld\n", (long)sequence);
kvmsg_destroy(&kvmsg);
break; // Завершение
}
kvmsg_store(&kvmsg, kvmap);
}
int64_t alarm = zclock_time() + 1000;
while (!zctx_interrupted)
{
zmq_pollitem_t items[] = {{subscriber, 0, ZMQ_POLLIN, 0}};
int tickless = (int)((alarm - zclock_time()));
if (tickless < 0)
tickless = 0;
int rc = zmq_poll(items, 1, tickless * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Контекст закрыт
if (items[0].revents & ZMQ_POLLIN)
{
kvmsg_t *kvmsg = kvmsg_recv(subscriber);
if (!kvmsg)
break; // Прерывание
// Отбрасывание устаревших сообщений, включая пинг
if (kvmsg_sequence(kvmsg) > sequence)
{
sequence = kvmsg_sequence(kvmsg);
kvmsg_store(&kvmsg, kvmap);
printf("I: Получено событие обновления: %ld\n", (long)sequence);
}
else
kvmsg_destroy(&kvmsg);
}
}
``` // Создание случайного события обновления
if (zclock_time() >= alarm)
{
kvmsg_t *kvmsg = kvmsg_new(0);
kvmsg_fmt_key(kvmsg, "%d", randof(10000));
kvmsg_fmt_body(kvmsg, "%d", randof(1000000));
kvmsg_send(kvmsg, publisher);
kvmsg_destroy(&kvmsg);
alarm = zclock_time() + 1000;
}
}
}
``````markdown
printf("Подготовлено\nПолучено %d сообщений\n", (int) sequence);
zhash_destroy(&kvmap);
zctx_destroy(&ctx);
return 0;
}
* Сервер объединён в один поток, который отвечает за сбор обновлений от клиентов и передачу их другим клиентам. Он использует PULL сокет для получения обновлений, ROUTER сокет для обработки запросов на снимки состояния и PUB сокет для публикации обновлений.
* Клиенты отправляют случайные обновления каждую секунду примерно, что в реальности происходит по инициативе приложения.
#### Поддеревья
В реальных системах ключ-значение могут увеличиваться со временем, а клиенты могут нуждаться только в части этого кэша. Мы можем использовать поддеревья для решения этой проблемы: клиенты сообщают серверу, какие поддеревья они хотят получить, отправляя запросы на снимки состояния, и указывают эти поддеревья при подписке на обновления.
Существуют различные синтаксисы для поддеревьев, включая "иерархическую структуру путей" и "тематическое дерево":
* Иерархическая структура путей: /some/list/of/paths
* Тематическое дерево: some.list.of.topicsВ данном случае мы будем использовать иерархическую структуру путей для реализации поддеревьев и расширения сервера и клиентов для работы с ними. Поддержка нескольких поддеревьев не представляет особой сложности, поэтому мы не будем демонстрировать это здесь.
Вот код сервера, развившийся из модели 3:**clonesrv4: Сервер поддеревьев, модель 4 на C**
```c
//
// Клонирование модели 4 для сервера
//
// Прямая компиляция, без создания библиотеки
#include "kvsimple.c"
static int s_send_single(char *key, void *data, void *args);
// Информация о стороннем запросе снимка
typedef struct {
void *socket; // ROUTER сокет
zframe_t *identity; // Идентификатор запросчика
char *subtree; // Указанный поддерево
} kvroute_t;
int main(void)
{
// Подготовка контекста и сокетов
zctx_t *ctx = zctx_new();
void *snapshot = zsocket_new(ctx, ZMQ_ROUTER);
zsocket_bind(snapshot, "tcp://*:5556");
void *publisher = zsocket_new(ctx, ZMQ_PUB);
zsocket_bind(publisher, "tcp://*:5557");
void *collector = zsocket_new(ctx, ZMQ_PULL);
zsocket_bind(collector, "tcp://*:5558");
int64_t sequence = 0;
zhash_t *kvmap = zhash_new();
zmq_pollitem_t items[] = {
{ collector, 0, ZMQ_POLLIN, 0 },
{ snapshot, 0, ZMQ_POLLIN, 0 }
};
while (!zctx_interrupted) {
int rc = zmq_poll(items, 2, 1000 * ZMQ_POLL_MSEC);
// Выполнение событий обновления от клиентов
if (items[0].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv(collector);
if (!kvmsg)
break; // Прерывание
kvmsg_set_sequence(kvmsg, ++sequence);
kvmsg_send(kvmsg, publisher);
kvmsg_store(&kvmsg, kvmap);
printf("I: Опубликовано событие обновления %5d\n", (int)sequence);
}
// Ответ на запрос снимка
if (items[1].revents & ZMQ_POLLIN) {
zframe_t *identity = zframe_recv(snapshot);
if (!identity)
break; // Прерывание
// Запрос содержится во второй части сообщения
char *request = zstr_recv(snapshot);
char *subtree = NULL;
if (streq(request, "ICANHAZ? ")) {
free(request);
subtree = zstr_recv(snapshot);
}
else {
``````c
printf("E: Некорректный запрос, программа завершена\n");
break;
}
// Отправка снимка
kvroute_t routing = { snapshot, identity, subtree };
// Отправка построчно
zhash_foreach(kvmap, s_send_single, &routing);
// Отправка завершающего идентификатора и номера
printf("I: Отправка снимка, версия: %d\n", (int)sequence);
}
}
}
``````markdown
zframe_send(&identity, snapshot, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new(sequence);
kvmsg_set_key(kvmsg, "KTHXBAI");
kvmsg_set_body(kvmsg, (byte *)subtree, 0);
kvmsg_send(kvmsg, snapshot);
kvmsg_destroy(&kvmsg);
free(subtree);
}
}
printf("Прервано\nОбработано %d сообщений\n", (int)sequence);
zhash_destroy(&kvmap);
zctx_destroy(&ctx);
return 0;
}
// Отправка состояния ключ-значение в сокет с использованием объекта kvmsg
static int
s_send_single(char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *)args;
kvmsg_t *kvmsg = (kvmsg_t *)data;
if (strlen(kvroute->subtree) <= strlen(kvmsg_key(kvmsg))
&& memcmp(kvroute->subtree,
kvmsg_key(kvmsg), strlen(kvroute->subtree)) == 0) {
// Отправка идентификатора получателя
zframe_send(&kvroute->identity,
kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
kvmsg_send(kvmsg, kvroute->socket);
}
return 0;
}
Вот клиентский код:clonecli4: Клиент для клонирования, модель Four на C
//
// Клонирование - клиент - модель 4
//
// Прямая компиляция, без создания библиотеки классов
#include "kvsimple.c"
#define SUBTREE "/client/"
int main(void)
{
// Подготовка контекста и SUB сокета
zctx_t *ctx = zctx_new();
void *snapshot = zsocket_new(ctx, ZMQ_DEALER);
zsocket_connect(snapshot, "tcp://localhost:5556");
void *subscriber = zsocket_new(ctx, ZMQ_SUB);
zsocket_connect(subscriber, "tcp://localhost:5557");
zsockopt_set_subscribe(subscriber, SUBTREE);
void *publisher = zsocket_new(ctx, ZMQ_PUSH);
zsocket_connect(publisher, "tcp://localhost:5558");
zhash_t *kvmap = zhash_new();
srandom((unsigned)time(NULL));
// Получение состояния снимка
int64_t sequence = 0;
zstr_sendm(snapshot, "ICANHAZ?");
zstr_send(snapshot, SUBTREE);
while (TRUE) {
kvmsg_t *kvmsg = kvmsg_recv(snapshot);
if (! kvmsg)
break; // Прерывание
if (streq(kvmsg_key(kvmsg), "KTHXBAI")) {
sequence = kvmsg_sequence(kvmsg);
printf("I: Получено состояние снимка, версия: %d\n", (int)sequence);
kvmsg_destroy(&kvmsg);
break; // Завершение
}
kvmsg_store(&kvmsg, kvmap);
}
int64_t alarm = zclock_time() + 1000;
while (! zctx_interrupted) {
zmq_pollitem_t items[] = {{subscriber, 0, ZMQ_POLLIN, 0}};
int tickless = (int)((alarm - zclock_time()));
if (tickless < 0)
tickless = 0;
int rc = zmq_poll(items, 1, tickless * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Контекст закрыт
if (items[0].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv(subscriber);
if (! kvmsg)
break; // Прерывание
// Отбрасывание устаревших сообщений, включая пинг
if (kvmsg_sequence(kvmsg) > sequence) {
sequence = kvmsg_sequence(kvmsg);
kvmsg_store(&kvmsg, kvmap);
printf("I: Получено обновление, версия: %d\n", (int)sequence);
}
}
}
}
``````markdown
} else {
kvmsg_destroy(&kvmsg);
}
// Создание случайного обновления
if (zclock_time() >= alarm) {
kvmsg_t *kvmsg = kvmsg_new(0);
kvmsg_fmt_key(kvmsg, "%s%d", SUBTREE, randof(10000));
kvmsg_fmt_body(kvmsg, "%d", randof(1000000));
kvmsg_send(kvmsg, publisher);
kvmsg_destroy(&kvmsg);
alarm = zclock_time() + 1000;
}
}
printf("Готово\nПолучено %d сообщений\n", (int)sequence);
zhash_destroy(&kvmap);
zctx_destroy(&ctx);
return 0;
}
```#### Временные значения
```Временные значения относятся к тем данным, которые сразу же истёкнут. Если вы используете режим клонирования для создания службы, аналогичной DNS, то можно использовать временные значения для моделирования динамического анализа DNS. Когда узел подключается к сети, он публикует свой адрес и постоянно обновляет его. Если узел отключается, его адрес становится недействительным.
Временные значения могут быть связаны с сессией (session), и когда сессия завершается, временные значения также становятся недействительными. В режиме клонирования сессия определяется клиентом, и она исчезает, когда клиент отключается.
Проще всего установить для каждого временного значения срок действия, который клиент будет продлевать. Когда соединение прерывается, срок действия не обновляется, и сервер автоматически удаляет значение.
Мы будем использовать этот простой подход для реализации временных значений, поскольку более сложные методы могут быть излишними. Различие между ними заключается в производительности. Если у клиента много временных значений, то установка срока действия для каждого значения является правильным подходом; если количество временных значений достигает определённого уровня, то лучше связать их с сессией и управлять сроками действия вместе.Сначала нам нужно добавить срок действия в ключ-значение сообщение. Мы можем добавить дополнительный фрейм сообщения, но это означает, что каждый раз, когда нам нужно добавить содержимое сообщения, мы должны изменять библиотеку kvmsg, что неудобно. Поэтому мы добавляем один "атрибут" фрейма сообщения, который используется для добавления различных атрибутов сообщения.
Затем нам нужно организовать удаление данных. В настоящее время сервер и клиент бездумно изменяют данные хэш-таблицы. Мы можем определить, что, когда значение сообщения пустое, это означает удаление данных для данного ключа.
Ниже представлен более полный класс kvmsg, который реализует "атрибут" фрейм и фрейм UUID, который мы будем использовать позже. Этот класс также отвечает за обработку сообщений с пустыми значениями, чтобы удалить данные:**kvmsg: Класс сообщения ключ-значение - полный на C**```c
/* =====================================================================
kvmsg - класс сообщений ключ-значение для примеров приложений
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
Этот файл является частью руководства по ZeroMQ: http://zguide.zeromq.org
Это свободное программное обеспечение; вы можете распространять его и/или модифицировать его в соответствии с
условиями Генеральной общественной лицензии GNU версии 3, либо (по вашему выбору) любой более поздней версии.
Это программное обеспечение распространяется в надежде, что оно будет полезным, но
БЕЗ КАКИХ-ЛИБО ГАРАНТИЙ; даже без скрытой гарантии пригодности для использования или соответствия какому-либо назначению. Смотрите Генеральную общественную лицензию GNU для более подробной информации.
Вы должны были получить копию Генеральной общественной лицензии GNU вместе с этим программным обеспечением. Если нет, смотрите
<http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "kvmsg.h"
#include <uuid/uuid.h>
#include "zlist.h"
// Ключ — короткая строка
#define KVMSG_KEY_MAX 255
// Сообщение состоит из пяти фреймов
// frame 0: ключ (ZMQ строка)
// frame 1: номер (8 байт, в порядке возрастания)
// frame 2: UUID (блок данных, 16 байт)
// frame 3: свойства (ZMQ строка)
// frame 4: значение (блок данных)
#define FRAME_KEY 0
#define FRAME_SEQ 1
#define FRAME_UUID 2
``````c
#define FRAME_PROPS 3
#define FRAME_BODY 4
#define KVMSG_FRAMES 5
// Структура класса
struct _kvmsg {
// Флаги существования фреймов
int present[KVMSG_FRAMES];
// Соответствующие фреймы сообщения
zmq_msg_t frame[KVMSG_FRAMES];
// Ключ, строка C
char key[KVMSG_KEY_MAX + 1];
// Список свойств, в формате key=value
zlist_t *props;
size_t props_size;
};
// Преобразование списка свойств в строку
static void
s_encode_props(kvmsg_t *self)
{
zmq_msg_t *msg = &self->frame[FRAME_PROPS];
if (self->present[FRAME_PROPS])
zmq_msg_close(msg);
zmq_msg_init_size(msg, self->props_size);
char *prop = zlist_first(self->props);
char *dest = (char *)zmq_msg_data(msg);
while (prop) {
strcpy(dest, prop);
dest += strlen(prop);
*dest++ = '\n';
prop = zlist_next(self->props);
}
self->present[FRAME_PROPS] = 1;
}
// Разбор атрибутов из строки
static void
s_decode_props(kvmsg_t *self)
{
zmq_msg_t *msg = &self->frame[FRAME_PROPS];
self->props_size = 0;
while (zlist_size(self->props))
free(zlist_pop(self->props));
size_t remainder = zmq_msg_size(msg);
char *prop = (char *)zmq_msg_data(msg);
char *eoln = memchr(prop, '\n', remainder);
while (eoln) {
*eoln = 0;
zlist_append(self->props, strdup(prop));
self->props_size += strlen(prop) + 1;
remainder -= strlen(prop) + 1;
prop = eoln + 1;
eoln = memchr(prop, '\n', remainder);
}
}
// ---------------------------------------------------------------------
// Конструктор, задает номер сообщения
kvmsg_t *
kvmsg_new(int64_t sequence)
{
kvmsg_t *self;
self = (kvmsg_t *)zmalloc(sizeof(kvmsg_t));
self->props = zlist_new();
kvmsg_set_sequence(self, sequence);
return self;
}
// ---------------------------------------------------------------------
// Деструктор
// Функция освобождения памяти для вызова zhash_free_fn()
void
kvmsg_free(void *ptr)
{
if (ptr) {
kvmsg_t *self = (kvmsg_t *)ptr;
// Освобождает все фреймы сообщения
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
if (self->present[frame_nbr]) {
zmq_msg_t *msg = &self->frame[frame_nbr];
zmq_msg_close(msg);
}
}
zlist_destroy(&self->props);
free(self);
}
}
``````c
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++)
if (self->present[frame_nbr])
zmq_msg_close(&self->frame[frame_nbr]);
// Освобождает список атрибутов
while (zlist_size(self->props))
free(zlist_pop(self->props));
zlist_destroy(&self->props);
// Освобождает объект сам по себе
free(self);
}
}
void
kvmsg_destroy(kvmsg_t **self_p)
{
assert(self_p);
if (*self_p) {
kvmsg_free(*self_p);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// Копирует объект kvmsg
kvmsg_t *
kvmsg_dup(kvmsg_t *self)
{
kvmsg_t *kvmsg = kvmsg_new(0);
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
if (self->present[frame_nbr]) {
zmq_msg_t *src = &self->frame[frame_nbr];
zmq_msg_t *dst = &kvmsg->frame[frame_nbr];
zmq_msg_init_size(dst, zmq_msg_size(src));
memcpy(zmq_msg_data(dst),
zmq_msg_data(src), zmq_msg_size(src));
kvmsg->present[frame_nbr] = 1;
}
}
kvmsg->props = zlist_copy(self->props);
return kvmsg;
}
// Чтение ключевых значений из сокета, возврат экземпляра kvmsg
kvmsg_t *
kvmsg_recv(void *socket)
{
assert(socket);
kvmsg_t *self = kvmsg_new(0);
// Чтение всех фреймов, если произошла ошибка, возвращаем пустое значение
int frame_nbr;
for(frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++)
{
if(self->present[frame_nbr])
zmq_msg_close(&self->frame[frame_nbr]);
zmq_msg_init(&self->frame[frame_nbr]);
self->present[frame_nbr] = 1;
if(zmq_recvmsg(socket, &self->frame[frame_nbr], 0) == -1)
{
kvmsg_destroy(&self);
break;
}
// Проверка многофреймового сообщения
int rcvmore = (frame_nbr < KVMSG_FRAMES - 1) ? 1 : 0;
if(zsockopt_rcvmore(socket) != rcvmore)
{
kvmsg_destroy(&self);
break;
}
}
if(self)
s_decode_props(self);
return self;
}
// ---------------------------------------------------------------------
``````markdown
## Возвращение номера сообщения
```c
int64_t
kvmsg_sequence(kvmsg_t *self)
{
assert(self);
if (self->present[FRAME_SEQ]) {
assert(zmq_msg_size(&self->frame[FRAME_SEQ]) == 8);
byte *source = zmq_msg_data(&self->frame[FRAME_SEQ]);
int64_t sequence = ((int64_t)(source[0]) << 56)
+ ((int64_t)(source[1]) << 48)
+ ((int64_t)(source[2]) << 40)
+ ((int64_t)(source[3]) << 32)
+ ((int64_t)(source[4]) << 24)
+ ((int64_t)(source[5]) << 16)
+ ((int64_t)(source[6]) << 8)
+ (int64_t)(source[7]);
return sequence;
} else {
return 0;
}
}
byte *
kvmsg_uuid(kvmsg_t *self)
{
assert(self);
if (self->present[FRAME_UUID] && zmq_msg_size(&self->frame[FRAME_UUID]) == sizeof(uuid_t)) {
return (byte *)zmq_msg_data(&self->frame[FRAME_UUID]);
} else {
return NULL;
}
}
```c
byte *
kvmsg_body(kvmsg_t *self)
{
assert(self);
if (self->present[FRAME_BODY]) {
return (byte *)zmq_msg_data(&self->frame[FRAME_BODY]);
} else {
return NULL;
}
}
size_t
kvmsg_size(kvmsg_t *self)
{
assert(self);
if (self->present[FRAME_BODY]) {
return zmq_msg_size(&self->frame[FRAME_BODY]);
} else {
return 0;
}
}
void
kvmsg_set_key(kvmsg_t *self, char *key)
{
assert(self);
zmq_msg_t *msg = &self->frame[FRAME_KEY];
if (self->present[FRAME_KEY]) {
zmq_msg_close(msg);
}
zmq_msg_init_size(msg, strlen(key));
memcpy(zmq_msg_data(msg), key, strlen(key));
self->present[FRAME_KEY] = 1;
}
void
kvmsg_set_sequence(kvmsg_t *self, int64_t sequence)
{
assert(self);
zmq_msg_t *msg = &self->frame[FRAME_SEQ];
if (self->present[FRAME_SEQ]) {
zmq_msg_close(msg);
zmq_msg_init_size(msg, 8);
byte *source = zmq_msg_data(msg);
source[0] = (byte)((sequence >> 56) & 255);
source[1] = (byte)((sequence >> 48) & 255);
source[2] = (byte)((sequence >> 40) & 255);
source[3] = (byte)((sequence >> 32) & 255);
source[4] = (byte)((sequence >> 24) & 255);
source[5] = (byte)((sequence >> 16) & 255);
source[6] = (byte)((sequence >> 8) & 255);
source[7] = (byte)(sequence & 255);
self->present[FRAME_SEQ] = 1;
}
}
// ---------------------------------------------------------------------
// Генерация и установка UUID сообщения
void
kvmsg_set_uuid(kvmsg_t *self)
{
assert(self);
zmq_msg_t *msg = &self->frame[FRAME_UUID];
uuid_t uuid;
uuid_generate(uuid);
if (self->present[FRAME_UUID]) {
zmq_msg_close(msg);
}
zmq_msg_init_size(msg, sizeof(uuid));
memcpy(zmq_msg_data(msg), uuid, sizeof(uuid));
self->present[FRAME_UUID] = 1;
}
``````markdown
zmq_msg_init_size (msg, sizeof (uuid));
memcpy (zmq_msg_data (msg), uuid, sizeof (uuid));
self->present [FRAME_UUID] = 1;
}
// ---------------------------------------------------------------------
// Установка содержимого сообщения
void
kvmsg_set_body (kvmsg_t *self, byte *body, size_t size)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_BODY];
if (self->present [FRAME_BODY])
zmq_msg_close (msg);
self->present [FRAME_BODY] = 1;
zmq_msg_init_size (msg, size);
memcpy (zmq_msg_data (msg), body, size);
}
// ---------------------------------------------------------------------
// Установка ключа сообщения с использованием printf() формата
void
kvmsg_fmt_key (kvmsg_t *self, char *format, ...)
{
char value [KVMSG_KEY_MAX + 1];
va_list args;
assert (self);
va_start (args, format);
vsnprintf (value, KVMSG_KEY_MAX, format, args);
va_end (args);
kvmsg_set_key (self, value);
}
// ---------------------------------------------------------------------
// Установка содержимого сообщения с использованием printf() формата
void
kvmsg_fmt_body (kvmsg_t *self, char *format, ...)
{
char value [255 + 1];
va_list args;
assert (self);
va_start (args, format);
vsnprintf (value, 255, format, args);
va_end (args);
kvmsg_set_body (self, (byte *) value, strlen (value));
}
// ---------------------------------------------------------------------
// Получение свойства сообщения, если нет - возврат пустой строки
char *
kvmsg_get_prop (kvmsg_t *self, char *name)
{
assert (strchr (name, '=') == NULL);
char *prop = zlist_first(self->props);
size_t namelen = strlen(name);
while (prop) {
if (strlen(prop) > namelen
&& memcmp(prop, name, namelen) == 0
&& prop[namelen] == '=')
return prop + namelen + 1;
prop = zlist_next(self->props);
}
return "";
}
``` // ---------------------------------------------------------------------
// Устанавливает свойства сообщения
// Имя свойства не может содержать знак '=', максимальная длина значения - 255 символов
void
kvmsg_set_prop(kvmsg_t *self, char *name, char *format, ...)
{
assert(strchr(name, '=') == NULL);
char value[255 + 1];
va_list args;
assert(self);
va_start(args, format);
vsnprintf(value, 255, format, args);
va_end(args);
// Выделяем память
char *prop = malloc(strlen(name) + strlen(value) + 2);
// Удаляем существующие свойства
sprintf(prop, "%s=", name);
char *existing = zlist_first(self->props);
while (existing) {
if (memcmp(prop, existing, strlen(prop)) == 0) {
self->props_size -= strlen(existing) + 1;
zlist_remove(self->props, existing);
free(existing);
break;
}
existing = zlist_next(self->props);
}
// Добавляем новое свойство
strcat(prop, value);
zlist_append(self->props, prop);
self->props_size += strlen(prop) + 1;
}
// ---------------------------------------------------------------------
// Сохраняет объект kvmsg в хеш-таблице
// Освобождает объект kvmsg, когда он больше не используется;
// Если переданный объект пуст, то он удаляется.
void
kvmsg_store(kvmsg_t **self_p, zhash_t *hash)
{
assert(self_p);
if (*self_p) {
kvmsg_t *self = *self_p;
assert(self);
if (kvmsg_size(self)) {
if (self->present[FRAME_KEY] && self->present[FRAME_BODY]) {
zhash_update(hash, kvmsg_key(self), self);
zhash_freefn(hash, kvmsg_key(self), kvmsg_free);
}
} else {
zhash_delete(hash, kvmsg_key(self));
*self_p = NULL;
}
}
}
// ---------------------------------------------------------------------
// Выводит содержимое сообщения в стандартный поток ошибок
void
kvmsg_dump(kvmsg_t *self)
{
if (self) {
if (!self) {
}```markdown
```c
fprintf(stderr, "NULL");
return;
}
size_t size = kvmsg_size(self);
byte *body = kvmsg_body(self);
fprintf(stderr, "[seq:%" PRId64 "]", kvmsg_sequence(self));
fprintf(stderr, "[key:%s]", kvmsg_key(self));
fprintf(stderr, "[size:%zu] ", size);
if (zlist_size(self->props)) {
fprintf(stderr, "[");
char *prop = zlist_first(self->props);
while (prop) {
fprintf(stderr, "%s;", prop);
prop = zlist_next(self->props);
}
fprintf(stderr, "]");
}
int char_nbr;
for (char_nbr = 0; char_nbr < size; char_nbr++) {
fprintf(stderr, "%02X", body[char_nbr]);
}
fprintf(stderr, "\n");
} else {
fprintf(stderr, "Сообщение NULL\n");
}
}
```// Тестовые примеры
int kvmsg_test(int verbose) {
kvmsg_t *kvmsg;
printf(" * kvmsg: ");
// Подготовка контекста и сокета
zctx_t *ctx = zctx_new();
void *output = zsocket_new(ctx, ZMQ_DEALER);
int rc = zmq_bind(output, "ipc://kvmsg_selftest.ipc");
assert(rc == 0);
void *input = zsocket_new(ctx, ZMQ_DEALER);
rc = zmq_connect(input, "ipc://kvmsg_selftest.ipc");
assert(rc == 0);
zhash_t *kvmap = zhash_new();
// Тестирование отправки и получения простых сообщений
kvmsg = kvmsg_new(1);
kvmsg_set_key(kvmsg, "key");
kvmsg_set_uuid(kvmsg);
kvmsg_set_body(kvmsg, (byte *)"body", 4);
if (verbose) {
kvmsg_dump(kvmsg);
}
kvmsg_send(kvmsg, output);
kvmsg_store(&kvmsg, kvmap);
kvmsg = kvmsg_recv(input);
if (verbose) {
kvmsg_dump(kvmsg);
}
assert(streq(kvmsg_key(kvmsg), "key"));
kvmsg_store(&kvmsg, kvmap);
// Тестирование отправки и получения сообщений с атрибутами
kvmsg = kvmsg_new(2);
kvmsg_set_prop(kvmsg, "prop1", "value1");
kvmsg_set_prop(kvmsg, "prop2", "value1");
kvmsg_set_prop(kvmsg, "prop2", "value2");
kvmsg_set_key(kvmsg, "key");
kvmsg_set_uuid(kvmsg);
kvmsg_set_body(kvmsg, (byte *)"body", 4);
assert(streq(kvmsg_get_prop(kvmsg, "prop2"), "value2"));
if (verbose) {
kvmsg_dump(kvmsg);
}
kvmsg_send(kvmsg, output);
kvmsg_destroy(&kvmsg);
kvmsg = kvmsg_recv(input);
if (verbose) {
kvmsg_dump(kvmsg);
}
assert(streq(kvmsg_key(kvmsg), "key"));
assert(streq(kvmsg_get_prop(kvmsg, "prop2"), "value2"));
}
kvmsg_destroy(&kvmsg); // Закрыть и уничтожить все объекты zhash_destroy(&kvmap); zctx_destroy(&ctx); printf("OK\n"); return 0; }
Клиентская модель 5 и модель 4 не имеют значительных различий, за исключением изменения библиотеки kvmsg. При обновлении сообщений необходимо добавить свойство с временем жизни:```c
kvmsg_set_prop(kvmsg, "ttl", "%d", randof(30));
Серверная модель 5 претерпела значительные изменения. Мы будем использовать реактор вместо поллинга, чтобы смешивать обработку таймерных событий и событий сокетов. Это может быть сложнее реализовать на языке C. Вот пример кода:
clonesrv5: Клоновый сервер, Модель Пять на C
//
// Клонирование - сервер - модель 5
//
// Прямая компиляция, без создания библиотеки
#include "kvmsg.c"
// Обработчик реактора
static int s_snapshots(zloop_t *loop, void *socket, void *args);
static int s_collector(zloop_t *loop, void *socket, void *args);
static int s_flush_ttl(zloop_t *loop, void *socket, void *args);
// Атрибуты сервера
typedef struct {
zctx_t *ctx; // Контекст
zhash_t *kvmap; // Хранилище пар ключ-значение
zloop_t *loop; // zloop реактор
int port; // Основной порт
int64_t sequence; // Номер события обновления
void *snapshot; // Обработка запросов на снимок
void *publisher; // Публикация событий обновления
void *collector; // Сбор событий обновления от клиентов
} clonesrv_t;
int main(void) {
clonesrv_t *self = (clonesrv_t *) zmalloc(sizeof(clonesrv_t));
self->port = 5556;
self->ctx = zctx_new();
self->kvmap = zhash_new();
self->loop = zloop_new();
zloop_set_verbose(self->loop, FALSE);
// Открытие сокета сервера клонирования
self->snapshot = zsocket_new(self->ctx, ZMQ_ROUTER);
self->publisher = zsocket_new(self->ctx, ZMQ_PUB);
self->collector = zsocket_new(self->ctx, ZMQ_PULL);
zsocket_bind(self->snapshot, "tcp://*:%d", self->port);
zsocket_bind(self->publisher, "tcp://*:%d", self->port + 1);
zsocket_bind(self->collector, "tcp://*:%d", self->port + 2);
``` // Регистрация обработчиков реактора
zloop_reader(self->loop, self->snapshot, s_snapshots, self);
zloop_reader(self->loop, self->collector, s_collector, self);
zloop_timer(self->loop, 1000, 0, s_flush_ttl, self);
// Запуск реактора до прерывания
zloop_start(self->loop);
zloop_destroy(&self->loop);
zhash_destroy(&self->kvmap);
zctx_destroy(&self->ctx);
free(self);
return 0;
}
// ---------------------------------------------------------------------
// Отправка содержимого снимка
static int s_send_single(char *key, void *data, void *args);
// Информация о запросе
typedef struct {
void *socket; // Сокет ROUTER
zframe_t *identity; // Идентификатор запроса
char *subtree; // Поддерево
} kvroute_t;```c
static int
s_snapshots(zloop_t *loop, void *snapshot, void *args) {
clonesrv_t *self = (clonesrv_t *) args;
zframe_t *identity = zframe_recv(snapshot);
}
if (identity) {
// Запрос находится во второй части сообщения
char *request = zstr_recv(snapshot);
char *subtree = NULL;
if (streq(request, "ICANHAZ? ")) {
free(request);
subtree = zstr_recv(snapshot);
} else {
printf("E: Неверный запрос, программа завершена\n");
}
if (subtree) {
// Отправка состояния снимка
kvroute_t routing = { snapshot, identity, subtree };
zhash_foreach(self->kvmap, s_send_single, &routing);
// Отправка завершающего символа и номера версии
zclock_log("I: Отправка снимка, номер версии: %d", (int) self->sequence);
zframe_send(&identity, snapshot, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new(self->sequence);
kvmsg_set_key(kvmsg, "KTHXBAI");
kvmsg_set_body(kvmsg, (byte *) subtree, 0);
kvmsg_send(kvmsg, snapshot);
kvmsg_destroy(&kvmsg);
free(subtree);
}
}
return 0;
}
// Отправка каждого ключ-значение пары по отдельности
static int
s_send_single(char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *) args;
kvmsg_t *kvmsg = (kvmsg_t *) data;
if (strlen(kvroute->subtree) <= strlen(kvmsg_key(kvmsg))
&& memcmp(kvroute->subtree,
kvmsg_key(kvmsg), strlen(kvroute->subtree)) == 0) {
// Сначала отправляем идентификатор получателя
zframe_send(&kvroute->identity,
kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
kvmsg_send(kvmsg, kvroute->socket);
}
return 0;
}
// ---------------------------------------------------------------------
// Сбор событий обновления
static int
s_collector(zloop_t *loop, void *collector, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
kvmsg_t *kvmsg = kvmsg_recv(collector);
if (kvmsg) {
``````markdown
static kvmsg_set_sequence(kvmsg, ++self->sequence);
static kvmsg_send(kvmsg, self->publisher);
int ttl = atoi(kvmsg_get_prop(kvmsg, "ttl"));
if (ttl) {
kvmsg_set_prop(kvmsg, "ttl", "%" PRId64, zclock_time() + ttl * 1000);
}
kvmsg_store(&kvmsg, self->kvmap);
zclock_log("I: публикация события обновления %d", (int) self->sequence);
}
return 0;
}
// ---------------------------------------------------------------------
// удаление просроченных мгновенных значений
```markdown
static int s_flush_single(char *key, void *data, void *args);
static int
s_flush_ttl(zloop_t *loop, void *unused, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
zhash_foreach(self->kvmap, s_flush_single, args);
return 0;
}
// Удаление просроченных ключ-значение пар и отправка события о удалении
static int
s_flush_single(char *key, void *data, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
kvmsg_t *kvmsg = (kvmsg_t *) data;
int64_t ttl;
sscanf(kvmsg_get_prop(kvmsg, "ttl"), "%" PRId64, &ttl);
if (ttl && zclock_time() >= ttl) {
kvmsg_set_sequence(kvmsg, ++self->sequence);
kvmsg_set_body(kvmsg, (byte *) "", 0);
kvmsg_send(kvmsg, self->publisher);
kvmsg_store(&kvmsg, self->kvmap);
zclock_log("I: Отправка события о удалении %d", (int) self->sequence);
}
return 0;
}
```Клонирование моделей 1 до 5 является относительно простым процессом, но далее мы рассмотрим очень сложную модель. Можно заметить, что для построения надежной очереди сообщений требуется много усилий. Поэтому часто задается вопрос: действительно ли это необходимо? Если вы готовы принять ненадежную или достаточно надежную архитектуру, то поздравляем вас, вы нашли баланс между затратами и выгодой. Хотя иногда могут теряться некоторые сообщения, с экономической точки зрения это оправдано. В любом случае, давайте рассмотрим эту сложную модель.
В модели 3 вы будете отключать и перезапускать сервис, что приведет к потере данных. Любые последующие клиенты смогут получить только те данные, которые были отправлены после перезапуска, а не все. Давайте попробуем найти способ сделать так, чтобы клонированная модель могла пережить перезапуск сервера.
Вот список проблем, которые нам нужно решить:
* Клонированный серверный процесс прекращает работу и автоматически или вручную перезапускается. Процесс теряет все данные, поэтому ему необходимо восстановиться из другого источника.
* Клонированный сервер испытывает аппаратные проблемы, которые длительное время не могут быть восстановлены. Клиентам необходимо переключиться на другой доступный сервер.* Клонированный сервер отключается от сети, например, из-за отказа коммутатора. Он соединяется снова в какой-то момент времени, но за это время данные должны быть обработаны запасным сервером.
Первым шагом является добавление еще одного сервера. Мы можем использовать двойник-модель, упомянутую в четвертой главе, которая представляет собой реактор, и наша программа также может быть представлена как реактор, поэтому они могут работать вместе.
Мы должны гарантировать, что события обновления будут сохранены при сбое основного сервера. Самый простой механизм — это отправка событий сразу на два сервера.
Запасной сервер можно рассматривать как клиента, который получает обновления от основного сервера, как и другие клиенты. В то же время он получает обновления от клиентов — хотя эти обновления не должны использоваться для обновления данных, они могут быть временно сохранены.
По сравнению с моделью 5, модель 6 вводит следующие характеристики:
* Клиенты отправляют события обновления через PUB-SUB сокеты вместо PUSH-PULL. Причина заключается в том, что PUSH сокеты блокируются, если нет получателя, и выполняют балансировку нагрузки — нам нужно, чтобы оба сервера получили сообщение. Мы связываем SUB сокеты на серверах и PUB сокеты на клиентах.* Мы добавляем пульсацию в события обновления, отправляемые сервером клиентам, чтобы те знали, жив ли основной сервер, и могли переключиться на запасной сервер. Мы используем класс bstar реактора в режиме Gemini для создания основного и резервного узлов. В режиме Gemini требуется «голосующий» сокет для помощи в определении, жив ли другой узел. Здесь мы используем запросы кэша в качестве «голосования».
Мы добавим UUID-атрибут для всех событий обновления, который будет генерироваться клиентом и публиковаться всем клиентам сервером.
Резервный узел будет поддерживать список «ожидаемых обработок», который хранит события обновления от клиентов, которые еще не были опубликованы сервером; или наоборот, события обновления от сервера, которые еще не были получены от клиентов. Этот список упорядочен от старого к новому, что позволяет легко удалять сообщения сверху.
Мы можем спроектировать конечный автомат для клиента, который имеет три состояния:
* Клиент открывает и подключается к сокету, затем отправляет запрос кэша на сервер. Чтобы избежать бурю сообщений, он делает это только дважды.
* Клиент ожидает ответ на запрос кэша, если получает его, сохраняет; если нет, отправляет запрос на второй сервер.
* Клиент получает кэш, начинает ждать события обновления. Если в течение некоторого времени нет ответа от сервера, он подключается ко второму серверу.Клиент будет циклически повторять этот процесс; возможно, при запуске программы некоторые клиенты попытаются подключиться к основному узлу, а другие — к резервному. Уверены, что режим Gemini хорошо справится с этой ситуацией.
Мы можем изобразить диаграмму состояний клиента:

Шаги восстановления после отказа:
* Клиент обнаруживает, что основной узел перестал отправлять пульсы, поэтому переходит к подключению к резервному узлу и запрашивает новый кэш;
* Резервный узел начинает принимать запросы кэша и обнаруживает, что основной узел умер, и начинает работать как основной узел;
* Резервный узел записывает события обработки из списка «ожидаемых обработок» в своё состояние, а затем начинает обрабатывать запросы кэша.
При восстановлении основного узла:
* Запускается в роли slave и подключается к резервному узлу в режиме клонирования;
* Одновременно использует SUB-сокет для получения событий обновления от клиентов.
Мы делаем два предположения:* По крайней мере один основной узел продолжает работать. Если оба основных узла выйдут из строя, мы потеряем все данные сервера и не сможем восстановиться.
* Разные клиенты не будут одновременно обновлять один и тот же ключ. События обновления клиентов будут поступать на два сервера последовательно, поэтому порядок обновления может отличаться. Однако порядок получения одного события обновления клиентом на двух серверах будет одинаковым, так что беспокоиться не о чем. Вот общая схема архитектуры:
Перед началом программирования нам нужно будет重构客户端成为一个可重用的类。在ZMQ中编写异步类有时是为了练习如何写出优雅的代码,但在本例中,我们确实希望克隆模式能够成为一种易于使用的程序。上述架构的可扩展性取决于客户端的正确行为,因此有必要将其封装成一个API。在客户端中实现故障恢复是比较复杂的,想象一下自由者模式和克隆模式结合在一起会是什么样子。
По моей привычке, я обычно начинаю с составления списка API, а затем реализую его. Давайте предположим, что у нас есть API под названием `clone`, и на основе этого создадим клиентский API для режима клонирования. Упаковка кода в API несомненно повысит его стабильность. Например, в модели 5 клиенту требуется открыть три сокета, и адреса этих сокетов напрямую указаны в коде. Мы можем создать такой набор API:
```c
// Указание конечной точки для каждого сокета
clone_subscribe(clone, "tcp://localhost:5556");
clone_snapshot(clone, "tcp://localhost:5557");
clone_updates(clone, "tcp://localhost:5558");
// Поскольку есть два сервера, выполним это еще раз
clone_subscribe(clone, "tcp://localhost:5566");
clone_snapshot(clone, "tcp://localhost:5567");
clone_updates(clone, "tcp://localhost:5568");
Однако такой подход все еще кажется излишне длинным, поскольку нет необходимости раскрывать некоторые детали внутренней структуры API перед программистом. Сейчас мы используем три сокета, но в будущем может потребоваться использовать два или четыре. Мы не можем требовать, чтобы все приложения адаптировались к этому. Давайте упакуем эту информацию в API:
// Указание конечных точек для главного и резервного сервера
clone_connect(clone, "tcp://localhost:5551");
clone_connect(clone, "tcp://localhost:5561");
Таким образом, код становится более компактным, но также влияет на внутреннюю структуру существующего кода. Нам нужно будет выводить три других конечные точки из одной конечной точки. Одним из способов является предположение, что клиент и сервер используют три последовательные конечные точки для связи, и запись этого правила в протокол; другим — получение недостающих конечных точек от сервера. Мы выбрали первый, более простой метод: * Состояние сервера ROUTER на конечной точке P;
clone
и четвертый раздел класса flcliapi
очень похожи и состоят из двух частей: * Асинхронного агента режима клонирования, работающего в фоновом режиме. Этот агент управляет всеми операциями ввода-вывода и в реальном времени взаимодействует с сервером;clone
, работающего в главном приложении. При создании объекта clone
автоматически создаётся фоновый поток клонирования; при удалении объекта clone
этот фоновый поток также удаляется.Фронтальный класс clone
использует канал inproc для связи с фоновым агентом. В C библиотека czmq автоматически создаёт этот канал для нас. Это стандартный подход многопоточного программирования ZMQ.
Без использования ZMQ, такая асинхронная модель была бы трудной для работы под высокими нагрузками, а ZMQ делает её простой. Написание такого кода может быть сложным. Мы можем использовать реактивный подход для его реализации, но это увеличит сложность и затруднит использование приложения. Поэтому наш API будет больше напоминать таблицу ключевых значений, способную взаимодействовать с сервером:
clone_t *clone_new (void);
void clone_destroy (clone_t **self_p);
void clone_connect (clone_t *self, char *address, char *service);
void clone_set (clone_t *self, char *key, char *value);
char *clone_get (clone_t *self, char *key);
Вот пример кода модели 6 клиента-клонирования, который очень короткий благодаря использованию этого API: clonecli6: Клиент-клонирование, модель шесть на C``` // // Клонирование - клиент - модель 6 //
// Компиляция без создания библиотеки #include "clone.c"
#define SUBTREE "/client/"
int main (void) { // Создание распределённой хеш-таблицы clone_t *clone = clone_new ();
// Настройка
clone_subtree (clone, SUBTREE);
clone_connect (clone, "tcp://localhost", "5556");
clone_connect (clone, "tcp://localhost", "5566");
// Вставка случайных ключей и значений
while (!zctx_interrupted) {
// Генерация случайного значения
char key [255];
char value [10];
sprintf (key, "%s%d", SUBTREE, randof (10000));
sprintf (value, "%d", randof (1000000));
clone_set (clone, key, value, randof (30));
sleep (1);
}
clone_destroy (&clone);
return 0;
}
Вот реализация класса `clone`: **clone: Класс clone на C**
c
/* =====================================================================
clone - клиентская сторона Clone Pattern класса
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
Этот файл является частью руководства по ZeroMQ: http://zguide.zeromq.org
Это свободное программное обеспечение; вы можете распространять его и/или модифицировать его в соответствии с
условиями Генеральной общественной лицензии GNU версии 3, выпущенной Free Software Foundation, или (по вашему выбору) любой более поздней версией.
Это программное обеспечение распространяется в надежде, что оно будет полезным, но
БЕЗ КАКИХ-ЛИБО ГАРАНТИЙ; даже без неявной гарантии
КОММЕРЧЕСКОЙ НАПРАВЛЕННОСТИ или ПРИГОДНОСТИ ДЛЯ ОПРЕДЕЛЕННЫХ ЦЕЛЕЙ. Смотрите Генеральную общественную лицензию GNU для более подробной информации.
Вы должны были получить копию Генеральной общественной лицензии GNU вместе с этим программным обеспечением. Если нет, смотрите
http://www.gnu.org/licenses/.
=====================================================================
*/
#include "clone.h"
// Время ожидания запроса
#define GLOBAL_TIMEOUT 4000 // мс
// Время жизни сервера
#define SERVER_TTL 5000 // мс
// Количество серверов
#define SERVER_MAX 2
// =====================================================================
// Синхронная часть, работающая в потоке приложения
// ---------------------------------------------------------------------
// Структура класса
struct _clone_t {
// ---------------------------------------------------------------------
// Указание поддерева для создания снимка и события обновления перед отправкой в агента-посредника
// Содержимое сообщения: [SUBTREE][subtree]
void clone_subtree(clone_t *self, char *subtree)
{
assert(self);
zmsg_t *msg = zmsg_new();
zmsg_addstr(msg, "SUBTREE");
zmsg_addstr(msg, subtree);
zmsg_send(&msg, self->pipe);
}
// ---------------------------------------------------------------------
// Подключение к новому серверному конечному устройству
// Содержимое сообщения: [CONNECT][endpoint][service]
void clone_connect(clone_t *self, char *address, char *service)
{
assert(self);
zmsg_t *msg = zmsg_new();
zmsg_addstr(msg, "CONNECT");
zmsg_addstr(msg, address);
zmsg_addstr(msg, service);
zmsg_send(&msg, self->pipe);
}
// ---------------------------------------------------------------------
// Установка нового значения
// Содержимое сообщения: [SET][key][value][ttl]
void clone_set(clone_t *self, char *key, char *value, int ttl)
{
char ttlstr[10];
sprintf(ttlstr, "%d", ttl);
assert(self);
zmsg_t *msg = zmsg_new();
zmsg_addstr(msg, "SET");
zmsg_addstr(msg, key);
zmsg_addstr(msg, value);
zmsg_addstr(msg, ttlstr);
zmsg_send(&msg, self->pipe);
}
``` zmsg_addstr(msg, ttlstr);
zmsg_send(&msg, self->pipe);
}
// ---------------------------------------------------------------------
// Получение значения
// Содержимое сообщения: [GET][key]
// Возвращает NULL, если клон недоступен
char *clone_get(clone_t *self, char *key)
{
assert(self);
assert(key);
zmsg_t *msg = zmsg_new();
zmsg_addstr(msg, "GET");
zmsg_addstr(msg, key);
zmsg_send(&msg, self->pipe);
zmsg_t *reply = zmsg_recv(self->pipe);
if (reply)
{
char *value = zmsg_popstr(reply);
zmsg_destroy(&reply);
return value;
}
return NULL;
}
// =====================================================================
// Асинхронная часть, выполняется в фоновом режиме
// ---------------------------------------------------------------------
// Информация о единичном серверном конечном устройстве
typedef struct
{
char *address; // Адрес серверного конечного устройства
int port; // Порт
void *snapshot; // Сокет для создания снимка
void *subscriber; // Сокет для получения событий обновления
uint64_t expiry; // Время истечения срока действия сервера
uint requests; // Количество запросов на создание снимков
} server_t;```markdown
static server_t *
server_new(zctx_t *ctx, char *address, int port, char *subtree)
{
server_t *self = (server_t *)zmalloc(sizeof(server_t));
zclock_log("I: adding server %s:%d. . . ", address, port);
self->address = strdup(address);
self->port = port;
self->snapshot = zsocket_new(ctx, ZMQ_DEALER);
zsocket_connect(self->snapshot, "%s:%d", address, port);
self->subscriber = zsocket_new(ctx, ZMQ_SUB);
zsocket_connect(self->subscriber, "%s:%d", address, port + 1);
zsockopt_set_subscribe(self->subscriber, subtree);
return self;
}
static void
server_destroy(server_t **self_p)
{
assert(self_p);
if (*self_p) {
server_t *self = *self_p;
free(self->address);
free(self);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// Бэкенд-агент
// Состояния
#define STATE_INITIAL 0 // До подключения
#define STATE_SYNCING 1 // Синхронизация
#define STATE_ACTIVE 2 // Активное состояние
typedef struct {
zctx_t *ctx; // Контекст
void *pipe; // Сокет для связи с основным потоком
zhash_t *kvmap; // Хэш-таблица ключей и значений
char *subtree; // Поддерево
server_t *server[SERVER_MAX];
uint nbr_servers; // Диапазон: 0 - SERVER_MAX
uint state; // Текущее состояние
uint cur_server; // Текущий мастер, 0/1
int64_t sequence; // Номер пары ключ-значение
void *publisher; // Сокет для публикации событий
} agent_t;
static agent_t *
agent_new(zctx_t *ctx, void *pipe)
{
agent_t *self = (agent_t *)zmalloc(sizeof(agent_t));
self->ctx = ctx;
self->pipe = pipe;
self->kvmap = zhash_new();
self->subtree = strdup("");
self->state = STATE_INITIAL;
self->publisher = zsocket_new(self->ctx, ZMQ_PUB);
return self;
}
static void
agent_destroy(agent_t **self_p)
{
assert(self_p);
if (*self_p) {
agent_t *self = *self_p;
int server_nbr;
for (server_nbr = 0; server_nbr < self->nbr_servers; server_nbr++)
``````markdown
zmsg_t *msg = zmsg_recv(self->pipe);
char *command = zmsg_popstr(msg);
if (command == NULL)
return -1;
if (streq(command, "SUBTREE")) {
free(self->subtree);
self->subtree = zmsg_popstr(msg);
}
else if (streq(command, "CONNECT")) {
char *address = zmsg_popstr(msg);
char *service = zmsg_popstr(msg);
if (self->nbr_servers < SERVER_MAX) {
self->server[self->nbr_servers++] = server_new(
self->ctx, address, atoi(service), self->subtree);
// 广播更新事件
zsocket_connect(self->publisher, "%s:%d",
address, atoi(service) + 2);
} else {
zclock_log("E: слишком много серверов (макс. %d)", SERVER_MAX);
}
free(address);
free(service);
}
else if (streq(command, "SET")) {
char *key = zmsg_popstr(msg);
char *value = zmsg_popstr(msg);
char *ttl = zmsg_popstr(msg);
zhash_update(self->kvmap, key, (byte *)value);
zhash_freefn(self->kvmap, key, free);
// отправка пары ключ-значение на сервер
kvmsg_t *kvmsg = kvmsg_new(0);
kvmsg_set_key(kvmsg, key);
kvmsg_set_uuid(kvmsg);
kvmsg_fmt_body(kvmsg, "%s", value);
kvmsg_set_prop(kvmsg, "ttl", ttl);
kvmsg_send(kvmsg, self->publisher);
kvmsg_destroy(&kvmsg);
puts(key);
free(ttl);
free(key); // ключ-значение фактически контролируется объектом хеш-таблицы
}
else if (streq(command, "GET")) {
char *key = zmsg_popstr(msg);
char *value = zhash_lookup(self->kvmap, key);
if (value)
zstr_send(self->pipe, value);
else
zstr_send(self->pipe, "");
free(key);
free(value);
}
free(command);
zmsg_destroy(&msg);
return 0;
}
// ---------------------------------------------------------------------
// Асинхронный фоновый агент поддерживает пул серверов и обрабатывает запросы или ответы от приложений.
static void
``````c
clone_agent (void *args, zctx_t *ctx, void *pipe)
{
agent_t *self = agent_new (ctx, pipe);
while (TRUE)
{
zmq_pollitem_t poll_set [] =
{
{ pipe, 0, ZMQ_POLLIN, 0 },
{ 0, 0, ZMQ_POLLIN, 0 }
};
int poll_timer = -1;
int poll_size = 2;
server_t *server = self->server [self->cur_server];
switch (self->state)
{
case STATE_INITIAL:
// В этом состоянии, если есть доступные серверы, отправляется запрос на получение снимка
if (self->nbr_servers > 0)
{
zclock_log ("I: Ожидание сервера %s:%d... ",
server->address, server->port);
if (server->requests < 2)
{
zstr_sendm (server->snapshot, "ICANHAZ?");
zstr_send (server->snapshot, self->subtree);
server->requests++;
}
server->expiry = zclock_time () + SERVER_TTL;
self->state = STATE_SYNCING;
poll_set [1].socket = server->snapshot;
}
else
poll_size = 1;
break;
case STATE_SYNCING:
// В этом состоянии мы принимаем снимок данных от сервера, при неудаче пробуем другой сервер
poll_set [1].socket = server->snapshot;
break;
case STATE_ACTIVE:
// В этом состоянии мы получаем события обновления от сервера, при неудаче пробуем другой сервер
poll_set [1].socket = server->subscriber;
break;
}
if (server)
{
poll_timer = (server->expiry - zclock_time ()) * ZMQ_POLL_MSEC;
if (poll_timer < 0)
poll_timer = 0;
}
// ------------------------------------------------------------
// Цикл zmq_poll
int rc = zmq_poll (poll_set, poll_size, poll_timer);
if (rc == -1)
break; // Контекст был закрыт
if (poll_set [0].revents & ZMQ_POLLIN)
{
// Обработка входящих сообщений
}
if (poll_set [1].revents & ZMQ_POLLIN)
{
// Обработка входящих сообщений
}
}
}
``````markdown
revents & ZMQ_POLLIN) {
if (agent_control_message (self))
break; // Прерывание
}
else
if (poll_set [1].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv (poll_set [1].socket);
if (!kvmsg)
break; // Прерывание
// Любое сообщение от сервера обновляет его время истечения
server->expiry = zclock_time () + SERVER_TTL;
if (self->state == STATE_SYNCING) {
// Сохранение снимка данных
server->requests = 0;
if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
self->sequence = kvmsg_sequence(kvmsg);
self->state = STATE_ACTIVE;
zclock_log("I: получен с %s:%d snapshot=%d",
server->address, server->port,
(int) self->sequence);
kvmsg_destroy(&kvmsg);
}
else
kvmsg_store(&kvmsg, self->kvmap);
}
else
if (self->state == STATE_ACTIVE) {
// Отбрасываем просроченные события обновления
if (kvmsg_sequence(kvmsg) > self->sequence) {
self->sequence = kvmsg_sequence(kvmsg);
kvmsg_store(&kvmsg, self->kvmap);
zclock_log("I: получен с %s:%d update=%d",
server->address, server->port,
(int) self->sequence);
}
else
kvmsg_destroy(&kvmsg);
}
}
else {
// Сервер недоступен, пробуем следующий сервер
zclock_log("I: сервер %s:%d недоступен",
server->address, server->port);
self->cur_server = (self->cur_server + 1) % self->nbr_servers;
self->state = STATE_INITIAL;
}
}
agent_destroy(&self);
}
Последним является код модели 6 клонирующего сервера:
```c
//
// Клонирование - серверная часть - модель 6
//
// Прямая компиляция, без создания библиотеки
#include "bstar.c"
#include "kvmsg.c"
// API для bstar реактора
static int s_snapshots(zloop_t *loop, void *socket, void *args);
static int s_collector(zloop_t *loop, void *socket, void *args);
static int s_flush_ttl(zloop_t *loop, void *socket, void *args);
static int s_send_hugz(zloop_t *loop, void *socket, void *args);
static int s_new_master(zloop_t *loop, void *unused, void *args);
static int s_new_slave(zloop_t *loop, void *unused, void *args);
static int s_subscriber(zloop_t *loop, void *socket, void *args);
// Атрибуты сервера
typedef struct {
zctx_t *ctx; // Контекст
zhash_t *kvmap; // Хранение пар ключ-значение
bstar_t *bstar; // Ядро bstar реактора
int64_t sequence; // Номер события обновления
int port; // Основной порт
int peer; // Порт партнера
void *publisher; // Порт публикации событий обновления
void *collector; // Порт сбора событий обновления от клиентов
void *subscriber; // Порт подписки на события обновления от партнеров
zlist_t *pending; // Список отложенных событий обновления
Bool primary; // Является ли основным сервером
Bool master; // Является ли мастером
Bool slave; // Является ли слейвом
} clonesrv_t;
``````c
int main(int argc, char *argv[]) {
clonesrv_t *self = (clonesrv_t *)zmalloc(sizeof(clonesrv_t));
if (argc == 2 && streq(argv[1], "-p")) {
zclock_log("I: Запущен как основной сервер мастер, ожидаю подключения слейва.");
self->bstar = bstar_new(BSTAR_PRIMARY, "tcp://*:5003", "tcp://localhost:5004");
bstar_voter(self->bstar, "tcp://*:5556", ZMQ_ROUTER, s_snapshots, self);
self->port = 5556;
self->peer = 5566;
self->primary = TRUE;
} else if (argc == 2 && streq(argv[ Yöntem: Запущен как слейв, ожидаю подключения мастера.
self->bstar = bstar_new(BSTAR_BACKUP, "tcp://*:5004", "tcp://localhost:5003");
bstar_voter(self->bstar, "tcp://*:5566", ZMQ_ROUTER, s_snapshots, self);
self->port = 5566;
self->peer = 5556;
self->primary = FALSE;
} else {
printf("Использование: clonesrv4 { -p | -b }\n");
free(self);
exit(0);
}
// Хост станет мастером
if (self->primary) {
self->kvmap = zhash_new();
}
self->ctx = zctx_new();
self->pending = zlist_new();
bstar_set_verbose(self->bstar, TRUE);
// Устанавливаем сокет сервера клонирования
self->publisher = zsocket_new(self->ctx, ZMQ_PUB);
self->collector = zsocket_new(self->ctx, ZMQ_SUB);
zsocket_bind(self->publisher, "tcp://*:%d", self->port + 1);
zsocket_bind(self->collector, "tcp://*:%d", self->port + 2);
// Подключаемся как клиент клонирования к партнеру
self->subscriber = zsocket_new(self->ctx, ZMQ_SUB);
zsocket_connect(self->subscriber, "tcp://localhost:%d", self->peer + 1);
// Регистрируем обработчики событий состояния
bstar_new_master(self->bstar, s_new_master, self);
bstar_new_slave(self->bstar, s_new_slave, self);
// Регистрируем обработчики других событий реактора bstar
zloop_reader(bstar_zloop(self->bstar), self->collector, s_collector, self);
zloop_timer(bstar_zloop(self->bstar), 1000, 0, s_flush_ttl, self);
zloop_timer(bstar_zloop(self->bstar), 1000, 0, s_send_hugz, self);
// Запускаем реактор bstar
bstar_start(self->bstar);
// Прерывание, завершение.
while (zlist_size(self->pending)) {
printf("Использование: clonesrv4 { -p | -b }\n");
free(self);
exit(0);
}
// Хост станет мастером
if (self->primary) {
self->kvmap = zhash_new();
}
self->ctx = zctx_new();
self->pending = zlist_new();
bstar_set_verbose(self->bstar, TRUE);
// Устанавливаем сокет сервера клонирования
self->publisher = zsocket_new(self->ctx, ZMQ_PUB);
self->collector = zsocket_new(self->ctx, ZMQ_SUB);
zsocket_bind(self->publisher, "tcp://*:%d", self->port + 1);
zsocket_bind(self->collector, "tcp://*:%d", self->port + 2);
// Подключаемся как клиент клонирования к партнеру
self->subscriber = zsocket_new(self->ctx, ZMQ_SUB);
zsocket_connect(self->subscriber, "tcp://localhost:%d", self->peer + 1);
// Регистрируем обработчики событий состояния
bstar_new_master(self->bstar, s_new_master, self);
bstar_new_slave(self->bstar, s_new_slave, self);
// Регистрируем обработчики других событий реактора bstar
zloop_reader(bstar_zloop(self->bstar), self->collector, s_collector, self);
zloop_timer(bstar_zloop(self->bstar), 1000, 0, s_flush_ttl, self);
zloop_timer(bstar_zloop(self->bstar), 1000, 0, s_send_hugz, self);
// Запускаем реактор bstar
bstar_start(self->bstar);
// Прерывание, завершение.
while (zlist_size(self->pending)) {
``````markdown
kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop(self->pending);
kvmsg_destroy(&kvmsg);
}
zlist_destroy(&self->pending);
bstar_destroy(&self->bstar);
zhash_destroy(&self->kvmap);
zctx_destroy(&self->ctx);
free(self);
return 0;
}
// ---------------------------------------------------------------------
// Отправка содержимого снимка
static int s_send_single(char *key, void *data, void *args);
// Информация запросчика
typedef struct {
void *socket; // ROUTER сокет
zframe_t *identity; // Идентификатор запросчика
char *subtree; // Поддерево
} kvroute_t;
static int
s_snapshots(zloop_t *loop, void *snapshot, void *args) {
clonesrv_t *self = (clonesrv_t *) args;
zframe_t *identity = zframe_recv(snapshot);
if (identity) {
// Запрос находится во второй фрейме сообщения
char *request = zstr_recv(snapshot);
char *subtree = NULL;
if (streq(request, "ICANHAZ?")) {
free(request);
subtree = zstr_recv(snapshot);
}
else {
printf("E: Ошибочный запрос, завершение работы.\n");
if (subtree) {
// Отправка состояния
kvroute_t routing = { snapshot, identity, subtree };
zhash_foreach(self->kvmap, s_send_single, &routing);
// Отправка сообщения о завершении, а также номера сообщения
zclock_log("I: Отправка состояния, версия: %d", (int) self->sequence);
zframe_send(&identity, snapshot, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new(self->sequence);
kvmsg_set_key(kvmsg, "KTHXBAI");
kvmsg_set_body(kvmsg, (byte *) subtree, 0);
kvmsg_send(kvmsg, snapshot);
kvmsg_destroy(&kvmsg);
free(subtree);
}
}
}
return 0;
}
// Каждый раз отправляем пару ключ-значение состояния
static int
s_send_single(char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *) args;
kvmsg_t *kvmsg = (kvmsg_t *) data;
if (strlen(kvroute->subtree) <= strlen(kvmsg_key(kvmsg)))
memcmp(kvroute->subtree,
kvmsg_key(kvmsg), strlen(kvroute->subtree)) == 0) {
// Сначала отправляем адрес получателя
zframe_send(&kvroute->identity,
kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
kvmsg_send(kvmsg, kvroute->socket);
}
return 0;
}
// ---------------------------------------------------------------------
// Сбор обновлений от клиентов
// Если мы мастер, записываем событие в объект kvmap;
// Если мы слейв, записываем его в очередь задержек
static int s_was_pending(clonesrv_t *self, kvmsg_t *kvmsg);
static int
s_collector(zloop_t *loop, void *collector, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
kvmsg_t *kvmsg = kvmsg_recv(collector);
kvmsg_dump(kvmsg);
if (kvmsg) {
if (self->master) {
kvmsg_set_sequence(kvmsg, ++self->sequence);
kvmsg_send(kvmsg, self->publisher);
int ttl = atoi(kvmsg_get_prop(kvmsg, "ttl"));
if (ttl)
kvmsg_set_prop(kvmsg, "ttl",
"%" PRId64, zclock_time() + ttl * 1000);
kvmsg_store(&kvmsg, self->kvmap);
zclock_log("I: Публикация обновления события: %d", (int) self->sequence);
}
else {
// Если мы уже получили это событие от мастера, отбрасываем сообщение
if (s_was_pending(self, kvmsg))
```markdown
kvmsg_destroy(&kvmsg);
} else {
zlist_append(self->pending, kvmsg);
}
}
}
return 0;
}
// Если сообщение уже находится в списке задержек, то удаляем его и возвращаем TRUE
static int
s_was_pending(clonesrv_t *self, kvmsg_t *kvmsg)
{
kvmsg_t *held = (kvmsg_t *) zlist_first(self->pending);
while (held) {
if (memcmp(kvmsg_uuid(kvmsg), kvmsg_uuid(held), sizeof(uuid_t)) == 0) {
zlist_remove(self->pending, held);
return TRUE;
}
held = (kvmsg_t *) zlist_next(self->pending);
}
return FALSE;
}
// ---------------------------------------------------------------------
``````markdown
// Удаление мгновенного значения с истекшим сроком действия
static int s_flush_single(char *key, void *data, void *args);
static int
s_flush_ttl(zloop_t *loop, void *unused, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
zhash_foreach(self->kvmap, s_flush_single, args);
return 0;
}
// Если ключ-значение истекло, то выполняется удаление и публикуется событие
static int
s_flush_single(char *key, void *data, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
kvmsg_t *kvmsg = (kvmsg_t *) data;
int64_t ttl;
sscanf(kvmsg_get_prop(kvmsg, "ttl"), "%" PRId64, &ttl);
if (ttl && zclock_time() >= ttl) {
kvmsg_set_sequence(kvmsg, ++self->sequence);
kvmsg_set_body(kvmsg, (byte *) "", 0);
kvmsg_send(kvmsg, self->publisher);
kvmsg_store(&kvmsg, self->kvmap);
zclock_log("I: Удаление события отправлено: %d", (int) self->sequence);
}
return 0;
}
// ---------------------------------------------------------------------
// Отправка пульса
static int
s_send_hugz(zloop_t *loop, void *unused, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
kvmsg_t *kvmsg = kvmsg_new(self->sequence);
kvmsg_set_key(kvmsg, "HUGZ");
kvmsg_set_body(kvmsg, (byte *) "", 0);
kvmsg_send(kvmsg, self->publisher);
kvmsg_destroy(&kvmsg);
return 0;
}
// ---------------------------------------------------------------------
// Обработчик событий изменения состояния
// Мы переходим в состояние master
//
// Второстепенный сервер сначала обновляет события из списка задержек в свой снимок,
// а затем начинает принимать запросы на снимки от клиентов.
static int
s_new_master(zloop_t *loop, void *unused, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
self->master = True;
self->slave = False;
zloop_cancel(bstar_zloop(self->bstar), self->subscriber);
// Применение событий из списка задержек
while (zlist_size(self->pending)) {
kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop(self->pending);
kvmsg_set_sequence(kvmsg, ++self->sequence);
kvmsg_send(kvmsg, self->publisher);
kvmsg_store(&kvmsg, self->kvmap);
zclock_log("I: Отправка события из списка задержек: %d", (int) self->sequence);
return 0;
}
static int s_new_slave(zloop_t *loop, void *unused, void *args) { clonesrv_t *self = (clonesrv_t *) args; zhash_destroy(&self->kvmap); self->master = False; self->slave = True; zloop_reader(bstar_zloop(self->bstar), self->subscriber, s_subscriber, self); return 0; }
static int s_subscriber(zloop_t *loop, void *subscriber, void *args) { clonesrv_t *self = (clonesrv_t *) args; // Получение снимка, если это необходимо if (self->kvmap == NULL) { self->kvmap = zhash_new(); void *snapshot = zsocket_new(self->ctx, ZMQ_DEALER); zsocket_connect(snapshot, "tcp://localhost:%d", self->peer); zclock_log("I: Запрос снимка: tcp://localhost:%d", self->peer); zstr_send(snapshot, "ICANHAZ?"); while (True) { kvmsg_t *kvmsg = kvmsg_recv(snapshot); if (!kvmsg) { break; // Прерывание } if (streq(kvmsg_key(kvmsg), "KTHXBAI")) { self->sequence = kvmsg_sequence(kvmsg); kvmsg_destroy(&kvmsg); break; // Завершение } kvmsg_store(&kvmsg, self->kvmap); } zclock_log("I: Получен снимок, версия: %d", (int) self->sequence); zsocket_destroy(self->ctx, snapshot); } // Поиск и удаление kvmsg_t *kvmsg = kvmsg_recv(subscriber); if (!kvmsg) { return 0; } if (strneq(kvmsg_key(kvmsg), "HUGZ")) { if (!s_was_pending(self, kvmsg)) {``````markdown // Если событие обновления master поступает раньше, чем событие клиента, то событие master добавляется в список задержек, // и при получении события обновления от клиента, оно удаляется из списка. zlist_append(self->pending, kvmsg_dup(kvmsg)) } // Если событие обновления имеет большую последовательность, чем версия kvmap, применяется это событие if (kvmsg_sequence(kvmsg) > self->sequence) { self->sequence = kvmsg_sequence(kvmsg) kvmsg_store(&kvmsg, self->kvmap) zclock_log("I: Получено событие обновления: %d", (int) self->sequence) } else kvmsg_destroy(&kvmsg) } else kvmsg_destroy(&kvmsg) return 0 }
Этот программный код состоит всего из нескольких сотен строк, но всё равно занял некоторое время для отладки. В этой модели включены механизмы восстановления после сбоев, мгновенные значения, поддеревья и прочее. Несмотря на то, что мы тщательно спроектировали его заранее, отладка между несколькими сокетами всё ещё представляет собой сложную задачу. Ниже приведён мой подход к работе:
```* Используя реактор (bstar, основанный на zloop), мы значительно сократили объём кода, сделав программу более понятной. Вся служба работает в одном потоке, поэтому нет проблем с межпоточным взаимодействием. Достаточно передать указатель на структуру (self) всем обработчикам. Кроме того, использование реактора позволяет сделать код более модульным и удобным для переиспользования.
* Мы отладывали каждый модуль по отдельности, переходя к следующему только тогда, когда текущий работал корректно. Поскольку используется четыре-пять сокетов, это увеличивает объём работы по отладке. Я выводил информацию об отладке непосредственно на экран, так как не было необходимости использовать отдельный отладочный инструмент.
* Поскольку я постоянно использовал инструмент valgrind для тестирования, я уверен, что программа не имеет утечек памяти. В C-языке утечки памяти являются серьёзной проблемой, поскольку нет автоматической системы сбора мусора. Правильное использование абстракций, таких как kvmsg и czmq, помогает избежать утечек памяти.
В этом коде, конечно, могут быть ошибки, и некоторые читатели могут помочь мне в их отладке и исправлении, за что я благодарен.При тестировании модели 6 сначала запускаются основной и резервный серверы, а затем группа клиентов. Порядок запуска может быть произвольным. Случайным образом прекращается работа одного из сервисных процессов; если дизайн программы правильный, клиенты должны получать согласованные данные.#### Клонирование протокола
После затраченных усилий на создание надёжной модели публикации-подписки, мы хотим, чтобы её можно было легко расширять в будущем. Лучшим способом является создание протокола, который можно реализовать на различных языках.
Мы называем его "протоколом распределённой хэш-таблицы", это механизм управления хэш-таблицами в распределённых системах, предоставляющий возможности для многоклиентского взаимодействия; клиенты могут работать только с поддеревьями данных, включая обновление и установку мгновенных значений.
* http://rfc.zeromq.org/spec:12
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )