Middleware
Введение: сбор технических знаний, связанных с Redis, RocketMQ, Zookeeper, Netty, Tomcat и др.
SPI (Service Provider Interface) — это механизм обнаружения сервисов. Суть SPI заключается в том, что имена классов реализации интерфейса записываются в конфигурационном файле, а загрузчик сервиса считывает конфигурацию и загружает классы реализации. Это позволяет динамически заменять реализацию интерфейса во время выполнения программы, благодаря чему можно легко расширить функциональность программы с помощью механизма SPI.
В JDK есть функция SPI, основанная на классе java.util.ServiceLoader. Она позволяет получить доступ к нескольким файлам конфигурации реализаций в каталоге META-INF/services/. Чтобы решить проблему расширения функциональности, создадим файл com.github.yu120.test.SuperLoggerConfiguration в каталоге META-INF/services/, который будет содержать только одну строку с именем нашего класса реализации по умолчанию — com.github.yu120.test.XMLConfiguration (обратите внимание, что в одном файле можно указать несколько реализаций, разделяя их символами новой строки). Затем используем ServiceLoader для получения конфигурации механизма SPI:
// META-INF/services/com.github.test.test.SuperLoggerConfiguration:
com.github.yu120.test.XMLConfiguration
ServiceLoader<SuperLoggerConfiguration> serviceLoader = ServiceLoader.load(SuperLoggerConfiguration.class);
Iterator<SuperLoggerConfiguration> iterator = serviceLoader.iterator();
SuperLoggerConfiguration configuration;
while(iterator.hasNext()) {
// 加载并 инициализируем реализацию класса
configuration = iterator.next();
}
// Вызов метода configure для последней конфигурации
configuration.configure(configFile);
Логика Dubbo SPI инкапсулирована в классе ExtensionLoader. Его метод getExtensionLoader используется для извлечения экземпляра ExtensionLoader из кэша или создания нового экземпляра, если кэш не содержит соответствующей записи. Основная идея Dubbo SPI проста:
Конфигурационные файлы Dubbo SPI должны быть размещены в каталоге META-INF/dubbo. Пример конфигурации взят из документации Dubbo:
optimusPrime = org.apache.spi.OptimusPrime
bumblebee = org.apache.spi.Bumblebee
В отличие от конфигурации классов реализации в Java SPI, Dubbo использует пары ключ-значение для настройки, позволяя выбирать конкретные классы реализации при необходимости. Также необходимо добавить аннотацию @SPI к интерфейсу.
Motan использует SPI для обеспечения доступа между модулями на основе интерфейсов и имён. Это снижает связанность между модулями.
Реализация Motan SPI находится в motan-core/com/weibo/api/motan/core/extension. Структура каталогов выглядит следующим образом:
motan-core/com.weibo.api.motan.core.extension
|-Activation:SPI的扩展功能,例如过滤、排序
|-ActivationComparator:排序比较器
|-ExtensionLoader:核心,主要负责SPI的扫描和加载
|-Scope:模式枚举,单例、多例
|-Spi:注解,作用在接口上,表明这个接口的实现可以通过SPI的形式加载
|-SpiMeta:注解,作用在具体的SPI接口的实现类上,标注该扩展的名称
Файл конфигурации Spring SPI — это фиксированный файл META-INF/spring.factories, который работает аналогично Java SPI. Каждый интерфейс может иметь несколько реализаций. Использование простое:
// Получение всех фабрик, настроенных в spring.factories
List<LoggingSystemFactory>> factories = SpringFactoriesLoader.loadFactories(LoggingSystemFactory.class, classLoader);
Пример конфигурации в Spring Boot:
# Logging Systems
org.springframework.boot.logging.LoggingSystemFactory=\
org.springframework.boot.logging.logback.LogbackLoggingSystem.Factory,\
org.springframework.boot.logging.log4j2.Log4J2LoggingSystem.Factory
# PropertySource Loaders
org.springframework.boot.env.PropertySourceLoader=\
org.springframework.boot.env.PropertiesPropertySourceLoader,\
org.springframework.boot.env.YamlPropertySourceLoader
Тип | Хранимые байты | Биты | Диапазон значений |
---|---|---|---|
int | 4 | 32 | -2^31 ~ 2^31-1 |
short | 2 | 16 | -2^15 ~ 2^15-1 |
long | 8 | 64 | -2^63 ~ 2^63-1 |
byte | 1 | 8 | -2^7 ~ 2^7-1, -128 ~ 127 |
float | 4 | 32 | |
double | 8 | 64 | |
boolean | 1 | 8 | true, false |
char | 2 | 16 |
Перевод:
В Java, независимо от того, является ли символ цифрой, английским или китайским иероглифом, он занимает два байта |
Примечание:
На рисунке показана модель потока Redis.
Redis использует внутри себя однопоточный файловый обработчик событий File Event Handler, поэтому Redis называется однопоточной моделью. Он использует механизм мультиплексирования ввода-вывода I/O для одновременного прослушивания нескольких сокетов, помещая события, генерируемые сокетами, в очередь памяти. Диспетчер событий распределяет события по соответствующим обработчикам событий в зависимости от типа события на сокете. Файловый обработчик событий состоит из пяти частей:
— Несколько сокетов; — Программа мультиплексного ввода-вывода; — Очередь сокетов; — Диспетчер событий; — Обработчик событий (обработчик ответа соединения, обработчик запроса команды, обработчик ответа команды).
На следующем рисунке показан файловый обработчик событий Redis.
Процесс связи между клиентом и Redis:
Тип запроса 1: клиент инициирует запрос на установление соединения. Сервер отвечает событием AE_READABLE, и после получения события серверной программы сокета программа мультиплексного ввода-вывода помещает этот сокет в очередь. Диспетчер событий извлекает сокет из очереди и передаёт его обработчику ответа соединения для создания доступного для клиента сокета socket01. Сокет socket01 связывается с обработчиком запроса команды. Обработчик запроса команды считывает ключ и значение из socket01 и устанавливает их в памяти. Сокет socket01 связан с событием AE_WRITABLE обработчика ответа команды.
Тип запроса 2: клиент отправляет запрос set key value. Socket socket01 генерирует событие AE_READABLE и помещается в очередь. Диспетчер событий получает сокет socket01 из очереди и связывает его с обработчиком запросов команд. Обработчик запросов команд считывает key и value из socket01 и сохраняет их в памяти. Событие AE_WRITABLE сокета socket01 связано с обработчиком ответов команд.
Тип запроса 3: сервер возвращает результат. Socket socket01 генерирует событие AE_WRITABLE и помещается в очередь. Диспетчер событий получает socket01 из очереди и связывает его с обработчиком ответа команд. Ответ обработчика команд записывает результат операции в socket01 (например, ok). Затем обработчик ответов команд отменяет связь между событием AE_WRITABLE socket01 и собой.
Собственная сетевая система обработки событий, основанная на модели Reactor, была разработана для использования в качестве файлового обработчика событий (File Event Handler). Файловый обработчик событий использует программу мультиплексного ввода-вывода для одновременного мониторинга нескольких сокетов и связывает различные обработчики событий с соответствующими сокетами в соответствии с задачами сокета. Когда сокеты, за которыми ведётся наблюдение, готовы выполнить операции соединения (accept), чтения (read), записи (write) и закрытия (close), соответствующие события файла будут сгенерированы, и файловый обработчик событий вызовет соответствующий обработчик событий для обработки этих событий. Файловый обработчик событий работает в однопоточном режиме, но благодаря использованию программы мультиплексного ввода-вывода он может эффективно взаимодействовать с другими модулями Redis, работающими в однопоточном режиме. Это сохраняет простоту внутренней однопоточной архитектуры Redis.
Мультиплексный ввод-вывод относится к сетевому вводу-выводу, а мультиплексный означает несколько TCP-соединений (то есть сокетов или каналов), что означает совместное использование одного или нескольких потоков. Смысл в том, чтобы обрабатывать несколько соединений одним или несколькими потоками. Основное преимущество заключается в снижении системных издержек и отсутствии необходимости создавать слишком много процессов/потоков или поддерживать эти процессы/потоки. Мультиплексная модель ввода-вывода использует два системных вызова (select/poll/epoll и recvfrom), блокирующий ввод-вывод вызывает только recvfrom; select/poll/epoll ядро может одновременно обрабатывать несколько подключений, а не быстрее, поэтому при небольшом количестве подключений производительность может быть не лучше, чем у многопоточного + блокирующего ввода-вывода. В модели мультиплексного ввода-вывода каждый сокет устанавливается как неблокирующий, а блокировка вызывается функцией select, а не блокировкой сокета.
Принцип работы функции select: При взаимодействии клиента с сервером генерируются три типа дескрипторов файлов (FD): writefds (запись), readfds (чтение) и exceptfds (исключения). Функция select блокирует мониторинг трёх типов дескрипторов файлов, ожидая данных, доступных для чтения, записи или исключения, или тайм-аута, а затем возвращается. После возврата дескрипторы файлов в наборе fdset проверяются один за другим, и готовые дескрипторы используются для выполнения операций ввода-вывода. Преимущества:
— Поддерживается почти на всех платформах, хорошая кроссплатформенная поддержка. Недостатки:
— Поскольку используется метод опроса, производительность снижается с увеличением количества дескрипторов файлов FD. — Каждый вызов select() требует копирования набора fd из пользовательского режима в режим ядра и обхода (передача сообщений всегда происходит из режима ядра в пользовательский режим). — По умолчанию существует ограничение в 1024 дескриптора файлов для каждого процесса открытия, которое можно изменить с помощью определения макроса, но эффективность всё ещё низкая.
Принцип работы функции poll аналогичен принципу работы select, оба используют метод опроса + обход; единственное отличие состоит в том, что функция poll не имеет ограничения на количество дескрипторов файлов.
Принцип работы epoll: Нет ограничения на количество FD, требуется только одно копирование пользовательского режима в режим ядра, и используется механизм уведомления о времени для запуска. Через функцию epoll_ctl зарегистрируйте FD, и когда FD будет готов, вызовите callback для активации соответствующего FD и выполните соответствующую операцию ввода-вывода. Причина высокой производительности epoll заключается в следующих трёх функциях:
— epoll_create(): при запуске системы в Linux ядро запрашивает объект B+ дерева, возвращает epoll объект, который также является дескриптором файла. — epoll_ctl(): каждый раз, когда создаётся новое соединение, оно управляется этой функцией в объекте epoll, добавляя, удаляя или изменяя дескриптор файла, связанный с соединением, и привязывая функцию обратного вызова. — epoll_wait(): циклически перебирает все обратные вызовы и выполняет соответствующие операции ввода-вывода.
Преимущества:
— Нет ограничений на количество FD, поддерживаемое операционной системой, максимальное количество дескрипторов файлов составляет около 100 000. — Эффективность повышается за счёт использования механизма обратного вызова вместо метода опроса, и производительность не снижается с увеличением числа FD. — Пользовательское пространство и пространство ядра совместно используют одну и ту же область памяти через mmap (mmap — это метод сопоставления файлов, то есть сопоставление файла или другого объекта с адресным пространством процесса).
Пример: если имеется 1 миллион соединений, из которых 10 000 активны, мы можем сравнить производительность select, poll и epoll:
— select: по умолчанию 1024, необходимо 977 процессов для поддержки 1 миллиона соединений, что приводит к значительному снижению производительности процессора. — poll: нет ограничений на максимальное количество дескрипторов файлов, для 1 миллиона подключений требуется 1 миллион дескрипторов, и все они должны быть пройдены, что также приводит к нехватке ресурсов памяти. — epoll: при поступлении запроса создайте дескриптор и привяжите функцию обратного вызова, просто нужно пройти через 10 000 активных соединений, высокая эффективность и отсутствие необходимости в копировании памяти.
Почему Redis с однопоточным режимом всё ещё обладает такой высокой эффективностью?
— Чистые операции с памятью: данные хранятся в памяти, время отклика которой составляет примерно 100 наносекунд, что является важной основой для доступа Redis к миллиардам операций в секунду. — Неблокирующая модель мультиплексного ввода-вывода: Redis использует epoll в качестве реализации технологии мультиплексного ввода-вывода, а также объединяет события в Redis для преобразования операций соединения, чтения и записи в операции времени, избегая потери времени на ввод-вывод. — Реализация на языке C: близость к операционной системе обеспечивает более высокую скорость выполнения. — Избегание переключения контекста в одном потоке: избегание накладных расходов на переключение контекста, вызванных многопоточностью, и предотвращение проблем конкуренции, которые могут возникнуть в многопоточности.
Основные типы данных Redis и их применение:
— String (строка): кэш, счётчик, распределённая блокировка и т. д. — List (список): список, очередь, временная шкала списка подписчиков в социальных сетях и т. д. — Hash (хэш): информация о пользователе, хэш-таблица и т. д. — Set (набор): дедупликация, лайки, антипатии, общие друзья и т. д. — Zset (отсортированный набор): рейтинг популярности, рейтинг кликов и т. д. Redis и его структуры данных
Структура данных Hash в Redis позволяет изменять значение только одного свойства, как при обновлении атрибута в базе данных.
Практическое применение:
List представляет собой двусвязный список. С помощью этой структуры можно легко реализовать функции, такие как отображение последних сообщений (например, TimeLine в Weibo). List также может использоваться в качестве очереди сообщений: задачи помещаются в очередь с помощью операции PUSH, а затем извлекаются и выполняются с помощью операции POP рабочими потоками. Redis предоставляет API для работы с определёнными элементами списка, позволяя выполнять запросы, удаление элементов из списка.
Практическое применение:
Set представляет собой множество уникальных значений. Используя структуру данных Set в Redis, можно хранить данные, которые имеют свойство множества, например, список подписчиков пользователя в социальной сети. Благодаря тому, что Redis предлагает множество операций над множествами, таких как пересечение, объединение и разность, можно легко реализовывать функции, связанные с общими интересами, взаимными подписками и вторичными друзьями. Для всех операций над множествами можно выбрать, возвращать ли результат клиенту или сохранять его в новом множестве.
Практическое применение:
В отличие от обычного множества, элементы в Sorted Sets имеют дополнительный атрибут — вес (score), который позволяет упорядочивать элементы по весу. Например, в списке учеников класса их имена могут выступать в качестве элементов, а оценки — в качестве веса. Это позволяет автоматически сортировать список по оценкам. Кроме того, Sorted Sets можно использовать для создания очередей с приоритетами, где обычные сообщения имеют вес 1, а важные сообщения — 2. Рабочие потоки могут выбирать задачи для выполнения в порядке убывания веса, обеспечивая приоритет важных задач.
Практическое применение:
HyperLogLog используется для приблизительного подсчёта количества уникальных элементов. Вместо хранения каждого элемента, он использует вероятностный алгоритм, основанный на хранении позиции первого бита, равного 1, в хэше элемента. HyperLogLog позволяет выполнять подсчёт с минимальными затратами памяти.
Команды:
Команда | Действие |
---|---|
pfadd key element ... | Добавление всех элементов в ключ |
pfcount key | Подсчёт оценочного значения ключа (не точное) |
pgmerge new_key key1 key2 ... | Объединение ключей в новый ключ |
Пример использования: Как подсчитать количество разных аккаунтов, посетивших главную страницу Google за день? Для такого большого сайта, как Google, точное количество посещений не имеет большого значения, поэтому можно ограничиться приблизительным подсчётом. Для решения этой задачи можно использовать HashMap, BitMap или HyperLogLog.
Сравнение решений:
Geo используется для хранения и обработки географической информации, такой как местоположение. Redis позволяет хранить Geo-информацию в упорядоченных множествах (zset), используя Geohash для индексации. Команды:
Команда | Действие |
---|---|
geoadd key latitude longitude member | Добавление местоположения (широты, долготы и имени) в ключ |
geopos key member ... | Получение координат местоположения |
geodist key member1 member2 [unit] | Вычисление расстояния между двумя местоположениями. Если одно из местоположений не существует, возвращается пустое значение |
georadius | Поиск по диапазону координат |
georadiusbymember | Поиск по радиусу вокруг местоположения члена |
geohash | Вычисление геохеша для широты и долготы |
GEORADIUS:
GEORADIUS key longitude latitude radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count]
Возвращает все элементы в ключе, находящиеся в радиусе, заданном широтой, долготой и радиусом. Единицы измерения радиуса:
Опции:
По умолчанию GEORADIUS возвращает неупорядоченный список элементов. Можно указать порядок сортировки с помощью параметров ASC (от ближнего к дальнему) или DESC (от дальнего к ближнему).
Опция COUNT <count>
позволяет ограничить количество возвращаемых элементов. Если данные не конфиденциальны и могут быть сгенерированы заново из других источников, можно отключить сохранение на диск.
Когда включены оба режима сохранения (RDB и AOF), Redis будет использовать AOF для восстановления данных, поскольку файлы, сохранённые AOF, более полные.
RDB (Redis Database Backup File, файл резервной копии базы данных Redis) — это метод сохранения всех пар ключ-значение в базе данных Redis в виде снимка данных через определённые промежутки времени. В определённый момент данные записываются во временный файл, а после завершения процесса сохранения этот файл заменяет предыдущий файл сохранения.
Создание
При сохранении данных Redis программа сохраняет текущее состояние базы данных в памяти на диске. Создание RDB-файлов включает два основных Redis-команды: SAVE и BGSAVE.
Загрузка
Во время загрузки RDB-файла сервер находится в заблокированном состоянии до завершения загрузки.
Команда save является синхронной операцией. Во время выполнения команды сервер блокируется, отказывая клиентам в отправке командных запросов.
Процесс выполнения: клиент отправляет команду сохранения на сервер, сервер создаёт дочерний процесс, который записывает данные на диск, после чего сервер разблокируется и продолжает обработку клиентских запросов. Если серверу необходимо сохранить большой объём данных, выполнение команды save может занять значительное время, блокируя все клиентские запросы. Поэтому команда save редко используется в производственной среде. Вместо неё обычно применяется команда BGSAVE. Если сохранение данных с помощью BGSAVE завершается неудачно, команда save используется для сохранения последних данных.
Команда bgsave является асинхронной операцией. При выполнении команды дочерний процесс сохраняет данные, в то время как основной поток сервера продолжает обрабатывать клиентские запросы.
Процесс выполнения: сервер создаёт дочерний процесс и сохраняет данные на диск. Основной поток продолжает обслуживать клиентские запросы, пока дочерний процесс завершает сохранение данных. Если сохранение успешно завершено, клиент может проверить результат с помощью команды LASTSAVE.
С помощью конфигурационного файла можно настроить Redis таким образом, чтобы он автоматически сохранял данные при выполнении определённых условий, таких как изменение определённого количества ключей за заданный промежуток времени:
# RDB автоматическое сохранение правил
# Автоматически сохранять данные при изменении хотя бы одного ключа за 900 секунд
save 900 1
# Автоматически сохранять данные при изменении как минимум 10 ключей за 300 секунд
save 300 10
# Автоматически сохранять данные при изменении более 10 000 ключей за 60 секунд
save 60 10000
# Имя файла сохранения RDB
dbfilename dump-<port>.rdb
# Каталог хранения файлов сохранения
dir /var/lib/redis
# Останавливать ли запись при ошибке bgsave, обычно да
stop-writes-on-bgsave-error yes
# Использовать ли сжатие при сохранении RDB
rdbcompression yes
# Проверять ли контрольную сумму при сохранении RDB, обычно да
rdbchecksum yes
Стандартная конфигурация RDB выглядит следующим образом:
################################ SNAPSHOTTING ################################
#
# Сохранять базу данных на диск:
# Автоматическое сохранение при изменении определённого количества ключей в течение заданного времени
# save <seconds> <changes>
#
save 900 1
save 300 10
save 60 10000
#bgsave发生错误时是否停止写入,一般为yes
stop-writes-on-bgsave-error yes
#持久化时是否使用LZF压缩字符串对象?
rdbcompression yes
#是否对rdb文件进行校验和检验,通常为yes
rdbchecksum yes
# RDB持久化文件名
dbfilename dump.rdb
#持久化文件存储目录
dir ./
AOF (Append Only File, добавочный журнал) — это способ сохранения всех командных запросов в виде журнала в файле. Redis сначала выполняет команду, затем записывает её в журнал. Поскольку этот метод является только добавлением, нет накладных расходов на поиск диска, что делает его быстрым и похожим на binlog MySQL. AOF больше подходит для горячего резервирования.
Преимущества:
Недостатки:
Если AOF включён, после выполнения каждой команды сервер добавляет её в буфер aof_buf. Затем перед завершением каждого события сервер вызывает функцию flushAppendOnlyFile, которая решает, следует ли записать содержимое буфера aof_buf в файл AOF и сохранить его на диск. Функция flushAppendOnlyFile управляется параметром appendfsync сервера, который может принимать значения always, everysec или no. Эти значения определяют стратегию сохранения данных:
Восстановление данных из AOF-журнала выполняется путём анализа и выполнения команд, записанных в журнале. Создаётся новый клиент без сетевого подключения, и команды выполняются последовательно до тех пор, пока весь журнал не будет обработан.
Перезапись файла необходима для предотвращения его чрезмерного увеличения. Новый файл создаётся без потери данных, так как каждая пара ключ-значение записывается одной командой вместо множества команд, которые могли использоваться ранее.
Переписывание происходит в фоновом режиме, чтобы не блокировать основной процесс сервера. Во время переписывания сервер продолжает выполнять клиентские команды, добавляя новые команды в новый буфер и сохраняя их в старом буфере. Redis 5.0.3 (00000000/0) 64 bit
.- .-```. ```\/ _.,_ ''-._ ( ' , .-` | `, ) Running in standalone mode |`-._`-...-` __...-.
-.|'_.-'| Port: 6380 |
-. ._ / _.-' | PID: 93825 -._ -._ -./ _.-' _.-' |
-.-._
-.__.-' .-'.-'|
| -._
-. .-'.-' | http://redis.io
-._ -.-.__.-'_.-' _.-' |
-.-._
-..-' .-'.-'|
| -._
-._ .-'.-' |
-._ -._-.__.-'_.-' _._-' -._
-..-' .-'
-. _.-'
`-.__.-'
Redis主从配置非常简单,过程如下(演示情况下主从配置在一台电脑上):
第一步: 复制两个redis配置文件(启动两个redis,只需要一份redis程序,两个不同的redis配置文件即可)
mkdir redis-master-slave
cp path/to/redis/conf/redis.conf path/to/redis-master-slave master.conf
cp path/to/redis/conf/redis.conf path/to/redis-master-slave slave.conf
第二步: 修改配置
## master.conf
port 6379
## slave.conf
port 6380
slaveof 127.0.0.1 6379
第三步: 分别启动两个redis
redis-server path/to/redis-master-slave/master.conf
redis-server path/to/redis-master-slave/slave.conf
启动之后,打开两个命令行窗口,分别执行 telnet localhost 6379
和 telnet localhost 6380
,然后分别在两个窗口中执行 info
命令,可以看到:
# Replication
role:master
# Replication
role:slave
master_host:127.0.0.1
master_port:6379
主从配置没问题。然后在master 窗口执行 set 之后,到slave窗口执行get,可以 get到,说明主从同步成功。这时,我们如果在slave窗口执行 set ,会报错:
-READONLY You can't write against a read only replica.
因为从节点是只读的。
Sentinel是用来监控主从节点的健康情况。客户端连接Redis主从的时候,先连接Sentinel,Sentinel会告诉客户端主Redis的地址是多少,然后客户端连接上Redis并进行后续的操作。当主节点挂掉的时候,客户端就得不到连接了因而报错了,客户端重新向Sentinel询问主master的地址,然后客户端得到了[新选举出来的主Redis],然后又可以愉快的操作了。
哨兵 sentinel 配置
为了说明 sentinel 的用处,我们做个试验。配置 3 个 redis (1 主 2 从),1 个哨兵。步骤如下:
mkdir redis-sentinel
cd redis-sentinel
cp redis/path/conf/redis.conf path/to/redis-sentinel/redis01.conf
cp redis/path/conf/redis.conf path/to/redis-sentinel/redis02.conf
cp redis/path/conf/redis.conf path/to/redis-sentinel/redis03.conf
touch sentinel.conf
上我们创建了 3 个 redis 配置文件,1 个哨兵配置文件。我们将 redis01 设置为 master,将redis02,redis03 设置为 slave。
vim redis01.conf
port 63791
vim redis02.conf
port 63792
slaveof 127.0.0.1 63791
vim redis03.conf
port 63793
slaveof 127.0.0.1 63791
vim sentinel.conf
daemonize yes
port 26379
sentinel monitor mymaster 127.0.0.1 63793 1 # 下面解释含义
上面的主从配置都熟悉,只有哨兵配置 sentinel.conf,需要解释一下:
mymaster # 为主节点名字,可以随便取,后面程序里边连接的时候要用到
127.0.0.1 63793 # 为主节点的 ip,port
1 # 后面的数字 1 表示选举主节点的时候,投票数。1表示有一个 sentinel 同意即可升级为 master
启动哨兵
上面我们配置好了 redis 主从,1 主 2 从,以及 1 个哨兵。下面我们分别启动 redis,并启动哨兵:
redis-server path/to/redis-sentinel/redis01.conf
redis-server path/to/redis-sentinel/redis02.conf
redis-server path/to/redis-sententer/redis03.conf
redis-server path/to/redis-sentinel/sentinel.conf --sentinel
启动之后,可以分别连接到 3 个 redis 上,执行 info 查看主从信息。
模拟主节点宕机情况
运行上面的程序(注意,在实验这个效果的时候,可以将 sleep 时间加长或者 for 循环增多,以防程序提前停止,不便看整体效果),然后将主 redis 关掉,模拟 redis 挂掉的情况。现在主 redis 为 redis01,端口为 63791。
redis-cli -p 63791 shutdown
上述所做的这些工作只是保证了数据备份以及高可用,目前为止我们的程序一直都是向 1 台 redis 写数据,其他的 redis 只是备份而已。实际场景中,单个 redis 节点可能不满足要求,因为:
所以需要 redis cluster 即 redis 集群。Redis 集群是一个提供在多个 Redis 间节点间共享数据的程序集。Redis 集群并不支持处理多个 keys 的命令,因为这需要在不同的节点间移动数据,从而达不到像 Redis 那样的性能, 在高负载的情况下可能会导致不可预料的错误。Redis 集群通过分区来提供一定程度的可用性,在实际环境中当某个节点宕机或者不可达的情况下继续处理命令.。Redis 集群的优势:
为了配置一个 redis cluster, 我们需要准备至少 6 台 redis,为啥至少 6 台呢?我们可以在 redis 的官方文档中找到如下一句话:
Note that the minimal cluster that works as expected requires to contain at least three master nodes. Тема: Если в Redis есть 100 миллионов ключей, и из них 1 миллион начинается с определённого префикса, как их найти?
Используя команду keys, можно получить список ключей, соответствующих заданному шаблону. Затем собеседник спрашивает: если Redis обслуживает онлайн-сервисы, какие проблемы могут возникнуть при использовании команды keys? В этом случае необходимо ответить, что Redis является однопоточным. Команда keys может привести к блокировке потока на некоторое время, в результате чего онлайн-сервис будет недоступен до завершения выполнения команды. В этой ситуации можно использовать команду scan, которая позволяет извлекать список ключей без блокировки, но с некоторой вероятностью дублирования. На клиенте можно выполнить дедупликацию, но общее время выполнения будет больше, чем при использовании команды keys.
Kafka
RocketMQ
RocketMQ — это распределённое промежуточное ПО для обмена сообщениями от Alibaba. Оно поддерживает транзакции, упорядоченные сообщения, пакетные сообщения, сообщения с таймером и сообщения с возможностью отката. RocketMQ имеет несколько концепций, отличающихся от стандартных систем обмена сообщениями, таких как группы, темы и очереди. Система состоит из производителей, потребителей, брокеров и серверов имён.
Преимущества:
— Сглаживание пиковых нагрузок: решение проблем потери сообщений и сбоя системы из-за кратковременных всплесков нагрузки. — Развязка приложений: решение проблемы зависимости между системами разной важности и производительности, приводящей к каскадным сбоям. — Повышение производительности: отправка одного сообщения в систему обмена сообщениями для уведомления связанных систем. — Буферизация потоков: возможность тестирования систем с нестабильными каналами связи путём накопления сообщений. — Асинхронная обработка: сокращение времени отклика за счёт исключения синхронных вызовов удалённых процедур.
Архитектура:
Модель развёртывания:
Роли:
Брокер:
— Понимание как сам RocketMQ. — Брокеры принимают и отправляют сообщения от производителей и потребителей. — Брокер периодически отправляет информацию на сервер имён. — Является сервером хранения и пересылки сообщений. — Каждый брокер при запуске находит все серверы имён, устанавливает с ними постоянное соединение и регистрирует свою информацию. После этого он периодически отправляет обновления.
Сервер имён:
— Аналогичен Zookeeper, но реализован самостоятельно. Предоставляет функции маршрутизации, регистрации сервисов и обнаружения сервисов. Это узел без состояния. — Сервер имён основан на Netty и предоставляет маршрутизацию, регистрацию сервисов и обнаружение сервисов. Он не сохраняет состояние и не имеет главного и резервного серверов. — Серверы имён периодически отправляют обновления друг другу для обеспечения высокой доступности. — Информация о брокерах и темах хранится в памяти сервера имён и не сохраняется постоянно.
Производитель:
— Создаёт сообщения. — Выбирает случайный сервер имён и устанавливает постоянное соединение для получения информации о темах. — Устанавливает постоянное соединение с главным сервером для отправки сообщений. Периодически отправляет обновления главному серверу.
Потребитель:
— Получает сообщения. — Использует сервер имён для поиска тем. — Подключается к брокерам для получения сообщений. Поскольку сообщения могут быть отправлены как главным, так и подчинённым сервером, потребитель подключается к обоим.
Основные процессы:
— Все брокеры регистрируются на серверах имён. — Производители получают информацию о темах от серверов имён. — Производители устанавливают постоянные соединения с главными серверами для отправки сообщений и периодически отправляют им обновления. — Потребители получают информацию о темах от серверов имён и подключаются к брокерам. — Потребители подключаются как к главным, так и к подчинённым серверам брокеров для получения сообщений.
Принцип работы:
RocketMQ состоит из серверов имён, производителей, потребителей и нескольких брокеров (процессов RocketMQ). Принцип работы следующий:
— При запуске брокеры находят все серверы имён и устанавливают с ними постоянные соединения. Они также отправляют обновления каждые 30 секунд. — Когда производители отправляют сообщения, они получают информацию о теме от сервера имён. — Производители отправляют сообщения на главный сервер, с которым они установили постоянное соединение. Они периодически отправляют ему обновления. — Потребители используют сервер имён для получения информации о теме и подключения к брокерам. Они подключаются как к главному, так и к подчинённому серверу брокера для получения сообщений. Ядро дизайна
Сообщение очистки
После того как сообщение в брокере было использовано, оно не удаляется сразу. Каждое сообщение сохраняется в CommitLog, и каждый потребитель, подключенный к брокеру, сохраняет информацию о ходе потребления. После использования сообщения обновляется только ход потребления текущего потребителя (смещение в CommitLog). По умолчанию файлы CommitLog, которые больше не используются, удаляются через 48 часов:
/**
* {@link org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#isTimeToDelete()}
*/
private boolean isTimeToDelete() {
// when = "04";
String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
// 是04点,就返回true
if (UtilAll.isItTimeToDo(when)) {
return true;
}
// 不是04点,返回false
return false;
}
/**
* {@link org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#deleteExpiredFiles()}
*/
private void deleteExpiredFiles() {
// isTimeToDelete() этот метод — это проверка, действительно ли сейчас 4 часа, и если да, то выполнение удаления логики.
if (isTimeToDelete()) {
// 默认 72, но broker конфигурационный файл по умолчанию изменил на 48, поэтому новые версии все 48.
long fileReservedTime = 48 * 60 * 60 * 1000;
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(72 * 60 * 60 * 1000, xx, xx, xx);
}
}
/**
* {@link org.apache.rocketmq.store.CommitLog#deleteExpiredFile()}
*/
public int deleteExpiredFile(xxx) {
// Этот метод основной логики — это перебрать поиск последнего изменения времени + срок действия, меньше текущего системного времени, удалить его (то есть меньше 48 часов).
return this.mappedFileQueue.deleteExpiredFileByTime(72 * 60 * 60 * 1000, xx, xx, xx);
}
Push или pull
RocketMQ не имеет истинного смысла push, всё это pull, хотя есть класс push, но фактическая реализация нижнего уровня использует длинный опрос (длинный опрос), то есть способ извлечения. Атрибут Broker longPollingEnable
указывает, включён ли длинный опрос, по умолчанию он включён. Исходный код выглядит следующим образом:
// {@link org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage()}
// Извлекает сообщение, помещает результат в pullCallback.
this.pullAPIWrapper.pullKernelImpl(pullCallback);
Почему нужно активно извлекать сообщения вместо использования механизма событий?
Механизм событий заключается в установлении длинного соединения, и данные передаются способом события (отправка данных) для реального времени. Если брокер активно отправляет сообщения, возможно, скорость отправки сообщений будет быстрой, а скорость потребления медленной, что может привести к накоплению сообщений на стороне потребителя и невозможности их использования другими потребителями. Метод извлечения позволяет потребителю извлекать в соответствии с текущей ситуацией и не создаёт слишком большого давления, приводящего к узким местам. Поэтому используется метод извлечения.
Балансировка нагрузки
RocketMQ реализует балансировку нагрузки путём распределения тем по нескольким брокерам.
На стороне производителя:
По умолчанию стратегия — случайный выбор:
Другие реализации:
Также можно настроить реализацию интерфейса MessageQueueSelector в методе выбора.
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
На стороне потребителя:
используется алгоритм равномерного распределения для балансировки нагрузки.
Другие алгоритмы балансировки нагрузки:
Что произойдёт, если потребитель и очередь не сбалансированы?
Потребитель и очередь будут приоритетно равномерно распределены. Если потребителей меньше, чем очередей, некоторые потребители будут использовать несколько очередей. Если количество потребителей равно количеству очередей, каждый потребитель будет использовать одну очередь. Если число потребителей больше числа очередей, некоторые потребители останутся без дела.
Лучшие практики
Производитель:
Topic — сообщения классифицируются по различным бизнес-сообщениям.
Tag — используется для дальнейшей классификации сообщений в определённой теме, сообщения отправляются с этим атрибутом.
key — каждый сообщение имеет уникальный идентификатор на уровне бизнеса, который должен быть установлен в поле keys для облегчения поиска содержимого сообщения и того, кто его потребляет. Поскольку это хеш-индекс, убедитесь, что ключ максимально уникален, чтобы избежать потенциального конфликта хешей.
журнал — необходимо регистрировать успешную отправку или неудачу сообщения, обязательно регистрируйте send result и поле ключа.
send — метод отправки сообщения. Пока не возникает исключение, отправка считается успешной. Однако успешная отправка имеет несколько состояний, определённых в sendResult.
Подписка на отношения должна быть последовательной.
Несколько Group ID подписаны на несколько тем, и подписка каждого экземпляра потребителя в нескольких Group ID должна быть одинаковой.

Потребитель:
Потребительский детерминизм — чтобы предотвратить повторные потребительские сообщения, вызывающие аномалии в бизнесе, потребитель RocketMQ версии в очереди сообщений после получения сообщения должен выполнять обработку на основе уникального ключа сообщения. Повторение сообщений может произойти в следующих сценариях:
Журнал — регистрирует потребление, чтобы облегчить последующее позиционирование проблем.
Пакетное потребление — пакетное потребление может значительно повысить пропускную способность потребления.
Транзакционные сообщения:
Принцип согласованности MQ и DB (двухсторонняя транзакция):
 Транзакции сообщений — это возможность MQ, которая похожа на XA и обеспечивает распределённую транзакционную согласованность.
Получив сообщение от производителя, MQ возвращает подтверждение (ack). Производитель начинает локальную транзакцию. Если она завершается успешно, производитель отправляет в MQ фиксацию (commit), а если неудачно — откат (rollback).
Если MQ долго не получает от производителя подтверждения фиксации или отката, то инициирует запрос на проверку состояния транзакции. Производитель проверяет состояние транзакции и повторно отправляет подтверждение.
Если MQ получает подтверждение фиксации, он доставляет сообщение потребителю. В противном случае сообщение сохраняется и удаляется через 3 дня.
Порядок сообщений
Сообщения RocketMQ хранятся в очередях топиков. Очередь — это FIFO (First In First Out), поэтому порядок сообщений в отдельной очереди гарантирован.
Существует два типа упорядоченных сообщений:
Для двух заказов с сообщениями a1, b1, b2, a2, a3, b3 (в порядке абсолютного времени):
При отправке и хранении сообщений соблюдается их порядок. При потреблении сообщений также соблюдается порядок их хранения.
MQPullConsumer управляется пользовательским потоком и активно извлекает сообщения от сервера. Каждый раз, когда список msgFoundList в PullResult заполняется, пользователь должен сам обеспечить порядок потребления.
В MQPushConsumer пользователь регистрирует MessageListener для потребления сообщений. Клиент должен гарантировать порядок вызова MessageListener.
Потеря сообщений
Сообщение проходит три этапа: производство, хранение и потребление. На любом из этих этапов сообщение может быть потеряно. Чтобы решить проблему потери сообщений, нужно определить причины потери на каждом этапе и принять соответствующие меры.
На этапе производства Producer отправляет сообщение брокеру MQ через сеть. Отправка может завершиться неудачей из-за сетевых проблем или недоступности брокера.
Неудачи автоматически повторяются. Даже после нескольких попыток, если сообщение не удаётся отправить, клиент может предпринять собственные действия для компенсации, не влияя на основную бизнес-логику. Кроме того, даже если брокер выходит из строя, другие брокеры продолжают предоставлять услуги, обеспечивая высокую доступность системы.
Чтобы избежать потери сообщений на этапе производства, рекомендуется использовать синхронные отправки, автоматические повторные попытки и несколько главных узлов.
Синхронная отправка подразумевает блокировку при отправке сообщения до получения ответа от брокера. Если ответ не получен, считается, что отправка не удалась. Это наиболее надёжный способ отправки.
Асинхронная отправка позволяет узнать об успешной отправке через обратный вызов. Однонаправленная отправка (OneWay) наименее надёжна, так как невозможно гарантировать доставку сообщения.
По умолчанию повторные попытки выполняются три раза. Количество попыток можно изменить с помощью API.
Автоматические повторные попытки означают, что при неудачной отправке или истечении времени ожидания сообщение будет автоматически повторено. По умолчанию выполняется три попытки синхронной отправки и одна попытка асинхронной отправки. Если брокер выходит из строя, но в производственной среде обычно используется несколько брокеров, то другие главные узлы продолжат предоставлять услуги, и это не повлияет на отправку сообщений, сообщения по-прежнему будут доставляться. Например, если сообщение отправляется брокеру, когда он выходит из строя, производитель получает ответ об ошибке отправки от брокера, в этом случае производитель автоматически повторяет попытку. В это время отказавший брокер отключается, поэтому производитель переключится на другого брокера для отправки сообщения.
Предположим, что мы хотим строго гарантировать, что сообщения не будут потеряны на этапе хранения сообщений брокера. Для этого потребуется следующая конфигурация, хотя производительность будет значительно ниже по сравнению со стандартной конфигурацией:
# Конфигурация главного узла
flushDiskType = SYNC_FLUSH
brokerRole=SYNC_MASTER
# Конфигурация подчиненного узла
brokerRole=slave
flushDiskType = SYNC_FLUSH
Настройка стратегии синхронной записи на диск для брокера. По умолчанию сообщения сохраняются в памяти после того, как они достигают брокера. Затем брокер немедленно возвращает подтверждение производителю. После этого брокер периодически записывает группу сообщений из памяти на диск асинхронно. Этот метод уменьшает количество операций ввода-вывода и обеспечивает лучшую производительность, но если происходит сбой машины или аварийный выход из системы, сообщения могут быть потеряны, если они не были записаны на диск вовремя.
Если мы хотим гарантировать, что брокер не потеряет сообщения и обеспечить надёжность сообщений, нам необходимо изменить механизм сохранения сообщений на синхронный режим записи на диск, то есть сообщение будет сохранено на диске успешно, только тогда будет возвращён ответ. Измените конфигурацию брокера следующим образом:
# По умолчанию используется ASYNC_FLUSH
flushDiskType = SYNC_FLUSH
Если брокер не завершает запись на диск в течение заданного времени (по умолчанию 5 секунд), он вернёт статус SendStatus.FLUSH_DISK_TIMEOUT производителю.
Даже если для брокера настроена стратегия синхронной записи на диск, но после завершения записи диск выходит из строя, это может привести к потере всех сообщений на диске. Однако, даже если у нас есть один главный и один подчинённый сервер, и главный сервер завершил запись на диск до того, как диск вышел из строя, все сообщения на диске всё равно могут быть утеряны. Поэтому мы также можем настроить систему так, чтобы она уведомляла производителя о завершении записи на диск не только главным сервером, но и подчинённым сервером.
# По умолчанию — ASYN_MASTER
brokerRole=SYNC_MASTER
Потеря сообщений также может произойти на этапе потребления.
Только когда режим потребления установлен на MessageModel.CLUSTERING (режим кластера), брокер автоматически повторит попытку. Для широковещательных сообщений повторная попытка не выполняется. Если сообщение не удаётся обработать, RocketMQ будет отправлять такие сообщения в очередь недоставленных сообщений после достижения максимального количества попыток. Затем мы должны обратить внимание на очередь недоставленных сообщений и вручную компенсировать эти сообщения.
Потребитель сначала извлекает сообщения локально, затем выполняет бизнес-логику. После выполнения бизнес-логики потребитель должен подтвердить получение сообщения вручную. Только тогда потребление считается завершённым. Это не означает, что потребление завершено после извлечения сообщения локально. Вот пример:
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try{
for (MessageExt msg : msgs) {
String str = new String(msg.getBody());
System.out.println(str);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch(Throwable t){
log.error("消费异常:{}", msgs, t);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
В следующих трёх случаях брокер обычно повторяет попытку (по умолчанию максимальное количество попыток составляет 16 раз). RocketMQ использует стратегию «уменьшения времени» для повторной отправки сообщений. Чем больше попыток, тем меньше вероятность успешного потребления сообщения. Мы можем настроить количество повторных попыток и интервал времени (интервал между первой отправкой) в файле конфигурации брокера broker.conf. Конфигурация выглядит следующим образом:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Клиент-потребитель сначала определяет, явно ли установлено максимальное количество повторных попыток MaxReconsumeTimes. Если нет, устанавливается значение по умолчанию, равное 16. В противном случае используется установленное максимальное количество повторных попыток.
private int getMaxReconsumeTimes() {
// default reconsume times: 16
if (this.defaultMQPushConsumer.getMaxReconsueTimes() == -1) {
return 16;
} else {
return this.defaultMQPushConsumer.getMaxReconsueTimes();
}
}
Если потребление занимает слишком много времени, MQ будет бесконечно отправлять сообщения потребителю. Эта ситуация возникает, когда потребитель не возвращает ConsumeConcurrentlyStatus. CONSUME_SUCCESS или ConsumeConcurrentlyStatus.RECONSUME_LATER.
Сообщения, которые не удалось обработать, будут отправлены в очередь недоставленных сообщений. Обработка логики очереди недоставленных сообщений:
Наконец, запустите отдельный потребитель очереди недоставших сообщений для потребления и выполните ручную компенсацию неудачных сообщений.
Существует три режима потребления сообщений во всех системах обмена сообщениями: at-most-once (максимум один раз), at-least-once (минимум один раз) и exactly-only-once (точно один раз). Распределённые системы обмена сообщениями находятся между этими тремя режимами, первые два из которых являются осуществимыми и широко используются.
at-most-once: после отправки сообщения независимо от того, было ли оно успешно обработано, сообщение больше не будет отправляться повторно, что может привести к тому, что сообщение не будет обработано. RocketMQ не использует этот метод. at-lease-once: после успешной обработки сообщения клиент возвращает ACK серверу. Если обработка не удалась, сервер не получит ACK от клиента, и сообщение будет отправлено повторно. Это может привести к повторному потреблению сообщения, и RocketMQ гарантирует, что каждое сообщение будет обработано хотя бы один раз, используя ACK. exactly-only-once: в распределённой системе, если мы хотим реализовать этот режим, неизбежны огромные накладные расходы. RocketMQ не гарантирует эту функцию, и невозможно избежать дублирования сообщений. Бизнес-система должна обрабатывать уникальность сообщений. Чтобы считаться «Exactly Only Once», должны выполняться следующие условия:
Zookeeper — это централизованная служба для хранения информации о конфигурации, наименованиях, состоянии и других данных, используемых распределёнными приложениями. Zookeeper предоставляет следующие основные функции:
То же, что и последователь, за исключением того, что он не участвует в выборах лидера и его состояние — OBSERING. Может использоваться для линейного расширения QPS чтения. Запуск этапа Leader-выборов
Узел A сначала голосует за себя, информация о голосовании содержит идентификатор узла (SID) и уникальный увеличивающийся номер ZXID, например (1, 0). SID настроен и уникален, ZXID — это уникальный возрастающий номер.
Узел B сначала голосует за себя, информация о голосовании — (2, 0).
Затем узлы A и B голосуют за себя для всего кластера.
После того как узел A получает информацию о голосовании от узла B, он проверяет, находится ли узел B в состоянии текущего голосования и является ли он узлом, который ищет лидера (LOOKING).
Голосование PK: узел A будет сравнивать своё голосование с другими и обновлять его, если другой узел отправил больший ZXID. Если ZXID равны, то сравниваются SID. Здесь ZXID узлов A и B одинаковы, а SID узла B больше, поэтому узел A обновляет информацию о своём голосовании до (2, 0), а затем отправляет её снова. Узлу B не нужно обновлять информацию о голосовании, но ему всё равно нужно отправить её ещё раз на следующей итерации. На данный момент информация о голосовании узла A равна (2, 0):
SID | ZXID | |
---|---|---|
Узел А | 1 | 0 |
Узел В | 2 | 0 |
Обработка отказа лидера во время работы Zookeeper
Во время работы Zookeeper лидер всегда остаётся в состоянии LEADING, пока не произойдёт отказ лидера. В этом случае необходимо выбрать нового лидера, и процесс выбора аналогичен процессу запуска. Следует обратить внимание на следующие моменты:
Процесс синхронизации данных между узлами
Различные клиенты могут подключаться к главному или резервному узлу. Когда клиент отправляет запрос на чтение или запись, он не знает, подключён ли он к лидеру или последователю. Если клиент подключён к главному узлу и отправляет запрос на запись, лидер выполняет двухфазную фиксацию (2PC) для синхронизации с другими последователями и наблюдателями. Однако если клиент подключён к последователю и отправляет запрос на запись, последователь перенаправляет запрос лидеру, который затем выполняет 2PC для синхронизации данных с последователем. Двухфазная фиксация (2PC):
Давайте рассмотрим процесс синхронизации данных лидера:
Теперь давайте посмотрим, как последователь обрабатывает полученное предложение лидера после его получения:
В настоящее время данные лидера и последователя находятся в структуре данных в памяти и синхронизированы, и клиент может получить согласованные данные от лидера и последователя.
Принцип последовательной согласованности ZAB
Лидер фактически создаёт очередь для каждого последователя при отправке предложения. Предложение помещается в соответствующую очередь. Как показано на рисунке ниже, это процесс широковещательной рассылки сообщений в Zookeeper:
Предложение |
---|
proposal01:zxid1 |
proposal02:zxid2 |
proposal03:zxid3 |
Клиент отправил три запроса на запись, соответствующие предложению:
Лидер получает запросы и помещает их в очередь один за другим. Последователи получают запросы по очереди, обеспечивая тем самым упорядоченность данных.
Является ли Zookeeper строго согласованным?
Официально определено как упорядоченная согласованность.
Не обеспечивает строгой согласованности, почему?
Потому что лидер отправляет сообщения о фиксации всем последователям и наблюдателям после отправки, они не фиксируют данные одновременно. Например, из-за сетевых проблем разные узлы получают сообщения о фиксации позже, поэтому фиксация происходит позже, и данные могут быть несогласованными на короткое время. Однако после короткого периода времени все узлы фиксируют данные, и данные остаются согласованными. Кроме того, Zookeeper поддерживает строгую согласованность, вручную вызывая метод sync для обеспечения фиксации всех узлов.
Здесь возникает вопрос: если какой-либо узел не может зафиксировать данные, будет ли лидер пытаться повторить попытку? Как обеспечить согласованность данных?
Проблема потери данных при отказе лидера
Первый случай: предположим, что лидер уже записал данные на диск, но ещё не отправил предложение последователям, и в этот момент происходит сбой лидера. Тогда необходимо выбрать нового лидера. Новый лидер отправит предложение, и zxid будет иметь следующее правило увеличения:
Когда старый лидер восстанавливается, он становится последователем, и когда новый лидер отправляет последнее предложение, обнаруживается, что zxid предложения на старом лидере меньше, чем у нового лидера, поэтому старое предложение отбрасывается.
Второй случай: если лидер успешно отправил сообщение о фиксации последователям, но некоторые или все последователи ещё не зафиксировали данные, то есть не загрузили данные с диска в память, в это время происходит сбой лидера.
Затем необходимо выбрать последователя с наибольшим zxid из журнала дисков. Если zxid одинаковы, выберите узел с большим идентификатором узла в качестве лидера. Client (клиент): инициатор запроса.
Каждый сервер в процессе работы может находиться в одном из трёх состояний:
В кластере Zookeeper есть три роли:
Модель данных Zookeeper имеет следующие характеристики:
У сервера есть четыре состояния: LOOKING, FOLLOWING, LEADING и OBSERVING.
Существует три режима работы Zookeeper: одиночный, псевдокластерный и кластерный.
Во время инициализации кластера Zookeeper серверы (myid = 1–5) запускаются один за другим, начиная процесс выбора лидера:
В работающем кластере Zookeeper пять серверов (myid = 1–5), и внезапно лидер (сервер 3) выходит из строя. Начинается новый раунд выборов лидера:
— Если эпох и zxid равны, то выбирается сервер с наибольшим значением myid (конфигурация zoo.cfg).
Фаза восстановления: на этом этапе Follower отправляет свой lastZxid лидеру, который на основе lastZxid определяет, как синхронизировать данные. Реализация этого этапа отличается от фазы 2: если Follower получает команду TRUNC, он прекращает обработку предложений после L.lastCommittedZxid; если он получает команду DIFF, то принимает новые предложения.
Широковещательная фаза: в настоящее время отсутствует.
При первоначальном запуске кластера выбирается машина с наименьшим zxid в качестве лидера.
Когда лидер выходит из строя или теряет большинство последователей, ZK переходит в режим восстановления, чтобы выбрать нового лидера и восстановить все серверы до правильного состояния. Алгоритм выбора лидера в ZK использует протокол ZAB:
Анализ: чтобы лидер получил поддержку большинства серверов, общее количество серверов должно быть нечётным числом 2n + 1, и число живых серверов не должно быть меньше n + 1. Поскольку требуется более половины живых серверов для работы, надёжность кластера, предоставляемого 2n машинами, фактически такая же, как и у кластера, предоставленного 2n - 1 машинами.
Шаг 1: установка и развёртывание
# Скачать и распаковать
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz
tar -zxvf zookeeper-3.4.9.tar.gz
# Установить глобальные переменные
vim ~/.bash_profile
# Добавить последнюю строку
export ZOOKEEPER_HOME=/home/zookeeper/zookeeper-3.4.9
export PATH=$ZOOKEEPER_HOME/bin:$PATH
# Сделать изменения активными
source ~/.bash_profile
# Скопировать конфигурационный файл
cp /home/zookeeper/zookeeper-3.4.9/conf/zoo_sample.cfg /home/zookeeper/zookeeper-3.4.9/conf/zoo.cfg
Шаг 2: настройка информации
# Интервал сердцебиения
tickTime=2000
# Каталог сохранения данных
dataDir=/home/zookeeper/zookeeper-3.4.9/dataDir
# Каталог сохранения журналов
dataLogDir=/home/zookeeper/zookeeper-3.4.9/dataLogDir
# Порт подключения клиентов к Zookeeper
clientPort=2181
# Максимальное время соединения для последователей при инициализации соединения (если превышено, соединение не устанавливается)
initLimit=5
# Представляет максимальное время передачи сообщения между лидером и последователем при отправке и получении сообщений
syncLimit=2
Шаг 1: установка и развёртывание
Установка аналогична одиночному режиму.
Шаг 2: настройка информации
Создайте файл myid в корневом каталоге dataDir и запишите значение (например, 1, 2, 3 и т. д.) в файл.
В режиме одиночной конфигурации добавьте следующие параметры:
# 1 представляет текущий идентификатор кластера
echo "1" > myid
Для серверов в кластере настройте следующие параметры:
# Формат: server.X=A:B:C
# X представляет myid (диапазон 1–255)
# A — это IP-адрес сервера
# B — порт, используемый этим сервером для обмена сообщениями с лидером кластера
# C — порт, используемый для выборов лидера
server.1=10.24.1.62:2888:3888
server.2=10.24.1.63:2888:3888
server.3=10.24.1.64:2888:3888
# Запустить ZK-сервер
./zkServer.sh start
# Подключиться к указанному серверу с помощью ZK Client
./zkCli.sh -server 127.0.0.1:2181
# Проверить статус ZK-сервера
./zkServer.sh status
# Остановить ZK-сервис
./zkServer.sh stop
# Перезапустить ZK-сервис
./zkServer.sh restart
# Порт, к которому подключаются клиенты сервера
clientPort=2181
# Каталог, в котором хранятся снимки данных
dataDir=/User/lry/zookeeper/data
# Минимальный временной интервал в ZK
tickTime=5000
# Каталог журналов транзакций
dataLogDir=/User/lry/zookeeper/datalog
# Максимальный размер очереди запросов для каждого сервера
globalOutstandingLimit=1000
# Размер каждого файла журнала транзакций по умолчанию
preAllocSize=64
# Количество файлов журнала транзакций, которые будут созданы перед созданием снимка
snapCount=100000
# Ограничение количества подключений для одного клиента на одном IP-адресе
maxClientCnxns=60
# Для многосетевых машин можно указать разные порты прослушивания для каждого IP
clientPortAddress=10.24.22.56
# Время ожидания сеанса, если клиент устанавливает время ожидания вне этого диапазона, оно будет принудительно установлено на максимальное или минимальное время
minSessionTimeoutmaxSessionTimeout
# Последователь при запуске синхронизирует все последние данные с лидером, а затем определяет своё начальное состояние готовности к обслуживанию внешних запросов. Лидер позволяет F завершить эту работу в течение времени initLimit.
initLimit
# Во время работы лидер отвечает за связь со всеми машинами в кластере, например, через механизм пульса, для определения состояния активности машины. Если L отправляет пульс и не получает ответ от F в течение времени syncLimit, считается, что F не в сети.
syncLimit
# По умолчанию лидер принимает клиентские подключения и предоставляет обычные услуги чтения и записи. Однако, если вы хотите, чтобы лидер сосредоточился на координации кластера, вы можете установить этот параметр на no, что значительно повысит производительность операций записи
leaderServes=yes
# server.[myid]=[hostname]:[порт для синхронизации и других коммуникаций]:[порт для голосования]
server.x=[hostname]:nnnnn[:nnnnn]
# Группировка и настройка веса для машин
group.x=nnnnn[:nnnnn]weight.x=nnnnn
# Время ожидания соединения при открытии во время выборов лидера, по умолчанию 5 секунд
cnxTimeout
# Максимальный объём данных для каждой ноды, по умолчанию 1M
jute.maxbuffer
``` **Режим работы EventLoop**
На рисунке выше представлен универсальный режим работы EventLoop. Когда происходит событие, приложение помещает его в очередь событий, а затем EventLoop опрашивает очередь и выполняет события или распределяет их соответствующим слушателям событий.
Существует несколько способов выполнения событий: немедленное выполнение, отложенное выполнение и периодическое выполнение.
**Принцип NioEventLoop**
В Netty EventLoop можно рассматривать как механизм обработки событий реакторной модели потоков. Каждый поток EventLoop поддерживает селектор выбора и очередь задач taskQueue. Он отвечает за обработку событий ввода-вывода, обычных задач и периодических задач. В Netty рекомендуется использовать NioEventLoop в качестве реализации. Рассмотрим исходный код ключевого метода run() NioEventLoop:
```java
protected void run() {
for (;;) {
try {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
// 轮询 I/O 事件
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
} catch (IOException e) {
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
// 处理 I/O 事件
processSelectedKeys();
} finally {
// 处理所有任务
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
// 处理 I/O событий
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
// Обработка завершена, переходим к обработке асинхронной очереди задач
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
Структура исходного кода ясна: каждый цикл обработки NioEventLoop включает в себя этапы опроса событий select, обработки событий processSelectedKeys и обработки задач runAllTasks. Это типичный механизм работы реактора. Кроме того, Netty предоставляет параметр ioRatio для настройки соотношения времени обработки событий ввода-вывода и времени обработки задач.
Далее мы подробно рассмотрим принципы реализации Netty EventLoop, сосредоточившись на обработке событий и обработке задач.
Механизм обработки событий
С учётом общей архитектуры Netty, давайте посмотрим на диаграмму потока событий EventLoop для лучшего понимания дизайна Netty EventLoop. Механизм обработки событий NioEventLoop использует безблокировочную последовательную конструкцию:
BossEventLoopGroup отвечает за прослушивание клиентских событий Accept. При возникновении события оно регистрируется в WorkerEventLoopGroup. Для каждого нового канала выбирается только один NioEventLoop для привязки. Таким образом, обработка всех событий жизненного цикла канала является независимой от потока, и разные потоки NioEventLoop не пересекаются друг с другом.
ChannelPipeline также является потокобезопасным, и данные передаются первому обработчику ChannelHandler в ChannelPipeline. После обработки данных они передаются следующему обработчику, и весь процесс выполняется последовательно, без переключения контекста потока.
Безблокировочный последовательный дизайн не только максимизирует пропускную способность системы, но и снижает сложность разработки бизнес-логики пользователями, избавляя их от необходимости уделять слишком много внимания проблемам безопасности потоков. Хотя однопоточное выполнение предотвращает переключение контекста, оно может привести к блокировке длительных операций ввода-вывода. Если произойдёт блокировка одного события ввода-вывода, все последующие события ввода-вывода не смогут быть обработаны, что может вызвать накопление событий. Разрабатывая программы с использованием Netty, необходимо учитывать риски, связанные с реализацией ChannelHandler.
Важна надёжность потока NioEventLoop, поскольку его блокировка или зависание могут привести к сбою всей системы. В JDK реализация Epoll имеет уязвимость, которая позволяет пробуждать поток NIO даже при пустом списке событий, вызывая 100% загрузку ЦП. Это известная проблема JDK epoll с пустым опросом. Netty как высокопроизводительная и надёжная сетевая структура должна обеспечивать безопасность потоков ввода-вывода. Как она решает проблему JDK epoll? На самом деле Netty не решает эту проблему коренным образом, а просто обходит её.
Оставляя в стороне другие детали, давайте сосредоточимся на коде в методе select() события опроса, чтобы понять, как Netty решает проблему пустого опроса JDK epoll:
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
Netty предоставляет механизм обнаружения потенциального зависания потока. Реализация этого механизма следующая:
② Методы обратного вызова событий ChannelOutboundHandler и их триггеры
Методы обратного вызова событий в ChannelOutboundHandler чётко определены, и каждый тип операции имеет соответствующий метод обратного вызова, который можно увидеть непосредственно в списке интерфейсов ChannelOutboundHandler. Здесь каждый метод обратного вызова срабатывает перед соответствующей операцией. Мы не будем вдаваться в подробности. Кроме того, большинство интерфейсов в ChannelOutboundHandler содержат параметр ChannelPromise, чтобы можно было своевременно получать уведомления после завершения операции.
Вышеупомянутый ChannelPipeline можно разделить на входящие обработчики ChannelInboundHandler и исходящие обработчики ChannelOutboundHandler. Соответствующие типы событий можно разделить на входящие события и исходящие события.
Входящие события: направление распространения — Head->Tail, то есть распространение происходит в прямом порядке (A→B→C).
Исходящие события: направление распространения — Tail->Head, то есть распространение происходит в обратном порядке (C→B→A).
Пример кода для изучения механизма распространения событий ChannelPipeline:
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new SampleInBoundHandler("SampleInBoundHandlerA", false))
.addLast(new SampleInBoundHandler("SampleInBoundHandlerB", false))
.addLast(new SampleInBoundHandler("SampleInBoundHandlerC", true));
ch.pipeline()
.addLast(new SampleOutBoundHandler("SampleOutBoundHandlerA"))
.addLast(new SampleOutBoundHandler("SampleOutBoundHandlerB"))
.addLast(new SampleOutBoundHandler("SampleOutBoundHandlerC"));
}
}
Результат выполнения:
Реализация распространения событий в ChannelPipeline использует классический режим цепочки ответственности, где вызовы связаны друг с другом. Что произойдёт, если в одном узле возникнет логическая ошибка? ctx.fireExceptionCaugh передаст исключение в порядке от Head к Tail. Если пользователь не обрабатывает исключение, оно будет обработано Tail.

Рекомендуется, чтобы пользователь настраивал обработчик исключений. Пример кода:
public class ExceptionHandler extends ChannelDuplexHandler {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (cause instanceof RuntimeException) {
System.out.println("Handle Business Exception Success.");
}
}
}
После добавления унифицированного обработчика исключений можно видеть, что исключения были изящно перехвачены и обработаны. Это также рекомендуемая практика обработки исключений Netty.

Временное колесо фактически представляет собой разновидность кольцевой структуры данных, которую можно представить как часы, разделённые на множество сегментов, каждый сегмент представляет определённый период времени. Цепочка задач планирования хранится в каждом сегменте, а указатель перемещается по сегментам с течением времени, выполняя все задачи, срок выполнения которых истёк в соответствующем сегменте. Задачи распределяются по сегментам в зависимости от времени по модулю.
В управлении тысячами сетевых подключений каждое подключение имеет задачу тайм-аута, запуск отдельного таймера для каждой задачи может привести к использованию большого количества ресурсов. Для решения этой проблемы можно использовать инструмент Netty HashedWheelTimer.
Инструмент Netty HashedWheelTimer предоставляет приблизительную реализацию таймера, поскольку это временное колесо не выполняет задачи точно в срок, а выполняет их через определённые промежутки времени после каждого интервала времени и выполняет все задачи, которые должны были быть выполнены до этого интервала времени.
Конечно, точность выполнения конкретной задачи таймера можно настроить, отрегулировав размер временного интервала в конструкторе HashedWheelTimer, но в большинстве сетевых приложений из-за существования задержки ввода-вывода не требуется строгое соблюдение точности времени выполнения, поэтому интервал времени по умолчанию 100 мс может удовлетворить большинство ситуаций, и нет необходимости тратить дополнительные усилия на настройку точности времени.
Особенности HashedWheelTimer
— Из анализа исходного кода видно, что фактическая точность времени HashedWheelTimer не очень высока, погрешность может составлять около 100 миллисекунд, и если количество ожидающих задач в очереди задач слишком велико, погрешность может быть больше.
— Однако HashedWheelTimer может обрабатывать большое количество задач таймера, и для определения списка кандидатов на обработку задач требуется только O(1) времени, в то время как для Timer и других требуется корректировка кучи, которая является временной сложностью O(logN).
— HashedWheelTimer по сути имитирует колесо времени, разбивая большое количество задач на небольшие списки задач, эффективно экономя ресурсы ЦП и потоков.
Анализ исходного кода
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit,
int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) {
......
}
— threadFactory
: пользовательская фабрика потоков, используемая для создания объектов потока.
— tickDuration
: сколько времени проходит до перехода к следующему сегменту (эквивалентно перемещению часов на одну позицию).
— unit
: определяет единицу измерения tickDuration.
— ticksPerWheel
: количество сегментов в колесе.
— leakDetection
: следует ли включать обнаружение утечки памяти.
— maxPendingTimeouts
: максимальное количество задач, ожидающих выполнения. 0 или отрицательное число означает отсутствие ограничений.
Преимущества и недостатки
Варианты задач таймера
Вот некоторые из основных вариантов задач таймера:
— Timer. — ScheduledExecutorService. — ThreadPoolTaskScheduler (на основе ScheduledExecutorService). — Netty schedule (использует PriorityQueue). — Netty HashedWheelTimer (временное колесо). — Kafka TimingWheel (иерархическое временное колесо).
Пример использования:
// Создание экземпляра Timer
Timer timer = new
Примечание: в тексте запроса присутствуют фрагменты кода на языке Java, однако в ответе они не представлены. Также в запросе присутствуют ссылки на изображения, которые не были включены в ответ. ```
HashedWheelTimer();
// 提交一个任务,让它在 5s 后执行
Timeout timeout1 = timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) {
System.out.println("5s 后执行该任务");
}
}, 5, TimeUnit.SECONDS);
// 再提交一个任务,让它在 10s 后执行
Timeout timeout2 = timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) {
System.out.println("10s 后执行该任务");
}
}, 10, TimeUnit.SECONDS);
// 取消掉那个 5s 后执行的任务
if (!timeout1.isExpired()) {
timeout1.cancel();
}
// 原来那个 5s 后执行的任务,已经取消了。这里我们反悔了,我们要让这个任务在 3s 后执行
// 我们说过 timeout 持有上、下层的实例,所以下面的 timer 也可以写成 timeout1.timer()
timer.newTimeout(timeout1.task(), 3, TimeUnit.SECONDS);
Без блокировки очереди MPSC queue
FastThreadLocal
ByteBuf
Протокол кодирования
Модуль Netty-codec в основном отвечает за кодирование и декодирование работы, реализуя взаимное преобразование между исходными байтовыми данными и бизнес-объектами. Netty поддерживает большинство основных протоколов отрасли, таких как HTTP, HTTP2, Redis и XML, что экономит разработчикам много усилий. Кроме того, этот модуль предоставляет абстрактные классы кодирования и декодирования ByteToMessageDecoder и MessageToByteEncoder, которые можно легко реализовать с помощью наследования этих двух классов.
Декодирование и кодирование пакетов
Основной процесс
Процесс запуска сервера
Процесс запуска сервера Netty в целом делится на три этапа:
Настройка пула потоков:
EventLoopGroup group = new NioEventLoopGroup(1); ServerBootstrap b = new ServerBootstrap(); b.group(group);
* Многопоточный режим: модель Reactor с одним потоком имеет серьёзные узкие места в производительности, поэтому появилась модель Reactor с несколькими потоками. В Netty использование модели Reactor с несколькими потоками аналогично использованию модели с одним потоком. Разница в том, что NioEventLoopGroup может быть запущен без параметров, и он автоматически запустит количество потоков, равное удвоенному количеству ядер процессора. Конечно, вы также можете настроить фиксированное количество потоков самостоятельно.
```java
EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(group);
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
Инициализация канала:
// Клиентский канал b.channel(NioSocketChannel.class); b.channel(OioSocketChannel.class);
// Серверный канал b.channel(NioServerSocketChannel.class); b.channel(OioServerSocketChannel.class); b.channel(EpollServerSocketChannel.class);
// UDP b.channel(NioDatagramChannel.class); b.channel(OioDatagramChannel.class);
* Зарегистрируйте обработчик канала:
ServerBootstrap требует регистрации обработчика канала через метод childHandler(). ChannelInitializer — это анонимный класс, который реализует интерфейс ChannelHandler и используется в качестве параметра ServerBootstrap.
```java
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline()
// HTTP кодировщик
.addLast("codec", new HttpServerCodec())
// Сжатие HTTP-контента
.addLast("compressor", new HttpContentCompressor())
// Агрегатор HTTP-сообщений
.addLast("aggregator", new HttpObjectAggregator(65536))
// Пользовательская логика обработки
.addLast("handler", new HttpServerHandler());
}
});
Методы option и childOption в ServerBootstrap используются для установки свойств Boss и Worker соответственно.
b.option(ChannelOption.SO_KEEPALIVE, true);
``` Ниже представлен перевод текста на русский язык:
#### **Рисунок показывает операцию get() API. Для операций set(), read() и write() можно обратиться к книге или API.**

### **Дополнительные операции**

Два следующих метода имеют довольно сложную для понимания логику работы, поэтому ниже представлено их объяснение:
* **hasArray()**: если ByteBuf поддерживается массивом байтов, то возвращает true. Другими словами, если ByteBuf находится в режиме кучи, это означает, что его внутренняя память поддерживается массивом байтов.
* **array()**: если ByteBuf поддерживается массивом байтов, возвращает массив, иначе выбрасывает исключение UnsupportedOperationException. То есть, если ByteBuf находится в режиме кучи, он поддерживается массивом.
### **Распределение ByteBuf**
Существует несколько способов создания и управления экземплярами ByteBuf: **последовательное распределение (ByteBufAllocator)**, **Unpooled буфер** и **класс ByteBufUtil**.
#### Последовательное распределение: интерфейс ByteBufAllocator
Netty реализует (ByteBuf) пул через интерфейс ByteBufAllocator. Netty предоставляет пул и непулы ByteBufAllocator:
* `ctx.channel().alloc().buffer()` — по сути, это ByteBufAllocator.DEFAULT.
* `ByteBufAllocator.DEFAULT.buffer()` — возвращает буфер Bytebuf, основанный на куче или прямой памяти. По умолчанию используется куча.
* `ByteBufAllocator.DEFAULT` — существует два типа: UnpooledByteBufAllocator.DEFAULT (непулы) и PooledByteBufAllocator.DEFAULT (пул). Для Java по умолчанию используется PooledByteBufAllocator (пул), а для Android — UnpooledByteBufAllocator (непулы).
* Можно использовать BootStrap для предоставления независимого экземпляра ByteBufAllocator для каждого канала.

Объяснение:
* Метод buffer() на верхнем рисунке возвращает буфер ByteBuf, основанный на куче или прямой памяти, по умолчанию — куча. Исходный код: AbstractByteBufAllocator() { this(false); }
* ByteBufAllocator.DEFAULT — может быть пулом или непулом. По умолчанию это пул (PooledByteBufAllocator.DEFAULT).
#### Unpooled буфер — непулы
Unpooled предоставляет статические вспомогательные методы для создания непулов ByteBuf.

Примечание:
* метод buffer() на верхней картинке возвращает непуловой буфер ByteBuf на основе кучи.
* wrappedBuffer() — создаёт представление и возвращает ByteBuf с упакованными данными. Очень полезно.
Создание ByteBuf кода:
```java
public void createByteBuf(ChannelHandlerContext ctx) {
// 1. Создание ByteBuf через Channel
ByteBuf buf1 = ctx.channel().alloc().buffer();
// 2. Создание через ByteBufAllocator.DEFAULT
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();
// 3. Создание через Unpooled
ByteBuf buf3 = Unpooled.buffer();
}
Класс ByteBufUtil предоставляет статические вспомогательные методы для работы с ByteBuf: hexdump() и equals().
В версии Netty4.0 для ByteBuf и ByteBufHolder была введена технология счётчика ссылок. Следует различать счётчик ссылок и алгоритм достижимости (алгоритм сборки мусора JVM).
public static void releaseReferenceCountedObject(){
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// Увеличиваем счётчик ссылок на 1
buffer.retain();
// Выводим счётчик ссылок
buffer.refCnt();
// Уменьшаем счётчик ссылок на 1
buffer.release();
}
Zero-copy в Netty проявляется в нескольких аспектах:
Метод slice
без параметров эквивалентен вызову buf.slice(buf.readerIndex(), buf.readableBytes())
, который возвращает срез читаемой части buf. В то же время метод slice (int index, int length)
более гибкий. С его помощью можно задать различные параметры для получения срезов из разных областей buf.
Процесс создания header и body с использованием метода slice
не включает операцию копирования. Фактически header и body являются частями byteBuf, которые совместно используют его пространство хранения:
Использование FileRegion для реализации нулевого копирования
В Netty FileRegion используется для реализации нулевой копии при передаче файлов. Однако на нижнем уровне FileRegion зависит от функции нулевого копирования FileChannel.transfer
в Java NIO. После появления FileRegion можно напрямую записывать содержимое файла в Channel, не копируя его сначала во временный буфер, а затем записывая буфер в Channel. Такая операция нулевого копирования несомненно полезна при передаче больших файлов.
Традиционный поток ввода-вывода
Общий поток нулевого копирования
TCP-сцепка и распаковка
Сцепка и распаковка — это проблемы, связанные с передачей данных по протоколу TCP.
Предположим, что клиент отправляет два пакета данных D1 и D2 на сервер. Поскольку сервер может считывать разное количество байтов за один раз, возможны следующие ситуации:
— Сервер считывает два отдельных пакета, D1 и D2, без сцепки и распаковки. — Сервер получает два сцепленных пакета, D1 и D2. — Сервер считывает часть содержимого пакетов D1 и D2 за одно чтение, а оставшуюся часть D2 — за следующее чтение. — Сервер считывает частично пакет D1 за первое чтение и полностью пакет D2 за второе чтение. — Если TCP-кэш ещё меньше, пакеты D1 и D2 будут разделены на несколько частей для отправки.
Основная причина возникновения сцепки и распаковки заключается в том, что операционная система имеет буферный пул при отправке данных TCP. Например, размер пула составляет 1024 байта:
— Если объём отправляемых данных невелик, TCP объединяет несколько запросов в один и отправляет их. Это приводит к сцепке. — Если объём данных большой и превышает размер пула, TCP разделяет данные на несколько пакетов для отправки, что приводит к распаковке.
Решения
Для ситуаций, когда используются пакеты фиксированной длины, можно использовать FixedLengthFrameDecoder
. Этот декодер считывает сообщения фиксированной длины. Если текущее сообщение недостаточно длинное, он ждёт следующего сообщения для завершения. Использование этого декодера довольно просто: нужно только указать длину каждого сообщения в конструкторе.
Для обработки данных с разделителями Netty предоставляет два декодера: LineBasedFrameDecoder
и DelimiterBasedFrameDecoder
. Первый использует символы новой строки (\n
или \r\n
) для обработки данных, а второй — пользовательский разделитель.
Основной принцип решения проблемы сцепки и распаковки — добавление поля длины к данным пакета. Для этого используются LengthFieldBasedFrameDecoder
для декодирования данных и LengthFieldPrepender
для добавления поля длины перед отправкой данных. Обработка запросов данных от клиентов и ответ на них
ch.pipeline().addLast(new EchoServerHandler());
}
#### Создание собственного упаковщика и распаковщика пакетов
Можно реализовать `MessageToByteEncoder` и `ByteToMessageDecoder` для создания собственных упаковщиков и распаковщиков пакетов.
- `MessageToByteEncoder`: отвечает за кодирование ответа в объект ByteBuf.
- `ByteToMessageDecoder`: преобразует полученный объект ByteBuf в нужный формат данных.
## Высокая производительность
### Архитектура ввода-вывода
* **Синхронный неблокирующий ввод-вывод**: использует минимум ресурсов для выполнения максимального количества задач.
* **Нулевое копирование памяти**: минимизирует ненужные копии памяти, обеспечивая более эффективную передачу данных.
* **Пул памяти**: позволяет повторно использовать выделенную память, особенно прямую (heap). Реализуется с помощью бинарного дерева поиска для управления распределением памяти.
* **Последовательная обработка чтения и записи**: избегает использования блокировок, которые могут привести к снижению производительности.
* **Высокопроизводительные протоколы сериализации**: поддерживает протоколы, такие как protobuf, для быстрой сериализации данных.
### Оптимизация операционной системы
#### Настройка параметров файловых дескрипторов
* **Максимальное количество открытых файлов**: можно изменить с помощью команды `cat /proc/sys/fs/file-max`.
* **Количество открытых файлов для одного процесса**: по умолчанию 1024, можно настроить с помощью `ulimit -a`.
#### Оптимизация TCP/IP параметров
Необходимо уделить внимание следующим параметрам:
* net.ipv4.tcp_rmem: размер буфера для каждого соединения.
#### Многосетевые очереди и мягкие прерывания
* **Буферы TCP**: настройка параметров SO_SNDBUF и SO_RCVBUF для оптимального размера буферов отправки и получения.
* **Мягкие прерывания**: использование RPS (Receive Packet Steering) в ядре Linux ≥2.6.35 для улучшения производительности сетевого взаимодействия. RPS распределяет обработку мягких прерываний между несколькими процессорами, основываясь на информации о пакетах.
## Оптимизация производительности Netty
### Настройка количества потоков
**Оптимизация главного пула потоков**:
Для серверов Netty обычно достаточно одного потока для прослушивания порта. Однако при большом количестве устройств, подключающихся одновременно, рекомендуется использовать несколько портов и главный пул потоков (Reactor). Это позволит обрабатывать подключения параллельно, ускоряя процесс подключения и снижая вероятность ошибок.
**Настройка рабочих потоков (I/O)**:
Сначала можно использовать значение по умолчанию, равное количеству ядер процессора * 2. Затем провести тестирование производительности и проанализировать загрузку CPU для рабочих потоков. Если они перегружены, следует увеличить количество потоков.
### Оптимизация сердцебиения
Цель сердцебиения — убедиться в доступности канала связи и способности другой стороны принимать и отправлять сообщения. Для этого необходимо периодически проверять состояние канала.
В случае большого количества подключений важно своевременно обнаруживать неработающие соединения и удалять их из списка. Также важно выбрать оптимальный интервал сердцебиения, чтобы избежать перегрузки системы и частых пауз в работе приложения.
Сердцембиение может быть реализовано на разных уровнях:
* TCP Keep-Alive: работает на уровне TCP.
* Протокольный уровень: используется в протоколах с длительным соединением, таких как MQTT.
* Уровень приложения: реализуется через периодическую отправку сообщений о состоянии.
Существует два типа сердцебиения:
* Ping-Pong: одна сторона отправляет сообщение Ping, а другая отвечает сообщением Pong.
* Ping-Ping: обе стороны отправляют сообщения Ping без ожидания ответа.
Стратегии обнаружения сбоев:
* Истечение времени ожидания: если после нескольких попыток не получено ответное сообщение, считается, что канал не работает.
* Сбой при чтении или отправке: если во время обмена сообщениями возникают ошибки ввода-вывода, это также указывает на сбой канала. **Оптимизация сердцебиения**
- Для серверов с миллионами пользователей обычно не рекомендуется использовать слишком длинные периоды сердцебиения и тайм-ауты.
- Период сердцебиения обычно не должен превышать 60 секунд, а тайм-аут сердцебиения обычно равен удвоенному периоду сердцебиения.
- Рекомендуется использовать IdleStateHandler для реализации сердцебиения, вместо создания собственных задач пула потоков, что может увеличить нагрузку на систему и привести к проблемам с параллельной безопасностью.
- В случае тайм-аута или сбоя сердцебиения необходимо закрыть канал, чтобы клиент мог повторно подключиться.
- После срабатывания события IdleStateEvent пользователь может подписаться на это событие и выполнить собственную логику обработки, такую как закрытие канала, повторное подключение клиента, оповещение и ведение журнала.
**Оптимизация приёма и отправки буферов**
Для длинных каналов каждый канал должен поддерживать свои собственные буферы приёма и отправки. В JDK NIO используется класс ByteBuffer, который представляет собой фиксированный массив байтов, не допускающий динамического расширения.
Пример: если максимальное сообщение составляет 10 КБ, а среднее — 5 КБ, то для обработки 10 КБ сообщений ByteBuffer должен быть установлен на 10 КБ. Это означает, что каждый канал будет занимать дополнительные 5 КБ памяти. Если количество каналов составляет 1 миллион, каждый канал будет иметь свой собственный ByteBuffer для приёма, что приведёт к дополнительным 4882 МБ памяти.
Netty предоставляет ByteBuf, который поддерживает динамическое изменение размера. Он также предоставляет два распределителя буфера приёма:
* FixedRecvByteBufAllocator: Распределитель буфера приёма фиксированного размера, который не изменяет размер в зависимости от размера данных. Однако он может расширяться при необходимости.
* AdaptiveRecvByteBufAllocator: Распределитель буфера приёма с динамическим изменением размера, который адаптируется к размеру данных и может сжимать буфер при необходимости.
Рекомендуется использовать AdaptiveRecvByteBufAllocator, который можно указать при создании клиента или сервера.
Примечание: независимо от того, используется ли буфер приёма или отправки, рекомендуется устанавливать его размер равным среднему размеру сообщения, а не максимальному размеру, чтобы избежать ненужного использования памяти.
**Эффективное использование пулов памяти**
Каждый поток NioEventLoop обрабатывает N каналов. Процесс обработки канала включает в себя начало обработки канала A, создание буфера приёма (создание ByteBuf), декодирование сообщения, преобразование в объект POJO, отправку в фоновый поток в виде задачи, освобождение буфера приёма и переход к обработке канала B.
При использовании пула памяти, когда канал A получает новые данные, он запрашивает свободный ByteBuf из пула памяти NioEventLoop. После декодирования сообщения ByteBuf освобождается обратно в пул памяти. Использование пула памяти позволяет сократить количество запросов и операций GC с 15625 до 0 (предполагается, что всегда есть доступный ByteBuf).
По умолчанию Netty не использует пулы памяти, но их можно настроить при создании клиента или сервера.
Использование пула памяти требует, чтобы выделение и освобождение памяти происходило попарно, иначе это может привести к утечке памяти. Также важно отметить, что после завершения работы с ByteBuf необходимо явно вызвать ReferenceCountUtil.release(msg) для освобождения буфера приёма, иначе он будет считаться используемым, что также может привести к утечкам памяти.
**Предотвращение непреднамеренной блокировки I/O потока**
Обычно не следует выполнять длительные операции, такие как доступ к базе данных или вызов сторонних сервисов, в потоках ввода-вывода Netty. Однако некоторые скрытые блокирующие операции могут быть упущены, например, запись журналов.
В производственной среде обычно требуется регистрировать вызовы интерфейсов в режиме реального времени, а другие журналы имеют уровень ERROR. При возникновении проблем с вводом-выводом в журнале ошибок будет записано сообщение об ошибке. Если диск имеет высокую задержку записи, это может вызвать блокировку операции записи журнала, время которой невозможно предсказать, что, в свою очередь, может заблокировать поток ввода-вывода Netty и помешать обработке других каналов.
Хотя log4j поддерживает асинхронную запись журналов (AsyncAppender), при заполнении очереди журналов она может блокироваться синхронно, пока в очереди не появится свободное место.
Эти проблемы могут быть трудно обнаружить в тестовой среде, поскольку они часто возникают случайно и на короткое время.
**Разделение I/O потоков и бизнес-потоков**
Если сервер выполняет только простые операции с памятью и пересылку сообщений, можно увеличить размер пула рабочих потоков NioEventLoop и выполнять обработку бизнес-каналов непосредственно в потоке ввода-вывода, что позволит избежать переключения контекста и повысить производительность.
Однако если бизнес-логика сложна, рекомендуется разделить потоки ввода-вывода и бизнес-потока. Для потоков ввода-вывода можно создать большой пул потоков NioEventLoopGroup, где все каналы совместно используют один пул. Для бизнес-потоков можно создать несколько небольших пулов потоков, которые можно связать с потоками ввода-вывода. Это уменьшит конкуренцию за блокировки и улучшит производительность обработки.
**Управление количеством одновременных подключений на сервере**
Независимо от оптимизации производительности сервера, необходимо учитывать управление подключениями. Когда ресурсы становятся узким местом или происходит массовое подключение устройств, необходимо обеспечить защиту системы с помощью управления подключениями. Netty в основном фокусируется на управлении количеством одновременных подключений. Баг, также возможно баг бизнес-слоя, требуется конкретный анализ конкретной проблемы.
- **Некорректное закрытие сокета**: например, I/O поток блокируется неожиданно, или доля пользовательских задач в выполнении I/O потока слишком высока, что приводит к несвоевременной обработке I/O операций и невозможности своевременного освобождения канала.
**Решение:**
- **Избегайте обработки бизнес-логики (кроме отправки и приёма сердцебиений) в I/O потоках Netty (worker потоках).**
- **Будьте осторожны при выполнении пользовательских задач на I/O потоке.**
- **Используйте IdleStateHandler, ReadTimeoutHandler и WriteTimeoutHandler с осторожностью.**
# RabbitMQ
## Введение в режимы работы
На официальном сайте RabbitMQ представлены 6 рабочих режимов: простой режим, режим рабочей очереди, режим публикации/подписки, режим маршрутизации, режим темы и режим RPC. В этой статье мы рассмотрим только первые 5 режимов работы.
### Простой режим и режим рабочей очереди
Мы объединили эти два режима, потому что их принцип работы очень прост и состоит из трёх объектов: производителя, очереди и потребителя.
Производитель отвечает за создание сообщений и отправку их в очередь. Потребитель отслеживает очередь и обрабатывает сообщения, когда они появляются.
Когда есть несколько потребителей, они равномерно обрабатывают сообщения из очереди. Пример кода:
Производитель:
```java
//1. Получение соединения
Connection connection = ConnectionUtil.getConnection();
//2. Создание канала
Channel channel = connection.createChannel();
//3. Объявление очереди
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4. Отправка сообщения
channel.basicPublish("", QUEUE_NAME, null, "hello simple".getBytes());
System.out.println("Отправка прошла успешно");
//5. Освобождение соединения
channel.close();
connection.close();
Потребитель:
// 1. Получение соединения
Connection connection = ConnectionUtil.getConnection();
// 2. Создание канала
Channel channel = connection.createChannel();
// 3. Объявление очереди
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4. Мониторинг сообщений
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Получено: " + message);
}
});
Эти три режима используют обменник. Производитель не взаимодействует напрямую с очередью, а отправляет сообщения в обменник, который затем помещает сообщения в соответствующие очереди для обработки потребителями. Существует три основных типа обменников: разветвлённый, прямой и тематический. Принцип работы показан ниже:
Разветвлённый: не обрабатывает ключи маршрутизации. Нужно просто связать очередь с обменником. Сообщение, отправленное в обменник, будет перенаправлено во все связанные очереди. Это похоже на широковещательную рассылку в локальной сети, где каждое устройство в подсети получает копию сообщения. Разветвлённые обменники обеспечивают самую быструю пересылку сообщений.
Режим публикации/подписки использует разветвлённые обменники.
Прямой: обрабатывает ключи маршрутизации. Необходимо связать очередь с обменником и указать, что сообщение должно полностью соответствовать определённому ключу маршрутизации. Если очередь связана с обменником с требованием ключа маршрутизации «dog», то будут перенаправлены только сообщения с меткой «dog». Сообщения dog.puppy и dog.guard не будут перенаправляться, только dog.
Режим маршрутизации использует прямые обменники.
Тематический: сопоставляет ключи маршрутизации с определённым шаблоном. Здесь очередь должна быть связана с обменником по определённому шаблону. Символ «#» соответствует одному или нескольким словам, символ «*» — любому слову. Таким образом, «audit.#» может соответствовать «audit.irs.corporate», но «audit.*» будет соответствовать только «audit.irs».
Режим темы использует тематические обменники. Пример кода:
Производитель:
// 1. Получение соединения
Connection connection = ConnectionUtil.getConnection();
// 2. Создание канала
Channel channel = connection.createChannel();
// 3. Объявление обменника
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 4. Отправка сообщений
for (int i = 0; i < 100; i++) {
channel.basicPublish(EXCHANGE_NAME, "", null, ("hello ps" + i + "").getBytes());
}
System.out.println("Отправка прошла успешно");
// 5. Освобождение соединения
channel.close();
connection.close();
Несколько потребителей:
// 1. Получение соединения
Connection connection = ConnectionUtil.getConnection();
// 2. Создание канала
Channel channel = connection.createChannel();
// 3. Объявление обменника
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 4. Связывание очереди с обменником
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 5. Обработка сообщений
channel.basicQos(1);
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("recv1:" + message);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
Apache Dubbo — это высокопроизводительный, лёгкий и открытый Java-фреймворк. Apache Dubbo предоставляет шесть основных функций: высокопроизводительные RPC-вызовы на основе интерфейсов, интеллектуальное обнаружение ошибок и балансировка нагрузки, автоматическое обнаружение и регистрация сервисов, высокая масштабируемость, управление потоком трафика во время выполнения и визуальное управление и обслуживание сервисов.
Высокопроизводительные RPC-вызовы на основе интерфейса
Обеспечивает высокую производительность на основе прокси-серверов для удалённых вызовов. Сервисы предоставляют интерфейсы, скрывая детали удалённого вызова от разработчиков. ExtensionLoader.getExtensionLoader(type.class).getExtension(name)来获取真正的实例来调用, посмотрите пример на официальном сайте.
public interface WheelMaker {
Wheel makeWheel(URL url);
}
// WheelMaker 接口的 адаптивная реализация класса
public class AdaptiveWheelMaker implements WheelMaker {
public Wheel makeWheel(URL url) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
// 1. Вызов url 的 getXXX метода для получения значения параметра
String wheelMakerName = url.getParameter("Wheel.maker");
if (wheelMakerName == null) {
throw new IllegalArgumentException("wheelMakerName == null");
}
// 2. Вызов ExtensionLoader для получения загрузчика
// 3. Вызов ExtensionLoader для получения расширения на основе имени, полученного из URL
WheelMaker wheelMaker = ExtensionLoader.getExtensionLoader(WheelMaker.class).getExtension(wheelMakerName);
// 4. Вызов конкретного метода реализации класса для выполнения вызова.
return wheelMaker.makeWheel(url);
}
}
Анализ аннотации Adaptive
Из исходного кода аннотации Adaptive можно увидеть, что эта аннотация может быть применена к классу или методу. Логика реализации аннотации Adaptive в классе и методе различна.
Если аннотация Adaptive применяется к классу, Dubbo не будет генерировать прокси-класс для этого класса. В Dubbo только два класса используют аннотацию Adaptive: AdaptiveCompiler и AdaptiveExtensionFactory. Это означает, что логика расширения загружается вручную, но это не является основным фокусом внимания.
Когда аннотация Adaptive используется в методе, Dubbo автоматически генерирует логику прокси для этого метода. Это означает, что логика загрузки расширения должна быть сгенерирована автоматически фреймворком. Реализация механизма следующая:
@SPI("apple")
public interface FruitGranter {
Fruit grant();
@Adaptive
String watering(URL url);
}
---
// Садовод яблок
public class AppleGranter implements FruitGranter {
@Override
public Fruit grant() {
return new Apple();
}
@Override
public String watering(URL url) {
System.out.println("watering apple");
return "watering finished";
}
}
---
// Банан садовод
public class BananaGranter implements FruitGranter {
@Override
public Fruit grant() {
return new Banana();
}
@Override
public String watering(URL url) {
System.out.println("watering banana");
return "watering success";
}
}
Реализация вызова:
public class ExtensionLoaderTest {
@Test
public void testGetExtensionLoader() {
// Сначала создаём фиктивный объект URL
URL url = URL.valueOf("dubbo://192.168.0.1:1412?fruit.granter=apple");
// Получаем объект FruitGranter через ExtensionLoader
FruitGranter granter = ExtensionLoader.getExtensionLoader(FruitGranter.class)
.getAdaptiveExtension();
// Используем этот объект FruitGranter для вызова его метода с адаптивной аннотацией и получаем результат вызова
String result = granter.watering(url);
System.out.println(result);
}
}
Этот метод генерирует внутренний класс.
Фреймворк Dubbo использует URL в качестве общей модели. Во время работы системы все данные о состоянии могут быть получены через URL. Например, информация о том, какие методы сериализации используются, какой протокол связи используется и как распределяется нагрузка, представлена в виде параметров URL. Поэтому во время работы фреймворка, когда требуется определённая информация, её можно получить из соответствующего ключа в списке параметров URL. Формат URL следующий:
protocol://username:password@host:port/path?k=v
Процесс раскрытия сервиса можно разделить на три части:
Раскрытие сервиса с точки зрения построения объектов можно разделить на две стадии:
В Dubbo инстанс, выполняющий функцию, называется invoker. И провайдер, и потребитель должны взаимодействовать с invoker. Через прокси-класс, созданный с использованием протокола, invoker упаковывается. Существует два способа внедрения сервиса:
Существует три способа использования сервиса:
Затем строится прокси, инкапсулируется возвращённая ссылка на сервис invoker, после этого вызывается этот класс прокси.
Способ вызова:
Перед вызовом вам, возможно, потребуется учесть следующее:
Последовательность вызовов:
Способы вызова:
Dubbo вводит модули Cluster, Directory, Router, LoadBalance, Invoker для обеспечения надёжности системы Dubbo. Отношения показаны на рисунке ниже:
Сервис обнаружения регистрирует несколько удалённых вызовов в Directory, а затем инкапсулирует их в invoker через Cluster. invoker обеспечивает отказоустойчивость.
При использовании потребителем он выбирает доступный invoker из Directory и инициирует вызов.
Вы можете думать о Cluster в Dubbo как о большой упаковке, которая включает в себя различные функции отказоустойчивости.
Отказоустойчивость кластера реализуется на стороне потребителя через подкласс Cluster. Существует 10 реализаций Cluster, каждый класс Cluster создаёт объект соответствующего ClusterInvoker. Основная идея заключается в том, чтобы позволить пользователю выборочно вызывать промежуточный уровень Cluster, скрывая конкретные детали реализации.
Кластер | Cluster Invoker | Функция |
---|---|---|
FailoverCluster | FailoverClusterInvoker | Автоматическое переключение при сбое, по умолчанию |
FailfastCluster | FailfastClusterInvoker | Один вызов, сбой вызывает исключение |
FailsafeCluster | FailsafeClusterInvoker | Запись журнала ошибок при возникновении ошибки |
FailbackCluster | FailbackClusterInvoker | Сбой возвращает null, повторная попытка 2 раза |
ForkingCluster | ForkingClusterInvoker | Одновременный вызов задачи, успех одного — успех |
BroadcastCluster | BroadcastClusterInvoker | Вызов каждого invoker, все доступны |
AvailableCluster | AvailableClusterInvoker | Используется доступный |
MergeableCluster | MergeableClusterInvoker | Объединяет результаты |
В Dubbo обычно существует четыре стратегии балансировки нагрузки.
Каталог услуг можно понимать как набор invokers с одинаковыми услугами. Ядром является класс RegistryDirectory. Он имеет три функции:
Маршрутизация услуг фактически является правилом маршрутизации, которое определяет, какие поставщики услуг могут быть вызваны потребителями услуг. Условие правила маршрутизации состоит из двух условий, которые используются для сопоставления потребителей услуг и поставщиков услуг соответственно. Например, существует такое правило:
host = 10.20.153.14 => host = 10.20.153.12
Это правило означает, что потребители услуг с IP-адресом 10.20.153.14 могут вызывать только поставщиков услуг с IP-адресом 10.20.153.12 и не могут вызывать других поставщиков услуг. Формат правил маршрутизации условий следующий:
[Условие соответствия потребителя услуг] => [Условие соответствия поставщика услуг]
Если условие соответствия потребителя услуг пусто, это означает, что ограничение на потребителя услуг не применяется. Если условие соответствия поставщика услуг пусто, это означает, что некоторые потребители услуг отключены. Зависимость:
<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>nacos-config-spring-boot-starter</artifactId>
<version>${latest.version}</version>
</dependency>
Примечание: версия 0.2.x.RELEASE соответствует Spring Boot версии 2.x, а версия 0.1.x.RELEASE — Spring Boot версии 1.x.
nacos.config.server-addr=127.0.0.1:8848
@SpringBootApplication
@NacosPropertySource(dataId = "example", autoRefreshed = true)
public class NacosConfigApplication {
public static void main(String[] args) {
SpringApplication.run(NacosConfigApplication.class, args);
}
}
@Controller
@RequestMapping("config")
public class ConfigController {
@NacosValue(value = "${useLocalCache:false}", autoRefreshed = true)
private boolean useLocalCache;
@RequestMapping(value = "/get", method = GET)
@ResponseBody
public boolean get() {
return useLocalCache;
}
}
Пятый шаг: запустите NacosConfigApplication, вызовите curl http://localhost:8080/config/get и получите ответ false.
Шестой шаг: опубликуйте конфигурацию на сервере Nacos с помощью вызова Nacos Open API. Установите dataId как «example», а содержимое как «useLocalCache=true».
curl -X POST "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=example&group=DEFAULT_GROUP&content=useLocalCache=true"
В этом разделе показано, как запустить службу обнаружения Nacos в вашем проекте Spring Boot.
<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>nacos-discovery-spring-boot-starter</artifactId>
<version>${latest.version}</version>
</dependency>
Примечание: версия 0.2.x.RELEASE соответствует Spring Boot версии 2.x, а версия 0.1.x.RELEASE — Spring Boot версии 1.x.
nacos.discovery.server-addr=127.0.0.1:8848
@Controller
@RequestMapping("discovery")
public class DiscoveryController {
@NacosInjected
private NamingService namingService;
@RequestMapping(value = "/get", method = GET)
@ResponseBody
public List<Instance> get(@RequestParam String serviceName) throws NacosException {
return namingService.getAllInstances(serviceName);
}
}
@SpringBootApplication
public class NacosDiscoveryApplication {
public static void main(String[] args) {
SpringApplication.run(NacosDiscoveryApplication.class, args);
}}
Четвертый шаг: запустите приложение NacosDiscoveryApplication и вызовите curl http://localhost:8080/discovery/get?serviceName=example. Ответ будет пустым массивом JSON [].
Пятый шаг: зарегистрируйте сервис с именем example на сервере Nacos, используя вызов Nacos Open API.
curl -X PUT 'http://127.0.0.1:8848/nacos/v1/ns/instance?serviceName=example&ip=127.0.0.1&port=8080'
[
{
"instanceId": "127.0.0.1-8080-DEFAULT-example",
"ip": "127.0.0.1",
"port": 8080,
"weight": 1.0,
"healthy": true,
"cluster": {
"serviceName": null,
"name": "",
"healthChecker": {
"type": "TCP"
},
"defaultPort": 80,
"defaultCheckPort": 80,
"useIPPort4Check": true,
"metadata": {}
},
"service": null,
"metadata": {}
}
]
Этот раздел описывает, как настроить управление конфигурацией и обнаружение служб с использованием Nacos и Spring Cloud.
После запуска сервера Nacos вы можете использовать следующие примеры кода для настройки управления конфигурацией и обнаружения служб в ваших приложениях Spring Cloud.
Управление конфигурацией:
<dependency>
<groupId>com.alibaba.cloud</groupId>
Используйте эту зависимость для интеграции Nacos в ваше приложение Spring Cloud.
nacos.config.server-addr=127.0.0.1:8848
Это позволит вашему приложению получать конфигурации от сервера Nacos.
@SpringBootApplication
@EnableConfigServer
public class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
}
После этого ваше приложение будет готово к получению конфигураций от сервера Nacos.
Обнаружение служб:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
Эта зависимость позволяет вашему приложению обнаруживать другие сервисы в вашей среде.
eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/
Здесь мы указываем адрес сервера обнаружения (в данном случае это локальный сервер Eureka).
@SpringBootApplication
@EnableEurekaClient
public class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
}
С помощью этой аннотации ваше приложение будет регистрироваться на сервере обнаружения и получать информацию о других сервисах. artifactIdspring-cloud-starter-alibaba-nacos-config version${latest.version} dependency
*注意*: версия [2.1.x.RELEASE](https://mvnrepository.com/artifact/com.alibaba.cloud/spring-cloud-starter-alibaba-nacos-config) соответствует Spring Boot 2.1.x версии. Версия [2.0.x.RELEASE](https://mvnrepository.com/artifact/com.alibaba.cloud/spring-cloud-starter-alibaba-nacos-config) соответствует Spring Boot 2.0.x версии, версия [1.5.x.RELEASE](https://mvnrepository.com/artifact/com.alibaba.cloud/spring-cloud-starter-alibaba-nacos-config) — Spring Boot 1.5.x версии.
**Второй шаг**: в *bootstrap.properties* настроить адрес сервера Nacos и имя приложения:
```properties
spring.cloud.nacos.config.server-addr=127.0.0.1:8848
spring.application.name=example
Объяснение: причина настройки spring.application.name заключается в том, что это часть поля dataId управления конфигурацией Nacos.
Третий шаг: в Nacos Spring Cloud поле dataId имеет следующий полный формат:
${prefix}-${spring.profiles.active}.${file-extension}
— prefix по умолчанию равен значению spring.application.name, но его также можно настроить с помощью параметра конфигурации spring.cloud.nacos.config.prefix.
— spring.profiles.active — это профиль текущей среды, подробности см. в документации Spring Boot. Обратите внимание: если spring.profiles.active пуст, соответствующий соединительный символ - также не существует, формат соединения dataId становится ${prefix}.${file-extension}.
— file-exetension — формат данных содержимого конфигурации, который можно настроить с помощью параметра конфигурации spring.cloud.nacos.config.file-extension. В настоящее время поддерживаются только типы properties и yaml.
Четвёртый шаг: реализовать автоматическое обновление конфигурации с помощью аннотации Spring Cloud @RefreshScope:
@RestController
@RequestMapping("/config")
@RefreshScope
public class ConfigController {
@Value("${useLocalCache:false}")
private boolean useLocalCache;
@RequestMapping("/get")
public boolean get() {
return useLocalCache;
}
}
Пятый шаг: сначала опубликовать конфигурацию на сервере Nacos через Open API Nacos: dataId равен example.properties, содержимое — useLocalCache=true:
curl -X POST "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=example.properties&group=DEFAULT_GROUP&content=useLocalCache=true"
Шестой шаг: запустить NacosConfigApplication, вызвать curl http://localhost:8080/config/get, ответ будет true.
Седьмой шаг: снова опубликовать конфигурацию через Open API Nacos на сервере Nacos: dataId равен example.properties, содержимое — useLocalCache=false:
curl -X POST "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=example.properties&group=DEFAULT_GROUP&content=useLocalCache=false"
Восьмой шаг: ещё раз посетите http://localhost:8080/config/get, на этот раз ответ будет false, что означает, что значение useLocalCache в программе было динамически обновлено.
В этом разделе показано, как включить функцию обнаружения служб Nacos в вашем проекте Spring Cloud, реализовав простой echo service, как показано на рисунке ниже:
Первый шаг: добавить зависимость:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>${latest.version}</version>
</dependency>
Обратите внимание: версия 2.1.x.RELEASE соответствует Spring Boot 2.1.x версии. Версия 2.0.x.RELEASE — Spring Boot 2.0.x версии, версия 1.5.x.RELEASE — Spring Boot 1.5.x версии.
Второй шаг: настроить поставщика услуг, чтобы поставщик услуг мог зарегистрировать свои услуги на сервере Nacos с помощью функции регистрации и обнаружения сервисов Nacos.
i. В application.properties настройте адрес сервера Nacos:
server.port=8070
spring.application.name=service-provider
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
ii. Используйте аннотацию Spring Cloud @EnableDiscoveryClient для включения функции регистрации и обнаружения служб:
@SpringBootApplication
@EnableDiscoveryClient
public class NacosProviderApplication {
public static void main(String[] args) {
SpringApplication.run(NacosProviderApplication.class, args);
}
@RestController
class EchoController {
@RequestMapping(value = "/echo/{string}", method = RequestMethod.GET)
public String echo(@PathVariable String string) {
return "Hello Nacos Discovery " + string;
}
}
} **Третий шаг**: настройка потребителя услуг, чтобы он мог получать нужные ему услуги через функцию регистрации и обнаружения сервисов Nacos с сервера Nacos.
i. В `application.properties` настройте адрес сервера Nacos:
```properties
server.port=8080
spring.application.name=service-consumer
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
ii. С помощью Spring Cloud включите функцию обнаружения сервисов, используя аннотацию @EnableDiscoveryClient
. Добавьте аннотацию @LoadBalanced
к экземпляру RestTemplate, чтобы включить интеграцию с @LoadBalanced
и Ribbon:
@SpringBootApplication
@EnableDiscoveryClient
public class NacosConsumerApplication {
@LoadBalanced
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
public static void main(String[] args) {
SpringApplication.run(NacosConsumerApplication.class, args);
}
@RestController
public class TestController {
private final RestTemplate restTemplate;
@Autowired
public TestController(RestTemplate restTemplate) {this.restTemplate = restTemplate;}
@RequestMapping(value = "/echo/{str}", method = RequestMethod.GET)
public String echo(@PathVariable String str) {
return restTemplate.getForObject("http://service-provider/echo/" + str, String.class);
}
}
}
Четвёртый шаг: запустите ProviderApplication
и ConsumerApplication
, вызовите http://localhost:8080/echo/2018
, и получите ответ «Hello Nacos Discovery 2018».
Sentinel — это система защиты стабильности сервисов в распределённых системах, которая фокусируется на трафике. Sentinel защищает стабильность сервисов с разных точек зрения, таких как управление трафиком, аварийное отключение, снижение нагрузки и защита от перегрузки системы. Sentinel обладает следующими характеристиками:
Основные характеристики Sentinel:
Экосистема Sentinel с открытым исходным кодом:
Sentinel состоит из двух частей:
Пример ниже показывает, как можно интегрировать Sentinel за три шага. Кроме того, Sentinel также предоставляет консоль управления, где вы можете отслеживать ресурсы и управлять правилами в режиме реального времени.
ШАГ 1. Добавление Jar-файла Sentinel в приложение
Если вы используете проект Maven, добавьте следующий код в файл pom.xml
:
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.8.1</version>
</dependency>
Обратите внимание: Sentinel поддерживает только JDK 1.8 или более поздние версии. Если вы не используете инструмент управления зависимостями, загрузите JAR-файл непосредственно из Maven Center Repository.
ШАГУ 2. Определение ресурсов
Далее мы используем API Sentinel SphU.entry("HelloWorld")
и entry.exit()
для окружения кода, который необходимо контролировать. В следующем примере мы используем System.out.println("hello world");
в качестве ресурса и оборачиваем его в API (SphU.entry()
и entry.exit()
).
public static void main(String[] args) {
initFlowRules();
while (true) {
Entry entry = null;
try {
entry = SphU.entry("HelloWorld");
/*Ваш бизнес-код - начало*/
System.out.println("hello world");
/*Ваш бизнес-код - конец*/
} catch (BlockException e1) {
/*Логика управления потоком - начало*/
System.out.println("block!");
/*Логика управления потоком - конец*/
} finally {
if (entry != null) {
entry.exit();
}
}
}
}
После выполнения этих двух шагов модификация кода завершена. Конечно, мы также предоставляем модуль поддержки аннотаций, который позволяет определять ресурсы с низкой степенью вмешательства.
ШАГУ 3. Определение правил
Затем мы определяем правила, которые указывают, сколько запросов разрешено для каждого ресурса. Например, следующее правило позволяет ресурсу HelloWorld
обрабатывать не более 20 запросов в секунду.
private static void initFlowRules(){
List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule();
rule.setResource("HelloWorld");
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
// Set limit QPS to 20.
rule.setCount(20);
rules.add(rule);
FlowRuleManager.loadRules(rules);
}
Выполнив эти три шага, вы сможете использовать Sentinel. Для получения дополнительной информации см. документацию. STEP 4. Проверка эффекта
После запуска Demo мы можем увидеть в журнале ~/logs/csp/${appName}-metrics.log.xxx следующий вывод:
|--timestamp-|------date time----|-resource-|p |block|s |e|rt
1529998904000|2018-06-26 15:41:44|HelloWorld|20|0 |20|0|0
1529998905000|2018-06-26 15:41:45|HelloWorld|20|5579 |20|0|728
1529998906000|2018-06-26 15:41:46|HelloWorld|20|15698|20|0|0
1529998907000|2018-06-26 15:41:47|HelloWorld|20|19262|20|0|0
1529998908000|2018-06-26 15:41:48|HelloWorld|20|19502|20|0|0
1529998909000|2018-06-26 15:41:49|HelloWorld|20|18386|20|0|0
Здесь p обозначает количество запросов, успешно прошедших через систему, block — количество заблокированных запросов, s — количество успешных выполнений, e — пользовательские исключения, rt — среднее время отклика. Мы видим, что программа стабильно выводит «hello world» 20 раз в секунду, что соответствует заданному порогу.
STEP 5. Запуск Sentinel Control Panel
Для запуска контрольной панели Sentinel можно обратиться к документации Sentinel (https://github.com/alibaba/Sentinel/wiki/控制台). Панель позволяет отслеживать состояние ресурсов в реальном времени и изменять правила ограничения потока.
InfluxDB — это распределённая база данных временных рядов с открытым исходным кодом, написанная на языке Go. Она не требует внешних зависимостей и занимает первое место в категории баз данных временных рядов по версии DB-Engines.
InfluxDB использует систему хранения TSM (Time-Structured Merge Tree) собственной разработки. TSM основана на LSM (Log-Structured Merge-Tree), но оптимизирована для удовлетворения требований к базам данных временных рядов.
LSM (Log-Structured Merge-Tree) — это принцип работы LevelDB, одного из самых популярных хранилищ ключей и значений. Однако у него есть некоторые ограничения, которые затрудняют его использование в качестве системы хранения для баз данных временных рядов:
TSM разделяет данные на сегменты (Shards) в зависимости от временного диапазона. Поскольку данные временных рядов обычно добавляются последовательно, сегменты также растут линейно. Новые данные обычно записываются в последний сегмент, а не распределяются между несколькими сегментами.
Преимущества сегментирования включают лёгкое физическое удаление данных (достаточно удалить весь сегмент) и возможность масштабирования путём добавления новых сегментов.
В ранних версиях InfluxDB каждый сегмент представлял собой отдельную базу данных LevelDB. Это приводило к проблемам с управлением большим количеством файлов, так как LevelDB использует многоуровневую систему сжатия.
Чтобы решить эту проблему, InfluxDB перешёл на BoltDB, который использует B+ деревья для хранения данных. Однако B+ деревья могут вызывать большое количество случайных операций записи, что снижает производительность.
Наконец, InfluxDB разработал собственную систему хранения TSM, основанную на принципах LSM, но адаптированную под требования баз данных временных рядов. TSM включает в себя такие компоненты, как кэш, WAL (журнал упреждающей записи) и файлы данных, а также операции, такие как очистка и уплотнение данных.
Основные особенности Spring Boot:
Spring — это фреймворк, который упрощает разработку корпоративных приложений на Java. Он предоставляет набор инструментов и библиотек для создания, настройки и управления приложениями.
На диаграмме показаны основные компоненты Spring:
Spring включает в себя следующие модули:
Диаграмма показывает основные пакеты Spring:
На диаграмме представлены основные аннотации Spring:
IoC (Inversion of Control) — это принцип проектирования, при котором управление созданием и зависимостью объектов осуществляется контейнером IoC, а не самим кодом приложения. Это позволяет отделить код приложения от деталей реализации и упростить тестирование.
В Spring IoC реализуется через контейнер IoC, который управляет жизненным циклом компонентов и их зависимостями. Контейнер создаёт экземпляры компонентов, внедряет зависимости и управляет их состоянием.
Существует три способа внедрения зависимостей в Spring:
AOP (Aspect-Oriented Programming) — это парадигма программирования, которая позволяет разделять сквозную логику (аспекты) от бизнес-логики. В Spring AOP реализуется через прокси-объекты, которые перехватывают вызовы методов и добавляют сквозную логику.
Существуют два типа прокси в Spring AOP:
Прокси-объекты могут быть созданы с использованием следующих технологий:
Сквозная логика, которую можно реализовать с помощью AOP, включает:
Фильтры в Spring используются для предварительной обработки запросов перед их передачей контроллеру. Они могут выполнять различные задачи, такие как проверка прав доступа, кодирование/декодирование данных, обработка исключений и т. д.
Фильтры могут быть реализованы с использованием интерфейса Filter или HandlerInterceptor.
Интерфейс Filter предоставляет методы doFilter() для обработки запроса и init() и destroy() для инициализации и очистки фильтра.
HandlerInterceptor предоставляет методы preHandle(), postHandle() и afterCompletion() для перехвата запросов и ответов.
Перехватчики в Spring позволяют добавлять логику до и после выполнения метода контроллера. Они реализуются с использованием интерфейса HandlerInterceptor или AspectJ.
С помощью перехватчиков можно выполнять такие задачи, как авторизация, логирование, кеширование и т. п. MethodInterceptor — это перехватчик в AOP-проекте, который перехватывает цель в виде метода, даже если это не метод контроллера.
Существует два основных способа реализации перехватчика MethodInterceptor:
Front Controller DispatcherServlet: получает запросы и отвечает на них, действует как маршрутизатор и снижает степень сцепления между компонентами. HandlerMapping: определяет обработчик на основе URL запроса. HandlerAdapter: отвечает за выполнение обработчиков. Обработчик Handler: обработчики, которые должны быть разработаны программистом. ViewResolver: разрешает представление на основе логики имени представления и преобразует ModelAndView в реальное представление (View). Представление View: интерфейс представления, поддерживающий различные типы представлений, такие как jsp, freemarker, pdf и другие.
Последовательность выполнения обычного HTTP-запроса пользователя:
Запрос → Контроллер → Сервис → DAO → База данных
После добавления фильтров и перехватчиков последовательность выполнения становится следующей:
Запрос → Фильтр → Контроллер → Перехватчик → Сервис → DAO → База данных
Пользовательские расширения могут быть добавлены до или после инициализации всего Spring контейнера. Существует три способа расширения:
Расширение выполняется после чтения beanDefinition из проекта и предоставляет дополнительную точку расширения. Здесь можно динамически регистрировать собственные beanDefinitions или загружать beanDefinitions из classpath. BeanFactoryPostProcessor
Является расширением интерфейса beanFactory. Вызывается после того, как Spring считывает информацию о beanDefinition, но перед тем, как происходит создание экземпляра bean. В этот момент пользователь может обработать некоторые вещи через реализацию этого расширения, например, изменить метаинформацию уже зарегистрированного beanDefinition.
public class TestBeanFactoryPostProcessor implements BeanFactoryPostProcessor {
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
System.out.println("[BeanFactoryPostProcessor]");
}
}
InstantiationAwareBeanPostProcessor
Этот интерфейс наследует от BeanPostProcess. Отличие состоит в следующем:
Основные точки расширения включают пять методов, которые применяются на двух основных этапах жизненного цикла bean: создание экземпляра и инициализация. Методы вызываются в следующем порядке:
postProcessBeforeInstantiation
: перед созданием экземпляра bean, аналогично new для этого bean.postProcessAfterInstantiation
: после создания экземпляра bean, аналогично новому для этого bean.postProcessPropertyValues
: когда bean полностью создан и происходит внедрение свойств с использованием аннотаций @Autowired
и @Resource
.postProcessBeforeInitialization
: перед инициализацией bean, аналогично внедрению bean в контекст Spring.postProcessAfterInitialization
: после инициализации bean, аналогично завершению внедрения bean в контекст Spring.Применение: Этот расширенный функционал полезен в различных сценариях, включая написание промежуточного программного обеспечения и бизнес-логику. Он позволяет выполнять сбор информации о bean, реализующих определённый интерфейс, или устанавливать значения для определённых типов bean.
public class TestInstantiationAwareBeanPostProcessor implements InstantiationAwareBeanPostProcessor {
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
System.out.println("[TestInstantiationAwareBeanPostProcessor] before initialization " + beanName);
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
System.out.println("[TestInstantationAwareBeanPostProcessor] after initialization " + beanName);
return bean;
}
@Override
public Object postProcessBeforeInstantiation(Class<?> beanClass, String beanName) throws BeansException {
System.out.println("[TestInstantiationAwareBeanPostProcessor] before instantiation " + beanName);
return null;
}
@Override
public boolean postProcessAfterInstantiation(Object bean, String beanName) throws BeansException {
System.out.println("[TestInstantiationAwareBeanPostProcessor] after instantiation " + beanName);
return true;
}
@Override
public PropertyValues postProcessPropertyValues(PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeansException {
System.out.println("[TestInstantiationAwareBeanPostProcessor] postProcessPropertyValues " + beanName);
return pvs;
}
}
SmartInstantiationAwareBeanPostProcessor
Интерфейс имеет три точки расширения:
predictBeanType
: эта точка расширения происходит до postProcessBeforeInstantiation
. Метод используется для прогнозирования типа Bean и возвращает первый успешно предсказанный класс. Если предсказание невозможно, возвращается null.determineCandidateConstructors
: эта точка расширения следует после postProcessBeforeInstantiation
и используется для определения конструкторов, используемых для создания экземпляра данного bean. Пользователь может расширить эту точку для выбора соответствующего конструктора.getEarlyBeanReference
: эта точка расширения возникает после postProcessAfterInstantiation
, особенно в сценариях с циклическими зависимостями. Когда экземпляр bean готов, чтобы предотвратить циклические зависимости, происходит предварительное раскрытие метода обратного вызова. Этот метод вызывается в методе обратного вызова, который был предварительно раскрыт. ```
beanName) throws BeansException {
### BeanFactoryAware
Этот класс имеет только одну точку расширения, которая происходит после создания экземпляра компонента, но перед внедрением свойств, то есть до вызова сеттеров. Метод точки расширения этого класса — `setBeanFactory`, который позволяет получить атрибут `BeanFactory`.
Сценарий использования: можно получить `BeanFactory` после создания компонента, но до его инициализации, чтобы выполнить специализированную настройку для каждого компонента или сохранить `BeanFactory` в кэше для последующего использования.
```java
public class TestBeanFactoryAware implements BeanFactoryAware {
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
System.out.println("[TestBeanFactoryAware] " + beanFactory.getBean(TestBeanFactoryAware.class).getClass().getSimpleName());
}
}
Сам по себе этот класс не имеет точек расширения, но внутри него есть шесть точек расширения, которые могут быть реализованы и срабатывают после создания экземпляров компонентов, но до их инициализации. Этот класс используется для выполнения различных драйверов интерфейса, и после создания и заполнения свойств компонентов через выполнение расширенных интерфейсов, указанных в красном поле, можно получить соответствующие переменные контейнера. Поэтому здесь следует сказать, что существует шесть точек расширения:
EnvironmentAware
: используется для получения расширения класса EnviromentAware
, этот атрибут очень полезен и может получать все параметры системы. Конечно, я считаю, что расширение Aware
не обязательно, потому что Spring может напрямую внедрить его через аннотацию.EmbeddedValueResolverAware
: используется для получения расширения класса StringValueResolver
, которое используется для получения переменных на основе типа String
, обычно мы используем @Value
для получения, если вы реализуете этот интерфейс Aware
, сохраните StringValueResolver
в кэш и получите объект String
через этот класс, эффект будет таким же.ResourceLoaderAware
: используется для получения расширения класса ResourceLoader
, который может использоваться для получения ресурсов в пути к классам, вы можете расширить этот класс для получения объекта ResourceLoader
.ApplicationEventPublisherAware
: используется для получения расширения класса ApplicationEventPublisher
, который можно использовать для публикации событий и совместного использования с ApplicationListener
. Этот объект также можно внедрить через Spring.MessageSourceAware
: используется для получения расширения класса MessageSource
, который в основном используется для интернационализации.ApplicationContextAware
: используется для получения расширения класса ApplicationContext
, который должен быть хорошо знаком многим людям. Это диспетчер контекста Spring, который может вручную получать любые компоненты, зарегистрированные в контексте Spring. Мы часто расширяем этот интерфейс для кэширования Spring Context и инкапсуляции его в статический метод. В то же время ApplicationContext
также реализует интерфейсы BeanFactory
, MessageSource
и ApplicationEventPublisher
, которые также могут использоваться для связанных операций.class ApplicationContextAwareProcessor implements BeanPostProcessor {
private void invokeAwareInterfaces(Object bean) {
if (bean instanceof EnvironmentAware) {
((EnvironmentAware) bean).setEnvironment(this.applicationContext.getEnvironment());
}
if (bean instanceof EmbeddedValueResolverAware) {
((EmbeddedValueResolverAware) bean).setEmbeddedValueResolver(this.embeddedValueResolver);
}
if (bean instanceof ResourceLoaderAware) {
((ResourceLoaderAware) bean).setResourceLoader(this.applicationContext);
}
if (bean instanceof ApplicationEventPublisherAware) {
((ApplicationEventPublisherAware) bean).setApplicationEventPublisher(this.applicationContext);
}
if (bean instanceof MessageSourceAware) {
((MessageSourceAware) bean).setMessageSource(this.applicationContext);
}
if (bean instanceof ApplicationContextAware) {
((ApplicationContextAware) bean).setApplicationContext(this.applicationContext);
}
}
}
Это ещё один вид расширения Aware
. Точка расширения находится перед инициализацией компонента, то есть перед postProcessBeforeInitialization
, и метод точки расширения этого класса только один: setBeanName
.
Сценарии использования: пользователи могут расширять эту точку, получать beanName
из контейнера Spring перед инициализацией компонента и самостоятельно изменять значение beanName
.
public class NormalBeanA implements BeanNameAware{
public NormalBeanA() {
System.out.println("NormalBean constructor");
}
@Override
public void setBeanName(String name) {
System.out.println("[BeanNameAware] " + name);
}
}
Это не точка расширения, а фактически аннотация. Его функция заключается в том, что если метод аннотирован @PostConstruct
, он будет вызываться на этапе инициализации компонента. Здесь важно обратить внимание на стандартную точку срабатывания, которая находится после postProcessBeforeInitialization
и перед InitializingBean.afterPropertiesSet
. Сценарии использования: пользователь может аннотировать определённый метод для инициализации определённого свойства.
public class NormalBeanA {
public NormalBeanA() {
System.out.println("NormalBean constructor");
}
@PostConstruct
public void init(){
System.out.println("[PostConstruct] NormalBeanA");
}
}
В этом классе есть два метода, которые можно использовать для настройки инициализации компонентов: afterPropertiesSet
и destroy
. Первый метод вызывается после внедрения всех свойств компонента, а второй — при уничтожении компонента. Фабрика бинов в Mybatis
Динамический прокси интерфейса Mapper в MyBatis также внедряется через фабрику бинов Spring.
public class MapperFactoryBean<T> extends SqlSessionDaoSupport implements FactoryBean<T> {
// Тип интерфейса mapper
private Class<T> mapperInterface;
@Override
public T getObject() throws Exception {
// Получаем динамический прокси объекта интерфейса через SqlSession
return getSqlSession().getMapper(this.mapperInterface);
}
@Override
public Class<T> getObjectType() {
return this.mapperInterface;
}
}
Фабрика бинов в OpenFeign
Прокси интерфейса FeignClient также внедряется через фабрику бинов Spring.
public class FeignClientFactoryBean implements
FactoryBean<Object>, InitializingBean, ApplicationContextAware {
// Тип интерфейса FeignClient
private Class<?> type;
@Override
public Object getObject() throws Exception {
return getTarget();
}
@Override
public Class<?> getObjectType() {
return type;
}
}
@Import
Основная функция @Import — импортировать классы конфигурации и, в зависимости от используемых аннотаций (например, @EnableXXX), определять, какие бины должны быть внедрены в Spring.
public class UserImportSelector implements ImportSelector {
@Override
public String[] selectImports(AnnotationMetadata importingClassMetadata) {
System.out.println("Вызов метода selectImports класса UserImportSelector для получения списка имён классов");
return new String[]{"com.sanyou.spring.extension.User"};
}
}
@Import(UserImportSelector.class)
public class Application {
public static void main(String[] args) {
AnnotationConfigApplicationContext applicationContext =
new AnnotationConfigApplicationContext();
// Регистрируем класс Application в контейнере
applicationContext.register(Application.class);
applicationContext.refresh();
System.out.println("Полученный Bean: " + applicationContext.getBean(User.class));
}
}
// Результат выполнения:
// Вызов метода selectImports класса UserImportSelector для получения списка имён классов
// Полученный Bean: com.sanyou.spring.extension.User@282003e1
public class UserImportBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry, BeanNameGenerator importBeanNameGenerator) {
// Создаём определение Bean с типом User
AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder.rootBeanDefinition(User.class)
// Устанавливаем значение свойства username для User как «张小凡»
.addPropertyValue("username", "张小凡")
.getBeanDefinition();
System.out.println("Внедрение User в контейнер Spring");
// Зарегистрируем определение Bean в контейнере
registry.registerBeanDefinition("user", beanDefinition);
}
}
// Импортируем UserImportBeanDefinitionRegistrar
@Import(UserImportBeanDefinitionRegistrar.class)
public class Application {
public static void main(String[] args) {
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext();
// Регистрация класса Application в контейнере
applicationContext.register(Application.class);
applicationContext.refresh();
User user = applicationContext.getBean(User.class);
System.out.println("Полученный Bean: " + user + ", значение свойства username: " + user.getUsername());
}
}
// Результат выполнения:
// Внедрение User в контейнер Spring
// Полученный Bean: com.sanyou.spring.extension.User@6385cb26, значение свойства username: 张小凡
BeanPostProcessor работает во время создания бина. BeanPostProcessor является важным расширением точки в процессе создания бина, поскольку каждый этап создания бина вызывает методы BeanPostProcessor и его дочерних интерфейсов, передавая текущий создаваемый бин. Если вы хотите расширить определённый этап процесса создания бина, вы можете создать собственный BeanPostProcessor. Два часто используемых дочерних интерфейса:
public class UserBeanPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof User) {
// Если текущий бин имеет тип User, присваиваем значение «张小凡» свойству username этого объекта
((User)
``` **Spring встроенные BeanPostProcessor**
| BeanPostProcessor | Функция |
| :------------------ | :------------------------------------------------- |
| AutowiredAnnotationBeanPostProcessor | Обработка @Autowired, @Value аннотаций |
| CommonAnnotationBeanPostProcessor | Обработка @Resource, @PostConstruct, @PreDestroy аннотаций |
| AnnotationAwareAspectJAutoProxyCreator | Обработка некоторых анноток или AOP-срезов динамического прокси |
| ApplicationContextAwareProcessor | Обработка аннотированных интерфейсов внедрения Aware |
| AsyncAnnotationBeanPostProcessor | Обработка @Async аннотации |
| ScheduledAnnotationBeanPostProcessor | Обработка @Scheduled аннотации |
**Применение BeanPostProcessor в Dubbo**
В Dubbo можно использовать @DubboReference (@Reference) для ссылки на интерфейс, предоставленный производителем. Обработка этой аннотации также осуществляется с помощью ReferenceAnnotationBeanPostProcessor, который является расширением BeanPostProcessor.
```java
public class ReferenceAnnotationBeanPostProcessor extends AbstractAnnotationBeanPostProcessor implements ApplicationContextAware, BeanFactoryPostProcessor {
// Игнорируется
}
BeanFactoryPostProcessor позволяет обрабатывать Spring контейнеры. Метод ввода параметров — это сам Spring контейнер. Через этот интерфейс можно выполнять любые операции над контейнером.
public class MyBeanFactoryPostProcessor implements BeanFactoryPostProcessor {
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
// Запретить циклические зависимости
((DefaultListableBeanFactory) beanFactory).setAllowCircularReferences(false);
}
}
SPI (Service Provider Interface) — это механизм динамической замены. Это отличная идея для развязки, которая позволяет интерфейсам и реализациям быть разделенными, а API-провайдерам предоставлять только интерфейсы, которые могут быть реализованы третьими сторонами. Затем можно использовать файлы конфигурации для замены или расширения. Этот механизм широко используется в фреймворках и повышает их расширяемость.
Согласно механизму Spring SPI, файлы конфигурации должны находиться в папке META-INF в пути к классам. Имя файла должно быть spring.factories, а содержимое файла — пары ключ-значение. Один ключ может иметь несколько значений, разделенных запятыми. Ключ и значение не обязательно должны быть связаны друг с другом.
В среде SpringBoot внешние конфигурационные файлы поддерживают форматы свойств и yaml. Однако если вы хотите использовать файлы конфигурации в формате json, вы можете использовать PropertySourceLoader.
public interface PropertySourceLoader {
// Может ли он анализировать файлы определенного формата
String[] getFileExtensions();
// Анализировать файлы конфигурации, читать содержимое и возвращать список PropertySource<?>
List<PropertySource<?>> load(String name, Resource resource) throws IOException;
}
SpringBoot предоставляет два варианта реализации PropertySourceLoader:
PropertiesPropertySourceLoader: может анализировать файлы конфигурации свойств или xml.
public class PropertiesPropertySourceLoader implements PropertySourceLoader {
private static final String XML_FILE_EXTENSION = ".xml";
public PropertiesPropertySourceLoader() {
}
public String[] getFileExtensions() {
return new String[]{"properties", "xml"};
}
public List<PropertySource<?>> load(String name, Resource resource) throws IOException {
Map<String, ?> properties = this.loadProperties(resource);
return properties.isEmpty() ? Collections.emptyList() : Collections.singletonList(new OriginTrackedMapPropertySource(name, Collections.unmodifiableMap(properties), true));
}
private Map<String, ?> loadProperties(Resource resource) throws IOException {
String filename = resource.getFilename();
return (Map)(filename != null && filename.endsWith(".xml") ? PropertiesLoaderUtils.loadProperties(resource) : (new
- **YamlPropertySourceLoader**: анализирует конфигурационные файлы, которые заканчиваются на .yml или .yaml.
```java
public class YamlPropertySourceLoader implements PropertySourceLoader {
@Override
public String[] getFileExtensions() {
return new String[] { "yml", "yaml" };
}
@Override
public List<PropertySource<?>> load(String name, Resource resource) throws IOException {
if (!ClassUtils.isPresent("org.yaml.snakeyaml.Yaml", null)) {
throw new IllegalStateException(
"Attempted to load " + name + " but snakeyaml was not found on the classpath");
}
List<Map<String, Object>> loaded = new OriginTrackedYamlLoader(resource).load();
if (loaded.isEmpty()) {
return Collections.emptyList();
}
List<PropertySource<?>> propertySources = new ArrayList<>(loaded.size());
for (int i = 0; i < loaded.size(); i++) {
String documentNumber = (loaded.size() != 1) ? " (document #" + i + ")" : "";
propertySources.add(new OriginTrackedMapPropertySource(name + documentNumber,
Collections.unmodifiableMap(loaded.get(i)), true));
}
return propertySources;
}
}
Реализация поддержки формата JSON
public class JsonPropertySourceLoader implements PropertySourceLoader {
@Override
public String[] getFileExtensions() {
//этот метод указывает, что класс поддерживает анализ конфигурационных файлов, заканчивающихся на .json
return new String[]{"json"};
}
@Override
public List<PropertySource<?>> load(String name, Resource resource) throws IOException {
ReadableByteChannel readableByteChannel = resource.readableChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate((int) resource.contentLength());
//читаем содержимое файла в ByteBuffer
readableByteChannel.read(byteBuffer);
//преобразуем прочитанные байты в строку
String content = new String(byteBuffer.array());
// преобразуем строку в объект JSONObject
JSONObject jsonObject = JSON.parseObject(content);
Map<String, Object> map = new HashMap<>(jsonObject.size());
//извлекаем пары ключ-значение из JSON и помещаем их в map
for (String key : jsonObject.keySet()) {
map.put(key, jsonObject.getString(key));
}
return Collections.singletonList(new MapPropertySource("jsonPropertySource", map));
}
}
Используйте механизм SPI для загрузки реализаций PropertySourceLoader. В файле spring.factories настройте следующее:
org.springframework.boot.env.PropertySourceLoader=\
cn.test.JsonPropertySourceLoader
В файле application.json настройте следующее:
{
"test.userName": "张小凡"
}
Создайте класс User:
public class User {
// внедряем свойства из конфигурационного файла
@Value("${test.userName:}")
private String userName;
}
Запустите проект:
@SpringBootApplication
public class Application {
public static void main(String[] args) {
ConfigurableApplicationContext applicationContext = SpringApplication.run(Application.class);
User user = applicationContext.getBean(User.class);
System.out.println("Полученный Bean равен " + user + ", значение свойства userName равно: " + user.getUserName());
}
@Bean
public User user() {
return new User();
}
}
// результат выполнения:
// Полученный Bean равен com.sanyou.spring.extension.User@481ba2cf, значение свойства userName равно: 张小凡
Nacos как центр конфигурации поддерживает не только файлы в формате properties и yaml, но и файлы в формате JSON. Поэтому клиент должен уметь анализировать эти форматы. SpringBoot уже поддерживает файлы в форматах properties и yaml. Nacos нужно только реализовать поддержку тех форматов, которые не поддерживаются SpringBoot.
ApplicationContextInitializer — это ещё одна точка расширения процесса запуска SpringBoot. Во время запуска SpringBoot вызывает метод initialize этого класса и передаёт ему ConfigurableApplicationContext. Саньюу: использование пользовательских тегов в Spring
myBean, атрибут класса в теге myBean определяет тип Bean, который будет внедрён в контейнер Spring. Функция этого тега аналогична другим тегам Spring.
Второй шаг: анализ пространства имён Анализ пространства имён прост. Spring предоставляет для этого всё необходимое — интерфейс NamespaceHandler. Достаточно реализовать этот интерфейс. Однако обычно мы не реализуем NamespaceHandler напрямую, а наследуем класс NamespaceHandlerSupport, который уже реализует этот интерфейс.
public class SanYouNameSpaceHandler extends NamespaceHandlerSupport {
@Override
public void init() {
// Регистрация парсера myBean-тегов
registerBeanDefinitionParser("myBean", new SanYouBeanDefinitionParser());
}
private static class SanYouBeanDefinitionParser extends AbstractSingleBeanDefinitionParser {
@Override
protected boolean shouldGenerateId() {
return true;
}
@Override
protected String getBeanClassName(Element element) {
return element.getAttribute("class");
}
}
}
Функция SanYouNameSpaceHandler заключается в чтении тега mybean из пространства имён sanyou и получении атрибута class. Затем этот атрибут используется для определения типа class, который внедряется в Spring-контейнер. Код, связанный с регистрацией, делегируется родительскому классу SanYouBeanDefinitionParser.
Третий шаг: создание и настройка файлов spring.handlers и spring.schemas Сначала создаются файлы spring.handlers и spring.schemas. Содержимое файла spring.handlers:
http\://sanyou.com/schema/sanyou=com.sanyou.spring.extension.namespace.SanYouNameSpaceHandler
Файл конфигурации spring.handlers сообщает, что пространство имён sanyou должно обрабатываться SanYouNameSpaceHandler. Содержимое файла spring.schemas:
http\://sanyou.com/schema/sanyou.xsd=META-INF/sanyou.xsd
В файле spring.schemas указан путь к файлу xsd. Файлы можно разместить в папке classpath/META-INF.
Четвёртый шаг: тестирование Создаётся файл applicationContext.xml и помещается в папку resources:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.springframework.org/schema/beans"
xmlns:sanyou="http://sanyou.com/schema/sanyou"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://sanyou.com/schema/sanyou
http://sanyou.com/schema/sanyou.xsd">
<!-- Использование sanyou-тега для настройки User Bean -->
<sanyou:mybean class="com.sanyou.spring.extension.User"/>
</beans>
Затем создаётся тестовый класс:
public class Application {
public static void main(String[] args) {
ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");
applicationContext.refresh();
User user = applicationContext.getBean(User.class);
System.out.println(user);
}
}
// Результат выполнения:
// com.sanyou.spring.extension.User@27fe3806
Eureka
Netflix Eureka — это основанный на REST компонент обнаружения сервисов от Netflix, включающий в себя сервер Eureka Server и клиент Eureka Client.
Клиент Eureka отвечает за регистрацию информации о сервисе на сервере Eureka. Сервер Eureka, похожий на реестр, содержит информацию о расположении и портах различных сервисов. Клиенты могут использовать эту информацию для поиска сервисов.
Рабочий процесс Eureka выглядит следующим образом:
В микросервисной архитектуре между сервисами обычно происходят межпроцессные вызовы, и коммуникация по сети может столкнуться с различными проблемами, такими как нормальное состояние микросервиса, сетевые разделы или сбои. Если большое количество экземпляров микросервисов будет отменено в течение фиксированного периода времени, это может серьёзно угрожать доступности всей микросервисной архитектуры.
Для решения этой проблемы Eureka разработала механизм самозащиты. Механизм самозащиты Eureka работает следующим образом:
После перехода Eureka Server в режим самозащиты возникают следующие ситуации:
Механизм самозащиты Eureka предназначен для предотвращения случайного удаления сервисов. Он работает следующим образом:
Если сервис-провайдер неожиданно перестаёт работать во время периода самозащиты, сервис-потребитель получит недействительный экземпляр сервиса, и вызов завершится неудачно. Чтобы решить эту проблему, сервис-потребителю необходимо иметь механизмы отказоустойчивости, такие как повторные попытки или прерыватели цепи.
Чтобы включить или отключить режим самозащиты в конфигурации Eureka Server, можно использовать следующий параметр:
eureka.server.enable-self-preservation=true
Клиент Eureka
Клиент Eureka — это Java-клиент для взаимодействия с Eureka Server. Клиент Eureka извлекает, обновляет и кэширует информацию о Eureka Server. Даже если все узлы Eureka Server выйдут из строя, сервис-потребитель всё равно сможет использовать кэшированную информацию для поиска сервис-провайдера. Однако при изменении сервиса информация может стать неактуальной.
Основные функции клиента Eureka включают:
# интервал вызова задачи продления, по умолчанию 30 секунд
eureka.instance.lease-renewal-interval-in-seconds=30
# срок действия услуги, по умолчанию 90 секунд.
eureka.instance.lease-expiration-duration-in-seconds=90
DiscoveryManager.getInstance().shutdownComponent();
Кэшированная информация о реестре используется в качестве основы для сервисов-потребителей, поэтому необходимо учитывать два важных параметра:
properties # включение функции извлечения реестра услуг потребителями из центра регистрации eureka.client.fetch-registry=true # установка интервала обновления реестра услуг потребителем из центра регистрации eureka.client.registry-fetch-interval-seconds=30
Механизм кэширования Eureka
Данные в Eureka Server хранятся в двухуровневой структуре ConcurrentHashMap:
Eureka Server предоставляет двухуровневый механизм кэширования для повышения эффективности отклика. Первый уровень кэша — readOnlyCacheMap, который является ConcurrentHashMap только для чтения. Этот кэш синхронизируется с readWriteCacheMap каждые 30 секунд по умолчанию. Второй уровень кэша — readWriteCacheMap, основанный на Guava Cache. Данные в readWriteCacheMap синхронизируются с хранилищем данных. Когда данные отсутствуют в кэше, они загружаются из CacheLoader.load и помещаются в кэш. Срок действия данных в readWriteCacheMap составляет 180 секунд по умолчанию, и данные удаляются при отключении службы, истечении срока действия, регистрации или изменении состояния.
При получении данных клиент Eureka сначала пытается получить их из первого уровня кэша. Если данные отсутствуют, они извлекаются из второго уровня кэша. В противном случае данные синхронизируются из хранилища данных в кэш перед извлечением. Двухуровневый механизм кэширования в Eureka Server эффективно сокращает время отклика и обеспечивает поддержку различных типов данных в зависимости от сценария использования. 90-е годы могут быть восприняты другими сервисами:
Эти три кэша вместе могут привести к максимальному времени задержки регистрации сервисов в 90 секунд, что требует внимания в особых бизнес-сценариях.
Общие настройки Eureka Server:
# Включить режим самозащиты сервера, который был описан в предыдущем разделе.
eureka.server.enable-self-preservation=true
# Интервал времени (в миллисекундах), через который будут удаляться устаревшие данные о сервисах (по умолчанию — 60 * 1000).
eureka.server.eviction-interval-timer-in-ms= 60000
# Период времени, через который удаляются устаревшие дельта-данные.
eureka.server.delta-retention-timer-interval-in-ms=0
# Ограничитель частоты запросов.
eureka.server.rate-limiter-burst-size=10
# Включен ли ограничитель частоты запросов.
eureka.server.rate-limiter-enabled=false
# Среднее значение частоты запросов.
eureka.server.rate-limiter-full-fetch-average-rate=100
# Применяется ли ограничение частоты запросов к стандартным клиентам. Если false, то ограничение применяется только к нестандартным клиентам.
eureka.server.rate-limiter-throttle-standard-clients=false
# Средняя частота запросов для получения данных о зарегистрированных сервисах и списках сервисов.
eureka.server.rate-limiter-registry-fetch-average-rate=500
# Список доверенных клиентов.
eureka.server.rate-limiter-privileged-clients=
# Ожидаемый процент обновления клиентских договоров в заданном временном диапазоне.
eureka.server.renewal-percent-threshold=0.85
# Пороговое значение времени обновления договоров.
eureka.server.renewal-threshold-update-interval-ms=0
# Время истечения срока действия кэшированных данных о регистрации сервисов.
eureka.server.response-cache-auto-expiration-in-seconds=180
# Частота обновления кэша данных о сервисах.
eureka.server.response-cache-update-interval-ms=0
# Длительность хранения данных в очереди дельт для обеспечения целостности при поиске.
eureka.server.retention-time-in-m-s-in-delta-queue=0
# Следует ли синхронизировать временные метки при их расхождении.
eureka.server.sync-when-timestamp-differs=true
# Используется ли стратегия только для чтения для кэша ответов, которая предотвращает истечение срока действия данных.
eureka.server.use-read-only-response-cache=true
################server node 与 node 之间关联的配置#####################33
# Отправляются ли сжатые данные в запросах репликации, всегда сжимаются.
eureka.server.enable-replicated-request-compression=false
# Указывает, следует ли обрабатывать запросы репликации пакетами для повышения эффективности сети.
eureka.server.batch-replication=false
# Максимальное количество событий репликации, которые могут быть сохранены в пуле репликации для резервного копирования. Этот пул отвечает за все события, кроме обновлений состояния. Размер пула можно настроить в зависимости от размера памяти, тайм-аута и потока репликации.
eureka.server.max-elements-in-peer-replication-pool=10000
# Максимальное количество событий репликации, которые могут быть сохранены в пуле состояний для резервного копирования.
eureka.server.max-elements-in-status-replication-pool=10000
# Максимальный период бездействия для потоков синхронизации между узлами кластера.
eureka.server.max-idle-thread-age-in-minutes-for-peer-replication=15
# Максимальный период бездействия для потоков синхронизации состояния между узлами кластера.
eureka.server.max-idle-thread-in-minutes-age-for-status-reпликация=15
# Максимальное количество потоков для репликации данных между экземплярами серверов.
eureka.server.max-threads-for-peer-репликация=20
# Максимальное количество потоков для синхронизации состояния между экземплярами серверов.
eureka.server.max-threads-for-статус-репликация=1
# Максимальная продолжительность обмена данными между экземплярами серверов.
eureka.server.max-time-for-репликация=30000
# Минимальное количество нормальных экземпляров серверов для репликации между серверами. -1 означает, что сервер является одноузловым.
eureka.server.min-available-instances-for-одноранговая-репликация=-1
# Минимальное количество потоков для запуска репликации между экземплярами серверов.
eureka.server.min-threads-для-одноранговой-репликации=5
# Минимальное количество потоков, используемых для синхронизации состояний между экземплярами серверов.
eureka.server.min-потоки-для-статуса-репликация=1
# Количество попыток повторной попытки репликации данных.
eureka.server.number-of-репликационные-попытки=5
# Интервал обновления данных между узлами серверов. По умолчанию 10 минут.
eureka.server.peer-eureka-nodes-обновление-интервал-мс=600000
# Интервал синхронизации обновлений статуса между серверами.
eureka.server.peer-eureka-статус-обновить-время-интервал-мс=0
# Тайм-аут подключения между равноправными узлами серверов.
eureka.server.равный-узел-подключение-тайм-аут-мс=200
# Истечение времени простоя соединения между равноправными узлами серверов после установления соединения.
eureka.server.равный-узел-соединение-простой-тайм-аут-секунд=30
# Тайм-аут чтения данных соединения между равноправными узлами серверов.
eureka.server.равный-узел-чтение-тайм-аут-мс=200
# Общее максимальное количество соединений между узлами серверов.
eureka.server.равный-узел-общее-количество-соединений=1000
# Максимальное количество соединений с одним хостом между узлами серверов.
eureka.server.равный-узел-общее-соединения-на-хост=10
# Количество попыток синхронизации реестра при запуске сервера.
eureka.server.реестр-синхронизация-повторы=
# Интервал ожидания между попытками синхронизации реестра при запуске сервера.
eureka.server.реестр-синхронизация-повторная попытка-ожидание-мс=
# Ожидание сервера перед получением информации о регистрации экземпляра при запуске, если информация не может быть получена от равноправного узла.
eureka.server.ожидание-времени-в-мс-при-синхронизации-пусто=0
Настройки Eureka Client:
# Включён ли клиент.
eureka.client.включено=true
# Регистрирует ли экземпляр собственные данные на сервере Eureka. По умолчанию true.
eureka.клиент.зарегистрироваться-с-эврикой=ложь
# Получает ли клиент информацию о регистрации с сервера Eureka. По умолчанию верно.
eureka.клиент.получить-реестр=ложь
# Отфильтровывает ли клиент экземпляры, находящиеся в состоянии «не работает». По умолчанию верно.
eureka.клиент.фильтр-только-экземпляры-вверх=верно
# Адрес зоны и URL-адрес службы для связи с сервером Eureka после регистрации.
eureka.клиент.serviceUrl.defaultZone=http://${eureka.instance.hostname}:${server.port}/eureka/
# Время ожидания соединения после подключения клиента к серверу Eureka. По умолчанию 30 секунд.
eureka.клиент.eureka-соединение-бездействие-тайм-аут-секунды=30
# Тайм-аут соединения клиента с сервером Eureka. По умолчанию 5 секунд.
eureka.клиент.eureka-сервер-подключиться-тайм-аут-секунды=5
# Тайм-аут чтения клиентом данных с сервера Eureka.
eureka.клиент.eureka-server-читать-тайм-аут-секунды=8
# Общее количество подключений клиента ко всем серверам Eureka. По умолчанию 200.
eureka.клиент.eureka-server-общее-число-соединений=200
# Количество подключений клиента к одному серверу Eureka. По умолчанию 50. ### Eureka Instance 常用配置:
```properties
#服务注册中心实例的主机名
eureka.instance.hostname=localhost
#注册在Eureka服务中的应用组名
eureka.instance.app-group-name=
#注册在的Eureka服务中的应用名称
eureka.instance.appname=
#该实例注册到服务中心的唯一ID
eureka.instance.instance-id=
#该实例的IP地址
eureka.instance.ip-address=
#该实例,相较于hostname是否优先使用IP
eureka.instance.prefer-ip-address=false
Eureka Server в кластере синхронизируют данные между собой через Replicate (копирование). Узлы не имеют разделения на главные и подчинённые, все они равноправны. В этой архитектуре узлы регистрируются друг у друга для повышения доступности, каждому узлу необходимо добавить один или несколько действительных serviceUrl, указывающих на другие узлы.
Если один из узлов Eureka Server выходит из строя, запросы клиентов Eureka Client автоматически перенаправляются на новый узел Eureka Server. Когда отказавший узел снова становится доступен, он снова включается в управление кластером серверов. Когда узел начинает принимать клиентские запросы, все операции будут выполняться с копированием запросов на все известные узлы Eureka Server.
Синхронизация данных между узлами Eureka Server следует принципу: «достаточно одной линии связи, чтобы соединить узлы, и информация может быть передана». Если существует несколько узлов, достаточно соединить их попарно, чтобы сформировать путь, и тогда все центры регистрации смогут обмениваться информацией. Каждый Eureka Server также является Eureka Client, и несколько Eureka Servers синхронизируют таблицы служб друг с другом через P2P.
Состояние между Eureka Server синхронизируется асинхронно, поэтому нет гарантии, что состояние между узлами будет одинаковым, но в конечном итоге оно будет согласованным.
Разделение Eureka
Eureka предоставляет концепции Region и Zone для разделения, которые заимствованы из AWS Amazon:
На рисунке выше us-east-1c, us-east-1d и us-east-1e представляют разные зоны. Клиенты Eureka Client в зоне предпочитают синхронизироваться с серверами Eureka Server в той же зоне, а также вызывать конечные точки, предпочитая получать список служб от серверов Eureka Server в своей зоне. После сбоя сервера Eureka Server в зоне он будет получать информацию из других зон.
Гарантия доступности Eureka
Все узлы Eureka Server равны, и сбой нескольких узлов не повлияет на работу оставшихся узлов. Оставшиеся узлы всё ещё могут предоставлять услуги регистрации и поиска. Клиент Eureka Client при попытке подключения к определённому серверу Eureka и обнаружении сбоя соединения автоматически переключится на другой узел. Пока работает хотя бы один сервер Eureka, регистрация услуг будет доступна (гарантия доступности), но полученная информация может быть не самой последней (не гарантируется строгая согласованность).
Протокол согласованности Eureka
Основное отличие между Eureka и Zookeeper заключается в том, что Eureka — это модель AP, а Zookeeper — модель CP. В случае разделения мозга Eureka обеспечивает доступность (каждый раздел всё ещё может предоставлять услуги независимо), что является децентрализованным. Как Eureka достигает конечной согласованности?
Широковещательное сообщение
Eureka Server управляет полным списком серверов (PeerEurekaNodes).
Когда Eureka Server получает запрос на регистрацию, отключение или сердцебиение от клиента, он широковещательно передаёт задачу через PeerEurekaNode другим серверам. Если широковещательная передача не удалась, она будет повторена до тех пор, пока задача не истечёт и не будет отменена. В это время данные между двумя серверами будут временно несогласованными. Примечание: Хотя широковещательная передача сообщения не удалась, пока принимается сердцебиение клиента, задача широковещательной передачи сердцебиения всё равно будет отправлена всем серверам (включая отключённые серверы).
Если сеть восстанавливается и принимаются широковещательные задачи сердцебиения от других серверов, возможны три ситуации:
Spring Cloud Eureka при запуске приложения инициализируется в классе EurekaAutoServiceRegistration и активно регистрируется на сервере Eureka.
После запуска Eureka запускает 40-секундный периодический процесс, который отслеживает изменения в IP-адресе и конфигурации приложения и при обнаружении изменений повторно регистрирует приложение.
При возврате статуса 404 при продлении регистрации происходит повторная регистрация.
Активное отключение
Eureka выполняет метод shutDown при закрытии контейнера Spring, сначала устанавливая своё состояние в DOWN, а затем отправляя команду cancel на сервер Eureka для отключения своего сервиса.
Проверка здоровья и автоматическое отключение
Проверка здоровья обычно основана на механизме TTL (Time To Live). Например, клиент отправляет сердцебиение каждые 5 секунд, и если сервер не получает сердцебиение в течение 15 секунд, он обновляет статус экземпляра как нездоровый. Если сервер не получает сердцебиения в течение 30 секунд, экземпляр удаляется из списка сервисов. По умолчанию сервер Eureka отправляет сердцебиение каждые 30 секунд и удаляет просроченные экземпляры через 90 секунд. Процесс очистки просроченных экземпляров выполняется каждые 60 секунд.
Перезапуск
При запуске Spring Cloud Eureka во время инициализации EurekaServerBootstrap#initEurekaServerContext вызывается PeerAwareInstanceRegistryImpl#syncUp для синхронизации данных с другими Eureka серверами.
Разделение мозга
После разделения мозга: сервисы в том же регионе, что и Eureka Server, могут нормально обращаться к нему, в то время как сервисы в других регионах автоматически истекают.
После восстановления после разделения мозга: принимаются запросы на сердцебиение от других серверов Eureka, и в этом случае есть три сценария:
Zuul — это открытый компонент, разработанный Netflix, предназначенный для решения проблем «шлюза».
Функции шлюза:
Использование Feign включает два этапа: создание динамического прокси Feign и выполнение Feign.
Feign по умолчанию реализуется с помощью ReflectiveFeign, создаваемого через Feign.Builder.
Балансировка нагрузки с использованием Feign интегрирована с Ribbon. Для реализации балансировки нагрузки необходимо обернуть Client, чтобы обеспечить балансировку нагрузки. Соответствующий код можно найти в RibbonClient и LBClient.
// RibbonClient основная логика
private final Client delegate;
private final LBClientFactory lbClientFactory;
public Response execute(Request request, Request.Options options) throws IOException {
try {
URI asUri = URI.create(request.url());
String clientName = asUri.getHost();
URI uriWithoutHost = cleanUrl(request.url(), clientName);
// Создание RibbonRequest, включая Client, Request и uri
LBClient.RibbonRequest ribbonRequest =
new LBClient.RibbonRequest(delegate, request, uriWithoutHost);
// executeWithLoadBalancer реализует балансировку нагрузки
return lbClient(clientName).executeWithLoadBalancer(
ribbonRequest,
new FeignOptionsClientConfig(options)).toResponse();
} catch (ClientException e) {
propagateFirstIOException(e);
throw new RuntimeException(e);
}
}
По сути, это просто упаковка объекта Client, которая использует executeWithLoadBalancer для балансировки нагрузки, предоставляемой Ribbon. На основе Feign реализуется разрыв цепи
Разрыв цепи и ограничение потока на основе Feign (интеграция с Hystrix). Чтобы ограничить поток, необходимо перехватить выполнение метода, то есть переписать InvocationHandlerFactory, и выполнить разрыв цепи и ограничить поток перед выполнением метода. Соответствующий код можно найти в HystrixFeign, который фактически реализует HystrixInvocationHandler.
// HystrixInvocationHandler основная логика
public Object invoke(final Object proxy, final Method method, final Object[] args)
throws Throwable {
HystrixCommand<Object> hystrixCommand =
new HystrixCommand<>(setterMethodMap.get(method)) {
@Override
protected Object run() throws Exception {
return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
}
@Override
protected Object getFallback() {
};
}
...
return hystrixCommand.execute();
}
Общий процесс кодирования параметров Feign
Рисунок: Общий процесс кодирования параметров Feign
Резюме: Первые два шага — это этап создания прокси-сервера Feign, анализ параметров метода и аннотаций метаданных. Последние три шага — этап вызова, преобразование параметров метода в формат данных HTTP-запроса.
Contract интерфейс преобразует каждый метод интерфейса UserService и его аннотации в MethodMetadata, а затем использует RequestTemplate#request для создания запроса. RequestTemplate#request создаёт запрос, после чего можно вызвать Client#execute для отправки HTTP-запроса. У клиента есть конкретные реализации, такие как HttpURLConnection, Apache HttpComponnets, OkHttp3 и Netty. В этой статье основное внимание уделяется первым трём шагам: анализу метаданных методов Feign и процессу кодирования параметров.
Процесс сборки Feign в целом
Рисунок: Процесс сборки Feign в целом
Резюме: OpenFeign имеет два входа для сборки:
@EnableAutoConfiguration автоматическая сборка (spring.factories):
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.openfeign.ribbon.FeignRibbonClientAutoConfiguration,\
org.springframework.cloud.openfeign.FeignAutoConfiguration
@EnableFeignClients автоматический поиск
@EnableFeignClients вводит FeignClientsRegistrar, который запускает автоматический поиск и упаковывает интерфейс, помеченный @FeignClient, в объект FeignClientFactoryBean, и, наконец, генерирует прокси-объект этого интерфейса через Feign.Builder. По умолчанию конфигурация Feign.Builder — FeignClientsConfiguration, которая вводится через FeignAutoConfiguration.
Примечание: разрыв цепи и балансировка нагрузки выполняются FeignAutoConfiguration путём внедрения HystrixTargeter и FeignRibbonClientAutoConfiguration соответственно.
Прочее
Feign — декларативный клиент веб-службы.
Рисунок: Введение Feign
Feign использует динамический прокси:
Рисунок: Feign Рисунок: Удаленный вызов Feign
Лента
Лента — компонент Netflix с открытым исходным кодом для балансировки нагрузки.
Рисунок: введение ленты
Роль ленты на стороне потребителя услуг заключается в балансировке нагрузки, и по умолчанию используется алгоритм балансировки нагрузки Round Robin. Лента получает соответствующую информацию о регистрации служб из Eureka Service Discovery, а затем знает местоположение соответствующих служб. Затем лента выбирает машину в соответствии с настроенным алгоритмом балансировки нагрузки, и Feigin создаёт и отправляет запросы на эти машины, как показано на рисунке ниже.
Рисунок: Лента Рисунок: Правила ленты
Гистрикс
Hystrix — проект с открытым исходным кодом от Netflix, предоставляющий функцию разрыва цепи, которая может предотвратить каскадные сбои в распределённых системах.
Рисунок: Введение Hystrix
Hystrix изолирует, разрывает цепь и понижает уровень обслуживания. Проще говоря, Hystrix создаст много небольших пулов потоков, которые будут запрашивать услуги и возвращать результаты. Hystrix действует как промежуточный фильтр. Если наша служба токенов не работает, мы сразу же вернёмся без ожидания тайм-аута, чтобы вызвать исключение, что называется разрывом цепи. Но мы не можем просто ничего не делать и вернуться, иначе мы не сможем вручную добавить токены позже. Поэтому мы записываем сообщение в базу данных каждый раз, когда вызываем службу токенов, что называется понижением уровня обслуживания. Весь процесс изоляции, разрыва цепи и понижения уровня обслуживания Hystrix выглядит следующим образом:
Рисунок: Hystrix Рисунок: Разрыв цепи Hystrix
Шлюз
Spring Cloud Gateway — шлюз, разработанный Spring на основе Spring 5.0, Spring Boot 2.0 и Project Reactor. Spring Cloud Gateway предназначен для обеспечения простого, эффективного и унифицированного способа управления API-маршрутизацией для микросервисной архитектуры.
Рисунок: Введение шлюза
Config
В Spring Cloud предоставляется распределённая конфигурация Spring Cloud Config, которая поддерживает клиентскую и серверную части. Bus
Используя Spring Cloud Bus, можно легко построить шину сообщений.
OAuth2
В микросервисной системе, построенной с помощью Spring Cloud, можно использовать Spring Cloud OAuth2 для защиты микросервисной системы.
Sleuth
Spring Cloud Sleuth — это компонент Spring Cloud. Его основная функция заключается в предоставлении решения для отслеживания сервисных цепочек в распределённых системах.
Mybatis — это полуобъектно-реляционный картограф (ORM). Он инкапсулирует JDBC, загружает драйверы, создаёт соединения и создаёт операторы. Разработчикам нужно только сосредоточиться на написании SQL-запросов. Можно строго контролировать производительность выполнения SQL и обеспечивать гибкость.
Как полуобъектно-реляционная система, Mybatis может использовать XML или аннотации для настройки и сопоставления исходных данных. Он преобразует записи POJO в базу данных, избегая почти всех JDBC-кодов и ручного задания параметров и получения результатов.
Различные операторы настраиваются через файлы XML или аннотаций, а затем сопоставляются с параметрами Java-объектов и операторов SQL для создания окончательных выполняемых SQL-операторов. Наконец, Mybatis выполняет SQL и отображает результаты в Java-объекты и возвращает их.
Поскольку Mybatis фокусируется на SQL, он обеспечивает высокую гибкость и подходит для проектов с высокими требованиями к производительности или частыми изменениями требований, таких как интернет-проекты.
Преимущества MyBatis:
— Основан на программировании SQL-запросами, что обеспечивает гибкость и не влияет на существующий дизайн приложений или баз данных. SQL пишется в XML, что позволяет отделить SQL от кода программы и упрощает централизованное управление. Предоставляет XML-теги и поддерживает написание динамических SQL-запросов, которые можно повторно использовать.
— По сравнению с JDBC, объём кода уменьшается более чем на 50%, устраняя избыточный код JDBC. Не требуется вручную переключать соединения.
— Хорошая совместимость с различными базами данных (поскольку MyBatis использует JDBC для подключения к базе данных, он поддерживает все базы данных, поддерживаемые JDBC).
— Может хорошо интегрироваться со Spring.
— Предоставляет теги сопоставления, поддерживает объекты и отношения ORM с полями базы данных; предоставляет теги объектных отношений, поддерживает компоненты объектных отношений.
Недостатки MyBatis:
— Написание SQL-запросов требует больших усилий, особенно когда полей много или есть много связанных таблиц. Это требует от разработчиков хорошего понимания написания SQL-запросов.
— SQL-запросы зависят от базы данных, что приводит к плохой переносимости и невозможности легко менять базы данных.
#{} и ${} — в чём разница?
— ${}
— замена строки.
— #{}
— предварительная обработка.
Использование #{} может эффективно предотвратить внедрение SQL и повысить безопасность системы.
Ядро | Функция |
---|---|
Configuration | Сохраняет большую часть конфигурации MyBatis |
SqlSession | Основной API MyBatis для взаимодействия с базой данных и реализации функций базы данных |
Executor | Диспетчер MyBatis, отвечающий за генерацию SQL-запроса и обслуживание кеша запросов |
StatementHandler | Инкапсуляция JDBC, ответственная за операции с операторами JDBC, такими как установка параметров |
ParameterHandler | Преобразование пользовательских параметров в типы данных, соответствующие операторам JDBC |
ResultSetHandler | Отвечает за преобразование результатов JDBC ResultSet в коллекции List |
TypeHandler | Ответственный за отображение и преобразование между типами данных Java и типами данных JDBC (также известными как типы столбцов таблицы) |
MappedStatement | MappedStatement отвечает за упаковку одного узла <select|update|delete|insert> |
SqlSource | Отвечает за динамическое создание SQL-запросов на основе переданных параметров пользователя и упаковывает информацию в BoundSql |
BoundSql | Представляет динамически созданный SQL-запрос и соответствующую информацию о параметрах |
Принцип работы интерфейса Mapper заключается в использовании JDK динамического прокси для генерации прокси-объекта для интерфейса Mapper. Прокси-объект перехватывает методы интерфейса, находит соответствующий MapperStatement на основе класса полного имени + метода и вызывает исполнитель для выполнения соответствующего SQL. Затем он возвращает результат выполнения.
Методы в интерфейсе Mapper нельзя перегружать, поскольку используется стратегия сохранения и поиска на основе полного имени класса + имени метода.
Кэш MyBatis имеет два уровня: первый уровень и второй уровень. Первый уровень кэша включён по умолчанию и не может быть выключен. Второй уровень кэша можно включить или выключить.
Первый уровень кэша относится к уровню SqlSession. Когда один и тот же SQL-запрос выполняется в одном и том же сеансе SqlSession, последующие запросы будут извлекаться из кэша, а не из базы данных. Первый уровень кэша может хранить до 1024 SQL-запросов. Второй уровень кеша является общим для разных сеансов SqlSession и может использоваться совместно. Первый уровень кеша использует PerpetualCache в качестве основной структуры данных, которая по сути является хэш-картой. Второй уровень кеша по умолчанию выключен.
Когда отправляется первый запрос SQL, результаты запроса записываются в кэш первого уровня сеанса SqlSession. Структура данных кэша представляет собой карту, где ключ состоит из следующих элементов:
— Идентификатор MapperID. — Смещение offset. — Лимит limit. — SQL. — Все параметры ввода.
Значение представляет собой информацию о пользователе.
Если в промежутке между двумя запросами происходит операция фиксации (изменение, добавление или удаление), то весь кэш первого уровня в текущем сеансе SqlSession очищается. При следующем запросе данные необходимо будет извлечь из базы данных, а после извлечения записать обратно в кэш.
Область действия второго уровня кэша — уровень Mapper (один и тот же Mapper в одном пространстве имён). Mapper создаёт структуру данных кэша на уровне пространства имён. Кэш реализован через CacheExecutor, который фактически является прокси-объектом Executor. Все запросы сначала проверяются на наличие в кэше, и если они отсутствуют, то запрашиваются из базы данных. Если вы знакомы с веб-разработкой, то должны знать о cookie и session.
Session в MyBatis похож на концепцию session в веб-разработке. Данные действительны в течение сессии (или встречи). После завершения сессии данные удаляются.
Также можно рассматривать SqlSession как соединение с базой данных (но это не совсем верно, потому что после создания SqlSession, если не выполнять SQL, соединение бессмысленно, поэтому соединение устанавливается при выполнении SQL).
SqlSession определяет только некоторые методы выполнения SQL, а их реализация осуществляется дочерними классами, например, классом DefaultSqlSession. В MyBatis выполнение SQL происходит через Executor, который создаётся при открытии SqlSession.
Executor — это интерфейс, определяющий упаковку JDBC. MyBatis предоставляет несколько исполнителей: CacheExecutor, ClosedExecutor и SimpleExecutor.
CacheExecutor — это прокси-класс Executor, содержащий делегат, который нужно создать вручную (выбрать один из трёх вариантов: simple, reuse или batch). ClosedExecutor генерирует исключения для всех интерфейсов, указывая на закрытый Executor. При создании SqlSession по умолчанию используется SimpleExecutor.
Executor упаковывает JDBC. Когда мы используем JDBC для выполнения SQL, обычно сначала предварительно обрабатываем SQL, получая PreparedStatement, а затем вызываем executeXxx() для выполнения SQL. То есть Executor также создаёт объект Statement при выполнении SQL.
StatementHandler в JDBC вызывает Statement.executeXxx() для выполнения SQL. В MyBatis также используется Statement.executeXxx(), поэтому необходимо упомянуть StatementHandler. Его можно считать рабочим, выполняющим следующие задачи:
ParameterHandler отвечает за установку параметров после предварительной обработки SQL. Существует DefaultParameterHandler.
После выполнения statement.execute() можно получить результат с помощью statement.getResultSet(). Затем MyBatis использует ResultSetHandler для преобразования данных результата в объекты Java (ORM-отображение). Существует класс реализации DefaultResultHandler, который переопределяет метод handlerResultSets.
TypeHandler используется в двух местах:
# CentOS
yum install nginx;
# Ubuntu
sudo apt-get install nginx;
# Mac
brew install nginx;
После установки можно просмотреть информацию об установке Nginx с помощью команды rpm \-ql nginx
.
# Файлы конфигурации Nginx
/etc/nginx/nginx.conf # основной файл конфигурации
/etc/nginx/nginx.conf.default
# Исполняемые файлы
/usr/bin/nginx-upgrade
/usr/sbin/nginx
# Библиотеки Nginx
/usr/lib/systemd/system/nginx.service # для настройки системных служб
/usr/lib64/nginx/modules # каталог модулей Nginx
# Документация
/usr/share/doc/nginx-1.16.1
/usr/share/doc/nginx-1.16.1/CHANGES
/usr/share/doc/nginx-1.16.1/README
/usr/share/doc/nginx-1.16.1/README.dynamic
/usr/share/doc/nginx-1.16.1/UPGRADE-NOTES-1.6-to-1.10
# Каталоги статических ресурсов
/usr/share/nginx/html/404.html
/usr/share/nginx/html/50x.html
/usr/share/nginx/html/index.html
# Журналы Nginx
/var/log/nginx
Основные папки для внимания:
/etc/nginx/conf.d/
— папка для хранения дочерних конфигураций, /etc/nginx/nginx.conf
по умолчанию включает все файлы конфигурации в этой папке./usr/share/nginx/html/
— здесь размещаются статические файлы, но можно выбрать другое место в зависимости от предпочтений.Обычно настраиваются в файле /etc/nginx/nginx.conf
.
# Включить автозапуск при загрузке
systemctl enable nginx
# Отключить автозапуск при загрузке
systemctl disable nginx
# Запустить Nginx
systemctl start nginx # после успешного запуска Nginx можно напрямую обращаться к IP хоста, где будет отображаться страница по умолчанию Nginx
# Остановить Nginx
systemctl stop nginx
# Перезапустить Nginx
systemctl restart nginx
# Перегрузить конфигурацию Nginx без перезапуска
systemctl reload nginx
# Проверить статус Nginx
systemctl status nginx
# Просмотреть процессы Nginx
ps -ef | grep nginx
# Убить процесс Nginx
kill -9 pid # найти PID процесса Nginx и завершить его, -9 означает принудительное завершение процесса
# Запуск
nginx -s start
# Отправить сигнал главному процессу для перезагрузки конфигурации и горячего перезапуска
nginx -s reload
# Перезапуск Nginx
nginx -s reopen
# Быстрое закрытие
nginx -s stop
# Ожидание завершения работы процессов перед закрытием
nginx -s quit
# Просмотр текущей окончательной конфигурации Nginx
nginx -T
# Проверка конфигурации на наличие проблем
nginx -t
Структура уровней Nginx выглядит следующим образом:
location = / {
# Полное соответствие =
# Чувствительность к регистру ~
# Игнорирование регистра ~*
}
location ^~ /images/ {
# Соответствие началу ~^
# Можно использовать регулярные выражения, такие как:
# location ~* \.(gif|jpg|png)$ { }
}
location / {
# Если предыдущие правила не подходят, выполняется здесь
}
location
используется для сопоставления URI.
Используется для определения информации о вышестоящих серверах (то есть серверах приложений, предоставляемых бэкендом).
upstream test_server{
server 192.168.100.33:8081
}
``` **Прокси-проход**
Синтаксис: proxy_pass URL;
Контекст: location, if, limit_except
Пример:
proxy_pass http://127.0.0.1:8081
proxy_pass http://127.0.0.1:8081/proxy
URL параметр:
* URL должен начинаться с http или https;
* URL может содержать переменные;
* Наличие URI в URL напрямую влияет на URL запроса, отправляемого на вышестоящий сервер.
Разница между URL с / и без /:
* Без / Nginx не изменяет URL пользователя, а просто передаёт его вышестоящему серверу приложения;
* С / Nginx изменяет URL пользователя, удаляя из него часть после location.
Пример: пользователь запрашивает URL /bbs/abc/test.html. Запрос достигает вышестоящего сервера приложений с URL /bbs/abc/test.html, если используется proxy_pass http://127.0.0.1:8080, и с URL /abc/test.html при использовании proxy_pass http://127.0.0.1:8080/.
**Сценарии использования**
**Обратный прокси**
Пример 1:
* /etc/nginx/conf.d/proxy.conf
* 1.1 Имитация обслуживаемого сервиса
server {
listen 8080;
server_name localhost;
location /proxy/ {
root /usr/share/nginx/html/proxy;
index index.html;
}
}
* 1.2 /usr/share/nginx/html/proxy/index.html
<h1> 121.42.11.34 proxy html </h1>
Пример 2:
* /etc/nginx/conf.d/proxy.conf
* 2.1 Сервер бэкенда
upstream back_end {
server 121.42.11.34:8080 weight=2 max_conns=1000 fail_timeout=10s max_fails=3;
keepalive 32;
keepalive_requests 80;
keepalive_timeout 20s;
}
* 2.2 Конфигурация прокси
server {
listen 80;
# vim /etc/hosts для добавления записи: 121.5.180.193 proxy.lion.club
server_name proxy.lion.club;
location /proxy {
proxy_pass http://back_end/proxy;
}
}
**Балансировка нагрузки**
Пример 1:
* /etc/nginx/conf.d/balance.conf
* 1.1 Имитация первого обслуживаемого сервиса
server{
listen 8020;
location / {
return 200 'return 8020 \n';
}
}
* 1.2 Имитация второго обслуживаемого сервиса
server{
listen 8030;
location / {
return 200 'return 8030 \n';
}
}
* 1.3 Имитация третьего обслуживаемого сервиса
server{
listen 8040;
location / {
return 200 'return 8040 \n';
}
}
Пример 2:
* /etc/nginx/conf.d/balance.conf
* 2.1 Список серверов
upstream demo_server {
server 121.42.11.34:8020;
server 121.42.11.34:8030;
server 121.42.11.34:8040;
}
* 2.2 Конфигурация прокси
server {
listen 80;
server_name balance.lion.club;
location /balance/ {
proxy_pass http://demo_server;
}
}
Алгоритм хеширования:
upstream demo_server {
hash $request_uri;
server 121.42.11.34:8020;
server 121.42.11.34:8030;
server 121.42.11.34:8040;
}
Сервер:
listen 80;
server_name balance.lion.club;
location /balance/ {
proxy_pass http://demo_server;
}
Запрос всегда направляется на один и тот же сервер, пока сохраняется значение request_uri.
Алгоритм IP-хеширования:
upstream demo_server {
ip_hash;
server 121.42.11.34:8020;
server 121.42.11.34:8030;
server 121.42.11.34:8040;
}
Распределение запросов по серверам осуществляется на основе IP-адреса клиента. Это помогает решить проблемы с сохранением сессий на бэкенд-серверах.
Наименьший алгоритм соединения:
upstream demo_server {
zone test 10M; # zone можно установить имя и размер совместно используемой памяти
least_conn;
server 121.42.11.34:8020;
server 121.42.11.34:8030;
server 121.42.11.34:8040;
}
Каждый рабочий дочерний процесс считывает данные из общей памяти для получения информации о серверах. Затем выбирается сервер с наименьшим количеством установленных соединений для обработки запроса.
**Кэширование конфигурации**
① proxy_cache
Сохраняет некоторые ранее посещённые и потенциально повторно посещаемые ресурсы, чтобы пользователи могли получать их непосредственно от прокси-сервера, снижая нагрузку на вышестоящий сервер и ускоряя доступ.
② proxy_cache_path
Определяет путь к файлам кэша.
Параметры:
* path — путь к файлу кэша;
* level — уровень каталога;
* keys_zone — настройка общей памяти;
* inactive — время, в течение которого кэш будет очищен, если он не был посещён, по умолчанию 10 минут.
③ proxy_cache_key
Устанавливает ключ для файла кэша.
По умолчанию: proxy_cache_key $scheme$proxy_host$request_uri;.
④ proxy_cache_valid
Настраивает, какие коды состояния могут быть кэшированы и на какой срок.
Например: proxy_cache_valid 200 304 2m; означает, что файлы кэша со статусом 200 и 304 будут действительны в течение 2 минут. **Определение условий сохранения в кэш и условий, при которых ответ не будет сохранён в кэш**
Определяются условия, при которых ответ от сервера не будет сохраняться в кэш.
```nginx
Синтаксис: proxy_no_cache string;
Контекст: http, server, location
Пример: proxy_no_cache $http_pragma $http_authorization;
Определение условий, при которых ответ будет обходить кэш
Определяется условие, при котором ответ от сервера будет обходить кэш.
Синтаксис: proxy_cache_bypass string;
Контекст: http, server, location
Пример: proxy_cache_bypass $http_pragma $http_authorization;
Переменная upstream_cache_status
Хранит информацию о том, было ли попадание в кэш, полезна для отладки.
Значения:
proxy_cache_path /etc/nginx/cache_temp levels=2:2 keys_zone=cache_zone:30m max_size=2g inactive=60m use_temp_path=off;
upstream cache_server {
server 121.42.11.34:1010;
server 121.42.11.34:1020;
}
server {
listen 80;
server_name cache.lion.club;
# Сценарий 1: требования к оперативности невысоки, настраиваем кэш
location /demo {
proxy_cache cache_zone; # настройка кэша памяти, уже определённого выше
proxy_cache_valid 200 5m; # кэш состояния запросов со статусом 200 на 5 минут
proxy_cache_key $request_uri; # ключ кэширования файлов — URI запроса
add_header Nginx-Cache-Status $upstream_cache_status # установка статуса кэша в заголовке ответа клиенту
proxy_pass http://cache_server; # проксирование
}
# Сценарий 2: требования к оперативности очень высоки, настраиваем отсутствие кэша
# Если URI заканчивается на .txt или .text, устанавливаем значение переменной «no cache»
if ($request_uri ~ \\.(txt|text)$) {
set $cache_name "no cache"
}
location /test {
proxy_no_cache $cache_name; # если переменная имеет значение, то кэш не используется, иначе используется
proxy_cache cache_zone; # настройка кэша памяти
proxy_cache_valid 200 5m; # кэш состояния запросов со статусом 200 на 5 минут
proxy_cache_key $request_uri; # ключ кэширования файлов — URI запроса
add_header Nginx-Cache-Status $upstream_cache_status # установка статуса кэша в заголовке ответа клиенту
proxy_pass http://cache_server; # проксирование
}
}
Скачивается сжатый файл сертификата, который содержит папку Nginx. Файлы xxx.crt и xxx.key копируются в каталог сервера, после чего выполняется следующая конфигурация:
server {
listen 443 ssl http2 default_server; # порт доступа SSL равен 443
server_name lion.club; # доменное имя, связанное с сертификатом (здесь произвольное)
ssl_certificate /etc/nginx/https/lion.club_bundle.crt; # путь к сертификату
ssl_certificate_key /etc/nginx/https/lion.club.key; # путь к приватному ключу
ssl_session_timeout 10m;
ssl_protocols TLSv1 TLSv1.1 TLSv1.2; # поддержка версий протокола SSL, по умолчанию последние три, основная версия [TLSv1.2]
location / {
root /usr/share/nginx/html;
index index.html index.htm;
}
}
В папке /etc/nginx/conf.d/
создаётся новый конфигурационный файл gzip.conf
:
# По умолчанию off, включение сжатия gzip
gzip on;
# MIME-типы файлов, которые должны быть сжаты, text/html принудительно включён
gzip_types text/plain text/css application/json application/x-javascript text/xml application/xml application/xml+rss text/javascript;
# ------- Вышеуказанные два параметра включены, сжатие gzip поддерживается --------
# По умолчанию off, модуль включён, Nginx сначала проверяет наличие файла с расширением .gz для статических файлов, если есть, возвращает содержимое файла .gz
gzip_static on;
# По умолчанию off, включается при использовании Nginx в качестве обратного прокси, используется для настройки включения или отключения сжатия gzip из содержимого прокси-сервера
gzip_proxied any;
# Используется для добавления заголовка Vary: Accept-Encoding в ответ, чтобы прокси-сервер мог определить, следует ли использовать сжатие gzip на основе заголовка Accept-Encoding запроса
gzip_vary on;
# Уровень сжатия, 1–9, где 1 — самый низкий уровень сжатия, а 9 — самый высокий, чем выше уровень, тем больше степень сжатия и время сжатия, рекомендуется 4–6
gzip_comp_level 6;
# Объём памяти, используемый для кэширования результатов сжатия, 16 8k означает получение 8k * 16 единиц
gzip_buffers 16 8k;
# Минимальный размер страницы, разрешённый для сжатия, размер страницы получается из заголовка Content-Length. Значение по умолчанию равно 0, независимо от размера страницы она будет сжата. Рекомендуется установить значение больше 1k байт, меньше 1k может привести к увеличению размера после сжатия
gzip_min_length 1k;
# Минимальная версия HTTP, необходимая для включения сжатия gzip, значение по умолчанию — 1.1
gzip_http_version 1.1;
server {
# Стандартный протокол HTTP
listen 80;
# Стандартный протокол HTTPS
listen 443 ssl;
# Для http2
listen 443 ssl http2;
# Прослушивание порта 80 с использованием IPv6
listen [::]:80;
# Только прослушивание с использованием IPv6
listen [::]:80 ipv6only=on;
}
server {
# Относительный или полный путь к файлу журнала
access_log /path/to/file.log;
# Включить 'on' или 'off'
access_log on;
}
server {
# Слушаю yourdomain.com
server_name yourdomain.com;
# Слушать несколько доменов server_name yourdomain.com www.yourdomain.com;
# Слушайте все домены
server_name *.yourdomain.com;
# Слушайте все корневые домены верхнего уровня
server_name yourdomain.*;
# Слушайте неуказанные имена хостов (прослушивает IP-адрес напрямую)
server_name "";
}
server {
listen 80;
server_name yourdomain.com;
location / {
root ### Перенаправление
```nginx
server {
listen 80;
server_name www.yourdomain.com;
return 301 http://yourdomain.com$request_uri;
}
server {
listen 80;
server_name www.yourdomain.com;
location /redirect-url {
return 301 http://otherdomain.com;
}
}
server {
listen 80;
server_name yourdomain.com;
location / {
proxy_pass http://0.0.0.0:3000;
# где 0.0.0.0:3000 — ваш сервер приложений (например, node.js), привязанный к 0.0.0.0 и прослушивающий порт 3000
}
}
upstream node_js {
server 0.0.0.0:3000;
server 0.0.0.0:4000;
server 123.131.121.122;
}
server {
listen 80;
server_name yourdomain.com;
location / {
proxy_pass http://node_js;
}
}
server {
listen 443 ssl;
server_name yourdomain.com;
ssl on;
ssl_certificate /path/to/cert.pem;
ssl_certificate_key /path/to/privatekey.pem;
ssl_stapling on;
ssl_stapling_verify on;
ssl_trusted_certificate /path/to/fullchain.pem;
ssl_protocols TLSv1 TLSv1.1 TLSv1.2;
ssl_session_timeout 1h;
ssl_session_cache shared:SSL:50m;
add_header Strict-Transport-Security max-age=15768000;
}
# Постоянное перенаправление с HTTP на HTTPS
server {
listen 80;
server_name yourdomain.com;
return 301 https://$host$request_uri;
}
# Подсчёт IP-адресов посещений
awk '{print $1}' access.log | sort -n | uniq | wc -l
# Просмотр количества посещений за определённый промежуток времени (с 4 до 5 часов)
grep "07/Apr/2017:0[4-5]" access.log | awk '{print $1}' | sort | uniq -c| sort -nr | wc -l
# Поиск 100 наиболее частых IP-адресов посетителей
awk '{print $1}' access.log | sort -n |uniq -c | sort -rn | head -n 100
# Поиск IP-адресов, которые посещали сайт более 100 раз
awk '{print $1}' access.log | sort -n |uniq -c |awk '{if($1 >100) print $0}'|sort -nr
# Получение подробной информации о посещениях определённого IP-адреса, отсортированных по частоте посещений
grep '104.217.108.66' access.log |awk '{print $7}'|sort |uniq -c |sort -rn |head -n 100
# Статистика посещений страниц
# Топ-100 самых посещаемых страниц
awk '{print $7}' access.log | sort |uniq -c | sort -nr | head -n 100
# Топ-100 самых посещаемых не PHP-страниц
grep -v ".php" access.log | awk '{print $7}' | sort |uniq -c | sort -nr | head -n 100
# Страницы, которые посетили более 100 раз
cat access.log | cut -d ' ' -f 7 | sort |uniq -c | awk '{if ($1 > 100) print $0}' | less
# Последние 1000 записей в журнале, с указанием наиболее посещаемых страниц
tail -1000 access.log |awk '{print $7}'|sort|uniq -c|sort -nr|less
# Количество запросов в секунду, топ-100 временных точек (точность до секунды)
awk '{print $4}' access.log |cut -c 14-21|sort|uniq -c|sort -nr|head -n 100
# Количество запросов за минуту, топ-100 временных точек (точность до минуты)
awk '{print $4}' access.log |cut -c 14-18|sort|uniq -c|sort -nr|head -n 100
# Количество запросов за час, топ-100 временных точек (точность до часа)
awk '{print $4}' access.log |cut -c 14-15|sort|uniq -c|sort -nr|head -n 100
# Анализ производительности
# Список страниц с временем передачи более 3 секунд, первые 20 записей
cat access.log|awk '($NF > 3){print $7}'|sort -n|uniq -c|sort -nr|head -20
# Список PHP-страниц с временем запроса более 3 секунд, количество появлений, первые 100 записей
cat access.log|awk '($NF > 1 && $7~/\\.php/){print $7}'|sort -n|uniq -c|sort -nr|head -100
# Счётчик TCP-соединений
# Общее количество текущих TCP-соединений
netstat -tan | grep "ESTABLISHED" | grep ":80" | wc -l
# Использование tcpdump для анализа 80 порта и определения наиболее активных IP-адресов
tcpdump -i eth0 -tnn dst port 80 -c 1000 | awk -F"." '{print $1"."$2"."$3"."$4}' | sort | uniq -c | sort -nr
# Подсчёт уникальных IP-адресов в файле журнала
awk '{print $1}' $logpath |sort -n|uniq|wc -l
# IP-адрес, который посетил сайт в определённое время
sed -n '/22\/Jun\/2017:1[5]/,/22\/Jun\/2017:1[6]/p' $logpath|awk '{print $1}'|sort -n|uniq|wc -l
# Посетители, которые были на сайте более 100 раз
awk '{print $1}' $logpath|sort -n|uniq -c|awk '{if($1>100) print $0}'|sort -rn
# Наиболее частые запросы (ТОП-50)
awk '{print $7}' $logpath |sort |uniq -c|sort -rn |head -n 50
# Количество запросов в секунду (ТОП-50)
awk '{print $4}' $logpath|cut -c 14-21|sort |uniq -c|sort -nr|head -n 50
# Количество запросов за минуту (ТОП-50)
awk '{print $4}' $logpath|cut -c 14-18|sort|uniк -c|sort -nr|head -n 50
# Количество запросов за час (ТОП-50)
awk '{print $4}' $logpath|cut -c 14-15|sort|uniк -c|sort -nr|head -n 50
# Запросы, время передачи которых превышает 1 секунду (ТОП-20)
cat $logpath|awk '($NF > 1){print $7}'|sort -n|uniк -c|sort -nr|head -20
LVS
LVS — это аббревиатура от Linux Virtual Server, что означает виртуальный серверный кластер на базе Linux.
У LVS есть три основных режима балансировки нагрузки:
ENAT-режим (Enhance NAT). ENAT-режим также называют треугольным режимом или DNAT/SNAT-режимом.
Принцип работы
Преимущества
Недостатки
Анализ
Почему ответный пакет в ENAT не должен проходить через LVS?
В режиме full NAT ответному пакету необходимо пройти через LVS для повторного изменения IP. В ENAT это делается на RS с помощью модуля Ctk.
Почему LVS и RS могут находиться в разных VLAN?
Как и в случае с full NAT.
Структура ENAT
Диспетчеризация основана только на алгоритме, без учёта состояния реальных серверов и нагрузки.
RR: циклическая диспетчеризация (Round Robin). Диспетчер распределяет внешние запросы по очереди между серверами кластера. Все серверы рассматриваются одинаково, независимо от их текущего соединения и загрузки системы.
WRR: взвешенная циклическая диспетчеризация (Weight RR). Диспетчер использует алгоритм «взвешенной циклической диспетчеризации» для распределения запросов на основе мощности обработки реальных серверов. Это позволяет серверам с лучшей производительностью обрабатывать больше трафика. Диспетчер может автоматически запрашивать информацию о нагрузке реальных серверов и динамически корректировать их веса.
DH: диспетчеризация по целевому адресу (Destination Hash). На основе целевого IP-адреса запроса диспетчер выбирает соответствующий сервер из статически распределённого списка. Если сервер доступен и не перегружен, запрос отправляется на этот сервер. Иначе возвращается пустое значение.
SH: диспетчеризация по исходному адресу (Source Hash). Исходный IP-адрес запроса используется в качестве ключа хеширования для выбора соответствующего сервера из статического списка. Если сервер доступен и не перегружен, запрос направляется на этот сервер, иначе возвращается пустое значение.
LC: диспетчеризация с наименьшим количеством соединений (Least Connections). Диспетчер динамически направляет сетевые запросы на сервер с наименьшим числом установленных соединений. Если реальные серверы в кластере имеют схожую производительность, этот алгоритм обеспечивает равномерное распределение нагрузки.
WLC: взвешенная диспетчеризация с наименьшим количеством соединений (Weighted Least Connections, по умолчанию). В системах с большим разбросом производительности между реальными серверами, этот алгоритм оптимизирует балансировку нагрузки путём использования весов. Серверы с более высоким весом будут обрабатывать большую долю активных соединений. Диспетчер автоматически запрашивает информацию о загрузке реальных серверов и динамически корректирует их веса.
SED: диспетчеризация с кратчайшим ожиданием (Shortest Expected Delay). Улучшение на основе WLC. Overhead = (ACTIVE + 1) * 256 / вес. Не учитывает неактивные состояния. Текущее количество активных серверов + 1 используется для принятия следующего запроса. Минимальное число получает следующий запрос. +1 предназначен для учёта весов при перегрузке.
NQ: никогда не стоять в очереди / диспетчеризация с наименьшей очередью (Never Queue Scheduling NQ). Нет необходимости в очереди. Если один из реальных серверов имеет 0 подключений, запрос сразу направляется туда. SED не рассматривается.
LBLC: локальная диспетчеризация с наименьшим количеством соединений (locality-Based Least Connections). Алгоритм диспетчеризации на основе локальности для балансировки нагрузки по целевым IP-адресам. Обычно используется в системах кэширования. Он выбирает ближайший сервер на основе целевого IP-адреса. Если этот сервер доступен и не перегружен, запрос отправляется туда. Если сервера нет или он перегружен и есть сервер с половиной рабочей нагрузки, выбирается сервер с наименьшим соединением.
LBLCR: локальная диспетчеризация с копированием и наименьшим количеством соединений (Locality-Based Least Connections with Replication). Аналогичен LBLC, но поддерживает список серверов для каждого целевого IP-адреса вместо одного сервера. Если целевой IP-адрес имеет перегруженный сервер, новый сервер добавляется в список, и запрос направляется к нему. Когда группа серверов не обновляется в течение некоторого времени, самый загруженный сервер удаляется из списка для снижения уровня репликации. Пятый шаг: перед проведением теста на доступ необходимо убедиться, что все узлы бэкенда доступны по отдельности.
Тест на связность. Узлы бэкенда.
[root@lb01 conf]# curl -H host:www.etiantian.org 10.0.0.8
web01 www
[root@lb01 conf]# curl -H host:www.etiantian.org 10.0.0.7
web02 www
[root@lb01 conf]# curl -H host:www.etiantian.org 10.0.0.9
web03 www
[root@lb01 conf]# curl -H host:bbs.etiantian.org 10.0.0.9
web03 bbs
[root@lb01 conf]# curl -H host:bbs.etiantian.org 10.0.0.8
web01 bbs
[root@lb01 conf]# curl -H host:bbs.etiantian.org 10.0.0.7
web02 bbs
Шестой шаг: просмотр состояния виртуального IP-адреса.
[root@lb01 conf]# ip a
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
inet 127.0.0.1/8 scope host lo
inet6 ::1/128 scope host
valid_lft forever preferred_lft forever
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP qlen 1000
link/ether 00:0c:29:90:7f:0d brd ff:ff:ff:ff:ff:ff
inet 10.0.0.5/24 brd 10.0.0.255 scope global eth0
inet 10.0.0.3/24 scope global secondary eth0:1
inet6 fe80::20c:29ff:fe90:7f0d/64 scope link
valid_lft forever preferred_lft forever
3: eth1: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP qlen 1000
link/ether 00:0c:29:90:7f:17 brd ff:ff:ff:ff:ff:ff
inet 172.16.1.5/24 brd 172.16.1.255 scope global eth1
inet6 fe80::20c:29ff:fe90:7f17/64 scope link
valid_lft forever preferred_lft forever
Седьмой шаг: [резюме] изменение файла конфигурации.
Keepalived главный и резервный файлы конфигурации отличаются:
HAProxy обеспечивает высокую доступность, балансировку нагрузки и проксирование на основе TCP и HTTP-приложений. Он является бесплатным, быстрым и надёжным решением. HAProxy особенно подходит для веб-сайтов с высокой нагрузкой, которым обычно требуется сохранение сеансов или семиуровневая обработка. HAProxy работает на текущем оборудовании и может поддерживать тысячи одновременных подключений. Его режим работы делает его простым, безопасным и легко интегрируемым в существующую архитектуру, защищая при этом ваши веб-серверы от раскрытия в сети.
Так называемая четырёхслойная относится к четвёртому уровню эталонной модели ISO. Четырёхслойный балансировщик нагрузки также называется четырёхслойным коммутатором. Он в основном основан на анализе потоков на уровне IP и TCP/UDP для реализации балансировки нагрузки на основе IP и порта. Распространёнными четырёхслойными балансировщиками нагрузки являются LVS и F5.
Четырёхслойная балансировка нагрузки использует информацию об IP-адресе и номере порта для распределения трафика между серверами. Это позволяет эффективно распределять нагрузку между несколькими серверами, обеспечивая высокую производительность и доступность системы.
На примере типичного TCP-приложения, когда клиент отправляет SYN-запрос на балансировщик нагрузки, последний выбирает оптимальный сервер на основе настроенного алгоритма балансировки нагрузки. Затем он изменяет целевой IP-адрес в пакете на IP-адрес выбранного сервера и перенаправляет пакет на этот сервер. Таким образом, запрос успешно направляется на выбранный сервер, и процесс балансировки завершается.
В некоторых стратегиях балансировки нагрузки для обеспечения корректной передачи пакетов от выбранного сервера обратно к клиенту, балансировщик может также изменять исходный IP-адрес пакета. Это делается для того, чтобы клиент мог получить ответ от правильного сервера.
Семислойный балансировщик нагрузки (также известный как семислойный коммутатор) работает на самом высоком уровне OSI — прикладном уровне. Он поддерживает различные протоколы приложений, такие как HTTP, FTP, SMTP и другие. Семислойные балансировщики нагрузки могут анализировать содержимое пакетов и использовать эту информацию для более точного распределения нагрузки между серверами на основе конкретных требований приложения. Например, для балансировки нагрузки веб-сервера семислойный балансировщик может учитывать URL-адреса, домены, браузеры и языки запросов при принятии решения о том, какой сервер должен обрабатывать конкретный запрос.
Распространёнными семислойными балансировщиками нагрузки являются HAproxy, Nginx и другие.
Здесь мы снова рассмотрим пример с TCP-приложением. В отличие от четырёхслойной балансировки, семислойному балансировщику необходимо получать содержимое пакета для принятия решения о выборе сервера. Поэтому он сначала устанавливает соединение с клиентом и сервером, а затем получает содержимое пакета от клиента. На основе этой информации и настроенного алгоритма балансировки он определяет, на какой внутренний сервер направить запрос. Этот процесс аналогичен работе прокси-сервера.
При сравнении процессов четырёхслойной и семислойной балансировок нагрузки можно заметить, что в случае семислойной балансировки нагрузка на оборудование балансировщика выше, но при этом обеспечивается более точное распределение нагрузки в соответствии с требованиями приложения.
Четырёхслойная и семислойная балансировки нагрузки
Сравнение процессов четырёхслойной и семислойной балансировок показывает, что при использовании семислойного подхода балансировщик устанавливает два соединения — с клиентом и с внутренним сервером. В то время как при четырёхслойном подходе устанавливается только одно соединение. Это означает, что требования к оборудованию для семислойной балансировки выше, а производительность ниже, чем у четырёхслойных систем.
Существует несколько стратегий балансировки нагрузки, которые используются в зависимости от требований и особенностей системы:
HAProxy и LVS — это два разных решения для балансировки нагрузки. Они имеют свои преимущества и недостатки, и выбор между ними зависит от конкретных потребностей и условий использования. Вот некоторые из основных различий между ними:
Таким образом, выбор между HAProxy и LVS зависит от ваших конкретных требований и условий эксплуатации.
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )