Чтение заголовков Kafka в ksqlDB
Автор: Zara Lim (@jzaralim) | Целевая версия выпуска: 0.24.0; 7.2.0 | Статус: объединено | Обсуждение: https://github.com/confluentinc/ksql/pull/8293
Краткое содержание: предоставить данные заголовков Kafka в ksqlDB. В настоящее время заголовки являются единственным невидимым для пользователей метаданными в записи Kafka.
На данный момент ksqlDB предоставляет пользователям метки времени, раздел и смещение записей Kafka как псевдостолбцы. Единственные оставшиеся невидимые для пользователей ksqlDB метаданные — это заголовки. Заголовки Kafka представляют собой список из нуля или более пар ключ-значение, где ключи (необязательно уникальные) являются строками, а значения — байтовыми массивами. Заголовки обычно содержат метаданные о записях, которые затем можно использовать для маршрутизации или обработки (например, заголовок может хранить информацию о том, как десериализовать строки в значении). Если ksqlDB предоставит заголовки пользователю, то эти шаги можно будет выполнить в ksqlDB.
В отличие от других псевдостолбцов, если пользователь хочет получить доступ к заголовкам, он создаст столбец с ключевым словом HEADERS. Этот столбец будет заполнен полным списком ключей и значений заголовков. Может быть только один столбец HEADERS.
ksql> CREATE STREAM A (
my_headers ARRAY<STRUCT<key STRING, value BYTES>> HEADERS);
Если есть несколько столбцов HEADERS, будет выдана ошибка. Тип столбца заголовка должен быть точно ARRAY<STRUCT<ключ STRING, значение BYTES>>. Будет выдана ошибка, если это не так.
ksql> CREATE STREAM B (my_headers ARRAY<BYTES> HEADERS);
Ошибка: Столбцы, указанные с помощью ключевого слова HEADERS, должны быть типизированы как ARRAY<STRUCT<key STRING, value BYTES>>.
Пользователи также могут пометить столбец как HEADER(<ключ>), который заполнит столбец последним заголовком, соответствующим ключу. Тип данных должен быть BYTES.
ksql> CREATE STREAM B (value BYTES HEADER(<key>), value_2 BYTES HEADER(<key_2>));
Если присутствует столбец с пометкой HEADERS, то не может быть столбцов с пометкой HEADER(<ключ>). В противном случае может быть несколько столбцов с пометкой HEADER(<ключ>), но никакие два не могут иметь одинаковый ключ. Если ключ отсутствует в заголовке, значением в столбце будет null.
Источники, созданные с использованием вывода схемы, могут иметь столбцы заголовков, их просто нужно указать. Например, оператор CREATE STREAM A (my_headers ARRAY<STRUCT<key STRING, value BYTES>> HEADERS) WITH (kafka_topic='a', value_format='avro');
создаст поток со столбцами из Schema Registry, а также один столбец заголовков.
Также появятся новые функции преобразования байтов для помощи в декодировании данных заголовков:
INT_FROM_BYTES(bytes, [byte_order])
— входной байтовый массив должен иметь длину ровно 4 байта, иначе будет выдана ошибка. Преобразование будет обрабатываться методом getInt
класса ByteBuffer
. byte_order
— это строка, которая может быть либо BIG_ENDIAN
, либо LITTLE_ENDIAN
. Если byte_value
не указано, функция будет предполагать BIG_ENDIAN
.BIGINT_FROM_BYTES(bytes, [byte_order])
— входной байтовый массив должен иметь длину ровно 8 байтов, иначе будет выдана ошибка. Преобразование будет обрабатываться методом getLong
класса ByteBuffer
. Если byte_value
не указано, функция будет предполагать BIG_ENDIAN
. DOUBLE_FROM_BYTES(bytes, [byte_order]) — входной массив байтов должен быть ровно 8 байт в длину, иначе будет выброшено исключение. Преобразование будет обрабатываться методом getDouble класса ByteBuffer. Если byte_value не указано, то функция будет использовать значение по умолчанию BIG_ENDIAN.Во всех этих случаях, когда возникает ошибка, она записывается в журнал обработки. Если функция является частью проекции, то в выходной столбец будет записано значение null. Если это часть предиката, то вся запись будет пропущена. Это соответствует тому, как обрабатываются ошибки, когда функции с аргументами столбцов значений выдают ошибки.
Заголовочные столбцы можно использовать в качестве элементов выбора, аргументов функций или предикатов в любом запросе, как и любой другой столбец, за исключением операторов INSERT SELECT и INSERT VALUES. Они завершатся ошибкой, если попытаться вставить данные в столбец заголовков. Однако, когда постоянные запросы проецируют столбцы на основе заголовков, значения заголовков копируются в строку выходного значения (или ключа), а не в заголовки выходной записи.
Столбцы заголовков будут полностью являться частью схемы потока/таблицы. При выполнении SELECT * столбцы заголовков будут включены в проекцию, и они также появятся при описании источника. Однако заголовки не будут скопированы в поля заголовков тем приёмников.
В логической схеме столбцы будут представлены новым типом столбца HeaderColumn, который будет расширять Column и содержать необязательное String с именем key, представляющее ключ в HEADER(key). Для столбцов, определённых с помощью HEADERS, он будет содержать Optional.empty(). Они будут находиться в новом пространстве имён HEADER, которое функционирует идентично пространству имён VALUE, за исключением того, что StreamBuilder заполнит эти столбцы значениями заголовков.
Будут добавлены следующие тестовые случаи:
Разработка функции будет осуществляться за флагом функции ksql.headers.enabled. Когда флаг выключен, создание новых потоков и таблиц с заголовками будет отключено, но источники с заголовками будут продолжать работать как обычно.
Этапы разработки:
Документация будет обновлена следующим образом:
Все запросы, созданные до введения заголовков, должны продолжать работать. Никаких последствий для совместимости не ожидается.
Последствия для безопасности отсутствуют.
Добавить ROWHEADERS как псевдостолбец, аналогичный timestamp/offset/partition
При таком подходе ROWHEADERS работает точно так же, как ROWTIME, ROWOFFSET и ROWPARTITION. Это самый низкий уровень усилий для реализации, но он был отклонён по следующим причинам:
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )