Что такое конвейер? Что такое конвейер?
Движущиеся данные Данные проходят через конвейер в виде партий. Вот как это работает:
Проектирование потока данных Вы можете ветвить и объединять потоки данных в конвейере.
Удаление нежелательных записей Вы можете удалять записи из конвейера на каждом этапе, определяя необходимые поля или предварительные условия для вхождения записи в этап.
Обработка ошибочных записей Обработка ошибочных записей
Атрибуты заголовка записи Атрибуты заголовка записи
Атрибуты поля Атрибуты поля — это атрибуты, которые предоставляют дополнительную информацию о каждом поле, которую можно использовать в логике конвейера по мере необходимости.- Обработка измененных данных Обработка измененных данных
Удаление контрольных символов Удаление контрольных символов
Этапы разработки Этапы разработки
Быстрые клавиши для проектирования потока данных Быстрые клавиши для проектирования потока данных
Функциональность предварительного просмотра технологии Data Collector включает определенные новые функции и этапы с маркировкой "Предварительный просмотр технологии". Функциональность предварительного просмотра технологии доступна для использования в разработке и тестировании, но не предназначена для использования в производственной среде.- Тестовый источник для предварительного просмотра Тестовый источник может предоставлять тестовые данные для предварительного просмотра данных, чтобы помочь в разработке потока данных. В Control Hub вы также можете использовать тестовые источники при разработке фрагментов потока данных. Тестовые источники не используются при запуске потока данных.- Понимание состояний потока данных Понимание состояний потока данных
Поток данных описывает передачу данных от исходной системы к целевой системе и определяет, как следует преобразовывать данные по пути.Вы можете использовать один исходный этап для представления исходной системы, несколько обработчиков этапов для преобразования данных и несколько целевых этапов для представления целевой системы. При создании конвейера можно использовать этапы разработки для предоставления примерных данных и генерации ошибок для тестирования обработки ошибок. Вы можете использовать предварительный просмотр данных для определения того, как этапы изменяют данные в конвейере. Вы можете использовать этап выполнения программы для выполнения задач, запускаемых событиями, или сохранения информации о событиях. Для обработки больших объемов данных можно использовать мультипоточный конвейер или конвейер в режиме кластера.При записи в Hive или Parquet или PostgreSQL вы можете реализовать решение для обнаружения дрейфа данных, которое может обнаружить изменения в поступающих данных и обновить таблицы в целевой системе.
При запуске конвейера Data Collector выполняет его до тех пор, пока конвейер не будет остановлен или Data Collector не будет закрыт. Вы можете использовать Data Collector для запуска нескольких конвейеров.
При выполнении конвейера вы можете наблюдать за его работой для проверки правильности выполнения. Вы также можете определить метрики и правила для данных, а также оповещения, чтобы уведомлять вас при достижении определенного порога.
Данные поступают порциями через конвейер. Вот как это работает:
Источник создает порцию данных при чтении данных из исходной системы или при чтении данных из исходной системы. Он отслеживает смещение, которое представляет собой точку, где источник прекращает чтение данных.При заполнении порции или истечении времени ожидания порции источник отправляет порцию. Порции перемещаются по конвейеру между процессорами до тех пор, пока не достигнут конечной точки конвейера. Цель заключается в записи батчей в целевую систему, после чего Data Collector внутренне подтверждает смещение. На основе гарантий доставки по каналу Data Collector либо немедленно подтверждает смещение после записи его в любую целевую систему, либо немедленно подтверждает смещение после получения подтверждения записи от всех целевых систем. После подтверждения смещения исходная стадия создает новый батч.Обратите внимание, что это описание общего поведения канала. Поведение может отличаться в зависимости от конкретной конфигурации канала. Например, для потребителей Kafka смещения хранятся в Kafka или ZooKeeper. Для исходных систем, не хранящих данные (например, Omniture и HTTP Client), смещения не хранятся, так как это не относится к ним.
Вышеуказанная информация описывает стандартный однопоточный канал — исходный сервер создает батч и передает его по каналу, создавая новый батч только после завершения обработки предыдущего батча.
Некоторые источники могут создавать несколько потоков для включения параллельной обработки в многопоточном канале. В многопоточном канале можно настроить источник для создания определенного числа потоков или параллелизма. Затем Data Collector создает множество исполнителей канала в соответствии с атрибутом "Max Runners" канала для выполнения обработки канала. Каждый поток подключается к исходной системе, создает батч данных и передает его доступному исполнителю канала.Каждый исполнитель канала обрабатывает один батч за раз, как если бы канал работал в одном потоке. Когда поток данных замедляется, исполнители канала остаются бездействующими, ожидая, пока их не потребуется, и регулярно создают пустые батчи. Можно настроить атрибут "idle time for runners" канала для указания интервала или выбора выхода из создания пустых батчей.Все общие ссылки на канал в этом руководстве описывают однопоточный канал, но эта информация обычно применима и к многопоточным каналам. Для получения дополнительной информации, специфичной для многопоточных каналов, обратитесь к разделу "Обзор многопоточных каналов".
При настройке канала можно определить, как будет обрабатываться данные: нужно ли предотвращать потерю данных или дублирование данных?
Атрибуты гарантий доставки канала предоставляют следующие варианты:
По крайней мере один раз
Обеспечивает обработку всех данных через канал.
Если сбой приводит к остановке Data Collector во время обработки батча данных, при перезапуске будет повторно обработан этот батч. Этот вариант гарантирует отсутствие потери данных. Используйте этот вариант, чтобы Data Collector подтверждал смещение после записи в систему источника. Если сбой происходит после того, как Data Collector передает данные в систему источника, но до получения подтверждения и подтверждения смещения, в системе источника может быть дублировано не более одной партии данных.
Максимум один раз
Убедитесь, что данные не будут повторно обрабатываться. Если сбой приводит к остановке Data Collector во время обработки партии данных, при запуске он начнет обрабатывать следующую партию данных. Этот вариант позволяет избежать повторной обработки данных для предотвращения дублирования данных в целевой системе. Используйте этот вариант, чтобы Data Collector подтверждал смещение после записи, не ожидая подтверждения от системы источника. Если сбой происходит после того, как Data Collector передаёт данные в систему источника и подтверждает смещение, в системе источника может быть не записана не более одной партии данных.
Тип данных | Описание |
---|---|
BOOLEAN | Истина или Ложь. Соответствует Java типу данных Boolean. |
BYTE | 8-битное знаковое целое число. Диапазон значений от -128 до 127. Соответствует Java типу данных Byte. 8-битное знаковое целое число. Диапазон значений от -128 до 127. Соответствует Java типу данных Byte. |
BYTE_ARRAY | Массив значений типа byte. Соответствует Java типу данных Byte[]. Массив значений типа byte. Соответствует Java типу данных Byte[]. |
CHAR | Хранит одиночный символ Unicode. Соответствует Java типу данных Char. Хранит одиночный символ Unicode. Соответствует Java типу данных Char. |
DATE | Объект времени, который включает день, месяц и четырёхзначный год. Не включает часовой пояс. Объект времени, который включает день, месяц и четырёхзначный год. Не включает часовой пояс. |
DATETIME | Объект времени, который включает день, месяц, четырёхзначный год, часы, минуты и секунды. Точность до миллисекунды. Не включает часовой пояс. Объект времени, который включает день, месяц, четырёхзначный год, часы, минуты и секунды. Точность до миллисекунды. Не включает часовой пояс. |
DECIMAL | Символические десятичные числа произвольной точности. Соответствует Java классу BigDecimal. Символические десятичные числа произвольной точности. Соответствует Java классу BigDecimal. |
DOUBLE | 64-битное двойное-точность IEEE 754 число с плавающей запятой. Соответствует Java типу данных Double. 64-битное двойное-точность IEEE 754 число с плавающей запятой. Соответствует Java типу данных Double. |
FLOAT | 32-битное одинарное-точность IEEE 754 число с плавающей запятой. Соответствует Java типу данных Float. 32位单精度IEEE 754浮点。 对应于Java数据类型Float。 |
INTEGER | 32-битное знаковое целое число. Соответствует Java типу данных Int. 32位带符号的整数。 对应于Java数据类型Int。 |
LIST | Вложенный тип. Содержит перечисленный список значений. Значения могут быть любого типа, и каждое значение в списке может быть разного типа. |
--- | --- |
LIST | Вложенный тип. Содержит список значений, каждый из которых может быть любого типа. |
LIST_MAP | Вложенный тип. Содержит список пар ключ-значение, сохраненных в определенном порядке. Ключи всегда являются строками. Значения могут быть любого типа, и каждое значение может быть разного типа. |
LONG | 64-битное знаковое целое число. Диапазон значений от -9,223,372,036,854,775,808 до 9,223,372,036,854,775,807. Соответствует Java типу данных Long. |
MAP | Вложенный тип. Содержит список пар ключ-значение. Ключи всегда являются строками. Значения могут быть любого типа, и каждое значение может быть разного типа. |
SHORT | 16-битное знаковое целое число. Диапазон значений от -32,768 до 32767. |
STRING | Текст. Соответствует Java классу String. |
TIME | Объект времени, который включает часы, минуты и секунды. Точность до миллисекунды. Не включает часовой пояс. |
ZONED_DATETIME | Объект времени, который включает день, месяц, четырёхзначный год, часы, минуты, секунды и часовой пояс. Точность до наносекунды. Соответствует Java классу ZonedDateTime. |
Когда один этап подключается к нескольким этапам, все данные передаются ко всем подключенным этапам. Вы можете настроить обязательные поля для этапов, чтобы отбрасывать записи до того, как они попадут на этап, но по умолчанию все записи проходят.
Например, в следующей трубе все данные из источника "каталог" передаются на два ветвления для обработки разных типов данных. Однако вы можете настроить обязательные поля для разделителя полей или заменителя полей, чтобы отбрасывать ненужные записи.
Для маршрутизации данных на основе более сложных условий используйте селектор потока.
Некоторые этапы генерируют события, которые передаются в поток событий. Поток событий исходит из этапа, генерирующего события, такого как начальная точка или конечная точка, и передается через поток событий, как показано ниже:
Дополнительную информацию о фреймворке событий и потоках событий см. в разделе Обзор триггеров потока данных.
Данные из стандартного потока селектора потока и все данные из этапа заменителя полей передаются на этап оценки выражений для дальнейшей обработки, но порядок не определен, и записи не объединяются.
Важно: проверка трубы не блокирует дублирование данных. Чтобы избежать записи дублирующихся данных в конечную точку, настройте логику трубы для удаления дублирующихся данных или предотвращения их генерации.
Обратите внимание, что вы не можете объединять потоки событий с потоками данных. Записи событий должны течь от этапа, генерирующего события, до конечной точки или исполнителя, но не могут быть объединены с потоками данных. Дополнительную информацию о фреймворке событий и потоках событий см. в разделе Обзор триггеров потока данных. Вы можете определить обязательные поля или предварительные условия для удаления записей из трубки на каждом этапе.
Предварительные условия — это условия, которые должны быть выполнены для того, чтобы запись могла перейти на этап обработки. Этапы обрабатывают все предварительные условия перед передачей записи на этап или обработкой ошибок. Когда запись не удовлетворяет всем настроенным предварительным условиям, она обрабатывается в соответствии с настройками обработки ошибок, настроенными для этого этапа.
Вы можете определить предварительные условия для любого процессора, исполняющего и большинства целевых этапов. Вы можете использовать большинство функций, констант трубки и свойств времени выполнения в предварительных условиях.
Настройте предварительные условия как часть общей логики трубки или для минимизации обработки ошибок. Например, вы можете использовать следующее выражение для исключения записей, исходящих из стран, кроме США:
${record:value('/COUNTRY') == 'US'}
```Записи с ошибками содержат информацию о неудовлетворенных предварительных условиях в свойстве заголовка errorMessage. Вы также можете просмотреть сообщения об ошибках при просмотре последних записей с ошибками в режиме мониторинга, когда трубка работает.# Обработка записей с ошибками
Вы можете настроить обработку записей с ошибками на уровне этапа и на уровне всей конвейерной линии. Вы также можете указать версию записи, которая будет использоваться как основа для записей с ошибками.
Когда на этапе происходит ошибка при обработке записи, Data Collector обрабатывает запись в соответствии с настройками этапа. Один из вариантов настройки этапа — передать запись в конвейер для обработки ошибок. Для этого варианта Data Collector обрабатывает запись в соответствии с настройками обработки ошибок записей конвейера.
При настройке конвейера обратите внимание, что обработка ошибок этапа имеет приоритет над обработкой ошибок конвейера. То есть конвейер можно настроить для записи ошибочных записей в файл, но если этап настроен для игнорирования ошибочных записей, эти записи будут игнорированы. Вы можете использовать эту функцию для уменьшения типов ошибочных записей, которые сохраняются для проверки и повторной обработки. Обратите внимание, что записи, не содержащие обязательных полей, не попадают в стадию. Они передаются напрямую в конвейер для обработки ошибок.
## Обработка записей с ошибками в конвейереОбработка записей с ошибками в конвейере определяет, как Data Collector обрабатывает записи с ошибками, отправленные в конвейер для обработки ошибок. Она также обрабатывает записи, намеренно удалённые из конвейера, например, записи без обязательных полей.Конвейер обрабатывает записи с ошибками в соответствии с атрибутами "Записи с ошибками" на вкладке "Записи с ошибками". Когда Data Collector встречает неожиданную ошибку, он останавливает конвейер и фиксирует эту ошибку.
Конвейер предоставляет следующие опции для обработки записей с ошибками:
- Удаление
Конвейер удаляет запись. Data Collector включает количество и метрики ошибочных записей.
- Отправка ответа источнику
Конвейер отправляет ошибочную запись обратно в источник микросервиса, чтобы включить её в ответ для клиента REST API источника. Data Collector включает количество и метрики ошибочных записей. Используется только в [конвейере микросервиса](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Microservice/Microservice_Title.html#concept_qfh_xdm_p2b).
 Недействительно в конвейере Data Collector Edge.
- Запись в Amazon S3
Конвейер записывает ошибочную запись и связанные с ней детали в Amazon S3. Data Collector включает количество и метрики ошибочных записей. Вы определяете атрибуты конфигурации Amazon S3.  Недействительно в конвейере Data Collector Edge.
- Запись в другой конвейер
Конвейер записывает ошибочную запись в SDC RPC конвейер. Data Collector включает количество и метрики ошибочных записей. При записи в другой конвейер Data Collector фактически создаёт SDC RPC исходный конвейер для передачи ошибочной записи в другой конвейер.
Вам необходимо создать SDC RPC целевой конвейер для обработки ошибочных записей. Конвейер должен включать SDC RPC источник, настроенный для чтения ошибочных записей из этого конвейера.
Дополнительные сведения о SDC RPC конвейерах см. в разделе [Обзор SDC RPC конвейеров](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/RPC_Pipelines/SDC_RPCpipelines_title.html#concept_lnh_z3z_bt).
 Недействителен в конвейере Data Collector Edge.
- Запись в Azure Event Hubs
Пайплайн записывает сведения об ошибках и связанные детали в Microsoft Azure Event Hubs. Data Collector включает в себя количество записей об ошибках и записи в метриках.
Вы определяете конфигурационные свойства Azure Event Hubs, которые будут использоваться.
 Недействителен в пайплайне Data Collector Edge.
- Запись в Elasticsearch
Пайплайн записывает сведения об ошибках и связанные детали в Elasticsearch. Data Collector включает в себя количество записей об ошибках и записи в метриках.
Вы определяете конфигурационные свойства Elasticsearch кластера, которые будут использоваться.
 Недействителен в пайплайне Data Collector Edge.- Запись в файл
Пайплайн записывает сведения об ошибках и связанные детали в локальный каталог. Датаколлектор включает в себя количество записей об ошибках и записи в метриках.
Вы определяете каталог и максимальный размер файла, который будет использоваться. Файлы ошибок названы в соответствии с атрибутами свойств файла пайплайна.
В настоящее время пайплайны кластера не поддерживают запись в файл.
- Запись в Google Cloud Storage
Пайплайн записывает сведения об ошибках и связанные детали в Google Cloud Storage. Датаколлектор включает в себя количество записей об ошибках и записи в метриках.
Вы определяете конфигурационные свойства Google Cloud Storage.
 Недействителен в пайплайне Data Collector Edge.
- Запись в Google Pub/Sub
Пайплайн записывает сведения об ошибках и связанные детали в Google Pub/Sub. Датаколлектор включает в себя количество записей об ошибках и записи в метриках.
Вы определяете конфигурационные свойства Google Pub/Sub.
 Недействителен в пайплайне Data Collector Edge.
- Запись в Kafka
Пайплайн записывает сведения об ошибках и связанные детали в Kafka. Датаколлектор включает в себя количество записей об ошибках и записи в метриках. Вы определяете конфигурационные свойства Kafka кластера, которые будут использоваться.
 Недействителен в пайплайне Data Collector Edge.
- Запись в Amazon Kinesis Streams
Пайплайн записывает сведения об ошибках и связанные детали в Amazon Kinesis Streams. Датаколлектор включает в себя количество записей об ошибках и записи в метриках. Вы можете определить конфигурационные свойства для используемого Kinesis потока.
 Недействителен для Data Collector Edge-канала.
- Запись в MapR поток
Пайплайн записывает сведения об ошибках и связанные с ними детали в MapR поток. Data Collector включает в себя счетчик ошибок и метрики записей.
Вы определяете конфигурационные свойства для используемого MapR Streams кластера.
 Недействителен для Data Collector Edge-канала.
- Запись в MQTT
Пайплайн записывает сведения об ошибках и связанные с ними детали в MQTT-агент. Data Collector включает в себя счетчик ошибок и метрики записей.
Вы определяете конфигурационные свойства для используемого MQTT-агента.
## Обработка ошибок записи
Большинство стадий включают опции для обработки ошибок записи. При возникновении ошибки при обработке записи, Data Collector обрабатывает запись в соответствии с опциями "Обработка ошибок записи" на вкладке "Общие" стадии.Стадии включают следующие опции обработки ошибок:
- Отбрасывание
Стадия тихо отбрасывает запись. Data Collector не записывает информацию об ошибке и не включает запись, на которой произошла ошибка, в счетчик ошибок записи и метрики.
- Передача в ошибки
Стадия передает запись в пайплайн для обработки ошибок. Пайплайн обрабатывает запись в соответствии с конфигурацией обработки ошибок пайплайна.
При мониторинге пайплайна вы можете просмотреть последние записи об ошибках и проблемы, с которыми они столкнулись, на вкладке "Записи об ошибках" стадии. После остановки пайплайна эта информация становится недоступной.
- Остановка пайплайна
Data Collector останавливает пайплайн и записывает информацию об ошибке. Ошибки остановки пайплайна отображаются как тревоги в режиме мониторинга и как ошибки в истории пайплайна.
В настоящее время пайплайны в режиме кластера не поддерживают остановку пайплайна.
## Пример
Стадия Kafka Consumer источника читает JSON-данные с максимальной длиной объекта 4096 символов, и стадия сталкивается с объектом длиной 5000 символов. В зависимости от конфигурации стадии, Data Collector либо отбрасывает запись, либо останавливает пайплайн, либо передает запись пайплайну для обработки ошибок записи.
После настройки стадии для передачи записей пайплайну, в зависимости от конфигурации обработки ошибок пайплайна, произойдет одно из следующих:- Когда пайплайн отбрасывает запись об ошибке,
Data CollectorЗаписи с ошибками будут отбрасываться без учета операции или причины.
При мониторинге конвейера вы можете просмотреть последнюю ошибочную запись и информацию об ошибке на вкладке "Ошибка записи" на этом этапе. Однако после остановки конвейера эта информация станет недоступной.
- Когда конвейер записывает ошибочные записи в целевую систему, Collectор данных записывает ошибочные записи и другую информацию об ошибках в целевую систему. Он также включает ошибочные записи в счетчики мониторинга и метрики.
## Ошибочные записи и версии
Когда Collectор данных создает ошибочные записи, он сохраняет данные и атрибуты из записи, которая вызвала ошибку, а затем добавляет информацию об ошибке в качестве атрибутов заголовка записи. Список атрибутов заголовка записи и других внутренних атрибутов заголовка, связанных с записью, см. в разделе [Внутренние атрибуты](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Pipeline_Design/RecordHeaderAttributes.html#concept_itf_55z_dz).
При настройке конвейера можно указать версию записи, которую следует использовать:
- Оригинальная запись — запись, сгенерированная источником впервые. Используйте эту запись, если вы хотите получить оригинальную запись без каких-либо дополнительных обработок конвейера.- Текущая запись — это запись на этапе, где произошла ошибка. В зависимости от типа ошибки эта запись может быть необработанной или частично обработанной на этапе возникновения ошибки. Используйте эту запись, если вы хотите сохранить обработку, которую конвейер выполнил до возникновения ошибки.
**Связанные концепции**
[Удаление нежелательных записей](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Pipeline_Design/DroppingUnwantedRecords.html#concept_dnj_bkm_vq)
# Атрибуты заголовка записи
Атрибуты заголовка записи — это атрибуты заголовка записи, которые вы можете использовать в логике конвейера по мере необходимости.Некоторые этапы создают атрибуты заголовка записи для определенных целей. Например, [источник с включенным CDC](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Pipeline_Design/CDC-Overview.html#concept_iws_mhd_ty) включает тип CRUD-операции в атрибут заголовка записи sdc.operation.type. Это позволяет целевым системам с включенным CRUD определить тип операции, который следует использовать при обработке записи. Аналогично, обработчик метаданных Hive генерирует атрибуты заголовка записи, которые некоторые целевые системы могут использовать как часть [решения для синхронизации с Hive](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Hive_Drift_Solution/HiveDriftSolution_title.html#concept_phk_bdf_2w). Другие этапы включают в себя запись информации, связанной с обработкой, в заголовок записи для общего использования. Например, если вы хотите обрабатывать события на основе этой информации, то [этап генерации событий](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Event_Handling/EventFramework-Title.html#concept_zrl_mhn_lx) будет включать тип события в заголовок записи. Некоторые источники включают информацию, такую как имя исходного файла, положение или разделение для каждой записи.Вы можете использовать некоторые процессоры для создания или обновления заголовка записи. Например, вы можете использовать калькулятор выражений для создания [атрибутов заголовка записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Pipeline_Design/RecordHeaderAttributes.html#concept_lmn_gdc_1w) на основе записи.
Включение атрибутов в заголовок записи не требует их использования в конвейере. Например, вы можете использовать источник Salesforce, включенный в CDC, в конвейере, не являющемся CDC, и игнорировать автоматически созданные атрибуты заголовка записи CDC.
При записи данных в целевую систему атрибуты заголовка записи сохраняются вместе с записью только при использовании целевого Google Pub/Sub Publisher или при использовании другого целевого объекта с форматом данных записи SDC. Чтобы сохранить информацию при использовании других форматов данных, используйте калькулятор выражений для копирования информации из атрибутов заголовка записи в поля записи.
## Использование атрибутов заголовка
Вы можете использовать оценку выражений или любой процессор сценария для создания или обновления атрибутов заголовка записи. Например, целевой объект MongoDB требует указания операции CRUD в атрибутах заголовка записи. Если источник данных не создает эту информацию автоматически, вы можете использовать калькулятор выражений или процессор сценария для установки значений атрибутов.Атрибуты заголовка записи являются строковыми значениями. Вы можете использовать [функцию record:attribute](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Expression_Language/Functions.html#concept_p1z_ggv_1r) в любом выражении для включения значений атрибутов в вычислениях.
**Примечание:** Атрибуты заголовка записи не имеют пути к полям. При использовании атрибутов в выражениях используйте только имя атрибута, заключив его в кавычки, так как атрибуты являются строками, как показано ниже:
${record:attribute('<имя атрибута>')}
Например, следующий калькулятор выражений добавляет атрибуты заголовка записи, созданные источником каталога, в запись:

### Внутренние свойства
При перемещении записей между различными стадиями, Data Collector генерирует и обновляет некоторые только для чтения внутренние свойства заголовка записи. Эти свойства можно использовать для отладки, но их можно обновлять только Data Collector.
Функции record:attribute не позволяют доступ к внутренним свойствам заголовка записи. Ниже приведена таблица, описывающая внутренние свойства заголовка записи и функции, доступные для доступа к данным в конвейере:| Внутренние свойства заголовка записи | Описание | Связанные функции |
| :----------------------------------- | :----------------------------------------------------------------------- | :------------------------- |
| stageCreator | ID стадии, создавшей запись. | запись:creator() |
| sourceId | Источник записи. Может содержать различные данные в зависимости от типа источника. | запись:id() |
| stagePath | Список стадий, обрабатывающих запись в порядке их названий. | запись:путь() |
| trackingId | Траектория записи через конвейер, начиная с sourceId, затем перечисляются стадии, обрабатывающие запись. | Неприменимо |
| previousTrackingId | Трекинг ID записи перед тем, как она попала на текущую стадию. | Неприменимо |
| errorStage | Стадия, которая породила ошибку. Только для записей с ошибками. | запись:errorStage() |
| errorStageLabel | Пользовательское имя стадии. Только для записей с ошибками. | запись:errorStageLabel() |
| errorCode | Код ошибки. Только для записей с ошибками. | запись:errorCode() |
| errorJobId | ID задания, запустившего конвейер. Только для записей с ошибками, запущенных конвейером, запущенным заданием Control Hub. | Неприменимо |
| errorMessage | Сообщение об ошибке. Только для записей с ошибками. | | | запись:errorMessage() |
| errorTimestamp | Время возникновения ошибки. Только для записей с ошибками. | запись:errorTime() |
| errorStackTrace | Стек вызовов, связанный с ошибкой. Только для записей с ошибками. | Неприменимо |## Этапы генерации заголовка записи
Ниже приведена таблица, которая показывает этапы, на которых создаются атрибуты заголовков записей для включения специального обработки:Ниже приведена таблица, которая показывает этапы, на которых создаются атрибуты заголовков записей для включения специальной обработки. | Этап | Описание |
| :----------------------------------------------------------- | :----------------------------------------------------------- |
| CDC-активированные источники | В заголовке свойства sdc.operation.type содержится тип операции CRUD, а в заголовке записи могут содержаться другие данные CRUD и CDC. Дополнительные сведения см. в разделе [CDC-активированные источники](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Pipeline_Design/CDC-Overview.html#concept_iws_mhd_ty). |
| Источники, обрабатывающие данные в формате Avro и процессор Data Parser | В заголовке записи включается схема Avro. |
| Источники, обрабатывающие данные XML | В заголовке записи можно включить пространство имен в xmlns. Дополнительные сведения см. в разделе [Включение полей XPaths](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Data_Formats/XMLDFormat.html#concept_w3k_1ch_qz). |
| [Этапы, генерирующие события](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Event_Handling/EventFramework-Title.html#concept_zrl_mhn_lx) | Заголовок записи содержит данные о событиях. Дополнительные сведения см. в разделе "Записи событий" в документации по этапам. |
| Источник Amazon S3 | Может быть настроен для включения в заголовке записи системных и пользовательских метаданных объектов.Дополнительные сведения см. в разделе [Источник Amazon S3](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/AmazonS3.html#concept_inh_qjx_yw).
| Источник Amazon SQS Consumer | Может быть настроен для включения в заголовке записи свойств сообщений SQS. Дополнительные сведения см. в разделе [Источник Amazon SQS Consumer](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/AmazonSQS.html#concept_lk3_rfc_vbb). |
| [Источник Azure Data Lake Storage Gen1](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/ADLS-G1.html#concept_dnh_pwp_b3b) | В заголовке записи содержится информация о первоначальном файле записи. |
| [Источник Azure Data Lake Storage Gen2](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/ADLS-G2.html#concept_osx_qgz_xhb) | В заголовке записи содержатся сведения о файле-источнике записи. |
| Источник каталога | В [заголовке записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/Directory.html#concept_tlj_3g1_2z) содержатся сведения о файле-источнике записи. |
| Источник хвоста файла | В [заголовке записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/FileTail.html#concept_tlj_3g1_2z) содержатся сведения о файле-источнике записи. Может быть настроен для использования [атрибутов меток](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/FileTail.html#concept_crd_tlx_fs) для набора файлов. |
| Источник подписчика Google Pub/Sub | При наличии в [заголовке записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/PubSub.html#concept_55z_3g1_2z)html#concept_qvl_y4q_v1b) включаются пользовательские атрибуты сообщения [атрибуты](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/PubSub.html#concept_qvl_y4q_v1b). |
| Источник Groovy-скриптов | Может быть настроен для создания [заголовка записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/GroovyScripting.html#concept_dyg_zpr_l3b) атрибутов. |
| Источник Hadoop FS | В [заголовке записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/HadoopFS-origin.html#concept_efs_2fs_fdb) содержатся сведения о файлах и смещениях. |
| [Источник Hadoop FS Standalone](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/HDFSStandalone.html#concept_djz_pdm_hdb) | В заголовке записи содержатся сведения о файле-источнике записи. |
| Источник HTTP-клиента | В [заголовке записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/HTTPClient.html#concept_etl_bsh_l2b) включаются заголовки ответа. |
| HTTP Server origin | В [заголовке свойства записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/HTTPServer.html#concept_rvj_5qy_4cb) включена информация о URL-адресе запроса и заголовочных полях запроса. |
| JavaScript Scripting origin | Может быть настроен для создания [заголовочных свойств записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/JavaScriptScripting.html#concept_ty1_t13_p3b). |
| JDBC Multitable Consumer origin | В [заголовочных свойствах записи JDBC](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/MultiTableJDBCConsumer.html) содержатся свойства записи.html#concept_xrx_11y_4y) включена информация о таблицах и типах данных. |
| JDBC Query Consumer origin | Может быть настроен для включения информации о таблицах и типах данных в [заголовочных свойствах записи JDBC](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/JDBCConsumer.html#concept_egw_d4c_kw). |
| Jython Scripting origin | Может быть настроен для создания [заголовочных свойств записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/JythonScripting.html#concept_ydl_j13_p3b). |
| Kafka Consumer origin | В [заголовке свойства записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/KConsumer.html#concept_tlj_3g1_2z) включена информация о источнике записи. |
| Kafka Multitopic Consumer origin | В [заголовке свойства записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/KafkaMultiConsumer.html#concept_tlj_3g1_2z) включена информация о источнике записи. |
| MapR FS Standalone origin | В [заголовочных свойствах записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/MapRFS.html#concept_efs_2fs_fdb) включена информация о файлах и смещениях. |
| MapR Multitopic Streams Consumer origin | В [заголовочных свойствах записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/MapRFSStandalone.html#concept_uyy_nsn_ndb) включена информация о исходных файлах записи. |
| MapR Streams Consumer origin | Содержит информацию о происхождении записи в [заголовке свойств записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/MapRStreams.html#concept_uyy_nsn_ndb). |com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/MapRStreamsMultiConsumer.html#concept_tlj_3g1_2z). |
| MQTT Subscriber origin | Содержит информацию о происхождении записи в [заголовке свойств записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/MQTTSubscriber.html#concept_fwm_2kn_scb). |
| Pulsar Consumer origin | Содержит информацию о происхождении записи в [заголовке сообщения](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/PulsarConsumer.html#concept_tvr_r3m_y2b). |
| RabbitMQ Consumer origin | Содержит информацию о происхождении записи в [заголовке свойств записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/RabbitMQ.html#concept_rg5_yts_y1b) в виде RabbitMQ [свойств](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/RabbitMQ.html#concept_rg5_yts_y1b). |
| REST Service origin | Содержит информацию о URL и заголовках запроса в [заголовке свойств записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/RESTService.html#concept_rrj_5qy_4cb). |
| Salesforce origin | Содержит информацию о происхождении записи в [заголовке Salesforce](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/Salesforce.html#concept_psx_1wg_cy). |
| SFTP/FTP/FTPS Client origin | Содержит информацию о первоначальном файле записи в [заголовке свойств записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/SFTP.html#concept_tlj_3g1_2z). | | SQL Server 2019 BDC Multitable Consumer origin | Содержит информацию о таблицах и типах данных в [заголовке свойств записи JDBC](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/SQLServerBDCMultitable.html#SQLServerBDCMultitable-HeaderAtts). |
| Teradata Consumer origin | Содержит информацию о таблицах и типах данных в [заголовке свойств записи JDBC](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/TeradataConsumer.html#concept_tlj_3g1_2z). |
| Couchbase Lookup processor | Включает информацию о состоянии документа для поиска в [заголовке записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Processors/CouchbaseLookup.html#concept_am3_jjj_j3b). |
| Expression Evaluator processor | Может быть настроен для создания или обновления [заголовка записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Processors/Expression.html#concept_qf3_mfq_f5). |
| Groovy Evaluator processor | Может быть настроен для создания или обновления [заголовка записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Processors/Groovy.html#concept_uw4_kqw_1y). |
| Hive Metadata processor | Генерирует [заголовок записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Processors/HiveMetadata.html#concept_g3p_sss_dw) для записей данных. Эти заголовки могут быть использованы как часть решения для синхронизации с [Hive](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Hive_Drift_Solution/HiveDriftSolution_title.html#concept_phk_bdf_2w). Может быть настроен для добавления [пользовательских заголовков](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Processors/HiveMetadata.html#concept_jv2_jjn_l1b) к метаданным записей. || HTTP Client processor | Включает заголовочные поля ответа в [заголовке записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Processors/HTTPClient.html#concept_ekz_wrz_zw). |
| JavaScript Evaluator processor | Может быть настроен для создания или обновления [заголовка записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Processors/JavaScript.html#concept_sh1_khh_cy). |
| Jython Evaluator processor | Может быть настроен для создания или обновления [заголовка записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Processors/Jython.html#concept_5w4_kqw_1y). |
| Schema Generator processor | Генерирует схему и записывает её в пользовательское определенное свойство [заголовка записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Processors/SchemaGenerator.html#concept_rfz_ks3_x1b). |
## Атрибуты заголовка записи для записи на основе записейЦель может использовать информацию из атрибутов заголовка записи для записи данных. Цели записи данных в формате Avro могут использовать Avro схемы в заголовке записи. Цели Hadoop FS и MapR FS могут использовать атрибуты заголовка записи для определения каталога, в который нужно записать, и для определения времени, когда файлы должны быть перекатаны. Это часть решения для синхронизации данных Hive Drift. Дополнительная информация доступна по ссылке [решение для синхронизации данных Hive Drift](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Hive_Drift_Solution/HiveDriftSolution_title.html#concept_phk_bdf_2w).
Чтобы использовать атрибуты заголовка записи, настройте цель для использования заголовка и убедитесь, что записи содержат атрибуты заголовка.
Hive метаданные обработчик автоматически генерирует атрибуты заголовка записи для Hadoop FS и MapR FS для использования в решении для синхронизации данных Hive Drift. Для всех других целей атрибуты заголовка записи могут быть добавлены с помощью вычислителя выражений или обработчика скриптов.
Вы можете использовать следующие атрибуты заголовка записи в цели:
- Атрибут targetDirectory для всех целей Azure Data Lake Storage, а также для Hadoop FS, Local FS и MapR FS
Атрибут заголовка записи targetDirectory определяет каталог, в который записываются записи. Если каталог не существует, цель создаст его. Атрибут заголовка записи targetDirectory заменяет атрибут "шаблон каталога" в настройках цели. При использовании `targetDirectory` для определения каталога, временная метка, настроенная для цели, используется только для определения, была ли запись задержана. Временная метка не используется для определения каталога, в который будет записана запись.
Чтобы использовать атрибут заголовка записи `targetDirectory`, выберите **Каталог** в разделе **Заголовок** на вкладке **Выход**.
- Атрибут `avroSchema` для всех целей записи данных в формате Avro
Атрибут заголовка записи `avroSchema` определяет Avro схему записи. При использовании этого атрибута заголовка схема Avro, используемая в цели, не может быть определена.
Чтобы использовать атрибут заголовка записи `avroSchema`, выберите **Avro** в разделе **Формат данных** и выберите **В заголовке записи** для атрибута **Положение Avro схемы**.
- Атрибут `roll` для всех целей Azure Data Lake Storage, а также для Hadoop FS, Local FS и MapR FS
При наличии атрибута `roll` в заголовке записи происходит перекат файла.
Вы можете определить имя атрибута `roll`. При использовании Hive метаданных обработчика для генерации атрибута `roll` используйте значение по умолчанию "roll". При использовании вычислителя выражений используйте имя атрибута `roll`, определенное в обработчике. Чтобы использовать свойства заголовков, выберите "Использовать свойства заголовков" на вкладке "Выход" и определите имя этого свойства.### Генерация свойств для записи на основе записей
Вы можете использовать обработчик метаданных Hive, выражения оценки или обработчик скриптов для генерации свойств заголовков для записи на основе записей. Обработчик метаданных Hive автоматически генерирует свойства заголовков для Hadoop FS и MapR FS в качестве части решения для синхронизации Hive. Для всех остальных целей можно использовать выражения оценки или обработчик скриптов для добавления свойств заголовков.
Чтобы использовать Expression Evaluator или обработчик скриптов, вы должны сгенерировать свойства заголовков, ожидаемые целевым объектом. Используйте следующие критерии для генерации свойств заголовков:
- Генерация целевого каталога
При использовании Expression Evaluator или обработчика скриптов для генерации целевого каталога обратите внимание на следующие детали:Целевой объект ожидает, что этот каталог будет расположен в заголовочном свойстве с именем "targetDirectory". Используемый каталог целевого объекта полностью совпадает с тем, что указано в заголовочном свойстве targetDirectory. В отличие от шаблонов каталогов, каталог, указанный в свойстве targetDirectory, не должен содержать никаких компонентов, требующих оценки, таких как параметры времени выполнения, переменные или свойства времени выполнения. При определении выражения, вычисляющего каталог, вы можете использовать любые допустимые компоненты, включая выражения, вычисляющие данные в записи. Например, вы хотите записывать записи в разные каталоги на основе данных, собираемых в процессе выполнения трубопровода, а также на основе идентификаторов региона и магазина. Вы можете определить временную переменную DIR, которая определяет базу каталога, и для каждого собираемого в процессе выполнения трубопровода Data Collector определите DIR. Затем вы можете использовать следующее выражение в Expression Evaluator для определения свойства targetDirectory: `$ {runtime:conf('DIR')/transactions/${record.value('/region')}/${record.value('/storeID')}}`
- Генерация схемы Avro
При использовании Expression Evaluator или обработчика скриптов для генерации схемы Avro обратите внимание на следующие детали: целевой объект ожидает, что схема Avro будет указана в заголовочном свойстве с именем "avroSchema". Используйте стандартный формат схемы Avro, например: `{"type":"record","name":"table_name","namespace":"database_name", "fields":[{"name":"int_val","type":["null","int"],"default":null}, {"name":"str_val","type":["null","string"],"default":null}]}` Имена базы данных и таблицы должны быть указаны в схеме Avro.
**Подсказка:** Вы можете использовать генератор схем Avro для помощи в генерации схем Avro.
- Свойство наклона (roll)
При создании свойства roll с помощью Expression Evaluator или Script Processor обратите внимание на следующие детали: используйте любое имя для свойства, а затем укажите имя свойства в целевом положении. Настройте выражение, определяющее, когда файлы будут скролиться.Чтобы определить эти заголовочные свойства в Expression Evaluator, выполните следующие шаги:
1. В разделе "Expression Evaluator" на вкладке "Expression", укажите имя "Заголовочное свойство".
Чтобы создать целевую директорию, используйте `targetDirectory`.
Чтобы создать Avro схему, используйте `avroSchema`.
Вы можете использовать любое имя для заголовочного свойства, которое будет служить индикатором скролирования.
2. Для "Заголовочного свойства выражения" определите выражение, результат которого будет содержать информацию, которую вы хотите использовать в целевом объекте.
Для получения информации о создании заголовочных свойств с помощью Script Processor, обратитесь к документации Script Processor.
## Просмотр свойств в предварительном просмотре данных
Вы можете использовать предварительный просмотр данных для просмотра заголовочных свойств, связанных с записями на любом этапе в конвейере. Чтобы просмотреть заголовочные свойства записей, включите предварительный просмотр свойств "Показать заголовки записей/полей".
Например, следующее изображение показывает записи, созданные источником каталога в предварительном просмотре данных.

Список "Заголовок записи" на этом этапе конвейера отображает набор только для чтения внутренних заголовочных свойств записей. Заголовочные свойства "Значения" создаются источником каталога.# Свойства полей
Свойства полей предоставляют дополнительную информацию о каждом поле, которая может быть использована в логике конвейера.
Некоторые этапы генерируют свойства полей. Например, источник Salesforce содержит оригинальный тип данных Salesforce для каждого поля в свойстве `salesforce.salesforceType`.
Вы можете создавать, изменять и оценивать свойства полей в конвейере. Выражение-калькулятор может создавать и изменять свойства на уровне полей. Вы можете использовать функции свойств полей для оценки свойств полей.
Например, вы можете использовать Expression Evaluator для создания свойств полей на основе данных записей, а затем передать записи в селектор потока, который будет маршрутизировать данные на основе значений свойств.
Свойства полей автоматически включаются в записи, записываемые в целевую систему при использовании формата данных SDC RPC в целевом объекте.
Чтобы включить свойства полей в данные записей или использовать их в вычислениях, используйте функции `record:fieldAttribute` и `record:fieldAttributeOrDefault`. Дополнительная информация о функциях свойств полей доступна в [документации по функциям записей](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Expression_Language/Functions.html#concept_p1z_ggv_1r). При использовании данных для предварительного просмотра можно включить просмотр атрибутов полей и атрибутов заголовков записей для помощи в разработке потока.## Использование атрибутов полей
Как и атрибуты заголовков записей, вы можете использовать Expression Evaluator или любой скриптовый процессор для создания или обновления атрибутов полей. Например, при обработке данных Avro источник может помещать информацию о точности и количестве знаков после запятой для числовых данных в атрибуты полей. Если вы хотите увеличить эти значения перед записью их в целевую систему, вы можете использовать Expression Evaluator или любой скриптовый процессор для установки значений атрибутов.
Атрибуты полей являются строковыми значениями. Вы также можете использовать функцию `record:fieldAttribute` в любом выражении для включения значений атрибутов в вычисления.
**Важно:** атрибуты полей не имеют пути к полю. При использовании атрибутов в выражениях после указания пути к связанному полю используйте только имя атрибута в кавычках, как показано ниже:
${record:fieldAttribute(<путь к полю>,'<имя атрибута>')}
Например, предположим, что вы обрабатываете данные Avro как часть [решения Avive для синхронизации с Hive](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Hive_Drift_Solution/HiveDriftSolution_title.html#concept_phk_bdf_2w). Все числовые поля содержат автоматически сгенерированные атрибуты точности и колич�数值表达式已经转换为数字,无需进一步修改。
最终翻译结果如下:
## Использование атрибутов полей
Как и атрибуты заголовков записей, вы можете использовать Expression Evaluator или любой скриптовый процессор для создания или обновления атрибутов полей. Например, при обработке данных Avro источник может помещать информацию о точности и количестве знаков после запятой для числовых данных в атрибуты полей. Если вы хотите увеличить эти значения перед записью их в целевую систему, вы можете использовать Expression Evaluator или любой скриптовый процессор для установки значений атрибутов.
Атрибуты полей являются строковыми значениями. Вы также можете использовать функцию `record:fieldAttribute` в любом выражении для включения значений атрибутов в вычисления.
**Важно:** атрибуты полей не имеют пути к полю. При использовании атрибутов в выражениях после указания пути к связанному полю используйте только имя атрибута в кавычках, как показано ниже:
${record:fieldAttribute(<путь к полю>,'<имя атрибута>')}
Например, предположим, что вы обрабатываете данные Avro как часть [решения Avive для синхронизации с Hive](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Hive_Drift_Solution/HiveDriftSolution_title.html#concept_phk_bdf_2w). Все числовые поля содержат автоматически сгенерированные атрибуты точности и количества знаков после запятой. Перед передачей данных в процессор метаданных Hive вы хотите создать новый численный поле "цена" на основе существующего численного поля "стоимость".Следующее выражение в Expression Evaluator создает поле "цена" и определяет атрибуты точности и количества знаков после запятой на основе существующих значений точности и количества знаков после запятой поля "стоимость".

## Этапы генерации атрибутов полейНиже приведена таблица, показывающая этапы, которые генерируют атрибуты полей для включения специального обработки.
| Этап | Описание |
| :----------------------------------------------------- | :----------------------------------------------------------- |
| Источники, обрабатывающие данные Avro<br />Origins that process Avro data | Включает атрибуты "точности" и "масштаба" для каждого десятичного поля. |
| Источники, обрабатывающие данные XML<br />Origins that process XML data | В атрибуты полей можно включать [информацию XPath](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Data_Formats/XMLDFormat.html#concept_w3k_1ch_qz), атрибуты XML и объявления пространств имен. |
| Google BigQuery источник | Включает атрибуты полей с оригинальной точностью для полей Datetime, Time и Timestamp. |
| Salesforce источник | Включает информацию о типах данных в атрибуты полей. |
| Salesforce Lookup Processor | Включает информацию о типах данных в атрибуты полей. |
| XML Parser | В атрибуты полей можно включать информацию XPath, атрибуты XML и объявления пространств имен. |
## Просмотр свойств полей в предварительном просмотре данныхТак же, как и свойства заголовков записей, вы можете использовать предварительный просмотр данных для просмотра свойств полей в потоке.
# Обработка изменений данных
Некоторые стадии позволяют легко обрабатывать изменения данных в потоке, например, захваченные изменения данных (CDC) или транзакционные данные.
Источники, включенные в режиме CDC, могут читать захваченные изменения данных. Некоторые из них специально предназначены для чтения захваченных изменений данных, другие могут быть настроены для этого. При чтении изменённых данных они определяют соответствующие CRUD-операции и включают их в заголовок записи `sdc.operation.type`.
Обработчики и целевые объекты, включённые в режиме CRUD, могут использовать тип CRUD-операции в заголовке записи при записи, что позволяет внешним системам выполнять соответствующие действия.
Использование источников, включённых в режиме CDC, и стадий, включённых в режиме CRUD, позволяет легко записывать изменённые данные из одной системы в другую. Вы также можете использовать источники, включённые в режиме CDC, для записи в целевые объекты, не поддерживающие CRUD, и использовать источники, не включённые в режиме CDC, для записи в стадии, поддерживающие CRUD. Подробнее об этом см. в [пользовательском руководстве](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Pipeline_Design/CDC-Overview.html#concept_y5y_5xd_5y).## Заголовочные свойства CRUD-операций
`sdc.operation.type` Источники, включенные в режиме CDC, включают заголовок записи `sdc.operation.type` во всех записях при чтении измененных данных.
Обработчики и целевые объекты, включенные в режиме CRUD, могут использовать тип CRUD-операции в заголовке записи при записи, что позволяет внешним системам выполнять соответствующие действия.
Заголовок записи `sdc.operation.type` использует следующие целые числа для представления CRUD-операций:
- 1 для вставки записи
- 2 для удаления записи
- 3 для обновления записи
- 4 для вставки или обновления записи
- 5 для не поддерживаемой операции или кода
- 6 для восстановления удаленной записи
- 7 для замены записи
- 8 для слияния записи
**Примечание:** В зависимости от поддерживаемых операций исходной системы, некоторые источники используют только подмножество операций CRUD. Аналогично, целевые объекты распознают только подмножество операций CRUD, поддерживаемых целевой системой. Подробнее об этом см. в документации источников и целевых объектов.
### Ранние реализации
В ранних версиях для некоторых источников, включенных в режиме CDC, использовались разные заголовки записей, но теперь все они используют заголовок записи `sdc.operation.type`. Ранние заголовки CRUD сохранены для обратной совместимости.Аналогично, целевые объекты, включенные в режиме CRUD, могут искать тип CRUD-операции в других заголовках записей, но теперь они сначала ищут заголовок записи `sdc.operation.type`, а затем альтернативные заголовки. Функция альтернативных заголовков сохранена для обратной совместимости.
## Этапы включения CDC
Этапы включения CDC предоставляются в записи заголовка свойств `sdc.operation.type`, которые определяют типы операций CRUD. Некоторые источники предоставляют альтернативные и дополнительные заголовочные свойства.Следующие этапы предоставляют заголовочные свойства записей CRUD:
| Фаза, включающая CDC | Заголовок атрибута записи CRUD |
| :------------------- | :---------------------------- |
| MapR DB CDC | В заголовке записи атрибут `sdc.operation.type` включает тип CRUD операции. В заголовке записи также включаются другие атрибуты CDC. Дополнительная информация доступна в разделе [CRUD операции и атрибуты заголовка CDC](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/MapRdbCDC.html#concept_oq4_mhh_qbb). |
| MongoDB Oplog | В заголовке записи атрибут `sdc.operation.type` включает тип CRUD операции. В заголовке записи также могут быть включены другие атрибуты CDC, такие как `op` и `ns`. Дополнительная информация доступна в разделе [Сгенерированные записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/MongoDBOplog.html#concept_wc3_byl_5y). |
| MySQL двоичный журнал | В заголовке записи атрибут `sdc.operation.type` включает тип CRUD операции. В записи также включаются другие атрибуты CDC. Дополнительная информация доступна в разделе [Сгенерированные записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/MySQLBinaryLog.html#concept_rfd_15l_dy). |
| Oracle CDC клиент | В заголовке записи атрибуты `sdc.operation.type` и `oracle.cdc.operation` включают тип CRUD операции. Дополнительная информация доступна в разделе [Атрибуты заголовка CRUD операций](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/OracleCDC.html#concept_x4h_m42_5y). |В заголовке записи также включаются другие атрибуты CDC с префиксом `oracle.cdc`, например `oracle.cdc.table`.
| PostgreSQL CDC клиент | В записи включаются атрибуты CRUD операций. В заголовке записи также включаются другие атрибуты CDC с префиксом `postgres.cdc`, например `postgres.cdc.lsn`. Дополнительная информация доступна в разделе [Сгенерированные записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/PostgreSQL.html#concept_zwv_tw5_n2b). |
| Salesforce | В заголовке записи атрибут `sdc.operation.type` включает тип CRUD операции. | Для получения дополнительной информации см. [заголовочные атрибуты CRUD-операций](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/Salesforce.html#concept_yns_y2m_5y). |
| SQL-парсер | В следующих двух заголовках включены типы CRUD-операций: `sdc.operation.type` и `oracle.cdc.operation`. Для получения дополнительной информации см. [сгенерированные записи](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Processors/SQLParser.html#concept_qwp_cwf_wdb). |
| SQL Server CDC-клиент | В заголовочном атрибуте записи `sdc.operation.type` включены типы CRUD-операций. В заголовочном атрибуте `jdbc` включены данные CDC. Для получения дополнительной информации см. [заголовочные атрибуты записей](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/SQLServerCDC.html#concept_pc4_xts_r1b). |
| SQL Server изменений отслеживания | В заголовочном атрибуте записи `sdc.operation.type` включены типы CRUD-операций. В заголовочном атрибуте `jdbc` включены данные CDC. Для получения дополнительной информации см. [заголовочные атрибуты записей](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/SQLServerCDC.html#concept_pc4_xts_r1b). |`SYS_CHANGE` включены дополнительные данные из таблицы изменений. Для получения дополнительной информации см. [заголовочные атрибуты записей](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/SQLServerChange.html#concept_pc4_xts_r1b). ## Изменения данных в SQL Server или Azure SQL DatabaseSQL Server и Azure SQL Database предоставляют несколько методов для отслеживания изменений данных. В Data Collector подходящий источник зависит от метода, используемого базой данных для отслеживания изменений, как показано в следующей таблице:
| Метод отслеживания изменений | Источник Data Collector |
| :--------------------------- | :------------------------------------------------------------------------------------- |
| Таблица CDC | [Клиент CDC SQL Server](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/SQLServerCDC.html#concept_ut3_ywc_v1b) Для дополнительной информации о таблицах CDC, см. [документацию Microsoft](https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/about-change-data-capture-sql-server?view=sql-server-ver15). |
| Таблица отслеживания изменений | [Отслеживание изменений SQL Server](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/SQLServerChange.html#concept_ewq_b2s_r1b) Для дополнительной информации о таблицах отслеживания изменений, см. [документацию Microsoft](https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/about-change-tracking-sql-server?view=sql-server-ver15). |
| Таблица временных данных | [JDBC Multi Table Consumer](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/MultiTableJDBCConsumer.html#concept_zp3_wnw_4y) или [JDBC Query Consumer](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/JDBCConsumer.html#concept_qhf_hjr_bs) Для дополнительной информации о таблицах временных данных, см. [документацию Microsoft](https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables?view=sql-server-ver15). |## Этапы для включения CRUD
1. Создайте модель данных (Model).
2. Создайте представление (View).
3. Создайте контроллер (Controller).
4. Настройте маршрутизацию (Routing).
5. Проведите тестирование (Testing).
Каждый из этих этапов важен для корректной работы CRUD-приложения.Следующие этапы идентифицируют операции CRUD, хранящиеся в атрибутах заголовка записи, и могут выполнять записи на основе этих значений. Некоторые этапы также предоставляют атрибуты, связанные с CRUD.
| Фаза, включающая CRUD | Поддерживаемые операции | Обработка фазы |
| :--------------------- | :---------------------- | :-------------- |
| Обработчик JDBC Tee | Вставка, обновление, удаление | Определяется операция, используемая на основе следующих условий: `sdc.operation.type` заголовок записи атрибута фазы. По умолчанию используются операции ввода и обработки неподдерживаемых операций. Атрибуты обработки включают атрибут "журнал изменений", который позволяет обрабатывать записи в зависимости от источника, используемого для включения CDC. Дополнительная информация доступна по ссылке [Определение CRUD операций](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Processors/JDBCTee.html#concept_qfd_tpm_5y). |
| Цель Couchbase | Вставка, обновление, UPSERT, удаление | Определяется операция, используемая на основе следующих условий: `sdc.operation.type` заголовок записи атрибута фазы. По умолчанию используются операции записи и обработки неподдерживаемых операций. Дополнительная информация доступна по ссылке [Определение CRUD операций](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Destinations/Couchbase.html#concept_i1c_vby_g3b). || Цель Elasticsearch | Создание (вставка), обновление, индексирование (UPSERT), удаление, обновление с `doc_as_upsert` (MERGE) | Определяется операция, используемая на основе следующих условий: `sdc.operation.type` заголовок записи атрибута фазы. По умолчанию используются операции и обработка неподдерживаемых операций. Дополнительная информация доступна по ссылке [Определение CRUD операций](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Destinations/Elasticsearch.html#concept_w2r_ktb_ry). |
| Цель GPSS Producer | Вставка, обновление, объединение | Определяется операция, используемая на основе следующих условий: `sdc.operation.type` заголовок записи атрибута фазы. По умолчанию используются операции и обработка неподдерживаемых операций. Дополнительная информация доступна по ссылке [Определение CRUD операций](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Destinations/GPSS.html#concept_alc_2bf_r3b). |
| Цель JDBC Producer | Вставка, обновление, удаление | Определяется операция, используемая на основе следующих условий: `sdc.operation.type` | Назначение | Действия | Определение действий, которое следует использовать, зависит от следующих условий: `sdc.operation.type` атрибуты обработки действий по умолчанию и неподдерживаемых действий в стадии заголовка записи. Для получения дополнительной информации см. [Определение CRUD действий](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Destinations/JDBCProducer.html#concept_plv_jpn_5y). || :------------------- | :------------------------------------------------------------- | :------------------------------------------------------------------------------------------------------ |
| Цель Kudu | Вставка, обновление, UPSERT, удаление | Определение действий, которое следует использовать, зависит от следующих условий: атрибуты `sdc.operation.type` обработки действий по умолчанию и неподдерживаемых действий в стадии заголовка записи. Для получения дополнительной информации см. [Определение CRUD действий](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Destinations/Kudu.html#concept_dvg_vvj_wx). |
| Цель MapR DB JSON | Вставка, обновление, удаление | Определение действий, которое следует использовать, зависит от следующих условий: атрибуты `sdc.operation.type` вставки API и установки API в стадии заголовка записи. Для получения дополнительной информации см. [Запись в MapR DB JSON](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Destinations/MapRDBJSON.html#concept_hy5_3nb_xbb). |
| Цель MongoDB | Вставка, обновление, замена, удаление | Определение действий, которое следует использовать, зависит от следующих условий: атрибуты `sdc.operation.type` обработки действий по умолчанию и неподдерживаемых действий в стадии заголовка записи. Для получения дополнительной информации см. [Определение CRUD действий](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Destinations/MongoDB.html#concept_bkc_m24_4v). || Цель Redis | Вставка, обновление, замена, удаление | Определение действий, которое следует использовать, зависит от следующих условий: `sdc.operation.type` атрибуты обработки действий по умолчанию и неподдерживаемых действий в стадии заголовка записи. Для получения дополнительной информации см. [Определение CRUD действий](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Destinations/Redis.html#concept_dz2_4xh_xbb). |
| Цель Salesforce | Вставка, обновление, UPSERT, удаление, неудаление | Определение действий, которое следует использовать, зависит от следующих условий: `sdc.operation.type` атрибуты обработки действий по умолчанию и неподдерживаемых действий в стадии заголовка записи. Для получения дополнительной информации см. [Определение CRUD действий](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Destinations/Salesforce.html#concept_opg_tyg_4z). |
| Цель Snowflake | Вставка, обновление, удаление | Определяет операцию, используемую в зависимости от следующих условий: `sdc.operation.type` для получения информации о свойствах заголовка записи, см. [Определение CRUD-операций](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Destinations/Snowflake.html#concept_umx_qgj_ggb). |
## Записи измененийЖурналы изменений могут предоставлять данные журнала в различных форматах. Процессор JDBC Tee и целевой объект JDBC Producer могут декодировать большинство форматов журналов изменений для генерации записей данных на основе исходных журналов изменений. При использовании других целевых объектов, поддерживающих CRUD, может потребоваться добавить дополнительную обработку в конвейер для изменения формата записей.
Например, записи Microsoft SQL CDC, созданные источником JDBC Query Consumer, включают CDC-поля в записях данных. Вы можете использовать процессор Field Remover для удаления всех ненужных полей из записей.
С другой стороны, MySQL Server binary log, прочитанный источником MySQL Binary Log, предоставляет новые или обновленные данные в поле New Data map и измененные или удаленные данные в поле Changed Data map. Вы можете использовать процессор Field Flattener для разглаживания полей карты с использованием необходимых данных и процессор Field Remover для удаления всех ненужных полей.
Для получения дополнительной информации о формате сгенерированных записей обратитесь к исходной документации, поддерживающей CDC.
## Практические примеры
Вы можете использовать источники, поддерживающие CDC, и целевые объекты, поддерживающие CRUD, вместе или отдельно в конвейере. Ниже приведены некоторые типичные примеры:- Источники, поддерживающие CDC, и целевые объекты, поддерживающие CRUD
Вы можете использовать источники, поддерживающие CDC, и целевые объекты, поддерживающие CRUD, для удобной обработки изменённых записей и записи их в целевую систему. Например, предположим, что вы хотите записать данные CDC из Microsoft SQL Server в Kudu. Для этого вы можете использовать источник SQL Server CDC Client, поддерживающий CDC, для чтения данных из таблицы изменений Microsoft SQL Server. Источник помещает тип операции CRUD в заголовок свойства `sdc.operation.type`: 1 для INSERT, 2 для DELETE, 3 для UPDATE. Вы настраиваете конвейер для записи в целевой объект Kudu, поддерживающий CRUD. В целевом объекте Kudu вы можете задать по умолчанию операцию для записей, для которых свойство `sdc.operation.type` не установлено, и настроить обработку ошибок для недопустимых значений. Вы устанавливаете значение по умолчанию как INSERT и настраиваете целевой объект для использования этого значения по умолчанию для представления недопустимых значений. В целевом объекте Kudu INSERT поддерживается значением 1, DELETE — 2, UPDATE — 3, а UPSERT — 4. При запуске конвейера источник SQL Server CDC Client определяет тип операции CRUD для каждой записи и записывает его в заголовок свойства `sdc.operation.type`. Целевой объект Kudu использует операцию из свойства `sdc.operation.type` для определения того, как обрабатывать каждую запись в целевой системе. Записи, для которых свойство `sdc.operation.type` не установлено, например, созданные конвейером, будут рассматриваться как записи INSERT. Записи с недопустимыми значениями будут обрабатываться с тем же поведением по умолчанию.
- Включение источника CDC для записи в некrud-назначение Если требуется записать изменённые данные в некdc-назначение, можно использовать выражение-оценку или скрипт-обработчик для перемещения информации о crud-операциях из заголовочного свойства `sdc.operation.type` в поля, чтобы эта информация оставалась в записи. Например, предположим, что вы хотите читать данные из Oracle LogMiner redo-лога и записывать записи вместе со всем CDC-информацией в поле записи в Hive-таблицу. Для этого вы будете использовать Oracle CDC Client источник для чтения redo-лога, а затем добавите Expression Evaluator для перемещения crud-информации из заголовочного свойства `sdc.operation.type` в запись. Oracle CDC клиент записывает другую CDC-информацию (например, имя таблицы и scn) в заголовочное свойство `oracle.cdc`, поэтому вы также можете использовать выражение для извлечения этой информации в запись. Затем вы можете использовать Hadoop FS назначение для записи усилённых записей в Hive.
- Некdc-источник для crud-назначения
При чтении данных из некdc-источника можно использовать Expression Evaluator или скрипт-обработчик для определения заголовочного свойства `sdc.operation.type`. Например, предположим, что вы хотите читать данные из транзакционной базы данных таблицы и поддерживать синхронизацию с измерительной таблицей. Вы будете использовать JDBC запрос-потребитель для чтения исходной таблицы и JDBC поиск-обработчик для проверки значений первичных ключей каждой записи в измерительной таблице. Затем, на основе выходных данных поиска-обработчика, вы будете знать, есть ли в таблице соответствующая запись. Используя выражение-оценку, установите заголовочное свойство `sdc.operation.type` записи — 3 для обновления записей с соответствующими записями, а 1 для вставки новых записей. Когда вы передаете записи в JDBC Производитель назначение, назначение будет использовать операцию из заголовочного свойства `sdc.operation.type` для определения того, как запись будет записана в измерительную таблицу.
# Удаление контрольных символов
Вы можете использовать несколько этапов для удаления контрольных символов из данных, таких как escape-символы или конец передачи. Удаление контрольных символов помогает избежать создания недействительных записей.
Когда Data Collector удаляет контрольные символы, он удаляет ASCII коды символов 0-31 и 127, за исключением следующих:
- 9-табуляция
- 10-перенос строки
- 13-возврат кареткиВы можете использовать свойство "Игнорировать контрольные символы" на следующих этапах для удаления контрольных символов:
- Происхождение Amazon S3
- Источник потребителей Amazon SQS
- Источник Azure Data Lake Storage Gen1
- Источник Azure Data Lake Storage Gen2
- Источник потребителей Azure IoT / Центра событий
- Источник CoAP сервера
- Источник каталога
- Источник окончания файла
- Происхождение Google Cloud Storage
- Источник подписчика Google Publish / Subscribe
- Источник gRPC клиента
- Происхождение Hadoop FS
- Независимая версия Hadoop FS
- Источник HTTP клиента
- Источник HTTP сервера
- Источник JMS потребителя
- Источник Kafka потребителя
- Источник многотематического потребителя Kafka
- Источник потребителя Kinesis
- Источник MapR FS
- Независимый источник MapR FS
- Источник многотематического потока MapR Streams потребителя
- Источник потребителя MapR Streams
- Источник MQTT подписчика
- Источник потребителя Apache Pulsar
- Источник потребителя RabbitMQ
- Источник потребителя Redis
- Источник SFTP / FTP / FTPS клиента
- Источник TCP сервера
- Происхождение WebSocket клиента
- Происхождение WebSocket сервера
- Обработчик парсера данных
- Обработчик HTTP клиента
- Обработчик парсера JSON
- Обработчик парсера логов
- Обработчик парсера XML
# Этап разработкиВы можете использовать несколько этапов разработки для помощи в разработке и тестировании конвейеров.
**Внимание:** Не используйте этапы разработки в конвейерах производства.
Во время разработки или тестирования конвейеров можно использовать следующие этапы:
- Разработка генератора данных источника
Генерирует записи с указанными именами полей и типами полей. Для типов полей вы можете выбрать один из типов данных, таких как String и Integer, или один из типов данных, таких как адресная информация, телефонные номера и названия книг.
Вы можете использовать "Map" или "List Map" как тип поля источника.
Источник может генерировать события для тестирования функций обработки событий. Чтобы сгенерировать записи событий, выберите свойство "Generate Events".
При генерации событий источник использует настроенные поля в качестве тела записи события и добавляет свойства заголовка записи события. Вы также можете использовать свойство "Event Name" для указания типа события. Например, чтобы создать событие "no-more-data", введите "no-more-data" в поле "Event Name". Дополнительную информацию о фреймворке событий и свойствах заголовка записи событий см. в разделе "[Обзор триггеров потока данных](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Event_Handling/EventFramework-Title.html#concept_cph_5h4_lx)". Источник также может генерировать несколько потоков для тестирования конвейеров с несколькими потоками. Чтобы сгенерировать несколько потоков, введите число больше 1 в поле "Number of Threads". Дополнительную информацию о конвейерах с несколькими потоками см. в разделе "[Обзор конвейеров с несколькими потоками](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Multithreaded_Pipelines/MultithreadedPipelines.html#concept_zpp_2xc_py)".
Свойства ошибок записей недействительны на этом этапе.
- Разработка случайного источника записей
Генерирует записи с настроенным количеством Long полей. Вы можете определить задержку между пакетами и максимальное количество записей, которые нужно сгенерировать.
Свойства ошибок записей недействительны на этом этапе.
- Разработка оригинального источника данных
Генерирует записи на основе данных, предоставленных пользователем. Вы можете ввести исходные данные, выбрать формат данных и настроить любые связанные с форматом конфигурационные параметры.
Например, вы можете ввести набор данных логов, выбрать формат данных логов и определить формат логов и другие атрибуты логов.
В предварительном просмотре данных этот этап отображает исходные данные источника и данные, сгенерированные на этом этапе.
Источник может генерировать события для тестирования функций обработки событий. Чтобы сгенерировать события, выберите свойство " **Generate Events**".
Свойства ошибок записей недействительны на этом этапе.
- Dev SDC RPC с буферизированным источником
Принимает записи из целевого SDC RPC, временно буферизирует записи на диске перед передачей их следующей стадии. Используется в качестве источника в целевом SDC RPC. **Примечание:** Временно буферизированные записи будут потеряны после намеренного или неумышленного останова транспортной линии.  недоступно в транспортной линии Data Collector Edge.
- Источник воспроизведения снимка разработки
Читает записи из загруженного файла снимка. Источник может начать чтение с первой группы записей в файле снимка. Также можно настроить источник для начала чтения с определенного места в файле снимка.
 недоступно в транспортной линии Data Collector Edge.
- Источник чтения сенсоров
Генерирует записи с одним из следующих типов данных: атмосферные данные, такие как данные, сгенерированные датчиком атмосферы BMxx80. Записи включают следующие поля: температура_C, давление_KPa и влажность. Например: `{"temperature_C": "12.34", "pressure_KPa": "567.89", "humidity": "534.44"}` Данные температуры процессора Raspberry Pi, таких как данные, сгенерированные встроенным термодатчиком BCM2835. Записи содержат поле: температура_C. Например: `{"temperature_C": "123.45"}`
 недоступно в транспортной линии Data Collector.
- Обработчик идентификации разработчика
Передает все записи следующей стадии. Используется в качестве заполнителя в транспортной линии. Можно определить обязательные поля и предпосылки, а также настроить обработку ошибок стадии.- Обработчик случайной ошибки разработчика
Генерирует записи ошибок для тестирования обработки ошибок транспортной линии. Можно настроить стадию для отбрасывания записей, определения обязательных полей и предпосылок, а также настройки обработки ошибок стадии.
- Обработчик создания записей разработчика
Генерирует две записи для каждой записи, входящей в стадию. Можно определить обязательные поля и предпосылки, а также настроить обработку ошибок стадии.
 недоступно в транспортной линии Data Collector Edge.
- К активному назначению
Генерирует события для тестирования функций обработки событий. Для генерации событий выберите свойство "Сгенерировать события". Цель состоит в том, чтобы для каждого входящей записи создавать запись события. Он использует входящую запись как тело записи события и добавляет заголовочные атрибуты записи события. Обратите внимание, что любые заголовочные атрибуты входящей записи могут быть потеряны или заменены.
Дополнительные сведения о фреймворке событий и заголовочных атрибутах записей событий см. в разделе "[Обзор триггеров потока данных](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Event_Handling/EventFramework-Title.html#concept_cph_5h4_lx)".# Быстрые клавиатурные сочетания для проектирования конвейера
При проектировании конвейера можно использовать следующие клавиатурные сочетания:
| Клавиатурное сочетание | Описание |
| :--------------------- | :------- |
| Command + Z | Отменяет действие на холсте конвейера или панели свойств. |
| Command + Shift + Z | Повторяет действие на холсте конвейера или панели свойств. |
# Функции предварительного просмотра технологии
Data Collector включает некоторые новые функции и стадии с названием "Technology Preview". Функции предварительного просмотра технологии можно использовать для разработки и тестирования, но не для производства.
Когда функции предварительного просмотра технологии одобряются для производства, это отражается в выпусках и документации, а значок предварительного просмотра технологии удаляется из пользовательского интерфейса.
Стадии предварительного просмотра технологии отображаются в верхнем левом углу с значком "Technology Preview", как показано ниже:
В настоящее время Data Collector включает следующие стадии предварительного просмотра технологии:
- [Cron Scheduler](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/CronScheduler.html#concept_nsz_mnr_2jb) - создает записи с текущей датой и временем, основанными на cron-выражении.
- [gRPC клиентский источник](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/gRPCClient.html#concept_yp1_4zs_yfb) - обрабатывает данные в gRPC-сервере, вызывая методы gRPC-сервера и читая ответы, возвращаемые сервером. Источник может вызывать unary RPC или серверные потоковые RPC-методы. Используйте этот источник только в конвейерах, настроенных для режима выполнения на краях.
- [Пуск конвейера](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/StartPipeline.html#concept_h1l_xpr_2jb) - запускает конвейеры Data Collector, Data Collector Edge или Transformer.
- [Control Hub API процессор](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Processors/ControlHubAPI.html#concept_akz_zsr_2jb) - вызывает API Control Hub.
- [Пуск задачи процессор](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Processors/StartJob.html#concept_irv_l5r_2jb) - запускает задачи Control Hub.
- [Пуск конвейера процессор](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Processors/StartPipeline.html#concept_bbc_cxr_2jb) - запускает конвейеры Data Collector, Data Collector Edge или Transformer.
# Тестовый источник данныхТестовый источник может предоставлять тестовые данные для предварительного просмотра данных, что помогает в разработке и тестировании конвейеров. В Control Hub тестовый источник также используется при разработке фрагментов конвейеров. При запуске конвейера тестовый источник не используется.
Тестовый источник представляет собой виртуальную стадию, настроенную в свойствах конвейера или фрагмента конвейера. Тестовый источник не отображается на холсте. Это позволяет настроить реальные источники данных на холсте (используемые в производственных целях), а тестовые источники — в свойствах конвейера или фрагмента.
Любой доступный источник данных может быть использован в качестве тестового источника. Тестовые данные могут быть представлены в любом формате, поддерживаемом тестовым источником. Тестовые данные не обязательно должны соответствовать формату данных, используемому в производственных целях.
Для использования тестового источника в предварительном просмотре данных необходимо выбрать тестовый источник в диалоговом окне конфигурации предварительного просмотра данных. По умолчанию предварительный просмотр данных использует источник данных, настроенный на холсте, как источник предварительного просмотра.Например, предположим, что ваш отдел IT предоставил отдельное хранилище Amazon S3 для тестовых данных. На холсте Amazon S3 источник настроен для использования реального хранилища для производства. В свойствах конвейера виртуальный Amazon S3 тестовый источник настроен для использования тестового хранилища.
Во время разработки и тестирования конвейера можно настроить предварительный просмотр данных для использования тестового источника, оставляя производственные данные неизменными. Затем, когда конвейер готов, запускается конвейер. Data Collector использует реальный источник данных Amazon S3 для обработки производственных данных, игнорируя тестовый источник данных.
Вы можете использовать тестовый источник по мере необходимости, но вот несколько общих сценариев:
- Использование тестового источника для доступа к тестовым данным в одном и том же исходном системе.
Используйте тот же тип источника данных на холсте в качестве тестового источника и настройте их для доступа к производственным и тестовым данным соответственно для предварительного просмотра данных.
- Использование тестового источника для доступа к тестовым данным в другой исходной системе. При необходимости вы можете использовать любой источник данных для доступа к тестовым данным для предварительного просмотра. Исходное местоположение и формат данных становятся несущественными после передачи в конвейер.- Использование источника данных разработки для предоставления тестовых данных.
Если у вас есть доступные тестовые данные, вы можете использовать исходный источник данных в качестве тестового источника. Или используйте другие стадии разработки для предоставления тестовых данных.
## Настройка тестового источника
Настройте тестовый источник в свойствах конвейера. При использовании Control Hub вы также можете настроить тестовый источник в свойствах фрагмента конвейера.
1. На вкладке **Общие** в свойствах конвейера или фрагмента выберите исходный тип, который хотите использовать.
Вы можете выбрать любой доступный исходный тип.
2. На вкладке **Тестовый исход** настройте свойства исхода.
Свойства тестового исхода аналогичны свойствам реального исхода, и все они отображаются на одной вкладке.
Подробную информацию о настройке исхода см. в разделе "Исходы" главы "Настройка исхода <type of origin>". Например, для получения помощи по настройке тестового исхода каталога см. [../Origins/Directory.html#task_gfj_ssv_yq](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/Directory.html#task_gfj_ssv_yq).
## Использование тестового исхода в предварительном просмотре данных
Чтобы использовать настроенный тестовый исход в предварительном просмотре данных, настройте параметры предварительного просмотра.1. Нажмите значок предварительного просмотра данных, чтобы начать предварительный просмотр данных.
2. В диалоговом окне **Параметры предварительного просмотра** установите свойство **Источник предварительного просмотра** на **Тестовый исход**, а затем настройте остальные параметры предварительного просмотра данных по необходимости.
Подробнее о предварительном просмотре данных см. в разделе [Обзор предварительного просмотра данных](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Data_Preview/DataPreview_Title.html#concept_jtn_s3m_lq).
# Понимание состояния конвейера
Состояние конвейера — это текущее состояние конвейера, например "Запущен" или "Остановлен". Состояние конвейера может отображаться в списке "Все конвейеры". Состояние конвейера также может отображаться в журналах Data Collector.
Обычно в списке "Все конвейеры" отображаются следующие состояния конвейера:
- Изменен — конвейер создан или изменен, и он не запускался с момента последнего изменения.
- Завершен — конвейер завершил все ожидаемые операции и был остановлен.
- RUN_ERROR — конвейер столкнулся с ошибкой при запуске или остановке.
- Запущен — конвейер запущен.
- Остановлен — конвейер был остановлен вручную.
- START_ERROR — конвейер столкнулся с ошибкой при запуске и не может быть запущен.
- STOP_ERROR — конвейер столкнулся с ошибкой при остановке.Следующие состояния конвейера являются переходными и редко отображаются в списке "Все конвейеры". Эти состояния могут отображаться в журналах Data Collector, если уровень журналирования конвейера установлен на Debug.
- CONNECT_ERROR — при работе в режиме кластера, Data Collector не может подключиться к основному кластерному менеджеру, например Mesos или YARN.
- Подключение — канал готовится к перезапуску после перезапуска Data Collector.
- DISCONNECTED — канал отсоединен от внешней системы, обычно из-за перезапуска или завершения работы Data Collector.
- Завершение — канал завершает все ожидаемые операции.
- Повторная попытка — канал пытается повторно запуститься после возникновения ошибки. Это происходит только если канал настроен на повторную попытку при возникновении ошибки.
- RUNNING_ERROR — канал столкнулся с ошибкой во время выполнения.
- Запуск — канал инициализируется, но ещё не запущен.
- STARTING_ERROR — канал столкнулся с ошибкой при запуске.
- Остановка — канал останавливается после ручного запроса на остановку.
- STOPPING_ERROR — канал столкнулся с ошибкой при остановке.
[Примеры перехода состояния](https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Pipeline_Maintenance/PipelineStates-Examples.html)
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )