Add event driven workflows runner #102
Conversation
```ruby
def runner
  @runner ||= new.tap(&:start)
end
```
NOTE: this ensures that this will work from the AutomationWorker as well as in development using simulate_queue_worker.
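For context, a minimal sketch of how this class-level memoization might sit inside the Runner (the surrounding class body here is an assumption, not the actual file):

```ruby
module ManageIQ
  module Providers
    module Workflows
      class Runner
        class << self
          # Lazily build and start one shared runner; the same instance is
          # reused whether the caller is the AutomationWorker or a
          # developer session using simulate_queue_worker.
          def runner
            @runner ||= new.tap(&:start)
          end
        end

        def start
          # Assumed: kick off the background thread that waits on Floe events.
          @thread ||= Thread.new { run }
        end

        private

        # Stub for illustration; the real loop blocks on Floe events.
        def run; end
      end
    end
  end
end
```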
```ruby
return if workflows.key?(workflow.id)

workflows[workflow.id] = [workflow, queue_args]
```
Does workflows[] ||= do the same thing with concurrent hashes?
Yeah, ||= works; I can switch to that.
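As a side note on that exchange, here is a hedged sketch of the two forms (the Workflow struct and queue_args hash are stand-ins for the real objects, and the atomicity caveat is noted in the comments):

```ruby
require "concurrent"

Workflow   = Struct.new(:id)        # stand-in for the real workflow record
workflow   = Workflow.new(42)
queue_args = {:zone => "default"}   # stand-in for the real queue options

workflows = Concurrent::Hash.new

# Explicit guard, as in the diff:
workflows[workflow.id] = [workflow, queue_args] unless workflows.key?(workflow.id)

# Equivalent shorthand discussed above. Note that ||= is still a separate
# read followed by a write, so it is only as safe as the hash's individual
# operations. For a true atomic check-and-set, Concurrent::Map offers
# compute_if_absent, which runs the block only when the key is missing:
workflows[workflow.id] ||= [workflow, queue_args]

map = Concurrent::Map.new
map.compute_if_absent(workflow.id) { [workflow, queue_args] }
```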
Resolved review thread: app/models/manageiq/providers/workflows/automation_manager/workflow_instance.rb
Checked commit agrare@7a0e8d8 with ruby 3.1.5, rubocop 1.56.3, haml-lint 0.51.0, and yamllint
I was expecting this file to live on the floe side (or at least everything except the runner method itself - that part I expected to be passed in).
That is, I expected some sort of generic queueing / pulling from a queue / checking strategy in floe, and then the specifics about how to queue something would come from manageiq-providers-workflows via callbacks or lambdas or something.
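To illustrate the shape being proposed (every constant and keyword argument here is hypothetical; Floe exposed no such API in this PR):

```ruby
# Hypothetical only: a generic event loop living in Floe, with the
# provider-specific pieces injected as lambdas by
# manageiq-providers-workflows.
event_loop = Floe::EventLoop.new(
  :find_workflow => ->(event) { lookup_configuration_script(event) },
  :enqueue_step  => ->(workflow, queue_args) { MiqQueue.put(queue_args) }
)
event_loop.start # Floe owns the thread; the provider only supplies callbacks
```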
So aside from the boilerplate thread management, almost all of what is here is dealing with ConfigurationScript records and MiqQueue. We could move more into Floe, but we'd have to have callbacks for almost everything in the docker_wait method (currently it is basically just docker_runner.wait, find the ConfigurationScript record, miq_queue.put).
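Put another way, the current provider-side shape described in that comment is roughly (method names and the exact wait signature are assumptions):

```ruby
# Rough paraphrase of the docker_wait shape described above: block on the
# Docker runner for events, map each event back to a ConfigurationScript
# record, then queue the next step for it via MiqQueue.
def docker_wait
  docker_runner.wait do |event| # assumed: yields one event per container update
    workflow = find_configuration_script(event) # assumed lookup helper
    queue_step(workflow) if workflow            # assumed MiqQueue.put wrapper
  end
end
```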
Resolved review thread: app/models/manageiq/providers/workflows/automation_manager/workflow_instance.rb
```ruby
if wf.end?
  ManageIQ::Providers::Workflows::Runner.runner.delete_workflow(self)
else
  deliver_on = wf.wait_until || 1.minute.from_now.utc
```
yay
The whole multiple-events-and-keying-off-of-workflow approach has me a little confused. I feel the purpose of this is to react to an event from Docker. If you run 3 tasks for a workflow, then you get 3 "I am done" events out and kick the same workflow accordingly. I feel the entry should go into this cache when we kick off a task. Maybe this is where my idea of keying off of a container_ref comes in.
offline: agrare doesn't want to store container_ref in the cache; it is too volatile, and he would prefer to store a workflow.id or something that is more stable
Backported to
Add event driven workflows runner (cherry picked from commit 572047c)
Add a Workflows::Runner class which keeps a persistent thread waiting for events from Floe. Once an event is raised, we try to link the event up to a workflow, and we queue a step action if we find one.
If a workflow isn't found, for example if it is already completed and removed from the list, then we simply continue without queueing anything.
It is common to get multiple events for a single container run, so this isn't a perfect "queue step when ready" and could be made more efficient, but currently this errs on the side of caution (check step_ready on every event).
Also, if the worker goes down and is restarted for any reason, the typical timer-based step is still on the queue, and the running workflow will be added back to the runner on the next timed check.
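Condensing that description into code, the loop might look something like this (everything beyond the step_ready check and the workflow cache is an assumption about names and signatures):

```ruby
require "concurrent"

# Sketch of the Runner event loop described above: one long-lived thread
# blocks on events from Floe; each event is matched against the cache of
# running workflows and, if one is found, a step check is queued.
class Runner
  def initialize
    @workflows = Concurrent::Hash.new # workflow.id => [workflow, queue_args]
  end

  def start
    @thread ||= Thread.new do
      floe_event_stream.each do |event| # assumed: however Floe surfaces events
        workflow, queue_args = find_workflow(event) # assumed lookup
        next if workflow.nil? # already completed and removed from the cache

        # Erring on the side of caution: queue a step_ready check on every
        # event rather than trying to dedupe the multiple events a single
        # container run can emit.
        workflow.queue_step(queue_args) # assumed MiqQueue.put wrapper
      end
    end
  end
end
```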
Depends on:
Fixes #109