2017-07-14
Пресс-тестирование flume-redis
JAVA_OPTS="-Xms2048m -Xmx2048m -Xss256k -Xmn2g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"
export JAVA_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
-Dflume.monitoring.type=http -Dflume.monitoring.port=34545
2017-07-05
Синхронизация данных
oracle2rabbitmq и преобразование данных в формат json
rabbitmq2mongodb хранение данных в mongodb
2017-07-04
Использование netcat для захвата данных, хранение в mongo, поддержка формата json
Sink поддерживает некоторые заголовки в динамической модели:
'db': имя базы данных
'collection': имя коллекции
По умолчанию имя базы данных events; имя базы данных можно задать с помощью заголовков
По умолчанию имя коллекции events; имя коллекции можно задать с помощью заголовков
Формат данных json
telnet >{"a":1}
> db.events.find()
{ "_id" : ObjectId("595b0030e4b074bb69ee49b2"), "a" : 1 }
2017-07-03
Добавление фильтрации по дате данных
Пользовательские данные сохраняются в течение 3 дней;
Добавление фильтрации для данных журнала
2017-07-01
Тестирование Redis Sink в кластере
Тестирование 3A RedisClusterZSetSink завершено, запущено на производственном сервере;
Тестирование Log RedisClusterZrangeByScoreSink завершено, зависит от завершения тестирования 3A;
Тестирование Source RedisClusterSource завершено, конфигурация flume-redis2log.conf
2017-06-28
Оптимизация производительности Анализ slowlog get показал, что вторичное внесение данных в Redis занимает много времени; возможно, стоит выполнить это напрямую в скрипте? Замена на выполнение скрипта напрямую в Redis; исключение возврата данных и записи данных, проверка эффективности;
2017-06-27
Изменение пути хранения Redis, динамическое изменение;
127.0.0.1:6379> config set dir /disk01/redisdata/
127.0.0.1:6379> save
2017-06-26
Обновление до последней версии движка; результат "very good"; остановите кодовый движок; Обновите JAR-пакет; Обновите конфигурацию flume-taildir2redis.conf и загрузите её на сервер; com.supermy.redis.flume.redis.sink.RedisZSetSink Обновите Groovy-скрипты фильтра convert; проверка скриптов завершена; /etc/flume/conf/g-netuser-filter.groovy /etc/flume/conf/g-netuser-zset-search-replace.groovy Тестирование; Проверка логов; Проверка Redis; *** Новая версия мощная и динамичная; встроенный pipeline 2w line работает намного быстрее, чем eval ***
Начало развертывания версии для обработки логов; использование файловых каналов; утраченные данные можно восстановить; Локальное тестирование верификации; конфигурационный файл Flume config динамически загружается; при каждом обновлении происходит перезагрузка; проверка конфигурационного файла; flume-taildir2netlog2redis.conf установка параметров производительности до 20 000; очистка данных Redis; включение мониторинга Redis; запуск тестирования; завершение тестирования;Обновление на сервере:
Передача очередей в HDFS
Локальное тестирование службы Redis для приема данных
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /user/hadoop/my/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = netlogact-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true // использовать локальное время
```Тестирование производственной системы
Память канала заполнена, используется файловый канал;
---
Запасной вариант
a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames = id,,msg
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
Оптимизация todo: увеличение количества пакетов до 1 000 000, тестирование для оценки эффективности.2017-06-24
Основное - завершение написания скриптов для обработки логов;
Тестовые скрипты
flume-taildir2netlog.conf
bin/flume-ng agent --conf conf --conf-file conf/flume-taildir2netlog.conf --name a1 -Dflume.root.logger=INFO,console
Тестовый скрипт, тестирование одного события вручную, правильное добавление в Redis
bin/flume-ng agent --conf conf --conf-file conf/flume-netcat2netlog2redis.conf --name a1 -Dflume.root.logger=INFO,console
Тестирование добавления в Redis
flume-taildir2netlog2redis.conf
bin/flume-ng agent --conf conf --conf-file conf/flume-taildir2netlog2redis.conf --name a1 -Dflume.root.logger=INFO,console
Мониторинг создаваемых данных
LLEN netlogactlist
Тестирование вставки в Redis без использования EVAL для избежания блокировки
flume-taildir2redis.conf g-netuser-zset-search-replace.groovy g-netuser-filter.groovy
Скрипты
Добавление фильтрации нелегальных IP-адресов;
2017-06-23
Оптимизация скриптов Redis-Lua, передача параметров для избежания кэширования Lua в Redis;
ok netcat2log.conf тестирование интерцептора
ok netcat2redis.conf тестирование добавления данных в Redis
Конфигурация для удаления данных в определенное время;
2017-06-20
Отладка Flume-netcat2redis.conf;
2017-06-20
Отладка groovy + Redis ;
Кодирование и декодирование данных до и после шифрования и расшифровки
2017-01-17
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=192.168.150.140:8650"
2017-01-04
Подключение для шифрования и расшифровки данных, подходящее для передачи данных через интернет.2017-01-03
redis-source [flume-redis2log.conf] тестирование данных для обработки
127.0.0.1:6379> LPUSH jplist '{"message":1}'
127.0.0.1:6379> LPUSH jplist '{"message":2}'
127.0.0.1:6379> LPUSH jplist '{"message":2,"tags":["xyz"],"type":"abc"}'
redis-sink [flume-netcat2redis.conf] генерация данных
telnet 44444 // генерация данных
127.0.0.1:6379> rpop jplist // обработка данных2016-12-13
тестирование синхронизации данных Flume;
локальное тестирование под нагрузкой ок; локальное тестирование под нагрузкой с использованием Docker ок;
2016-10-18
Flume инструмент для обработки данных на стороне клиента morphline-gork морфлайн интерцептор ETL подготовка окружения завершена;
морфлайн syslog лог ETL
приоритет : 164
время : Feb 4 10:46:14
хост : syslog
программа : sshd
pid : 607
сообщение : прослушивание на 0. 0. 0. 0 порту 22.
сообщение : Feb 4 10:46:14 syslog sshd[607]: прослушивание на 0. 0. 0. 0 порту 22.
nc -v 127. 0. 0. 1 44446
>Feb 4 10:46:14 syslog sshd[607]: прослушивание на 0. 0. 0. 0 порту 22.
морфлайн json ETL:
nc -v 127. 0. 0. 1 44446
Подключение к quickstart. cloudera 41415 порту [tcp/*] успешно!
{"username": "blue", "color": "green"}
OK
{"username": "tom", "color": "red"}
OK
2016-10-17
csv2json добавление регулярного выражения интерцептора;
################пример################
текст: 2011-0101
h1,h2,h3,h4
v1,v2,v3,v4
регулярное выражение: (\d{4})-(\d{2})(\d{2})
(. +),(. +),(. +),(. +)\n(. +),(. +),(. +),(. +)
замена: $1-$2-$3
{ "$1" : "$5", "$2" : "$6", "$3" : "$7", "$4" : "$8" }
результат: 2011-01-01
{ "h1" : "v1", "h2" : "v2", "h3" : "v3", "h4" : "v4" }
a1. sources. r1. custom. query = select id,foo,bar from testdata where id > $@$ order by id asc
## source интерцептор
###########sql source csv to json ################
a1. sources. r1. interceptors = i1
a1. sources. r1. interceptors. i1. type = search_replace
a1. sources. r1. interceptors. i1. searchPattern = (. +),(. +),(. +)
a1. sources. r1. interceptors. i1. replaceString = { "id" : $1, "foo" : $2, "bar" : $3 }
a1. sources. r1. interceptors. i1. charset = UTF-8
###########interceptors################
---------------------------------------
Но Morphlines всё ещё является незаменимым инструментом для работы с текстовыми ETL, независимо от того, используете ли вы его напрямую для ETL или выполняете его на сервере. Сочетание Flume и Morphlines обеспечивает такую же гибкость, как и Logstash.
Онлайн-отладка: https://grokconstructor.appspot.com/do/match?example=1
Уже имеется библиотека Grok: https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
см. конфигурационный файл flume-netcat2morphline2log.conf
Затем запустите агента.
Затем telnet localhost 9999, введите hello world и нажмите Enter.
В лог-файле агента можно увидеть следующую информацию:
Событие: { headers:{message=hello world} body: }
Обратите внимание, что свойство message добавлено в заголовки события.
2016-10-10
Пример командной строки
bin/flume-ng agent --plugins-path plugins.d --conf conf --conf-file conf/flume-mysql2rabbitmq.conf --name a1 -Dflume.root.logger=INFO,console
2016-10-09
Тестирование flume-mysql2rabbitmq.conf
rabbitmqctl не может создать пользователя при кросс-контейнерном доступе; docker-sync-flume.sh не работает;
mqinit имеет проблемы;
Тестирование netcat2rabbitmq для потребления данных, тестирование завершено успешно.
fig up -d && sh conf/test-sync-flume.sh
telnet 127.0.0.1 44446
http://127.0.0.1:15672/#/queues/statuscheckvhost/aliveness-test
heartbeat/alive GetMessages получение сообщений;
2016-09-27
Добавлен тест плагина flume-mysql flume-rabbitmq
fig up -d && sh conf/test-sync-flume.sh
flume/lib каталог устарел, используется plugins.d каталог
flume-sql jar плагин jar:flume-ng-sql-source-1.4.3-SNAPSHOT.jarОснован на Hibernate, уже поддерживает все реляционные базы данных.
```bash
$ mkdir -p $FLUME_HOME/plugins.d/sql-source/lib $FLUME_HOME/plugins.d/sql-source/libext
$ cp flume-ng-sql-source-0.8.0.jar $FLUME_HOME/plugins.d/sql-source/lib
mysql
$ wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.35.tar.gz
$ tar xzf mysql-connector-java-5.1.35.tar.gz
$ cp mysql-connector-java-5.1.35-bin.jar $FLUME_HOME/plugins.d/lib/sql-source/libext
sql server
$ tar xzf sqljdbc_4.1.5605.100_enu.tar.gz
$ cp sqljdbc_4.1/enu/sqljdbc41.jar $FLUME_HOME/plugins.d/lib/sql-source/libext
Если пользовательский запрос не задан, будет выполнена команда SELECT <columns.to.select> FROM <table>
каждый раз при запуске. Задержка выполнения запроса на query.delay
миллисекунд.
agent.sources.sql-source.custom.query = SELECT incrementalField, field2 FROM table1 WHERE incrementalField > $@$
Для каждого источника определяется его тип.
agent.sources.sqlSource.type = org.keedio.flume.source.SQLSource
agent.sources.sqlSource.hibernate.connection.url = jdbc:db2://192.168.56.70:50000/sample
# Свойства подключения к базе данных Hibernate
agent.sources.sqlSource.hibernate.connection.user = db2inst1
agent.sources.sqlSource.hibernate.connection.password = db2inst1
agent.sources.sqlSource.hibernate.connection.autocommit = true
agent.sources.sqlSource.hibernate.dialect = org.hibernate.dialect.DB2Dialect
agent.sources.sqlSource.hibernate.connection.driver_class = com.ibm.db2.jcc.DB2Driver
#agent.sources.sqlSource.table = employee1
# Столбцы для импорта в kafka (по умолчанию * импортирует всю строку)
#agent.sources.sqlSource.columns.to.select = *
# Задержка выполнения запроса, каждую заданную миллисекунду запрос будет отправлен
agent.sources.sqlSource.run.query.delay = 10000
``` # Файл состояния используется для сохранения последней прочитанной строки
agent.sources.sqlSource.status.file.path = /var/log/flume
agent.sources.sqlSource.status.file.name = sqlSource.status
# Пользовательский запрос
agent.sources.sqlSource.start.from = 19700101000000000000
agent.sources.sqlSource.custom.query = SELECT * FROM (SELECT DECIMAL(test) * 1000000 AS INCREMENTAL, EMPLOYEE1.* FROM employee1 UNION SELECT DECIMAL(test) * 1000000 AS INCREMENTAL, EMPLOYEE2.* FROM employee2) WHERE INCREMENTAL > $@$ ORDER BY INCREMENTAL ASC
agent.sources.sqlSource.batch.size = 1000
agent.sources.sqlSource.max.rows = 1000
agent.sources.sqlSource.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent.sources.sqlSource.hibernate.c3p0.min_size = 1
agent.sources.sqlSource.hibernate.c3p0.max_size = 10
# Канал может быть определен следующим образом.
agent.sources.sqlSource.channels = memoryChannel
2016-09-25
плагин flume-rabbitmq jar:rabbitmq-flume-plugin-standalone-1.1.0.jar
### ИсточникКомпонент Source имеет следующие параметры конфигурации:Переменная | Значение по умолчанию | Описание
------------------- | ---------------------------- | -----------
host | ``localhost`` | Хост RabbitMQ для подключения
port | ``5672`` | Порт для подключения
ssl | ``false`` | Подключение к RabbitMQ через SSL
virtual-host | ``/`` | Имя виртуального хоста для подключения
username | ``guest`` | Имя пользователя для подключения
password | ``guest`` | Пароль для подключения
queue | | **Обязательное** поле, указывающее имя очереди для потребления
auto-ack | ``false`` | Включение автоматического подтверждения для повышения пропускной способности с риском потери сообщений
requeuing | ``false`` | Инструкция брокеру отменить или перезапустить неудачные (отклоненные) сообщения
prefetchCount | ``0`` | Значение ``Basic.QoS`` для предварительного подсчета при потреблении
timeout | ``-1`` | Время ожидания потребителя перед повторной попыткой получения сообщения от RabbitMQ
threads | ``1`` | Количество потоков потребителя для создания
#### Возможные ключи заголовков событий
- exchange
- routing-key
- app-id
- content-encoding
- content-type
- correlation-id
- delivery-mode
- expires
- message-id
- priority
- reply-to
- timestamp
- type
- user-id
#### Пример```
a1.sources.r1.channels = c1
a1.sources.r1.type = com.aweber.flume.source.rabbitmq.RabbitMQSource
a1.sources.r1.host = localhost
a1.sources.r1.port = 5672
a1.sources.r1.virtual_host = /
a1.sources.r1.username = flume
a1.sources.r1.password = rabbitmq
a1.sources.r1.queue = events_for_s3
a1.sources.r1.prefetch_count = 10
```### Sink
Компонент Sink RabbitMQ позволяет публиковать события Flume в RabbitMQ. Переменная | Значение по умолчанию | Описание
------------------ | -------------------------- | -----------
host | ``localhost`` | Хост RabbitMQ для подключения
port | ``5672`` | Порт для подключения
ssl | ``false`` | Подключение к RabbitMQ через SSL
virtual-host | ``/`` | Имя виртуального хоста для подключения
username | ``guest`` | Имя пользователя для подключения
password | ``guest`` | Пароль для подключения
exchange | ``amq.topic`` | Обменник для публикации сообщения
routing-key | | Ключ маршрутизации для публикации
auto-properties | ``true`` | Автоматическое заполнение свойств сообщения AMQP
mandatory-publish | ``false`` | Включение обязательной публикации
publisher-confirms | ``false`` | Включение подтверждений публикации
#### Заголовки
При публикации сообщения о событии RabbitMQ Sink сначала проверяет заголовки события на наличие записи ``routing-key``. Если она установлена, то используется это значение при публикации сообщения. Если она не установлена, то используется значение конфигурируемого ключа маршрутизации, которое по умолчанию равно пустой строке.Если опция конфигурации ``auto-properties`` включена (по умолчанию), заголовки события проверяются на наличие стандартных записей AMQP Basic.Properties (за исключением AMQP таблицы ``headers``). Если свойство установлено в заголовках события, оно будет установлено в свойствах сообщения. Кроме того, если значение ``app-id`` не установлено в заголовках, оно будет по умолчанию равно ``RabbitMQSink``. Если значение ``timestamp`` не установлено в заголовках, оно будет по умолчанию равно текущему времени системы.##### Доступные ключи свойств
- app-id
- content-encoding
- content-type
- correlation-id
- delivery-mode
- expires
- message-id
- priority
- reply-to
- timestamp
- type
- user-id
#### Пример
a1.sinks.k1.channels = c1 a1.sinks.k1.type = com.aweber.flume.sink.rabbitmq.RabbitMQSink a1.sinks.k1.host = localhost a1.sinks.k1.port = 5672 a1.sinks.k1.virtual-host = / a1.sinks.k1.username = flume a1.sinks.k1.password = rabbitmq a1.sinks.k1.exchange = amq.topic a1.sinks.k1.routing-key = flume.event a1.sinks.k1.publisher-confirms = true
2016-03-22 flume для alpine работает
gosu запуск не работает
docker run --name flume-hdfs -e FLUME_AGENT_NAME=agent -d supermy/ap-flume docker run --name flume-hdfs -e FLUME_AGENT_NAME=agent-v /path/to/conf/dir:/opt/lib/flume/conf -d supermy/ap-flume
Переменная окружения Значение по умолчанию FLUME_AGENT_NAME Имя агента. Имя агента, указанное в конфигурационном файле, должно совпадать с этим именем. Не задано (обязательная настройка) FLUME_CONF_DIR Каталог конфигурации Flume. Здесь ожидается наличие конфигурационного файла. /opt/lib/flume/conf FLUME_CONF_FILE Имя конфигурационного файла Flume. /opt/lib/flume/conf/flume-conf.properties
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )