1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/throwableDoge-canal-glue

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
README.md

Упрощение ETL-работы, написание слоя клея Canal

Предпосылки

Это статья, которую я долго откладывал и всё время забывал написать. В статье, возможно, будет много воды, но она подробно описывает, как написать небольшой «фреймворк». Этот лаконичный слой клея уже более полугода служит в производственной среде, здесь мы попытались убрать код, связанный с бизнес-логикой, и выделить относительно простую версию.

В предыдущих статьях упоминалось, что Canal анализирует события binlog MySQL после того, как они были получены (источник: исходный код Canal com.alibaba.otter.canal.protocol.FlatMessage):

<объект, полученный после анализа Canal событий binlog MySQL (источник: исходный код Canal com.alibaba.otter.canal.protocol.FlatMessage)>

Если напрямую анализировать этот исходный объект, то придётся писать много шаблонного кода, который будет меняться при любом изменении, а это нежелательно. Поэтому было потрачено некоторое время на создание слоя клея Canal, который преобразует полученные FlatMessage в соответствующие экземпляры DTO в соответствии с именем таблицы, что может повысить эффективность разработки и уменьшить количество шаблонного кода. Схема потока данных этого слоя клея выглядит следующим образом:

<Схема потока данных слоя клея>

Для создания такого слоя клея используются следующие методы:

  • Рефлексия.
  • Аннотации.
  • Стратегический паттерн.
  • IOC-контейнер (необязательно).

Проект состоит из следующих модулей:

  • canal-glue-core: основные функции.
  • spring-boot-starter-canal-glue: адаптация IOC-контейнера Spring и добавление автоматической конфигурации.
  • canal-glue-example: примеры использования и базовые тесты.

Далее будет подробно рассмотрен процесс реализации этого слоя клея.

Введение зависимостей

Чтобы не загрязнять внешние зависимости сервисов, которые используют этот модуль, все зависимости, кроме тех, которые связаны с преобразованием JSON, определяются как scope provide или test, версии зависимостей и BOM приведены ниже:

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spring.boot.version>2.3.0.RELEASE</spring.boot.version>
        <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
        <lombok.version>1.18.12</lombok.version>
        <fastjson.version>1.2.73</fastjson.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <scope>import</scope>
            <type>pom</type>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
</dependencies>

Здесь модуль canal-glue-core по сути зависит только от fastjson и может быть полностью отделён от системы Spring.

Базовая архитектура

Здесь представлена «задним числом» архитектурная схема, поскольку изначально для быстрого запуска в производство первоначальная версия не учитывала многие аспекты, и даже включала в себя код, привязанный к бизнес-логике, компоненты были выделены позже:

<Архитектурная схема>

Модуль проектирования конфигурации (уже удалён)

При разработке модуля конфигурации рассматривались два способа: использование внешних конфигурационных файлов и аннотаций. Первоначально использовался формат JSON для внешних конфигурационных файлов, а аннотации были добавлены позже. В этом разделе кратко описывается загрузка конфигурации с использованием формата JSON, анализ аннотаций оставлен для модуля обработчика.

Изначально целью было быстро разработать слой клея, поэтому конфигурация использовала формат JSON с высокой читаемостью:

{
  "version": 1,
  "module": "canal-glue",
  "databases": [
    {
      "database": "db_payment_service",
      "processors": [
        {
          "table": "payment_order",
          "processor": "x.y.z.PaymentOrderProcessor",
          "exceptionHandler": "x.y.z.PaymentOrderExceptionHandler"
        }
      ]
    },
    {
      ......
    }
  ]
}

В дизайне конфигурации рекомендуется избегать использования массивов JSON в качестве верхнего уровня конфигурации, так как это может привести к странному дизайну объектов.

Поскольку приложения, использующие этот модуль, могут обрабатывать события binlog для нескольких вышестоящих баз данных, конфигурация модуля должна быть организована таким образом, чтобы можно было легко найти информацию о каждой базе данных. Конфигурация включает в себя имя базы данных, обработчики таблиц и обработчики исключений для каждой таблицы. Затем эти данные преобразуются в соответствующие классы сущностей:

@Data
public class CanalGlueProcessorConf {

    private String table;

    private String processor;

    private String exceptionHandler;
}

@Data
public class CanalGlueDatabaseConf {

    private String database;

    private List<CanalGlueProcessorConf> processors;
}

@Data
public class CanalGlueConf {

    private Long version;

    private String module;

    private List<CanalGlueDatabaseConf> database;
}

После создания классов сущностей можно приступить к написанию загрузчика конфигурации, который загружает конфигурацию из пути ClassPath:

public interface CanalGlueConfLoader {

    CanalGlueConf load(String location);
}

// Реализация
public class ClassPathCanalGlueConfLoader implements CanalGlueConfLoader {
``` **Чтение файла с абсолютным путём в `ClassPath` и преобразование его содержимого в объект `CanalGlueConf` с использованием `Fasfjson`.**

Этот код представляет собой реализацию по умолчанию, которая может быть перекрыта пользовательской реализацией при использовании модуля `canal-glue`.

**Разработка основных модулей:**
* **Основные модели:** определение уникального идентификатора для конкретной таблицы в базе данных.
* **Адаптеры:** разработка слоя адаптеров.
* **Конвертеры и парсеры:** разработка слоёв конвертеров и парсеров.
* **Обработчики:** разработка слоя обработчиков.
* **Глобальные компоненты:** разработка модуля автоматической конфигурации глобальных компонентов (ограничено системой Spring, извлечено в модуль `spring-boot-starter-canal-glue`).
* **`CanalGlue`:** разработка компонента `CanalGlue`.

**Определение основных моделей:**

Определение интерфейса верхнего уровня `ModelTable`, который служит для идентификации конкретной таблицы в определённой базе данных:
```java
// Модельная таблица объектов
public interface ModelTable {
    String database();
    String table();

    static ModelTable of(String database, String table) {
        return DefaultModelTable.of(database, table);
    }
}

Класс DefaultModelTable реализует интерфейс ModelTable:

@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")
public class DefaultModelTable implements ModelTable {

    private final String database;
    private final String table;

    @Override
    public String database() {
        return database;
    }

    @Override
    public String table() {
        return table;
    }

    // ...
}

Здесь класс DefaultModelTable переопределяет методы equals() и hashCode() для удобства использования экземпляров ModelTable в качестве ключей в контейнерах типа HashMap. Это позволяет создать структуру кэша ModelTable -> Processor.

Также определяется класс событий CanalBinLogEvent, который похож на класс FlatMessage:

@Data
public class CanalBinLogEvent {

    /**
     * Идентификатор события, не имеет практического значения
     */
    private Long id;

    /**
     * Текущие данные узла после изменения
     */
    private List<Map<String, String>> data;

    /**
     * Список имён столбцов первичного ключа
     */
    private List<String> pkNames;

    /**
     * Предыдущие данные узла до изменения
     */
    private List<Map<String, String>> old;

    /**
     * Тип: UPDATE\INSERT\DELETE\QUERY
     */
    private String type;

    /**
     * Время выполнения binlog
     */
    private Long es;

    /**
     * Отметка времени построения DML
     */
    private Long ts;

    /**
     * Выполняемый SQL, необязательно существует
     */
    private String sql;

    /**
     * Имя базы данных
     */
    private String database;

    /**
     * Название таблицы
     */
    private String table;

    /**
     * Отображение типов SQL
     */
    private Map<String, Integer> sqlType;

    /**
     * Отображение типов MySQL
     */
    private Map<String, String> mysqlType;

    /**
    * Является ли это операцией DDL
    */
    private Boolean isDdl;
}

После анализа события создаётся объект результата CanalBinLogResult:

// Константы
@RequiredArgsConstructor
@Getter
public enum BinLogEventType {

    QUERY("QUERY", "запрос"),

    INSERT("INSERT", "вставка"),

    UPDATE("UPDATE", "обновление"),

    DELETE("DELETE", "удаление"),

    ALTER("ALTER", "изменение столбца"),

    UNKNOWN("UNKNOWN", "неизвестно"),

    ;

    private final String type;
    private final String description;

    public static BinLogEventType fromType(String type) {
        for (BinLogEventType binLogType : BinLogEventType.values()) {
            if (binLogType.getType().equals(type)) {
                return binLogType;
            }
        }
        return UNKNOWN;
    }
}

// Константы
@RequiredArgsConstructor
@Getter
public enum OperationType {

    DML("dml", "DML-оператор"),

    DDL("ddl", "DDL-оператор"),
    ;

    private final String type;
    private final String description;
}

@Data
public class CanalBinLogResult<T> {
``` **Разработка адаптера слоя**

Определяется верхний слой адаптера SPI интерфейса:  
```java
public interface SourceAdapter<SOURCE, SINK> {
    SINK adapt(SOURCE source);
}

Далее разрабатывается адаптер реализации класса:

// Исходная строка возвращается напрямую
@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")
class RawStringSourceAdapter implements SourceAdapter<String, String> {

    @Override
    public String adapt(String source) {
        return source;
    }
}

// Преобразование Fastjson
@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")
class FastJsonSourceAdapter<T> implements SourceAdapter<String, T> {

    private final Class<T> klass;

    @Override
    public T adapt(String source) {
        if (StringUtils.isEmpty(source)) {
            return null;
        }
        return JSON.parseObject(source, klass);
    }
}

// Фасад
public enum SourceAdapterFacade {

    /**
     * Одиночный экземпляр
     */
    X;

    private static final SourceAdapter<String, String> I_S_A = RawStringSourceAdapter.of();

    @SuppressWarnings("unchecked")
    public <T> T adapt(Class<T> klass, String source) {
        if (klass.isAssignableFrom(String.class)) {
            return (T) I_S_A.adapt(source);
        }
        return FastJsonSourceAdapter.of(klass).adapt(source);
    }
}

В конечном итоге можно напрямую использовать метод SourceAdapterFacade#adapt(), так как в большинстве случаев используется только исходная строка и String -> Class экземпляр, и дизайн адаптера слоя может быть упрощён.

Разработка преобразователя и анализатора слоя

Для события binlog, обработанного Canal, данные и old атрибуты представляют собой структуру K-V, а все ключи являются строковыми типами, необходимо перебрать и вывести полный целевой экземпляр.

В настоящее время поддерживаются только типы объектов-оболочек, исходные типы, такие как int, не поддерживаются.

Чтобы лучше сопоставить целевые сущности с фактическими базами данных, таблицами и столбцами, а также типами столбцов, были введены две пользовательские аннотации CanalModel и @CanalField, которые определяются следующим образом:

// @CanalModel
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface CanalModel {

    /**
     * Целевая база данных
     */
    String database();

    /**
     * Целевая таблица
     */
    String table();

    /**
    * Стратегия преобразования имени атрибута -> имени столбца, доступны следующие значения: DEFAULT (исходный), UPPER_UNDERSCORE (преобразование верблюжьего регистра в нижний регистр с подчёркиванием) и LOWER_UNDERSCORE (преобразование верблюжьего регистра в нижний регистр без подчёркивания)
    */
    FieldNamingPolicy fieldNamingPolicy() default FieldNamingPolicy.DEFAULT;
}

// @CanalField
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface CanalField {

    /**
     * Имя строки
     *
     * @return columnName
     */
    String columnName() default "";

    /**
     * Тип поля SQL
     *
     * @return JDBCType
     */
    JDBCType sqlType() default JDBCType.NULL;

    /**
     * Тип конвертера
     *
     * @return klass
     */
    Class<? extends BaseCanalFieldConverter<?>> converterKlass() default NullCanalFieldConverter.class;
}

Верхний слой преобразователя определяется интерфейсом BinLogFieldConverter:

public interface BinLogFieldConverter<SOURCE, TARGET> {

    TARGET convert(SOURCE source);
}

На данный момент можно сопоставить только по типу целевого атрибута и типу SQL, указанному в аннотации, поэтому мы определяем абстрактный преобразователь BaseCanalFieldConverter:

public abstract class BaseCanalFieldConverter<T> implements BinLogFieldConverter<String, T> {

    private final SQLType sqlType;
    private final Class<?> klass;

    protected BaseCanalFieldConverter(SQLType sqlType, Class<?> klass) {
        this.sqlType = sqlType;
        this.klass = klass;
    }

    @Override
    public T convert(String source) {
        if (StringUtils.isEmpty(source)) {
            return null;
        }
        return convertInternal(source);
    }

    /**
     * Внутренний метод преобразования
     *
     * @param source Исходная строка
     * @return T
     */
    protected abstract T convertInternal(String source);

    /**
     * Возвращает тип SQL
     *
     * @return SQLType
     */
    public SQLType sqlType() {
        return sqlType;
    }

    /**
     * Возвращает класс
     *
     * @return Class<?>
     */
    public Class<?> typeKlass() {
        return klass;
    }}
``` **protected ExceptionHandler exceptionHandler() {**
    **return EXCEPTION_HANDLER;**
**}**

/**
 * 覆盖默认的ExceptionHandler.NO_OP
 */
private static final ExceptionHandler EXCEPTION_HANDLER = (event, throwable)
    -> log.error("解析binlog事件出现异常,事件内容:{}", JSON.toJSONString(event), throwable);

**另外有些场景需要对回调前或者回调后的结果做特化处理因此引入了解析结果拦截器的实现对应的类是 BaseParseResultInterceptor:**

public abstract class BaseParseResultInterceptor<T> extends BaseParameterizedTypeReferenceSupport<T> {

    public BaseParseResultInterceptor() {
        super();
    }

    public void onParse(ModelTable modelTable) {

    }

    public void onBeforeInsertProcess(ModelTable modelTable, T beforeData, T afterData) {

    }

    public void onAfterInsertProcess(ModelTable modelTable, T beforeData, T afterData) {

    }

    public void onBeforeUpdateProcess(ModelTable modelTable, T beforeData, T afterData) {

    }

    public void onAfterUpdateProcess(ModelTable modelTable, T beforeData, T afterData) {

    }

    public void onBeforeDeleteProcess(ModelTable modelTable, T beforeData, T afterData) {

    }

    public void onAfterDeleteProcess(ModelTable modelTable, T beforeData, T afterData) {

    }

    public void onBeforeDDLProcess(ModelTable modelTable, T beforeData, T afterData, String sql) {

    }

    public void onAfterDDLProcess(ModelTable modelTable, T beforeData, T afterData, String sql) {

    }

    public void onParseFinish(ModelTable modelTable) {

    }

    public void onParseCompletion(ModelTable modelTable) {

    }
}

**解析结果拦截器的回调时机可以参看上面的架构图或者 BaseCanalBinlogEventProcessor的源代码**

### 开发全局组件自动配置模块

如果使用了 Spring контейнера, необходимо добавить один конфигурационный класс для загрузки всех существующих компонентов и добавить один глобальный конфигурационный класс CanalGlueAutoConfiguration (этот класс можно увидеть в проекте spring-boot-starter-canal-glue, этот модуль содержит только один класс):

@Configuration
public class CanalGlueAutoConfiguration implements SmartInitializingSingleton, BeanFactoryAware {

    private ConfigurableListableBeanFactory configurableListableBeanFactory;

    @Bean
    @ConditionalOnMissingBean
    public CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory() {
        return InMemoryCanalBinlogEventProcessorFactory.of();
    }

    @Bean
    @ConditionalOnMissingBean
    public ModelTableMetadataManager modelTableMetadataManager(CanalFieldConverterFactory canalFieldConverterFactory) {
        return InMemoryModelTableMetadataManager.of(canalFieldConverterFactory);
    }

    @Bean
    @ConditionalOnMissingBean
    public CanalFieldConverterFactory canalFieldConverterFactory() {
        return InMemoryCanalFieldConverterFactory.of();
    }

    @Bean
    @ConditionalOnMissingBean
    public CanalBinLogEventParser canalBinLogEventParser() {
        return DefaultCanalBinLogEventParser.of();
    }

    @Bean
    @ConditionalOnMissingBean
    public ParseResultInterceptorManager parseResultInterceptorManager(ModelTableMetadataManager modelTableMetadataManager) {
        return InMemoryParseResultInterceptorManager.of(modelTableMetadataManager);
    }

    @Bean
    @Primary
    public CanalGlue canalGlue(CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory) {
        return DefaultCanalGlue.of(canalBinlogEventProcessorFactory);
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.configurableListableBeanFactory = (ConfigurableListableBeanFactory) beanFactory;
    }

    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    public void afterSingletonsInstantiated() {
        ParseResultInterceptorManager parseResultInterceptorManager
                = configurableListableBeanFactory.getBean(ParseResultInterceptorManager.class);
        ModelTableMetadataManager modelTableMetadataManager
                = configurableListableBeanFactory.getBean(ModelTableMetadataManager.class);
        CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory
                = configurableListableBeanFactory.getBean(CanalBinlogEventProcessorFactory.class);
        CanalBinLogEventParser canalBinLogEventParser
                =

*Примечание: В данном тексте не удалось перевести некоторые фрагменты, так как они содержат специальные символы или непечатаемые символы.* ```
create_time": 93
  },
  "table": "t_order",
  "ts": 1583143969460,
  "type": "INSERT"
}

Результат выполнения:

Если напрямую связать Canal с Kafka и отправить данные в Topic, это также довольно просто. Пример использования вместе с потребителями Kafka:

@Slf4j
@Component
@RequiredArgsConstructor
public class CanalEventListeners {

    private final CanalGlue canalGlue;

    @KafkaListener(
            id = "${canal.event.order.listener.id:db-order-service-listener}",
            topics = "db_order_service", 
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void onCrmMessage(String content) {
        canalGlue.process(content);
    }    
}

Заключение

Автор разработал canal-glue с целью создания высокоэффективного конвертера строк, поскольку он только начал работать с областью «малых данных», а человеческих ресурсов не хватало. Кроме того, требовалось обрабатывать большое количество отчётов, и было невозможно тратить много времени на обработку повторяющихся шаблонов кода. Хотя общий дизайн ещё не идеален, по крайней мере, в плане повышения эффективности разработки, canal-glue справился со своей задачей.

Репозиторий проекта:

Последний код репозитория временно находится в ветке develop.

(Эта статья завершена c-15-d e-a-20201005, но была отложена почти на месяц.)

Бонус

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

Упростите работу ETL, написав слой клея Canal. Развернуть Свернуть
MIT
Отмена

Обновления

Пока нет обновлений

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://api.gitlife.ru/oschina-mirror/throwableDoge-canal-glue.git
git@api.gitlife.ru:oschina-mirror/throwableDoge-canal-glue.git
oschina-mirror
throwableDoge-canal-glue
throwableDoge-canal-glue
master