1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/sense7-windmq

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
README.md 7.4 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
gitlife-traslator Отправлено 29.11.2024 19:19 ace52dd

windmq — MQTT 快速开发脚手架

Введение

Springboot позволяет быстро подключиться к MQTT. Для обработки сообщений достаточно одного метода и аннотации.

Разработка

Ранее был реализован проект с использованием MQTT для работы с данными, которые поступали от оборудования. Проект был развёрнут в производственной среде на Alibaba Cloud. В качестве брокера сообщений использовался EMQ. На основе этого проекта был создан новый проект, который интегрирует Springboot и правила топиков. Автор выражает благодарность Ye Zhihao за его опыт и знания.

Если у вас есть идеи по совместному использованию подписки и обеспечению высокой доступности, автор будет рад услышать ваши предложения.

Функции

  • Распределение учётных данных для входа в MQTT-клиент (ACL поддерживает Alibaba Cloud и EMQ, но только с учётными записями и паролями, можно реализовать собственную систему).
  • Поддержка шифрования для таблиц поиска (необязательная функция) (подробности см.: com.stanwind.wmqtt.security.TableMsgEncrypt).
  • Высокая доступность при развёртывании (можно использовать несколько экземпляров с разными clientID, EMQ предоставляет общую подписку, а Alibaba Cloud использует механизм маршрутизации сообщений на основе правил, мы используем механизм, в котором каждый экземпляр отвечает за обработку своих команд).
  • Пул асинхронной обработки сообщений, обработка сообщений в порядке очереди (CPU * 2 + 1).
  • Соответствие топиков и обработка сообщений с помощью аннотаций, поддержка точного соответствия и соответствия с подстановочными знаками (регулярные выражения, можно указать параметры пути топика).

Рекомендации по умолчанию

  • Все топики, отправленные клиентами, должны иметь формат IOT_CLIENT/xxx (настраивается).
  • Все топики, отправляемые сервером, должны иметь формат IOT_SERVER/xxx (настраивается).
  • Если включена функция шифрования, все топики, начинающиеся с IOT, будут зашифрованы (см.: com.stanwind.wmqtt.security.IotDeviceMessageEncrypt).
  • Идентификаторы клиентов на Alibaba Cloud начинаются с GID_DEVICE@@@ (настраиваются).
  • На сервере используется аутентификация с подписью, на Alibaba Cloud клиенты используют токены для аутентификации, срок действия токенов составляет 12 часов.
  • Топики могут содержать {instanceId} для сопоставления с текущим идентификатором экземпляра и {deviceId} для сопоставления с серийным номером текущего устройства (подробнее: com.stanwind.wmqtt.MqttConfig).

Поддержка Springboot Версия 2.0.X.RELEASE.

Репозиторий проекта

<dependency>
  <groupId>com.stanwind</groupId>
  <artifactId>spring-boot-windmq</artifactId>
  <version>1.1.2-RELEASE</version>
</dependency>

Пример проекта https://gitee.com/sense7/windmq-demo.git

Зависимости

<!-- windmq dependency -->
<dependency>
  <groupId>com.stanwind</groupId>
  <artifactId>spring-boot-windmq</artifactId>
  <version>1.1.0-RELEASE</version>
</dependency>

<!-- MQTT -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-integration</artifactId>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-mqtt</artifactId>
  <exclusions>
    <exclusion>
      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
      <groupId>org.eclipse.paho</groupId>
    </exclusion>
  </exclusions>
</dependency>
<!-- 1.2.0 版本有bug -->
<dependency>
  <groupId>org.eclipse.paho</groupId>
  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  <version>1.2.1</version>
</dependency>

Включение windmq

@EnableWindMQ
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

Использование примера

  • Временная подписка/отмена (внедрение WMHolder)
void addTopic(String... topic);
void addTopic(String topic, int qos);
void addTopics(String[] topic, int[] qos);
void removeTopic(String... topic);
  • Отправка сообщений IMessageService
/**
 * Отправка сообщения устройству, требуется класс payload с аннотацией @Topic
 * @param deviceId
 * @param payload
 */
void notify(String deviceId, Object payload);

/**
 * Отправка сообщения в топик
 * @param topic
 * @param payload
 */
void notifyToTopic(String topic, Object payload);

/**
 * Синхронное подтверждение отправки сообщения на устройство
 * @param deviceId
 * @param payload
 * @return
 */
MqttResponse request(String deviceId, MqttRequest payload);

/**
 * Синхронное подтверждение отправки сообщения на устройство
 * @param deviceId
 * @param payload
 * @param timeout ожидание в мс
 * @return
 */
MqttResponse request(String deviceId, MqttRequest payload, long timeout);

/**
 * Ответ на синхронное сообщение
 * @param message
 * @return
 */
boolean response(Message<MqttResponse> message);

/**
* Общий ответ клиента
* @param messageId
* @param deviceId
* @param result
*/
void sendCommonResponse(Long messageId, String deviceId, Integer result);

/**
* Общий ответ клиенту
* @param messageId
* @param deviceId
*/
void sendCommonResponse(Long messageId, String deviceId);
  • Обработка сообщений
@TopicHandler(topic = "$SYS/brokers/{node}/clients/{deviceId}/connected")
public void connected(MQTTMsg msg) {
    ClientReqVO clientReqVO = JSONObject.parseObject(msg.getPayload().toString(), ClientReqVO.class);
    process(clientReqVO);
}
  • Получение параметров пути
@Service
public class DemoHandler extends

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/sense7-windmq.git
git@api.gitlife.ru:oschina-mirror/sense7-windmq.git
oschina-mirror
sense7-windmq
sense7-windmq
v1.1.2