1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/jonny-li-rabbitmq-study

В этом репозитории не указан файл с открытой лицензией (LICENSE). При использовании обратитесь к конкретному описанию проекта и его зависимостям в коде.
Клонировать/Скачать
07-最适合入门的RabbitMQ+PHP教程(七)RPC.md 14 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
Отправлено 02.06.2025 18:34 9a50763

В предыдущей главе мы рассмотрели наиболее подходящий для начинающих учебник по RabbitMQ+PHP (шестая часть): использование Topic exchange (тематического обмена). С помощью реального кода мы можем четко понять общие правила соответствия и рабочий процесс. В этой главе мы рассмотрим RPC-модель RabbitMQ.

Предварительные условия

RabbitMQ установлен и запущен на localhost по стандартному порту (5672). Если вы используете другую машину, порт или учетные данные, вам потребуется настроить параметры подключения. Если вы столкнетесь с проблемами в этом учебнике, вы всегда можете связаться с нами по электронной почте.

Определение RPC

Чтобы использовать RPC, необходимо понять, что это такое и как это работает. RPC (Remote Procedure Call Protocol) — это протокол удаленного вызова процедур. Обычно в больших компаниях каждый проект состоит из нескольких систем, таких как система управления запасами, система товаров, система заказов и т.д. Разные группы разработчиков поддерживают разные системы, каждая из которых работает на разных машинах. Однако часто требуется вызывать данные между машинами. Поскольку они не находятся на одной машине, вызовы должны осуществляться через сеть. Существует множество протоколов RPC, таких как Java RMI, WebApi и т.д.### Официальное описание RPC Хотя RPC является очень распространенной моделью в вычислениях, она часто подвергается критике. Проблемы возникают, когда программисты не знают, является ли вызов функции локальным или медленным RPC. Такое смешение приводит к непредсказуемому поведению системы и усложняет отладку. Неправильное использование RPC может привести к трудно поддерживаемому коду "спагетти", а не к упрощению программного обеспечения. Учитывая это, рассмотрите следующие рекомендации:

  1. Убедитесь, что ясно различаются локальные и удаленные вызовы функций.
  2. Записывайте вашу систему. Сделайте зависимости между компонентами явными.
  3. Обрабатывайте случаи ошибок. Как должен реагировать клиент, если сервер RPC долго недоступен?
  4. Если у вас есть сомнения, избегайте использования RPC. Если возможно, используйте асинхронные трубы — вместо блокирующего RPC отправляйте результаты асинхронно в следующую стадию вычислений.### Описание функций В этом учебнике мы используем RabbitMQ для реализации RPC-вызовов. Для этого нам потребуется использовать обратный обмен (callback queue). (1) Обратный обмен, с помощью RabbitMQ RPC очень прост. Клиент отправляет сообщение, а сервер отвечает на него. Однако для получения ответа нам необходимо отправить адрес обратного обмена вместе с запросом.Наиболее подходящий для начинающих учебник по RabbitMQ+PHP (семь) RPC -- улиточное гнездо

(2) Атрибуты сообщения

AMQP 0-9-1 протокол предварительно определяет 14 атрибутов, связанных с сообщением. Большинство атрибутов редко используются, но следующие являются исключениями:

  1. delivery_mode: метка режима доставки сообщения, 2 — сообщение устойчиво, другие значения — временно.
  2. content_type: тип содержимого, используемый для описания mime-типа кодировки. Например, часто для этого атрибута задается JSON-кодировка.
  3. reply_to: имя обратного вызова очереди.
  4. correlation_id: корреляционный идентификатор, для удобства ассоциации ответа с запросом.(3) Процесс обработки RPC
  5. После запуска RPC-клиента создается анонимная, эксклюзивная, обратная вызова очередь.
  6. RPC-клиент устанавливает два атрибута сообщения: replyTo (имя обратной вызовной очереди) и correlationId (идентификатор запроса), затем отправляет сообщение в очередь rpc_queue.
  7. Запрос отправляется в очередь rpc_queue.
  8. RPC-сервер прослушивает сообщения запроса в очереди rpc_queue. После обработки запроса RPC-сервер упаковывает результат в сообщение и отправляет его в обратную вызовную очередь, указанную в replyTo, и добавляет correlationId к сообщению.
  9. RPC-клиент прослушивает сообщения в обратной вызовной очереди replyTo. При получении сообщения он проверяет correlationId. Если значение совпадает с тем, что было отправлено ранее, это значение является результатом обработки RPC.(4) Correlation Id?

Если для каждого RPC создается отдельная обратная вызов очередь, это будет очень неэффективно. Вместо этого можно создать отдельную обратную вызов очередь для каждого клиента. Однако, если очередь получает ответное сообщение, она может не знать, к какому запросу оно относится. Для этого используется атрибут correlationId. Мы должны установить уникальное значение для каждого запроса. Затем, получая сообщения из обратной вызов очереди, проверяем этот атрибут, чтобы связать ответ с запросом. Если мы получаем сообщение с неизвестным значением correlationId, можно смело игнорировать его — это не наш запрос. Вы можете задаться вопросом, почему следует игнорировать неизвестные сообщения в обратной вызов очереди, а не считать их ошибкой? Это связано с конкурентными условиями на сервере. Хотя это маловероятно, но если RPC-сервер отправляет нам результат, а затем выходит из строя до отправки обратного вызова, это может привести к получению сообщения с неизвестным значением correlationId. Если это произошло, перезапуск RPC-сервера приведет к повторной обработке запроса. Именно поэтому клиент должен хорошо обрабатывать повторные ответы, а RPC должен быть идемпотентным.

Клиентский код RPC (client code)

Основные шаги бизнес-логики:

  1. Настройка фабрики подключения
  2. Установка TCP-соединения
  3. Создание канала на основе TCP-соединения 4.Определение временного очередного имени replyQueueName, объявление уникального идентификатора corrId для текущего запроса и настройка параметров сообщения для отправки
  4. Отправка сообщения в очередь rpc_queue с использованием стандартного обменника
  5. Ожидание текущего процесса с использованием блокирующей очереди BlockingQueue
  6. При получении запроса, помещение запроса в BlockingQueue, пробуждение основного потока и вывод возвращаемого содержимого
public function call($body = null, $name = null)
{
    if (! $body || ! $name) return false;
    $this->response = null;
    // Генерация уникального идентификатора corrId
    $this->corr_id = uniqid();
    // Установка параметров replyTo для очереди callback_queue и корреляционного идентификатора
    $params = ['correlation_id' => $this->corr_id, 'reply_to' => $this->callback_queue];
    $message = new AMQPMessage((string) $body, $params);
    $this->amqp_channel->basic_publish($message, '', $name);
    while (! $this->response) {
        // Ожидание обработки
        $this->amqp_channel->wait();
    }
    return intval($this->response);
}

Полный код Клиентский код RPC: https://gitee.com/jonny-li/rabbitmq-study/blob/master/rabbitmq_test/RPC/rpc_client.php### Код службы RPC (серверный код)

  1. Настройка фабрики подключения
  2. Установка TCP-соединения
  3. Создание канала на основе TCP-соединения
  4. Объявление очереди rpc_queue
  5. Установка ограничения на получение одного сообщения одновременно
  6. Ожидание сообщений в очереди rpc_queue
  7. При получении сообщения, вызов обработчика для обработки сообщения и отправка ответа в очередь replyTo с корреляционным идентификатором
  8. Использование wait-notify для синхронизации основного потока и обработчика
public function basic_consume($queue_name = null)
{
    if (!$queue_name) return false;
    // Обработчик
    echo " [x] Ожидание RPC-запросов\n";
    $callback = function ($response) {
        $n = intval($response->body);

        echo ' [.] Это обратный вызов данных:(', $response->body, ")\n";

        $params = ['correlation_id' => $response->get('correlation_id')];

        $msg = new AMQPMessage((string) $this->fib($n), $params);

        $response->delivery_info['channel']->basic_publish($msg, '', $response->get('reply_to'));
    };
}

Полный код

Код сервера RPC: https://gitee.com/jonny-li/rabbitmq-study/blob/master/rabbitmq_test/RPC/rpc_server.php

Результаты тестирования

Наиболее подходящий для новичков учебник RabbitMQ+PHP (7) RPC -- Журнал

Наиболее подходящий для новичков учебник RabbitMQ+PHP (7) RPC -- Журнал``` [RpcServer] Ожидание RPC-запросов [RpcClient] Запрос 55 [RpcServer receive] [.] Это обратный вызов данных: (10)

### Заключение
RPC (Remote Procedure Call) завершен. Это асинхронное взаимодействие между клиентом и сервером, где сервер отвечает на запросы клиента через уникальный идентификатор, используя временный и эксклюзивный канал для передачи данных обратно клиенту. В следующем разделе будет рассмотрена "реализация задержки RabbitMQ".### Оригинальная ссылка: [https://www.phpassn.com/article/112.html](https://www.phpassn.com/article/112.html)

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/jonny-li-rabbitmq-study.git
git@api.gitlife.ru:oschina-mirror/jonny-li-rabbitmq-study.git
oschina-mirror
jonny-li-rabbitmq-study
jonny-li-rabbitmq-study
master