Skip to content

Commit

Permalink
If changelog topic partitions are only listed in the actives_tps set … (
Browse files Browse the repository at this point in the history
#325)

* If changelog topic partitions are only listed in the actives_tps set and not in standbys, then those partitions need to be resumed to ensure GlobalTable stays in sync for all workers

* fix recovery linting

* pass flake8 test

Co-authored-by: lmetzger <[email protected]>
Co-authored-by: Vikram Patki <[email protected]>
  • Loading branch information
3 people authored Jul 19, 2022
1 parent 1e9d4a5 commit c4f5b18
Showing 1 changed file with 8 additions and 0 deletions.
8 changes: 8 additions & 0 deletions faust/tables/recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,14 @@ async def _restart_recovery(self) -> None:
T(self.app.flow_control.resume)()
T(consumer.resume_flow)()
self._set_recovery_ended()

# The changelog partitions only in the active_tps set need to be resumed
active_only_partitions = active_tps - standby_tps
if active_only_partitions:
T(consumer.resume_partitions)(active_only_partitions)
T(self.app.flow_control.resume)()
T(consumer.resume_flow)()

self.log.info("Recovery complete")
if span:
span.set_tag("Recovery-Completed", True)
Expand Down

0 comments on commit c4f5b18

Please sign in to comment.