1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/dufafei-spark-template

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
README.md

шаблон-spark

Шаблон кода для разработки на основе Spark.

Использование Spark-Sessions

val sparkConf = SparkConfBuilder().setAppName("name").setKryoSerializer().get()
val sparkSession = SparkSessionBuilder().setConf(sparkConf).get()
```## Использование Spark-Streaming ##
* Скрытие деталей получения StreamingContext и управления смещением
* Пользователю требуется только написать соответствующий бизнес-код
```scala
val sparkConf = SparkConfBuilder()
    .setAppName(name)
    .setKryoSerializer()
    .setSqlShufflePartition(1000)
    .setStreamingStopGracefullyOnShutdown(true)
    .setStreamingBackpressure(true)
    .setStreamingKafkaMaxRatePerPartition(10000)
    .get()
val kafkaParam: Map[String, Object] = Map[String, Object](
    "bootstrap.servers" -> "",
    "key.deserializer" -> "",
    "value.deserializer" -> "",
    "group.id" -> "",
    "enable.auto.commit" -> (false: java.lang.Boolean)
)
SparkStreamingBuilder()
    .setSparkConf(sparkConf)
    .setKafkaParam(kafkaParam)
    .setTopics(Array("test"))
    .setDuration(Seconds(5))
    .execute { input =>
      input.map(_.value()).foreachRDD { rdd =>
        if (!rdd.isEmpty()) {
          rdd.foreachPartition { partition =>
            partition.foreach(println(_))
          }
        }
      }
    }

Инструментальные классы

  • Чтение конфигурационных файлов — com.typesafe.config
def confPath: String = {
  val path = getClass.getProtectionDomain.getCodeSource.getLocation.getPath
  var url = URLDecoder.decode(path, "utf-8")
  if (url.endsWith(".jar")) {
    url = url.substring(0, url.lastIndexOf("/") + 1)
  }
  val file = new File(url)
  val parent = file.getParent
  parent + "/conf"
}
def load(name: String): Config = {
  val conf = new File(confPath + "/" + name)
  ConfigFactory.parseFile(conf)
}
  • Подключение JDBC через пулы соединений — scalikejdbc
def init(
          name: String, url: String,
          username: String, password: String, settings: ConnectionPoolSettings
        ): Unit = {
  if (!ConnectionPool.isInitialized(Symbol(name))) {
    ConnectionPool.add(Symbol(name), url, username, password, settings)
    info(s"Успешная инициализация пула подключений к базе данных: $name")
  }
}
def init(name: String, dataSource: DataSource): Unit = {
  if (!ConnectionPool.isInitialized(Symbol(name))) {
    ConnectionPool.```scala
add(Symbol(name), new DataSourceConnectionPool(dataSource))
info(s"успешная инициализация пула соединений с базой данных: $name")
}
def getConnection(
                   driverClassName: String, url: String,
                   username: String, password: String
                 ): Connection = {
  Class.forName(driverClassName)
  DriverManager.getConnection(url, username, password)
}
def using[A](conn: Connection)(execute: DB => A): A = {
  val db = DB(conn)
  db.autoClose(false)
  using(db)(execute)
}
def using[A](name: String)(execute: DB => A): A = {
  val db = DB(ConnectionPool(Symbol(name)).borrow())
  db.autoClose(false)
  using(db)(execute)
}

Дополнительные детали см. в коде

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

Описание недоступно Развернуть Свернуть
Apache-2.0
Отмена

Обновления

Пока нет обновлений

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://api.gitlife.ru/oschina-mirror/dufafei-spark-template.git
git@api.gitlife.ru:oschina-mirror/dufafei-spark-template.git
oschina-mirror
dufafei-spark-template
dufafei-spark-template
master