From 5587d8ee54e45c2ca979027c9c9d7beaa26146bc Mon Sep 17 00:00:00 2001 From: lmetzger Date: Sun, 1 May 2022 11:55:12 -0400 Subject: [PATCH 1/3] If changelog topic partitions are only listed in the actives_tps set and not in standbys, then those partitions need to be resumed to ensure GlobalTable stays in sync for all workers --- faust/tables/recovery.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/faust/tables/recovery.py b/faust/tables/recovery.py index 8e5ce888b..93dc1671c 100644 --- a/faust/tables/recovery.py +++ b/faust/tables/recovery.py @@ -470,6 +470,14 @@ async def _restart_recovery(self) -> None: T(self.app.flow_control.resume)() T(consumer.resume_flow)() self._set_recovery_ended() + + # The changelog partitions are only in the active_tps set need to be resumed + active_only_partitions = (active_tps - standby_tps) + if active_only_partitions: + T(consumer.resume_partitions)(active_only_partitions) + T(self.app.flow_control.resume)() + T(consumer.resume_flow)() + self.log.info("Recovery complete") if span: span.set_tag("Recovery-Completed", True) From 13fe389811ef74f8b6014169de09722f1841674a Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 15 Jul 2022 13:40:12 -0400 Subject: [PATCH 2/3] fix recovery linting --- faust/tables/recovery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/faust/tables/recovery.py b/faust/tables/recovery.py index da78fcb47..37482d653 100644 --- a/faust/tables/recovery.py +++ b/faust/tables/recovery.py @@ -472,7 +472,7 @@ async def _restart_recovery(self) -> None: self._set_recovery_ended() # The changelog partitions are only in the active_tps set need to be resumed - active_only_partitions = (active_tps - standby_tps) + active_only_partitions = active_tps - standby_tps if active_only_partitions: T(consumer.resume_partitions)(active_only_partitions) T(self.app.flow_control.resume)() From 156b6364c1ecfb557250d4829de8a92857c2c9a7 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Mon, 18 Jul 2022 09:00:58 -0400 Subject: [PATCH 3/3] pass flake8 test --- faust/tables/recovery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/faust/tables/recovery.py b/faust/tables/recovery.py index 37482d653..1adbcf4b5 100644 --- a/faust/tables/recovery.py +++ b/faust/tables/recovery.py @@ -471,7 +471,7 @@ async def _restart_recovery(self) -> None: T(consumer.resume_flow)() self._set_recovery_ended() - # The changelog partitions are only in the active_tps set need to be resumed + # The changelog partitions only in the active_tps set need to be resumed active_only_partitions = active_tps - standby_tps if active_only_partitions: T(consumer.resume_partitions)(active_only_partitions)