From 210423cc679b874b4d9bd4c067fe693f3aea8f08 Mon Sep 17 00:00:00 2001
From: Fred Carle
Date: Mon, 16 Jan 2023 20:43:53 -0500
Subject: [PATCH] fix: Improve DAG sync with highly concurrent updates (#1031)

Relevant issue(s)

Resolves #1029
Resolves #1030

Description

This PR resolves the eventual consistency bug where highly concurrent updates
could cause the DAG syncer to fail to reach consistency. It also adds a simple
retry mechanism to avoid sync failures when there is a transaction conflict on
the head store.
---
 cli/start.go                  | 10 ++
 client/db.go                  |  2 +
 client/errors.go              |  8 ++
 config/config.go              |  8 +-
 config/configfile.go          |  1 +
 config/configfile_test.go     |  2 +
 core/crdt/base.go             |  2 +-
 core/crdt/base_test.go        |  2 +-
 core/crdt/lwwreg.go           |  5 +
 core/crdt/lwwreg_test.go      |  1 +
 db/db.go                      | 23 +++++
 .../i1031-improve-dag-sync.md |  3 +
 net/process.go                | 92 ++++++++++---------
 13 files changed, 113 insertions(+), 46 deletions(-)
 create mode 100644 docs/data_format_changes/i1031-improve-dag-sync.md

diff --git a/cli/start.go b/cli/start.go
index 49f5494627..b91fca39f3 100644
--- a/cli/start.go
+++ b/cli/start.go
@@ -90,6 +90,15 @@ func init() {
 		log.FeedbackFatalE(context.Background(), "Could not bind net.peers", err)
 	}
 
+	startCmd.Flags().Int(
+		"max-txn-retries", cfg.Datastore.MaxTxnRetries,
+		"Specify the maximum number of retries per transaction",
+	)
+	err = viper.BindPFlag("datastore.maxtxnretries", startCmd.Flags().Lookup("max-txn-retries"))
+	if err != nil {
+		log.FeedbackFatalE(context.Background(), "Could not bind datastore.maxtxnretries", err)
+	}
+
 	startCmd.Flags().String(
 		"store", cfg.Datastore.Store,
 		"Specify the datastore to use (supported: badger, memory)",
@@ -228,6 +237,7 @@ func start(ctx context.Context) (*defraInstance, error) {
 
 	options := []db.Option{
 		db.WithUpdateEvents(),
+		db.WithMaxRetries(cfg.Datastore.MaxTxnRetries),
 	}
 
 	db, err := db.NewDB(ctx, rootstore, options...)
diff --git a/client/db.go b/client/db.go
index 7d13382bb9..ec032eab2a 100644
--- a/client/db.go
+++ b/client/db.go
@@ -37,6 +37,8 @@ type DB interface {
 
 	Events() events.Events
 
+	MaxTxnRetries() int
+
 	PrintDump(ctx context.Context) error
 
 	// SetReplicator adds a replicator to the persisted list or adds
diff --git a/client/errors.go b/client/errors.go
index 434dfe78c8..5e55c22710 100644
--- a/client/errors.go
+++ b/client/errors.go
@@ -22,6 +22,7 @@ const (
 	errUnexpectedType       string = "unexpected type"
 	errParsingFailed        string = "failed to parse argument"
 	errUninitializeProperty string = "invalid state, required property is uninitialized"
+	errMaxTxnRetries        string = "reached maximum transaction retries"
 )
 
 // Errors returnable from this package.
@@ -43,6 +44,7 @@ var (
 	ErrInvalidDeleteTarget  = errors.New("the target document to delete is of invalid type")
 	ErrMalformedDocKey      = errors.New("malformed DocKey, missing either version or cid")
 	ErrInvalidDocKeyVersion = errors.New("invalid DocKey version")
+	ErrMaxTxnRetries        = errors.New(errMaxTxnRetries)
 )
 
 // NewErrFieldNotExist returns an error indicating that the given field does not exist.
@@ -97,3 +99,9 @@ func NewErrUninitializeProperty(host string, propertyName string) error {
 		errors.NewKV("PropertyName", propertyName),
 	)
 }
+
+// NewErrMaxTxnRetries returns an error indicating that the maximum number of
+// transaction retries has been reached.
+func NewErrMaxTxnRetries(inner error) error {
+	return errors.Wrap(errMaxTxnRetries, inner)
+}
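
Taken together, the cli/start.go, client/db.go, and client/errors.go changes above expose the retry limit to callers: it is set once through `db.WithMaxRetries` and read back through `client.DB.MaxTxnRetries`. Below is a minimal sketch of how an embedding application might wire this up, assuming the usual `github.com/sourcenetwork/defradb/...` import paths; the helper name is hypothetical and not part of this patch.

```go
package example

import (
	"context"
	"log"

	"github.com/sourcenetwork/defradb/client"
	"github.com/sourcenetwork/defradb/datastore"
	"github.com/sourcenetwork/defradb/db"
)

// openWithRetryLimit is a hypothetical helper showing how the new option and
// accessor fit together: the limit is configured once at construction time
// and later read back by any component that needs to retry conflicting
// transactions.
func openWithRetryLimit(ctx context.Context, rootstore datastore.RootStore, retries int) (client.DB, error) {
	d, err := db.NewDB(ctx, rootstore, db.WithUpdateEvents(), db.WithMaxRetries(retries))
	if err != nil {
		return nil, err
	}
	log.Printf("transactions will be retried up to %d times", d.MaxTxnRetries())
	return d, nil
}
```

When the option is omitted, the accessor falls back to the package default of 5 (`defaultMaxTxnRetries` in db/db.go below).
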
diff --git a/config/config.go b/config/config.go
index 6a150da8c9..aad992af43 100644
--- a/config/config.go
+++ b/config/config.go
@@ -190,9 +190,10 @@ func (cfg *Config) setBadgerVLogMaxSize() {
 
 // DatastoreConfig configures datastores.
 type DatastoreConfig struct {
-	Store  string
-	Memory MemoryConfig
-	Badger BadgerConfig
+	Store         string
+	Memory        MemoryConfig
+	Badger        BadgerConfig
+	MaxTxnRetries int
 }
 
 // BadgerConfig configures Badger's on-disk / filesystem mode.
@@ -289,6 +290,7 @@ func defaultDatastoreConfig() *DatastoreConfig {
 			ValueLogFileSize: 1 * GiB,
 			Options:          &opts,
 		},
+		MaxTxnRetries: 5,
 	}
 }
 
diff --git a/config/configfile.go b/config/configfile.go
index bdd2dad5b9..cbf3898f84 100644
--- a/config/configfile.go
+++ b/config/configfile.go
@@ -59,6 +59,7 @@ datastore:
   # Maximum file size of the value log files. The in-memory file size will be 2*valuelogfilesize.
   # Human friendly units can be used (ex: 500MB).
   valuelogfilesize: {{ .Datastore.Badger.ValueLogFileSize }}
+  maxtxnretries: {{ .Datastore.MaxTxnRetries }}
 
   # memory:
   #   size: {{ .Datastore.Memory.Size }}
diff --git a/config/configfile_test.go b/config/configfile_test.go
index 5080964966..3eacc1f48b 100644
--- a/config/configfile_test.go
+++ b/config/configfile_test.go
@@ -110,6 +110,7 @@ func TestReadConfigFileForDatastore(t *testing.T) {
 	cfg.Datastore.Store = "badger"
 	cfg.Datastore.Badger.Path = "dataPath"
 	cfg.Datastore.Badger.ValueLogFileSize = 512 * MiB
+	cfg.Datastore.MaxTxnRetries = 3
 
 	err := cfg.WriteConfigFileToRootDir(dir)
 	if err != nil {
@@ -132,4 +133,5 @@ func TestReadConfigFileForDatastore(t *testing.T) {
 	assert.Equal(t, cfg.Datastore.Store, cfgFromFile.Datastore.Store)
 	assert.Equal(t, dir+"/"+cfg.Datastore.Badger.Path, cfgFromFile.Datastore.Badger.Path)
 	assert.Equal(t, cfg.Datastore.Badger.ValueLogFileSize, cfgFromFile.Datastore.Badger.ValueLogFileSize)
+	assert.Equal(t, cfg.Datastore.MaxTxnRetries, cfgFromFile.Datastore.MaxTxnRetries)
 }
diff --git a/core/crdt/base.go b/core/crdt/base.go
index 280c00f273..d24b263645 100644
--- a/core/crdt/base.go
+++ b/core/crdt/base.go
@@ -45,7 +45,7 @@ func (base baseCRDT) setPriority(
 ) error {
 	prioK := key.WithPriorityFlag()
 	buf := make([]byte, binary.MaxVarintLen64)
-	n := binary.PutUvarint(buf, priority+1)
+	n := binary.PutUvarint(buf, priority)
 	if n == 0 {
 		return ErrEncodingPriority
 	}
diff --git a/core/crdt/base_test.go b/core/crdt/base_test.go
index 708f1b6de0..5fd7d9248e 100644
--- a/core/crdt/base_test.go
+++ b/core/crdt/base_test.go
@@ -70,7 +70,7 @@ func TestBaseCRDTSetGetPriority(t *testing.T) {
 		return
 	}
 
-	if priority-1 != uint64(10) {
+	if priority != uint64(10) {
 		t.Errorf("baseCRDT incorrect priority. Have %v, want %v", priority, uint64(10))
 	}
 }
diff --git a/core/crdt/lwwreg.go b/core/crdt/lwwreg.go
index 226b558a66..ecafe34c2a 100644
--- a/core/crdt/lwwreg.go
+++ b/core/crdt/lwwreg.go
@@ -149,6 +149,11 @@ func (reg LWWRegister) setValue(ctx context.Context, val []byte, priority uint64
 		return nil
 	} else if priority == curPrio {
 		curValue, _ := reg.store.Get(ctx, valueK.ToDS())
+		// Do not use the first byte of the current value in the comparison.
+		// It's metadata that will falsify the result.
+		if len(curValue) > 0 {
+			curValue = curValue[1:]
+		}
 		if bytes.Compare(curValue, val) >= 0 {
 			return nil
 		}
diff --git a/core/crdt/lwwreg_test.go b/core/crdt/lwwreg_test.go
index b1b928538c..887a155424 100644
--- a/core/crdt/lwwreg_test.go
+++ b/core/crdt/lwwreg_test.go
@@ -39,6 +39,7 @@ func setupLWWRegister() LWWRegister {
 func setupLoadedLWWRegster(ctx context.Context) LWWRegister {
 	lww := setupLWWRegister()
 	addDelta := lww.Set([]byte("test"))
+	addDelta.SetPriority(1)
 	lww.Merge(ctx, addDelta, "test")
 	return lww
 }
diff --git a/db/db.go b/db/db.go
index d748303806..2af83a3e8a 100644
--- a/db/db.go
+++ b/db/db.go
@@ -43,6 +43,10 @@ var (
 	_ client.Collection = (*collection)(nil)
 )
 
+const (
+	defaultMaxTxnRetries = 5
+)
+
 // DB is the main interface for interacting with the
 // DefraDB storage system.
 type db struct {
@@ -57,6 +61,9 @@ type db struct {
 
 	parser core.Parser
 
+	// The maximum number of retries per transaction.
+	maxTxnRetries immutable.Option[int]
+
 	// The options used to init the database
 	options any
 }
@@ -75,6 +82,13 @@ func WithUpdateEvents() Option {
 	}
 }
 
+// WithMaxRetries sets the maximum number of retries per transaction.
+func WithMaxRetries(num int) Option {
+	return func(db *db) {
+		db.maxTxnRetries = immutable.Some(num)
+	}
+}
+
 // NewDB creates a new instance of the DB using the given options.
 func NewDB(ctx context.Context, rootstore datastore.RootStore, options ...Option) (client.DB, error) {
 	return newDB(ctx, rootstore, options...)
@@ -175,6 +189,15 @@ func (db *db) Events() events.Events {
 	return db.events
 }
 
+// MaxTxnRetries returns the maximum number of retries per transaction.
+// Defaults to `defaultMaxTxnRetries` if not explicitly set.
+func (db *db) MaxTxnRetries() int {
+	if db.maxTxnRetries.HasValue() {
+		return db.maxTxnRetries.Value()
+	}
+	return defaultMaxTxnRetries
+}
+
 // PrintDump prints the entire database to console.
 func (db *db) PrintDump(ctx context.Context) error {
 	return printStore(ctx, db.multistore.Rootstore())
diff --git a/docs/data_format_changes/i1031-improve-dag-sync.md b/docs/data_format_changes/i1031-improve-dag-sync.md
new file mode 100644
index 0000000000..8a874b063c
--- /dev/null
+++ b/docs/data_format_changes/i1031-improve-dag-sync.md
@@ -0,0 +1,3 @@
+# Change the way the priority is set
+
+The priority of a field is set both in the data store and in the block store. Previously, the data store priority was one higher than the block store priority. We changed it to be the same in both stores, which is a breaking change: priority comparisons between data written by the previous version and this one will disagree by one.
\ No newline at end of file
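
The note above refers to the `setPriority` change in core/crdt/base.go: the data store now persists the priority exactly as given instead of priority+1, so it matches the block store. A small sketch of the round-trip after this patch, mirroring the varint encoding used there; the function names are illustrative only.

```go
package example

import "encoding/binary"

// encodePriority stores the priority exactly as given, matching the block
// store; before this patch the data store persisted priority+1.
func encodePriority(priority uint64) []byte {
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(buf, priority)
	return buf[:n]
}

// decodePriority reads the value back; a round-trip now returns the same
// number that was written, as asserted in the updated base_test.go.
func decodePriority(b []byte) uint64 {
	priority, _ := binary.Uvarint(b)
	return priority
}
```
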
diff --git a/net/process.go b/net/process.go
index 2525725838..dd3cdf6ea4 100644
--- a/net/process.go
+++ b/net/process.go
@@ -44,55 +44,65 @@ func (p *Peer) processLog(
 	getter ipld.NodeGetter) ([]cid.Cid, error) {
 	log.Debug(ctx, "Running processLog")
 
-	txn, err := p.db.NewTxn(ctx, false)
-	if err != nil {
-		return nil, err
-	}
-	defer txn.Discard(ctx)
+	// TODO: Implement better transaction retry mechanics
+	// Github issue #1028
+	var txnErr error
+	for retry := 0; retry < p.db.MaxTxnRetries(); retry++ {
+		txn, err := p.db.NewTxn(ctx, false)
+		if err != nil {
+			return nil, err
+		}
+		defer txn.Discard(ctx)
+
+		// KEEPING FOR REFERENCE FOR NOW
+		// check if we already have this block
+		// exists, err := txn.DAGstore().Has(ctx, c)
+		// if err != nil {
+		// 	return nil, errors.Wrap("failed to check for existing block %s", c, err)
+		// }
+		// if exists {
+		// 	log.Debugf("Already have block %s locally, skipping.", c)
+		// 	return nil, nil
+		// }
+
+		crdt, err := initCRDTForType(ctx, txn, col, dockey, field)
+		if err != nil {
+			return nil, err
+		}
 
-	// KEEPING FOR REFERENCE FOR NOW
-	// check if we already have this block
-	// exists, err := txn.DAGstore().Has(ctx, c)
-	// if err != nil {
-	// 	return nil, errors.Wrap("failed to check for existing block %s", c, err)
-	// }
-	// if exists {
-	// 	log.Debugf("Already have block %s locally, skipping.", c)
-	// 	return nil, nil
-	// }
+		delta, err := crdt.DeltaDecode(nd)
+		if err != nil {
+			return nil, errors.Wrap("failed to decode delta object", err)
+		}
 
-	crdt, err := initCRDTForType(ctx, txn, col, dockey, field)
-	if err != nil {
-		return nil, err
-	}
+		log.Debug(
+			ctx,
+			"Processing PushLog request",
+			logging.NewKV("DocKey", dockey),
+			logging.NewKV("CID", c),
+		)
 
-	delta, err := crdt.DeltaDecode(nd)
-	if err != nil {
-		return nil, errors.Wrap("failed to decode delta object", err)
-	}
+		if err := txn.DAGstore().Put(ctx, nd); err != nil {
+			return nil, err
+		}
 
-	log.Debug(
-		ctx,
-		"Processing PushLog request",
-		logging.NewKV("DocKey", dockey),
-		logging.NewKV("CID", c),
-	)
-	height := delta.GetPriority()
+		ng := p.createNodeGetter(crdt, getter)
+		cids, err := crdt.Clock().ProcessNode(ctx, ng, c, delta.GetPriority(), delta, nd)
+		if err != nil {
+			return nil, err
+		}
 
-	if err := txn.DAGstore().Put(ctx, nd); err != nil {
-		return nil, err
-	}
+		// mark this obj as done
+		p.queuedChildren.Remove(c)
 
-	ng := p.createNodeGetter(crdt, getter)
-	cids, err := crdt.Clock().ProcessNode(ctx, ng, c, height, delta, nd)
-	if err != nil {
-		return nil, err
+		txnErr = txn.Commit(ctx)
+		if txnErr != nil {
+			continue
+		}
+		return cids, txnErr
 	}
 
-	// mark this obj as done
-	p.queuedChildren.Remove(c)
-
-	return cids, txn.Commit(ctx)
+	return nil, client.NewErrMaxTxnRetries(txnErr)
 }
 
 func initCRDTForType(
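
For reference, the commit-retry pattern that `processLog` now follows could be factored into a reusable helper along the lines below. This is only a sketch of the pattern, not code from this patch: it assumes the Peer's `db` field satisfies the small interface shown (it exposes `NewTxn` and `MaxTxnRetries` as used above), and issue #1028 tracks the planned, more complete retry mechanics.

```go
package example

import (
	"context"

	"github.com/sourcenetwork/defradb/client"
	"github.com/sourcenetwork/defradb/datastore"
)

// txnSource captures the two methods the retry loop relies on; the Peer's
// db field is assumed to satisfy it, as used in processLog.
type txnSource interface {
	NewTxn(ctx context.Context, readonly bool) (datastore.Txn, error)
	MaxTxnRetries() int
}

// withTxnRetry runs attempt inside a fresh transaction, retrying on commit
// conflicts up to the configured limit, and wraps the last commit error in
// ErrMaxTxnRetries once the limit is exhausted.
func withTxnRetry(ctx context.Context, db txnSource, attempt func(datastore.Txn) error) error {
	var commitErr error
	for i := 0; i < db.MaxTxnRetries(); i++ {
		txn, err := db.NewTxn(ctx, false)
		if err != nil {
			return err
		}
		if err := attempt(txn); err != nil {
			txn.Discard(ctx)
			return err
		}
		if commitErr = txn.Commit(ctx); commitErr == nil {
			return nil
		}
		txn.Discard(ctx)
	}
	return client.NewErrMaxTxnRetries(commitErr)
}
```

Discarding before the next attempt keeps each retry on a fresh transaction, which is what allows a head-store conflict caused by a concurrent push to be retried instead of dropped.
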