1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/supermy-apFlume

В этом репозитории не указан файл с открытой лицензией (LICENSE). При использовании обратитесь к конкретному описанию проекта и его зависимостям в коде.
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
README.md

#flume-fastetl

Описание

На стороне сбора данных используется мощность сборочных устройств для выполнения ETL-обработки больших данных в реальном времени; используется Redis и другие инструменты для полноты вычислений; особенностью является использование Groovy в качестве языка правил, что позволяет выполнять различные условия обработки.

Сценарий использования 1: RuleSearchAndReplaceInterceptor, обеспечение безопасности при передаче данных в интернете, шифрование и расшифровка данных через интерцептор; стандартные регулярные выражения не могут обеспечить эту функциональность.

Сценарий использования 2: RuleFilteringInterceptor, фильтрация данных на основе условий, что позволяет использовать скрипты Groovy для фильтрации данных, что очень гибко; стандартные регулярные выражения не поддерживают условия.

Сценарий использования 3: RuleSearchAndReplaceInterceptor, изменение формата данных, что позволяет использовать скрипты Groovy для преобразования формата данных; стандартные регулярные выражения могут выполнять эту задачу, но с меньшей эффективностью.

Сценарий использования 4: RuleSearchAndReplaceInterceptor, настройка атрибутов head, что позволяет использовать скрипты Groovy для настройки атрибутов head; стандартные настройки более сложны и не поддерживают гибкие бизнес-процессы.[новое] Сценарий использования 4: поддержка фильтрации содержимого; поддержка преобразования содержимого в скрипты, поддерживаемые redis-lua, и поддержка компонента flume-redis для обработки данных.

[новое] Сценарий использования 5: выполнение ETL данных, собранных с помощью Redis Lua, для обработки данных на уровне миллисекунд, включая обработку и извлечение данных на уровне миллиардов (обработка данных временных рядов RedisClusterZrangeByScoreSink 600 000/с).

Установка

Установка:

Скачайте flume 7.0, распакуйте;
Скопируйте каталог conf в каталог flume;
Скопируйте каталог plugins.d в каталог flume;

Оптимизация производительности:

Измените flume/bin/flume-ng

JAVA_OPTS="-Xms2048m -Xmx2048m -Xss256k -Xmn2g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"
export JAVA_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"  
-Dflume.monitoring.type=http -Dflume.monitoring.port=34545  

Использование

Конфигурационные файлы для различных сценариев использования в каталоге conf;
Команда управления в последней строке каталога conf;

Однонажатие запуск для docker

fig up -d && fig logs    

g-filter.groovy

Фильтрационный скрипт, который позволяет использовать данные head и body в качестве условий для определения, следует ли фильтровать запись данных.```groovy // Когда его значение равно true, не фильтруем println "netuser filter" //println head //println body body = "20170621162925,113.225.23.151,test_10056368,1" //body = "20170621162925,113.225.23.152,test_10056368,3"

def split = body.split(",")

println split.size()

if (split.size() < 4) { // Данные некорректны, фильтруем return false }

def type = split[3]

println type.getClass() println type.substring(0, 1) == '1' //println type.toInteger() == 1

if (type.substring(0, 1) == '1' || type.substring(0, 1) == '2') { return true // не фильтруем } else { return false // фильтруем }


Скрипт замены, который позволяет изменять данные `head` и `body`, адаптируясь к различным бизнес-сценариям, скрипт поддерживает динамическое обновление;

``` groovy сценарий шифрования

        import  com.supermy.flume.interceptor.*
        import javax.crypto.Cipher;
        import javax.crypto.spec.SecretKeySpec;
        
        println head
        println body
        body = body.replace('a', 'aaa')
        head["newhead"] = 'abcd'
        
        
        
        String text = "Данные из Body, Я люблю BONC"
        
        //
        def key = new SecretKeySpec("123456789987654321".bytes, "AES")
        def c = Cipher.getInstance("AES")
        
        // шифрование
        c.init(Cipher.ENCRYPT_MODE, key)
        e_text = new String(Hex.encodeHex(c.doFinal(text.getBytes("UTF-8"))))
        
        // расшифровка
        c.init(Cipher.DECRYPT_MODE, key)
        text1 = new String(c.doFinal(Hex.decodeHex(e_text.toCharArray())))
        
        println text
        println e_text
        println text1
        
        
        def resultMap = [:]
        
        // шифрование данных для передачи через интернет
        
        
        resultMap["head"] = head
        resultMap["body"] = body
        
        return resultMap

``````groovy сценарий redis-lua
import com.google.gson.Gson

// Данные обрабатываются как lua-скрипт и вставляются в redis
println "netuser redis script подготовка"
//println head
//println body

//body = "20170621162925,113.225.23.151,test_10056368,1"
body = "20170621162925,113.225.23.152,test_10056368,2"
// eval "return redis.call('ZADD','KEYS[1]',ARGV[1],ARGV[2])" 1 keyset   123  u123



def split = body.split(",")



def type = split[3]
if (type.substring(0,1) == '1') {
    split[2]=split[2]+"@Start";
} else {
    split[2]=split[2]+"@End";
}

Gson gson = new Gson();

Map full= new HashMap();
full.put("script","return redis.call('ZADD',KEYS[2],KEYS[1],KEYS[3])");
full.put("args",new ArrayList());
full.put("keys",split);

String json = gson.toJson(full);
println json

Map m=gson.fromJson(json, HashMap.class);
println m


//StringBuffer sb = new StringBuffer("return redis.call('ZADD','");
//
//sb.append(split[1]).append("',").append(split[0]).append(",'").append(split[2]);
//if (type.substring(0,1) == '1') {
//    sb.append("@Start'");
//} else {
//    sb.append("@End'");
//}
//sb.append(")");
//println sb

def resultMap = [:]

resultMap["head"] = head
resultMap["body"] = json

return resultMap

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

FlumeFastETL — это фреймворк, основанный на вторичной разработке Flume и Redis, с использованием Groovy для редактирования бизнес-правил, способный обрабатывать в реальном времени данные объемом в триллионы байт. Развернуть Свернуть
Отмена

Обновления

Пока нет обновлений

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://api.gitlife.ru/oschina-mirror/supermy-apFlume.git
git@api.gitlife.ru:oschina-mirror/supermy-apFlume.git
oschina-mirror
supermy-apFlume
supermy-apFlume
master