1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/xiaochengtech-dbsync

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
fetch.go 3.5 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
Xiaosong Gao Отправлено 09.12.2022 10:50 43f9891
package dbsync
import (
"errors"
"fmt"
"time"
)
// 获取增量数据时的配置信息
type FetchOptions struct {
IgnoreFields []string // 忽略的列名称
PageNumber int // 分页获取增量的页码,从1开始
PageSize int // 分页获取增量的页大小
UpdateTimeFieldName string // 更新时间所在列的列名称
LastUpdateTime int64 // 从哪个时间戳开始查询,这是大于的关系
WhereSqlStmt string // 自定义SQL查询语句的Where子句
WhereSqlArgs []interface{} // 自定义SQL查询语句的Where子句的参数列表
}
// 获取增量数据的返回结果
type FetchResult struct {
Columns []string `json:"columns"` // 列名称
Data [][]interface{} `json:"data"` // 待同步的数据,每一行是一条数据,与列名称一一对应
Count int64 `json:"count"` // 数据的数量
}
// 获取增量更新的数据
func DoFetch(
db SQLCommon,
tableName string,
options FetchOptions,
) (rsp FetchResult, err error) {
defer func() {
if r := recover(); r != nil {
err = r.(error)
}
}()
// 参数校验和处理
if options.UpdateTimeFieldName == "" {
err = errors.New("options.UpdateTimeFieldName must be not nil")
return
}
if options.PageNumber <= 0 {
options.PageNumber = 1
}
if options.PageSize <= 0 {
options.PageSize = 100
}
// 拼接SQL语句
whereStmt := fmt.Sprintf("%s > ?", options.UpdateTimeFieldName)
whereArgs := []interface{}{time.Unix(options.LastUpdateTime, 0)}
if options.WhereSqlStmt != "" {
whereStmt = fmt.Sprintf("%s AND (%s)", whereStmt, options.WhereSqlStmt)
whereArgs = append(whereArgs, options.WhereSqlArgs...)
}
offset, size := (options.PageNumber-1)*options.PageSize, options.PageSize
sqlStmt := fmt.Sprintf("SELECT * FROM %s WHERE %s ORDER BY %s ASC LIMIT %d OFFSET %d",
tableName, whereStmt, options.UpdateTimeFieldName, size, offset)
// 执行查询语句
rows, err := db.Query(sqlStmt, whereArgs...)
if err != nil {
return
}
defer rows.Close()
// 获取所有列名
columns, err := rows.Columns()
if err != nil {
return
}
// 如果有忽略字段,则更新列的结果映射关系
ignoreMap := make(map[string]bool)
for _, ignoreFieldName := range options.IgnoreFields {
ignoreMap[ignoreFieldName] = true
}
// 建立列是否应该存入结果的映射关系
columnValidMap, validLen := make(map[int]int), 0
for i, columnName := range columns {
if ignoreMap[columnName] {
columnValidMap[i] = -1
} else {
columnValidMap[i] = validLen
validLen += 1
rsp.Columns = append(rsp.Columns, columnName)
}
}
// 生成每行的处理缓存
cache := make([]interface{}, len(columns))
for i := range cache {
var tmp interface{}
cache[i] = &tmp
}
// 遍历结果集的数据
for rows.Next() {
if err = rows.Scan(cache...); err != nil {
return
}
item := make([]interface{}, validLen)
for j, data := range cache {
if k := columnValidMap[j]; k >= 0 {
item[k] = convertFetchType(data)
}
}
rsp.Data = append(rsp.Data, item)
}
rsp.Count = int64(len(rsp.Data))
return
}
// 类型转换方法
func convertFetchType(data interface{}) interface{} {
item := *data.(*interface{})
switch item := item.(type) {
case nil: // 空值
return nil
case []uint8: // 字符串
return string(item)
case time.Time: // 时间类型
return item.Unix()
case int, int8, int16, int32, int64, float32, float64, byte: // 数字型
return item
case bool: // 布尔型
return item
default:
return item
}
}

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/xiaochengtech-dbsync.git
git@api.gitlife.ru:oschina-mirror/xiaochengtech-dbsync.git
oschina-mirror
xiaochengtech-dbsync
xiaochengtech-dbsync
master