Skip to content

Commit

Permalink
Add a VMPollerClient class for use by clients for sending out messages
Browse files Browse the repository at this point in the history
  • Loading branch information
dnaeon committed Aug 21, 2013
1 parent a57c633 commit 126d7cf
Showing 1 changed file with 112 additions and 7 deletions.
119 changes: 112 additions & 7 deletions src/vmpollerd/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import os
import glob
import syslog
import ConfigParser

import zmq
from vmconnector.core import VMConnector
Expand Down Expand Up @@ -88,7 +89,7 @@ def run(self, config_dir="/etc/vm-poller"):
# Time to fire up our poller Agents
self.start_agents()

# Bind to our ZeroMQ proxy as a worker
# Connect to our ZeroMQ proxy as a worker
# TODO: The endpoint we bind should be configurable
syslog.syslog("Connecting to the VMPoller Proxy server")
self.worker = self.zcontext.socket(zmq.REP)
Expand Down Expand Up @@ -353,7 +354,7 @@ def get_datastore_property(self, msg):

class VMPollerProxy(Daemon):
"""
VMPoller Proxy object.
VMPoller Proxy class
ZeroMQ proxy which load-balances all client requests to a
pool of connected ZeroMQ workers.
Expand All @@ -365,19 +366,34 @@ class VMPollerProxy(Daemon):
run() method
"""
def run(self):
def run(self, config_file="/etc/vm-poller/vm-pollerd-proxy.conf"):
if not os.path.exists(config_file):
raise VMPollerException, "Cannot read configuration for proxy: %s" % e

config = ConfigParser.ConfigParser()
config.read(config_file)

self.frontend_endpoint = config.get('Default', 'frontend')
self.backend_endpoint = config.get('Default', 'backend')

# ZeroMQ context
self.zcontext = zmq.Context()

# Socket facing clients
# TODO: The endpoint we bind to should be configurable
self.frontend = self.zcontext.socket(zmq.ROUTER)
self.frontend.bind("tcp://*:11555")

try:
self.frontend.bind(self.frontend_endpoint)
except zmq.ZMQError as e:
raise VMPollerException, "Cannot bind frontend socket: %s" % e

# Socket facing workers
# TODO: The endpoint we bind should be configurable
self.backend = self.zcontext.socket(zmq.DEALER)
self.backend.bind("tcp://*:11556")

try:
self.backend.bind(self.backend_endpoint)
except zmq.ZMQError as e:
raise VMPollerException, "Cannot bind backend socket: %s" e

# Start the proxy
syslog.syslog("Starting the VMPoller Proxy")
Expand All @@ -387,3 +403,92 @@ def run(self):
self.frontend.close()
self.backend.close()
self.zcontext.term()

class VMPollerClient(object):
"""
VMPoller Client class
Defines methods for use by clients for sending out message requests.
Sends out messages to a VMPoller Proxy server requesting properties of
different vSphere objects, e.g. datastores, hosts, etc.
Returns:
The result message back. Example result message on success looks like this:
{ "status": 0,
"name": <name-of-object>,
"property": <requested-property>,
"value": <value-of-the-retrieved-property>
}
An example error message looks like this:
{ "status": -1
"reason": <reason-of-the-failure>
}
"""
def __init__(self, config_file="/etc/vm-poller/vm-pollerd-client.conf"):
if not os.path.exists(config_file):
raise VMPollerException, "Config file %s does not exists" % config

config = ConfigParser.ConfigParser()
config.read(config_file)

self.timeout = config.get('Default', 'timeout')
self.retries = config.get('Default', 'retries')
self.endpoint = config.get('Default', 'endpoint')

self.zcontext = zmq.Context()

self.zclient = self.zcontext.socket(zmq.REQ)
self.zclient.setsockopt(zmq.LINGER, 0)
self.zclient.connect(self.endpoint)

self.zpoller = zmq.Poller()
self.zpoller.register(self.zclient, zmq.POLLIN)

def run(self, msg):
# Partially based on the Lazy Pirate Pattern
# http://zguide.zeromq.org/py:all#Client-Side-Reliability-Lazy-Pirate-Pattern

result = None

while self.retries > 0:
# Send our message out
self.zclient.send_json(msg)

socks = dict(self.zpoller.poll(self.timeout))

# Do we have a reply?
if socks[self.zclient] == zmq.POLLIN:
result = self.zclient.recv_json()
break
else:
# We didn't get a reply back from the server, let's retry
self.retries -= 1
syslog.syslog("Didn't get a reply from server, retrying...")

# Socket is confused. Close and remove it.
self.zclient.close()
self.zpoller.unregister(self.zclient)

# Re-establish the connection
self.zclient.connect(self.endpoint)
self.zpoller.register(self.zclient, zmq.POLLIN)

# Close the socket and terminate the context
self.zclient.close()
self.zpoller.unregister(self.zclient)
self.zcontext.term()

# Did we have any result reply at all?
if not result:
return "Did not get a reply from the server"

# Was the request successful?
if result["status"] != 0:
return result["reason"]
else:
return result["value"]

0 comments on commit 126d7cf

Please sign in to comment.