Если вы специалист по данным или инженер данных, то вам, возможно, знакомы следующие проблемы при работе над проектом ETL:
SETL (произносится как «сетл») — это фреймворк на Scala, основанный на Apache Spark, который помогает структурировать проекты Spark ETL, модульно организовать логику преобразования данных и ускорить разработку.
Вы можете начать работу, клонировав этот шаблон проекта.
<dependency>
<groupId>com.jcdecaux.setl</groupId>
<artifactId>setl_2.12</artifactId>
<version>1.0.0-RC1</version>
</dependency>
Чтобы использовать версию SNAPSHOT, добавьте репозиторий моментальных снимков Sonatype в свой pom.xml
:
<repositories>
<repository>
<id>ossrh-snapshots</id>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>com.jcdecaux.setl</groupId>
<artifactId>setl_2.12</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
С помощью SETL приложение ETL можно представить в виде Pipeline
. Pipeline
содержит несколько Stages
. На каждом этапе можно найти одну или несколько Factories
.
Класс Factory[T]
— это абстракция преобразования данных, которое будет создавать объект типа T
. Он имеет 4 метода (read, process, write и get), которые должны быть реализованы разработчиком.
Класс SparkRepository[T]
— абстракция уровня доступа к данным. Его можно использовать для чтения/записи Dataset[T]
из/в хранилище данных. Он должен быть определён в файле конфигурации. Вы можете иметь столько SparkRepositories, сколько захотите.
Точка входа в проект SETL — объект com.jcdecaux.setl.Setl
, который будет обрабатывать создание экземпляра конвейера и репозитория Spark.
Пример создания и сохранения Dataset[TestObject]
можно найти в стартовом шаблоне SETL. Клонируйте его :)
Здесь мы показываем простой пример создания и сохранения набора данных [TestObject]. Класс case TestObject определяется следующим образом:
case class TestObject(partition1: Int, partition2: String, clustering1: String, value: Long)
Предположим, что мы хотим сохранить наш вывод в src/main/resources/test_csv
. Мы можем создать файл конфигурации local.conf в src/main/resources
со следующим содержимым, определяющим целевой магазин данных для сохранения нашего набора данных:
testObjectRepository {
storage = "CSV"
path = "src/main/resources/test_csv"
inferSchema = "true"
delimiter = ";"
header = "true"
saveMode = "Append"
}
В нашем файле App.scala
мы создаём Setl
и регистрируем этот магазин данных:
val setl: Setl = Setl.builder()
.withDefaultConfigLoader()
.getOrCreate()
// Регистрируем SparkRepository в контексте
setl.setSparkRepository[TestObject]("testObjectRepository")
Мы создадим наш Dataset[TestObject]
внутри Factory[Dataset[TestObject]]
. Factory[A]
всегда будет создавать объект типа A
, и он содержит 4 абстрактных метода. Методы, которые необходимо реализовать:
class MyFactory() extends Factory[Dataset[TestObject]] with HasSparkSession {
import spark.implicits._
// A repository is needed for writing data. It will be delivered by the pipeline
@Delivery
private[this] val repo = SparkRepository[TestObject]
private[this] var output = spark.emptyDataset[TestObject]
override def read(): MyFactory.this.type = {
// in our demo we don't need to read any data
this
}
override def process(): MyFactory.this.type = {
output = Seq(
TestObject(1, "a", "A", 1L),
TestObject(2, "b", "B", 2L)
).toDS()
this
}
override def write(): MyFactory.this.type = {
repo.save(output) // use the repository to save the output
this
}
override def get(): Dataset[TestObject] = output
}
Определите конвейер.
Чтобы выполнить фабрику, мы должны добавить её в конвейер.
Когда мы вызываем setl.newPipeline()
, Setl создаст новый Pipeline и настроит все зарегистрированные репозитории как входы конвейера. Затем мы можем вызвать addStage
, чтобы добавить нашу фабрику в конвейер.
val pipeline = setl
.newPipeline()
.addStage[MyFactory]()
Запустите наш конвейер.
pipeline.describe().run()
Набор данных будет сохранён в src/main/resources/test_csv
.
Что ещё?
Поскольку наша MyFactory
создаёт Dataset[TestObject]
, она может быть использована другими фабриками того же конвейера.
class AnotherFactory extends Factory[String] with HasSparkSession {
import spark.implicits._
@Delivery
private[this] val outputOfMyFactory = spark.emptyDataset[TestObject]
override def read(): AnotherFactory.this.type = this
override def process(): AnotherFactory.this.type = this
override def write(): AnotherFactory.this.type = {
outputOfMyFactory.show()
this
}
override def get(): String = "output"
}
Добавьте эту фабрику в конвейер:
pipeline.addStage[AnotherFactory]()
Вы можете реализовать собственный источник данных, реализовав интерфейс ConnectorInterface
.
class CustomConnector extends ConnectorInterface with CanDrop {
override def setConf(conf: Conf): Unit = null
override def read(): DataFrame = {
import spark.implicits._
Seq(1, 2, 3).toDF("id")
}
override def write(t: DataFrame, suffix: Option[String]): Unit = logDebug("Write with suffix")
override def write(t: DataFrame): Unit = logDebug("Write")
/**
* Drop the entire table.
*/
override def drop(): Unit = logDebug("drop")
}
Чтобы использовать его, просто установите хранилище на OTHER и предоставьте ссылку на класс вашего коннектора:
myConnector {
storage = "OTHER"
class = "com.example.CustomConnector" // ссылка на класс вашего коннектора
}
Вы можете создать диаграмму Mermaid с помощью:
pipeline.showDiagram()
У вас будет некоторый журнал, подобный этому:
--------- MERMAID DIAGRAM ---------
classDiagram
class MyFactory {
<<Factory[Dataset[TestObject]]>>
+SparkRepository[TestObject]
}
class DatasetTestObject {
<<Dataset[TestObject]>>
>partition1: Int
>partition2: String
>clustering1: String
>value: Long
}
DatasetTestObject <|.. MyFactory : Output
class AnotherFactory {
<<Factory[String]>>
+Dataset[TestObject]
}
class StringFinal {
<<String>>
}
StringFinal <|.. AnotherFactory : Output
class SparkRepositoryTestObjectExternal {
<<SparkRepository[TestObject]>>
}
AnotherFactory <|-- DatasetTestObject : Input
MyFactory <|-- SparkRepositoryTestObjectExternal : Input
------- END OF MERMAID CODE -------
Вы можете скопировать предыдущий код в средство просмотра Markdown, которое поддерживает Mermaid.
Или вы можете попробовать онлайн-редактор: **Конфигурация приложения**
Система конфигурации SETL позволяет пользователям выполнять своё Spark-приложение в различных средах исполнения, используя специфичные для среды конфигурации.
В каталоге `src/main/resources` у вас должно быть как минимум два файла конфигурации с именами `application.conf` и `local.conf`. Эти файлы необходимы, если вы хотите запустить приложение только в одной среде.
Вы также можете создать другие конфигурации (например, `dev.conf` и `prod.conf`), в которых можно определить параметры, специфичные для конкретной среды.
* *application.conf* — этот файл конфигурации должен содержать универсальные конфигурации, которые можно использовать независимо от среды выполнения.
* *env.conf (например, local.conf, dev.conf)* — эти файлы должны содержать параметры, специфичные для определённой среды. По умолчанию будет использоваться `local.conf`.
**Как использовать конфигурацию**
Представьте случай, когда у нас есть две среды: локальная среда разработки и удалённая производственная среда. Нашему приложению нужен репозиторий для сохранения и загрузки данных. В этом случае давайте подготовим `application.conf`, `local.conf`, `prod.conf` и `storage.conf`:
```hocon
# application.conf
setl.environment = ${app.environment}
setl.config {
spark.app.name = "my_application"
# и другие общие конфигурации Spark
}
# local.conf
include "application.conf"
setl.config {
spark.default.parallelism = "200"
spark.sql.shuffle.partitions = "200"
# и другие локальные конфигурации Spark
}
app.root.dir = "/some/local/path"
include "storage.conf"
# prod.conf
setl.config {
spark.default.parallelism = "1000"
spark.sql.shuffle.partitions = "1000"
# и другие производственные конфигурации Spark
}
app.root.dir = "/some/remote/path"
include "storage.conf"
# storage.conf
myRepository {
storage = "CSV"
path = ${app.root.dir} // этот путь будет зависеть от выполнения
``` **Окружение**
inferSchema = "true"
delimiter = ";"
header = "true"
saveMode = "Append"
}
**Чтобы скомпилировать с локальной конфигурацией, с maven, просто запустите:**
```shell
mvn compile
Чтобы скомпилировать с производственной конфигурацией, передайте свойство jvm app.environment
.
mvn compile -Dapp.environment=prod
Убедитесь, что в вашем каталоге ресурсов включено фильтрование:
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
В настоящее время SETL поддерживает следующие источники данных. Вам не нужно предоставлять эти библиотеки в своём проекте (кроме драйвера JDBC):
— Все форматы файлов, поддерживаемые Apache Spark (csv, json, parquet и т. д.); — Delta; — Excel (crealytics/spark-excel); — Cassandra (datastax/spark-cassandra-connector); — DynamoDB (audienceproject/spark-dynamodb); — JDBC (вы должны предоставить драйвер jdbc).
Чтобы читать/записывать данные из/в AWS S3 (или другие службы хранения), вы должны включить соответствующую библиотеку hadoop в свой проект.
Например:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>2.9.2</version>
</dependency>
Вы также должны указать Scala и Spark в файле pom. SETL протестирован на следующих версиях Spark:
Версия Spark | Версия Scala | Примечание |
---|---|---|
3.0 | 2.12 |
![]() |
2.4 | 2.12 |
![]() |
2.4 | 2.11 |
![]() |
2.3 | 2.11 |
![]() |
При использовании setl_2.11-1.x.x
со Spark 2.4 и Scala 2.11 вам может потребоваться вручную включить следующие зависимости, чтобы переопределить версию по умолчанию:
<dependency>
<groupId>com.audienceproject</groupId>
<artifactId>spark-dynamodb_2.11</artifactId>
<version>1.0.4</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_2.11</artifactId>
<version>0.6.1</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.11</artifactId>
<version>2.5.1</version>
</dependency>
https://setl-framework.github.io/setl/
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )