Skip to content

Commit

Permalink
Merge pull request #1977 from EzR1d3r/issue-1961-all-users-spawned-me…
Browse files Browse the repository at this point in the history
…ssage-bug

Fixing issue #1961 with incorrect "All users spawned" log messages wh…
  • Loading branch information
cyberw authored Jan 28, 2022
2 parents 68f1af3 + 9fb91d4 commit 48cee13
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 29 deletions.
15 changes: 9 additions & 6 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -749,32 +749,35 @@ def start(self, user_count: int, spawn_rate: float, wait=False) -> None:
# when the user count is really at the desired value.
timeout = gevent.Timeout(self._wait_for_workers_report_after_ramp_up())
timeout.start()
msg_prefix = "All users spawned"
try:
while self.user_count != self.target_user_count:
gevent.sleep()
gevent.sleep(0.01)
except gevent.Timeout:
pass
msg_prefix = (
"Spawning is complete and report waittime is expired, but not all reports recieved from workers"
)
finally:
timeout.cancel()

self.environment.events.spawning_complete.fire(user_count=sum(self.target_user_classes_count.values()))
self.spawning_completed = True

logger.info("All users spawned: %s" % _format_user_classes_count_for_log(self.reported_user_classes_count))
logger.info("%s: %s" % (msg_prefix, _format_user_classes_count_for_log(self.reported_user_classes_count)))

@functools.lru_cache()
def _wait_for_workers_report_after_ramp_up(self) -> float:
"""
The amount of time to wait after a ramp-up in order for all the workers to report their state
to the master. If not supplied by the user, it is 100ms by default. If the supplied value is a number,
to the master. If not supplied by the user, it is 1000ms by default. If the supplied value is a number,
it is taken as-is. If the supplied value is a pattern like "some_number * WORKER_REPORT_INTERVAL",
the value will be "some_number * WORKER_REPORT_INTERVAL". The most sensible value would be something
like "1.25 * WORKER_REPORT_INTERVAL". However, some users might find it too high, so it is left
to a really small value of 100ms by default.
to a relatively small value of 1000ms by default.
"""
locust_wait_for_workers_report_after_ramp_up = os.getenv("LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP")
if locust_wait_for_workers_report_after_ramp_up is None:
return 0.1
return 1.0

match = re.search(
r"^(?P<coeff>(\d+)|(\d+\.\d+))[ ]*\*[ ]*WORKER_REPORT_INTERVAL$",
Expand Down
107 changes: 106 additions & 1 deletion locust/test/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from locust.user import HttpUser, User, TaskSet
from .mock_locustfile import mock_locustfile, MOCK_LOCUSTFILE_CONTENT
from .testcases import LocustTestCase
from .util import temporary_file, get_free_tcp_port
from .util import temporary_file, get_free_tcp_port, patch_env


class TestLoadLocustfile(LocustTestCase):
Expand Down Expand Up @@ -879,3 +879,108 @@ def task2(self):
self.assertNotIn("task2", stdout_worker)
self.assertEqual(0, proc.returncode)
self.assertEqual(0, proc_worker.returncode)

def test_distributed(self):
LOCUSTFILE_CONTENT = textwrap.dedent(
"""
from locust import User, task, constant
class User1(User):
wait_time = constant(1)
@task
def t(self):
pass
"""
)
with mock_locustfile(content=LOCUSTFILE_CONTENT) as mocked:
proc = subprocess.Popen(
[
"locust",
"-f",
mocked.file_path,
"--headless",
"--master",
"--expect-workers",
"1",
"-u",
"3",
"-t",
"5s",
],
stderr=STDOUT,
stdout=PIPE,
)
proc_worker = subprocess.Popen(
[
"locust",
"-f",
mocked.file_path,
"--worker",
],
stderr=STDOUT,
stdout=PIPE,
)
stdout = proc.communicate()[0].decode("utf-8")
proc_worker.communicate()

self.assertIn('All users spawned: {"User1": 3} (3 total users)', stdout)
self.assertIn("Shutting down (exit code 0)", stdout)

self.assertEqual(0, proc.returncode)
self.assertEqual(0, proc_worker.returncode)

def test_distributed_report_timeout_expired(self):
LOCUSTFILE_CONTENT = textwrap.dedent(
"""
from locust import User, task, constant
class User1(User):
wait_time = constant(1)
@task
def t(self):
pass
"""
)
with mock_locustfile(content=LOCUSTFILE_CONTENT) as mocked, patch_env(
"LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "0.01"
) as _:
proc = subprocess.Popen(
[
"locust",
"-f",
mocked.file_path,
"--headless",
"--master",
"--expect-workers",
"1",
"-u",
"3",
"-t",
"5s",
],
stderr=STDOUT,
stdout=PIPE,
)
proc_worker = subprocess.Popen(
[
"locust",
"-f",
mocked.file_path,
"--worker",
],
stderr=STDOUT,
stdout=PIPE,
)
stdout = proc.communicate()[0].decode("utf-8")
proc_worker.communicate()

self.assertIn(
'Spawning is complete and report waittime is expired, but not all reports recieved from workers: {"User1": 2} (2 total users)',
stdout,
)
self.assertIn("Shutting down (exit code 0)", stdout)

self.assertEqual(0, proc.returncode)
self.assertEqual(0, proc_worker.returncode)
38 changes: 16 additions & 22 deletions locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
task,
)
from retry import retry
from .util import patch_env

NETWORK_BROKEN = "network broken"

Expand Down Expand Up @@ -756,7 +757,9 @@ def incr_stats(self):
context={},
)

with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):
with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), patch_env(
"LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "0.1"
):
# start a Master runner
options = parse_options(["--enable-rebalancing"])
master_env = Environment(user_classes=[TestUser], parsed_options=options)
Expand Down Expand Up @@ -1173,7 +1176,7 @@ def tick(self):
return None

locust_worker_additional_wait_before_ready_after_stop = 5
with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), _patch_env(
with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), patch_env(
"LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP",
str(locust_worker_additional_wait_before_ready_after_stop),
):
Expand Down Expand Up @@ -1430,7 +1433,7 @@ def tick(self):
user_class.weight = random.uniform(1, 20)

locust_worker_additional_wait_before_ready_after_stop = 5
with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), _patch_env(
with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), patch_env(
"LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP",
str(locust_worker_additional_wait_before_ready_after_stop),
):
Expand Down Expand Up @@ -1548,7 +1551,7 @@ def tick(self):
return None

locust_worker_additional_wait_before_ready_after_stop = 2
with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), _patch_env(
with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), patch_env(
"LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP",
str(locust_worker_additional_wait_before_ready_after_stop),
):
Expand Down Expand Up @@ -1874,7 +1877,9 @@ class TestUser(User):
def my_task(self):
gevent.sleep(600)

with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server, patch_env(
"LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "0.1"
):
master = self.get_runner(user_classes=[TestUser])
server.mocked_send(Message("client_ready", __version__, "fake_client1"))
server.mocked_send(Message("client_ready", __version__, "fake_client2"))
Expand All @@ -1900,7 +1905,9 @@ class TestUser(User):
def my_task(self):
gevent.sleep(600)

with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server, patch_env(
"LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "0.1"
):
master = self.get_runner(user_classes=[TestUser])
server.mocked_send(Message("client_ready", __version__, "fake_client1"))
server.mocked_send(Message("client_ready", __version__, "fake_client2"))
Expand Down Expand Up @@ -2608,16 +2615,16 @@ def assert_cache_hits():
master = self.get_runner()

master._wait_for_workers_report_after_ramp_up.cache_clear()
self.assertEqual(master._wait_for_workers_report_after_ramp_up(), 0.1)
self.assertEqual(master._wait_for_workers_report_after_ramp_up(), 1.0)
assert_cache_hits()

master._wait_for_workers_report_after_ramp_up.cache_clear()
with _patch_env("LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "5.7"):
with patch_env("LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "5.7"):
self.assertEqual(master._wait_for_workers_report_after_ramp_up(), 5.7)
assert_cache_hits()

master._wait_for_workers_report_after_ramp_up.cache_clear()
with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=1.5), _patch_env(
with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=1.5), patch_env(
"LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "5.7 * WORKER_REPORT_INTERVAL"
):
self.assertEqual(master._wait_for_workers_report_after_ramp_up(), 5.7 * 1.5)
Expand All @@ -2626,19 +2633,6 @@ def assert_cache_hits():
master._wait_for_workers_report_after_ramp_up.cache_clear()


@contextmanager
def _patch_env(name: str, value: str):
prev_value = os.getenv(name)
os.environ[name] = value
try:
yield
finally:
if prev_value is None:
del os.environ[name]
else:
os.environ[name] = prev_value


class TestWorkerRunner(LocustTestCase):
def setUp(self):
super().setUp()
Expand Down
13 changes: 13 additions & 0 deletions locust/test/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@ def temporary_file(content, suffix="_locustfile.py"):
os.remove(f.name)


@contextmanager
def patch_env(name: str, value: str):
prev_value = os.getenv(name)
os.environ[name] = value
try:
yield
finally:
if prev_value is None:
del os.environ[name]
else:
os.environ[name] = prev_value


def get_free_tcp_port():
"""
Find an unused TCP port
Expand Down

0 comments on commit 48cee13

Please sign in to comment.