Мост Apache-Kafka: режим исходящего трафика
Мост данных Apache-Kafka обеспечивает взаимодействие между нашей локальной средой MQTT и внешними кластерами Apache-Kafka. В режиме исходящего трафика локальный брокер MQTT настроен на пересылку сообщений на указанный удалённый сервер Apache-Kafka.
Параллельные соединения
MQTT позволяет нескольким клиентам устанавливать параллельные подключения к мосту Apache-Kafka. Количество параллельных клиентских подключений к Apache-Kafka можно настроить во время конфигурации моста. Путем точной настройки этого параметра можно максимально использовать ресурсы сервера, что приводит к увеличению пропускной способности сообщений и повышению производительности в условиях параллельной работы. Эта функция особенно полезна для приложений, требующих обработки с высокой нагрузкой и параллелизмом.
Правила генерации идентификатора клиента Apache-Kafka:
${client_id_prefix}:${bridge_name}:egress:${node_id}:${entry_index}:${client_no}
Фрагмент | Описание |
---|---|
${client_id_prefix} | Настроенный префикс идентификатора клиента |
${bridge_name} | Имя моста |
${node_id} | Идентификатор узла, на котором запущен клиент Apache-Kafka |
${entry_index} | Индекс записи темы |
${client_no} | Число от 1 до настроенного ограничения параллельных клиентских подключений к Apache-Kafka |
Плагин:
rmqtt-bridge-egress-kafka
Файл конфигурации плагина:
plugins/rmqtt-bridge-egress-kafka.toml
Структура файла конфигурации плагина:
[[bridges]]
name = "bridge_kafka_1"
connection configuration
[[bridges.entries]]
topic filter configuration
[[bridges.entries]]
topic filter configuration
[[bridges]]
name = "bridge_kafka_2"
connection configuration
[[bridges.entries]]
topic filter configuration
[[bridges.entries]]
topic filter configuration
Структура файла конфигурации позволяет настраивать несколько мостов, каждый из которых может подключаться к отдельному удалённому серверу Apache-Kafka. Кроме того, для каждого соединения моста можно указать несколько наборов фильтров тем.
Параметры конфигурации плагина:
[[bridges]]
# Включить мост. Значения: true/false. По умолчанию: true.
enable = true
# Имя моста.
name = "bridge_kafka_1"
# Серверы начальной загрузки для кластера Kafka.
servers = "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"
# Префикс для идентификатора клиента.
client_id_prefix = "kafka_001"
# Максимальное ограничение клиентов, подключённых к удалённому брокеру kafka
concurrent_client_limit = 3
# См. дополнительные свойства и их определения на https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
[bridges.properties]
"message.timeout.ms" = "5000"
[[bridges.entries]]
# Локальный фильтр тем: все сообщения, соответствующие этому фильтру тем, будут отправлены.
local.topic_filter = "local/topic1/egress/#"
remote.topic = "remote-topic1-egress-${local.topic}"
# Параметр queue_timeout контролирует, как долго ждать, если очередь производителя librdkafka заполнена. 0 — никогда не блокировать.
remote.queue_timeout = "0m"
# Устанавливает целевой раздел записи.
# remote.partition = 0
[[bridges.entries]]
# Локальный фильтр тем: все сообщения, соответствующие этому фильтру тем, будут отправлены.
local.topic_filter = "local/topic2/egress/#"
remote.topic = "remote-topic2-egress"
# remote.queue_timeout = "0m"
# remote.partition = 0
По умолчанию этот плагин не включён. Чтобы активировать его, необходимо добавить rmqtt-bridge-egress-kafka
в plugins.default_startups
конфигурации в основном файле конфигурации rmqtt.toml
, как показано ниже: ## --------------------------------------------------------------------
plugins.dir = "rmqtt-plugins/"
plugins.default_startups = [
#"rmqtt-plugin-template",
#"rmqtt-retainer",
#"rmqтт-auth-http",
#"rmqtt-cluster-broadcast",
#"rmqtt-cluster-raft",
#"rmqtt-sys-topic",
#"rmqtt-message-storage",
#"rmqtt-session-storage",
#"rmqtt-bridge-ingress-mqtt",
#"rmqtt-bridge-egress-mqtt",
#"rmqtt-bridge-ingress-kafka",
"rmqtt-bridge-egress-kafka",
"rmqtt-web-hook",
"rmqtt-http-api"
]
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )