Add heartbeat to detect down slaves #927

Merged · 24 commits · Feb 6, 2019

Commits
3d4b927
Replace zmq sockets with one DEALER-ROUTER socket
Dec 10, 2018
095acbd
Remove client_id parameter from send_multipart method
Dec 10, 2018
c02824e
Add heartbeat worker to server and client
Dec 10, 2018
3ff4080
Use new clients.all property in heartbeat worker
Dec 10, 2018
28b0bc9
Fix reporting of stopped state
Dec 10, 2018
7b5e62d
Fix tests after changing ZMQ sockets to DEALER-ROUTER
Dec 10, 2018
d019117
Change heartbeat log msg to info so that it does not appear in tests
Dec 11, 2018
a5c768c
Add tests for zmqrpc.py
Dec 11, 2018
7e1c8e9
Remove commented imports, add note about sleep
Dec 11, 2018
6984741
Support str/unicode diff in py2 vs py3
Dec 11, 2018
197949a
Ensure failed zmqrpc tests clean up bound sockets
Dec 11, 2018
e617573
Create throw-away variable for identity from ZMQ message
Dec 11, 2018
06062f7
Replace usage of parse_options in tests with mock options
Dec 11, 2018
5a416fc
Set coverage concurrency to gevent
Dec 11, 2018
9818dd3
Add test that shows master heartbeat worker marks slaves missing
Dec 11, 2018
7c0d155
Add assertions to test_zmqrpc.py
Dec 11, 2018
bd56d22
Use unittest assertions
Dec 11, 2018
e757591
Change assertion value to bytes object
Dec 11, 2018
5854b60
Add cmdline options for heartbeat liveness and interval
Dec 17, 2018
493db4c
Add new option heartbeat_liveness to test_runners mock options
Dec 17, 2018
72f16cb
Ensure SlaveNode class uses the heartbeat_liveness default or the passed-in value
Dec 17, 2018
50b8e7d
Ensure hatch data can be updated for slaves currently hatching
Dec 18, 2018
bb9b473
Add test for start hatching accepted slave states
Dec 18, 2018
25272c6
Remove unneeded imports of mock
Dec 18, 2018
1 change: 1 addition & 0 deletions .coveragerc
@@ -1,6 +1,7 @@
 [run]
 branch = True
 source = locust
+concurrency = gevent
 
 [report]
 exclude_lines =
16 changes: 16 additions & 0 deletions locust/main.py
@@ -127,6 +127,22 @@ def parse_options():
         help="Port that locust master should bind to. Only used when running with --master. Defaults to 5557. Note that Locust will also use this port + 1, so by default the master node will bind to 5557 and 5558."
     )
     
+    parser.add_option(
+        '--heartbeat-liveness',
+        action='store',
+        type='int',
+        default=3,
+        help="set number of heartbeat intervals a slave may miss before the master marks it as missing (default 3)"
+    )
+    
+    parser.add_option(
+        '--heartbeat-interval',
+        action='store',
+        type='int',
+        default=1,
+        help="set number of seconds between heartbeat messages sent from slave to master (default 1)"
+    )
+    
     parser.add_option(
         '--expect-slaves',
         action='store',
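Taken together, these two options bound how quickly the master notices a dead slave: the heartbeat_worker added in runners.py (below) sleeps one --heartbeat-interval per loop, decrements each slave's counter while it is non-negative, and flags the slave on the pass after the counter goes negative. A back-of-the-envelope sketch of the implied worst-case detection latency — the helper is my own illustration, not code from this PR:

def worst_case_detection_seconds(heartbeat_liveness=3, heartbeat_interval=1):
    # Counter path for liveness 3: 3 -> 2 -> 1 -> 0 -> -1, then the fifth
    # pass marks the slave missing, i.e. (liveness + 2) intervals of silence.
    return (heartbeat_liveness + 2) * heartbeat_interval

print(worst_case_detection_seconds())      # 5: defaults tolerate ~5s of silence
print(worst_case_detection_seconds(5, 2))  # 14: looser settings for flaky networks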
41 changes: 23 additions & 18 deletions locust/rpc/zmqrpc.py
@@ -4,30 +4,35 @@
 
 
 class BaseSocket(object):
+    def __init__(self, sock_type):
+        context = zmq.Context()
+        self.socket = context.socket(sock_type)
+    
     def send(self, msg):
-        self.sender.send(msg.serialize())
-    
+        self.socket.send(msg.serialize())
+    
+    def send_to_client(self, msg):
+        self.socket.send_multipart([msg.node_id.encode(), msg.serialize()])
+    
     def recv(self):
-        data = self.receiver.recv()
-        return Message.unserialize(data)
+        data = self.socket.recv()
+        msg = Message.unserialize(data)
+        return msg
+    
+    def recv_from_client(self):
+        data = self.socket.recv_multipart()
+        addr = data[0]
+        msg = Message.unserialize(data[1])
+        return addr, msg
 
 class Server(BaseSocket):
     def __init__(self, host, port):
-        context = zmq.Context()
-        self.receiver = context.socket(zmq.PULL)
-        self.receiver.bind("tcp://%s:%i" % (host, port))
-        
-        self.sender = context.socket(zmq.PUSH)
-        self.sender.bind("tcp://%s:%i" % (host, port+1))
+        BaseSocket.__init__(self, zmq.ROUTER)
+        self.socket.bind("tcp://%s:%i" % (host, port))
 
 class Client(BaseSocket):
-    def __init__(self, host, port):
-        context = zmq.Context()
-        self.receiver = context.socket(zmq.PULL)
-        self.receiver.connect("tcp://%s:%i" % (host, port+1))
-        
-        self.sender = context.socket(zmq.PUSH)
-        self.sender.connect("tcp://%s:%i" % (host, port))
+    def __init__(self, host, port, identity):
+        BaseSocket.__init__(self, zmq.DEALER)
+        self.socket.setsockopt(zmq.IDENTITY, identity.encode())
+        self.socket.connect("tcp://%s:%i" % (host, port))

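The move from paired PUSH/PULL sockets to a single DEALER-ROUTER pair is what makes per-slave addressing — and therefore per-slave heartbeats — possible: a ROUTER socket prepends the sender's identity frame to every message it receives, and routes outgoing messages back by that same frame. A minimal standalone pyzmq sketch of the pattern (endpoint and identity are made up for illustration; the PR itself uses zmq.green):

import zmq

context = zmq.Context()

router = context.socket(zmq.ROUTER)           # master side
router.bind("tcp://*:5557")

dealer = context.socket(zmq.DEALER)           # slave side
dealer.setsockopt(zmq.IDENTITY, b"slave-1")   # identity the ROUTER will see
dealer.connect("tcp://localhost:5557")

dealer.send(b"client_ready")
identity, payload = router.recv_multipart()   # [b"slave-1", b"client_ready"]
router.send_multipart([identity, b"hatch"])   # routed back to slave-1 only
print(dealer.recv())                          # b"hatch"

This is also why Server.recv_from_client() returns an (addr, msg) pair and why Client now requires an identity argument.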
64 changes: 49 additions & 15 deletions locust/runners.py
@@ -23,7 +23,7 @@
 # global locust runner singleton
 locust_runner = None
 
-STATE_INIT, STATE_HATCHING, STATE_RUNNING, STATE_CLEANUP, STATE_STOPPED = ["ready", "hatching", "running", "cleanup", "stopped"]
+STATE_INIT, STATE_HATCHING, STATE_RUNNING, STATE_CLEANUP, STATE_STOPPED, STATE_MISSING = ["ready", "hatching", "running", "cleanup", "stopped", "missing"]
 SLAVE_REPORT_INTERVAL = 3.0
@@ -213,25 +213,32 @@ def __init__(self, locust_classes, options):
         self.master_port = options.master_port
         self.master_bind_host = options.master_bind_host
         self.master_bind_port = options.master_bind_port
+        self.heartbeat_liveness = options.heartbeat_liveness
+        self.heartbeat_interval = options.heartbeat_interval
     
     def noop(self, *args, **kwargs):
         """ Used to link() greenlets to in order to be compatible with gevent 1.0 """
         pass
 
 class SlaveNode(object):
-    def __init__(self, id, state=STATE_INIT):
+    def __init__(self, id, state=STATE_INIT, heartbeat_liveness=3):
         self.id = id
         self.state = state
         self.user_count = 0
+        self.heartbeat = heartbeat_liveness
 
 class MasterLocustRunner(DistributedLocustRunner):
     def __init__(self, *args, **kwargs):
         super(MasterLocustRunner, self).__init__(*args, **kwargs)
         
         class SlaveNodesDict(dict):
             def get_by_state(self, state):
                 return [c for c in six.itervalues(self) if c.state == state]
             
+            @property
+            def all(self):
+                return six.itervalues(self)
+            
             @property
             def ready(self):
                 return self.get_by_state(STATE_INIT)
@@ -247,6 +254,7 @@ def running(self):
         self.clients = SlaveNodesDict()
         self.server = rpc.Server(self.master_bind_host, self.master_bind_port)
         self.greenlet = Group()
+        self.greenlet.spawn(self.heartbeat_worker).link_exception(callback=self.noop)
         self.greenlet.spawn(self.client_listener).link_exception(callback=self.noop)
         
         # listener that gathers info on how many locust users the slaves has spawned
@@ -268,7 +276,7 @@ def user_count(self):
         return sum([c.user_count for c in six.itervalues(self.clients)])
     
     def start_hatching(self, locust_count, hatch_rate):
-        num_slaves = len(self.clients.ready) + len(self.clients.running)
+        num_slaves = len(self.clients.ready) + len(self.clients.running) + len(self.clients.hatching)
         if not num_slaves:
             logger.warning("You are running in distributed mode but have no slave servers connected. "
                            "Please connect slaves prior to swarming.")
@@ -286,7 +294,7 @@ def start_hatching(self, locust_count, hatch_rate):
             self.exceptions = {}
             events.master_start_hatching.fire()
         
-        for client in six.itervalues(self.clients):
+        for client in (self.clients.ready + self.clients.running + self.clients.hatching):
             data = {
                 "hatch_rate":slave_hatch_rate,
                 "num_clients":slave_num_clients,
@@ -298,36 +306,49 @@ def start_hatching(self, locust_count, hatch_rate):
                 data["num_clients"] += 1
                 remaining -= 1
             
-            self.server.send(Message("hatch", data, None))
+            self.server.send_to_client(Message("hatch", data, client.id))
         
         self.stats.start_time = time()
         self.state = STATE_HATCHING
     
     def stop(self):
-        for client in self.clients.hatching + self.clients.running:
-            self.server.send(Message("stop", None, None))
+        for client in self.clients.all:
+            self.server.send_to_client(Message("stop", None, client.id))
         events.master_stop_hatching.fire()
     
     def quit(self):
-        for client in six.itervalues(self.clients):
-            self.server.send(Message("quit", None, None))
+        for client in self.clients.all:
+            self.server.send_to_client(Message("quit", None, client.id))
         self.greenlet.kill(block=True)
     
+    def heartbeat_worker(self):
+        while True:
+            gevent.sleep(self.heartbeat_interval)
+            for client in self.clients.all:
+                if client.heartbeat < 0 and client.state != STATE_MISSING:
+                    logger.info('Slave %s failed to send heartbeat, setting state to missing.' % str(client.id))
+                    client.state = STATE_MISSING
+                else:
+                    client.heartbeat -= 1
+    
     def client_listener(self):
         while True:
-            msg = self.server.recv()
+            client_id, msg = self.server.recv_from_client()
+            msg.node_id = client_id
             if msg.type == "client_ready":
                 id = msg.node_id
-                self.clients[id] = SlaveNode(id)
+                self.clients[id] = SlaveNode(id, heartbeat_liveness=self.heartbeat_liveness)
                 logger.info("Client %r reported as ready. Currently %i clients ready to swarm." % (id, len(self.clients.ready)))
                 ## emit a warning if the slave's clock seem to be out of sync with our clock
                 #if abs(time() - msg.data["time"]) > 5.0:
                 #    warnings.warn("The slave node's clock seem to be out of sync. For the statistics to be correct the different locust servers need to have synchronized clocks.")
             elif msg.type == "client_stopped":
                 del self.clients[msg.node_id]
-                if len(self.clients.hatching + self.clients.running) == 0:
-                    self.state = STATE_STOPPED
                 logger.info("Removing %s client from running clients" % (msg.node_id))
+            elif msg.type == "heartbeat":
+                if msg.node_id in self.clients:
+                    self.clients[msg.node_id].heartbeat = self.heartbeat_liveness
+                    self.clients[msg.node_id].state = msg.data['state']
             elif msg.type == "stats":
                 events.slave_report.fire(client_id=msg.node_id, data=msg.data)
             elif msg.type == "hatching":
@@ -345,6 +366,9 @@ def client_listener(self):
             elif msg.type == "exception":
                 self.log_exception(msg.node_id, msg.data["msg"], msg.data["traceback"])
             
+            if not self.state == STATE_INIT and all(map(lambda x: x.state == STATE_INIT, self.clients.all)):
+                self.state = STATE_STOPPED
+    
     @property
     def slave_count(self):
         return len(self.clients.ready) + len(self.clients.hatching) + len(self.clients.running)
@@ -354,16 +378,19 @@ def __init__(self, *args, **kwargs):
         super(SlaveLocustRunner, self).__init__(*args, **kwargs)
         self.client_id = socket.gethostname() + "_" + uuid4().hex
         
-        self.client = rpc.Client(self.master_host, self.master_port)
+        self.client = rpc.Client(self.master_host, self.master_port, self.client_id)
         self.greenlet = Group()
         
+        self.greenlet.spawn(self.heartbeat).link_exception(callback=self.noop)
         self.greenlet.spawn(self.worker).link_exception(callback=self.noop)
         self.client.send(Message("client_ready", None, self.client_id))
+        self.slave_state = STATE_INIT
         self.greenlet.spawn(self.stats_reporter).link_exception(callback=self.noop)
         
         # register listener for when all locust users have hatched, and report it to the master node
         def on_hatch_complete(user_count):
             self.client.send(Message("hatch_complete", {"count":user_count}, self.client_id))
+            self.slave_state = STATE_RUNNING
         events.hatch_complete += on_hatch_complete
         
         # register listener that adds the current number of spawned locusts to the report that is sent to the master node
@@ -382,10 +409,16 @@ def on_locust_error(locust_instance, exception, tb):
             self.client.send(Message("exception", {"msg" : str(exception), "traceback" : formatted_tb}, self.client_id))
         events.locust_error += on_locust_error
     
+    def heartbeat(self):
+        while True:
+            self.client.send(Message('heartbeat', {'state': self.slave_state}, self.client_id))
+            gevent.sleep(self.heartbeat_interval)
+    
     def worker(self):
         while True:
             msg = self.client.recv()
             if msg.type == "hatch":
+                self.slave_state = STATE_HATCHING
                 self.client.send(Message("hatching", None, self.client_id))
                 job = msg.data
                 self.hatch_rate = job["hatch_rate"]
@@ -396,6 +429,7 @@ def worker(self):
                 self.stop()
                 self.client.send(Message("client_stopped", None, self.client_id))
                 self.client.send(Message("client_ready", None, self.client_id))
+                self.slave_state = STATE_INIT
             elif msg.type == "quit":
                 logger.info("Got quit message from master, shutting down...")
                 self.stop()
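The counter bookkeeping above is the classic liveness pattern from the ZeroMQ guide (compare the Paranoid Pirate pattern): every heartbeat resets the counter to the configured liveness, every silent interval decrements it, and a negative counter means the peer is gone. A stripped-down toy model of just that logic — state names copied from the diff, everything else assumed for illustration:

HEARTBEAT_LIVENESS = 3

class Slave(object):
    def __init__(self):
        self.heartbeat = HEARTBEAT_LIVENESS
        self.state = "ready"

def tick(slaves):
    # One pass of the master's heartbeat_worker loop (sans sleep/logging).
    for slave in slaves:
        if slave.heartbeat < 0 and slave.state != "missing":
            slave.state = "missing"
        else:
            slave.heartbeat -= 1

def on_heartbeat(slave, reported_state):
    # The master's handling of an incoming "heartbeat" message.
    slave.heartbeat = HEARTBEAT_LIVENESS
    slave.state = reported_state

s = Slave()
for _ in range(5):    # five silent intervals
    tick([s])
print(s.state)        # "missing"

A single on_heartbeat(s, "running") anywhere in that loop would have reset the counter and kept the slave alive.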
68 changes: 55 additions & 13 deletions locust/test/test_runners.py
@@ -8,13 +8,11 @@
 from locust import events
 from locust.core import Locust, TaskSet, task
 from locust.exception import LocustError
-from locust.main import parse_options
 from locust.rpc import Message
-from locust.runners import LocalLocustRunner, MasterLocustRunner
+from locust.runners import LocalLocustRunner, MasterLocustRunner, SlaveNode, STATE_INIT, STATE_HATCHING, STATE_RUNNING, STATE_MISSING
 from locust.stats import global_stats, RequestStats
 from locust.test.testcases import LocustTestCase
 
 def mocked_rpc_server():
     class MockedRpcServer(object):
         queue = Queue()
@@ -35,21 +33,37 @@ def recv(self):
         def send(self, message):
             self.outbox.append(message.serialize())
         
+        def send_to_client(self, message):
+            self.outbox.append([message.node_id, message.serialize()])
+        
+        def recv_from_client(self):
+            results = self.queue.get()
+            msg = Message.unserialize(results)
+            return msg.node_id, msg
+        
     return MockedRpcServer
 
+class mocked_options(object):
+    def __init__(self):
+        self.hatch_rate = 5
+        self.num_clients = 5
+        self.host = '/'
+        self.master_host = 'localhost'
+        self.master_port = 5557
+        self.master_bind_host = '*'
+        self.master_bind_port = 5557
+        self.heartbeat_liveness = 3
+        self.heartbeat_interval = 0.01
+    
+    def reset_stats(self):
+        pass
+
 class TestMasterRunner(LocustTestCase):
     def setUp(self):
         global_stats.reset_all()
         self._slave_report_event_handlers = [h for h in events.slave_report._handlers]
-        
-        parser, _, _ = parse_options()
-        args = [
-            "--clients", "10",
-            "--hatch-rate", "10"
-        ]
-        opts, _ = parser.parse_args(args)
-        self.options = opts
+        self.options = mocked_options()
     
     def tearDown(self):
         events.slave_report._handlers = self._slave_report_event_handlers
@@ -94,7 +108,18 @@ class MyTestLocust(Locust):
             server.mocked_send(Message("stats", data, "fake_client"))
             s = master.stats.get("/", "GET")
             self.assertEqual(700, s.median_response_time)
     
+    def test_master_marks_downed_slaves_as_missing(self):
+        class MyTestLocust(Locust):
+            pass
+        
+        with mock.patch("locust.rpc.rpc.Server", mocked_rpc_server()) as server:
+            master = MasterLocustRunner(MyTestLocust, self.options)
+            server.mocked_send(Message("client_ready", None, "fake_client"))
+            sleep(0.1)
+            assert master.clients['fake_client'].state == STATE_MISSING
+    
     def test_master_total_stats(self):
         import mock
@@ -174,6 +199,23 @@ class MyTestLocust(Locust):
         self.assertEqual(30, master.stats.total.get_current_response_time_percentile(0.5))
         self.assertEqual(3000, master.stats.total.get_current_response_time_percentile(0.95))
     
+    def test_sends_hatch_data_to_ready_running_hatching_slaves(self):
+        '''Sends hatch job to running, ready, or hatching slaves'''
+        class MyTestLocust(Locust):
+            pass
+        
+        with mock.patch("locust.rpc.rpc.Server", mocked_rpc_server()) as server:
+            master = MasterLocustRunner(MyTestLocust, self.options)
+            master.clients[1] = SlaveNode(1)
+            master.clients[2] = SlaveNode(2)
+            master.clients[3] = SlaveNode(3)
+            master.clients[1].state = STATE_INIT
+            master.clients[2].state = STATE_HATCHING
+            master.clients[3].state = STATE_RUNNING
+            master.start_hatching(5, 5)
+            
+            self.assertEqual(3, len(server.outbox))
+    
     def test_spawn_zero_locusts(self):
         class MyTaskSet(TaskSet):
             @task
@@ -217,7 +259,7 @@ class MyTestLocust(Locust):
             self.assertEqual(5, len(server.outbox))
             
             num_clients = 0
-            for msg in server.outbox:
+            for _, msg in server.outbox:
                 num_clients += Message.unserialize(msg).data["num_clients"]
             
             self.assertEqual(7, num_clients, "Total number of locusts that would have been spawned is not 7")
@@ -237,7 +279,7 @@ class MyTestLocust(Locust):
             self.assertEqual(5, len(server.outbox))
             
             num_clients = 0
-            for msg in server.outbox:
+            for _, msg in server.outbox:
                 num_clients += Message.unserialize(msg).data["num_clients"]
             
             self.assertEqual(2, num_clients, "Total number of locusts that would have been spawned is not 2")
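For anyone extending these tests, the recurring pattern is: patch locust.rpc.rpc.Server with the mock, push serialized messages in via mocked_send, and let the master's listener greenlet consume them from the queue. A condensed sketch of simulating a slave that connects and then goes silent — it assumes, as the assertions above do, that mocked_send yields to the listener greenlet, and that MyTestLocust and gevent's sleep are in scope as elsewhere in this file:

with mock.patch("locust.rpc.rpc.Server", mocked_rpc_server()) as server:
    master = MasterLocustRunner(MyTestLocust, mocked_options())
    # The listener greenlet pops this off the mocked queue via
    # recv_from_client() and registers a SlaveNode for "fake_client".
    server.mocked_send(Message("client_ready", None, "fake_client"))
    # With heartbeat_interval = 0.01, a short sleep with no further
    # heartbeats lets heartbeat_worker run the counter below zero.
    sleep(0.1)
    assert master.clients["fake_client"].state == STATE_MISSING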