MQTT-клиент, основанный на paho
Maven-зависимость:
<dependency>
<groupId>net.oschina.durcframework</groupId>
<artifactId>paho-mqtt-client</artifactId>
<version>1.0.0</version>
</dependency>
Запись кода
PahoMqttClient mqttClient = PahoMqttClient.create()
.broker(broker)
.auth(username, password)
.clientId(clientId)
.cleanSession(true)
// Автоматическое восстановление соединения
.automaticReconnect(true)
// Подписка на сообщения
.subscribe(topic, 2)
// Установка обратного вызова для обработки сообщений
.callback(new MyMqttCallback())
.connect();
public static class MyMqttCallback extends PahoMqttCallback {
@Override
public void connectionLost(Throwable throwable) {
System.out.println("[client] потеряно соединение:" + throwable.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
System.out.println("[client] получено сообщение: topic:" + topic + ", msg:" + mqttMessage.toString());
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}
Отправка сообщений
mqttClient.publish(upLocationTopic, location.getBytes(), 1);
В этом примере используется система车联网 (система车联网 представляет собой систему управления транспортными средствами через интернет) в качестве тестового сценария. Автомобильные терминалы отправляют данные о местоположении с определенной периодичностью, а платформа IOT отправляет команды на обновление через OTA с определенной периодичностью.
Функции реализации платформы IOT:
Подписка на топик местоположения, получение данных о местоположении от устройств
Отправка сообщений OTA на автомобильные терминалыФункции реализации автомобильных терминалов:
Подписка на топик точек для отправки команд, получение команд от платформы
Подписка на топик местоположения, отправка данных о местоположении
package com.gitee.mqttclient;
import com.gitee.mqttclient.callback.PahoMqttCallback;
import com.gitee.mqttclient.client.PahoMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
/**
* Модель платформы
* @author thc
*/
public class ServerTest {
protected static String broker = "tcp://1.1.1.1:1883";
protected static String username = "s001";
protected static String password = "123456";
static PahoMqttClient mqttClient;
/**
* Запуск серверной части
* @throws InterruptedException
* @throws MqttException
*/
@Test
public void server() throws InterruptedException, MqttException {
String clientId = "server-node-1";
// Серверная часть слушает все темы для позиционирования автомобилей
String topic = "prod/+/+/base/location/#";
mqttClient = PahoMqttClient.create()
.broker(broker)
.auth(username, password)
.clientId(clientId)
.cleanSession(true)
// Автоматическое восстановление соединения
.automaticReconnect(true)
// Подписка на сообщения
.subscribe(topic, 1)
.callback(new ServerMqttCallback())
.connect();
}
}
``` // Запуск отдельного потока для отправки команд
new Thread(() -> {
// Каждые 20 секунд отправляется команда для OTA-обновления автомобиля A000001
String downTopic = "o2o/A000001/ota";
while (true) {
String otaContent = String.valueOf(System.currentTimeMillis());
System.out.println("Отправка OTA-команды: " + otaContent);
byte[] otaFile = otaContent.getBytes();
try {
mqttClient.publish(downTopic, otaFile, 2);
} catch (MqttException e) {
throw new RuntimeException(e);
}
try {
TimeUnit.SECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();```markdown
```java
package com.gitee.mqttclient;
import com.gitee.mqttclient.callback.PahoMqttCallback;
import com.gitee.mqttclient.client.PahoMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.junit.Test;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* Код функций автомобильного терминала
* @author thc
*/
public class ClientTest {
protected static String broker = "tcp://1.1.1.1:1883";
protected static String username = "u001";
protected static String password = "123456";
// Тип автомобиля
static String carType = "suv";
// VIN автомобиля
static String vin = "A000001";
// Формат: client-имя пользователя-VIN
static String clientId = "client-" + username + "-" + vin;
private static PahoMqttClient mqttClient;
private void init() throws MqttException {
// Подписка на точку-к-точку сообщения
String topic = "o2o/" + vin + "/+";
mqttClient = PahoMqttClient.create()
.broker(broker)
.auth(username, password)
.clientId(clientId)
.cleanSession(true)
// Автоматическое переподключение
.automaticReconnect(true)
// Подписка на сообщения
.subscribe(topic, 2)
.callback(new MyMqttCallback())
.connect();
}
/**
* Класс обратного вызова для серверной части MQTT
*/
public static class ServerMqttCallback extends PahoMqttCallback {
@Override
public void connectionLost(Throwable throwable) {
System.out.println("[server] Соединение потеряно: " + throwable.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
System.out.println("[server] Получено сообщение: тема: " + topic + ", сообщение: " + mqttMessage.toString());
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}
}
``` @Test
public void device() throws InterruptedException {
try {
init();
} catch (MqttException me) {
System.err.println("Не удалось подключиться к MQTT");
System.out.println("причина " + me.getReasonCode());
System.out.println("сообщение " + me.getMessage());
System.out.println("локализованное сообщение " + me.getLocalizedMessage());
System.out.println("причина " + me.getCause());
System.out.println("исключение " + me);
me.printStackTrace();
return;
}
// Запуск отдельного потока для отправки данных
new Thread(() -> {
// Топик для отправки данных о местоположении
String upLocationTopic = String.format("prod/%s/%s/base/location", carType, vin);
// Отправка информации о местоположении каждые 10 секунд
while (true) {
try {
// Случайные координаты
String lon = "120.123" + new Random().nextInt(100);
String lat = "30.123" + new Random().nextInt(100);
String location = lon + "," + lat;
System.out.println("Отправка информации о местоположении: " + location);
// Отправка координат
mqttClient.publish(upLocationTopic, location.getBytes(), 1);
} catch (MqttException e) {
throw new RuntimeException(e);
}
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start(); // stop here
TimeUnit.DAYS.sleep(1);
}
public static class MyMqttCallback extends PahoMqttCallback {
@Override
public void connectionLost(Throwable throwable) {
System.out.println("[client] потерял соединение: " + throwable.getMessage());
} @Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
System.out.println("[client] получил сообщение: topic:" + topic + ", msg:" + mqttMessage.toString());
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}
}
broker, username, password замените на свои значения
Сначала выполните тестовые примеры для платформы, затем для устройства.
Полный код см. в пакете test проекта.
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )