1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/mirrors-KafkaBridge

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
README.md 15 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
gitlife-traslator Отправлено 30.11.2024 03:20 c896549

Введение

Qbusbridge — это клиентский SDK для систем обмена сообщениями типа «публикация-подписка». В настоящее время он поддерживает:

  • Apache Kafka;
  • Apache Pulsar.

Пользователь может переключиться на любую систему обмена сообщениями, изменив файл конфигурации. По умолчанию используется доступ к Kafka, если вы хотите изменить его на Pulsar, измените конфигурацию следующим образом:

mq.type=pulsar
# Другие настройки для Pulsar...

Подробнее см. в разделе config.

TODO: В настоящее время отсутствуют документы по конфигурации на английском языке.

QBusbridge-Kafka основан на librdkafka. Множество деталей, связанных с использованием, было скрыто, что делает QBus более простым и удобным в использовании, чем librdkafka. Для отправки и получения сообщений пользователям нужно всего лишь вызвать несколько API, им не нужно слишком много знать о Kafka.

Надёжность отправки сообщений, которая может быть самой большой проблемой для пользователей, была значительно улучшена.

Функции

  • Поддерживаются несколько языков программирования, включая C++, PHP, Python, Golong, с очень согласованными API.
  • Несколько интерфейсов, поэтому он прост в использовании.
  • Для опытных пользователей также поддерживается настройка librdkafka с помощью профилей.
  • В случае записи данных не по ключам SDK сделает всё возможное, чтобы гарантировать успешную запись сообщений.
  • Поддерживается два режима записи: синхронный и асинхронный.
  • Что касается потребления сообщений, смещение можно отправить автоматически или настроить вручную.
  • При использовании php-fpm соединение остаётся активным для непрерывного воспроизведения сообщений, что позволяет избежать затрат, вызванных повторным созданием соединений.

Компиляция

Убедитесь, что в вашей системе установлены g++ (>= 4.8.5), boost (>= 1.41), cmake (>= 3.1) и swig (>= 3.0.12).

Кроме того, SDK qbus статически связывает libstdc++, поэтому вы должны убедиться, что существует libstdc++.a. Для пользователей CentOS выполните:

sudo yum install -y glibc-static libstdc++-static

git clone:

git clone --recursive https://github.com/ntt360/qbusbridge.git

Поддержка SASL

Если вам нужно, чтобы librdkafa поддерживал аутентификацию SASL в kafka, вам также необходимо установить:

sudo yum install -y cyrus-sasl-devel

Если вы также используете аутентификацию GSSAPI, вам необходимо скомпилировать соответствующий плагин:

sudo yum install -y cyrus-sasl-gssapi

1. Установите подмодули

Запустите ./build_dependencies.sh.

Он автоматически загрузит подмодули и установит их в cxx/thirdparts/local, где CMakeLists.txt находит заголовки и библиотеки.

См. cxx/thirdparts/local:

include/
  librdkafka/
    rdkafka.h
  log4cplus/
    logger.h
lib/
  librdkafka.a
  liblog4cplus.a

Если вам нужна поддержка функциональности SASL, после компиляции вы можете перейти в каталог cxx/thirdparts/librdkafka/examples/ и выполнить следующую команду, чтобы проверить, успешно ли скомпилирован компонент SASL:

cd cxx/thirdparts/librdkafka/examples/
./rdkafka_example -X builtin.features
# builtin.features = gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,sasl_oauthbearer,http,oidc

Убедитесь, что вывод builtin.features скомпилировал связанные с SASL модули аутентификации!

2. Создайте SDK

C++

Перейдите в каталог cxx и запустите ./build.sh, будут созданы следующие файлы:

include/
  qbus_consumer.h
  qbus_producer.h
lib/
  debug/libQBus.so
  release/libQBus.so

Хотя для создания C++ SDK требуется поддержка C++11, SDK можно использовать со старым g++. Например, создайте qbus SDK с g++ 4.8.5 и используйте qbus SDK с g++ 4.4.7.

Go

Перейдите в каталог golang и запустите ./build.sh, будут созданы следующие файлы:

gopath/
  src/
    qbus/
      qbus.go
      libQBus_go.so

Вы можете включить модуль go для примеров, запустив USE_GO_MOD=1. ./build.sh. После этого будут созданы следующие файлы:

examples/
  go.mod
  qbus/
    qbus.go
    go.mod
    libQBus_go.so

Python

Перейдите в каталог python и запустите ./build.sh. Будут созданы следующие файлы:

examples/
  qbus.py
  _qbus.so

PHP

Перейдите в каталог php и запустите build.sh, после чего будет создан файл:

examples/
  qbus.php
  qbus.so

3. Сборка примеров

C++

Перейдите в подкаталог examples и выполните команду ./build.sh [debug|release], чтобы создать исполняемые файлы. В режиме debug используется libQBus.so из подкаталога lib/debug, а в режиме releaselibQBus.so из lib/release. Чтобы удалить их, выполните команду make clean.

Если вы хотите создать собственные программы, посмотрите, как это делает Makefile.

Go

Перейдите в подкаталог examples и запустите команду ./build.sh для создания исполняемых файлов. Для их удаления выполните команду ./clean.sh.

Добавьте путь к файлу libQBus_go.so в переменную окружения LD_LIBRARY_PATH, например:

export LD_LIBRARY_PATH=$PWD/gopath/src/qbus:$LD_LIBRARY_PATH

Чтобы создать собственные программы, добавьте сгенерированный каталог gopath в переменную среды GOPATH или переместите каталог gopath/src/qbus в $GOPATH/src.

Python

Скопируйте созданные файлы qbus.py и _qbus.so по пути запускаемых скриптов Python.

PHP

Отредактируйте файл php.ini и добавьте расширение <module-path>, где <module-path> — это путь к qbus.so.

Использование

Создание данных

  • В случае записи данных не по ключам SDK сделает всё возможное, чтобы отправить каждое сообщение. Пока в кластере Kafka есть хотя бы один брокер, работающий нормально, он будет пытаться повторить отправку.
  • Для записи данных нужно только вызвать интерфейс produce, а в асинхронном режиме, проверив возвращаемое значение, можно узнать, заполнена ли очередь отправки.
  • В синхронном режиме записи интерфейс produce напрямую вернёт значение, указывающее, была ли текущая запись успешной. Но это происходит за счёт потери производительности и использования ресурсов процессора. Поэтому рекомендуется использовать асинхронный режим.
  • Пример на C++, демонстрирующий вызов интерфейса produce:
bool QbusProducer::init(const string& broker_list,
                        const string& log_path,
                        const string& config_path,
                        const string& topic_name);
bool QbusProducer::produce(const char* data,
                           size_t data_len,
                           const std::string& key);
void QbusProducer::uninit();
  • Использование C++ SDK:
#include <string>
#include <iostream>
#include "qbus_producer.h"

int main(int argc, const char* argv[]) {
    qbus::QbusProducer qbus_producer;
    if (!qbus_producer.init("127.0.0.1:9092",
                    "./log",
                    "./config",
                    "topic_test")) {
        std::cout << "Failed to init" << std::endl;
        return 0;
    }

    std::string msg("test\n");
    if (!qbus_producer.produce(msg.c_str(), msg.length(), "key")) {
        std::cout << "Failed to produce" << std::endl;
    }

    qbus_producer.uninit();

    return 0;
}

Чтение данных

  • Чтение данных требует только вызова функции subscribeOne для подписки на «тему» (также поддерживается подписка на несколько тем). Текущий процесс не блокируется, и каждое сообщение отправляется пользователю через обратный вызов.
  • SDK также поддерживает ручную отправку смещения, пользователи могут отправлять смещение в коде тела сообщения, которое возвращается через обратные вызовы.
  • Пример C++, демонстрирующий использование интерфейса чтения:
bool QbusConsumer::init(const std::string& broker_list,
                        const std::string& log_path,
                        const std::string& config_path,
                        const QbusConsumerCallback& callback);
bool QbusConsumer::subscribeOne(const std::string& group, const std::string& topic);
bool QbusConsumer::subscribe(const std::string& group,
                             const std::vector<std::string>& topics);
bool QbusConsumer::start();
void **QbusConsumer::stop();**

**bool QbusConsumer::pause(const std::vector<std::string>& topics);**

**bool QbusConsumer::resume(const std::vector<std::string>& topics);**

*Пример использования C++ SDK:*

```c++
#include <iostream>
#include "qbus_consumer.h"

qbus::QbusConsumer qbus_consumer;
class MyCallback: public qbus::QbusConsumerCallback {
    public:
        virtual void deliveryMsg(const std::string& topic,
                const char* msg,
                const size_t msg_len) const {
            std::cout << "topic: " << topic << " | msg: " << std::string(msg, msg_len) << std::endl;
        }
};

int main(int argc, char* argv[]) {
    MyCallback my_callback;
    if (qbus_consumer.init("127.0.0.1:9092",
                "log",
                "config",
                my_callback)) {
        if (qbus_consumer.subscribeOne("groupid_test", "topic_test")) {
            if (!qbus_consumer.start()) {
                std::cout << "Failed to start" << std::endl;
                return NULL;
            }

            while (1) sleep(1);  //other operations can appear here

            qbus_consumer.stop();
        } else {
            std::cout << "Failed subscribe" << std::endl;
        }
    } else {
        std::cout << "Failed init" << std::endl;
    }
    return 0;
}
```

Вы можете использовать методы `pause()` и `resume()`, чтобы приостановить или возобновить потребление некоторых тем. Смотрите [qbus_pause_resume_example.cc](./cxx/examples/consumer/qbus_pause_resume_example.cc).

Смотрите примеры в [C examples](c/examples/), [C++ examples](cxx/examples/), [Go examples](golang/examples/), [Python examples](python/examples/) и [PHP examples](php/examples/) для более подробного использования.

## КОНФИГУРАЦИЯ

Файл конфигурации находится в формате [INI](https://en.wikipedia.org/wiki/INI_file):

```ini
[global]

[topic]

[sdk]
```

См. [rdkafka 1.0.x configuration](https://github.com/edenhill/librdkafka/blob/1.0.x/CONFIGURATION.md) для *global* и *topic* конфигураций и [sdk configuration](https://github.com/Qihoo360/kafkabridge/blob/master/CONFIGURATION.md) для конфигурации *sdk*.

Обычно kafkabridge работает с пустым файлом конфигурации, но если ваша версия брокера < 0.10.0.0, вы должны указать параметры конфигурации, связанные с api.version, см. [broker version compatibility](https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#broker-version-compatibility).

Например, для брокера 0.9.0.1 необходимы следующие конфигурации:

```ini
[global]
api.version.request=false
broker.version.fallback=0.9.0.1
```

По умолчанию конфигурация теперь совместима с брокером 0.9.0.1. Поэтому, если используется более высокая версия брокера, `api.version.request` должен быть установлен в true. В противном случае протокол сообщений будет более старой версии, например, без поля timestamp.

## Контакты

QQ группа: 876834263

![](https://github.com/Qihoo360/qbusbridge/blob/master/kafkabridge.png)

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/mirrors-KafkaBridge.git
git@api.gitlife.ru:oschina-mirror/mirrors-KafkaBridge.git
oschina-mirror
mirrors-KafkaBridge
mirrors-KafkaBridge
master