Это статья, которую я долго откладывал и всё время забывал написать. В статье, возможно, будет много воды, но она подробно описывает, как написать небольшой «фреймворк». Этот лаконичный слой клея уже более полугода служит в производственной среде, здесь мы попытались убрать код, связанный с бизнес-логикой, и выделить относительно простую версию.
В предыдущих статьях упоминалось, что Canal анализирует события binlog MySQL после того, как они были получены (источник: исходный код Canal com.alibaba.otter.canal.protocol.FlatMessage):
<объект, полученный после анализа Canal событий binlog MySQL (источник: исходный код Canal com.alibaba.otter.canal.protocol.FlatMessage)>
Если напрямую анализировать этот исходный объект, то придётся писать много шаблонного кода, который будет меняться при любом изменении, а это нежелательно. Поэтому было потрачено некоторое время на создание слоя клея Canal, который преобразует полученные FlatMessage в соответствующие экземпляры DTO в соответствии с именем таблицы, что может повысить эффективность разработки и уменьшить количество шаблонного кода. Схема потока данных этого слоя клея выглядит следующим образом:
<Схема потока данных слоя клея>
Для создания такого слоя клея используются следующие методы:
Проект состоит из следующих модулей:
Далее будет подробно рассмотрен процесс реализации этого слоя клея.
Чтобы не загрязнять внешние зависимости сервисов, которые используют этот модуль, все зависимости, кроме тех, которые связаны с преобразованием JSON, определяются как scope provide или test, версии зависимостей и BOM приведены ниже:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<spring.boot.version>2.3.0.RELEASE</spring.boot.version>
<maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
<lombok.version>1.18.12</lombok.version>
<fastjson.version>1.2.73</fastjson.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.boot.version}</version>
<scope>import</scope>
<type>pom</type>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
</dependencies>
Здесь модуль canal-glue-core по сути зависит только от fastjson и может быть полностью отделён от системы Spring.
Здесь представлена «задним числом» архитектурная схема, поскольку изначально для быстрого запуска в производство первоначальная версия не учитывала многие аспекты, и даже включала в себя код, привязанный к бизнес-логике, компоненты были выделены позже:
<Архитектурная схема>
При разработке модуля конфигурации рассматривались два способа: использование внешних конфигурационных файлов и аннотаций. Первоначально использовался формат JSON для внешних конфигурационных файлов, а аннотации были добавлены позже. В этом разделе кратко описывается загрузка конфигурации с использованием формата JSON, анализ аннотаций оставлен для модуля обработчика.
Изначально целью было быстро разработать слой клея, поэтому конфигурация использовала формат JSON с высокой читаемостью:
{
"version": 1,
"module": "canal-glue",
"databases": [
{
"database": "db_payment_service",
"processors": [
{
"table": "payment_order",
"processor": "x.y.z.PaymentOrderProcessor",
"exceptionHandler": "x.y.z.PaymentOrderExceptionHandler"
}
]
},
{
......
}
]
}
В дизайне конфигурации рекомендуется избегать использования массивов JSON в качестве верхнего уровня конфигурации, так как это может привести к странному дизайну объектов.
Поскольку приложения, использующие этот модуль, могут обрабатывать события binlog для нескольких вышестоящих баз данных, конфигурация модуля должна быть организована таким образом, чтобы можно было легко найти информацию о каждой базе данных. Конфигурация включает в себя имя базы данных, обработчики таблиц и обработчики исключений для каждой таблицы. Затем эти данные преобразуются в соответствующие классы сущностей:
@Data
public class CanalGlueProcessorConf {
private String table;
private String processor;
private String exceptionHandler;
}
@Data
public class CanalGlueDatabaseConf {
private String database;
private List<CanalGlueProcessorConf> processors;
}
@Data
public class CanalGlueConf {
private Long version;
private String module;
private List<CanalGlueDatabaseConf> database;
}
После создания классов сущностей можно приступить к написанию загрузчика конфигурации, который загружает конфигурацию из пути ClassPath:
public interface CanalGlueConfLoader {
CanalGlueConf load(String location);
}
// Реализация
public class ClassPathCanalGlueConfLoader implements CanalGlueConfLoader {
``` **Чтение файла с абсолютным путём в `ClassPath` и преобразование его содержимого в объект `CanalGlueConf` с использованием `Fasfjson`.**
Этот код представляет собой реализацию по умолчанию, которая может быть перекрыта пользовательской реализацией при использовании модуля `canal-glue`.
**Разработка основных модулей:**
* **Основные модели:** определение уникального идентификатора для конкретной таблицы в базе данных.
* **Адаптеры:** разработка слоя адаптеров.
* **Конвертеры и парсеры:** разработка слоёв конвертеров и парсеров.
* **Обработчики:** разработка слоя обработчиков.
* **Глобальные компоненты:** разработка модуля автоматической конфигурации глобальных компонентов (ограничено системой Spring, извлечено в модуль `spring-boot-starter-canal-glue`).
* **`CanalGlue`:** разработка компонента `CanalGlue`.
**Определение основных моделей:**
Определение интерфейса верхнего уровня `ModelTable`, который служит для идентификации конкретной таблицы в определённой базе данных:
```java
// Модельная таблица объектов
public interface ModelTable {
String database();
String table();
static ModelTable of(String database, String table) {
return DefaultModelTable.of(database, table);
}
}
Класс DefaultModelTable
реализует интерфейс ModelTable
:
@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")
public class DefaultModelTable implements ModelTable {
private final String database;
private final String table;
@Override
public String database() {
return database;
}
@Override
public String table() {
return table;
}
// ...
}
Здесь класс DefaultModelTable
переопределяет методы equals()
и hashCode()
для удобства использования экземпляров ModelTable
в качестве ключей в контейнерах типа HashMap
. Это позволяет создать структуру кэша ModelTable -> Processor
.
Также определяется класс событий CanalBinLogEvent
, который похож на класс FlatMessage
:
@Data
public class CanalBinLogEvent {
/**
* Идентификатор события, не имеет практического значения
*/
private Long id;
/**
* Текущие данные узла после изменения
*/
private List<Map<String, String>> data;
/**
* Список имён столбцов первичного ключа
*/
private List<String> pkNames;
/**
* Предыдущие данные узла до изменения
*/
private List<Map<String, String>> old;
/**
* Тип: UPDATE\INSERT\DELETE\QUERY
*/
private String type;
/**
* Время выполнения binlog
*/
private Long es;
/**
* Отметка времени построения DML
*/
private Long ts;
/**
* Выполняемый SQL, необязательно существует
*/
private String sql;
/**
* Имя базы данных
*/
private String database;
/**
* Название таблицы
*/
private String table;
/**
* Отображение типов SQL
*/
private Map<String, Integer> sqlType;
/**
* Отображение типов MySQL
*/
private Map<String, String> mysqlType;
/**
* Является ли это операцией DDL
*/
private Boolean isDdl;
}
После анализа события создаётся объект результата CanalBinLogResult
:
// Константы
@RequiredArgsConstructor
@Getter
public enum BinLogEventType {
QUERY("QUERY", "запрос"),
INSERT("INSERT", "вставка"),
UPDATE("UPDATE", "обновление"),
DELETE("DELETE", "удаление"),
ALTER("ALTER", "изменение столбца"),
UNKNOWN("UNKNOWN", "неизвестно"),
;
private final String type;
private final String description;
public static BinLogEventType fromType(String type) {
for (BinLogEventType binLogType : BinLogEventType.values()) {
if (binLogType.getType().equals(type)) {
return binLogType;
}
}
return UNKNOWN;
}
}
// Константы
@RequiredArgsConstructor
@Getter
public enum OperationType {
DML("dml", "DML-оператор"),
DDL("ddl", "DDL-оператор"),
;
private final String type;
private final String description;
}
@Data
public class CanalBinLogResult<T> {
``` **Разработка адаптера слоя**
Определяется верхний слой адаптера SPI интерфейса:
```java
public interface SourceAdapter<SOURCE, SINK> {
SINK adapt(SOURCE source);
}
Далее разрабатывается адаптер реализации класса:
// Исходная строка возвращается напрямую
@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")
class RawStringSourceAdapter implements SourceAdapter<String, String> {
@Override
public String adapt(String source) {
return source;
}
}
// Преобразование Fastjson
@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")
class FastJsonSourceAdapter<T> implements SourceAdapter<String, T> {
private final Class<T> klass;
@Override
public T adapt(String source) {
if (StringUtils.isEmpty(source)) {
return null;
}
return JSON.parseObject(source, klass);
}
}
// Фасад
public enum SourceAdapterFacade {
/**
* Одиночный экземпляр
*/
X;
private static final SourceAdapter<String, String> I_S_A = RawStringSourceAdapter.of();
@SuppressWarnings("unchecked")
public <T> T adapt(Class<T> klass, String source) {
if (klass.isAssignableFrom(String.class)) {
return (T) I_S_A.adapt(source);
}
return FastJsonSourceAdapter.of(klass).adapt(source);
}
}
В конечном итоге можно напрямую использовать метод SourceAdapterFacade#adapt(), так как в большинстве случаев используется только исходная строка и String -> Class экземпляр, и дизайн адаптера слоя может быть упрощён.
Разработка преобразователя и анализатора слоя
Для события binlog, обработанного Canal, данные и old атрибуты представляют собой структуру K-V, а все ключи являются строковыми типами, необходимо перебрать и вывести полный целевой экземпляр.
В настоящее время поддерживаются только типы объектов-оболочек, исходные типы, такие как int, не поддерживаются.
Чтобы лучше сопоставить целевые сущности с фактическими базами данных, таблицами и столбцами, а также типами столбцов, были введены две пользовательские аннотации CanalModel и @CanalField, которые определяются следующим образом:
// @CanalModel
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface CanalModel {
/**
* Целевая база данных
*/
String database();
/**
* Целевая таблица
*/
String table();
/**
* Стратегия преобразования имени атрибута -> имени столбца, доступны следующие значения: DEFAULT (исходный), UPPER_UNDERSCORE (преобразование верблюжьего регистра в нижний регистр с подчёркиванием) и LOWER_UNDERSCORE (преобразование верблюжьего регистра в нижний регистр без подчёркивания)
*/
FieldNamingPolicy fieldNamingPolicy() default FieldNamingPolicy.DEFAULT;
}
// @CanalField
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface CanalField {
/**
* Имя строки
*
* @return columnName
*/
String columnName() default "";
/**
* Тип поля SQL
*
* @return JDBCType
*/
JDBCType sqlType() default JDBCType.NULL;
/**
* Тип конвертера
*
* @return klass
*/
Class<? extends BaseCanalFieldConverter<?>> converterKlass() default NullCanalFieldConverter.class;
}
Верхний слой преобразователя определяется интерфейсом BinLogFieldConverter:
public interface BinLogFieldConverter<SOURCE, TARGET> {
TARGET convert(SOURCE source);
}
На данный момент можно сопоставить только по типу целевого атрибута и типу SQL, указанному в аннотации, поэтому мы определяем абстрактный преобразователь BaseCanalFieldConverter:
public abstract class BaseCanalFieldConverter<T> implements BinLogFieldConverter<String, T> {
private final SQLType sqlType;
private final Class<?> klass;
protected BaseCanalFieldConverter(SQLType sqlType, Class<?> klass) {
this.sqlType = sqlType;
this.klass = klass;
}
@Override
public T convert(String source) {
if (StringUtils.isEmpty(source)) {
return null;
}
return convertInternal(source);
}
/**
* Внутренний метод преобразования
*
* @param source Исходная строка
* @return T
*/
protected abstract T convertInternal(String source);
/**
* Возвращает тип SQL
*
* @return SQLType
*/
public SQLType sqlType() {
return sqlType;
}
/**
* Возвращает класс
*
* @return Class<?>
*/
public Class<?> typeKlass() {
return klass;
}}
``` **protected ExceptionHandler exceptionHandler() {**
**return EXCEPTION_HANDLER;**
**}**
/**
* 覆盖默认的ExceptionHandler.NO_OP
*/
private static final ExceptionHandler EXCEPTION_HANDLER = (event, throwable)
-> log.error("解析binlog事件出现异常,事件内容:{}", JSON.toJSONString(event), throwable);
**另外,有些场景需要对回调前或者回调后的结果做特化处理,因此引入了解析结果拦截器(链)的实现,对应的类是 BaseParseResultInterceptor:**
public abstract class BaseParseResultInterceptor<T> extends BaseParameterizedTypeReferenceSupport<T> {
public BaseParseResultInterceptor() {
super();
}
public void onParse(ModelTable modelTable) {
}
public void onBeforeInsertProcess(ModelTable modelTable, T beforeData, T afterData) {
}
public void onAfterInsertProcess(ModelTable modelTable, T beforeData, T afterData) {
}
public void onBeforeUpdateProcess(ModelTable modelTable, T beforeData, T afterData) {
}
public void onAfterUpdateProcess(ModelTable modelTable, T beforeData, T afterData) {
}
public void onBeforeDeleteProcess(ModelTable modelTable, T beforeData, T afterData) {
}
public void onAfterDeleteProcess(ModelTable modelTable, T beforeData, T afterData) {
}
public void onBeforeDDLProcess(ModelTable modelTable, T beforeData, T afterData, String sql) {
}
public void onAfterDDLProcess(ModelTable modelTable, T beforeData, T afterData, String sql) {
}
public void onParseFinish(ModelTable modelTable) {
}
public void onParseCompletion(ModelTable modelTable) {
}
}
**解析结果拦截器的回调时机可以参看上面的架构图或者 BaseCanalBinlogEventProcessor的源代码。**
### 开发全局组件自动配置模块
如果使用了 Spring контейнера, необходимо добавить один конфигурационный класс для загрузки всех существующих компонентов и добавить один глобальный конфигурационный класс CanalGlueAutoConfiguration (этот класс можно увидеть в проекте spring-boot-starter-canal-glue, этот модуль содержит только один класс):
@Configuration
public class CanalGlueAutoConfiguration implements SmartInitializingSingleton, BeanFactoryAware {
private ConfigurableListableBeanFactory configurableListableBeanFactory;
@Bean
@ConditionalOnMissingBean
public CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory() {
return InMemoryCanalBinlogEventProcessorFactory.of();
}
@Bean
@ConditionalOnMissingBean
public ModelTableMetadataManager modelTableMetadataManager(CanalFieldConverterFactory canalFieldConverterFactory) {
return InMemoryModelTableMetadataManager.of(canalFieldConverterFactory);
}
@Bean
@ConditionalOnMissingBean
public CanalFieldConverterFactory canalFieldConverterFactory() {
return InMemoryCanalFieldConverterFactory.of();
}
@Bean
@ConditionalOnMissingBean
public CanalBinLogEventParser canalBinLogEventParser() {
return DefaultCanalBinLogEventParser.of();
}
@Bean
@ConditionalOnMissingBean
public ParseResultInterceptorManager parseResultInterceptorManager(ModelTableMetadataManager modelTableMetadataManager) {
return InMemoryParseResultInterceptorManager.of(modelTableMetadataManager);
}
@Bean
@Primary
public CanalGlue canalGlue(CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory) {
return DefaultCanalGlue.of(canalBinlogEventProcessorFactory);
}
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.configurableListableBeanFactory = (ConfigurableListableBeanFactory) beanFactory;
}
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public void afterSingletonsInstantiated() {
ParseResultInterceptorManager parseResultInterceptorManager
= configurableListableBeanFactory.getBean(ParseResultInterceptorManager.class);
ModelTableMetadataManager modelTableMetadataManager
= configurableListableBeanFactory.getBean(ModelTableMetadataManager.class);
CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory
= configurableListableBeanFactory.getBean(CanalBinlogEventProcessorFactory.class);
CanalBinLogEventParser canalBinLogEventParser
=
*Примечание: В данном тексте не удалось перевести некоторые фрагменты, так как они содержат специальные символы или непечатаемые символы.* ```
create_time": 93
},
"table": "t_order",
"ts": 1583143969460,
"type": "INSERT"
}
Результат выполнения:
Если напрямую связать Canal с Kafka и отправить данные в Topic, это также довольно просто. Пример использования вместе с потребителями Kafka:
@Slf4j
@Component
@RequiredArgsConstructor
public class CanalEventListeners {
private final CanalGlue canalGlue;
@KafkaListener(
id = "${canal.event.order.listener.id:db-order-service-listener}",
topics = "db_order_service",
containerFactory = "kafkaListenerContainerFactory"
)
public void onCrmMessage(String content) {
canalGlue.process(content);
}
}
Автор разработал canal-glue с целью создания высокоэффективного конвертера строк, поскольку он только начал работать с областью «малых данных», а человеческих ресурсов не хватало. Кроме того, требовалось обрабатывать большое количество отчётов, и было невозможно тратить много времени на обработку повторяющихся шаблонов кода. Хотя общий дизайн ещё не идеален, по крайней мере, в плане повышения эффективности разработки, canal-glue справился со своей задачей.
Репозиторий проекта:
Последний код репозитория временно находится в ветке develop.
(Эта статья завершена c-15-d e-a-20201005, но была отложена почти на месяц.)
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )