1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/supermy-apFlume

В этом репозитории не указан файл с открытой лицензией (LICENSE). При использовании обратитесь к конкретному описанию проекта и его зависимостям в коде.
Клонировать/Скачать
CHANGELOG.md 26 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
Отправлено 04.06.2025 23:14 8f405c7

2017-07-14

Пресс-тестирование flume-redis

JAVA_OPTS="-Xms2048m -Xmx2048m -Xss256k -Xmn2g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"
export JAVA_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
-Dflume.monitoring.type=http -Dflume.monitoring.port=34545

2017-07-05

Синхронизация данных

oracle2rabbitmq и преобразование данных в формат json
rabbitmq2mongodb хранение данных в mongodb

2017-07-04

Использование netcat для захвата данных, хранение в mongo, поддержка формата json

Sink поддерживает некоторые заголовки в динамической модели:
'db': имя базы данных
'collection': имя коллекции

По умолчанию имя базы данных events; имя базы данных можно задать с помощью заголовков
По умолчанию имя коллекции events; имя коллекции можно задать с помощью заголовков
Формат данных json

telnet >{"a":1}
> db.events.find()
{ "_id" : ObjectId("595b0030e4b074bb69ee49b2"), "a" : 1 }

2017-07-03

Добавление фильтрации по дате данных

Пользовательские данные сохраняются в течение 3 дней; 
Добавление фильтрации для данных журнала

2017-07-01

Тестирование Redis Sink в кластере

Тестирование 3A RedisClusterZSetSink завершено, запущено на производственном сервере;
Тестирование Log RedisClusterZrangeByScoreSink завершено, зависит от завершения тестирования 3A;
Тестирование Source RedisClusterSource завершено, конфигурация flume-redis2log.conf

2017-06-28

Оптимизация производительности Анализ slowlog get показал, что вторичное внесение данных в Redis занимает много времени; возможно, стоит выполнить это напрямую в скрипте? Замена на выполнение скрипта напрямую в Redis; исключение возврата данных и записи данных, проверка эффективности;

2017-06-27

Изменение пути хранения Redis, динамическое изменение;

127.0.0.1:6379> config set dir /disk01/redisdata/
127.0.0.1:6379> save

2017-06-26

Обновление до последней версии движка; результат "very good"; остановите кодовый движок; Обновите JAR-пакет; Обновите конфигурацию flume-taildir2redis.conf и загрузите её на сервер; com.supermy.redis.flume.redis.sink.RedisZSetSink Обновите Groovy-скрипты фильтра convert; проверка скриптов завершена; /etc/flume/conf/g-netuser-filter.groovy /etc/flume/conf/g-netuser-zset-search-replace.groovy Тестирование; Проверка логов; Проверка Redis; *** Новая версия мощная и динамичная; встроенный pipeline 2w line работает намного быстрее, чем eval ***

Начало развертывания версии для обработки логов; использование файловых каналов; утраченные данные можно восстановить; Локальное тестирование верификации; конфигурационный файл Flume config динамически загружается; при каждом обновлении происходит перезагрузка; проверка конфигурационного файла; flume-taildir2netlog2redis.conf установка параметров производительности до 20 000; очистка данных Redis; включение мониторинга Redis; запуск тестирования; завершение тестирования;Обновление на сервере:

  • обновление JAR-файлов
  • обновление конфигурационных файлов
  • flume-taildir2netlog2redis.conf
  • g-netlog-filter.groovy
  • g-netlog-search-replace.groovy
  • проверка конфигурационных файлов, изменение конфигурации сервера
  • включение мониторинга Redis
  • запуск службы и наблюдение за логами
  • работа службы подтверждена 6-26 10:48

Передача очередей в HDFS

Локальное тестирование службы Redis для приема данных

  • проверка конфигурационного файла flume-redis2log.conf
  • изменение определенной очереди;
  • проверка длины очереди Redis;
  • запуск службы;
  • наблюдение за логами; логи успешно выводятся;
  • проверка длины очереди Redis, все данные из очередей были приняты;
  • добавление скрипта с отметкой времени для повторного тестирования flume_1 | 2017-06-26 15:18:18,323 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{timestamp=1498301603000} body: 34 32 2E 35 38 2E 32 31 32 2E 33 34 7C 32 30 31 42.58.212.34|201 }
    Тестирование локальных служб для HDFS завершено успешно
a1.sinks.k1.type = hdfs  
a1.sinks.k1.channel = c1  
a1.sinks.k1.hdfs.path = /user/hadoop/my/%y-%m-%d/%H%M/%S  
a1.sinks.k1.hdfs.filePrefix = netlogact-  
a1.sinks.k1.hdfs.round = true  
a1.sinks.k1.hdfs.roundValue = 10  
a1.sinks.k1.hdfs.roundUnit = minute  

a1.sinks.k1.hdfs.useLocalTimeStamp = true // использовать локальное время
```Тестирование производственной системы  
    Память канала заполнена, используется файловый канал;  
      
---  
Запасной вариант  
      
    a1.channels = c1  
    a1.channels.c1.type = memory  
    a1.sinks = k1  
    a1.sinks.k1.type = hive  
    a1.sinks.k1.channel = c1  
    a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083  
    a1.sinks.k1.hive.database = logsdb  
    a1.sinks.k1.hive.table = weblogs  
    a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M  
    a1.sinks.k1.useLocalTimeStamp = false  
    a1.sinks.k1.round = true  
    a1.sinks.k1.roundValue = 10  
    a1.sinks.k1.roundUnit = minute  
    a1.sinks.k1.serializer = DELIMITED  
    a1.sinks.k1.serializer.delimiter = "\t"  
    a1.sinks.k1.serializer.serdeSeparator = '\t'  
    a1.sinks.k1.serializer.fieldnames = id,,msg  
      
    a1.channels = c1  
    a1.sinks = k1  
    a1.sinks.k1.type = hbase  
    a1.sinks.k1.table = foo_table  
    a1.sinks.k1.columnFamily = bar_cf  
    a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer  
    a1.sinks.k1.channel = c1  
      
    a1.sinks.k1.channel = c1  
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink  
    a1.sinks.k1.kafka.topic = mytopic  
    a1.sinks.k1.kafka.bootstrap.servers = localhost:9092  
    a1.sinks.k1.kafka.flumeBatchSize = 20  
    a1.sinks.k1.kafka.producer.acks = 1  
    a1.sinks.k1.kafka.producer.linger.ms = 1  
    a1.sinks.k1.kafka.producer.compression.type = snappy  
      
Оптимизация todo: увеличение количества пакетов до 1 000 000, тестирование для оценки эффективности.2017-06-24
    Основное - завершение написания скриптов для обработки логов;

Тестовые скрипты
    flume-taildir2netlog.conf
    bin/flume-ng agent --conf conf --conf-file conf/flume-taildir2netlog.conf --name a1 -Dflume.root.logger=INFO,console

Тестовый скрипт, тестирование одного события вручную, правильное добавление в Redis
    bin/flume-ng agent --conf conf --conf-file conf/flume-netcat2netlog2redis.conf --name a1 -Dflume.root.logger=INFO,console

Тестирование добавления в Redis
    flume-taildir2netlog2redis.conf
    bin/flume-ng agent --conf conf --conf-file conf/flume-taildir2netlog2redis.conf --name a1 -Dflume.root.logger=INFO,console
    
    Мониторинг создаваемых данных
    LLEN netlogactlist

Тестирование вставки в Redis без использования EVAL для избежания блокировки
    flume-taildir2redis.conf  g-netuser-zset-search-replace.groovy  g-netuser-filter.groovy
        
Скрипты
    Добавление фильтрации нелегальных IP-адресов;
    
    
     
2017-06-23
    
    

Оптимизация скриптов Redis-Lua, передача параметров для избежания кэширования Lua в Redis;
    ok  netcat2log.conf тестирование интерцептора  
    ok  netcat2redis.conf тестирование добавления данных в Redis
        Конфигурация для удаления данных в определенное время;
        
2017-06-20
    Отладка Flume-netcat2redis.conf;
    
2017-06-20
    Отладка groovy + Redis ;
    Кодирование и декодирование данных до и после шифрования и расшифровки
    
2017-01-17
    export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=192.168.150.140:8650"
    
2017-01-04
    Подключение для шифрования и расшифровки данных, подходящее для передачи данных через интернет.2017-01-03
    redis-source [flume-redis2log.conf] тестирование данных для обработки
    127.0.0.1:6379> LPUSH jplist '{"message":1}'
    127.0.0.1:6379> LPUSH jplist '{"message":2}'
    127.0.0.1:6379> LPUSH jplist '{"message":2,"tags":["xyz"],"type":"abc"}'
    redis-sink [flume-netcat2redis.conf] генерация данных
    telnet 44444  // генерация данных
    127.0.0.1:6379> rpop jplist // обработка данных2016-12-13
     тестирование синхронизации данных Flume;
     локальное тестирование под нагрузкой ок; локальное тестирование под нагрузкой с использованием Docker ок;
  2016-10-18
     Flume инструмент для обработки данных на стороне клиента morphline-gork    морфлайн интерцептор ETL подготовка окружения завершена;
       морфлайн syslog лог ETL
      приоритет : 164
      время : Feb  4 10:46:14
      хост : syslog
      программа : sshd
      pid : 607
      сообщение : прослушивание на 0.  0.  0.  0 порту 22.
      сообщение : Feb  4 10:46:14 syslog sshd[607]: прослушивание на 0.  0.  0.  0 порту 22.
      nc -v 127.  0.  0.  1 44446
      >Feb  4 10:46:14 syslog sshd[607]: прослушивание на 0.  0.  0.  0 порту 22.
       морфлайн json ETL:
      nc -v 127.  0.  0.  1 44446
      Подключение к quickstart.  cloudera 41415 порту [tcp/*] успешно!
      {"username": "blue", "color": "green"}
      OK
      {"username": "tom", "color": "red"}
      OK
    2016-10-17
      csv2json добавление регулярного выражения интерцептора;
       ################пример################
      текст: 2011-0101
      h1,h2,h3,h4
      v1,v2,v3,v4
      регулярное выражение: (\d{4})-(\d{2})(\d{2})
      (.  +),(.  +),(.  +),(.  +)\n(.  +),(.  +),(.  +),(.  +)
      замена: $1-$2-$3
      { "$1" : "$5", "$2" : "$6", "$3" : "$7", "$4" : "$8" }
      результат: 2011-01-01
      { "h1" : "v1", "h2" : "v2", "h3" : "v3", "h4" : "v4" }
       a1.  sources.  r1.  custom.  query = select id,foo,bar  from testdata where id > $@$ order by id asc
       ## source интерцептор
      ###########sql source  csv to json ################
      a1.  sources.  r1.  interceptors = i1
      a1.  sources.  r1.  interceptors.  i1.  type = search_replace
      a1.  sources.  r1.  interceptors.  i1.  searchPattern = (.  +),(.  +),(.  +)
      a1.  sources.  r1.  interceptors.  i1.  replaceString = { "id" : $1, "foo" : $2, "bar" : $3 }
      a1.  sources.  r1.  interceptors.  i1.  charset = UTF-8
      ###########interceptors################
     ---------------------------------------
       Но Morphlines всё ещё является незаменимым инструментом для работы с текстовыми ETL, независимо от того, используете ли вы его напрямую для ETL или выполняете его на сервере.  Сочетание Flume и Morphlines обеспечивает такую же гибкость, как и Logstash.
      Онлайн-отладка: https://grokconstructor.appspot.com/do/match?example=1
      Уже имеется библиотека Grok: https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
       см. конфигурационный файл flume-netcat2morphline2log.conf
       Затем запустите агента.
       Затем telnet localhost 9999, введите hello world и нажмите Enter.
       В лог-файле агента можно увидеть следующую информацию:
       Событие: { headers:{message=hello world} body: }
      Обратите внимание, что свойство message добавлено в заголовки события.
    2016-10-10
      Пример командной строки
      bin/flume-ng agent --plugins-path plugins.d --conf conf --conf-file conf/flume-mysql2rabbitmq.conf --name a1 -Dflume.root.logger=INFO,console
   2016-10-09
      Тестирование flume-mysql2rabbitmq.conf
      rabbitmqctl не может создать пользователя при кросс-контейнерном доступе; docker-sync-flume.sh не работает;
      mqinit имеет проблемы;
        Тестирование netcat2rabbitmq для потребления данных, тестирование завершено успешно.
      fig up -d && sh conf/test-sync-flume.sh
      telnet 127.0.0.1 44446
      http://127.0.0.1:15672/#/queues/statuscheckvhost/aliveness-test
      heartbeat/alive   GetMessages получение сообщений;
    2016-09-27
      Добавлен тест плагина flume-mysql flume-rabbitmq
      fig up -d && sh conf/test-sync-flume.sh
       flume/lib каталог устарел, используется plugins.d каталог
  flume-sql jar плагин jar:flume-ng-sql-source-1.4.3-SNAPSHOT.jarОснован на Hibernate, уже поддерживает все реляционные базы данных.

```bash
$ mkdir -p $FLUME_HOME/plugins.d/sql-source/lib $FLUME_HOME/plugins.d/sql-source/libext
$ cp flume-ng-sql-source-0.8.0.jar $FLUME_HOME/plugins.d/sql-source/lib
mysql
$ wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.35.tar.gz
$ tar xzf mysql-connector-java-5.1.35.tar.gz
$ cp mysql-connector-java-5.1.35-bin.jar $FLUME_HOME/plugins.d/lib/sql-source/libext
sql server
$ tar xzf sqljdbc_4.1.5605.100_enu.tar.gz
$ cp sqljdbc_4.1/enu/sqljdbc41.jar $FLUME_HOME/plugins.d/lib/sql-source/libext

