Skip to content

Commit

Permalink
Add progress indication for daily logging volume
Browse files Browse the repository at this point in the history
With this commit we implement the property `percent_completed` on the
Elasticlogs bulk source to provide an informational progress indication
for the challenge `index-logs-fixed-daily-volume`. Because we cannot
specify a total number of iterations or a specific time-period we need
to rely on feedback from the generator when all data has been ingested
and thus we also need to rely on the generator to indicate progress.

Relates elastic#39
  • Loading branch information
danielmitterdorfer authored Sep 2, 2019
1 parent 343e3fa commit 836eaee
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 6 deletions.
10 changes: 9 additions & 1 deletion eventdata/parameter_sources/elasticlogs_bulk_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,15 @@ def partition(self, partition_index, total_partitions):
return ElasticlogsBulkSource(self.orig_args[0], new_params, **self.orig_args[2])

def size(self):
# NOTE(review): this is a rendered diff, not a single method body. The hunk header
# reports "9 additions & 1 deletion", so `return 1` below appears to be the removed
# pre-commit body and the lines after it the replacement -- confirm against the repo.
return 1
# progress is determined either by:
#
# * the `time-period` or `iteration` property specified on the corresponding task
# * `#params()` raising `StopIteration` when `RandomEvent` is exhausted
return None

@property
def percent_completed(self):
    """
    Report progress by delegating to the wrapped random event generator.

    :return: Whatever ``self._randomevent.percent_completed`` currently evaluates to.
    """
    progress = self._randomevent.percent_completed
    return progress

def params(self):
# Build bulk array
Expand Down
17 changes: 13 additions & 4 deletions eventdata/parameter_sources/randomevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,14 +288,23 @@ def __init__(self, params, agent=Agent, client_ip=ClientIp, referrer=Referrer, r
if "daily_logging_volume" in params and "client_count" in params:
# in bytes
self.daily_logging_volume = convert_to_bytes(params["daily_logging_volume"]) // int(params["client_count"])
self.current_logging_volume = 0
self.remaining_days = params.get("number_of_days")
else:
self.daily_logging_volume = None
self.current_logging_volume = 0
self.remaining_days = None
self.current_logging_volume = 0
self.total_days = params.get("number_of_days")
self.remaining_days = self.total_days
self.record_raw_event_size = params.get("record_raw_event_size", False)

@property
def percent_completed(self):
    """
    Ratio of the logging volume generated so far to the configured total volume.

    The generated volume is the daily logging volume multiplied by the number of
    days already finished (``total_days - remaining_days``) plus the volume
    accumulated for the current day; the total is ``total_days * daily_logging_volume``.

    :return: The ratio as a float, or ``None`` when progress cannot be determined:
             no daily logging volume is set, no number of days is set, or the
             configured total volume is zero.
    """
    if self.daily_logging_volume is None or self.total_days is None:
        return None

    total = self.total_days * self.daily_logging_volume
    if total == 0:
        # Zero days, or a per-client volume of zero: there is nothing to measure
        # progress against, and dividing would raise ZeroDivisionError.
        return None

    full_days = self.total_days - self.remaining_days
    already_generated = self.daily_logging_volume * full_days + self.current_logging_volume
    return already_generated / total

def generate_event(self):
if self.remaining_days == 0:
raise StopIteration()
Expand Down
11 changes: 10 additions & 1 deletion tests/parameter_sources/randomevent_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ def test_random_events_with_daily_logging_volume():
referrer=StaticReferrer,
request=StaticRequest)

assert e.percent_completed is None

# 5 events fit into one kilobyte
for i in range(5):
doc, index, _ = e.generate_event()
Expand All @@ -110,6 +112,8 @@ def test_random_events_with_daily_logging_volume():
doc, index, _ = e.generate_event()
assert index == "logs-20190107"

assert e.percent_completed is None


def test_random_events_with_daily_logging_volume_and_maximum_days():
e = RandomEvent(params={
Expand All @@ -127,18 +131,23 @@ def test_random_events_with_daily_logging_volume_and_maximum_days():
referrer=StaticReferrer,
request=StaticRequest)

assert e.percent_completed == 0.0

# 5 events fit into one kilobyte
for i in range(5):
doc, index, _ = e.generate_event()
assert index == "logs-20190105"

assert e.percent_completed == 0.5

for i in range(5):
doc, index, _ = e.generate_event()
assert index == "logs-20190106"
# no more events allowed on the next day
with pytest.raises(StopIteration):
doc, index, _ = e.generate_event()
print(index)

assert e.percent_completed == 1.0


def test_convert_bytes_to_bytes():
Expand Down

0 comments on commit 836eaee

Please sign in to comment.