1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/qxzzxq-setl

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
README.md

Если вы специалист по данным или инженер данных, то вам, возможно, знакомы следующие проблемы при работе над проектом ETL:

  • Переключение между несколькими проектами — это хлопотно.
  • Отладка чужого кода — кошмар.
  • Трата большого количества времени на решение проблем, не связанных с бизнесом.

SETL (произносится как «сетл») — это фреймворк на Scala, основанный на Apache Spark, который помогает структурировать проекты Spark ETL, модульно организовать логику преобразования данных и ускорить разработку.

Использование SETL

В новом проекте

Вы можете начать работу, клонировав этот шаблон проекта.

В существующем проекте

<dependency>
  <groupId>com.jcdecaux.setl</groupId>
  <artifactId>setl_2.12</artifactId>
  <version>1.0.0-RC1</version>
</dependency>

Чтобы использовать версию SNAPSHOT, добавьте репозиторий моментальных снимков Sonatype в свой pom.xml:

<repositories>
  <repository>
    <id>ossrh-snapshots</id>
    <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>com.jcdecaux.setl</groupId>
    <artifactId>setl_2.12</artifactId>
    <version>1.0.0-SNAPSHOT</version>
  </dependency>
</dependencies>

Быстрый старт

Основная концепция

С помощью SETL приложение ETL можно представить в виде Pipeline. Pipeline содержит несколько Stages. На каждом этапе можно найти одну или несколько Factories.

Класс Factory[T] — это абстракция преобразования данных, которое будет создавать объект типа T. Он имеет 4 метода (read, process, write и get), которые должны быть реализованы разработчиком.

Класс SparkRepository[T] — абстракция уровня доступа к данным. Его можно использовать для чтения/записи Dataset[T] из/в хранилище данных. Он должен быть определён в файле конфигурации. Вы можете иметь столько SparkRepositories, сколько захотите.

Точка входа в проект SETL — объект com.jcdecaux.setl.Setl, который будет обрабатывать создание экземпляра конвейера и репозитория Spark.

Пример кода

Пример создания и сохранения Dataset[TestObject] можно найти в стартовом шаблоне SETL. Клонируйте его :)

Здесь мы показываем простой пример создания и сохранения набора данных [TestObject]. Класс case TestObject определяется следующим образом:

case class TestObject(partition1: Int, partition2: String, clustering1: String, value: Long)

Инициализация контекста

Предположим, что мы хотим сохранить наш вывод в src/main/resources/test_csv. Мы можем создать файл конфигурации local.conf в src/main/resources со следующим содержимым, определяющим целевой магазин данных для сохранения нашего набора данных:

testObjectRepository {
  storage = "CSV"
  path = "src/main/resources/test_csv"
  inferSchema = "true"
  delimiter = ";"
  header = "true"
  saveMode = "Append"
}

В нашем файле App.scala мы создаём Setl и регистрируем этот магазин данных:

val setl: Setl = Setl.builder()
  .withDefaultConfigLoader()
  .getOrCreate()

// Регистрируем SparkRepository в контексте
setl.setSparkRepository[TestObject]("testObjectRepository")

Реализация Factory

Мы создадим наш Dataset[TestObject] внутри Factory[Dataset[TestObject]]. Factory[A] всегда будет создавать объект типа A, и он содержит 4 абстрактных метода. Методы, которые необходимо реализовать:

  • read;
  • process;
  • write;
  • get.
class MyFactory() extends Factory[Dataset[TestObject]] with HasSparkSession {
  
  import spark.implicits._
    
  // A repository is needed for writing data. It will be delivered by the pipeline
  @Delivery 
  private[this] val repo = SparkRepository[TestObject]

  private[this] var output = spark.emptyDataset[TestObject]

  override def read(): MyFactory.this.type = {
    // in our demo we don't need to read any data
    this
  }

  override def process(): MyFactory.this.type = {
    output = Seq(
      TestObject(1, "a", "A", 1L),
      TestObject(2, "b", "B", 2L)
    ).toDS()
    this
  }

  override def write(): MyFactory.this.type = {
    repo.save(output)  // use the repository to save the output
    this
  }

  override def get(): Dataset[TestObject] = output

}

Определите конвейер.

Чтобы выполнить фабрику, мы должны добавить её в конвейер. Когда мы вызываем setl.newPipeline(), Setl создаст новый Pipeline и настроит все зарегистрированные репозитории как входы конвейера. Затем мы можем вызвать addStage, чтобы добавить нашу фабрику в конвейер.

val pipeline = setl
  .newPipeline()
  .addStage[MyFactory]()

Запустите наш конвейер.

pipeline.describe().run()

Набор данных будет сохранён в src/main/resources/test_csv.

Что ещё?

Поскольку наша MyFactory создаёт Dataset[TestObject], она может быть использована другими фабриками того же конвейера.

class AnotherFactory extends Factory[String] with HasSparkSession {

  import spark.implicits._

  @Delivery
  private[this] val outputOfMyFactory = spark.emptyDataset[TestObject]

  override def read(): AnotherFactory.this.type = this

  override def process(): AnotherFactory.this.type = this

  override def write(): AnotherFactory.this.type = {
    outputOfMyFactory.show()
    this
  }

  override def get(): String = "output"
}

Добавьте эту фабрику в конвейер:

pipeline.addStage[AnotherFactory]()

Пользовательский коннектор

Вы можете реализовать собственный источник данных, реализовав интерфейс ConnectorInterface.

class CustomConnector extends ConnectorInterface with CanDrop {
  override def setConf(conf: Conf): Unit = null

  override def read(): DataFrame = {
    import spark.implicits._
    Seq(1, 2, 3).toDF("id")
  }

  override def write(t: DataFrame, suffix: Option[String]): Unit = logDebug("Write with suffix")

  override def write(t: DataFrame): Unit = logDebug("Write")

  /**
   * Drop the entire table.
   */
  override def drop(): Unit = logDebug("drop")
}

Чтобы использовать его, просто установите хранилище на OTHER и предоставьте ссылку на класс вашего коннектора:

myConnector {
  storage = "OTHER"
  class = "com.example.CustomConnector"  // ссылка на класс вашего коннектора
}

Создайте диаграмму конвейера

Вы можете создать диаграмму Mermaid с помощью:

pipeline.showDiagram()

У вас будет некоторый журнал, подобный этому:

--------- MERMAID DIAGRAM ---------
classDiagram
class MyFactory {
  <<Factory[Dataset[TestObject]]>>
  +SparkRepository[TestObject]
}

class DatasetTestObject {
  <<Dataset[TestObject]>>
  >partition1: Int
  >partition2: String
  >clustering1: String
  >value: Long
}

DatasetTestObject <|.. MyFactory : Output
class AnotherFactory {
  <<Factory[String]>>
  +Dataset[TestObject]
}

class StringFinal {
  <<String>>
  
}

StringFinal <|.. AnotherFactory : Output
class SparkRepositoryTestObjectExternal {
  <<SparkRepository[TestObject]>>
  
}

AnotherFactory <|-- DatasetTestObject : Input
MyFactory <|-- SparkRepositoryTestObjectExternal : Input

------- END OF MERMAID CODE -------

Вы можете скопировать предыдущий код в средство просмотра Markdown, которое поддерживает Mermaid.
Или вы можете попробовать онлайн-редактор: **Конфигурация приложения**

Система конфигурации SETL позволяет пользователям выполнять своё Spark-приложение в различных средах исполнения, используя специфичные для среды конфигурации.

В каталоге `src/main/resources` у вас должно быть как минимум два файла конфигурации с именами `application.conf` и `local.conf`. Эти файлы необходимы, если вы хотите запустить приложение только в одной среде.

Вы также можете создать другие конфигурации (например, `dev.conf` и `prod.conf`), в которых можно определить параметры, специфичные для конкретной среды.

* *application.conf* — этот файл конфигурации должен содержать универсальные конфигурации, которые можно использовать независимо от среды выполнения.
* *env.conf (например, local.conf, dev.conf)* — эти файлы должны содержать параметры, специфичные для определённой среды. По умолчанию будет использоваться `local.conf`.

**Как использовать конфигурацию**

Представьте случай, когда у нас есть две среды: локальная среда разработки и удалённая производственная среда. Нашему приложению нужен репозиторий для сохранения и загрузки данных. В этом случае давайте подготовим `application.conf`, `local.conf`, `prod.conf` и `storage.conf`:

```hocon
# application.conf
setl.environment = ${app.environment}
setl.config {
  spark.app.name = "my_application"
  # и другие общие конфигурации Spark
}
# local.conf
include "application.conf"

setl.config {
  spark.default.parallelism = "200"
  spark.sql.shuffle.partitions = "200"
  # и другие локальные конфигурации Spark
}

app.root.dir = "/some/local/path"

include "storage.conf"
# prod.conf
setl.config {
  spark.default.parallelism = "1000"
  spark.sql.shuffle.partitions = "1000"
  # и другие производственные конфигурации Spark
}

app.root.dir = "/some/remote/path"

include "storage.conf"
# storage.conf
myRepository {
  storage = "CSV"
  path = ${app.root.dir}  // этот путь будет зависеть от выполнения
``` **Окружение**

inferSchema = "true"
delimiter = ";"
header = "true"
saveMode = "Append"
}

**Чтобы скомпилировать с локальной конфигурацией, с maven, просто запустите:**

```shell
mvn compile

Чтобы скомпилировать с производственной конфигурацией, передайте свойство jvm app.environment.

mvn compile -Dapp.environment=prod

Убедитесь, что в вашем каталоге ресурсов включено фильтрование:

<resources>
    <resource>
        <directory>src/main/resources</directory>
        <filtering>true</filtering>
    </resource>
</resources>

Зависимости

В настоящее время SETL поддерживает следующие источники данных. Вам не нужно предоставлять эти библиотеки в своём проекте (кроме драйвера JDBC):

— Все форматы файлов, поддерживаемые Apache Spark (csv, json, parquet и т. д.); — Delta; — Excel (crealytics/spark-excel); — Cassandra (datastax/spark-cassandra-connector); — DynamoDB (audienceproject/spark-dynamodb); — JDBC (вы должны предоставить драйвер jdbc).

Чтобы читать/записывать данные из/в AWS S3 (или другие службы хранения), вы должны включить соответствующую библиотеку hadoop в свой проект.

Например:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>2.9.2</version>
</dependency>

Вы также должны указать Scala и Spark в файле pom. SETL протестирован на следующих версиях Spark:

Версия Spark Версия Scala Примечание
3.0 2.12 :heavy_check_mark: Ok
2.4 2.12 :heavy_check_mark: Ok
2.4 2.11 :warning: см. известные проблемы
2.3 2.11 :warning: см. известные проблемы

Известные проблемы

Spark 2.4 с Scala 2.11

При использовании setl_2.11-1.x.x со Spark 2.4 и Scala 2.11 вам может потребоваться вручную включить следующие зависимости, чтобы переопределить версию по умолчанию:

<dependency>
    <groupId>com.audienceproject</groupId>
    <artifactId>spark-dynamodb_2.11</artifactId>
    <version>1.0.4</version>
</dependency>
<dependency>
    <groupId>io.delta</groupId>
    <artifactId>delta-core_2.11</artifactId>
    <version>0.6.1</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>2.5.1</version>
</dependency>

Spark 2.3 с Scala 2.11

  • DynamoDBConnector не работает со Spark версии 2.3;
  • аннотацию Compress можно использовать только для поля Struct или массива полей Struct со Spark 2.3.

Покрытие тестами

coverage.svg

Документация

https://setl-framework.github.io/setl/

Вклад в SETL

Ознакомьтесь с нашим руководством по внесению вклада.

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

Простой ETL-фреймворк на основе Spark, который просто работает. Развернуть Свернуть
Apache-2.0
Отмена

Обновления

Пока нет обновлений

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://api.gitlife.ru/oschina-mirror/qxzzxq-setl.git
git@api.gitlife.ru:oschina-mirror/qxzzxq-setl.git
oschina-mirror
qxzzxq-setl
qxzzxq-setl
master