Если пользовательский запрос не задан, будет выполнена команда SELECT <columns.to.select> FROM <table> каждый раз при запуске. Задержка выполнения запроса на query.delay миллисекунд.

agent.sources.sql-source.custom.query = SELECT incrementalField, field2 FROM table1 WHERE incrementalField > $@$ 

Для каждого источника определяется его тип.

agent.sources.sqlSource.type = org.keedio.flume.source.SQLSource
agent.sources.sqlSource.hibernate.connection.url = jdbc:db2://192.168.56.70:50000/sample
# Свойства подключения к базе данных Hibernate
agent.sources.sqlSource.hibernate.connection.user = db2inst1
agent.sources.sqlSource.hibernate.connection.password = db2inst1
agent.sources.sqlSource.hibernate.connection.autocommit = true
agent.sources.sqlSource.hibernate.dialect = org.hibernate.dialect.DB2Dialect
agent.sources.sqlSource.hibernate.connection.driver_class = com.ibm.db2.jcc.DB2Driver
#agent.sources.sqlSource.table = employee1
# Столбцы для импорта в kafka (по умолчанию * импортирует всю строку)
#agent.sources.sqlSource.columns.to.select = *
# Задержка выполнения запроса, каждую заданную миллисекунду запрос будет отправлен
agent.sources.sqlSource.run.query.delay = 10000
```    # Файл состояния используется для сохранения последней прочитанной строки
    agent.sources.sqlSource.status.file.path = /var/log/flume
    agent.sources.sqlSource.status.file.name = sqlSource.status
    # Пользовательский запрос
    agent.sources.sqlSource.start.from = 19700101000000000000
    agent.sources.sqlSource.custom.query = SELECT * FROM (SELECT DECIMAL(test) * 1000000 AS INCREMENTAL, EMPLOYEE1.* FROM employee1 UNION SELECT DECIMAL(test) * 1000000 AS INCREMENTAL, EMPLOYEE2.* FROM employee2) WHERE INCREMENTAL > $@$ ORDER BY INCREMENTAL ASC
    agent.sources.sqlSource.batch.size = 1000
    agent.sources.sqlSource.max.rows = 1000
    agent.sources.sqlSource.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
    agent.sources.sqlSource.hibernate.c3p0.min_size = 1
    agent.sources.sqlSource.hibernate.c3p0.max_size = 10
    # Канал может быть определен следующим образом.
    agent.sources.sqlSource.channels = memoryChannel
   2016-09-25
  плагин flume-rabbitmq jar:rabbitmq-flume-plugin-standalone-1.1.0.jar
  ### ИсточникКомпонент Source имеет следующие параметры конфигурации:Переменная          | Значение по умолчанию       | Описание
------------------- | ---------------------------- | -----------
host                | ``localhost``               | Хост RabbitMQ для подключения
port                | ``5672``                    | Порт для подключения
ssl                 | ``false``                   | Подключение к RabbitMQ через SSL
virtual-host        | ``/``                       | Имя виртуального хоста для подключения
username            | ``guest``                   | Имя пользователя для подключения
password            | ``guest``                   | Пароль для подключения
queue               |                              | **Обязательное** поле, указывающее имя очереди для потребления
auto-ack            | ``false``                   | Включение автоматического подтверждения для повышения пропускной способности с риском потери сообщений
requeuing           | ``false``                   | Инструкция брокеру отменить или перезапустить неудачные (отклоненные) сообщения
prefetchCount       | ``0``                       | Значение ``Basic.QoS`` для предварительного подсчета при потреблении
timeout             | ``-1``                      | Время ожидания потребителя перед повторной попыткой получения сообщения от RabbitMQ
threads             | ``1``                       | Количество потоков потребителя для создания

#### Возможные ключи заголовков событий

- exchange
- routing-key
- app-id
- content-encoding
- content-type
- correlation-id
- delivery-mode
- expires
- message-id
- priority
- reply-to
- timestamp
- type
- user-id

#### Пример```
a1.sources.r1.channels = c1
a1.sources.r1.type = com.aweber.flume.source.rabbitmq.RabbitMQSource
a1.sources.r1.host = localhost
a1.sources.r1.port = 5672
a1.sources.r1.virtual_host = /
a1.sources.r1.username = flume
a1.sources.r1.password = rabbitmq
a1.sources.r1.queue = events_for_s3
a1.sources.r1.prefetch_count = 10
```### Sink
Компонент Sink RabbitMQ позволяет публиковать события Flume в RabbitMQ. Переменная | Значение по умолчанию | Описание
------------------ | -------------------------- | -----------
host               | ``localhost``             | Хост RabbitMQ для подключения
port               | ``5672``                  | Порт для подключения
ssl                | ``false``                 | Подключение к RabbitMQ через SSL
virtual-host       | ``/``                     | Имя виртуального хоста для подключения
username           | ``guest``                 | Имя пользователя для подключения
password           | ``guest``                 | Пароль для подключения
exchange           | ``amq.topic``             | Обменник для публикации сообщения
routing-key        |                           | Ключ маршрутизации для публикации
auto-properties    | ``true``                  | Автоматическое заполнение свойств сообщения AMQP
mandatory-publish  | ``false``                 | Включение обязательной публикации
publisher-confirms | ``false``                 | Включение подтверждений публикации

#### Заголовки
При публикации сообщения о событии RabbitMQ Sink сначала проверяет заголовки события на наличие записи ``routing-key``. Если она установлена, то используется это значение при публикации сообщения. Если она не установлена, то используется значение конфигурируемого ключа маршрутизации, которое по умолчанию равно пустой строке.Если опция конфигурации ``auto-properties`` включена (по умолчанию), заголовки события проверяются на наличие стандартных записей AMQP Basic.Properties (за исключением AMQP таблицы ``headers``). Если свойство установлено в заголовках события, оно будет установлено в свойствах сообщения. Кроме того, если значение ``app-id`` не установлено в заголовках, оно будет по умолчанию равно ``RabbitMQSink``. Если значение ``timestamp`` не установлено в заголовках, оно будет по умолчанию равно текущему времени системы.##### Доступные ключи свойств

- app-id
- content-encoding
- content-type
- correlation-id
- delivery-mode
- expires
- message-id
- priority
- reply-to
- timestamp
- type
- user-id

#### Пример

a1.sinks.k1.channels = c1 a1.sinks.k1.type = com.aweber.flume.sink.rabbitmq.RabbitMQSink a1.sinks.k1.host = localhost a1.sinks.k1.port = 5672 a1.sinks.k1.virtual-host = / a1.sinks.k1.username = flume a1.sinks.k1.password = rabbitmq a1.sinks.k1.exchange = amq.topic a1.sinks.k1.routing-key = flume.event a1.sinks.k1.publisher-confirms = true

2016-03-22 flume для alpine работает

gosu запуск не работает

docker run --name flume-hdfs -e FLUME_AGENT_NAME=agent -d supermy/ap-flume docker run --name flume-hdfs -e FLUME_AGENT_NAME=agent-v /path/to/conf/dir:/opt/lib/flume/conf -d supermy/ap-flume

Переменная окружения Значение по умолчанию FLUME_AGENT_NAME Имя агента. Имя агента, указанное в конфигурационном файле, должно совпадать с этим именем. Не задано (обязательная настройка) FLUME_CONF_DIR Каталог конфигурации Flume. Здесь ожидается наличие конфигурационного файла. /opt/lib/flume/conf FLUME_CONF_FILE Имя конфигурационного файла Flume. /opt/lib/flume/conf/flume-conf.properties


Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/supermy-apFlume.git
git@api.gitlife.ru:oschina-mirror/supermy-apFlume.git
oschina-mirror
supermy-apFlume
supermy-apFlume
master