Слияние кода завершено, страница обновится автоматически
Шаблон кода для разработки на основе Spark.
val sparkConf = SparkConfBuilder().setAppName("name").setKryoSerializer().get()
val sparkSession = SparkSessionBuilder().setConf(sparkConf).get()
```## Использование Spark-Streaming ##
* Скрытие деталей получения StreamingContext и управления смещением
* Пользователю требуется только написать соответствующий бизнес-код
```scala
val sparkConf = SparkConfBuilder()
.setAppName(name)
.setKryoSerializer()
.setSqlShufflePartition(1000)
.setStreamingStopGracefullyOnShutdown(true)
.setStreamingBackpressure(true)
.setStreamingKafkaMaxRatePerPartition(10000)
.get()
val kafkaParam: Map[String, Object] = Map[String, Object](
"bootstrap.servers" -> "",
"key.deserializer" -> "",
"value.deserializer" -> "",
"group.id" -> "",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
SparkStreamingBuilder()
.setSparkConf(sparkConf)
.setKafkaParam(kafkaParam)
.setTopics(Array("test"))
.setDuration(Seconds(5))
.execute { input =>
input.map(_.value()).foreachRDD { rdd =>
if (!rdd.isEmpty()) {
rdd.foreachPartition { partition =>
partition.foreach(println(_))
}
}
}
}
com.typesafe.config
def confPath: String = {
val path = getClass.getProtectionDomain.getCodeSource.getLocation.getPath
var url = URLDecoder.decode(path, "utf-8")
if (url.endsWith(".jar")) {
url = url.substring(0, url.lastIndexOf("/") + 1)
}
val file = new File(url)
val parent = file.getParent
parent + "/conf"
}
def load(name: String): Config = {
val conf = new File(confPath + "/" + name)
ConfigFactory.parseFile(conf)
}
scalikejdbc
def init(
name: String, url: String,
username: String, password: String, settings: ConnectionPoolSettings
): Unit = {
if (!ConnectionPool.isInitialized(Symbol(name))) {
ConnectionPool.add(Symbol(name), url, username, password, settings)
info(s"Успешная инициализация пула подключений к базе данных: $name")
}
}
def init(name: String, dataSource: DataSource): Unit = {
if (!ConnectionPool.isInitialized(Symbol(name))) {
ConnectionPool.```scala
add(Symbol(name), new DataSourceConnectionPool(dataSource))
info(s"успешная инициализация пула соединений с базой данных: $name")
}
def getConnection(
driverClassName: String, url: String,
username: String, password: String
): Connection = {
Class.forName(driverClassName)
DriverManager.getConnection(url, username, password)
}
def using[A](conn: Connection)(execute: DB => A): A = {
val db = DB(conn)
db.autoClose(false)
using(db)(execute)
}
def using[A](name: String)(execute: DB => A): A = {
val db = DB(ConnectionPool(Symbol(name)).borrow())
db.autoClose(false)
using(db)(execute)
}
Дополнительные детали см. в коде
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )