В первой главе мы попробовали несколько моделей коммуникации ZMQ: запрос-ответ, публикация-подписка и сокеты-трубопровод. В этой главе мы углубимся в изучение вещей, которые будут полезны при реальном использовании:
Содержание главы включает:
Символ Ø в слове ØMQ долгое время вызывал путаницу. С одной стороны, этот специальный символ снижает количество упоминаний ZMQ в Google и Twitter; с другой стороны, он может раздражать некоторых датчан, которые могут сказать, что Ø не является странным нулем.
Начиналось с того, что ZMQ означало "нулевое middleware", "нулевую задержку". Однако со временем это понятие приобрело новые значения: "нулевое управление", "нулевые затраты", "нулевой отход". В целом, ноль означает минимальность и простоту, что является философией этого проекта. Мы стремимся уменьшить сложность и повысить удобство использования.
Честно говоря, ZMQ немного обманывает ожидания. Но мы не собираемся извиняться за это, потому что эта концептуальная смена ни в коем случае не принесет вреда. ZMQ предоставляет API, похожее на BSD сокеты, но скрывает многие детали механизма обработки сообщений. Со временем вы привыкнете к этому и будете рады использовать его для программирования.
Сокеты фактически являются стандартным интерфейсом для сетевого программирования, и одна из причин, почему ZMQ так привлекательна, заключается в том, что она основана на стандартном API сокетов. Поэтому операции с сокетами ZMQ очень легко понять, и их жизненный цикл состоит из четырёх основных частей:* Создание и удаление сокетов: zmq_socket(), zmq_close()
Вот C-код:
void *mousetrap;
// Создаем сокет для ловли мышей
mousetrap = zmq_socket(context, ZMQ_PULL);
// Настроим сокет
int64_t jawsize = 10000;
zmq_setsockopt(mousetrap, ZMQ_HWM, &jawsize, sizeof(jawsize));
// Подключаем сокет к отверстию для мышей
zmq_connect(mousetrap, "tcp://192.168.55.221:5001");
// Ждем, пока вкусная мышь не придет
zmq_msg_t mouse;
zmq_msg_init(&mouse);
zmq_recv(mousetrap, &mouse, 0);
// Уничтожаем мышь
zmq_msg_close(&mouse);
// Уничтожаем сокет
zmq_close(mousetrap);
Обратите внимание, что сокет всегда является указателем пустого типа, а сообщение представляет собой структуру данных (о которой мы будем говорить ниже). Поэтому в C вы передаете сокет через переменную, а сообщение — через ссылку. Важно помнить, что все сокеты в ZMQ управляются самим ZMQ, а сообщения управляются программистом.
Создание, удаление и настройка сокетов похожи на работу с объектами, но помните, что ZMQ асинхронен и масштабируем, поэтому понимание его применения в сетях может потребовать времени.
zmq_bind()
, а другой — zmq_connect()
. Обычно узел, использующий zmq_bind()
, называется сервером и имеет постоянный сетевой адрес; узел, использующий zmq_connect()
, называется клиентом и имеет изменчивый адрес. Мы говорим о том, что сокет привязан к конечной точке, а другой сокет подключен к этой конечной точке. Конечная точка — это известный сетевой адрес.Связи ZMQ отличаются от традиционных TCP-соединений следующими особенностями:* Используются различные протоколы: inproc (внутри процесса), ipc (между процессами), tcp, pgm (multicast), epgm;zmq_connect()
клиентом, даже если конечная точка ещё не была привязана с помощью zmq_bind()
;zmq_accept()
, так как сокет автоматически начинает принимать соединения после привязки к конечной точке;zmq_bind (socket, "tcp://*:5555");
zmq_bind (socket, "tcp://*:9999");
zmq_bind (socket, "ipc://myserver.ipc");
Конечно, вы не можете привязываться к одному и тому же конечному устройству несколько раз, так как это вызовет ошибку.
Каждый раз, когда клиентский узел использует zmq_connect() для подключения к одному из вышеупомянутых конечных устройств, сервер автоматически создает соединение. ZMQ не ограничивает количество соединений. Кроме того, клиентский узел также может использовать один сокет для создания нескольких соединений одновременно.
В большинстве случаев, какой узел является сервером, а какой — клиентом, определяется на уровне сетевой архитектуры, а не на уровне потока сообщений. Однако есть некоторые особые случаи (например, повторная отправка сообщений после потери соединения), где использование одного и того же сокета для привязки и подключения может привести к различиям в поведении.
Поэтому, когда вы проектируете архитектуру, следует придерживаться принципа "сервер стабилен, клиент гибок", чтобы избежать ошибок.Сокеты имеют тип, и тип сокета определяет его поведение, правила отправки и приема сообщений и т.д. Вы можете соединять сокеты различных типов, например PUB-SUB комбинацию, которая называется публикация-подписка моделью. Другие комбинации также имеют свои названия моделей, которые будут подробно рассмотрены ниже.Именно благодаря возможности использования различных способов соединения сокетов, ØMQ обеспечивает базовую систему очередей сообщений. На этой основе можно строить более сложные устройства и маршрутизацию, что будет подробно рассмотрено ниже. В целом, ØMQ предоставляет вам набор компонентов для сборки и использования в вашей сетевой архитектуре.
Отправка и прием сообщений осуществляются с помощью функций zmq_send()
и zmq_recv()
. Хотя названия функций кажутся простыми, понимание их работы требует времени из-за отличий I/O модели ØMQ от традиционного протокола TCP.
Давайте рассмотрим различия между TCP сокетами и ØMQ сокетами в передаче данных:* ZMQ сокеты передают сообщения, а не байты (TCP) или фреймы (UDP). Сообщение представляет собой блок двоичных данных определённой длины. Мы подробнее поговорим о сообщениях ниже, так как это решение направлено на оптимизацию производительности и может быть сложно понять.
Поэтому, когда вы отправляете сообщение в сокет, оно может быть передано нескольким узлам, и соответственно, сокет будет принимать сообщения со всех установленных соединений. Метод zmq_recv()
использует алгоритм справедливого очередного обслуживания для выбора сообщения для получения.
При вызове метода zmq_send()
сообщение фактически не отправляется в сокет. Оно сохраняется в буферной очереди памяти, и отправляется асинхронно фоновым I/O потоком. Если всё работает правильно, этот процесс является неблокирующим. Поэтому, даже если метод zmq_send()
вернул значение, это не означает, что сообщение было отправлено. После инициализации сообщения с помощью метода zmq_msg_init_data()
, его нельзя использовать повторно или освободить, иначе I/O поток ZMQ будет считать, что он отправляет мусорные данные. Это распространённая ошибка для новичков, и мы подробнее рассмотрим правильное использование сообщений ниже.
Часто новички спрашивают, как использовать ZMQ для создания услуги. Можно ли создать HTTP-сервер с помощью ZMQ?
Ожидаемый ответ заключается в том, что мы используем обычные сокеты для передачи HTTP-запросов и ответов, и ZMQ-сокеты также могут выполнять эту задачу, причём быстрее и эффективнее.
К сожалению, ответ не так прост. ZMQ — это не просто инструмент для передачи данных, а новая архитектура, созданная поверх существующих протоколов связи. Его данные имеют специфическую структуру, которая не совместима с существующими протоколами, как показано на сравнении HTTP-запроса и запроса ZMQ, использующего одинаковый TCP/IP-протокол:
HTTP-запрос использует CR-LF (перенос строки) для разделения информационных кадров, тогда как ZMQ использует фиксированную длину для определения кадров:
Поэтому вы действительно можете использовать ZMQ для создания чего-то похожего на HTTP-протокол, но это уже не будет HTTP.
Однако, если кто-то спросит меня, как лучше использовать ZMQ для создания новой услуги, я могу дать хороший ответ: вы можете разработать свой собственный протокол связи, использовать ZMQ для подключения, предоставлять услуги и расширения на разных языках программирования, как локально, так и через удалённую передачу. Архитектура сети Mongrel2 Сэда Шо является отличным примером этого подхода.### Поток ввода/вывода
Мы упоминали, что ZMQ использует фоновый поток ввода/вывода для передачи сообщений. Один поток ввода/вывода достаточно для обслуживания нескольких сокетов, за исключением крайних случаев. Это то, что мы передаем при создании контекста:
void *context = zmq_init (1);
Одним из отличий ZMQ-приложений от традиционных приложений является то, что вам не нужно создавать соединение для каждого сокета. Одиночный ZMQ-сокет может обрабатывать все задачи отправки и приема. Например, если вам нужно отправить сообщение тысяче подписчиков, вы можете использовать один сокет; если вам нужно распределить задачи двадцати служебным процессам, вы также можете использовать один сокет; если вам нужно получать данные из тысячи веб-приложений, вы снова используете один сокет.Эта особенность может перевернуть подход к написанию сетевых приложений. В традиционных приложениях каждый процесс или поток имеет одно удалённое соединение, которое может обслуживать только один сокет. ZMQ позволяет вам отказаться от этой структуры и использовать один поток для выполнения всех задач, что делает расширение более простым.
API сокетов ZMQ предлагает множество моделей сообщений. Если вы знакомы с корпоративными системами передачи сообщений, эти модели могут показаться вам знакомыми. Однако для новичков ZMQ-сокеты могут вызвать удивление.
Давайте вспомним, что делает ZMQ для вас: он быстро и эффективно передаёт сообщения на другие узлы, где узлом может быть поток, процесс или другой компьютер; ZMQ предоставляет простой API сокета для приложений, не требуя знания реального протокола (внутри процесса, между процессами, TCP или широковещательная рассылка); когда узлы перемещаются, ZMQ автоматически подключается или переподключается; независимо от того, отправляете ли вы или принимаете сообщения, ZMQ помещает их в очередь, гарантируя, что процесс не будет завален памятью и что сообщения будут записаны на диск в нужное время; ZMQ обрабатывает исключения сокетов; все операции ввода/вывода выполняются в фоновом режиме; ZMQ не создаёт мёртвых потоков.### Поток ввода/вывода
Мы упоминали, что ZMQ использует фоновый поток ввода/вывода для передачи сообщений. Один поток ввода/вывода достаточно для обслуживания нескольких сокетов, за исключением крайних случаев. Это то, что мы передаем при создании контекста:
void *context = zmq_init (1);
ZMQ-приложения отличаются от традиционных тем, что вам не нужно создавать соединение для каждого сокета. Один ZMQ-сокет может обрабатывать все задачи отправки и приема. Например, если вам нужно отправить сообщение тысяче подписчиков, вы можете использовать один сокет; если вам нужно распределить задачи двадцати служебным процессам, вы также можете использовать один сокет; если вам нужно получать данные из тысячи веб-приложений, вы снова используете один сокет.
Эта особенность может перевернуть подход к написанию сетевых приложений. В традиционных приложениях каждый процесс или поток имеет одно удаленное соединение, которое может обслуживать только один сокет. ZMQ позволяет вам отказаться от этой структуры и использовать один поток для выполнения всех задач, что делает расширение более простым.
API сокетов ZMQ предлагает множество моделей сообщений. Если вы знакомы с корпоративными системами передачи сообщений, эти модели могут показаться вам знакомыми. Однако для новичков ZMQ-сокеты могут вызвать удивление.Давайте вспомним, что делает ZMQ для вас: он быстро и эффективно передает сообщения на другие узлы, где узлом может быть поток, процесс или другой компьютер; ZMQ предоставляет простой API сокета для приложений, не требуя знания реального протокола (внутри процесса, между процессами, TCP или широковещательная рассылка); когда узлы перемещаются, ZMQ автоматически подключается или переподключается; независимо от того, отправляете ли вы или принимаете сообщения, ZMQ помещает их в очередь, гарантируя, что процесс не будет завален памятью и что сообщения будут записаны на диск в нужное время; ZMQ обрабатывает исключения сокетов; все операции ввода/вывода выполняются в фоновом режиме; ZMQ не создает мертвых LOCKs. Сообщение-модели ZMQ представляют собой сочетание различных типов сокетов. Другими словами, чтобы понять сообщение-модели ZMQ, вам нужно понять типы сокетов ZMQ и то, как они работают вместе. Эта часть требует зубрежки.
Основные сообщение-модели ZMQ включают:
Разделение запросов и ответов соединяет группу серверов с группой клиентов для удаленного вызова процедур или распределения задач.
Публикация и подписка соединяет группу публикаторов с группой подписчиков для распространения данных.* Пipelined модель использует ввод/вывод для сборки нескольких узлов, что может создать несколько шагов или циклы, для построения параллельной обработки.
Исправленный вариант:
Трубопроводная модель использует ввод/вывод для сборки нескольких узлов, что может создать несколько шагов или циклов для построения параллельной обработки.Мы уже рассказали о этих моделях в первой главе, но есть ещё одна модель, предназначенная для тех, кто всё ещё считает ZMQ аналогичным TCP для соединений типа точка-к-точка:
Уникальное соединение соединяет два сокета один-к-одному, эта модель имеет мало применений, и мы увидим пример в конце этой главы.
Страница с описанием функции zmq_socket() содержит информацию обо всех сообщение-моделях, которая достаточно ясна, поэтому её стоит прочитать несколько раз. Мы рассмотрим содержание каждой модели сообщений и области их применения.
Вот допустимые пары связей сокетов-соединений:
Другие комбинации могут привести к непредсказуемым результатам и могут вернуть ошибку в будущих версиях ZMQ. Вы также можете изучить поведение этих типов сокетов через код.
Четыре основные сообщение-модели вышеупомянуты встроены в ZMQ и являются частью API, реализованного в C++ библиотеке ядра ZMQ, гарантирующей правильную работу. Если когда-нибудь Linux ядро примет ZMQ, эти основные модели будут включены.Над этими сообщения-моделями мы можем строить более сложные верхнеуровневые сообщения-модели. Эти модели можно написать на любом языке программирования; они не являются частью основных типов и не распространяются вместе с ZMQ, а используются только в ваших собственных приложениях или поддерживаются сообществом ZMQ.Одной из целей этого руководства является предоставление некоторых верхнеуровневых моделей сообщений, от простых (правильная обработка сообщений) до сложных (надежная публикация и подписка).
Единицей передачи в ZMQ является сообщение, то есть двоичный блок. Вы можете использовать любую сериализацию, такую как Google Protocol Buffers, XDR, JSON и т.д., чтобы преобразовать содержимое в сообщение ZMQ. Однако этот сериализатор должен быть удобным и быстрым, что зависит от вас. В памяти ZMQ-сообщение представляет структуру zmq_msg_t (каждый язык имеет своё представление). При работе с ZMQ-сообщениями в C следует учитывать следующие моменты:* Вам нужно создавать и передавать объекты типа zmq_msg_t, а не просто набор данных;
// Получение ZMQ-строки с сокета и преобразование её в строку C
static char *
s_recv (void *socket) {
zmq_msg_t message;
zmq_msg_init (&message);
zmq_recv (socket, &message, 0);
int size = zmq_msg_size (&message);
char *string = malloc (size + 1);
memcpy (string, zmq_msg_data (&message), size);
zmq_msg_close (&message);
string[size] = 0;
return (string);
}
// Преобразование строки C в ZMQ-строку и отправка её через сокет
static int
s_send (void *socket, char *string) {
int rc;
zmq_msg_t message;
zmq_msg_init_size (&message, strlen (string));
memcpy (zmq_msg_data (&message), string, strlen (string));
rc = zmq_send (socket, &message, 0);
assert (!rc);
zmq_msg_close (&message);
return (rc);
}
Вы можете расширить этот код, чтобы он поддерживал отправку и получение данных любой длины.Важно отметить, что после передачи объекта сообщения функцией zmq_send()
, его размер будет обнулен, поэтому невозможно отправить одно и то же сообщение дважды, а также получить содержимое уже отправленного сообщения. Если вы хотите отправить один и тот же объект сообщения дважды, вам нужно создать новый объект перед отправкой первого сообщения, используя функцию zmq_msg_copy()
. Эта функция не копирует содержимое сообщения, а лишь копирует ссылку. После этого вы можете снова отправить это сообщение (или любое количество раз, если произвели достаточное количество копий). Объект сообщения будет уничтожен, когда последняя ссылка на него будет освобождена. Поддержка ZMQ многокадровых сообщений позволяет сохранять несколько кадров сообщений в одном сообщении. Это широко используется в реальных приложениях, и мы подробно рассмотрим это в третьей главе.Что касается сообщений, есть несколько моментов, которые следует учитывать:
zmq_msg_close()
, за исключением случаев, когда язык автоматически освобождает объект сообщения при выходе переменной за область видимости.Еще раз: не используйте функцию zmq_msg_init_data()
без необходимости. Она предназначена для нулевых копий и может вызвать проблемы. В ZMQ есть много вещей, которые стоит изучить, поэтому сейчас не стоит беспокоиться о том, чтобы экономить несколько микросекунд.
В предыдущих примерах основной цикл программы выполнял следующие действия:
Мы пока не будем использовать zmq_poll(), а воспользуемся NOBLOCK (несинхронным) режимом для чтения сообщений из нескольких сокетов. Далее объединим примеры метеорологической службы и параллельной обработки:msreader: Чтение из нескольких сокетов на Cc // // Получение сообщений из нескольких сокетов // В этом примере используется функция recv в цикле // #include "zhelpers.h" int main (void) { // Подготовка контекста и сокетов void *context = zmq_init (1); // Подключение к распределителю задач void *receiver = zmq_socket (context, ZMQ_PULL); zmq_connect (receiver, "tcp://localhost:5557"); // Подключение к сервису погоды void *subscriber = zmq_socket (context, ZMQ_SUB); zmq_connect (subscriber, "tcp://localhost:5556"); zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "10001", 5); // Обработка сообщений, полученных из двух сокетов // Здесь мы обрабатываем сообщения от распределителя задач первыми while (1) { // Обработка ожидающих задач int rc; for (rc = 0; ! rc; ) { zmq_msg_t task; zmq_msg_init (&task); if ((rc = zmq_recv (receiver, &task, ZMQ_NOBLOCK)) == 0) { // Обработка задачи } zmq_msg_close (&task); } // Обработка ожидающих обновлений погоды for (rc = 0; ! rc; ) { zmq_msg_t update; zmq_msg_init (&update); if ((rc = zmq_recv (subscriber, &update, ZMQ_NOBLOCK)) == 0) { // Обработка обновления погоды } zmq_msg_close (&update); } // Нет сообщений, ждем 1 миллисекунду s_sleep (1); } // Программа не должна доходить до этой точки, но все равно выполняем правильную очистку zmq_close (receiver); zmq_close (subscriber); zmq_term (context); return 0; } ``````Одним из недостатков этого подхода является задержка в 1 миллисекунду до получения первого сообщения, что может создать проблемы в высоконагруженных программах. Кроме того, вам потребуется просмотреть функции, такие как `nanosleep()`, чтобы избежать увеличения количества циклов.
Пример повышает приоритет задачи-распределителя, и вы можете сделать улучшение, чередуя обработку сообщений, как это делается внутри ZMQ с помощью механизма справедливого очередного обслуживания.
Давайте рассмотрим, как можно использовать zmq_poll()
для реализации такой же функциональности:
Множественный сокетный опрос на C
//
// Получение сообщений с нескольких сокетов
// В этом примере используется функция zmq_poll()
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// Подключение к распределителю задач
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
// Подключение к службе обновления погоды
void *subscriber = zmq_socket (context, ZMQ_SUB);
zmq_connect (subscriber, "tcp://localhost:5556");
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);
// Инициализация объекта опроса
zmq_pollitem_t items[] = {
{ receiver, 0, ZMQ_POLLIN, 0 },
{ subscriber, 0, ZMQ_POLLIN, 0 }
};
// Обработка сообщений от двух сокетов
while (1) {
zmq_msg_t message;
zmq_poll (items, 2, -1);
if (items[0].revents & ZMQ_POLLIN) {
zmq_msg_init (&message);
zmq_recv (receiver, &message, 0);
// Обработка задачи
zmq_msg_close (&message);
}
if (items[1].revents & ZMQ_POLLIN) {
zmq_msg_init (&message);
zmq_recv (subscriber, &message, 0);
// Обработка обновлений погоды
zmq_msg_close (&message);
}
}
// Программа никогда не достигнет этой точки
zmq_close (receiver);
zmq_close (subscriber);
zmq_term (context);
return 0;
}
```### Обработка ошибок и сигнала ETERM
Механизм обработки ошибок в ZMQ основан на принципе быстрого отказа. Мы считаем, что процесс должен быть максимально уязвимым к внутренним ошибкам, но достаточно устойчивым к внешним атакам и ошибкам. Например, живые клетки распадаются при обнаружении внутренних проблем, но активно защищаются от внешних воздействий. В ZMQ программировании использование утверждений (assertions) очень распространено, как и защита клеточной мембраны. Если мы не можем определить, является ли ошибка внутренней или внешней, это указывает на дизайн-дефект, который следует исправить. В C-языке, если проверка аргумента в assert() не пройдет, программа немедленно завершится. В других языках можно использовать исключения для достижения того же эффекта.
Когда ZMQ обнаруживает проблему с внешней стороны, он возвращает ошибку вызывающему процессу. Если ZMQ не может восстановиться после ошибки, он не просто тихо отбрасывает сообщение. В некоторых случаях ZMQ также использует ассерт для внешних ошибок, что можно рассматривать как баг.На данный момент мы редко видим примеры обработки ошибок в C-коде. **В реальном коде следует обрабатывать ошибки для каждого вызова функции ZMQ**. Если вы не пишете на C, возможно, ваша библиотека ZMQ уже выполняет эту обработку. Однако на C вам придётся делать это самостоятельно. Вот некоторые общие методы обработки ошибок, начиная с POSIX:* Методы создания объектов возвращают NULL при неудаче;
* Другие методы возвращают 0 при успешном выполнении, и другое значение (обычно -1) при неудаче;
* Код ошибки можно получить из переменной errno или вызовом функции zmq_errno();
* Сообщение об ошибке можно получить с помощью функции zmq_strerror().
Существуют две ситуации, которые не следует рассматривать как ошибки:
* Когда поток использует NOBLOCK для вызова zmq_recv(), и нет доступных сообщений, этот метод вернет -1 и установит errno в EAGAIN;
* Когда поток вызывает zmq_term(), и другие потоки находятся в состоянии блокировки, эта функция завершает все операции, закрывает сокеты и делает возвращаемое значение -1 для блокирующих методов, устанавливая errno в ETERM.
Соблюдая эти правила, вы можете использовать ассерты в своих программах ZMQ:
```c
void *context = zmq_init (1);
assert (context);
void *socket = zmq_socket (context, ZMQ_REP);
assert (socket);
int rc;
rc = zmq_bind (socket, "tcp://*:5555");
assert (rc == 0);
В первых версиях программы я помещал вызовы функций непосредственно внутрь assert(), что было недопустимым, так как некоторые оптимизаторы могут удалить assert() из кода.
Давайте рассмотрим, как правильно завершить процесс. Возьмем пример с использованием режима трубопровода. Когда мы запускаем группу рабочих потоков в фоновом режиме, нам нужно будет завершить их после выполнения задач. Мы можем отправить сообщение самоубийства этим рабочим потокам, и это лучше сделать через сборщика результатов.Как соединить сборщик результатов с рабочими потоками? PUSH-PULL сокеты односторонние. Принцип ZMQ гласит: если требуется решение нового вопроса, следует использовать новый тип сокета. В этом случае мы используем паттерн публикация-подписка для отправки сообщения самоубийства:
Этот процесс не требует добавления большого количества кода:
void *control = zmq_socket (context, ZMQ_PUB);
zmq_bind (control, "tcp://*:5559");
...
// Отправка сигнала самоуничтожения worker'ам
zmq_msg_init_data (&message, "KILL", cq_length("KILL"));
zmq_send (control, &message, 0);
zmq_msg_close (&message);
Вот код worker-процесса, который открывает три сокета: PULL для получения задач, PUSH для отправки результатов и SUB для получения сигнала самоуничтожения, используя zmq_poll() для опроса:taskwork2: Параллельный worker задачи с сигналами самоуничтожения на C```c // // Pipeline Pattern - Worker Design 2 // Добавлен публикация-подписка потока сообщений для получения сообщений о самоуничтожении // #include "zhelpers.h"
int main (void) { void *context = zmq_init (1);
// Socket for receiving messages
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
// Socket for sending messages
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sender, "tcp://localhost:5558");
// Socket for receiving control messages
void *controller = zmq_socket (context, ZMQ_SUB);
zmq_connect (controller, "tcp://localhost:5559");
zmq_setsockopt (controller, ZMQ_SUBSCRIBE, "", 0);
// Handle received tasks or control messages
zmq_pollitem_t items [] = {
{ receiver, 0, ZMQ_POLLIN, 0 },
{ controller, 0, ZMQ_POLLIN, 0 }
};
// Process messages
while (1) {
zmq_msg_t message;
zmq_poll (items, 2, -1);
if (items [0].revents & ZMQ_POLLIN) {
zmq_msg_init (&message);
zmq_recv (receiver, &message, 0);
// Work
s_sleep (atoi ((char *) zmq_msg_data (&message)));
// Send result
zmq_msg_init (&message);
zmq_send (sender, &message, 0);
// Simple task progress indicator
printf (".");
fflush (stdout);
zmq_msg_close (&message);
}
// Any control command indicates suicide
if (items [1].revents & ZMQ_POLLIN)
break; // Exit loop
}
// Terminate program
zmq_close (receiver);
zmq_close (sender);
zmq_close (controller);
zmq_term (context);
return 0;
}
```c
//
// Пайплайн-модель - Сборщик результатов версия 2
// Добавлен публикационно-подписочный поток сообщений для отправки сигналов самоуничтожения worker'ам
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// Сокет для получения сообщений
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_bind (receiver, "tcp://*:5558");
// Сокет для отправки управления
void *controller = zmq_socket (context, ZMQ_PUB);
zmq_bind (controller, "tcp://*:5559");
// Ждать начала задач
char *string = s_recv (receiver);
free (string);
// Начать отсчёт времени
int64_t start_time = s_clock ();
// Подтверждать завершение 100 задач
int task_nbr;
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
char *string = s_recv (receiver);
free (string);
if ((task_nbr / 10) * 10 == task_nbr)
printf (":");
else
printf (".");
fflush (stdout);
}
printf ("Общее время выполнения: %d мс\n",
(int) (s_clock () - start_time));
// Отправить сигнал самоуничтожения worker'ам
s_send (controller, "KILL");
// Завершение
sleep (1); // Ждать завершения отправки
zmq_close (receiver);
zmq_close (controller);
zmq_term (context);
return 0;
}
В реальных условиях приложение должно корректно очищаться и завершаться при получении сигнала Ctrl-C или других сигналов, таких как SIGTERM. По умолчанию этот сигнал убивает процесс, что приводит к потере незавершенных сообщений, некорректному закрытию файлов и т.д.В языке C обработка сигнала выглядит следующим образом:
interrupt: Обработка Ctrl-C корректно в C
//
// Показывает, как обрабатывать Ctrl-C
//
#include <zmq.h>
#include <stdio.h>
#include <signal.h>
// ---------------------------------------------------------------------
// Обработка сигналов
//
// При запуске программы вызывается функция s_catch_signals();
// В цикле проверяется значение s_interrupted, если оно равно 1, то происходит выход из цикла;
// Очень полезно для zmq_poll().
static int s_interrupted = 0;
static void s_signal_handler (int signal_value)
{
s_interrupted = 1;
}
static void s_catch_signals (void)
{
struct sigaction action;
action.sa_handler = s_signal_handler;
action.sa_flags = 0;
sigemptyset (&action.sa_mask);
sigaction (SIGINT, &action, NULL);
sigaction (SIGTERM, &action, NULL);
}
int main (void)
{
void *context = zmq_init (1);
void *socket = zmq_socket (context, ZMQ_REP);
zmq_bind (socket, "tcp://*:5555");
s_catch_signals ();
while (1) {
// Блокирующее чтение останавливается при получении сигнала
zmq_msg_t message;
zmq_msg_init (&message);
zmq_recv (socket, &message, 0);
if (s_interrupted) {
printf ("W: Получено сообщение об остановке, программа завершена...\n");
break;
}
}
zmq_close (socket);
zmq_term (context);
return 0;
}
Этот программный код использует функцию s_catch_signals()
для перехвата сигналов, таких как Ctrl-C (SIGINT) и SIGTERM. При получении любого из этих сигналов функция устанавливает глобальную переменную s_interrupted
в значение 1. Ваша программа не остановится автоматически, вам потребуется выполнить явные действия по очистке и завершению работы.* Вызовите функцию s_catch_signals()
в начале программы для настройки перехвата сигналов;
zmq_recv()
, zmq_poll()
, zmq_send()
, при получении сигнала эти функции вернут значение EINTR;s_recv()
, обрабатывают это прерывание и возвращают NULL;s_interrupted
в значение 1.Пример кода:
s_catch_signals();
client = zmq_socket(...);
while (!s_interrupted) {
char *message = s_recv(client);
if (!message)
break; // Клавиша Ctrl-C была нажата
}
zmq_close(client);
Если вы не будете обрабатывать сигналы после вызова s_catch_signals()
, ваша программа станет невосприимчивой к Ctrl-C и SIGTERM.
Любая долгоживущая программа должна правильно управлять памятью, иначе она рано или поздно столкнётся с утечками памяти, что приведёт к её аварийному завершению. Если ваш язык программирования автоматически управляет памятью за вас, то вы можете считать себя везунчиком. Однако, если вы используете язык, такой как C/C++, вам придётся самостоятельно управлять памятью. В этом разделе мы рассмотрим инструмент под названием Valgrind, который позволяет отслеживать утечки памяти.
sudo apt-get install valgrind
* По умолчанию ZMQ будет генерировать предупреждения в valgrind. Чтобы отключить эти предупреждения, вы можете использовать опцию ZMQ_MAKE_VALGRIND_HAPPY при компиляции ZMQ:```
$ cd zeromq2
$ export CPPFLAGS=-DZMQ_MAKE_VALGRIND_HAPPY
$ ./configure
$ make clean; make
$ sudo make install
* Программа должна правильно обрабатывать Ctrl-C, особенно для долгоживущих процессов (например, очередей), иначе valgrind сообщит о всех распределённых участках памяти как об ошибках.
* Компилируйте программу с опцией `-DDEBUG`, чтобы valgrind мог указать конкретные участки кода, где происходит утечка памяти.
* Наконец, запустите valgrind следующим образом:
valgrind --tool=memcheck --leak-check=full someprog
После решения всех проблем вы увидите сообщение о том, что утечек памяти больше нет.```
==30536== ОШИБОЧНЫЙ СУММАРНЫЙ ОТЧЕТ: Yöntem 0 ошибок из 0 контекстов...
ZMQ-сообщение может содержать несколько фреймов, что очень распространено в реальных приложениях, особенно там, где используются "конверты". В этом разделе мы рассмотрим, как правильно отправлять и получать сообщения с несколькими фреймами.
Каждый фрейм сообщения с несколькими фреймами представляет собой структуру zmq_msg
. Это значит, что если вы отправляете сообщение с пятью фреймами, вам нужно будет обрабатывать пять структур zmq_msg
. Вы можете хранить эти фреймы в некоторой структуре данных или обрабатывать их по одному.
Пример кода для отправки сообщения с несколькими фреймами:
zmq_send(socket, &message, ZMQ_SNDMORE);
...
zmq_send(socket, &message, ZMQ_SNDMORE);
...
zmq_send(socket, &message, 0);
==30536== ОШИБОЧНЫЙ СУММАРНЫЙ ОТЧЕТ: 0 ошибок из 0 контекстов...Теперь рассмотрим, как принимать и обрабатывать эти сообщения. Этот код применим как для однофреймовых, так и для многофреймовых сообщений:
while (1) {
zmq_msg_t message;
zmq_msg_init(&message);
zmq_recv(socket, &message, 0);
// Обработка одного фрейма
zmq_msg_close(&message);
int64_t more;
size_t more_size = sizeof(more);
zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size);
if (!more)
break; // Достигнут последний фрейм
}
Что еще следует знать о сообщениях с несколькими фреймами:
zmq_poll()
, то при получении первого фрейма все остальные фреймы уже были получены;zmq_msg
;ZMQ_RCVMORE
сокета или нет, вы все равно получите все сообщения;Затем мы расширяем эту структуру, помещаем устройства на определённые позиции и продолжаем увеличивать количество узлов:
Устройства в ZMQ не имеют строгих правил дизайна, но обычно они содержат группу "передних" конечных точек и группу "задних" конечных точек. Устройства являются бессостоятельными, поэтому их можно широко развернуть в сети. Вы можете запустить устройство в отдельном потоке внутри процесса или запустить его непосредственно в процессе. Внутри ZMQ предоставляет базовые реализации устройств для использования.Устройства ZMQ могут использоваться для маршрутизации и адресации, предоставления услуг, управления очередями и других задач. Разные модели сообщений требуют различных типов устройств для построения сети. Например, в модели запрос-ответ можно использовать устройства очередей и абстрактные службы, а в модели публикация-подписка — потоковые устройства и тематические устройства.Преимущество устройств ZMQ перед другими промежуточными программами заключается в том, что вы можете размещать их в любой части сети и выполнять любые задачи, которые вам необходимы.
Часто требуется расширять модель публикация-подписка на различные типы сетей. Например, группа подписчиков находится в интернете, и вы хотите отправлять сообщения широковещательной рассылкой подписчикам в локальной сети, а подписчикам в интернете — через протокол TCP.
Для этого нужно создать простое устройство прокси-сервиса, которое будет служить мостом между публикаторами и внешними подписчиками. Это устройство имеет две конечные точки: одна соединена с публикаторами в локальной сети, а другая — с внешней сетью. Оно принимает сообщения от публикаторов и пересылает их внешним подписчикам.wuproxy: Прокси-сервис погодных обновлений на C```c // // Метеорологическая информация агента сервиса устройства // #include "zhelpers.h"
int main (void) { void *context = zmq_init (1);
// Подписка на метеорологическую информацию
void *frontend = zmq_socket (context, ZMQ_SUB);
zmq_connect (frontend, "tcp://192.168.55.210:5556");
// Передача метеорологической информации
void *backend = zmq_socket (context, ZMQ_PUB);
zmq_bind (backend, "tcp://10.1.1.0:8100");
// Подписка на все сообщения
zmq_setsockopt (frontend, ZMQ_SUBSCRIBE, "", 0);
// Передача сообщений
while (1) {
while (1) {
zmq_msg_t message;
int64_t more;
// Обработка всех фреймов сообщений
zmq_msg_init (&message);
zmq_recv (frontend, &message, 0);
size_t more_size = sizeof (more);
zmq_getsockopt (frontend, ZMQ_RCVMORE, &more, &more_size);
zmq_send (backend, &message, more ? ZMQ_SNDMORE : 0);
zmq_msg_close (&message);
if (!more)
break; // Достигнут последний фрейм
}
}
// Программа не достигнет этой точки, но всё равно должна корректно завершиться
zmq_close (frontend);
zmq_close (backend);
zmq_term (context);
return 0;
}
Заметим, что этот код корректно обрабатывает многофреймовые сообщения, передавая их целиком подписчику. Если при отправке мы не указываем опцию ZMQ_SNDMORE, то полученные сообщения могут оказаться поврежденными. При создании агента важно обеспечить правильную обработку многофреймовых сообщений, чтобы избежать потери данных.
#### Агент запрос-ответ
Давайте создадим небольшой агент очереди сообщений в режиме запрос-ответ.
В модели клиент/сервер "Привет, мир", один клиент и один сервер общаются друг с другом. Однако в реальных условиях нам может потребоваться, чтобы несколько клиентов общались с несколькими серверами. Ключевой проблемой здесь является то, что сервер должен быть бессостоятельным, то есть все состояние должно быть включено в одном запросе или храниться в других средствах, таких как база данных.
У нас есть два способа соединения нескольких клиентов с нескольким серверами. Первый — это прямое соединение клиента с несколькими серверами. Клиентский сокет может быть подключен к нескольким серверным сокетам, и его запросы будут распределяться между серверами с помощью балансировки нагрузки. Например, если у нас есть клиент, подключенный к трем серверам A, B и C, и клиент создаёт четыре запроса R1, R2, R3 и R4, то R1 и R4 будут обработаны сервером A, R2 — сервером B, а R3 — сервером C:
Эта конфигурация позволяет легко добавлять новых клиентов, но для добавления новых серверов потребуется изменение конфигурации каждого клиента. Если у вас 100 клиентов и вам нужно добавить три новых сервера, каждый из этих клиентов должен будет быть переустановлен, чтобы знать о существовании новых серверов.Конечно, такой подход не является оптимальным. Чем больше жёстко закреплённых модулей в сети, тем сложнее её масштабировать. Поэтому нам нужен модуль, расположенный между клиентами и серверами, который будет содержать всю информацию о топологии сети. В идеальном случае, мы должны иметь возможность добавлять или удалять клиентов или серверы без необходимости изменения конфигурации любого компонента. Давайте создадим такой компонент. Этот агент будет связан с двумя конечными точками: передней конечной точкой для подключения клиентов и задней конечной точкой для подключения сервера. Он будет использовать zmq_poll() для опроса этих двух сокетов, принимая сообщения и пересылая их. В устройстве не будет присутствовать очереди, так как ZMQ автоматически выполняет это в сокетах. При использовании REQ и REP сокетов сессия запрос-ответ строго синхронизирована. Клиент отправляет запрос, сервер принимает запрос и отправляет ответ, который затем принимает клиент. Если один из участников (клиент или сервер) сталкивается с проблемой (например, дважды подряд отправляет запрос), программа выдаст ошибку.Однако наш прокси-устройство должно быть асинхронным, хотя можно использовать `zmq_poll()` для одновременного обработки двух сокетов, но здесь явно нельзя использовать REP и REQ сокеты.
К счастью, у нас есть DEALER и ROUTER сокеты, которые могут выполнить эту задачу, обеспечивая асинхронную передачу сообщений. DEALER ранее назывался XREQ, а ROUTER — XREP, но в новом коде следует использовать названия DEALER/ROUTER. В третьей главе вы узнаете, как использовать DEALER и ROUTER сокеты для создания различных типов запрос-ответ моделей.
Давайте рассмотрим, как DEALER и ROUTER сокеты работают в устройстве.
Ниже приведена схема, описывающая модель запрос-ответ, где REQ и ROUTER взаимодействуют, а DEALER взаимодействует с REP. Между ROUTER и DEALER требуется пересылка сообщений:

Прокси-устройство запрос-ответ будет связывать два сокета к передней и задней части, чтобы они могли соединяться с клиентскими и серверными сокетами соответственно. Перед использованием устройства потребуется изменение кода клиента и сервера.
**rrclient: Запрос-ответ клиент на C**
```c
//
// Пример Hello world клиента
// Соединяет REQ сокет с конечной точкой tcp://localhost:5559
// Отправляет "Hello" серверу и ожидает ответ "World"
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// Сокет для связи с сервером
void *requester = zmq_socket (context, ZMQ_REQ);
zmq_connect (requester, "tcp://localhost:5559");
``````markdown
int request_nbr;
for (request_nbr = 0; request_nbr != 10; request_nbr++) {
s_send(requester, "Привет");
char *string = s_recv(requester);
printf("Получен ответ %d [%s]\n", request_nbr, string);
free(string);
}
zmq_close(requester);
zmq_term(context);
return 0;
}
Вот пример кода сервера:
rrserver: Запрос-ответ сервис на C```c // // Сервер для услуги Hello World // Подключает REP сокет к tcp://*:5560 // Принимает запросы Hello и возвращает ответы World // #include "zhelpers.h"
int main (void) { void *context = zmq_init(1);
// Сокет для связи с клиентами
void *responder = zmq_socket(context, ZMQ_REP);
zmq_connect(responder, "tcp://localhost:5560");
while (1) {
// Ожидание следующего запроса
char *string = s_recv(responder);
printf("Получен запрос: [%s]\n", string);
free(string);
// Выполнение некоторой "работы"
sleep(1);
// Возврат ответа
s_send(responder, "World");
}
// Программа не достигнет этой точки, но все равно следует очистить
zmq_close(responder);
zmq_term(context);
return 0;
}
**rrbroker: Request-reply брокер на C**
```c
//
// Простой агент запрос-ответ
//
#include "zhelpers.h"
int main (void)
{
// Подготовка контекста и сокетов
void *context = zmq_init(1);
void *frontend = zmq_socket(context, ZMQ_ROUTER);
void *backend = zmq_socket(context, ZMQ_DEALER);
zmq_bind(frontend, "tcp://*:5559");
zmq_bind(backend, "tcp://*:5560");
// Инициализация набора для опроса
zmq_pollitem_t items[] = {
{frontend, 0, ZMQ_POLLIN, 0},
{backend, 0, ZMQ_POLLIN, 0}
};
// Передача сообщений между сокетами
while (1) {
zmq_msg_t message;
int64_t more; // Обнаружение сообщений с несколькими фреймами
``` zmq_poll (items, 2, -1);
if (items [0].revents & ZMQ_POLLIN) {
while (1) {
// Обработка всех фреймов сообщения
zmq_msg_init (&message);
zmq_recv (frontend, &message, 0);
size_t more_size = sizeof (more);
zmq_getsockopt (frontend, ZMQ_RCVMORE, &more, &more_size);
zmq_send (backend, &message, more ? ZMQ_SNDMORE : 0);
zmq_msg_close (&message);
if (!more)
break; // Последний фрейм
}
}
if (items [1].revents & ZMQ_POLLIN) {
while (1) {
// Обработка всех фреймов сообщения
zmq_msg_init (&message);
zmq_recv (backend, &message, 0);
size_t more_size = sizeof (more);
zmq_getsockopt (backend, ZMQ_RCVMORE, &more, &more_size);
zmq_send (frontend, &message, more ? ZMQ_SNDMORE : 0);
zmq_msg_close (&message);
if (!more)
break; // Последний фрейм
}
}
}
// Программа не должна достигать этой точки, но все равно следует очистить
zmq_close (frontend);
zmq_close (backend);
zmq_term (context);
return 0;
}
Использование агента запрос-ответ позволяет сделать структуру клиент-сервер более гибкой: клиент не знает о существовании сервера, а сервер не знает о существовании клиента. Единственный стабильный компонент в сети — это промежуточный агент:
ZMQ предоставляет несколько встроенных устройств, однако большинство людей предпочитает самостоятельно писать эти устройства. Встроенные устройства включают:* QUEUE — может использоваться как агент для запросов-ответов;
Для запуска одного из этих устройств можно использовать функцию zmq_device()
, передавая ей два сокета:
zmq_device (ZMQ_QUEUE, frontend, backend);
Запуск QUEUE аналогичен добавлению агента для запросов-ответов в сети, достаточно создать привязанные или подключенные сокеты. Ниже приведён пример использования встроенного устройства:
msgqueue: Брокер очереди сообщений на C
//
// Простой брокер очереди сообщений
// Обеспечивает функциональность агента для запросов-ответов, но использует встроенное устройство
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// Клиентский сокет
void *frontend = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (frontend, "tcp://*:5559");
// Серверный сокет
void *backend = zmq_socket (context, ZMQ_DEALER);
zmq_bind (backend, "tcp://*:5560");
// Запуск встроенного устройства
zmq_device (ZMQ_QUEUE, frontend, backend);
// Программа не достигнет этой точки
zmq_close (frontend);
zmq_close (backend);
zmq_term (context);
return 0;
}
Встроенные устройства правильно обрабатывают ошибки, в то время как агенты, реализованные вручную, могут не содержать механизмов обработки ошибок. Поэтому, когда это возможно, лучше использовать встроенные устройства.Вы можете задать вопрос, как некоторые разработчики ZMQ: что произойдет, если я передам другие типы сокетов этим устройствам? Ответ: не делайте этого. Вы можете передавать различные типы сокетов, но результат будет странным. Так, для QUEUE следует использовать ROUTER/DEALER сокеты, для FORWARDER — SUB/PUB, а для STREAMER — PULL/PUSH.
Если вам нужны другие типы сокетов для ваших нужд, вам придётся самостоятельно реализовать устройства.
Многопоточное программирование (MT-программирование) с использованием ZMQ может стать настоящим удовольствием. При использовании ZMQ-сокетов в многопоточной среде вам не нужно беспокоиться о дополнительных вещах, просто позвольте им работать.
При многопоточном программировании с использованием ZMQ не требуется беспокоиться о мьютексах, блокировках или других факторах, связанных с параллельным программированием; единственное, что вам действительно нужно учитывать, это сообщения между потоками. Идеальное многопоточное программирование — это когда код легко писать и читать, можно использовать одну и ту же технологию на разных системах и языках, программа может работать на компьютерах с любым количеством ядер, не имеет состояния и не сталкивается со скоростными ограничениями.Если у вас есть многолетний опыт в многопоточном программировании и вы знаете, как использовать блокировки, семафоры, критические секции и другие механизмы для правильной работы кода (хотя ещё не учитывая его быстродействие), вы можете испытывать разочарование, потому что ZMQ изменит всё это. За три десятилетия развития параллельных приложений было установлено правило: не делитесь состоянием. Это как два пьяных человека, пытающихся поделиться одной бутылкой пива; если они не лучшие друзья, они скоро начнут драку. Когда к ним присоединяются ещё несколько пьяных людей, ситуация становится ещё хуже. Многопоточное программирование иногда напоминает ситуацию с пьяными людьми, пытающимися забрать бутылку пива.
Многопоточное программирование часто является мучительным процессом, особенно когда программа ломается из-за чрезмерной нагрузки. Вы можете не знать, почему это произошло. Некто написал статью "11 ошибочных мест в многопоточной программе", которая широко распространена в крупных компаниях. В ней перечислены некоторые из этих проблем: отсутствие синхронизации, неправильная зернистость, разделение чтения и записи, безблокировка, передача блокировок, конфликты приоритетов и так далее.Представьте себе, что в три часа дня, когда фондовый рынок работает в полную силу, ваша программа внезапно ломается из-за проблемы с блокировками. Какова будет ваша реакция? Поэтому, как программисты, мы вынуждены использовать более сложные механизмы для решения сложных многопоточных проблем.Некто сравнил многопоточные программы с основными опорами крупных компаний, но они часто оказываются самыми уязвимыми. Продукты, которые пытаются расширяться через сеть, обычно заканчивают провалом.
Как использовать ZMQ для многопоточного программирования, вот некоторые правила:
Не обращайтесь к одному и тому же набору данных из разных потоков. Если вам нужно использовать традиционные механизмы блокировки, это противоречит идеологии ZMQ. Единственным исключением является объект контекста ZMQ, который является потокобезопасным.
Создайте контекст ZMQ для каждого процесса и передайте его всем потокам, которые будут использовать протокол inproc для связи.* Вы можете рассматривать потоки как отдельные задачи, используя свои собственные контексты, но эти потоки не смогут использовать протокол inproc для связи друг с другом. Преимуществом этого подхода является возможность легкого перехода на выполнение программы в виде отдельных процессов в будущем.* Не передавайте объекты сокетов между различными потоками, так как они не являются потокобезопасными. Хотя теоретически это возможно, это потребует использования механизмов мьютексов и блокировок, что может замедлить и сделать ваше приложение более хрупким. Единственная разумная ситуация — это внутреннее использование механизма сборки мусора в библиотеках ZMQ некоторых языков программирования, где может происходить передача объектов сокетов. Когда вам требуется использовать два устройства в приложении, вы можете передавать объект сокета из одного потока в другой. Это может работать на начальных этапах, но рано или поздно приведет к случайным ошибкам. Поэтому следует открывать и закрывать сокеты в одном и том же потоке.*Если вы будете следовать этим правилам, вы обнаружите, что многопоточные программы легко могут быть разделены на несколько процессов. Логика программы может выполняться в потоках, процессах или даже на разных компьютерах, в зависимости от ваших потребностей.
ZMQ использует нативные механизмы потоков системы, а не какие-то "зелёные потоки". Преимуществом этого является то, что вам не нужно учиться новому API многопоточного программирования, и ваша программа будет хорошо интегрирована с целевой операционной системой. Вы можете использовать инструменты, такие как ThreadChecker от Intel, чтобы проследить за работой потоков. Недостатком является то, что если программа создаёт слишком много потоков (например, тысячи), это может привести к чрезмерной нагрузке на операционную систему.
Давайте рассмотрим пример, который сделает нашу службу Hello World более мощной. Исходная служба была однопоточной, и если количество запросов было невелико, это не вызывало проблем. Потоки ZMQ могут быстро работать на одном ядре, выполняя большое количество работы. Однако, что произойдёт, если одновременно поступят 10 000 запросов? В реальном мире мы запускаем несколько рабочих потоков, которые будут стараться принимать запросы от клиентов, обрабатывать их и возвращать ответы.Конечно, вы можете запустить несколько рабочих процессов для выполнения этой задачи, но запуск одного процесса всегда проще и легче управлять, чем несколькими. Кроме того, рабочие потоки, запущенные как потоки, занимают меньше полосы пропускания и имеют меньшую задержку.
Вот многопоточная версия службы Hello World:
mtserver: Многопоточная служба на C
//
// Многопоточная версия службы Hello World
//
#include "zhelpers.h"
#include <pthread.h>
static void *
worker_routine (void *context) {
// Подключение к сокету агента
void *receiver = zmq_socket (context, ZMQ_REP);
zmq_connect (receiver, "inproc://workers");
while (1) {
char *string = s_recv (receiver);
printf ("Получен запрос: [%s]\n", string);
free (string);
// Выполнение работы
sleep (1);
// Отправка ответа
s_send (receiver, "World");
}
zmq_close (receiver);
return NULL;
}
int main (void)
{
void *context = zmq_init (1);
// Сокет для связи с клиентами
void *clients = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (clients, "tcp://*:5555");
// Сокет для связи с рабочими процессами
void *workers = zmq_socket (context, ZMQ_DEALER);
zmq_bind (workers, "inproc://workers");
// Запуск пула рабочих процессов
int thread_nbr;
for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
pthread_t worker;
pthread_create (&worker, NULL, worker_routine, context);
}
// Запуск очереди устройств
zmq_device (ZMQ_QUEUE, clients, workers);
// Программа не достигнет этой точки, но все равно выполняет очистку
zmq_close (clients);
zmq_close (workers);
zmq_term (context);
return 0;
}
Все коды должны быть уже хорошо знакомы:* Сервер запускает группу worker-потоков, каждый worker создаёт REP сокет и обрабатывает полученные запросы. Worker-потоки работают как однопоточные службы, единственное отличие заключается в использовании протокола inproc вместо tcp и в том, что направление привязки-соединения поменялось местами.
Важно отметить, что в некоторых языках программирования создание потоков может быть не слишком удобным. POSIX предоставляет библиотеку pthreads, но в Windows требуется использование различных API. Мы рассмотрим, как упаковать многопоточный API, в третьей главе.
Пример "работы" представляет собой просто ожидание в течение одной секунды; любую операцию можно выполнить в worker-потоках, включая общение с другими узлами. Поток сообщений выглядит следующим образом: REQ-ROUTER-QUEUE-DEALER-REP.
Мы используем PAIR сокеты и протокол inproc.
mtrelay: Многопоточное ретранслирование на C
//
// Многопоточная синхронизация
//
#include "zhelpers.h"
#include <pthread.h>
static void *
step1 (void *context) {
// Подключение к шагу 2, чтобы сообщить, что готов
void *xmitter = zmq_socket (context, ZMQ_PAIR);
zmq_connect (xmitter, "inproc://step2");
printf ("Шаг 1 готов, уведомление шага 2...\n");
s_send (xmitter, "Готово");
zmq_close (xmitter);
return NULL;
}
static void *
step2 (void *context) {
// Запуск шага 1 и привязка к inproc сокету
void *receiver = zmq_socket (context, ZMQ_PAIR);
zmq_bind (receiver, "inproc://step2");
pthread_t thread;
pthread_create (&thread, NULL, step1, context);
// Ожидание сигнала
char *string = s_recv (receiver);
free (string);
zmq_close (receiver);
// Подключение к шагу 3, чтобы сообщить, что готов
void *xmitter = zmq_socket (context, ZMQ_PAIR);
zmq_connect (xmitter, "inproc://step3");
printf ("Шаг 2 готов, уведомление шага 3...\n");
s_send (xmitter, "Готово");
zmq_close (xmitter);
return NULL;
}
int main (void)
{
void *context = zmq_init (1);
// Запуск шага 2 и привязка к inproc сокету
void *receiver = zmq_socket (context, ZMQ_PAIR);
zmq_bind (receiver, "inproc://step3");
pthread_t thread;
pthread_create (&thread, NULL, step2, context);
// Ожидание сигнала
char *string = s_recv (receiver);
free (string);
zmq_close (receiver);
printf ("Тест успешен!\n");
zmq_term (context);
return 0;
}
Это типичный пример многопоточного программирования с использованием ZMQ:1. Два потока взаимодействуют через протокол inproc, используя один и тот же контекст;
Важно отметить, что этот код не может быть расширен для координации между несколькими процессами. Если вы используете протокол inproc, можно создать только очень тесно связанные приложения. Этот метод можно использовать в случае, если требуется строго контролировать время задержки. Для других приложений каждый поток может использовать один и тот же контекст, а протоколы ipc или tcp могут быть выбраны для этого. В этом случае вы сможете свободно разделить приложение на несколько процессов или даже на несколько компьютеров.
Это первый раз, когда мы используем сокеты типа PAIR. Почему использовать PAIR? Можно было бы использовать и другие типы сокетов, но они имеют некоторые недостатки, влияющие на коммуникацию между потоками:* Вы можете использовать PUSH для отправителя и PULL для получателя, что может показаться подходящим, но важно помнить, что сокет PUSH распределяет нагрузку, поэтому если вы случайно запустите два получателя, вы "потеряете" половину сообщений. В то время как сокеты PAIR устанавливают однозначное соединение, которое является эксклюзивным.* Вы можете использовать DEALER для отправителя и ROUTER для получателя. Однако сокет ROUTER добавляет адрес источника в начало сообщения, что может превратить нулевое сообщение в несколько частей. Если вам это не важно и вы не будете повторно читать этот сокет, это может быть приемлемым решением. Однако если вы хотите использовать этот сокет для получения реальных данных, вы обнаружите, что сообщения, предоставляемые ROUTER, будут неверными. Что касается сокета DEALER, он также имеет механизм балансировки нагрузки, что делает его подверженным тем же рискам, что и сокет PUSH.
Исходя из всего вышеуказанного, использование сокетов типа PAIR для координации между потоками является наиболее подходящим решением.### Координация узлов
Когда вы хотите координировать узлы, сокеты PAIR не являются лучшим выбором, что отличает потоки от узлов. В общем, узлы могут входить и выходить свободно, в то время как потоки остаются более стабильными. При использовании сокетов PAIR, если удалённый узел отключается и затем снова подключается, сокеты PAIR этого не учитывают.
Второе различие заключается в том, что количество потоков обычно фиксировано, а количество узлов часто меняется. Давайте рассмотрим, как следует координировать узлы в модели метеорологической информации, чтобы гарантировать, что клиенты не потеряют первые сообщения.Вот логика выполнения программы:
Здесь мы будем использовать сокеты REQ-REP для синхронизации публикации и подписчиков. Код публикации представлен ниже:
syncpub: Синхронизированная публикация на C
//
// Публикация - синхронизированная версия
//
#include "zhelpers.h"
// Ожидание 10 подписчиков
#define SUBSCRIBERS_EXPECTED 10
int main (void)
{
void *context = zmq_init (1);
// Сокет для связи с клиентами
void *publisher = zmq_socket (context, ZMQ_PUB);
zmq_bind (publisher, "tcp://*:5561");
// Сокет для приема сигналов
void *syncservice = zmq_socket (context, ZMQ_REP);
zmq_bind (syncservice, "tcp://*:5562");
// Приём сигналов готовности от подписчиков
printf ("Ожидание готовности подписчиков\n");
int subscribers = 0;
while (subscribers < SUBSCRIBERS_EXPECTED) {
// - Ожидание сигнала готовности
char *string = s_recv (syncservice);
free (string);
// - Отправка ответа
s_send (syncservice, "");
subscribers++;
}
// Начало передачи 1 000 000 сообщений
printf ("Начало широковещательной передачи\n");
int update_nbr;
for (update_nbr = 0; update_nbr < 1000000; update_nbr++)
s_send (publisher, "Ревень");
}
``` s_send(publisher, "КОНЕЦ");
zmq_close(publisher);
zmq_close(syncservice);
zmq_term(context);
return 0;
}
Вот код подписчика:
syncsub: Синхронизированный подписчик на C```c // // Подписчик - синхронная версия // #include "zhelpers.h"
int main(void) { void *context = zmq_init(1);
// 1. Подключение SUB сокета
void *subscriber = zmq_socket(context, ZMQ_SUB);
zmq_connect(subscriber, "tcp://localhost:5561");
zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
// ZMQ работает слишком быстро, поэтому мы немного задерживаемся...
sleep(1);
// 2. Синхронизация с публикацией
void *syncclient = zmq_socket(context, ZMQ_REQ);
zmq_connect(syncclient, "tcp://localhost:5562");
// - Отправка запроса
s_send(syncclient, "");
// - Ожидание ответа
char *string = s_recv(syncclient);
free(string);
// 3. Обработка сообщений
int update_nbr = 0;
while (1) {
char *string = s_recv(subscriber);
if (strcmp(string, "END") == 0) {
free(string);
break;
}
free(string);
update_nbr++;
}
printf("Получено %d сообщений\n", update_nbr);
zmq_close(subscriber);
zmq_close(syncclient);
zmq_term(context);
return 0;
}
```sh
echo "Запуск подписчиков..."
for a in 1 2 3 4 5 6 7 8 9 10; do
syncsub &
done
echo "Запуск публика..."
syncpub
Результат выполнения скрипта:
Запуск подписчиков...
Запуск публика...
Получено 1000000 сообщений
Получено 1000000 сообщений
Получено 1000000 сообщений
Получено 1000000 сообщений
Получено 1000000 сообщений
Получено 1000000 сообщений
Получено 1000000 сообщений
Получено 1000000 сообщений
Получено 1000000 сообщений
Получено 1000000 сообщений
```При завершении запроса REQ-REP нельзя гарантировать успешное соединение SUB-сокета, если не использовать протокол inproc. Поскольку порядок внешних соединений не определён, в примерах программы используется метод sleep(1) для ожидания завершения соединений перед отправкой синхронизирующего запроса.
Более надёжная модель может выглядеть следующим образом:
* Публикатор открывает PUB-сокет и начинает отправлять сообщения "Hello" (не данные);
* Подписчики подключаются к SUB-сокетам и после получения сообщения "Hello" используют REQ-REP-сокеты для синхронизации;
* После получения всех синхронизирующих сообщений от подписчиков, публикатор начинает отправлять реальные данные.
### Нулевое копирование
В первой главе мы говорили, что нулевое копирование очень опасно, но это было просто предупреждение. Поскольку вы дошли до этого места, можно сказать, что у вас достаточно знаний, чтобы использовать нулевое копирование. Однако стоит помнить, что все пути ведут в ад, и слишком раннее оптимизирование программы может быть вредным. Другими словами, если вы не умеете правильно использовать нулевое копирование, это может сделать архитектуру программы хуже.API, предоставляемый ZMQ, позволяет вам напрямую отправлять и получать сообщения, не беспокоясь о кэшировании. Поскольку сообщения отправляются и принимаются ZMQ в фоновом режиме, использование нулевого копирования требует дополнительной работы.Для использования нулевого копирования создается сообщение с помощью функции `zmq_msg_init_data()`, которая указывает на уже выделенный участок памяти, а затем передается функции `zmq_send()`. При создании сообщения также требуется предоставить функцию для освобождения содержимого сообщения, которую ZMQ вызовет после завершения отправки сообщения. Ниже приведен простой пример, где предполагается, что выделенный участок памяти имеет размер 1000 байт:
```c
void my_free (void *data, void *hint) {
free (data);
}
// Отправка сообщения из буфера, который мы выделили, а ZMQ освободит за нас
zmq_msg_t message;
zmq_msg_init_data (&message, buffer, 1000, my_free, NULL);
zmq_send (socket, &message, 0);
При получении сообщений нулевое копирование использовать нельзя: ZMQ помещает полученное сообщение в область памяти для чтения, но не записывает его в заранее указанную область памяти программы.
Многоканальные сообщения ZMQ поддерживают нулевое копирование очень хорошо. В традиционных системах передачи сообщений вам приходится сохранять содержимое из различных буферов в один общий буфер, прежде чем отправить его. Однако ZMQ собирает содержимое из различных областей памяти в одну фрейм сообщения и отправляет её. Внутри ZMQ сообщение передается как единое целое, что делает процесс очень эффективным.
Эта возможность может оказаться полезной, хотя она не решает все проблемы, но позволяет компенсировать некоторые потери, особенно в многопубликационных-подписках моделях. Давайте обсудим это подробнее.
Здесь два сокета активно передают метеорологическую информацию:
Если принимающая сторона (SUB, PULL, REQ) указывает идентификатор сокета, когда они отключаются от сети, отправляющая сторона (PUB, PUSH, REP) будет хранить информацию для них до достижения порогового значения (HWM). При этом отправляющая сторона не обязана иметь идентификатор сокета.
Важно отметить, что буферы сокетов ZMQ невидимы для программиста, как и буферы TCP.
До настоящего времени мы использовали одноразовые сокеты. Для преобразования одноразовых сокетов в постоянные необходимо задать им идентификатор сокета. У всех сокетов ZMQ есть идентификатор, который генерируется автоматически UUID.Внутри ZMQ, когда два сокета соединяются, они обмениваются своими идентификаторами. Если один из них не имеет ID, он генерирует его самостоятельно для идентификации другого:
Однако сокеты также могут сообщать свои идентификаторы друг другу, поэтому при повторном подключении они узнают друг друга.
+-----------+
| |
| Отправитель|
| |
+-----------+
| Сокет |
\-----------/
^ "Люси! Рад снова тебя видеть..."
|
|
| "Мое имя Люси"
/-----+-----\
| Сокет |
+-----------+
| |
| Получатель|
| |
+-----------+
Рисунок # - Устойчивый сокет
Ниже приведена строка кода, которая позволяет установить идентификатор для сокета, создавая тем самым устойчивый сокет:
zmq_setsockopt (socket, ZMQ_IDENTITY, "Lucy", 4);
Еще несколько замечаний относительно идентификаторов сокетов:
Мы уже кратко рассмотрели многофреймовые сообщения, теперь давайте посмотрим на их типичное применение — обертку сообщений. Обертка представляет собой способ указания источника сообщения без изменения его содержимого.
В модели Publish-Subscribe обертка содержит информацию о подписке, которую можно использовать для фильтрации ненужных сообщений.
Если вы хотите использовать обертку Publish-Subscribe, вам придется самостоятельно создать и установить её. Это действие является опциональным, и мы не использовали её в наших предыдущих примерах. Использование обёртки в модели Publish-Subscribe может быть сложным, но это действительно необходимо в реальных приложениях, поскольку обёртка и сообщение — это две отдельные данные.
Это пример сообщения с обёрткой в модели Publish-Subscribe:
Помните, что в модели Publish-Subscribe сообщения принимаются на основе информации о подписке, то есть префикса сообщения. Размещение этого префикса в отдельном фрейме сообщения делает соответствие очевидным. Так как нет возможности, чтобы приложение случайно совпадало только с частью данных. Вот пример простого публика-подписки сообщения с использованием конверта. Публикатор отправляет два типа сообщений: A и B, указывая тип сообщения в конверте:psenvpub: Публика-подписка публикатор на C
//
// Публика-подписка сообщение конверт - публикатор
// Функция s_sendmore() также предоставлена zhelpers.h
//
#include "zhelpers.h"
int main (void)
{
// Подготовка контекста и публикаторского сокета
void *context = zmq_init (1);
void *publisher = zmq_socket (context, ZMQ_PUB);
zmq_bind (publisher, "tcp://*:5563");
while (1) {
// Отправка двух сообщений, типа A и B
s_sendmore (publisher, "A");
s_send (publisher, "Мы не хотим видеть это");
s_sendmore (publisher, "B");
s_send (publisher, "Мы хотели бы видеть это");
sleep (1);
}
// Правильное завершение работы
zmq_close (publisher);
zmq_term (context);
return 0;
}
Предположим, что подписчик заинтересован только в сообщениях типа B:
psenvsub: Публика-подписка подписчик на C
//
// Публика-подписка сообщение конверт - подписчик
//
#include "zhelpers.h"
int main (void)
{
// Подготовка контекста и подписчика сокета
void *context = zmq_init (1);
void *subscriber = zmq_socket (context, ZMQ_SUB);
zmq_connect (subscriber, "tcp://localhost:5563");
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "B", 1);
while (1) {
// Чтение конверта сообщения
char *address = s_recv (subscriber);
// Чтение содержимого сообщения
char *contents = s_recv (subscriber);
printf ("[%s] %s\n", address, contents);
free (address);
free (contents);
}
// Правильное завершение работы
zmq_close (subscriber);
zmq_term (context);
return 0;
}
При выполнении вышеуказанных программ, подписчик будет выводить следующую информацию:
[B] Мы хотели бы видеть это
[B] Мы хотели бы видеть это
[B] Мы хотели бы видеть это
[B] Мы хотели бы видеть это
...
```Этот пример показывает, что подписчик игнорирует неподписанные сообщения и получает полное многофреймовое сообщение — вы не получите только часть сообщения.
Если вы подписываетесь на несколько сокетов и хотите знать идентификаторы этих сокетов, чтобы отправить сообщения им через другой сокет (этот случай очень распространен), вы можете заставить публикатор создать сообщение с тремя фреймами:
### (Полу)постоянные подписчики и пороговые значения (HWM)
Все типы сокетов могут использовать идентификаторы. Если вы используете PUB и SUB сокеты, где SUB сокет сам себе указывает идентификатор, то при отключении SUB сокета, PUB будет хранить сообщения для отправки SUB.
Эта механика имеет как положительные, так и отрицательные стороны. Положительной стороной является то, что публикуемый сокет временно хранит эти сообщения, а когда подписчик восстанавливает соединение, отправляет их; отрицательной стороной является риск переполнения памяти публикуемого сокета.
**Если вы используете постоянный SUB сокет (то есть установили идентификатор для SUB сокета), вам следует избегать накопления сообщений в очереди публикуемого сокета и использовать пороговое значение (HWM) для защиты публикуемого сокета**. Пороговое значение публикуемого сокета влияет на всех подписчиков отдельно.Мы можем запустить пример, чтобы продемонстрировать это, используя `wuclient` и `wuserver` из первой главы. В `wuclient` добавьте следующую строку перед подключением сокета:
```c
zmq_setsockopt (subscriber, ZMQ_IDENTITY, "Привет", 6);
Скомпилируйте и запустите оба этих примера, всё кажется нормальным. Однако, если вы посмотрите на использование памяти публикуемого сокета, вы заметите, что оно постепенно увеличивается по мере того, как подписчики отключаются. Если вы затем перезапустите подписчика, вы заметите, что использование памяти больше не увеличивается. Как только подписчик снова отключится, использование памяти снова начнёт расти. Со временем это приведёт к исчерпанию системных ресурсов.
Давайте сначала рассмотрим, как устанавливать пороговые значения, а затем — как правильно их настроить. Ниже представлен код публикуемого и подписывающегося сокета, использующий вышеупомянутую "механизм координации узлов". Публикуемый сокет отправляет сообщение каждую секунду, и вы можете отключить подписчика, а затем перезапустить его, чтобы увидеть, что происходит.
Вот код публикуемого сокета:durapub: Устойчивый публикуемый сокет на C
//
// Публикатор - подключение устойчивого подписчика
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// Подписчик отправляет сообщение о готовности
void *sync = zmq_socket (context, ZMQ_PULL);
zmq_bind (sync, "tcp://*:5564");
// Используется этот сокет для публикации сообщений
void *publisher = zmq_socket (context, ZMQ_PUB);
zmq_bind (publisher, "tcp://*:5565");
// Ждем синхронизированное сообщение
char *string = s_recv (sync);
free (string);
// Отправляем cq 10 сообщений, одно каждую секунду
int update_nbr;
for (update_nbr = 0; update_nbr < 10; update_nbr++) {
char string [20];
sprintf (string, "Обновление %d", update_nbr);
s_send (publisher, string);
sleep (1);
}
s_send (publisher, "КОНЕЦ");
zmq_close (sync);
zmq_close (publisher);
zmq_term (context);
return 0;
}
Вот код подписчика:durasub: Устойчивый подписчик на C
//
// Устойчивый подписчик
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// Подключаем SUB сокет
void *subscriber = zmq_socket (context, ZMQ_SUB);
zmq_setsockopt (subscriber, ZMQ_IDENTITY, "Привет", 7);
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "", 0);
zmq_connect (subscriber, "tcp://localhost:5565");
// Отправляем синхронизационное сообщение
void *sync = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sync, "tcp://localhost:5564");
s_send (sync, "");
// Получаем обновления и завершаем работу по команде
while (1) {
char *string = s_recv (subscriber);
printf ("%s\n", string);
if (strcmp (string, "КОНЕЦ") == 0) {
free (string);
break;
}
free (string);
}
zmq_close (sync);
zmq_close (subscriber);
zmq_term (context);
return 0;
}
Запустите вышеуказанный код, откройте издателя и подписчика в разных окнах. Когда подписчик получает одно или несколько сообщений, нажмите Ctrl+C для прерывания, а затем перезапустите его, чтобы увидеть результат:
$ durasub
Обновление 0
Обновление 1
Обновление 2
^C
$ durasub
Обновление 3
Обновление 4
Обновление 5
Обновление 6
Обновление 7
^C
$ durasub
Обновление 8
Обновление 9
КОНЕЦ
```Как видно, единственное отличие устойчивого подписчика заключается в установке идентификатора для сокета, что позволяет издателю кэшировать сообщения до тех пор, пока соединение не будет восстановлено. Установка идентификатора позволяет преобразовать временный сокет в устойчивый. В реальной жизни вам следует аккуратно называть свои сокеты, используя конфигурационные файлы или генерируя UUID и сохраняя его.Когда мы задаем порог для PUB сокета, издатель начинает кэшировать определенное количество сообщений, а остальные сообщения будут отбрасываться. Давайте установим порог равным 2 и посмотрим, что произойдет:
```c
uint64_t hwm = 2;
zmq_setsockopt (publisher, ZMQ_HWM, &hwm, sizeof (hwm));
Запустите программу, прервите подписчика, подождите некоторое время и перезапустите его. Вы увидите следующий результат:
$ durasub
Обновление 0
Обновление 1
^C
$ durasub
Обновление 2
Обновление 3
Обновление 7
Обновление 8
Обновление 9
КОНЕЦ
Обратите внимание, что издатель сохранил только два сообщения (2 и 3). Порог заставляет ZMQ отбрасывать сообщения, выходящие за пределы очереди.
Кратко говоря, если вы используете устойчивого подписчика, вам необходимо задать порог для издателя, иначе сервер может столкнуться с проблемами из-за переполнения памяти. Однако есть еще один способ. ZMQ предоставляет механизм, известный как обмен (swap), который представляет собой файл на диске, используемый для хранения сообщений, выходящих за пределы очереди. Запуск этого механизма очень прост:
// Указание размера области обмена, единица измерения: байты.
uint64_t swap = 25000000;
zmq_setsockopt (publisher, ZMQ_SWAP, &swap, sizeof (swap));
Можно объединить вышеупомянутые методы, чтобы создать публикацию, которая будет принимать устойчивые сокеты и при этом не будет переполняться памятью:durapub2: Устойчивый, но циничный публикатор на C
//
// Публикатор — подключение к устойчивому подписчику
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// Подписчик сообщит нам, что он готов
void *sync = zmq_socket (context, ZMQ_PULL);
zmq_bind (sync, "tcp://*:5564");
// Используем этот сокет для отправки сообщений
void *publisher = zmq_socket (context, ZMQ_PUB);
// Предотвратим переполнение сообщений из-за медленного устойчивого подписчика
uint64_t hwm = 1;
zmq_setsockopt (publisher, ZMQ_HWM, &hwm, sizeof (hwm));
// Установим размер области обмена для использования всеми подписчиками
uint64_t swap = 25000000;
zmq_setsockopt (publisher, ZMQ_SWAP, &swap, sizeof (swap));
zmq_bind (publisher, "tcp://*:5565");
// Ждем синхронизационного сообщения
char *string = s_recv (sync);
free (string);
// Отправляем 10 сообщений, одно каждую секунду
int update_nbr;
for (update_nbr = 0; update_nbr < 10; update_nbr++)
{
char string [20];
sprintf (string, "Обновление %d", update_nbr);
s_send (publisher, string);
sleep (1);
}
s_send (publisher, "КОНЕЦ");
zmq_close (sync);
zmq_close (publisher);
zmq_term (context);
return 0;
}
Если установить пороговое значение в реальных условиях равным 1, все ожидающие отправки сообщения будут сохранены на диск, что значительно замедлит процесс. Вот несколько типичных подходов к работе с различными подписчиками:* Необходимо установить пороговое значение для PUB сокета, которое можно определить на основе максимального количества подписчиков, доступной памяти для очередей и среднего размера сообщения. Например, если вы ожидаете 5000 подписчиков, имеете 1 ГБ памяти для использования и средний размер сообщения составляет около 200 байт, то разумное значение порога будет равно 1,000,000,000 / 200 / 5,000 = 1,000. Если вы не хотите потерять сообщения у медленных или зависших подписчиков, вы можете настроить обменник для хранения сообщений в часы пик. Размер обменника можно рассчитать на основе количества подписчиков, отношения пиков сообщений, среднего размера сообщения, временного хранения и других факторов. Например, если вы ожидаете 5000 подписчиков, размер сообщения составляет около 200 байт, а количество сообщений в секунду достигает 100 000, вам потребуется 100 МБ места на диске для хранения сообщений каждую секунду. В сумме это составит 6 ГБ места на диске, которое должно быть достаточно быстрым (что выходит за рамки данного руководства). О постоянных подписчиках:* Данные могут быть потеряны в зависимости от частоты публикации сообщений, размера сетевого кэша, протокола связи и т.д. Постоянные подписчики надежнее временных сокетов, но это не означает, что они идеальны.
О пороговых значениях:
Этот параметр влияет как на очередь отправки, так и на очередь приема сокета. Конечно, PUB и PUSH не имеют очередей приема, а SUB, PULL, REQ и REP не имеют очередей отправки. В случае с DEALER, ROUTER и PAIR сокетами, они имеют как очереди отправки, так и очереди приема.
Когда сокет достигает порогового значения, ZMQ может заблокировать его или просто отбросить сообщение.
При использовании протокола inproc отправитель и получатель используют общую очередь кэширования, поэтому реальный порог равен сумме пороговых значений обоих сокетов. Если один из сокетов не имеет установленного порогового значения, то он не будет иметь ограничений по кэшированию.
ZMQ — это как набор конструктора, если у вас есть достаточно воображения, вы можете использовать его для создания любой сети.Эта высокопроизводительная и гибкая архитектура обязательно расширит ваше понимание. Это не новое изобретение ZMQ; давно существуют такие потоково-ориентированные языки программирования, как Erlang, которые уже предоставляют подобные возможности. Однако ZMQ предлагает более удобный и легкий в использовании интерфейс.Как отметил Gonzo Diethelm: "Я бы хотел сказать одним предложением: 'Если бы ZMQ не существовало, его следовало бы изобрести.' Как человек с многолетним опытом работы в этой области, ZMQ очень меня вдохновляет. Я могу только сказать: 'Вот то, что я хочу!'"
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )