Skip to content

Commit

Permalink
sync access to fast node cache to avoid concurrent write fatal error (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
p0mvn authored Feb 21, 2022
1 parent 26345cb commit 2a1daf4
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
2 changes: 0 additions & 2 deletions mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,6 @@ func (tree *MutableTree) getUnsavedFastNodeRemovals() map[string]interface{} {
func (tree *MutableTree) addUnsavedAddition(key []byte, node *FastNode) {
delete(tree.unsavedFastNodeRemovals, string(key))
tree.unsavedFastNodeAdditions[string(key)] = node
tree.ndb.cacheFastNode(node)
}

func (tree *MutableTree) saveFastNodeAdditions() error {
Expand All @@ -810,7 +809,6 @@ func (tree *MutableTree) saveFastNodeAdditions() error {
func (tree *MutableTree) addUnsavedRemoval(key []byte) {
delete(tree.unsavedFastNodeAdditions, string(key))
tree.unsavedFastNodeRemovals[string(key)] = true
tree.ndb.uncacheFastNode(key)
}

func (tree *MutableTree) saveFastNodeRemovals() error {
Expand Down
19 changes: 13 additions & 6 deletions nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,12 @@ func (ndb *nodeDB) GetNode(hash []byte) *Node {
}

func (ndb *nodeDB) GetFastNode(key []byte) (*FastNode, error) {
ndb.mtx.Lock()
defer ndb.mtx.Unlock()
if !ndb.hasUpgradedToFastStorage() {
return nil, errors.New("storage version is not fast")
}

ndb.mtx.Lock()
defer ndb.mtx.Unlock()

if len(key) == 0 {
return nil, fmt.Errorf("nodeDB.GetFastNode() requires key, len(key) equals 0")
}
Expand Down Expand Up @@ -233,6 +232,9 @@ func (ndb *nodeDB) SaveFastNodeNoCache(node *FastNode) error {
// 1.1.0-<version of the current live state>. Returns error if storage version is incorrect or on
// db error, nil otherwise. Requires changes to be comitted after to be persisted.
func (ndb *nodeDB) setFastStorageVersionToBatch() error {
ndb.mtx.Lock()
defer ndb.mtx.Unlock()

var newVersion string
if ndb.storageVersion >= fastStorageVersionValue {
// Storage version should be at index 0 and latest fast cache version at index 1
Expand Down Expand Up @@ -270,6 +272,8 @@ func (ndb *nodeDB) hasUpgradedToFastStorage() bool {
// We determine this by checking the version of the live state and the version of the live state when
// latest storage was updated on disk the last time.
func (ndb *nodeDB) shouldForceFastStorageUpgrade() bool {
ndb.mtx.Lock()
defer ndb.mtx.Unlock()
versions := strings.Split(ndb.storageVersion, fastStorageVersionDelimiter)

if len(versions) == 2 {
Expand All @@ -281,6 +285,7 @@ func (ndb *nodeDB) shouldForceFastStorageUpgrade() bool {
}

// SaveNode saves a FastNode to disk.
// CONTRACT: the caller must serizlize access to this method through ndb.mtx.
func (ndb *nodeDB) saveFastNodeUnlocked(node *FastNode, shouldAddToCache bool) error {
if node.key == nil {
return fmt.Errorf("FastNode cannot have a nil value for key")
Expand Down Expand Up @@ -435,7 +440,6 @@ func (ndb *nodeDB) DeleteVersionsFrom(version int64) error {
if err = ndb.batch.Delete(ndb.nodeKey(hash)); err != nil {
return err
}
ndb.uncacheNode(hash)
} else if toVersion >= version-1 {
if err := ndb.batch.Delete(key); err != nil {
return err
Expand Down Expand Up @@ -470,10 +474,9 @@ func (ndb *nodeDB) DeleteVersionsFrom(version int64) error {
}

if version <= fastNode.versionLastUpdatedAt {
if err = ndb.batch.Delete(keyWithPrefix); err != nil {
if err := ndb.DeleteFastNode(fastNode.key); err != nil {
return err
}
ndb.uncacheFastNode(key)
}
return nil
})
Expand Down Expand Up @@ -559,6 +562,8 @@ func (ndb *nodeDB) DeleteVersionsRange(fromVersion, toVersion int64) error {
}

func (ndb *nodeDB) DeleteFastNode(key []byte) error {
ndb.mtx.Lock()
defer ndb.mtx.Unlock()
if err := ndb.batch.Delete(ndb.fastNodeKey(key)); err != nil {
return err
}
Expand Down Expand Up @@ -830,6 +835,7 @@ func (ndb *nodeDB) cacheNode(node *Node) {
}
}

// CONTRACT: the caller must serizlize access to this method through ndb.mtx.
func (ndb *nodeDB) uncacheFastNode(key []byte) {
if elem, ok := ndb.fastNodeCache[string(key)]; ok {
ndb.fastNodeCacheQueue.Remove(elem)
Expand All @@ -839,6 +845,7 @@ func (ndb *nodeDB) uncacheFastNode(key []byte) {

// Add a node to the cache and pop the least recently used node if we've
// reached the cache size limit.
// CONTRACT: the caller must serizlize access to this method through ndb.mtx.
func (ndb *nodeDB) cacheFastNode(node *FastNode) {
elem := ndb.fastNodeCacheQueue.PushBack(node)
ndb.fastNodeCache[string(node.key)] = elem
Expand Down

0 comments on commit 2a1daf4

Please sign in to comment.