Skip to content

Commit

Permalink
Add Workflows::Runner which uses docker events
Browse files Browse the repository at this point in the history
  • Loading branch information
agrare committed Aug 15, 2024
1 parent 3ab58b2 commit 36f4e3d
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,15 @@ def queue_callback(state, message, _result)
end

def run(args = {})
zone, role, object_type, object_id = args.values_at(:zone, :role, :object_type, :object_id)
queue_args = args.slice(:zone, :role, :object_type, :object_id)
zone, role, object_type, object_id = queue_args.values_at(:zone, :role, :object_type, :object_id)

ManageIQ::Providers::Workflows::Runner.runner.add_workflow(self, queue_args)

object = object_type.constantize.find_by(:id => object_id) if object_type && object_id
object.before_ae_starts({}) if object.present? && object.respond_to?(:before_ae_starts)

creds = resolved_credentials
wf = Floe::Workflow.new(payload, context, creds)
wf = Floe::Workflow.new(payload, context, resolved_credentials)
wf.run_nonblock
update_credentials!(wf.credentials)

Expand All @@ -76,7 +78,12 @@ def run(args = {})
object.after_ae_delivery(ae_result)
end

run_queue(:zone => zone, :role => role, :object => object, :deliver_on => 10.seconds.from_now.utc, :server_guid => MiqServer.my_server.guid) unless wf.end?
if wf.end?
ManageIQ::Providers::Workflows::Runner.runner.delete_workflow(self)
else
deliver_on = wf.wait_until || 1.minute.from_now.utc
run_queue(:zone => zone, :role => role, :object => object, :deliver_on => deliver_on, :server_guid => MiqServer.my_server.guid)
end
end

private
Expand Down
4 changes: 4 additions & 0 deletions lib/manageiq/providers/workflows/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ def self.seedable_classes
]
end

def self.automation_runners
[ManageIQ::Providers::Workflows::Runner]
end

def self.floe_runner_name
if (runner_setting = Settings.ems.ems_workflows.runner.presence)
runner_setting
Expand Down
91 changes: 91 additions & 0 deletions lib/manageiq/providers/workflows/runner.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
module ManageIQ
module Providers
module Workflows
class Runner
include Vmdb::Logging

class << self
def runner
@runner ||= new.tap(&:start)
end
end

attr_reader :workflows

def initialize
require "floe"
require "concurrent/hash"

@workflows = Concurrent::Hash.new
@docker_wait_thread = nil
end

def start
$workflows_log.debug("Runner: Starting workflows runner...")
self.docker_wait_thread = Thread.new { docker_wait }
$workflows_log.debug("Runner: Starting workflows runner...Complete")
end

def stop
$workflows_log.debug("Runner: Stopping workflows runner...")
stop_thread(docker_wait_thread)

self.docker_wait_thread = nil
$workflows_log.debug("Runner: Stopping workflows runner...Complete")
end

def add_workflow(workflow, queue_args)
return if workflows.key?(workflow.id)

workflows[workflow.id] = [workflow, queue_args]
end

def delete_workflow(workflow)
workflows.delete(workflow.id)
end

private

attr_accessor :docker_wait_thread

def docker_wait
loop do
docker_runner = Floe::Runner.for_resource("docker")
docker_runner.wait do |event, runner_context|
$workflows_log.info("Runner: Caught event [#{event}] for container [#{runner_context["container_ref"]}]")

workflow_id = workflow_by_runner_context(runner_context)
next if workflow_id.nil?

workflow, queue_args = workflows[workflow_id]
next if workflow.nil?

$workflows_log.info("Runner: Queueing update for WorkflowInstance ID: [#{workflow_id}]")

workflow.run_queue(**queue_args)
end
rescue => err
$workflows_log.warn("Error: [#{err}]")
$workflows_log.log_backtrace(err)
end
end

def stop_thread(thread)
return if thread.nil?

thread.kill
thread.join(0)
end

def workflow_by_runner_context(runner_context)
workflows.each do |id, (workflow, _)|
context = workflow.reload.context
container_ref = context.dig("State", "RunnerContext", "container_ref")

return id if container_ref == runner_context["container_ref"]
end
end
end
end
end
end
32 changes: 32 additions & 0 deletions spec/lib/manageiq/providers/workflows/runner_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
RSpec.describe ManageIQ::Providers::Workflows::Runner do
require "floe"

let(:subject) { described_class.new }
let(:workflow) { FactoryBot.create(:workflows_automation_workflow_instance) }
let(:queue_args) { {:role => "automate"} }

describe ".add_workflow" do
it "adds the workflow to the workflows hash" do
subject.add_workflow(workflow, queue_args)
expect(subject.workflows.count).to eq(1)
expect(subject.workflows[workflow.id]).to eq([workflow, queue_args])
end
end

describe ".delete_workflow" do
context "with nothing in #workflows" do
it "doesn't throw an exception" do
expect(subject.delete_workflow(workflow)).to be_nil
end
end

context "with a workflow in #workflows" do
before { subject.add_workflow(workflow, queue_args) }

it "deletes the workflow from #workflows" do
subject.delete_workflow(workflow)
expect(subject.workflows).to be_empty
end
end
end
end

0 comments on commit 36f4e3d

Please sign in to comment.