From 10bb929a02f19fe393342a042f91a8fea83c51c3 Mon Sep 17 00:00:00 2001 From: Lars Holmberg Date: Mon, 15 May 2023 09:42:29 +0200 Subject: [PATCH] Add worker_connect event. It will be used by locust-plugins Timescale listener. --- locust/event.py | 7 +++++++ locust/runners.py | 1 + locust/test/test_runners.py | 15 ++++++++++++--- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/locust/event.py b/locust/event.py index fc1f9be4e8..c9e49fedae 100644 --- a/locust/event.py +++ b/locust/event.py @@ -111,6 +111,13 @@ class Events: :param data: Data dict with the data from the worker node """ + worker_connect: EventHook + """ + Fired on master when a new worker connects + + :param client_id: Client id of the connected worker + """ + spawning_complete: EventHook """ Fired when all simulated users has been spawned. diff --git a/locust/runners.py b/locust/runners.py index deda158e11..854295cc1d 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -1036,6 +1036,7 @@ def client_listener(self) -> NoReturn: logger.info( f"Worker {client_id} (index {self.get_worker_index(client_id)}) reported as ready. {len(self.clients.ready + self.clients.running + self.clients.spawning)} workers connected." ) + self.environment.events.worker_connect.fire(client_id=msg.node_id) if self.rebalancing_enabled() and self.state == STATE_RUNNING and self.spawning_completed: self.start(self.target_user_count, self.spawn_rate) # emit a warning if the worker's clock seem to be out of sync with our clock diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index 2042d32dcb..f687f214ec 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -833,7 +833,7 @@ class TestMasterWorkerRunners(LocustTestCase): def test_distributed_integration_run(self): """ Full integration test that starts both a MasterRunner and three WorkerRunner instances - and makes sure that their stats is sent to the Master. + and makes sure that their stats is sent to the Master. Also validates worker_connect event """ class TestUser(User): @@ -853,13 +853,19 @@ def incr_stats(self): with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3): # start a Master runner master_env = Environment(user_classes=[TestUser]) + worker_connect_events = [] + + def on_connect(client_id): + worker_connect_events.append(client_id) + + master_env.events.worker_connect.add_listener(on_connect) master = master_env.create_master_runner("*", 0) sleep(0) # start 3 Worker runners workers = [] for i in range(3): worker_env = Environment(user_classes=[TestUser]) - worker = worker_env.create_worker_runner("127.0.0.1", master.server.port) + worker: WorkerRunner = worker_env.create_worker_runner("127.0.0.1", master.server.port) workers.append(worker) # give workers time to connect @@ -873,9 +879,12 @@ def incr_stats(self): # give time for users to generate stats, and stats to be sent to master sleep(1) master.quit() - # make sure users are killed + for worker in workers: + # make sure users are killed self.assertEqual(0, worker.user_count) + # make sure events happened correctly + self.assertIn(worker.client_id, worker_connect_events) # check that stats are present in master self.assertGreater(