Support gradual ramp-up
With this commit we allow users to ramp up load gradually by specifying
the task property `ramp-up-time-period`. If a non-zero value is
specified, Rally will gradually add clients during that time period
until the target client count as specified by `clients` is reached. This
reduces the potential for initial load spikes when running with many
concurrent clients.

Closes elastic#1195
danielmitterdorfer committed May 19, 2021
1 parent 9608870 commit dcaed59
Showing 8 changed files with 270 additions and 71 deletions.
Binary file added docs/track-ramp-up.png
17 changes: 17 additions & 0 deletions docs/track.rst
@@ -433,6 +433,7 @@ The ``schedule`` element contains a list of tasks that are executed by Rally, i.
* ``clients`` (optional, defaults to 1): The number of clients that should execute a task concurrently.
* ``warmup-iterations`` (optional, defaults to 0): Number of iterations that each client should execute to warmup the benchmark candidate. Warmup iterations will not show up in the measurement results.
* ``iterations`` (optional, defaults to 1): Number of measurement iterations that each client executes. The command line report will automatically adjust the percentile numbers based on this number (i.e. if you just run 5 iterations you will not get a 99.9th percentile because we need at least 1000 iterations to determine this value precisely).
* ``ramp-up-time-period`` (optional, defaults to 0): Rally will start clients gradually and reach the number specified by ``clients`` at the end of this time period. See the section on :ref:`ramp-up <track_ramp_up>` for more details.
* ``warmup-time-period`` (optional, defaults to 0): A time period in seconds that Rally considers for warmup of the benchmark candidate. All response data captured during warmup will not show up in the measurement results.
* ``time-period`` (optional): A time period in seconds that Rally considers for measurement. Note that for bulk indexing you should usually not define this time period. Rally will just bulk index all documents and consider every sample after the warmup time period as a measurement sample.
* ``schedule`` (optional, defaults to ``deterministic``): Defines the schedule for this task, i.e. it defines at which point in time during the benchmark an operation should be executed. For example, if you specify a ``deterministic`` schedule and a target-interval of 5 (seconds), Rally will attempt to execute the corresponding operation at second 0, 5, 10, 15 ... . Out of the box, Rally supports ``deterministic`` and ``poisson`` but you can define your own :doc:`custom schedules </adding_tracks>`.
@@ -554,6 +555,21 @@ If you want as much reproducibility as possible you can choose the `deterministi

If you have more complex needs on how to model traffic, you can also implement a :doc:`custom schedule </adding_tracks>`.

.. _track_ramp_up:

Ramp-up load
~~~~~~~~~~~~

For benchmarks involving many clients it can be useful to increase load gradually. This avoids load spikes at the beginning of a benchmark when Elasticsearch is not yet warmed up. Rally adds clients gradually over time, but each client that has already started attempts to reach its specified target throughput. The diagram below shows how clients are added over time:

.. image:: track-ramp-up.png
:alt: How ramp-up works

.. note::

As the number of clients, and thus the throughput, varies during ramp-up, you should set ``warmup-time-period`` to at least the same value as ``ramp-up-time-period`` to get stable measurement results.
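
For illustration, a task could combine ramp-up with a matching warmup period as follows (a minimal sketch; the operation name ``bulk-index`` and the concrete values are made up)::

    {
      "operation": "bulk-index",
      "clients": 8,
      "ramp-up-time-period": 120,
      "warmup-time-period": 120,
      "time-period": 600
    }

With these values, Rally spreads the start of the 8 clients across the first 120 seconds; because the warmup period matches the ramp-up period, only samples taken at the full client count end up in the measurement results.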


Time-based vs. iteration-based
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

@@ -562,6 +578,7 @@ You should usually use time periods for batch style operations and iterations fo
All tasks in the ``schedule`` list are executed sequentially in the order in which they have been defined. However, it is also possible to execute multiple tasks concurrently, by wrapping them in a ``parallel`` element. The ``parallel`` element defines the following properties:

* ``clients`` (optional): The number of clients that should execute the provided tasks. If you specify this property, Rally will only use as many clients as you have defined on the ``parallel`` element (see examples)!
* ``ramp-up-time-period`` (optional, defaults to 0): The time period in seconds across all nested tasks to spend in ramp-up. If this property is defined here, it cannot be overridden in nested tasks. See the section on :ref:`ramp-up <track_ramp_up>` for more details and the sketch below.
* ``warmup-time-period`` (optional, defaults to 0): Allows you to define a default value for all tasks of the ``parallel`` element.
* ``time-period`` (optional, no default value if not specified): Allows you to define a default value for all tasks of the ``parallel`` element.
* ``warmup-iterations`` (optional, defaults to 0): Allows you to define a default value for all tasks of the ``parallel`` element.
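An analogous sketch for the ``parallel`` element (the operation names ``bulk-index-a`` and ``bulk-index-b`` and the concrete values are made up)::

    {
      "parallel": {
        "clients": 8,
        "ramp-up-time-period": 60,
        "warmup-time-period": 60,
        "tasks": [
          { "operation": "bulk-index-a", "clients": 4 },
          { "operation": "bulk-index-b", "clients": 4 }
        ]
      }
    }

Here ramp-up is defined once on the ``parallel`` element and applies across all nested tasks: Rally staggers the start of all 8 clients over the 60 second period rather than ramping up each task independently.
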
89 changes: 64 additions & 25 deletions esrally/driver/driver.py
@@ -1454,15 +1454,7 @@ def es_clients(all_hosts, all_client_options):
if task not in params_per_task:
param_source = track.operation_parameters(self.track, task)
params_per_task[task] = param_source
# We cannot use the global client index here because we need to support parallel execution of tasks
# with multiple clients. Consider the following scenario:
#
# * Clients 0-3 bulk index into indexA
# * Clients 4-7 bulk index into indexB
#
# Now we need to ensure that we start partitioning parameters correctly in both cases. And that means we
# need to start from (client) index 0 in both cases instead of 0 for indexA and 4 for indexB.
schedule = schedule_for(task, task_allocation.client_index_in_task, params_per_task[task])
schedule = schedule_for(task_allocation, params_per_task[task])
async_executor = AsyncExecutor(
client_id, task, schedule, es, self.sampler, self.cancel, self.complete,
task.error_behavior(self.abort_on_error))
@@ -1546,6 +1538,13 @@ async def __call__(self, *args, **kwargs):
# lazily initialize the schedule
self.logger.debug("Initializing schedule for client id [%s].", self.client_id)
schedule = self.schedule_handle()
# Start the schedule's timer early so the warmup period is independent of any deferred start due to ramp-up
self.schedule_handle.start()
rampup_wait_time = self.schedule_handle.ramp_up_wait_time
if rampup_wait_time:
self.logger.debug("client id [%s] waiting [%.2f]s for ramp-up.", self.client_id, rampup_wait_time)
await asyncio.sleep(rampup_wait_time)

self.logger.debug("Entering main loop for client id [%s].", self.client_id)
# noinspection PyBroadException
try:
@@ -1714,18 +1713,29 @@ def __repr__(self, *args, **kwargs):


class TaskAllocation:
def __init__(self, task, client_index_in_task):
def __init__(self, task, client_index_in_task, global_client_index, total_clients):
"""
:param task: The current task which is always a leaf task.
:param client_index_in_task: The task-specific index for the allocated client.
:param global_client_index: The globally unique index for the allocated client across
all concurrently executed tasks.
:param total_clients: The total number of clients executing tasks concurrently.
"""
self.task = task
self.client_index_in_task = client_index_in_task
self.global_client_index = global_client_index
self.total_clients = total_clients

def __hash__(self):
return hash(self.task) ^ hash(self.client_index_in_task)
return hash(self.task) ^ hash(self.global_client_index)

def __eq__(self, other):
return isinstance(other, type(self)) and self.task == other.task and self.client_index_in_task == other.client_index_in_task
return isinstance(other, type(self)) and self.task == other.task and self.global_client_index == other.global_client_index

def __repr__(self, *args, **kwargs):
return "TaskAllocation [%d/%d] for %s" % (self.client_index_in_task, self.task.clients, self.task)
return f"TaskAllocation [{self.client_index_in_task}/{self.task.clients}] for {self.task} " \
f"and [{self.global_client_index}/{self.total_clients}] in total"


class Allocator:
@@ -1771,7 +1781,14 @@ def allocations(self):
physical_client_index = client_index % max_clients
if sub_task.completes_parent:
clients_executing_completing_task.append(physical_client_index)
allocations[physical_client_index].append(TaskAllocation(sub_task, client_index - start_client_index))

ta = TaskAllocation(task=sub_task,
client_index_in_task=client_index - start_client_index,
global_client_index=client_index,
# if task represents a parallel structure this is the total number of clients
# executing sub-tasks concurrently.
total_clients=task.clients)
allocations[physical_client_index].append(ta)
start_client_index += sub_task.clients

# uneven distribution between tasks and clients, e.g. there are 5 (parallel) tasks but only 2 clients. Then, one of them
@@ -1849,25 +1866,35 @@ def clients(self):

# Runs a concrete schedule on one worker client
# Needs to determine the runners and concrete iterations per client.
def schedule_for(task, client_index, parameter_source):
def schedule_for(task_allocation, parameter_source):
"""
Calculates a client's schedule for a given task.
:param task: The task that should be executed.
:param client_index: The current client index. Must be in the range [0, `task.clients').
:param task_allocation: The task allocation that should be executed by this schedule.
:param parameter_source: The parameter source that should be used for this task.
:return: A generator for the operations the given client needs to perform for this task.
"""
logger = logging.getLogger(__name__)
task = task_allocation.task
op = task.operation
num_clients = task.clients
sched = scheduler.scheduler_for(task)

# We cannot use the global client index here because we need to support parallel execution of tasks
# with multiple clients. Consider the following scenario:
#
# * Clients 0-3 bulk index into indexA
# * Clients 4-7 bulk index into indexB
#
# Now we need to ensure that we start partitioning parameters correctly in both cases. And that means we
# need to start from (client) index 0 in both cases instead of 0 for indexA and 4 for indexB.
client_index = task_allocation.client_index_in_task

# guard all logging statements with the client index and only emit them for the first client. This information is
# repetitive and may cause issues in thespian with many clients (an excessive number of actor messages is sent).
if client_index == 0:
logger.info("Choosing [%s] for [%s].", sched, task)
runner_for_op = runner.runner_for(op.type)
params_for_op = parameter_source.partition(client_index, num_clients)
params_for_op = parameter_source.partition(client_index, task.clients)
if hasattr(sched, "parameter_source"):
if client_index == 0:
logger.debug("Setting parameter source [%s] for scheduler [%s]", params_for_op, sched)
@@ -1900,7 +1927,7 @@ def schedule_for(task, client_index, parameter_source):
else:
logger.info("%s schedule will determine when the schedule for [%s] terminates.", str(loop_control), task.name)

return ScheduleHandle(task.name, sched, loop_control, runner_for_op, params_for_op)
return ScheduleHandle(task_allocation, sched, loop_control, runner_for_op, params_for_op)


def requires_time_period_schedule(task, task_runner, params):
@@ -1917,18 +1944,18 @@ def requires_time_period_schedule(task, task_runner, params):


class ScheduleHandle:
def __init__(self, task_name, sched, task_progress_control, runner, params):
def __init__(self, task_allocation, sched, task_progress_control, runner, params):
"""
Creates a generator that will yield individual task invocations for the provided schedule.
:param task_name: The name of the task for which the schedule is generated.
:param task_allocation: The task allocation for which the schedule is generated.
:param sched: The scheduler for this task.
:param task_progress_control: Controls how and how often this generator will loop.
:param runner: The runner for a given operation.
:param params: The parameter source for a given operation.
:return: A generator for the corresponding parameters.
"""
self.task_name = task_name
self.task_allocation = task_allocation
self.sched = sched
self.task_progress_control = task_progress_control
self.runner = runner
@@ -1939,6 +1966,20 @@ def __init__(self, task_name, sched, task_progress_control, runner, params):
#self.io_pool_exc = ThreadPoolExecutor(max_workers=1)
#self.loop = asyncio.get_event_loop()

@property
def ramp_up_wait_time(self):
"""
:return: the number of seconds to wait until this client should start so that load can ramp up gradually.
"""
ramp_up_time_period = self.task_allocation.task.ramp_up_time_period
if ramp_up_time_period:
return ramp_up_time_period * (self.task_allocation.global_client_index / self.task_allocation.total_clients)
else:
return 0
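
# Worked example (illustrative): with ramp_up_time_period=60 and
# total_clients=8, client k waits 60 * k / 8 seconds, i.e.
# [0.0, 7.5, 15.0, 22.5, 30.0, 37.5, 45.0, 52.5] for k in 0..7 -- the first
# client starts immediately and the last one 52.5 seconds into the ramp-up period.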

# Called by the executor before any ramp-up wait (see AsyncExecutor) so that
# the warmup and measurement timer is independent of a client's deferred start.
def start(self):
self.task_progress_control.start()

def before_request(self, now):
self.sched.before_request(now)

@@ -1949,7 +1990,6 @@ async def __call__(self):
next_scheduled = 0
if self.task_progress_control.infinite:
param_source_knows_progress = hasattr(self.params, "percent_completed")
self.task_progress_control.start()
while True:
try:
next_scheduled = self.sched.next(next_scheduled)
@@ -1962,7 +2002,6 @@
except StopIteration:
return
else:
self.task_progress_control.start()
while not self.task_progress_control.completed:
try:
next_scheduled = self.sched.next(next_scheduled)
15 changes: 15 additions & 0 deletions esrally/resources/track-schema.json
@@ -28,6 +28,11 @@
"type": "integer",
"minimum": 1
},
"ramp-up-time-period": {
"type": "integer",
"minimum": 0,
"description": "Defines the time period in seconds to gradually increase the number of clients."
},
"warmup-time-period": {
"type": "integer",
"minimum": 0,
@@ -75,6 +80,11 @@
"minimum": 1,
"description": "Defines the number of times to run the operation."
},
"ramp-up-time-period": {
"type": "integer",
"minimum": 0,
"description": "Defines the time period in seconds to gradually increase the number of clients."
},
"warmup-time-period": {
"type": "integer",
"minimum": 0,
@@ -146,6 +156,11 @@
"minimum": 1,
"description": "Defines the number of times to run the operation."
},
"ramp-up-time-period": {
"type": "integer",
"minimum": 0,
"description": "Defines the time period in seconds to gradually increase the number of clients."
},
"warmup-time-period": {
"type": "integer",
"minimum": 0,