Введение
Springboot позволяет быстро подключиться к MQTT. Для обработки сообщений достаточно одного метода и аннотации.
Разработка
Ранее был реализован проект с использованием MQTT для работы с данными, которые поступали от оборудования. Проект был развёрнут в производственной среде на Alibaba Cloud. В качестве брокера сообщений использовался EMQ. На основе этого проекта был создан новый проект, который интегрирует Springboot и правила топиков. Автор выражает благодарность Ye Zhihao за его опыт и знания.
Если у вас есть идеи по совместному использованию подписки и обеспечению высокой доступности, автор будет рад услышать ваши предложения.
Функции
Рекомендации по умолчанию
Поддержка Springboot Версия 2.0.X.RELEASE.
Репозиторий проекта
<dependency>
<groupId>com.stanwind</groupId>
<artifactId>spring-boot-windmq</artifactId>
<version>1.1.2-RELEASE</version>
</dependency>
Пример проекта https://gitee.com/sense7/windmq-demo.git
Зависимости
<!-- windmq dependency -->
<dependency>
<groupId>com.stanwind</groupId>
<artifactId>spring-boot-windmq</artifactId>
<version>1.1.0-RELEASE</version>
</dependency>
<!-- MQTT -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<exclusions>
<exclusion>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<groupId>org.eclipse.paho</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- 1.2.0 版本有bug -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.1</version>
</dependency>
Включение windmq
@EnableWindMQ
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
Использование примера
void addTopic(String... topic);
void addTopic(String topic, int qos);
void addTopics(String[] topic, int[] qos);
void removeTopic(String... topic);
/**
* Отправка сообщения устройству, требуется класс payload с аннотацией @Topic
* @param deviceId
* @param payload
*/
void notify(String deviceId, Object payload);
/**
* Отправка сообщения в топик
* @param topic
* @param payload
*/
void notifyToTopic(String topic, Object payload);
/**
* Синхронное подтверждение отправки сообщения на устройство
* @param deviceId
* @param payload
* @return
*/
MqttResponse request(String deviceId, MqttRequest payload);
/**
* Синхронное подтверждение отправки сообщения на устройство
* @param deviceId
* @param payload
* @param timeout ожидание в мс
* @return
*/
MqttResponse request(String deviceId, MqttRequest payload, long timeout);
/**
* Ответ на синхронное сообщение
* @param message
* @return
*/
boolean response(Message<MqttResponse> message);
/**
* Общий ответ клиента
* @param messageId
* @param deviceId
* @param result
*/
void sendCommonResponse(Long messageId, String deviceId, Integer result);
/**
* Общий ответ клиенту
* @param messageId
* @param deviceId
*/
void sendCommonResponse(Long messageId, String deviceId);
@TopicHandler(topic = "$SYS/brokers/{node}/clients/{deviceId}/connected")
public void connected(MQTTMsg msg) {
ClientReqVO clientReqVO = JSONObject.parseObject(msg.getPayload().toString(), ClientReqVO.class);
process(clientReqVO);
}
@Service
public class DemoHandler extends
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )