
feat: Add dockey field for commit field #1216

Merged · 21 commits · Mar 24, 2023
2 changes: 1 addition & 1 deletion client/dockey.go
@@ -87,7 +87,7 @@ func (key DocKey) UUID() uuid.UUID {
return key.uuid
}

// UUID returns the doc key in string form.
// String returns the doc key in string form.
func (key DocKey) String() string {
buf := make([]byte, 1)
binary.PutUvarint(buf, uint64(key.version))
6 changes: 4 additions & 2 deletions client/request/consts.go
@@ -32,7 +32,7 @@ const (

AverageFieldName = "_avg"
CountFieldName = "_count"
DocKeyFieldName = "_key"
KeyFieldName = "_key"
GroupFieldName = "_group"
SumFieldName = "_sum"
VersionFieldName = "_version"
@@ -46,6 +46,7 @@ const (
LinksFieldName = "links"
HeightFieldName = "height"
CidFieldName = "cid"
DockeyFieldName = "dockey"
SchemaVersionIDFieldName = "schemaVersionId"
DeltaFieldName = "delta"

@@ -69,7 +70,7 @@ var (
CountFieldName: true,
SumFieldName: true,
AverageFieldName: true,
DocKeyFieldName: true,
KeyFieldName: true,
}

Aggregates = map[string]struct{}{
@@ -86,6 +87,7 @@ var (
VersionFields = []string{
HeightFieldName,
CidFieldName,
DockeyFieldName,
SchemaVersionIDFieldName,
DeltaFieldName,
}
5 changes: 4 additions & 1 deletion core/crdt/composite.go
@@ -39,6 +39,7 @@ type CompositeDAGDelta struct {
SchemaVersionID string
Priority uint64
Data []byte
DocKey []byte
SubDAGs []core.DAGLink
}

@@ -61,7 +62,8 @@ func (delta *CompositeDAGDelta) Marshal() ([]byte, error) {
SchemaVersionID string
Priority uint64
Data []byte
}{delta.SchemaVersionID, delta.Priority, delta.Data})
DocKey []byte
}{delta.SchemaVersionID, delta.Priority, delta.Data, delta.DocKey})
if err != nil {
return nil, err
}
@@ -119,6 +121,7 @@ func (c CompositeDAG) Set(patch []byte, links []core.DAGLink) *CompositeDAGDelta
})
return &CompositeDAGDelta{
Data: patch,
DocKey: c.key.WithValueFlag().Bytes(),
SubDAGs: links,
SchemaVersionID: c.schemaVersionKey.SchemaVersionId,
}
2 changes: 1 addition & 1 deletion core/crdt/lwwreg.go
@@ -118,7 +118,7 @@ func (reg LWWRegister) Set(value []byte) *LWWRegDelta {
// return NewLWWRegister(reg.id, value, reg.clock.Apply(), reg.clock)
return &LWWRegDelta{
Data: value,
DocKey: reg.key.Bytes(),
DocKey: reg.key.WithValueFlag().Bytes(),
SchemaVersionID: reg.schemaVersionKey.SchemaVersionId,
}
}
2 changes: 1 addition & 1 deletion core/key.go
@@ -123,7 +123,7 @@ type ReplicatorKey struct {
var _ Key = (*ReplicatorKey)(nil)

// Creates a new DataStoreKey from a string as best as it can,
// splitting the input using '/' as a field deliminater. It assumes
// splitting the input using '/' as a field deliminator. It assumes
// that the input string is in the following format:
//
// /[CollectionId]/[InstanceType]/[DocKey]/[FieldId]
69 changes: 40 additions & 29 deletions db/collection.go
@@ -67,7 +67,7 @@ func (db *db) newCollection(desc client.CollectionDescription) (*collection, err
}

docKeyField := desc.Schema.Fields[0]
if docKeyField.Kind != client.FieldKind_DocKey || docKeyField.Name != request.DocKeyFieldName {
if docKeyField.Kind != client.FieldKind_DocKey || docKeyField.Name != request.KeyFieldName {
return nil, ErrSchemaFirstFieldDocKey
}

@@ -255,7 +255,7 @@ func (db *db) updateCollection(

// validateUpdateCollection validates that the given collection description is a valid update.
//
// Will return true if the given desctiption differs from the current persisted state of the
// Will return true if the given description differs from the current persisted state of the
// collection. Will return an error if it fails validation.
func (db *db) validateUpdateCollection(
ctx context.Context,
@@ -309,7 +309,7 @@ func (db *db) validateUpdateCollection(
var existingField client.FieldDescription
var fieldAlreadyExists bool
if proposedField.ID != client.FieldID(0) ||
proposedField.Name == request.DocKeyFieldName {
proposedField.Name == request.KeyFieldName {
existingField, fieldAlreadyExists = existingFieldsByID[proposedField.ID]
}

@@ -597,11 +597,13 @@ func (c *collection) CreateMany(ctx context.Context, docs []*client.Document) er
return c.commitImplicitTxn(ctx, txn)
}

func (c *collection) create(ctx context.Context, txn datastore.Txn, doc *client.Document) error {
func (c *collection) getKeysFromDoc(
doc *client.Document,
) (client.DocKey, core.PrimaryDataStoreKey, error) {
// DocKey verification
buf, err := doc.Bytes()
if err != nil {
return err
return client.DocKey{}, core.PrimaryDataStoreKey{}, err
}
// @todo: grab the cid Prefix from the DocKey internal CID if available
pref := cid.Prefix{
@@ -613,17 +615,26 @@ func (c *collection) create(ctx context.Context, txn datastore.Txn, doc *client.
// And then feed it some data
doccid, err := pref.Sum(buf)
if err != nil {
return err
return client.DocKey{}, core.PrimaryDataStoreKey{}, err
}

dockey := client.NewDocKeyV0(doccid)
key := c.getPrimaryKeyFromDocKey(dockey)
if key.DocKey != doc.Key().String() {
return NewErrDocVerification(doc.Key().String(), key.DocKey)
primaryKey := c.getPrimaryKeyFromDocKey(dockey)
if primaryKey.DocKey != doc.Key().String() {
return client.DocKey{}, core.PrimaryDataStoreKey{},
NewErrDocVerification(doc.Key().String(), primaryKey.DocKey)
}
return dockey, primaryKey, nil
}

func (c *collection) create(ctx context.Context, txn datastore.Txn, doc *client.Document) error {
dockey, primaryKey, err := c.getKeysFromDoc(doc)
if err != nil {
return err
}

// check if doc already exists
exists, err := c.exists(ctx, txn, key)
exists, err := c.exists(ctx, txn, primaryKey)
if err != nil {
return err
}
@@ -659,8 +670,8 @@ func (c *collection) Update(ctx context.Context, doc *client.Document) error {
}
defer c.discardImplicitTxn(ctx, txn)

dockey := c.getPrimaryKeyFromDocKey(doc.Key())
exists, err := c.exists(ctx, txn, dockey)
primaryKey := c.getPrimaryKeyFromDocKey(doc.Key())
exists, err := c.exists(ctx, txn, primaryKey)
if err != nil {
return err
}
@@ -699,8 +710,8 @@ func (c *collection) Save(ctx context.Context, doc *client.Document) error {
defer c.discardImplicitTxn(ctx, txn)

// Check if document already exists with key
dockey := c.getPrimaryKeyFromDocKey(doc.Key())
exists, err := c.exists(ctx, txn, dockey)
primaryKey := c.getPrimaryKeyFromDocKey(doc.Key())
exists, err := c.exists(ctx, txn, primaryKey)
if err != nil {
return err
}
@@ -728,7 +739,7 @@ func (c *collection) save(
// => Set/Publish new CRDT values
primaryKey := c.getPrimaryKeyFromDocKey(doc.Key())
links := make([]core.DAGLink, 0)
merge := make(map[string]any)
docProperties := make(map[string]any)
for k, v := range doc.Fields() {
val, err := doc.GetValueWithField(v)
if err != nil {
@@ -745,14 +756,14 @@ func (c *collection) save(
return cid.Undef, client.NewErrFieldNotExist(k)
}

c, _, err := c.saveDocValue(ctx, txn, fieldKey, val)
node, _, err := c.saveDocValue(ctx, txn, fieldKey, val)
if err != nil {
return cid.Undef, err
}
if val.IsDelete() {
merge[k] = nil
docProperties[k] = nil
} else {
merge[k] = val.Value()
docProperties[k] = val.Value()
}

// NOTE: We delay the final Clean() call until we know
@@ -766,7 +777,7 @@

link := core.DAGLink{
Name: k,
Cid: c.Cid(),
Cid: node.Cid(),
}
links = append(links, link)
}
@@ -776,7 +787,7 @@
if err != nil {
return cid.Undef, err
}
buf, err := em.Marshal(merge)
buf, err := em.Marshal(docProperties)
if err != nil {
return cid.Undef, nil
}
@@ -827,8 +838,8 @@ func (c *collection) Delete(ctx context.Context, key client.DocKey) (bool, error
}
defer c.discardImplicitTxn(ctx, txn)

dsKey := c.getPrimaryKeyFromDocKey(key)
exists, err := c.exists(ctx, txn, dsKey)
primaryKey := c.getPrimaryKeyFromDocKey(key)
exists, err := c.exists(ctx, txn, primaryKey)
if err != nil {
return false, err
}
@@ -837,7 +848,7 @@
}

// run delete, commit if successful
deleted, err := c.delete(ctx, txn, dsKey)
deleted, err := c.delete(ctx, txn, primaryKey)
if err != nil {
return false, err
}
@@ -911,8 +922,8 @@ func (c *collection) Exists(ctx context.Context, key client.DocKey) (bool, error
}
defer c.discardImplicitTxn(ctx, txn)

dsKey := c.getPrimaryKeyFromDocKey(key)
exists, err := c.exists(ctx, txn, dsKey)
primaryKey := c.getPrimaryKeyFromDocKey(key)
exists, err := c.exists(ctx, txn, primaryKey)
if err != nil && !errors.Is(err, ds.ErrNotFound) {
return false, err
}
@@ -964,7 +975,7 @@ func (c *collection) saveValueToMerkleCRDT(
args ...any) (ipld.Node, uint64, error) {
switch ctype {
case client.LWW_REGISTER:
datatype, err := c.db.crdtFactory.InstanceWithStores(
merkleCRDT, err := c.db.crdtFactory.InstanceWithStores(
txn,
core.NewCollectionSchemaVersionKey(c.Schema().VersionID),
c.db.events.Updates,
@@ -985,11 +996,11 @@
if !ok {
return nil, 0, ErrUnknownCRDTArgument
}
lwwreg := datatype.(*crdt.MerkleLWWRegister)
lwwreg := merkleCRDT.(*crdt.MerkleLWWRegister)
return lwwreg.Set(ctx, bytes)
case client.COMPOSITE:
key = key.WithFieldId(core.COMPOSITE_NAMESPACE)
datatype, err := c.db.crdtFactory.InstanceWithStores(
merkleCRDT, err := c.db.crdtFactory.InstanceWithStores(
txn,
core.NewCollectionSchemaVersionKey(c.Schema().VersionID),
c.db.events.Updates,
@@ -1014,7 +1025,7 @@
if !ok {
return nil, 0, ErrUnknownCRDTArgument
}
comp := datatype.(*crdt.MerkleCompositeDAG)
comp := merkleCRDT.(*crdt.MerkleCompositeDAG)
return comp.Set(ctx, bytes, links)
}
return nil, 0, ErrUnknownCRDT
2 changes: 1 addition & 1 deletion db/collection_update.go
@@ -265,7 +265,7 @@ func (c *collection) updateWithFilter(
}

// add successful updated doc to results
results.DocKeys = append(results.DocKeys, doc[request.DocKeyFieldName].(string))
results.DocKeys = append(results.DocKeys, doc[request.KeyFieldName].(string))
results.Count++
}

6 changes: 6 additions & 0 deletions docs/data_format_changes/i846-add-dockey-to-blocks.md
@@ -0,0 +1,6 @@
# Store dockey field to delta (block) storage

To be able to request the dockey field on commits, it first had to be stored.
Composite blocks didn't have a dockey field, so it was added to the block struct.
Field blocks already had a dockey field, but it didn't store the key with its instance type.
That is why all commit CIDs needed to be regenerated.
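
As a purely illustrative sketch (not part of this PR, and the exact request surface may differ), the new `DockeyFieldName` entry in `VersionFields` is what allows a commits selection to include `dockey` next to the existing version fields such as `cid` and `height`:

```go
// Hypothetical illustration only: the field names below mirror the constants
// registered in client/request/consts.go ("cid", "height", "dockey").
package main

import "fmt"

func main() {
	// A commits selection that now also asks for the stored dockey.
	const commitsWithDockey = `
query {
	commits {
		cid
		height
		dockey
	}
}`
	fmt.Println(commitsWithDockey)
}
```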
18 changes: 9 additions & 9 deletions merkle/clock/clock.go
@@ -160,28 +160,28 @@ func (mc *MerkleClock) ProcessNode(
children := []cid.Cid{}

for _, l := range links {
child := l.Cid
log.Debug(ctx, "Scanning for replacement heads", logging.NewKV("Child", child))
isHead, err := mc.headset.IsHead(ctx, child)
linkCid := l.Cid
log.Debug(ctx, "Scanning for replacement heads", logging.NewKV("Child", linkCid))
isHead, err := mc.headset.IsHead(ctx, linkCid)
if err != nil {
return nil, NewErrCheckingHead(child, err)
return nil, NewErrCheckingHead(linkCid, err)
}

if isHead {
log.Debug(ctx, "Found head, replacing!")
// reached one of the current heads, replace it with the tip
// of current branch
err = mc.headset.Replace(ctx, child, root, rootPrio)
err = mc.headset.Replace(ctx, linkCid, root, rootPrio)
if err != nil {
return nil, NewErrReplacingHead(child, root, err)
return nil, NewErrReplacingHead(linkCid, root, err)
}

continue
}

known, err := mc.dagstore.Has(ctx, child)
known, err := mc.dagstore.Has(ctx, linkCid)
if err != nil {
return nil, NewErrCouldNotFindBlock(child, err)
return nil, NewErrCouldNotFindBlock(linkCid, err)
}
if known {
// we reached a non-head node in the known tree.
@@ -201,7 +201,7 @@ func (mc *MerkleClock) ProcessNode(
continue
}

children = append(children, child)
children = append(children, linkCid)
}

return children, nil
4 changes: 2 additions & 2 deletions merkle/clock/ipld.go
@@ -143,8 +143,8 @@ func makeNode(delta core.Delta, heads []cid.Cid) (ipld.Node, error) {

// add delta specific links
if comp, ok := delta.(core.CompositeDelta); ok {
for _, dlink := range comp.Links() {
if err = nd.AddRawLink(dlink.Name, &ipld.Link{Cid: dlink.Cid}); err != nil {
for _, dagLink := range comp.Links() {
if err = nd.AddRawLink(dagLink.Name, &ipld.Link{Cid: dagLink.Cid}); err != nil {
return nil, err
}
}