Skip to content

Commit

Permalink
Move vPoller Helpers from client applications to the vPoller Worker
Browse files Browse the repository at this point in the history
* Will allow helper modules to be used by different clients as the
  helper modules are no longer client-side specific
  • Loading branch information
dnaeon committed Sep 9, 2014
1 parent 0c32a03 commit ca6b0d4
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 57 deletions.
68 changes: 17 additions & 51 deletions src/vpoller-client
Original file line number Diff line number Diff line change
Expand Up @@ -34,39 +34,10 @@ discovering and polling of vSphere Object properties.

import json
import logging
import importlib
from sys import exit

from vpoller.client import VPollerClient
from docopt import docopt

def vpoller_helpers(helper, msg, data):
"""
vPoller Helpers
Dynamically loads a vPoller helper for post-processing
any result messages. Useful when you need to return a
result in other formats to be used by other tools/systems.
An example of such a helper is the Zabbix helper, which
takes care of returning result in Zabbix-friendly format.
Args:
helper (str): The helper module name
msg (dict): The original request message
data (str): The result message from the request
"""
try:
helper_module = importlib.import_module(helper)
except ImportError as e:
return 'Cannot import helper module: %s' % e

agent = helper_module.HelperAgent(msg, data)
result = agent.run()

return result

def main():

usage="""
Expand Down Expand Up @@ -108,40 +79,35 @@ Examples:
level=level
)

# List of available helpers
helpers = ['vpoller.helpers.zabbix', 'vpoller.helpers.csvhelper']

client = VPollerClient(
endpoint=args["--endpoint"],
retries=int(args["--retries"]),
timeout=int(args["--timeout"])
)

# Message we send out to workers
# Message we send out for processing
msg = {
'method': args['--method'],
'hostname': args['--vsphere-host'],
'name': args['--name'],
'username': args['--guest-username'],
'password': args['--guest-password'],
'key': args['--key'],
'method': args['--method'],
'hostname': args['--vsphere-host'],
'name': args['--name'],
'username': args['--guest-username'],
'password': args['--guest-password'],
'key': args['--key'],
'properties': args['--properties'].split(',') if args['--properties'] else None,
}
'helper': args['--helper'],
}

# Get the result and exit code
data = client.run(msg)
rc = data['success']

# Do we use a helper module?
if args['--helper'] and args['--helper'] in helpers:
result = vpoller_helpers(args['--helper'], msg, data)
if isinstance(data, dict):
result = json.dumps(
data,
indent=4,
ensure_ascii=False
)
else:
result = json.dumps(data, indent=4)

print result
result = data

exit(rc)
print result

if __name__ == '__main__':
main()

102 changes: 96 additions & 6 deletions src/vpoller/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"""

import logging
import importlib
import multiprocessing
from platform import node
from ConfigParser import ConfigParser
Expand Down Expand Up @@ -70,6 +71,7 @@ def __init__(self, config_file, num_workers=0):
'db': '/var/lib/vconnector/vconnector.db',
'mgmt': 'tcp://*:10000',
'proxy': 'tcp://localhost:10123',
'helpers': None,
}

def start(self):
Expand Down Expand Up @@ -124,6 +126,10 @@ def load_config(self):
self.config['mgmt'] = parser.get('worker', 'mgmt')
self.config['db'] = parser.get('worker', 'db')
self.config['proxy'] = parser.get('worker', 'proxy')
self.config['helpers'] = parser.get('worker', 'helpers')

if self.config['helpers']:
self.config['helpers'] = self.config['helpers'].split(',')

logging.debug(
'Worker Manager configuration: %s',
Expand All @@ -148,7 +154,8 @@ def start_workers(self):
for i in xrange(self.num_workers):
worker = VPollerWorker(
db=self.config.get('db'),
proxy=self.config.get('proxy')
proxy=self.config.get('proxy'),
helpers=self.config.get('helpers')
)
worker.daemon = True
self.workers.append(worker)
Expand Down Expand Up @@ -251,7 +258,8 @@ def status(self):
'proxy': self.config.get('proxy'),
'mgmt': self.config.get('mgmt'),
'db': self.config.get('db'),
'concurrency': len(self.workers),
'concurrency': self.num_workers,
'helpers': self.config.get('helpers'),
}
}

Expand All @@ -274,21 +282,24 @@ class VPollerWorker(multiprocessing.Process):
run() method
"""
def __init__(self, db, proxy):
def __init__(self, db, proxy, helpers):
"""
Initialize a new VPollerWorker object
Args:
db (str): Path to the vConnector database file
proxy (str): Endpoint to which vPoller Workers connect
and receive new tasks for processing
db (str): Path to the vConnector database file
proxy (str): Endpoint to which vPoller Workers connect
and receive new tasks for processing
helpers (list): A list of helper modules to be loaded
"""
super(VPollerWorker, self).__init__()
self.config = {
'db': db,
'proxy': proxy,
'helpers': helpers,
}
self.helpers = {}
self.time_to_die = multiprocessing.Event()
self.agents = {}
self.zcontext = None
Expand All @@ -305,6 +316,7 @@ def run(self):
"""
logging.info('Worker process is starting')

self.load_helpers()
self.create_sockets()
self.create_agents()

Expand Down Expand Up @@ -333,6 +345,74 @@ def signal_stop(self):
"""
self.time_to_die.set()

def load_helpers(self):
"""
Loads helper modules for post-processing of results
"""
if not self.config.get('helpers'):
return

for helper in self.config.get('helpers'):
logging.info('Loading helper module %s', helper)
try:
module = importlib.import_module(helper)
except ImportError as e:
logging.warning(
'Cannot import helper module: %s',
e
)
continue

if not hasattr(module, 'HelperAgent'):
logging.warning(
'Module %s does not provide a HelperAgent interface',
helper
)
continue

if not hasattr(module.HelperAgent, 'run'):
logging.warning(
'In module %s HelperAgent class does not provide a run() method',
helper
)
continue

logging.info(
'Helper module %s successfully loaded',
helper
)
self.helpers[helper] = module

def run_helper(self, helper, msg, data):
"""
Run a helper to post-process result data
Args:
helper (str): Name of the helper to run
msg (dict): The original message request
data (dict): The data to be processed
"""
if helper not in self.helpers:
return data

logging.debug(
'Invoking helper module %s for processing of data',
helper
)

module = self.helpers[helper]
h = module.HelperAgent(msg=msg, data=data)

try:
result = h.run()
except Exception as e:
logging.warning('Helper module raised an exception: %s', e)
return data

return result

def wait_for_tasks(self):
"""
Poll the worker socket for new tasks
Expand Down Expand Up @@ -365,6 +445,16 @@ def wait_for_tasks(self):

# Process task and return result to client
result = self.process_client_msg(msg)

# Process data using a helper before sending it to client?
if 'helper' in msg:
r = self.run_helper(
helper=msg['helper'],
msg=msg,
data=result
)
result = r

self.worker_socket.send(_id, zmq.SNDMORE)
self.worker_socket.send(_empty, zmq.SNDMORE)
try:
Expand Down

0 comments on commit ca6b0d4

Please sign in to comment.