Задачи со временем: Производитель отправляет сообщение в очередь сообщений RocketMQ, но не хочет немедленно передать это сообщение. Вместо этого он хочет, чтобы сообщение было доставлено потребителю через некоторое время после текущего момента. Такое сообщение называется сообщением с задачей времени.
Задержка сообщений: Производитель отправляет сообщение в очередь сообщений RocketMQ, но не хочет немедленно передать это сообщение. Вместо этого он хочет, чтобы сообщение было доставлено потребителю через определённое время. Такое сообщение называется сообщением с задержкой.
Сообщения с задачами времени и задержками имеют некоторые различия в конфигурации кода, но в конечном итоге достигается одинаковый эффект: сообщение не передаётся сразу после отправки в очередь сообщений RocketMQ, а передаётся позже, согласно свойствам сообщения, с задержкой определенного времени.## Реализация (четыре способа реализации)
Задерживаемые сообщения и сообщения с временными метками подходят для следующих сценариев:
Сообщение отправляется и получается в определенное время, например, при электронной коммерции, если заказ не оплачен в течение времени, он автоматически отменяется.
При создании заказа отправляется задерживающееся сообщение.
Это сообщение будет доставлено потребителю через 30 минут.
Получатель этого сообщения должен проверить, был ли заказ полностью оплачен.
Если заказ еще не оплачен, он будет отменен. Если же заказ уже оплачен, сообщение будет проигнорировано.
Использование сообщений для запуска задач на расписание, например, отправка напоминаний пользователям в определенное время.
Существует небольшая разница в использовании задерживаемых сообщений и сообщений с временными метками:
Для отправки сообщения с временной меткой требуется указать конкретное время после которого сообщение должно быть доставлено.
Для отправки задерживающегося сообщения следует установить задержку времени, сообщение будет доставлено после указанного периода времени.
Точность сообщений с временной меткой может иметь погрешность до 1–2 секунд.Параметр msg.setStartDeliverTime
для сообщений с временной меткой и задерживающихся сообщений должен быть установлен на значение текущего момента времени плюс некоторое количество миллисекунд.Если параметр установлен на момент времени раньше текущего, сообщение будет немедленно доставлено потребителю.
Параметр msg.setStartDeliverTime
можно установить на любое время в пределах 40 дней (в миллисекундах). Превышение этого срока приведёт к ошибке отправки сообщения.
StartDeliverTime
— это время начала передачи сообщения сервером конечному потребителю. Если у потребителя есть накопленные сообщения, то задерживающиеся и сообщения с временной меткой будут помещены в конце списка этих сообщений и могут не быть переданы строго в соответствии со своим временем.
По причине возможного различия во времени между клиентом и сервером, фактическое время передачи сообщения может отличаться от времени, указанного клиентом.
Рекомендуется использовать версию RocketMQ, предоставленную Alibaba Cloud
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.4.Final</version>
</dependency>
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.shade.org.apache.commons.lang3.time.DateFormatUtils;
import java.util.Date;
import java.util.Properties;
``````java
public class ProducerDelayTest {
public static void main(String[] args) {
Properties properties = new Properties();
// AccessKey ID - unique authentication identifier in the RAM service on the Alibaba Cloud platform.
properties.put(PropertyKeyConst.AccessKey, "XXX");
// AccessKey Secret - secret authentication key in the RAM service on the Alibaba Cloud platform.
properties.put(PropertyKeyConst.SecretKey, "XXX");
// Set the domain of the TCP connection, which can be viewed on the endpoint details page in the RocketMQ console.
properties.put(PropertyKeyConst.NAMESRV_ADDR, "localhost:9876");
}
}
``` Producer producer = ONSFactory.createProducer(properties);
// Перед отправкой сообщений необходимо вызвать метод start для запуска производителя, этот метод следует вызывать один раз.
producer.start();
{
Message msg = new Message(
// Тема, созданная Вами в консоли управления RocketMQ.
"TopicTest",
// Определяет метку сообщения, которая может пониматься как аналог тэгов в Gmail, позволяющий группировать сообщения.
"TagA",
// Тело сообщения может содержать любые двоичные данные, сервис RocketMQ не ограничивает это.
"Демонстрация 15 секунд >>>".getBytes());
// Устанавливает бизнес-ключевой атрибут сообщения, который должен быть уникален глобально.
// Это полезно при отсутствии возможности получения сообщений, так как позволяет найти и восстановить сообщение через консоль управления.
// Обратите внимание: если не установлен, это не повлияет на нормальное получение и отправление сообщений.
msg.setKey("ORDERID_100e");
try {
// Отложенные сообщения, единицы измерения - миллисекунды (ms), сообщение будет доставлено после указанного времени (текущее время плюс задержка).
long delayTime = System.currentTimeMillis() + 15000;
System.out.println("Время отправки >>" + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"));```java
// Устанавливает время начала доставки сообщения.
msg.setStartDeliverTime(delayTime);
SendResult sendResult = producer.send(msg);
// Синхронная отправка сообщения, если исключения не выброшены, значит отправка успешна.
if (sendResult != null) {
System.out.println(DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss") + " Отправка сообщения в очередь успешно завершена. Тема: " + msg.getTopic() + " ID сообщения: " + sendResult.getMessageId());
}
} catch (Exception e) {
// При неудачной отправке сообщения требуется выполнить процедуру повторной отправки или записать данные для компенсационной обработки.
System.out.println(new Date() + " Отправка сообщения в очередь не удалась. Тема: " + msg.getTopic());
e.printStackTrace();
}
}
// Перед выходом приложения следует уничтожить объект Producer.
// Внимание: даже если объект не будет уничтожен, это не вызовет проблем.
producer.shutdown();
import com.aliyun.openservices.shade.org.apache.commons.lang3.time.DateFormatUtils; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt;
import java.util.Date; import java.util.List;
/**
Этот пример демонстрирует, как подписаться и получать сообщения с помощью предоставленного {@link DefaultMQPushConsumer}. */ public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
/*
* Создание экземпляра с указанным уникальным именем группы потребителей.
*/
final int[] totals = {0};
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("localhost:9876");
/*
* Указание адресов серверов имен.
* <p/>
*
* В качестве альтернативы вы можете указать адреса серверов имен через экспорт переменной окружения: NAMESRV_ADDR
* <pre>
* {@code
* consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/
/*
* Указание точки отсчета в случае, если указанная группа потребителей является новой.
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/*
* Подписка на одну или несколько тем для получения сообщений.
*/
consumer.subscribe("TopicTest", "*");
}
### Как отправить отложенные сообщения с использованием общественной версии RocketMQ
```java
/* Установите тему, созданную вами в консоли управления RocketMQ. */
Message msg = new Message("ВАШ ТЕМП",
/* Установите метку сообщения. */
"ВАША МЕТА-МАРКЕР",
/* Содержание сообщения. */
"Привет мир".getBytes(RemotingHelper.DEFAULT_CHARSET));
/* Для отправки отложенного сообщения требуется установить время отложенного сообщения, в миллисекундах (ms). Сообщение будет доставлено после указанного времени отложенного сообщения, например, сообщение будет доставлено через 3 секунды. */
long delayTime = System.currentTimeMillis() + 3000;
msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));
```
```/**
* Если вам нужно отправить сообщение с временем начала, вы должны установить это время. Сообщение будет доставлено в указанное время, например, сообщение будет доставлено в 2021-08-10 18:45:00.
* Формат временного штампа: yyyy-MM-dd HH:mm:ss. Если установленное значение временного штампа меньше текущего времени, сообщение будет немедленно передано Consumer.
* long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime();
* msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(timeStamp));
*/
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);## Полностью управляемый UI
### Управление темпами

### Управление группами

### Управление сообщениями

#### Сильная возможность поиска сообщений (поддерживает различные аспекты)


#### Сильная детализация сообщений (просмотр JSON-формата сообщений, поиск пути сообщений)

### Обратная связь

```
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )