1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/wenzhenxi-See-KafKa

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
В этом репозитории не указан файл с открытой лицензией (LICENSE). При использовании обратитесь к конкретному описанию проекта и его зависимостям в коде.
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
README.md

Упрощённое расширение PHP-Kafka See-KafKa

Иллюстрация

Введение

(Простое простотой удобное расширение)

Kafka — это распределенная система подписки и доставки сообщений, поддерживаемая фондом Apache. Оригинальной целью Kafka было решение задачи создания единого, эффективного, с низкими задержками, высокопроизводительного (способность передавать большое количество данных одновременно) и надёжного платформы для обмена сообщениями. Это идеальное решение для распределённых очередей сообщений, журналов и каналов передачи данных. К сожалению, существующие расширения для PHP не очень удобны в использовании (расширение php-kafka давно не поддерживается и имеет множество проблем, а rdkafka C-библиотека также неудобна для использования). Поэтому было создано простое и удобное расширение для работы с Kafka.

Ссылки:

Адрес GitHub: https://github.com/wenzhenxi/See-KafKa

Адрес rdkafka PHP расширения: https://github.com/arnaud-lb/php-rdkafka

Зависимости уровня сервиса: https://github.com/edenhill/librdkafka

Блог автора: http://w-blog.cn

1. Установка

**(See-KafKa поддерживает версии от 0.9 до 0.10, но не поддерживает протоколы версий 0.8 и более ранних)**Первоначально вам потребуется установить и настроить Zookeeper + Kafka: можно обратиться к разделу Kafka на сайте автора для получения информации по установке, там приведена информация по установке версии 0.8.2.2, но процесс установки для версий 0.9 и 0.10 ничем не отличается, просто скачайте соответствующие пакеты.

До использования необходимо последовательно установить librdkafka, затем php-rdkafka:

# Установка librdkafka
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
make install
# Установка php-rdkafka
git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure
make all -j 5
make install
# Добавьте следующую информацию в конфигурационный файл php.ini
vim /usr/local/php/etc/php.ini
extension=rdkafka.so  

После выполнения этих шагов вы можете использовать команду php -m для проверки наличия расширения rdkafka среди установленных расширений, что свидетельствует о успешной установке.

2. Использование

See-Kafka полностью совместимо с PhalApi, достаточно получить расширение Kafka из каталога расширений. Конечно, если вы используете другую систему, то вам нужно будет включить файл kafka.php для использования.

2.1 Производитель

Один из двух основных компонентов Kafka — это производитель (для подробностей см. статью автора). К отправке сообщения в Topic Kafka можно привести следующий пример:

<?php
/**
 * Пример использования производителя See-Kafka
 * Цикл записи 10 000 сообщений за 15 миллисекунд
 */
``````markdown
## 2.1 Настройка конфигурации Kafka (по умолчанию порт 9092) через запятую
```php
$Kafka_Lite = new Kafka_Lite("127.0.0.1,localhost");
// Установка темы
$Kafka_Lite->setTopic("test");
// Одноразовая запись эффективна, запись 10 000 сообщений за 15 миллисекунд
$Producer = $Kafka_Lite->newProducer();
// Параметры: разделение, содержание сообщения, ключ сообщения (необязательно)
// Разделение: может быть установлено как KAFKA_PARTITION_UA для автоматического распределения, если есть 6 разделов, то при записи будет случайным образом выбран раздел
$Producer->setMessage(0, "Привет");
?>

2.2 Потребители

Для потребителей поддерживаются четыре способа получения offset'ов:

  • KAFKA_OFFSET_STORED # Получение offset'а через группу (необходимо установить группу)
  • KAFKA_OFFSET_END # Получение последнего offset'а
  • KAFKA_OFFSET_BEGINNING # Получение первого offset'а
  • Вручную указывается начальное значение offset'а

2.2.1 Пример 1

Этот пример подходит для сценария, где требуется получить отрезок данных и завершить работу. Каждый вызов getMassage создаёт соединение и затем закрывает его. При использовании циклического вызова getMassage это может привести к значительному снижению производительности.

<?php
/**
 * Пример использования потребителя See-kafka
 */
$Kafka_Lite = new Kafka_Lite("127.0.0.1,localhost");
// Установка Topic
$Kafka_Lite->setTopic("test");
// Установка группы потребителя (не обязательно, если не используется автоматическое получение offset'а)
$Kafka_Lite->setGroup("test");
// Получение экземпляра потребителя
$consumer = $Kafka_Lite->newConsumer();// Получение группы сообщений параметры: разделение, максимальное количество сообщений, offset (необязательно) по умолчанию KAFKA_OFFSET_STORED
$rs = $consumer->getMessages(0, 100);
// Возвращаемое значение - массив, каждый элемент типа Kafka_Message
?>

2.2.2 Пример 2

Этот пример подходит для скриптов очередей задач.

<?php
/**
 * Пример использования потребителя See-kafka
 * 889 миллисекунд для получения 10 000 сообщений
 */
```// Настройка конфигурации Kafka (по умолчанию порт 9092) через запятую
$Kafka_Lite = new Kafka_Lite("127.0.0.1,localhost");
// Установка Topic
$Kafka_Lite->setTopic("test");
// Установка группы потребителя (не обязательно, если не используется автоматическое получение offset'а)
$Kafka_Lite->setGroup("test");
?>
```// Это настройка определяет, с какого оффсета начинается новый group — с минимального или максимального. По умолчанию используется максимальный (или последний).
$Kafka_Lite->setTopicConf('auto.offset.reset', 'smallest');
// Эта конфигурация определяет, будет ли автоматическое подтверждение сообщений считаться успешным после получения данных, без необходимости вызова метода stop. Однако есть ограничения.
// Чем меньше значение времени, тем быстрее происходит подтверждение. Чем больше время, тем реже происходит подтверждение. При получении одного сообщения и возникновении ошибки, время между получением и выбросом исключения используется для вычисления завершённости обработки.
// Если исключение выбрасывается раньше указанного времени, оффсет не обновляется. Если же время больше этого значения, оффсет обновляется сразу. Рекомендуется установить это значение в диапазоне от 100 до 1000 миллисекунд.
$Kafka_Lite->setTopicConf('auto.commit.interval.ms', 1000);## 3. Конфигурационные файлыSee-kafka предоставляет два типа конфигурационных файлов, передаваемых ключами и значениями. Для получения информации о конкретных конфигурациях и их действиях обратитесь к следующему адресу:

https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

Описание конфигурационных файлов: [https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md "Описание конфигурационных файлов")

```php
$KafKa_Lite->setTopicConf();
$KafKa_Lite->setKafkaConf();

При использовании Consumer группы (KAFKA_OFFSET_STORED) следует учитывать следующие параметры конфигурации, чтобы при использовании новой группы смещение начиналось с текущего значения (в зависимости от сценария).

// Этот параметр определяет поведение при использовании новой группы — начинать с минимального смещения или максимального (по умолчанию максимального или последнего).
$KafKa_Lite->setTopicConf('auto.offset.reset', 'smallest');

После того как данные были получены Consumer'ом, необходимо сообщить Kafka о успешном получении данных и обновлении смещения. Однако, если в процессе произошла ошибка и смещение не было обновлено, то следующий запрос будет начинаться с начала. Этот параметр позволяет установить время автоматического обновления смещения после ошибки.```php // Этот параметр определяет, что данные будут считаться успешно полученными и смещение будет автоматически обновлено после указанного времени. // Чем меньше это значение, тем быстрее будет обновление смещения. Чем больше это значение, тем реже будет обновление смещения. // Если после получения данных возникает исключение за указанный период времени, то смещение не будет обновлено. В противном случае, смещение будет обновлено немедленно. // Рекомендуется установить это значение между 100 и 1000 миллисекундами. $Kafka_Lite->setTopicConf('auto.commit.interval.ms', 1000);


При инициализации объекта `KafKa_Lite` производится проверка доступности портов кластера. Если ни один из портов недоступен, будет выброшено исключение **Не могу использовать KafKa**. Также можно активировать операцию ping для проверки наличия доступных машин в кластере. При получении исключения от Consumer будет выброшено исключение **KafKa_Exception_Base**, которое имеет код ошибки, который можно использовать для справки в файле Exception/err.php. Рекомендуется использовать try-catch для обработки.

## 5. Заключение

Цель расширения See-KafKa — сделать более удобной интеграцию KafKa с PHP и обеспечить простоту использования. Если вас заинтересует этот проект, попробуйте его использовать, и если возникнут вопросы, вы можете обратиться с отзывами. Автор этого расширения будет продолжать его поддерживать!

Официальная группа поддержки: 438882880

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

Kafka — это распределённая система подписки и рассылки, которую поддерживает Apache Foundation. Изначально Kafka была создана для того, чтобы создать унифицированную, эффективную, с низкими задержками и высокой пропускной способностью (объёмом передаваемых данных) и доступную платформу обмена сообщениями. Это распределённая очередь сообщений, ра... Развернуть Свернуть
Отмена

Обновления

Пока нет обновлений

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://api.gitlife.ru/oschina-mirror/wenzhenxi-See-KafKa.git
git@api.gitlife.ru:oschina-mirror/wenzhenxi-See-KafKa.git
oschina-mirror
wenzhenxi-See-KafKa
wenzhenxi-See-KafKa
master