Skip to content

Commit

Permalink
Merge pull request #2344 from locustio/add-worker_connect-event
Browse files Browse the repository at this point in the history
Add worker_connect event
  • Loading branch information
cyberw authored May 15, 2023
2 parents 908c414 + 10bb929 commit c24d0ce
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 3 deletions.
7 changes: 7 additions & 0 deletions locust/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ class Events:
:param data: Data dict with the data from the worker node
"""

worker_connect: EventHook
"""
Fired on master when a new worker connects
:param client_id: Client id of the connected worker
"""

spawning_complete: EventHook
"""
Fired when all simulated users has been spawned.
Expand Down
1 change: 1 addition & 0 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,7 @@ def client_listener(self) -> NoReturn:
logger.info(
f"Worker {client_id} (index {self.get_worker_index(client_id)}) reported as ready. {len(self.clients.ready + self.clients.running + self.clients.spawning)} workers connected."
)
self.environment.events.worker_connect.fire(client_id=msg.node_id)
if self.rebalancing_enabled() and self.state == STATE_RUNNING and self.spawning_completed:
self.start(self.target_user_count, self.spawn_rate)
# emit a warning if the worker's clock seem to be out of sync with our clock
Expand Down
15 changes: 12 additions & 3 deletions locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ class TestMasterWorkerRunners(LocustTestCase):
def test_distributed_integration_run(self):
"""
Full integration test that starts both a MasterRunner and three WorkerRunner instances
and makes sure that their stats is sent to the Master.
and makes sure that their stats is sent to the Master. Also validates worker_connect event
"""

class TestUser(User):
Expand All @@ -853,13 +853,19 @@ def incr_stats(self):
with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):
# start a Master runner
master_env = Environment(user_classes=[TestUser])
worker_connect_events = []

def on_connect(client_id):
worker_connect_events.append(client_id)

master_env.events.worker_connect.add_listener(on_connect)
master = master_env.create_master_runner("*", 0)
sleep(0)
# start 3 Worker runners
workers = []
for i in range(3):
worker_env = Environment(user_classes=[TestUser])
worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)
worker: WorkerRunner = worker_env.create_worker_runner("127.0.0.1", master.server.port)
workers.append(worker)

# give workers time to connect
Expand All @@ -873,9 +879,12 @@ def incr_stats(self):
# give time for users to generate stats, and stats to be sent to master
sleep(1)
master.quit()
# make sure users are killed

for worker in workers:
# make sure users are killed
self.assertEqual(0, worker.user_count)
# make sure events happened correctly
self.assertIn(worker.client_id, worker_connect_events)

# check that stats are present in master
self.assertGreater(
Expand Down

0 comments on commit c24d0ce

Please sign in to comment.