Spark-HTTP-Stream
Spark-HTTP-stream передаёт структурированный поток Spark по протоколу HTTP. В отличие от потоков TCP, потоков Kafka и потоков файлов HDFS, потоки HTTP часто проходят через распределённые кластеры больших данных в сети. Эта функция очень полезна для создания глобальных конвейеров обработки данных между различными центрами обработки данных (например, научными исследовательскими институтами), которые владеют отдельными наборами данных.
Spark-HTTP-поток предоставляет:
Также Spark-HTTP-поток обеспечивает:
Простая архитектура Spark-HTTP-потока показана ниже:
Импорт Spark-HTTP-потока
Используйте Maven для импорта Spark-HTTP-потока: com.github.bluejoe2008 spark-http-stream 0.9.1
Запуск автономного HttpStreamServer
HttpStreamServer фактически является сервером Jetty с HttpStreamServlet, его можно запустить с помощью следующего кода: val server = HttpStreamServer.start("/xxxx", 8080);
Когда запрашивается http://localhost:8080/xxxx, HttpStreamServlet будет использовать встроенный ActionsHandler для анализа сообщения запроса, выполнения определённого действия (fecthSchema, fetchStream и т. д.) и возврата ответного сообщения.
По умолчанию предоставляется NullActionsHandler. Конечно, его можно заменить на MemoryBufferAsReceiver: server.withBuffer() .addListener(new ObjectArrayPrinter()) .createTopic(String, Int, Boolean, Float, Double, Long, Byte) .createTopicString;
или на KafkaAsReceiver: server.withKafka("vm105:9092,vm106:9092,vm107:9092,vm181:9092,vm182:9092") .addListener(new ObjectArrayPrinter());
Как показано выше, в Spark-HTTP-потоке определены несколько видов ActionsHandler:
Следует отметить, что MemoryBufferAsReceiver поддерживает буфер сообщений на стороне сервера, а KafkaAsReceiver только перенаправляет сообщения в кластер Kafka.
HttpStreamSource, HttpStreamSink
Следующий код показывает, как загружать сообщения из HttpStreamSource: val lines = spark.readStream.format(classOf[HttpStreamSourceProvider].getName) .option("httpServletUrl", "http://localhost:8080/xxxx") .option("topic", "topic-1"); .option("includesTimestamp", "true") .load();
Параметры:
Следующий код показывает, как выводить сообщения в HttpStreamSink: val query = lines.writeStream .format(classOf[HttpStreamSinkProvider].getName) .option("httpServletUrl", "http://localhost:8080/xxxx") .option("topic", "topic-1") .start();
Параметры:
httpServletUrl — путь к серверу;
topic — название темы производимых сообщений;
maxPacketSize — максимальный размер в байтах каждого пакета сообщений, если фактический DataFrame слишком велик. subscriberId
unsubscribe
: отписаться
Обратите внимание, что некоторые методы доступны только в том случае, если сервер оснащён правильным ActionsHandler
. Например, KafkaAsReceiver
обрабатывает только действие actionSendStream
, это означает, что если вы вызвали методы fetchStream
и sendDataFrame
класса HttpStreamClient
, они будут работать корректно. Но при вызове метода subscribe
произойдёт ошибка и будет выброшено исключение UnsupportedActionException
.
Методы | MemoryBufferAsReceiver | KafkaAsReceiver |
---|---|---|
sendDataFrame | √ | √ |
sendRows | √ | √ |
fetchSchema | √ | X |
fecthStream | √ | X |
subscribe | √ | X |
unsubscribe | √ | X |
StreamListener
работает, когда поступают новые данные, которые будут использованы ActionsHandler
:
trait StreamListener {
def onArrive(topic: String, objects: Array[RowEx]);
}
Предоставляются два вида StreamListener
:
StreamCollector
: собирает данные в локальном буфере памяти;StreamPrinter
: печатает данные по мере поступления.Пример сообщений выглядит следующим образом:
++++++++topic=topic-1++++++++
RowEx([hello1,1,true,0.1,0.1,1,49],1,0,2017-08-27 20:37:56.432)
RowEx([hello2,2,false,0.2,0.2,2,50],1,1,2017-08-27 20:37:56.432)
RowEx([hello3,3,true,0.3,0.3,3,51],1,2,2017-08-27 20:37:56.432)
spark-http-stream поддерживает только те типы данных, которые могут быть распознаны кодировщиками Spark. Эти типы данных включают: String
, Boolean
, Int
, Long
, Float
, Double
, Byte
, Array[]
.
При получении строка будет заключена в объект RowEx
. RowEx
— это структура данных, более богатая, чем Row
. Она содержит несколько членов и методов:
originalRow
: исходная строка;batchId
: идентификатор пакета, переданный Spark;offsetInBatch
: смещение этой строки в текущем пакете;withTimestamp()
: возвращает строку с отметкой времени;withId()
: возвращает строку со своим идентификатором;extra()
: возвращает тройку (идентификатор пакета, смещение в пакете, отметка времени).Учитывая, что исходная строка имеет значения [hello1, 1, true, 0.1, 0.1, 1, 49], следующий код показывает содержимое упомянутых структур:
+---------------+-------+--------------+-----------+------------+--------+---------+
| String:hello1 | Int:1 | Boolean:true | Float:0.1 | Double:0.1 | Long:1 | Byte:49 |
+---------------+-------+--------------+-----------+------------+--------+---------+
+---------------+-------+--------------+-----++--------+-------+-------------------------------+
| String:hello1 | Int:1 | Boolean:true | ... || Long:1 | Int:0 | Timestamp:2017-08-27 20:37:56 |
+---------------+-------+--------------+-----++--------+-------+-------------------------------+
+---------------+-------+--------------+-----------+-----+-------------------------------+
| String:hello1 | Int:1 | Boolean:true | Float:0.1 | ... | Timestamp:2017-08-27 20:37:56 |
+---------------+-------+--------------+-----------+-----+-------------------------------+
+---------------+-------+--------------+-----------+------------+--------+---------+------------+
| String:hello1 | Int:1 | Boolean:true | Float:0.1 | Double:0.1 | Long:1 | Byte:49 | String:1-0 |
+---------------+-------+--------------+-----------+------------+--------+---------+------------+
+--------+-------+-------------------------------+
| Long:1 | Int:0 | Timestamp:2017-08-27 20:37:56 | ### SerDe
Spark-HTTP-Stream определяет SerializerFactory для создания SerializerInstance:
trait SerializerFactory {
def getSerializerInstance(serializerName: String): SerializerInstance;
}
Предоставляется объект SerializerFactory.DEFAULT, который может создавать два вида сериализаторов:
* java: создаёт JavaSerializer;
* kryo: создаёт KryoSerializer.
Приветствуется новый вид сериализатора, например, json-сериализатор.
По умолчанию HttpStreamClient и HttpStreamServer используют сериализатор kryo.
### Тесты
* HttpStreamServerClientTest: тестирует HttpStreamServer/Client, https://github.com/bluejoe2008/spark-http-stream/blob/master/src/test/scala/HttpStreamServerClientTest.scala.
* HttpStreamSourceSinkTest: тестирует HttpStreamSource и HttpStreamSink, https://github.com/bluejoe2008/spark-http-stream/blob/master/src/test/scala/HttpStreamSourceSinkTest.scala.
* HttpStreamKafkaTest: тестирует HttpStreamSink с Kafka в качестве основного получателя сообщений, https://github.com/bluejoe2008/spark-http-stream/blob/master/src/test/scala/HttpStreamKafkaTest.scala.
* HttpStreamDemo: инструмент помогает протестировать HttpTextStream и HttpTextSink, https://github.com/bluejoe2008/spark-http-stream/blob/master/src/test/scala/HttpStreamDemo.scala.
Шаги для тестирования HttpStreamDemo:
1. Выберите машину A, запустите `HttpStreamDemo start-server-on 8080 /xxxx`, это запустит HTTP-сервер, который получает данные от машины B.
2. Выберите машину B, запустите `nc -lk 9999`.
3. Запустите `HttpStreamDemo read-from http://machine-a-host:8080/xxxx` на машине B.
4. Запустите `HttpStreamDemo write-into http://machine-a-host:8080/xxxx` на машине C.
5. Введите текст в nc, данные будут получены HttpStreamSink и затем использованы как HttpStreamSource, наконец, отображены на консоли.
### Зависимости
* kafka-clients-0.10: используется KafkaAsReceiver.
* httpclient-4.5: HttpStreamClient использует проект HttpClient.
* jetty-9.0: HttpStreamServer разработан на основе Jetty.
* spark-2.1: библиотека структурированной потоковой передачи Spark.
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )