Фреймворк RPC, основанный на Netty
Поддерживает пользовательские протоколы, поддерживает прямое запуск клиента или интеграцию с Spring, позволяет использовать пользовательские обработчики, прост в использовании
Компонент распределенного скачивания уже использует x-netty-rpc
адрес на Gitee: https://gitee.com/lingfengx/x-downloader
адрес на GitHub: https://github.com/lingfengcoder/x-downloader
// Пример: сервис A вызывает метод register сервиса B
// Сервис A
// Интерфейс удалённого сервиса
@RpcClient("ServiceB")
public interface ServiceB {
void register(BasicFrame<Object> frame);
}
class Test {
// Или через @Autowired
@Autowired
private ServiceB serviceb;
// Использование удалённого сервиса
public void test() {
serviceb.register(new BasicFrame());
}
}
// Сервис B
// Целевой метод
@RpcComponent("ServiceB")
public class ServiceB {
@RpcHandler("register")
public void register(BasicFrame<Object> frame) {
// Вывод данных BasicFrame
log.info(frame);
}
}
```## Пользовательский безопасный протокол SafeFrame
Поддерживает гибкую замену протоколов сериализации (JSON, JAVA, string, protobuf)
Поддерживает автоматическое шифрование и расшифровку (AES, RSA)
Поддерживает автоматическую проверку подписи сообщений для предотвращения изменения (MD5)
Встроенная метка времени для предотвращения повторных атак (timestamp)
Планируется добавить протокол Protobuf и алгоритм сжатия Snappy
// Запрос REQUEST((byte) 1), // Ответ RESPONSE((byte) 2), // Пинг HEARTBEAT((byte) 3);
private byte cmd;
// Тип сериализации данных (content) JSON_SERIAL JAVA_SERIAL
private byte serial;
// Тип шифрования // Открытый текст NONE((byte) 0),// AES AES((byte) 2), // RSA RSA((byte) 3);
private byte encrypt;
// Временная метка, аналогичная соли
private long timestamp;
// Идентификатор клиента -1 означает сервер
private long client;
// Подпись сообщения MD5 фиксированной длины 32 байта
private String sign;
// Длина content
private int length;
// Содержимое
private T content;
```## Клиент
Поддерживает автоматический перезапуск и подключение
Поддерживает ручное закрытие соединения
Поддерживает многопоточное отправление сообщений
### Интерфейс клиента
NettyClient {
int state(); // Состояние клиента
void start(); // Запуск
void restart(); // Перезапуск
void close(); // Закрытие
long getClientId(); // Получение идентификатора клиента
void defaultChannel(Channel channel); // Установка по умолчанию канала
<M extends Serializable> void writeAndFlush(Channel channel, M msg, Cmd type); // Отправка сообщения
<M extends Serializable> void writeAndFlush(ChannelHandlerContext channel, M msg, Cmd type); // Отправка сообщения
// Сохраненный интерфейс void writeAndFlush(M msg, Cmd type); }
### Пример клиента
BizNettyClient client = NettyClientFactory.buildBizNettyClient(new Address("127.0.0.1", 9999),
() -> Arrays.asList(new NettyReqHandler())); // Собственный NettyReqHandler обработчик сообщений
client.start(); // Запуск
### Собственная аннотация @RpcHandler для обработки сообщений
Приходящие сообщения от сервера обрабатываются с помощью прокси и рефлексии для выполнения удаленных методов RPC
@RpcHandler("complexParam")
public Object complexParam(Map<String, Long> param) {
Thread thread = Thread.currentThread();
log.info(" client received a map data = {} ,thread={}", param, thread);
return "map is OK --bbq";
}
### Собственный NettyReqHandler обработчик сообщений
protected void channelRead0(ChannelHandlerContext ctx, SafeFrame<Frame>> data) throws Exception {
byte cmd = data.getCmd();
if (cmd == Cmd.REQUEST.code()) {
Frame> frame = data.getContent();
String name = frame.getTarget();
// Использование пула потоков для обработки задач
getExecutor().execute(() -> {
// Прокси-вызов метода
RpcInvokeProxy.invoke(ret -> {
// Возврат данных
Frame resp = new Frame<>();
resp.setData(ret);
writeAndFlush(ctx.channel(), resp, Cmd.RESPONSE);
Поддерживает управление клиентами
Поддерживает отправку сообщений, закрытие клиентов и другие операции через API
### Интерфейсы сервера
// Состояние сервера int state(); // Запуск сервера void start(); // Перезапуск сервера void restart(); // Остановка сервера void stop(); // Получение идентификатора сервера long getServerId();
// Отправка сообщений void writeAndFlush(Channel channel, M msg, Cmd type);
void writeAndFlush(ChannelHandlerContext channel, M msg, Cmd type);
// Добавление канала клиента void addChannel(String clientId, Channel channel); // Закрытие канала по идентификатору клиента void closeChannel(String clientId); // Получение всех каналов клиентов Collection allChannels(); // Получение канала клиента по идентификатору Channel findChannel(String clientId); // Вывод информации о всех каналах void showChannels();
### Пример использования сервера
BizNettyServer server = NettyServerFactory.buildBizNettyServer( new Address("127.0.0.1", 9999), () -> Arrays.asList(new NettyServerHandler())); server.start();
### Обработчики сервера
@Override protected void channelRead0(ChannelHandlerContext ctx, SafeFrame<Frame>> data) throws Exception { byte cmd = data.getCmd(); // Запрос if (cmd == Cmd.REQUEST.code()) { Frame> frame = data.getContent(); log.info(" server get REQUEST data = {}", frame); // Отправка ответа // writeAndFlush(ctx.channel(), resp, Cmd.REQUEST); } // Ответ if (cmd == Cmd.RESPONSE.code()) { Frame<?> frame = data.getContent(); log.info("server get RESPONSE data = {}", frame); } else { //ctx.fireChannelRead(data); } }
### Сериализация Java
```java
public <T> byte[] serialize(T obj) {
ByteArrayOutputStream byteArrayOutputStream =
new ByteArrayOutputStream();
try {
ObjectOutputStream outputStream =
new ObjectOutputStream(byteArrayOutputStream);
@Override
public <T> byte[] serialize(T obj) {
return GsonTool.toJson(obj).getBytes(StandardCharsets.UTF_8);
}
@Override
public <T> T deserialize(byte[] data, Class<T> clazz) {
return GsonTool.fromJson(new String(data), clazz);
}
LengthFieldBasedFrameDecoder — декодер с пользовательским определением длины, который решает проблему TCP-склеивания. Поэтому LengthFieldBasedFrameDecoder также известен как декодер с пользовательским определением длины.
1. Причина отправителя: TCP по умолчанию использует алгоритм Nagle. Алгоритм Nagle выполняет две основные задачи: 1) отправка следующего пакета только после подтверждения предыдущего пакета; 2) сбор нескольких маленьких пакетов и отправка их вместе при получении подтверждения. Поэтому алгоритм Nagle может вызвать склеивание данных у отправителя.
### 1.3 Как решить проблему склеивания пакетов
-2. lengthFieldOffset - определение смещения поля длины в отправляемом массиве байтов. Другими словами: в отправляемом массиве байтов смещение поля длины начинается с индекса ${lengthFieldOffset}
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )