В этом разделе рассказывается, как использовать обученную модель TensorFlow для прогнозирования в spark.
Предпосылкой этого раздела является фундаментальное знание spark и scala.
Проще использовать pyspark, поскольку требуется только загрузка модели с Python на каждом исполнителе и отдельное прогнозирование.
Для рассмотрения производительности версия spark в scala является наиболее популярной.
Раздел показывает, как использовать обученную модель TensorFlow в spark через TensorFlow для Java.
Можно выполнять прогнозирование с помощью обученной модели TensorFlow на сотнях тысяч компьютеров, используя функцию параллельных вычислений spark.
Необходимые шаги для прогнозирования с обученной моделью TensorFlow в spark (scala):
(1) Подготовка файла модели protobuf
(2) Создание проекта spark (scala), вставка зависимостей jar-пакета для TensorFlow в java.
(3) Загрузка модели TensorFlow на стороне драйвера проекта spark (scala) и её успешная отладка.
(4) Загрузка модели TensorFlow на исполнитель проекта spark (scala) через RDD и её успешная отладка.
(5) Загрузка модели TensorFlow на исполнителя проекта spark (scala) через Data и её успешная отладка.
Здесь мы обучаем простую линейную регрессионную модель с помощью tf.keras
и сохраняем её в виде файла protobuf.
import tensorflow as tf
from tensorflow.keras import models,layers,optimizers
## Количество образцов
n = 800
## Генерация тестового набора данных
X = tf.random.uniform([n,2],minval=-10,maxval=10)
w0 = tf.constant([[2.0],[-1.0]])
b0 = tf.constant(3.0)
Y = X@w0 + b0 + tf.random.normal([n,1],mean = 0.0,stddev= 2.0) # @ — умножение матриц; добавление гауссовского шума
## Моделирование
tf.keras.backend.clear_session()
inputs = layers.Input(shape = (2,),name ="inputs") # Установить имя ввода как "входы"
outputs = layers.Dense(1, name = "outputs")(inputs) # Установить выходное имя как "выходы"
linear = models.Model(inputs = inputs,outputs = outputs)
linear.summary()
## Обучение с методом подгонки
linear.compile(optimizer="rmsprop",loss="mse",metrics=["mae"])
linear.fit(X,Y,batch_size = 8,epochs = 100)
tf.print("w = ",linear.layers[1].kernel)
tf.print("b = ",linear.layers[1].bias)
## Сохранить модель в формате pb
export_path = "../data/linear_model/"
version = "1" # Версия может использоваться для управления дальнейшими обновлениями
linear.save(export_path+version, save_format="tf")
!ls {export_path+version}
# Проверить информацию о файле модели
!saved_model_cli show --dir {export_path+str(version)} --all
Информация о файле модели, отмеченная красным, может быть использована позже.
Необходимо добавить следующую зависимость jar-пакета, если использовать maven для управления проектами.
<!-- https://mvnrepository.com/artifact/org.tensorflow/tensorflow -->
<dependency>
<groupId>org.tensorflow</groupId>
<artifactId>tensorflow</artifactId>
<version>1.15.0</version>
</dependency>
Вы также можете скачать jar-пакет org.tensorflow.tensorflow
, вместе с зависимыми org.tensorflow.libtensorflow
и org.tensorflowlibtensorflow_jni
по следующей ссылке, а затем добавить их все в проект.
https://mvnrepository.com/artifact/org.tensorflow/tensorflow/1.15.0
Следующая демонстрация выполняется в jupyter notebook. Нам нужно установить toree, чтобы он поддерживал spark(scala). ``` .load("/Users/liangyun/CodeFiles/eat_tensorflow2_in_30_days/data/linear_model/1","serve")
//Примечание: для версии Java TensorFlow использует статический граф, как в TensorFlow 1.X, т. е. используйте Session
, затем явные данные для подачи и результаты для получения, и, наконец, запустите его.
//Примечание: можно последовательно использовать несколько методов подачи, когда нам нужно подать несколько данных.
//Примечание: входные данные должны быть типа float
val sess = bundle.session() val x = tf.Tensor.create(Array(Array(1.0f,2.0f),Array(2.0f,3.0f))) val y = sess.runner().feed("serving_default_inputs:0", x) .fetch("StatefulPartitionedCall:0").run().get(0)
val result = Array.ofDimFloat y.copyTo(result)
if(x != null) x.close() if(y != null) y.close() if(sess != null) sess.close() if(bundle != null) bundle.close()
result
Результат:
Array(Array(3.019596), Array(3.9878292))

Здесь мы передаём модель TensorFlow, загруженную на стороне драйвера, каждому исполнителю посредством широковещательной рассылки и осуществляем прогнозирование с помощью распределённых вычислений на всех исполнителях.
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.WrappedArray
import org.{tensorflow=>tf}
val spark = SparkSession
.builder()
.appName("TfRDD")
.enableHiveSupport()
.getOrCreate()
val sc = spark.sparkContext
// Загрузка модели на стороне драйвера
val bundle = tf.SavedModelBundle
.load("/Users/liangyun/CodeFiles/master_tensorflow2_in_20_hours/data/linear_model/1","serve")
// Широковещательная рассылка модели всем исполнителям
val broads = sc.broadcast(bundle)
// Создание набора данных
val rdd_data = sc.makeRDD(List(Array(1.0f,2.0f),Array(3.0f,5.0f),Array(6.0f,7.0f),Array(8.0f,3.0f)))
// Прогнозирование в пакетном режиме с использованием модели через mapPartitions
val rdd_result = rdd_data.mapPartitions(iter => {
val arr = iter.toArray
val model = broads.value
val sess = model.session()
val x = tf.Tensor.create(arr)
val y = sess.runner().feed("serving_default_inputs:0", x)
.fetch("StatefulPartitionedCall:0").run().get(0)
// Копирование прогноза в массив типа Float с той же формой
val result = Array.ofDim[Float](y.shape()(0).toInt,y.shape()(1).toInt)
y.copyTo(result)
result.iterator
})
rdd_result.take(5)
bundle.close
<!-- #endregion -->
Результат:
Array(Array(3.019596), Array(3.9264367), Array(7.8607616), Array(15.974984))

Распределённое прогнозирование с использованием модели TensorFlow также может быть реализовано на данных DataFrame, помимо реализации на данных RDD в Spark. Это может быть сделано путём регистрации метода прогнозирования в качестве функции sparkSQL.
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.WrappedArray
import org.{tensorflow=>tf}
object TfDataFrame extends Serializable{
def main(args:Array[String]):Unit = {
val spark = SparkSession
.builder()
.appName("TfDataFrame")
.enableHiveSupport()
.getOrCreate()
val sc = spark.sparkContext
import spark.implicits._
val bundle = tf.SavedModelBundle
.load("/Users/liangyun/CodeFiles/master_tensorflow2_in_20_hours/data/linear_model/1","serve")
val broads = sc.broadcast(bundle)
// Создаем функцию прогнозирования и регистрируем ее как udf в sparkSQL
val tfpredict = (features:WrappedArray[Float]) => {
val bund = broads.value
val sess = bund.session()
val x = tf.Tensor.create(Array(features.toArray))
val y = sess.runner().feed("serving_default_inputs:0", x)
.fetch("StatefulPartitionedCall:0").run().get(0)
``` ```
val result = Array.ofDim[Float](y.shape()(0).toInt, y.shape()(1).toInt)
y.copyTo(result)
val y_pred = result(0)(0)
y_pred
}
spark.udf.register("tfpredict", tfpredict)
// Creating DataFrame dataset, and put the features into one of the columns
val dfdata = sc.parallelize(List(Array(1.0f, 2.0f), Array(3.0f, 5.0f), Array(7.0f, 8.0f))).toDF("features")
dfdata.show
// Call the sparkSQL predicting function, add a new column as y_preds
val dfresult = dfdata.selectExpr("features", "tfpredict(features) as y_preds")
dfresult.show
bundle.close
}
}
+----------+
| features|
+----------+
|[1.0, 2.0]|
|[3.0, 5.0]|
|[7.0, 8.0]|
+----------+
+----------+---------+
| features| y_preds|
+----------+---------+
|[1.0, 2.0]| 3.019596|
|[3.0, 5.0]|3.9264367|
|[7.0, 8.0]| 8.828995|
+----------+---------+
Мы реализовали распределённое прогнозирование с использованием модели линейной регрессии (реализованной с помощью tf.keras
), используя структуры данных RDD и DataFrame в spark.
Также можно использовать обученные нейронные сети для распределённого прогнозирования через spark с небольшими изменениями в этой демонстрации.
На самом деле возможности TensorFlow больше, чем просто реализация нейронных сетей; низкоуровневый язык графов способен выражать все виды численных вычислений.
Мы можем реализовать любой тип модели машинного обучения на TensorFlow 2.0 с помощью этих различных низкоуровневых API.
Также возможно экспортировать обученные модели в виде файлов и использовать их в распределённой системе, такой как spark, что открывает огромные возможности для будущих приложений.
Пожалуйста, оставляйте комментарии в официальном аккаунте WeChat «Python与算法之美» (Elegance of Python and Algorithms), если вы хотите пообщаться с автором о содержании. Автор постарается ответить, учитывая ограниченное время.
Вы также можете присоединиться к групповому чату с другими читателями, ответив 加群 (join group) в официальном аккаунте WeChat.
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )