1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/hutu92-KMQueue

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
README.md

KMQueue

Данный фреймворк представляет собой распределённую очередь на основе Redis, которая отличается простотой и гибкостью.

Далее приводится краткое описание некоторых особенностей этой очереди. Если у вас есть дополнительные вопросы, вы можете обратиться к исходному коду и комментариям. В коде я добавил подробные комментарии.

Также вы можете создать issue для дополнительных вопросов.

История обновлений:

2018-01-23: добавлено обнаружение работоспособности для предотвращения повторного выполнения задач, которые были помещены в резервную очередь мониторинга и обнаружены как неудачные. Однако пользователь должен реализовать логику обнаружения работоспособности самостоятельно. В будущем планируется реализовать обнаружение работоспособности через Zookeeper.

Дизайн

Диаграмма последовательности

На основе Redis разработана архитектура распределённой очереди сообщений.

Режимы очередей

KMQueue имеет два режима очередей:

  • default — простая очередь;
  • safe — безопасная очередь.

По умолчанию используется режим default.

Режим очереди можно установить с помощью формата queueName:queueMode, где:

— queueName — имя очереди (default не требуется указывать, это значение по умолчанию). Характеристики: задачи в очереди могут быть потеряны, и задачи не имеют ограничения по времени ожидания.

— queueMode — режим очереди, допустимые значения: default, safe.

safe — это безопасная очередь, в которой задачи имеют стратегию повтора. Если задача не выполнена или время её жизни истекло (здесь время жизни означает AliveTimeout), Monitor отправит уведомление. Это позволяет выполнять некоторые действия в зависимости от бизнес-логики, например, сохранять неудавшиеся задачи в базе данных в качестве журнала. Конечно, у вас могут быть и другие способы обработки.

Обратите внимание: необходимо включить мониторинг резервной очереди, иначе задачи, которые не удалось выполнить в безопасной очереди, будут сохранены только в резервной очереди и не будут обработаны потребителями. Это может привести к опасным ситуациям.

new KMQueueManager.Builder("127.0.0.1", 6379, "worker1_queue", "worker2_queue:safe")
...

Здесь worker1_queue — простая очередь, а worker2_queue — безопасная очередь.

Обратите внимание: чтобы лучше поддерживать бизнес-процессы (например, изменение режима существующей очереди DEFAULT на SAFE и перезапуск службы), выполните следующие действия: Когда параметр имя очереди в new KMQueueManager.Builder содержит SAFE, будет создана резервная очередь (для мониторинга очереди задач, установки тайм-аута задач и повторной попытки неудачных задач). Имя резервной очереди основано на всех переданных именах очередей.

В приведённом выше примере стратегия создания резервной очереди:

base64(md5("worker1_queue" + "worker2_queue"))

Task (Задача)

Конструктор объявлен следующим образом:

public Task(String queue,
            String uid,
            boolean isUnique,
            String type,
            String data,
            Task.TaskStatus status)

— uid: если бизнес-требования требуют уникальности задач в очереди, вы должны сгенерировать uid самостоятельно. В противном случае очередь будет использовать uuid по умолчанию, что приведёт к тому, что даже задачи с одинаковыми данными будут обрабатываться как две разные задачи.

— Является ли задача уникальной, то есть в очереди одновременно существует только одна такая задача.

— type: используется для обработки бизнес-логики. Вы можете вызывать разные обработчики для разных типов задач type, или не передавать их.

KMQueueManager (Менеджер очередей)

Существует три способа получения соединения Redis. Подробные сведения см. в конструкторе KMQueueManager.Builder с тремя перегрузками. Если вы используете Spring, рекомендуется получить объект пула соединений Redis из Spring и создать менеджер очередей с помощью следующего конструктора:

public Builder(Pool<Jedis> pool, String... queues)

RedisTaskQueue (Очередь задач)

  1. Используется блокирующая очередь для блокировки (brpop) получения задач из очереди задач;
  2. Определяет, превышает ли время жизни задачи aliveTimeout;
  3. Обновляет время выполнения задачи и помещает её в начало резервной очереди (lpush).

BackupQueueMonitor (Монитор резервной очереди)

Поскольку резервная очередь инициализируется с использованием циклического маркера, монитор здесь использует стратегию периодической работы, используя brpoplpush backupQueue backupQueue для циклического обхода резервной очереди. При обнаружении циклического маркера цикл завершается.

Обработка задач, время ожидания которых истекло или которые не были выполнены (соответствует aliveTimeout и protectedTimeout соответственно):

  • Задача, время жизни которой истекло || задача, которая была повторно выполнена более RetryTimes раз: задача больше не повторяется, и она удаляется из резервной очереди после обработки. Соответственно, вы можете реализовать Pipeline для дополнительной обработки этих задач, такой как сохранение в базу данных для ведения журнала.

    // Обработка после окончательного сбоя задачи, необходимо реализовать интерфейс Pipeline и реализовать собственную логику обработки
    TaskPipeline taskPipeline = new TaskPipeline();
    BackupQueueMonitor backupQueueMonitor = new BackupQueueMonitor.Builder("127.0.0.1", 6379, backUpQueueName)
                    ...
                    .setPipeline(taskPipeline).build();
  • Время выполнения задачи истекло, но количество повторных попыток меньше RetryTimes: задача повторно помещается в очередь задач для выполнения, состояние задачи устанавливается на «retry», а количество повторных попыток увеличивается на 1.

Обнаружение работоспособности

Использование:

// Обнаружение работоспособности
MyAliveDetectHandler detectHandler = new MyAliveDetectHandler();
...
// Создание монитора прослушивателя
BackupQueueMonitor backupQueueMonitor = new BackupQueueMonitor.Builder("127.0.0.1", 6379, backUpQueueName)
        ...
        .registerAliveDetectHandler(detectHandler)
        .build();
// Запуск мониторинга
backupQueueMonitor.monitor();

Метод registerAliveDetectHandler() можно задать равным Null, чтобы отключить обнаружение работоспособности.

Проверяет, выполняется ли текущая задача (жива), чтобы предотвратить выполнение длительных задач (время выполнения задачи превышает время выполнения задачи, настроенное менеджером очередей — значение по умолчанию: com.kingsoft.wps.mail.queue.config.Constant.PROTECTED_TIMEOUT) резервным монитором очереди, который обнаружит задачу как неудачную и повторно выполнит её.

Используя этот механизм проверки, можно гарантировать, что задачи, возвращающие значение true для check(Task), не будут повторно выполняться резервным монитором очереди.

Здесь предоставляется только интерфейс, и пользователь должен сам реализовать проверку работоспособности выполнения задачи. На данный момент механизм обнаружения работоспособности находится на начальной стадии разработки, и основная логика обнаружения всё ещё требует реализации пользователем. Здесь предоставляется только интерфейс.

Один из простых способов реализации — запустить задание cron, которое каждые n миллисекунд проверяет состояние потока выполняемой задачи в redis, используя ключ «идентификатор задачи + ALIVE_KEY_SUFFIX», ttl равен n+m миллисекундам (m < n, m используется для обеспечения периода окна двух заданий), и помечает выполняемую задачу. Затем AliveDetectHandler реализует класс, проверяющий redis на наличие ключа на основе задачи, и возвращает true, если ключ существует. Вот перевод текста на русский язык:

Создание и отправка задачи

.setAliveTimeout(Constant.ALIVE_TIMEOUT)
    .build();
kmQueueManager.init();

TaskQueue taskQueue = kmQueueManager.getTaskQueue("worker2_queue");
JSONObject ob = new JSONObject();
ob.put("data", "mail proxy task");
String data = JSON.toJSONString(ob);

// Параметры:
// uid: если бизнес-логика требует уникальности задач в очереди, сгенерируйте uid самостоятельно. В противном случае очередь будет использовать стратегию uuid по умолчанию, что может привести к обработке двух разных задач, даже если данные полностью совпадают.
// isUnique: является ли задача уникальной, то есть существует только одна такая задача в очереди в данный момент времени. Этот параметр действует только для безопасных очередей; если он равен true, уникальность задачи определяется через uid.
// type: используется для обработки бизнес-логики. Вы можете вызывать разные обработчики для разных типов задач. Этот параметр можно не передавать.
Task task = new Task(taskQueue.getName(), "", true, "", data, new Task.TaskStatus());
taskQueue.pushTask(task);

Обработка задачи

@Test
public void popTaskTest() {
    KMQueueManager kmQueueManager = new KMQueueManager.Builder("127.0.0.1", 6379, "worker1_queue", "worker2_queue:safe")
        .setMaxWaitMillis(-1L)
        .setMaxTotal(600)
        .setMaxIdle(300)
        .setAliveTimeout(Constant.ALIVE_TIMEOUT)
        .build();
    kmQueueManager.init();

    TaskQueue taskQueue = kmQueueManager.getTaskQueue("worker1_queue");
    Task task = taskQueue.popTask();

    if (task != null) {
        task.doTask(kmQueueManager, MyTaskHandler.class);
    }
}

Вы можете реализовать свой собственный обработчик задач (TaskHandler), создать класс обработки задач, подходящий для вашей бизнес-логики, и выполнить задачу с помощью следующего кода:

task.doTask(kmQueueManager, TaskHandler.class)

Кроме того, метод doTask также поддерживает передачу параметров бизнес-логики через третий параметр:

task.doTask(kmQueueManager, TaskHandler.class, params)

Если обработка бизнес-логики вызывает исключение, очередь обрабатывает его как завершённую задачу и выполняет следующие действия:

public void doTask(KMQueueManager kmQueueManager, Class clazz, Object... params) {

    // Получение очереди задачи
    TaskQueue taskQueue = kmQueueManager.getTaskQueue(this.getQueue());
    String queueMode = taskQueue.getMode();
    if (KMQueueManager.SAFE.equals(queueMode)) { // Безопасная очередь
        try {
            handleTask(clazz, params);
        } catch (Throwable e) {
            e.printStackTrace();
        }
        // Задача выполнена, удалить соответствующую задачу из резервной очереди
        taskQueue.finishTask(this);
    } else { // Обычная очередь
        handleTask(class);
    }
}

Задача больше не будет повторно выполняться.

Резервное копирование и мониторинг очереди

@Test
public void monitorTaskTest() {

    MyAliveDetectHandler detectHandler = new MyAliveDetectHandler();
    MyPipeline pipeline = new MyPipeline();
    String backUpQueueName = KMQUtils.genBackUpQueueName("worker1_queue", "worker2_queue:safe");
    BackupQueueMonitor backupQueueMonitor = new BackupQueueMonitor.Builder("127.0.0.1", 6379, backUpQueueName)
        .setMaxWaitMillis(-1L)
        .setMaxTotal(600)
        .setMaxIdle(300)
        .setAliveTimeout(Constant.ALIVE_TIMEOUT)
        .setProtectedTimeout(Constant.PROTECTED_TIMEOUT)
        .setRetryTimes(Constant.RETRY_TIMES)
        .registerAliveDetectHandler(detectHandler)
        .setPipeline(pipeline).build();
    backupQueueMonitor.monitor();
}

Важно помнить: если вы указали режим очереди как безопасный, обязательно включите мониторинг резервной копии!

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

Описание недоступно Развернуть Свернуть
Apache-2.0
Отмена

Обновления

Пока нет обновлений

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://api.gitlife.ru/oschina-mirror/hutu92-KMQueue.git
git@api.gitlife.ru:oschina-mirror/hutu92-KMQueue.git
oschina-mirror
hutu92-KMQueue
hutu92-KMQueue
master