/*
* SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
* SPDX-License-Identifier: Apache-2.0
*/
package badger
import (
"bytes"
"context"
"errors"
"fmt"
"hash"
"hash/crc32"
"io"
"math"
"os"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"github.com/dgraph-io/badger/v4/y"
"github.com/dgraph-io/ristretto/v2/z"
)
// maxVlogFileSize is the maximum size of the vlog file which can be created. Vlog Offset is of
// uint32, so limiting at max uint32.
var maxVlogFileSize uint32 = math.MaxUint32
// The first (meta) byte of each value carries the bit flags below. Among other things, these
// help us distinguish a key that has never been seen from a key that has been explicitly deleted.
const (
bitDelete byte = 1 << 0 // Set if the key has been deleted.
bitValuePointer byte = 1 << 1 // Set if the value is NOT stored directly next to key.
bitDiscardEarlierVersions byte = 1 << 2 // Set if earlier versions can be discarded.
// Set if item shouldn't be discarded via compactions (used by merge operator)
bitMergeEntry byte = 1 << 3
// The MSB 2 bits are for transactions.
bitTxn byte = 1 << 6 // Set if the entry is part of a txn.
bitFinTxn byte = 1 << 7 // Set if the entry is to indicate end of txn in value log.
mi int64 = 1 << 20 //nolint:unused
// size of vlog header.
// +----------------+------------------+
// | keyID(8 bytes) | baseIV(12 bytes) |
// +----------------+------------------+
vlogHeaderSize = 20
)
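// Illustrative sketch (added for clarity; not part of the upstream code): how the meta bits
// above compose. An entry written inside a transaction whose value lives in the value log
// carries both bitValuePointer and bitTxn; stripping the txn markers, as rewrite() does further
// below, preserves the remaining bits.
func exampleMetaBits() {
	meta := bitValuePointer | bitTxn
	inVlog := meta&bitValuePointer != 0 // true: the value is stored in the vlog
	meta = meta &^ (bitTxn | bitFinTxn) // clear the transaction markers
	_, _ = inVlog, meta == bitValuePointer // both are true
}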
var errStop = errors.New("Stop iteration")
var errTruncate = errors.New("Do truncate")
type logEntry func(e Entry, vp valuePointer) error
type safeRead struct {
k []byte
v []byte
recordOffset uint32
lf *logFile
}
// hashReader implements the io.Reader and io.ByteReader interfaces. It also keeps track of the
// number of bytes read. The hashReader writes to h (hash) what it reads from r.
type hashReader struct {
r io.Reader
h hash.Hash32
bytesRead int // Number of bytes read.
}
func newHashReader(r io.Reader) *hashReader {
hash := crc32.New(y.CastagnoliCrcTable)
return &hashReader{
r: r,
h: hash,
}
}
// Read reads len(p) bytes from the reader. Returns the number of bytes read, error on failure.
func (t *hashReader) Read(p []byte) (int, error) {
n, err := t.r.Read(p)
if err != nil {
return n, err
}
t.bytesRead += n
return t.h.Write(p[:n])
}
// ReadByte reads exactly one byte from the reader. Returns error on failure.
func (t *hashReader) ReadByte() (byte, error) {
b := make([]byte, 1)
_, err := t.Read(b)
return b[0], err
}
// Sum32 returns the sum32 of the underlying hash.
func (t *hashReader) Sum32() uint32 {
return t.h.Sum32()
}
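// Usage sketch (illustrative; not upstream API): reading through a hashReader yields the same
// CRC-32C as hashing the consumed bytes directly, which is how Entry below verifies each
// record's trailing checksum.
func exampleHashReader() {
	hr := newHashReader(bytes.NewReader([]byte("badger")))
	buf := make([]byte, 6)
	if _, err := io.ReadFull(hr, buf); err != nil {
		panic(err)
	}
	// hr.bytesRead is now 6, and the running hash matches a direct checksum.
	y.AssertTrue(hr.Sum32() == crc32.Checksum(buf, y.CastagnoliCrcTable))
}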
// Entry reads an entry from the provided reader. It also validates the checksum for every entry
// read. Returns error on failure.
func (r *safeRead) Entry(reader io.Reader) (*Entry, error) {
tee := newHashReader(reader)
var h header
hlen, err := h.DecodeFrom(tee)
if err != nil {
return nil, err
}
if h.klen > uint32(1<<16) { // Key length must be below uint16.
return nil, errTruncate
}
kl := int(h.klen)
if cap(r.k) < kl {
r.k = make([]byte, 2*kl)
}
vl := int(h.vlen)
if cap(r.v) < vl {
r.v = make([]byte, 2*vl)
}
e := &Entry{}
e.offset = r.recordOffset
e.hlen = hlen
buf := make([]byte, h.klen+h.vlen)
if _, err := io.ReadFull(tee, buf[:]); err != nil {
if err == io.EOF {
err = errTruncate
}
return nil, err
}
if r.lf.encryptionEnabled() {
if buf, err = r.lf.decryptKV(buf[:], r.recordOffset); err != nil {
return nil, err
}
}
e.Key = buf[:h.klen]
e.Value = buf[h.klen:]
var crcBuf [crc32.Size]byte
if _, err := io.ReadFull(reader, crcBuf[:]); err != nil {
if err == io.EOF {
err = errTruncate
}
return nil, err
}
crc := y.BytesToU32(crcBuf[:])
if crc != tee.Sum32() {
return nil, errTruncate
}
e.meta = h.meta
e.UserMeta = h.userMeta
e.ExpiresAt = h.expiresAt
return e, nil
}
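// For reference (descriptive comment added for clarity; layout inferred from the decode path
// above): each vlog record that Entry decodes is laid out as
//
//	+--------+-----+-------+-----------------+
//	| header | key | value | crc32 (4 bytes) |
//	+--------+-----+-------+-----------------+
//
// where the checksum covers the header, key and value bytes as stored on disk (i.e. before
// any decryption).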
func (vlog *valueLog) rewrite(f *logFile) error {
vlog.filesLock.RLock()
for _, fid := range vlog.filesToBeDeleted {
if fid == f.fid {
vlog.filesLock.RUnlock()
return fmt.Errorf("value log file already marked for deletion fid: %d", fid)
}
}
maxFid := vlog.maxFid
y.AssertTruef(f.fid < maxFid, "fid to move: %d. Current max fid: %d", f.fid, maxFid)
vlog.filesLock.RUnlock()
vlog.opt.Infof("Rewriting fid: %d", f.fid)
wb := make([]*Entry, 0, 1000)
var size int64
y.AssertTrue(vlog.db != nil)
var count, moved int
fe := func(e Entry) error {
count++
if count%100000 == 0 {
vlog.opt.Debugf("Processing entry %d", count)
}
vs, err := vlog.db.get(e.Key)
if err != nil {
return err
}
if discardEntry(e, vs, vlog.db) {
return nil
}
// Value is still present in value log.
if len(vs.Value) == 0 {
return fmt.Errorf("Empty value: %+v", vs)
}
var vp valuePointer
vp.Decode(vs.Value)
// If the entry found from the LSM Tree points to a newer vlog file, don't do anything.
if vp.Fid > f.fid {
return nil
}
// If the entry found from the LSM Tree points to an offset greater than the one
// read from vlog, don't do anything.
if vp.Offset > e.offset {
return nil
}
// If the entry read from LSM Tree and vlog file point to the same vlog file and offset,
// insert them back into the DB.
// NOTE: It might be possible that the entry read from the LSM Tree points to
// an older vlog file. See the comments in the else part.
if vp.Fid == f.fid && vp.Offset == e.offset {
moved++
// This new entry only contains the key, and a pointer to the value.
ne := new(Entry)
// Remove only the bitValuePointer and transaction markers. We
// should keep the other bits.
ne.meta = e.meta &^ (bitValuePointer | bitTxn | bitFinTxn)
ne.UserMeta = e.UserMeta
ne.ExpiresAt = e.ExpiresAt
ne.Key = append([]byte{}, e.Key...)
ne.Value = append([]byte{}, e.Value...)
es := ne.estimateSizeAndSetThreshold(vlog.db.valueThreshold())
// Consider size of value as well while considering the total size
// of the batch. There have been reports of high memory usage in
// rewrite because we don't consider the value size. See #1292.
es += int64(len(e.Value))
// Ensure length and size of wb is within transaction limits.
if int64(len(wb)+1) >= vlog.opt.maxBatchCount ||
size+es >= vlog.opt.maxBatchSize {
if err := vlog.db.batchSet(wb); err != nil {
return err
}
size = 0
wb = wb[:0]
}
wb = append(wb, ne)
size += es
} else { //nolint:staticcheck
// It might be possible that the entry read from LSM Tree points to
// an older vlog file. This can happen in the following situation.
// Assume DB is opened with
// numberOfVersionsToKeep=1
//
// Now, if we have ONLY one key in the system "FOO" which has been
// updated 3 times and the same key has been garbage collected 3
// times, we'll have 3 versions of the movekey
// for the same key "FOO".
//
// NOTE: moveKeyi is the gc'ed version of the original key with version i.
// We call the gc'ed keys moveKeys to simplify the
// explanation. We used to add move keys but we no longer do that.
//
// Assume we have 3 move keys in L0.
// - moveKey1 (points to vlog file 10),
// - moveKey2 (points to vlog file 14) and
// - moveKey3 (points to vlog file 15).
//
// Also, assume there is another move key "moveKey1" (points to
// vlog file 6) (this is also a move Key for key "FOO" ) on upper
// levels (let's say 3). The move key "moveKey1" on level 0 was
// inserted because vlog file 6 was GCed.
//
// Here's what the arrangement looks like
// L0 => (moveKey1 => vlog10), (moveKey2 => vlog14), (moveKey3 => vlog15)
// L1 => ....
// L2 => ....
// L3 => (moveKey1 => vlog6)
//
// When L0 compaction runs, it keeps only moveKey3 because the number of versions
// to keep is set to 1. (we've dropped moveKey1's latest version)
//
// The new arrangement of keys is
// L0 => ....
// L1 => (moveKey3 => vlog15)
// L2 => ....
// L3 => (moveKey1 => vlog6)
//
// Now if we try to GC vlog file 10, the entry read from vlog file
// will point to vlog10 but the entry read from LSM Tree will point
// to vlog6. The move key read from LSM tree will point to vlog6
// because we've asked for version 1 of the move key.
//
// This might seem like an issue but it's not really an issue
// because the user has set the number of versions to keep to 1 and
// the latest version of moveKey points to the correct vlog file
// and offset. The stale move key on L3 will be eventually dropped
// by compaction because there is a newer version in the upper
// levels.
}
return nil
}
_, err := f.iterate(vlog.opt.ReadOnly, 0, func(e Entry, vp valuePointer) error {
return fe(e)
})
if err != nil {
return err
}
batchSize := 1024
var loops int
for i := 0; i < len(wb); {
loops++
if batchSize == 0 {
vlog.db.opt.Warningf("We shouldn't reach batch size of zero.")
return ErrNoRewrite
}
end := i + batchSize
if end > len(wb) {
end = len(wb)
}
if err := vlog.db.batchSet(wb[i:end]); err != nil {
if err == ErrTxnTooBig {
// Decrease the batch size to half.
batchSize = batchSize / 2
continue
}
return err
}
i += batchSize
}
vlog.opt.Infof("Processed %d entries in %d loops", len(wb), loops)
vlog.opt.Infof("Total entries: %d. Moved: %d", count, moved)
vlog.opt.Infof("Removing fid: %d", f.fid)
var deleteFileNow bool
// Entries written to LSM. Remove the older file now.
{
vlog.filesLock.Lock()
// Just a sanity-check.
if _, ok := vlog.filesMap[f.fid]; !ok {
vlog.filesLock.Unlock()
return fmt.Errorf("Unable to find fid: %d", f.fid)
}
if vlog.iteratorCount() == 0 {
delete(vlog.filesMap, f.fid)
deleteFileNow = true
} else {
vlog.filesToBeDeleted = append(vlog.filesToBeDeleted, f.fid)
}
vlog.filesLock.Unlock()
}
if deleteFileNow {
if err := vlog.deleteLogFile(f); err != nil {
return err
}
}
return nil
}
func (vlog *valueLog) incrIteratorCount() {
vlog.numActiveIterators.Add(1)
}
func (vlog *valueLog) iteratorCount() int {
return int(vlog.numActiveIterators.Load())
}
func (vlog *valueLog) decrIteratorCount() error {
num := vlog.numActiveIterators.Add(-1)
if num != 0 {
return nil
}
vlog.filesLock.Lock()
lfs := make([]*logFile, 0, len(vlog.filesToBeDeleted))
for _, id := range vlog.filesToBeDeleted {
lfs = append(lfs, vlog.filesMap[id])
delete(vlog.filesMap, id)
}
vlog.filesToBeDeleted = nil
vlog.filesLock.Unlock()
for _, lf := range lfs {
if err := vlog.deleteLogFile(lf); err != nil {
return err
}
}
return nil
}
func (vlog *valueLog) deleteLogFile(lf *logFile) error {
if lf == nil {
return nil
}
lf.lock.Lock()
defer lf.lock.Unlock()
// Delete fid from discard stats as well.
vlog.discardStats.Update(lf.fid, -1)
return lf.Delete()
}
func (vlog *valueLog) dropAll() (int, error) {
// If db is opened in InMemory mode, we don't need to do anything since there are no vlog files.
if vlog.db.opt.InMemory {
return 0, nil
}
// We don't want to block dropAll on any pending transactions. So, don't worry about iterator
// count.
var count int
deleteAll := func() error {
vlog.filesLock.Lock()
defer vlog.filesLock.Unlock()
for _, lf := range vlog.filesMap {
if err := vlog.deleteLogFile(lf); err != nil {
return err
}
count++
}
vlog.filesMap = make(map[uint32]*logFile)
vlog.maxFid = 0
return nil
}
if err := deleteAll(); err != nil {
return count, err
}
vlog.db.opt.Infof("Value logs deleted. Creating value log file: 1")
if _, err := vlog.createVlogFile(); err != nil { // Called while writes are stopped.
return count, err
}
return count, nil
}
func (db *DB) valueThreshold() int64 {
return db.threshold.valueThreshold.Load()
}
type valueLog struct {
dirPath string
// guards our view of which files exist, which are to be deleted, and how many active iterators there are
filesLock sync.RWMutex
filesMap map[uint32]*logFile
maxFid uint32
filesToBeDeleted []uint32
// A refcount of iterators -- when this hits zero, we can delete the filesToBeDeleted.
numActiveIterators atomic.Int32
db *DB
writableLogOffset atomic.Uint32 // read by read, written by write
numEntriesWritten uint32
opt Options
garbageCh chan struct{}
discardStats *discardStats
}
func vlogFilePath(dirPath string, fid uint32) string {
return fmt.Sprintf("%s%s%06d.vlog", dirPath, string(os.PathSeparator), fid)
}
func (vlog *valueLog) fpath(fid uint32) string {
return vlogFilePath(vlog.dirPath, fid)
}
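// For example (illustrative): with dirPath "/data/vlog", fpath(12) yields
// "/data/vlog/000012.vlog". populateFilesMap below parses the fid back out of this 6-digit,
// zero-padded file name.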
func (vlog *valueLog) populateFilesMap() error {
vlog.filesMap = make(map[uint32]*logFile)
files, err := os.ReadDir(vlog.dirPath)
if err != nil {
return errFile(err, vlog.dirPath, "Unable to open log dir.")
}
found := make(map[uint64]struct{})
for _, file := range files {
if !strings.HasSuffix(file.Name(), ".vlog") {
continue
}
fsz := len(file.Name())
fid, err := strconv.ParseUint(file.Name()[:fsz-5], 10, 32)
if err != nil {
return errFile(err, file.Name(), "Unable to parse log id.")
}
if _, ok := found[fid]; ok {
return errFile(err, file.Name(), "Duplicate file found. Please delete one.")
}
found[fid] = struct{}{}
lf := &logFile{
fid: uint32(fid),
path: vlog.fpath(uint32(fid)),
registry: vlog.db.registry,
}
vlog.filesMap[uint32(fid)] = lf
if vlog.maxFid < uint32(fid) {
vlog.maxFid = uint32(fid)
}
}
return nil
}
func (vlog *valueLog) createVlogFile() (*logFile, error) {
fid := vlog.maxFid + 1
path := vlog.fpath(fid)
lf := &logFile{
fid: fid,
path: path,
registry: vlog.db.registry,
writeAt: vlogHeaderSize,
opt: vlog.opt,
}
err := lf.open(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 2*vlog.opt.ValueLogFileSize)
if err != z.NewFile && err != nil {
return nil, err
}
vlog.filesLock.Lock()
vlog.filesMap[fid] = lf
y.AssertTrue(vlog.maxFid < fid)
vlog.maxFid = fid
// writableLogOffset is only written by the write func, and read by the Read func.
// To avoid a race condition, all reads and updates to this variable must be
// done via atomics.
vlog.writableLogOffset.Store(vlogHeaderSize)
vlog.numEntriesWritten = 0
vlog.filesLock.Unlock()
return lf, nil
}
func errFile(err error, path string, msg string) error {
return fmt.Errorf("%s. Path=%s. Error=%v", msg, path, err)
}
// init initializes the value log struct. This initialization needs to happen
// before compactions start.
func (vlog *valueLog) init(db *DB) {
vlog.opt = db.opt
vlog.db = db
// We don't need to open any vlog files or collect stats for GC if DB is opened
// in InMemory mode. InMemory mode doesn't create any files/directories on disk.
if vlog.opt.InMemory {
return
}
vlog.dirPath = vlog.opt.ValueDir
vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time.
lf, err := InitDiscardStats(vlog.opt)
y.Check(err)
vlog.discardStats = lf
// See TestPersistLFDiscardStats for purpose of statement below.
db.logToSyncChan(endVLogInitMsg)
}
func (vlog *valueLog) open(db *DB) error {
// We don't need to open any vlog files or collect stats for GC if DB is opened
// in InMemory mode. InMemory mode doesn't create any files/directories on disk.
if db.opt.InMemory {
return nil
}
if err := vlog.populateFilesMap(); err != nil {
return err
}
// If no files are found, then create a new file.
if len(vlog.filesMap) == 0 {
if vlog.opt.ReadOnly {
return nil
}
_, err := vlog.createVlogFile()
return y.Wrapf(err, "Error while creating log file in valueLog.open")
}
fids := vlog.sortedFids()
for _, fid := range fids {
lf, ok := vlog.filesMap[fid]
y.AssertTrue(ok)
// Just open in RDWR mode. This should not create a new log file.
lf.opt = vlog.opt
if err := lf.open(vlog.fpath(fid), os.O_RDWR,
2*vlog.opt.ValueLogFileSize); err != nil {
return y.Wrapf(err, "Open existing file: %q", lf.path)
}
// We shouldn't delete the maxFid file.
if lf.size.Load() == vlogHeaderSize && fid != vlog.maxFid {
vlog.opt.Infof("Deleting empty file: %s", lf.path)
if err := lf.Delete(); err != nil {
return y.Wrapf(err, "while trying to delete empty file: %s", lf.path)
}
delete(vlog.filesMap, fid)
}
}
if vlog.opt.ReadOnly {
return nil
}
// Now we can read the latest value log file, and see if it needs truncation. We could
// technically do this over all the value log files, but that would mean slowing down the value
// log open.
last, ok := vlog.filesMap[vlog.maxFid]
y.AssertTrue(ok)
lastOff, err := last.iterate(vlog.opt.ReadOnly, vlogHeaderSize,
func(_ Entry, vp valuePointer) error {
return nil
})
if err != nil {
return y.Wrapf(err, "while iterating over: %s", last.path)
}
if err := last.Truncate(int64(lastOff)); err != nil {
return y.Wrapf(err, "while truncating last value log file: %s", last.path)
}
// Don't write to the old log file. Always create a new one.
if _, err := vlog.createVlogFile(); err != nil {
return y.Wrapf(err, "Error while creating log file in valueLog.open")
}
return nil
}
func (vlog *valueLog) Close() error {
if vlog == nil || vlog.db == nil || vlog.db.opt.InMemory {
return nil
}
vlog.opt.Debugf("Stopping garbage collection of values.")
var err error
for id, lf := range vlog.filesMap {
lf.lock.Lock() // We won’t release the lock.
offset := int64(-1)
if !vlog.opt.ReadOnly && id == vlog.maxFid {
offset = int64(vlog.woffset())
}
if terr := lf.Close(offset); terr != nil && err == nil {
err = terr
}
}
if vlog.discardStats != nil {
vlog.db.captureDiscardStats()
if terr := vlog.discardStats.Close(-1); terr != nil && err == nil {
err = terr
}
}
return err
}
// sortedFids returns the file ids not pending deletion, sorted. Assumes we have shared access to
// filesMap.
func (vlog *valueLog) sortedFids() []uint32 {
toBeDeleted := make(map[uint32]struct{})
for _, fid := range vlog.filesToBeDeleted {
toBeDeleted[fid] = struct{}{}
}
ret := make([]uint32, 0, len(vlog.filesMap))
for fid := range vlog.filesMap {
if _, ok := toBeDeleted[fid]; !ok {
ret = append(ret, fid)
}
}
sort.Slice(ret, func(i, j int) bool {
return ret[i] < ret[j]
})
return ret
}
type request struct {
// Input values
Entries []*Entry
// Output values and wait group stuff below
Ptrs []valuePointer
Wg sync.WaitGroup
Err error
ref atomic.Int32
}
func (req *request) reset() {
req.Entries = req.Entries[:0]
req.Ptrs = req.Ptrs[:0]
req.Wg = sync.WaitGroup{}
req.Err = nil
req.ref.Store(0)
}
func (req *request) IncrRef() {
req.ref.Add(1)
}
func (req *request) DecrRef() {
nRef := req.ref.Add(-1)
if nRef > 0 {
return
}
req.Entries = nil
requestPool.Put(req)
}
func (req *request) Wait() error {
req.Wg.Wait()
err := req.Err
req.DecrRef() // DecrRef after writing to DB.
return err
}
type requests []*request
func (reqs requests) DecrRef() {
for _, req := range reqs {
req.DecrRef()
}
}
func (reqs requests) IncrRef() {
for _, req := range reqs {
req.IncrRef()
}
}
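// Lifecycle sketch (descriptive, inferred from the code above): a producer typically takes a
// request from requestPool, fills Entries, calls IncrRef and Wg.Add(1), and hands it to the
// write path. Once the write path has filled Ptrs and set Err, it calls Wg.Done(); the
// producer's Wait then returns Err and drops its reference. When the refcount reaches zero,
// DecrRef clears Entries and returns the request to the pool.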
// sync syncs the content of the latest value log file to disk. Syncing of the value log
// directory is not required here, as it happens on every value log file rotation (see the
// createVlogFile function). During rotation, the previous value log file also gets synced to disk.
func (vlog *valueLog) sync() error {
if vlog.opt.SyncWrites || vlog.opt.InMemory {
return nil
}
vlog.filesLock.RLock()
maxFid := vlog.maxFid
curlf := vlog.filesMap[maxFid]
// Sometimes it is possible that vlog.maxFid has been increased but the file creation
// with the same id is still in progress and this function is called. In those cases,
// the entry for the file might not be present in vlog.filesMap.
if curlf == nil {
vlog.filesLock.RUnlock()
return nil
}
curlf.lock.RLock()
vlog.filesLock.RUnlock()
err := curlf.Sync()
curlf.lock.RUnlock()
return err
}
func (vlog *valueLog) woffset() uint32 {
return vlog.writableLogOffset.Load()
}
// validateWrites checks whether the given requests can fit into a 4GB vlog file.
// NOTE: 4GB is the maximum size we can create for vlog because value pointer offset is of type
// uint32. If we create more than 4GB, it will overflow uint32. So, limiting the size to 4GB.
func (vlog *valueLog) validateWrites(reqs []*request) error {
vlogOffset := uint64(vlog.woffset())
for _, req := range reqs {
// calculate size of the request.
size := estimateRequestSize(req)
estimatedVlogOffset := vlogOffset + size
if estimatedVlogOffset > uint64(maxVlogFileSize) {
return fmt.Errorf("Request size offset %d is bigger than maximum offset %d",
estimatedVlogOffset, maxVlogFileSize)
}
if estimatedVlogOffset >= uint64(vlog.opt.ValueLogFileSize) {
// We'll create a new vlog file if the estimated offset is greater than or equal to
// the max vlog size. So, reset the vlogOffset.
vlogOffset = 0
continue
}
// Estimated vlog offset will become current vlog offset if the vlog is not rotated.
vlogOffset = estimatedVlogOffset
}
return nil
}
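// Worked example (illustrative numbers): with ValueLogFileSize = 1 GB, a current woffset of
// 900 MB and a request estimated at 200 MB, the estimated offset (1.1 GB) is >=
// ValueLogFileSize, so the file will be rotated and vlogOffset resets to 0 for validating the
// next request. Only an estimated offset above maxVlogFileSize (4 GB) is an outright error.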
// estimateRequestSize returns the size that needs to be written for the given request.
func estimateRequestSize(req *request) uint64 {
size := uint64(0)
for _, e := range req.Entries {
size += uint64(maxHeaderSize + len(e.Key) + len(e.Value) + crc32.Size)
}
return size
}
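// For instance (illustrative): a request with one entry whose key is 8 bytes and value is
// 100 bytes is estimated at maxHeaderSize + 8 + 100 + 4 bytes, mirroring the
// header|key|value|crc32 record layout. This is an upper bound, since the header itself is
// varint-encoded and is usually smaller than maxHeaderSize.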
// write is thread-unsafe by design and should not be called concurrently.
func (vlog *valueLog) write(reqs []*request) error {
if vlog.db.opt.InMemory {
return nil
}
// Validate writes before writing to the vlog, because we don't want to partially write
// and then return an error.
if err := vlog.validateWrites(reqs); err != nil {
return y.Wrapf(err, "while validating writes")
}
vlog.filesLock.RLock()
maxFid := vlog.maxFid
curlf := vlog.filesMap[maxFid]
vlog.filesLock.RUnlock()
defer func() {
if vlog.opt.SyncWrites {
if err := curlf.Sync(); err != nil {
vlog.opt.Errorf("Error while curlf sync: %v\n", err)
}
}
}()
write := func(buf *bytes.Buffer) error {
if buf.Len() == 0 {
return nil
}
n := uint32(buf.Len())
endOffset := vlog.writableLogOffset.Add(n)
// Increase the file size if we cannot accommodate this entry.
// [Aman] Should this be >= or just >? It doesn't make sense to extend the file if it's already big enough.
if int(endOffset) >= len(curlf.Data) {
if err := curlf.Truncate(int64(endOffset)); err != nil {
return err
}
}
start := int(endOffset - n)
y.AssertTrue(copy(curlf.Data[start:], buf.Bytes()) == int(n))
curlf.size.Store(endOffset)
return nil
}
toDisk := func() error {
if vlog.woffset() > uint32(vlog.opt.ValueLogFileSize) ||
vlog.numEntriesWritten > vlog.opt.ValueLogMaxEntries {
if err := curlf.doneWriting(vlog.woffset()); err != nil {
return err
}
newlf, err := vlog.createVlogFile()
if err != nil {
return err
}
curlf = newlf
}
return nil
}
buf := new(bytes.Buffer)
for i := range reqs {
b := reqs[i]
b.Ptrs = b.Ptrs[:0]
var written, bytesWritten int
valueSizes := make([]int64, 0, len(b.Entries))
for j := range b.Entries {
buf.Reset()
e := b.Entries[j]
valueSizes = append(valueSizes, int64(len(e.Value)))
if e.skipVlogAndSetThreshold(vlog.db.valueThreshold()) {
b.Ptrs = append(b.Ptrs, valuePointer{})
continue
}
var p valuePointer
p.Fid = curlf.fid
p.Offset = vlog.woffset()
// We should not store transaction markers in the vlog file, because it will never have all
// the entries in a transaction. If we store entries with transaction markers, then value log
// GC will not be able to iterate over the entire vlog file.
// But, we still want the entry to stay intact for the memTable WAL. So, store the meta
// in a temporary variable and reassign it after writing to the value log.
tmpMeta := e.meta
e.meta = e.meta &^ (bitTxn | bitFinTxn)
plen, err := curlf.encodeEntry(buf, e, p.Offset) // Now encode the entry into buffer.
if err != nil {
return err
}
// Restore the meta.
e.meta = tmpMeta
p.Len = uint32(plen)
b.Ptrs = append(b.Ptrs, p)
if err := write(buf); err != nil {
return err
}
written++
bytesWritten += buf.Len()
// No need to flush anything, we write to file directly via mmap.
}
y.NumWritesVlogAdd(vlog.opt.MetricsEnabled, int64(written))
y.NumBytesWrittenVlogAdd(vlog.opt.MetricsEnabled, int64(bytesWritten))
vlog.numEntriesWritten += uint32(written)
vlog.db.threshold.update(valueSizes)
// We write to disk here so that all entries that are part of the same transaction are
// written to the same vlog file.
if err := toDisk(); err != nil {
return err
}
}
return toDisk()
}
// Gets the logFile and acquires an RLock() on it for the mmap. You must call RUnlock on the file
// (if non-nil).
func (vlog *valueLog) getFileRLocked(vp valuePointer) (*logFile, error) {
vlog.filesLock.RLock()
defer vlog.filesLock.RUnlock()
ret, ok := vlog.filesMap[vp.Fid]
if !ok {
// log file has gone away, we can't do anything. Return.
return nil, fmt.Errorf("file with ID: %d not found", vp.Fid)
}
// Check for valid offset if we are reading from writable log.
maxFid := vlog.maxFid
// In read-only mode we don't need to check for writable offset as we are not writing anything.
// Moreover, this offset is not set in readonly mode.
if !vlog.opt.ReadOnly && vp.Fid == maxFid {
currentOffset := vlog.woffset()
if vp.Offset >= currentOffset {
return nil, fmt.Errorf(
"Invalid value pointer offset: %d greater than current offset: %d",
vp.Offset, currentOffset)
}
}
ret.lock.RLock()
return ret, nil
}
// Read reads the value log at a given location.
// TODO: Make this read private.
func (vlog *valueLog) Read(vp valuePointer, _ *y.Slice) ([]byte, func(), error) {
buf, lf, err := vlog.readValueBytes(vp)
// The log file is read-locked; decide whether to unlock it immediately or let the caller
// unlock it after it is done with the buffer.
cb := vlog.getUnlockCallback(lf)
if err != nil {
return nil, cb, err
}
if vlog.opt.VerifyValueChecksum {
hash := crc32.New(y.CastagnoliCrcTable)
if _, err := hash.Write(buf[:len(buf)-crc32.Size]); err != nil {
runCallback(cb)
return nil, nil, y.Wrapf(err, "failed to write hash for vp %+v", vp)
}
// Fetch checksum from the end of the buffer.
checksum := buf[len(buf)-crc32.Size:]
if hash.Sum32() != y.BytesToU32(checksum) {
runCallback(cb)
return nil, nil, y.Wrapf(y.ErrChecksumMismatch, "value corrupted for vp: %+v", vp)
}
}
var h header
headerLen := h.Decode(buf)
kv := buf[headerLen:]
if lf.encryptionEnabled() {
kv, err = lf.decryptKV(kv, vp.Offset)
if err != nil {
return nil, cb, err
}
}
if uint32(len(kv)) < h.klen+h.vlen {
vlog.db.opt.Errorf("Invalid read: vp: %+v", vp)
return nil, nil, fmt.Errorf("Invalid read: Len: %d read at:[%d:%d]",
len(kv), h.klen, h.klen+h.vlen)
}
return kv[h.klen : h.klen+h.vlen], cb, nil
}
// getUnlockCallback returns a function that read-unlocks the log file.
// If the log file is nil, it returns nil.
func (vlog *valueLog) getUnlockCallback(lf *logFile) func() {
if lf == nil {
return nil
}
return lf.lock.RUnlock
}
// readValueBytes returns the vlog entry slice and the read-locked log file. The caller is
// responsible for unlocking the logFile.
func (vlog *valueLog) readValueBytes(vp valuePointer) ([]byte, *logFile, error) {
lf, err := vlog.getFileRLocked(vp)
if err != nil {
return nil, nil, err
}
buf, err := lf.read(vp)
y.NumReadsVlogAdd(vlog.db.opt.MetricsEnabled, 1)
y.NumBytesReadsVlogAdd(vlog.db.opt.MetricsEnabled, int64(len(buf)))
return buf, lf, err
}
func (vlog *valueLog) pickLog(discardRatio float64) *logFile {
vlog.filesLock.RLock()
defer vlog.filesLock.RUnlock()
LOOP:
// Pick a candidate that contains the largest amount of discardable data
fid, discard := vlog.discardStats.MaxDiscard()
// MaxDiscard will return fid=0 if it doesn't have any discard data. The
// vlog files start from 1.
if fid == 0 {
vlog.opt.Debugf("No file with discard stats")
return nil
}
lf, ok := vlog.filesMap[fid]
// This file was deleted, but its discard stats increased because of compactions. The file
// doesn't exist, so we don't need to do anything. Skip it and retry.
if !ok {
vlog.discardStats.Update(fid, -1)
goto LOOP
}
// We have a valid file.
fi, err := lf.Fd.Stat()
if err != nil {
vlog.opt.Errorf("Unable to get stats for value log fid: %d err: %+v", fi, err)
return nil
}
if thr := discardRatio * float64(fi.Size()); float64(discard) < thr {
vlog.opt.Debugf("Discard: %d less than threshold: %.0f for file: %s",
discard, thr, fi.Name())
return nil
}
if fid < vlog.maxFid {
vlog.opt.Infof("Found value log max discard fid: %d discard: %d\n", fid, discard)
lf, ok := vlog.filesMap[fid]
y.AssertTrue(ok)
return lf
}
// Don't randomly pick any value log file.
return nil
}
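// For example (illustrative numbers): with discardRatio = 0.5, a 1 GB vlog file needs at least
// 512 MB of discardable data in discardStats to be picked for GC; a file with only 400 MB
// discardable is skipped. Note that the active (maxFid) file is never picked.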
func discardEntry(e Entry, vs y.ValueStruct, db *DB) bool {
if vs.Version != y.ParseTs(e.Key) {
// Version not found. Discard.
return true
}
if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
return true
}
if (vs.Meta & bitValuePointer) == 0 {
// Key also stores the value in LSM. Discard.
return true
}
if (vs.Meta & bitFinTxn) > 0 {
// Just a txn finish entry. Discard.
return true
}
return false
}
func (vlog *valueLog) doRunGC(lf *logFile) error {
_, span := otel.Tracer("").Start(context.TODO(), "Badger.GC")
span.SetAttributes(attribute.String("GC rewrite for", lf.path))
defer span.End()
if err := vlog.rewrite(lf); err != nil {
return err
}
// Remove the file from discardStats.
vlog.discardStats.Update(lf.fid, -1)
return nil
}
func (vlog *valueLog) waitOnGC(lc *z.Closer) {
defer lc.Done()
<-lc.HasBeenClosed() // Wait for lc to be closed.
// Wait for any GC in progress to finish, and don't allow any more GC runs, by filling up
// the channel of size 1.
vlog.garbageCh <- struct{}{}
}
func (vlog *valueLog) runGC(discardRatio float64) error {
select {
case vlog.garbageCh <- struct{}{}:
// Pick a log file for GC.
defer func() {
<-vlog.garbageCh
}()
lf := vlog.pickLog(discardRatio)
if lf == nil {
return ErrNoRewrite
}
return vlog.doRunGC(lf)
default:
return ErrRejected
}
}
func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) {
if vlog.opt.InMemory {
return
}
for fid, discard := range stats {
vlog.discardStats.Update(fid, discard)
}
// The following is to coordinate with some test cases where we want to
// verify that at least one iteration of updateDiscardStats has been completed.
vlog.db.logToSyncChan(updateDiscardStatsMsg)
}
type vlogThreshold struct {
logger Logger
percentile float64
valueThreshold atomic.Int64
valueCh chan []int64
clearCh chan bool
closer *z.Closer
// vlMetrics keeps a running histogram of observed value sizes, used to compute the threshold percentile.
vlMetrics *z.HistogramData
}
func initVlogThreshold(opt *Options) *vlogThreshold {
getBounds := func() []float64 {
mxbd := opt.maxValueThreshold
mnbd := float64(opt.ValueThreshold)
y.AssertTruef(mxbd >= mnbd, "maximum threshold bound is less than the min threshold")
size := math.Min(mxbd-mnbd+1, 1024.0)
bdstp := (mxbd - mnbd) / size
bounds := make([]float64, int64(size))
for i := range bounds {
if i == 0 {
bounds[0] = mnbd
continue
}
if i == int(size-1) {
bounds[i] = mxbd
continue
}
bounds[i] = bounds[i-1] + bdstp
}
return bounds
}
lt := &vlogThreshold{
logger: opt.Logger,
percentile: opt.VLogPercentile,
valueCh: make(chan []int64, 1000),
clearCh: make(chan bool, 1),
closer: z.NewCloser(1),
vlMetrics: z.NewHistogramData(getBounds()),
}
lt.valueThreshold.Store(opt.ValueThreshold)
return lt
}
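// Worked example (illustrative numbers): with ValueThreshold = 100 and maxValueThreshold = 1123,
// getBounds produces min(1123-100+1, 1024) = 1024 histogram buckets stepping from 100 up to
// 1123. listenForValueThresholdUpdate below then moves the threshold to the configured
// VLogPercentile of the observed value sizes.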
func (v *vlogThreshold) Clear(opt Options) {
v.valueThreshold.Store(opt.ValueThreshold)
v.clearCh <- true
}
func (v *vlogThreshold) update(sizes []int64) {
v.valueCh <- sizes
}
func (v *vlogThreshold) close() {
v.closer.SignalAndWait()
}
func (v *vlogThreshold) listenForValueThresholdUpdate() {
defer v.closer.Done()
for {
select {
case <-v.closer.HasBeenClosed():
return
case val := <-v.valueCh:
for _, e := range val {
v.vlMetrics.Update(e)
}
// We recompute the threshold at Options.VLogPercentile so that values whose sizes fall
// within that percentile make it to the LSM tree, and the rest go to the
// value log file.
p := int64(v.vlMetrics.Percentile(v.percentile))
if v.valueThreshold.Load() != p {
if v.logger != nil {
v.logger.Infof("updating value of threshold to: %d", p)
}
v.valueThreshold.Store(p)
}
case <-v.clearCh:
v.vlMetrics.Clear()
}
}
}
