SmartGo — это распределённое, модельное интеллектуальное промежуточное ПО, разработанное на языке GoLang, которое поддерживает основные функции очередей сообщений и может использоваться для отправки сообщений MQTT в IoT-сетях с количеством подключённых устройств в несколько миллионов.
git config --global core.autocrlf false
, чтобы обеспечить использование формата Unix в локальном коде.Название | Описание |
---|---|
SmartGo-Store | Документация по технологии хранения |
SmartGo-Broker | Документация по брокеру |
SmartGo-Net | Документация по сети |
SmartGo-Register | Документация по регистрации |
SmartGo-Client | Документация по клиенту |
Модуль хранения отвечает за хранение данных, таких как сообщения от производителей, очереди потребления, индексы и т. д., а также за синхронизацию мастер-реплик, запись на диск и очистку файлов.
Технология нулевого копирования позволяет избежать ненужного копирования данных между ядром и пользовательским пространством, что повышает производительность системы. В SmartGo используются два метода нулевого копирования: mmap+write и sendfile. Метод mmap+write обеспечивает высокую эффективность при работе с небольшими блоками данных, но требует более сложного управления безопасностью памяти. Метод sendfile использует DMA и эффективен при работе с большими блоками данных, но может быть менее эффективным при работе с маленькими блоками.
SmartGo использует метод mmap+write из-за необходимости работы с небольшими данными.
CommitLog используется для хранения реальных данных сообщений. По умолчанию путь к CommitLog находится в рабочем каталоге пользователя /store/commitlog.
Структура каталога CommitLog:
commitlog
- 00000000000000000000
- 00000000001073741824
Правила формирования имени файла CommitLog:
Имя файла состоит из 20 символов, слева добавляются нули до нужной длины. Оставшиеся символы представляют собой начальный физический сдвиг файла (начальный сдвиг первого файла равен нулю).
Размер файла по умолчанию составляет 1 ГБ, но его можно настроить с помощью параметра MessageStoreConfig mapedFileSizieCommitLog.
Например, если размер файла по умолчанию равен 1 ГБ = 1073741824 байта, то имя первого файла будет состоять из 20 нулей, так как начальный сдвиг равен нулю. Имя второго файла будет начинаться с 1073741824, так как это начальный сдвиг второго файла. Остальные файлы будут иметь имена, соответствующие их начальным сдвигам.
Начальный сдвиг n-го файла = размер * (n - 1)
Начальный сдвиг первого файла = 1073741824 * (1 - 1) = 0
Начальный сдвиг второго файла = 1073741824 * (2 - 1) = 1073741824
Используя имя файла CommitLog, можно быстро найти информацию о файле.
Индекс файла = (физический начальный сдвиг сообщения - начальный сдвиг самого раннего файла) / размер файла, т. е. (1073741827 - 0) / 1073741824 = 1, что означает, что сообщение находится во втором файле очереди. Структура сообщений файла commitlog:
Номер | Поле | Описание | Количество байт | Примечание |
---|---|---|---|---|
1 | TotalSize | Общая длина сообщения | 4 | |
2 | MagicCode | MagicCode | 4 | MagicCode делится на: MessageMagicCode и BlankMagicCode. MessageMagicCode указывает на правильное содержание сообщения; BlankMagicCode означает, что пространство файла CommitLog недостаточно, и используются пустые байты для заполнения файла. |
3 | BodyCRC | CRC содержимого сообщения | 4 | Значение BodyCRC — это 32-битный избыточный код проверки, сгенерированный с помощью CRC32 для содержимого сообщения (body), который используется для обеспечения правильности сообщения. |
4 | QueueId | Идентификатор очереди сообщений | 4 | |
5 | Flag | Флаг сообщения | 4 | |
6 | QueueOffset | Позиция в очереди сообщений | 8 | Самоинкрементное значение, логическая позиция в очереди сообщений, которая позволяет найти данные в очереди потребления; QueueOffset * 20 — физический сдвиг очереди сообщений. |
7 | PhysicalOffset | Физическая позиция | 8 | |
8 | SysFlag | MessageSysFlag | 4 | |
9 | BornTimestamp | Время создания сообщения | 8 | |
10 | BornHost | Адрес и порт отправителя сообщения | 8 | |
11 | StoreTimestamp | Время сохранения сообщения | 8 | |
12 | StoreHost | Адрес и порт получателя сообщения | 8 | |
13 | ReconsumeTimes | Количество повторных использований сообщения | 4 | |
14 | PreparedTransationOffset | 8 | ||
15 | BodyLength | Длина содержимого сообщения | 4 | |
16 | Body | Содержимое сообщения | bodyLength | |
17 | TopicLength | Длина темы | 1 | |
18 | Topic | Тема | topicLength | |
19 | PropertiesLength | Длина дополнительных свойств | 2 | |
20 | Properties | Дополнительные свойства | propertiesLength |
Добавление данных в CommitLog включает запись данных в MapedFile. Каждый MapedFile соответствует двоичному файлу, хранящему сообщения. При создании MapedFile отображается в памяти. При добавлении сообщения данные, которые необходимо сохранить, записываются в память. После этого служба записи на диск сохраняет данные из памяти в двоичный физический файл. Основной процесс добавления данных в CommitLog показан на рисунке ниже. 1.4ConsumeQueue
Потребительская логическая очередь, соответствующая папке /store/consumequeue. Структура каталогов для каждого файла потребительской очереди выглядит следующим образом:
consumequeue
-- topic
-- queue id
-- 00000000000000000000
-- 00000000000000001040
-- 00000000000000002080
Правила создания имени файла consumequeue:
Правила создания имени файла commitlog аналогичны. Следует обратить внимание на то, что размер файла maped равен округлению в большую сторону (указанный размер / размер информации о местоположении сообщения) * размер информации о местоположении сообщения.
Например:
Указанный размер файла очереди потребления = 1024
Размер информации о местоположении сообщения = 20
mapedFileSize = округление в большую сторону (1024 / 20) * 20
mapedFileSize = 1040
Структура файла consumequeue:
В ConsumeQueue нет необходимости хранить содержимое сообщений, вместо этого он хранит смещение сообщения в CommitLog. То есть, по сути, ConsumeQuue является индексным файлом CommitLog. | Номер | Поле | Описание | Размер в байтах | Примечание | | --- | --- | --- | --- | --- | | 1 | CommitLog Offset | Начальное физическое смещение CommitLog | 8 | | | 2 | Size | Размер сообщения | 4 | | | 3 | Message Tag Hashcode | Хеш-код тега сообщения | 8 | Используется для фильтрации сообщений при подписке (при подписке, если указан тег, можно быстро найти подписанное сообщение по хеш-коду) |
ConsumeQueue имеет фиксированную длину, и размер каждой записи данных составляет 20 байт. По умолчанию каждый файл имеет размер 60 миллионов байт.
При потреблении сообщения Consumer должен выполнить два шага: сначала прочитать ConsumeQueue, чтобы получить смещение, а затем прочитать CommitLog, чтобы получить содержимое сообщения.
Когда сообщение добавляется, после добавления сообщения в commitLog отправляется запрос на распространение. Служба распространения вызывает MessageStore для добавления информации о местоположении сообщения. На основе Topic и QueueId получается ConsumeQueue. Информация о местоположении сообщения добавляется в соответствующую очередь потребления, и, наконец, она сохраняется в двоичном файле. Основной процесс показан на рисунке ниже. 1.1.6 Главный и подчинённый: синхронизация
В режиме развёртывания кластера главный (Master) и подчинённый (Slave) узлы связываются друг с другом путём указания одинакового параметра brokerName. Идентификатор BrokerId главного узла должен быть равен 0, а идентификатор BrokerId подчинённого узла — больше 0.
Под главным узлом может быть подключено несколько подчинённых узлов. Различить их можно по разным значениям BrokerId. Каждый брокер будет единообразно сохранять все свои топики в файле в формате JSON. Smargo сохраняет путь к файлу топиков как /текущий каталог пользователя/store/config/topic.json
.
В этом файле хранится основная информация о каждом топике, такая как:
Содержание файла:
{
"topicConfigTable": {
"topicConfigTable": {
"%RETRY%consumerGroupId-example-200": {
"SEPARATOR": "",
"topicName": "%RETRY%consumerGroupId-example-200",
"readQueueNums": 1,
"writeQueueNums": 1,
"perm": 6,
"topicFilterType": 0,
"topicSysFlag": 0,
"order": false
}
}
},
"dataVersion": {
"timestamp": 1511333414604049700,
"counter": 2023
}
}
Инициализация
При запуске брокера он загружает файл Topic.json и поддерживает в памяти связь между названиями топиков и объектами топиков. Любые операции с топиками обновляют эту связь в памяти и файл Topic.json.
Создание топика
Создание топика инициируется клиентом. Если брокер не обнаруживает топик, который клиент хочет отправить, то создание происходит следующим образом: ``` Вот перевод текста на русский язык:
Разработка и тестирование программного обеспечения включают в себя различные аспекты, такие как анализ требований, проектирование, кодирование, тестирование и документирование. В процессе разработки и тестирования программного обеспечения используются различные инструменты и методы, которые помогают обеспечить качество и надёжность программного продукта.
Анализ требований является важным этапом разработки программного обеспечения, который позволяет определить, какие функции и возможности должны быть реализованы в программном продукте. На основе анализа требований разрабатывается техническое задание, которое служит основой для проектирования и разработки программного обеспечения.
Проектирование включает в себя разработку архитектуры программного обеспечения, определение компонентов и модулей, а также их взаимодействие. Проектирование является ключевым этапом, который определяет структуру и функциональность программного продукта.
Кодирование представляет собой процесс написания кода, который реализует функциональность, определённую на этапе проектирования. Кодирование является одним из самых важных этапов разработки программного обеспечения, так как от качества кода зависит работоспособность и эффективность программного продукта.
Тестирование является процессом проверки работоспособности и качества программного обеспечения. Тестирование включает в себя проверку функциональности, производительности, безопасности и других аспектов программного продукта. Документация является важной частью процесса разработки программного обеспечения, которая содержит информацию о требованиях, проектировании, коде и тестировании.
В тексте перевода были исправлены некоторые грамматические ошибки и неточности. Потребитель поддерживает SubscriptionGroup через сервис сердцебиения.
* **Персистентность**
Каждый раз, когда Broker изменяет отношения SubscriptionGroup, происходит процесс персистентности. Smargo сохраняет путь к файлу subscriptionGroup в каталоге текущего пользователя /store/config/subscriptionGroup.json. Структура хранения выглядит следующим образом:
{
topicName: {
"groupName": "xx", // имя группы подписки
"consumeEnable": true, // возможность потребления
"consumeFromMinEnable": true, // разрешение на потребление с минимального положения очереди, по умолчанию в рабочей среде установлено значение false
"consumeBroadcastEnable": true, // разрешение на широковещательное потребление
"retryQueueNums": 1, // количество очередей повтора для сообщений при неудачном потреблении
"retryMaxTimes": 16, // максимальное количество попыток повторного потребления, превышение которого приводит к отправке сообщения в очередь недоставленных сообщений, прекращению повторных попыток и генерации оповещения
"brokerId": 0, // брокер, с которого начинается потребление
"whichBrokerWhenConsumeSlowly": 0 // при обнаружении накопления сообщений перенаправление запросов на потребление от Consumer на другую Slave-машину
},
"dataVersion": {
"timestamp": 1511342161274071800,
"counter": 3
}
} Весь процесс отправки сообщения делится на два этапа:
* **Consumer** отправляет сообщение обратно.
* В случае, если сообщение не было успешно обработано на стороне Consumer, оно отправляется в очередь повторных попыток. Broker получает сообщение и проверяет следующее:
1. Существует ли группа подписки для текущего сообщения.
2. Есть ли у текущего Broker разрешение на запись.
3. Получает Topic очереди повторных попыток (обычно это %RETRY% + groupName) и вычисляет QueueID.
4. Если количество попыток обработки сообщения больше установленного значения, сообщение отправляется в очередь недоставленных сообщений (Topic обычно имеет вид %DLQ% + GroupName) и больше не обрабатывается.
5. Если количество попыток меньше установленного значения, текущая попытка увеличивается на 3, и обработка сообщения откладывается.
6. Сообщение повторно упаковывается, после чего вызывается store-сервис.
7. Проводится статистика.
* **Producer** отправляет обычное сообщение.
* Отправка обычных сообщений гораздо проще, чем отправка сообщений для повторной попытки. Процесс отправки обычного сообщения включает следующие шаги:
1. Проверяется законность отправки сообщения (например, проверяется соответствие Topic и наличие прав доступа у Broker).
2. Сообщение заново упаковывается и вызывается store-сервис.
3. Проводится статистика.
1. Обработка сообщения потребителем.
* На стороне потребителя существует два способа обработки сообщений: push (активная отправка) и pull (пассивный запрос). Независимо от того, используется ли push или pull, взаимодействие с Broker осуществляется через операцию pull, инициируемую потребителем. Основной процесс выглядит следующим образом: 4.7Hold
- **Назначение**
Потребительский конец получает сообщения от брокера. Если сообщение не существует, то реализуется получение результата путём установления длительного соединения или отправки запроса на получение через определённые промежутки времени. И длительный способ соединения, и отправка запросов через определённые промежутки времени могут привести к потере пропускной способности и ненужной нагрузке на брокер.
Способ задержки запросов на получение сообщений брокером может решить вышеуказанную проблему. Запросы на получение сообщений, которые не были получены, помещаются в очередь ожидания. Ожидание происходит до тех пор, пока не поступит новое сообщение или не истечёт время ожидания. Поскольку служба Hold записывает информацию о канале текущего запроса на извлечение, брокер может активно отправлять клиенту извлечённые сообщения или информацию о состоянии тайм-аута, тем самым избегая потери пропускной способности.
- **Процесс Hold**
Клиентский конец запрашивает у брокера сообщение с определённым смещением. Если смещение больше максимального смещения, это означает, что сообщение ещё не сохранено в брокере и его нельзя получить. Запрос на получение помещается в очередь ожидания, ожидая пробуждения при поступлении нового сообщения. Последовательность запросов Hold показана ниже. 4.8 Сообщение статистики
- Назначение
Broker собирает статистику по количеству и объёму сообщений, помещённых в Topic и Group, а также полученных из них, с учётом таких параметров, как время, источник сообщения и потребитель. В режиме реального времени производится расчёт и запись соответствующего Tps. На основе статистических данных пользователи могут получить представление об объёме своего бизнеса и его колебаниях, а администраторы — оценить текущую пропускную способность кластера Broker и принять решение о необходимости расширения ресурсов.
- Процесс обработки
При запуске Broker инициализирует службу статистики. При успешном получении или сохранении сообщения Broker увеличивает соответствующие значения количества и объёма на основе исходных статистических данных, а также рассчитывает и записывает Tps для разных временных интервалов. График временной шкалы статистической информации о сообщениях выглядит следующим образом: 4.9Producer, Consumer соединение
- Назначение
Запись информации о каналах, соединяющих текущих производителей и потребителей с брокером, а также поддержание отношений между каналами, темами и списками подписчиков. Разработчики и администраторы могут просматривать количество текущих производителей, потребителей, количество клиентов для каждой темы и количество клиентов в одной группе подписки.
- Процесс соединения
Клиенты через реестр запрашивают список маршрутизации брокера, перебирают список маршрутизации и находят адрес брокера для соответствующей бизнес-темы. На основе адреса брокера клиенты соединяются с брокером и периодически отправляют сердцебиение для поддержания соединения канала. После получения сердцебиения брокер в реальном времени поддерживает канал, тему, группу и список подписчиков. Последовательность управления соединением клиента показана на схеме ниже: 1.
1. Структура пакета связи:
- length домен: длина сообщения.
- field домен: содержание сообщения. **Длина**: длина сообщения.
**Длина заголовка**: длина заголовка сообщения.
**Заголовок**: заголовок сообщения.
**Тело**: содержание сообщения.
Заголовок и тело сообщения после анализа данных представляют собой структуру RemotingCommand, которая при коммуникации сериализуется в массив байтов byte[] для передачи на уровне связи.
1.
1.
1. **4.5.1 Структура RemotingCommand**
| **Поле** | **Запрос** | **Ответ** |
| --- | --- | --- |
| code | Код операции запроса, на основе которого получатель выполняет различные действия | Код результата ответа, 0 означает успех, не-0 — различные ошибки |
| Language | Язык реализации отправителя | Язык реализации получателя |
| Version | Версия программы отправителя | Версия программы получателя |
| Opaque | Идентификационный код запроса, используется для многопоточности и повторного использования соединения | Получатель не изменяет, а возвращает напрямую |
| Flag | Флаг уровня связи | Флаг уровня связи |
| Remark | Передаваемая произвольная текстовая информация | Подробное описание ошибки |
| ExtFields | Пользовательские поля запроса | Пользовательские поля ответа |
| CustomHeader | Пользовательская структура, преобразуется в данные типа extFields при передаче | Пользовательская структура, преобразуется в данные типа extFields при передаче |
| Body | Тело запроса | Тело ответа |
1.
1. **4.6 Сжатие сообщений**
После достижения определённой длины сообщения предоставляется функция сжатия сообщений. Алгоритм сжатия использует zip. Второй бит флага SysFlag в RemotingCommand указывает, сжато ли сообщение.
1.
1. **4.7 Обработка пульса**
Компонент связи сам по себе не обрабатывает пульс, обработка пульса выполняется верхним уровнем.
#### Техническое описание SmartGo-Registry
###
###
Для версии V1.0.0
### ©
#### Архитектура базовой платформы Чэнду
#### 2017/11/21
###
#### Содержание
1 Введение 4
2 Взаимодействие модуля Registry 4
2.1 Брокер 4
2.2 Консоль 4
2.3 Сеть 4
2.4 Реестр 4
3 Профессиональные термины 5
4 Принцип реализации реестра 5
4.1 Регистрация брокера 5
4.2 Сканирование активных брокеров 6
4.3 Реестр и ZooKeeper 7
4.4 Основные данные структуры реестра 8
4.5 Изменения данных в памяти реестра 8
4.6 Обычная тема и упорядоченная тема 9
4.7 Тема и сопоставление брокера с темой 10
1. Введение
Реестр поддерживает множество информации о брокерах и темах, устанавливая длительное соединение через сеть и брокера, чтобы поддерживать связь с брокером, одновременно предоставляя обнаружение пульса, обновление данных и другие обычные услуги. Он сохраняет список активных брокеров, включая мастер и подчиненные; также сохраняет все темы и списки очередей всех тем.
1. Взаимодействие модуля реестра
1. 2.1 Брокер
Каждый реестр периодически получает информацию о регистрации брокера и поддерживает адрес узла, роль, идентификатор и другую информацию каждого брокера, а также поддерживает информацию об активности каждого брокера.
1.
1. 2.2 Консоль
Модуль консоли косвенно устанавливает соединение и обменивается данными со всеми реестрами через клиентский интерфейс нижнего уровня.
1.
1. 2.3 Сеть
Реестр создаёт службу через Net (текущий порт — 9876), регистрирует и публикует службу, которую могут вызывать модули Broker, Client, Web и т. д.
1.
1. 2.4 Реестр
Между узлами реестра нет взаимодействия. Большая часть данных, поддерживаемых реестром, хранится в памяти, а данные упорядоченной темы хранятся в файле.
1. Профессиональные термины
Тема
Тема — это сообщение, один онлайн-экземпляр Producer может соответствовать только одной теме, один онлайн-экземпляр Consumer может соответствовать нескольким темам, одно сообщение должно принадлежать одной теме.
QueueData
Очередь темы, содержащая имя брокера, количество очередей чтения, количество очередей записи, права брокера, отметку синхронизации темы.
BrokerData
Данные, описывающие подробную информацию о брокере, включая имя брокера, адрес брокера, идентификатор брокера и т.д.
TopicQueueTable
Описывает сопоставление между данными структуры темы, очереди и брокера.
BrokerAddrTable
Описывает сопоставление между именами брокеров, идентификаторами брокеров и адресами брокеров.
ClusterAddrTable
Описывает соответствие между брокерами и кластерами, можно запросить подробную информацию о брокерах через имена брокеров.
BrokerLiveTable
Описывает структуру информации о сердцебиении брокера, после того как каждый брокер отправляет информацию о сердцебиении, реестр будет поддерживать время последнего обновления информации о сердцебиении брокера. Брокер каждые 30 секунд отправляет информацию о сердцебиении. После получения информации о сердцебиении, Registry поддерживает в памяти структуру данных BrokerLiveTable. Эта структура данных хранит информацию о последнем обновлении времени сердцебиения каждого брокера, адресе брокера и сетевом соединении и т. д.
После запуска Registry сразу начинает выполнять периодическую задачу: каждые 10 секунд выполняется сканирование брокеров, которые не отправляли информацию о сердцебиении в течение 2 минут.
Если в результате сканирования обнаруживаются брокеры, которые не отправляли информацию о сердцебиении в течение последних 2 минут, то Registry активно закрывает сетевое соединение и удаляет данные, хранящиеся в памяти, а также связанные с ними данные о брокере, теме, кластере, фильтре и т.д.
1.
1.1 Registry и ZooKeeper
(1) Для Smartgo данные темы на каждом Master равноправны, ни один из Master не имеет всех данных темы, поэтому функция выбора лидера ZooKeeper не подходит для Smartgo.
(2) В кластере Smartgo необходимо иметь компонент для обработки некоторых общих данных, таких как список брокеров, время обновления брокеров. Обработка некоторых логических отношений между данными с помощью клиента ZooKeeper может быть довольно сложной, и ZooKeeper также должен обеспечивать согласованность между несколькими мастерами, что ещё больше усложняет код. Если есть несколько ролей, код ZooKeeper становится ещё более сложным.
(3) Поскольку в кластере Smartgo не используются некоторые функции ZooKeeper высокого уровня, а только его функции обеспечения согласованности данных и публикации-подписки, лучше написать облегчённую версию Registry, чем полагаться на сложный ZooKeeper.
(4) Registry также можно развернуть в кластерной среде. Между Registry нет никакой синхронизации данных, и Registry с менее чем тысячей строк кода определённо более стабилен, чем ZooKeeper.
1.
1.2 Основные структуры данных Registry
| Структура данных | Тип | Формат данных | Хранимые данные |
| --- | --- | --- | --- |
| TopicQueueTable | HashMap | topic[list<QueueData>] | Сохраняет информацию о теме-очереди |
| BrokerAddrTable | HashMap | brokerName[BrokerData] | Сохраняет адрес брокера |
| ClusterAddrTable | HashMap | clusterName[set<brokerName>] | Сохраняет связь между брокером и кластером |
| BrokerLiveTable | HashMap | brokerAddr[brokerLiveTable] | Сохраняет данные сердцебиения брокера | По умолчанию алгоритм распределения нагрузки для потребительских пушей заключается в равномерном распределении очередей по ConsumerId.
Например, если есть Topic с 24 очередями и один клиент, который подписывается на этот Topic, то он будет потреблять все 24 очереди. Если же клиентов два, каждый из них будет потреблять по 12 очередей из 24. Таким образом достигается распределение нагрузки между клиентами. Клиенты могут переписать эту стратегию распределения нагрузки.
**4.6 Push-широковещательное потребление**
Процесс похож на push-потребление из пуша, но с сохранением локального offset. Распределение нагрузки не влияет на широковещательное потребление.
**4.7 Pull-потребление**
Временная диаграмма сообщений при pull-потреблении выглядит следующим образом:
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )