Skip to content

Commit

Permalink
feat: Add ability for P2P to wait for pushlog by peer (sourcenetwork#…
Browse files Browse the repository at this point in the history
…1098)

Relevant issue(s)
Resolves sourcenetwork#1097

Description
This PR splits the WaitForPushLogEvent function into two distinct versions. We can now wait for a log create by a specific peer or sent by an intermediary peer.
  • Loading branch information
fredcarle authored Feb 14, 2023
1 parent 709a73c commit 66675f2
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 111 deletions.
1 change: 1 addition & 0 deletions net/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (s *server) pushLog(ctx context.Context, evt events.Update, pid peer.ID) er
DocKey: &pb.ProtoDocKey{DocKey: dockey},
Cid: &pb.ProtoCid{Cid: evt.Cid},
SchemaID: []byte(evt.SchemaID),
Creator: s.peer.host.ID().String(),
Log: &pb.Document_Log{
Block: evt.Block.RawData(),
},
Expand Down
207 changes: 110 additions & 97 deletions net/pb/net.pb.go

Large diffs are not rendered by default.

12 changes: 7 additions & 5 deletions net/pb/net.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,16 @@ message PushLogRequest {
Body body = 1;

message Body {
// docKey is the target DocKey.
// docKey is the DocKey of the document that is affected by the log.
bytes docKey = 1 [(gogoproto.customtype) = "ProtoDocKey"];
// cid is the target CID.
// cid is the CID of the composite of the document.
bytes cid = 2 [(gogoproto.customtype) = "ProtoCid"];
//
// schemaID is the SchemaID of the collection that the document resides in.
bytes schemaID = 3;
// record is the actual record payload.
Document.Log log = 4;
// creator is the peer ID of the peer that created the log.
string creator = 4;
// log hold the block that represent version of the document.
Document.Log log = 5;
}
}

Expand Down
5 changes: 4 additions & 1 deletion net/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ func (p *Peer) RegisterNewDocument(
DocKey: &pb.ProtoDocKey{DocKey: dockey},
Cid: &pb.ProtoCid{Cid: c},
SchemaID: []byte(schemaID),
Creator: p.host.ID().String(),
Log: &pb.Document_Log{
Block: nd.RawData(),
},
Expand Down Expand Up @@ -602,6 +603,7 @@ func (p *Peer) handleDocUpdateLog(evt events.Update) error {
DocKey: &pb.ProtoDocKey{DocKey: dockey},
Cid: &pb.ProtoCid{Cid: evt.Cid},
SchemaID: []byte(evt.SchemaID),
Creator: p.host.ID().String(),
Log: &pb.Document_Log{
Block: evt.Block.RawData(),
},
Expand Down Expand Up @@ -697,7 +699,8 @@ func stopGRPCServer(ctx context.Context, server *grpc.Server) {
}

type EvtReceivedPushLog struct {
Peer peer.ID
ByPeer peer.ID
FromPeer peer.ID
}

type EvtPubSub struct {
Expand Down
7 changes: 6 additions & 1 deletion net/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,13 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL
}

if s.pushLogEmitter != nil {
byPeer, err := libpeer.Decode(req.Body.Creator)
if err != nil {
log.Info(ctx, "could not decode the peer id of the log creator", logging.NewKV("Error", err.Error()))
}
err = s.pushLogEmitter.Emit(EvtReceivedPushLog{
Peer: pid,
FromPeer: pid,
ByPeer: byPeer,
})
if err != nil {
// logging instead of returning an error because the event bus should
Expand Down
31 changes: 27 additions & 4 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,16 +311,39 @@ func (n *Node) WaitForPubSubEvent(id peer.ID) error {
}
}

// WaitForPushLogEvent listens to the event channel for a push log event from a given peer.
// WaitForPushLogByPeerEvent listens to the event channel for a push log event by a given peer.
//
// By refers to the log creator. It can be different than the log sender.
//
// It will block the calling thread until an event is yielded to an internal channel. This
// event is not nessecarily the next event and is dependent on the number of concurrent callers
// event is not necessarily the next event and is dependent on the number of concurrent callers
// (each event will only notify a single caller, not all of them).
func (n *Node) WaitForPushLogEvent(id peer.ID) error {
func (n *Node) WaitForPushLogByPeerEvent(id peer.ID) error {
for {
select {
case evt := <-n.pushLogEvent:
if evt.Peer != id {
if evt.ByPeer != id {
continue
}
return nil
case <-time.After(evtWaitTimeout):
return errors.New("waiting for pushlog timed out")
}
}
}

// WaitForPushLogFromPeerEvent listens to the event channel for a push log event from a given peer.
//
// From refers to the log sender. It can be different that the log creator.
//
// It will block the calling thread until an event is yielded to an internal channel. This
// event is not necessarily the next event and is dependent on the number of concurrent callers
// (each event will only notify a single caller, not all of them).
func (n *Node) WaitForPushLogFromPeerEvent(id peer.ID) error {
for {
select {
case evt := <-n.pushLogEvent:
if evt.FromPeer != id {
continue
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/net/order/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func executeTestCase(t *testing.T, test P2PTestCase) {
continue
}
log.Info(ctx, fmt.Sprintf("Waiting for node %d to sync with peer %d", n2, n))
err := p.WaitForPushLogEvent(nodes[n].PeerID())
err := p.WaitForPushLogByPeerEvent(nodes[n].PeerID())
require.NoError(t, err)
log.Info(ctx, fmt.Sprintf("Node %d synced", n2))
}
Expand Down Expand Up @@ -321,7 +321,7 @@ func executeTestCase(t *testing.T, test P2PTestCase) {
}
for _, rep := range reps {
log.Info(ctx, fmt.Sprintf("Waiting for node %d to sync with peer %d", rep, n))
err := nodes[rep].WaitForPushLogEvent(nodes[n].PeerID())
err := nodes[rep].WaitForPushLogByPeerEvent(nodes[n].PeerID())
require.NoError(t, err)
log.Info(ctx, fmt.Sprintf("Node %d synced", rep))

Expand Down
2 changes: 1 addition & 1 deletion tests/integration/net/state/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ func waitForNodesToSync(
wg *sync.WaitGroup,
) {
log.Info(ctx, fmt.Sprintf("Waiting for node %d to sync with peer %d", targetIndex, sourceIndex))
err := nodes[targetIndex].WaitForPushLogEvent(nodes[sourceIndex].PeerID())
err := nodes[targetIndex].WaitForPushLogByPeerEvent(nodes[sourceIndex].PeerID())
// This must be an assert and not a require, a panic here will block the test as
// the wait group will never complete.
assert.NoError(t, err)
Expand Down

0 comments on commit 66675f2

Please sign in to comment.