Skip to content

Commit

Permalink
Improve error message for missing corpora (#967)
Browse files Browse the repository at this point in the history
With this commit we provide a more helpful error message when a track uses
bulk-indexing but does not define any corpora.

Closes #568
  • Loading branch information
bartier authored Apr 20, 2020
1 parent 6721301 commit 137b4fc
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 11 deletions.
2 changes: 1 addition & 1 deletion esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ async def __call__(self, es, params):
* ``index``: name of the affected index. May be `None` if it could not be derived.
* ``bulk-size``: bulk size, e.g. 5.000.
* ``bulk-request-size-bytes``: size of the full bulk requset in bytes
* ``bulk-request-size-bytes``: size of the full bulk request in bytes
* ``total-document-size-bytes``: size of all documents contained in the bulk request in bytes
* ``weight``: operation-agnostic representation of the bulk size (used internally by Rally for throughput calculation).
* ``unit``: The unit in which to interpret ``bulk-size`` and ``weight``. Always "docs".
Expand Down
4 changes: 4 additions & 0 deletions esrally/track/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,10 @@ def __init__(self, track, params, **kwargs):

self.corpora = self.used_corpora(track, params)

if len(self.corpora) == 0:
raise exceptions.InvalidSyntax(f"There is no document corpus definition for track {track}. You must add at "
f"least one before making bulk requests to Elasticsearch.")

for corpus in self.corpora:
for document_set in corpus.documents:
if document_set.includes_action_and_meta_data and self.id_conflicts != IndexIdConflict.NoConflicts:
Expand Down
98 changes: 88 additions & 10 deletions tests/track/params_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,39 +674,82 @@ def test_build_conflicting_ids(self):

class BulkIndexParamSourceTests(TestCase):
def test_create_without_params(self):
corpus = track.DocumentCorpus(name="default", documents=[
track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK,
number_of_documents=10,
target_index="test-idx",
target_type="test-type"
)])

with self.assertRaises(exceptions.InvalidSyntax) as ctx:
params.BulkIndexParamSource(track=track.Track(name="unit-test"), params={})
params.BulkIndexParamSource(track=track.Track(name="unit-test", corpora=[corpus]), params={})

self.assertEqual("Mandatory parameter 'bulk-size' is missing", ctx.exception.args[0])

def test_create_without_corpora_definition(self):
    """Creating a bulk param source for a track without corpora must fail.

    The track below is built without a ``corpora`` argument, and the
    constructor is expected to raise ``InvalidSyntax`` with a message
    that tells the user to add a corpus definition.
    """
    expected_message = ("There is no document corpus definition for track unit-test. "
                        "You must add at least one before making bulk requests to Elasticsearch.")

    with self.assertRaises(exceptions.InvalidSyntax) as raised:
        params.BulkIndexParamSource(track=track.Track(name="unit-test"), params={})

    # The first exception argument carries the user-facing message.
    actual_message = raised.exception.args[0]
    self.assertEqual(expected_message, actual_message)


def test_create_with_non_numeric_bulk_size(self):
corpus = track.DocumentCorpus(name="default", documents=[
track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK,
number_of_documents=10,
target_index="test-idx",
target_type="test-type"
)])

with self.assertRaises(exceptions.InvalidSyntax) as ctx:
params.BulkIndexParamSource(track=track.Track(name="unit-test"), params={
params.BulkIndexParamSource(track=track.Track(name="unit-test", corpora=[corpus]), params={
"bulk-size": "Three"
})

self.assertEqual("'bulk-size' must be numeric", ctx.exception.args[0])

def test_create_with_negative_bulk_size(self):
corpus = track.DocumentCorpus(name="default", documents=[
track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK,
number_of_documents=10,
target_index="test-idx",
target_type="test-type"
)])

with self.assertRaises(exceptions.InvalidSyntax) as ctx:
params.BulkIndexParamSource(track=track.Track(name="unit-test"), params={
params.BulkIndexParamSource(track=track.Track(name="unit-test", corpora=[corpus]), params={
"bulk-size": -5
})

self.assertEqual("'bulk-size' must be positive but was -5", ctx.exception.args[0])

def test_create_with_fraction_smaller_batch_size(self):
corpus = track.DocumentCorpus(name="default", documents=[
track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK,
number_of_documents=10,
target_index="test-idx",
target_type="test-type"
)])

with self.assertRaises(exceptions.InvalidSyntax) as ctx:
params.BulkIndexParamSource(track=track.Track(name="unit-test"), params={
params.BulkIndexParamSource(track=track.Track(name="unit-test", corpora=[corpus]), params={
"bulk-size": 5,
"batch-size": 3
})

self.assertEqual("'batch-size' must be greater than or equal to 'bulk-size'", ctx.exception.args[0])

def test_create_with_fraction_larger_batch_size(self):
corpus = track.DocumentCorpus(name="default", documents=[
track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK,
number_of_documents=10,
target_index="test-idx",
target_type="test-type"
)])

with self.assertRaises(exceptions.InvalidSyntax) as ctx:
params.BulkIndexParamSource(track=track.Track(name="unit-test"), params={
params.BulkIndexParamSource(track=track.Track(name="unit-test", corpora=[corpus]), params={
"bulk-size": 5,
"batch-size": 8
})
Expand Down Expand Up @@ -748,34 +791,62 @@ def test_create_with_unknown_on_conflict_setting(self):
self.assertEqual("Unknown 'on-conflict' setting [delete]", ctx.exception.args[0])

def test_create_with_ingest_percentage_too_low(self):
corpus = track.DocumentCorpus(name="default", documents=[
track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK,
number_of_documents=10,
target_index="test-idx",
target_type="test-type"
)])

with self.assertRaises(exceptions.InvalidSyntax) as ctx:
params.BulkIndexParamSource(track=track.Track(name="unit-test"), params={
params.BulkIndexParamSource(track=track.Track(name="unit-test", corpora=[corpus]), params={
"bulk-size": 5000,
"ingest-percentage": 0.0
})

self.assertEqual("'ingest-percentage' must be in the range (0.0, 100.0] but was 0.0", ctx.exception.args[0])

def test_create_with_ingest_percentage_too_high(self):
corpus = track.DocumentCorpus(name="default", documents=[
track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK,
number_of_documents=10,
target_index="test-idx",
target_type="test-type"
)])

with self.assertRaises(exceptions.InvalidSyntax) as ctx:
params.BulkIndexParamSource(track=track.Track(name="unit-test"), params={
params.BulkIndexParamSource(track=track.Track(name="unit-test", corpora=[corpus]), params={
"bulk-size": 5000,
"ingest-percentage": 100.1
})

self.assertEqual("'ingest-percentage' must be in the range (0.0, 100.0] but was 100.1", ctx.exception.args[0])

def test_create_with_ingest_percentage_not_numeric(self):
corpus = track.DocumentCorpus(name="default", documents=[
track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK,
number_of_documents=10,
target_index="test-idx",
target_type="test-type"
)])

with self.assertRaises(exceptions.InvalidSyntax) as ctx:
params.BulkIndexParamSource(track=track.Track(name="unit-test"), params={
params.BulkIndexParamSource(track=track.Track(name="unit-test", corpora=[corpus]), params={
"bulk-size": 5000,
"ingest-percentage": "100 percent"
})

self.assertEqual("'ingest-percentage' must be numeric", ctx.exception.args[0])

def test_create_valid_param_source(self):
self.assertIsNotNone(params.BulkIndexParamSource(track.Track(name="unit-test"), params={
corpus = track.DocumentCorpus(name="default", documents=[
track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK,
number_of_documents=10,
target_index="test-idx",
target_type="test-type"
)])

self.assertIsNotNone(params.BulkIndexParamSource(track.Track(name="unit-test", corpora=[corpus]), params={
"conflicts": "random",
"bulk-size": 5000,
"batch-size": 20000,
Expand Down Expand Up @@ -946,7 +1017,14 @@ def schedule(param_source):
self.assertEqual(3, len(list(schedule(partition))))

def test_create_with_conflict_probability_zero(self):
params.BulkIndexParamSource(track=track.Track(name="unit-test"), params={
corpus = track.DocumentCorpus(name="default", documents=[
track.Documents(source_format=track.Documents.SOURCE_FORMAT_BULK,
number_of_documents=10,
target_index="test-idx",
target_type="test-type"
)])

params.BulkIndexParamSource(track=track.Track(name="unit-test", corpora=[corpus]), params={
"bulk-size": 5000,
"conflicts": "sequential",
"conflict-probability": 0
Expand Down

0 comments on commit 137b4fc

Please sign in to comment.