Библиотека, основанная на протоколе MQTT, реализует точечную передачу сообщений:
Req-Rep
) модель;Pub-Sub
) модель;В дизайне тем для NextMQTT уже учтены различия между различными службами и клиентскими узлами. В соответствии с уровневым разделением тем MQTT, правила различных уровней тем в NextMQTT следуют ниже:
/${string:domain}/${string:target-node-id}/${string:pattern-type}/${long:request-id}/${string:sender-node-id}/${string:tag}
domain
: строковое значение имени целевой службы;target-node-id
: строковое значение ID клиента-узла получателя;pattern-type
: строковое значение типа шаблона. В настоящее время существуют три значения:
requests
: тип шаблона запроса;replies
: тип шаблона ответа;pubsub
: тип шаблона публикации-подписки;request-id
: число типа long, которое является уникальным идентификатором запроса или ответа. В модели pubsub это постоянное значение, по умолчанию равно 0;sender-node-id
: строковое значение ID отправителя;tag
: строковое значение, используемое для расширения: позволяет реализовать первичное маршрутизирование сообщений на уровне темы, без необходимости указывать маркеры маршрутизации в полезной нагрузке сообщения.В дизайне тем для NextMQTT каждому конечному устройству присваивается домен, состоящий из domain
и node-id
.Путь /domain/node-id
в правиле темы важен для отправки сообщений конкретному конечному устройству. На основе этого, NextMQTT предоставляет две модели.
Модель Req-Rep
, аналогичная протоколу HTTP, представляет собой взаимодействие одного конечного устройства как клиента и другого как сервера для выполнения модели "запрос-ответ".
На уровне тем используется сегмент ${string:pattern-type}
, чтобы отличать сообщения запроса от сообщений ответа.
Клиент отправляет сообщение серверу, направляя запросное сообщение на сервер с определённым
node-id
. Сервер слушает входящие запросные сообщения.
Шаг 1: Подключение сервера к запросам
Сервер прослушивает входящие запросные сообщения. Соответственно, конечное устройство подписывается на тему сообщений:
/${domain}/${
SERVER
.node-id}/requests
/#
Шаг 2: Отправка запросного сообщения клиентом
Клиент отправляет запросное сообщение серверу, зная его node-id
. Тема отправляемого запросного сообщения выглядит следующим образом:
/${domain}/${
SERVER
.node-id}/requests
/${request-id}/${CLIENT
.node-id}/${tag}
Клиент слушает отправленные ему reply сообщения. Сервер возвращает ответ, создавая Reply сообщение на основе полученного Request сообщения и отправляет его клиенту.Шаг 1: Клиент слушает Reply сообщения
Клиент, который принимает ответ, подписывается на Topic:
/${domain}/${
THIS
.node-id}/replies
/#
Шаг 2: Сервер отправляет Reply сообщение
Сервер отправляет Reply сообщение на Topic:
/${domain}/${
CLIENT
.node-id}/replies
/${request-id}/${THIS
.node-id}/${tag}
Пример кода:
final MQTTSocket server = MQTTSocket.context()
.domain("next-mqtt")
.nodeId("SERVER")
.address("tcp://iot.eclipse.org:1883")
.socket();
final MQTTSocket client = MQTTSocket.context()
.domain("next-mqtt")
.nodeId("CLIENT")
.address("tcp://iot.eclipse.org:1883")
.socket();
server.connect();
client.connect();
// Подписываемся на запросы, адресованные текущему серверному ноду. Тема подписки: "/next-mqtt/SERVER/requests/#"
final long reqrepId = server.addRequestMessageHandler(new MessageHandler() {
@Override
public void onMessage(MQTTSocket socket, Message request) {
// Отправляем ответное сообщение клиенту. Тема отправки: "/next-mqtt/CLIENT/replies/${req-id}/SERVER"
server.send(server.newReplyMessageOf(request, request.payload));
}
});
// Отправляем запрос на сервер и синхронно получаем ответное сообщение
System.out.println("Клиент получил синхронный ответ: " +
client.sendCall(client.newRequestMessageFor("SERVER", "ECHO-SYNC".getBytes()).builder()
.tag("ping-tag")
.build()).execute());
// Отправляем запрос на сервер и асинхронно получаем ответное сообщение
final Latched<Message> reply = new Latched<>();
client.sendCall(client.newRequestMessageFor("SERVER", "ECHO-ASYNC".getBytes())).enqueue(new MessageCallback() {
@Override
public void onError(Exception error) {
error.printStackTrace();
reply.set(null);
}
@Override
public void onSuccess(Message message) {
System.out.println("Асинхронный ответ получен");
reply.set(message);
}
});
``` @Override
public void onMessage(MQTTSocket socket, Message message) {
reply.set(message);
}
});
System.out.println("Клиент получил асинхронный ответ: " + reply.get());
server.removeRequestMessageHandler(reqrepId);
client.disconnect();
server.disconnect();
Режим Pub-Sub представляет собой модель распространения событий, при которой клиент отправляет сообщения о событиях серверу, а сервер подписывается на эти события без ответа.
Топик для подписки сообщений сервера:
/${domain}/${
SERVER
.node-id}/pubsub
/#
Клиент, который отправляет сообщение о событии, использует следующий топик:
/${domain}/${
SERVER
.node-id}/pubsub
/0/${CLIENT
.node-id}/${tag}
Важно отметить, что request-id в сообщении Pub-Sub имеет фиксированное значение и по умолчанию равно 0
.
В режиме Pub-Sub клиент отправляет сообщения о событиях и не ждет ответа; сервер обрабатывает сообщения о событиях и также не отвечает.
Пример кода:
final MQTTSocket server = MQTTSocket.context()
.domain("next-mqtt")
.nodeId("SERVER")
.address("tcp://iot.eclipse.org:1883")
.socket();
final MQTTSocket client = MQTTSocket.context()
.domain("next-mqtt")
.nodeId("CLIENT")
.address("tcp://iot.eclipse.org:1883")
.socket();
server.connect();
client.connect();
// Подписываемся на сообщения типа PubSub, отправляемые конкретному серверному узлу
final long pubsubId = server.addSubMessageHandler(new MessageHandler() {
@Override
public void onMessage(MQTTSocket socket, Message receivedMessage) {
System.out.println("Сообщение Pub получено сервером: " + receivedMessage);
}
});
```// Клиент отправляет сообщение Pub на указанный серверный узел, обеспечивая точечную передачу данных между двумя узлами
client.send(client.newPubMessageFor(NODE_SERVER, "Юоция".getBytes()).builder()
.tag("fun-tag") // Расширенное использование: можно использовать tag для маршрутизации сообщений
.build());
client.send(client.newPubMessageFor(NODE_SERVER, "Чэн".getBytes()));
server.removeSubMessageHandler(pubsubId);
client.disconnect();
server.disconnect();
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )