1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/andwp-zguide-cn

В этом репозитории не указан файл с открытой лицензией (LICENSE). При использовании обратитесь к конкретному описанию проекта и его зависимостям в коде.
Клонировать/Скачать
chapter3.txt 170 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
Отправлено 23.06.2025 22:09 06c6f61
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554
.- vim: set filetype=markdown:
.set GIT=https://github.com/anjuke/zguide-cn
## Глава 3. Продвинутые запрос-ответные модели
Во второй главе мы познакомились с основами использования ØMQ, разрабатывая ряд небольших приложений, каждый из которых вводил новые возможности. В этой главе мы продолжим этот подход, исследуя более продвинутые запрос-ответные модели, построенные на основе ØMQ.
В данной главе будут рассмотрены следующие темы:
* Создание и использование сообщений-оболочек в запрос-ответной модели
* Использование сокетов REQ, REP, DEALER и ROUTER
* Указание целей ответа с помощью идентификаторов
* Использование пользовательских дискретных маршрутизационных моделей
* Использование пользовательских моделей маршрутизации LRU
* Создание высокоуровневых классов для обработки сообщений
* Создание базового прокси-сервера запрос-ответ
* Корректное наименование сокетов
* Моделирование кластера клиент-работник
* Создание масштабируемого кластера запрос-ответ
* Использование сокетов-каналов для мониторинга потоков
### Оболочки запрос-ответ
В запрос-ответной модели оболочка хранит информацию о местоположении ответа. Именно поэтому ØMQ-сети, будучи бессостоятельными, всё же способны выполнять запрос-ответные операции.В обычном использовании вам не обязательно знать принцип работы оболочек запрос-ответ. При использовании REQ и REP ØMQ автоматически управляет оболочками сообщений. В устройстве (device) следующей главы вам также потребуется только читать и записывать все данные. ØMQ использует многосегментные сообщения для хранения оболочек, поэтому они копируются вместе с сообщениями при их копировании.Однако перед использованием продвинутых запрос-ответных моделей важно понять механизм оболочек. Вот как работает механизм оболочек в ROUTER:
* При чтении сообщения из ROUTER ØMQ оборачивает его в оболочку, указывающую источник сообщения.
* При записи сообщения в ROUTER (включая оболочку) ØMQ распаковывает оболочку и отправляет сообщение соответствующему получателю.
Если вы запишете сообщение, полученное из ROUTER A (включая оболочку), в ROUTER B (то есть отправите его DEALER, который подключен к ROUTER), то при получении этого сообщения из ROUTER B оно будет содержать две оболочки.
Основная роль механизма оболочек — это обеспечение ROUTER информации о том, как отправлять сообщения правильному получателю. Вам следует сохранять эти оболочки в программе. Вспомните REP-сокет, который последовательно распаковывает оболочки полученных сообщений и передаёт само сообщение приложению. При отправке сообщения он снова оборачивает его в оболочку и отправляет ROUTER, обеспечивая доставку сообщения правильному получателю. Можно использовать вышеупомянутые принципы для создания устройства ROUTER-DEALER:
```
[REQ] <--> [REP]
[REQ] <--> [ROUTER--DEALER] <--> [REP]
[REQ] <--> [ROUTER--DEALER] <--> [ROUTER--DEALER] <--> [REP]
... и т.д.
```
Когда вы используете сокет REQ для подключения к сокету ROUTER и отправляете запрос, вы получите сообщение следующего вида из ROUTER:```textdiagram
+---------------+
Frame 1 | Reply address | <----- Envelope
+---+-----------+
Frame 2 | | <------ Empty message part
+---+-------------------------------------+
Frame 3 | Data |
+-----------------------------------------+
Figure # - Single-hop request-reply envelope
```
* Третий кадр содержит сообщение, отправленное приложением через сокет REQ;
* Второй кадр с пустым сообщением добавляется сокетом REQ перед отправкой;
* Первый кадр, или envelop, добавляется сокетом ROUTER и содержит адрес источника сообщения.
Если мы будем передавать это сообщение по цепочке устройств, в конечном итоге мы получим сообщение с несколькими envelops. Последний envelop будет находиться в верхней части сообщения.
```textdiagram
(Следующий envelop будет здесь)
+---------------+
Frame 1 | Reply address | <----- Envelope (ROUTER)
+---------------+
Frame 2 | Reply address | <----- Envelope (ROUTER)
+---------------+
Frame 3 | Reply address | <----- Envelope (ROUTER)
+---+-----------+
Frame 4 | | <------ Empty message part (REQ)
+---+-------------------------------------+
Frame 5 | Data |
+-----------------------------------------+
Figure # - Multihop request-reply envelope
```
Далее будут подробно рассмотрены четыре типа сокетов, используемых нами в режиме запрос-ответ:
* DEALER представляет собой балансировщик нагрузки, который распределяет сообщения между подключенными узлами и использует механизм очередей для обработки принятых сообщений. DEALER действует как комбинация PUSH и PULL.
** Сокет REQ добавляет пустой кадр в начало сообщения при отправке и удаляет его при получении. На самом деле, REQ основан на DEALER, но REQ может продолжать работу только после отправки сообщения и получения ответа.
* Сокет ROUTER добавляет envelop в начало сообщения при получении, указывая источник сообщения. При отправке он использует этот envelop для определения узла, который получит сообщение. REP сохраняет все данные до первого пустого кадра и передает оригинальную информацию приложению. При отправке сообщений REP использует сохраненную информацию для обёртки ответного сообщения. REP фактически построен поверх ROUTER, но, как и REQ, должен завершить операции отправки и получения, чтобы продолжить работу.
REP требует, чтобы envelop в сообщении заканчивался пустым кадром, поэтому если вы не используете REQ для отправки сообщений, вам придётся самостоятельно добавить этот пустой кадр в сообщение.
Вы, конечно, спросите, как ROUTER идентифицирует источник сообщения? Ответ, конечно, через идентификатор сокета. Мы уже упоминали, что сокет может быть временным, и соединённый с ним сокет (например, ROUTER) создаёт идентификатор для него. Сокет также может явно определить свой идентификатор, чтобы другие сокеты могли использовать его напрямую.
Это временный сокет, и ROUTER создаёт UUID для идентификации источника сообщения.
```textdiagram
+-----------+
| |
| Клиент |
| |
+-----------+ +---------+
| REQ | | Данные | Клиент отправляет это
\-----+-----/ +---------+
|
| "Моя идентичность пустая"
v
/-----------\ +---------+
| ROUTER | | UUID | ROUTER придумывает UUID для использования в качестве адреса ответа
+-----------+ +-+-------+
| | | |
| Сервис | +-+-------+
| | | Данные |
+-----------+ +---------+
```
* Сокет ROUTER добавляет envelop в начало сообщения при получении, указывая источник сообщения. При отправке он использует этот envelop для определения узла, который получит сообщение. REP сохраняет все данные до первого пустого кадра и передает оригинальную информацию приложению. При отправке сообщений REP использует сохраненную информацию для обёртки ответного сообщения. REP фактически построен поверх ROUTER, но, как и REQ, должен завершить операции отправки и получения, чтобы продолжить работу.```
Рисунок # - ROUTER придумывает UUID для временных сокетов
Это временное соединение, идентификатор которого генерируется автоматически.
```textdiagram
+-----------+
| | zmq_setsockopt (socket,
| Клиент | ZMQ_IDENTITY, "Lucy", cq);
| |
+-----------+ +---------+
| REQ | | Данные | Клиент отправляет это
\-----+-----/ +---------+
|
| "Привет, меня зовут Lucy"
v
/-----------\ +---------+
| ROUTER | | 'Lucy' | ROUTER использует идентичность клиента как адрес ответа
+-----------+ +-+-------+
| | | |
| Сервис | +-+-------+
| | | Данные |
+-----------+ +---------+
```
Рисунок # - ROUTER использует идентификатор, если знает его
Ниже приведён пример программы, которая демонстрирует, как использовать идентификаторы сокетов в режиме запрос-ответ. Программа выводит сообщения, полученные ROUTER-сокетом от двух REP-сокетов: один из которых не имеет явного идентификатора, а другой имеет идентификатор "Hello".
**identity.c**
```c
//
// Ниже приведена программа, демонстрирующая использование идентификаторов сокетов в режиме запрос-ответ.
// Важно отметить, что функции, начинающиеся с s_, определены в zhelpers.h.
// Нам не требуется повторно реализовывать эти функции.
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
void *sink = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (sink, "inproc://example");
``` // Первый сокет автоматически получает идентификатор от 0MQ
void *anonymous = zmq_socket (context, ZMQ_REQ);
zmq_connect (anonymous, "inproc://example");
s_send (anonymous, "ROUTER использует сгенерированный UUID");
s_dump (sink); // Второй сокет имеет явно заданный идентификатор
void *identified = zmq_socket(context, ZMQ_REQ);
zmq_setsockopt(identified, ZMK_IDENTITY, "Hello", 5);
zmq_connect(identified, "inproc://example");
s_send(identified, "ROUTER сокет использует идентификатор REQ сокета");
s_dump(sink);
zmq_close(sink);
zmq_close(anonymous);
zmq_close(identified);
zmq_term(context);
return 0;
}
```
Выходные данные:
```
----------------------------------------
[017] 00314F043F46C441E28DD0AC54BE8DA727
[000]
[026] ROUTER использует сгенерированный UUID
----------------------------------------
[005] Hello
[000]
[038] ROUTER сокет использует идентификатор REQ сокета
```
### Пользовательские маршруты запрос-ответ
Мы уже видели, как ROUTER-сокет использует обёртку для правильной маршрутизации сообщений. Давайте рассмотрим это ещё раз, но теперь с точки зрения ROUTER: он использует определённый формат обёртки для маршрутизации сообщений к правильному адресату.
Таким образом, поведение ROUTER полностью контролируемо. Перед тем как углубиться в эту тему, давайте ближе познакомимся с REQ и REP сокетами, присвоив им некоторые живые роли:* REQ — это "мамин" сокет, который не будет слушать других, но будет постоянно задавать вопросы, требуя ответа. REQ строго синхронен и всегда находится на стороне запроса;
* REP — это "папин" сокет, который будет отвечать на вопросы, но не будет инициировать диалог. REP также строго синхронен и всегда находится на стороне ответа.
```О "мамином" сокете можно сказать, что он, как в детстве, позволяет общаться только тогда, когда сама начнёт разговор. Мама не так открыта, как папа, и не принимает двусмысленных ответов, как DEALER сокет. Поэтому, чтобы общаться с REQ сокетом, нужно ждать его запроса, после которого он будет ждать ответа, независимо от того, сколько времени это займет.```"Папин" сокет производит впечатление силы и холодности. Он выполняет одну задачу: независимо от вопроса, всегда даёт точный ответ. Не стоит ожидать, что REP сокет начнёт диалог сам или передаст ваш разговор другому, он этого не сделает.
Обычно мы считаем, что запрос-ответ модель обязательно должна быть симметричной, но на самом деле этот процесс может быть асинхронным. Нам достаточно знать адрес нужного нам узла, чтобы отправлять сообщения асинхронно через ROUTER сокет. ROUTER — это единственный сокет в ZMQ, который может определить источник сообщения.
Давайте сделаем небольшой обзор маршрутизации в запрос-ответ модели:
* Для временных сокетов ROUTER генерирует уникальный идентификатор UUID, поэтому сообщения, полученные из ROUTER, содержат этот идентификатор;
* Для постоянных сокетов можно задать свой идентификатор, который ROUTER помещает в сообщение;
* Узлы с явно заданным идентификатором могут подключаться к другим типам сокетов;
* Узлы могут заранее узнать идентификаторы других узлов через конфигурационные файлы и т.д., и адаптироваться к этому.
Минимум три способа существуют для подключения к ROUTER:
* ROUTER-DEALER
* ROUTER-REQ
* ROUTER-REP
В каждом из этих режимов мы можем полностью контролировать маршрутизацию сообщений, но разные режимы имеют свои специфические применения и потоки сообщений, которые мы будем объяснять по порядку.Самостоятельная маршрутизация также имеет свои особенности:
* Самостоятельная маршрутизация позволяет узлам контролировать направление сообщений, что противоречит правилам ØMQ. Единственной причиной использования самостоятельной маршрутизации является то, что ØMQ не предоставляет больше алгоритмов маршрутизации;
* Будущие версии ØMQ могут включать некоторые из наших самостоятельных маршрутизаций, что может сделать наш код несовместимым с новыми версиями ØMQ или избыточным;
* Встроенные механизмы маршрутизации являются расширяемыми и дружественными к устройствам, но самостоятельная маршрутизация требует решения этих проблем самостоятельно.
Таким образом, стоимость самостоятельной маршрутизации довольно высока, и чаще всего её следует оставить за ØMQ. Однако, поскольку мы уже дошли до этой точки, давайте углубимся ещё глубже!
### ROUTER-DEALER маршрутизация
ROUTER-DEALER — это самая простая форма маршрутизации. Соединяет ROUTER с несколькими DEALER, используя подходящий алгоритм для распределения сообщений между DEALER. DEALER может быть чёрной дырой (обрабатывает сообщения, но не отправляет ответы), прокси (пересылает сообщения другим узлам) или сервисом (отправляющим обратные сообщения).Если требуется, чтобы DEALER мог отвечать, необходимо обеспечить соединение только одного ROUTER с DEALER, так как DEALER не знает, какой конкретный узел его вызывает. Если есть несколько узлов, DEALER будет выполнять балансировку нагрузки, распределяя сообщения. Однако если DEALER является чёрной дырой, он может быть подключен к любому количеству узлов.Для чего используется ROUTER-DEALER маршрутизация? Если DEALER отправляет время выполнения задачи обратно ROUTER, то ROUTER сможет определить скорость обработки данных DEALER. Поскольку ROUTER и DEALER являются асинхронными сокетами, мы должны использовать zctx_poll() для обработки этой ситуации.
В следующем примере два DEALER не отправляют сообщения обратно ROUTER, наш маршрут использует взвешенный случайный алгоритм: отправка вдвое большего количества сообщений одному из DEALER.
```textdiagram
+-------------+
| |
| Клиент | Отправка на "A" или "B"
| |
+-------------+
| ROUTER |
\------+------/
|
|
+-------+-------+
| |
| |
v v
/-----------\ /-----------\
| DEALER | | DEALER |
| "A" | | "B" |
+-----------+ +-----------+
| | | |
| Рабочий | | Рабочий |
| | | |
+-----------+ +-----------+
Рисунок # - Маршрутизация ROUTER к DEALER
```
**rtdealer.c**
```c
//
// Пользовательская ROUTER-DEALER маршрутизация
//
// Этот пример представляет собой однопроцессное приложение для удобства запуска.
// Каждый поток имеет свое собственное ZMQ-окружение, поэтому можно рассматривать это как несколько отдельных процессов.
//
#include "zhelpers.h"
#include <pthread.h>
// Здесь определены два рабочих потока, код которых одинаков.
static void *
worker_task_a (void *args)
{
void *context = zmq_init (1);
void *worker = zmq_socket (context, ZMQ_DEALER);
zmq_setsockopt (worker, ZMQ_IDENTITY, "A", 1);
zmq_connect (worker, "ipc://routing.ipc");
``````c
int total = 0;
while (1) {
// Мы принимаем только вторую часть сообщения
char *request = s_recv(worker);
int finished = (strcmp(request, "КОНЕЦ") == 0);
free(request);
if (finished) {
printf("A получено: %d\n", total);
break;
}
total++;
}
zmq_close(worker);
zmq_term(context);
return NULL;
}
static void *
worker_task_b(void *args)
{
void *context = zmq_init(1);
void *worker = zmq_socket(context, ZMQ_DEALER);
zmq_setsockopt(worker, ZMQ_IDENTITY, "B", 1);
zmq_connect(worker, "ipc://routing.ipc");
int total = 0;
while (1) {
// Мы принимаем только вторую часть сообщения
char *request = s_recv(worker);
int finished = (strcmp(request, "КОНЕЦ") == 0);
free(request);
if (finished) {
printf("B получено: %d\n", total);
break;
}
total++;
}
zmq_close(worker);
zmq_term(context);
return NULL;
}
int main(void)
{
void *context = zmq_init(1);
void *client = zmq_socket(context, ZMQ_ROUTER);
zmq_bind(client, "ipc://routing.ipc");
pthread_t worker;
pthread_create(&worker, NULL, worker_task_a, NULL);
pthread_create(&worker, NULL, worker_task_b, NULL);
// Ждём, пока потоки подключатся к сокету, чтобы наши сообщения были правильно маршрутизированы
sleep(1);
// Отправляем 10 задач, двумя разными способами
int task_nbr;
srandom((unsigned)time(NULL));
for (task_nbr = 0; task_nbr < 10; task_nbr++) {
// Отправляем две части сообщения: первая - адрес получателя
if (randof(3) > 0)
s_sendmore(client, "A");
else
s_sendmore(client, "B");
// Затем отправляем задачу
s_send(client, "Это рабочий объем");
}
s_sendmore(client, "A");
}
``````markdown
s_send(client, "КОНЕЦ");
s_sendmore(client, "B");
s_send(client, "КОНЕЦ");
zmq_close(client);
zmq_term(context);
return 0;
}
```* ROUTER не знает, когда DEALER будет готов, поэтому мы можем использовать сигналы для решения этой проблемы, но чтобы не усложнять пример, используем `sleep(1)`. Без этого вызова первоначальные сообщения, отправленные ROUTER, не будут маршрутизированы, и ØMQ будет игнорировать эти сообщения.
* Важно отметить, что, помимо ROUTER, который игнорирует нераспознанные сообщения, PUB сокет также игнорирует отправленные сообщения, если нет SUB-соединений. Другие сокеты хранят нераспознанные сообщения до тех пор, пока они не будут обработаны.
При маршрутизации сообщений к DEALER мы вручную создаем такой конверт:
```textdiagram
+-------------+
Frame 1 | Адрес |
+-------------+-------------------------+
Frame 2 | Данные |
+---------------------------------------+
Рисунок # - Конверт маршрутизации для DEALER
```
ROUTER сокет удаляет первую рамку и передает только содержимое второй рамки соответствующему DEALER. Когда DEALER отправляет сообщение ROUTER, он отправляет только одну рамку, а ROUTER добавляет первый конверт и возвращает его нам.Если вы определяете недопустимый адрес конверта, ROUTER просто игнорирует это сообщение без каких-либо уведомлений. С этим ничего нельзя сделать, так как это может произойти только в двух случаях: либо целевой узел больше не существует, либо адрес был указан неверно в программе. Как узнать, что сообщение было правильно маршрутизировано? Единственный способ — получить обратную связь от маршрутизируемого узла. Это будет подробно рассмотрено в последующих главах.DEALER работает как комбинация PUSH и PULL. Однако, мы не можем использовать PUSH или PULL для создания запросно-ответной модели.
### Маршрутизация на основе алгоритма LRU
Ранее мы говорили, что REQ сокет всегда является инициатором диалога и затем ждет ответа. Эта особенность позволяет поддерживать несколько REQ сокетов в режиме ожидания. Другими словами, REQ сокет сообщает, что он готов.
Вы можете соединить ROUTER с несколькими REQ, и процесс запросно-ответного обмена будет следующим:
* REQ отправляет сообщение ROUTER
* ROUTER возвращает сообщение REQ
* REQ отправляет сообщение ROUTER
* ROUTER возвращает сообщение REQ
* ...
Как и в случае с DEALER, REQ может быть связан только с одним ROUTER, за исключением случаев, когда вы хотите реализовать что-то вроде многоуровневой маршрутизации (я даже не хочу объяснять это здесь), что приведет к чрезмерной сложности и вынудит вас отказаться от такого подхода.
```textdiagram
+-------------+
| |
| Клиент | Отправка в "A" или "B"
| |
+-------------+
| ROUTER |
\-------------/
^
| (1) Мама говорит Привет
|
+-------+-------+
| |
| | (2) ROUTER передает бельё
v v
/-----------\ /-----------\
| REQ | | REQ |
| "A" | | "B" |
+-----------+ +-----------+
| | | |
| Рабочий | | Рабочий |
| | | |
+-----------+ +-----------+
```
Перевод текста внутри диаграммы:
```textdiagram
+-------------+
| |
| Клиент | Отправка в "A" или "B"
| |
+-------------+
| ROUTER |
\-------------/
^
| (1) Мама говорит Привет
|
+-------+-------+
| |
| | (2) ROUTER передает бельё
v v
/-----------\ /-----------\
| REQ | | REQ |
| "A" | | "B" |
+-----------+ +-----------+
| | | |
| Рабочий | | Рабочий |
| | | |
+-----------+ +-----------+
```
Перевод текста внутри диаграммы:
```textdiagram
+-------------+
| |
| Клиент | Отправка в "A" или "B"
| |
+-------------+
| ROUTER |
\-------------/
^
| (1) Мама говорит Привет
|
+-------+-------+
| |
| | (2) ROUTER передает бельё
v v
/-----------\ /-----------\
| REQ | | REQ |
| "A" | | "B" |
+-----------+ +-----------+
| | | |
| Рабочий | | Рабочий |
| | | |
+-----------+ +-----------+
```
Перевод текста внутри диаграммы:
```textdiagram
+-------------+
| |
| Клиент | Отправка в "A" или "B"
| |
+-------------+
| ROUTER |
\-------------/
^
| (1) Мама говорит Привет
|
+-------+-------+
| |
| | (2) ROUTER передает бельё
v v
/-----------\ /-----------\
| REQ | | REQ |
| "A" | | "B" |
+-----------+ +-----------+
| | | |
| Рабочий | | Рабочий |
| | | |
+-----------+ +-----------+
``` Рисунок # - Пользовательский маршрут ROUTER к маме
```Можно ли использовать режим ROUTER-REQ для чего-то ещё? Самым распространённым применением является алгоритм наименьших запросов (LRU), где ROUTER отправляет запросы к тому REQ, который ждал дольше всего. Посмотрите на пример:
```c
//
// Пользовательский ROUTER-REQ маршрут
//
#include "zhelpers.h"
#include <pthread.h>
#define NBR_WORKERS 10
static void *
worker_task(void *args)
{
void *context = zmq_init(1);
void *worker = zmq_socket(context, ZMQ_REQ);
// Функция s_set_id() генерирует печатаемую строку на основе сокета,
// которая используется как идентификатор для этого сокета.
s_set_id(worker);
zmq_connect(worker, "ipc://routing.ipc");
int total = 0;
while (1)
{
// Уведомляем ROUTER, что мы готовы к работе
s_send(worker, "ready");
// Получаем работу от ROUTER до тех пор, пока не получим сообщение о завершении
char *workload = s_recv(worker);
int finished = (strcmp(workload, "END") == 0);
free(workload);
if (finished)
{
printf("Обработано: %d задач\n", total);
break;
}
total++;
// Случайное ожидание некоторого времени
s_sleep(randof(1000) + 1);
}
zmq_close(worker);
zmq_term(context);
return NULL;
}
int main(void)
{
void *context = zmq_init(1);
void *client = zmq_socket(context, ZMQ_ROUTER);
zmq_bind(client, "ipc://routing.ipc");
srandom((unsigned) time(NULL));
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
{
pthread_t worker;
pthread_create(&worker, NULL, worker_task, NULL);
}
int task_nbr;
for (task_nbr = 0; task_nbr < NBR_WORKERS * 10; task_nbr++)
{
// Наименее используемый worker находится в очереди сообщений
char *address = s_recv(client);
char *empty = s_recv(client);
free(empty);
char *ready = s_recv(client);
free(ready);
``````c
s_sendmore(client, address);
s_sendmore(client, "");
s_send(client, "Это рабочая нагрузка");
free(address);
}
// Уведомляем все REQ сокеты о завершении работы
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
char *address = s_recv(client);
char *empty = s_recv(client);
free(empty);
char *ready = s_recv(client);
free(ready);
s_sendmore(client, address);
s_sendmore(client, "");
s_send(client, "Завершение работы");
free(address);
}
zmq_close(client);
zmq_term(context);
return 0;
}
```В этом примере реализация алгоритма LRU не требует использования специальных данных структур, так как механизмы очередей сообщений ØMQ уже предоставляют эквивалентную реализацию. Более реалистичная реализация алгоритма LRU должна была бы собирать подготовленных worker'ов в очередь для распределения. Мы рассмотрим этот пример позже.
```Результат выполнения программы выводит количество задач, обработанных каждым worker'ом. Поскольку REQ сокеты случайным образом ожидают некоторое время, а мы не выполнили балансировку нагрузки, мы ожидаем, что каждый worker будет иметь приблизительно равное количество обработанных задач. Это и есть результат выполнения программы.
```
Обработано: 8 задач
Обработано: 8 задач
Обработано: 11 задач
Обработано: 7 задач
Обработано: 9 задач
Обработано: 11 задач
Обработано: 14 задач
Обработано: 11 задач
Обработано: 11 задач
Обработано: 10 задач
```
Несколько замечаний по поводу приведенного выше кода:
* В отличие от предыдущего примера, нам не нужно ждать некоторое время, так как REQ сокет явно сообщает ROUTER, что он готов.
* Мы используем функцию `s_set_id()` из заголовочного файла `zhelpers.h`, чтобы создать печатаемый строковый идентификатор для сокета, что делает пример более простым. В реальном мире REQ сокеты являются анонимными, и вам придется использовать `zmq_recv()` и `zmq_send()` для обработки сообщений, так как `s_recv()` и `s_send()` поддерживают только строковые идентификаторы сокетов.
* Более того, мы использовали случайные идентификаторы, что недопустимо в реальном мире для постоянных сокетов, так как это может привести к исчерпанию узлов.* Если вы просто скопируете приведённый выше код без полного понимания его работы, вы будете подобны тому, кто видел, как Человек-паук прыгнул с крыши, и решил сделать то же самое. Последствия будут на ваш страх и риск.При маршрутизации сообщений к REQ сокетам следует учитывать определённый формат, который состоит из адреса-пустого кадра-сообщения:
```textdiagram
+-------------+
Кадр 1 | Адрес |
+---+---------+
Кадр 2 | | <------ Пустое сообщение
+---+-----------------------------------+
Кадр 3 | Данные |
+---------------------------------------+
Рисунок # - Оболочка маршрутизации для мамы (REQ)
```
### Использование адреса для маршрутизации
В классической модели запрос-ответ ROUTER обычно не взаимодействует с REP сокетами, а DEALER взаимодействует с REP. DEALER случайным образом распределяет сообщения между несколькими REP и получает результаты. ROUTER лучше всего работает с REQ сокетами. Напомним, что классическая модель ØMQ часто работает лучше всего, ведь дорога, по которой ходят люди чаще всего, является самой хорошей. Если вы решите действовать нестандартно, то велик риск угодить в ловушку. В этом разделе мы соединим ROUTER с REP и посмотрим, что произойдёт.
REP сокет имеет два ключевых свойства:
* Он должен завершить полный цикл запрос-ответ;
* Он может принимать пакеты любого размера и полностью вернуть этот пакет.В обычном режиме запрос-ответ REP является анонимным и может быть заменён в любое время. Поскольку здесь мы используем пользовательскую маршрутизацию, нам нужно отправлять сообщение конкретному REP A, а не REP B. Это гарантирует, что одна сторона сети принадлежит вам, а другая — определённому REP.Одной из ключевых концепций ØMQ является идея, что периферийные узлы должны быть максимально умными и многочисленными, тогда как промежуточное программное обеспечение должно быть простым и стабильным. Это означает, что периферийные узлы могут отправлять сообщения конкретным узлам, таким как определённый REP. В данном случае мы не будем рассматривать маршрутизацию между несколькими узлами, а сосредоточимся на том, как ROUTER взаимодействует с определённым REP.
```textdiagram
+-------------+
| |
| Клиент | Отправка на "A" или "B"
| |
+-------------+
| ROUTER |
\-------------/
^
|
|
+-------+-------+
| |
| |
v v
/-----------\ /-----------\
| REP | | REP |
| "A" | | "B" |
+-----------+ +-----------+
| | | |
| Работник | | Работник |
| | | |
+-----------+ +-----------+
Рисунок # - Маршрутизация с ROUTER до конкретного REP
```
Этот рисунок описывает следующие события:* Клиент имеет сообщение, которое будет отправлено обратно через другой ROUTER. Сообщение содержит два адреса, пустую рамку и содержание;
* Клиент отправляет это сообщение ROUTER, указывая адрес REP;
* ROUTER удаляет адрес и передает сообщение соответствующему REP;
* REP получает сообщение, содержащее адрес, пустую рамку и содержание;
* REP удаляет всё, что находится перед пустой рамкой, и передает содержание работнику;
* работник обрабатывает сообщение и отправляет ответ REP;
* REP обёртывает ответ в сохранённую рамку и отправляет его ROUTER;
* ROUTER добавляет рамку с адресом REP в начало ответа. Этот процесс выглядит сложным, но всё же стоит понять его подробнее. Просто запомните, что REP сокет вернёт пакет в том же виде.**rtpapa.c**
```c
//
// Настройка ROUTER-REP маршрута
//
#include "zhelpers.h"
// В этом примере используется один процесс для подчеркивания последовательности событий
int main (void)
{
void *context = zmq_init (1);
void *client = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (client, "ipc://routing.ipc");
void *worker = zmq_socket (context, ZMQ_REP);
zmq_setsockopt (worker, ZMQ_IDENTITY, "A", 1);
zmq_connect (worker, "ipc://routing.ipc");
// Ждём подключения worker
sleep (1);
// Отправляем идентификатор REP, адрес, пустой фрейм и содержимое сообщения
s_sendmore (client, "A");
s_sendmore (client, "address 3");
s_sendmore (client, "address 2");
s_sendmore (client, "address 1");
s_sendmore (client, "");
s_send (client, "Это рабочая нагрузка");
// Worker получает только содержимое сообщения
s_dump (worker);
// Worker не должен обрабатывать обёртку
s_send (worker, "Это ответ");
// Посмотрим, что получено в ROUTER
s_dump (client);
zmq_close (client);
zmq_close (worker);
zmq_term (context);
return 0;
}
```
Результат выполнения:
```
----------------------------------------
[020] Это рабочая нагрузка
----------------------------------------
[001] A
[009] address 3
[009] address 2
[009] address 1
[000]
[017] Это ответ
```
Несколько замечаний по данному коду:
* В реальных условиях ROUTER и REP сокеты находятся на разных узлах. В данном примере мы не используем многопроцессорность, чтобы сделать последовательность событий более очевидной.
```* Функция `zmq_connect()` не завершается мгновенно; соединение между REP и ROUTER требует времени. В реальных условиях ROUTER не может знать, успешно ли подключился REP, пока не получит от него какой-либо ответ. В данном примере используется `sleep(1)` для решения этой проблемы; если этого не сделать, то REP не сможет получить сообщение (попробуйте сами).* Мы используем идентификатор REP сокета для маршрутизации. Если вам это кажется странным, попробуйте отправить сообщение для B и посмотрите, сможет ли A его получить.
* В данном примере используются функции `s_dump()` и других из файла `zhelpers.h`. Они используются для вывода данных сокета, поэтому мы можем создать надстройки над ØMQ API. Когда будем обсуждать сложные приложения, мы рассмотрим это подробнее. Для маршрутизации сообщений к REP нам необходимо создать оболочку, которую он сможет распознать:
```textdiagram
+-------------+
Frame 1 | Адрес | <--- Ноль или более таких фреймов
+---+---------+
Frame 2 | | <------ Именно один пустой фрагмент сообщения
+---+-----------------------------------+
Frame 3 | Данные |
+---------------------------------------+
Рисунок # - Оболочка маршрутизации для REP
```
### Модель запрос-ответ в контексте агента сообщений
В этом разделе мы рассмотрим, как использовать сообщения ØMQ, и попробуем создать универсальный агент сообщений. Мы создадим устройство очереди для соединения нескольких клиентов и рабочих процессов, а алгоритм маршрутизации будем выбирать самостоятельно. В данном случае мы выбрали алгоритм наименьшего недавнего использования, так как это практично с точки зрения балансировки нагрузки.Сначала давайте вспомним классическую модель запрос-ответ, и попробуем использовать её для создания масштабируемого сервисного сетевого приложения. Самая базовая модель запрос-ответ выглядит следующим образом:```textdiagram
+--------+
| Клиент |
+--------+
| REQ |
+---+----+
|
|
+-----------+-----------+
| | |
| | |
+---+----+ +---+----+ +---+----+
| REP | | REP | | REP |
+--------+ +--------+ +--------+
| Рабочий| | Рабочий| | Рабочий|
+--------+ +--------+ +--------+
Рисунок # - Базовая модель запрос-ответ
```
Эта модель поддерживает несколько REP сокетов, но если мы хотим поддерживать несколько REQ сокетов, нам потребуется промежуточное устройство, которое обычно представляет собой комбинацию ROUTER и DEALER. Это устройство просто перемещает информацию между двумя сокетами, поэтому мы можем использовать готовое устройство ZMQ_QUEUE:
```
+--------+ +--------+ +--------+
| Клиент | | Клиент | | Клиент |
+--------+ +--------+ +--------+
| REQ | | REQ | | REQ |
+---+----+ +---+----+ +---+----+
| | |
+-----------+-----------+
|
+---+----+
| ROUTER |
+--------+
| Устройство|
+--------+
| DEALER |
+---+----+
|
+-----------+-----------+
| | |
+---+----+ +---+----+ +---+----+
| REP | | REP | | REP |
+--------+ +--------+ +--------+
| Рабочий| | Рабочий| | Рабочий|
+--------+ +--------+ +--------+
```Рисунок # - Расширенная модель запрос-ответ
```Основная идея состоит в том, что ROUTER запоминает, от какого REQ пришло сообщение, создавая при этом конверт. Сокеты DEALER и REP не изменяют содержимое конверта при передаче сообщений, поэтому ROUTER может определить, какому REQ следует отправить ответ. В данной модели REP сокеты являются анонимными и не имеют конкретного адреса, поэтому они могут предоставлять только один тип услуги.В приведённой выше структуре для маршрутизации REP используется встроенный алгоритм балансировки нагрузки DEALER. Однако, мы хотим использовать алгоритм LRU для маршрутизации, что требует использования модели ROUTER-REP:
```textdiagram
+--------+ +--------+ +--------+
| Клиент | | Клиент | | Клиент |
+--------+ +--------+ +--------+
| REQ | | REQ | | REQ |
+---+----+ +---+----+ +---+----+
| | |
+-----------+-----------+
|
+---+----+
| ROUTER | Передняя часть
+--------+
| Устройство| Очередь LRU
+--------+
| ROUTER | Задняя часть
+---+----+
|
+-----------+-----------+
| | |
+---+----+ +---+----+ +---+----+
| REQ | | REQ | | REQ |
+--------+ +--------+ +--------+
| Рабочий| | Рабочий| | Рабочий|
+--------+ +--------+ +--------+
Рисунок # - Расширенный запрос-ответ с использованием LRU
```
Эта очередь LRU между двумя ROUTER сокетами не может просто перемещать сообщения, что делает следующий код более сложным, но он имеет высокую повторяемость в модели запрос-ответ.**lruqueue.c**
```c
//
// Устройство, использующее алгоритм LRU
// Client и worker находятся в разных потоках
//
#include "zhelpers.h"
#include <pthread.h>
#define NBR_CLIENTS 10
#define NBR_WORKERS 3
// Операция выталкивания, реализованная с использованием массива произвольного типа
#define DEQUEUE(q) memmove(&(q)[0], &(q)[1], sizeof(q) - sizeof(q[0]))
// Реализация базового паттерна запрос-ответ с использованием REQ сокета
// Поскольку s_send() и s_recv() не могут обрабатывать бинарные метки сокетов 0MQ,
// здесь генерируется печатаемая строка-идентификатор.
//
static void *
client_task(void *args)
{
void *context = zmq_init(1);
void *client = zmq_socket(context, ZMQ_REQ);
s_set_id(client); // Установка печатаемого идентификатора
zmq_connect(client, "ipc://frontend.ipc");
// Отправка запроса и получение ответа
s_send(client, "HELLO");
char *reply = s_recv(client);
printf("Client: %s\n", reply);
free(reply);
zmq_close(client);
zmq_term(context);
return NULL;
}
// Worker использует REQ сокет для реализации алгоритма LRU
//
static void *
worker_task(void *args)
{
void *context = zmq_init(1);
void *worker = zmq_socket(context, ZMQ_REQ);
s_set_id(worker); // Установка печатаемого идентификатора
zmq_connect(worker, "ipc://backend.ipc");
// Уведомление агента о готовности worker
s_send(worker, "READY");
while (1) {
// Сохранение всех данных перед пустым фреймом (оболочки),
// в данном примере перед пустым фреймом находится один фрейм, но может быть больше.
char *address = s_recv(worker);
char *empty = s_recv(worker);
assert(*empty == 0);
free(empty);
// Получение запроса и отправка ответа
char *request = s_recv(worker);
printf("Worker: %s\n", request);
``` free(request);
s_sendmore(worker, address);
s_sendmore(worker, "");
s_send(worker, "OK");
free(address);
}
zmq_close(worker);
zmq_term(context);
return NULL;
}
int main(void)
{
// Подготовка контекста и сокетов 0MQ
void *context = zmq_init(1);
void *frontend = zmq_socket(context, ZMQ_ROUTER);
void *backend = zmq_socket(context, ZMQ_ROUTER);
zmq_bind(frontend, "ipc://frontend.ipc");
zmq_bind(backend, "ipc://backend.ipc");
int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++) {
pthread_t client;
pthread_create(&client, NULL, client_task, NULL);
}
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
pthread_t worker;
pthread_create(&worker, NULL, worker_task, NULL);
}
// Логика LRU
// - Всегда получает сообщения из backend; начинает получать сообщения из frontend, когда более одного worker свободен.
// - Когда worker отвечает, он помечается как готовый, и ответ worker передаётся клиенту.
// - Если клиент отправляет запрос, его запрос передаётся следующему worker.
// Хранит список доступных workers
int available_workers = 0;
char *worker_queue[10];
while (1) {
zmq_pollitem_t items[] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
zmq_poll(items, available_workers ? 2 : 1, -1);
// Обрабатывает очередь workers из backend
if (items[0].revents & ZMQ_POLLIN) {
// Добавляет адрес worker в очередь
char *worker_addr = s_recv(backend);
assert(available_workers < NBR_WORKERS);
worker_queue[available_workers++] = worker_addr;
// Пропускает пустую рамку
char *empty = s_recv(backend);
assert(empty[0] == 0);
free(empty);
// Третья рамка содержит "READY" или адрес клиента
char *client_addr = s_recv(backend);```
// Если это ответное сообщение, передаёт его клиенту
if (strcmp(client_addr, "READY") != 0) {
empty = s_recv(backend);
assert(empty[0] == 0);
free(empty);
char *reply = s_recv(backend);
s_sendmore(frontend, client_addr);
s_sendmore(frontend, "");
s_send(frontend, reply);
free(reply);
if (--client_nbr == 0)
break; // Выходит после обработки N сообщений
}
free(client_addr);
if (items[1].revents & ZMQ_POLLIN) {
// Получает запрос следующего клиента и передаёт его свободному worker
// Формат запроса клиента: [адрес клиента][пустая рамка][содержание запроса]
char *client_addr = s_recv(frontend);
char *empty = s_recv(frontend);
assert(empty[0] == 0);
free(empty);
char *request = s_recv(frontend);
s_sendmore(backend, worker_queue[0]);
s_sendmore(backend, "");
s_sendmore(backend, client_addr);
s_sendmore(backend, "");
s_send(backend, request);
free(client_addr);
free(request);
// Удаляет адрес данного worker из очереди
free(worker_queue[0]);
DEQUEUE(worker_queue);
available_workers--;
}
zmq_close(frontend);
zmq_close(backend);
zmq_term(context);
return 0;
}
```
Этот программный код имеет два ключевых аспекта: 1) как каждый сокет обрабатывает конверты; 2) алгоритм LRU. Сначала рассмотрим формат конверта.
```Мы знаем, что сокет REQ при отправке сообщений добавляет в начало пустой кадр, а при получении автоматически удаляет его. Наша задача — удовлетворять требованиям REQ, корректно обрабатывая пустые кадры. Также следует учесть, что сокет ROUTER добавляет адрес источника перед каждым полученным сообщением.
Теперь мы пройдёмся по полному циклу запроса-ответа. Мы установим метку для сокета клиента как "CLIENT", а для рабочего — как "WORKER". Вот сообщение, отправленное клиентом:
```textdiagram
+---+-------+
Frame 1 | 5 | HELLO | Data part
+---+-------+
Figure # - Message sent by the client
```
Прокси-сервер получает следующий формат сообщения из ROUTER:
```textdiagram
+---+--------+
Frame 1 | 6 | CLIENT | Client identifier
+---+--------+
Frame 2 | 0 | Empty message part
+---+-------+
Frame 3 | 5 | HELLO | Data part
+---+-------+
Figure # - Message received on the frontend
```
Прокси-сервер извлекает адрес свободного рабочего из очереди LRU и добавляет его в качестве конверта к сообщению, передавая его ROUTER. Обратите внимание, что добавляется пустой кадр.```textdiagram
+---+--------+
Frame 1 | 6 | WORKER | Worker identifier
+---+--------+
Frame 2 | 0 | Empty message part
+---+--------+
Frame 3 | 6 | CLIENT | Client identifier
+---+--------+
Frame 4 | 0 | Empty message part
+---+-------+
Frame 5 | 5 | HELLO | Data part
+---+-------+
``` Рисунок # - Сообщение, отправляемое на бэкенд
```
При получении сообщения сокетом REQ (рабочий) удаляет конверт и пустой кадр:
```textdiagram
+---+--------+
Кадр 1 | 6 | CLIENT | Идентификатор клиента
+---+--------+
Кадр 2 | 0 | Пустая часть сообщения
+---+-------+
Кадр 3 | 5 | HELLO | Часть данных
+---+-------+
Рисунок # - Сообщение, доставленное рабочему
```
Как видно, сообщение, полученное рабочим, совпадает с сообщением, полученным клиентом от ROUTER. Рабочий должен сохранить конверт в этом сообщении и выполнять операции только над содержимым сообщения. При возврате:
* рабочий отправляет сообщение \[адрес клиента\]\[пустой фрейм\]\[ответное содержание\] устройству через REQ;
* устройство получает от рабочего через ROUTER \[адрес рабочего\]\[пустой фрейм\]\[адрес клиента\]\[пустой фрейм\]\[ответное содержание\];
* устройство сохраняет адрес рабочего и отправляет \[адрес клиента\]\[пустой фрейм\]\[ответное содержание\] клиенту через ROUTER;
* клиент получает \[ответное содержание\] через REQ.
Далее рассмотрим алгоритм LRU:* создается группа poll, которая постоянно получает сообщения с backend (ROUTER worker'а); сообщения с frontend (ROUTER клиента) получаются только при наличии свободного worker'а;
* выполняется цикл poll;
* если есть сообщения с backend, то это может быть либо READY сообщение (worker готов к работе), либо ответное сообщение (нужно передать клиенту); в обоих случаях адрес worker'а сохраняется в LRU очереди, а если есть ответное содержание, то оно передается соответствующему клиенту;
* если есть сообщения с frontend, то следующий worker из LRU очереди выбирается для выполнения запроса; для этого отправляется [адрес worker'а][пустой фрейм][адрес клиента][пустой фрейм][запросное содержание] на ROUTER worker'а.Алгоритм можно расширить, выполняя самотестирование worker'а при запуске, вычисляя его производительность и отправляя вместе с READY сообщением агенту, что позволит агенту более эффективно распределять задачи.
### Обертка над верхним уровнем API ØMQ
Использование API ØMQ для работы с многофрагментными сообщениями является сложной задачей, как показано ниже:
```c
while (1) {
// Сохраняем все содержимое до пустого фрейма (оболочка)
// В данном примере до пустого фрейма только один фрейм, но может быть больше.
char *address = s_recv(worker);
char *empty = s_recv(worker);
assert(*empty == 0);
free(empty);
// Получаем запрос и отправляем ответ
char *request = s_recv(worker);
printf("Worker: %s\n", request);
free(request);
s_sendmore(worker, address);
s_sendmore(worker, "");
s_send(worker, "OK");
free(address);
}
```Этот код не удовлетворяет требованиям переиспользования, так как он может обрабатывать только одну оболочку. На самом деле, этот код уже представляет собой некоторую обертку над API ØMQ, и использование низкоуровневых API приведет к еще более длинному коду.
```c
while (1) {
// Сохраняем все содержимое перед пустым фреймом (оболочку),
// в данном примере перед пустым фреймом находится только один фрейм,
// но может быть больше.
zmq_msg_t address;
zmq_msg_init (&address);
zmq_recv (worker, &address, 0);
zmq_msg_t empty;
zmq_msg_init (&empty);
zmq_recv (worker, &empty, 0);
// Получаем запрос и отправляем ответ
zmq_msg_t payload;
zmq_msg_init (&payload);
zmq_recv (worker, &payload, 0);
int char_nbr;
printf ("Worker: ");
for (char_nbr = 0; char_nbr < zmq_msg_size (&payload); char_nbr++)
printf ("%c", *(char *) (zmq_msg_data (&payload) + char_nbr));
printf ("\n");
zmq_msg_init_size (&payload, 2);
memcpy (zmq_msg_data (&payload), "OK", 2);
zmq_send (worker, &address, ZMQ_SNDMORE);
zmq_close (&address);
zmq_send (worker, &empty, ZMQ_SNDMORE);
zmq_close (&empty);
zmq_send (worker, &payload, 0);
zmq_close (&payload);
}
```
Идеальный API должен был бы принимать и обрабатывать полное сообщение, включая конверт, одним шагом. Нижележащие API ØMQ не предназначены для этого, но мы можем создать дополнительный уровень абстракции поверх них, что является важной частью изучения ØMQ.Создание такого API представляет определённую сложность, поскольку необходимо избегать чрезмерного копирования данных. Кроме того, ØMQ использует термин "сообщение" для обозначения отдельных частей сообщений и целых сообщений, которые могут быть как строковыми, так и двоичными, что усложняет задачу создания API.
Одним из решений может быть использование новых названий: строки (s_send() и s_recv() уже используются), кадры (части сообщения), сообщения (один или несколько кадров). Ниже представлен пример переписанного worker'а с использованием нового API:
```c
while (1) {
zmsg_t *zmsg = zmsg_recv(worker);
zframe_print(zmsg_last(zmsg), "Worker: ");
zframe_reset(zmsg_last(zmsg), "OK", 2);
zmsg_send(&zmsg, worker);
}
```
Замена 22 строк кода на 4 строки делает его более понятным и компактным. Мы можем использовать этот подход для создания других API, чтобы реализовать следующие функции:
* Автоматическое управление сокетами. Ручное закрытие каждого сокета каждый раз является проблемой, поэтому было бы удобно автоматически закрывать все сокеты при завершении контекста.
* Удобное управление потоками. Большинство приложений ØMQ используют многопоточность, но POSIX API для работы с потоками не очень удобны, поэтому можно создать более удобный уровень абстракции.* Удобное управление временем. Получение текущего времени в миллисекундах или пауза на несколько миллисекунд также являются проблемами, поэтому API должен предоставлять эти возможности.* Реактор, заменяющий zmq_poll(). Цикл опроса прост, но громоздок и приводит к дублированию кода. Реактор, который обрабатывает чтение/запись сокетов и управление временем, будет намного удобнее.
* Правильная обработка клавиши Ctrl-C. Мы уже видели, как обрабатывать прерывания, и было бы хорошо, если бы это было доступно для всех программ.
Мы можем использовать расширение czmq для выполнения вышеупомянутых требований. Это расширение существует давно и предоставляет множество уровней абстракции над ØMQ, а также некоторые данные структуры (хэши, списки и т.д.).
Ниже представлен пример переписанного LRU агента с использованием czmq:**lruqueue2.c**
```c
//
// ЛRU сообщение очереди устройства, использует библиотеку czmq
//
#include "czmq.h"
#define NBR_CLIENTS 10
#define NBR_WORKERS 3
#define LRU_READY "\001" // Информация о готовности worker
//
static void *
client_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, "ipc://frontend.ipc");
// Отправка запроса и получение ответа
while (1) {
zstr_send (client, "HELLO");
char *reply = zstr_recv (client);
if (! reply)
break;
printf ("Клиент: %s\n", reply);
free (reply);
sleep (1);
}
zctx_destroy (&ctx);
return NULL;
}
//
static void *
worker_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (worker, "ipc://backend.ipc");
// Уведомление агента о готовности worker
zframe_t *frame = zframe_new (LRU_READY, 1);
zframe_send (&frame, worker, 0);
// Получение сообщения и его обработка
while (1) {
zmsg_t *msg = zmsg_recv (worker);
if (! msg)
break; // Завершение
//zframe_print (zmsg_last (msg), "Worker: ");
zframe_reset (zmsg_last (msg), "OK", 2);
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return NULL;
}
int main (void)
{
zctx_t *ctx = zctx_new ();
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
void *backend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (frontend, "ipc://frontend.ipc");
zsocket_bind (backend, "ipc://backend.ipc");
int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
zthread_new (ctx, client_task, NULL);
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
zthread_new (ctx, worker_task, NULL);
// ЛRU логика
// - всегда получает сообщения из backend; когда есть более одного свободного worker, получает сообщения из frontend.
``````c
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
// Когда есть доступный worker, получаем сообщение с frontend
int rc = zmq_poll (items, zlist_size (workers) ? 2 : 1, -1);
if (rc == -1)
break; // Прерывание
// Обрабатываем сообщение, полученное с backend
if (items [0].revents & ZMQ_POLLIN) {
// Используем адрес worker для LRU маршрутизации
zmsg_t *msg = zmsg_recv (backend);
if (!msg)
break; // Прерывание
zframe_t *address = zmsg_unwrap (msg);
zlist_append (workers, address);
// Если это не сообщение READY, передаем его клиенту
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), LRU_READY, 1) == 0)
zmsg_destroy (&msg);
else
zmsg_send (&msg, frontend);
}
if (items [1].revents & ZMQ_POLLIN) {
// Получаем запрос от клиента и передаем его worker
zmsg_t *msg = zmsg_recv (frontend);
if (msg) {
zmsg_wrap (msg, (zframe_t *) zlist_pop (workers));
zmsg_send (&msg, backend);
}
}
}
// Если все завершено, выполняем очистку
while (zlist_size (workers)) {
zframe_t *frame = (zframe_t *) zlist_pop (workers);
zframe_destroy (&frame);
}
zlist_destroy (&workers);
zctx_destroy (&ctx);
return 0;
}
```
``````czmq предоставляет простую систему прерываний: при нажатии Ctrl+C программа завершает выполнение ØMQ и возвращает -1, errno устанавливается в EINTR. При прерывании программы метод recv класса czmq возвращает NULL, поэтому можно использовать следующий код для проверки:
``````c
while (1) {
zstr_send (client, "HELLO");
char *reply = zstr_recv (client);
if (!reply)
break; // Interrupt
printf ("Client: %s\n", reply);
free (reply);
sleep (1);
}
```
Если используется функция `zmq_poll()`, то проверка может выглядеть так:
```
int rc = zmq_poll (items, zlist_size (workers) ? 2 : 1, -1);
if (rc == -1)
break; // Interrupt
```
В примере выше используется оригинальная функция `zmq_poll()`. Также можно использовать реактор `zloop`, предоставленный `czmq`, который позволяет:
* Получать сообщения с любого сокета, то есть если на сокете есть сообщение, это вызывает функцию;
* Остановить чтение сообщений с сокета;
* Установить таймеры, чтобы периодически читать сообщения.
Внутри `zloop` используется функция `zmq_poll()`, но он позволяет динамически добавлять и удалять прослушиватели с сокетов, перестраивать пул `zmq_poll` и вычислять следующее событие таймера на основе времени ожидания в `zmq_poll`.
Используя этот паттерн реактора, наш код становится более компактным:
```c
zloop_t *reactor = zloop_new ();
zloop_reader (reactor, self->backend, s_handle_backend, self);
zloop_start (reactor);
zloop_destroy (&reactor);
```
На самом деле обработка сообщений происходит в других частях программы, и не все могут предпочесть такой стиль, но `zloop` действительно объединяет поведение таймеров и сокетов. В последующих примерах мы будем использовать `zmq_poll()` для простых примеров и `zloop` для сложных.
Теперь мы перепишем устройство LRU очереди с использованием `zloop`:**lruqueue3.c**
```c
//
// ЛРУ очередь, реализованная с использованием czmq и паттерна реактора
//
#include "czmq.h"
#define NBR_CLIENTS 10
#define NBR_WORKERS 3
#define LRU_READY "\001" // Сообщение о готовности worker
//
// Использует REQ для реализации базовой модели запрос-ответ
//
static void *
client_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, "ipc://frontend.ipc");
// Отправляет запрос и получает ответ
while (1) {
zstr_send (client, "HELLO");
char *reply = zstr_recv (client);
if (!reply)
break;
printf ("Client: %s\n", reply);
free (reply);
sleep (1);
}
zctx_destroy (&ctx);
return NULL;
}
//
// Worker использует REQ сокет для реализации маршрутизации
//
static void *
worker_task (void *arg_ptr)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (worker, "ipc://backend.ipc");
// Уведомляет агента о готовности worker
zframe_t *frame = zframe_new (LRU_READY, 1);
zframe_send (&frame, worker, 0);
// Получает сообщение и обрабатывает его
while (1) {
zmsg_t *msg = zmsg_recv (worker);
if (!msg)
break; // Прерывание
//zframe_print (zmsg_last (msg), "Worker: ");
zframe_reset (zmsg_last (msg), "OK", 2);
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return NULL;
}
//
// Структура ЛРУ очереди, передается в реактор
typedef struct {
void *frontend; // Прослушивание клиентов
void *backend; // Прослушивание worker
zlist_t *workers; // Список доступных worker
} lruqueue_t;
//
// Обработка сообщений от frontend
int s_handle_frontend (zloop_t *loop, void *socket, void *arg)
{
lruqueue_t *self = (lruqueue_t *) arg;
zmsg_t *msg = zmsg_recv (self->frontend);
if (msg) {
zmsg_wrap (msg, (zframe_t *) zlist_pop (self->workers));
```
zmsg_send (&msg, self->backend);
// Если нет доступных worker, прекращаем прослушивание frontend
if (zlist_size (self->workers) == 0)
zloop_cancel (loop, self->frontend);
}
return 0;
}
//
// Обработка сообщений от backend
int s_handle_backend (zloop_t *loop, void *socket, void *arg)
{
// Использует адрес worker для ЛРУ маршрутизации
lruqueue_t *self = (lruqueue_t *) arg;
zmsg_t *msg = zmsg_recv (self->backend);
if (msg) {
``````c
zframe_t *address = zmsg_unwrap(msg);
zlist_append(self->workers, address);
// Когда есть доступный worker, увеличиваем слушатель frontend
if (zlist_size(self->workers) == 1)
zloop_reader(loop, self->frontend, s_handle_frontend, self);
// Если ответ пришел от worker, передаем его клиенту
zframe_t *frame = zmsg_first(msg);
if (memcmp(zframe_data(frame), LRU_READY, 1) == 0)
zmsg_destroy(&msg);
else
zmsg_send(&msg, self->frontend);
}
int main(void)
{
zctx_t *ctx = zctx_new();
lruqueue_t *self = (lruqueue_t *)zmalloc(sizeof(lruqueue_t));
self->frontend = zsocket_new(ctx, ZMQ_ROUTER);
self->backend = zsocket_new(ctx, ZMQ_ROUTER);
zsocket_bind(self->frontend, "ipc://frontend.ipc");
zsocket_bind(self->backend, "ipc://backend.ipc");
int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
zthread_new(ctx, client_task, NULL);
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
zthread_new(ctx, worker_task, NULL);
// Список доступных workers
self->workers = zlist_new();
// Подготовка и запуск реактора
zloop_t *reactor = zloop_new();
zloop_reader(reactor, self->backend, s_handle_backend, self);
zloop_start(reactor);
zloop_destroy(&reactor);
// Очистка после завершения
while (zlist_size(self->workers))
{
zframe_t *frame = (zframe_t *)zlist_pop(self->workers);
zframe_destroy(&frame);
}
zlist_destroy(&self->workers);
zctx_destroy(&ctx);
free(self);
return 0;
}```Обработка Ctrl-C корректно может вызвать некоторые трудности. Если вы используете класс zctx, он автоматически будет обрабатывать это, но также требуются соответствующие изменения в коде. Если zmq_poll() вернула -1, или метод recv (zstr_recv, zframe_recv, zmsg_recv) вернул NULL, следует завершить все циклы. Кроме того, полезно добавить проверку !zctx_interrupted на самом высоком уровне цикла.```### Асинхронная структура C/S
В предыдущей модели ROUTER-DEALER мы видели, как клиент асинхронно взаимодействует с несколькими рабочими процессами. Мы можем перевернуть эту структуру, чтобы реализовать несколько клиентов, которые асинхронно взаимодействуют с одним сервером:
```textdiagram
+-----------+ +-----------+
| | | |
| Клиент | | Клиент |
| | | |
+-----------+ +-----------+
| DEALER | | DEALER |
\-----------/ \-----------/
^ ^
| |
| |
+-------+-------+
|
|
v
/------+------\
| ROUTER |
+-------------+
| |
| Сервер |
| |
+-------------+
Рисунок # - Асинхронная структура клиента-сервера
```
* Клиент подключается к серверу и отправляет запрос;
* При каждом получении запроса сервер отправляет от 0 до N ответов;
* Клиент может отправлять несколько запросов одновременно, не ожидая ответов;
* Сервер может отправлять несколько ответов одновременно, не ожидая новых запросов.
**asyncsrd.c**```c
//
// Асинхронная модель C/S (DEALER-ROUTER)
//
#include "czmq.h"
// ---------------------------------------------------------------------
// Это клиентская задача, которая подключается к серверу, отправляет запрос каждую секунду и собирает/печатает ответы.
// Мы будем запускать несколько клиентских задач с использованием случайных идентификаторов.
static void *client_task(void *args)
{
zctx_t *ctx = zctx_new();
void *client = zsocket_new(ctx, ZMQ_DEALER);
``` // Устанавливаем случайный идентификатор для удобства отслеживания
char identity[10];
sprintf(identity, "%04X-%04X", randof(0x10000), randof(0x10000));
zsockopt_set_identity(client, identity);
zsocket_connect(client, "tcp://localhost:5570");
zmq_pollitem_t items[] = {{client, 0, ZMQ_POLLIN, 0}};
int request_nbr = 0;
while (1)
{
// Получаем сообщение из poll, каждую секунду
int centitick;
for (centitick = 0; centitick < 100; centitick++)
{
zmq_poll(items, 1, 10 * ZMQ_POLL_MSEC);
if (items[0].revents & ZMQ_POLLIN)
{
zmsg_t *msg = zmsg_recv(client);
zframe_print(zmsg_last(msg), identity);
zmsg_destroy(&msg);
}
}
zstr_sendf(client, "запрос #%d", ++request_nbr);
}
zctx_destroy(&ctx);
return NULL;
}
// ---------------------------------------------------------------------
// Это серверная задача, которая использует многопоточность для распределения запросов между несколькими worker'ами и правильного возврата ответов.
// Один worker может обрабатывать только один запрос, но клиент может отправлять несколько запросов одновременно.
static void server_worker(void *args, zctx_t *ctx, void *pipe);
void *server_task(void *args)
{
zctx_t *ctx = zctx_new();
// frontend сокет использует TCP для связи с клиентом
void *frontend = zsocket_new(ctx, ZMQ_ROUTER);
zsocket_bind(frontend, "tcp://*:5570");
// backend сокет использует inproc для связи с worker'ами
void *backend = zsocket_new(ctx, ZMQ_DEALER);
zsocket_bind(backend, "inproc://backend");
// Запускаем пул worker'ов, количество произвольное
int thread_nbr;
for (thread_nbr = 0; thread_nbr < 5; thread_nbr++)
zthread_fork(ctx, server_worker, NULL);```c
// Используем устройство очереди для соединения backend и frontend, мы могли бы сделать это следующим образом:
// zmq_device(ZMQ_QUEUE, frontend, backend);
// Но здесь мы будем выполнять эту задачу самостоятельно, что позволяет удобнее отлаживать.
```
```c
// Передача сообщений между frontend и backend
while (1) {
zmq_pollitem_t items[] = {
{ frontend, 0, ZMQ_POLLIN, 0 },
{ backend, 0, ZMQ_POLLIN, 0 }
};
zmq_poll(items, 2, -1);
if (items[0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv(frontend);
//puts("Запрос от клиента:");
//zmsg_dump(msg);
zmsg_send(&msg, backend);
}
if (items[1].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv(backend);
//puts("Ответ от рабочего процесса:");
//zmsg_dump(msg);
zmsg_send(&msg, frontend);
}
}
zctx_destroy(&ctx);
return NULL;
}
// Обработка запроса и случайное возвращение нескольких одинаковых ответов с случайной задержкой между ними.
//
static void
server_worker(void *args, zctx_t *ctx, void *pipe)
{
void *worker = zsocket_new(ctx, ZMQ_DEALER);
zsocket_connect(worker, "inproc://backend");
while (1) {
// Сокет DEALER возвращает нам оболочку и содержимое сообщения
zmsg_t *msg = zmsg_recv(worker);
zframe_t *address = zmsg_pop(msg);
zframe_t *content = zmsg_pop(msg);
assert(content);
zmsg_destroy(&msg);
// Возвращаем случайное количество ответов от 0 до 4
int reply, replies = randof(5);
for (reply = 0; reply < replies; reply++) {
// Ожидаем некоторое время
zclock_sleep(randof(1000) + 1);
zframe_send(&address, worker, ZFRAME_REUSE + ZFRAME_MORE);
zframe_send(&content, worker, ZFRAME_REUSE);
}
zframe_destroy(&address);
}
``` zframe_destroy(&content);
}
}
// Основная программа для запуска нескольких клиентов и одного сервера
//
int main (void)
{
zctx_t *ctx = zctx_new ();
zthread_new (ctx, client_task, NULL);
zthread_new (ctx, client_task, NULL);
zthread_new (ctx, client_task, NULL);
zthread_new (ctx, server_task, NULL);
// Запуск программы на 5 секунд
zclock_sleep (5 * 1000);
zctx_destroy (&ctx);
return 0;
}
```
```Запустив вышеуказанный код, можно заметить, что три клиента имеют свои случайные идентификаторы, и каждый запрос получает от 0 до нескольких ответов.* Клиент отправляет запрос каждую секунду и получает от 0 до нескольких ответов. Это реализуется с помощью zmq_poll(), но мы не можем выполнять поллинг только один раз в секунду, так как это не позволит своевременно обрабатывать ответы. В программе мы поллим 100 раз в секунду, что позволяет серверу использовать это как сигнал сердцебиения (heartbeat) для проверки, находится ли клиент онлайн.
* Сервер использует пул рабочих процессов, каждый из которых обрабатывает запрос одновременно. Мы можем использовать встроенные очереди для перемещения сообщений, но для удобства отладки в программе мы самостоятельно реализовали этот процесс. Вы можете удалить закомментированные строки и посмотреть на вывод.Общая архитектура этого кода представлена следующей схемой:
```textdiagram
+---------+ +---------+ +---------+
| | | | | |
| Клиент | | Клиент | | Клиент |
| | | | | |
+---------+ +---------+ +---------+
| Диллер | | Диллер | | Диллер |
\---------/ \---------/ \---------/
подключить подключить подключить
| | |
| | |
+-------------+-------------+
|
/----------------------|----------------------\
: v :
: привязка :
: /-----------\ :
: | ROUTER | :
: +-----------+ :
: | | :
: | Сервер | :
: | | :
: +-----------+ :
: | Диллер | :
: \-----------/ :
: привязка :
: | :
: +-------------+-------------+ :
: | | | :
: v v v :
: подключить подключить подключить :
: /---------\ /---------\ /---------\ :
: | Диллер | | Диллер | | Диллер | :
: +---------+ +---------+ +---------+ :
: | | | | | | :
: | Рабочий| | Рабочий| | Рабочий| :
: | | | | | | :
: +---------+ +---------+ +---------+ :
: :
\---------------------------------------------/
```
Рисунок # - Подробное описание асинхронного сервера
```Как видно, соединение между клиентом и сервером мы используем с помощью DEALER-ROUTER, а соединение между сервером и worker'ами — с помощью DEALER-DEALER. Если worker представляет собой синхронный поток, можно использовать REP. Однако в данном примере worker должен иметь возможность отправлять несколько ответов, поэтому используется асинхронный сокет типа DEALER. Здесь нам не требуется маршрутизация ответов, так как все worker'ы подключены к одному серверу.
Давайте рассмотрим маршрутизацию с использованием конвертов. Клиент отправляет сообщение, сервер получает сообщение, содержащее адрес клиента, таким образом у нас есть два возможных варианта для связи сервер-worker:
* Worker получает незаметное сообщение. Мы используем явно объявленные метки вместе с ROUTER-сокетом для подключения worker'а к серверу. Такое решение требует, чтобы worker заранее сообщил ROUTER о своём существовании, что является типичной LRU-стратегией.
* Worker получает сообщение с меткой и возвращает ответ с меткой. Это требует от worker'а обработки конверта.
Второй вариант проще:
```
клиент сервер фронтенд worker
[ DEALER ]<---->[ ROUTER <----> DEALER <----> DEALER ]
1 часть 2 части 2 части
```Когда нам нужно поддерживать диалог между клиентом и сервером, мы сталкиваемся с классической проблемой: клиент нефиксирован, если сохранять сообщения для каждого клиента, система быстро исчерпывает свои ресурсы. Даже если поддерживается постоянное соединение с одним и тем же клиентом, использование временных сокетов (без явно объявленных меток) приводит к тому, что каждое соединение воспринимается как новое.Чтобы сохранять информацию о клиентах в асинхронных запросах, следует учитывать следующие моменты:
* Клиент должен отправлять пульсацию сердца серверу. В данном примере клиент каждую секунду отправляет запрос серверу, что служит надежной системой пульсации.
* Использование меток сокетов клиента для хранения информации, эффективно для временных и постоянных сокетов;
* Обнаружение остановивших пульсацию клиентов, если за две секунды не было получено пульсации от клиента, состояние сохранения должно быть удалено.
### Пример: кросс-агентская маршрутизация
Давайте применим то, что мы узнали, к реальному примеру. Наш крупный клиент сегодня позвонил с экстренным запросом, чтобы построить крупную облачную инфраструктуру. Он хочет, чтобы эта облачная архитектура могла работать через несколько центров обработки данных, каждый из которых будет содержать группу клиентов и worker'ов, способных работать вместе. Мы убеждены, что практика важнее теории, поэтому предложили использовать ZMQ для создания такой системы. Клиент согласился, возможно, потому что он действительно хотел снизить затраты на разработку, или же прочитал слишком много положительных отзывов о ZMQ в Twitter.
### Подробное описаниеПосле нескольких чашек концентрированного кофе мы приступили к работе, но здравый смысл подсказал нам, что стоит сначала тщательно проанализировать проблему, прежде чем искать решения. Что именно должна делать облачность? Вот ответ клиента:* Worker выполняют свою работу на различных устройствах, но способны обрабатывать все типы задач. В каждом кластере может быть сотни worker'ов, умноженные на количество кластеров, что делает общее число очень большим.
* Client назначает задачи worker'ам, каждая задача является независимой. Каждый client хочет найти подходящего worker'а для выполнения задачи как можно быстрее. Количество клиентов постоянно меняется.
* Основная сложность заключается в том, что архитектура должна позволять легко добавлять и удалять кластеры, вместе с клиентами и worker'ами внутри них.
* Если в кластере нет доступных worker'ов, он передаст задачу другому кластеру, где есть свободные worker'ы.
* Client отправляет запрос и ждет ответа. Если через X секунд ответа нет, они повторно отправят запрос. Это уже учтено в API клиента.
* Worker обрабатывает каждый запрос по отдельности, их поведение очень простое. Если worker выходит из строя, другой скрипт запустит его снова.
Получив эти ответы, мы задали еще несколько вопросов:
* Между кластерами есть более высокий уровень сети, который их соединяет? Клиент подтвердил это.
* Какую пропускную способность нам нужно будет поддерживать? Клиент сказал, что в каждом кластере около тысячи клиентов, каждый из которых отправляет 10 запросов в секунду. Запросы содержат небольшое количество данных, и ответы также небольшие, не превышающие 1КБ.Мы провели простые расчеты: 2500 клиентов х 10 запросов/секунду х 1000 байт х двусторонняя связь = 50 МБ/секунду, или 400 Мбит/секунду, что не представляет проблемы для сети на скорости 1 Гбит/секунду, и можно использовать протокол TCP.
Таким образом, требования стали более ясными, и для реализации этой задачи не требуется дополнительного оборудования или протоколов. Нужно просто предоставить эффективный алгоритм маршрутизации и тщательно спроектировать его. Мы начали с одного кластера (дата-центра) и затем подумали, как их соединить.
### Архитектура одного кластера
Worker и client работают синхронно, мы используем алгоритм LRU для распределения задач worker'ам. Все worker'ы равны, поэтому нам не нужно беспокоиться о сервисах. Worker'ы анонимны, client не взаимодействует с конкретным worker'ом, поэтому нам не нужно гарантировать доставку сообщений или повторную отправку при неудаче. Учитывая вышеупомянутые причины, client и worker не будут прямым образом взаимодействовать друг с другом, что делает невозможным динамическое добавление и удаление узлов. Поэтому наш базовый модель будет использовать агентскую структуру, которая была использована в режиме запрос-ответ.```textdiagram
+--------+ +--------+ +--------+
| Клиент | | Клиент | | Клиент |
+--------+ +--------+ +--------+
| REQ | | REQ | | REQ |
+---+----+ +---+----+ +---+----+
| | |
+-----------+-----------+
|
+--------------------------------+
| | |
| +-----+------+ |
| | ROUTER | |
| +------------+ |
| | LRU Queue | |
| +------------+ |
| | ROUTER | |
| +-----+------+ |
| | Брокер :
+--------------------------------+
|
|
+-----------+-----------+
| | |
+---+----+ +---+----+ +---+----+
| REQ | | REQ | | REQ |
+--------+ +--------+ +--------+
| Рабочий| | Рабочий| | Рабочий|
+--------+ +--------+ +--------+
``` Figure # - Cluster architecture
```
#### Архитектура нескольких кластеров
Далее мы расширяем кластеры до нескольких, каждый из которых имеет свою группу клиентов и рабочих процессов, используя агентов для соединения:
```textdiagram
Cluster 1 : Cluster 2
:
:
+---+ +---+ +---+ : +---+ +---+ +---+
| C | | C | | C | : | C | | C | | C |
+-+-+ +-+-+ +-+-+ : +-+-+ +-+-+ +-+-+
| | | : | | |
| | | : | | |
+-+------+------+-+ : +-+------+------+-+
| Broker | : | Broker |
+-+------+------+-+ : +-+------+------+-+
| | | : | | |
| | | : | | |
+-+-+ +-+-+ +-+-+ : +-+-+ +-+-+ +-+-+
| W | | W | | W | : | W | | W | | W |
+---+ +---+ +---+ : +---+ +---+ +---+
:
Проблема заключается в том, как клиент одного кластера может взаимодействовать с рабочими процессами другого кластера. Вот несколько решений и их преимущества/недостатки:
* Клиент напрямую подключается к нескольким агентам. Преимущество заключается в том, что нам не нужно менять агентов и рабочие процессы, но клиент становится более сложным и должен знать всю структуру. Если мы хотим добавить третий или четвертый кластеры, все клиенты потребуют изменений. Мы фактически пишем маршрутизацию и механизмы отказоустойчивости в клиенте, что не является хорошей идеей.* Рабочие процессы напрямую подключаются к нескольким агентам. Однако REQ-тип рабочих процессов не может это сделать, он может отвечать только одному агенту. Если использовать REP-сокеты, то LRU алгоритм для очередей агентов будет недоступен. Это неприемлемо, так как в нашей структуре должны использоваться LRU алгоритмы для управления рабочими процессами. Вариант с ROUTER-сокетами также рассматривается, назовём его вариантом 1.
* Агенты могут быть взаимосвязаны между собой, что выглядит хорошо, так как не требует добавления большого количества дополнительных соединений. Хотя мы не можем добавлять агентов произвольно, это можно временно игнорировать. В этом случае рабочие процессы и клиенты внутри кластера не должны беспокоиться о всей структуре, агенты будут взаимодействовать между собой при наличии свободной производительности. Это вариант 2.
Сначала рассмотрим вариант 1, когда рабочие процессы взаимодействуют с несколькими агентами одновременно:
```textdiagram``` Кластер 1 : Кластер 2
:
:
| | | |
+------------+ +------------+
| ROUTER | | ROUTER |
+-----+------+ +-----+------+
| |
+---------|-+--=--------+--------------+
: | : :
+-----------+-----------+ :
| : | : | :
| : | : | :
+---+-+--+ +---+-+--+ +---+-+--+
| ROUTER | | ROUTER | | ROUTER |
+--------+ +--------+ +--------+
| рабочий | | рабочий | | рабочий |
+--------+ +--------+ +--------+ Рисунок # - Идея 1 - рабочие процессы, связанные между кластерами
```Это выглядит гибко, но не предоставляет нужных нам функций: клиент начинает запрашивать удалённых работников только когда они становятся недоступными в локальном кластере. Кроме того, сигнал "готовности" работника отправляется сразу двум агентам, что может привести к получению двух одинаковых задач. Еще одна причина неудачи этого подхода заключается в том, что мы снова переместили логику маршрутизации на периферию. Давайте рассмотрим вариант 2, где мы будем создавать соединения для каждого агента, не изменяя работников и клиентов:
```textdiagram
Кластер 1 : Кластер 2
:
:
+---+ +---+ +---+ : +---+ +---+ +---+
| C | | C | | C | : | C | | C | | C |
+-+-+ +-+-+ +-+-+ : +-+-+ +-+-+ +-+-+
| | | : | | |
| | | : | | |
+-+------+------+-+ : +-+------+------+-+
| Брокер |<--------->| Брокер |
+-+------+------+-+ : +-+------+------+-+
| | | : | | |
| | | : | | |
+-+-+ +-+-+ +-+-+ : +-+-+ +-+-+ +-+-+
| W | | W | | W | : | W | | W | | W |
+---+ +---+ +---+ : +---+ +---+ +---+
:
Рисунок # - Идея 2 - брокеры общаются друг с другом
```Эта концепция имеет ряд преимуществ: нам нужно решить проблему только в одном месте, остальные части системы остаются нетронутыми. Это как секретная связь между агентами: парень, у меня есть свободные ресурсы, если у тебя работы больше, чем ты можешь выполнить, дай знать, цены можно обсудить.На самом деле, нам просто нужно будет разработать более сложный алгоритм маршрутизации: агенты будут выступать в роли подрядчиков для других агентов. Эта концепция также имеет несколько других преимуществ:
* В обычной ситуации (например, при наличии одного кластера) этот подход работает так же, как и раньше, а при наличии нескольких кластеров выполняются дополнительные действия.
* Для различных задач можно использовать различные модели потока сообщений, такие как использование разных сетевых соединений.
* Расширение архитектуры кажется простым: если необходимо, можно добавить суперагент для управления распределением задач.
Теперь давайте начнем писать код. Мы создадим полный кластер в одном процессе, что облегчит демонстрацию и позволит легко адаптировать его для реального использования. Это и есть красота ZMQ: вы можете использовать минимальные модули для экспериментов, а затем легко перенести их в реальное приложение. Потоки становятся процессами, модели сообщений и логика остаются без изменений. Каждый "кластер" процесс будет содержать потоки клиента, работника и агента.Мы уже хорошо знакомы с базовой моделью:
* клиентский поток использует REQ сокет для отправки запросов агентскому потоку (ROUTER сокет);
* рабочий поток использует REQ сокет для обработки и ответа на запросы, полученные от агентского потока (ROUTER сокет);
* агент использует LRU очередь и маршрутизацию для управления запросами.#### Федеративный режим и режим коллег
Существует множество способов подключения к агенту, и нам придётся выбрать наиболее подходящий. Нам требуется функционал, который позволит сообщать другим агентам о наличии свободных рабочих потоков, а также сообщать им, что нагрузка достигла предела. Этот процесс не обязательно должен быть идеальным; иногда мы действительно принимаем больше задач, чем можем обрабатывать, но всё равно постепенно выполняем их.
Наиболее простой способ называется федеративным, при котором агент выступает в роли клиента и рабочего потока для других агентов. Мы можем соединить передний сокет агента с задним сокетом другого агента и наоборот. Подсказка: в ZMQ можно привязать один сокет к одному конечному адресу, а затем подключить его к другому конечному адресу.
```textdiagram
Кластер 1 : Кластер 2
:
:
+---+ +---+ : +---+ +---+
| C | | C | : | C | | C |
+-+-+ +-+-+ +----+ : +-----+ +-+-+ +-+-+
| | | | : | | | |
| | | | : | | | |
+-+------+------+-+ | : | +-+------+------+-+
| Агент | | : | | Агент |
+-+------+--------+ | : | +--------+------+-+
| | ^ | : | ^ | |
| | | | : | | | |
+-+-+ +-+-+ | +-----------+ +-+-+ +-+-+
| W | | W | | : | | W | | W |
+---+ +---+ +----------+ +---+ +---+
:
``` Рисунок # - Агенты, взаимосвязанные в федеративном режиме
Эта архитектура будет достаточно простой: когда агент не имеет клиентов, он сообщает другим агентам, что готов принять задачу. Однако проблема заключается в том, что этот механизм слишком прост, и агент в федеративном режиме может обрабатывать только один запрос за раз. Если клиент и рабочий поток строго синхронизированы, то остальные свободные рабочие потоки в агенте не будут получать задачи. Нам нужен агент, который обладает полностью асинхронной работой. Однако федеративный режим отлично подходит для некоторых приложений, таких как сервисно-ориентированная архитектура (SOA). Поэтому не спешите отказываться от федеративного режима; он просто не подходит для алгоритма LRU и балансировки нагрузки в кластере.
Еще один способ подключения прокси — это режим партнерства. Прокси знают друг о друге и используют специальный канал для связи. Предположим, что есть N прокси, каждый из которых имеет N-1 партнеров, и все прокси используют сообщения одного и того же формата для общения. Есть два важных момента, связанных с передачей сообщений между прокси:* Каждый прокси должен сообщать всем своим партнерам, сколько свободных рабочих процессов у него есть. Это простое сообщение, которое представляет собой постоянно обновляемое число. Очевидно, мы будем использовать PUB-SUB сокеты. Таким образом, каждый прокси будет открывать PUB сокет для постоянного обновления информации о себе, а также SUB сокет для получения информации от других прокси.* Каждый прокси должен каким-то образом передавать задачи другим прокси и получать ответы, причём этот процесс должен быть асинхронным. Мы будем использовать ROUTER-ROUTER сокеты для этого, у нас нет других вариантов. Каждый прокси будет использовать два таких ROUTER сокета: один для приема задач, другой для их распределения. Без использования двух сокетов нам потребуется дополнительная логика для различения запросов и ответов, что потребует добавления дополнительной информации в сообщения.
Кроме того, следует учитывать общение прокси с локальными клиентами и рабочими процессами.
#### Церемония именования
У прокси есть три потока сообщений, каждый из которых использует два сокета, таким образом, всего требуется шесть сокетов. Важно выбрать хорошее имя для этих сокетов, чтобы мы не запутались при переходе между ними. У каждого сокета есть определённая задача, которую он выполняет, и это может быть частью его имени. Таким образом, когда мы будем перечитывать эти коды позже, они не покажутся нам чужими.
Вот три потока сообщений, которые мы используем:
* Локальный поток запросов-ответов, обеспечивающий общение прокси с клиентами и рабочими процессами;
* Облачный поток запросов-ответов, обеспечивающий общение прокси с их партнёрами;
* Поток состояния, обеспечивающий обмен информацией между прокси и их партнёрами.Найдение значимых и одинаково длинных имен сделает наш код более аккуратным. Возможно, они не будут иметь прямого отношения друг к другу, но со временем мы к ним привыкнем. Каждый поток сообщений имеет два сокета, которые мы ранее называли «фронтенд» (frontend) и «бэкенд» (backend). Эти названия мы использовали множество раз: фронтенд отвечает за принятие информации или задач; бэкенд отправляет информацию или задачи своим коллегам. Концептуально, потоки сообщений всегда идут с фронта к задней части, а ответы — с задней части к передней.
Поэтому мы решили использовать следующие названия:
* localfe / localbe
* cloudfe / cloudbe
* statefe / statebe
Что касается протокола связи, мы используем IPC. Преимущество использования этого протокола заключается в том, что он может работать как протокол офлайн-коммуникации, подобно TCP, но без необходимости использования IP-адресов или DNS-сервисов. Для конечных точек протокола IPC мы будем называть xxx-localfe/be, xxx-cloud, xxx-state, где xxx представляет имя кластера.
Возможно, вы считаете, что такие названия слишком длинные и лучше было бы называть их s1, s2, s3... Однако ваш мозг не является машиной, и при чтении кода вы не сможете сразу понять значение переменной. Использование метода «три потока сообщений, два направления» будет удобнее, чем простое запоминание «шести различных сокетов».Вот схема распределения сокетов агента:
```textdiagram
+---------+ +---------+ +---------+
| Клиент | | Брокер | | Брокер |
| | | cloudbe | | statebe |
+---------+ +---------+ +---------+
| подключ.| | подключ.| | привязк.|
+---------+ +---------+ +---------+
запрос запрос состояние
| | |
+-+ | +-+
| | |
v v v
+---------+---------+---------+
| привязк.| привязк.| подключ.|
+---------+---------+---------+
| localfe | cloudfe | statefe | Передние конечные точки
| ROUTER | ROUTER | SUB | (входящие)
+---------+---------+---------+
| |
| Брокер |
| |
+---------+---------+---------+
| ROUTER | ROUTER | PUB | Задние конечные точки
| localbe | cloudbe | statebe | (выходящие)
+---------+---------+---------+
| привязк.| подключ.| привязк.|
+---------+---------+---------+
запрос запрос состояние
| | |
+-+ | +-+
| | |
v v v
+---------+ +---------+ +---------+
| подключ.| | привязк.| | подключ.|
+---------+ +---------+ +---------+
| Рабочий | | Брокер | | Брокер |
| | | cloudfe | | statefe |
+---------+ +---------+ +---------+
```
#### Примечание:Так как каждое сообщение имеет свою уникальную особенность, мы не будем сразу писать весь код, а будем разрабатывать и тестировать его по частям. Когда каждое сообщение будет работать корректно, мы объединим все в одну целостную систему. Мы начинаем с прототипа состояния:```textdiagram
+---------+
| Брокер |
| statebe |
+---------+
| bind |
+---------+
состояние
|
+-+
|
v
+---------+---------+---------+
| | | connect |
+---------+---------+---------+
| | | statefe |
| | | SUB |
+---------+---------+---------+
| |
| Брокер |
| |
+---------+---------+---------+
| | | PUB |
| | | statebe |
+---------+---------+---------+
| | | bind |
+---------+---------+---------+
состояние
|
+-+
|
v
+---------+
| connect |
+---------+
| statefe |
| Брокер |
+---------+
Рисунок # - Прототип потока состояния
```
Код представлен ниже:**peering1: Прототип потока состояния на C**
```c
//
// Прототип состояния потока для симуляции пира (часть 1)
//
#include "czmq.h"
int main(int argc, char *argv[])
{
// Первый аргумент — имя агента
// Остальные аргументы — имена других пиров
//
if (argc < 2) {
printf("синтаксис: peering1 me {you}...\n");
exit(EXIT_FAILURE);
}
char *self = argv[1];
printf("I: Подготовка агента %s...\n", self);
srandom((unsigned)time(NULL));
// Подготовка контекста и сокета
zctx_t *ctx = zctx_new();
void *statebe = zsocket_new(ctx, ZMQ_PUB);
zsocket_bind(statebe, "ipc://%s-state.ipc", self);
// Подключение statefe сокета к каждому пиру
void *statefe = zsocket_new(ctx, ZMQ_SUB);
int argn;
for (argn = 2; argn < argc; argn++) {
char *peer = argv[argn];
printf("I: Соединение со статусным потоком пира '%s'\n", peer);
zsocket_connect(statefe, "ipc://%s-state.ipc", peer);
}
// Отправка и получение сообщений о состоянии
// Время ожидания в функции zmq_poll() равно времени сердцебиения
//
while (1) {
// Инициализация списка объектов poll
zmq_pollitem_t items[] = {
{ statefe, 0, ZMQ_POLLIN, 0 }
};
// Опрос активности сокетов, время ожидания — 1 секунда
int rc = zmq_poll(items, 1, 1000 * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Прерывание
// Обработка полученных сообщений о состоянии
if (items[0].revents & ZMQ_POLLIN) {
char *peer_name = zstr_recv(statefe);
char *available = zstr_recv(statefe);
printf("Пир %s имеет %s свободных worker-ов\n", peer_name, available);
free(peer_name);
free(available);
}
else {
``` // Отправка случайного числа, представляющего количество свободных worker-ов
zstr_sendm (statebe, self);
zstr_sendf (statebe, "%d", randof (10));
}
}
zctx_destroy (&ctx);
return EXIT_SUCCESS;
}
```Несколько пояснений:* Каждому агенту требуется уникальный идентификатор для генерации соответствующей ipc конечной точки. В реальных условиях агенты должны использовать протокол TCP для подключения, что требует более полного механизма конфигурации, который будет подробно рассмотрен в последующих разделах.
* Ядром программы является цикл zmq_poll(), который обрабатывает полученные сообщения и отправляет состояние агента. Состояние агента отправляется только тогда, когда zmq_poll() выходит из строя из-за отсутствия сообщений от партнёра. Если бы мы отправляли своё состояние после каждого полученного сообщения, это привело бы к избыточности сообщений.
* Сообщение состояния состоит из двух фреймов: первый фрейм содержит адрес агента, второй — количество свободных рабочих процессов. Мы обязаны сообщать партнёрам свой адрес, чтобы они могли принимать запросы; единственный способ сделать это — указать его в сообщении.
* Мы не установили идентификатор на SUB сокете, чтобы избежать получения устаревшей информации о состоянии от партнёра при подключении.
* Мы не установили пороговое значение (HWM) на PUB сокете, так как подписчики являются мгновенными. Мы также можем установить пороговое значение в 1, но это не обязательно.
Давайте скомпилируем этот код и используем его для моделирования трёх кластеров: DC1, DC2, DC3. Мы будем запускать следующие команды в разных окнах:```
peering1 DC1 DC2 DC3 # Запускаем DC1 и подключаемся к DC2 и DC3
peering1 DC2 DC1 DC3 # Запускаем DC2 и подключаемся к DC1 и DC3
peering1 DC3 DC1 DC2 # Запускаем DC3 и подключаемся к DC1 и DC2
```
Каждый кластер будет отслеживать состояние своих партнеров и каждую секунду выводить своё текущее состояние.
В реальном программировании мы не будем отправлять своё состояние по расписанию, а будем делать это при изменении состояния. Это может показаться затратным по поводу использования полосы пропускания, но фактическое содержание сообщений состояния обычно небольшое, и соединения между кластерами очень быстрые.
Если нам нужно отправлять информацию о состоянии с более точным интервалом времени, можно создать дополнительный поток, открыть сокет statebe и передавать нерегулярные сообщения о состоянии основному потоку, который затем будет отправлять эти сообщения с регулярными интервалами. Однако эта механика требует дополнительного программирования.
#### Прототип локального потока и облачного потока
Теперь давайте создадим прототип локального потока и облачного потока. Этот код будет получать запросы от клиента и случайным образом распределять их между рабочими процессами внутри кластера или между кластерами.```textdiagram```
+---------+ +---------+
| Клиент | | Брокер |
| | | cloudbe |
+---------+ +---------+
| connect | | connect |
+---------+ +---------+
запрос запрос
| |
+-+ |
| |
v v
+---------+---------+---------+
| bind | bind | |
+---------+---------+---------+
| localfe | cloudfe | |
| ROUTER | ROUTER | |
+---------+---------+---------+
| |
| Брокер |
| |
+---------+---------+---------+
| ROUTER | ROUTER | |
| localbe | cloudbe | |
+---------+---------+---------+
| bind | connect | |
+---------+---------+---------+
запрос запрос
| |
+-+ |
| |
v v
+---------+ +---------+
| connect | | bind |
+---------+ +---------+
| | | cloudfe |
| Рабочий | | Брокер |
+---------+ +---------+
Рисунок # - Поток задачПрежде чем приступить к написанию кода, давайте сначала изобразим основную логику маршрутизации и составим простой и надёжный дизайн.
Нам потребуются две очереди: одна для хранения запросов, полученных от локального клиента кластеризации, а другая — для запросов, полученных от других кластеров. Один из способов — получать сообщения из передних сокетов локальных и облачных клиентов и помещать их в соответствующие очереди. Однако это кажется излишним, так как ZMQ сокеты сами являются очередями. Поэтому мы будем использовать кэши, предоставляемые ZMQ сокетами, как очереди.
Эта техника была использована нами в устройстве LRU очередей, и она работала хорошо. Мы получаем запросы из сокетов только тогда, когда у агента есть свободные рабочие процессы или другие кластеры, готовые принять запросы. Мы можем постоянно получать ответы с заднего конца и маршрутизировать их обратно. Если задний конец ничего не отвечает, нет необходимости принимать запросы с переднего конца.
Поэтому наш основной цикл будет выполнять следующие действия:
* Проверка заднего конечного сокета для получения сообщений "готовности" от рабочих процессов или ответов. Если это ответ, он будет маршрутизирован обратно к клиенту кластеризации или другому кластеру.* После получения ответа рабочий процесс будет помечен как доступный, помещен в очередь и учтен;
* Если есть доступные рабочие процессы, запрос будет получен, который может приходить от клиента внутри кластера или от другого кластера. Затем запрос будет передан рабочему процессу внутри кластера или случайным образом передан другому кластеру.
Здесь мы просто случайным образом отправляем запросы другим кластерам, а не моделируем рабочий процесс внутри агента для маршрутизации задач между кластерами. Это выглядит глупо, но пока работает.
Мы используем идентификатор агента для маршрутизации сообщений до агента. У каждого агента есть своё имя, которое задается в командной строке. Если эти имена не совпадают с UUID, автоматически созданными ZMQ для клиентов, мы сможем понять, что ответ должен быть отправлен обратно клиенту или другому кластеру.
Вот код, интересные части отмечены в программе:**peering2: Прототип потока задач локальной и облачной среды на C**
```c
//
// Прокси-партнерское моделирование (часть вторая)
// Прототип потока запросов-ответов
//
// Пример программы использует один процесс, чтобы сделать программу простой,
// каждый поток имеет свой контекст, поэтому можно рассматривать их как несколько процессов.
//
#include "czmq.h"
#define NBR_CLIENTS 10
#define NBR_WORKERS 3
#define LRU_READY "\001" // Сообщение: worker готов
// Имя прокси; в реальном мире это имя должно быть настроено
static char *self;
// Клиент использует REQ сокет для запросов-ответов
//
static void *
client_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, "ipc://%s-localfe.ipc", self);
while (1) {
// Отправка запроса, получение ответа
zstr_send (client, "HELLO");
char *reply = zstr_recv (client);
if (! reply)
break; // Прерывание
printf ("Client: %s\n", reply);
free (reply);
sleep (1);
}
zctx_destroy (&ctx);
return NULL;
}
// Worker использует REQ сокет и выполняет LRU маршрутизацию
//
static void *
worker_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (worker, "ipc://%s-localbe.ipc", self);
// Уведомление прокси, что worker готов
zframe_t *frame = zframe_new (LRU_READY, 1);
zframe_send (&frame, worker, 0);
// Обработка сообщений
while (1) {
zmsg_t *msg = zmsg_recv (worker);
if (! msg)
break; // Прерывание
zframe_print (zmsg_last (msg), "Worker: ");
zframe_reset (zmsg_last (msg), "OK", 2);
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return NULL;
}
int main (int argc, char *argv [])
{
// Первый аргумент - имя прокси
``` // Остальные аргументы - имена партнерских прокси
//
if (argc < 2) {
printf ("синтаксис: peering2 me {you}. . . \n");
exit (EXIT_FAILURE);
}
self = argv[1];
printf ("I: Подготовка прокси %s. . . \n", self);
srandom((unsigned)time(NULL));
// Подготовка контекста и сокетов
zctx_t *ctx = zctx_new();
char endpoint[256];
// Привязка cloudfe к конечной точке
void *cloudfe = zsocket_new(ctx, ZMQ_ROUTER);
zsockopt_set_identity(cloudfe, self);
zsocket_bind(cloudfe, "ipc://%s-cloud.ipc", self);
// Соединение cloudbe с конечными точками партнерских прокси
void *cloudbe = zsocket_new(ctx, ZMQ_ROUTER);
zsockopt_set_identity(cloudfe, self);
int argn;
for (argn = 2; argn < argc; argn++) {
char *peer = argv[argn];
printf("I: Подключаюсь к агенту-партнеру '%s' через cloudfe\n", peer);
zsocket_connect(cloudbe, "ipc://%s-cloud.ipc", peer);
}
// Подготовка локального фронтенда и бэкенда
void *localfe = zsocket_new(ctx, ZMQ_ROUTER);
zsocket_bind(localfe, "ipc://%s-localfe.ipc", self);
void *localbe = zsocket_new(ctx, ZMQ_ROUTER);
zsocket_bind(localbe, "ipc://%s-localbe.ipc", self);
// Даем пользователю возможность начать процесс
printf("Подтвердите запуск всех агентов, нажмите любую клавишу для продолжения: ");
getchar();
// Запуск локального worker
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
zthread_new(ctx, worker_task, NULL);
// Запуск локального клиента
int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
zthread_new(ctx, client_task, NULL);
// Интересная часть
// -------------------------------------------------------------
// Поток запросов-ответов
// - Если есть доступный worker, то получаем запросы с локальной или облачной системы;```c
// - Передаем запросы доступному worker или другому кластеру.
// Очередь доступных workers
int capacity = 0;
zlist_t *workers = zlist_new ();
while (1) {
zmq_pollitem_t backends [] = {
{ localbe, 0, ZMQ_POLLIN, 0 },
{ cloudbe, 0, ZMQ_POLLIN, 0 }
};
// Если нет доступных workers, продолжаем ждать
int rc = zmq_poll (backends, 2,
capacity ? 1000 * ZMQ_POLL_MSEC : -1);
if (rc == -1)
break; // Прерывание
// Обработка ответов от локальных workers
zmsg_t *msg = NULL;
if (backends [0].revents & ZMQ_POLLIN) {
msg = zmsg_recv (localbe);
if (!msg)
break; // Прерывание
zframe_t *address = zmsg_unwrap (msg);
zlist_append (workers, address);
capacity++;
// Если это сигнал "готовности", больше не требуется маршрутизация
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), LRU_READY, 1) == 0)
zmsg_destroy (&msg);
}
// Обработка ответов от агентов-партнеров
else if (backends [1].revents & ZMQ_POLLIN) {
msg = zmsg_recv (cloudbe);
if (!msg)
break; // Прерывание
// нам не нужен адрес агента-партнера
zframe_t *address = zmsg_unwrap (msg);
zframe_destroy (&address);
}
// если адрес в ответственном сообщении является адресом агента-партнера, отправляем ему
for (int argn = 2; msg && argn < argc; argn++) {
char *data = (char *) zframe_data (zmsg_first (msg));
size_t size = zframe_size (zmsg_first (msg));
if (size == strlen (argv[argn]) && memcmp (data, argv[argn], size) == 0)
zmsg_send (&msg, cloudfe);
}
// направляем ответ обратно клиенту
if (msg)
zmsg_send (&msg, localfe);
// начинаем обработку запросов от клиента
while (capacity) {
zmq_pollitem_t frontends[] = {
```
Пожалуйста, обратите внимание, что в этом коде есть несколько мест, где текст был переведен, но структура и форматирование были сохранены. { localfe, 0, ZMQ_POLLIN, 0 },
{ cloudfe, 0, ZMQ_POLLIN, 0 }
};
rc = zmq_poll(frontends, 2, 0);
assert(rc >= 0);
int reroutable = 0;
// Приоритетное обслуживание запросов от агента-партнера, чтобы избежать исчерпания ресурсов
if (frontends[1].revents & ZMQ_POLLIN) {
msg = zmsg_recv(cloudfe);
reroutable = 0;
} else if (frontends[0].revents & ZMQ_POLLIN) {
msg = zmsg_recv(localfe);
reroutable = 1;
} else {
break; // Нет запросов
}
// Направляем 20% запросов в другие кластеры
if (reroutable && argc > 2 && randof(5) == 0) {
// Случайное направление запроса к агенту-партнеру
int random_peer = randof(argc - 2) + 2;
zmsg_pushmem(msg, argv[random_peer], strlen(argv[random_peer]));
zmsg_send(&msg, cloudbe);
} else {
zframe_t *frame = (zframe_t *) zlist_pop(workers);
zmsg_wrap(msg, frame);
zmsg_send(&msg, localbe);
capacity--;
}
}
// Завершающая очистка после завершения программы
while (zlist_size(workers)) {
zframe_t *frame = (zframe_t *) zlist_pop(workers);
zframe_destroy(&frame);
}
zlist_destroy(&workers);
zctx_destroy(&ctx);
return EXIT_SUCCESS;
}
```Пожалуйста, предоставьте текст для перевода. Запустите вышеуказанный код в двух окнах:```
peering2 me you
peering2 you me
```
Несколько пояснений:
* Библиотека `zmsg` значительно упрощает программу, и такие программы должны стать неотъемлемой частью нашего набора инструментов как ZMQ-программистов;
* Поскольку в программе нет реализации получения состояния соседнего агента, предположим, что все они имеют свободных работников. В реальности мы не будем отправлять запросы на несуществующие соседние агенты.
* Вы можете запустить этот код на длительное время, чтобы проверить, появится ли сообщение об ошибке маршрутизации, так как при возникновении ошибки клиент будет заблокирован. Вы можете попробовать закрыть один из агентов, чтобы увидеть, как агенты не могут маршрутизировать запросы к другим агентам в облаке, клиенты по очереди блокируются, и программа прекращает вывод отладочной информации.
#### Сборка
Давайте объединим все это в одну программу. Как и раньше, мы выполним всё это в одном процессе. Мы объединим два примера программы из вышеприведённого текста, чтобы создать программу, которая может имитировать любое количество кластеров.
Программа состоит из 270 строк кода и идеально подходит для моделирования полной системы кластера, включая клиентов, работников, агентов и механизм распределения задач в облаке.**peering3: Полное моделирование кластера на C**
```c
//
// Симуляция агента-посредника (третья часть)
// Прототип потока сообщений состояния и задач
//
// Пример программы использует один процесс, чтобы сделать программу простой,
// каждый поток имеет свой объект контекста, поэтому можно считать их несколькими процессами.
//
#include "czmq.h"
#define NBR_CLIENTS 10
#define NBR_WORKERS 5
#define LRU_READY "\001" // Сообщение: worker готов
// Имя агента; в реальности это имя должно быть настроено каким-то образом
static char *self;
// Клиент запрос-ответ использует REQ сокет
// Для моделирования стресс-тестирования клиент отправляет большое количество запросов за один раз
//
static void *
client_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, "ipc://%s-localfe.ipc", self);
void *monitor = zsocket_new (ctx, ZMQ_PUSH);
zsocket_connect (monitor, "ipc://%s-monitor.ipc", self);
while (1) {
sleep (randof (5));
int burst = randof (15);
while (burst--) {
char task_id [5];
sprintf (task_id, "%04X", randof (0x10000));
// Использует случайный шестнадцатеричный ID для идентификации задачи
zstr_send (client, task_id);
// Ждет максимум Yöntenim 10 секунд
zmq_pollitem_t pollset [1] = { { client, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (pollset, 1, 10 * 1000 * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Прерывание
if (pollset [0].revents & ZMQ_POLLIN) {
char *reply = zstr_recv (client);
if (!reply)
break; // Прерывание
// Ответ worker должен содержать ID задачи
puts (reply);
assert (streq (reply, task_id));
}
``````c
free (reply);
}
else {
zstr_sendf (monitor,
"E: Клиент завершил работу, потеряна задача: %s", task_id);
return NULL;
}
}
}
zctx_destroy (&ctx);
return NULL;
}
// Worker использует REQ сокет и выполняет LRU маршрутизацию
//
static void *
worker_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (worker, "ipc://%s-localbe.ipc", self);
// Уведомляет агента, что worker готов
zframe_t *frame = zframe_new (LRU_READY, 1);
zframe_send (&frame, worker, 0);
while (1) {
// worker будет случайным образом задерживаться на несколько секунд
zmsg_t *msg = zmsg_recv(worker);
sleep(randof(2));
zmsg_send(&msg, worker);
}
zctx_destroy(&ctx);
return NULL;
}
int main(int argc, char *argv[])
{
// Первый аргумент — это имя агента
// Остальные аргументы — это имена агентов-партнеров
//
if (argc < 2) {
printf("синтаксис: peering3 me {you}...\n");
exit(EXIT_FAILURE);
}
self = argv[1];
printf("I: Подготовка агента %s...\n", self);
srandom((unsigned)time(NULL));
// Подготовка контекста и сокетов
zctx_t *ctx = zctx_new();
char endpoint[256];
// Привязка cloudfe к конечной точке
void *cloudfe = zsocket_new(ctx, ZMQ_ROUTER);
zsockopt_set_identity(cloudfe, self);
zsocket_bind(cloudfe, "ipc://%s-cloud.ipc", self);
// Привязка statebe к конечной точке
void *statebe = zsocket_new(ctx, ZMQ_PUB);
zsocket_bind(statebe, "ipc://%s-state.ipc", self);
// Привязка cloudbe к конечной точке агентов-партнеров
void *cloudbe = zsocket_new(ctx, ZMQ_ROUTER);
zsockopt_set_identity(cloudbe, self);
int argn;
for (argn = 2; argn < argc; argn++) {
char *peer = argv[argn];
printf("I: Соединение с агентом-партнером '%s' через cloudfe\n", peer);
}
}
``` zsocket_connect(cloudbe, "ipc://%s-cloud.ipc", peer);
}
// Привязка statefe к конечной точке агентов-партнеров
void *statefe = zsocket_new(ctx, ZMQ_SUB);
for (argn = 2; argn < argc; argn++) {
char *peer = argv[argn];
printf("I: Соединение с агентом-партнером '%s' через statefe\n", peer);
zsocket_connect(statefe, "ipc://%s-state.ipc", peer);
}
// Подготовка локального фронтенда и бэкенда
void *localfe = zsocket_new(ctx, ZMQ_ROUTER);
zsocket_bind(localfe, "ipc://%s-localfe.ipc", self);
void *localbe = zsocket_new(ctx, ZMQ_ROUTER);
zsocket_bind(localbe, "ipc://%s-localbe.ipc", self);
// Подготовка мониторингового сокета
void *monitor = zsocket_new(ctx, ZMQ_PULL);
zsocket_bind(monitor, "ipc://%s-monitor.ipc", self);
// Запуск локального worker
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
zthread_new(ctx, worker_task, NULL);
// Запуск локального клиента
int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
zthread_new(ctx, client_task, NULL);
// Интересная часть
// -------------------------------------------------------------
// Публикация-подписка сообщений
// - Проверка состояния соседнего агента;
// - Отправка широковещательных сообщений при изменении собственного состояния.
// Запрос-ответ сообщений
// - Если есть доступные worker'ы, проверка наличия запросов локально или в облаке;
// - Передача запросов локальным worker'ам или другим кластерам.
// Очередь доступных worker'ов
int local_capacity = 0;
int cloud_capacity = 0;
zlist_t *workers = zlist_new();
while (1) {
zmq_pollitem_t primary[] = {
{ localbe, 0, ZMQ_POLLIN, 0 },
{ cloudbe, 0, ZMQ_POLLIN, 0 },
{ statefe, 0, ZMQ_POLLIN, 0 },
{ monitor, 0, ZMQ_POLLIN, 0 }
}; };
// Если нет доступных worker'ов, ждать до их появления
int rc = zmq_poll(primary, 4, local_capacity ? 1000 * ZMQ_POLL_MSEC : -1);
if (rc == -1)
break; // Прерывание
// Отслеживание изменений собственного состояния
int previous = local_capacity;
// Обработка ответов от локальных worker'ов
zmsg_t *msg = NULL;
if (primary[0].revents & ZMQ_POLLIN) {
msg = zmsg_recv(localbe);
if (!msg)
break; // Прерывание
zframe_t *address = zmsg_unwrap(msg);
zlist_append(workers, address);
local_capacity++;
// Если это сигнал "готовности", прекратить маршрутизацию
zframe_t *frame = zmsg_first(msg);
if (memcmp(zframe_data(frame), LRU_READY, 1) == 0)
zmsg_destroy(&msg);
}
// Обработка ответов от соседнего агента
else if (primary[1].revents & ZMQ_POLLIN) {
msg = zmsg_recv(cloudbe);
if (!msg)
break; // Прерывание
// Адрес соседнего агента нам не требуется
zframe_t *address = zmsg_unwrap(msg);
zframe_destroy(&address);
}
// Если адрес в ответе совпадает с адресом соседнего агента, отправляем ему сообщение
for (int argn = 2; msg && argn < argc; argn++) {
char *data = (char *)zframe_data(zmsg_first(msg));
size_t size = zframe_size(zmsg_first(msg));
if (size == strlen(argv[argn]) && memcmp(data, argv[argn], size) == 0)
zmsg_send(&msg, cloudfe);
}
// Передаем ответ локальному клиенту
if (msg)
zmsg_send(&msg, localfe); // Обрабатываем обновление состояния соседнего агента
if (primary[2].revents & ZMQ_POLLIN) {
char *status = zstr_recv(statefe);
cloud_capacity = atoi(status);
free(status);
}
// Обрабатываем мониторинговые сообщения
if (primary[3].revents & ZMQ_POLLIN) {
char *status = zstr_recv(monitor);
printf("%s\n", status);
free(status);
}
// Начинаем обработку запросов от клиентов
// - Если есть свободные локальные worker'ы, принимаем запросы от локального клиента и облачного сервера;
// - Если есть только свободные соседние агенты, принимаем запросы только от локального клиента;
// - Передаем запросы локальным worker'ам или соседним агентам.
//
while (local_capacity + cloud_capacity) {
zmq_pollitem_t secondary[] = {
{ localfe, 0, ZMQ_POLLIN, 0 },
{ cloudfe, 0, ZMQ_POLLIN, 0 }
};
if (local_capacity)
rc = zmq_poll(secondary, 2, 0);
else
rc = zmq_poll(secondary, 1, 0);
assert(rc >= 0);
if (secondary[0].revents & ZMQ_POLLIN)
msg = zmsg_recv(localfe);
else
if (secondary[1].revents & ZMQ_POLLIN)
msg = zmsg_recv(cloudfe);
else
break; // Нет задач
if (local_capacity) {
zframe_t *frame = (zframe_t *) zlist_pop(workers);
zmsg_wrap(msg, frame);
zmsg_send(&msg, localbe);
local_capacity--;
} else {
// Случайным образом передаем запрос соседнему агенту
int random_peer = randof(argc - 2) + 2;
zmsg_pushmem(msg, argv[random_peer], strlen(argv[random_peer]));
zmsg_send(&msg, cloudbe);
}
}```c
if (local_capacity != previous) {
// Добавляем адрес нашего агента в сообщение
zstr_sendm(statebe, self);
// Отправляем новое состояние
zstr_sendf(statebe, "%d", local_capacity);
}
}
// Очистка после завершения программы
while (zlist_size(workers)) {
zframe_t *frame = (zframe_t *) zlist_pop(workers);
zframe_destroy(&frame);
}
zlist_destroy(&workers);
zctx_destroy(&ctx);
return EXIT_SUCCESS;
}
```
Этот код не слишком длинный, но его отладка заняла около одного дня. Ниже приведены некоторые пояснения:
* `client`-поток будет обнаруживать и сообщать о неудачных запросах, он будет полифонировать агентский сокет, проверяя наличие ответов, с таймаутом в 10 секунд.
* `client`-поток не будет самостоятельно выводить информацию, а будет отправлять сообщения PUSH-потоку мониторинга, который будет выводить сообщения. Это наш первый опыт использования ZMQ для мониторинга и логирования, и мы будем использовать это больше в будущем.
* `client` будет имитировать различные ситуации нагрузки, чтобы кластер работал при различных уровнях нагрузки, поэтому запросы могут быть обработаны локально или отправлены в облако. Количество `client` и `worker` в кластере, количество других кластеров и время задержки будут влиять на результат. Вы можете настроить различные параметры для тестирования.
```* В основном цикле есть две группы наборов полифонирования, фактически мы можем использовать три: поток информации, бэкенд и фронтенд. Поскольку в предыдущих примерах нет необходимости полифонировать запросы фронтенда, если бэкенд не имеет свободных worker.Вот несколько проблем, с которыми мы столкнулись во время написания:
* Если запрос или ответ потеряются где-то, `client` будет заблокирован. Вспомните, что ROUTER-ROUTER сокеты просто игнорируют сообщения, если они не могут быть маршрутизированы. Одним из подходов здесь является изменение `client`-потока для обнаружения и отчета об этих ошибках. Кроме того, я использую `zmsg_dump()` после каждого `recv()` и перед каждым `send()`, чтобы быстрее находить сообщения.
* Основной цикл может неправильно получать сообщения из нескольких готовых сокетов, что приводит к потере первого сообщения. Решением является получение сообщений только из первого готового сокета.
* Библиотека `zmsg` плохо кодирует UUID в C-строках, что приводит к ошибкам при наличии байта 0 в UUID. Решением является преобразование UUID в печатаемое шестнадцатеричное представление.
Этот симулятор не проверяет наличие агента-партнера. Если вы запустите агента, он отправит состояние другим агентам, а затем завершит работу, другие агенты продолжат отправлять запросы этому агенту. В результате `client` других агентов будут сообщать множество ошибок. Решение состоит из двух частей: установка времени жизни для состояния, чтобы запросы не отправлялись к агенту-партнеру, который отсутствует некоторое время, и повышение надежности запросов-ответов, что будет обсуждаться в следующей главе.

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/andwp-zguide-cn.git
git@api.gitlife.ru:oschina-mirror/andwp-zguide-cn.git
oschina-mirror
andwp-zguide-cn
andwp-zguide-cn
master