1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/andwp-zguide-cn

В этом репозитории не указан файл с открытой лицензией (LICENSE). При использовании обратитесь к конкретному описанию проекта и его зависимостям в коде.
Клонировать/Скачать
chapter4.txt 270 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
Отправлено 23.06.2025 22:09 06c6f61
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089309030913092309330943095309630973098309931003101310231033104310531063107310831093110311131123113311431153116311731183119312031213122312331243125312631273128312931303131313231333134313531363137313831393140314131423143314431453146314731483149315031513152315331543155315631573158315931603161316231633164316531663167316831693170317131723173317431753176317731783179318031813182318331843185318631873188318931903191319231933194319531963197319831993200320132023203320432053206320732083209321032113212321332143215321632173218321932203221322232233224322532263227322832293230323132323233323432353236323732383239324032413242324332443245324632473248324932503251325232533254325532563257325832593260326132623263326432653266326732683269327032713272327332743275327632773278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377737783779378037813782378337843785378637873788378937903791379237933794379537963797379837993800380138023803380438053806380738083809381038113812381338143815381638173818381938203821382238233824382538263827382838293830383138323833383438353836383738383839384038413842384338443845384638473848384938503851385238533854385538563857385838593860386138623863386438653866386738683869387038713872387338743875387638773878387938803881388238833884388538863887388838893890389138923893389438953896389738983899390039013902390339043905390639073908390939103911391239133914391539163917391839193920392139223923392439253926392739283929393039313932393339343935393639373938393939403941394239433944394539463947394839493950395139523953395439553956395739583959396039613962396339643965396639673968396939703971397239733974397539763977397839793980398139823983398439853986398739883989399039913992399339943995399639973998399940004001400240034004400540064007400840094010401140124013401440154016401740184019402040214022402340244025402640274028402940304031403240334034403540364037403840394040404140424043404440454046404740484049405040514052405340544055405640574058405940604061406240634064406540664067406840694070407140724073407440754076407740784079408040814082408340844085408640874088408940904091409240934094409540964097409840994100410141024103410441054106410741084109411041114112411341144115411641174118411941204121412241234124412541264127
```markdown
.vim: set filetype=markdown:
.set GIT=https://github.com/anjuke/zguide-cn
## Reliable Request-Reply Pattern
In Chapter 3, we explored the advanced request-reply pattern using examples. In this chapter, we will discuss the reliability of the request-reply pattern and create a reliable request-reply system using the socket types provided by ZMQ.
The following topics will be covered in this chapter:
* Client Request-Reply
* Least Recently Used Queue
* Heartbeating
* Service Queue
* Disk-Based Queue (offline)
* Primary and Backup Services
* Request-Reply Without Middleware
### What is Reliability?
To define reliability, we can first define its opposite concept — failure. If we can handle certain types of failures, then our model is reliable with respect to these failures. Let's examine possible issues in distributed applications using ZMQ, starting with the most likely failures:
```* Программный код приложения является основным источником отказов. Программа может прекратить работу, перестать отвечать на запросы или отвечать слишком медленно, что приведёт к исчерпанию памяти и т.д.
* Код системы, такой как промежуточное программное обеспечение, написанное с использованием ZMQ, также может внезапно прекратить работу. Код системы должен быть более надёжным, чем код приложения, но всё же он может выйти из строя, особенно если система работает с очень медленными клиентами, что может привести к исчерпанию памяти.
* Превышение размера очереди сообщений, что обычно происходит из-за того, что система не активно управляет медленными клиентами, позволяя очереди сообщений переполниться.
* Временные сбои сети, приводящие к потере сообщений. Такие ошибки ZMQ-приложений могут не обнаружить сразу, так как ZMQ автоматически выполняет повторное подключение.
* Сбой аппаратной системы, приводящий к завершению всех процессов.
* Специфические сбои сети, такие как отказ порта коммутатора, что делает часть сети недоступной.
* Центры обработки данных могут столкнуться с грозой, землетрясением, пожаром, перегрузкой напряжения, отказом системы охлаждения и т.д.Создание программной системы, способной избегать всех вышеупомянутых рисков, требует значительных затрат времени и ресурсов, что выходит за рамки данного руководства.
Так как первые пять типов отказов охватывают 99,9% случаев (эта статистика основана на недавнем исследовании), мы будем подробно рассматривать их. Если ваша компания достаточно крупна, чтобы учитывать последние два случая, свяжитесь со мной, так как мне нужна финансовая помощь для создания бассейна на моем заднем дворе.
### Проектирование надежности
Кратко говоря, надежность — это способность системы продолжать работать даже при возникновении отказов. Это намного сложнее, чем создание системы передачи сообщений. Мы рассмотрим каждый из ключевых типов сообщений, предоставляемых ZMQ, и обсудим способы обеспечения непрерывной работы кода.
Пожалуйста, обратите внимание, что в исходном тексте использовались китайские символы, а в переводе они заменены на русские. Также были адаптированы некоторые специфические термины и метафоры для лучшего понимания русскоязычного читателя. * Паттерн запрос-ответ: когда сервер прерывает обработку запроса, клиент узнает об этом и прекращает получение сообщений, выбирая ожидание повторной попытки, обращение к другому серверу и т.д. В данном случае мы не будем рассматривать ситуации с проблемами клиента.* Паттерн публикация-подписка: если клиент внезапно завершает работу после получения некоторых сообщений, сервер этого не узнает. Подписчики в паттерне публикация-подписка не отправляют никаких сообщений издателю обратно. Однако подписчики могут связаться с сервером другими способами, такими как паттерн запрос-ответ, и потребовать повторной отправки сообщений. В данном случае мы не будем рассматривать ситуации с проблемами сервера. Кроме того, подписчики могут проверять, не работают ли они слишком медленно, и принимать соответствующие меры (выдавать предупреждения оператору, останавливать работу и т. д.).
* Паттерн трубопровод: если worker внезапно завершает работу, задачи-распределитель не узнает об этом. Паттерн трубопровод похож на паттерн публикация-подписка тем, что отправляет сообщения только в одном направлении. Однако конечный приемник результатов может обнаружить, какие задачи не были завершены, и сообщить задачи-распределителю перераспределить эти задачи. Если задачи-распределитель или конечный приемник результатов внезапно завершают работу, то запросы от клиента должны быть обработаны по-другому. Таким образом, системному коду действительно следует минимизировать вероятность ошибок, поскольку это сложно обрабатывать.В этой главе рассматриваются методы обеспечения надежности в паттерне запрос-ответ. Другие паттерны будут рассмотрены в последующих главах.
Самый базовый паттерн запрос-ответ состоит в том, чтобы REQ-клиент отправлял синхронный запрос REP-серверу. Надежность такого паттерна очень низкая. Если сервер прерывает обработку запроса, клиент будет находиться в состоянии ожидания вечно.
В отличие от протокола TCP, ZMQ предоставляет автоматическое восстановление соединения, балансировку нагрузки при распределении сообщений и т. д. Однако в реальных условиях этого недостаточно. Единственный случай, когда можно полностью доверять базовому паттерну запрос-ответ, это общение между двумя потоками в одном процессе, где нет проблем с сетью или отказа сервера.
Однако, добавив некоторые модификации, этот базовый паттерн запрос-ответ может хорошо работать в реальных условиях. Я люблю называть его "пиратским" паттерном.
Кратко говоря, есть три основных способа, которыми клиент может подключаться к серверу, каждый из которых требует различных методов обеспечения надежности:* Множество клиентов непосредственно взаимодействуют с одним сервером. Сценарий использования: один центральный сервер, все клиенты должны общаться с ним. Проблемы для решения: отказ сервера и перезапуск; прерывание сетевого соединения.*
* Несколько клиентов и один серверный очередной механизм общаются друг с другом, при этом механизм распределяет запросы между несколькими серверами. Сценарий использования: распределение задач. Обрабатываемые сбои: аварийное завершение работы рабочего процесса и перезапуск, зацикливание, перегрузка; аварийное завершение работы очередного механизма и перезапуск; прерывание сети.** Несколько клиентов непосредственно взаимодействуют с несколькими серверами без промежуточного программного обеспечения. Сценарий использования: распределённые службы, аналогичные DNS. Обрабатываемые сбои: аварийное завершение работы сервера и перезапуск, зацикливание, перегрузка; прерывание сетевого соединения.
Каждый из этих вариантов дизайна имеет свои ограничения, и часто они используются в сочетании друг с другом. Далее мы подробнее рассмотрим этот вопрос.
### Реализация надёжности клиента (ленивый пиратский режим)
Мы можем достичь надёжного режима запрос-ответ, выполнив простые настройки на стороне клиента. Я назову это "ленивым пиратом" (Lazy Pirate) режимом.
При получении ответа мы не ждём его синхронно, а выполняем следующие действия:
* Проверяем REQ сокет на наличие сообщений, принимаем сообщение только при его наличии;
* После истечения времени ожидания запрос повторяется несколько раз;
* Если ответ так и не получен, текущая транзакция завершается.```textdiagram
+-----------+ +-----------+ +-----------+
| Client | | Client | | Client |
+-----------+ +-----------+ +-----------+
| Retry | | Retry | | Retry |
+-----------+ +-----------+ +-----------+
| REQ | | REQ | | REQ |
\-----------/ \-----------/ \-----------/
^ ^ ^
| | |
\---------------+---------------/
|
v
/-------------\
| REP |
+-------------+
| |
| Server |
| |
+-------------+
``` Рисунок # - Ленивый пиратский режим
```
При использовании REQ сокета важно строго соблюдать последовательность отправки и получения данных, так как внутри него используется конечный автомат для управления состоянием. Это свойство может создать проблемы при применении "пиратского" режима. Самый простой способ — закрыть и перезапустить REQ сокет, чтобы разорвать ограничение.
```**lpclient: Ленивый пиратский клиент на C**
```c
//
// Lazy Pirate client
// Использует zmq_poll для безопасного запроса-ответа
// При выполнении программы случайным образом закрывается или перезапускается lpserver
//
#include "czmq.h"
#define REQUEST_TIMEOUT 2500 // миллисекунды, (> 1000! )
#define REQUEST_RETRIES 3 // количество попыток
#define SERVER_ENDPOINT "tcp://localhost:5555"
int main (void)
{
zctx_t *ctx = zctx_new ();
printf ("I: Подключаюсь к серверу... \n");
void *client = zsocket_new (ctx, ZMQ_REQ);
assert (client);
zsocket_connect (client, SERVER_ENDPOINT);
int sequence = 0;
int retries_left = REQUEST_RETRIES;
while (retries_left && !zctx_interrupted) {
// Отправляем запрос и начинаем получать ответ
char request [10];
sprintf (request, "%d", ++sequence);
zstr_send (client, request);
int expect_reply = 1;
while (expect_reply) {
// Опросим сокет и установим таймаут
zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Прерывание
// Обрабатываем ответ, если он получен
if (items [0].revents & ZMQ_POLLIN) {
// Сервер вернул ответ, который должен совпадать с номером запроса
char *reply = zstr_recv (client);
if (!reply)
break; // Прерывание
if (atoi (reply) == sequence) {
printf ("I: Сервер вернул нормальный ответ (%s)\n", reply);
retries_left = REQUEST_RETRIES;
expect_reply = 0;
}
else
printf ("E: Сервер вернул ненормальный ответ: %s\n",
reply);
free (reply);
}
else
if (--retries_left == 0) {
``````c
printf("E: Сервер недоступен, отмена операции\n");
break;
}
else {
printf("W: Сервер не отвечает, повторная попытка...\n");
// Закрываем старый сокет и создаем новый
zsocket_destroy(ctx, client);
printf("I: Подключение к серверу...\n");
client = zsocket_new(ctx, ZMQ_REQ);
zsocket_connect(client, SERVER_ENDPOINT);
}
}
}
zctx_destroy(&ctx);
return 0;
}
```
```**lpserver: Ленивый пират сервер на C**```
```c
//
// Ленивый пират сервер
// Привязывает REQ сокет к tcp://*:5555
// Аналогично hwserver программе, за исключением следующих моментов:
// - Прямое вывод сообщения запроса
// - Случайное замедление работы или завершение программы, моделирующее сбой
//
#include "zhelpers.h"
int main (void)
{
srandom ((unsigned) time (NULL));
void *context = zmq_init (1);
void *server = zmq_socket (context, ZMQ_REP);
zmq_bind (server, "tcp://*:5555");
int cycles = 0;
while (1) {
char *request = s_recv (server);
cycles++;
// После нескольких циклов начинаем моделировать различные сбои
if (cycles > 3 && randof (3) == 0) {
printf ("I: Моделируем программный сбой\n");
break;
}
else if (cycles > 3 && randof (3) == 0) {
printf ("I: Моделируем перегрузку процессора\n");
sleep (2);
}
printf ("I: Нормальный запрос (%s)\n", request);
sleep (1); // Время затратной операции
s_send (server, request);
free (request);
}
zmq_close (server);
zmq_term (context);
return 0;
}
```
При запуске этого тестового примера можно открыть два терминала: один для сервера, который случайным образом будет моделировать сбои, и другой для клиента, чтобы увидеть его реакцию. Типичный вывод сервера:
```c
I: Нормальный запрос (1)
I: Нормальный запрос (2)
I: Нормальный запрос (3)
I: Моделируем перегрузку процессора
I: Нормальный запрос (4)
I: Моделируем программный сбой
```
Вывод клиента:```
I: подключаюсь к серверу...
I: сервер ответил OK (1)
I: сервер ответил OK (2)
I: сервер ответил OK (3)
W: нет ответа от сервера, повторяем попытку...
I: подключаюсь к серверу...
W: нет ответа от сервера, повторяем попытку...
I: подключаюсь к серверу...
E: сервер кажется недоступным, прекращаем работу
```Клиент добавляет номер последовательности к каждому запросу и проверяет, совпадает ли полученный ответ с номером последовательности, чтобы гарантировать, что ни один запрос или ответ не был потерян, и что один и тот же ответ не получен несколько раз или в неправильном порядке. Повторите несколько раз, чтобы убедиться, что это действительно работает. В реальных условиях вам не потребуется использовать последовательность, это было сделано только для демонстрации того, что этот подход работает.
Клиент использует REQ сокет для отправки запросов и открывает новый сокет при возникновении проблем, чтобы обойти обязательное правило отправки/получения REQ. Вы можете подумать о использовании DEALER сокета, но это плохая идея. Во-первых, DEALER не обрабатывает оболочку так, как это делает REQ (если вы не знаете, что такое оболочка, то использование DEALER еще хуже). Во-вторых, вы можете получить результат, которого не ожидали. Этот вариант имеет следующие преимущества и недостатки:
* Преимущества: прост в реализации и понимании;
* Преимущества: легко интегрировать с существующими клиентскими и серверными программами;
* Преимущества: ZMQ имеет автоматическую систему повторного подключения;
* Недостатки: при отказе одного сервера невозможно переадресовать запросы на другой доступный сервер.### Базовая надежная очередь (простой пиратский режим)
Во втором режиме мы используем устройство очереди для расширения "ленивого пирата" режима, чтобы клиент мог прозрачно взаимодействовать с несколькими серверами. В данном контексте серверы могут быть определены как рабочие процессы (workers). Мы можем начать с базовой модели и поэтапно внедрять этот подход.
Во всех пиратских режимах рабочие процессы являются бесштатными, то есть существует некий общедоступный состояние, такое как общая база данных. Присутствие устройства очереди позволяет рабочим процессам входить и выходить без ведома клиента. Когда один рабочий процесс прекращает работу, другой может его заменить. Эта топология очень проста, но её единственный недостаток заключается в том, что само устройство очереди может стать узким местом и источником однопунктовых отказов.
В третьей главе базовый алгоритм устройства очереди основан на методе наименьших недавних использований (LRU). Если рабочий процесс умирает или блокируется, что нам следует сделать? Ответ — практически ничего. Мы уже внедрили механизм повторной попытки в клиенте, поэтому использование базового LRU-устройства очереди будет работать хорошо. Этот подход также соответствует логике ZMQ, поэтому мы можем расширить его, вставив простое устройство очереди между точками взаимодействия:```textdiagram```
+-----------+ +-----------+ +-----------+
| Клиент | | Клиент | | Клиент |
+-----------+ +-----------+ +-----------+
| Повтор | | Повтор | | Повтор |
+-----------+ +-----------+ +-----------+
| REQ | | REQ | | REQ |
\-----------/ \-----------/ \-----------/
^ ^ ^
| | |
\---------------+---------------/
|
v
/-----------\
| ROUTER |
+-----------+
| LRU |
| Очередь |
+-----------+
| ROUTER |
\-----------/
^
|
/---------------+---------------\
| | |
v v v
/-----------\ /-----------\ /-----------\
| REQ | | REQ | | REQ |
+-----------+ +-----------+ +-----------+
| LRU | | LRU | | LRU |
| Рабочий | | Рабочий | | Рабочий |
+-----------+ +-----------+ +-----------+
Рисунок # - Простой пиратский шаблон
Можно использовать клиент из шаблона "ленивого пирата". Ниже приведён код очереди:
**spqueue: Простая пиратская очередь на C**
```c
//
// Простая пиратская очередь
//
// Этот механизм полностью аналогичен LRU-очереди и не содержит никаких средств обеспечения надежности. Работа механизма поддерживается за счет повторных попыток клиента.
//
#include "czmq.h"
#define LRU_READY "\001" // Сообщение: worker готов к работе
int main(void)
{
// Подготовка контекста и сокетов
zctx_t *ctx = zctx_new();
void *frontend = zsocket_new(ctx, ZMQ_ROUTER);
void *backend = zsocket_new(ctx, ZMQ_ROUTER);
zsocket_bind(frontend, "tcp://*:5555"); // Клиентский конечный пункт
zsocket_bind(backend, "tcp://*:5556"); // Конечный пункт worker
// Очередь доступных worker'ов
zlist_t *workers = zlist_new();
while (1) {
zmq_pollitem_t items[] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
// Когда есть доступные worker'ы, опросить передний конечный пункт
int rc = zmq_poll(items, zlist_size(workers) ? 2 : 1, -1);
if (rc == -1)
break; // Прервать
// Обработать сообщения worker'ов с заднего конечного пункта
if (items[0].revents & ZMQ_POLLIN) {
// Используйте адрес worker'а для LRU-очереди
zmsg_t *msg = zmsg_recv(backend);
if (!msg)
break; // Прервать
zframe_t *address = zmsg_unwrap(msg);
zlist_append(workers, address);
// Если сообщение не является READY, переслать его клиенту
zframe_t *frame = zmsg_first(msg);
if (memcmp(zframe_data(frame), LRU_READY, 1) == 0)
zmsg_destroy(&msg);
else
zmsg_send(&msg, frontend);
}
if (items[1].revents & ZMQ_POLLIN) {
// Получить запрос клиента и переслать первому доступному worker'у
zmsg_t *msg = zmsg_recv(frontend);
}
``````c
if (msg) {
zmsg_wrap (msg, (zframe_t *) zlist_pop (workers));
zmsg_send (&msg, backend);
}
}
}
// По завершении работы программы выполнить очистку
while (zlist_size (workers)) {
zframe_t *frame = (zframe_t *) zlist_pop (workers);
zframe_destroy (&frame);
}
zlist_destroy (&workers);
zctx_destroy (&ctx);
return 0;
}
```Вот код worker'а, который использует службу "ленивых пиратов" и настраивает его в режиме LRU (используя REQ сокет для передачи сигнала "готовности"):
```**spworker: Простой Pirate worker на C**
```c
//
// Простой пиратский worker
//
// Использует REQ сокет для подключения к tcp://*:5556, реализует worker с использованием алгоритма LRU
//
#include "czmq.h"
#define LRU_READY "\001" // Сообщение: worker готов
int main (void)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_REQ);
// Использует случайный символ для указания идентификатора сокета, что облегчает отслеживание
srandom ((unsigned) time (NULL));
char identity [10];
sprintf (identity, "%04X-%04X", randof (0x10000), randof (0x10000));
zmq_setsockopt (worker, ZMQ_IDENTITY, identity, strlen (identity));
zsocket_connect (worker, "tcp://localhost:5556");
// Уведомляет агента о том, что worker готов
printf ("I: (%s) worker готов\n", identity);
zframe_t *frame = zframe_new (LRU_READY, 1);
zframe_send (&frame, worker, 0);
int cycles = 0;
while (1) {
zmsg_t *msg = zmsg_recv (worker);
if (!msg)
break; // Прерывание
// После нескольких циклов начинает имитировать различные проблемы
cycles++;
if (cycles > 3 && randof (5) == 0) {
printf ("I: (%s) имитация отказа\n", identity);
zmsg_destroy (&msg);
break;
}
else if (cycles > 3 && randof (5) == 0) {
printf ("I: (%s) имитация перегрузки CPU\n", identity);
sleep (3);
if (zctx_interrupted)
break;
}
printf ("I: (%s) нормальный ответ\n", identity);
sleep (1); // Выполняет некоторые действия
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return 0;
}
```Запустив этот пример, запустите несколько worker'ов, один client и одну очередь устройств в любом порядке. Вы заметите, что все worker'ы рано или поздно выйдут из строя или умрут, а client будет многократно пытаться повторить попытки и в конечном итоге сдаться. Устройство никогда не прекращает работу, и вы можете перезапускать worker'ы и client'ы сколько угодно. Эта модель может работать с любым количеством worker'ов и client'ов.### Надежная очередь (параноидальный пиратский режим)
Режим "простого пирата" работает очень хорошо, главным образом потому, что он представляет собой сочетание двух существующих режимов. Однако у него есть и некоторые недостатки:
* Этот режим не может обрабатывать сбои или перезапуск очередей. Client будет повторять попытки, но worker не будет перезапущен. Хотя ZMQ автоматически восстановит соединение сокета worker, для новых запущенных устройств очереди worker будет считаться отсутствующим, так как он не отправил сообщение «готовности». Для решения этой проблемы нам необходимо отправлять пульсы от устройства очереди к worker, чтобы тот знал, когда очередь умерла.
* Устройство очереди не проверяет, жив ли worker, поэтому если worker умирает в состоянии покоя, устройство очереди удалит его из очереди только после отправки запроса. В этом случае client ничего сделать не может, кроме как ждать. Это не является критической проблемой, но все же недостаточно хорошим решением. Поэтому нам необходимо отправлять пульсы от worker к устройству очереди, чтобы последнее знало, когда worker умер.
Мы используем метод, известный как "параноидальный пиратский режим", чтобы решить вышеупомянутые две проблемы.Ранее мы использовали REQ сокеты в качестве типа сокета worker, но в параноидальном пиратском режиме мы будем использовать DEALER сокеты, что позволит нам отправлять и принимать сообщения произвольно, а не выполнять цикл отправки-приема, как это делает REQ сокет. Недостатком DEALER является то, что нам придётся самостоятельно управлять оболочками сообщений. Если вы не знаете, что такое оболочка, прочитайте об этом в третьей главе.```textdiagram``````
+-----------+ +-----------+ +-----------+
| Клиент | | Клиент | | Клиент |
+-----------+ +-----------+ +-----------+
| Повтор | | Повтор | | Повтор |
+-----------+ +-----------+ +-----------+
| REQ | | REQ | | REQ |
\-----------/ \-----------/ \-----------/
^ ^ ^
| | |
\---------------+---------------/
|
v
/-----------\
| ROUTER |
+-----------+
| Очередь |
+-----------+
| Heartbeat |
+-----------+
| ROUTER |
\-----------/
^
|
/---------------+---------------\
| | |
v v v
/-----------\ /-----------\ /-----------\
| DEALER | | DEALER | | DEALER |
+-----------+ +-----------+ +-----------+
| Heartbeat | | Heartbeat | | Heartbeat |
+-----------+ +-----------+ +-----------+
| Рабочий | | Рабочий | | Рабочий |
+-----------+ +-----------+ +-----------+
```````
Рисунок # - Паттерн Paranoid Pirate
```**ppqueue: Параноидальная пиратская очередь на C**
```c
//
// Параноидальная пиратская очередь
//
#include "czmq.h"
#define HEARTBEAT_LIVENESS 3 // Уровень живучести сердцебиения, значение от 3 до 5 является разумным
#define HEARTBEAT_INTERVAL 1000 // Единица измерения: миллисекунды
// Коды сообщений протокола параноидального пирата
#define PPP_READY "\001" // Рабочий процесс готов
#define PPP_HEARTBEAT "\002" // Сердцебиение рабочего процесса
// Используется следующая структура для представления одного действительного рабочего процесса в списке рабочих процессов
typedef struct {
zframe_t *address; // Адрес рабочего процесса
char *identity; // Печатаемое имя сокета
int64_t expiry; // Время истечения срока действия
} worker_t;
// Создание нового рабочего процесса
static worker_t *
s_worker_new(zframe_t *address)
{
worker_t *self = (worker_t *)zmalloc(sizeof(worker_t));
self->address = address;
self->identity = zframe_strdup(address);
self->expiry = zclock_time() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
return self;
}
// Уничтожение структуры рабочего процесса, включая его идентификатор
static void
s_worker_destroy(worker_t **self_p)
{
assert(self_p);
if (*self_p) {
worker_t *self = *self_p;
zframe_destroy(&self->address);
free(self->identity);
free(self);
*self_p = NULL;
}
}
// Обработка события готовности рабочего процесса, перемещение рабочего процесса в конец списка
static void
s_worker_ready(worker_t *self, zlist_t *workers)
{
worker_t *worker = (worker_t *)zlist_first(workers);
while (worker) {
if (streq(self->identity, worker->identity)) {
zlist_remove(workers, worker);
s_worker_destroy(&worker);
break;
}
}
}
``````c
worker = (worker_t *)zlist_next(workers);
}
zlist_append(workers, self);
}
// Возврат адреса следующего доступного рабочего процесса
static zframe_t *
s_workers_next(zlist_t *workers)
{
worker_t *worker = zlist_pop(workers);
assert(worker);
zframe_t *frame = worker->address;
worker->address = NULL;
s_worker_destroy(&worker);
return frame;
}
// Поиск и уничтожение просроченных рабочих процессов.
// Поскольку самый старый рабочий процесс находится в начале списка, поиск прекращается при первом непросроченном рабочем процессе.
static void
s_workers_purge(zlist_t *workers)
{
worker_t *worker = (worker_t *)zlist_first(workers);
while (worker) {
if (zclock_time() < worker->expiry)
break; // Рабочий процесс не просрочен, прекращаем поиск
zlist_remove(workers, worker);
s_worker_destroy(&worker);
}
}
// Возврат адреса первого доступного рабочего процесса
worker = (worker_t *) zlist_first(workers);
}
int main(void)
{
zctx_t *ctx = zctx_new();
void *frontend = zsocket_new(ctx, ZMQ_ROUTER);
void *backend = zsocket_new(ctx, ZMQ_ROUTER);
zsocket_bind(frontend, "tcp://*:5555"); // Клиентский конечный пункт
zsocket_bind(backend, "tcp://*:5556"); // Рабочий конечный пункт
// Список доступных рабочих процессов
zlist_t *workers = zlist_new();
// Регулярная отправка пульса
uint64_t heartbeat_at = zclock_time() + HEARTBEAT_INTERVAL;
while (1) {
zmq_pollitem_t items[] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
// Когда есть доступные рабочие процессы, опрашивать передний конечный пункт
int rc = zmq_poll(items, zlist_size(workers) ? 2 : 1,
HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Прерывание
// Обработка запросов от рабочего процесса
if (items[0].revents & ZMQ_POLLIN) {
``````c
// использовать адрес рабочего процесса для LRU маршрутизации
zmsg_t *msg = zmsg_recv(backend);
if (!msg)
break; // прерывание
// любой сигнал от рабочего процесса указывает на его активность
zframe_t *address = zmsg_unwrap(msg);
worker_t *worker = s_worker_new(address);
s_worker_ready(worker, workers);
// обработка команд управления или пересылка ответа клиенту
if (zmsg_size(msg) == 1) {
zframe_t *frame = zmsg_first(msg);
if (memcmp(zframe_data(frame), PPP_READY, 1)
|| memcmp(zframe_data(frame), PPP_HEARTBEAT, 1)) {
printf("E: invalid message from worker");
zmsg_dump(msg);
}
zmsg_destroy(&msg);
} else {
zmsg_send(&msg, frontend);
}
}
if (items[1].revents & ZMQ_POLLIN) {
// получение следующего запроса от клиента и передача его следующему доступному рабочему процессу
zmsg_t *msg = zmsg_recv(frontend);
if (!msg)
break; // прерывание
zmsg_push(msg, s_workers_next(workers));
zmsg_send(&msg, backend);
}
// отправка пульса свободным рабочим процессам
if (zclock_time() >= heartbeat_at) {
worker_t *worker = (worker_t *)zlist_first(workers);
while (worker) {
zframe_send(&worker->address, backend,
ZFRAME_REUSE + ZFRAME_MORE);
zframe_t *frame = zframe_new(PPP_HEARTBEAT, 1);
zframe_send(&frame, backend, 0);
worker = (worker_t *)zlist_next(workers);
}
heartbeat_at = zclock_time() + HEARTBEAT_INTERVAL;
}
s_workers_purge(workers);
}
// Программа завершена, производится очистка
while (zlist_size(workers)) {
```
worker_t *worker = (worker_t *) zlist_pop(workers);
s_worker_destroy(&worker);
}
zlist_destroy(&workers);
zctx_destroy(&ctx);
return 0;
}
```
Этот модуль очереди расширяет LRU-модель с использованием механизма пульса, что кажется простым, но придумать такую идею довольно сложно. В следующем разделе будет более подробно рассмотрен механизм пульса.Вот код работника Paranoid Pirate на C:**ppworker: Paranoid Pirate worker in C**```c
//
// Paranoid Pirate worker
//
#include "czmq.h"
#define HEARTBEAT_LIVENESS 3 // Reasonable value: 3-5
#define HEARTBEAT_INTERVAL 1000 // Unit of measurement: milliseconds
#define INTERVAL_INIT 1000 // Retry interval
#define INTERVAL_MAX 32000 // Maximum value for the delayed retry algorithm
// Constants defining the Paranoid Pirate standard
#define PPP_READY "\001" // Message: worker ready
#define PPP_HEARTBEAT "\002" // Message: worker heartbeat
// Returns a socket connected to the Paranoid Pirate queue device
static void *
s_worker_socket (zctx_t *ctx) {
void *worker = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (worker, "tcp://localhost:5556");
// Notifies the queue that the worker is ready
printf ("I: worker ready\n");
zframe_t *frame = zframe_new (PPP_READY, 1);
zframe_send (&frame, worker, 0);
return worker;
}
int main (void)
{
zctx_t *ctx = zctx_new ();
void *worker = s_worker_socket (ctx);
// If the heartbeat health is zero, it means that the queue device has died
size_t liveness = HEARTBEAT_LIVENESS;
size_t interval = INTERVAL_INIT;
// Regularly send heartbeats
uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
srandom ((unsigned) time (NULL));
int cycles = 0;
while (1) {
zmq_pollitem_t items [] = { { worker, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Termination
if (items [0].revents & ZMQ_POLLIN) {
// Receive message
// - 3 parts of the message, stamp + content, represents a request
// - 1 part of the message, represents a heartbeat
``````c
zmsg_t *msg = zmsg_recv(worker);
if (!msg)
break; // Прерывание
if (zmsg_size(msg) == 3) {
// Цикл нескольких слов после моделирования различных проблем
cycles++;
if (cycles > 3 && randof(5) == 0) {
printf("I: Моделирование отказа\n");
zmsg_destroy(&msg);
break;
} else
if (cycles > 3 && randof(5) == 0) {
printf("I: Моделирование перегрузки CPU\n");
sleep(3);
if (zctx_interrupted)
break;
}
printf("I: Нормальная реакция\n");
zmsg_send(&msg, worker);
}
liveness = HEARTBEAT_LIVENESS;
sleep(1); // Выполнение некоторых операций
if (zctx_interrupted)
break;
if (zmsg_size(msg) == 1) {
zframe_t *frame = zmsg_first(msg);
if (memcmp(zframe_data(frame), PPP_HEARTBEAT, 1) == 0)
liveness = HEARTBEAT_LIVENESS;
else {
printf("E: Недопустимое сообщение\n");
zmsg_dump(msg);
}
zmsg_destroy(&msg);
} else {
printf("E: Недопустимое сообщение\n");
zmsg_dump(msg);
}
interval = INTERVAL_INIT;
if (--liveness == 0) {
printf("W: Отсутствие ответа на пинг, невозможно подключиться к устройству\n");
printf("W: Повторное подключение через %zd миллисекунд...\n", interval);
zclock_sleep(interval);
if (interval < INTERVAL_MAX)
interval *= 2;
zsocket_destroy(ctx, worker);
worker = s_worker_socket(ctx);
liveness = HEARTBEAT_LIVENESS;
}
```
// отправка пинга устройству в нужное время
if (zclock_time() > heartbeat_at) {
heartbeat_at = zclock_time() + HEARTBEAT_INTERVAL;
printf("I: worker отправил пинг\n");
zframe_t *frame = zframe_new(PPP_HEARTBEAT, 1);
zframe_send(&frame, worker, 0);
}
}
zctx_destroy(&ctx);
return 0;
}
```
Несколько пояснений:* Код включает несколько мест с имитацией ошибок, как и раньше. Это делает код крайне сложным для поддержки, поэтому при запуске в продакшене эти имитации следует удалить.
* В режиме параноидальных пиратов иногда возникают проблемы с сердцебиением очереди, что будет подробно рассмотрено ниже.
* Worker использует механизм повторной попытки, аналогичный ленивым пиратам клиента, но с двумя отличиями: 1) настройка алгоритма отката; 2) никогда не сдаваться.
Попробуйте запустить следующий код, чтобы пройти через процесс:
```
ppqueue &
for i in 1 2 3 4; do
ppworker &
sleep 1
done
lpclient &
```
Вы заметите, как worker один за другим выходят из строя, а клиент отказывается после нескольких попыток. Вы можете остановить и перезапустить устройство очереди, клиент и worker последовательно переподключатся и правильно отправлять, обрабатывать и получать запросы, сохраняя порядок. Таким образом, весь процесс коммуникации имеет две возможности: успешное взаимодействие или окончательный отказ клиента.
### СердцебиениеКогда я писал пример для режима параноидальных пиратов, мне потребовалось около пяти часов, чтобы согласовать сердцебиение между очередью и worker'ами, а остальные запросы-ответы заняли около десяти минут. Преимущества механизма сердцебиения в плане надежности иногда не оправдывают проблем, которые он вызывает. В процессе использования могут возникнуть "ложные сбои", когда узел считает, что потерял соединение, потому что сердцебиение не было отправлено корректно.При понимании и реализации сердцебиения следует учитывать следующее:
* Сердцебиение не является запросом-ответом; они асинхронно передаются между узлами, и любой узел может использовать его для определения того, что другой узел умер, и прекратить коммуникацию.
* Если узел использует постоянный сокет (то есть установил флаг сокета), это означает, что сердцебиения могут накапливаться и будут получены вместе при переподключении. Поэтому worker не должен использовать постоянные сокеты. Пример кода использует постоянные сокеты для удобства отладки, и в коде используются случайные флаги сокета, чтобы избежать повторного использования предыдущих флагов.
* При использовании сердцебиения следует запустить его до обработки других сообщений. Убедитесь, что при запуске любого узла сердцебиение работает корректно. Остановите и перезапустите их, чтобы тестировать заморозку, аварии и другие ситуации.* Если ваш основной цикл использует zmq_poll(), вам следует использовать другой таймер для триггеров сердцебиения. Не используйте основной цикл для управления отправкой сердцебиений, это может привести к чрезмерному количеству сердцебиений (блокировка сети) или слишком малому количеству (что приведет к отключению узлов). Библиотека zhelpers предоставляет функцию s_clock(), которая возвращает текущее время системы в миллисекундах, которую можно использовать для контроля интервалов отправки сердцебиений. Пример кода на C:```c
int current_time = s_clock();
```
```c
// Regular heartbeat sending
uint64_t heartbeat_at = s_clock() + HEARTBEAT_INTERVAL;
while (1) {
zmq_poll(items, 1, HEARTBEAT_INTERVAL * 1000);
// Regardless of the behavior of zmq_poll, use the following logic to determine the need for sending a heartbeat
if (s_clock() > heartbeat_at) {
… Send heartbeat to all nodes
// Set the time for the next heartbeat
heartbeat_at = s_clock() + HEARTBEAT_INTERVAL;
}
}
```
* The main loop should use the heartbeat interval as the wait time. Clearly, using an infinite wait time is unacceptable, and a value less than the heartbeat interval will only lead to unnecessary loop iterations.
* Use a simple tracking method, such as printing information to the console. Here are some tracking tips: use the zmsg() function to print the contents of the socket; assign message numbers to check intervals between them.
* In real applications, the heartbeat must be configurable and synchronized with nodes. Some nodes require frequent heartbeats, for example, every 10 milliseconds, while others may require heartbeats every 30 seconds.
* If you want to send heartbeats at different frequencies for different nodes, set the poll wait time to the minimum heartbeat interval.* Возможно, вы захотите использовать отдельный сокет для управления пульсами, что может показаться хорошей идеей, так как это позволит отделить синхронные запросы-ответы от асинхронных пульсов. Однако, эта идея имеет недостатки: во-первых, отправка данных не требует отправки пульсов; во-вторых, сокет может заблокироваться из-за сетевых проблем, и вам потребуется знать, почему сокет для отправки данных перестал отвечать — из-за смерти или из-за чрезмерной нагрузки. В этом случае вам потребуется отправлять пульсы для этого сокета. В-третьих, управление двумя сокетами намного сложнее, чем управление одним.* Мы не настроили пульсы от клиента до очереди, так как это слишком сложно и не имеет большой ценности.
### Соглашения и протоколы
Возможно, вы заметили, что из-за механизма пульсов режимы параноидального пирата и простого пирата не совместимы.
На самом деле, здесь нам следует создать протокол. Возможно, в тестовом режиме протокол не требуется, но он необходим для реальных приложений. Что если использовать другие языки для написания рабочих процессов? Необходимо ли нам просматривать исходный код для понимания процесса коммуникации? Что если мы хотим изменить протокол? Нормы могут быть простыми, но они не очевидны. Чем успешнее протокол, тем сложнее он становится. Программа, которая не следует соглашениям, точно не будет переиспользуемой, поэтому давайте создадим спецификацию для этого протокола. Как это сделать?
* На вики-странице [rfc.zeromq.org](http://rfc.zeromq.org/) специально выделена страница для хранения протоколов ZMQ.
* Чтобы создать новый протокол, вам нужно зарегистрироваться и следовать инструкциям. Процесс прост, но не каждый может писать техническую документацию.
Мне потребовалось около 15 минут, чтобы набросать спецификацию [Пиратского протокола PPP](http://rfc.zeromq.org/spec:6), она небольшая, но полная.
Чтобы использовать протокол PPP в реальных условиях, вам также потребуется:* Включите номер версии в команду READY, чтобы безопасно добавлять новые версии PPP в будущем.
* В настоящее время сигналы READY и HEARTBEAT не указывают, являются ли они запросами или ответами. Для их различия требуется новая структура сообщений, содержащая информацию о "типе сообщения".
### Услуги надежной очереди (Управляющий протокол)
В мире все меняется быстро, и когда мы ждем лучшего протокола для решения проблем предыдущего раздела, уже кто-то его разработал:
* http://rfc.zeromq.org/spec:7
Эта спецификация занимает всего одну страницу и делает протокол PPP более надежным. При проектировании сложных архитектур мы должны делать так: сначала записывать соглашения, а затем реализовывать их программным обеспечением.
Управляющий протокол (MDP) вводит интересную особенность при расширении PPP: каждый запрос, отправленный клиентом, имеет "название услуги", а worker при регистрации в очередь должен указать свой тип услуги. Преимуществом MDP является то, что он основан на реальной практике программирования, прост в использовании и легко масштабируется.
Введение механизма "название услуги" является простым дополнением к пиратскому протоколу, и результатом является превращение его в агента услуг.```textdiagram```
+-----------+ +-----------+ +-----------+
| | | | | |
| Клиент | | Клиент | | Клиент |
| | | | | |
\-----------/ \-----------/ \-----------/
^ ^ ^
| | |
\---------------+---------------/
"Дай мне кофе" | "Дай мне чай"
v
/-----------\
| |
| Брокер |
| |
\-----------/
^
|
/---------------+---------------\
| | |
v v v
/-----------\ /-----------\ /-----------\
| "Вода" | | "Чай" | | "Кофе" |
+-----------+ +-----------+ +-----------+
| | | | | |
| Рабочий | | Рабочий | | Рабочий |
| | | | | |
+-----------+ +-----------+ +-----------+
Рисунок # - Шаблон управляющего
```Перед внедрением шаблона Управляющего нам необходимо создать фреймворк для клиентов и рабочих процессов. Если программист может реализовать этот шаблон с помощью простого API, то нет необходимости заставлять его изучать детали протокола Управляющего и его реализацию.
Поэтому наш первый протокол (т.е. протокол Управляющего) определяет, как узлы взаимодействуют в распределённой архитектуре, а второй протокол должен определить, как приложения должны использовать этот протокол через фреймворк.
Шаблон Управляющего имеет два конца: клиентский и серверный. Поскольку нам нужно создать фреймворк как для клиентов, так и для рабочих процессов, нам потребуется предоставить два набора API. Ниже приведён пример API клиента, разработанный с использованием простых объектно-ориентированных методов и библиотеки [ZFL](http://zfl.zeromq.org/page:read-the-manual) на языке C.
```c
mdcli_t *mdcli_new (char *broker);
void mdcli_destroy (mdcli_t **self_p);
zmsg_t *mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p);
```
Это всё. Мы создаём сессию для связи с брокером, отправляем и получаем запрос, а затем закрываем соединение. Ниже представлен пример API для рабочего процесса.
```c
mdwrk_t *mdwrk_new (char *broker, char *service);
void mdwrk_destroy (mdwrk_t **self_p);
zmsg_t *mdwrk_recv (mdwrk_t *self, zmsg_t *reply);
```Эти два фрагмента кода выглядят похожими, но API для рабочего процесса немного различается. После первого вызова recv() рабочий процесс отправляет пустой ответ, а затем отправляет текущий ответ и получает новый запрос.Оба API легко реализовать, достаточно модифицировать код из примера Параноидального пирата. Ниже представлен API клиента:
**mdcliapi: API клиента Управляющего на C**```c
/* =====================================================================
mdcliapi.c
Majordomo Protocol Client API
Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
This is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or (at
your option) any later version.
This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "mdcliapi.h"
// Структуры определения
// Мы будем использовать методы-члены для доступа к этим свойствам
struct _mdcli_t {
zctx_t *ctx; // Контекст
char *broker;
void *client; // Соединение с агентом
int verbose; // Использовать ли стандартный вывод для отображения текущей активности
int timeout; // Время ожидания запроса (миллисекунды)
int retries; // Количество попыток повторной отправки запроса
};
// ---------------------------------------------------------------------
// Подключение к агенту или переподключение
void s_mdcli_connect_to_broker(mdcli_t *self)
{
if (self->client)
zsocket_destroy(self->ctx, self->client);
self->client = zsocket_new(self->ctx, ZMQ_REQ);
zmq_connect(self->client, self->broker);
if (self->verbose)
zclock_log("I: Подключаюсь к агенту %s...", self->broker);
}
``````c
// ---------------------------------------------------------------------
// Конструктор
mdcli_t *
mdcli_new(char *broker, int verbose)
{
assert(broker);
mdcli_t *self = (mdcli_t *)zmalloc(sizeof(mdcli_t));
self->ctx = zctx_new();
self->broker = strdup(broker);
self->verbose = verbose;
self->timeout = 2500; // миллисекунды
}
```
```c
self->retries = 3; // количество попыток
s_mdcli_connect_to_broker(self);
return self;
}
// ---------------------------------------------------------------------
// Деструктор
void
mdcli_destroy(mdcli_t **self_p)
{
assert(self_p);
if (*self_p) {
mdcli_t *self = *self_p;
zctx_destroy(&self->ctx);
free(self->broker);
free(self);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// Установка времени ожидания запроса
void
mdcli_set_timeout(mdcli_t *self, int timeout)
{
assert(self);
self->timeout = timeout;
}
// ---------------------------------------------------------------------
// Установка количества попыток повторной отправки запроса
void
mdcli_set_retries(mdcli_t *self, int retries)
{
assert(self);
self->retries = retries;
}
// ---------------------------------------------------------------------
// Отправка запроса агенту и попытка получения ответа;
// Сохранение права собственности на сообщение, его удаление после отправки;
// Возврат ответного сообщения или NULL.
zmsg_t *
mdcli_send(mdcli_t *self, char *service, zmsg_t **request_p)
{
assert(self);
assert(request_p);
zmsg_t *request = *request_p;
// Обертка сообщения в протокол
// Frame 1: "MDPCxy" (шесть байтов, MDP/Client x.y)
// Frame 2: имя сервиса (печатаемая строка)
zmsg_pushstr(request, service);
zmsg_pushstr(request, MDPC_CLIENT);
if (self->verbose) {
zclock_log("I: Отправка запроса к сервису '%s':", service);
zmsg_dump(request);
}
}
``````c
int retries_left = self->retries;
while (retries_left && !zctx_interrupted) {
zmsg_t *msg = zmsg_dup(request);
zmsg_send(&msg, self->client);
while (TRUE) {
// Опрос сокета для получения ответа с таймаутом
zmq_pollitem_t items[] = {
{ self->client, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll(items, 1, self->timeout * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Прерывание
// Обработка полученного ответа
if (items[0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv(self->client);
if (self->verbose) {
zclock_log("I: получен ответ:");
zmsg_dump(msg);
}
// Не пытайтесь обрабатывать ошибки, просто верните ошибку
assert(zmsg_size(msg) >= 3);
zframe_t *header = zmsg_pop(msg);
assert(zframe_streq(header, MDPC_CLIENT));
zframe_destroy(&header);
zframe_t *reply_service = zmsg_pop(msg);
assert(zframe_streq(reply_service, service));
zframe_destroy(&reply_service);
zmsg_destroy(&request);
return msg; // Успешно
}
else if (--retries_left) {
if (self->verbose)
zclock_log("W: нет ответа, повторное подключение...");
// Повторное подключение и отправка сообщения
s_mdcli_connect_to_broker(self);
zmsg_t *msg = zmsg_dup(request);
zmsg_send(&msg, self->client);
}
else {
if (self->verbose)
zclock_log("W: произошла серьезная ошибка, отмена повторной попытки.");
break; // Отмена
}
}
}
if (zctx_interrupted)
printf("W: получено сообщение об остановке, завершение процесса client...\n");
zmsg_destroy(&request);
```
return NULL;
}
```
Ниже приведен тестовый программный код, который выполняет 100 000 запросов-ответов:**mdclient: Приложение клиента для протокола Majordomo на C**
```c
//
// Пример использования клиента для протокола Majordomo
// Использует API mdcli для скрытия внутренней реализации протокола
//
// Давайте скомпилируем этот код напрямую, не создавая библиотеки
#include "mdcliapi.c"
int main(int argc, char *argv[])
{
int verbose = (argc > 1 && streq(argv[1], "-v"));
mdcli_t *session = mdcli_new("tcp://localhost:5555", verbose);
int count;
for (count = 0; count < 100000; count++)
{
zmsg_t *request = zmsg_new();
zmsg_pushstr(request, "Hello world");
zmsg_t *reply = mdcli_send(session, "echo", &request);
if (reply)
zmsg_destroy(&reply);
else
break; // Остановка выполнения
}
printf("Обработано %d запросов-ответов\n", count);
mdcli_destroy(&session);
return 0;
}
```
Вот API для worker'а:**mdwrkapi: API для работника Majordomo на C**
```c
/* =====================================================================
mdwrkapi.c
Majordomo Protocol Worker API
Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
This is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or (at
your option) any later version.
This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "mdwrkapi.h"
//可靠性参数
#define HEARTBEAT_LIVENESS 3 //合理值:3-5
//类结构
//使用成员函数访问属性
struct _mdwrk_t {
zctx_t *ctx; //上下文
char *broker;
char *service;
void *worker; //连接到代理的套接字
int verbose; //使用标准输出打印活动
//心跳设置
uint64_t heartbeat_at; //发送心跳的时间
size_t liveness; //尝试次数
int heartbeat; //心跳延迟,单位:毫秒
int reconnect; //重连延迟,单位:毫秒
//内部状态
int expect_reply; //初始值为0
//应答地址,如果存在的话
zframe_t *reply_to;
};
// ---------------------------------------------------------------------
//发送消息给代理
//如果没有提供消息,则内部创建一个
static void
s_mdwrk_send_to_broker(mdwrk_t *self, char *command, char *option, zmsg_t *msg)
{
msg = msg ? zmsg_dup(msg) : zmsg_new();
//将协议信封压入消息顶部
if (option)
zmsg_pushstr(msg, option);
}
```zmsg_pushstr(msg, command);
zmsg_pushstr(msg, MDPW_WORKER);
zmsg_pushstr(msg, "");
if (self->verbose) {
zclock_log("I: отправка %s брокеру", mdps_commands[(int)*command]);
zmsg_dump(msg);
}
zmsg_send(&msg, self->worker);
}// ---------------------------------------------------------------------
// Подключение или повторное подключение к брокеру
void s_mdwrk_connect_to_broker(mdwrk_t *self)
{
if (self->worker)
zsocket_destroy(self->ctx, self->worker);
self->worker = zsocket_new(self->ctx, ZMQ_DEALER);
zmq_connect(self->worker, self->broker);
if (self->verbose)
zclock_log("I: подключение к брокеру %s...", self->broker);
// Регистрация типа услуги у брокера
s_mdwrk_send_to_broker(self, MDPW_READY, self->service, NULL);
// Когда уровень живучести равен нулю, это означает, что соединение с брокером прервано
self->liveness = HEARTBEAT_LIVENESS;
self->heartbeat_at = zclock_time() + self->heartbeat;
}
// ---------------------------------------------------------------------
// Конструктор
mdwrk_t *
mdwrk_new(char *broker, char *service, int verbose)
{
assert(broker);
assert(service);
mdwrk_t *self = (mdwrk_t *)zmalloc(sizeof(mdwrk_t));
self->ctx = zctx_new();
self->broker = strdup(broker);
self->service = strdup(service);
self->verbose = verbose;
self->heartbeat = 2500; // миллисекунды
self->reconnect = 2500; // миллисекунды
s_mdwrk_connect_to_broker(self);
return self;
}
// ---------------------------------------------------------------------
// Деструктор
void
mdwrk_destroy(mdwrk_t **self_p)
{
assert(self_p);
if (*self_p) {
mdwrk_t *self = *self_p;
zctx_destroy(&self->ctx);
free(self->broker);
free(self->service);
free(self);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// Установка времени таймаута сердцебиения
void
mdwrk_set_heartbeat(mdwrk_t *self, int heartbeat)
{
self->heartbeat = heartbeat;
}```c
// ---------------------------------------------------------------------
// Установка времени таймаута повторного подключения
void
mdwrk_set_reconnect(mdwrk_t *self, int reconnect)
{
self->reconnect = reconnect;
}
```
```markdown
mdwrk_recv (mdwrk_t *self, zmsg_t **reply_p)
{
// Форматируем и отправляем запрос полученного ответа
assert (reply_p);
zmsg_t *reply = *reply_p;
assert (reply || !self->expect_reply);
if (reply) {
assert (self->reply_to);
zmsg_wrap (reply, self->reply_to);
s_mdwrk_send_to_broker (self, MDPW_REPLY, NULL, reply);
zmsg_destroy (reply_p);
}
self->expect_reply = 1;
while (TRUE) {
zmq_pollitem_t items[] = {
{ self->worker, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, self->heartbeat * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Прерывание
if (items[0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (self->worker);
if (!msg)
break; // Прерывание
if (self->verbose) {
zclock_log ("I: Получено сообщение от агента:");
zmsg_dump (msg);
}
self->liveness = HEARTBEAT_LIVENESS;
// Не следует обрабатывать ошибки, достаточно просто выдать сообщение об ошибке
assert (zmsg_size (msg) >= 3);
zframe_t *empty = zmsg_pop (msg);
assert (zframe_streq (empty, ""));
zframe_destroy (&empty);
zframe_t *header = zmsg_pop (msg);
assert (zframe_streq (header, MDPW_WORKER));
zframe_destroy (&header);
zframe_t *command = zmsg_pop (msg);
if (zframe_streq (command, MDPW_REQUEST)) {
// Здесь необходимо сохранить все адреса перед пустым фреймом в сообщении,
// но здесь мы временно сохраняем только один
self->reply_to = zmsg_unwrap (msg);
zframe_destroy (&command);
return msg; // Обработка запроса
}
else
```
Все изменения внесены согласно правилам перевода.```c
if (zframe_streq (command, MDPW_HEARTBEAT))
; // Не требуется никакой обработка для пульса
else
if (zframe_streq (command, MDPW_DISCONNECT))
s_mdwrk_connect_to_broker (self);
else {
zclock_log ("E: Недопустимое сообщение");
zmsg_dump (msg);
}
zframe_destroy (&command);
zmsg_destroy (&msg);
}
else
if (--self->liveness == 0) {
if (self->verbose)
zclock_log ("W: Соединение с агентом потеряно - повторная попытка. . . ");
}
```c
zclock_sleep(self->reconnect);
s_mdwrk_connect_to_broker(self);
}
// Время от времени отправляем сообщение
if (zclock_time() > self->heartbeat_at) {
s_mdwrk_send_to_broker(self, MDPW_HEARTBEAT, NULL, NULL);
self->heartbeat_at = zclock_time() + self->heartbeat;
}
}
if (zctx_interrupted)
printf("W: Получено сообщение о прерывании, завершение работы worker. . . \n");
return NULL;
}
```Ниже приведен тестовый скрипт, реализующий службу с именем echo:
```**mdworker: Основной Majordomo worker на C**
```c
//
// Пример работы с протоколом Majordomo
// Использует API mdwrk для скрытия внутренней реализации протокола MDP
//
// Давайте просто скомпилируем этот код, а не создаем библиотеку
#include "mdwrkapi.c"
int main(int argc, char *argv[])
{
int verbose = (argc > 1 && streq(argv[1], "-v"));
mdwrk_t *session = mdwrk_new(
"tcp://localhost:5555", "echo", verbose);
zmsg_t *reply = NULL;
while (1) {
zmsg_t *request = mdwrk_recv(session, &reply);
if (request == NULL)
break; // worker был остановлен
reply = request; // echo сервис... на самом деле довольно сложный :)
}
mdwrk_destroy(&session);
return 0;
}
```
Несколько пояснений:
* API является однопоточным, поэтому worker не отправляет пакеты сердцебиения в фоновом режиме, что мы и хотели бы видеть: если программа worker прекращает работу, сердцебиение также прекращается, и агент перестает отправлять новые запросы этому worker.
* В API worker нет настроек алгоритмов повторной попытки, так как это не стоит того, чтобы использовать эту сложную механику.
* API не предоставляет механизмов отслеживания ошибок, если возникают проблемы, они приводят к завершению программы (или выбросу исключения, в зависимости от языка). Это полезно для экспериментального программирования, так как позволяет сразу видеть результат выполнения. Однако в реальных условиях API должен быть достаточно надежным, чтобы правильно обрабатывать нелегальные сообщения.Может возникнуть вопрос, почему API worker закрывает свои сокеты и открывает новые? Особенно учитывая, что ZMQ имеет механизмы автоматического восстановления соединения при появлении узла. Мы можем обратиться к примерам worker'ов из простого пирата и из параноидального пирата, чтобы лучше понять это. ZMQ действительно имеет механизмы автоматического восстановления соединения, но если агент умирает и восстанавливается, worker не регистрируется заново. Для решения этой проблемы есть два подхода: первый, который мы используем здесь, заключается в том, чтобы worker закрывал свои сокеты и начинал все сначала, когда он определяет, что агент умер. Второй подход состоит в том, чтобы агент требовал от неизвестного worker'а регистрации его типа услуги при получении пульса от него, что требует спецификации этого правила в протоколе.Теперь давайте рассмотрим дизайн агента Majordomo, основной код которого представляет собой набор очередей, каждая из которых соответствует определённому типу услуги. Мы будем создавать соответствующие очереди при появлении worker'ов (и удалять их при исчезновении worker'ов, хотя мы пока не будем этим заниматься). Кроме того, мы будем поддерживать список worker'ов для каждой услуги.
Чтобы сделать код на C более читаемым и удобным для написания, я использовал контейнеры для хеш-таблиц и списков, предоставляемые проектом [ZFL](http://zfl.zeromq.org). Эти контейнеры названы [zhash](https://github.com/imatix/zguide/blob/master/examples/C/zhash.h) и [zlist](https://github.com/imatix/zguide/blob/master/examples/C/zlist.h). При использовании современных языков программирования можно воспользоваться встроенными контейнерами.**mdbroker: Majordomo брокер на C**
```c
//
// Majordomo протокол агент
// Минимальная реализация протоколов http://rfc.zeromq.org/spec:7 и spec:8
//
#include "czmq.h"
#include "mdp.h"
// Обычно мы получаем следующие значения из конфигурационного файла
#define HEARTBEAT_LIVENESS 3 // Разумное значение: 3-5
#define HEARTBEAT_INTERVAL 2500 // Единица измерения: миллисекунды
#define HEARTBEAT_EXPIRY HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
// Определение агента
typedef struct {
zctx_t *ctx; // Контекст
void *socket; // Сокет для соединения с клиентами и работниками
int verbose; // Использование стандартного вывода для отображения информации о работе
char *endpoint; // Конечная точка, к которой привязывается агент
zhash_t *services; // Хэш-таблица известных услуг
zhash_t *workers; // Хэш-таблица известных работников
zlist_t *waiting; // Список ожидающих работников
uint64_t heartbeat_at; // Время отправки пульса
} broker_t;
// Определение услуги
typedef struct {
char *name; // Название услуги
zlist_t *requests; // Список запросов от клиентов
zlist_t *waiting; // Список ожидающих работников
size_t workers; // Количество доступных работников
} service_t;
// Определение работника, состояние - свободен или занят
typedef struct {
char *identity; // Идентификатор работника
zframe_t *address; // Адресный фрейм
service_t *service; // Услуга, к которой принадлежит работник
int64_t expiry; // Время истечения с момента последнего пульса
} worker_t;
``````markdown
// ---------------------------------------------------------------------
// Функции, используемые агентом
static broker_t *
s_broker_new (int verbose);
static void
s_broker_destroy (broker_t **self_p);
static void
s_broker_bind (broker_t *self, char *endpoint);
static void
s_broker_purge_workers (broker_t *self);
// Функции, используемые услугой
static service_t *
s_service_require (broker_t *self, zframe_t *service_frame);
static void
s_service_destroy (void *argument);
static void
s_service_dispatch (broker_t *self, service_t *service, zmsg_t *msg);
static void
s_service_internal (broker_t *self, zframe_t *service_frame, zmsg_t *msg);
// Функции, используемые работником
static worker_t *
s_worker_require (broker_t *self, zframe_t *address);
static void
s_worker_delete (broker_t *self, worker_t *worker, int disconnect);
static void
s_worker_destroy (void *argument);
static void
s_worker_process (broker_t *self, zframe_t *sender, zmsg_t *msg);
static void
s_worker_send (broker_t *self, worker_t *worker, char *command, char *option, zmsg_t *msg);
static void
s_worker_waiting (broker_t *self, worker_t *worker);
// Функции, используемые клиентами
static void
s_client_process (broker_t *self, zframe_t *sender, zmsg_t *msg);
// ---------------------------------------------------------------------
// Основная программа
int main (int argc, char *argv []) {
int verbose = (argc > 1 && streq (argv [1], "-v"));
broker_t *self = s_broker_new (verbose);
s_broker_bind (self, "tcp://*:5555");
// Принимает и обрабатывает сообщения до тех пор, пока программа не будет завершена
while (TRUE) {
zmq_pollitem_t items [] = {
{ self->socket, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Прерывание
// Обрабатывает следующее входящее сообщение, если есть
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (self->socket);
if (!msg)
break; // Прерывание
}
}
}
```
Все необходимые изменения были внесены, чтобы текст был грамматически корректен и соответствовал стандартам IT-документации. if (self->verbose) {
zclock_log ("I: Received message:");
zmsg_dump (msg);
}
zframe_t *sender = zmsg_pop (msg);
zframe_t *empty = zmsg_pop (msg);
zframe_t *header = zmsg_pop (msg);
if (zframe_streq (header, MDPC_CLIENT))
s_client_process (self, sender, msg);
else
if (zframe_streq (header, MDPW_WORKER))
s_worker_process (self, sender, msg);
else {
zclock_log ("E: Invalid message:");
zmsg_dump (msg);
zmsg_destroy (&msg);
}
zframe_destroy (&sender);
zframe_destroy (&empty);
zframe_destroy (&header);
}
// Disconnects and removes expired workers
// Periodically sends a heartbeat to the worker
if (zclock_time () > self->heartbeat_at) {
s_broker_purge_workers (self);
worker_t *worker = (worker_t *) zlist_first (self->waiting);
while (worker) {
s_worker_send (self, worker, MDPW_HEARTBEAT, NULL, NULL);
worker = (worker_t *) zlist_next (self->waiting);
}
self->heartbeat_at = zclock_time() + HEARTBEAT_INTERVAL;
}
}
if (zctx_interrupted)
printf("W: Received interrupt signal, shutting down... \n");
s_broker_destroy(&self);
return 0;
}
// ---------------------------------------------------------------------
// Constructor for the agent object
static broker_t *
s_broker_new(int verbose)
{
broker_t *self = (broker_t *) zmalloc(sizeof(broker_t));
// Initializes the agent's state
self->ctx = zctx_new();
self->socket = zsocket_new(self->ctx, ZMQ_ROUTER);
self->verbose = verbose;
self->services = zhash_new();
self->workers = zhash_new();
self->waiting = zlist_new();
self->heartbeat_at = zclock_time() + HEARTBEAT_INTERVAL;
return self;
}
// ---------------------------------------------------------------------
// Destructor for the agent object static void
s_broker_destroy(broker_t **self_p)
{
assert(self_p);
if (*self_p) {
broker_t *self = *self_p;
zctx_destroy(&self->ctx);
zhash_destroy(&self->services);
zhash_destroy(&self->workers);
zlist_destroy(&self->waiting);
free(self);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// Привязка агентского сокета к конечной точке, можно вызывать эту функцию несколько раз.
// Мы используем один сокет для одновременного обслуживания клиентов и рабочих процессов.
void
s_broker_bind(broker_t *self, char *endpoint)
{
zsocket_bind(self->socket, endpoint);
zclock_log("I: Agent MDP/0.1.1 active on address %s", endpoint);
}
// ---------------------------------------------------------------------
// Удаление просроченных рабочих процессов из состояния "ожидания".
static void
s_broker_purge_workers(broker_t *self)
{
worker_t *worker = (worker_t *) zlist_first(self->waiting);
while (worker) {
if (zclock_time() < worker->expiry)
continue; // Рабочий процесс не просрочен, продолжаем поиск.
if (self->verbose)
zclock_log("I: Deleting expired worker process: %s",
worker->identity);
s_worker_delete(self, worker, 0);
worker = (worker_t *) zlist_first(self->waiting);
}
}
// ---------------------------------------------------------------------
// Поиск или создание нового элемента услуги.
static service_t *
s_service_require(broker_t *self, zframe_t *service_frame)
{
assert(service_frame);
char *name = zframe_strdup(service_frame);
service_t *service = (service_t *) zhash_lookup(self->services, name);
if (service == NULL) {
service = (service_t *) zmalloc(sizeof(service_t));
service->name = name;
service->requests = zlist_new();
service->waiting = zlist_new();
zhash_insert(self->services, name, service);
} zhash_freefn(self->services, name, s_service_destroy);
if (self->verbose)
zclock_log("I: Received message:");
} else {
free(name);
}
return service;
}// ---------------------------------------------------------------------
// Уничтожает объект сервиса при удалении его из broker->services
static void
s_service_destroy(void *argument)
{
service_t *service = (service_t *) argument;
// Уничтожает все элементы в очереди запросов
while (zlist_size(service->requests)) {
zmsg_t *msg = zlist_pop(service->requests);
zmsg_destroy(&msg);
}
zlist_destroy(&service->requests);
zlist_destroy(&service->waiting);
free(service->name);
free(service);
}
// ---------------------------------------------------------------------
// Распределяет запросы между ожидающими работниками, если это возможно
static void
s_service_dispatch(broker_t *self, service_t *service, zmsg_t *msg)
{
assert(service);
if (msg) { // Добавляет сообщение в очередь
zlist_append(service->requests, msg);
}
s_broker_purge_workers(self);
while (zlist_size(service->waiting) && zlist_size(service->requests)) {
worker_t *worker = zlist_pop(service->waiting);
zlist_remove(self->waiting, worker);
zmsg_t *request = zlist_pop(service->requests);
s_worker_send(self, worker, MDPW_REQUEST, NULL, request);
zmsg_destroy(&request);
}
}// ---------------------------------------------------------------------
// Обрабатывает внутренние сервисы с использованием соглашения 8/MMI
static void
s_service_internal(broker_t *self, zframe_t *service_frame, zmsg_t *msg)
{
char *return_code;
if (zframe_streq(service_frame, "mmi.service")) {
char *name = zframe_strdup(zmsg_last(msg));
service_t *service = (service_t *) zhash_lookup(self->services, name);
return_code = service && service->workers ? "200" : "404";
free(name);
} else {
return_code = "501";
}
}```markdown
zframe_reset(zmsg_last(msg), return_code, strlen(return_code));
// Удаляем и сохраняем envelop для клиента, добавляем заголовок протокола и имя сервиса, а затем заново упаковываем envelop
zframe_t *client = zmsg_unwrap(msg);
zmsg_push(msg, zframe_dup(service_frame));
zmsg_pushstr(msg, MDPC_CLIENT);
zmsg_wrap(msg, client);
zmsg_send(&msg, self->socket);
}
// ---------------------------------------------------------------------
// Создаем worker по необходимости
static worker_t *
s_worker_require(broker_t *self, zframe_t *address)
{
assert(address);
// self->workers использует идентификатор worker в качестве ключа
char *identity = zframe_strhex(address);
worker_t *worker =
(worker_t *)zhash_lookup(self->workers, identity);
if (worker == NULL) {
worker = (worker_t *)zmalloc(sizeof(worker_t));
worker->identity = identity;
worker->address = zframe_dup(address);
zhash_insert(self->workers, identity, worker);
zhash_freefn(self->workers, identity, s_worker_destroy);
if (self->verbose)
zclock_log("I: Регистрация нового worker: %s", identity);
}
else
free(identity);
return worker;
}
// ---------------------------------------------------------------------
// Удаляем worker из всех структур данных и уничтожаем его
static void
s_worker_delete(broker_t *self, worker_t *worker, int disconnect)
{
assert(worker);
if (disconnect)
s_worker_send(self, worker, MDPW_DISCONNECT, NULL, NULL);
}```c
if (worker->service) {
zlist_remove(worker->service->waiting, worker);
worker->service->workers--;
}
zlist_remove(self->waiting, worker);
// Этот метод косвенно вызывает метод s_worker_destroy()
zhash_delete(self->workers, worker->identity);
}
// ---------------------------------------------------------------------
// При удалении worker из broker->workers уничтожается объект worker
static void
s_worker_destroy(void *argument)
{
worker_t *worker = (worker_t *)argument;
zframe_destroy(&worker->address);
free(worker->identity);
free(worker);
}
// ---------------------------------------------------------------------
// Обрабатываем сообщение, полученное от worker
static void
s_worker_process(broker_t *self, zframe_t *sender, zmsg_t *msg)
{
assert(zmsg_size(msg) >= 1); // Сообщение должно содержать хотя бы одну командную рамку
zframe_t *command = zmsg_pop(msg);
char *identity = zframe_strhex(sender);
int worker_ready = (zhash_lookup(self->workers, identity) != NULL);
free(identity);
worker_t *worker = s_worker_require(self, sender);
if (zframe_streq(command, MDPW_READY)) {
// Если в очереди worker уже есть этот worker, но все же получен сигнал "готовности", удалите этого worker.
if (worker_ready)
s_worker_delete(self, worker, 1);
else
if (zframe_size(sender) >= 4 // Имя сервиса является зарезервированным сервисом
&& memcmp(zframe_data(sender), "mmi. ", 4) == 0)
s_worker_delete(self, worker, 1);
else {
// Соответствует worker сервису и устанавливает его в состояние "свободен"
zframe_t *service_frame = zmsg_pop(msg);
worker->service = s_service_require(self, service_frame);
worker->service->workers++;
s_worker_waiting(self, worker);
zframe_destroy(&service_frame);
}
}
else
if (zframe_streq(command, MDPW_REPLY)) {
if (worker_ready) {
```
```markdown
// Удаляет и сохраняет пакет для отправки клиенту, добавляет заголовок протокола и имя сервиса, а затем заново упаковывает пакет
zframe_t *client = zmsg_unwrap(msg);
zmsg_pushstr(msg, worker->service->name);
zmsg_pushstr(msg, MDPC_CLIENT);
zmsg_wrap(msg, client);
zmsg_send(&msg, self->socket);
s_worker_waiting(self, worker);
}
else
s_worker_delete(self, worker, 1);
}
else
if (zframe_streq(command, MDPW_HEARTBEAT)) {
if (worker_ready)
worker->expiry = zclock_time() + HEARTBEAT_EXPIRY;
else
s_worker_delete(self, worker, 1);
}
else
if (zframe_streq(command, MDPW_DISCONNECT))
s_worker_delete(self, worker, 0);
else {
zclock_log("E: Недопустимое сообщение");
zmsg_dump(msg);
}
free(command);
zmsg_destroy(&msg);
}
// ---------------------------------------------------------------------
// Отправка сообщения worker
// Если указатель указывает на сообщение, отправьте его, но не уничтожайте, так как это обязанность вызывающего
static void
s_worker_send(broker_t *self, worker_t *worker, char *command,
char *option, zmsg_t *msg)
{
msg = msg ? zmsg_dup(msg) : zmsg_new();
// Протокольная обертка помещается в вершину сообщения
if (option)
zmsg_pushstr (msg, option);
zmsg_pushstr (msg, command);
zmsg_pushstr (msg, MDPW_WORKER);
// Вставка маршрутизационной рамки в начало сообщения
zmsg_wrap (msg, zframe_dup (worker->address));
if (self->verbose) {
zclock_log ("I: Отправка сообщения для worker %s",
mdps_commands [(int) *command]);
zmsg_dump (msg);
}
zmsg_send (&msg, self->socket);
}
// ---------------------------------------------------------------------
```
```c
// Ожидание worker
static void
s_worker_waiting (broker_t *self, worker_t *worker)
{
// Добавление worker в очередь ожидания агента и службы
zlist_append (self->waiting, worker);
zlist_append (worker->service->waiting, worker);
worker->expiry = zclock_time () + HEARTBEAT_EXPIRY;
s_service_dispatch (self, worker->service, NULL);
}
// Обработка запроса от client
static void
s_client_process (broker_t *self, zframe_t *sender, zmsg_t *msg)
{
assert (zmsg_size (msg) >= 2); // Название службы + содержимое запроса
zframe_t *service_frame = zmsg_pop (msg);
service_t *service = s_service_require (self, service_frame);
// Установка адреса отправителя для ответа
zmsg_wrap (msg, zframe_dup (sender));
if (zframe_size (service_frame) >= 4
&& memcmp (zframe_data (service_frame), "mmi. ", 4) == 0)
s_service_internal (self, service_frame, msg);
else
s_service_dispatch (self, service, msg);
zframe_destroy (&service_frame);
}
```
Этот пример является одним из самых сложных, которые мы видели, состоящим приблизительно из 500 строк кода. Написание этого кода и его укрепление заняло около двух дней. Однако это всего лишь часть полного агента на основе сервисов.
```Несколько пояснений:
* Управляемый протокол требует от нас одновременного обслуживания клиентов и рабочих в одном сокете, что очень полезно для развертывания и управления агентом: он будет принимать и отправлять запросы только через один ZMQ-конечный пункт, а не через два.
* Агент отлично реализует содержимое протокола MDP/0.1, включая механизм отключения при отправке нелегальных команд и сердцебиений.
* Этот код может быть расширен до многопоточного, где каждый поток управляет одним сокетом, группой клиентов и рабочих. Такой подход становится интересным при разделении крупных архитектур. Код на языке C уже имеет такую структуру, поэтому его легко реализовать.
* Код также может быть расширен до режима "мастер-слейв" или "двойного онлайн", что повысит надежность. Поскольку агент по сути является бесштатным, сохраняя только наличие или отсутствие сервиса, клиенты и рабочие могут выбирать другие агенты для связи.
* Интервал сердцебиений в примере кода составляет 5 секунд, что снижает количество вывода при отладке. В реальном мире значение должно быть ниже, но процесс повторной попытки следует сделать немного длиннее, чтобы дать время для запуска сервиса, например, 10 секунд.
### Асинхронный управляемый режимВышеупомянутый способ реализации управляемого режима довольно прост, клиенты остаются в простом пиратском режиме, просто используя переписанный API. Я запустил программу на тестовой машине и обработал около 100 000 запросов за 14 секунд, что частично связано с тем, что время процессора тратится на копирование фреймов сообщений. Но основная проблема заключается в том, что мы всегда обрабатываем запросы последовательно (round-trip): отправка-получение-отправка-получение... Внутри ZMQ отключен алгоритм оптимизации отправки TCP (алгоритм Nagle), но последовательная обработка все равно является неэффективной.Теория есть теория, но она должна быть проверена практикой. Мы используем простой тестовый скрипт, чтобы проверить, действительно ли последовательная обработка занимает много времени. Этот тестовый скрипт отправляет набор сообщений: в первый раз отправляет одно сообщение и получает одно, во второй раз — отправляет сразу несколько сообщений и получает сразу несколько. Результаты должны быть одинаковыми, но скорость будет совершенно другой.**tripping: Round-trip демонстрация на C**```c
//
// Round-trip симуляция
//
// В этом примере программа запускает клиент, рабочий процесс и прокси-сервер в многопоточном режиме,
// когда клиент завершает обработку, он отправляет сигнал основному процессу.
//
#include "czmq.h"
static void
client_task (void *args, zctx_t *ctx, void *pipe)
{
void *client = zsocket_new (ctx, ZMQ_DEALER);
zmq_setsockopt (client, ZMQ_IDENTITY, "C", 1);
zsocket_connect (client, "tcp://localhost:5555");
printf ("Начало тестирования... \n");
zclock_sleep (100);
int requests;
int64_t start;
printf ("Синхронный round-trip тест... \n");
start = zclock_time ();
for (requests = 0; requests < 10000; requests++) {
zstr_send (client, "hello");
char *reply = zstr_recv (client);
free (reply);
}
printf ("%d раз/с\n",
(1000 * 10000) / (int) (zclock_time () - start));
printf ("Асинхронный round-trip тест... \n");
start = zclock_time ();
for (requests = 0; requests < 100000; requests++)
zstr_send (client, "hello");
for (requests = 0; requests < 100000; requests++) {
char *reply = zstr_recv (client);
free (reply);
}
printf ("%d раз/с\n",
(1000 * 100000) / (int) (zclock_time () - start));
zstr_send (pipe, "Завершено");
}
static void *
worker_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_DEALER);
zmq_setsockopt (worker, ZMQ_IDENTITY, "W", 1);
zsocket_connect (worker, "tcp://localhost:5556");
while (1) {
zmsg_t *msg = zmsg_recv (worker);
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return NULL;
}
static void *
broker_task (void *args)
{
// Подготовка контекста и сокета
zctx_t *ctx = zctx_new ();
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
void *backend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (frontend, "tcp://*:5555");
zsocket_bind (backend, "tcp://*:5556");
// Инициализация объекта опроса
zmq_pollitem_t items [] = {
{ frontend, 0, ZMQ_POLLIN, 0 },
{ backend, 0, ZMQ_POLLIN, 0 }
};
while (1) {
int rc = zmq_poll(items, 2, -1);
if (rc == -1)
break; // Прерывание
if (items[0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv(frontend);
zframe_t *address = zmsg_pop(msg);
zframe_destroy(&address);
``````c
zmsg_pushstr(msg, "W");
zmsg_send(&msg, backend);
}
if (items[1].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv(backend);
zframe_t *address = zmsg_pop(msg);
zframe_destroy(&address);
zmsg_pushstr(msg, "C");
zmsg_send(&msg, frontend);
}
}
int main(void)
{
// Создаем контекст
zctx_t *ctx = zctx_new();
void *client = zthread_fork(ctx, client_task, NULL);
zthread_new(ctx, worker_task, NULL);
zthread_new(ctx, broker_task, NULL);
// Ждем сигнал от клиентского конца трубы
char *signal = zstr_recv(client);
free(signal);
zctx_destroy(&ctx);
return 0;
}
```
```В моей среде разработки результат выполнения выглядит следующим образом:```
Настройка теста...
Проверка однократного прохождения...
9057 вызовов/секунду
Проверка асинхронного прохождения...
173010 вызовов/секунду
```
Важно отметить, что при запуске клиент приостанавливается на некоторое время. Это происходит потому, что если сокет с указанным идентификатором не подключен к ROUTER-сокету, последний просто отбрасывает сообщение. В этом примере мы не используем алгоритм LRU, поэтому при более медленной скорости подключения worker'ов могут происходить потери данных, что влияет на результаты тестирования.
Мы можем видеть, что однократное прохождение цикла выполняется в 20 раз медленнее, чем асинхронное. Давайте применим это к паттерну менеджера.
Сначала давайте модифицируем API клиента, добавив независимые методы отправки и получения:
```
mdcli_t *mdcli_new (char *broker);
void mdcli_destroy (mdcli_t **self_p);
int mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p);
zmsg_t *mdcli_recv (mdcli_t *self);
```
Затем нам потребуется небольшое количество времени, чтобы преобразовать синхронный API клиента в асинхронный:
```**mdcliapi2: Асинхронный клиентский API Majordomo на C**
```c
/* =====================================================================
mdcliapi2.c
Majordomo Protocol Client API (async version)
Реализует спецификацию MDP/Worker по адресу http://rfc.zeromq.org/spec:7.
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
Этот файл является частью руководства по ZeroMQ: http://zguide.zeromq.org
Это свободное программное обеспечение; вы можете распространять его и/или
изменять его на условиях лицензии GNU Lesser General Public License,
опубликованной Free Software Foundation; либо версия 3 лицензии, либо (по
вашему выбору) любой более поздней версии.
Это программное обеспечение распространяется в надежде, что оно будет полезным,
но БЕЗ КАКИХ-ЛИБО ГАРАНТИЙ; даже без скрытой гарантииMERCHANTABILITY или FITNESS
FOR A PARTICULAR PURPOSE. См. GNU Lesser General Public License для получения
дополнительной информации.
Вы должны были получить копию GNU Lesser General Public License вместе с этим
программным обеспечением. Если нет, см. <http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "mdcliapi2.h"
```// Класс структура
// Использует методы класса для доступа к свойствам
struct _mdcli_t {
zctx_t *ctx; // Контекст
char *broker;
void *client; // Соединение с агентом
int verbose; // Вывод состояния в стандартный вывод
int timeout; // Время ожидания запроса
};
// ---------------------------------------------------------------------
// Подключение или повторное подключение к агенту
void s_mdcli_connect_to_broker(mdcli_t *self)
{
if (self->client)
zsocket_destroy(self->ctx, self->client);
self->client = zsocket_new(self->ctx, ZMQ_DEALER);
zmq_connect(self->client, self->broker);
if (self->verbose)
zclock_log("I: Подключаюсь к агенту %s...", self->broker);
}```c
// ---------------------------------------------------------------------
// Конструктор
mdcli_t *mdcli_new(char *broker, int verbose)
{
assert(broker);
mdcli_t *self = (mdcli_t *)zmalloc(sizeof(mdcli_t));
self->ctx = zctx_new();
self->broker = strdup(broker);
self->verbose = verbose;
self->timeout = 2500; // milliseconds
s_mdcli_connect_to_broker(self);
return self;
}
// ---------------------------------------------------------------------
// Деструктор
void
mdcli_destroy (mdcli_t **self_p)
{
assert (self_p);
if (*self_p) {
mdcli_t *self = *self_p;
zctx_destroy (&self->ctx);
free (self->broker);
free (self);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// Установка времени ожидания запроса
void
mdcli_set_timeout (mdcli_t *self, int timeout)
{
assert (self);
self->timeout = timeout;
}
// ---------------------------------------------------------------------
// Отправка запроса агенту
// Получение владения сообщением запроса, его отправка и последующее удаление
int
mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
{
assert (self);
assert (request_p);
zmsg_t *request = *request_p;
// Добавление в начало сообщения фреймов, согласно протоколу
// Frame 0: пустой (симулирует поведение REQ сокета)
// Frame 1: "MDPCxy" (6 байт, MDP/Client x.y)
// Frame 2: Имя сервиса (смотрите печатаемую строку)
zmsg_pushstr (request, service);
zmsg_pushstr (request, MDPC_CLIENT);
zmsg_pushstr (request, "");
if (self->verbose) {
zclock_log ("I: Sending request to service '%s':", service);
zmsg_dump (request);
}
zmsg_send (&request, self->client);
return 0;
}
// ---------------------------------------------------------------------
// Получение ответа на запрос, если ответ отсутствует, возвращается NULL;
// Эта функция не пытается восстановиться после сбоев агента,
``````
// Так как мы не храним информацию о запросах, которые не получили ответ,
// поэтому их нельзя повторно отправить.
zmsg_t *
mdcli_recv (mdcli_t *self)
{
assert (self);
// Опрос сокета для получения ответа
zmq_pollitem_t items [] = { { self->client, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);
if (rc == -1)
return NULL; // Прерывание
// Обработка ответа при его получении
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (self->client);
if (self->verbose) {
zclock_log ("I: получен ответ:");
zmsg_dump (msg);
}
// Не обрабатываем ошибки, а сразу выводим их
assert (zmsg_size (msg) >= 4);
zframe_t *empty = zmsg_pop (msg);
assert (zframe_streq (empty, ""));
zframe_destroy (&empty);
zframe_t *header = zmsg_pop (msg);
assert (zframe_streq (header, MDPC_CLIENT));
zframe_destroy (&header);
zframe_t *service = zmsg_pop (msg);
zframe_destroy (&service);
return msg; // Успех
}
if (zctx_interrupted)
printf ("W: Получено сообщение о прерывании, завершение работы client... \n");
else
if (self->verbose)
zclock_log ("W: Критическая ошибка, отмена запроса");
return NULL;
}
```
Вот соответствующий тестовый код:
```c
```**mdclient2: Пример приложения клиента Majordomo на C**
```
//
// Асинхронный Majordomo клиент - пример программы
// Использует API mdcli для скрытия конкретной реализации протокола MDP
//
// Прямое компилирование исходного кода без создания библиотеки
#include "mdcliapi2.c"
int main(int argc, char *argv[])
{
int verbose = (argc > 1 && streq(argv[1], "-v"));
mdcli_t *session = mdcli_new("tcp://localhost:5555", verbose);
int count;
for(count = 0; count < 100000; count++)
{
zmsg_t *request = zmsg_new();
zmsg_pushstr(request, "Hello world");
mdcli_send(session, "echo", &request);
}
for(count = 0; count < 100000; count++)
{
zmsg_t *reply = mdcli_recv(session);
if(reply)
zmsg_destroy(&reply);
else
break; // Прерывание с помощью Ctrl-C
}
printf("Получено %d ответов\n", count);
mdcli_destroy(&session);
return 0;
}
```
Код агента и worker'а не изменился, так как мы не изменили протокол MDP. После модификации клиента можно заметить значительное увеличение скорости. Вот время обработки 100 000 запросов в синхронном режиме:
```
$ time mdclient
100000 запросов/ответов обработано
real 0m14.088s
user 0m1.310s
sys 0m2.670s
```
А вот время обработки 100 000 запросов в асинхронном режиме:
```
$ time mdclient2
100000 ответов получено
real 0m8.730s
user 0m0.920s
sys 0m1.550s
```
Давайте запустим 10 worker'ов и посмотрим на результат:
```
$ time mdclient2
100000 ответов получено
real 0m3.863s
user 0m0.730s
sys 0m0.470s
```Из-за необходимости получения сообщений через LRU-очередь, полная асинхронность не достигается. Однако, чем больше worker'ов, тем лучше производительность. На моей машине, когда количество worker'ов достигает 8, скорость уже не увеличивается — четырёхъядерный процессор может делать только столько. Тем не менее, мы получаем почти четырёхкратное увеличение скорости, а процесс модификации занял всего несколько минут. Кроме того, агент ещё не был оптимизирован, он всё ещё копирует сообщения, вместо использования нулевой копии. Однако, мы уже можем обрабатывать около 25 000 запросов-ответов в секунду, что уже очень хорошо.Конечно, асинхронный Majordomo клиент также имеет недостатки, одним из которых является невозможность восстановления после сбоя агента. В коде mdcliapi2 нет механизма восстановления соединения, а повторное подключение требует выполнения следующих условий:
* Каждый запрос имеет уникальный номер, а каждый ответ также содержит соответствующий номер, что требует изменения протокола для четкого определения;
* API клиента должен сохранять и отслеживать все отправленные, но ещё не получившие ответы запросы;
* Если агент прекращает работу, клиент будет повторно отправлять все сообщения.
Как можно заметить, высокая надёжность часто связана с увеличением сложности, и вопрос остаётся открытым — стоит ли применять эту механику в режиме управления? Это зависит от конкретной ситуации. Если это служба поиска по имени, где каждое соединение вызывается один раз, то применять эту механику нет необходимости; если же это веб-сервис на переднем крае, обслуживающий тысячи клиентов, то возможно, стоит её использовать.
### Поиск сервисовТеперь, когда у нас есть агент, ориентированный на сервисы, мы не можем узнать, предоставляет ли он определённый сервис. Если запрос завершается ошибкой, это может указывать на то, что сервис недоступен, но какова причина? Поэтому было бы полезно иметь возможность спросить агента: "Существует ли работающий сервис echo?" Самым очевидным решением было бы добавление нового типа команд в протокол MDP/Client, позволяющее клиенту спрашивать агента о доступности определённого сервиса. Однако, главным преимуществом MDP/Client является его простота, и добавление функции поиска сервисов могло бы усложнить его.Другой подход заключается в использовании метода, аналогичного обработке электронной почты, где неудачные запросы возвращаются обратно. Однако это также увеличивает сложность, так как требуется различать полученные сообщения как ответы или возвращённые запросы.
Давайте воспользуемся тем же подходом, создав новую механику на основе MDP, а не изменяя его. Управление сервисами само по себе является сервисом, и мы можем предлагать дополнительные услуги, такие как "деактивация сервиса", "получение данных сервиса" и т.д. Нам нужна механика, которая позволяет расширять протокол, не нарушая его основные принципы.
Таким образом, появляется небольшой RFC - MMI (Механизм управления интерфейса) уровня приложения, построенный на протоколе MDP: http://rfc.zeromq.org/spec:8 . Мы уже внедрили его в агент, возможно, вы заметили это. Ниже приведён пример кода, демонстрирующий использование этой функции поиска сервисов:**mmiecho: Поиск сервисов через Majordomo на C**```c
//
// Пример программы для запроса сервиса эхо
//
// Давайте скомпилируем напрямую без создания библиотеки
#include "mdcliapi.c"
int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);
// Имя сервиса, который нам нужно запросить
zmsg_t *request = zmsg_new ();
zmsg_addstr (request, "echo");
// Сообщение, отправляемое сервису "service query"
zmsg_t *reply = mdcli_send (session, "mmi.service", &request);
if (reply) {
char *reply_code = zframe_strdup (zmsg_first (reply));
printf ("Результат запроса для сервиса echo: %s\n", reply_code);
free (reply_code);
zmsg_destroy (&reply);
}
else
printf ("E: Агент не ответил, пожалуйста, убедитесь, что он работает\n");
mdcli_destroy (&session);
return 0;
}
```Прокси во время выполнения проверяет имя обслуживаемого сервиса и самостоятельно обрабатывает те, которые начинаются с `mmi.`, не пересылая запросы worker'ам. Вы можете запустить вышеупомянутый код без запуска worker'ов и наблюдать, как программа отвечает статусом 200 или 404. Реализация MMI в примере прокси-сервера очень проста: когда какой-либо worker прекращает работу, этот сервис всё ещё помечается как доступный. В реальной практике прокси-сервер должен очищать такие сервисы, которые не имеют связанных worker'ов, через определённые интервалы времени.### Инвайроидные службы
Инвайроидность означает возможность безопасно повторять выполнение операции. Например, просмотр часов является инвайроидной операцией, а заем денег у жены друга — нет. Некоторые клиент-серверные взаимодействия являются инвайроидными, а некоторые — нет. Примеры инвайроидных взаимодействий:
* Безопасное распределение задач, где сервер-работник является бессостоятельным, его результаты зависят от состояния клиента, поэтому он может повторно обрабатывать одинаковые запросы;
* Преобразование логических адресов в реальные конечные точки в службе именования, которое можно повторно запросить несколько раз, что делает его инвайроидным.
Примеры нинвайроидных взаимодействий:
* Логирование, где мы не хотим, чтобы одни и те же данные логирования записывались несколько раз;
* Любые службы, которые влияют на последующие узлы, например, если служба отправляет информацию на следующий узел, повторное получение одного и того же запроса приведёт к получению повторной информации;
* Когда служба изменяет общие данные без обеспечения инвайроидности, например, если служба выполняет списание со счета банка (дебет), это будет нинвайроидным действием.Если приложение предоставляет невиртуальные службы, следует рассмотреть, на каком этапе оно может потерпеть неудачу. Если программа прекращает работу во время простоя или обработки запроса, это не вызовет проблем. Мы можем использовать транзакционные механизмы баз данных, чтобы гарантировать одновременное выполнение операций списания и зачисления. Если приложение прекращает работу во время отправки запроса, это вызовет проблемы, так как для него это будет означать завершение работы.Если сеть заблокирована во время возврата ответа, клиент будет считать запрос неудачным и повторно отправит его, что приведёт к повторному выполнению того же запроса на стороне сервера. Это не то поведение, которое мы хотели бы видеть.
Часто используемое решение заключается в том, чтобы сервер обнаруживал и отклонял повторные запросы, что требует:
* Клиент должен добавлять уникальный идентификатор каждому запросу, включая идентификатор клиента и сообщения;
* Сервер должен использовать идентификатор клиента и сообщения в качестве ключа для хранения ответов;
* Когда сервер обнаруживает, что запрос уже существует в таблице ответов, он пропускает этот запрос и сразу возвращает содержимое ответа.
### Оfflайн надежность (Гигантский режим)
Когда вы осознаете, что режим дворецкого является очень надежной системой агента сообщений, вы можете захотеть использовать диск для промежуточного хранения сообщений, чтобы ещё больше повысить надёжность. Хотя этот подход широко используется во многих корпоративных системах передачи сообщений, я всё же немного против него, по следующим причинам:* Мы видим, что клиент в ленивом пиратском режиме может работать очень хорошо, функционируя в различных архитектурных средах. Единственная проблема заключается в том, что он предполагает, что worker является бесштатным и предоставляет идемпотентные услуги. Однако эту проблему можно решить другими способами, не прибегая к использованию диска.* Добавление диска создает новые проблемы, требующие дополнительных затрат на управление и обслуживание. Основное преимущество пиратского режима заключается в его простоте и ясности, что делает его менее склонным к сбою. Если вы все еще беспокоитесь о возможных проблемах с оборудованием, вы можете перейти к режиму peer-to-peer, который будет рассмотрен в последнем разделе этой главы.
Хотя есть эти причины, существует вполне обоснованная ситуация, когда использование диска для промежуточного хранения имеет смысл — асинхронная работа в офлайн сети. У пиратского режима есть проблема: клиент продолжает ждать ответа после отправки запроса. Если клиент и worker не поддерживают постоянное соединение (можно сравнить это с электронной почтой), мы не можем установить бесштатную сеть между клиентом и worker, поэтому нам нужно сохранять состояние.
Из этого возникает Гигантский режим, при котором сообщения записываются на диск, чтобы гарантировать их сохранность. Когда мы выполняем запросы к службе, мы обращаемся к уровню гиганта. Гигант строится поверх режима дворецкого, а не переопределяет протокол MDP. Преимуществом такого подхода является возможность реализации надежности в конкретном worker без необходимости добавления логики агента.* Реализация становится проще;
* Агент пишется на одном языке, а worker — на другом;
* Возможность свободного обновления этого режима.
Единственным недостатком является наличие дополнительного уровня взаимодействия между агентом и диском, но это стоит того.
У нас есть множество способов реализации устойчивой архитектуры запрос-ответ, и цель, конечно, состоит в том, чтобы сделать это как можно проще. Самый простой способ, который приходит мне в голову, — это предоставление услуги "гиганта" для агента, которая не влияет на работу существующего worker. Если клиент хочет немедленного ответа, он может общаться с агентом; если нет, он может общаться с гигантом: "Привет, гигант, пожалуйста, обработай этот запрос, я пойду за продуктами." Общение клиента с гигантом обычно выглядит следующим образом:
* Клиент: Пожалуйста, помоги мне обработать этот запрос. Гигант: Хорошо.
* Клиент: Есть ли ответ для меня? Гигант: Да, есть (или нет).
* Клиент: Хорошо, ты можешь освободить этот запрос, работа завершена. Гигант: Хорошо.
Общение гиганта с агентом обычно выглядит следующим образом:
* Гигант: Привет, агент, у тебя есть сервис под названием echo? Агент: Да, кажется, есть.
* Гигант: Привет, сервис echo, пожалуйста, помоги обработать этот запрос. Echo: Хорошо, вот ответ.
* Гигант: Спасибо!Можно представить различные ситуации с отказами, чтобы проверить, способна ли вышеупомянутая модель справиться с ними. Например, если worker упал при обработке запроса, гигант будет постоянно перезапускать запрос; если ответ потерялся во время передачи, гигант также будет повторять попытки; если запрос был обработан, но клиент не получил ответ, он снова спросит гиганта; если гигант упал при обработке запроса или отправке ответа, клиент будет повторять попытки; поскольку запрос сохраняется на диске, он не будет потерян.
```
+-----------+ +-----------+ +-----------+
| | | | | |
| Клиент | | Клиент | | Клиент |
| | | | | |
\-----------/ \-----------/ \-----------/
^ ^ ^
| | |
\---------------+---------------/
"Титаник, | "Титаник,
дай мне кофе" | дай мне чай"
v Диск
/-----------\ +---------+ +-------+
| | | | | |
| Брокер |<--->| Титаник |<--->| {s} |
| | | | | |
\-----------/ +---------+ +-------+
^
|
/---------------+---------------\
| | |
v v v
/-----------\ /-----------\ /-----------\
| "Вода" | | "Чай" | | "Кофе" |
+-----------+ +-----------+ +-----------+
| | | | | |
| Рабочий | | Рабочий | | Рабочий |
| | | | | |
+-----------+ +-----------+ +-----------+
```
Теперь заменим текст в кавычках на русском:
```
+-----------+ +-----------+ +-----------+
| | | | | |
| Клиент | | Клиент | | Клиент |
| | | | | |
\-----------/ \-----------/ \-----------/
^ ^ ^
| | |
\---------------+---------------/
"Титаник, дай мне кофе" | "Титаник, дай мне чай"
v Диск
/-----------\ +---------+ +-------+
| | | | | |
| Брокер |<--->| Титаник |<--->| {s} |
| | | | | |
\-----------/ +---------+ +-------+
^
|
/---------------+---------------\
| | |
v v v
/-----------\ /-----------\ /-----------\
| "Вода" | | "Чай" | | "Кофе" |
+-----------+ +-----------+ +-----------+
| | | | | |
| Рабочий | | Рабочий | | Рабочий |
| | | | | |
+-----------+ +-----------+ +-----------+
``` Рисунок # - Шаблон Титаника
```В этой системе процесс рукопожатия занимает значительное время, но клиент может использовать асинхронный домашний режим, отправляя несколько запросов одновременно и ожидая ответы вместе.
Нам нужен способ, чтобы клиент мог запрашивать содержимое ответа. Разные клиенты могут обращаться к одному и тому же сервису, и каждый клиент имеет уникальное имя. Простым, логичным и безопасным решением является:
* Когда титан получает запрос, он генерирует уникальный идентификатор (UUID) для каждого запроса и возвращает этот идентификатор клиенту;
* Клиент должен предоставить этот идентификатор при запросе содержимого ответа.
Таким образом, клиенту необходимо будет безопасно хранить UUID, что исключает необходимость дополнительной проверки. Есть ли другие решения? Мы можем использовать постоянные сокеты, явно указывая идентификатор клиента. Однако это создаст проблемы управления, и если два клиента имеют одинаковый идентификатор сокета, это приведет к бесконечным проблемам.
Перед тем как начать разработку нового протокола, давайте подумаем, как клиенты будут взаимодействовать с титаном. Одним вариантом является предоставление услуги, которая работает с тремя различными командами; другим — более простым — вариантом является предоставление трех независимых услуг:``` * **titanic.request** - Сохраняет запрос и возвращает UUID
* **titanic.reply** - Получает содержимое ответа по UUID
* **titanic.close** - Подтверждает правильную обработку запроса
Нам потребуется создать многопоточный рабочий процесс, как мы делали это ранее с использованием ZMQ для многопоточной работы. Однако перед тем как начать писать код, давайте запишем некоторые определения для титанового режима: http://rfc.zeromq.org/spec:9 . Мы назовем его "Протоколом сервиса титана" или TSP.
Использование протокола TSP, конечно, требует от клиента выполнения дополнительной работы. Вот пример простого, но достаточно надежного клиента:**ticlient: Пример клиента титана на C**```c
//
// Пример клиента в режиме Titan
// Реализует клиентскую часть протокола http://rfc.zeromq.org/spec:9
// Давайте скомпилируем прямо здесь, без создания библиотеки
#include "mdcliapi.c"
// Вызов услуги TSP
// В случае успеха возвращает ответ (код состояния: 200), иначе возвращает NULL
static zmsg_t *
s_service_call (mdcli_t *session, char *service, zmsg_t **request_p)
{
zmsg_t *reply = mdcli_send (session, service, request_p);
if (reply) {
zframe_t *status = zmsg_pop (reply);
if (zframe_streq (status, "200")) {
zframe_destroy (&status);
return reply;
}
else
if (zframe_streq (status, "400")) {
printf ("E: Клиент столкнулся с серьезной ошибкой, отмена запроса\n");
exit (EXIT_FAILURE);
}
else
if (zframe_streq (status, "500")) {
printf ("E: Сервер столкнулся с серьезной ошибкой, отмена запроса\n");
exit (EXIT_FAILURE);
}
}
else
exit (EXIT_SUCCESS); // Прерывание или ошибка
zmsg_destroy (&reply);
return NULL; // Запрос не выполнен успешно, но причина не указана
}
int main (int argc, char *argv [])
{
int verbose = (argc > 1 && streq (argv [1], "-v"));
mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);
// 1. Отправка запроса на сервис echo серверу Titan
zmsg_t *request = zmsg_new ();
zmsg_addstr (request, "echo");
zmsg_addstr (request, "Hello world");
zmsg_t *reply = s_service_call (
session, "titanic.request", &request);
zframe_t *uuid = NULL;
if (reply) {
uuid = zmsg_pop (reply);
zmsg_destroy (&reply);
zframe_print (uuid, "I: request UUID ");
}
// 2. Ожидание ответа
while (! zctx_interrupted) {
zclock_sleep (100);
request = zmsg_new ();
zmsg_add (request, zframe_dup (uuid));
zmsg_t *reply = s_service_call (
session, "titanic.reply", &request);
}```c
if (reply) {
char *reply_string = zframe_strdup(zmsg_last(reply));
printf("Ответ: %s\n", reply_string);
free(reply_string);
zmsg_destroy(&reply);
// 3. Закрытие запроса
request = zmsg_new();
zmsg_add(request, zframe_dup(uuid));
reply = s_service_call(session, "titanic.close", &request);
zmsg_destroy(&reply);
break;
}
```
else {
printf("I: Ответ еще не получен, готовимся к повторной попытке через некоторое время... \n");
zclock_sleep(5000); // Повторная попытка через 5 секунд
}
}
zframe_destroy(&uuid);
mdcli_destroy(&session);
return 0;
}Конечно, код выше может быть интегрирован в одну платформу, где программисты не должны знать всех деталей. Если бы у меня было время, я попробовал бы написать такой API, чтобы приложение снова стало состоящим из нескольких строк кода. Эта идея согласуется с концепцией MDP: не повторяйте одно и то же.
```Вот реализация Титаника. Этот сервер использует три потока для обслуживания трёх различных сервисов. Он использует самый примитивный метод сохранения данных: создание файла на диске для каждого запроса. Хотя это просто, это также довольно пугает. Более сложной частью является то, что Титаник поддерживает очередь для хранения этих запросов, что позволяет избежать повторного сканирования директорий.**titanic: Пример брокера Titanic на C**
```c
//
// Гигантский режим - сервис
//
// Реализация серверной части протокола http://rfc.zeromq.org/spec:9
// Давайте сразу скомпилируем, не создавая библиотеки
#include "mdwrkapi.c"
#include "mdcliapi.c"
#include "zfile.h"
#include <uuid/uuid.h>
// Возвращает уникальный идентификатор (UUID) в виде строки
// Вызывающий код отвечает за освобождение памяти, выделенной для UUID
static char *
s_generate_uuid (void)
{
char hex_char[] = "0123456789ABCDEF";
char *uuidstr = zmalloc(sizeof(uuid_t) * 2 + 1);
uuid_t uuid;
uuid_generate(uuid);
int byte_nbr;
for (byte_nbr = 0; byte_nbr < sizeof(uuid_t); byte_nbr++) {
uuidstr[byte_nbr * 2 + 0] = hex_char[uuid[byte_nbr] >> 4];
uuidstr[byte_nbr * 2 + 1] = hex_char[uuid[byte_nbr] & 15];
}
return uuidstr;
}
// Создает имя файла для хранения запроса на основе UUID и возвращает его
#define TITANIC_DIR ".titanic"
static char *
s_request_filename (char *uuid)
{
char *filename = malloc(256);
snprintf(filename, 256, TITANIC_DIR "/%s.req", uuid);
return filename;
}
// Создает имя файла для хранения ответа на основе UUID и возвращает его
static char *
s_reply_filename (char *uuid)
{
char *filename = malloc(256);
snprintf(filename, 256, TITANIC_DIR "/%s.rep", uuid);
return filename;
}
// ---------------------------------------------------------------------
// Гигантский режим - сервис запросов
static void
titanic_request (void *args, zctx_t *ctx, void *pipe)
{
mdwrk_t *worker = mdwrk_new("tcp://localhost:5555", "titanic.request", 0);
zmsg_t *reply = NULL;
while (TRUE)
{
// Если ответ не пустой, отправляем его и получаем новый запрос от агента
zmsg_t *request = mdwrk_recv(worker, &reply);
if (!request)
break; // Прерываем выполнение
// Убедимся, что директория существует
file_mkdir(TITANIC_DIR);
}
``` // Генерируем UUID и сохраняем сообщение на диск
char *uuid = s_generate_uuid();
char *filename = s_request_filename(uuid);
FILE *file = fopen(filename, "w");
assert(file);
zmsg_save(request, file);
fclose(file);
free(filename);
zmsg_destroy(&request);
// Добавляем UUID в очередь
reply = zmsg_new();
zmsg_addstr(reply, uuid);
zmsg_send(&reply, pipe);
// Возвращаем UUID клиенту
// Этот процесс завершается функцией mdwrk_recv() в начале цикла
``` reply = zmsg_new();
zmsg_addstr(reply, "200");
zmsg_addstr(reply, uuid);
free(uuid);
}
mdwrk_destroy(&worker);
}```
// ---------------------------------------------------------------------
// Титаник режим - ответственный сервис
static void *
titanic_reply(void *context)
{
mdwrk_t *worker = mdwrk_new("tcp://localhost:5555", "titanic.reply", 0);
zmsg_t *reply = NULL;
while (TRUE) {
zmsg_t *request = mdwrk_recv(worker, &reply);
if (!request)
break; // Прервать и выйти
char *uuid = zmsg_popstr(request);
char *req_filename = s_request_filename(uuid);
char *rep_filename = s_reply_filename(uuid);
if (file_exists(rep_filename)) {
FILE *file = fopen(rep_filename, "r");
assert(file);
reply = zmsg_load(file);
zmsg_pushstr(reply, "200");
fclose(file);
} else {
reply = zmsg_new();
if (file_exists(req_filename))
zmsg_pushstr(reply, "300"); // Подождать
else
zmsg_pushstr(reply, "400"); // Неизвестно
}
zmsg_destroy(&request);
free(uuid);
free(req_filename);
free(rep_filename);
}
mdwrk_destroy(&worker);
return 0;
}
// ---------------------------------------------------------------------
// Титаник режим - закрытие запроса
static void *
titanic_close(void *context)
{
mdwrk_t *worker = mdwrk_new("tcp://localhost:5555", "titanic.close", 0);
zmsg_t *reply = NULL;
while (TRUE) {
zmsg_t *request = mdwrk_recv(worker, &reply);
if (!request)
break; // Прервать и выйти
char *uuid = zmsg_popstr(request);
char *req_filename = s_request_filename(uuid);
char *rep_filename = s_reply_filename(uuid);
file_delete(req_filename);
file_delete(rep_filename);
free(uuid);
free(req_filename);
free(rep_filename);
zmsg_destroy(&request);
reply = zmsg_new();
zmsg_addstr(reply, "200");
}
mdwrk_destroy(&worker);
return 0;
}
// Обработка запроса, если успешна, возвращает 1
static int
s_service_success(mdcli_t *client, char *uuid)
{
``` // Чтение содержимого запроса, первая фрейм — имя сервиса
char *filename = s_request_filename(uuid);
FILE *file = fopen(filename, "r");
free(filename);
// Если клиент уже закрыл этот запрос, вернуть 1
if (!file)
return 1;
zmsg_t *request = zmsg_load(file);
fclose(file);
zframe_t *service = zmsg_pop(request);
char *service_name = zframe_strdup(service);
// Используем протокол MMI для проверки доступности сервиса
zmsg_t *mmi_request = zmsg_new();
zmsg_add(mmi_request, service);
zmsg_t *mmi_reply = mdcli_send(client, "mmi.service", &mmi_request);
int service_ok = (mmi_reply
&& zframe_streq(zmsg_first(mmi_reply), "200"));
zmsg_destroy(&mmi_reply);
if (service_ok) {
zmsg_t *reply = mdcli_send(client, service_name, &request);
if (reply) {
filename = s_reply_filename(uuid);
FILE *file = fopen(filename, "w");
assert(file);
zmsg_save(reply, file);
fclose(file);
free(filename);
return 1;
}
zmsg_destroy(&reply);
} else {
zmsg_destroy(&request);
}
free(service_name);
return 0;
}```c
int main(int argc, char *argv[]) {
int verbose = (argc > 1 && streq(argv[1], "-v"));
zctx_t *ctx = zctx_new();
// Создаем сессию клиента MDP
mdcli_t *client = mdcli_new("tcp://localhost:5555", verbose);
mdcli_set_timeout(client, 1000); // 1 секунда
mdcli_set_retries(client, 1); // Попробовать один раз
void *request_pipe = zthread_fork(ctx, titanic_request, NULL);
zthread_new(ctx, titanic_reply, NULL);
zthread_new(ctx, titanic_close, NULL);
// Основной цикл
while (true) {
// Если нет активности, будем циклить каждую секунду
zmq_pollitem_t items[] = {{request_pipe, 0, ZMQ_POLLIN, 0}};
int rc = zmq_poll(items, 1, 1000 * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Прервать
if (items[0].revents & ZMQ_POLLIN) {
// Убедиться, что каталог файлов существует
file_mkdir(TITANIC_DIR);
// Добавляем UUID в очередь, используя "-" для обозначения ожидающего запроса
zmsg_t *msg = zmsg_recv(request_pipe);
if (!msg)
break; // Прервать
FILE *file = fopen(TITANIC_DIR "/queue", "a");
char *uuid = zmsg_popstr(msg);
fprintf(file, "-%s\n", uuid);
fclose(file);
free(uuid);
zmsg_destroy(&msg);
}
// Распределение
//
char entry[] = "? . . . . . . . :. . . . . . . :. . . . . . . :";
FILE *file = fopen(TITANIC_DIR "/queue", "r+");
while (file && fread(entry, 33, 1, file) == 1) {
// Обработка запросов с префиксом UUID "-"
if (entry[0] == '-') {
if (verbose)
printf("I: Начало обработки запроса %s\n", entry + 1);
if (s_service_success(client, entry + 1)) {
// Отметка как обработано
fseek(file, -33, SEEK_CUR);
fwrite("+", 1, 1, file);
fseek(file, 32, SEEK_CUR);
}
}
}
}
}
``` // Пропуск последней строки
if (fgetc(file) == '\r')
fgetc(file);
if (zctx_interrupted)
break;
}
if (file)
fclose(file);
}
mdcli_destroy(&client);
return 0;
}
```
```При тестировании откройте mdbroker и titanic, затем запустите ticlient и начните запускать любое количество mdworkers. Вы увидите, что клиент получил ответ.Несколько пояснений:
* Мы используем протокол MMI для запроса к агенту о доступности услуги, что аналогично логике в MDP;
* Мы используем протокол inproc для связи основного цикла с сервисом titanic.request, чтобы сохранять новые запросы. Это позволяет избежать постоянного сканирования директорий основным циклом для чтения всех файлов запросов и их сортировки по времени.
Этот пример программы не должен заботиться о своей производительности (она будет очень низкой, хотя я не тестировал это), а должна демонстрировать надежный способ коммуникации. Вы можете протестировать это, открыв агента, титана, worker'ов и клиента, используя параметр -v для вывода трассировки информации, затем открывая и закрывая агента, титана или worker'ов (клиент нельзя закрывать), и вы увидите, что все запросы получили ответ.
Если вы хотите использовать режим титана в реальных условиях, вы, вероятно, захотите узнать, как сделать его быстрее. Вот мои предложения: * Используйте один файл на диске для хранения всех данных. Операционная система эффективнее обрабатывает большие файлы, чем множество маленьких.
* Используйте циклическую структуру для организации этого файла на диске, чтобы новые запросы могли быть последовательно записаны в него. Одиночный поток может эффективно записывать данные на диск при полной загрузке.
* Сохраняйте индекс в памяти и восстанавливайте его при запуске программы. Это позволяет экономить место на диске для кэша и безопасно сохранять индекс на диске. Вам потребуется механизм fsync для сохранения каждого запроса; либо вы можете ждать несколько миллисекунд, если потеря нескольких тысяч запросов вас не беспокоит.
* Если возможно, используйте SSD для повышения скорости.
* Предварительно выделяйте пространство для файла на диске или увеличьте размер блока выделения, чтобы избежать фрагментации и обеспечить последовательное чтение и запись.Кроме того, я не рекомендую хранить сообщения в базе данных или использовать высокопроизводительные кэши ключей, так как они дороже одного файла на диске.
Если вы хотите сделать режим титана более надёжным, вы можете копировать запросы на другой сервер, чтобы не беспокоиться о воздействии ядерного удара на основной процесс.
Если вы хотите сделать режим титана быстрее, но готовы пожертвовать некоторой надёжностью, вы можете хранить запросы и ответы в памяти. Это позволит использовать эту службу как автономную сеть, однако если сама служба титана прекратит работу, я ничем не смогу помочь.
### Высокоуровневый симметричный узел (режим близнецов)
#### Обзор
Режим близнецов представляет собой пару надёжных узлов с механизмом главного и резервного узлов. В любой момент времени один узел выполняет роль основного узла (мастер), принимая все запросы от клиентов, а другой узел служит резервным узлом (слейв). Узлы постоянно мониторят друг друга, и если основной узел исчезает из сети, резервный узел немедленно заменяет его.
Режим близнецов был разработан Pieter Hintjens и Martin Sustrik и используется в сервере OpenAMQ компании iMatix. Основные идеи дизайна:
* Предоставление простого решения для обеспечения высокой надёжности;
* Легкость понимания и использования;
* Возможность надёжного переключения при отказе.```textdiagram
+------------+ +------------+
| | | |
| Primary |<------------->| Backup |
| "master" | | "slave" |
| | | |
+------------+ +------------+
^
|
|
|
+-----+------+
| |
| Client |
| |
+------------+
Рисунок # - Парная высокодоступность, нормальная работа
```
Предположим, что у нас есть группа серверов, работающих в режиме близнецов. Вот возможные сценарии отказа:
1. Основной узел выходит из строя из-за аппаратного сбоя (отключение питания, пожар и т.д.), и приложение немедленно переходит на соединение с резервным узлом;
2. Основной узел теряет связь из-за сетевых проблем (например, роутер был поражен молнией), и приложение немедленно переходит на соединение с резервным узлом;
3. Услуга на основном узле была случайно завершена техническим персоналом и не может автоматически восстановиться.
Шаги восстановления:
1. Технический персонал диагностирует причину отказа основного узла;
2. Резервный узел отключается, что приводит к кратковременному прерыванию услуги;
3. После того как все приложения переключатся на основной узел, технический персонал запускает резервный узел.
Процесс восстановления осуществляется вручную. Жестокий опыт показывает, что автоматическое восстановление может быть очень опасным:* Отказ может вызвать кратковременное прерывание услуги в течение 10-30 секунд. Если это действительно внезапная ситуация, лучше всего остановить основной узел, так как немедленное восстановление может привести к еще одному прерыванию услуги в течение 10-30 секунд. Лучше всего временно прекратить использование услуги.
* При наличии экстренной ситуации можно записывать причины отказа во время ремонта, вместо автоматического восстановления. Это позволяет администратору использовать свой опыт для защиты от следующего внезапного события.
* Наконец, если автоматическое восстановление действительно успешно, администратор не сможет определить причину отказа, поэтому не сможет провести анализ. Процесс восстановления после отказа в режиме зеркального репликации состоит в следующем: после устранения проблемы на основном узле, резервный узел отключается, а затем через некоторое время снова включается:
```textdiagram
+------------+ +------------+
| | | |
| Основной |<------------->| Резервный |
| "рабочий" | | "ведущий" |
| | | |
+------------+ +------------+
^
|
+-----------------------------+
|
+-----+------+
| |
| Клиент |
| |
+------------+
Рисунок # - Высокоуровневая пара, во время переключения
```Процесс отключения в режиме зеркального репликации имеет два варианта:
1. Сначала отключается резервный узел, затем, после некоторого времени, отключается основной узел;
1. Одновременно отключаются основной и резервный узлы, при этом интервал времени между отключением не превышает нескольких секунд.
При отключении интервал времени должен быть меньше времени переключения, чтобы избежать потери соединения клиентом, повторного подключения и последующей потери соединения, что может вызвать жалобы пользователей.
#### Подробные требования
Зеркальная репликация может быть очень простой, но при этом работать эффективно. В действительности, данная реализация прошла три версии, предыдущие версии были слишком сложными и пытались делать слишком много, поэтому они были отвергнуты. Нам требуется только базовая функциональность, которая обеспечивает понятное, легко разрабатываемое и надёжное решение.Вот подробные требования к данной архитектуре:
* Проблемы, требующие использования режима "Твин-Стар" (Twin-Star): это ситуации, когда система подвергается катастрофическому воздействию, такому как отказ аппаратного обеспечения, пожар, случайные события и т. д. Для других типичных сбоев серверов можно использовать более простые методы.
* Время восстановления после сбоя должно составлять менее 60 секунд, а в идеальном случае — менее 10 секунд;
* Процесс переключения на резервное оборудование (failover) должен происходить автоматически, тогда как процесс восстановления системы (recover) должен выполняться вручную. Мы хотим, чтобы приложение автоматически переключалось с основного оборудования на резервное при возникновении сбоя, но не хотели бы, чтобы оно автоматически переключалось обратно на основное оборудование до того, как проблема будет решена, так как это может привести к повторному отказу основного оборудования.
* Логика программы должна быть максимально простой и удобной для использования, желательно, чтобы она была упакована в API;
* Необходимо предоставить явное указание на то, какое оборудование является основным, чтобы избежать состояния "раздвоения личности", когда оба сервера считают себя основными;
* Порядок запуска двух серверов не должен быть ограничен;* При запуске или остановке основного и резервного оборудования не требуется изменения конфигурации клиента, хотя это может привести к прерыванию соединения;
* Администратор должен иметь возможность одновременно мониторить оба сервера;
* Между двумя серверами должно быть специальное высокоскоростное сетевое соединение, которое должно поддерживать маршрутизацию по конкретному IP-адресу. Делаем следующие предположения: * Одна резервная машина обеспечивает достаточную защиту, поэтому нет необходимости в других механизмах резервного копирования;
* Основная и резервная машины должны быть способны предоставлять полный набор услуг и выдерживать одинаковые нагрузки, без необходимости использования балансировки нагрузки;
* Бюджет позволяет иметь резервную машину, которая большую часть времени будет простаивать.
Режим "Твин-Стар" не использует:
* Несколько резервных машин или балансировку нагрузки между основной и резервной машинами. В этом режиме резервная машина всегда находится в состоянии покоя, пока основная машина работает;
* Обработку устойчивых сообщений или транзакций. Мы предполагаем, что подключенная сеть является ненадежной (или недоверенной);
* Автоматическое поисковое открытие сети. Режим "Твин-Стар" настраивается вручную, они знают о существовании друг друга, а приложение знает о существовании режима "Твин-Стар";
* Синхронизацию состояния между основной и резервной машинами. Все состояния сервера должны быть восстановлены приложением.
Ниже приведены некоторые термины, используемые в режиме "Твин-Стар": * **Основная машина** - обычно это машина, работающая в качестве мастера;
* **Резервная машина** - обычно это машина, работающая в качестве слейва, которая становится мастером, когда основная машина исчезает из сети;
* **Мастер** - машина, принимающая запросы от приложения в режиме "Твин-Стар"; в любой момент времени только одна машина является мастером;
* **Слейв** - машина, которая становится мастером, когда мастер исчезает.Шаги настройки режима "Твин-Стар":
1. Убедитесь, что основная машина знает местоположение резервной машины;
1. Убедитесь, что резервная машина знает местоположение основной машины;
1. Настройте время восстановления после отказа; конфигурация двух машин должна быть одинаковой.
Одним из важнейших параметров является интервал проверки состояния другой машины и время, через которое будут предприниматься действия. В нашем примере время восстановления после отказа установлено в 2000 миллисекунд. Если резервная машина не получает ответ от основной машины за это время, она становится мастером. Однако, если вы запускаете основную машину через скрипт командной строки, вам может потребоваться увеличить это время, чтобы избежать ситуации, когда резервная машина становится мастером во время процесса восстановления основной машины.
Чтобы сделать клиентское приложение совместимым с режимом "Твин-Стар", вам потребуется: 1. Знать адреса двух серверов;
1. Попытаться подключиться к основному серверу; если это невозможно, подключиться к резервному серверу;
1. Обнаруживать потерянное соединение, обычно с помощью механизма "сердцебиения";
1. Попытаться повторно подключиться к основному серверу, затем к резервному серверу; интервал между попытками должен быть больше времени восстановления после отказа;
1. Восстановить все необходимые данные состояния сервера;
1. Если требуется обеспечение надежности, повторно отправьте сообщения, которые были потеряны в результате отказа. Это не простая задача, поэтому мы обычно упаковываем её в API для использования программистами.Основные ограничения двойной звездной модели следующие:
* Серверный процесс не может включать более одного симметричного двойного звездного узла;
* Узел может иметь только один резервный узел;
* Когда резервный узел находится в режиме slave, он не обрабатывает никаких запросов;
* Резервный узел должен быть способен выдерживать все запросы приложения;
* Время восстановления после отказа не может быть настроено во время выполнения;
* Клиентское приложение должно выполнять некоторую работу по повторному подключению.
#### Предотвращение психического расщепления
Симптом "психического расщепления" означает, что различные части кластера одновременно считают себя мастером и прекращают проверять друг друга. Алгоритмы двойной звездной модели снижают вероятность возникновения этого симптома: мастер и резервный узлы определяют, являются ли они мастером, проверяя, получили ли они запросы от приложений, а также исчез ли другой узел из сети.
Однако в некоторых случаях двойная звезда также может столкнуться с психическим расщеплением. Например, если мастер и резервный узлы были настроены в двух разных зданиях, где каждое здание имеет свои локальные сети с приложениями. В этом случае, когда связь между двумя зданиями прерывается, мастер и резервный узлы будут принимать и обрабатывать запросы в своих зданиях соответственно.Чтобы предотвратить психическое расщепление, нам следует использовать специализированную сеть для соединения мастерского и резервного узлов; самым простым решением является использование twisted pair cable для их соединения.
Мы не можем развертывать двойную звезду на двух разных островах для обслуживания приложений на каждом острове. В этом случае мы будем использовать механизмы надежности, такие как федеральная модель.
Лучшим, но наиболее экстремальным подходом будет полное разделение соединений между двумя машинами и соединений приложений, даже до использования различных сетевых адаптеров, а не только различных портов. Это делается для того, чтобы упростить диагностику ошибок в будущем.
#### Реализация двойной звездной модели
Без лишних слов, вот серверный код для двойной звездной модели:**bstarsrv: Бинарный сервер двойной звезды на C**
```c
//
// Близнецовый режим - серверная часть
//
#include "czmq.h"
// Интервал отправки состояния
// Если противоположная сторона не отвечает за два сердцебиения, соединение считается разорванным
#define HEARTBEAT 1000 // В миллисекундах
// Структура состояний сервера
typedef enum {
STATE_PRIMARY = 1, // Основной сервер, ожидает подключения резервного сервера
STATE_BACKUP = 2, // Резервный сервер, ожидает подключения основного сервера
STATE_ACTIVE = 3, // Активное состояние, обрабатывает запросы приложения
STATE_PASSIVE = 4 // Пассивное состояние, не принимает запросы
} state_t;
// Структура событий
typedef enum {
PEER_PRIMARY = 1, // Основной сервер
PEER_BACKUP = 2, // Резервный сервер
PEER_ACTIVE = 3, // Активное состояние
PEER_PASSIVE = 4, // Пассивное состояние
CLIENT_REQUEST = 5 // Запрос клиента
} event_t;
// Конечный автомат
typedef struct {
state_t state; // Текущее состояние
event_t event; // Текущее событие
int64_t peer_expiry; // Время истечения сессии для узла
} bstar_t;
// Выполнение конечного автомата (связывание события со состоянием);
// Возвращает TRUE в случае возникновения исключения.
static Bool
s_state_machine (bstar_t *fsm)
{
Bool exception = FALSE;
// Основной сервер ожидает подключения резервного сервера
// В этом состоянии принимаются события CLIENT_REQUEST
if (fsm->state == STATE_PRIMARY) {
if (fsm->event == PEER_BACKUP) {
``````
}
else
{
// Сервер находится в пассивном состоянии
// Если партнер умер, событие CLIENT_REQUEST будет триггерить восстановление
if (fsm->state == STATE_PASSIVE) {
if (fsm->event == PEER_PRIMARY) {
// Партнер перезапускается - переходит в активное состояние, партнер переходит в пассивное состояние.
printf ("I: Узел (slave) перезапускается, может работать как master. \n");
fsm->state = STATE_ACTIVE;
}
else
if (fsm->event == PEER_BACKUP) {
``` // Партнер перезапускается - переходит в активное состояние, партнер переходит в пассивное состояние.
printf ("I: Узел (slave) перезапускается, может работать как master. \n");
fsm->state = STATE_ACTIVE;
}
else
if (fsm->event == PEER_PASSIVE) {
// Если есть два slave, кластер не будет отвечать
printf ("E: Критическая ошибка: два slave. Выход... \n");
exception = TRUE;
}
else
if (fsm->event == CLIENT_REQUEST) {
// Если таймаут сердцебиения, партнер становится master;
// Это поведение было вызвано запросом клиента.
assert (fsm->peer_expiry > 0);
if (zclock_time () >= fsm->peer_expiry) {
// Партнер умер, переходит в активное состояние.
printf ("I: Восстановление после отказа, может работать как master. \n");
fsm->state = STATE_ACTIVE;
}
else
// Партнер жив, отклоняет запрос.
exception = TRUE;
}
}
return exception;
}
int main (int argc, char *argv [])
{
// Командные аргументы могут быть:
// -p запуск как master, на tcp://localhost:5001
// -b запуск как slave, на tcp://localhost:5002
zctx_t *ctx = zctx_new ();
void *statepub = zsocket_new (ctx, ZMQ_PUB);
void *statesub = zsocket_new (ctx, ZMQ_SUB);
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
bstar_t fsm = { 0 };
if (argc == 2 && streq (argv [1], "-p")) {
printf ("I: Master, ожидание подключения slave. \n");
zsocket_bind (frontend, "tcp://*:5001");
zsocket_bind (statepub, "tcp://*:5003");
zsocket_connect (statesub, "tcp://localhost:5004");
fsm.state = STATE_PRIMARY;
}
else
if (argc == 2 && streq (argv [1], "-b")) {
printf ("I: Slave, ожидание подключения master. \n");
zsocket_bind (frontend, "tcp://*:5002");
zsocket_bind (statepub, "tcp://*:5004");
zsocket_connect (statesub, "tcp://localhost:5003");
fsm.state = STATE_BACKUP;
}
else {
printf ("Использование: bstarsrv { -p | -b }\n");
zctx_destroy (&ctx);
exit (0);
}
}// Устанавливаем время следующей отправки состояния
int64_t send_state_at = zclock_time() + HEARTBEAT;
while (! zctx_interrupted) {
zmq_pollitem_t items[] = {
{frontend, 0, ZMQ_POLLIN, 0},
{statesub, 0, ZMQ_POLLIN, 0}
};
int time_left = (int)((send_state_at - zclock_time()));
if (time_left < 0)
time_left = 0;
int rc = zmq_poll(items, 2, time_left * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Контекст был закрыт
if (items[0].revents & ZMQ_POLLIN) {
// Получено запрос от клиента
zmsg_t *msg = zmsg_recv(frontend);
fsm.event = CLIENT_REQUEST;
if (s_state_machine(&fsm) == FALSE)
// Отправляем ответ
zmsg_send(&msg, frontend);
else
zmsg_destroy(&msg);
}
if (items[1].revents & ZMQ_POLLIN) {
// Получено состояние, как событие
char *message = zstr_recv(statesub);
fsm.event = atoi(message);
free(message);
if (s_state_machine(&fsm))
break; // Ошибка, выходим.
fsm.peer_expiry = zclock_time() + 2 * HEARTBEAT;
}
// Временная отправка состояния
if (zclock_time() >= send_state_at) {
char message[2];
sprintf(message, "%d", fsm.state);
zstr_send(statepub, message);
send_state_at = zclock_time() + HEARTBEAT;
}
}
if (zctx_interrupted)
printf("W: Прерывание\n");
// Закрываем сокеты и контекст
zctx_destroy(&ctx);
return 0;
```
```Вот клиентский код:```**bstarcli: Клиент двойной звезды на C**
```c
//
// Двойная звезда - клиент
//
#include "czmq.h"
#define REQUEST_TIMEOUT 1000 // миллисекунд
#define SETTLE_DELAY 2000 // время ожидания
int main (void)
{
zctx_t *ctx = zctx_new ();
char *server [] = { "tcp://localhost:5001", "tcp://localhost:5002" };
uint server_nbr = 0;
printf ("I: Подключаюсь к серверу %s...\n", server [server_nbr]);
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, server [server_nbr]);
int sequence = 0;
while (!zctx_interrupted) {
// Отправка запроса и ожидание ответа
char request [10];
sprintf (request, "%d", ++sequence);
zstr_send (client, request);
int expect_reply = 1;
while (expect_reply) {
// Опрос сокета
zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
if (rc == -1)
break; // прерывание
// Обработка ответа
if (items [0].revents & ZMQ_POLLIN) {
// Проверка номера ответа
char *reply = zstr_recv (client);
if (atoi (reply) == sequence) {
printf ("I: Сервер ответил корректно (%s)\n", reply);
expect_reply = 0;
sleep (1); // отправка одного запроса в секунду
}
else {
printf ("E: Некорректный ответ: %s\n",
reply);
}
free (reply);
}
else {
printf ("W: Сервер не отвечает, повторная попытка подключения\n");
// Уничтожение сокета
zsocket_destroy (ctx, client);
server_nbr = (server_nbr + 1) % 2;
zclock_sleep (SETTLE_DELAY);
printf ("I: Подключаюсь к серверу %s...\n",
server [server_nbr]);
client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, server [server_nbr]);
}
}
}
}
``` // Отправка запроса через новый сокет
zstr_send (client, request);
}
}
}
zctx_destroy (&ctx);
return 0;
}
```Запустите следующие команды для тестирования в произвольном порядке:
```
bstarsrv -p # Запуск основного сервера
bstarsrv -b # Запуск резервного сервера
bstarcli
```
Вы можете завершить работу процесса основного сервера, чтобы протестировать механизм восстановления после отказа; затем запустите основной сервер снова и завершите работу процесса резервного сервера, чтобы проверить механизм восстановления. Обратите внимание, что эти события должны быть инициированы клиентом.
Ниже приведена схема состояний служебных процессов. В зелёном состоянии принимаются запросы от клиента, в розовом — отклоняются. События относятся к состоянию партнёра, поэтому "активное состояние партнёра" означает, что партнёрская машина сообщила нам, что она находится в активном состоянии. "Запрос клиента" означает, что мы получили запрос от клиента, а "голосование клиента" указывает на то, что мы получили запрос от клиента и партнёр уже считается погибшим из-за пропущенного голосования.```textdiagram```
Начало /-----------------------\ /----------\
| |Запрос клиента | | Клиент|Запрос |
v | v v | v
+---------+-+ +-----------+ | +-----------+
| | Резервное копирование узла | +------/ | |
| Основной |----------------->| Активный |<--------------\ | Резервный |
| | /------>| |<-----\ | | |
| {o} c9FB | | | {o} c9FB | | | | {o} cF9B |
+-----+-----+ | +-----+-----+ | | +-----+-----+
| | | | | |
Узел|Активный | Узел|Активный | | Узел|Активный
| | v | | |
| | +-----------+ | | |
| | | | | | |
| Узел|Резервный| Ошибка! | Узел|Основной | |
| | | | | | |
| | | {o} cF00 | | | |
| | +-----------+ | | |
| | ^ | | |
| | Узел|Пассивный | Клиент|Голосование|
| | | | | |
| | +-----+-----+ | | |
| | | +------/ | |
| \-------+ Пассивный +---------------/ |
\---------------------->| |<-----------------------/
| {o} cF9B |
+-----------+ Рисунок # - Конечный автомат двойной звезды (Binary Star finite state machine)
```Необходимо отметить, что служебный процесс использует сокеты PUB-SUB для обмена состояниями, другие типы сокетов здесь неприемлемы. Например, сокеты PUSH и DEALER будут заблокированы, если нет соединенного узла; сокеты PAIR не смогут автоматически восстановить соединение после отключения узла; сокеты ROUTER требуют адреса для отправки сообщений.
Это основные ограничения паттерна Двойной Звезды:
* Серверный процесс не может быть частью более чем одного пары Двойной Звезды.
* Основной сервер может иметь только один резервный сервер.
* Резервный сервер не может выполнять полезную работу в режиме резерва.
* Резервный сервер должен быть способен обрабатывать полную нагрузку приложения.
* Настройки failover не могут быть изменены во время выполнения.
* Приложения-клиенты должны выполнять некоторую работу, чтобы получить выгоду от failover.
#### Реактор Двойной Звезды
Мы можем упаковать паттерн Двойной Звезды в класс, похожий на реактор, для последующего использования. В языке C мы используем класс zloop из библиотеки czmq, а другие языки должны иметь соответствующие реализации. Ниже приведен интерфейс bstar на языке C:
```c
// Создает экземпляр паттерна Двойной Звезды, используя локальные (биндинг) и удаленные (соединение) конечные точки для настройки узлов.
bstar_t *bstar_new (int primary, char *local, char *remote);
```// Уничтожает экземпляр
void bstar_destroy (bstar_t **self_p);
// Возвращает базовый zloop реактор для добавления таймеров, читателей, регистрации и отмены функций.
zloop_t *bstar_zloop (bstar_t *self);
// Регистрирует читателя голосования
int bstar_voter (bstar_t *self, char *endpoint, int type, zloop_fn handler, void *arg);
// Регистрирует обработчик конечного автомата
void bstar_new_master (bstar_t *self, zloop_fn handler, void *arg);
void bstar_new_slave (bstar_t *self, zloop_fn handler, void *arg);
// Запускает реактор, прекращая его, когда обратный вызов возвращает -1, или когда процесс получает сигнал SIGINT или SIGTERM.
int bstar_start (bstar_t *self);
```
Ниже приведена реализация класса:
// Уничтожает экземпляр
void bstar_destroy (bstar_t **self_p);
// Возвращает базовый zloop реактор для добавления таймеров, читателей, регистрации и отмены функций.
zloop_t *bstar_zloop (bstar_t *self);
// Регистрирует читателя голосования
int bstar_voter (bstar_t *self, char *endpoint, int type, zloop_fn handler, void *arg);
// Регистрирует обработчик конечного автомата
void bstar_new_master (bstar_t *self, zloop_fn handler, void *arg);
void bstar_new_slave (bstar_t *self, zloop_fn handler, void *arg);
// Запускает реактор, прекращая его, когда обратный вызов возвращает -1, или когда процесс получает сигнал SIGINT или SIGTERM.
int bstar_start (bstar_t *self);
```**bstar: Ядро класса Двойной Звезды на C**```c
/* =====================================================================
bstar - Бинарный звездный реактор
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
Этот файл является частью руководства по ZeroMQ: http://zguide.zeromq.org
Это свободное программное обеспечение; вы можете распространять его и/или модифицировать его в соответствии с
условиями Генеральной общественной лицензии GNU версии Yöntem 3, выпущенной Free Software Foundation,
или (по вашему выбору) любой более поздней версией этой лицензии.
Это программное обеспечение распространяется в надежде, что оно будет полезным, но
БЕЗ КАКИХ-ЛИБО ГАРАНТИЙ; даже без скрытых гарантий пригодности для использования или соответствия какому-либо назначению.
Смотрите Генеральную общественную лицензию GNU для получения дополнительной информации.
Вы должны были получить копию Генеральной общественной лицензии GNU вместе с этим программным обеспечением.
Если нет, см. <http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "bstar.h"
// Состояния сервера
typedef enum {
STATE_PRIMARY = 1, // Главный сервер, ожидает подключения от второго сервера
STATE_BACKUP = 2, // Второй сервер, ожидает подключения от главного сервера
STATE_ACTIVE = 3 // Активное состояние, обрабатывает запросы приложения
};
``` STATE_PASSIVE = 4 // Пассивное состояние, не принимает запросы
} state_t;
// События узлов диалога
typedef enum {
PEER_PRIMARY = 1, // Главный сервер
PEER_BACKUP = 2, // Второй сервер
PEER_ACTIVE = 3, // Активное состояние
PEER_PASSIVE = 4, // Пассивное состояние
CLIENT_REQUEST = 5 // Запрос клиента
} event_t;
// Интервал времени для отправки информации о состоянии
// Если противоположная сторона не отвечает после двух сердечных сокращений, считается, что соединение разорвано
#define BSTAR_HEARTBEAT 1000 // В миллисекундах
// Структура класса
struct _bstar_t {
zctx_t *ctx; // Приватный контекст
zloop_t *loop; // Цикл реактора
void *statepub; // Публикатор состояния
void *statesub; // Подписчик состояния
state_t state; // Текущее состояние
event_t event; // Текущее событие
int64_t peer_expiry; // Время истечения для определения смерти узла
zloop_fn *voter_fn; // Обработчик сокета голосования
void *voter_arg; // Аргументы для обработчика голосования
}
``````c
typedef struct {
zloop_fn *master_fn; // При переходе в состояние master вызывается
void *master_arg; // Аргументы
zloop_fn *slave_fn; // При переходе в состояние slave вызывается
void *slave_arg; // Аргументы
};
// ---------------------------------------------------------------------
// Выполняет конечный автомат состояний (связывает события со состояниями);
// Возвращает -1 при возникновении ошибки, 0 при успешном выполнении.
static int
s_execute_fsm (bstar_t *self)
{ int rc = 0;
// Основной узел ожидает подключения резервного узла
// В этом состоянии принимаются события CLIENT_REQUEST
if (self->state == STATE_PRIMARY) {
if (self->event == PEER_BACKUP) {
zclock_log("I: Подключен к резервному узлу (slave), можно работать в качестве master.");
self->state = STATE_ACTIVE;
if (self->master_fn)
(self->master_fn)(self->loop, NULL, self->master_arg);
} else if (self->event == PEER_ACTIVE) {
zclock_log("I: Подключен к основному узлу (master), можно работать в качестве slave.");
self->state = STATE_PASSIVE;
if (self->slave_fn)
(self->slave_fn)(self->loop, NULL, self->slave_arg);
} else if (self->event == CLIENT_REQUEST) {
zclock_log("I: Получено запрос от клиента, можно работать в качестве master.");
self->state = STATE_ACTIVE;
if (self->master_fn)
(self->master_fn)(self->loop, NULL, self->master_arg);
}
} else if (self->state == STATE_BACKUP) {
// Резервный узел ожидает подключения основного узла
// В этом состоянии отклоняются события CLIENT_REQUEST
if (self->event == PEER_ACTIVE) {
zclock_log("I: Подключен к основному узлу (master), можно работать в качестве slave.");
self->state = STATE_PASSIVE;
if (self->slave_fn)
(self->slave_fn)(self->loop, NULL, self->slave_arg);
} else if (self->event == CLIENT_REQUEST) {
rc = -1;
}
} else if (self->state == STATE_ACTIVE) {
// Узел находится в активном состоянии
// В этом состоянии принимаются события CLIENT_REQUEST
// Узел покидает активное состояние только при его смерти
if (self->event == PEER_ACTIVE) {
// Если возникают два master, выбрасывается ошибка
}
}
zclock_log("E: Critical error: double master. Exit.");
rc = -1;
}
}
else
// Node is in passive state
}
``````markdown
// If the partner is already dead, the CLIENT_REQUEST event will trigger recovery after failure
if (self->state == STATE_PASSIVE) {
if (self->event == PEER_PRIMARY) {
// Partner is restarting — state changes to active, partner becomes passive.
zclock_log("I: Slave node restarting, may become master.");
self->state = STATE_ACTIVE;
} else if (self->event == PEER_BACKUP) {
// Partner is restarting — state changes to active, partner becomes passive.
zclock_log("I: Backup node restarting, may become master.");
self->state = STATE_ACTIVE;
} else if (self->event == PEER_PASSIVE) {
// If there are two slaves, the cluster will be unavailable
zclock_log("E: Critical error: two slaves. Exit.");
rc = -1;
} else if (self->event == CLIENT_REQUEST) {
// If heartbeat timeout, partner becomes master;
// This behavior is triggered by a client request.
assert(self->peer_expiry > 0);
if (zclock_time() >= self->peer_expiry) {
// Partner is dead, state changes to active.
zclock_log("I: Recovery after failure, may become master.");
self->state = STATE_ACTIVE;
} else {
// Partner is alive, request is rejected.
rc = -1;
}
}
// Call function to handle state change event
if (self->state == STATE_ACTIVE && self->master_fn)
(self->master_fn)(self->loop, NULL, self->master_arg);
}
return rc;
}// ---------------------------------------------------------------------
// Обработчик событий реактора
// Отправка информации о состоянии
int s_send_state(zloop_t *loop, void *socket, void *arg) {
bstar_t *self = (bstar_t *) arg;
zstr_sendf(self->statepub, "%d", self->state);
return 0;
}
// Получение информации о состоянии, запуск конечного автомата
int s_recv_state(zloop_t *loop, void *socket, void *arg) {
bstar_t *self = (bstar_t *) arg;
char *state = zstr_recv(socket);
if (state) {
self->event = atoi(state);
self->peer_expiry = zclock_time() + 2 * BSTAR_HEARTBEAT;
free(state);
}
return s_execute_fsm(self);
}```c
// Получение запроса приложения, проверка возможности принятия
int s_voter_ready(zloop_t *loop, void *socket, void *arg) {
bstar_t *self = (bstar_t *) arg;
}
// Если запрос можно обработать, вызвать функцию
self->event = CLIENT_REQUEST;
if (s_execute_fsm(self) == 0) {
puts("CLIENT REQUEST");
(self->voter_fn)(self->loop, socket, self->voter_arg);
} else {
// Удалить сообщение из очереди ожидания
zmsg_t *msg = zmsg_recv(socket);
zmsg_destroy(&msg);
}
return 0;
}
// ---------------------------------------------------------------------
// Конструктор
bstar_t *
bstar_new(int primary, char *local, char *remote) {
bstar_t *self;
self = (bstar_t *) zmalloc(sizeof(bstar_t));
// Инициализировать двойную звезду
self->ctx = zctx_new();
self->loop = zloop_new();
self->state = primary ? STATE_PRIMARY : STATE_BACKUP;
// Создать публикационный сокет состояния
self->statepub = zsocket_new(self->ctx, ZMQ_PUB);
zsocket_bind(self->statepub, local);
// Создать подписочный сокет состояния
self->statesub = zsocket_new(self->ctx, ZMQ_SUB);
zsocket_connect(self->statesub, remote);
// Установить базовый обработчик событий реактора
zloop_timer(self->loop, BSTAR_HEARTBEAT, 0, s_send_state, self);
zloop_reader(self->loop, self->statesub, s_recv_state, self);
return self;
}
// ---------------------------------------------------------------------
// Деструктор
void
bstar_destroy(bstar_t **self_p) {
assert(self_p);
if (*self_p) {
bstar_t *self = *self_p;
zloop_destroy(&self->loop);
zctx_destroy(&self->ctx);
free(self);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// Возвращает внутренний объект zloop для добавления дополнительных таймеров, читателей и т. д.
zloop_t *
bstar_zloop(bstar_t *self) {
return self->loop;
}
``` // ---------------------------------------------------------------------
// Создает сокет, подключается к локальной конечной точке, регистрируется как читатель;
// Сокет будет прочитан только если это допустимо для машины состояний;
// Сообщения, полученные с этого сокета, будут использоваться как "голосование";
// Мы требуем, чтобы в режиме двойной звезды был только один "голосующий" сокет.
int
bstar_voter (bstar_t *self, char *endpoint, int type, zloop_fn handler,
void *arg)
{
// Сохраняет оригинальную функцию обратного вызова и аргументы, которые будут использованы позже
void *socket = zsocket_new (self->ctx, type);
zsocket_bind (socket, endpoint);
assert (!self->voter_fn);
self->voter_fn = handler;
self->voter_arg = arg;
return zloop_reader(self->loop, socket, s_voter_ready, self);
}```
Таким образом, наш серверный код станет очень коротким:
**bstarsrv2: Бинарный сервер двойной звезды, использующий основной класс на C**
```c
//
// Сервер двойной звезды, использующий bstar реактивный узел
//
// Прямая компиляция, без создания библиотеки
#include "bstar.c"
// Эхо-сервис
int s_echo(zloop_t *loop, void *socket, void *arg) {
zmsg_t *msg = zmsg_recv(socket);
zmsg_send(&msg, socket);
return 0;
}
```int main(int argc, char *argv[]) {
// Командные аргументы могут быть:
// -p запуск в качестве основного сервера, на tcp://localhost:5001
// -b запуск в качестве резервного сервера, на tcp://localhost:5002
bstar_t *bstar;
if (argc == 2 && streq(argv[1], "-p")) {
printf("I: Основной сервер master, ожидает подключения резервного сервера (slave).\n");
bstar = bstar_new(BSTAR_PRIMARY, "tcp://*:5003", "tcp://localhost:5004");
bstar_voter(bstar, "tcp://*:5001", ZMQ_ROUTER, s_echo, NULL);
} else
if (argc == 2 && streq(argv[1], "-b")) {
printf("I: Резервный сервер slave, ожидает подключения основного сервера (master).\n");
bstar = bstar_new(BSTAR_BACKUP, "tcp://*:5004", "tcp://localhost:5003");
bstar_voter(bstar, "tcp://*:5002", ZMQ_ROUTER, s_echo, NULL);
} else {
printf("Использование: bstarsrvs { -p | -b }\n");
exit(0);
}
bstar_start(bstar);
bstar_destroy(&bstar);
return 0;
}
```### Надежность без промежуточного программного обеспечения (модель Free Agent)
Мы уже рассмотрели множество примеров с использованием промежуточного программного обеспечения, что может показаться противоречащим утверждению о том, что "ZMQ является без промежуточного программного обеспечения". Однако стоит помнить, что в реальной жизни промежуточное программное обеспечение всегда было предметом любви и ненависти. В реальных системах многие сообщения используют промежуточное программное обеспечение для построения распределенных архитектур. Поэтому окончательное решение остается за вами. Это также объясняет, почему, хотя я могу доехать за 10 минут до крупного магазина и купить пять ящиков громкой музыки, я предпочитаю идти пешком за те же 10 минут до местного магазина. Такие экономические соображения (время, усилия, затраты и т. д.) являются важными как в повседневной жизни, так и в архитектуре программного обеспечения.Поэтому ZMQ не требует использования архитектуры с промежуточным программным обеспечением, но всё же предоставляет такие встроенные устройства, которые программисты могут использовать по своему усмотрению. В этом разделе мы откажемся от архитектуры, использующей промежуточные компоненты для обеспечения надёжности, и перейдём к использованию пунт-ту-пойнт архитектуры, то есть Free Agent Pattern, для надёжной передачи сообщений. Примерным приложением будет служба разрешения имен. Одним из распространённых вопросов в ZMQ является следующий: как нам узнать конечные точки, к которым нужно подключиться? Внедрение TCP/IP адресов напрямую в код явно неприемлемо; использование конфигурационных файлов создаёт проблемы с управлением. Представьте себе ситуацию, когда вам нужно настроить сотни компьютеров только для того, чтобы они знали, что IP-адрес google.com равен 74.125.230.82. Функции, которые должны быть реализованы в службе разрешения имен ZMQ: * Преобразование логических имен в один или несколько адресов конечных точек, включая привязку и соединение. В реальных условиях служба имен будет предоставлять набор конечных точек.
* Возможность использования разрешения имен в различных средах, таких как среда разработки и среда производства.
* Служба должна быть надежной, иначе приложение не сможет подключиться к сети.
Предоставление службы разрешения имен для паттерна管家模式 (паттерн管家模式) может быть полезным, хотя экспонирование конечных точек агента также является простым решением. Однако, если использовать службу разрешения имен правильно, она станет единственным экспонируемым интерфейсом, что облегчит управление.
Типы сбоев, которые нам нужно учитывать, включают: отказ сервиса или перезапуск, перегрузка сервиса, сетевые факторы и т. д. Для обеспечения надежности нам необходимо создать группу сервисов, чтобы клиенты могли подключаться к другим сервисам после отказа одного из них. На практике достаточно двух сервисов, но фактически количество сервисов может быть любым.
```textdiagram``` +-----------+ +-----------+ +-----------+
| | | | | |
| Клиент | | Клиент | | Клиент |
| | | | | |
\-----------/ \-----------/ \-----------/
подключиться подключиться подключиться
| | |
| | |
+---------------+---------------+
| | |
| | |
привязка привязка привязка
/-----------\ /-----------\ /-----------\
| | | | | |
| Сервер | | Сервер | | Сервер |
| | | | | |
+-----------+ +-----------+ +-----------+ Рисунок # - Паттерн Freelance
```
В этом архитектурном решении множество клиентов взаимодействуют с небольшим количеством серверов, которые привязывают сокеты к отдельным портам. Это отличается от паттерна **Agent** (агент), где агент имеет другое поведение. У клиента есть несколько вариантов действий:
* Клиент может использовать REQ сокет и ленивый пиратский режим, но ему необходим механизм предотвращения постоянных запросов к уже остановленному серверу.
* Клиент может использовать DEALER сокет и отправлять запросы ко всем серверам. Это просто, но не очень эффективно; клиент использует ROUTER сокет для подключения к конкретному серверу. Но как клиент узнает сокет-идентификатор сервера? Один способ — это сделать так, чтобы сервер активно подключался к клиенту (очень сложный), или закрепить идентификатор сервера в коде (очень запутанный).
### Модель 1: Простое повторное соединение
Давайте начнем с простого подхода, перепишем ленивый пиратский режим, чтобы он мог взаимодействовать с несколькими серверами. При запуске сервера используйте параметры командной строки для указания порта. Затем запустите несколько серверов.
**flserver1: Freelance server, Model One in C**
```c
//
// Freelancer mode - Server - Model One
// Provides echo service
//
#include "czmq.h"
int main (int argc, char *argv [])
{
if (argc < 2) {
printf ("I: syntax: %s <endpoint>\n", argv [0]);
exit (EXIT_SUCCESS);
}
zctx_t *ctx = zctx_new ();
void *server = zsocket_new (ctx, ZMQ_REP);
zsocket_bind (server, argv [1]);
``````c
printf("I: echo endpoint: %s\n", argv[1]);
while (TRUE) {
zmsg_t *msg = zmsg_recv(server);
if (!msg)
break; // Прервать
zmsg_send(&msg, server);
}
if (zctx_interrupted)
printf("W: Interrupt\n");
zctx_destroy(&ctx);
return 0;
}
```
Запустите клиент, указав один или несколько конечных точек:
```**flclient1: Freelance клиент, модель One на C**```c
//
// Freelancer режим - Клиент - Модель 1
// Использует REQ сокет для запроса одного или нескольких серверов
//
#include "czmq.h"
#define REQUEST_TIMEOUT 1000
#define MAX_RETRIES 3 // Количество попыток
static zmsg_t *
s_try_request (zctx_t *ctx, char *endpoint, zmsg_t *request)
{
printf ("I: Попытка запроса сервиса echo по адресу %s...\n", endpoint);
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, endpoint);
// Отправляем запрос и ждём ответа
zmsg_t *msg = zmsg_dup (request);
zmsg_send (&msg, client);
zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
zmsg_t *reply = NULL;
if (items [0].revents & ZMQ_POLLIN)
reply = zmsg_recv (client);
// Закрываем сокет
zsocket_destroy (ctx, client);
return reply;
}
int main (int argc, char *argv [])
{
zctx_t *ctx = zctx_new ();
zmsg_t *request = zmsg_new ();
zmsg_addstr (request, "Hello world");
zmsg_t *reply = NULL;
int endpoints = argc - 1;
if (endpoints == 0)
printf ("I: Синтаксис: %s <endpoint> ...\n", argv [0]);
else
if (endpoints == 1) {
// Если есть только один адрес, повторяем N раз
int retries;
for (retries = 0; retries < MAX_RETRIES; retries++) {
char *endpoint = argv [1];
reply = s_try_request (ctx, endpoint, request);
if (reply)
break; // Успех
printf ("W: Нет ответа от %s, готовимся к повторной попытке...\n", endpoint);
}
}
else {
// Если есть несколько адресов, пробуем каждый раз по одному разу
int endpoint_nbr;
for (endpoint_nbr = 0; endpoint_nbr < endpoints; endpoint_nbr++) {
char *endpoint = argv [endpoint_nbr + 1];
reply = s_try_request (ctx, endpoint, request);
if (reply)
break; // Успешно
printf ("W: Нет ответа от %s\n", endpoint);
}
}
if (reply)
printf ("Сервис работает нормально\n");
zmsg_destroy (&request);
zmsg_destroy (&reply);
zctx_destroy (&ctx);
return 0;
}
```Для запуска используйте следующие команды:```
flserver1 tcp://*:5555 &
flserver1 tcp://*:5556 &
flclient1 tcp://localhost:5555 tcp://localhost:5556
```
Основной механизм клиента основан на ленивом пиратском алгоритме, то есть после получения одного успешного ответа он завершает работу. Возможны два случая:
* Если существует только один сервер, клиент будет повторять попытки N раз перед тем, как остановиться, что соответствует логике ленивого пиратского алгоритма;
* Если имеется несколько серверов, клиент будет пытаться получить ответ от каждого сервера по очереди, и после получения первого ответа прекратит дальнейшие попытки.
Этот механизм дополняет пиратский алгоритм, позволяя ему работать в случае наличия только одного сервера.
Однако, данная концепция не может быть использована в реальных приложениях: если множество клиентов подключены к серверу, а основной сервер выходит из строя, все клиенты должны будут продолжить выполнение только после истечения времени ожидания.
#### Модель 2: Батч-отправка
Теперь давайте воспользуемся сокетами DEALER. Наша цель — получить ответ за минимальное время, не завися от состояния основного сервера. Для этого можно применить следующие меры:
* Подключиться ко всем серверам;
* При получении запроса отправить его одновременно всем серверам;
* Ждать первого ответа;
* Игнорировать остальные ответы.При такой реализации клиента, после отправки запроса все серверы получат его и отправят ответ. Если какой-то сервер отключится, ZMQ может переслать запрос другим серверам, что приведет к получению некоторых серверов двух одинаковых запросов.
Более сложной проблемой является невозможность клиента определить количество полученных ответов, что может привести к путанице.
Можно пронумеровать запросы и игнорировать несоответствующие ответы. Необходимо модифицировать серверы, чтобы они включали номер запроса в своих ответах:
**flserver2: Freelance server, Model Two in C**```c
//
// Freelancer Mode - Server - Model 2
// Возвращает сообщение "OK" с номером запроса
//
#include "czmq.h"
int main (int argc, char *argv [])
{
if (argc < 2) {
printf ("I: syntax: %s <endpoint>\n", argv [0]);
exit (EXIT_SUCCESS);
}
zctx_t *ctx = zctx_new ();
void *server = zsocket_new (ctx, ZMQ_REP);
zsocket_bind (server, argv [1]);
printf ("I: Server is ready %s\n", argv [1]);
while (TRUE) {
zmsg_t *request = zmsg_recv (server);
if (!request)
break; // Прерывание
// Проверка правильности содержимого запроса
assert (zmsg_size (request) == 2);
zframe_t *address = zmsg_pop (request);
zmsg_destroy (&request);
zmsg_t *reply = zmsg_new ();
zmsg_add (reply, address);
zmsg_addstr (reply, "OK");
zmsg_send (&reply, server);
}
if (zctx_interrupted)
printf ("W: interrupted\n");
zctx_destroy (&ctx);
return 0;
}
```Клиентский код:
**flclient2: Freelance client, модель второго поколения на C**```c
//
// Freelancer Mode - Client - Model 2
// Использует DEALER сокет для отправки пакетных сообщений
//
#include "czmq.h"
// Продолжительность таймаута
#define GLOBAL_TIMEOUT 2500
// Обертывает клиентский API в класс
#ifdef __cplusplus
extern "C" {
#endif
// Объявляет структуру класса
typedef struct _flclient_t flclient_t;
```flclient_t *
flclient_new(void);
void
flclient_destroy(flclient_t **self_p);
void
flclient_connect(flclient_t *self, char *endpoint);
zmsg_t *
flclient_request(flclient_t *self, zmsg_t **request_p);
#ifdef __cplusplus
}
#endif
int main(int argc, char *argv[]) {
if (argc == 1) {
printf("I: синтаксис: %s <endpoint> . . . \n", argv[0]);
exit(EXIT_SUCCESS);
}
// Создает клиент в режиме фрилансера
flclient_t *client = flclient_new();
// Подключается к каждому конечной точке
int argn;
for (argn = 1; argn < argc; argn++) {
flclient_connect(client, argv[argn]);
}
// Отправляет набор запросов и записывает время
int requests = 10000;
uint64_t start = zclock_time();
while (requests--) {
zmsg_t *request = zmsg_new();
zmsg_addstr(request, "random name");
zmsg_t *reply = flclient_request(client, &request);
if (!reply) {
printf("E: Сервис разрешения имени недоступен, выход\n");
break;
}
zmsg_destroy(&reply);
}
printf("Среднее время выполнения запроса: %d микросекунд\n",
(int)(zclock_time() - start) / 10);
flclient_destroy(&client);
return 0;
}
// --------------------------------------------------------------------
// Классовая структура
struct _flclient_t {
zctx_t *ctx; // Контекст
void *socket; // SOCKET DEALER используется для связи с сервером
size_t servers; // Количество подключенных серверов
uint sequence; // Количество отправленных запросов
};
// --------------------------------------------------------------------
// Конструктор
flclient_t *
flclient_new(void) {
flclient_t *self;
self = (flclient_t *)zmalloc(sizeof(flclient_t));
self->ctx = zctx_new();
self->socket = zsocket_new(self->ctx, ZMQ_DEALER);
return self;
}```markdown
// --------------------------------------------------------------------
// Деструктор
void
flclient_destroy(flclient_t **self_p) {
assert(self_p);
if (*self_p) {
flclient_t *self = *self_p;
zctx_destroy(&self->ctx);
}
free(self);
*self_p = NULL;
}
}
// Подключиться к новому серверному конечной точке
void
flclient_connect (flclient_t *self, char *endpoint)
{
assert (self);
zsocket_connect (self->socket, endpoint);
self->servers++;
}
// --------------------------------------------------------------------
// Отправить запрос и получить ответ
// Удалить запрос после отправки
zmsg_t *
flclient_request (flclient_t *self, zmsg_t **request_p)
{
assert (self);
assert (*request_p);
zmsg_t *request = *request_p;
// Добавить номер и пустую строку в сообщение
char sequence_text [10];
sprintf (sequence_text, "%u", ++self->sequence);
zmsg_pushstr (request, sequence_text);
zmsg_pushstr (request, "");
// Отправить запрос всем подключенными серверам
int server;
for (server = 0; server < self->servers; server++) {
zmsg_t *msg = zmsg_dup (request);
zmsg_send (&msg, self->socket);
}
// Получить ответ от любого сервера
// Поскольку мы можем poll несколько раз, каждый раз вычисляем время окончания
zmsg_t *reply = NULL;
uint64_t endtime = zclock_time () + GLOBAL_TIMEOUT;
while (zclock_time () < endtime) {
zmq_pollitem_t items [] = { { self->socket, 0, ZMQ_POLLIN, 0 } };
zmq_poll (items, 1, (endtime - zclock_time ()) * ZMQ_POLL_MSEC);
if (items [0].revents & ZMQ_POLLIN) {
// Формат ответа [пустое][последовательность][OK]
reply = zmsg_recv (self->socket);
assert (zmsg_size (reply) == 3);
free (zmsg_popstr (reply));
char *sequence = zmsg_popstr (reply);
int sequence_nbr = atoi (sequence);
free (sequence);
if (sequence_nbr == self->sequence)
break;
}
}
zmsg_destroy (request_p);
return reply;
}
```
Несколько пояснений:
``` * Клиентская часть упакована в класс API, который скрывает сложный код.
* Клиентская часть отменяет поиск доступного сервера через несколько секунд.
* Клиентская часть должна создать легитимный REP-конверт, поэтому требуется добавление пустого фрейма.
В программе клиент отправляет 10 000 запросов на разрешение имени (хотя они и являются вымышленными) и вычисляет среднее время выполнения. На моей тестовой машине, при наличии одного сервера, время составляет 60 микросекунд; при наличии трёх серверов — 80 микросекунд.
Преимущества и недостатки этого подхода:
* Преимущество: простота, легкость понимания и реализации;
* Преимущество: быстрое выполнение с механизмом повторной попытки;
* Недостаток: использование дополнительной сетевой пропускной способности;
* Недостаток: невозможность установки приоритета для серверов, таких как основной и дополнительный сервисы;
* Недостаток: сервер не может одновременно обрабатывать несколько запросов.
#### Модель три - сложная и злоключительная
Модель массового отправления кажется нереалистичной, поэтому давайте исследуем последнюю, крайне сложную модель. Возможно, после её реализации мы снова вернёмся к модели массового отправления, ха-ха, это моя обычная практика.Мы можем заменить используемые клиентом сокеты на ROUTER, чтобы иметь возможность отправлять запросы конкретному серверу, прекращать отправку запросов умершему серверу, тем самым делая систему максимально умной. Также мы можем заменить сокеты сервера на ROUTER, чтобы преодолеть ограничение однопоточности.Однако соединение двух временных сокетов типа ROUTER-ROUTER невозможно, так как узел генерирует идентификатор сокета для другого узла только после получения первого сообщения. Единственный способ — использовать устойчивый сокет у одного из узлов; лучшим решением будет использование устойчивого сокета у клиента, который знает идентификатор сервера.
Чтобы избежать создания новых конфигурационных параметров, мы используем конечную точку сервера в качестве идентификатора сокета.
Помните, как работают идентификаторы ZMQ-сокетов. ROUTER-сокет сервера устанавливает свой идентификатор (до привязки), когда клиент подключается, происходит обмен идентификаторами через рукопожатие. ROUTER-сокет клиента отправляет пустое сообщение, а сервер генерирует случайный UUID для клиента. Затем сервер отправляет свой идентификатор клиенту.
Таким образом, клиент может отправлять сообщения конкретному серверу. Однако остаётся проблема: мы не знаем, когда сервер завершит этот процесс рукопожатия. Если сервер доступен, это может занять доли миллисекунды. Если нет, это может занять очень долгое время.Возникает противоречие: нам необходимо знать, когда сервер успешно подключился и готов к работе. В режиме Freelancer (свободного агента) в отличие от режима middleware (промежуточного слоя), сервер должен сначала отправить запрос, чтобы получить ответ. Поэтому до того как сервер отправит сообщение клиенту, клиент должен сначала запросить сервер, что кажется невозможным.У меня есть решение этой проблемы, которое заключается в использовании пакетной отправки. Здесь отправляется не реальный запрос, а пробный пинг (PING-PONG). Когда получено подтверждение, это означает, что другой конец активен.
Давайте установим протокол, который определяет, как этот пинг передается в режиме Freelancer:
* http://rfc.zeromq.org/spec:10
Реализация этого протокола на стороне сервера довольно проста. Вот пример модифицированного echo-сервера:
**flserver3: Freelance сервер, модель 3 на C**
```c
//
// Свободный агент - сервер - модель 3
// Использует ROUTER-ROUTER сокеты для связи; однопоточный.
//
#include "czmq.h"
int main (int argc, char *argv []) {
int verbose = (argc > 1 && streq (argv [1], "-v"));
zctx_t *ctx = zctx_new ();
// Подготовка серверного сокета, его идентификатор и конечная точка совпадают
char *bind_endpoint = "tcp://*:5555";
char *connect_endpoint = "tcp://localhost:5555";
void *server = zsocket_new (ctx, ZMQ_ROUTER);
zmq_setsockopt (server,
ZMQ_IDENTITY, connect_endpoint, strlen (connect_endpoint));
zsocket_bind (server, bind_endpoint);
printf ("I: Сервер готов %s\n", bind_endpoint);
while (!zctx_interrupted) {
zmsg_t *request = zmsg_recv (server);
if (verbose && request)
zmsg_dump (request);
if (!request)
break; // Прерывание
}
``` // Frame 0: Идентификатор клиента
// Frame 1: Пинг или контрольная информация клиента
// Frame 2: Содержимое запроса
zframe_t *address = zmsg_pop(request);
zframe_t *control = zmsg_pop(request);
zmsg_t *reply = zmsg_new();
if (zframe_streq(control, "PONG")) {
zmsg_addstr(reply, "PONG");
} else {
zmsg_add(reply, control);
zmsg_addstr(reply, "OK");
}
zmsg_destroy(&request);
zmsg_push(reply, address);
if (verbose && reply) {
zmsg_dump(reply);
}
zmsg_send(&reply, server);
}
if (zctx_interrupted) {
printf("W: Прерывание\n");
} zctx_destroy(&ctx);
return 0;
}
```Однако клиентская часть в режиме фриланса будет больше. Для ясности мы разделим её на два класса. Сначала рассмотрим верхний уровень программы:
**flclient3: Клиентский режим фриланса, модель 3 на C**
```c
//
// Клиентский режим фриланса - Модель 3
// Использует класс flcliapi для обёртки режима фриланса
//
// Прямая компиляция, без создания библиотеки
#include "flcliapi.c"
int main(void)
{
// Создание экземпляра клиента в режиме фриланса
flcliapi_t *client = flcliapi_new();
// Подключение к серверным точкам
flcliapi_connect(client, "tcp://localhost:5555");
flcliapi_connect(client, "tcp://localhost:5556");
flcliapi_connect(client, "tcp://localhost:5557");
// Отправка случайных запросов, вычисление времени
int requests = 1000;
uint64_t start = zclock_time();
while (requests--) {
zmsg_t *request = zmsg_new();
zmsg_addstr(request, "random name");
zmsg_t *reply = flcliapi_request(client, &request);
if (!reply) {
printf("E: Сервис разрешения имени недоступен, выход\n");
break;
}
zmsg_destroy(&reply);
}
printf("Среднее время выполнения: %d микросекунд\n",
(int)(zclock_time() - start) / 10);
flcliapi_destroy(&client);
return 0;
}
```
Далее представлено более сложное реализование этого режима:
```**flcliapi: API клиента фриланса на C**
```c
/* =====================================================================
flcliapi - Freelance Pattern agent class
Model 3: uses ROUTER socket to address specific services
---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.
This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
This is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or (at
your option) any later version.
This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.
=====================================================================
*/
#include "flcliapi.h"
// Timeout for requests
#define GLOBAL_TIMEOUT 3000 // milliseconds
// Interval for heartbeats
#define PING_INTERVAL 2000 // milliseconds
// Time to live for server
#define SERVER_TTL 6000 // milliseconds
// =====================================================================
// Synchronization section, runs at the application level
// ---------------------------------------------------------------------
// Class structure
struct _flcliapi_t {
zctx_t *ctx; // context
void *pipe; // socket used for communication with main thread
};
// ---------------------------------------------------------------------
// Constructor
flcliapi_t *
flcliapi_new(void)
{
flcliapi_t *self;
self = (flcliapi_t *) zmalloc(sizeof(flcliapi_t));
self->ctx = zctx_new();
self->pipe = zthread_fork(self->ctx, flcliapi_agent, NULL);
return self;
}
// ---------------------------------------------------------------------
// Destructor
void
flcliapi_destroy(flcliapi_t **self_p)
{
assert(self_p);
if (*self_p) {
flcliapi_t *self = *self_p;
```
zctx_destroy(&self->ctx);
free(self);
*self_p = NULL;
}
}
// ---------------------------------------------------------------------
// Подключение к новому серверному конечному пункту
// Содержимое сообщения: [CONNECT][endpoint]
void
flcliapi_connect(flcliapi_t *self, char *endpoint)
{
assert(self);
assert(endpoint);
zmsg_t *msg = zmsg_new();
zmsg_addstr(msg, "CONNECT");
zmsg_addstr(msg, endpoint);
zmsg_send(&msg, self->pipe);
zclock_sleep(100); // Ожидание подключения
}
// ---------------------------------------------------------------------
// Отправка запроса и его уничтожение, получение ответа
zmsg_t *
flcliapi_request(flcliapi_t *self, zmsg_t **request_p)
{
assert(self);
assert(*request_p);
zmsg_pushstr(*request_p, "REQUEST");
zmsg_send(request_p, self->pipe);
zmsg_t *reply = zmsg_recv(self->pipe);
if (reply) {
char *status = zmsg_popstr(reply);
if (streq(status, "FAILED")) {
zmsg_destroy(&reply);
free(status);
}
}
return reply;
}
// =====================================================================
// Асинхронная часть, выполняется в фоновом режиме
// ---------------------------------------------------------------------
// Информация о единичном сервере
typedef struct {
char *endpoint; // Серверная конечная точка/идентификатор сокета
uint alive; // В сети ли
int64_t ping_at; // Время следующего пинга
int64_t expires; // Время истечения
} server_t;
server_t *
server_new(char *endpoint)
{
server_t *self = (server_t *)zmalloc(sizeof(server_t));
self->endpoint = strdup(endpoint);
self->alive = 0;
self->ping_at = zclock_time() + PING_INTERVAL;
self->expires = zclock_time() + SERVER_TTL;
return self;
}
void
server_destroy(server_t **self_p)
{
assert(self_p);
if (*self_p) {
server_t *self = *self_p;
free(self->endpoint);
free(self);
*self_p = NULL;
}
}
int```c
server_ping(char *key, void *server, void *socket)
{
server_t *self = (server_t *)server;
if (zclock_time() >= self->ping_at) {
zmsg_t *ping = zmsg_new();
zmsg_addstr(ping, self->endpoint);
zmsg_addstr(ping, "PING");
zmsg_send(&ping, socket);
self->ping_at = zclock_time() + PING_INTERVAL;
}
return 0;
}
int server_tickless(char *key, void *server, void *arg)
{
server_t *self = (server_t *)server;
uint64_t *tickless = (uint64_t *)arg;
if (*tickless > self->ping_at)
*tickless = self->ping_at;
return 0;
}
// ---------------------------------------------------------------------
// Описание бэкграунд-обработчика
typedef struct {
zctx_t *ctx; // Контекст
void *pipe; // Сокет для связи с приложением
void *router; // Сокет для связи с сервером
zhash_t *servers; // Соединённые серверы
zlist_t *actives; // Активные серверы
uint sequence; // Номер запроса
zmsg_t *request; // Текущий запрос
zmsg_t *reply; // Текущий ответ
int64_t expires; // Время истечения запроса
} agent_t;
```agent_t *
agent_new(zctx_t *ctx, void *pipe)
{
agent_t *self = (agent_t *)zmalloc(sizeof(agent_t));
self->ctx = ctx;
self->pipe = pipe;
self->router = zsocket_new(self->ctx, ZMQ_ROUTER);
self->servers = zhash_new();
self->actives = zlist_new();
return self;
}
void
agent_destroy(agent_t **self_p)
{
assert(self_p);
if (*self_p) {
agent_t *self = *self_p;
zhash_destroy(&self->servers);
zlist_destroy(&self->actives);
zmsg_destroy(&self->request);
zmsg_destroy(&self->reply);
free(self);
*self_p = NULL;
}
}
// Вызывается при удалении сервера из списка.
static void
s_server_free(void *argument)
{
server_t *server = (server_t *)argument;
server_destroy(&server);
}
void
agent_control_message(agent_t *self)
{
zmsg_t *msg = zmsg_recv(self->pipe);
char *command = zmsg_popstr(msg);
if (streq(command, "CONNECT")) {
char *endpoint = zmsg_popstr(msg);
printf("I: connecting to %s. . . \n", endpoint);
int rc = zmq_connect(self->router, endpoint);
assert(rc == 0);
server_t *server = server_new(endpoint);
zhash_insert(self->servers, endpoint, server);
zhash_freefn(self->servers, endpoint, s_server_free);
zlist_append(self->actives, server);
server->ping_at = zclock_time() + PING_INTERVAL;
server->expires = zclock_time() + SERVER_TTL;
free(endpoint);
}
else if (streq(command, "REQUEST")) {
assert(!self->request); // Поддерживает цикл запрос-ответ
// Добавляет номер запроса и пустой кадр в начало сообщения
char sequence_text[10];
sprintf(sequence_text, "%u", ++self->sequence);
zmsg_pushstr(msg, sequence_text);
// Получает владение запросом
self->request = msg;
msg = NULL;
// Устанавливает время истечения запроса
self->expires = zclock_time() + GLOBAL_TIMEOUT;
}
free(command);
zmsg_destroy(&msg);
}```c
void
agent_router_message(agent_t *self)
{
zmsg_t *reply = zmsg_recv(self->router);
// Первый кадр — это служебная метка сервера
char *endpoint = zmsg_popstr(reply);
server_t *server =
(server_t *)zhash_lookup(self->servers, endpoint);
assert(server);
free(endpoint);
if (!server->alive) {
zlist_append(self->actives, server);
server->alive = 1;
}
server->ping_at = zclock_time() + PING_INTERVAL;
server->expires = zclock_time() + SERVER_TTL;
// Второй кадр — это номер ответа
char *sequence = zmsg_popstr(reply);
if (atoi(sequence) == self->sequence) {
zmsg_pushstr(reply, "OK");
zmsg_send(&reply, self->pipe);
zmsg_destroy(&self->request);
} else {
zmsg_destroy(&reply);
}
}
``` // ---------------------------------------------------------------------
# Асинхронный фоновый агент поддерживает пул серверов для обработки запросов и ответов.
static void
flcliapi_agent(void *args, zctx_t *ctx, void *pipe)
{
agent_t *self = agent_new(ctx, pipe);
zmq_pollitem_t items[] = {
{ self->pipe, 0, ZMQ_POLLIN, 0 },
{ self->router, 0, ZMQ_POLLIN, 0 }
};
while (! zctx_interrupted) {
# Вычисляет время до срабатывания таймера
uint64_t tickless = zclock_time() + 1000 * 3600;
if (self->request && tickless > self->expires) {
tickless = self->expires;
}
zhash_foreach(self->servers, server_tickless, &tickless);
int rc = zmq_poll(items, 2, (tickless - zclock_time()) * ZMQ_POLL_MSEC);
if (rc == -1) {
break; # Объект контекста был закрыт
}
if (items[0].revents & ZMQ_POLLIN) {
agent_control_message(self);
}
if (items[1].revents & ZMQ_POLLIN) {
agent_router_message(self);
}
# Если нам нужно обработать запрос, отправляем его следующему доступному серверу
if (self->request) {
if (zclock_time() >= self->expires) {
// Запрос истек
zstr_send(self->pipe, "FAILED");
zmsg_destroy(&self->request);
} else {
// Поиск доступного сервера
while (zlist_size(self->actives)) {
server_t *server = (server_t *)zlist_first(self->actives);
if (zclock_time() >= server->expires) {
zlist_pop(self->actives);
server->alive = 0;
} else {
zmsg_t *request = zmsg_dup(self->request);
zmsg_pushstr(request, server->endpoint);
zmsg_send(&request, self->router);
break;
}
}
}
}
}
} // Отключение и удаление просроченных серверов
// Отправка пинга свободным серверам
zhash_foreach (self->servers, server_ping, self->router);
}
agent_destroy (&self);
}
```
Эта группа API использует сложные механизмы, которые мы уже использовали ранее:**Асинхронный фоновый агент**
Клиентская API состоит из двух частей: синхронного класса flcliapi, работающего в потоке приложения, и асинхронного класса agent, работающего в фоновом потоке. Классы flcliapi и agent взаимодействуют через inproc сокет. Все, что связано с ZMQ, заключено внутри API. Класс agent фактически выполняется как мини-агент, отвечающий за коммуникацию с сервером в фоновом режиме. Он пытается подключиться к серверу для обработки запросов каждый раз, когда мы отправляем запрос.
**Механизм ожидания соединения**
Одной из особенностей ROUTER сокета является то, что он немедленно отбрасывает сообщения, которые не могут быть маршрутизированы. Это означает, что если вы сразу отправите сообщение после установления ROUTER-ROUTER соединения с сервером, это сообщение будет потеряно. Класс flcliapi задерживает отправку сообщения на некоторое время. В последующих коммуникациях, поскольку серверный сокет является постоянным, клиент больше не отбрасывает сообщения.
**Пинговая тишина**0MQ будет хранить сообщения для недоступного сервера бесконечно. Поэтому, если клиент будет повторно отправлять PING-сообщения недоступному серверу, когда этот сервер снова станет доступным, он получит множество PING-сообщений одновременно. Вместо того чтобы продолжать отправлять PING-сообщения серверу, который известно, что он недоступен, мы полагаемся на обработку постоянных сокетов 0MQ для доставки старых PING-сообщений, когда сервер снова станет доступным. Как только сервер восстановит соединение, он получит PING-сообщения от всех подключенных к нему клиентов, ответит PONG, и эти клиенты узнают, что сервер снова доступен.**Настройка времени ожидания**
В предыдущих примерах программ мы обычно задавали фиксированное время ожидания для опроса (например, 1 секунду). Этот подход прост, но для устройств, чувствительных к энергопотреблению (например, ноутбуков или смартфонов), пробуждение процессора требует дополнительной энергии. Поэтому, чтобы сэкономить энергию или просто поиграться, мы настроили время ожидания так, чтобы оно истекало только при достижении времени истечения. Это позволяет сократить количество опросов. Мы можем хранить время истечения в списке для удобства проверки.
### Заключение
В этой главе мы рассмотрели много надежных механизмов запрос-ответ, каждая из которых имеет свои преимущества и недостатки. Большинство примеров кода можно использовать непосредственно в производственной среде, хотя они могут быть ещё оптимизированы. Есть два основных типа моделей: модель с использованием middleware и модель без использования middleware.

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/andwp-zguide-cn.git
git@api.gitlife.ru:oschina-mirror/andwp-zguide-cn.git
oschina-mirror
andwp-zguide-cn
andwp-zguide-cn
master