From 4811ba9386c3bb557dc88337a207fa9ed6649f54 Mon Sep 17 00:00:00 2001 From: Fred Carle Date: Mon, 8 Apr 2024 12:56:10 -0400 Subject: [PATCH] feat: Add P Counter CRDT (#2482) ## Relevant issue(s) Resolves #2116 ## Description This PR adds a Positive Counter CRDT type. It converts the PNCounter implementation to a Counter implementation that supports both PN and P counters. --- client/ctype.go | 7 +- core/crdt/{pncounter.go => counter.go} | 102 ++++--- core/crdt/errors.go | 6 + db/base/collection_keys.go | 2 +- merkle/crdt/{pncounter.go => counter.go} | 31 +- merkle/crdt/merklecrdt.go | 14 +- request/graphql/schema/types/types.go | 16 +- .../mutation/create/crdt/pcounter_test.go | 57 ++++ .../mutation/update/crdt/pcounter_test.go | 265 ++++++++++++++++++ .../mutation/update/crdt/pncounter_test.go | 5 +- .../state/simple/peer/crdt/pcounter_test.go | 124 ++++++++ .../peer_replicator/crdt/pcounter_test.go | 160 +++++++++++ .../simple/replicator/crdt/pcounter_test.go | 71 +++++ .../query/simple/with_cid_doc_id_test.go | 94 +++++++ tests/integration/schema/crdt_type_test.go | 95 +++++++ .../updates/add/field/crdt/pcounter_test.go | 73 +++++ .../updates/add/field/crdt/pncounter_test.go | 4 +- 17 files changed, 1053 insertions(+), 73 deletions(-) rename core/crdt/{pncounter.go => counter.go} (56%) rename merkle/crdt/{pncounter.go => counter.go} (55%) create mode 100644 tests/integration/mutation/create/crdt/pcounter_test.go create mode 100644 tests/integration/mutation/update/crdt/pcounter_test.go create mode 100644 tests/integration/net/state/simple/peer/crdt/pcounter_test.go create mode 100644 tests/integration/net/state/simple/peer_replicator/crdt/pcounter_test.go create mode 100644 tests/integration/net/state/simple/replicator/crdt/pcounter_test.go create mode 100644 tests/integration/schema/updates/add/field/crdt/pcounter_test.go diff --git a/client/ctype.go b/client/ctype.go index c5f792df86..f9d961ec3e 100644 --- a/client/ctype.go +++ b/client/ctype.go @@ -23,12 +23,13 @@ const ( OBJECT COMPOSITE PN_COUNTER + P_COUNTER ) // IsSupportedFieldCType returns true if the type is supported as a document field type. func (t CType) IsSupportedFieldCType() bool { switch t { - case NONE_CRDT, LWW_REGISTER, PN_COUNTER: + case NONE_CRDT, LWW_REGISTER, PN_COUNTER, P_COUNTER: return true default: return false @@ -38,7 +39,7 @@ func (t CType) IsSupportedFieldCType() bool { // IsCompatibleWith returns true if the CRDT is compatible with the field kind func (t CType) IsCompatibleWith(kind FieldKind) bool { switch t { - case PN_COUNTER: + case PN_COUNTER, P_COUNTER: if kind == FieldKind_NILLABLE_INT || kind == FieldKind_NILLABLE_FLOAT { return true } @@ -61,6 +62,8 @@ func (t CType) String() string { return "composite" case PN_COUNTER: return "pncounter" + case P_COUNTER: + return "pcounter" default: return "unknown" } diff --git a/core/crdt/pncounter.go b/core/crdt/counter.go similarity index 56% rename from core/crdt/pncounter.go rename to core/crdt/counter.go index 7d8b02c1a4..01ca3cf0da 100644 --- a/core/crdt/pncounter.go +++ b/core/crdt/counter.go @@ -1,4 +1,4 @@ -// Copyright 2023 Democratized Data Foundation +// Copyright 2024 Democratized Data Foundation // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt. @@ -33,18 +33,18 @@ import ( var ( // ensure types implements core interfaces - _ core.ReplicatedData = (*PNCounter[float64])(nil) - _ core.ReplicatedData = (*PNCounter[int64])(nil) - _ core.Delta = (*PNCounterDelta[float64])(nil) - _ core.Delta = (*PNCounterDelta[int64])(nil) + _ core.ReplicatedData = (*Counter[float64])(nil) + _ core.ReplicatedData = (*Counter[int64])(nil) + _ core.Delta = (*CounterDelta[float64])(nil) + _ core.Delta = (*CounterDelta[int64])(nil) ) type Incrementable interface { constraints.Integer | constraints.Float } -// PNCounterDelta is a single delta operation for an PNCounter -type PNCounterDelta[T Incrementable] struct { +// CounterDelta is a single delta operation for a Counter +type CounterDelta[T Incrementable] struct { DocID []byte FieldName string Priority uint64 @@ -59,17 +59,17 @@ type PNCounterDelta[T Incrementable] struct { } // GetPriority gets the current priority for this delta. -func (delta *PNCounterDelta[T]) GetPriority() uint64 { +func (delta *CounterDelta[T]) GetPriority() uint64 { return delta.Priority } // SetPriority will set the priority for this delta. -func (delta *PNCounterDelta[T]) SetPriority(prio uint64) { +func (delta *CounterDelta[T]) SetPriority(prio uint64) { delta.Priority = prio } // Marshal encodes the delta using CBOR. -func (delta *PNCounterDelta[T]) Marshal() ([]byte, error) { +func (delta *CounterDelta[T]) Marshal() ([]byte, error) { h := &codec.CborHandle{} buf := bytes.NewBuffer(nil) enc := codec.NewEncoder(buf, h) @@ -81,44 +81,50 @@ func (delta *PNCounterDelta[T]) Marshal() ([]byte, error) { } // Unmarshal decodes the delta from CBOR. -func (delta *PNCounterDelta[T]) Unmarshal(b []byte) error { +func (delta *CounterDelta[T]) Unmarshal(b []byte) error { h := &codec.CborHandle{} dec := codec.NewDecoderBytes(b, h) return dec.Decode(delta) } -// PNCounter, is a simple CRDT type that allows increment/decrement +// Counter, is a simple CRDT type that allows increment/decrement // of an Int and Float data types that ensures convergence. -type PNCounter[T Incrementable] struct { +type Counter[T Incrementable] struct { baseCRDT + AllowDecrement bool } -// NewPNCounter returns a new instance of the PNCounter with the given ID. -func NewPNCounter[T Incrementable]( +// NewCounter returns a new instance of the Counter with the given ID. +func NewCounter[T Incrementable]( store datastore.DSReaderWriter, schemaVersionKey core.CollectionSchemaVersionKey, key core.DataStoreKey, fieldName string, -) PNCounter[T] { - return PNCounter[T]{newBaseCRDT(store, key, schemaVersionKey, fieldName)} + allowDecrement bool, +) Counter[T] { + return Counter[T]{newBaseCRDT(store, key, schemaVersionKey, fieldName), allowDecrement} } -// Value gets the current register value -func (reg PNCounter[T]) Value(ctx context.Context) ([]byte, error) { - valueK := reg.key.WithValueFlag() - buf, err := reg.store.Get(ctx, valueK.ToDS()) +// Value gets the current counter value +func (c Counter[T]) Value(ctx context.Context) ([]byte, error) { + valueK := c.key.WithValueFlag() + buf, err := c.store.Get(ctx, valueK.ToDS()) if err != nil { return nil, err } return buf, nil } -// Set generates a new delta with the supplied value -func (reg PNCounter[T]) Increment(ctx context.Context, value T) (*PNCounterDelta[T], error) { +// Set generates a new delta with the supplied value. +// +// WARNING: Incrementing an integer and causing it to overflow the int64 max value +// will cause the value to roll over to the int64 min value. Incremeting a float and +// causing it to overflow the float64 max value will act like a no-op. +func (c Counter[T]) Increment(ctx context.Context, value T) (*CounterDelta[T], error) { // To ensure that the dag block is unique, we add a random number to the delta. // This is done only on update (if the doc doesn't already exist) to ensure that the // initial dag block of a document can be reproducible. - exists, err := reg.store.Has(ctx, reg.key.ToPrimaryDataStoreKey().ToDS()) + exists, err := c.store.Has(ctx, c.key.ToPrimaryDataStoreKey().ToDS()) if err != nil { return nil, err } @@ -131,29 +137,32 @@ func (reg PNCounter[T]) Increment(ctx context.Context, value T) (*PNCounterDelta nonce = r.Int64() } - return &PNCounterDelta[T]{ - DocID: []byte(reg.key.DocID), - FieldName: reg.fieldName, + return &CounterDelta[T]{ + DocID: []byte(c.key.DocID), + FieldName: c.fieldName, Data: value, - SchemaVersionID: reg.schemaVersionKey.SchemaVersionId, + SchemaVersionID: c.schemaVersionKey.SchemaVersionId, Nonce: nonce, }, nil } // Merge implements ReplicatedData interface. -// It merges two PNCounterRegisty by adding the values together. -func (reg PNCounter[T]) Merge(ctx context.Context, delta core.Delta) error { - d, ok := delta.(*PNCounterDelta[T]) +// It merges two CounterRegisty by adding the values together. +func (c Counter[T]) Merge(ctx context.Context, delta core.Delta) error { + d, ok := delta.(*CounterDelta[T]) if !ok { return ErrMismatchedMergeType } - return reg.incrementValue(ctx, d.Data, d.GetPriority()) + return c.incrementValue(ctx, d.Data, d.GetPriority()) } -func (reg PNCounter[T]) incrementValue(ctx context.Context, value T, priority uint64) error { - key := reg.key.WithValueFlag() - marker, err := reg.store.Get(ctx, reg.key.ToPrimaryDataStoreKey().ToDS()) +func (c Counter[T]) incrementValue(ctx context.Context, value T, priority uint64) error { + if !c.AllowDecrement && value < 0 { + return NewErrNegativeValue(value) + } + key := c.key.WithValueFlag() + marker, err := c.store.Get(ctx, c.key.ToPrimaryDataStoreKey().ToDS()) if err != nil && !errors.Is(err, ds.ErrNotFound) { return err } @@ -161,7 +170,7 @@ func (reg PNCounter[T]) incrementValue(ctx context.Context, value T, priority ui key = key.WithDeletedFlag() } - curValue, err := reg.getCurrentValue(ctx, key) + curValue, err := c.getCurrentValue(ctx, key) if err != nil { return err } @@ -172,16 +181,16 @@ func (reg PNCounter[T]) incrementValue(ctx context.Context, value T, priority ui return err } - err = reg.store.Put(ctx, key.ToDS(), b) + err = c.store.Put(ctx, key.ToDS(), b) if err != nil { return NewErrFailedToStoreValue(err) } - return reg.setPriority(ctx, reg.key, priority) + return c.setPriority(ctx, c.key, priority) } -func (reg PNCounter[T]) getCurrentValue(ctx context.Context, key core.DataStoreKey) (T, error) { - curValue, err := reg.store.Get(ctx, key.ToDS()) +func (c Counter[T]) getCurrentValue(ctx context.Context, key core.DataStoreKey) (T, error) { + curValue, err := c.store.Get(ctx, key.ToDS()) if err != nil { if errors.Is(err, ds.ErrNotFound) { return 0, nil @@ -192,14 +201,14 @@ func (reg PNCounter[T]) getCurrentValue(ctx context.Context, key core.DataStoreK return getNumericFromBytes[T](curValue) } -// DeltaDecode is a typed helper to extract a PNCounterDelta from a ipld.Node -func (reg PNCounter[T]) DeltaDecode(node ipld.Node) (core.Delta, error) { +// DeltaDecode is a typed helper to extract a CounterDelta from a ipld.Node +func (c Counter[T]) DeltaDecode(node ipld.Node) (core.Delta, error) { pbNode, ok := node.(*dag.ProtoNode) if !ok { return nil, client.NewErrUnexpectedType[*dag.ProtoNode]("ipld.Node", node) } - delta := &PNCounterDelta[T]{} + delta := &CounterDelta[T]{} err := delta.Unmarshal(pbNode.Data()) if err != nil { return nil, err @@ -208,6 +217,13 @@ func (reg PNCounter[T]) DeltaDecode(node ipld.Node) (core.Delta, error) { return delta, nil } +func (c Counter[T]) CType() client.CType { + if c.AllowDecrement { + return client.PN_COUNTER + } + return client.P_COUNTER +} + func getNumericFromBytes[T Incrementable](b []byte) (T, error) { var val T err := cbor.Unmarshal(b, &val) diff --git a/core/crdt/errors.go b/core/crdt/errors.go index e1148d1044..75af579850 100644 --- a/core/crdt/errors.go +++ b/core/crdt/errors.go @@ -17,6 +17,7 @@ import ( const ( errFailedToGetPriority string = "failed to get priority" errFailedToStoreValue string = "failed to store value" + errNegativeValue string = "value cannot be negative" ) // Errors returnable from this package. @@ -26,6 +27,7 @@ const ( var ( ErrFailedToGetPriority = errors.New(errFailedToGetPriority) ErrFailedToStoreValue = errors.New(errFailedToStoreValue) + ErrNegativeValue = errors.New(errNegativeValue) ErrEncodingPriority = errors.New("error encoding priority") ErrDecodingPriority = errors.New("error decoding priority") // ErrMismatchedMergeType - Tying to merge two ReplicatedData of different types @@ -41,3 +43,7 @@ func NewErrFailedToGetPriority(inner error) error { func NewErrFailedToStoreValue(inner error) error { return errors.Wrap(errFailedToStoreValue, inner) } + +func NewErrNegativeValue[T Incrementable](value T) error { + return errors.New(errNegativeValue, errors.NewKV("Value", value)) +} diff --git a/db/base/collection_keys.go b/db/base/collection_keys.go index 1277b96a81..98584454ab 100644 --- a/db/base/collection_keys.go +++ b/db/base/collection_keys.go @@ -47,7 +47,7 @@ func MakePrimaryIndexKeyForCRDT( WithInstanceInfo(key). WithFieldId(core.COMPOSITE_NAMESPACE), nil - case client.LWW_REGISTER, client.PN_COUNTER: + case client.LWW_REGISTER, client.PN_COUNTER, client.P_COUNTER: field, ok := c.GetFieldByName(fieldName) if !ok { return core.DataStoreKey{}, client.NewErrFieldNotExist(fieldName) diff --git a/merkle/crdt/pncounter.go b/merkle/crdt/counter.go similarity index 55% rename from merkle/crdt/pncounter.go rename to merkle/crdt/counter.go index 74b7adb156..6ca016cea6 100644 --- a/merkle/crdt/pncounter.go +++ b/merkle/crdt/counter.go @@ -1,4 +1,4 @@ -// Copyright 2023 Democratized Data Foundation +// Copyright 2024 Democratized Data Foundation // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt. @@ -21,40 +21,41 @@ import ( "github.com/sourcenetwork/defradb/merkle/clock" ) -// MerklePNCounter is a MerkleCRDT implementation of the PNCounter using MerkleClocks. -type MerklePNCounter[T crdt.Incrementable] struct { +// MerkleCounter is a MerkleCRDT implementation of the Counter using MerkleClocks. +type MerkleCounter[T crdt.Incrementable] struct { *baseMerkleCRDT - reg crdt.PNCounter[T] + reg crdt.Counter[T] } -// NewMerklePNCounter creates a new instance (or loaded from DB) of a MerkleCRDT -// backed by a PNCounter CRDT. -func NewMerklePNCounter[T crdt.Incrementable]( +// NewMerkleCounter creates a new instance (or loaded from DB) of a MerkleCRDT +// backed by a Counter CRDT. +func NewMerkleCounter[T crdt.Incrementable]( store Stores, schemaVersionKey core.CollectionSchemaVersionKey, key core.DataStoreKey, fieldName string, -) *MerklePNCounter[T] { - register := crdt.NewPNCounter[T](store.Datastore(), schemaVersionKey, key, fieldName) + allowDecrement bool, +) *MerkleCounter[T] { + register := crdt.NewCounter[T](store.Datastore(), schemaVersionKey, key, fieldName, allowDecrement) clk := clock.NewMerkleClock(store.Headstore(), store.DAGstore(), key.ToHeadStoreKey(), register) base := &baseMerkleCRDT{clock: clk, crdt: register} - return &MerklePNCounter[T]{ + return &MerkleCounter[T]{ baseMerkleCRDT: base, reg: register, } } -// Save the value of the PN Counter to the DAG. -func (mPNC *MerklePNCounter[T]) Save(ctx context.Context, data any) (ipld.Node, uint64, error) { +// Save the value of the Counter to the DAG. +func (mc *MerkleCounter[T]) Save(ctx context.Context, data any) (ipld.Node, uint64, error) { value, ok := data.(*client.FieldValue) if !ok { - return nil, 0, NewErrUnexpectedValueType(client.PN_COUNTER, &client.FieldValue{}, data) + return nil, 0, NewErrUnexpectedValueType(mc.reg.CType(), &client.FieldValue{}, data) } - delta, err := mPNC.reg.Increment(ctx, value.Value().(T)) + delta, err := mc.reg.Increment(ctx, value.Value().(T)) if err != nil { return nil, 0, err } - nd, err := mPNC.clock.AddDAGNode(ctx, delta) + nd, err := mc.clock.AddDAGNode(ctx, delta) return nd, delta.GetPriority(), err } diff --git a/merkle/crdt/merklecrdt.go b/merkle/crdt/merklecrdt.go index c96791d07c..5bd95c86cd 100644 --- a/merkle/crdt/merklecrdt.go +++ b/merkle/crdt/merklecrdt.go @@ -66,12 +66,12 @@ func (base *baseMerkleCRDT) Value(ctx context.Context) ([]byte, error) { func InstanceWithStore( store Stores, schemaVersionKey core.CollectionSchemaVersionKey, - ctype client.CType, + cType client.CType, kind client.FieldKind, key core.DataStoreKey, fieldName string, ) (MerkleCRDT, error) { - switch ctype { + switch cType { case client.LWW_REGISTER: return NewMerkleLWWRegister( store, @@ -79,21 +79,23 @@ func InstanceWithStore( key, fieldName, ), nil - case client.PN_COUNTER: + case client.PN_COUNTER, client.P_COUNTER: switch kind { case client.FieldKind_NILLABLE_INT: - return NewMerklePNCounter[int64]( + return NewMerkleCounter[int64]( store, schemaVersionKey, key, fieldName, + cType == client.PN_COUNTER, ), nil case client.FieldKind_NILLABLE_FLOAT: - return NewMerklePNCounter[float64]( + return NewMerkleCounter[float64]( store, schemaVersionKey, key, fieldName, + cType == client.PN_COUNTER, ), nil } case client.COMPOSITE: @@ -104,5 +106,5 @@ func InstanceWithStore( fieldName, ), nil } - return nil, client.NewErrUnknownCRDT(ctype) + return nil, client.NewErrUnknownCRDT(cType) } diff --git a/request/graphql/schema/types/types.go b/request/graphql/schema/types/types.go index 2273e3adb9..7865e204db 100644 --- a/request/graphql/schema/types/types.go +++ b/request/graphql/schema/types/types.go @@ -161,8 +161,20 @@ var ( Description: "Last Write Wins register", }, client.PN_COUNTER.String(): &gql.EnumValueConfig{ - Value: client.PN_COUNTER, - Description: "Positive-Negative Counter", + Value: client.PN_COUNTER, + Description: `Positive-Negative Counter. + +WARNING: Incrementing an integer and causing it to overflow the int64 max value +will cause the value to roll over to the int64 min value. Incremeting a float and +causing it to overflow the float64 max value will act like a no-op.`, + }, + client.P_COUNTER.String(): &gql.EnumValueConfig{ + Value: client.P_COUNTER, + Description: `Positive Counter. + +WARNING: Incrementing an integer and causing it to overflow the int64 max value +will cause the value to roll over to the int64 min value. Incremeting a float and +causing it to overflow the float64 max value will act like a no-op.`, }, }, }) diff --git a/tests/integration/mutation/create/crdt/pcounter_test.go b/tests/integration/mutation/create/crdt/pcounter_test.go new file mode 100644 index 0000000000..681ca2ec76 --- /dev/null +++ b/tests/integration/mutation/create/crdt/pcounter_test.go @@ -0,0 +1,57 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package create + +import ( + "testing" + + testUtils "github.com/sourcenetwork/defradb/tests/integration" +) + +func TestPCounterCreate_IntKindWithPositiveValue_NoError(t *testing.T) { + test := testUtils.TestCase{ + Description: "Document creation with P Counter", + Actions: []any{ + testUtils.SchemaUpdate{ + Schema: ` + type Users { + name: String + points: Int @crdt(type: "pcounter") + } + `, + }, + testUtils.CreateDoc{ + Doc: `{ + "name": "John", + "points": 10 + }`, + }, + testUtils.Request{ + Request: `query { + Users { + _docID + name + points + } + }`, + Results: []map[string]any{ + { + "_docID": "bae-a688789e-d8a6-57a7-be09-22e005ab79e0", + "name": "John", + "points": int64(10), + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} diff --git a/tests/integration/mutation/update/crdt/pcounter_test.go b/tests/integration/mutation/update/crdt/pcounter_test.go new file mode 100644 index 0000000000..c4ff85e8b4 --- /dev/null +++ b/tests/integration/mutation/update/crdt/pcounter_test.go @@ -0,0 +1,265 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package update + +import ( + "fmt" + "math" + "testing" + + "github.com/sourcenetwork/immutable" + + testUtils "github.com/sourcenetwork/defradb/tests/integration" +) + +func TestPCounterUpdate_IntKindWithNegativeIncrement_ShouldError(t *testing.T) { + test := testUtils.TestCase{ + Description: "Positive increments of a P Counter with Int type", + Actions: []any{ + testUtils.SchemaUpdate{ + Schema: ` + type Users { + name: String + points: Int @crdt(type: "pcounter") + } + `, + }, + testUtils.CreateDoc{ + Doc: `{ + "name": "John", + "points": 0 + }`, + }, + testUtils.UpdateDoc{ + DocID: 0, + Doc: `{ + "points": -10 + }`, + ExpectedError: "value cannot be negative", + }, + testUtils.Request{ + Request: `query { + Users { + name + points + } + }`, + Results: []map[string]any{ + { + "name": "John", + "points": int64(0), + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} + +func TestPCounterUpdate_IntKindWithPositiveIncrement_ShouldIncrement(t *testing.T) { + test := testUtils.TestCase{ + Description: "Positive increments of a P Counter with Int type", + Actions: []any{ + testUtils.SchemaUpdate{ + Schema: ` + type Users { + name: String + points: Int @crdt(type: "pcounter") + } + `, + }, + testUtils.CreateDoc{ + Doc: `{ + "name": "John", + "points": 0 + }`, + }, + testUtils.UpdateDoc{ + DocID: 0, + Doc: `{ + "points": 10 + }`, + }, + testUtils.UpdateDoc{ + DocID: 0, + Doc: `{ + "points": 10 + }`, + }, + testUtils.Request{ + Request: `query { + Users { + name + points + } + }`, + Results: []map[string]any{ + { + "name": "John", + "points": int64(20), + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} + +// This test documents what happens when an overflow occurs in a P Counter with Int type. +func TestPCounterUpdate_IntKindWithPositiveIncrementOverflow_RollsOverToMinInt64(t *testing.T) { + test := testUtils.TestCase{ + Description: "Positive increments of a P Counter with Int type causing overflow behaviour", + SupportedMutationTypes: immutable.Some([]testUtils.MutationType{ + // GQL mutation will return a type error in this case + // because we are testing the internal overflow behaviour with + // a int64 but the GQL Int type is an int32. + testUtils.CollectionNamedMutationType, + testUtils.CollectionSaveMutationType, + }), + Actions: []any{ + testUtils.SchemaUpdate{ + Schema: ` + type Users { + name: String + points: Int @crdt(type: "pcounter") + } + `, + }, + testUtils.CreateDoc{ + Doc: fmt.Sprintf(`{ + "name": "John", + "points": %d + }`, math.MaxInt64), + }, + testUtils.UpdateDoc{ + DocID: 0, + Doc: `{ + "points": 1 + }`, + }, + testUtils.Request{ + Request: `query { + Users { + name + points + } + }`, + Results: []map[string]any{ + { + "name": "John", + "points": int64(math.MinInt64), + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} + +func TestPCounterUpdate_FloatKindWithPositiveIncrement_ShouldIncrement(t *testing.T) { + test := testUtils.TestCase{ + Description: "Positive increments of a P Counter with Float type. Note the lack of precision", + Actions: []any{ + testUtils.SchemaUpdate{ + Schema: ` + type Users { + name: String + points: Float @crdt(type: "pcounter") + } + `, + }, + testUtils.CreateDoc{ + Doc: `{ + "name": "John", + "points": 0 + }`, + }, + testUtils.UpdateDoc{ + DocID: 0, + Doc: `{ + "points": 10.1 + }`, + }, + testUtils.UpdateDoc{ + DocID: 0, + Doc: `{ + "points": 10.2 + }`, + }, + testUtils.Request{ + Request: `query { + Users { + name + points + } + }`, + Results: []map[string]any{ + { + "name": "John", + // Note the lack of precision of float types. + "points": 20.299999999999997, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} + +// This test documents what happens when an overflow occurs in a P Counter with Float type. +// In this case it is the same as a no-op. +func TestPCounterUpdate_FloatKindWithPositiveIncrementOverflow_NoOp(t *testing.T) { + test := testUtils.TestCase{ + Description: "Positive increments of a P Counter with Float type and overflow causing a no-op", + Actions: []any{ + testUtils.SchemaUpdate{ + Schema: ` + type Users { + name: String + points: Float @crdt(type: "pcounter") + } + `, + }, + testUtils.CreateDoc{ + Doc: fmt.Sprintf(`{ + "name": "John", + "points": %g + }`, math.MaxFloat64), + }, + testUtils.UpdateDoc{ + DocID: 0, + Doc: `{ + "points": 1000 + }`, + }, + testUtils.Request{ + Request: `query { + Users { + name + points + } + }`, + Results: []map[string]any{ + { + "name": "John", + "points": math.MaxFloat64, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} diff --git a/tests/integration/mutation/update/crdt/pncounter_test.go b/tests/integration/mutation/update/crdt/pncounter_test.go index f8ede1cffc..fe350ab852 100644 --- a/tests/integration/mutation/update/crdt/pncounter_test.go +++ b/tests/integration/mutation/update/crdt/pncounter_test.go @@ -75,8 +75,9 @@ func TestPNCounterUpdate_IntKindWithPositiveIncrementOverflow_RollsOverToMinInt6 test := testUtils.TestCase{ Description: "Positive increments of a PN Counter with Int type causing overflow behaviour", SupportedMutationTypes: immutable.Some([]testUtils.MutationType{ - // GQL mutation will return an error - // when integer type overflows + // GQL mutation will return a type error in this case + // because we are testing the internal overflow behaviour with + // a int64 but the GQL Int type is an int32. testUtils.CollectionNamedMutationType, testUtils.CollectionSaveMutationType, }), diff --git a/tests/integration/net/state/simple/peer/crdt/pcounter_test.go b/tests/integration/net/state/simple/peer/crdt/pcounter_test.go new file mode 100644 index 0000000000..963b7d54cd --- /dev/null +++ b/tests/integration/net/state/simple/peer/crdt/pcounter_test.go @@ -0,0 +1,124 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package peer_test + +import ( + "testing" + + "github.com/sourcenetwork/immutable" + + testUtils "github.com/sourcenetwork/defradb/tests/integration" +) + +func TestP2PUpdate_WithPCounter_NoError(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.SchemaUpdate{ + Schema: ` + type Users { + name: String + points: Int @crdt(type: "pcounter") + } + `, + }, + testUtils.CreateDoc{ + // Create Shahzad on all nodes + Doc: `{ + "name": "Shahzad", + "points": 10 + }`, + }, + testUtils.ConnectPeers{ + SourceNodeID: 1, + TargetNodeID: 0, + }, + testUtils.UpdateDoc{ + NodeID: immutable.Some(0), + DocID: 0, + Doc: `{ + "points": 10 + }`, + }, + testUtils.WaitForSync{}, + testUtils.Request{ + Request: `query { + Users { + points + } + }`, + Results: []map[string]any{ + { + "points": int64(20), + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} + +func TestP2PUpdate_WithPCounterSimultaneousUpdate_NoError(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.SchemaUpdate{ + Schema: ` + type Users { + Name: String + Age: Int @crdt(type: "pcounter") + } + `, + }, + testUtils.CreateDoc{ + // Create John on all nodes + Doc: `{ + "Name": "John", + "Age": 0 + }`, + }, + testUtils.ConnectPeers{ + SourceNodeID: 0, + TargetNodeID: 1, + }, + testUtils.UpdateDoc{ + NodeID: immutable.Some(0), + Doc: `{ + "Age": 45 + }`, + }, + testUtils.UpdateDoc{ + NodeID: immutable.Some(1), + Doc: `{ + "Age": 45 + }`, + }, + testUtils.WaitForSync{}, + testUtils.Request{ + Request: `query { + Users { + Age + } + }`, + Results: []map[string]any{ + { + "Age": int64(90), + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} diff --git a/tests/integration/net/state/simple/peer_replicator/crdt/pcounter_test.go b/tests/integration/net/state/simple/peer_replicator/crdt/pcounter_test.go new file mode 100644 index 0000000000..a7b3c67a59 --- /dev/null +++ b/tests/integration/net/state/simple/peer_replicator/crdt/pcounter_test.go @@ -0,0 +1,160 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package peer_replicator_test + +import ( + "testing" + + "github.com/sourcenetwork/immutable" + + testUtils "github.com/sourcenetwork/defradb/tests/integration" +) + +func TestP2PPeerReplicatorWithCreate_PCounter_NoError(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.SchemaUpdate{ + Schema: ` + type Users { + name: String + points: Int @crdt(type: "pcounter") + } + `, + }, + testUtils.CreateDoc{ + Doc: `{ + "name": "John", + "points": 0 + }`, + }, + testUtils.ConfigureReplicator{ + SourceNodeID: 0, + TargetNodeID: 2, + }, + testUtils.ConnectPeers{ + SourceNodeID: 0, + TargetNodeID: 1, + }, + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Doc: `{ + "name": "Shahzad", + "points": 3000 + }`, + }, + testUtils.WaitForSync{}, + testUtils.Request{ + NodeID: immutable.Some(0), + Request: `query { + Users { + points + } + }`, + Results: []map[string]any{ + { + "points": int64(0), + }, + { + "points": int64(3000), + }, + }, + }, + testUtils.Request{ + NodeID: immutable.Some(1), + Request: `query { + Users { + points + } + }`, + Results: []map[string]any{ + { + "points": int64(0), + }, + }, + }, + testUtils.Request{ + NodeID: immutable.Some(2), + Request: `query { + Users { + points + } + }`, + Results: []map[string]any{ + { + "points": int64(0), + }, + { + "points": int64(3000), + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} + +func TestP2PPeerReplicatorWithUpdate_PCounter_NoError(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.SchemaUpdate{ + Schema: ` + type Users { + name: String + points: Int @crdt(type: "pcounter") + } + `, + }, + testUtils.CreateDoc{ + Doc: `{ + "name": "John", + "points": 10 + }`, + }, + testUtils.ConnectPeers{ + SourceNodeID: 1, + TargetNodeID: 0, + }, + testUtils.ConfigureReplicator{ + SourceNodeID: 0, + TargetNodeID: 2, + }, + testUtils.UpdateDoc{ + // Update John's points on the first node only, and allow the value to sync + NodeID: immutable.Some(0), + Doc: `{ + "points": 10 + }`, + }, + testUtils.WaitForSync{}, + testUtils.Request{ + Request: `query { + Users { + points + } + }`, + Results: []map[string]any{ + { + "points": int64(20), + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} diff --git a/tests/integration/net/state/simple/replicator/crdt/pcounter_test.go b/tests/integration/net/state/simple/replicator/crdt/pcounter_test.go new file mode 100644 index 0000000000..33ea5d136d --- /dev/null +++ b/tests/integration/net/state/simple/replicator/crdt/pcounter_test.go @@ -0,0 +1,71 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package replicator + +import ( + "testing" + + "github.com/sourcenetwork/immutable" + + testUtils "github.com/sourcenetwork/defradb/tests/integration" +) + +func TestP2POneToOneReplicatorUpdate_PCounter_NoError(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.SchemaUpdate{ + Schema: ` + type Users { + name: String + points: Int @crdt(type: "pcounter") + } + `, + }, + testUtils.CreateDoc{ + // This document is created in first node before the replicator is set up. + // Updates should be synced across nodes. + NodeID: immutable.Some(0), + Doc: `{ + "name": "John", + "points": 10 + }`, + }, + testUtils.ConfigureReplicator{ + SourceNodeID: 0, + TargetNodeID: 1, + }, + testUtils.UpdateDoc{ + // Update John's points on the first node only, and allow the value to sync + NodeID: immutable.Some(0), + Doc: `{ + "points": 10 + }`, + }, + testUtils.WaitForSync{}, + testUtils.Request{ + Request: `query { + Users { + points + } + }`, + Results: []map[string]any{ + { + "points": int64(20), + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} diff --git a/tests/integration/query/simple/with_cid_doc_id_test.go b/tests/integration/query/simple/with_cid_doc_id_test.go index fee91f7399..7c265a409c 100644 --- a/tests/integration/query/simple/with_cid_doc_id_test.go +++ b/tests/integration/query/simple/with_cid_doc_id_test.go @@ -395,3 +395,97 @@ func TestCidAndDocIDQuery_ContainsPNCounterWithFloatKind_NoError(t *testing.T) { testUtils.ExecuteTestCase(t, test) } + +// Note: Only the first CID is reproducible given the added entropy to the Counter CRDT type. +func TestCidAndDocIDQuery_ContainsPCounterWithIntKind_NoError(t *testing.T) { + test := testUtils.TestCase{ + Description: "Simple query with first cid and docID with pcounter int type", + Actions: []any{ + testUtils.SchemaUpdate{ + Schema: ` + type Users { + name: String + points: Int @crdt(type: "pcounter") + } + `, + }, + testUtils.CreateDoc{ + Doc: `{ + "name": "John", + "points": 10 + }`, + }, + testUtils.UpdateDoc{ + Doc: `{ + "points": 20 + }`, + }, + testUtils.Request{ + Request: `query { + Users ( + cid: "bafybeibinkgqwegghg7kqwk66etboc5jv42i4akasxrih35wrvykdwcima", + docID: "bae-a688789e-d8a6-57a7-be09-22e005ab79e0" + ) { + name + points + } + }`, + Results: []map[string]any{ + { + "name": "John", + "points": int64(10), + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} + +// Note: Only the first CID is reproducible given the added entropy to the Counter CRDT type. +func TestCidAndDocIDQuery_ContainsPCounterWithFloatKind_NoError(t *testing.T) { + test := testUtils.TestCase{ + Description: "Simple query with first cid and docID with pcounter and float type", + Actions: []any{ + testUtils.SchemaUpdate{ + Schema: ` + type Users { + name: String + points: Float @crdt(type: "pcounter") + } + `, + }, + testUtils.CreateDoc{ + Doc: `{ + "name": "John", + "points": 10.2 + }`, + }, + testUtils.UpdateDoc{ + Doc: `{ + "points": 20.6 + }`, + }, + testUtils.Request{ + Request: `query { + Users ( + cid: "bafybeifsok5oy42zs2p7habfjr3ee3j7mxeag5nfdo7u4d2bfvm6hdhnpq", + docID: "bae-fa6a97e9-e0e9-5826-8a8c-57775d35e07c" + ) { + name + points + } + }`, + Results: []map[string]any{ + { + "name": "John", + "points": 10.2, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} diff --git a/tests/integration/schema/crdt_type_test.go b/tests/integration/schema/crdt_type_test.go index 47388262d0..fdc278e52c 100644 --- a/tests/integration/schema/crdt_type_test.go +++ b/tests/integration/schema/crdt_type_test.go @@ -130,3 +130,98 @@ func TestSchemaCreate_ContainsPNCounterWithInvalidType_Error(t *testing.T) { testUtils.ExecuteTestCase(t, test) } + +func TestSchemaCreate_ContainsPCounterTypeWithIntKind_NoError(t *testing.T) { + schemaVersionID := "bafkreidjvjnvtwwdkcdqwcmwxqzu3bxrbxs3rkn6h6h7kkxmibpli3mp7y" + + test := testUtils.TestCase{ + Actions: []any{ + testUtils.SchemaUpdate{ + Schema: ` + type Users { + points: Int @crdt(type: "pcounter") + } + `, + }, + testUtils.GetSchema{ + VersionID: immutable.Some(schemaVersionID), + ExpectedResults: []client.SchemaDescription{ + { + Name: "Users", + VersionID: schemaVersionID, + Root: schemaVersionID, + Fields: []client.SchemaFieldDescription{ + { + Name: "_docID", + Kind: client.FieldKind_DocID, + }, + { + Name: "points", + Kind: client.FieldKind_NILLABLE_INT, + Typ: client.P_COUNTER, + }, + }, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} + +func TestSchemaCreate_ContainsPCounterTypeWithFloatKind_NoError(t *testing.T) { + schemaVersionID := "bafkreiasm64v2oimv6uk3hlfap6awptumwkm4fxuoc3ck3ehfe2tmry66i" + + test := testUtils.TestCase{ + Actions: []any{ + testUtils.SchemaUpdate{ + Schema: ` + type Users { + points: Float @crdt(type: "pcounter") + } + `, + }, + testUtils.GetSchema{ + VersionID: immutable.Some(schemaVersionID), + ExpectedResults: []client.SchemaDescription{ + { + Name: "Users", + VersionID: schemaVersionID, + Root: schemaVersionID, + Fields: []client.SchemaFieldDescription{ + { + Name: "_docID", + Kind: client.FieldKind_DocID, + }, + { + Name: "points", + Kind: client.FieldKind_NILLABLE_FLOAT, + Typ: client.P_COUNTER, + }, + }, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} + +func TestSchemaCreate_ContainsPCounterTypeWithWrongKind_Error(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.SchemaUpdate{ + Schema: ` + type Users { + points: String @crdt(type: "pcounter") + } + `, + ExpectedError: "CRDT type pcounter can't be assigned to field kind String", + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} diff --git a/tests/integration/schema/updates/add/field/crdt/pcounter_test.go b/tests/integration/schema/updates/add/field/crdt/pcounter_test.go new file mode 100644 index 0000000000..b7edfe7269 --- /dev/null +++ b/tests/integration/schema/updates/add/field/crdt/pcounter_test.go @@ -0,0 +1,73 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package crdt + +import ( + "testing" + + testUtils "github.com/sourcenetwork/defradb/tests/integration" +) + +func TestSchemaUpdates_AddFieldCRDTPCounter_NoError(t *testing.T) { + test := testUtils.TestCase{ + Description: "Test schema update, add field with crdt P Counter (5)", + Actions: []any{ + testUtils.SchemaUpdate{ + Schema: ` + type Users { + name: String + } + `, + }, + testUtils.SchemaPatch{ + Patch: ` + [ + { "op": "add", "path": "/Users/Fields/-", "value": {"Name": "foo", "Kind": "Int", "Typ": 5} } + ] + `, + }, + testUtils.Request{ + Request: `query { + Users { + name + foo + } + }`, + Results: []map[string]any{}, + }, + }, + } + testUtils.ExecuteTestCase(t, test) +} + +func TestSchemaUpdates_AddFieldCRDTPCounterWithMismatchKind_Error(t *testing.T) { + test := testUtils.TestCase{ + Description: "Test schema update, add field with crdt P Counter (5)", + Actions: []any{ + testUtils.SchemaUpdate{ + Schema: ` + type Users { + name: String + } + `, + }, + testUtils.SchemaPatch{ + Patch: ` + [ + { "op": "add", "path": "/Users/Fields/-", "value": {"Name": "foo", "Kind": "Boolean", "Typ": 5} } + ] + `, + ExpectedError: "CRDT type pcounter can't be assigned to field kind Boolean", + }, + }, + } + testUtils.ExecuteTestCase(t, test) +} diff --git a/tests/integration/schema/updates/add/field/crdt/pncounter_test.go b/tests/integration/schema/updates/add/field/crdt/pncounter_test.go index 2664118c0f..e4be1c1df8 100644 --- a/tests/integration/schema/updates/add/field/crdt/pncounter_test.go +++ b/tests/integration/schema/updates/add/field/crdt/pncounter_test.go @@ -30,7 +30,7 @@ func TestSchemaUpdates_AddFieldCRDTPNCounter_NoError(t *testing.T) { testUtils.SchemaPatch{ Patch: ` [ - { "op": "add", "path": "/Users/Fields/-", "value": {"Name": "foo", "Kind": 4, "Typ": 4} } + { "op": "add", "path": "/Users/Fields/-", "value": {"Name": "foo", "Kind": "Int", "Typ": 4} } ] `, }, @@ -62,7 +62,7 @@ func TestSchemaUpdates_AddFieldCRDTPNCounterWithMismatchKind_Error(t *testing.T) testUtils.SchemaPatch{ Patch: ` [ - { "op": "add", "path": "/Users/Fields/-", "value": {"Name": "foo", "Kind": 2, "Typ": 4} } + { "op": "add", "path": "/Users/Fields/-", "value": {"Name": "foo", "Kind": "Boolean", "Typ": 4} } ] `, ExpectedError: "CRDT type pncounter can't be assigned to field kind Boolean",