1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/lingfengx-x-netty-rpc

Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
README.md

x-netty-rpc

Фреймворк RPC, основанный на Netty

Поддерживает пользовательские протоколы, поддерживает прямое запуск клиента или интеграцию с Spring, позволяет использовать пользовательские обработчики, прост в использовании

Компонент распределенного скачивания уже использует x-netty-rpc

адрес на Gitee: https://gitee.com/lingfengx/x-downloader

адрес на GitHub: https://github.com/lingfengcoder/x-downloader

Особенности

1. Упрощённый вызов удалённых сервисов (аналогично Feign)

Основные принципы:

  • Интерфейсы, помеченные аннотацией @RpcClient, динамически генерируются в объекты, которые могут автоматически обрабатывать данные Netty, и внедряются в контейнер Spring IOC
  • При получении объекта и выполнении его метода, активируется усиленный динамический объект, и имя метода и его параметры передаются через Netty целевому сервису
  • Целевой класс получает информацию RPC и использует рефлексию для поиска и выполнения целевого метода в контейнере
// Пример: сервис A вызывает метод register сервиса B
// Сервис A
// Интерфейс удалённого сервиса
@RpcClient("ServiceB")
public interface ServiceB {
    void register(BasicFrame<Object> frame);
}

class Test {
    // Или через @Autowired
    @Autowired
    private ServiceB serviceb;

    // Использование удалённого сервиса
    public void test() {
      serviceb.register(new BasicFrame());
    }
}
// Сервис B
// Целевой метод
@RpcComponent("ServiceB")
public class ServiceB {
    @RpcHandler("register")
    public void register(BasicFrame<Object> frame) {
        // Вывод данных BasicFrame
        log.info(frame);
    }
}
```## Пользовательский безопасный протокол SafeFrame

Поддерживает гибкую замену протоколов сериализации (JSON, JAVA, string, protobuf)

Поддерживает автоматическое шифрование и расшифровку (AES, RSA)

Поддерживает автоматическую проверку подписи сообщений для предотвращения изменения (MD5)

Встроенная метка времени для предотвращения повторных атак (timestamp)

Планируется добавить протокол Protobuf и алгоритм сжатия Snappy
  • BEFORE DECODE (66 bytes) AFTER DECODE (66 bytes)
  • +------+--------+------+------------------------------------------------+ +------+--------+------+-----------------------------------------------+
  • | cmd | serial | encrypt | timestamp | client | sign | length | content(11) | cmd | serial | encrypt | timestamp | client | sign | length | content(11) |
  • | 1 | 1 | 1 | 8 | 8 | 32 | 4 | "HELLO,WORLD" | 1 | 1 | 1 | 8 | 8 | 32 | 4 | "HELLO,WORLD" |
  • +------+--------+------+------------------------------------------------+ +------+--------+------+-----------------------------------------------+
   // Запрос REQUEST((byte) 1), // Ответ RESPONSE((byte) 2), // Пинг HEARTBEAT((byte) 3);
   private byte cmd;
   // Тип сериализации данных (content) JSON_SERIAL JAVA_SERIAL
   private byte serial;
   // Тип шифрования // Открытый текст NONE((byte) 0),// AES AES((byte) 2), // RSA RSA((byte) 3);
   private byte encrypt;
   // Временная метка, аналогичная соли
   private long timestamp;
   // Идентификатор клиента -1 означает сервер
   private long client;
   // Подпись сообщения MD5 фиксированной длины 32 байта
   private String sign;
   // Длина content
   private int length;
   // Содержимое
   private T content;
```## Клиент

Поддерживает автоматический перезапуск и подключение

Поддерживает ручное закрытие соединения

Поддерживает многопоточное отправление сообщений

### Интерфейс клиента

NettyClient {

int state(); // Состояние клиента

void start(); // Запуск

void restart(); // Перезапуск

void close(); // Закрытие

long getClientId(); // Получение идентификатора клиента

void defaultChannel(Channel channel); // Установка по умолчанию канала

<M extends Serializable> void writeAndFlush(Channel channel, M msg, Cmd type); // Отправка сообщения

<M extends Serializable> void writeAndFlush(ChannelHandlerContext channel, M msg, Cmd type); // Отправка сообщения

// Сохраненный интерфейс void writeAndFlush(M msg, Cmd type); }


### Пример клиента
  BizNettyClient client = NettyClientFactory.buildBizNettyClient(new Address("127.0.0.1", 9999),
            () -> Arrays.asList(new NettyReqHandler())); // Собственный NettyReqHandler обработчик сообщений
    client.start(); // Запуск

### Собственная аннотация @RpcHandler для обработки сообщений

Приходящие сообщения от сервера обрабатываются с помощью прокси и рефлексии для выполнения удаленных методов RPC
@RpcHandler("complexParam")
public Object complexParam(Map<String, Long> param) {
    Thread thread = Thread.currentThread();
    log.info(" client received a map data = {} ,thread={}", param, thread);

    return "map is OK  --bbq";
}

### Собственный NettyReqHandler обработчик сообщений protected void channelRead0(ChannelHandlerContext ctx, SafeFrame<Frame>> data) throws Exception { byte cmd = data.getCmd(); if (cmd == Cmd.REQUEST.code()) { Frame> frame = data.getContent(); String name = frame.getTarget(); // Использование пула потоков для обработки задач getExecutor().execute(() -> { // Прокси-вызов метода RpcInvokeProxy.invoke(ret -> { // Возврат данных Frame resp = new Frame<>(); resp.setData(ret); writeAndFlush(ctx.channel(), resp, Cmd.RESPONSE);


Поддерживает управление клиентами

Поддерживает отправку сообщений, закрытие клиентов и другие операции через API

### Интерфейсы сервера

// Состояние сервера int state(); // Запуск сервера void start(); // Перезапуск сервера void restart(); // Остановка сервера void stop(); // Получение идентификатора сервера long getServerId();

// Отправка сообщений void writeAndFlush(Channel channel, M msg, Cmd type);

void writeAndFlush(ChannelHandlerContext channel, M msg, Cmd type);

// Добавление канала клиента void addChannel(String clientId, Channel channel); // Закрытие канала по идентификатору клиента void closeChannel(String clientId); // Получение всех каналов клиентов Collection allChannels(); // Получение канала клиента по идентификатору Channel findChannel(String clientId); // Вывод информации о всех каналах void showChannels();


### Пример использования сервера

BizNettyServer server = NettyServerFactory.buildBizNettyServer( new Address("127.0.0.1", 9999), () -> Arrays.asList(new NettyServerHandler())); server.start();


### Обработчики сервера

@Override protected void channelRead0(ChannelHandlerContext ctx, SafeFrame<Frame>> data) throws Exception { byte cmd = data.getCmd(); // Запрос if (cmd == Cmd.REQUEST.code()) { Frame> frame = data.getContent(); log.info(" server get REQUEST data = {}", frame); // Отправка ответа // writeAndFlush(ctx.channel(), resp, Cmd.REQUEST); } // Ответ if (cmd == Cmd.RESPONSE.code()) { Frame<?> frame = data.getContent(); log.info("server get RESPONSE data = {}", frame); } else { //ctx.fireChannelRead(data); } }


### Сериализация Java

```java
public <T> byte[] serialize(T obj) {
    ByteArrayOutputStream byteArrayOutputStream =
            new ByteArrayOutputStream();
    try {
        ObjectOutputStream outputStream =
                new ObjectOutputStream(byteArrayOutputStream);

JSON-сериализация (Gson)

@Override
public <T> byte[] serialize(T obj) {
    return GsonTool.toJson(obj).getBytes(StandardCharsets.UTF_8);
}

@Override
public <T> T deserialize(byte[] data, Class<T> clazz) {
    return GsonTool.fromJson(new String(data), clazz);
}

Проблемы, связанные с протоколом данных Frame

1. Роль LengthFieldBasedFrameDecoder

LengthFieldBasedFrameDecoder — декодер с пользовательским определением длины, который решает проблему TCP-склеивания. Поэтому LengthFieldBasedFrameDecoder также известен как декодер с пользовательским определением длины.

1.1 TCP-склеивание и явление склеивания

  • TCP-склеивание означает, что несколько отправленных данных пакетов от отправителя приходят к получателю как один пакет. Из точки зрения буфера приема, конец одного пакета данных следует сразу за концом предыдущего пакета данных.
  • После установки TCP-соединения клиент отправляет несколько пакетов данных серверу. TCP-протокол гарантирует надежность данных, но не гарантирует, что клиент отправил n пакетов, а сервер получил n пакетов. Если клиент отправил n пакетов данных, сервер может получить n-1 или n+1 пакет.

1.2 Почему возникает явление склеивания-

1. Причина отправителя: TCP по умолчанию использует алгоритм Nagle. Алгоритм Nagle выполняет две основные задачи: 1) отправка следующего пакета только после подтверждения предыдущего пакета; 2) сбор нескольких маленьких пакетов и отправка их вместе при получении подтверждения. Поэтому алгоритм Nagle может вызвать склеивание данных у отправителя.
    1. Причина получателя: TCP-получатель использует буфер для чтения данных пакетов, читая несколько пакетов данных из одного буфера за один раз. Естественно, конец одного пакета данных и начало следующего пакета данных сливаются вместе. ### 1.3 Как решить проблему склеивания пакетов-
    2. Добавление специальных символов, приемник использует этот специальный символ для разделения полученных пакетов — DelimiterBasedFrameDecoder (специальный декодер на основе разделителя)
    1. Отправка пакетов фиксированной длины — FixedLengthFrameDecoder (декодер на основе фиксированной длины)
    1. Определение поля длины в заголовке сообщения, чтобы указать общую длину сообщения — LengthFieldBasedFrameDecoder (декодер на основе поля длины)

2. Как использовать LengthFieldBasedFrameDecoder

    1. LengthFieldBasedFrameDecoder по своей природе является ChannelHandler, который обрабатывает входящие события
    1. LengthFieldBasedFrameDecoder должен быть добавлен в ChannelPipeline и расположен в начале цепочки

3. Параметры LengthFieldBasedFrameDecoder

LengthFieldBasedFrameDecoder является пользовательским декодером на основе поля длины, поэтому в конструкторе используется 6 параметров, которые описывают определение поля длины.- 1. maxFrameLength - максимальная длина данных в фрейме

2. lengthFieldOffset - определение смещения поля длины в отправляемом массиве байтов. Другими словами: в отправляемом массиве байтов смещение поля длины начинается с индекса ${lengthFieldOffset}
    1. lengthFieldLength - описание длины поля длины. Другими словами: при отправке массива байтов bytes, область bytes[lengthFieldOffset, lengthFieldOffset+lengthFieldLength] соответствует определенному полю длины
    1. lengthAdjustment - удовлетворяет формуле: длина массива байтов bytes - lengthFieldLength = bytes[lengthFieldOffset, lengthFieldOffset+lengthFieldLength] + lengthFieldOffset + lengthAdjustment
    1. initialBytesToStrip - удаление первых initialBytesToStrip байтов из полученного отправленного пакета
    1. failFast - true: если прочитанное поле длины превышает maxFrameLength, выбрасывается исключение TooLongFrameException. false: исключение TooLongFrameException выбрасывается только после полного чтения всех байтов, указанных значением поля длины. По умолчанию установлено значение true, рекомендуется не изменять, иначе может произойти переполнение памяти
    1. ByteOrder - порядок хранения данных (big-endian или little-endian)

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

На основе rpc-фреймворка netty, поддерживающего пользовательские протоколы, поддерживается прямой запуск клиента или интеграция с spring, обработка с помощью пользовательского обработчика, прост в использовании. Развернуть Свернуть
MIT
Отмена

Обновления

Пока нет обновлений

Участники

все

Язык

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://api.gitlife.ru/oschina-mirror/lingfengx-x-netty-rpc.git
git@api.gitlife.ru:oschina-mirror/lingfengx-x-netty-rpc.git
oschina-mirror
lingfengx-x-netty-rpc
lingfengx-x-netty-rpc
master