(Простое простотой удобное расширение)
Kafka — это распределенная система подписки и доставки сообщений, поддерживаемая фондом Apache. Оригинальной целью Kafka было решение задачи создания единого, эффективного, с низкими задержками, высокопроизводительного (способность передавать большое количество данных одновременно) и надёжного платформы для обмена сообщениями. Это идеальное решение для распределённых очередей сообщений, журналов и каналов передачи данных. К сожалению, существующие расширения для PHP не очень удобны в использовании (расширение php-kafka давно не поддерживается и имеет множество проблем, а rdkafka C-библиотека также неудобна для использования). Поэтому было создано простое и удобное расширение для работы с Kafka.
Ссылки:
Адрес GitHub: https://github.com/wenzhenxi/See-KafKa
Адрес rdkafka PHP расширения: https://github.com/arnaud-lb/php-rdkafka
Зависимости уровня сервиса: https://github.com/edenhill/librdkafka
Блог автора: http://w-blog.cn
**(See-KafKa поддерживает версии от 0.9 до 0.10, но не поддерживает протоколы версий 0.8 и более ранних)**Первоначально вам потребуется установить и настроить Zookeeper + Kafka: можно обратиться к разделу Kafka на сайте автора для получения информации по установке, там приведена информация по установке версии 0.8.2.2, но процесс установки для версий 0.9 и 0.10 ничем не отличается, просто скачайте соответствующие пакеты.
До использования необходимо последовательно установить librdkafka, затем php-rdkafka:
# Установка librdkafka
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
make install
# Установка php-rdkafka
git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure
make all -j 5
make install
# Добавьте следующую информацию в конфигурационный файл php.ini
vim /usr/local/php/etc/php.ini
extension=rdkafka.so
После выполнения этих шагов вы можете использовать команду php -m
для проверки наличия расширения rdkafka среди установленных расширений, что свидетельствует о успешной установке.
See-Kafka полностью совместимо с PhalApi, достаточно получить расширение Kafka из каталога расширений. Конечно, если вы используете другую систему, то вам нужно будет включить файл kafka.php для использования.
Один из двух основных компонентов Kafka — это производитель (для подробностей см. статью автора). К отправке сообщения в Topic Kafka можно привести следующий пример:
<?php
/**
* Пример использования производителя See-Kafka
* Цикл записи 10 000 сообщений за 15 миллисекунд
*/
``````markdown
## 2.1 Настройка конфигурации Kafka (по умолчанию порт 9092) через запятую
```php
$Kafka_Lite = new Kafka_Lite("127.0.0.1,localhost");
// Установка темы
$Kafka_Lite->setTopic("test");
// Одноразовая запись эффективна, запись 10 000 сообщений за 15 миллисекунд
$Producer = $Kafka_Lite->newProducer();
// Параметры: разделение, содержание сообщения, ключ сообщения (необязательно)
// Разделение: может быть установлено как KAFKA_PARTITION_UA для автоматического распределения, если есть 6 разделов, то при записи будет случайным образом выбран раздел
$Producer->setMessage(0, "Привет");
?>
Для потребителей поддерживаются четыре способа получения offset'ов:
Этот пример подходит для сценария, где требуется получить отрезок данных и завершить работу. Каждый вызов getMassage
создаёт соединение и затем закрывает его. При использовании циклического вызова getMassage
это может привести к значительному снижению производительности.
<?php
/**
* Пример использования потребителя See-kafka
*/
$Kafka_Lite = new Kafka_Lite("127.0.0.1,localhost");
// Установка Topic
$Kafka_Lite->setTopic("test");
// Установка группы потребителя (не обязательно, если не используется автоматическое получение offset'а)
$Kafka_Lite->setGroup("test");
// Получение экземпляра потребителя
$consumer = $Kafka_Lite->newConsumer();// Получение группы сообщений параметры: разделение, максимальное количество сообщений, offset (необязательно) по умолчанию KAFKA_OFFSET_STORED
$rs = $consumer->getMessages(0, 100);
// Возвращаемое значение - массив, каждый элемент типа Kafka_Message
?>
Этот пример подходит для скриптов очередей задач.
<?php
/**
* Пример использования потребителя See-kafka
* 889 миллисекунд для получения 10 000 сообщений
*/
```// Настройка конфигурации Kafka (по умолчанию порт 9092) через запятую
$Kafka_Lite = new Kafka_Lite("127.0.0.1,localhost");
// Установка Topic
$Kafka_Lite->setTopic("test");
// Установка группы потребителя (не обязательно, если не используется автоматическое получение offset'а)
$Kafka_Lite->setGroup("test");
?>
```// Это настройка определяет, с какого оффсета начинается новый group — с минимального или максимального. По умолчанию используется максимальный (или последний).
$Kafka_Lite->setTopicConf('auto.offset.reset', 'smallest');
// Эта конфигурация определяет, будет ли автоматическое подтверждение сообщений считаться успешным после получения данных, без необходимости вызова метода stop. Однако есть ограничения.
// Чем меньше значение времени, тем быстрее происходит подтверждение. Чем больше время, тем реже происходит подтверждение. При получении одного сообщения и возникновении ошибки, время между получением и выбросом исключения используется для вычисления завершённости обработки.
// Если исключение выбрасывается раньше указанного времени, оффсет не обновляется. Если же время больше этого значения, оффсет обновляется сразу. Рекомендуется установить это значение в диапазоне от 100 до 1000 миллисекунд.
$Kafka_Lite->setTopicConf('auto.commit.interval.ms', 1000);## 3. Конфигурационные файлыSee-kafka предоставляет два типа конфигурационных файлов, передаваемых ключами и значениями. Для получения информации о конкретных конфигурациях и их действиях обратитесь к следующему адресу:
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
Описание конфигурационных файлов: [https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md "Описание конфигурационных файлов")
```php
$KafKa_Lite->setTopicConf();
$KafKa_Lite->setKafkaConf();
При использовании Consumer группы (KAFKA_OFFSET_STORED) следует учитывать следующие параметры конфигурации, чтобы при использовании новой группы смещение начиналось с текущего значения (в зависимости от сценария).
// Этот параметр определяет поведение при использовании новой группы — начинать с минимального смещения или максимального (по умолчанию максимального или последнего).
$KafKa_Lite->setTopicConf('auto.offset.reset', 'smallest');
После того как данные были получены Consumer'ом, необходимо сообщить Kafka о успешном получении данных и обновлении смещения. Однако, если в процессе произошла ошибка и смещение не было обновлено, то следующий запрос будет начинаться с начала. Этот параметр позволяет установить время автоматического обновления смещения после ошибки.```php // Этот параметр определяет, что данные будут считаться успешно полученными и смещение будет автоматически обновлено после указанного времени. // Чем меньше это значение, тем быстрее будет обновление смещения. Чем больше это значение, тем реже будет обновление смещения. // Если после получения данных возникает исключение за указанный период времени, то смещение не будет обновлено. В противном случае, смещение будет обновлено немедленно. // Рекомендуется установить это значение между 100 и 1000 миллисекундами. $Kafka_Lite->setTopicConf('auto.commit.interval.ms', 1000);
При инициализации объекта `KafKa_Lite` производится проверка доступности портов кластера. Если ни один из портов недоступен, будет выброшено исключение **Не могу использовать KafKa**. Также можно активировать операцию ping для проверки наличия доступных машин в кластере. При получении исключения от Consumer будет выброшено исключение **KafKa_Exception_Base**, которое имеет код ошибки, который можно использовать для справки в файле Exception/err.php. Рекомендуется использовать try-catch для обработки.
## 5. Заключение
Цель расширения See-KafKa — сделать более удобной интеграцию KafKa с PHP и обеспечить простоту использования. Если вас заинтересует этот проект, попробуйте его использовать, и если возникнут вопросы, вы можете обратиться с отзывами. Автор этого расширения будет продолжать его поддерживать!
Официальная группа поддержки: 438882880
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )