1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/DeleyTask

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
В этом репозитории не указан файл с открытой лицензией (LICENSE). При использовании обратитесь к конкретному описанию проекта и его зависимостям в коде.
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
readme.md

RabbitMQ

Установка

Docker установка

Rabbitmq — Official Image | Docker Hub

拉取镜像:
docker pull rabbitmq:3.10.1-management
运行容器:
docker run -d --hostname my-rabbit --name rabbitmq -p 5672:5672 -p 15672:15672  rabbitmq镜像id(只需要填前几位,确保与其他镜像id即可识别)

hostname:容器内的主机名
通讯端口:5672
web界面端口:15672
记得开启5672和15672端口

По умолчанию пользователь и пароль — guest


Rabbitmq初识

  1. Производитель (Publisher) — публикует сообщения в обменник (Exchange) в RabbitMQ.
  2. Обменник (Exchange) — устанавливает соединение с производителем и принимает сообщения от него.
  3. Потребитель (Consumer) — отслеживает сообщения в очереди (Queue) в RabbitMQ.
  4. Очередь (Queue) — обменник распределяет сообщения по определённым очередям.
  5. Маршрут (Routes) — правила, по которым обменник направляет сообщения в очередь.

AMQP 0.9.1 протокол

Подробнее см. официальную документацию:

AMQP 0-9-1 Model Explained — RabbitMQ

AMQP протокол

AMQP (Advanced Message Queuing Protocol) — сетевой протокол для обмена данными между приложениями и посредниками сообщений (message brokers).

Посредники сообщений и их роль

Посредники сообщений (message brokers) получают сообщения от производителей (publishers) и направляют их потребителям (consumers) согласно заданным маршрутам.

Поскольку AMQP — это сетевой протокол, производители, потребители и посредники могут находиться на разных устройствах.

Модель AMQP 0-9-1

Процесс работы AMQP 0-9-1: сообщение (message) отправляется производителем (publisher) в обменник (exchange), который часто сравнивают с почтовым отделением или почтовым ящиком. Затем обменник отправляет сообщения в связанные очереди (queues) согласно маршрутам. Наконец, посредник доставляет сообщения потребителям или они самостоятельно забирают их из очередей.

Производители могут задавать различные свойства (message meta-data) при отправке сообщений. Некоторые свойства могут использоваться посредником, другие остаются невидимыми для него и используются только получающим приложением.

С точки зрения безопасности сеть ненадёжна, и получающее приложение может не обработать сообщение. Поэтому AMQP включает концепцию подтверждения сообщений (message acknowledgements): когда сообщение отправляется из очереди потребителю, потребитель уведомляет посредника об этом. Это может быть сделано автоматически или разработчиком приложения. Пока подтверждение включено, посредник не удаляет сообщение полностью из очереди, пока не получит подтверждение от потребителя.

В некоторых случаях, например, если сообщение не удаётся направить, оно может быть возвращено производителю и отброшено. Или, если посредник выполняет операцию задержки, сообщение помещается в так называемую очередь недоставленных сообщений (dead letter queue). В таких случаях производитель может выбрать параметры обработки этих ситуаций.

Очереди, обменники и привязки вместе называются AMQP сущностями (AMQP entities).


Программируемость AMQP

AMQP 0-9-1 — программируемый протокол. AMQP сущности и маршруты определяются самим приложением, а не посредником. Сюда входят объявление очередей и обменников, определение их привязок и подписка на очереди.

Хотя это даёт разработчикам свободу действий, им нужно учитывать возможные конфликты определений. Однако такие конфликты редко возникают на практике и проявляются как ошибки конфигурации (misconfiguration).

Приложения объявляют AMQP сущности, определяют необходимые маршруты или удаляют ненужные AMQP сущности.


Обменники и их типы

Обменники используются для отправки сообщений в AMQP. Они направляют сообщения в одну или несколько очередей после получения сообщения. Тип маршрута определяется типом обменника и правилами привязки (bindings). В AMQP 0-9-1 есть четыре типа обменников:

Name (обменник) Default pre-declared names (предварительно объявленные имена по умолчанию)
Direct exchange (прямой обменник) (Empty string) and amq.direct
Fanout exchange (широковещательный обменник) amq.fanout
Topic exchange (тематический обменник) amq.topic
Headers exchange (обменник заголовков) amq.match (and amq.headers in RabbitMQ)

Кроме типа, при объявлении обменника можно задать множество других свойств, среди которых наиболее важны следующие:

  • Name;
  • Durability (будет ли обменник существовать после перезапуска посредника);
  • Auto-delete (удаляется ли обменник после того, как все связанные с ним очереди перестают его использовать);
  • Arguments (зависят от конкретного посредника).

Обменники могут иметь два состояния: постоянное (durable) и временное (transient). Постоянные обменники сохраняются после перезапуска посредников, временные — нет (они должны быть повторно объявлены после перезапуска). Не все приложения требуют постоянных обменников.


По умолчанию обменник

По умолчанию обменник — это безымянный прямой обменник, предварительно объявленный посредником без имени (имя — пустая строка). У него есть специальное свойство, которое делает его полезным для простых приложений: каждая новая очередь автоматически привязывается к нему с именем привязки, совпадающим с именем очереди. Например, когда вы объявляете очередь с именем «search-indexing-online», посредник автоматически привязывает её к умолчанию обменнику с именем привязки «search-indexing-online». Таким образом, сообщения с именем привязки «search-indexing-online» направляются в очередь «search-indexing-online». По умолчанию обменник выглядит так, будто он напрямую отправляет сообщения в очередь, хотя технически он этого не делает.


Прямой обменник

Прямой обменник направляет сообщения на основе ключа маршрутизации (routing key), указанного в сообщении, в соответствующую очередь. Прямые обменники используются для одноадресной маршрутизации (unicast routing), хотя они также могут обрабатывать многоадресную маршрутизацию. Вот как это работает:

  • Очередь привязывается к обменнику с ключом маршрутизации.
  • Сообщение с ключом маршрутизации R направляется прямому обменнику. Он направляет его в очередь с тем же ключом маршрутизации R. Прямые обменники часто используются для циклического распределения задач между несколькими рабочими (workers). Важно понимать, что в AMQP 0-9-1 балансировка нагрузки происходит между потребителями, а не очередями. Здесь приведён перевод текста на русский язык:

Fanout-обменник

Fanout-обменник (fanout exchange) направляет сообщения всем очередям, которые к нему привязаны, игнорируя ключ маршрутизации, который был использован при привязке. Если N очередей привязано к fanout-обменнику, то при отправке сообщения в этот обменник оно будет скопировано и доставлено во все N очередей. Fanout используется для широковещательной маршрутизации сообщений.

Поскольку fanout-обменники отправляют копии сообщений во все очереди, к которым они привязаны, их сценарии использования очень похожи:

  • В масштабных многопользовательских онлайн-играх (MMO) они могут использоваться для обновления рейтингов и других глобальных событий.
  • На новостных сайтах они могут быть использованы для почти мгновенного распространения обновлений результатов спортивных соревнований на мобильные устройства.
  • В системах рассылки они используются для распространения различных состояний и конфигурационных обновлений.
  • При групповом общении они применяются для отправки сообщений всем участникам группы. (AMQP не имеет встроенной концепции присутствия, поэтому XMPP может быть лучшим выбором).

Пример fanout-обмена:

enter image description here


Topic-обменник

Topic-обменник (topic exchanges) направляет сообщение в одну или несколько очередей на основе соответствия между ключом маршрутизации сообщения и привязкой очереди к обмену. Topic-обменники часто используются для реализации различных схем распределения/подписки и их вариаций. Они обычно используются для многоадресной маршрутизации сообщений (multicast routing).

У topic-обменников есть множество сценариев использования. Когда возникает задача, требующая выборочного получения сообщений от нескольких потребителей/приложений (multiple consumers/applications), topic-обменник может быть рассмотрен.

Примеры использования:

  • Распространение данных о конкретных географических местоположениях, таких как точки продаж.
  • Выполнение фоновых задач несколькими работниками (workers), каждый из которых отвечает за определённые задачи.
  • Обновление цен на акции (и другие финансовые данные).
  • Новостные обновления, связанные с классификацией или метками (например, для определённых спортивных команд или лиг).
  • Координация облачных сервисов и системная интеграция, где каждый разработчик может обрабатывать только определённую архитектуру или систему.

Что такое режим привязки, мы рассмотрим позже, когда будем говорить о конкретной реализации протокола AMQP в RabbitMQ.


Headers-обменник

Иногда маршрутизация сообщений включает в себя несколько атрибутов, и в этом случае headers-обменник является подходящим решением. Headers-обменники используют несколько свойств сообщения вместо ключа маршрутизации для создания правил маршрутизации. Правила маршрутизации устанавливаются путём сопоставления значений заголовков сообщения с указанными привязками.

Мы можем привязать очередь к headers-обменику и использовать несколько заголовков для этой привязки. В этом случае посредник должен получить дополнительную информацию от разработчика приложения, то есть он должен учитывать, требуется ли частичное или полное соответствие сообщения. «Дополнительная информация» — это параметр «x-match». Когда «x-match» установлен на «any», любое значение заголовка может соответствовать условию, а когда «x-match» установлено на «all», все значения заголовка должны совпадать.

Headers-обменники можно рассматривать как разновидность direct-обмена. Они могут работать аналогично direct-обмену, но правила маршрутизации основаны на значениях заголовков, а не на ключах маршрутизации. Ключи маршрутизации должны быть строками, в то время как значения заголовков могут быть целыми числами, хэшами и т. д.


Очередь

Очереди в AMQP похожи на очереди в других системах обмена сообщениями или очередях задач: они хранят сообщения, которые будут обработаны потребителями. Очереди имеют общие свойства с обменниками, но также обладают дополнительными свойствами:

  • Name
  • Durable (сообщения сохраняются после перезапуска посредника)
  • Exclusive (используется только одним соединением, и очередь удаляется после закрытия соединения)
  • Auto-delete (удаляется после отмены подписки последнего потребителя)
  • Arguments (посредник использует их для дополнительных функций, подобных TTL)

Очередь становится доступной после объявления (declare). Если очередь не существует, объявление создаёт её. Если объявленная очередь уже существует и свойства полностью совпадают, объявление не влияет на существующую очередь. Если свойства объявленной очереди отличаются от существующей, генерируется ошибка канала 406.


Имя очереди

Имя очереди может быть выбрано приложением (application) или автоматически сгенерировано посредником. Имя очереди может содержать до 255 байт в кодировке utf-8. Если вы хотите, чтобы посредник сгенерировал имя очереди, укажите пустую строку в параметре name очереди. Последующие методы в одном канале позволяют нам использовать пустую строку для обозначения имени очереди, созданного ранее. Посредник молча запоминает последнее созданное им имя очереди.

Имена очередей, начинающиеся с «amq.», зарезервированы для внутреннего использования посредником. Попытка нарушить это правило при объявлении очереди приведёт к ошибке канала 403 (ACCESS_REFUSED).


Постоянные очереди

Постоянные очереди (Durable queues) сохраняются на диске и остаются после перезапуска посредника. Не все сценарии требуют сохранения очередей.

Постоянство очередей не делает сообщения, направляемые в них, постоянными. Если посредник выйдет из строя и перезапустится, постоянные очереди будут повторно объявлены. Только постоянные сообщения могут быть восстановлены после сбоя.


Привязка

Привязка (Binding) — это набор правил, которым следует посредник для направления сообщений (message) в очередь (queue). Чтобы указать посреднику «E» направить сообщение в очередь «Q», необходимо выполнить привязку между «Q» и «E». Для некоторых типов обменников привязка требует указания ключа маршрутизации (routing key). Ключ маршрутизации определяет, какие сообщения из множества отправленных в обменник будут направлены в связанную очередь.

Благодаря промежуточному слою в виде обменника, многие сложные схемы маршрутизации, которые было бы трудно реализовать напрямую между издателем и очередью, становятся возможными, избавляя разработчиков приложений от необходимости повторять эту работу.

Если сообщение не может быть направлено в очередь (например, из-за отсутствия привязки очереди к обменнику), оно либо уничтожается на месте, либо возвращается отправителю. Способ обработки зависит от настроек сообщения, установленных отправителем.


Потребитель

Сообщения бесполезны, если они просто хранятся в очереди. Их ценность раскрывается, когда они обрабатываются приложением. В модели AMQP 0-9-1 есть два способа достижения этого:

  • Отправка сообщений приложению («push API»)
  • Приложение получает сообщения по мере необходимости («pull API»)

Используя push API, приложение явно указывает, какие сообщения в определённой очереди его интересуют. Мы можем сказать, что приложение зарегистрировало потребителя или подписалось на очередь. Одна очередь может иметь несколько потребителей или одного эксклюзивного потребителя (если эксклюзивный потребитель существует, другие потребители исключаются).

Каждый потребитель (подписчик) имеет уникальный идентификатор, называемый идентификатором потребителя. Он используется для отмены подписки на сообщения. Идентификатор потребителя фактически представляет собой строку.


Подтверждение сообщений

Потребительские приложения (Consumer applications), предназначенные для приёма и обработки сообщений, иногда могут давать сбой или даже аварийно завершать работу. Кроме того, проблемы с сетью также могут вызывать различные проблемы. Это ставит перед нами задачу: когда посредник должен удалять сообщения? В AMQP 0-9-1 предлагаются две рекомендации:

  • Посредник удаляет сообщение сразу после его отправки приложению (используя метод AMQP basic.deliver или basic.get-ok).
  • Сообщение удаляется после того, как приложение отправляет подтверждение (acknowledgement) (используя метод AMPQ basic.ack).

Первый подход называется моделью автоматического подтверждения (automatic acknowledgement model), а второй — моделью явного подтверждения (explicit acknowledgement model). В явной модели потребительское приложение решает, когда отправлять подтверждение. Приложение может отправить подтверждение сразу после получения сообщения, сохранить необработанные сообщения и отправить подтверждение позже, или дождаться завершения обработки сообщения перед отправкой подтверждения.

Если потребитель аварийно завершает работу до отправки подтверждения, посредник повторно направит сообщение другому потребителю. Если доступных потребителей нет, посредник будет ждать регистрации нового потребителя в этой очереди и затем попытается повторно направить сообщение.


Отклонение сообщений

После получения сообщения потребитель может успешно обработать его или столкнуться с проблемами. Приложение может сообщить посреднику, что обработка сообщения не удалась по причине отклонения сообщения (или невозможности завершить обработку в данный момент). При отклонении сообщения приложение может указать посреднику, как обработать сообщение — уничтожить его или повторно поместить в очередь. При наличии только одного потребителя в очереди убедитесь, что отклонение сообщения и выбор повторной постановки в очередь не приведут к бесконечному циклу сообщения у одного потребителя.


Отрицательные подтверждения

В сценариях с несколькими потребителями, совместно использующими одну очередь, полезно указать, сколько сообщений может быть принято каждым потребителем до получения следующего подтверждения. Это может помочь в пакетной рассылке сообщений и повысить пропускную способность сообщений.

Обратите внимание, что RabbitMQ поддерживает только предварительную выборку на уровне канала, а не на уровне соединения или на основе размера. ### Сообщение и его свойства (атрибуты) в AMQP

В модели AMQP сообщение (Message) — это объект с атрибутами. Некоторые атрибуты настолько распространены, что AMQP 0-9-1 явно определяет их, и разработчикам приложений не нужно задумываться о том, что представляют собой имена этих атрибутов. Например:

  • Content type (тип содержимого);
  • Content encoding (кодирование содержимого);
  • Routing key (ключ маршрутизации);
  • Delivery mode (persistent or not) (режим доставки (постоянный или нет));
  • Message priority (приоритет сообщения);
  • Message publishing timestamp (отметка времени публикации сообщения);
  • Expiration period (срок действия сообщения);
  • Publisher application id (идентификатор приложения издателя).

Некоторые атрибуты используются AMQP-посредником, но большинство из них открыты для интерпретации получающим их приложением. Некоторые атрибуты являются необязательными и называются заголовками сообщений. Они похожи на X-заголовки HTTP-протокола. Свойства сообщения должны быть определены при публикации сообщения.

Помимо свойств, сообщение AMQP также содержит полезную нагрузку — Payload, которую AMQP-посредник обрабатывает как непрозрачный массив байтов. Посредник не проверяет и не изменяет полезную нагрузку. Сообщение может содержать только атрибуты без полезной нагрузки. Обычно оно использует данные в формате, подобном JSON, для экономии места, а протоколы буфера и MessagePack сериализуют структурированные данные, чтобы их можно было опубликовать в виде полезной нагрузки сообщения. AMQP и подобные ему обычно используют поля «content-type» и «content-encoding» для идентификации полезной нагрузки сообщений, но это основано только на соглашении.

Сообщения могут быть опубликованы постоянным образом, и AMQP-брокер будет хранить их на диске. Если сервер перезапустится, система подтвердит, что полученные постоянные сообщения не были потеряны. Просто отправка сообщения на постоянный обмен или маршрутизация к постоянной очереди не делает сообщение постоянным: это полностью зависит от режима постоянства самого сообщения (persistence mode). Публикация сообщения постоянным способом оказывает определённое влияние на производительность (как и операции с базой данных, надёжность требует некоторых жертв производительности).

Подтверждение сообщения

Из-за неопределённости сети и возможности сбоя приложения подтверждение получения (acknowledgement) становится очень важным. Иногда достаточно подтвердить, что потребитель получил сообщение, иногда подтверждение означает, что сообщение было проверено и обработано, например, данные были проверены и сохранены или проиндексированы.

Эта ситуация очень распространена, поэтому AMQP 0-9-1 имеет встроенную функцию подтверждения сообщений (message acknowledgements), которую потребители используют для подтверждения получения или обработки сообщений. Если приложение аварийно завершает работу (в этот момент соединение разрывается, поэтому AMPQ-посредник также знает об этом), и функция подтверждения сообщения была включена, но посредник ещё не получил подтверждения, то сообщение будет повторно помещено в очередь (и немедленно доставлено другому потребителю, если есть другие потребители в этой очереди).

Встроенная функция подтверждения сообщений в протоколе помогает разработчикам создавать мощное программное обеспечение.

Методы AMQP 0–9–1

AMQP 0–9–1 состоит из множества методов (methods). Метод — это операция, которая не имеет ничего общего с методами в объектно-ориентированном программировании. Методы AMQP сгруппированы в классы. Здесь класс — это просто логическая группировка методов AMQP. В справочнике AMQP 0–9–1 подробно описаны методы AMQP.

Рассмотрим класс Exchange. Существует группа методов, связанных с операциями обмена. Эти методы включают:

  • exchange.declare;
  • exchange.declare-ok;
  • exchange.delete;
  • exchange.delete-ok.

Эти операции разделены на «запросы» (requests), отправленные клиентом, и «ответы» (responses), отправленные посредником в ответ на предыдущие «запросы».

Например, клиент запрашивает, чтобы посредник использовал метод exchange.declare для объявления нового обмена:

Клиент запрашивает у посредника использовать метод exchange.declare для объявления нового обмена.

Как показано на рисунке выше, метод exchange.declare имеет несколько параметров. Эти параметры позволяют клиенту указать имя обмена, тип, является ли он постоянным и т. д.

После успешной операции посредник отвечает методом exchange.declare-ok:

Посредник отвечает методом exchange.declare-ok после успешной операции.

Метод exchange.declare-ok, кроме номера канала, не имеет других параметров.

Методы класса Queue, связанные с операциями очереди, имеют аналогичную последовательность событий:

Аналогичная последовательность событий происходит с методами класса Queue.

Не все методы AMQP имеют соответствующую «вторую половину». Многие (наиболее часто используемый — basic.publish) не имеют соответствующего «ответа», некоторые (например, basic.get) имеют более одного соответствующего «ответа».

Соединение

Обычно соединения AMQP являются постоянными. AMQP — это протокол прикладного уровня, который обеспечивает надёжную доставку через TCP. AMQP использует механизмы аутентификации и предоставляет TLS (SSL) защиту. Когда приложению больше не требуется подключаться к AMQP-посреднику, необходимо изящно освободить AMQP-соединение, а не просто закрыть TCP-соединение.

Канал

Некоторым приложениям требуется установить несколько соединений с AMQP-посредником. Однако одновременное открытие нескольких TCP-соединений нецелесообразно, поскольку это потребляет слишком много системных ресурсов и усложняет настройку брандмауэра. AMQP 0–9–1 предоставляет каналы для управления несколькими подключениями, которые можно рассматривать как совместное использование одного TCP-соединения несколькими облегчёнными подключениями.

В приложениях с несколькими потоками/процессами обычно используется один канал (channel) для каждого потока/процесса, и эти каналы не могут совместно использоваться потоками/процессами.

Связь на одном канале полностью изолирована от связи на другом канале, поэтому каждому методу AMQP необходимо указать номер канала, чтобы клиент мог указать, для какого канала предназначен этот метод.

Виртуальный хост

Чтобы реализовать несколько изолированных сред (пользователей, групп пользователей, обменов, очередей и т.д.) на одном отдельном посреднике, AMQP предлагает концепцию виртуального хоста (virtual hosts — vhosts). Это похоже на концепцию виртуальных хостов на веб-серверах, которая обеспечивает полную изоляцию для сущностей AMQP. При установлении соединения клиент указывает, какой виртуальный хост использовать.

Руководство по разработке Java-клиента

Подробные инструкции см. в официальной документации.

Руководство по API Java Client — RabbitMQ

RabbitMQ Java-клиент использует com.rabbitmq.client в качестве своего корневого пакета. Ключевые классы и интерфейсы включают:

  • Channel: представляет канал AMQP 0–9–1 и предоставляет большую часть операций (методов протокола).
  • Connection: представляет соединение AMQP 0–9–1.
  • ConnectionFactory: создаёт экземпляры Connection.
  • Consumer: представляет потребителя сообщений.
  • DefaultConsumer: базовый класс для потребителей.
  • BasicProperties: свойства сообщения (метаданные).
  • BasicProperties.Builder: построитель BasicProperties.

Через интерфейс Channel можно выполнять операции над протоколом. Соединение используется для открытия каналов, регистрации событий жизненного цикла соединения и закрытия ненужных соединений. ConnectionFactory используется для создания экземпляров Connection и позволяет устанавливать такие свойства, как vhost, username и т. п. Инструменты класса

public class RabbitmqUtil {
    private final String keyPrefix="spring.rabbitmq.";
    private final YamlUtil yamlUtil;
    private final RabbitmqClient rabbitmqClient;

    public RabbitmqUtil(String ymlPath) {
        this.yamlUtil =new YamlUtil(ymlPath);
        this.rabbitmqClient=RabbitmqClient.builder()
                .userName(yamlUtil.get(keyPrefix+"username"))
                .password(yamlUtil.get(keyPrefix+"password"))
                .host(yamlUtil.get(keyPrefix+"host"))
                .port(Integer.valueOf(yamlUtil.get(keyPrefix+"port")))
                .virtualHost(yamlUtil.get(keyPrefix+"virtual-host"))
                .build();
    }

    public Connection getConnection() throws IOException, TimeoutException {
       return rabbitmqClient.getConnection();
    }
}

Тестирование

        RabbitmqUtil rabbitmqUtil = new RabbitmqUtil("application.yml");
        Connection connection = rabbitmqUtil.getConnection();

Для локального узла RabbitMQ все эти параметры имеют подходящие значения по умолчанию.

Если перед созданием соединения не указаны значения параметров, то используются значения по умолчанию:

Property Default Value
Username "guest"
Password "guest"
Virtual host "/"
Hostname "localhost"
port 5672 для нормального обмена данными, 5671 для SSL-шифрованного обмена

Обратите внимание, что по умолчанию пользователь с правами гостя может подключаться только локально (https://www.rabbitmq.com/access-control.html). Это сделано для ограничения использования известных учётных данных в производственных системах.

# В конфигурационном файле установите loopback_users равным none, тогда пользователь guest сможет подключаться удалённо.
loopback_users = none

Создание канала

    public Channel createChannel() throws IOException {
        log.info("Канал создаётся...");
       return connection.createChannel();
    }

Закрытие соединения Rabbitmq

Чтобы закрыть соединение с RabbitMQ, достаточно просто закрыть канал и соединение:

    public void close(){
        log.info("Закрытие соединения с RabbitMQ...");
        try {
            //channel.close(); необязательно
            connection.close();
        } catch (IOException e) {
            log.error("Ошибка при закрытии соединения: ",e);
        }
    }

Следует отметить, что закрытие канала не является обязательным действием. Канал будет автоматически закрыт при закрытии соединения.


Срок жизни соединения и канала

Connections — это постоянные соединения. Протоколы и оптимизация разработаны с учётом требований постоянных соединений. Поэтому создание нового соединения для каждой операции, такой как отправка сообщения, крайне не рекомендуется, так как это приводит к большому количеству сетевых обменов и накладным расходам.

Хотя Channels также являются постоянными, они могут быть закрыты из-за большого количества восстанавливаемых ошибок протокола. Срок службы каналов немного меньше, чем у соединений. Хотя открытие и закрытие канала для каждой операции не обязательно, это всё же возможно. В некоторых случаях предпочтительнее повторно использовать каналы.

Например, попытка получить сообщение из несуществующей очереди приведёт к ошибке уровня канала, которая приведёт к закрытию канала. После закрытия канал больше нельзя использовать, и он не будет получать такие события сервера, как доставка сообщений. RabbitMQ регистрирует ошибки уровня канала и инициирует последовательность закрытия канала.


Предоставление метки для текущего соединения

RabbitMQ узел может хранить ограниченную информацию о клиенте:

  • TCP-узел клиента (исходный IP-адрес и порт)
  • Используемые учётные данные

AMQP 0-9-1 клиенты, включая RabbitMQ Java клиент, могут предоставить уникальный идентификатор, который будет отображаться в журналах сервера и интерфейсе управления, чтобы различать клиентов. После настройки имя будет отражено в содержимом журнала и интерфейсе управления. Идентификатор называется меткой текущего соединения. Метка полезна для идентификации приложения или конкретного компонента приложения. Хотя метка не обязательна, настоятельно рекомендуется её предоставить, поскольку это значительно упрощает некоторые задачи.

image-20220526191401910

Метод newConnection имеет несколько перегрузок, некоторые из которых позволяют установить метку для этого соединения.

    public Connection getConnection(String connectionName) throws IOException, TimeoutException {
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        return connectionFactory.newConnection(connectionName);
    }

image-20220526191357387


Обмен и очередь Обмен и очередь необходимо объявлять заранее.

Проще говоря, объявление объектов любого типа делается для того, чтобы убедиться в их существовании и создать их при необходимости.

Очередь с эксклюзивным доступом

В следующем коде объявляется обмен и именованная очередь сервера RabbitMQ, а затем они связываются:

        channel.exchangeDeclare("dhy-exchange", "direct", true);
        //queueDeclare создаёт имя очереди
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, "dhy-exchange", "dhy");

Это приведёт к активному объявлению следующих объектов, которые можно настроить с помощью дополнительных параметров. В этом примере им не заданы специальные параметры:

  • постоянный, неавтоматически удаляемый прямой обмен;
  • очередь с системным именем, непостоянная, с эксклюзивным доступом и автоматическим удалением.

Обратите внимание, что это типичный способ объявления очереди, когда только один клиент планирует эксклюзивный доступ. Очередь автоматически очищается (удаляется). Если несколько клиентов планируют использовать одну и ту же очередь, лучше подойдёт следующий код:

channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

Он приведёт к активному объявлению:

  • постоянного, неавтоматически удалённого прямого обмена;
  • очереди с заданным именем, постоянной, совместно используемой и неавтоматически удалённой.

Многие методы интерфейса Channel перегружены. Используемые здесь короткие перегрузки методов exchangeDeclare, queueDeclare и queueBind используют подходящие значения по умолчанию, что упрощает их использование. Конечно, есть и более длинные перегрузки с дополнительными параметрами, позволяющими переопределить некоторые значения по умолчанию и получить более полный контроль.


Пассивное объявление очереди и обмена

Очередь и обмен могут быть объявлены пассивно. Пассивное объявление просто проверяет, существует ли соответствующий объект. Для успешно обнаруженной очереди пассивное объявление возвращает ту же информацию, что и активное объявление, то есть количество потребителей и сообщений в очереди.

Если соответствующий объект не существует, операция вызовет исключение на уровне канала, после чего канал нельзя будет использовать, и потребуется открыть новый канал. Обычно пассивные объявления выполняются с использованием временных одноразовых каналов.

Методы Channel#queueDeclarePassive и Channel#exchangeDeclarePassive используются для пассивных объявлений. Ниже показано использование Channel#queueDeclarePassive:

        AMQP.Queue.DeclareOk response = channel.queueDeclarePassive("queue-name");
        response.getMessageCount();
        response.getConsumerCount();

Возвращаемое значение метода Channel#exchangeDeclarePassive не содержит полезной информации. Пока метод возвращает правильно и без исключений на канале, это означает, что обмен уже существует.


Не дожидаясь ответа сервера

Некоторые распространённые операции также имеют «неблокирующие» версии, которые не ждут ответа сервера. Например, следующий метод объявляет очередь и уведомляет сервер не отправлять ответ:

channel.queueDeclareNoWait(queueName, true, false, false, null);

Неблокирующие версии операций более эффективны, но менее безопасны, например, они больше полагаются на механизм сердцебиения для обнаружения неудачных операций. Если вы не уверены, начните с стандартных версий операций. Неблокирующие версии требуются только в сложных топологиях (очереди, привязки).


Удаление сущностей и сообщений

Сущности и сообщения можно явно удалить:

channel.queueDelete("queue-name")

Или удалить, когда очередь пуста:

channel.queueDelete("queue-name", false, true)

Либо когда она больше не используется (нет потребителей, потребляющих из неё):

channel.queueDelete("queue-name", true, false)

Очередь можно очистить (удалить все сообщения):

channel.queuePurge("queue-name")

Отправка сообщений

// Отправка сообщения на обмен
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,null,"hello rabbitmq".getBytes(StandardCharsets.UTF_8));

Для более точного контроля можно использовать перегруженные варианты для указания обязательного атрибута или отправки сообщений с предопределёнными свойствами.

channel.basicPublish(exchangeName, routingKey, mandatory,
                     MessageProperties.PERSISTENT_TEXT_PLAIN,
                     messageBodyBytes);

Обязательный означает обязательный.

Когда мы отправляем сообщение на обмен, если сообщение не может быть перенаправлено ни на одну из очередей, связанных с этим обменом, и издатель отправляет сообщение с обязательным атрибутом, установленным в false (по умолчанию), сообщение будет перенаправлено на альтернативный обмен, при условии, что он существует, иначе будет записано предупреждение.

При отправке сообщения на обмен, если сообщение не может быть перенаправлено ни в одну из очередей, привязанных к этому обмену, и издатель отправляет сообщение с обязательным атрибутом, установленным в true, сообщение будет возвращено издателю, и отправитель должен предоставить обработчик обратного вызова для обработки сообщения об ошибке маршрутизации.

Атрибут mandatory подробно описан в официальной документации: Издатели — RabbitMQ.

Альтернативный обмен подробно описан в официальной документации: Альтернативные обмены — RabbitMQ.


Следующий пример отправляет сообщение, указывая режим доставки как 2 (постоянный) и приоритет 1, а также тип содержимого («text/plain»). Используйте класс Builder для создания объекта свойств сообщения, который требует нескольких атрибутов, таких как:

        // Отправка сообщения на обмен
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
                new AMQP.BasicProperties.Builder()
                        .contentType("text/plain")
                        .deliveryMode(2)
                        .priority(1)
                        .userId("dhy")
                        .build(),
                "hello2".getBytes(StandardCharsets.UTF-8));

Пример отправки сообщения с настраиваемыми заголовками:

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("dhy", 18);

channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .headers(headers)
               .build(),
               "hello3".getBytes(StandardCharsets.UTF-8));

Следующий пример публикует сообщение с атрибутом срока действия:


``` **Перевод текста на русский язык:**

Здесь мы не показываем все возможности.

Обратите внимание, что `BasicProperties`  это встроенный класс, автоматически генерируемый AMQP для хранения свойств.

#### Канал и параллелизм

Следует избегать совместного использования канала между потоками. Приложение должно использовать отдельный канал для каждого потока, а не делиться каналом между несколькими потоками.

Хотя некоторые операции на канале можно безопасно выполнять параллельно, некоторые операции нельзя выполнять параллельно. Это может привести к ошибкам в сети или повторному подтверждению и другим проблемам.

Выполнение параллельных операций публикации на общем канале приведёт к ошибкам в сети, вызовет исключение протокола на уровне соединения и приведёт к тому, что соединение будет напрямую закрыто прокси-сервером. Поэтому необходимо явно синхронизировать в коде приложения (необходимо вызывать `Channel#basicPublish` в ключевых частях).

Совместное использование канала между потоками также может помешать [подтверждению издателя](https://www.rabbitmq.com/confirms.html). Лучше всего полностью избегать параллельной публикации на общих каналах, например, используя один канал для каждого потока для достижения параллелизма.

Также можно избежать параллельной отправки сообщений на общие каналы с помощью пула каналов: как только поток завершает работу с определённым каналом, он возвращает канал в пул, чтобы другие потоки могли его повторно использовать. Пул каналов можно рассматривать как особый механизм синхронизации. Рекомендуется использовать готовые библиотеки пулов, а не создавать свои собственные. Например, [Spring AMQP](https://projects.spring.io/spring-amqp/).

Каналы потребляют ресурсы, и в большинстве приложений в одном процессе JVM редко открывается более нескольких сотен каналов. Представьте, что каждый поток в нашем приложении имеет свой собственный канал (поскольку один и тот же канал не должен использоваться для параллельных операций), и вы получите довольно большие накладные расходы в одной JVM, которые можно было бы избежать. Кроме того, небольшое количество быстрых издателей может легко перегрузить сетевой интерфейс и узлы прокси.

Классический антипаттерн, которого следует избегать,  открывать отдельный канал для каждой отправляемой публикации. Каналы должны быть долговечными.

Безопасно, когда один поток обрабатывает сообщения, а другой поток отправляет сообщения на общий канал.

Механизм доставки сообщений (описанный ниже) является параллельным и обеспечивает фиксированный порядок для каждого канала. Механизм распределения используется в каждом соединении с помощью [java.util.concurrent.ExecutorService](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html). Все соединения, созданные с использованием `ConnectionFactory#setSharedExecutor`, могут совместно использовать пользовательский `executor`.

#### Получение сообщений через подписку (интерфейс обратного вызова)

Самый эффективный способ получения сообщений  использовать интерфейс `Consumer` для настройки подписки. Сообщения автоматически доставляются при их поступлении, а не запрашиваются явно.

Когда вызываются методы, связанные с интерфейсом `Consumers`, подписка всегда ссылается на одного потребителя. Потребительские метки могут быть созданы клиентом или сервером для идентификации потребителей. Чтобы RabbitMQ сгенерировал уникальный тег в пределах узла, можно использовать перегрузку `Channel#basicConsume` без атрибута `consumerTag` или передать пустую строку в качестве `consumerTag`, а затем использовать значение, возвращаемое `Channel#basicConsume`. Потребительская метка также используется для отмены потребителя.

Разные экземпляры потребителей должны иметь разные потребительские метки. Крайне не рекомендуется иметь повторяющиеся потребительские метки на одном и том же соединении, так как это может вызвать проблемы с [автоматическим восстановлением соединения](https://www.rabbitmq.com/api-guide.html#connection-recovery) и запутать данные мониторинга при мониторинге потребителей.

Простейший способ реализовать `Consumer`  создать подкласс `DefaultConsumer`. Экземпляр этого подкласса можно передать в качестве параметра при вызове `basicConsume`, чтобы настроить подписку:

```java
@Slf4j
public class Publisher implements Runnable {
    @Override
    public void run() {
        try {
            RabbitmqUtil rabbitmqUtil = new RabbitmqUtil("application.yml");
            Channel channel = rabbitmqUtil.prepareChannel();
            channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,null,"你好,我是生产者".getBytes(StandardCharsets.UTF_8));
            log.info("Отправка сообщения...");
        } catch (IOException | TimeoutException e) {
            log.error("Возникло исключение: ",e);
        }
    }
}
@Slf4j
public class Consumer implements Runnable{
    @Override
    public void run() {
        RabbitmqUtil rabbitmqUtil = null;
        try {
            rabbitmqUtil = new RabbitmqUtil("application.yml");
            Channel channel = rabbitmqUtil.prepareChannel();
            // Не включаем автоматическое подтверждение
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, "myConsumerTag",
                    new DefaultConsumer(channel) {
                        @Override
                        public void handleDelivery(String consumerTag,
                                                   Envelope envelope,
                                                   AMQP.BasicProperties properties,
                                                   byte[] body)
                                throws IOException
                        {
                            String routingKey = envelope.getRoutingKey();
                            String contentType = properties.getContentType();
                            long deliveryTag = envelope.getDeliveryTag();
                            // Подтверждаем получение сообщения вручную
                            channel.basicAck(deliveryTag, false);
                            log.info("Получено сообщение: {} , ключ маршрутизации: {} , тип: {}",new String(body),routingKey,contentType);
                        }
                    });
        } catch (IOException | TimeoutException e) {
            log.error("Возникло исключение: ",e);
        }
    }
}

**Тест:**

```java
        Thread consumer = new Thread(new Consumer(),"Потребитель");
        Thread publisher = new Thread(new Publisher(),"Издатель");
        consumer.start();
        publisher.start();
``` **Здесь, поскольку мы установили `autoAck = false`, необходимо вручную подтверждать сообщения, доставленные потребителю.** Самый простой способ  сделать это в методе `handleDelivery`, как было описано выше.

Более сложные потребители должны переписать другие методы. В частности, следует отметить, что при закрытии канала и соединения вызывается метод `handleShutdownSignal`, а метод `handleConsumeOk` передаётся потребителю перед вызовом других обратных вызовов потребителя.

```java
    @Override
    public void handleConsumeOk(String consumerTag) {
        this._consumerTag = consumerTag;
    }

Потребители также могут использовать методы handleCancelOk и handleCancel для получения информации о том, была ли отмена явной или неявной.

Вы можете явно отменить конкретного потребителя с помощью метода Channel.basicCancel, передав тег потребителя.

channel.basicCancel(consumerTag);

Передаётся тег потребителя.

Как и издателям, здесь также необходимо учитывать параллельную безопасность потребителей.

Обратные вызовы потребителей выполняются в отдельном пуле потоков, который отличается от пула, связанного с экземпляром канала. Это означает, что потребители могут безопасно вызывать такие блокирующие методы, как Channel#queueDeclare и Channel#basicCancel.

Каждый канал имеет свой собственный пул планирования. Для большинства распространённых сценариев, когда на каждом канале есть один потребитель, это означает, что потребители не влияют друг на друга. Однако следует помнить, что если в канале несколько потребителей, долго работающий потребитель может блокировать выполнение обратных вызовов других потребителей в этом канале.

Получение одного сообщения (извлечение интерфейса)

Попробуйте извлечь сообщение. Если сообщение существует, оно возвращается, в противном случае возвращается значение null. Поэтому большинству клиентов необходимо постоянно опрашивать, чтобы получить сообщение, поэтому этот метод не рекомендуется.

Используйте Channel.basicGet для извлечения («извлечения») сообщений. Возвращаемое значение представляет собой экземпляр объекта GetResponse, содержащего информацию об атрибутах и теле сообщения.

@Slf4j
public class Consumer implements Runnable{
    @Override
    public void run() {
        RabbitmqUtil rabbitmqUtil = null;
        try {
            rabbitmqUtil = new RabbitmqUtil("application.yml");
            Channel channel = rabbitmqUtil.prepareChannel();
            boolean autoAck = false;
            while(true){
                GetResponse response = channel.basicGet(QUEUE_NAME, autoAck);
                if (response == null) {
                    log.info("当前无消息....");
                } else {
                    byte[] body = response.getBody();
                    log.info("msg: {}",new String(body));
                    channel.basicAck(response.getEnvelope().getDeliveryTag(),false);
                    break;
                }
            }
        } catch (IOException | TimeoutException e) {
            log.error("出现异常: ",e);
        }
    }
}

ChannIN класс на самом деле очень прост:

@Override
    public GetResponse basicGet(String queue, boolean autoAck)
        throws IOException
    {
        validateQueueNameLength(queue);
        //构造命令
        AMQCommand replyCommand = exnWrappingRpc(new Basic.Get.Builder()
                                                  .queue(queue)
                                                  .noAck(autoAck)
                                                 .build());
        //命令执行
        Method method = replyCommand.getMethod();
        //判断执行结果
        if (method instanceof Basic.GetOk) {
            Basic.GetOk getOk = (Basic.GetOk)method;
            Envelope envelope = new Envelope(getOk.getDeliveryTag(),
                                             getOk.getRedelivered(),
                                             getOk.getExchange(),
                                             getOk.getRoutingKey());
            BasicProperties props = (BasicProperties)replyCommand.getContentHeader();
            byte[] body = replyCommand.getContentBody();
            int messageCount = getOk.getMessageCount();

            metricsCollector.consumedMessage(this, getOk.getDeliveryTag(), autoAck);
            //有结果,那么构造成GetResponse后返回
            return new GetResponse(envelope, props, body, messageCount);
        } else if (method instanceof Basic.GetEmpty) {
            //没有消息,那么返回的结果为空,会返回null
            return null;
        } else {
            throw new UnexpectedMethodError(method);
        }
    }

Обработка сообщений, которые не удалось маршрутизировать

Если опубликованное сообщение имеет обязательный атрибут, но не было успешно маршрутизировано, прокси-сервер вернёт его отправившему клиенту (через команду AMQP.Basic.Return).

Клиент может получать эти уведомления о возврате, реализуя интерфейс ReturnListener и вызывая Channel.addReturnListener. Если клиент не настроил возврат для определённого канала, соответствующие сообщения будут молча отброшены. Перевод текста на русский язык:

Канал.basicPublish (EXCHANGE_NAME, UNKNOWN_ROUTING_KEY, true, null, «你好, я — производитель».getBytes(StandardCharsets.UTF_8));

Например, клиент отправил сообщение с обязательным флагом, это сообщение установило тип обмена как «прямой», но обмен не был привязан к очереди, в этом случае будет вызван возврат прослушивателя.

Thread publisher = new Thread(new Publisher(), «Поток производителя»);
publisher.start();

Журнал:

19:52:56.103 [Поток производителя] INFO com.dhy.util.RabbitmqUtil — Соединение установлено 19:52:56.232 [Поток производителя] INFO com.dhy.util.RabbitmqUtil — Соединение установлено, имя соединения клиента dhy-connection 19:52:56.234 [Поток производителя] INFO com.dhy.util.RabbitmqUtil — Создание канала... 19:52:56.369 [Поток производителя] INFO com.dhy.util.RabbitmqUtil — Подготовка канала... 19:52:56.374 [Поток производителя] INFO com.dhy.Publisher — Отправка сообщения... 19:52:56.403 [AMQP Connection 110.40.155.17:5672] WARN com.dhy.RouteFailListener — Информация о сообщении об ошибке маршрутизации: replyCode=312, replyText=NO_ROUTE, exchange=dhy-exchange, routingKey=unknown, body=你 хороший, я — производитель


Потребительская операция пула потоков

По умолчанию потребительский поток будет распределён через новый пул потоков ExecutorService.

Можно увидеть, что размер потока по умолчанию равен удвоенному количеству ядер процессора.

Если требуется больший контроль, можно использовать newConnection() для применения ExecutorService для замены. Это пример использования большего пула потоков:

ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);

Когда соединение закрывается, предоставленный ExecutorService также выполнит shutdown(), но пользовательский ExecutorService (как показано выше) не выполнит shutdown(). Клиент, предоставляющий собственный ExecutorService, должен гарантировать, что он в конечном итоге будет закрыт (т. е. вызвать метод shutdown()), иначе пул потоков повлияет на завершение работы JVM.

Один и тот же ExecutorService может совместно использоваться несколькими соединениями или постоянно повторно использоваться и повторно подключаться, но после закрытия его нельзя использовать снова.

Эту функцию следует рассматривать только в том случае, если обработка обратного вызова потребителя серьёзно перегружена. Если нет или требуется лишь небольшое количество обратных вызовов потребителя, то потока по умолчанию достаточно. Даже если иногда происходит резкое увеличение активности потребителей, первоначальная нагрузка невелика, а ресурсы потока не могут быть расширены бесконечно.


Использование списка хостов

Передача массива адресов в newConnection() не вызывает проблем. Адрес — это простой класс пакета com.rabbitmq.client, который содержит компоненты хоста и порта.

Например:

Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1), new Address(hostname2, portnumber2)};
Connection conn = factory.newConnection(addrArr);

Сначала будет предпринята попытка подключения к hostname1:portnumber1, и если она не удастся, будет предпринята вторая попытка hostname2:portnumber2. Возвращаемый объект соединения является первым успешным элементом массива (если не было выброшено исключение IOException). Это точно так же, как установка хоста и порта по отдельности, а затем последовательный вызов factory.newConnection() до успешного выполнения.

Если одновременно предоставляется ExecutorService (в factory.newConnection (es, addrArr)), то пул потоков также соответствует первому успешному соединению.


Реализация обнаружения службы с использованием интерфейса AddressResolver

Мы можем использовать интерфейс AddressResolver для изменения алгоритма разрешения конечной точки при подключении:

Connection conn = factory.newConnection(addressResolver);

Интерфейс AddressResolver похож на:

public interface AddressResolver {
  List<Address> getAddresses() throws IOException;
}

Как и в случае со списком хостов, сначала будет предпринята попытка первого адреса в списке, и если он не удастся, будет предпринята вторая попытка, пока не будет достигнут успех.

Если одновременно предоставляется ExecutorService (в factory.newConnection (es, addrArr)), то пул потоков также соответствует первому успешному соединению.

AddressResolver — лучший способ реализовать настраиваемое обнаружение служб, чтобы клиенты могли автоматически подключаться к узлам, которые были доступны при первом запуске.

Java-клиент поставляется с реализацией (см. javadoc):

  • DnsRecordIpAddressResolver: возвращает IP-адрес хоста на основе заданного имени хоста (для разрешения DNS-сервера).
  • DnsSrvRecordAddressResolver: на основе указанного имени службы возвращает хост/порт пары, которые могут реализовать функции, подобные регистрации и обнаружению служб, например, Eurkea, Consule.

По умолчанию существует только один адрес хоста, который использует DnsRecordIpAddressResolver.

Метод getAddresses будет вызываться в методе newConnection.


Проблема с атрибутом autoDelete

  • Условие автоматического удаления обмена: есть очередь или обменник, привязанный к этому обмену, и все очереди или обменники отвязываются от этого обмена, тогда этот обмен будет автоматически удалён, когда autoDelete=true.
  • Условие автоматического удаления очереди: есть подписчик на эту очередь, и все потребители отписываются от этой очереди, тогда эта очередь будет автоматически удалена, даже если в очереди всё ещё есть сообщения, когда autoDelete=true. Потребитель:
            channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    log.info("Сообщение: {}",new String(body));
                }
            });

Рабочий режим очередей

Основная идея рабочих очередей заключается в том, чтобы избежать немедленного выполнения ресурсоёмких задач и не ждать их завершения. Вместо этого мы планируем задачи на более позднее выполнение. Мы упаковываем задачи в сообщения и отправляем их в очередь. Фоновые рабочие процессы извлекают задачи и выполняют работу. Когда есть несколько рабочих потоков, эти рабочие потоки совместно обрабатывают задачи.

На этой диаграмме сообщения всё ещё помещаются в основной обменник, и по сравнению с простой очередью количество потребителей увеличилось. Поскольку сообщение может быть использовано только один раз, распределение сообщений становится нашей заботой.

RabbitMQ tutorial — Рабочие очереди — RabbitMQ

Код потребителя не меняется, код производителя также остаётся неизменным, нам просто нужно запустить двух потребителей одновременно, а также через веб-интерфейс вручную вставить сообщения в очередь для тестирования:

Мы отправили 6 сообщений, давайте посмотрим, сколько сообщений получил каждый потребитель:

По умолчанию RabbitMQ будет отправлять каждое сообщение следующему потребителю по очереди. В среднем каждый потребитель получит одинаковое количество сообщений. Этот механизм распределения сообщений называется циклическим перебором.


Механизм подтверждения сообщений

