Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk action refresh param #1743

Merged
merged 7 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/track.rst
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,7 @@ Properties
* ``recency`` (optional, defaults to 0): A number between [0,1] indicating whether to bias conflicting ids towards more recent ids (``recency`` towards 1) or whether to consider all ids for id conflicts (``recency`` towards 0). See the diagram below for details.
* ``detailed-results`` (optional, defaults to ``false``): Records more detailed meta-data for bulk requests. As it analyzes the corresponding bulk response in more detail, this might incur additional overhead which can skew measurement results. See the section below for the meta-data that are returned. This property must be set to ``true`` for individual bulk request failures to be logged by Rally.
* ``timeout`` (optional, defaults to ``1m``): Defines the `time period that Elasticsearch will wait per action <https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-query-params>`_ until it has finished processing the following operations: automatic index creation, dynamic mapping updates, waiting for active shards.
* ``refresh`` (optional): Controls Elasticsearch refresh behavior for bulk requests. Valid values are ``async``, ``sync``, and ``default``; Rally maps them to the values ``true``, ``wait_for``, and ``false`` of the bulk API's ``refresh`` query parameter, respectively. If ``async``, Elasticsearch will refresh target shards in the background. If ``sync``, Elasticsearch blocks bulk requests until the refresh is complete. If ``default``, Elasticsearch will use the default refresh behavior. If the property is omitted, no ``refresh`` query parameter is sent.

With multiple ``clients``, Rally will split each document using as many splits as there are ``clients``. This ensures that the bulk index operations are efficiently parallelized but has the drawback that the ingestion is not done in the order of each document. For example, if ``clients`` is set to 2, one client will index the document starting from the beginning, while the other will index starting from the middle.

Expand Down
15 changes: 15 additions & 0 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,9 @@ async def __call__(self, es, params):
* ``timeout``: a time unit value indicating the server-side timeout for the operation
* ``request-timeout``: a non-negative float indicating the client-side timeout for the operation. If not present, defaults to
``None`` and potentially falls back to the global timeout setting.
* ``refresh``: If ``async``, Elasticsearch issues an asynchronous refresh of the index (sent as ``?refresh=true``).
If ``sync``, Elasticsearch issues a synchronous refresh of the index (sent as ``?refresh=wait_for``).
If ``default``, Elasticsearch uses its default refresh behavior (sent as ``?refresh=false``).
"""
detailed_results = params.get("detailed-results", False)
api_kwargs = self._default_kw_params(params)
Expand All @@ -501,6 +504,18 @@ async def __call__(self, es, params):
bulk_params["timeout"] = params["timeout"]
if "pipeline" in params:
bulk_params["pipeline"] = params["pipeline"]
if "refresh" in params:
refresh_params = {"sync": "wait_for", "async": "true", "default": "false"}
b-deam marked this conversation as resolved.
Show resolved Hide resolved
bulk_params["refresh"] = refresh_params.get(params["refresh"])

if bulk_params["refresh"] is None:
self.logger.warning(
"Using default bulk refresh parameter value '%s' in place of unrecognized specfified parameter value '%s'. "
"Use one of %s.",
refresh_params["default"],
params["refresh"],
", ".join(refresh_params.keys()),
)
b-deam marked this conversation as resolved.
Show resolved Hide resolved

with_action_metadata = mandatory(params, "action-metadata-present", self)
bulk_size = mandatory(params, "bulk-size", self)
Expand Down
8 changes: 8 additions & 0 deletions esrally/track/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,7 @@ def __init__(self, track, params, **kwargs):
raise exceptions.InvalidSyntax("'batch-size' must be numeric")

self.ingest_percentage = self.float_param(params, name="ingest-percentage", default_value=100, min_value=0, max_value=100)
self.refresh = params.get("refresh")
self.param_source = PartitionBulkIndexParamSource(
self.corpora,
self.batch_size,
Expand All @@ -644,6 +645,7 @@ def __init__(self, track, params, **kwargs):
self.on_conflict,
self.recency,
self.pipeline,
self.refresh,
self._params,
)

Expand Down Expand Up @@ -705,6 +707,7 @@ def __init__(
on_conflict,
recency,
pipeline=None,
refresh=None,
original_params=None,
):
"""
Expand All @@ -719,6 +722,10 @@ def __init__(
:param recency: A number between [0.0, 1.0] indicating whether to bias generation of conflicting ids towards more recent ones.
May be None.
:param pipeline: The name of the ingest pipeline to run.
:param refresh: Optional values are "async", "sync", "default".
If "async", Elasticsearch refreshes the affected shards in the background.
If "sync", the client is blocked until Elasticsearch finishes the refresh operation.
If "default", Elasticsearch will use the default refresh behavior.
:param original_params: The original dict passed to the parent parameter source.
"""
self.corpora = corpora
Expand All @@ -732,6 +739,7 @@ def __init__(
self.on_conflict = on_conflict
self.recency = recency
self.pipeline = pipeline
self.refresh = refresh
b-deam marked this conversation as resolved.
Show resolved Hide resolved
self.original_params = original_params
# this is only intended for unit-testing
self.create_reader = original_params.pop("__create_reader", create_default_reader)
Expand Down
158 changes: 158 additions & 0 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1243,6 +1243,164 @@ async def test_bulk_index_error_logs_warning_with_detailed_stats_body(self, es):

es.bulk.assert_awaited_with(body=bulk_params["body"], params={})

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_bulk_index_success_with_refresh_default(self, es):
    """``refresh: default`` must reach the bulk API as ``refresh=false``."""
    created_item = {"create": {"_index": "test", "result": "created", "status": 201}}
    es.bulk = mock.AsyncMock(return_value={"errors": False, "took": 8, "items": [created_item]})

    request = {
        "body": _build_bulk_body("index_line"),
        "index": "test",
        "action-metadata-present": False,
        "type": "_doc",
        "bulk-size": 1,
        "unit": "docs",
        "detailed-results": True,
        "refresh": "default",
    }

    # The runner receives a shallow copy; the final assertion reads the body from the original dict.
    outcome = await runner.BulkIndex()(es, dict(request))

    expected_ops = {"create": collections.Counter({"item-count": 1, "created": 1})}
    assert outcome == {
        "took": 8,
        "index": "test",
        "weight": 1,
        "unit": "docs",
        "success": True,
        "success-count": 1,
        "error-count": 0,
        "bulk-request-size-bytes": 10,
        "ops": expected_ops,
        "shards_histogram": [],
        "total-document-size-bytes": 10,
    }

    # "default" is translated to the literal string "false" in the request params.
    es.bulk.assert_awaited_with(doc_type="_doc", index="test", body=request["body"], params={"refresh": "false"})

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_bulk_index_success_with_refresh_async(self, es):
    """``refresh: async`` must reach the bulk API as ``refresh=true``."""
    # The mocked response item carries "forced_refresh"; the expected result below is unaffected by it.
    refreshed_item = {"create": {"_index": "test", "result": "created", "status": 201, "forced_refresh": True}}
    es.bulk = mock.AsyncMock(return_value={"errors": False, "took": 8, "items": [refreshed_item]})

    request = {
        "body": _build_bulk_body("index_line"),
        "index": "test",
        "action-metadata-present": False,
        "type": "_doc",
        "bulk-size": 1,
        "unit": "docs",
        "detailed-results": True,
        "refresh": "async",
    }

    # The runner receives a shallow copy; the final assertion reads the body from the original dict.
    outcome = await runner.BulkIndex()(es, dict(request))

    expected_ops = {"create": collections.Counter({"item-count": 1, "created": 1})}
    assert outcome == {
        "took": 8,
        "index": "test",
        "weight": 1,
        "unit": "docs",
        "success": True,
        "success-count": 1,
        "error-count": 0,
        "bulk-request-size-bytes": 10,
        "ops": expected_ops,
        "shards_histogram": [],
        "total-document-size-bytes": 10,
    }

    # "async" is translated to the literal string "true" in the request params.
    es.bulk.assert_awaited_with(doc_type="_doc", index="test", body=request["body"], params={"refresh": "true"})

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_bulk_index_success_with_refresh_sync(self, es):
    """``refresh: sync`` must reach the bulk API as ``refresh=wait_for``."""
    # No "detailed-results" here: the mocked client returns the response as a JSON byte stream.
    raw_response = json.dumps({"errors": False, "took": 8}).encode()
    es.bulk = mock.AsyncMock(return_value=io.BytesIO(raw_response))

    request = {
        "body": _build_bulk_body("index_line"),
        "index": "test",
        "action-metadata-present": False,
        "type": "_doc",
        "bulk-size": 1,
        "unit": "docs",
        "refresh": "sync",
    }

    # Unlike the sibling refresh tests, the dict itself (not a copy) is handed to the runner.
    outcome = await runner.BulkIndex()(es, request)

    expected = {
        "took": 8,
        "index": "test",
        "weight": 1,
        "unit": "docs",
        "success": True,
        "success-count": 1,
        "error-count": 0,
    }
    assert outcome == expected

    # "sync" is translated to "wait_for" in the request params.
    es.bulk.assert_awaited_with(doc_type="_doc", index="test", body=request["body"], params={"refresh": "wait_for"})

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_bulk_index_success_with_refresh_invalid(self, es):
    """An unrecognized ``refresh`` value still succeeds and is sent as ``refresh=None``."""
    created_item = {"create": {"_index": "test", "result": "created", "status": 201}}
    es.bulk = mock.AsyncMock(return_value={"errors": False, "took": 8, "items": [created_item]})

    request = {
        "body": _build_bulk_body("index_line"),
        "index": "test",
        "action-metadata-present": False,
        "type": "_doc",
        "bulk-size": 1,
        "unit": "docs",
        "detailed-results": True,
        "refresh": "notvalid",
    }

    # The runner receives a shallow copy; the final assertion reads the body from the original dict.
    outcome = await runner.BulkIndex()(es, dict(request))

    expected_ops = {"create": collections.Counter({"item-count": 1, "created": 1})}
    assert outcome == {
        "took": 8,
        "index": "test",
        "weight": 1,
        "unit": "docs",
        "success": True,
        "success-count": 1,
        "error-count": 0,
        "bulk-request-size-bytes": 10,
        "ops": expected_ops,
        "shards_histogram": [],
        "total-document-size-bytes": 10,
    }

    # "notvalid" has no mapping, so the params carry refresh=None rather than a string value.
    es.bulk.assert_awaited_with(doc_type="_doc", index="test", body=request["body"], params={"refresh": None})


class TestForceMergeRunner:
@mock.patch("elasticsearch.Elasticsearch")
Expand Down