Skip to content

Commit

Permalink
Merge pull request #6775 from filecoin-project/feat/splitstore-hot-me…
Browse files Browse the repository at this point in the history
…ssages

Splitstore: add retention policy option for keeping messages in the hotstore
  • Loading branch information
magik6k authored Jul 22, 2021
2 parents c8776d7 + da5aeda commit 0c02207
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 10 deletions.
7 changes: 7 additions & 0 deletions blockstore/splitstore/splitstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ type Config struct {
// which skips moving (as it is a noop, but still takes time to read all the cold objects)
// and directly purges cold blocks.
DiscardColdBlocks bool

// HotstoreMessageRetention indicates the hotstore retention policy for messages.
// It has the following semantics:
// - a value of 0 will only retain messages within the compaction boundary (4 finalities)
// - a positive integer indicates the number of finalities, outside the compaction boundary,
// for which messages will be retained in the hotstore.
HotStoreMessageRetention uint64
}

// ChainAccessor allows the Splitstore to access the chain. It will most likely
Expand Down
32 changes: 25 additions & 7 deletions blockstore/splitstore/splitstore_compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,13 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
currentEpoch := curTs.Height()
boundaryEpoch := currentEpoch - CompactionBoundary

log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch, "compactionIndex", s.compactionIndex)
var inclMsgsEpoch abi.ChainEpoch
inclMsgsRange := abi.ChainEpoch(s.cfg.HotStoreMessageRetention) * build.Finality
if inclMsgsRange < boundaryEpoch {
inclMsgsEpoch = boundaryEpoch - inclMsgsRange
}

log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch, "inclMsgsEpoch", inclMsgsEpoch, "compactionIndex", s.compactionIndex)

markSet, err := s.markSetEnv.Create("live", s.markSetSize)
if err != nil {
Expand Down Expand Up @@ -430,7 +436,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
startMark := time.Now()

var count int64
err = s.walkChain(curTs, boundaryEpoch, true,
err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch,
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk
Expand Down Expand Up @@ -625,7 +631,7 @@ func (s *SplitStore) endTxnProtect() {
s.txnMissing = nil
}

func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMsgs bool,
func (s *SplitStore) walkChain(ts *types.TipSet, inclState abi.ChainEpoch, inclMsgs abi.ChainEpoch,
f func(cid.Cid) error) error {
visited := cid.NewSet()
walked := cid.NewSet()
Expand Down Expand Up @@ -653,14 +659,25 @@ func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMs
return xerrors.Errorf("error unmarshaling block header (cid: %s): %w", c, err)
}

// we only scan the block if it is at or above the boundary
if hdr.Height >= boundary || hdr.Height == 0 {
scanCnt++
if inclMsgs && hdr.Height > 0 {
// message are retained if within the inclMsgs boundary
if hdr.Height >= inclMsgs && hdr.Height > 0 {
if inclMsgs < inclState {
// we need to use walkObjectIncomplete here, as messages may be missing early on if we
// synced from snapshot and have a long HotStoreMessageRetentionPolicy.
stopWalk := func(_ cid.Cid) error { return errStopWalk }
if err := s.walkObjectIncomplete(hdr.Messages, walked, f, stopWalk); err != nil {
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
}
} else {
if err := s.walkObject(hdr.Messages, walked, f); err != nil {
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
}
}
}

// state and message receipts is only retained if within the inclState boundary
if hdr.Height >= inclState || hdr.Height == 0 {
if hdr.Height > 0 {
if err := s.walkObject(hdr.ParentMessageReceipts, walked, f); err != nil {
return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
}
Expand All @@ -669,6 +686,7 @@ func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMs
if err := s.walkObject(hdr.ParentStateRoot, walked, f); err != nil {
return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err)
}
scanCnt++
}

if hdr.Height > 0 {
Expand Down
2 changes: 1 addition & 1 deletion blockstore/splitstore/splitstore_warmup.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
count := int64(0)
xcount := int64(0)
missing := int64(0)
err := s.walkChain(curTs, epoch, false,
err := s.walkChain(curTs, epoch, epoch+1, // we don't load messages in warmup
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk
Expand Down
2 changes: 2 additions & 0 deletions node/config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ type Splitstore struct {
ColdStoreType string
HotStoreType string
MarkSetType string

HotStoreMessageRetention uint64
}

// // Full Node
Expand Down
5 changes: 3 additions & 2 deletions node/modules/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,9 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked
}

cfg := &splitstore.Config{
MarkSetType: cfg.Splitstore.MarkSetType,
DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard",
MarkSetType: cfg.Splitstore.MarkSetType,
DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard",
HotStoreMessageRetention: cfg.Splitstore.HotStoreMessageRetention,
}
ss, err := splitstore.Open(path, ds, hot, cold, cfg)
if err != nil {
Expand Down

0 comments on commit 0c02207

Please sign in to comment.