Skip to content

Commit

Permalink
Merge branch 'develop' into nasdf/feat/lens-runtime-config
Browse files Browse the repository at this point in the history
  • Loading branch information
nasdf committed Apr 8, 2024
2 parents 0d16c11 + 4811ba9 commit 6c6676d
Show file tree
Hide file tree
Showing 17 changed files with 1,053 additions and 73 deletions.
7 changes: 5 additions & 2 deletions client/ctype.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ const (
OBJECT
COMPOSITE
PN_COUNTER
P_COUNTER
)

// IsSupportedFieldCType returns true if the type is supported as a document field type.
func (t CType) IsSupportedFieldCType() bool {
switch t {
case NONE_CRDT, LWW_REGISTER, PN_COUNTER:
case NONE_CRDT, LWW_REGISTER, PN_COUNTER, P_COUNTER:
return true
default:
return false
Expand All @@ -38,7 +39,7 @@ func (t CType) IsSupportedFieldCType() bool {
// IsCompatibleWith returns true if the CRDT is compatible with the field kind
func (t CType) IsCompatibleWith(kind FieldKind) bool {
switch t {
case PN_COUNTER:
case PN_COUNTER, P_COUNTER:
if kind == FieldKind_NILLABLE_INT || kind == FieldKind_NILLABLE_FLOAT {
return true
}
Expand All @@ -61,6 +62,8 @@ func (t CType) String() string {
return "composite"
case PN_COUNTER:
return "pncounter"
case P_COUNTER:
return "pcounter"
default:
return "unknown"
}
Expand Down
102 changes: 59 additions & 43 deletions core/crdt/pncounter.go → core/crdt/counter.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 Democratized Data Foundation
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
Expand Down Expand Up @@ -33,18 +33,18 @@ import (

var (
// ensure types implements core interfaces
_ core.ReplicatedData = (*PNCounter[float64])(nil)
_ core.ReplicatedData = (*PNCounter[int64])(nil)
_ core.Delta = (*PNCounterDelta[float64])(nil)
_ core.Delta = (*PNCounterDelta[int64])(nil)
_ core.ReplicatedData = (*Counter[float64])(nil)
_ core.ReplicatedData = (*Counter[int64])(nil)
_ core.Delta = (*CounterDelta[float64])(nil)
_ core.Delta = (*CounterDelta[int64])(nil)
)

type Incrementable interface {
constraints.Integer | constraints.Float
}

// PNCounterDelta is a single delta operation for an PNCounter
type PNCounterDelta[T Incrementable] struct {
// CounterDelta is a single delta operation for a Counter
type CounterDelta[T Incrementable] struct {
DocID []byte
FieldName string
Priority uint64
Expand All @@ -59,17 +59,17 @@ type PNCounterDelta[T Incrementable] struct {
}

// GetPriority gets the current priority for this delta.
func (delta *PNCounterDelta[T]) GetPriority() uint64 {
func (delta *CounterDelta[T]) GetPriority() uint64 {
return delta.Priority
}

// SetPriority will set the priority for this delta.
func (delta *PNCounterDelta[T]) SetPriority(prio uint64) {
func (delta *CounterDelta[T]) SetPriority(prio uint64) {
delta.Priority = prio
}

// Marshal encodes the delta using CBOR.
func (delta *PNCounterDelta[T]) Marshal() ([]byte, error) {
func (delta *CounterDelta[T]) Marshal() ([]byte, error) {
h := &codec.CborHandle{}
buf := bytes.NewBuffer(nil)
enc := codec.NewEncoder(buf, h)
Expand All @@ -81,44 +81,50 @@ func (delta *PNCounterDelta[T]) Marshal() ([]byte, error) {
}

// Unmarshal decodes the delta from CBOR.
func (delta *PNCounterDelta[T]) Unmarshal(b []byte) error {
func (delta *CounterDelta[T]) Unmarshal(b []byte) error {
h := &codec.CborHandle{}
dec := codec.NewDecoderBytes(b, h)
return dec.Decode(delta)
}

// PNCounter, is a simple CRDT type that allows increment/decrement
// Counter, is a simple CRDT type that allows increment/decrement
// of an Int and Float data types that ensures convergence.
type PNCounter[T Incrementable] struct {
type Counter[T Incrementable] struct {
baseCRDT
AllowDecrement bool
}

// NewPNCounter returns a new instance of the PNCounter with the given ID.
func NewPNCounter[T Incrementable](
// NewCounter returns a new instance of the Counter with the given ID.
func NewCounter[T Incrementable](
store datastore.DSReaderWriter,
schemaVersionKey core.CollectionSchemaVersionKey,
key core.DataStoreKey,
fieldName string,
) PNCounter[T] {
return PNCounter[T]{newBaseCRDT(store, key, schemaVersionKey, fieldName)}
allowDecrement bool,
) Counter[T] {
return Counter[T]{newBaseCRDT(store, key, schemaVersionKey, fieldName), allowDecrement}
}

// Value gets the current register value
func (reg PNCounter[T]) Value(ctx context.Context) ([]byte, error) {
valueK := reg.key.WithValueFlag()
buf, err := reg.store.Get(ctx, valueK.ToDS())
// Value gets the current counter value
func (c Counter[T]) Value(ctx context.Context) ([]byte, error) {
valueK := c.key.WithValueFlag()
buf, err := c.store.Get(ctx, valueK.ToDS())
if err != nil {
return nil, err
}
return buf, nil
}

// Set generates a new delta with the supplied value
func (reg PNCounter[T]) Increment(ctx context.Context, value T) (*PNCounterDelta[T], error) {
// Set generates a new delta with the supplied value.
//
// WARNING: Incrementing an integer and causing it to overflow the int64 max value
// will cause the value to roll over to the int64 min value. Incremeting a float and
// causing it to overflow the float64 max value will act like a no-op.
func (c Counter[T]) Increment(ctx context.Context, value T) (*CounterDelta[T], error) {
// To ensure that the dag block is unique, we add a random number to the delta.
// This is done only on update (if the doc doesn't already exist) to ensure that the
// initial dag block of a document can be reproducible.
exists, err := reg.store.Has(ctx, reg.key.ToPrimaryDataStoreKey().ToDS())
exists, err := c.store.Has(ctx, c.key.ToPrimaryDataStoreKey().ToDS())
if err != nil {
return nil, err
}
Expand All @@ -131,37 +137,40 @@ func (reg PNCounter[T]) Increment(ctx context.Context, value T) (*PNCounterDelta
nonce = r.Int64()
}

return &PNCounterDelta[T]{
DocID: []byte(reg.key.DocID),
FieldName: reg.fieldName,
return &CounterDelta[T]{
DocID: []byte(c.key.DocID),
FieldName: c.fieldName,
Data: value,
SchemaVersionID: reg.schemaVersionKey.SchemaVersionId,
SchemaVersionID: c.schemaVersionKey.SchemaVersionId,
Nonce: nonce,
}, nil
}

// Merge implements ReplicatedData interface.
// It merges two PNCounterRegisty by adding the values together.
func (reg PNCounter[T]) Merge(ctx context.Context, delta core.Delta) error {
d, ok := delta.(*PNCounterDelta[T])
// It merges two CounterRegisty by adding the values together.
func (c Counter[T]) Merge(ctx context.Context, delta core.Delta) error {
d, ok := delta.(*CounterDelta[T])
if !ok {
return ErrMismatchedMergeType
}

return reg.incrementValue(ctx, d.Data, d.GetPriority())
return c.incrementValue(ctx, d.Data, d.GetPriority())
}

func (reg PNCounter[T]) incrementValue(ctx context.Context, value T, priority uint64) error {
key := reg.key.WithValueFlag()
marker, err := reg.store.Get(ctx, reg.key.ToPrimaryDataStoreKey().ToDS())
func (c Counter[T]) incrementValue(ctx context.Context, value T, priority uint64) error {
if !c.AllowDecrement && value < 0 {
return NewErrNegativeValue(value)
}
key := c.key.WithValueFlag()
marker, err := c.store.Get(ctx, c.key.ToPrimaryDataStoreKey().ToDS())
if err != nil && !errors.Is(err, ds.ErrNotFound) {
return err
}
if bytes.Equal(marker, []byte{base.DeletedObjectMarker}) {
key = key.WithDeletedFlag()
}

curValue, err := reg.getCurrentValue(ctx, key)
curValue, err := c.getCurrentValue(ctx, key)
if err != nil {
return err
}
Expand All @@ -172,16 +181,16 @@ func (reg PNCounter[T]) incrementValue(ctx context.Context, value T, priority ui
return err
}

err = reg.store.Put(ctx, key.ToDS(), b)
err = c.store.Put(ctx, key.ToDS(), b)
if err != nil {
return NewErrFailedToStoreValue(err)
}

return reg.setPriority(ctx, reg.key, priority)
return c.setPriority(ctx, c.key, priority)
}

func (reg PNCounter[T]) getCurrentValue(ctx context.Context, key core.DataStoreKey) (T, error) {
curValue, err := reg.store.Get(ctx, key.ToDS())
func (c Counter[T]) getCurrentValue(ctx context.Context, key core.DataStoreKey) (T, error) {
curValue, err := c.store.Get(ctx, key.ToDS())
if err != nil {
if errors.Is(err, ds.ErrNotFound) {
return 0, nil
Expand All @@ -192,14 +201,14 @@ func (reg PNCounter[T]) getCurrentValue(ctx context.Context, key core.DataStoreK
return getNumericFromBytes[T](curValue)
}

// DeltaDecode is a typed helper to extract a PNCounterDelta from a ipld.Node
func (reg PNCounter[T]) DeltaDecode(node ipld.Node) (core.Delta, error) {
// DeltaDecode is a typed helper to extract a CounterDelta from a ipld.Node
func (c Counter[T]) DeltaDecode(node ipld.Node) (core.Delta, error) {
pbNode, ok := node.(*dag.ProtoNode)
if !ok {
return nil, client.NewErrUnexpectedType[*dag.ProtoNode]("ipld.Node", node)
}

delta := &PNCounterDelta[T]{}
delta := &CounterDelta[T]{}
err := delta.Unmarshal(pbNode.Data())
if err != nil {
return nil, err
Expand All @@ -208,6 +217,13 @@ func (reg PNCounter[T]) DeltaDecode(node ipld.Node) (core.Delta, error) {
return delta, nil
}

func (c Counter[T]) CType() client.CType {
if c.AllowDecrement {
return client.PN_COUNTER
}
return client.P_COUNTER
}

func getNumericFromBytes[T Incrementable](b []byte) (T, error) {
var val T
err := cbor.Unmarshal(b, &val)
Expand Down
6 changes: 6 additions & 0 deletions core/crdt/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
const (
errFailedToGetPriority string = "failed to get priority"
errFailedToStoreValue string = "failed to store value"
errNegativeValue string = "value cannot be negative"
)

// Errors returnable from this package.
Expand All @@ -26,6 +27,7 @@ const (
var (
ErrFailedToGetPriority = errors.New(errFailedToGetPriority)
ErrFailedToStoreValue = errors.New(errFailedToStoreValue)
ErrNegativeValue = errors.New(errNegativeValue)
ErrEncodingPriority = errors.New("error encoding priority")
ErrDecodingPriority = errors.New("error decoding priority")
// ErrMismatchedMergeType - Tying to merge two ReplicatedData of different types
Expand All @@ -41,3 +43,7 @@ func NewErrFailedToGetPriority(inner error) error {
func NewErrFailedToStoreValue(inner error) error {
return errors.Wrap(errFailedToStoreValue, inner)
}

func NewErrNegativeValue[T Incrementable](value T) error {
return errors.New(errNegativeValue, errors.NewKV("Value", value))
}
2 changes: 1 addition & 1 deletion db/base/collection_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func MakePrimaryIndexKeyForCRDT(
WithInstanceInfo(key).
WithFieldId(core.COMPOSITE_NAMESPACE),
nil
case client.LWW_REGISTER, client.PN_COUNTER:
case client.LWW_REGISTER, client.PN_COUNTER, client.P_COUNTER:
field, ok := c.GetFieldByName(fieldName)
if !ok {
return core.DataStoreKey{}, client.NewErrFieldNotExist(fieldName)
Expand Down
31 changes: 16 additions & 15 deletions merkle/crdt/pncounter.go → merkle/crdt/counter.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 Democratized Data Foundation
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
Expand All @@ -21,40 +21,41 @@ import (
"github.com/sourcenetwork/defradb/merkle/clock"
)

// MerklePNCounter is a MerkleCRDT implementation of the PNCounter using MerkleClocks.
type MerklePNCounter[T crdt.Incrementable] struct {
// MerkleCounter is a MerkleCRDT implementation of the Counter using MerkleClocks.
type MerkleCounter[T crdt.Incrementable] struct {
*baseMerkleCRDT

reg crdt.PNCounter[T]
reg crdt.Counter[T]
}

// NewMerklePNCounter creates a new instance (or loaded from DB) of a MerkleCRDT
// backed by a PNCounter CRDT.
func NewMerklePNCounter[T crdt.Incrementable](
// NewMerkleCounter creates a new instance (or loaded from DB) of a MerkleCRDT
// backed by a Counter CRDT.
func NewMerkleCounter[T crdt.Incrementable](
store Stores,
schemaVersionKey core.CollectionSchemaVersionKey,
key core.DataStoreKey,
fieldName string,
) *MerklePNCounter[T] {
register := crdt.NewPNCounter[T](store.Datastore(), schemaVersionKey, key, fieldName)
allowDecrement bool,
) *MerkleCounter[T] {
register := crdt.NewCounter[T](store.Datastore(), schemaVersionKey, key, fieldName, allowDecrement)
clk := clock.NewMerkleClock(store.Headstore(), store.DAGstore(), key.ToHeadStoreKey(), register)
base := &baseMerkleCRDT{clock: clk, crdt: register}
return &MerklePNCounter[T]{
return &MerkleCounter[T]{
baseMerkleCRDT: base,
reg: register,
}
}

// Save the value of the PN Counter to the DAG.
func (mPNC *MerklePNCounter[T]) Save(ctx context.Context, data any) (ipld.Node, uint64, error) {
// Save the value of the Counter to the DAG.
func (mc *MerkleCounter[T]) Save(ctx context.Context, data any) (ipld.Node, uint64, error) {
value, ok := data.(*client.FieldValue)
if !ok {
return nil, 0, NewErrUnexpectedValueType(client.PN_COUNTER, &client.FieldValue{}, data)
return nil, 0, NewErrUnexpectedValueType(mc.reg.CType(), &client.FieldValue{}, data)
}
delta, err := mPNC.reg.Increment(ctx, value.Value().(T))
delta, err := mc.reg.Increment(ctx, value.Value().(T))
if err != nil {
return nil, 0, err
}
nd, err := mPNC.clock.AddDAGNode(ctx, delta)
nd, err := mc.clock.AddDAGNode(ctx, delta)
return nd, delta.GetPriority(), err
}
14 changes: 8 additions & 6 deletions merkle/crdt/merklecrdt.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,34 +66,36 @@ func (base *baseMerkleCRDT) Value(ctx context.Context) ([]byte, error) {
func InstanceWithStore(
store Stores,
schemaVersionKey core.CollectionSchemaVersionKey,
ctype client.CType,
cType client.CType,
kind client.FieldKind,
key core.DataStoreKey,
fieldName string,
) (MerkleCRDT, error) {
switch ctype {
switch cType {
case client.LWW_REGISTER:
return NewMerkleLWWRegister(
store,
schemaVersionKey,
key,
fieldName,
), nil
case client.PN_COUNTER:
case client.PN_COUNTER, client.P_COUNTER:
switch kind {
case client.FieldKind_NILLABLE_INT:
return NewMerklePNCounter[int64](
return NewMerkleCounter[int64](
store,
schemaVersionKey,
key,
fieldName,
cType == client.PN_COUNTER,
), nil
case client.FieldKind_NILLABLE_FLOAT:
return NewMerklePNCounter[float64](
return NewMerkleCounter[float64](
store,
schemaVersionKey,
key,
fieldName,
cType == client.PN_COUNTER,
), nil
}
case client.COMPOSITE:
Expand All @@ -104,5 +106,5 @@ func InstanceWithStore(
fieldName,
), nil
}
return nil, client.NewErrUnknownCRDT(ctype)
return nil, client.NewErrUnknownCRDT(cType)
}
Loading

0 comments on commit 6c6676d

Please sign in to comment.