Skip to content

Commit

Permalink
Merge pull request #2093 from andydunstall/bugfix/2091-fixed-count-us…
Browse files Browse the repository at this point in the history
…ers-not-rebalanced

dispatch: rebalance users with a fixed count
  • Loading branch information
cyberw authored May 9, 2022
2 parents 3d1412c + acbd1b3 commit 61cb0cd
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 0 deletions.
8 changes: 8 additions & 0 deletions locust/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,14 @@ def _prepare_rebalance(self) -> None:
we started from 0 user. So, if we were currently running 500 users, then the `_distribute_users` will
perform a fake ramp-up without any waiting and return the final distribution.
"""
# Reset users before recalculating since the current users is used to calculate how many
# fixed users to add.
self._users_on_workers = {
worker_node.id: {user_class.__name__: 0 for user_class in self._user_classes}
for worker_node in self._worker_nodes
}
self._try_dispatch_fixed = True

users_on_workers, user_gen, worker_gen, active_users = self._distribute_users(self._current_user_count)

self._users_on_workers = users_on_workers
Expand Down
149 changes: 149 additions & 0 deletions locust/test/test_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -2732,6 +2732,73 @@ class User3(User):

self.assertFalse(users_dispatcher._rebalance)

def test_remove_worker_during_ramp_up_with_fixed_user(self):
class User1(User):
fixed_count = 2

class User2(User):
weight = 1

class User3(User):
weight = 1

user_classes = [User1, User2, User3]

worker_nodes = [WorkerNode(str(i + 1)) for i in range(3)]

users_dispatcher = UsersDispatcher(worker_nodes=worker_nodes, user_classes=user_classes)

sleep_time = 0.2 # Speed-up test

users_dispatcher.new_dispatch(target_user_count=9, spawn_rate=3)
users_dispatcher._wait_between_dispatch = sleep_time

# Dispatch iteration 1
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
delta = time.perf_counter() - ts
self.assertTrue(0 <= delta <= _TOLERANCE, delta)
self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 1, "User3": 0})
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 1)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 1)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 1)

# Dispatch iteration 2
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
delta = time.perf_counter() - ts
self.assertTrue(sleep_time - _TOLERANCE <= delta <= sleep_time + _TOLERANCE, delta)
self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 2, "User3": 2})
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 2)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 2)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 2)

self.assertFalse(users_dispatcher._rebalance)

users_dispatcher.remove_worker(worker_nodes[1])

self.assertTrue(users_dispatcher._rebalance)

# Re-balance
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
delta = time.perf_counter() - ts
self.assertTrue(0 <= delta <= _TOLERANCE, f"Expected re-balance dispatch to be instantaneous but got {delta}s")
self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 2, "User3": 2})
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 3)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 3)

self.assertFalse(users_dispatcher._rebalance)

# Dispatch iteration 3
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
delta = time.perf_counter() - ts
self.assertTrue(sleep_time - _TOLERANCE <= delta <= sleep_time + _TOLERANCE, delta)
self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 4, "User3": 3})
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 5)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 4)


class TestAddWorker(unittest.TestCase):
def test_add_worker_during_ramp_up(self):
Expand Down Expand Up @@ -3176,6 +3243,88 @@ class User3(User):
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 3)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 3)

def test_add_worker_during_ramp_up_with_fixed_user(self):
class User1(User):
fixed_count = 2

class User2(User):
weight = 1

class User3(User):
weight = 1

user_classes = [User1, User2, User3]

worker_nodes = [
WorkerNode("hostname1_worker1"),
WorkerNode("hostname1_worker2"),
WorkerNode("hostname2_worker1"),
]

users_dispatcher = UsersDispatcher(worker_nodes=[worker_nodes[0], worker_nodes[2]], user_classes=user_classes)

sleep_time = 0.2 # Speed-up test

users_dispatcher.new_dispatch(target_user_count=11, spawn_rate=3)
users_dispatcher._wait_between_dispatch = sleep_time

# Dispatch iteration 1
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
delta = time.perf_counter() - ts
self.assertTrue(0 <= delta <= _TOLERANCE, delta)
self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 1, "User3": 0})
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 2)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 1)

# Dispatch iteration 2
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
delta = time.perf_counter() - ts
self.assertTrue(sleep_time - _TOLERANCE <= delta <= sleep_time + _TOLERANCE, delta)
self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 2, "User3": 2})
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 3)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 3)

self.assertFalse(users_dispatcher._rebalance)

users_dispatcher.add_worker(worker_nodes[1])

self.assertTrue(users_dispatcher._rebalance)

# Re-balance
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
delta = time.perf_counter() - ts
self.assertTrue(0 <= delta <= _TOLERANCE, f"Expected re-balance dispatch to be instantaneous but got {delta}s")
self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 2, "User3": 2})
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 2)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 2)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 2)

self.assertFalse(users_dispatcher._rebalance)

# Dispatch iteration 3
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
delta = time.perf_counter() - ts
self.assertTrue(sleep_time - _TOLERANCE <= delta <= sleep_time + _TOLERANCE, delta)
self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 4, "User3": 3})
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 3)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 3)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 3)

# Dispatch iteration 4
ts = time.perf_counter()
dispatched_users = next(users_dispatcher)
delta = time.perf_counter() - ts
self.assertTrue(sleep_time - _TOLERANCE <= delta <= sleep_time + _TOLERANCE, delta)
self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 5, "User3": 4})
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 4)
# without host-based balancing the following two values would be reversed
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 3)
self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 4)


class TestRampUpUsersFromZeroWithFixed(unittest.TestCase):
class RampUpCase:
Expand Down

0 comments on commit 61cb0cd

Please sign in to comment.