Слияние кода завершено, страница обновится автоматически
.- vim: set filetype=markdown:
.set GIT=https://github.com/anjuke/zguide-cn
# ZMQ Гид
**Автор: Пиетер Хинтжинс <ph@imatix.com>, CEO компании iMatix Corporation.**
**Перевод: Жань Чжан <jizhang@anjuke.com>, инженер компании Anjuke Group Haozhuwang**
Спасибо Биллу Дезмаре, Брайану Дорси, CAF, Дэниелу Лин, Эрику Дешранжу, Гонсало Диетхельму, Гвидо Гольдштейну, Хантеру Форду, Камилю Шакирову, Мартину Сустрику, Майклу Кастлмену, Навеену Чауле, Николаю Педуччи, Оливеру Смиту, Оливье Шаму, Петеру Александр, Пьеру Рулеау, Рэнди Дирбургу, Джону Уину, Алексу Томасу, Михаилу Минкову, Джереми Авенету, Майклу Комптону, Камилю Кишилу, Марку Харитонову, Гийому Оуберту, Иану Барберу, Майклу Шеридану, Фарук Акгюлю, Олегу Сидорову, Леву Гивону, Аллистеру МакЛоуду, Александру Д'Арчанжелу, Андреасу Хольцвиммеру, Хану Холлу, Роберту Джакабоски, Фелипе Крузу, Маркусу Маккарди, Михаилу Кулемину, доктору Герго Эрди, Павлу Жукову, Александру Элсе, Джованни Ругьеро, Рик "ТехноВини" и Дэниелу Лундину, Дэви Ховарду, Саймону Джеффорду, Бенджамину Петерсону, Джастину Кейсу, Девону Веллеру, Ричарду Смиту, Александру Морланду, Вадиму Грасзе, Майклу Яклу и Зеду Шоу за их вклад, а также Статису Сидерису за [Ditaa](http://www.ditaa.org).
Пожалуйста, используйте [трекер ошибок](https://github.com/imatix/zguide/issues) для всех комментариев и исправлений. Эта версия охватывает последнюю стабильную версию 0MQ и была опубликована &date("ddd d mmmm, yyyy").Гайд представлен главным образом на [C](http://zguide.zeromq.org/page:all), но также доступен на [PHP](http://zguide.zeromq.org/php:all) и [Lua](http://zguide.zeromq.org/lua:all).
---
Эта работа лицензирована под лицензией [Creative Commons Attribution-ShareAlike 3.0](http://creativecommons.org/licenses/by-sa/3.0/).
## Глава 1 Основы ZeroMQ
### Спасение мира
Как объяснить ZMQ? Некоторые люди начинают с перечисления всех преимуществ ZMQ: это набор компонентов для быстрого создания сокетов; его почтовая система имеет мощные маршрутизационные возможности; он слишком быстрый! Другие предпочитают делиться моментами просветления, когда ZMQ внезапно сделал всё понятным и очевидным. Другие сравнивают ZMQ с другими продуктами: он меньше, проще, но при этом кажется знакомым. Для меня лично, я предпочитаю рассказывать историю создания ZMQ, которая, я уверен, найдёт отклик у читателей. Программирование — это наука, но часто выдаётся за искусство. Мы никогда не интересуемся самыми базовыми механизмами программного обеспечения, или, другими словами, никто этим не занимается. Программное обеспечение не сводится только к алгоритмам, структурам данных, языкам программирования или абстракциям; все эти элементы — лишь инструменты, которые мы создаем, используем и в конечном итоге отбрасываем. Настоящее существо программного обеспечения — это существо человека.Например, когда мы сталкиваемся с очень сложной проблемой, мы объединяем усилия, выполняем специализированные задачи и разделяем проблему на несколько частей, чтобы решить её вместе. Здесь проявляется научная сторона программирования: создание набора небольших модулей, которые легко понять и использовать, что позволяет людям работать вместе над решением проблемы.
Мы живём в мире, где всё взаимосвязано, и нам нужна современная программа для руководства нами. Поэтому будущие модули для решения больших вычислительных задач должны быть взаимосвязанными и способными к параллельной работе. В этом случае код программы больше не будет сосредоточен только на себе, он будет общаться друг с другом, становясь достаточно разговорчивым. Код программы должен работать как человеческий мозг: миллиарды нейронов передают сигналы со скоростью света в среде без центрального контроля, без точки отказа, чтобы решать проблемы. Это не должно нас удивлять, так как современные сети уже похожи на то, что каждый узел подключен к человеческому мозгу.Если вы работали с потоками, протоколами или сетями, вы могли бы сказать, что мои слова звучат как сказка. В реальном применении просто соединение нескольких программ или сетей может быть сложной задачей. Миллиарды узлов? Это невозможно представить. В настоящее время только крупные, богатые компании могут позволить себе такие программы и услуги.Сетевые структуры современного мира давно вышли за рамки нашего понимания. Восьмидесятые годы прошлого века были временем [программного кризиса](http://en.wikipedia.org/wiki/No_Silver_Bullet), о котором говорил Фред Брукс. Позже, свободное и открытое программное обеспечение решило этот кризис, позволяя эффективно делиться знаниями. Сегодня мы снова сталкиваемся с программным кризисом, но об этом говорят мало. Только крупные, богатые компании могут позволить себе создание сильно связанных приложений. Там есть облака, но они частные. Наши данные и знания исчезают с наших личных компьютеров и переходят в облака, где они недоступны или не могут конкурировать. Кто владеет нашими социальными сетями? Это действительно похоже на новую эру главных компьютеров. Пока что не будем вдаваться в политические аспекты, на которые можно было бы написать отдельную книгу. В настоящее время, хотя интернет позволяет соединять миллионы программ, большинство из нас не могут этого сделать. Таким образом, действительно интересные крупные проблемы (например, здравоохранение, образование, экономика, транспорт и т. д.), остаются нерешенными. Мы не можем связывать код таким образом, чтобы эффективно решать масштабные задачи, как это делает сеть нейронов.Уже были предприняты попытки использовать различные методы для соединения приложений, такие как тысячи спецификаций IETF, каждая из которых решает конкретную проблему. Для разработчиков протокол HTTP является относительно простым и удобным, но это также часто приводит к ухудшению ситуации, поскольку он способствует мышлению, основанному на серверной модели, игнорируя клиентскую сторону.Поэтому до сих пор люди продолжают использовать примитивные TCP/UDP протоколы, частные протоколы, HTTP протоколы и сетевые сокеты для соединения приложений. Этот подход все еще вызывает проблемы, поскольку он медленный, трудно масштабируемый и требует централизованного управления. Распределенные P2P протоколы используются преимущественно для развлечений, а не для реальных приложений. Кто будет использовать Skype или BitTorrent для обмена данными?
Это заставляет нас вернуться к вопросам программной науки. Чтобы спасти мир, нам нужно сделать две вещи: во-первых, найти способ соединять любые два приложения в любом месте; во-вторых, упаковать это решение в максимально простую форму для использования разработчиками.
Может быть, это кажется слишком простым, но это действительно так.
### Введение в ZMQZMQ (ØMQ, ZeroMQ, 0MQ) выглядит как набор встроенных сетевых библиотек, но работает как концептуальный фреймворк для параллелизма. Он предоставляет сокеты, которые могут передавать сообщения через множество протоколов, таких как потоки, процессы, TCP, широковещательные сети и т.д. Вы можете использовать сокеты для создания различных моделей соединений, таких как фан-аут, публикация-подписка, распределение задач, запрос-ответ и т.д. Быстродействие ZMQ достаточно для работы с кластерными приложениями. Его асинхронная модель ввода-вывода позволяет создавать многопоточные приложения и выполнять асинхронную обработку сообщений. ZMQ поддерживает множество языков программирования и может работать практически на всех операционных системах. ZMQ является продуктом компании [iMatix](http://www.iMatix.com/) и распространяется под лицензией LGPL.### Необходимые знания
* Использование самой последней стабильной версии ZMQ;
* Использование операционной системы Linux или аналогичной;
* Способность читать код на языке C, который используется по умолчанию в примерах данного руководства;
* Когда мы пишем константы, такие как PUSH или SUBSCRIBE, вы должны знать, где найти соответствующие реализации этих констант на вашем языке, например ZMQ_PUSH или ZMQ_SUBSCRIBE.
### Получение примеров
Все примеры в этом руководстве хранятся в [репозитории GitHub](https://github.com/imatix/zguide). Самый простой способ получить доступ к ним — выполнить следующую команду:
```
git clone git://github.com/imatix/zguide.git
```
Обзор каталога examples показывает реализацию на различных языках. Если вы заметили отсутствие реализации на каком-либо из используемых вами языках, мы будем рады, если вы [добавите её](http://zguide.zeromq.org/main:translate). Это делает руководство полезным, и мы благодарим всех, кто вносил свой вклад.
Все примеры кода распространяются под лицензией MIT/X11, за исключением случаев, когда в исходном коде указано другое.
### Вопрос-ответ
Давайте начнем с простого кода, с традиционной программы "Hello World". Мы создадим клиент и сервер, где клиент отправляет "Hello" серверу, а сервер отвечает "World". Ниже приведён код сервера на C, который открывает ZMQ-сокет на порту 5555, ожидает запросы и отвечает "World".**hwserver.c: Программа "Hello World" сервера**
```c
//
// Сервер "Hello World"
// Привязка REP-сокета к tcp://*:5555
// Прием "Hello" от клиента и ответ "World"
//
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
int main (void)
{
void *context = zmq_init (1);
// Сокет для связи с клиентом
void *responder = zmq_socket (context, ZMQ_REP);
zmq_bind (responder, "tcp://*:5555");
while (1) {
// Ожидание запроса от клиента
zmq_msg_t request;
zmq_msg_init (&request);
zmq_recv (responder, &request, 0);
printf ("Получено Hello\n");
zmq_msg_close (&request);
// Выполнение "обработки"
sleep (1);
// Отправка ответа
zmq_msg_t reply;
zmq_msg_init_size (&reply, 5);
memcpy (zmq_msg_data (&reply), "World", 5);
zmq_send (responder, &reply, 0);
zmq_msg_close (&reply);
}
// Программа не достигнет этой точки, но это демонстрирует, как следует завершить работу
zmq_close (responder);
zmq_term (context);
return 0;
}
```
```textdiagram
+------------+
| |
| Клиент |
| |
+------------+
| REQ |
\---+--------/
| ^
| |
"Hello" "World"
| |
v |
/--------+---\
| REP |
+------------+
| |
| Сервер |
| |
+------------+
```
Рисунок 1 - Запрос-Ответ
```Использование REQ-REP сокетов для отправки и получения сообщений требует соблюдения определённого порядка. Клиент сначала отправляет сообщение с помощью zmq_send(), затем получает ответ с помощью zmq_recv(), и так далее в цикле. Если этот порядок нарушается (например, если отправить два сообщения подряд), возникнет ошибка. Аналогично, сервер должен сначала принимать запрос, а затем отвечать.ZMQ использует C язык для своего руководства, и данное руководство также использует его как примерный язык программирования. Если вы читаете онлайн версию этого руководства, вы можете увидеть реализацию примеров на других языках ниже. Например, это реализация на C++:
**hwserver.cpp: Пример сервера**
```cpp
//
// Пример сервера на C++
// Привязывает REP сокет к tcp://*:5555
// Получает "Hello" от клиента и отвечает "World"
//
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <unistd.h>
int main () {
// Подготовка контекста и сокета
zmq::context_t context (1);
zmq::socket_t socket (context, ZMQ_REP);
socket.bind ("tcp://*:5555");
while (true) {
zmq::message_t request;
// Ждет запроса от клиента
socket.recv (&request);
std::cout << "Получено Hello" << std::endl;
// Выполняет некоторую "обработку"
usleep (1000000);
// Отправляет ответ "World"
zmq::message_t reply (5);
memcpy ((void *) reply.data (), "World", 5);
socket.send (reply);
}
return 0;
}
```
Как видно, API код на C и C++ очень похожи, но на языках, таких как PHP, код будет более лаконичным:
**hwserver.php: Пример сервера**
```php
<?php
/**
* Пример сервера
* Привязывает REP сокет к tcp://*:5555
* Получает "Hello" от клиента и отвечает "World"
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
$context = new ZMQContext(1);
// Сокет для взаимодействия с клиентом
$responder = new ZMQSocket($context, ZMQ::SOCKET_REP);
$responder->bind("tcp://*:5555");
while(true) {
// Ждет запроса от клиента
$request = $responder->recv();
printf ("Получен запрос: [%s]\n", $request);
}
``````diff
// Выполняет некоторую "обработку"
sleep(1);
// Отправляет ответ "World"
$responder->send("World");
}
```
Вот пример кода для клиента:
**hwclient: Пример клиента на C**
```c
//
// Пример клиентского приложения "Hello World"
// Подключение REQ сокета к tcp://localhost:5555
// Отправка приветствия серверу и получение ответа World
//
#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
int main(void)
{
void *context = zmq_init(1);
// Подключение к серверному сокету
printf("Подключение к серверу hello world...\n");
void *requester = zmq_socket(context, ZMQ_REQ);
zmq_connect(requester, "tcp://localhost:5555");
int request_nbr;
for (request_nbr = 0; request_nbr != 10; request_nbr++)
{
zmq_msg_t request;
zmq_msg_init_size(&request, 5);
memcpy(zmq_msg_data(&request), "Hello", 5);
printf("Отправка Hello %d...\n", request_nbr);
zmq_send(requester, &request, 0);
zmq_msg_close(&request);
zmq_msg_t reply;
zmq_msg_init(&reply);
zmq_recv(requester, &reply, 0);
printf("Получено World %d\n", request_nbr);
zmq_msg_close(&reply);
}
zmq_close(requester);
zmq_term(context);
return 0;
}
```
Это выглядит слишком просто? ZMQ – это такой инструмент, в который можно добавить немного компонентов, чтобы создать бомбу с неограниченной энергией, которую можно использовать для спасения мира!
```textdiagram
+------------+ +------------+
| | | | Zap!
| TCP socket +------->| 0MQ socket |
| | BOOM! | cC00 | POW!!
+------------+ +------------+
^ ^ ^
| | |
| | +---------+
| | |
| +----------+ |
Незаконные | |
радиоизотопы | |
из секретного | |
советского города| Шпандекс |
Космические лучи
``` Рисунок # - Ужасная авария...
Теоретически вы можете подключить миллионы клиентов к этому серверу, и все подключения будут работать без проблем. Программа будет работать отлично. Вы можете попробовать открыть клиент до запуска сервера и увидеть, что программа всё равно будет работать корректно, подумайте над тем, что это означает.
Давайте я кратко объясню, что делают эти два фрагмента кода. Сначала они создают контекст ZMQ, а затем сокет. Не пугайтесь этих незнакомых терминов, мы рассмотрим их подробнее. Сервер привязывает REP-сокет к порту cq5555 и начинает ждать запросы, отвечать на них и повторять этот цикл. Клиент отправляет запросы и ожидает ответы от сервера.
За этими кодами скрывается множество действий, но программисту не нужно беспокоиться о них. Достаточно знать, что эти коды короткие, эффективные и редко вызывают ошибки, выдерживают высокую нагрузку. Такой способ коммуникации называется режимом запрос-ответ, это одна из самых простых и прямых реализаций ZMQ. Его можно сравнить с RPC и традиционной моделью C/S.ZMQ не заботится о содержании сообщений, достаточно знать их размер в байтах. Поэтому программисту нужно убедиться, что получатель сможет правильно прочитать эти сообщения. Как преобразовать объект или сложный тип данных в сообщение, которое может быть отправлено через ZMQ, существуют специальные сериализаторы, такие как Protocol Buffers. Но при работе со строками также следует учитывать некоторые моменты.В C, строки заканчиваются нулевым символом, и вы можете отправить полную строку следующим образом:
```c
zmq_msg_init_data (&request, "Hello", 5, NULL, NULL);
```
Однако, если вы отправите эту строку на другом языке, она, возможно, не будет содержать этот нулевой байт. Например, если вы используете Python:
```python
socket.send("Привет")
```
Актуальное отправленное сообщение:
```textdiagram
+-----+ +-----+-----+-----+-----+-----+
| 6 | | П | р | и | в | е | т |
+-----+ +-----+-----+-----+-----+-----+
Рисунок # - Строка 0MQ
```
Если вы прочтете это сообщение с использованием C, вы получите что-то похожее на строку, хотя она может быть представлена как строка (шестой байт в памяти является нулевым байтом), но это не всегда корректно. В этом случае клиент и сервер имеют разные определения строки, что приведёт к странным результатам.
Когда вы получаете строку из ZMQ с использованием C, вы не можете полагаться на то, что строка завершается корректным образом. Поэтому при получении строки следует создать буфер на один байт больше, поместить строку в него и добавить завершающий ноль.
Поэтому давайте сделаем следующее предположение: **строки в ZMQ имеют фиксированную длину и не содержат завершающего нуля при передаче**. В самом простом случае строка ZMQ и фрейм сообщения ZMQ эквивалентны, как показано на рисунке выше, представляющем собой длину и последовательность байтов.Следующий функциональный метод поможет правильно получать строковые сообщения в C:
```c
// Получение строки из ZMQ сокета и преобразование её в строку C
static char *
s_recv (void *socket) {
zmq_msg_t message;
zmq_msg_init (&message);
zmq_recv (socket, &message, 0);
int size = zmq_msg_size (&message);
char *string = malloc (size + 1);
memcpy (string, zmq_msg_data (&message), size);
zmq_msg_close (&message);
string[size] = 0;
return (string);
}
```
Этот код мы будем использовать в последующих примерах, поэтому можно также написать метод `s_send()` и упаковать его в `.h` файл для использования.
Таким образом, появляется `zhelpers.h`, библиотека функций ZMQ для C. Исходный код довольно длинный и полезен только для программистов C, вы можете посмотреть его в свободное время [здесь](https://github.com/imatix/zguide/blob/master/examples/C/zhelpers.h).
### Получение версии
У ZMQ есть несколько версий, и он продолжает развиваться. Если вы столкнулись с проблемой, возможно, она уже решена в следующей версии. Чтобы узнать текущую версию ZMQ, вы можете запустить следующий код:
**Отчет о версии ØMQ в C**
```c
//
// Возвращает текущую версию ZMQ
//
#include "zhelpers.h"
int main (void)
{
int major, minor, patch;
zmq_version (&major, &minor, &patch);
printf ("Текущая версия ZMQ: %d.%d.%d\n", major, minor, patch);
return EXIT_SUCCESS;
}
```
### Делаем сообщения текучимиВторой классический пример модели сообщений — одностороннее распространение данных: сервер отправляет события обновления группе клиентов. Рассмотрим пример публикации информации о погоде, включающей почтовый индекс, температуру и относительную влажность. Мы случайным образом генерируем эти данные, как это делает метеостанция.Вот код сервера, использующий порт 5556:
**wuserver: Погодный обновляющий сервер на C**
```c
//
// Погодный обновляющий сервер
// Привязывает PUB сокет к tcp://*:5556
// Отправляет случайные данные о погоде
//
#include "zhelpers.h"
int main (void)
{
// Подготовка контекста и PUB сокета
void *context = zmq_init (1);
void *publisher = zmq_socket (context, ZMQ_PUB);
zmq_bind (publisher, "tcp://*:5556");
zmq_bind (publisher, "ipc://weather.ipc");
// Инициализация генератора случайных чисел
srandom ((unsigned) time (NULL));
while (1) {
// Генерация данных
int zipcode, temperature, relhumidity;
zipcode = randof (100000);
temperature = randof (215) - 80;
relhumidity = randof (50) + 10;
// Отправка сообщения всем подписчикам
char update [20];
sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
s_send (publisher, update);
}
zmq_close (publisher);
zmq_term (context);
return 0;
}
```
Этот сервис обновления работает непрерывно, как непрекращающийся радиосигнал.
```textdiagram
+-------------+
| |
| Publisher |
| |
+-------------+
| PUB |
\-------------/
bind
|
|
updates
|
+---------------+---------------+
| | |
updates updates updates
| | |
| | |
v v v
connect connect connect
/------------\ /------------\ /------------\
| SUB | | SUB | | SUB |
+------------+ +------------+ +------------+
| | | | | |
| Subscriber | | Subscriber | | Subscriber |
| | | | | |
+------------+ +------------+ +------------+
``` Рисунок # - Publish-Subscribe
```Вот клиентская программа, которая принимает сообщения от публикатора и обрабатывает только те сообщения, которые помечены определенным почтовым индексом, например, для Нью-Йорка это индекс 10001:
**wuclient: Программа клиента для погодных обновлений на C**
```c
//
// Клиент для получения погодной информации
// Подключаем SUB сокет к tcp://*:5556
// Собираем информацию о погоде для указанного почтового индекса и вычисляем среднюю температуру
//
#include "zhelpers.h"
int main (int argc, char *argv [])
{
void *context = zmq_init (1);
// Создаем сокет для подключения к серверу
printf ("Сбор погодной информации...\n");
void *subscriber = zmq_socket (context, ZMQ_SUB);
zmq_connect (subscriber, "tcp://localhost:5556");
// Устанавливаем фильтр, по умолчанию для Нью-Йорка, почтовый индекс 10001
char *filter = (argc > 1) ? argv [1] : "10001";
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
// Обрабатываем 100 обновлений
int update_nbr;
long total_temp = 0;
for (update_nbr = 0; update_nbr < 100; update_nbr++) {
char *string = s_recv (subscriber);
int zipcode, temperature, relhumidity;
sscanf (string, "%d %d %d",
&zipcode, &temperature, &relhumidity);
total_temp += temperature;
free (string);
}
printf ("Средняя температура для почтового индекса '%s' составляет %dF\n",
filter, (int) (total_temp / update_nbr));
zmq_close (subscriber);
zmq_term (context);
return 0;
}
```Важно отметить, что при использовании SUB сокета необходимо использовать метод `zmq_setsockopt()` для установки фильтра. Если вы не установите фильтр, то вы ничего не получите, что часто является ошибкой новичков. Фильтр может быть любой строкой и может быть установлено несколько раз. SUB сокет получит сообщение, если оно соответствует хотя бы одному из установленных фильтров. Подписчики могут выбрать, какие сообщения не получать, используя также метод `zmq_setsockopt()`.Комбинация PUB-SUB сокетов асинхронна. Клиент использует `zmq_recv()` в цикле для получения сообщений, и если отправить сообщение на SUB сокет, будет возникать ошибка; аналогично, сервер может постоянно использовать `zmq_send()` для отправки сообщений, но не может использовать `zmq_recv()` на PUB сокете.
Еще одна важная особенность PUB-SUB сокетов заключается в том, что вы не можете знать, когда SUB начинает получать сообщения. Даже если вы открываете SUB сокет раньше, чем начинаете отправлять сообщения на PUB, SUB все равно может пропустить некоторые сообщения, так как время на установление соединения не нулевое. Эта "медленная связь" может сбить с толку многих людей, поэтому я подробно объясню это здесь. Вспомните, что ZMQ выполняет асинхронную передачу I/O в фоновом режиме. Если у вас есть два узла, соединенные в следующем порядке:
* Подписчик подключается к конечной точке для приема сообщений и подсчета;
* Публикатор привязывается к конечной точке и сразу отправляет 1000 сообщений.
Результат выполнения может заключаться в том, что подписчику не придет ни одного сообщения. Это может вызвать удивление и заставить вас проверять, правильно ли настроены подписки, и повторно пытаться, но результат будет таким же.Мы знаем, что для установления TCP-соединения требуется три рукопожатия, что занимает несколько миллисекунд, и этот временной интервал увеличивается с увеличением количества узлов. В течение этого короткого времени ZMQ может отправить очень много сообщений. Например, если установка соединения занимает 5 миллисекунд, то ZMQ может отправить все 1000 сообщений за 1 миллисекунду.В главе 2 я объясню, как синхронизировать публикатора и подписчика таким образом, чтобы публикатор начинал отправлять сообщения только после того, как подписчик будет готов. Есть простой способ синхронизации PUB и SUB — это задержка отправки сообщений публикатором на некоторое время. В реальном программировании я не рекомендую использовать этот метод, так как он слишком хрупкий и трудно контролируемый. Однако мы временно используем `sleep` для решения этой проблемы, а в главе 2 рассмотрим правильный подход.
Другой способ синхронизации состоит в том, что поток сообщений от публикатора считается бесконечным, поэтому потеря первых нескольких сообщений не имеет значения. Наш клиент для метеорологических данных работает именно так.
Пример клиента для метеорологических данных собирает 1000 записей для определенного почтового индекса, среди которых около 10 миллионов сообщений публикуются. Вы можете запустить клиент до запуска сервера, работать некоторое время, а затем перезапустить сервер, и клиент продолжит работать нормально. После сбора всех необходимых данных клиент вычисляет и выводит среднюю температуру.
Несколько замечаний о модели публикации-подписки:* Подписчики могут подключаться к нескольким публикаторам и получать сообщения последовательно;
* Если у публикатора нет подписчиков, его сообщения будут немедленно отбрасываться;
* Если вы используете протокол TCP, сообщения будут накапливаться на стороне публикатора, если скорость обработки подписчиком недостаточна. Мы обсудим использование порогового значения (HWM) для защиты публикатора.
* В текущих версиях ZMQ фильтрация сообщений происходит на стороне подписчика. То есть, публикатор отправляет все сообщения подписчику, который затем отбрасывает те сообщения, которые не были подписаны. Я попробовал отправить 10 миллионов сообщений на своей четырёхъядерной машине, и это было достаточно быстро, но ничего особенного:```
ph@ws200901:~/work/git/0MQGuide/examples/c$ time wuclient
Сбор данных с сервера погоды...
Средняя температура для почтового индекса '10001' составила 18F
real 0m5.939s
user 0m1.590s
sys 0m2.290s
```
### Распределенное обработание
В следующем примере программы мы будем использовать ZMQ для супервычислений, то есть параллельной обработки модели:
* Диспетчер задач будет создавать множество задач, которые можно обрабатывать параллельно;
* Есть группа worker, которые будут обрабатывать эти задачи;
* Агрегатор результатов будет принимать все результаты от worker и суммировать их.
На практике worker могут быть рассредоточены на разных машинан и использовать GPU (графический процессор) для выполнения сложных вычислений. Ниже приведен код диспетчера задач, который создает 100 задач, каждая из которых требует от получившего ее worker задержки на несколько миллисекунд.
**taskvent: Параллельный диспетчер задач на C**
```c
//
// Диспетчер задач
// Привязывает PUSH сокет к tcp://localhost:5557
// Отправляет группу задач связанным worker
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// Сокет для отправки сообщений
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_bind (sender, "tcp://*:5557");
// Сокет для отправки сигнала начала
void *sink = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sink, "tcp://localhost:5558");
printf ("Нажмите Enter после подготовки worker: ");
getchar ();
printf ("Отправка задач на worker...\n");
}
``` // Отправка сигнала начала
s_send(sink, "0");
// Инициализация генератора случайных чисел
srandom((unsigned)time(NULL));
// Отправка 100 задач
int task_nbr;
int total_msec = 0; // Ожидаемое время выполнения (мс)
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
int workload;
// Генерация случайного времени работы от 1 до 100 мс
workload = randof(100) + 1;
total_msec += workload;
char string[10];
sprintf(string, "%d", workload);
s_send(sender, string);
}
printf("Ожидаемое время выполнения: %d мс\n", total_msec);
sleep(1); // Задержка для завершения отправки задач```c
zmq_close (sink);
zmq_close (sender);
zmq_term (context);
return 0;
}
``````textdiagram
+-------------+
| |
| Вентилятор |
| |
+-------------+
| PUSH |
\------+------/
|
задачи
|
+---------------+---------------+
| | |
задача задача задача
| | |
v v v
/------------\ /------------\ /------------\
| PULL | | PULL | | PULL |
+------------+ +------------+ +------------+
| | | | | |
| Worker | | Worker | | Worker |
| | | | | |
+------------+ +------------+ +------------+
| PUSH | | PUSH | | PUSH |
\-----+------/ \-----+------/ \-----+------/
| | |
результат результат результат
| | |
+---------------+---------------+
|
результаты
|
v
/-------------\
| PULL |
+-------------+
| |
| Слив |
| |
+-------------+
``` Рисунок # - Параллельный конвейер
```
Вот код работника, который принимает информацию, задерживает выполнение на указанное количество миллисекунд и отправляет сигнал о завершении выполнения:
**taskwork: Параллельный рабочий процесс в C**
```c
//
// Задачи выполнителя
// Подключаем PULL сокет к tcp://localhost:5557
// Получаем задачи от задачного диспетчера
// Подключаем PUSH сокет к tcp://localhost:5558
// Отправляем результаты в сборщик результатов
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// Сокет для получения задач
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
// Сокет для отправки результатов
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sender, "tcp://localhost:5558");
// Цикл обработки задач
while (1) {
char *string = s_recv (receiver);
// Выводим прогресс обработки
fflush (stdout);
printf ("%s.", string);
// Начинаем обработку
s_sleep (atoi (string));
free (string);
// Отправляем результат
s_send (sender, "");
}
zmq_close (receiver);
zmq_close (sender);
zmq_term (context);
return 0;
}
```
Вот код результата-накопителя. Он собирает результаты обработки 100 задач и вычисляет общее время выполнения, чтобы определить, является ли задача параллельной вычислительной.
**tasksink: Параллельный накопитель задач на C**
```c
//
// Задача-накопитель
// Привязываем PULL сокет к tcp://localhost:5558
// Собираем результаты обработки от worker
//
#include "zhelpers.h"
``````c
int main (void)
{
// Подготовка контекста и сокета
void *context = zmq_init (1);
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_bind (receiver, "tcp://*:5558");
// Ожидание сигнала начала
char *string = s_recv (receiver);
free (string);
// Начало отсчета времени
int64_t start_time = s_clock ();
// Убедиться, что все 100 задач обработаны
int task_nbr;
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
char *string = s_recv (receiver);
free (string);
if ((task_nbr / 10) * 10 == task_nbr)
printf (":");
else
printf (".");
fflush (stdout);
}
// Вычисление и вывод общего времени выполнения
printf ("Общее время выполнения: %d миллисекунд\n",
(int) (s_clock () - start_time));
zmq_close (receiver);
zmq_term (context);
return 0;
}
```
Среднее время выполнения одной задачи составляет около 5 секунд. Вот результаты выполнения при запуске 1, 2 и 4 worker'ов:
```
# 1 worker
Общее время выполнения: 5034 миллисекунды
# 2 worker'а
Общее время выполнения: 2421 миллисекунды
# 4 worker'а
Общее время выполнения: 1018 миллисекунды
```
Несколько деталей относительно этого кода:
* Worker подключен к задаче-распределителю сверху и к задаче-накопителю снизу, что позволяет запустить любое количество worker'ов. Однако если worker привязан к конечной точке, а не подключен к ней, нам потребуется больше конечных точек и настройка задачи-распределителя и задачи-накопителя. Поэтому задача-распределитель и задача-накопитель являются более стабильными частями этой сети, и они должны быть привязаны к конечной точке, а не worker'ы, так как они более динамичны.
```* Нам нужно выполнить некоторую синхронизацию, чтобы ждать, пока все worker'ы запустятся, прежде чем распределять задачи. Это важно в ZMQ и сложно решить. Действие привязки сокета занимает некоторое время, поэтому когда первый worker подключается, он сразу получает множество задач. Поэтому, если мы не выполним синхронизацию, эти задачи просто не будут выполняться параллельно. Вы можете попробовать это самостоятельно.
* Распределитель задач использует сокет PUSH для равномерного распределения задач между worker'ами (предполагается, что все worker'ы уже подключены), это называется _балансировкой нагрузки_, мы встретимся с этим ещё много раз.
* Сборщик результатов использует сокет PULL для равномерного сбора сообщений от worker'ов, это называется _честной очередью_:
```textdiagram
+---------+ +---------+ +---------+
| PUSH | | PUSH | | PUSH |
\----+----/ \----+----/ \----+----/
| | |
R1, R2, R3 R4 R5, R6
| | |
+-------------+-------------+
|
честная очередь
R1, R4, R5, R2, R6, R3
|
v
/-------------\
| PULL |
+-------------+
Рисунок # - Честная очередь
```Пайплайн также может столкнуться с медленными соединениями, что может ввести в заблуждение относительно того, что сокет PUSH не выполняет балансировки нагрузки. Если какой-то worker получает больше запросов, это потому, что его сокет PULL подключается быстрее, чем остальные, и он получает дополнительные сообщения до того, как подключатся остальные worker'ы.### Программирование с использованием ZMQ
После просмотра этих примеров программ вы наверняка хотите немедленно приступить к программированию с использованием ZMQ. Однако перед тем как начать, есть несколько советов, которые помогут вам избежать проблем в будущем:
* Изучайте ZMQ постепенно, хотя это всего лишь набор API, он предоставляет бесконечные возможности. Учитесь постепенно, осваивая каждую функцию.
* Пишите красивый код. Некрасивый код скрывает проблемы и затрудняет помощь другим людям. Например, вы можете привыкнуть использовать бесполезные имена переменных, но прочитавший ваш код не будет знать, что они бесполезны. Используйте осмысленные имена переменных, а не случайные. Отступы должны быть единообразными, а структура кода — ясной. Красивый код делает ваш мир лучше.
* Проверяйте код по мере написания, чтобы быстро локализовать проблемы. Это особенно важно при написании приложений на ZMQ, так как часто невозможно сразу написать правильный код.* Когда вы обнаруживаете, что ваш код не работает правильно, разбейте его на части и проверьте, какая часть не выполняется корректно. ZMQ позволяет создавать очень модульные программы, поэтому следует использовать эту возможность. При необходимости следует использовать абстрактные методы для написания программ (классы, члены функции и т. д.). Не следует произвольно копировать код, так как вместе с кодом копируются и ошибки.Рассмотрим следующий фрагмент кода, который мне предложил коллега для доработки:
```c
// Attention: do not use this code!
static char *topic_str = "msg.x|";
void* pub_worker(void* arg){
void *ctx = arg;
assert(ctx);
void *qskt = zmq_socket(ctx, ZMQ_REP);
assert(qskt);
int rc = zmq_connect(qskt, "inproc://querys");
assert(rc == 0);
void *pubskt = zmq_socket(ctx, ZMQ_PUB);
assert(pubskt);
rc = zmq_bind(pubskt, "inproc://publish");
assert(rc == 0);
uint8_t cmd;
uint32_t nb;
zmq_msg_t topic_msg, cmd_msg, nb_msg, resp_msg;
zmq_msg_init_data(&topic_msg, topic_str, strlen(topic_str), NULL, NULL);
fprintf(stdout,"WORKER: готов к приему сообщений\n");
// Attention: do not use this code, it does not work!
// For example, topic_msg will be invalid the second time
while (1){
zmq_send(pubskt, &topic_msg, ZMQ_SNDMORE);
zmq_msg_init(&cmd_msg);
zmq_recv(qskt, &cmd_msg, 0);
memcpy(&cmd, zmq_msg_data(&cmd_msg), sizeof(uint8_t));
zmq_send(pubskt, &cmd_msg, ZMQ_SNDMORE);
zmq_msg_close(&cmd_msg);
fprintf(stdout, "received cmd %u\n", cmd);
zmq_msg_init(&nb_msg);
zmq_recv(qskt, &nb_msg, 0);
memcpy(&nb, zmq_msg_data(&nb_msg), sizeof(uint32_t));
zmq_send(pubskt, &nb_msg, 0);
zmq_msg_close(&nb_msg);
fprintf(stdout, "received nb %u\n", nb);
zmq_msg_init_size(&resp_msg, sizeof(uint8_t));
memset(zmq_msg_data(&resp_msg), 0, sizeof(uint8_t));
zmq_send(qskt, &resp_msg, 0);
zmq_msg_close(&resp_msg);
}
return NULL;
}
```Вот мой вариант переписанного кода, в котором исправлены некоторые ошибки:
```c
static void *
worker_thread (void *arg) {
void *context = arg;
void *worker = zmq_socket (context, ZMQ_REP);
assert (worker);
int rc;
rc = zmq_connect (worker, "ipc://worker");
assert (rc == 0);
void *broadcast = zmq_socket (context, ZMQ_PUB);
assert (broadcast);
rc = zmq_bind (broadcast, "ipc://publish");
assert (rc == 0);
while (1) {
char *part1 = s_recv (worker);
char *part2 = s_recv (worker);
printf ("Рабочий получил [%s][%s]\n", part1, part2);
s_sendmore (broadcast, "msg");
s_sendmore (broadcast, part1);
s_send (broadcast, part2);
free (part1);
free (part2);
s_send (worker, "OK");
}
return NULL;
}
```
В конце вышеупомянутого сегмента программы сокет передается между двумя потоками, что приводит к непредсказуемым проблемам. Такое поведение в ZMQ 2.1 является легальным, но не рекомендуется.### Версия ZMQ 2.1
История учит нас, что ZMQ 2.0 был распределенной системой сообщений с низкой задержкой, которая выделялась среди множества аналогичных программ, отказываясь от различных излишеств и провозглашая лозунг "без границ". Это была стабильная версия, которую мы использовали все это время.
Со временем все меняется. То, что было популярно в 2010 году, может уже не быть актуальным в 2011 году. Когда разработчики ZMQ и сообщество активно обсуждали различные проблемы ZMQ, вышла версия ZMQ 2.1, ставшая новой стабильной версией.
Это руководство в основном ориентировано на описание ZMQ 2.1, поэтому для разработчиков, переходящих с ZMQ 2.0, есть несколько моментов, на которые следует обратить внимание:
* В 2.0 вызовы `zmq_close()` и `zmq_term()` приводили к тому, что все непереданные сообщения были бы废弃。Поэтому после отправки всех сообщений нельзя было сразу закрывать программу; в примерах для 2.0 обычно использовался `sleep(1)`, чтобы избежать этой проблемы. Однако в 2.1 это не требуется, программа будет завершена только после того, как все сообщения будут переданы.
* В 2.0 можно было вызвать `zmq_term()`, когда еще были открыты сокеты, что делало это действие безопасным. Однако в 2.1 это становится опасным и может привести к блокировке программы. Поэтому в 2.1 программах сначала закрываются все сокеты, а затем программа завершается. Если есть непереданные сообщения в сокете, программа будет ждать до тех пор, пока они не будут переданы, или до тех пор, пока не будет установлено значение LINGER для сокета (например, установка его в ноль), после чего сокет будет закрыт.
```c
int zero = 0;
zmq_setsockopt (mysocket, ZMQ_LINGER, &zero, sizeof (zero));
```
* В 2.0 функция `zmq_poll()` не имела функции таймера, она немедленно возвращалась при выполнении условия. Для проверки оставшегося времени требовалось использовать цикл внутри программы. В 2.1 функция `zmq_poll()` возвращает после истечения указанного времени, что позволяет использовать её как таймер.
```c
zmq_poll(items, num_items, timeout);
```* В версии 2.0, ZMQ игнорирует сигналы прерывания операционной системы, что означает, что вызовы к libzmq не получают сообщение EINTR, и поэтому не могут обрабатывать сигналы типа SIGINT (Ctrl-C). В версии 2.1 эта проблема была решена, и методы, такие как `zmq_recv()`, принимают и возвращают сообщение EINTR от операционной системы.
### Правильное использование контекста
Программа ZMQ всегда начинается с создания контекста, который затем используется для создания сокетов. В C-языке программирования функция для создания контекста называется zmq_init(). В одном процессе следует создать только один контекст. С точки зрения реализации, контекст является контейнером, содержащим все сокеты в этом процессе и предоставляющим реализацию протокола inproc для быстрого соединения различных потоков внутри процесса. Если в одном процессе создаются два контекста, это эквивалентно запуску двух экземпляров ZMQ. Если это именно то, что вам нужно, это нормально, но обычно это не требуется.
**Используйте функцию zmq_init() для создания контекста в процессе и функцию zmq_term() для его закрытия при завершении**
Если вы используете системный вызов fork(), каждый процесс должен иметь свой собственный объект контекста. Если функция zmq_init() была вызвана до вызова fork(), каждый дочерний процесс будет иметь свой собственный объект контекста. Обычно вам потребуется сделать что-то интересное в дочернем процессе, а родительский процесс будет управлять ими.### Правильное завершение и очистка
Хорошей привычкой программиста является выполнение очистки при завершении работы. Когда вы пишете приложение ZMQ на языках, таких как Python, система автоматически выполняет очистку за вас. Однако если вы используете язык C, вам следует быть осторожным, иначе могут возникнуть утечки памяти или проблемы с устойчивостью приложения.
Утечка памяти — это лишь одна из проблем, но ZMQ действительно заботится о том, как программа завершает работу. Причины этого сложны, но кратко можно сказать следующее: если какие-либо сокеты остаются открытыми, вызов функции `zmq_term()` может привести к зависанию программы; даже если все сокеты закрыты, если есть незавершенные сообщения для отправки, вызов функции `zmq_term()` также может привести к зависанию программы. Только когда опция `LINGER` для сокета установлена в 0, можно избежать этих проблем.
Нам следует обращать внимание на такие объекты ZMQ, как сообщения, сокеты и контексты. К счастью, их немного, по крайней мере, в обычных приложениях:
* После обработки сообщения обязательно вызывайте функцию `zmq_msg_close()` для его закрытия;
* Если вы открываете или закрываете много сокетов одновременно, возможно, стоит перепланировать структуру вашего приложения;
* При завершении программы сначала закройте все сокеты, а затем вызовите функцию `zmq_term()` для удаления объекта контекста.Если вы хотите использовать ZMQ для многопоточной разработки, вам придётся учесть больше вопросов. Мы подробно рассмотрим многопоточную разработку в следующей главе, но если вы не можете дождаться, вот несколько советов по завершению работы:
* Не используйте один и тот же сокет в нескольких потоках. Не спрашивайте почему, просто не делайте этого.
* Закройте все сокеты и закройте объект контекста в основном процессе.
* Если есть ожидающие recv или poll вызовы, поймите эти ошибки в основном процессе и закройте сокеты в соответствующих потоках. Не повторно закрывайте контекст, функция zmq_term() будет ждать, пока все сокеты безопасно закроются.
Да, процесс сложный, поэтому реализаторы API для различных языков могут упаковать эти шаги, чтобы сделать завершение работы программы менее сложной задачей.
### Почему нам нужен ZMQ
Теперь, когда мы запустили ZMQ, давайте вспомним, почему нам нужен ZMQ:Современные приложения часто включают компоненты, работающие через сеть, будь то локальная сеть или Интернет. Разработчики этих программ используют различные механизмы передачи сообщений. Некоторые используют продукты очередей сообщений, а большинство предпочитает самостоятельно реализовывать эти механизмы, используя протоколы TCP или UDP. Эти протоколы не сложны в использовании, но простое отправление сообщений от A к B и надёжная передача сообщений в любых условиях — это две совершенно разные вещи.Рассмотрим некоторые типичные проблемы, возникающие при передаче сообщений с использованием чистого протокола TCP. Любой повторно используемый слой передачи сообщений обязательно должен решать следующие вопросы:
* Как обрабатывать ввод/вывод? Должна ли программа блокироваться в ожидании ответа, или же следует выполнять эту задачу в фоновом режиме? Это ключевой фактор в проектировании программного обеспечения. Блокирующее ввод/вывод может усложнить расширение архитектуры программы, а фоновая обработка ввод/вывода также представляет собой сложную задачу.
* Как обрабатывать временные, непостоянные компоненты? Нужно ли нам делить компоненты на клиентов и серверы, требуя от серверов постоянной работы? А если нам нужно соединять серверы? Будем ли мы пытаться заново подключаться каждые несколько секунд?
* Как представлять сообщение? Как можно разбивать сообщение таким образом, чтобы оно было легко читаемым и записываемым, без опасений переполнения буфера, эффективно передавая как маленькие сообщения, так и большие файлы, такие как видео?
* Как обрабатывать сообщения, которые не могут быть немедленно отправлены? Например, если нам нужно ждать, пока сетевой компонент не восстановится? Просто игнорируем это сообщение, сохраняем его в базе данных или в очереди в памяти?* Где хранить очередь сообщений? Что делать, если какой-то компонент читает очередь сообщений слишком медленно, вызывая накопление сообщений? Какие стратегии использовать?
* Как обрабатывать потерянные сообщения? Ждать новых данных, запросить повторную отправку или требуется создать новую систему надежности, чтобы гарантировать, что сообщения не будут потеряны? А если эта система сама выйдет из строя?
* Если нам нужно изменить сетевое соединение, например, заменить TCP одиночным широковещательной передачей или перейти на IPv6? Нужно ли переписывать все приложения или абстрагировать этот протокол в отдельный слой?
* Как маршрутизировать сообщения? Можно ли отправлять сообщения сразу нескольким узлам? Можно ли возвращать ответные сообщения обратно к отправителю запроса? Как нам создать API для другой языковой среды? Нужно ли полностью переписывать протокол или достаточно переупаковать библиотеку?
Как нам передавать сообщения между различными архитектурными структурами? Необходимо ли определять кодировку для сообщений?
Как нам обрабатывать ошибки сетевой связи? Ждать и повторять попытку, игнорировать или отменять операцию?Можно найти открытый исходный код в качестве примера, например [Hadoop Zookeeper](http://hadoop.apache.org/zookeeper/), и рассмотреть его C-библиотеку API, [src/c/src/zookeeper.c](http://github.com/apache/zookeeper/blob/trunk/src/c/src/zookeeper.c). Этот код состоит приблизительно из 3200 строк без комментариев и реализует клиент-серверную сеть. Он работает эффективно благодаря использованию `poll()` вместо `select()`. Однако, Zookeeper следует рассматривать как универсальный уровень обмена сообщениями и подробно прокомментировать. Такие модули должны максимально использовать существующие решения, а не создавать новые колеса.```
+------------+
| |
| Часть A |
| |
+------------+
^
|
TCP
|
v
+------------+
| |
| Часть B |
| |
+------------+
Рис. # - Обмен сообщениями на начальной стадии
```
Но как создать такой повторно используемый уровень сообщений? Почему люди предпочитают переписывать код управления примитивными TCP-сокетами в своих проектах, вместо создания такого общего пакета?На самом деле, создание универсального уровня сообщений является очень сложной задачей, и поэтому проекты с открытым исходным кодом продолжают экспериментировать, а коммерческие продукты для обмена сообщениями становятся все более сложными, дорогими, громоздкими и хрупкими. В 2006 году компания iMatix разработала протокол AMQP, который предоставил проектам с открытым исходным кодом первый повторно используемый уровень сообщений. [AMQP] превосходит другие продукты, но [он всё ещё сложен, дорог и хрупок](http://www.imatix.com/articles:whats-wrong-with-amqp). Учиться работе с ним требуются недели, а создание рабочей архитектуры может занять месяцы, и к тому времени может быть слишком поздно. Большинство проектов сообщений, таких как AMQP, для решения вышеупомянутых проблем придумали новые концепции, такие как концепцию "агента", которая включает функции адресации, маршрутизации, очередей и других. В результате был создан клиент-серверный протокол и соответствующие API поверх незаметного протокола, что позволяет приложениям и агентам взаимодействовать друг с другом. Агент действительно является хорошим решением для снижения сложности крупных сетевых структур. Однако применение механизма агентов в проектах, таких как Zookeeper, может быть ещё хуже, так как это требует добавления нового компьютера и создания нового точки отказа. Агенты могут постепенно стать новыми бутылочными горлами и становиться более рискованными для управления.Если программное обеспечение позволяет, можно добавить второй, третий, четвертый агент и т. д., создавая модель избыточности и отказоустойчивости. Некоторые люди делают именно это, что усложняет архитектуру системы и увеличивает риск. В такой агентской архитектуре требуется специализированная команда по эксплуатации. Вам придется круглосуточно наблюдать за состоянием агентов и время от времени корректировать их поведение. Вам потребуется добавлять компьютеры и дополнительные резервные машины, а также иметь персонал для управления этими машинами. Это имеет смысл только для крупных сетевых приложений, так как они имеют больше модулей для перемещения, несколько команд для разработки и поддержки, а также были созданы за многие годы.Таким образом, разработчики малых и средних приложений оказываются в затруднительном положении. Они вынуждены избегать написания сетевых приложений и вместо этого создавать программы, которые не требуют масштабирования; либо использовать примитивные методы сетевой разработки, что приводит к созданию очень хрупкого и сложного программного обеспечения, которое сложно поддерживать; либо выбрать продукт для сообщений, который позволяет создавать масштабируемые приложения, но требует значительных затрат. Нет ни одной подходящей опции, и это одна из причин, почему системы сообщений стали широко распространённой проблемой в прошлом веке.
```textdiagram
+---+ | +---+
+---+ | | +---+ | | |
| +-->| | | | | | |
| | +---+ | | | +-+-+
+-+-+ +-+-+ | |
| | | |
| +-----------------+
| | | |
+-----------------------+
| | | |
+-------|-------|----+--|------+
| v | v |
+-+-+ +---+ | +---+ |
| | | | +-+-+ | | |
| | | | | | | | |
+---+ +---+ | | +---+ |
^ +---+ ^ |
| ^ | +-+
+-------+-------+-------+ |
| | | |
v +-+-+ v +---+ |
+---+ | | +---+ | | |
| | | |<--+ | | |<-+
| | +---+ | | +-+-+
+---+ +---+
Рисунок # - Сообщения, когда они становятся
```Что нам действительно нужно, так это программное обеспечение, которое может выполнять все те задачи, которые выполняет крупное программное обеспечение, но при этом быть простым в использовании, иметь низкую стоимость и быть применимым ко всем приложениям без каких-либо зависимостей. Без дополнительных модулей снижается вероятность возникновения ошибок. Такое программное обеспечение должно работать на всех операционных системах и поддерживать все языки программирования. ZMQ — это такой программный продукт: он эффективен, предоставляет встроенные библиотеки, позволяющие приложениям хорошо масштабироваться в сети, при этом затраты остаются низкими.Основные характеристики ZMQ включают:* ZMQ асинхронно обрабатывает операции ввода-вывода в фоновых потоках, используя структуры данных, которые не могут вызвать мертвую блокировку для хранения сообщений.
* Компоненты сети могут быть добавлены или удалены, ZMQ автоматически восстанавливает соединения, что позволяет запускать компоненты в любом порядке; таким образом, создается сервисно-ориентированная архитектура (SOA), где службы могут свободно входить и выходить из сети.
* Когда это необходимо, ZMQ автоматически помещает сообщения в очередь для сохранения, начиная отправку сразу после установления соединения.
* ZMQ имеет механизм порогового значения (High Water Mark, HWM), который предотвращает переполнение очередей. При заполнении очереди ZMQ автоматически блокирует отправителя или отбрасывает часть сообщений, в зависимости от используемого режима передачи сообщений.
* ZMQ позволяет использовать различные протоколы связи, такие как TCP, широковещательная сеть, внутрисистемная связь, межпроцессорная связь. При изменении протокола связи вам не требуется изменять код.
* ZMQ правильно обрабатывает медленные узлы, применяя различные стратегии в зависимости от режима передачи сообщений.
* ZMQ предлагает множество режимов маршрутизации сообщений, таких как режим запрос-ответ, режим публикация-подписка и другие. Эти режимы можно использовать для создания сетевых топологий.* В ZMQ можно создавать промежуточные устройства (очень маленькие), которые помогают упрощать структуру сети.
* ZMQ отправляет целое сообщение, используя механизм фреймов сообщений для передачи. Если вы отправите сообщение размером Yöntem 10 КБ, вы получите сообщение того же размера.
* ZMQ не требует использования определённого формата сообщений, сообщения могут быть нулевой длины или иметь размер до гигабайтов. При представлении этих сообщений вы можете использовать сериализаторы, такие как Google Protocol Buffers или XDR.
* ZMQ умело обрабатывает сетевые ошибки, иногда повторяя попытки, а иногда информируя вас о возникших ошибках.
* ZMQ даже может снижать нагрузку на окружающую среду, экономя время работы процессора и, следовательно, электроэнергию.На самом деле, возможности ZMQ не ограничиваются этим, он может полностью изменить подход к написанию сетевых приложений. Хотя на первый взгляд он представляет собой набор API для работы с сокетами, позволяющий использовать zsock_recv() и zsock_send() для приема и передачи сообщений, но обработка сообщений становится центральной частью приложений, и вскоре ваши программы станут состоять из модулей обработки сообщений, что делает их элегантными и естественными. Его масштабируемость также высока, так как каждая задача может выполняться одним узлом (узел — это поток), двумя узлами на одном компьютере (узел — это процесс) или двумя компьютерами в одной сети (узел — это компьютер), без необходимости изменения приложения.
### Расширяемость сокетов ZMQ
Давайте рассмотрим пример расширяемости сокетов ZMQ. Этот скрипт запускает службу метеорологической информации и несколько клиентов:
```
wuserver &
wuclient 12345 &
wuclient 23456 &
wuclient 34567 &
wuclient 45678 &
wuclient 56789 &
```
В процессе выполнения можно использовать команду `top`, чтобы проверить состояние процессов (вот пример для четырёхъядерной машины):
```
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
7136 ph 20 0 1040m 959m 1156 R 157 12.0 16:25.47 wuserver
7966 ph 20 0 98608 1804 1372 S 33 0.0 0:03.94 wuclient
7963 ph 20 0 33116 1748 1372 S 14 0.0 0:00.76 wuclient
7965 ph 20 0 33116 1784 1372 S 6 0.0 0:00.47 wuclient
7964 ph 20 0 33116 1788 1372 S 5 0.0 0:00.25 wuclient
7967 ph 20 0 33072 1740 1372 S 5 0.0 0:00.35 wuclient
```Рассмотрим, что происходит: программа службы метеорологической информации использует один сокет, но способна одновременно отправлять сообщения пяти клиентам параллельно. Можно иметь сотни клиентов, работающих параллельно, а сервер не видит этих клиентов и не может ими управлять.
### Как решить проблему с потерянными сообщениямиПри написании приложений на ZMQ наиболее часто встречающаяся проблема — это отсутствие получения сообщений. Ниже приведена схема решения этой проблемы, которая охватывает основные причины возникновения ошибок. Не беспокойтесь, если вы ещё не знакомы со всеми терминами, они будут подробно объяснены в последующих главах.
```textdiagram
+-----------------+
| |
| Я не могу получить данные! |
| |
| {o} |
+--------+--------+
|
|
v
+-----------------+ +-----------------+ +------------------+
| | | | | Используйте функцию |
| Вы получаете сообщение из SUB сокета? | | zmq_setsockopt() |
| | Да | | Нет | Установите ZMQ_SUBSCRIBE |
| {o} | | {o} | | пустой строкой |
+--------+--------+ +--------+--------+ +------------------+
| Нет | Да
| |
| v
| +-----------------+ +------------------+
| | | | Откройте все SUB сокеты |
| | После PUB был ли запущен SUB? | | перед тем как открыть PUB, чтобы избежать потери сообщений. |
| | | Нет | |
| | {o} | | |
| +--------+--------+ +------------------+
| | Да
| |
| v
| +-------------------------+
``` | | Пожалуйста, прочитайте объяснение о "медленной связи". |
| | |
| +-------------------------+
|
|
v
+-----------------+ +--------------------+
| | | В REQ сокете выполните |
| Используете ли вы REQ и REP сокеты? | | send и recv, проверьте результат. |
| | Да | В REP сокете выполните |
| {o} | | recv и send. |
+--------+--------+ +--------------------+
| Нет
|
v
+-----------------+ +---------------------+ +-----------------+
| | | Первый успешный | | Перед отправкой |
| Используется ли | | соединение | | задачи требуется|
| PUSH сокет? +------->| PULL может получить +------->| дополнительная |
| {o} | | все задачи, а другие | | работа для |
| | | PULL не получат. | | синхронизации |
+--------+--------+ +---------------------+ +-----------------+
| Нет
|
v
+-----------------+ +-----------------+
| | | |
| Проверены ли все | | Проверены ли все |
| возвращаемые коды? | | возвращаемые коды |
| методов ZMQ? +------->| C языке с помощью |
| {o} | | assert. |
+--------+--------+ +-----------------+
| Да
|
v
+-----------------+ +-----------------+ +------------------+
| | | | | | | Используются ли | | Передаются ли | | Для каждого потока|
| потоки в приложении? | | объекты сокетов | | создаются отдельные|
| | Да | между потоками? | | объекты сокетов. |
| {o} | | {o} | | |
+--------+--------+ +--------+--------+ +------------------+
| Нет | Нет
+--------------------------+
|
v
+-----------------+ +-----------------+ +------------------+
| | | | | |
| Используется ли | | Многократно ли | | Каждый процесс |
| протокол inproc? | | вызывается zmq_init()? | | вызывает zmq_init()|
| | | | | один раз. |
+-----------------+ +-----------------+ +------------------+ | | Да | | Да | |
| {o} | | {o} | | |
+--------+--------+ +--------+--------+ +------------------+
| Нет | Нет
| |
| v
| +-----------------+
| | |
| | Проверить, были ли |
| | предварительно сокращены конечные |
| | точки перед подключением к ним. |
| +-----------------+
|
v
+-----------------+ +-----------------+ +-----------------+ | | | Проверить, является ли| | Если используется постоянное |
| Используется ли ROUTER сокет? | ------->| адрес правильным. ZMQ| | соединение, проверьте, был ли |
| | Да | Отбросить сообщения, которые невозможно маршрутизировать.| | он установлен до подключения, а |
| {o} | | | | не после. |
+--------+--------+ +-----------------+ +--------+--------+
| Нет
|
v
+-----------------+ +--------------------+
| | | |
| Есть ли только отдельные потери сообщений? | ------->| возможно, есть еще клиенты в фоновом режиме, найдите их и завершите работу.|
| | Да | |
| {o} | | |
+--------+--------+ +--------------------+
| Нет
|
v
+-----------------+
| |
| Напишите самый простой тестовый пример и обратитесь за помощью в канал ZMQ IRC. |
| |
+-----------------+### Предупреждение: Ваше мышление может быть перевернуто!Традиционное сетевое программирование обычно предполагает, что сокет может поддерживать соединение только с одним узлом. Хотя существуют протоколы широковещательной рассылки, они являются сторонними. Когда мы считаем, что "один сокет = одно соединение", мы расширяем архитектуру приложения определенным образом: создаем поток для каждой логической части, каждый из которых независимо поддерживает один сокет.
Однако в мире ZMQ сокеты являются умными и многопоточными, автоматически поддерживающими набор полных соединений. Вы не можете видеть эти соединения и даже не можете манипулировать ими напрямую. Когда вы отправляете и получаете сообщения, выполняете опрос и так далее, вы работаете только с сокетами ZMQ, а не с самыми соединениями. Таким образом, соединения в мире ZMQ являются приватными и недоступными для внешнего воздействия, что делает ZMQ легким для масштабирования.
Поскольку ваш код взаимодействует только с определенным сокетом, он может обрабатывать любое количество соединений, используя любой сетевой протокол. А сообщение ZMQ позволяет ещё более дешево и легко масштабировать.Таким образом, традиционное мышление не применимо в мире ZMQ. Когда вы читаете примеры кода, вы можете попытаться связать эти примеры с традиционным сетевым программированием: когда вы читаете "сокет", вы думаете, что это означает соединение с другим узлом — это неверное понимание; когда вы читаете "поток", вы думаете, что это означает соединение с другим узлом — это также неверное понимание. Если это ваш первый раз с этим руководством, и вы провели один-два дня (или даже больше) разработки с использованием ZMQ, вы, возможно, почувствуете некоторое замешательство. Как ZMQ может сделать всё так просто? Вы снова попытаетесь понять ZMQ с помощью вашего обычного мышления, но безуспешно. В конце концов, вы будете покорены идеями ZMQ, и перед вами откроются новые горизонты, начиная наслаждаться преимуществами ZMQ.
[iMatix]: http://www.imatix.com/
[AMQP]: http://www.amqp.org/
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )