ksqlDB — это база данных для создания приложений потоковой обработки поверх Apache Kafka. Она распределённая, масштабируемая, надёжная и работает в реальном времени. ksqlDB объединяет мощь потоковой обработки в реальном времени с доступностью реляционной базы данных благодаря знакомому лёгкому синтаксису SQL. ksqlDB предлагает следующие основные примитивы:
Комбинация этих мощных примитивов позволяет создавать полноценное потоковое приложение только с помощью операторов SQL, минимизируя сложность и операционные издержки. ksqlDB поддерживает широкий спектр операций, включая агрегацию, объединение, оконные функции, сегментацию сеансов и многое другое. Вы можете найти дополнительные руководства и ресурсы ksqlDB здесь.
См. документацию ksqlDB для последней стабильной версии.
ksqlDB позволяет определять материализованные представления над вашими потоками и таблицами. Материализованные представления определяются так называемым «постоянным запросом». Эти запросы известны как постоянные, потому что они поддерживают свои инкрементно обновляемые результаты с помощью таблицы.
CREATE TABLE hourly_metrics AS
SELECT url, COUNT(*)
FROM page_views
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY url EMIT CHANGES;
Результаты можно «вытягивать» из материализованных представлений по запросу через SELECT
запросы. Следующий запрос вернёт одну строку:
SELECT * FROM hourly_metrics
WHERE url = 'http://myurl.com' AND WINDOWSTART = '2019-11-20T19:00';
Результаты также могут непрерывно «проталкиваться» клиентам через потоковые SELECT
запросы. Следующий потоковый запрос будет отправлять клиенту все инкрементные изменения, внесённые в материализованное представление:
SELECT * FROM hourly_metrics EMIT CHANGES;
Потоковые запросы будут выполняться постоянно, пока они не будут явно остановлены.
Apache Kafka является популярным выбором для питания конвейеров данных. ksqlDB упрощает преобразование данных внутри конвейера, подготавливая сообщения к чистому попаданию в другую систему.
CREATE STREAM vip_actions AS
SELECT userid, page, action
FROM clickstream c
LEFT JOIN users u ON c.userid = u.user_id
WHERE u.level = 'Platinum' EMIT CHANGES;
ksqlDB хорошо подходит для выявления закономерностей или аномалий в данных в реальном времени. Обрабатывая поток по мере поступления данных, вы можете идентифицировать и правильно отображать необычные события с миллисекундной задержкой.
CREATE TABLE possible_fraud AS
SELECT card_number, count(*)
FROM authorization_attempts
WINDOW TUMBLING (SIZE 5 SECONDS)
GROUP BY card_number
HAVING count(*) > 3 EMIT CHANGES;
Способность Kafka обеспечивать масштабируемость... Организованный мониторинг и оповещение с помощью упорядоченных записей и потоковой обработки данных
ksqlDB предоставляет знакомый синтаксис для отслеживания, понимания и управления оповещениями.
CREATE TABLE error_counts AS
SELECT error_code, count(*)
FROM monitoring_stream
WINDOW TUMBLING (SIZE 1 MINUTE)
WHERE type = 'ERROR'
GROUP BY error_code EMIT CHANGES;
ksqldb включает встроенную интеграцию с источниками и приёмниками данных Kafka Connect, обеспечивая унифицированный SQL-интерфейс для широкого спектра внешних систем (https://www.confluent.io/hub).
Следующий запрос представляет собой простой постоянный потоковый запрос, который будет выводить все свои результаты в тему с именем clicks_transformed
:
CREATE STREAM clicks_transformed AS
SELECT userid, page, action
FROM clickstream c
LEFT JOIN users u ON c.userid = u.user_id EMIT CHANGES;
Вместо того чтобы просто отправлять весь непрерывный вывод запроса в Kafka-тему, часто бывает очень полезно направить вывод в другое хранилище данных. Интеграция ksqlDB с Kafka Connect делает этот шаблон очень простым.
Следующее утверждение создаст коннектор приёмника Kafka Connect, который непрерывно отправляет весь вывод из вышеупомянутого потокового запроса ETL непосредственно в Elasticsearch:
CREATE SINK CONNECTOR es_sink WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'topics' = 'clicks_transformed',
'key.ignore' = 'true',
'schema.ignore' = 'true',
'type.name' = '',
'connection.url' = 'http://elasticsearch:9200');
Для получения помощи, вопросов или запросов о ksqlDB используйте нашу пользовательскую группу Google (https://groups.google.com/forum/#!forum/ksql-users) или наш общедоступный канал Slack #ksqldb в Confluent Community Slack. Добро пожаловать всем!
Вы можете получить помощь, узнать, как внести свой вклад в ksqlDB, и найти последние новости, связавшись с сообществом Confluent.
По более общим вопросам о платформе Confluent обращайтесь в группу Google Confluent (https://groups.google.com/forum/#!forum/confluent-platform).
Вклад в код, примеры, документацию и т. д. очень ценятся.
Проект лицензирован в соответствии с лицензией сообщества Confluent.
Apache, Apache Kafka, Kafka и связанные с ними названия проектов с открытым исходным кодом являются товарными знаками Apache Software Foundation.
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )