1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/iots-mqtt-client

Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
readme.md

Обзор

MQTT — это протокол связи "машинное устройство к машинному устройству" (M2M)/"Интернет вещей". Он был спроектирован как очень лёгковесный протокол публикации/подписки. Этот протокол полезен для соединений с удалёнными местами, где требуется небольшой размер кода или когда пропускная способность сети ограничена.

mqtt-client предоставляет API к MQTT с лицензией ASL 2.0. Он автоматически восстанавливает соединение с вашим сервером MQTT и возвращает состояние клиента при возникновении сетевых ошибок. Приложения могут использовать стиль API с блокировкой, основанный на будущих значениях или стиль API с обратными вызовами/передачей продолжений.

Использование из Maven

Добавьте следующее в свой файл maven pom.xml.

<dependency>
  <groupId>org.fusesource.mqtt-client</groupId>
  <artifactId>mqtt-client</artifactId>
  <version>1.12</version>
</dependency>

Использование из Gradle

Добавьте следующее в свой файл Gradle.

compile 'org.fusesource.mqtt-client:mqtt-client:1.12'

Использование из любой другой системы сборки

Скачайте файл uber-jar и добавьте его в вашу систему сборки. Uber-jar содержит все необходимые зависимости, которые использует mqtt-client из других проектов.

Использование на Java 1.4Мы также предоставляем файл uber-jar для Java 1.4,

совместимый с JVM Java 1.4. Эта версия файла jar не поддерживает соединения SSL, так как класс SSLEngine, используемый для реализации SSL на NIO, был введен только в Java 1.5.## Настройка соединения MQTT

API с блокировкой, основанного на будущих значениях и API с обратными вызовами используют одинаковое настроение соединения. Вы создаёте новый экземпляр класса MQTT и конфигурируете его с параметрами соединения и сокета. Минимально необходимо вызвать метод setHost перед попыткой установки соединения.

MQTT mqtt = new MQTT();
mqtt.setHost("localhost", 1883);
// или
mqtt.setHost("tcp://localhost:1883");

Управление параметрами MQTT

  • setClientId: Используется для установки идентификатора клиента сессии. Это то, что используется MQTT-сервером для идентификации сессии, где setCleanSession(false) используется. Идентификатор должен содержать 23 символа или меньше. По умолчанию используется автоматически сгенерированный идентификатор (основанный на адресе вашего сокета, порту и времени).

  • setCleanSession: Установите значение в false, если вы хотите, чтобы сервер MQTT сохранял подписки на темы и позиции подтверждений между сессиями клиентов. По умолчанию установлено значение true.

  • setKeepAlive: Настройка таймера Keep Alive в секундах. Определяет максимальный временной интервал между сообщениями, полученными от клиента. Это позволяет серверу обнаруживать, что соединение сети с клиентом прервано, без необходимости ждать долгого времени ожидания TCP/IP.

  • setUserName: Устанавливает имя пользователя, используемое для аутентификации против сервера.* setPassword : Устанавливает пароль, используемый для аутентификации против сервера.

  • setWillTopic : Если указан, сервер будет публиковать сообщение Will клиента на указанные темы при непредвиденном отключении клиента.

  • setWillMessage : Сообщение Will для отправки. По умолчанию это сообщение нулевой длины.

  • setWillQos : Устанавливает уровень качества обслуживания для сообщения Will. По умолчанию установлено значение QoS.AT_MOST_ONCE.

  • setWillRetain : Устанавливает значение true, если вы хотите, чтобы сообщение Will было опубликовано с опцией retain.

  • setVersion : Устанавливает значение "3.1.1", чтобы использовать версию MQTT 3.1.1. В противном случае используется протокол версии 3.1 по умолчанию.

Управление повторными подключениями

Подключение автоматически восстанавливается и заново устанавливается сессия связи, если произошла любая сетевая ошибка. Вы можете контролировать частоту попыток повторного подключения и определить максимальное количество попыток повторного подключения с помощью следующих методов:* setConnectAttemptsMax : Максимальное количество попыток повторного подключения перед тем, как ошибка будет возвращена клиенту после первой попытки подключения к серверу. Установите значение -1 для использования неограниченного количества попыток. По умолчанию установлено значение -1.

  • setReconnectAttemptsMax : Максимальное количество попыток повторного подключения перед тем, как ошибка будет возвращена клиенту после установления соединения с сервером. Установите значение -1 для использования неограниченного количества попыток. По умолчанию установлено значение -1.

  • setReconnectDelay : Количество времени в миллисекундах, которое должно пройти до первой попытки повторного подключения. По умолчанию установлено значение 10.

  • setReconnectDelayMax : Максимальное время ожидания в миллисекундах между попытками повторного подключения. По умолчанию установлено значение 30000.

  • setReconnectBackOffMultiplier : Экспоненциальный множитель затухания между попытками повторного подключения. Установите значение 1 для отключения экспоненциального затухания. По умолчанию установлено значение 2.### Настройка параметров сокетаВы можете отрегулировать некоторые параметры сокета, используя следующие методы:

  • setReceiveBufferSize: Устанавливает размер внутреннего буфера приема сокета. По умолчанию равен 65536 (64 КБ).

  • setSendBufferSize: Устанавливает размер внутреннего буфера отправки сокета. По умолчанию равен 65536 (64 КБ).

  • setTrafficClass: Устанавливает класс трафика или октет типа сервиса в заголовке IP для пакетов, отправленных через транспорт. По умолчанию равно 8, что означает, что трафик должен быть оптимизирован для пропускной способности.

Ограничение скорости соединений

Если вы хотите замедлить скорость чтения или записи ваших соединений, используйте следующие методы:

  • setMaxReadRate: Устанавливает максимальное количество байтов в секунду, которое этот транспорт будет принимать данных. Это настройка ограничивает чтение таким образом, чтобы скорость не была превышена. По умолчанию равно 0, что отключает ограничение.

  • setMaxWriteRate: Устанавливает максимальное количество байтов в секунду, которое этот транспорт будет отправлять данных. Это настройка ограничивает запись таким образом, чтобы скорость не была превышена. По умолчанию равно 0, что отключает ограничение.

Использование SSL-соединенийЕсли вы хотите подключиться через SSL/TLS вместо TCP, используйте префикс URI "ssl://" или "tls://" вместо "tcp://" для поля host. Для более детального контроля над тем, какой алгоритм используется, поддерживаются следующие значения протокола:* ssl:// — Использует версию SSL-алгоритма по умолчанию JVM.

  • sslv*:// — Использует конкретную версию SSL, где * является версией, поддерживаемой вашей JVM. Пример: sslv3.
  • tls:// — Использует версию TLS-алгоритма по умолчанию JVM.
  • tlsv*:// — Использует конкретную версию TLS, где * является версией, поддерживаемой вашей JVM. Пример: tlsv1.1.

Клиент будет использовать по умолчанию SSLContext JVM, который конфигурируется через системные свойства JVM, если вы не конфигурируете экземпляр MQTT с помощью метода setSslContext.

SSL-соединения выполняют блокирующие операции против внутреннего пула потоков, если вы не вызываете метод setBlockingExecutor, чтобы конфигурировать этот исполнитель, который они будут использовать вместо этого.

Выбор очереди диспатча

Очередь диспатча HawtDispatch используется для синхронизации доступа к соединению. Если явная очередь не конфигурируется через метод setDispatchQueue, то новая очередь создается для соединения. Установка явной очереди может быть полезна, если вам требуется несколько соединений, чтобы делиться одной и той же очередью для синхронизации.

Использование блокирующего API

Метод MQTT.connectBlocking устанавливает соединение и предоставляет вам соединение с блокирующим API.

BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
```Отправьте сообщения в тему с помощью метода `publish`:

```java
connection.publish("foo", "Привет".getBytes(), QoS.AT_LEAST_ONCE, false);

Вы можете подписаться на несколько тем с помощью метода subscribe:

Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
byte[] qoses = connection.subscribe(topics);

Затем получите и подтвердите прием сообщений с помощью методов receive и ack:

Message message = connection.receive();
System.out.println(message.getTopic());
byte[] payload = message.getPayload();
// Обработайте сообщение, затем:
message.ack();

Наконец, чтобы завершить соединение:

connection.disconnect();

Использование API на основе объектов Future

Метод MQTT.connectFuture устанавливает соединение и предоставляет вам соединение с API на основе объектов Future. Все операции против соединения являются асинхронными и возвращают результат через объект Future.

FutureConnection connection = mqtt.futureConnection();
Future<Void> f1 = connection.connect();
f1.await();

Future<byte[]> f2 = connection.subscribe(new Topic[]{new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)});
byte[] qoses = f2.await();

// Мы можем начать будущее получение...
Future<Message> receive = connection.receive();

// Отправьте сообщение...
Future<Void> f3 = connection.publish("foo", "Привет".getBytes(), QoS.AT_LEAST_ONCE, false);

// Затем получение сообщения будет выполнено.
Message message = receive.await();
message.ack();

Future<Void> f4 = connection.disconnect();
f4.await();

Использование API на основе обратного вызова/передачи продолженияМетод MQTT.connectCallback устанавливает соединение и предоставляет вам доступ к API на основе обратного вызова. Это самый сложный способ использования API, но он может обеспечить наилучшие показатели производительности. Блокирующий и асинхронный API используют этот API под капотом. Все операции над соединением являются асинхронными, а результаты операций передаются через интерфейсы обратного вызова, которые вы реализуете.```java

final CallbackConnection connection = mqtt.callbackConnection(); connection.listener(new Listener() {

public void onDisconnected() {
}

public void onConnected() {
}

});

```java
public void onPublish(String topic, byte[] payload, Runnable ack) {
    // Теперь вы можете обрабатывать полученное сообщение с темы.
    // После выполнения обработки запустите run() из объекта ack.
    ack.run();
}

public void onFailure(Throwable value) {
    connection.close(null); // Произошло отключение соединения.
}
})
connection.connect(new Callback<Void>() {
public void onFailure(Throwable value) {
    result.failure(value); // Если мы не смогли подключиться к серверу.
}

// После успешного подключения...
public void onSuccess(Void v) {

    // Подписываемся на тему
    Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
    connection.subscribe(topics, new Callback<byte[]>() {
        public void onSuccess(byte[] qoses) {
            // Результат запроса подписки.
        }
        public void onFailure(Throwable value) {
            connection.close(null); // Подписка не удалась.
        }
    });
}
```            // Отправляем сообщение в тему
            connection.publish("foo", "Привет".getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
                public void onSuccess(Void v) {
                  // Операция публикации завершена успешно.
                }
                public void onFailure(Throwable value) {
                    connection.close(null); // Публикация не удалась.
                }
            });
            
            // Для отключения...
            connection.disconnect(new Callback<Void>() {
                public void onSuccess(Void v) {
                  // Вызывается после отключения соединения.
                }
                public void onFailure(Throwable value) {
                  // Отключение никогда не проваливается.
                }
            });
        }
    });Каждое соединение имеет очередь обработки [HawtDispatch](http://hawtdispatch.fusesource.org/),
которую использует для обработки событий ввода-вывода для сокета. Очередь обработки является Executor,
обеспечивающим последовательное выполнение событий ввода-вывода и процессинга, а также используется для обеспечения синхронного доступа к соединению.
```Коллбэки будут выполняться в очереди dispatch, связанной с соединением, поэтому безопасно использовать соединение внутри коллбэка, но вы НЕ ДОЛЖНЫ выполнять никакие блокирующие операции внутри коллбэка. Если вам требуется выполнить какие-либо действия, которые МОГУТ вызвать блокировку, вы должны отправить их в другой пул потоков для выполнения. Кроме того, если другому потоку требуется взаимодействовать с соединением, он может сделать это только путём использования объекта Runnable, отправленного в очередь dispatch соединения. Пример выполнения Runnable в очереди dispatch соединения:

    connection.getDispatchQueue().execute(new Runnable(){
        public void run() {
            connection.publish(...);
        }
    });

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

MQTT-клиент предоставляет лицензированный по ASL 2.0 API для MQTT. Он обеспечивает автоматическое переподключение к вашему MQTT-серверу и восстановление сессии клиента в случае каких-либо сетевых сбоев. Развернуть Свернуть
Apache-2.0
Отмена

Обновления

Пока нет обновлений

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://api.gitlife.ru/oschina-mirror/iots-mqtt-client.git
git@api.gitlife.ru:oschina-mirror/iots-mqtt-client.git
oschina-mirror
iots-mqtt-client
iots-mqtt-client
master