
refactor: P2P client interface #1924

Merged 37 commits on Oct 16, 2023

Changes shown are from 27 of the 37 commits.

Commits:
022af8f
fix typo
nasdf Sep 28, 2023
a49626a
add client.P2P interface stubs to net.Peer
nasdf Sep 28, 2023
346fbce
remove P2P interface functions from db. implement P2P interface funct…
nasdf Sep 29, 2023
55e2f21
net package tests passing
nasdf Sep 29, 2023
1c3f4e1
remove grpc config
nasdf Sep 30, 2023
175c5a8
split p2p interface in cli and http. fix failing tests
nasdf Oct 2, 2023
292150c
clean up http handler constructor func
nasdf Oct 2, 2023
13f2fff
update p2p interface with test methods temporarily
nasdf Oct 3, 2023
1cfdda3
move integration test setup to new file
nasdf Oct 3, 2023
c55ea7e
fix nil cancel in node close
nasdf Oct 3, 2023
1ec5f38
integration tests passing
nasdf Oct 3, 2023
e5b7b53
fix linter errors
nasdf Oct 3, 2023
eb04aed
implement PeerInfo method in cli wrapper
nasdf Oct 3, 2023
ab11b65
fix incorrect context key usage in cli
nasdf Oct 3, 2023
78ac2cb
restore node addresses on restart in tests
nasdf Oct 3, 2023
f3887a4
fix node restart address logic in tests
nasdf Oct 3, 2023
5c2d60b
move test functions from p2p interface to tests/clients. disable p2p …
nasdf Oct 4, 2023
2a01990
fix race condition in test
nasdf Oct 4, 2023
7e6bb4f
add replicator delete test
nasdf Oct 4, 2023
94a7b8b
remove flaky logging asserts from net package
nasdf Oct 4, 2023
592e411
Merge branch 'develop' into nasdf/refactor/p2p-client-interface
nasdf Oct 4, 2023
687f0e2
Merge branch 'develop' into nasdf/refactor/p2p-client-interface
nasdf Oct 10, 2023
fa6b67e
restore peer subscribe tests
nasdf Oct 10, 2023
8cca432
add replicator cli examples
nasdf Oct 11, 2023
4be14e7
update readme p2p examples to reflect changes
nasdf Oct 12, 2023
edb9249
fix incorrect readme cli command
nasdf Oct 12, 2023
b907f30
remove grpc flag from readme
nasdf Oct 12, 2023
25139aa
improve cli replicator docs
nasdf Oct 13, 2023
580f016
split tests setup into db and client files
nasdf Oct 13, 2023
4532bfd
fix replicator example in readme
nasdf Oct 13, 2023
6ec4577
split test init logic into client and db files
nasdf Oct 13, 2023
76c9b5f
panic if cli test wrapper peer info fails
nasdf Oct 13, 2023
03d2a6e
Squashed commit of the following:
nasdf Oct 13, 2023
a96746c
Merge branch 'develop' into nasdf/refactor/p2p-client-interface
nasdf Oct 13, 2023
b967436
Merge branch 'develop' into nasdf/refactor/p2p-client-interface
nasdf Oct 16, 2023
abf2af3
possible fix for node restart race condition in tests
nasdf Oct 16, 2023
40b2603
Merge branch 'develop' into nasdf/refactor/p2p-client-interface
nasdf Oct 16, 2023
50 changes: 36 additions & 14 deletions README.md
@@ -244,57 +244,73 @@ When starting a node for the first time, a key pair is generated and stored in i

Each node has a unique `PeerID` generated from its public key. This ID allows other nodes to connect to it.

To view your node's peer info:
Contributor: praise: Thank you for adding this!


```shell
defradb client p2p info
```
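This command prints the node's peer info as JSON. A rough sketch of the expected shape, with illustrative values (the same format is accepted by the replicator commands described below):

```json
{"ID": "12D3KooWNXm3dmrwCYSxGoRUyZstaKYiHPdt8uZH5vgVaEJyzU8B", "Addrs": ["/ip4/0.0.0.0/tcp/9171"]}
```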

There are two types of peer-to-peer relationships supported: **pubsub** peering and **replicator** peering.

Pubsub peering *passively* synchronizes data between nodes by broadcasting *Document Commit* updates to the topic of the commit's document key. Nodes need to be listening on the pubsub channel to receive updates. This is for when two nodes *already* share a document and want to keep it in sync.

Replicator peering *actively* pushes changes from a specific collection *to* a target peer.

### Pubsub example
<details>
<summary>Pubsub example</summary>

Pubsub peers can be specified on the command line using the `--peers` flag, which accepts a comma-separated list of peer [multiaddresses](https://docs.libp2p.io/concepts/addressing/). For example, a node at IP `192.168.1.12` listening on 9000 with PeerID `12D3KooWNXm3dmrwCYSxGoRUyZstaKYiHPdt8uZH5vgVaEJyzU8B` would be referred to using the multiaddress `/ip4/192.168.1.12/tcp/9000/p2p/12D3KooWNXm3dmrwCYSxGoRUyZstaKYiHPdt8uZH5vgVaEJyzU8B`.

Let's go through an example of two nodes (*nodeA* and *nodeB*) connecting with each other over pubsub, on the same machine.

Start *nodeA* with a default configuration:

```
```shell
defradb start
```

Obtain the PeerID from its console output. In this example, we use `12D3KooWNXm3dmrwCYSxGoRUyZstaKYiHPdt8uZH5vgVaEJyzU8B`, but locally it will be different.
Obtain the node's peer info:

```shell
defradb client p2p info
```

In this example, we use `12D3KooWNXm3dmrwCYSxGoRUyZstaKYiHPdt8uZH5vgVaEJyzU8B`, but locally it will be different.

For *nodeB*, we provide the following configuration:

```
defradb start --rootdir ~/.defradb-nodeB --url localhost:9182 --p2paddr /ip4/0.0.0.0/tcp/9172 --tcpaddr /ip4/0.0.0.0/tcp/9162 --peers /ip4/0.0.0.0/tcp/9171/p2p/12D3KooWNXm3dmrwCYSxGoRUyZstaKYiHPdt8uZH5vgVaEJyzU8B
```shell
defradb start --rootdir ~/.defradb-nodeB --url localhost:9182 --p2paddr /ip4/0.0.0.0/tcp/9172 --peers /ip4/0.0.0.0/tcp/9171/p2p/12D3KooWNXm3dmrwCYSxGoRUyZstaKYiHPdt8uZH5vgVaEJyzU8B
```

About the flags:

- `--rootdir` specifies the root dir (config and data) to use
- `--url` is the address to listen on for the client HTTP and GraphQL API
- `--p2paddr` is the multiaddress for the P2P networking to listen on
- `--tcpaddr` is the multiaddress for the gRPC server to listen on
- `--peers` is a comma-separated list of peer multiaddresses

This starts two nodes and connects them via pubsub networking.
</details>

### Collection subscription example
<details>
<summary>Subscription example</summary>

It is possible to subscribe to updates on a given collection by using its ID as the pubsub topic. The ID of a collection is found as the field `collectionID` in one of its documents. Here we use the collection ID of the `User` type we created above. After setting up 2 nodes as shown in the [Pubsub example](#pubsub-example) section, we can subscribe to collections updates on *nodeA* from *nodeB* by using the `rpc p2pcollection` command:
It is possible to subscribe to updates on a given collection by using its ID as the pubsub topic. The ID of a collection is found as the field `collectionID` in one of its documents. Here we use the collection ID of the `User` type we created above. After setting up 2 nodes as shown in the [Pubsub example](#pubsub-example) section, we can subscribe to collection updates on *nodeA* from *nodeB* by using the following command:

```shell
defradb client rpc p2pcollection add --url localhost:9182 bafkreibpnvkvjqvg4skzlijka5xe63zeu74ivcjwd76q7yi65jdhwqhske
defradb client p2p collection add --url localhost:9182 bafkreibpnvkvjqvg4skzlijka5xe63zeu74ivcjwd76q7yi65jdhwqhske
```

Multiple collection IDs can be added at once.

```shell
defradb client rpc p2pcollection add --url localhost:9182 <collection1ID> <collection2ID> <collection3ID>
defradb client p2p collection add --url localhost:9182 <collection1ID>,<collection2ID>,<collection3ID>
```
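A collection can later be removed from the subscription as well. A sketch, assuming the `remove` subcommand added in this PR (`cli/p2p_collection_remove.go`) mirrors `add` and takes the same comma-separated list:

```shell
defradb client p2p collection remove --url localhost:9182 <collection1ID>,<collection2ID>
```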
</details>

### Replicator example
<details>
<summary>Replicator example</summary>

Replicator peering is targeted: it allows a node to actively send updates to another node. Let's go through an example of *nodeA* actively replicating to *nodeB*:

@@ -334,14 +350,20 @@ defradb client schema add --url localhost:9182 '
'
```

Set *nodeA* to actively replicate the "Article" collection to *nodeB*:
Then copy the peer info from *nodeB*:

```shell
defradb client rpc replicator set -c "Article" /ip4/0.0.0.0/tcp/9172/p2p/<peerID_of_nodeB>
defradb client p2p info
```

As we add or update documents in the "Article" collection on *nodeA*, they will be actively pushed to *nodeB*. Note that changes to *nodeB* will still be passively published back to *nodeA*, via pubsub.
Set *nodeA* to actively replicate the Article collection to *nodeB*:

```shell
defradb client p2p replicator set -c Article <peer_info_of_nodeB>
```

Contributor: suggestion: I had to check the help text of `defradb client p2p replicator set` to be sure that `<peer_info_of_nodeB>` meant the full json object - it might be worth renaming this to reflect that.

Member Author: I updated it to `nodeB_peer_info_json` to hopefully make it more obvious.
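For reference, the peer info argument is the JSON object printed by `defradb client p2p info` on *nodeB*. A filled-in sketch with illustrative values, following the format shown in the command's help text:

```shell
defradb client p2p replicator set -c Article '{"ID": "12D3KooWNXm3dmrwCYSxGoRUyZstaKYiHPdt8uZH5vgVaEJyzU8B", "Addrs": ["/ip4/0.0.0.0/tcp/9172"]}'
```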

As we add or update documents in the Article collection on *nodeA*, they will be actively pushed to *nodeB*. Note that changes to *nodeB* will still be passively published back to *nodeA*, via pubsub.
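To stop replicating, the matching `delete` command added in this PR takes the same collection flag and peer info argument:

```shell
defradb client p2p replicator delete -c Article '<peer_info_of_nodeB>'
```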
</details>

## Securing the HTTP API with TLS

4 changes: 2 additions & 2 deletions cli/p2p_collection_add.go
@@ -31,7 +31,7 @@ Example: add multiple collections
`,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
store := mustGetStoreContext(cmd)
p2p := mustGetP2PContext(cmd)

var collectionIDs []string
for _, id := range strings.Split(args[0], ",") {
@@ -42,7 +42,7 @@
collectionIDs = append(collectionIDs, id)
}

return store.AddP2PCollections(cmd.Context(), collectionIDs)
return p2p.AddP2PCollections(cmd.Context(), collectionIDs)
},
}
return cmd
4 changes: 2 additions & 2 deletions cli/p2p_collection_getall.go
@@ -22,9 +22,9 @@ func MakeP2PCollectionGetAllCommand() *cobra.Command {
This is the list of collections of the node that are synchronized on the pubsub network.`,
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
store := mustGetStoreContext(cmd)
p2p := mustGetP2PContext(cmd)

cols, err := store.GetAllP2PCollections(cmd.Context())
cols, err := p2p.GetAllP2PCollections(cmd.Context())
if err != nil {
return err
}
4 changes: 2 additions & 2 deletions cli/p2p_collection_remove.go
@@ -31,7 +31,7 @@ Example: remove multiple collections
`,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
store := mustGetStoreContext(cmd)
p2p := mustGetP2PContext(cmd)

var collectionIDs []string
for _, id := range strings.Split(args[0], ",") {
@@ -42,7 +42,7 @@
collectionIDs = append(collectionIDs, id)
}

return store.RemoveP2PCollections(cmd.Context(), collectionIDs)
return p2p.RemoveP2PCollections(cmd.Context(), collectionIDs)
},
}
return cmd
7 changes: 1 addition & 6 deletions cli/p2p_info.go
@@ -23,12 +23,7 @@ func MakeP2PInfoCommand() *cobra.Command {
Long: `Get peer info from a DefraDB node`,
RunE: func(cmd *cobra.Command, args []string) error {
db := cmd.Context().Value(dbContextKey).(*http.Client)

res, err := db.PeerInfo(cmd.Context())
if err != nil {
return err
}
return writeJSON(cmd, res)
return writeJSON(cmd, db.PeerInfo())
},
}
return cmd
27 changes: 20 additions & 7 deletions cli/p2p_replicator_delete.go
@@ -11,27 +11,40 @@
package cli

import (
"encoding/json"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/spf13/cobra"

"github.com/sourcenetwork/defradb/client"
)

func MakeP2PReplicatorDeleteCommand() *cobra.Command {
var collections []string
var cmd = &cobra.Command{
Use: "delete <peer>",
Use: "delete [-c, --collection] <peer>",
Short: "Delete a replicator. It will stop synchronizing",
Contributor: question: Should this (and Long) not now read Delete replicators...? It looks like it can now delete multiple.

Member Author: If you are referring to deleting multiple collections replicated to a single peer you are correct.

Contributor: todo: Please change the short/long wording to reflect this :)
Long: `Delete a replicator. It will stop synchronizing.`,
Args: cobra.ExactArgs(1),
Long: `Delete a replicator. It will stop synchronizing.

Example:
defradb client p2p replicator delete -c Users '{"ID": "12D3", "Addrs": ["/ip4/0.0.0.0/tcp/9171"]}'
`,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
store := mustGetStoreContext(cmd)
p2p := mustGetP2PContext(cmd)

addr, err := peer.AddrInfoFromString(args[0])
if err != nil {
var info peer.AddrInfo
if err := json.Unmarshal([]byte(args[0]), &info); err != nil {
return err
}
return store.DeleteReplicator(cmd.Context(), client.Replicator{Info: *addr})
rep := client.Replicator{
Info: info,
Schemas: collections,
}
return p2p.DeleteReplicator(cmd.Context(), rep)
},
}
cmd.Flags().StringSliceVarP(&collections, "collection", "c",
[]string{}, "Collection(s) to stop replicating")
return cmd
}
4 changes: 2 additions & 2 deletions cli/p2p_replicator_getall.go
@@ -21,9 +21,9 @@
Long: `Get all the replicators active in the P2P data sync system.
These are the replicators that are currently replicating data from one node to another.`,
RunE: func(cmd *cobra.Command, args []string) error {
store := mustGetStoreContext(cmd)
p2p := mustGetP2PContext(cmd)


reps, err := store.GetAllReplicators(cmd.Context())
reps, err := p2p.GetAllReplicators(cmd.Context())

if err != nil {
return err
}
17 changes: 11 additions & 6 deletions cli/p2p_replicator_set.go
@@ -11,6 +11,8 @@
package cli

import (
"encoding/json"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/spf13/cobra"

@@ -24,24 +26,27 @@ func MakeP2PReplicatorSetCommand() *cobra.Command {
Short: "Set a P2P replicator",
Long: `Add a new target replicator.
A replicator replicates one or all collection(s) from this node to another.

Example:
defradb client p2p replicator set -c Users '{"ID": "12D3", "Addrs": ["/ip4/0.0.0.0/tcp/9171"]}'
`,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
store := mustGetStoreContext(cmd)
p2p := mustGetP2PContext(cmd)

addr, err := peer.AddrInfoFromString(args[0])
if err != nil {
var info peer.AddrInfo
if err := json.Unmarshal([]byte(args[0]), &info); err != nil {
return err
}
rep := client.Replicator{
Info: *addr,
Info: info,
Schemas: collections,
}
return store.SetReplicator(cmd.Context(), rep)
return p2p.SetReplicator(cmd.Context(), rep)
},
}

cmd.Flags().StringSliceVarP(&collections, "collection", "c",
[]string{}, "Define the collection for the replicator")
[]string{}, "Collection(s) to replicate")
return cmd
}
58 changes: 25 additions & 33 deletions cli/start.go
@@ -171,15 +171,10 @@

func (di *defraInstance) close(ctx context.Context) {
if di.node != nil {
if err := di.node.Close(); err != nil {
log.FeedbackInfo(
ctx,
"The node could not be closed successfully",
logging.NewKV("Error", err.Error()),
)
}
di.node.Close()
} else {
di.db.Close()

}
di.db.Close(ctx)
if err := di.server.Close(); err != nil {
log.FeedbackInfo(
ctx,
@@ -222,7 +217,7 @@
}

// init the p2p node
var n *net.Node
var node *net.Node

if !cfg.Net.P2PDisabled {
nodeOpts := []net.NodeOpt{
net.WithConfig(cfg),
@@ -239,9 +234,9 @@
nodeOpts = append(nodeOpts, net.WithPrivateKey(key))
}
log.FeedbackInfo(ctx, "Starting P2P node", logging.NewKV("P2P address", cfg.Net.P2PAddress))
n, err = net.NewNode(ctx, db, nodeOpts...)
node, err = net.NewNode(ctx, db, nodeOpts...)

if err != nil {
db.Close(ctx)
db.Close()

return nil, errors.Wrap("failed to start P2P node", err)
}

@@ -253,14 +248,11 @@
return nil, errors.Wrap(fmt.Sprintf("failed to parse bootstrap peers %v", cfg.Net.Peers), err)
}
log.Debug(ctx, "Bootstrapping with peers", logging.NewKV("Addresses", addrs))
n.Bootstrap(addrs)
node.Bootstrap(addrs)

}

if err := n.Start(); err != nil {
if e := n.Close(); e != nil {
err = errors.Wrap(fmt.Sprintf("failed to close node: %v", e.Error()), err)
}
db.Close(ctx)
if err := node.Start(); err != nil {
node.Close()

return nil, errors.Wrap("failed to start P2P listeners", err)
}
}
@@ -271,10 +263,6 @@
httpapi.WithAllowedOrigins(cfg.API.AllowedOrigins...),
}

if n != nil {
sOpt = append(sOpt, httpapi.WithPeerID(n.PeerID().String()))
}

if cfg.API.TLS {
sOpt = append(
sOpt,
@@ -284,32 +272,36 @@
)
}

s := httpapi.NewServer(db, sOpt...)
if err := s.Listen(ctx); err != nil {
return nil, errors.Wrap(fmt.Sprintf("failed to listen on TCP address %v", s.Addr), err)
var server *httpapi.Server
if node != nil {
server = httpapi.NewServer(node, sOpt...)
} else {
server = httpapi.NewServer(db, sOpt...)
}
if err := server.Listen(ctx); err != nil {
return nil, errors.Wrap(fmt.Sprintf("failed to listen on TCP address %v", server.Addr), err)

}
// save the address on the config in case the port number was set to random
cfg.API.Address = s.AssignedAddr()
cfg.API.Address = server.AssignedAddr()


// run the server in a separate goroutine
go func() {
log.FeedbackInfo(ctx, fmt.Sprintf("Providing HTTP API at %s.", cfg.API.AddressToURL()))
if err := s.Run(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) {
if err := server.Run(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) {

log.FeedbackErrorE(ctx, "Failed to run the HTTP server", err)
if n != nil {
if err := n.Close(); err != nil {
log.FeedbackErrorE(ctx, "Failed to close node", err)
}
if node != nil {
node.Close()
} else {
db.Close()

}
db.Close(ctx)
os.Exit(1)
}
}()

return &defraInstance{
node: n,
node: node,

db: db,
server: s,
server: server,

}, nil
}
