-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #6833 from filecoin-project/feat/splitstore-badger…
…-markset Splitstore: support on-disk marksets using badger
- Loading branch information
Showing
7 changed files
with
288 additions
and
32 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,230 @@ | ||
package splitstore | ||
|
||
import ( | ||
"os" | ||
"path/filepath" | ||
"sync" | ||
|
||
"golang.org/x/xerrors" | ||
|
||
"github.com/dgraph-io/badger/v2" | ||
"github.com/dgraph-io/badger/v2/options" | ||
"go.uber.org/zap" | ||
|
||
cid "github.com/ipfs/go-cid" | ||
) | ||
|
||
type BadgerMarkSetEnv struct { | ||
path string | ||
} | ||
|
||
var _ MarkSetEnv = (*BadgerMarkSetEnv)(nil) | ||
|
||
type BadgerMarkSet struct { | ||
mx sync.RWMutex | ||
cond sync.Cond | ||
pend map[string]struct{} | ||
writing map[int]map[string]struct{} | ||
writers int | ||
seqno int | ||
|
||
db *badger.DB | ||
path string | ||
} | ||
|
||
var _ MarkSet = (*BadgerMarkSet)(nil) | ||
|
||
var badgerMarkSetBatchSize = 16384 | ||
|
||
func NewBadgerMarkSetEnv(path string) (MarkSetEnv, error) { | ||
msPath := filepath.Join(path, "markset.badger") | ||
err := os.MkdirAll(msPath, 0755) //nolint:gosec | ||
if err != nil { | ||
return nil, xerrors.Errorf("error creating markset directory: %w", err) | ||
} | ||
|
||
return &BadgerMarkSetEnv{path: msPath}, nil | ||
} | ||
|
||
func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) { | ||
path := filepath.Join(e.path, name) | ||
|
||
// clean up first | ||
err := os.RemoveAll(path) | ||
if err != nil { | ||
return nil, xerrors.Errorf("error clearing markset directory: %w", err) | ||
} | ||
|
||
err = os.MkdirAll(path, 0755) //nolint:gosec | ||
if err != nil { | ||
return nil, xerrors.Errorf("error creating markset directory: %w", err) | ||
} | ||
|
||
opts := badger.DefaultOptions(path) | ||
opts.SyncWrites = false | ||
opts.CompactL0OnClose = false | ||
opts.Compression = options.None | ||
// Note: We use FileIO for loading modes to avoid memory thrashing and interference | ||
// between the system blockstore and the markset. | ||
// It was observed that using the default memory mapped option resulted in | ||
// significant interference and unacceptably high block validation times once the markset | ||
// exceeded 1GB in size. | ||
opts.TableLoadingMode = options.FileIO | ||
opts.ValueLogLoadingMode = options.FileIO | ||
opts.Logger = &badgerLogger{ | ||
SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(), | ||
skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(), | ||
} | ||
|
||
db, err := badger.Open(opts) | ||
if err != nil { | ||
return nil, xerrors.Errorf("error creating badger markset: %w", err) | ||
} | ||
|
||
ms := &BadgerMarkSet{ | ||
pend: make(map[string]struct{}), | ||
writing: make(map[int]map[string]struct{}), | ||
db: db, | ||
path: path, | ||
} | ||
ms.cond.L = &ms.mx | ||
|
||
return ms, nil | ||
} | ||
|
||
func (e *BadgerMarkSetEnv) Close() error { | ||
return os.RemoveAll(e.path) | ||
} | ||
|
||
func (s *BadgerMarkSet) Mark(c cid.Cid) error { | ||
s.mx.Lock() | ||
|
||
if s.pend == nil { | ||
s.mx.Unlock() | ||
return errMarkSetClosed | ||
} | ||
|
||
s.pend[string(c.Hash())] = struct{}{} | ||
|
||
if len(s.pend) < badgerMarkSetBatchSize { | ||
s.mx.Unlock() | ||
return nil | ||
} | ||
|
||
pend := s.pend | ||
seqno := s.seqno | ||
s.seqno++ | ||
s.writing[seqno] = pend | ||
s.pend = make(map[string]struct{}) | ||
s.writers++ | ||
s.mx.Unlock() | ||
|
||
defer func() { | ||
s.mx.Lock() | ||
defer s.mx.Unlock() | ||
|
||
delete(s.writing, seqno) | ||
s.writers-- | ||
if s.writers == 0 { | ||
s.cond.Broadcast() | ||
} | ||
}() | ||
|
||
empty := []byte{} // not nil | ||
|
||
batch := s.db.NewWriteBatch() | ||
defer batch.Cancel() | ||
|
||
for k := range pend { | ||
if err := batch.Set([]byte(k), empty); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
err := batch.Flush() | ||
if err != nil { | ||
return xerrors.Errorf("error flushing batch to badger markset: %w", err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) { | ||
s.mx.RLock() | ||
defer s.mx.RUnlock() | ||
|
||
if s.pend == nil { | ||
return false, errMarkSetClosed | ||
} | ||
|
||
key := c.Hash() | ||
pendKey := string(key) | ||
_, ok := s.pend[pendKey] | ||
if ok { | ||
return true, nil | ||
} | ||
|
||
for _, wr := range s.writing { | ||
_, ok := wr[pendKey] | ||
if ok { | ||
return true, nil | ||
} | ||
} | ||
|
||
err := s.db.View(func(txn *badger.Txn) error { | ||
_, err := txn.Get(key) | ||
return err | ||
}) | ||
|
||
switch err { | ||
case nil: | ||
return true, nil | ||
|
||
case badger.ErrKeyNotFound: | ||
return false, nil | ||
|
||
default: | ||
return false, xerrors.Errorf("error checking badger markset: %w", err) | ||
} | ||
} | ||
|
||
func (s *BadgerMarkSet) Close() error { | ||
s.mx.Lock() | ||
defer s.mx.Unlock() | ||
|
||
if s.pend == nil { | ||
return nil | ||
} | ||
|
||
for s.writers > 0 { | ||
s.cond.Wait() | ||
} | ||
|
||
s.pend = nil | ||
db := s.db | ||
s.db = nil | ||
|
||
err := db.Close() | ||
if err != nil { | ||
return xerrors.Errorf("error closing badger markset: %w", err) | ||
} | ||
|
||
err = os.RemoveAll(s.path) | ||
if err != nil { | ||
return xerrors.Errorf("error deleting badger markset: %w", err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (s *BadgerMarkSet) SetConcurrent() {} | ||
|
||
// badger logging through go-log | ||
type badgerLogger struct { | ||
*zap.SugaredLogger | ||
skip2 *zap.SugaredLogger | ||
} | ||
|
||
func (b *badgerLogger) Warningf(format string, args ...interface{}) {} | ||
func (b *badgerLogger) Infof(format string, args ...interface{}) {} | ||
func (b *badgerLogger) Debugf(format string, args ...interface{}) {} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters