1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/andwp-zguide-cn

В этом репозитории не указан файл с открытой лицензией (LICENSE). При использовании обратитесь к конкретному описанию проекта и его зависимостям в коде.
Клонировать/Скачать
chapter1.md 91 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
Отправлено 23.06.2025 22:09 06c6f61

Руководство по ZMQ

Автор: Пиетер Хинтженс ph@imatix.com, CEO компании iMatix Corporation.
Перевод: Жань Жи jizhang@anjuke.com, инженер компании Anjuke Group, Haozhuwang

Спасибо Биллу Дезмаре, Брайану Дорси, CAF, Дэниелу Лин, Эрику Дешранжу, Гонсало Диетхельму, Гвидо Гольдштейну, Хантеру Форду, Камилю Шакирову, Мартину Сустрику, Майклу Кастлмену, Навеену Чавле, Николаю Педуччи, Оливеру Смиту, Оливье Шаму, Петеру Александр, Пьеру Рулеау, Рэнди Драйбергу, Джону Уину, Алексу Томасу, Михаилу Минкову, Джереми Авенту, Майклу Комптону, Камилю Кишилу, Марку Харитонову, Гийому Оуберту, Иану Барберу, Майклу Шеридану, Фаруку Акгулу, Олегу Сидорову, Леву Гивону, Аллистеру МакЛоуду, Александру Д'Арчанжелю, Андреасу Хольцвиммеру, Хану Холлу, Роберту Джакабоски, Фелипе Крузу, Маркусу МакКурди, Михаилу Кулемину, доктору Герго Эрди, Павлу Жукову, Александру Элсе, Джованни Руггьеро, Рик "ТехноВини" и Дэниелу Лундину за их вклад, а также Статису Сидерису за Ditaa.

Пожалуйста, используйте систему отслеживания проблем для всех комментариев и исправлений. Эта версия охватывает последний стабильный выпуск 0.0MQ и была опубликована 10 октября 2011 года.

Руководство представлено главным образом на C, но также доступно на PHP и Lua.---

Эта работа лицензирована под лицензией Creative Commons Attribution-ShareAlike 3.0.

Глава 1. Основы ZeroMQ

Спасение мираКак объяснить ZMQ? Некоторые люди начинают с перечисления всех преимуществ ZMQ: это набор компонентов для быстрого создания сокетов; его почтовая система обладает невероятной способностью маршрутизировать; он слишком быстр! Другие предпочитают делиться моментами просветления, когда ZMQ внезапно сделал всё понятным и очевидным. Другие сравнивают ZMQ с другими продуктами: он меньше, проще, но при этом кажется знакомым. Для меня лично, я предпочитаю рассказывать историю создания ZMQ, уверен, что она будет близка к сердцу читателей.

Программирование — это наука, но она часто маскируется как искусство. Мы никогда не углубляемся в самые базовые механизмы программного обеспечения, или, возможно, никто этого просто не делает. Программное обеспечение не сводится только к алгоритмам, данным, языкам программирования или абстракциям; все это лишь инструменты, которые мы создаем, используем и в конечном итоге отбрасываем. На самом деле, суть программного обеспечения заключается в человеческой сути. Например, когда мы сталкиваемся с очень сложной проблемой, мы объединяем усилия, делаем специализированные разделы работы и разделяем проблему на несколько частей, чтобы решить её вместе. Это демонстрирует научную сторону программирования: создание группы небольших модулей, которые легко понять и использовать, что позволяет людям работать вместе над решением проблемы.Мы живём в мире, где всё взаимосвязано, и нам нужны современные программные средства, чтобы направлять нас. Поэтому будущие модули для обработки больших вычислений должны быть взаимосвязанными и способными к параллельной работе. В этом случае программы больше не смогут существовать в изоляции; они должны общаться друг с другом, становиться более "разговорчивыми". Программы должны работать как человеческий мозг: миллиарды нейронов передают сигналы быстро и эффективно в среде без центрального контроля, без точки отказа, чтобы решать задачи. Это не должно удивлять, так как современная сеть работает аналогично: каждый узел подключен к сети, которая работает как человеческий мозг.

Если вы имели дело с потоками, протоколами или сетями, вы можете подумать, что это звучит как бред. В реальном применении соединение нескольких программ или сетей уже является сложной задачей. Миллиарды узлов? Это просто невозможно представить. В настоящее время только крупные, богатые компании могут позволить себе такие программы и услуги.Сетевые структуры современного мира значительно превышают наши возможности управления. Восьмидесятые годы прошлого века были временем программной катастрофы, о которой писал Фред Брукс, говоря, что нет "волшебного средства". Затем свободное и открытое программное обеспечение решило эту программную катастрофу, позволяя нам эффективно делиться знаниями. Сегодня мы снова сталкиваемся с программной катастрофой, но об этом говорят гораздо меньше. Только крупные, богатые компании могут позволить себе создание сильно связанных приложений. Там есть облака, но они частные. Наша информация и знания исчезают с наших личных компьютеров и переходят в облака, где они недоступны или труднодоступны для конкуренции. Кто владеет нашими социальными сетями? Это действительно похоже на новую эру главных компьютеров.Не говоря уже о политических аспектах, которые заслуживают отдельной книги. В настоящий момент, хотя интернет позволяет миллионам программ связываться, большинство из нас не может этого сделать. Таким образом, действительно интересные большие проблемы (например, в области здравоохранения, образования, экономики, транспорта) остаются нерешёнными. Мы не можем соединить код таким образом, чтобы он мог обрабатывать большие проблемы, как это делает человеческий мозг. Уже кто-то пытался подключать приложения различными способами, используя тысячи спецификаций IETF, каждая из которых решает конкретную проблему. Для разработчиков HTTP-протокол является простым и удобным в использовании, но это также часто приводит к ухудшению ситуации, так как он способствует формированию подхода, основывающегося на серверной стороне, а не на клиентской.

Поэтому до сих пор люди продолжают использовать первоначальные протоколы TCP/UDP, частные протоколы, HTTP-протоколы и сетевые сокеты для подключения приложений. Такой подход всё ещё вызывает большие проблемы, поскольку он медленный, трудно масштабируемый и требует централизованного управления. Распределённые P2P-протоколы подходят только для развлечений, а не для реальных приложений. Кто бы использовал Skype или BitTorrent для обмена данными?Это заставляет нас вернуться к вопросам программной науки. Чтобы спасти этот мир, нам нужно сделать две вещи: во-первых, найти способ подключения любого двух приложений откуда угодно; во-вторых, упаковать это решение в максимально простую форму для использования разработчиками.

Может быть, это звучит слишком просто, но это действительно так.

Введение в ZMQ

ZMQ (ØMQ, ZeroMQ, 0MQ) выглядит как набор встроенных сетевых библиотек, но работает как концептуальная архитектура параллельных вычислений. Он предоставляет сокеты, которые могут передавать сообщения через различные протоколы, такие как потоки, процессы, TCP, широковещательные сети и т. д. Вы можете использовать сокеты для создания различных моделей соединений, таких как распределение, публикация-подписка, распределение задач, запрос-ответ и т. д. Быстродействие ZMQ позволяет ему выполнять задачи кластеризации. Его асинхронная модель ввода-вывода позволяет создавать многопоточные приложения для выполнения асинхронной обработки сообщений. ZMQ поддерживает множество языков программирования и может работать практически на всех операционных системах. ZMQ — продукт компании iMatix, распространяемый под лицензией LGPL.

Необходимые знания* Использование самой последней стабильной версии ZMQ;

  • Использование операционной системы Linux или аналогичной;
  • Способность читать код на языке C, который используется по умолчанию в примерах этого руководства;
  • Когда мы пишем константы, такие как PUSH или SUBSCRIBE, вы должны знать, где находятся соответствующие реализации этих констант на вашем языке, например ZMQ_PUSH или ZMQ_SUBSCRIBE.### Получение примеров

Все примеры этого руководства хранятся в репозитории GitHub. Самый простой способ получить доступ к ним — выполнить следующий код:

git clone git://github.com/imatix/zguide.git

Обзор каталога examples позволит вам увидеть реализации на различных языках. Если вы заметили отсутствие реализации на каком-либо из языков, которые вы используете, мы будем рады, если вы добавите её. Это делает это руководство полезным, и мы благодарим всех, кто вносил свой вклад. Все примеры кода распространяются под лицензией MIT/X11, если в исходном коде не указано иное.

Вопрос-ответ

Давайте начнем с простого кода, с традиционной программы "Hello World". Мы создадим клиент и сервер, где клиент отправляет "Hello" серверу, а сервер отвечает "World". Ниже приведен код сервера на C, который открывает сокет ZMQ на порту 5555, ожидает запросы и после получения отвечает "World".

hwserver.c: Hello World сервер

//
//  Сервер "Hello World"
//  Привязывает REP сокет к tcp://*:5555
//  Получает "Hello" от клиента и отвечает "World"
//
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>

int main (void)
{
    void *context = zmq_init (1);

    //  Сокет для связи с клиентом
    void *responder = zmq_socket (context, ZMQ_REP);
    zmq_bind (responder, "tcp://*:5555");

    while (1) {
        //  Ожидаем запрос от клиента
        zmq_msg_t request;
        zmq_msg_init (&request);
        zmq_recv (responder, &request, 0);
        printf ("Получено Hello\n");
        zmq_msg_close (&request);
    }
}
```        // Выполняем "обработку"
        sleep(1);

        // Отправляем ответ
        zmq_msg_t reply;
        zmq_msg_init_size(&reply, 5);
        memcpy(zmq_msg_data(&reply), "World", 5);
        zmq_send(responder, &reply, 0);
        zmq_msg_close(&reply);
    }
    // Программа не достигнет этой точки, но это показывает, как следует завершить работу
    zmq_close(responder);
    zmq_term(context);
    return 0;
}

1

Использование REQ-REP сокетов для отправки и приема сообщений требует соблюдения определенного порядка. Клиент сначала использует zmq_send() для отправки сообщения, затем использует zmq_recv() для получения ответа, и так далее. Если этот порядок нарушается (например, отправка двух сообщений подряд), произойдет ошибка. Аналогично, сервер должен сначала принимать запрос, а затем отправлять ответ.

ZMQ использует C язык как язык справочного руководства, и данное руководство также использует его для примеров программ. Если вы читаете онлайн версию этого руководства, вы можете заметить реализацию примеров кода на других языках. Например, вот реализация на C++:hwserver.cpp: Пример сервера "Hello World"```cpp // // Пример серверной реализации на C++ // Привязка REP сокета к tcp://*:5555 // Получение сообщения Hello от клиента и ответ World // #include <zmq.hpp> #include #include #include <unistd.h>

int main () { // Подготовка контекста и сокета zmq::context_t context (1); zmq::socket_t socket (context, ZMQ_REP); socket.bind ("tcp://*:5555");

while (true) {
    zmq::message_t request;

    // Ожидание запроса от клиента
    socket.recv (&request);
    std::cout << "Получено сообщение Hello" << std::endl;

    // Выполнение "обработки"
    usleep(1000000);

    // Ответ World
    zmq::message_t reply (5);
    memcpy ((void *) reply.data (), "World", 5);
    socket.send (reply);
}
return 0;

}


```php
<?php
/**
 * Сервер "Hello World"
 * Привязка REP сокета к tcp://*:5555
 * Получение "Hello" от клиента и ответ "World"
 * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
 */

$context = new ZMQContext(1);

// Сокет для взаимодействия с клиентом
$responder = new ZMQSocket($context, ZMQ::SOCKET_REP);
$responder->bind("tcp://*:5555");

while (true) {
    // Ожидание запроса от клиента
    $request = $responder->recv();
    printf ("Получен запрос: [%s]\n", $request);

    // Обработка
    sleep (1);

    // Ответ "World"
    $responder->send("World");
}
?>

Ниже приведён код клиента:

hwclient: Клиент "Hello World" на C

//
//  Клиент "Hello World"
//  Подключение REQ сокета к tcp://localhost:5555
//  Отправка "Hello" серверу и получение "World"
//
#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>

int main (void)
{
    void *context = zmq_init (1);

    // Сокет для подключения к серверу
    printf ("Подключение к серверу 'Hello World'...\n");
    void *requester = zmq_socket (context, ZMQ_REQ);
    zmq_connect (requester, "tcp://localhost:5555");

    int request_nbr;
    for (request_nbr = 0; request_nbr != 10; request_nbr++) {
        zmq_msg_t request;
        zmq_msg_init_size (&request, 5);
        memcpy (zmq_msg_data (&request), "Hello", 5);
        printf ("Отправка Hello %d...\n", request_nbr);
        zmq_send (requester, &request, 0);
        zmq_msg_close (&request);

        zmq_msg_t reply;
        zmq_msg_init (&reply);
        zmq_recv (requester, &reply, 0);
        printf ("Получено World %d\n", request_nbr);
        zmq_msg_close (&reply);
    }
    zmq_close (requester);
    zmq_term (context);
    return 0;
}

Видите ли, это выглядит слишком просто? ZMQ — это такой инструмент, который, добавив немного "ингредиентов", можно использовать для создания мощного "оружия", способного "спасти мир"!2

Теоретически вы можете подключить миллионы клиентов к этому серверу, и всё будет работать без проблем. Вы можете попробовать запустить клиент до запуска сервера и увидеть, что программа всё равно будет работать корректно. Подумайте, что это означает. Давайте кратко рассмотрим, что делают эти два отрезка программы. Сначала они создают контекст ZMQ, а затем сокет. Не позволяйте незнакомым терминам пугать вас, мы всё объясним позже. Сервер привязывает REP-сокет к порту Yö 5555 и начинает ждать запросы, отвечать на них и повторять этот цикл. Клиент отправляет запросы и ожидает ответы от сервера.

За этими кодами скрывается множество действий, но программисту не нужно беспокоиться об этом, достаточно знать, что эти коды короткие, эффективные и редко содержат ошибки. Такой способ связи называется режимом запрос-ответ, это одна из самых простых и прямых реализаций ZMQ. Вы можете сравнить его с RPC и традиционной моделью C/S.

О строках

ZMQ не заботится о содержимом сообщений, ему важно знать только их размер в байтах. Поэтому программисту нужно убедиться, что получатель может правильно прочитать эти сообщения. Как преобразовать объект или сложный тип данных в сообщение, которое можно отправить через ZMQ? Для этого существуют сериализаторы, такие как Protocol Buffers. Однако, даже для строк нужно быть внимательным.В C строки заканчиваются нулевым байтом. Вы можете отправить полную строку следующим образом:

zmq_msg_init_data (&request, "Привет", 7, NULL, NULL);

Однако, если вы отправите эту строку на другом языке, она может не содержать этот нулевой байт. Например, если вы используете Python:

socket.send ("Привет")

На самом деле, отправленное сообщение будет таким:

3

Если вы прочитаете это сообщение на C, вы получите что-то похожее на строку, возможно, даже строку (нулевой байт находится на шестом месте в памяти), но это не совсем верно. Таким образом, клиент и сервер имеют разные представления о строках, что может привести к странным результатам.

Когда вы получаете строку на C из ZMQ, вы не можете быть уверены, что она завершена нулевым байтом. Поэтому при получении строки следует создать буфер на один байт больше, поместить строку в него и добавить завершающий нулевой байт.

Поэтому давайте сделаем следующее предположение: строки ZMQ имеют фиксированную длину и не содержат завершающего нулевого байта. В самом простом случае, строка ZMQ эквивалентна одному фрейму в сообщении ZMQ, как показано на рисунке выше, представляющем собой пару длины и последовательности байтов. Ниже представлен функциональный метод, который поможет нам правильно принимать строковые сообщения в C:```c // Принимает строку из ZMQ сокета и преобразует её в строку C static char * s_recv (void *socket) { zmq_msg_t message; zmq_msg_init (&message); zmq_recv (socket, &message, 0); int size = zmq_msg_size (&message); char *string = malloc (size + 1); memcpy (string, zmq_msg_data (&message), size); zmq_msg_close (&message); string[size] = 0; return (string); }


Этот код мы будем использовать в последующих примерах и можем написать метод `s_send()`, а затем упаковать его в файл `.h` для использования.

Таким образом, появляется `zhelpers.h` — библиотека функций ZMQ для C. Код этой библиотеки довольно длинный и полезен только для разработчиков C. Вы можете ознакомиться с ней в свободное время [здесь](https://github.com/imatix/zguide/blob/master/examples/C/zhelpers.h).

### Получение версии

У ZMQ есть несколько версий, и он продолжает развиваться. Если вы столкнулись с проблемой, возможно, она уже решена в следующей версии. Чтобы узнать текущую версию ZMQ, вы можете запустить следующий код в вашем приложении:

**Отчет о версии ØMQ в C**

```c
//
// Возвращает текущую версию ØMQ
//
#include "zhelpers.h"

int main (void)
{
    int major, minor, patch;
    zmq_version (&major, &minor, &patch);
    printf ("Текущая версия ØMQ: %d.%d.%d\n", major, minor, patch);

    return EXIT_SUCCESS;
}

Движение сообщенийВторой классический шаблон сообщений — это одностороннее распространение данных: сервер отправляет события обновления группе клиентов. Давайте рассмотрим пример погодной информации, включающей почтовый индекс, температуру и относительную влажность. Мы генерируем эти случайные данные для моделирования работы метеостанции.Вот код сервера, использующий порт 5556:

wuserver: Сервер обновления погоды на C```c // // Услуга обновления метеорологической информации // Привязка PUB сокета к tcp://*:5556 // Публикация случайной метеорологической информации // #include "zhelpers.h"

int main (void) { // Подготовка контекста и PUB сокета void *context = zmq_init (1); void publisher = zmq_socket (context, ZMQ_PUB); zmq_bind (publisher, "tcp://:5556"); zmq_bind (publisher, "ipc://weather.ipc");

//  Инициализация генератора случайных чисел
srandom ((unsigned) time (NULL));
while (1) {
    //  Генерация данных
    int zipcode, temperature, relhumidity;
    zipcode     = randof (100000);
    temperature = randof (215) - 80;
    relhumidity = randof (50) + 10;

    //  Отправка сообщения всем подписчикам
    char update [20];
    sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
    s_send (publisher, update);
}
zmq_close (publisher);
zmq_term (context);
return 0;

}


![4](https://github.com/anjuke/zguide-cn/raw/master/images/chapter1_4.png)

Ниже приведён клиентский программный код, который принимает сообщения от публикатора и обрабатывает только те, которые помечены определённым почтовым индексом, например, для Нью-Йорка это индекс 10001:

**wuclient: Программа клиента для получения погодных обновлений на C**

```c
//
//  Клиент для получения погодной информации
//  Подключает SUB сокет к tcp://*:5556
//  Собирает погодную информацию для определённого почтового индекса и вычисляет среднюю температуру
//
#include "zhelpers.h"
``````c
int main (int argc, char *argv [])
{
    void *context = zmq_init (1);

    // Создаём сокет для подключения к серверу
    printf ("Сбор погодной информации...\n");
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    zmq_connect (subscriber, "tcp://localhost:5556");

    // Устанавливаем фильтр, по умолчанию для Нью-Йорка, почтовый индекс 10001
    char *filter = (argc > 1) ? argv [1] : "10001";
    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));

    // Обрабатываем 100 обновлений
    int update_nbr;
    long total_temp = 0;
    for (update_nbr = 0; update_nbr < 100; update_nbr++) {
        char *string = s_recv (subscriber);
        int zipcode, temperature, relhumidity;
        sscanf (string, "%d %d %d", &zipcode, &temperature, &relhumidity);
        total_temp += temperature;
        free (string);
    }
    printf ("Средняя температура для почтового индекса '%s' составляет %dF\n",
        filter, (int) (total_temp / update_nbr));

    zmq_close (subscriber);
    zmq_term (context);
    return 0;
}

Важно отметить, что при использовании SUB сокета необходимо использовать метод zmq_setsockopt() для установки фильтра. Если вы не установите фильтр, то ничего не получите, что часто является ошибкой новичков. Фильтр может быть любой строкой и может быть установлено несколько раз. SUB сокет получит сообщение, если оно соответствует хотя бы одному из установленных фильтров. Подписчики могут выбрать, какие сообщения не получать, используя также метод zmq_setsockopt(). ```Комбинация PUB-SUB сокетов асинхронна. Клиент использует zmq_recv() в цикле для получения сообщений, и если отправить сообщение на SUB сокет, то произойдёт ошибка; аналогично, сервер может постоянно использовать `zmq_send()` для отправки сообщений, но не может использовать `zmq_recv()` на PUB сокете.Ещё одна важная особенность PUB-SUB сокетов заключается в том, что вы не можете знать, когда SUB начинает получать сообщения. Даже если вы открываете SUB сокет раньше, чем начинаете отправлять сообщения на PUB, SUB всё равно будет пропускать некоторые сообщения, так как время на установление соединения есть. Часто, но не всегда. Эта "медленная связь" может сбить с толку многих людей, поэтому я подробно объясню это здесь. Вспомните, что ZMQ выполняет асинхронную передачу I/O в фоновом режиме. Если у вас есть два узла, соединённых в следующем порядке:

  • Подписчик подключается к конечной точке для получения сообщений и подсчёта;
  • Публикатор привязывается к конечной точке и сразу отправляет 1000 сообщений.

Результат выполнения может заключаться в том, что подписчику не придут никакие сообщения. Это может вызвать удивление и заставить вас проверять, правильно ли настроены подписки, и пробовать снова, но результат будет таким же.

Мы знаем, что для установления TCP-соединения требуется три рукопожатия, что занимает несколько миллисекунд, и этот временной интервал увеличивается с увеличением количества узлов. В течение этого короткого времени ZMQ может отправить очень много сообщений. Например, если установка соединения занимает 5 миллисекунд, то ZMQ может отправить все 1000 сообщений за 1 миллисекунду.В главе 2 я объясню, как синхронизировать публикатора и подписчика таким образом, чтобы публикатор начинал отправлять сообщения только после того, как подписчик будет готов. Есть простой способ синхронизации PUB и SUB — это задержка отправки сообщений публикатором. Я не рекомендую использовать этот метод в реальном программировании, так как он слишком хрупкий и сложен для управления. Однако мы временно используем sleep для решения этой проблемы, а в главе 2 рассмотрим правильный подход.

Другой способ синхронизации состоит в том, что поток сообщений от публикатора считается бесконечным, поэтому потеря первых нескольких сообщений не имеет значения. Наш клиент для метеорологических данных работает именно так.

Пример клиента для метеорологических данных собирает 1000 сообщений для определенного почтового индекса, среди которых около 10 миллионов сообщений будут отправлены. Вы можете запустить клиент до запуска сервера, работать некоторое время, а затем перезапустить сервер, и клиент продолжит работать нормально. После сбора всех необходимых данных клиент вычисляет и выводит среднюю температуру.

Несколько замечаний о модели публикации-подписки:* Подписчики могут подключаться к нескольким публикаторам и получать сообщения последовательно;

  • Если у публикатора нет подписчиков, его сообщения будут просто игнорированы;
  • Если вы используете протокол TCP, сообщения будут накапливаться на стороне публикатора, если скорость обработки подписчиком недостаточна. Мы обсудим использование порогового значения (HWM) для защиты публикатора.
  • В текущей версии ZMQ фильтрация сообщений происходит на стороне подписчика. То есть, публикатор отправляет все сообщения подписчику, который затем отбрасывает те сообщения, которые не были подписаны. Я попробовал отправить 10 миллионов сообщений на своей четырёхъядерной машине, и это было достаточно быстро, но ничего особенного:``` ph@ws200901:~/work/git/0MQGuide/examples/c$ time wuclient Сбор данных с сервера погоды... Средняя температура для почтового индекса '10001' составила 18F

real 0m5.939s user 0m1.590s sys 0m2.290s


### Распределенное обработание

В следующем примере программы мы будем использовать ZMQ для супервычислений, то есть параллельной обработки модели:

* Диспетчер задач будет создавать множество задач, которые можно обрабатывать параллельно;
* Есть группа worker, которые будут обрабатывать эти задачи;
* Агрегатор результатов будет принимать все результаты от worker и суммировать их.

На практике worker могут быть рассредоточены на разных машинан и использовать GPU (графический процессор) для выполнения сложных вычислений. Ниже приведен код диспетчера задач, который создает 100 задач, каждая из которых требует от получившего ее worker задержки на несколько миллисекунд.

**taskvent: Параллельный диспетчер задач на C**

```c
//
//  Диспетчер задач
//  Привязывает PUSH сокет к tcp://localhost:5557
//  Отправляет группу задач связанным worker
//
#include "zhelpers.h"

int main (void)
{
    void *context = zmq_init (1);

    //  Сокет для отправки сообщений
    void *sender = zmq_socket (context, ZMQ_PUSH);
    zmq_bind (sender, "tcp://*:5557");

    //  Сокет для отправки сигнала начала
    void *sink = zmq_socket (context, ZMQ_PUSH);
    zmq_connect (sink, "tcp://localhost:5558");

    printf ("Нажмите Enter после подготовки worker: ");
    getchar ();
    printf ("Отправка задач на worker...\n");
}
```    // Отправка сигнала начала
    s_send(sink, "0");

    // Инициализация генератора случайных чисел
    srandom((unsigned)time(NULL));

    // Отправка 100 задач
    int task_nbr;
    int total_msec = 0;     // Ожидаемое время выполнения (мс)
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {
        int workload;
        // Генерация случайного времени работы от 1 до 100 мс
        workload = randof(100) + 1;
        total_msec += workload;
        char string[10];
        sprintf(string, "%d", workload);
        s_send(sender, string);
    }
    printf("Ожидаемое время выполнения: %d мс\n", total_msec);
    sleep(1);              // Задержка для завершения отправки задач

    zmq_close(sink);
    zmq_close(sender);
    zmq_term(context);
    return 0;
}
```![5](https://github.com/anjuke/zguide-cn/raw/master/images/chapter1_5.png)

Вот код worker'а, который принимает сообщение, задерживает выполнение на указанное количество миллисекунд и отправляет сигнал о завершении:

**taskwork: Параллельный рабочий процесс в C**

```c
//
// Выполняющий процесс
// Подключаем PULL сокет к tcp://localhost:5557
// Получаем задачи от диспетчера задач
// Подключаем PUSH сокет к tcp://localhost:5558
// Отправляем результаты сборщику
//
#include "zhelpers.h"

int main(void) 
{
    void *context = zmq_init(1);

    // Сокет для получения задач
    void *receiver = zmq_socket(context, ZMQ_PULL);
    zmq_connect(receiver, "tcp://localhost:5557");

    // Сокет для отправки результатов
    void *sender = zmq_socket(context, ZMQ_PUSH);
    zmq_connect(sender, "tcp://localhost:5558");

    // Цикл обработки задач
    while (1) {
        char *string = s_recv(receiver);
        // Выводим прогресс выполнения
        fflush(stdout);
        printf("%s.", string);
}        //  Начинаем выполнение
        s_sleep(atoi(string));
        free(string);

        //  Отправляем результат
        s_send(sender, "");
    }
    zmq_close(receiver);
    zmq_close(sender);
    zmq_term(context);
    return 0;
}

Вот код сборщика результатов. Он собирает 100 результатов выполнения и вычисляет общее время выполнения, чтобы определить, были ли задачи выполнены параллельно.

tasksink: Параллельный сборщик задач в C

//
//  Сборщик задач
//  Привязываем PULL сокет к tcp://localhost:5558
//  Собираем результаты выполнения от worker'а
//
#include "zhelpers.h"

int main (void) 
{
    //  Подготавливаем контекст и сокет
    void *context = zmq_init (1);
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_bind (receiver, "tcp://*:5558");

    //  Ждем начального сигнала
    char *string = s_recv (receiver);
    free (string);

    //  Начинаем отсчет времени
    int64_t start_time = s_clock ();

    //  Ожидаем завершения 100 задач
    int task_nbr;
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {
        char *string = s_recv (receiver);
        free (string);
        if ((task_nbr / 10) * 10 == task_nbr)
            printf (":");
        else
            printf (".");
        fflush (stdout);
    }

    //  Вычисляем и выводим общее время выполнения
    printf ("Общее время выполнения: %d мс\n", 
        (int) (s_clock () - start_time));

    zmq_close (receiver);
    zmq_term (context);
    return 0;
}

Среднее время выполнения одной задачи составляет около 5 секунд. Ниже приведены результаты выполнения при запуске одного, двух и четырёх рабочих процессов:

#   1 рабочий процесс
Общее время выполнения: 5034 мс
#   2 рабочих процесса
Общее время выполнения: 2421 мс
#   4 рабочих процесса
Общее время выполнения: 1018 мс
```Несколько деталей относительно этого кода:

* Рабочие процессы соединены с задачами распределителями сверху и собирают результаты снизу. Это позволяет запускать любое количество рабочих процессов. Однако если рабочие процессы привязаны к конечной точке, а не подключены к ней, нам потребуется больше конечных точек и настройка задач распределителей и сборщиков результатов. Таким образом, задачи распределители и сборщики результатов являются наиболее стабильной частью этой сети, поэтому они должны быть привязаны к конечной точке, а не рабочие процессы, так как последние более динамичны.

* Нам нужно выполнить синхронизацию, чтобы ждать полного запуска всех рабочих процессов перед распределением задач. Это важно в ZMQ и сложно решить. Подключение сокетов занимает некоторое время, поэтому когда первый рабочий процесс успешно подключается, он сразу получает множество задач. Поэтому, если мы не выполним синхронизацию, эти задачи просто не будут выполняться параллельно. Вы можете проверить это самостоятельно.

* Распределитель задач использует PUSH сокет для равномерного распределения задач среди рабочих процессов (предполагая, что все рабочие процессы уже подключены), этот механизм называется _балансировка нагрузки_, и вы встретите его снова и снова.* Сборщик результатов использует PULL сокет для равномерного получения сообщений от рабочих процессов; этот механизм называется _fair queueing_:

![6](https://github.com/anjuke/zguide-cn/raw/master/images/chapter1_6.png)

Пайплайн также может столкнуться с медленными подключениями, что может ввести в заблуждение, будто PUSH сокет не выполняет балансировки нагрузки. Если ваш рабочий процесс получил больше запросов, это потому, что его PULL сокет подключился быстрее, чем остальные, и он получил дополнительные сообщения до того, как остальные рабочие процессы подключились.

### Программирование с использованием ZMQ

После просмотра этих примеров программ вы, вероятно, захотите немедленно начать программировать с использованием ZMQ. Но прежде чем начать, есть несколько советов, которые помогут вам избежать проблем в будущем:

* Изучайте ZMQ постепенно. Хотя это всего лишь API, он предлагает бесконечные возможности. Учитесь постепенно, полностью осваивая каждую функцию.* Пишите красивый код. Уродливый код скрывает проблемы и делает трудным оказание помощи вам. Например, вы можете привыкнуть использовать бесполезные имена переменных, но те, кто читает ваш код, не знают об этом. Используйте значимые имена переменных, а не случайные. Отступы должны быть единообразными, а структура — четкой. Красивый код делает ваш мир лучше.* Пишите и тестируйте одновременно, чтобы быстро локализовать проблемные строки кода. Это особенно важно при написании приложений на ZMQ, так как часто невозможно сразу написать правильный код.* Когда вы обнаруживаете, что ваш код не работает должным образом, разбейте его на несколько фрагментов и проверьте, какой из них выполняется неправильно. ZMQ позволяет создавать очень модульный код, поэтому следует использовать это преимущество.

* При необходимости используйте абстрактные методы для написания программ (классы, члены функций и т. д.), не копируйте код произвольно, так как вместе с кодом вы можете скопировать и ошибки.

Рассмотрим следующий фрагмент кода, который мне предложил коллега для доработки:```c
// Внимание: не используйте этот код!
static char *topic_str = "msg.x|";
 
void* pub_worker(void* arg){
    void *ctx = arg;
    assert(ctx);
 
    void *qskt = zmq_socket(ctx, ZMQ_REP);
    assert(qskt);
 
    int rc = zmq_connect(qskt, "inproc://querys");
    assert(rc == 0);
 
    void *pubskt = zmq_socket(ctx, ZMQ_PUB);
    assert(pubskt);
 
    rc = zmq_bind(pubskt, "inproc://publish");
    assert(rc == 0);
 
    uint8_t cmd;
    uint32_t nb;
    zmq_msg_t topic_msg, cmd_msg, nb_msg, resp_msg;
 
    zmq_msg_init_data(&topic_msg, topic_str, strlen(topic_str), NULL, NULL);
 
    fprintf(stdout,"WORKER: готов к приему сообщений\n");
    // Внимание: не используйте этот код, он не работает!
    // Например, topic_msg будет некорректным во второй раз
    while (1){
        zmq_send(pubskt, &topic_msg, ZMQ_SNDMORE);
 
        zmq_msg_init(&cmd_msg);
        zmq_recv(qskt, &cmd_msg, 0);
        memcpy(&cmd, zmq_msg_data(&cmd_msg), sizeof(uint8_t));
        zmq_send(pubskt, &cmd_msg, ZMQ_SNDMORE);
        zmq_msg_close(&cmd_msg);
 
        fprintf(stdout, "получено cmd %u\n", cmd);
 
        zmq_msg_init(&nb_msg);
        zmq_recv(qskt, &nb_msg, 0);
        memcpy(&nb, zmq_msg_data(&nb_msg), sizeof(uint32_t));
        zmq_send(pubskt, &nb_msg, 0);
        zmq_msg_close(&nb_msg);
 
        fprintf(stdout, "получено nb %u\n", nb);
 
        zmq_msg_init_size(&resp_msg, sizeof(uint8_t));
        memset(zmq_msg_data(&resp_msg), 0, sizeof(uint8_t));
        zmq_send(qskt, &resp_msg, 0);
        zmq_msg_close(&resp_msg);
 
    }
    return NULL;
}
```Вот мой вариант переписанного кода, в котором исправлены некоторые ошибки:

```c
static void *
worker_thread (void *arg) {
    void *context = arg;
    void *worker = zmq_socket (context, ZMQ_REP);
    assert (worker);
    int rc;
    rc = zmq_connect (worker, "ipc://worker");
    assert (rc == 0);

    void *broadcast = zmq_socket (context, ZMQ_PUB);
    assert (broadcast);
    rc = zmq_bind (broadcast, "ipc://publish");
    assert (rc == 0);

    while (1) {
        char *part1 = s_recv (worker);
        char *part2 = s_recv (worker);
        printf ("Рабочий получил [%s][%s]\n", part1, part2);
        s_sendmore (broadcast, "msg");
        s_sendmore (broadcast, part1);
        s_send (broadcast, part2);
        free (part1);
        free (part2);

        s_send (worker, "OK");
    }
    return NULL;
}

В конце вышеупомянутого сегмента программы сокет передается между двумя потоками, что приводит к непредсказуемым проблемам. Такое поведение в ZMQ 2.1 является легальным, но не рекомендуется.

Версия ZMQ 2.1

История учит нас, что ZMQ 2.0 был распределенной системой сообщений с низкой задержкой, которая выделялась среди множества аналогичных программ, отказываясь от различных излишеств и провозглашая лозунг "без границ". Это была стабильная версия, которую мы использовали все это время.

Со временем все меняется. То, что было популярно в 2010 году, может уже не быть актуальным в 2011 году. Когда разработчики ZMQ и сообщество активно обсуждали различные проблемы ZMQ, вышла версия ZMQ 2.1, ставшая новой стабильной версией.Это руководство в основном ориентировано на описание ZMQ 2.1, поэтому для разработчиков, переходящих с ZMQ 2.0, следует обратить внимание на несколько моментов:* В 2.0 вызовы zmq_close() и zmq_term() приводили к тому, что все непереданные сообщения были бы废弃。Таким образом, после отправки всех сообщений нельзя было сразу закрыть программу; в примерах для 2.0 обычно использовался sleep(1), чтобы избежать этой проблемы. Однако в 2.1 это не требуется, программа будет завершена только после отправки всех сообщений.

  • Вместо этого, в 2.0 можно было вызвать zmq_term(), даже если некоторые сокеты были открыты, что стало не безопасным в 2.1 и могло привести к блокировке программы. Поэтому в программах для 2.1 мы сначала закрываем все сокеты, прежде чем завершить работу программы. Если есть непереданные сообщения в сокетах, программа будет продолжать ждать, пока не будут установлены параметры LINGER для сокета (например, установка в ноль), тогда сокет будет закрыт через соответствующее время.
int zero = 0;
zmq_setsockopt (mysocket, ZMQ_LINGER, &zero, sizeof (zero));
  • В 2.0 функция zmq_poll() не имела функции таймера и возвращалась немедленно при выполнении условий. Для проверки оставшегося времени требовалось делать проверку внутри цикла. В 2.1 функция zmq_poll() возвращает управление через указанное время, поэтому она может использоваться как таймер.

  • В 2.0 библиотека ZMQ игнорировала сигналы прерывания операционной системы, что означало, что вызовы libzmq не получали сообщений EINTR, и не могли обрабатывать сигналы, такие как SIGINT (Ctrl-C). В 2.1 эта проблема была решена, и методы, такие как zmq_recv(), получают и возвращают сообщения EINTR от операционной системы.### Правильное использование контекста

Запуск ZMQ-приложения начинается с создания контекста, который затем используется для создания сокетов. В C-языке программирования функция для создания контекста называется zmq_init(). В одном процессе следует создать только один контекст. С точки зрения реализации, контекст является контейнером, содержащим все сокеты в данном процессе и предоставляющим реализацию протокола inproc для быстрого соединения различных потоков внутри процесса. Если в одном процессе создаются два контекста, это эквивалентно запуску двух экземпляров ZMQ. Если это именно то, что вам нужно, это нормально, но обычно это не требуется:

Используйте функцию zmq_init() для создания контекста в процессе и функцию zmq_term() для его закрытия при завершенииЕсли вы используете системный вызов fork(), каждый процесс должен иметь свой собственный объект контекста. Если функция zmq_init() была вызвана до вызова fork(), каждый дочерний процесс будет иметь свой собственный объект контекста. Обычно вам потребуется сделать что-то интересное в дочернем процессе, а родительский процесс будет управлять ими.

Правильное завершение и очистка

Хорошей привычкой программиста является выполнение очистки при завершении работы. Когда вы пишете приложение ZMQ на языках, таких как Python, система автоматически выполняет очистку за вас. Однако если вы используете язык C, вам следует быть осторожным, иначе могут возникнуть утечки памяти или проблемы с устойчивостью приложения.

Утечка памяти — это лишь одна из проблем, но ZMQ действительно заботится о том, как программа завершает работу. Причины этого сложны, но кратко можно сказать следующее: если какие-либо сокеты остаются открытыми, вызов функции zmq_term() может привести к зависанию программы; даже если все сокеты закрыты, если есть незавершенные сообщения для отправки, вызов функции zmq_term() также может привести к зависанию программы. Только когда опция LINGER для сокета установлена в 0, можно избежать этих проблем.

Нам следует обращать внимание на такие объекты ZMQ, как сообщения, сокеты и контексты. К счастью, их немного, по крайней мере, в обычных приложениях:* После обработки сообщения обязательно вызывайте функцию zmq_msg_close() для его закрытия;

  • Если вы открываете или закрываете много сокетов одновременно, возможно, стоит перепланировать структуру вашего приложения;
  • При завершении программы сначала закройте все сокеты, а затем вызовите функцию zmq_term() для удаления объекта контекста.

Если вы хотите использовать ZMQ для многопоточной разработки, вам придётся учесть больше вопросов. Мы подробно рассмотрим многопоточную разработку в следующей главе, но если вы не можете дождаться, вот несколько советов по завершению работы:

  • Не используйте один и тот же сокет в нескольких потоках. Не спрашивайте почему, просто не делайте этого.
  • Закройте все сокеты и закройте объект контекста в основном процессе.
  • Если есть ожидающие recv или poll вызовы, поймите эти ошибки в основном процессе и закройте сокеты в соответствующих потоках. Не повторно закрывайте контекст, функция zmq_term() будет ждать, пока все сокеты безопасно закроются.

Да, процесс сложный, поэтому реализаторы API для различных языков могут упаковать эти шаги, чтобы сделать завершение работы программы менее сложной задачей.### Почему нам нужен ZMQ

Теперь, когда мы запустили ZMQ, давайте вспомним, почему нам нужен ZMQ:Современные приложения часто включают компоненты, работающие через сеть, будь то локальная сеть или Интернет. Разработчики этих программ используют различные механизмы передачи сообщений. Некоторые используют продукты очередей сообщений, а большинство предпочитает самостоятельно реализовывать эти механизмы, используя протоколы TCP или UDP. Эти протоколы не сложны в использовании, но простое отправление сообщений от A к B и надёжная передача сообщений в любых условиях — это две совершенно разные вещи.Рассмотрим некоторые типичные проблемы, возникающие при передаче сообщений с использованием чистого протокола TCP. Любой повторно используемый слой передачи сообщений обязательно должен решать следующие вопросы:

  • Как обрабатывать ввод/вывод? Должна ли программа блокироваться в ожидании ответа, или же следует выполнять эту задачу в фоновом режиме? Это ключевой фактор в проектировании программного обеспечения. Блокирующее ввод/вывод может усложнить расширение архитектуры программы, а фоновая обработка ввод/вывода также является сложной задачей.

  • Как обрабатывать временные, непостоянные компоненты? Нужно ли нам делить компоненты на клиентов и серверы, требуя от серверов постоянного присутствия? А если нам нужно соединять серверы? Будем ли мы пытаться заново подключаться каждые несколько секунд?

  • Как представлять сообщение? Как можно разбить сообщение на части, чтобы сделать его легко читаемым и записываемым, избежать переполнения буфера, эффективно передавать маленькие сообщения и обрабатывать большие файлы, такие как видео?

  • Как обрабатывать сообщения, которые не могут быть немедленно отправлены? Например, если нам нужно ждать, пока сетевой компонент не восстановится? Будем ли мы просто игнорировать это сообщение, сохранять его в базе данных или в очереди в памяти?* Где хранить очередь сообщений? Что делать, если какой-то компонент читает очередь сообщений слишком медленно, вызывая накопление сообщений? Какие стратегии использовать?

  • Как обрабатывать потерянные сообщения? Будем ли мы ждать новых данных, просить повторной отправки или требуется создание нового механизма надежности для предотвращения потери сообщений? Что делать, если этот механизм сам по себе выйдет из строя?

  • Если нам нужно изменить сетевое соединение, например, заменить TCP одноадресной связью на широковещательную, или перейти на IPv6? Будем ли мы переписывать все приложения или абстрагировать этот протокол в отдельный слой?

  • Как маршрутизировать сообщения? Можно ли одновременно отправлять сообщения нескольким узлам? Можно ли возвращать ответные сообщения обратно к отправителю запроса? Как нам написать API для другой языковой версии? Нужно ли полностью переписывать протокол или достаточно переупаковать библиотеку?

Как нам передавать сообщения между различными архитектурными структурами? Необходимо ли определять кодировку для сообщений?

Как нам обрабатывать ошибки сетевой связи? Ждать и повторять попытку, игнорировать или отменить операцию?Можно найти открытый исходный код в качестве примера, например Hadoop Zookeeper, и посмотреть исходный код C-библиотеки API, src/c/src/zookeeper.c. Этот код состоит приблизительно из 3200 строк без комментариев и реализует клиент-серверную сеть. Он работает эффективно, используя poll() вместо select(). Однако, Zookeeper следует рассматривать как универсальный уровень обмена сообщениями и подробно прокомментировать. Такие модули должны максимально использоваться, а не создаваться заново.7

Но как нам написать такой перезапускаемый уровень сообщений? Почему люди предпочитают переписывать код управления примитивными TCP-сокетами в своих проектах, вместо создания такого общего пакета?На самом деле, написание универсального уровня сообщений является очень сложной задачей, и поэтому FOSS проекты продолжают пробовать, а коммерческие сообщения продукты становятся все более сложными, дорогими, громоздкими и хрупкими. В 2006 году iMatix разработал протокол AMQP, который предоставил FOSS проектам первый перезапускаемый уровень сообщений. [AMQP] лучше других продуктов, но все еще остается сложным, дорогим и хрупким. Учиться его использовать может занять несколько недель, а создать рабочую архитектуру — несколько месяцев, что может быть слишком поздно. Большинство проектов сообщений систем, таких как AMQP, для решения вышеупомянутых проблем, придумали новые концепции, такие как концепцию "агента", которая включает функции адресации, маршрутизации, очередей и других. В результате был создан C/S протокол и соответствующие API поверх незаметного протокола, что позволяет приложениям и агентам взаимодействовать. Агент действительно является хорошим решением для снижения сложности крупных сетевых структур. Однако применение механизма агента в проектах, таких как Zookeeper, может быть еще хуже, так как это требует добавления нового компьютера и создания нового точки отказа. Агент может постепенно стать новым бутылочным горлышком и стать более рискованным для управления.Если программное обеспечение поддерживает это, можно добавить второй, третий, четвертый агент и создать модель избыточности и отказоустойчивости. Некоторые люди делают именно это, что усложняет архитектуру системы и увеличивает риск. В такой агентоориентированной архитектуре требуется специализированная команда по эксплуатации. Вам придется постоянно наблюдать за состоянием агентов, время от времени корректировать их поведение. Вам потребуется добавлять компьютеры и дополнительные резервные машины, а также иметь специалистов для управления этими машинами. Это имеет смысл только для крупных сетевых приложений, так как они имеют больше модулей для перемещения, несколько команд для разработки и поддержки, и уже были созданы за многие годы.Таким образом, разработчики малых и средних приложений оказываются в затруднительном положении. Они могут попытаться избежать написания сетевых приложений, вместо этого создавая программы, которые не требуют расширения; или они могут использовать примитивные методы сетевой разработки, но созданные программы будут очень хрупкими и сложными, что затрудняет их поддержку; либо они могут выбрать продукт для сообщений, который позволяет создавать расширяемые приложения, но требует значительных затрат. Никакой из этих вариантов не кажется разумным, и это одна из причин, почему система сообщений стала широко распространённой проблемой в прошлом веке.

8

Что нам действительно нужно, так это программное обеспечение для сообщений, которое может делать всё то, что делает крупное программное обеспечение для сообщений, но при этом быть простым в использовании, иметь низкую стоимость и быть применимым ко всем приложениям без каких-либо зависимостей. Без дополнительных модулей вероятность ошибок снижается. Такое программное обеспечение должно работать на всех операционных системах и поддерживать все языки программирования.

ZMQ — это именно такое программное обеспечение: оно эффективно, предоставляет встроенные библиотеки, позволяющие приложениям хорошо масштабироваться в сети, и имеет низкую стоимость.Основные характеристики ZMQ включают:

  • ZMQ асинхронно обрабатывает операции ввода-вывода в фоновом потоке, используя структуру данных, которая не приводит к зацикливанию для хранения сообщений.
  • Компоненты сети могут быть добавлены или удалены в любое время; ZMQ берет на себя задачу автоматического переподключения, что позволяет вам запускать компоненты в любом порядке; при создании сервисной ориентированной архитектуры (SOA) с помощью ZMQ, сервисы могут свободно входить и выходить из сети.
  • В случае необходимости ZMQ автоматически помещает сообщения в очередь для сохранения и начинает отправку сразу после установления соединения.
  • ZMQ имеет механизм порогового значения (High Water Mark, HWM), который предотвращает переполнение очередей сообщений. Когда очередь заполнена, ZMQ автоматически блокирует отправителя или отбрасывает часть сообщений, в зависимости от используемого режима сообщений.
  • ZMQ позволяет использовать различные протоколы связи, такие как TCP, широковещательная сеть, внутрисистемная связь, межпроцессорная связь. При изменении протокола связи вам не требуется менять код.
  • ZMQ правильно обрабатывает медленные узлы, применяя различные стратегии в зависимости от режима сообщений.
  • ZMQ предоставляет несколько режимов маршрутизации сообщений, таких как режим запрос-ответ, режим публикация-подписка и другие.Эти режимы можно использовать для создания топологии сети.
  • В ZMQ можно создавать промежуточные устройства (маленькие компоненты), которые помогают упрощать сеть.
  • ZMQ отправляет полное сообщение, используя механизм фреймов сообщений. Если вы отправите сообщение размером Yöntemleri ağ topolojisi oluşturmak için kullanabilirsiniz.
  • ZMQ, küçük bileşenler olan geçici cihazlar oluşturabilir (küçük bileşenler). Bu, ağları basitleştirmeyi sağlar.
  • ZMQ, mesaj çerçeveleri mekanizması kullanarak tam bir mesaj gönderir. 10KB boyutunda bir mesaj gönderirseniz, aynı boyutta bir mesaj alırsınız.
  • ZMQ, belirli bir mesaj biçimi kullanmaya zorlamaz; mesajlar sıfır uzunlukta olabilir veya gigabaytlar kadar uzun olabilir. Bu mesajları temsil ederken, Google Protocol Buffers veya XDR gibi serileştirme yöntemlerini kullanabilirsiniz.
  • ZMQ, ağ hatalarını akıllıca yönetebilir, bazen tekrar deneme yapabilir, bazen size hata hakkında bilgi verebilir.
  • ZMQ, çevre yükünü azaltabilir, çünkü işlemci zamanı tasarrufu, elektrik enerjisinin tasarrufunu anlamına gelir. Aslında, ZMQ'nin yapabileceği çok daha fazlası var ve görünüşünün aksine, ağ uygulamalarının oluşturulma yaklaşımını tamamen değiştirir. Dışarıda, sadece soketlerle çalışmak için API'ler sunan, zmq_recv() ve zmq_send() işlevleri aracılığıyla mesajları göndermek ve almak için yeterlidir. Ancak, asıl mesaj işleme, uygulamanın ana bileşeni haline gelir. Zamanla, kodunuz, mesaj işleme görevlerini yapan ayrı modüllerden oluşacak ve bu, daha esnek ve doğal hale getirir.

ZMQ отправляет полное сообщение, используя механизм фреймов сообщений. Если вы отправите сообщение размером 10КБ, вы получите сообщение того же размера. ZMQ не требует использования определенного формата сообщений; сообщения могут быть нулевой длины или иметь размер до гигабайтов. При представлении этих сообщений вы можете использовать сериализацию, такую как Google Protocol Buffers или XDR. ZMQ способен умно обрабатывать сетевые ошибки, иногда повторяя попытки, а иногда информируя вас о возникших ошибках. ZMQ даже может снижать нагрузку на окружающую среду, поскольку экономия времени процессора означает экономию электроэнергии. На самом деле ZMQ способен сделать гораздо больше, чем кажется на первый взгляд, он полностью меняет подход к созданию сетевых приложений. Хотя снаружи это выглядит как набор API для работы с сокетами, позволяющий отправлять и получать сообщения с помощью функций zmq_recv() и zmq_send(), на самом деле обработка сообщений становится ключевой частью приложения. Со временем ваш код будет состоять из отдельных модулей, занимающихся обработкой сообщений, что делает его более элегантным и естественным.Кроме того, ZMQ обладает высокой масштабируемостью: каждая задача может выполняться одним узлом (узел — это поток), двумя узлами на одном компьютере (узел — это процесс), двумя машинами в одной сети (узел — это машина), без необходимости изменения самого приложения.### Эластичность сокетов ZMQ

Давайте рассмотрим пример эластичности сокетов ZMQ. Этот скрипт запускает службу метеорологической информации и несколько клиентов:

wuserver &
wuclient 12345 &
wuclient 23456 &
wuclient 34567 &
wuclient 45678 &
wuclient 56789 &

В процессе выполнения можно использовать команду top для просмотра состояния процессов (в данном случае на четырёхъядерной машине):

  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND
 7136 ph        20   0 1040m 959m 1156 R  157 12.0  16:25.47 wuserver
 7966 ph        20   0 98608 1804 1372 S   33  0.0   0:03.94 wuclient
 7963 ph        20   0 33116 1748 1372 S   14  0.0   0:00.76 wuclient
 7965 ph        20   0 33116 1784 1372 S    6  0.0   0:00.47 wuclient
 7964 ph        20   0 33116 1788 1372 S    5  0.0   0:00.25 wuclient
 7967 ph        20   0 33072 1740 1372 S    5  0.0   0:00.35 wuclient

Рассмотрим, что происходит: служба метеорологической информации использует один сокет, но способна одновременно отправлять сообщения пяти клиентам параллельно. Можно иметь сотни параллельно работающих клиентов, а сервер даже не видит этих клиентов и не управляет ими.

Как решить проблему утерянных сообщений

При написании приложений на ZMQ самыми распространенными проблемами являются случаи, когда сообщения не могут быть получены. Ниже приведена схема решения этой проблемы, которая охватывает основные причины возникновения ошибок. Не беспокойтесь, если некоторые термины вам пока не знакомы, они будут подробно объяснены в последующих главах.9Если ZMQ играет очень важную роль в вашем приложении, вам может потребоваться тщательно спланировать процесс. Во-первых, создайте прототип для проверки работоспособности вашего设计方案. Примените методы стресс-тестирования, чтобы убедиться в достаточной надежности системы. Во-вторых, сосредоточьтесь на тестировании кода, то есть напишите тестовый фреймворк, обеспечив достаточное энергопотребление и время для проведения интенсивных тестов. В идеальном случае, одна команда должна писать программу, а другая — пытаться её сломать. Наконец, свяжитесь с компанией iMatix вовремя по этой ссылке для получения технической поддержки. Кратко говоря, если у вас нет достаточно веских аргументов, что созданный вами архитектурный дизайн будет работать в реальных условиях, то он может просто-напросто сломаться в самый ответственный момент.

Предупреждение: ваши представления могут быть перевернуты!

设计方案 -> архитектурный дизайнТрадиционное сетевое программирование обычно предполагает, что сокет может поддерживать соединение только с одним узлом. Хотя существуют протоколы широковещательной рассылки, они являются сторонними. Когда мы считаем, что "один сокет = одно соединение", мы используем определённые способы расширения архитектуры приложения: создаём поток для каждой логической части, каждый из которых независимо поддерживает один сокет.Однако в мире ZMQ сокеты являются умными, многопоточными и автоматически поддерживают набор полных соединений. Вы не можете видеть эти соединения и даже не можете манипулировать ими напрямую. Когда вы отправляете и получаете сообщения, выполняете опрос и так далее, вы работаете только с сокетами ZMQ, а не с самыми соединениями. Таким образом, соединения в мире ZMQ являются приватными и недоступными для внешнего доступа, что делает ZMQ легко масштабируемым.

Поскольку ваш код взаимодействует только с определенным сокетом, он может обрабатывать любое количество соединений, используя любой сетевой протокол. А модели сообщений ZMQ позволяют еще более дешевое и удобное масштабирование.

Таким образом, традиционные мысли не применимы в мире ZMQ. Когда вы читаете примеры кода, вы можете попытаться связать эти примеры с традиционным сетевым программированием: когда вы читаете "сокет", вы можете подумать, что это означает соединение с другим узлом — это неверное понимание; когда вы читаете "поток", вы можете подумать, что это означает соединение с другим узлом — это также неверное понимание.Если вы впервые читаете это руководство и использовали ZMQ в течение одного-двух дней (или дольше), вы можете почувствовать некоторое замешательство, как ZMQ может сделать всё так просто. Вы снова пытаетесь использовать свои старые мысли для понимания ZMQ, но безуспешно. В конце концов, вы будете покорены идеями ZMQ, и перед вами откроется новый мир, и вы начнете наслаждаться преимуществами ZMQ. [iMatix]: http://www.imatix.com/ [AMQP]: http://www.amqp.org/

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/andwp-zguide-cn.git
git@api.gitlife.ru:oschina-mirror/andwp-zguide-cn.git
oschina-mirror
andwp-zguide-cn
andwp-zguide-cn
master