From 2a1daf4268e04623d8e709b2b9b1ba22918504ce Mon Sep 17 00:00:00 2001 From: Roman <34196718+p0mvn@users.noreply.github.com> Date: Mon, 21 Feb 2022 16:50:47 -0500 Subject: [PATCH] sync access to fast node cache to avoid concurrent write fatal error (#23) --- mutable_tree.go | 2 -- nodedb.go | 19 +++++++++++++------ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/mutable_tree.go b/mutable_tree.go index 078c6eb2b..25d26366a 100644 --- a/mutable_tree.go +++ b/mutable_tree.go @@ -789,7 +789,6 @@ func (tree *MutableTree) getUnsavedFastNodeRemovals() map[string]interface{} { func (tree *MutableTree) addUnsavedAddition(key []byte, node *FastNode) { delete(tree.unsavedFastNodeRemovals, string(key)) tree.unsavedFastNodeAdditions[string(key)] = node - tree.ndb.cacheFastNode(node) } func (tree *MutableTree) saveFastNodeAdditions() error { @@ -810,7 +809,6 @@ func (tree *MutableTree) saveFastNodeAdditions() error { func (tree *MutableTree) addUnsavedRemoval(key []byte) { delete(tree.unsavedFastNodeAdditions, string(key)) tree.unsavedFastNodeRemovals[string(key)] = true - tree.ndb.uncacheFastNode(key) } func (tree *MutableTree) saveFastNodeRemovals() error { diff --git a/nodedb.go b/nodedb.go index 7bee811c5..61cb7501a 100644 --- a/nodedb.go +++ b/nodedb.go @@ -151,13 +151,12 @@ func (ndb *nodeDB) GetNode(hash []byte) *Node { } func (ndb *nodeDB) GetFastNode(key []byte) (*FastNode, error) { + ndb.mtx.Lock() + defer ndb.mtx.Unlock() if !ndb.hasUpgradedToFastStorage() { return nil, errors.New("storage version is not fast") } - ndb.mtx.Lock() - defer ndb.mtx.Unlock() - if len(key) == 0 { return nil, fmt.Errorf("nodeDB.GetFastNode() requires key, len(key) equals 0") } @@ -233,6 +232,9 @@ func (ndb *nodeDB) SaveFastNodeNoCache(node *FastNode) error { // 1.1.0-. Returns error if storage version is incorrect or on // db error, nil otherwise. Requires changes to be comitted after to be persisted. func (ndb *nodeDB) setFastStorageVersionToBatch() error { + ndb.mtx.Lock() + defer ndb.mtx.Unlock() + var newVersion string if ndb.storageVersion >= fastStorageVersionValue { // Storage version should be at index 0 and latest fast cache version at index 1 @@ -270,6 +272,8 @@ func (ndb *nodeDB) hasUpgradedToFastStorage() bool { // We determine this by checking the version of the live state and the version of the live state when // latest storage was updated on disk the last time. func (ndb *nodeDB) shouldForceFastStorageUpgrade() bool { + ndb.mtx.Lock() + defer ndb.mtx.Unlock() versions := strings.Split(ndb.storageVersion, fastStorageVersionDelimiter) if len(versions) == 2 { @@ -281,6 +285,7 @@ func (ndb *nodeDB) shouldForceFastStorageUpgrade() bool { } // SaveNode saves a FastNode to disk. +// CONTRACT: the caller must serizlize access to this method through ndb.mtx. func (ndb *nodeDB) saveFastNodeUnlocked(node *FastNode, shouldAddToCache bool) error { if node.key == nil { return fmt.Errorf("FastNode cannot have a nil value for key") @@ -435,7 +440,6 @@ func (ndb *nodeDB) DeleteVersionsFrom(version int64) error { if err = ndb.batch.Delete(ndb.nodeKey(hash)); err != nil { return err } - ndb.uncacheNode(hash) } else if toVersion >= version-1 { if err := ndb.batch.Delete(key); err != nil { return err @@ -470,10 +474,9 @@ func (ndb *nodeDB) DeleteVersionsFrom(version int64) error { } if version <= fastNode.versionLastUpdatedAt { - if err = ndb.batch.Delete(keyWithPrefix); err != nil { + if err := ndb.DeleteFastNode(fastNode.key); err != nil { return err } - ndb.uncacheFastNode(key) } return nil }) @@ -559,6 +562,8 @@ func (ndb *nodeDB) DeleteVersionsRange(fromVersion, toVersion int64) error { } func (ndb *nodeDB) DeleteFastNode(key []byte) error { + ndb.mtx.Lock() + defer ndb.mtx.Unlock() if err := ndb.batch.Delete(ndb.fastNodeKey(key)); err != nil { return err } @@ -830,6 +835,7 @@ func (ndb *nodeDB) cacheNode(node *Node) { } } +// CONTRACT: the caller must serizlize access to this method through ndb.mtx. func (ndb *nodeDB) uncacheFastNode(key []byte) { if elem, ok := ndb.fastNodeCache[string(key)]; ok { ndb.fastNodeCacheQueue.Remove(elem) @@ -839,6 +845,7 @@ func (ndb *nodeDB) uncacheFastNode(key []byte) { // Add a node to the cache and pop the least recently used node if we've // reached the cache size limit. +// CONTRACT: the caller must serizlize access to this method through ndb.mtx. func (ndb *nodeDB) cacheFastNode(node *FastNode) { elem := ndb.fastNodeCacheQueue.PushBack(node) ndb.fastNodeCache[string(node.key)] = elem