— Потребителю может потребоваться некоторое время для завершения одной задачи, если один из потребителей обрабатывает длинную задачу и завершает её лишь частично, что произойдёт? RabbitMQ немедленно помечает это сообщение как удалённое после того, как оно было передано потребителю. В этом случае, если потребитель внезапно выйдет из строя, мы потеряем сообщение, которое он обрабатывал, и все последующие сообщения, которые будут отправлены этому потребителю, поскольку он не сможет их получить.

— Чтобы гарантировать, что сообщения не будут потеряны во время передачи, RabbitMQ вводит механизм подтверждения сообщений: после получения и обработки сообщения потребитель сообщает RabbitMQ, что он обработал его, и RabbitMQ может удалить это сообщение.

Если потребитель не отвечает на определённое сообщение в течение указанного времени ожидания, текущий канал принудительно закрывается и возникает исключение PRECONDITION_FAILED уровня канала.

PRECONDITION_FAILED

Время ожидания по умолчанию составляет 30 минут.


Автоматическое подтверждение

Сообщения считаются успешно отправленными сразу после отправки. Этот режим требует баланса между высокой пропускной способностью и безопасностью передачи данных, потому что, если сообщение получено до того, как потребитель его получит, соединение или канал на стороне потребителя могут быть закрыты, и сообщение будет потеряно. Конечно, с другой стороны, этот режим позволяет потребителю передавать перегруженные сообщения без ограничения количества передаваемых сообщений. Конечно, это может привести к тому, что потребитель будет перегружен слишком большим количеством сообщений, которые он не успевает обработать, что приведёт к накоплению этих сообщений и, в конечном итоге, к исчерпанию памяти, и эти потребительские потоки будут убиты операционной системой, поэтому этот режим подходит только для ситуаций, когда потребитель может эффективно обрабатывать сообщения с определённой скоростью.

Рекомендуется не использовать автоматическое подтверждение.


Ручное подтверждение
  • Подтверждение сообщения

    // Первый параметр: подтвердить какое сообщение
    // Второй параметр: включить ли пакетное подтверждение
    channel.basicAck(envelope.getDeliveryTag(),false);

    Как работает пакетное подтверждение?

Можно выполнить пакетное подтверждение для ручного подтверждения, чтобы уменьшить сетевой трафик. Это достигается путём установки нескольких полей метода подтверждения в значение true.

Когда поле пакетного подтверждения установлено в значение true, например, предположим, что на канале Ch есть неподтверждённые сообщения с тегами доставки 5, 6, 7 и 8, когда подтверждение кадра достигает этого канала, delivery_tag установлен на 8 и поле пакетного подтверждения установлено на true, тогда все теги от 5 до 8 будут подтверждены.

Если поле пакетного подтверждения установлено в false, то 5, 6 и 7 по-прежнему не будут подтверждены.

Если сообщение получено потребителем, но до разрыва соединения не было ответа на сообщение, сообщение будет повторно поставлено в очередь.

  • Отклонение сообщения (два способа)

    // Первый параметр: отклонить какое сообщение
    // Второй параметр: повторно поставить ли отклонённое сообщение в очередь
    channel.basicReject(envelope.getDeliveryTag(),true);
    // Первый параметр: отклонить какое сообщение
    // Второй параметр: пакетный отказ
    // Третий параметр: повторно поставить ли отклонённое сообщение в очередь
    // Метод basic.nack может отклонять или повторно ставить в очередь несколько сообщений за один раз. В этом его отличие от basic.reject.
    channel.basicNack(envelope.getDeliveryTag(),true,true);

Пользователь не может обработать сообщение немедленно, но другие экземпляры могут это сделать. В этом случае может потребоваться повторно поставить сообщение в очередь, чтобы другой потребитель мог принять и обработать его. basic.reject и basic.nack — это два протокола, используемых для этой цели.

Это сообщение можно отбросить, повторно поставить в очередь или отправить в мёртвую букву. Это контролируется полем requeue. Если это поле установлено в true, брокер будет использовать указанный тег доставки для повторной постановки в очередь (или нескольких тегов доставки). В противном случае, если настроено, сообщение будет перенаправлено в обмен мёртвыми буквами, в противном случае оно будет отброшено.

При повторной постановке сообщения в очередь оно будет помещено обратно в свою исходную позицию в очереди, если это возможно. В противном случае (из-за одновременной передачи и подтверждения от других пользователей при совместном использовании очередей) сообщение будет повторно поставлено ближе к началу очереди.

Дополнительные сведения о подтверждении сообщений см. в официальной документации:

Подтверждение сообщений потребителями и подтверждение публикаций — RabbitMQ


Постоянство сообщений

Для обеспечения постоянства сообщений необходимо сначала сделать соответствующую очередь постоянной, а затем пометить сообщение как постоянное при публикации сообщения.

 channel.basicPublish("",QUEUE_NAME,true, 
                      // Добавляем атрибут постоянства к сообщению
                      MessageProperties.PERSISTENT_TEXT_PLAIN,("Номер"+i).getBytes(StandardCharsets.UTF_8));

Обратите внимание! ! !

Маркировка сообщения как постоянного не гарантирует, что сообщение не будет потеряно. Хотя оно сообщает RabbitMQ сохранить сообщение на диск, здесь всё ещё существует промежуток времени, когда сообщение готово к сохранению на диске, но ещё не сохранено, и сообщение всё ещё находится в кэше. В это время оно фактически не записывается на диск. Гарантия постоянства не является сильной, но для нашей простой очереди задач она уже достаточно сильна. Если требуется более сильная стратегия постоянства, можно использовать подтверждение издателя.


Справедливое планирование и предварительная выборка

Возможно, вы заметили, что планирование всё ещё не полностью соответствует нашим требованиям. Например, при наличии двух рабочих и большой разнице в скорости обработки между ними один рабочий будет постоянно занят, а другой почти ничего не делает. Ну, RabbitMQ об этом не знает и продолжает равномерно распределять сообщения.

Эта ситуация возникает потому, что RabbitMQ планирует сообщения только тогда, когда они поступают в очередь. Он не проверяет количество неподтверждённых сообщений у потребителей. Он просто отправляет n-е сообщение n-му потребителю вслепую. Перевод текста с английского на русский язык:

Чтобы решить эту проблему, мы можем использовать базовый метод Qos с предустановленным счётчиком = 1. Это сообщает RabbitMQ не отправлять сразу несколько сообщений одному работнику. Или, другими словами, перед обработкой и подтверждением предыдущего сообщения, не отправлять новое сообщение работнику, а вместо этого перенаправить его другому свободному работнику.

channel.basicQos(1)

Пример: потребительская программа вызывает channel.basicQos (5), затем подписывается на определённую очередь для потребления. RabbitMq сохраняет список потребителей, каждый раз отправляя сообщение, он увеличивает счётчик для соответствующего потребителя. Когда счётчик достигает 5, RabbitMQ прекращает отправку сообщений этому потребителю. После того как потребитель подтверждает обработку сообщения, RabbitMQ уменьшает соответствующий счётчик на 1, и потребитель может продолжать получать сообщения до тех пор, пока счётчик снова не достигнет верхнего предела. Этот механизм можно сравнить с «скользящим окном» в TCP IP.

