From 0b8bca9f42801e50e8a6367bfd9fd4ab42190d34 Mon Sep 17 00:00:00 2001 From: Islam Aleiv Date: Mon, 4 Nov 2024 08:56:47 +0100 Subject: [PATCH 01/14] Add ACP to pubsub KMS --- event/event.go | 9 + internal/db/collection_retriever.go | 72 ++++ internal/db/db.go | 32 ++ internal/db/errors.go | 6 + internal/db/iterator.go | 111 ++++++ internal/db/merge.go | 11 +- internal/db/p2p_replicator.go | 27 +- internal/db/permission/check.go | 4 +- internal/kms/pubsub.go | 79 +++- node/node.go | 9 + tests/integration/acp.go | 94 +++-- tests/integration/encryption/peer_acp_test.go | 375 ++++++++++++++++++ tests/integration/events.go | 43 +- tests/integration/p2p.go | 5 +- tests/integration/state.go | 13 +- tests/integration/test_case.go | 7 + tests/integration/utils.go | 5 +- 17 files changed, 828 insertions(+), 74 deletions(-) create mode 100644 internal/db/collection_retriever.go create mode 100644 internal/db/iterator.go create mode 100644 tests/integration/encryption/peer_acp_test.go diff --git a/event/event.go b/event/event.go index 5ae882c6bb..53d5f0dbb4 100644 --- a/event/event.go +++ b/event/event.go @@ -96,6 +96,15 @@ type Merge struct { SchemaRoot string } +// MergeComplete is a notification that a merge has been completed. +type MergeComplete struct { + // Merge is the merge that was completed. + Merge Merge + + // Decrypted specifies if the merge payload was decrypted. + Decrypted bool +} + // Message contains event info. type Message struct { // Name is the name of the event this message was generated from. diff --git a/internal/db/collection_retriever.go b/internal/db/collection_retriever.go new file mode 100644 index 0000000000..4549aa43cd --- /dev/null +++ b/internal/db/collection_retriever.go @@ -0,0 +1,72 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package db + +import ( + "context" + + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/internal/db/description" +) + +// CollectionRetriever is a helper struct that retrieves a collection from a document ID. +type CollectionRetriever struct { + db *db +} + +// NewCollectionRetriever creates a new CollectionRetriever. +func NewCollectionRetriever(database client.DB) *CollectionRetriever { + internalDB, ok := database.(*db) + if !ok { + return nil + } + return &CollectionRetriever{ + db: internalDB, + } +} + +// RetrieveCollectionFromDocID retrieves a collection from a document ID. +func (r *CollectionRetriever) RetrieveCollectionFromDocID( + ctx context.Context, + docID string, +) (client.Collection, error) { + ctx, txn, err := ensureContextTxn(ctx, r.db, false) + if err != nil { + return nil, err + } + defer txn.Discard(ctx) + + headIterator, err := NewHeadBlocksIteratorFromTxn(ctx, txn, docID) + if err != nil { + return nil, err + } + + hasValue, err := headIterator.Next() + if err != nil { + return nil, err + } + + if !hasValue { + return nil, NewErrDocIDNotFound(docID) + } + + schema, err := description.GetSchemaVersion(ctx, txn, headIterator.CurrentBlock().Delta.GetSchemaVersionID()) + if err != nil { + return nil, err + } + + col, err := getCollectionFromRootSchema(ctx, r.db, schema.Root) + if err != nil { + return nil, err + } + + return col, nil +} diff --git a/internal/db/db.go b/internal/db/db.go index f2782bbe3a..75aab2936c 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -262,9 +262,41 @@ func (db *db) AddDocActorRelationship( return client.AddDocActorRelationshipResult{}, err } + err = db.publishDocUpdateEvent(ctx, docID, collection) + if err != nil { + return client.AddDocActorRelationshipResult{}, err + } + return client.AddDocActorRelationshipResult{ExistedAlready: exists}, nil } +func (db *db) publishDocUpdateEvent(ctx context.Context, docID string, collection client.Collection) error { + headsIterator, err := NewHeadBlocksIterator(ctx, db.multistore.Headstore(), db.Blockstore(), docID) + if err != nil { + return err + } + defer headsIterator.Close() + + for { + hasValue, err := headsIterator.Next() + if err != nil { + return err + } + if !hasValue { + break + } + + updateEvent := event.Update{ + DocID: docID, + Cid: headsIterator.CurrentCid(), + SchemaRoot: collection.Schema().Root, + Block: headsIterator.CurrentRawBlock(), + } + db.events.Publish(event.NewMessage(event.UpdateName, updateEvent)) + } + return nil +} + func (db *db) DeleteDocActorRelationship( ctx context.Context, collectionName string, diff --git a/internal/db/errors.go b/internal/db/errors.go index bd38cf052e..920e587e50 100644 --- a/internal/db/errors.go +++ b/internal/db/errors.go @@ -106,6 +106,7 @@ const ( errColNotMaterialized string = "non-materialized collections are not supported" errMaterializedViewAndACPNotSupported string = "materialized views do not support ACP" errInvalidDefaultFieldValue string = "default field value is invalid" + errDocIDNotFound string = "docID not found" ) var ( @@ -152,6 +153,7 @@ var ( ErrContextDone = errors.New("context done") ErrFailedToRetryDoc = errors.New("failed to retry doc") ErrTimeoutDocRetry = errors.New("timeout while retrying doc") + ErrDocIDNotFound = errors.New(errDocIDNotFound) ) // NewErrFailedToGetHeads returns a new error indicating that the heads of a document @@ -690,3 +692,7 @@ func NewErrDefaultFieldValueInvalid(collection string, inner error) error { errors.NewKV("Inner", inner), ) } + +func NewErrDocIDNotFound(docID string) error { + return errors.New(errDocIDNotFound, errors.NewKV("DocID", docID)) +} diff --git a/internal/db/iterator.go b/internal/db/iterator.go new file mode 100644 index 0000000000..cfa79e71eb --- /dev/null +++ b/internal/db/iterator.go @@ -0,0 +1,111 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package db + +import ( + "context" + "io" + + "github.com/ipfs/go-cid" + + "github.com/sourcenetwork/defradb/datastore" + "github.com/sourcenetwork/defradb/internal/core" + coreblock "github.com/sourcenetwork/defradb/internal/core/block" + "github.com/sourcenetwork/defradb/internal/keys" + "github.com/sourcenetwork/defradb/internal/merkle/clock" +) + +// DocHeadBlocksIterator is an iterator that iterates over the head blocks of a document. +type DocHeadBlocksIterator struct { + ctx context.Context + blockstore datastore.Blockstore + cids []cid.Cid + + currentCid cid.Cid + currentBlock *coreblock.Block + currentRawBlock []byte +} + +var _ io.Closer = (*DocHeadBlocksIterator)(nil) + +func (h *DocHeadBlocksIterator) Close() error { + return nil +} + +// NewHeadBlocksIterator creates a new DocHeadBlocksIterator. +func NewHeadBlocksIterator( + ctx context.Context, + headstore datastore.DSReaderWriter, + blockstore datastore.Blockstore, + docID string, +) (*DocHeadBlocksIterator, error) { + headStoreKey := keys.HeadStoreKey{ + DocID: docID, + FieldID: core.COMPOSITE_NAMESPACE, + } + headset := clock.NewHeadSet(headstore, headStoreKey) + cids, _, err := headset.List(ctx) + if err != nil { + return nil, err + } + return &DocHeadBlocksIterator{ + ctx: ctx, + blockstore: blockstore, + cids: cids, + }, nil +} + +// NewHeadBlocksIteratorFromTxn creates a new DocHeadBlocksIterator from a transaction. +func NewHeadBlocksIteratorFromTxn( + ctx context.Context, + txn datastore.Txn, + docID string, +) (*DocHeadBlocksIterator, error) { + return NewHeadBlocksIterator(ctx, txn.Headstore(), txn.Blockstore(), docID) +} + +// Next advances the iterator to the next block. +func (h *DocHeadBlocksIterator) Next() (bool, error) { + if len(h.cids) == 0 { + return false, nil + } + nextCid := h.cids[0] + h.cids = h.cids[1:] + + rawBlock, err := h.blockstore.Get(h.ctx, nextCid) + if err != nil { + return false, err + } + blk, err := coreblock.GetFromBytes(rawBlock.RawData()) + if err != nil { + return false, err + } + + h.currentCid = nextCid + h.currentBlock = blk + h.currentRawBlock = rawBlock.RawData() + return true, nil +} + +// CurrentCid returns the CID of the current block. +func (h *DocHeadBlocksIterator) CurrentCid() cid.Cid { + return h.currentCid +} + +// CurrentBlock returns the current block. +func (h *DocHeadBlocksIterator) CurrentBlock() *coreblock.Block { + return h.currentBlock +} + +// CurrentRawBlock returns the raw data of the current block. +func (h *DocHeadBlocksIterator) CurrentRawBlock() []byte { + return h.currentRawBlock +} diff --git a/internal/db/merge.go b/internal/db/merge.go index 74db1ad302..898700a9ed 100644 --- a/internal/db/merge.go +++ b/internal/db/merge.go @@ -84,7 +84,10 @@ func (db *db) executeMerge(ctx context.Context, dagMerge event.Merge) error { } // send a complete event so we can track merges in the integration tests - db.events.Publish(event.NewMessage(event.MergeCompleteName, dagMerge)) + db.events.Publish(event.NewMessage(event.MergeCompleteName, event.MergeComplete{ + Merge: dagMerge, + Decrypted: len(mp.missingEncryptionBlocks) == 0, + })) return nil } @@ -264,7 +267,9 @@ func (mp *mergeProcessor) tryFetchMissingBlocksAndMerge(ctx context.Context) err return res.Error } - clear(mp.missingEncryptionBlocks) + if len(res.Items) == 0 { + return nil + } for i := range res.Items { _, link, err := cid.CidFromBytes(res.Items[i].Link) @@ -280,6 +285,8 @@ func (mp *mergeProcessor) tryFetchMissingBlocksAndMerge(ctx context.Context) err mp.availableEncryptionBlocks[cidlink.Link{Cid: link}] = &encBlock } + clear(mp.missingEncryptionBlocks) + err := mp.mergeComposites(ctx) if err != nil { return err diff --git a/internal/db/p2p_replicator.go b/internal/db/p2p_replicator.go index 61c082d210..6e8f1fb0ef 100644 --- a/internal/db/p2p_replicator.go +++ b/internal/db/p2p_replicator.go @@ -26,7 +26,6 @@ import ( "github.com/sourcenetwork/defradb/errors" "github.com/sourcenetwork/defradb/event" "github.com/sourcenetwork/defradb/internal/core" - coreblock "github.com/sourcenetwork/defradb/internal/core/block" "github.com/sourcenetwork/defradb/internal/keys" "github.com/sourcenetwork/defradb/internal/merkle/clock" ) @@ -646,31 +645,29 @@ func (db *db) retryDoc(ctx context.Context, docID string) error { return err } defer txn.Discard(ctx) - headStoreKey := keys.HeadStoreKey{ - DocID: docID, - FieldID: core.COMPOSITE_NAMESPACE, - } - headset := clock.NewHeadSet(txn.Headstore(), headStoreKey) - cids, _, err := headset.List(ctx) + + headsIterator, err := NewHeadBlocksIteratorFromTxn(ctx, txn, docID) if err != nil { return err } + defer headsIterator.Close() - for _, c := range cids { + for { select { case <-ctx.Done(): return ErrContextDone default: } - rawblk, err := txn.Blockstore().Get(ctx, c) + + hasValue, err := headsIterator.Next() if err != nil { return err } - blk, err := coreblock.GetFromBytes(rawblk.RawData()) - if err != nil { - return err + if !hasValue { + break } - schema, err := db.getSchemaByVersionID(ctx, blk.Delta.GetSchemaVersionID()) + + schema, err := db.getSchemaByVersionID(ctx, headsIterator.CurrentBlock().Delta.GetSchemaVersionID()) if err != nil { return err } @@ -678,9 +675,9 @@ func (db *db) retryDoc(ctx context.Context, docID string) error { defer close(successChan) updateEvent := event.Update{ DocID: docID, - Cid: c, + Cid: headsIterator.CurrentCid(), SchemaRoot: schema.Root, - Block: rawblk.RawData(), + Block: headsIterator.CurrentRawBlock(), IsRetry: true, // Because the retry is done in a separate goroutine but the retry handling process should be synchronous, // we use a channel to block while waiting for the success status of the retry. diff --git a/internal/db/permission/check.go b/internal/db/permission/check.go index b19500f41b..ce111bccaf 100644 --- a/internal/db/permission/check.go +++ b/internal/db/permission/check.go @@ -50,7 +50,7 @@ func CheckAccessOfDocOnCollectionWithACP( // Now that we know acp is available and the collection is permissioned, before checking access with // acp directly we need to make sure that the document is not public, as public documents will not - // be regestered with acp. We give unrestricted access to public documents, so it does not matter + // be registered with acp. We give unrestricted access to public documents, so it does not matter // whether the request has a signature identity or not at this stage of the check. isRegistered, err := acpSystem.IsDocRegistered( ctx, @@ -69,7 +69,7 @@ func CheckAccessOfDocOnCollectionWithACP( // At this point if the request is not signatured, then it has no access, because: // the collection has a policy on it, and the acp is enabled/available, - // and the document is not public (is regestered with acp). + // and the document is not public (is registered with acp). if !identity.HasValue() { return false, nil } diff --git a/internal/kms/pubsub.go b/internal/kms/pubsub.go index cbcd6ee141..7da101346f 100644 --- a/internal/kms/pubsub.go +++ b/internal/kms/pubsub.go @@ -20,12 +20,16 @@ import ( cidlink "github.com/ipld/go-ipld-prime/linking/cid" libpeer "github.com/libp2p/go-libp2p/core/peer" rpc "github.com/sourcenetwork/go-libp2p-pubsub-rpc" + "github.com/sourcenetwork/immutable" grpcpeer "google.golang.org/grpc/peer" + "github.com/sourcenetwork/defradb/acp" + "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/crypto" "github.com/sourcenetwork/defradb/datastore" "github.com/sourcenetwork/defradb/errors" "github.com/sourcenetwork/defradb/event" + coreblock "github.com/sourcenetwork/defradb/internal/core/block" "github.com/sourcenetwork/defradb/internal/encryption" ) @@ -36,6 +40,10 @@ type PubSubServer interface { SendPubSubMessage(context.Context, string, []byte) (<-chan rpc.Response, error) } +type CollectionRetriever interface { + RetrieveCollectionFromDocID(context.Context, string) (client.Collection, error) +} + type pubSubService struct { ctx context.Context peerID libpeer.ID @@ -43,6 +51,9 @@ type pubSubService struct { keyRequestedSub *event.Subscription eventBus *event.Bus encStore *ipldEncStorage + acp immutable.Option[acp.ACP] + colRetriever CollectionRetriever + nodeDID string } var _ Service = (*pubSubService)(nil) @@ -69,13 +80,19 @@ func NewPubSubService( pubsub PubSubServer, eventBus *event.Bus, encstore datastore.Blockstore, + acp immutable.Option[acp.ACP], + colRetriever CollectionRetriever, + nodeDID string, ) (*pubSubService, error) { s := &pubSubService{ - ctx: ctx, - peerID: peerID, - pubsub: pubsub, - eventBus: eventBus, - encStore: newIPLDEncryptionStorage(encstore), + ctx: ctx, + peerID: peerID, + pubsub: pubsub, + eventBus: eventBus, + encStore: newIPLDEncryptionStorage(encstore), + acp: acp, + colRetriever: colRetriever, + nodeDID: nodeDID, } err := pubsub.AddPubSubTopic(pubsubTopic, s.handleRequestFromPeer) if err != nil { @@ -127,6 +144,7 @@ func (s *pubSubService) handleKeyRequestedEvent() { } type fetchEncryptionKeyRequest struct { + Identity []byte Links [][]byte EphemeralPublicKey []byte } @@ -153,6 +171,7 @@ func (s *pubSubService) prepareFetchEncryptionKeyRequest( ephemeralPublicKey []byte, ) (*fetchEncryptionKeyRequest, error) { req := &fetchEncryptionKeyRequest{ + Identity: []byte(s.nodeDID), EphemeralPublicKey: ephemeralPublicKey, } @@ -260,9 +279,12 @@ func (s *pubSubService) tryGenEncryptionKeyLocally( req *fetchEncryptionKeyRequest, ) (*fetchEncryptionKeyReply, error) { blocks, err := s.getEncryptionKeysLocally(ctx, req) - if err != nil || len(blocks) == 0 { + if err != nil { return nil, err } + if len(blocks) == 0 { + return &fetchEncryptionKeyReply{}, nil + } reqEphPubKey, err := crypto.X25519PublicKeyFromBytes(req.EphemeralPublicKey) if err != nil { @@ -317,6 +339,14 @@ func (s *pubSubService) getEncryptionKeysLocally( continue } + hasPerm, err := s.doesIdentityHaveDocPermission(ctx, string(req.Identity), encBlock) + if err != nil { + return nil, err + } + if !hasPerm { + continue + } + encBlockBytes, err := encBlock.Marshal() if err != nil { return nil, err @@ -327,6 +357,43 @@ func (s *pubSubService) getEncryptionKeysLocally( return blocks, nil } +func (s *pubSubService) doesIdentityHaveDocPermission( + ctx context.Context, + actorIdentity string, + entBlock *coreblock.Encryption, +) (bool, error) { + if !s.acp.HasValue() { + return true, nil + } + + docID := string(entBlock.DocID) + collection, err := s.colRetriever.RetrieveCollectionFromDocID(ctx, docID) + if err != nil { + return false, err + } + + policy := collection.Definition().Description.Policy + if !policy.HasValue() || policy.Value().ID == "" || policy.Value().ResourceName == "" { + return true, nil + } + + policyID, resourceName := policy.Value().ID, policy.Value().ResourceName + + isRegistered, err := s.acp.Value().IsDocRegistered(ctx, policyID, resourceName, docID) + if err != nil { + return false, err + } + + if !isRegistered { + // Unrestricted access as it is a public document. + return true, nil + } + + hasPerm, err := s.acp.Value().CheckDocAccess(ctx, acp.ReadPermission, actorIdentity, policyID, resourceName, docID) + + return hasPerm, err +} + func encodeToBase64(data []byte) []byte { encoded := make([]byte, base64.StdEncoding.EncodedLen(len(data))) base64.StdEncoding.Encode(encoded, data) diff --git a/node/node.go b/node/node.go index 0a1b813862..aa47bfbc5c 100644 --- a/node/node.go +++ b/node/node.go @@ -158,6 +158,12 @@ func (n *Node) Start(ctx context.Context) error { if err != nil { return err } + + ident, err := n.DB.GetNodeIdentity(ctx) + if err != nil { + return err + } + if n.options.kmsType.HasValue() { switch n.options.kmsType.Value() { case kms.PubSubServiceType: @@ -167,6 +173,9 @@ func (n *Node) Start(ctx context.Context) error { n.Peer.Server(), n.DB.Events(), n.DB.Encstore(), + acp, + db.NewCollectionRetriever(n.DB), + ident.Value().DID, ) } if err != nil { diff --git a/tests/integration/acp.go b/tests/integration/acp.go index 8269245757..92f3250ad9 100644 --- a/tests/integration/acp.go +++ b/tests/integration/acp.go @@ -181,11 +181,14 @@ func addDocActorRelationshipACP( s *state, action AddDocActorRelationship, ) { + var docID string + actionNodeID := action.NodeID nodeIDs, nodes := getNodesWithIDs(action.NodeID, s.nodes) for index, node := range nodes { nodeID := nodeIDs[index] - collectionName, docID := getCollectionAndDocInfo(s, action.CollectionID, action.DocID, nodeID) + var collectionName string + collectionName, docID = getCollectionAndDocInfo(s, action.CollectionID, action.DocID, nodeID) exists, err := node.AddDocActorRelationship( getContextWithIdentity(s.ctx, s, action.RequestorIdentity, nodeID), @@ -206,9 +209,14 @@ func addDocActorRelationshipACP( // The relationship should only be added to a SourceHub chain once - there is no need to loop through // the nodes. if acpType == SourceHubACPType { + actionNodeID = immutable.Some(0) break } } + + if action.ExpectedError == "" { + waitForUpdateEvents(s, actionNodeID, map[string]struct{}{docID: {}}) + } } // DeleteDocActorRelationship will attempt to delete a relationship between a document and an actor. @@ -356,7 +364,9 @@ func setupSourceHub(s *state) ([]node.ACPOpt, error) { return nil, err } - out, err := exec.Command("sourcehubd", "init", moniker, "--chain-id", chainID, "--home", directory).CombinedOutput() + args := []string{"init", moniker, "--chain-id", chainID, "--home", directory} + s.t.Log("$ sourcehubd " + strings.Join(args, " ")) + out, err := exec.Command("sourcehubd", args...).CombinedOutput() s.t.Log(string(out)) if err != nil { return nil, err @@ -389,22 +399,27 @@ func setupSourceHub(s *state) ([]node.ACPOpt, error) { return nil, err } - out, err = exec.Command( - "sourcehubd", "keys", "import-hex", validatorName, acpKeyHex, + args = []string{ + "keys", "import-hex", validatorName, acpKeyHex, "--keyring-backend", keyringBackend, "--home", directory, - ).CombinedOutput() + } + + s.t.Log("$ sourcehubd " + strings.Join(args, " ")) + out, err = exec.Command("sourcehubd", args...).CombinedOutput() s.t.Log(string(out)) if err != nil { return nil, err } - out, err = exec.Command( - "sourcehubd", "keys", "show", validatorName, + args = []string{ + "keys", "show", validatorName, "--address", "--keyring-backend", keyringBackend, "--home", directory, - ).CombinedOutput() + } + s.t.Log("$ sourcehubd " + strings.Join(args, " ")) + out, err = exec.Command("sourcehubd", args...).CombinedOutput() s.t.Log(string(out)) if err != nil { return nil, err @@ -414,28 +429,31 @@ func setupSourceHub(s *state) ([]node.ACPOpt, error) { validatorAddress := strings.TrimSpace(string(out)) s.sourcehubAddress = validatorAddress - out, err = exec.Command( - "sourcehubd", "genesis", "add-genesis-account", validatorAddress, "900000000stake", + args = []string{"genesis", "add-genesis-account", validatorAddress, "900000000stake", "--keyring-backend", keyringBackend, "--home", directory, - ).CombinedOutput() + } + s.t.Log("$ sourcehubd " + strings.Join(args, " ")) + out, err = exec.Command("sourcehubd", args...).CombinedOutput() s.t.Log(string(out)) if err != nil { return nil, err } - out, err = exec.Command( - "sourcehubd", "genesis", "gentx", validatorName, "10000000stake", + args = []string{"genesis", "gentx", validatorName, "10000000stake", "--chain-id", chainID, "--keyring-backend", keyringBackend, - "--home", directory, - ).CombinedOutput() + "--home", directory} + s.t.Log("$ sourcehubd " + strings.Join(args, " ")) + out, err = exec.Command("sourcehubd", args...).CombinedOutput() s.t.Log(string(out)) if err != nil { return nil, err } - out, err = exec.Command("sourcehubd", "genesis", "collect-gentxs", "--home", directory).CombinedOutput() + args = []string{"genesis", "collect-gentxs", "--home", directory} + s.t.Log("$ sourcehubd " + strings.Join(args, " ")) + out, err = exec.Command("sourcehubd", args...).CombinedOutput() s.t.Log(string(out)) if err != nil { return nil, err @@ -485,8 +503,7 @@ func setupSourceHub(s *state) ([]node.ACPOpt, error) { releaseP2pPort() releasePprofPort() - sourceHubCmd := exec.Command( - "sourcehubd", + args = []string{ "start", "--minimum-gas-prices", "0stake", "--home", directory, @@ -494,7 +511,9 @@ func setupSourceHub(s *state) ([]node.ACPOpt, error) { "--rpc.laddr", rpcAddress, "--p2p.laddr", p2pAddress, "--rpc.pprof_laddr", pprofAddress, - ) + } + s.t.Log("$ sourcehubd " + strings.Join(args, " ")) + sourceHubCmd := exec.Command("sourcehubd", args...) var bf testBuffer bf.Lines = make(chan string, 100) sourceHubCmd.Stdout = &bf @@ -566,23 +585,32 @@ func getFreePort() (int, func(), error) { // crossLock forms a cross process lock by attempting to listen to the given port. // -// This function will only return once the port is free. A function to unbind from the -// port is returned - this unlock function may be called multiple times without issue. +// This function will only return once the port is free or the timeout is reached. +// A function to unbind from the port is returned - this unlock function may be called +// multiple times without issue. func crossLock(port uint16) (func(), error) { - l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%v", port)) - if err != nil { - if strings.Contains(err.Error(), "address already in use") { - time.Sleep(5 * time.Millisecond) - return crossLock(port) + timeout := time.After(20 * time.Second) + for { + select { + case <-timeout: + return nil, fmt.Errorf("timeout reached while trying to acquire cross process lock on port %v", port) + default: + l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%v", port)) + if err != nil { + if strings.Contains(err.Error(), "address already in use") { + time.Sleep(5 * time.Millisecond) + continue + } + return nil, err + } + + return func() { + // there are no errors that this returns that we actually care about + _ = l.Close() + }, + nil } - return nil, err } - - return func() { - // there are no errors that this returns that we actually care about - _ = l.Close() - }, - nil } func getNodeAudience(s *state, nodeIndex int) immutable.Option[string] { diff --git a/tests/integration/encryption/peer_acp_test.go b/tests/integration/encryption/peer_acp_test.go new file mode 100644 index 0000000000..728ba39953 --- /dev/null +++ b/tests/integration/encryption/peer_acp_test.go @@ -0,0 +1,375 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package encryption + +import ( + "fmt" + "testing" + "time" + + "github.com/sourcenetwork/immutable" + + testUtils "github.com/sourcenetwork/defradb/tests/integration" +) + +const policy = ` +name: Test Policy + +description: A Policy + +actor: + name: actor + +resources: + users: + permissions: + read: + expr: owner + reader + writer + + write: + expr: owner + writer + + nothing: + expr: dummy + + relations: + owner: + types: + - actor + + reader: + types: + - actor + + writer: + types: + - actor + + admin: + manages: + - reader + types: + - actor + + dummy: + types: + - actor +` + +func TestDocEncryptionACP_IfUserAndNodeHaveAccess_ShouldFetch(t *testing.T) { + expectedPolicyID := "fc56b7509c20ac8ce682b3b9b4fdaad868a9c70dda6ec16720298be64f16e9a4" + + test := testUtils.TestCase{ + KMS: testUtils.KMS{Activated: true}, + SupportedACPTypes: immutable.Some( + []testUtils.ACPType{ + testUtils.SourceHubACPType, + }, + ), + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.AddPolicy{ + Identity: testUtils.ClientIdentity(0), + Policy: policy, + ExpectedPolicyID: expectedPolicyID, + }, + testUtils.SchemaUpdate{ + Schema: fmt.Sprintf(` + type Users @policy( + id: "%s", + resource: "users" + ) { + name: String + age: Int + } + `, + expectedPolicyID, + ), + }, + testUtils.ConnectPeers{ + SourceNodeID: 1, + TargetNodeID: 0, + }, + testUtils.SubscribeToCollection{ + NodeID: 1, + CollectionIDs: []int{0}, + }, + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Identity: testUtils.ClientIdentity(0), + Doc: ` + { + "name": "Fred", + "age": 33 + } + `, + IsDocEncrypted: true, + }, + testUtils.AddDocActorRelationship{ + RequestorIdentity: testUtils.ClientIdentity(0), + TargetIdentity: testUtils.ClientIdentity(1), + DocID: 0, + Relation: "reader", + }, + testUtils.AddDocActorRelationship{ + RequestorIdentity: testUtils.ClientIdentity(0), + TargetIdentity: testUtils.NodeIdentity(1), + DocID: 0, + Relation: "reader", + }, + testUtils.WaitForSync{ + Decrypted: []int{0}, + }, + testUtils.Request{ + NodeID: immutable.Some(1), + Identity: testUtils.ClientIdentity(1), + Request: ` + query { + Users { + name + } + } + `, + Results: map[string]any{ + "Users": []map[string]any{ + {"name": "Fred"}, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} + +func TestDocEncryptionACP_IfUserHasAccessButNotNode_ShouldNotFetch(t *testing.T) { + expectedPolicyID := "fc56b7509c20ac8ce682b3b9b4fdaad868a9c70dda6ec16720298be64f16e9a4" + + test := testUtils.TestCase{ + KMS: testUtils.KMS{Activated: true}, + SupportedACPTypes: immutable.Some( + []testUtils.ACPType{ + testUtils.SourceHubACPType, + }, + ), + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.AddPolicy{ + Identity: testUtils.ClientIdentity(0), + Policy: policy, + ExpectedPolicyID: expectedPolicyID, + }, + testUtils.SchemaUpdate{ + Schema: fmt.Sprintf(` + type Users @policy( + id: "%s", + resource: "users" + ) { + name: String + age: Int + } + `, + expectedPolicyID, + ), + }, + testUtils.ConnectPeers{ + SourceNodeID: 1, + TargetNodeID: 0, + }, + testUtils.SubscribeToCollection{ + NodeID: 1, + CollectionIDs: []int{0}, + }, + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Identity: testUtils.ClientIdentity(0), + Doc: ` + { + "name": "Fred", + "age": 33 + } + `, + IsDocEncrypted: true, + }, + testUtils.AddDocActorRelationship{ + RequestorIdentity: testUtils.ClientIdentity(0), + TargetIdentity: testUtils.ClientIdentity(1), + DocID: 0, + Relation: "reader", + }, + testUtils.Wait{Duration: 100 * time.Millisecond}, + testUtils.Request{ + NodeID: immutable.Some(1), + Identity: testUtils.ClientIdentity(1), + Request: ` + query { + Users { + name + } + } + `, + Results: map[string]any{ + "Users": []map[string]any{}, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} + +func TestDocEncryptionACP_IfNodeHasAccessToSomeDocs_ShouldFetchOnlyThem(t *testing.T) { + expectedPolicyID := "fc56b7509c20ac8ce682b3b9b4fdaad868a9c70dda6ec16720298be64f16e9a4" + + test := testUtils.TestCase{ + KMS: testUtils.KMS{Activated: true}, + SupportedACPTypes: immutable.Some( + []testUtils.ACPType{ + testUtils.SourceHubACPType, + }, + ), + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.AddPolicy{ + Identity: testUtils.NodeIdentity(0), + Policy: policy, + ExpectedPolicyID: expectedPolicyID, + }, + testUtils.SchemaUpdate{ + Schema: fmt.Sprintf(` + type Users @policy( + id: "%s", + resource: "users" + ) { + name: String + age: Int + } + `, + expectedPolicyID, + ), + }, + testUtils.ConnectPeers{ + SourceNodeID: 1, + TargetNodeID: 0, + }, + testUtils.SubscribeToCollection{ + NodeID: 1, + CollectionIDs: []int{0}, + }, + // encrypted, private, shared + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Identity: testUtils.NodeIdentity(0), + Doc: ` + { + "name": "Fred", + "age": 33 + } + `, + IsDocEncrypted: true, + }, + testUtils.AddDocActorRelationship{ + RequestorIdentity: testUtils.NodeIdentity(0), + TargetIdentity: testUtils.NodeIdentity(1), + DocID: 0, + Relation: "reader", + }, + // encrypted, private, not shared + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Identity: testUtils.NodeIdentity(0), + Doc: ` + { + "name": "Andy", + "age": 33 + } + `, + IsDocEncrypted: true, + }, + // encrypted, public + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Doc: ` + { + "name": "Islam", + "age": 33 + } + `, + IsDocEncrypted: true, + }, + // not encrypted, private, shared + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Identity: testUtils.NodeIdentity(0), + Doc: ` + { + "name": "John", + "age": 33 + } + `, + }, + testUtils.AddDocActorRelationship{ + RequestorIdentity: testUtils.NodeIdentity(0), + TargetIdentity: testUtils.NodeIdentity(1), + DocID: 3, + Relation: "reader", + }, + // not encrypted, private, not shared + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Identity: testUtils.NodeIdentity(0), + Doc: ` + { + "name": "Keenan", + "age": 33 + } + `, + }, + // not encrypted, public + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Doc: ` + { + "name": "Shahzad", + "age": 33 + } + `, + }, + testUtils.WaitForSync{ + Decrypted: []int{0, 2}, + }, + testUtils.Request{ + NodeID: immutable.Some(1), + Identity: testUtils.NodeIdentity(1), + Request: ` + query { + Users { + name + } + } + `, + Results: map[string]any{ + "Users": []map[string]any{ + {"name": "John"}, + {"name": "Islam"}, + {"name": "Shahzad"}, + {"name": "Fred"}, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} diff --git a/tests/integration/events.go b/tests/integration/events.go index 6129d600ee..5d017429eb 100644 --- a/tests/integration/events.go +++ b/tests/integration/events.go @@ -74,7 +74,7 @@ func waitForReplicatorConfigureEvent(s *state, cfg ConfigureReplicator) { // all previous documents should be merged on the subscriber node for key, val := range s.nodeP2P[cfg.SourceNodeID].actualDocHeads { - s.nodeP2P[cfg.TargetNodeID].expectedDocHeads[key] = val + s.nodeP2P[cfg.TargetNodeID].expectedDocHeads[key] = val.cid } // update node connections and replicators @@ -196,41 +196,59 @@ func waitForUpdateEvents( // // Will fail the test if an event is not received within the expected time interval to prevent tests // from running forever. -func waitForMergeEvents(s *state) { +func waitForMergeEvents(s *state, action WaitForSync) { for nodeID := 0; nodeID < len(s.nodes); nodeID++ { expect := s.nodeP2P[nodeID].expectedDocHeads // remove any docs that are already merged // up to the expected document head for key, val := range s.nodeP2P[nodeID].actualDocHeads { - if head, ok := expect[key]; ok && head.String() == val.String() { + if head, ok := expect[key]; ok && head.String() == val.cid.String() { delete(expect, key) } } + expectDecrypted := make(map[string]struct{}, len(action.Decrypted)) + for _, docIndex := range action.Decrypted { + if len(s.docIDs[0]) <= docIndex { + require.Fail(s.t, "doc index %d out of range", docIndex) + } + docID := s.docIDs[0][docIndex].String() + actual, hasActual := s.nodeP2P[nodeID].actualDocHeads[docID] + if !hasActual || !actual.decrypted { + expectDecrypted[docID] = struct{}{} + } + } + // wait for all expected doc heads to be merged // // the order of merges does not matter as we only // expect the latest head to eventually be merged // // unexpected merge events are ignored - for len(expect) > 0 { - var evt event.Merge + for len(expect) > 0 || len(expectDecrypted) > 0 { + var evt event.MergeComplete select { case msg, ok := <-s.nodeEvents[nodeID].merge.Message(): if !ok { require.Fail(s.t, "subscription closed waiting for merge complete event") } - evt = msg.Data.(event.Merge) + evt = msg.Data.(event.MergeComplete) case <-time.After(30 * eventTimeout): require.Fail(s.t, "timeout waiting for merge complete event") } - head, ok := expect[evt.DocID] - if ok && head.String() == evt.Cid.String() { - delete(expect, evt.DocID) + _, ok := expectDecrypted[evt.Merge.DocID] + if ok && evt.Decrypted { + delete(expectDecrypted, evt.Merge.DocID) + } + + head, ok := expect[evt.Merge.DocID] + if ok && head.String() == evt.Merge.Cid.String() { + delete(expect, evt.Merge.DocID) } + s.nodeP2P[nodeID].actualDocHeads[evt.Merge.DocID] = docHeadState{cid: evt.Merge.Cid, decrypted: evt.Decrypted} } } } @@ -247,7 +265,8 @@ func updateNetworkState(s *state, nodeID int, evt event.Update) { } // update the actual document head on the node that updated it - s.nodeP2P[nodeID].actualDocHeads[evt.DocID] = evt.Cid + // as the node created the document, it is already decrypted + s.nodeP2P[nodeID].actualDocHeads[evt.DocID] = docHeadState{cid: evt.Cid, decrypted: true} // update the expected document heads of replicator targets for id := range s.nodeP2P[nodeID].replicators { @@ -309,8 +328,8 @@ func getEventsForCreateDoc(s *state, action CreateDoc) map[string]struct{} { return expect } -func waitForSync(s *state) { - waitForMergeEvents(s) +func waitForSync(s *state, action WaitForSync) { + waitForMergeEvents(s, action) } // getEventsForUpdateWithFilter returns a map of docIDs that should be diff --git a/tests/integration/p2p.go b/tests/integration/p2p.go index 7c5b20e69a..87e224dce4 100644 --- a/tests/integration/p2p.go +++ b/tests/integration/p2p.go @@ -133,7 +133,10 @@ type GetAllP2PCollections struct { // // For example you will likely wish to `WaitForSync` after creating a document in node 0 before querying // node 1 to see if it has been replicated. -type WaitForSync struct{} +type WaitForSync struct { + // Decrypted is a list of document indexes that are expected to be merged and synced decrypted. + Decrypted []int +} // connectPeers connects two existing, started, nodes as peers. It returns a channel // that will receive an empty struct upon sync completion of all expected peer-sync events. diff --git a/tests/integration/state.go b/tests/integration/state.go index b4a3777d03..ccd2870a8c 100644 --- a/tests/integration/state.go +++ b/tests/integration/state.go @@ -45,7 +45,7 @@ type p2pState struct { // actualDocHeads contains all document heads that exist on a node. // // The map key is the doc id. The map value is the doc head. - actualDocHeads map[string]cid.Cid + actualDocHeads map[string]docHeadState // expectedDocHeads contains all document heads that are expected to exist on a node. // @@ -53,13 +53,22 @@ type p2pState struct { expectedDocHeads map[string]cid.Cid } +// docHeadState contains the state of a document head. +// It is used to track if a document at a certain head has been decrypted. +type docHeadState struct { + // The actual document head. + cid cid.Cid + // Indicates if the document at the given head has been decrypted. + decrypted bool +} + // newP2PState returns a new empty p2p state. func newP2PState() *p2pState { return &p2pState{ connections: make(map[int]struct{}), replicators: make(map[int]struct{}), peerCollections: make(map[int]struct{}), - actualDocHeads: make(map[string]cid.Cid), + actualDocHeads: make(map[string]docHeadState), expectedDocHeads: make(map[string]cid.Cid), } } diff --git a/tests/integration/test_case.go b/tests/integration/test_case.go index e1c9b0b6f1..a1ab291257 100644 --- a/tests/integration/test_case.go +++ b/tests/integration/test_case.go @@ -12,6 +12,7 @@ package tests import ( "testing" + "time" "github.com/lens-vm/lens/host-go/config/model" "github.com/sourcenetwork/immutable" @@ -806,3 +807,9 @@ type GetNodeIdentity struct { // Default value is `NoIdentity()`. ExpectedIdentity immutable.Option[identityRef] } + +// Wait is an action that will wait for the given duration. +type Wait struct { + // Duration is the duration to wait. + Duration time.Duration +} diff --git a/tests/integration/utils.go b/tests/integration/utils.go index aff1ebecb7..c050adec39 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -400,7 +400,10 @@ func performAction( assertClientIntrospectionResults(s, action) case WaitForSync: - waitForSync(s) + waitForSync(s, action) + + case Wait: + <-time.After(action.Duration) case Benchmark: benchmarkAction(s, actionIndex, action) From 42fec71609b9e63d49ab233f279df090e30aa407 Mon Sep 17 00:00:00 2001 From: Islam Aleiv Date: Mon, 4 Nov 2024 09:26:45 +0100 Subject: [PATCH 02/14] Fix lint --- internal/db/db.go | 10 +++++----- internal/db/p2p_replicator.go | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/internal/db/db.go b/internal/db/db.go index 75aab2936c..c285884a13 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -16,6 +16,7 @@ package db import ( "context" + "errors" "sync" "sync/atomic" "time" @@ -30,7 +31,7 @@ import ( "github.com/sourcenetwork/defradb/acp/identity" "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/datastore" - "github.com/sourcenetwork/defradb/errors" + defraErrors "github.com/sourcenetwork/defradb/errors" "github.com/sourcenetwork/defradb/event" "github.com/sourcenetwork/defradb/internal/core" "github.com/sourcenetwork/defradb/internal/db/permission" @@ -275,12 +276,11 @@ func (db *db) publishDocUpdateEvent(ctx context.Context, docID string, collectio if err != nil { return err } - defer headsIterator.Close() for { hasValue, err := headsIterator.Next() if err != nil { - return err + return errors.Join(err, headsIterator.Close()) } if !hasValue { break @@ -294,7 +294,7 @@ func (db *db) publishDocUpdateEvent(ctx context.Context, docID string, collectio } db.events.Publish(event.NewMessage(event.UpdateName, updateEvent)) } - return nil + return headsIterator.Close() } func (db *db) DeleteDocActorRelationship( @@ -363,7 +363,7 @@ func (db *db) initialize(ctx context.Context) error { } exists, err := txn.Systemstore().Has(ctx, ds.NewKey("init")) - if err != nil && !errors.Is(err, ds.ErrNotFound) { + if err != nil && !defraErrors.Is(err, ds.ErrNotFound) { return err } // if we're loading an existing database, just load the schema diff --git a/internal/db/p2p_replicator.go b/internal/db/p2p_replicator.go index 6e8f1fb0ef..ca48125a32 100644 --- a/internal/db/p2p_replicator.go +++ b/internal/db/p2p_replicator.go @@ -639,7 +639,7 @@ func (db *db) retryReplicator(ctx context.Context, peerID string) { } } -func (db *db) retryDoc(ctx context.Context, docID string) error { +func (db *db) retryDoc(ctx context.Context, docID string) (resErr error) { ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return err @@ -650,7 +650,7 @@ func (db *db) retryDoc(ctx context.Context, docID string) error { if err != nil { return err } - defer headsIterator.Close() + defer func() { resErr = headsIterator.Close() }() for { select { From a8edea0418fb9e384627b7297e7b7536d8f4d07d Mon Sep 17 00:00:00 2001 From: Islam Aleiv Date: Tue, 5 Nov 2024 12:52:40 +0100 Subject: [PATCH 03/14] Fix iterator error handling --- internal/db/p2p_replicator.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/db/p2p_replicator.go b/internal/db/p2p_replicator.go index ca48125a32..5b7754fff4 100644 --- a/internal/db/p2p_replicator.go +++ b/internal/db/p2p_replicator.go @@ -13,6 +13,7 @@ package db import ( "context" "encoding/json" + "errors" "time" "github.com/fxamacker/cbor/v2" @@ -23,7 +24,7 @@ import ( "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/datastore" - "github.com/sourcenetwork/defradb/errors" + dbErrors "github.com/sourcenetwork/defradb/errors" "github.com/sourcenetwork/defradb/event" "github.com/sourcenetwork/defradb/internal/core" "github.com/sourcenetwork/defradb/internal/keys" @@ -162,7 +163,7 @@ func (db *db) getDocsHeads( log.ErrorContextE( ctx, "Failed to get all docIDs", - NewErrReplicatorDocID(err, errors.NewKV("Collection", col.Name().Value())), + NewErrReplicatorDocID(err, dbErrors.NewKV("Collection", col.Name().Value())), ) continue } @@ -650,7 +651,7 @@ func (db *db) retryDoc(ctx context.Context, docID string) (resErr error) { if err != nil { return err } - defer func() { resErr = headsIterator.Close() }() + defer func() { resErr = errors.Join(resErr, headsIterator.Close()) }() for { select { From ac77afd9105790b44e0d87781767c31e137209c7 Mon Sep 17 00:00:00 2001 From: Islam Aleiv Date: Tue, 5 Nov 2024 13:14:05 +0100 Subject: [PATCH 04/14] Use client.DB instead of db --- internal/db/collection_retriever.go | 25 ++++++++++++++++--------- internal/db/errors.go | 6 ++++++ 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/internal/db/collection_retriever.go b/internal/db/collection_retriever.go index 4549aa43cd..7e228b3cf7 100644 --- a/internal/db/collection_retriever.go +++ b/internal/db/collection_retriever.go @@ -15,21 +15,18 @@ import ( "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/internal/db/description" + "github.com/sourcenetwork/immutable" ) // CollectionRetriever is a helper struct that retrieves a collection from a document ID. type CollectionRetriever struct { - db *db + db client.DB } // NewCollectionRetriever creates a new CollectionRetriever. -func NewCollectionRetriever(database client.DB) *CollectionRetriever { - internalDB, ok := database.(*db) - if !ok { - return nil - } +func NewCollectionRetriever(db client.DB) *CollectionRetriever { return &CollectionRetriever{ - db: internalDB, + db: db, } } @@ -63,10 +60,20 @@ func (r *CollectionRetriever) RetrieveCollectionFromDocID( return nil, err } - col, err := getCollectionFromRootSchema(ctx, r.db, schema.Root) + cols, err := r.db.GetCollections( + ctx, + client.CollectionFetchOptions{ + SchemaRoot: immutable.Some(schema.Root), + }, + ) + if err != nil { return nil, err } - return col, nil + if len(cols) == 0 { + return nil, NewErrCollectionWithSchemaRootNotFound(schema.Root) + } + + return cols[0], nil } diff --git a/internal/db/errors.go b/internal/db/errors.go index 920e587e50..1bc200f2b4 100644 --- a/internal/db/errors.go +++ b/internal/db/errors.go @@ -107,6 +107,7 @@ const ( errMaterializedViewAndACPNotSupported string = "materialized views do not support ACP" errInvalidDefaultFieldValue string = "default field value is invalid" errDocIDNotFound string = "docID not found" + errCollectionWithSchemaRootNotFound string = "collection with schema root not found" ) var ( @@ -154,6 +155,7 @@ var ( ErrFailedToRetryDoc = errors.New("failed to retry doc") ErrTimeoutDocRetry = errors.New("timeout while retrying doc") ErrDocIDNotFound = errors.New(errDocIDNotFound) + ErrorCollectionWithSchemaRootNotFound = errors.New(errCollectionWithSchemaRootNotFound) ) // NewErrFailedToGetHeads returns a new error indicating that the heads of a document @@ -696,3 +698,7 @@ func NewErrDefaultFieldValueInvalid(collection string, inner error) error { func NewErrDocIDNotFound(docID string) error { return errors.New(errDocIDNotFound, errors.NewKV("DocID", docID)) } + +func NewErrCollectionWithSchemaRootNotFound(schemaRoot string) error { + return errors.New(errCollectionWithSchemaRootNotFound, errors.NewKV("SchemaRoot", schemaRoot)) +} From 711c1af3b83d48bd6cd9094443e80f83d84f50f9 Mon Sep 17 00:00:00 2001 From: Islam Aleiv Date: Tue, 5 Nov 2024 14:08:28 +0100 Subject: [PATCH 05/14] PR fixup --- internal/db/collection_retriever.go | 2 +- internal/db/db.go | 60 +++++++++++++++-------------- internal/db/iterator.go | 1 + internal/kms/pubsub.go | 30 +++++---------- 4 files changed, 43 insertions(+), 50 deletions(-) diff --git a/internal/db/collection_retriever.go b/internal/db/collection_retriever.go index 7e228b3cf7..c59c3945cf 100644 --- a/internal/db/collection_retriever.go +++ b/internal/db/collection_retriever.go @@ -75,5 +75,5 @@ func (r *CollectionRetriever) RetrieveCollectionFromDocID( return nil, NewErrCollectionWithSchemaRootNotFound(schema.Root) } - return cols[0], nil + return cols[0], headIterator.Close() } diff --git a/internal/db/db.go b/internal/db/db.go index c285884a13..c853baa02f 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -228,6 +228,32 @@ func (db *db) AddPolicy( return client.AddPolicyResult{PolicyID: policyID}, nil } +func (db *db) publishDocUpdateEvent(ctx context.Context, docID string, collection client.Collection) error { + headsIterator, err := NewHeadBlocksIterator(ctx, db.multistore.Headstore(), db.Blockstore(), docID) + if err != nil { + return err + } + + for { + hasValue, err := headsIterator.Next() + if err != nil { + return errors.Join(err, headsIterator.Close()) + } + if !hasValue { + break + } + + updateEvent := event.Update{ + DocID: docID, + Cid: headsIterator.CurrentCid(), + SchemaRoot: collection.Schema().Root, + Block: headsIterator.CurrentRawBlock(), + } + db.events.Publish(event.NewMessage(event.UpdateName, updateEvent)) + } + return headsIterator.Close() +} + func (db *db) AddDocActorRelationship( ctx context.Context, collectionName string, @@ -263,38 +289,14 @@ func (db *db) AddDocActorRelationship( return client.AddDocActorRelationshipResult{}, err } - err = db.publishDocUpdateEvent(ctx, docID, collection) - if err != nil { - return client.AddDocActorRelationshipResult{}, err - } - - return client.AddDocActorRelationshipResult{ExistedAlready: exists}, nil -} - -func (db *db) publishDocUpdateEvent(ctx context.Context, docID string, collection client.Collection) error { - headsIterator, err := NewHeadBlocksIterator(ctx, db.multistore.Headstore(), db.Blockstore(), docID) - if err != nil { - return err - } - - for { - hasValue, err := headsIterator.Next() + if !exists { + err = db.publishDocUpdateEvent(ctx, docID, collection) if err != nil { - return errors.Join(err, headsIterator.Close()) + return client.AddDocActorRelationshipResult{}, err } - if !hasValue { - break - } - - updateEvent := event.Update{ - DocID: docID, - Cid: headsIterator.CurrentCid(), - SchemaRoot: collection.Schema().Root, - Block: headsIterator.CurrentRawBlock(), - } - db.events.Publish(event.NewMessage(event.UpdateName, updateEvent)) } - return headsIterator.Close() + + return client.AddDocActorRelationshipResult{ExistedAlready: exists}, nil } func (db *db) DeleteDocActorRelationship( diff --git a/internal/db/iterator.go b/internal/db/iterator.go index cfa79e71eb..38ead11dd3 100644 --- a/internal/db/iterator.go +++ b/internal/db/iterator.go @@ -37,6 +37,7 @@ type DocHeadBlocksIterator struct { var _ io.Closer = (*DocHeadBlocksIterator)(nil) func (h *DocHeadBlocksIterator) Close() error { + h.cids = nil return nil } diff --git a/internal/kms/pubsub.go b/internal/kms/pubsub.go index 7da101346f..13cef16269 100644 --- a/internal/kms/pubsub.go +++ b/internal/kms/pubsub.go @@ -24,12 +24,14 @@ import ( grpcpeer "google.golang.org/grpc/peer" "github.com/sourcenetwork/defradb/acp" + "github.com/sourcenetwork/defradb/acp/identity" "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/crypto" "github.com/sourcenetwork/defradb/datastore" "github.com/sourcenetwork/defradb/errors" "github.com/sourcenetwork/defradb/event" coreblock "github.com/sourcenetwork/defradb/internal/core/block" + "github.com/sourcenetwork/defradb/internal/db/permission" "github.com/sourcenetwork/defradb/internal/encryption" ) @@ -372,26 +374,14 @@ func (s *pubSubService) doesIdentityHaveDocPermission( return false, err } - policy := collection.Definition().Description.Policy - if !policy.HasValue() || policy.Value().ID == "" || policy.Value().ResourceName == "" { - return true, nil - } - - policyID, resourceName := policy.Value().ID, policy.Value().ResourceName - - isRegistered, err := s.acp.Value().IsDocRegistered(ctx, policyID, resourceName, docID) - if err != nil { - return false, err - } - - if !isRegistered { - // Unrestricted access as it is a public document. - return true, nil - } - - hasPerm, err := s.acp.Value().CheckDocAccess(ctx, acp.ReadPermission, actorIdentity, policyID, resourceName, docID) - - return hasPerm, err + return permission.CheckAccessOfDocOnCollectionWithACP( + ctx, + immutable.Some(identity.Identity{DID: actorIdentity}), + s.acp.Value(), + collection, + acp.ReadPermission, + docID, + ) } func encodeToBase64(data []byte) []byte { From 784217e034bb5e3eb38179883f98ccd7bb4912a4 Mon Sep 17 00:00:00 2001 From: Islam Aleiv Date: Tue, 5 Nov 2024 14:11:47 +0100 Subject: [PATCH 06/14] Don't expect Update events on no-op --- tests/integration/acp.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/integration/acp.go b/tests/integration/acp.go index 92f3250ad9..74c36ff1b3 100644 --- a/tests/integration/acp.go +++ b/tests/integration/acp.go @@ -74,6 +74,7 @@ func init() { if acpType == "" { acpType = LocalACPType } + acpType = SourceHubACPType } // AddPolicy will attempt to add the given policy using DefraDB's ACP system. @@ -214,7 +215,7 @@ func addDocActorRelationshipACP( } } - if action.ExpectedError == "" { + if action.ExpectedError == "" && !action.ExpectedExistence { waitForUpdateEvents(s, actionNodeID, map[string]struct{}{docID: {}}) } } @@ -467,7 +468,7 @@ func setupSourceHub(s *state) ([]node.ACPOpt, error) { // // We need to lock before getting the ports, otherwise they may try and use the port we use for locking. // We can only unlock after the source hub node has started and begun listening on the assigned ports. - unlock, err := crossLock(55555) + unlock, err := crossLock(55559) if err != nil { return nil, err } From b60370dd1011724663d8102bcc0e5f7963ab0955 Mon Sep 17 00:00:00 2001 From: Islam Aleiv Date: Tue, 5 Nov 2024 14:27:59 +0100 Subject: [PATCH 07/14] Format --- internal/db/collection_retriever.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/db/collection_retriever.go b/internal/db/collection_retriever.go index c59c3945cf..a624cc739c 100644 --- a/internal/db/collection_retriever.go +++ b/internal/db/collection_retriever.go @@ -13,9 +13,10 @@ package db import ( "context" + "github.com/sourcenetwork/immutable" + "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/internal/db/description" - "github.com/sourcenetwork/immutable" ) // CollectionRetriever is a helper struct that retrieves a collection from a document ID. From a47508980dc0160a9d57736630d58f0afb57f899 Mon Sep 17 00:00:00 2001 From: Islam Aleiv Date: Tue, 5 Nov 2024 15:35:42 +0100 Subject: [PATCH 08/14] Revert test values --- tests/integration/acp.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/integration/acp.go b/tests/integration/acp.go index 74c36ff1b3..b98be7a059 100644 --- a/tests/integration/acp.go +++ b/tests/integration/acp.go @@ -74,7 +74,6 @@ func init() { if acpType == "" { acpType = LocalACPType } - acpType = SourceHubACPType } // AddPolicy will attempt to add the given policy using DefraDB's ACP system. @@ -468,7 +467,7 @@ func setupSourceHub(s *state) ([]node.ACPOpt, error) { // // We need to lock before getting the ports, otherwise they may try and use the port we use for locking. // We can only unlock after the source hub node has started and begun listening on the assigned ports. - unlock, err := crossLock(55559) + unlock, err := crossLock(55555) if err != nil { return nil, err } From 5dad4b460597bcbbccc27c96e18cabc2212a16ea Mon Sep 17 00:00:00 2001 From: Islam Aleiv Date: Tue, 5 Nov 2024 16:15:26 +0100 Subject: [PATCH 09/14] Add another test, track closed nodes --- tests/integration/encryption/peer_acp_test.go | 93 +++++++++++++++++++ tests/integration/events.go | 14 ++- tests/integration/state.go | 4 + tests/integration/utils.go | 7 +- 4 files changed, 113 insertions(+), 5 deletions(-) diff --git a/tests/integration/encryption/peer_acp_test.go b/tests/integration/encryption/peer_acp_test.go index 728ba39953..bb6705c626 100644 --- a/tests/integration/encryption/peer_acp_test.go +++ b/tests/integration/encryption/peer_acp_test.go @@ -373,3 +373,96 @@ func TestDocEncryptionACP_IfNodeHasAccessToSomeDocs_ShouldFetchOnlyThem(t *testi testUtils.ExecuteTestCase(t, test) } + +func TestDocEncryptionACP_IfClientNodeHasDocPermissionButServerNodeIsNotAvailable_ShouldNotFetch(t *testing.T) { + expectedPolicyID := "fc56b7509c20ac8ce682b3b9b4fdaad868a9c70dda6ec16720298be64f16e9a4" + + test := testUtils.TestCase{ + KMS: testUtils.KMS{Activated: true}, + SupportedACPTypes: immutable.Some( + []testUtils.ACPType{ + testUtils.SourceHubACPType, + }, + ), + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.AddPolicy{ + Identity: testUtils.NodeIdentity(0), + Policy: policy, + ExpectedPolicyID: expectedPolicyID, + }, + testUtils.SchemaUpdate{ + Schema: fmt.Sprintf(` + type Users @policy( + id: "%s", + resource: "users" + ) { + name: String + age: Int + } + `, + expectedPolicyID, + ), + }, + testUtils.ConnectPeers{ + SourceNodeID: 1, + TargetNodeID: 0, + }, + testUtils.SubscribeToCollection{ + NodeID: 1, + CollectionIDs: []int{0}, + }, + testUtils.ConnectPeers{ + SourceNodeID: 2, + TargetNodeID: 0, + }, + testUtils.SubscribeToCollection{ + NodeID: 2, + CollectionIDs: []int{0}, + }, + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Identity: testUtils.NodeIdentity(0), + Doc: ` + { + "name": "Fred", + "age": 33 + } + `, + IsDocEncrypted: true, + }, + testUtils.WaitForSync{}, + testUtils.Close{ + NodeID: immutable.Some(0), + }, + testUtils.AddDocActorRelationship{ + NodeID: immutable.Some(1), + RequestorIdentity: testUtils.NodeIdentity(0), + TargetIdentity: testUtils.NodeIdentity(1), + DocID: 0, + Relation: "reader", + }, + testUtils.Wait{ + Duration: 100 * time.Millisecond, + }, + testUtils.Request{ + NodeID: immutable.Some(1), + Identity: testUtils.NodeIdentity(1), + Request: ` + query { + Users { + name + } + } + `, + Results: map[string]any{ + "Users": []map[string]any{}, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} diff --git a/tests/integration/events.go b/tests/integration/events.go index 5d017429eb..1fbc64416e 100644 --- a/tests/integration/events.go +++ b/tests/integration/events.go @@ -160,6 +160,10 @@ func waitForUpdateEvents( continue // node is not selected } + if _, ok := s.closedNodes[i]; ok { + continue // node is closed + } + expect := make(map[string]struct{}, len(docIDs)) for k := range docIDs { expect[k] = struct{}{} @@ -170,17 +174,17 @@ func waitForUpdateEvents( select { case msg, ok := <-s.nodeEvents[i].update.Message(): if !ok { - require.Fail(s.t, "subscription closed waiting for update event") + require.Fail(s.t, "subscription closed waiting for update event", "Node %d", i) } evt = msg.Data.(event.Update) case <-time.After(eventTimeout): - require.Fail(s.t, "timeout waiting for update event") + require.Fail(s.t, "timeout waiting for update event", "Node %d", i) } // make sure the event is expected _, ok := expect[evt.DocID] - require.True(s.t, ok, "unexpected document update") + require.True(s.t, ok, "unexpected document update", "Node %d", i) delete(expect, evt.DocID) // we only need to update the network state if the nodes @@ -198,6 +202,10 @@ func waitForUpdateEvents( // from running forever. func waitForMergeEvents(s *state, action WaitForSync) { for nodeID := 0; nodeID < len(s.nodes); nodeID++ { + if _, ok := s.closedNodes[nodeID]; ok { + continue // node is closed + } + expect := s.nodeP2P[nodeID].expectedDocHeads // remove any docs that are already merged diff --git a/tests/integration/state.go b/tests/integration/state.go index ccd2870a8c..e7130f2ebd 100644 --- a/tests/integration/state.go +++ b/tests/integration/state.go @@ -165,6 +165,9 @@ type state struct { // The nodes active in this test. nodes []clients.Client + // closedNodes contains the indexes of nodes that have been closed. + closedNodes map[int]struct{} + // nodeP2P contains p2p states for all nodes nodeP2P []*p2pState @@ -232,6 +235,7 @@ func newState( nodeConfigs: [][]net.NodeOpt{}, nodeP2P: []*p2pState{}, nodes: []clients.Client{}, + closedNodes: map[int]struct{}{}, dbPaths: []string{}, collections: [][]client.Collection{}, collectionNames: collectionNames, diff --git a/tests/integration/utils.go b/tests/integration/utils.go index c050adec39..f827ac0130 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -577,9 +577,10 @@ func closeNodes( s *state, action Close, ) { - _, nodes := getNodesWithIDs(action.NodeID, s.nodes) - for _, node := range nodes { + nodeIDs, nodes := getNodesWithIDs(action.NodeID, s.nodes) + for i, node := range nodes { node.Close() + s.closedNodes[nodeIDs[i]] = struct{}{} } } @@ -784,6 +785,8 @@ func startNodes(s *state, action Start) { require.NoError(s.t, err) s.nodeEvents[nodeIndex] = eventState + delete(s.closedNodes, nodeIndex) + waitForNetworkSetupEvents(s, i) } From e52c61b8c49e1deb36a2ab011bfa2fbeab8d5d3a Mon Sep 17 00:00:00 2001 From: Islam Aleiv Date: Tue, 5 Nov 2024 16:52:51 +0100 Subject: [PATCH 10/14] Add Join to defra errors --- errors/errors.go | 29 +++++++++++++++++++++++++++++ internal/db/db.go | 5 ++--- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/errors/errors.go b/errors/errors.go index 45c1202e77..dd5bf706c4 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -63,10 +63,39 @@ func Wrap(message string, inner error, keyvals ...KV) error { return err } +// Is reports whether any error in err's tree matches target. +// +// The tree consists of err itself, followed by the errors obtained by repeatedly +// calling its Unwrap() error or Unwrap() []error method. When err wraps multiple +// errors, Is examines err followed by a depth-first traversal of its children. +// +// An error is considered to match a target if it is equal to that target or if +// it implements a method Is(error) bool such that Is(target) returns true. +// +// An error type might provide an Is method so it can be treated as equivalent +// to an existing error. For example, if MyError defines +// +// func (m MyError) Is(target error) bool { return target == fs.ErrExist } +// +// then Is(MyError{}, fs.ErrExist) returns true. See [syscall.Errno.Is] for +// an example in the standard library. An Is method should only shallowly +// compare err and the target and not call [Unwrap] on either. func Is(err, target error) bool { return errors.Is(err, target) } +// Join returns an error that wraps the given errors. +// Any nil error values are discarded. +// Join returns nil if every value in errs is nil. +// The error formats as the concatenation of the strings obtained +// by calling the Error method of each element of errs, with a newline +// between each string. +// +// A non-nil error returned by Join implements the Unwrap() []error method. +func Join(errs ...error) error { + return errors.Join(errs...) +} + // This function will not be inlined by the compiler as it will spoil any stacktrace // generated. // diff --git a/internal/db/db.go b/internal/db/db.go index c853baa02f..e717610725 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -16,7 +16,6 @@ package db import ( "context" - "errors" "sync" "sync/atomic" "time" @@ -31,7 +30,7 @@ import ( "github.com/sourcenetwork/defradb/acp/identity" "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/datastore" - defraErrors "github.com/sourcenetwork/defradb/errors" + "github.com/sourcenetwork/defradb/errors" "github.com/sourcenetwork/defradb/event" "github.com/sourcenetwork/defradb/internal/core" "github.com/sourcenetwork/defradb/internal/db/permission" @@ -365,7 +364,7 @@ func (db *db) initialize(ctx context.Context) error { } exists, err := txn.Systemstore().Has(ctx, ds.NewKey("init")) - if err != nil && !defraErrors.Is(err, ds.ErrNotFound) { + if err != nil && !errors.Is(err, ds.ErrNotFound) { return err } // if we're loading an existing database, just load the schema From f02c09dd051ec5cf0ad3b126ddfe947ad1cd4397 Mon Sep 17 00:00:00 2001 From: Islam Aleiv Date: Tue, 5 Nov 2024 17:55:18 +0100 Subject: [PATCH 11/14] Make collection retriever private --- internal/db/collection_retriever.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/db/collection_retriever.go b/internal/db/collection_retriever.go index a624cc739c..17a9ed743e 100644 --- a/internal/db/collection_retriever.go +++ b/internal/db/collection_retriever.go @@ -19,20 +19,20 @@ import ( "github.com/sourcenetwork/defradb/internal/db/description" ) -// CollectionRetriever is a helper struct that retrieves a collection from a document ID. -type CollectionRetriever struct { +// collectionRetriever is a helper struct that retrieves a collection from a document ID. +type collectionRetriever struct { db client.DB } // NewCollectionRetriever creates a new CollectionRetriever. -func NewCollectionRetriever(db client.DB) *CollectionRetriever { - return &CollectionRetriever{ +func NewCollectionRetriever(db client.DB) *collectionRetriever { + return &collectionRetriever{ db: db, } } // RetrieveCollectionFromDocID retrieves a collection from a document ID. -func (r *CollectionRetriever) RetrieveCollectionFromDocID( +func (r *collectionRetriever) RetrieveCollectionFromDocID( ctx context.Context, docID string, ) (client.Collection, error) { From 5c038686af931d24ffccb1d90eedf28ca1ecf441 Mon Sep 17 00:00:00 2001 From: Islam Aleiv Date: Tue, 5 Nov 2024 18:03:16 +0100 Subject: [PATCH 12/14] Remove Close method of head iterator --- internal/db/collection_retriever.go | 2 +- internal/db/db.go | 4 ++-- internal/db/iterator.go | 8 -------- internal/db/p2p_replicator.go | 4 +--- 4 files changed, 4 insertions(+), 14 deletions(-) diff --git a/internal/db/collection_retriever.go b/internal/db/collection_retriever.go index 17a9ed743e..1099e1abfc 100644 --- a/internal/db/collection_retriever.go +++ b/internal/db/collection_retriever.go @@ -76,5 +76,5 @@ func (r *collectionRetriever) RetrieveCollectionFromDocID( return nil, NewErrCollectionWithSchemaRootNotFound(schema.Root) } - return cols[0], headIterator.Close() + return cols[0], nil } diff --git a/internal/db/db.go b/internal/db/db.go index e717610725..3aee6de016 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -236,7 +236,7 @@ func (db *db) publishDocUpdateEvent(ctx context.Context, docID string, collectio for { hasValue, err := headsIterator.Next() if err != nil { - return errors.Join(err, headsIterator.Close()) + return err } if !hasValue { break @@ -250,7 +250,7 @@ func (db *db) publishDocUpdateEvent(ctx context.Context, docID string, collectio } db.events.Publish(event.NewMessage(event.UpdateName, updateEvent)) } - return headsIterator.Close() + return nil } func (db *db) AddDocActorRelationship( diff --git a/internal/db/iterator.go b/internal/db/iterator.go index 38ead11dd3..00519d1915 100644 --- a/internal/db/iterator.go +++ b/internal/db/iterator.go @@ -12,7 +12,6 @@ package db import ( "context" - "io" "github.com/ipfs/go-cid" @@ -34,13 +33,6 @@ type DocHeadBlocksIterator struct { currentRawBlock []byte } -var _ io.Closer = (*DocHeadBlocksIterator)(nil) - -func (h *DocHeadBlocksIterator) Close() error { - h.cids = nil - return nil -} - // NewHeadBlocksIterator creates a new DocHeadBlocksIterator. func NewHeadBlocksIterator( ctx context.Context, diff --git a/internal/db/p2p_replicator.go b/internal/db/p2p_replicator.go index 5b7754fff4..a6d28f261a 100644 --- a/internal/db/p2p_replicator.go +++ b/internal/db/p2p_replicator.go @@ -13,7 +13,6 @@ package db import ( "context" "encoding/json" - "errors" "time" "github.com/fxamacker/cbor/v2" @@ -640,7 +639,7 @@ func (db *db) retryReplicator(ctx context.Context, peerID string) { } } -func (db *db) retryDoc(ctx context.Context, docID string) (resErr error) { +func (db *db) retryDoc(ctx context.Context, docID string) error { ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return err @@ -651,7 +650,6 @@ func (db *db) retryDoc(ctx context.Context, docID string) (resErr error) { if err != nil { return err } - defer func() { resErr = errors.Join(resErr, headsIterator.Close()) }() for { select { From 1bab12843f494d772efbc29b5ca841f0a5aeab5d Mon Sep 17 00:00:00 2001 From: Islam Aleiv Date: Thu, 7 Nov 2024 18:22:45 +0100 Subject: [PATCH 13/14] Make collectionRetriever work as value --- internal/db/collection_retriever.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/db/collection_retriever.go b/internal/db/collection_retriever.go index 1099e1abfc..6fc134c722 100644 --- a/internal/db/collection_retriever.go +++ b/internal/db/collection_retriever.go @@ -25,14 +25,14 @@ type collectionRetriever struct { } // NewCollectionRetriever creates a new CollectionRetriever. -func NewCollectionRetriever(db client.DB) *collectionRetriever { - return &collectionRetriever{ +func NewCollectionRetriever(db client.DB) collectionRetriever { + return collectionRetriever{ db: db, } } // RetrieveCollectionFromDocID retrieves a collection from a document ID. -func (r *collectionRetriever) RetrieveCollectionFromDocID( +func (r collectionRetriever) RetrieveCollectionFromDocID( ctx context.Context, docID string, ) (client.Collection, error) { From fd326a2b0cf9b32a7225e7396aa680326cb2b4f9 Mon Sep 17 00:00:00 2001 From: Islam Aleiv Date: Thu, 7 Nov 2024 20:19:20 +0100 Subject: [PATCH 14/14] Add documentation --- internal/db/db.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/db/db.go b/internal/db/db.go index 3aee6de016..630bd0ae43 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -227,6 +227,9 @@ func (db *db) AddPolicy( return client.AddPolicyResult{PolicyID: policyID}, nil } +// publishDocUpdateEvent publishes an update event for a document. +// It uses heads iterator to read the document's head blocks directly from the storage, i.e. without +// using a transaction. func (db *db) publishDocUpdateEvent(ctx context.Context, docID string, collection client.Collection) error { headsIterator, err := NewHeadBlocksIterator(ctx, db.multistore.Headstore(), db.Blockstore(), docID) if err != nil {