В процессе разработки часто требуется использование задач с заданным временем выполнения. Для интернет-магазина такие задачи особенно важны, например, истечение срока действия купонов, автоматическое закрытие неподтвержденных заказов, закрытие заказов при отсутствии оплаты в течение двух часов и т.д. Все эти задачи требуют использования задач с заданным временем выполнения. Однако сами задачи с заданным временем выполнения имеют определенные проблемы. Обычно мы используем периодическую проверку базы данных для определения наличия задач, которые нужно выполнить. Это означает, что независимо от обстоятельств, нам нужно сначала проверить базу данных. При этом некоторые задачи требуют высокой точности времени, и им требуется проверка каждую секунду. Для небольших систем это может быть не проблемой, но для больших систем с большим объемом данных это становится нереалистичным. Поэтому требуются другие методы. Разумеется, существуют различные способы реализации, например, использование Redis для создания очередей с заданным временем выполнения, использование приоритетных очередей JDK для задержек, использование таймера и т.д.### RabbitMQ с заданным временем выполнения RabbitMQ сам по себе не поддерживает задачи с заданным временем выполнения. Однако можно использовать его встроенные возможности для реализации таких задач. Чтобы RabbitMQ поддерживал задачи с заданным временем выполнения, необходимо использовать Dead Letter Exchange (DLX) RabbitMQ и время жизни сообщений (TTL).### Обменник мертвых писем (DLX) Сообщение попадает в обменник мертвых писем (DLX), если выполняются следующие условия:
"""
@desc Создание очереди с задержкой и обменника
@param string $exchange_name
@param string $queue_name
@param string $ttl_time
"""
public function createOutQueue($exchange_name = '', $queue_name = '', $ttl_time = '')
{
$this->delay_exchange_name = $exchange_name;
$this->delay_queue_name = $queue_name;
self::$ttl_time = $ttl_time ? $ttl_time : 15000;
try {
$this->setExchangeDeclare($this->delay_exchange_name, self::$type, false, false, false);
$this->getQueueDeclare($this->delay_queue_name, false, true, false, false, false);
$this->setQueueBind($this->delay_queue_name, $this->delay_exchange_name, $this->delay_exchange_name);
} catch (\Exception $e) {
return 'Info:' . $e->getMessage();
}
}
/**
* @desc Создание очереди и обменника
* @param string $exchange_name
* @param string $queue_name
* @return string
*/
public function createDefaultQueue($exchange_name = '', $queue_name = '')
{
$this->exchange_name = $exchange_name;
$this->queue_name = $queue_name;
try {
$tale = new AMQPTable();
``````markdown
```php
$tale->set('x-dead-letter-exchange', $this->delay_exchange_name); // Указывает, какой обменник будет обрабатывать сообщение после истечения срока его жизни
$tale->set('x-dead-letter-routing-key', $this->delay_exchange_name); // Ключ маршрутизации для сообщений-мертвых пачек
$tale->set('x-message-ttl', self::$ttl_time); // Время жизни сообщения
$this->setExchangeDeclare($this->exchange_name, self::$type, false, false, false);
$this->getQueueDeclare($this->queue_name, false, true, false, false, false, $tale);
$this->setQueueBind($this->queue_name, $this->exchange_name, $this->exchange_name);
} catch (\Exception $e) {
return 'Info:' . $e->getMessage();
}
}
/**
* @desc Отправка сообщения
* @param string $message
*/
public function sendMessage($message = '')
{
// Для тестирования отправляем несколько сообщений
for ($i = 1; $i <= 10; $i++) {
$messages = new AMQPMessage($message, [
'expiration' => intval(self::$ttl_time),
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);
$this->amqp_channel->basic_publish($messages, $this->exchange_name, $this->exchange_name);
// Для удобства просмотра отправленных сообщений
sleep(2);
echo date('Y-m-d H:i:s') . ' Sent ' . $message . PHP_EOL;
}
}
Исходный файл: send.php
```php
/**
* @desc Прием сообщения и его обработка
*/
public function receiveMessage()
{
echo 'Ожидание сообщения. Для выхода нажмите CTRL+C ' . PHP_EOL;
$callback = function ($message) {
echo date('Y-m-d H:i:s') . " Received: ", $message->body, PHP_EOL;
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
};
$this->amqp_channel->basic_qos(null, 1, null);
$this->amqp_channel->basic_consume($this->delay_queue_name, '', false, false, false, false, $callback);
}
```markdown
while (count($this->amqp_channel->callbacks)) {
$this->amqp_channel->wait();
}
}
Исходный файл: receive.php
Сводка: на основе RabbitMQ реализованы задержанные задачи, то есть сообщение устанавливается с временем жизни, и помещается в очередь, которая не читается. Когда сообщение истекает, оно автоматически переходит в другую очередь, и обработчик, который слушает эту очередь, выполняет конкретные действия по задержанным задачам.
Оригинальная ссылка: Наиболее подходящий для новичков учебник RabbitMQ+PHP (восьмая часть): задержанная очередь
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )