Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Embedded Ansible role worker #13551

Merged
merged 3 commits into from
Jan 31, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions app/models/embedded_ansible_worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
class EmbeddedAnsibleWorker < MiqWorker
require_nested :Runner

self.required_roles = ['embedded_ansible']

def start_runner
self.class::Runner.start_worker(worker_options)
# TODO: return supervisord pid
end

def kill
# Does the base class's kill -9 work on the supervisord process as we want?
end

def status_update
# don't monitor the memory/cpu usage of this process yet
# If we don't have a pid of a process we want to monitor,super will catch an Errno::ESRCH and abort the worker
end

# Base class methods we override since we don't have a separate process. We might want to make these opt-in features in the base class that this subclass can choose to opt-out.
def release_db_connection; end
end
77 changes: 77 additions & 0 deletions app/models/embedded_ansible_worker/runner.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
class EmbeddedAnsibleWorker::Runner < MiqWorker::Runner
def prepare
update_embedded_ansible_manager

Thread.new do
setup_ansible
started_worker_record
end

self
end

# This thread runs forever until a stop request is received, which with send us to do_exit to exit our thread
def do_work_loop
Thread.new do
_log.info("waiting for ansible to start...")
loop do
# handle if the ansible setup blew up or timed out
break if worker.reload.started?
heartbeat
send(poll_method)
end

_log.info("entering ansible monitor loop")
loop do
heartbeat
do_work
send(poll_method)
end
end
end

def setup_ansible
_log.info("calling EmbeddedAnsible.configure")
EmbeddedAnsible.configure unless EmbeddedAnsible.configured?

_log.info("calling EmbeddedAnsible.start")
EmbeddedAnsible.start
_log.info("calling EmbeddedAnsible.start finished")
end

def do_work
if EmbeddedAnsible.running?
_log.info("#{log_prefix} supervisord is ok!")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOTE, drop this line before merging... it's helpful during testing...

else
_log.warn("#{log_prefix} supervisord is not running, restarting!")
EmbeddedAnsible.start
end
end

# Because we're running in a thread on the Server
# we need to intercept SystemExit and exit our thread,
# not the main server thread!
def do_exit(*args)
# ensure this doesn't fail or that we can still get to the super call
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this work?

ensure
  super
end

EmbeddedAnsible.disable
super
rescue SystemExit
_log.info("#{log_prefix} SystemExit received, exiting monitoring Thread")
Thread.exit
end

def update_embedded_ansible_manager
ansible = ManageIQ::Providers::EmbeddedAnsible::AutomationManager.first_or_initialize
server = MiqServer.my_server(true)
ansible.default_endpoint.url = URI::HTTPS.build(:host => server.hostname, :path => "/ansibleapi/v1")
ansible.name = "Embedded Ansible"
ansible.zone = server.zone
ansible.save!
end

# Base class methods we override since we don't have a separate process. We might want to make these opt-in features in the base class that this subclass can choose to opt-out.
def set_process_title; end
def set_connection_pool_size; end
def message_sync_active_roles(*_args); end
def message_sync_config(*_args); end
end
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ module MiqServer::WorkerManagement::Monitor::ClassNames
ManageIQ::Providers::StorageManager::CinderManager::EventCatcher
ManageIQ::Providers::Vmware::InfraManager::EventCatcher
ManageIQ::Providers::Vmware::CloudManager::EventCatcher
EmbeddedAnsibleWorker
MiqEventHandler
MiqGenericWorker
MiqNetappRefreshWorker
Expand Down Expand Up @@ -86,6 +87,7 @@ module MiqServer::WorkerManagement::Monitor::ClassNames
ManageIQ::Providers::Openstack::CloudManager::MetricsCollectorWorker
ManageIQ::Providers::Openstack::NetworkManager::MetricsCollectorWorker
ManageIQ::Providers::Openstack::InfraManager::MetricsCollectorWorker
EmbeddedAnsibleWorker
MiqReportingWorker
MiqSmartProxyWorker
MiqGenericWorker
Expand Down
12 changes: 10 additions & 2 deletions app/models/miq_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ def self.close_pg_sockets_inherited_from_parent
end
end

def start
def start_runner
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i extracted the fork/process creation to a method so the subclass can override and NOT create a fork/process

self.class.before_fork
pid = fork(:cow_friendly => true) do
self.class.after_fork
Expand All @@ -345,7 +345,11 @@ def start
end

Process.detach(pid)
self.pid = pid
pid
end

def start
self.pid = start_runner
save

msg = "Worker started: ID [#{id}], PID [#{pid}], GUID [#{guid}]"
Expand Down Expand Up @@ -395,6 +399,10 @@ def is_stopped?
STATUSES_STOPPED.include?(status)
end

def started?
STATUS_STARTED == status
end

def actually_running?
MiqProcess.is_worker?(pid)
end
Expand Down
3 changes: 3 additions & 0 deletions config/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,9 @@
:poll_method: :normal
:restart_interval: 0.hours
:starting_timeout: 10.minutes
:embedded_ansible_worker:
:poll: 10.seconds
:memory_threshold: 0.megabytes
:ems_refresh_core_worker:
:poll: 1.seconds
:nice_delta: 1
Expand Down
1 change: 1 addition & 0 deletions db/fixtures/server_roles.csv
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ name,description,max_concurrent,external_failover,role_scope
automate,Automation Engine,0,false,region
database_operations,Database Operations,0,false,region
database_owner,Database Owner,1,false,database
embedded_ansible,Embedded Ansible,1,false,region
ems_inventory,Provider Inventory,1,false,zone
ems_metrics_collector,Capacity & Utilization Data Collector,0,false,zone
ems_metrics_coordinator,Capacity & Utilization Coordinator,1,false,zone
Expand Down
4 changes: 4 additions & 0 deletions spec/factories/miq_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@
factory :ems_refresh_worker_amazon,
:parent => :miq_ems_refresh_worker,
:class => "ManageIQ::Providers::Amazon::CloudManager::RefreshWorker"

factory :embedded_ansible_worker,
:parent => :miq_worker,
:class => "EmbeddedAnsibleWorker"
end
40 changes: 40 additions & 0 deletions spec/models/embedded_ansible_worker/runner_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
describe EmbeddedAnsibleWorker::Runner do
context ".new" do
let(:miq_server) {
s = EvmSpecHelper.create_guid_miq_server_zone[1]
s.update(:hostname => "fancyserver")
s
}
let(:worker_guid) { MiqUUID.new_guid }
let(:worker) { FactoryGirl.create(:embedded_ansible_worker, :guid => worker_guid, :miq_server_id => miq_server.id) }
let(:runner) {
worker
allow_any_instance_of(described_class).to receive(:worker_initialization)
described_class.new(:guid => worker_guid)
}

context "#update_embedded_ansible_manager" do
it "creates initial" do
runner.update_embedded_ansible_manager

ansible = ManageIQ::Providers::EmbeddedAnsible::AutomationManager.first
expect(ansible.zone).to eq(miq_server.zone)
expect(ansible.default_endpoint.url).to eq("https://fancyserver/ansibleapi/v1")
end

it "updates existing" do
runner.update_embedded_ansible_manager
new_zone = FactoryGirl.create(:zone)
miq_server.update(:hostname => "boringserver", :zone => new_zone)

runner.update_embedded_ansible_manager
expect(ManageIQ::Providers::EmbeddedAnsible::AutomationManager.count).to eq(1)

ansible = ManageIQ::Providers::EmbeddedAnsible::AutomationManager.first
expect(ansible.zone).to eq(new_zone)
expect(ansible.default_endpoint.url).to eq("https://boringserver/ansibleapi/v1")
end
end
end
end