1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/Python_Ai_Road-eat_tensorflow2_in_30_days

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
Chapter6-7.md 13 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
gitlife-traslator Отправлено 03.12.2024 15:14 374a95e

6-7 Вызов модели TensorFlow с использованием spark-scala

В этом разделе рассказывается, как использовать обученную модель TensorFlow для прогнозирования в spark.

Предпосылкой этого раздела является фундаментальное знание spark и scala.

Проще использовать pyspark, поскольку требуется только загрузка модели с Python на каждом исполнителе и отдельное прогнозирование.

Для рассмотрения производительности версия spark в scala является наиболее популярной.

Раздел показывает, как использовать обученную модель TensorFlow в spark через TensorFlow для Java.

Можно выполнять прогнозирование с помощью обученной модели TensorFlow на сотнях тысяч компьютеров, используя функцию параллельных вычислений spark.

0 Использование модели TensorFlow в spark-scala

Необходимые шаги для прогнозирования с обученной моделью TensorFlow в spark (scala):

(1) Подготовка файла модели protobuf

(2) Создание проекта spark (scala), вставка зависимостей jar-пакета для TensorFlow в java.

(3) Загрузка модели TensorFlow на стороне драйвера проекта spark (scala) и её успешная отладка.

(4) Загрузка модели TensorFlow на исполнитель проекта spark (scala) через RDD и её успешная отладка.

(5) Загрузка модели TensorFlow на исполнителя проекта spark (scala) через Data и её успешная отладка.

1. Подготовка файла модели Protobuf

Здесь мы обучаем простую линейную регрессионную модель с помощью tf.keras и сохраняем её в виде файла protobuf.

import tensorflow as tf
from tensorflow.keras import models,layers,optimizers

## Количество образцов
n = 800

## Генерация тестового набора данных
X = tf.random.uniform([n,2],minval=-10,maxval=10) 
w0 = tf.constant([[2.0],[-1.0]])
b0 = tf.constant(3.0)

Y = X@w0 + b0 + tf.random.normal([n,1],mean = 0.0,stddev= 2.0)  # @ — умножение матриц; добавление гауссовского шума

## Моделирование
tf.keras.backend.clear_session()
inputs = layers.Input(shape = (2,),name ="inputs") # Установить имя ввода как "входы"
outputs = layers.Dense(1, name = "outputs")(inputs) # Установить выходное имя как "выходы"
linear = models.Model(inputs = inputs,outputs = outputs)
linear.summary()

## Обучение с методом подгонки
linear.compile(optimizer="rmsprop",loss="mse",metrics=["mae"])
linear.fit(X,Y,batch_size = 8,epochs = 100)  

tf.print("w = ",linear.layers[1].kernel)
tf.print("b = ",linear.layers[1].bias)

## Сохранить модель в формате pb
export_path = "../data/linear_model/"
version = "1"       # Версия может использоваться для управления дальнейшими обновлениями
linear.save(export_path+version, save_format="tf") 
!ls {export_path+version}
# Проверить информацию о файле модели
!saved_model_cli show --dir {export_path+str(version)} --all

Информация о файле модели, отмеченная красным, может быть использована позже.

2. Создайте проект spark (scala), вставьте зависимости jar-пакетов для TensorFlow в java

Необходимо добавить следующую зависимость jar-пакета, если использовать maven для управления проектами.

<!-- https://mvnrepository.com/artifact/org.tensorflow/tensorflow -->
<dependency>
    <groupId>org.tensorflow</groupId>
    <artifactId>tensorflow</artifactId>
    <version>1.15.0</version>
</dependency>

Вы также можете скачать jar-пакет org.tensorflow.tensorflow, вместе с зависимыми org.tensorflow.libtensorflow и org.tensorflowlibtensorflow_jni по следующей ссылке, а затем добавить их все в проект. https://mvnrepository.com/artifact/org.tensorflow/tensorflow/1.15.0

3. Загрузка модели TensorFlow на стороне драйвера проекта spark (scala) и успешная её отладка

Следующая демонстрация выполняется в jupyter notebook. Нам нужно установить toree, чтобы он поддерживал spark(scala). ``` .load("/Users/liangyun/CodeFiles/eat_tensorflow2_in_30_days/data/linear_model/1","serve")

//Примечание: для версии Java TensorFlow использует статический граф, как в TensorFlow 1.X, т. е. используйте Session, затем явные данные для подачи и результаты для получения, и, наконец, запустите его. //Примечание: можно последовательно использовать несколько методов подачи, когда нам нужно подать несколько данных. //Примечание: входные данные должны быть типа float

val sess = bundle.session() val x = tf.Tensor.create(Array(Array(1.0f,2.0f),Array(2.0f,3.0f))) val y = sess.runner().feed("serving_default_inputs:0", x) .fetch("StatefulPartitionedCall:0").run().get(0)

val result = Array.ofDimFloat y.copyTo(result)

if(x != null) x.close() if(y != null) y.close() if(sess != null) sess.close() if(bundle != null) bundle.close()

result

Результат:

Array(Array(3.019596), Array(3.9878292))
![](../data/TfDriver.png)

4. Загрузка модели TensorFlow в исполнитель проекта spark (scala) через RDD и её успешная отладка

Здесь мы передаём модель TensorFlow, загруженную на стороне драйвера, каждому исполнителю посредством широковещательной рассылки и осуществляем прогнозирование с помощью распределённых вычислений на всех исполнителях.

import org.apache.spark.sql.SparkSession
import scala.collection.mutable.WrappedArray
import org.{tensorflow=>tf}

val spark = SparkSession
    .builder()
    .appName("TfRDD")
    .enableHiveSupport()
    .getOrCreate()

val sc = spark.sparkContext

// Загрузка модели на стороне драйвера
val bundle = tf.SavedModelBundle 
   .load("/Users/liangyun/CodeFiles/master_tensorflow2_in_20_hours/data/linear_model/1","serve")

// Широковещательная рассылка модели всем исполнителям
val broads = sc.broadcast(bundle)

// Создание набора данных
val rdd_data = sc.makeRDD(List(Array(1.0f,2.0f),Array(3.0f,5.0f),Array(6.0f,7.0f),Array(8.0f,3.0f)))

// Прогнозирование в пакетном режиме с использованием модели через mapPartitions
val rdd_result = rdd_data.mapPartitions(iter => {
    
    val arr = iter.toArray
    val model = broads.value
    val sess = model.session()
    val x = tf.Tensor.create(arr)
    val y =  sess.runner().feed("serving_default_inputs:0", x)
             .fetch("StatefulPartitionedCall:0").run().get(0)

    // Копирование прогноза в массив типа Float с той же формой
    val result = Array.ofDim[Float](y.shape()(0).toInt,y.shape()(1).toInt)
    y.copyTo(result)
    result.iterator
    
})

rdd_result.take(5)
bundle.close
<!-- #endregion -->

Результат:

Array(Array(3.019596), Array(3.9264367), Array(7.8607616), Array(15.974984))
![](../data/TfRDD.png)

5. Загрузка модели TensorFlow в исполнитель проекта spark (scala) через Data и её успешная отладка

Распределённое прогнозирование с использованием модели TensorFlow также может быть реализовано на данных DataFrame, помимо реализации на данных RDD в Spark. Это может быть сделано путём регистрации метода прогнозирования в качестве функции sparkSQL.

import org.apache.spark.sql.SparkSession
import scala.collection.mutable.WrappedArray
import org.{tensorflow=>tf}

object TfDataFrame extends Serializable{
    
    def main(args:Array[String]):Unit = {
        
        val spark = SparkSession
        .builder()
        .appName("TfDataFrame")
        .enableHiveSupport()
        .getOrCreate()
        val sc = spark.sparkContext
        
        import spark.implicits._

        val bundle = tf.SavedModelBundle 
           .load("/Users/liangyun/CodeFiles/master_tensorflow2_in_20_hours/data/linear_model/1","serve")

        val broads = sc.broadcast(bundle)
        
        // Создаем функцию прогнозирования и регистрируем ее как udf в sparkSQL
        val tfpredict = (features:WrappedArray[Float])  => {
            val bund = broads.value
            val sess = bund.session()
            val x = tf.Tensor.create(Array(features.toArray))
            val y =  sess.runner().feed("serving_default_inputs:0", x)
                   .fetch("StatefulPartitionedCall:0").run().get(0)
``` ```
val result = Array.ofDim[Float](y.shape()(0).toInt, y.shape()(1).toInt)
y.copyTo(result)
val y_pred = result(0)(0)
y_pred
}
spark.udf.register("tfpredict", tfpredict)

// Creating DataFrame dataset, and put the features into one of the columns
val dfdata = sc.parallelize(List(Array(1.0f, 2.0f), Array(3.0f, 5.0f), Array(7.0f, 8.0f))).toDF("features")
dfdata.show

// Call the sparkSQL predicting function, add a new column as y_preds
val dfresult = dfdata.selectExpr("features", "tfpredict(features) as y_preds")
dfresult.show
bundle.close
}
}
+----------+
|  features|
+----------+
|[1.0, 2.0]|
|[3.0, 5.0]|
|[7.0, 8.0]|
+----------+

+----------+---------+
|  features|  y_preds|
+----------+---------+
|[1.0, 2.0]| 3.019596|
|[3.0, 5.0]|3.9264367|
|[7.0, 8.0]| 8.828995|
+----------+---------+

Мы реализовали распределённое прогнозирование с использованием модели линейной регрессии (реализованной с помощью tf.keras), используя структуры данных RDD и DataFrame в spark.

Также можно использовать обученные нейронные сети для распределённого прогнозирования через spark с небольшими изменениями в этой демонстрации.

На самом деле возможности TensorFlow больше, чем просто реализация нейронных сетей; низкоуровневый язык графов способен выражать все виды численных вычислений.

Мы можем реализовать любой тип модели машинного обучения на TensorFlow 2.0 с помощью этих различных низкоуровневых API.

Также возможно экспортировать обученные модели в виде файлов и использовать их в распределённой системе, такой как spark, что открывает огромные возможности для будущих приложений.

Пожалуйста, оставляйте комментарии в официальном аккаунте WeChat «Python与算法之美» (Elegance of Python and Algorithms), если вы хотите пообщаться с автором о содержании. Автор постарается ответить, учитывая ограниченное время.

Вы также можете присоединиться к групповому чату с другими читателями, ответив 加群 (join group) в официальном аккаунте WeChat.

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/Python_Ai_Road-eat_tensorflow2_in_30_days.git
git@api.gitlife.ru:oschina-mirror/Python_Ai_Road-eat_tensorflow2_in_30_days.git
oschina-mirror
Python_Ai_Road-eat_tensorflow2_in_30_days
Python_Ai_Road-eat_tensorflow2_in_30_days
master