Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce splitstore memory usage during chain walks #6949

Merged
merged 16 commits into from
Aug 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions blockstore/splitstore/markset.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,26 @@ type MarkSet interface {
SetConcurrent()
}

type MarkSetVisitor interface {
MarkSet
ObjectVisitor
}

type MarkSetEnv interface {
// Create creates a new markset within the environment.
// name is a unique name for this markset, mapped to the filesystem in disk-backed environments
// sizeHint is a hint about the expected size of the markset
Create(name string, sizeHint int64) (MarkSet, error)
// CreateVisitor is like Create, but returns a wider interface that supports atomic visits.
vyzo marked this conversation as resolved.
Show resolved Hide resolved
// It may not be supported by some markset types (e.g. bloom).
CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error)
vyzo marked this conversation as resolved.
Show resolved Hide resolved
// SupportsVisitor returns true if the marksets created by this environment support the visitor interface.
SupportsVisitor() bool
Close() error
}

func OpenMarkSetEnv(path string, mtype string) (MarkSetEnv, error) {
switch mtype {
case "bloom":
return NewBloomMarkSetEnv()
case "map":
return NewMapMarkSetEnv()
case "badger":
Expand Down
260 changes: 184 additions & 76 deletions blockstore/splitstore/markset_badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ type BadgerMarkSet struct {
writing map[int]map[string]struct{}
writers int
seqno int
version int

db *badger.DB
path string
}

var _ MarkSet = (*BadgerMarkSet)(nil)
var _ MarkSetVisitor = (*BadgerMarkSet)(nil)

var badgerMarkSetBatchSize = 16384

Expand All @@ -46,39 +48,13 @@ func NewBadgerMarkSetEnv(path string) (MarkSetEnv, error) {
return &BadgerMarkSetEnv{path: msPath}, nil
}

func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
func (e *BadgerMarkSetEnv) create(name string, sizeHint int64) (*BadgerMarkSet, error) {
name += ".tmp"
path := filepath.Join(e.path, name)

// clean up first
err := os.RemoveAll(path)
vyzo marked this conversation as resolved.
Show resolved Hide resolved
db, err := openTransientBadgerDB(path)
if err != nil {
return nil, xerrors.Errorf("error clearing markset directory: %w", err)
}

err = os.MkdirAll(path, 0755) //nolint:gosec
if err != nil {
return nil, xerrors.Errorf("error creating markset directory: %w", err)
}

opts := badger.DefaultOptions(path)
opts.SyncWrites = false
opts.CompactL0OnClose = false
opts.Compression = options.None
// Note: We use FileIO for loading modes to avoid memory thrashing and interference
// between the system blockstore and the markset.
// It was observed that using the default memory mapped option resulted in
// significant interference and unacceptably high block validation times once the markset
// exceeded 1GB in size.
opts.TableLoadingMode = options.FileIO
opts.ValueLogLoadingMode = options.FileIO
opts.Logger = &badgerLogger{
SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(),
skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(),
}

db, err := badger.Open(opts)
if err != nil {
return nil, xerrors.Errorf("error creating badger markset: %w", err)
return nil, xerrors.Errorf("error creating badger db: %w", err)
}

ms := &BadgerMarkSet{
Expand All @@ -92,86 +68,123 @@ func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error)
return ms, nil
}

func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
return e.create(name, sizeHint)
}

func (e *BadgerMarkSetEnv) CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) {
return e.create(name, sizeHint)
}

func (e *BadgerMarkSetEnv) SupportsVisitor() bool { return true }

func (e *BadgerMarkSetEnv) Close() error {
return os.RemoveAll(e.path)
}

func (s *BadgerMarkSet) Mark(c cid.Cid) error {
s.mx.Lock()

if s.pend == nil {
s.mx.Unlock()
return errMarkSetClosed
}

s.pend[string(c.Hash())] = struct{}{}
write, seqno := s.put(string(c.Hash()))
s.mx.Unlock()

if len(s.pend) < badgerMarkSetBatchSize {
s.mx.Unlock()
return nil
if write {
return s.write(seqno)
}

pend := s.pend
seqno := s.seqno
s.seqno++
s.writing[seqno] = pend
s.pend = make(map[string]struct{})
s.writers++
s.mx.Unlock()
return nil
}

defer func() {
s.mx.Lock()
defer s.mx.Unlock()
func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) {
s.mx.RLock()
defer s.mx.RUnlock()

delete(s.writing, seqno)
s.writers--
if s.writers == 0 {
s.cond.Broadcast()
}
}()
key := c.Hash()
pendKey := string(key)

empty := []byte{} // not nil
has, err := s.tryPending(pendKey)
if has || err != nil {
return has, err
}

batch := s.db.NewWriteBatch()
defer batch.Cancel()
return s.tryDB(key)
}

for k := range pend {
if err := batch.Set([]byte(k), empty); err != nil {
return err
func (s *BadgerMarkSet) Visit(c cid.Cid) (bool, error) {
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
key := c.Hash()
pendKey := string(key)

s.mx.RLock()

has, err := s.tryPending(pendKey)
if has || err != nil {
s.mx.RUnlock()
return false, err
}

has, err = s.tryDB(key)
if has || err != nil {
s.mx.RUnlock()
return false, err
}

// we need to upgrade the lock to exclusive in order to write; take the version count to see
// if there was another write while we were upgrading
version := s.version
s.mx.RUnlock()

s.mx.Lock()
// we have to do the check dance again
has, err = s.tryPending(pendKey)
if has || err != nil {
s.mx.Unlock()
return false, err
}

if version != s.version {
// something was written to the db, we need to check it
has, err = s.tryDB(key)
if has || err != nil {
s.mx.Unlock()
return false, err
vyzo marked this conversation as resolved.
Show resolved Hide resolved
}
}

err := batch.Flush()
if err != nil {
return xerrors.Errorf("error flushing batch to badger markset: %w", err)
write, seqno := s.put(pendKey)
s.mx.Unlock()

if write {
err = s.write(seqno)
}

return nil
return true, err
}

func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) {
s.mx.RLock()
defer s.mx.RUnlock()

// reader holds the (r)lock
func (s *BadgerMarkSet) tryPending(key string) (has bool, err error) {
if s.pend == nil {
return false, errMarkSetClosed
}

key := c.Hash()
pendKey := string(key)
_, ok := s.pend[pendKey]
if ok {
if _, ok := s.pend[key]; ok {
return true, nil
}

for _, wr := range s.writing {
_, ok := wr[pendKey]
if ok {
if _, ok := wr[key]; ok {
return true, nil
}
}

err := s.db.View(func(txn *badger.Txn) error {
return false, nil
}

func (s *BadgerMarkSet) tryDB(key []byte) (has bool, err error) {
err = s.db.View(func(txn *badger.Txn) error {
_, err := txn.Get(key)
return err
})
Expand All @@ -184,10 +197,70 @@ func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) {
return false, nil

default:
return false, xerrors.Errorf("error checking badger markset: %w", err)
return false, err
}
}

// writer holds the exclusive lock
func (s *BadgerMarkSet) put(key string) (write bool, seqno int) {
s.pend[key] = struct{}{}
if len(s.pend) < badgerMarkSetBatchSize {
return false, 0
}

seqno = s.seqno
s.seqno++
s.writing[seqno] = s.pend
s.pend = make(map[string]struct{})
vyzo marked this conversation as resolved.
Show resolved Hide resolved

return true, seqno
}

func (s *BadgerMarkSet) write(seqno int) (err error) {
s.mx.Lock()
if s.pend == nil {
s.mx.Unlock()
return errMarkSetClosed
}

pend := s.writing[seqno]
s.writers++
vyzo marked this conversation as resolved.
Show resolved Hide resolved
s.mx.Unlock()

defer func() {
s.mx.Lock()
defer s.mx.Unlock()

if err == nil {
delete(s.writing, seqno)
s.version++
}

s.writers--
if s.writers == 0 {
s.cond.Broadcast()
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
}
}()

empty := []byte{} // not nil

batch := s.db.NewWriteBatch()
defer batch.Cancel()

for k := range pend {
if err = batch.Set([]byte(k), empty); err != nil {
return xerrors.Errorf("error setting batch: %w", err)
}
}

err = batch.Flush()
if err != nil {
return xerrors.Errorf("error flushing batch to badger markset: %w", err)
}

return nil
}

func (s *BadgerMarkSet) Close() error {
s.mx.Lock()
defer s.mx.Unlock()
Expand All @@ -204,21 +277,56 @@ func (s *BadgerMarkSet) Close() error {
db := s.db
s.db = nil

return closeTransientBadgerDB(db, s.path)
}

func (s *BadgerMarkSet) SetConcurrent() {}

func openTransientBadgerDB(path string) (*badger.DB, error) {
// clean up first
err := os.RemoveAll(path)
if err != nil {
return nil, xerrors.Errorf("error clearing markset directory: %w", err)
}

err = os.MkdirAll(path, 0755) //nolint:gosec
if err != nil {
return nil, xerrors.Errorf("error creating markset directory: %w", err)
}

opts := badger.DefaultOptions(path)
opts.SyncWrites = false
opts.CompactL0OnClose = false
opts.Compression = options.None
// Note: We use FileIO for loading modes to avoid memory thrashing and interference
// between the system blockstore and the markset.
// It was observed that using the default memory mapped option resulted in
// significant interference and unacceptably high block validation times once the markset
// exceeded 1GB in size.
opts.TableLoadingMode = options.FileIO
opts.ValueLogLoadingMode = options.FileIO
opts.Logger = &badgerLogger{
SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(),
skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(),
}

return badger.Open(opts)
}

func closeTransientBadgerDB(db *badger.DB, path string) error {
err := db.Close()
if err != nil {
return xerrors.Errorf("error closing badger markset: %w", err)
}

err = os.RemoveAll(s.path)
err = os.RemoveAll(path)
if err != nil {
return xerrors.Errorf("error deleting badger markset: %w", err)
}

return nil
}

func (s *BadgerMarkSet) SetConcurrent() {}

// badger logging through go-log
type badgerLogger struct {
*zap.SugaredLogger
Expand Down
Loading