1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/shuangmulin-rpush

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
README.md

Rpush: мультиплатформенная система для отправки сообщений

  • Мультиплатформенность: один интерфейс позволяет отправлять сообщения на различные платформы, включая WeChat, WeChat Work, DingTalk, электронную почту и другие.
  • Разделение логики: бизнес-логика отделена от логики платформы, что позволяет бизнесу сосредоточиться на выборе платформ и получателей, а не на деталях реализации.
  • Расширяемость: сильная расширяемость позволяет легко добавлять поддержку новых платформ с минимальными усилиями.
  • Поддержка веб-интерфейса: возможность отправлять сообщения через веб-интерфейс.
  • Возможность планирования: поддержка задач по расписанию.
  • Предварительные настройки: готовые шаблоны для сообщений.
  • Мгновенная связь: реализация мгновенной связи, поддерживающая горизонтальное масштабирование сервера.
  • Импорт получателей: возможность импортировать получателей.
  • Группировка получателей: разделение получателей на группы.
  • Журналирование сообщений: ведение журнала сообщений.

Онлайн-демонстрация

http://159.75.121.163/ admin admin

Поддерживаемые типы сообщений

  • Электронная почта.
  • WeChat Work — сообщения приложения:
    • текстовые;
    • с изображениями;
    • видео;
    • текст в карточке;
    • Markdown.
  • WeChat Work — групповые роботы:
    • текстовые;
    • с изображениями;
    • карточки с текстом и картинками;
    • Markdown.
  • WeChat Public Account:
    • текстовые;
    • карточки;
    • шаблоны.
  • DingTalk — рабочие уведомления:
    • текстовые;
    • Markdown;
    • ссылки;
    • карточки.
  • DingTalk — групповые роботы:
    • текстовые;
    • Markdown;
    • ссылки;
    • карточки;
    • FeedCard.

Архитектура Rpush обеспечивает простоту добавления поддержки новых типов сообщений для платформ, поэтому большая часть времени при расширении системы уходит на поиск документации по интеграции с новой платформой. В будущем планируется добавить поддержку других платформ и типов сообщений. Также приветствуется участие в расширении системы (добавление поддержки нового типа сообщений для платформы требует всего нескольких классов Java и не требует написания кода для интерфейса, обеспечивая все функции).

Локальная быстрая установка демонстрационной версии

Для локальной установки необходимо выполнить следующие шаги:

  1. Выполнить SQL-скрипт sql/表结构.sql для инициализации базы данных.
  2. После инициализации базы данных изменить конфигурацию подключения к базе данных в файле rpush-route/src/main/resources/application.yml на свои данные.
  3. Запустить следующие три класса:
    1. com.regent.rpush.eureka.EurekaServerApplication.
    2. com.regent.rpush.zuul.App.
    3. com.regent.rpush.route.RouteApplication.

После запуска можно открыть браузер и перейти по адресу http://localhost:8124, где будет доступна полная функциональность рассылки сообщений.

Демонстрация работы

Пример отправки одного типа сообщения

Пример отправки одного типа сообщения

Отправка сообщений с веб-интерфейса

Отправка сообщений с веб-интерфейса

Отправка сообщений через Postman

Отправка сообщений через Postman

Отправка сообщения с помощью кода

Следуя принципу «бизнес-сервисы отвечают только за отправку сообщений», код для отправки должен быть простым. Поэтому в Rpush отправка сообщения для одной платформы занимает всего одну строку кода, а для нескольких платформ — несколько строк. Например:

/**
 * @author shuangmulin
 * @since 2021/6/8/008 11:37
 **/
public class RpushSenderTest {
    /**
     * Содержание сообщения.
     */
    public static final String content = "Ваша встреча уже запланирована \n" +
            ">**Детали**: встреча \n" +
            ">Организатор: @miglioguan \n" +
            ">Участники: @miglioguan, @kunliu, @jamdeezhou, @kanexiong, @kisonwang \n" +
            "> \n" +
            ">Конференц-зал: TIT 1 этаж, комната 301, Гуанчжоу \n" +
            ">Дата: 2021 год, 18 мая \n" +
            ">Время: с 9:00 до 11:00 \n" +
            "> \n" +
            ">Пожалуйста, приходите вовремя. \n" +
            "> \n" +
            ">Если вам нужно изменить информацию о встрече, нажмите: [Изменить информацию о встрече](https://work.weixin.qq.com)";

    public static void main(String[] args) {
        // Сообщение в формате Markdown для WeChat Work
        MarkdownMessageDTO markdown = RpushMessage.WECHAT_WORK_AGENT_MARKDOWN().content(content).receiverIds(Collections.singletonList("ZhongBaoLin")).build();
        // Сообщение для группового робота WeChat Work
        TextMessageDTO text = RpushMessage.WECHAT_WORK_ROBOT_TEXT().content(content).receiverIds(Collections.singletonList("ZhongBaoLin")).build();
        // Электронное письмо
        EmailMessageDTO email = RpushMessage.EMAIL().title("Уведомление о встрече").content(content).build();
        RpushService.instance("baolin", "666666").sendMessage(markdown, text, email); // Введите имя пользователя и пароль, чтобы запустить
    }
}

Этот код отправляет три разных типа сообщений на разные платформы, и весь код занимает всего четыре или пять строк. Чтобы получить этот результат, достаточно добавить зависимость от rpush-sdk в Maven:


<project>
    <!-- Настройка репозитория jitpack.io -->
    <repositories>
        <repository>
            <id>jitpack.io</id>
            <url>https://jitpack.io</url>
        </repository>
    </repositories>

    <dependencies>
        <!-- Добавить зависимость rpush-sdk -->
        <dependency>
            <groupId>com.github.shuangmulin.rpush</groupId>
            <artifactId>rpush-sdk</artifactId>
            <version>v1.0.2</version>
        </dependency>
    </dependencies>
</project>

Мгновенная связь

Rpush предлагает гибкий подход к реализации мгновенной связи, позволяя выбирать между различными технологиями, такими как Netty, WebSocket, Comet и даже Bio. Здесь показаны примеры однопользовательского и группового обмена сообщениями между веб-страницей и клиентом командной строки, использующими WebSocket и Netty соответственно (код примера клиента доступен по ссылке: https://github.com/shuangmulin/rpush-client-sample). ### Некоторые ключевые точки расширения

1. Свободно расширяемые платформы сообщений и типы сообщений

В дизайне Rpush сообщения классифицируются как «платформы сообщений» и «типы сообщений», которые соответствуют следующим двум перечислениям:

/**
 * Перечисление платформ сообщений
 **/
public enum MessagePlatformEnum {

    EMAIL(EmailConfig.class, "электронная почта", "", "^[_a-z0-9-]+(\\.[_a-z0-9-]+)*@[a-z0-9-]+(\\.[a-z0-9-]+)*(\\.[a-z]{2,})$", true),
    WECHAT_WORK_AGENT(WechatWorkAgentConfig.class, "корпоративный WeChat - приложение для сообщений", "", "", true),
    WECHAT_WORK_ROBOT(WechatWorkRobotConfig.class, "корпоративный WeChat - групповой бот", "", "", true),
    WECHAT_OFFICIAL_ACCOUNT(WechatOfficialAccountConfig.class, "WeChat Public Account", "", "", true),
    DING_TALK_CORP(DingTalkCorpConfig.class, "Ding Talk - рабочее уведомление", "", "", true),
    RPUSH_SERVER(EmptyConfig.class, "rpush сервис", "", "", true);
}

/**
 * Перечисление типов сообщений
 **/
public enum MessageType {

    EMAIL("обычное письмо ", MessagePlatformEnum.EMAIL),
    RPUSH_SERVER("текст", MessagePlatformEnum.RPUSH_SERVER),

    // ================================корпоративный WeChat-приложение====================================
    WECHAT_WORK_AGENT_TEXT("текст", MessagePlatformEnum.WECHAT_WORK_AGENT),
    WECHAT_WORK_AGENT_IMAGE("изображение", MessagePlatformEnum.WECHAT_WORK_AGENT),
    WECHAT_WORK_AGENT_VIDEO("видео", MessagePlatformEnum.WECHAT_WORK_AGENT),
    WECHAT_WORK_AGENT_FILE("файл", MessagePlatformEnum.WECHAT_WORK_AGENT),
    WECHAT_WORK_AGENT_TEXTCARD("текстовая карточка", MessagePlatformEnum.WECHAT_WORK_AGENT),
    WECHAT_WORK_AGENT_NEWS("текст и изображение", MessagePlatformEnum.WECHAT_WORK_AGENT),
    WECHAT_WORK_AGENT_MARKDOWN("Markdown", MessagePlatformEnum.WECHAT_WORK_AGENT),

    // ================================корпоративный WeChat-групповой бот====================================
    WECHAT_WORK_ROBOT_TEXT("текст", MessagePlatformEnum.WECHAT_WORK_ROBOT),
    WECHAT_WORK_ROBOT_IMAGE("изображение", MessagePlatformEnum.WECHAT_WORK_ROBOT),
    WECHAT_WORK_ROBOT_NEWS("текст и изображение", MessagePlatformEnum.WECHAT_WORK_ROBOT),
    WECHAT_WORK_ROBOT_MARKDOWN("Markdown", MessagePlatformEnum.WECHAT_WORK_ROBOT),

    // ================================WeChat Public Account====================================
    WECHAT_OFFICIAL_ACCOUNT_TEXT("текст", MessagePlatformEnum.WECHAT_OFFICIAL_ACCOUNT),
    WECHAT_OFFICIAL_ACCOUNT_NEWS("текст и изображение", MessagePlatformEnum.WECHAT_OFFICIAL_ACCOUNT),
    WECHAT_OFFICIAL_ACCOUNT_TEMPLATE("шаблонное сообщение", MessagePlatformEnum.WECHAT_OFFICIAL_ACCOUNT),

    // ================================Ding Talk-рабочее уведомление====================================
    DING_TALK_COPR_TEXT("текст", MessagePlatformEnum.DING_TALK_CORP),
    DING_TALK_COPR_MARKDOWN("Markdown", MessagePlatformEnum.DING_TALK_CORP),
    DING_TALK_COPR_LINK("сообщение со ссылкой", MessagePlatformEnum.DING_TALK_CORP),
    DING_TALK_COPR_ACTION_CARD_SINGLE("карточка с одной кнопкой", MessagePlatformEnum.DING_TALK_CORP),
    DING_TALK_COPR_ACTION_CARD_MULTI("карточка с несколькими кнопками", MessagePlatformEnum.DING_TALK_CORP),
    DING_TALK_COPR_OA("OA сообщение", MessagePlatformEnum.DING_TALK_CORP)

;
}

Здесь в качестве примера рассматривается тип сообщения «Текст корпоративного WeChat — приложения». Предположим, что теперь необходимо реализовать этот тип сообщения в Rpush, шаги следующие:

  1. Определить класс конфигурации корпоративного WeChat, например:
/**
 * Конфигурация корпоративного WeChat
 **/
@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class WechatWorkAgentConfig extends Config {
    private static final long serialVersionUID = -9206902816158196669L;

    @ConfigValue(value = "корпоративный ID", description = "см. эту страницу: https://work.weixin.qq.com/wework_admin/frame#profile")
    private String corpId;
    @ConfigValue(value = "секрет приложения")
    private String secret;
    @ConfigValue(value = "идентификатор агента приложения")
    private Integer agentId;

}

Поля внутри должны быть настроены в соответствии с требованиями соответствующей платформы. Например, здесь корпоративный WeChat требует только три поля для настройки. А каждое поле с аннотацией @ConfigValue используется для автоматической генерации страниц, то есть достаточно добавить эту аннотацию, чтобы автоматически создать соответствующие интерфейсы добавления, удаления, изменения и проверки (без написания кода интерфейса).

  1. В MessagePlatformEnum и MessageType добавить соответствующее перечисление, а именно WECHAT_WORK_AGENT(WechatWorkAgentConfig.class, «корпоративный WeChat — сообщение приложения», "", "", true) и WECHAT_WORK_AGENT_TEXT («текст», MessagePlatformEnum.WECHAT_WORK_AGENT). Здесь следует обратить внимание на то, что первый параметр перечисления платформы является классом конфигурации, определённым на первом шаге.

  2. Определить параметры сообщения корпоративного WeChat — текстового сообщения, например:

/**
 * DTO для отправки сообщения корпоративного WeChat
 **/
@EqualsAndHashCode(callSuper = true)
@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
public class TextMessageDTO extends BaseMessage {
    private static final long serialVersionUID =
``` **2. Масштабируемый сокет-сервис rpush-server**

rpush-сервер как сокет-служба в основном поддерживает постоянное соединение с клиентами. Способность этой службы выдерживать нагрузку напрямую определяет, сколько пользователей может одновременно работать в системе обмена мгновенными сообщениями. Поэтому очевидно, что эта служба должна быть масштабируемой.

Предположим, развёрнуто 5 экземпляров сокет-службы, все они корректно настроены на использование Eureka в качестве центра регистрации. Чтобы реализовать кластеризацию сокет-служб, процесс подключения клиента к rpush-службе выглядит следующим образом:

1. Клиент запрашивает у маршрутизатора доступный IP-адрес и порт сервера сокетов.
2. Маршрутизатор выбирает доступный сервер сокетов на основе подходящего алгоритма балансировки нагрузки и возвращает клиенту его IP-адрес и номер порта.
3. Клиент устанавливает длительное соединение с выбранным сервером сокетов.
4. После успешного установления соединения соответствующий сервер сокетов сохраняет информацию о сеансе на уровне службы, а затем сообщает маршрутизатору об этом клиенте. Маршрутизатор сохраняет соответствие между клиентом и сервером сокетов.
5. Клиент и соответствующий сервер сокетов поддерживают определённую частоту пульсаций. Если пульс не проходит после разрыва соединения, клиент повторяет описанный выше процесс до тех пор, пока соединение снова не будет установлено.

После того как клиент подключён, другой клиент может отправить ему сообщение следующим образом:

1. Клиент запрашивает интерфейс отправки сообщений, предоставляемый маршрутизатором (это тот же интерфейс, который используется для отправки сообщений другим третьим сторонам, поэтому способ отправки сообщений одинаков).
2. Обработчик сообщений маршрутизатора (com.regent.rpush.route.handler.RpushMessageHandler) находит соответствующий сервер сокетов по идентификатору целевого клиента и отправляет ему сообщение.
3. Сервер сокетов находит целевого клиента из своего сохранённого сеанса и завершает отправку сообщения.

Реализовав этот процесс, можно добиться свободной кластеризации сокет-сервиса.

Описанный выше процесс несколько теоретизирован. Вот некоторые технические аспекты реализации:

**1. Как маршрутизатор использует подходящий алгоритм балансировки нагрузки для выбора доступного сервера сокетов?**

На самом деле это довольно просто. Сначала сервер сокетов предоставляет интерфейс для запроса своего собственного IP-адреса и номера порта. Затем маршрутизатор напрямую запрашивает этот интерфейс через Ribbon и реализует собственный класс правил балансировки нагрузки, чтобы выбрать сервер сокетов:

```java
/**
 * Маршрутизатор -> Запрос экземпляра сервера сокетов
 */
public class ServerBalancer extends ZoneAvoidanceRule {

    @Override
    public Server choose(Object o) {
        // ...
        // Используйте алгоритм балансировки нагрузки по умолчанию для выбора доступного сервера сокетов (алгоритм можно изменить в соответствии с фактическими бизнес-потребностями)
        return super.choose(o);
        //...
    }
}

В файле конфигурации этот «класс правил» настраивается следующим образом:

rpush-server:
  ribbon:
    NFLoadBalancerRuleClassName: com.regent.rpush.route.loadbalancer.ServerBalancer

Когда маршрутизатор запрашивает сервер сокетов, он проходит через этот «класс правил», который затем выбирает доступный сервер сокетов. В конечном итоге информация о номере порта и IP-адресе сервера сокетов также возвращается маршрутизатору через этот запрос. Конечно, этот класс правил выполняет не только эту задачу. Есть ещё одна проблема, которую должен решить этот класс.

2. Как маршрутизатор находит соответствующий сервер сокетов при отправке сообщений на основе идентификатора целевого клиента?

Во-первых, после того как клиент устанавливает соединение с определённым сервером сокетов, необходимо сохранить отношения между клиентом и этим сервером (например, в MySQL или Redis). Затем добавляется перехватчик запросов Feign («com.regent.rpush.route.loadbalancer.MessageRequestInterceptor»):


@Component
public class MessageRequestInterceptor implements RequestInterceptor {

    /**
     * Сохраняет идентификатор целевого сервера сокетов для текущей передачи сообщений
     */
    static final ThreadLocal<String> SERVER_ID = new ThreadLocal<>();

    @Autowired
    private IRpushServerOnlineService rpushServerOnlineService;

    @SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
    @Override
    public void apply(RequestTemplate requestTemplate) {
        String url = requestTemplate.url();
        String method = requestTemplate.method();
        if (!"/push".equals(url) || !"POST".equals(method)) {
            // Обрабатывать только интерфейс отправки сообщений
            return;
        }

        // Если это отправка сообщения, необходимо отправить сообщение на принимающий конец подключенного сервера. В случае кластера серверов сокетов необходимо найти соответствующий сервер.
        Строка body = новая строка (requestTemplate.body());
        JSONObject jsonObject = новый JSONObject (body);
        Строковый sendTo = jsonObject.getStr ("sendTo"); // Получить идентификатор целевого клиента
        Строковая serverId = ""; // Найти соответствующий идентификатор сервера сокетов из redis или mysql
        SERVER_ID.set (serverId); // Добавить в текущий поток
    }
}

Этот «класс перехватчика» вместе с «классом правил» позволяет точно определить соответствующий сервер сокетов во время передачи сообщения от маршрутизатора к серверу сокетов. Полный «класс правил»:

/**
* Маршрутизатор->Запрос экземпляра сервера сокетов
*/
public class ServerBalancer extends ZoneAvoidanceRule {

    @Override
    public Server choose(Object o) {
        try {
            // Из класса перехватчика посмотреть, указан ли экземпляр сервера
            Строковая serverId = MessageRequestInterceptor.SERVER_ID.get();
            if (StringUtils.isEmpty(serverId)) {
                // Если экземпляр сервера не указан, используйте алгоритм балансировки нагрузки по умолчанию
                return super.choose(o);
            }
            // Если указан экземпляр сервера, это означает, что это передача сообщения, и использовать указанный экземпляр для отправки запроса на сервер сокетов
            List<Server> servers = getLoadBalancer().getAllServers();
            for (Сервер-сервер: серверы) {
                if (StringUtils.equals (server.getId (), serverId)) {
                    вернуть сервер;
                }
            }
            выбросить новое IllegalArgumentException («Нет доступных экземпляров RPUSH_SERVER»);
        } наконец {
            MessageRequestInterceptor.SERVER_ID.remove();
        }
    }
}

Имея эти два класса, код маршрутизатора для передачи сообщений на сервер сокетов также будет очень «чистым»:


@Компонент
public class RpushMessageHandler расширяет MessageHandler<RpushMessageDTO> {

    // ...
    @Override
    public void handle(RpushMessageDTO param) {
        Список <Строка> sendTos = param.getReceiverIds();

        для (Строка sendTo: sendTos) {
            // ...
            messagePushService.push (build); // Маршрутизатор напрямую вызывает интерфейс запроса, «класс правил» и «класс перехватчиков» скрывают другую логику, поэтому здесь не нужно беспокоиться о том, будет ли он отправлен на неправильный сервер сокетов
        }
    }
} **Очередь.** В качестве асинхронной обработки в кольцевом буфере внутри службы маршрутизации используется Disruptor. Это позволяет максимально ускорить возврат интерфейса отправки сообщений. При высоком уровне параллелизма можно добавить Kafka, чтобы служба маршрутизации напрямую отслеживала сообщения Kafka и таким образом повышала общую производительность сервиса.

**Кэш.** Информация о состоянии подключения клиента может быть многоуровнево кэширована. То есть, внутренний кэш службы маршрутизации + кэш Redis. Конечно, чем больше добавляется уровней кэша, тем сложнее становится проблема согласованности кэша и тем больше ситуаций необходимо учитывать. Также необходимо принимать решение о развёртывании кластера для Redis в зависимости от конкретной ситуации.

**Мониторинг.** Для мониторинга состояния служб можно использовать Spring Boot Admin.

## Быстрое развёртывание сервиса Rpush с помощью docker-compose

```yml
version: '2'
services:
  nginx:
    image: nginx
    container_name: nginx
    ports:
      - 80:80
    volumes:
      - /data/nginx/conf/nginx.conf:/etc/nginx/nginx.conf
      - /data/nginx/log:/var/log/nginx
      - /data/nginx/html:/usr/share/nginx/html
  rpush-eureka:
    image: shuangmulin/rpush-eureka
    container_name: rpush-eureka
    ports:
      - 8761:8761
  rpush-zuul:
    image: shuangmulin/rpush-zuul
    environment:
      - eureka-service-ip=172.16.0.11
      - eureka-service-port=8761
    container_name: rpush-zuul
    ports:
      - 8124:8124
  rpush-route:
    image: shuangmulin/rpush-route
    environment:
      - eureka-service-ip=localhost
      - eureka-service-port=8761
      - jdbc.url=jdbc:mysql://localhost:3306/rpush?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8
      - jdbc.username=root
      - jdbc.password=123456
      - super-admin.username=superadmin
      - super-admin.password=superadmin
      - jwtSigningKey=fjksadjfklds
    container_name: rpush-route
    ports:
      - 8121:8121
  rpush-server:
    image: shuangmulin/rpush-server
    environment:
      - eureka-service-ip=localhost
      - eureka-service-port=8761
    container_name: rpush-server
    ports:
      - 8122:8122
  rpush-scheduler:
    image: shuangmulin/rpush-scheduler
    environment:
      - eureka-service-ip=localhost
      - eureka-service-port=8761
      - jdbc.url=jdbc:mysql://localhost:3306/rpush?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8
      - jdbc.username=root
      - jdbc.password=123456
      - super-admin.username=superadmin
      - super-admin.password=superadmin
      - jwtSigningKey=fasdferear
    container_name: rpush-scheduler
    ports:
      - 8123:8123

После запуска docker-compose up -d можно сразу получить доступ к порту 8124.

Связаться с автором

WeChat: zhongbl45

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

Описание недоступно Развернуть Свернуть
Apache-2.0
Отмена

Обновления

Пока нет обновлений

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://api.gitlife.ru/oschina-mirror/shuangmulin-rpush.git
git@api.gitlife.ru:oschina-mirror/shuangmulin-rpush.git
oschina-mirror
shuangmulin-rpush
shuangmulin-rpush
master