Слияние кода завершено, страница обновится автоматически
brew install kafka redis
1. Сначала запустите службу ZooKeeper, порт должен быть 2181.
2. kafka-server-start.sh /usr/local/etc/kafka/server.properties
3. Создайте тему
Создание,
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Получение,
kafka-topics.sh --list --zookeeper localhost:2181
4. Отправьте несколько сообщений: введите сообщение в консоли
kafka-console-producer.sh --broker-list localhost:9092 --topic test
5. Запустите потребителя: наблюдайте за появлением сообщений в консоли
kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Топик разделён на партиции и имеет реплики
1. Создайте debugo01, этот топик имеет три партиции и одну реплику (не реплицируется). Этот топик распределён между всеми брокерами. Ниже приведены команды управления, которые можно выполнять на любом узле Kafka.
kafka-topics.sh --create --zookeeper debugo01,debugo02,debugo03 --replication-factor 1 --partitions 3 --topic debugo01
kafka-topics.sh --create --zookeeper localhost --replication-factor 1 --partitions 3 --topic debugo01
2. Создайте debugo02, этот топик имеет одну партицию и три реплики (на каждом узле есть одна реплика). Этот топик распределён между всеми брокерами. Ниже приведены команды управления, которые можно выполнять на любом узле Kafka.
Необходимы три узла
kafka-topics.sh --create --zookeeper debugo01,debugo02,debugo03 --replication-factor 3 --partitions 1 --topic debugo02
3. Список всех топиков
kafka-topics.sh --list --zookeeper localhost:2181
4. Детали топика
kafka-topics.sh --describe --zookeeper localhost:2181 --topic debugo01
5. Проверьте каталог логов, для топика debugo01, debugo01 является первой партицией, а debugo02 — второй. Для топика debugo02 каждая реплика является первой партицией.
6. Создайте топик debugo03, количество реплик равно двум, а количество партиций равно трём. Тогда брокер с ID 1 будет содержать первую и вторую партиции.
kafka-topics.sh --create --zookeeper debugo01,debugo02,debugo03 --replication-factor 2 --partitions 3 --topic debugo03
kafka-topics.sh --describe --zookeeper localhost:2181 --topic debugo03
ll /var/kafka/debugo03*
Создание и получение сообщений
kafka-console-producer.sh --broker-list debugo01:9092 --topic debugo03
kafka-console-consumer.sh --zookeeper debugo01:2181 --from-beginning --topic debugo03
7. Тестирование производительности
Четыре потока
Производство данных: 500 000 сообщений за 12 секунд, то есть 41 667 сообщений в секунду
Потребление данных: 150 сообщений за 27 секунд, то есть 5 556 сообщений в секунду
Используйте команду perf для тестирования производительности нескольких топиков. Сначала скачайте kafka-perf_2.10-0.8.1.1.jar и скопируйте его в директорию kafka/libs.
500 000 сообщений размером 1000 байт каждый, размер партии 1000, топик debugo01, четыре потока (при слишком большом размере сообщений могут возникнуть проблемы с OOM). Закончилось за 13 секунд.
kafka-producer-perf-test.sh --messages 500000 --message-size 1000 --batch-size 1000 --topics debugo01 --threads 4 --broker-list debugo01:9092,debugo02:9092,debugo03:9092
Аналогично протестируйте топик debugo02, который имеет одну партицию и три реплики (replicas-factor=3), заняло 39 секунд. Поэтому увеличение количества партиций и брокерских потоков значительно повысит производительность.
kafka-producer-perf-test.sh --messages 500000 --message-size 1000 --batch-size 1000 --topics debugo02 --threads 4 --broker-list debugo01:9092,debugo02:9092,debugo03:9092
Аналогично протестируйте топик debugo03, заняло 30 секунд.
kafka-producer-perf-test.sh --messages 500000 --message-size 1000 --batch-size 1000 --topics debugo03 --threads 4 --broker-list debugo01:9092,debugo02:9092,debugo03:9092
kafka-consumer-perf-test.sh --zookeeper debugo01,debugo02,debugo03 --messages 500000 --topic debugo01 --threads 3
kafka-consumer-perf-test.sh --zookeeper debugo01,debugo02,debugo03 --messages 500000 --topic debugo02 --threads 3
kafka-consumer-perf-test.sh --zookeeper debugo01,debugo02,debugo03 --messages 500000 --topic debugo03 --threads 3
Одиночное тестирование
kafka-producer-perf-test.sh --messages 500000 --message-size 1000 --batch-size 1000 --topics debugo01 --threads 4 --broker-list localhost:9092
kafka-consumer-perf-test.sh --zookeeper localhost --messages 500000 --topic debugo01 --threads 3
mvn scala:compile
spark-streaming-kafka пакет: поддерживает отправку данных в Kafka
//topic и случайное число используются для создания сообщений
val message = new ProducerRecord[String, String](topic, null, str)
//Отправка сообщения в Kafka
producer.send(message)
При запуске редактировать конфигурацию,
(1)KafkaWordCountProducer
Выбрать метод KafkaWordCountProducer в KafkaWordCount.scala
VM options установить как: -Dspark.master=local
Установить входные параметры программы: Program arguments: localhost:9092 test 3 5
Извлечение адреса из ZooKeeper для получения данных, данный способ был прекращен
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) //Создание потока и получение данных
Производство данных
val producer = new Producer[String, String](kafkaConfig)
producer.send(new KeyedMessage[String, String](topic, event.toString))
Потребление данных, текущий подход быстрый и эффективный при минимальных затратах ресурсов
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
source /etc/profile
grapher=`ps -ef | grep spark |grep SparkStreaming.jar | awk '{print $2}'`
echo $grapher
kill -9 $grapher
nohup /opt/modules/spark/bin/spark-submit \
--master spark://127.0.0.1:7077 \
--driver-memory 3g \
--executor-memory 3g \
--total-executor-cores 24 \
--conf spark.ui.port=56689 \
--jars /opt/bin/sparkJars/kafka_2.10-0.8.2.1.jar,/opt/bin/sparkJars/spark-streaming-kafka_2.10-1.4.1.jar,/opt/bin/sparkJars/metrics-core-2.2.0.jar,/opt/bin/sparkJars/mysql-connector-java-5.1.26-bin.jar,/opt/bin/sparkJars/spark-streaming-kafka_2.10-1.4.1.jar \
--class com.hexun.streaming.StockCntSumKafkaLPcnt \
/opt/bin/UDF/SparkStreaming.jar \
>/opt/bin/initservice/stock.log 2>&1 &
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )