GoBatch
GoBatch — это фреймворк для пакетной обработки данных на языке Go, подобный Spring Batch в Java. Если вы знакомы со Spring Batch, то GoBatch будет вам легко использовать.
В GoBatch задание Job разделено на несколько шагов Steps, которые выполняются последовательно. При выполнении задания Job GoBatch создаёт данные JobExecution, хранящиеся в базе данных, а также создаёт StepExecution при выполнении шага Step.
Существует три типа шагов:
go get -u github.com/chararch/gobatch
import (
"chararch/gobatch"
"context"
"database/sql"
"fmt"
)
// simple task
func mytask() {
fmt.Println("mytask executed")
}
//reader
type myReader struct {
}
func (r *myReader) Read(chunkCtx *gobatch.ChunkContext) (interface{}, gobatch.BatchError) {
curr, _ := chunkCtx.StepExecution.StepContext.GetInt("read.num", 0)
if curr < 100 {
chunkCtx.StepExecution.StepContext.Put("read.num", curr+1)
return fmt.Sprintf("value-%v", curr), nil
}
return nil, nil
}
//processor
type myProcessor struct {
}
func (r *myProcessor) Process(item interface{}, chunkCtx *gobatch.ChunkContext) (interface{}, gobatch.BatchError) {
return fmt.Sprintf("processed-%v", item), nil
}
//writer
type myWriter struct {
}
func (r *myWriter) Write(items []interface{}, chunkCtx *gobatch.ChunkContext) gobatch.BatchError {
fmt.Printf("write: %v\n", items)
return nil
}
func main() {
//set db for gobatch to store job&step execution context
db, err := sql.Open("mysql", "gobatch:gobatch123@tcp(127.0.0.1:3306)/gobatch?charset=utf8&parseTime=true")
if err != nil {
panic(err)
}
gobatch.SetDB(db)
//build steps
step1 := gobatch.NewStep("mytask").Handler(mytask).Build()
//step2 := gobatch.NewStep("my_step").Handler(&myReader{}, &myProcessor{}, &myWriter{}).Build()
step2 := gobatch.NewStep("my_step").Reader(&myReader{}).Processor(&myProcessor{}).Writer(&myWriter{}).ChunkSize(10).Build()
//build job
job := gobatch.NewJob("my_job").Step(step1, step2).Build()
//register job to gobatch
gobatch.Register(job)
//run
//gobatch.StartAsync(context.Background(), job.Name(), "")
gobatch.Start(context.Background(), job.Name(), "")
}
Вы можете посмотреть код в test/example.go.
Есть несколько способов написать логику простого шага:
// 1. написать функцию с одной из следующих сигнатур
func(execution *StepExecution) BatchError
func(execution *StepExecution)
func() error
func()
// 2. реализовать интерфейс Handler
type Handler **Предположим, мы хотим экспортировать данные из таблицы t_trade в файл формата csv.**
**Мы можем сделать это следующим образом:**
type Trade struct {
TradeNo string order:"0" header:"trade_no"
AccountNo string order:"1" header:"account_no"
Type string order:"2" header:"type"
Amount float64 order:"3" header:"amount"
TradeTime time.Time order:"5" header:"trade_time" format:"2006-01-02_15:04:05"
Status string order:"4" header:"status"
}
var tradeFileCsv = file.FileObjectModel{ FileStore: &file.LocalFileSystem{}, FileName: "/data/{date,yyyy-MM-dd}/trade_export.csv", Type: file.CSV, Encoding: "utf-8", ItemPrototype: &Trade{}, }
type TradeReader struct { db *gorm.DB }
func (h *TradeReader) ReadKeys() ([]interface{}, error) { var ids []int64 h.db.Table("t_trade").Select("id").Find(&ids) var result []interface{} for _, id := range ids { result = append(result, id) } return result, nil }
func (h *TradeReader) ReadItem(key interface{}) (interface{}, error) { id := int64(0) switch r := key.(type) { case int64: id = r case float64: id = int64(r) default: return nil, fmt.Errorf("key type error, type:%T, value:%v", key, key) } trade := &Trade{} result := h.db.Table("t_trade").Find(trade, "id = ?", id) if result.Error != nil { return nil, result.Error } return trade, nil }
func buildAndRunJob() { //... step := gobatch.NewStep("trade_export").Reader(&TradeReader{db}).WriteFile(tradeFileCsv).Partitions(10).Build() //... }
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )