Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add transform-stats operation type #1288

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions docs/track.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2326,6 +2326,41 @@ Meta-data

This operation returns no meta-data.

transform-stats
~~~~~~~~~~~
hendrikmuhs marked this conversation as resolved.
Show resolved Hide resolved

With the operation type ``transform-stats`` you can call the `transform stats API https://www.elastic.co/guide/en/elasticsearch/reference/current/get-transform-stats.html`_.
hendrikmuhs marked this conversation as resolved.
Show resolved Hide resolved

Properties
""""""""""

* ``transform_id`` (mandatory): The id of the transform.
hendrikmuhs marked this conversation as resolved.
Show resolved Hide resolved
* ``condition`` (optional, defaults to no condition): A structured object with the properties ``path`` and ``expected-value``. If the actual value returned by transform stats API is equal to the expected value at the provided path, this operation will return successfully. See below for an example how this can be used.

In the following example the ``transform-stats`` operation will wait until all data has been processed::

{
"operation-type": "transform-stats",
"transform-id": "mytransform",
"condition": {
"path": "checkpointing.operations_behind",
"expected-value": null
},
"retry-until-success": true
}

Throughput will be reported as number of completed `transform-stats` operations per second.
hendrikmuhs marked this conversation as resolved.
Show resolved Hide resolved

This operation is :ref:`retryable <track_operations>`.

Meta-data
"""""""""

* ``weight``: Always 1.
* ``unit``: Always "ops".
* ``success``: A boolean indicating whether the operation has succeeded.
* ``transform-stats``: The stats for the given transform id.
b-deam marked this conversation as resolved.
Show resolved Hide resolved

composite
~~~~~~~~~

Expand Down
53 changes: 53 additions & 0 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def register_default_runners():
register_runner(track.OperationType.StartTransform, Retry(StartTransform()), async_runner=True)
register_runner(track.OperationType.WaitForTransform, Retry(WaitForTransform()), async_runner=True)
register_runner(track.OperationType.DeleteTransform, Retry(DeleteTransform()), async_runner=True)
register_runner(track.OperationType.TransformStats, Retry(TransformStats()), async_runner=True)


def runner_for(operation_type):
Expand Down Expand Up @@ -2116,6 +2117,58 @@ def __repr__(self, *args, **kwargs):
return "delete-transform"


class TransformStats(Runner):
"""
Gather index stats for one or all transforms.
"""

def _get(self, v, path):
if v is None:
return None
elif len(path) == 1:
return v.get(path[0])
else:
return self._get(v.get(path[0]), path[1:])

def _safe_string(self, v):
return str(v) if v is not None else None

async def __call__(self, es, params):
api_kwargs = self._default_kw_params(params)
transform_id = mandatory(params, "transform-id", self)
condition = params.get("condition")
response = await es.transform.get_transform_stats(transform_id=transform_id, **api_kwargs)
transforms = response.get("transforms", [])
transform_stats = transforms[0] if len(transforms) > 0 else {}
if condition:
path = mandatory(condition, "path", repr(self))
expected_value = mandatory(condition, "expected-value", repr(self))
actual_value = self._get(transform_stats, path.split("."))
return {
"weight": 1,
"unit": "ops",
"condition": {
"path": path,
# avoid mapping issues in the ES metrics store by always rendering values as strings
"actual-value": self._safe_string(actual_value),
"expected-value": self._safe_string(expected_value)
},
# currently we only support "==" as a predicate but that might change in the future
"success": actual_value == expected_value,
"transform-stats": transform_stats
b-deam marked this conversation as resolved.
Show resolved Hide resolved
}
else:
return {
"weight": 1,
"unit": "ops",
"success": True,
"transform-stats": transform_stats
}

def __repr__(self, *args, **kwargs):
return "transform-stats"


class SubmitAsyncSearch(Runner):
async def __call__(self, es, params):
request_params = params.get("request-params", {})
Expand Down
3 changes: 3 additions & 0 deletions esrally/track/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ class OperationType(Enum):
DeleteComposableTemplate = 1031
CreateComponentTemplate = 1032
DeleteComponentTemplate = 1033
TransformStats = 1034

@property
def admin_op(self):
Expand Down Expand Up @@ -704,6 +705,8 @@ def from_hyphenated_string(cls, v):
return OperationType.WaitForTransform
elif v == "delete-transform":
return OperationType.DeleteTransform
elif v == "transform-stats":
return OperationType.TransformStats
elif v == "create-data-stream":
return OperationType.CreateDataStream
elif v == "delete-data-stream":
Expand Down
136 changes: 136 additions & 0 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4664,6 +4664,142 @@ async def test_delete_transform(self, es):
ignore=[404])


class TransformStatsRunnerTests(TestCase):
@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_transform_stats_with_timeout_and_headers(self, es):
es.transform.get_transform_stats.return_value = as_future({})
transform_stats = runner.TransformStats()
transform_id = "a-transform"
result = await transform_stats(es, params={"transform-id": transform_id,
"request-timeout": 3.0,
"headers": {"header1": "value1"},
"opaque-id": "test-id1"})
self.assertEqual(1, result["weight"])
self.assertEqual("ops", result["unit"])
self.assertTrue(result["success"])

es.transform.get_transform_stats.assert_called_once_with(transform_id=transform_id,
headers={"header1": "value1"},
opaque_id="test-id1",
request_timeout=3.0)

@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_transform_stats_with_failed_condition(self, es):
transform_id = "a-transform"
es.transform.get_transform_stats.return_value = as_future({
"count": 3,
"transforms": [
{
"id": transform_id,
"state": "started",
"stats": {},
"checkpointing": {
"last": {},
"operations_behind": 10000
}
}
]
})

transform_stats = runner.TransformStats()

result = await transform_stats(es, params={
"transform-id": transform_id,
"condition": {
"path": "checkpointing.operations_behind",
"expected-value": None
}
})
self.assertEqual(1, result["weight"])
self.assertEqual("ops", result["unit"])
self.assertFalse(result["success"])
self.assertDictEqual({
"path": "checkpointing.operations_behind",
"actual-value": "10000",
"expected-value": None
}, result["condition"])

es.transform.get_transform_stats.assert_called_once_with(transform_id=transform_id)

@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_transform_stats_with_successful_condition(self, es):
transform_id = "a-transform"
es.transform.get_transform_stats.return_value = as_future({
"count": 3,
"transforms": [
{
"id": transform_id,
"state": "started",
"stats": {},
"checkpointing": {
"last": {}
}
}
]
})

transform_stats = runner.TransformStats()

result = await transform_stats(es, params={
"transform-id": transform_id,
"condition": {
"path": "checkpointing.operations_behind",
"expected-value": None
}
})
self.assertEqual(1, result["weight"])
self.assertEqual("ops", result["unit"])
self.assertTrue(result["success"])
self.assertDictEqual({
"path": "checkpointing.operations_behind",
"actual-value": None,
"expected-value": None
}, result["condition"])

es.transform.get_transform_stats.assert_called_once_with(transform_id=transform_id)

@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_transform_stats_with_non_existing_path(self, es):
transform_id = "a-transform"
es.transform.get_transform_stats.return_value = as_future({
"count": 3,
"transforms": [
{
"id": transform_id,
"state": "started",
"stats": {},
"checkpointing": {
"last": {}
}
}
]
})

transform_stats = runner.TransformStats()

result = await transform_stats(es, params={
"transform-id": transform_id,
"condition": {
"path": "checkpointing.last.checkpoint",
"expected-value": 42
}
})
self.assertEqual(1, result["weight"])
self.assertEqual("ops", result["unit"])
self.assertFalse(result["success"])
self.assertDictEqual({
"path": "checkpointing.last.checkpoint",
"actual-value": None,
"expected-value": "42"
}, result["condition"])

es.transform.get_transform_stats.assert_called_once_with(transform_id=transform_id)


class SubmitAsyncSearchTests(TestCase):
@mock.patch("elasticsearch.Elasticsearch")
@run_async
Expand Down