1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/mqyqingkong-flowprocess

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
README.md

Flow и параллельная обработка данных

Это обеспечивает простой способ создания процесса потока и значительно повышает эффективность обработки данных.

Схема архитектуры

Рисунок: Схема архитектуры (flow.png)

Использование

Например, мы подсчитываем слова в файле и получаем 10 наиболее часто встречающихся слов. Тестовый файл слишком мал, вы можете увеличить его размер, скопировав его несколько раз. Давайте сравним два способа ниже:

1. Общий способ

    wordCount := map[string]int{}
    reverse := true
    // Вы можете заменить файл на файл большего размера.
    file := "testfile/2553.txt"
    start := time.Now()
    f, err := os.Open(file)
    if err != nil {
        panic(err)
    }
    defer f.Close()

    sc := bufio.NewScanner(f)
    // Разделение строк
    for sc.Scan() {
        line := sc.Text()
        sps := splitText(line)
        for i := 0; i < len(sps); i++ {
            st := strings.TrimSpace(sps[i])
            if len(st) > 0 {
                wordCount[st]++
            }
        }
    }
    // Сортировка по времени появления слова по убыванию
    sortedWc := sortWc(wordCount, reverse)

    duration := time.Since(start)

    // Вывод прошедшего времени
    fmt.Printf("duration(ms):%v\n", duration.Milliseconds())

    // Печать топ-N
    topN := 10
    if topN > len(sortedWc) {
        topN = len(sortedWc)
    }
    fmt.Println("sortedWc-top", topN, ":")
    for i := 0; i < topN; i++ {
        fmt.Println(sortedWc[i])
    }

Общий способ работает медленно и имеет более низкую загрузку процессора и ввода-вывода при очень большом размере файла.

2. Flow и параллельный способ

Мы разделяем операции ввода-вывода и процессора.

(1) Определение flownode-0 процессора (чтение строк файла)

//ReadFileProcessor читает строки файла и помещает строку в OutTaskChan для дальнейшей обработки следующим узлом потока.
type ReadFileProcessor struct {
    Filepath string
}

func (g *ReadFileProcessor) Proccess(inTasks flowprocess.InTaskChan, outTask flowprocess.OutTaskChan, ctx context.Context) (cancelAllProcess bool)  {
    f, err := os.Open(g.Filepath)
    if err != nil {
        panic(err)
    }
    defer f.Close()

    sc := bufio.NewScanner(f)
    for sc.Scan() {
        select {
        case <- ctx.Done() :
            return
        default:
            line := sc.Text()
            outTask <- line
        }
    }
    return 
}

(2) Определение flownode-1 процессора (разделение и подсчёт)

//SplitAndCountProcessor разделяет строку и подсчитывает появление слова.
type SplitAndCountProcessor struct {
}

func (s *SplitAndCountProcessor) Proccess(inTasks flowprocess.InTaskChan, outTask flowprocess.OutTaskChan, ctx context.Context) (cancelAllProcess bool)  {
    wordCount := map[string]int{}
    for {
        select {
        case <-ctx.Done():
            return true
        case task, ok := <-inTasks:
            if ok {
                line := task.(string)
                sps := splitText(line)
                for i := 0; i < len(sps); i++ {
                    st := strings.TrimSpace(sps[i])
                    if len(st) > 0 {
                        wordCount[st]++
                    }
                }
            } else {
                outTask <- wordCount
                return
            }
        }
    }
}

(3) Определение flownode-2 процессора (подведение итогов)

//SumWordCountProcessor подводит итоги появления слов.
type SumWordCountProcessor struct {
    reverse   bool
}

func (s *SumWordCountProcessor) Proccess(inTasks flowprocess.InTaskChan, outTask flowprocess.OutTaskChan, ctx context.Context) (cancelAllProcess bool)  {
    wordCount := map[string]int{}
    for {
        select {
        case <-ctx.Done():
            return true
        case task, ok := <-inTasks:
            if ok {
                wc := task.(map[string]int)
                for key, val := range wc {
                    wordCount[key] += val
                }
            } else {
                sortedWc := sortWc(wordCount, s.reverse)
                outTask <- sortedWc
                return
            }
        }
    }
}

(4) Определение процесса потока

    start := ```
time.Now()
fp := flowprocess.NewFlow()
queneCount := 4000
//Node-0: read file lines. We define 1 processor to read file.
fp.AddNodeProcessors(queneCount,
    &ReadFileProcessor{
        //You can replace the file with a larger file.
        Filepath: "testfile/2553.txt",
    })

//Node-1: split and count. we define 4 parallel processors to split and count.
fp.AddNodeProcessors(queneCount,
    &SplitAndCountProcessor{},
    &SplitAndCountProcessor{},
    &SplitAndCountProcessor{},
    &SplitAndCountProcessor{},
)

result := &SumWordCountProcessor{
    reverse: true,
}

//Node-2: we define 1 processor to summarize.
fp.AddNodeProcessors(1,
    result,
)

fp.Start()
if res, ok := fp.Result(); ok {
    sortedWc := res.([]wordAndCount)
    duration := time.Since(start)
    fmt.Printf("duration(ms):%v\n", duration.Milliseconds())

    topN := 10
    if topN > len(sortedWc) {
        topN = len(sortedWc)
    }
    fmt.Println("sortedWc-top", topN, ":")
    for i := 0; i < topN; i++ {
        fmt.Println(sortedWc[i])
    }
}

The 'Flow and Parallel way' is faster and has higher CPU and IO usage when the file is very large.

3、A practice

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

Описание недоступно Развернуть Свернуть
Apache-2.0
Отмена

Обновления (6)

все

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://api.gitlife.ru/oschina-mirror/mqyqingkong-flowprocess.git
git@api.gitlife.ru:oschina-mirror/mqyqingkong-flowprocess.git
oschina-mirror
mqyqingkong-flowprocess
mqyqingkong-flowprocess
master