1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/andwp-zguide-cn

В этом репозитории не указан файл с открытой лицензией (LICENSE). При использовании обратитесь к конкретному описанию проекта и его зависимостям в коде.
Клонировать/Скачать
chapter2.txt 170 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
Отправлено 23.06.2025 22:09 06c6f61
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064
```markdown
--- vim: set filetype=markdown:
.set GIT=https://github.com/anjuke/zguide-cn
## Вторая глава: Дальнейшее изучение ZeroMQ
В первой главе мы попробовали несколько моделей коммуникации ZMQ: запрос-ответ, публикация-подписка и трубопровод. В этой главе мы углубимся в более сложные темы, которые могут пригодиться вам в реальном проекте:
В данной главе рассматриваются следующие темы:
* Создание и использование сокетов ZMQ
* Отправка и получение сообщений через сокеты
* Разработка приложений с использованием асинхронных I/O сокетов ZMQ
* Использование нескольких сокетов в одном потоке
* Правильное обработание критических и некритических ошибок
* Обработка сигналов прерывания, таких как Ctrl-C
* Правильное завершение работы приложения ZMQ
* Проверка утечек памяти в приложении ZMQ
* Отправка и получение сообщений с несколькими фреймами
* Передача сообщений через сеть
* Создание простого агента очередей сообщений
* Разработка многопоточных приложений с использованием ZMQ
* Передача сигналов между потоками с помощью ZMQ
* Координация узлов в сети с помощью ZMQ
* Создание постоянных сокетов с использованием идентификаторов
* Создание и использование конвертов сообщений в модели публикация-подписка
* Возобновление работы постоянных подписчиков после сбоев
* Предотвращение переполнения памяти с помощью пороговых значений (HWM)
```### Философия нуля
Символ Ø в названии ØMQ долгое время вызывал споры. С одной стороны, этот специфический символ снижал доход от ZMQ в Google и Twitter; с другой стороны, он мог раздражать некоторых датчан, которые считают, что Ø — это не просто 0.
Изначально ZMQ означало "нулевое middleware", "нулевую задержку". Со временем же это название приобрело новые значения: "нулевое управление", "нулевые затраты", "нулевое промахивание". В целом, ноль символизирует минимальность и простоту, что является фундаментальной философией этого проекта. Мы стремимся упрощать и повышать удобство использования.
### API сокетов
Честно говоря, ZMQ немного обманывает ожидания. Однако мы не собираемся извиняться за это, так как эта концептуальная смена ни в коем случае не наносит вреда. ZMQ предоставляет API, аналогичное BSD сокетам, но скрывает многие детали обработки сообщений, что вы будете постепенно осваивать и использовать с удовольствием.
Сокеты фактически являются стандартным интерфейсом для сетевой программы, и одна из причин популярности ZMQ заключается в том, что она основана на стандартном API сокетов. Поэтому операции с сокетами ZMQ легко понять, и их жизненный цикл состоит из четырёх основных этапов:* Создание и удаление сокетов: zmq_socket(), zmq_close()
* Настройка и чтение опций сокетов: zmq_setsockopt(), zmq_getsockopt()
* Установка соединений сокетов: zmq_bind(), zmq_connect()
* Отправка и получение сообщений: zmq_send(), zmq_recv()
```Вот C-код:``````c
void *mousetrap;
// Создаем сокет для ловли мышей
mousetrap = zmq_socket(context, ZMQ_PULL);
// Настроим сокет
int64_t jawsize = 10000;
zmq_setsockopt(mousetrap, ZMQ_HWM, &jawsize, sizeof(jawsize));
// Подключаем сокет к отверстию для мышей
zmq_connect(mousetrap, "tcp://192.168.55.221:5001");
// Ждем прибытия вкусной мыши
zmq_msg_t mouse;
zmq_msg_init(&mouse);
zmq_recv(mousetrap, &mouse, 0);
// Уничтожаем мышь
zmq_msg_close(&mouse);
// Уничтожаем сокет
zmq_close(mousetrap);
```
Обратите внимание, что сокет всегда является указателем пустого типа, а сообщение представляет собой структуру данных (о которой мы будем говорить ниже). Поэтому в C вы передаете сокет через переменную, а сообщение — через ссылку. Важно помнить, что все сокеты в ZMQ управляются самим ZMQ, а сообщения управляются программистом.
Создание, удаление и конфигурация сокетов похожи на работу с объектами, но помните, что ZMQ асинхронен и масштабируем, поэтому понимание его применения в сетях может потребовать времени.
### Использование сокетов для построения топологии
При соединении двух узлов один из них должен использовать `zmq_bind()`, а другой — `zmq_connect()`. Обычно узел, использующий `zmq_bind()`, называется сервером и имеет постоянный сетевой адрес, в то время как узел, использующий `zmq_connect()`, называется клиентом и имеет изменчивый адрес. Мы говорим о том, что сокет привязан к конечной точке, а другой сокет подключен к этой конечной точке. Конечная точка — это известный сетевой адрес.Связи ZMQ отличаются от традиционных TCP-соединений следующими особенностями:* Используются различные протоколы: inproc (внутри процесса), ipc (между процессами), tcp, pgm (multicast), epgm;
* Соединение устанавливается при вызове `zmq_connect()` клиентом, даже если конечная точка ещё не была привязана с помощью `zmq_bind()`;
* Соединение асинхронно и имеет буферизацию с помощью очередей сообщений;
* Соединение демонстрирует определённый шаблон сообщений, который определяется типом создаваемого сокета;
* Один сокет может иметь несколько входящих и исходящих соединений;
* ZMQ не предоставляет функции, аналогичные `zmq_accept()`, так как сокет автоматически начинает принимать соединения после привязки к конечной точке;
* Приложения не могут работать напрямую с этими соединениями, так как они скрыты в нижнем уровне ZMQ. В многих архитектурных решениях используется схожая клиент-серверная модель. Серверные компоненты являются более стабильными, в то время как клиентские компоненты более динамичны и могут легко добавляться или удаляться. Поэтому адрес сервера для клиента обычно является видимым, наоборот — нет. Таким образом, становится очевидно, какие компоненты следует рассматривать как серверные (используя zmq_bind()), а какие — как клиентские (используя zmq_connect()). В то же время это требует учёта типа используемых сокетов, что будет подробно рассмотрено ниже. Представьте, что если сначала запустить клиентскую часть, а затем — серверную, что произойдёт?В традиционной сетевой связи при запуске клиента система выдаст ошибку, но ZMQ позволяет свободно запускать компоненты архитектуры в любом порядке. Когда клиент использует zmq_connect() для подключения к определённому конечному адресу, он уже может отправлять сообщения через этот сокет. Если в это время сервер запустится и примет zmq_bind() для привязки к этому конечному адресу, ZMQ автоматически начнёт пересылку сообщений.Серверная сторона может использовать один сокет для привязки к нескольким конечным адресам. Это значит, что она может создавать соединения с использованием различных протоколов:
```c
zmq_bind (socket, "tcp://*:5555");
zmq_bind (socket, "tcp://*:9999");
zmq_bind (socket, "ipc://myserver.ipc");
```
Конечно, вы не можете привязывать к одному и тому же конечному адресу несколько раз, так как это вызовет ошибку.
Каждый раз, когда клиентская сторона использует zmq_connect() для подключения к одному из вышеупомянутых конечных адресов, сервер автоматически создает соединение. ZMQ не ограничивает количество соединений. Кроме того, клиентская сторона также может использовать один сокет для создания нескольких соединений.
В большинстве случаев, кто является сервером, а кто — клиентом, определяется на уровне сетевой архитектуры, а не на уровне потока сообщений. Однако есть некоторые особые случаи (например, повторная отправка сообщений после потери соединения), где использование одного и того же сокета для привязки и подключения может привести к различиям в поведении.
Поэтому, когда мы проектируем архитектуру, следует придерживаться принципа "серверная часть должна быть стабильной, а клиентская — гибкой", чтобы избежать ошибок.Сокеты имеют тип, и тип сокета определяет его поведение, правила отправки и приема сообщений и т.д. Вы можете подключать различные типы сокетов, например PUB-SUB комбинацию, которая называется моделью публикация-подписка, другие комбинации также имеют свои названия, которые будут подробно рассмотрены ниже.Именно благодаря возможности подключения сокетов различными способами, ZMQ обеспечивает базовую систему очередей сообщений. На этой основе можно строить более сложные устройства и маршрутизацию, что будет подробно рассмотрено ниже. В целом, ZMQ предоставляет вам набор компонентов для использования в вашей сетевой архитектуре.
### Передача данных с помощью сокетов
Отправка и получение сообщений осуществляются с помощью функций `zmq_send()` и `zmq_recv()`. Хотя названия функций кажутся простыми, понять их работу требует времени из-за отличий в модели I/O ZMQ от традиционного протокола TCP. Рассмотрим различия между TCP-сокетами и ZMQ-сокетами в передаче данных:* ZMQ-сокеты передают сообщения, а не байты (TCP) или фреймы (UDP). Сообщение представляет собой блок двоичных данных определённой длины. Мы подробнее рассмотрим сообщения ниже; эта концепция была разработана с учётом оптимизации производительности, поэтому она может быть сложной для понимания.
* ZMQ-сокеты выполняют операции ввода-вывода в фоновом режиме, то есть при отправке или получении сообщений они передаются в локальную буферизированную очередь памяти, размер которой можно настроить.
* ZMQ-сокеты могут подключаться к нескольким сокетам (если тип сокета позволяет это делать). TCP-протокол поддерживает только точечную связь, тогда как ZMQ поддерживает одновременную связь с многими сокетами (аналогично радиовещанию), множественные связи (аналогично почтовому отделению), множественные входящие связи (аналогично почтовому ящику) и, конечно же, одиночные связи.
* ZMQ-сокеты могут отправлять сообщения нескольким получателям (модель разветвления), или принимать сообщения от нескольких источников (модель слияния).```textdiagram
+------------+ +------------+
| | | |
| Node | | Node |
| | | |
+------------+ +------------+
| Socket | | Socket |
\----+-+-----/ \------+-----/
| | :
1 к N | +------------------------+
Fan out | |
+------------------------+ | N к 1
| | | Fan in
v v v
/------------\ /------------\
| Socket | | Socket |
+------------+ +------------+
| | | |
| Node | | Node |
| | | |
+------------+ +------------+
```
Когда вы записываете сообщение в сокет, оно может быть отправлено многим узлам, а сокет, в свою очередь, будет принимать сообщения со всех установленных соединений. Метод `zmq_recv()` использует алгоритм справедливого очередного обслуживания для определения, какое соединение будет выбрано для получения сообщения.При вызове метода `zmq_send()` сообщение фактически не отправляется непосредственно через соединение сокета. Вместо этого сообщение сохраняется в памяти в очереди, и его асинхронно отправляет фоновый I/O-поток. Если всё работает корректно, это поведение является неблокирующим. Поэтому, даже если `zmq_send()` возвращает значение, это не означает, что сообщение уже было отправлено. После инициализации сообщения с помощью `zmq_msg_init_data()` вы не можете переиспользовать или освободить это сообщение, так как I/O-поток ZMQ считает его мусором. Это распространённая ошибка для новичков, и мы рассмотрим правильное использование сообщений ниже.### Одиночная передача
ZMQ предоставляет набор протоколов одиночной передачи (inproc, ipc, tcp) и два протокола широковещательной передачи (epgm, pgm). Протоколы широковещательной передачи являются более продвинутыми и будут рассмотрены позже. Если вы не можете ответить на вопрос, как влияет коэффициент фан-аута на одиночную передачу одного ко многим, то лучше пока не изучайте протоколы широковещательной передачи.
Обычно используется **tcp** в качестве протокола передачи. TCP-соединение может работать автономно, оно гибкое, мобильное и достаточно быстрое. Почему оно называется автономным? Потому что TCP-соединение в ZMQ не требует наличия связанного сервиса на конечной точке. Клиент и сервер могут подключаться и связываться в любое время, и это прозрачно для приложения.
Протокол взаимодействия процессов, или **ipc**, похож на tcp, но абстрагирован от сетевой передачи и не требует указания IP-адреса или домена. Этот протокол часто удобен, и многие примеры в этом руководстве используют его. IPC-соединение в ZMQ также может быть автономным, но имеет один недостаток — он не работает на операционной системе Windows, что может быть исправлено в будущих версиях ZMQ. Обычно к конечной точке добавляется расширение .ipc, и при использовании IPC-соединения на UNIX-системах следует учитывать вопросы доступа. Убедитесь, что все программы могут найти эту конечную точку IPC.Протокол взаимодействия внутри процесса, или **inproc**, позволяет передавать сообщения между различными потоками одного и того же процесса. Он намного быстрее, чем IPC или TCP. Этот протокол требует предварительного связывания с конечной точкой перед установлением соединения, что может быть исправлено в будущем. Обычно сначала запускается поток сервера, связывается с конечной точкой, затем запускается поток клиента и подключается к этой конечной точке.
### ZMQ не только для передачи данных
Часто новички задаются вопросом, как использовать ZMQ для создания сервиса? Можно ли создать HTTP-сервер с помощью ZMQ?
Ожидаемый ответ заключается в том, что мы используем обычные сокеты для передачи HTTP-запросов и ответов, а ZMQ-сокеты также могут выполнять эту задачу, но быстрее и эффективнее.
К сожалению, это не совсем так. ZMQ не является просто инструментом для передачи данных, а представляет собой новую архитектуру, построенную поверх существующих протоколов связи. Его данные имеют другое форматирование по сравнению с существующими протоколами, как показано ниже:
Сравнение HTTP-запроса и ZMQ-запроса, использующих одинаковый протокол TCP/IP:
```textdiagram
+----------------+----+----+----+----+
| GET /index.html| 13 | 10 | 13 | 10 |
+----------------+----+----+----+----+
Рисунок # - HTTP-запрос
```HTTP-запрос использует CR-LF (перенос строки) для разделения информационных фреймов, в то время как ZMQ использует указанный размер для определения фрейма:
```textdiagram
+---+---+---+---+---+---+
| 5 | H | E | L | L | O |
+---+---+---+---+---+---+
Рисунок # - ZMQ-запрос
```
Таким образом, вы можете использовать ZMQ для создания чего-то похожего на HTTP-протокол, но это уже не будет HTTP.
Однако, если кто-то спросит меня, как лучше использовать ZMQ для создания нового сервиса, я могу предложить следующее решение: вы можете разработать свой собственный протокол связи, использовать ZMQ для соединения, предоставлять услуги и расширять их на разных языках программирования, будь то локально или удаленно. Архитектура сетевого сервиса Mongrel2 от Сайда Шо является отличным примером.
### Потоки ввода-вывода
Мы упоминали, что ZMQ использует фоновые потоки ввода-вывода для передачи сообщений. Один поток ввода-вывода может обрабатывать запросы множества сокетов, за исключением крайних случаев. Это и есть значение 1, которое мы передаем при создании контекста:
```c
void *context = zmq_init (1);
```Одним из отличий ZMQ-приложений от традиционных приложений заключается в том, что вам не требуется создавать соединение для каждого сокета. Одиночный ZMQ-сокет может обрабатывать все задачи отправки и приема. Например, если вам нужно отправлять сообщения тысяче подписчиков, вы можете использовать один сокет; если вам нужно распределить задачи двадцати служебным процессам, вы также можете использовать один сокет; если вам нужно получать данные из тысячи веб-приложений, вы также можете использовать один сокет. Эта функциональность может перевернуть подход к созданию сетевых приложений. В традиционных приложениях каждый процесс или поток имеет одно удалённое соединение, которое может обслуживать только один сокет. ZMQ позволяет вам отказаться от этой структуры и использовать один поток для выполнения всех задач, что делает расширение более простым.Эта функциональность может перевернуть подход к созданию сетевых приложений. В традиционных приложениях каждый процесс или поток имеет одно удалённое соединение, которое может обслуживать только один сокет. ZMQ позволяет вам отказаться от этой структуры и использовать один поток для выполнения всех задач, что делает расширение более простым.### Основные сообщающиеся модели
API сокетов ZMQ предлагает множество сообщающихся моделей. Если вы знакомы с корпоративными сообщающими системами, эти модели могут показаться вам знакомыми. Однако для новичков ZMQ может вызвать некоторое удивление.
Давайте вспомним, что делает ZMQ для вас: он быстро и эффективно отправляет сообщения на другие узлы, где узлом может быть поток, процесс или другой компьютер; ZMQ предоставляет простой API сокетов для приложений, не требуя знания используемых протоколов (внутри процесса, между процессами, TCP или широковещательной рассылки); когда узлы перемещаются, ZMQ автоматически подключается или переподключается; при отправке или получении сообщений ZMQ помещает их в очередь, гарантируя, что процесс не будет завершён из-за превышения памяти, а сообщения будут записаны на диск; ZMQ обрабатывает исключения сокетов; все операции ввода-вывода выполняются в фоновом режиме; ZMQ не создаёт мёртвых узлов.Давайте вспомним, что делает ZMQ для вас: он быстро и эффективно отправляет сообщения на другие узлы, где узлом может быть поток, процесс или другой компьютер; ZMQ предоставляет простой API сокетов для приложений, не требуя знания используемых протоколов (внутри процесса, между процессами, TCP или широковещательной рассылки); когда узлы перемещаются, ZMQ автоматически подключается или переподключается; при отправке или получении сообщений ZMQ помещает их в очередь, гарантируя, что процесс не будет завершён из-за превышения памяти, а сообщения будут записаны на диск; ZMQ обрабатывает исключения сокетов; все операции ввода-вывода выполняются в фоновом режиме; ZMQ не создаёт мёртвых замков.### Основные сообщающиеся модели
API сокетов ZMQ предлагает множество сообщающихся моделей. Если вы знакомы с корпоративными сообщающими системами, эти модели могут показаться вам знакомыми. Однако для новичков ZMQ может вызвать некоторое удивление.
Давайте вспомним, что делает ZMQ для вас: он быстро и эффективно отправляет сообщения на другие узлы, где узлом может быть поток, процесс или другой компьютер; ZMQ предоставляет простой API сокетов для приложений, не требуя знания используемых протоколов (внутри процесса, между процессами, TCP или широковещательной рассылки); когда узлы перемещаются, ZMQ автоматически подключается или переподключается; при отправке или получении сообщений ZMQ помещает их в очередь, гарантируя, что процесс не будет завершён из-за превышения памяти, а сообщения будут записаны на диск; ZMQ обрабатывает исключения сокетов; все операции ввода-вывода выполняются в фоновом режиме; ZMQ не создаёт мёртвых узлов.Давайте вспомним, что делает ZMQ для вас: он быстро и эффективно отправляет сообщения на другие узлы, где узлом может быть поток, процесс или другой компьютер; ZMQ предоставляет простой API сокетов для приложений, не требуя знания используемых протоколов (внутри процесса, между процессами, TCP или широковещательной рассылки); когда узлы перемещаются, ZMQ автоматически подключается или переподключается; при отправке или получении сообщений ZMQ помещает их в очередь, гарантируя, что процесс не будет завершён из-за превышения памяти, а сообщения будут записаны на диск; ZMQ обрабатывает исключения сокетов; все операции ввода-вывода выполняются в фоновом режиме; ZMQ не создаёт мёртвых замков.### Основные сообщающиеся модели
API сокетов ZMQ предлагает множество сообщающихся моделей. Если вы знакомы с корпоративными сообщающими системами, эти модели могут показаться вам знакомыми. Однако для новичков ZMQ может вызвать некоторое удивление.
Давайте вспомним, что делает ZMQ для вас: он быстро и эффективно отправляет сообщения на другие узлы, где узлом может быть поток, процесс или другой компьютер; ZMQ предоставляет простой API сокетов для приложений, не требуя знания используемых протоколов (внутри процесса, между процессами, TCP или широковещательной рассылки); когда узлы перемещаются, ZMQ автоматически подключается или переподключается; при отправке или получении сообщений ZMQ помещает их в очередь, гарантируя, что процесс не будет завершён из-за превышения памяти, а сообщения будут записаны на диск; ZMQ обрабатывает исключения сокетов; все операции ввода-вывода выполняются в фоновом режиме; ZMQ не создаёт мёртвых узлов.Давайте вспомним, что делает ZMQ для вас: он быстро и эффективно отправляет сообщения на другие узлы, где узлом может быть поток, процесс или другой компьютер; ZMQ предоставляет простой API сокетов для приложений, не требуя знания используемых протоколов (внутри процесса, между процессами, TCP или широковещательной рассылки); когда узлы перемещаются, ZMQ автоматически подключается или переподключается; при отправке или получении сообщений ZMQ помещает их в очередь, гарантируя, что процесс не будет завершён из-за превышения памяти, а сообщения будут записаны на диск; ZMQ обрабатывает исключения сокетов; все операции ввода-вывода выполняются в фоновом режиме; ZMQ не создаёт мёртвых замков.### Основные сообщающиеся модели
API сокетов ZMQ предлагает множество сообщающихся моделей. Если вы знакомы с корпоративными сообщающими системами, эти модели могут показаться вам знакомыми. Однако для новичков ZMQ может вызвать некоторое удивление.
Давайте вспомним, что делает ZMQ для вас: он быстро и эффективно отправляет сообщения на другие узлы, где узлом может быть поток, процесс или другой компьютер; ZMQ предоставляет простой API сокетов для приложений, не требуя знания используемых протоколов (внутри процесса, между процессами, TCP или широковещательной рассылки); когда узлы перемещаются, ZMQ автоматически подключается или переподключается; при отправке или получении сообщений ZMQ помещает их в очередь, гарантируя, что процесс не будет завершён из-за превышения памяти, а сообщения будут записаны на диск; ZMQ обрабатывает исключения сокетов; все операции ввода-вывода выполняются в фоновом режиме; ZMQ не создаёт мёртвых узлов.
Функция `zmq_socket()` включает в себя информацию обо всех режимах сообщений, которая представлена достаточно ясно, поэтому её стоит изучить несколько раз. Мы будем рассматривать содержание каждого режима сообщений и области его применения.Вот некоторые допустимые пары соединений сокетов (один конец привязан, другой — подключен):
* PUB - SUB
* REQ - REP
* REQ - ROUTER
* DEALER - REP
* DEALER - ROUTER
* DEALER - DEALER
* ROUTER - ROUTER
* PUSH - PULL
* PAIR - PAIR
Другие комбинации могут привести к непредсказуемым результатам и могут вызывать ошибки в будущих версиях ZMQ. Вы также можете изучить поведение этих типов сокетов через код.
### Верхнеуровневые режимы сообщений
Четыре основных режима сообщений, упомянутых выше, встроены в ZMQ и являются частью API, реализованной в C++ библиотеке ZMQ. Они гарантируют правильную работу. Если когда-либо Linux ядро будет использовать ZMQ, эти основные режимы будут включены.
Над этими режимами сообщений мы можем создать более сложные верхнеуровневые режимы сообщений. Эти режимы можно реализовать на любом языке программирования, они не являются частью основных типов и не распространяются вместе с ZMQ, а используются только в ваших собственных приложениях или поддерживаются сообществом ZMQ.
Цель этого руководства в том, чтобы предоставить вам некоторые верхнеуровневые режимы сообщений, как простые (правильное обработку сообщений), так и сложные (надежная модель публикации-подписки).
### Использование сообщенийЕдиницей передачи в ZMQ является сообщение, то есть двоичный блок данных. Вы можете использовать любую сериализацию, такую как Google Protocol Buffers, XDR, JSON и т.д., чтобы преобразовать данные в сообщение ZMQ. Однако этот инструмент сериализации должен быть удобным и быстрым, это зависит от вас.В памяти сообщение ZMQ представлено структурой zmq_msg_t (каждый язык имеет свою специфическую реализацию). При использовании ZMQ сообщений в C следует учитывать следующее:
* Вам нужно создавать и передавать объекты типа zmq_msg_t, а не просто блоки данных;
* Чтение сообщений требует инициализации пустого сообщения с помощью zmq_msg_init(), передачи его в функцию zmq_recv();
* Запись сообщений требует создания сообщения с помощью zmq_msg_init_size() (что также инициализирует область памяти), копирования данных в объект с помощью memcpy(), а затем передачи его в функцию zmq_send();
* Освобождение сообщения (а не его удаление) осуществляется с помощью функции zmq_msg_close(), которая удаляет ссылку на сообщение, а затем ZMQ уничтожает сообщение;
* Получение данных сообщения выполняется с помощью функции zmq_msg_data(); для получения размера сообщения используется функция zmq_msg_size();
* Что касается функций zmq_msg_move(), zmq_msg_copy(), zmq_msg_init_data(), рекомендуется не использовать их до полного понимания их описаний в документации. Вот типичный код для обработки сообщений, который должен быть знаком, если вы уже читали предыдущий код. Этот код был взят из файла zhelpers.h:
```c
// Получает ZMQ-строку из сокета и преобразует её в C-строку
static char *
s_recv (void *socket) {
zmq_msg_t message;
zmq_msg_init (&message);
zmq_recv (socket, &message, 0);
int size = zmq_msg_size (&message);
char *string = malloc (size + 1);
memcpy (string, zmq_msg_data (&message), size);
zmq_msg_close (&message);
string[size] = 0;
return (string);
}
```// Преобразует C-строку в ZMQ-строку и отправляет её через сокет
static int
s_send (void *socket, char *string) {
int rc;
zmq_msg_t message;
zmq_msg_init_size (&message, strlen (string));
memcpy (zmq_msg_data (&message), string, strlen (string));
rc = zmq_send (socket, &message, 0);
assert (!rc);
zmq_msg_close (&message);
return (rc);
}
```
Вы можете расширить этот код, чтобы он поддерживал отправку и получение данных любой длины.
**Важно отметить, что после передачи объекта сообщения функции `zmq_send()`, его размер будет обнулен, поэтому вы не сможете отправить одно и то же сообщение дважды, а также не сможете получить содержимое уже отправленного сообщения.**
Если вам нужно отправить одно и то же сообщение дважды, вам следует создать новый объект сообщения перед первой отправкой, используя функцию `zmq_msg_copy()`. Эта функция не копирует само содержимое сообщения, а лишь копирует ссылку. После этого вы сможете отправить это сообщение снова (или несколько раз, если сделаете достаточно копий). Когда последняя ссылка на сообщение будет удалена, объект сообщения будет уничтожен.
ZMQ поддерживает многокадровые сообщения, то есть сохранение нескольких кадров сообщений в одном сообщении. Это широко используется в реальных приложениях, и мы подробнее рассмотрим это в третьей главе.
Что касается сообщений, стоит обратить внимание на следующие моменты:* Сообщения ZMQ отправляются и принимаются как единое целое, вы не получите только часть сообщения;
* ZMQ не отправляет сообщения сразу, а имеет задержку;
* Вы можете отправлять сообщения нулевой длины как сигнал;
* Сообщения должны умещаться в памяти, поэтому для отправки больших файлов или длинных сообщений их нужно разбивать на части и отправлять в отдельных сообщениях;
* Обязательно закрывайте сообщения с помощью функции zmq_msg_close(), за исключением случаев, когда язык автоматически освобождает объект сообщения при выходе переменной за область видимости. Ещё раз повторим: не используйте функцию zmq_msg_init_data(). Она предназначена для нулл-копирования и может вызвать проблемы. В ZMQ есть много вещей, которые вам ещё предстоит изучить, поэтому сейчас не стоит беспокоиться о том, как сократить несколько микросекунд.
### Обработка нескольких сокетов
В предыдущих примерах основной цикл программы выполнял следующие действия:
1. Ожидание сообщений от сокета;
1. Обработка сообщений;
1. Возврат к первому шагу.А что, если нам нужно читать сообщения из нескольких сокетов? Самый простой способ — подключить сокеты к нескольким конечным точкам и позволить ZMQ использовать механизм равномерной очереди для приема сообщений. Однако, если типы сокетов на разных конечных точках различаются (например, один сокет типа PULL, а другой PUB), этот подход может вызвать проблемы. Если вы начнете смешивать типы сокетов, это может привести к ненадежности в будущем.Правильный подход заключается в использовании функции zmq_poll(). Лучшим решением будет создание обёртки для zmq_poll(), чтобы сделать её частью фреймворка и написать событийно-ориентированный реактор, но это уже более сложная задача, которую мы здесь не рассматриваем.
Для начала мы не будем использовать zmq_poll(), а воспользуемся NOBLOCK (несинхронным) режимом для чтения сообщений из нескольких сокетов. Далее объединим примеры метеорологической службы и параллельной обработки:**msreader: Чтение из нескольких сокетов на C**```c
//
// Получение сообщений из нескольких сокетов
// В этом примере используется функция recv в цикле
//
#include "zhelpers.h"
int main (void)
{
// Подготовка контекста и сокетов
void *context = zmq_init (1);
// Подключение к распределителю задач
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
// Подключение к сервису погоды
void *subscriber = zmq_socket (context, ZMQ_SUB);
zmq_connect (subscriber, "tcp://localhost:5556");
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "10001", 5);
// Обработка сообщений, полученных из двух сокетов
// Здесь мы обрабатываем сообщения от распределителя задач первыми
while (1) {
// Обработка ожидающих задач
int rc;
for (rc = 0; ! rc; ) {
zmq_msg_t task;
zmq_msg_init (&task);
if ((rc = zmq_recv (receiver, &task, ZMQ_NOBLOCK)) == 0) {
// Обработка задачи
}
zmq_msg_close (&task);
}
// Обработка ожидающих обновлений погоды
for (rc = 0; ! rc; ) {
zmq_msg_t update;
zmq_msg_init (&update);
if ((rc = zmq_recv (subscriber, &update, ZMQ_NOBLOCK)) == 0) {
// Обработка обновления погоды
}
zmq_msg_close (&update);
}
// Нет сообщений, ждем 1 миллисекунду
s_sleep (1);
}
// Программа не должна доходить до этой точки, но все равно выполняем правильную очистку
zmq_close (receiver);
zmq_close (subscriber);
zmq_term (context);
return 0;
}
``````Одним из недостатков этого подхода является задержка в 1 миллисекунду до получения первого сообщения, что может создать проблемы в высоконагруженных программах. Кроме того, вам потребуется просмотреть функции, такие как `nanosleep()`, чтобы избежать увеличения количества циклов.```Пример повышает приоритет задачи-распределителя, и вы можете сделать улучшение, чередуя обработку сообщений, как это делается внутри ZMQ с помощью механизма справедливого очередного обслуживания.
Давайте рассмотрим, как можно использовать `zmq_poll()` для реализации такой же функциональности:
**Множественный сокетный опрос на C**
```c
//
// Получение сообщений с нескольких сокетов
// В этом примере используется функция zmq_poll()
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// Подключение к распределителю задач
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
// Подключение к службе обновления погоды
void *subscriber = zmq_socket (context, ZMQ_SUB);
zmq_connect (subscriber, "tcp://localhost:5556");
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);
// Инициализация объекта опроса
zmq_pollitem_t items[] = {
{ receiver, 0, ZMQ_POLLIN, 0 },
{ subscriber, 0, ZMQ_POLLIN, 0 }
};
// Обработка сообщений от двух сокетов
while (1) {
zmq_msg_t message;
zmq_poll (items, 2, -1);
if (items[0].revents & ZMQ_POLLIN) {
zmq_msg_init (&message);
zmq_recv (receiver, &message, 0);
// Обработка задачи
zmq_msg_close (&message);
}
if (items[1].revents & ZMQ_POLLIN) {
zmq_msg_init (&message);
zmq_recv (subscriber, &message, 0);
// Обработка обновлений погоды
zmq_msg_close (&message);
}
}
// Программа никогда не достигнет этой точки
zmq_close (receiver);
zmq_close (subscriber);
zmq_term (context);
return 0;
}
```### Обработка ошибок и сигнала ETERM
Механизм обработки ошибок в ZMQ основан на принципе быстрого отказа. Мы считаем, что процесс должен быть максимально уязвимым к внутренним ошибкам, но достаточно устойчивым к внешним атакам и ошибкам. Например, живые клетки распадаются при обнаружении внутренних проблем, но активно защищаются от внешних воздействий. В ZMQ программировании использование утверждений (assertions) очень распространено, как и защита клеточной мембраны. Если мы не можем определить, является ли ошибка внутренней или внешней, это указывает на дизайн-дефект, который следует исправить. В C-языке, если проверка аргументов (assert) проваливается, программа немедленно завершается. В других языках можно использовать исключения для достижения того же эффекта.
Когда ZMQ обнаруживает проблему с внешней стороны, он возвращает ошибку вызывающему процессу. Если ZMQ не может восстановиться после ошибки, он не просто тихо игнорирует сообщение. В некоторых случаях ZMQ также использует аргументы для обнаружения внешних ошибок, что можно отнести к багам.На данный момент мы редко видим примеры обработки ошибок в C-коде. **В реальном коде каждое обращение к функциям ZMQ должно включать обработку ошибок**. Если вы не пишете на C, возможно, ваша библиотека ZMQ уже включает обработку ошибок. Однако в C вам придётся делать это самостоятельно. Вот некоторые общие методы обработки ошибок, начиная с POSIX:* Методы создания объектов возвращают NULL при неудаче;
* Другие методы возвращают 0 при успешном выполнении, и другое значение (обычно -1) при неудаче;
* Код ошибки можно получить из переменной errno или вызовом функции zmq_errno();
* Сообщение об ошибке можно получить с помощью функции zmq_strerror().
Существуют две ситуации, которые не следует рассматривать как ошибки:
* Когда поток использует NOBLOCK для вызова zmq_recv(), и если сообщение не было получено, метод вернёт -1, а errno будет установлен в EAGAIN;
* Когда поток вызывает zmq_term(), и если другие потоки выполняют блокирующие операции, функция завершит все операции, закроет сокеты и сделает возвращаемое значение методов -1, а errno установит в ETERM.
Соблюдая эти правила, вы сможете использовать аргументы в своих программах ZMQ:
```c
void *context = zmq_init (1);
assert (context);
void *socket = zmq_socket (context, ZMQ_REP);
assert (socket);
int rc;
rc = zmq_bind (socket, "tcp://*:5555");
assert (rc == 0);
```
В первых версиях программы я помещал вызовы функций непосредственно в assert(), что приводило к проблемам, так как некоторые оптимизаторы могут удалить assert() из кода.
Давайте рассмотрим, как правильно завершить процесс. Возьмём пример с использованием режима трубопровода. Когда мы запускаем группу рабочих потоков в фоновом режиме, нам нужно закрыть их после выполнения задач. Мы можем отправить сообщение самоубийства этим рабочим потокам, и это лучше сделать через сборщика результатов.Как соединить сборщик результатов с рабочими потоками? PUSH-PULL сокеты односторонние. Принцип ZMQ гласит: если требуется решение нового вопроса, следует использовать новый тип сокета. В этом случае мы используем модель публикации-подписки для отправки сообщения самоубийства:
* Результат-сборщик создаёт PUB-сокет и подключается к новому конечному пункту;
* worker подключает SUB-сокет к этому конечному пункту;
* когда результат-сборщик обнаруживает завершение выполнения задачи, он отправляет сигнал самоуничтожения через PUB-сокет;
* worker получает сигнал самоуничтожения и завершает работу.
Этот процесс не требует добавления большого количества кода.```c
void *control = zmq_socket(context, ZMQ_PUB);
zmq_bind(control, "tcp://*:5559");
// Отправка сигнала самоуничтожения worker'ам
zmq_msg_init_data(&message, "KILL", cq_strlen("KILL"));
zmq_send(control, &message, 0);
zmq_msg_close(&message);
```
``````textdiagram
+-------------+
| |
| Вентилятор |
| |
+-------------+
| PUSH |
\------+------/
|
задачи
|
+---------------+---------------+
| | |
| /=--------|-----+=--------|-----+------\
задача | задача | задача | :
| | | | | | |
v v v v v v |
/------+-----\ /------+-----\ /------+-----\ |
| PULL | SUB | | PULL | SUB | | PULL | SUB | |
+------+-----+ +------+-----+ +------+-----+ |
| | | | | | |
| Рабочий | | Рабочий | | Рабочий | |
| | | | | | |
+------------+ +------------+ +------------+ |
| PUSH | | PUSH | | PUSH | |
\-----+------/ \-----+------/ \-----+------/ |
| | | |
результат результат результат |
| | | |
+---------------+---------------+ |
| |
результаты |
| |
v |
/-------------\ |
| PULL | |
+-------------+ |
| | |
| Слив | |
| | |
+-------------+ |
| PUB | |
\------+------/ |
``` | |
КОМАНДА ОТМЕНА |
| |
\--------------------------/
``` Рисунок # - Параллельный конвейер с сигналами саморазрушения ``````c
//
// Конвейерный режим - дизайн worker'а 2
// Добавлен публикация-подписка потока сообщений для получения сигналов самоуничтожения
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// Сокет для получения задач
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
// Сокет для отправки результатов
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sender, "tcp://localhost:5558");
// Сокет для получения сигналов самоуничтожения
void *controller = zmq_socket (context, ZMQ_SUB);
zmq_connect (controller, "tcp://localhost:5559");
zmq_setsockopt (controller, ZMQ_SUBSCRIBE, "", 0);
// Обработка полученных задач или сигналов самоуничтожения
zmq_pollitem_t items [] = {
{ receiver, 0, ZMQ_POLLIN, 0 },
{ controller, 0, ZMQ_POLLIN, 0 }
};
// Обработка сообщений
while (1) {
zmq_msg_t message;
zmq_poll (items, 2, -1);
if (items [0].revents & ZMQ_POLLIN) {
zmq_msg_init (&message);
zmq_recv (receiver, &message, 0);
// Выполнение работы
s_sleep (atoi ((char *) zmq_msg_data (&message)));
// Отправка результата
zmq_msg_init (&message);
zmq_send (sender, &message, 0);
// Простое отображение выполненной задачи
printf (".");
fflush (stdout);
}
}
}
``` zmq_msg_close (&message);
}
// Любое контрольное сообщение означает самоуничтожение
if (items [1].revents & ZMQ_POLLIN)
break; // Выход из цикла
}
// Завершение программы
zmq_close (receiver);
zmq_close (sender);
zmq_close (controller);
zmq_term (context);
return 0;
}
```Вот модифицированный код собирающего результата, который после сбора всех результатов отправляет сигнал самоуничтожения всем worker'ам:
**tasksink2: Параллельный sink-задачи с сигналами самоуничтожения на C**
```c
//
// Pipeline Pattern - Collector Structure Design 2
// Добавлен публикуемо-подписываемый поток сообщений для отправки сигнала самоуничтожения worker'ам
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// Socket to receive messages
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_bind (receiver, "tcp://*:5558");
// Socket to send control information
void *controller = zmq_socket (context, ZMQ_PUB);
zmq_bind (controller, "tcp://*:5559");
// Wait for tasks to begin
char *string = s_recv (receiver);
free (string);
// Start timing
int64_t start_time = s_clock ();
// Confirm completion of 100 tasks
int task_nbr;
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
char *string = s_recv (receiver);
free (string);
if ((task_nbr / 10) * 10 == task_nbr)
printf (":");
else
printf (".");
fflush (stdout);
}
printf ("Общее время выполнения: %d мс\n",
(int) (s_clock () - start_time));
// Send suicide signal to workers
s_send (controller, "KILL");
// Finish
sleep (1); // Wait for messages to be sent
zmq_close (receiver);
zmq_close (controller);
zmq_term (context);
return 0;
}
```
### Обработка сигналов прерывания
В реальных условиях приложение должно корректно очищаться и завершаться при получении сигнала Ctrl-C или других сигналов, таких как ETERM. По умолчанию этот сигнал убивает процесс, что приводит к потере неподанных сообщений, некорректному закрытию файлов и т.д.
В языке C обработка сигнала осуществляется следующим образом:**interrupt: Обработка Ctrl-C в C**
```c
//
// Пример обработки Ctrl-C
//
#include <zmq.h>
#include <stdio.h>
#include <signal.h>
// ---------------------------------------------------------------------
// Обработка сигналов
//
// Функция s_catch_signals() вызывается при запуске программы;
// В цикле проверяется значение переменной s_interrupted, если оно равно 1, то происходит выход из цикла;
// Подходит для использования с zmq_poll().
static int s_interrupted = 0;
static void s_signal_handler (int signal_value)
{
s_interrupted = 1;
}
static void s_catch_signals (void)
{
struct sigaction action;
action.sa_handler = s_signal_handler;
action.sa_flags = 0;
sigemptyset (&action.sa_mask);
sigaction (SIGINT, &action, NULL);
sigaction (SIGTERM, &action, NULL);
}
int main (void)
{
void *context = zmq_init (1);
void *socket = zmq_socket (context, ZMQ_REP);
zmq_bind (socket, "tcp://*:5555");
s_catch_signals ();
while (1) {
// Блокирующее чтение останавливается при получении сигнала
zmq_msg_t message;
zmq_msg_init (&message);
zmq_recv (socket, &message, 0);
if (s_interrupted) {
printf ("W: Получен сигнал прерывания, завершение программы...\n");
break;
}
}
zmq_close (socket);
zmq_term (context);
return 0;
}
```
Этот код использует функцию `s_catch_signals()` для перехвата сигналов, таких как Ctrl-C (SIGINT) и SIGTERM. При получении любого из этих сигналов функция устанавливает глобальную переменную `s_interrupted` равной 1. Приложение не будет автоматически завершено; вам потребуется явно выполнить необходимые действия по очистке и завершению работы.* Вызовите функцию s_catch_signals() при запуске программы для настройки перехвата сигналов;
* Если программа блокируется в функциях, таких как zmq_recv(), zmq_poll(), zmq_send() и т.д., при получении сигнала эти функции вернут EINTR;
* Функции, такие как s_recv(), обрабатывают прерывание, возвращая NULL;
* Таким образом, ваше приложение может проверять наличие ошибки EINTR, NULL возврата или значения переменной s_interrupted равной 1. Пример кода ниже хорошо иллюстрирует данное явление:```c
s_catch_signals();
client = zmq_socket(...);
while (!s_interrupted) {
char *message = s_recv(client);
if (!message)
break; // Клавиша Ctrl-C была нажата
}
zmq_close(client);
```
Если вы не обрабатываете сигналы после вызова `s_catch_signals()`, ваша программа будет невосприимчива к Ctrl-C и ETERM.
### Обнаружение утечек памяти
Любая долгоживущая программа должна правильно управлять памятью, в противном случае она рано или поздно столкнется с исчерпанием памяти, что приведет к аварийному завершению работы. Если ваш язык программирования автоматически управляет памятью за вас, то вам повезло. Однако при использовании языков, таких как C/C++, вам придется самостоятельно управлять памятью. В этом разделе мы рассмотрим инструмент под названием valgrind, который может использоваться для отслеживания утечек памяти.
* Установка valgrind на Ubuntu или Debian: `sudo apt-get install valgrind`
* По умолчанию, ZMQ может вызывать множество предупреждений в valgrind. Чтобы отключить эти предупреждения, можно использовать опцию `ZMQ_MAKE_VALGRIND_HAPPY` при компиляции ZMQ:
```
$ cd zeromq2
$ export CPPFLAGS=-DZMQ_MAKE_VALGRIND_HAPPY
$ ./configure
$ make clean; make
$ sudo make install
```
* Программа должна правильно обрабатывать Ctrl-C, особенно для долгоживущих процессов (например, очередей). Если этого не сделать, valgrind сообщит о всех распределенных участках памяти как об ошибках.
* Компиляция программы с опцией `-DDEBUG` позволит valgrind указать конкретные места кода, где происходит утечка памяти.* Наконец, запустите valgrind следующим образом:
```
valgrind --tool=memcheck --leak-check=full someprog
```
После решения всех проблем вы увидите следующее сообщение:
```
==30536== ERROR SUMMARY: 0 errors from 0 contexts...
```
### Сообщения с несколькими кадрами
Сообщения ZMQ могут содержать несколько кадров, что очень распространено в реальных приложениях, особенно когда речь идет о "конвертах". В этом разделе мы рассмотрим, как правильно отправлять и получать сообщения с несколькими кадрами.
Каждый кадр в сообщении с несколькими кадрами представляет собой структуру `zmq_msg`. Это значит, что если вы отправляете сообщение с пятью кадрами, вам нужно будет обрабатывать пять структур `zmq_msg`. Вы можете хранить эти кадры в некоторой структуре данных или обрабатывать их по одному.
Пример кода ниже демонстрирует отправку сообщения с несколькими кадрами:
```c
zmq_send (socket, &message, ZMQ_SNDMORE);
...
zmq_send (socket, &message, ZMQ_SNDMORE);
...
zmq_send (socket, &message, 0);
```
Затем мы рассмотрим, как принимать и обрабатывать эти сообщения. Этот код применим как для односоставных, так и для многосоставных сообщений:
```c
while (1) {
zmq_msg_t message;
zmq_msg_init (&message);
zmq_recv (socket, &message, 0);
// Обработка одного сегмента сообщения
zmq_msg_close (&message);
int64_t more;
size_t more_size = sizeof (more);
zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
if (!more)
break; // Достигнут последний сегмент
}
```
Что касается многосоставных сообщений, то вам следует знать следующее:* При отправке многосоставных сообщений, весь пакет будет отправлен только после отправки последнего сегмента;
* Если используется функция zmq_poll(), то при получении первого сегмента сообщения, все остальные сегменты уже были получены;
* Многосоставные сообщения передаются целиком, частичные сообщения не отправляются;
* Каждый сегмент многосоставного сообщения представляет собой структуру zmq_msg;
* Независимо от того, проверяете ли вы опцию ZMQ_RCVMORE сокета или нет, вы всё равно получите все сообщения;
* При отправке ZMQ кэширует начальные сегменты сообщения в памяти до тех пор, пока не будет получен последний сегмент;
* Мы не можем отменить отправку после отправки части сообщения, можно только закрыть сокет.
### Промежуточное программное обеспечение и устройства
Когда количество сетевых компонентов небольшое, все узлы знают о существовании других узлов. Однако, с увеличением количества узлов, стоимость такой структуры возрастает. Поэтому нам необходимо разделить эти компоненты на более мелкие модули и использовать промежуточное программное обеспечение для их соединения.Такая структура очень распространена в реальном мире; наши общественные и экономические системы полны механизмов промежуточного программного обеспечения, снижающих сложность и сжимающих затраты на создание крупных сетей. Промежуточное программное обеспечение также может называться оптовым торговцем, субподрядчиком, менеджером и т. д.Аналогично, ZMQ-сети также требуют промежуточного программного обеспечения при увеличении масштаба. В ZMQ такие компоненты называются "устройствами". На ранних этапах разработки ZMQ-программ мы рисуем несколько узлов и соединяем их без использования промежуточного программного обеспечения:
```textdiagram
+---------+
| |
| Узел |
| |
+---------+
| Socket |
\----+----/
|
|
+------+------+
| |
| |
/----+----\ /----+----\
| Socket | | Socket |
+---------+ +---------+
| | | |
| Узел | | Узел |
| | | |
+---------+ +---------+
```
Рисунок # - Малый масштаб приложения 0MQ
```
Затем мы продолжаем расширять эту структуру, размещая устройства на определённых позициях и увеличивая количество узлов:```textdiagram
+---------+
| |
| Node |
| |
+---------+
| Socket |
\----+----/
|
|
+-------------+-------------+
| | |
| | |
/----+----\ /----+----\ /----+----\
| Socket | | Socket | | Socket |
+---------+ +---------+ +---------+
| | | | | |
| Node | | Node | | Device |
| | | | | |
+---------+ +---------+ +---------+
| Socket |
\----+----/
|
|
+------+------+
| |
| |
/----+----\ /----+----\
| Socket | | Socket |
+---------+ +---------+
| | | |
| Node | | Node |
| | | |
+---------+ +---------+
``` Рисунок # - Больший масштаб приложения 0MQ
```
Устройства ZMQ не имеют конкретных правил проектирования, но обычно включают группу "передних" конечных точек и группу "задних" конечных точек. Устройства являются бессостоятельными, поэтому их можно широко развернуть в сети. Вы можете запустить устройство в отдельном потоке внутри процесса или запустить его непосредственно в одном процессе. Внутри ZMQ также предоставляются базовые реализации устройств для использования.
Устройства ZMQ могут использоваться для маршрутизации и адресации, предоставления услуг, планирования очередей и других задач, которые вы можете себе представить. Разные модели сообщений требуют различных типов устройств для построения сети. Например, в модели запрос-ответ используются устройства очередей и абстрактные службы, а в модели публикация-подписка — устройства потока и тематические устройства.
Преимущество устройств ZMQ перед другими промежуточными слоями заключается в том, что их можно разместить в любой части сети и использовать для выполнения любых задач, которые вам нужны.#### Прокси-сервис публикация-подпискаЧасто требуется расширить модель публикации-подписки на различные типы сетей. Например, группа подписчиков находится в Интернете, а мы хотим использовать широковещательную рассылку для отправки сообщений внутренним подписчикам и TCP-соединение для отправки сообщений внешним подписчикам.Нашей задачей является создание простого прокси-сервиса, который будет служить мостом между публикаторами и внешними подписчиками. Этот сервис имеет два конца: один конец подключается к внутренним публикаторам, а другой — к внешней сети. Он принимает сообщения от публикаторов и пересылает их внешним подписчикам.
**wuproxy: Прокси-сервис для погодных обновлений на C**
```c
//
// Прокси-сервис для погодных обновлений
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// Подключаемся к погодным обновлениям
void *frontend = zmq_socket (context, ZMQ_SUB);
zmq_connect (frontend, "tcp://192.168.55.210:5556");
// Пересылаем погодные обновления
void *backend = zmq_socket (context, ZMQ_PUB);
zmq_bind (backend, "tcp://10.1.1.0:8100");
// Подписываемся на все сообщения
zmq_setsockopt (frontend, ZMQ_SUBSCRIBE, "", 0);
// Пересылаем сообщения
while (1) {
while (1) {
zmq_msg_t message;
int64_t more;
// Обрабатываем все фреймы сообщений
zmq_msg_init (&message);
zmq_recv (frontend, &message, 0);
size_t more_size = sizeof (more);
zmq_getsockopt (frontend, ZMQ_RCVMORE, &more, &more_size);
zmq_send (backend, &message, more ? ZMQ_SNDMORE : 0);
zmq_msg_close (&message);
if (!more)
break; // Достигли последнего фрейма
}
}
// Программа не достигнет этой точки, но всё равно должна корректно завершиться
zmq_close (frontend);
zmq_close (backend);
zmq_term (context);
return 0;
}
```Мы называем этот сервис прокси, так как он одновременно является подписчиком и публикатором. Это означает, что добавление этого сервиса не требует изменения кода других программ, достаточно лишь чтобы внешние подписчики знали новый сетевой адрес.
+-----------+
| |
| Publisher |
| |
+-----------+
| PUB |
\-----------/
bind
tcp://192.168.55.210:5556
|
|
+----------------+----------------+
| | |
| | |
connect connect |
/------------\ /------------\ connect
| SUB | | SUB | /------------\
+------------+ +------------+ | SUB |
| | | | +------------+
| Subscriber | | Subscriber | | |
| | | | | Forwarder |
+------------+ +------------+ | |
+------------+
Internal network | PUB |
---------------------------------\------------/--------
External network bind
tcp://10.1.1.0:8100
|
|
+--------+--------+
| |
| |
connect connect
/------------\ /------------\
| SUB | | SUB |
+------------+ +------------+
| | | |
| Subscriber | | Subscriber |
| | | | +------------+ +------------+Можно заметить, что этот сценарий корректно обрабатывает многофреймовые сообщения, передавая их полностью подписчику. Если при отправке мы не указываем опцию ZMQ_SNDMORE, то下游节点收到的消息可能会损坏。 Важно обеспечить правильную обработку многофреймовых сообщений при написании устройств, чтобы избежать потери сообщений.#### Запрос-ответный прокси
Давайте создадим небольшой прокси-агент для модели запрос-ответ в многопользовательской среде.
В модели Hello World клиент/сервер один клиент и один сервер взаимодействуют. Однако в реальных условиях нам может потребоваться, чтобы несколько клиентов вза møжно с несколькими серверами. Ключевой проблемой является то, что сервер должен быть бессостоятельным, а все состояние должно быть включено в одном запросе или храниться в других средствах, таких как база данных.
У нас есть два способа соединения нескольких клиентов и нескольких серверов. Первый способ заключается в том, чтобы позволить клиенту напрямую соединяться с несколькими серверами. Клиентский сокет может быть подключен к нескольким серверным сокетам, и запросы, отправленные им, будут распределяться между серверами с помощью балансировки нагрузки. Например, если у нас есть клиент, подключенный к трем серверам A, B и C, и клиент генерирует четыре запроса R1, R2, R3 и R4, то R1 и R4 будут обработаны сервером A, R2 — сервером B, а R3 — сервером C:
1. Клиент подключен к трем серверам A, B и C.
2. Клиент генерирует четыре запроса R1, R2, R3 и R4.
3. Запросы R1 и R4 обрабатываются сервером A.
4. Запрос R2 обрабатывается сервером B.
5. Запрос R3 обрабатывается сервером C.```textdiagram
+-----------+
| |
| Клиент |
| |
+-----------+
| REQ |
\-----+-----/
|
R1, R2, R3, R4
|
+-------------+-------------+
| | |
R1, R4 R2 R3
| | |
v v v
/---------\ /---------\ /---------\
| REP | | REP | | REP |
+---------+ +---------+ +---------+
| | | | | |
| Сервис | | Сервис | | Сервис |
| A | | B | | C |
| | | | | |
+---------+ +---------+ +---------+
``` Рисунок # - Распределение запросов
Эта конфигурация позволяет легко добавлять клиентов, но если требуется добавить серверы, то каждый клиент должен быть перенастроен. Если у вас есть 100 клиентов и вам нужно добавить три сервера, то каждому клиенту придется перенастроиться, чтобы знать о существовании новых серверов. Этот подход точно не то, что мы хотели бы использовать. Чем больше жестко закрепленных модулей в сетевой структуре, тем сложнее она становится для расширения. Поэтому нам нужен модуль, расположенный между клиентами и серверами, который будет концентрировать все знания в этой сетевой топологии. В идеальном случае, мы должны иметь возможность добавлять или удалять клиентов или серверы без необходимости изменения конфигурации любого компонента.
Давайте создадим такой компонент. Этот прокси будет связан с двумя конечными точками: передней конечной точкой для подключения клиентов и задней конечной точкой для подключения серверов. Он будет использовать zmq_poll() для опроса этих двух сокетов, принимать сообщения и пересылать их. В устройстве не будет очередей, так как ZMQ автоматически создает их внутри сокетов.При использовании REQ и REP сокетов, сессия запрос-ответ строго синхронна. Клиент отправляет запрос, сервер получает запрос и отправляет ответ, который затем получает клиент. Если один из клиентов или серверов сталкивается с проблемой (например, повторное отправление запроса), программа выдаст ошибку.Однако, наш прокси-компонент должен быть неблокирующим. Хотя можно использовать zmq_poll() для одновременного обслуживания двух сокетов, здесь явно нельзя использовать REP и REQ сокеты.
К счастью, у нас есть DEALER и ROUTER сокеты, которые могут справиться с этой задачей, обеспечивая неблокирующее приемо-передачу сообщений. DEALER ранее назывался XREQ, а ROUTER — XREP, но в новом коде следует использовать названия DEALER/ROUTER. В третьей главе вы узнаете, как использовать DEALER и ROUTER сокеты для создания различных типов запрос-ответ моделей.
Давайте рассмотрим, как DEALER и ROUTER сокеты работают в этом устройстве.
Ниже приведена схема, описывающая модель запрос-ответ, где REQ и ROUTER взаимодействуют, а затем DEALER и REP:
```textdiagram
+---------+ +---------+ +---------+
| REQ | | REQ | | REQ |
\----+----/ \----+----/ \----+----/
| | |
| | |
+-------------+-------------+
|
|
/-----+-----\
| ROUTER |
+-----------+
| code |
+-----------+
| DEALER |
\-----+-----/
|
|
+-------------+-------------+
| | |
| | |
/----+----\ /----+----\ /----+----\
| REP | | REP | | REP |
+---------+ +---------+ +---------+
```
Рисунок # - Расширенный запрос-ответ
Запрос-ответ агент привязывает два сокета к передней и задней части для подключения клиентских и серверных сокетов. Перед использованием этого устройства необходимо также скорректировать код клиента и сервера.**rrclient: Клиент запрос-ответ на C**
```c
//
// Приветственный клиент
// Подключает сокет REQ к tcp://localhost:5559
// Отправляет "Hello" серверу и ожидает ответ "World"
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// Сокет для связи с сервером
void *requester = zmq_socket (context, ZMQ_REQ);
zmq_connect (requester, "tcp://localhost:5559");
int request_nbr;
for (request_nbr = 0; request_nbr != 10; request_nbr++) {
s_send (requester, "Hello");
char *string = s_recv (requester);
printf ("Received reply %d [%s]\n", request_nbr, string);
free (string);
}
zmq_close (requester);
zmq_term (context);
return 0;
}
```
Вот код сервера:
**rrserver: Сервис запрос-ответ на C**
```c
//
// Приветственный сервер
// Подключает сокет REP к tcp://*:5560
// Получает запрос "Hello", возвращает ответ "World"
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// Сокет для связи с клиентом
void *responder = zmq_socket (context, ZMQ_REP);
zmq_bind (responder, "tcp://*:5560");
while (1) {
// Ждет следующий запрос
char *string = s_recv (responder);
printf ("Received request: [%s]\n", string);
free (string);
// Выполняет некоторую "работу"
sleep (1);
// Возвращает ответ
s_send (responder, "World");
}
// Программа не достигнет этой точки, но все равно следует очистить
zmq_close (responder);
zmq_term (context);
return 0;
}
```
Наконец, вот код агента, который способен обрабатывать сообщения с несколькими кадрами:**rrbroker: Запрос-ответ агент на C**
```c
//
// Простой запрос-ответный прокси
//
#include "zhelpers.h"
int main(void)
{
// Подготовка контекста и сокетов
void *context = zmq_init(1);
void *frontend = zmq_socket(context, ZMQ_ROUTER);
void *backend = zmq_socket(context, ZMQ_DEALER);
zmq_bind(frontend, "tcp://*:5559");
zmq_bind(backend, "tcp://*:5560");
// Инициализация набора опроса
zmq_pollitem_t items[] = {
{ frontend, 0, ZMQ_POLLIN, 0 },
{ backend, 0, ZMQ_POLLIN, 0 }
};
// Передача сообщений между сокетами
while (1) {
zmq_msg_t message;
int64_t more; // Обнаружение многофреймовых сообщений
zmq_poll(items, 2, -1);
if (items[0].revents & ZMQ_POLLIN) {
while (1) {
// Обработка всех фреймов сообщения
zmq_msg_init(&message);
zmq_recv(frontend, &message, 0);
size_t more_size = sizeof(more);
zmq_getsockopt(frontend, ZMQ_RCVMORE, &more, &more_size);
zmq_send(backend, &message, more ? ZMQ_SNDMORE : 0);
zmq_msg_close(&message);
if (!more)
break; // Последний фрейм
}
}
if (items[1].revents & ZMQ_POLLIN) {
while (1) {
// Обработка всех фреймов сообщения
zmq_msg_init(&message);
zmq_recv(backend, &message, 0);
size_t more_size = sizeof(more);
zmq_getsockopt(backend, ZMQ_RCVMORE, &more, &more_size);
zmq_send(frontend, &message, more ? ZMQ_SNDMORE : 0);
zmq_msg_close(&message);
if (!more)
break; // Последний фрейм
}
}
}
// Программа не должна достигнуть этой точки, но все равно следует очистить
zmq_close(frontend);
zmq_close(backend);
zmq_term(context);
return 0;
}
``````Использование запросно-ответного агента позволяет сделать клиент/серверную сеть более расширяемой: клиенты не знают о существовании серверов, а серверы не знают о существовании клиентов. Единственным стабильным компонентом в сети является промежуточный агент:``````textdiagram
+---------+ +---------+ +---------+
| | | | | |
| Клиент | | Клиент | | Клиент |
| | | | | |
+---------+ +---------+ +---------+
| REQ | | REQ | | REQ |
\---------/ \---------/ \---------/
connect connect connect
| | |
| | |
request request request
| | |
+-------------+-------------+
|
справедливое очередное обслуживание
|
v
bind
/-----------\
| ROUTER |
+-----------+
| |
| Брокер |
| |
+-----------+
| DEALER |
\-----------/
bind
|
балансировка нагрузки
|
+-------------+-------------+
| | |
request request request
| | |
v v v
connect connect connect
/---------\ /---------\ /---------\
| REP | | REP | | REP |
+---------+ +---------+ +---------+
| | | | | |
| Сервер | | Сервер | | Сервер |
| A | | B | | C |
| | | | | |
+---------+ +---------+ +---------+
Рисунок # - Запросно-ответный брокер
```
#### Встроенные устройства
ZMQ предоставляет несколько встроенных устройств, хотя большинство людей предпочитает самостоятельно писать эти устройства. Встроенные устройства включают:
* QUEUE, который может использоваться как запросно-ответный агент;
* FORWARDER, который может использоваться как публикация-подписка агент;
* STREAMER, который может использоваться как агент в режиме потока данных.
QUEUE, FORWARDER и STREAMER — это встроенные устройства, которые можно использовать для различных типов коммуникаций.Для запуска устройства можно использовать функцию `zmq_device()`, передавая ей два сокета:
```c
zmq_device (ZMQ_QUEUE, frontend, backend);
```
Запуск QUEUE устройства эквивалентно добавлению запросно-ответного агента в сеть, достаточно создать уже связанные или подключенные сокеты. Ниже приведён пример использования встроенного устройства: **msgqueue: Очередь сообщений в C**
```c
//
// Простой агент очереди сообщений
// Обладает такими же функциями, как и агент запрос-ответ, но использует встроенные устройства
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// Клиентский сокет
void *frontend = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (frontend, "tcp://*:5559");
// Серверный сокет
void *backend = zmq_socket (context, ZMQ_DEALER);
zmq_bind (backend, "tcp://*:5560");
// Запуск встроенного устройства
zmq_device (ZMQ_QUEUE, frontend, backend);
// Программа не достигнет этой точки
zmq_close (frontend);
zmq_close (backend);
zmq_term (context);
return 0;
}
```
Встроенные устройства правильно обрабатывают ошибки, в то время как агенты, реализованные вручную, не содержат механизмов обработки ошибок. Поэтому, когда вы можете использовать встроенные устройства, лучше это сделать.
Вы можете задаться вопросом, как некоторые разработчики ZMQ: что произойдет, если я передам другие типы сокетов этим устройствам? Ответ: не делайте этого. Вы можете передавать различные типы сокетов, но результат будет странным. Так, устройство QUEUE должно использовать ROUTER/DEALER сокеты, FORWARDER — SUB/PUB, STREAMER — PULL/PUSH.Если вам нужны другие типы сокетов для ваших приложений, вам придётся самостоятельно реализовать устройства.
### Многопоточное программирование с ZMQ
Многопоточное программирование с использованием ZMQ может стать настоящим удовольствием. При работе с многопоточностью и ZMQ-сокетами вам не нужно беспокоиться о дополнительных вещах, просто позвольте им работать.
При использовании ZMQ для многопоточного программирования **не требуется беспокоиться о мьютексах, блоках или других факторах, связанных с параллелизмом; единственное, что вам нужно учитывать, это сообщения между потоками**.
Что такое "идеальное" многопоточное программирование? Это когда код легко писать и читать, он может использоваться на разных системах и языках, работает на любом компьютере с любым количеством ядер, не имеет состояния и не имеет ограничений по скорости.Если у вас есть многолетний опыт работы с многопоточным программированием и вы знаете, как использовать блокировки, семафоры, критические секции и другие механизмы для правильной работы кода (не говоря уже о скорости), вы можете испытывать разочарование, потому что ZMQ изменяет все это. За последние тридцать лет разработка параллельных приложений показала, что не следует делиться состоянием. Это как два пьяных человека, пытающихся поделиться одной бутылкой пива; если они не лучшие друзья, они скоро начнут драку. Когда к ним присоединяются еще несколько пьяных людей, ситуация становится еще хуже. Многопоточное программирование иногда напоминает ситуацию с пьяными людьми и пивом. Разработка многопоточных программ часто является болезненным процессом, особенно когда приложение падает под нагрузкой, и причина этого остается непонятной. Был написан материал "11 ошибок в многопоточной программировании", который широко распространен в крупных компаниях. В нем перечислены некоторые из этих ошибок: отсутствие синхронизации, неправильная зернистость, разделение чтения и записи, использование безблокировки, передачи блокировки, конкуренции приоритетов и других.Представьте себе ситуацию: в три часа дня, когда биржевые рынки полны активности, внезапно приложение падает из-за проблем с блокировками. Какова будет реакция? Поэтому, как разработчики, мы должны использовать более сложные механизмы для решения сложных многопоточных задач.
Некоторые сравнивают многопоточные программы с основными опорами крупных компаний, но они также являются самыми уязвимыми. Продукты, которые пытаются расширяться через сеть, часто заканчиваются провалом.
Как использовать ZMQ для многопоточного программирования, вот несколько правил:
* Не следует обращаться к одному и тому же объекту данных из разных потоков. Если вам требуется механизм мьютекса из традиционного программирования, это противоречит идеологии ZMQ. Единственным исключением является объект контекста ZMQ, который является потокобезопасным.
* Каждому процессу следует создавать контекст ZMQ и передавать его всем потокам, которые используют протокол inproc для связи.
* Вы можете рассматривать потоки как отдельные задачи, используя свои собственные контексты, но эти потоки не смогут использовать протокол inproc для связи друг с другом. Преимущество такого подхода заключается в возможности легкого перехода к запуску программы в виде нескольких процессов.* Не следует передавать объекты сокетов между потоками, так как они не являются потокобезопасными. Хотя технически это возможно, это потребует использования механизмов мьютекса и блокировок, что может замедлить и сделать ваше приложение уязвимым. Единственным разумным случаем является передача объектов сокетов внутри библиотеки ZMQ для некоторых языков программирования, где требуется сборка мусора.
Когда вы работаете с двумя устройствами в приложении, вы можете передавать объекты сокетов из одного потока в другой, что может работать на начальных этапах, но рано или поздно приведет к случайным ошибкам. Поэтому следует открывать и закрывать сокеты в одном и том же потоке.Если вы следуете этим правилам, вы обнаружите, что многопоточные программы легко могут быть разделены на несколько процессов. Логика программы может выполняться в потоках, процессах или даже на разных компьютерах, в зависимости от ваших требований. ZMQ использует нативные механизмы потоков системы, а не какие-то "зелёные потоки". Преимуществом этого подхода является то, что вам не придётся учиться новому API многопоточного программирования, и ваш код будет хорошо интегрирован с целевой операционной системой. Вы можете использовать инструменты, такие как ThreadChecker от Intel, чтобы проследить за работой потоков. Недостатком является то, что создание слишком большого количества потоков (например, тысячи) может привести к чрезмерной нагрузке на операционную систему.Далее мы рассмотрим пример, который сделает нашу исходную службу "Hello World" более мощной. Исходная служба была однопоточной, и если количество запросов было невелико, это не вызывало проблем. Потоки ZMQ могут быстро работать на одном ядре, выполняя большое количество задач. Однако, что произойдёт, если одновременно придут 10 000 запросов? В реальных условиях мы запускаем несколько рабочих потоков, которые будут стараться принимать запросы от клиентов, обрабатывать их и возвращать ответы.
Конечно, можно использовать несколько рабочих процессов для реализации этой цели, но запуск одного процесса всегда проще и легче в управлении, чем запуск нескольких процессов. Кроме того, рабочие потоки, запущенные как потоки, занимают меньше полосы пропускания и имеют меньшую задержку.
Вот многопоточная версия службы "Hello World":**mtserver: Многопоточная служба на C**
```c
//
// Многопоточная версия службы Hello World
//
#include "zhelpers.h"
#include <pthread.h>
static void *
worker_routine (void *context) {
// Подключение к сокету агента
void *receiver = zmq_socket (context, ZMQ_REP);
zmq_connect (receiver, "inproc://workers");
while (1) {
char *string = s_recv (receiver);
printf ("Получен запрос: [%s]\n", string);
free (string);
// Выполнение работы
sleep (1);
// Отправка ответа
s_send (receiver, "World");
}
zmq_close (receiver);
return NULL;
}
int main (void)
{
void *context = zmq_init (1);
// Сокет для связи с клиентами
void *clients = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (clients, "tcp://*:5555");
// Сокет для связи с рабочими процессами
void *workers = zmq_socket (context, ZMQ_DEALER);
zmq_bind (workers, "inproc://workers");
// Запуск пула рабочих процессов
int thread_nbr;
for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
pthread_t worker;
pthread_create (&worker, NULL, worker_routine, context);
}
// Запуск очереди устройств
zmq_device (ZMQ_QUEUE, clients, workers);
// Программа не достигнет этой точки, но все равно выполняет очистку
zmq_close (clients);
zmq_close (workers);
zmq_term (context);
return 0;
}
```
Все коды должны быть уже знакомы:* Сервер запускает группу worker-потоков, каждый из которых создает REP сокет и обрабатывает полученные запросы. Worker-потоки работают как однопоточные службы; единственное отличие заключается в использовании inproc вместо tcp протокола, а также в том, что они привязываются и соединяются в обратном порядке.
* Сервер создает ROUTER сокет для связи с клиентами, предоставляя внешний интерфейс на основе TCP протокола.
* Сервер создает DEALER сокет для связи с worker-потоками, используя внутренний интерфейс (inproc).
* Сервер запускает внутреннее устройство QUEUE, которое соединяет сокеты на обоих концах. Устройство QUEUE распределяет полученные запросы между worker-потоками и направляет ответы обратно к отправителю.
Важно отметить, что в некоторых языках программирования создание потоков может быть не слишком удобным. POSIX предоставляет библиотеку pthreads, но в Windows требуется использование различных API. Мы рассмотрим, как упаковать многопоточный API, в третьей главе.Пример "работы" состоит всего лишь в том, чтобы остановиться на одну секунду; в worker-потоках можно выполнять любые действия, включая общение с другими узлами. Путь сообщений выглядит следующим образом: REQ-ROUTER-Очередь-DEALER-REP.
```textdiagram
+------------+
| |
| Клиент |
| |
+------------+
| REQ |
\---+--------/
| ^
| |
"Привет" "Мир"
| |
/------------------|--=-|------------------\
| v | :
| /--------+---\ |
| | ROUTER | |
| +------------+ |
| | | |
| | Сервер | |
| | | |
| +------------+ |
| | | |
| | Очередь | |
| | устройство| |
| | | |
| +------------+ |
| | DEALER | |
| \------------/ |
| ^ |
| | |
| +-----------+-----------+ |
| | | | |
| v v v |
| /--------\ /--------\ /--------\ |
| | REP | | REP | | REP | |
| +--------+ +--------+ +--------+ |
| | | | | | | |
| | Рабочий| | Рабочий| | Рабочий| |
| | | | | | | |
| +--------+ +--------+ +--------+ |
| |
\------------------------------------------/
```
Рисунок # - Многопоточный сервер
```textdiagram
+------------+
| |
| Шаг 1 |
| |
+------------+
| PAIR |
``` \-----+------/
|
|
Готово!
|
v
/------------\
| PAIR |
+------------+
| |
Шаг 2
| |
+------------+
| PAIR |
\-----+------/
|
|
Готово!
|
v
/------------\
| PAIR |
+------------+
| |
Шаг 3
| |
+------------+ Рисунок # - Бег эстафеты
```
Мы используем сокеты PAIR и протокол inproc.
**mtrelay: Многопоточная эстафета на C**```c
//
// Многопоточная синхронизация
//
#include "zhelpers.h"
#include <pthread.h>
static void *
step1 (void *context) {
// Подключение к шагу 2, чтобы сообщить, что готов
void *xmitter = zmq_socket (context, ZMQ_PAIR);
zmq_connect (xmitter, "inproc://step2");
printf ("Шаг 1 готов, уведомление шага 2...\n");
s_send (xmitter, "Готов");
zmq_close (xmitter);
return NULL;
}
static void *
step2 (void *context) {
// Запуск шага 1 и привязка к inproc сокету
void *receiver = zmq_socket (context, ZMQ_PAIR);
zmq_bind (receiver, "inproc://step2");
pthread_t thread;
pthread_create (&thread, NULL, step1, context);
// Ожидание сигнала
char *string = s_recv (receiver);
free (string);
zmq_close (receiver);
// Подключение к шагу 3, чтобы сообщить, что готов
void *xmitter = zmq_socket (context, ZMQ_PAIR);
zmq_connect (xmitter, "inproc://step3");
printf ("Шаг 2 готов, уведомление шага 3...\n");
s_send (xmitter, "Готов");
zmq_close (xmitter);
return NULL;
}
int main (void)
{
void *context = zmq_init (1);
// Запуск шага 2 и привязка к inproc сокету
void *receiver = zmq_socket (context, ZMQ_PAIR);
zmq_bind (receiver, "inproc://step3");
pthread_t thread;
pthread_create (&thread, NULL, step2, context);
// Ожидание сигнала
char *string = s_recv (receiver);
free (string);
zmq_close (receiver);
printf ("Тест успешен!\n");
zmq_term (context);
return 0;
}
```Это типичный пример многопоточного программирования с использованием ZMQ:1. Два потока взаимодействуют через протокол inproc, используя один и тот же контекст;
1. Основной поток создает сокет, связывает его с конечной точкой inproc:// и затем запускает дочерний поток, передавая ему объект контекста;
1. Дочерний поток создает второй сокет, подключается к конечной точке inproc:// и отправляет сигнал готовности основному потоку.Важно отметить, что этот код не может быть расширен для координации между несколькими процессами. Если вы используете протокол inproc, вы можете создать только очень тесно связанные приложения. Этот метод можно использовать в случае, если требуется строго контролировать задержку времени. Для других приложений каждый поток может использовать один и тот же контекст, а протоколы ipc или tcp могут быть выбраны для этого. В этом случае вы сможете свободно разделить свое приложение на несколько процессов или даже на несколько компьютеров.
Это первый раз, когда мы используем сокеты типа PAIR. Почему мы используем PAIR? Другие типы сокетов также могут быть использованы, но они имеют некоторые недостатки, влияющие на коммуникацию между потоками:
* Вы можете использовать PUSH для отправителя и PULL для получателя, что может показаться подходящим, но важно помнить, что сокет PUSH распределяет нагрузку между несколькими получателями. Если вы случайно запустите два получателя, вы можете "потерять" половину сигналов. В то время как сокеты PAIR устанавливают соединение один-к-одному, обеспечивая эксклюзивность.* Вы можете использовать DEALER для отправителя и ROUTER для получателя. Однако сокет ROUTER добавляет адрес источника в начало сообщения, что может привести к тому, что нулевое сообщение станет множеством отдельных сообщений. Если вы не беспокоитесь об этом и не планируете повторно читать этот сокет, это может быть приемлемым решением. Однако, если вы хотите использовать этот сокет для получения реальных данных, вы обнаружите, что сообщения, предоставляемые ROUTER, будут некорректными. Что касается сокета DEALER, он также имеет механизм балансировки нагрузки, что делает его подверженным тем же рискам, что и сокет PUSH.* Вы можете использовать PUB для отправителя и SUB для получателя. Это позволяет отправлять сообщения в первозданном виде, и сокет PUB не распределяет нагрузку. Однако вам потребуется установить пустое значение подписки для сокета SUB (чтобы принимать все сообщения); кроме того, если сокет SUB не успел установить соединение с сокетом PUB, сообщения могут быть потеряны.
Исходя из всего вышеуказанного, использование сокетов типа PAIR для координации между потоками является наиболее подходящим решением.
### Координация узлов
Когда вы хотите координировать узлы, сокеты PAIR не являются лучшим выбором, что отличает потоки от узлов. В общем, узлы могут входить и выходить свободно, в то время как потоки остаются более стабильными. При использовании сокетов PAIR, если удалённый узел отключается и затем снова подключается, сокеты PAIR этого не учитывают. Второе различие заключается в том, что количество потоков обычно фиксировано, а количество узлов часто меняется. Давайте рассмотрим, как следует координировать узлы в модели метеорологической информации, чтобы гарантировать, что клиенты не потеряют первые сообщения.
Вот логика выполнения программы:* Публикация знает ожидаемое количество подписчиков, которое можно произвольно задать;
* После запуска публикация ожидает соединения всех подписчиков, то есть координацию узлов. Каждый подписчик использует другой сокет для уведомления публикации о своей готовности;
* Когда все подписчики готовы, публикация начинает отправлять сообщения.Здесь мы будем использовать сокеты REQ-REP для синхронизации публикации и подписчиков. Код публикации представлен ниже:
**syncpub: Синхронизированная публикация на C**
```c
//
// Публикация - синхронизированная версия
//
#include "zhelpers.h"
// Ожидание 10 подписчиков
#define SUBSCRIBERS_EXPECTED 10
int main (void)
{
void *context = zmq_init (1);
// Сокет для связи с клиентами
void *publisher = zmq_socket (context, ZMQ_PUB);
zmq_bind (publisher, "tcp://*:5561");
// Сокет для получения сигналов
void *syncservice = zmq_socket (context, ZMQ_REP);
zmq_bind (syncservice, "tcp://*:5562");
// Получение сигналов готовности от подписчиков
printf ("Ожидание готовности подписчиков\n");
int subscribers = 0;
while (subscribers < SUBSCRIBERS_EXPECTED) {
// - Ожидание сигнала готовности
char *string = s_recv (syncservice);
free (string);
// - Отправка ответа
s_send (syncservice, "");
subscribers++;
}
// Начало передачи 1 000 000 данных
printf ("Начало широковещательной передачи\n");
int update_nbr;
for (update_nbr = 0; update_nbr < 1000000; update_nbr++)
s_send (publisher, "Ревень");
s_send (publisher, "КОНЕЦ");
}
``````c
zmq_close (publisher);
zmq_close (syncservice);
zmq_term (context);
return 0;
}
``````textdiagram
+----------------+
| |
| Publisher |
| |
+--------+-------+
| PUB | REP |
\---+----+-----+-/
| ^ |
| | |
| (1) |
[3] | |
| | (2)
| | |
v | v
/--------+-+-----\
| SUB | REQ |
+--------+-------+
| |
| Subscriber |
| |
+----------------+
``````
Рисунок # - Синхронизация публикации/подписки## Подписчик синхронизации на C
```c
//
// Подписчик - синхронный класс
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// 1. Подключение SUB сокета
void *subscriber = zmq_socket (context, ZMQ_SUB);
zmq_connect (subscriber, "tcp://localhost:5561");
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "", 0);
// ZMQ работает слишком быстро, поэтому мы немного задерживаемся...
sleep (1);
// 2. Синхронизация с издателем
void *syncclient = zmq_socket (context, ZMQ_REQ);
zmq_connect (syncclient, "tcp://localhost:5562");
// - Отправка запроса
s_send (syncclient, "");
// - Ожидание ответа
char *string = s_recv (syncclient);
free (string);
// 3. Обработка сообщений
int update_nbr = 0;
while (1) {
char *string = s_recv (subscriber);
if (strcmp (string, "END") == 0) {
free (string);
break;
}
free (string);
update_nbr++;
}
printf ("Получено %d сообщений\n", update_nbr);
zmq_close (subscriber);
zmq_close (syncclient);
zmq_term (context);
return 0;
}
```
Следующий shell-скрипт запускает 10 подписчиков и одного издателя:
```sh
echo "Запуск подписчиков..."
for a in 1 2 3 4 5 6 7 8 9 10; do
syncsub &
done
echo "Запуск издателя..."
syncpub
```
Результат:
```
Запуск подписчиков...
Запуск издателя...
Получено 1000000 сообщений
Получено 1000000 сообщений
Получено 1000000 сообщений
Получено 1000000 сообщений
Получено 1000000 сообщений
Получено 1000000 сообщений
Получено 1000000 сообщений
Получено 1000000 сообщений
Получено 1000000 сообщений
Получено 1000000 сообщений
```Когда REQ-REP запрос завершён, мы всё ещё не можем гарантировать, что SUB сокет успешно подключён. Если не использовать протокол inproc, порядок внешних соединений может быть непредсказуемым. Поэтому в примере программы используется метод задержки sleep(1), после которого отправляется запрос на синхронизацию.Более надежная модель может выглядеть следующим образом:
* Издатель открывает PUB сокет и начинает отправлять сообщение Hello (не данные);
* Подписчик подключается к SUB сокету и, получив сообщение Hello, использует REQ-REP сокет для синхронизации;
* Когда издатель получает все ответы от подписчиков, он начинает отправлять реальные данные.
```### Нулевое копирование
В первой главе мы упомянули, что нулевое копирование может быть опасным, но это было просто предупреждение. Поскольку вы дошли до этого места, значит, у вас достаточно знаний, чтобы использовать нулевое копирование. Однако стоит помнить, что все пути ведут в ад, и преждевременная оптимизация программного кода может быть вредной. Другими словами, если вы не умеете правильно использовать нулевое копирование, это может сделать архитектуру программы хуже.
API, предоставляемый ZMQ, позволяет вам отправлять и получать сообщения без необходимости заботиться о кэшировании. Поскольку сообщения отправляются и принимаются ZMQ в фоновом режиме, использование нулевого копирования требует выполнения некоторых дополнительных действий.Для использования нулевого копирования используйте функцию `zmq_msg_init_data()` для создания сообщения, которое указывает на уже выделенный участок памяти, а затем передайте это сообщение функции `zmq_send()`. При создании сообщения вам также потребуется предоставить функцию для освобождения содержимого сообщения, которую ZMQ вызовет после завершения отправки сообщения. Вот простой пример, где мы предполагаем, что выделенный участок памяти составляет 1000 байт:
```c
void *data = malloc(1000);
int (*free_fn)(void *, void *) = &free;
zmq_msg_t msg;
zmq_msg_init_data(&msg, data, 1000, free_fn, NULL);
zmq_send(socket, &msg, 0);
``````c
void my_free (void *data, void *hint) {
free (data);
}
// Отправка сообщения из буфера, который мы выделяем, а ZMQ освободит за нас
zmq_msg_t message;
zmq_msg_init_data (&message, buffer, 1000, my_free, NULL);
zmq_send (socket, &message, 0);
```
При получении сообщений нулевое копирование использовать нельзя: ZMQ помещает полученное сообщение в область памяти для чтения, но не записывает его в заранее определённую область памяти программы.
Многосекционные сообщения ZMQ хорошо поддерживают нулевое копирование. В традиционных системах передачи сообщений вам приходится объединять содержимое различных кэшей в один кэш перед отправкой. Но ZMQ отправляет содержимое из различных областей памяти как отдельные кадры сообщения. И внутри ZMQ каждое сообщение обрабатывается как единое целое, что делает процесс очень эффективным.
### Одноразовые сокеты и постоянные сокеты
В традиционном сетевом программировании сокет является объектом API, жизненный цикл которого не превышает жизненный цикл программы. Однако если внимательно рассмотреть сокет, он занимает определённый ресурс — кэш. Это заставляет разработчиков ZMQ задуматься: можно ли сохранить эти кэши сокетов при аварийном завершении программы и восстановить их позже?Эта возможность может оказаться полезной, хотя она не решает все проблемы, но хотя бы частично компенсирует потери, особенно в случае многопубликационной подписной модели. Давайте обсудим это подробнее. Здесь есть два сокета, весело передающих метеорологическую информацию:```textdiagram
+-----------+
| |
| Отправитель |
| |
+-----------+
| Сокет |
\-----------/
+---------+
+---------+
+---------+ 0MQ буфер передачи
+---------+
+---------+
|
|
v
+---------+
+---------+ Буферы I/O сети
+---------+
|
|
v
+---------+
+---------+
+---------+ 0MQ буфер приема
+---------+
+---------+
/-----------\
| Сокет |
+-----------+
| |
| Получатель |
| |
+-----------+
Рисунок # - Отправитель доводит получателя до отчаяния
```
Если получатель (SUB, PULL, REQ) указывает идентификатор сокета, когда они отключаются от сети, отправитель (PUB, PUSH, REP) будет кэшировать информацию для них до достижения порогового значения (HWM). В данном случае отправителю не требуется указывать идентификатор сокета.
Важно отметить, что кэширование сокетов в ZMQ невидимо для приложения, как и кэширование TCP.
До настоящего времени мы использовали временные сокеты. Для преобразования временного сокета в постоянный необходимо задать ему идентификатор сокета. У всех сокетов ZMQ есть идентификатор, но он генерируется автоматически UUID.
Внутри ZMQ, когда два сокета соединяются, они обмениваются своими идентификаторами. Если один из них не имеет ID, он создает его самостоятельно для идентификации другого:
- Отправитель
- Получатель
- SUB
- PULL
- REQ
- PUB
- PUSH
- REP
- HWM
- UUID```textdiagram
+-----------+
| |
| Отправитель |
| |
+-----------+
| Сокет |
\-----------/
^ "Люси! Рад снова тебя видеть..."
|
|
| "Мое имя Люси"
/-----+-----\
| Сокет |
+-----------+
| |
| Получатель |
| |
+-----------+
Рисунок # - Устойчивый сокет
```
Но сокет также может сообщить своему партнеру свой идентификатор, тогда при повторном подключении они смогут узнать друг друга.```
+-----------+
| |
| Отправитель|
| |
+-----------+
| Сокет |
\-----------/
^ "Хорошо, я буду называть тебя Лув"
|
|
| "Не скажу тебе свое имя!"
/-----+-----\
| Сокет |
+-----------+
| |
| Получатель|
| |
+-----------+
``` Рисунок # - Временный сокет
Ниже приведён фрагмент кода, который позволяет установить идентификатор для сокета, тем самым создавая устойчивый сокет:
```c
zmq_setsockopt (socket, ZMQ_IDENTITY, "Lucy", 4);
```
Ещё несколько замечаний по поводу идентификаторов сокетов:
```* Если вы хотите установить идентификатор для сокета, это должно быть сделано до соединения или привязки к конечной точке;
* Получатель выбирает, использовать ли идентификатор сокета, аналогично тому, как cookies используются в HTTP-веб-приложениях; выбор конкретного cookie остаётся за сервером;
* Идентификатор сокета представляет собой двоичную строку; идентификатор сокета, начинающийся с нулевого байта, зарезервирован для использования ZMQ;
* Не следует использовать одинаковые идентификаторы для нескольких сокетов, так как если идентификатор уже занят, сокет не сможет подключиться к другому сокету;
* Не используйте случайные идентификаторы сокетов, так как это может привести к созданию большого количества устойчивых сокетов, что в конечном итоге может привести к отказу узла;
* Если вам требуется получить идентификатор сокета противоположной стороны, ROUTER-сокет автоматически выполнит эту задачу, а для других типов сокетов вам потребуется отправить идентификатор как часть сообщения;
* В целом использование устойчивых сокетов не является хорошей идеей, так как это может привести к увеличению путаницы для отправителя и уязвимости архитектуры. Если бы мы могли переработать ZMQ, вероятно, мы бы убрали возможность явного указания идентификатора сокета.Дополнительную информацию можно найти в разделе ZMQ_IDENTITY функции zmq_setsockopt(). Обратите внимание, что этот метод позволяет получать идентификаторы сокетов в программе, но не позволяет получать идентификаторы сокетов противоположной стороны.
### Публикация-подписка: сообщение-обёртка
Мы уже кратко рассмотрели многофреймовые сообщения, теперь давайте посмотрим на их типичное применение — сообщение-обёртка. Обёртка используется для указания источника сообщения без изменения его содержимого.
В модели публикация-подписка обёртка содержит подписочные ключи, которые используются для фильтрации сообщений, которые не нужно принимать.
Если вы хотите использовать обёртку для публикации-подписки, вам придётся самостоятельно её создать и установить. Этот шаг является необязательным, и мы не использовали его в наших предыдущих примерах. Использование обёртки в модели публикация-подписка может быть сложным, но это всё же необходимо для реальных приложений, поскольку обёртка и сообщение действительно представляют собой две отдельные части данных.
Это пример сообщения с обёрткой в модели публикация-подписка:
```textdiagram
+-------------+
Frame 1 | Подписочный ключ |
+-------------+
Frame 2 | Данные |
+-------------+
```
Рисунок # - Оболочка пуб-саб с отдельным ключом---
Помните, что в модели публикация-подписка (pub-sub) прием сообщений осуществляется на основе подписки, то есть на основе префикса сообщения. Вставка этого префикса в отдельную строку позволяет сделать соответствие очень явным. Это означает, что нет возможности для приложения получить только часть данных.
Вот пример простой оболочки сообщения pub-sub. Публикатор отправляет два типа сообщений: A и B, и оболочка указывает тип сообщения:
**psenvpub: Публикатор оболочки pub-sub на C**
```c
//
// Публикация-подписка сообщение оболочка - публикатор
// Функция s_sendmore() также предоставлена zhelpers.h
//
#include "zhelpers.h"
int main (void)
{
// Подготовка контекста и сокета PUB
void *context = zmq_init (1);
void *publisher = zmq_socket (context, ZMQ_PUB);
zmq_bind (publisher, "tcp://*:5563");
while (1) {
// Отправка двух сообщений, типа A и типа B
s_sendmore (publisher, "A");
s_send (publisher, "Это сообщение нам не интересно");
s_sendmore (publisher, "B");
s_send (publisher, "Это сообщение нам интересно");
sleep (1);
}
// Правильное завершение работы
zmq_close (publisher);
zmq_term (context);
return 0;
}
```
Предположим, что подписчик заинтересован только в сообщениях типа B:
**psenvsub: Подписчик оболочки pub-sub на C**
```c
//
// Публикация-подписка сообщение оболочка - подписчик
//
#include "zhelpers.h"
int main (void)
{
// Подготовка контекста и сокета SUB
void *context = zmq_init (1);
void *subscriber = zmq_socket (context, ZMQ_SUB);
zmq_connect (subscriber, "tcp://localhost:5563");
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "B", 1);
``` while (1) {
// Чтение оболочки сообщения
char *address = s_recv(subscriber);
// Чтение содержимого сообщения
char *contents = s_recv(subscriber);
printf("[%s] %s\n", address, contents);
free(address);
free(contents);
}
// Правильное завершение работы
zmq_close(subscriber);
zmq_term(context);
return 0;
}
```
При выполнении этих программ подписчик будет выводить следующую информацию:
```
[B] Это сообщение нам интересно
[B] Это сообщение нам интересно
[B] Это сообщение нам интересно
[B] Это сообщение нам интересно
...
```
Этот пример показывает, что подписчик игнорирует неподписанные сообщения и получает полное многофреймовое сообщение — вы не получите только часть сообщения. Если вы подписаны на несколько сокетов и хотите знать идентификаторы этих сокетов, чтобы отправлять сообщения им через другой сокет (что довольно распространено), вы можете заставить публи셔 создать сообщение, содержащее три фрейма:
```textdiagram
+-------------+
Frame 1 | Ключ | Идентификатор подписки
+-------------+
Frame 2 | Идентификатор| Адрес публишера
+-------------+
Frame 3 | Данные | Само тело сообщения
+-------------+
Рисунок # - Оболочка пуб-саб с адресом отправителя
```
### (Полу)Персистентные подписчики и пороговое значение (HWM)Все типы сокетов могут использовать идентификаторы. Если вы используете PUB и SUB сокеты, где SUB сокет сам объявляет свой идентификатор, то при отключении SUB сокета, PUB будет хранить сообщения для отправки SUB-сокету.Эта механика имеет как положительные, так и отрицательные стороны. Положительной стороной является то, что публишер временно хранит эти сообщения, а затем отправляет их после того, как SUB сокет восстановит соединение; отрицательной стороной является то, что это может привести к исчерпанию памяти публишера.
**Если вы используете персистентный SUB сокет (то есть установили идентификатор для SUB), вам следует использовать пороговое значение (HWM), чтобы защитить сокет публишера от накопления сообщений в его очереди.** Пороговое значение публишера влияет на всех подписчиков отдельно.
Мы можем запустить пример, чтобы продемонстрировать это, используя wuclient и wuserver из первой главы. В wuclient добавьте следующую строку перед подключением сокета:
```c
zmq_setsockopt (subscriber, ZMQ_IDENTITY, "Hello", 5);
```
Скомпилируйте и запустите оба этих примера, все кажется нормальным. Однако, если вы посмотрите на использование памяти публишера, вы заметите, что оно постепенно увеличивается по мере отключения подписчиков. Если вы перезапустите подписчика, вы заметите, что использование памяти больше не увеличивается. Как только подписчик снова отключится, использование памяти снова начнет расти. Со временем это приведет к исчерпанию системных ресурсов.Давайте сначала рассмотрим, как устанавливать пороговое значение, а затем — как правильно его устанавливать. Ниже представлен код для публишера и подписчика, использующих "механизм координации узлов", о котором упоминалось выше. Публишер отправляет сообщение каждую секунду, и вы можете отключить подписчика, перезапустить его и посмотреть, что произойдет.Вот код для публишера:
**durapub: Персистентный публишер на C**
```c
//
// Публикатор - поддержание соединения с устойчивыми подписчиками
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// Подписчики отправляют сообщения о готовности
void *sync = zmq_socket (context, ZMQ_PULL);
zmq_bind (sync, "tcp://*:5564");
// Используется этот сокет для публикации сообщений
void *publisher = zmq_socket (context, ZMQ_PUB);
zmq_bind (publisher, "tcp://*:5565");
// Ждем синхронизационное сообщение
char *string = s_recv (sync);
free (string);
// Отправляем 10 сообщений, каждое через секунду
int update_nbr;
for (update_nbr = 0; update_nbr < 10; update_nbr++) {
char string [20];
sprintf (string, "Обновление %d", update_nbr);
s_send (publisher, string);
sleep (1);
}
s_send (publisher, "КОНЕЦ");
zmq_close (sync);
zmq_close (publisher);
zmq_term (context);
return 0;
}
```
Вот код подписчика:
**durasub: Устойчивый подписчик на C**
```c
//
// Устойчивый подписчик
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// Подключаем SUB сокет
void *subscriber = zmq_socket (context, ZMQ_SUB);
zmq_setsockopt (subscriber, ZMQ_IDENTITY, "Привет", 6);
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "", 0);
zmq_connect (subscriber, "tcp://localhost:5565");
// Отправляем синхронизационное сообщение
void *sync = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sync, "tcp://localhost:5564");
s_send (sync, "");
// Получаем обновления и завершаем работу по команде
while (1) {
char *string = s_recv (subscriber);
printf ("%s\n", string);
if (strcmp (string, "END") == 0) {
free (string);
break;
}
free (string);
}
zmq_close (sync);
zmq_close (subscriber);
zmq_term (context);
return 0;
}
```Запустите вышеуказанный код, откройте издателя и подписчика в разных окнах. Когда подписчик получает одно или два сообщения, нажмите Ctrl+C для прерывания, а затем перезапустите его, чтобы увидеть результат:
```
$ durasub
Обновление 0
Обновление 1
Обновление 2
^C
$ durasub
Обновление 3
Обновление 4
Обновление 5
Обновление 6
Обновление 7
^C
$ durasub
Обновление 8
Обновление 9
END
```
Как видно, единственное отличие подписчика заключается в установке идентификатора для сокета, что заставляет издателя кэшировать сообщения до тех пор, пока соединение не будет восстановлено. Установка идентификатора позволяет преобразовать временный сокет в устойчивый. В реальной жизни вам следует аккуратно называть свои сокеты, используя конфигурационные файлы или генерируя UUID и сохраняя его.
Когда мы задаем порог для PUB сокета, издатель начинает кэшировать определенное количество сообщений, а остальные сообщения отбрасываются. Давайте установим порог равным 2 и посмотрим, что произойдет:
```c
uint64_t hwm = bk;
zmq_setsockopt (publisher, ZMQ_HWM, &hwm, sizeof (hwm));
```
Запустите программу, прервите подписчика и подождите некоторое время перед перезапуском. Вы увидите следующий результат:
```
$ durasub
Обновление 0
Обновление 1
^C
$ durasub
Обновление 2
Обновление 3
Обновление 7
Обновление 8
Обновление 9
END
```
Исправлено:
- "bk" заменено на "2".Обратите внимание, что издатель сохранил только два сообщения (2 и 3). Порог заставляет ZMQ отбрасывать сообщения, выходящие за пределы очереди.
Кратко говоря, если вы хотите использовать устойчивого подписчика, вам необходимо задать порог для издателя, иначе сервер может столкнуться с проблемами из-за переполнения памяти. Однако существует ещё один способ. ZMQ предоставляет механизм, известный как обмен (swap), который представляет собой файл на диске, используемый для хранения сообщений, выходящих за пределы очереди. Запуск этого механизма прост.
```c
// Указывает размер области обмена, единица измерения: байты.
uint64_t swap = 25000000;
zmq_setsockopt (publisher, ZMQ_SWAP, &swap, sizeof (swap));
```
Можно объединить вышеупомянутые методы, чтобы создать публикацию, которая будет принимать устойчивые сокеты и при этом не будет переполняться памятью:
**durapub2: Устойчивый, но циничный публикатор на C**
```c
//
// Публикатор — подключается к устойчивому подписчику
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// Подписчик сообщит нам, что он готов
void *sync = zmq_socket (context, ZMQ_PULL);
zmq_bind (sync, "tcp://*:5564");
// Используем этот сокет для отправки сообщений
void *publisher = zmq_socket (context, ZMQ_PUB);
// Предотвращаем переполнение сообщений из-за медленного устойчивого подписчика
uint64_t hwm = 1;
zmq_setsockopt (publisher, ZMQ_HWM, &hwm, sizeof (hwm));
}
``` // Устанавливаем размер области обмена для использования всеми подписчиками
uint64_t swap = 25000000;
zmq_setsockopt (publisher, ZMQ_SWAP, &swap, sizeof (swap));
zmq_bind (publisher, "tcp://*:5565");
// Ждем синхронизационного сообщения
char *string = s_recv (sync);
free (string);
// Отправляем 10 сообщений, одно каждую секунду
int update_nbr;
for (update_nbr = 0; update_nbr < 10; update_nbr++) {
char string [20];
sprintf (string, "Обновление %d", update_nbr);
s_send (publisher, string);
sleep (1);
}
s_send (publisher, "КОНЕЦ");
zmq_close (sync);
zmq_close (publisher);
zmq_term (context);
return 0;
}
```* **Необходимо установить пороговое значение для PUB сокета**, которое можно определить на основе максимального количества подписчиков, доступной памяти для очередей и среднего размера сообщения. Например, если вы ожидаете 5000 подписчиков, имеете 1 ГБ памяти для использования и средний размер сообщения составляет около 200 байт, то разумное значение порога будет равно 1,000,000,000 / 200 / 5,000 = 1,000. Если вы не хотите потерять сообщения у медленных или зависших подписчиков, вы можете настроить обменник для хранения сообщений в часы пик. Размер обменника можно рассчитать на основе количества подписчиков, отношения пиков сообщений, среднего размера сообщения, временного хранения и других факторов. Например, если вы ожидаете 5000 подписчиков, размер сообщения около 200 байт, а количество сообщений в секунду составляет 100 000, вам потребуется 100 МБ места на диске для хранения сообщений каждую секунду. В сумме это составит 6 ГБ места на диске, которое должно быть достаточно быстрым (что выходит за рамки данного руководства). О постоянных подписчиках:* Данные могут быть потеряны в зависимости от частоты публикации сообщений, размера сетевого кэша, протокола связи и т.д. Постоянные подписчики надежнее временных сокетов, но это не означает, что они идеальны.
* Обменные файлы не восстанавливаются, поэтому при гибели публикатора или агента данные в обменной области будут потеряны.
О пороговых значениях:
* Этот параметр влияет как на очередь отправки, так и на очередь приема сокета. Конечно, PUB и PUSH не имеют очередей приема, а SUB, PULL, REQ и REP не имеют очередей отправки. В случае с DEALER, ROUTER и PAIR сокетами, они имеют как очереди отправки, так и очереди приема.
* Когда сокет достигает порогового значения, ZMQ может заблокировать его или просто отбросить сообщение.
* При использовании протокола inproc отправитель и получатель используют одну и ту же очередь кэширования, поэтому реальный порог равен сумме пороговых значений обоих сокетов. Если один из сокетов не имеет установленного порогового значения, то он не будет иметь ограничений по кэшированию.
### Вот то, что вы хотели!
ZMQ — это как набор конструктора, если у вас есть достаточно воображения, вы можете использовать его для создания любой сети.Эта высокопроизводительная и гибкая архитектура обязательно расширят ваш кругозор. Это не ново для ZMQ; давно существуют такие [потоково-ориентированные языки программирования](http://en.wikipedia.org/wiki/Flow-based_programming), как [Erlang](http://www.erlang.org/), которые уже делают это. Однако ZMQ предоставляет более дружественный и удобный интерфейс.Как отметил [Gonzo Diethelm](http://permalink.gmane.org/gmane.network.zeromq.devel/2145): "Я бы хотел сказать одним предложением: 'Если бы ZMQ не существовал, его следовало бы изобрести.' Как человек с многолетним опытом работы в этой области, ZMQ очень меня трогает. Я могу только сказать: 'Это то, что я хочу!'".

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/andwp-zguide-cn.git
git@api.gitlife.ru:oschina-mirror/andwp-zguide-cn.git
oschina-mirror
andwp-zguide-cn
andwp-zguide-cn
master