Многопоточная очередь на основе Swoole, где manage-процесс управляет дочерними процессами, мастер-процесс слушает очередь и распределяет задачи, а worker-процессы выполняют эти задачи. Обеспечивает многопоточность, минимальную задержку (в миллисекундах), низкое потребление ресурсов, высокую доступность и возможность распределённого размещения на нескольких серверах. Может использоваться вместе с фреймворками Laravel, ThinkPHP и другими.
Требования версий:
Поддерживаемые драйверы:
![]() |
Настройка | Тип | Обязательность | Значение по умолчанию | Описание |
---|---|---|---|---|---|
basics | массив | Да | Нет | Базовая конфигурация | |
basics.name | строка | Да | Нет | Имя текущего сервиса очередей; при запуске нескольких сервисов одновременно каждому сервису следует присвоить уникальное имя (имя может быть произвольным, главное — чтобы все имена были уникальными) | |
basics.pid_path | строка | Нет | /tmp | Путь хранения файла pid главного процесса | |
basics.driver | строка | Да | Нет | Драйвер очередей должен реализовывать MPQueue\Queue\Driver\DriverInterface | |
worker_start_handle | вызываемый объект | Нет | пустая строка | Вызывается после старта worker-процесса (эффективен для всех очередей в текущем сервисе) | |
log | массив | Да | Нет | Конфигурация логирования | |
log.path | строка | Нет | /tmp | Путь хранения логов | |
log.level | целое число | Нет | Monolog\Logger::INFO | Уровень записи логов Monolog\Logger::DEBUG/Monolog\Logger::INFO | |
log.driver | строка/класс | Нет | RotatingFileLogDriver | Логгер, должен реализовывать MPQueue\Log\Driver\LogDriverInterface | |
queue | двумерный массив | Да | Нет | Конфигурация очередей | |
queue[0].name | строка | Да | Нет | Имя очереди | |
queue[0].worker_number | целое число | Нет | 3 | Количество рабочих процессов | |
queue[0].memory_limit | целое число | Нет | 128 | Максимальное использование памяти рабочими процессами (в мегабайтах, 0 — без ограничений) | |
queue[0].sleep_seconds | float | Нет | 1 | Время сна процесса наблюдения (в секундах, минимальное значение до 0.001) | |
queue[0].fail_number | целое число | Нет | 3 | Максимальное количество попыток выполнения задачи (от стороны отправителя задачи). Если задача не выполнена за указанный период времени, она будет заново отправлена (то есть задача может быть выполнена максимум fail_number + 1 раз) | |
queue[0].fail_expire | целое число | Нет | 3 | Время задержки повторной отправки задачи при неудаче (в секундах, поддерживает точность до 0.001) (от стороны отправителя задачи) | |
queue[0].fail_handle | вызываемый объект | Нет | пустой | Функция, выполняемая при неудаче задачи (выполняется, когда задача просрочена или достигнуто максимальное количество попыток выполнения) | |
queue[0].worker_start_handle | вызываемый объект | Нет | пустой | Функция, выполняемая при запуске рабочего процесса (эффективна для текущей очереди) | |
queue[0].mode | целое число | Нет | \MPQueue\Config\QueueConfig::MODEL_DISTRIBUTE | Режим работы очереди (\MPQueue\Config\QueueConfig::MODEL_DISTRIBUTE — распределенный режим, \MPQueue\Config\QueueConfig::MODEL_GRAB — конкурентный режим) |
Функция fail_handle принимает два параметра: $jobInfo — детали задачи и $e — объект исключения при ошибке. Запуск fail_handle также контролируется значением timeout.#### Примечание: если задача превышает время выполнения, она сразу считается неудачной, и не будет повторяться в соответствии с fail_number; можно использовать fail_handle для проверки, является ли задача неудачной из-за превышения времени выполнения, используя $jobInfo['type']#### Изменения в версии 2.0
[
'basics' => [
'name' => 'mp-queue-1', // Необходимо указать уникальное имя для каждого сервера при одновременном запуске нескольких серверов
'driver' => new \MPQueue\Queue\Driver\Redis('127.0.0.1'),
],
'queue' => [
[
'name' => 'test', // Имя очереди
'fail_handle' => function () {
var_dump('Произошла ошибка');
}, // Функция обратного вызова при неудаче
],
[
'name' => 'test2', // Имя очереди
'worker_number' => 4, // Количество рабочих процессов для данной очереди
'memory_limit' => 0, // Максимальное использование памяти каждым рабочим процессом для данной очереди. При превышении этого значения процесс будет перезапущен. Единица измерения — мегабайты
]
]
]
composer require yuntian001/multi-process-queue
composer install
main.php
<?php
define('MP_QUEUE_CLI', true);
use MPQueue\Config\Config;
require_once __DIR__ . '/vendor/autoload.php';
$config = [
'basics' => [
'name' => 'mp-queue-1', // Название для нескольких серверов при одновременной активации
'driver' => new \MPQueue\Queue\Driver\Redis('127.0.0.1'),
],
'queue' => [
[
'name' => 'test', // Название очереди
'fail_handle' => function () {
var_dump('Ошибка!');
}, // Обработчик ошибок
],
[
'name' => 'test2', // Название очереди
'worker_number' => 4, // Количество рабочих процессов в очереди
'memory_limit' => 0, // Ограничение памяти для рабочего процесса.
]
]
];
```При превышении — перезапуск. Единицы измерения: МБ
]
]
];
Config::set($config);
(new \MPQueue\Console\Application())->run();
worker:start
для запуска очереди php master.php worker:start
php master.php worker:start -d
help
) php master.php help
queue:clean Очистить очередь --queue test Очистить указанную очередь:test
queue:status Просмотреть информацию о очередях --queue test Указать очередь:test
queue:failed Вывести детали ошибок --queue test Указать очередь:test
worker:start Запустить --d для запуска в фоновом режиме
worker:stop Остановить
worker:restart Перезапустить --d для запуска в фоновом режиме
worker:reload Гладкий перезапуск
worker:status Проверить состояние процесса
- Выполните следующий код для отправки задач (может вызываться через веб, без необходимости загрузки расширения Swoole)
require_once __DIR__ . '/vendor/autoload.php'; // Загрузка файла composer если он еще не был загружен
$config = [
'basics' => [
'name' => 'mp-queue-1', // Название для нескольких серверов при одновременной активации
'driver' => new \MPQueue\Queue\Driver\Redis('127.0.0.1'),
],
'queue' => [
[
'name' => 'test', // Название очереди
'fail_handle' => function() {
var_dump('Ошибка!');
}, // Обработчик ошибок
],
[
'name' => 'test2',// название очереди
],
],
];
``````markdown
'worker_number' => 4, // количество рабочих процессов в текущей очереди
'memory_limit' => 0, // максимальное использование памяти рабочими процессами в текущей очереди. Если лимит превышен, процессы будут перезапущены. Единицы измерения - мегабайты (MB)
]
],
);
Config::set($config);
$job = function(){
var_dump('hello world!');
};
MPQueue\Queue\Queue::push('test', $job, 10);
MPQueue\Queue\Queue::push принимает три параметра:
$queue
название очереди$job
задача, которую нужно отправить
$job
может быть типа callable или объектом, созданным от подкласса \MPQueue\Job. Для конкретных тестов можно обратиться к файлам в папке test.
Когда $job
является анонимной функцией, рабочий процесс очереди и процесс отправки задачи могут находиться в разных проектах.
**Рекомендуется использовать** подклассы \MPQueue\Job для отправки $job, так как в этих подклассах можно настроить время ожидания, количество допустимых ошибок, время повторной попытки, функцию выполнения при превышении времени ожидания и функцию выполнения при возникновении ошибки. Подробное описание параметров доступно в [src/Job.php](src/Job.php).
- $delay — задержка перед отправкой задачи (в секундах), значение по умолчанию равно 0 (немедленная отправка).
Примеры отправки задач представлены в файлах normal.php, failed.php, timeout.php и pressure.php в папках test и test/Job.```## Внимание
- Команда `worker:reload` заставляет рабочий процесс завершить текущую задачу и перезапуститься. Команда `worker:restart` выполняет жёсткий перезапуск всех процессов manage, master и worker, что может привести к прерыванию выполнения задачи.
- Поскольку процессы загружаются сразу после запуска в оперативную память, изменения кода не будут применены немедленно. Чтобы изменения вступили в силу, необходимо выполнить команду `worker:reload` или `worker:start`.
- Команда `worker:reload` перезапускает только рабочие процессы и не перезагружает конфигурационные файлы. После изменения конфигурационных файлов необходимо выполнить команду `worker:restart`, чтобы изменения вступили в силу.## Использование в Laravel
- Установка
```bash
composer require yuntian001/multi-process-queue
<?php
// Пример конфигурации, конкретные настройки могут быть указаны согласно документации
return [
'basics'=>[
'name'=>'mp-queue-1',// Название должно быть уникальным для каждого сервера
'driver'=> new \MPQueue\Queue\Driver\Redis('127.0.0.1'),
'worker_start_handle'=>function(){
// Загрузка основного кода Laravel
defined('LARAVEL_START') || define('LARAVEL_START', microtime(true));
$app = require_once __DIR__.'/../bootstrap/app.php';
$app->make(Illuminate\Contracts\Console\Kernel::class)->bootstrap();
}
],
'queue' => [
[
'name' => 'test',// Название очереди
'fail_handle'=>function($info,$e){
// Логика обработки ошибок, например запись в базу данных MySQL
},// Обработка ошибок
],
[
'name' => 'test2',// Название очереди
'worker_number' => 4,// Количество рабочих процессов для данной очереди
'memory_limit' => 0, // Максимальное использование памяти рабочими процессами этой очереди, если превышено — перезапуск. Единицы измерения — мегабайты
]
],
'log' => [
'path' => __DIR__.'/../storage/logs',// Директория для хранения логов должна иметь права записи
]
];
#!/usr/bin/env php
<?php
define('MP_QUEUE_CLI', true);
use MPQueue\Config\Config;
require __DIR__ . '/vendor/autoload.php';
Config::set(include(__DIR__ . '/config/mp-queue.php'));
(new \MPQueue\Console\Application())->run();
<?php
// Расположение файла app/Job/HelloWord.php
namespace App\Job;
```use Illuminate\Support\Facades\Log;
use MPQueue\Job;
class HelloWord extends Job
{
// В методе handle можно использовать методы Laravel, но передача аргументов через внедрение зависимостей не поддерживается
public function handle()
{
var_dump('Hello world!');
Log::info('Hello world!');
}
}
php mp-queue worker:start
без -d
) php mp-queue worker:start -d
```- Отправьте задачу. Добавьте следующий код в любом месте (например, в контроллере):
```php
\MPQueue\Config\Config::set(config('mp-queue')); // Настройки конфигурации также могут быть установлены в контейнере приложения AppServiceProvider boot()
\MPQueue\Queue\Queue::push('test', \App\Job\HelloWord::class);
composer require yuntian001/multi-process-queue
mp-queue.php
в папке application/extra
<?php
// Пример конфигурации, конкретные настройки можно указать согласно документации
return [
'basics' => [
'name' => 'mp-queue-1', // При одновременном запуске на нескольких серверах каждому нужно присвоить уникальное имя
'driver' => new \MPQueue\Queue\Driver\Redis('127.0.0.1'),
'worker_start_handle' => function () {
// Инициализация основного приложения ThinkPHP
\think\App::initCommon();
},
],
'queue' => [
[
'name' => 'test', // Название очереди
'fail_handle' => function ($info, $e) {
// Логика выполнения при ошибке, например запись в MySQL
}, // Обработчик ошибок
],
[
'name' => 'test2', // Название очереди
'worker_number' => 4, // Количество рабочих процессов для текущей очереди
'memory_limit' => 0, // Максимальное использование памяти рабочими процессами текущей очереди. Если превышено, процессы будут перезапущены. Единица измерения - МБ
],
],
'log' => [
'path' => __DIR__.'/../runtime', // Путь к логам должен иметь права записи
],
];
mp-queue
#!/usr/bin/env php
<?php
``````php
define('MP_QUEUES_CLI', true);
define('APP_PATH', __DIR__ . '/application/');
// Основной файл ThinkPHP
require __DIR__ . '/thinkphp/base.php';
\MPQueue\Config\Config::set(include(__DIR__ . '/application/extra/mp-queue.php'));
(new \MPQueue\Console\Application())->run();
job
app\job\HelloWord
<?php
// application/job/HelloWord.php
namespace app\job;
use MPQueue\Job;
use think\Log;
class HelloWord extends Job
{
// Внутри метода handle можно вызывать методы и функции ThinkPHP, но метод handle не поддерживает передачу аргументов через внедрение зависимостей
public function handle()
{
var_dump('hello world!');
Log::info('hello world!');
}
}
?>
- Запустите очередь в фоновом режиме (или запустите без ключа `-d`)
php mp-queue worker:start -d
- Отправьте задачу. Добавьте следующий код в любом месте (например, в контроллере) для отправки задачи
\MPQueue\Config\Config::set(config('mp-queue')); // Настройки конфигурации также могут быть зарегистрированы в поведении инициализации (app_init) для единого загрузочного файла \MPQueue\Queue\Queue::push('test', \app\job\HelloWord::class);
- Установка
composer require yuntian001/multi-process-queue
- Создайте файл конфигурации mp-queue.php в папке config
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )