1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/tmq777-t-rpc-framework

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
README.md

Агентский класс через Future ожидает получения значения. В течение заданного времени (конфигурированное время ожидания плюс 50 миллисекунд на передачу) после успешного получения, кэш удаляется и возвращается ответ. Если в течение указанного времени не будет получен успешный ответ, текущий ключ связи удаляется из кэша и, в зависимости от ситуации, помещается в очередь задержки. По умолчанию очередь очищается через 5 секунд.

Теоретически, в кэше не должно быть недействительных данных.

  1. При выполнении RPC-вызова, если параметры вызывающей стороны содержат пользовательский POJO-класс, то после декодирования на удалённой стороне этот тип преобразуется в LinkedHashMap. Проще говоря, если соответствующий тип не найден, он автоматически преобразуется в LinkedHashMap. Эта функция зависит от пакета jackson. При настройке сериализации необходимо обратить на это особое внимание.

Пример:

A необходимо вызвать службу B. В это время в параметрах A есть пользовательский класс, но в среде развёртывания B этого класса нет.

// A развёрнут в среде и вызывает службу B
bService.index(Any param);

В этом случае B может правильно получить параметр, но параметр param будет преобразован в LinkedHashMap. Ключ-значение = атрибут имени-значения.

B возвращает ответ A, который содержит тип, отсутствующий в среде A. У A есть два способа обработки ответа: 1 — использовать Map для приёма, 2 — использовать объект, атрибуты которого соответствуют возвращаемому телу ответа.

// Например, B возвращает объект, содержащий {name: "a", age: 16} в ответе
// Когда ответ возвращается в A, A может использовать любой атрибут, содержащий name и age, для получения пользовательского типа или напрямую использовать Map
CustomType res = bService.index(Any param);
// Пользовательский тип
class CustomType {
    private String name;
    private int age;
}

При совпадении ключей-значений можно свободно выбирать способ приёма.

Необходимо обратить внимание на реализацию этой функции при использовании настраиваемой стратегии сериализации.


Основные зависимости

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.7.0</version>
</dependency>

Описание основного кода

Класс автоматической конфигурации

cn.t.rpc.config.RpcAutoConfiguration

@Configuration
// @TRpcServiceScanner(basePackages = {"cn.t.rpc"})  // ※注意点1
public class RpcAutoConfiguration {

    /**
     * Клиентский конфигурационный класс
     * @return клиентский конфигурационный класс
     */
    @Bean
    @ConfigurationProperties(prefix = "rpc.service")
    public RpcConfigProperties rpcConfigProperties() {
        RpcConfigProperties properties = new RpcConfigProperties();
        properties.setSerializationStrategy(serializationStrategy());
        return properties;
    }
    
    /**
     * Контейнер после обработки
     * @return
     */
    @Bean
    public TRpcFrameworkAware tRpcFrameworkAware() {
        return new TRpcFrameworkAware();
    }
    
    /**
     * Инструмент сериализации<br>
     * По умолчанию используется инструмент Json от Jackson
     * @return инструмент сериализации
     */
    @Bean
    @ConditionalOnMissingBean(SerializationStrategy.class)
    public SerializationStrategy serializationStrategy() {
        return new SerializationByJson();
    }
    
    /**
     * Обнаружение службы
     * @return Bean обнаружения службы
     */
    @Bean
    @ConditionalOnMissingBean(TRpcDiscoveryStrategy.class)
    public TRpcDiscoveryStrategy rpcDiscovery() {
        return new TRpcDiscoveryByProperties();
    }
}

Класс автоматической настройки по умолчанию упаковывает все необходимые конфигурационные объекты для запуска всего фреймворка TRpcConfigProperties, а также по умолчанию настраивает сериализацию Json и стратегию поиска сервисов из памяти.

**Чтобы получить элемент конфигурации фреймворка, введите TRpcConfigProperties.

Чтобы настроить сериализатор, реализуйте интерфейс SerializationStrategy и создайте Bean.

Для настройки обнаружения сервиса реализуйте интерфейс TRpcDiscoveryStrategy и создайте Bean.

Примечание 1: код с закомментированным `@TRpcServiceScanner (basePackages = {"cn.t.rpc"}) должен быть добавлен в собственный класс конфигурации и указан путь к пакету, иначе невозможно сканировать необходимый интерфейс прокси!!!**

resources\META-INF\spring.factories путь уже настроен для автоматического подключения входа

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.t.rpc.config.RpcAutoConfiguration

Согласованный тип данных и формат (кодер-декодер)

Тип данных по умолчанию для передачи данных — Json, базовый класс сериализации — cn.t.rpc.core.TrafficData, по умолчанию наследует три подкласса:

  1. TRequest — данные запроса.
  2. TResponse — данные ответа.
  3. TIdle — данные для проверки сердцебиения.

Для данных передачи поля имён в классе данных были максимально сокращены для обеспечения семантической безопасности, чтобы уменьшить объём данных при сериализации и повысить производительность связи.

Формат данных передачи следующий:

  1. Тип сериализации (используется для различения основных функций и последующих расширенных функций).
  2. Длина байта действительных бизнес-данных.
  3. Текущий тип передаваемых данных (запрос или ответ, этот атрибут в настоящее время не имеет практического бизнес-значения).
  4. Бизнес-данные.
  5. Терминатор.

Например (с использованием вертикальной черты в качестве разделителя, 0 представляет каждый байт):

0000000|0000|0000|00000...000000|000

Кодировщик выглядит следующим образом (только основная часть):

@Override
protected void encode(ChannelHandlerContext ctx, TrafficData msg, ByteBuf out) throws Exception {
    try {
        // do something
        byte[] data = this.strategy.serialize(msg);
        // do something
        int len = data.length;
        // записать тип сериализации - длина массива байтов, соответствующая строке типа

*Примечание: в тексте запроса присутствуют фрагменты кода на языке Java, которые не были переведены.* ```
out.writeBytes(TRpcConstants.RPC_TRAFFIC_TYPE.getBytes(CharsetUtil.UTF_8)); 
// 写入长度-占4位
out.writeInt(len);
// 写入类型-占4位
out.writeInt(msg.getType().value());
// 写入数据
out.writeBytes(data);
// 写入终止符
out.writeBytes(TRpcConstants.RPC_TRAFFIC_EOF.getBytes(CharsetUtil.UTF_8));
} catch(Exception e) {
    logger.error("TrafficEncoder: 编码异常...本次连接关闭");
    logger.error(e.getMessage());
    ctx.close();
}

解码器如下 (只贴出核心部分):

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    try {
        // 1.判断长度
        if (in.readableBytes() <= TRpcConstants.RPC_TRAFFIC_VALID_LEN) {
            logger.warn("TrafficDecoder: 传输的数据不合法...数据长度不正确...移交后续解码器处理...");
            // 中止处理,触发后续管道操作
            ctx.fireChannelRead(in);
            return;
        }
        // 2.判断类型
        // 读取序列化类型字节数组
        byte[] types = new byte[TRpcConstants.RPC_TRAFFIC_TYPE_LEN];
        try {
            in.readBytes(types);
        } catch (IndexOutOfBoundsException e) {
            logger.debug("TrafficDecoder: 类型不正确...移交后续解码器处理...");
            // 重置读取的index避免影响后续解码器
            in.resetReaderIndex();
            ctx.fireChannelRead(in);
            return;
        }
        if (!TRpcConstants.RPC_TRAFFIC_TYPE.equals(new String(types, CharsetUtil.UTF_8))) {
            logger.debug("TrafficDecoder: 类型不正确...移交后续解码器处理...");
            in.resetReaderIndex();
            ctx.fireChannelRead(in);
            return;
        }
        // 读取报文的数据长度
        int len = in.readInt();
        if (len <= 0) {
            logger.error("TrafficDecoder: 传输的数据不合法...数据长度不正确...");
            // 直接关闭连接,不再接收,简单防止攻击
            ctx.close();
            return;
        }

        // 读取类型
        int type = in.readInt();
        // 剩余的数据量长度(数据长度加上终止符长度)
        int validLen = len + TRpcConstants.RPC_TRAFFIC_EOF_LEN;
        if (in.readableBytes() < validLen) {
            // 此时有可能数据量大,还未读完,等待下一次读取
            in.resetReaderIndex();
        } else if (in.readableBytes() > validLen) {
            // 进入这个case中有两种情况
            // 1: 业务设计ok的情况下,进入此分支则出现了沾包问题(两次消息利用同一个channel无间隔发送)
            // 2: 数据被篡改了
            // =======进行拆包======
            byte[] body = new byte[len];
            // 将可读缓冲区有效业务部分字节数据读入临时数组
            in.readBytes(body);
            // 判断之后是否是终止符
            byte[] eof = new byte[TRpcConstants.RPC_TRAFFIC_EOF_LEN];
            in.readBytes(eof);
            // 判断是否是终止符
            if (TRpcConstants.RPC_TRAFFIC_EOF.equals(new String(eof, CharsetUtil.UTF_8))) {
                // 保存当前读取位置,此处拆包完成
                TrafficData traffic = JsonUtil.convertToObject(body, TrafficTypeHelper.getTrafficClassType(type));
                out.add(traffic);
                logger.debug("TrafficDecoder: 数据流(拆包)读取完毕...长度:" + len + " ...类型:"
                             + TrafficType.findName(type));
                // 保存当前的readerIndex位置,读取下一个包的数据
                in.markReaderIndex();
            } else {
                logger.error("TrafficDecoder: 传输的数据不合法...数据被篡改...");
                // 直接关闭连接,不再接收
                ctx.close();
            }
        } else {
            logger.debug("TrafficDecoder: 数据流读取完毕...长度:" + len + " ...类型:" + TrafficType.findName(type));

            byte[] body = new byte[len];
            // 将剩余的缓冲区字节数据读入临时数组
            in.readBytes(body);
            // 将终止符读完(注意,netty中in.readBytes传入的参数为int类型时会生成新的ByteBuf,此时需要释放这个新的ByteBuf,否则会内存泄露)
            // in.readBytes(TRpcConstants.RPC_TRAFFIC_EOF_LEN);
            in.readBytes(new byte[TRpcConstants.RPC_TRAFFIC_EOF_LEN]);

            TrafficData traffic = this.strategy.deserialize(body, TrafficTypeHelper.getTrafficClassType(type));
            out.add(traffic);

            // 保存当前的readerIndex位置(本次有可能是沾包数据,但是刚好一半一半)
            in.markReaderIndex();
        }
    } catch (Exception e) {
        logger.error("TrafficDecoder: 解码异常...本次连接关闭...异常信息{}", e.getMessage());
        ctx.close();
    }
}
``` ### Клиентское программное обеспечение: сервис обнаружения

Сервис обнаружения клиента по умолчанию использует способ чтения файла `properties`. Файл находится в `/resources/rpc-server.properties`.

Формат файла:
```properties
#serverId=host:port
# 服务器id=IP地址:порт
# Много экземпляров сервера id указывают на один и тот же, разделяя их запятыми
app=127.0.0.1:8081,127.0.0.1:18081
time=127.0.0.1:18081

При нормальном запуске класс конфигурации по умолчанию генерирует стратегию обнаружения сервиса на основе файла.

/**
 * Сервис обнаружения
 * @return Сервис обнаружения Bean
 */
@Bean
@ConditionalOnMissingBean(TRpcDiscoveryStrategy.class)
public TRpcDiscoveryStrategy rpcDiscovery() {
return new TRpcDiscoveryByProperties();
}

Кроме того, фреймворк также предоставляет стратегию обнаружения сервисов на основе zookeeper в качестве центра регистрации. Необходимо настроить TRpcDiscoveryByZk как Bean.

В TRpcDiscoveryByZk вместе с инструментами класса пула соединений уже реализована функция динамического обновления сервисов.

@Configuration
@TRpcServiceScanner(basePackages = { "aaa.bbb.ccc" })
public class XXXConfig {
    // Вводим конфигурацию
    @Autowired
    private TRpcConfigProperties rpcConfigProperties;

    /**
     * Обнаружение сервиса на основе zookeeper
     * @return Стратегия обнаружения сервиса Bean
     * @throws Exception 
     */
    @Bean
    public TRpcDiscoveryStrategy rpcDiscoveryZk() throws Exception {
        return new TRpcDiscoveryByZk(rpcConfigProperties);
    }
}

Автоматическое проксирование клиентского интерфейса

Автоматическое проксирование интерфейса в основном зависит от реализации аннотации TRpcServiceScanner, где @Import используется для импорта RpcServiceScanner.class.

@Retention(RUNTIME)
@Target(TYPE)
@Import(RpcServiceScanner.class)
public @interface TRpcServiceScanner {
    String[] basePackages();
}

В классе RpcServiceScanner получают значение basePackages аннотации TRpcServiceScanner, а затем сканируют и создают определение bean.

public class RpcServiceScanner implements ImportBeanDefinitionRegistrar {
    
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        
        // Получаем метаданные аннотации импортированного класса
        AnnotationAttributes annotationAttributes = AnnotationAttributes.fromMap(importingClassMetadata.getAnnotationAttributes(TRpcServiceScanner.class.getName()));

        // Получаем указанный путь пакета сканирования
        String[] basePackages = annotationAttributes.getStringArray("basePackages");

        RpcClassPathBeanDefinitionScanner scanner = new RpcClassPathBeanDefinitionScanner(registry, false);
        try {
            scanner.scanRpc(registry, basePackages);
        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | IOException e) {
            e.printStackTrace();
        }
    }
}

Класс использует RpcClassPathBeanDefinitionScanner для сканирования файлов Class.

public class RpcClassPathBeanDefinitionScanner extends ClassPathBeanDefinitionScanner {
    
    public RpcClassPathBeanDefinitionScanner(BeanDefinitionRegistry registry, boolean useDefaultFilters)
             {
        super(registry, useDefaultFilters);
    }

    protected void scanRpc(BeanDefinitionRegistry registry, String... basePackages) throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
        // Добавляем фильтр типа сканирования
        super.addIncludeFilter(new AnnotationTypeFilter(TRpcRemoteService.class));
        // Сканируем пакет для получения метаданных — поскольку spring по умолчанию doScan игнорирует верхний уровень интерфейса, необходимо вручную переписать эту логику
        Set<BeanDefinition> candidates = new LinkedHashSet<>();
        ResourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver();
        for (String basePackage: basePackages) {
            // Нижеприведённая логика напрямую копирует код spring
            String searchPath = ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX + basePackage.replace(".", File.separator)  + File.separator + "/**/*.class";
            // Сканируем все классы в пакете
            Resource[] resources = resourcePatternResolver.getResources(searchPath);
            for (Resource resource : resources) {
                MetadataReader metadataReader = getMetadataReaderFactory().getMetadataReader(resource);
                ScannedGenericBeanDefinition sbd = new ScannedGenericBeanDefinition(metadataReader);
                sbd.setSource(resource);
                // Добавляем в список кандидатов
                candidates.add(sbd);
            }
        }
        
        for (BeanDefinition s : candidates) {
            GenericBeanDefinition definition = (GenericBeanDefinition) s;
            Class<?> clazz = Class.forName(definition.getBeanClassName());
            Annotation[] annotations = clazz.getAnnotations();
            for (Annotation a : annotations) {
                // **Текст запроса**:

判断注解类型和Class类型(只代理接口) if (TRpcRemoteService.class.isAssignableFrom(a.annotationType()) && clazz.isInterface()) { // 全限定类(接口)名 String className = definition.getBeanClassName(); String beanName = className.substring(className.lastIndexOf(".")); // 按类型自动装配 definition.setAutowireMode(GenericBeanDefinition.AUTOWIRE_BY_TYPE); // 设置生成Bean实例的工厂 definition.setBeanClass(RpcProxyBeanFactory.class); // 设置创建对象工厂的构造方法参数 definition.getConstructorArgumentValues().addIndexedArgumentValue(0, className); // 注册bean registry.registerBeanDefinition(beanName, definition); } } } } }


**Перевод текста на русский язык**:

Определить тип аннотации и тип класса (только прокси-интерфейс).
Если (TRpcRemoteService.class может быть присвоен из a.annotationType() и класс является интерфейсом), то:
* // полное имя класса (интерфейса)
  Строка className = определение.getBeanClassName();
  Строка beanName = className.подстрока(className.последний индекс("."));
* // автоматическая сборка по типу
  определение.setAutowireMode (GenericBeanDefinition.AUTOWIRE_BY_TYPE);
* // установить фабрику для создания экземпляров Bean
  определение.setBeanClass (RpcProxyBeanFactory.класс);
* // задать параметры конструктора фабрики объектов
  определение.getConstructorArgumentValues ().addIndexedArgumentValue (0, className);
* // зарегистрировать bean
  реестр.registerBeanDefinition (имя_компонента, определение);
}

### Клиентский пул соединений

В случае нескольких серверов клиент будет распределять пулы соединений в соответствии с каждым сервером serverId. В качестве базового пула соединений используется FixedChannelPool от netty, а также AbstractChannelPoolMap для установления связи между serverId и пулом соединений. Затем пулы соединений с одинаковым serverId объединяются в TClientPoolWrapper для достижения высокой доступности. Кроме того, в классе инструментов пула соединений запускается задача по расписанию для очистки недоступных пулов соединений.

Основная логика (некоторые операции log удалены):

```java
/**
 * Инициализация пула соединений
 *
 * @param addrList список адресов служб
 */
public void init(List<TRpcRemoteAddress> addrList) {

    if (this.config == null) {
        this.failureMsg.append("1. Не настроено соединение пула... Ошибка инициализации соединения пула... Необходимо вызвать метод config() для настройки...");
    }

    if (addrList == null || addrList.isEmpty()) {
        this.failureMsg.append("2. Не задано удалённое местоположение...");
    }}

    this.workerGroup = new NioEventLoopGroup();
    this.bootstrap = new Bootstrap();

    this.bootstrap.group(this.workerGroup).channel(NioSocketChannel.class)
        .option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.config.getConnectionTimeout());

    // Инициализируем poolMap
    poolMap = new AbstractChannelPoolMap<TRpcRemoteAddress, FixedChannelPool>() {

        @Override
        protected FixedChannelPool newPool(TRpcRemoteAddress key) {
            return new FixedChannelPool(bootstrap.remoteAddress(key.getRemoteAddress()),
                                        new TClientChannelPoolHandler(config), config.getMaxPoolConnection());
        }
    };

    // Предварительное создание
    if (addrList != null && !addrList.isEmpty()) {
        // Создаём пул соединений
        this.initPool(addrList);
        // Очистить информацию об ошибке операции
        this.failureMsg.delete(0, this.failureMsg.length());
        // Запускаем таймер обновления
        this.startTimer();
    } else {
        this.failureMsg.append("3. Не удалось получить удалённое расположение, предварительное создание пула соединений не завершено...");
    }
}
/**
 * Создание пула соединений
 *
 * @param addrList Список служб
 */
private void initPool(List<TRpcRemoteAddress> addrList) {
    this.rpcMaps = (this.rpcMaps == null || this.rpcMaps.size() == 0) ? new HashMap<>(32) : this.rpcMaps;
    addrList.forEach(addr -> {
        // poolMap вызывает get для получения пула соединений по умолчанию, если он уже создан, то сразу возвращает
        String id = addr.getServerId();
        if (this.rpcMaps.containsKey(id)) {
            // одинаковые соединения не будут добавляться повторно
            if (!this.poolMap.contains(addr)) {
                this.rpcMaps.get(id).add(new TClientPoolWrapper(addr, this.poolMap.get(addr)));
            }
        } else {
            List<TClientPoolWrapper> poolList = new ArrayList<>();
            poolList.add(new TClientPoolWrapper(addr, this.poolMap.get(addr)));
            this.rpcMaps.put(id, poolList);
        }
    });

    // Успешная инициализация
    this.status = true;
}

Вышеупомянутый this.rpcMaps — это пул сопоставления клиентских служб, и один и тот же удалённый сервис будет отображаться в одном списке. Атрибуты следующие:

private Map<String, List<TClientPoolWrapper>> rpcMaps;

При удалённом вызове канал будет получен из следующего метода, и после получения канала будет выполнен простой случайный опрос, чтобы получить подчинённого текущего канала для последующего освобождения.

/**
 * Получение канала указанного сервиса
 *
 * @param key идентификатор службы
 * @return канал
 */
public Channel getPooledChannel(String key) {
    if (this.rpcMaps == null || this.rpcMaps.isEmpty()) {
        throw new TRpcOperationException("Соединение пула не инициализировано... Необходимо вызвать метод init() для инициализации...");
    }

    List<TClientPoolWrapper> poolList = rpcMaps.get(key);

    if (poolList == null || poolList.isEmpty()) {
        throw new TRpcConnectionException("Удалённое соединение потеряно... Нет доступного пула соединений...");
    }

    // Случайный выбор по модулю массива
    int random = this.rand.nextInt(1024);
    int retryCnt = 0;
    // Гарантируем, что каждый пул может попробовать
    int limit =
``` **poolList.size();**

int index = 0;
Channel channel = null;
do {
    index = random % limit;
    // Использование случайного индекса для получения соединения из пула
    channel = poolList.get(index).acquireChannel(this.config.getRetryCnt(), this.config.getReconnectionTime());
    // Если получение не удалось, то увеличить случайное число и повторить попытку с другим пулом
    random++;
    retryCnt++;
} while (channel == null && retryCnt < limit);

if (channel == null) {
    throw new TRpcCoreException("Невозможно получить канал соединения...");
}

// Привязать атрибут к текущему каналу, указав его принадлежность к определённому пулу
AttributeKey<Map<String, Integer>> attrKey = AttributeKey.valueOf(POOL_KEY);
Map<String, Integer> metaMap = new HashMap<>();
metaMap.put(key, index);
channel.attr(attrKey).set(metaMap);
return channel;
}

В классе TClientPoolWrapper выполняется проверка работоспособности и повторное подключение к пулам соединений. Код для получения канала:

/**
 * Получение канала
 *
 * @param retryCnt         максимальное количество попыток получения канала
 * @param reconnectionTime  максимальное время для повторного подключения пула
 * @return канал
 */
public Channel acquireChannel(int retryCnt, int reconnectionTime) {

    // Пул недоступен, сразу генерируем исключение
    if (!this.isHealthy()) {
        logger.warn("Соединение с пулом соединений {} недоступно... Проверьте состояние сети...", this.key.getAddressString());
        // Переходим к следующему пулу
        return null;
    }

    Future<Channel> channeFuture = this.pool.acquire();
    Channel channel = null;
    try {
        channel = channeFuture.get();
    } catch (InterruptedException | ExecutionException e) {
        // Проверка работоспособности, повторная попытка
        this.increaseFailureCnt(retryCnt, reconnectionTime);
    }
    return channel;
}

Основная логика реализации прокси-класса (синхронизация данных между потоками, привязка)

Прокси-класс наследуется от класса, реализующего интерфейс InvocationHandler, который вызывает метод action для выполнения основной бизнес-логики.

@Override
protected Object action(Method method, Object[] args) throws TRpcTimeoutException, TRpcConnectionException {

    if (!CHANNEL_POOL.isReady()) {
        throw new TRpcConnectionException("Пул соединений не инициализирован успешно... Информация об операции: " + CHANNEL_POOL.getFailureMsg());
    }

    // Получаем указанный идентификатор сервера
    TRpcRemoteService a = method.getDeclaringClass().getAnnotation(TRpcRemoteService.class);
    String serverId = a.serverId();

    // Получаем коммуникационный канал
    Channel ch = CHANNEL_POOL.getPooledChannel(serverId);

    TRequest req = new TRequest();
    // Метод должен совпадать, чтобы можно было вызвать
    req.setMethod(method.getName());
    req.setParams(args);
    // Класс службы, к которой осуществляется вызов
    String className = a.className();
    if (className.isBlank()) {
        className = method.getDeclaringClass().getSimpleName();
    }
    req.setClsNm(className);

    // Предварительно создаём ответ
    CompletableFuture<TrafficData> trafficFuture = TRpcTrafficDataContextHolder.newFuture(req.getId());

    ch.writeAndFlush(req).addListener((f) -> {
        if (f.isSuccess()) {
            logger.debug("Отправка прошла успешно...");
        } else {
            logger.warn("Удаленный вызов {} не удался...", method.getName());
            // В случае неудачи очищаем кэш
            TRpcTrafficDataContextHolder.removeCache(req.getId());
        }
        // Освобождаем канал после использования
        CHANNEL_POOL.release(ch);
    }).syncUninterruptibly();

    try {
        TResponse res = trafficFuture.get(TRpcTrafficDataContextHolder.getTimeout() + 50, TimeUnit.MILLISECONDS).convert();
        if (res.getStatus() != 200) {
            throw new TRpcResponseException(res.getMsg());
        } else {
            return res.getBody();
        }

    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        logger.error("Исключение RPC-вызова... Сообщение об ошибке: {}", e.getMessage());
        // Удаляем кэш для текущего запроса в случае исключения
        TRpcTrafficDataContextHolder.removeCache(req.getId());
        throw new TRpcOperationException(e.getMessage(), e);
    }
}

Для многопоточной синхронизации данных и привязки прокси-класс использует TRpcTrafficDataContextHolder. Перед отправкой запроса создаётся будущее значение ответа и сохраняется в кэше TRpcTrafficDataContextHolder. В случае возникновения исключения этот ответ удаляется.

Обратите внимание: при вызове метода get помимо указанного времени ожидания добавляется ещё 50 миллисекунд, что является дополнительным временем для учёта передачи данных по сети.

Полный код TRpcTrafficDataContextHolder:

public final class TRpcTrafficDataContextHolder {

    private static final Logger logger = LoggerFactory.getLogger(TRpcTrafficDataContextHolder.class);

    /**
     * Кэш результатов ответов, разделенный на разные ответы по ключу<br>
     * Инициализация хэш-карты с емкостью для уменьшения расширения
     */
    private static final ConcurrentHashMap<String, CompletableFuture<TrafficData>> resultMap = new ConcurrentHashMap<>(
            128);

    /**
     * 
``` **Отсроченная неограниченная очередь для своевременного удаления недействительных ответов в экстремальных ситуациях**

     */
    private static final DelayQueue<InvalidKey> INVALIDKEY_QUEUE = new DelayQueue<>();

    /**
     * Поток очистки кэша
     */
    private static final Thread sweeper = new Thread(() -> {
        while (true) {
            try {
                logger.info("продолжайте очищать кэш...");
                resultMap.remove(INVALIDKEY_QUEUE.take().getKey());
            } catch (InterruptedException e) {
                logger.warn("поток очистки был прерван...");
            }
        }
    }, "the sweeper");

    static {
        sweeper.start();
    }

    private TRpcTrafficDataContextHolder() throws IllegalAccessException {
        throw new IllegalAccessException("Незаконная операция...");
    }

    /**
     * Время ожидания
     */
    private static int TIMEOUT = 2000;

    public static void setTimeout(int timeout) {
        TIMEOUT = timeout;
    }

    public static int getTimeout() {
        return TIMEOUT;
    }

    /**
     * Создание нового объекта CompletableFuture результата
     *
     * @param trafficId уникальный идентификатор
     * @return пустой объект CompletableFuture
     */
    public static CompletableFuture<TrafficData> newFuture(String trafficId) {
        CompletableFuture<TrafficData> future = new CompletableFuture<>();
        var old = resultMap.putIfAbsent(trafficId, future);
        if (old != null) {
            throw new TRpcDuplicateKeyException();
        }
        return future;
    }

    /**
     * Завершение указанного future, вызывается после получения ответа клиентом
     *
     * @param data данные ответа
     */
    public static void complete(TrafficData data) {
        CompletableFuture<TrafficData> future = resultMap.remove(data.getId());
        if (future != null) {
            future.complete(data);
        } else {
            throw new TRpcCoreException("future not found...");
        }
    }

    /**
     * Удаление ответа из кэша
     *
     * @param trafficId уникальный идентификатор
     * @return ответ из кэша
     */
    public static CompletableFuture<TrafficData> removeCache(String trafficId) {
        CompletableFuture<TrafficData> invalidData = resultMap.remove(trafficId);
        // Отсрочка перед удалением (гарантирует, что в экстремальных условиях, когда поток ожидает максимального значения и условия ответа клиента выполняются, кэш не может быть нормально использован)
        if (invalidData == null) {
            INVALIDKEY_QUEUE.add(new InvalidKey(trafficId));
        }
        return invalidData;
    }
}

В случае нормального ответа, после завершения ответа на основе запроса, созданного во время запроса, будет найден соответствующий предварительно созданный Future для завершения обработки.

### Событие ответа клиента

После получения ответа в цепочке обработчиков клиентского кода, он сравнивает разницу между временем начала ответа сервера и текущим временем с указанным временем ожидания запроса в файле конфигурации, чтобы контролировать обработку тайм-аута. Здесь используется разница между текущим временем и временем генерации ответа сервером, без тщательного учёта времени передачи данных (в прокси-классе добавлено увеличение на 50 мс), поэтому могут возникнуть ситуации, когда Future запроса истекает раньше времени. В описанном выше TRpcTrafficDataContextHolder поддерживается отложенная очередь, и если ключ не найден в кэше при удалении, этот ключ будет добавлен в очередь. Через 5 секунд он будет удалён снова.

```java
@Override
protected void channelRead0(ChannelHandlerContext ctx, TrafficData msg) throws Exception {
    logger.debug("{}客户端{}读取响应信息...", TRpcConstants.LOG_CLIENT_MSG_PREFIX, ctx.channel().remoteAddress());
    try {
        // Время начала ответа
        long ts = msg.getTs();
        // Текущее время
        long curTs = System.currentTimeMillis();
        // Если время превышает время ожидания, то оно отбрасывается и не помещается в кэш (в экстремальных случаях - как раз достигает порога, который будет помещён в кэш, а затем удалён в потоке очистки)
        if (curTs - ts > this.requestTimeout) {
            logger.warn("{}Ответ слишком длинный... Данные были отброшены...", TRpcConstants.LOG_CLIENT_MSG_PREFIX);
        } else {
            // Ответное значение сохраняется в общей карте
            TRpcTrafficDataContextHolder.complete(msg);
        }
    } catch(Exception e) {
        // Удаление из кэша
        TRpcTrafficDataContextHolder.removeCache(msg.getId());
        logger.error("{}Обработка ответа завершилась неудачно... Информация об ошибке:{}", TRpcConstants.LOG_CLIENT_MSG_PREFIX, e.getMessage());
    }
}

Серверная служба сканирования и вызова службы

Когда rpc.service.serviceType установлен на BOTH или SERVER, проект запускается, и другой поток запускает RPC-сервер. Все классы, отмеченные аннотацией TRpcLocalService, сканируются и сохраняются в кэше с использованием имени класса в качестве ключа и экземпляра класса в качестве значения. Основная операция выполняется в классе помощника TLocalServiceHolderContext. При вызове метода используется оптимизация производительности через прокси cglib. После получения запроса сервер вызывает службу.

Код выглядит следующим образом:

public class TLocalServiceHolderContext {

    /**
     * Полное имя класса - информация о классе
     */
    private static final Map<String, ServiceMetaData> SERVICE_MAP = new HashMap<>(32);

    /**
     * Добавить сервис
     * @param name имя класса
     * @param metaData метаданные класса
     */
    public static void addService(String name, ServiceMetaData metaData) {
        SERVICE_MAP.put(name, metaData);
    } ```
    /**
     * Количество локальных сервисов
     * @return Количество локальных сервисов
     */
    public static int size() {
        return SERVICE_MAP.size();
    }

    /**
    * Вызов метода сервиса
    * @param serviceName Имя сервиса
    * @param methodName Имя метода
    * @param params Параметры
    * @return Возвращаемое значение метода сервиса
    */
    public static Object invoke(String serviceName, String methodName, Object[] params) {

        ServiceMetaData metaData = SERVICE_MAP.get(serviceName);

        if (metaData == null) {
            throw new TRpcCoreException("Не найден соответствующий сервис...");
        }

        // Получение типа, создание cglib-прокси и выполнение (быстрее, чем прямое использование рефлексии)
        FastClass fc = FastClass.create(metaData.getBean().getClass());
        FastMethod m = fc.getMethod(metaData.getMethod(methodName));
        try {
            return m.invoke(metaData.getBean(), params);
        } catch (InvocationTargetException e) {
            throw new TRpcCoreException("Ошибка вызова сервиса... Сообщение об ошибке:" + e.getMessage());
        }
    }

    /**
    * Метаданные локального сервиса
    * @author TMQ
    */
    public static class ServiceMetaData {

        public ServiceMetaData(Object bean) {
            this.bean = bean;
        }

        /**
        * Фактический объект сервиса
        */
        private Object bean;

        /**
        * Отображение имени метода на метод объекта
        */
        private Map<String, Method> serviceMethods = new HashMap<>();

        public Object getBean() {
            return bean;
        }

        /**
         * Добавление метода сервиса
         * @param name Имя метода
         * @param method Метод объекта
         */
        public void addServiceMethod(String name, Method method) {
            this.serviceMethods.put(name, method);
        }

        /**
        * Получение метода сервиса
        * @param name Имя метода
        * @return Метод объекта
        */
        public Method getMethod(String name) {
            return this.serviceMethods.get(name);
        }
    }
}

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

На основе RPC-фреймворка Netty. Развернуть Свернуть
Apache-2.0
Отмена

Обновления (2)

все

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://api.gitlife.ru/oschina-mirror/tmq777-t-rpc-framework.git
git@api.gitlife.ru:oschina-mirror/tmq777-t-rpc-framework.git
oschina-mirror
tmq777-t-rpc-framework
tmq777-t-rpc-framework
master