1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/durcframework-paho-mqtt-client

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
readme.md

paho-mqtt-client

MQTT-клиент, основанный на paho

Способ использования

Maven-зависимость:

<dependency>
    <groupId>net.oschina.durcframework</groupId>
    <artifactId>paho-mqtt-client</artifactId>
    <version>1.0.0</version>
</dependency>

Запись кода

PahoMqttClient mqttClient = PahoMqttClient.create()
    .broker(broker)
    .auth(username, password)
    .clientId(clientId)
    .cleanSession(true)
    // Автоматическое восстановление соединения
    .automaticReconnect(true)
    // Подписка на сообщения
    .subscribe(topic, 2)
    // Установка обратного вызова для обработки сообщений
    .callback(new MyMqttCallback())
    .connect();

public static class MyMqttCallback extends PahoMqttCallback {

    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("[client] потеряно соединение:" + throwable.getMessage());
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        System.out.println("[client] получено сообщение: topic:" + topic + ", msg:" + mqttMessage.toString());
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

    }
}

Отправка сообщений

mqttClient.publish(upLocationTopic, location.getBytes(), 1);

Тестовые примеры

В этом примере используется система车联网 (система车联网 представляет собой систему управления транспортными средствами через интернет) в качестве тестового сценария. Автомобильные терминалы отправляют данные о местоположении с определенной периодичностью, а платформа IOT отправляет команды на обновление через OTA с определенной периодичностью.

Функции реализации платформы IOT:

  • Подписка на топик местоположения, получение данных о местоположении от устройств

  • Отправка сообщений OTA на автомобильные терминалыФункции реализации автомобильных терминалов:

  • Подписка на топик точек для отправки команд, получение команд от платформы

  • Подписка на топик местоположения, отправка данных о местоположении

Код реализации функций платформы IOT

package com.gitee.mqttclient;

import com.gitee.mqttclient.callback.PahoMqttCallback;
import com.gitee.mqttclient.client.PahoMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

/**
 * Модель платформы
 * @author thc
 */
public class ServerTest {

    protected static String broker = "tcp://1.1.1.1:1883";
    protected static String username = "s001";
    protected static String password = "123456";
    static PahoMqttClient mqttClient;

    /**
     * Запуск серверной части
     * @throws InterruptedException
     * @throws MqttException
     */
    @Test
    public void server() throws InterruptedException, MqttException {
        String clientId = "server-node-1";
        // Серверная часть слушает все темы для позиционирования автомобилей
        String topic = "prod/+/+/base/location/#";
        mqttClient = PahoMqttClient.create()
                .broker(broker)
                .auth(username, password)
                .clientId(clientId)
                .cleanSession(true)
                // Автоматическое восстановление соединения
                .automaticReconnect(true)
                // Подписка на сообщения
                .subscribe(topic, 1)
                .callback(new ServerMqttCallback())
                .connect();
    }
}
```        // Запуск отдельного потока для отправки команд
        new Thread(() -> {
            // Каждые 20 секунд отправляется команда для OTA-обновления автомобиля A000001
            String downTopic = "o2o/A000001/ota";
            while (true) {
                String otaContent = String.valueOf(System.currentTimeMillis());
                System.out.println("Отправка OTA-команды: " + otaContent);
                byte[] otaFile = otaContent.getBytes();
                try {
                    mqttClient.publish(downTopic, otaFile, 2);
                } catch (MqttException e) {
                    throw new RuntimeException(e);
                }
                try {
                    TimeUnit.SECONDS.sleep(20);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();```markdown
```java
package com.gitee.mqttclient;

import com.gitee.mqttclient.callback.PahoMqttCallback;
import com.gitee.mqttclient.client.PahoMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.junit.Test;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Код функций автомобильного терминала
 * @author thc
 */
public class ClientTest {

    protected static String broker = "tcp://1.1.1.1:1883";
    protected static String username = "u001";
    protected static String password = "123456";

    // Тип автомобиля
    static String carType = "suv";
    // VIN автомобиля
    static String vin = "A000001";

    // Формат: client-имя пользователя-VIN
    static String clientId = "client-" + username + "-" + vin;

    private static PahoMqttClient mqttClient;

    private void init() throws MqttException {
        // Подписка на точку-к-точку сообщения
        String topic = "o2o/" + vin + "/+";
        mqttClient = PahoMqttClient.create()
                .broker(broker)
                .auth(username, password)
                .clientId(clientId)
                .cleanSession(true)
                // Автоматическое переподключение
                .automaticReconnect(true)
                // Подписка на сообщения
                .subscribe(topic, 2)
                .callback(new MyMqttCallback())
                .connect();
    }

    /**
     * Класс обратного вызова для серверной части MQTT
     */
    public static class ServerMqttCallback extends PahoMqttCallback {

        @Override
        public void connectionLost(Throwable throwable) {
            System.out.println("[server] Соединение потеряно: " + throwable.getMessage());
        }

        @Override
        public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
            System.out.println("[server] Получено сообщение: тема: " + topic + ", сообщение: " + mqttMessage.toString());
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        }
    }
}
```    @Test
    public void device() throws InterruptedException {
        try {
            init();
        } catch (MqttException me) {
            System.err.println("Не удалось подключиться к MQTT");
            System.out.println("причина " + me.getReasonCode());
            System.out.println("сообщение " + me.getMessage());
            System.out.println("локализованное сообщение " + me.getLocalizedMessage());
            System.out.println("причина " + me.getCause());
            System.out.println("исключение " + me);
            me.printStackTrace();
            return;
        }

        // Запуск отдельного потока для отправки данных
        new Thread(() -> {
            // Топик для отправки данных о местоположении
            String upLocationTopic = String.format("prod/%s/%s/base/location", carType, vin);

            // Отправка информации о местоположении каждые 10 секунд
            while (true) {
                try {
                    // Случайные координаты
                    String lon = "120.123" + new Random().nextInt(100);
                    String lat = "30.123" + new Random().nextInt(100);
                    String location = lon + "," + lat;
                    System.out.println("Отправка информации о местоположении: " + location);
                    // Отправка координат
                    mqttClient.publish(upLocationTopic, location.getBytes(), 1);
                } catch (MqttException e) {
                    throw new RuntimeException(e);
                }
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();        // stop here
        TimeUnit.DAYS.sleep(1);
    }

    public static class MyMqttCallback extends PahoMqttCallback {

        @Override
        public void connectionLost(Throwable throwable) {
            System.out.println("[client] потерял соединение: " + throwable.getMessage());
        }        @Override
        public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
            System.out.println("[client] получил сообщение: topic:" + topic + ", msg:" + mqttMessage.toString());
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

        }
    }


}

broker, username, password замените на свои значения

Сначала выполните тестовые примеры для платформы, затем для устройства.

Полный код см. в пакете test проекта.

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

MQTT-клиент, основанный на paho, который поддерживает подписку и публикацию сообщений по темам, а также автоматическое восстановление соединения при разрыве. Развернуть Свернуть
Java
MIT
Отмена

Обновления

Пока нет обновлений

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://api.gitlife.ru/oschina-mirror/durcframework-paho-mqtt-client.git
git@api.gitlife.ru:oschina-mirror/durcframework-paho-mqtt-client.git
oschina-mirror
durcframework-paho-mqtt-client
durcframework-paho-mqtt-client
master