Skip to content

Commit

Permalink
feat: Add dockey field for commit field (#1216)
Browse files Browse the repository at this point in the history
Add dockey field for commit field

Dockey was not stored for composite deltas, with this change we store dockey in the following format /collection_id/instance_type/key/field_id.
For field deltas dockey was stored but without instance_type. This change add missing intance_type to field delta.

Adjust CIDs in all tests
Add breaking change file
Switch some tests to use new framework
  • Loading branch information
islamaliev authored and shahzadlone committed Apr 13, 2023
1 parent ab03f24 commit 1208b5d
Show file tree
Hide file tree
Showing 47 changed files with 1,898 additions and 1,429 deletions.
2 changes: 1 addition & 1 deletion client/dockey.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (key DocKey) UUID() uuid.UUID {
return key.uuid
}

// UUID returns the doc key in string form.
// String returns the doc key in string form.
func (key DocKey) String() string {
buf := make([]byte, 1)
binary.PutUvarint(buf, uint64(key.version))
Expand Down
6 changes: 4 additions & 2 deletions client/request/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (

AverageFieldName = "_avg"
CountFieldName = "_count"
DocKeyFieldName = "_key"
KeyFieldName = "_key"
GroupFieldName = "_group"
SumFieldName = "_sum"
VersionFieldName = "_version"
Expand All @@ -46,6 +46,7 @@ const (
LinksFieldName = "links"
HeightFieldName = "height"
CidFieldName = "cid"
DockeyFieldName = "dockey"
SchemaVersionIDFieldName = "schemaVersionId"
DeltaFieldName = "delta"

Expand All @@ -69,7 +70,7 @@ var (
CountFieldName: true,
SumFieldName: true,
AverageFieldName: true,
DocKeyFieldName: true,
KeyFieldName: true,
}

Aggregates = map[string]struct{}{
Expand All @@ -86,6 +87,7 @@ var (
VersionFields = []string{
HeightFieldName,
CidFieldName,
DockeyFieldName,
SchemaVersionIDFieldName,
DeltaFieldName,
}
Expand Down
5 changes: 4 additions & 1 deletion core/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type CompositeDAGDelta struct {
SchemaVersionID string
Priority uint64
Data []byte
DocKey []byte
SubDAGs []core.DAGLink
}

Expand All @@ -61,7 +62,8 @@ func (delta *CompositeDAGDelta) Marshal() ([]byte, error) {
SchemaVersionID string
Priority uint64
Data []byte
}{delta.SchemaVersionID, delta.Priority, delta.Data})
DocKey []byte
}{delta.SchemaVersionID, delta.Priority, delta.Data, delta.DocKey})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -119,6 +121,7 @@ func (c CompositeDAG) Set(patch []byte, links []core.DAGLink) *CompositeDAGDelta
})
return &CompositeDAGDelta{
Data: patch,
DocKey: c.key.WithValueFlag().Bytes(),
SubDAGs: links,
SchemaVersionID: c.schemaVersionKey.SchemaVersionId,
}
Expand Down
2 changes: 1 addition & 1 deletion core/crdt/lwwreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (reg LWWRegister) Set(value []byte) *LWWRegDelta {
// return NewLWWRegister(reg.id, value, reg.clock.Apply(), reg.clock)
return &LWWRegDelta{
Data: value,
DocKey: reg.key.Bytes(),
DocKey: reg.key.WithValueFlag().Bytes(),
SchemaVersionID: reg.schemaVersionKey.SchemaVersionId,
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ type ReplicatorKey struct {
var _ Key = (*ReplicatorKey)(nil)

// Creates a new DataStoreKey from a string as best as it can,
// splitting the input using '/' as a field deliminater. It assumes
// splitting the input using '/' as a field deliminator. It assumes
// that the input string is in the following format:
//
// /[CollectionId]/[InstanceType]/[DocKey]/[FieldId]
Expand Down
69 changes: 40 additions & 29 deletions db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (db *db) newCollection(desc client.CollectionDescription) (*collection, err
}

docKeyField := desc.Schema.Fields[0]
if docKeyField.Kind != client.FieldKind_DocKey || docKeyField.Name != request.DocKeyFieldName {
if docKeyField.Kind != client.FieldKind_DocKey || docKeyField.Name != request.KeyFieldName {
return nil, ErrSchemaFirstFieldDocKey
}

Expand Down Expand Up @@ -255,7 +255,7 @@ func (db *db) updateCollection(

// validateUpdateCollection validates that the given collection description is a valid update.
//
// Will return true if the given desctiption differs from the current persisted state of the
// Will return true if the given description differs from the current persisted state of the
// collection. Will return an error if it fails validation.
func (db *db) validateUpdateCollection(
ctx context.Context,
Expand Down Expand Up @@ -309,7 +309,7 @@ func (db *db) validateUpdateCollection(
var existingField client.FieldDescription
var fieldAlreadyExists bool
if proposedField.ID != client.FieldID(0) ||
proposedField.Name == request.DocKeyFieldName {
proposedField.Name == request.KeyFieldName {
existingField, fieldAlreadyExists = existingFieldsByID[proposedField.ID]
}

Expand Down Expand Up @@ -597,11 +597,13 @@ func (c *collection) CreateMany(ctx context.Context, docs []*client.Document) er
return c.commitImplicitTxn(ctx, txn)
}

func (c *collection) create(ctx context.Context, txn datastore.Txn, doc *client.Document) error {
func (c *collection) getKeysFromDoc(
doc *client.Document,
) (client.DocKey, core.PrimaryDataStoreKey, error) {
// DocKey verification
buf, err := doc.Bytes()
if err != nil {
return err
return client.DocKey{}, core.PrimaryDataStoreKey{}, err
}
// @todo: grab the cid Prefix from the DocKey internal CID if available
pref := cid.Prefix{
Expand All @@ -613,17 +615,26 @@ func (c *collection) create(ctx context.Context, txn datastore.Txn, doc *client.
// And then feed it some data
doccid, err := pref.Sum(buf)
if err != nil {
return err
return client.DocKey{}, core.PrimaryDataStoreKey{}, err
}

dockey := client.NewDocKeyV0(doccid)
key := c.getPrimaryKeyFromDocKey(dockey)
if key.DocKey != doc.Key().String() {
return NewErrDocVerification(doc.Key().String(), key.DocKey)
primaryKey := c.getPrimaryKeyFromDocKey(dockey)
if primaryKey.DocKey != doc.Key().String() {
return client.DocKey{}, core.PrimaryDataStoreKey{},
NewErrDocVerification(doc.Key().String(), primaryKey.DocKey)
}
return dockey, primaryKey, nil
}

func (c *collection) create(ctx context.Context, txn datastore.Txn, doc *client.Document) error {
dockey, primaryKey, err := c.getKeysFromDoc(doc)
if err != nil {
return err
}

// check if doc already exists
exists, err := c.exists(ctx, txn, key)
exists, err := c.exists(ctx, txn, primaryKey)
if err != nil {
return err
}
Expand Down Expand Up @@ -659,8 +670,8 @@ func (c *collection) Update(ctx context.Context, doc *client.Document) error {
}
defer c.discardImplicitTxn(ctx, txn)

dockey := c.getPrimaryKeyFromDocKey(doc.Key())
exists, err := c.exists(ctx, txn, dockey)
primaryKey := c.getPrimaryKeyFromDocKey(doc.Key())
exists, err := c.exists(ctx, txn, primaryKey)
if err != nil {
return err
}
Expand Down Expand Up @@ -699,8 +710,8 @@ func (c *collection) Save(ctx context.Context, doc *client.Document) error {
defer c.discardImplicitTxn(ctx, txn)

// Check if document already exists with key
dockey := c.getPrimaryKeyFromDocKey(doc.Key())
exists, err := c.exists(ctx, txn, dockey)
primaryKey := c.getPrimaryKeyFromDocKey(doc.Key())
exists, err := c.exists(ctx, txn, primaryKey)
if err != nil {
return err
}
Expand Down Expand Up @@ -728,7 +739,7 @@ func (c *collection) save(
// => Set/Publish new CRDT values
primaryKey := c.getPrimaryKeyFromDocKey(doc.Key())
links := make([]core.DAGLink, 0)
merge := make(map[string]any)
docProperties := make(map[string]any)
for k, v := range doc.Fields() {
val, err := doc.GetValueWithField(v)
if err != nil {
Expand All @@ -745,14 +756,14 @@ func (c *collection) save(
return cid.Undef, client.NewErrFieldNotExist(k)
}

c, _, err := c.saveDocValue(ctx, txn, fieldKey, val)
node, _, err := c.saveDocValue(ctx, txn, fieldKey, val)
if err != nil {
return cid.Undef, err
}
if val.IsDelete() {
merge[k] = nil
docProperties[k] = nil
} else {
merge[k] = val.Value()
docProperties[k] = val.Value()
}

// NOTE: We delay the final Clean() call until we know
Expand All @@ -766,7 +777,7 @@ func (c *collection) save(

link := core.DAGLink{
Name: k,
Cid: c.Cid(),
Cid: node.Cid(),
}
links = append(links, link)
}
Expand All @@ -776,7 +787,7 @@ func (c *collection) save(
if err != nil {
return cid.Undef, err
}
buf, err := em.Marshal(merge)
buf, err := em.Marshal(docProperties)
if err != nil {
return cid.Undef, nil
}
Expand Down Expand Up @@ -827,8 +838,8 @@ func (c *collection) Delete(ctx context.Context, key client.DocKey) (bool, error
}
defer c.discardImplicitTxn(ctx, txn)

dsKey := c.getPrimaryKeyFromDocKey(key)
exists, err := c.exists(ctx, txn, dsKey)
primaryKey := c.getPrimaryKeyFromDocKey(key)
exists, err := c.exists(ctx, txn, primaryKey)
if err != nil {
return false, err
}
Expand All @@ -837,7 +848,7 @@ func (c *collection) Delete(ctx context.Context, key client.DocKey) (bool, error
}

// run delete, commit if successful
deleted, err := c.delete(ctx, txn, dsKey)
deleted, err := c.delete(ctx, txn, primaryKey)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -911,8 +922,8 @@ func (c *collection) Exists(ctx context.Context, key client.DocKey) (bool, error
}
defer c.discardImplicitTxn(ctx, txn)

dsKey := c.getPrimaryKeyFromDocKey(key)
exists, err := c.exists(ctx, txn, dsKey)
primaryKey := c.getPrimaryKeyFromDocKey(key)
exists, err := c.exists(ctx, txn, primaryKey)
if err != nil && !errors.Is(err, ds.ErrNotFound) {
return false, err
}
Expand Down Expand Up @@ -964,7 +975,7 @@ func (c *collection) saveValueToMerkleCRDT(
args ...any) (ipld.Node, uint64, error) {
switch ctype {
case client.LWW_REGISTER:
datatype, err := c.db.crdtFactory.InstanceWithStores(
merkleCRDT, err := c.db.crdtFactory.InstanceWithStores(
txn,
core.NewCollectionSchemaVersionKey(c.Schema().VersionID),
c.db.events.Updates,
Expand All @@ -985,11 +996,11 @@ func (c *collection) saveValueToMerkleCRDT(
if !ok {
return nil, 0, ErrUnknownCRDTArgument
}
lwwreg := datatype.(*crdt.MerkleLWWRegister)
lwwreg := merkleCRDT.(*crdt.MerkleLWWRegister)
return lwwreg.Set(ctx, bytes)
case client.COMPOSITE:
key = key.WithFieldId(core.COMPOSITE_NAMESPACE)
datatype, err := c.db.crdtFactory.InstanceWithStores(
merkleCRDT, err := c.db.crdtFactory.InstanceWithStores(
txn,
core.NewCollectionSchemaVersionKey(c.Schema().VersionID),
c.db.events.Updates,
Expand All @@ -1014,7 +1025,7 @@ func (c *collection) saveValueToMerkleCRDT(
if !ok {
return nil, 0, ErrUnknownCRDTArgument
}
comp := datatype.(*crdt.MerkleCompositeDAG)
comp := merkleCRDT.(*crdt.MerkleCompositeDAG)
return comp.Set(ctx, bytes, links)
}
return nil, 0, ErrUnknownCRDT
Expand Down
2 changes: 1 addition & 1 deletion db/collection_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (c *collection) updateWithFilter(
}

// add successful updated doc to results
results.DocKeys = append(results.DocKeys, doc[request.DocKeyFieldName].(string))
results.DocKeys = append(results.DocKeys, doc[request.KeyFieldName].(string))
results.Count++
}

Expand Down
6 changes: 6 additions & 0 deletions docs/data_format_changes/i846-add-dockey-to-blocks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Store dockey field to delta (block) storage

To be able to request dockey field on commits, it had to be stored first.
Composite blocks didn't have dockey field, so it was added to the block struct.
Field blocks had dockey field, but it didn't store the key with it's instance type.
That's why all CIDs of commits needed to be regenerated.
18 changes: 9 additions & 9 deletions merkle/clock/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,28 +160,28 @@ func (mc *MerkleClock) ProcessNode(
children := []cid.Cid{}

for _, l := range links {
child := l.Cid
log.Debug(ctx, "Scanning for replacement heads", logging.NewKV("Child", child))
isHead, err := mc.headset.IsHead(ctx, child)
linkCid := l.Cid
log.Debug(ctx, "Scanning for replacement heads", logging.NewKV("Child", linkCid))
isHead, err := mc.headset.IsHead(ctx, linkCid)
if err != nil {
return nil, NewErrCheckingHead(child, err)
return nil, NewErrCheckingHead(linkCid, err)
}

if isHead {
log.Debug(ctx, "Found head, replacing!")
// reached one of the current heads, replace it with the tip
// of current branch
err = mc.headset.Replace(ctx, child, root, rootPrio)
err = mc.headset.Replace(ctx, linkCid, root, rootPrio)
if err != nil {
return nil, NewErrReplacingHead(child, root, err)
return nil, NewErrReplacingHead(linkCid, root, err)
}

continue
}

known, err := mc.dagstore.Has(ctx, child)
known, err := mc.dagstore.Has(ctx, linkCid)
if err != nil {
return nil, NewErrCouldNotFindBlock(child, err)
return nil, NewErrCouldNotFindBlock(linkCid, err)
}
if known {
// we reached a non-head node in the known tree.
Expand All @@ -201,7 +201,7 @@ func (mc *MerkleClock) ProcessNode(
continue
}

children = append(children, child)
children = append(children, linkCid)
}

return children, nil
Expand Down
4 changes: 2 additions & 2 deletions merkle/clock/ipld.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ func makeNode(delta core.Delta, heads []cid.Cid) (ipld.Node, error) {

// add delta specific links
if comp, ok := delta.(core.CompositeDelta); ok {
for _, dlink := range comp.Links() {
if err = nd.AddRawLink(dlink.Name, &ipld.Link{Cid: dlink.Cid}); err != nil {
for _, dagLink := range comp.Links() {
if err = nd.AddRawLink(dagLink.Name, &ipld.Link{Cid: dagLink.Cid}); err != nil {
return nil, err
}
}
Expand Down
Loading

0 comments on commit 1208b5d

Please sign in to comment.