Skip to content

Commit

Permalink
add completed events for pubsub and replicator
Browse files Browse the repository at this point in the history
  • Loading branch information
fredcarle committed Jun 25, 2024
1 parent e54c1ac commit e2af1e9
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 5 deletions.
4 changes: 4 additions & 0 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ const (
PeerInfoName = Name("peer-info")
// ReplicatorName is the name of the replicator event.
ReplicatorName = Name("replicator")
// P2PTopicCompletedName is the name of the network p2p topic update completed event.
P2PTopicCompletedName = Name("p2p-topic-completed")
// ReplicatorCompletedName is the name of the replicator completed event.
ReplicatorCompletedName = Name("replicator-completed")
)

// Peer is an event that is published when
Expand Down
2 changes: 2 additions & 0 deletions net/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ func (s *server) updatePubSubTopics(evt event.P2PTopic) {
log.ErrorContextE(s.peer.ctx, "Failed to remove pubsub topic.", err)
}
}
s.peer.bus.Publish(event.NewMessage(event.P2PTopicCompletedName, nil))
}

func (s *server) updateReplicators(evt event.Replicator) {
Expand Down Expand Up @@ -368,4 +369,5 @@ func (s *server) updateReplicators(evt event.Replicator) {
}
}
}
s.peer.bus.Publish(event.NewMessage(event.ReplicatorCompletedName, nil))
}
42 changes: 37 additions & 5 deletions tests/integration/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,9 @@ func configureReplicator(
sourceNode := s.nodes[cfg.SourceNodeID]
targetNode := s.nodes[cfg.TargetNodeID]

err := sourceNode.SetReplicator(s.ctx, client.Replicator{
sub, err := sourceNode.Events().Subscribe(event.ReplicatorCompletedName)
require.NoError(s.t, err)
err = sourceNode.SetReplicator(s.ctx, client.Replicator{
Info: targetNode.PeerInfo(),
})

Expand All @@ -297,6 +299,11 @@ func configureReplicator(
if err == nil {
setupReplicatorWaitSync(s, 0, cfg)
}
for msg := range sub.Message() {
if msg.Name == event.ReplicatorCompletedName {
break
}
}
}

func deleteReplicator(
Expand All @@ -306,10 +313,17 @@ func deleteReplicator(
sourceNode := s.nodes[cfg.SourceNodeID]
targetNode := s.nodes[cfg.TargetNodeID]

err := sourceNode.DeleteReplicator(s.ctx, client.Replicator{
sub, err := sourceNode.Events().Subscribe(event.ReplicatorCompletedName)
require.NoError(s.t, err)
err = sourceNode.DeleteReplicator(s.ctx, client.Replicator{
Info: targetNode.PeerInfo(),
})
require.NoError(s.t, err)
for msg := range sub.Message() {
if msg.Name == event.ReplicatorCompletedName {
break
}
}
}

func setupReplicatorWaitSync(
Expand Down Expand Up @@ -390,14 +404,23 @@ func subscribeToCollection(
schemaRoots = append(schemaRoots, col.SchemaRoot())
}

err := n.AddP2PCollections(s.ctx, schemaRoots)
sub, err := n.Events().Subscribe(event.P2PTopicCompletedName)
require.NoError(s.t, err)

err = n.AddP2PCollections(s.ctx, schemaRoots)
expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, action.ExpectedError)
assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised)

for msg := range sub.Message() {
if msg.Name == event.P2PTopicCompletedName {
break
}
}

// The `n.Peer.AddP2PCollections(colIDs)` call above is calling some asynchronous functions
// for the pubsub subscription and those functions can take a bit of time to complete,
// we need to make sure this has finished before progressing.
time.Sleep(100 * time.Millisecond)
time.Sleep(1 * time.Millisecond)
}

// unsubscribeToCollection removes the given collections from subscriptions on the given nodes.
Expand All @@ -420,10 +443,19 @@ func unsubscribeToCollection(
schemaRoots = append(schemaRoots, col.SchemaRoot())
}

err := n.RemoveP2PCollections(s.ctx, schemaRoots)
sub, err := n.Events().Subscribe(event.P2PTopicCompletedName)
require.NoError(s.t, err)

err = n.RemoveP2PCollections(s.ctx, schemaRoots)
expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, action.ExpectedError)
assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised)

for msg := range sub.Message() {
if msg.Name == event.P2PTopicCompletedName {
break
}
}

// The `n.Peer.RemoveP2PCollections(colIDs)` call above is calling some asynchronous functions
// for the pubsub subscription and those functions can take a bit of time to complete,
// we need to make sure this has finished before progressing.
Expand Down

0 comments on commit e2af1e9

Please sign in to comment.