1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/venus-suite-rocketmq-with-delivery-time

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
README.md

Введение в концепцию

В открытом источнике RocketMQ поддерживает только задержку сообщений с 18 конкретными уровнями задержки.

В платной версии продукты MQ, такие как Alibaba Cloud и Tencent Cloud, поддерживают задержку сообщений с точностью до секунды.

Задачи со временем: Производитель отправляет сообщение в очередь сообщений RocketMQ, но не хочет немедленно передать это сообщение. Вместо этого он хочет, чтобы сообщение было доставлено потребителю через некоторое время после текущего момента. Такое сообщение называется сообщением с задачей времени.

Задержка сообщений: Производитель отправляет сообщение в очередь сообщений RocketMQ, но не хочет немедленно передать это сообщение. Вместо этого он хочет, чтобы сообщение было доставлено потребителю через определённое время. Такое сообщение называется сообщением с задержкой.

Сообщения с задачами времени и задержками имеют некоторые различия в конфигурации кода, но в конечном итоге достигается одинаковый эффект: сообщение не передаётся сразу после отправки в очередь сообщений RocketMQ, а передаётся позже, согласно свойствам сообщения, с задержкой определенного времени.## Реализация (четыре способа реализации)

1. Через прокси ссылка

2. Через время и delay-commit-log ссылка

3. Через время и время файл ссылка

Основанный на 18 уровнях поддержки RocketMQ ссылка## Применимые сценарии

Задерживаемые сообщения и сообщения с временными метками подходят для следующих сценариев:

Сообщение отправляется и получается в определенное время, например, при электронной коммерции, если заказ не оплачен в течение времени, он автоматически отменяется.

При создании заказа отправляется задерживающееся сообщение.

Это сообщение будет доставлено потребителю через 30 минут.

Получатель этого сообщения должен проверить, был ли заказ полностью оплачен.

Если заказ еще не оплачен, он будет отменен. Если же заказ уже оплачен, сообщение будет проигнорировано.

Использование сообщений для запуска задач на расписание, например, отправка напоминаний пользователям в определенное время.

Способ использования

Существует небольшая разница в использовании задерживаемых сообщений и сообщений с временными метками:

Для отправки сообщения с временной меткой требуется указать конкретное время после которого сообщение должно быть доставлено.

Для отправки задерживающегося сообщения следует установить задержку времени, сообщение будет доставлено после указанного периода времени.

Внимание

Точность сообщений с временной меткой может иметь погрешность до 1–2 секунд.Параметр msg.setStartDeliverTime для сообщений с временной меткой и задерживающихся сообщений должен быть установлен на значение текущего момента времени плюс некоторое количество миллисекунд.Если параметр установлен на момент времени раньше текущего, сообщение будет немедленно доставлено потребителю.

Параметр msg.setStartDeliverTime можно установить на любое время в пределах 40 дней (в миллисекундах). Превышение этого срока приведёт к ошибке отправки сообщения.

StartDeliverTime — это время начала передачи сообщения сервером конечному потребителю. Если у потребителя есть накопленные сообщения, то задерживающиеся и сообщения с временной меткой будут помещены в конце списка этих сообщений и могут не быть переданы строго в соответствии со своим временем.

По причине возможного различия во времени между клиентом и сервером, фактическое время передачи сообщения может отличаться от времени, указанного клиентом.

Как использовать

Рекомендуется использовать версию RocketMQ, предоставленную Alibaba Cloud

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.4.Final</version>
</dependency>

Отправка сообщений

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.shade.org.apache.commons.lang3.time.DateFormatUtils;

import java.util.Date;
import java.util.Properties;
``````java
public class ProducerDelayTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // AccessKey ID - unique authentication identifier in the RAM service on the Alibaba Cloud platform.
        properties.put(PropertyKeyConst.AccessKey, "XXX");
        // AccessKey Secret - secret authentication key in the RAM service on the Alibaba Cloud platform.
        properties.put(PropertyKeyConst.SecretKey, "XXX");
        // Set the domain of the TCP connection, which can be viewed on the endpoint details page in the RocketMQ console.
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "localhost:9876");
    }
}
```        Producer producer = ONSFactory.createProducer(properties);
        // Перед отправкой сообщений необходимо вызвать метод start для запуска производителя, этот метод следует вызывать один раз.
        producer.start();

        {
            Message msg = new Message(
                    // Тема, созданная Вами в консоли управления RocketMQ.
                    "TopicTest",
                    // Определяет метку сообщения, которая может пониматься как аналог тэгов в Gmail, позволяющий группировать сообщения.
                    "TagA",
                    // Тело сообщения может содержать любые двоичные данные, сервис RocketMQ не ограничивает это.
                    "Демонстрация 15 секунд >>>".getBytes());
            // Устанавливает бизнес-ключевой атрибут сообщения, который должен быть уникален глобально.
            // Это полезно при отсутствии возможности получения сообщений, так как позволяет найти и восстановить сообщение через консоль управления.
            // Обратите внимание: если не установлен, это не повлияет на нормальное получение и отправление сообщений.
            msg.setKey("ORDERID_100e");
            try {
                // Отложенные сообщения, единицы измерения - миллисекунды (ms), сообщение будет доставлено после указанного времени (текущее время плюс задержка).
                long delayTime = System.currentTimeMillis() + 15000;
                System.out.println("Время отправки >>" + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"));```java
                // Устанавливает время начала доставки сообщения.
                msg.setStartDeliverTime(delayTime);
SendResult sendResult = producer.send(msg);
// Синхронная отправка сообщения, если исключения не выброшены, значит отправка успешна.
if (sendResult != null) {
    System.out.println(DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss") + " Отправка сообщения в очередь успешно завершена. Тема: " + msg.getTopic() + " ID сообщения: " + sendResult.getMessageId());
}
} catch (Exception e) {
    // При неудачной отправке сообщения требуется выполнить процедуру повторной отправки или записать данные для компенсационной обработки.
    System.out.println(new Date() + " Отправка сообщения в очередь не удалась. Тема: " + msg.getTopic());
    e.printStackTrace();
}
}
// Перед выходом приложения следует уничтожить объект Producer.
// Внимание: даже если объект не будет уничтожен, это не вызовет проблем.
producer.shutdown();

Получение сообщений```java

import com.aliyun.openservices.shade.org.apache.commons.lang3.time.DateFormatUtils; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt;

import java.util.Date; import java.util.List;

/**

  • Этот пример демонстрирует, как подписаться и получать сообщения с помощью предоставленного {@link DefaultMQPushConsumer}. */ public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

     /*
      * Создание экземпляра с указанным уникальным именем группы потребителей.
      */
    
     final int[] totals = {0};
     DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
    
     consumer.setNamesrvAddr("localhost:9876");
     /*
      * Указание адресов серверов имен.
      * <p/>
      *
      * В качестве альтернативы вы можете указать адреса серверов имен через экспорт переменной окружения: NAMESRV_ADDR
      * <pre>
      * {@code
      * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
      * }
      * </pre>
      */
    
     /*
      * Указание точки отсчета в случае, если указанная группа потребителей является новой.
      */
     consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
     /*
      * Подписка на одну или несколько тем для получения сообщений.
      */
     consumer.subscribe("TopicTest", "*");

}

### Как отправить отложенные сообщения с использованием общественной версии RocketMQ

```java
/* Установите тему, созданную вами в консоли управления RocketMQ. */
Message msg = new Message("ВАШ ТЕМП",
    /* Установите метку сообщения. */
    "ВАША МЕТА-МАРКЕР",
    /* Содержание сообщения. */
    "Привет мир".getBytes(RemotingHelper.DEFAULT_CHARSET));
/* Для отправки отложенного сообщения требуется установить время отложенного сообщения, в миллисекундах (ms). Сообщение будет доставлено после указанного времени отложенного сообщения, например, сообщение будет доставлено через 3 секунды. */
long delayTime = System.currentTimeMillis() + 3000;
msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));
```
```/**
* Если вам нужно отправить сообщение с временем начала, вы должны установить это время. Сообщение будет доставлено в указанное время, например, сообщение будет доставлено в 2021-08-10 18:45:00.
* Формат временного штампа: yyyy-MM-dd HH:mm:ss. Если установленное значение временного штампа меньше текущего времени, сообщение будет немедленно передано Consumer.
* long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime();
* msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(timeStamp));
*/
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);## Полностью управляемый UI
### Управление темпами
![topics management](docs/img/a.png)

### Управление группами
![group management](docs/img/f.png)

### Управление сообщениями
![message management](docs/img/e.png)

#### Сильная возможность поиска сообщений (поддерживает различные аспекты)
![search messages](docs/img/search.png)

![search result](docs/img/result.png)

#### Сильная детализация сообщений (просмотр JSON-формата сообщений, поиск пути сообщений)
![message detail](docs/img/detail.png)

### Обратная связь
![контакты](docs/img/qrcode.jpg)
```

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

Описание недоступно Развернуть Свернуть
Java и 6 других языков
Apache-2.0
Отмена

Обновления (1)

все

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://api.gitlife.ru/oschina-mirror/venus-suite-rocketmq-with-delivery-time.git
git@api.gitlife.ru:oschina-mirror/venus-suite-rocketmq-with-delivery-time.git
oschina-mirror
venus-suite-rocketmq-with-delivery-time
venus-suite-rocketmq-with-delivery-time
main