1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/andwp-zguide-cn

В этом репозитории не указан файл с открытой лицензией (LICENSE). При использовании обратитесь к конкретному описанию проекта и его зависимостям в коде.
Клонировать/Скачать
chapter5.txt 170 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
Отправлено 23.06.2025 22:09 06c6f61
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928
```markdown
--- vim: set filetype=markdown:
.set GIT=https://github.com/anjuke/zguide-cn
## Глава 5. Продвинутые публикация-подписка модели
Главы 3 и 4 рассказали о продвинутых методах использования модели запрос-ответ в ZMQ. Если вы уже полностью поняли эти концепции, поздравляем вас. В этой главе мы будем рассматривать модель публикация-подписка, используя верхнеуровневые модели для повышения производительности, надежности, синхронизации состояния и безопасности ZMQ.
В данной главе будут рассмотрены следующие темы:
* Обработка медленных подписчиков (модель самоубивающейся улитки)
* Быстрые подписчики (черный ящик)
* Создание общего ключевого кэша (модель клонирования)
### Обнаружение медленных подписчиков (модель самоубивающейся улитки)
При использовании модели публикация-подписка одной из наиболее распространенных проблем является обработка медленных подписчиков. Идеально, если издатель может отправлять сообщения подписчикам со скоростью, максимально приближенной к полной. Однако на практике подписчики могут требовать значительное время для обработки сообщений или быть написаны таким образом, что они не могут поспеть за скоростью издателя.
Как обрабатывать медленных подписчиков? Лучшим подходом было бы сделать подписчика более эффективным, но это потребует дополнительных усилий. Вот несколько способов решения проблемы медленных подписчиков:
```* **Хранение сообщений на стороне издателя**. Это то, как работает Gmail: если вы не читали свои электронные письма в течение нескольких часов, они будут сохранены. Однако в высокопроизводительных приложениях накопление сообщений издателем часто приводит к переполнению памяти и последующему отказу. Особенно это становится проблемой, когда есть несколько подписчиков или невозможно использовать диск для буферизации.
* **Хранение сообщений на стороне подписчика**. Этот подход лучше. По умолчанию ZMQ работает именно так. Если кто-то должен столкнуться с переполнением памяти, то это будет подписчик, а не издатель, что кажется справедливым. Однако этот подход имеет смысл только для приложений, где мгновенная нагрузка очень велика, и подписчики временно не успевают за скоростью сообщений, но в конечном итоге догонят. Но это всё ещё не решает проблему медленных подписчиков.* **Приостановка отправки сообщений**. Это также то, как работает Gmail: если объем вашего почтового ящика превышает 7.554 ГБ, новые письма будут отклонены или потеряны. Этот подход полезен для издателя, и ZMQ по умолчанию работает таким образом, если установлен порог (HWM). Однако это не решает проблему медленных подписчиков, мы просто делаем сообщения менее регулярными.
```* **Отключиться от полностью подписанных пользователей**. Это делает Hotmail, отключаясь, если пользователь не заходит в систему в течение двух недель подряд. Именно поэтому я использую пятнадцатый аккаунт Hotmail. Однако такой подход не применим в ZMQ, так как издатели не видят подписчиков и не могут с ними взаимодействовать. Кажется, что ни один из классических подходов не удовлетворяет наши потребности, поэтому нам придется придумать что-то новое. Мы можем заставить подписчика "самоуничтожиться", а не просто отключиться. Это называется "самоуничтожение-улитка" режим. Когда подписчик обнаруживает, что работает слишком медленно (определение "слишком медленно" должно быть конфигурационной опцией, и когда это происходит, он должен сообщить программисту об этом), он издает стон и самоуничтожается.```Как подписчик может обнаружить, что работает слишком медленно? Один способ — присваивать номера сообщениям и устанавливать пороговое значение на стороне публикации. Когда подписчик замечает, что номера сообщений не являются последовательными, он понимает, что что-то не так. Этот пороговый уровень будет значением, при котором подписчик решится на завершение работы.
Этот подход имеет два недостатка: во-первых, если мы подключены к нескольким публикаторам, как мы будем присваивать номера сообщениям? Решение состоит в том, чтобы присваивать каждому публикатору уникальный номер, который будет частью номера сообщения. Во-вторых, если подписчик использует опцию ZMQ_SUBSCRIBE для фильтрации сообщений, наш тщательно спроектированный механизм присвоения номеров сообщений окажется бесполезным.
Есть ситуации, где сообщения не будут фильтроваться, поэтому присвоение номеров сообщений все еще будет работать. Однако более универсальное решение заключается в том, чтобы публикаторы помечали свои сообщения временными метками, а подписчики проверяли эти метки времени при получении сообщений. Если разница между этими метками времени достигнет определенного значения, подписчик должен выдать предупреждение и завершить работу.Режим "suicide-turtle" будет особенно полезен, когда подписчики имеют собственные клиентские или сервисные соглашения, требующие максимального допустимого времени задержки. Отключение подписчика может не быть идеальным решением, но это лучше, чем создание проблем. Если подписчик получит устаревшие данные, это может привести к дополнительному повреждению данных и сделать их труднодоступными для обнаружения.Вот простейшая реализация режима "самоуничтожающаяся улитка":**suisnail: Самоубийца-улитка на C**```c
//
// Самоубийца-улитка режим
//
#include "czmq.h"
// ---------------------------------------------------------------------
// Подписчик подключается к публишеру, получает все сообщения,
// во время работы он временно приостанавливается, имитируя сложные вычисления,
// когда он обнаруживает задержку в получении сообщений более 1 секунды, он самоуничтожается.
#define MAX_ALLOWED_DELAY 1000 // миллисекунд
static void
subscriber (void *args, zctx_t *ctx, void *pipe)
{
// Подписаться на все сообщения
void *subscriber = zsocket_new (ctx, ZMQ_SUB);
zsocket_connect (subscriber, "tcp://localhost:5556");
// Получение и обработка сообщений
while (1) {
char *string = zstr_recv (subscriber);
int64_t clock;
int terms = sscanf (string, "%" PRId64, &clock);
assert (terms == 1);
free (string);
// Логика самоуничтожения
if (zclock_time () - clock > MAX_ALLOWED_DELAY) {
fprintf (stderr, "E: Подписчик не может следовать за публишером, самоуничтожение\n");
break;
}
// Работа некоторое время
zclock_sleep (1 + randof (2));
}
zstr_send (pipe, "Подписчик остановлен");
}
// ---------------------------------------------------------------------
// Публишер отправляет сообщение с меткой времени каждую миллисекунду
static void
publisher (void *args, zctx_t *ctx, void *pipe)
{
// Подготовка публишера
void *publisher = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (publisher, "tcp://*:5556");
while (1) {
// Отправка текущего времени (миллисекунды) подписчику
char string [20];
sprintf (string, "%" PRId64, zclock_time ());
zstr_send (publisher, string);
char *signal = zstr_recv_nowait (pipe);
if (signal) {
free (signal);
break;
}
``` zclock_sleep(1); // Ожидание 1 миллисекунды
}
}
// Ниже приведен код для запуска подписчика и публишера, который останавливается, когда подписчик умирает
//
int main(void)
{
zctx_t *ctx = zctx_new();
void *pubpipe = zthread_fork(ctx, publisher, NULL);
void *subpipe = zthread_fork(ctx, subscriber, NULL);
free(zstr_recv(subpipe));
zstr_send(pubpipe, "break");
zclock_sleep(100);
zctx_destroy(&ctx);
return 0;
}
```Несколько пояснений:* Пример программы включает сообщения, содержащие текущий временной штамп (в миллисекундах). В реальных приложениях вы должны использовать временные штампы как заголовки сообщений и предоставлять содержание сообщений.
* В примере программа публикатор и подписчик являются двумя потоками одного процесса. В реальных приложениях они должны быть двумя отдельными процессами. В примере это сделано для удобства демонстрации.
### Быстрый подписчик (чёрный ящик)
Одним из типичных сценариев применения паттерна публикация-подписка является масштабируемое распределённое обработание данных. Например, если требуется обрабатывать данные, собранные с финансовых рынков, можно установить публикатора в системе торговли ценными бумагами для получения информации о ценах и отправки её группе подписчиков. Если у вас много подписчиков, вы можете использовать TCP. Если количество подписчиков достигнет определённого уровня, вам следует использовать надёжные протоколы широковещательной передачи, такие как PGM.Предположим, что наш публикатор генерирует 100 000 сообщений размером 100 байт каждую секунду. Даже после удаления ненужной информации о рынке, этот темп остаётся вполне разумным. Теперь нам нужно записать данные за день (около 8 часов, что составляет около 250 ГБ) и передать их в симуляцию сети, то есть в группу подписчиков. Хотя 100 000 сообщений в секунду легко обрабатывает ZMQ, нам требуется более высокая скорость.Предположим, что у нас есть несколько машин, одна из которых работает как публикатор, а остальные — как подписчики. Все эти машины имеют 8 ядер, а у машины-публиликатора — 12 ядер.
Когда мы начинаем отправлять сообщения, стоит обратить внимание на следующие моменты:
1. Даже обрабатывая небольшие объемы данных, подписчики могут не успевать за скоростью публикатора;
1. При обработке данных со скоростью 6 Мбит/с публикатор и подписчики могут достигнуть своих пределов.
Сначала нам нужно спроектировать подписчика как многопоточное приложение, чтобы мы могли читать сообщения в одном потоке и использовать другие потоки для их обработки. Обычно для каждого типа сообщений используется свой способ обработки. Таким образом, подписчик может выполнять фильтрацию полученных сообщений, например, на основе заголовков. Когда сообщение удовлетворяет определённым условиям, подписчик передаёт его рабочему потоку для обработки. В терминологии ZMQ подписчик передаёт сообщение рабочему потоку для обработки.
Таким образом, подписчик выглядит как устройство очереди, которое можно подключить к устройству очереди и рабочим потокам различными способами. Например, можно создать одностороннюю связь, где каждый рабочий поток одинаков, используя PUSH и PULL сокеты, а распределение работы оставить на ZMQ. Это самый простой и быстрый способ.```textdiagram```plaintext
+-----------+
| |
| Publisher |
| |
+-----------+
| PUB |
\-----+-----/
|
+---------------------------|---------------------------+
: | :
: v :
: /-----------\ :
: | SUB | :
: +-----------+ :
: | | :
: | Subscriber| :
: | | :
: +-----------+ :
: | PUSH | :
: \-----+-----/ :
: | :
: | :
: /---------------+---------------\ :
: | | | :
: v v v :
: /-----------\ /-----------\ /-----------\ :
: | PULL | | PULL | | PULL | :
: +-----------+ +-----------+ +-----------+ :
: | | | | | | :
: | Worker | | Worker | | Worker | :
: | | | | | | :
: +-----------+ +-----------+ +-----------+ :
: :
+-------------------------------------------------------+
Рисунок # - Простой шаблон черной коробки
```
Связь между подписчиком и издателем осуществляется с помощью протоколов TCP или PGM, а связь между подписчиком и worker'ами происходит внутри одного процесса и поэтому использует протокол inproc.
```Давайте рассмотрим, как преодолеть ограничения производительности. Поскольку подписчик является однопоточным, когда его использование ЦП достигает 100%, он не может использовать другие ядра. Однопоточные программы всегда сталкиваются с ограничениями производительности, независимо от того, сколько ядер доступно. Нам необходимо распределить нагрузку по нескольким потокам и выполнять задачи параллельно. Многие высокопроизводительные продукты используют схему шардирования, то есть разделение рабочей нагрузки на независимые параллельные потоки. Например, половина тематических данных передается одним потоком, а другая половина — другим. Мы можем создать больше потоков, но если количество ядер процессора не увеличивается, это становится бесполезным.
Давайте рассмотрим, как можно разделить рабочую нагрузку на два потока:
```textdiagram``` +-----------+
| |
| Publisher |
| |
+-----+-----+
| PUB | PUB |
\--+--+--+--/
| |
+------------------------|--=--|------------------------+
: | | Быстрая коробка :
: v v :
: /-----+-----\ :
: | SUB | SUB | :
: +-----+-----+ :
: | | :
: | Subscriber| :
: | | :
: +-----+-----+ :
: | PUSH|PUSH | :
: \--+--+--+--/ :
: | | :
: | | :
: /------------+--+ +------------\ :
: | | | :
: v v v :
: /-----------\ /-----------\ /-----------\ :
: | PULL | | PULL | | PULL | :
: +-----------+ +-----------+ +-----------+ :
: | | | | | | :
: | Worker | | Worker | | Worker | :
: | | | | | | :
: +-----------+ +-----------+ +-----------+ :
: :
+-------------------------------------------------------+ Рисунок # - Схема черной коробки
```
В этом примере показана схема "Черная коробка" (Black Box Pattern), которая демонстрирует, как рабочая нагрузка может быть распределена между несколькими потоками и рабочими процессами. Чтобы два потока работали на полной скорости, необходимо настроить ZMQ следующим образом:
* Использовать два I/O-потока вместо одного;
* Использовать два независимых сетевых интерфейса;
* Каждый I/O-поток привязать к одному сетевому интерфейсу;
* Два подписчика, каждый из которых привязан к одному ядру процессора;
* Использовать два SUB-сокета;
* Оставшиеся ядра использовать для работы потоков;
* Рабочие потоки привязать к двум PUSH-сокетам подписчиков.
Количество создаваемых потоков должно совпадать с количеством ядер процессора. Если количество создаваемых потоков превышает количество ядер, скорость обработки будет снижена. Кроме того, использование нескольких I/O-потоков также не является обязательным.
### Общий кэш ключ-значение (клонирование)Размещение-подписка похоже на радиовещание: вы не узнаете сообщения, отправленные до того, как вы начали слушать, а количество полученных сообщений зависит от вашей способности принимать их. Удивительно, но это устройство идеально подходит для тех, кто стремится к совершенству, и широко распространено как лучший механизм для распространения сообщений в реальной жизни. Подумайте о таких приложениях, как Weibo, Twitter, форумы новостей, спортивные новости и т. д.Однако в некоторых случаях надежная модель публикации-подписки также имеет значение. Как мы обсуждали модель запрос-ответ, мы будем определять "надежность" по "сбоям". Вот некоторые проблемы, которые могут возникнуть в модели публикации-подписки:
* Подписчики подключаются слишком медленно, поэтому они не получают первоначальные сообщения от публикатора;
* Подписчики работают слишком медленно, что приводит к потере сообщений;
* Подписчики могут отключиться, и все сообщения, отправленные во время отключения, будут потеряны.
Еще есть ситуации, которые встречаются реже, но все же возможны:
* Подписчики могут прекратить работу и перезапуститься, потеряв все полученные сообщения;
Если маршрутизатор работает слишком медленно, сообщения могут быть потеряны из-за накопления в очереди и перегрузки;
* Из-за перегрузки сети сообщения могут быть потеряны (особенно при использовании протокола PGM);
* При недостаточной скорости сети сообщения могут быть перегружены публикатором, что может привести к ошибкам.
В реальных применениях помимо таких ошибок, те, что указаны выше, также обычно возникают.В ZMQ мы уже解决了某些问题,例如可以为运行缓慢的订阅者使用自毁的蛇模式。然而,对于其他问题,我们最终可以构建一个可重用的框架来创建安全的发布-订阅模型。主要困难在于,我们不知道目标应用程序将如何处理这些数据。它可能会过滤消息,仅处理其中的一部分,将其存储以供以后使用,或将它们传递给worker进行处理。情况太多,每种情况都有其独特的可靠性特点。因此,我们将该问题抽象化,以便在各种应用程序中使用。我们称这种抽象为公共键值缓存,用于按唯一键存储二进制数据块。Не следует путать эту абстракцию с распределённой таблицей хешей, которая используется для решения задач соединения узлов в распределённой сети; также не стоит её путать с распределённой ключ-значение таблицей, которая больше похожа на NoSQL базу данных. Мы создаём приложение, которое надёжно передаёт состояние памяти группе клиентов, и это должно выполнять следующие действия:
* Клиент может присоединиться к сети в любое время и получить текущее состояние сервера;
* Любой клиент может изменять ключ-значение кэш (вставка, обновление, удаление);
* Эти изменения должны быть надёжно и с минимальной задержкой переданы всем клиентам;
* Обрабатывать большое количество клиентов, сотни и тысячи.
Ключевой момент в режиме клонирования заключается в том, что клиенты будут обратно взаимодействовать с сервером, что не является обычным для простого режима публикации-подписки. Поэтому здесь я использую слова "сервер" и "клиент", а не "публикатор" и "подписчик". Мы будем использовать режим публикации-подписки как основной сообщающийся режим, но также будем использовать и другие режимы.
#### Распределение событий обновления ключ-значениеМы будем внедрять режим клонирования поэтапно. Сначала рассмотрим, как отправлять события обновления ключ-значения с сервера ко всем клиентам. Мы модифицируем модель погодного сервиса, использованную в первой главе, чтобы отправлять информацию в виде пар ключ-значение и позволяем клиентам сохранять её в виде хэш-таблицы:```textdiagram
+-------------+
| |
| Сервер |
| |
+-------------+
| PUB |
\-------------/
|
|
обновления
|
+---------------+---------------+
| | |
| | |
v v v
/------------\ /------------\ /------------\
| SUB | | SUB | | SUB |
+------------+ +------------+ +------------+
| | | | | |
| Клиент | | Клиент | | Клиент |
| | | | | |
+------------+ +------------+ +------------+
```
Рисунок # - Простейшая модель клонирования
---
## Серверная часть кода:
**clonesrv1: Клон сервер, Модель Один на C**
```c
//
// Клонирование модели серверной части 1
//
// Давайте просто скомпилируем, не создавая библиотеки
#include "kvsimple.c"
int main (void)
{
// Подготовка контекста и публикационного сокета
zctx_t *ctx = zctx_new ();
void *publisher = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (publisher, "tcp://*:5556");
zclock_sleep (200);
zhash_t *kvmap = zhash_new ();
int64_t sequence = 0;
srandom ((unsigned) time (NULL));
while (!zctx_interrupted) {
// Распределение сообщений с помощью ключ-значение пар
kvmsg_t *kvmsg = kvmsg_new (++sequence);
kvmsg_fmt_key (kvmsg, "%d", randof (10000));
kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
kvmsg_send (kvmsg, publisher);
kvmsg_store (kvmsg, kvmap);
}
printf ("Прервано\nОтправлено %lld сообщений\n", (long long) sequence);
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
return 0;
}
```
---## Клиентская часть кода:
**clonecli1: Клон клиент, Модель Один на C**
```c
//
// Клонирование модели клиентской части 1
//
// Давайте просто скомпилируем, не создавая библиотеки
#include "kvsimple.c"
int main (void)
{
// Подготовка контекста и подписного сокета
zctx_t *ctx = zctx_new ();
void *updates = zsocket_new (ctx, ZMQ_SUB);
zsocket_connect (updates, "tcp://localhost:5556");
zhash_t *kvmap = zhash_new ();
int64_t sequence = 0;
while (TRUE) {
kvmsg_t *kvmsg = kvmsg_recv (updates);
if (!kvmsg)
break; // Прерывание
kvmsg_store (&kvmsg, kvmap);
sequence++;
}
printf ("Прервано\nПолучено %lld сообщений\n", (long long) sequence);
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
return 0;
}
```
---### Несколько замечаний:
* Все сложные операции выполняются в классе kvmsg, который обрабатывает объекты сообщений типа ключ-значение. На самом деле это ZMQ-сообщение с несколькими фреймами, содержащее три фрейма: ключ (ZMQ-строка), номер (64 бита, расположенные в порядке байт) и двоичное тело (хранящее все дополнительные данные).
* Сервер случайным образом генерирует сообщения, используя четырёхзначные числа в качестве ключей, что позволяет имитировать большое количество записей вместо переполнения хеш-таблицы (10 000 записей).
* После того как сервер привязывает сокет, он ожидает 200 миллисекунд, чтобы избежать потери данных из-за задержек подключения подписчиков. Мы решим эту проблему в последующих моделях.
* Мы используем названия "публикация" и "подписка" для сокетов, используемых в программе, чтобы избежать путаницы с другими сокетами в последующих моделях. Вот упрощённый код класса kvmsg на C:
**kvsimple: Класс key-value message в C**```c
/* =====================================================================
kvsimple - простой класс сообщений ключ-значение для примеров приложений
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
Этот файл является частью руководства по ZeroMQ: http://zguide.zeromq.org
Это свободное программное обеспечение; вы можете распространять его и/или модифицировать его в соответствии с
``` условиями Генеральной общественной лицензии GNU версии 3, либо (по вашему выбору) любой более поздней версии.
Это программное обеспечение распространяется в надежде, что оно будет полезным, но
БЕЗ КАКИХ-ЛИБО ГАРАНТИЙ; даже без скрытых гарантий товарной пригодности или пригодности для определенного назначения. См. Генеральную общественную лицензию GNU для более подробной информации.
У вас должна быть копия Генеральной общественной лицензии GNU; если нет, см.
<http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "kvsimple.h"
#include "zlist.h"
// Ключ представляет собой короткую строку
#define KVMSG_KEY_MAX 255
// Сообщение форматируется в три фрейма
// фрейм 0: ключ (ZMQ строка)
// фрейм 1: номер (8 байт, в порядке возрастания)
// фрейм 2: содержимое (блок двоичных данных)
#define FRAME_KEY 0
#define FRAME_SEQ 1
#define FRAME_BODY 2
#define KVMSG_FRAMES 3
// Структура класса
struct _kvmsg {
// Фрейм существует в сообщении
int present[KVMSG_FRAMES];
// Соответствующий ZMQ фрейм сообщения
zmq_msg_t frame[KVMSG_FRAMES];
// Преобразование ключа в C-строку
char key[KVMSG_KEY_MAX + 1];
};
// ---------------------------------------------------------------------
// Конструктор, устанавливает номер
kvmsg_t *
kvmsg_new(int64_t sequence)
{
kvmsg_t
*self;
self = (kvmsg_t *) zmalloc(sizeof(kvmsg_t));
kvmsg_set_sequence(self, sequence);
return self;
}
// ---------------------------------------------------------------------
// Деструктор
// Освобождает фреймы сообщения, доступен для zhash_freefn() функции
void
kvmsg_free(void *ptr)
{
```c
if (ptr) {
kvmsg_t *self = (kvmsg_t *) ptr;
// Освобождает фреймы сообщения
int frame_nbr;
}
```
```c
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
if (self->present[frame_nbr]) {
zmq_msg_close(&self->frame[frame_nbr]);
}
}
// Освобождает объект
free(self);
}
void kvmsg_destroy(kvmsg_t **self_p) {
assert(self_p);
if (*self_p) {
kvmsg_free(*self_p);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// Читает ключ-значение сообщение с сокета и возвращает экземпляр kvmsg
kvmsg_t *kvmsg_recv(void *socket) {
assert(socket);
kvmsg_t *self = kvmsg_new(0);
// Читает все кадры, при ошибке уничтожает объект
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
if (self->present[frame_nbr]) {
zmq_msg_close(&self->frame[frame_nbr]);
}
zmq_msg_init(&self->frame[frame_nbr]);
self->present[frame_nbr] = 1;
if (zmq_recvmsg(socket, &self->frame[frame_nbr], 0) == -1) {
kvmsg_destroy(&self);
break;
}
// Проверяет мультифреймовое сообщение
int rcvmore = (frame_nbr < KVMSG_FRAMES - 1) ? 1 : 0;
if (zsockopt_rcvmore(socket) != rcvmore) {
kvmsg_destroy(&self);
break;
}
}
return self;
}
// ---------------------------------------------------------------------
// Отправляет ключ-значение сообщение на сокет, не проверяя содержимое кадров
void kvmsg_send(kvmsg_t *self, void *socket) {
assert(self);
assert(socket);
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
zmq_msg_t copy;
zmq_msg_init(&copy);
if (self->present[frame_nbr]) {
zmq_msg_copy(&copy, &self->frame[frame_nbr]);
}
zmq_sendmsg(socket, &copy,
(frame_nbr < KVMSG_FRAMES - 1) ? ZMQ_SNDMORE : 0);
zmq_msg_close(&copy);
}
}
``` // ---------------------------------------------------------------------
// Получает ключ из сообщения, если отсутствует, возвращает NULL
char *
kvmsg_key (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_KEY]) {
if (!*self->key) {
size_t size = zmq_msg_size (&self->frame [FRAME_KEY]);
if (size > KVMSG_KEY_MAX)
size = KVMSG_KEY_MAX;
memcpy (self->key,
zmq_msg_data (&self->frame [FRAME_KEY]), size);
self->key[size] = 0;
}
return self->key;
} else {
return NULL;
}
}
// ---------------------------------------------------------------------
// Возвращает номер сообщения
int64_t
kvmsg_sequence (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_SEQ]) {
assert (zmq_msg_size (&self->frame [FRAME_SEQ]) == 8);
byte *source = zmq_msg_data (&self->frame [FRAME_SEQ]);
int64_t sequence = ((int64_t)(source[0]) << 56)
+ ((int64_t)(source[1]) << 48)
+ ((int64_t)(source[2]) << 40)
+ ((int64_t)(source[3]) << 32)
+ ((int64_t)(source[4]) << 24)
+ ((int64_t)(source[5]) << 16)
+ ((int64_t)(source[6]) << 8)
+ (int64_t)(source[7]);
return sequence;
} else {
return 0;
}
}
// ---------------------------------------------------------------------
// Возвращает содержимое сообщения, если оно отсутствует, возвращает NULL
byte *
kvmsg_body (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_BODY])
return (byte *)zmq_msg_data (&self->frame [FRAME_BODY]);
else {
return NULL;
}
}
// ---------------------------------------------------------------------
// Возвращает размер содержимого сообщения
size_t
kvmsg_size (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_BODY])
return zmq_msg_size (&self->frame [FRAME_BODY]);
else {
return 0;
}
}
// ---------------------------------------------------------------------```
// Устанавливает ключ сообщения
void
kvmsg_set_key (kvmsg_t *self, char *key)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_KEY];
if (self->present [FRAME_KEY])
zmq_msg_close (msg);
zmq_msg_init_size (msg, strlen (key));
memcpy (zmq_msg_data (msg), key, strlen (key));
self->present [FRAME_KEY] = 1;
}
// ---------------------------------------------------------------------
// Устанавливает номер сообщения
void
kvmsg_set_sequence (kvmsg_t *self, int64_t sequence)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_SEQ];
if (self->present [FRAME_SEQ])
zmq_msg_close (msg);
zmq_msg_init_size (msg, 8);
byte *source = zmq_msg_data (msg);
source [0] = (byte) ((sequence >> 56) & 255);
source [1] = (byte) ((sequence >> 48) & 255);
source [2] = (byte) ((sequence >> 40) & 255);
source [3] = (byte) ((sequence >> 32) & 255);
source [4] = (byte) ((sequence >> 24) & 255);
source [5] = (byte) ((sequence >> 16) & 255);
source [6] = (byte) ((sequence >> 8) & 255);
source [7] = (byte) ((sequence) & 255);
self->present [FRAME_SEQ] = 1;
}
// ---------------------------------------------------------------------
// Устанавливает содержимое сообщения
void
kvmsg_set_body (kvmsg_t *self, byte *body, size_t size)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_BODY];
if (self->present [FRAME_BODY])
zmq_msg_close (msg);
self->present [FRAME_BODY] = 1;
zmq_msg_init_size (msg, size);
memcpy (zmq_msg_data (msg), body, size);
}
// ---------------------------------------------------------------------
// Использует printf() для форматирования ключа сообщения
void
kvmsg_fmt_key (kvmsg_t *self, char *format, ...)
{
char value [KVMSG_KEY_MAX + 1];
va_list args;
assert (self);
va_start (args, format);
vsnprintf (value, KVMSG_KEY_MAX, format, args);
va_end (args);
kvmsg_set_key (self, value);
}
// ---------------------------------------------------------------------
// Использует sprintf() для форматирования содержимого сообщения
void
kvmsg_fmt_body (kvmsg_t *self, char *format, ...)
{
char value [255 + 1];
va_list args;
assert (self);
va_start (args, format);
vsnprintf (value, 255, format, args);
va_end (args);
kvmsg_set_body (self, (byte *) value, strlen (value));
}
``````c
va_start(args, format);
vsnprintf(value, 255, format, args);
va_end(args);
kvmsg_set_body(self, (byte *)value, strlen(value));
}
// ---------------------------------------------------------------------
// Если ключ и содержимое сообщения существуют в структуре kvmsg, то они добавляются в хеш-таблицу;
// Если структура kvmsg больше не используется, она автоматически уничтожается и освобождается.
void
kvmsg_store(kvmsg_t **self_p, zhash_t *hash)
{
assert(self_p);
if (*self_p) {
kvmsg_t *self = *self_p;
assert(self);
if (self->present[FRAME_KEY]
&& self->present[FRAME_BODY]) {
zhash_update(hash, kvmsg_key(self), self);
zhash_freefn(hash, kvmsg_key(self), kvmsg_free);
}
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// Выводит содержимое сообщения в стандартный поток ошибок для отладки и отслеживания
void
kvmsg_dump(kvmsg_t *self)
{
if (self) {
if (!self) {
fprintf(stderr, "NULL");
return;
}
size_t размер = kvmsg_size(self);
byte* тело = kvmsg_body(self);
fprintf(stderr, "[seq:%" PRId64 "]", kvmsg_sequence(self));
fprintf(stderr, "[key:%s]", kvmsg_key(self));
fprintf(stderr, "[size:%zd] ", размер);
int char_nbr;
for (char_nbr = 0; char_nbr < размер; char_nbr++)
fprintf(stderr, "%02X", тело[char_nbr]);
fprintf(stderr, "\n");
} else {
fprintf(stderr, "NULL message\n");
}
}
```
Вот исправленный текст с учетом всех правил перевода и форматирования.// ---------------------------------------------------------------------
// Тестовые примеры
int
kvmsg_test(int verbose)
{
kvmsg_t
*сообщение;
printf(" * сообщение: ");
// Подготовка контекста и сокета
zctx_t *контекст = zctx_new();
void *выход = zsocket_new(контекст, ZMQ_DEALER);
int rc = zmq_bind(выход, "ipc://kvmsg_selftest.ipc");
assert(rc == 0);
void *вход = zsocket_new(контекст, ZMQ_DEALER);
rc = zmq_connect(вход, "ipc://kvmsg_selftest.ipc");
assert(rc == 0);
zhash_t *карта_сообщений = zhash_new();
// Тестирование отправки и получения простого сообщения
сообщение = kvmsg_new(1);
kvmsg_set_key(сообщение, "ключ");
kvmsg_set_body(сообщение, (byte *)"тело", 4);
if (verbose)
kvmsg_dump(сообщение);
kvmsg_send(сообщение, выход);
kvmsg_store(&сообщение, карта_сообщений);
сообщение = kvmsg_recv(вход);
if (verbose)
kvmsg_dump(сообщение);
assert(streq(kvmsg_key(сообщение), "ключ"));
kvmsg_store(&сообщение, карта_сообщений);
// Закрытие и удаление всех объектов
zhash_destroy(&карта_сообщений);
zctx_destroy(&контекст);
printf("OK\n");
return 0;
}
```
```Мы создадим более полный класс kvmsg, который можно использовать в реальных условиях.
Клиенты и серверы будут поддерживать хэш-таблицы, но этот подход требует, чтобы все клиенты запускались раньше сервера и не могли потеряться, что явно не обеспечивает надежность.
#### Создание снимков
```Чтобы новые подключенные (или восстановленные после сбоев) клиенты могли получить информацию о состоянии сервера, им нужно будет запросить снимок при подключении. Как мы упростили понятие "сообщение" до "номерованной пары ключ-значение", так и состояние можно упростить до "хэш-таблицы". Для получения состояния сервера клиент открывает REQ сокет и делает запрос:```textdiagram
+-----------------+
| |
| Сервер |
| |
+--------+--------+
| PUB | ROUTER |
\----+---+--------/
| ^
| | запрос состояния
обновления +---------------\
| |
/----------------+----------------\ |
| | | |
| | | |
v v v |
/------+-----\ /------+-----\ /------+--+--\
| SUB | REQ | | SUB | REQ | | SUB | REQ |
+------+-----+ +------+-----+ +------+-----+
| | | | | |
| Клиент | | Клиент | | Клиент |
| | | | | |
+------------+ +------------+ +------------+
Рисунок # - Репликация состояния
```
Нам нужно учитывать время, поскольку создание снимка занимает некоторое время, и нам нужно знать, с какого события обновления следует начинать обновление снимка. Сервер не знает, когда происходят события обновления. Один способ — начать подписываться на сообщения, а затем запросить снимок после получения первого сообщения. Таким образом, сервер должен сохранять снимок для каждого события обновления, что явно нереалистично.
Поэтому мы будем синхронизировать таким образом:* Клиент начинает подписываться на события обновления сервера и затем запрашивает снимок. Это гарантирует, что снимок был создан после последнего события обновления.*
* Клиент начинает ожидание снимка сервера и сохраняет события обновления в очереди. Это довольно просто — достаточно не читать сообщения из сокета; ZMQ автоматически сохраняет эти сообщения. В этом случае не следует устанавливать пороговое значение (HWM).** Когда клиент получает снимок, он снова начинает читать события обновления, но при этом ему нужно игнорировать те события, которые произошли до времени создания снимка. Например, если снимок был создан после 200 событий обновления, клиент будет читать события обновления начиная с 201.
* Затем клиент использует события обновления для обновления своего состояния.
Это довольно простая модель, так как она использует механизм очередей сообщений ZMQ. Код сервера выглядит следующим образом:
**clonesrv2: Сервер клонирования, Модель Два на C**```c
//
// Клонирование модели - серверная часть - модель 2
//
// Давайте сразу скомпилируем, не создавая библиотеки
#include "kvsimple.c"
static int s_send_single(char *key, void *data, void *args);
static void state_manager(void *args, zctx_t *ctx, void *pipe);
int main(void)
{
// Подготовка сокетов и контекста
zctx_t *ctx = zctx_new();
void *publisher = zsocket_new(ctx, ZMQ_PUB);
zsocket_bind(publisher, "tcp://*:5557");
int64_t sequence = 0;
srandom((unsigned)time(NULL));
// Запуск менеджера состояния и ожидание сигнала готовности
void *updates = zthread_fork(ctx, state_manager, NULL);
free(zstr_recv(updates));
while (!zctx_interrupted)
{
// Распределение сообщений ключ-значение
kvmsg_t *kvmsg = kvmsg_new(++sequence);
kvmsg_fmt_key(kvmsg, "%ld", randof(10000));
kvmsg_fmt_body(kvmsg, "%ld", randof(1000000));
kvmsg_send(kvmsg, publisher);
kvmsg_send(kvmsg, updates);
kvmsg_destroy(&kvmsg);
}
printf("Прервано\nОтправлено %ld сообщений\n", (long)sequence);
zctx_destroy(&ctx);
return 0;
}
```// Информация о стороннем запросе снимка
typedef struct
{
void *socket; // Сокет ROUTER для отправки снимков
zframe_t *identity; // Идентификатор запросчика
} kvroute_t;
// Отправка отдельной пары ключ-значение в снимке
// Используется объект kvmsg как носитель
static int s_send_single(char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *)args;
// Сначала отправляем идентификатор получателя
zframe_send(&kvroute->identity, kvroute->socket, ZFRAME_MORE | ZFRAME_REUSE);
kvmsg_t *kvmsg = (kvmsg_t *)data;
kvmsg_send(kvmsg, kvroute->socket);
return 0;
}
// Этот поток поддерживает состояние сервера и обрабатывает запросы на снимки.
static void state_manager(void *args, zctx_t *ctx, void *pipe)
{
zhash_t *kvmap = zhash_new();
zstr_send(pipe, "READY");
void *snapshot = zsocket_new(ctx, ZMQ_ROUTER);
zsocket_bind(snapshot, "tcp://*:5556");
zmq_pollitem_t items[] = {
{pipe, 0, ZMQ_POLLIN, 0},
{snapshot, 0, ZMQ_POLLIN, 0}
};
int64_t sequence = 0; // Текущий номер версии снимка
while (!zctx_interrupted)
{
int rc = zmq_poll(items, 2, -1);
if (rc == -1 && errno == ETERM)
break; // Прерывание контекста
}
}```c
// Ожидание событий обновления от основного потока
if (items[0].revents & ZMQ_POLLIN)
```
```c
kvmsg_t *kvmsg = kvmsg_recv(pipe);
if (!kvmsg)
break; // прерывание
sequence = kvmsg_sequence(kvmsg);
kvmsg_store(&kvmsg, kvmap);
}
// выполнение запроса на снимок
if (items[1].revents & ZMQ_POLLIN) {
zframe_t *identity = zframe_recv(snapshot);
if (!identity)
break; // прерывание
// содержимое запроса находится во второй фрейме
char *request = zstr_recv(snapshot);
if (streq(request, "ICANHAZ? ")) {
free(request);
} else {
printf("E: неверный запрос, программа завершена\n");
break;
}
// отправка снимка клиенту
kvroute_t routing = { snapshot, identity };
// отправка каждого элемента отдельно
zhash_foreach(kvmap, s_send_single, &routing);
// отправка конечного маркера, содержащего номер версии снимка
printf("Отправка снимка, номер версии %d\n", (int) sequence);
zframe_send(&identity, snapshot, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new(sequence);
kvmsg_set_key(kvmsg, "KTHXBAI");
kvmsg_set_body(kvmsg, (byte *) "", 0);
kvmsg_send(kvmsg, snapshot);
kvmsg_destroy(&kvmsg);
}
}
zhash_destroy(&kvmap);
}
```
```Вот клиентский код:**clonecli2: Клиент для клонирования, модель 2 на C**
```c
//
// Клонирование - клиент - модель 2
//
// Давайте скомпилируем напрямую, не создавая библиотеки
#include "kvsimple.c"
int main (void)
{
// Подготовка контекста и SUB сокета
zctx_t *ctx = zctx_new ();
void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (snapshot, "tcp://localhost:5556");
void *subscriber = zsocket_new (ctx, ZMQ_SUB);
zsocket_connect (subscriber, "tcp://localhost:5557");
zhash_t *kvmap = zhash_new ();
// Получение снимка
int64_t sequence = 0;
zstr_send (snapshot, "ICANHAZ?");
while (TRUE) {
kvmsg_t *kvmsg = kvmsg_recv (snapshot);
if (!kvmsg)
break; // Прерывание
if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
sequence = kvmsg_sequence (kvmsg);
printf ("Снимок получен, версия=%ld\n", (long) sequence);
kvmsg_destroy (&kvmsg);
break; // Завершение
}
kvmsg_store (&kvmsg, kvmap);
}
// Применение событий обновления из очереди, игнорирование устаревших событий
while (!zctx_interrupted) {
kvmsg_t *kvmsg = kvmsg_recv (subscriber);
if (!kvmsg)
break; // Прерывание
if (kvmsg_sequence (kvmsg) > sequence) {
sequence = kvmsg_sequence (kvmsg);
kvmsg_store (&kvmsg, kvmap);
}
else
kvmsg_destroy (&kvmsg);
}
zhash_destroy (&kvmap);
zctx_destroy (&ctx);
return 0;
}
```
Несколько замечаний:
* Клиент использует два потока: один для генерации случайных событий обновления, другой для управления состоянием. Между ними используется PAIR сокет для связи. Возможно, вы подумали бы использовать SUB сокет, но проблема "slow joiner" могла бы повлиять на работу программы. PAIR сокет обеспечивает строгое синхронное взаимодействие между двумя потоками.* Мы установили пороговое значение (HWM) для сокета updates, чтобы избежать переполнения памяти в службе обновлений. В соединении протокола inproc пороговое значение является суммой пороговых значений обоих концов соединения, поэтому его следует устанавливать отдельно для каждого конца.
* Клиент довольно прост, написан на C и состоит примерно из 60 строк кода. Большая часть работы выполняется в классе kvmsg, хотя в целом реализация модели клонирования достаточно проста.
* Мы не использовали специальные методы сериализации содержимого состояния. Пары ключ-значение представлены объектами kvmsg и хранятся в хэш-таблице. При различных временных запросах к состоянию будут получены различные снимки. Предположим, что клиент взаимодействует только с одним сервисом, и этот сервис должен быть в рабочем состоянии. В данный момент мы не рассматриваем восстановление после сбоев сервиса.
В настоящее время эти две программы ещё не работают полноценно, но уже способны правильно синхронизировать состояние. Это сочетание различных моделей сообщений: пары PAIR внутри процесса, публикация-подписка, ROUTER-DEALER и другие.
#### Пересылка событий обновления ключейВо втором модели события обновления ключей происходят от сервера, создавая централизованную модель. Однако нам требуется кэш на клиентской стороне, который может обновляться и синхронизироваться с другими клиентами. В этом случае сервер просто является бесостоятельным промежуточным слоем, что приносит следующие преимущества:* Мы можем не слишком беспокоиться о надёжности сервера, так как даже если он выйдет из строя, мы всё равно сможем получить полные данные с клиентской стороны.
* Мы можем использовать кэширование ключей для передачи данных между динамическими узлами.
События обновления ключей на клиентской стороне передаются через сокеты PUSH-PULL на сервер:
```textdiagram
+--------------------------+
| |
| Сервер |
| |
+--------+--------+--------+
| PUB | ROUTER | PULL |
\----+---+--------+--------/
| ^ ^
| | | обновление состояния
| | \---------\
| | запрос состояния|
обновления \------------\ |
| | |
/-----------+-------------\ | |
| | | |
| ^ ^ | | |
v | | v | |
/------+--+--+--+---\ /------+--+--+--+---\
| SUB | REQ | PUSH | | SUB | REQ | PUSH |
+------+-----+------+ +------+-----+------+
| | | |
| Клиент | | Клиент |
| | | |
+-------------------+ +-------------------+
``` Рисунок # - Пересылка обновлений
```
Почему бы не позволить клиенту напрямую отправлять информацию об обновлениях другим клиентам? Хотя это может снизить задержки, это также делает невозможным добавление уникального и увеличивающегося номера для каждого события обновления. Многие приложения требуют, чтобы события обновлений были отсортированы каким-то образом, и только отправка сообщений на сервер и распределение обновлений сервером гарантируют последовательность событий обновления. После получения уникального номера клиент может обнаружить дополнительные сбои: сетевые заторы или переполнение очередей. Если клиент заметит пропуск в потоке сообщений, он сможет принять меры. Возможно, вы подумаете, что в этом случае следует заставить клиента уведомлять сервер, чтобы тот отправил повторно потерянные сообщения. Однако это не всегда необходимо. Пропуск в потоке сообщений указывает на плохое состояние сети, и если предпринять такие запросы, это лишь ухудшит ситуацию. Поэтому обычно клиент просто выдает предупреждение и прекращает работу до тех пор, пока его не обслужит специалист.```Мы начинаем создание модели для обновления состояния на стороне клиента. Ниже представлен код клиента:**clonesrv3: Клоновый сервер, модель Три на C**
```c
//
// Клонирование режима серверной модели 3
//
// Прямая компиляция, без создания библиотеки
#include "kvsimple.c"
static int s_send_single(char *key, void *data, void *args);
// Информация о стороннем запросе снимка
typedef struct {
void *socket; // ROUTER сокет
zframe_t *identity; // Идентификатор запросчика
} kvroute_t;
int main(void)
{
// Подготовка контекста и сокетов
zctx_t *ctx = zctx_new();
void *snapshot = zsocket_new(ctx, ZMQ_ROUTER);
zsocket_bind(snapshot, "tcp://*:5556");
void *publisher = zsocket_new(ctx, ZMQ_PUB);
zsocket_bind(publisher, "tcp://*:5557");
void *collector = zsocket_new(ctx, ZMQ_PULL);
zsocket_bind(collector, "tcp://*:5558");
int64_t sequence = 0;
zhash_t *kvmap = zhash_new();
zmq_pollitem_t items[] = {
{ collector, 0, ZMQ_POLLIN, 0 },
{ snapshot, 0, ZMQ_POLLIN, 0 }
};
while (!zctx_interrupted) {
int rc = zmq_poll(items, 2, 1000 * ZMQ_POLL_MSEC);
// Выполнение событий обновления от клиентов
if (items[0].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv(collector);
if (!kvmsg)
break; // Прерывание
kvmsg_set_sequence(kvmsg, ++sequence);
kvmsg_send(kvmsg, publisher);
kvmsg_store(&kvmsg, kvmap);
printf("I: Опубликовано событие обновления %5d\n", (int)sequence);
}
// Ответ на запрос снимка
if (items[1].revents & ZMQ_POLLIN) {
zframe_t *identity = zframe_recv(snapshot);
if (!identity)
break; // Прерывание
// Запрос содержится во второй части сообщения
char *request = zstr_recv(snapshot);
if (streq(request, "ICANHAZ? ")) {
free(request);
} else {
printf("E: Некорректный запрос, программа завершена\n");
break;
}
// Отправка снимка
``````c
kvroute_t routing = { snapshot, identity };
// Send row by row
zhash_foreach(kvmap, s_send_single, &routing);
// Send final identifier and number
printf("I: Sending snapshot, version: %d\n", (int)sequence);
zframe_send(&identity, snapshot, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new(sequence);
kvmsg_set_key(kvmsg, "KTHXBAI");
kvmsg_set_body(kvmsg, (byte *)"", 0);
}
``` kvmsg_send(kvmsg, snapshot);
kvmsg_destroy(&kvmsg);
}
}
printf("Interrupted\nProcessed %d messages\n", (int)sequence);
zhash_destroy(&kvmap);
zctx_destroy(&ctx);
return 0;
}
``````Вот клиентский код:
**clonecli3: Клиент для клонирования, модель Three на C**```c
//
// Клонирование модели - клиент - модель 3
//
// Прямая компиляция, без создания библиотеки классов
#include "kvsimple.c"
int main(void)
{
// Подготовка контекста и SUB сокета
zctx_t *ctx = zctx_new();
void *snapshot = zsocket_new(ctx, ZMQ_DEALER);
zsocket_connect(snapshot, "tcp://localhost:5556");
void *subscriber = zsocket_new(ctx, ZMQ_SUB);
zsocket_connect(subscriber, "tcp://localhost:5557");
void *publisher = zsocket_new(ctx, ZMQ_PUSH);
zsocket_connect(publisher, "tcp://localhost:5558");
zhash_t *kvmap = zhash_new();
srandom((unsigned)time(NULL));
// Получение состояния снимка
int64_t sequence = 0;
zstr_send(snapshot, "ICANHAZ?");
while (TRUE)
{
kvmsg_t *kvmsg = kvmsg_recv(snapshot);
if (!kvmsg)
break; // Прерывание
if (streq(kvmsg_key(kvmsg), "KTHXBAI"))
{
sequence = kvmsg_sequence(kvmsg);
printf("I: Получено состояние снимка, версия: %ld\n", (long)sequence);
kvmsg_destroy(&kvmsg);
break; // Завершение
}
kvmsg_store(&kvmsg, kvmap);
}
int64_t alarm = zclock_time() + 1000;
while (!zctx_interrupted)
{
zmq_pollitem_t items[] = {{subscriber, 0, ZMQ_POLLIN, 0}};
int tickless = (int)((alarm - zclock_time()));
if (tickless < 0)
tickless = 0;
``` int rc = zmq_poll(items, 1, tickless * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Закрытие контекста
if (items[0].revents & ZMQ_POLLIN)
{
kvmsg_t *kvmsg = kvmsg_recv(subscriber);
if (!kvmsg)
break; // Прерывание
// Отбрасывание устаревших сообщений, включая пинг
if (kvmsg_sequence(kvmsg) > sequence)
{
sequence = kvmsg_sequence(kvmsg);
kvmsg_store(&kvmsg, kvmap);
printf("I: Получено событие обновления: %d\n", (int)sequence);
}
else
kvmsg_destroy(&kvmsg);
}
// Создание случайного события обновления
if (zclock_time() >= alarm)
{
kvmsg_t *kvmsg = kvmsg_new(0);
kvmsg_fmt_key(kvmsg, "%d", randof(10000));
kvmsg_fmt_body(kvmsg, "%d", randof(1000000));
kvmsg_send(kvmsg, publisher);
kvmsg_destroy(&kvmsg);
alarm = zclock_time() + 1000;
}
}
}
``````c
printf("Готово\nПолучено %d сообщений\n", (int)sequence);
zhash_destroy(&kvmap);
zctx_destroy(&ctx);
return 0;
}
```
```Несколько пояснений:
* Сервер объединён в один поток, который отвечает за сбор обновлений от клиентов и передачу их другим клиентам. Он использует PULL сокет для получения обновлений, ROUTER сокет для обработки запросов на снимки состояния и PUB сокет для публикации обновлений.
* Клиенты отправляют случайные обновления каждую секунду примерно, что в реальности происходит по инициативе приложения.
#### ПоддеревьяВ реальных системах ключ-значение могут меняться со временем, а клиенты могут нуждаться только в части этого кэша. Мы можем использовать поддеревья для решения этой проблемы: клиенты сообщают серверу, какие поддеревья они хотят получить при запросе снимка состояния, а также указывают поддеревья при подписке на обновления.Существуют различные синтаксисы для поддеревьев, включая "иерархическую путь" структуру и "тематическое дерево":
* Иерархическая путь: /some/list/of/paths
* Тематическое дерево: some.list.of.topics
В данном случае мы будем использовать структуру иерархической путь для расширения сервера и клиентов, чтобы работать с поддеревьями. Поддержка нескольких поддеревьев не является сложной задачей, поэтому мы не будем демонстрировать это здесь.
Вот код сервера, развившийся из модели 3:**clonesrv4: Клонирующий сервер, модель Четвёртая на C**```c
//
// Клонирование модели 4 для сервера
//
// Прямая компиляция, без создания библиотеки
#include "kvsimple.c"
static int s_send_single(char *key, void *data, void *args);
// Информация о стороннем запросе снимка
typedef struct {
void *socket; // ROUTER сокет
zframe_t *identity; // Идентификатор запросчика
char *subtree; // Указанный поддерево
} kvroute_t;
int main(void)
{
// Подготовка контекста и сокетов
zctx_t *ctx = zctx_new();
void *snapshot = zsocket_new(ctx, ZMQ_ROUTER);
zsocket_bind(snapshot, "tcp://*:5556");
void *publisher = zsocket_new(ctx, ZMQ_PUB);
zsocket_bind(publisher, "tcp://*:5557");
void *collector = zsocket_new(ctx, ZMQ_PULL);
zsocket_bind(collector, "tcp://*:5558");
int64_t sequence = 0;
zhash_t *kvmap = zhash_new();
zmq_pollitem_t items[] = {
{ collector, 0, ZMQ_POLLIN, 0 },
{ snapshot, 0, ZMQ_POLLIN, 0 }
};
while (! zctx_interrupted) {
int rc = zmq_poll(items, 2, 1000 * ZMQ_POLL_MSEC);
// Выполнение событий обновления от клиентов
if (items[0].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv(collector);
if (! kvmsg)
break; // Прерывание
kvmsg_set_sequence(kvmsg, ++sequence);
kvmsg_send(kvmsg, publisher);
kvmsg_store(&kvmsg, kvmap);
printf("I: Опубликовано событие обновления %5d\n", (int)sequence);
}
// Ответ на запрос снимка
if (items[1].revents & ZMQ_POLLIN) {
zframe_t *identity = zframe_recv(snapshot);
if (! identity)
break; // Прерывание
// Запрос содержится во второй части сообщения
char *request = zstr_recv(snapshot);
char *subtree = NULL;
if (streq(request, "ICANHAZ? ")) {
free(request);
subtree = zstr_recv(snapshot);
}
else {
```c
printf("E: Некорректный запрос, программа завершена\n");
break;
}
// Отправка снимка
kvroute_t routing = { snapshot, identity, subtree };
// Отправка построчно
zhash_foreach(kvmap, s_send_single, &routing);
// Отправка завершающего идентификатора и номера
printf("I: Отправка снимка, версия: %d\n", (int)sequence);
}
}
}
``````c
zframe_send(&identity, snapshot, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new(sequence);
kvmsg_set_key(kvmsg, "KTHXBAI");
kvmsg_set_body(kvmsg, (byte *)subtree, 0);
kvmsg_send(kvmsg, snapshot);
kvmsg_destroy(&kvmsg);
free(subtree);
}
}
printf("Прервано\nОбработано %d сообщений\n", (int)sequence);
zhash_destroy(&kvmap);
zctx_destroy(&ctx);
return 0;
}
// Отправка состояния ключ-значение в сокет с использованием объекта kvmsg
static int
s_send_single(char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *)args;
kvmsg_t *kvmsg = (kvmsg_t *)data;
if (strlen(kvroute->subtree) <= strlen(kvmsg_key(kvmsg))
&& memcmp(kvroute->subtree,
kvmsg_key(kvmsg), strlen(kvroute->subtree)) == 0) {
// Сначала отправляем идентификатор получателя
zframe_send(&kvroute->identity,
kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
kvmsg_send(kvmsg, kvroute->socket);
}
return 0;
}
```
Вот клиентский код:
```**clonecli4: Клиент для клонирования, модель Four на C**
```c
//
// Клонирование режима - клиент - модель 4
//
// Прямая компиляция, без создания библиотеки классов
#include "kvsimple.c"
#define SUBTREE "/client/"
int main(void)
{
// Подготовка контекста и SUB сокета
zctx_t *ctx = zctx_new();
void *snapshot = zsocket_new(ctx, ZMQ_DEALER);
zsocket_connect(snapshot, "tcp://localhost:5556");
void *subscriber = zsocket_new(ctx, ZMQ_SUB);
zsocket_connect(subscriber, "tcp://localhost:5557");
zsockopt_set_subscribe(subscriber, SUBTREE);
void *publisher = zsocket_new(ctx, ZMQ_PUSH);
zsocket_connect(publisher, "tcp://localhost:5558");
zhash_t *kvmap = zhash_new();
srandom((unsigned)time(NULL));
// Получение состояния снимка
int64_t sequence = 0;
zstr_sendm(snapshot, "ICANHAZ?");
zstr_send(snapshot, SUBTREE);
while (TRUE) {
kvmsg_t *kvmsg = kvmsg_recv(snapshot);
if (! kvmsg)
break; // Прерывание
if (streq(kvmsg_key(kvmsg), "KTHXBAI")) {
sequence = kvmsg_sequence(kvmsg);
printf("I: Получено состояние снимка, версия: %ld\n", (long)sequence);
kvmsg_destroy(&kvmsg);
break; // Завершение
}
kvmsg_store(&kvmsg, kvmap);
}
int64_t alarm = zclock_time() + 1000;
while (! zctx_interrupted) {
zmq_pollitem_t items[] = {{subscriber, 0, ZMQ_POLLIN, 0}};
int tickless = (int)((alarm - zclock_time()));
if (tickless < 0)
tickless = 0;
int rc = zmq_poll(items, 1, tickless * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Контекст закрыт
if (items[0].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv(subscriber);
if (! kvmsg)
break; // Прерывание
// Отбрасывание устаревших сообщений, включая пинг
if (kvmsg_sequence(kvmsg) > sequence) {
sequence = kvmsg_sequence(kvmsg);
kvmsg_store(&kvmsg, kvmap);
}
}
}
}
``````c
printf("I: Получено обновление, версия: %d\n", (int)sequence);
} else {
kvmsg_destroy(&kvmsg);
}
}
// Создание случайного обновления
if (zclock_time() >= alarm) {
kvmsg_t *kvmsg = kvmsg_new(0);
kvmsg_fmt_key(kvmsg, "%s%d", SUBTREE, randof(10000));
kvmsg_fmt_body(kvmsg, "%d", randof(1000000));
kvmsg_send(kvmsg, publisher);
kvmsg_destroy(&kvmsg);
alarm = zclock_time() + 1000;
}
}
printf("Готово\nПолучено %d сообщений\n", (int)sequence);
zhash_destroy(&kvmap);
zctx_destroy(&ctx);
return 0;
}
```#### Временные значения
```Временные значения относятся к тем данным, которые сразу же истёкнут. Если вы используете режим клонирования для создания службы, аналогичной DNS, то можно использовать временные значения для моделирования динамического анализа DNS. Когда узел подключается к сети, он публикует свой адрес и постоянно обновляет его. Если узел отключается, его адрес становится недействительным.
Временные значения могут быть связаны с сессией (session), и когда сессия завершается, временные значения также становятся недействительными. В режиме клонирования сессия определяется клиентом, и она исчезает, когда клиент отключается.
Проще всего установить для каждого временного значения срок действия, который клиент будет продлевать. Когда соединение прерывается, срок действия не обновляется, и сервер автоматически удаляет значение.
Мы будем использовать этот простой подход для реализации временных значений, поскольку более сложные методы могут быть излишними. Различие между ними заключается в производительности. Если у клиента много временных значений, то установка срока действия для каждого значения является правильным подходом; если количество временных значений достигает определённого уровня, то лучше связать их с сессией и управлять сроками действия вместе.Сначала нам нужно добавить срок действия в сообщение ключ-значение. Мы можем добавить дополнительный фрейм сообщения, но это означало бы, что каждый раз при добавлении нового содержимого нам нужно было бы изменять библиотеку kvmsg, что было бы неудобно. Поэтому мы добавляем один "атрибут" фрейма, который используется для добавления различных атрибутов сообщений.
Затем нам нужно организовать удаление данных. На данный момент сервер и клиент просто добавляют и изменяют данные в хэш-таблице. Мы можем определить, что если значение сообщения пустое, это означает удаление данных для данного ключа.
Ниже представлен полный код класса kvmsg, который реализует "атрибут" фрейм и фрейм UUID, который мы будем использовать позже. Этот класс также отвечает за обработку сообщений с пустыми значениями, чтобы удалить данные:
```python
class Kvmsg:
def __init__(self):
self.attributes = {}
self.uuid_frame = None
def add_attribute(self, attribute_name, value):
self.attributes[attribute_name] = value
def set_uuid_frame(self, uuid):
self.uuid_frame = uuid
def process_message(self, message):
if not message.value:
# Удаление данных для данного ключа
del self.data[message.key]
```
В этом коде `Kvmsg` класс добавляет атрибуты сообщений и управляет фреймом UUID. Также он обрабатывает сообщения с пустыми значениями для удаления данных.**kvmsg: Класс сообщения ключ-значение - полный на C**```c
/* =====================================================================
kvmsg - класс сообщений ключ-значение для примеров приложений
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
Этот файл является частью руководства по ZeroMQ: http://zguide.zeromq.org
Это свободное программное обеспечение; вы можете распространять его и/или модифицировать его в соответствии с
условиями Генеральной общественной лицензии GNU версии 3, либо (по вашему выбору) любой более поздней версии.
Это программное обеспечение распространяется в надежде, что оно будет полезным, но
БЕЗ КАКИХ-ЛИБО ГАРАНТИЙ; даже без скрытой гарантии пригодности для использования или соответствия какому-либо назначению. Смотрите Генеральную общественную лицензию GNU для более подробной информации.
Вы должны были получить копию Генеральной общественной лицензии GNU вместе с этим программным обеспечением. Если нет, смотрите
<http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "kvmsg.h"
#include <uuid/uuid.h>
#include "zlist.h"
// Ключ — короткая строка
#define KVMSG_KEY_MAX 255
// Сообщение состоит из пяти фреймов
// frame 0: ключ (ZMQ строка)
// frame 1: номер (8 байт, в порядке возрастания)
// frame 2: UUID (блок данных, 16 байт)
// frame 3: свойства (ZMQ строка)
// frame 4: значение (блок данных)
#define FRAME_KEY 0
#define FRAME_SEQ 1
#define FRAME_UUID 2
``````c
#define FRAME_PROPS 3
#define FRAME_BODY 4
#define KVMSG_FRAMES 5
// Структура класса
struct _kvmsg {
// Флаги существования фреймов
int present[KVMSG_FRAMES];
// Соответствующие фреймы сообщения
zmq_msg_t frame[KVMSG_FRAMES];
// Ключ, строка C
char key[KVMSG_KEY_MAX + 1];
// Список свойств, в формате key=value
zlist_t *props;
size_t props_size;
};
// Преобразование списка свойств в строку
static void
s_encode_props(kvmsg_t *self)
{
zmq_msg_t *msg = &self->frame[FRAME_PROPS];
if (self->present[FRAME_PROPS])
zmq_msg_close(msg);
zmq_msg_init_size(msg, self->props_size);
char *prop = zlist_first(self->props);
char *dest = (char *)zmq_msg_data(msg);
while (prop) {
strcpy(dest, prop);
dest += strlen(prop);
*dest++ = '\n';
prop = zlist_next(self->props);
}
self->present[FRAME_PROPS] = 1;
}
// Разбор атрибутов из строки
static void
s_decode_props(kvmsg_t *self)
{
zmq_msg_t *msg = &self->frame[FRAME_PROPS];
self->props_size = 0;
while (zlist_size(self->props))
free(zlist_pop(self->props));
size_t remainder = zmq_msg_size(msg);
char *prop = (char *)zmq_msg_data(msg);
char *eoln = memchr(prop, '\n', remainder);
while (eoln) {
*eoln = 0;
zlist_append(self->props, strdup(prop));
self->props_size += strlen(prop) + 1;
remainder -= strlen(prop) + 1;
prop = eoln + 1;
eoln = memchr(prop, '\n', remainder);
}
}
// ---------------------------------------------------------------------
// Конструктор, задает номер сообщения
kvmsg_t *
kvmsg_new(int64_t sequence)
{
kvmsg_t *self;
self = (kvmsg_t *)zmalloc(sizeof(kvmsg_t));
self->props = zlist_new();
kvmsg_set_sequence(self, sequence);
return self;
}
// ---------------------------------------------------------------------
// Деструктор
// Функция освобождения памяти для вызова zhash_free_fn()
void
kvmsg_free(void *ptr)
{
if (ptr) {
kvmsg_t *self = (kvmsg_t *)ptr;
// Освобождает все фреймы сообщения
int frame_nbr;
```
```c
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
if (self->present[frame_nbr]) {
zmq_msg_t *msg = &self->frame[frame_nbr];
zmq_msg_close(msg);
}
}
zlist_destroy(&self->props);
free(self);
}
}
``````c
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++)
if (self->present[frame_nbr])
zmq_msg_close(&self->frame[frame_nbr]);
// Освобождает список атрибутов
while (zlist_size(self->props))
free(zlist_pop(self->props));
zlist_destroy(&self->props);
// Освобождает объект сам по себе
free(self);
}
}
void
kvmsg_destroy(kvmsg_t **self_p)
{
assert(self_p);
if (*self_p) {
kvmsg_free(*self_p);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// Копирует объект kvmsg
kvmsg_t *
kvmsg_dup(kvmsg_t *self)
{
kvmsg_t *kvmsg = kvmsg_new(0);
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
if (self->present[frame_nbr]) {
zmq_msg_t *src = &self->frame[frame_nbr];
zmq_msg_t *dst = &kvmsg->frame[frame_nbr];
zmq_msg_init_size(dst, zmq_msg_size(src));
memcpy(zmq_msg_data(dst),
zmq_msg_data(src), zmq_msg_size(src));
kvmsg->present[frame_nbr] = 1;
}
}
kvmsg->props = zlist_copy(self->props);
return kvmsg;
}
// ---------------------------------------------------------------------
// Чтение ключевых значений из сокета и возврат объекта kvmsg
kvmsg_t *
kvmsg_recv(void *socket)
{
assert(socket);
kvmsg_t *self = kvmsg_new(0);
// Чтение всех фреймов, если произошла ошибка, возвращаем пустое значение
int frame_nbr;
for(frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++)
{
if(self->present[frame_nbr])
zmq_msg_close(&self->frame[frame_nbr]);
zmq_msg_init(&self->frame[frame_nbr]);
self->present[frame_nbr] = 1;
if(zmq_recvmsg(socket, &self->frame[frame_nbr], 0) == -1)
{
kvmsg_destroy(&self);
break;
}
// Проверка многофреймового сообщения
int rcvmore = (frame_nbr < KVMSG_FRAMES - 1) ? 1 : 0;
if(zsockopt_rcvmore(socket) != rcvmore)
{
kvmsg_destroy(&self);
break;
}
}
if(self)
s_decode_props(self);
}
``````c
return self;
}
// ---------------------------------------------------------------------
// Отправка ключевых значений в сокет, отправка даже для пустых сообщений
void
kvmsg_send(kvmsg_t *self, void *socket)
{
assert(self);
assert(socket);
s_encode_props(self);
int frame_nbr;
for(frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++)
{
zmq_msg_t copy;
zmq_msg_init(&copy);
if(self->present[frame_nbr])
zmq_msg_copy(&copy, &self->frame[frame_nbr]);
zmq_sendmsg(socket, &copy,
(frame_nbr < KVMSG_FRAMES - 1) ? ZMQ_SNDMORE : 0);
zmq_msg_close(&copy);
}
}
// ---------------------------------------------------------------------
// Возвращает ключ сообщения
char *
kvmsg_key(kvmsg_t *self)
{
assert(self);
if(self->present[FRAME_KEY])
{
if(!*self->key)
{
size_t size = zmq_msg_size(&self->frame[FRAME_KEY]);
if(size > KVMSG_KEY_MAX)
size = KVMSG_KEY_MAX;
memcpy(self->key, zmq_msg_data(&self->frame[FRAME_KEY]), size);
self->key[size] = 0;
}
return self->key;
}
else
return NULL;
}
// ---------------------------------------------------------------------
// Возвращает номер сообщения
int64_t
kvmsg_sequence(kvmsg_t *self)
{
assert(self);
if (self->present[FRAME_SEQ]) {
assert(zmq_msg_size(&self->frame[FRAME_SEQ]) == 8);
byte *source = zmq_msg_data(&self->frame[FRAME_SEQ]);
int64_t sequence = ((int64_t)(source[0]) << 56)
+ ((int64_t)(source[1]) << 48)
+ ((int64_t)(source[2]) << 40)
+ ((int64_t)(source[3]) << 32)
+ ((int64_t)(source[4]) << 24)
+ ((int64_t)(source[5]) << 16)
+ ((int64_t)(source[6]) << 8)
+ (int64_t)(source[7]);
return sequence;
} else {
return 0;
}
}
// ---------------------------------------------------------------------
// Возвращает UUID сообщения
byte *
kvmsg_uuid(kvmsg_t *self)
{
assert(self);
if (self->present[FRAME_UUID])
```
Все изменения были сделаны согласно указанным правилам. && zmq_msg_size(&self->frame[FRAME_UUID]) == sizeof(uuid_t)) {
return (byte *)zmq_msg_data(&self->frame[FRAME_UUID]);
} else {
return NULL;
}
}
// ---------------------------------------------------------------------
// Возвращает содержимое сообщения
byte *
kvmsg_body(kvmsg_t *self)
{
assert(self);
if (self->present[FRAME_BODY]) {
return (byte *)zmq_msg_data(&self->frame[FRAME_BODY]);
} else {
return NULL;
}
}
// ---------------------------------------------------------------------
// Возвращает размер содержимого сообщения
size_t
kvmsg_size(kvmsg_t *self)
{
assert(self);
if (self->present[FRAME_BODY]) {
return zmq_msg_size(&self->frame[FRAME_BODY]);
} else {
return 0;
}
}
// ---------------------------------------------------------------------
// Устанавливает ключ сообщения
void
kvmsg_set_key(kvmsg_t *self, char *key)
{
assert(self);
zmq_msg_t *msg = &self->frame[FRAME_KEY];
if (self->present[FRAME_KEY]) {
zmq_msg_close(msg);
}
zmq_msg_init_size(msg, strlen(key));
memcpy(zmq_msg_data(msg), key, strlen(key));
self->present[FRAME_KEY] = 1;
}
// ---------------------------------------------------------------------
// Устанавливает номер сообщения
void
kvmsg_set_sequence(kvmsg_t *self, int64_t sequence)
{
assert(self);
zmq_msg_t *msg = &self->frame[FRAME_SEQ];
if (self->present[FRAME_SEQ]) {
zmq_msg_close(msg);
}
zmq_msg_init_size(msg, 8);
byte *source = zmq_msg_data(msg);
source[0] = (byte)((sequence >> 56) & 255);
source[1] = (byte)((sequence >> 48) & 255);
source[2] = (byte)((sequence >> 40) & 255);
source[3] = (byte)((sequence >> 32) & 255);
source[4] = (byte)((sequence >> 24) & 255);
source[5] = (byte)((sequence >> 16) & 255);
source[6] = (byte)((sequence >> 8) & 255);
source[7] = (byte)(sequence & 255);
} self->present[FRAME_SEQ] = 1;
}
// ---------------------------------------------------------------------
// Генерация и установка UUID сообщения
void
kvmsg_set_uuid(kvmsg_t *self)
{
assert(self);
zmq_msg_t *msg = &self->frame[FRAME_UUID];
uuid_t uuid;
uuid_generate(uuid);
if (self->present[FRAME_UUID])
zmq_msg_close(msg);
zmq_msg_init_size(msg, sizeof(uuid));
memcpy(zmq_msg_data(msg), uuid, sizeof(uuid));
self->present[FRAME_UUID] = 1;
}
// ---------------------------------------------------------------------
// Установка содержимого сообщения
void
kvmsg_set_body(kvmsg_t *self, byte *body, size_t size)
{
assert(self);
zmq_msg_t *msg = &self->frame[FRAME_BODY];
if (self->present[FRAME_BODY])
zmq_msg_close(msg);
self->present[FRAME_BODY] = 1;
zmq_msg_init_size(msg, size);
memcpy(zmq_msg_data(msg), body, size);
}
// ---------------------------------------------------------------------
// Установка ключа сообщения с использованием printf() формата
void
kvmsg_fmt_key(kvmsg_t *self, char *format, ...)
{
char значение[KVMSG_KEY_MAX + 1];
va_list аргументы;
assert(self);
va_start(аргументы, format);
vsnprintf(значение, KVMSG_KEY_MAX, format, аргументы);
va_end(аргументы);
kvmsg_set_key(self, значение);
}
// ---------------------------------------------------------------------
// Установка содержимого сообщения с использованием printf() формата
void
kvmsg_fmt_body(kvmsg_t *self, char *format, ...)
{
char значение[255 + 1];
va_list аргументы;
assert(self);
va_start(аргументы, format);
vsnprintf(значение, 255, format, аргументы);
va_end(аргументы);
kvmsg_set_body(self, (byte *)значение, strlen(значение));
}
// ---------------------------------------------------------------------
// Получение свойства сообщения, если нет - возврат пустой строки
char *
kvmsg_get_prop(kvmsg_t *self, char *name)
{
```c
char *prop = zlist_first(self->props);
size_t namelen = strlen(name);
while (prop) {
if (strlen(prop) > namelen
&& memcmp(prop, name, namelen) == 0
&& prop[namelen] == '=')
return prop + namelen + 1;
prop = zlist_next(self->props);
}
return "";
}
// ---------------------------------------------------------------------
// Устанавливает свойства сообщения
// Имя свойства не может содержать знак '=', максимальная длина значения — 255 символов
void
kvmsg_set_prop(kvmsg_t *self, char *name, char *format, ...)
{
assert(strchr(name, '=') == NULL);
char value[255 + 1];
va_list args;
assert(self);
va_start(args, format);
vsnprintf(value, 255, format, args);
va_end(args);
}
``` // Выделяем память
char *prop = malloc(strlen(name) + strlen(value) + 2);
// Удаляем существующее свойство
sprintf(prop, "%s=", name);
char *existing = zlist_first(self->props);
while (existing) {
if (memcmp(prop, existing, strlen(prop)) == 0) {
self->props_size -= strlen(existing) + 1;
zlist_remove(self->props, existing);
free(existing);
break;
}
existing = zlist_next(self->props);
}```c
// Добавляем новое свойство
strcat(prop, value);
zlist_append(self->props, prop);
self->props_size += strlen(prop) + 1;
}
// ---------------------------------------------------------------------
// Сохраняет объект kvmsg в хеш-таблице
// Освобождает объект kvmsg, когда он больше не используется;
// Если переданный объект пустой, то удаляет его.
void
kvmsg_store(kvmsg_t **self_p, zhash_t *hash)
{
assert(self_p);
if (*self_p) {
kvmsg_t *self = *self_p;
assert(self);
if (kvmsg_size(self)) {
if (self->present[FRAME_KEY]
&& self->present[FRAME_BODY]) {
zhash_update(hash, kvmsg_key(self), self);
zhash_freefn(hash, kvmsg_key(self), kvmsg_free);
}
} else {
zhash_delete(hash, kvmsg_key(self));
}
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// Выводит содержимое сообщения в стандартный поток ошибок
void
kvmsg_dump(kvmsg_t *self)
{
if (self) {
if (!self) {
fprintf(stderr, "NULL");
return;
}
size_t size = kvmsg_size(self);
byte *body = kvmsg_body(self);
fprintf(stderr, "[seq:%" PRId64 "]", kvmsg_sequence(self));
fprintf(stderr, "[key:%s]", kvmsg_key(self));
fprintf(stderr, "[size:%zd] ", size);
if (zlist_size(self->props)) {
fprintf(stderr, "[");
char *prop = zlist_first(self->props);
while (prop) {
fprintf(stderr, "%s;", prop);
prop = zlist_next(self->props);
}
fprintf(stderr, "]");
}
int char_nbr;
for (char_nbr = 0; char_nbr < size; char_nbr++) {
fprintf(stderr, "%02X", body[char_nbr]);
}
fprintf(stderr, "\n");
} else {
fprintf(stderr, "NULL message\n");
}
}
```
// ---------------------------------------------------------------------
// Тестовые примеры
int
kvmsg_test(int verbose)
{
kvmsg_t *kvmsg;
printf(" * kvmsg: ");
// Подготовка контекста и сокета
zctx_t *ctx = zctx_new();
void *output = zsocket_new(ctx, ZMQ_DEALER);
int rc = zmq_bind(output, "ipc://kvmsg_selftest.ipc");
assert(rc == 0);
void *input = zsocket_new(ctx, ZMQ_DEALER);
rc = zmq_connect(input, "ipc://kvmsg_selftest.ipc");
assert(rc == 0);
zhash_t *kvmap = zhash_new();
// Тестирование отправки и получения простых сообщений
kvmsg = kvmsg_new(1);
kvmsg_set_key(kvmsg, "key");
kvmsg_set_uuid(kvmsg);
kvmsg_set_body(kvmsg, (byte *)"body", 4);
if (verbose) {
kvmsg_dump(kvmsg);
}
kvmsg_send(kvmsg, output);
kvmsg_store(&kvmsg, kvmap);
kvmsg = kvmsg_recv(input);
if (verbose) {
kvmsg_dump(kvmsg);
}
assert(streq(kvmsg_key(kvmsg), "key"));
kvmsg_store(&kvmsg, kvmap);
// Тестирование отправки и получения сообщений с атрибутами
kvmsg = kvmsg_new(2);
kvmsg_set_prop(kvmsg, "prop1", "value1");
kvmsg_set_prop(kvmsg, "prop2", "value1");
kvmsg_set_prop(kvmsg, "prop2", "value2");
kvmsg_set_key(kvmsg, "key");
kvmsg_set_uuid(kvmsg);
kvmsg_set_body(kvmsg, (byte *)"body", 4);
assert(streq(kvmsg_get_prop(kvmsg, "prop2"), "value2"));
if (verbose) {
kvmsg_dump(kvmsg);
}
kvmsg_send(kvmsg, output);
kvmsg_destroy(&kvmsg);
kvmsg = kvmsg_recv(input);
if (verbose) {
kvmsg_dump(kvmsg);
}
assert(streq(kvmsg_key(kvmsg), "key"));
assert(streq(kvmsg_get_prop(kvmsg, "prop2"), "value2"));
}
``` kvmsg_destroy(&kvmsg);
// Закрыть и уничтожить все объекты
zhash_destroy(&kvmap);
zctx_destroy(&ctx);
printf("OK\n");
return 0;
}
```Клиентская модель 5 и модель 4 не имеют значительных различий, за исключением изменения библиотеки kvmsg. При обновлении сообщений необходимо добавить свойство с временем жизни.```c
kvmsg_set_prop(kvmsg, "ttl", "%d", randof(30));
```
Серверная модель 5 претерпела значительные изменения. Мы будем использовать реактор вместо поллинга, чтобы смешивать обработку таймерных событий и событий сокетов. Это может быть сложнее реализовать на языке C. Вот пример кода:
**clonesrv5: Клоновый сервер, Модель Пять на C**
```c
//
// Клонирование - сервер - модель 5
//
// Прямая компиляция, без создания библиотеки
#include "kvmsg.c"
// Обработчик реактора
static int s_snapshots(zloop_t *loop, void *socket, void *args);
static int s_collector(zloop_t *loop, void *socket, void *args);
static int s_flush_ttl(zloop_t *loop, void *socket, void *args);
// Атрибуты сервера
typedef struct {
zctx_t *ctx; // Контекст
zhash_t *kvmap; // Хранилище пар ключ-значение
zloop_t *loop; // zloop реактор
int port; // Основной порт
int64_t sequence; // Номер события обновления
void *snapshot; // Обработка запросов на снимок
void *publisher; // Публикация событий обновления
void *collector; // Сбор событий обновления от клиентов
} clonesrv_t;
int main(void) {
clonesrv_t *self = (clonesrv_t *) zmalloc(sizeof(clonesrv_t));
self->port = 5556;
self->ctx = zctx_new();
self->kvmap = zhash_new();
self->loop = zloop_new();
zloop_set_verbose(self->loop, FALSE);
// Открытие сокета сервера клонирования
self->snapshot = zsocket_new(self->ctx, ZMQ_ROUTER);
self->publisher = zsocket_new(self->ctx, ZMQ_PUB);
self->collector = zsocket_new(self->ctx, ZMQ_PULL);
zsocket_bind(self->snapshot, "tcp://*:%d", self->port);
zsocket_bind(self->publisher, "tcp://*:%d", self->port + 1);
zsocket_bind(self->collector, "tcp://*:%d", self->port + 2);
``` // Регистрация обработчиков реактора
zloop_reader(self->loop, self->snapshot, s_snapshots, self);
zloop_reader(self->loop, self->collector, s_collector, self);
zloop_timer(self->loop, 1000, 0, s_flush_ttl, self);
// Запуск реактора до прерывания
zloop_start(self->loop);
zloop_destroy(&self->loop);
zhash_destroy(&self->kvmap);
zctx_destroy(&self->ctx);
free(self);
return 0;
}
// ---------------------------------------------------------------------
// Отправка содержимого снимка
static int s_send_single(char *key, void *data, void *args);
// Информация о запросе
typedef struct {
void *socket; // Сокет ROUTER
zframe_t *identity; // Идентификатор запроса
char *subtree; // Информация о поддереве
} kvroute_t;```c
static int
s_snapshots(zloop_t *loop, void *snapshot, void *args) {
clonesrv_t *self = (clonesrv_t *) args;
zframe_t *identity = zframe_recv(snapshot);
}
if (identity) {
// Request located in the second frame of the message
char *request = zstr_recv(snapshot);
char *subtree = NULL;
if (streq(request, "ICANHAZ? ")) {
free(request);
subtree = zstr_recv(snapshot);
} else {
printf("E: Неверный запрос, программа завершена\n");
}
if (subtree) {
// Send state snapshot
kvroute_t routing = {snapshot, identity, subtree};
zhash_foreach(self->kvmap, s_send_single, &routing);
// Send end marker and version number
zclock_log("I: Отправка снимка состояния, версия: %d", (int) self->sequence);
zframe_send(&identity, snapshot, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new(self->sequence);
kvmsg_set_key(kvmsg, "KTHXBAI");
kvmsg_set_body(kvmsg, (byte *) subtree, 0);
kvmsg_send(kvmsg, snapshot);
kvmsg_destroy(&kvmsg);
free(subtree);
}
}
return 0;
}
// Send each key-value pair in the snapshot
static int
s_send_single(char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *) args;
kvmsg_t *kvmsg = (kvmsg_t *) data;
if (strlen(kvroute->subtree) <= strlen(kvmsg_key(kvmsg))
&& memcmp(kvroute->subtree,
kvmsg_key(kvmsg), strlen(kvroute->subtree)) == 0) {
// First send receiver identifier
zframe_send(&kvroute->identity,
kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
kvmsg_send(kvmsg, kvroute->socket);
}
return 0;
}
// ---------------------------------------------------------------------
// Collect update events
static int
s_collector(zloop_t *loop, void *collector, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
kvmsg_t *kvmsg = kvmsg_recv(collector);
if (kvmsg) {
kvmsg_set_sequence(kvmsg, ++self->sequence);
kvmsg_send(kvmsg, self->publisher);
int ttl = atoi(kvmsg_get_prop(kvmsg, "ttl"));
if (ttl) {
kvmsg_set_prop(kvmsg, "ttl",
``````c
"%" PRId64, zclock_time() + ttl * 1000);
kvmsg_store(&kvmsg, self->kvmap);
zclock_log("I: Publishing update event %d", (int) self->sequence);
}
return 0;
}
// ---------------------------------------------------------------------
// Удаление просроченных временных значений
static int s_flush_single(char *key, void *data, void *args);
static int
s_flush_ttl(zloop_t *loop, void *unused, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
zhash_foreach(self->kvmap, s_flush_single, args);
return 0;
}
// Удаление просроченных ключ-значение пар и отправка события о удалении
static int
s_flush_single(char *key, void *data, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
kvmsg_t *kvmsg = (kvmsg_t *) data;
int64_t ttl;
sscanf(kvmsg_get_prop(kvmsg, "ttl"), "%" PRId64, &ttl);
if (ttl && zclock_time() >= ttl) {
kvmsg_set_sequence(kvmsg, ++self->sequence);
kvmsg_set_body(kvmsg, (byte *) "", 0);
kvmsg_send(kvmsg, self->publisher);
kvmsg_store(&kvmsg, self->kvmap);
zclock_log("I: Отправка события о удалении %d", (int) self->sequence);
}
return 0;
}
```
#### Надежность клонированного сервераКлонирование моделей 1 до 5 является относительно простым процессом, но далее мы рассмотрим очень сложную модель. Можно заметить, что для построения надежной очереди сообщений требуется много усилий. Поэтому часто задаётся вопрос: действительно ли это необходимо? Если вы готовы принять ненадёжную или достаточно надёжную архитектуру, то вы нашли баланс между затратами и выгодой. Хотя иногда могут теряться некоторые сообщения, с экономической точки зрения это может быть оправдано. В любом случае, давайте рассмотрим эту сложную модель.
В модели 3 вы будете отключать и перезапускать сервис, что приведёт к потере данных. Любые последующие клиенты смогут получить только те данные, которые были отправлены после перезапуска, а не все. Давайте попробуем найти способ сделать так, чтобы клонированная модель могла пережить перезапуск сервера.
Вот список проблем, которые нам нужно решить:
* Клонированный серверный процесс прекращает работу и автоматически или вручную перезапускается. Процесс теряет все данные, поэтому ему необходимо восстановиться из другого источника.
* Клонированный сервер испытывает аппаратные проблемы, которые длительное время не могут быть восстановлены. Клиентам необходимо переключиться на другой доступный сервер.* Клонированный сервер отключается от сети, например, из-за отказа коммутатора. Он соединяется снова в какой-то момент времени, но за это время данные должны быть обработаны запасным сервером.
Первым шагом является добавление еще одного сервера. Мы можем использовать двойственный режим, о котором говорилось в четвертой главе, который является реактором, и наша программа также может быть организована как реактор, поэтому они могут работать вместе.
Мы должны гарантировать, что события обновления будут сохранены при сбое основного сервера. Самый простой механизм — это отправка событий сразу на два сервера.
Запасной сервер можно рассматривать как клиента, который получает обновления от основного сервера, как и другие клиенты. В то же время он получает обновления от клиентов — хотя эти обновления не должны использоваться для обновления данных, они могут быть временно сохранены.
По сравнению с моделью 5, модель 6 вводит следующие характеристики:
* Клиенты отправляют события обновления через PUB-SUB сокеты вместо PUSH-PULL. Причина заключается в том, что PUSH сокеты блокируются, если нет получателя, и выполняют балансировку нагрузки — нам нужно, чтобы оба сервера получили сообщение. Мы связываем SUB сокеты на серверах и PUB сокеты на клиентах.* Мы добавляем пульсацию в события обновления, отправляемые сервером клиентам, чтобы клиенты знали, жив ли основной сервер, и могли переключиться на запасной сервер. Мы используем класс bstar реактора в режиме Gemini для создания основного и резервного узлов. В режиме Gemini требуется голосующий сокет для помощи в определении, жив ли другой узел. Здесь мы используем запросы кэша в качестве голосования.
Мы добавим UUID-атрибут для всех событий обновления, который будет генерироваться клиентом и публиковаться всем клиентам сервером.
Резервный узел будет поддерживать список ожидаемых, содержащий события обновления от клиентов, которые еще не были опубликованы сервером; или наоборот, события обновления от сервера, которые еще не были получены от клиентов. Этот список упорядочен по времени от старого к новому, что позволяет легко удалять сообщения сверху.
Мы можем спроектировать конечный автомат для клиента, состоящий из трех состояний:
* Клиент открывает и подключается к сокету, затем отправляет запрос кэша на сервер. Чтобы избежать бурю сообщений, он делает это только дважды.
* Клиент ожидает ответа на запрос кэша, если получает его, сохраняет; если нет, отправляет запрос на следующий сервер.
* Клиент получает кэш, начинает ожидание событий обновления. Если в течение определенного времени нет ответа от сервера, он подключается к следующему серверу.Клиент будет циклически повторять этот процесс, возможно, при запуске программы некоторые клиенты попытаются подключиться к основному серверу, а другие — к резервному. Уверены, что режим Gemini хорошо справится с этой ситуацией.
Мы можем изобразить диаграмму состояний клиента:
```textdiagram
+-----------+
| |<----------------------\
| Начальное |<-------------------\ |
| | | |
+-----+-----+ | |
Запрос|кэша | |
| /----------------\ | |
| | | | |
v v | | |
+-----------+ | | |
| +-INPUT--------/ |
| Синхронизация | Сохранение кэша |
| | |
| +-ПОЛУЧЕНИЕ ДАННЫХ------------/ |
+-----+-----+ Переход на следующий |
KTHXBAI |
| /----------------\ |
| | | |
v v | |
+-----------+ | |
| +-INPUT--------/ |
| Активное | Сохранение обновления |
| | |
| +-ПОЛУЧЕНИЕ ДАННЫХ---------------/
+-----------+ Переход на следующий
```
**Рисунок # - Клонирование FSM клиента**
Шаги восстановления состояния: * Клиент обнаруживает, что основной сервер перестал отправлять пинг, и переходит к подключению к резервному серверу, запрашивая новый снимок;
* Резервный сервер начинает принимать запросы на снимки и обнаруживает, что основной сервер вышел из строя, поэтому начинает работать в качестве основного сервера;
* Резервный сервер записывает события обновления из списка ожидания в своё состояние, а затем начинает обрабатывать запросы на снимки.Когда основной сервер восстанавливает соединение:
* Запускается в режиме slave и подключается к резервному серверу в режиме клонированного клиента;
* В то же время использует SUB сокет для получения событий обновления от клиента.
Мы делаем два предположения:
* По крайней мере один основной сервер продолжает работать. Если оба основных сервера вышли из строя, мы потеряем все данные сервера и не сможем восстановиться.
* Разные клиенты не будут одновременно обновлять одну и ту же пару ключ-значение. События обновления клиентов будут поступать на два сервера последовательно, поэтому порядок обновлений может отличаться. Однако порядок поступления событий обновления одного клиента на два сервера будет одинаковым, поэтому нет необходимости беспокоиться.
Вот общая схема:
```textdiagram``` +--------------------+ +--------------------+
| | Бинарный | |
| Основной |<--------------->| Резервный |
| | Стейдж | |
+-----+--------+-----+ +-----+--------+-----+
| PUB | ROUTER | SUB | | PUB | ROUTER | SUB |
\--+--+--------+-----/ \-----+--------+-----/
| ^ ^ ^
| | | |
| | | |
| | +--------------------------------------/
| | |
v | |
/-----+----+---+--+--\
| SUB | REQ | PUB |
+-----+--------+-----+
| |
| Клиент |
| |
+--------------------+ Рисунок # - Пары высокодоступных клонированных серверов
```Перед началом программирования нам необходимо преобразовать клиентскую часть в переиспользуемый класс. В ZMQ написание асинхронных классов иногда служит тренировкой по созданию элегантного кода, но здесь мы действительно хотим, чтобы режим клонирования был удобным для использования. Эластичность вышеупомянутой архитектуры зависит от правильного поведения клиента, поэтому имеет смысл обернуть его в API. Внедрение механизма восстановления после сбоев в клиентской части является сложной задачей, представьте себе, что такое сочетание свободного режима и режима клонирования. По моей привычке, я сначала создаю список API, а затем реализую его. Давайте представим API с названием clone и напишем клиентский API для режима клонирования. Упаковка кода в API явно повышает его стабильность. Возьмём модель 5 в качестве примера; клиенту требуется открыть три сокета, и имена конечных точек напрямую указаны в коде. Мы можем создать такую группу API:
```c
// Указание конечной точки для каждого сокета
clone_subscribe(clone, "tcp://localhost:5556");
clone_snapshot(clone, "tcp://localhost:5557");
clone_updates(clone, "tcp://localhost:5558");
// Поскольку есть два сервера, выполним это ещё раз
clone_subscribe(clone, "tcp://localhost:5566");
clone_snapshot(clone, "tcp://localhost:5567");
clone_updates(clone, "tcp://localhost:5568");
```Однако такой подход довольно громоздкий, поскольку нет необходимости выносить внутреннюю структуру API наружу. В настоящий момент мы используем три сокета, но в будущем может потребоваться использовать два или четыре. Невозможно заставить все приложения адаптироваться? Давайте упакуем эту информацию в API:
```c
// Указание конечных точек основного и резервного сервера
clone_connect(clone, "tcp://localhost:5551");
clone_connect(clone, "tcp://localhost:5561");
```
Таким образом, код становится очень простым, но это также влияет на внутреннюю структуру существующего кода. Нам нужно будет вычислять три конечные точки из одной. Один способ заключается в предположении, что клиент и сервер используют три последовательных порта для связи и записывают это правило в протокол; другой — запрос недостающих конечных точек у сервера. Мы используем первый, более простой метод:
* Серверный статус ROUTER находится на конечной точке P;
* Обновление событий PUB находится на конечной точке P + 1;
* Обновление событий SUB находится на конечной точке P + 2.
Класс `clone` и класс `flcliapi` из четвёртой главы очень похожи и состоят из двух частей: * Асинхронный агент режима клонирования, работающий в фоновом режиме. Этот агент управляет всеми операциями ввода-вывода и в реальном времени взаимодействует со сервером;
* Класс `clone`, который работает синхронно в переднем приложении. Когда вы создаёте объект `clone`, он автоматически создаёт фоновый поток `clone`; когда вы уничтожаете объект `clone`, этот фоновый поток также уничтожается.Передний класс `clone` использует `inproc` канал для связи с фоновым агентом. В C языке `czmq` поток автоматически создаёт этот канал для нас. Это также является обычным способом многопоточного программирования ZMQ. Если бы не было ZMQ, такая асинхронная модель была бы трудной для работы под высокими нагрузками, а ZMQ делает её простой. Написанный код будет относительно сложным. Мы можем использовать модель реактора для его написания, но это увеличит сложность и может повлиять на использование приложения. Поэтому наш API будет больше напоминать таблицу ключей для взаимодействия с сервером:
```c
clone_t *clone_new (void);
void clone_destroy (clone_t **self_p);
void clone_connect (clone_t *self, char *address, char *service);
void clone_set (clone_t *self, char *key, char *value);
char *clone_get (clone_t *self, char *key);
```
Вот пример кода для модели шестого клиента в C, поскольку он использует API, код становится очень коротким:
**clonecli6: Клиентская модель шесть в C**
```
//
// Клон-модель - клиент - модель 6
//
// Прямое компилирование, без создания библиотеки
#include "clone.c"
#define SUBTREE "/client/"
int main (void)
{
// Создание распределённой хеш-таблицы
clone_t *clone = clone_new ();
// Конфигурация
clone_subtree (clone, SUBTREE);
clone_connect (clone, "tcp://localhost", "5556");
clone_connect (clone, "tcp://localhost", "5566");
}
``` // Вставка случайных ключей и значений
while (!zctx_interrupted) {
// Генерация случайного значения
char key [255];
char value [10];
sprintf (key, "%s%d", SUBTREE, randof (10000));
sprintf (value, "%d", randof (1000000));
clone_set (clone, key, value, randof (30));
sleep (1);
}
clone_destroy (&clone);
return 0;
}
```Вот реализация класса clone:
**clone: Класс clone в C**```c
/* =====================================================================
clone - клиентская сторона Clone Pattern класса
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
Этот файл является частью руководства по ZeroMQ: http://zguide.zeromq.org
Это свободное программное обеспечение; вы можете распространять его и/или модифицировать его в соответствии с
условиями Генеральной общественной лицензии GNU версии 3, выпущенной Free Software Foundation, или (по вашему выбору) любой более поздней версией.
Это программное обеспечение распространяется в надежде, что оно будет полезным, но
БЕЗ КАКИХ-ЛИБО ГАРАНТИЙ; даже без неявной гарантии
КОММЕРЧЕСКОЙ НАПРАВЛЕННОСТИ или ПРИГОДНОСТИ ДЛЯ ОПРЕДЕЛЕННЫХ ЦЕЛЕЙ. Смотрите Генеральную общественную лицензию GNU для более подробной информации.
Вы должны были получить копию Генеральной общественной лицензии GNU вместе с этим программным обеспечением. Если нет, смотрите
<http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "clone.h"
// Время ожидания запроса
#define GLOBAL_TIMEOUT 4000 // мсек
// Время жизни сервера
#define SERVER_TTL 5000 // мсек
// Количество серверов
#define SERVER_MAX 2
// =====================================================================
// Синхронная часть, работающая в потоке приложения
// ---------------------------------------------------------------------
// Структура класса
struct _clone_t {
```c
zctx_t *ctx; // контекст
void *pipe; // канал связи с агентом в фоновом режиме
};
// Этот поток используется для обработки настоящего класса clone
static void clone_agent(void *args, zctx_t *ctx, void *pipe);
// ---------------------------------------------------------------------
// Конструктор
clone_t *
clone_new(void)
{
clone_t *self;
self = (clone_t *)zmalloc(sizeof(clone_t));
self->ctx = zctx_new();
self->pipe = zthread_fork(self->ctx, clone_agent, NULL);
return self;
}
// ---------------------------------------------------------------------
// Деструктор
void
clone_destroy(clone_t **self_p)
{
assert(self_p);
if (*self_p) {
clone_t *self = *self_p;
zctx_destroy(&self->ctx);
free(self);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// Указание поддерева для создания снимка и события обновления перед отправкой в агента-посредника
// Содержимое сообщения: [SUBTREE][subtree]
void clone_subtree(clone_t *self, char *subtree)
{
assert(self);
zmsg_t *msg = zmsg_new();
zmsg_addstr(msg, "SUBTREE");
zmsg_addstr(msg, subtree);
zmsg_send(&msg, self->pipe);
}
// ---------------------------------------------------------------------
// Подключение к новому серверному конечному узлу
// Содержимое сообщения: [CONNECT][endpoint][service]
void clone_connect(clone_t *self, char *address, char *service)
{
assert(self);
zmsg_t *msg = zmsg_new();
zmsg_addstr(msg, "CONNECT");
zmsg_addstr(msg, address);
zmsg_addstr(msg, service);
zmsg_send(&msg, self->pipe);
}
// ---------------------------------------------------------------------
// Установка нового значения
// Содержимое сообщения: [SET][key][value][ttl]
void clone_set(clone_t *self, char *key, char *value, int ttl)
{
char ttlstr[10];
sprintf(ttlstr, "%d", ttl);
assert(self);
zmsg_t *msg = zmsg_new();
zmsg_addstr(msg, "SET");
zmsg_addstr(msg, key);
zmsg_addstr(msg, value);
zmsg_addstr(msg, ttlstr);
}
``` zmsg_send(&msg, self->pipe);
}
// ---------------------------------------------------------------------
// Получение значения
// Содержимое сообщения: [GET][key]
// Возвращает NULL, если клон недоступен
char *clone_get(clone_t *self, char *key)
{
assert(self);
assert(key);
zmsg_t *msg = zmsg_new();
zmsg_addstr(msg, "GET");
zmsg_addstr(msg, key);
zmsg_send(&msg, self->pipe);
zmsg_t *reply = zmsg_recv(self->pipe);
if (reply)
{
char *value = zmsg_popstr(reply);
zmsg_destroy(&reply);
return value;
}
return NULL;
}
// =====================================================================
// Асинхронная часть, выполняющаяся в фоновом режиме
// ---------------------------------------------------------------------
// Информация о единичном серверном узле
typedef struct
{
char *address; // адрес серверного узла
int port; // порт
void *snapshot; // сокет для создания снимка
void *subscriber; // сокет для получения событий обновления
uint64_t expiry; // время истечения срока действия сервера
uint requests; // количество запросов на создание снимков
} server_t;static server_t *```c
server_new (zctx_t *ctx, char *address, int port, char *subtree)
{
server_t *self = (server_t *) zmalloc (sizeof (server_t));
zclock_log ("I: adding server %s:%d. . . ", address, port);
self->address = strdup (address);
self->port = port;
self->snapshot = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (self->snapshot, "%s:%d", address, port);
self->subscriber = zsocket_new (ctx, ZMQ_SUB);
zsocket_connect (self->subscriber, "%s:%d", address, port + 1);
zsockopt_set_subscribe (self->subscriber, subtree);
return self;
}
static void
server_destroy (server_t **self_p)
{
assert (self_p);
if (*self_p) {
server_t *self = *self_p;
free (self->address);
free (self);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// Backend agent
// States
#define STATE_INITIAL 0 // Before connection
#define STATE_SYNCING 1 // During synchronization
#define STATE_ACTIVE 2 // During update
typedef struct {
zctx_t *ctx; // Context
void *pipe; // Socket for communication with the main thread
zhash_t *kvmap; // Key-value map
char *subtree; // Subtree
server_t *server [SERVER_MAX];
uint nbr_servers; // Range: 0 - SERVER_MAX
uint state; // Current state
uint cur_server; // Current master, 0/1
int64_t sequence; // Key-value pair number
void *publisher; // Socket for publishing update events
} agent_t;
static agent_t *
agent_new (zctx_t *ctx, void *pipe)
{
agent_t *self = (agent_t *) zmalloc (sizeof (agent_t));
self->ctx = ctx;
self->pipe = pipe;
self->kvmap = zhash_new ();
self->subtree = strdup ("");
self->state = STATE_INITIAL;
self->publisher = zsocket_new (self->ctx, ZMQ_PUB);
return self;
}
static void
agent_destroy (agent_t **self_p)
{
assert (self_p);
if (*self_p) {
agent_t *self = *self_p;
int server_nbr;
for (server_nbr = 0; server_nbr < self->nbr_servers; server_nbr++)```c
zmsg_t *msg = zmsg_recv(self->pipe);
char *command = zmsg_popstr(msg);
if (command == NULL)
return -1;
if (streq(command, "SUBTREE")) {
free(self->subtree);
self->subtree = zmsg_popstr(msg);
}
else if (streq(command, "CONNECT")) {
char *address = zmsg_popstr(msg);
char *service = zmsg_popstr(msg);
if (self->nbr_servers < SERVER_MAX) {
self->server[self->nbr_servers++] = server_new(
self->ctx, address, atoi(service), self->subtree);
// Publish update event
zsocket_connect(self->publisher, "%s:%d",
address, atoi(service) + 2);
} else {
zclock_log("E: слишком много серверов (макс. %d)", SERVER_MAX);
}
free(address);
free(service);
}
else if (streq(command, "SET")) {
char *key = zmsg_popstr(msg);
char *value = zmsg_popstr(msg);
char *ttl = zmsg_popstr(msg);
zhash_update(self->kvmap, key, (byte *)value);
zhash_freefn(self->kvmap, key, free);
// Send key-value pair to server
kvmsg_t *kvmsg = kvmsg_new(0);
kvmsg_set_key(kvmsg, key);
kvmsg_set_uuid(kvmsg);
kvmsg_fmt_body(kvmsg, "%s", value);
kvmsg_set_prop(kvmsg, "ttl", ttl);
kvmsg_send(kvmsg, self->publisher);
kvmsg_destroy(&kvmsg);
puts(key);
free(ttl);
free(key); // Key-value pairs are actually controlled by the hash map object
}
else if (streq(command, "GET")) {
char *key = zmsg_popstr(msg);
char *value = zhash_lookup(self->kvmap, key);
if (value)
zstr_send(self->pipe, value);
else
zstr_send(self->pipe, "");
free(key);
free(value);
}
free(command);
zmsg_destroy(&msg);
return 0;
}
```// ---------------------------------------------------------------------
// Асинхронный фоновый агент поддерживает пул серверов и обрабатывает запросы или ответы от приложений.
static void
clone_agent(void *args, zctx_t *ctx, void *pipe)
{
agent_t *self = agent_new(ctx, pipe);
while (TRUE) {
zmq_pollitem_t poll_set[] = {
{ pipe, 0, ZMQ_POLLIN, 0 },
{ 0, 0, ZMQ_POLLIN, 0 }
};
int poll_timer = -1;
int poll_size = 2;
server_t *server = self->server[self->cur_server];
switch (self->state) {
case STATE_INITIAL:
// В этом состоянии, если есть доступные серверы, отправляется запрос на получение снимка
if (self->nbr_servers > 0) {
zclock_log("I: Ожидание сервера %s:%d... ",
server->address, server->port);
if (server->requests < 2) {
zstr_sendm(server->snapshot, "ICANHAZ?");
zstr_send(server->snapshot, self->subtree);
server->requests++;
}
server->expiry = zclock_time() + SERVER_TTL;
self->state = STATE_SYNCING;
poll_set[1].socket = server->snapshot;
} else {
poll_size = 1;
}
break;
case STATE_SYNCING:
// В этом состоянии мы принимаем снимок данных от сервера, при неудаче пробуем другой сервер
poll_set[1].socket = server->snapshot;
break;
case STATE_ACTIVE:
// В этом состоянии мы получаем события обновления от сервера, при неудаче пробуем другой сервер
poll_set[1].socket = server->subscriber;
break;
}
if (server) {
poll_timer = (server->expiry - zclock_time()) * ZMQ_POLL_MSEC;
if (poll_timer < 0) {
poll_timer = 0;
}
}
}
} // ------------------------------------------------------------
// Цикл zmq_poll
int rc = zmq_poll(poll_set, poll_size, poll_timer);
if (rc == -1)
break; // Контекст был закрыт
if (poll_set[0].revents & ZMQ_POLLIN) {
if (agent_control_message(self))
break; // Прерывание
} else if (poll_set[1].revents & ZMQ_POLLIN) {
kvmsg_t *kvmsg = kvmsg_recv(poll_set[1].socket);
if (!kvmsg)
break; // Прерывание
// Любое сообщение от сервера обновляет его время истечения
server->expiry = zclock_time() + SERVER_TTL;
if (self->state == STATE_SYNCING) {
// Сохранение снимка данных
server->requests = 0;
if (streq(kvmsg_key(kvmsg), "KTHXBAI")) {
self->sequence = kvmsg_sequence(kvmsg);
self->state = STATE_ACTIVE;
zclock_log("I: received from %s:%d snapshot=%d",
server->address, server->port,
(int) self->sequence);
kvmsg_destroy(&kvmsg);
} else {
kvmsg_store(&kvmsg, self->kvmap);
}
} else if (self->state == STATE_ACTIVE) {
// Отбрасываем просроченные события обновления
if (kvmsg_sequence(kvmsg) > self->sequence) {
self->sequence = kvmsg_sequence(kvmsg);
kvmsg_store(&kvmsg, self->kvmap);
zclock_log("I: received from %s:%d update=%d",
server->address, server->port,
(int) self->sequence);
} else {
kvmsg_destroy(&kvmsg);
}
}
} else {
// Сервер недоступен, пробуем другой сервер
zclock_log("I: server %s:%d unavailable",
server->address, server->port);
} self->cur_server = (self->cur_server + 1) % self->nbr_servers;
self->state = STATE_INITIAL;
}
}
agent_destroy(&self);
}
```
```Последним является код модели 6-клонирующего сервера:**clonesrv6: Клонирующий сервер, Модель Шесть на C**
```c
//
// Клонирование - серверная часть - модель 6
//
// Прямая компиляция, без создания библиотеки
#include "bstar.c"
#include "kvmsg.c"
// API для bstar реактора
static int s_snapshots(zloop_t *loop, void *socket, void *args);
static int s_collector(zloop_t *loop, void *socket, void *args);
static int s_flush_ttl(zloop_t *loop, void *socket, void *args);
static int s_send_hugz(zloop_t *loop, void *socket, void *args);
static int s_new_master(zloop_t *loop, void *unused, void *args);
static int s_new_slave(zloop_t *loop, void *unused, void *args);
static int s_subscriber(zloop_t *loop, void *socket, void *args);
// Атрибуты сервера
typedef struct {
zctx_t *ctx; // Контекст
zhash_t *kvmap; // Хранение пар ключ-значение
bstar_t *bstar; // Ядро bstar реактора
int64_t sequence; // Номер события обновления
int port; // Основной порт
int peer; // Порт для партнера
void *publisher; // Порт для публикации событий обновления
void *collector; // Порт для сбора событий обновления от клиентов
void *subscriber; // Порт для подписки на события обновления от партнера
zlist_t *pending; // Список отложенных событий обновления
bool primary; // Является ли основным сервером
bool master; // Является ли мастером
bool slave; // Является ли слейвом
} clonesrv_t;
``````c
int main(int argc, char *argv[]) {
clonesrv_t *self = (clonesrv_t *) zmalloc(sizeof(clonesrv_t));
if (argc == 2 && streq(argv[1], "-p")) {
zclock_log("I: Запущен как основной сервер мастер, ожидаю подключения слейва.");
self->bstar = bstar_new(BSTAR_PRIMARY, "tcp://*:5003", "tcp://localhost:5004");
bstar_voter(self->bstar, "tcp://*:5556", ZMQ_ROUTER, s_snapshots, self);
self->port = 5556;
self->peer = 5566;
self->primary = TRUE;
} else if (argc == 2 && streq(argv[1], "-b")) {
zclock_log("I: Запущен как слейв, ожидаю подключения мастера.");
self->bstar = bstar_new(BSTAR_BACKUP, "tcp://*:5004", "tcp://localhost:5003");
bstar_voter(self->bstar, "tcp://*:5566", ZMQ_ROUTER, s_snapshots, self);
self->port = 5566;
self->peer = 5556;
self->primary = FALSE;
} else {
printf("Использование: clonesrv4 { -p | -b }\n");
free(self);
exit(0);
}
// Хост станет мастером
if (self->primary) {
self->kvmap = zhash_new();
}
self->ctx = zctx_new();
self->pending = zlist_new();
bstar_set_verbose(self->bstar, TRUE);
// Устанавливаем сокет сервера клонирования
self->publisher = zsocket_new(self->ctx, ZMQ_PUB);
self->collector = zsocket_new(self->ctx, ZMQ_SUB);
zsocket_bind(self->publisher, "tcp://*:%d", self->port + 1);
zsocket_bind(self->collector, "tcp://*:%d", self->port + 2);
// Подключаемся как клиент клонирования к партнеру
self->subscriber = zsocket_new(self->ctx, ZMQ_SUB);
zsocket_connect(self->subscriber, "tcp://localhost:%d", self->peer + 1);
// Регистрируем обработчики событий состояния
bstar_new_master(self->bstar, s_new_master, self);
bstar_new_slave(self->bstar, s_new_slave, self);
// Регистрируем обработчики других событий в реакторе bstar
zloop_reader(bstar_zloop(self->bstar), self->collector, s_collector, self);
zloop_timer(bstar_zloop(self->bstar), 1000, 0, s_flush_ttl, self);
zloop_timer(bstar_zloop(self->bstar), 1000, 0, s_send_hugz, self);
}
``` // Запускаем реактор bstar
bstar_start(self->bstar);
// Прерывание, завершение.
while (zlist_size(self->pending)) {
kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop(self->pending);
kvmsg_destroy(&kvmsg);
}
zlist_destroy(&self->pending);
bstar_destroy(&self->bstar);
zhash_destroy(&self->kvmap);
zctx_destroy(&self->ctx);
free(self);
return 0;
}
// ---------------------------------------------------------------------
// Отправка содержимого снимка
static int s_send_single(char *key, void *data, void *args);
// Информация о запросе
typedef struct {
void *socket; // ROUTER сокет
zframe_t *identity; // Идентификатор запроса
char *subtree; // Поддерево
} kvroute_t;
static int
s_snapshots(zloop_t *loop, void *snapshot, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
zframe_t *identity = zframe_recv(snapshot);
if (identity) {
// Запрос находится во второй фрейме сообщения
char *request = zstr_recv(snapshot);
char *subtree = NULL;
if (streq(request, "ICANHAZ?")) {
free(request);
subtree = zstr_recv(snapshot);
}
else {
printf("E: Ошибочный запрос, завершение работы...\n");
}
if (subtree) {
// Отправка состояния
kvroute_t routing = { snapshot, identity, subtree };
zhash_foreach(self->kvmap, s_send_single, &routing);
// Отправка сообщения о завершении, а также номера сообщения
zclock_log("I: Отправка состояния, версия: %d", (int) self->sequence);
zframe_send(&identity, snapshot, ZFRAME_MORE);
kvmsg_t *kvmsg = kvmsg_new(self->sequence);
kvmsg_set_key(kvmsg, "KTHXBAI");
kvmsg_set_body(kvmsg, (byte *) subtree, 0);
kvmsg_send(kvmsg, snapshot);
kvmsg_destroy(&kvmsg);
free(subtree);
}
}
return 0;
}
// Каждый раз отправляем пару ключ-значение состояния
static int```c
s_send_single(char *key, void *data, void *args)
{
kvroute_t *kvroute = (kvroute_t *) args;
kvmsg_t *kvmsg = (kvmsg_t *) data;
if (strlen(kvroute->subtree) <= strlen(kvmsg_key(kvmsg)))
&& memcmp(kvroute->subtree,
kvmsg_key(kvmsg), strlen(kvroute->subtree)) == 0) {
// First send the recipient address
zframe_send(&kvroute->identity,
kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
kvmsg_send(kvmsg, kvroute->socket);
}
return 0;
}
// Collect updates from clients
// If we are master, record the event in the kvmap object;
// If we are slave, write it to the delay queue
static int s_was_pending(clonesrv_t *self, kvmsg_t *kvmsg);
static int
s_collector(zloop_t *loop, void *collector, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
kvmsg_t *kvmsg = kvmsg_recv(collector);
kvmsg_dump(kvmsg);
if (kvmsg) {
if (self->master) {
kvmsg_set_sequence(kvmsg, ++self->sequence);
kvmsg_send(kvmsg, self->publisher);
int ttl = atoi(kvmsg_get_prop(kvmsg, "ttl"));
if (ttl)
kvmsg_set_prop(kvmsg, "ttl",
"%" PRId64, zclock_time() + ttl * 1000);
kvmsg_store(&kvmsg, self->kvmap);
zclock_log("I: Publishing update event: %d", (int) self->sequence);
} else {
// If we have already received this event from the master, discard the message
if (s_was_pending(self, kvmsg))
kvmsg_destroy(&kvmsg);
else
zlist_append(self->pending, kvmsg);
}
}
return 0;
}
// If the message is already in the delay list, delete it and return TRUE
static int
s_was_pending(clonesrv_t *self, kvmsg_t *kvmsg)
{
kvmsg_t *held = (kvmsg_t *) zlist_first(self->pending);
while (held) {
if (memcmp(kvmsg_uuid(kvmsg),
kvmsg_uuid(held), sizeof(uuid_t)) == 0) {
```
zlist_remove(self->pending, held);
return TRUE;
}
held = (kvmsg_t *) zlist_next(self->pending);
}
return FALSE;
}
// ---------------------------------------------------------------------
// Удаление мгновенного значения с истекшим сроком действия
static int s_flush_single(char *key, void *data, void *args);
static int
s_flush_ttl(zloop_t *loop, void *unused, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
zhash_foreach(self->kvmap, s_flush_single, args);
return 0;
}
// Если ключ-значение истекло, то выполняется удаление и публикуется событие
static int
s_flush_single(char *key, void *data, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
kvmsg_t *kvmsg = (kvmsg_t *) data;
int64_t ttl;
sscanf(kvmsg_get_prop(kvmsg, "ttl"), "%" PRId64, &ttl);
if (ttl && zclock_time() >= ttl) {
kvmsg_set_sequence(kvmsg, ++self->sequence);
kvmsg_set_body(kvmsg, (byte *) "", 0);
kvmsg_send(kvmsg, self->publisher);
kvmsg_store(&kvmsg, self->kvmap);
zclock_log("I: Удаление события отправлено: %d", (int) self->sequence);
}
return 0;
}
// ---------------------------------------------------------------------
// Отправка пульса
static int
s_send_hugz(zloop_t *loop, void *unused, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
kvmsg_t *kvmsg = kvmsg_new(self->sequence);
kvmsg_set_key(kvmsg, "HUGZ");
kvmsg_set_body(kvmsg, (byte *) "", 0);
kvmsg_send(kvmsg, self->publisher);
kvmsg_destroy(&kvmsg);
return 0;
}
// ---------------------------------------------------------------------
// Обработчик событий изменения состояния
// Мы переходим в состояние master
//
// Второстепенный сервер сначала обновляет события из списка задержек в свой снимок,
// а затем начинает принимать запросы на снимки от клиентов.
static int
s_new_master(zloop_t *loop, void *unused, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
return 0;
}Также стоит отметить, что язык исходного текста был определён как китайский, поэтому перевод выполнен на русский язык, сохраняя структуру и форматирование исходного текста.
```c
self->master = True;
self->slave = False;
zloop_cancel(bstar_zloop(self->bstar), self->subscriber);
// Обрабатываем события из списка задержек
while (zlist_size(self->pending)) {
kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop(self->pending);
kvmsg_set_sequence(kvmsg, ++self->sequence);
kvmsg_send(kvmsg, self->publisher);
kvmsg_store(&kvmsg, self->kvmap);
zclock_log("I: Отправка событий из списка задержек: %d", (int) self->sequence);
}
return 0;
}
// ---------------------------------------------------------------------
// Переключаемся в режим slave
static int
s_new_slave(zloop_t *loop, void *unused, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
zhash_destroy(&self->kvmap);
self->master = False;
self->slave = True;
zloop_reader(bstar_zloop(self->bstar), self->subscriber,
s_subscriber, self);
return 0;
}
// ---------------------------------------------------------------------
// Получаем обновления от мастера
// При получении этих событий мы должны быть в режиме slave
static int
s_subscriber(zloop_t *loop, void *subscriber, void *args)
{
clonesrv_t *self = (clonesrv_t *) args;
// Получаем снимок, если это необходимо
if (self->kvmap == NULL) {
self->kvmap = zhash_new();
void *snapshot = zsocket_new(self->ctx, ZMQ_DEALER);
zsocket_connect(snapshot, "tcp://localhost:%d", self->peer);
zclock_log("I: Запрос снимка: tcp://localhost:%d",
self->peer);
zstr_send(snapshot, "ICANHAZ? ");
while (True) {
kvmsg_t *kvmsg = kvmsg_recv(snapshot);
if (! kvmsg)
break; // Прерывание
if (streq(kvmsg_key(kvmsg), "KTHXBAI")) {
self->sequence = kvmsg_sequence(kvmsg);
```
Замечания:
1. Удалены лишние пробелы после комментариев.
2. Исправлены знаки препинания и пробелы.
3. Переведены текстовые сообщения внутри кода.```markdown
kvmsg_destroy(&kvmsg);
break; // Завершение
}
kvmsg_store(&kvmsg, self->kvmap);
}
zclock_log("I: Получен снимок, версия: %d", (int) self->sequence);
zsocket_destroy(self->ctx, snapshot);
}
// Поиск и удаление
kvmsg_t *kvmsg = kvmsg_recv(subscriber);
if (! kvmsg)
return 0;
if (strneq(kvmsg_key(kvmsg), "HUGZ")) {
if (! s_was_pending(self, kvmsg)) {
// Если событие обновления master поступает раньше, чем событие клиента, то событие master добавляется в список задержек,
// и при получении события обновления от клиента, оно удаляется из списка.
zlist_append(self->pending, kvmsg_dup(kvmsg));
}
// Если событие обновления имеет большую последовательность, чем версия kvmap, применяется это событие
if (kvmsg_sequence(kvmsg) > self->sequence) {
self->sequence = kvmsg_sequence(kvmsg);
kvmsg_store(&kvmsg, self->kvmap);
zclock_log("I: Получено событие обновления: %d", (int)self->sequence);
} else {
kvmsg_destroy(&kvmsg);
}
} else {
kvmsg_destroy(&kvmsg);
}
return 0;
}
```
Этот программный код состоит всего из нескольких сотен строк, но всё равно занял некоторое время для отладки. В этой модели включены механизмы восстановления после сбоев, мгновенные значения, поддеревья и прочее. Несмотря на то, что мы тщательно спроектировали его заранее, отладка между несколькими сокетами всё ещё представляет собой сложную задачу. Ниже приведён мой подход к работе:
```* Используя реактор (bstar, основанный на zloop), мы значительно сократили объём кода, сделав программу более понятной и компактной. Вся служба работает в одном потоке, поэтому нет проблем с синхронизацией между потоками. Достаточно передать указатель на структуру (self) всем обработчикам. Кроме того, использование реактора позволяет сделать код более модульным и удобным для повторного использования.
* Мы отладывали каждый модуль по отдельности, переходя к следующему только тогда, когда текущий работал корректно. Поскольку используется четыре-пять сокетов, это увеличивает объём работы по отладке. Я выводил информацию об отладке непосредственно на экран, так как не было необходимости использовать отдельный отладочный инструмент.
* Поскольку я постоянно использовал инструмент valgrind для тестирования, я уверен, что программа не имеет утечек памяти. В C-языке утечки памяти являются серьёзной проблемой, поскольку нет автоматической системы сбора мусора. Правильное использование абстракций, таких как kvmsg и czmq, помогает избежать утечек памяти.
В этом коде, конечно, могут быть ошибки, и некоторые читатели могут помочь мне в их отладке и исправлении, за что я благодарен.При тестировании модели 6 сначала запускаются основной и резервный серверы, а затем группа клиентов. Порядок запуска может быть произвольным. Случайным образом прекращается работа одного из сервисных процессов; если дизайн программы правильный, клиенты должны получать согласованные данные.#### Клонирование протокола
После затраченных усилий на создание надёжной модели публикации-подписки, мы хотим, чтобы её можно было легко расширять в будущем. Лучшим способом является создание протокола, который можно реализовать на различных языках программирования.
Мы называем его "протоколом распределённой хэш-таблицы", это механизм управления хэш-таблицами в распределённых системах, предоставляющий возможности для многоклиентского взаимодействия; клиенты могут работать только с поддеревьями данных, включая обновление и установку мгновенных значений.
* http://rfc.zeromq.org/spec:12

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/andwp-zguide-cn.git
git@api.gitlife.ru:oschina-mirror/andwp-zguide-cn.git
oschina-mirror
andwp-zguide-cn
andwp-zguide-cn
master