Introduce total iterations-per-second limiting timers #1241

Closed
wants to merge 10 commits
docs/api.rst (2 changes: 1 addition & 1 deletion)
@@ -44,7 +44,7 @@ Built in wait_time functions
 ============================
 
 .. automodule:: locust.wait_time
-    :members: between, constant, constant_pacing
+    :members: between, constant, constant_pacing, constant_ips, constant_ips_total
 
 HttpSession class
 =================
docs/changelog.rst (5 changes: 2 additions & 3 deletions)
@@ -9,9 +9,8 @@ In development (master)
 
 * Continuously measure CPU usage and emit a warning if we get a five second average above 90%
 * Show CPU usage of slave nodes in the Web UI
-* Fixed issue when running Locust distributed and new slave nodes connected during the hatching/ramp-up
-  phase (https://github.com/locustio/locust/issues/1168)
-
+* Add new timers that target iterations per second: constant_ips (the inverse of constant_pacing) and constant_ips_total (total iterations per second, across locusts)
+* Fixed issue when running Locust distributed and new slave nodes connected during the hatching/ramp-up phase (https://github.com/locustio/locust/issues/1168)
 
 0.13.5
 ======
locust/main.py (2 changes: 1 addition & 1 deletion)
@@ -571,7 +571,7 @@ def sig_term_handler():
         main_greenlet.join()
         code = 0
         lr = runners.locust_runner
-        if len(lr.errors) or len(lr.exceptions) or lr.cpu_log_warning():
+        if len(lr.errors) | len(lr.exceptions) | lr.cpu_log_warning() | lr.ips_log_warning():
             code = options.exit_code_on_error
         shutdown(code=code)
     except KeyboardInterrupt as e:
locust/runners.py (14 changes: 11 additions & 3 deletions)
@@ -41,8 +41,10 @@ def __init__(self, locust_classes, options):
         self.state = STATE_INIT
         self.hatching_greenlet = None
         self.stepload_greenlet = None
+        self.target_user_count = None
         self.current_cpu_usage = 0
         self.cpu_warning_emitted = False
+        self.ips_warning_emitted = False
         self.greenlet.spawn(self.monitor_cpu)
         self.exceptions = {}
         self.stats = global_stats
@@ -77,8 +79,13 @@ def cpu_log_warning(self):
         """Called at the end of the test to repeat the warning & return the status"""
         if self.cpu_warning_emitted:
             logger.warning("Loadgen CPU usage was too high at some point during the test! See https://docs.locust.io/en/stable/running-locust-distributed.html for how to distribute the load over multiple CPU cores or machines")
-            return True
-        return False
+        return self.cpu_warning_emitted
 
+    def ips_log_warning(self):
+        """Called at the end of the test to repeat the warning & return the status"""
+        if self.ips_warning_emitted:
+            logger.warning("Failed to reach targeted number of iterations per second (at some point during the test). The most common cause of this is target system overload or too few clients")
+        return self.ips_warning_emitted
+
     def weight_locusts(self, amount):
         """
@@ -210,6 +217,7 @@ def start_hatching(self, locust_count, hatch_rate, wait=False):
             self.exceptions = {}
             self.cpu_warning_emitted = False
             self.slave_cpu_warning_emitted = False
+            self.target_user_count = locust_count
             events.locust_start_hatching.fire()
 
         # Dynamically changing the locust count
@@ -292,6 +300,7 @@ def on_locust_error(locust_instance, exception, tb):
         events.locust_error += on_locust_error
 
     def start_hatching(self, locust_count, hatch_rate, wait=False):
+        self.target_user_count = locust_count
         if hatch_rate > 100:
             logger.warning("Your selected hatch rate is very high (>100), and this is known to sometimes cause issues. Do you really need to ramp up that fast?")
         if self.hatching_greenlet:
@@ -323,7 +332,6 @@ class MasterLocustRunner(DistributedLocustRunner):
     def __init__(self, *args, **kwargs):
         super(MasterLocustRunner, self).__init__(*args, **kwargs)
         self.slave_cpu_warning_emitted = False
-        self.target_user_count = None
 
         class SlaveNodesDict(dict):
             def get_by_state(self, state):
locust/test/test_runners.py (22 changes: 21 additions & 1 deletion)
@@ -13,7 +13,7 @@
     SlaveLocustRunner, STATE_INIT, STATE_HATCHING, STATE_RUNNING, STATE_MISSING
 from locust.stats import global_stats, RequestStats
 from locust.test.testcases import LocustTestCase
-from locust.wait_time import between, constant
+from locust.wait_time import between, constant, constant_ips_total
 
 
 def mocked_rpc():
@@ -627,6 +627,26 @@ class MyLocust(Locust):
         self.assertTrue("HeyAnException" in exception["traceback"])
         self.assertEqual(2, exception["count"])
 
+    def test_constant_ips_total(self):
+        target_ips = 50
+        run_time = 3
+
+        class MyTestLocust(Locust):
+            i = 0
+            class task_set(TaskSet):
+                @task
+                def the_task(self):
+                    MyTestLocust.i = MyTestLocust.i + 1
+            wait_time = constant_ips_total(target_ips)
+
+        options = mocked_options()
+        runner = LocalLocustRunner([MyTestLocust], options)
+        runners.locust_runner = runner # this is necessary for ips_total
+        runner.start_hatching(10, 999)
+        gevent.sleep(run_time)
+        runner.quit()
+        locust_runner = None # just in case someone depends on this not being set
+        self.assertAlmostEqual(MyTestLocust.i, target_ips * run_time, delta=25)
+
 class TestSlaveLocustRunner(LocustTestCase):
     def setUp(self):
locust/test/test_wait_time.py (22 changes: 21 additions & 1 deletion)
@@ -3,7 +3,7 @@
 
 from locust.core import HttpLocust, Locust, TaskSet, events, task
 from locust.exception import MissingWaitTimeError
-from locust.wait_time import between, constant, constant_pacing
+from locust.wait_time import between, constant, constant_pacing, constant_ips, constant_ips_total
 
 from .testcases import LocustTestCase, WebserverTestCase
 
@@ -70,6 +70,26 @@ class TS(TaskSet):
         _ = ts2.wait_time()
         _ = ts2.wait_time()
 
+    def test_constant_ips(self):
+        # Note: constant_ips_total is tested in test_runners.py, because it requires a runner
+        class User(Locust):
+            wait_time = constant_ips(10)
+        class TS(TaskSet):
+            pass
+        ts = TS(User())
+
+        ts2 = TS(User())
+
+        previous_time = time.time()
+        for i in range(7):
+            ts.wait()
+            since_last_run = time.time() - previous_time
+            self.assertLess(abs(0.1 - since_last_run), 0.02)
+            previous_time = time.time()
+            time.sleep(random.random() * 0.1)
+        _ = ts2.wait_time()
+        _ = ts2.wait_time()
+
     def test_missing_wait_time(self):
         class User(Locust):
             pass
locust/wait_time.py (57 changes: 56 additions & 1 deletion)
@@ -1,5 +1,7 @@
 import random
 from time import time
+from locust import runners
+import logging
 
 
 def between(min_wait, max_wait):
@@ -46,8 +48,9 @@ def my_task(self):
     If a task execution exceeds the specified wait_time, the wait will be 0 before starting
     the next task.
     """
+
     def wait_time_func(self):
-        if not hasattr(self,"_cp_last_run"):
+        if not hasattr(self, "_cp_last_run"):
             self._cp_last_wait_time = wait_time
             self._cp_last_run = time()
             return wait_time
@@ -56,4 +59,56 @@ def wait_time_func(self):
             self._cp_last_wait_time = max(0, wait_time - run_time)
             self._cp_last_run = time()
             return self._cp_last_wait_time
+
     return wait_time_func
+
+
+def constant_ips(ips):
+    """
+    This behaves exactly the same as constant_pacing but with an inverted parameter.
+    It takes iterations per second as a parameter instead of time between iterations.
+    """
+
+    return constant_pacing(1.0 / ips)
+
+
+def constant_ips_total(ips):
+    """
+    Returns a function that will track the run time of all tasks in this locust process and,
+    each time it is called, return a wait time aimed at keeping the combined task execution
+    rate at the specified number of iterations per second (ips).
+
+    This is similar to constant_ips, but looks at all clients/locusts in a locust process.
+
+    Note that in a distributed run, the iterations per second limit is applied per-slave, not globally.
+
+    During rampup, the IPS is intentionally constrained to be the requested ips * the share of running clients.
+
+    Will output a warning if the IPS target is missed twice in a row.
+    """
+
+    def wait_time_func(self):
+        lr = runners.locust_runner
+        if not lr:
+            logging.warning(
+                "You asked for constant total ips, but you seem to be running a locust directly. Hopefully you are only running one locust, in which case this will give a somewhat reasonable estimate."
+            )
+            return 1.0 / ips
+        current_time = float(time())
+        unstarted_clients = lr.target_user_count - lr.user_count
+        if not hasattr(self, "_cp_last_run"):
+            self._cp_last_run = 0
+            self._cp_target_missed = False
+        next_time = self._cp_last_run + (lr.target_user_count + unstarted_clients) / float(ips)
+        if current_time > next_time:
+            if lr.state == runners.STATE_RUNNING and self._cp_target_missed and not lr.ips_warning_emitted:
+                logging.warning("Failed to reach target ips, even after rampup has finished")
+                lr.ips_warning_emitted = True # stop logging
+            self._cp_target_missed = True
+            self._cp_last_run = current_time
+            return 0
+        self._cp_target_missed = False
+        self._cp_last_run = next_time
+        return next_time - current_time
+
+    return wait_time_func
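
For orientation, here is a minimal locustfile sketch (not part of the diff) showing how the two new wait_time functions could be used. The task set, locust class, task and host URL are invented for illustration, and it assumes a locust build that includes this branch's wait_time.py.

# Hypothetical usage sketch: MyTaskSet, MyLocust and the host are made-up names.
from locust import HttpLocust, TaskSet, task
from locust.wait_time import constant_ips, constant_ips_total

class MyTaskSet(TaskSet):
    @task
    def index(self):
        self.client.get("/")

class MyLocust(HttpLocust):
    task_set = MyTaskSet
    host = "http://localhost:8080"
    # Per-client pacing: each simulated user aims for 2 iterations per second,
    # which is the same as constant_pacing(0.5).
    # wait_time = constant_ips(2)
    # Process-wide pacing: all users in this locust process together aim for
    # about 50 iterations per second, regardless of how many users are hatched.
    wait_time = constant_ips_total(50)

With constant_ips_total(50) and 10 hatched users, each user settles at roughly one iteration every 10 / 50 = 0.2 s, so the process as a whole approaches 50 iterations per second. In a distributed run the limit is applied per slave, so to target 50 in total across, say, 5 slaves you would pass 10 to each.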