Китайский | Английский
Mqttx
разработан на основе протокола MQTT v3.1.1 с целью предоставления лёгко используемого и высокоэффективного broker'а MQTT.
Примечание: ветка v1.2
требует JDK17, остальные ветки требуют JDK8
Связанные проекты: Mqttx-Client реализация и использование очень простого клиента MQTTv3.1.1.
Хотите быстро попробовать через Docker? Смотрите Запуск с помощью Docker
Упаковка
redis
экземплярmvnw -P dev -DskipTests=true clean package
Запуск
java -jar mqttx-1.0.5.BETA.jar
Иллюстрация:
Другие примечания:1. Проект использует lombok, для использования с IDE установите соответствующие плагины.
Рекомендованное средство разработки — IntelliJ IDEA
Пример: в
idea
требуется установка плагинаLombok
, а также активацияEnable annotation processing
черезsettings > Build, Execution, Deployment > Compiler > Annotation Processor
.
Облачные сервисы истекли, доступ к экземплярам невозможен, есть ли друзья, готовые помочь? /(ㄒoㄒ)/~~> В облаке запущено одноэкземпельное служение mqttx
, доступное для функционального тестирования:
- Поддерживает работу без SSL.
- Активирован WebSocket, можно использовать для тестирования через http://ws.tool.tusk.link/, заменив домен на
119.45.158.51
(порт и адрес остаются без изменений).- Поддерживает функцию совместного подписывания.
- Версия развертывания v1.0.6.RELEASE.
mqttx
поддерживает аутентификацию клиентов и управление правами доступа к темам публикаций/подписок, если требуется использование вместе, рекомендуется следующая архитектура:
Сервис аутентификации клиентов реализуется пользователем самостоятельно
Внутренняя реализация (перечислены ключевые компоненты):
├─java │ └─com │ └─jun │ └─mqttx │ ├─broker # Реализация протокола MQTT и обработка сообщений │ │ ├─codec # Кодирование и декодирование │ │ └─handler # Обработчики сообщений (публикация, подписка, соединение и т.д.) │ ├─config # Конфигурация, главным образом объявление бинов │ ├─constants # Константы │ ├─consumer # Потребители сообщений в кластере │ ├─entity # Энтитеты │ ├─exception # Исключения │ ├─service # Интерфейсы бизнес-сервисов (аутентификация пользователей, хранение сообщений и т.д.) │ │ └─impl # По умолчанию реализация │ └─utils # Утилиты └─resources # Ресурсы (файл application.yml находится здесь) ├─META-INF # Дополнительные конфигурации Spring └─tls # Адрес расположения CA
Изображение загружено на **Docker-Hub**, доступ к нему: [fantasywujun/mqttx - Docker Hub](https://hub.docker.com/r/fantasywujun/mqttx) все изображения. После установки окружения Docker выполните команду `docker-compose -f ./docker-compose.yml up` для запуска:
Результат запуска показан ниже:

| Команда docker pull | Описание |
| ----------------------------------------- | -------------------------------------------------------------------------------------------------- |
| `docker pull fantasywujun/mqttx:1.2.0` | Версия `mqttx:1.2.0` на основе `jdk17.0.1` |
| `docker pull fantasywujun/mqttx:1.2.1` | Версия `mqttx:1.2.1` на основе `jdk17.0.1` |
| `docker pull fantasywujun/mqttx:1.2.2` | Версия `mqttx:1.2.2` на основе `jdk17.0.1` |
| `docker pull fantasywujun/mqttx:1.2.3` | Версия `mqttx:1.2.3` на основе `jdk17.0.1` |
**docker-compose** файл содержит следующее:
```yaml
version: "2"
services:
redis:
container_name: redis-for-mqttx
image: redis
mqttx:
container_name: mqttx
image: fantasywujun/mqttx:1.2.2
environment:
mqttx.max-bytes-in-message: 10485760
mqttx.web-socket.enable: false
ports:
- 1883:1883
QoS0 | QoS1 | QoS2 |
---|---|---|
Поддерживается | Поддерживается | Поддерживается |
Для поддержки QoS1
и QoS2
используется Redis
в качестве хранилища данных. Эта часть уже реализована как интерфейсы, которые можно заменить своими реализациями (например, с использованием MySQL
).
#
и одиночные шаблоны +
./
, например a/b/
. Вместо этого используйте a/b
.Примеры:
topicFilter | Соответствующие темы |
---|---|
/a/b/+ |
/a/b/abc , /a/b/test
|
a/b/# |
a/b , a/b/abc , a/b/c/def
|
a/+/b/# |
a/nani/b/abc |
/+/a/b/+/c |
/aaa/a/b/test/c |
Инструмент для проверки находится здесь: com.jun.mqttx.utils.TopicUtils
.
Mqttx
зависит от промежуточного звена для распределения сообщений для поддержки кластера. Поддерживаются следующие промежуточные звенья:
Kafka
– рекомендованная конфигурация для лучшей производительности. Kafka используется в качестве распространителя сообщений для кластера.Redis
– по умолчанию.Реализация принципа представлена ниже:
mqttx.cluster.enable
– ключевой параметр, значение по умолчанию false
.mqttx.cluster.type
– тип промежуточного звена, значение по умолчанию redis
.Примечания:
v1.0.5.RELEASE
функционал кластера имеет ошибки и недоступен.kafka
как кластерного распространителя сообщений требуется ручное изменение конфигурации файла application-*.yml
. Пример конфигурации можно найти в разделе 3. kafka кластер файла application-dev.yml
.#### 4.4 Поддержка SSLДля активации SSL вам потребуется наличие CA (самозаверенного или приобретённого сертификата). Затем следует изменить несколько параметров в файле application.yml
:
mqttx.ssl.enable
– ключевой параметр, значение по умолчанию false
, управляет состоянием SSL для websocket
и socket
.mqttx.ssl.key-store-location
– путь к keystore, относительно classpath
.mqttx.ssl.key-store-password
– пароль для keystore.mqttx.ssl.key-store-type
– тип keystore, например PKCS12
.mqttx.ssl.client-auth
– требование проверки клиентского сертификата серверной стороной, значение по умолчанию NONE
.Файл
mqttx.keystore
в директорииresources/tls
предназначен для тестирования, пароль:123456
.Инструмент для загрузки сертификатов находится в классе
com/jun/mqttx/utils/SslUtils.java
.
Чтобы ограничивать подписку на темы клиентами, внедряется механизм проверки прав доступа к подписке и публикации тем:
mqttx.enable-topic-sub-pub-secure
– ключевой параметр, значение по умолчанию false
.
При получении брокером соединения (conn
) запроса, он запрашивает {clientId, username, password}
у mqttx.auth.url
. Ответ от этого интерфейса содержит объект с полями authorizedSub, authorizedPub
, содержащими список авторизованных тем для подписки и публикации соответственно.
Подробнее см. раздел 4.12 Базовая поддержка аутентификации.3. Брокер проверяет права клиента при каждом событии подписки и публикации.
Поддерживаемые типы тем:
Общий подписчик является частью протокола mqtt5
. MQTTX
реализует его согласно стандарту.
$share/{ShareName}/{фильтр}
, где $share
— префикс, ShareName
— имя общего подписчика, фильтр
— фильтр неподключенного подписчика.round
– циклическое распределение.random
– случайное распределение.Для получения более подробной информации обратитесь к протоколу MQTT Version 5.0 (oasis-open.org).
Ниже приведена схема, демонстрирующая различия между общими темами и обычными темами:
Стратегия распространения сообщений msg-a
зависит от конфигурационного параметра mqttx.share-topic.share-sub-strategy
.
Эта стратегия может использоваться вместе с сессией cleanSession = 1
, при которой клиент, использующий общую тему, будет удален из подписок после отключения соединения сервером. Это означает, что сообщения будут доставлены только активным клиентам.Описание CleanSession: согласно протоколу MQTT 3.1.1, если cleanSession = 1
, то при отключении соединения клиент и сервер обязаны удалить любую предыдущую сессию и начать новую. Эта сессия продолжается до тех пор, пока существует сетевое соединение. Данные состояния этой сессии не должны повторно использоваться в последующих сессиях [MQTT-3.1.2-6].Состояние сессии клиента состоит из следующего:
Состояние сессии сервера состоит из следующего:
Поддерживается
mqttx брокер имеет встроенные системные темы, которые могут быть использованы пользователями по мере необходимости.
Системные темы не поддерживают следующие возможности:
Внимание: механизмы безопасности тем также влияют на возможность подписки клиентов на системные темы; незарегистрированные клиенты не смогут подписываться на системные темы. Системные темы можно разделить на два типа:1. Темы состояния: отображают состояние broker. 2. Функциональные темы: предоставляют внешнюю функциональную поддержку.
Клиенты могут получать состояние broker, подписываясь на системные темы. В настоящее время система поддерживает следующие темы состояния:| Тема | Описание |
| --------------------------------------- | ---------------------------------------------------------- |
| $SYS/broker/{brokerId}/status
| Подписчики этой темы будут регулярно (через mqttx.sys-topic.interval
) получать состояние broker, которое включает все значения ниже.
Примечание: После отключения соединения клиентская подписка будет автоматически отменена. |
| $SYS/broker/activeConnectCount
| Немедленно возвращает текущее количество активных подключений.
Триггер: каждое новое подключение триггерит событие. |
| $SYS/broker/time
| Немедленно возвращает текущий временной штамп.
Триггер: каждый новый запрос триггерит событие. |
| $SYS/broker/version
| Немедленно возвращает версию broker.
Триггер: каждый новый запрос триггерит событие. |
| $SYS/broker/receivedMsg
| Немедленно возвращает общее количество принятых сообщений MQTT с момента запуска broker (не считая ping).
Триггер: каждый новый запрос триггерит событие. |
| $SYS/broker/sendMsg
| Немедленно возвращает общее количество отправленных сообщений MQTT с момента запуска broker (не считая pingAck).
Триггер: каждый новый запрос триггерит событие. |
| $SYS/broker/uptime
| Немедленно возвращает время работы broker в секундах.
Триггер: каждый новый запрос триггерит событие. |
| $SYS/broker/maxActiveConnectCount
| Немедленно возвращает максимальное количество активных TCP-соединений с момента запуска broker. |
Триггер: каждый новый запрос triggeрит событие.|В системной теме $SYS/broker/{brokerId}/status
значение brokerId является конфигурационным параметром (см. 6.1 Конфигурация) и может использоваться вместе с темой $SYS/broker/+/status
.
Формат ответа представлен в виде строки JSON:
{
"activeConnectCount": 1,
"maxActiveConnectCount": 2,
"receivedMsg": 6,
"sendMsg": 77,
"timestamp": "2021-03-23T23:05:37.035",
"uptime": 149,
"version": "1.0.7.RELEASE"
}
поле | описание |
---|---|
activeConnectCount |
текущее количество активных соединений |
maxActiveConnectCount |
максимальное количество активных соединений |
receivedMsg |
количество полученных сообщений, за исключением ping |
sendMsg |
количество отправленных сообщений, за исключением pingAck |
timestamp |
метка времени; (yyyy-MM-ddTHH:mm:ss ) |
uptime |
время работы брокера в секундах |
version |
версия mqttx
|
Этот функциональный запрос был создан в задаче: Отслеживание состояния MQTT-клиента (в сети, вне сети) · Issue #8 · Amazingwujun/mqttx (github.com)| Тема | Описание |
| -------------------------------------------------------------------------------------- | ------------------------------------------------------------ |
| $SYS/broker/{brokerId}/clients/{clientId}/connected
| Тема уведомления о подключении клиента
Триггер: когда клиент подключается, broker отправляет сообщение на эту тему |
| $SYS/broker/{brokerId}/clients/{clientId}/disconnected
| Тема уведомления о отключении клиента
Триггер: когда клиент отключается, broker отправляет сообщение на эту тему |Две системные темы поддерживают шаблонные сопоставления. Примеры:
$SYS/broker/+/clients/#
: Соответствует темам уведомлений о подключении и отключении клиента$SYS/broker/+/clients/+/connected
: Соответствует теме уведомления о подключении клиента$SYS/broker/+/clients/+/disconnected
: Соответствует теме уведомления о отключении клиентаПоддерживает промежуточные системы:
Функция моста сообщений позволяет легко интегрироваться с промежуточной системой.
mqttx.message-bridge.enable
: Включение функции моста сообщенийmqttx.bridge-topics
: Темы сообщений, которые требуется мостить, должны соответствовать требованиям topic для Kafka
При получении сообщения от клиента publish, сначала проверяется, включен ли мост сообщений, затем проверяются темы сообщений, и если они соответствуют темам для моста, сообщение публикуется на MQ.
Поддерживается односторонний мост: устройство(клиент) => mqttx => MQ
Используется алгоритм бакета токенов com.jun.mqttx.utils.RateLimiter
для ограничения скорости для указанных тем.
Алгоритм бакета токенов см. здесь: https://stripe.com/blog/rate-limiters
Краткое объяснение концепции бакета токенов: имеется бакет с максимальной емкостью
capacity
, который заполняется токенами со скоростьюreplenish-rate
. При каждом вызове интерфейса расходуется определенное количество токенов (token-consumed-per-acquire
). Если токены доступны, запрос проходит.*Ограничение скорости применяется только к сообщениям сqos
равным 0.
Пример конфигурации:
mqttx:
rate-limiter:
enable: true
topic-rate-limits:
# Пример 1
- topic: "/test/a"
capacity: 9
replenish-rate: 4
token-consumed-per-acquire: 3
# Пример 2
- topic: "/test/b"
capacity: 5
replenish-rate: 5
token-consumed-per-acquire: 2
capacity
: емкость бакаreplenish-rate
: скорость пополнения токенамиtoken-consumed-per-acquire
: количество токенов, потребляемых при каждом запросеФормула расчета QPS:
QPS = capacity ÷ token-consumed-per-acquire
9 ÷ 3 = 3
5 ÷ 2 = 2.5
QPS = replenish-rate ÷ token-consumed-per-acquire
4 ÷ 3 ≈ 1.3
5 ÷ 2 = 2.5
Устойчивое хранение в mqttx
зависит от redis
. mqttx
будет устойчиво хранить сообщения с параметрами cleanSession = false & qos > 0
. Сообщения сериализуются объектом Serializer
в массив байтов и затем сохраняются в redis
.
На данный момент mqttx
предоставляет два варианта сериализации:
JsonSerializer
KryoSerializer
По умолчанию используется JsonSerializer
, чтобы обеспечить совместимость с предыдущими проектами; после версии v1.0.6.release
KryoSerializer
станет основным сериализатором.
Сериализацию можно изменить путем конфигурации параметра mqttx.serialize-strategy
.#### 4.12 Поддержка базовой аутентификации
mqttx
предоставляет базовый клиентский сервис аутентификации.
Параметры конфигурации:
mqttx.auth.url
: адрес интерфейса для аутентификации.mqttx.auth.timeout
: время ожидания ответа от HttpClient
.mqttx.auth.is-mandatory
: обязательность проверки имени пользователя и пароля.Пользователи могут объявить параметр mqttx.auth.url
в файле конфигурации. Объект com.jun.mqttx.service.impl.DefaultAuthenticationServiceImpl
использует HttpClient
для отправки POST-запроса на адрес mqttx.auth.url
.
Запрос содержит данные mqtt пакета: username, password
.
POST / HTTP/1.1
Host: mqttx.auth.url
Content-Type: application/json
Content-Length: 91
{
"clientId": "device_id_test",
"username": "mqttx",
"password": "123456"
}
При успешной аутентификации ответ представляет собой JSON-строку:
{
"authorizedSub": [
"subTopic1",
"subTopic2"
],
"authorizedPub": [
"pubTopic1",
"pubTopic2"
]
}
Успешный ответ может использоваться вместе с 4.5 поддержка безопасности темы.
Примечание:
http status = 200
, что указывает на успешную аутентификацию, все остальные значения статуса считаются неудачной аутентификацией
Благодарим JetBrains за предоставленные лицензии для открытых проектов 2. Ветка с длительной поддержкой и обновлением
v1.0
: основанная на jdk8
, при этом режим работы Redis IO — блокирующий.v1.2
: основанная на jdk17
, при этом режим работы Redis IO — неблокирующий.3. Чтобы сделать проект mqttx лучше, пожалуйста, активно сообщайте мне о вашем использовании и обучении этому проекту (откройте issue или присоединитесь к группе).Будущие задачи
v1.0.8.RELEASE
v1.1.0.RELEASE
v1.2
v2.0
Версия v1.2
была обновлена с JDK8 до JDK17
Ветка v2.0
будет начинаться с протокола MQTTv5
Рассмотрите использование протокола gossip для реализации функционала кластера, который больше не будет зависеть от Redis или Kafka
Поддержите автора чашкой шоколадного латте 😊
Общение в группе
В директории src/main/resources
находятся три файла конфигураций:
application.yml
application-dev.yml
application-prod.yml
Два последних файла предназначены для разделения конфигураций в различных окружениях, что облегчает управление ими.Описание параметров конфигурации:
Конфигурация | Значение по умолчанию | Описание |
---|---|---|
mqttx.version |
Берется из pom.xml
|
Версия |
mqttx.broker-id |
Берется из pom.xml
|
Уникальный идентификатор приложения |
mqttx.heartbeat |
60s |
Начальное значение сердцебиения, которое будет сброшено сообщением CONN |
mqttx.host |
0.0.0.0 |
Адрес прослушивания |
mqttx.so-backlog |
512 |
Очередь обработки TCP соединений |
mqttx.enable-topic-sub-pub-secure |
false |
Безопасность подписки/публикации тем, включается ограничение клиентских публикаций/подписок |
mqttx.ignore-client-self-pub |
true |
Пропуск сообщений от клиента, отправленных самому себе (когда клиент отправляет сообщение в тему, которую он сам подписал) |
mqttx.max-bytes-in-message |
8092 |
Максимальный размер сообщения, который может принимать mqttx, единицы измерения — байты. |
mqttx.serialize-strategy |
json |
Стратегия сериализации, используемая broker , стратегия кластера должна быть одинаковой. |
mqttx.redis.cluster-session-hash-key |
mqttx.session. |
Ключ хэша сессий Redis для кластера |
mqttx.redis.topic-prefix |
mqttx:topic: |
Префикс темы; используется для сохранения отношения между темами и клиентами |
mqttx.redis.retain-message-prefix |
mqttx:retain: |
Префикс сообщений для хранения; используется для сохранения сообщений retain |
| mqttx.redis.pub-msg-set-prefix
| mqttx:client:pubmsg:
| префикс Redis set для клиентских сообщений pub; сохраняет pubmsg, удаляет после получения puback и pubrec |
| mqttx.redis.pub-rel-msg-set-prefix
| mqttx:client:pubrelmsg:
| префикс Redis set для сообщений pubRel; сохраняет флаг pubrel сообщений, удаляет после получения pubcomp |
| mqttx.redis.topic-set-key
| mqttx:alltopic
| ключ Redis set для всех тем; сохраняет все темы |
| mqttx.redis.message-id-prefix
| mqttx:messageId:
| префикс messageId для клиентов без cleanSession
; использует redis INCR
команду |
| mqttx.redis.client-topic-set-prefix
| mqttx:client:topicset:
| префикс Redis set для подписанных тем клиента; сохраняет все подписанные темы клиента |
| mqttx.cluster.enable
| false
| включение режима работы в кластере |
| mqttx.cluster.inner-cache-consistancy-key
| mqttx:cache_consistency
| ключ для проверки согласованности кэша при запуске приложения |
| mqttx.cluster.type
| redis
| тип кластера |
| mqttx.ssl.enable
| false
| включение SSL |
| mqttx.ssl.client-auth
| NONE
| аутентификация сертификата клиента |
| mqttx.ssl.key-store-location
| classpath:tls/mqttx.keystore
| расположение keyStore |
| mqttx.ssl.trust-store-location
| classpath:tls/mqttx.truststore
| расположение trustStore |
Замечено, что последний столбец был прерван, поэтому продолжаем его:
| mqttx.ssl.key-store-password
| password
| пароль доступа к keyStore |
| mqttx.ssl.trust-store-password
| password
| пароль доступа к trustStore |```markdown
Параметр | Значение | Описание | |
---|---|---|---|
1 | key-store-password |
123456 |
Пароль keyStore |
2 | mqttx.ssl.key-store-type |
pkcs12 |
Тип keyStore |
3 | mqttx.socket.enable |
true |
Включение сокета |
4 | port |
1883 |
Порт прослушивания |
5 | mqttx.websocket.enable |
false |
Включение WebSocket |
6 | mqttx.websocket.port |
8083 |
Порт прослушивания WebSocket |
7 | mqttx.websocket.path |
/mqtt |
Путь WebSocket |
8 | mqttx.share-topic.share-sub-strategy |
round |
Стратегия балансировки нагрузки, поддерживает случайное распределение и циклическое распределение |
9 | mqttx.sys-topic.enable |
false |
Включение системной темы |
10 | mqttx.sys-topic.interval |
60s |
Интервал регулярного публикационного сообщения |
11 | mqttx.message-bridge.enable |
false |
Включение моста между сообщениями |
topics` | `null` | Список тем для моста между сообщениями |
|mqttx. rate-limiter. enable| `false` | Включение ограничения скорости сообщений |
|mqttx. rate-limiter. token-rate-limit| | Пример конфигурации, см. [Поддержка ограничения скорости сообщений](#410-тема-ограничения-скорости-сообщений) |
|mqttx. auth. url| `null` | Адрес интерфейса аутентификации MQTT conn username/password |
|mqttx. auth. timeout| `3s` | Часовой лимит чтения |
|mqttx. auth. is-mandatory| `false` | Обязательность проверки имени пользователя и пароля в сообщении conn |
|mqttx. sharable-payload.| | ||
payload-key-prefix| mqttx:sharable-payload:| префикс ключей Redis для хранения общего груза |
|mqttx. sharable-payload. unique-id-client-ids-set-prefix| mqttx:unique-id:client-ids:| список ассоциированных с общим грузом client_id |
|mqttx. sharable-payload. clean-work-interval| 1m| интервал времени между задачами очистки общего груза |
|mqttx. sharable-payload. threshold-in-message| 128| пороговое значение для активации общего груза; если значение превышает порог, то происходит разделение груза |
```
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )