From 176dd95849ece9eb1182c218f8e1c7d0c782c205 Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Thu, 19 Nov 2020 08:40:53 +0100 Subject: [PATCH] Ensure admin tasks are always executed When a user specified `--exclude-tasks`, Rally did not only exclude the specified tasks but also all administrative tasks. This happened because of a broken logic that always added an admin task filter internally. Due to another bug, administrative tasks where never correctly detected but we fixed that broken logic in #1112, which finally uncovered this issue. With this commit we ensure now that administrative tasks are always executed regardless whether the user excludes them. We also increase test coverage in this part of the code. --- esrally/track/loader.py | 16 ++++++++++------ tests/track/loader_test.py | 36 ++++++++++++++++++++++++++---------- tests/track/track_test.py | 27 +++++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 16 deletions(-) diff --git a/esrally/track/loader.py b/esrally/track/loader.py index 3cefda864..4d598f770 100644 --- a/esrally/track/loader.py +++ b/esrally/track/loader.py @@ -693,27 +693,31 @@ def filter_tasks(t, filters, exclude=False): logger = logging.getLogger(__name__) - def filter_out_match(task, filters, exclude): - for f in filters: + def filter_out_match(task, user_defined_filters, force_include_filters, exclude): + for f in force_include_filters: if task.matches(f): - if hasattr(task, 'tasks') and exclude: + return False + + for f in user_defined_filters: + if task.matches(f): + if hasattr(task, "tasks") and exclude: return False return exclude return not exclude # always include administrative tasks - complete_filters = [track.AdminTaskFilter()] + filters + force_include_filters = [track.AdminTaskFilter()] for challenge in t.challenges: # don't modify the schedule while iterating over it tasks_to_remove = [] for task in challenge.schedule: - if filter_out_match(task, complete_filters, exclude): + if filter_out_match(task, filters, force_include_filters, exclude): tasks_to_remove.append(task) else: leafs_to_remove = [] for leaf_task in task: - if filter_out_match(leaf_task, complete_filters, exclude): + if filter_out_match(leaf_task, filters, force_include_filters, exclude): leafs_to_remove.append(leaf_task) for leaf_task in leafs_to_remove: logger.info("Removing sub-task [%s] from challenge [%s] due to task filter.", leaf_task, challenge) diff --git a/tests/track/loader_test.py b/tests/track/loader_test.py index c41f736de..ed3c93db6 100644 --- a/tests/track/loader_test.py +++ b/tests/track/loader_test.py @@ -1312,6 +1312,10 @@ def test_filters_tasks(self): "description": "description for unit test", "indices": [{"name": "test-index", "auto-managed": False}], "operations": [ + { + "name": "create-index", + "operation-type": "create-index" + }, { "name": "bulk-index", "operation-type": "bulk" @@ -1338,6 +1342,9 @@ def test_filters_tasks(self): { "name": "default-challenge", "schedule": [ + { + "operation": "create-index" + }, { "parallel": { "tasks": [ @@ -1376,7 +1383,7 @@ def test_filters_tasks(self): } reader = loader.TrackSpecificationReader() full_track = reader("unittest", track_specification, "/mappings") - self.assertEqual(4, len(full_track.challenges[0].schedule)) + self.assertEqual(5, len(full_track.challenges[0].schedule)) filtered = loader.filter_tasks(full_track, [track.TaskNameFilter("index-3"), track.TaskOpTypeFilter("search"), @@ -1385,16 +1392,21 @@ def test_filters_tasks(self): ]) schedule = filtered.challenges[0].schedule - self.assertEqual(3, len(schedule)) - self.assertEqual(["index-3", "match-all-parallel"], [t.name for t in schedule[0].tasks]) - self.assertEqual("match-all-serial", schedule[1].name) - self.assertEqual("cluster-stats", schedule[2].name) + self.assertEqual(4, len(schedule)) + self.assertEqual("create-index", schedule[0].name) + self.assertEqual(["index-3", "match-all-parallel"], [t.name for t in schedule[1].tasks]) + self.assertEqual("match-all-serial", schedule[2].name) + self.assertEqual("cluster-stats", schedule[3].name) def test_filters_exclude_tasks(self): track_specification = { "description": "description for unit test", "indices": [{"name": "test-index", "auto-managed": False}], "operations": [ + { + "name": "create-index", + "operation-type": "create-index" + }, { "name": "bulk-index", "operation-type": "bulk" @@ -1421,6 +1433,9 @@ def test_filters_exclude_tasks(self): { "name": "default-challenge", "schedule": [ + { + "operation": "create-index" + }, { "parallel": { "tasks": [ @@ -1459,15 +1474,16 @@ def test_filters_exclude_tasks(self): } reader = loader.TrackSpecificationReader() full_track = reader("unittest", track_specification, "/mappings") - self.assertEqual(4, len(full_track.challenges[0].schedule)) + self.assertEqual(5, len(full_track.challenges[0].schedule)) filtered = loader.filter_tasks(full_track, [track.TaskNameFilter("index-3"), track.TaskOpTypeFilter("search")], exclude=True) schedule = filtered.challenges[0].schedule - self.assertEqual(3, len(schedule)) - self.assertEqual(["index-1",'index-2'], [t.name for t in schedule[0].tasks]) - self.assertEqual("node-stats", schedule[1].name) - self.assertEqual("cluster-stats", schedule[2].name) + self.assertEqual(4, len(schedule)) + self.assertEqual("create-index", schedule[0].name) + self.assertEqual(["index-1", "index-2"], [t.name for t in schedule[1].tasks]) + self.assertEqual("node-stats", schedule[2].name) + self.assertEqual("cluster-stats", schedule[3].name) class TrackSpecificationReaderTests(TestCase): diff --git a/tests/track/track_test.py b/tests/track/track_test.py index aa2357b30..bc8595241 100644 --- a/tests/track/track_test.py +++ b/tests/track/track_test.py @@ -231,6 +231,33 @@ def test_string_hyphenation_is_symmetric(self): self.assertEqual(op_type, track.OperationType.from_hyphenated_string(op_type.to_hyphenated_string())) +class TaskFilterTests(TestCase): + def create_index_task(self): + return track.Task("create-index-task", + track.Operation("create-index-op", + operation_type=track.OperationType.CreateIndex.to_hyphenated_string())) + + def search_task(self): + return track.Task("search-task", + track.Operation("search-op", + operation_type=track.OperationType.Search.to_hyphenated_string())) + + def test_admin_task_filter(self): + f = track.AdminTaskFilter() + self.assertTrue(f.matches(self.create_index_task())) + self.assertFalse(f.matches(self.search_task())) + + def test_task_name_filter(self): + f = track.TaskNameFilter("create-index-task") + self.assertTrue(f.matches(self.create_index_task())) + self.assertFalse(f.matches(self.search_task())) + + def test_task_op_type_filter(self): + f = track.TaskOpTypeFilter(track.OperationType.CreateIndex.to_hyphenated_string()) + self.assertTrue(f.matches(self.create_index_task())) + self.assertFalse(f.matches(self.search_task())) + + class TaskTests(TestCase): def task(self, schedule=None, target_throughput=None, target_interval=None): op = track.Operation("bulk-index", track.OperationType.Bulk.name)