fix: Improve P2P event handling #1388
@@ -140,6 +140,9 @@ func connectPeers(
	nodes []*node.Node,
	addresses []string,
) chan struct{} {
	// If we have some database actions prior to connecting the peers, we want to ensure that they had time to
	// complete before we connect. Otherwise we might wrongly catch them in our wait function.
	time.Sleep(100 * time.Millisecond)
praise: Nice spot - I didn't notice this looking through your wip branch earlier :)
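The sleep guards against events from earlier actions. The complementary step named in the PR description, making sure the wait goroutines are ready before continuing with the actions, might look roughly like the sketch below. The `bus` type and `waitForEvents` function are hypothetical stand-ins and are not taken from the PR; the only point illustrated is that the listener signals readiness after it has subscribed, and the caller blocks on that signal.

```go
package main

import (
	"fmt"
	"sync"
)

// bus is a hypothetical event bus: events published while nobody is
// subscribed are simply dropped.
type bus struct {
	mu   sync.Mutex
	subs []chan string
}

// subscribe registers a new buffered subscriber channel.
func (b *bus) subscribe() <-chan string {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan string, 16)
	b.subs = append(b.subs, ch)
	return ch
}

// publish delivers evt to every current subscriber.
func (b *bus) publish(evt string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subs {
		ch <- evt
	}
}

// waitForEvents starts a listener goroutine and returns only once that
// goroutine has subscribed. The returned channel is closed after
// `expected` events have been received.
func waitForEvents(b *bus, expected int) <-chan struct{} {
	ready := make(chan struct{})
	done := make(chan struct{})
	go func() {
		events := b.subscribe()
		close(ready) // the listener is subscribed; the caller may proceed
		for i := 0; i < expected; i++ {
			<-events
		}
		close(done)
	}()
	<-ready // block until the listener is ready
	return done
}

func main() {
	b := &bus{}
	done := waitForEvents(b, 2)
	b.publish("doc created")
	b.publish("doc updated")
	<-done
	fmt.Println("all expected events received")
}
```

Without the `ready` channel, the two `publish` calls in `main` could run before the goroutine subscribes, and the wait would never complete.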
Codecov Report
@@             Coverage Diff             @@
##           develop    #1388      +/-   ##
===========================================
- Coverage    70.76%   70.65%   -0.11%
===========================================
  Files          185      185
  Lines        17884    17934      +50
===========================================
+ Hits         12655    12671      +16
- Misses        4279     4305      +26
- Partials       950      958       +8
Really great work Fred - thanks a whole bunch for sorting these out :)
Left a couple of small comments which would be good to resolve before merge.
@@ -629,10 +627,16 @@ func (p *Peer) handleDocCreateLog(evt events.Update) error {
	return errors.Wrap("failed to get DocKey from broadcast message", err)
}

// We need to register the document before pushing to the replicators if we want to
// ensure that we have subscribed to the topic.
err = p.RegisterNewDocument(p.ctx, dockey, evt.Cid, evt.Block, evt.SchemaID)
question: This change is not noted in the description, is it still needed, or is this left here accidentally?
I think it's still needed for the situation where the target receives the push to replicator and then gets an update before the source has registered to the topic. This would probably never happen in real life, but it did happen in the tests.
@@ -804,6 +808,30 @@ func (p *Peer) AddP2PCollections(collections []string) error {
	addedTopics = append(addedTopics, col)
}

// If adding the collection topics succeeds, we remove the collections' documents
// from the pubsub topics to avoid receiving duplicate events.
for _, col := range collections {
thought: This is potentially very expensive - how long do you expect this code to live? It might be worth noting this in the public documentation.
At the moment I can't think of a better way to handle this. It would only be expensive if a dev decides to subscribe to a given collection after it has millions of documents. It's also a one-off event if someone was to do it so I don't think we need to worry about speed too much. Not at this time anyways.
I can add a warning to the doc though.
	return err
}
for key := range keyChan {
	err := p.server.removePubSubTopic(key.Key.String())
suggestion: This will not be rolled back if the txn commit fails, but it should be. I do not see this as a new issue however (just expanded), as the addedTopics block also suffers from this. Strongly suggest creating a ticket to roll this back properly on commit error (might already be one, I'm not sure though).
Same issue exists in RemoveP2PCollections
Ticket opened 👍
cheers :)
if err != nil {
	log.Info(ctx, "could not decode the peer id of the log creator", logging.NewKV("Error", err.Error()))
}
err = s.pushLogEmitter.Emit(EvtReceivedPushLog{
praise: I like that this is in a defer now
@@ -307,7 +317,7 @@ func configureReplicator(
	docIDsSyncedToSource[currentdocID] = struct{}{}
}

- if action.NodeID.HasValue() && action.NodeID.Value() == cfg.SourceNodeID {
+ if !action.NodeID.HasValue() || action.NodeID.Value() == cfg.SourceNodeID {
question: Are you sure this is correct?
If !action.NodeID.HasValue() then the document will be created synchronously and locally by the test framework in all nodes. Does the P2P system still transmit sync events if they already exist on the node?
todo: If this is correct, can you please add a comment documenting this :)
> Does the P2P system still transmit sync events if they already exist on the node?
Yes. The Replicator doesn't know that the target has the document already, so it still sends the created document, which will cause the target to send a received push log event.
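To make the condition discussed in this thread concrete, here is a minimal truth-table sketch. The `option` type and the `appliesToSource` helper are hypothetical and only mimic the `HasValue()`/`Value()` calls in the diff; they are not the test framework's types. Under the reading given in the reply, an action with no NodeID runs on all nodes, the source included, so it should be treated the same as an action that targets the source node explicitly.

```go
package main

import "fmt"

// option is a minimal, hypothetical stand-in for an optional node ID.
type option struct {
	value    int
	hasValue bool
}

func some(v int) option { return option{value: v, hasValue: true} }
func none() option      { return option{} }

// appliesToSource mirrors the updated condition from the diff: an action
// with no node ID applies to every node (and therefore to the source),
// otherwise it must target the source node explicitly.
func appliesToSource(nodeID option, sourceNodeID int) bool {
	return !nodeID.hasValue || nodeID.value == sourceNodeID
}

func main() {
	const source = 0
	fmt.Println("no node ID:  ", appliesToSource(none(), source))
	fmt.Println("source node: ", appliesToSource(some(0), source))
	fmt.Println("other node:  ", appliesToSource(some(1), source))
}
```

The previous condition (`HasValue() && Value() == SourceNodeID`) would have returned false for the first case, which is the difference the reviewer asked to have documented.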
953bd35 to 93b74ce
Description
This PR seems to have fixed all known flakiness in the P2P integration tests. It does so at different levels:
- We prevent actions that run before peer connection and before replicator configuration from having their events received by the wait function, by adding a 100ms sleep call before setup.
- We ensure that the wait goroutines are ready before continuing with the actions.
- We add a docQueue to the PushLog function so that we don't handle multiple updates to the same document at once. This helps avoid unnecessary transaction conflicts. This would also sometimes cause the PushLog function to hang and the RPC call to hit a timeout.
- We handle pubsub topic subscriptions a little better by subscribing only to the collection OR the documents. Subscribing to both the documents and the collection would cause double the event handling.
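As an illustration of the docQueue idea in the description, the sketch below serialises handlers per document key. Only the name `docQueue` comes from the description; the `add`/`done` methods and the internals are assumptions made for this example and may differ from what the PR actually implements.

```go
package main

import (
	"fmt"
	"sync"
)

// docQueue serialises work per document key. The implementation is an
// illustrative assumption, not the code from the PR.
type docQueue struct {
	mu   sync.Mutex
	docs map[string]chan struct{}
}

func newDocQueue() *docQueue {
	return &docQueue{docs: make(map[string]chan struct{})}
}

// add blocks until no other caller is handling docKey, then marks the
// key as in progress.
func (q *docQueue) add(docKey string) {
	q.mu.Lock()
	done, busy := q.docs[docKey]
	if !busy {
		q.docs[docKey] = make(chan struct{})
	}
	q.mu.Unlock()
	if busy {
		<-done        // wait for the current handler to finish
		q.add(docKey) // then try again; another waiter may have won
	}
}

// done marks docKey as finished and wakes every caller waiting in add.
func (q *docQueue) done(docKey string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if ch, ok := q.docs[docKey]; ok {
		delete(q.docs, docKey)
		close(ch)
	}
}

func main() {
	q := newDocQueue()
	var wg sync.WaitGroup
	var mu sync.Mutex
	inFlight, maxInFlight := 0, 0

	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			q.add("doc-1")
			defer q.done("doc-1")

			// Stand-in for handling one pushed update to doc-1.
			mu.Lock()
			inFlight++
			if inFlight > maxInFlight {
				maxInFlight = inFlight
			}
			mu.Unlock()

			mu.Lock()
			inFlight--
			mu.Unlock()
		}()
	}
	wg.Wait()
	fmt.Println("max concurrent handlers for doc-1:", maxInFlight)
}
```

Updates to different document keys do not block each other in this sketch; only updates to the same key are queued, which is the property the description relies on to avoid transaction conflicts.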
Relevant issue(s)
Resolves #1334
Resolves #1335
Resolves #1336
Resolves #1337
Resolves #1338
Resolves #1375
Tasks
How has this been tested?
Integration test (a 6 hour run without errors)
Specify the platform(s) on which this was tested: