Skip to content

Commit

Permalink
Load task modules when instantiating a VPollerWorker instance
Browse files Browse the repository at this point in the history
  • Loading branch information
dnaeon committed Jan 30, 2015
1 parent 4efc135 commit ef2668e
Showing 1 changed file with 48 additions and 14 deletions.
62 changes: 48 additions & 14 deletions src/vpoller/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

from vpoller import __version__
from vpoller.log import logger
from vpoller.client import validate_message
from vpoller.exceptions import VPollerException
from vpoller.task.registry import registry
from vconnector.core import VConnector
Expand Down Expand Up @@ -83,6 +84,7 @@ def __init__(self, config_file, num_workers=0):
'mgmt': 'tcp://*:10000',
'proxy': 'tcp://localhost:10123',
'helpers': None,
'tasks': None,
}

def start(self):
Expand Down Expand Up @@ -138,9 +140,13 @@ def load_config(self):
self.config['db'] = parser.get('worker', 'db')
self.config['proxy'] = parser.get('worker', 'proxy')
self.config['helpers'] = parser.get('worker', 'helpers')
self.config['tasks'] = parser.get('worker', 'tasks')

if self.config['helpers']:
self.config['helpers'] = self.config['helpers'].split(',')

if self.config['tasks']:
self.config['tasks'] = self.config['tasks'].split(',')

logger.debug(
'Worker Manager configuration: %s',
Expand All @@ -166,7 +172,8 @@ def start_workers(self):
worker = VPollerWorker(
db=self.config.get('db'),
proxy=self.config.get('proxy'),
helpers=self.config.get('helpers')
helpers=self.config.get('helpers'),
tasks=self.config.get('tasks')
)
worker.daemon = True
self.workers.append(worker)
Expand Down Expand Up @@ -271,6 +278,7 @@ def status(self):
'db': self.config.get('db'),
'concurrency': self.num_workers,
'helpers': self.config.get('helpers'),
'tasks': self.config.get('tasks'),
}
}

Expand All @@ -293,7 +301,7 @@ class VPollerWorker(multiprocessing.Process):
run() method
"""
def __init__(self, db, proxy, helpers):
def __init__(self, db, proxy, helpers, tasks):
"""
Initialize a new VPollerWorker object
Expand All @@ -302,15 +310,18 @@ def __init__(self, db, proxy, helpers):
proxy (str): Endpoint to which vPoller Workers connect
and receive new tasks for processing
helpers (list): A list of helper modules to be loaded
task (list): A list of task modules to be loaded
"""
super(VPollerWorker, self).__init__()
self.config = {
'db': db,
'proxy': proxy,
'helpers': helpers,
'tasks': tasks,
}
self.helpers = {}
self.task_modules = {}
self.helper_modules = {}
self.time_to_die = multiprocessing.Event()
self.agents = {}
self.zcontext = None
Expand All @@ -327,7 +338,8 @@ def run(self):
"""
logger.info('Worker process is starting')

self.load_helpers()
self.load_task_modules()
self.load_helper_modules()
self.create_sockets()
self.create_agents()

Expand Down Expand Up @@ -356,7 +368,31 @@ def signal_stop(self):
"""
self.time_to_die.set()

def load_helpers(self):
def load_task_modules(self):
"""
Loads the task modules
"""
if not self.config.get('tasks'):
raise VPollerException('No task modules provided')

for task in self.config.get('tasks'):
task = task.strip()
logger.info('Loading task module %s', task)
try:
module = importlib.import_module(task)
except ImportError as e:
logger.warning(
'Cannot import task module: %s',
e.message
)
continue
self.task_modules[task] = module

if not self.task_modules:
raise VPollerException('No task modules loaded')

def load_helper_modules(self):
"""
Loads helper modules for post-processing of results
Expand Down Expand Up @@ -394,7 +430,7 @@ def load_helpers(self):
'Helper module %s successfully loaded',
helper
)
self.helpers[helper] = module
self.helper_modules[helper] = module

def run_helper(self, helper, msg, data):
"""
Expand All @@ -411,7 +447,7 @@ def run_helper(self, helper, msg, data):
helper
)

module = self.helpers[helper]
module = self.helper_modules[helper]
h = module.HelperAgent(msg=msg, data=data)

try:
Expand All @@ -421,7 +457,7 @@ def run_helper(self, helper, msg, data):
return data

return result

def wait_for_tasks(self):
"""
Poll the worker socket for new tasks
Expand Down Expand Up @@ -456,7 +492,7 @@ def wait_for_tasks(self):
result = self.process_client_msg(msg)

# Process data using a helper before sending it to client?
if 'helper' in msg and msg['helper'] in self.helpers:
if 'helper' in msg and msg['helper'] in self.helper_modules:
data = self.run_helper(
helper=msg['helper'],
msg=msg,
Expand Down Expand Up @@ -592,11 +628,9 @@ def process_client_msg(self, msg):
if not agent:
return {'success': 1, 'msg': 'Unknown or missing agent name'}

#
#if not _validate_client_msg(msg, task.required):
# return {'success': 1, 'msg': 'Incorrect task request received'}
#
if not validate_message(msg=msg, required=task.required):
return {'success': 1, 'msg': 'Invalid task request'}

result = task.function(agent, msg)

return result

0 comments on commit ef2668e

Please sign in to comment.