Класс: longlang\phpkafka\Consumer\ConsumerConfig
Поддерживает передачу массива через конструктор### Параметры конфигурации| Параметр | Описание | Значение по умолчанию | | --- | --- | --- | | connectTimeout | Время соединения (в секундах, поддерживаются десятичные значения); значение
-1
указывает на отсутствие ограничений |-1
| | sendTimeout | Время отправки (в секундах, поддерживаются десятичные значения); значение-1
указывает на отсутствие ограничений |-1
| | recvTimeout | Время приема (в секундах, поддерживаются десятичные значения); значение-1
указывает на отсутствие ограничений |-1
| | clientId | Идентификатор клиента Kafka; для различных процессов потребителей следует использовать различные значения |null
| | maxWriteAttempts | Максимальное количество попыток записи |3
| | client | Класс клиента Kafka, используемый; если значение равноnull
, то используется автоматическое распознавание в зависимости от сценария |null
| | socket | Класс сокета Kafka, используемый; если значение равноnull
, то используется автоматическое распознавание в зависимости от сценария |null
| | broker | Брокер; формат:'127.0.0.1:9092'
|null
| | bootstrapServers | Альтернативное имя bootstrapServer; серверы бутстрапа; если указано это значение, будет автоматически установлено соединение с этим сервером и автоматически обновлены брокеры. Формат:'127.0.0.1:9092,127.0.0.1:9093'
или['127.0.0.1:9092', '127.0.0.1:9093']
|null
| | updateBrokers | Автоматическое обновление брокеров |true
|| interval | Интервал времени между попытками получения сообщений при отсутствии сообщений (в секундах; поддерживаются десятичные значения); значение0
указывает на немедленную повторную попытку |0
| | groupId | Идентификатор группы |null
| | memberId | Идентификатор пользователя |null
| | groupInstanceId | Идентификатор экземпляра группы; для разных процессов потребителей следует использовать различные значения |null
| | sessionTimeout | Если за время этого параметра нет ответа на пинг, координатор считает пользователя недействительным (в секундах; поддерживаются десятичные значения) |60
| | rebalanceTimeout | Максимальное время, которое координатор ждет каждого участника для присоединения при ребалансировании группы (в секундах; поддерживаются десятичные значения) |60
| | topic | Название темы; поддерживается одновременная подписка на несколько тем |null
| | replicaId | Идентификатор реплики |-1
| | rackId | Номер шкафа |""
| | autoCommit | Автоматическая запись смещения |true
| | groupRetry | Число автоматических попыток повторной операции группы при совпадении с зарезервированным кодом ошибки |5
| | groupRetrySleep | Интервал времени между автоматическими попытками повторной операции группы (в секундах) |1
| | offsetRetry | Число автоматических попыток повторной операции смещения при совпадении с зарезервированным кодом ошибки |5
| | groupHeartbeat | Интервал времени между пингами группы (в секундах) |3
|| autoCreateTopic | Автоматическое создание темы |true
|| | partitionAssignmentStrategy | Стратегия назначения разделов для потребителя, варианты: диапазонное назначение -longlang\phpkafka\Consumer\Assignor\RangeAssignor
, циклическое назначение -longlang\phpkafka\Consumer\Assignor\RoundRobinAssignor
, прилипающее назначение -longlang\phpkafka\Consumer\Assignor\StickyAssignor
|longlang\phpkafka\Consumer\Assignor\RangeAssignor
| | exceptionCallback | Вызывается в случае возникновения исключения, которое невозможно выбросить в корутинеrecv()
. Формат:function(\Exception $e){}
|null
|
use longlang\phpkafka\Consumer\ConsumeMessage;
use longlang\phpkafka\Consumer\Consumer;
use longlang\phpkafka\Consumer\ConsumerConfig;
function consume(ConsumeMessage $message)
{
var_dump($message->getKey() . ':' . $message->getValue());
// $consumer->ack($message); // автоматическое подтверждение выключено
}
$config = new ConsumerConfig();
$config->setBroker('127.0.0.1:9092');
$config->setTopic('test'); // название темы
$config->setGroupId('testGroup'); // группа ID
$config->setClientId('test'); // клиентский ID, разные значения для разных процессов потребителей
$config->setGroupInstanceId('test'); // уникальный идентификатор группы, разные значения для разных процессов потребителей
$config->setInterval(0.1);
$consumer = new Consumer($config, 'consume');
$consumer->start();
Пример кода:
use longlang\phpkafka\Consumer\Consumer;
use longlang\phpkafka\Consumer\ConsumerConfig;
$config = new ConsumerConfig();
$config->setBroker('127.0.0.1:9092');
$config->setTopic('test'); // название темы
$config->setGroupId('testGroup'); // группа ID
$config->setClientId('test_custom'); // клиентский ID, разные значения для разных процессов потребителей
$config->setGroupInstanceId('test_custom'); // уникальный идентификатор группы, разные значения для разных процессов потребителей
$consumer = new Consumer($config);
while (true) {
$message = $consumer->consume();
if ($message) {
var_dump($message->getKey() . ':' . $message->getValue());
$consumer->ack($message); // ручное подтверждение
}
sleep(1);
}
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )