Skip to content

Commit

Permalink
feat: Handle P2P with SourceHub ACP (sourcenetwork#2848)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves sourcenetwork#2366

## Description

Handles P2P with SourceHub ACP.

Local ACP remains blocked off, as documents synced would essentially
become public unless encrypted.
  • Loading branch information
AndrewSisley authored Jul 19, 2024
1 parent 517333c commit 25a3063
Show file tree
Hide file tree
Showing 12 changed files with 277 additions and 118 deletions.
5 changes: 3 additions & 2 deletions acp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -458,9 +458,10 @@ If authentication fails for any reason a `403` forbidden response will be return
## _FAC Usage: (coming soon)_

## Warning / Caveats
- If using Local ACP, P2P will only work with collections that do not have a policy assigned. If you wish to use ACP
on collections connected to a multi-node network, please use SourceHub ACP.

The following features currently don't work with ACP, they are being actively worked on.
- [P2P: Adding a replicator with permissioned collection](https://github.com/sourcenetwork/defradb/issues/2366)
- [P2P: Subscription to a permissioned collection](https://github.com/sourcenetwork/defradb/issues/2366)
- [Adding Secondary Indexes](https://github.com/sourcenetwork/defradb/issues/2365)
- [Backing/Restoring Private Documents](https://github.com/sourcenetwork/defradb/issues/2430)

Expand Down
3 changes: 3 additions & 0 deletions acp/acp.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,7 @@ type ACP interface {
resourceName string,
docID string,
) (bool, error)

// SupportsP2P returns true if the implementation supports ACP across a peer network.
SupportsP2P() bool
}
5 changes: 5 additions & 0 deletions acp/source_hub_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,11 @@ func (a *sourceHubBridge) CheckDocAccess(
}
}

func (a *sourceHubBridge) SupportsP2P() bool {
_, ok := a.client.(*acpSourceHub)
return ok
}

func (a *sourceHubBridge) Close() error {
return a.client.Close()
}
1 change: 0 additions & 1 deletion internal/db/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ var (
ErrP2PColHasPolicy = errors.New("p2p collection specified has a policy on it")
ErrNoTransactionInContext = errors.New(errNoTransactionInContext)
ErrReplicatorColHasPolicy = errors.New("replicator collection specified has a policy on it")
ErrReplicatorSomeColsHavePolicy = errors.New("replicator can not use all collections, as some have policy")
ErrSelfTargetForReplicator = errors.New("can't target ourselves as a replicator")
ErrReplicatorCollections = errors.New(errReplicatorCollections)
ErrReplicatorNotFound = errors.New(errReplicatorNotFound)
Expand Down
19 changes: 5 additions & 14 deletions internal/db/p2p_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ func (db *db) SetReplicator(ctx context.Context, rep client.Replicator) error {
return ErrSelfTargetForReplicator
}

// TODO-ACP: Support ACP <> P2P - https://github.com/sourcenetwork/defradb/issues/2366
// ctx = db.SetContextIdentity(ctx, identity)
ctx = SetContextTxn(ctx, txn)

storedRep := client.Replicator{}
Expand Down Expand Up @@ -82,29 +80,22 @@ func (db *db) SetReplicator(ctx context.Context, rep client.Replicator) error {
return NewErrReplicatorCollections(err)
}

if col.Description().Policy.HasValue() {
return ErrReplicatorColHasPolicy
}

collections = append(collections, col)
}

default:
// default to all collections (unless a collection contains a policy).
// TODO-ACP: default to all collections after resolving https://github.com/sourcenetwork/defradb/issues/2366
allCollections, err := db.GetCollections(ctx, client.CollectionFetchOptions{})
collections, err = db.GetCollections(ctx, client.CollectionFetchOptions{})
if err != nil {
return NewErrReplicatorCollections(err)
}
}

for _, col := range allCollections {
// Can not default to all collections if any collection has a policy.
// TODO-ACP: remove this check/loop after https://github.com/sourcenetwork/defradb/issues/2366
if db.acp.HasValue() && !db.acp.Value().SupportsP2P() {
for _, col := range collections {
if col.Description().Policy.HasValue() {
return ErrReplicatorSomeColsHavePolicy
return ErrReplicatorColHasPolicy
}
}
collections = allCollections
}

addedCols := []client.Collection{}
Expand Down
15 changes: 5 additions & 10 deletions internal/db/p2p_schema_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ func (db *db) AddP2PCollections(ctx context.Context, collectionIDs []string) err
}
defer txn.Discard(ctx)

// TODO-ACP: Support ACP <> P2P - https://github.com/sourcenetwork/defradb/issues/2366
// ctx = db.SetContextIdentity(ctx, identity)
ctx = SetContextTxn(ctx, txn)

// first let's make sure the collections actually exists
Expand All @@ -53,11 +51,11 @@ func (db *db) AddP2PCollections(ctx context.Context, collectionIDs []string) err
storeCollections = append(storeCollections, storeCol...)
}

// Ensure none of the collections have a policy on them, until following is implemented:
// TODO-ACP: ACP <> P2P https://github.com/sourcenetwork/defradb/issues/2366
for _, col := range storeCollections {
if col.Description().Policy.HasValue() {
return ErrP2PColHasPolicy
if db.acp.HasValue() && !db.acp.Value().SupportsP2P() {
for _, col := range storeCollections {
if col.Description().Policy.HasValue() {
return ErrP2PColHasPolicy
}
}
}

Expand Down Expand Up @@ -98,8 +96,6 @@ func (db *db) RemoveP2PCollections(ctx context.Context, collectionIDs []string)
}
defer txn.Discard(ctx)

// TODO-ACP: Support ACP <> P2P - https://github.com/sourcenetwork/defradb/issues/2366
// ctx = db.SetContextIdentity(ctx, identity)
ctx = SetContextTxn(ctx, txn)

// first let's make sure the collections actually exists
Expand Down Expand Up @@ -211,7 +207,6 @@ func (db *db) loadAndPublishP2PCollections(ctx context.Context) error {
if _, ok := colMap[col.SchemaRoot()]; ok {
continue
}
// TODO-ACP: Support ACP <> P2P - https://github.com/sourcenetwork/defradb/issues/2366
docIDChan, err := col.GetAllDocIDs(ctx)
if err != nil {
return err
Expand Down
65 changes: 0 additions & 65 deletions internal/db/p2p_schema_root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,11 @@ package db

import (
"context"
"encoding/hex"
"fmt"
"testing"
"time"

"github.com/decred/dcrd/dcrec/secp256k1/v4"
"github.com/sourcenetwork/immutable"
"github.com/stretchr/testify/require"

"github.com/sourcenetwork/defradb/acp"
acpIdentity "github.com/sourcenetwork/defradb/acp/identity"
"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/datastore/memory"
"github.com/sourcenetwork/defradb/event"
)

Expand Down Expand Up @@ -248,60 +240,3 @@ func TestGetAllP2PCollections_WithMultipleValidCollections_ShouldSucceed(t *test
require.NoError(t, err)
require.Equal(t, []string{schema2.Root, schema1.Root}, cols)
}

// This test documents that we don't allow adding p2p collections that have a policy
// until the following is implemented:
// TODO-ACP: ACP <> P2P https://github.com/sourcenetwork/defradb/issues/2366
func TestAddP2PCollectionsWithPermissionedCollection_Error(t *testing.T) {
ctx := context.Background()
rootstore := memory.NewDatastore(ctx)
db, err := newDB(ctx, rootstore, immutable.Some[acp.ACP](acp.NewLocalACP()), nil)
require.NoError(t, err)

policy := `
name: test
description: a policy
actor:
name: actor
resources:
user:
permissions:
read:
expr: owner
write:
expr: owner
relations:
owner:
types:
- actor
`

privKeyBytes, err := hex.DecodeString("028d53f37a19afb9a0dbc5b4be30c65731479ee8cfa0c9bc8f8bf198cc3c075f")
require.NoError(t, err)
privKey := secp256k1.PrivKeyFromBytes(privKeyBytes)
identity, err := acpIdentity.FromPrivateKey(privKey, time.Hour, immutable.None[string](), immutable.None[string](), false)
require.NoError(t, err)

ctx = SetContextIdentity(ctx, immutable.Some(identity))
policyResult, err := db.AddPolicy(ctx, policy)
policyID := policyResult.PolicyID
require.NoError(t, err)
require.Equal(t, "7b5ed30570e8d9206027ef6d5469879a6c1ea4595625c6ca33a19063a6ed6214", policyID)

schema := fmt.Sprintf(`
type User @policy(id: "%s", resource: "user") {
name: String
age: Int
}
`, policyID,
)
_, err = db.AddSchema(ctx, schema)
require.NoError(t, err)

col, err := db.GetCollectionByName(ctx, "User")
require.NoError(t, err)

err = db.AddP2PCollections(ctx, []string{col.SchemaRoot()})
require.Error(t, err)
require.ErrorIs(t, err, ErrP2PColHasPolicy)
}
9 changes: 8 additions & 1 deletion tests/integration/acp.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ func init() {
type AddPolicy struct {
// NodeID may hold the ID (index) of the node we want to add policy to.
//
// If a value is not provided the policy will be added in all nodes.
// If a value is not provided the policy will be added in all nodes, unless testing with
// sourcehub ACP, in which case the policy will only be defined once.
NodeID immutable.Option[int]

// The raw policy string.
Expand Down Expand Up @@ -110,6 +111,12 @@ func addPolicyACP(

expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, action.ExpectedError)
assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised)

// The policy should only be added to a SourceHub chain once - there is no need to loop through
// the nodes.
if acpType == SourceHubACPType {
break
}
}
}

Expand Down
132 changes: 117 additions & 15 deletions tests/integration/acp/p2p/replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,18 @@ import (
testUtils "github.com/sourcenetwork/defradb/tests/integration"
)

// This test documents that we don't allow setting replicator with a collections that has a policy
// until the following is implemented:
// TODO-ACP: ACP <> P2P https://github.com/sourcenetwork/defradb/issues/2366
func TestACP_P2POneToOneReplicatorWithPermissionedCollection_Error(t *testing.T) {
func TestACP_P2POneToOneReplicatorWithPermissionedCollection_LocalACP(t *testing.T) {
test := testUtils.TestCase{

Description: "Test acp, with p2p replicator with permissioned collection, error",

SupportedACPTypes: immutable.Some(
[]testUtils.ACPType{
testUtils.LocalACPType,
},
),
Actions: []any{

testUtils.RandomNetworkingConfig(),
testUtils.RandomNetworkingConfig(),

testUtils.AddPolicy{

Identity: immutable.Some(1),

Policy: `
name: test
description: a test policy which marks a collection in a database as a resource
Expand Down Expand Up @@ -63,10 +58,8 @@ func TestACP_P2POneToOneReplicatorWithPermissionedCollection_Error(t *testing.T)
types:
- actor
`,

ExpectedPolicyID: "94eb195c0e459aa79e02a1986c7e731c5015721c18a373f2b2a0ed140a04b454",
},

testUtils.SchemaUpdate{
Schema: `
type Users @policy(
Expand All @@ -78,11 +71,120 @@ func TestACP_P2POneToOneReplicatorWithPermissionedCollection_Error(t *testing.T)
}
`,
},

testUtils.ConfigureReplicator{
SourceNodeID: 0,
TargetNodeID: 1,
ExpectedError: "replicator can not use all collections, as some have policy",
ExpectedError: "replicator collection specified has a policy on it",
},
},
}

testUtils.ExecuteTestCase(t, test)
}

func TestACP_P2POneToOneReplicatorWithPermissionedCollection_SourceHubACP(t *testing.T) {
test := testUtils.TestCase{
SupportedACPTypes: immutable.Some(
[]testUtils.ACPType{
testUtils.SourceHubACPType,
},
),
Actions: []any{
testUtils.RandomNetworkingConfig(),
testUtils.RandomNetworkingConfig(),
testUtils.AddPolicy{
Identity: immutable.Some(1),
Policy: `
name: test
description: a test policy which marks a collection in a database as a resource
actor:
name: actor
resources:
users:
permissions:
read:
expr: owner + reader
write:
expr: owner
relations:
owner:
types:
- actor
reader:
types:
- actor
admin:
manages:
- reader
types:
- actor
`,
ExpectedPolicyID: "94eb195c0e459aa79e02a1986c7e731c5015721c18a373f2b2a0ed140a04b454",
},
testUtils.SchemaUpdate{
Schema: `
type Users @policy(
id: "94eb195c0e459aa79e02a1986c7e731c5015721c18a373f2b2a0ed140a04b454",
resource: "users"
) {
name: String
age: Int
}
`,
},
testUtils.ConfigureReplicator{
SourceNodeID: 0,
TargetNodeID: 1,
},
testUtils.CreateDoc{
NodeID: immutable.Some(0),
Identity: immutable.Some(1),
DocMap: map[string]any{
"name": "John",
},
},
testUtils.WaitForSync{},
testUtils.Request{
// Ensure that the document is accessible on all nodes to authorized actors
Identity: immutable.Some(1),
Request: `
query {
Users {
name
}
}
`,
Results: []map[string]any{
{
"name": "John",
},
},
},
testUtils.Request{
// Ensure that the document is hidden on all nodes to unidentified actors
Request: `
query {
Users {
name
}
}
`,
Results: []map[string]any{},
},
testUtils.Request{
// Ensure that the document is hidden on all nodes to unauthorized actors
Identity: immutable.Some(2),
Request: `
query {
Users {
name
}
}
`,
Results: []map[string]any{},
},
},
}
Expand Down
Loading

0 comments on commit 25a3063

Please sign in to comment.