Skip to content

Commit

Permalink
If changelog topic partitions are only listed in the actives_tps set …
Browse files Browse the repository at this point in the history
…and not in standbys, then those partitions need to be resumed to ensure GlobalTable stays in sync for all workers
  • Loading branch information
lorinmetzger committed May 1, 2022
1 parent f52b783 commit 5587d8e
Showing 1 changed file with 8 additions and 0 deletions.
8 changes: 8 additions & 0 deletions faust/tables/recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,14 @@ async def _restart_recovery(self) -> None:
T(self.app.flow_control.resume)()
T(consumer.resume_flow)()
self._set_recovery_ended()

# The changelog partitions are only in the active_tps set need to be resumed
active_only_partitions = (active_tps - standby_tps)
if active_only_partitions:
T(consumer.resume_partitions)(active_only_partitions)
T(self.app.flow_control.resume)()
T(consumer.resume_flow)()

self.log.info("Recovery complete")
if span:
span.set_tag("Recovery-Completed", True)
Expand Down

0 comments on commit 5587d8e

Please sign in to comment.