Подтверждение потребителя (https://www.rabbitmq.com/confirms.html).

Публикация и подписка

RabbitMQ tutorial — Publish/Subscribe — RabbitMQ

Сообщения из обмена (Exchange) отправляются всем очередям, связанным с этим обменом, игнорируя routingKey.

img

Этот тип обмена называется «широковещательным» (fanout).

В приведённом примере обе очереди являются временными очередями, то есть очереди создаются сервером и связаны с уникальным клиентом. Если клиент отключается, очередь автоматически удаляется.

Публикация и подписка очень просты, вот простой пример:

— Подготовьте обмен и очередь.

public static final String EXCHANGE_NAME="dhy-exchange";
public static final String QUEUE_NAME="dhy-queue";
public static final String ROUTING_KEY="dhy";
public static final String TEMP_QUEUE="";
public static final String UNKNOWN_ROUTING_KEY ="unknown";

public Channel prepareChannel() throws IOException, TimeoutException {
    RabbitmqUtil rabbitmqUtil = new RabbitmqUtil("application.yml","dhy-connection");
    Channel channel = rabbitmqUtil.createChannel();
    // Объявляем обмен и очередь
    // Непостоянный, не эксклюзивный, неавтоматический обмен типа fanout
    channel.exchangeDeclare(EXCHANGE_NAME, FANOUT, false);
    // Очередь с заданным именем, непостоянная, неэксклюзивная, неавтоматическая
    channel.queueDeclare(TEMP_QUEUE, false, false, false, null);
    // Связываем обмен и очередь
    channel.queueBind(TEMP_QUEUE, EXCHANGE_NAME, ROUTING_KEY);
    log.info("Подготовка канала...");
    return channel;
}

— Потребитель.

image-20220526191148793

— Производитель.

image-20220526191144497

image-20220526191140578

Маршрутизация

RabbitMQ tutorial — Routing — RabbitMQ

Обмен направляет сообщения в очереди на основе ключа маршрутизации.

img

img

img

Это официальные примеры использования прямого обмена с ключами маршрутизации, которые определяют, какие очереди получают сообщения.

Отношения между обменом и очередями можно представить как карту:

Map<List<RouteKey>,List<Queue>> exchange;

Когда обмен получает ключ маршрутизации, ему нужно знать, в какие очереди направить сообщение. Как это сделать?

List<Queue> queues=exchange.get(Arrays.asList(key1,key2...))

Несколько ключей маршрутизации могут указывать на одну и ту же очередь, или один ключ может указывать на несколько очередей, поэтому это отношение «многие ко многим».

Вот пример использования:

— Подготовка констант.

/**
 * Прямой обмен
 */
public static final String DIRECT_EXCHANGE="direct";
/**
 * Очередь 1
 */
public static final String QUEUE_ONE="queue_one";
public static final String ONE_KEY="one";
/**
 * Очередь 2
 */
public static final String QUEUE_TWO="queue_two";
public static final String TWO_KEY="two";

— Код производителя.

```java
@Slf4j
public class Publisher implements Runnable {
    @Override
    public void run() {
        try {
            RabbitmqUtil rabbitmqUtil = new RabbitmqUtil("application.yml");
            Channel channel = rabbitmqUtil.createChannel();
            // Объявить прямой обмен
            channel.exchangeDeclare(DIRECT_EXCHANGE, DIRECT,false);
            // Отправить два сообщения с ключами one и two
            channel.basicPublish(DIRECT_EXCHANGE,ONE_KEY,false, null,"one".getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(DIRECT_EXCHANGE,TWO_KEY,false, null,"two".getBytes(StandardCharsets.UTF_8));
        } catch (IOException | TimeoutException e  ) {
            log.error("Возникла ошибка: ",e);
        }
    }
}
```

— Код потребителя.

@Slf4j
public class ConsumerOne implements Runnable{
    @Override
    public void run() {
        RabbitmqUtil rabbitmqUtil = null;
        try
``` Вот перевод текста на русский язык:

{ rabbitmqUtil = new RabbitmqUtil("application.yml"); Channel channel = rabbitmqUtil.createChannel(); channel.queueDeclare(QUEUE_ONE, false, false, false, null); //绑定别忘了 channel.queueBind(QUEUE_ONE, DIRECT_EXCHANGE, ONE_KEY); channel.basicConsume(QUEUE_ONE, true, new DefaultConsumer(channel) { @SneakyThrows @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { log.info("Сообщение: {}", new String(body)); } }); } catch (IOException | TimeoutException e) { log.error("Возникло исключение: ", e); } }


```java
@Slf4j
public class ConsumerTwo implements Runnable {
    @Override
    public void run() {
        RabbitmqUtil rabbitmqUtil = null;
        try {
            rabbitmqUtil = new RabbitmqUtil("application.yml");
            Channel channel = rabbitmqUtil.createChannel();
            channel.queueDeclare(QUEUE_TWO, false, false, false, null);
            channel.queueBind(QUEUE_TWO, DIRECT_EXCHANGE, TWO_KEY);
            channel.basicConsume(QUEUE_TWO, true, new DefaultConsumer(channel) {
                @SneakyThrows
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    log.info("Сообщение: {}", new String(body));
                }
            });
        } catch (IOException | TimeoutException e) {
            log.error("Возникло исключение: ", e);
        }
    }
}
  • Сначала запустите потребителя, затем запустите производителя, потому что необходимо сначала создать очередь, иначе обменник получит сообщение и не найдёт очередь, поэтому он сразу отбросит сообщение.
Thread consumer1 = new Thread(new ConsumerOne(), "Потребитель 1");
Thread consumer2 = new Thread(new ConsumerTwo(), "Потребитель 2");
Thread publisher = new Thread(new Publisher(), "Производитель");
consumer1.start();
consumer2.start();
Thread.sleep(1000);
publisher.start();

Тема

RabbitMQ tutorial — Topics — RabbitMQ

Суть темы

Суть темы заключается в использовании свойств прямого обмена, но прямой обмен имеет недостаток — он не может выполнять нечёткое сопоставление, необходимо точно указать ключ маршрута. Поэтому существует тема, которая предоставляет функцию нечёткого сопоставления.

Ниже мы расскажем, как именно происходит нечёткое сопоставление:

  • Сообщения, отправляемые в тему обмена, должны соответствовать определённым требованиям к ключу маршрута: он должен быть списком слов, разделённых точками. Эти слова могут быть любыми словами, например «stock.usd.nyse», «nyse.vmw», «quick.orange.rabbit». Конечно, этот список слов не должен превышать 255 байт.
  • В этом списке правил есть два подстановочных знака:
    • Можно заменить одно слово.
    • Может заменять ноль или более слов.

Рассмотрим пример:

  • quick.orange.rabbit: принимается очередью Q1Q2.
  • quick.orange.fox: принимается очередью Q1.
  • lazy.brown.fox: принимается очередью Q2.
  • lazy.pink.rabbit: хотя соответствует двум привязкам очереди Q2, будет принято только один раз.
  • quick.orange.male.rabbit: четыре слова не соответствуют ни одной привязке и будут отброшены.

Сравнение:

  • Когда очередь привязана к #, эта очередь будет принимать все данные, что немного похоже на разветвление.
  • Если в привязке очереди нет # и *, то тип привязки этой очереди — прямая.

Практический пример:

  • Подготовка констант:

    /**
      * Тема обмена
      */
     public static final String TOPIC_EXCHANGE="topic";
    
    
     public static final String Q1_QUEUE="Q1";
     public static final String Q1_ROUTE_KEY="*.orange.*";
    
     public static final String Q2_QUEUE="Q2";
     public static final String Q2_ROUTE_KEY1="*.*.rabbit";
     public static final string Q2_ROUTE_KE2="lazy.#";
  • Подготовка производителя:

    @Slf4j
    public class Publisher implements Runnable {
       @Override
       public void run() {
           try {
               RabbitmqUtil rabbitmqUtil = new RabbitmqUtil("application.yml");
               Channel channel = rabbitmqUtil.createChannel();
               // Объявляем тему обмена
               channel.exchangeDeclare(TOPIC_EXCHANGE, TOPIC,false);
               channel.basicPublish(TOPIC_EXCHANGE,"apple.orange.banana",false, null,"q1".getBytes(StandardCharsets.UTF_8));
               channel.basicPublish(TOPIC_EXCHANGE,"dog.pig.rabbit",false, null,"q21".getBytes(StandardCharsets.UTF_8));
               channel.basicPublish(TOPIC_EXCHANGE,"lazy.lazy1",false, null,"q22".getBytes(StandardCharsets.UTF_8));
           } catch (IOException |

TimeoutException e) { log.error("出现异常: ",e); }


```java
@Slf4j
public class ConsumerOne implements Runnable{
    @Override
    public void run() {
        RabbitmqUtil rabbitmqUtil = null;
        try {
            rabbitmqUtil = new RabbitmqUtil("application.yml");
            Channel channel = rabbitmqUtil.createChannel();
            channel.queueDeclare(Q1_QUEUE,false,false,false,null);
            channel.queueBind(Q1_QUEQUE,TOPIC_EXCHANGE,Q1_ROUTE_KEY);
            channel.basicConsume(Q1_QUEUE,true,new DefaultConsumer(channel){
                @SneakyThrows
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    log.info("消息为: {}",new String(body));
                }
            });
        } catch (IOException | TimeoutException e) {
            log.error("出现异常: ",e);
        }
    }
}
@Slf4j
public class ConsumerTwo implements Runnable{
    @Override
    public void run() {
        RabbitmqUtil rabbitmqUtil = null;
        try {
            rabbitmqUtil = new RabbitmqUtil("application.yml");
            Channel channel = rabbitmqUtil.createChannel();
            channel.queueDeclare(Q2_QUEUE,false,false,false,null);
            //绑定两个路由key到Q2上
            channel.queueBind(Q2_QUEUE,TOPIC_EXCHANGE,Q2_ROUTE_KEY1);
            channel.queueBind(Q2_QUEUE,TOPIC_EXCHANGE,Q2_ROUTE_KE2);
            channel.basicConsume(Q2_QUEUE,true,new DefaultConsumer(channel){
                @SneakyThrows
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    log.info("消息为: {}",new String(body));
                }
            });
        } catch (IOException | TimeoutException e) {
            log.error("出现异常: ",e);
        }
    }
}

Тестирование

        Thread consumer1 = new Thread(new ConsumerOne(),"消费者1");
        Thread consumer2 = new Thread(new ConsumerTwo(),"消费者2");
        Thread publisher = new Thread(new Publisher(),"生产者");
        consumer1.start();
        consumer2.start();
        publisher.start();

RPC-режим

RPC — это межпроцессное взаимодействие, в отличие от ранее перечисленных режимов, где производитель отправляет сообщение, а потребитель обрабатывает его, то есть однонаправленная коммуникация.

Если вы хотите, чтобы потребитель после получения сообщения мог отправить результат обратно производителю, как это сделать?

Не беспокойтесь, RabbitMQ уже подготовил для нас всё необходимое, давайте посмотрим:

В RabbitMQ реализовать RPC очень просто: клиент отправляет запросное сообщение, сервер отвечает ответным сообщением. Чтобы получить ответ, нам нужно отправить «обратный вызов» вместе с запросом. Мы можем использовать для этого стандартную очередь.

//生成一个临时队列
String queue = channel.queueDeclare().getQueue();
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
//指明回调队列的地址
.replyTo(queue).build();
channel.basicPublish("","rpc_queue",basicProperties,"rpc调用".getBytes(StandardCharsets.UTF_8));

Свойства сообщения:

AMQP 0-9-1 протокол предопределил 14 свойств, которые могут быть предоставлены вместе с сообщением. Большинство свойств используются редко, но следующие свойства являются исключением:

  • deliveryMode: помечает сообщение как постоянное (значение 2) или непостоянное (любое другое значение).
  • contentType: используется для описания кодированного типа mime. Например, для широко используемого кодирования JSON лучше всего установить это свойство на application/json.
  • replyTo: обычно используется для указания очереди обратного вызова.
  • correlationId: используется для связывания ответа RPC с запросом.
Correlation Id

Для каждого запроса RPC создайте отдельную очередь обратного вызова. Это очень неэффективно, и лучше создать отдельную очередь для каждого клиента.

Это поднимает новый вопрос: после получения ответа в этой очереди, мы не знаем, к какому запросу он относится. Вот где используется свойство correlationId. Мы установим его в уникальное значение для каждого запроса. Позже, когда мы получим сообщение в очереди обратного вызова, мы проверим это свойство и сможем сопоставить ответ с запросом. Если мы увидим неизвестное значение correlationId, мы можем безопасно отбросить это сообщение — оно не принадлежит нашему запросу.

Вы можете спросить, почему мы должны игнорировать неизвестные сообщения в очереди обратного вызова вместо того, чтобы потерпеть неудачу и вызвать ошибку? Это связано с тем, что на сервере может возникнуть состояние состязания. Хотя это маловероятно, RPC-сервер может умереть сразу после отправки ответа, но до отправки подтверждения запроса. В случае такого сценария перезапущенный RPC-сервер будет обрабатывать этот запрос снова. Вот почему на стороне клиента мы должны изящно обрабатывать повторяющиеся ответы, и RPC должен быть идемпотентным в идеальном случае. Rpc конкретный рабочий процесс выглядит следующим образом:

  • Для RPC-запроса клиент отправляет сообщение с двумя свойствами: replyTo (устанавливается как анонимная эксклюзивная очередь только для запроса) и correlationId (устанавливается как уникальное значение для каждого запроса).
  • Запрос отправляется в очередь rpc_queue.
  • Рабочий поток RPC (также называемый сервером) ожидает запросов в очереди. Когда появляется запрос, он выполняет задание и использует поле replyTo для отправки сообщения с результатами обратно клиенту.

Практическое использование:

Клиент:

/**
 * Отправляет запрос на сервер и получает ответные данные от сервера.
 */
@Slf4j
public class Client implements Runnable {
    final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
    @Override
    public void run() {
        RabbitmqUtil rabbitmqUtil = null;
        try {
            rabbitmqUtil = new RabbitmqUtil("application.yml");
            Channel client = rabbitmqUtil.createChannel();
            String rpcQueue = client.queueDeclare().getQueue();
            log.info("Имя очереди rpc: {}", rpcQueue);
            String uid = UUID.randomUUID().toString();
            AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                // Идентификатор корреляции отправленного сообщения
                .correlationId(uid)
                // Сообщение отправляется в какую очередь
                .replyTo(rpcQueue).build();
            // Использование по умолчанию обмена, ключ маршрутизации — rpc, сбой маршрутизации вызовет интерфейс обратного вызова сообщений
            client.basicPublish("", "rpc_queue", true, props, "rpc".getBytes(StandardCharsets.UTF_8));
            // Мониторинг очереди ответов rpc — возвращает текущий ярлык потребителя
            String ctag = client.basicConsume(rpcQueue, true, (consumerTag, delivery) -> {
                if (delivery.getProperties().getCorrelationId().equals(uid)) {
                    response.offer(new String(delivery.getBody()));
                }
            }, consumerTag -> {});

            // Блокировка до тех пор, пока в очереди не появится сообщение
            String res = response.take();
            // Отмена текущего потребителя — временная очередь будет удалена
            client.basicCancel(ctag);
            log.info("Получен результат rpc: {}", res);
        } catch (IOException | TimeoutException | InterruptedException e) {
            log.error("Возникла исключительная ситуация: ", e);
        }
    }
}

Сервер:

@Slf4j
public class Server implements Runnable {

    @Override
    public void run() {
        RabbitmqUtil rabbitmqUtil = null;
        try {
            rabbitmqUtil = new RabbitmqUtil("application.yml");
            Channel server = rabbitmqUtil.createChannel();
            // Объявить очередь связи между клиентом и сервером, это не очередь обратного вызова
            server.queueDeclare(getRpc_queue(), false, false, false, null);
            // Очистить сообщения в очереди
            server.queuePurge(getRpc_queue());
            // Максимальное количество сообщений — 1
            server.basicQos(1);
            // Сообщения в очереди rpc_queue, отправленные клиентом
            server.basicConsume(getRpc_queue(),false,
                    // Обработка сообщений в очереди
                    (consumerTag, delivery) ->{
                        // Подготовка свойств для ответа
                        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                                // Идентификатор корреляции отправляемого сообщения
                                .correlationId(delivery.getProperties().getCorrelationId())
                                .build();
                        String reply = new String(delivery.getBody());
                        log.info("Сообщение клиента: {}", reply);
                        if(reply.equals("rpc")){
                            // Ответное сообщение
                            server.basicPublish("",delivery.getProperties().getReplyTo(),props,"respect!!!".getBytes(StandardCharsets.UTF_8));
                            // Ответить клиенту на отправленное сообщение
                            server.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
                        }
                    },consumerTag -> {});
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

    private String getRpc_queue() {
        return "rpc_queue";
    }
}

Тестирование:

Thread client = new Thread(new Client());
Thread server=new Thread(new Server());
client.start();
server.start();

Рисунок 20220526191034156

Вопросы эксклюзивности очередей

Если внимательно прочитать статью, у некоторых читателей может возникнуть вопрос: почему сервер всё ещё может отправлять сообщения в эту временную очередь, хотя она является эксклюзивной для текущего клиента?

Эксклюзивность очереди основана на видимости соединения. Различные каналы одного и того же соединения могут одновременно обращаться к одной и той же очереди, созданной этим соединением. RabbitMQ автоматически удаляет эту очередь, независимо от того, была ли она объявлена как Durable (Durable =true).

То есть, даже если клиентская программа объявит эксклюзивную очередь как Durable, RabbitMQ удалит её, как только будет вызван метод Close соединения или клиентская программа завершит работу. Здесь важно отметить, что удаление происходит при разрыве соединения, а не канала.

Причина этого вопроса в том, что не было чёткого понимания работы RabbitMQ. В данном случае мы объявляем по умолчанию обмен, то есть сервер отправляет сообщение на этот обмен и добавляет ключ маршрутизации. Затем обмен направляет сообщение во временную очередь.

Режим подтверждения публикации

Для получения более подробной информации рекомендуется обратиться к официальной документации:

RabbitMQ tutorial — Reliable Publishing with Publisher Confirms — RabbitMQ

Publisher confirms.

Производитель устанавливает канал в режим confirm, и после этого все сообщения, отправленные через этот канал, получают уникальный идентификатор (начиная с 1). После того как сообщение будет доставлено во все соответствующие очереди, брокер отправит подтверждение производителю (включая уникальный идентификатор сообщения). Это позволяет производителю узнать, что сообщение успешно достигло целевой очереди.

Здесь сообщение успешно отправляется на обмен, и брокер сообщает производителю, что отправка прошла успешно, а не потребителю, который подтверждает сообщение и затем вызывает обратный вызов.

Если сообщение и очередь являются постоянными, подтверждение сообщения отправляется после записи сообщения на диск. В подтверждении сообщения от брокера содержится номер последовательности, указывающий на подтверждение сообщения. Кроме того, брокер может установить поле multiple в basic.ack, чтобы указать, что все сообщения до этого номера последовательности были обработаны.

Преимущество режима confirm заключается в его асинхронной природе. После отправки сообщения производитель может продолжать отправку следующего сообщения, ожидая подтверждения канала. Когда сообщение окончательно подтверждается, производитель обрабатывает подтверждение через обратный вызов. Если RabbitMQ теряет сообщение из-за внутренней ошибки, он отправляет nack-сообщение, которое также обрабатывается производителем через обратный вызов.

Этот механизм подтверждения можно сравнить с концепцией подтверждения сообщений в TCP, где асинхронный обратный вызов является распространённым способом реализации асинхронного подхода.

Включение подтверждения публикации

Подтверждение публикации по умолчанию отключено. Для включения необходимо вызвать метод confirm.Select. Каждый раз, когда вы хотите использовать подтверждение публикации, вам нужно вызывать этот метод на канале.

Channel channel = connection.createChannel();
channel.confirmSelect();

Индивидуальное подтверждение публикации

Это простой способ подтверждения, который представляет собой синхронное подтверждение публикации. То есть следующее сообщение не будет отправлено, пока предыдущее не будет подтверждено. Метод waitForConfirmsOrDie(long) возвращает значение только после подтверждения сообщения и генерирует исключение, если сообщение не подтверждается в течение указанного времени.

Основным недостатком этого метода подтверждения является его низкая скорость публикации.

@Slf4j
public class Publisher implements Runnable {
    @Override
    public void run() {
        try {
            RabbitmqUtil rabbitmqUtil = new RabbitmqUtil("application.yml");
            Channel channel = rabbitmqUtil.prepareChannel();
            //mandatory为true时,消息路由失败,会回调消息回退接口
            channel.addReturnListener(new RouteFailListener());
            //开启发布确认
            channel.confirmSelect();
            long begin = System.currentTimeMillis();
            //单个发布确认
            for (int i = 0; i < 100; i++) {
                channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,true, null,("序号"+i).getBytes(StandardCharsets.UTF_8));
                // 单个消息马上进行发布确认
                boolean flag = channel.waitForConfirms();
                if (flag){
                   log.info("消息发送成功");
                }
            }
           log.info("总耗时: {}",System.currentTimeMillis()-begin);
        } catch (IOException | TimeoutException | InterruptedException e  ) {
            log.error("出现异常: ",e);
        }
    }
}

Пакетное подтверждение

Предыдущий метод очень медленный. По сравнению с индивидуальным подтверждением, пакетное подтверждение значительно увеличивает пропускную способность. Однако этот метод также имеет недостаток: если возникает ошибка, приводящая к проблемам с публикацией, неизвестно, какое сообщение вызвало проблему. Необходимо сохранить всю партию в памяти для последующего повторного выпуска сообщений. Конечно, этот подход всё ещё синхронный и блокирует публикацию сообщений.

@Slf4j
public class Publisher implements Runnable {
    @Override
    public void run() {
        try {
            RabbitmqUtil rabbitmqUtil = new RabbitmqUtil("application.yml");
            Channel channel = rabbitmqUtil.prepareChannel();
            //mandatory为true时,消息路由失败,会回调消息回退接口
            channel.addReturnListener(new RouteFailListener());
            //开启发布确认
            channel.confirmSelect();
            long begin = System.currentTimeMillis();
            //单个发布确认
            for (int i = 0; i < 100; i++) {
                channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,true, null,("序号"+i).getBytes(StandardCharsets.UTF_8));
                if (i%10 == 0){
                    channel.waitForConfirms();
                }
            }
           log.info("总耗时: {}",System.currentTimeMillis()-begin);
        } catch (IOException | TimeoutException | InterruptedException e  ) {
            log.error("出现异常: ",e);
        }
    }
}

Асинхронное подтверждение

Асинхронное подтверждение реализуется через другой поток, вызывающий метод обратного вызова для подтверждения. Этот метод прост в реализации и не блокирует текущий выполняющийся поток. Рекомендуется использовать этот метод.

@Slf4j
public class Publisher implements Runnable {
    @Override
    public void run() {
        try {
            RabbitmqUtil rabbitmqUtil = new
``` ```
RabbitmqUtil("application.yml");
Channel channel = rabbitmqUtil.prepareChannel();
//mandatory为true时,消息路由失败,会回调消息回退接口
channel.addReturnListener(new RouteFailListener());
//开启发布确认
channel.confirmSelect();

// 消息确认成功回调函数
/*
 * 参数1:消息的标记
 * 参数2:是否为批量确认
 * */
ConfirmCallback ackCallback = (deliveryTag, multiply) -> {
    System.out.println("确认的消息:" + deliveryTag);
    System.out.println("是否为批量确认:  " + (multiply ? "YES" : "NO"));
};

// 消息确认失败回调函数--nack-ed的消息(客户端拒绝接受,或者路由失败的消息)
ConfirmCallback nackCallback = (deliveryTag, multiply) -> {
    String message = outstandingConfirms.get(deliveryTag);
    System.out.println("未确认的消息是:" + message + "未确认的消息tag:" + deliveryTag);
};

// 准备消息的监听器,监听哪些消息成功,哪些消息失败
/*
 * 参数1:监听哪些消息成功
 * 参数2:监听哪些消息失败
 * */
channel.addConfirmListener(ackCallback, nackCallback);

long begin = System.currentTimeMillis();
//单个发布确认
for (int i = 0; i < 100; i++) {
    channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true, null, ("序号" + i).getBytes(StandardCharsets.UTF_8));
}
log.info("总耗时: {}", System.currentTimeMillis() - begin);
} catch (IOException | TimeoutException e) {
    log.error("出现异常: ", e);
}
``` **outstandingConfirms.put(channel.getNextPublishSeqNo(),"序号"+i);**

**log.info("总耗时: {}",System.currentTimeMillis()-begin);**

В запросе представлен фрагмент кода на языке Java, который выполняет операции с очередями и обменом сообщениями. В тексте запроса нет технических терминов или специфических понятий, которые требовали бы перевода.

Текст запроса представляет собой описание процесса отправки сообщений через очереди и обмен в системе RabbitMQ. Автор описывает различные сценарии работы с сообщениями, включая успешную отправку, неудачную отправку из-за отсутствия канала или сети, а также асинхронные вызовы. Также упоминается использование тайм-аутов для сообщений и настройка очередей с ограничением по времени жизни (TTL).

Запрос не содержит специфической технической терминологии, которая требовала бы перевода, поэтому перевод выполнен дословно. 但是 не гарантируется, что после истечения срока очередь будет удалена с такой скоростью.

Когда сервер перезапускается, аренда очереди будет возобновлена.

> Срок действия в миллисекундах и должен быть положительным целым числом (в отличие от сообщения TTL, он не может быть равен 0).

- Очередь истекает через 30 минут бездействия.
```java
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);

Очередь недоставленных сообщений

Очередь недоставленных сообщений — RabbitMQ

— Очередь недоставленных сообщений, как следует из названия, представляет собой очередь сообщений, которые не могут быть использованы. Это буквально означает, что производитель отправил сообщение брокеру или непосредственно в очередь, но потребитель не смог его использовать. Такие сообщения обычно называют недоставленными сообщениями. — Сценарии использования: для обеспечения того, чтобы данные сообщений о заказах не были потеряны, необходимо использовать механизм очереди недоставленных сообщений RabbitMQ. Когда происходит сбой потребления сообщения, сообщение отправляется в очередь недоставленных сообщений. Например, пользователь успешно размещает заказ в магазине и переходит к оплате, но оплата не проходит в течение указанного времени, заказ автоматически аннулируется.

Источники недоставленных сообщений:

  1. Истечение срока действия сообщения.
  2. Достижение максимальной длины очереди (очередь заполнена, и новые данные не могут быть добавлены в MQ).
  3. Сообщение отклонено (basic.reject или basic.nack), и requeue=false.

Обратите внимание, что истечение срока действия очереди не приводит к тому, что сообщения в очереди становятся недоставленными.

Обработка недоставленных сообщений:

  • Отбросить, если это не критично, можно выбрать отбрасывание.
  • Записать недоставленное сообщение в базу данных для последующего анализа или обработки.
  • Обработать недоставленные сообщения через очередь недоставленных сообщений с помощью приложения, ответственного за мониторинг недоставленных сообщений.

image-20220526190926955

Настройка очереди недоставленных сообщений:

  • Настроить бизнес-очередь, привязанную к бизнес-обмену.
  • Для бизнес-очереди настроить обмен недоставленными сообщениями и ключ маршрутизации.
  • Настроить очередь недоставленных сообщений для обмена недоставленными сообщениями.

Обратите внимание, что не создаётся общая очередь недоставленных сообщений, и все недоставленные сообщения автоматически отправляются в эту очередь. Вместо этого для каждой бизнес-очереди, которая должна использовать недоставленные сообщения, настраивается отдельный обмен недоставленными сообщениями, где один проект может совместно использовать один обмен недоставленными сообщениями, а затем каждому бизнес-процессу назначается отдельный ключ маршрутизации.

После настройки обмена недоставленными сообщениями и ключа маршрутизации следующим шагом является настройка очереди недоставленных сообщений и привязка её к обмену недоставленными сообщениями. То есть очередь недоставленных сообщений не является специальной очередью, это просто очередь, связанная с обменом недоставленными сообщениями. Обмен недоставленными сообщениями также не является специальным обменом, это просто обмен, используемый для получения недоставленных сообщений, поэтому он может быть любого типа [Direct, Fanout, Topic].

Обычно для каждого бизнес-процесса выделяется отдельный ключ маршрутизации, который соответствует настроенной очереди недоставленных сообщений. Другими словами, обычно для каждого важного бизнес-процесса настраивается отдельная очередь недоставленных сообщений.

Обмен недоставленными сообщениями и очередь недоставленных сообщений являются обычными обменами и очередями соответственно, за исключением того, что при объявлении обычной очереди указывается, на какой обмен будут отправляться недоставленные сообщения этой очереди, и этот обмен называется обменом недоставленными сообщениями. Затем обмен недоставленными сообщениями отправляет сообщения в связанную очередь недоставленных сообщений на обработку.

Практика:

  • Подготовка издателя:
@Slf4j
public class Publisher implements Runnable {
    @Override
    public void run() {
        try {
            RabbitmqUtil rabbitmqUtil = new RabbitmqUtil("application.yml");
            Channel channel = rabbitmqUtil.createChannel();
            declare(channel);
            // Отправляем одно сообщение в очередь
            for (int i = 0; i < 10; i++) {
                channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,null,("dead message"+i).getBytes(StandardCharsets.UTF_8));
            }
        } catch (IOException | TimeoutException e  ) {
            log.error("Возникла исключительная ситуация: ",e);
        }
    }

    public void declare(Channel channel) throws IOException {
       // Объявляем обмен недоставленными сообщениями
        channel.exchangeDeclare(DEAD_EXCHANGE,DIRECT,false,true,null);
        // Объявляем очередь недоставленных сообщений
        channel.queueDeclare(DEAD_QUEUE,false,false,true,null);
        // Связываем обмен недоставленными сообщениями и очередь недоставленных сообщений
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,DEAD_KEY);

        // Устанавливаем свойства обычной очереди
        HashMap<String, Object> arguments = new HashMap<>();
        // Указываем текущий обмен недоставленными сообщениями обычной очереди
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        // Указываем ключ маршрутизации недоставленного сообщения
        arguments.put("x-dead-letter-routing-key",DEAD_KEY);
        // Указываем время жизни сообщения в очереди -- 5s
        arguments.put("x-message-ttl",5000);

        // Объявление обычного обмена
        channel.exchangeDeclare(EXCHANGE_NAME,DIRECT,false,true,null);
        // Объявление обычной очереди
        channel.queueDeclare(QUEUE_NAME,false,false,true,arguments);
        // Привязываем обычный обмен и обычную очередь
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
    }
}
  • Подготовка потребителя очереди недоставленных сообщений:

Здесь мы не предоставляем потребителя обычной очереди, потому что наша цель состоит в том, чтобы сообщения обычной очереди не использовались, а затем отправлялись в очередь недоставленных сообщений после истечения срока их действия.

@Slf4j
public class DeadConsumer implements Runnable{
    @Override
    public void run() {
        try {
            RabbitmqUtil rabbitmqUtil = new RabbitmqUtil("application.yml");
            Channel channel = rabbitmqUtil.createChannel();
            // Непосредственно потребляем сообщения - все операции объявления уже выполнены в части производителя
            channel.basicConsume(DEAD_QUEUE,false,(consumerTag, message) -> {
                log.info("Сообщение: {}",new String(message.getBody()));
                channel.basicAck(message.getEnvelope().getDeliveryTag(),true);
            },consumerTag -> {});
        } catch (IOException | TimeoutException e  ) {
            log.error("Произошла исключительная ситуация: ",e);
        }
    }
}
  • Тест:
        Thread deadConsumer = new Thread(new DeadConsumer());
        Thread publisher = new Thread(new Publisher());
        publisher.start();
       // Ждём некоторое время для объявления и создания обмена и очередей
        Thread.sleep(2000);
        deadConsumer.start();
``` **Текст запроса:**

После истечения срока действия сообщения:

*Рисунок 1.*

В недоставленных сообщениях будет содержаться информация заголовка x-death, которая указывает, как было создано недоставленное сообщение:

```java
log.info("Печать заголовка: {}",message.getProperties().getHeaders());

Вывод:

Печать заголовка: {
    x-first-death-exchange=dhy-exchange, 
    x-death=[{
        reason=expired, count=1, exchange=dhy-exchange, time=Sat May 21 21:38:45 CST 2022, 
        routing-keys=[dhy], queue=dhy-queue
             }], 
    x-first-death-reason=expired, 
    x-first-death-queue=dhy-queue
     }

О причине есть четыре значения:

— rejected: сообщение было отклонено с параметром requeue, установленным в значение false;

— expired: срок действия сообщения message TTL истёк;

— maxlen: превышена максимальная разрешённая длина очереди;

— delivery_limit: сообщение возвращалось больше раз, чем предел (установленный аргументом политики delivery-limit очередей кворума).


AMQP.BasicProperties

Здесь приводится описание:

String contentType, //тип содержимого сообщения
String contentEncoding, //формат кодирования содержимого сообщения
Map<String,Object> headers,//header типа обмена можно использовать
Integer deliveryMode,//сообщение сохраняется 1 не сохраняется 2 сохраняется
Integer priority,//приоритет
String correlationId, //связанный идентификатор
String replyTo,//обычно используется для именования очереди обратного вызова
String expiration,//установить время истечения недоставленного сообщения
String messageId, //идентификатор сообщения
Date timestamp, //отметка времени сообщения
String type,  //тип
String userId, //ID пользователя
String appId, //ID приложения
String clusterId //ID кластера

Ограничение максимального количества сообщений в очереди

Ограничение длины очереди — RabbitMQ

Мы можем использовать командную строку или кодирование для установки ограничения на максимальное количество сообщений в очереди. Здесь я говорю только о кодировании, вы можете обратиться к официальному документу для получения подробной информации о командной строке.

Поведение по умолчанию при переполнении очереди сообщениями:

По умолчанию очередь не имеет ограничения на количество сообщений, но если мы установим его, то как только количество сообщений в очереди превысит лимит, сообщения в начале очереди будут отброшены или станут недоставленными (выбор зависит от того, существует ли недоставленный обменник).

Причина отбрасывания сообщений в начале очереди заключается в том, что сообщения в начале являются самыми старыми, а новые сообщения помещаются в конец очереди.

Во всех случаях используется количество сообщений в состоянии готовности; сообщения пользователей, которые не были подтверждены, не учитываются.

Мы можем настроить стратегию переполнения, здесь она не обсуждается, вы можете посмотреть ссылку на официальный документ, предоставленную выше.

  • Установить ограничение на сохранение 10 сообщений одновременно в текущей очереди:
Map<String, Object> args = new HashMap<String, Object>();
//max-length-bytes можно установить общее количество байтов сообщений, которое может сохранить текущая очередь--конкретно обратитесь к официальному документу
//overflow можно использовать для настройки стратегии переполнения--конкретно обратитесь к официальному документу
args.put("x-max-length", 10);
channel.queueDeclare("myqueue", false, false, false, args);

Очередь приоритетов

Поддержка очереди приоритетов — RabbitMQ

Очередь приоритетов упорядочивает сообщения в очереди в соответствии с приоритетом, поэтому она называется очередью приоритетов.

Любая очередь может использовать клиентские дополнительные параметры для преобразования в очередь приоритетов (нельзя использовать динамическую настройку стратегии, о которой будет сказано ниже). Официальная рекомендация — максимальный приоритет 255, рекомендуется диапазон 0–10.


Установка приоритета очереди

  • x-max-priority: параметр должен быть целым числом от 1 до 255 и указывать максимальный поддерживаемый приоритет очереди.
Channel ch = ...;
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
ch.queueDeclare("my-priority-queue", true, false, false, args);

Затем издатель может указать приоритет сообщения при публикации следующим образом:

            Channel channel = rabbitmqUtil.createChannel();
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
                   //указать текущий приоритет отправляемого сообщения
                    .priority(5)
                    .build();
            channel.basicPublish("",QUEUE_NAME,basicProperties,"hello".getBytes(StandardCharsets.UTF_8));

Примечание

  • Каждый приоритет в каждой очереди имеет некоторые затраты памяти и диска. Также существуют дополнительные затраты процессора, особенно при потреблении, поэтому лучше не устанавливать слишком большой диапазон приоритетов для очереди.
  • Сообщения без атрибута приоритета будут рассматриваться как имеющие приоритет 0. Сообщения с приоритетом выше максимального приоритета очереди будут рассматриваться как опубликованные с максимальным приоритетом.

Официальная рекомендация:

Если вам нужна очередь приоритетов, мы рекомендуем использовать диапазон от 1 до 10. В настоящее время использование большего количества приоритетов требует использования большего количества процессов Erlang для потребления дополнительных ресурсов ЦП. План выполнения также будет затронут.


Взаимодействие потребителя с очередью приоритетов

Обычно потребитель может получать большое количество сообщений до подтверждения любого сообщения, ограничиваясь только сетевым давлением.

Поэтому, если такой голодный потребитель подключится к пустой очереди, сообщения, опубликованные впоследствии в эту очередь, могут вообще не ждать в очереди какое-то время. В этом случае сообщения приоритетов не получат возможности сортировать их по приоритету.

В большинстве случаев вы хотите использовать метод basic.qos для ограничения количества сообщений, которые могут быть отправлены в любое время для передачи, чтобы позволить сообщениям быть отсортированными по приоритетам в ручном режиме подтверждения потребителя.


Сообщение с истекшим сроком действия и ограничение длины очереди

  • Сообщение может узнать, истекло ли оно, только когда оно достигнет начала очереди, поэтому обычная очередь отличается тем, что даже если установлен срок действия отдельного сообщения в очереди, сообщение с низким приоритетом может быть заблокировано сообщением с высоким приоритетом, которое ещё не истекло. Эти сообщения никогда не будут переданы, но они будут отображаться в статистике очереди.

  • Как обычно, очередь с ограничением длины будет отбрасывать сообщения с начала, чтобы принудительно применить ограничение. Это означает, что более высокие сообщения приоритета могут быть отброшены, чтобы освободить место для сообщений с более низким приоритетом.


Почему не поддерживается динамическая настройка стратегии

Самый удобный способ определить параметры для очереди — это использовать стратегию. Стратегия — это конфигурация TTL (Time To Live), ограничения длины очереди и другие рекомендуемые методы для необязательных параметров очередей.

Однако стратегия не может быть использована для настройки приоритета, поскольку стратегия динамична и может быть изменена после объявления очереди. Приоритет очереди после её объявления изменить невозможно, поэтому использование стратегии не является безопасным вариантом.

Потребительский приоритет

Потребительский приоритет позволяет гарантировать, что высокоприоритетные потребители будут получать сообщения, когда они активны, а сообщения будут отправляться низкоприоритетным потребителям, если высокоприоритетный потребитель заблокирован. Обычно подключённые к очереди активные потребители получают сообщения от этой очереди по принципу циклического перебора. Если существует несколько активных потребителей с одинаковым высоким приоритетом, то сообщения передаются им также по принципу циклического перебора.

Активные потребители

Активные потребители — это потребители, которые могут получать сообщения без ожидания. Если потребитель не может получить сообщение, он блокируется, так как его канал достиг максимального количества неподтверждённых сообщений после отправки basic.qos или просто из-за сетевой перегрузки.

Установка потребительского приоритета

Channel channel = ...;
Consumer consumer = ...;
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-priority", 10);
channel.basicConsume("my-queue", false, args, consumer);

Резервный обменник

mandatory

— TRUE: сообщение маршрутизируется в очередь, но происходит сбой, вызывается интерфейс возврата сообщения. — FALSE: сообщение маршрутизируется в очередь, но происходит сбой, сообщение перенаправляется на резервный обменник.

Если mandatory имеет значение false, сообщение будет перенаправлено на резервный обменник, который в данном случае является резервным обменником.

Зачем нужен резервный обменник?

С помощью параметра mandatory и обратного сообщения мы можем обнаружить и обработать ситуацию, когда сообщение не может быть доставлено. Однако иногда мы не знаем, как обрабатывать такие сообщения, и можем только записать журнал и вызвать оповещение, а затем вручную обработать их.

Обработка таких сообщений через журнал не является оптимальным решением, особенно когда сервис имеет несколько машин, ручное копирование журнала становится более сложным и подверженным ошибкам.

Установка параметра mandatory также усложняет производителя, добавляя логику обработки возвращённых сообщений.

Как решить эту проблему, не теряя сообщения и не усложняя производителя?

Ранее при настройке мёртвой очереди упоминалось, что можно настроить мёртвую очередь для хранения сообщений, которые не удалось обработать. Однако эти сообщения не имеют возможности попасть в очередь и, следовательно, не могут использовать мёртвую очередь.

В RabbitMQ есть механизм резервного обменника, который может эффективно решать эту проблему.

Что такое резервный обменник?

Резервный обменник можно рассматривать как «запасной вариант» для обмена в RabbitMQ. Когда мы объявляем соответствующий резервный обменник для определённого обмена, мы создаём запасной вариант для него. Когда обмен получает сообщение, которое не может быть маршрутизировано, оно перенаправляет его на резервный обменник. Резервный обменник отвечает за дальнейшую маршрутизацию и обработку сообщения, обычно используя тип обмена Fanout, чтобы отправить все сообщения в связанные очереди, после чего мы привязываем очередь к резервному обмену. Все сообщения, которые исходный обмен не смог маршрутизировать, попадают в эту очередь.

После того как резервный обменник получает сообщение, которое исходный обменник не смог маршрутизировать, мы можем привязать к нему резервную очередь и очередь оповещения, чтобы сохранить эти сообщения и оповестить об этом администратора.

Использование

Поскольку резервный обменник является «запасом» для определённого обмена, нам нужно указать имя резервного обменника при объявлении этого обмена.

Издатель

@Slf4j
public class Publisher implements Runnable {
    @Override
    public void run() {
        try {
            RabbitmqUtil rabbitmqUtil = new RabbitmqUtil("application.yml");
            Channel channel = rabbitmqUtil.createChannel();
            Map<String, Object> args = new HashMap<String, Object>();
            // Указываем имя резервного обменника my-ae через атрибут
            args.put("alternate-exchange", "my-ae");
            // Резервный обменник my-direct называется my-ae
            channel.exchangeDeclare("my-direct", "direct", false, false, args);
            // my-ae обычно является обменником Fanout
            channel.exchangeDeclare("my-ae", "fanout");
            // Привязываем очередь temp к резервному обменнику
            channel.queueDeclare("temp",false,false,true,null);
            channel.queueBind("temp","my-ae","temp");
            channel.addReturnListener(new RouteFailListener());
            // Устанавливаем manatory в true
            channel.basicPublish("my-direct", "no", true, null, "hello".getBytes(StandardCharsets.UTF_8));
        } catch (IOException | TimeoutException e) {
            log.error("Возникла исключительная ситуация: ", e);
        }
    }
}

Потребитель temp

@Slf4j
public class ConsumerOne implements Runnable{
    @Override
    public void run() {
        RabbitmqUtil rabbitmqUtil = null;
        try {
            rabbitmqUtil = new RabbitmqUtil("application.yml");
            Channel channel = rabbitmqUtil.createChannel();

            channel.basicConsume("temp",true,new DefaultConsumer(channel){
                @SneakyThrows
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    log.info("Сообщение: {}",new String(body));
                }
            });
        } catch (IOException | TimeoutException e) {
            log.error("Возникло исключение: ",e);
        }
    }
}

Тест

Сообщение, которое исходный обменник не может маршрутизировать, попадает в резервную очередь.

Рабочий процесс

Когда обменник сталкивается с сообщением, которое он не может маршрутизировать, он сначала пытается получить помощь от резервного обменника. Если резервный обменник существует, он перенаправляет сообщение на него. В противном случае он записывает предупреждение в журнал или вызывает интерфейс возврата сообщений (manatory имеет значение true).

Если резервный обменник также не может маршрутизировать сообщение, он ищет резервный обменник текущего резервного обменника.

Кому подчиняются резервный обменник и manatory?

Ответ уже дан в тесте: резервному обменнику. Использование любых промежуточных сообщений может привести к потере некоторых сообщений.

В случае с RabbitMQ это может произойти из-за того, что производитель или потребитель разорвали соединение с RabbitMQ, и они используют разные механизмы подтверждения; также это может быть связано с различными стратегиями пересылки между обменником и очередью; кроме того, обменник может не быть связан ни с одной очередью, а производитель не знает об этом или не предпринимает соответствующих мер. Кроме того, кластерная стратегия RabbitMQ сама по себе может привести к потере сообщений.

В таких случаях необходим эффективный механизм отслеживания и регистрации процесса доставки сообщений, чтобы помочь разработчикам и администраторам в определении проблем.

FireHose

Для отслеживания сообщений в RabbitMQ можно использовать функцию Firehose. Firehose регистрирует каждое отправленное или полученное сообщение, что облегчает отладку и устранение неполадок при использовании RabbitMQ.

Принцип работы Firehose заключается в том, что сообщения, отправляемые производителем в RabbitMQ или получаемые потребителем от RabbitMQ, отправляются на стандартный обменник с именем amq.rabbitmq.trace. Имя этого обменника — topic. Сообщения, отправленные на этот обменник, имеют routingKey publish.exchangename и deliver.queuename, где exchangename и queuename — это фактические имена обменника и очереди соответственно, соответствующие сообщениям, отправленным производителем в обменник, и сообщениям, полученным потребителем из очереди.

Примечание: включение трассировки влияет на производительность записи сообщений. После включения рекомендуется отключить её.

# Включить
rabbitmqctl trace_on [-p vhost] 
# Выключить
rabbitmqctl trace_off [-p vhost]

Firehose по умолчанию выключен и не является постоянным, он возвращается в состояние по умолчанию после перезапуска службы RabbitMQ. Включение Firehose может повлиять на производительность сервиса, поскольку это приводит к дополнительным сообщениям, маршрутизации и хранению.

После включения функции FireHose каждое отправленное сообщение будет регистрировать одно сообщение журнала в стандартном обменнике, которое затем отправляется в связанную очередь (по умолчанию у этого обменника нет связанной очереди).

rabbitmq_tracing плагин

Плагин rabbitmq_tracing работает аналогично Firehose, но предоставляет более удобный графический интерфейс для управления и использования.

Чтобы включить плагин:

rabbitmq-plugins enable rabbitmq_tracing

«Format» указывает формат журнала сообщений, который может быть Text или JSON. Text формат удобен для чтения человеком, а JSON формат удобен для анализа программой.

JSON формат полезной нагрузки (сообщения) по умолчанию кодируется с помощью Base64. Например, «trace test payload.» будет закодирован как «dHJhY2UgdGVzdCBwYXlsb2FkLg==».

«Max payload bytes» указывает максимальный размер каждого сообщения в байтах. Если установлено значение 10, то любое сообщение размером более 10 байт будет усечено при регистрации в файле трассировки. Например, текстовое сообщение «trace test payload.» будет усечено до «trace test».

Pattern используется для настройки соответствия шаблону, аналогично Firehose. Например, «#» соответствует всем входящим и исходящим сообщениям, то есть регистрирует все сообщения, отправленные или полученные клиентами. «publish.#» соответствует только входящим сообщениям. «deliver.#» соответствует только исходящим сообщениям.

Ленивая очередь

RabbitMQ представил концепцию ленивой очереди начиная с версии 3.6.0. Ленивая очередь стремится хранить сообщения на диске, а не в памяти, пока потребитель не получит их. Это особенно полезно в ситуациях, когда потребитель временно недоступен из-за различных причин, таких как отключение, сбой или обслуживание, что может привести к накоплению сообщений.

По умолчанию, когда производитель отправляет сообщение в RabbitMQ, оно хранится в памяти, чтобы обеспечить быструю доставку потребителю. Даже постоянные сообщения сохраняются в памяти в качестве резервной копии во время записи на диск. Когда RabbitMQ необходимо освободить память, сообщения из памяти переносятся на диск, что занимает значительное время и может блокировать операции очереди, препятствуя получению новых сообщений. Хотя разработчики RabbitMQ постоянно улучшают алгоритмы, результаты всё ещё далеки от идеальных, особенно при большом объёме сообщений.

Существует два режима очередей: default и lazy. По умолчанию используется режим default, и никаких изменений не требуется до версии 3.6.0. Режим lazy представляет собой ленивую очередь и может быть установлен при объявлении канала.queueDeclare или через Policy. Если очередь настроена с использованием обоих методов, Policy имеет более высокий приоритет. Для изменения режима существующей очереди необходимо сначала удалить её, а затем повторно объявить новую.

При объявлении очереди можно установить режим с помощью параметра «x-queue-mode», который принимает значения «default» и «lazy». Пример ниже демонстрирует детали объявления ленивой очереди:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode","lazy");
channel.queueDeclare( "myqueue", false, false, false,args);

Сравнение использования памяти показывает, что для отправки 1 миллиона сообщений, каждое размером примерно 1 КБ, обычная очередь использует 1,2 ГБ памяти, в то время как ленивая очередь потребляет всего 1,5 МБ.

Интеграция Springboot

Создание проекта Springboot и внедрение стартера RabbitMQ для интеграции Springboot.

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

Настройка информации о подключении.

  spring:
    rabbitmq:
      host: 服务器地址
      #通信端口
      port: 5672
      username: guest
      password: guest
      #如果没有修改的话,默认为/,可以不写
      virtual-host: /

Добавление очереди.

  @Configuration
  public class RabbitConfig {
     public static final String QUEUE_NAME="order_queue";
  
      /**
       * 参数介绍:
       *   1.队列名
       *   2.队列持久化
       *   3.队列是否具有排他性,有排他性的队列只能被创建的Connection处理
       *   4.如果队列没有消费теля,那么是否自动删除队列
       */
     @Bean
     public Queue queue(){
``` **Добавление потребителя, слушающего очередь**

```java
@Component
@Slf4j
public class RabbitConsumerListener {
    @RabbitListener(queues = RabbitConfig.QUEUE_NAME)
    public void handlerMsg(String msg){
      log.info("Получено сообщение из очереди: {}",msg);  
    }
}

Запуск проекта

Клиент устанавливает соединение с каналом и RabbitMQ.

Тестирование помещения сообщения в очередь.

Просмотр вывода потребителя.


Отложенная очередь

Отложенная очередь — это упорядоченная очередь, наиболее важная характеристика которой заключается в её свойстве задержки. Элементы в отложенной очереди должны быть обработаны в определённое время после или до указанного срока. Проще говоря, отложенная очередь используется для хранения элементов, которые должны быть обработаны в определённый момент времени.


Применение

  1. Если заказ не оплачен в течение десяти минут, он автоматически отменяется.
  2. Если новый магазин не загрузил товары в течение десяти дней, отправляется автоматическое напоминание.
  3. Если пользователь успешно зарегистрировался, но не вошёл в систему в течение трёх дней, ему отправляется SMS-уведомление.
  4. Если пользователь инициирует возврат средств, и они не обрабатываются в течение трёх дней, соответствующий операционный персонал уведомляется об этом.
  5. После бронирования встречи необходимо уведомить всех участников за десять минут до назначенного времени встречи.

Все эти сценарии имеют общую черту: после или перед определённым событием должна быть выполнена задача в установленный срок.


Практика

Создаются две очереди QA и QB с TTL (временем жизни) 10 секунд и 40 секунд соответственно. Затем создаются обмен X и обмен недоставленных сообщений Y, оба типа direct. Создаётся очередь недоставленных сообщений QD, которая связана с ними следующим образом:

Константы:

public class RabbitmqConstants {

    //--------------------EXCHANGE--------------------------

    public static final String X_EXCHANGE = "X";
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";

    //--------------------QUEUE--------------------------

    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    public static final String DEAD_LATTER_QUEUE = "QD";

    //------------------ROUTE_KEY-----------------------------

    public static final String ROUTE_KEY_A="XA";
    public static final String ROUTE_KEY_B="XB";
    public static final String ROUTE_KEY_DEAD="YD";
}

Обмен и подготовка очередей:

@Configuration
public class RabbitConfig {

   //--------------------EXCHANGE--------------------------

   @Bean("xExchange")
   public DirectExchange xExchange()
   {
      return new DirectExchange(X_EXCHANGE);
   }

   @Bean("yExchange")
   public DirectExchange yExchange()
   {
      return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
   }

   //--------------------QUEUE--------------------------

   @Bean("queueA")
   public Queue queueA(){
      Map<String, Object> arguments = new HashMap<>(3);
      //устанавливаем обмен недоставленными сообщениями
      arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
      //устанавливаем ключ маршрутизации для недоставленных сообщений
      arguments.put("x-dead-letter-routing-key",ROUTE_KEY_DEAD);
      //устанавливаем TTL в миллисекундах ----сообщения в этой очереди истекают через 10 с
      arguments.put("x-message-ttl",10000);
      return QueueBuilder.nonDurable(QUEUE_A).withArguments(arguments).build();
   }


   @Bean("queueB")
   public Queue queueB(){
      Map<String, Object> arguments = new HashMap<>(3);
      //устанавливаем обмен недоставленными сообщениями
      arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
      //устанавливаем ключ маршрутизации для недоставленных сообщений
      arguments.put("x-dead-letter-routing-key",ROUTE_KEY_DEAD);
      //устанавливаем TTL в миллисекундах ----сообщения в этой очереди истекают через 40 с
      arguments.put("x-message-ttl",40000);
      return QueueBuilder.nonDurable(QUEUE_B).withArguments(arguments).build();
   }


   @Bean("queueD")
   public Queue queueD(){
      return QueueBuilder.nonDurable(DEAD_LATTER_QUEUE).build();
   }

   //--------------------bind--------------------------

   @Bean
   public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                 @Qualifier("xExchange") DirectExchange xExchange){
      //связываем очередь A с обменом X
      return BindingBuilder.bind(queueA).to(xExchange).with(ROUTE_KEY_A);
   }


   @Bean
   public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
                                 @Qualifier("xExchange") DirectExchange xExchange){
      //связываем очередь B с обменом x
      return BindingBuilder.bind(queueB).to(xExchange).with(ROUTE_KEY_B);
   }


   @Bean
   public Binding queueDBindingX(@Qualifier("queueD") Queue queueD,

``` **Запрос 1:**

@Qualifier("yExchange") DirectExchange yExchange){ //死信队列D和死信交换机绑定 return BindingBuilder.bind(queueD).to(yExchange).with(ROUTE_KEY_DEAD); } }


**Перевод:**

@Квалификатор («yExchange») DirectExchange yExchange) {
    // Привязка очереди мёртвых писем D и обменника мёртвых писем
    return BindingBuilder.привязать (очередьD) .к (yExchange) .с (ROUTE_KEY_DEAD);
}

**Запрос 2:**

- 消费者准备

```java
@Slf4j
@Component
public class DeadLetterQueueConsumer {

    @RabbitListener(queues = DEAD_LATTER_QUEUE)
    public void receiveD(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列的消息:{}", new Date().toString(), msg);
    }
}

Перевод:

— Потребитель готов

Java

@Slf4j @Компонент общественный класс DeadLetterQueueConsumer {

@RabbitListener (очереди = DEAD_LATTER_QUEUE)
общественная пустота receiveD (сообщение сообщения, канал канала) выбрасывает исключение {
    Строка msg = новая строка (message.getbody ());
    журнал.информация («текущее время: {}, получено сообщение из очереди мертвых писем: {}», новая дата (). toString (), msg);
}

}

Запрос 3:

  • 生产者准备
@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping("/ttl")
public class RabbitmqController {
    private final RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);
        rabbitTemplate.convertAndSend(X_EXCHANGE,ROUTE_KEY_A,"消息来自TTL为10s的队列:" + message);
        rabbitTemplate.convertAndSend(X_EXCHANGE,ROUTE_KEY_B,"消息来自TTL为40s的队列:" + message);
    }
}

Перевод:

— Производитель готов

Java

@Slf4j @Рестконтроллер @Requiredargsconstructor @RequestMapping ("/ ttl") публичный класс Rabbitmqcontroller { частный окончательный Rabbittemplate RabbitTemplate;

@Getmapping ("/ sendmsg / {сообщение}")
публичная пустота sendmsg (@pathvariable строка сообщения) {
    журнал.информация ("текущее время: {}, отправка информации в две очереди ttl: {}", новая дата (). toString (), сообщение);
    RabbitTemplate.ConvertAndSend (x_exchange, route_key_a, "сообщение из очереди ttl 10 s: " + сообщение);
    RabbitTemplate.ConvertAndSend (x_exchange, route_key_b, "сообщение из очереди ttl 40 s: " + сообщение);
}

}

Запрос 4:

  • Тестирование

Первая строка сообщений становится сообщением о недоставке через 10 секунд, а затем потребляется потребителем. Вторая строка сообщений становится сообщением о недоставке через 40 секунд, а затем потребляется. Таким образом, очередь с задержкой была создана.

Однако, если вы используете этот метод, вам придётся добавлять новую очередь каждый раз, когда вам нужно увеличить время задержки. Здесь есть только варианты времени 10 и 40 секунд. Если вам нужна задержка в один час, вам потребуется добавить очередь с задержкой в один час. Если это для уведомления о встрече, вам понадобится бесконечное количество очередей.


Оптимизация

В предыдущем примере мы установили TTL для каждой очереди сообщений. На самом деле, чтобы упростить настройку, мы можем установить время истечения срока действия для каждого сообщения.

Здесь мы добавляем новую очередь и устанавливаем время истечения срока действия каждого сообщения при отправке сообщения.

  • Добавление констант
  public static final String QUEUE_C = "QC";
  public static final String ROUTE_KEY_C = "XC";
  • Добавление конфигурации
   @Bean("queueC")
   public Queue queueC(){
      Map<String, Object> arguments = new HashMap<>(3);
      //设置死信交换机
      arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
      //设置死信Routing-key
      arguments.put("x-dead-letter-routing-key",ROUTE_KEY_DEAD);
      return QueueBuilder.nonDurable(QUEUE_C).withArguments(arguments).build();
   }

   @Bean
   public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){
      return BindingBuilder.bind(queueC).to(xExchange).with(ROUTE_KEY_C);
   }
  • Увеличение отдельного запроса с настраиваемой задержкой
    @GetMapping("sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
        log.info("текущее время: {}, отправка сообщения с задержкой {} миллисекунд в очередь QC: {}",
                TimeUtil.getCurTime(),ttlTime,message);
        RabbitTemplate.ConvertAndSend (x_exchange, route_key_c, сообщение, msg -> {
            msg.getMessageProperties().setExpiration(ttlTime);
            вернуть сообщение;
        });
    }
  • Проверка

Кажется, что проблем нет, но в начале мы упомянули, что если вы установите время TTL в свойствах сообщения, сообщение может не истечь вовремя, потому что RabbitMQ проверяет только, истекло ли время первого сообщения. Если первое сообщение имеет большую задержку, второе сообщение с меньшей задержкой не будет обработано первым.


Реализация плагина для очереди с отложенной обработкой

Как упоминалось ранее, это действительно проблема. Если вы не можете реализовать TTL на уровне сообщений и гарантировать, что сообщения истекают вовремя, вы не сможете создать универсальную очередь с отложенной обработкой. Как решить эту проблему? Давайте решим эту проблему ниже.

Загрузите плагин rabbitmq_delayed_message_exchange с официального сайта и распакуйте его в каталог плагинов RabbitMQ.

  • Загрузите плагин на сервер Linux и разархивируйте его.

Wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.10.0/rabbitmq_delayed_message_exchange-3.10.0.ez Если загрузка идёт медленно, попробуйте использовать следующий зеркальный адрес для загрузки. Wget http://110.40.155.17/download/rabbitmq_delayed_message_exchange-3.10.0.ez Распаковать Tar -zxvf 3.10.0.tar.gz

  • Скопируйте плагин в каталог plugins контейнера Docker.

Docker cp rabbitmq_delayed_message_exchange-3.10.0.ez cf:/plugins

  • Запустите следующую команду, чтобы активировать плагин, а затем перезапустите RabbitMQ.

Docker exec -it cf rabbitmq-plugins enable rabbitmq_delayed_message_exchange

  • Перезагрузите службу RabbitMQ.

Docker restart cf Использование отложенного обмена

  • До использования отложенного обмена:

Здесь и далее перевод изображения не приводится, так как он не несёт смысловой нагрузки.

  • После использования отложенного обмена:

Если использовать отложенный обмен, официальный документ содержит ссылку:

rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ (github.com)

Здесь сначала будет представлен простой пример использования отложенного обмена через нативный API, а затем будет показано, как это реализовать с помощью Springboot.

Что делает отложенный обмен: он берёт сообщения, для которых указано время задержки отправки, и вместо того чтобы сразу направить их в определённую очередь, сохраняет их, а после истечения времени задержки отправляет их в указанную очередь.

  • Чтобы использовать отложенный обмен, сначала объявите тип обмена как «x-delayed-message»:
Map<String, Object> args = new HashMap<String, Object>();
// Объявляем конкретный тип отложенного обмена — сам по себе отложенный обмен только реализует механизм задержки сообщений,
// а какой именно тип обмена используется, нужно выбрать самостоятельно
args.put("x-delayed-type", "direct");
// x-delayed-message — это тип отложенного обмена
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
  • При отправке сообщения укажите в заголовке, на какое время его нужно задержать, и отправьте сообщение:
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
// Добавляем в заголовок поле x-delay, чтобы указать, что сообщение должно быть отправлено через 5 секунд
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

Обратите внимание: если при отправке сообщения в заголовках нет поля x-delay, то обмен получит сообщение и сразу отправит его.

Через поле x-delayed-type можно указать режим маршрутизации при объявлении отложенного обмена, и это поле обязательно должно указывать на существующий тип обмена.

Отложенный обмен можно рассматривать как прокси-обмен, который предоставляет дополнительную поддержку для задержки сообщений и оборачивает существующие обмены, но из-за добавления этой функции может возникнуть снижение производительности.

Поэтому, если вам не нужна функция задержки сообщений, рекомендуется не использовать отложенный обмен.

  • Этот обмен сохраняет сообщения на диск, поэтому его использование может привести к снижению производительности.
  • Отложенный обмен пытается отправить сообщение только один раз, и могут быть причины, по которым отправка не удастся.
  • Отложенный обмен не поддерживает атрибут mandatory.

Практический пример:

Здесь мы добавили очередь delayed.queue и пользовательский обмен delayed.exchange со следующей связью:

Здесь перевод изображения также не приводится.

  • Подготовка констант:
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_ROUTING_KEY = "DELAY";
  • Создание очереди и обмена:
   @Bean("delayedQueue")
   public Queue delayedQueue(){
      return new Queue(DELAYED_QUEUE_NAME);
   };

   @Bean
   public CustomExchange delayedExchange(){

      Map<String, Object> arguments = new HashMap<>();
      // Тип отложенного обмена
      arguments.put("x-delayed-type","direct");
      /*
       * 1. Имя обмена
       * 2. Тип обмена
       * 3. Нужно ли сохранять
       * 4. Нужно ли автоматически удалять
       * 5. Другие параметры
       * */
      return new CustomExchange(DELAYED_EXCHANGE_NAME,
                                // Тип отложенного обмена
                                "x-delayed-message",
              false,false,arguments);
   }
   
   @Bean
   public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,
                                                     @Qualifier("delayedExchange") CustomExchange delayedExchange){
      return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
   }
  • Потребитель:
@Slf4j
@Component
public class DelayQueueConsumer {
    @RabbitListener(queues = DELAYED_QUEUE_NAME)
    public void recieveDelayQueue(Message message) {
        String msg = new String(message.getBody());
        log.info("Текущее время: {}, получено сообщение из отложенной очереди: {}", TimeUtil.getCurTime(), msg);
    }
}
  • Производитель:
    @GetMapping("/delay/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime){
        log.info("Текущее время: {}, отправляем сообщение длиной {} миллисекунд в отложенную очередь delayed.queue: {}",
                TimeUtil.getCurTime(),delayTime,message);
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME
                ,DELAYED_ROUTING_KEY,message,msg -> {
                    // Задержка отправки сообщения указывается в миллисекундах
                    msg.getMessageProperties().setDelay(delayTime);
                    return msg;
                });
    }
  • Тестирование. Можно увидеть, что не будет блокироваться приём коротких сообщений с задержкой из-за того, что ранее было отправлено сообщение с более длительной задержкой.

Режим подтверждения публикации

Здесь демонстрируется элегантное использование springboot для реализации режима подтверждения публикации.

Подготовка констант

public class RabbitmqConstants {
    //--------------------EXCHANGE--------------------------
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    //--------------------QUEUE--------------------------
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
    //------------------ROUTE_KEY-----------------------------
    public static final String CONFIRM_ROUTING_KEY = "key1";
}

Объявление обмена и очереди

@Configuration
public class RabbitConfig {
    //--------------------EXCHANGE--------------------------
    @Bean
    public DirectExchange confirmExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }
    //--------------------QUEUE--------------------------
    @Bean
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }
    //--------------------bind--------------------------
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange")DirectExchange confirmExchange) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }
}

Потребитель (второстепенный, подтверждение или отказ и потребитель не связаны, неясно, посмотрите выше режим подтверждения публикации)

@Slf4j
@Component
public class Consumer {
    @RabbitListener(queues = CONFIRM_QUEUE_NAME)
    public void receiveConfirmMessage(Message message) {
        String msg = new String(message.getBody());
        log.info("Получено сообщение в очереди confirm.queue: {}", msg);
    }
}

Производитель

@Slf4j
@RequiredArgsConstructor
@RequestMapping("/msg")
public class RabbitmqController {
    private final RabbitTemplate rabbitTemplate;
    @PostMapping("/{message}")
    public void sendMessage(@PathVariable String message){
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, CONFIRM_ROUTING_KEY, message, new CorrelationData("1"));
        log.info("Отправлено сообщение: {}", message);
    }
}

Подготовка обратного вызова

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        //将当前实现类,注入到rabbitTemplate中的接口上
        //不然到时候rabbitTemplate调用接口时,找不到我们的实现类
        rabbitTemplate.setConfirmCallback(this);
    }

    /*
     * 交换机确认回调方法,发消息后,交换机接收到了就回调
     *   1.1 correlationData:保存回调消息的ID及相关信息
     *   1.2 b:交换机收到消息,为ack=true
     *   1.3 s:失败原因,成功为null
     *
     * 发消息,交换机接受失败,也回调
     *   2.1 correlationData:保存回调消息的ID及相关信息
     *   2.2 b:交换机没收到消息,为ack=false
     *   2.3 s:失败的原因
     *
     */

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData!=null ? correlationData.getId():"";
        if (b){
            log.info("Обмен уже получил информацию с ID: {}", id);
        }else {
            log.info("Обмен ещё не получил сообщение с ID: {}, причина: {}", id, s);
        }
    }
}

Включение режима подтверждения публикации

spring.rabbitmq.publisher-confirm-type=correlated

NONE отключает режим подтверждения публикации, это значение по умолчанию. ⚫ CORRELATED вызывает обратный вызов после успешной отправки сообщения на обменник. ⚫ SIMPLE подтверждает одну отправку сообщения одной отправкой сообщения.

Обратите внимание на режим SIMPLE: при использовании метода waitForConfirms или waitForConfirmsOrDie в rabbitTemplate для ожидания ответа от узла брокера необходимо учитывать, что метод waitForConfirmsOrDie может закрыть канал, если он возвращает false, и последующие сообщения не могут быть отправлены брокеру.

Тестирование

Интерфейс возврата сообщений

Выше было сказано, здесь рассказывается о том, как использовать springboot.

  • mandatory

    • TRUE: сообщение не направляется в очередь, вызывается интерфейс возврата сообщений.
    • FALSE: сообщение не направляется в очередь, предпринимается попытка переслать сообщение на резервный обменник.
  • В конфигурационном файле включить настройку mandatory со значением true.

spring.rabbitmq.publisher-returns=true
  • Установить интерфейс возврата сообщений
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

...
} ```
@PostConstruct
public void init() {
    // 将当前实现类,注入到rabbitTemplate中的接口上
    // 否则到时候rabbitTemplate调用接口时,找不到我们的实现类
    rabbitTemplate.setConfirmCallback(this);
    rabbitTemplate.setReturnsCallback(this);
}

/*
 * 交换机确认回调方法,发消息后,交换机接收到了就回调
 *   1.1 correlationData:保存回调消息的ID及相关信息
 *   1.2 b:交换机收到消息,为ack=true
 *   1.3 s:失败原因,成功为null
 *
 * 发消息,交换机接受失败,也回调
 *   2.1 correlationData:保存回调消息的ID及相关信息
 *   2.2 b:交换机没收到消息,为ack=false
 *   2.3 s:失败的原因
 *
 */

@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
    String id = correlationData != null ? correlationData.getId() : "";
    if (b) {
        log.info("交换机已经收到ID为:{}的信息", id);
    } else {
        log.info(
            "交换机还未收到ID为:{}的消息,由于原因:{}",
            id,
            s
        );
    }
}

@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
    log.error(
        "消息{},被交换机{}退回,退回的原因:{},路由Key:{}",
        new String(returnedMessage.getMessage().getBody()),
        returnedMessage.getExchange(),
        returnedMessage.getReplyText(),
        returnedMessage.getRoutingKey()
    );
}
  • Тест
@PostMapping("/msg/{message}")
public void sendMessage(@PathVariable String message) {
    rabbitTemplate
        .convertAndSend(CONFIRM_EXCHANGE_NAME, CONFIRM_ROUTING_KEY, message + "key1", new CorrelationData("1"));
    log.info("发送消息内容:{}", message + "key1");

    CorrelationData correlationData2 = new CorrelationData("2");
    rabbitTemplate
        .convertAndSend(
            CONFIRM_EXCHANGE_NAME,
            CONFIRM_ROUTING_KEY + "2",
            message + "key12",
            correlationData2
        );
    log.info("发送消息内容:{}", message + "key12");
}

Резервный обменник

Резервный обменник — это расширение протокола AMPQ в RabbitMQ, которое называется Alternate Exchanges в RabbitMQ.

Код для объявления резервного обменника:

@Configuration
public class RabbitConfig {

    // Обменник
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    // Очередь
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
    // RoutingKey
    public static final String CONFIRM_ROUTING_KEY = "key1";
    // Резервный обменник
    public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
    // Резервная очередь
    public static final String BACKUP_QUEUE_NAME = "backup_queue";
    // Предупреждающая очередь
    public static final String WARNING_QUEUE_NAME = "warning_queue";

    // Объявление обменника
    @Bean
    public DirectExchange confirmExchange() {
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
                // Указание резервного обменника
                .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME)
                .build();
    }

    // Объявление очереди
    @Bean
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    // Резервный обменник - обычно типа разветвления
    @Bean
    public FanoutExchange backupExchange() {
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    // Резервная очередь
    @Bean
    public Queue backupQueue() {
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    // Предупреждающая очередь
    @Bean
    public Queue warningQueue() {
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }

    // Привязка
    @Bean
    public Binding queueBindingExchange(
            @Qualifier("confirmQueue") Queue confirmQueue,
            @Qualifier("confirmExchange") DirectExchange confirmExchange
    ) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }

    @Bean
    public Binding backupQueueBindingBackupExchange(
            @Qualifier("backupQueue") Queue backupQueue,
            @Qualifier("backupExchange") FanoutExchange backupExchange
    ) {
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    }
}
``` **Перевод текста на русский язык:**

**Предупреждение очереди резервного обмена (@Qualifier("warningQueue") Очередь backupQueue, @Qualifier("backupExchange") FanoutExchange backupExchange)**

Возвращает BindingBuilder.bind(backupQueue).to(backupExchange);

}

**Другие процессы и обычные очереди, обменники используются без различий.**

****

## Производство. Внимание

### Компенсация сообщений

*Основная идея: записи в базе данных о подтверждённых сообщениях пользователя, затем сопоставление отправленных производителем сообщений с сообщениями, подтверждёнными потребителем, позволяет выявить потерянные или неподтверждённые сообщения, после чего потребитель уведомляется о необходимости повторной отправки.*

В этом процессе также используется механизм отложенной отправки сообщений и служба сравнения по расписанию, что обеспечивает двойную гарантию компенсации сообщений.

****

### Как сохранить уникальность сообщений

Уникальность означает, что результаты одного запроса или нескольких запросов пользователя для одной и той же операции совпадают и не вызывают побочных эффектов из-за многократного нажатия.

Простейший пример  это оплата. После покупки товара пользователь оплачивает его, платёж успешно списывается, но при возврате результата возникает проблема с сетью, и деньги уже списаны. Пользователь снова нажимает кнопку, и происходит второе списание, результат возвращается успешно, пользователь проверяет баланс и обнаруживает, что было списано больше денег, а записи в журнале транзакций удвоились.

В старых системах с одним приложением достаточно было поместить данные в транзакцию, чтобы при возникновении ошибки немедленно откатить их, но во время ответа клиенту всё ещё существовала вероятность сбоя сети или других проблем.

Существует несколько методов обеспечения уникальности сообщений:

****

#### Глобальный уникальный идентификатор

Этот метод имеет недостаток, заключающийся в том, что каждый запрос к базе данных требует значительных затрат производительности, поэтому его не рекомендуется использовать.

****

#### Redis setnx

Причина использования setnx заключается в его уникальности, то есть если ключ уже существует, установка завершится неудачно.

****

#### Оптимистическая блокировка

Оптимистическая блокировка  это стратегия управления параллелизмом, которая предполагает, что конфликты между одновременными операциями происходят редко, и пытается разрешить их автоматически, когда они возникают.

****

## Краткое изложение основных процессов коммуникации Rabbitmq

Есть много дополнительных параметров для управления процессом связи, которые здесь не перечислены. Вы можете обратиться к предоставленному процессу и продолжить добавлять и улучшать его.

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

Различные подходы к реализации отложенных задач. Развернуть Свернуть
Отмена

Обновления

Пока нет обновлений

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://api.gitlife.ru/oschina-mirror/DeleyTask.git
git@api.gitlife.ru:oschina-mirror/DeleyTask.git
oschina-mirror
DeleyTask
DeleyTask
master