diff --git a/locust/test/test_main.py b/locust/test/test_main.py index 4d33224d86..67a4b93c4a 100644 --- a/locust/test/test_main.py +++ b/locust/test/test_main.py @@ -186,6 +186,7 @@ def __enter__(self): universal_newlines=True, bufsize=1, **self.kwargs, + env={**os.environ, "PYTHONUNBUFFERED": "1"}, ) self.stdout_reader = gevent.spawn(self._read_output, self.process.stdout) @@ -414,26 +415,25 @@ def test_help_arg(self): self.assertIn("--skip-log-setup", output) def test_custom_arguments(self): - """ - Test that custom command-line arguments are correctly parsed and accessible. - """ port = get_free_tcp_port() file_content = textwrap.dedent(""" from locust import User, task, constant, events - @events.init_command_line_parser.add_listener def _(parser, **kw): parser.add_argument("--custom-string-arg") - class TestUser(User): wait_time = constant(10) - @task def my_task(self): print(self.environment.parsed_options.custom_string_arg) """) - with self.create_temp_locustfile(content=file_content) as temp_file_path: + with NamedTemporaryFile(mode="w+", delete=False, suffix=".py") as temp_file: + temp_file.write(file_content) + temp_file.flush() + temp_file_path = temp_file.name + + try: args = [ sys.executable, "-m", @@ -446,12 +446,19 @@ def my_task(self): str(port), ] - with self.launch_locust(args) as manager: - # Verify the web interface is accessible by sending a POST request to /swarm - response = self.make_http_request( - port=port, - method="POST", - path="/swarm", + with PopenContextManager(args) as manager: + if not wait_for_output_condition_non_threading( + manager.process, manager.output_lines, "Starting Locust", timeout=30 + ): + self.fail("Timeout waiting for Locust to start.") + + if not wait_for_output_condition_non_threading( + manager.process, manager.output_lines, "Starting web interface at", timeout=30 + ): + self.fail("Timeout waiting for web interface to start.") + + response = requests.post( + f"http://127.0.0.1:{port}/swarm", data={ "user_count": 1, "spawn_rate": 1, @@ -460,60 +467,68 @@ def my_task(self): }, ) - self.assertEqual(response.status_code, 200, msg=f"Expected status code 200, got {response.status_code}") + self.assertEqual(response.status_code, 200) - # Combine all output lines for assertions combined_output = "\n".join(manager.output_lines) - - # Assertions self.assertRegex( combined_output, r".*Shutting down[\S\s]*Aggregated.*", "No stats table printed after shutting down" ) self.assertNotRegex( combined_output, r".*Aggregated[\S\s]*Shutting down.*", "Stats table printed BEFORE shutting down" ) - self.assertNotIn("command_line_value", combined_output, "Command line value should not be in output.") - self.assertIn("web_form_value", combined_output, "Web form value should be present in output.") + self.assertNotIn("command_line_value", combined_output) + self.assertIn("web_form_value", combined_output) + finally: + os.unlink(temp_file_path) @unittest.skipIf(os.name == "nt", reason="Signal handling on Windows is hard") def test_custom_exit_code(self): - """ - Test that Locust exits with a custom exit code as defined by event listeners. - """ port = get_free_tcp_port() file_content = textwrap.dedent(""" from locust import User, task, constant, events - @events.quitting.add_listener def _(environment, **kw): environment.process_exit_code = 42 - @events.quit.add_listener def _(exit_code, **kw): print(f"Exit code in quit event {exit_code}") - class TestUser(User): wait_time = constant(3) - @task def my_task(self): print("running my_task()") """) - with self.create_temp_locustfile(content=file_content) as temp_file_path: + with NamedTemporaryFile(mode="w+", delete=False, suffix=".py") as temp_file: + temp_file.write(file_content) + temp_file.flush() + temp_file_path = temp_file.name + + try: args = [sys.executable, "-m", "locust", "-f", temp_file_path, "--web-port", str(port)] - with self.launch_locust(args) as manager: - # Initiate graceful shutdown of Locust - self.terminate_locust(manager, shutdown_message="Shutting down (exit code 42)", timeout_shutdown=30) + with PopenContextManager(args) as manager: + if not wait_for_output_condition_non_threading( + manager.process, manager.output_lines, "Starting Locust", timeout=30 + ): + self.fail("Timeout waiting for Locust to start.") + + if not wait_for_output_condition_non_threading( + manager.process, manager.output_lines, "Starting web interface at", timeout=30 + ): + self.fail("Timeout waiting for web interface to start.") + + manager.process.send_signal(signal.SIGTERM) + + manager.process.wait(timeout=3) - # Combine all output lines for assertions combined_output = "\n".join(manager.output_lines) - # Assertions self.assertIn("Shutting down (exit code 42)", combined_output) self.assertIn("Exit code in quit event 42", combined_output) - self.assertEqual(42, manager.process.returncode, "Locust did not exit with the expected exit code 42.") + self.assertEqual(42, manager.process.returncode) + finally: + os.unlink(temp_file_path) @unittest.skipIf(os.name == "nt", reason="Signal handling on Windows is hard") def test_webserver_multiple_locustfiles(self): @@ -823,7 +838,7 @@ def my_task(self): with ( self.create_temp_locustfile(content=user_file_content) as mocked1, - self.create_temp_locustfile(content=shape_file_content) as mocked2, # mocked2 is a string + self.create_temp_locustfile(content=shape_file_content) as mocked2, ): port = get_free_tcp_port() @@ -831,7 +846,7 @@ def my_task(self): "-f", mocked1, "-f", - mocked2, # Use mocked2 directly as it's a string + mocked2, "--web-port", str(port), ] @@ -888,7 +903,7 @@ def my_task(self): def test_default_headless_spawn_options(self): with mock_locustfile() as mocked: - proc = subprocess.Popen( + with PopenContextManager( [ "locust", "-f", @@ -902,23 +917,29 @@ def test_default_headless_spawn_options(self): "DEBUG", "--exit-code-on-error", "0", - # just to test --stop-timeout argument parsing, doesnt actually validate its function: "--stop-timeout", "1s", ], - stdout=PIPE, - stderr=PIPE, - text=True, - env=os.environ.copy(), - ) - stdout, stderr = proc.communicate(timeout=4) - self.assertNotIn("Traceback", stderr) - self.assertIn('Spawning additional {"UserSubclass": 1} ({"UserSubclass": 0} already running)...', stderr) - self.assertEqual(0, proc.returncode) + ) as popen_ctx: + try: + popen_ctx.process.wait(timeout=4) + except subprocess.TimeoutExpired: + print("Process timed out") + + all_output = "\n".join(popen_ctx.output_lines) + + self.assertNotIn("Traceback", all_output) + + # Check for the specific message + expected_message = 'Spawning additional {"UserSubclass": 1} ({"UserSubclass": 0} already running)...' + self.assertIn(expected_message, all_output) + + # Check return code + self.assertEqual(0, popen_ctx.process.returncode) def test_invalid_stop_timeout_string(self): with mock_locustfile() as mocked: - proc = subprocess.Popen( + with PopenContextManager( [ "locust", "-f", @@ -928,14 +949,14 @@ def test_invalid_stop_timeout_string(self): "--stop-timeout", "asdf1", ], - stdout=PIPE, - stderr=PIPE, - text=True, - env=os.environ.copy(), - ) - _, stderr = proc.communicate() - self.assertIn("ERROR/locust.main: Valid --stop-timeout formats are", stderr) - self.assertEqual(1, proc.returncode) + ) as popen_ctx: + popen_ctx.process.wait() + + all_output = "\n".join(popen_ctx.output_lines) + + self.assertIn("ERROR/locust.main: Valid --stop-timeout formats are", all_output) + + self.assertEqual(1, popen_ctx.process.returncode) @unittest.skipIf(os.name == "nt", reason="Signal handling on Windows is hard") def test_headless_spawn_options_wo_run_time(self): @@ -945,6 +966,9 @@ def test_headless_spawn_options_wo_run_time(self): """ with mock_locustfile() as mocked: args = [ + sys.executable, + "-m", + "locust", "-f", mocked.file_path, "--host", @@ -954,67 +978,82 @@ def test_headless_spawn_options_wo_run_time(self): "0", ] - with run_locust_process(file_content=None, args=args, port=None) as manager: + with PopenContextManager(args) as manager: if not wait_for_output_condition_non_threading( - manager.proc, manager.output_lines, "All users spawned", timeout=30 + manager.process, manager.output_lines, "All users spawned", timeout=30 ): - manager.proc.terminate() - manager.proc.wait(timeout=5) + manager.process.terminate() + try: + manager.process.wait(timeout=5) + except subprocess.TimeoutExpired: + manager.process.kill() + manager.process.wait(timeout=5) self.fail("Timeout waiting for Locust to spawn all users.") - manager.proc.send_signal(signal.SIGTERM) + manager.process.send_signal(signal.SIGTERM) if not wait_for_output_condition_non_threading( - manager.proc, manager.output_lines, "Shutting down (exit code 0)", timeout=30 + manager.process, manager.output_lines, "Shutting down (exit code 0)", timeout=30 ): - manager.proc.terminate() - manager.proc.wait(timeout=5) + manager.process.terminate() + try: + manager.process.wait(timeout=5) + except subprocess.TimeoutExpired: + manager.process.kill() + manager.process.wait(timeout=5) self.fail("Timeout waiting for Locust to shut down.") try: - manager.proc.wait(timeout=5) + manager.process.wait(timeout=5) except subprocess.TimeoutExpired: - manager.proc.terminate() - manager.proc.wait(timeout=5) + manager.process.terminate() + manager.process.wait(timeout=5) self.fail("Locust process did not terminate gracefully after SIGTERM.") - combined_output = "\n".join(manager.output_lines) - print(f"Combined Locust Output:\n{combined_output}") + combined_output = "\n".join(manager.output_lines) + print(f"Combined Locust Output:\n{combined_output}") - self.assertIn("All users spawned", combined_output, msg="Expected 'All users spawned' not found in output.") - self.assertIn( - "Shutting down (exit code 0)", - combined_output, - msg="Expected 'Shutting down (exit code 0)' not found in output.", - ) - self.assertEqual( - 0, manager.proc.returncode, msg=f"Locust process exited with return code {manager.proc.returncode}" - ) - self.assertNotIn( - "Locust is running with the UserClass Picker Enabled", - combined_output, - msg="Unexpected 'UserClass Picker Enabled' message found in output.", - ) - self.assertNotIn("Traceback", combined_output, msg="Unexpected traceback found in output.") + self.assertIn("All users spawned", combined_output, msg="Expected 'All users spawned' not found in output.") + self.assertIn( + "Shutting down (exit code 0)", + combined_output, + msg="Expected 'Shutting down (exit code 0)' not found in output.", + ) + self.assertEqual( + 0, + manager.process.returncode, + msg=f"Locust process exited with return code {manager.process.returncode}", + ) + self.assertNotIn( + "Locust is running with the UserClass Picker Enabled", + combined_output, + msg="Unexpected 'UserClass Picker Enabled' message found in output.", + ) + self.assertNotIn("Traceback", combined_output, msg="Unexpected traceback found in output.") - @unittest.skipIf(os.name == "nt", reason="Signal handling on windows is hard") + @unittest.skipIf(os.name == "nt", reason="Signal handling on Windows is hard") def test_run_headless_with_multiple_locustfiles(self): + """ + Test Locust's headless mode with multiple Locustfiles. + Ensures that Locust starts, spawns users from multiple Locustfiles, and shuts down gracefully. + """ with TemporaryDirectory() as temp_dir: with mock_locustfile(dir=temp_dir): - with temporary_file( - content=textwrap.dedent( - """ - from locust import User, task, constant, events - class TestUser(User): - wait_time = constant(1) - @task - def my_task(self): - print("running my_task()") - """ - ), - dir=temp_dir, - ): + user_file_content = textwrap.dedent(""" + from locust import User, task, constant + + class TestUser(User): + wait_time = constant(1) + + @task + def my_task(self): + print("running my_task()") + """) + with self.create_temp_locustfile(content=user_file_content, dir=temp_dir): args = [ + sys.executable, + "-m", + "locust", "-f", temp_dir, "--headless", @@ -1024,28 +1063,36 @@ def my_task(self): "0", ] - with run_locust_process(file_content=None, args=args, port=None) as manager: + with PopenContextManager(args) as manager: if not wait_for_output_condition_non_threading( - manager.proc, manager.output_lines, "All users spawned", timeout=30 + manager.process, manager.output_lines, "All users spawned", timeout=30 ): - manager.proc.terminate() - manager.proc.wait(timeout=5) + manager.process.terminate() + try: + manager.process.wait(timeout=5) + except subprocess.TimeoutExpired: + manager.process.kill() + manager.process.wait(timeout=5) self.fail("Timeout waiting for Locust to spawn all users.") - manager.proc.send_signal(signal.SIGTERM) + manager.process.send_signal(signal.SIGTERM) if not wait_for_output_condition_non_threading( - manager.proc, manager.output_lines, "Shutting down (exit code 0)", timeout=30 + manager.process, manager.output_lines, "Shutting down (exit code 0)", timeout=30 ): - manager.proc.terminate() - manager.proc.wait(timeout=5) + manager.process.terminate() + try: + manager.process.wait(timeout=5) + except subprocess.TimeoutExpired: + manager.process.kill() + manager.process.wait(timeout=5) self.fail("Timeout waiting for Locust to shut down.") try: - manager.proc.wait(timeout=5) + manager.process.wait(timeout=5) except subprocess.TimeoutExpired: - manager.proc.terminate() - manager.proc.wait(timeout=5) + manager.process.terminate() + manager.process.wait(timeout=5) self.fail("Locust process did not terminate gracefully after SIGTERM.") combined_output = "\n".join(manager.output_lines) @@ -1061,20 +1108,29 @@ def my_task(self): ) self.assertEqual( 0, - manager.proc.returncode, - msg=f"Locust process exited with return code {manager.proc.returncode}.", + manager.process.returncode, + msg=f"Locust process exited with return code {manager.process.returncode}.", ) self.assertNotIn("Traceback", combined_output, msg="Unexpected traceback found in output.") - @unittest.skipIf(os.name == "nt", reason="Signal handling on windows is hard") + @unittest.skipIf(os.name == "nt", reason="Signal handling on Windows is hard") def test_default_headless_spawn_options_with_shape(self): """ Test Locust with default headless spawn options and a custom LoadTestShape. Ensures that Locust runs the shape and properly shuts down with exit code 0. """ - # Define the content for the custom LoadTestShape with a unique class name - content = MOCK_LOCUSTFILE_CONTENT + textwrap.dedent( - """ + MOCK_LOCUSTFILE_CONTENT = textwrap.dedent(""" + from locust import User, task, constant + + class TestUser(User): + wait_time = constant(1) + + @task + def my_task(self): + print("running my_task()") + """) + + shape_content = textwrap.dedent(""" from locust import LoadTestShape class MyLoadTestShape(LoadTestShape): @@ -1083,89 +1139,124 @@ def tick(self): if run_time < 2: return (10, 1) # (users, spawn rate) return None # Stop the test - """ - ) + """) - with mock_locustfile(content=content) as mocked: - args = ["-f", mocked.file_path, "--host", "https://test.com/", "--headless", "--exit-code-on-error", "0"] + combined_content = MOCK_LOCUSTFILE_CONTENT + shape_content - with run_locust_process(file_content=None, args=args, port=None) as manager: + with self.create_temp_locustfile(content=combined_content) as mocked: + args = [ + sys.executable, + "-m", + "locust", + "-f", + mocked, + "--host", + "https://test.com/", + "--headless", + "--exit-code-on-error", + "0", + ] + + with PopenContextManager(args) as manager: shape_update_message = "Shape test updating to 10 users at 1.00 spawn rate" if not wait_for_output_condition_non_threading( - manager.proc, manager.output_lines, shape_update_message, timeout=10 + manager.process, manager.output_lines, shape_update_message, timeout=10 ): - manager.proc.terminate() - manager.proc.wait(timeout=5) + manager.process.terminate() + try: + manager.process.wait(timeout=5) + except subprocess.TimeoutExpired: + manager.process.kill() + manager.process.wait(timeout=5) self.fail("Timeout waiting for Locust to run the shape test.") shutdown_message = "Shutting down (exit code 0)" if not wait_for_output_condition_non_threading( - manager.proc, manager.output_lines, shutdown_message, timeout=10 + manager.process, manager.output_lines, shutdown_message, timeout=10 ): print("Locust output after expected shutdown time:") print("\n".join(manager.output_lines)) - manager.proc.terminate() - manager.proc.wait(timeout=5) + manager.process.terminate() + try: + manager.process.wait(timeout=5) + except subprocess.TimeoutExpired: + manager.process.kill() + manager.process.wait(timeout=5) self.fail("Timeout waiting for Locust to shut down.") try: - manager.proc.wait(timeout=10) + manager.process.wait(timeout=10) except subprocess.TimeoutExpired: print("Locust did not terminate within the expected time.") - manager.proc.terminate() - manager.proc.wait(timeout=10) + manager.process.terminate() + manager.process.wait(timeout=10) self.fail("Locust process did not terminate gracefully after SIGTERM.") - combined_output = "\n".join(manager.output_lines) + combined_output = "\n".join(manager.output_lines) - self.assertIn(shape_update_message, combined_output, msg="Shape test output not found.") - self.assertRegex( - combined_output, - r".*Aggregated[\S\s]*Shutting down[\S\s]*Aggregated.*", - msg="Aggregated output not found before shutdown.", - ) - self.assertIn(shutdown_message, combined_output, msg="Expected shutdown message not found.") - self.assertEqual( - 0, manager.proc.returncode, msg=f"Locust process exited with return code {manager.proc.returncode}." - ) - self.assertNotIn("Traceback", combined_output, msg="Unexpected traceback found in output.") + self.assertIn(shape_update_message, combined_output, msg="Shape test output not found.") + self.assertRegex( + combined_output, + r".*Aggregated[\S\s]*Shutting down[\S\s]*Aggregated.*", + msg="Aggregated output not found before shutdown.", + ) + self.assertIn(shutdown_message, combined_output, msg="Expected shutdown message not found.") + self.assertEqual( + 0, + manager.process.returncode, + msg=f"Locust process exited with return code {manager.process.returncode}.", + ) + self.assertNotIn("Traceback", combined_output, msg="Unexpected traceback found in output.") @unittest.skipIf(os.name == "nt", reason="Signal handling on Windows is hard") def test_run_headless_with_multiple_locustfiles_with_shape(self): + """ + Test Locust's headless mode with multiple Locustfiles including a LoadTestShape. + Ensures that Locust starts, runs the shape, spawns users from multiple Locustfiles, and shuts down gracefully. + """ with TemporaryDirectory() as temp_dir: - with mock_locustfile( + with self.create_temp_locustfile( content=textwrap.dedent(""" from locust import User, task, between + class TestUser2(User): wait_time = between(2, 4) + @task def my_task(self): print("running my_task() again") """), dir=temp_dir, ) as mocked1: - with temporary_file( - content=textwrap.dedent(""" - from locust import User, task, between, LoadTestShape - class MyLoadTestShape(LoadTestShape): - def tick(self): - run_time = self.get_run_time() - if run_time < 2: - return (10, 1) # (users, spawn rate) - return None # Stop the test - - class TestUser(User): - wait_time = between(2, 4) - @task - def my_task(self): - print("running my_task()") - """), + shape_file_content = textwrap.dedent(""" + from locust import User, task, between, LoadTestShape + + class MyLoadTestShape(LoadTestShape): + def tick(self): + run_time = self.get_run_time() + if run_time < 2: + return (10, 1) # (users, spawn rate) + return None # Stop the test + + class TestUser(User): + wait_time = between(2, 4) + + @task + def my_task(self): + print("running my_task()") + """) + + with self.create_temp_locustfile( + content=shape_file_content, dir=temp_dir, ) as mocked2: args = [ + sys.executable, + "-m", + "locust", "-f", - f"{mocked1.file_path},{mocked2}", + f"{mocked1},{mocked2}", "--host", "https://test.com/", "--headless", @@ -1173,7 +1264,7 @@ def my_task(self): "0", ] - with run_locust_process(file_content=None, args=args, port=None) as manager: + with PopenContextManager(args) as manager: shape_start_message = "Shape test starting" shape_update_message = "Shape test updating to 10 users at 1.00 spawn rate" shape_stop_message = "Shape test stopping" @@ -1188,20 +1279,24 @@ def my_task(self): for message in messages_to_check: if not wait_for_output_condition_non_threading( - manager.proc, manager.output_lines, message, timeout=10 + manager.process, manager.output_lines, message, timeout=10 ): print(f"Locust output after expected '{message}' time:") print("\n".join(manager.output_lines)) - manager.proc.terminate() - manager.proc.wait(timeout=5) + manager.process.terminate() + try: + manager.process.wait(timeout=5) + except subprocess.TimeoutExpired: + manager.process.kill() + manager.process.wait(timeout=5) self.fail(f"Timeout waiting for Locust to output: {message}") try: - manager.proc.wait(timeout=10) + manager.process.wait(timeout=10) except subprocess.TimeoutExpired: print("Locust did not terminate within the expected time.") - manager.proc.terminate() - manager.proc.wait(timeout=10) + manager.process.terminate() + manager.process.wait(timeout=10) self.fail("Locust process did not terminate gracefully after SIGTERM.") combined_output = "\n".join(manager.output_lines) @@ -1215,16 +1310,24 @@ def my_task(self): ) self.assertEqual( 0, - manager.proc.returncode, - msg=f"Locust process exited with return code {manager.proc.returncode}.", + manager.process.returncode, + msg=f"Locust process exited with return code {manager.process.returncode}.", ) self.assertNotIn("Traceback", combined_output, msg="Unexpected traceback found in output.") @unittest.skipIf(os.name == "nt", reason="Signal handling on Windows is hard") def test_autostart_wo_run_time(self): + """ + Test Locust's autostart functionality without specifying run time. + Ensures that Locust starts automatically, informs about no run time limit, + and shuts down gracefully upon receiving a termination signal. + """ port = get_free_tcp_port() with mock_locustfile() as mocked: args = [ + sys.executable, # Use the Python interpreter + "-m", + "locust", "-f", mocked.file_path, "--web-port", @@ -1232,7 +1335,8 @@ def test_autostart_wo_run_time(self): "--autostart", ] - with run_locust_process(file_content=None, args=args, port=port) as manager: + with PopenContextManager(args) as manager: + # Define the messages to check in the output start_message = "Starting Locust" no_run_time_message = "No run time limit set, use CTRL+C to interrupt" @@ -1241,66 +1345,96 @@ def test_autostart_wo_run_time(self): no_run_time_message, ] + # Wait for each message sequentially for message in messages_to_check: if not wait_for_output_condition_non_threading( - manager.proc, manager.output_lines, message, timeout=10 + manager.process, manager.output_lines, message, timeout=10 ): print(f"Locust output after expected '{message}' time:") print("\n".join(manager.output_lines)) - manager.proc.terminate() - manager.proc.wait(timeout=5) + manager.process.terminate() + try: + manager.process.wait(timeout=5) + except subprocess.TimeoutExpired: + manager.process.kill() + manager.process.wait(timeout=5) self.fail(f"Timeout waiting for Locust to output: {message}") - response = requests.get(f"http://localhost:{port}/") - self.assertEqual(200, response.status_code) + # Make an HTTP GET request to verify the web interface is accessible + try: + response = self.make_http_request(f"{port}", method="GET", path="/") + self.assertEqual(200, response.status_code, "Locust web interface did not return status code 200.") + except requests.exceptions.RequestException as e: + manager.process.terminate() + self.fail(f"Failed to connect to Locust web interface: {e}") - manager.proc.send_signal(signal.SIGTERM) + # Send SIGTERM to initiate graceful shutdown + manager.process.send_signal(signal.SIGTERM) + # Define the shutdown message to wait for shutdown_message = "Shutting down" + if not wait_for_output_condition_non_threading( - manager.proc, manager.output_lines, shutdown_message, timeout=10 + manager.process, manager.output_lines, shutdown_message, timeout=10 ): print("Locust output after expected shutdown time:") print("\n".join(manager.output_lines)) - manager.proc.terminate() - manager.proc.wait(timeout=5) + manager.process.terminate() + try: + manager.process.wait(timeout=5) + except subprocess.TimeoutExpired: + manager.process.kill() + manager.process.wait(timeout=5) self.fail("Timeout waiting for Locust to shut down.") + # Wait for the process to terminate gracefully try: - manager.proc.wait(timeout=10) + manager.process.wait(timeout=10) except subprocess.TimeoutExpired: print("Locust did not terminate within the expected time.") - manager.proc.terminate() - manager.proc.wait(timeout=10) + manager.process.terminate() + manager.process.wait(timeout=10) self.fail("Locust process did not terminate gracefully after SIGTERM.") + # Combine all output lines for assertions combined_output = "\n".join(manager.output_lines) + # Assertions self.assertIn(start_message, combined_output, msg="Start message not found in output.") self.assertIn(no_run_time_message, combined_output, msg="No run time message not found in output.") self.assertIn(shutdown_message, combined_output, msg="Shutdown message not found in output.") self.assertNotIn("Traceback", combined_output, msg="Unexpected traceback found in output.") + # Check response content using PyQuery d = pq(response.content.decode("utf-8")) self.assertIn('"state": "running"', str(d), msg="Expected 'running' state not found in response.") @unittest.skipIf(sys.platform == "darwin", reason="This is too messy on macOS") def test_autostart_w_run_time(self): + """ + Test Locust's autostart functionality with a specified run time. + Ensures that Locust starts automatically, applies the run time limit, + and shuts down gracefully after the run time. + """ port = get_free_tcp_port() with mock_locustfile() as mocked: args = [ + sys.executable, # Use the Python interpreter + "-m", + "locust", "-f", mocked.file_path, "--web-port", str(port), "-t", - "3", + "3", # Run time of 3 seconds "--autostart", "--autoquit", "1", ] - with run_locust_process(file_content=None, args=args, port=port) as manager: + with PopenContextManager(args) as manager: + # Define the messages to check in the output start_message = "Starting Locust" run_time_message = "Run time limit set to 3 seconds" shutdown_message = "Shutting down" @@ -1310,72 +1444,115 @@ def test_autostart_w_run_time(self): run_time_message, ] + # Wait for each message sequentially for message in messages_to_check: if not wait_for_output_condition_non_threading( - manager.proc, manager.output_lines, message, timeout=10 + manager.process, manager.output_lines, message, timeout=10 ): print(f"Locust output after expected '{message}' time:") print("\n".join(manager.output_lines)) - manager.proc.terminate() - manager.proc.wait(timeout=5) + manager.process.terminate() + try: + manager.process.wait(timeout=5) + except subprocess.TimeoutExpired: + manager.process.kill() + manager.process.wait(timeout=5) self.fail(f"Timeout waiting for Locust to output: {message}") - response = requests.get(f"http://localhost:{port}/") - self.assertEqual(200, response.status_code) + # Make an HTTP GET request to verify the web interface is accessible + try: + response = self.make_http_request(f"{port}", method="GET", path="/") + self.assertEqual(200, response.status_code, "Locust web interface did not return status code 200.") + except requests.exceptions.RequestException as e: + manager.process.terminate() + self.fail(f"Failed to connect to Locust web interface: {e}") + # Wait for the shutdown message indicating autoquit if not wait_for_output_condition_non_threading( - manager.proc, manager.output_lines, shutdown_message, timeout=10 + manager.process, manager.output_lines, shutdown_message, timeout=10 ): print("Locust output after expected shutdown time:") print("\n".join(manager.output_lines)) - manager.proc.terminate() - manager.proc.wait(timeout=5) + manager.process.terminate() + try: + manager.process.wait(timeout=5) + except subprocess.TimeoutExpired: + manager.process.kill() + manager.process.wait(timeout=5) self.fail("Timeout waiting for Locust to shut down.") + # Wait for the process to terminate gracefully try: - manager.proc.wait(timeout=10) + manager.process.wait(timeout=10) except subprocess.TimeoutExpired: print("Locust did not terminate within the expected time.") - manager.proc.terminate() - manager.proc.wait(timeout=10) + manager.process.terminate() + manager.process.wait(timeout=10) self.fail("Locust process did not terminate gracefully.") + # Combine all output lines for assertions combined_output = "\n".join(manager.output_lines) + # Assertions self.assertIn(start_message, combined_output, msg="Start message not found in output.") self.assertIn(run_time_message, combined_output, msg="Run time message not found in output.") self.assertIn(shutdown_message, combined_output, msg="Shutdown message not found in output.") self.assertNotIn("Traceback", combined_output, msg="Unexpected traceback found in output.") - # Check response content + # Check response content using PyQuery d = pq(response.content.decode("utf-8")) self.assertIn('"state": "running"', str(d), msg="Expected 'running' state not found in response.") + # Verify exit code self.assertEqual( - 1, manager.proc.returncode, msg=f"Locust process exited with return code {manager.proc.returncode}." + 0, + manager.process.returncode, + msg=f"Locust process exited with return code {manager.process.returncode}.", ) @unittest.skipIf(os.name == "nt", reason="Signal handling on Windows is hard") def test_run_autostart_with_multiple_locustfiles(self): + """ + Test Locust's autostart functionality with multiple Locustfiles. + Ensures that Locust starts automatically, spawns users from multiple Locustfiles, + and shuts down gracefully. + """ with TemporaryDirectory() as temp_dir: - with mock_locustfile(dir=temp_dir): - with temporary_file( - content=textwrap.dedent( - """ - from locust import User, task, constant, events + # Create the first mock Locustfile in the temporary directory + with self.create_temp_locustfile( + content=textwrap.dedent(""" + from locust import User, task, constant + class TestUser(User): wait_time = constant(1) + @task def my_task(self): print("running my_task()") - """ - ), + """), + dir=temp_dir, + ) as mocked1: + # Create the second mock Locustfile in the same directory + with self.create_temp_locustfile( + content=textwrap.dedent(""" + from locust import User, task, constant + + class UserSubclass(User): + wait_time = constant(1) + + @task + def my_task(self): + print("running my_task()") + """), dir=temp_dir, - ): + ) as mocked2: port = get_free_tcp_port() args = [ + sys.executable, # Use the Python interpreter + "-m", + "locust", "-f", - temp_dir, + f"{mocked1},{mocked2}", # Pass multiple Locustfile paths separated by comma "--autostart", "-u", "2", @@ -1385,7 +1562,8 @@ def my_task(self): str(port), ] - with run_locust_process(file_content=None, args=args, port=port) as manager: + with PopenContextManager(args) as manager: + # Define the messages to check in the output start_message = "Starting Locust" all_users_spawned_message = "All users spawned:" user_count_messages = ['"TestUser": 1', '"UserSubclass": 1'] @@ -1399,33 +1577,47 @@ def my_task(self): task_running_message, ] + # Wait for each message sequentially for message in messages_to_check: if not wait_for_output_condition_non_threading( - manager.proc, manager.output_lines, message, timeout=10 + manager.process, manager.output_lines, message, timeout=10 ): print(f"Locust output after expected '{message}' time:") - manager.proc.terminate() - manager.proc.wait(timeout=5) + manager.process.terminate() + try: + manager.process.wait(timeout=5) + except subprocess.TimeoutExpired: + manager.process.kill() + manager.process.wait(timeout=5) self.fail(f"Timeout waiting for Locust to output: {message}") - manager.proc.send_signal(signal.SIGTERM) + # Send SIGTERM to initiate graceful shutdown + manager.process.send_signal(signal.SIGTERM) + # Wait for the shutdown message if not wait_for_output_condition_non_threading( - manager.proc, manager.output_lines, shutdown_message, timeout=10 + manager.process, manager.output_lines, shutdown_message, timeout=10 ): - manager.proc.terminate() - manager.proc.wait(timeout=5) + manager.process.terminate() + try: + manager.process.wait(timeout=5) + except subprocess.TimeoutExpired: + manager.process.kill() + manager.process.wait(timeout=5) self.fail("Timeout waiting for Locust to shut down.") + # Wait for the process to terminate gracefully try: - manager.proc.wait(timeout=10) + manager.process.wait(timeout=10) except subprocess.TimeoutExpired: - manager.proc.terminate() - manager.proc.wait(timeout=10) + manager.process.terminate() + manager.process.wait(timeout=10) self.fail("Locust process did not terminate gracefully after SIGTERM.") + # Combine all output lines for assertions combined_output = "\n".join(manager.output_lines) + # Assertions for message in messages_to_check + [shutdown_message]: self.assertIn( message, combined_output, msg=f"Expected message '{message}' not found in output." @@ -1434,20 +1626,27 @@ def my_task(self): self.assertNotIn("Traceback", combined_output, msg="Unexpected traceback found in output.") self.assertEqual( 0, - manager.proc.returncode, - msg=f"Locust process exited with return code {manager.proc.returncode}.", + manager.process.returncode, + msg=f"Locust process exited with return code {manager.process.returncode}.", ) @unittest.skipIf(os.name == "nt", reason="Signal handling on Windows is hard") def test_autostart_w_load_shape(self): """ Test Locust's autostart functionality with a custom LoadTestShape. - Ensures that Locust starts automatically, applies the load shape, and shuts down after autoquit time. + Ensures that Locust starts automatically, applies the load shape, + and shuts down gracefully after the autoquit time. """ port = get_free_tcp_port() - locustfile_content = MOCK_LOCUSTFILE_CONTENT + textwrap.dedent( - """ - from locust import LoadTestShape + locustfile_content = textwrap.dedent(""" + from locust import User, task, constant, LoadTestShape + + class TestUser(User): + wait_time = constant(1) + + @task + def my_task(self): + print("running my_task()") class LoadTestShape(LoadTestShape): def tick(self): @@ -1455,11 +1654,13 @@ def tick(self): if run_time < 2: return (10, 1) # (users, spawn rate) return None # Stop the test - """ - ) + """) with mock_locustfile(content=locustfile_content) as mocked: args = [ + sys.executable, # Use the Python interpreter + "-m", + "locust", "-f", mocked.file_path, "--web-port", @@ -1469,59 +1670,84 @@ def tick(self): "3", ] - with run_locust_process(file_content=None, args=args, port=port) as manager: + with PopenContextManager(args) as manager: + # Wait for the web interface to start if not wait_for_output_condition_non_threading( - manager.proc, manager.output_lines, "Starting web interface at", timeout=30 + manager.process, manager.output_lines, "Starting web interface at", timeout=30 ): - manager.proc.terminate() + manager.process.terminate() self.fail("Timeout waiting for Locust web interface to start.") + # Make an HTTP GET request to verify the web interface is accessible try: - response = requests.get(f"http://localhost:{port}/", timeout=10) + response = self.make_http_request(f"{port}", method="GET", path="/", timeout=10) self.assertEqual(200, response.status_code, "Locust web interface did not return status code 200.") except requests.exceptions.RequestException as e: - manager.proc.terminate() + manager.process.terminate() self.fail(f"Failed to connect to Locust web interface: {e}") + # Wait for Locust to apply the load shape and autoquit + if not wait_for_output_condition_non_threading( + manager.process, manager.output_lines, "Starting Locust", timeout=30 + ): + manager.process.terminate() + self.fail("Timeout waiting for Locust to start.") + + # Wait for Locust to shutdown after autoquit time + if not wait_for_output_condition_non_threading( + manager.process, manager.output_lines, "Shutting down (exit code 0)", timeout=50 + ): + manager.process.terminate() + self.fail("Timeout waiting for Locust to shut down after autoquit time.") + + # Wait for the process to terminate gracefully try: - manager.proc.wait(timeout=50) - success = True + manager.process.wait(timeout=10) except subprocess.TimeoutExpired: - success = False - manager.proc.send_signal(signal.SIGTERM) - try: - manager.proc.wait(timeout=5) - except subprocess.TimeoutExpired: - manager.proc.kill() - manager.proc.wait() - self.fail("Locust process did not terminate within the expected time.") + print("Locust did not terminate within the expected time.") + manager.process.terminate() + manager.process.wait(timeout=10) + self.fail("Locust process did not terminate gracefully after autoquit.") - combined_output = "\n".join(manager.output_lines) + # Combine all output lines for assertions + combined_output = "\n".join(manager.output_lines) - self.assertIn("Starting Locust", combined_output, "Expected 'Starting Locust' not found in output.") - self.assertIn( - "Shape test starting", combined_output, "Expected 'Shape test starting' not found in output." - ) - self.assertIn("Shutting down", combined_output, "Expected 'Shutting down' not found in output.") - self.assertIn( - "autoquit time reached", combined_output, "Expected 'autoquit time reached' not found in output." - ) - self.assertTrue(success, "Locust process did not terminate successfully.") + # Assertions + self.assertIn("Starting Locust", combined_output, msg="Expected 'Starting Locust' not found in output.") + self.assertIn( + "Shape test starting", combined_output, msg="Expected 'Shape test starting' not found in output." + ) + self.assertIn("Shutting down", combined_output, msg="Expected 'Shutting down' not found in output.") + self.assertIn( + "autoquit time reached", combined_output, msg="Expected 'autoquit time reached' not found in output." + ) + self.assertEqual( + 0, + manager.process.returncode, + msg=f"Locust process exited with return code {manager.process.returncode}.", + ) + self.assertNotIn("Traceback", combined_output, msg="Unexpected traceback found in output.") + @unittest.skipIf(os.name == "nt", reason="Signal handling on Windows is hard") def test_autostart_multiple_locustfiles_with_shape(self): - content1 = textwrap.dedent( - """ + """ + Test Locust's autostart functionality with multiple Locustfiles including a LoadTestShape. + Ensures that Locust starts automatically, applies the load shape, spawns users from multiple Locustfiles, + and shuts down gracefully after the autoquit time. + """ + content1 = textwrap.dedent(""" from locust import User, task, between + class TestUser2(User): wait_time = between(2, 4) + @task def my_task(self): print("running my_task() again") - """ - ) - content2 = textwrap.dedent( - """ + """) + content2 = textwrap.dedent(""" from locust import User, task, between, LoadTestShape + class CustomLoadTestShape(LoadTestShape): def tick(self): run_time = self.get_run_time() @@ -1531,11 +1757,11 @@ def tick(self): class TestUser(User): wait_time = between(2, 4) + @task def my_task(self): print("running my_task()") - """ - ) + """) expected_outputs = [ "Starting web interface at", @@ -1545,54 +1771,77 @@ def my_task(self): "Starting Locust", ] - with mock_locustfile(content=content1) as mocked1, mock_locustfile(content=content2) as mocked2: - port = get_free_tcp_port() - args = [ - "-f", - f"{mocked1.file_path},{mocked2.file_path}", - "--autostart", - "--autoquit", - "3", - "--web-port", - str(port), - ] - - with run_locust_process(file_content=None, args=args, port=port) as manager: - try: - if not wait_for_output_condition_non_threading( - manager.proc, manager.output_lines, expected_outputs[0], timeout=60 - ): - self.fail(f"Timeout waiting for: {expected_outputs[0]}") - - for output in expected_outputs[1:]: - if not wait_for_output_condition_non_threading( - manager.proc, manager.output_lines, output, timeout=60 - ): - self.fail(f"Timeout waiting for: {output}") - - except Exception as e: - print(f"Test failed with exception: {e}") - print("Process output:") - print("\n".join(manager.output_lines)) - raise + with TemporaryDirectory() as temp_dir: + # Create the first mock Locustfile (TestUser2) in the temporary directory + with self.create_temp_locustfile(content=content1, dir=temp_dir) as mocked1: + # Create the second mock Locustfile with LoadTestShape (TestUser and CustomLoadTestShape) + with self.create_temp_locustfile(content=content2, dir=temp_dir) as mocked2: + port = get_free_tcp_port() + args = [ + sys.executable, # Use the Python interpreter + "-m", + "locust", + "-f", + f"{mocked1},{mocked2}", # Pass multiple Locustfile paths separated by comma + "--autostart", + "--autoquit", + "3", + "--web-port", + str(port), + ] - finally: - # Ensure process is terminated - if manager.proc.poll() is None: - manager.proc.terminate() + with PopenContextManager(args) as manager: try: - manager.proc.wait(timeout=5) - except subprocess.TimeoutExpired: - manager.proc.kill() - + # Wait for each expected message sequentially + for output in expected_outputs: + if not wait_for_output_condition_non_threading( + manager.process, manager.output_lines, output, timeout=60 + ): + self.fail(f"Timeout waiting for: {output}") + + except Exception as e: + print(f"Test failed with exception: {e}") + print("Process output:") + print("\n".join(manager.output_lines)) + raise + + finally: + # Ensure process is terminated + if manager.process.poll() is None: + manager.process.terminate() + try: + manager.process.wait(timeout=5) + except subprocess.TimeoutExpired: + manager.process.kill() + + # Combine all output lines for assertions combined_output = "\n".join(manager.output_lines) + # Assertions for output in expected_outputs: self.assertIn(output, combined_output, f"Expected output not found: {output}") + # Verify that both user tasks are present + self.assertIn("running my_task()", combined_output, msg="TestUser task output not found.") + self.assertIn("running my_task() again", combined_output, msg="TestUser2 task output not found.") + + # Ensure no tracebacks are present + self.assertNotIn("Traceback", combined_output, msg="Unexpected traceback found in output.") + + # Verify exit code + self.assertEqual( + 0, + manager.process.returncode, + msg=f"Locust process exited with return code {manager.process.returncode}.", + ) + @unittest.skipIf(platform.system() == "Darwin", reason="Messy on macOS on GH") @unittest.skipIf(os.name == "nt", reason="Signal handling on windows is hard") def test_web_options(self): + """ + Test Locust's web host options. + Ensures that Locust starts with different web host configurations and the web interface is accessible. + """ expected_outputs = [ "Starting web interface at", "Starting Locust", @@ -1601,49 +1850,156 @@ def test_web_options(self): port = get_free_tcp_port() if platform.system() != "Darwin": - # MacOS only sets up the loopback interface for 127.0.0.1 and not for 127.*.*.*, so we can't test this with mock_locustfile() as mocked: - args = ["-f", mocked.file_path, "--web-host", "127.0.0.2", "--web-port", str(port)] + args = [ + sys.executable, + "-m", + "locust", + "-f", + mocked.file_path, + "--web-host", + "127.0.0.2", + "--web-port", + str(port), + ] - with run_locust_process(file_content=None, args=args, port=port) as manager: - try: - for output in expected_outputs: - if not wait_for_output_condition_non_threading( - manager.proc, manager.output_lines, output, timeout=30 - ): - self.fail(f"Timeout waiting for: {output}") + with PopenContextManager(args) as manager: + for output in expected_outputs: + if not wait_for_output_condition_non_threading( + manager.process, manager.output_lines, output, timeout=30 + ): + print(f"Locust output after expected '{output}' time:") + print("\n".join(manager.output_lines)) + manager.process.terminate() + try: + manager.process.wait(timeout=5) + except subprocess.TimeoutExpired: + manager.process.kill() + manager.process.wait(timeout=5) + self.fail(f"Timeout waiting for Locust to output: {output}") + try: response = requests.get(f"http://127.0.0.2:{port}/", timeout=1) - self.assertEqual(200, response.status_code) + self.assertEqual( + 200, response.status_code, "Locust web interface did not return status code 200." + ) except requests.exceptions.RequestException as e: self.fail(f"Failed to connect to Locust web interface: {e}") - finally: - manager.proc.terminate() - manager.proc.wait(timeout=5) # Test with --web-host="*" with mock_locustfile() as mocked: - args = ["-f", mocked.file_path, "--web-host", "*", "--web-port", str(port)] + args = [ + sys.executable, + "-m", + "locust", + "-f", + mocked.file_path, + "--web-host", + "*", + "--web-port", + str(port), + ] + + with PopenContextManager(args) as manager: + for output in expected_outputs: + if not wait_for_output_condition_non_threading( + manager.process, manager.output_lines, output, timeout=30 + ): + print(f"Locust output after expected '{output}' time:") + print("\n".join(manager.output_lines)) + manager.process.terminate() + try: + manager.process.wait(timeout=5) + except subprocess.TimeoutExpired: + manager.process.kill() + manager.process.wait(timeout=5) + self.fail(f"Timeout waiting for Locust to output: {output}") - with run_locust_process(file_content=None, args=args, port=port) as manager: try: + response = requests.get(f"http://127.0.0.1:{port}/", timeout=1) + self.assertEqual(200, response.status_code, "Locust web interface did not return status code 200.") + except requests.exceptions.RequestException as e: + self.fail(f"Failed to connect to Locust web interface: {e}") + + all_output_lines = [] + + if platform.system() != "Darwin": + with mock_locustfile() as mocked: + args = [ + sys.executable, + "-m", + "locust", + "-f", + mocked.file_path, + "--web-host", + "127.0.0.2", + "--web-port", + str(port), + ] + + with PopenContextManager(args) as manager: for output in expected_outputs: if not wait_for_output_condition_non_threading( - manager.proc, manager.output_lines, output, timeout=30 + manager.process, manager.output_lines, output, timeout=30 ): - self.fail(f"Timeout waiting for: {output}") + print(f"Locust output after expected '{output}' time:") + print("\n".join(manager.output_lines)) + manager.process.terminate() + try: + manager.process.wait(timeout=5) + except subprocess.TimeoutExpired: + manager.process.kill() + manager.process.wait(timeout=5) + self.fail(f"Timeout waiting for Locust to output: {output}") + + try: + response = requests.get(f"http://127.0.0.2:{port}/", timeout=1) + self.assertEqual( + 200, response.status_code, "Locust web interface did not return status code 200." + ) + except requests.exceptions.RequestException as e: + self.fail(f"Failed to connect to Locust web interface: {e}") + + all_output_lines.extend(manager.output_lines) + + # Second run with --web-host="*" + with mock_locustfile() as mocked: + args = [ + sys.executable, + "-m", + "locust", + "-f", + mocked.file_path, + "--web-host", + "*", + "--web-port", + str(port), + ] + with PopenContextManager(args) as manager: + for output in expected_outputs: + if not wait_for_output_condition_non_threading( + manager.process, manager.output_lines, output, timeout=30 + ): + print(f"Locust output after expected '{output}' time:") + print("\n".join(manager.output_lines)) + manager.process.terminate() + try: + manager.process.wait(timeout=5) + except subprocess.TimeoutExpired: + manager.process.kill() + manager.process.wait(timeout=5) + self.fail(f"Timeout waiting for Locust to output: {output}") + + try: response = requests.get(f"http://127.0.0.1:{port}/", timeout=1) - self.assertEqual(200, response.status_code) + self.assertEqual(200, response.status_code, "Locust web interface did not return status code 200.") except requests.exceptions.RequestException as e: self.fail(f"Failed to connect to Locust web interface: {e}") - finally: - manager.proc.terminate() - manager.proc.wait(timeout=5) - # Check all expected outputs are in the combined output - combined_output = "\n".join(manager.output_lines) - print(f"Combined Locust Output:\n{combined_output}") + all_output_lines.extend(manager.output_lines) + + combined_output = "\n".join(all_output_lines) for output in expected_outputs: self.assertIn(output, combined_output, f"Expected output not found: {output}")