ENG-700 #377
base: main
Conversation
ekalosak commented Dec 14, 2021
- made the servo issue a startup event when a run gets cancelled;
- made the PrometheusConnector.startup() idempotent (a minimal sketch of the idempotency guard follows below)
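For the second bullet, here is a minimal sketch of what an idempotent startup guard can look like. The class and method names below are hypothetical illustrations, not the actual PrometheusConnector code:

```python
import asyncio
from typing import Optional


class MetricsConnectorSketch:
    """Hypothetical connector used only to illustrate the idempotency guard."""

    def __init__(self) -> None:
        self._started = False
        self._observer_task: Optional[asyncio.Task] = None

    async def startup(self) -> None:
        # Guard: a repeated startup dispatch (e.g. after a cancelled run
        # triggers another startup event) becomes a no-op instead of spawning
        # a second background observer.
        if self._started:
            return
        self._started = True
        self._observer_task = asyncio.create_task(self._observe())

    async def _observe(self) -> None:
        while True:
            await asyncio.sleep(60)  # placeholder for periodic metrics checks
```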
…PrometheusConnector.startup() idempotent
ENG-700 Don't hang on cancellation
This issue is mainly about the HPA+'s ServoX Connector failing to recover after the servo processes a cancellation. However, because debugging this issue requires generating Abort failures, replication instructions are included below.
Replication instructions
Objective: Don't hang on cleaning up HPA+ from the servo when a Cancellation event occurs.
Follow-on issues
…ould have multiple sources
…line developer docs
… servo attribute is not typed as Optional
…opsani/servox into ek/issue-startup-after-run-cancellation
```python
def run_main_loop(self) -> None:
    if self._main_loop_task:
        self._main_loop_task.cancel()

    loop = asyncio.get_event_loop()
    loop.create_task(self.servo.dispatch_event(servo.Events.startup))
```
Suggested change:

```python
async def run_main_loop(self) -> None:
    if self._main_loop_task:
        self._main_loop_task.cancel()

    await self.servo.startup()
    self.logger.info(
        f"Servo started with {len(self.servo.connectors)} active connectors [{self.optimizer.id} @ {self.optimizer.url or self.optimizer.base_url}]"
    )
```
I agree we should update the startup/shutdown lifecycle given the implementation of cancel responses. However, we need to update a few more places for the sake of completeness:
- We're moving this startup into run_main_loop() (delete the old one): Line 257 in 6ce0cbc: await self.servo.startup()
- await run_main_loop now that it's async:
- shut down the servo during cancellation handling (a hedged sketch follows the excerpts below):
Lines 373 to 404 in 6ce0cbc
```python
if isinstance(error, (servo.errors.UnexpectedEventError, servo.errors.EventCancelledError)):
    if isinstance(error, servo.errors.UnexpectedEventError):
        self.logger.error(
            "servo has lost synchronization with the optimizer: restarting"
        )
    elif isinstance(error, servo.errors.EventCancelledError):
        self.logger.error(
            "optimizer has cancelled operation in progress: cancelling and restarting loop"
        )

        # Post a status to resolve the operation
        operation = progress['operation']
        status = servo.api.Status.from_error(error)
        self.logger.error(f"Responding with {status.dict()}")
        runner = self._runner_for_servo(servo.current_servo())
        await runner._post_event(operation, status.dict())

    tasks = [
        t for t in asyncio.all_tasks() if t is not asyncio.current_task()
    ]
    self.logger.info(f"Cancelling {len(tasks)} outstanding tasks")
    [self.logger.trace(f"\t{task.get_name()}") for task in tasks]
    [task.cancel() for task in tasks]

    await asyncio.gather(*tasks, return_exceptions=True)
    self.logger.trace("Cancelled tasks:")
    [self.logger.trace(f"\t{task.get_name()} {'cancelled' if task.cancelled() else 'not cancelled'}") for task in tasks]

    # Restart a fresh main loop
    if poll:
        runner = self._runner_for_servo(servo.current_servo())
        runner.run_main_loop()
```

Lines 518 to 527 in 6ce0cbc
```python
# Shut down the servo runners, breaking active control loops
if len(self.runners) == 1:
    self.logger.info(f"Shutting down servo...")
else:
    self.logger.info(f"Shutting down {len(self.runners)} running servos...")

for fut in asyncio.as_completed(list(map(lambda r: r.shutdown(reason=reason), self.runners)), timeout=30.0):
    try:
        await fut
    except Exception as error:
        self.logger.critical(f"Failed servo runner shutdown with error: {error}")
```
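To make the third item concrete, here is a rough, non-authoritative sketch of shutting the servo down inside the cancellation branch before restarting the loop. The runner.servo attribute, the Servo.shutdown() lifecycle call, and the awaited run_main_loop() are assumptions drawn from this thread, not verified against assembly_runner.py:

```python
import servo  # namespace as used in the excerpts above


async def _handle_event_cancelled(self, error, progress, poll: bool) -> None:
    """Sketch of a method body: cancellation handling with an explicit servo shutdown."""
    self.logger.error(
        "optimizer has cancelled operation in progress: cancelling and restarting loop"
    )

    # Resolve the in-flight operation with the optimizer, as in the excerpt above.
    operation = progress["operation"]
    status = servo.api.Status.from_error(error)
    runner = self._runner_for_servo(servo.current_servo())
    await runner._post_event(operation, status.dict())

    # Proposed addition: tear the servo down so connectors are stopped cleanly,
    # instead of each connector guarding against a duplicate startup.
    await runner.servo.shutdown()  # assumed lifecycle call

    # Restart a fresh main loop; per the suggestion above this is now a coroutine.
    if poll:
        await runner.run_main_loop()
```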
What are we doing with this? Seems pretty reasonable... except I don't love that each connector has to be aware of being potentially restarted and guard against it. A lifecycle teardown of the channel would keep it more straightforward, or even a blanket teardown of all channels. It shouldn't be the connector's responsibility to handle this state; it would have to be replicated everywhere.
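For what it's worth, a rough sketch of the "blanket teardown" idea, with a hypothetical exchange owning the channels; the names here are illustrative, not the servox pubsub API:

```python
import asyncio
from typing import Dict


class Channel:
    """Hypothetical named channel that can be closed centrally."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.queue: asyncio.Queue = asyncio.Queue()
        self.closed = False

    def close(self) -> None:
        self.closed = True


class Exchange:
    """Hypothetical owner of all channels.

    Closing every channel here (e.g. from the servo's shutdown path) keeps the
    restart bookkeeping out of the individual connectors.
    """

    def __init__(self) -> None:
        self._channels: Dict[str, Channel] = {}

    def create_channel(self, name: str) -> Channel:
        return self._channels.setdefault(name, Channel(name))

    def shutdown(self) -> None:
        # Blanket teardown: close channels in one place so no connector has to
        # track whether it is being restarted.
        for channel in self._channels.values():
            channel.close()
        self._channels.clear()
```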
Looking into the relevant internal project management, it would seem @ekalosak reached a similar conclusion: this should be handled externally. I'm still looking into whether it has an impact on the new startup behavior for the k8s connector.