Skip to content

Commit

Permalink
Merge pull request #2033 from marcinh/feature/2031_test_stopping_feature
Browse files Browse the repository at this point in the history
Add test_stopping event
  • Loading branch information
cyberw authored Mar 1, 2022
2 parents dc8edc6 + 5dd6df4 commit 90c46ee
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 38 deletions.
14 changes: 10 additions & 4 deletions examples/test_data_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
# 4. User start
# 5. Inside a task
# ...
# 6. User stop
# 7. Test run stop
# 8. (not shown in this example) Locust quit
# 6. Test run stopping
# 7. User stop
# 8. Test run stop
# 9. (not shown in this example) Locust quit
#
# try it out by running:
# locust -f test_data_management.py --headless -u 2 -t 5
Expand Down Expand Up @@ -60,11 +61,16 @@ def _(environment, **_kwargs):
).json()["data"]


@events.test_stop.add_listener
@events.test_stopping.add_listener
def _(environment, **_kwargs):
print("stopping test run")


@events.test_stop.add_listener
def _(environment, **_kwargs):
print("test run stopped")


class MyUser(HttpUser):
host = "https://postman-echo.com"
wait_time = constant(180) # be nice to postman-echo
Expand Down
5 changes: 5 additions & 0 deletions locust/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ class Events:
users change during a test.
"""

test_stopping: EventHook
"""
Fired on each node when a load test is about to stop - before stopping users.
"""

test_stop: EventHook
"""
Fired on each node when a load test is stopped.
Expand Down
2 changes: 2 additions & 0 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ def stop(self):
if self.state == STATE_STOPPED:
return
logger.debug("Stopping all users")
self.environment.events.test_stopping.fire(environment=self.environment)
self.final_user_classes_count = {**self.user_classes_count}
self.update_state(STATE_CLEANUP)

Expand Down Expand Up @@ -793,6 +794,7 @@ def _wait_for_workers_report_after_ramp_up(self) -> float:
def stop(self, send_stop_to_client: bool = True):
if self.state not in [STATE_INIT, STATE_STOPPED, STATE_STOPPING]:
logger.debug("Stopping...")
self.environment.events.test_stopping.fire(environment=self.environment)
self.final_user_classes_count = {**self.reported_user_classes_count}
self.update_state(STATE_STOPPING)

Expand Down
127 changes: 93 additions & 34 deletions locust/test/test_runners.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import json
import os
import random
import time
import unittest
from collections import defaultdict
from contextlib import contextmanager
from operator import itemgetter

import gevent
Expand Down Expand Up @@ -116,7 +114,22 @@ class HeyAnException(Exception):
pass


class TestLocustRunner(LocustTestCase):
class LocustRunnerTestCase(LocustTestCase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.runner_stopping = False
self.runner_stopped = False

def setUp(self):
super().setUp()
self.reset_state()

def reset_state(self):
self.runner_stopping = False
self.runner_stopped = False


class TestLocustRunner(LocustRunnerTestCase):
def test_cpu_warning(self):
_monitor_interval = runners.CPU_MONITOR_INTERVAL
runners.CPU_MONITOR_INTERVAL = 2.0
Expand Down Expand Up @@ -197,19 +210,23 @@ class MyUser(User):
def my_task(self):
pass

test_stop_run = [0]
environment = Environment(user_classes=[User])
environment = Environment(user_classes=[MyUser])

def on_test_stop(*args, **kwargs):
test_stop_run[0] += 1
@environment.events.test_stopping.add_listener
def on_test_stopping(*_, **__):
self.runner_stopping = True

environment.events.test_stop.add_listener(on_test_stop)
@environment.events.test_stop.add_listener
def on_test_stop(*_, **__):
self.runner_stopped = True

runner = LocalRunner(environment)
runner.start(user_count=3, spawn_rate=3, wait=False)
self.assertEqual(0, test_stop_run[0])
self.assertFalse(self.runner_stopping)
self.assertFalse(self.runner_stopped)
runner.stop()
self.assertEqual(1, test_stop_run[0])
self.assertTrue(self.runner_stopping)
self.assertTrue(self.runner_stopped)

def test_stop_event_quit(self):
class MyUser(User):
Expand All @@ -219,19 +236,23 @@ class MyUser(User):
def my_task(self):
pass

test_stop_run = [0]
environment = Environment(user_classes=[User])
environment = Environment(user_classes=[MyUser])

def on_test_stop(*args, **kwargs):
test_stop_run[0] += 1
@environment.events.test_stopping.add_listener
def on_test_stopping(*_, **__):
self.runner_stopping = True

environment.events.test_stop.add_listener(on_test_stop)
@environment.events.test_stop.add_listener
def on_test_stop(*_, **__):
self.runner_stopped = True

runner = LocalRunner(environment)
runner.start(user_count=3, spawn_rate=3, wait=False)
self.assertEqual(0, test_stop_run[0])
self.assertFalse(self.runner_stopping)
self.assertFalse(self.runner_stopped)
runner.quit()
self.assertEqual(1, test_stop_run[0])
self.assertTrue(self.runner_stopping)
self.assertTrue(self.runner_stopped)

def test_stop_event_stop_and_quit(self):
class MyUser(User):
Expand All @@ -241,20 +262,51 @@ class MyUser(User):
def my_task(self):
pass

test_stop_run = [0]
environment = Environment(user_classes=[MyUser])

def on_test_stop(*args, **kwargs):
test_stop_run[0] += 1
@environment.events.test_stopping.add_listener
def on_test_stopping(*_, **__):
self.runner_stopping = True

environment.events.test_stop.add_listener(on_test_stop)
@environment.events.test_stop.add_listener
def on_test_stop(*_, **__):
self.runner_stopped = True

runner = LocalRunner(environment)
runner.start(user_count=3, spawn_rate=3, wait=False)
self.assertEqual(0, test_stop_run[0])
self.assertFalse(self.runner_stopping)
self.assertFalse(self.runner_stopped)
runner.stop()
runner.quit()
self.assertEqual(1, test_stop_run[0])
self.assertTrue(self.runner_stopping)
self.assertTrue(self.runner_stopped)

def test_stopping_event(self):
on_stop_called = [False]

class MyUser(User):
on_stop_called = False
wait_time = constant(1)

@task
def my_task(self):
pass

def on_stop(self):
MyUser.on_stop_called = True

environment = Environment(user_classes=[MyUser])

@environment.events.test_stopping.add_listener
def on_test_stopping(*_, **__):
on_stop_called[0] = MyUser.on_stop_called
self.runner_stopping = True

runner = LocalRunner(environment)
runner.start(user_count=3, spawn_rate=3, wait=False)
runner.quit()
self.assertTrue(self.runner_stopping)
self.assertFalse(on_stop_called[0])

def test_change_user_count_during_spawning(self):
class MyUser(User):
Expand Down Expand Up @@ -1774,7 +1826,7 @@ def on_test_start(*args, **kwargs):
self.assertTrue(test_start_event_fired[0])


class TestMasterRunner(LocustTestCase):
class TestMasterRunner(LocustRunnerTestCase):
def setUp(self):
super().setUp()
self.environment = Environment(events=locust.events, catch_exceptions=False)
Expand Down Expand Up @@ -2224,27 +2276,31 @@ def my_task(self):
with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner(user_classes=[TestUser])

run_count = [0]
@self.environment.events.test_stopping.add_listener
def on_test_stopping(*_, **__):
self.runner_stopping = True

@self.environment.events.test_stop.add_listener
def on_test_stop(*a, **kw):
run_count[0] += 1
def on_test_stop(*_, **__):
self.runner_stopped = True

for i in range(5):
server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))

master.start(7, 7)
self.assertEqual(5, len(server.outbox))
master.stop()
self.assertEqual(1, run_count[0])
self.assertTrue(self.runner_stopping)
self.assertTrue(self.runner_stopped)

run_count[0] = 0
self.reset_state()
for i in range(5):
server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))
master.start(7, 7)
master.stop()
master.quit()
self.assertEqual(1, run_count[0])
self.assertTrue(self.runner_stopping)
self.assertTrue(self.runner_stopped)

def test_stop_event_quit(self):
"""
Expand All @@ -2259,19 +2315,22 @@ def my_task(self):
with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner(user_classes=[TestUser])

run_count = [0]
@self.environment.events.test_stopping.add_listener
def on_test_stopping(*_, **__):
self.runner_stopping = True

@self.environment.events.test_stop.add_listener
def on_test_stop(*a, **kw):
run_count[0] += 1
def on_test_stop(*_, **__):
self.runner_stopped = True

for i in range(5):
server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))

master.start(7, 7)
self.assertEqual(5, len(server.outbox))
master.quit()
self.assertEqual(1, run_count[0])
self.assertTrue(self.runner_stopping)
self.assertTrue(self.runner_stopped)

def test_spawn_zero_locusts(self):
class MyTaskSet(TaskSet):
Expand Down

0 comments on commit 90c46ee

Please sign in to comment.