Шаги разбиения на блоки должны реализовывать следующие три интерфейса (из которых обязательным является только Reader):
type Reader interface {
//Read каждый вызов Read() будет возвращать элемент данных, если данных больше нет, то будет возвращён нулевой элемент.
Read(chunkCtx *ChunkContext) (интерфейс{}, BatchError)
}
type Processor interface {
//Process обрабатывает элемент от reader и возвращает элемент результата
Process(item интерфейс{}, chunkCtx *ChunkContext) (интерфейс{}, BatchError)
}
type Writer interface {
//Write записывает элементы, сгенерированные процессором в блоке
Write(items []интерфейс{}, chunkCtx *ChunkContext) BatchError
}
Также фреймворк содержит интерфейс ItemReader, который в некоторых случаях может быть использован вместо Reader, он определяется следующим образом:
type ItemReader interface {
//ReadKeys считывает все ключи некоторого вида данных
ReadKeys() ([]интерфейс{}, ошибка)
//ReadItem считывает значение по одному ключу из результата ReadKeys
ReadItem(key интерфейс{}) (интерфейс{}, ошибка)
}
Для удобства можно реализовать следующие интерфейсы, чтобы выполнять некоторые действия инициализации или очистки в Reader или Writer:
type OpenCloser interface {
Open(execution *StepExecution) BatchError
Close(execution *StepExecution) BatchError
}
Пример кода можно посмотреть в test/example2.
Шаги разделения на разделы должны реализовывать интерфейс Partitioner, который используется для разделения всех данных шага на несколько разделов, каждый из которых соответствует одному подшагу. Фреймворк запускает несколько потоков для параллельного выполнения нескольких подшагов. Если необходимо объединить результаты выполнения подшагов, также нужно реализовать интерфейс Aggregator. Эти два интерфейса определяются следующим образом:
type Partitioner interface {
//Partition генерирует выполнения подшага из указанного выполнения шага и количества разделов
Partition(execution *StepExecution, partitions uint) ([]*StepExecution, BatchError)
//GetPartitionNames генерирует имена подшагов из указанного выполнения шага и количества разделов
GetPartitionNames(execution *StepExecution, partitions uint) []string
}
type Aggregator interface {
//Aggregate объединяет результаты всех выполнений подшага
Aggregate(execution *StepExecution, subExecutions []*StepExecution) BatchError
}
Подшаги шагов разделения могут быть простыми шагами (определёнными Handler) или шагами разбиения на блоки (через Reader/Processor/Writer). Если уже есть шаг разбиения на блоки, содержащий ItemReader, то можно создать шаги разделения, просто указав количество разделов:
step := gobatch.NewStep("partition_step").Handler(&ChunkHandler{db}).Partitions(10).Build()
Этот способ реализуется внутри фреймворка GoBatch на основе ItemReader.
Предположим, что у нас есть файл с содержимым (где каждая строка представляет собой запись, а поля разделены '\t'):
trade_1 account_1 cash 1000 normal 2022-02-27 12:12:12
trade_2 account_2 cash 1000 normal 2022-02-27 12:12:12
trade_3 account_3 cash 1000 normal 2022-02-27 12:12:12
……
Если мы хотим прочитать содержимое файла и вставить каждую запись в таблицу t_trade базы данных, мы можем сделать это следующим образом:
type Trade struct {
TradeNo string `order:"0"`
AccountNo string `order:"1"`
Type string `order:"2"`
Amount float64 `order:"3"`
TradeTime time.Time `order:"5"`
Status string `order:"4"`
}
var tradeFile = file.FileObjectModel{
FileStore: &file.LocalFileSystem{},
FileName: "/data/{date,yyyy-MM-dd}/trade.data",
Type: file.TSV,
Encoding: "utf-8",
ItemPrototype: &Trade{},
}
type TradeWriter struct {
db *gorm.DB
}
func (p *TradeWriter) Write(items []interface{}, chunkCtx *gobatch.ChunkContext) gobatch.BatchError {
models := make([]*Trade, len(items))
for i, item := range items {
models[i] = item.(*Trade)
}
e := p.db.Table("t_trade").Create(models).Error
if e != nil {
return gobatch.NewBatchError(gobatch.ErrCodeDbFail, "save trade into db err", e)
}
return nil
}
func buildAndRunJob() {
//...
step := gobatch.NewStep("trade_import").ReadFile(tradeFile).Writer(&TradeWriter{db}).Partitions(10).Build()
//...
job := gobatch.NewJob("my_job").Step(...,step,...).Build()
gobatch.Register(job)
gobatch.Start(context.Background(), job.Name(), "{\"date\":\"20220202\"}")
}
``` **Слушатель**
Рамка предоставляет множество слушателей для обработки событий в процессе выполнения всего пакетного задания и отдельных шагов:
```go
type JobListener interface {
BeforeJob(execution *JobExecution) BatchError
AfterJob(execution *JobExecution) BatchError
}
type StepListener interface {
BeforeStep(execution *StepExecution) BatchError
AfterStep(execution *StepExecution) BatchError
}
type ChunkListener interface {
BeforeChunk(context *ChunkContext) BatchError
AfterChunk(context *ChunkContext) BatchError
OnError(context *ChunkContext, err BatchError)
}
type PartitionListener interface {
BeforePartition(execution *StepExecution) BatchError
AfterPartition(execution *StepExecution, subExecutions []*StepExecution) BatchError
OnError(execution *StepExecution, err BatchError)
}
Можно создать задание и указать слушателя:
func buildAndRunJob() {
//...
step := gobatch.NewStep("my_step").Handler(handler,...).Listener(listener,...).Build()
//...
job := gobatch.NewJob("my_job").Step(step,...).Listener(listener,...).Build()
}
Глобальные настройки
GoBatch требует использования базы данных для хранения контекста информации о выполнении заданий и шагов. Поэтому перед запуском задания необходимо зарегистрировать экземпляр *sql.DB в GoBatch:
gobatch.SetDB(sqlDb)
Если требуется использовать блочный шаг, необходимо установить менеджер транзакций (TransactionManager) в GoBatch. Интерфейс TransactionManager определяется следующим образом:
type TransactionManager interface {
BeginTx() (tx interface{}, err BatchError)
Commit(tx interface{}) BatchError
Rollback(tx interface{}) BatchError
}
GoBatch содержит встроенный менеджер транзакций по умолчанию с именем DefaultTxManager. Если уже установлен экземпляр DB, но ещё не установлен TransactionManager, GoBatch автоматически создаст экземпляр DefaultTxManager. Конечно, пользователь может также указать свой собственный менеджер транзакций вместо реализации по умолчанию:
gobatch.SetTransactionManager(&CustomTransactionManager{})
GoBatch использует пул для запуска заданий и шагов. По умолчанию максимальное количество одновременных заданий равно 10, а максимальное количество одновременных шагов — 1000. Чтобы изменить эти значения по умолчанию, выполните следующие действия:
gobatch.SetMaxRunningJobs(100)
gobatch.SetMaxRunningSteps(5000)
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )