flink-jobs-launcher
Важное предупреждение: в проекте flink-jobs уже предоставлен flink-jobs-clients в качестве замены этого проекта, настоятельно рекомендуется пользователям перейти на использование, этот проект не будет обновляться после версии flink-jobs 1.3+.
flink-jobs-launcher — это библиотека запуска приложений для проектов flink-jobs, которая может использоваться для запуска flink-jobs или обычных заданий Flink. С помощью flink-jobs-launcher можно быстро интегрировать Flink в существующие системы, основанные на Java, а также управлять Flink SQL через файлы конфигурации XML. Типичная архитектура развертывания flink-jobs-launcher выглядит следующим образом:
1.1.3 и более ранние версии поддерживают только CommandLineFlinkJobsLauncher, начиная с версии 1.1.4 рекомендуется использовать RestClusterClientFlinkJobsLauncher. Ниже приведён пример использования RestClusterClientFlinkJobsLauncher в Maven-проекте.
<!-- https://mvnrepository.com/artifact/cn.tenmg/flink-jobs-launcher -->
<dependency>
<groupId>cn.tenmg</groupId>
<artifactId>flink-jobs-launcher</artifactId>
<version>${flink-jobs-launcher.version}</version>
</dependency>
# RPC configuration
jobmanager.rpc.address= 192.168.100.27
jobmanager.rpc.port=6123
# Рекомендуемый, но необязательный путь к файлу JAR, который flink-jobs-launcher отправляет на выполнение.
flink.jobs.default.jar=/yourpath/your-flink-jobs-app-1.0.0.jar
# Необязательный класс, который flink-jobs-launcher отправляет для выполнения. Вы также можете указать основной класс в файле JAR.
#flink.jobs.default.class=yourpackage.App
Вызовите метод load класса XMLConfigLoader для загрузки файла конфигурации XML и отправьте его на выполнение с помощью запускателя:
FlinkJobs flinkJobs = XMLConfigLoader.getInstance().load(ClassUtils.getDefaultClassLoader().getResourceAsStream("flink-jobs.xml"));
RestClusterClientFlinkJobsLauncher launcher = new RestClusterClientFlinkJobsLauncher();
FlinkJobsInfo flinkJobsInfo = launcher.launch(flinkJobs);
System.out.println("Flink job launched: " + JSON.toJSONString(flinkJobsInfo));// Запуск задания flink-jobs
или
FlinkJobs flinkJobs = XMLConfigLoader.getInstance()
.load("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\r\n" +
"<flink-jobs xmlns=\"http://www.10mg.cn/schema/flink-jobs\"\r\n" +
" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\r\n" +
" xsi:schemaLocation=\"http://www.10mg.cn/schema/flink-jobs http://www.10mg.cn/schema/flink-jobs.xsd\"\r\n" +
" jar=\"/opt/flink-jobs/flink-jobs-quickstart-1.1.4.jar\" serviceName=\"HelloWorldService\">\r\n" +
"</flink-jobs>");
RestClusterClientFlinkJobsLauncher launcher = new RestClusterClientFlinkJobsLauncher();
System.out.println("Flink job launched: " + JSON.toJSONString(flinkJobsInfo));// Запуск задания flink-jobs
System.out.println("Задание flink-jobs с идентификатором задания: " + flinkJobsInfo.getJobId() + " остановлено, путь сохранения: "
+ launcher.stop(flinkJobsInfo.getJobId()));// Остановка задания flink-jobs
Подробнее см. https://gitee.com/tenmg/flink-jobs-launcher-quickstart
<flink-jobs>
flink-jobs — это корневой узел XML-файла конфигурации задач flink-jobs. Обратите внимание, что необходимо правильно настроить пространство имён, обычно структура выглядит следующим образом:
<flink-jobs xmlns="http://www.10mg.cn/schema/flink-jobs"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.10mg.cn/schema/flink-jobs http://www.10mg.cn/schema/flink-jobs.xsd">
</flink-jobs>
Связанные атрибуты и описание:
Атрибут | Тип | Обязателен | Описание |
---|---|---|---|
jar | String | Нет | Исполняемый файл JAR. Можно настроить путь по умолчанию для исполняемого файла JAR с помощью конфигурации файла flink.jobs.default.jar . |
class | String | Нет | Основной класс. Можно настроить основной класс по умолчанию с помощью конфигурации файла flink.jobs.default.class . |
serviceName | String | runtimeMode |
<configuration>
Индивидуальная конфигурация заданий Apache Flink, формат: k1=v1[,k2=v3…]
. Например: <configuration><![CDATA[pipeline.name=customJobName]]></configuration>
означает, что пользовательское задание Apache Flink SQL называется customJobName. Подробные сведения о конфигурации см. в официальной документации Apache Flink.
<options>
Конфигурация параметров запуска, используемая для указания параметров запуска программы Apache Flink.
<option>
Определённая конфигурация параметра запуска. XSD-файл предоставляет список часто используемых значений ключей, которые могут быть автоматически предложены в среде IDE. При использовании CommandLineFlinkJobsLauncher поддерживаются не только эти параметры, но и любые другие параметры, поддерживаемые Apache Flink (см. документацию Apache Flink или запустите flink -h
, чтобы получить помощь).
Свойства | Тип | Обязательный | Описание |
---|---|---|---|
key | String |
Да | Параметр ключа. |
value | String |
Нет | Значение параметра. Предоставляется непосредственно через опцию в виде текста, например <option>value</option> или <option><![CDATA[value]]></option> . |
<params>
Таблица поиска параметров. Обычно используется в SQL, а также может использоваться в пользовательских сервисах приложения flink-jobs с помощью аргумента arguments.
<param>
Специфическая конфигурация параметров.
Свойства | Тип | Обязательный | Описание |
---|---|---|---|
name | String |
Да | Имя параметра. |
value | String |
Нет | Значение параметра. |
<bsh>
Настройка для выполнения кода Java на основе Beanshell.
Свойства | Тип | Обязательный | Описание |
---|---|---|---|
saveAs | String |
Нет | Сохраняет результат операции как новое имя переменной. Значение переменной — это возвращаемое значение кода Java на основе Beanshell (с использованием return xxx; ). |
<var>
Объявление переменных, используемых в коде Java на основе Beanshell.
Свойства | Тип | Обязательный | Описание |
---|---|---|---|
name | String |
Да | Переменная, используемая в Beanshell. |
value | String |
Нет | Значение переменной. По умолчанию совпадает с именем. flink-jobs ищет параметр с именем value и, если он существует и не равен null, использует его значение в качестве значения параметра; в противном случае используется значение value. |
<java>
Код Java. Используется текстовое представление, например: <java>java code</java>
или <option><![CDATA[java code]]></option>
. Обратите внимание: при использовании дженериков нельзя использовать угловые скобки для объявления дженерика. Например, при использовании Map нельзя использовать «Map<String , String> map = new HashMap<String , String>();», но можно использовать «Map map = new HashMap();».
<execute-sql>
Выполнение кода SQL на основе DSL.
Свойства | Тип | Обязательный | Описание |
---|---|---|---|
saveAs | String |
Нет | Сохраняет результат операции как новое имя переменной. Переменной присваивается возвращаемое значение tableEnv.executeSql(statement);. |
dataSource | String |
Нет | Используемое имя источника данных. Здесь источник данных настроен в файле конфигурации приложения flink-jobs, а не в файле конфигурации flink-jobs-launcher. См. конфигурацию источника данных flink-jobs. |
catalog | String |
Нет | Каталог, используемый для выполнения SQL-запросов Flink. |
script | String |
Нет | Код SQL на основе DSL. Используется текстовое представление, такое как <execute-sql>SQL code</execute-sql> или <execute-sql><![CDATA[SQL code]]></execute-sql> . Поскольку Flink SQL не поддерживает DELETE и UPDATE, такие запросы выполняются в функции main программы с использованием JDBC. |
<sql-query>
Выполнение SQL-запроса на основе DSL.
Свойства | Тип | Обязательный | Описание |
---|---|---|---|
saveAs | String |
Нет | Сохранение результата запроса во временной таблице и сохранение результата операции как нового имени переменной. Переменной присваивается возвращаемое значение tableEnv.executeSql(statement);. |
catalog | String |
Нет | Каталог, используемый для выполнения SQL-запросов Flink. |
script | String |
Нет | Код SQL на основе DSL. Используется текстовое представление, такое как <sql-query>SQL code</sql-query> или <sql-query><![CDATA[SQL code]]></sql-query> . |
<jdbc>
Выполнение JDBC SQL на основе DSL. Целевой код JDBC SQL выполняется в функции main приложения [flink-jobs].
Свойства | Тип | Обязательный | Описание |
---|---|---|---|
saveAs | String |
Нет | Результат операции сохраняется как новое имя переменной. Переменной присваивается значение, возвращаемое методом executeLargeUpdate. |
dataSource | String |
Да | Используемое имя источника данных. Здесь источник данных настроен в файле конфигурации приложения flink-jobs, а не в файле конфигурации flink-jobs-launcher. См. конфигурацию источника данных flink-jobs. |
method | String |
Нет | Вызываемый метод JDBC. По умолчанию — executeLargeUpdate. |
script | String |
Да | Код SQL на основе DSL. |
<data-sync>
Запуск потоковой задачи на основе Flink SQL для синхронизации данных, поддерживается версия 1.1.2+. Подробные сведения об атрибутах и описании приведены ниже:
Свойства | Тип | Обязательный | Описание |
---|---|---|---|
saveAs | String |
Нет | Результат операции сохраняется как новое имя переменной. Переменной присваивается объект org.apache.flink.table.api.TableResult, возвращаемый оператором INSERT. Обычно не используется. |
from | String |
Да |
Тема: String
|
FromConfig: String
|
Нет | Источник конфигурации. Например: properties.group.id=flink-jobs . |
|
To: String
|
Да | Имя целевого источника данных, в настоящее время поддерживается только JDBC. | |
ToConfig: String
|
Да | Целевая конфигурация. Например: sink.buffer-flush.max-rows = 0 . |
|
Table: String
|
Да | Синхронное имя таблицы данных. | |
PrimaryKey: String
|
Нет | Первичный ключ, несколько имён столбцов разделяются «,». При включении интеллектуального режима автоматически получает информацию о первичном ключе. | |
Timestamp: String
|
Нет | Имя столбца временной метки, несколько имён столбцов используются для разделения «,». После установки этого значения при создании исходной и целевой таблиц будут добавлены эти столбцы, а также они будут записаны в эти столбцы во время синхронизации данных. Обычно используется для настройки унифицированного указания в файле конфигурации приложения flink-jobs вместо указания каждого отдельного задания синхронизации. | |
Smart: Boolean
|
Нет | Включение интеллектуального режима. Если не установлено, то глобальная конфигурация определяет, следует ли включать интеллектуальный режим, глобальное значение по умолчанию — data.sync.smart=true . |
<column>
: Element
| Нет | Столбец синхронизации данных. При включении интеллектуального режима информация о столбце будет получена автоматически.
Атрибут | Тип | Обязательный | Описание |
---|---|---|---|
FromName | String |
Да | Исходное имя столбца. |
FromType | String |
Нет | Исходный тип данных. Если отсутствует, то при включённом интеллектуальном режиме будет автоматически получен целевой тип данных в качестве исходного типа данных, если интеллектуальный режим выключен, то обязательно. |
ToName | String |
Нет | Целевое имя столбца. По умолчанию совпадает с исходным именем столбца. |
ToType | String |
Нет | Целевой тип данных столбца. Если отсутствует, то будет автоматически получен при включённом интеллектуальном режиме, если интеллектуальный режим выключен — по умолчанию равен исходному типу данных столбца. |
Strategy | String |
Нет | Стратегия синхронизации. Возможные значения: both/from/to, оба означает создание исходного и целевого столбцов, from означает создание только исходного столбца, to означает создание только целевого столбца, по умолчанию используется значение both. |
Script | String |
Нет | Пользовательский скрипт. Обычно используется при необходимости выполнения функции преобразования. |
Для лучшего понимания файла конфигурации flink-jobs ниже приведены примеры конфигурационных файлов XML для нескольких распространённых сценариев:
<flink-jobs xmlns="http://www.10mg.cn/schema/flink-jobs"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.10mg.cn/schema/flink-jobs http://www.10mg.cn/schema/flink-jobs.xsd"
jar="D:\Programs\flink-1.8.3\examples\batch\WordCount.jar">
</flink-jobs>
Ниже приведён пример файла конфигурации задачи XML для пользовательского сервиса:
<flink-jobs xmlns="http://www.10mg.cn/schema/flink-jobs"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.10mg.cn/schema/flink-jobs http://www.10mg.cn/schema/flink-jobs.xsd"
jar="/yourPath/yourJar.jar" serviceName="yourServiceName">
</flink-jobs>
Ниже представлен простой файл конфигурации задачи XML для выполнения пакетной обработки заказанного количества статистических SQL-задач:
<flink-jobs xmlns="http://www.10mg.cn/schema/flink-jobs"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.10mg.cn/schema/flink-jobs http://www.10mg.cn/schema/flink-jobs.xsd"
jar="/yourPath/yourJar.jar">
<!--Параметры задачи, некоторые общие параметры также могут быть указаны перед вызовом Java API, например системное время и т. д. -->
<params>
<param name="beginDate">2021-01-01</param>
<param name="endDate">2021-07-01</param>
</params>
<!-- Использование источника данных с именем hivedb для создания каталога с именем hive -->
<execute-sql dataSource="hivedb">
<![CDATA[
create catalog hive
]]>
</execute-sql>
<!--Загрузка модуля hive -->
<execute-sql>
<![CDATA[
load module hive
]]>
</execute-sql>
<!--Использование модулей hive,core -->
<execute-sql>
<![CDATA[
use modules hive,core
]]>
</execute-sql>
<!-- Использовать источник данных с именем pgdb для создания таблицы order_stats_daily (если имя исходной таблицы и указанное имя таблицы построения не совпадают, вы можете указать WITH ('table-name' = 'actrual_table_name') для указания) -->
<execute-sql dataSource="pgdb">
<![CDATA[
CREATE TABLE order_stats_daily (
stats_date DATE,
`count` BIGINT,
PRIMARY KEY (stats_date) NOT ENFORCED
) WITH ('sink.buffer-flush.max-rows' = '0')
]]>
</execute-sql>
<!-- Используйте каталог hive для запроса и сохраните результаты как временную таблицу tmp в default_catalog по умолчанию -->
<sql-query saveAs="tmp" catalog="hive">
<![CDATA[
select cast(to_date(o.business_date) as date) stats_date, count(*) `count` from odc_order_info_par o where o.business_date >= :beginDate and o.business_date < :endDate group by cast(to_date(o.business_date) as date)
]]>
</sql-query>
<!-- Удалить существующую статистику order_stats_daily данных (FLINK SQL не поддерживает DELETE, здесь выполняется JDBC)-->
<execute-sql dataSource="pgdb">
<![CDATA[
delete from order_stats_daily where stats_date >= :beginDate and stats_date < :endDate
]]>
</execute-sql>
<!-- Вставка данных. На самом деле Flink в конечном итоге выполнит Upsert синтаксис -->
<execute-sql>
<![CDATA[
INSERT INTO order_stats_daily(stats_date,`count`) SELECT stats_date, `count` FROM tmp
]]> **Запуск потоковой обработки SQL**
Ниже приведён XML-файл конфигурации задачи для синхронизации гетерогенных баз данных с помощью Debezium:
Конфигурация Flink-Jobs
#### Запуск задачи синхронизации данных
Ниже представлен файл конфигурации XML для выполнения задачи синхронизации разнородных баз данных с помощью Debezium:
```xml version="1.0" encoding="UTF-8"?>
<flink-jobs xmlns="http://www.10mg.cn/schema/flink-jobs"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.10mg.cn/schema/flink-jobs http://www.10mg.cn/schema/flink-jobs-1.1.2.xsd">
<data-sync table="od_order_info" to="data_skyline"
from="kafka" topic="testdb.testdb.od_order_info">
<!-- В случае, когда структура таблиц в источнике и целевой базе данных одинакова (имена и типы полей совпадают), интеллектуальный режим позволяет автоматически получать метаданные таблицы из целевой базы данных, что упрощает процесс синхронизации данных. -->
<!-- Если структура таблиц в источнике и целевой базе данных отличается (например, различаются имена или типы полей), необходимо настроить информацию о различиях столбцов вручную, например, указать исходный тип и функцию преобразования: -->
<column fromName="UPDATE_TIME" fromType="BIGINT">TO_TIMESTAMP(FROM_UNIXTIME(UPDATE_TIME/1000, 'yyyy-MM-dd HH:mm:ss'))</column>
<!-- Кроме того, если интеллектуальный режим отключен, необходимо предоставить подробную информацию обо всех столбцах. -->
</data-sync>
</flink-jobs>
По умолчанию конфигурационный файл называется flink-jobs-launcher.properties
(обратите внимание: он должен находиться в каталоге classpath
), но можно изменить путь и имя файла конфигурации с помощью файла flink-jobs-launcher-context-loader.properties
.
Свойство | Тип | Обязательное | Описание |
---|---|---|---|
flink.jobs.default.jar | String | Нет | JAR-пакет, который по умолчанию отправляется при запуске Flink. Используется, если в конфигурации задачи не указано свойство jar . |
flink.jobs.default.class | String | Нет | Основной класс, который по умолчанию запускается при отправке задания в Flink. Используется, если в конфигурации задачи не указано свойство class . |
Свойство | Тип | Обязательное | Описание |
---|---|---|---|
jobmanager.rpc.servers | String | Нет | Адрес удалённого вызова Flink кластера, формат: host1:port1,host2:port2,… , где порт может быть опущен, в этом случае все порты будут равны значению jobmanager.rpc.port . |
jobmanager.rpc.address | String | Нет | Удалённый адрес вызова Flink кластера. Рекомендуется использовать только один хост, использование устарело. Настройка jobmanager.rpc.servers делает эту настройку недействительной. |
jobmanager.rpc.port | int | Да | Порт удалённого вызова Flink кластера по умолчанию равен 6123. |
jobmanager.* | — | — | Другие настройки Flink кластера см. на официальном сайте Flink. |
rest.* | — | — | Другие настройки Flink кластера см. на официальном сайте Flink. |
Свойство | Тип | Обязательное | Описание |
---|---|---|---|
commandline.flink.home | String | Нет | Если доступ к командной строке Flink не требует добавления каталога, настройка не требуется. |
commandline.launch.action | String | Нет | По умолчанию равно run . |
commandline.launch.temp_file_prefix | String | Нет | Префикс временных файлов, созданных во время запуска, по умолчанию flink-jobs_ . |
commandline.yarn.rest | String | Нет | YARN REST адрес и порт, необходимые для запуска Flink On YARN. |
commandline.yarn.application_check_attempts | String | Нет | Значение по умолчанию — 60. Определяет количество попыток проверки Application ID через автоматически сгенерированное уникальное имя приложения при работе с Flink On YARN. |
commandline.yarn.time_millis_between_application_check_attempts | String | Нет | Значение по умолчанию — 3000. Определяет интервал ожидания между двумя попытками проверки Application ID при работе с Flink On YARN через автоматически сгенерированное уникальное имя приложения. |
commandline.yarn.application_id_prefix | String | Нет | Значение по умолчанию — application_ . Определяет префикс автоматически сгенерированного уникального имени приложения при работе с Flink On YARN. |
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )