中文 | English
Mqttx
основан на протоколе MQTT v3.1.1 и направлен на предоставление Mqtt брокера, который является легко используемым и имеет высокие характеристики производительности.mvnw -P test -DskipTests=true clean package
redis
mvnw -P dev -DskipTests=true clean package
java -jar mqttx-1.0.5.BETA.jar
Быстрый старт — режим тестирования Легенда:
Режим тестирования
redis
Режим разработки
redis
, по умолчанию соединение с localhost:6376
без пароляНазываемый режим тестирования и режим разработки предназначен для студентов, чтобы быстро начинать проекты и проверять функциональные тесты. После ознакомления с проектом, студенты могут изменить 6.1 Настройка элемента, чтобы включать или отключать функции, предоставляемые mqttx
.> mqttx
зависит от redis
для обеспечения надежности сообщений, кластеризации и других функций. Также можно использовать другие промежуточные серверы (mysql
, mongodb
, kafka
и т.д.). В то же время springboot
имеет плuggable компоненты типа spring-boot-starter-***
, что удобно для всех для изменения стандартной реализации.### 1.2 Зависимости проекта
Другие инструкции:
Рекомендуется использовать IntelliJ IDEA как средство разработки
Пример:
idea
требует установки плагинаLombok
,settings > Build, Execution, Deployment > Compiler > Annotation Processor
, чтобы активироватьEnable annotation processing
Единственный сервис mqttx
развернут в облаке для функционального тестирования:
119.45.158.51
(порт и адрес остаются без изменений)v1.0.5.BETA
mqttx
поддерживает функции аутентификации клиентов и тематической публикации/подписки. Если вам нужно использовать эти функции вместе, рекомендованная архитектура следующая:
Аутентификационные службы клиентов реализуются пользователями самостоятельно
Внутренние отношения реализационного фреймворка (перечислены только ключевые пункты):
### 2.1 Структура директорий
├─java
│ └─com
│ └─jun
│ └─mqttx
│ ├─broker # пакет реализации и обработки протокола MQTT
│ │ ├─codec # кодеки
│ │ └─handler # обработчик сообщений (публикация, подписка, соединение и т.д.)
│ ├─config # конфигурация, в основном объявление бинов
│ ├─constants # константы
│ ├─consumer # потребитель сообщений кластера
│ ├─entity # класс сущностей
│ ├─exception # класс исключений
│ ├─service # бизнес-сервис (аутентификация пользователя, хранение сообщений и т.д.) интерфейс
│ │ └─impl # базовая реализация
│ └─utils # инструменты
└─resources # файлы ресурсов (application.yml находится в этой папке)
└─tls # адрес хранения CA
Для ускорения локального развертывания проекта используется Docker.
- Перед выполнением локального развертывания вам нужно сначала скачать Docker.
- Соответствие портов (
1883, 8083
) жестко закреплено в файле docker-compose. Если вы измените конфигурацию портовmqttx
, то также следует сделать это вdocker-compose.yml
.
Dockerfile
, и выполните команду docker build -t mqttx:v1.0.4.RELEASE .
.docker-compose up
.## 4 Описание функцийQoS0 | QoS1 | QoS2 |
---|---|---|
Поддерживается | Поддерживается | Поддерживается |
Чтобы поддерживать QoS1 и QoS2, используется Redis в качестве слоя хранения данных. Эта часть была упакована в виде интерфейса, который может быть заменён самим собой (например, использование MySQL).
#
и одиночные звездочки +
./
, не поддерживаются, например a/b/
, пожалуйста, измените на a/b
.Mqttx проверяет только подписанные темы
topicFilter
. Публикуемые темы не проверяются на корректность. Можно включить 4.5 поддержку безопасности тем для ограничения тем, которые клиент может публиковать.
Пример:
topicFilter | совпадающие темы |
---|---|
/a/b/+ | /a/b/abc, /a/b/test |
a/b/# | a/b, a/b/abc, a/b/c/def |
a/+/b/# | a/nani/b/abc |
/+/a/b/+/c | /aaa/a/b/test/c |
Класс для верификации: com.jun.mqttx.utils.TopicUtils
Проект использует Redis pub/sub
для распределения сообщений и поддержки функции кластеризации. Если требуется замена на Kafka
или другое MQ
, потребуется изменение конфигурационного класса ClusterConfig
и замены реализационного класса InternalMessageServiceImpl
.
mqttx.cluster.enable
: функциональный переключатель, значение по умолчанию — false
.Версии до
v1.0.5.RELEASE
имеют ошибки в обработке кластерных сообщений и не могут использоваться.#### 4.4 Поддержка SSL
Чтобы включить SSL, вам сначала потребуется самоподписанный или приобретённый сертификат CA, а затем отредактировать несколько конфигураций в файле application.yml
:
mqttx.ssl.enable
: переключатель функции, значение по умолчанию — false
, управляет как websocket
, так и socket
mqttx.ssl.key-store-location
: адрес сертификата, основан на classpath
mqttx.ssl.key-store-password
: пароль сертификатаmqttx.ssl.key-store-type
: тип хранилища ключей, такой как PKCS12
mqttx.ssl.client-auth
: нужна ли серверу проверка сертификата клиента, значение по умолчанию — false
В каталоге
resources/tls
находитсяmqttx.keystore
для тестирования, пароль:123456
.Инструмент для загрузки сертификатов:
com/jun/mqttx/utils/SslUtils.java
Для ограничения подписок клиентов на темы следует добавить механизм аутентификации подписки на темы и публикации:1. mqttx.enable-topic-sub-pub-secure
: переключатель функции, значение по умолчанию — false
2. Интерфейс AuthenticationService
требуется реализовать при использовании. Объект, возвращаемый этим интерфейсом, содержит authorizedSub
, authorizedPub
для хранения списка тем, на которые клиент имеет права подписки и публикации.
3. Брокер проверяет права доступа клиента во время подписки и публикации сообщений.Поддерживаемые типы тем:
Общие подписки являются частью протокола mqtt5
, и многие системы (например, kafka
) уже поддерживают эту возможность.
mqttx.share-topic.enable
: переключатель функции, значение по умолчанию — true
$share/{ShareName}/{topic-filter}
, где $share
— префикс, ShareName
— имя общего подписчика, а topic-filter
— фильтр неподеленной подписки.hash
, random
, и round-robin
Следующее изображение показывает различия между общими темами и обычными темами:

Стратегия распределения сообщений msg-a
зависит от параметра конфигурации mqttx.share-topic.share-sub-strategy
.
Вы можете использовать сессию cleanSession = 1
. После того, как клиент, использующий общую тему, отключается, сервер удалит подписку, чтобы сообщения общей темы распространялись только на активных клиентов. Очистка сессии Введение: протокол mqtt3.1.1
указывает, что при cleanSession = 1
все состояния (кроме удерживаемых сообщений) связанные с сессией будут удалены после отключения соединения (mqtt5
добавил настройку времени жизни сессии, заинтересованным пользователям можно узнать подробнее). После версии mqttx v1.0.5.BETA
включительно, сообщения сессий при cleanSession = 1
хранятся в памяти, что обеспечивает очень высокую производительность.
Поддерживаемые типы тем:
Общие подписки являются частью протокола mqtt5
, и многие системы (например, kafka
) уже поддерживают эту возможность.
mqttx.share-topic.enable
: переключатель функции, значение по умолчанию — true
$share/{ShareName}/{topic-filter}
, где $share
— префикс, ShareName
— имя общего подписчика, а topic-filter
— фильтр неподеленной подписки.hash
, random
, и round-robin
Следующее изображение показывает различия между общими темами и обычными темами:
Стратегия распределения сообщений msg-a
зависит от параметра конфигурации mqttx.share-topic.share-sub-strategy
.
Вы можете использовать сессию cleanSession = 1
. После того, как клиент, использующий общую тему, отключается, сервер удалит подписку, чтобы сообщения общей темы распространялись только на активных клиентов. Очистка сессии Введение: протокол mqtt3.1.1
указывает, что при cleanSession = 1
все состояния (кроме удерживаемых сообщений) связанные с сессией будут удалены после отключения соединения (mqtt5
добавил настройку времени жизни сессии, заинтересованным пользователям можно узнать подробнее). После версии mqttx v1.0.5.BETA
включительно, сообщения сессий при cleanSession = 1
хранятся в памяти, что обеспечивает очень высокую производительность.> Если значение CleanSession установлено равным 1, то Клиент и Сервер ДОЛЖНЫ отказаться от использования любого предыдущего Состояния и начать новое. Это Состояние существует до тех пор, пока существует сетевое соединение. Данные состояния, связанные с этим Состоянием НЕ ДОЛЖНЫ использоваться повторно в любом последующем Состоянии [MQTT-3.1.2-6].
Состояние клиента состоит из следующих данных:
- Сообщения с уровнем качества обслуживания (QoS) 1 и 2, отправленные серверу, но ещё не полностью подтверждённые.
- Сообщения с уровнем качества обслуживания (QoS) 2, полученные от сервера, но ещё не полностью подтверждённые.
Состояние сервера состоит из следующих данных:
- Существование сессии, даже если остальные данные состояния сессии пусты.
- Подписки клиента.
- Сообщения с уровнями качества обслуживания (QoS) 1 и 2, отправленные клиенту, но ещё не полностью подтверждённые.
- Сообщения с уровнями качества обслуживания (QoS) 1 и 2, ожидающие передачи клиенту.
- Сообщения с уровнем качества обслуживания (QoS) 2, полученные от клиента, но ещё не полностью подтверждённые.
- Опционально, сообщения с уровнем качества обслуживания (QoS) 0, ожидающие передачи клиенту.#### 4.7 Поддержка WebSocket
поддерживается
Клиент может получить состояние брокера, подписавшись на системные темы. В настоящее время система поддерживает следующие темы:
тема | повторение | комментарий |
---|---|---|
$SYS/broker/status |
false |
Подписчики этой темы периодически (mqttx.sys-topic.interval ) будут получать состояние брокера, которое охватывает значения состояний всех нижележащих тем. Примечание: После отключения соединения клиента подписка будет отменена |
$SYS/broker/activeConnectCount |
true |
Немедленно возвращает текущее количество активных подключений |
$SYS/broker/time |
true |
Немедленно возвращает текущее время |
$SYS/broker/version |
true |
Немедленно возвращает версию брокера |
повторение
:
повторение = false
: Подписаться один раз, брокер будет регулярно публиковать данные в эту тему.повторение = true
: Подписаться один раз, брокер отправляет сообщение один раз, и может повторно подписываться несколько раз.
Примечание:
- Механизм безопасности темы также влияет на подписку клиента к системным темам; недопустимым клиентам не позволяется подписываться на системные темы.
- Подписка на системные темы не является постоянной.
Формат ответного объекта — это строка JSON:```json { "активное_количество_соединений": 2, "временная_марка": "2020-09-18 15:13:46", "версия": "1.0.5.ALPHA" }
| поле | Описание |
| ---------------------- | ------------------------------------------------------------------------- |
| `активное_количество_соединений` | Текущее количество активных соединений |
| `временная_марка` | Временная метка; (`гггг-мм-дд чч:мм:сс`) |
| `версия` | Версия `mqttx` |
#### 4.9 Поддержка моста сообщений
Поддержка промежуточного программного обеспечения для сообщений:
- [x] kafka
Функция моста сообщений позволяет удобно подключаться к середине очереди сообщений.
1. `mqttx.message_bridge_enable`: Включение функции моста сообщений
2. `mqttx.bridge_topics`: Темы, которым требуется мостовое соединение
После того как `mqttx` получает сообщение от клиента **публикации**, он сначала проверяет, включен ли мостовой режим, затем проверяет, является ли тема темой, которая требует мостового соединения, и наконец публикует сообщение в **MQ**.
**Поддерживает односторонний мост: устройство(клиент) => mqttx => MQ**
## 5 Сообщение разработчика
1. В состоянии кластера следует рассмотреть возможность интеграции регистрации сервиса, что позволит удобнее управлять состоянием кластера. Можно использовать `consul`. Возможно, я создам ветку для реализации этого. > На самом деле я хотел бы внедрить `SpringCloud`, но чувствую, что `springcloud` немного тяжеловат, поэтому возможно, я создам ветку для его реализации.
2. Устранение ошибок и оптимизация продолжаются, но в основном зависит от студентов, использующих и изучающих `mqttx`, чтобы они предоставляли мне обратную связь по проблемам (если нет обратной связи, я буду считать это отсутствием ~). > Это действительно важно, но пока мало студентов обращаются ко мне за обратной связью. Я человек ограниченной власти после окончания всего.3. Управляемая платформа [mqttx-admin](https://github.com/Amazingwujun/mqttx-admin), основанная на `vue2.0`, `element-ui`, в настоящее время находится в процессе разработки. Обновление функциональности `mqttx` будет приостановлено на некоторое время ~~(Последнее время изучал [mqtt5](http://docs.oasis-open.org/mqtt/mqtt/v5.0/csprd02/mqtt-v5.0-csprd02.html))~~. В ходе разработки проекта было выявлено, что требуются некоторые изменения в `mqttx`, но эти изменения не следует отправлять в основной ветвь mqttx (например, аутентификация безопасности темы должна взаимодействовать с `mqttx-platform`, возможно, я буду использовать [Retrofit](https://square.github.io/retrofit/) для обработки вызовов API, фактически можно использовать `feign`, я считаю, что эти два подхода схожи), мне следует открыть бизнес-ветку для обработки этих изменений. Кстати, разработка проектов на `javascript` так крута, почему ты не подумал об этом раньше? > Вначале мне пришлось направить некоторые усилия на производное проект `mqttx-admin`, но позднее выяснилось, что в `mqttx` всё ещё остаётся слишком много задач, и пришлось изменить план.4. [Оценка производительности](#63-оценка-производительности) показывает, что производительность `mqttx` может быть улучшена. Я буду модифицировать логику обработки `pub/sub` в версии `v1.1.0.RELEASE`.
> Основной акцент — это замена `StringRedisTemplate` на `ReactiveStringRedisTemplate`, переход от **синхронного** режима к **асинхронному**.
5. Введение в направление развития
`mqttx` создаёт две ветви:
- v1.0: `com.jun.mqttx.service.impl` синхронный интерфейс
- v1.1: `com.jun.mqttx.service.impl` изменён на асинхронный интерфейс
Поддержка [mqtt5](http://docs.oasis-open.org/mqtt/mqtt/v5.0/csprd02/mqtt-v5.0-csprd02.html) начинается с версии `v1.0`, поэтому неудивительно, что следующая версия — это `v1.0.6.RELEASE`.
6. Группа обмена
<img src="https://s1.ax1x.com/2020/10/10/0ytoSx.jpg" alt="QR-код группы" height="300" />
## 6 Расписание
### 6.1 Настройка параметров конфигурации
В директории `src/main/resources` находятся три файла конфигурации:
1. `application.yml`
2. `application-dev.yml`
3. `application-prod.yml`
Цель вторых двух файлов заключается в различении конфигураций в различных окружениях для удобства управления.Описание параметров конфигурации:
| Конфигурация | По умолчанию | Описание |
|-------------------------|--------------|--------------------------------------------------------------------------------------------|
| `mqttx.version` | Из `pom.xml` | Версия |
| `mqttx.brokerId` | `1` | Уникальный идентификатор приложения |
| `mqttx.heartbeat` | `60s` | Изначальное значение heartbeat будет перезапрошено сообщением keepalive в коннект-сообщении |
| `mqttx.host` | `0.0.0.0` | Адрес прослушивания |
| `mqttx.soBacklog` | `512` | Очередь обработки TCP соединений |
| `mqttx.enableTopicSubPubSecure` | `false` | Функция безопасности подписки/публикации темы клиентами, когда активирована, она ограничивает темы, которые могут быть опубликованы или подписаны клиентами |
| `mqttx.enableInnerCache` | `true` | После публикации каждого сообщения требуется запрос к Redis для получения списка подписанных клиентов. При активации этой функции будет создано отображение отношений между темой и клиентом в оперативной памяти, что позволит приложению непосредственно обращаться к данным в памяти |
| `mqttx.enableTestMode` | `false` | Переключатель тестового режима, система переходит в тестовый режим после его активации |
| `mqttx.redis.clusterSessionHashKey` | `mqttx.session.key` | Ключ отображения Redis; ключ хранения сессий для кластера |
| `mqttx.redis.topicPrefix` | `mqttx:topic:` | Префикс темы; сохранение отношения между темой и клиентом |
| `mqttx.redis.retainMessagePrefix` | `mqttx:retain:` | Префикс сохраняемого сообщения; сохранение сообщения retain |
|mqttx.redis.|```markdown
| Настройка | Значение | Описание |
|-------------------------------------------------------|-----------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `mqttx.redis.pubMsgSetPrefix` | `mqttx:client:pubmsg:` | Префикс набора Redis для сообщений публикации клиента; сохранение сообщения pubmsg, удаление после получения puback для получения pubrec |
| `mqttx.redis.pubRelMsgSetPrefix` | `mqttx:client:pubrelmsg:` | Префикс набора Redis для сообщений pubRel клиента; сохранение сообщения pubrel, удаление после получения сообщения pubcom |
| `mqttx.redis.topicSetKey` | `mqttx:alltopic` | Набор всех тем; ключ значения Redis; сохранение всех тем |
| `mqttx.cluster.enable` | `false` | Переключатель кластера |
| `mqttx.cluster.innerCacheConsistancyKey` | `mqttx:cache_consistence` | После запуска приложения первым делом выполняется запрос к Redis без этого значения ключа, затем проверяется согласованность |
| `mqttx.ssl.enable` | `false` | Переключатель SSL |
| `mqttx.ssl.client-auth` | `false` | Проверка сертификата клиента |
| `mqttx.ssl.keyStoreLocation` | `classpath:tls/mqttx.keystore` | Расположение keyStore |
| `mqttx.ssl.keyStorePassword` | `123456` | Пароль keyStore |
| `mqttx.ssl.keyStoreType` | `pkcs12` | Тип keyStore |
| `mqttx.socket.enabled` | `true` | Переключатель сокета |
| `mqttx.socket.port` | `1883` | Порт прослушивания сокета |
| `mqttx.websocket.enable` | `false` | Переключатель WebSocket |
| `mqttx.websocket.port` | `8083` | Порт прослушивания WebSocket |
| `mqttx.websocket.path` | `/mqtt` | Путь WebSocket |
| `mqttx.share-topic.enable` | `true` | Переключатель функции общего топика |
| `mqttx.share-topic.share-sub-strategy` | `round-robin` | Стратегия балансировки нагрузки, в настоящее время поддерживаются случайный, опрос, хэширование |
| `mqttx.sys-topic.enable` | `false` | Переключатель функции системного топика |
``````diff
+ interval` | `60c` | интервал периодического запуска |
+ | `mqttx.sys-topic.qos` | `0` | уровень качества обслуживания топика |
+ | `mqttx.message-bridge.enable` | `false` | переключатель функции моста сообщений |
+ | `mqttx.message-bridge.topics` | `null` | список тем, которым требуется мостовое соединение |
epoll
, см. https://netty.io/wiki/native-transports.html
cleanSession
Redis
MQTTv3.1.1
v1.1.0.RELEASE (в разработке)
Redis
асинхронной для повышения производительностиУсловия тестирования просты, а результаты даны для справки.
Версия: MQTTX v1.0.5.BETA
Инструменты: mqtt-bench
Машина:
Система | Процессор | Память |
---|---|---|
Windows 10 |
Intel Core i5-4460 |
16 ГБ |
Redis
cleanSession
: true> На самом деле, хранение сообщений pub
не использует Redis
. Причины этого см. раздел "Поддержка общих тем"
Выполнить java -jar -Xmx1g -Xms1g mqttx-1.0.5.BETA.jar
C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=0 -count=1000
2020-09-30 15:33:54.462089 +0800 CST Начало бенчмарка
2020-09-30 15:34:33.6010217 +0800 CST Конец бенчмарка
Результат: брокер=tcp://localhost:1883, клиентов=1000, total_count=1000000, длительность=39134мс, пропускная_способность=25553.23сообщений/сек
C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=1 -count=1000
2020-09-30 15:29:17.9027515 +0800 CST Начало бенчмарка
2020-09-30 15:30:25.0316915 +0800 CST Конец бенчмарка
Результат: брокер=tcp://localhost:1883, клиентов=1000, total_count=1000000, длительность=67124мс, пропускная_способность=14897.80сообщений/сек
C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=2 -count=1000
2020-09-30 15:37:00.0678207 +0800 CST Начало тестирования
2020-09-30 15:38:55.4419847 +0800 CST Конец тестирования
Результат: брокер=tcp://localhost:1883, клиентов=1000, total_count=1000000, длительность=115369мс, пропускная_способность=8667.84сообщений/сек
Количество одновременных соединений | Действие | Размер одного сообщения | Количество сообщений одного соединения | Общее количество сообщений | QoS | Время выполнения | QPS |
---|---|---|---|---|---|---|---|
1000 |
Отправка сообщений | 1024 байта |
1000 |
Один миллион | 0 |
39.1 c |
25553 |
1000 |
Отправка сообщений | 1024 байта |
1000 |
Один миллион | 1 |
67.1 c |
14897 |
1000 |
Отправка сообщений | 1024 байта |
1000 |
Один миллион | 2 |
115.3 c |
8667 |
redis
cleanSession
: false
Запустите java -jar -Xmx1g -Xms1g mqttx-1.0.5.BETA.jar
C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=0 -count=1000
2020-09-30 17:03:55.7560928 +0800 CST Начало тестирования
2020-09-30 17:04:36.2080909 +0800 CST Конец тестирования
Результат: брокер=tcp://localhost:1883, клиентов=1000, total_count=1000000, длительность=40447 мс, пропускная способность=24723.71 сообщений/сек
C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=1 -count=1000
2020-09-30 17:06:18.9136484 +0800 CST Начало тестирования
2020-09-30 17:08:20.9072865 +0800 CST Конец тестирования
Результат: брокер=tcp://localhost:1883, клиентов=1000, total_count=1000000, длительность=121992 мс, пропускная способность=8197.26 сообщений/сек
C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=2 -count=1000
2020-09-30 17:09:35.1314262 +0800 CST Начало тестирования
2020-09-30 17:13:10.7914125 +0800 CST Конец тестирования
```Результат: брокер=tcp://localhost:1883, клиентов=1000, total_count=1000000, длительность=215656 мс, пропускная способность=4637.01 сообщений/сек
```| Количество одновременных соединений | Действие | Размер одного сообщения | Количество сообщений в одном соединении | Общее количество сообщений | QoS | Время выполнения | QPS |
| ------------ | -------- | ------------ | -------------- | -------- | ---- | -------- | ------- |
| `1000` | Отправка сообщения | `1024 байта` | `1000` | Один миллион | `0` | `40.4 секунды` | `24723` |
| `1000` | Отправка сообщения | `1024 байта` | `1000` | Один миллион | `1` | `121.9 секунды` | `8197` |
| `1000` | Отправка сообщения | `1024 байта` | `1000` | Один миллион | `2` | `215.6 секунд` | `4637` |```**Расход ресурсов: `cpu: 45%`, `mem: 440 MB`**
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )