1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/jonny-li-rabbitmq-study

В этом репозитории не указан файл с открытой лицензией (LICENSE). При использовании обратитесь к конкретному описанию проекта и его зависимостям в коде.
Клонировать/Скачать
08-最适合入门的RabbitMQ+PHP教程(八)延迟队列.md 9.3 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
Отправлено 02.06.2025 18:34 9a50763

Сценарий

В процессе разработки часто требуется использование задач с заданным временем выполнения. Для интернет-магазина такие задачи особенно важны, например, истечение срока действия купонов, автоматическое закрытие неподтвержденных заказов, закрытие заказов при отсутствии оплаты в течение двух часов и т.д. Все эти задачи требуют использования задач с заданным временем выполнения. Однако сами задачи с заданным временем выполнения имеют определенные проблемы. Обычно мы используем периодическую проверку базы данных для определения наличия задач, которые нужно выполнить. Это означает, что независимо от обстоятельств, нам нужно сначала проверить базу данных. При этом некоторые задачи требуют высокой точности времени, и им требуется проверка каждую секунду. Для небольших систем это может быть не проблемой, но для больших систем с большим объемом данных это становится нереалистичным. Поэтому требуются другие методы. Разумеется, существуют различные способы реализации, например, использование Redis для создания очередей с заданным временем выполнения, использование приоритетных очередей JDK для задержек, использование таймера и т.д.### RabbitMQ с заданным временем выполнения RabbitMQ сам по себе не поддерживает задачи с заданным временем выполнения. Однако можно использовать его встроенные возможности для реализации таких задач. Чтобы RabbitMQ поддерживал задачи с заданным временем выполнения, необходимо использовать Dead Letter Exchange (DLX) RabbitMQ и время жизни сообщений (TTL).### Обменник мертвых писем (DLX) Сообщение попадает в обменник мертвых писем (DLX), если выполняются следующие условия:

  • Сообщение было отклонено потребителем, и параметр requeue в методе reject установлен в false. Это означает, что сообщение не будет заново помещено в очередь для использования другими потребителями.
  • TTL сообщения истекло, и сообщение стало недействительным.
  • Длина очереди достигла предела, и сообщения, находящиеся в начале очереди, будут отброшены или перенаправлены в обменник мертвых писем.### TTL сообщений (время жизни сообщений) TTL сообщений представляет собой время жизни сообщения. RabbitMQ позволяет устанавливать TTL как для очередей, так и для отдельных сообщений. TTL для очереди определяет время, в течение которого сообщение будет храниться в очереди без потребителя. Для каждого отдельного сообщения можно также установить свое TTL. Если TTL истек, сообщение считается недействительным и становится "мертвым". Если TTL установлен как для очереди, так и для сообщения, будет использоваться наименьшее значение. Таким образом, время жизни сообщения может различаться в зависимости от настроек очередей, в которые оно было направлено.

Код отправки сообщения (send)

"""
    @desc Создание очереди с задержкой и обменника
    @param string $exchange_name
    @param string $queue_name
    @param string $ttl_time
"""
public function createOutQueue($exchange_name = '', $queue_name = '', $ttl_time = '')
{
    $this->delay_exchange_name = $exchange_name;
    $this->delay_queue_name = $queue_name;
    self::$ttl_time = $ttl_time ?  $ttl_time : 15000;
    try {
        $this->setExchangeDeclare($this->delay_exchange_name, self::$type, false, false, false);
        $this->getQueueDeclare($this->delay_queue_name, false, true, false, false, false);
        $this->setQueueBind($this->delay_queue_name, $this->delay_exchange_name, $this->delay_exchange_name);
    } catch (\Exception $e) {
        return 'Info:' .  $e->getMessage();
    }
}
/**
     * @desc Создание очереди и обменника
     * @param string $exchange_name
     * @param string $queue_name
     * @return string
     */
public function createDefaultQueue($exchange_name = '', $queue_name = '')
{
    $this->exchange_name = $exchange_name;
    $this->queue_name = $queue_name;
    try {
        $tale = new AMQPTable();
``````markdown
```php
$tale->set('x-dead-letter-exchange', $this->delay_exchange_name); // Указывает, какой обменник будет обрабатывать сообщение после истечения срока его жизни
$tale->set('x-dead-letter-routing-key', $this->delay_exchange_name); // Ключ маршрутизации для сообщений-мертвых пачек
$tale->set('x-message-ttl', self::$ttl_time); // Время жизни сообщения
$this->setExchangeDeclare($this->exchange_name, self::$type, false, false, false);
$this->getQueueDeclare($this->queue_name, false, true, false, false, false, $tale);
$this->setQueueBind($this->queue_name, $this->exchange_name, $this->exchange_name);
} catch (\Exception $e) {
return 'Info:' . $e->getMessage();
}
}
/**
 * @desc Отправка сообщения
 * @param string $message
 */
public function sendMessage($message = '')
{
    // Для тестирования отправляем несколько сообщений
    for ($i = 1; $i <= 10; $i++) {
        $messages = new AMQPMessage($message, [
            'expiration' => intval(self::$ttl_time),
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);
        $this->amqp_channel->basic_publish($messages, $this->exchange_name, $this->exchange_name);
        // Для удобства просмотра отправленных сообщений
        sleep(2);
        echo date('Y-m-d H:i:s') . ' Sent ' . $message . PHP_EOL;
    }
}

Исходный файл: send.php

```php
/**
    * @desc Прием сообщения и его обработка
    */
public function receiveMessage()
{
    echo 'Ожидание сообщения. Для выхода нажмите CTRL+C ' . PHP_EOL;

    $callback = function ($message) {

        echo date('Y-m-d H:i:s') . " Received: ", $message->body, PHP_EOL;

        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); 
    };
    $this->amqp_channel->basic_qos(null, 1, null);

    $this->amqp_channel->basic_consume($this->delay_queue_name, '', false, false, false, false, $callback);
}
```markdown
    while (count($this->amqp_channel->callbacks)) {

        $this->amqp_channel->wait();
    }
}

Исходный файл: receive.php

Сводка: на основе RabbitMQ реализованы задержанные задачи, то есть сообщение устанавливается с временем жизни, и помещается в очередь, которая не читается. Когда сообщение истекает, оно автоматически переходит в другую очередь, и обработчик, который слушает эту очередь, выполняет конкретные действия по задержанным задачам.

Оригинальная ссылка: Наиболее подходящий для новичков учебник RabbitMQ+PHP (восьмая часть): задержанная очередь

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/jonny-li-rabbitmq-study.git
git@api.gitlife.ru:oschina-mirror/jonny-li-rabbitmq-study.git
oschina-mirror
jonny-li-rabbitmq-study
jonny-li-rabbitmq-study
master