Flow и параллельная обработка данных
Это обеспечивает простой способ создания процесса потока и значительно повышает эффективность обработки данных.
Схема архитектуры
Рисунок: Схема архитектуры (flow.png)
Например, мы подсчитываем слова в файле и получаем 10 наиболее часто встречающихся слов. Тестовый файл слишком мал, вы можете увеличить его размер, скопировав его несколько раз. Давайте сравним два способа ниже:
wordCount := map[string]int{}
reverse := true
// Вы можете заменить файл на файл большего размера.
file := "testfile/2553.txt"
start := time.Now()
f, err := os.Open(file)
if err != nil {
panic(err)
}
defer f.Close()
sc := bufio.NewScanner(f)
// Разделение строк
for sc.Scan() {
line := sc.Text()
sps := splitText(line)
for i := 0; i < len(sps); i++ {
st := strings.TrimSpace(sps[i])
if len(st) > 0 {
wordCount[st]++
}
}
}
// Сортировка по времени появления слова по убыванию
sortedWc := sortWc(wordCount, reverse)
duration := time.Since(start)
// Вывод прошедшего времени
fmt.Printf("duration(ms):%v\n", duration.Milliseconds())
// Печать топ-N
topN := 10
if topN > len(sortedWc) {
topN = len(sortedWc)
}
fmt.Println("sortedWc-top", topN, ":")
for i := 0; i < topN; i++ {
fmt.Println(sortedWc[i])
}
Общий способ работает медленно и имеет более низкую загрузку процессора и ввода-вывода при очень большом размере файла.
Мы разделяем операции ввода-вывода и процессора.
//ReadFileProcessor читает строки файла и помещает строку в OutTaskChan для дальнейшей обработки следующим узлом потока.
type ReadFileProcessor struct {
Filepath string
}
func (g *ReadFileProcessor) Proccess(inTasks flowprocess.InTaskChan, outTask flowprocess.OutTaskChan, ctx context.Context) (cancelAllProcess bool) {
f, err := os.Open(g.Filepath)
if err != nil {
panic(err)
}
defer f.Close()
sc := bufio.NewScanner(f)
for sc.Scan() {
select {
case <- ctx.Done() :
return
default:
line := sc.Text()
outTask <- line
}
}
return
}
//SplitAndCountProcessor разделяет строку и подсчитывает появление слова.
type SplitAndCountProcessor struct {
}
func (s *SplitAndCountProcessor) Proccess(inTasks flowprocess.InTaskChan, outTask flowprocess.OutTaskChan, ctx context.Context) (cancelAllProcess bool) {
wordCount := map[string]int{}
for {
select {
case <-ctx.Done():
return true
case task, ok := <-inTasks:
if ok {
line := task.(string)
sps := splitText(line)
for i := 0; i < len(sps); i++ {
st := strings.TrimSpace(sps[i])
if len(st) > 0 {
wordCount[st]++
}
}
} else {
outTask <- wordCount
return
}
}
}
}
//SumWordCountProcessor подводит итоги появления слов.
type SumWordCountProcessor struct {
reverse bool
}
func (s *SumWordCountProcessor) Proccess(inTasks flowprocess.InTaskChan, outTask flowprocess.OutTaskChan, ctx context.Context) (cancelAllProcess bool) {
wordCount := map[string]int{}
for {
select {
case <-ctx.Done():
return true
case task, ok := <-inTasks:
if ok {
wc := task.(map[string]int)
for key, val := range wc {
wordCount[key] += val
}
} else {
sortedWc := sortWc(wordCount, s.reverse)
outTask <- sortedWc
return
}
}
}
}
start := ```
time.Now()
fp := flowprocess.NewFlow()
queneCount := 4000
//Node-0: read file lines. We define 1 processor to read file.
fp.AddNodeProcessors(queneCount,
&ReadFileProcessor{
//You can replace the file with a larger file.
Filepath: "testfile/2553.txt",
})
//Node-1: split and count. we define 4 parallel processors to split and count.
fp.AddNodeProcessors(queneCount,
&SplitAndCountProcessor{},
&SplitAndCountProcessor{},
&SplitAndCountProcessor{},
&SplitAndCountProcessor{},
)
result := &SumWordCountProcessor{
reverse: true,
}
//Node-2: we define 1 processor to summarize.
fp.AddNodeProcessors(1,
result,
)
fp.Start()
if res, ok := fp.Result(); ok {
sortedWc := res.([]wordAndCount)
duration := time.Since(start)
fmt.Printf("duration(ms):%v\n", duration.Milliseconds())
topN := 10
if topN > len(sortedWc) {
topN = len(sortedWc)
}
fmt.Println("sortedWc-top", topN, ":")
for i := 0; i < topN; i++ {
fmt.Println(sortedWc[i])
}
}
The 'Flow and Parallel way' is faster and has higher CPU and IO usage when the file is very large.
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )