1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/supermy-kafka-spark-redis

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
В этом репозитории не указан файл с открытой лицензией (LICENSE). При использовании обратитесь к конкретному описанию проекта и его зависимостям в коде.
Клонировать/Скачать
README.md 10 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
Отправлено 28.02.2025 08:29 cc19218

mybigdata

Статус сборки

Введение

  • Калькулятор реального времени для больших данных с использованием Kafka + Spark + Redis
  • Убедитесь, что версия Scala, используемая в Spark, совпадает с версией Scala на вашей системе

Основные характеристики

  • Реальные-time вычисления
  • Вычисления с использованием окон
  • Spark вызывает Redis через singleton + pipeline, что повышает производительность в десять раз по сравнению с обычным режимом

Применимые случаи использования

  • Подсчёт количества временных отрезков
  • Отбор временных отрезков

Преимущества

  • Преимущества передачи большого объёма данных с помощью Kafka
  • Гибкость вычислений с использованием окон в Spark
  • Интеграция Redis для выполнения расчётов на основе памяти

Недостатки

  • Ограничение максимального объёма хранения Redis

Быстрый запуск

  1. Необходимо наличие среды JDK 8;
  2. Настройка среды Kafka;
  3. Запуск примера 1;
  4. Запуск примера 2;

Настройка среды Kafka

Установка

brew install kafka redis

Конфигурирование и тестирование производительности

1. Сначала запустите службу ZooKeeper, порт должен быть 2181.

2. kafka-server-start.sh /usr/local/etc/kafka/server.properties

3. Создайте тему
    Создание,
        kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    Получение,
        kafka-topics.sh --list --zookeeper localhost:2181

4. Отправьте несколько сообщений: введите сообщение в консоли
    kafka-console-producer.sh --broker-list localhost:9092 --topic test

5. Запустите потребителя: наблюдайте за появлением сообщений в консоли
    kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Топик разделён на партиции и имеет реплики
1. Создайте debugo01, этот топик имеет три партиции и одну реплику (не реплицируется). Этот топик распределён между всеми брокерами. Ниже приведены команды управления, которые можно выполнять на любом узле Kafka.
    kafka-topics.sh --create --zookeeper debugo01,debugo02,debugo03 --replication-factor 1 --partitions 3 --topic debugo01
    kafka-topics.sh --create --zookeeper localhost --replication-factor 1 --partitions 3 --topic debugo01

2. Создайте debugo02, этот топик имеет одну партицию и три реплики (на каждом узле есть одна реплика). Этот топик распределён между всеми брокерами. Ниже приведены команды управления, которые можно выполнять на любом узле Kafka.
    Необходимы три узла
    kafka-topics.sh --create --zookeeper debugo01,debugo02,debugo03 --replication-factor 3 --partitions 1 --topic debugo02

3. Список всех топиков
    kafka-topics.sh --list --zookeeper localhost:2181

4. Детали топика
    kafka-topics.sh --describe --zookeeper localhost:2181 --topic debugo01

5. Проверьте каталог логов, для топика debugo01, debugo01 является первой партицией, а debugo02 — второй. Для топика debugo02 каждая реплика является первой партицией.

6. Создайте топик debugo03, количество реплик равно двум, а количество партиций равно трём. Тогда брокер с ID 1 будет содержать первую и вторую партиции.
    kafka-topics.sh --create --zookeeper debugo01,debugo02,debugo03 --replication-factor 2 --partitions 3 --topic debugo03
    
    kafka-topics.sh --describe --zookeeper localhost:2181 --topic debugo03
    ll /var/kafka/debugo03*

Создание и получение сообщений
kafka-console-producer.sh --broker-list debugo01:9092 --topic debugo03
kafka-console-consumer.sh --zookeeper debugo01:2181 --from-beginning --topic debugo03

7. Тестирование производительности
    Четыре потока
    Производство данных: 500 000 сообщений за 12 секунд, то есть 41 667 сообщений в секунду
    Потребление данных: 150 сообщений за 27 секунд, то есть 5 556 сообщений в секунду
    Используйте команду perf для тестирования производительности нескольких топиков. Сначала скачайте kafka-perf_2.10-0.8.1.1.jar и скопируйте его в директорию kafka/libs.
    500 000 сообщений размером 1000 байт каждый, размер партии 1000, топик debugo01, четыре потока (при слишком большом размере сообщений могут возникнуть проблемы с OOM). Закончилось за 13 секунд.
    
        kafka-producer-perf-test.sh --messages 500000 --message-size 1000  --batch-size 1000 --topics debugo01 --threads 4 --broker-list debugo01:9092,debugo02:9092,debugo03:9092
    
    Аналогично протестируйте топик debugo02, который имеет одну партицию и три реплики (replicas-factor=3), заняло 39 секунд. Поэтому увеличение количества партиций и брокерских потоков значительно повысит производительность.
        kafka-producer-perf-test.sh --messages 500000 --message-size 1000  --batch-size 1000 --topics debugo02 --threads 4 --broker-list debugo01:9092,debugo02:9092,debugo03:9092
    
    Аналогично протестируйте топик debugo03, заняло 30 секунд.
        kafka-producer-perf-test.sh --messages 500000 --message-size 1000  --batch-size 1000 --topics debugo03 --threads 4 --broker-list debugo01:9092,debugo02:9092,debugo03:9092    
    
        kafka-consumer-perf-test.sh --zookeeper debugo01,debugo02,debugo03 --messages 500000 --topic debugo01 --threads 3
        kafka-consumer-perf-test.sh --zookeeper debugo01,debugo02,debugo03 --messages 500000 --topic debugo02 --threads 3
        kafka-consumer-perf-test.sh --zookeeper debugo01,debugo02,debugo03 --messages 500000 --topic debugo03 --threads 3
    
    Одиночное тестирование
        kafka-producer-perf-test.sh --messages 500000 --message-size 1000  --batch-size 1000 --topics debugo01 --threads 4 --broker-list localhost:9092
        kafka-consumer-perf-test.sh --zookeeper localhost --messages 500000 --topic debugo01 --threads 3

Компиляция и запуск

mvn scala:compile

Использование

Пример 1: Производство данных, KafkaWordCountProducer, потребление данных, KafkaWordCount

spark-streaming-kafka пакет: поддерживает отправку данных в Kafka
//topic и случайное число используются для создания сообщений
val message = new ProducerRecord[String, String](topic, null, str)
//Отправка сообщения в Kafka
producer.send(message)


При запуске редактировать конфигурацию,
(1)KafkaWordCountProducer
Выбрать метод KafkaWordCountProducer в KafkaWordCount.scala
VM options установить как: -Dspark.master=local
Установить входные параметры программы: Program arguments: localhost:9092 test 3 5
Извлечение адреса из ZooKeeper для получения данных, данный способ был прекращен
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) //Создание потока и получение данных

Пример 2: Производство данных, KafkaEventProducer$, потребление данных, UserClickCountAnalytics$

Производство данных
val producer = new Producer[String, String](kafkaConfig)
producer.send(new KeyedMessage[String, String](topic, event.toString))

Потребление данных, текущий подход быстрый и эффективный при минимальных затратах ресурсов
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

Пример 3: Упаковка и отправка версии в рабочее окружение

source /etc/profile
grapher=`ps -ef | grep spark |grep SparkStreaming.jar | awk '{print $2}'`
echo $grapher
kill -9 $grapher
nohup /opt/modules/spark/bin/spark-submit \
--master spark://127.0.0.1:7077 \
--driver-memory 3g \
--executor-memory 3g \
--total-executor-cores 24 \
--conf spark.ui.port=56689 \
--jars /opt/bin/sparkJars/kafka_2.10-0.8.2.1.jar,/opt/bin/sparkJars/spark-streaming-kafka_2.10-1.4.1.jar,/opt/bin/sparkJars/metrics-core-2.2.0.jar,/opt/bin/sparkJars/mysql-connector-java-5.1.26-bin.jar,/opt/bin/sparkJars/spark-streaming-kafka_2.10-1.4.1.jar \
--class com.hexun.streaming.StockCntSumKafkaLPcnt \
/opt/bin/UDF/SparkStreaming.jar \
>/opt/bin/initservice/stock.log 2>&1 &

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/supermy-kafka-spark-redis.git
git@api.gitlife.ru:oschina-mirror/supermy-kafka-spark-redis.git
oschina-mirror
supermy-kafka-spark-redis
supermy-kafka-spark-redis
master