MQTT — это протокол связи "машинное устройство к машинному устройству" (M2M)/"Интернет вещей". Он был спроектирован как очень лёгковесный протокол публикации/подписки. Этот протокол полезен для соединений с удалёнными местами, где требуется небольшой размер кода или когда пропускная способность сети ограничена.
mqtt-client
предоставляет API к MQTT с лицензией ASL 2.0. Он автоматически восстанавливает соединение с вашим сервером MQTT и возвращает состояние клиента при возникновении сетевых ошибок. Приложения могут использовать стиль API с блокировкой, основанный на будущих значениях или стиль API с обратными вызовами/передачей продолжений.
Добавьте следующее в свой файл maven pom.xml
.
<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
<version>1.12</version>
</dependency>
Добавьте следующее в свой файл Gradle.
compile 'org.fusesource.mqtt-client:mqtt-client:1.12'
Скачайте файл uber-jar и добавьте его в вашу систему сборки. Uber-jar содержит все необходимые зависимости, которые использует mqtt-client из других проектов.
совместимый с JVM Java 1.4. Эта версия файла jar не поддерживает соединения SSL, так как класс SSLEngine, используемый для реализации SSL на NIO, был введен только в Java 1.5.## Настройка соединения MQTT
API с блокировкой, основанного на будущих значениях и API с обратными вызовами используют одинаковое настроение соединения. Вы создаёте новый экземпляр класса MQTT
и конфигурируете его с параметрами соединения и сокета. Минимально необходимо вызвать метод setHost
перед попыткой установки соединения.
MQTT mqtt = new MQTT();
mqtt.setHost("localhost", 1883);
// или
mqtt.setHost("tcp://localhost:1883");
setClientId
: Используется для установки идентификатора клиента сессии. Это то, что используется MQTT-сервером для идентификации сессии, где setCleanSession(false)
используется. Идентификатор должен содержать 23 символа или меньше. По умолчанию используется автоматически сгенерированный идентификатор (основанный на адресе вашего сокета, порту и времени).
setCleanSession
: Установите значение в false, если вы хотите, чтобы сервер MQTT сохранял подписки на темы и позиции подтверждений между сессиями клиентов. По умолчанию установлено значение true.
setKeepAlive
: Настройка таймера Keep Alive в секундах. Определяет максимальный временной интервал между сообщениями, полученными от клиента. Это позволяет серверу обнаруживать, что соединение сети с клиентом прервано, без необходимости ждать долгого времени ожидания TCP/IP.
setUserName
: Устанавливает имя пользователя, используемое для аутентификации против сервера.* setPassword
: Устанавливает пароль, используемый для аутентификации против сервера.
setWillTopic
: Если указан, сервер будет публиковать сообщение Will клиента на указанные темы при непредвиденном отключении клиента.
setWillMessage
: Сообщение Will для отправки. По умолчанию это сообщение нулевой длины.
setWillQos
: Устанавливает уровень качества обслуживания для сообщения Will. По умолчанию установлено значение QoS.AT_MOST_ONCE.
setWillRetain
: Устанавливает значение true, если вы хотите, чтобы сообщение Will было опубликовано с опцией retain.
setVersion
: Устанавливает значение "3.1.1", чтобы использовать версию MQTT 3.1.1. В противном случае используется протокол версии 3.1 по умолчанию.
Подключение автоматически восстанавливается и заново устанавливается сессия связи, если произошла любая сетевая ошибка. Вы можете контролировать частоту попыток повторного подключения и определить максимальное количество попыток повторного подключения с помощью следующих методов:* setConnectAttemptsMax
: Максимальное количество попыток повторного подключения перед тем, как ошибка будет возвращена клиенту после первой попытки подключения к серверу. Установите значение -1 для использования неограниченного количества попыток. По умолчанию установлено значение -1.
setReconnectAttemptsMax
: Максимальное количество попыток повторного подключения перед тем, как ошибка будет возвращена клиенту после установления соединения с сервером. Установите значение -1 для использования неограниченного количества попыток. По умолчанию установлено значение -1.
setReconnectDelay
: Количество времени в миллисекундах, которое должно пройти до первой попытки повторного подключения. По умолчанию установлено значение 10.
setReconnectDelayMax
: Максимальное время ожидания в миллисекундах между попытками повторного подключения. По умолчанию установлено значение 30000.
setReconnectBackOffMultiplier
: Экспоненциальный множитель затухания между попытками повторного подключения. Установите значение 1 для отключения экспоненциального затухания. По умолчанию установлено значение 2.### Настройка параметров сокетаВы можете отрегулировать некоторые параметры сокета, используя следующие методы:
setReceiveBufferSize
: Устанавливает размер внутреннего буфера приема сокета. По умолчанию равен 65536 (64 КБ).
setSendBufferSize
: Устанавливает размер внутреннего буфера отправки сокета. По умолчанию равен 65536 (64 КБ).
setTrafficClass
: Устанавливает класс трафика или октет типа сервиса в заголовке IP для пакетов, отправленных через транспорт. По умолчанию равно 8
, что означает, что трафик должен быть оптимизирован для пропускной способности.
Если вы хотите замедлить скорость чтения или записи ваших соединений, используйте следующие методы:
setMaxReadRate
: Устанавливает максимальное количество байтов в секунду, которое этот транспорт будет принимать данных. Это настройка ограничивает чтение таким образом, чтобы скорость не была превышена. По умолчанию равно 0, что отключает ограничение.
setMaxWriteRate
: Устанавливает максимальное количество байтов в секунду, которое этот транспорт будет отправлять данных. Это настройка ограничивает запись таким образом, чтобы скорость не была превышена. По умолчанию равно 0, что отключает ограничение.
"ssl://"
или "tls://"
вместо "tcp://"
для поля host
. Для более детального контроля над тем, какой алгоритм используется, поддерживаются следующие значения протокола:* ssl://
— Использует версию SSL-алгоритма по умолчанию JVM.sslv*://
— Использует конкретную версию SSL, где *
является версией, поддерживаемой вашей JVM. Пример: sslv3
.tls://
— Использует версию TLS-алгоритма по умолчанию JVM.tlsv*://
— Использует конкретную версию TLS, где *
является версией, поддерживаемой вашей JVM. Пример: tlsv1.1
.Клиент будет использовать по умолчанию SSLContext
JVM, который конфигурируется через системные свойства JVM, если вы не конфигурируете экземпляр MQTT с помощью метода setSslContext
.
SSL-соединения выполняют блокирующие операции против внутреннего пула потоков, если вы не вызываете метод setBlockingExecutor
, чтобы конфигурировать этот исполнитель, который они будут использовать вместо этого.
Очередь диспатча HawtDispatch используется для синхронизации доступа к соединению. Если явная очередь не конфигурируется через метод setDispatchQueue
, то новая очередь создается для соединения. Установка явной очереди может быть полезна, если вам требуется несколько соединений, чтобы делиться одной и той же очередью для синхронизации.
Метод MQTT.connectBlocking
устанавливает соединение и предоставляет вам соединение с блокирующим API.
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
```Отправьте сообщения в тему с помощью метода `publish`:
```java
connection.publish("foo", "Привет".getBytes(), QoS.AT_LEAST_ONCE, false);
Вы можете подписаться на несколько тем с помощью метода subscribe
:
Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
byte[] qoses = connection.subscribe(topics);
Затем получите и подтвердите прием сообщений с помощью методов receive
и ack
:
Message message = connection.receive();
System.out.println(message.getTopic());
byte[] payload = message.getPayload();
// Обработайте сообщение, затем:
message.ack();
Наконец, чтобы завершить соединение:
connection.disconnect();
Метод MQTT.connectFuture
устанавливает соединение и предоставляет вам соединение с API на основе объектов Future. Все операции против соединения являются асинхронными и возвращают результат через объект Future.
FutureConnection connection = mqtt.futureConnection();
Future<Void> f1 = connection.connect();
f1.await();
Future<byte[]> f2 = connection.subscribe(new Topic[]{new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)});
byte[] qoses = f2.await();
// Мы можем начать будущее получение...
Future<Message> receive = connection.receive();
// Отправьте сообщение...
Future<Void> f3 = connection.publish("foo", "Привет".getBytes(), QoS.AT_LEAST_ONCE, false);
// Затем получение сообщения будет выполнено.
Message message = receive.await();
message.ack();
Future<Void> f4 = connection.disconnect();
f4.await();
MQTT.connectCallback
устанавливает соединение и предоставляет вам доступ к API на основе обратного вызова. Это самый сложный способ использования API, но он может обеспечить наилучшие показатели производительности. Блокирующий и асинхронный API используют этот API под капотом. Все операции над соединением являются асинхронными, а результаты операций передаются через интерфейсы обратного вызова, которые вы реализуете.```javafinal CallbackConnection connection = mqtt.callbackConnection(); connection.listener(new Listener() {
public void onDisconnected() {
}
public void onConnected() {
}
});
```java
public void onPublish(String topic, byte[] payload, Runnable ack) {
// Теперь вы можете обрабатывать полученное сообщение с темы.
// После выполнения обработки запустите run() из объекта ack.
ack.run();
}
public void onFailure(Throwable value) {
connection.close(null); // Произошло отключение соединения.
}
})
connection.connect(new Callback<Void>() {
public void onFailure(Throwable value) {
result.failure(value); // Если мы не смогли подключиться к серверу.
}
// После успешного подключения...
public void onSuccess(Void v) {
// Подписываемся на тему
Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
connection.subscribe(topics, new Callback<byte[]>() {
public void onSuccess(byte[] qoses) {
// Результат запроса подписки.
}
public void onFailure(Throwable value) {
connection.close(null); // Подписка не удалась.
}
});
}
``` // Отправляем сообщение в тему
connection.publish("foo", "Привет".getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
public void onSuccess(Void v) {
// Операция публикации завершена успешно.
}
public void onFailure(Throwable value) {
connection.close(null); // Публикация не удалась.
}
});
// Для отключения...
connection.disconnect(new Callback<Void>() {
public void onSuccess(Void v) {
// Вызывается после отключения соединения.
}
public void onFailure(Throwable value) {
// Отключение никогда не проваливается.
}
});
}
});Каждое соединение имеет очередь обработки [HawtDispatch](http://hawtdispatch.fusesource.org/),
которую использует для обработки событий ввода-вывода для сокета. Очередь обработки является Executor,
обеспечивающим последовательное выполнение событий ввода-вывода и процессинга, а также используется для обеспечения синхронного доступа к соединению.
```Коллбэки будут выполняться в очереди dispatch, связанной с соединением, поэтому безопасно использовать соединение внутри коллбэка, но вы НЕ ДОЛЖНЫ выполнять никакие блокирующие операции внутри коллбэка. Если вам требуется выполнить какие-либо действия, которые МОГУТ вызвать блокировку, вы должны отправить их в другой пул потоков для выполнения. Кроме того, если другому потоку требуется взаимодействовать с соединением, он может сделать это только путём использования объекта Runnable, отправленного в очередь dispatch соединения. Пример выполнения Runnable в очереди dispatch соединения:
connection.getDispatchQueue().execute(new Runnable(){
public void run() {
connection.publish(...);
}
});
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )