Задержки сообщений RabbitMQ
Существует два способа реализации задержки сообщений в RabbitMQ.
При использовании очереди с TTL сообщения, которые не были использованы, будут повторно маршрутизироваться в очередь потребления. Время истечения срока действия всех сообщений в одной очереди будет одинаковым, даже если некоторые из них имеют собственный TTL. Если сообщение в начале очереди не истекло по сроку, то оно не может быть использовано.
Основные настройки:
spring:
cloud:
stream:
bindings:
input:
destination: delay_message_queue_input
group: test-service
output:
destination: delay_message_queue_output
producer:
required-groups: test-service
rabbit:
bindings:
input:
consumer:
exchangeType: direct
output:
producer:
ttl: 3000
autoBindDlq: true
deadLetterExchange: delay_message_queue_input
deadLetterQueueName: delay_message_queue_input.test-service
Определяются две очереди: delay_message_queue_output
и delay_message_queue_input
. Очередь delay_message_queue_output
объявляется как очередь с задержкой, и для неё устанавливается TTL в 3000 миллисекунд. DeadLetterExchange и deadLetterQueueName указывают на обмен и очередь, куда будут перенаправляться сообщения после истечения срока их действия.
Этот метод позволяет более гибко настраивать задержку для каждого сообщения. Сообщения, срок действия которых истёк, будут направляться в связанную очередь для использования.
Основные настройки:
spring:
cloud:
stream:
bindings:
input:
destination: delay_message_exchange
group: test-service
output:
destination: delay_message_exchange
rabbit:
bindings:
input:
consumer:
delayed-exchange: true
output:
producer:
delayed-exchange: true
Здесь также определяются две очереди, которые объявляются как очереди с задержкой обмена. Параметр delayed-exchange требует поддержки плагина задержки RabbitMQ. При отправке сообщения используется параметр x-delay для указания времени истечения срока его действия.
Пример кода отправки сообщения с задержкой:
public void sendDelayExchangeMessage(String message) {
log.info("send message {}", message);
processor.output().send(MessageBuilder.withPayload(message).setHeader("x-delay",20000).build());
}
Поддержите проект через WeChat Pay.
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )