Skip to content

Commit

Permalink
First cut of the multiprocessing support for vPoller
Browse files Browse the repository at this point in the history
  • Loading branch information
dnaeon committed Sep 3, 2014
1 parent 1fcab98 commit aa54ced
Showing 1 changed file with 121 additions and 1 deletion.
122 changes: 121 additions & 1 deletion src/vpoller/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import types
import logging
import ConfigParser
import multiprocessing
from time import time, asctime

import zmq
Expand All @@ -39,7 +40,126 @@
from vpoller.daemon import Daemon
from vconnector.core import VConnectorDatabase

class VPollerWorker(Daemon):
class VPollerWorkerManager(object):
"""
Manager of vPoller Workers
"""
def __init__(self, config_file, num_workers=0):
"""
Initializes a new vPoller Worker Manager
Args:
config_file (str): Path to the vPoller configuration file
num_workers (str): Number of vPoller Worker processes to create
"""
self.config_file = config_file
self.num_workers = num_workers
self.time_to_die = multiprocessing.Event()
self.config = {}
self.workers = []
self.zcontenxt = None
self.zpoller = None
self.mgmt_socket = None
self.config_defaults = {
'mgmt': 'tcp://*:10000',
}

def start(self):
"""
Start the vPoller Worker Manager and processes
"""
logging.info('Starting up vPoller Worker Manager')

self.load_config()
self.create_sockets()
self.start_workers()

while not self.time_to_die.is_set():
self.process_mgmt_msg()

self.stop()

def stop(self):
self.close_sockets()
self.stop_workers()

def load_config(self):
"""
Loads the vPoller Worker Manager configuration settings
Raises:
VPollerException
"""
logging.debug('Loading config file %s', self.config_file)

parser = ConfigParser.ConfigParser(self.config_defaults)
parser.read(self.config_file)

self.config['mgmt'] = parser.get('worker:manager', 'mgmt')

def start_workers(self):
"""
Start the vPoller Worker processes
"""
logging.info('Starting up vPoller Worker processes')

if self.num_workers <= 0:
self.num_workers = multiprocessing.cpu_count()

for i in xrange(self.num_workers):
worker = VPollerWorker(config=self.config_file)
worker.daemon = True
self.workers.append(worker)
worker.start()

def stop_workers(self):
"""
Stop the vPoller Worker processes
"""
logging.info('Stopping vPoller Worker processes')

for worker in self.workers:
worker.worker_shutdown()
worker.join()

def create_sockets(self):
"""
Creates the ZeroMQ sockets used by the vPoller Worker Manager
"""
logging.debug('Creating vPoller Worker Manager sockets')

self.zcontext = zmq.Context()

self.mgmt_socket = self.zcontext.socket(zmq.REP)
self.mgmt_socket.bind(self.config.get('mgmt'))

self.zpoller = zmq.Poller()
self.zpoller.register(self.mgmt_socket, zmq.POLLIN)

def close_sockets(self):
"""
Closes the ZeroMQ sockets used by the Manager
"""
logging.debug('Closing vPoller Worker Manager sockets')

self.zpoller.unregister(self.mgmt_socket)
self.mgmt_socket.close()
self.zcontext.term()

def process_mgmt_msg(self):
socks = dict(self.zpoller.poll())
if socks.get(self.mgmt_socket) == zmq.POLLIN:
pass

class VPollerWorker(multiprocessing.Process):
"""
VPollerWorker class
Expand Down

0 comments on commit aa54ced

Please sign in to comment.