1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/longzhiyan-phpkafka

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
consumer.md 7.7 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
Отправлено 07.03.2025 04:28 135a743

Клиент

Конфигурация клиента

Класс: longlang\phpkafka\Consumer\ConsumerConfig

Поддерживает передачу массива через конструктор### Параметры конфигурации| Параметр | Описание | Значение по умолчанию | | --- | --- | --- | | connectTimeout | Время соединения (в секундах, поддерживаются десятичные значения); значение -1 указывает на отсутствие ограничений | -1 | | sendTimeout | Время отправки (в секундах, поддерживаются десятичные значения); значение -1 указывает на отсутствие ограничений | -1 | | recvTimeout | Время приема (в секундах, поддерживаются десятичные значения); значение -1 указывает на отсутствие ограничений | -1 | | clientId | Идентификатор клиента Kafka; для различных процессов потребителей следует использовать различные значения | null | | maxWriteAttempts | Максимальное количество попыток записи | 3 | | client | Класс клиента Kafka, используемый; если значение равно null, то используется автоматическое распознавание в зависимости от сценария | null | | socket | Класс сокета Kafka, используемый; если значение равно null, то используется автоматическое распознавание в зависимости от сценария | null | | broker | Брокер; формат: '127.0.0.1:9092' | null | | bootstrapServers | Альтернативное имя bootstrapServer; серверы бутстрапа; если указано это значение, будет автоматически установлено соединение с этим сервером и автоматически обновлены брокеры. Формат: '127.0.0.1:9092,127.0.0.1:9093' или ['127.0.0.1:9092', '127.0.0.1:9093'] | null | | updateBrokers | Автоматическое обновление брокеров | true || interval | Интервал времени между попытками получения сообщений при отсутствии сообщений (в секундах; поддерживаются десятичные значения); значение 0 указывает на немедленную повторную попытку | 0 | | groupId | Идентификатор группы | null | | memberId | Идентификатор пользователя | null | | groupInstanceId | Идентификатор экземпляра группы; для разных процессов потребителей следует использовать различные значения | null | | sessionTimeout | Если за время этого параметра нет ответа на пинг, координатор считает пользователя недействительным (в секундах; поддерживаются десятичные значения) | 60 | | rebalanceTimeout | Максимальное время, которое координатор ждет каждого участника для присоединения при ребалансировании группы (в секундах; поддерживаются десятичные значения) | 60 | | topic | Название темы; поддерживается одновременная подписка на несколько тем | null | | replicaId | Идентификатор реплики | -1 | | rackId | Номер шкафа | "" | | autoCommit | Автоматическая запись смещения | true | | groupRetry | Число автоматических попыток повторной операции группы при совпадении с зарезервированным кодом ошибки | 5 | | groupRetrySleep | Интервал времени между автоматическими попытками повторной операции группы (в секундах) | 1 | | offsetRetry | Число автоматических попыток повторной операции смещения при совпадении с зарезервированным кодом ошибки | 5 | | groupHeartbeat | Интервал времени между пингами группы (в секундах) | 3 || autoCreateTopic | Автоматическое создание темы | true || | partitionAssignmentStrategy | Стратегия назначения разделов для потребителя, варианты: диапазонное назначение - longlang\phpkafka\Consumer\Assignor\RangeAssignor, циклическое назначение - longlang\phpkafka\Consumer\Assignor\RoundRobinAssignor, прилипающее назначение - longlang\phpkafka\Consumer\Assignor\StickyAssignor | longlang\phpkafka\Consumer\Assignor\RangeAssignor | | exceptionCallback | Вызывается в случае возникновения исключения, которое невозможно выбросить в корутине recv(). Формат: function(\Exception $e){} | null |

Асинхронное потребление (callback)Пример кода:

use longlang\phpkafka\Consumer\ConsumeMessage;
use longlang\phpkafka\Consumer\Consumer;
use longlang\phpkafka\Consumer\ConsumerConfig;

function consume(ConsumeMessage $message)
{
    var_dump($message->getKey() . ':' . $message->getValue());
    // $consumer->ack($message); // автоматическое подтверждение выключено
}
$config = new ConsumerConfig();
$config->setBroker('127.0.0.1:9092');
$config->setTopic('test'); // название темы
$config->setGroupId('testGroup'); // группа ID
$config->setClientId('test'); // клиентский ID, разные значения для разных процессов потребителей
$config->setGroupInstanceId('test'); // уникальный идентификатор группы, разные значения для разных процессов потребителей
$config->setInterval(0.1);
$consumer = new Consumer($config, 'consume');
$consumer->start();

Синхронное потребление

Пример кода:

use longlang\phpkafka\Consumer\Consumer;
use longlang\phpkafka\Consumer\ConsumerConfig;

$config = new ConsumerConfig();
$config->setBroker('127.0.0.1:9092');
$config->setTopic('test'); // название темы
$config->setGroupId('testGroup'); // группа ID
$config->setClientId('test_custom'); // клиентский ID, разные значения для разных процессов потребителей
$config->setGroupInstanceId('test_custom'); // уникальный идентификатор группы, разные значения для разных процессов потребителей
$consumer = new Consumer($config);
while (true) {
    $message = $consumer->consume();
    if ($message) {
        var_dump($message->getKey() . ':' . $message->getValue());
        $consumer->ack($message); // ручное подтверждение
    }
    sleep(1);
}

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/longzhiyan-phpkafka.git
git@api.gitlife.ru:oschina-mirror/longzhiyan-phpkafka.git
oschina-mirror
longzhiyan-phpkafka
longzhiyan-phpkafka
master