1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/yu120-lemon-guide

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
Middleware.md 420 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
gitlife-traslator Отправлено 30.11.2024 18:34 ec8a065

Middleware

Введение: сбор технических знаний, связанных с Redis, RocketMQ, Zookeeper, Netty, Tomcat и др.

SPI

SPI (Service Provider Interface) — это механизм обнаружения сервисов. Суть SPI заключается в том, что имена классов реализации интерфейса записываются в конфигурационном файле, а загрузчик сервиса считывает конфигурацию и загружает классы реализации. Это позволяет динамически заменять реализацию интерфейса во время выполнения программы, благодаря чему можно легко расширить функциональность программы с помощью механизма SPI.

Java SPI

В JDK есть функция SPI, основанная на классе java.util.ServiceLoader. Она позволяет получить доступ к нескольким файлам конфигурации реализаций в каталоге META-INF/services/. Чтобы решить проблему расширения функциональности, создадим файл com.github.yu120.test.SuperLoggerConfiguration в каталоге META-INF/services/, который будет содержать только одну строку с именем нашего класса реализации по умолчанию — com.github.yu120.test.XMLConfiguration (обратите внимание, что в одном файле можно указать несколько реализаций, разделяя их символами новой строки). Затем используем ServiceLoader для получения конфигурации механизма SPI:

// META-INF/services/com.github.test.test.SuperLoggerConfiguration:
com.github.yu120.test.XMLConfiguration

ServiceLoader<SuperLoggerConfiguration> serviceLoader = ServiceLoader.load(SuperLoggerConfiguration.class);
Iterator<SuperLoggerConfiguration> iterator = serviceLoader.iterator();
SuperLoggerConfiguration configuration;
while(iterator.hasNext()) {
        // 加载并 инициализируем реализацию класса
        configuration = iterator.next();
}
// Вызов метода configure для последней конфигурации
configuration.configure(configFile);

Dubbo SPI (https://github.com/apache/dubbo/tree/master/dubbo-common/src/main/java/org/apache/dubbo/common/extension)

Логика Dubbo SPI инкапсулирована в классе ExtensionLoader. Его метод getExtensionLoader используется для извлечения экземпляра ExtensionLoader из кэша или создания нового экземпляра, если кэш не содержит соответствующей записи. Основная идея Dubbo SPI проста:

  • Развязка расширений интерфейсов и классов реализации через конфигурацию.
  • Автоматическое внедрение зависимостей расширений через IOC.
  • Подтверждение фактического класса расширения через URL-параметры во время выполнения.

Конфигурационные файлы Dubbo SPI должны быть размещены в каталоге META-INF/dubbo. Пример конфигурации взят из документации Dubbo:

optimusPrime = org.apache.spi.OptimusPrime
bumblebee = org.apache.spi.Bumblebee

В отличие от конфигурации классов реализации в Java SPI, Dubbo использует пары ключ-значение для настройки, позволяя выбирать конкретные классы реализации при необходимости. Также необходимо добавить аннотацию @SPI к интерфейсу.

Motan SPI (https://github.com/weibocom/motan/tree/master/motan-core/src/main/java/com/weibo/api/motan/core/extension)

Motan использует SPI для обеспечения доступа между модулями на основе интерфейсов и имён. Это снижает связанность между модулями.

Реализация Motan SPI находится в motan-core/com/weibo/api/motan/core/extension. Структура каталогов выглядит следующим образом:

motan-core/com.weibo.api.motan.core.extension
    |-Activation:SPI的扩展功能,例如过滤、排序
    |-ActivationComparator:排序比较器
    |-ExtensionLoader:核心,主要负责SPI的扫描和加载
    |-Scope:模式枚举,单例、多例
    |-Spi:注解,作用在接口上,表明这个接口的实现可以通过SPI的形式加载
    |-SpiMeta:注解,作用在具体的SPI接口的实现类上,标注该扩展的名称

SpringBoot SPI

Файл конфигурации Spring SPI — это фиксированный файл META-INF/spring.factories, который работает аналогично Java SPI. Каждый интерфейс может иметь несколько реализаций. Использование простое:

// Получение всех фабрик, настроенных в spring.factories
List<LoggingSystemFactory>> factories = SpringFactoriesLoader.loadFactories(LoggingSystemFactory.class, classLoader);

Пример конфигурации в Spring Boot:

# Logging Systems
org.springframework.boot.logging.LoggingSystemFactory=\
org.springframework.boot.logging.logback.LogbackLoggingSystem.Factory,\
org.springframework.boot.logging.log4j2.Log4J2LoggingSystem.Factory

# PropertySource Loaders
org.springframework.boot.env.PropertySourceLoader=\
org.springframework.boot.env.PropertiesPropertySourceLoader,\
org.springframework.boot.env.YamlPropertySourceLoader

Redis

Тип Хранимые байты Биты Диапазон значений
int 4 32 -2^31 ~ 2^31-1
short 2 16 -2^15 ~ 2^15-1
long 8 64 -2^63 ~ 2^63-1
byte 1 8 -2^7 ~ 2^7-1, -128 ~ 127
float 4 32
double 8 64
boolean 1 8 true, false
char 2 16

Перевод:

В Java, независимо от того, является ли символ цифрой, английским или китайским иероглифом, он занимает два байта |

Примечание:

  • Английские цифры, буквы или символы: 1 символ = 1 байт.
  • Китайские цифры, буквы или символы: 1 символ = 2 байта.
  • Базовая единица компьютера: бит. Один бит представляет собой 0 или 1, один байт равен 8 битам.
  • 1 ТБ = 1024 ГБ, 1 ГБ = 1024 МБ, 1 МБ = 1024 КБ, 1 КБ = 1024 Б (байт), 1 Б = 8 б (бит).

Модель потока

На рисунке показана модель потока Redis.

Redis использует внутри себя однопоточный файловый обработчик событий File Event Handler, поэтому Redis называется однопоточной моделью. Он использует механизм мультиплексирования ввода-вывода I/O для одновременного прослушивания нескольких сокетов, помещая события, генерируемые сокетами, в очередь памяти. Диспетчер событий распределяет события по соответствующим обработчикам событий в зависимости от типа события на сокете. Файловый обработчик событий состоит из пяти частей:

— Несколько сокетов; — Программа мультиплексного ввода-вывода; — Очередь сокетов; — Диспетчер событий; — Обработчик событий (обработчик ответа соединения, обработчик запроса команды, обработчик ответа команды).

На следующем рисунке показан файловый обработчик событий Redis.

Процесс связи

Процесс связи между клиентом и Redis:

Тип запроса 1: клиент инициирует запрос на установление соединения. Сервер отвечает событием AE_READABLE, и после получения события серверной программы сокета программа мультиплексного ввода-вывода помещает этот сокет в очередь. Диспетчер событий извлекает сокет из очереди и передаёт его обработчику ответа соединения для создания доступного для клиента сокета socket01. Сокет socket01 связывается с обработчиком запроса команды. Обработчик запроса команды считывает ключ и значение из socket01 и устанавливает их в памяти. Сокет socket01 связан с событием AE_WRITABLE обработчика ответа команды.

Тип запроса 2: клиент отправляет запрос set key value. Socket socket01 генерирует событие AE_READABLE и помещается в очередь. Диспетчер событий получает сокет socket01 из очереди и связывает его с обработчиком запросов команд. Обработчик запросов команд считывает key и value из socket01 и сохраняет их в памяти. Событие AE_WRITABLE сокета socket01 связано с обработчиком ответов команд.

Тип запроса 3: сервер возвращает результат. Socket socket01 генерирует событие AE_WRITABLE и помещается в очередь. Диспетчер событий получает socket01 из очереди и связывает его с обработчиком ответа команд. Ответ обработчика команд записывает результат операции в socket01 (например, ok). Затем обработчик ответов команд отменяет связь между событием AE_WRITABLE socket01 и собой.

Файловый обработчик событий

Собственная сетевая система обработки событий, основанная на модели Reactor, была разработана для использования в качестве файлового обработчика событий (File Event Handler). Файловый обработчик событий использует программу мультиплексного ввода-вывода для одновременного мониторинга нескольких сокетов и связывает различные обработчики событий с соответствующими сокетами в соответствии с задачами сокета. Когда сокеты, за которыми ведётся наблюдение, готовы выполнить операции соединения (accept), чтения (read), записи (write) и закрытия (close), соответствующие события файла будут сгенерированы, и файловый обработчик событий вызовет соответствующий обработчик событий для обработки этих событий. Файловый обработчик событий работает в однопоточном режиме, но благодаря использованию программы мультиплексного ввода-вывода он может эффективно взаимодействовать с другими модулями Redis, работающими в однопоточном режиме. Это сохраняет простоту внутренней однопоточной архитектуры Redis.

Мультиплексный ввод-вывод

Мультиплексный ввод-вывод относится к сетевому вводу-выводу, а мультиплексный означает несколько TCP-соединений (то есть сокетов или каналов), что означает совместное использование одного или нескольких потоков. Смысл в том, чтобы обрабатывать несколько соединений одним или несколькими потоками. Основное преимущество заключается в снижении системных издержек и отсутствии необходимости создавать слишком много процессов/потоков или поддерживать эти процессы/потоки. Мультиплексная модель ввода-вывода использует два системных вызова (select/poll/epoll и recvfrom), блокирующий ввод-вывод вызывает только recvfrom; select/poll/epoll ядро может одновременно обрабатывать несколько подключений, а не быстрее, поэтому при небольшом количестве подключений производительность может быть не лучше, чем у многопоточного + блокирующего ввода-вывода. В модели мультиплексного ввода-вывода каждый сокет устанавливается как неблокирующий, а блокировка вызывается функцией select, а не блокировкой сокета.

Принцип работы функции select: При взаимодействии клиента с сервером генерируются три типа дескрипторов файлов (FD): writefds (запись), readfds (чтение) и exceptfds (исключения). Функция select блокирует мониторинг трёх типов дескрипторов файлов, ожидая данных, доступных для чтения, записи или исключения, или тайм-аута, а затем возвращается. После возврата дескрипторы файлов в наборе fdset проверяются один за другим, и готовые дескрипторы используются для выполнения операций ввода-вывода. Преимущества:

— Поддерживается почти на всех платформах, хорошая кроссплатформенная поддержка. Недостатки:

— Поскольку используется метод опроса, производительность снижается с увеличением количества дескрипторов файлов FD. — Каждый вызов select() требует копирования набора fd из пользовательского режима в режим ядра и обхода (передача сообщений всегда происходит из режима ядра в пользовательский режим). — По умолчанию существует ограничение в 1024 дескриптора файлов для каждого процесса открытия, которое можно изменить с помощью определения макроса, но эффективность всё ещё низкая.

Принцип работы функции poll аналогичен принципу работы select, оба используют метод опроса + обход; единственное отличие состоит в том, что функция poll не имеет ограничения на количество дескрипторов файлов.

Принцип работы epoll: Нет ограничения на количество FD, требуется только одно копирование пользовательского режима в режим ядра, и используется механизм уведомления о времени для запуска. Через функцию epoll_ctl зарегистрируйте FD, и когда FD будет готов, вызовите callback для активации соответствующего FD и выполните соответствующую операцию ввода-вывода. Причина высокой производительности epoll заключается в следующих трёх функциях:

— epoll_create(): при запуске системы в Linux ядро запрашивает объект B+ дерева, возвращает epoll объект, который также является дескриптором файла. — epoll_ctl(): каждый раз, когда создаётся новое соединение, оно управляется этой функцией в объекте epoll, добавляя, удаляя или изменяя дескриптор файла, связанный с соединением, и привязывая функцию обратного вызова. — epoll_wait(): циклически перебирает все обратные вызовы и выполняет соответствующие операции ввода-вывода.

Преимущества:

— Нет ограничений на количество FD, поддерживаемое операционной системой, максимальное количество дескрипторов файлов составляет около 100 000. — Эффективность повышается за счёт использования механизма обратного вызова вместо метода опроса, и производительность не снижается с увеличением числа FD. — Пользовательское пространство и пространство ядра совместно используют одну и ту же область памяти через mmap (mmap — это метод сопоставления файлов, то есть сопоставление файла или другого объекта с адресным пространством процесса).

Пример: если имеется 1 миллион соединений, из которых 10 000 активны, мы можем сравнить производительность select, poll и epoll:

— select: по умолчанию 1024, необходимо 977 процессов для поддержки 1 миллиона соединений, что приводит к значительному снижению производительности процессора. — poll: нет ограничений на максимальное количество дескрипторов файлов, для 1 миллиона подключений требуется 1 миллион дескрипторов, и все они должны быть пройдены, что также приводит к нехватке ресурсов памяти. — epoll: при поступлении запроса создайте дескриптор и привяжите функцию обратного вызова, просто нужно пройти через 10 000 активных соединений, высокая эффективность и отсутствие необходимости в копировании памяти.

Высокая эффективность

Почему Redis с однопоточным режимом всё ещё обладает такой высокой эффективностью?

— Чистые операции с памятью: данные хранятся в памяти, время отклика которой составляет примерно 100 наносекунд, что является важной основой для доступа Redis к миллиардам операций в секунду. — Неблокирующая модель мультиплексного ввода-вывода: Redis использует epoll в качестве реализации технологии мультиплексного ввода-вывода, а также объединяет события в Redis для преобразования операций соединения, чтения и записи в операции времени, избегая потери времени на ввод-вывод. — Реализация на языке C: близость к операционной системе обеспечивает более высокую скорость выполнения. — Избегание переключения контекста в одном потоке: избегание накладных расходов на переключение контекста, вызванных многопоточностью, и предотвращение проблем конкуренции, которые могут возникнуть в многопоточности.

Типы данных

Основные типы данных Redis и их применение:

— String (строка): кэш, счётчик, распределённая блокировка и т. д. — List (список): список, очередь, временная шкала списка подписчиков в социальных сетях и т. д. — Hash (хэш): информация о пользователе, хэш-таблица и т. д. — Set (набор): дедупликация, лайки, антипатии, общие друзья и т. д. — Zset (отсортированный набор): рейтинг популярности, рейтинг кликов и т. д. Redis и его структуры данных

Структура данных Hash в Redis позволяет изменять значение только одного свойства, как при обновлении атрибута в базе данных.

Практическое применение:

  • Кэширование: Redis используется для кэширования часто используемых данных, таких как строки, изображения или видео. В этом случае Redis выступает в роли слоя кэша, а MySQL — в качестве слоя хранения данных, что снижает нагрузку на чтение и запись в MySQL.
  • Счётчики: поскольку Redis является однопоточным, команды выполняются последовательно. При этом данные могут быть сразу записаны в другие источники данных.
  • Сессии: распространённым решением является использование Spring Session с Redis для реализации совместного использования сессий.

List (список)

List представляет собой двусвязный список. С помощью этой структуры можно легко реализовать функции, такие как отображение последних сообщений (например, TimeLine в Weibo). List также может использоваться в качестве очереди сообщений: задачи помещаются в очередь с помощью операции PUSH, а затем извлекаются и выполняются с помощью операции POP рабочими потоками. Redis предоставляет API для работы с определёнными элементами списка, позволяя выполнять запросы, удаление элементов из списка.

Практическое применение:

  • TimeLine: новые сообщения добавляются в список с помощью операции lpush, а последние сообщения отображаются с помощью операции rpop.
  • Очередь сообщений: задачи помещаются в список с использованием операции push, а рабочие потоки извлекают и выполняют задачи с помощью операции pop.

Set (множество)

Set представляет собой множество уникальных значений. Используя структуру данных Set в Redis, можно хранить данные, которые имеют свойство множества, например, список подписчиков пользователя в социальной сети. Благодаря тому, что Redis предлагает множество операций над множествами, таких как пересечение, объединение и разность, можно легко реализовывать функции, связанные с общими интересами, взаимными подписками и вторичными друзьями. Для всех операций над множествами можно выбрать, возвращать ли результат клиенту или сохранять его в новом множестве.

Практическое применение:

  • Общие друзья и вторичные друзья: можно использовать операции над множествами для определения общих друзей или вторичных друзей.
  • Уникальные IP-адреса: благодаря уникальности элементов множества можно подсчитать уникальные IP-адреса посетителей веб-сайта.
  • Рекомендации: при добавлении тегов к сообщениям или действиям можно использовать операцию пересечения множеств для рекомендации похожих действий или сообщений.

Sorted Set (упорядоченное множество)

В отличие от обычного множества, элементы в Sorted Sets имеют дополнительный атрибут — вес (score), который позволяет упорядочивать элементы по весу. Например, в списке учеников класса их имена могут выступать в качестве элементов, а оценки — в качестве веса. Это позволяет автоматически сортировать список по оценкам. Кроме того, Sorted Sets можно использовать для создания очередей с приоритетами, где обычные сообщения имеют вес 1, а важные сообщения — 2. Рабочие потоки могут выбирать задачи для выполнения в порядке убывания веса, обеспечивая приоритет важных задач.

Практическое применение:

  • Ранжированные списки: Sorted Sets широко используются для создания ранжированных списков, таких как рейтинги пользователей или игр.

HyperLogLog (основание статистики)

HyperLogLog используется для приблизительного подсчёта количества уникальных элементов. Вместо хранения каждого элемента, он использует вероятностный алгоритм, основанный на хранении позиции первого бита, равного 1, в хэше элемента. HyperLogLog позволяет выполнять подсчёт с минимальными затратами памяти.

Команды:

Команда Действие
pfadd key element ... Добавление всех элементов в ключ
pfcount key Подсчёт оценочного значения ключа (не точное)
pgmerge new_key key1 key2 ... Объединение ключей в новый ключ

Пример использования: Как подсчитать количество разных аккаунтов, посетивших главную страницу Google за день? Для такого большого сайта, как Google, точное количество посещений не имеет большого значения, поэтому можно ограничиться приблизительным подсчётом. Для решения этой задачи можно использовать HashMap, BitMap или HyperLogLog.

Сравнение решений:

  • HashMap: простое решение с высокой точностью, подходит для небольших объёмов данных, но требует много памяти для больших объёмов.
  • BitMap: использует алгоритм битовой карты, также обеспечивает высокую точность, но занимает меньше памяти, чем HashMap. Однако для больших объёмов данных всё равно требуется много памяти.
  • HyperLogLog: имеет некоторую погрешность, но использует мало памяти (около 12 КБ) и может обрабатывать до 2^64 элементов. Рекомендуется для подобных задач.

Geo (географическая информация)

Geo используется для хранения и обработки географической информации, такой как местоположение. Redis позволяет хранить Geo-информацию в упорядоченных множествах (zset), используя Geohash для индексации. Команды:

Команда Действие
geoadd key latitude longitude member Добавление местоположения (широты, долготы и имени) в ключ
geopos key member ... Получение координат местоположения
geodist key member1 member2 [unit] Вычисление расстояния между двумя местоположениями. Если одно из местоположений не существует, возвращается пустое значение
georadius Поиск по диапазону координат
georadiusbymember Поиск по радиусу вокруг местоположения члена
geohash Вычисление геохеша для широты и долготы

GEORADIUS:

GEORADIUS key longitude latitude radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count]

Возвращает все элементы в ключе, находящиеся в радиусе, заданном широтой, долготой и радиусом. Единицы измерения радиуса:

  • m — метры;
  • km — километры;
  • mi — мили;
  • ft — футы.

Опции:

  • WITHDIST: возвращает расстояние между центром и каждым элементом. Единица измерения соответствует радиусу.
  • WITHCOORD: возвращает широту и долготу каждого элемента.
  • WITHHASH: возвращает геохеш каждого элемента в виде 52-битного целого числа со знаком. Используется для внутренних целей или отладки.

По умолчанию GEORADIUS возвращает неупорядоченный список элементов. Можно указать порядок сортировки с помощью параметров ASC (от ближнего к дальнему) или DESC (от дальнего к ближнему).

Опция COUNT <count> позволяет ограничить количество возвращаемых элементов. Если данные не конфиденциальны и могут быть сгенерированы заново из других источников, можно отключить сохранение на диск.

  • Если данные важны и можно допустить потерю нескольких минут данных, например, кэш, достаточно использовать RDB.
  • Если это данные в памяти, используйте Redis с сохранением на диск, рекомендуется включить и RDB, и AOF.
  • Если используется только AOF, предпочтительно выбрать конфигурацию everysec, так как она обеспечивает баланс между надёжностью и производительностью.

Когда включены оба режима сохранения (RDB и AOF), Redis будет использовать AOF для восстановления данных, поскольку файлы, сохранённые AOF, более полные.

RDB режим (снимок данных)

RDB (Redis Database Backup File, файл резервной копии базы данных Redis) — это метод сохранения всех пар ключ-значение в базе данных Redis в виде снимка данных через определённые промежутки времени. В определённый момент данные записываются во временный файл, а после завершения процесса сохранения этот файл заменяет предыдущий файл сохранения.

Преимущества:

  • RDB-снимок представляет собой сжатый и компактный файл. Он подходит для резервного копирования данных и аварийного восстановления.
  • Максимальная производительность Redis. При сохранении RDB-файла серверный процесс должен только создать дочерний процесс для создания файла, родительский процесс не выполняет операции ввода-вывода.
  • Восстановление больших наборов данных происходит быстрее по сравнению с AOF.

Недостатки:

  • Безопасность данных RDB ниже, чем у AOF, процесс сохранения всего набора данных более сложен. В зависимости от конфигурации может потребоваться несколько минут для создания снимка, и если сервер выйдет из строя, могут быть потеряны данные за несколько минут.
  • Для больших наборов данных создание дочернего процесса для создания RDB-снимка требует значительных ресурсов процессора и времени.

Создание

При сохранении данных Redis программа сохраняет текущее состояние базы данных в памяти на диске. Создание RDB-файлов включает два основных Redis-команды: SAVE и BGSAVE.

Загрузка

Во время загрузки RDB-файла сервер находится в заблокированном состоянии до завершения загрузки.

save — синхронная команда сохранения

Команда save является синхронной операцией. Во время выполнения команды сервер блокируется, отказывая клиентам в отправке командных запросов.

Процесс выполнения: клиент отправляет команду сохранения на сервер, сервер создаёт дочерний процесс, который записывает данные на диск, после чего сервер разблокируется и продолжает обработку клиентских запросов. Если серверу необходимо сохранить большой объём данных, выполнение команды save может занять значительное время, блокируя все клиентские запросы. Поэтому команда save редко используется в производственной среде. Вместо неё обычно применяется команда BGSAVE. Если сохранение данных с помощью BGSAVE завершается неудачно, команда save используется для сохранения последних данных.

bgsave — асинхронная команда сохранения

Команда bgsave является асинхронной операцией. При выполнении команды дочерний процесс сохраняет данные, в то время как основной поток сервера продолжает обрабатывать клиентские запросы.

Процесс выполнения: сервер создаёт дочерний процесс и сохраняет данные на диск. Основной поток продолжает обслуживать клиентские запросы, пока дочерний процесс завершает сохранение данных. Если сохранение успешно завершено, клиент может проверить результат с помощью команды LASTSAVE.

Автоматическое сохранение

С помощью конфигурационного файла можно настроить Redis таким образом, чтобы он автоматически сохранял данные при выполнении определённых условий, таких как изменение определённого количества ключей за заданный промежуток времени:

# RDB автоматическое сохранение правил
# Автоматически сохранять данные при изменении хотя бы одного ключа за 900 секунд
save 900 1
# Автоматически сохранять данные при изменении как минимум 10 ключей за 300 секунд
save 300 10
# Автоматически сохранять данные при изменении более 10 000 ключей за 60 секунд
save 60 10000

# Имя файла сохранения RDB
dbfilename dump-<port>.rdb

# Каталог хранения файлов сохранения
dir /var/lib/redis

# Останавливать ли запись при ошибке bgsave, обычно да
stop-writes-on-bgsave-error yes

# Использовать ли сжатие при сохранении RDB
rdbcompression yes

# Проверять ли контрольную сумму при сохранении RDB, обычно да
rdbchecksum yes

Стандартная конфигурация

Стандартная конфигурация RDB выглядит следующим образом:

################################ SNAPSHOTTING  ################################
#
# Сохранять базу данных на диск:
# Автоматическое сохранение при изменении определённого количества ключей в течение заданного времени
#  save <seconds> <changes>
# 
save 900 1
save 300 10
save 60 10000
#bgsave发生错误时是否停止写入,一般为yes
stop-writes-on-bgsave-error yes
#持久化时是否使用LZF压缩字符串对象?
rdbcompression yes
#是否对rdb文件进行校验和检验,通常为yes
rdbchecksum yes
# RDB持久化文件名
dbfilename dump.rdb
#持久化文件存储目录
dir ./

AOF режим (добавление журнала)

AOF (Append Only File, добавочный журнал) — это способ сохранения всех командных запросов в виде журнала в файле. Redis сначала выполняет команду, затем записывает её в журнал. Поскольку этот метод является только добавлением, нет накладных расходов на поиск диска, что делает его быстрым и похожим на binlog MySQL. AOF больше подходит для горячего резервирования.

Преимущества:

  • Данные более полные и безопасные, потеря данных составляет секунды (в зависимости от стратегии fsync, если используется стратегия everysec, максимальная потеря данных — одна секунда).
  • Файл AOF представляет собой добавляемый журнал, записи в котором сохраняются в формате протокола Redis и являются читаемыми, что удобно для быстрого восстановления после случайного удаления.

Недостатки:

  • Для одинаковых наборов данных файл AOF может быть больше, чем файл RDB, а восстановление данных может занимать больше времени.
  • В зависимости от используемой стратегии fsync скорость AOF может быть медленнее, чем RDB. Однако в большинстве случаев производительность fsync каждую секунду остаётся очень высокой.

Процесс сохранения

Если AOF включён, после выполнения каждой команды сервер добавляет её в буфер aof_buf. Затем перед завершением каждого события сервер вызывает функцию flushAppendOnlyFile, которая решает, следует ли записать содержимое буфера aof_buf в файл AOF и сохранить его на диск. Функция flushAppendOnlyFile управляется параметром appendfsync сервера, который может принимать значения always, everysec или no. Эти значения определяют стратегию сохранения данных:

  • always: каждая команда записывается на диск с использованием fsync. Это гарантирует, что данные не будут потеряны, но может замедлить работу сервера.
  • everysec: содержимое буфера записывается на диск каждую секунду. Это компромисс между скоростью и надёжностью.
  • no: операционная система определяет, когда записывать данные на диск. Это наиболее быстрый вариант, но данные могут быть потеряны при сбоях системы.

Восстановление данных из AOF-журнала выполняется путём анализа и выполнения команд, записанных в журнале. Создаётся новый клиент без сетевого подключения, и команды выполняются последовательно до тех пор, пока весь журнал не будет обработан.

Перезапись файла

Перезапись файла необходима для предотвращения его чрезмерного увеличения. Новый файл создаётся без потери данных, так как каждая пара ключ-значение записывается одной командой вместо множества команд, которые могли использоваться ранее.

Переписывание происходит в фоновом режиме, чтобы не блокировать основной процесс сервера. Во время переписывания сервер продолжает выполнять клиентские команды, добавляя новые команды в новый буфер и сохраняя их в старом буфере. Redis 5.0.3 (00000000/0) 64 bit

.- .-```. ```\/ _.,_ ''-._ ( ' , .-` | `, ) Running in standalone mode |`-._`-...-` __...-.-.|'_.-'| Port: 6380 | -. ._ / _.-' | PID: 93825 -._ -._ -./ _.-' _.-' |-.-._ -.__.-' .-'.-'| | -._-. .-'.-' | http://redis.io -._ -.-.__.-'_.-' _.-' |-.-._ -..-' .-'.-'| | -._-._ .-'.-' | -._ -._-.__.-'_.-' _._-' -._ -..-' .-' -. _.-' `-.__.-'

主从复制 (Replication)

Redis主从配置非常简单,过程如下(演示情况下主从配置在一台电脑上):

第一步: 复制两个redis配置文件(启动两个redis,只需要一份redis程序,两个不同的redis配置文件即可)

mkdir redis-master-slave
cp path/to/redis/conf/redis.conf path/to/redis-master-slave master.conf
cp path/to/redis/conf/redis.conf path/to/redis-master-slave slave.conf

第二步: 修改配置

## master.conf
port 6379

## slave.conf
port 6380
slaveof 127.0.0.1 6379

第三步: 分别启动两个redis

redis-server path/to/redis-master-slave/master.conf
redis-server path/to/redis-master-slave/slave.conf

启动之后,打开两个命令行窗口,分别执行 telnet localhost 6379telnet localhost 6380,然后分别在两个窗口中执行 info 命令,可以看到:

# Replication
role:master

# Replication
role:slave
master_host:127.0.0.1
master_port:6379

主从配置没问题。然后在master 窗口执行 set 之后,到slave窗口执行get,可以 get到,说明主从同步成功。这时,我们如果在slave窗口执行 set ,会报错:

-READONLY You can't write against a read only replica.

因为从节点是只读的。

哨兵 (Sentinel)

Sentinel是用来监控主从节点的健康情况。客户端连接Redis主从的时候,先连接Sentinel,Sentinel会告诉客户端主Redis的地址是多少,然后客户端连接上Redis并进行后续的操作。当主节点挂掉的时候,客户端就得不到连接了因而报错了,客户端重新向Sentinel询问主master的地址,然后客户端得到了[新选举出来的主Redis],然后又可以愉快的操作了。

哨兵 sentinel 配置

为了说明 sentinel 的用处,我们做个试验。配置 3 个 redis (1 主 2 从),1 个哨兵。步骤如下:

mkdir redis-sentinel
cd redis-sentinel
cp redis/path/conf/redis.conf path/to/redis-sentinel/redis01.conf
cp redis/path/conf/redis.conf path/to/redis-sentinel/redis02.conf
cp redis/path/conf/redis.conf path/to/redis-sentinel/redis03.conf
touch sentinel.conf

上我们创建了 3 个 redis 配置文件,1 个哨兵配置文件。我们将 redis01 设置为 master,将redis02,redis03 设置为 slave。

vim redis01.conf
port 63791

vim redis02.conf
port 63792
slaveof 127.0.0.1 63791

vim redis03.conf
port 63793
slaveof 127.0.0.1 63791

vim sentinel.conf
daemonize yes
port 26379
sentinel monitor mymaster 127.0.0.1 63793 1   # 下面解释含义

上面的主从配置都熟悉,只有哨兵配置 sentinel.conf,需要解释一下:

mymaster        # 为主节点名字,可以随便取,后面程序里边连接的时候要用到
127.0.0.1 63793 # 为主节点的 ip,port
1               # 后面的数字 1 表示选举主节点的时候,投票数。1表示有一个 sentinel 同意即可升级为 master

启动哨兵

上面我们配置好了 redis 主从,1 主 2 从,以及 1 个哨兵。下面我们分别启动 redis,并启动哨兵:

redis-server path/to/redis-sentinel/redis01.conf
redis-server path/to/redis-sentinel/redis02.conf
redis-server path/to/redis-sententer/redis03.conf

redis-server path/to/redis-sentinel/sentinel.conf --sentinel

启动之后,可以分别连接到 3 个 redis 上,执行 info 查看主从信息。

模拟主节点宕机情况

运行上面的程序(注意,在实验这个效果的时候,可以将 sleep 时间加长或者 for 循环增多,以防程序提前停止,不便看整体效果),然后将主 redis 关掉,模拟 redis 挂掉的情况。现在主 redis 为 redis01,端口为 63791。

redis-cli -p 63791 shutdown

集群 (Cluster)

上述所做的这些工作只是保证了数据备份以及高可用,目前为止我们的程序一直都是向 1 台 redis 写数据,其他的 redis 只是备份而已。实际场景中,单个 redis 节点可能不满足要求,因为:

  • 单个 redis 并发有限
  • 单个 redis 接收所有数据,最终回导致内存太大,内存太大回导致 rdb 文件过大,从很大的 rdb 文件中同步恢复数据会很慢

所以需要 redis cluster 即 redis 集群。Redis 集群是一个提供在多个 Redis 间节点间共享数据的程序集。Redis 集群并不支持处理多个 keys 的命令,因为这需要在不同的节点间移动数据,从而达不到像 Redis 那样的性能, 在高负载的情况下可能会导致不可预料的错误。Redis 集群通过分区来提供一定程度的可用性,在实际环境中当某个节点宕机或者不可达的情况下继续处理命令.。Redis 集群的优势:

  • 自动分割数据到不同的节点上
  • 整个集群的部分节点失败或者不可达的情况下能够继续处理命令

为了配置一个 redis cluster, 我们需要准备至少 6 台 redis,为啥至少 6 台呢?我们可以在 redis 的官方文档中找到如下一句话:

Note that the minimal cluster that works as expected requires to contain at least three master nodes. Тема: Если в Redis есть 100 миллионов ключей, и из них 1 миллион начинается с определённого префикса, как их найти?

Используя команду keys, можно получить список ключей, соответствующих заданному шаблону. Затем собеседник спрашивает: если Redis обслуживает онлайн-сервисы, какие проблемы могут возникнуть при использовании команды keys? В этом случае необходимо ответить, что Redis является однопоточным. Команда keys может привести к блокировке потока на некоторое время, в результате чего онлайн-сервис будет недоступен до завершения выполнения команды. В этой ситуации можно использовать команду scan, которая позволяет извлекать список ключей без блокировки, но с некоторой вероятностью дублирования. На клиенте можно выполнить дедупликацию, но общее время выполнения будет больше, чем при использовании команды keys.

Kafka

RocketMQ

RocketMQ — это распределённое промежуточное ПО для обмена сообщениями от Alibaba. Оно поддерживает транзакции, упорядоченные сообщения, пакетные сообщения, сообщения с таймером и сообщения с возможностью отката. RocketMQ имеет несколько концепций, отличающихся от стандартных систем обмена сообщениями, таких как группы, темы и очереди. Система состоит из производителей, потребителей, брокеров и серверов имён.

Преимущества:

— Сглаживание пиковых нагрузок: решение проблем потери сообщений и сбоя системы из-за кратковременных всплесков нагрузки. — Развязка приложений: решение проблемы зависимости между системами разной важности и производительности, приводящей к каскадным сбоям. — Повышение производительности: отправка одного сообщения в систему обмена сообщениями для уведомления связанных систем. — Буферизация потоков: возможность тестирования систем с нестабильными каналами связи путём накопления сообщений. — Асинхронная обработка: сокращение времени отклика за счёт исключения синхронных вызовов удалённых процедур.

Архитектура:

Модель развёртывания:

img

Роли:

Брокер:

— Понимание как сам RocketMQ. — Брокеры принимают и отправляют сообщения от производителей и потребителей. — Брокер периодически отправляет информацию на сервер имён. — Является сервером хранения и пересылки сообщений. — Каждый брокер при запуске находит все серверы имён, устанавливает с ними постоянное соединение и регистрирует свою информацию. После этого он периодически отправляет обновления.

Сервер имён:

— Аналогичен Zookeeper, но реализован самостоятельно. Предоставляет функции маршрутизации, регистрации сервисов и обнаружения сервисов. Это узел без состояния. — Сервер имён основан на Netty и предоставляет маршрутизацию, регистрацию сервисов и обнаружение сервисов. Он не сохраняет состояние и не имеет главного и резервного серверов. — Серверы имён периодически отправляют обновления друг другу для обеспечения высокой доступности. — Информация о брокерах и темах хранится в памяти сервера имён и не сохраняется постоянно.

Производитель:

— Создаёт сообщения. — Выбирает случайный сервер имён и устанавливает постоянное соединение для получения информации о темах. — Устанавливает постоянное соединение с главным сервером для отправки сообщений. Периодически отправляет обновления главному серверу.

Потребитель:

— Получает сообщения. — Использует сервер имён для поиска тем. — Подключается к брокерам для получения сообщений. Поскольку сообщения могут быть отправлены как главным, так и подчинённым сервером, потребитель подключается к обоим.

Основные процессы:

— Все брокеры регистрируются на серверах имён. — Производители получают информацию о темах от серверов имён. — Производители устанавливают постоянные соединения с главными серверами для отправки сообщений и периодически отправляют им обновления. — Потребители получают информацию о темах от серверов имён и подключаются к брокерам. — Потребители подключаются как к главным, так и к подчинённым серверам брокеров для получения сообщений.

Принцип работы:

RocketMQ состоит из серверов имён, производителей, потребителей и нескольких брокеров (процессов RocketMQ). Принцип работы следующий:

— При запуске брокеры находят все серверы имён и устанавливают с ними постоянные соединения. Они также отправляют обновления каждые 30 секунд. — Когда производители отправляют сообщения, они получают информацию о теме от сервера имён. — Производители отправляют сообщения на главный сервер, с которым они установили постоянное соединение. Они периодически отправляют ему обновления. — Потребители используют сервер имён для получения информации о теме и подключения к брокерам. Они подключаются как к главному, так и к подчинённому серверу брокера для получения сообщений. Ядро дизайна

Сообщение очистки

После того как сообщение в брокере было использовано, оно не удаляется сразу. Каждое сообщение сохраняется в CommitLog, и каждый потребитель, подключенный к брокеру, сохраняет информацию о ходе потребления. После использования сообщения обновляется только ход потребления текущего потребителя (смещение в CommitLog). По умолчанию файлы CommitLog, которые больше не используются, удаляются через 48 часов:

  • проверяется время последнего доступа к файлу;
  • если оно превышает срок хранения, файл удаляется;
  • по умолчанию удаление происходит в 4 часа утра.
/**
 * {@link org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#isTimeToDelete()}
 */
private boolean isTimeToDelete() {
    // when = "04";
    String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
    // 是04点,就返回true
    if (UtilAll.isItTimeToDo(when)) {
        return true;
    }
 // 不是04点,返回false
    return false;
}

/**
 * {@link org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#deleteExpiredFiles()}
 */
private void deleteExpiredFiles() {
    // isTimeToDelete() этот метод — это проверка, действительно ли сейчас 4 часа, и если да, то выполнение удаления логики.
    if (isTimeToDelete()) {
        // 默认 72, но broker конфигурационный файл по умолчанию изменил на 48, поэтому новые версии все 48.
        long fileReservedTime = 48 * 60 * 60 * 1000;
        deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(72 * 60 * 60 * 1000, xx, xx, xx);
    }
}
                                                                       
/**
 * {@link org.apache.rocketmq.store.CommitLog#deleteExpiredFile()}
 */
public int deleteExpiredFile(xxx) {
    // Этот метод основной логики — это перебрать поиск последнего изменения времени + срок действия, меньше текущего системного времени, удалить его (то есть меньше 48 часов).
    return this.mappedFileQueue.deleteExpiredFileByTime(72 * 60 * 60 * 1000, xx, xx, xx);
}

Push или pull

RocketMQ не имеет истинного смысла push, всё это pull, хотя есть класс push, но фактическая реализация нижнего уровня использует длинный опрос (длинный опрос), то есть способ извлечения. Атрибут Broker longPollingEnable указывает, включён ли длинный опрос, по умолчанию он включён. Исходный код выглядит следующим образом:

// {@link org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage()}

// Извлекает сообщение, помещает результат в pullCallback.
this.pullAPIWrapper.pullKernelImpl(pullCallback);

Почему нужно активно извлекать сообщения вместо использования механизма событий?

Механизм событий заключается в установлении длинного соединения, и данные передаются способом события (отправка данных) для реального времени. Если брокер активно отправляет сообщения, возможно, скорость отправки сообщений будет быстрой, а скорость потребления медленной, что может привести к накоплению сообщений на стороне потребителя и невозможности их использования другими потребителями. Метод извлечения позволяет потребителю извлекать в соответствии с текущей ситуацией и не создаёт слишком большого давления, приводящего к узким местам. Поэтому используется метод извлечения.

Балансировка нагрузки

RocketMQ реализует балансировку нагрузки путём распределения тем по нескольким брокерам.

На стороне производителя:

  • отправитель указывает очередь сообщений для отправки сообщений соответствующему брокеру для обеспечения баланса нагрузки при записи;
  • повышение пропускной способности при записи, когда несколько производителей одновременно записывают данные в один брокер, производительность снижается;
  • сообщения распределяются по нескольким брокерам для подготовки к потреблению.

По умолчанию стратегия — случайный выбор:

  • производитель поддерживает индекс;
  • каждый раз, когда берётся узел, индекс увеличивается сам по себе;
  • индекс делится на количество узлов;
  • встроенная отказоустойчивая стратегия.

Другие реализации:

  • SelectMessageQueueByHash — хэш передаваемых аргументов;
  • SelectMessageQueueByRandom — случайный выбор;
  • SelectMessageQueueByMachineRoom — не реализовано.

Также можно настроить реализацию интерфейса MessageQueueSelector в методе выбора.

MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);

На стороне потребителя:

используется алгоритм равномерного распределения для балансировки нагрузки.

Другие алгоритмы балансировки нагрузки:

  • равномерное распределение (по умолчанию) (AllocateMessageQueueAveragely);
  • циклическое распределение (AllocateMessageQueueAveragelyByCircle);
  • настраиваемая стратегия распределения (AllocateMessageQueueByConfig);
  • стратегия распределения по машинному залу (AllocateMessageQueueByMachineRoom);
  • согласованное хеширование (AllocateMessageQueueConsistentHash);
  • близость машинного зала (AllocateMachineRoomNearby).

Что произойдёт, если потребитель и очередь не сбалансированы?

Потребитель и очередь будут приоритетно равномерно распределены. Если потребителей меньше, чем очередей, некоторые потребители будут использовать несколько очередей. Если количество потребителей равно количеству очередей, каждый потребитель будет использовать одну очередь. Если число потребителей больше числа очередей, некоторые потребители останутся без дела.

Лучшие практики

Производитель:

  • Topic — сообщения классифицируются по различным бизнес-сообщениям.

  • Tag — используется для дальнейшей классификации сообщений в определённой теме, сообщения отправляются с этим атрибутом.

  • key — каждый сообщение имеет уникальный идентификатор на уровне бизнеса, который должен быть установлен в поле keys для облегчения поиска содержимого сообщения и того, кто его потребляет. Поскольку это хеш-индекс, убедитесь, что ключ максимально уникален, чтобы избежать потенциального конфликта хешей.

  • журнал — необходимо регистрировать успешную отправку или неудачу сообщения, обязательно регистрируйте send result и поле ключа.

  • send — метод отправки сообщения. Пока не возникает исключение, отправка считается успешной. Однако успешная отправка имеет несколько состояний, определённых в sendResult.

    • SEND_OK — отправка сообщения прошла успешно.
    • FLUSH_DISK_TIMEOUT — отправка сообщения прошла успешно, но серверу не удалось записать на диск вовремя, сообщение уже находится в очереди сервера, только если сервер выйдет из строя, сообщение будет потеряно.
    • FLUSH_SLAVE_TIMEOUT — сообщение отправлено успешно, но синхронизация с Slave не удалась вовремя, сообщение уже в очереди сервера, только если сервер выходит из строя, сообщение теряется.
    • SLAVE_NOT_AVAILABLE — отправка сообщения прошла успешно, но Slave недоступен в данный момент, сообщение уже находится в очереди сервера, только если сервер выходит из строя, сообщение будет потеряно.
  • Подписка на отношения должна быть последовательной.

Несколько Group ID подписаны на несколько тем, и подписка каждого экземпляра потребителя в нескольких Group ID должна быть одинаковой.

![RocketMQ сообщение правильно подписано](images/Middleware/RocketMQ сообщение правильно подписано.png)

Потребитель:

  • Потребительский детерминизм — чтобы предотвратить повторные потребительские сообщения, вызывающие аномалии в бизнесе, потребитель RocketMQ версии в очереди сообщений после получения сообщения должен выполнять обработку на основе уникального ключа сообщения. Повторение сообщений может произойти в следующих сценариях:

    • Повторная отправка сообщения.
    • Повторное представление сообщения.
    • Балансировка нагрузки приводит к повторению сообщения (включая, помимо прочего, колебания сети, перезапуск брокера и перезапуск приложения-потребителя).
  • Журнал — регистрирует потребление, чтобы облегчить последующее позиционирование проблем.

  • Пакетное потребление — пакетное потребление может значительно повысить пропускную способность потребления.

Транзакционные сообщения:

Принцип согласованности MQ и DB (двухсторонняя транзакция):

![MQ и принцип согласованности DB](images/Middleware/MQ и принцип согласованности DB.png) Транзакции сообщений — это возможность MQ, которая похожа на XA и обеспечивает распределённую транзакционную согласованность.

Получив сообщение от производителя, MQ возвращает подтверждение (ack). Производитель начинает локальную транзакцию. Если она завершается успешно, производитель отправляет в MQ фиксацию (commit), а если неудачно — откат (rollback).

Если MQ долго не получает от производителя подтверждения фиксации или отката, то инициирует запрос на проверку состояния транзакции. Производитель проверяет состояние транзакции и повторно отправляет подтверждение.

Если MQ получает подтверждение фиксации, он доставляет сообщение потребителю. В противном случае сообщение сохраняется и удаляется через 3 дня.

Порядок сообщений

Сообщения RocketMQ хранятся в очередях топиков. Очередь — это FIFO (First In First Out), поэтому порядок сообщений в отдельной очереди гарантирован.

Существует два типа упорядоченных сообщений:

  • Разделенный порядок: все сообщения в одном разделе упорядочены по принципу FIFO.
  • Глобальный порядок: все сообщения в топике упорядочены по принципу FIFO.

Для двух заказов с сообщениями a1, b1, b2, a2, a3, b3 (в порядке абсолютного времени):

  • При отправке сообщения заказа a должны быть упорядочены как a1, a2, a3. То же самое верно для заказа b. Однако между сообщениями заказов a и b нет порядка. Это означает, что сообщения заказов a и b могут отправляться в разных потоках.
  • При хранении необходимо сохранять порядок сообщений каждого заказа, но порядок между сообщениями разных заказов может не соблюдаться.

При отправке и хранении сообщений соблюдается их порядок. При потреблении сообщений также соблюдается порядок их хранения.

MQPullConsumer управляется пользовательским потоком и активно извлекает сообщения от сервера. Каждый раз, когда список msgFoundList в PullResult заполняется, пользователь должен сам обеспечить порядок потребления.

В MQPushConsumer пользователь регистрирует MessageListener для потребления сообщений. Клиент должен гарантировать порядок вызова MessageListener.

Потеря сообщений

Сообщение проходит три этапа: производство, хранение и потребление. На любом из этих этапов сообщение может быть потеряно. Чтобы решить проблему потери сообщений, нужно определить причины потери на каждом этапе и принять соответствующие меры.

На этапе производства Producer отправляет сообщение брокеру MQ через сеть. Отправка может завершиться неудачей из-за сетевых проблем или недоступности брокера.

Неудачи автоматически повторяются. Даже после нескольких попыток, если сообщение не удаётся отправить, клиент может предпринять собственные действия для компенсации, не влияя на основную бизнес-логику. Кроме того, даже если брокер выходит из строя, другие брокеры продолжают предоставлять услуги, обеспечивая высокую доступность системы.

Чтобы избежать потери сообщений на этапе производства, рекомендуется использовать синхронные отправки, автоматические повторные попытки и несколько главных узлов.

Синхронная отправка подразумевает блокировку при отправке сообщения до получения ответа от брокера. Если ответ не получен, считается, что отправка не удалась. Это наиболее надёжный способ отправки.

Асинхронная отправка позволяет узнать об успешной отправке через обратный вызов. Однонаправленная отправка (OneWay) наименее надёжна, так как невозможно гарантировать доставку сообщения.

По умолчанию повторные попытки выполняются три раза. Количество попыток можно изменить с помощью API.

Автоматические повторные попытки означают, что при неудачной отправке или истечении времени ожидания сообщение будет автоматически повторено. По умолчанию выполняется три попытки синхронной отправки и одна попытка асинхронной отправки. Если брокер выходит из строя, но в производственной среде обычно используется несколько брокеров, то другие главные узлы продолжат предоставлять услуги, и это не повлияет на отправку сообщений, сообщения по-прежнему будут доставляться. Например, если сообщение отправляется брокеру, когда он выходит из строя, производитель получает ответ об ошибке отправки от брокера, в этом случае производитель автоматически повторяет попытку. В это время отказавший брокер отключается, поэтому производитель переключится на другого брокера для отправки сообщения.

Предположим, что мы хотим строго гарантировать, что сообщения не будут потеряны на этапе хранения сообщений брокера. Для этого потребуется следующая конфигурация, хотя производительность будет значительно ниже по сравнению со стандартной конфигурацией:

# Конфигурация главного узла
flushDiskType = SYNC_FLUSH
brokerRole=SYNC_MASTER

# Конфигурация подчиненного узла
brokerRole=slave
flushDiskType = SYNC_FLUSH

Настройка стратегии синхронной записи на диск для брокера. По умолчанию сообщения сохраняются в памяти после того, как они достигают брокера. Затем брокер немедленно возвращает подтверждение производителю. После этого брокер периодически записывает группу сообщений из памяти на диск асинхронно. Этот метод уменьшает количество операций ввода-вывода и обеспечивает лучшую производительность, но если происходит сбой машины или аварийный выход из системы, сообщения могут быть потеряны, если они не были записаны на диск вовремя.

Если мы хотим гарантировать, что брокер не потеряет сообщения и обеспечить надёжность сообщений, нам необходимо изменить механизм сохранения сообщений на синхронный режим записи на диск, то есть сообщение будет сохранено на диске успешно, только тогда будет возвращён ответ. Измените конфигурацию брокера следующим образом:

# По умолчанию используется ASYNC_FLUSH 
flushDiskType = SYNC_FLUSH 

Если брокер не завершает запись на диск в течение заданного времени (по умолчанию 5 секунд), он вернёт статус SendStatus.FLUSH_DISK_TIMEOUT производителю.

Даже если для брокера настроена стратегия синхронной записи на диск, но после завершения записи диск выходит из строя, это может привести к потере всех сообщений на диске. Однако, даже если у нас есть один главный и один подчинённый сервер, и главный сервер завершил запись на диск до того, как диск вышел из строя, все сообщения на диске всё равно могут быть утеряны. Поэтому мы также можем настроить систему так, чтобы она уведомляла производителя о завершении записи на диск не только главным сервером, но и подчинённым сервером.

# По умолчанию — ASYN_MASTER
brokerRole=SYNC_MASTER

Потеря сообщений также может произойти на этапе потребления.

Только когда режим потребления установлен на MessageModel.CLUSTERING (режим кластера), брокер автоматически повторит попытку. Для широковещательных сообщений повторная попытка не выполняется. Если сообщение не удаётся обработать, RocketMQ будет отправлять такие сообщения в очередь недоставленных сообщений после достижения максимального количества попыток. Затем мы должны обратить внимание на очередь недоставленных сообщений и вручную компенсировать эти сообщения.

Потребитель сначала извлекает сообщения локально, затем выполняет бизнес-логику. После выполнения бизнес-логики потребитель должен подтвердить получение сообщения вручную. Только тогда потребление считается завершённым. Это не означает, что потребление завершено после извлечения сообщения локально. Вот пример:

 consumer.registerMessageListener(new MessageListenerConcurrently() {
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
         try{
             for (MessageExt msg : msgs) {
                String str = new String(msg.getBody());
                System.out.println(str);
             }
             
             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
         } catch(Throwable t){
             log.error("消费异常:{}", msgs, t);
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;
         }
     }
 });

В следующих трёх случаях брокер обычно повторяет попытку (по умолчанию максимальное количество попыток составляет 16 раз). RocketMQ использует стратегию «уменьшения времени» для повторной отправки сообщений. Чем больше попыток, тем меньше вероятность успешного потребления сообщения. Мы можем настроить количество повторных попыток и интервал времени (интервал между первой отправкой) в файле конфигурации брокера broker.conf. Конфигурация выглядит следующим образом:

    messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

Клиент-потребитель сначала определяет, явно ли установлено максимальное количество повторных попыток MaxReconsumeTimes. Если нет, устанавливается значение по умолчанию, равное 16. В противном случае используется установленное максимальное количество повторных попыток.

private int getMaxReconsumeTimes() {
    // default reconsume times: 16
    if (this.defaultMQPushConsumer.getMaxReconsueTimes() == -1) {
        return 16;
    } else {
        return this.defaultMQPushConsumer.getMaxReconsueTimes();
    }
}

Если потребление занимает слишком много времени, MQ будет бесконечно отправлять сообщения потребителю. Эта ситуация возникает, когда потребитель не возвращает ConsumeConcurrentlyStatus. CONSUME_SUCCESS или ConsumeConcurrentlyStatus.RECONSUME_LATER.

Сообщения, которые не удалось обработать, будут отправлены в очередь недоставленных сообщений. Обработка логики очереди недоставленных сообщений:

  • Сначала определите, превышает ли текущее количество повторных попыток 16 или задержка сообщения меньше 0.
  • Если выполнено любое из этих условий, создайте новую тему (очередь недоставленных сообщений) с именем: %DLQ%+consumerGroup.
  • Добавьте атрибуты предварительной обработки.
  • Отправьте сообщения в соответствующую очередь недоставленных сообщений новой темы и сохраните их на диске, чтобы сделать сообщения постоянными.

Наконец, запустите отдельный потребитель очереди недоставших сообщений для потребления и выполните ручную компенсацию неудачных сообщений.

Существует три режима потребления сообщений во всех системах обмена сообщениями: at-most-once (максимум один раз), at-least-once (минимум один раз) и exactly-only-once (точно один раз). Распределённые системы обмена сообщениями находятся между этими тремя режимами, первые два из которых являются осуществимыми и широко используются.

at-most-once: после отправки сообщения независимо от того, было ли оно успешно обработано, сообщение больше не будет отправляться повторно, что может привести к тому, что сообщение не будет обработано. RocketMQ не использует этот метод. at-lease-once: после успешной обработки сообщения клиент возвращает ACK серверу. Если обработка не удалась, сервер не получит ACK от клиента, и сообщение будет отправлено повторно. Это может привести к повторному потреблению сообщения, и RocketMQ гарантирует, что каждое сообщение будет обработано хотя бы один раз, используя ACK. exactly-only-once: в распределённой системе, если мы хотим реализовать этот режим, неизбежны огромные накладные расходы. RocketMQ не гарантирует эту функцию, и невозможно избежать дублирования сообщений. Бизнес-система должна обрабатывать уникальность сообщений. Чтобы считаться «Exactly Only Once», должны выполняться следующие условия:

  • При отправке сообщения не допускается отправка повторяющихся сообщений.
  • Не допускается повторное потребление одного и того же сообщения при обработке сообщения.

Zookeeper — это централизованная служба для хранения информации о конфигурации, наименованиях, состоянии и других данных, используемых распределёнными приложениями. Zookeeper предоставляет следующие основные функции:

  • Нумерация версий: Zookeeper поддерживает управление версиями для каждого узла данных. Клиент может запросить конкретную версию узла данных или дождаться изменения версии узла данных.
  • Механизм уведомления: клиенты могут регистрировать наблюдателей для интересующих их узлов данных. Когда узел данных изменяется, наблюдатель уведомляется.
  • Синхронизация: Zookeeper обеспечивает синхронизацию данных между клиентами. Клиенты могут использовать Zookeeper для координации своих действий.

Узлы ролей

Лидер (Leader)

  • Отвечает за обработку клиентских запросов на чтение и запись транзакций. Здесь запрос транзакции имеет характеристики ACID транзакции.
  • Синхронно отправляет запросы на запись другим узлам и требует упорядочивания транзакций.
  • Состояние — LEADING.

Последователь (Follower)

  • Обрабатывает запросы клиентов на чтение.
  • Пересылает запросы на запись лидеру.
  • Участвует в выборах лидера.
  • Состояние — FOLLOWING.

Наблюдатель (Observer)

То же, что и последователь, за исключением того, что он не участвует в выборах лидера и его состояние — OBSERING. Может использоваться для линейного расширения QPS чтения. Запуск этапа Leader-выборов

  1. Узел A сначала голосует за себя, информация о голосовании содержит идентификатор узла (SID) и уникальный увеличивающийся номер ZXID, например (1, 0). SID настроен и уникален, ZXID — это уникальный возрастающий номер.

  2. Узел B сначала голосует за себя, информация о голосовании — (2, 0).

  3. Затем узлы A и B голосуют за себя для всего кластера.

  4. После того как узел A получает информацию о голосовании от узла B, он проверяет, находится ли узел B в состоянии текущего голосования и является ли он узлом, который ищет лидера (LOOKING).

  5. Голосование PK: узел A будет сравнивать своё голосование с другими и обновлять его, если другой узел отправил больший ZXID. Если ZXID равны, то сравниваются SID. Здесь ZXID узлов A и B одинаковы, а SID узла B больше, поэтому узел A обновляет информацию о своём голосовании до (2, 0), а затем отправляет её снова. Узлу B не нужно обновлять информацию о голосовании, но ему всё равно нужно отправить её ещё раз на следующей итерации. На данный момент информация о голосовании узла A равна (2, 0):

SID ZXID
Узел А 1 0
Узел В 2 0
  1. Подсчёт голосов: на каждой итерации подсчитывается количество голосов, полученных каждым узлом. Определяется, есть ли более половины узлов, получивших одинаковые голоса. Голоса узлов A и B равны (2, 0) и составляют более половины, поэтому узел B выбирается в качестве лидера.
  2. Обновление состояния узла: узел А становится последователем (Follower), его состояние обновляется до FOLLOWING, узел В становится лидером (Leader), его состояние обновляется до LEADING.

Обработка отказа лидера во время работы Zookeeper

Во время работы Zookeeper лидер всегда остаётся в состоянии LEADING, пока не произойдёт отказ лидера. В этом случае необходимо выбрать нового лидера, и процесс выбора аналогичен процессу запуска. Следует обратить внимание на следующие моменты:

  • Оставшиеся последователи участвуют в выборах, наблюдатели не участвуют.
  • Информация о голосовании в ZXID берётся из локального файла журнала на диске. Если этот узел имеет больший ZXID, он будет выбран в качестве лидера. Если все последователи имеют одинаковый ZXID, тот, у кого больший идентификатор узла, будет выбран в качестве лидера.

Процесс синхронизации данных между узлами

Различные клиенты могут подключаться к главному или резервному узлу. Когда клиент отправляет запрос на чтение или запись, он не знает, подключён ли он к лидеру или последователю. Если клиент подключён к главному узлу и отправляет запрос на запись, лидер выполняет двухфазную фиксацию (2PC) для синхронизации с другими последователями и наблюдателями. Однако если клиент подключён к последователю и отправляет запрос на запись, последователь перенаправляет запрос лидеру, который затем выполняет 2PC для синхронизации данных с последователем. Двухфазная фиксация (2PC):

  • Фаза 1: лидер сначала отправляет предложение (proposal) последователям, последователи отправляют подтверждение (ack) лидеру. Если получено подтверждение более чем от половины последователей, можно перейти к следующему этапу.
  • Фаза 2: лидер загружает данные из файла журнала диска в память, лидер отправляет сообщение фиксации (commit) последователям, последователи загружают данные в память.

Давайте рассмотрим процесс синхронизации данных лидера:

  1. Клиент отправляет запрос на операцию записи.
  2. Лидер получает запрос на запись и преобразует его в запрос предложения «proposal01:zxid1», который сохраняется в файле журнала диска.
  3. Отправляет предложение другим последователям.
  4. Последователь получает предложение и сохраняет его в файл журнала диска.

Теперь давайте посмотрим, как последователь обрабатывает полученное предложение лидера после его получения:

  1. Последователь возвращает подтверждение лидеру.
  2. Лидер получил подтверждение от более чем половины последователей и переходит к следующему этапу.
  3. Лидер загружает предложение из файла журнала диска в структуру данных znode в памяти.
  4. Лидер отправляет сообщение о фиксации всем последователям и наблюдателям.
  5. Последователь получает сообщение о фиксации и загружает данные в структуру данных znode в памяти.

В настоящее время данные лидера и последователя находятся в структуре данных в памяти и синхронизированы, и клиент может получить согласованные данные от лидера и последователя.

Принцип последовательной согласованности ZAB

Лидер фактически создаёт очередь для каждого последователя при отправке предложения. Предложение помещается в соответствующую очередь. Как показано на рисунке ниже, это процесс широковещательной рассылки сообщений в Zookeeper:

Предложение
proposal01:zxid1
proposal02:zxid2
proposal03:zxid3

Клиент отправил три запроса на запись, соответствующие предложению:

Лидер получает запросы и помещает их в очередь один за другим. Последователи получают запросы по очереди, обеспечивая тем самым упорядоченность данных.

Является ли Zookeeper строго согласованным?

Официально определено как упорядоченная согласованность.

Не обеспечивает строгой согласованности, почему?

Потому что лидер отправляет сообщения о фиксации всем последователям и наблюдателям после отправки, они не фиксируют данные одновременно. Например, из-за сетевых проблем разные узлы получают сообщения о фиксации позже, поэтому фиксация происходит позже, и данные могут быть несогласованными на короткое время. Однако после короткого периода времени все узлы фиксируют данные, и данные остаются согласованными. Кроме того, Zookeeper поддерживает строгую согласованность, вручную вызывая метод sync для обеспечения фиксации всех узлов.

Здесь возникает вопрос: если какой-либо узел не может зафиксировать данные, будет ли лидер пытаться повторить попытку? Как обеспечить согласованность данных?

Проблема потери данных при отказе лидера

Первый случай: предположим, что лидер уже записал данные на диск, но ещё не отправил предложение последователям, и в этот момент происходит сбой лидера. Тогда необходимо выбрать нового лидера. Новый лидер отправит предложение, и zxid будет иметь следующее правило увеличения:

  • Старший 32-битный увеличивается на 1, старший 32-битный представляет версию лидера.
  • Младший 32-битный увеличивается на 1, младший 32-битный продолжает увеличиваться.

Когда старый лидер восстанавливается, он становится последователем, и когда новый лидер отправляет последнее предложение, обнаруживается, что zxid предложения на старом лидере меньше, чем у нового лидера, поэтому старое предложение отбрасывается.

Второй случай: если лидер успешно отправил сообщение о фиксации последователям, но некоторые или все последователи ещё не зафиксировали данные, то есть не загрузили данные с диска в память, в это время происходит сбой лидера.

Затем необходимо выбрать последователя с наибольшим zxid из журнала дисков. Если zxid одинаковы, выберите узел с большим идентификатором узла в качестве лидера. Client (клиент): инициатор запроса.

Каждый сервер в процессе работы может находиться в одном из трёх состояний:

  • LOOKING: текущий сервер не знает, кто является лидером, и ищет его;
  • LEADING: текущий сервер выбран в качестве лидера;
  • FOLLOWING: лидер выбран, текущий сервер синхронизируется с ним.

В кластере Zookeeper есть три роли:

  • Лидер (Leader): отвечает за инициирование голосования и принятие решений, обновляет состояние системы. Сервер-лидер — это ядро механизма работы всего кластера ZooKeeper. Его основные функции:
    • единственная точка диспетчеризации и обработки транзакционных запросов, что обеспечивает последовательность обработки транзакций в кластере;
    • диспетчер для внутренних сервисов кластера.
  • Последователь (Follower): обрабатывает запросы от клиентов и возвращает им результаты. Во время выбора лидера участвует в голосовании. Последовательный сервер — это сервер состояния кластера ZooKeeper, который выполняет следующие функции:
    • обработка запросов от клиентов, которые не являются транзакционными, пересылка транзакционных запросов лидеру;
    • участие в голосовании по предложениям транзакционных запросов;
    • участие в выборе лидера.
  • Наблюдатель (Observer): может принимать соединения от клиентов, перенаправлять транзакционные запросы лидеру, но не участвует в голосовании. Наблюдатель был добавлен в версии 3.3.0 и играет роль наблюдателя за состоянием кластера ZooKeeper:
    • обрабатывает запросы клиентов, не являющиеся транзакционными, и пересылает транзакционные запросы лидеру;
    • не участвует ни в каком голосовании.

Модель данных

Модель данных Zookeeper имеет следующие характеристики:

  • иерархическая структура каталогов, имена соответствуют стандартным соглашениям файловой системы, подобно Linux;
  • каждый узел в zookeeper называется znode, и у него есть уникальный путь;
  • узел Znode может содержать данные и дочерние узлы, но узел типа EPHEMERAL не может иметь дочерних узлов;
  • данные в узле могут иметь несколько версий, например, если в определённом пути есть несколько данных, то при запросе данных этого пути необходимо указать версию;
  • клиентское приложение может устанавливать монитор на узле;
  • узлы не поддерживают частичное чтение и запись, а только полное чтение и запись.

У сервера есть четыре состояния: LOOKING, FOLLOWING, LEADING и OBSERVING.

  • LOOKING: поиск лидера. Когда сервер находится в этом состоянии, он считает, что в текущем кластере нет лидера, поэтому ему нужно войти в режим выбора лидера.
  • FOLLOWING: последователь. Указывает, что текущая роль сервера — последователь.
  • LEADING: лидер. Указывает, что текущая роль сервера — лидер.
  • OBSERVING: наблюдатель. Указывает, что текущая роль сервера — наблюдатель.

Существует три режима работы Zookeeper: одиночный, псевдокластерный и кластерный.

  • Одиночный режим: этот режим обычно используется в среде разработки и тестирования, поскольку у нас нет большого количества ресурсов оборудования, и нам не нужна очень хорошая стабильность в повседневной разработке и отладке.
  • Кластерный режим: кластер ZooKeeper обычно состоит из нескольких машин, обычно требуется более трёх машин для формирования кластера, который можно использовать. Каждая машина в кластере будет поддерживать текущее состояние сервера в памяти и поддерживать связь друг с другом.
  • Псевдокластерный режим: это особый режим кластера, когда все серверы кластера размещаются на одной машине. Если у вас есть мощная машина и вы хотите развернуть её как одиночную, ресурсы будут потрачены впустую. В этом случае ZooKeeper позволяет вам запускать несколько экземпляров службы ZooKeeper на одной машине, чтобы имитировать кластерные функции.

Выбор лидера

Во время инициализации кластера Zookeeper серверы (myid = 1–5) запускаются один за другим, начиная процесс выбора лидера:

  • Сервер 1 (myid=1) запускается, в данный момент существует только одна машина, и выбор лидера невозможен.
  • Запускается сервер 2 (myid=2), теперь две машины могут взаимодействовать друг с другом и начинают выбирать лидера.
    • Каждый сервер отправляет свой голос. Голоса серверов 1 и 2 представляют собой пары (myid, ZXID), где ZXID — идентификатор транзакции. На начальном этапе сервер 1 и сервер 2 проголосуют за себя, то есть сервер 1 проголосует (1,0), а сервер 2 — (2,0). Затем они отправят свои голоса всем машинам в кластере.
    • Каждая машина получает голоса от других машин и проверяет их действительность. Проверяется, является ли отправитель LOOKING сервером, и сравнивается ZXID.
    • После получения голосов от других серверов сервер обновит свой голос, сравнив ZXID и myid. Например, сервер 1 получил голоса (2,0) и (1,0). Поскольку ZXID одинаковы, сравниваются myid, и поскольку myid сервера 2 больше, чем myid сервера 1, сервер 1 обновит свой голос до (2,0), затем повторно отправит голос. Для сервера 2 нет необходимости обновлять свой голос, просто отправьте предыдущий голос снова.
    • После каждого раунда голосования сервер подсчитывает количество полученных голосов и определяет, превышает ли оно половину общего числа машин. Если да, выбирается лидер. Сервер 2 получил два голоса, что меньше трёх (n/2+1, где n — общее количество машин), поэтому он остаётся в состоянии LOOKING.
  • Запускается сервер 3 (myid=3), продолжая процесс выбора лидера. Голосование аналогично предыдущему процессу. Поскольку myid сервера 3 является наибольшим, все остальные машины проголосуют за него. Теперь сервер 3 получил три голоса, превышающие половину общего количества машин, поэтому он выбран лидером. Серверы 1 и 2 переходят в состояние FOLLOWING, а сервер 3 становится LEADING.
  • Запускается сервер 4, инициируя ещё один раунд выборов. В настоящее время серверы 1, 2 и 3 не находятся в состоянии LOOKING, поэтому они не будут обновлять свои голоса. Результаты голосования: сервер 3 получает три голоса, а сервер 4 — один голос. Сервер 4 переходит в состояние FOLLOWING.
  • Запускается сервер 5, инициируя ещё один раунд выборов. Аналогично предыдущему раунду, серверы не будут обновлять свои голоса. Результаты голосования: сервер 3 получает три голоса, сервер 5 — один голос. Сервер 5 переходит в состояние FOLLOWING. Лидер выбран — сервер 3.

Выборы лидера во время работы

В работающем кластере Zookeeper пять серверов (myid = 1–5), и внезапно лидер (сервер 3) выходит из строя. Начинается новый раунд выборов лидера:

  • Состояние всех серверов, кроме Observer, изменяется на LOOKING, и начинается процесс выборов лидера.
  • Каждый сервер голосует за себя. Поскольку это происходит во время работы, ZXID каждого сервера может быть разным. Предположим, что ZXID серверов 1, 2, 4 и 5 равны 333, 666, 999 и 888 соответственно. Они проголосуют (1,333), (2,666), (4,999) и (5,888), а затем отправят эти голоса остальным машинам.
  • Каждая машина получит голоса от других машин и проверит их действительность, как описано выше.
  • После получения голосов каждая машина подсчитает количество голосов и определит, превышает ли оно половину общего количества машин. ZXID сервера 4 равен 999, что является самым большим, поэтому он выбирается лидером. Эпох равны, выбирается zxid максимальный.

— Если эпох и zxid равны, то выбирается сервер с наибольшим значением myid (конфигурация zoo.cfg).

Фаза восстановления: на этом этапе Follower отправляет свой lastZxid лидеру, который на основе lastZxid определяет, как синхронизировать данные. Реализация этого этапа отличается от фазы 2: если Follower получает команду TRUNC, он прекращает обработку предложений после L.lastCommittedZxid; если он получает команду DIFF, то принимает новые предложения.

Широковещательная фаза: в настоящее время отсутствует.

Процесс выбора ZK

При первоначальном запуске кластера выбирается машина с наименьшим zxid в качестве лидера.

Когда лидер выходит из строя или теряет большинство последователей, ZK переходит в режим восстановления, чтобы выбрать нового лидера и восстановить все серверы до правильного состояния. Алгоритм выбора лидера в ZK использует протокол ZAB:

  1. Выборы инициируются текущим сервером, выполняющим роль инициатора выборов. Его основная функция — подсчёт результатов голосования и предложение выбранного сервера.
  2. Инициатор выборов отправляет запрос всем серверам, включая себя.
  3. После получения ответов инициатор проверяет, является ли запрос собственным (проверяя zxid на равенство), затем получает идентификатор другого сервера (myid) и сохраняет его в текущем списке объектов запроса, а также получает информацию о предложении лидера от другого сервера (идентификатор и zxid) и записывает её в таблицу записей текущего раунда выборов.
  4. После сбора ответов от всех серверов инициатор вычисляет сервер с максимальным zxid и устанавливает его как рекомендуемый сервер для следующего раунда голосования.
  5. Если выбранный сервер получает n/2 + 1 голосов от других серверов, инициатор устанавливает его в качестве избранного лидера и настраивает своё состояние в соответствии с информацией о выбранном сервере. В противном случае процесс повторяется до тех пор, пока не будет выбран лидер.

Анализ: чтобы лидер получил поддержку большинства серверов, общее количество серверов должно быть нечётным числом 2n + 1, и число живых серверов не должно быть меньше n + 1. Поскольку требуется более половины живых серверов для работы, надёжность кластера, предоставляемого 2n машинами, фактически такая же, как и у кластера, предоставленного 2n - 1 машинами.

Установка Zookeeper

Одиночный режим

Шаг 1: установка и развёртывание

# Скачать и распаковать
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz
tar -zxvf zookeeper-3.4.9.tar.gz

# Установить глобальные переменные
vim ~/.bash_profile
# Добавить последнюю строку
export ZOOKEEPER_HOME=/home/zookeeper/zookeeper-3.4.9
export PATH=$ZOOKEEPER_HOME/bin:$PATH
# Сделать изменения активными
source ~/.bash_profile

# Скопировать конфигурационный файл
cp /home/zookeeper/zookeeper-3.4.9/conf/zoo_sample.cfg /home/zookeeper/zookeeper-3.4.9/conf/zoo.cfg

Шаг 2: настройка информации

# Интервал сердцебиения
tickTime=2000
# Каталог сохранения данных
dataDir=/home/zookeeper/zookeeper-3.4.9/dataDir
# Каталог сохранения журналов
dataLogDir=/home/zookeeper/zookeeper-3.4.9/dataLogDir
# Порт подключения клиентов к Zookeeper
clientPort=2181
# Максимальное время соединения для последователей при инициализации соединения (если превышено, соединение не устанавливается)
initLimit=5
# Представляет максимальное время передачи сообщения между лидером и последователем при отправке и получении сообщений
syncLimit=2

Кластерный режим

Шаг 1: установка и развёртывание

Установка аналогична одиночному режиму.

Шаг 2: настройка информации

Создайте файл myid в корневом каталоге dataDir и запишите значение (например, 1, 2, 3 и т. д.) в файл.

В режиме одиночной конфигурации добавьте следующие параметры:

# 1 представляет текущий идентификатор кластера
echo "1" > myid

Для серверов в кластере настройте следующие параметры:

# Формат: server.X=A:B:C
# X представляет myid (диапазон 1–255)
# A — это IP-адрес сервера
# B — порт, используемый этим сервером для обмена сообщениями с лидером кластера
# C — порт, используемый для выборов лидера
server.1=10.24.1.62:2888:3888
server.2=10.24.1.63:2888:3888
server.3=10.24.1.64:2888:3888

Команды управления

# Запустить ZK-сервер
./zkServer.sh start
# Подключиться к указанному серверу с помощью ZK Client
./zkCli.sh -server 127.0.0.1:2181
# Проверить статус ZK-сервера
./zkServer.sh status
# Остановить ZK-сервис
./zkServer.sh stop
# Перезапустить ZK-сервис
./zkServer.sh restart

Параметры конфигурации zoo.cfg

# Порт, к которому подключаются клиенты сервера
clientPort=2181
# Каталог, в котором хранятся снимки данных
dataDir=/User/lry/zookeeper/data
# Минимальный временной интервал в ZK
tickTime=5000
# Каталог журналов транзакций
dataLogDir=/User/lry/zookeeper/datalog
# Максимальный размер очереди запросов для каждого сервера
globalOutstandingLimit=1000
# Размер каждого файла журнала транзакций по умолчанию
preAllocSize=64
# Количество файлов журнала транзакций, которые будут созданы перед созданием снимка
snapCount=100000
# Ограничение количества подключений для одного клиента на одном IP-адресе
maxClientCnxns=60
# Для многосетевых машин можно указать разные порты прослушивания для каждого IP
clientPortAddress=10.24.22.56
# Время ожидания сеанса, если клиент устанавливает время ожидания вне этого диапазона, оно будет принудительно установлено на максимальное или минимальное время
minSessionTimeoutmaxSessionTimeout
# Последователь при запуске синхронизирует все последние данные с лидером, а затем определяет своё начальное состояние готовности к обслуживанию внешних запросов. Лидер позволяет F завершить эту работу в течение времени initLimit.
initLimit
# Во время работы лидер отвечает за связь со всеми машинами в кластере, например, через механизм пульса, для определения состояния активности машины. Если L отправляет пульс и не получает ответ от F в течение времени syncLimit, считается, что F не в сети.
syncLimit
# По умолчанию лидер принимает клиентские подключения и предоставляет обычные услуги чтения и записи. Однако, если вы хотите, чтобы лидер сосредоточился на координации кластера, вы можете установить этот параметр на no, что значительно повысит производительность операций записи
leaderServes=yes
# server.[myid]=[hostname]:[порт для синхронизации и других коммуникаций]:[порт для голосования]
server.x=[hostname]:nnnnn[:nnnnn]
# Группировка и настройка веса для машин
group.x=nnnnn[:nnnnn]weight.x=nnnnn
# Время ожидания соединения при открытии во время выборов лидера, по умолчанию 5 секунд
cnxTimeout
# Максимальный объём данных для каждой ноды, по умолчанию 1M
jute.maxbuffer
``` **Режим работы EventLoop**

На рисунке выше представлен универсальный режим работы EventLoop. Когда происходит событие, приложение помещает его в очередь событий, а затем EventLoop опрашивает очередь и выполняет события или распределяет их соответствующим слушателям событий.

Существует несколько способов выполнения событий: немедленное выполнение, отложенное выполнение и периодическое выполнение.

**Принцип NioEventLoop**

В Netty EventLoop можно рассматривать как механизм обработки событий реакторной модели потоков. Каждый поток EventLoop поддерживает селектор выбора и очередь задач taskQueue. Он отвечает за обработку событий ввода-вывода, обычных задач и периодических задач. В Netty рекомендуется использовать NioEventLoop в качестве реализации. Рассмотрим исходный код ключевого метода run() NioEventLoop:

```java
protected void run() {
    for (;;) {
        try {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.BUSY_WAIT:
                case SelectStrategy.SELECT:
                    // 轮询 I/O 事件
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                }
            } catch (IOException e) {
                rebuildSelector0();
                handleLoopException(e);
                continue;
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    // 处理 I/O 事件
                    processSelectedKeys();
                } finally {
                    // 处理所有任务
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    // 处理 I/O событий
                    processSelectedKeys();
                } finally {
                    final long ioTime = System.nanoTime() - ioStartTime;
                    // Обработка завершена, переходим к обработке асинхронной очереди задач
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio); 
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

Структура исходного кода ясна: каждый цикл обработки NioEventLoop включает в себя этапы опроса событий select, обработки событий processSelectedKeys и обработки задач runAllTasks. Это типичный механизм работы реактора. Кроме того, Netty предоставляет параметр ioRatio для настройки соотношения времени обработки событий ввода-вывода и времени обработки задач.

Далее мы подробно рассмотрим принципы реализации Netty EventLoop, сосредоточившись на обработке событий и обработке задач.

Механизм обработки событий

С учётом общей архитектуры Netty, давайте посмотрим на диаграмму потока событий EventLoop для лучшего понимания дизайна Netty EventLoop. Механизм обработки событий NioEventLoop использует безблокировочную последовательную конструкцию:

  • BossEventLoopGroup и WorkerEventLoopGroup содержат один или несколько NioEventLoop.

BossEventLoopGroup отвечает за прослушивание клиентских событий Accept. При возникновении события оно регистрируется в WorkerEventLoopGroup. Для каждого нового канала выбирается только один NioEventLoop для привязки. Таким образом, обработка всех событий жизненного цикла канала является независимой от потока, и разные потоки NioEventLoop не пересекаются друг с другом.

  • После завершения чтения данных NioEventLoop вызывает связанный ChannelPipeline для распространения событий.

ChannelPipeline также является потокобезопасным, и данные передаются первому обработчику ChannelHandler в ChannelPipeline. После обработки данных они передаются следующему обработчику, и весь процесс выполняется последовательно, без переключения контекста потока.

Безблокировочный последовательный дизайн не только максимизирует пропускную способность системы, но и снижает сложность разработки бизнес-логики пользователями, избавляя их от необходимости уделять слишком много внимания проблемам безопасности потоков. Хотя однопоточное выполнение предотвращает переключение контекста, оно может привести к блокировке длительных операций ввода-вывода. Если произойдёт блокировка одного события ввода-вывода, все последующие события ввода-вывода не смогут быть обработаны, что может вызвать накопление событий. Разрабатывая программы с использованием Netty, необходимо учитывать риски, связанные с реализацией ChannelHandler.

Важна надёжность потока NioEventLoop, поскольку его блокировка или зависание могут привести к сбою всей системы. В JDK реализация Epoll имеет уязвимость, которая позволяет пробуждать поток NIO даже при пустом списке событий, вызывая 100% загрузку ЦП. Это известная проблема JDK epoll с пустым опросом. Netty как высокопроизводительная и надёжная сетевая структура должна обеспечивать безопасность потоков ввода-вывода. Как она решает проблему JDK epoll? На самом деле Netty не решает эту проблему коренным образом, а просто обходит её.

Оставляя в стороне другие детали, давайте сосредоточимся на коде в методе select() события опроса, чтобы понять, как Netty решает проблему пустого опроса JDK epoll:

long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
    selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
    selector = selectRebuildSelector(selectCnt);
    selectCnt = 1;
    break;
}

Netty предоставляет механизм обнаружения потенциального зависания потока. Реализация этого механизма следующая:

  • Каждый раз при выполнении: | | channelInactive | Канал находится в состоянии неготовности. Данные можно считывать с канала удалённо. | | channelRead | Данные можно считать с канала удалённо. | | channelReadComplete | Считывание данных с канала завершено. | | userEventTriggered | Событие пользователя было инициировано. | | channelWritabilityChanged | Состояние записи канала изменилось. |

② Методы обратного вызова событий ChannelOutboundHandler и их триггеры

Методы обратного вызова событий в ChannelOutboundHandler чётко определены, и каждый тип операции имеет соответствующий метод обратного вызова, который можно увидеть непосредственно в списке интерфейсов ChannelOutboundHandler. Здесь каждый метод обратного вызова срабатывает перед соответствующей операцией. Мы не будем вдаваться в подробности. Кроме того, большинство интерфейсов в ChannelOutboundHandler содержат параметр ChannelPromise, чтобы можно было своевременно получать уведомления после завершения операции.

ChannelOutboundHandler

Механизм распространения событий ChannelPipeline

Вышеупомянутый ChannelPipeline можно разделить на входящие обработчики ChannelInboundHandler и исходящие обработчики ChannelOutboundHandler. Соответствующие типы событий можно разделить на входящие события и исходящие события.

  • Входящие события: направление распространения — Head->Tail, то есть распространение происходит в прямом порядке (A→B→C).

  • Исходящие события: направление распространения — Tail->Head, то есть распространение происходит в обратном порядке (C→B→A).

Пример кода для изучения механизма распространения событий ChannelPipeline:

serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {
        ch.pipeline()
                .addLast(new SampleInBoundHandler("SampleInBoundHandlerA", false))
                .addLast(new SampleInBoundHandler("SampleInBoundHandlerB", false))
                .addLast(new SampleInBoundHandler("SampleInBoundHandlerC", true));
        ch.pipeline()
                .addLast(new SampleOutBoundHandler("SampleOutBoundHandlerA"))
                .addLast(new SampleOutBoundHandler("SampleOutBoundHandlerB"))
                .addLast(new SampleOutBoundHandler("SampleOutBoundHandlerC"));

    }
}

Результат выполнения:

SampleOutBoundHandler执行结果

Механизм распространения исключений ChannelPipeline

Реализация распространения событий в ChannelPipeline использует классический режим цепочки ответственности, где вызовы связаны друг с другом. Что произойдёт, если в одном узле возникнет логическая ошибка? ctx.fireExceptionCaugh передаст исключение в порядке от Head к Tail. Если пользователь не обрабатывает исключение, оно будет обработано Tail.

![Netty обработка исключений лучшая практика](images/Middleware/Netty обработка исключений лучшая практика.png)

Рекомендуется, чтобы пользователь настраивал обработчик исключений. Пример кода:

public class ExceptionHandler extends ChannelDuplexHandler {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if (cause instanceof RuntimeException) {
            System.out.println("Handle Business Exception Success.");
        }
    }
}

После добавления унифицированного обработчика исключений можно видеть, что исключения были изящно перехвачены и обработаны. Это также рекомендуемая практика обработки исключений Netty.

![Проверка унифицированной обработки исключений Netty](images/Middleware/Проверка унифицированной обработки исключений Netty.png)

Таймер задачи TimerTask

Временное колесо HashedWheelTimer

Временное колесо фактически представляет собой разновидность кольцевой структуры данных, которую можно представить как часы, разделённые на множество сегментов, каждый сегмент представляет определённый период времени. Цепочка задач планирования хранится в каждом сегменте, а указатель перемещается по сегментам с течением времени, выполняя все задачи, срок выполнения которых истёк в соответствующем сегменте. Задачи распределяются по сегментам в зависимости от времени по модулю.

HashedWheelTimer

В управлении тысячами сетевых подключений каждое подключение имеет задачу тайм-аута, запуск отдельного таймера для каждой задачи может привести к использованию большого количества ресурсов. Для решения этой проблемы можно использовать инструмент Netty HashedWheelTimer.

Инструмент Netty HashedWheelTimer предоставляет приблизительную реализацию таймера, поскольку это временное колесо не выполняет задачи точно в срок, а выполняет их через определённые промежутки времени после каждого интервала времени и выполняет все задачи, которые должны были быть выполнены до этого интервала времени.

Конечно, точность выполнения конкретной задачи таймера можно настроить, отрегулировав размер временного интервала в конструкторе HashedWheelTimer, но в большинстве сетевых приложений из-за существования задержки ввода-вывода не требуется строгое соблюдение точности времени выполнения, поэтому интервал времени по умолчанию 100 мс может удовлетворить большинство ситуаций, и нет необходимости тратить дополнительные усилия на настройку точности времени.

Особенности HashedWheelTimer

— Из анализа исходного кода видно, что фактическая точность времени HashedWheelTimer не очень высока, погрешность может составлять около 100 миллисекунд, и если количество ожидающих задач в очереди задач слишком велико, погрешность может быть больше.

— Однако HashedWheelTimer может обрабатывать большое количество задач таймера, и для определения списка кандидатов на обработку задач требуется только O(1) времени, в то время как для Timer и других требуется корректировка кучи, которая является временной сложностью O(logN).

— HashedWheelTimer по сути имитирует колесо времени, разбивая большое количество задач на небольшие списки задач, эффективно экономя ресурсы ЦП и потоков.

Анализ исходного кода

public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, 
                        int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) {
        ......
}

threadFactory: пользовательская фабрика потоков, используемая для создания объектов потока. — tickDuration: сколько времени проходит до перехода к следующему сегменту (эквивалентно перемещению часов на одну позицию). — unit: определяет единицу измерения tickDuration. — ticksPerWheel: количество сегментов в колесе. — leakDetection: следует ли включать обнаружение утечки памяти. — maxPendingTimeouts: максимальное количество задач, ожидающих выполнения. 0 или отрицательное число означает отсутствие ограничений.

Преимущества и недостатки

  • Преимущества: — Можно добавлять, удалять и отменять задачи таймера. — Может эффективно обрабатывать большое количество задач таймера.
  • Недостатки: — Требует много памяти и занимает много места в памяти. — Не требует высокой точности времени.

Варианты задач таймера

Вот некоторые из основных вариантов задач таймера:

— Timer. — ScheduledExecutorService. — ThreadPoolTaskScheduler (на основе ScheduledExecutorService). — Netty schedule (использует PriorityQueue). — Netty HashedWheelTimer (временное колесо). — Kafka TimingWheel (иерархическое временное колесо).

Пример использования:

// Создание экземпляра Timer
Timer timer = new

Примечание: в тексте запроса присутствуют фрагменты кода на языке Java, однако в ответе они не представлены. Также в запросе присутствуют ссылки на изображения, которые не были включены в ответ. ```
HashedWheelTimer();

// 提交一个任务,让它在 5s 后执行
Timeout timeout1 = timer.newTimeout(new TimerTask() {
    @Override
    public void run(Timeout timeout) {
        System.out.println("5s 后执行该任务");
    }
}, 5, TimeUnit.SECONDS);

// 再提交一个任务,让它在 10s 后执行
Timeout timeout2 = timer.newTimeout(new TimerTask() {
    @Override
    public void run(Timeout timeout) {
        System.out.println("10s 后执行该任务");
    }
}, 10, TimeUnit.SECONDS);

// 取消掉那个 5s 后执行的任务
if (!timeout1.isExpired()) {
    timeout1.cancel();
}

// 原来那个 5s 后执行的任务,已经取消了。这里我们反悔了,我们要让这个任务在 3s 后执行
// 我们说过 timeout 持有上、下层的实例,所以下面的 timer 也可以写成 timeout1.timer()
timer.newTimeout(timeout1.task(), 3, TimeUnit.SECONDS);

Без блокировки очереди MPSC queue

FastThreadLocal

ByteBuf

Протокол кодирования

Модуль Netty-codec в основном отвечает за кодирование и декодирование работы, реализуя взаимное преобразование между исходными байтовыми данными и бизнес-объектами. Netty поддерживает большинство основных протоколов отрасли, таких как HTTP, HTTP2, Redis и XML, что экономит разработчикам много усилий. Кроме того, этот модуль предоставляет абстрактные классы кодирования и декодирования ByteToMessageDecoder и MessageToByteEncoder, которые можно легко реализовать с помощью наследования этих двух классов.

Декодирование и кодирование пакетов

  • Решения для декодирования и кодирования пакетов:
    • Фиксированная длина сообщения: каждому сообщению требуется фиксированная длина. Когда получатель накапливает чтение сообщений фиксированной длины, считается, что получено полное сообщение. Если данные отправителя меньше фиксированной длины, необходимо заполнить пробелы. Метод фиксированной длины прост в использовании, но также имеет очевидные недостатки, невозможно установить хорошее значение фиксированной длины. Если длина слишком велика, это приведёт к потере байтов, а если она слишком мала, это повлияет на передачу сообщений, поэтому метод фиксированной длины обычно не используется.
    • Специальный разделительный символ: поскольку получатель не может различить границы сообщений, мы можем добавить специальный разделительный символ в конце каждого отправленного пакета, чтобы получатель мог разделить сообщения на основе специального разделительного символа.
    • Длина сообщения + содержимое сообщения: длина сообщения + содержимое сообщения является наиболее часто используемым протоколом в проектах разработки. Как показано в презентации, этот протокол имеет основной формат. Поле заголовка сообщения содержит общую длину сообщения, например, используя 4-байтовое целое число для записи длины сообщения. Тело сообщения представляет собой фактические двоичные данные. При анализе данных получатель сначала считывает поле длины сообщения Len, а затем считывает данные длиной Len, которые считаются полным пакетом данных.

Основной процесс

Процесс запуска сервера

Процесс запуска сервера Netty в целом делится на три этапа:

  1. Настройка пула потоков.
  2. Инициализация канала.
  3. Связывание порта.
  • Настройка пула потоков:

    • Однопоточный режим: все операции ввода-вывода в модели Reactor выполняются одним потоком, поэтому требуется только один EventLoopGroup.
    
    

EventLoopGroup group = new NioEventLoopGroup(1); ServerBootstrap b = new ServerBootstrap(); b.group(group);


  * Многопоточный режим: модель Reactor с одним потоком имеет серьёзные узкие места в производительности, поэтому появилась модель Reactor с несколькими потоками. В Netty использование модели Reactor с несколькими потоками аналогично использованию модели с одним потоком. Разница в том, что NioEventLoopGroup может быть запущен без параметров, и он автоматически запустит количество потоков, равное удвоенному количеству ядер процессора. Конечно, вы также можете настроить фиксированное количество потоков самостоятельно.

   ```java
EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(group);
  • Модель главного и подчинённого многопоточного режима: в большинстве сценариев мы используем модель главного и подчинённого многопоточного Reactor. Boss — главный Reactor, Worker — подчинённый Reactor. Они используют разные NioEventLoopGroup, главный Reactor отвечает за обработку Accept, а затем регистрирует канал в подчинённом Reactor, подчинённый Reactor в основном отвечает за все события ввода-вывода во время жизненного цикла канала.
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
  • Инициализация канала:

    • Установите тип канала:
    
    

// Клиентский канал b.channel(NioSocketChannel.class); b.channel(OioSocketChannel.class);

// Серверный канал b.channel(NioServerSocketChannel.class); b.channel(OioServerSocketChannel.class); b.channel(EpollServerSocketChannel.class);

// UDP b.channel(NioDatagramChannel.class); b.channel(OioDatagramChannel.class);


  * Зарегистрируйте обработчик канала:

   ServerBootstrap требует регистрации обработчика канала через метод childHandler(). ChannelInitializer — это анонимный класс, который реализует интерфейс ChannelHandler и используется в качестве параметра ServerBootstrap.

   ```java
b.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {
        ch.pipeline()
                // HTTP кодировщик
                .addLast("codec", new HttpServerCodec())
                // Сжатие HTTP-контента
                .addLast("compressor", new HttpContentCompressor())
                // Агрегатор HTTP-сообщений
                .addLast("aggregator", new HttpObjectAggregator(65536)) 
                // Пользовательская логика обработки
                .addLast("handler", new HttpServerHandler());
    }
});
  • Задайте параметры канала:

Методы option и childOption в ServerBootstrap используются для установки свойств Boss и Worker соответственно.

b.option(ChannelOption.SO_KEEPALIVE, true);
``` Ниже представлен перевод текста на русский язык:

#### **Рисунок показывает операцию get() API. Для операций set(), read() и write() можно обратиться к книге или API.**

![ByteBuf-get](images/Middleware/ByteBuf-get.png)

### **Дополнительные операции**

![ByteBuf-дополнительные операции](images/Middleware/ByteBuf-дополнительные операции.png) 

Два следующих метода имеют довольно сложную для понимания логику работы, поэтому ниже представлено их объяснение:
* **hasArray()**: если ByteBuf поддерживается массивом байтов, то возвращает true. Другими словами, если ByteBuf находится в режиме кучи, это означает, что его внутренняя память поддерживается массивом байтов.
* **array()**: если ByteBuf поддерживается массивом байтов, возвращает массив, иначе выбрасывает исключение UnsupportedOperationException. То есть, если ByteBuf находится в режиме кучи, он поддерживается массивом.

### **Распределение ByteBuf**

Существует несколько способов создания и управления экземплярами ByteBuf: **последовательное распределение (ByteBufAllocator)**, **Unpooled буфер** и **класс ByteBufUtil**.

#### Последовательное распределение: интерфейс ByteBufAllocator

Netty реализует (ByteBuf) пул через интерфейс ByteBufAllocator. Netty предоставляет пул и непулы ByteBufAllocator:
* `ctx.channel().alloc().buffer()`  по сути, это ByteBufAllocator.DEFAULT.
* `ByteBufAllocator.DEFAULT.buffer()`  возвращает буфер Bytebuf, основанный на куче или прямой памяти. По умолчанию используется куча.
* `ByteBufAllocator.DEFAULT`  существует два типа: UnpooledByteBufAllocator.DEFAULT (непулы) и PooledByteBufAllocator.DEFAULT (пул). Для Java по умолчанию используется PooledByteBufAllocator (пул), а для Android  UnpooledByteBufAllocator (непулы).
* Можно использовать BootStrap для предоставления независимого экземпляра ByteBufAllocator для каждого канала.

![img](images/Middleware/ByteBufAllocator.png)

Объяснение:
* Метод buffer() на верхнем рисунке возвращает буфер ByteBuf, основанный на куче или прямой памяти, по умолчанию  куча. Исходный код: AbstractByteBufAllocator() { this(false); }
* ByteBufAllocator.DEFAULT  может быть пулом или непулом. По умолчанию это пул (PooledByteBufAllocator.DEFAULT).

#### Unpooled буфер  непулы

Unpooled предоставляет статические вспомогательные методы для создания непулов ByteBuf.

![Unpooled буфер](images/Middleware/Unpooled буфер.png)

Примечание:
* метод buffer() на верхней картинке возвращает непуловой буфер ByteBuf на основе кучи.
* wrappedBuffer()  создаёт представление и возвращает ByteBuf с упакованными данными. Очень полезно.

Создание ByteBuf кода:
```java
public void createByteBuf(ChannelHandlerContext ctx) {
 // 1. Создание ByteBuf через Channel
 ByteBuf buf1 = ctx.channel().alloc().buffer();
 // 2. Создание через ByteBufAllocator.DEFAULT
 ByteBuf buf2 =  ByteBufAllocator.DEFAULT.buffer();
 // 3. Создание через Unpooled
 ByteBuf buf3 = Unpooled.buffer();
}

Класс ByteBufUtil

Класс ByteBufUtil предоставляет статические вспомогательные методы для работы с ByteBuf: hexdump() и equals().

  • hexdump() — печатает содержимое ByteBuf в шестнадцатеричном формате. Очень ценно.
  • equals() — проверяет равенство двух экземпляров ByteBuf.

Счётчик ссылок

В версии Netty4.0 для ByteBuf и ByteBufHolder была введена технология счётчика ссылок. Следует различать счётчик ссылок и алгоритм достижимости (алгоритм сборки мусора JVM).

  • Кто отвечает за освобождение: обычно это сторона, которая в последний раз обращалась к объекту (счётчик ссылок), отвечает за его освобождение.
  • buffer.release() — уменьшает счётчик ссылок на 1.
  • buffer.retain() — увеличивает счётчик ссылок на 1.
  • buffer.refCnt() — возвращает текущее значение счётчика объекта.
  • buffer.touch() — записывает позицию доступа текущего объекта, в основном используется для отладки.
  • Счётчик ссылок применяется не только к прямым буферам (direct Buffer). Все три режима ByteBuf (куча Buffer, прямой Buffer и составной Buffer) используют счётчики ссылок, иногда требуется ручное управление счётчиком ссылок со стороны программиста.
public static void releaseReferenceCountedObject(){
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    // Увеличиваем счётчик ссылок на 1
    buffer.retain();
    // Выводим счётчик ссылок
    buffer.refCnt();
    // Уменьшаем счётчик ссылок на 1
    buffer.release();
}

Zero-Copy

Zero-copy в Netty проявляется в нескольких аспектах:

  • Реализация нулевого копирования через CompositeByteBuf: Netty предоставляет класс CompositeByteBuf, который объединяет несколько ByteBuf в один логический ByteBuf, избегая копирования между отдельными ByteBuf.
  • Реализация нулевого копирования через операцию wrap: операция wrap позволяет обернуть byte[], ByteBuf, ByteBuffer и другие в объект Netty ByteBuf, предотвращая копирование данных.
  • Реализация нулевого копирования через операцию slice: ByteBuf поддерживает операцию slice, позволяя разделить ByteBuf на несколько ByteBuf, совместно использующих одну область хранения, избегая копирования памяти.
  • Реализация нулевого копирования через FileRegion: FileRegion используется для упаковки FileChannel.tranferTo, позволяя напрямую передавать данные из файла в целевой канал, избегая проблем с копированием памяти при традиционном методе циклического write. Байт-буфер slice (int index, int length)

Метод slice без параметров эквивалентен вызову buf.slice(buf.readerIndex(), buf.readableBytes()), который возвращает срез читаемой части buf. В то же время метод slice (int index, int length) более гибкий. С его помощью можно задать различные параметры для получения срезов из разных областей buf.

Процесс создания header и body с использованием метода slice не включает операцию копирования. Фактически header и body являются частями byteBuf, которые совместно используют его пространство хранения:

Использование FileRegion для реализации нулевого копирования

В Netty FileRegion используется для реализации нулевой копии при передаче файлов. Однако на нижнем уровне FileRegion зависит от функции нулевого копирования FileChannel.transfer в Java NIO. После появления FileRegion можно напрямую записывать содержимое файла в Channel, не копируя его сначала во временный буфер, а затем записывая буфер в Channel. Такая операция нулевого копирования несомненно полезна при передаче больших файлов.

Традиционный поток ввода-вывода

  1. Файл копируется из диска в ядро с помощью технологии DMA.
  2. Файл копируется из ядра в область пользовательского процесса.
  3. Файл копируется из области пользовательского процесса в сокет.
  4. Содержимое сокета копируется с помощью DMA в сетевую карту.

Общий поток нулевого копирования

TCP-сцепка и распаковка

Сцепка и распаковка — это проблемы, связанные с передачей данных по протоколу TCP.

Предположим, что клиент отправляет два пакета данных D1 и D2 на сервер. Поскольку сервер может считывать разное количество байтов за один раз, возможны следующие ситуации:

— Сервер считывает два отдельных пакета, D1 и D2, без сцепки и распаковки. — Сервер получает два сцепленных пакета, D1 и D2. — Сервер считывает часть содержимого пакетов D1 и D2 за одно чтение, а оставшуюся часть D2 — за следующее чтение. — Сервер считывает частично пакет D1 за первое чтение и полностью пакет D2 за второе чтение. — Если TCP-кэш ещё меньше, пакеты D1 и D2 будут разделены на несколько частей для отправки.

Основная причина возникновения сцепки и распаковки заключается в том, что операционная система имеет буферный пул при отправке данных TCP. Например, размер пула составляет 1024 байта:

— Если объём отправляемых данных невелик, TCP объединяет несколько запросов в один и отправляет их. Это приводит к сцепке. — Если объём данных большой и превышает размер пула, TCP разделяет данные на несколько пакетов для отправки, что приводит к распаковке.

Решения

Для ситуаций, когда используются пакеты фиксированной длины, можно использовать FixedLengthFrameDecoder. Этот декодер считывает сообщения фиксированной длины. Если текущее сообщение недостаточно длинное, он ждёт следующего сообщения для завершения. Использование этого декодера довольно просто: нужно только указать длину каждого сообщения в конструкторе.

Для обработки данных с разделителями Netty предоставляет два декодера: LineBasedFrameDecoder и DelimiterBasedFrameDecoder. Первый использует символы новой строки (\n или \r\n) для обработки данных, а второй — пользовательский разделитель.

Основной принцип решения проблемы сцепки и распаковки — добавление поля длины к данным пакета. Для этого используются LengthFieldBasedFrameDecoder для декодирования данных и LengthFieldPrepender для добавления поля длины перед отправкой данных. Обработка запросов данных от клиентов и ответ на них ch.pipeline().addLast(new EchoServerHandler()); }


#### Создание собственного упаковщика и распаковщика пакетов

Можно реализовать `MessageToByteEncoder` и `ByteToMessageDecoder` для создания собственных упаковщиков и распаковщиков пакетов.

- `MessageToByteEncoder`: отвечает за кодирование ответа в объект ByteBuf.
- `ByteToMessageDecoder`: преобразует полученный объект ByteBuf в нужный формат данных.

## Высокая производительность

### Архитектура ввода-вывода

* **Синхронный неблокирующий ввод-вывод**: использует минимум ресурсов для выполнения максимального количества задач.
* **Нулевое копирование памяти**: минимизирует ненужные копии памяти, обеспечивая более эффективную передачу данных.
* **Пул памяти**: позволяет повторно использовать выделенную память, особенно прямую (heap). Реализуется с помощью бинарного дерева поиска для управления распределением памяти.
* **Последовательная обработка чтения и записи**: избегает использования блокировок, которые могут привести к снижению производительности.
* **Высокопроизводительные протоколы сериализации**: поддерживает протоколы, такие как protobuf, для быстрой сериализации данных.

### Оптимизация операционной системы

#### Настройка параметров файловых дескрипторов

* **Максимальное количество открытых файлов**: можно изменить с помощью команды `cat /proc/sys/fs/file-max`.
* **Количество открытых файлов для одного процесса**: по умолчанию 1024, можно настроить с помощью `ulimit -a`.

#### Оптимизация TCP/IP параметров

Необходимо уделить внимание следующим параметрам:

* net.ipv4.tcp_rmem: размер буфера для каждого соединения.

#### Многосетевые очереди и мягкие прерывания

* **Буферы TCP**: настройка параметров SO_SNDBUF и SO_RCVBUF для оптимального размера буферов отправки и получения.
* **Мягкие прерывания**: использование RPS (Receive Packet Steering) в ядре Linux ≥2.6.35 для улучшения производительности сетевого взаимодействия. RPS распределяет обработку мягких прерываний между несколькими процессорами, основываясь на информации о пакетах.

## Оптимизация производительности Netty

### Настройка количества потоков

**Оптимизация главного пула потоков**:

Для серверов Netty обычно достаточно одного потока для прослушивания порта. Однако при большом количестве устройств, подключающихся одновременно, рекомендуется использовать несколько портов и главный пул потоков (Reactor). Это позволит обрабатывать подключения параллельно, ускоряя процесс подключения и снижая вероятность ошибок.

**Настройка рабочих потоков (I/O)**:

Сначала можно использовать значение по умолчанию, равное количеству ядер процессора * 2. Затем провести тестирование производительности и проанализировать загрузку CPU для рабочих потоков. Если они перегружены, следует увеличить количество потоков.

### Оптимизация сердцебиения

Цель сердцебиения — убедиться в доступности канала связи и способности другой стороны принимать и отправлять сообщения. Для этого необходимо периодически проверять состояние канала.

В случае большого количества подключений важно своевременно обнаруживать неработающие соединения и удалять их из списка. Также важно выбрать оптимальный интервал сердцебиения, чтобы избежать перегрузки системы и частых пауз в работе приложения.

Сердцембиение может быть реализовано на разных уровнях:

* TCP Keep-Alive: работает на уровне TCP.
* Протокольный уровень: используется в протоколах с длительным соединением, таких как MQTT.
* Уровень приложения: реализуется через периодическую отправку сообщений о состоянии.

Существует два типа сердцебиения:

* Ping-Pong: одна сторона отправляет сообщение Ping, а другая отвечает сообщением Pong.
* Ping-Ping: обе стороны отправляют сообщения Ping без ожидания ответа.

Стратегии обнаружения сбоев:

* Истечение времени ожидания: если после нескольких попыток не получено ответное сообщение, считается, что канал не работает.
* Сбой при чтении или отправке: если во время обмена сообщениями возникают ошибки ввода-вывода, это также указывает на сбой канала. **Оптимизация сердцебиения**

- Для серверов с миллионами пользователей обычно не рекомендуется использовать слишком длинные периоды сердцебиения и тайм-ауты.
- Период сердцебиения обычно не должен превышать 60 секунд, а тайм-аут сердцебиения обычно равен удвоенному периоду сердцебиения.
- Рекомендуется использовать IdleStateHandler для реализации сердцебиения, вместо создания собственных задач пула потоков, что может увеличить нагрузку на систему и привести к проблемам с параллельной безопасностью.
- В случае тайм-аута или сбоя сердцебиения необходимо закрыть канал, чтобы клиент мог повторно подключиться.
- После срабатывания события IdleStateEvent пользователь может подписаться на это событие и выполнить собственную логику обработки, такую как закрытие канала, повторное подключение клиента, оповещение и ведение журнала.

**Оптимизация приёма и отправки буферов**

Для длинных каналов каждый канал должен поддерживать свои собственные буферы приёма и отправки. В JDK NIO используется класс ByteBuffer, который представляет собой фиксированный массив байтов, не допускающий динамического расширения.

Пример: если максимальное сообщение составляет 10 КБ, а среднее — 5 КБ, то для обработки 10 КБ сообщений ByteBuffer должен быть установлен на 10 КБ. Это означает, что каждый канал будет занимать дополнительные 5 КБ памяти. Если количество каналов составляет 1 миллион, каждый канал будет иметь свой собственный ByteBuffer для приёма, что приведёт к дополнительным 4882 МБ памяти.

Netty предоставляет ByteBuf, который поддерживает динамическое изменение размера. Он также предоставляет два распределителя буфера приёма:

* FixedRecvByteBufAllocator: Распределитель буфера приёма фиксированного размера, который не изменяет размер в зависимости от размера данных. Однако он может расширяться при необходимости.
* AdaptiveRecvByteBufAllocator: Распределитель буфера приёма с динамическим изменением размера, который адаптируется к размеру данных и может сжимать буфер при необходимости.

Рекомендуется использовать AdaptiveRecvByteBufAllocator, который можно указать при создании клиента или сервера.

Примечание: независимо от того, используется ли буфер приёма или отправки, рекомендуется устанавливать его размер равным среднему размеру сообщения, а не максимальному размеру, чтобы избежать ненужного использования памяти.

**Эффективное использование пулов памяти**

Каждый поток NioEventLoop обрабатывает N каналов. Процесс обработки канала включает в себя начало обработки канала A, создание буфера приёма (создание ByteBuf), декодирование сообщения, преобразование в объект POJO, отправку в фоновый поток в виде задачи, освобождение буфера приёма и переход к обработке канала B.

При использовании пула памяти, когда канал A получает новые данные, он запрашивает свободный ByteBuf из пула памяти NioEventLoop. После декодирования сообщения ByteBuf освобождается обратно в пул памяти. Использование пула памяти позволяет сократить количество запросов и операций GC с 15625 до 0 (предполагается, что всегда есть доступный ByteBuf).

По умолчанию Netty не использует пулы памяти, но их можно настроить при создании клиента или сервера.

Использование пула памяти требует, чтобы выделение и освобождение памяти происходило попарно, иначе это может привести к утечке памяти. Также важно отметить, что после завершения работы с ByteBuf необходимо явно вызвать ReferenceCountUtil.release(msg) для освобождения буфера приёма, иначе он будет считаться используемым, что также может привести к утечкам памяти.

**Предотвращение непреднамеренной блокировки I/O потока**

Обычно не следует выполнять длительные операции, такие как доступ к базе данных или вызов сторонних сервисов, в потоках ввода-вывода Netty. Однако некоторые скрытые блокирующие операции могут быть упущены, например, запись журналов.

В производственной среде обычно требуется регистрировать вызовы интерфейсов в режиме реального времени, а другие журналы имеют уровень ERROR. При возникновении проблем с вводом-выводом в журнале ошибок будет записано сообщение об ошибке. Если диск имеет высокую задержку записи, это может вызвать блокировку операции записи журнала, время которой невозможно предсказать, что, в свою очередь, может заблокировать поток ввода-вывода Netty и помешать обработке других каналов.

Хотя log4j поддерживает асинхронную запись журналов (AsyncAppender), при заполнении очереди журналов она может блокироваться синхронно, пока в очереди не появится свободное место.

Эти проблемы могут быть трудно обнаружить в тестовой среде, поскольку они часто возникают случайно и на короткое время.

**Разделение I/O потоков и бизнес-потоков**

Если сервер выполняет только простые операции с памятью и пересылку сообщений, можно увеличить размер пула рабочих потоков NioEventLoop и выполнять обработку бизнес-каналов непосредственно в потоке ввода-вывода, что позволит избежать переключения контекста и повысить производительность.

Однако если бизнес-логика сложна, рекомендуется разделить потоки ввода-вывода и бизнес-потока. Для потоков ввода-вывода можно создать большой пул потоков NioEventLoopGroup, где все каналы совместно используют один пул. Для бизнес-потоков можно создать несколько небольших пулов потоков, которые можно связать с потоками ввода-вывода. Это уменьшит конкуренцию за блокировки и улучшит производительность обработки.

**Управление количеством одновременных подключений на сервере**

Независимо от оптимизации производительности сервера, необходимо учитывать управление подключениями. Когда ресурсы становятся узким местом или происходит массовое подключение устройств, необходимо обеспечить защиту системы с помощью управления подключениями. Netty в основном фокусируется на управлении количеством одновременных подключений. Баг, также возможно баг бизнес-слоя, требуется конкретный анализ конкретной проблемы.

- **Некорректное закрытие сокета**: например, I/O поток блокируется неожиданно, или доля пользовательских задач в выполнении I/O потока слишком высока, что приводит к несвоевременной обработке I/O операций и невозможности своевременного освобождения канала.

**Решение:**

- **Избегайте обработки бизнес-логики (кроме отправки и приёма сердцебиений) в I/O потоках Netty (worker потоках).**
- **Будьте осторожны при выполнении пользовательских задач на I/O потоке.**
- **Используйте IdleStateHandler, ReadTimeoutHandler и WriteTimeoutHandler с осторожностью.**

# RabbitMQ

## Введение в режимы работы

На официальном сайте RabbitMQ представлены 6 рабочих режимов: простой режим, режим рабочей очереди, режим публикации/подписки, режим маршрутизации, режим темы и режим RPC. В этой статье мы рассмотрим только первые 5 режимов работы.

### Простой режим и режим рабочей очереди

Мы объединили эти два режима, потому что их принцип работы очень прост и состоит из трёх объектов: производителя, очереди и потребителя.

Производитель отвечает за создание сообщений и отправку их в очередь. Потребитель отслеживает очередь и обрабатывает сообщения, когда они появляются.

Когда есть несколько потребителей, они равномерно обрабатывают сообщения из очереди. Пример кода:

Производитель:
```java
//1. Получение соединения
Connection connection = ConnectionUtil.getConnection();
//2. Создание канала
Channel channel = connection.createChannel();
//3. Объявление очереди
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4. Отправка сообщения
channel.basicPublish("", QUEUE_NAME, null, "hello simple".getBytes());

System.out.println("Отправка прошла успешно");
//5. Освобождение соединения
channel.close();
connection.close();

Потребитель:

// 1. Получение соединения
Connection connection = ConnectionUtil.getConnection();
// 2. Создание канала
Channel channel = connection.createChannel();
// 3. Объявление очереди
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4. Мониторинг сообщений
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
            byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("Получено: " + message);
    }
});

Режимы публикации/подписки, маршрутизации и темы

Эти три режима используют обменник. Производитель не взаимодействует напрямую с очередью, а отправляет сообщения в обменник, который затем помещает сообщения в соответствующие очереди для обработки потребителями. Существует три основных типа обменников: разветвлённый, прямой и тематический. Принцип работы показан ниже:

Разветвлённый: не обрабатывает ключи маршрутизации. Нужно просто связать очередь с обменником. Сообщение, отправленное в обменник, будет перенаправлено во все связанные очереди. Это похоже на широковещательную рассылку в локальной сети, где каждое устройство в подсети получает копию сообщения. Разветвлённые обменники обеспечивают самую быструю пересылку сообщений.

Режим публикации/подписки использует разветвлённые обменники.

Прямой: обрабатывает ключи маршрутизации. Необходимо связать очередь с обменником и указать, что сообщение должно полностью соответствовать определённому ключу маршрутизации. Если очередь связана с обменником с требованием ключа маршрутизации «dog», то будут перенаправлены только сообщения с меткой «dog». Сообщения dog.puppy и dog.guard не будут перенаправляться, только dog.

Режим маршрутизации использует прямые обменники.

Тематический: сопоставляет ключи маршрутизации с определённым шаблоном. Здесь очередь должна быть связана с обменником по определённому шаблону. Символ «#» соответствует одному или нескольким словам, символ «*» — любому слову. Таким образом, «audit.#» может соответствовать «audit.irs.corporate», но «audit.*» будет соответствовать только «audit.irs».

Режим темы использует тематические обменники. Пример кода:

Производитель:

// 1. Получение соединения
Connection connection = ConnectionUtil.getConnection();
// 2. Создание канала
Channel channel = connection.createChannel();
// 3. Объявление обменника
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 4. Отправка сообщений
for (int i = 0; i < 100; i++) {
     channel.basicPublish(EXCHANGE_NAME, "", null, ("hello ps" + i + "").getBytes());
}

System.out.println("Отправка прошла успешно");
// 5. Освобождение соединения
channel.close();
connection.close();

Несколько потребителей:

// 1. Получение соединения
Connection connection = ConnectionUtil.getConnection();
// 2. Создание канала
Channel channel = connection.createChannel();
// 3. Объявление обменника
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 4. Связывание очереди с обменником
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 5. Обработка сообщений
channel.basicQos(1);
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("recv1:" + message);

        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});

Dubbo

Apache Dubbo — это высокопроизводительный, лёгкий и открытый Java-фреймворк. Apache Dubbo предоставляет шесть основных функций: высокопроизводительные RPC-вызовы на основе интерфейсов, интеллектуальное обнаружение ошибок и балансировка нагрузки, автоматическое обнаружение и регистрация сервисов, высокая масштабируемость, управление потоком трафика во время выполнения и визуальное управление и обслуживание сервисов.

  • Высокопроизводительные RPC-вызовы на основе интерфейса

    Обеспечивает высокую производительность на основе прокси-серверов для удалённых вызовов. Сервисы предоставляют интерфейсы, скрывая детали удалённого вызова от разработчиков. ExtensionLoader.getExtensionLoader(type.class).getExtension(name)来获取真正的实例来调用, посмотрите пример на официальном сайте.

public interface WheelMaker {
    Wheel makeWheel(URL url);
}
// WheelMaker 接口的 адаптивная реализация класса
public class AdaptiveWheelMaker implements WheelMaker {
    public Wheel makeWheel(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
     // 1. Вызов url 的 getXXX метода для получения значения параметра
        String wheelMakerName = url.getParameter("Wheel.maker");
        if (wheelMakerName == null) {
            throw new IllegalArgumentException("wheelMakerName == null");
        }
        // 2. Вызов ExtensionLoader для получения загрузчика
        // 3. Вызов ExtensionLoader для получения расширения на основе имени, полученного из URL
        WheelMaker wheelMaker = ExtensionLoader.getExtensionLoader(WheelMaker.class).getExtension(wheelMakerName);
        // 4. Вызов конкретного метода реализации класса для выполнения вызова.
        return wheelMaker.makeWheel(url);
    }
}

Анализ аннотации Adaptive

Из исходного кода аннотации Adaptive можно увидеть, что эта аннотация может быть применена к классу или методу. Логика реализации аннотации Adaptive в классе и методе различна.

Аннотация Adaptive в классе

Если аннотация Adaptive применяется к классу, Dubbo не будет генерировать прокси-класс для этого класса. В Dubbo только два класса используют аннотацию Adaptive: AdaptiveCompiler и AdaptiveExtensionFactory. Это означает, что логика расширения загружается вручную, но это не является основным фокусом внимания.

Аннотация Adaptive в методе

Когда аннотация Adaptive используется в методе, Dubbo автоматически генерирует логику прокси для этого метода. Это означает, что логика загрузки расширения должна быть сгенерирована автоматически фреймворком. Реализация механизма следующая:

  • загрузить интерфейс, помеченный @SPI, если он не существует, то механизм Adaptive не поддерживается;
  • создать подкласс кода для целевого интерфейса в соответствии с определённым шаблоном, скомпилировать и сгенерировать код, а затем создать объект этого класса через отражение;
  • объединить сгенерированный объект экземпляра, получить указанный ключ конфигурации через переданный объект URL, загрузить класс объекта, соответствующего ключу, и, наконец, делегировать вызов этому классу для выполнения.
@SPI("apple")
public interface FruitGranter {
  Fruit grant();
  @Adaptive
  String watering(URL url);
}
---
// Садовод яблок
public class AppleGranter implements FruitGranter {
  @Override
  public Fruit grant() {
    return new Apple();
  }
  @Override
  public String watering(URL url) {
    System.out.println("watering apple");
    return "watering finished";
  }
}
---
// Банан садовод
public class BananaGranter implements FruitGranter {
  @Override
  public Fruit grant() {
    return new Banana();
  }
  @Override
  public String watering(URL url) {
    System.out.println("watering banana");
    return "watering success";
  }
}

Реализация вызова:

public class ExtensionLoaderTest {
  @Test
  public void testGetExtensionLoader() {
    // Сначала создаём фиктивный объект URL
    URL url = URL.valueOf("dubbo://192.168.0.1:1412?fruit.granter=apple");
    // Получаем объект FruitGranter через ExtensionLoader
    FruitGranter granter = ExtensionLoader.getExtensionLoader(FruitGranter.class)
      .getAdaptiveExtension();
    // Используем этот объект FruitGranter для вызова его метода с адаптивной аннотацией и получаем результат вызова
    String result = granter.watering(url);
    System.out.println(result);
  }
}

Этот метод генерирует внутренний класс.

Процесс раскрытия сервиса

Обзор раскрытия сервиса

Фреймворк Dubbo использует URL в качестве общей модели. Во время работы системы все данные о состоянии могут быть получены через URL. Например, информация о том, какие методы сериализации используются, какой протокол связи используется и как распределяется нагрузка, представлена в виде параметров URL. Поэтому во время работы фреймворка, когда требуется определённая информация, её можно получить из соответствующего ключа в списке параметров URL. Формат URL следующий:

  • protocol: указывает на различные протоколы в Dubbo, такие как dubbo thrift http;
  • username/password: имя пользователя и пароль;
  • host/port: хост и порт;
  • path: название интерфейса;
  • parameters: параметры ключ-значение.
protocol://username:password@host:port/path?k=v

Процесс раскрытия сервиса можно разделить на три части:

  1. Проверка конфигурации и окончательная сборка в URL.
  2. Раскрытие сервиса на локальном и удалённом серверах.
  3. Регистрация сервиса в реестре.

Раскрытие сервиса с точки зрения построения объектов можно разделить на две стадии:

  1. Упаковка сервиса в Invoker.
  2. Преобразование Invoker в Exporter через протокол.

Отслеживание исходного кода процесса раскрытия сервиса

  • После запуска контейнера Spring IOC и обновления всех зависимостей вызывается onApplicationEvent для запуска процесса раскрытия сервиса. ServiceBean выполняет экспорт и doExport для создания URL. Чтобы скрыть детали вызова, предоставляется единый исполняемый объект, который получает invoker через ProxyFactory.
  • Конкретный Protocol преобразует упакованный invoker в exporter, используя SPI.
  • Затем запускается сервер server, который прослушивает порт и создаёт сервер прослушивания с помощью NettyServer.
  • Через RegistryProtocol URL регистрируется в реестре, чтобы потребитель мог получить информацию о провайдере.

Процесс использования сервиса

В Dubbo инстанс, выполняющий функцию, называется invoker. И провайдер, и потребитель должны взаимодействовать с invoker. Через прокси-класс, созданный с использованием протокола, invoker упаковывается. Существует два способа внедрения сервиса:

  • Голодный человек: реализуется через реализацию Spring InitializingBean и последующий вызов afterPropertiesSet. Контейнер вызывает метод afterPropertiesSet ReferenceBean для внедрения сервиса.
  • Ленивый человек (по умолчанию): сервис внедряется только тогда, когда он используется другими классами.

Существует три способа использования сервиса:

  • Локальное внедрение: сервис раскрывается локально, избегая сетевых вызовов.
  • Прямое подключение к удалённому сервису: реестр не запускается, и адрес провайдера указывается напрямую для прямого подключения.
  • Использование реестра для доступа к удалённому сервису: информация о балансировке нагрузки получается через реестр при использовании удалённого сервиса. Осуществляется объединение, выставляется только один invoker.

Затем строится прокси, инкапсулируется возвращённая ссылка на сервис invoker, после этого вызывается этот класс прокси.

Способ вызова:

  • oneway — не интересует успешность отправки запроса;
  • Async (асинхронный вызов) — Dubbo асинхронен по своей природе, клиент после вызова запроса сохраняет в контексте возвращаемый ResponseFuture, пользователь может в любое время вызвать future.get для получения результата. Асинхронный вызов идентифицируется уникальным ID;
  • Sync (синхронный вызов) — в исходном коде Dubbo уже вызван future.get, у пользователя создаётся впечатление, что метод заблокирован, и он должен дождаться результата перед возвратом.

Последовательность вызовов

Перед вызовом вам, возможно, потребуется учесть следующее:

  • потребитель и поставщик согласовывают протокол связи, Dubbo поддерживает множество протоколов, таких как dubbo, rmi, hessian, http, webservice и другие. По умолчанию используется протокол dubbo, соединение представляет собой единое длинное соединение, NIO асинхронная связь. Подходит для передачи небольших объёмов данных (один запрос менее 100 КБ), но с высокой степенью параллелизма;
  • согласовывается режим сериализации, который в целом делится на две категории: одна — символьный тип (XML или json, читаемый человеком, но низкая эффективность передачи), другая — двоичный поток (данные компактные, удобные для машины). По умолчанию используется hessian2 в качестве протокола сериализации;
  • при вызове потребителя поставщику предоставляются соответствующий интерфейс, имя метода, тип параметра, значение параметра и номер версии;
  • список поставщиков предоставляет услуги внешнему миру, включая выбор поставщика услуг балансировки нагрузки;
  • потребитель и поставщик периодически отправляют информацию в монитор.

Последовательность вызовов:

  • клиент инициирует запрос для вызова интерфейса, интерфейс вызывает генерируемый класс агента. Класс агента генерирует RpcInvocation и затем вызывает invoke;
  • ClusterInvoker получает список служб из реестра и предоставляет доступный invoker через балансировку нагрузки;
  • данные сериализуются и десериализуются при передаче по сети. Вызов сетевого сервиса через NettyServer;
  • серверный бизнес-поток пула принимает анализ данных, находит invoker для выполнения invoke через exportMap;
  • вызов реального Impl для получения результата и возврата.

Способы вызова:

  • oneway — не заботится об успешности отправки запроса, минимальные затраты;
  • sync (синхронный вызов) — future.get уже вызван в исходном коде Dubbo, у пользователя создаётся впечатление, что метод заблокирован и должен дождаться результата, прежде чем вернуться;
  • async (асинхронный вызов) — Dubbo по своей природе асинхронный, клиент сохраняет возвращаемый ResponseFuture в контексте после вызова запроса, пользователь может вызвать future.get в любое время, чтобы получить результат. Асинхронный вызов идентифицируется уникальным ID.

Балансировка нагрузки кластера

Dubbo вводит модули Cluster, Directory, Router, LoadBalance, Invoker для обеспечения надёжности системы Dubbo. Отношения показаны на рисунке ниже:

Сервис обнаружения регистрирует несколько удалённых вызовов в Directory, а затем инкапсулирует их в invoker через Cluster. invoker обеспечивает отказоустойчивость.

При использовании потребителем он выбирает доступный invoker из Directory и инициирует вызов.

Вы можете думать о Cluster в Dubbo как о большой упаковке, которая включает в себя различные функции отказоустойчивости.

Отказоустойчивость кластера

Отказоустойчивость кластера реализуется на стороне потребителя через подкласс Cluster. Существует 10 реализаций Cluster, каждый класс Cluster создаёт объект соответствующего ClusterInvoker. Основная идея заключается в том, чтобы позволить пользователю выборочно вызывать промежуточный уровень Cluster, скрывая конкретные детали реализации.

Кластер Cluster Invoker Функция
FailoverCluster FailoverClusterInvoker Автоматическое переключение при сбое, по умолчанию
FailfastCluster FailfastClusterInvoker Один вызов, сбой вызывает исключение
FailsafeCluster FailsafeClusterInvoker Запись журнала ошибок при возникновении ошибки
FailbackCluster FailbackClusterInvoker Сбой возвращает null, повторная попытка 2 раза
ForkingCluster ForkingClusterInvoker Одновременный вызов задачи, успех одного — успех
BroadcastCluster BroadcastClusterInvoker Вызов каждого invoker, все доступны
AvailableCluster AvailableClusterInvoker Используется доступный
MergeableCluster MergeableClusterInvoker Объединяет результаты

Интеллектуальная отказоустойчивость и балансировка нагрузки

В Dubbo обычно существует четыре стратегии балансировки нагрузки.

  • RandomLoadBalance: взвешенный случайный выбор, его алгоритм прост. Предположим, есть группа серверов servers = [A, B, C], соответствующие веса равны weights = [5, 3, 2], а общий вес равен 10. Теперь эти веса равномерно распределяются по одномерному координатному значению, интервал [0, 5) принадлежит серверу A, интервал [5, 8) принадлежит серверу B, интервал [8, 10) принадлежит серверу C. Затем используйте генератор случайных чисел для генерации случайного числа в диапазоне от 0 до 10, а затем вычислите, к какому интервалу принадлежит случайное число. По умолчанию.
  • LeastActiveLoadBalance: балансировка нагрузки с наименьшим числом активных вызовов, выбирается поставщик услуг с наименьшим количеством текущих активных вызовов для вызова, поскольку текущий активный вызов означает, что он очень лёгкий, и активный вызов подсчитывается от 0. Приходит запрос, активный вызов +1, обработка завершена, активный вызов -1. Таким образом, хотя активный вызов невелик, он также может косвенно отражать скорость обработки.
  • RoundRobinLoadBalance: взвешенная циклическая балансировка нагрузки, например, теперь есть два сервера A и B, порядок вызова по кругу — A, B, A, B. Если добавить вес, отношение веса A к весу B равно 2:1, тогда порядок вызова становится A, A, B, A, A, B.
  • ConsistentHashLoadBalance: согласованная хэш-балансировка нагрузки, IP-адрес и другая информация о сервере генерируют хеш-значение, которое проецируется на круг в качестве узла, а когда требуется ключ для поиска, он ищет первый узел, больший или равный этому ключу, двигаясь по часовой стрелке. Обычно вводится виртуальный узел, чтобы сделать данные более разбросанными и избежать концентрации данных на одном узле. Как показано на рисунке ниже, Dubbo по умолчанию имеет 160 виртуальных узлов.

Интеллектуальная отказоустойчивость и каталог услуг

Каталог услуг можно понимать как набор invokers с одинаковыми услугами. Ядром является класс RegistryDirectory. Он имеет три функции:

  • Получать список invokers из реестра.
  • Отслеживать изменения invoker в реестре, включая онлайн и офлайн.
  • Обновить список invokers в каталоге услуг.

Интеллектуальная отказоустойчивость и маршрутизация услуг

Маршрутизация услуг фактически является правилом маршрутизации, которое определяет, какие поставщики услуг могут быть вызваны потребителями услуг. Условие правила маршрутизации состоит из двух условий, которые используются для сопоставления потребителей услуг и поставщиков услуг соответственно. Например, существует такое правило:

host = 10.20.153.14 => host = 10.20.153.12

Это правило означает, что потребители услуг с IP-адресом 10.20.153.14 могут вызывать только поставщиков услуг с IP-адресом 10.20.153.12 и не могут вызывать других поставщиков услуг. Формат правил маршрутизации условий следующий:

[Условие соответствия потребителя услуг] => [Условие соответствия поставщика услуг]

Если условие соответствия потребителя услуг пусто, это означает, что ограничение на потребителя услуг не применяется. Если условие соответствия поставщика услуг пусто, это означает, что некоторые потребители услуг отключены. Зависимость:

<dependency>
    <groupId>com.alibaba.boot</groupId>
    <artifactId>nacos-config-spring-boot-starter</artifactId>
    <version>${latest.version}</version>
</dependency>  

Примечание: версия 0.2.x.RELEASE соответствует Spring Boot версии 2.x, а версия 0.1.x.RELEASE — Spring Boot версии 1.x.

  • Второй шаг: в application.properties настройте адрес сервера Nacos:
nacos.config.server-addr=127.0.0.1:8848
  • Третий шаг: используйте @NacosPropertySource для загрузки источника конфигурации с dataId «example» и включите автоматическое обновление:
@SpringBootApplication
@NacosPropertySource(dataId = "example", autoRefreshed = true)
public class NacosConfigApplication {
    public static void main(String[] args) {
        SpringApplication.run(NacosConfigApplication.class, args);
    }
}
  • Четвёртый шаг: с помощью аннотации @NacosValue установите значение свойства.
@Controller
@RequestMapping("config")
public class ConfigController {

    @NacosValue(value = "${useLocalCache:false}", autoRefreshed = true)
    private boolean useLocalCache;

    @RequestMapping(value = "/get", method = GET)
    @ResponseBody
    public boolean get() {
        return useLocalCache;
    }
}
  • Пятый шаг: запустите NacosConfigApplication, вызовите curl http://localhost:8080/config/get и получите ответ false.

  • Шестой шаг: опубликуйте конфигурацию на сервере Nacos с помощью вызова Nacos Open API. Установите dataId как «example», а содержимое как «useLocalCache=true».

curl -X POST "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=example&group=DEFAULT_GROUP&content=useLocalCache=true"
  • Седьмой шаг: снова посетите http://localhost:8080/config/get, и теперь ответ будет true, что означает, что значение useLocalCache в программе было динамически обновлено.

Запуск службы обнаружения

В этом разделе показано, как запустить службу обнаружения Nacos в вашем проекте Spring Boot.

  • Первый шаг: добавьте зависимость.
<dependency>
    <groupId>com.alibaba.boot</groupId>
    <artifactId>nacos-discovery-spring-boot-starter</artifactId>
    <version>${latest.version}</version>
</dependency>

Примечание: версия 0.2.x.RELEASE соответствует Spring Boot версии 2.x, а версия 0.1.x.RELEASE — Spring Boot версии 1.x.

  • Второй шаг: настройте адрес сервера Nacos в файле application.properties.
nacos.discovery.server-addr=127.0.0.1:8848
  • Третий шаг: внедрите экземпляр NamingService Nacos с помощью @NacosInjected.
@Controller
@RequestMapping("discovery")
public class DiscoveryController {

    @NacosInjected
    private NamingService namingService;

    @RequestMapping(value = "/get", method = GET)
    @ResponseBody
    public List<Instance> get(@RequestParam String serviceName) throws NacosException {
        return namingService.getAllInstances(serviceName);
    }
}

@SpringBootApplication
public class NacosDiscoveryApplication {

    public static void main(String[] args) {
        SpringApplication.run(NacosDiscoveryApplication.class, args);
    }}
  • Четвертый шаг: запустите приложение NacosDiscoveryApplication и вызовите curl http://localhost:8080/discovery/get?serviceName=example. Ответ будет пустым массивом JSON [].

  • Пятый шаг: зарегистрируйте сервис с именем example на сервере Nacos, используя вызов Nacos Open API.

curl -X PUT 'http://127.0.0.1:8848/nacos/v1/ns/instance?serviceName=example&ip=127.0.0.1&port=8080'
[
  {
    "instanceId": "127.0.0.1-8080-DEFAULT-example",
    "ip": "127.0.0.1",
    "port": 8080,
    "weight": 1.0,
    "healthy": true,
    "cluster": {
      "serviceName": null,
      "name": "",
      "healthChecker": {
        "type": "TCP"
      },
      "defaultPort": 80,
      "defaultCheckPort": 80,
      "useIPPort4Check": true,
      "metadata": {}
    },
    "service": null,
    "metadata": {}
  }
]

Spring Cloud

Этот раздел описывает, как настроить управление конфигурацией и обнаружение служб с использованием Nacos и Spring Cloud.

После запуска сервера Nacos вы можете использовать следующие примеры кода для настройки управления конфигурацией и обнаружения служб в ваших приложениях Spring Cloud.

  • Управление конфигурацией:

    • Шаг 1: Добавьте зависимость:
    <dependency>
        <groupId>com.alibaba.cloud</groupId>

    Используйте эту зависимость для интеграции Nacos в ваше приложение Spring Cloud.

    • Шаг 2: Настройте адрес сервера Nacos в файле свойств приложения:
    nacos.config.server-addr=127.0.0.1:8848

    Это позволит вашему приложению получать конфигурации от сервера Nacos.

    • Шаг 3: Используйте аннотацию @EnableConfigServer для включения управления конфигурацией:
    @SpringBootApplication
    @EnableConfigServer
    public class MyApplication {
        public static void main(String[] args) {
            SpringApplication.run(MyApplication.class, args);
        }
    }

    После этого ваше приложение будет готово к получению конфигураций от сервера Nacos.

  • Обнаружение служб:

    • Шаг 1: Добавьте зависимость для обнаружения служб:
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>

    Эта зависимость позволяет вашему приложению обнаруживать другие сервисы в вашей среде.

    • Шаг 2: Настройте сервер обнаружения в файле свойств:
    eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/

    Здесь мы указываем адрес сервера обнаружения (в данном случае это локальный сервер Eureka).

    • Шаг 3: Включите обнаружение сервисов в приложении:
    @SpringBootApplication
    @EnableEurekaClient
    public class MyApplication {
        public static void main(String[] args) {
            SpringApplication.run(MyApplication.class, args);
        }
    }

    С помощью этой аннотации ваше приложение будет регистрироваться на сервере обнаружения и получать информацию о других сервисах. artifactIdspring-cloud-starter-alibaba-nacos-config version${latest.version} dependency


*注意*: версия [2.1.x.RELEASE](https://mvnrepository.com/artifact/com.alibaba.cloud/spring-cloud-starter-alibaba-nacos-config) соответствует Spring Boot 2.1.x версии. Версия [2.0.x.RELEASE](https://mvnrepository.com/artifact/com.alibaba.cloud/spring-cloud-starter-alibaba-nacos-config) соответствует Spring Boot 2.0.x версии, версия [1.5.x.RELEASE](https://mvnrepository.com/artifact/com.alibaba.cloud/spring-cloud-starter-alibaba-nacos-config) — Spring Boot 1.5.x версии.

**Второй шаг**: в *bootstrap.properties* настроить адрес сервера Nacos и имя приложения:

```properties
spring.cloud.nacos.config.server-addr=127.0.0.1:8848
spring.application.name=example

Объяснение: причина настройки spring.application.name заключается в том, что это часть поля dataId управления конфигурацией Nacos.

Третий шаг: в Nacos Spring Cloud поле dataId имеет следующий полный формат:

${prefix}-${spring.profiles.active}.${file-extension}

prefix по умолчанию равен значению spring.application.name, но его также можно настроить с помощью параметра конфигурации spring.cloud.nacos.config.prefix.

spring.profiles.active — это профиль текущей среды, подробности см. в документации Spring Boot. Обратите внимание: если spring.profiles.active пуст, соответствующий соединительный символ - также не существует, формат соединения dataId становится ${prefix}.${file-extension}.

file-exetension — формат данных содержимого конфигурации, который можно настроить с помощью параметра конфигурации spring.cloud.nacos.config.file-extension. В настоящее время поддерживаются только типы properties и yaml.

Четвёртый шаг: реализовать автоматическое обновление конфигурации с помощью аннотации Spring Cloud @RefreshScope:

@RestController
@RequestMapping("/config")
@RefreshScope
public class ConfigController {

    @Value("${useLocalCache:false}")
    private boolean useLocalCache;

    @RequestMapping("/get")
    public boolean get() {
        return useLocalCache;
    }
}

Пятый шаг: сначала опубликовать конфигурацию на сервере Nacos через Open API Nacos: dataId равен example.properties, содержимое — useLocalCache=true:

curl -X POST "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=example.properties&group=DEFAULT_GROUP&content=useLocalCache=true"

Шестой шаг: запустить NacosConfigApplication, вызвать curl http://localhost:8080/config/get, ответ будет true.

Седьмой шаг: снова опубликовать конфигурацию через Open API Nacos на сервере Nacos: dataId равен example.properties, содержимое — useLocalCache=false:

curl -X POST "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=example.properties&group=DEFAULT_GROUP&content=useLocalCache=false"

Восьмой шаг: ещё раз посетите http://localhost:8080/config/get, на этот раз ответ будет false, что означает, что значение useLocalCache в программе было динамически обновлено.

Запуск службы обнаружения

В этом разделе показано, как включить функцию обнаружения служб Nacos в вашем проекте Spring Cloud, реализовав простой echo service, как показано на рисунке ниже:

echo_service

Первый шаг: добавить зависимость:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    <version>${latest.version}</version>
</dependency>

Обратите внимание: версия 2.1.x.RELEASE соответствует Spring Boot 2.1.x версии. Версия 2.0.x.RELEASE — Spring Boot 2.0.x версии, версия 1.5.x.RELEASE — Spring Boot 1.5.x версии.

Второй шаг: настроить поставщика услуг, чтобы поставщик услуг мог зарегистрировать свои услуги на сервере Nacos с помощью функции регистрации и обнаружения сервисов Nacos.

i. В application.properties настройте адрес сервера Nacos:

server.port=8070
spring.application.name=service-provider
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848

ii. Используйте аннотацию Spring Cloud @EnableDiscoveryClient для включения функции регистрации и обнаружения служб:

@SpringBootApplication
@EnableDiscoveryClient
public class NacosProviderApplication {

    public static void main(String[] args) {
        SpringApplication.run(NacosProviderApplication.class, args);
    }

    @RestController
    class EchoController {
        @RequestMapping(value = "/echo/{string}", method = RequestMethod.GET)
        public String echo(@PathVariable String string) {
            return "Hello Nacos Discovery " + string;
        }
    }
} **Третий шаг**: настройка потребителя услуг, чтобы он мог получать нужные ему услуги через функцию регистрации и обнаружения сервисов Nacos с сервера Nacos.

i. В `application.properties` настройте адрес сервера Nacos:
```properties
server.port=8080
spring.application.name=service-consumer
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848

ii. С помощью Spring Cloud включите функцию обнаружения сервисов, используя аннотацию @EnableDiscoveryClient. Добавьте аннотацию @LoadBalanced к экземпляру RestTemplate, чтобы включить интеграцию с @LoadBalanced и Ribbon:

@SpringBootApplication
@EnableDiscoveryClient
public class NacosConsumerApplication {

    @LoadBalanced
    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

    public static void main(String[] args) {
        SpringApplication.run(NacosConsumerApplication.class, args);
    }

    @RestController
    public class TestController {

        private final RestTemplate restTemplate;

        @Autowired
        public TestController(RestTemplate restTemplate) {this.restTemplate = restTemplate;}

        @RequestMapping(value = "/echo/{str}", method = RequestMethod.GET)
        public String echo(@PathVariable String str) {
            return restTemplate.getForObject("http://service-provider/echo/" + str, String.class);
        }
    }
}

Четвёртый шаг: запустите ProviderApplication и ConsumerApplication, вызовите http://localhost:8080/echo/2018, и получите ответ «Hello Nacos Discovery 2018».

Sentinel

Sentinel — это система защиты стабильности сервисов в распределённых системах, которая фокусируется на трафике. Sentinel защищает стабильность сервисов с разных точек зрения, таких как управление трафиком, аварийное отключение, снижение нагрузки и защита от перегрузки системы. Sentinel обладает следующими характеристиками:

  • Разнообразные сценарии применения: Sentinel использовался для управления трафиком во время крупных распродаж Alibaba, например, для контроля пикового трафика во время «чёрной пятницы» или для управления сообщениями о пиках и спадах трафика, а также для управления кластерным трафиком и аварийного отключения нижестоящих приложений.
  • Комплексный мониторинг в реальном времени: Sentinel предоставляет функции мониторинга в реальном времени. Вы можете видеть данные с одной машины за секунду или даже с кластера из 500 машин.
  • Обширная экосистема с открытым исходным кодом: Sentinel предлагает готовые модули интеграции с другими фреймворками и библиотеками с открытым исходным кодом, такими как Spring Cloud, Dubbo и gRPC. Вам нужно только добавить соответствующие зависимости и выполнить простую настройку, чтобы быстро интегрировать Sentinel.
  • Полноценный интерфейс расширения SPI: Sentinel предоставляет простой и удобный интерфейс расширения SPI. Вы можете быстро настроить логику, реализовав интерфейс расширения, например, управление правилами или адаптацию динамических источников данных.

Основные характеристики

Основные характеристики Sentinel: Sentinel-features-overview

Экосистема с открытым исходным кодом

Экосистема Sentinel с открытым исходным кодом: Sentinel-opensource-eco Sentinel состоит из двух частей:

  • Ядро библиотеки (Java-клиент) не зависит от каких-либо фреймворков или библиотек и может работать в любой среде выполнения Java, при этом оно также хорошо поддерживает Dubbo / Spring Cloud и другие фреймворки.
  • Консоль управления (Dashboard) разработана на основе Spring Boot и может быть запущена напрямую после упаковки без необходимости дополнительного контейнера приложений, такого как Tomcat.

Быстрый старт

Ручная интеграция Sentinel и консоли управления

Пример ниже показывает, как можно интегрировать Sentinel за три шага. Кроме того, Sentinel также предоставляет консоль управления, где вы можете отслеживать ресурсы и управлять правилами в режиме реального времени. ШАГ 1. Добавление Jar-файла Sentinel в приложение Если вы используете проект Maven, добавьте следующий код в файл pom.xml:

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-core</artifactId>
    <version>1.8.1</version>
</dependency>

Обратите внимание: Sentinel поддерживает только JDK 1.8 или более поздние версии. Если вы не используете инструмент управления зависимостями, загрузите JAR-файл непосредственно из Maven Center Repository. ШАГУ 2. Определение ресурсов Далее мы используем API Sentinel SphU.entry("HelloWorld") и entry.exit() для окружения кода, который необходимо контролировать. В следующем примере мы используем System.out.println("hello world"); в качестве ресурса и оборачиваем его в API (SphU.entry() и entry.exit()).

public static void main(String[] args) {
    initFlowRules();
    while (true) {
        Entry entry = null;
        try {
        entry = SphU.entry("HelloWorld");
            /*Ваш бизнес-код - начало*/
            System.out.println("hello world");
            /*Ваш бизнес-код - конец*/
    } catch (BlockException e1) {
            /*Логика управления потоком - начало*/
        System.out.println("block!");
            /*Логика управления потоком - конец*/
    } finally {
       if (entry != null) {
           entry.exit();
       }
    }
    }
}

После выполнения этих двух шагов модификация кода завершена. Конечно, мы также предоставляем модуль поддержки аннотаций, который позволяет определять ресурсы с низкой степенью вмешательства. ШАГУ 3. Определение правил Затем мы определяем правила, которые указывают, сколько запросов разрешено для каждого ресурса. Например, следующее правило позволяет ресурсу HelloWorld обрабатывать не более 20 запросов в секунду.

private static void initFlowRules(){
    List<FlowRule> rules = new ArrayList<>();
    FlowRule rule = new FlowRule();
    rule.setResource("HelloWorld");
    rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
    // Set limit QPS to 20.
    rule.setCount(20);
    rules.add(rule);
    FlowRuleManager.loadRules(rules);
}

Выполнив эти три шага, вы сможете использовать Sentinel. Для получения дополнительной информации см. документацию. STEP 4. Проверка эффекта

После запуска Demo мы можем увидеть в журнале ~/logs/csp/${appName}-metrics.log.xxx следующий вывод:

|--timestamp-|------date time----|-resource-|p |block|s |e|rt
1529998904000|2018-06-26 15:41:44|HelloWorld|20|0    |20|0|0
1529998905000|2018-06-26 15:41:45|HelloWorld|20|5579 |20|0|728
1529998906000|2018-06-26 15:41:46|HelloWorld|20|15698|20|0|0
1529998907000|2018-06-26 15:41:47|HelloWorld|20|19262|20|0|0
1529998908000|2018-06-26 15:41:48|HelloWorld|20|19502|20|0|0
1529998909000|2018-06-26 15:41:49|HelloWorld|20|18386|20|0|0

Здесь p обозначает количество запросов, успешно прошедших через систему, block — количество заблокированных запросов, s — количество успешных выполнений, e — пользовательские исключения, rt — среднее время отклика. Мы видим, что программа стабильно выводит «hello world» 20 раз в секунду, что соответствует заданному порогу.

STEP 5. Запуск Sentinel Control Panel

Для запуска контрольной панели Sentinel можно обратиться к документации Sentinel (https://github.com/alibaba/Sentinel/wiki/控制台). Панель позволяет отслеживать состояние ресурсов в реальном времени и изменять правила ограничения потока.

dashboard-monitoring

Influxdb

InfluxDB — это распределённая база данных временных рядов с открытым исходным кодом, написанная на языке Go. Она не требует внешних зависимостей и занимает первое место в категории баз данных временных рядов по версии DB-Engines.

Основные характеристики

  • Упрощённая архитектура: для работы InfluxDB достаточно установить один бинарный файл, который не имеет внешних зависимостей.
  • Высокая производительность записи: InfluxDB использует собственную систему хранения TSM (Time-Structured Merge Tree), которая обеспечивает высокую скорость записи и эффективное сжатие данных.
  • Эффективный поиск: теги индексируются, обеспечивая быстрый поиск.
  • InfluxQL: предоставляет язык запросов, похожий на SQL, для удобства использования.
  • Continuous Queries: позволяют автоматически агрегировать данные и ускорять запросы.

Система хранения

InfluxDB использует систему хранения TSM (Time-Structured Merge Tree) собственной разработки. TSM основана на LSM (Log-Structured Merge-Tree), но оптимизирована для удовлетворения требований к базам данных временных рядов.

Требования к базам данных временных рядов

  • Десятки миллиардов отдельных точек данных.
  • Высокая пропускная способность при записи.
  • Высокая скорость чтения.
  • Большие объёмы удаления данных (по истечении срока действия).
  • Основной объём работы связан с добавлением новых данных, а не с обновлением существующих.

LSM

LSM (Log-Structured Merge-Tree) — это принцип работы LevelDB, одного из самых популярных хранилищ ключей и значений. Однако у него есть некоторые ограничения, которые затрудняют его использование в качестве системы хранения для баз данных временных рядов:

  • Уровень безопасности: для безопасного резервного копирования необходимо закрыть базу данных перед копированием. Некоторые варианты LevelDB (RocksDB и HyperLevelDB) решают эту проблему.
  • Управление данными: базы данных временных рядов требуют автоматического управления сохранением данных, включая удаление устаревших данных. В LevelDB удаление данных происходит медленно и требует дополнительных усилий.

TSM

TSM разделяет данные на сегменты (Shards) в зависимости от временного диапазона. Поскольку данные временных рядов обычно добавляются последовательно, сегменты также растут линейно. Новые данные обычно записываются в последний сегмент, а не распределяются между несколькими сегментами.

Преимущества сегментирования включают лёгкое физическое удаление данных (достаточно удалить весь сегмент) и возможность масштабирования путём добавления новых сегментов.

В ранних версиях InfluxDB каждый сегмент представлял собой отдельную базу данных LevelDB. Это приводило к проблемам с управлением большим количеством файлов, так как LevelDB использует многоуровневую систему сжатия.

Чтобы решить эту проблему, InfluxDB перешёл на BoltDB, который использует B+ деревья для хранения данных. Однако B+ деревья могут вызывать большое количество случайных операций записи, что снижает производительность.

Наконец, InfluxDB разработал собственную систему хранения TSM, основанную на принципах LSM, но адаптированную под требования баз данных временных рядов. TSM включает в себя такие компоненты, как кэш, WAL (журнал упреждающей записи) и файлы данных, а также операции, такие как очистка и уплотнение данных.

Архитектура системы

InfluxDB系统架构

  • База данных: пользователи могут создавать новые базы данных с помощью команды create database xxx.
  • Политика хранения (Retention Policy, RP): определяет срок хранения данных.
  • Группа сегментов (Shard Group): логическая концепция, объединяющая множество сегментов. Сегменты являются основными единицами хранения и обработки данных в InfluxDB. Boot в основном является расширением Spring-фреймворка. Он устраняет необходимость в настройке XML-конфигурации для приложений Spring, обеспечивая более быструю и эффективную среду разработки.

Основные особенности Spring Boot:

  • создание независимых Spring-приложений;
  • встраивание Tomcat, Jetty и Undertow без необходимости их развёртывания;
  • предоставление «starters» POM для упрощения конфигурации Maven;
  • автоматическая настройка Spring-приложения по возможности;
  • обеспечение производственных метрик, проверки надёжности и внешней конфигурации;
  • отсутствие требований к генерации кода и XML-конфигурации.

Spring — это фреймворк, который упрощает разработку корпоративных приложений на Java. Он предоставляет набор инструментов и библиотек для создания, настройки и управления приложениями.

Основные компоненты Spring

На диаграмме показаны основные компоненты Spring:

  1. ApplicationContext — основной интерфейс для доступа к компонентам приложения.
  2. BeanFactory — интерфейс, предоставляющий доступ к компонентам приложения через фабрику.
  3. BeanDefinition — определение компонента, включая его имя, класс и свойства.
  4. PostProcessor — процессор, который может изменять или дополнять определения компонентов после их создания.
  5. AOP Alliance — набор интерфейсов и классов для реализации аспектно-ориентированного программирования.
  6. ResourceLoader — загрузчик ресурсов, таких как файлы свойств или XML-файлы.
  7. MessageSource — источник сообщений, используемый для интернационализации.
  8. EventPublisher — издатель событий, позволяющий компонентам подписываться на события.
  9. ApplicationEvent — событие, которое может быть опубликовано в приложении.
  10. ExceptionHandler — обработчик исключений, который может обрабатывать исключения, возникающие в компонентах.

Spring-модули

Spring включает в себя следующие модули:

  • Spring Context — предоставляет фреймворковую реализацию BeanFactory и ApplicationContext, а также корпоративные функции, такие как JNDI, EJB и управление транзакциями.
  • Spring Core — ядро Spring, обеспечивающее базовые функции, такие как IoC (Inversion of Control) и DI (Dependency Injection).
  • Spring AOP — модуль для аспектно-ориентированного программирования, позволяющий добавлять сквозную логику к компонентам.
  • Spring Web — поддержка веб-приложений, включая MVC, REST и веб-сервисы.
  • Spring MVC — реализация Model-View-Controller для веб-приложений.
  • Spring DAO — абстракция над JDBC для упрощения работы с базами данных.
  • Spring ORM — интеграция с популярными ORM-фреймворками, такими как Hibernate и JPA.

Пакеты Spring

Диаграмма показывает основные пакеты Spring:

  1. org.springframework.beans — пакет, содержащий классы и интерфейсы, связанные с управлением компонентами.
  2. org.springframework.context — пакет, включающий классы и интерфейсы для работы с контекстом приложения.
  3. org.springframework.core — пакет с основными классами и интерфейсами Spring.
  4. org.springframework.expression — пакет для обработки выражений.
  5. org.springframework.jdbc — пакет с поддержкой JDBC.
  6. org.springframework.orm — пакет с интеграцией с ORM.
  7. org.springframework.transaction — пакет с управлением транзакциями.
  8. org.springframework.web — пакет с веб-поддержкой.

Spring-аннотации

На диаграмме представлены основные аннотации Spring:

  1. @Component — указывает, что класс является компонентом Spring.
  2. @Autowired — автоматически связывает зависимости между компонентами.
  3. @Qualifier — позволяет указать конкретный компонент при наличии нескольких кандидатов.
  4. @Configuration — используется для определения конфигурации Spring.
  5. @Bean — определяет компонент в конфигурации.
  6. @Scope — устанавливает область видимости компонента.
  7. @Service — указывает на сервисный компонент.
  8. @Repository — указывает на репозиторий данных.
  9. @Controller — указывает на контроллер MVC.

IoC

IoC (Inversion of Control) — это принцип проектирования, при котором управление созданием и зависимостью объектов осуществляется контейнером IoC, а не самим кодом приложения. Это позволяет отделить код приложения от деталей реализации и упростить тестирование.

В Spring IoC реализуется через контейнер IoC, который управляет жизненным циклом компонентов и их зависимостями. Контейнер создаёт экземпляры компонентов, внедряет зависимости и управляет их состоянием.

Существует три способа внедрения зависимостей в Spring:

  • Конструктор — зависимости передаются в конструктор компонента.
  • Setter — зависимости устанавливаются через сеттеры компонента.
  • Annotation — зависимости определяются с помощью аннотаций.

AOP

AOP (Aspect-Oriented Programming) — это парадигма программирования, которая позволяет разделять сквозную логику (аспекты) от бизнес-логики. В Spring AOP реализуется через прокси-объекты, которые перехватывают вызовы методов и добавляют сквозную логику.

Существуют два типа прокси в Spring AOP:

  • JDK dynamic proxy — прокси, создаваемые на основе интерфейса.
  • Cglib proxy — прокси, созданные на основе класса.

Прокси-объекты могут быть созданы с использованием следующих технологий:

  • ProxyFactory — для создания прокси на основе интерфейса.
  • AspectJ — для статического аспекта.
  • Load-time weaving — для динамического аспекта во время загрузки класса.
  • Runtime weaving — для динамического аспекта во время выполнения.

Применение AOP

Сквозная логика, которую можно реализовать с помощью AOP, включает:

  • аутентификацию и авторизацию;
  • кэширование;
  • логирование;
  • транзакции;
  • безопасность;
  • обработку исключений;
  • профилирование.

Фильтры

Фильтры в Spring используются для предварительной обработки запросов перед их передачей контроллеру. Они могут выполнять различные задачи, такие как проверка прав доступа, кодирование/декодирование данных, обработка исключений и т. д.

Фильтры могут быть реализованы с использованием интерфейса Filter или HandlerInterceptor.

Интерфейс Filter предоставляет методы doFilter() для обработки запроса и init() и destroy() для инициализации и очистки фильтра.

HandlerInterceptor предоставляет методы preHandle(), postHandle() и afterCompletion() для перехвата запросов и ответов.

Перехватчики

Перехватчики в Spring позволяют добавлять логику до и после выполнения метода контроллера. Они реализуются с использованием интерфейса HandlerInterceptor или AspectJ.

С помощью перехватчиков можно выполнять такие задачи, как авторизация, логирование, кеширование и т. п. MethodInterceptor — это перехватчик в AOP-проекте, который перехватывает цель в виде метода, даже если это не метод контроллера.

Существует два основных способа реализации перехватчика MethodInterceptor:

  • реализация интерфейса MethodInterceptor;
  • использование аннотаций или конфигурации AspectJ.

Spring процесс

Процесс запуска Spring контейнера

  1. Инициализация Spring контейнера и регистрация встроенных BeanPostProcessor в контейнер:
    • создание экземпляра класса BeanFactory (DefaultListableBeanFactory) для генерации объектов Bean;
    • создание экземпляра BeanDefinitionReader для чтения и преобразования определённых аннотированных классов (например, @Service, @Repository) в объекты BeanDefinition;
    • создание экземпляра ClassPathBeanDefinitionScanner для сканирования и поиска объектов Bean в указанном каталоге.
  2. Регистрация конфигурационного класса в контейнере.
  3. Вызов метода refresh() для обновления контейнера:
    • prepareRefresh() — предварительная обработка перед обновлением;
    • obtainFreshBeanFactory() — получение BeanFactory, созданного при инициализации контейнера;
    • prepareBeanFactory(beanFactory) — предварительная обработка BeanFactory;
    • postProcessBeanFactory(beanFactory) — дочерний класс может переопределить этот метод для дальнейшей настройки после создания и предварительной обработки BeanFactory;
    • invokeBeanFactoryPostProcessors(beanFactory) — выполнение методов BeanFactoryPostProcessor после стандартной инициализации BeanFactory для реализации таких функций, как проксирование, автоматическая инъекция и циклическая зависимость;
    • registerBeanPostProcessors(beanFactory) — регистрация BeanPostProcessors в контейнере для управления процессом инициализации Spring beans и выполнения таких функций, как проксирование, автоматическая инъекция, циклическая зависимость и т. д.;
    • initMessageSource() — инициализация компонента MessageSource для поддержки интернационализации, привязки сообщений и их анализа;
    • initApplicationEventMulticaster() — инициализация EventDispatcher для использования при регистрации слушателей;
    • onRefresh() — дочерние контейнеры и классы могут переопределять этот метод для выполнения пользовательской логики во время обновления контейнера;
    • registerListeners() — регистрация слушателей. Все слушатели ApplicationListener в контейнере регистрируются в EventDispatcher, и события, созданные на предыдущих шагах, отправляются им;
    • finishBeanFactoryInitialization(beanFactory) — инициализация оставшихся одноэлементных beans. Основной метод — preInstantiateSingletons(), который вызывает getBean() для создания объектов;
    • finishRefresh() — публикация события о завершении обновления BeanFactory контейнера.

SpringMVC процесс

  1. Пользователь отправляет запрос на передний контроллер DispatcherServlet.
  2. После получения запроса DispatcherServlet вызывает HandlerMapping для поиска обработчика.
  3. HandlerMapping находит конкретный обработчик Handler на основе URL запроса и возвращает его вместе с перехватчиком Handler (если он есть) в DispatcherServlet.
  4. DispatcherServlet вызывает HandlerAdapter для выполнения обработчика Handler.
  5. HandlerAdapter адаптирует обработку и вызывает конкретный обработчик для бизнес-логики.
  6. Обработчик Handler завершает обработку и возвращает ModelAndView.
  7. HandlerAdapter возвращает ModelAndView в DispatcherServlet.
  8. DispatcherServlet передаёт ModelAndView на ViewResolver для разрешения представления.
  9. ViewResolver разрешает представление и возвращает конкретное представление View.
  10. DispatcherServlet отображает представление (заполняет модель данными в представлении).
  11. DispatcherServlet отвечает пользователю.

Front Controller DispatcherServlet: получает запросы и отвечает на них, действует как маршрутизатор и снижает степень сцепления между компонентами. HandlerMapping: определяет обработчик на основе URL запроса. HandlerAdapter: отвечает за выполнение обработчиков. Обработчик Handler: обработчики, которые должны быть разработаны программистом. ViewResolver: разрешает представление на основе логики имени представления и преобразует ModelAndView в реальное представление (View). Представление View: интерфейс представления, поддерживающий различные типы представлений, такие как jsp, freemarker, pdf и другие.

Неперехваченный HTTP-запрос

Последовательность выполнения обычного HTTP-запроса пользователя:

Запрос → Контроллер → Сервис → DAO → База данных

Перехваченный HTTP-запрос

После добавления фильтров и перехватчиков последовательность выполнения становится следующей:

Запрос → Фильтр → Контроллер → Перехватчик → Сервис → DAO → База данных

Расширение точек SpringBoot

Расширение точки запуска Spring

Пользовательские расширения могут быть добавлены до или после инициализации всего Spring контейнера. Существует три способа расширения:

  • добавление в класс запуска с помощью springApplication.addInitializers(new TestApplicationContextInitializer());
  • настройка в файле конфигурации context.initializer.classes=com.example.demo.TestApplicationContextInitializer;
  • расширение Spring SPI, добавив org.springframework.context.ApplicationContextInitializer=com.example.demo.TestApplicationContextInitializer в spring.factories.

BeanDefinitionRegistryPostProcessor

Расширение выполняется после чтения beanDefinition из проекта и предоставляет дополнительную точку расширения. Здесь можно динамически регистрировать собственные beanDefinitions или загружать beanDefinitions из classpath. BeanFactoryPostProcessor

Является расширением интерфейса beanFactory. Вызывается после того, как Spring считывает информацию о beanDefinition, но перед тем, как происходит создание экземпляра bean. В этот момент пользователь может обработать некоторые вещи через реализацию этого расширения, например, изменить метаинформацию уже зарегистрированного beanDefinition.

public class TestBeanFactoryPostProcessor implements BeanFactoryPostProcessor {  
    @Override  
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {  
        System.out.println("[BeanFactoryPostProcessor]");  
    }  
}

InstantiationAwareBeanPostProcessor

Этот интерфейс наследует от BeanPostProcess. Отличие состоит в следующем:

  • Интерфейс BeanPostProcess расширяется только на этапе инициализации bean (до и после внедрения в контекст Spring).
  • InstantiationAwareBeanPostProcessor расширяет функциональность на этапах создания экземпляра и внедрения свойств.

Основные точки расширения включают пять методов, которые применяются на двух основных этапах жизненного цикла bean: создание экземпляра и инициализация. Методы вызываются в следующем порядке:

  • postProcessBeforeInstantiation: перед созданием экземпляра bean, аналогично new для этого bean.
  • postProcessAfterInstantiation: после создания экземпляра bean, аналогично новому для этого bean.
  • postProcessPropertyValues: когда bean полностью создан и происходит внедрение свойств с использованием аннотаций @Autowired и @Resource.
  • postProcessBeforeInitialization: перед инициализацией bean, аналогично внедрению bean в контекст Spring.
  • postProcessAfterInitialization: после инициализации bean, аналогично завершению внедрения bean в контекст Spring.

Применение: Этот расширенный функционал полезен в различных сценариях, включая написание промежуточного программного обеспечения и бизнес-логику. Он позволяет выполнять сбор информации о bean, реализующих определённый интерфейс, или устанавливать значения для определённых типов bean.

public class TestInstantiationAwareBeanPostProcessor implements InstantiationAwareBeanPostProcessor {  
    @Override  
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {  
        System.out.println("[TestInstantiationAwareBeanPostProcessor] before initialization " + beanName);  
        return bean;  
    }  
  
    @Override  
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {  
        System.out.println("[TestInstantationAwareBeanPostProcessor] after initialization " + beanName);  
        return bean;  
    }  
  
    @Override  
    public Object postProcessBeforeInstantiation(Class<?> beanClass, String beanName) throws BeansException {  
        System.out.println("[TestInstantiationAwareBeanPostProcessor] before instantiation " + beanName);  
        return null;  
    }  
  
    @Override  
    public boolean postProcessAfterInstantiation(Object bean, String beanName) throws BeansException {  
        System.out.println("[TestInstantiationAwareBeanPostProcessor] after instantiation " + beanName);  
        return true;  
    }  
  
    @Override  
    public PropertyValues postProcessPropertyValues(PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeansException {  
        System.out.println("[TestInstantiationAwareBeanPostProcessor] postProcessPropertyValues " + beanName);  
        return pvs;  
    }
}

SmartInstantiationAwareBeanPostProcessor

Интерфейс имеет три точки расширения:

  1. predictBeanType: эта точка расширения происходит до postProcessBeforeInstantiation. Метод используется для прогнозирования типа Bean и возвращает первый успешно предсказанный класс. Если предсказание невозможно, возвращается null.
  2. determineCandidateConstructors: эта точка расширения следует после postProcessBeforeInstantiation и используется для определения конструкторов, используемых для создания экземпляра данного bean. Пользователь может расширить эту точку для выбора соответствующего конструктора.
  3. getEarlyBeanReference: эта точка расширения возникает после postProcessAfterInstantiation, особенно в сценариях с циклическими зависимостями. Когда экземпляр bean готов, чтобы предотвратить циклические зависимости, происходит предварительное раскрытие метода обратного вызова. Этот метод вызывается в методе обратного вызова, который был предварительно раскрыт. ``` beanName) throws BeansException {
    System.out.println("[TestSmartInstantiationAwareBeanPostProcessor] getEarlyBeanReference " + beanName);
    return bean;
    }

### BeanFactoryAware

Этот класс имеет только одну точку расширения, которая происходит после создания экземпляра компонента, но перед внедрением свойств, то есть до вызова сеттеров. Метод точки расширения этого класса — `setBeanFactory`, который позволяет получить атрибут `BeanFactory`.

Сценарий использования: можно получить `BeanFactory` после создания компонента, но до его инициализации, чтобы выполнить специализированную настройку для каждого компонента или сохранить `BeanFactory` в кэше для последующего использования.

```java
public class TestBeanFactoryAware implements BeanFactoryAware {  
    @Override  
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {  
        System.out.println("[TestBeanFactoryAware] " + beanFactory.getBean(TestBeanFactoryAware.class).getClass().getSimpleName());  
    }  
}

ApplicationContextAwareProcessor

Сам по себе этот класс не имеет точек расширения, но внутри него есть шесть точек расширения, которые могут быть реализованы и срабатывают после создания экземпляров компонентов, но до их инициализации. Этот класс используется для выполнения различных драйверов интерфейса, и после создания и заполнения свойств компонентов через выполнение расширенных интерфейсов, указанных в красном поле, можно получить соответствующие переменные контейнера. Поэтому здесь следует сказать, что существует шесть точек расширения:

  • EnvironmentAware: используется для получения расширения класса EnviromentAware, этот атрибут очень полезен и может получать все параметры системы. Конечно, я считаю, что расширение Aware не обязательно, потому что Spring может напрямую внедрить его через аннотацию.
  • EmbeddedValueResolverAware: используется для получения расширения класса StringValueResolver, которое используется для получения переменных на основе типа String, обычно мы используем @Value для получения, если вы реализуете этот интерфейс Aware, сохраните StringValueResolver в кэш и получите объект String через этот класс, эффект будет таким же.
  • ResourceLoaderAware: используется для получения расширения класса ResourceLoader, который может использоваться для получения ресурсов в пути к классам, вы можете расширить этот класс для получения объекта ResourceLoader.
  • ApplicationEventPublisherAware: используется для получения расширения класса ApplicationEventPublisher, который можно использовать для публикации событий и совместного использования с ApplicationListener. Этот объект также можно внедрить через Spring.
  • MessageSourceAware: используется для получения расширения класса MessageSource, который в основном используется для интернационализации.
  • ApplicationContextAware: используется для получения расширения класса ApplicationContext, который должен быть хорошо знаком многим людям. Это диспетчер контекста Spring, который может вручную получать любые компоненты, зарегистрированные в контексте Spring. Мы часто расширяем этот интерфейс для кэширования Spring Context и инкапсуляции его в статический метод. В то же время ApplicationContext также реализует интерфейсы BeanFactory, MessageSource и ApplicationEventPublisher, которые также могут использоваться для связанных операций.
class ApplicationContextAwareProcessor implements BeanPostProcessor {
    private void invokeAwareInterfaces(Object bean) {
        if (bean instanceof EnvironmentAware) {
          ((EnvironmentAware) bean).setEnvironment(this.applicationContext.getEnvironment());
        }
        if (bean instanceof EmbeddedValueResolverAware) {
          ((EmbeddedValueResolverAware) bean).setEmbeddedValueResolver(this.embeddedValueResolver);
        }
        if (bean instanceof ResourceLoaderAware) {
          ((ResourceLoaderAware) bean).setResourceLoader(this.applicationContext);
        }
        if (bean instanceof ApplicationEventPublisherAware) {
          ((ApplicationEventPublisherAware) bean).setApplicationEventPublisher(this.applicationContext);
        }
        if (bean instanceof MessageSourceAware) {
          ((MessageSourceAware) bean).setMessageSource(this.applicationContext);
        }
        if (bean instanceof ApplicationContextAware) {
          ((ApplicationContextAware) bean).setApplicationContext(this.applicationContext);
        }
    }
}

BeanNameAware

Это ещё один вид расширения Aware. Точка расширения находится перед инициализацией компонента, то есть перед postProcessBeforeInitialization, и метод точки расширения этого класса только один: setBeanName.

Сценарии использования: пользователи могут расширять эту точку, получать beanName из контейнера Spring перед инициализацией компонента и самостоятельно изменять значение beanName.

public class NormalBeanA implements BeanNameAware{  
    public NormalBeanA() {  
        System.out.println("NormalBean constructor");  
    }  
  
    @Override  
    public void setBeanName(String name) {  
        System.out.println("[BeanNameAware] " + name);  
    }  
}

@PostConstruct

Это не точка расширения, а фактически аннотация. Его функция заключается в том, что если метод аннотирован @PostConstruct, он будет вызываться на этапе инициализации компонента. Здесь важно обратить внимание на стандартную точку срабатывания, которая находится после postProcessBeforeInitialization и перед InitializingBean.afterPropertiesSet. Сценарии использования: пользователь может аннотировать определённый метод для инициализации определённого свойства.

public class NormalBeanA {  
    public NormalBeanA() {  
        System.out.println("NormalBean constructor");  
    }  
  
    @PostConstruct  
    public void init(){  
        System.out.println("[PostConstruct] NormalBeanA");  
    }  
}

InitializingBean

В этом классе есть два метода, которые можно использовать для настройки инициализации компонентов: afterPropertiesSet и destroy. Первый метод вызывается после внедрения всех свойств компонента, а второй — при уничтожении компонента. Фабрика бинов в Mybatis

Динамический прокси интерфейса Mapper в MyBatis также внедряется через фабрику бинов Spring.

public class MapperFactoryBean<T> extends SqlSessionDaoSupport implements FactoryBean<T> {
  
  // Тип интерфейса mapper
  private Class<T> mapperInterface;
 
  @Override
  public T getObject() throws Exception {
    // Получаем динамический прокси объекта интерфейса через SqlSession
    return getSqlSession().getMapper(this.mapperInterface);
  }
  
  @Override
  public Class<T> getObjectType() {
    return this.mapperInterface;
  }

}

Фабрика бинов в OpenFeign

Прокси интерфейса FeignClient также внедряется через фабрику бинов Spring.

public class FeignClientFactoryBean implements 
  FactoryBean<Object>, InitializingBean, ApplicationContextAware {
    
   // Тип интерфейса FeignClient
   private Class<?> type;
    
   @Override
   public Object getObject() throws Exception {
      return getTarget();
   }
    
    @Override
   public Class<?> getObjectType() {
      return type;
   }
  
}

@Import

Основная функция @Import — импортировать классы конфигурации и, в зависимости от используемых аннотаций (например, @EnableXXX), определять, какие бины должны быть внедрены в Spring.

Реализация ImportSelector

public class UserImportSelector implements ImportSelector {

    @Override
    public String[] selectImports(AnnotationMetadata importingClassMetadata) {
        System.out.println("Вызов метода selectImports класса UserImportSelector для получения списка имён классов");
        return new String[]{"com.sanyou.spring.extension.User"};
    }

}

@Import(UserImportSelector.class)
public class Application {

    public static void main(String[] args) {
        AnnotationConfigApplicationContext applicationContext = 
          new AnnotationConfigApplicationContext();
        // Регистрируем класс Application в контейнере
        applicationContext.register(Application.class);
        applicationContext.refresh();
        System.out.println("Полученный Bean: " + applicationContext.getBean(User.class));
    }

}

// Результат выполнения:
// Вызов метода selectImports класса UserImportSelector для получения списка имён классов
// Полученный Bean: com.sanyou.spring.extension.User@282003e1

Реализация ImportBeanDefinitionRegistrar

public class UserImportBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar {

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry, BeanNameGenerator importBeanNameGenerator) {
        // Создаём определение Bean с типом User
        AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder.rootBeanDefinition(User.class)
                // Устанавливаем значение свойства username для User как «张小凡»
                .addPropertyValue("username", "张小凡")
                .getBeanDefinition();

        System.out.println("Внедрение User в контейнер Spring");
        // Зарегистрируем определение Bean в контейнере
        registry.registerBeanDefinition("user", beanDefinition);
    }

}

// Импортируем UserImportBeanDefinitionRegistrar
@Import(UserImportBeanDefinitionRegistrar.class)
public class Application {

    public static void main(String[] args) {
        AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext();
        // Регистрация класса Application в контейнере
        applicationContext.register(Application.class);
        applicationContext.refresh();

        User user = applicationContext.getBean(User.class);
        System.out.println("Полученный Bean: " + user + ", значение свойства username: " + user.getUsername());
    }

}

// Результат выполнения:
// Внедрение User в контейнер Spring
// Полученный Bean: com.sanyou.spring.extension.User@6385cb26, значение свойства username: 张小凡

BeanPostProcessor

BeanPostProcessor работает во время создания бина. BeanPostProcessor является важным расширением точки в процессе создания бина, поскольку каждый этап создания бина вызывает методы BeanPostProcessor и его дочерних интерфейсов, передавая текущий создаваемый бин. Если вы хотите расширить определённый этап процесса создания бина, вы можете создать собственный BeanPostProcessor. Два часто используемых дочерних интерфейса:

  • InstantiationAwareBeanPostProcessor;
  • DestructionAwareBeanPostProcessor.
public class UserBeanPostProcessor implements BeanPostProcessor {

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof User) {
            // Если текущий бин имеет тип User, присваиваем значение «张小凡» свойству username этого объекта
            ((User)
``` **Spring встроенные BeanPostProcessor**

| BeanPostProcessor | Функция                                           |
| :------------------ | :------------------------------------------------- |
| AutowiredAnnotationBeanPostProcessor   | Обработка @Autowired, @Value аннотаций            |
| CommonAnnotationBeanPostProcessor      | Обработка @Resource, @PostConstruct, @PreDestroy аннотаций |
| AnnotationAwareAspectJAutoProxyCreator | Обработка некоторых анноток или AOP-срезов динамического прокси |
| ApplicationContextAwareProcessor       | Обработка аннотированных интерфейсов внедрения Aware |
| AsyncAnnotationBeanPostProcessor       | Обработка @Async аннотации                        |
| ScheduledAnnotationBeanPostProcessor   | Обработка @Scheduled аннотации                    |

**Применение BeanPostProcessor в Dubbo**

В Dubbo можно использовать @DubboReference (@Reference) для ссылки на интерфейс, предоставленный производителем. Обработка этой аннотации также осуществляется с помощью ReferenceAnnotationBeanPostProcessor, который является расширением BeanPostProcessor.

```java
public class ReferenceAnnotationBeanPostProcessor extends AbstractAnnotationBeanPostProcessor implements ApplicationContextAware, BeanFactoryPostProcessor {
    // Игнорируется
}

BeanFactoryPostProcessor

BeanFactoryPostProcessor позволяет обрабатывать Spring контейнеры. Метод ввода параметров — это сам Spring контейнер. Через этот интерфейс можно выполнять любые операции над контейнером.

public class MyBeanFactoryPostProcessor implements BeanFactoryPostProcessor {

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        // Запретить циклические зависимости
        ((DefaultListableBeanFactory) beanFactory).setAllowCircularReferences(false);
    }

}

Spring SPI механизм

SPI (Service Provider Interface) — это механизм динамической замены. Это отличная идея для развязки, которая позволяет интерфейсам и реализациям быть разделенными, а API-провайдерам предоставлять только интерфейсы, которые могут быть реализованы третьими сторонами. Затем можно использовать файлы конфигурации для замены или расширения. Этот механизм широко используется в фреймворках и повышает их расширяемость.

  • JDK: ServiceLoader
  • Dubbo: ExtensionLoader
  • Spring: SpringFactoriesLoader

Согласно механизму Spring SPI, файлы конфигурации должны находиться в папке META-INF в пути к классам. Имя файла должно быть spring.factories, а содержимое файла — пары ключ-значение. Один ключ может иметь несколько значений, разделенных запятыми. Ключ и значение не обязательно должны быть связаны друг с другом.

Автоматическая сборка

PropertySourceLoader

В среде SpringBoot внешние конфигурационные файлы поддерживают форматы свойств и yaml. Однако если вы хотите использовать файлы конфигурации в формате json, вы можете использовать PropertySourceLoader.

public interface PropertySourceLoader {

   // Может ли он анализировать файлы определенного формата
   String[] getFileExtensions();

   // Анализировать файлы конфигурации, читать содержимое и возвращать список PropertySource<?>
   List<PropertySource<?>> load(String name, Resource resource) throws IOException;

}

SpringBoot предоставляет два варианта реализации PropertySourceLoader:

  • PropertiesPropertySourceLoader: может анализировать файлы конфигурации свойств или xml.

    public class PropertiesPropertySourceLoader implements PropertySourceLoader {
        private static final String XML_FILE_EXTENSION = ".xml";
    
        public PropertiesPropertySourceLoader() {
        }
    
        public String[] getFileExtensions() {
            return new String[]{"properties", "xml"};
        }
    
        public List<PropertySource<?>> load(String name, Resource resource) throws IOException {
            Map<String, ?> properties = this.loadProperties(resource);
            return properties.isEmpty() ? Collections.emptyList() : Collections.singletonList(new OriginTrackedMapPropertySource(name, Collections.unmodifiableMap(properties), true));
        }
    
        private Map<String, ?> loadProperties(Resource resource) throws IOException {
            String filename = resource.getFilename();
            return (Map)(filename != null && filename.endsWith(".xml") ? PropertiesLoaderUtils.loadProperties(resource) : (new
    

- **YamlPropertySourceLoader**: анализирует конфигурационные файлы, которые заканчиваются на .yml или .yaml.

```java
public class YamlPropertySourceLoader implements PropertySourceLoader {
  
    @Override
    public String[] getFileExtensions() {
        return new String[] { "yml", "yaml" };
    }
  
    @Override
    public List<PropertySource<?>> load(String name, Resource resource) throws IOException {
        if (!ClassUtils.isPresent("org.yaml.snakeyaml.Yaml", null)) {
            throw new IllegalStateException(
                    "Attempted to load " + name + " but snakeyaml was not found on the classpath");
        }
        List<Map<String, Object>> loaded = new OriginTrackedYamlLoader(resource).load();
        if (loaded.isEmpty()) {
            return Collections.emptyList();
        }
        List<PropertySource<?>> propertySources = new ArrayList<>(loaded.size());
        for (int i = 0; i < loaded.size(); i++) {
            String documentNumber = (loaded.size() != 1) ? " (document #" + i + ")" : "";
            propertySources.add(new OriginTrackedMapPropertySource(name + documentNumber,
                    Collections.unmodifiableMap(loaded.get(i)), true));
        }
        return propertySources;
    }
}

Реализация поддержки формата JSON

  • Шаг 1: создание класса JsonPropertySourceLoader, который реализует интерфейс PropertySourceLoader.
public class JsonPropertySourceLoader implements PropertySourceLoader {

    @Override
    public String[] getFileExtensions() {
        //этот метод указывает, что класс поддерживает анализ конфигурационных файлов, заканчивающихся на .json
        return new String[]{"json"};
    }

    @Override
    public List<PropertySource<?>> load(String name, Resource resource) throws IOException {

        ReadableByteChannel readableByteChannel = resource.readableChannel();

        ByteBuffer byteBuffer = ByteBuffer.allocate((int) resource.contentLength());

        //читаем содержимое файла в ByteBuffer
        readableByteChannel.read(byteBuffer);
        //преобразуем прочитанные байты в строку
        String content = new String(byteBuffer.array());
        // преобразуем строку в объект JSONObject
        JSONObject jsonObject = JSON.parseObject(content);

        Map<String, Object> map = new HashMap<>(jsonObject.size());
        //извлекаем пары ключ-значение из JSON и помещаем их в map
        for (String key : jsonObject.keySet()) {
            map.put(key, jsonObject.getString(key));
        }

        return Collections.singletonList(new MapPropertySource("jsonPropertySource", map));
    }
}
  • Шаг 2: настройка PropertySourceLoader

Используйте механизм SPI для загрузки реализаций PropertySourceLoader. В файле spring.factories настройте следующее:

org.springframework.boot.env.PropertySourceLoader=\
cn.test.JsonPropertySourceLoader
  • Шаг 3: тестирование

В файле application.json настройте следующее:

{
    "test.userName": "张小凡"
}

Создайте класс User:

public class User {
    // внедряем свойства из конфигурационного файла
    @Value("${test.userName:}")
    private String userName;
}

Запустите проект:

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        ConfigurableApplicationContext applicationContext = SpringApplication.run(Application.class);
        User user = applicationContext.getBean(User.class);
        System.out.println("Полученный Bean равен " + user + ", значение свойства userName равно: " + user.getUserName());
    }

    @Bean
    public User user() {
        return new User();
    }

}
// результат выполнения:
// Полученный Bean равен com.sanyou.spring.extension.User@481ba2cf, значение свойства userName равно: 张小凡

Nacos как центр конфигурации поддерживает не только файлы в формате properties и yaml, но и файлы в формате JSON. Поэтому клиент должен уметь анализировать эти форматы. SpringBoot уже поддерживает файлы в форматах properties и yaml. Nacos нужно только реализовать поддержку тех форматов, которые не поддерживаются SpringBoot.

ApplicationContextInitializer

ApplicationContextInitializer — это ещё одна точка расширения процесса запуска SpringBoot. Во время запуска SpringBoot вызывает метод initialize этого класса и передаёт ему ConfigurableApplicationContext. Саньюу: использование пользовательских тегов в Spring

myBean, атрибут класса в теге myBean определяет тип Bean, который будет внедрён в контейнер Spring. Функция этого тега аналогична другим тегам Spring.

Второй шаг: анализ пространства имён Анализ пространства имён прост. Spring предоставляет для этого всё необходимое — интерфейс NamespaceHandler. Достаточно реализовать этот интерфейс. Однако обычно мы не реализуем NamespaceHandler напрямую, а наследуем класс NamespaceHandlerSupport, который уже реализует этот интерфейс.

public class SanYouNameSpaceHandler extends NamespaceHandlerSupport {

    @Override
    public void init() {
        // Регистрация парсера myBean-тегов
        registerBeanDefinitionParser("myBean", new SanYouBeanDefinitionParser());
    }

    private static class SanYouBeanDefinitionParser extends AbstractSingleBeanDefinitionParser {
        @Override
        protected boolean shouldGenerateId() {
            return true;
        }

        @Override
        protected String getBeanClassName(Element element) {
            return element.getAttribute("class");
        }
    }
}

Функция SanYouNameSpaceHandler заключается в чтении тега mybean из пространства имён sanyou и получении атрибута class. Затем этот атрибут используется для определения типа class, который внедряется в Spring-контейнер. Код, связанный с регистрацией, делегируется родительскому классу SanYouBeanDefinitionParser.

Третий шаг: создание и настройка файлов spring.handlers и spring.schemas Сначала создаются файлы spring.handlers и spring.schemas. Содержимое файла spring.handlers:

http\://sanyou.com/schema/sanyou=com.sanyou.spring.extension.namespace.SanYouNameSpaceHandler

Файл конфигурации spring.handlers сообщает, что пространство имён sanyou должно обрабатываться SanYouNameSpaceHandler. Содержимое файла spring.schemas:

http\://sanyou.com/schema/sanyou.xsd=META-INF/sanyou.xsd

В файле spring.schemas указан путь к файлу xsd. Файлы можно разместить в папке classpath/META-INF.

Четвёртый шаг: тестирование Создаётся файл applicationContext.xml и помещается в папку resources:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns="http://www.springframework.org/schema/beans"
       xmlns:sanyou="http://sanyou.com/schema/sanyou"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://sanyou.com/schema/sanyou
       http://sanyou.com/schema/sanyou.xsd">

    <!-- Использование sanyou-тега для настройки User Bean -->
    <sanyou:mybean class="com.sanyou.spring.extension.User"/>

</beans>

Затем создаётся тестовый класс:

public class Application {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");
        applicationContext.refresh();
        User user = applicationContext.getBean(User.class);
        System.out.println(user);
    }
}

// Результат выполнения:
// com.sanyou.spring.extension.User@27fe3806

SpringCloud

Eureka

Netflix Eureka — это основанный на REST компонент обнаружения сервисов от Netflix, включающий в себя сервер Eureka Server и клиент Eureka Client.

Клиент Eureka отвечает за регистрацию информации о сервисе на сервере Eureka. Сервер Eureka, похожий на реестр, содержит информацию о расположении и портах различных сервисов. Клиенты могут использовать эту информацию для поиска сервисов.

Рабочий процесс Eureka

Рабочий процесс Eureka выглядит следующим образом:

  1. После запуска сервер Eureka ожидает регистрации сервисов. Если настроено кластеризованное окружение, то между кластерами происходит периодическая синхронизация реестра через Replicate. Каждый сервер Eureka имеет независимый и полный реестр сервисов.
  2. Клиент Eureka регистрирует сервис на сервере Eureka при запуске, следуя конфигурации адреса сервера Eureka.
  3. Каждые 30 секунд клиент Eureka отправляет серверу Eureka запрос сердцебиения, чтобы подтвердить работоспособность сервиса.
  4. Если сервер Eureka не получает запрос сердцебиения от клиента Eureka в течение 90 секунд, он считает, что узел не работает, и удаляет его из реестра.
  5. Если сервер Eureka обнаруживает большое количество клиентов Eureka, которые не отправляют запросы сердцебиения, он предполагает, что это может быть связано с проблемами сети, и переходит в режим самозащиты, прекращая удаление узлов из реестра.
  6. Когда клиенты Eureka возобновляют отправку запросов сердцебиения после восстановления, сервер Eureka автоматически выходит из режима самозащиты.
  7. Клиент Eureka периодически (полностью или частично) обновляет информацию о реестре с сервера Eureka и сохраняет её локально.
  8. При вызове сервиса клиент Eureka сначала ищет информацию о нём в локальном кэше. Если информация не найдена, он обновляет реестр с сервера Eureka перед сохранением данных в кэше.
  9. Клиент Eureka получает информацию о целевом сервере от сервера Eureka и инициирует вызов сервиса.
  10. При завершении работы программы клиент Eureka уведомляет сервер Eureka об отмене регистрации. Сервер Eureka удаляет соответствующий узел из реестра. Внутренняя архитектура Eureka

В микросервисной архитектуре между сервисами обычно происходят межпроцессные вызовы, и коммуникация по сети может столкнуться с различными проблемами, такими как нормальное состояние микросервиса, сетевые разделы или сбои. Если большое количество экземпляров микросервисов будет отменено в течение фиксированного периода времени, это может серьёзно угрожать доступности всей микросервисной архитектуры.

Для решения этой проблемы Eureka разработала механизм самозащиты. Механизм самозащиты Eureka работает следующим образом:

  • Во время работы Eureka Server отслеживает долю неудачных попыток отправки пульса в пределах 15 минут. Если доля неудачных попыток ниже 85%, Eureka Server переходит в режим самозащиты.
  • После перехода в режим самозащиты на странице появляется сообщение: «EurekaServer перешёл в режим самозащиты».

После перехода Eureka Server в режим самозащиты возникают следующие ситуации:

  • Eureka больше не удаляет из списка регистрации сервисы, которые должны были быть удалены из-за длительного отсутствия пульса.
  • Eureka продолжает принимать запросы на регистрацию и поиск новых сервисов, но они не синхронизируются с другими узлами (то есть текущий узел остаётся доступным).
  • Когда сеть стабилизируется, новые регистрационные данные текущего экземпляра будут синхронизированы с другими узлами.

Механизм самозащиты Eureka предназначен для предотвращения случайного удаления сервисов. Он работает следующим образом:

  • Если у отдельного клиента возникает проблема с отправкой пульса, предполагается, что проблема связана с клиентом, и клиент удаляется.
  • Если Eureka обнаруживает большое количество неудачных попыток отправки пульса, предполагается, что проблема может быть связана с сетью, и Eureka переходит в режим самозащиты.
  • Когда пульс клиента восстанавливается, Eureka автоматически выходит из режима самозащиты.

Если сервис-провайдер неожиданно перестаёт работать во время периода самозащиты, сервис-потребитель получит недействительный экземпляр сервиса, и вызов завершится неудачно. Чтобы решить эту проблему, сервис-потребителю необходимо иметь механизмы отказоустойчивости, такие как повторные попытки или прерыватели цепи.

Чтобы включить или отключить режим самозащиты в конфигурации Eureka Server, можно использовать следующий параметр:

eureka.server.enable-self-preservation=true

Клиент Eureka

Клиент Eureka — это Java-клиент для взаимодействия с Eureka Server. Клиент Eureka извлекает, обновляет и кэширует информацию о Eureka Server. Даже если все узлы Eureka Server выйдут из строя, сервис-потребитель всё равно сможет использовать кэшированную информацию для поиска сервис-провайдера. Однако при изменении сервиса информация может стать неактуальной.

Основные функции клиента Eureka включают:

  • Регистрация: сервис-провайдеры регистрируются в центре регистрации, также являясь клиентами Eureka. При регистрации клиент Eureka предоставляет метаданные, такие как IP-адрес, порт, индикатор состояния службы, URL главной страницы и т. д.
  • Продление: клиент Eureka отправляет пульс каждые 30 секунд для продления регистрации. По умолчанию, если Eureka Server не получает пульс от клиента Eureka в течение 90 секунд, сервер удаляет экземпляр из реестра. Эти значения можно настроить, но обычно их не рекомендуется изменять. Продление имеет два важных атрибута:
    # интервал вызова задачи продления, по умолчанию 30 секунд
    eureka.instance.lease-renewal-interval-in-seconds=30
    # срок действия услуги, по умолчанию 90 секунд.
    eureka.instance.lease-expiration-duration-in-seconds=90
  • Выселение: когда клиент Eureka и Eureka Server перестают отправлять пульс, Eureka Server удаляет этот экземпляр службы из реестра служб, то есть выселяет его.
  • Отмена: при завершении работы программы клиент Eureka посылает запрос на отмену Eureka Server. После получения запроса информация об этом клиенте удаляется из реестра экземпляров Eureka Server. Запрос на отключение не завершается автоматически, он требует выполнения следующего кода:
    DiscoveryManager.getInstance().shutdownComponent();
  • Получение реестра: клиент Eureka получает реестр с сервера и сохраняет его локально. Клиент использует эту информацию для поиска других служб и выполнения удалённых вызовов. Реестр обновляется каждые 30 секунд. Каждый раз, когда возвращается информация о реестре, она может отличаться от кэшированной информации клиента Eureka, и клиент обрабатывает её автоматически.

Кэшированная информация о реестре используется в качестве основы для сервисов-потребителей, поэтому необходимо учитывать два важных параметра: properties # включение функции извлечения реестра услуг потребителями из центра регистрации eureka.client.fetch-registry=true # установка интервала обновления реестра услуг потребителем из центра регистрации eureka.client.registry-fetch-interval-seconds=30

  • Удалённый вызов: после получения информации о сервис-провайдере от центра регистрации клиент Eureka может вызвать соответствующую службу через HTTP-запрос. Если имеется несколько сервис-провайдеров, клиент Eureka автоматически распределяет нагрузку с помощью Ribbon.

Механизм кэширования Eureka

Данные в Eureka Server хранятся в двухуровневой структуре ConcurrentHashMap:

  • Ключ: имя службы, которое совпадает с именем приложения, зарегистрированным клиентом.
  • Подключ: instanceId, уникальный идентификатор экземпляра службы.
  • Lease: объект Lease хранит всю регистрационную информацию об экземпляре, включая IP, порт, атрибуты и т.д.

Eureka Server предоставляет двухуровневый механизм кэширования для повышения эффективности отклика. Первый уровень кэша — readOnlyCacheMap, который является ConcurrentHashMap только для чтения. Этот кэш синхронизируется с readWriteCacheMap каждые 30 секунд по умолчанию. Второй уровень кэша — readWriteCacheMap, основанный на Guava Cache. Данные в readWriteCacheMap синхронизируются с хранилищем данных. Когда данные отсутствуют в кэше, они загружаются из CacheLoader.load и помещаются в кэш. Срок действия данных в readWriteCacheMap составляет 180 секунд по умолчанию, и данные удаляются при отключении службы, истечении срока действия, регистрации или изменении состояния.

При получении данных клиент Eureka сначала пытается получить их из первого уровня кэша. Если данные отсутствуют, они извлекаются из второго уровня кэша. В противном случае данные синхронизируются из хранилища данных в кэш перед извлечением. Двухуровневый механизм кэширования в Eureka Server эффективно сокращает время отклика и обеспечивает поддержку различных типов данных в зависимости от сценария использования. 90-е годы могут быть восприняты другими сервисами:

  1. Во-первых, Eureka Server поддерживает обновлённый ответный кэш каждые 30 секунд.
  2. Eureka Client также сохраняет кэшированные данные о регистрации в течение 30 секунд после их получения.
  3. Компонент балансировки нагрузки Ribbon также имеет кэш на 30 секунд.

Эти три кэша вместе могут привести к максимальному времени задержки регистрации сервисов в 90 секунд, что требует внимания в особых бизнес-сценариях.

Общие настройки

Общие настройки Eureka Server:

# Включить режим самозащиты сервера, который был описан в предыдущем разделе.
eureka.server.enable-self-preservation=true
# Интервал времени (в миллисекундах), через который будут удаляться устаревшие данные о сервисах (по умолчанию — 60 * 1000).
eureka.server.eviction-interval-timer-in-ms= 60000
# Период времени, через который удаляются устаревшие дельта-данные.
eureka.server.delta-retention-timer-interval-in-ms=0
# Ограничитель частоты запросов.
eureka.server.rate-limiter-burst-size=10
# Включен ли ограничитель частоты запросов.
eureka.server.rate-limiter-enabled=false
# Среднее значение частоты запросов.
eureka.server.rate-limiter-full-fetch-average-rate=100
# Применяется ли ограничение частоты запросов к стандартным клиентам. Если false, то ограничение применяется только к нестандартным клиентам.
eureka.server.rate-limiter-throttle-standard-clients=false
# Средняя частота запросов для получения данных о зарегистрированных сервисах и списках сервисов.
eureka.server.rate-limiter-registry-fetch-average-rate=500
# Список доверенных клиентов.
eureka.server.rate-limiter-privileged-clients=
# Ожидаемый процент обновления клиентских договоров в заданном временном диапазоне.
eureka.server.renewal-percent-threshold=0.85
# Пороговое значение времени обновления договоров.
eureka.server.renewal-threshold-update-interval-ms=0
# Время истечения срока действия кэшированных данных о регистрации сервисов.
eureka.server.response-cache-auto-expiration-in-seconds=180
# Частота обновления кэша данных о сервисах.
eureka.server.response-cache-update-interval-ms=0
# Длительность хранения данных в очереди дельт для обеспечения целостности при поиске.
eureka.server.retention-time-in-m-s-in-delta-queue=0
# Следует ли синхронизировать временные метки при их расхождении.
eureka.server.sync-when-timestamp-differs=true
# Используется ли стратегия только для чтения для кэша ответов, которая предотвращает истечение срока действия данных.
eureka.server.use-read-only-response-cache=true


################server node 与 node 之间关联的配置#####################33
# Отправляются ли сжатые данные в запросах репликации, всегда сжимаются.
eureka.server.enable-replicated-request-compression=false
# Указывает, следует ли обрабатывать запросы репликации пакетами для повышения эффективности сети.
eureka.server.batch-replication=false
# Максимальное количество событий репликации, которые могут быть сохранены в пуле репликации для резервного копирования. Этот пул отвечает за все события, кроме обновлений состояния. Размер пула можно настроить в зависимости от размера памяти, тайм-аута и потока репликации.
eureka.server.max-elements-in-peer-replication-pool=10000
# Максимальное количество событий репликации, которые могут быть сохранены в пуле состояний для резервного копирования.
eureka.server.max-elements-in-status-replication-pool=10000
# Максимальный период бездействия для потоков синхронизации между узлами кластера.
eureka.server.max-idle-thread-age-in-minutes-for-peer-replication=15
# Максимальный период бездействия для потоков синхронизации состояния между узлами кластера.
eureka.server.max-idle-thread-in-minutes-age-for-status-reпликация=15
# Максимальное количество потоков для репликации данных между экземплярами серверов.
eureka.server.max-threads-for-peer-репликация=20
# Максимальное количество потоков для синхронизации состояния между экземплярами серверов.
eureka.server.max-threads-for-статус-репликация=1
# Максимальная продолжительность обмена данными между экземплярами серверов.
eureka.server.max-time-for-репликация=30000
# Минимальное количество нормальных экземпляров серверов для репликации между серверами. -1 означает, что сервер является одноузловым.
eureka.server.min-available-instances-for-одноранговая-репликация=-1
# Минимальное количество потоков для запуска репликации между экземплярами серверов.
eureka.server.min-threads-для-одноранговой-репликации=5
# Минимальное количество потоков, используемых для синхронизации состояний между экземплярами серверов.
eureka.server.min-потоки-для-статуса-репликация=1
# Количество попыток повторной попытки репликации данных.
eureka.server.number-of-репликационные-попытки=5
# Интервал обновления данных между узлами серверов. По умолчанию 10 минут.
eureka.server.peer-eureka-nodes-обновление-интервал-мс=600000
# Интервал синхронизации обновлений статуса между серверами.
eureka.server.peer-eureka-статус-обновить-время-интервал-мс=0
# Тайм-аут подключения между равноправными узлами серверов.
eureka.server.равный-узел-подключение-тайм-аут-мс=200
# Истечение времени простоя соединения между равноправными узлами серверов после установления соединения.
eureka.server.равный-узел-соединение-простой-тайм-аут-секунд=30
# Тайм-аут чтения данных соединения между равноправными узлами серверов.
eureka.server.равный-узел-чтение-тайм-аут-мс=200
# Общее максимальное количество соединений между узлами серверов.
eureka.server.равный-узел-общее-количество-соединений=1000
# Максимальное количество соединений с одним хостом между узлами серверов.
eureka.server.равный-узел-общее-соединения-на-хост=10
# Количество попыток синхронизации реестра при запуске сервера.
eureka.server.реестр-синхронизация-повторы=
# Интервал ожидания между попытками синхронизации реестра при запуске сервера.
eureka.server.реестр-синхронизация-повторная попытка-ожидание-мс=
# Ожидание сервера перед получением информации о регистрации экземпляра при запуске, если информация не может быть получена от равноправного узла.
eureka.server.ожидание-времени-в-мс-при-синхронизации-пусто=0

Настройки Eureka Client:

# Включён ли клиент.
eureka.client.включено=true
# Регистрирует ли экземпляр собственные данные на сервере Eureka. По умолчанию true.
eureka.клиент.зарегистрироваться-с-эврикой=ложь
# Получает ли клиент информацию о регистрации с сервера Eureka. По умолчанию верно.
eureka.клиент.получить-реестр=ложь
# Отфильтровывает ли клиент экземпляры, находящиеся в состоянии «не работает». По умолчанию верно.
eureka.клиент.фильтр-только-экземпляры-вверх=верно
# Адрес зоны и URL-адрес службы для связи с сервером Eureka после регистрации.
eureka.клиент.serviceUrl.defaultZone=http://${eureka.instance.hostname}:${server.port}/eureka/

# Время ожидания соединения после подключения клиента к серверу Eureka. По умолчанию 30 секунд.
eureka.клиент.eureka-соединение-бездействие-тайм-аут-секунды=30
# Тайм-аут соединения клиента с сервером Eureka. По умолчанию 5 секунд.
eureka.клиент.eureka-сервер-подключиться-тайм-аут-секунды=5
# Тайм-аут чтения клиентом данных с сервера Eureka.
eureka.клиент.eureka-server-читать-тайм-аут-секунды=8
# Общее количество подключений клиента ко всем серверам Eureka. По умолчанию 200.
eureka.клиент.eureka-server-общее-число-соединений=200
# Количество подключений клиента к одному серверу Eureka. По умолчанию 50. ### Eureka Instance 常用配置:

```properties
#服务注册中心实例的主机名
eureka.instance.hostname=localhost
#注册在Eureka服务中的应用组名
eureka.instance.app-group-name=
#注册在的Eureka服务中的应用名称
eureka.instance.appname=
#该实例注册到服务中心的唯一ID
eureka.instance.instance-id=
#该实例的IP地址
eureka.instance.ip-address=
#该实例,相较于hostname是否优先使用IP
eureka.instance.prefer-ip-address=false

Eureka集群原理

Eureka Server в кластере синхронизируют данные между собой через Replicate (копирование). Узлы не имеют разделения на главные и подчинённые, все они равноправны. В этой архитектуре узлы регистрируются друг у друга для повышения доступности, каждому узлу необходимо добавить один или несколько действительных serviceUrl, указывающих на другие узлы.

Если один из узлов Eureka Server выходит из строя, запросы клиентов Eureka Client автоматически перенаправляются на новый узел Eureka Server. Когда отказавший узел снова становится доступен, он снова включается в управление кластером серверов. Когда узел начинает принимать клиентские запросы, все операции будут выполняться с копированием запросов на все известные узлы Eureka Server.

Синхронизация данных между узлами Eureka Server следует принципу: «достаточно одной линии связи, чтобы соединить узлы, и информация может быть передана». Если существует несколько узлов, достаточно соединить их попарно, чтобы сформировать путь, и тогда все центры регистрации смогут обмениваться информацией. Каждый Eureka Server также является Eureka Client, и несколько Eureka Servers синхронизируют таблицы служб друг с другом через P2P.

Состояние между Eureka Server синхронизируется асинхронно, поэтому нет гарантии, что состояние между узлами будет одинаковым, но в конечном итоге оно будет согласованным.

Разделение Eureka

Eureka предоставляет концепции Region и Zone для разделения, которые заимствованы из AWS Amazon:

  • region: географический регион, такой как азиатский регион, китайский регион или Шэньчжэнь и т. д. Без ограничений по размеру. В зависимости от конкретных условий проекта можно разумно разделить region.
  • zone: можно просто понимать как конкретный дата-центр в регионе, например, если регион разделён на Шэньчжэнь, а затем в Шэньчжэне есть два дата-центра, то в этом регионе можно разделить на zone1 и zone2.

На рисунке выше us-east-1c, us-east-1d и us-east-1e представляют разные зоны. Клиенты Eureka Client в зоне предпочитают синхронизироваться с серверами Eureka Server в той же зоне, а также вызывать конечные точки, предпочитая получать список служб от серверов Eureka Server в своей зоне. После сбоя сервера Eureka Server в зоне он будет получать информацию из других зон.

Гарантия доступности Eureka

Все узлы Eureka Server равны, и сбой нескольких узлов не повлияет на работу оставшихся узлов. Оставшиеся узлы всё ещё могут предоставлять услуги регистрации и поиска. Клиент Eureka Client при попытке подключения к определённому серверу Eureka и обнаружении сбоя соединения автоматически переключится на другой узел. Пока работает хотя бы один сервер Eureka, регистрация услуг будет доступна (гарантия доступности), но полученная информация может быть не самой последней (не гарантируется строгая согласованность).

Протокол согласованности Eureka

Основное отличие между Eureka и Zookeeper заключается в том, что Eureka — это модель AP, а Zookeeper — модель CP. В случае разделения мозга Eureka обеспечивает доступность (каждый раздел всё ещё может предоставлять услуги независимо), что является децентрализованным. Как Eureka достигает конечной согласованности?

Широковещательное сообщение

  1. Eureka Server управляет полным списком серверов (PeerEurekaNodes).

  2. Когда Eureka Server получает запрос на регистрацию, отключение или сердцебиение от клиента, он широковещательно передаёт задачу через PeerEurekaNode другим серверам. Если широковещательная передача не удалась, она будет повторена до тех пор, пока задача не истечёт и не будет отменена. В это время данные между двумя серверами будут временно несогласованными. Примечание: Хотя широковещательная передача сообщения не удалась, пока принимается сердцебиение клиента, задача широковещательной передачи сердцебиения всё равно будет отправлена всем серверам (включая отключённые серверы).

  3. Если сеть восстанавливается и принимаются широковещательные задачи сердцебиения от других серверов, возможны три ситуации:

    • Если задача сердцебиения другого сервера была отправлена позже, чем текущая задача, текущая задача будет отброшена, и будет использоваться задача другого сервера.
    • Если задачи отправляются одновременно, они сравниваются по идентификатору запроса. Задача с наибольшим идентификатором запроса будет принята, а остальные будут отброшены.
    • Если ни одна из задач не отправляется позже другой, они будут приняты обеими сторонами, и данные будут временно несогласованными, пока не произойдёт следующее обновление. Регистрация сервиса
  4. Spring Cloud Eureka при запуске приложения инициализируется в классе EurekaAutoServiceRegistration и активно регистрируется на сервере Eureka.

  5. После запуска Eureka запускает 40-секундный периодический процесс, который отслеживает изменения в IP-адресе и конфигурации приложения и при обнаружении изменений повторно регистрирует приложение.

  6. При возврате статуса 404 при продлении регистрации происходит повторная регистрация.

Активное отключение

Eureka выполняет метод shutDown при закрытии контейнера Spring, сначала устанавливая своё состояние в DOWN, а затем отправляя команду cancel на сервер Eureka для отключения своего сервиса.

Проверка здоровья и автоматическое отключение

Проверка здоровья обычно основана на механизме TTL (Time To Live). Например, клиент отправляет сердцебиение каждые 5 секунд, и если сервер не получает сердцебиение в течение 15 секунд, он обновляет статус экземпляра как нездоровый. Если сервер не получает сердцебиения в течение 30 секунд, экземпляр удаляется из списка сервисов. По умолчанию сервер Eureka отправляет сердцебиение каждые 30 секунд и удаляет просроченные экземпляры через 90 секунд. Процесс очистки просроченных экземпляров выполняется каждые 60 секунд.

Восстановление после сбоя

Перезапуск

При запуске Spring Cloud Eureka во время инициализации EurekaServerBootstrap#initEurekaServerContext вызывается PeerAwareInstanceRegistryImpl#syncUp для синхронизации данных с другими Eureka серверами.

Разделение мозга

После разделения мозга: сервисы в том же регионе, что и Eureka Server, могут нормально обращаться к нему, в то время как сервисы в других регионах автоматически истекают.

После восстановления после разделения мозга: принимаются запросы на сердцебиение от других серверов Eureka, и в этом случае есть три сценария:

  • быстрое восстановление после разделения мозга, всё работает нормально;
  • экземпляр уже истёк, требуется повторная регистрация;
  • конфликт данных, необходимо инициировать синхронизацию.

Zuul

Zuul — это открытый компонент, разработанный Netflix, предназначенный для решения проблем «шлюза».

Функции шлюза:

  • Единый вход: предоставление уникального входа для всех служб, шлюз обеспечивает изоляцию между внешними и внутренними системами, обеспечивая безопасность внутренних служб.
  • Проверка подлинности: определение прав доступа каждого запроса и отклонение запросов, не соответствующих требованиям.
  • Динамическая маршрутизация: динамическое направление запросов к различным внутренним кластерам.
  • Уменьшение зависимости между клиентами и службами: службы могут развиваться независимо, а шлюз выполняет сопоставление.

Feign

Использование Feign включает два этапа: создание динамического прокси Feign и выполнение Feign.

Динамический прокси Feign

Feign по умолчанию реализуется с помощью ReflectiveFeign, создаваемого через Feign.Builder.

  • Contract унифицирует анализ методов MethodMetadata (*), позволяя адаптировать различные REST декларативные стандарты с помощью различных реализаций Contract.
  • buildTemplate фактически преобразует параметры метода в запрос.
  • metadata и buildTemplate объединяются в MethodHandler.

Реализация балансировки нагрузки на основе Feign

Балансировка нагрузки с использованием Feign интегрирована с Ribbon. Для реализации балансировки нагрузки необходимо обернуть Client, чтобы обеспечить балансировку нагрузки. Соответствующий код можно найти в RibbonClient и LBClient.

// RibbonClient основная логика
private final Client delegate;
private final LBClientFactory lbClientFactory;
public Response execute(Request request, Request.Options options) throws IOException {
    try {
        URI asUri = URI.create(request.url());
        String clientName = asUri.getHost();
        URI uriWithoutHost = cleanUrl(request.url(), clientName);
        // Создание RibbonRequest, включая Client, Request и uri
        LBClient.RibbonRequest ribbonRequest =
            new LBClient.RibbonRequest(delegate, request, uriWithoutHost);
        // executeWithLoadBalancer реализует балансировку нагрузки
        return lbClient(clientName).executeWithLoadBalancer(
            ribbonRequest,
            new FeignOptionsClientConfig(options)).toResponse();
    } catch (ClientException e) {
        propagateFirstIOException(e);
        throw new RuntimeException(e);
    }
}

По сути, это просто упаковка объекта Client, которая использует executeWithLoadBalancer для балансировки нагрузки, предоставляемой Ribbon. На основе Feign реализуется разрыв цепи

Разрыв цепи и ограничение потока на основе Feign (интеграция с Hystrix). Чтобы ограничить поток, необходимо перехватить выполнение метода, то есть переписать InvocationHandlerFactory, и выполнить разрыв цепи и ограничить поток перед выполнением метода. Соответствующий код можно найти в HystrixFeign, который фактически реализует HystrixInvocationHandler.

// HystrixInvocationHandler основная логика
public Object invoke(final Object proxy, final Method method, final Object[] args)
      throws Throwable {
    HystrixCommand<Object> hystrixCommand =
        new HystrixCommand<>(setterMethodMap.get(method)) {
            @Override
            protected Object run() throws Exception {
                return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
            }
            @Override
            protected Object getFallback() {
            };
        }
    ...
    return hystrixCommand.execute();
}

Общий процесс кодирования параметров Feign

Рисунок: Общий процесс кодирования параметров Feign

Резюме: Первые два шага — это этап создания прокси-сервера Feign, анализ параметров метода и аннотаций метаданных. Последние три шага — этап вызова, преобразование параметров метода в формат данных HTTP-запроса.

Contract интерфейс преобразует каждый метод интерфейса UserService и его аннотации в MethodMetadata, а затем использует RequestTemplate#request для создания запроса. RequestTemplate#request создаёт запрос, после чего можно вызвать Client#execute для отправки HTTP-запроса. У клиента есть конкретные реализации, такие как HttpURLConnection, Apache HttpComponnets, OkHttp3 и Netty. В этой статье основное внимание уделяется первым трём шагам: анализу метаданных методов Feign и процессу кодирования параметров.

Процесс сборки Feign в целом

Рисунок: Процесс сборки Feign в целом

Резюме: OpenFeign имеет два входа для сборки:

  1. @EnableAutoConfiguration автоматическая сборка (spring.factories):

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    org.springframework.cloud.openfeign.ribbon.FeignRibbonClientAutoConfiguration,\
    org.springframework.cloud.openfeign.FeignAutoConfiguration
    • FeignAutoConfiguration автоматически собирает FeignContext и Targeter, а также конфигурацию клиента.
    • Targeter имеет две реализации: DefaultTargeter, который напрямую вызывает Feign.Builder; HystrixTargeter, вызывающий HystrixFeign.Builder для включения разрыва цепи.
    • Клиент: автоматически собирает ApacheHttpClient, OkHttpClient, не собирает условия, по умолчанию является Client.Default. Однако эти клиенты не реализуют балансировку нагрузки.
    • FeignRibbonClientAutoConfiguration реализует балансировку нагрузки, которая выполняется на уровне клиента.
  2. @EnableFeignClients автоматический поиск

    @EnableFeignClients вводит FeignClientsRegistrar, который запускает автоматический поиск и упаковывает интерфейс, помеченный @FeignClient, в объект FeignClientFactoryBean, и, наконец, генерирует прокси-объект этого интерфейса через Feign.Builder. По умолчанию конфигурация Feign.Builder — FeignClientsConfiguration, которая вводится через FeignAutoConfiguration.

Примечание: разрыв цепи и балансировка нагрузки выполняются FeignAutoConfiguration путём внедрения HystrixTargeter и FeignRibbonClientAutoConfiguration соответственно.

Прочее

Feign — декларативный клиент веб-службы.

Рисунок: Введение Feign

Feign использует динамический прокси:

  • Во-первых, если вы определяете @FeignClient для определённого интерфейса, Feign создаст динамический прокси для этого интерфейса.
  • Затем, когда вы вызываете этот интерфейс, вы фактически вызываете динамический прокси Feign, что является основной функцией.
  • Динамический прокси Feign будет динамически создавать адрес службы, которую вы хотите запросить, на основе ваших аннотаций @RequestMapping и других в интерфейсе.
  • Наконец, отправьте запрос на этот адрес и проанализируйте ответ.

Рисунок: Feign Рисунок: Удаленный вызов Feign

Лента

Лента — компонент Netflix с открытым исходным кодом для балансировки нагрузки.

Рисунок: введение ленты

Роль ленты на стороне потребителя услуг заключается в балансировке нагрузки, и по умолчанию используется алгоритм балансировки нагрузки Round Robin. Лента получает соответствующую информацию о регистрации служб из Eureka Service Discovery, а затем знает местоположение соответствующих служб. Затем лента выбирает машину в соответствии с настроенным алгоритмом балансировки нагрузки, и Feigin создаёт и отправляет запросы на эти машины, как показано на рисунке ниже.

Рисунок: Лента Рисунок: Правила ленты

Гистрикс

Hystrix — проект с открытым исходным кодом от Netflix, предоставляющий функцию разрыва цепи, которая может предотвратить каскадные сбои в распределённых системах.

Рисунок: Введение Hystrix

Hystrix изолирует, разрывает цепь и понижает уровень обслуживания. Проще говоря, Hystrix создаст много небольших пулов потоков, которые будут запрашивать услуги и возвращать результаты. Hystrix действует как промежуточный фильтр. Если наша служба токенов не работает, мы сразу же вернёмся без ожидания тайм-аута, чтобы вызвать исключение, что называется разрывом цепи. Но мы не можем просто ничего не делать и вернуться, иначе мы не сможем вручную добавить токены позже. Поэтому мы записываем сообщение в базу данных каждый раз, когда вызываем службу токенов, что называется понижением уровня обслуживания. Весь процесс изоляции, разрыва цепи и понижения уровня обслуживания Hystrix выглядит следующим образом:

Рисунок: Hystrix Рисунок: Разрыв цепи Hystrix

Шлюз

Spring Cloud Gateway — шлюз, разработанный Spring на основе Spring 5.0, Spring Boot 2.0 и Project Reactor. Spring Cloud Gateway предназначен для обеспечения простого, эффективного и унифицированного способа управления API-маршрутизацией для микросервисной архитектуры.

Рисунок: Введение шлюза

Config

В Spring Cloud предоставляется распределённая конфигурация Spring Cloud Config, которая поддерживает клиентскую и серверную части. Bus

Используя Spring Cloud Bus, можно легко построить шину сообщений.

OAuth2

В микросервисной системе, построенной с помощью Spring Cloud, можно использовать Spring Cloud OAuth2 для защиты микросервисной системы.

Sleuth

Spring Cloud Sleuth — это компонент Spring Cloud. Его основная функция заключается в предоставлении решения для отслеживания сервисных цепочек в распределённых системах.

MyBatis

Mybatis — это полуобъектно-реляционный картограф (ORM). Он инкапсулирует JDBC, загружает драйверы, создаёт соединения и создаёт операторы. Разработчикам нужно только сосредоточиться на написании SQL-запросов. Можно строго контролировать производительность выполнения SQL и обеспечивать гибкость.

Как полуобъектно-реляционная система, Mybatis может использовать XML или аннотации для настройки и сопоставления исходных данных. Он преобразует записи POJO в базу данных, избегая почти всех JDBC-кодов и ручного задания параметров и получения результатов.

Различные операторы настраиваются через файлы XML или аннотаций, а затем сопоставляются с параметрами Java-объектов и операторов SQL для создания окончательных выполняемых SQL-операторов. Наконец, Mybatis выполняет SQL и отображает результаты в Java-объекты и возвращает их.

Поскольку Mybatis фокусируется на SQL, он обеспечивает высокую гибкость и подходит для проектов с высокими требованиями к производительности или частыми изменениями требований, таких как интернет-проекты.

Преимущества MyBatis:

— Основан на программировании SQL-запросами, что обеспечивает гибкость и не влияет на существующий дизайн приложений или баз данных. SQL пишется в XML, что позволяет отделить SQL от кода программы и упрощает централизованное управление. Предоставляет XML-теги и поддерживает написание динамических SQL-запросов, которые можно повторно использовать.

— По сравнению с JDBC, объём кода уменьшается более чем на 50%, устраняя избыточный код JDBC. Не требуется вручную переключать соединения.

— Хорошая совместимость с различными базами данных (поскольку MyBatis использует JDBC для подключения к базе данных, он поддерживает все базы данных, поддерживаемые JDBC).

— Может хорошо интегрироваться со Spring.

— Предоставляет теги сопоставления, поддерживает объекты и отношения ORM с полями базы данных; предоставляет теги объектных отношений, поддерживает компоненты объектных отношений.

Недостатки MyBatis:

— Написание SQL-запросов требует больших усилий, особенно когда полей много или есть много связанных таблиц. Это требует от разработчиков хорошего понимания написания SQL-запросов.

— SQL-запросы зависят от базы данных, что приводит к плохой переносимости и невозможности легко менять базы данных.

#{} и ${} — в чём разница?

${} — замена строки.

#{} — предварительная обработка.

Использование #{} может эффективно предотвратить внедрение SQL и повысить безопасность системы.

Архитектура MyBatis

Ядро Функция
Configuration Сохраняет большую часть конфигурации MyBatis
SqlSession Основной API MyBatis для взаимодействия с базой данных и реализации функций базы данных
Executor Диспетчер MyBatis, отвечающий за генерацию SQL-запроса и обслуживание кеша запросов
StatementHandler Инкапсуляция JDBC, ответственная за операции с операторами JDBC, такими как установка параметров
ParameterHandler Преобразование пользовательских параметров в типы данных, соответствующие операторам JDBC
ResultSetHandler Отвечает за преобразование результатов JDBC ResultSet в коллекции List
TypeHandler Ответственный за отображение и преобразование между типами данных Java и типами данных JDBC (также известными как типы столбцов таблицы)
MappedStatement MappedStatement отвечает за упаковку одного узла <select|update|delete|insert>
SqlSource Отвечает за динамическое создание SQL-запросов на основе переданных параметров пользователя и упаковывает информацию в BoundSql
BoundSql Представляет динамически созданный SQL-запрос и соответствующую информацию о параметрах

Процесс MyBatis

Принцип работы интерфейса Dao

Принцип работы интерфейса Mapper заключается в использовании JDK динамического прокси для генерации прокси-объекта для интерфейса Mapper. Прокси-объект перехватывает методы интерфейса, находит соответствующий MapperStatement на основе класса полного имени + метода и вызывает исполнитель для выполнения соответствующего SQL. Затем он возвращает результат выполнения.

Методы в интерфейсе Mapper нельзя перегружать, поскольку используется стратегия сохранения и поиска на основе полного имени класса + имени метода.

Кэш MyBatis

Кэш MyBatis имеет два уровня: первый уровень и второй уровень. Первый уровень кэша включён по умолчанию и не может быть выключен. Второй уровень кэша можно включить или выключить.

Первый уровень кэша относится к уровню SqlSession. Когда один и тот же SQL-запрос выполняется в одном и том же сеансе SqlSession, последующие запросы будут извлекаться из кэша, а не из базы данных. Первый уровень кэша может хранить до 1024 SQL-запросов. Второй уровень кеша является общим для разных сеансов SqlSession и может использоваться совместно. Первый уровень кеша использует PerpetualCache в качестве основной структуры данных, которая по сути является хэш-картой. Второй уровень кеша по умолчанию выключен.

Первый уровень кэша

Когда отправляется первый запрос SQL, результаты запроса записываются в кэш первого уровня сеанса SqlSession. Структура данных кэша представляет собой карту, где ключ состоит из следующих элементов:

— Идентификатор MapperID. — Смещение offset. — Лимит limit. — SQL. — Все параметры ввода.

Значение представляет собой информацию о пользователе.

Если в промежутке между двумя запросами происходит операция фиксации (изменение, добавление или удаление), то весь кэш первого уровня в текущем сеансе SqlSession очищается. При следующем запросе данные необходимо будет извлечь из базы данных, а после извлечения записать обратно в кэш.

Второй уровень кэша

Область действия второго уровня кэша — уровень Mapper (один и тот же Mapper в одном пространстве имён). Mapper создаёт структуру данных кэша на уровне пространства имён. Кэш реализован через CacheExecutor, который фактически является прокси-объектом Executor. Все запросы сначала проверяются на наличие в кэше, и если они отсутствуют, то запрашиваются из базы данных. Если вы знакомы с веб-разработкой, то должны знать о cookie и session.

Session в MyBatis похож на концепцию session в веб-разработке. Данные действительны в течение сессии (или встречи). После завершения сессии данные удаляются.

Также можно рассматривать SqlSession как соединение с базой данных (но это не совсем верно, потому что после создания SqlSession, если не выполнять SQL, соединение бессмысленно, поэтому соединение устанавливается при выполнении SQL).

SqlSession определяет только некоторые методы выполнения SQL, а их реализация осуществляется дочерними классами, например, классом DefaultSqlSession. В MyBatis выполнение SQL происходит через Executor, который создаётся при открытии SqlSession.

Executor — это интерфейс, определяющий упаковку JDBC. MyBatis предоставляет несколько исполнителей: CacheExecutor, ClosedExecutor и SimpleExecutor.

CacheExecutor — это прокси-класс Executor, содержащий делегат, который нужно создать вручную (выбрать один из трёх вариантов: simple, reuse или batch). ClosedExecutor генерирует исключения для всех интерфейсов, указывая на закрытый Executor. При создании SqlSession по умолчанию используется SimpleExecutor.

Executor упаковывает JDBC. Когда мы используем JDBC для выполнения SQL, обычно сначала предварительно обрабатываем SQL, получая PreparedStatement, а затем вызываем executeXxx() для выполнения SQL. То есть Executor также создаёт объект Statement при выполнении SQL.

StatementHandler в JDBC вызывает Statement.executeXxx() для выполнения SQL. В MyBatis также используется Statement.executeXxx(), поэтому необходимо упомянуть StatementHandler. Его можно считать рабочим, выполняющим следующие задачи:

  • предварительная обработка SQL;
  • выполнение SQL с помощью statement.executeXxx();
  • преобразование объекта результата базы данных (ORM).

ParameterHandler отвечает за установку параметров после предварительной обработки SQL. Существует DefaultParameterHandler.

После выполнения statement.execute() можно получить результат с помощью statement.getResultSet(). Затем MyBatis использует ResultSetHandler для преобразования данных результата в объекты Java (ORM-отображение). Существует класс реализации DefaultResultHandler, который переопределяет метод handlerResultSets.

TypeHandler используется в двух местах:

  1. Связывание параметров, происходящее в ParameterHandler.setParamenters(). В MyBatis можно использовать для определения отношений отображения результатов, включая тип каждого поля. TypeHandler может устанавливать значения для определённых полей в соответствии с конфигурацией типа в XML, например, при установке параметра uid для SQL его тип INTEGER (jdbcType).
  2. Получение значений полей результата, происходящее во время обработки ResultSetHandler результатов.

Nginx

Установка Nginx

# CentOS
yum install nginx;
# Ubuntu
sudo apt-get install nginx;
# Mac
brew install nginx;

После установки можно просмотреть информацию об установке Nginx с помощью команды rpm \-ql nginx.

# Файлы конфигурации Nginx
/etc/nginx/nginx.conf # основной файл конфигурации
/etc/nginx/nginx.conf.default

# Исполняемые файлы
/usr/bin/nginx-upgrade
/usr/sbin/nginx

# Библиотеки Nginx
/usr/lib/systemd/system/nginx.service # для настройки системных служб
/usr/lib64/nginx/modules # каталог модулей Nginx

# Документация
/usr/share/doc/nginx-1.16.1
/usr/share/doc/nginx-1.16.1/CHANGES
/usr/share/doc/nginx-1.16.1/README
/usr/share/doc/nginx-1.16.1/README.dynamic
/usr/share/doc/nginx-1.16.1/UPGRADE-NOTES-1.6-to-1.10

# Каталоги статических ресурсов
/usr/share/nginx/html/404.html
/usr/share/nginx/html/50x.html
/usr/share/nginx/html/index.html

# Журналы Nginx
/var/log/nginx

Основные папки для внимания:

  • /etc/nginx/conf.d/ — папка для хранения дочерних конфигураций, /etc/nginx/nginx.conf по умолчанию включает все файлы конфигурации в этой папке.
  • /usr/share/nginx/html/ — здесь размещаются статические файлы, но можно выбрать другое место в зависимости от предпочтений.

Команды управления

Обычно настраиваются в файле /etc/nginx/nginx.conf.

Команды systemctl

# Включить автозапуск при загрузке
systemctl enable nginx
# Отключить автозапуск при загрузке
systemctl disable nginx

# Запустить Nginx
systemctl start nginx # после успешного запуска Nginx можно напрямую обращаться к IP хоста, где будет отображаться страница по умолчанию Nginx
# Остановить Nginx
systemctl stop nginx
# Перезапустить Nginx
systemctl restart nginx
# Перегрузить конфигурацию Nginx без перезапуска
systemctl reload nginx
# Проверить статус Nginx
systemctl status nginx

# Просмотреть процессы Nginx
ps -ef | grep nginx
# Убить процесс Nginx
kill -9 pid # найти PID процесса Nginx и завершить его, -9 означает принудительное завершение процесса

Команды Nginx

# Запуск
nginx -s start
# Отправить сигнал главному процессу для перезагрузки конфигурации и горячего перезапуска
nginx -s reload
# Перезапуск Nginx
nginx -s reopen
# Быстрое закрытие
nginx -s stop
# Ожидание завершения работы процессов перед закрытием
nginx -s quit
# Просмотр текущей окончательной конфигурации Nginx
nginx -T
# Проверка конфигурации на наличие проблем
nginx -t

Правила конфигурации

Структура уровней Nginx выглядит следующим образом:

Nginx层级结构

URI-сопоставление

location = / {
    # Полное соответствие =
    # Чувствительность к регистру ~
    # Игнорирование регистра ~*
}
location ^~ /images/ {
    # Соответствие началу ~^
    # Можно использовать регулярные выражения, такие как:
    # location ~* \.(gif|jpg|png)$ { }
}
location / {
    # Если предыдущие правила не подходят, выполняется здесь
}

location используется для сопоставления URI.

upstream

Используется для определения информации о вышестоящих серверах (то есть серверах приложений, предоставляемых бэкендом).

upstream test_server{
    server 192.168.100.33:8081
}
``` **Прокси-проход**

Синтаксис: proxy_pass URL;
Контекст: location, if, limit_except
Пример:
proxy_pass http://127.0.0.1:8081
proxy_pass http://127.0.0.1:8081/proxy

URL параметр:

* URL должен начинаться с http или https;
* URL может содержать переменные;
* Наличие URI в URL напрямую влияет на URL запроса, отправляемого на вышестоящий сервер.

Разница между URL с / и без /:

* Без / Nginx не изменяет URL пользователя, а просто передаёт его вышестоящему серверу приложения;
* С / Nginx изменяет URL пользователя, удаляя из него часть после location.

Пример: пользователь запрашивает URL /bbs/abc/test.html. Запрос достигает вышестоящего сервера приложений с URL /bbs/abc/test.html, если используется proxy_pass http://127.0.0.1:8080, и с URL /abc/test.html при использовании proxy_pass http://127.0.0.1:8080/.

**Сценарии использования**

**Обратный прокси**

Пример 1:
* /etc/nginx/conf.d/proxy.conf
* 1.1 Имитация обслуживаемого сервиса
server {
    listen 8080;
    server_name localhost;
  
    location /proxy/ {
        root /usr/share/nginx/html/proxy;
        index index.html;
    }
}
* 1.2 /usr/share/nginx/html/proxy/index.html
<h1> 121.42.11.34 proxy html </h1>

Пример 2:
* /etc/nginx/conf.d/proxy.conf
* 2.1 Сервер бэкенда
upstream back_end {
    server 121.42.11.34:8080 weight=2 max_conns=1000 fail_timeout=10s max_fails=3;
    keepalive 32;
    keepalive_requests 80;
    keepalive_timeout 20s;
}
* 2.2 Конфигурация прокси
server {
    listen 80;
    # vim /etc/hosts для добавления записи: 121.5.180.193 proxy.lion.club
    server_name proxy.lion.club;
    location /proxy {
        proxy_pass http://back_end/proxy;
    }
}

**Балансировка нагрузки**

Пример 1:
* /etc/nginx/conf.d/balance.conf
* 1.1 Имитация первого обслуживаемого сервиса
server{
  listen 8020;
  location / {
   return 200 'return 8020 \n';
  }
}
* 1.2 Имитация второго обслуживаемого сервиса
server{
  listen 8030;
  location / {
   return 200 'return 8030 \n';
  }
}
* 1.3 Имитация третьего обслуживаемого сервиса
server{
  listen 8040;
  location / {
   return 200 'return 8040 \n';
  }
}

Пример 2:
* /etc/nginx/conf.d/balance.conf
* 2.1 Список серверов
upstream demo_server {
    server 121.42.11.34:8020;
    server 121.42.11.34:8030;
    server 121.42.11.34:8040;
}
* 2.2 Конфигурация прокси
server {
    listen 80;
    server_name balance.lion.club;
    location /balance/ {
        proxy_pass http://demo_server;
    }
}

Алгоритм хеширования:

upstream demo_server {
    hash $request_uri;
    server 121.42.11.34:8020;
    server 121.42.11.34:8030;
    server 121.42.11.34:8040;
}

Сервер:
listen 80;
server_name balance.lion.club;
location /balance/ {
    proxy_pass http://demo_server;
}

Запрос всегда направляется на один и тот же сервер, пока сохраняется значение request_uri.

Алгоритм IP-хеширования:

upstream demo_server {
    ip_hash;
    server 121.42.11.34:8020;
    server 121.42.11.34:8030;
    server 121.42.11.34:8040;
}

Распределение запросов по серверам осуществляется на основе IP-адреса клиента. Это помогает решить проблемы с сохранением сессий на бэкенд-серверах.

Наименьший алгоритм соединения:

upstream demo_server {
    zone test 10M; # zone можно установить имя и размер совместно используемой памяти
    least_conn;
    server 121.42.11.34:8020;
    server 121.42.11.34:8030;
    server 121.42.11.34:8040;
}

Каждый рабочий дочерний процесс считывает данные из общей памяти для получения информации о серверах. Затем выбирается сервер с наименьшим количеством установленных соединений для обработки запроса.

**Кэширование конфигурации**

 proxy_cache

Сохраняет некоторые ранее посещённые и потенциально повторно посещаемые ресурсы, чтобы пользователи могли получать их непосредственно от прокси-сервера, снижая нагрузку на вышестоящий сервер и ускоряя доступ.

 proxy_cache_path

Определяет путь к файлам кэша.

Параметры:
* path  путь к файлу кэша;
* level  уровень каталога;
* keys_zone  настройка общей памяти;
* inactive  время, в течение которого кэш будет очищен, если он не был посещён, по умолчанию 10 минут.

 proxy_cache_key

Устанавливает ключ для файла кэша.

По умолчанию: proxy_cache_key $scheme$proxy_host$request_uri;.

 proxy_cache_valid

Настраивает, какие коды состояния могут быть кэшированы и на какой срок.

Например: proxy_cache_valid 200 304 2m; означает, что файлы кэша со статусом 200 и 304 будут действительны в течение 2 минут. **Определение условий сохранения в кэш и условий, при которых ответ не будет сохранён в кэш**

Определяются условия, при которых ответ от сервера не будет сохраняться в кэш.

```nginx
Синтаксис: proxy_no_cache string;
Контекст: http, server, location
Пример: proxy_no_cache $http_pragma    $http_authorization;

Определение условий, при которых ответ будет обходить кэш

Определяется условие, при котором ответ от сервера будет обходить кэш.

Синтаксис: proxy_cache_bypass string;
Контекст: http, server, location
Пример: proxy_cache_bypass $http_pragma    $http_authorization;

Переменная upstream_cache_status

Хранит информацию о том, было ли попадание в кэш, полезна для отладки.

Значения:

  • MISS — промах кэша;
  • HIT — попадание в кэш;
  • EXPIRED — кэш истёк;
  • STALE — попадание в устаревший кэш;
  • REVALIDDATED — Nginx подтвердил актуальность устаревшего кэша;
  • UPDATING — контент устарел, но обновляется;
  • BYPASS — ответ получен напрямую с исходного сервера.
proxy_cache_path /etc/nginx/cache_temp levels=2:2 keys_zone=cache_zone:30m max_size=2g inactive=60m use_temp_path=off;

upstream cache_server {
    server 121.42.11.34:1010;
    server 121.42.11.34:1020;
}

server {
    listen 80;
    server_name cache.lion.club;

    # Сценарий 1: требования к оперативности невысоки, настраиваем кэш
    location /demo {
        proxy_cache cache_zone; # настройка кэша памяти, уже определённого выше
        proxy_cache_valid 200 5m; # кэш состояния запросов со статусом 200 на 5 минут
        proxy_cache_key $request_uri; # ключ кэширования файлов — URI запроса
        add_header Nginx-Cache-Status $upstream_cache_status # установка статуса кэша в заголовке ответа клиенту
        proxy_pass http://cache_server; # проксирование
    }

    # Сценарий 2: требования к оперативности очень высоки, настраиваем отсутствие кэша
    # Если URI заканчивается на .txt или .text, устанавливаем значение переменной «no cache»
    if ($request_uri ~ \\.(txt|text)$) {
        set $cache_name "no cache"
    }
    location /test {
        proxy_no_cache $cache_name; # если переменная имеет значение, то кэш не используется, иначе используется
        proxy_cache cache_zone; # настройка кэша памяти
        proxy_cache_valid 200 5m; # кэш состояния запросов со статусом 200 на 5 минут
        proxy_cache_key $request_uri; # ключ кэширования файлов — URI запроса
        add_header Nginx-Cache-Status $upstream_cache_status # установка статуса кэша в заголовке ответа клиенту
        proxy_pass http://cache_server; # проксирование
  }
}

HTTPS

Скачивается сжатый файл сертификата, который содержит папку Nginx. Файлы xxx.crt и xxx.key копируются в каталог сервера, после чего выполняется следующая конфигурация:

server {
  listen 443 ssl http2 default_server; # порт доступа SSL равен 443
  server_name lion.club; # доменное имя, связанное с сертификатом (здесь произвольное)
  ssl_certificate /etc/nginx/https/lion.club_bundle.crt; # путь к сертификату
  ssl_certificate_key /etc/nginx/https/lion.club.key; # путь к приватному ключу
  ssl_session_timeout 10m;
  ssl_protocols TLSv1 TLSv1.1 TLSv1.2; # поддержка версий протокола SSL, по умолчанию последние три, основная версия [TLSv1.2]

  location / {
    root         /usr/share/nginx/html;
    index        index.html index.htm;
  }
}

Включение сжатия gzip

В папке /etc/nginx/conf.d/ создаётся новый конфигурационный файл gzip.conf:

# По умолчанию off, включение сжатия gzip
gzip on;
# MIME-типы файлов, которые должны быть сжаты, text/html принудительно включён
gzip_types text/plain text/css application/json application/x-javascript text/xml application/xml application/xml+rss text/javascript;
# ------- Вышеуказанные два параметра включены, сжатие gzip поддерживается --------


# По умолчанию off, модуль включён, Nginx сначала проверяет наличие файла с расширением .gz для статических файлов, если есть, возвращает содержимое файла .gz
gzip_static on;
# По умолчанию off, включается при использовании Nginx в качестве обратного прокси, используется для настройки включения или отключения сжатия gzip из содержимого прокси-сервера
gzip_proxied any;
# Используется для добавления заголовка Vary: Accept-Encoding в ответ, чтобы прокси-сервер мог определить, следует ли использовать сжатие gzip на основе заголовка Accept-Encoding запроса
gzip_vary on;
# Уровень сжатия, 1–9, где 1 — самый низкий уровень сжатия, а 9 — самый высокий, чем выше уровень, тем больше степень сжатия и время сжатия, рекомендуется 4–6
gzip_comp_level 6;
# Объём памяти, используемый для кэширования результатов сжатия, 16 8k означает получение 8k * 16 единиц
gzip_buffers 16 8k;
# Минимальный размер страницы, разрешённый для сжатия, размер страницы получается из заголовка Content-Length. Значение по умолчанию равно 0, независимо от размера страницы она будет сжата. Рекомендуется установить значение больше 1k байт, меньше 1k может привести к увеличению размера после сжатия
gzip_min_length 1k;
# Минимальная версия HTTP, необходимая для включения сжатия gzip, значение по умолчанию — 1.1
gzip_http_version 1.1;

Общие настройки

Прослушивание портов

server {
    # Стандартный протокол HTTP
    listen 80;
    # Стандартный протокол HTTPS
    listen 443 ssl;
    # Для http2
    listen 443 ssl http2;
    # Прослушивание порта 80 с использованием IPv6
    listen [::]:80;
    # Только прослушивание с использованием IPv6
    listen [::]:80 ipv6only=on;
}

Журналы доступа

server {
    # Относительный или полный путь к файлу журнала
    access_log /path/to/file.log;
    # Включить 'on' или 'off'
    access_log on;
}

Доменные имена

server {
    # Слушаю yourdomain.com
    server_name yourdomain.com;
    # Слушать несколько доменов  server_name yourdomain.com www.yourdomain.com;
    # Слушайте все домены
    server_name *.yourdomain.com;
    # Слушайте все корневые домены верхнего уровня
    server_name yourdomain.*;
    # Слушайте неуказанные имена хостов (прослушивает IP-адрес напрямую)
    server_name "";
}

Статические ресурсы

server {
    listen 80;
    server_name yourdomain.com;
    location / {
        root ### Перенаправление

```nginx
server {
    listen 80;
    server_name www.yourdomain.com;
    return 301 http://yourdomain.com$request_uri;
}

server {
    listen 80;
    server_name www.yourdomain.com;
    location /redirect-url {
        return 301 http://otherdomain.com;
    }
}

Обратный прокси

server {
    listen 80;
    server_name yourdomain.com;
    location / {
        proxy_pass http://0.0.0.0:3000;
        # где 0.0.0.0:3000 — ваш сервер приложений (например, node.js), привязанный к 0.0.0.0 и прослушивающий порт 3000
    }
}

Балансировка нагрузки

upstream node_js {
    server 0.0.0.0:3000;
    server 0.0.0.0:4000;
    server 123.131.121.122;
}

server {
    listen 80;
    server_name yourdomain.com;
    location / {
        proxy_pass http://node_js;
    }
}

SSL-протокол

server {
    listen 443 ssl;
    server_name yourdomain.com;
    ssl on;
    ssl_certificate /path/to/cert.pem;
    ssl_certificate_key /path/to/privatekey.pem;
    ssl_stapling on;
    ssl_stapling_verify on;
    ssl_trusted_certificate /path/to/fullchain.pem;
    ssl_protocols TLSv1 TLSv1.1 TLSv1.2;
    ssl_session_timeout 1h;
    ssl_session_cache shared:SSL:50m;
    add_header Strict-Transport-Security max-age=15768000;
}

# Постоянное перенаправление с HTTP на HTTPS
server {
    listen 80;
    server_name yourdomain.com;
    return 301 https://$host$request_uri;
}

Анализ журнала

# Подсчёт IP-адресов посещений
awk '{print $1}' access.log | sort -n | uniq | wc -l

# Просмотр количества посещений за определённый промежуток времени (с 4 до 5 часов)
grep "07/Apr/2017:0[4-5]" access.log | awk '{print $1}' | sort | uniq -c| sort -nr | wc -l   

# Поиск 100 наиболее частых IP-адресов посетителей
awk '{print $1}' access.log | sort -n |uniq -c | sort -rn | head -n 100

# Поиск IP-адресов, которые посещали сайт более 100 раз
awk '{print $1}' access.log | sort -n |uniq -c |awk '{if($1 >100) print $0}'|sort -nr

# Получение подробной информации о посещениях определённого IP-адреса, отсортированных по частоте посещений
grep '104.217.108.66' access.log |awk '{print $7}'|sort |uniq -c |sort -rn |head -n 100   

# Статистика посещений страниц
# Топ-100 самых посещаемых страниц
awk '{print $7}' access.log | sort |uniq -c | sort -nr | head -n 100

# Топ-100 самых посещаемых не PHP-страниц
grep -v ".php"  access.log | awk '{print $7}' | sort |uniq -c | sort -nr | head -n 100          

# Страницы, которые посетили более 100 раз
cat access.log | cut -d ' ' -f 7 | sort |uniq -c | awk '{if ($1 > 100) print $0}' | less

# Последние 1000 записей в журнале, с указанием наиболее посещаемых страниц
tail -1000 access.log |awk '{print $7}'|sort|uniq -c|sort -nr|less

# Количество запросов в секунду, топ-100 временных точек (точность до секунды)
awk '{print $4}' access.log |cut -c 14-21|sort|uniq -c|sort -nr|head -n 100

# Количество запросов за минуту, топ-100 временных точек (точность до минуты)
awk '{print $4}' access.log |cut -c 14-18|sort|uniq -c|sort -nr|head -n 100

# Количество запросов за час, топ-100 временных точек (точность до часа)
awk '{print $4}' access.log |cut -c 14-15|sort|uniq -c|sort -nr|head -n 100

# Анализ производительности
# Список страниц с временем передачи более 3 секунд, первые 20 записей
cat access.log|awk '($NF > 3){print $7}'|sort -n|uniq -c|sort -nr|head -20

# Список PHP-страниц с временем запроса более 3 секунд, количество появлений, первые 100 записей
cat access.log|awk '($NF > 1 &&  $7~/\\.php/){print $7}'|sort -n|uniq -c|sort -nr|head -100

# Счётчик TCP-соединений
# Общее количество текущих TCP-соединений
netstat -tan | grep "ESTABLISHED" | grep ":80" | wc -l

# Использование tcpdump для анализа 80 порта и определения наиболее активных IP-адресов
tcpdump -i eth0 -tnn dst port 80 -c 1000 | awk -F"." '{print $1"."$2"."$3"."$4}' | sort | uniq -c | sort -nr

# Подсчёт уникальных IP-адресов в файле журнала
awk '{print $1}' $logpath |sort -n|uniq|wc -l

# IP-адрес, который посетил сайт в определённое время
sed -n '/22\/Jun\/2017:1[5]/,/22\/Jun\/2017:1[6]/p' $logpath|awk '{print $1}'|sort -n|uniq|wc -l

# Посетители, которые были на сайте более 100 раз
awk '{print $1}' $logpath|sort -n|uniq -c|awk '{if($1>100) print $0}'|sort -rn

# Наиболее частые запросы (ТОП-50)
awk '{print $7}' $logpath |sort |uniq -c|sort -rn |head -n 50

# Количество запросов в секунду (ТОП-50)
awk '{print $4}' $logpath|cut -c 14-21|sort |uniq -c|sort -nr|head -n 50

# Количество запросов за минуту (ТОП-50)
awk '{print $4}' $logpath|cut -c 14-18|sort|uniк -c|sort -nr|head -n 50

# Количество запросов за час (ТОП-50)
awk '{print $4}' $logpath|cut -c 14-15|sort|uniк -c|sort -nr|head -n 50

# Запросы, время передачи которых превышает 1 секунду (ТОП-20)
cat $logpath|awk '($NF > 1){print $7}'|sort -n|uniк -c|sort -nr|head -20

LVS

LVS — это аббревиатура от Linux Virtual Server, что означает виртуальный серверный кластер на базе Linux.

Режимы работы

У LVS есть три основных режима балансировки нагрузки:

  • NAT (Network Address Translation) — режим сетевого адресного преобразования;
  • IP — режим виртуального IP-сервера;
  • TUN/TAP — режим туннелей TUN и TAP. ENAT-режим

ENAT-режим (Enhance NAT). ENAT-режим также называют треугольным режимом или DNAT/SNAT-режимом.

Принцип работы

  1. Клиент отправляет запрос (cip, vip).
  2. Запрос поступает в LVS, где LVS изменяет запрос на (vip, rip) и помещает cip в TCP Option.
  3. Запрос направляется по IP-маршруту к RS, модуль Ctk считывает cip из TCP Option.
  4. Ответный пакет (RIP, vip) перехватывается модулем Ctk, который изменяет его на (vip, cip).
  5. Так как цель ответного пакета — cip, он может быть напрямую отправлен клиенту без прохождения через LVS.

Преимущества

  • Не требуется, чтобы LVS и RS находились в одном VLAN.
  • Трафик не проходит через LVS, что повышает производительность.

Недостатки

  • Это настраиваемое решение для группы, требующее установки компонента Ctk на всех RS (аналогично TOA в full NAT).

Анализ

Почему ответный пакет в ENAT не должен проходить через LVS?

В режиме full NAT ответному пакету необходимо пройти через LVS для повторного изменения IP. В ENAT это делается на RS с помощью модуля Ctk.

Почему LVS и RS могут находиться в разных VLAN?

Как и в случае с full NAT.

Структура ENAT

LVS-ENAT-STR

Алгоритмы диспетчеризации

Статические алгоритмы диспетчеризации

Диспетчеризация основана только на алгоритме, без учёта состояния реальных серверов и нагрузки.

  • RR: циклическая диспетчеризация (Round Robin). Диспетчер распределяет внешние запросы по очереди между серверами кластера. Все серверы рассматриваются одинаково, независимо от их текущего соединения и загрузки системы.

  • WRR: взвешенная циклическая диспетчеризация (Weight RR). Диспетчер использует алгоритм «взвешенной циклической диспетчеризации» для распределения запросов на основе мощности обработки реальных серверов. Это позволяет серверам с лучшей производительностью обрабатывать больше трафика. Диспетчер может автоматически запрашивать информацию о нагрузке реальных серверов и динамически корректировать их веса.

  • DH: диспетчеризация по целевому адресу (Destination Hash). На основе целевого IP-адреса запроса диспетчер выбирает соответствующий сервер из статически распределённого списка. Если сервер доступен и не перегружен, запрос отправляется на этот сервер. Иначе возвращается пустое значение.

  • SH: диспетчеризация по исходному адресу (Source Hash). Исходный IP-адрес запроса используется в качестве ключа хеширования для выбора соответствующего сервера из статического списка. Если сервер доступен и не перегружен, запрос направляется на этот сервер, иначе возвращается пустое значение.

Динамические алгоритмы диспетчеризации

  • LC: диспетчеризация с наименьшим количеством соединений (Least Connections). Диспетчер динамически направляет сетевые запросы на сервер с наименьшим числом установленных соединений. Если реальные серверы в кластере имеют схожую производительность, этот алгоритм обеспечивает равномерное распределение нагрузки.

  • WLC: взвешенная диспетчеризация с наименьшим количеством соединений (Weighted Least Connections, по умолчанию). В системах с большим разбросом производительности между реальными серверами, этот алгоритм оптимизирует балансировку нагрузки путём использования весов. Серверы с более высоким весом будут обрабатывать большую долю активных соединений. Диспетчер автоматически запрашивает информацию о загрузке реальных серверов и динамически корректирует их веса.

  • SED: диспетчеризация с кратчайшим ожиданием (Shortest Expected Delay). Улучшение на основе WLC. Overhead = (ACTIVE + 1) * 256 / вес. Не учитывает неактивные состояния. Текущее количество активных серверов + 1 используется для принятия следующего запроса. Минимальное число получает следующий запрос. +1 предназначен для учёта весов при перегрузке.

  • NQ: никогда не стоять в очереди / диспетчеризация с наименьшей очередью (Never Queue Scheduling NQ). Нет необходимости в очереди. Если один из реальных серверов имеет 0 подключений, запрос сразу направляется туда. SED не рассматривается.

  • LBLC: локальная диспетчеризация с наименьшим количеством соединений (locality-Based Least Connections). Алгоритм диспетчеризации на основе локальности для балансировки нагрузки по целевым IP-адресам. Обычно используется в системах кэширования. Он выбирает ближайший сервер на основе целевого IP-адреса. Если этот сервер доступен и не перегружен, запрос отправляется туда. Если сервера нет или он перегружен и есть сервер с половиной рабочей нагрузки, выбирается сервер с наименьшим соединением.

  • LBLCR: локальная диспетчеризация с копированием и наименьшим количеством соединений (Locality-Based Least Connections with Replication). Аналогичен LBLC, но поддерживает список серверов для каждого целевого IP-адреса вместо одного сервера. Если целевой IP-адрес имеет перегруженный сервер, новый сервер добавляется в список, и запрос направляется к нему. Когда группа серверов не обновляется в течение некоторого времени, самый загруженный сервер удаляется из списка для снижения уровня репликации. Пятый шаг: перед проведением теста на доступ необходимо убедиться, что все узлы бэкенда доступны по отдельности.

Тест на связность. Узлы бэкенда.

[root@lb01 conf]# curl -H host:www.etiantian.org  10.0.0.8
web01 www
[root@lb01 conf]# curl -H host:www.etiantian.org  10.0.0.7
web02 www
[root@lb01 conf]# curl -H host:www.etiantian.org  10.0.0.9
web03 www
[root@lb01 conf]# curl -H host:bbs.etiantian.org  10.0.0.9
web03 bbs
[root@lb01 conf]# curl -H host:bbs.etiantian.org  10.0.0.8
web01 bbs
[root@lb01 conf]# curl -H host:bbs.etiantian.org  10.0.0.7
web02 bbs

Шестой шаг: просмотр состояния виртуального IP-адреса.

[root@lb01 conf]# ip a
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN 
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
    inet 127.0.0.1/8 scope host lo
    inet6 ::1/128 scope host 
       valid_lft forever preferred_lft forever
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP qlen 1000
    link/ether 00:0c:29:90:7f:0d brd ff:ff:ff:ff:ff:ff
    inet 10.0.0.5/24 brd 10.0.0.255 scope global eth0
    inet 10.0.0.3/24 scope global secondary eth0:1
    inet6 fe80::20c:29ff:fe90:7f0d/64 scope link 
       valid_lft forever preferred_lft forever
3: eth1: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP qlen 1000
    link/ether 00:0c:29:90:7f:17 brd ff:ff:ff:ff:ff:ff
    inet 172.16.1.5/24 brd 172.16.1.255 scope global eth1
    inet6 fe80::20c:29ff:fe90:7f17/64 scope link 
       valid_lft forever preferred_lft forever

Седьмой шаг: [резюме] изменение файла конфигурации.

Keepalived главный и резервный файлы конфигурации отличаются:

  • информация о router_id не совпадает;
  • описание состояния state не совпадает;
  • значения приоритета priority для главного и резервного серверов не совпадают.

HAProxy

HAProxy обеспечивает высокую доступность, балансировку нагрузки и проксирование на основе TCP и HTTP-приложений. Он является бесплатным, быстрым и надёжным решением. HAProxy особенно подходит для веб-сайтов с высокой нагрузкой, которым обычно требуется сохранение сеансов или семиуровневая обработка. HAProxy работает на текущем оборудовании и может поддерживать тысячи одновременных подключений. Его режим работы делает его простым, безопасным и легко интегрируемым в существующую архитектуру, защищая при этом ваши веб-серверы от раскрытия в сети.

Четырёхслойная балансировка нагрузки

Так называемая четырёхслойная относится к четвёртому уровню эталонной модели ISO. Четырёхслойный балансировщик нагрузки также называется четырёхслойным коммутатором. Он в основном основан на анализе потоков на уровне IP и TCP/UDP для реализации балансировки нагрузки на основе IP и порта. Распространёнными четырёхслойными балансировщиками нагрузки являются LVS и F5.

Четырёхслойная балансировка нагрузки использует информацию об IP-адресе и номере порта для распределения трафика между серверами. Это позволяет эффективно распределять нагрузку между несколькими серверами, обеспечивая высокую производительность и доступность системы.

Четырёхслойная балансировка нагрузки

На примере типичного TCP-приложения, когда клиент отправляет SYN-запрос на балансировщик нагрузки, последний выбирает оптимальный сервер на основе настроенного алгоритма балансировки нагрузки. Затем он изменяет целевой IP-адрес в пакете на IP-адрес выбранного сервера и перенаправляет пакет на этот сервер. Таким образом, запрос успешно направляется на выбранный сервер, и процесс балансировки завершается.

В некоторых стратегиях балансировки нагрузки для обеспечения корректной передачи пакетов от выбранного сервера обратно к клиенту, балансировщик может также изменять исходный IP-адрес пакета. Это делается для того, чтобы клиент мог получить ответ от правильного сервера.

Семислойная балансировка нагрузки

Семислойный балансировщик нагрузки (также известный как семислойный коммутатор) работает на самом высоком уровне OSI — прикладном уровне. Он поддерживает различные протоколы приложений, такие как HTTP, FTP, SMTP и другие. Семислойные балансировщики нагрузки могут анализировать содержимое пакетов и использовать эту информацию для более точного распределения нагрузки между серверами на основе конкретных требований приложения. Например, для балансировки нагрузки веб-сервера семислойный балансировщик может учитывать URL-адреса, домены, браузеры и языки запросов при принятии решения о том, какой сервер должен обрабатывать конкретный запрос.

Распространёнными семислойными балансировщиками нагрузки являются HAproxy, Nginx и другие.

Семислойная балансировка нагрузки

Здесь мы снова рассмотрим пример с TCP-приложением. В отличие от четырёхслойной балансировки, семислойному балансировщику необходимо получать содержимое пакета для принятия решения о выборе сервера. Поэтому он сначала устанавливает соединение с клиентом и сервером, а затем получает содержимое пакета от клиента. На основе этой информации и настроенного алгоритма балансировки он определяет, на какой внутренний сервер направить запрос. Этот процесс аналогичен работе прокси-сервера.

При сравнении процессов четырёхслойной и семислойной балансировок нагрузки можно заметить, что в случае семислойной балансировки нагрузка на оборудование балансировщика выше, но при этом обеспечивается более точное распределение нагрузки в соответствии с требованиями приложения.

Четырёхслойная и семислойная балансировки нагрузки

Сравнение процессов четырёхслойной и семислойной балансировок показывает, что при использовании семислойного подхода балансировщик устанавливает два соединения — с клиентом и с внутренним сервером. В то время как при четырёхслойном подходе устанавливается только одно соединение. Это означает, что требования к оборудованию для семислойной балансировки выше, а производительность ниже, чем у четырёхслойных систем.

Стратегии балансировки нагрузки

Существует несколько стратегий балансировки нагрузки, которые используются в зависимости от требований и особенностей системы:

  • roundrobin — простая циклическая балансировка;
  • static-rr — балансировка с учётом весов;
  • leastconn — выбор сервера с наименьшим количеством активных соединений;
  • source — балансировка на основе исходного IP-адреса запроса;
  • uri — балансировка на основе URI запроса;
  • url_param — балансировка на основе параметров URL;
  • rdp-cookie(name) — балансировка на основе cookie с именем name.

HAProxy и LVS: различия

HAProxy и LVS — это два разных решения для балансировки нагрузки. Они имеют свои преимущества и недостатки, и выбор между ними зависит от конкретных потребностей и условий использования. Вот некоторые из основных различий между ними:

  • Оба решения являются программными балансировщиками нагрузки, но LVS основан на операционной системе Linux, тогда как HAProxy реализован как отдельное приложение.
  • LVS использует четырёхслойную технологию балансировки на основе IP, в то время как HAProxy предлагает как четырёхслойное, так и семислойное решение, включая поддержку TCP и HTTP.
  • В LVS состояние системы отслеживается на уровне четвёртого слоя, что ограничивает возможности мониторинга. HAProxy предоставляет более широкий спектр возможностей для мониторинга, включая порты, URL и скрипты.
  • HAProxy обладает более широкими возможностями, но его производительность может быть ниже, чем у LVS, который ближе к аппаратным решениям по скорости обработки и пропускной способности.

Таким образом, выбор между HAProxy и LVS зависит от ваших конкретных требований и условий эксплуатации.

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/yu120-lemon-guide.git
git@api.gitlife.ru:oschina-mirror/yu120-lemon-guide.git
oschina-mirror
yu120-lemon-guide
yu120-lemon-guide
main