Qbusbridge — это клиентский SDK для систем обмена сообщениями типа «публикация-подписка». В настоящее время он поддерживает:
Пользователь может переключиться на любую систему обмена сообщениями, изменив файл конфигурации. По умолчанию используется доступ к Kafka, если вы хотите изменить его на Pulsar, измените конфигурацию следующим образом:
mq.type=pulsar
# Другие настройки для Pulsar...
Подробнее см. в разделе config.
TODO: В настоящее время отсутствуют документы по конфигурации на английском языке.
QBusbridge-Kafka основан на librdkafka. Множество деталей, связанных с использованием, было скрыто, что делает QBus более простым и удобным в использовании, чем librdkafka. Для отправки и получения сообщений пользователям нужно всего лишь вызвать несколько API, им не нужно слишком много знать о Kafka.
Надёжность отправки сообщений, которая может быть самой большой проблемой для пользователей, была значительно улучшена.
Убедитесь, что в вашей системе установлены g++ (>= 4.8.5), boost (>= 1.41), cmake (>= 3.1) и swig (>= 3.0.12).
Кроме того, SDK qbus статически связывает libstdc++, поэтому вы должны убедиться, что существует libstdc++.a. Для пользователей CentOS выполните:
sudo yum install -y glibc-static libstdc++-static
git clone --recursive https://github.com/ntt360/qbusbridge.git
Если вам нужно, чтобы librdkafa поддерживал аутентификацию SASL в kafka, вам также необходимо установить:
sudo yum install -y cyrus-sasl-devel
Если вы также используете аутентификацию GSSAPI, вам необходимо скомпилировать соответствующий плагин:
sudo yum install -y cyrus-sasl-gssapi
Запустите ./build_dependencies.sh.
Он автоматически загрузит подмодули и установит их в cxx/thirdparts/local, где CMakeLists.txt находит заголовки и библиотеки.
См. cxx/thirdparts/local:
include/
librdkafka/
rdkafka.h
log4cplus/
logger.h
lib/
librdkafka.a
liblog4cplus.a
Если вам нужна поддержка функциональности SASL, после компиляции вы можете перейти в каталог cxx/thirdparts/librdkafka/examples/ и выполнить следующую команду, чтобы проверить, успешно ли скомпилирован компонент SASL:
cd cxx/thirdparts/librdkafka/examples/
./rdkafka_example -X builtin.features
# builtin.features = gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,sasl_oauthbearer,http,oidc
Убедитесь, что вывод builtin.features скомпилировал связанные с SASL модули аутентификации!
Перейдите в каталог cxx и запустите ./build.sh, будут созданы следующие файлы:
include/
qbus_consumer.h
qbus_producer.h
lib/
debug/libQBus.so
release/libQBus.so
Хотя для создания C++ SDK требуется поддержка C++11, SDK можно использовать со старым g++. Например, создайте qbus SDK с g++ 4.8.5 и используйте qbus SDK с g++ 4.4.7.
Перейдите в каталог golang и запустите ./build.sh, будут созданы следующие файлы:
gopath/
src/
qbus/
qbus.go
libQBus_go.so
Вы можете включить модуль go для примеров, запустив USE_GO_MOD=1. ./build.sh. После этого будут созданы следующие файлы:
examples/
go.mod
qbus/
qbus.go
go.mod
libQBus_go.so
Перейдите в каталог python
и запустите ./build.sh
. Будут созданы следующие файлы:
examples/
qbus.py
_qbus.so
Перейдите в каталог php
и запустите build.sh
, после чего будет создан файл:
examples/
qbus.php
qbus.so
Перейдите в подкаталог examples
и выполните команду ./build.sh [debug|release]
, чтобы создать исполняемые файлы. В режиме debug
используется libQBus.so
из подкаталога lib/debug
, а в режиме release
— libQBus.so
из lib/release
. Чтобы удалить их, выполните команду make clean
.
Если вы хотите создать собственные программы, посмотрите, как это делает Makefile
.
Перейдите в подкаталог examples
и запустите команду ./build.sh
для создания исполняемых файлов. Для их удаления выполните команду ./clean.sh
.
Добавьте путь к файлу libQBus_go.so
в переменную окружения LD_LIBRARY_PATH
, например:
export LD_LIBRARY_PATH=$PWD/gopath/src/qbus:$LD_LIBRARY_PATH
Чтобы создать собственные программы, добавьте сгенерированный каталог gopath
в переменную среды GOPATH
или переместите каталог gopath/src/qbus
в $GOPATH/src
.
Скопируйте созданные файлы qbus.py
и _qbus.so
по пути запускаемых скриптов Python.
Отредактируйте файл php.ini
и добавьте расширение <module-path>
, где <module-path>
— это путь к qbus.so
.
produce
, а в асинхронном режиме, проверив возвращаемое значение, можно узнать, заполнена ли очередь отправки.produce
напрямую вернёт значение, указывающее, была ли текущая запись успешной. Но это происходит за счёт потери производительности и использования ресурсов процессора. Поэтому рекомендуется использовать асинхронный режим.produce
:bool QbusProducer::init(const string& broker_list,
const string& log_path,
const string& config_path,
const string& topic_name);
bool QbusProducer::produce(const char* data,
size_t data_len,
const std::string& key);
void QbusProducer::uninit();
#include <string>
#include <iostream>
#include "qbus_producer.h"
int main(int argc, const char* argv[]) {
qbus::QbusProducer qbus_producer;
if (!qbus_producer.init("127.0.0.1:9092",
"./log",
"./config",
"topic_test")) {
std::cout << "Failed to init" << std::endl;
return 0;
}
std::string msg("test\n");
if (!qbus_producer.produce(msg.c_str(), msg.length(), "key")) {
std::cout << "Failed to produce" << std::endl;
}
qbus_producer.uninit();
return 0;
}
subscribeOne
для подписки на «тему» (также поддерживается подписка на несколько тем). Текущий процесс не блокируется, и каждое сообщение отправляется пользователю через обратный вызов.bool QbusConsumer::init(const std::string& broker_list,
const std::string& log_path,
const std::string& config_path,
const QbusConsumerCallback& callback);
bool QbusConsumer::subscribeOne(const std::string& group, const std::string& topic);
bool QbusConsumer::subscribe(const std::string& group,
const std::vector<std::string>& topics);
bool QbusConsumer::start();
void **QbusConsumer::stop();**
**bool QbusConsumer::pause(const std::vector<std::string>& topics);**
**bool QbusConsumer::resume(const std::vector<std::string>& topics);**
*Пример использования C++ SDK:*
```c++
#include <iostream>
#include "qbus_consumer.h"
qbus::QbusConsumer qbus_consumer;
class MyCallback: public qbus::QbusConsumerCallback {
public:
virtual void deliveryMsg(const std::string& topic,
const char* msg,
const size_t msg_len) const {
std::cout << "topic: " << topic << " | msg: " << std::string(msg, msg_len) << std::endl;
}
};
int main(int argc, char* argv[]) {
MyCallback my_callback;
if (qbus_consumer.init("127.0.0.1:9092",
"log",
"config",
my_callback)) {
if (qbus_consumer.subscribeOne("groupid_test", "topic_test")) {
if (!qbus_consumer.start()) {
std::cout << "Failed to start" << std::endl;
return NULL;
}
while (1) sleep(1); //other operations can appear here
qbus_consumer.stop();
} else {
std::cout << "Failed subscribe" << std::endl;
}
} else {
std::cout << "Failed init" << std::endl;
}
return 0;
}
```
Вы можете использовать методы `pause()` и `resume()`, чтобы приостановить или возобновить потребление некоторых тем. Смотрите [qbus_pause_resume_example.cc](./cxx/examples/consumer/qbus_pause_resume_example.cc).
Смотрите примеры в [C examples](c/examples/), [C++ examples](cxx/examples/), [Go examples](golang/examples/), [Python examples](python/examples/) и [PHP examples](php/examples/) для более подробного использования.
## КОНФИГУРАЦИЯ
Файл конфигурации находится в формате [INI](https://en.wikipedia.org/wiki/INI_file):
```ini
[global]
[topic]
[sdk]
```
См. [rdkafka 1.0.x configuration](https://github.com/edenhill/librdkafka/blob/1.0.x/CONFIGURATION.md) для *global* и *topic* конфигураций и [sdk configuration](https://github.com/Qihoo360/kafkabridge/blob/master/CONFIGURATION.md) для конфигурации *sdk*.
Обычно kafkabridge работает с пустым файлом конфигурации, но если ваша версия брокера < 0.10.0.0, вы должны указать параметры конфигурации, связанные с api.version, см. [broker version compatibility](https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#broker-version-compatibility).
Например, для брокера 0.9.0.1 необходимы следующие конфигурации:
```ini
[global]
api.version.request=false
broker.version.fallback=0.9.0.1
```
По умолчанию конфигурация теперь совместима с брокером 0.9.0.1. Поэтому, если используется более высокая версия брокера, `api.version.request` должен быть установлен в true. В противном случае протокол сообщений будет более старой версии, например, без поля timestamp.
## Контакты
QQ группа: 876834263

Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )