Bulk action refresh param #1743

Merged (7 commits) on Jul 10, 2023
Changes from 6 commits
1 change: 1 addition & 0 deletions docs/track.rst
@@ -757,6 +757,7 @@ Properties
* ``recency`` (optional, defaults to 0): A number between [0,1] indicating whether to bias conflicting ids towards more recent ids (``recency`` towards 1) or whether to consider all ids for id conflicts (``recency`` towards 0). See the diagram below for details.
* ``detailed-results`` (optional, defaults to ``false``): Records more detailed meta-data for bulk requests. As it analyzes the corresponding bulk response in more detail, this might incur additional overhead which can skew measurement results. See the section below for the meta-data that are returned. This property must be set to ``true`` for individual bulk request failures to be logged by Rally.
* ``timeout`` (optional, defaults to ``1m``): Defines the `time period that Elasticsearch will wait per action <https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-query-params>`_ until it has finished processing the following operations: automatic index creation, dynamic mapping updates, waiting for active shards.
* ``refresh`` (optional): Controls Elasticsearch's refresh behavior for bulk requests via the ``refresh`` bulk API query parameter. Valid values are ``true``, ``wait_for``, and ``false``, and the value is specified as a string. If ``true``, Elasticsearch will refresh target shards in the background. If ``wait_for``, Elasticsearch blocks the bulk request until the affected shards have been refreshed. If ``false``, Elasticsearch will use the default refresh behavior. See the sketch below.
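
A minimal sketch of what the parameter translates to on the wire, using the Python Elasticsearch client (the cluster address, index name, and documents are illustrative assumptions, not part of the track format)::

    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9200")  # assumed local cluster

    # Two index actions in newline-delimited bulk format.
    body = (
        '{"index": {"_index": "logs-demo"}}\n'
        '{"message": "first"}\n'
        '{"index": {"_index": "logs-demo"}}\n'
        '{"message": "second"}\n'
    )

    # refresh="wait_for" blocks the request until the new documents are searchable;
    # "true" refreshes the affected shards, and "false" (the default) does neither.
    response = es.bulk(body=body, refresh="wait_for")
    print(response["errors"])  # False if every action succeeded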

With multiple ``clients``, Rally splits each document corpus into as many chunks as there are ``clients``. This ensures that the bulk index operations are efficiently parallelized, but has the drawback that documents are not ingested in their original order. For example, if ``clients`` is set to 2, one client indexes documents starting from the beginning of the corpus, while the other starts from the middle.

13 changes: 13 additions & 0 deletions esrally/driver/runner.py
@@ -492,6 +492,9 @@ async def __call__(self, es, params):
* ``timeout``: a time unit value indicating the server-side timeout for the operation
* ``request-timeout``: a non-negative float indicating the client-side timeout for the operation. If not present, defaults to
``None`` and potentially falls back to the global timeout setting.
* ``refresh``: If ``"true"``, Elasticsearch will issue an async refresh to the index; i.e., ``?refresh=true``.
If ``"wait_for"``, Elasticsearch issues a synchronous refresh to the index; i.e., ``?refresh=wait_for``.
If ``"false"``, Elasticsearch will use the default refresh behavior; i.e., ``?refresh=false``.
"""
detailed_results = params.get("detailed-results", False)
api_kwargs = self._default_kw_params(params)
@@ -501,6 +504,16 @@
bulk_params["timeout"] = params["timeout"]
if "pipeline" in params:
bulk_params["pipeline"] = params["pipeline"]
if "refresh" in params:
valid_refresh_values = ("wait_for", "true", "false")
if params["refresh"] in valid_refresh_values:
bulk_params["refresh"] = params["refresh"]
else:
self.logger.info(

Sorry for my last comment, I see now what you're checking for. I think both INFO and WARNING aren't enough in this case, and we should probably just error out because of an invalid value - otherwise we risk returning misleading results and the corresponding log message is buried deep in the file somewhere.

Maybe we can error out similar to how it's done here?:

rally/esrally/driver/runner.py

Lines 2609 to 2612 in b1a822c

if op_type not in self.supported_op_types:
    raise exceptions.RallyAssertionError(
        f"Unsupported operation-type [{op_type}]. Use one of [{', '.join(self.supported_op_types)}]."
    )

We could probably also just lean on the client itself to return an error; if you pass something in that's not valid, does it return anything meaningful?

"Using default bulk refresh parameter value \"false\" in place of unrecognized parameter value '%s'. Use one of %s.",
params["refresh"],
", ".join(valid_refresh_values),
)

with_action_metadata = mandatory(params, "action-metadata-present", self)
bulk_size = mandatory(params, "bulk-size", self)
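
As a follow-up to the review thread above: a minimal sketch of the fail-fast alternative, reusing ``exceptions.RallyAssertionError`` as in the referenced snippet. The helper name ``validated_refresh`` is made up for illustration, and this is a sketch of the suggestion rather than the merged change; the other idea raised (leaning on the client or server to reject the value) would instead surface as a request error.

    from esrally import exceptions

    VALID_REFRESH_VALUES = ("wait_for", "true", "false")


    def validated_refresh(value):
        """Return ``value`` if it is a supported bulk refresh setting, otherwise fail fast."""
        if value not in VALID_REFRESH_VALUES:
            raise exceptions.RallyAssertionError(
                f"Unsupported refresh value [{value}]. Use one of [{', '.join(VALID_REFRESH_VALUES)}]."
            )
        return value


    # e.g. inside BulkIndex.__call__:
    #     if "refresh" in params:
    #         bulk_params["refresh"] = validated_refresh(params["refresh"])
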
8 changes: 8 additions & 0 deletions esrally/track/params.py
@@ -634,6 +634,7 @@ def __init__(self, track, params, **kwargs):
raise exceptions.InvalidSyntax("'batch-size' must be numeric")

self.ingest_percentage = self.float_param(params, name="ingest-percentage", default_value=100, min_value=0, max_value=100)
self.refresh = params.get("refresh")
self.param_source = PartitionBulkIndexParamSource(
self.corpora,
self.batch_size,
@@ -644,6 +645,7 @@
self.on_conflict,
self.recency,
self.pipeline,
self.refresh,
self._params,
)

@@ -705,6 +707,7 @@ def __init__(
on_conflict,
recency,
pipeline=None,
refresh=None,
original_params=None,
):
"""
@@ -719,6 +722,10 @@ def __init__(
:param recency: A number between [0.0, 1.0] indicating whether to bias generation of conflicting ids towards more recent ones.
May be None.
:param pipeline: The name of the ingest pipeline to run.
:param refresh: Optional; valid string values are "true", "wait_for", and "false".
If "true", Elasticsearch refreshes the affected shards in the background.
If "wait_for", the client is blocked until Elasticsearch finishes the refresh operation.
If "false", Elasticsearch will use the default refresh behavior.
:param original_params: The original dict passed to the parent parameter source.
"""
self.corpora = corpora
@@ -732,6 +739,7 @@ def __init__(
self.on_conflict = on_conflict
self.recency = recency
self.pipeline = pipeline
self.refresh = refresh
self.original_params = original_params
# this is only intended for unit-testing
self.create_reader = original_params.pop("__create_reader", create_default_reader)
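
To tie the two files together, a rough end-to-end sketch of how the new parameter travels from a track operation into the bulk query parameters. The dictionary shapes are simplified, and the assumption that each generated bulk's params carry ``refresh`` through to the runner corresponds to code outside the visible hunks:

    # What a track author writes for the bulk operation (simplified).
    operation_params = {"bulk-size": 500, "refresh": "wait_for"}

    # The bulk param source treats it as optional (None when absent).
    refresh = operation_params.get("refresh")

    # Assumed: each generated bulk's params carry the value through to the runner.
    generated = {"action-metadata-present": True}
    if refresh is not None:
        generated["refresh"] = refresh

    # BulkIndex.__call__ forwards only recognized values as bulk API query parameters.
    bulk_query_params = {}
    if generated.get("refresh") in ("wait_for", "true", "false"):
        bulk_query_params["refresh"] = generated["refresh"]

    print(bulk_query_params)  # {'refresh': 'wait_for'}
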
158 changes: 158 additions & 0 deletions tests/driver/runner_test.py
@@ -1243,6 +1243,164 @@ async def test_bulk_index_error_logs_warning_with_detailed_stats_body(self, es):

es.bulk.assert_awaited_with(body=bulk_params["body"], params={})

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_bulk_index_success_with_refresh_default(self, es):
bulk_response = {
"errors": False,
"took": 8,
"items": [{"create": {"_index": "test", "result": "created", "status": 201}}],
}
es.bulk = mock.AsyncMock(return_value=bulk_response)

bulk = runner.BulkIndex()

bulk_params = {
"body": _build_bulk_body("index_line"),
"index": "test",
"action-metadata-present": False,
"type": "_doc",
"bulk-size": 1,
"unit": "docs",
"detailed-results": True,
"refresh": "false",
}

result = await bulk(es, dict(bulk_params))

assert result == {
"took": 8,
"index": "test",
"weight": 1,
"unit": "docs",
"success": True,
"success-count": 1,
"error-count": 0,
"bulk-request-size-bytes": 10,
"ops": {"create": collections.Counter({"item-count": 1, "created": 1})},
"shards_histogram": [],
"total-document-size-bytes": 10,
}

es.bulk.assert_awaited_with(doc_type="_doc", index="test", body=bulk_params["body"], params={"refresh": "false"})

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_bulk_index_success_with_refresh_true(self, es):
bulk_response = {
"errors": False,
"took": 8,
"items": [{"create": {"_index": "test", "result": "created", "status": 201, "forced_refresh": True}}],
}
es.bulk = mock.AsyncMock(return_value=bulk_response)

bulk = runner.BulkIndex()

bulk_params = {
"body": _build_bulk_body("index_line"),
"index": "test",
"action-metadata-present": False,
"type": "_doc",
"bulk-size": 1,
"unit": "docs",
"detailed-results": True,
"refresh": "true",
}

result = await bulk(es, dict(bulk_params))

assert result == {
"took": 8,
"index": "test",
"weight": 1,
"unit": "docs",
"success": True,
"success-count": 1,
"error-count": 0,
"bulk-request-size-bytes": 10,
"ops": {"create": collections.Counter({"item-count": 1, "created": 1})},
"shards_histogram": [],
"total-document-size-bytes": 10,
}

es.bulk.assert_awaited_with(doc_type="_doc", index="test", body=bulk_params["body"], params={"refresh": "true"})

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_bulk_index_success_with_refresh_wait_for(self, es):
bulk_response = {
"errors": False,
"took": 8,
}
es.bulk = mock.AsyncMock(return_value=io.BytesIO(json.dumps(bulk_response).encode()))

bulk = runner.BulkIndex()

bulk_params = {
"body": _build_bulk_body("index_line"),
"index": "test",
"action-metadata-present": False,
"type": "_doc",
"bulk-size": 1,
"unit": "docs",
"refresh": "wait_for",
}

result = await bulk(es, bulk_params)

assert result == {
"took": 8,
"index": "test",
"weight": 1,
"unit": "docs",
"success": True,
"success-count": 1,
"error-count": 0,
}

es.bulk.assert_awaited_with(doc_type="_doc", index="test", body=bulk_params["body"], params={"refresh": "wait_for"})

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_bulk_index_success_with_refresh_invalid(self, es):
bulk_response = {
"errors": False,
"took": 8,
"items": [{"create": {"_index": "test", "result": "created", "status": 201}}],
}
es.bulk = mock.AsyncMock(return_value=bulk_response)

bulk = runner.BulkIndex()

bulk_params = {
"body": _build_bulk_body("index_line"),
"index": "test",
"action-metadata-present": False,
"type": "_doc",
"bulk-size": 1,
"unit": "docs",
"detailed-results": True,
"refresh": "notvalid",
}

result = await bulk(es, dict(bulk_params))

assert result == {
"took": 8,
"index": "test",
"weight": 1,
"unit": "docs",
"success": True,
"success-count": 1,
"error-count": 0,
"bulk-request-size-bytes": 10,
"ops": {"create": collections.Counter({"item-count": 1, "created": 1})},
"shards_histogram": [],
"total-document-size-bytes": 10,
}

es.bulk.assert_awaited_with(doc_type="_doc", index="test", body=bulk_params["body"], params={})


class TestForceMergeRunner:
@mock.patch("elasticsearch.Elasticsearch")