diff --git a/locust/runners.py b/locust/runners.py index 3655d79697..61d9446c24 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -749,32 +749,35 @@ def start(self, user_count: int, spawn_rate: float, wait=False) -> None: # when the user count is really at the desired value. timeout = gevent.Timeout(self._wait_for_workers_report_after_ramp_up()) timeout.start() + msg_prefix = "All users spawned" try: while self.user_count != self.target_user_count: - gevent.sleep() + gevent.sleep(0.01) except gevent.Timeout: - pass + msg_prefix = ( + "Spawning is complete and report waittime is expired, but not all reports recieved from workers" + ) finally: timeout.cancel() self.environment.events.spawning_complete.fire(user_count=sum(self.target_user_classes_count.values())) self.spawning_completed = True - logger.info("All users spawned: %s" % _format_user_classes_count_for_log(self.reported_user_classes_count)) + logger.info("%s: %s" % (msg_prefix, _format_user_classes_count_for_log(self.reported_user_classes_count))) @functools.lru_cache() def _wait_for_workers_report_after_ramp_up(self) -> float: """ The amount of time to wait after a ramp-up in order for all the workers to report their state - to the master. If not supplied by the user, it is 100ms by default. If the supplied value is a number, + to the master. If not supplied by the user, it is 1000ms by default. If the supplied value is a number, it is taken as-is. If the supplied value is a pattern like "some_number * WORKER_REPORT_INTERVAL", the value will be "some_number * WORKER_REPORT_INTERVAL". The most sensible value would be something like "1.25 * WORKER_REPORT_INTERVAL". However, some users might find it too high, so it is left - to a really small value of 100ms by default. + to a relatively small value of 1000ms by default. """ locust_wait_for_workers_report_after_ramp_up = os.getenv("LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP") if locust_wait_for_workers_report_after_ramp_up is None: - return 0.1 + return 1.0 match = re.search( r"^(?P(\d+)|(\d+\.\d+))[ ]*\*[ ]*WORKER_REPORT_INTERVAL$", diff --git a/locust/test/test_main.py b/locust/test/test_main.py index 2b007d966e..d83a024fb2 100644 --- a/locust/test/test_main.py +++ b/locust/test/test_main.py @@ -16,7 +16,7 @@ from locust.user import HttpUser, User, TaskSet from .mock_locustfile import mock_locustfile, MOCK_LOCUSTFILE_CONTENT from .testcases import LocustTestCase -from .util import temporary_file, get_free_tcp_port +from .util import temporary_file, get_free_tcp_port, patch_env class TestLoadLocustfile(LocustTestCase): @@ -879,3 +879,108 @@ def task2(self): self.assertNotIn("task2", stdout_worker) self.assertEqual(0, proc.returncode) self.assertEqual(0, proc_worker.returncode) + + def test_distributed(self): + LOCUSTFILE_CONTENT = textwrap.dedent( + """ + from locust import User, task, constant + + class User1(User): + wait_time = constant(1) + + @task + def t(self): + pass + """ + ) + with mock_locustfile(content=LOCUSTFILE_CONTENT) as mocked: + proc = subprocess.Popen( + [ + "locust", + "-f", + mocked.file_path, + "--headless", + "--master", + "--expect-workers", + "1", + "-u", + "3", + "-t", + "5s", + ], + stderr=STDOUT, + stdout=PIPE, + ) + proc_worker = subprocess.Popen( + [ + "locust", + "-f", + mocked.file_path, + "--worker", + ], + stderr=STDOUT, + stdout=PIPE, + ) + stdout = proc.communicate()[0].decode("utf-8") + proc_worker.communicate() + + self.assertIn('All users spawned: {"User1": 3} (3 total users)', stdout) + self.assertIn("Shutting down (exit code 0)", stdout) + + self.assertEqual(0, proc.returncode) + self.assertEqual(0, proc_worker.returncode) + + def test_distributed_report_timeout_expired(self): + LOCUSTFILE_CONTENT = textwrap.dedent( + """ + from locust import User, task, constant + + class User1(User): + wait_time = constant(1) + + @task + def t(self): + pass + """ + ) + with mock_locustfile(content=LOCUSTFILE_CONTENT) as mocked, patch_env( + "LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "0.01" + ) as _: + proc = subprocess.Popen( + [ + "locust", + "-f", + mocked.file_path, + "--headless", + "--master", + "--expect-workers", + "1", + "-u", + "3", + "-t", + "5s", + ], + stderr=STDOUT, + stdout=PIPE, + ) + proc_worker = subprocess.Popen( + [ + "locust", + "-f", + mocked.file_path, + "--worker", + ], + stderr=STDOUT, + stdout=PIPE, + ) + stdout = proc.communicate()[0].decode("utf-8") + proc_worker.communicate() + + self.assertIn( + 'Spawning is complete and report waittime is expired, but not all reports recieved from workers: {"User1": 2} (2 total users)', + stdout, + ) + self.assertIn("Shutting down (exit code 0)", stdout) + + self.assertEqual(0, proc.returncode) + self.assertEqual(0, proc_worker.returncode) diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index 483437251d..5859df15a1 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -48,6 +48,7 @@ task, ) from retry import retry +from .util import patch_env NETWORK_BROKEN = "network broken" @@ -756,7 +757,9 @@ def incr_stats(self): context={}, ) - with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3): + with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), patch_env( + "LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "0.1" + ): # start a Master runner options = parse_options(["--enable-rebalancing"]) master_env = Environment(user_classes=[TestUser], parsed_options=options) @@ -1173,7 +1176,7 @@ def tick(self): return None locust_worker_additional_wait_before_ready_after_stop = 5 - with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), _patch_env( + with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), patch_env( "LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP", str(locust_worker_additional_wait_before_ready_after_stop), ): @@ -1430,7 +1433,7 @@ def tick(self): user_class.weight = random.uniform(1, 20) locust_worker_additional_wait_before_ready_after_stop = 5 - with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), _patch_env( + with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), patch_env( "LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP", str(locust_worker_additional_wait_before_ready_after_stop), ): @@ -1548,7 +1551,7 @@ def tick(self): return None locust_worker_additional_wait_before_ready_after_stop = 2 - with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), _patch_env( + with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), patch_env( "LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP", str(locust_worker_additional_wait_before_ready_after_stop), ): @@ -1874,7 +1877,9 @@ class TestUser(User): def my_task(self): gevent.sleep(600) - with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: + with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server, patch_env( + "LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "0.1" + ): master = self.get_runner(user_classes=[TestUser]) server.mocked_send(Message("client_ready", __version__, "fake_client1")) server.mocked_send(Message("client_ready", __version__, "fake_client2")) @@ -1900,7 +1905,9 @@ class TestUser(User): def my_task(self): gevent.sleep(600) - with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: + with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server, patch_env( + "LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "0.1" + ): master = self.get_runner(user_classes=[TestUser]) server.mocked_send(Message("client_ready", __version__, "fake_client1")) server.mocked_send(Message("client_ready", __version__, "fake_client2")) @@ -2608,16 +2615,16 @@ def assert_cache_hits(): master = self.get_runner() master._wait_for_workers_report_after_ramp_up.cache_clear() - self.assertEqual(master._wait_for_workers_report_after_ramp_up(), 0.1) + self.assertEqual(master._wait_for_workers_report_after_ramp_up(), 1.0) assert_cache_hits() master._wait_for_workers_report_after_ramp_up.cache_clear() - with _patch_env("LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "5.7"): + with patch_env("LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "5.7"): self.assertEqual(master._wait_for_workers_report_after_ramp_up(), 5.7) assert_cache_hits() master._wait_for_workers_report_after_ramp_up.cache_clear() - with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=1.5), _patch_env( + with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=1.5), patch_env( "LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "5.7 * WORKER_REPORT_INTERVAL" ): self.assertEqual(master._wait_for_workers_report_after_ramp_up(), 5.7 * 1.5) @@ -2626,19 +2633,6 @@ def assert_cache_hits(): master._wait_for_workers_report_after_ramp_up.cache_clear() -@contextmanager -def _patch_env(name: str, value: str): - prev_value = os.getenv(name) - os.environ[name] = value - try: - yield - finally: - if prev_value is None: - del os.environ[name] - else: - os.environ[name] = prev_value - - class TestWorkerRunner(LocustTestCase): def setUp(self): super().setUp() diff --git a/locust/test/util.py b/locust/test/util.py index 8c822a18b2..1f6514c413 100644 --- a/locust/test/util.py +++ b/locust/test/util.py @@ -28,6 +28,19 @@ def temporary_file(content, suffix="_locustfile.py"): os.remove(f.name) +@contextmanager +def patch_env(name: str, value: str): + prev_value = os.getenv(name) + os.environ[name] = value + try: + yield + finally: + if prev_value is None: + del os.environ[name] + else: + os.environ[name] = prev_value + + def get_free_tcp_port(): """ Find an unused TCP port