Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed load/users getting distributed to missing worker #2010

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions locust/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
if TYPE_CHECKING:
from locust.runners import WorkerNode

HEARTBEAT_DEAD = -10
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import from runners instead?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried that, but I was getting an error during initialization.
I will try the reverse tomorrow.

Could you help me with any resource on how to set up debugging in distributed mode? That would be really helpful for me to contribute in a better way. I am resorting to print statements for debugging (this is my first Python project).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I don't have much to contribute regarding debugging in distributed mode. You know how to run an individual unit test, right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, initially I used to run tox; later I found out how to run tests individually.



# To profile line-by-line, uncomment the code below (i.e. `import line_profiler ...`) and
# place `@profile` on the functions/methods you wish to profile. Then, in the unit test you are
Expand Down Expand Up @@ -201,6 +203,13 @@ def remove_worker(self, worker_node: "WorkerNode") -> None:
return
self._prepare_rebalance()

def prepare_rebalance(self) -> None:
    """
    Drop any worker whose heartbeat has fallen below HEARTBEAT_DEAD (i.e. it has
    been missing for too long to receive users during distribution), then trigger
    a rebalance of users across the remaining workers.
    """
    alive_workers = []
    for worker in self._worker_nodes:
        if worker.heartbeat > HEARTBEAT_DEAD:
            alive_workers.append(worker)
    self._worker_nodes = alive_workers
    self._prepare_rebalance()

def _prepare_rebalance(self) -> None:
"""
When a rebalance is required because of added and/or removed workers, we compute the desired state as if
Expand Down
5 changes: 5 additions & 0 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
CPU_MONITOR_INTERVAL = 5.0
HEARTBEAT_INTERVAL = 1
HEARTBEAT_LIVENESS = 3
HEARTBEAT_DEAD = -10
radhakrishnaakamat marked this conversation as resolved.
Show resolved Hide resolved
FALLBACK_INTERVAL = 5


Expand Down Expand Up @@ -846,6 +847,10 @@ def heartbeat_worker(self):
continue

for client in self.clients.all:
if client.heartbeat < HEARTBEAT_DEAD and client.state == STATE_MISSING:
if self._users_dispatcher is not None:
logger.info("Worker %s failed to send heartbeat after being marked missing for long time, removing the worker" % str(client.id))
self._users_dispatcher.prepare_rebalance()
if client.heartbeat < 0 and client.state != STATE_MISSING:
logger.info("Worker %s failed to send heartbeat, setting state to missing." % str(client.id))
client.state = STATE_MISSING
Expand Down
25 changes: 24 additions & 1 deletion locust/test/test_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Dict, List, Tuple

from locust import User
from locust.dispatch import UsersDispatcher
from locust.dispatch import HEARTBEAT_DEAD, UsersDispatcher
from locust.runners import WorkerNode
from locust.test.util import clear_all_functools_lru_cache

Expand Down Expand Up @@ -3273,6 +3273,29 @@ class User5(User):
user_classes=[User1, User2, User3, User4, User5],
)

def test_remove_missing_workers_with_dead_heartbeat(self):
    """prepare_rebalance() must drop workers whose heartbeat fell below HEARTBEAT_DEAD."""
    class User1(User):
        weight = 1

    class User2(User):
        weight = 1

    class User3(User):
        weight = 1

    user_classes = [User1, User2, User3]

    worker_nodes = [WorkerNode(str(i + 1)) for i in range(3)]
    # Worker 2 is past the "dead" threshold and should be removed on rebalance.
    worker_nodes[1].heartbeat = HEARTBEAT_DEAD - 1

    users_dispatcher = UsersDispatcher(worker_nodes=worker_nodes, user_classes=user_classes)

    users_dispatcher.new_dispatch(target_user_count=9, spawn_rate=3)
    users_dispatcher._wait_between_dispatch = 0
    users_dispatcher.prepare_rebalance()
    list(users_dispatcher)
    # assertEqual, not assertIs: we are comparing integer values, not object
    # identity (assertIs only "worked" here because CPython caches small ints).
    self.assertEqual(len(users_dispatcher._worker_nodes), 2, "Expected worker 2 will be removed from worker nodes")

def test_ramp_up_partially_ramp_down_and_rump_up_to_target(self):
class User1(User):
fixed_count = 50
Expand Down