Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add transform-stats operation type #1288

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions docs/track.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2326,6 +2326,40 @@ Meta-data

This operation returns no meta-data.

transform-stats
~~~~~~~~~~~~~~~

With the operation type ``transform-stats`` you can call the `transform stats API <https://www.elastic.co/guide/en/elasticsearch/reference/current/get-transform-stats.html>`_.

Properties
""""""""""

* ``transform-id`` (mandatory): The id of the transform.
* ``condition`` (optional, defaults to no condition): A structured object with the properties ``path`` and ``expected-value``. If the actual value returned by transform stats API is equal to the expected value at the provided path, this operation will return successfully. See below for an example how this can be used.

In the following example the ``transform-stats`` operation will wait until all data has been processed::

{
"operation-type": "transform-stats",
"transform-id": "mytransform",
"condition": {
"path": "checkpointing.operations_behind",
"expected-value": null
},
"retry-until-success": true
}

Throughput will be reported as number of completed ``transform-stats`` operations per second.

This operation is :ref:`retryable <track_operations>`.

Meta-data
"""""""""

* ``weight``: Always 1.
* ``unit``: Always "ops".
* ``success``: A boolean indicating whether the operation has succeeded.

composite
~~~~~~~~~

Expand Down
51 changes: 51 additions & 0 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def register_default_runners():
register_runner(track.OperationType.StartTransform, Retry(StartTransform()), async_runner=True)
register_runner(track.OperationType.WaitForTransform, Retry(WaitForTransform()), async_runner=True)
register_runner(track.OperationType.DeleteTransform, Retry(DeleteTransform()), async_runner=True)
register_runner(track.OperationType.TransformStats, Retry(TransformStats()), async_runner=True)


def runner_for(operation_type):
Expand Down Expand Up @@ -2116,6 +2117,56 @@ def __repr__(self, *args, **kwargs):
return "delete-transform"


class TransformStats(Runner):
"""
Gather index stats for one or all transforms.
"""

def _get(self, v, path):
if v is None:
return None
elif len(path) == 1:
return v.get(path[0])
else:
return self._get(v.get(path[0]), path[1:])

def _safe_string(self, v):
return str(v) if v is not None else None

async def __call__(self, es, params):
api_kwargs = self._default_kw_params(params)
transform_id = mandatory(params, "transform-id", self)
condition = params.get("condition")
response = await es.transform.get_transform_stats(transform_id=transform_id, **api_kwargs)
transforms = response.get("transforms", [])
transform_stats = transforms[0] if len(transforms) > 0 else {}
if condition:
path = mandatory(condition, "path", repr(self))
expected_value = mandatory(condition, "expected-value", repr(self))
actual_value = self._get(transform_stats, path.split("."))
return {
"weight": 1,
"unit": "ops",
"condition": {
"path": path,
# avoid mapping issues in the ES metrics store by always rendering values as strings
"actual-value": self._safe_string(actual_value),
"expected-value": self._safe_string(expected_value)
},
# currently we only support "==" as a predicate but that might change in the future
"success": actual_value == expected_value
}
else:
return {
"weight": 1,
"unit": "ops",
"success": True
}

def __repr__(self, *args, **kwargs):
return "transform-stats"


class SubmitAsyncSearch(Runner):
async def __call__(self, es, params):
request_params = params.get("request-params", {})
Expand Down
3 changes: 3 additions & 0 deletions esrally/track/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ class OperationType(Enum):
DeleteComposableTemplate = 1031
CreateComponentTemplate = 1032
DeleteComponentTemplate = 1033
TransformStats = 1034

@property
def admin_op(self):
Expand Down Expand Up @@ -704,6 +705,8 @@ def from_hyphenated_string(cls, v):
return OperationType.WaitForTransform
elif v == "delete-transform":
return OperationType.DeleteTransform
elif v == "transform-stats":
return OperationType.TransformStats
elif v == "create-data-stream":
return OperationType.CreateDataStream
elif v == "delete-data-stream":
Expand Down
136 changes: 136 additions & 0 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4664,6 +4664,142 @@ async def test_delete_transform(self, es):
ignore=[404])


class TransformStatsRunnerTests(TestCase):
@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_transform_stats_with_timeout_and_headers(self, es):
es.transform.get_transform_stats.return_value = as_future({})
transform_stats = runner.TransformStats()
transform_id = "a-transform"
result = await transform_stats(es, params={"transform-id": transform_id,
"request-timeout": 3.0,
"headers": {"header1": "value1"},
"opaque-id": "test-id1"})
self.assertEqual(1, result["weight"])
self.assertEqual("ops", result["unit"])
self.assertTrue(result["success"])

es.transform.get_transform_stats.assert_called_once_with(transform_id=transform_id,
headers={"header1": "value1"},
opaque_id="test-id1",
request_timeout=3.0)

@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_transform_stats_with_failed_condition(self, es):
transform_id = "a-transform"
es.transform.get_transform_stats.return_value = as_future({
"count": 3,
"transforms": [
{
"id": transform_id,
"state": "started",
"stats": {},
"checkpointing": {
"last": {},
"operations_behind": 10000
}
}
]
})

transform_stats = runner.TransformStats()

result = await transform_stats(es, params={
"transform-id": transform_id,
"condition": {
"path": "checkpointing.operations_behind",
"expected-value": None
}
})
self.assertEqual(1, result["weight"])
self.assertEqual("ops", result["unit"])
self.assertFalse(result["success"])
self.assertDictEqual({
"path": "checkpointing.operations_behind",
"actual-value": "10000",
"expected-value": None
}, result["condition"])

es.transform.get_transform_stats.assert_called_once_with(transform_id=transform_id)

@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_transform_stats_with_successful_condition(self, es):
transform_id = "a-transform"
es.transform.get_transform_stats.return_value = as_future({
"count": 3,
"transforms": [
{
"id": transform_id,
"state": "started",
"stats": {},
"checkpointing": {
"last": {}
}
}
]
})

transform_stats = runner.TransformStats()

result = await transform_stats(es, params={
"transform-id": transform_id,
"condition": {
"path": "checkpointing.operations_behind",
"expected-value": None
}
})
self.assertEqual(1, result["weight"])
self.assertEqual("ops", result["unit"])
self.assertTrue(result["success"])
self.assertDictEqual({
"path": "checkpointing.operations_behind",
"actual-value": None,
"expected-value": None
}, result["condition"])

es.transform.get_transform_stats.assert_called_once_with(transform_id=transform_id)

@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_transform_stats_with_non_existing_path(self, es):
transform_id = "a-transform"
es.transform.get_transform_stats.return_value = as_future({
"count": 3,
"transforms": [
{
"id": transform_id,
"state": "started",
"stats": {},
"checkpointing": {
"last": {}
}
}
]
})

transform_stats = runner.TransformStats()

result = await transform_stats(es, params={
"transform-id": transform_id,
"condition": {
"path": "checkpointing.last.checkpoint",
"expected-value": 42
}
})
self.assertEqual(1, result["weight"])
self.assertEqual("ops", result["unit"])
self.assertFalse(result["success"])
self.assertDictEqual({
"path": "checkpointing.last.checkpoint",
"actual-value": None,
"expected-value": "42"
}, result["condition"])

es.transform.get_transform_stats.assert_called_once_with(transform_id=transform_id)


class SubmitAsyncSearchTests(TestCase):
@mock.patch("elasticsearch.Elasticsearch")
@run_async
Expand Down