Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: P2P client interface #1924

Merged
merged 37 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
022af8f
fix typo
nasdf Sep 28, 2023
a49626a
add client.P2P interface stubs to net.Peer
nasdf Sep 28, 2023
346fbce
remove P2P interface functions from db. implement P2P interface funct…
nasdf Sep 29, 2023
55e2f21
net package tests passing
nasdf Sep 29, 2023
1c3f4e1
remove grpc config
nasdf Sep 30, 2023
175c5a8
split p2p interface in cli and http. fix failing tests
nasdf Oct 2, 2023
292150c
clean up http handler constructor func
nasdf Oct 2, 2023
13f2fff
update p2p interface with test methods temporarily
nasdf Oct 3, 2023
1cfdda3
move integration test setup to new file
nasdf Oct 3, 2023
c55ea7e
fix nil cancel in node close
nasdf Oct 3, 2023
1ec5f38
integration tests passing
nasdf Oct 3, 2023
e5b7b53
fix linter errors
nasdf Oct 3, 2023
eb04aed
implement PeerInfo method in cli wrapper
nasdf Oct 3, 2023
ab11b65
fix incorrect context key usage in cli
nasdf Oct 3, 2023
78ac2cb
restore node addresses on restart in tests
nasdf Oct 3, 2023
f3887a4
fix node restart address logic in tests
nasdf Oct 3, 2023
5c2d60b
move test functions from p2p interface to tests/clients. disable p2p …
nasdf Oct 4, 2023
2a01990
fix race condition in test
nasdf Oct 4, 2023
7e6bb4f
add replicator delete test
nasdf Oct 4, 2023
94a7b8b
remove flaky logging asserts from net package
nasdf Oct 4, 2023
592e411
Merge branch 'develop' into nasdf/refactor/p2p-client-interface
nasdf Oct 4, 2023
687f0e2
Merge branch 'develop' into nasdf/refactor/p2p-client-interface
nasdf Oct 10, 2023
fa6b67e
restore peer subscribe tests
nasdf Oct 10, 2023
8cca432
add replicator cli examples
nasdf Oct 11, 2023
4be14e7
update readme p2p examples to reflect changes
nasdf Oct 12, 2023
edb9249
fix incorrect readme cli command
nasdf Oct 12, 2023
b907f30
remove grpc flag from readme
nasdf Oct 12, 2023
25139aa
improve cli replicator docs
nasdf Oct 13, 2023
580f016
split tests setup into db and client files
nasdf Oct 13, 2023
4532bfd
fix replicator example in readme
nasdf Oct 13, 2023
6ec4577
split test init logic into client and db files
nasdf Oct 13, 2023
76c9b5f
panic if cli test wrapper peer info fails
nasdf Oct 13, 2023
03d2a6e
Squashed commit of the following:
nasdf Oct 13, 2023
a96746c
Merge branch 'develop' into nasdf/refactor/p2p-client-interface
nasdf Oct 13, 2023
b967436
Merge branch 'develop' into nasdf/refactor/p2p-client-interface
nasdf Oct 16, 2023
abf2af3
possible fix for node restart race condition in tests
nasdf Oct 16, 2023
40b2603
Merge branch 'develop' into nasdf/refactor/p2p-client-interface
nasdf Oct 16, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cli/p2p_collection_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ func MakeP2PCollectionAddCommand() *cobra.Command {
The collections are synchronized between nodes of a pubsub network.`,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
store := mustGetStoreContext(cmd)
return store.AddP2PCollection(cmd.Context(), args[0])
p2p := mustGetP2PContext(cmd)
return p2p.AddP2PCollection(cmd.Context(), args[0])
},
}
return cmd
Expand Down
4 changes: 2 additions & 2 deletions cli/p2p_collection_getall.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ func MakeP2PCollectionGetAllCommand() *cobra.Command {
This is the list of collections of the node that are synchronized on the pubsub network.`,
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
store := mustGetStoreContext(cmd)
p2p := mustGetP2PContext(cmd)

cols, err := store.GetAllP2PCollections(cmd.Context())
cols, err := p2p.GetAllP2PCollections(cmd.Context())
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions cli/p2p_collection_remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ func MakeP2PCollectionRemoveCommand() *cobra.Command {
The removed collections will no longer be synchronized between nodes.`,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
store := mustGetStoreContext(cmd)
return store.RemoveP2PCollection(cmd.Context(), args[0])
p2p := mustGetP2PContext(cmd)
return p2p.RemoveP2PCollection(cmd.Context(), args[0])
},
}
return cmd
Expand Down
7 changes: 1 addition & 6 deletions cli/p2p_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@ func MakeP2PInfoCommand() *cobra.Command {
Long: `Get peer info from a DefraDB node`,
RunE: func(cmd *cobra.Command, args []string) error {
db := cmd.Context().Value(dbContextKey).(*http.Client)

res, err := db.PeerInfo(cmd.Context())
if err != nil {
return err
}
return writeJSON(cmd, res)
return writeJSON(cmd, db.PeerInfo())
},
}
return cmd
Expand Down
19 changes: 14 additions & 5 deletions cli/p2p_replicator_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,36 @@
package cli

import (
"encoding/json"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/spf13/cobra"

"github.com/sourcenetwork/defradb/client"
)

func MakeP2PReplicatorDeleteCommand() *cobra.Command {
var collections []string
var cmd = &cobra.Command{
Use: "delete <peer>",
Use: "delete [-c, --collection] <peer>",
Short: "Delete a replicator. It will stop synchronizing",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Should this (and Long), not now read Delete replicators...? It looks like it can now delete multiple.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are referring to deleting multiple collections replicated to a single peer you are correct.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: Please change the short/long wording to reflect this :)

Long: `Delete a replicator. It will stop synchronizing.`,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
store := mustGetStoreContext(cmd)
p2p := mustGetP2PContext(cmd)

addr, err := peer.AddrInfoFromString(args[0])
if err != nil {
var info peer.AddrInfo
if err := json.Unmarshal([]byte(args[0]), &info); err != nil {
return err
}
return store.DeleteReplicator(cmd.Context(), client.Replicator{Info: *addr})
rep := client.Replicator{
Info: info,
Schemas: collections,
}
return p2p.DeleteReplicator(cmd.Context(), rep)
},
}
cmd.Flags().StringSliceVarP(&collections, "collection", "c",
[]string{}, "Collection(s) to stop replicating")
return cmd
}
4 changes: 2 additions & 2 deletions cli/p2p_replicator_getall.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
Long: `Get all the replicators active in the P2P data sync system.
These are the replicators that are currently replicating data from one node to another.`,
RunE: func(cmd *cobra.Command, args []string) error {
store := mustGetStoreContext(cmd)
p2p := mustGetP2PContext(cmd)

Check warning on line 24 in cli/p2p_replicator_getall.go

View check run for this annotation

Codecov / codecov/patch

cli/p2p_replicator_getall.go#L24

Added line #L24 was not covered by tests

reps, err := store.GetAllReplicators(cmd.Context())
reps, err := p2p.GetAllReplicators(cmd.Context())

Check warning on line 26 in cli/p2p_replicator_getall.go

View check run for this annotation

Codecov / codecov/patch

cli/p2p_replicator_getall.go#L26

Added line #L26 was not covered by tests
if err != nil {
return err
}
Expand Down
14 changes: 8 additions & 6 deletions cli/p2p_replicator_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
package cli

import (
"encoding/json"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/spf13/cobra"

Expand All @@ -27,21 +29,21 @@ A replicator replicates one or all collection(s) from this node to another.
`,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
store := mustGetStoreContext(cmd)
p2p := mustGetP2PContext(cmd)

addr, err := peer.AddrInfoFromString(args[0])
if err != nil {
var info peer.AddrInfo
if err := json.Unmarshal([]byte(args[0]), &info); err != nil {
return err
}
rep := client.Replicator{
Info: *addr,
Info: info,
Schemas: collections,
}
return store.SetReplicator(cmd.Context(), rep)
return p2p.SetReplicator(cmd.Context(), rep)
},
}

cmd.Flags().StringSliceVarP(&collections, "collection", "c",
[]string{}, "Define the collection for the replicator")
[]string{}, "Collection(s) to replicate")
return cmd
}
113 changes: 25 additions & 88 deletions cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,14 @@
import (
"context"
"fmt"
gonet "net"
"net/http"
"os"
"os/signal"
"strings"
"syscall"

badger "github.com/dgraph-io/badger/v4"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
ma "github.com/multiformats/go-multiaddr"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/config"
Expand All @@ -37,7 +31,6 @@
httpapi "github.com/sourcenetwork/defradb/http"
"github.com/sourcenetwork/defradb/logging"
"github.com/sourcenetwork/defradb/net"
netpb "github.com/sourcenetwork/defradb/net/pb"
netutils "github.com/sourcenetwork/defradb/net/utils"
)

Expand Down Expand Up @@ -113,15 +106,6 @@
log.FeedbackFatalE(context.Background(), "Could not bind net.p2paddress", err)
}

cmd.Flags().String(
"tcpaddr", cfg.Net.TCPAddress,
"Listener address for the tcp gRPC server (formatted as a libp2p MultiAddr)",
)
err = cfg.BindFlag("net.tcpaddress", cmd.Flags().Lookup("tcpaddr"))
if err != nil {
log.FeedbackFatalE(context.Background(), "Could not bind net.tcpaddress", err)
}

cmd.Flags().Bool(
"no-p2p", cfg.Net.P2PDisabled,
"Disable the peer-to-peer network synchronization system",
Expand Down Expand Up @@ -186,15 +170,10 @@

func (di *defraInstance) close(ctx context.Context) {
if di.node != nil {
if err := di.node.Close(); err != nil {
log.FeedbackInfo(
ctx,
"The node could not be closed successfully",
logging.NewKV("Error", err.Error()),
)
}
di.node.Close()
} else {
di.db.Close()

Check warning on line 175 in cli/start.go

View check run for this annotation

Codecov / codecov/patch

cli/start.go#L173-L175

Added lines #L173 - L175 were not covered by tests
}
di.db.Close(ctx)
if err := di.server.Close(); err != nil {
log.FeedbackInfo(
ctx,
Expand Down Expand Up @@ -237,16 +216,16 @@
}

// init the p2p node
var n *net.Node
var node *net.Node

Check warning on line 219 in cli/start.go

View check run for this annotation

Codecov / codecov/patch

cli/start.go#L219

Added line #L219 was not covered by tests
if !cfg.Net.P2PDisabled {
log.FeedbackInfo(ctx, "Starting P2P node", logging.NewKV("P2P address", cfg.Net.P2PAddress))
n, err = net.NewNode(
node, err = net.NewNode(

Check warning on line 222 in cli/start.go

View check run for this annotation

Codecov / codecov/patch

cli/start.go#L222

Added line #L222 was not covered by tests
ctx,
db,
net.WithConfig(cfg),
)
if err != nil {
db.Close(ctx)
db.Close()

Check warning on line 228 in cli/start.go

View check run for this annotation

Codecov / codecov/patch

cli/start.go#L228

Added line #L228 was not covered by tests
return nil, errors.Wrap("failed to start P2P node", err)
}

Expand All @@ -258,55 +237,13 @@
return nil, errors.Wrap(fmt.Sprintf("failed to parse bootstrap peers %v", cfg.Net.Peers), err)
}
log.Debug(ctx, "Bootstrapping with peers", logging.NewKV("Addresses", addrs))
n.Boostrap(addrs)
node.Bootstrap(addrs)

Check warning on line 240 in cli/start.go

View check run for this annotation

Codecov / codecov/patch

cli/start.go#L240

Added line #L240 was not covered by tests
}

if err := n.Start(); err != nil {
if e := n.Close(); e != nil {
err = errors.Wrap(fmt.Sprintf("failed to close node: %v", e.Error()), err)
}
db.Close(ctx)
if err := node.Start(); err != nil {
node.Close()

Check warning on line 244 in cli/start.go

View check run for this annotation

Codecov / codecov/patch

cli/start.go#L243-L244

Added lines #L243 - L244 were not covered by tests
return nil, errors.Wrap("failed to start P2P listeners", err)
}

MtcpAddr, err := ma.NewMultiaddr(cfg.Net.TCPAddress)
if err != nil {
return nil, errors.Wrap("failed to parse multiaddress", err)
}
addr, err := netutils.TCPAddrFromMultiAddr(MtcpAddr)
if err != nil {
return nil, errors.Wrap("failed to parse TCP address", err)
}

rpcTimeoutDuration, err := cfg.Net.RPCTimeoutDuration()
if err != nil {
return nil, errors.Wrap("failed to parse RPC timeout duration", err)
}

server := grpc.NewServer(
grpc.UnaryInterceptor(
grpc_middleware.ChainUnaryServer(
grpc_recovery.UnaryServerInterceptor(),
),
),
grpc.KeepaliveParams(
keepalive.ServerParameters{
MaxConnectionIdle: rpcTimeoutDuration,
},
),
)
tcplistener, err := gonet.Listen("tcp", addr)
if err != nil {
return nil, errors.Wrap(fmt.Sprintf("failed to listen on TCP address %v", addr), err)
}

go func() {
log.FeedbackInfo(ctx, "Started RPC server", logging.NewKV("Address", addr))
netpb.RegisterCollectionServer(server, n.Peer)
if err := server.Serve(tcplistener); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
log.FeedbackFatalE(ctx, "Failed to start RPC server", err)
}
}()
}

sOpt := []func(*httpapi.Server){
Expand All @@ -315,10 +252,6 @@
httpapi.WithAllowedOrigins(cfg.API.AllowedOrigins...),
}

if n != nil {
sOpt = append(sOpt, httpapi.WithPeerID(n.PeerID().String()))
}

if cfg.API.TLS {
sOpt = append(
sOpt,
Expand All @@ -328,32 +261,36 @@
)
}

s := httpapi.NewServer(db, sOpt...)
if err := s.Listen(ctx); err != nil {
return nil, errors.Wrap(fmt.Sprintf("failed to listen on TCP address %v", s.Addr), err)
var server *httpapi.Server
if node != nil {
server = httpapi.NewServer(node, sOpt...)
} else {
server = httpapi.NewServer(db, sOpt...)
}
if err := server.Listen(ctx); err != nil {
return nil, errors.Wrap(fmt.Sprintf("failed to listen on TCP address %v", server.Addr), err)

Check warning on line 271 in cli/start.go

View check run for this annotation

Codecov / codecov/patch

cli/start.go#L264-L271

Added lines #L264 - L271 were not covered by tests
}
// save the address on the config in case the port number was set to random
cfg.API.Address = s.AssignedAddr()
cfg.API.Address = server.AssignedAddr()

Check warning on line 274 in cli/start.go

View check run for this annotation

Codecov / codecov/patch

cli/start.go#L274

Added line #L274 was not covered by tests

// run the server in a separate goroutine
go func() {
log.FeedbackInfo(ctx, fmt.Sprintf("Providing HTTP API at %s.", cfg.API.AddressToURL()))
if err := s.Run(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) {
if err := server.Run(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) {

Check warning on line 279 in cli/start.go

View check run for this annotation

Codecov / codecov/patch

cli/start.go#L279

Added line #L279 was not covered by tests
log.FeedbackErrorE(ctx, "Failed to run the HTTP server", err)
if n != nil {
if err := n.Close(); err != nil {
log.FeedbackErrorE(ctx, "Failed to close node", err)
}
if node != nil {
node.Close()
} else {
db.Close()

Check warning on line 284 in cli/start.go

View check run for this annotation

Codecov / codecov/patch

cli/start.go#L281-L284

Added lines #L281 - L284 were not covered by tests
}
db.Close(ctx)
os.Exit(1)
}
}()

return &defraInstance{
node: n,
node: node,

Check warning on line 291 in cli/start.go

View check run for this annotation

Codecov / codecov/patch

cli/start.go#L291

Added line #L291 was not covered by tests
db: db,
server: s,
server: server,

Check warning on line 293 in cli/start.go

View check run for this annotation

Codecov / codecov/patch

cli/start.go#L293

Added line #L293 was not covered by tests
}, nil
}

Expand Down
7 changes: 7 additions & 0 deletions cli/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ func mustGetStoreContext(cmd *cobra.Command) client.Store {
return cmd.Context().Value(storeContextKey).(client.Store)
}

// mustGetP2PContext returns the p2p implementation for the current command context.
//
// If a p2p implementation is not set in the current context this function panics.
func mustGetP2PContext(cmd *cobra.Command) client.P2P {
return cmd.Context().Value(dbContextKey).(client.P2P)
}

// tryGetCollectionContext returns the collection for the current command context
// and a boolean indicating if the collection was set.
func tryGetCollectionContext(cmd *cobra.Command) (client.Collection, bool) {
Expand Down
5 changes: 1 addition & 4 deletions client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type DB interface {
// be created after calling this to resume operations on the prior data - this is however dependant on
// the behaviour of the rootstore provided on database instance creation, as this function will Close
// the provided rootstore.
Close(context.Context)
Close()

// Events returns the database event queue.
//
Expand All @@ -82,9 +82,6 @@ type DB interface {

// Store contains the core DefraDB read-write operations.
type Store interface {
// P2P holds the P2P related methods that must be implemented by the database.
P2P

// Backup holds the backup related methods that must be implemented by the database.
Backup

Expand Down
8 changes: 8 additions & 0 deletions client/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,17 @@ package client

import (
"context"

"github.com/libp2p/go-libp2p/core/peer"
)

// P2P is a peer connected database implementation.
type P2P interface {
DB
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: It took me a while to figure it out, but you have removed functionality here. client.P2P used to respect the transactionality of client.Store, but now it only works with implicit transactions.

I'm not sure we wish to lose this, and this change should be made very visible and open to discussion with the team.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I discussed this with @fredcarle and forgot to make a note. We came to the conclusion that peer operations only make sense when done implicitly. It would be pretty difficult to have the transaction also rollback the peer connections that happen during these operations.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not losing functionality. We are removing the need for the Store interface to support P2P related actions. The functionality still exists but is handled all within the net package. However, whatever implements the P2P interface needs access to a db instance and thus support the DB interface. Action related to the P2P interface can still use transactions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We came to the conclusion that peer operations only make sense when done implicitly.

I do disagree with this, with the obvious example being during an update to a new application version - there may be several of these operations (plus other stuff, like schema updates etc) and if any one of those operations fails, they should all fail so that the update is not partially applied.

I will not push back against the both of you on this, but I do think explicit transactions here are useful.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fred and I spoke about this last week, currently (in develop), the P2P interface only looks like it supports explicit transactions - it doesn't actually respect them. This PR fixes this by making the interface reflect what is actually supported.

I'm happy with the change, it is better than what was before :)


// PeerInfo returns the p2p host id and listening addresses.
PeerInfo() peer.AddrInfo

// SetReplicator adds a replicator to the persisted list or adds
// schemas if the replicator already exists.
SetReplicator(ctx context.Context, rep Replicator) error
Expand Down
Loading