diff --git a/faust/tables/recovery.py b/faust/tables/recovery.py index f54afbca8..1adbcf4b5 100644 --- a/faust/tables/recovery.py +++ b/faust/tables/recovery.py @@ -470,6 +470,14 @@ async def _restart_recovery(self) -> None: T(self.app.flow_control.resume)() T(consumer.resume_flow)() self._set_recovery_ended() + + # The changelog partitions only in the active_tps set need to be resumed + active_only_partitions = active_tps - standby_tps + if active_only_partitions: + T(consumer.resume_partitions)(active_only_partitions) + T(self.app.flow_control.resume)() + T(consumer.resume_flow)() + self.log.info("Recovery complete") if span: span.set_tag("Recovery-Completed", True)