1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/bluejoe-spark-http-stream

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
README.md

Spark-HTTP-Stream

Spark-HTTP-stream передаёт структурированный поток Spark по протоколу HTTP. В отличие от потоков TCP, потоков Kafka и потоков файлов HDFS, потоки HTTP часто проходят через распределённые кластеры больших данных в сети. Эта функция очень полезна для создания глобальных конвейеров обработки данных между различными центрами обработки данных (например, научными исследовательскими институтами), которые владеют отдельными наборами данных.

Spark-HTTP-поток предоставляет:

  • HttpStreamServer — сервер HTTP, который получает, собирает и предоставляет потоки HTTP;
  • HttpStreamSource — считывает сообщения с HttpStreamServer и действует как структурированный источник потоковой передачи;
  • HttpStreamSink — отправляет сообщения на HttpStreamServer с помощью команд HTTP-POST и действует как структурированный приёмник потоковой передачи.

Также Spark-HTTP-поток обеспечивает:

  • HttpStreamClient — клиент, используемый для связи с HttpStreamServer, разработанный на основе HttpClient;
  • HttpStreamSourceProvider — StreamSourceProvider, который создаёт HttpStreamSource;
  • HttpStreamSinkProvider — StreamSinkProvider, который создаёт HttpStreamSink.

Простая архитектура Spark-HTTP-потока показана ниже:

Импорт Spark-HTTP-потока

Используйте Maven для импорта Spark-HTTP-потока: com.github.bluejoe2008 spark-http-stream 0.9.1

Запуск автономного HttpStreamServer

HttpStreamServer фактически является сервером Jetty с HttpStreamServlet, его можно запустить с помощью следующего кода: val server = HttpStreamServer.start("/xxxx", 8080);

Когда запрашивается http://localhost:8080/xxxx, HttpStreamServlet будет использовать встроенный ActionsHandler для анализа сообщения запроса, выполнения определённого действия (fecthSchema, fetchStream и т. д.) и возврата ответного сообщения.

По умолчанию предоставляется NullActionsHandler. Конечно, его можно заменить на MemoryBufferAsReceiver: server.withBuffer() .addListener(new ObjectArrayPrinter()) .createTopic(String, Int, Boolean, Float, Double, Long, Byte) .createTopicString;

или на KafkaAsReceiver: server.withKafka("vm105:9092,vm106:9092,vm107:9092,vm181:9092,vm182:9092") .addListener(new ObjectArrayPrinter());

Как показано выше, в Spark-HTTP-потоке определены несколько видов ActionsHandler:

  • NullActionsHandler — ничего не делает;
  • MemoryBufferAsReceiver — поддерживает локальный буфер памяти, сохраняет данные, отправленные производителями в буфер, и позволяет потребителям извлекать данные пакетами;
  • KafkaAsReceiver — перенаправляет все полученные данные в Kafka.

Следует отметить, что MemoryBufferAsReceiver поддерживает буфер сообщений на стороне сервера, а KafkaAsReceiver только перенаправляет сообщения в кластер Kafka.

HttpStreamSource, HttpStreamSink

Следующий код показывает, как загружать сообщения из HttpStreamSource: val lines = spark.readStream.format(classOf[HttpStreamSourceProvider].getName) .option("httpServletUrl", "http://localhost:8080/xxxx") .option("topic", "topic-1"); .option("includesTimestamp", "true") .load();

Параметры:

  • httpServletUrl — путь к сервлету;
  • topic — тема сообщений, которые будут использоваться;
  • includesTimestamp — указывает, содержит ли каждая строка загруженного DataFrame временную метку или нет, значение по умолчанию — false;
  • timestampColumnName — имя, назначенное столбцу временной метки, значение по умолчанию — «_TIMESTAMP_»;
  • msFetchPeriod — интервал времени в миллисекундах для опроса сообщений, значение по умолчанию — 1 (1 мс).

Следующий код показывает, как выводить сообщения в HttpStreamSink: val query = lines.writeStream .format(classOf[HttpStreamSinkProvider].getName) .option("httpServletUrl", "http://localhost:8080/xxxx") .option("topic", "topic-1") .start();

Параметры:

  • httpServletUrl — путь к серверу;

  • topic — название темы производимых сообщений;

  • maxPacketSize — максимальный размер в байтах каждого пакета сообщений, если фактический DataFrame слишком велик. subscriberId

  • unsubscribe: отписаться

Обратите внимание, что некоторые методы доступны только в том случае, если сервер оснащён правильным ActionsHandler. Например, KafkaAsReceiver обрабатывает только действие actionSendStream, это означает, что если вы вызвали методы fetchStream и sendDataFrame класса HttpStreamClient, они будут работать корректно. Но при вызове метода subscribe произойдёт ошибка и будет выброшено исключение UnsupportedActionException.

Методы MemoryBufferAsReceiver KafkaAsReceiver
sendDataFrame
sendRows
fetchSchema X
fecthStream X
subscribe X
unsubscribe X

StreamListener

StreamListener работает, когда поступают новые данные, которые будут использованы ActionsHandler:

trait StreamListener {
    def onArrive(topic: String, objects: Array[RowEx]);
}

Предоставляются два вида StreamListener:

  • StreamCollector: собирает данные в локальном буфере памяти;
  • StreamPrinter: печатает данные по мере поступления.

Пример сообщений выглядит следующим образом:

++++++++topic=topic-1++++++++
RowEx([hello1,1,true,0.1,0.1,1,49],1,0,2017-08-27 20:37:56.432)
RowEx([hello2,2,false,0.2,0.2,2,50],1,1,2017-08-27 20:37:56.432)
RowEx([hello3,3,true,0.3,0.3,3,51],1,2,2017-08-27 20:37:56.432)

Схема, типы данных, RowEx

spark-http-stream поддерживает только те типы данных, которые могут быть распознаны кодировщиками Spark. Эти типы данных включают: String, Boolean, Int, Long, Float, Double, Byte, Array[].

При получении строка будет заключена в объект RowEx. RowEx — это структура данных, более богатая, чем Row. Она содержит несколько членов и методов:

  • originalRow: исходная строка;
  • batchId: идентификатор пакета, переданный Spark;
  • offsetInBatch: смещение этой строки в текущем пакете;
  • withTimestamp(): возвращает строку с отметкой времени;
  • withId(): возвращает строку со своим идентификатором;
  • extra(): возвращает тройку (идентификатор пакета, смещение в пакете, отметка времени).

Учитывая, что исходная строка имеет значения [hello1, 1, true, 0.1, 0.1, 1, 49], следующий код показывает содержимое упомянутых структур:

originalRow:

+---------------+-------+--------------+-----------+------------+--------+---------+
| String:hello1 | Int:1 | Boolean:true | Float:0.1 | Double:0.1 | Long:1 | Byte:49 | 
+---------------+-------+--------------+-----------+------------+--------+---------+

RowEx:

+---------------+-------+--------------+-----++--------+-------+-------------------------------+
| String:hello1 | Int:1 | Boolean:true | ... || Long:1 | Int:0 | Timestamp:2017-08-27 20:37:56 |
+---------------+-------+--------------+-----++--------+-------+-------------------------------+

withTimestamp():

+---------------+-------+--------------+-----------+-----+-------------------------------+
| String:hello1 | Int:1 | Boolean:true | Float:0.1 | ... | Timestamp:2017-08-27 20:37:56 |
+---------------+-------+--------------+-----------+-----+-------------------------------+

withId():

+---------------+-------+--------------+-----------+------------+--------+---------+------------+
| String:hello1 | Int:1 | Boolean:true | Float:0.1 | Double:0.1 | Long:1 | Byte:49 | String:1-0 |
+---------------+-------+--------------+-----------+------------+--------+---------+------------+

extra():

+--------+-------+-------------------------------+
| Long:1 | Int:0 | Timestamp:2017-08-27 20:37:56 | ### SerDe

Spark-HTTP-Stream определяет SerializerFactory для создания SerializerInstance:

    trait SerializerFactory {
        def getSerializerInstance(serializerName: String): SerializerInstance;
    }

Предоставляется объект SerializerFactory.DEFAULT, который может создавать два вида сериализаторов:

* java: создаёт JavaSerializer;
* kryo: создаёт KryoSerializer.

Приветствуется новый вид сериализатора, например, json-сериализатор.

По умолчанию HttpStreamClient и HttpStreamServer используют сериализатор kryo.

### Тесты

* HttpStreamServerClientTest: тестирует HttpStreamServer/Client, https://github.com/bluejoe2008/spark-http-stream/blob/master/src/test/scala/HttpStreamServerClientTest.scala.
* HttpStreamSourceSinkTest: тестирует HttpStreamSource и HttpStreamSink, https://github.com/bluejoe2008/spark-http-stream/blob/master/src/test/scala/HttpStreamSourceSinkTest.scala.
* HttpStreamKafkaTest: тестирует HttpStreamSink с Kafka в качестве основного получателя сообщений, https://github.com/bluejoe2008/spark-http-stream/blob/master/src/test/scala/HttpStreamKafkaTest.scala.
* HttpStreamDemo: инструмент помогает протестировать HttpTextStream и HttpTextSink, https://github.com/bluejoe2008/spark-http-stream/blob/master/src/test/scala/HttpStreamDemo.scala.

Шаги для тестирования HttpStreamDemo:

1. Выберите машину A, запустите `HttpStreamDemo start-server-on 8080 /xxxx`, это запустит HTTP-сервер, который получает данные от машины B.
2. Выберите машину B, запустите `nc -lk 9999`.
3. Запустите `HttpStreamDemo read-from http://machine-a-host:8080/xxxx` на машине B.
4. Запустите `HttpStreamDemo write-into http://machine-a-host:8080/xxxx` на машине C.
5. Введите текст в nc, данные будут получены HttpStreamSink и затем использованы как HttpStreamSource, наконец, отображены на консоли.

### Зависимости

* kafka-clients-0.10: используется KafkaAsReceiver.
* httpclient-4.5: HttpStreamClient использует проект HttpClient.
* jetty-9.0: HttpStreamServer разработан на основе Jetty.
* spark-2.1: библиотека структурированной потоковой передачи Spark.

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

Реализация источника и приёмника потоковой передачи Spark на основе HTTP. Развернуть Свернуть
BSD-2-Clause
Отмена

Обновления

Пока нет обновлений

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://api.gitlife.ru/oschina-mirror/bluejoe-spark-http-stream.git
git@api.gitlife.ru:oschina-mirror/bluejoe-spark-http-stream.git
oschina-mirror
bluejoe-spark-http-stream
bluejoe-spark-http-stream
master