1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/chararch-gobatch

Клонировать/Скачать
README_zh.md 9.9 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
gitlife-traslator Отправлено 01.12.2024 16:30 51b723c

Написание шагов разбиения на блоки

Шаги разбиения на блоки должны реализовывать следующие три интерфейса (из которых обязательным является только Reader):

type Reader interface {
    //Read каждый вызов Read() будет возвращать элемент данных, если данных больше нет, то будет возвращён нулевой элемент.
    Read(chunkCtx *ChunkContext) (интерфейс{}, BatchError)
}
type Processor interface {
    //Process обрабатывает элемент от reader и возвращает элемент результата
    Process(item интерфейс{}, chunkCtx *ChunkContext) (интерфейс{}, BatchError)
}
type Writer interface {
    //Write записывает элементы, сгенерированные процессором в блоке
    Write(items []интерфейс{}, chunkCtx *ChunkContext) BatchError
}

Также фреймворк содержит интерфейс ItemReader, который в некоторых случаях может быть использован вместо Reader, он определяется следующим образом:

type ItemReader interface {
    //ReadKeys считывает все ключи некоторого вида данных
    ReadKeys() ([]интерфейс{}, ошибка)
    //ReadItem считывает значение по одному ключу из результата ReadKeys
    ReadItem(key интерфейс{}) (интерфейс{}, ошибка)
}

Для удобства можно реализовать следующие интерфейсы, чтобы выполнять некоторые действия инициализации или очистки в Reader или Writer:

type OpenCloser interface {
    Open(execution *StepExecution) BatchError
    Close(execution *StepExecution) BatchError
}

Пример кода можно посмотреть в test/example2.

Написание шагов разделения на разделы

Шаги разделения на разделы должны реализовывать интерфейс Partitioner, который используется для разделения всех данных шага на несколько разделов, каждый из которых соответствует одному подшагу. Фреймворк запускает несколько потоков для параллельного выполнения нескольких подшагов. Если необходимо объединить результаты выполнения подшагов, также нужно реализовать интерфейс Aggregator. Эти два интерфейса определяются следующим образом:

type Partitioner interface {
    //Partition генерирует выполнения подшага из указанного выполнения шага и количества разделов
    Partition(execution *StepExecution, partitions uint) ([]*StepExecution, BatchError)
    //GetPartitionNames генерирует имена подшагов из указанного выполнения шага и количества разделов
    GetPartitionNames(execution *StepExecution, partitions uint) []string
}

type Aggregator interface {
    //Aggregate объединяет результаты всех выполнений подшага
    Aggregate(execution *StepExecution, subExecutions []*StepExecution) BatchError
}

Подшаги шагов разделения могут быть простыми шагами (определёнными Handler) или шагами разбиения на блоки (через Reader/Processor/Writer). Если уже есть шаг разбиения на блоки, содержащий ItemReader, то можно создать шаги разделения, просто указав количество разделов:

step := gobatch.NewStep("partition_step").Handler(&ChunkHandler{db}).Partitions(10).Build()

Этот способ реализуется внутри фреймворка GoBatch на основе ItemReader.

Чтение и запись файлов

Предположим, что у нас есть файл с содержимым (где каждая строка представляет собой запись, а поля разделены '\t'):

trade_1 account_1   cash    1000    normal  2022-02-27 12:12:12
trade_2 account_2   cash    1000    normal  2022-02-27 12:12:12
trade_3 account_3   cash    1000    normal  2022-02-27 12:12:12
……

Если мы хотим прочитать содержимое файла и вставить каждую запись в таблицу t_trade базы данных, мы можем сделать это следующим образом:

type Trade struct {
    TradeNo   string    `order:"0"`
    AccountNo string    `order:"1"`
    Type      string    `order:"2"`
    Amount    float64   `order:"3"`
    TradeTime time.Time `order:"5"`
    Status    string    `order:"4"`
}

var tradeFile = file.FileObjectModel{
    FileStore:     &file.LocalFileSystem{},
    FileName:      "/data/{date,yyyy-MM-dd}/trade.data",
    Type:          file.TSV,
    Encoding:      "utf-8",
    ItemPrototype: &Trade{},
}

type TradeWriter struct {
    db *gorm.DB
}

func (p *TradeWriter) Write(items []interface{}, chunkCtx *gobatch.ChunkContext) gobatch.BatchError {
    models := make([]*Trade, len(items))
    for i, item := range items {
        models[i] = item.(*Trade)
    }
    e := p.db.Table("t_trade").Create(models).Error
    if e != nil {
        return gobatch.NewBatchError(gobatch.ErrCodeDbFail, "save trade into db err", e)
    }
    return nil
}

func buildAndRunJob() {
    //...
    step := gobatch.NewStep("trade_import").ReadFile(tradeFile).Writer(&TradeWriter{db}).Partitions(10).Build()
    //...
    job := gobatch.NewJob("my_job").Step(...,step,...).Build()
    gobatch.Register(job)
    gobatch.Start(context.Background(), job.Name(), "{\"date\":\"20220202\"}")
}
``` **Слушатель**

Рамка предоставляет множество слушателей для обработки событий в процессе выполнения всего пакетного задания и отдельных шагов:
```go
type JobListener interface {
    BeforeJob(execution *JobExecution) BatchError
    AfterJob(execution *JobExecution) BatchError
}

type StepListener interface {
    BeforeStep(execution *StepExecution) BatchError
    AfterStep(execution *StepExecution) BatchError
}

type ChunkListener interface {
    BeforeChunk(context *ChunkContext) BatchError
    AfterChunk(context *ChunkContext) BatchError
    OnError(context *ChunkContext, err BatchError)
}

type PartitionListener interface {
    BeforePartition(execution *StepExecution) BatchError
    AfterPartition(execution *StepExecution, subExecutions []*StepExecution) BatchError
    OnError(execution *StepExecution, err BatchError)
}

Можно создать задание и указать слушателя:

func buildAndRunJob() {
    //...
    step := gobatch.NewStep("my_step").Handler(handler,...).Listener(listener,...).Build()
    //...
    job := gobatch.NewJob("my_job").Step(step,...).Listener(listener,...).Build()
}

Глобальные настройки

Указание экземпляра DB

GoBatch требует использования базы данных для хранения контекста информации о выполнении заданий и шагов. Поэтому перед запуском задания необходимо зарегистрировать экземпляр *sql.DB в GoBatch:

    gobatch.SetDB(sqlDb)

Указание менеджера транзакций

Если требуется использовать блочный шаг, необходимо установить менеджер транзакций (TransactionManager) в GoBatch. Интерфейс TransactionManager определяется следующим образом:

type TransactionManager interface {
    BeginTx() (tx interface{}, err BatchError)
    Commit(tx interface{}) BatchError
    Rollback(tx interface{}) BatchError
}

GoBatch содержит встроенный менеджер транзакций по умолчанию с именем DefaultTxManager. Если уже установлен экземпляр DB, но ещё не установлен TransactionManager, GoBatch автоматически создаст экземпляр DefaultTxManager. Конечно, пользователь может также указать свой собственный менеджер транзакций вместо реализации по умолчанию:

  gobatch.SetTransactionManager(&CustomTransactionManager{})

Установка максимального количества одновременных заданий и максимального количества одновременных шагов

GoBatch использует пул для запуска заданий и шагов. По умолчанию максимальное количество одновременных заданий равно 10, а максимальное количество одновременных шагов — 1000. Чтобы изменить эти значения по умолчанию, выполните следующие действия:

    gobatch.SetMaxRunningJobs(100)
    gobatch.SetMaxRunningSteps(5000)

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/chararch-gobatch.git
git@api.gitlife.ru:oschina-mirror/chararch-gobatch.git
oschina-mirror
chararch-gobatch
chararch-gobatch
master