Описание операции с каталогом CataLog
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ru/dev/table/connectors/hive/
Необходимость различных JAR-файлов зависит от версии Hive.
Официальное описание коннектора каталога:
JAR-файлы можно поместить в папку lib
, либо на HTTP-сервер, а затем использовать HTTP-сервис при необходимости.
CREATE CATALOG testmyhive WITH (
'type' = 'hive',
'default-database' = 'zhp',
'hive-conf-dir' = '/Users/huipeizhu/hive-conf'
);
USE CATALOG testmyhive;
CREATE TABLE source_table_01 (
f0 INT,
f1 INT,
f2 STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '5'
);
CREATE TABLE print_table_01 (
f0 INT,
f1 INT,
f2 STRING
) WITH (
'connector' = 'print'
);
INSERT INTO print_table_01 SELECT f0, f1, f2 FROM source_table_01;
SHOW TABLES;
SHOW FUNCTIONS;
SHOW CATALOGS;
SHOW DATABASES;
Если уже использовался, то можно просто создать новый каталог
CREATE CATALOG testmyhive WITH (
'type' = 'hive',
'default-database' = 'zhp',
'hive-conf-dir' = '/Users/huipeizhu/hive-conf'
);
USE CATALOG testmyhive;
INSERT INTO print_table_01 SELECT f0, f1, f2 FROM source_table_01;
Передача потока данных в Hive
Обратите внимание: Чтобы записывать данные в Hive, необходимо включить checkpointing.
CREATE CATALOG testmyhive WITH (
'type' = 'hive',
'default-database' = 'zhp',
'hive-conf-dir' = '/Users/huipeizhu/hive-conf'
);
USE CATALOG testmyhive;
DROP TABLE IF EXISTS item_test;
DROP TABLE IF EXISTS hive_flink_table;
CREATE TABLE item_test (
itemId BIGINT,
price BIGINT,
proctime AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'flink-catalog-v1',
'properties.bootstrap.servers' = '127.0.0.1:9092',
'properties.group.id' = 'test-1',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset'
);
SET table.sql-dialect = hive;
CREATE TABLE hive_flink_table (
itemId BIGINT,
price BIGINT,
ups STRING
) TBLPROPERTIES (
'sink.rolling-policy.rollover-interval' = '1min',
'sink.partition-commit.trigger' = 'process-time',
'sink.partition-commit.policy.kind' = 'metastore,success-file'
);
SET table.sql-dialect = default;
INSERT INTO hive_flink_table SELECT itemId, price, 'XXXXaaa' AS ups FROM item_test;
public class KafkaSend {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
Map<String, Long> map = new HashMap<>();
for (long i = 0; i < 10000; i++) {
map.put("itemId", i);
map.put("price", i + 1);
producer.send(new ProducerRecord<>("flink-catalog-v1", null, JSON.toJSONString(map)));
producer.flush();
Thread.sleep(1000L);
}
producer.close();
}
}
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )