http://159.75.121.163/ admin admin
Архитектура Rpush обеспечивает простоту добавления поддержки новых типов сообщений для платформ, поэтому большая часть времени при расширении системы уходит на поиск документации по интеграции с новой платформой. В будущем планируется добавить поддержку других платформ и типов сообщений. Также приветствуется участие в расширении системы (добавление поддержки нового типа сообщений для платформы требует всего нескольких классов Java и не требует написания кода для интерфейса, обеспечивая все функции).
Для локальной установки необходимо выполнить следующие шаги:
sql/表结构.sql
для инициализации базы данных.rpush-route/src/main/resources/application.yml
на свои данные.com.regent.rpush.eureka.EurekaServerApplication
.com.regent.rpush.zuul.App
.com.regent.rpush.route.RouteApplication
.После запуска можно открыть браузер и перейти по адресу http://localhost:8124
, где будет доступна полная функциональность рассылки сообщений.
Следуя принципу «бизнес-сервисы отвечают только за отправку сообщений», код для отправки должен быть простым. Поэтому в Rpush отправка сообщения для одной платформы занимает всего одну строку кода, а для нескольких платформ — несколько строк. Например:
/**
* @author shuangmulin
* @since 2021/6/8/008 11:37
**/
public class RpushSenderTest {
/**
* Содержание сообщения.
*/
public static final String content = "Ваша встреча уже запланирована \n" +
">**Детали**: встреча \n" +
">Организатор: @miglioguan \n" +
">Участники: @miglioguan, @kunliu, @jamdeezhou, @kanexiong, @kisonwang \n" +
"> \n" +
">Конференц-зал: TIT 1 этаж, комната 301, Гуанчжоу \n" +
">Дата: 2021 год, 18 мая \n" +
">Время: с 9:00 до 11:00 \n" +
"> \n" +
">Пожалуйста, приходите вовремя. \n" +
"> \n" +
">Если вам нужно изменить информацию о встрече, нажмите: [Изменить информацию о встрече](https://work.weixin.qq.com)";
public static void main(String[] args) {
// Сообщение в формате Markdown для WeChat Work
MarkdownMessageDTO markdown = RpushMessage.WECHAT_WORK_AGENT_MARKDOWN().content(content).receiverIds(Collections.singletonList("ZhongBaoLin")).build();
// Сообщение для группового робота WeChat Work
TextMessageDTO text = RpushMessage.WECHAT_WORK_ROBOT_TEXT().content(content).receiverIds(Collections.singletonList("ZhongBaoLin")).build();
// Электронное письмо
EmailMessageDTO email = RpushMessage.EMAIL().title("Уведомление о встрече").content(content).build();
RpushService.instance("baolin", "666666").sendMessage(markdown, text, email); // Введите имя пользователя и пароль, чтобы запустить
}
}
Этот код отправляет три разных типа сообщений на разные платформы, и весь код занимает всего четыре или пять строк. Чтобы получить этот результат, достаточно добавить зависимость от rpush-sdk в Maven:
<project>
<!-- Настройка репозитория jitpack.io -->
<repositories>
<repository>
<id>jitpack.io</id>
<url>https://jitpack.io</url>
</repository>
</repositories>
<dependencies>
<!-- Добавить зависимость rpush-sdk -->
<dependency>
<groupId>com.github.shuangmulin.rpush</groupId>
<artifactId>rpush-sdk</artifactId>
<version>v1.0.2</version>
</dependency>
</dependencies>
</project>
Rpush предлагает гибкий подход к реализации мгновенной связи, позволяя выбирать между различными технологиями, такими как Netty, WebSocket, Comet и даже Bio. Здесь показаны примеры однопользовательского и группового обмена сообщениями между веб-страницей и клиентом командной строки, использующими WebSocket и Netty соответственно (код примера клиента доступен по ссылке: https://github.com/shuangmulin/rpush-client-sample). ### Некоторые ключевые точки расширения
В дизайне Rpush сообщения классифицируются как «платформы сообщений» и «типы сообщений», которые соответствуют следующим двум перечислениям:
/**
* Перечисление платформ сообщений
**/
public enum MessagePlatformEnum {
EMAIL(EmailConfig.class, "электронная почта", "", "^[_a-z0-9-]+(\\.[_a-z0-9-]+)*@[a-z0-9-]+(\\.[a-z0-9-]+)*(\\.[a-z]{2,})$", true),
WECHAT_WORK_AGENT(WechatWorkAgentConfig.class, "корпоративный WeChat - приложение для сообщений", "", "", true),
WECHAT_WORK_ROBOT(WechatWorkRobotConfig.class, "корпоративный WeChat - групповой бот", "", "", true),
WECHAT_OFFICIAL_ACCOUNT(WechatOfficialAccountConfig.class, "WeChat Public Account", "", "", true),
DING_TALK_CORP(DingTalkCorpConfig.class, "Ding Talk - рабочее уведомление", "", "", true),
RPUSH_SERVER(EmptyConfig.class, "rpush сервис", "", "", true);
}
/**
* Перечисление типов сообщений
**/
public enum MessageType {
EMAIL("обычное письмо ", MessagePlatformEnum.EMAIL),
RPUSH_SERVER("текст", MessagePlatformEnum.RPUSH_SERVER),
// ================================корпоративный WeChat-приложение====================================
WECHAT_WORK_AGENT_TEXT("текст", MessagePlatformEnum.WECHAT_WORK_AGENT),
WECHAT_WORK_AGENT_IMAGE("изображение", MessagePlatformEnum.WECHAT_WORK_AGENT),
WECHAT_WORK_AGENT_VIDEO("видео", MessagePlatformEnum.WECHAT_WORK_AGENT),
WECHAT_WORK_AGENT_FILE("файл", MessagePlatformEnum.WECHAT_WORK_AGENT),
WECHAT_WORK_AGENT_TEXTCARD("текстовая карточка", MessagePlatformEnum.WECHAT_WORK_AGENT),
WECHAT_WORK_AGENT_NEWS("текст и изображение", MessagePlatformEnum.WECHAT_WORK_AGENT),
WECHAT_WORK_AGENT_MARKDOWN("Markdown", MessagePlatformEnum.WECHAT_WORK_AGENT),
// ================================корпоративный WeChat-групповой бот====================================
WECHAT_WORK_ROBOT_TEXT("текст", MessagePlatformEnum.WECHAT_WORK_ROBOT),
WECHAT_WORK_ROBOT_IMAGE("изображение", MessagePlatformEnum.WECHAT_WORK_ROBOT),
WECHAT_WORK_ROBOT_NEWS("текст и изображение", MessagePlatformEnum.WECHAT_WORK_ROBOT),
WECHAT_WORK_ROBOT_MARKDOWN("Markdown", MessagePlatformEnum.WECHAT_WORK_ROBOT),
// ================================WeChat Public Account====================================
WECHAT_OFFICIAL_ACCOUNT_TEXT("текст", MessagePlatformEnum.WECHAT_OFFICIAL_ACCOUNT),
WECHAT_OFFICIAL_ACCOUNT_NEWS("текст и изображение", MessagePlatformEnum.WECHAT_OFFICIAL_ACCOUNT),
WECHAT_OFFICIAL_ACCOUNT_TEMPLATE("шаблонное сообщение", MessagePlatformEnum.WECHAT_OFFICIAL_ACCOUNT),
// ================================Ding Talk-рабочее уведомление====================================
DING_TALK_COPR_TEXT("текст", MessagePlatformEnum.DING_TALK_CORP),
DING_TALK_COPR_MARKDOWN("Markdown", MessagePlatformEnum.DING_TALK_CORP),
DING_TALK_COPR_LINK("сообщение со ссылкой", MessagePlatformEnum.DING_TALK_CORP),
DING_TALK_COPR_ACTION_CARD_SINGLE("карточка с одной кнопкой", MessagePlatformEnum.DING_TALK_CORP),
DING_TALK_COPR_ACTION_CARD_MULTI("карточка с несколькими кнопками", MessagePlatformEnum.DING_TALK_CORP),
DING_TALK_COPR_OA("OA сообщение", MessagePlatformEnum.DING_TALK_CORP)
;
}
Здесь в качестве примера рассматривается тип сообщения «Текст корпоративного WeChat — приложения». Предположим, что теперь необходимо реализовать этот тип сообщения в Rpush, шаги следующие:
/**
* Конфигурация корпоративного WeChat
**/
@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class WechatWorkAgentConfig extends Config {
private static final long serialVersionUID = -9206902816158196669L;
@ConfigValue(value = "корпоративный ID", description = "см. эту страницу: https://work.weixin.qq.com/wework_admin/frame#profile")
private String corpId;
@ConfigValue(value = "секрет приложения")
private String secret;
@ConfigValue(value = "идентификатор агента приложения")
private Integer agentId;
}
Поля внутри должны быть настроены в соответствии с требованиями соответствующей платформы. Например, здесь корпоративный WeChat требует только три поля для настройки. А каждое поле с аннотацией @ConfigValue
используется для автоматической генерации страниц, то есть достаточно добавить эту аннотацию, чтобы автоматически создать соответствующие интерфейсы добавления, удаления, изменения и проверки (без написания кода интерфейса).
В MessagePlatformEnum
и MessageType
добавить соответствующее перечисление, а именно WECHAT_WORK_AGENT(WechatWorkAgentConfig.class, «корпоративный WeChat — сообщение приложения», "", "", true)
и WECHAT_WORK_AGENT_TEXT («текст», MessagePlatformEnum.WECHAT_WORK_AGENT)
. Здесь следует обратить внимание на то, что первый параметр перечисления платформы является классом конфигурации, определённым на первом шаге.
Определить параметры сообщения корпоративного WeChat — текстового сообщения, например:
/**
* DTO для отправки сообщения корпоративного WeChat
**/
@EqualsAndHashCode(callSuper = true)
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
public class TextMessageDTO extends BaseMessage {
private static final long serialVersionUID =
``` **2. Масштабируемый сокет-сервис rpush-server**
rpush-сервер как сокет-служба в основном поддерживает постоянное соединение с клиентами. Способность этой службы выдерживать нагрузку напрямую определяет, сколько пользователей может одновременно работать в системе обмена мгновенными сообщениями. Поэтому очевидно, что эта служба должна быть масштабируемой.
Предположим, развёрнуто 5 экземпляров сокет-службы, все они корректно настроены на использование Eureka в качестве центра регистрации. Чтобы реализовать кластеризацию сокет-служб, процесс подключения клиента к rpush-службе выглядит следующим образом:
1. Клиент запрашивает у маршрутизатора доступный IP-адрес и порт сервера сокетов.
2. Маршрутизатор выбирает доступный сервер сокетов на основе подходящего алгоритма балансировки нагрузки и возвращает клиенту его IP-адрес и номер порта.
3. Клиент устанавливает длительное соединение с выбранным сервером сокетов.
4. После успешного установления соединения соответствующий сервер сокетов сохраняет информацию о сеансе на уровне службы, а затем сообщает маршрутизатору об этом клиенте. Маршрутизатор сохраняет соответствие между клиентом и сервером сокетов.
5. Клиент и соответствующий сервер сокетов поддерживают определённую частоту пульсаций. Если пульс не проходит после разрыва соединения, клиент повторяет описанный выше процесс до тех пор, пока соединение снова не будет установлено.
После того как клиент подключён, другой клиент может отправить ему сообщение следующим образом:
1. Клиент запрашивает интерфейс отправки сообщений, предоставляемый маршрутизатором (это тот же интерфейс, который используется для отправки сообщений другим третьим сторонам, поэтому способ отправки сообщений одинаков).
2. Обработчик сообщений маршрутизатора (com.regent.rpush.route.handler.RpushMessageHandler) находит соответствующий сервер сокетов по идентификатору целевого клиента и отправляет ему сообщение.
3. Сервер сокетов находит целевого клиента из своего сохранённого сеанса и завершает отправку сообщения.
Реализовав этот процесс, можно добиться свободной кластеризации сокет-сервиса.
Описанный выше процесс несколько теоретизирован. Вот некоторые технические аспекты реализации:
**1. Как маршрутизатор использует подходящий алгоритм балансировки нагрузки для выбора доступного сервера сокетов?**
На самом деле это довольно просто. Сначала сервер сокетов предоставляет интерфейс для запроса своего собственного IP-адреса и номера порта. Затем маршрутизатор напрямую запрашивает этот интерфейс через Ribbon и реализует собственный класс правил балансировки нагрузки, чтобы выбрать сервер сокетов:
```java
/**
* Маршрутизатор -> Запрос экземпляра сервера сокетов
*/
public class ServerBalancer extends ZoneAvoidanceRule {
@Override
public Server choose(Object o) {
// ...
// Используйте алгоритм балансировки нагрузки по умолчанию для выбора доступного сервера сокетов (алгоритм можно изменить в соответствии с фактическими бизнес-потребностями)
return super.choose(o);
//...
}
}
В файле конфигурации этот «класс правил» настраивается следующим образом:
rpush-server:
ribbon:
NFLoadBalancerRuleClassName: com.regent.rpush.route.loadbalancer.ServerBalancer
Когда маршрутизатор запрашивает сервер сокетов, он проходит через этот «класс правил», который затем выбирает доступный сервер сокетов. В конечном итоге информация о номере порта и IP-адресе сервера сокетов также возвращается маршрутизатору через этот запрос. Конечно, этот класс правил выполняет не только эту задачу. Есть ещё одна проблема, которую должен решить этот класс.
2. Как маршрутизатор находит соответствующий сервер сокетов при отправке сообщений на основе идентификатора целевого клиента?
Во-первых, после того как клиент устанавливает соединение с определённым сервером сокетов, необходимо сохранить отношения между клиентом и этим сервером (например, в MySQL или Redis). Затем добавляется перехватчик запросов Feign («com.regent.rpush.route.loadbalancer.MessageRequestInterceptor»):
@Component
public class MessageRequestInterceptor implements RequestInterceptor {
/**
* Сохраняет идентификатор целевого сервера сокетов для текущей передачи сообщений
*/
static final ThreadLocal<String> SERVER_ID = new ThreadLocal<>();
@Autowired
private IRpushServerOnlineService rpushServerOnlineService;
@SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
@Override
public void apply(RequestTemplate requestTemplate) {
String url = requestTemplate.url();
String method = requestTemplate.method();
if (!"/push".equals(url) || !"POST".equals(method)) {
// Обрабатывать только интерфейс отправки сообщений
return;
}
// Если это отправка сообщения, необходимо отправить сообщение на принимающий конец подключенного сервера. В случае кластера серверов сокетов необходимо найти соответствующий сервер.
Строка body = новая строка (requestTemplate.body());
JSONObject jsonObject = новый JSONObject (body);
Строковый sendTo = jsonObject.getStr ("sendTo"); // Получить идентификатор целевого клиента
Строковая serverId = ""; // Найти соответствующий идентификатор сервера сокетов из redis или mysql
SERVER_ID.set (serverId); // Добавить в текущий поток
}
}
Этот «класс перехватчика» вместе с «классом правил» позволяет точно определить соответствующий сервер сокетов во время передачи сообщения от маршрутизатора к серверу сокетов. Полный «класс правил»:
/**
* Маршрутизатор->Запрос экземпляра сервера сокетов
*/
public class ServerBalancer extends ZoneAvoidanceRule {
@Override
public Server choose(Object o) {
try {
// Из класса перехватчика посмотреть, указан ли экземпляр сервера
Строковая serverId = MessageRequestInterceptor.SERVER_ID.get();
if (StringUtils.isEmpty(serverId)) {
// Если экземпляр сервера не указан, используйте алгоритм балансировки нагрузки по умолчанию
return super.choose(o);
}
// Если указан экземпляр сервера, это означает, что это передача сообщения, и использовать указанный экземпляр для отправки запроса на сервер сокетов
List<Server> servers = getLoadBalancer().getAllServers();
for (Сервер-сервер: серверы) {
if (StringUtils.equals (server.getId (), serverId)) {
вернуть сервер;
}
}
выбросить новое IllegalArgumentException («Нет доступных экземпляров RPUSH_SERVER»);
} наконец {
MessageRequestInterceptor.SERVER_ID.remove();
}
}
}
Имея эти два класса, код маршрутизатора для передачи сообщений на сервер сокетов также будет очень «чистым»:
@Компонент
public class RpushMessageHandler расширяет MessageHandler<RpushMessageDTO> {
// ...
@Override
public void handle(RpushMessageDTO param) {
Список <Строка> sendTos = param.getReceiverIds();
для (Строка sendTo: sendTos) {
// ...
messagePushService.push (build); // Маршрутизатор напрямую вызывает интерфейс запроса, «класс правил» и «класс перехватчиков» скрывают другую логику, поэтому здесь не нужно беспокоиться о том, будет ли он отправлен на неправильный сервер сокетов
}
}
} **Очередь.** В качестве асинхронной обработки в кольцевом буфере внутри службы маршрутизации используется Disruptor. Это позволяет максимально ускорить возврат интерфейса отправки сообщений. При высоком уровне параллелизма можно добавить Kafka, чтобы служба маршрутизации напрямую отслеживала сообщения Kafka и таким образом повышала общую производительность сервиса.
**Кэш.** Информация о состоянии подключения клиента может быть многоуровнево кэширована. То есть, внутренний кэш службы маршрутизации + кэш Redis. Конечно, чем больше добавляется уровней кэша, тем сложнее становится проблема согласованности кэша и тем больше ситуаций необходимо учитывать. Также необходимо принимать решение о развёртывании кластера для Redis в зависимости от конкретной ситуации.
**Мониторинг.** Для мониторинга состояния служб можно использовать Spring Boot Admin.
## Быстрое развёртывание сервиса Rpush с помощью docker-compose
```yml
version: '2'
services:
nginx:
image: nginx
container_name: nginx
ports:
- 80:80
volumes:
- /data/nginx/conf/nginx.conf:/etc/nginx/nginx.conf
- /data/nginx/log:/var/log/nginx
- /data/nginx/html:/usr/share/nginx/html
rpush-eureka:
image: shuangmulin/rpush-eureka
container_name: rpush-eureka
ports:
- 8761:8761
rpush-zuul:
image: shuangmulin/rpush-zuul
environment:
- eureka-service-ip=172.16.0.11
- eureka-service-port=8761
container_name: rpush-zuul
ports:
- 8124:8124
rpush-route:
image: shuangmulin/rpush-route
environment:
- eureka-service-ip=localhost
- eureka-service-port=8761
- jdbc.url=jdbc:mysql://localhost:3306/rpush?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8
- jdbc.username=root
- jdbc.password=123456
- super-admin.username=superadmin
- super-admin.password=superadmin
- jwtSigningKey=fjksadjfklds
container_name: rpush-route
ports:
- 8121:8121
rpush-server:
image: shuangmulin/rpush-server
environment:
- eureka-service-ip=localhost
- eureka-service-port=8761
container_name: rpush-server
ports:
- 8122:8122
rpush-scheduler:
image: shuangmulin/rpush-scheduler
environment:
- eureka-service-ip=localhost
- eureka-service-port=8761
- jdbc.url=jdbc:mysql://localhost:3306/rpush?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8
- jdbc.username=root
- jdbc.password=123456
- super-admin.username=superadmin
- super-admin.password=superadmin
- jwtSigningKey=fasdferear
container_name: rpush-scheduler
ports:
- 8123:8123
После запуска docker-compose up -d можно сразу получить доступ к порту 8124.
WeChat: zhongbl45
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )