diff --git a/docs/api.rst b/docs/api.rst index 86e7838917..f53c75438e 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -44,7 +44,7 @@ Built in wait_time functions ============================ .. automodule:: locust.wait_time - :members: between, constant, constant_pacing + :members: between, constant, constant_pacing, constant_ips, constant_ips_total HttpSession class ================= diff --git a/docs/changelog.rst b/docs/changelog.rst index b238952788..3f20c25070 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -9,9 +9,8 @@ In development (master) * Continuously measure CPU usage and emit a warning if we get a five second average above 90% * Show CPU usage of slave nodes in the Web UI -* Fixed issue when running Locust distributed and new slave nodes connected during the hatching/ramp-up - phase (https://github.com/locustio/locust/issues/1168) - +* Add new timers that target iterations per second: constant_ips (the inverse of constant_pacing) and constant_ips_total (total iterations per second, across locusts) +* Fixed issue when running Locust distributed and new slave nodes connected during the hatching/ramp-up phase (https://github.com/locustio/locust/issues/1168) 0.13.5 ====== diff --git a/locust/main.py b/locust/main.py index 704184cc01..ae0f43e7c5 100644 --- a/locust/main.py +++ b/locust/main.py @@ -571,7 +571,7 @@ def sig_term_handler(): main_greenlet.join() code = 0 lr = runners.locust_runner - if len(lr.errors) or len(lr.exceptions) or lr.cpu_log_warning(): + if len(lr.errors) | len(lr.exceptions) | lr.cpu_log_warning() | lr.ips_log_warning(): code = options.exit_code_on_error shutdown(code=code) except KeyboardInterrupt as e: diff --git a/locust/runners.py b/locust/runners.py index 8dde0006f7..9d125f17c1 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -41,8 +41,10 @@ def __init__(self, locust_classes, options): self.state = STATE_INIT self.hatching_greenlet = None self.stepload_greenlet = None + self.target_user_count = None self.current_cpu_usage = 0 self.cpu_warning_emitted = False + self.ips_warning_emitted = False self.greenlet.spawn(self.monitor_cpu) self.exceptions = {} self.stats = global_stats @@ -77,8 +79,13 @@ def cpu_log_warning(self): """Called at the end of the test to repeat the warning & return the status""" if self.cpu_warning_emitted: logger.warning("Loadgen CPU usage was too high at some point during the test! See https://docs.locust.io/en/stable/running-locust-distributed.html for how to distribute the load over multiple CPU cores or machines") - return True - return False + return self.cpu_warning_emitted + + def ips_log_warning(self): + """Called at the end of the test to repeat the warning & return the status""" + if self.ips_warning_emitted: + logger.warning("Failed to reach targeted number of iterations per second (at some point during the test). The most common cause of this is target system overload or too few clients") + return self.ips_warning_emitted def weight_locusts(self, amount): """ @@ -210,6 +217,7 @@ def start_hatching(self, locust_count, hatch_rate, wait=False): self.exceptions = {} self.cpu_warning_emitted = False self.slave_cpu_warning_emitted = False + self.target_user_count = locust_count events.locust_start_hatching.fire() # Dynamically changing the locust count @@ -292,6 +300,7 @@ def on_locust_error(locust_instance, exception, tb): events.locust_error += on_locust_error def start_hatching(self, locust_count, hatch_rate, wait=False): + self.target_user_count = locust_count if hatch_rate > 100: logger.warning("Your selected hatch rate is very high (>100), and this is known to sometimes cause issues. Do you really need to ramp up that fast?") if self.hatching_greenlet: @@ -323,7 +332,6 @@ class MasterLocustRunner(DistributedLocustRunner): def __init__(self, *args, **kwargs): super(MasterLocustRunner, self).__init__(*args, **kwargs) self.slave_cpu_warning_emitted = False - self.target_user_count = None class SlaveNodesDict(dict): def get_by_state(self, state): diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index 97b0792c2a..b8374ee791 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -13,7 +13,7 @@ SlaveLocustRunner, STATE_INIT, STATE_HATCHING, STATE_RUNNING, STATE_MISSING from locust.stats import global_stats, RequestStats from locust.test.testcases import LocustTestCase -from locust.wait_time import between, constant +from locust.wait_time import between, constant, constant_ips_total def mocked_rpc(): @@ -627,6 +627,26 @@ class MyLocust(Locust): self.assertTrue("HeyAnException" in exception["traceback"]) self.assertEqual(2, exception["count"]) + def test_constant_ips_total(self): + target_ips = 50 + run_time = 3 + + class MyTestLocust(Locust): + i = 0 + class task_set(TaskSet): + @task + def the_task(self): + MyTestLocust.i = MyTestLocust.i + 1 + wait_time = constant_ips_total(target_ips) + + options = mocked_options() + runner = LocalLocustRunner([MyTestLocust], options) + runners.locust_runner = runner # this is necessary for ips_total + runner.start_hatching(10, 999) + gevent.sleep(run_time) + runner.quit() + locust_runner = None # just in case someone depends on this not being set + self.assertAlmostEqual(MyTestLocust.i, target_ips * run_time, delta=25) class TestSlaveLocustRunner(LocustTestCase): def setUp(self): diff --git a/locust/test/test_wait_time.py b/locust/test/test_wait_time.py index 359c536dc8..fec7493288 100644 --- a/locust/test/test_wait_time.py +++ b/locust/test/test_wait_time.py @@ -3,7 +3,7 @@ from locust.core import HttpLocust, Locust, TaskSet, events, task from locust.exception import MissingWaitTimeError -from locust.wait_time import between, constant, constant_pacing +from locust.wait_time import between, constant, constant_pacing, constant_ips, constant_ips_total from .testcases import LocustTestCase, WebserverTestCase @@ -70,6 +70,26 @@ class TS(TaskSet): _ = ts2.wait_time() _ = ts2.wait_time() + def test_constant_ips(self): + # Note: constant_ips_total is tested in test_runners.py, because it requires a runner + class User(Locust): + wait_time = constant_ips(10) + class TS(TaskSet): + pass + ts = TS(User()) + + ts2 = TS(User()) + + previous_time = time.time() + for i in range(7): + ts.wait() + since_last_run = time.time() - previous_time + self.assertLess(abs(0.1 - since_last_run), 0.02) + previous_time = time.time() + time.sleep(random.random() * 0.1) + _ = ts2.wait_time() + _ = ts2.wait_time() + def test_missing_wait_time(self): class User(Locust): pass diff --git a/locust/wait_time.py b/locust/wait_time.py index 1399f37810..0e2bf846b7 100644 --- a/locust/wait_time.py +++ b/locust/wait_time.py @@ -1,5 +1,7 @@ import random from time import time +from locust import runners +import logging def between(min_wait, max_wait): @@ -46,8 +48,9 @@ def my_task(self): If a task execution exceeds the specified wait_time, the wait will be 0 before starting the next task. """ + def wait_time_func(self): - if not hasattr(self,"_cp_last_run"): + if not hasattr(self, "_cp_last_run"): self._cp_last_wait_time = wait_time self._cp_last_run = time() return wait_time @@ -56,4 +59,56 @@ def wait_time_func(self): self._cp_last_wait_time = max(0, wait_time - run_time) self._cp_last_run = time() return self._cp_last_wait_time + + return wait_time_func + + +def constant_ips(ips): + """ + This behaves exactly the same as constant_pacing but with an inverted parameter. + It takes iterations per second as a parameter instead of time between iterations. + """ + + return constant_pacing(1.0 / ips) + + +def constant_ips_total(ips): + """ + Returns a function that will track the run time of all tasks in this locust process, + and for each time it's called it will return a wait time that will try to make the + execution equal to the time specified by the wait_time argument. + + This is similar to constant_ips, but looks at all clients/locusts in a locust process. + + Note that in a distributed run, the iterations per second limit is applied per-slave, not globally. + + During rampup, the IPS is intentionally constrained to be the requested ips * the share of running clients. + + Will output a warning if IPS target is missed twice in a row + """ + + def wait_time_func(self): + lr = runners.locust_runner + if not lr: + logging.warning( + "You asked for constant total ips, but you seem to be running a locust directly. Hopefully you are only running one locust, in which case this will give a somewhat reasonable estimate." + ) + return 1.0 / ips + current_time = float(time()) + unstarted_clients = lr.target_user_count - lr.user_count + if not hasattr(self, "_cp_last_run"): + self._cp_last_run = 0 + self._cp_target_missed = False + next_time = self._cp_last_run + (lr.target_user_count + unstarted_clients) / float(ips) + if current_time > next_time: + if lr.state == runners.STATE_RUNNING and self._cp_target_missed and not lr.ips_warning_emitted: + logging.warning("Failed to reach target ips, even after rampup has finished") + lr.ips_warning_emitted = True # stop logging + self._cp_target_missed = True + self._cp_last_run = current_time + return 0 + self._cp_target_missed = False + self._cp_last_run = next_time + return next_time - current_time + return wait_time_func