1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/zhuhuipei-flink-streaming-platform-web

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
catalog.md 4.1 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
Отправлено 03.03.2025 13:43 e11370c

Описание операции с каталогом CataLog

Указание зависимых JAR-файлов в официальной документации

https://ci.apache.org/projects/flink/flink-docs-release-1.12/ru/dev/table/connectors/hive/

Необходимость различных JAR-файлов зависит от версии Hive.

Официальное описание коннектора каталога:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/ru/dev/table/connectors/hive/#%D0%BA%D0%BE%D0%BD%D0%BD%D0%B5%D0%BA%D1%82%D0%BE%D1%80%D1%8B-%D0%BA-hive

JAR-файлы можно поместить в папку lib, либо на HTTP-сервер, а затем использовать HTTP-сервис при необходимости.

Пример 1

CREATE CATALOG testmyhive WITH (
    'type' = 'hive',
    'default-database' = 'zhp',
    'hive-conf-dir' = '/Users/huipeizhu/hive-conf'
);

USE CATALOG testmyhive;

CREATE TABLE source_table_01 (
    f0 INT,
    f1 INT,
    f2 STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '5'
);

CREATE TABLE print_table_01 (
    f0 INT,
    f1 INT,
    f2 STRING
) WITH (
    'connector' = 'print'
);

INSERT INTO print_table_01 SELECT f0, f1, f2 FROM source_table_01;

SHOW TABLES;

SHOW FUNCTIONS;

SHOW CATALOGS;

SHOW DATABASES;

Пример 2

Если уже использовался, то можно просто создать новый каталог

CREATE CATALOG testmyhive WITH (
    'type' = 'hive',
    'default-database' = 'zhp',
    'hive-conf-dir' = '/Users/huipeizhu/hive-conf'
);

USE CATALOG testmyhive;

INSERT INTO print_table_01 SELECT f0, f1, f2 FROM source_table_01;

Пример 3

https://ci.apache.org/projects/flink/flink-docs-release-1.12/ru/dev/table/connectors/hive/hive_read_write.html

Передача потока данных в Hive

Обратите внимание: Чтобы записывать данные в Hive, необходимо включить checkpointing.

CREATE CATALOG testmyhive WITH (
    'type' = 'hive',
    'default-database' = 'zhp',
    'hive-conf-dir' = '/Users/huipeizhu/hive-conf'
);

USE CATALOG testmyhive;

DROP TABLE IF EXISTS item_test;

DROP TABLE IF EXISTS hive_flink_table;

CREATE TABLE item_test (
    itemId BIGINT,
    price BIGINT,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'flink-catalog-v1',
    'properties.bootstrap.servers' = '127.0.0.1:9092',
    'properties.group.id' = 'test-1',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset'
);

SET table.sql-dialect = hive;

CREATE TABLE hive_flink_table (
    itemId BIGINT,
    price BIGINT,
    ups STRING
) TBLPROPERTIES (
    'sink.rolling-policy.rollover-interval' = '1min',
    'sink.partition-commit.trigger' = 'process-time',
    'sink.partition-commit.policy.kind' = 'metastore,success-file'
);

SET table.sql-dialect = default;

INSERT INTO hive_flink_table SELECT itemId, price, 'XXXXaaa' AS ups FROM item_test;

Пример отправки данных через Kafka для тестирования

public class KafkaSend {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);

        Map<String, Long> map = new HashMap<>();

        for (long i = 0; i < 10000; i++) {
            map.put("itemId", i);
            map.put("price", i + 1);
            producer.send(new ProducerRecord<>("flink-catalog-v1", null, JSON.toJSONString(map)));
            producer.flush();
            Thread.sleep(1000L);
        }

        producer.close();
    }
}

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/zhuhuipei-flink-streaming-platform-web.git
git@api.gitlife.ru:oschina-mirror/zhuhuipei-flink-streaming-platform-web.git
oschina-mirror
zhuhuipei-flink-streaming-platform-web
zhuhuipei-flink-streaming-platform-web
master