Long running benchmarks can cause excessive memory usage when using the in-memory metrics store #1723

Open

b-deam opened this issue Jun 2, 2023 · 0 comments
Labels: enhancement (Improves the status quo)


b-deam commented Jun 2, 2023

#1721 highlighted that we have some room for improvement in how we handle metrics storage during long running benchmarks (i.e. days), especially when using the default in-memory metrics store. Long running benchmarks can cause excessive memory usage by esrally and ultimately trigger the kernel OOMKiller on the load driver.

This is particularly an issue for benchmarks with very long running task(s) where we want to calculate per-request latency statistics, because we currently keep a sample for every request performed (more details below) in memory for the duration of a task. Once a task completes we serialise the samples and compress them with zlib, and we only reload them into memory once the benchmark is complete.

If you consider that some of these benchmarks can execute hundreds of thousands of requests per second and can run for days, you can quickly see how this becomes an issue.

We create one Sampler per Worker (i.e. per core) that is shared by all clients on that Worker to store their samples during task execution, and we also limit the number of samples that can be stored per Sampler to 2^20 (1,048,576):

rally/esrally/driver/driver.py

Lines 1327 to 1339 in 2470328

self.logger.debug("Worker[%d] is executing tasks at index [%d].", self.worker_id, self.current_task_index)
self.sampler = Sampler(start_timestamp=time.perf_counter(), buffer_size=self.sample_queue_size)
executor = AsyncIoAdapter(
self.config,
self.track,
task_allocations,
self.sampler,
self.cancel,
self.complete,
self.on_error,
self.client_contexts,
self.worker_id,
)

rally/esrally/driver/driver.py

Lines 1363 to 1413 in 2470328

class Sampler:
    """
    Encapsulates management of gathered samples.
    """

    def __init__(self, start_timestamp, buffer_size=16384):
        self.start_timestamp = start_timestamp
        self.q = queue.Queue(maxsize=buffer_size)
        self.logger = logging.getLogger(__name__)

    def add(
        self,
        task,
        client_id,
        sample_type,
        meta_data,
        absolute_time,
        request_start,
        latency,
        service_time,
        processing_time,
        throughput,
        ops,
        ops_unit,
        time_period,
        percent_completed,
        dependent_timing=None,
    ):
        try:
            self.q.put_nowait(
                Sample(
                    client_id,
                    absolute_time,
                    request_start,
                    self.start_timestamp,
                    task,
                    sample_type,
                    meta_data,
                    latency,
                    service_time,
                    processing_time,
                    throughput,
                    ops,
                    ops_unit,
                    time_period,
                    percent_completed,
                    dependent_timing,
                )
            )
        except queue.Full:
            self.logger.warning("Dropping sample for [%s] due to a full sampling queue.", task.operation.name)

The size of each sample depends on the operation's metadata etc., but if we assume that a sample is at least 4KB (a rough estimate from testing) then we can use ~4GB of memory per worker, per task before we start dropping samples on the floor.
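As a rough back-of-envelope check (the per-sample size is an assumed average, not a measured constant):

# Back-of-envelope worst case for one Sampler's queue, assuming an average
# Sample size of ~4 KiB (rough estimate; real sizes vary with operation metadata).
SAMPLE_QUEUE_SIZE = 1 << 20          # default sampler buffer size (2^20 samples)
APPROX_SAMPLE_SIZE_BYTES = 4 * 1024  # assumed average size of one Sample

worst_case_bytes = SAMPLE_QUEUE_SIZE * APPROX_SAMPLE_SIZE_BYTES
print(f"~{worst_case_bytes / 2**30:.1f} GiB per worker, per task")  # prints ~4.0 GiB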

We already apply compression to in-memory metrics in to_externalizable(), but only once all tasks have reached a join point, or on benchmark completion:

def move_to_next_task(self, workers_curr_step):
    if self.config.opts("track", "test.mode.enabled"):
        # don't wait if test mode is enabled and start the next task immediately.
        waiting_period = 0
    else:
        # start the next task in one second (relative to master's timestamp)
        #
        # Assumption: We don't have a lot of clock skew between reaching the join point and sending the next task
        # (it doesn't matter too much if we're a few ms off).
        waiting_period = 1.0
    # Some metrics store implementations return None because no external representation is required.
    # pylint: disable=assignment-from-none
    m = self.metrics_store.to_externalizable(clear=True)
    self.target.on_task_finished(m, waiting_period)
    # Using a perf_counter here is fine also in the distributed case as we subtract it from `master_received_msg_at` making it
    # a relative instead of an absolute value.
    start_next_task = time.perf_counter() + waiting_period
    for worker_id, worker in enumerate(self.workers):
        worker_ended_task_at, master_received_msg_at = workers_curr_step[worker_id]
        worker_start_timestamp = worker_ended_task_at + (start_next_task - master_received_msg_at)
        self.logger.debug(
            "Scheduling next task for worker id [%d] at their timestamp [%f] (master timestamp [%f])",
            worker_id,
            worker_start_timestamp,
            start_next_task,
        )
        self.target.drive_at(worker, worker_start_timestamp)

rally/esrally/metrics.py

Lines 1153 to 1161 in 2470328

def to_externalizable(self, clear=False):
    docs = self.docs
    if clear:
        self.docs = []
    compressed = zlib.compress(pickle.dumps(docs))
    self.logger.debug(
        "Compression changed size of metric store from [%d] bytes to [%d] bytes", sys.getsizeof(docs, -1), sys.getsizeof(compressed, -1)
    )
    return compressed
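
For reference, restoring these externalized metrics is just the inverse operation; the helper below is a hypothetical sketch of that mechanism, not rally's actual reload API:

import pickle
import zlib


def from_externalizable(compressed):
    # Hypothetical inverse of to_externalizable(): decompress and unpickle
    # the metric documents so they can be merged back into an in-memory store.
    return pickle.loads(zlib.decompress(compressed))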

The Driver already wakes up every POST_PROCESS_INTERVAL_SECONDS (30) to flush the collected samples:

@actor.no_retry("driver")  # pylint: disable=no-value-for-parameter
def receiveMsg_WakeupMessage(self, msg, sender):
    if msg.payload == DriverActor.RESET_RELATIVE_TIME_MARKER:
        self.coordinator.reset_relative_time()
    elif not self.coordinator.finished():
        self.post_process_timer += DriverActor.WAKEUP_INTERVAL_SECONDS
        if self.post_process_timer >= DriverActor.POST_PROCESS_INTERVAL_SECONDS:
            self.post_process_timer = 0
            self.coordinator.post_process_samples()
        self.coordinator.update_progress_message()
        self.wakeupAfter(datetime.timedelta(seconds=DriverActor.WAKEUP_INTERVAL_SECONDS))

def post_process_samples(self):
    # we do *not* do this here to avoid concurrent updates (actors are single-threaded) but rather to make it clear that we use
    # only a snapshot and that new data will go to a new sample set.
    raw_samples = self.raw_samples
    self.raw_samples = []
    self.sample_post_processor(raw_samples)

rally/esrally/driver/driver.py

Lines 1062 to 1068 in 2470328

# this will be a noop for the in-memory metrics store. If we use an ES metrics store however, this will ensure that we already send
# the data and also clear the in-memory buffer. This allows users to see data already while running the benchmark. In cases where
# it does not matter (i.e. in-memory) we will still defer this step until the end.
#
# Don't force refresh here in the interest of short processing times. We don't need to query immediately afterwards so there is
# no need for frequent refreshes.
self.metrics_store.flush(refresh=False)

Ideas:

  1. Maybe revise the default queue size for the samplers to something lower than 2^20.
  2. Consider applying similar serialisation and compression to samples as part of the routine flush() call made during a benchmark's execution when using the in-memory metrics store (a rough sketch of this idea follows below).
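
To make idea 2 a little more concrete, here is a minimal sketch of what a compressing flush could look like for the in-memory store. The class and attribute names (CompressingInMemoryMetricsStore, _compressed_chunks) are hypothetical and not part of rally; the sketch only illustrates trading uncompressed docs for zlib-compressed pickled chunks on each routine flush:

import pickle
import zlib


class CompressingInMemoryMetricsStore:
    """Hypothetical sketch: compress buffered metric docs on every flush()
    instead of keeping them uncompressed until the end of the benchmark."""

    def __init__(self):
        self.docs = []                 # uncompressed docs collected since the last flush
        self._compressed_chunks = []   # zlib-compressed, pickled batches of docs

    def flush(self, refresh=False):
        # Instead of a no-op (the current in-memory behaviour), compress the
        # buffered docs and keep only the compressed representation in memory.
        if self.docs:
            self._compressed_chunks.append(zlib.compress(pickle.dumps(self.docs)))
            self.docs = []

    def to_externalizable(self, clear=False):
        # Decompress all chunks plus any still-uncompressed docs only when an
        # external representation is actually needed.
        docs = [doc for chunk in self._compressed_chunks for doc in pickle.loads(zlib.decompress(chunk))]
        docs.extend(self.docs)
        if clear:
            self.docs = []
            self._compressed_chunks = []
        return zlib.compress(pickle.dumps(docs))

The trade-off is some extra CPU spent on compression during the benchmark in exchange for a much smaller steady-state memory footprint.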
b-deam added the enhancement (Improves the status quo) label on Jun 2, 2023
b-deam self-assigned this on Jun 2, 2023