1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/JesusSlim-mqttclient

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
README_CN.md 11 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
gitlife-traslator Отправлено 28.11.2024 21:59 5be66c0

PHP mqtt client

Использование

English [Chinese]

Установка

Install:

composer require jesusslim/mqttclient

Если используется версия dev-master, необходимо добавить composer конфигурацию:

"minimum-stability": "dev"

Зависимости

swoole 2.0.8+
mosquitto.so расширение

Пример

Библиотека предоставляет два вида реализации mqtt клиента. Первый основан на swoole (не рекомендуется, см. ниже пример с swoole), второй — на mosquitto (см. пример с mosquitto).

Различия между реализациями:

  • разные зависимости;
  • в расширении swoole есть проблемы с пакетами и сбоями (которые всё ещё не решены в текущей версии 4.2.7). Поэтому при использовании необходимо вручную обрабатывать эти исключения. Например, следить за состоянием процесса, перезапускать процесс при сбое или получении неполного пакета.
  • клиент на основе swoole может одновременно отправлять и получать сообщения в одном процессе, клиент на основе mosquitto является блокирующим, для отправки и получения сообщений требуется запустить два процесса.
  • некоторые различия в синтаксисе между swoole и mosquitto.

Пример с mosquitto

Определите свой логгер:

class Logger implements \mqttclient\src\swoole\MqttLogInterface {

    public function log($type,$content,$params = []){
            echo "$type : $content \r\n";
     }
}

Используйте Mqttclient (для получения сообщений):

$host = '127.0.0.1';
$port = 1883;
$r = new \mqttclient\src\mosquitto\MqttClient($host,$port,10017);
$r->setAuth('username','password');
$r->setKeepAlive(60);
$r->setLogger(new Logger());
$r->setMaxReconnectTimesWhenError(360*12);
//reconnect interval
$r->setReconnectInterval(10);
//subscribe topics,callback's params can be any data we mapped into the container(IOC)
$r->setTopics(
[
    new \mqttclient\src\subscribe\Topic('test/slim',function($msg){
        echo "I receive:".$msg."\r\n";}),
    new \mqttclient\src\subscribe\Topic('test/slim3',function(\mqttclient\src\swoole\MqttClient $client,$msg){
        echo "I receive:".$msg." for slim3 \r\n";
        echo $client->getClientId();
    })
]
);
//set trigger
$r->on(\mqttclient\src\consts\ClientTriggers::SOCKET_CONNECT,function(){
    //do something
});
$r->start();

Sender (для отправки сообщений):

$host = '127.0.0.1';
$port = 1883;
$r = new \mqttclient\src\mosquitto\MqttSender($host,$port,10017);
$r->setAuth('username','password');
$r->setKeepAlive(60);
$r->setLogger(new Logger());
$r->setMaxReconnectTimesWhenError(360*12);
//reconnect interval
$r->setReconnectInterval(10);
$r->setQueue(new Queue());
$r->start();

Необходимо реализовать класс Queue, который реализует интерфейс mqttclient\src\mosquitto\MqttSendingQueue, чтобы упростить получение сообщений для отправки в цикле.

Пример с swoole (не рекомендуется)

Определите логгер для ведения журнала (реализуйте интерфейс mqttclient\src\swoole\MqttLogInterface, рекомендуется использовать простейший echo или запись в файл/redis):

class Logger implements \mqttclient\src\swoole\MqttLogInterface {

    public function log($type,$content,$params = []){
            echo "$type : $content \r\n";
     }
}

Определите tmp store для хранения данных во время работы программы (рекомендуется Redis или простейшая реализация в памяти):

class Store implements \mqttclient\src\swoole\TmpStorageInterface{

    private $data = [];

    public function set($message_type, $key, $sub_key, $data, $expire = 3600)
    {
        $this->data[$message_type][$key][$sub_key] = $data;
    }

    public function get($message_type, $key, $sub_key)
    {
        return $this->data[$message_type][$key][$sub_key];
    }

    public function delete($message_type, $key, $sub_key)
    {
        if (!isset($this->data[$message_type][$key][$sub_key])){
            echo "storage not found:$message_type $key $sub_key";
        }
        unset($this->data[$message_type][$key][$sub_key]);
    }

}

Пример MqttClient:

$host = '127.0.0.1';
$port = 1883;

$r = new \mqttclient\src\swoole\MqttClient($host,$port,10017);
$r->setAuth('username','password');
$r->setKeepAlive(60);
$r->setLogger(new Logger());
$r->setStore(new Store());
//dns lookup
$r->setDnsLookup(true);
//缓冲区大小
$r->setSocketBufferSize(1024*1024*5);
//最大错误重连次数
$r->setMaxReconnectTimesWhenError(360*12);
//尝试重连间隔
$r->setReconnectInterval(10000);
//订阅topics 回调以依赖注入方式实现 入参可以是任何在mqttclient容器中注入过的变量 如预定义的msg,\mqttclient\src\swoole\MqttClient $client等等
$r->setTopics(
[
    new \mqttclient\src\subscribe\Topic('test/slim',function($msg){
        echo "I receive:".$msg."\r\n";}),
    new \mqttclient\src\subscribe\Topic('test/slim3',function(\mqttclient\src\swoole\MqttClient

В этом тексте присутствуют фрагменты кода на языке PHP, но они не были переведены. ### Текст запроса $client,$msg){ echo "I receive:".$msg." for slim3 \r\n"; echo $client->getClientId(); }) ] );

//set trigger
$r->on(\mqttclient\src\consts\ClientTriggers::RECEIVE_SUBACK,function(\mqttclient\src\swoole\MqttClient $client){
    $client->publish('slim/echo','GGXX',\mqttclient\src\consts\Qos::ONE_TIME);
});

$r->connect();
$r->publish('test/slim','test qos',2);

Extends

Также можно создавать собственные классы Client, которые наследуются от MqttClient и переопределяют функцию контейнера для внедрения пользовательских переменных, таких как конфигурация или другие данные подключения.

Пример:

class Client extends MqttClient
{
    private $mysql_handler;
    private $mongo_handler;

    public function __construct($host,$port,$client_id,$mysql_conf,$mongo_conf)
    {
        $this->mysql_handler = new Mysqli($mysql_conf);
        $this->mongo_handler = new \MongoClient('mongodb://'.$mongo_conf['username'].':'.$mongo_conf['password'].'@'.$mongo_conf['host'].':'.$mongo_conf['port'].'/'.$mongo_conf['db']);
        parent::__construct($host,$port,$client_id);
    }

     /**
     * Переопределяет функцию produceContainer и позволяет использовать собственный класс, данные или замыкание в обработчике публикации-получения.
     * Например: $client->setTopics([new Topic('test/own',function($mongo,$msg){ $result = $mongo->selectCollection('log_platform','test')->find(['sid' => ['$gte' => intval($msg)]]); })]);
     * @return Injector
     */
    protected function produceContainer()
    {
        $container = new Injector();
        $container->mapData(MqttClient::class,$this);
        $container->mapData(Client::class,$this);
        $container->mapData('mysqli',$this->mysql_handler);
        $container->mapData('mongo',$this->mongo_handler);
        return $container;
    }

}

Рекомендуемое использование

Рекомендуется использовать MqttClient в основном для подписки на данные, их распространения и отправки. Следует избегать выполнения большого количества операций ввода-вывода (например, чтения и записи в базу данных, вызова HTTP API) в главном процессе, чтобы избежать его блокировки. Операции ввода-вывода, такие как запись в базу данных или вызов HTTP API, могут быть выполнены через дочерние процессы swoole_process.

Например, главный процесс получает сообщение о подписке, обрабатывает его и помещает в очередь redis. Дочерний процесс циклически блокируется, читая из очереди redis и выполняя операции записи.

Если дочернему процессу необходимо отправить сообщение, он может сделать это через канал связи, записывая данные в конвейер. Главный процесс считывает данные через события обратного вызова и отправляет их.

Главный процесс также должен контролировать дочерние процессы, чтобы своевременно перезапускать их при возникновении проблем и освобождать ресурсы.

Важно отметить, что создание swoole_процесса должно происходить до соединения клиента, иначе могут возникнуть сложные проблемы с вводом-выводом, а в последних версиях swoole будет выдана ошибка.

Конкретная реализация не рассматривается в этом проекте.

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/JesusSlim-mqttclient.git
git@api.gitlife.ru:oschina-mirror/JesusSlim-mqttclient.git
oschina-mirror
JesusSlim-mqttclient
JesusSlim-mqttclient
master