Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed load/users getting distributed to missing worker #2010

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
CPU_MONITOR_INTERVAL = 5.0
HEARTBEAT_INTERVAL = 1
HEARTBEAT_LIVENESS = 3
HEARTBEAT_DEAD_INTERNAL = -60
FALLBACK_INTERVAL = 5


Expand Down Expand Up @@ -845,7 +846,12 @@ def heartbeat_worker(self):
self.reset_connection()
continue

missing_clients_to_be_removed = []
for client in self.clients.all:
# if clients goes missing for more than HEARTBEAT_DEAD_INTERNAL then add them to be removed list
if client.state == STATE_MISSING and client.heartbeat <= HEARTBEAT_DEAD_INTERNAL:
missing_clients_to_be_removed.append(client.id)

if client.heartbeat < 0 and client.state != STATE_MISSING:
logger.info("Worker %s failed to send heartbeat, setting state to missing." % str(client.id))
client.state = STATE_MISSING
Expand All @@ -861,6 +867,17 @@ def heartbeat_worker(self):
else:
client.heartbeat -= 1

# if there are any missing clients to be removed then remove them and trigger rebalance.
if len(missing_clients_to_be_removed) > 0:
for to_remove_client_id in missing_clients_to_be_removed:
if self.clients.get(to_remove_client_id) is not None:
del self.clients[to_remove_client_id]
if self.state == STATE_RUNNING or self.state == STATE_SPAWNING:
# _users_dispatcher is set to none so that during redistribution the dead clients are not picked, alternative is to call self.stop() before start
self._users_dispatcher = None
# trigger redistribution after missing cclient removal
self.start(user_count=self.target_user_count, spawn_rate=self.spawn_rate)

def reset_connection(self):
logger.info("Reset connection to worker")
try:
Expand Down
90 changes: 90 additions & 0 deletions locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -1969,6 +1969,96 @@ def my_task(self):
self.assertEqual(0, master.worker_count)
self.assertEqual(STATE_STOPPED, master.state, "All workers went missing but test didn't stop.")

@mock.patch("locust.runners.HEARTBEAT_INTERVAL", new=0.1)
@mock.patch("locust.runners.HEARTBEAT_DEAD_INTERNAL", new=-3)
def test_worker_missing_after_heartbeat_dead_interval(self):
class TestUser(User):
@task
def my_task(self):
gevent.sleep(600)

with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server, patch_env(
"LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "0.1"
):
master = self.get_runner(user_classes=[TestUser])
server.mocked_send(Message("client_ready", __version__, "fake_client1"))
server.mocked_send(Message("client_ready", __version__, "fake_client2"))
server.mocked_send(Message("client_ready", __version__, "fake_client3"))

master.start(3, 3)
server.mocked_send(Message("spawning", None, "fake_client1"))
server.mocked_send(Message("spawning", None, "fake_client2"))
server.mocked_send(Message("spawning", None, "fake_client3"))

sleep(0.1)
server.mocked_send(
Message(
"heartbeat",
{"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},
"fake_client1",
)
)
server.mocked_send(
Message(
"heartbeat",
{"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},
"fake_client2",
)
)
server.mocked_send(
Message(
"heartbeat",
{"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},
"fake_client3",
)
)

sleep(0.1)
# initially all workers are in active state
self.assertEqual(0, len(master.clients.missing))
self.assertEqual(3, master.worker_count)
server.mocked_send(
Message(
"heartbeat",
{"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},
"fake_client1",
)
)

server.mocked_send(
Message(
"heartbeat",
{"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},
"fake_client2",
)
)

sleep(0.6)
# 4 intervals are passed since all 3 heart beats all workers are in missing state
self.assertEqual(3, len(master.clients.missing))
self.assertEqual(0, master.worker_count)

server.mocked_send(
Message(
"heartbeat",
{"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},
"fake_client1",
)
)

server.mocked_send(
Message(
"heartbeat",
{"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},
"fake_client2",
)
)
sleep(0.2)
# hearbeat received from two workers so they are active, for fake_client3 HEARTBEAT_DEAD_INTERNAL has been breached, so it will be removed from worker list
self.assertEqual(0, len(master.clients.missing))
self.assertEqual(2, master.worker_count)
master.stop()

def test_master_total_stats(self):
with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner()
Expand Down