1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/Hu-Lyndon-streamsets-start-asset

В этом репозитории не указан файл с открытой лицензией (LICENSE). При использовании обратитесь к конкретному описанию проекта и его зависимостям в коде.
Клонировать/Скачать
2.StreamSets-api.md 44 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
Отправлено 09.06.2025 14:20 9a477ac

streamsets-api

├─base Основная реализация абстракции Stage │ └─configurablestage Конфигурируемый базовый класс ├─credential Сертификаты ├─delegate Делегирование │ └─exported ├─el ├─ext │ ├─io │ └─json ├─impl │ └─annotationsprocessor ├─interceptor Интерцепторы ├─lineage Линеаризация (определение происхождения данных) └─service ├─dataformats │ └─log └─sshtunnel

Основные определения интерфейсов

image-20200314162051872

javax.annotation.processing.Processor

com.streamsets.pipeline.api.impl.annotationsprocessor.PipelineAnnotationsProcessor
``````java
@SupportedAnnotationTypes({
    "com.streamsets.pipeline.api.StageDef",  используется для объявления классов стадий данных. Классы должны реализовывать источник, процессор или целевую точку.
    "com.streamsets.pipeline.api.GenerateResourceBundle", используется для пометки пакетов ресурсов, которые должны быть сгенерированы для стадий и связанных классов, которые имеют метки или сообщения, отображаемые в UI и требующие локализации.
    "com.streamsets.pipeline.api.ElDef", классы, аннотированные ElDef, автоматически сканируются для EL констант и определений EL функций, которые могут быть использованы во всех конфигурациях библиотеки стадий.
    "com.streamsets.pipeline.api.lineage.LineagePublisherDef",
    "com.streamsets.pipeline.api.credential.CredentialStoreDef",  используется для объявления реализации CredentialStore для автоматического обнаружения.
    "com.streamsets.pipeline.api.service.ServiceDef", используется для объявления классов сервисов данных. Сервисы должны реализовывать Service, а затем указывать интерфейс в свойствах предоставления.
    "com.streamsets.pipeline.api.delegate.StageLibraryDelegateDef", делегирование библиотеки стадий позволяет фреймворку данных обрабатывать некоторые основные функции "в сторону" библиотеки стадий, чтобы изолировать зависимости этих функций. Отправка заданий в кластер является хорошим примером.
    "com.streamsets.pipeline.api.interceptor.InterceptorDef"
})
```### datacollector-resource-bundles.json

```json
[
  "com.streamsets.pipeline.api.OnRecordError",  используется для определения обработки ошибок записей стадией через встроенные конфигурации "on record error".
  "com.streamsets.pipeline.api.StageDef$DefaultOutputStreams",
  "com.streamsets.pipeline.api.StageDef$VariableOutputStreams",
  "com.streamsets.pipeline.api.ExecutionMode",
  "com.streamsets.pipeline.api.DeliveryGuarantee",
  "com.streamsets.pipeline.api.base.Errors",
  "com.streamsets.pipeline.api.StageUpgrader"
]
package com.streamsets.pipeline.api;

/**
 * Перечисление, определяющее обработку ошибочных записей для стадий с помощью встроенной конфигурации 'on record error'.
 */
// мы используем аннотацию только для справочных целей.
// обработчик аннотаций не работает в этом maven-проекте
// у нас есть жестко закодированный файл 'datacollector-resource-bundles.json' в ресурсах
@GenerateResourceBundle
public enum OnRecordError implements Label {
  DISCARD("Discard"),
  TO_ERROR("Send to Error"),
  STOP_PIPELINE("Stop Pipeline"),
  ;

  private final String label;

  OnRecordError(String label) {
    this.label = label;
  }

  @Override
  public String getLabel() {
    return label;
  }

}
public @interface StageDef {

  //enum для процессоров, использующих конфигурации LanePredicateMapping
  // мы используем аннотацию только для справочных целей.
  // обработчик аннотаций не работает в этом maven-проекте
  // у нас есть жестко закодированный файл 'datacollector-resource-bundles.json' в ресурсах

  /**
   * Перечисление для использования в {@link #outputStreams()} для указания, что выходные потоки являются переменными, управляемыми конфигурацией стадии, указанной в {@link #outputStreamsDrivenByConfig()}.
   */
  @GenerateResourceBundle
  enum VariableOutputStreams implements Label {
  ;
``````java
package com.streamsets.pipeline.api;

/**
 * Определяет режимы выполнения конвейера.
 * <p/>
 * Используется в определениях этапов для ограничения режимов выполнения, которые поддерживает этап.
 * <p/>
 * Доступен через контекст этапа для определения текущего режима выполнения конвейера.
 * 
 * @see StageDef#execution()
 * @see Stage.Context#getExecutionMode()
 */
public enum ExecutionMode implements Label {
  STANDALONE("Standalone"),
  @Deprecated
  CLUSTER("Cluster"), // Сохранено для обратной совместимости - заменено CLUSTER_BATCH и CLUSTER_STREAMING
  CLUSTER_BATCH("Cluster Batch"),
  CLUSTER_YARN_STREAMING("Cluster Yarn Streaming"),
  CLUSTER_MESOS_STREAMING("Cluster Mesos Streaming"),
  SLAVE("Slave"),
  EDGE("Edge"),
  EMR_BATCH("Cluster EMR Batch"),
  BATCH("Batch"),
  STREAMING("Streaming"),
  ;

  private final String label;

  ExecutionMode(String label) {
    this.label = label;
  }

  @Override
  public String getLabel() {
    return label;
  }

}
package com.streamsets.pipeline.api;

/**
 * Доставка гарантии для конвейера.
 * В настоящее время используется только для источника PushSource.
 */
public enum DeliveryGuarantee implements Label {
  AT_LEAST_ONCE("At Least Once"),
  AT_MOST_ONCE("At Most Once");

  private final String label;

  DeliveryGuarantee(String label) {
    this.label = label;
  }

  @Override
  public String getLabel() {
    return label;
  }

}
package com.streamsets.pipeline.api.base;

import com.streamsets.pipeline.api.ErrorCode;
import com.streamsets.pipeline.api.GenerateResourceBundle;
```/**
 * Ошибки, возникающие в Data Collector при проблемах с конфигурацией этапа.
 */
@GenerateResourceBundle
public enum Errors implements ErrorCode {
  API_00("Этап '{}' требует ровно {} выходных потоков. Есть '{}'"),
  API_01("Невозможно преобразовать поле типа {} '{}' в Boolean"),
  API_02("Невозможно преобразовать поле типа {} '{}' в Byte[]"),
  API_03("Невозможно преобразовать Byte[] в {}"),
  API_04("Невозможно преобразовать поле типа {} '{}' в Byte"),
  API_05("Невозможно преобразовать поле типа {} '{}' в Char"),
  API_06("Невозможно преобразовать '{}' в дату. Используйте следующий формат даты ISO 8601 UTC: yyyy-MM-dd'T'HH:mm'Z'"),
  API_07("Невозможно преобразовать поле типа {} '{}' в Date"),
  API_08("Невозможно преобразовать поле типа {} '{}' в Decimal"),
  API_09("Невозможно преобразовать поле типа {} '{}' в Double"),
  API_10("Невозможно преобразовать поле типа {} '{}' в Float"),
  API_11("Невозможно преобразовать поле типа {} '{}' в Integer"),
  API_12("Невозможно преобразовать поле типа {} '{}' в List"),
  API_13("Невозможно преобразовать List в {}"),
  API_14("Невозможно преобразовать поле типа {} '{}' в Long"),
  API_15("Невозможно преобразовать поле типа {} '{}' в Map"),
  API_16("Невозможно преобразовать Map в {}"),
  API_17("Невозможно преобразовать поле типа {} '{}' в Short"),
  API_18("Невозможно преобразовать Map, List или Byte[] в String"),
}
``````java
package com.streamsets.pipeline.api;

import java.util.List;/**
 * Интерфейс <code>StageUpgrader</code> позволяет стадиям обновлять их конфигурацию с предыдущих версий стадии.
 * <p/>
 * Обновитель вызывается только тогда, когда версия конфигурации стадии в потоке старше, чем версия используемой
 * стадии в потоке (это обычно происходит сразу после обновления Data Collector или библиотеки стадии).
 */
public interface StageUpgrader {  /**
   * Коды ошибок, используемые обновителем.
   */
  @GenerateResourceBundle
  public enum Error implements ErrorCode {
    UPGRADER_00("Обновитель не реализован для стадии '{}:{}' экземпляра '{}'"),
    UPGRADER_01("Не удалось обновить стадию '{}:{}' экземпляра '{}' с версии '{}' до версии '{}'"),
    ;

    private final String message;

    Error(String message) {
      this.message = message;
    }

    @Override
    public String getCode() {
      return name();
    }

    @Override
    public String getMessage() {
      return message;
    }
  }

  /**
   * Заданная реализация <code>StageUpgrader</code>. Она проваливает все обновления.
   */
  public static class Default implements StageUpgrader {
    /**
     * Это реализация всегда выбрасывает исключение.
     */
    @Override
    public List<Config> upgrade(List<Config> configs, Context context) throws StageException {
      throw new StageException(
        Error.UPGRADER_00,
        context.getLibrary(),
        context.getStageName(),
        context.getStageInstance()
      );
    }
  }

  /**
   * Обновление контекста с различными метаданными обновления.
   */
  public static interface Context {

    /**
     * Имя библиотеки этапа
     */
    public String getLibrary();

    /**
     * Имя этапа (уникально идентифицирует этот этап в SDC)
     */
    public String getStageName();

    /**
     * Имя экземпляра этапа (уникально идентифицирует этот экземпляр этапа в определенном потоке).
     */
    public String getStageInstance();

    /**
     * Текущая версия конфигурации.
     */
    public int getFromVersion();

    /**
     * Желаемая целевая версия после обновления.
     */
    public int getToVersion();
  }    /**
     * Регистрация указанного сервиса с предоставленной конфигурацией.
     *
     * @param service Интерфейс сервиса, который необходимо зарегистрировать.
     * @param configs Начальная конфигурация для сервиса.
     */
    public void registerService(Class<?> service, List<Config> configs);
  }  /**
   * Обновление конфигурации этапа с предыдущей версии до текущей версии.
   *
   * @param configs Текущая конфигурация, которая требует обновления.
   * @param context Контекст обновления с различными метаданными.
   * @return Обновленная конфигурация.
   * @throws StageException если конфигурации не могут быть обновлены.
   */
  default public List<Config> upgrade(List<Config> configs, Context context) throws StageException {
    return upgrade(
      context.getLibrary(),
      context.getStageName(),
      context.getStageInstance(),
      context.getFromVersion(),
      context.getToVersion(),
      configs
    );
  }

  /**
   * Обновление конфигурации этапа с предыдущей версии до текущей версии.
   *
   * @param library имя библиотеки этапа.
   * @param stageName имя этапа.
   * @param stageInstance имя экземпляра этапа.
   * @param fromVersion версия, записанная в конфигурации этапа для обновления.
   * @param toVersion версия библиотеки этапа, до которой требуется обновить конфигурацию.
   * @param configs конфигурации для обновления.
   * @return Обновленная конфигурация.
   * @throws StageException если конфигурации не могут быть обновлены.
   */
  @Deprecated
  default public List<Config> upgrade(
    String library,
    String stageName,
    String stageInstance,
    int fromVersion,
    int toVersion,
    List<Config> configs
  ) throws StageException {
    // По умолчанию поведение не выполняет никаких действий
    return configs;
  }

## pipeline.api

## Batch Record EventRecord

![image-20200311220946136](./image/image--20200311220946136.png)

### Batch

Датасет-обработчик (Processor) и целевая точка (Target) этапы получают экземпляр пакета, который позволяет им получить доступ к записям в текущем пакете для обработки.```java
/**
 * Возвращает итератор со всеми записями в пакете для текущего этапа.
 * Каждый раз, когда этот метод вызывается, он возвращает новый итератор со всеми записями в пакете.
 * @return итератор со всеми записями в пакете для текущего этапа.
 */
public Iterator<Record> getRecords();

Record

Класс Record представляет единицу данных, обрабатываемую пайплайнами Data Collector. Запись имеет поле Field, представляющее данные, а также заголовок для метаданных.

Запись представляет единицу данных, обрабатываемую пайплайнами Data Collector. Запись имеет поле Field, представляющее данные, а также заголовок для метаданных.

Заголовок записи Record представляет связанные метаданные, часть из которых генерируется контейнером выполнения пайплайна.

API записи поддерживает выражения путей к полям (упрощенный синтаксис, похожий на XPath), чтобы напрямую получить доступ к значениям внутри структуры данных поля.
  • Пустая строка "" указывает на корневой элемент записи Record
  • Строка /{NAME} указывает на запись '{NAME}' в Map или LIST_MAP
  • Строка [{INDEX}], где {INDEX} — это ноль или положительное целое число, указывает на запись '{INDEX}' в List или LIST_MAP
Выражение пути к полю может использоваться для ссылки на составную структуру данных любой глубины и состава, для доступа к значениям внутри неё. **field-path выражение** можно использовать для ссылки на глубину и структуру сложной данных структуры пример:
  • /contactInfo/firstName
  • /contactInfo/lastName
  • /contactInfo/email[[0]]
  • /contactInfo/address/firstLine
  • /contactInfo/address/secondLine
  • /contactInfo/address/city
  • /contactInfo/address/state
  • /contactInfo/address/zip
  • /contactInfo/phone[[0]]/number
  • /contactInfo/phone[[0]]/type
  • /contactInfo/phone[[1]]/number
  • /contactInfo/phone[[1]]/type
Используя **field-path выражения** можно проверять наличие, доступ, изменять и удалять структуру данных Field.

Методы {@link #get(String)}, {@link #has(String)}, {@link #delete(String)} и {@link #getEscapedFieldPaths()} работают с использованием field-path выражений.

Использование выражений field-path позволяет проверять наличие, доступ, изменять и удалять структуру данных Field.

{@link #get(String)},

{@link #has(String)},

{@link #delete(String)}

{@link #getEscapedFieldPaths()}

методы работают с использованием выражений field-path.

ВАЖНО: Имена ключей карты, которые не являются словом или используют любое из следующих 3 специальных символов '/', '[' или ']' должны быть заключены в одинарные или двойные кавычки, например: '"foo bar"/"xyz"]'.

image-20200311234129548

public interface Source extends ProtoSource<Source.Context> {
public interface Processor extends Stage<Processor.Context> {
public interface Target extends Stage<Target.Context> {

BatchContext

image-20200312075921536

ProtoConfigurableEntityТипичная конфигурируемая сущность, предоставляющая общий базовый интерфейс для всех конфигурируемых объектов, включая Stage и Service.

public interface ELContext {}
public interface Context extends ConfigIssueContext, ELContext, MetricContext {
    // Получить конфигурацию, связанную со стадией, из основного конфигурационного файла data collector
    public Configuration getConfiguration(); 
    // Возвращает идентификатор запуска (runner id), который остается неизменным при выполнении в разных потоках. Это значение может использоваться для создания временных ресурсов на удаленной системе, чтобы гарантировать, что различные экземпляры одной стадии в многопоточном канале не конфликтуют друг с другом. 0 используется для встроенных синглтонов.
    public int getRunnerId(); 
    // Возвращает количество созданных запусков (runner) для этого конвейера (pipeline).
    public int getRunnerCount(); 
    // Возвращает абсолютный путь к папке ресурсов SDC. Что?
    public String getResourcesDirectory(); 
    // Возвращает константы уровня конвейера. Непрерывная карта
    public Map<String, Object> getPipelineConstants();
    // Создает пустую запись
    // @param recordSourceId Идентификатор записи, который должен содержать достаточно информации для отслеживания источника записи.
    // Возвращает пустую запись с указанным идентификатором.
    Record createRecord(String recordSourceId); 
    // Создает пустую запись, включая исходные данные записи.
}
```    // @param recordSourceId Идентификатор записи, который должен содержать достаточно информации для отслеживания источника записи.
    // @param raw Исходные данные записи. byte[]
    // @param rawMime MIME-тип исходных данных.
    Record createRecord(String recordSourceId, byte[] raw, String rawMime);
 }
 ```#### Миксин интерфейс 1: ConfigIssueContextИнтерфейс для создания объектов ConfigIssue. Этот интерфейс используется для контекстных объектов всех конфигурируемых компонентов. В настоящее время он параметризирован для обеспечения обратной совместимости с классом Stage. (По предварительной оценке, этот интерфейс используется для операций с конфигурационными параметрами и проверки правильности конфигурации, так как соответствующие доказательства были найдены в интерфейсах Stage и Service.)

```java
// Предоставляет функциональность для создания конфигурационных проблем для объектов, конфигурируемых с помощью аннотации @ConfigDef.
public ConfigIssue createConfigIssue(String configGroup, String configName, ErrorCode errorCode, Object... args);
Доказательства:

В интерфейсах Stage и Service контекстные объекты содержат следующие элементы:

public interface Context extends ProtoConfigurableEntity.Context {
    // Интерфейс для конфигурационных проблем. Сохранен для обратной совместимости.
    public interface ConfigIssue {}
    // Когда Pipeline инициализируется для обработки данных, этот метод вызывается один раз.
    // Если Stage возвращает пустой список ConfigIssue, то этот Stage может обрабатывать данные.
    // В противном случае, считается, что произошла конфигурационная ошибка или возникли проблемы, и Stage еще не готов к обработке данных, поэтому инициализация Pipeline прекращается.
    public List<ConfigIssue> init(Info info, C context);
}
```#### Миксин интерфейс 2: ELContext

Определяет валидацию, установку переменных и создание функций на основе выражений EL.

Обратите внимание на следующие функции, которые определяют возможность взаимной ссылки выражений EL.

```java
// Создает ELEval с использованием определенных EL-функций и констант для указанного Stage конфигурации, а также других EL-функций и констант.
// @param configName имя конфигурации Stage.
// @param elDefClasses класс, определяющий дополнительные EL-функции и константы для конфигурирования экземпляра <code>ELEval</code>. 
public ELEval createELEval(String configName, Class<?>... elDefClasses);

Миксин интерфейс 3: MetricContext

Вероятно, содержит функциональность, связанную с мониторингом.

Этот интерфейс включает сторонние библиотеки Maven: io.dropwizard.metrics:metrics-core

import com.codahale.metrics.Counter;  // Метрика для увеличения и уменьшения значений
import com.codahale.metrics.Gauge;    // Моментальный показатель определенного метрического значения
import com.codahale.metrics.Histogram;  // Гистограмма
import com.codahale.metrics.Meter;      // Взвешенный скользящий средний показатель, предоставляющий частоту сбора данных за 1, 5 и 15 минут
import com.codahale.metrics.MetricRegistry;  // Регистратор метрик, содержащий центр регистрации метрик
import com.codahale.metrics.Timer;      // Возвращает метрическую регистратуру, используемую pipeline.
    public MetricRegistry getMetrics();
```    // Creates a timer with a name composed of the pipeline name, the stage instance name, and the specified name.
    // @param name the name of the <code>Timer</code>.
    // @return a <code>Timer</code> with a name composed of the pipeline name, the stage instance name, and the specified name.
    public Timer createTimer(String name);    public Timer getTimer(String name);

    public Meter createMeter(String name);

    public Meter getMeter(String name);

    public Counter createCounter(String name);

    public Counter getCounter(String name);

    public Histogram createHistogram(String name);

    public Histogram getHistogram(String name);

    public Gauge<Map<String, Object>> createGauge(String name);

    public Gauge<Map<String, Object>> createGauge(String name, Comparator<String> comparator);

    public Gauge<Map<String, Object>> getGauge(String name);

Этап

Определяет базовый внутренний интерфейс для реализации сборщика контекста и жизненного цикла этапа, а также общего контекста.

image-20200314150104809

В интерфейсе определены пять основных элементов:

public interface Stage<C extends Stage.Context> extends ProtoConfigurableEntity {
    public interface Info {} // Предоставляет функциональность для предоставления метаданных этапа
    public interface UserContext {} // Предоставляет метаданные для разработчиков pipeline или задачи
    public interface Context extends ProtoConfigurableEntity.Context {} // Здесь смешан контекст ProtoConfigurableEntity
    public interface ConfigIssue {}
    public List<ConfigIssue> init(Info info, C context);
    public void destroy();
}

Stage.Context

В контексте этапа, помимо смешивания контекста ProtoConfigurableEntity, добавлены следующие функции:


    // Возвращает версию текущей среды.
    public String getEnvironmentVersion();

    // Возвращает текущий режим выполнения pipeline.
    public ExecutionMode getExecutionMode();

    /**
     * Этот метод всегда возвращает -1 и больше не поддерживается.
     */
    @Deprecated
    public long getPipelineMaxMemory();    // Указывает, выполняется ли pipeline в режиме предварительного просмотра.
    public boolean isPreview();

    // Возвращает пользовательский контекст, связанный с текущим выполняющимся pipeline.  ### можно заметить, что здесь есть концепция управления метаданными ###
    public UserContext getUserContext();

    // Возвращает объект Info, представляющий Stage
    public Info getStageInfo();

    // Возвращает список объектов Info для всех Stage в текущем pipeline
    public List<Info> getPipelineInfo();

    // Отправляет сообщение об ошибке
    public void reportError(Exception exception);

    public void reportError(String errorMessage);

    // Отправляет сообщение об ошибке с использованием локализуемого кода ошибки и параметров.
    // @param errorCode код ошибки для отправки.
    // @param args параметры для шаблона сообщения кода ошибки.
    public void reportError(ErrorCode errorCode, Object... args);

    // Возвращает настройки обработки ошибок для Stage.
    // Stage должен быть настроен для поддержки этой настройки обработки ошибок.
    public OnRecordError getOnErrorRecord();

    // Возвращает время завершения предыдущего Stage.
    // Возвращает время завершения предыдущего Stage, если неизвестно, то 0.
    public long getLastBatchTime();

    // Проверяет, был ли pipeline остановлен во время обработки Stage.
    public boolean isStopped();

    // Проверяет, является ли Stage Stage для обработки ошибок.
    public boolean isErrorStage();    // Создает стандартное событие записи, содержащее предварительно заполненные необходимые заголовочные свойства.
    // @param type Тип события (значение определяется генерирующим Stage)
    // @param version Версия типа события (для поддержки эволюции событий)
    // @param recordSourceId Идентификатор записи. Он должен содержать достаточно информации для отслеживания источника записи.
    // @return Новая запись.
    public EventRecord createEventRecord(String type, int version, String recordSourceId);

    // Создает событие линеажа и инициализирует стандартные поля.
    // @param type Тип события линеажа
    // @return Инициализированное событие линеажа
    LineageEvent createLineageEvent(LineageEventType type);

    // Публикует данное событие линеажа в конфигурированное хранилище линеажа.
    // @param event Событие линеажа, которое необходимо распространить в конфигурированное хранилище линеажа.
    public void publishLineageEvent(LineageEvent event);

    // Возвращает уникальный идентификатор, который идентифицирует этот сборщик данных.
    // Возвращает уникальный идентификатор, который идентифицирует этот сборщик данных.
    public String getSdcId();

    /**
     * После текущего Stage, переводит состояние pipeline в FINISHED.
     */
    @Deprecated
    void finishPipeline();```markdown
### ToXXXContext

Эти ToXXXContext имеют некоторые сходства.

- ToSourceResponseContext

  Описывает методы для различных контекстных классов, которые работают с обработкой ответов источника.  **По описанию этого класса можно сделать вывод, что StreamSets поддерживает не только DAG, но и DCG.**

  ```java
  void toSourceResponse(Record record);
  • ToEventContext

    Описывает методы для различных контекстных классов, которые работают с обработкой событий.

    public void toEvent(EventRecord record);
  • ToErrorContext

    Описывает методы для различных контекстных классов, которые работают с обработкой ошибок.

    public void toError(Record record, Exception exception);
    public void toError(Record record, String errorMessage);
    public void toError(Record record, ErrorCode errorCode, Object... args);

Источник — это начальная стадия сборщика данных. На начальной стадии используются данные из внешних систем, создавая записи, которые могут быть обработаны стадией обработки (Processor) или целевой стадией (Target).

![image-20200312170851178](./image/image-20200312170851178.png)


### Обработчик

Обработчик — это стадия обработки данных сборщика. На стадии обработки принимаются записи из стадии источника (Source) или других стадий обработки, выполняются операции над этими записями и они записываются для дальнейшей обработки другим обработчиком или целевой стадией (Target).

![image-20200312171746554](./image/image-20200312171746554.png)


### Цель

Цель — это целевая стадия сборщика данных. На целевой стадии принимаются записи из стадии источника (Source) или стадии обработки (Processor) и они записываются в внешние системы.
```![image-20200312164843344](./image/image-20200312164843344.png)


### Экзекьютор

В отличие от целевой стадии, которая сохраняет записи, экзекьютор выполняет операции на данных из внешних систем, которые отображаются в поступающих записях.

![image-20200312181116619](./image/image-20200312181116619.png)


### LineagePublisher

Сборщики данных собирают различные метаданные по линии преемственности для всех потоков во время их выполнения. Можно реализовать подкласс LineagePublisher и настроить сборщик данных для передачи этих данных в стороннюю систему (например, Cloudera Navigator или Oracle Metadata Manager).

```java
interface ConfigIssue {}

public interface Context {
    String getId();
	ConfigIssue createConfigIssue(ErrorCode errorCode, Object... args);
	String getConfig(String configName);
}
// Этот метод вызывается только один раз во время запуска сборщика данных.
List<ConfigIssue> init(Context context);
// Обрабатывает список переданных событий и отправляет их в сборщик линии преемственности. Возвращает true, если все события успешно записаны в удаленную систему.
boolean publishEvents(List<LineageEvent> events);

Сервис

Базовый интерфейс для служб сборщика данных, это переиспользуемый и настраиваемый код, на который стадии могут ссылаться для внешнего использования и совместного использования общей функциональности.Сервисы сборщика данных представляют базовый интерфейс, который обеспечивает переиспользуемый и настраиваемый код, на который стадии могут ссылаться для внешнего использования и совместного использования общей функциональности.

// LineagePublisher контекст, который предоставляет информацию о времени выполнения и услуги для плагина.
// Примечание: этот контекст предназначен для предоставления информации о времени выполнения и услуг для плагина LineagePublisher. Однако он не использует LineagePublisher.Context, а наследует ProtoConfigurableEntity.Context.
public interface Context extends ProtoConfigurableEntity.Context {}  // ??? странный комментарий
// Этот метод вызывается один раз при запуске конвейера.
// Если служба возвращает пустой список ConfigIssues, то считается, что она готова к использованию.
// В противном случае, она считается неправильно настроенной или имеет необратимые ошибки, и служба еще не готова к использованию, поэтому инициализация всего конвейера завершится неудачей.
List<ConfigIssue> init(Context context);
``````java
void разрушить();

Другие службы

Интерфейсный слой определяет некоторые общие интерфейсы служб:

  • SshTunnelService

    Служба, которая предоставляет поддержку SSH-туннеля. Она инициирует прямой SSH-туннель к настроенному SSH-серверу.

  • LogParserService

    Служба для парсинга логов из входных потоков.

  • DataFormatGeneratorService

    Служба, которая позволяет сериализовать записи в различные файловые форматы.

  • DataFormatParserService

    Служба для парсинга различных данных из входных потоков.

  • SdcRecordGeneratorService

    Специальная служба формата данных, которая всегда использует SDC_RECORD в качестве выходного формата и, следовательно, не имеет параметров ввода для настройки в UI. Это особенно полезно для этапов ошибок, которые всегда сериализуют SDC_RECORD.

Интерцептор

Интерцептор является подключаемым компонентом, подобным Stage.

В отличие от Stage, он недоступен для пользователя и "скрыт" в выполнении в рамках выполнения. Это позволяет интерцептору выполнять работу, которую пользователь не должен отключать.

// Основные элементы
// Интерцептор включает мониторинг контекста
public interface Context extends MetricContext {}
// Этот метод будет вызван один раз перед обработкой данных при инициализации конвейера.
public List<ConfigIssue> init(Context context);
// Используется для перехвата записей и выполнения необходимых действий.
public List<Record> intercept(List<Record> records) throws StageException;
```## Обертка Exception

### StageException

Родительский класс для всех исключений Stage

```java
private final transient ErrorCode errorCode;
private final transient ErrorMessage errorMessage;
private final transient Object[] params;
private List<AntennaDoctorMessage> antennaDoctorMessages;

...
    
public interface ErrorCode {

  /**
   * Возвращает код ошибки.
   *
   * @return код ошибки.
   */
  public String getCode();

  /**
   * Возвращает встроенный шаблон сообщения по умолчанию для кода ошибки.
   *
   * @return шаблон сообщения по умолчанию
   */
  public String getMessage();

}

...

public class ErrorMessage implements LocalizableString {
  private static final Object[] NULL_ONE_ARG = {null};

  private final String errorCode;
  private final LocalizableString localizableMessage;
  private final long timestamp;
  private final boolean prependErrorCode;
  private final String stackTrace;
  private final List<AntennaDoctorMessage> antennaDoctorMessages;
}

...
public class AntennaDoctorMessage {
  private final String summary;
  private final String description;
}

image- Yöntem hatası kodu ve mesajı

Делегирование

image-20200314121802851

Client определен внутри ClusterJob, Client может закрыто обращаться к элементам внутри ClusterJob. В дальнейшем реализация должна быть в виде делегирования.

credential

Система хранения учетных данных предоставляет программный доступ, через имя ссылки, к чувствительным данным, таким как пароли системы.

image-20200314123329655

ext

image-20200314125441992

EL Expression

image-20200314125619691

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/Hu-Lyndon-streamsets-start-asset.git
git@api.gitlife.ru:oschina-mirror/Hu-Lyndon-streamsets-start-asset.git
oschina-mirror
Hu-Lyndon-streamsets-start-asset
Hu-Lyndon-streamsets-start-asset
master