1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/Ljolan-si-mqtt

Клонировать/Скачать
README.md 8.4 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
gitlife-traslator Отправлено 30.11.2024 14:32 147cd4c

Введение

Golang MQTT сервер, кластерная версия, в настоящее время поддерживает кластеры БД и прямые подключения.

Использование

  • Corev5 пакет — основной пакет.
  • Проект основан на конфигурации в config/config.toml.
  • Добавьте переменную среды SI_CFG_PATH = «путь к файлу конфигурации», если она не настроена, по умолчанию используется конфигурация config/config.toml.
  • Запустите main.go способом package, чтобы запустить.

Тестирование клиентского вызова

Можно использовать v5 кодировщика-декодера Paho-Golang с открытым исходным кодом для тестирования.

// После локального запуска службы брокера
// Этот код можно скопировать непосредственно в paho_client_test.go для тестирования

func TestClientTest(t *testing.T) {
    conn, _ := net.Dial("tcp","127.0.0.1:1883")
    c := NewClient(ClientConfig{
        Conn:                       conn,
        Router: NewSingleHandlerRouter(func(p *Publish) {
            fmt.Println(p.String())
        }),
    })
    c.serverProps.SharedSubAvailable = true
    require.NotNil(t, c)
    c.SetDebugLogger(log.New(os.Stderr, "CONNECT: ", log.LstdFlags))
 
    cp := &Connect{
        KeepAlive:  30,
        ClientID:   "testClient",
        CleanStart: true,
        Properties: &ConnectProperties{
            ReceiveMaximum: Uint16(200),
        },
        WillMessage: &WillMessage{
            Topic:   "will/topic",
            Payload: []byte("am gone"),
        },
        WillProperties: &WillProperties{
            WillDelayInterval: Uint32(200),
        }
    }
 
    ca, err := c.Connect(context.Background(), cp)
    require.Nil(t, err)
    assert.Equal(t, uint8(0), ca.ReasonCode)
 
    time.Sleep(10 * time.Millisecond)
 
    s := &Subscribe{
        Subscriptions: map[string]SubscribeOptions{
            "test/1": {QoS: 1},
            "test/2": {QoS: 2},
            "test/3": {QoS: 0},
            "$share/aa/test/1": {QoS: 0},
        }
    }
 
    _, err = c.Subscribe(context.Background(), s)
    require.Nil(t, err)
    //assert.Equal(t, []byte{1, 2, 0}, sa.Reasons)
 
    time.Sleep(10 * time.Millisecond)
    var p *Publish
    p = &Publish{
        Topic:   "test/0",
        QoS:     0,
        Payload: []byte("test payload"),
    }
    
    _, err = c.Publish(context.Background(), p)
    require.Nil(t, err)
    
    time.Sleep(10 * time.Millisecond)
 
    p = &Publish{
        Topic:   "test/1",
        QoS:     1,
        Payload: []byte("test payload"),
    }
 
    pa, err := c.Publish(context.Background(), p)
    require.Nil(t, err)
    assert.Equal(t, uint8(0), pa.ReasonCode)
 
    time.Sleep(20 * time.Millisecond)
    p = &Publish{
        Topic:   "test/2",
        QoS:     2,
        Payload: []byte("test payload"),
    }
    
    pr, err := c.Publish(context.Background(), p)
    require.Nil(t, err)
    assert.Equal(t, uint8(0), pr.ReasonCode)
    
    time.Sleep(30 * time.Millisecond)
 }

Текущие поддерживаемые решения

  1. Дизайн кластера MongoDB.
  2. Дизайн кластера MySQL. — Добавлен метод автоматического объединения данных общей подписки кластера, простой код cluster/stat/colong/auto_compress_sub/factory.go.

Используя способ хранения данных кластера базы данных, можно предоставить услуги записи через несколько мастеров и получить данные от рабов. Это может предотвратить единичный сбой главного узла и не будет подключать все брокеры к одному экземпляру базы данных. (Конкретные знания см. в DDIA). Для данных модели предметной области, которые добавляют временную метку, рекомендуется решить, следует ли отказаться от данных на основе временной метки при получении данных подписчиками.

  1. Запуск со статической конфигурацией.

Запуск нескольких узлов

Изображение

MQTTX использует

Изображение

Оптимизация для будущего

  1. Идентификатор клиента, отправляемый клиенту, должен быть уникальным и не должен использовать старый идентификатор клиента, отправленный клиентом.
  2. Можно добавить пакетное удаление для освобождения сообщений из стека, иначе удаление одного за другим будет слишком медленным.
  3. Необходимо инициализировать сообщения, извлечённые из базы данных при запуске сеанса.
  4. Нужно ли удалять старую версию сеанса после разрыва соединения? Если да, то когда это делать?

    --- В настоящее время узел не удаляет его сразу, а ждёт истечения срока действия, после чего система автоматически удаляет его. Когда узел снова подключится к другому узлу, другой узел уведомит этот узел об удалении старого сеанса, затем другой узел получит его из базы данных и инициализирует его. Обратите внимание на сообщения QoS=0.

  5. После разрыва соединения необходимо обработать входные буферы.
  6. Сообщение с истекшим сроком действия.
  7. Срок действия сеанса истекает, и соединение разрывается. Можно ли повторно установить срок действия при повторном подключении?
  8. Режим запроса/ответа (обработка на стороне клиента).
  9. Идентификаторы подписки.
  10. Обработка параметров NoLocal, Retain As Publish, Retain Handling.
  11. Обработка псевдонимов тем (Topic Alias).
  12. Управление потоком.
  13. Атрибут Receive Maximum.
  14. Добавление строки причины (Reason string).
  15. Обработка максимального размера пакета (Maximum Packet Size).
  16. Отложенная отправка завещания.
  17. Если клиент, подписавшийся на общую подписку, отключается, другие узлы всё равно будут отправлять общую подписку этому клиенту. Необходимо обработать и повторно подписаться на эту общую подписку после повторного подключения.
  18. Проблема сохранения общей подписки (проблема решена с помощью MySQL).
  19. ... Системная область UML-проектирования

UML-диаграммы, схемы вызовов методов в разных пакетах (доступна по ссылке).

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/Ljolan-si-mqtt.git
git@api.gitlife.ru:oschina-mirror/Ljolan-si-mqtt.git
oschina-mirror
Ljolan-si-mqtt
Ljolan-si-mqtt
dev-v2-sample