1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/hzhangjiayun-multi-process-queue

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
README.md

многопоточная-очередь

Многопоточная очередь на основе Swoole, где manage-процесс управляет дочерними процессами, мастер-процесс слушает очередь и распределяет задачи, а worker-процессы выполняют эти задачи. Обеспечивает многопоточность, минимальную задержку (в миллисекундах), низкое потребление ресурсов, высокую доступность и возможность распределённого размещения на нескольких серверах. Может использоваться вместе с фреймворками Laravel, ThinkPHP и другими.

Требования версий:

  • php>=7.1
  • swoole >= 4.4

Поддерживаемые драйверы:

  • redis (требуется версия Redis > 3.0.2, рекомендуется версия расширения Redis >= 4.0.0)### Основные характеристики
  • Минимальная задержка в миллисекунды
  • Поддержка пользовательских попыток повторной отправки и обратного вызова ошибок
  • Настройка пользовательского времени ожидания
  • Установка пользовательской функции запуска для предварительной подготовки, что позволяет удобно инициализировать другие фреймворки и проекты
  • Мастер-процесс использует корутины для прослушивания очередей, снижая задержку
  • Несколько рабочих процессов для выполнения задач
  • Поддержка однокнопочной активации корутины в рабочих процессах
  • Поддержка работы в фоновом режиме как демона, без необходимости использования других управляющих процессов
  • Возможность распределённого размещения на нескольких серверах
  • Высокая доступность, основанная на времени ожидания, гарантирующая успешное выполнение обратного вызова при отключении процесса.### Структура процессов
### Описание конфигурации Настройка Тип Обязательность Значение по умолчанию Описание
basics массив Да Нет Базовая конфигурация
basics.name строка Да Нет Имя текущего сервиса очередей; при запуске нескольких сервисов одновременно каждому сервису следует присвоить уникальное имя (имя может быть произвольным, главное — чтобы все имена были уникальными)
basics.pid_path строка Нет /tmp Путь хранения файла pid главного процесса
basics.driver строка Да Нет Драйвер очередей должен реализовывать MPQueue\Queue\Driver\DriverInterface
worker_start_handle вызываемый объект Нет пустая строка Вызывается после старта worker-процесса (эффективен для всех очередей в текущем сервисе)
log массив Да Нет Конфигурация логирования
log.path строка Нет /tmp Путь хранения логов
log.level целое число Нет Monolog\Logger::INFO Уровень записи логов Monolog\Logger::DEBUG/Monolog\Logger::INFO
log.driver строка/класс Нет RotatingFileLogDriver Логгер, должен реализовывать MPQueue\Log\Driver\LogDriverInterface
queue двумерный массив Да Нет Конфигурация очередей
queue[0].name строка Да Нет Имя очереди
queue[0].worker_number целое число Нет 3 Количество рабочих процессов
queue[0].memory_limit целое число Нет 128 Максимальное использование памяти рабочими процессами (в мегабайтах, 0 — без ограничений)
queue[0].sleep_seconds float Нет 1 Время сна процесса наблюдения (в секундах, минимальное значение до 0.001)
queue[0].fail_number целое число Нет 3 Максимальное количество попыток выполнения задачи (от стороны отправителя задачи). Если задача не выполнена за указанный период времени, она будет заново отправлена (то есть задача может быть выполнена максимум fail_number + 1 раз)
queue[0].fail_expire целое число Нет 3 Время задержки повторной отправки задачи при неудаче (в секундах, поддерживает точность до 0.001) (от стороны отправителя задачи)
queue[0].fail_handle вызываемый объект Нет пустой Функция, выполняемая при неудаче задачи (выполняется, когда задача просрочена или достигнуто максимальное количество попыток выполнения)
queue[0].worker_start_handle вызываемый объект Нет пустой Функция, выполняемая при запуске рабочего процесса (эффективна для текущей очереди)
queue[0].mode целое число Нет \MPQueue\Config\QueueConfig::MODEL_DISTRIBUTE Режим работы очереди (\MPQueue\Config\QueueConfig::MODEL_DISTRIBUTE — распределенный режим, \MPQueue\Config\QueueConfig::MODEL_GRAB — конкурентный режим)

Функция fail_handle принимает два параметра: $jobInfo — детали задачи и $e — объект исключения при ошибке. Запуск fail_handle также контролируется значением timeout.#### Примечание: если задача превышает время выполнения, она сразу считается неудачной, и не будет повторяться в соответствии с fail_number; можно использовать fail_handle для проверки, является ли задача неудачной из-за превышения времени выполнения, используя $jobInfo['type']#### Изменения в версии 2.0

  1. Отменено обратное вызов при превышении времени выполнения; теперь задача автоматически считается неудачной и запускается обратный вызов при неудаче.
  2. Вместо того чтобы каждый раз вызывать обратный вызов при неудаче, теперь он вызывается только после превышения времени выполнения или превышения количества попыток повторной отправки.
  3. Добавлен режим захвата: в этом режиме рабочий процесс активно берёт задачи, что увеличивает количество запросов к базе данных и соединений (ранее использовался распределённый режим).
  4. Улучшена надёжность: время простоя применимо к обратному вызову при неудаче; если процесс завершился аварийно во время выполнения обратного вызова при неудаче, остальные процессы снова выполнят обратный вызов при неудаче по истечению времени простоя.
  5. Оптимизация производительности распределённого режима: теперь задачи распределяются согласно текущему количеству свободных процессов, что значительно повышает QPS.

Планы для следующей версии

  • Поддержка кластера Redis (Redis Cluster)### Пример конфигурации
[
    'basics' => [
        'name' => 'mp-queue-1', // Необходимо указать уникальное имя для каждого сервера при одновременном запуске нескольких серверов
        'driver' => new \MPQueue\Queue\Driver\Redis('127.0.0.1'),
    ],
    'queue' => [
        [
            'name' => 'test', // Имя очереди
            'fail_handle' => function () {
                var_dump('Произошла ошибка');
            }, // Функция обратного вызова при неудаче
        ],
        [
            'name' => 'test2', // Имя очереди
            'worker_number' => 4, // Количество рабочих процессов для данной очереди
            'memory_limit' => 0, // Максимальное использование памяти каждым рабочим процессом для данной очереди. При превышении этого значения процесс будет перезапущен. Единица измерения — мегабайты
        ]
    ]
]

Быстрый старт

1. Установка

  • Установка через Composer
composer require yuntian001/multi-process-queue
  • Или скачивание архива и выполнение composer install

2. Запуск очередей

  • Создайте файл main.php
<?php
define('MP_QUEUE_CLI', true);
use MPQueue\Config\Config;
require_once __DIR__ . '/vendor/autoload.php';

$config = [
    'basics' => [
        'name' => 'mp-queue-1', // Название для нескольких серверов при одновременной активации
        'driver' => new \MPQueue\Queue\Driver\Redis('127.0.0.1'),
    ],
    'queue' => [
        [
            'name' => 'test', // Название очереди
            'fail_handle' => function () {
                var_dump('Ошибка!');
            }, // Обработчик ошибок
        ],
        [
            'name' => 'test2', // Название очереди
            'worker_number' => 4, // Количество рабочих процессов в очереди
            'memory_limit' => 0, // Ограничение памяти для рабочего процесса.
        ]
    ]
];
```При превышении  перезапуск. Единицы измерения: МБ
         ]
     ]
 ];
 Config::set($config);
 (new \MPQueue\Console\Application())->run();
  • Выполните команду worker:start для запуска очереди
  php master.php worker:start
  • Или запустите в фоновом режиме
  php master.php worker:start -d
  • Поддерживаемые команды (помощь доступна с помощью команды help)
 php master.php help
  queue:clean Очистить очередь --queue test Очистить указанную очередь:test
  queue:status Просмотреть информацию о очередях --queue test Указать очередь:test
  queue:failed Вывести детали ошибок --queue test Указать очередь:test
  worker:start Запустить --d для запуска в фоновом режиме
  worker:stop Остановить
  worker:restart Перезапустить --d для запуска в фоновом режиме
  worker:reload Гладкий перезапуск
  worker:status Проверить состояние процесса

3. Отправка задач

- Выполните следующий код для отправки задач (может вызываться через веб, без необходимости загрузки расширения Swoole)
    require_once __DIR__ . '/vendor/autoload.php'; // Загрузка файла composer если он еще не был загружен
    $config = [
        'basics' => [
            'name' => 'mp-queue-1', // Название для нескольких серверов при одновременной активации
            'driver' => new \MPQueue\Queue\Driver\Redis('127.0.0.1'),
        ],
        'queue' => [
            [
                'name' => 'test', // Название очереди
                'fail_handle' => function() {
                    var_dump('Ошибка!');
                }, // Обработчик ошибок
            ],
            [
                'name' => 'test2',// название очереди
            ],
         ],
     ];
``````markdown
           'worker_number' => 4, // количество рабочих процессов в текущей очереди
            'memory_limit' => 0, // максимальное использование памяти рабочими процессами в текущей очереди. Если лимит превышен, процессы будут перезапущены. Единицы измерения - мегабайты (MB)
        ]
    ],
);
 Config::set($config);
 $job = function(){
    var_dump('hello world!');
 };
 MPQueue\Queue\Queue::push('test', $job, 10);

MPQueue\Queue\Queue::push принимает три параметра:

  • $queue название очереди
  • $job задача, которую нужно отправить $job может быть типа callable или объектом, созданным от подкласса \MPQueue\Job. Для конкретных тестов можно обратиться к файлам в папке test. Когда $job является анонимной функцией, рабочий процесс очереди и процесс отправки задачи могут находиться в разных проектах.

**Рекомендуется использовать** подклассы \MPQueue\Job для отправки $job, так как в этих подклассах можно настроить время ожидания, количество допустимых ошибок, время повторной попытки, функцию выполнения при превышении времени ожидания и функцию выполнения при возникновении ошибки. Подробное описание параметров доступно в [src/Job.php](src/Job.php).

- $delay — задержка перед отправкой задачи (в секундах), значение по умолчанию равно 0 (немедленная отправка).

Примеры отправки задач представлены в файлах normal.php, failed.php, timeout.php и pressure.php в папках test и test/Job.```## Внимание
- Команда `worker:reload` заставляет рабочий процесс завершить текущую задачу и перезапуститься. Команда `worker:restart` выполняет жёсткий перезапуск всех процессов manage, master и worker, что может привести к прерыванию выполнения задачи.
- Поскольку процессы загружаются сразу после запуска в оперативную память, изменения кода не будут применены немедленно. Чтобы изменения вступили в силу, необходимо выполнить команду `worker:reload` или `worker:start`.
- Команда `worker:reload` перезапускает только рабочие процессы и не перезагружает конфигурационные файлы. После изменения конфигурационных файлов необходимо выполнить команду `worker:restart`, чтобы изменения вступили в силу.## Использование в Laravel
- Установка
```bash
composer require yuntian001/multi-process-queue
  • Создайте конфигурационный файл mp-queue.php в папке config
<?php
// Пример конфигурации, конкретные настройки могут быть указаны согласно документации
return [
    'basics'=>[
        'name'=>'mp-queue-1',// Название должно быть уникальным для каждого сервера
        'driver'=> new \MPQueue\Queue\Driver\Redis('127.0.0.1'),
        'worker_start_handle'=>function(){
            // Загрузка основного кода Laravel
            defined('LARAVEL_START') || define('LARAVEL_START', microtime(true));
            $app = require_once __DIR__.'/../bootstrap/app.php';
            $app->make(Illuminate\Contracts\Console\Kernel::class)->bootstrap();
        }
    ],
    'queue' => [
        [
            'name' => 'test',// Название очереди
            'fail_handle'=>function($info,$e){
                // Логика обработки ошибок, например запись в базу данных MySQL
            },// Обработка ошибок
        ],
        [
            'name' => 'test2',// Название очереди
            'worker_number' => 4,// Количество рабочих процессов для данной очереди
            'memory_limit' => 0, // Максимальное использование памяти рабочими процессами этой очереди, если превышено — перезапуск. Единицы измерения — мегабайты
        ]
    ],
    'log' => [
        'path' => __DIR__.'/../storage/logs',// Директория для хранения логов должна иметь права записи
    ]
];
  • В корневой директории проекта создайте запускательный файл mp-queue
#!/usr/bin/env php
<?php
define('MP_QUEUE_CLI', true);
use MPQueue\Config\Config;

require __DIR__ . '/vendor/autoload.php';
Config::set(include(__DIR__ . '/config/mp-queue.php'));
(new \MPQueue\Console\Application())->run();
  • Создайте класс задачи App\Job\HelloWord
<?php
// Расположение файла app/Job/HelloWord.php
namespace App\Job;
```use Illuminate\Support\Facades\Log;
use MPQueue\Job;

class HelloWord extends Job
{
    // В методе handle можно использовать методы Laravel, но передача аргументов через внедрение зависимостей не поддерживается
    public function handle()
    {
        var_dump('Hello world!');
        Log::info('Hello world!');
    }
}
  • Запустите очередь задач в фоновом режиме (или просто php mp-queue worker:start без -d)
 php mp-queue worker:start -d
```- Отправьте задачу. Добавьте следующий код в любом месте (например, в контроллере):
```php
    \MPQueue\Config\Config::set(config('mp-queue')); // Настройки конфигурации также могут быть установлены в контейнере приложения AppServiceProvider boot()
    \MPQueue\Queue\Queue::push('test', \App\Job\HelloWord::class);

Использование в ThinkPHP5.0

  • Установка
composer require yuntian001/multi-process-queue
  • Создайте конфигурационный файл mp-queue.php в папке application/extra
<?php
// Пример конфигурации, конкретные настройки можно указать согласно документации
return [
    'basics' => [
        'name' => 'mp-queue-1', // При одновременном запуске на нескольких серверах каждому нужно присвоить уникальное имя
        'driver' => new \MPQueue\Queue\Driver\Redis('127.0.0.1'),
        'worker_start_handle' => function () {
            // Инициализация основного приложения ThinkPHP
            \think\App::initCommon();
        },
    ],
    'queue' => [
        [
            'name' => 'test', // Название очереди
            'fail_handle' => function ($info, $e) {
                // Логика выполнения при ошибке, например запись в MySQL
            }, // Обработчик ошибок
        ],
        [
            'name' => 'test2', // Название очереди
            'worker_number' => 4, // Количество рабочих процессов для текущей очереди
            'memory_limit' => 0, // Максимальное использование памяти рабочими процессами текущей очереди. Если превышено, процессы будут перезапущены. Единица измерения - МБ
        ],
    ],
    'log' => [
        'path' => __DIR__.'/../runtime', // Путь к логам должен иметь права записи
    ],
];
  • В корне проекта создайте запускайший файл mp-queue
#!/usr/bin/env php
<?php
``````php
define('MP_QUEUES_CLI', true);
define('APP_PATH', __DIR__ . '/application/');
// Основной файл ThinkPHP
require __DIR__ . '/thinkphp/base.php';
\MPQueue\Config\Config::set(include(__DIR__ . '/application/extra/mp-queue.php'));
(new \MPQueue\Console\Application())->run();
  • Создайте класс job app\job\HelloWord
<?php
// application/job/HelloWord.php
namespace app\job;

use MPQueue\Job;
use think\Log;

class HelloWord extends Job
{
    // Внутри метода handle можно вызывать методы и функции ThinkPHP, но метод handle не поддерживает передачу аргументов через внедрение зависимостей
    public function handle()
    {
        var_dump('hello world!');
        Log::info('hello world!');
    }
}
?>
- Запустите очередь в фоновом режиме (или запустите без ключа `-d`)
php mp-queue worker:start -d
- Отправьте задачу. Добавьте следующий код в любом месте (например, в контроллере) для отправки задачи

\MPQueue\Config\Config::set(config('mp-queue')); // Настройки конфигурации также могут быть зарегистрированы в поведении инициализации (app_init) для единого загрузочного файла \MPQueue\Queue\Queue::push('test', \app\job\HelloWord::class);

- Установка

composer require yuntian001/multi-process-queue

- Создайте файл конфигурации mp-queue.php в папке config
[ 'name' => 'mp-queue-1', // Название должно быть уникальным при запуске нескольких серверов одновременно 'driver' => new \MPQueue\Queue\Driver\Redis('127.0.0.1'), 'worker_start_handle' => function () { // Инициализация основного приложения ThinkPHP (new \think\App())->initialize(); }, ], 'queue' => [ [ 'name' => 'test', // Название очереди 'fail_handle' => function ($info, $e) { // Логика обработки ошибок, например, запись в MySQL }, // Обработка ошибок ], [ 'name' => 'test2', // Название очереди 'worker_number' => 4, // Количество рабочих процессов текущей очереди 'memory_limit' => 0, // Максимальное использование памяти рабочими процессами текущей очереди. При превышении лимита процессы будут перезапущены. Единицы измерения - мегабайты ], ], 'log' => [ 'path' => __DIR__.'/../runtime', // Путь к файлам журнала должен иметь права записи ], ]; ``` - Создайте файл запуска mp-queue в корневой директории проекта ``` #!/usr/bin/env php run(); ``` - Создайте класс задачи app\job\HelloWord.php ```

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

На основе многопроцессной системы очередей Swoole, процесс manage управляет дочерними процессами, master-процесс отслеживает распределение задач в очереди, worker-процессы выполняют задачи. Многопроцессная система с низкими задержками и использованием ресурсов. Может использоваться совместно с такими фреймворками, как Laravel и ThinkPHP. Развернуть Свернуть
PHP
MIT
Отмена

Обновления

Пока нет обновлений

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://api.gitlife.ru/oschina-mirror/hzhangjiayun-multi-process-queue.git
git@api.gitlife.ru:oschina-mirror/hzhangjiayun-multi-process-queue.git
oschina-mirror
hzhangjiayun-multi-process-queue
hzhangjiayun-multi-process-queue
main