1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/chararch-gobatch

Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
README.md

GoBatch

GoBatch — это фреймворк для пакетной обработки данных на языке Go, подобный Spring Batch в Java. Если вы знакомы со Spring Batch, то GoBatch будет вам легко использовать.

Архитектура

В GoBatch задание Job разделено на несколько шагов Steps, которые выполняются последовательно. При выполнении задания Job GoBatch создаёт данные JobExecution, хранящиеся в базе данных, а также создаёт StepExecution при выполнении шага Step.

Существует три типа шагов:

  • Simple Step выполняет бизнес-логику, определённую в Handler, в одном потоке.
  • Chunk Step обрабатывает данные порциями. Процесс состоит из чтения порции данных, их обработки и записи результата. Этот процесс повторяется до тех пор, пока есть данные для чтения.
  • Partition Step разбивает задачу на несколько подзадач, затем параллельно выполняет подзадачи в подшагах и агрегирует результаты подшагов в конце.

Особенности

  • Модульная конструкция для пакетных приложений.
  • Последовательный и параллельный процесс по вашему запросу.
  • Возможность установки точки останова для возобновления работы.
  • Встроенный компонент обработки файлов.
  • Слушатели для выполнения заданий и шагов.
  • Простота расширения.

Установка

go get -u github.com/chararch/gobatch

Использование шага

  1. Создайте или выберите базу данных, например, gobatch.
  2. Создайте таблицы из sql/schema_mysql.sql в предыдущей базе данных.
  3. Напишите код gobatch и запустите его.

Код

Пример

import (
    "chararch/gobatch"
    "context"
    "database/sql"
    "fmt"
)

// simple task
func mytask() {
    fmt.Println("mytask executed")
}

//reader
type myReader struct {
}
func (r *myReader) Read(chunkCtx *gobatch.ChunkContext) (interface{}, gobatch.BatchError) {
    curr, _ := chunkCtx.StepExecution.StepContext.GetInt("read.num", 0)
    if curr < 100 {
        chunkCtx.StepExecution.StepContext.Put("read.num", curr+1)
        return fmt.Sprintf("value-%v", curr), nil
    }
    return nil, nil
}

//processor
type myProcessor struct {
}
func (r *myProcessor) Process(item interface{}, chunkCtx *gobatch.ChunkContext) (interface{}, gobatch.BatchError) {
    return fmt.Sprintf("processed-%v", item), nil
}

//writer
type myWriter struct {
}
func (r *myWriter) Write(items []interface{}, chunkCtx *gobatch.ChunkContext) gobatch.BatchError {
    fmt.Printf("write: %v\n", items)
    return nil
}

func main()  {
    //set db for gobatch to store job&step execution context
    db, err := sql.Open("mysql", "gobatch:gobatch123@tcp(127.0.0.1:3306)/gobatch?charset=utf8&parseTime=true")
    if err != nil {
        panic(err)
    }
    gobatch.SetDB(db)

    //build steps
    step1 := gobatch.NewStep("mytask").Handler(mytask).Build()
    //step2 := gobatch.NewStep("my_step").Handler(&myReader{}, &myProcessor{}, &myWriter{}).Build()
    step2 := gobatch.NewStep("my_step").Reader(&myReader{}).Processor(&myProcessor{}).Writer(&myWriter{}).ChunkSize(10).Build()

    //build job
    job := gobatch.NewJob("my_job").Step(step1, step2).Build()

    //register job to gobatch
    gobatch.Register(job)

    //run
    //gobatch.StartAsync(context.Background(), job.Name(), "")
    gobatch.Start(context.Background(), job.Name(), "")
}

Вы можете посмотреть код в test/example.go.

Написание простого шага

Есть несколько способов написать логику простого шага:

// 1. написать функцию с одной из следующих сигнатур
func(execution *StepExecution) BatchError
func(execution *StepExecution)
func() error
func()

// 2. реализовать интерфейс Handler
type Handler **Предположим, мы хотим экспортировать данные из таблицы t_trade в файл формата csv.**

**Мы можем сделать это следующим образом:**

type Trade struct { TradeNo string order:"0" header:"trade_no" AccountNo string order:"1" header:"account_no" Type string order:"2" header:"type" Amount float64 order:"3" header:"amount" TradeTime time.Time order:"5" header:"trade_time" format:"2006-01-02_15:04:05" Status string order:"4" header:"status" }

var tradeFileCsv = file.FileObjectModel{ FileStore: &file.LocalFileSystem{}, FileName: "/data/{date,yyyy-MM-dd}/trade_export.csv", Type: file.CSV, Encoding: "utf-8", ItemPrototype: &Trade{}, }

type TradeReader struct { db *gorm.DB }

func (h *TradeReader) ReadKeys() ([]interface{}, error) { var ids []int64 h.db.Table("t_trade").Select("id").Find(&ids) var result []interface{} for _, id := range ids { result = append(result, id) } return result, nil }

func (h *TradeReader) ReadItem(key interface{}) (interface{}, error) { id := int64(0) switch r := key.(type) { case int64: id = r case float64: id = int64(r) default: return nil, fmt.Errorf("key type error, type:%T, value:%v", key, key) } trade := &Trade{} result := h.db.Table("t_trade").Find(trade, "id = ?", id) if result.Error != nil { return nil, result.Error } return trade, nil }

func buildAndRunJob() { //... step := gobatch.NewStep("trade_export").Reader(&TradeReader{db}).WriteFile(tradeFileCsv).Partitions(10).Build() //... }


Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

GoBatch — это фреймворк для пакетной обработки на Go, похожий на Spring Batch в Java. Развернуть Свернуть
MIT
Отмена

Обновления

Пока нет обновлений

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://api.gitlife.ru/oschina-mirror/chararch-gobatch.git
git@api.gitlife.ru:oschina-mirror/chararch-gobatch.git
oschina-mirror
chararch-gobatch
chararch-gobatch
master