Агентский класс через Future ожидает получения значения. В течение заданного времени (конфигурированное время ожидания плюс 50 миллисекунд на передачу) после успешного получения, кэш удаляется и возвращается ответ. Если в течение указанного времени не будет получен успешный ответ, текущий ключ связи удаляется из кэша и, в зависимости от ситуации, помещается в очередь задержки. По умолчанию очередь очищается через 5 секунд.
Теоретически, в кэше не должно быть недействительных данных.
Пример:
A необходимо вызвать службу B. В это время в параметрах A есть пользовательский класс, но в среде развёртывания B этого класса нет.
// A развёрнут в среде и вызывает службу B
bService.index(Any param);
В этом случае B может правильно получить параметр, но параметр param будет преобразован в LinkedHashMap. Ключ-значение = атрибут имени-значения.
B возвращает ответ A, который содержит тип, отсутствующий в среде A. У A есть два способа обработки ответа: 1 — использовать Map для приёма, 2 — использовать объект, атрибуты которого соответствуют возвращаемому телу ответа.
// Например, B возвращает объект, содержащий {name: "a", age: 16} в ответе
// Когда ответ возвращается в A, A может использовать любой атрибут, содержащий name и age, для получения пользовательского типа или напрямую использовать Map
CustomType res = bService.index(Any param);
// Пользовательский тип
class CustomType {
private String name;
private int age;
}
При совпадении ключей-значений можно свободно выбирать способ приёма.
Необходимо обратить внимание на реализацию этой функции при использовании настраиваемой стратегии сериализации.
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.7.0</version>
</dependency>
cn.t.rpc.config.RpcAutoConfiguration
@Configuration
// @TRpcServiceScanner(basePackages = {"cn.t.rpc"}) // ※注意点1
public class RpcAutoConfiguration {
/**
* Клиентский конфигурационный класс
* @return клиентский конфигурационный класс
*/
@Bean
@ConfigurationProperties(prefix = "rpc.service")
public RpcConfigProperties rpcConfigProperties() {
RpcConfigProperties properties = new RpcConfigProperties();
properties.setSerializationStrategy(serializationStrategy());
return properties;
}
/**
* Контейнер после обработки
* @return
*/
@Bean
public TRpcFrameworkAware tRpcFrameworkAware() {
return new TRpcFrameworkAware();
}
/**
* Инструмент сериализации<br>
* По умолчанию используется инструмент Json от Jackson
* @return инструмент сериализации
*/
@Bean
@ConditionalOnMissingBean(SerializationStrategy.class)
public SerializationStrategy serializationStrategy() {
return new SerializationByJson();
}
/**
* Обнаружение службы
* @return Bean обнаружения службы
*/
@Bean
@ConditionalOnMissingBean(TRpcDiscoveryStrategy.class)
public TRpcDiscoveryStrategy rpcDiscovery() {
return new TRpcDiscoveryByProperties();
}
}
Класс автоматической настройки по умолчанию упаковывает все необходимые конфигурационные объекты для запуска всего фреймворка TRpcConfigProperties
, а также по умолчанию настраивает сериализацию Json и стратегию поиска сервисов из памяти.
**Чтобы получить элемент конфигурации фреймворка, введите TRpcConfigProperties
.
Чтобы настроить сериализатор, реализуйте интерфейс SerializationStrategy
и создайте Bean
.
Для настройки обнаружения сервиса реализуйте интерфейс TRpcDiscoveryStrategy
и создайте Bean
.
Примечание 1: код с закомментированным `@TRpcServiceScanner (basePackages = {"cn.t.rpc"}) должен быть добавлен в собственный класс конфигурации и указан путь к пакету, иначе невозможно сканировать необходимый интерфейс прокси!!!**
resources\META-INF\spring.factories
путь уже настроен для автоматического подключения входаorg.springframework.boot.autoconfigure.EnableAutoConfiguration=\ cn.t.rpc.config.RpcAutoConfiguration
Тип данных по умолчанию для передачи данных — Json
, базовый класс сериализации — cn.t.rpc.core.TrafficData
, по умолчанию наследует три подкласса:
TRequest
— данные запроса.TResponse
— данные ответа.TIdle
— данные для проверки сердцебиения.Для данных передачи поля имён в классе данных были максимально сокращены для обеспечения семантической безопасности, чтобы уменьшить объём данных при сериализации и повысить производительность связи.
Формат данных передачи следующий:
Например (с использованием вертикальной черты в качестве разделителя, 0 представляет каждый байт):
0000000|0000|0000|00000...000000|000
Кодировщик выглядит следующим образом (только основная часть):
@Override
protected void encode(ChannelHandlerContext ctx, TrafficData msg, ByteBuf out) throws Exception {
try {
// do something
byte[] data = this.strategy.serialize(msg);
// do something
int len = data.length;
// записать тип сериализации - длина массива байтов, соответствующая строке типа
*Примечание: в тексте запроса присутствуют фрагменты кода на языке Java, которые не были переведены.* ```
out.writeBytes(TRpcConstants.RPC_TRAFFIC_TYPE.getBytes(CharsetUtil.UTF_8));
// 写入长度-占4位
out.writeInt(len);
// 写入类型-占4位
out.writeInt(msg.getType().value());
// 写入数据
out.writeBytes(data);
// 写入终止符
out.writeBytes(TRpcConstants.RPC_TRAFFIC_EOF.getBytes(CharsetUtil.UTF_8));
} catch(Exception e) {
logger.error("TrafficEncoder: 编码异常...本次连接关闭");
logger.error(e.getMessage());
ctx.close();
}
解码器如下 (只贴出核心部分):
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
try {
// 1.判断长度
if (in.readableBytes() <= TRpcConstants.RPC_TRAFFIC_VALID_LEN) {
logger.warn("TrafficDecoder: 传输的数据不合法...数据长度不正确...移交后续解码器处理...");
// 中止处理,触发后续管道操作
ctx.fireChannelRead(in);
return;
}
// 2.判断类型
// 读取序列化类型字节数组
byte[] types = new byte[TRpcConstants.RPC_TRAFFIC_TYPE_LEN];
try {
in.readBytes(types);
} catch (IndexOutOfBoundsException e) {
logger.debug("TrafficDecoder: 类型不正确...移交后续解码器处理...");
// 重置读取的index避免影响后续解码器
in.resetReaderIndex();
ctx.fireChannelRead(in);
return;
}
if (!TRpcConstants.RPC_TRAFFIC_TYPE.equals(new String(types, CharsetUtil.UTF_8))) {
logger.debug("TrafficDecoder: 类型不正确...移交后续解码器处理...");
in.resetReaderIndex();
ctx.fireChannelRead(in);
return;
}
// 读取报文的数据长度
int len = in.readInt();
if (len <= 0) {
logger.error("TrafficDecoder: 传输的数据不合法...数据长度不正确...");
// 直接关闭连接,不再接收,简单防止攻击
ctx.close();
return;
}
// 读取类型
int type = in.readInt();
// 剩余的数据量长度(数据长度加上终止符长度)
int validLen = len + TRpcConstants.RPC_TRAFFIC_EOF_LEN;
if (in.readableBytes() < validLen) {
// 此时有可能数据量大,还未读完,等待下一次读取
in.resetReaderIndex();
} else if (in.readableBytes() > validLen) {
// 进入这个case中有两种情况
// 1: 业务设计ok的情况下,进入此分支则出现了沾包问题(两次消息利用同一个channel无间隔发送)
// 2: 数据被篡改了
// =======进行拆包======
byte[] body = new byte[len];
// 将可读缓冲区有效业务部分字节数据读入临时数组
in.readBytes(body);
// 判断之后是否是终止符
byte[] eof = new byte[TRpcConstants.RPC_TRAFFIC_EOF_LEN];
in.readBytes(eof);
// 判断是否是终止符
if (TRpcConstants.RPC_TRAFFIC_EOF.equals(new String(eof, CharsetUtil.UTF_8))) {
// 保存当前读取位置,此处拆包完成
TrafficData traffic = JsonUtil.convertToObject(body, TrafficTypeHelper.getTrafficClassType(type));
out.add(traffic);
logger.debug("TrafficDecoder: 数据流(拆包)读取完毕...长度:" + len + " ...类型:"
+ TrafficType.findName(type));
// 保存当前的readerIndex位置,读取下一个包的数据
in.markReaderIndex();
} else {
logger.error("TrafficDecoder: 传输的数据不合法...数据被篡改...");
// 直接关闭连接,不再接收
ctx.close();
}
} else {
logger.debug("TrafficDecoder: 数据流读取完毕...长度:" + len + " ...类型:" + TrafficType.findName(type));
byte[] body = new byte[len];
// 将剩余的缓冲区字节数据读入临时数组
in.readBytes(body);
// 将终止符读完(注意,netty中in.readBytes传入的参数为int类型时会生成新的ByteBuf,此时需要释放这个新的ByteBuf,否则会内存泄露)
// in.readBytes(TRpcConstants.RPC_TRAFFIC_EOF_LEN);
in.readBytes(new byte[TRpcConstants.RPC_TRAFFIC_EOF_LEN]);
TrafficData traffic = this.strategy.deserialize(body, TrafficTypeHelper.getTrafficClassType(type));
out.add(traffic);
// 保存当前的readerIndex位置(本次有可能是沾包数据,但是刚好一半一半)
in.markReaderIndex();
}
} catch (Exception e) {
logger.error("TrafficDecoder: 解码异常...本次连接关闭...异常信息{}", e.getMessage());
ctx.close();
}
}
``` ### Клиентское программное обеспечение: сервис обнаружения
Сервис обнаружения клиента по умолчанию использует способ чтения файла `properties`. Файл находится в `/resources/rpc-server.properties`.
Формат файла:
```properties
#serverId=host:port
# 服务器id=IP地址:порт
# Много экземпляров сервера id указывают на один и тот же, разделяя их запятыми
app=127.0.0.1:8081,127.0.0.1:18081
time=127.0.0.1:18081
При нормальном запуске класс конфигурации по умолчанию генерирует стратегию обнаружения сервиса на основе файла.
/**
* Сервис обнаружения
* @return Сервис обнаружения Bean
*/
@Bean
@ConditionalOnMissingBean(TRpcDiscoveryStrategy.class)
public TRpcDiscoveryStrategy rpcDiscovery() {
return new TRpcDiscoveryByProperties();
}
Кроме того, фреймворк также предоставляет стратегию обнаружения сервисов на основе zookeeper
в качестве центра регистрации. Необходимо настроить TRpcDiscoveryByZk
как Bean
.
В TRpcDiscoveryByZk
вместе с инструментами класса пула соединений уже реализована функция динамического обновления сервисов.
@Configuration
@TRpcServiceScanner(basePackages = { "aaa.bbb.ccc" })
public class XXXConfig {
// Вводим конфигурацию
@Autowired
private TRpcConfigProperties rpcConfigProperties;
/**
* Обнаружение сервиса на основе zookeeper
* @return Стратегия обнаружения сервиса Bean
* @throws Exception
*/
@Bean
public TRpcDiscoveryStrategy rpcDiscoveryZk() throws Exception {
return new TRpcDiscoveryByZk(rpcConfigProperties);
}
}
Автоматическое проксирование интерфейса в основном зависит от реализации аннотации TRpcServiceScanner
, где @Import
используется для импорта RpcServiceScanner.class
.
@Retention(RUNTIME)
@Target(TYPE)
@Import(RpcServiceScanner.class)
public @interface TRpcServiceScanner {
String[] basePackages();
}
В классе RpcServiceScanner
получают значение basePackages
аннотации TRpcServiceScanner
, а затем сканируют и создают определение bean
.
public class RpcServiceScanner implements ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
// Получаем метаданные аннотации импортированного класса
AnnotationAttributes annotationAttributes = AnnotationAttributes.fromMap(importingClassMetadata.getAnnotationAttributes(TRpcServiceScanner.class.getName()));
// Получаем указанный путь пакета сканирования
String[] basePackages = annotationAttributes.getStringArray("basePackages");
RpcClassPathBeanDefinitionScanner scanner = new RpcClassPathBeanDefinitionScanner(registry, false);
try {
scanner.scanRpc(registry, basePackages);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | IOException e) {
e.printStackTrace();
}
}
}
Класс использует RpcClassPathBeanDefinitionScanner
для сканирования файлов Class
.
public class RpcClassPathBeanDefinitionScanner extends ClassPathBeanDefinitionScanner {
public RpcClassPathBeanDefinitionScanner(BeanDefinitionRegistry registry, boolean useDefaultFilters)
{
super(registry, useDefaultFilters);
}
protected void scanRpc(BeanDefinitionRegistry registry, String... basePackages) throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
// Добавляем фильтр типа сканирования
super.addIncludeFilter(new AnnotationTypeFilter(TRpcRemoteService.class));
// Сканируем пакет для получения метаданных — поскольку spring по умолчанию doScan игнорирует верхний уровень интерфейса, необходимо вручную переписать эту логику
Set<BeanDefinition> candidates = new LinkedHashSet<>();
ResourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver();
for (String basePackage: basePackages) {
// Нижеприведённая логика напрямую копирует код spring
String searchPath = ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX + basePackage.replace(".", File.separator) + File.separator + "/**/*.class";
// Сканируем все классы в пакете
Resource[] resources = resourcePatternResolver.getResources(searchPath);
for (Resource resource : resources) {
MetadataReader metadataReader = getMetadataReaderFactory().getMetadataReader(resource);
ScannedGenericBeanDefinition sbd = new ScannedGenericBeanDefinition(metadataReader);
sbd.setSource(resource);
// Добавляем в список кандидатов
candidates.add(sbd);
}
}
for (BeanDefinition s : candidates) {
GenericBeanDefinition definition = (GenericBeanDefinition) s;
Class<?> clazz = Class.forName(definition.getBeanClassName());
Annotation[] annotations = clazz.getAnnotations();
for (Annotation a : annotations) {
// **Текст запроса**:
判断注解类型和Class类型(只代理接口) if (TRpcRemoteService.class.isAssignableFrom(a.annotationType()) && clazz.isInterface()) { // 全限定类(接口)名 String className = definition.getBeanClassName(); String beanName = className.substring(className.lastIndexOf(".")); // 按类型自动装配 definition.setAutowireMode(GenericBeanDefinition.AUTOWIRE_BY_TYPE); // 设置生成Bean实例的工厂 definition.setBeanClass(RpcProxyBeanFactory.class); // 设置创建对象工厂的构造方法参数 definition.getConstructorArgumentValues().addIndexedArgumentValue(0, className); // 注册bean registry.registerBeanDefinition(beanName, definition); } } } } }
**Перевод текста на русский язык**:
Определить тип аннотации и тип класса (только прокси-интерфейс).
Если (TRpcRemoteService.class может быть присвоен из a.annotationType() и класс является интерфейсом), то:
* // полное имя класса (интерфейса)
Строка className = определение.getBeanClassName();
Строка beanName = className.подстрока(className.последний индекс("."));
* // автоматическая сборка по типу
определение.setAutowireMode (GenericBeanDefinition.AUTOWIRE_BY_TYPE);
* // установить фабрику для создания экземпляров Bean
определение.setBeanClass (RpcProxyBeanFactory.класс);
* // задать параметры конструктора фабрики объектов
определение.getConstructorArgumentValues ().addIndexedArgumentValue (0, className);
* // зарегистрировать bean
реестр.registerBeanDefinition (имя_компонента, определение);
}
### Клиентский пул соединений
В случае нескольких серверов клиент будет распределять пулы соединений в соответствии с каждым сервером serverId. В качестве базового пула соединений используется FixedChannelPool от netty, а также AbstractChannelPoolMap для установления связи между serverId и пулом соединений. Затем пулы соединений с одинаковым serverId объединяются в TClientPoolWrapper для достижения высокой доступности. Кроме того, в классе инструментов пула соединений запускается задача по расписанию для очистки недоступных пулов соединений.
Основная логика (некоторые операции log удалены):
```java
/**
* Инициализация пула соединений
*
* @param addrList список адресов служб
*/
public void init(List<TRpcRemoteAddress> addrList) {
if (this.config == null) {
this.failureMsg.append("1. Не настроено соединение пула... Ошибка инициализации соединения пула... Необходимо вызвать метод config() для настройки...");
}
if (addrList == null || addrList.isEmpty()) {
this.failureMsg.append("2. Не задано удалённое местоположение...");
}}
this.workerGroup = new NioEventLoopGroup();
this.bootstrap = new Bootstrap();
this.bootstrap.group(this.workerGroup).channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.config.getConnectionTimeout());
// Инициализируем poolMap
poolMap = new AbstractChannelPoolMap<TRpcRemoteAddress, FixedChannelPool>() {
@Override
protected FixedChannelPool newPool(TRpcRemoteAddress key) {
return new FixedChannelPool(bootstrap.remoteAddress(key.getRemoteAddress()),
new TClientChannelPoolHandler(config), config.getMaxPoolConnection());
}
};
// Предварительное создание
if (addrList != null && !addrList.isEmpty()) {
// Создаём пул соединений
this.initPool(addrList);
// Очистить информацию об ошибке операции
this.failureMsg.delete(0, this.failureMsg.length());
// Запускаем таймер обновления
this.startTimer();
} else {
this.failureMsg.append("3. Не удалось получить удалённое расположение, предварительное создание пула соединений не завершено...");
}
}
/**
* Создание пула соединений
*
* @param addrList Список служб
*/
private void initPool(List<TRpcRemoteAddress> addrList) {
this.rpcMaps = (this.rpcMaps == null || this.rpcMaps.size() == 0) ? new HashMap<>(32) : this.rpcMaps;
addrList.forEach(addr -> {
// poolMap вызывает get для получения пула соединений по умолчанию, если он уже создан, то сразу возвращает
String id = addr.getServerId();
if (this.rpcMaps.containsKey(id)) {
// одинаковые соединения не будут добавляться повторно
if (!this.poolMap.contains(addr)) {
this.rpcMaps.get(id).add(new TClientPoolWrapper(addr, this.poolMap.get(addr)));
}
} else {
List<TClientPoolWrapper> poolList = new ArrayList<>();
poolList.add(new TClientPoolWrapper(addr, this.poolMap.get(addr)));
this.rpcMaps.put(id, poolList);
}
});
// Успешная инициализация
this.status = true;
}
Вышеупомянутый this.rpcMaps — это пул сопоставления клиентских служб, и один и тот же удалённый сервис будет отображаться в одном списке. Атрибуты следующие:
private Map<String, List<TClientPoolWrapper>> rpcMaps;
При удалённом вызове канал будет получен из следующего метода, и после получения канала будет выполнен простой случайный опрос, чтобы получить подчинённого текущего канала для последующего освобождения.
/**
* Получение канала указанного сервиса
*
* @param key идентификатор службы
* @return канал
*/
public Channel getPooledChannel(String key) {
if (this.rpcMaps == null || this.rpcMaps.isEmpty()) {
throw new TRpcOperationException("Соединение пула не инициализировано... Необходимо вызвать метод init() для инициализации...");
}
List<TClientPoolWrapper> poolList = rpcMaps.get(key);
if (poolList == null || poolList.isEmpty()) {
throw new TRpcConnectionException("Удалённое соединение потеряно... Нет доступного пула соединений...");
}
// Случайный выбор по модулю массива
int random = this.rand.nextInt(1024);
int retryCnt = 0;
// Гарантируем, что каждый пул может попробовать
int limit =
``` **poolList.size();**
int index = 0;
Channel channel = null;
do {
index = random % limit;
// Использование случайного индекса для получения соединения из пула
channel = poolList.get(index).acquireChannel(this.config.getRetryCnt(), this.config.getReconnectionTime());
// Если получение не удалось, то увеличить случайное число и повторить попытку с другим пулом
random++;
retryCnt++;
} while (channel == null && retryCnt < limit);
if (channel == null) {
throw new TRpcCoreException("Невозможно получить канал соединения...");
}
// Привязать атрибут к текущему каналу, указав его принадлежность к определённому пулу
AttributeKey<Map<String, Integer>> attrKey = AttributeKey.valueOf(POOL_KEY);
Map<String, Integer> metaMap = new HashMap<>();
metaMap.put(key, index);
channel.attr(attrKey).set(metaMap);
return channel;
}
В классе TClientPoolWrapper
выполняется проверка работоспособности и повторное подключение к пулам соединений. Код для получения канала:
/**
* Получение канала
*
* @param retryCnt максимальное количество попыток получения канала
* @param reconnectionTime максимальное время для повторного подключения пула
* @return канал
*/
public Channel acquireChannel(int retryCnt, int reconnectionTime) {
// Пул недоступен, сразу генерируем исключение
if (!this.isHealthy()) {
logger.warn("Соединение с пулом соединений {} недоступно... Проверьте состояние сети...", this.key.getAddressString());
// Переходим к следующему пулу
return null;
}
Future<Channel> channeFuture = this.pool.acquire();
Channel channel = null;
try {
channel = channeFuture.get();
} catch (InterruptedException | ExecutionException e) {
// Проверка работоспособности, повторная попытка
this.increaseFailureCnt(retryCnt, reconnectionTime);
}
return channel;
}
Прокси-класс наследуется от класса, реализующего интерфейс InvocationHandler
, который вызывает метод action
для выполнения основной бизнес-логики.
@Override
protected Object action(Method method, Object[] args) throws TRpcTimeoutException, TRpcConnectionException {
if (!CHANNEL_POOL.isReady()) {
throw new TRpcConnectionException("Пул соединений не инициализирован успешно... Информация об операции: " + CHANNEL_POOL.getFailureMsg());
}
// Получаем указанный идентификатор сервера
TRpcRemoteService a = method.getDeclaringClass().getAnnotation(TRpcRemoteService.class);
String serverId = a.serverId();
// Получаем коммуникационный канал
Channel ch = CHANNEL_POOL.getPooledChannel(serverId);
TRequest req = new TRequest();
// Метод должен совпадать, чтобы можно было вызвать
req.setMethod(method.getName());
req.setParams(args);
// Класс службы, к которой осуществляется вызов
String className = a.className();
if (className.isBlank()) {
className = method.getDeclaringClass().getSimpleName();
}
req.setClsNm(className);
// Предварительно создаём ответ
CompletableFuture<TrafficData> trafficFuture = TRpcTrafficDataContextHolder.newFuture(req.getId());
ch.writeAndFlush(req).addListener((f) -> {
if (f.isSuccess()) {
logger.debug("Отправка прошла успешно...");
} else {
logger.warn("Удаленный вызов {} не удался...", method.getName());
// В случае неудачи очищаем кэш
TRpcTrafficDataContextHolder.removeCache(req.getId());
}
// Освобождаем канал после использования
CHANNEL_POOL.release(ch);
}).syncUninterruptibly();
try {
TResponse res = trafficFuture.get(TRpcTrafficDataContextHolder.getTimeout() + 50, TimeUnit.MILLISECONDS).convert();
if (res.getStatus() != 200) {
throw new TRpcResponseException(res.getMsg());
} else {
return res.getBody();
}
} catch (InterruptedException | ExecutionException | TimeoutException e) {
logger.error("Исключение RPC-вызова... Сообщение об ошибке: {}", e.getMessage());
// Удаляем кэш для текущего запроса в случае исключения
TRpcTrafficDataContextHolder.removeCache(req.getId());
throw new TRpcOperationException(e.getMessage(), e);
}
}
Для многопоточной синхронизации данных и привязки прокси-класс использует TRpcTrafficDataContextHolder
. Перед отправкой запроса создаётся будущее значение ответа и сохраняется в кэше TRpcTrafficDataContextHolder
. В случае возникновения исключения этот ответ удаляется.
Обратите внимание: при вызове метода
get
помимо указанного времени ожидания добавляется ещё 50 миллисекунд, что является дополнительным временем для учёта передачи данных по сети.
Полный код TRpcTrafficDataContextHolder
:
public final class TRpcTrafficDataContextHolder {
private static final Logger logger = LoggerFactory.getLogger(TRpcTrafficDataContextHolder.class);
/**
* Кэш результатов ответов, разделенный на разные ответы по ключу<br>
* Инициализация хэш-карты с емкостью для уменьшения расширения
*/
private static final ConcurrentHashMap<String, CompletableFuture<TrafficData>> resultMap = new ConcurrentHashMap<>(
128);
/**
*
``` **Отсроченная неограниченная очередь для своевременного удаления недействительных ответов в экстремальных ситуациях**
*/
private static final DelayQueue<InvalidKey> INVALIDKEY_QUEUE = new DelayQueue<>();
/**
* Поток очистки кэша
*/
private static final Thread sweeper = new Thread(() -> {
while (true) {
try {
logger.info("продолжайте очищать кэш...");
resultMap.remove(INVALIDKEY_QUEUE.take().getKey());
} catch (InterruptedException e) {
logger.warn("поток очистки был прерван...");
}
}
}, "the sweeper");
static {
sweeper.start();
}
private TRpcTrafficDataContextHolder() throws IllegalAccessException {
throw new IllegalAccessException("Незаконная операция...");
}
/**
* Время ожидания
*/
private static int TIMEOUT = 2000;
public static void setTimeout(int timeout) {
TIMEOUT = timeout;
}
public static int getTimeout() {
return TIMEOUT;
}
/**
* Создание нового объекта CompletableFuture результата
*
* @param trafficId уникальный идентификатор
* @return пустой объект CompletableFuture
*/
public static CompletableFuture<TrafficData> newFuture(String trafficId) {
CompletableFuture<TrafficData> future = new CompletableFuture<>();
var old = resultMap.putIfAbsent(trafficId, future);
if (old != null) {
throw new TRpcDuplicateKeyException();
}
return future;
}
/**
* Завершение указанного future, вызывается после получения ответа клиентом
*
* @param data данные ответа
*/
public static void complete(TrafficData data) {
CompletableFuture<TrafficData> future = resultMap.remove(data.getId());
if (future != null) {
future.complete(data);
} else {
throw new TRpcCoreException("future not found...");
}
}
/**
* Удаление ответа из кэша
*
* @param trafficId уникальный идентификатор
* @return ответ из кэша
*/
public static CompletableFuture<TrafficData> removeCache(String trafficId) {
CompletableFuture<TrafficData> invalidData = resultMap.remove(trafficId);
// Отсрочка перед удалением (гарантирует, что в экстремальных условиях, когда поток ожидает максимального значения и условия ответа клиента выполняются, кэш не может быть нормально использован)
if (invalidData == null) {
INVALIDKEY_QUEUE.add(new InvalidKey(trafficId));
}
return invalidData;
}
}
В случае нормального ответа, после завершения ответа на основе запроса, созданного во время запроса, будет найден соответствующий предварительно созданный Future для завершения обработки.
### Событие ответа клиента
После получения ответа в цепочке обработчиков клиентского кода, он сравнивает разницу между временем начала ответа сервера и текущим временем с указанным временем ожидания запроса в файле конфигурации, чтобы контролировать обработку тайм-аута. Здесь используется разница между текущим временем и временем генерации ответа сервером, без тщательного учёта времени передачи данных (в прокси-классе добавлено увеличение на 50 мс), поэтому могут возникнуть ситуации, когда Future запроса истекает раньше времени. В описанном выше TRpcTrafficDataContextHolder поддерживается отложенная очередь, и если ключ не найден в кэше при удалении, этот ключ будет добавлен в очередь. Через 5 секунд он будет удалён снова.
```java
@Override
protected void channelRead0(ChannelHandlerContext ctx, TrafficData msg) throws Exception {
logger.debug("{}客户端{}读取响应信息...", TRpcConstants.LOG_CLIENT_MSG_PREFIX, ctx.channel().remoteAddress());
try {
// Время начала ответа
long ts = msg.getTs();
// Текущее время
long curTs = System.currentTimeMillis();
// Если время превышает время ожидания, то оно отбрасывается и не помещается в кэш (в экстремальных случаях - как раз достигает порога, который будет помещён в кэш, а затем удалён в потоке очистки)
if (curTs - ts > this.requestTimeout) {
logger.warn("{}Ответ слишком длинный... Данные были отброшены...", TRpcConstants.LOG_CLIENT_MSG_PREFIX);
} else {
// Ответное значение сохраняется в общей карте
TRpcTrafficDataContextHolder.complete(msg);
}
} catch(Exception e) {
// Удаление из кэша
TRpcTrafficDataContextHolder.removeCache(msg.getId());
logger.error("{}Обработка ответа завершилась неудачно... Информация об ошибке:{}", TRpcConstants.LOG_CLIENT_MSG_PREFIX, e.getMessage());
}
}
Когда rpc.service.serviceType установлен на BOTH или SERVER, проект запускается, и другой поток запускает RPC-сервер. Все классы, отмеченные аннотацией TRpcLocalService, сканируются и сохраняются в кэше с использованием имени класса в качестве ключа и экземпляра класса в качестве значения. Основная операция выполняется в классе помощника TLocalServiceHolderContext. При вызове метода используется оптимизация производительности через прокси cglib. После получения запроса сервер вызывает службу.
Код выглядит следующим образом:
public class TLocalServiceHolderContext {
/**
* Полное имя класса - информация о классе
*/
private static final Map<String, ServiceMetaData> SERVICE_MAP = new HashMap<>(32);
/**
* Добавить сервис
* @param name имя класса
* @param metaData метаданные класса
*/
public static void addService(String name, ServiceMetaData metaData) {
SERVICE_MAP.put(name, metaData);
} ```
/**
* Количество локальных сервисов
* @return Количество локальных сервисов
*/
public static int size() {
return SERVICE_MAP.size();
}
/**
* Вызов метода сервиса
* @param serviceName Имя сервиса
* @param methodName Имя метода
* @param params Параметры
* @return Возвращаемое значение метода сервиса
*/
public static Object invoke(String serviceName, String methodName, Object[] params) {
ServiceMetaData metaData = SERVICE_MAP.get(serviceName);
if (metaData == null) {
throw new TRpcCoreException("Не найден соответствующий сервис...");
}
// Получение типа, создание cglib-прокси и выполнение (быстрее, чем прямое использование рефлексии)
FastClass fc = FastClass.create(metaData.getBean().getClass());
FastMethod m = fc.getMethod(metaData.getMethod(methodName));
try {
return m.invoke(metaData.getBean(), params);
} catch (InvocationTargetException e) {
throw new TRpcCoreException("Ошибка вызова сервиса... Сообщение об ошибке:" + e.getMessage());
}
}
/**
* Метаданные локального сервиса
* @author TMQ
*/
public static class ServiceMetaData {
public ServiceMetaData(Object bean) {
this.bean = bean;
}
/**
* Фактический объект сервиса
*/
private Object bean;
/**
* Отображение имени метода на метод объекта
*/
private Map<String, Method> serviceMethods = new HashMap<>();
public Object getBean() {
return bean;
}
/**
* Добавление метода сервиса
* @param name Имя метода
* @param method Метод объекта
*/
public void addServiceMethod(String name, Method method) {
this.serviceMethods.put(name, method);
}
/**
* Получение метода сервиса
* @param name Имя метода
* @return Метод объекта
*/
public Method getMethod(String name) {
return this.serviceMethods.get(name);
}
}
}
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )