1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/mirrors-KSQL

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
klip-57-kafka-headers.md 14 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
gitlife-traslator Отправлено 28.11.2024 07:33 e6dfb69

Чтение заголовков Kafka в ksqlDB

Автор: Zara Lim (@jzaralim) | Целевая версия выпуска: 0.24.0; 7.2.0 | Статус: объединено | Обсуждение: https://github.com/confluentinc/ksql/pull/8293

Краткое содержание: предоставить данные заголовков Kafka в ksqlDB. В настоящее время заголовки являются единственным невидимым для пользователей метаданными в записи Kafka.

Мотивация и предпосылки

На данный момент ksqlDB предоставляет пользователям метки времени, раздел и смещение записей Kafka как псевдостолбцы. Единственные оставшиеся невидимые для пользователей ksqlDB метаданные — это заголовки. Заголовки Kafka представляют собой список из нуля или более пар ключ-значение, где ключи (необязательно уникальные) являются строками, а значения — байтовыми массивами. Заголовки обычно содержат метаданные о записях, которые затем можно использовать для маршрутизации или обработки (например, заголовок может хранить информацию о том, как десериализовать строки в значении). Если ksqlDB предоставит заголовки пользователю, то эти шаги можно будет выполнить в ksqlDB.

Что входит в рамки проекта

  • Синтаксис для доступа к заголовкам
  • Поддержка всех типов запросов, кроме INSERT-запросов
  • Включение столбцов заголовков в схему источника данных
  • Функции для декодирования байтовых строк в примитивные типы (преобразование BYTES в STRING уже существует, новые функции будут представлять собой преобразования в INT, BIGINT и DOUBLE)

Что не входит в рамки проекта

  • Запись заголовков в выходные темы — мы можем сделать это в будущем, но это отдельная тема для обсуждения
  • Расширенные функции для декодирования байтовых строк (например, функции для десериализации сложных сериализованных данных). Если пользователь хочет преобразовать что-то, для чего у нас нет функции, он может либо создать свою собственную UDF, либо использовать CREATE FUNCTION, когда это станет доступно.

Публичные API

Синтаксис

В отличие от других псевдостолбцов, если пользователь хочет получить доступ к заголовкам, он создаст столбец с ключевым словом HEADERS. Этот столбец будет заполнен полным списком ключей и значений заголовков. Может быть только один столбец HEADERS.

ksql> CREATE STREAM A (
    my_headers ARRAY<STRUCT<key STRING, value BYTES>> HEADERS);

Если есть несколько столбцов HEADERS, будет выдана ошибка. Тип столбца заголовка должен быть точно ARRAY<STRUCT<ключ STRING, значение BYTES>>. Будет выдана ошибка, если это не так.

ksql> CREATE STREAM B (my_headers ARRAY<BYTES> HEADERS);
Ошибка: Столбцы, указанные с помощью ключевого слова HEADERS, должны быть типизированы как ARRAY<STRUCT<key STRING, value BYTES>>.

Пользователи также могут пометить столбец как HEADER(<ключ>), который заполнит столбец последним заголовком, соответствующим ключу. Тип данных должен быть BYTES.

ksql> CREATE STREAM B (value BYTES HEADER(<key>), value_2 BYTES HEADER(<key_2>));

Если присутствует столбец с пометкой HEADERS, то не может быть столбцов с пометкой HEADER(<ключ>). В противном случае может быть несколько столбцов с пометкой HEADER(<ключ>), но никакие два не могут иметь одинаковый ключ. Если ключ отсутствует в заголовке, значением в столбце будет null.

Источники, созданные с использованием вывода схемы, могут иметь столбцы заголовков, их просто нужно указать. Например, оператор CREATE STREAM A (my_headers ARRAY<STRUCT<key STRING, value BYTES>> HEADERS) WITH (kafka_topic='a', value_format='avro'); создаст поток со столбцами из Schema Registry, а также один столбец заголовков.

Новые функции преобразования байтов

Также появятся новые функции преобразования байтов для помощи в декодировании данных заголовков:

  • INT_FROM_BYTES(bytes, [byte_order]) — входной байтовый массив должен иметь длину ровно 4 байта, иначе будет выдана ошибка. Преобразование будет обрабатываться методом getInt класса ByteBuffer. byte_order — это строка, которая может быть либо BIG_ENDIAN, либо LITTLE_ENDIAN. Если byte_value не указано, функция будет предполагать BIG_ENDIAN.
  • BIGINT_FROM_BYTES(bytes, [byte_order]) — входной байтовый массив должен иметь длину ровно 8 байтов, иначе будет выдана ошибка. Преобразование будет обрабатываться методом getLong класса ByteBuffer. Если byte_value не указано, функция будет предполагать BIG_ENDIAN. DOUBLE_FROM_BYTES(bytes, [byte_order]) — входной массив байтов должен быть ровно 8 байт в длину, иначе будет выброшено исключение. Преобразование будет обрабатываться методом getDouble класса ByteBuffer. Если byte_value не указано, то функция будет использовать значение по умолчанию BIG_ENDIAN.

Во всех этих случаях, когда возникает ошибка, она записывается в журнал обработки. Если функция является частью проекции, то в выходной столбец будет записано значение null. Если это часть предиката, то вся запись будет пропущена. Это соответствует тому, как обрабатываются ошибки, когда функции с аргументами столбцов значений выдают ошибки.

Запросы

Заголовочные столбцы можно использовать в качестве элементов выбора, аргументов функций или предикатов в любом запросе, как и любой другой столбец, за исключением операторов INSERT SELECT и INSERT VALUES. Они завершатся ошибкой, если попытаться вставить данные в столбец заголовков. Однако, когда постоянные запросы проецируют столбцы на основе заголовков, значения заголовков копируются в строку выходного значения (или ключа), а не в заголовки выходной записи.

Схема

Столбцы заголовков будут полностью являться частью схемы потока/таблицы. При выполнении SELECT * столбцы заголовков будут включены в проекцию, и они также появятся при описании источника. Однако заголовки не будут скопированы в поля заголовков тем приёмников.

Дизайн

В логической схеме столбцы будут представлены новым типом столбца HeaderColumn, который будет расширять Column и содержать необязательное String с именем key, представляющее ключ в HEADER(key). Для столбцов, определённых с помощью HEADERS, он будет содержать Optional.empty(). Они будут находиться в новом пространстве имён HEADER, которое функционирует идентично пространству имён VALUE, за исключением того, что StreamBuilder заполнит эти столбцы значениями заголовков.

План тестирования

Будут добавлены следующие тестовые случаи:

  • Создание STREAM/TABLE с использованием столбцов заголовков;
  • Запросы SELECT * FROM... включают столбцы заголовков в вывод;
  • DESCRIBE SOURCE... включает любые столбцы заголовков;
  • Push- и pull-запросы могут выбирать заголовки;
  • Заголовки могут использоваться в запросах JOIN, UDF, фильтрах, GROUP BY и PARTITION BY;
  • Тесты для новых функций;
  • Вставка значений в столбцы заголовков заблокирована;
  • Постоянные запросы, созданные в предыдущей версии, всё ещё работают;
  • На заголовки тем приёмника эти изменения не влияют;
  • Столбцы заголовков можно использовать с Java-клиентом, инструментом миграции и CLI.

Этапы разработки и сроки выполнения

Разработка функции будет осуществляться за флагом функции ksql.headers.enabled. Когда флаг выключен, создание новых потоков и таблиц с заголовками будет отключено, но источники с заголовками будут продолжать работать как обычно.

Этапы разработки:

  1. Создать флаг функции;
  2. Добавить HEADERS в синтаксис ksqlDB;
  3. Добавить столбцы заголовков к LogicalSchema;
  4. Обновить SourceBuilder для добавления заголовков;
  5. Обновить всё остальное, что потребуется для адаптации заголовков и новой LogicalSchema;
  6. Запретить использование столбцов заголовков в операторах INSERT;
  7. Добавить HEADER() в синтаксис ksqlDB;
  8. Обновить все связанные с заголовками элементы кода, чтобы включить извлечение ключей из заголовков;
  9. Отключить флаг функции;
  10. Реализовать новые функции преобразования байтов.

Документация будет обновлена следующим образом:

  • Синопсисы CREATE STREAM|TABLE будут обновлены, чтобы включать столбцы заголовков;
  • Будет добавлен новый раздел на странице документации о псевдостолбцах, посвящённый описанию доступа к заголовкам;
  • Будут добавлены документы по каждой из новых функций.

Последствия для совместимости

Все запросы, созданные до введения заголовков, должны продолжать работать. Никаких последствий для совместимости не ожидается.

Последствия для безопасности

Последствия для безопасности отсутствуют.

Рассмотренные альтернативы

Добавить ROWHEADERS как псевдостолбец, аналогичный timestamp/offset/partition

При таком подходе ROWHEADERS работает точно так же, как ROWTIME, ROWOFFSET и ROWPARTITION. Это самый низкий уровень усилий для реализации, но он был отклонён по следующим причинам:

  1. При выполнении соединений ksqlDB включает все столбцы в темы переразбиения и журнала изменений.

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/mirrors-KSQL.git
git@api.gitlife.ru:oschina-mirror/mirrors-KSQL.git
oschina-mirror
mirrors-KSQL
mirrors-KSQL
master