1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/tenmg-flink-jobs-launcher

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
README.md

flink-jobs-launcher

maven

Важное предупреждение: в проекте flink-jobs уже предоставлен flink-jobs-clients в качестве замены этого проекта, настоятельно рекомендуется пользователям перейти на использование, этот проект не будет обновляться после версии flink-jobs 1.3+.

Введение

flink-jobs-launcher — это библиотека запуска приложений для проектов flink-jobs, которая может использоваться для запуска flink-jobs или обычных заданий Flink. С помощью flink-jobs-launcher можно быстро интегрировать Flink в существующие системы, основанные на Java, а также управлять Flink SQL через файлы конфигурации XML. Типичная архитектура развертывания flink-jobs-launcher выглядит следующим образом:

Типичная архитектура развёртывания flink-jobs-launcher

Начало работы

1.1.3 и более ранние версии поддерживают только CommandLineFlinkJobsLauncher, начиная с версии 1.1.4 рекомендуется использовать RestClusterClientFlinkJobsLauncher. Ниже приведён пример использования RestClusterClientFlinkJobsLauncher в Maven-проекте.

Добавление зависимостей

<!-- https://mvnrepository.com/artifact/cn.tenmg/flink-jobs-launcher -->
<dependency>
    <groupId>cn.tenmg</groupId>
    <artifactId>flink-jobs-launcher</artifactId>
    <version>${flink-jobs-launcher.version}</version>
</dependency>

Настройка конфигурации

# RPC configuration
jobmanager.rpc.address= 192.168.100.27
jobmanager.rpc.port=6123
# Рекомендуемый, но необязательный путь к файлу JAR, который flink-jobs-launcher отправляет на выполнение.
flink.jobs.default.jar=/yourpath/your-flink-jobs-app-1.0.0.jar
# Необязательный класс, который flink-jobs-launcher отправляет для выполнения. Вы также можете указать основной класс в файле JAR.
#flink.jobs.default.class=yourpackage.App

Отправка задания

Вызовите метод load класса XMLConfigLoader для загрузки файла конфигурации XML и отправьте его на выполнение с помощью запускателя:

FlinkJobs flinkJobs = XMLConfigLoader.getInstance().load(ClassUtils.getDefaultClassLoader().getResourceAsStream("flink-jobs.xml"));
RestClusterClientFlinkJobsLauncher launcher = new RestClusterClientFlinkJobsLauncher();
FlinkJobsInfo flinkJobsInfo = launcher.launch(flinkJobs);
System.out.println("Flink job launched: " + JSON.toJSONString(flinkJobsInfo));// Запуск задания flink-jobs

или

FlinkJobs flinkJobs = XMLConfigLoader.getInstance()
    .load("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\r\n" + 
        "<flink-jobs xmlns=\"http://www.10mg.cn/schema/flink-jobs\"\r\n" + 
        "   xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\r\n" + 
        "   xsi:schemaLocation=\"http://www.10mg.cn/schema/flink-jobs http://www.10mg.cn/schema/flink-jobs.xsd\"\r\n" + 
        "   jar=\"/opt/flink-jobs/flink-jobs-quickstart-1.1.4.jar\" serviceName=\"HelloWorldService\">\r\n" + 
            "</flink-jobs>");
RestClusterClientFlinkJobsLauncher launcher = new RestClusterClientFlinkJobsLauncher();
System.out.println("Flink job launched: " + JSON.toJSONString(flinkJobsInfo));// Запуск задания flink-jobs

Остановка задания

System.out.println("Задание flink-jobs с идентификатором задания: " + flinkJobsInfo.getJobId() + " остановлено, путь сохранения: "
                + launcher.stop(flinkJobsInfo.getJobId()));// Остановка задания flink-jobs

Быстрый старт

Подробнее см. https://gitee.com/tenmg/flink-jobs-launcher-quickstart

Конфигурация задачи

<flink-jobs>

flink-jobs — это корневой узел XML-файла конфигурации задач flink-jobs. Обратите внимание, что необходимо правильно настроить пространство имён, обычно структура выглядит следующим образом:

<flink-jobs xmlns="http://www.10mg.cn/schema/flink-jobs"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.10mg.cn/schema/flink-jobs http://www.10mg.cn/schema/flink-jobs.xsd">
</flink-jobs>

Связанные атрибуты и описание:

Атрибут Тип Обязателен Описание
jar String Нет Исполняемый файл JAR. Можно настроить путь по умолчанию для исполняемого файла JAR с помощью конфигурации файла flink.jobs.default.jar.
class String Нет Основной класс. Можно настроить основной класс по умолчанию с помощью конфигурации файла flink.jobs.default.class.
serviceName String runtimeMode

<configuration>

Индивидуальная конфигурация заданий Apache Flink, формат: k1=v1[,k2=v3…]. Например: <configuration><![CDATA[pipeline.name=customJobName]]></configuration> означает, что пользовательское задание Apache Flink SQL называется customJobName. Подробные сведения о конфигурации см. в официальной документации Apache Flink.

<options>

Конфигурация параметров запуска, используемая для указания параметров запуска программы Apache Flink.

<option>

Определённая конфигурация параметра запуска. XSD-файл предоставляет список часто используемых значений ключей, которые могут быть автоматически предложены в среде IDE. При использовании CommandLineFlinkJobsLauncher поддерживаются не только эти параметры, но и любые другие параметры, поддерживаемые Apache Flink (см. документацию Apache Flink или запустите flink -h, чтобы получить помощь).

Свойства Тип Обязательный Описание
key String Да Параметр ключа.
value String Нет Значение параметра. Предоставляется непосредственно через опцию в виде текста, например <option>value</option> или <option><![CDATA[value]]></option>.

<params>

Таблица поиска параметров. Обычно используется в SQL, а также может использоваться в пользовательских сервисах приложения flink-jobs с помощью аргумента arguments.

<param>

Специфическая конфигурация параметров.

Свойства Тип Обязательный Описание
name String Да Имя параметра.
value String Нет Значение параметра.

<bsh>

Настройка для выполнения кода Java на основе Beanshell.

Свойства Тип Обязательный Описание
saveAs String Нет Сохраняет результат операции как новое имя переменной. Значение переменной — это возвращаемое значение кода Java на основе Beanshell (с использованием return xxx;).

<var>

Объявление переменных, используемых в коде Java на основе Beanshell.

Свойства Тип Обязательный Описание
name String Да Переменная, используемая в Beanshell.
value String Нет Значение переменной. По умолчанию совпадает с именем. flink-jobs ищет параметр с именем value и, если он существует и не равен null, использует его значение в качестве значения параметра; в противном случае используется значение value.

<java>

Код Java. Используется текстовое представление, например: <java>java code</java> или <option><![CDATA[java code]]></option>. Обратите внимание: при использовании дженериков нельзя использовать угловые скобки для объявления дженерика. Например, при использовании Map нельзя использовать «Map<String , String> map = new HashMap<String , String>();», но можно использовать «Map map = new HashMap();».

<execute-sql>

Выполнение кода SQL на основе DSL.

Свойства Тип Обязательный Описание
saveAs String Нет Сохраняет результат операции как новое имя переменной. Переменной присваивается возвращаемое значение tableEnv.executeSql(statement);.
dataSource String Нет Используемое имя источника данных. Здесь источник данных настроен в файле конфигурации приложения flink-jobs, а не в файле конфигурации flink-jobs-launcher. См. конфигурацию источника данных flink-jobs.
catalog String Нет Каталог, используемый для выполнения SQL-запросов Flink.
script String Нет Код SQL на основе DSL. Используется текстовое представление, такое как <execute-sql>SQL code</execute-sql> или <execute-sql><![CDATA[SQL code]]></execute-sql>. Поскольку Flink SQL не поддерживает DELETE и UPDATE, такие запросы выполняются в функции main программы с использованием JDBC.

<sql-query>

Выполнение SQL-запроса на основе DSL.

Свойства Тип Обязательный Описание
saveAs String Нет Сохранение результата запроса во временной таблице и сохранение результата операции как нового имени переменной. Переменной присваивается возвращаемое значение tableEnv.executeSql(statement);.
catalog String Нет Каталог, используемый для выполнения SQL-запросов Flink.
script String Нет Код SQL на основе DSL. Используется текстовое представление, такое как <sql-query>SQL code</sql-query> или <sql-query><![CDATA[SQL code]]></sql-query>.

<jdbc>

Выполнение JDBC SQL на основе DSL. Целевой код JDBC SQL выполняется в функции main приложения [flink-jobs].

Свойства Тип Обязательный Описание
saveAs String Нет Результат операции сохраняется как новое имя переменной. Переменной присваивается значение, возвращаемое методом executeLargeUpdate.
dataSource String Да Используемое имя источника данных. Здесь источник данных настроен в файле конфигурации приложения flink-jobs, а не в файле конфигурации flink-jobs-launcher. См. конфигурацию источника данных flink-jobs.
method String Нет Вызываемый метод JDBC. По умолчанию — executeLargeUpdate.
script String Да Код SQL на основе DSL.

<data-sync>

Запуск потоковой задачи на основе Flink SQL для синхронизации данных, поддерживается версия 1.1.2+. Подробные сведения об атрибутах и ​​описании приведены ниже:

Свойства Тип Обязательный Описание
saveAs String Нет Результат операции сохраняется как новое имя переменной. Переменной присваивается объект org.apache.flink.table.api.TableResult, возвращаемый оператором INSERT. Обычно не используется.
from String Да Тема: String
FromConfig: String Нет Источник конфигурации. Например: properties.group.id=flink-jobs.
To: String Да Имя целевого источника данных, в настоящее время поддерживается только JDBC.
ToConfig: String Да Целевая конфигурация. Например: sink.buffer-flush.max-rows = 0.
Table: String Да Синхронное имя таблицы данных.
PrimaryKey: String Нет Первичный ключ, несколько имён столбцов разделяются «,». При включении интеллектуального режима автоматически получает информацию о первичном ключе.
Timestamp: String Нет Имя столбца временной метки, несколько имён столбцов используются для разделения «,». После установки этого значения при создании исходной и целевой таблиц будут добавлены эти столбцы, а также они будут записаны в эти столбцы во время синхронизации данных. Обычно используется для настройки унифицированного указания в файле конфигурации приложения flink-jobs вместо указания каждого отдельного задания синхронизации.
Smart: Boolean Нет Включение интеллектуального режима. Если не установлено, то глобальная конфигурация определяет, следует ли включать интеллектуальный режим, глобальное значение по умолчанию — data.sync.smart=true.

<column>: Element | Нет | Столбец синхронизации данных. При включении интеллектуального режима информация о столбце будет получена автоматически.

Атрибут Тип Обязательный Описание
FromName String Да Исходное имя столбца.
FromType String Нет Исходный тип данных. Если отсутствует, то при включённом интеллектуальном режиме будет автоматически получен целевой тип данных в качестве исходного типа данных, если интеллектуальный режим выключен, то обязательно.
ToName String Нет Целевое имя столбца. По умолчанию совпадает с исходным именем столбца.
ToType String Нет Целевой тип данных столбца. Если отсутствует, то будет автоматически получен при включённом интеллектуальном режиме, если интеллектуальный режим выключен — по умолчанию равен исходному типу данных столбца.
Strategy String Нет Стратегия синхронизации. Возможные значения: both/from/to, оба означает создание исходного и целевого столбцов, from означает создание только исходного столбца, to означает создание только целевого столбца, по умолчанию используется значение both.
Script String Нет Пользовательский скрипт. Обычно используется при необходимости выполнения функции преобразования.

Пример конфигурации XML

Для лучшего понимания файла конфигурации flink-jobs ниже приведены примеры конфигурационных файлов XML для нескольких распространённых сценариев:

Запуск обычной программы Flink

<flink-jobs xmlns="http://www.10mg.cn/schema/flink-jobs"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.10mg.cn/schema/flink-jobs http://www.10mg.cn/schema/flink-jobs.xsd"
    jar="D:\Programs\flink-1.8.3\examples\batch\WordCount.jar">
</flink-jobs>

Запуск пользовательского сервиса

Ниже приведён пример файла конфигурации задачи XML для пользовательского сервиса:

<flink-jobs xmlns="http://www.10mg.cn/schema/flink-jobs"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.10mg.cn/schema/flink-jobs http://www.10mg.cn/schema/flink-jobs.xsd"
    jar="/yourPath/yourJar.jar" serviceName="yourServiceName">
</flink-jobs>

Выполнение пакетной обработки SQL

Ниже представлен простой файл конфигурации задачи XML для выполнения пакетной обработки заказанного количества статистических SQL-задач:

<flink-jobs xmlns="http://www.10mg.cn/schema/flink-jobs"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.10mg.cn/schema/flink-jobs http://www.10mg.cn/schema/flink-jobs.xsd"
    jar="/yourPath/yourJar.jar">
    <!--Параметры задачи, некоторые общие параметры также могут быть указаны перед вызовом Java API, например системное время и т. д. -->
    <params>
        <param name="beginDate">2021-01-01</param>
        <param name="endDate">2021-07-01</param>
    </params>

    <!-- Использование источника данных с именем hivedb для создания каталога с именем hive -->
    <execute-sql dataSource="hivedb">
        <![CDATA[
            create catalog hive
        ]]>
    </execute-sql>
    <!--Загрузка модуля hive -->
    <execute-sql>
        <![CDATA[
            load module hive
        ]]>
    </execute-sql>
    <!--Использование модулей hive,core -->
    <execute-sql>
        <![CDATA[
            use modules hive,core
        ]]>
    </execute-sql>
    <!-- Использовать источник данных с именем pgdb для создания таблицы order_stats_daily (если имя исходной таблицы и указанное имя таблицы построения не совпадают, вы можете указать WITH ('table-name' = 'actrual_table_name') для указания) -->
    <execute-sql dataSource="pgdb">
        <![CDATA[
            CREATE TABLE order_stats_daily (
              stats_date DATE,
              `count` BIGINT,
              PRIMARY KEY (stats_date) NOT ENFORCED
            ) WITH ('sink.buffer-flush.max-rows' = '0')
        ]]>
    </execute-sql>
    <!-- Используйте каталог hive для запроса и сохраните результаты как временную таблицу tmp в default_catalog по умолчанию -->
    <sql-query saveAs="tmp" catalog="hive">
        <![CDATA[
            select cast(to_date(o.business_date) as date) stats_date, count(*) `count` from odc_order_info_par o where o.business_date >= :beginDate and o.business_date < :endDate group by cast(to_date(o.business_date) as date)
        ]]>
    </sql-query>
    <!-- Удалить существующую статистику order_stats_daily данных (FLINK SQL не поддерживает DELETE, здесь выполняется JDBC)-->
    <execute-sql dataSource="pgdb">
        <![CDATA[
            delete from order_stats_daily where stats_date >= :beginDate and stats_date < :endDate
        ]]>
    </execute-sql>
    <!-- Вставка данных. На самом деле Flink в конечном итоге выполнит Upsert синтаксис -->
    <execute-sql>
        <![CDATA[
            INSERT INTO order_stats_daily(stats_date,`count`) SELECT stats_date, `count` FROM tmp
        ]]> **Запуск потоковой обработки SQL**

Ниже приведён XML-файл конфигурации задачи для синхронизации гетерогенных баз данных с помощью Debezium:

Конфигурация Flink-Jobs

#### Запуск задачи синхронизации данных

Ниже представлен файл конфигурации XML для выполнения задачи синхронизации разнородных баз данных с помощью Debezium:

```xml version="1.0" encoding="UTF-8"?>
<flink-jobs xmlns="http://www.10mg.cn/schema/flink-jobs"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.10mg.cn/schema/flink-jobs http://www.10mg.cn/schema/flink-jobs-1.1.2.xsd">
    <data-sync table="od_order_info" to="data_skyline"
        from="kafka" topic="testdb.testdb.od_order_info">
        <!-- В случае, когда структура таблиц в источнике и целевой базе данных одинакова (имена и типы полей совпадают), интеллектуальный режим позволяет автоматически получать метаданные таблицы из целевой базы данных, что упрощает процесс синхронизации данных. -->
        <!-- Если структура таблиц в источнике и целевой базе данных отличается (например, различаются имена или типы полей), необходимо настроить информацию о различиях столбцов вручную, например, указать исходный тип и функцию преобразования: -->
        <column fromName="UPDATE_TIME" fromType="BIGINT">TO_TIMESTAMP(FROM_UNIXTIME(UPDATE_TIME/1000, 'yyyy-MM-dd HH:mm:ss'))</column>
        <!-- Кроме того, если интеллектуальный режим отключен, необходимо предоставить подробную информацию обо всех столбцах. -->
    </data-sync>
</flink-jobs>

Конфигурационный файл

По умолчанию конфигурационный файл называется flink-jobs-launcher.properties (обратите внимание: он должен находиться в каталоге classpath), но можно изменить путь и имя файла конфигурации с помощью файла flink-jobs-launcher-context-loader.properties.

Общие настройки

Свойство Тип Обязательное Описание
flink.jobs.default.jar String Нет JAR-пакет, который по умолчанию отправляется при запуске Flink. Используется, если в конфигурации задачи не указано свойство jar.
flink.jobs.default.class String Нет Основной класс, который по умолчанию запускается при отправке задания в Flink. Используется, если в конфигурации задачи не указано свойство class.

RestClusterClientFlinkJobsLauncher

Свойство Тип Обязательное Описание
jobmanager.rpc.servers String Нет Адрес удалённого вызова Flink кластера, формат: host1:port1,host2:port2,…, где порт может быть опущен, в этом случае все порты будут равны значению jobmanager.rpc.port.
jobmanager.rpc.address String Нет Удалённый адрес вызова Flink кластера. Рекомендуется использовать только один хост, использование устарело. Настройка jobmanager.rpc.servers делает эту настройку недействительной.
jobmanager.rpc.port int Да Порт удалённого вызова Flink кластера по умолчанию равен 6123.
jobmanager.* Другие настройки Flink кластера см. на официальном сайте Flink.
rest.* Другие настройки Flink кластера см. на официальном сайте Flink.

CommandLineFlinkJobsLauncher

Свойство Тип Обязательное Описание
commandline.flink.home String Нет Если доступ к командной строке Flink не требует добавления каталога, настройка не требуется.
commandline.launch.action String Нет По умолчанию равно run.
commandline.launch.temp_file_prefix String Нет Префикс временных файлов, созданных во время запуска, по умолчанию flink-jobs_.
commandline.yarn.rest String Нет YARN REST адрес и порт, необходимые для запуска Flink On YARN.
commandline.yarn.application_check_attempts String Нет Значение по умолчанию — 60. Определяет количество попыток проверки Application ID через автоматически сгенерированное уникальное имя приложения при работе с Flink On YARN.
commandline.yarn.time_millis_between_application_check_attempts String Нет Значение по умолчанию — 3000. Определяет интервал ожидания между двумя попытками проверки Application ID при работе с Flink On YARN через автоматически сгенерированное уникальное имя приложения.
commandline.yarn.application_id_prefix String Нет Значение по умолчанию — application_. Определяет префикс автоматически сгенерированного уникального имени приложения при работе с Flink On YARN.

Участие в проекте

  1. Fork этого репозитория.
  2. Создайте ветку Feat_xxx.
  3. Отправьте код.
  4. Создайте Pull Request.

Ссылки

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

Библиотека класса запуска приложений flink-jobs: игра с Flink. Развернуть Свернуть
Apache-2.0
Отмена

Обновления

Пока нет обновлений

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://api.gitlife.ru/oschina-mirror/tenmg-flink-jobs-launcher.git
git@api.gitlife.ru:oschina-mirror/tenmg-flink-jobs-launcher.git
oschina-mirror
tenmg-flink-jobs-launcher
tenmg-flink-jobs-launcher
master