Skip to content

Commit

Permalink
fix: Improve DAG sync with highly concurrent updates (sourcenetwork#1031
Browse files Browse the repository at this point in the history
)

Relevant issue(s)
Resolves sourcenetwork#1029
Resolves sourcenetwork#1030

Description
This PR resolves the eventual consistency bug when highly concurrent updates would cause the DAG syncer to fail reaching consistency. It also adds a simple retry mechanic to avoid sync issues when there is a transaction conflict on the head store.
  • Loading branch information
fredcarle authored Jan 17, 2023
1 parent b336615 commit d9a4419
Show file tree
Hide file tree
Showing 13 changed files with 113 additions and 46 deletions.
10 changes: 10 additions & 0 deletions cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ func init() {
log.FeedbackFatalE(context.Background(), "Could not bind net.peers", err)
}

startCmd.Flags().Int(
"max-txn-retries", cfg.Datastore.MaxTxnRetries,
"Specify the maximum number of retries per transaction",
)
err = viper.BindPFlag("datastore.maxtxnretries", startCmd.Flags().Lookup("max-txn-retries"))
if err != nil {
log.FeedbackFatalE(context.Background(), "Could not bind datastore.maxtxnretries", err)
}

startCmd.Flags().String(
"store", cfg.Datastore.Store,
"Specify the datastore to use (supported: badger, memory)",
Expand Down Expand Up @@ -228,6 +237,7 @@ func start(ctx context.Context) (*defraInstance, error) {

options := []db.Option{
db.WithUpdateEvents(),
db.WithMaxRetries(cfg.Datastore.MaxTxnRetries),
}

db, err := db.NewDB(ctx, rootstore, options...)
Expand Down
2 changes: 2 additions & 0 deletions client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type DB interface {

Events() events.Events

MaxTxnRetries() int

PrintDump(ctx context.Context) error

// SetReplicator adds a replicator to the persisted list or adds
Expand Down
8 changes: 8 additions & 0 deletions client/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
errUnexpectedType string = "unexpected type"
errParsingFailed string = "failed to parse argument"
errUninitializeProperty string = "invalid state, required property is uninitialized"
errMaxTxnRetries string = "reached maximum transaction reties"
)

// Errors returnable from this package.
Expand All @@ -43,6 +44,7 @@ var (
ErrInvalidDeleteTarget = errors.New("the target document to delete is of invalid type")
ErrMalformedDocKey = errors.New("malformed DocKey, missing either version or cid")
ErrInvalidDocKeyVersion = errors.New("invalid DocKey version")
ErrMaxTxnRetries = errors.New(errMaxTxnRetries)
)

// NewErrFieldNotExist returns an error indicating that the given field does not exist.
Expand Down Expand Up @@ -97,3 +99,9 @@ func NewErrUninitializeProperty(host string, propertyName string) error {
errors.NewKV("PropertyName", propertyName),
)
}

// NewErrFieldIndexNotExist returns an error indicating that a field does not exist at the
// given location.
func NewErrMaxTxnRetries(inner error) error {
return errors.Wrap(errMaxTxnRetries, inner)
}
8 changes: 5 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,10 @@ func (cfg *Config) setBadgerVLogMaxSize() {

// DatastoreConfig configures datastores.
type DatastoreConfig struct {
Store string
Memory MemoryConfig
Badger BadgerConfig
Store string
Memory MemoryConfig
Badger BadgerConfig
MaxTxnRetries int
}

// BadgerConfig configures Badger's on-disk / filesystem mode.
Expand Down Expand Up @@ -289,6 +290,7 @@ func defaultDatastoreConfig() *DatastoreConfig {
ValueLogFileSize: 1 * GiB,
Options: &opts,
},
MaxTxnRetries: 5,
}
}

Expand Down
1 change: 1 addition & 0 deletions config/configfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ datastore:
# Maximum file size of the value log files. The in-memory file size will be 2*valuelogfilesize.
# Human friendly units can be used (ex: 500MB).
valuelogfilesize: {{ .Datastore.Badger.ValueLogFileSize }}
maxtxnretries: {{ .Datastore.MaxTxnRetries }}
# memory:
# size: {{ .Datastore.Memory.Size }}
Expand Down
2 changes: 2 additions & 0 deletions config/configfile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func TestReadConfigFileForDatastore(t *testing.T) {
cfg.Datastore.Store = "badger"
cfg.Datastore.Badger.Path = "dataPath"
cfg.Datastore.Badger.ValueLogFileSize = 512 * MiB
cfg.Datastore.MaxTxnRetries = 3

err := cfg.WriteConfigFileToRootDir(dir)
if err != nil {
Expand All @@ -132,4 +133,5 @@ func TestReadConfigFileForDatastore(t *testing.T) {
assert.Equal(t, cfg.Datastore.Store, cfgFromFile.Datastore.Store)
assert.Equal(t, dir+"/"+cfg.Datastore.Badger.Path, cfgFromFile.Datastore.Badger.Path)
assert.Equal(t, cfg.Datastore.Badger.ValueLogFileSize, cfgFromFile.Datastore.Badger.ValueLogFileSize)
assert.Equal(t, cfg.Datastore.MaxTxnRetries, cfgFromFile.Datastore.MaxTxnRetries)
}
2 changes: 1 addition & 1 deletion core/crdt/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (base baseCRDT) setPriority(
) error {
prioK := key.WithPriorityFlag()
buf := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(buf, priority+1)
n := binary.PutUvarint(buf, priority)
if n == 0 {
return ErrEncodingPriority
}
Expand Down
2 changes: 1 addition & 1 deletion core/crdt/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestBaseCRDTSetGetPriority(t *testing.T) {
return
}

if priority-1 != uint64(10) {
if priority != uint64(10) {
t.Errorf("baseCRDT incorrect priority. Have %v, want %v", priority, uint64(10))
}
}
5 changes: 5 additions & 0 deletions core/crdt/lwwreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ func (reg LWWRegister) setValue(ctx context.Context, val []byte, priority uint64
return nil
} else if priority == curPrio {
curValue, _ := reg.store.Get(ctx, valueK.ToDS())
// Do not use the first byte of the current value in the comparison.
// It's metadata that will falsify the result.
if len(curValue) > 0 {
curValue = curValue[1:]
}
if bytes.Compare(curValue, val) >= 0 {
return nil
}
Expand Down
1 change: 1 addition & 0 deletions core/crdt/lwwreg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func setupLWWRegister() LWWRegister {
func setupLoadedLWWRegster(ctx context.Context) LWWRegister {
lww := setupLWWRegister()
addDelta := lww.Set([]byte("test"))
addDelta.SetPriority(1)
lww.Merge(ctx, addDelta, "test")
return lww
}
Expand Down
23 changes: 23 additions & 0 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ var (
_ client.Collection = (*collection)(nil)
)

const (
defaultMaxTxnRetries = 5
)

// DB is the main interface for interacting with the
// DefraDB storage system.
type db struct {
Expand All @@ -57,6 +61,9 @@ type db struct {

parser core.Parser

// The maximum number of retries per transaction.
maxTxnRetries immutable.Option[int]

// The options used to init the database
options any
}
Expand All @@ -75,6 +82,13 @@ func WithUpdateEvents() Option {
}
}

// WithMaxRetries sets the maximum number of retries per transaction.
func WithMaxRetries(num int) Option {
return func(db *db) {
db.maxTxnRetries = immutable.Some(num)
}
}

// NewDB creates a new instance of the DB using the given options.
func NewDB(ctx context.Context, rootstore datastore.RootStore, options ...Option) (client.DB, error) {
return newDB(ctx, rootstore, options...)
Expand Down Expand Up @@ -175,6 +189,15 @@ func (db *db) Events() events.Events {
return db.events
}

// MaxRetries returns the maximum number of retries per transaction.
// Defaults to `defaultMaxTxnRetries` if not explicitely set
func (db *db) MaxTxnRetries() int {
if db.maxTxnRetries.HasValue() {
return db.maxTxnRetries.Value()
}
return defaultMaxTxnRetries
}

// PrintDump prints the entire database to console.
func (db *db) PrintDump(ctx context.Context) error {
return printStore(ctx, db.multistore.Rootstore())
Expand Down
3 changes: 3 additions & 0 deletions docs/data_format_changes/i1031-improve-dag-sync.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Change the way the priority is set

The priority of a field is set both in the data store and in the block store. Previously, the data store priority was up by one against the block store. We changed it to be the same which resulted in a breaking change on the priority comparison from one version to the next.
92 changes: 51 additions & 41 deletions net/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,55 +44,65 @@ func (p *Peer) processLog(
getter ipld.NodeGetter) ([]cid.Cid, error) {
log.Debug(ctx, "Running processLog")

txn, err := p.db.NewTxn(ctx, false)
if err != nil {
return nil, err
}
defer txn.Discard(ctx)
// TODO: Implement better transaction retry mechanics
// Github issue #1028
var txnErr error
for retry := 0; retry < p.db.MaxTxnRetries(); retry++ {
txn, err := p.db.NewTxn(ctx, false)
if err != nil {
return nil, err
}
defer txn.Discard(ctx)

// KEEPING FOR REFERENCE FOR NOW
// check if we already have this block
// exists, err := txn.DAGstore().Has(ctx, c)
// if err != nil {
// return nil, errors.Wrap("failed to check for existing block %s", c, err)
// }
// if exists {
// log.Debugf("Already have block %s locally, skipping.", c)
// return nil, nil
// }

crdt, err := initCRDTForType(ctx, txn, col, dockey, field)
if err != nil {
return nil, err
}

// KEEPING FOR REFERENCE FOR NOW
// check if we already have this block
// exists, err := txn.DAGstore().Has(ctx, c)
// if err != nil {
// return nil, errors.Wrap("failed to check for existing block %s", c, err)
// }
// if exists {
// log.Debugf("Already have block %s locally, skipping.", c)
// return nil, nil
// }
delta, err := crdt.DeltaDecode(nd)
if err != nil {
return nil, errors.Wrap("failed to decode delta object", err)
}

crdt, err := initCRDTForType(ctx, txn, col, dockey, field)
if err != nil {
return nil, err
}
log.Debug(
ctx,
"Processing PushLog request",
logging.NewKV("DocKey", dockey),
logging.NewKV("CID", c),
)

delta, err := crdt.DeltaDecode(nd)
if err != nil {
return nil, errors.Wrap("failed to decode delta object", err)
}
if err := txn.DAGstore().Put(ctx, nd); err != nil {
return nil, err
}

log.Debug(
ctx,
"Processing PushLog request",
logging.NewKV("DocKey", dockey),
logging.NewKV("CID", c),
)
height := delta.GetPriority()
ng := p.createNodeGetter(crdt, getter)
cids, err := crdt.Clock().ProcessNode(ctx, ng, c, delta.GetPriority(), delta, nd)
if err != nil {
return nil, err
}

if err := txn.DAGstore().Put(ctx, nd); err != nil {
return nil, err
}
// mark this obj as done
p.queuedChildren.Remove(c)

ng := p.createNodeGetter(crdt, getter)
cids, err := crdt.Clock().ProcessNode(ctx, ng, c, height, delta, nd)
if err != nil {
return nil, err
txnErr = txn.Commit(ctx)
if txnErr != nil {
continue
}
return cids, txnErr
}

// mark this obj as done
p.queuedChildren.Remove(c)

return cids, txn.Commit(ctx)
return nil, client.NewErrMaxTxnRetries(txnErr)
}

func initCRDTForType(
Expand Down

0 comments on commit d9a4419

Please sign in to comment.