From c3b04f4d97de7c379bd78905d83df8bc71c7d229 Mon Sep 17 00:00:00 2001 From: Brad Deam <54515790+b-deam@users.noreply.github.com> Date: Mon, 3 Jul 2023 10:09:22 +0930 Subject: [PATCH] Refactor version number checks (#1738) Serverless Elasticsearch doesn't return a `version.number` field from the Info API (/) , which is actually something we rely on somewhat commonly throughout the codebase. To fix this, we need a clearly defined way of determining whether or not Rally is talking to a Serverless Elasticsearch, this applies for both clients used to target the system under test, as well as any ancillary clients like those used by a remote metrics store. In order to do so, we take two approaches: - Create a `is_serverless | bool` property on existing ES clients objects - Refactor all version checks to default to selecting the 'minimum' version required for a specific conditional, and only use the `is_serverless` property if we need to do something specific for serverless --- esrally/client/__init__.py | 1 + esrally/client/asynchronous.py | 27 ++++++++++++---- esrally/client/factory.py | 44 ++++++++++++++++++++++--- esrally/client/synchronous.py | 59 ++++++++++++++++++++-------------- esrally/driver/driver.py | 25 +++++++++----- esrally/driver/runner.py | 5 +-- esrally/mechanic/__init__.py | 1 - esrally/mechanic/mechanic.py | 19 +---------- esrally/metrics.py | 29 +++++++++-------- esrally/racecontrol.py | 27 ++++++++++++---- esrally/telemetry.py | 17 ++++++---- esrally/tracker/tracker.py | 19 ++++++----- esrally/utils/versions.py | 6 ++++ tests/client/factory_test.py | 4 +++ tests/driver/runner_test.py | 1 + tests/metrics_test.py | 7 +++- tests/utils/versions_test.py | 13 ++++++++ 17 files changed, 203 insertions(+), 101 deletions(-) diff --git a/esrally/client/__init__.py b/esrally/client/__init__.py index 1404d2592..37fdd40be 100644 --- a/esrally/client/__init__.py +++ b/esrally/client/__init__.py @@ -18,6 +18,7 @@ from .context import RequestContextHolder, RequestContextManager from .factory import ( EsClientFactory, + cluster_distribution_version, create_api_key, delete_api_keys, wait_for_rest_layer, diff --git a/esrally/client/asynchronous.py b/esrally/client/asynchronous.py index bc05354cf..8d82475f4 100644 --- a/esrally/client/asynchronous.py +++ b/esrally/client/asynchronous.py @@ -285,21 +285,30 @@ async def put_lifecycle(self, *args, **kwargs): class RallyAsyncElasticsearch(AsyncElasticsearch, RequestContextHolder): def __init__(self, *args, **kwargs): distribution_version = kwargs.pop("distribution_version", None) + distribution_flavor = kwargs.pop("distribution_flavor", None) super().__init__(*args, **kwargs) # skip verification at this point; we've already verified this earlier with the synchronous client. # The async client is used in the hot code path and we use customized overrides (such as that we don't # parse response bodies in some cases for performance reasons, e.g. when using the bulk API). self._verified_elasticsearch = True - if distribution_version: - self.distribution_version = versions.Version.from_string(distribution_version) - else: - self.distribution_version = None + self.distribution_version = distribution_version + self.distribution_flavor = distribution_flavor # some ILM method signatures changed in 'elasticsearch-py' 8.x, # so we override method(s) here to provide BWC for any custom # runners that aren't using the new kwargs self.ilm = RallyIlmClient(self) + @property + def is_serverless(self): + return versions.is_serverless(self.distribution_flavor) + + def options(self, *args, **kwargs): + new_self = super().options(*args, **kwargs) + new_self.distribution_version = self.distribution_version + new_self.distribution_flavor = self.distribution_flavor + return new_self + async def perform_request( self, method: str, @@ -328,9 +337,13 @@ async def perform_request( # Converts all parts of a Accept/Content-Type headers # from application/X -> application/vnd.elasticsearch+X # see https://github.com/elastic/elasticsearch/issues/51816 - if self.distribution_version is not None and self.distribution_version >= versions.Version.from_string("8.0.0"): - _mimetype_header_to_compat("Accept", request_headers) - _mimetype_header_to_compat("Content-Type", request_headers) + # Not applicable to serverless + if not self.is_serverless: + if versions.is_version_identifier(self.distribution_version) and ( + versions.Version.from_string(self.distribution_version) >= versions.Version.from_string("8.0.0") + ): + _mimetype_header_to_compat("Accept", request_headers) + _mimetype_header_to_compat("Content-Type", request_headers) if params: target = f"{path}?{_quote_query(params)}" diff --git a/esrally/client/factory.py b/esrally/client/factory.py index 116438d44..3fa19010a 100644 --- a/esrally/client/factory.py +++ b/esrally/client/factory.py @@ -31,7 +31,7 @@ class EsClientFactory: compatibility guarantees that are broader than the library's defaults. """ - def __init__(self, hosts, client_options, distribution_version=None): + def __init__(self, hosts, client_options, distribution_version=None, distribution_flavor=None): def host_string(host): # protocol can be set at either host or client opts level protocol = "https" if client_options.get("use_ssl") or host.get("use_ssl") else "http" @@ -41,8 +41,10 @@ def host_string(host): self.client_options = dict(client_options) self.ssl_context = None # This attribute is necessary for the backwards-compatibility logic contained in - # RallySyncElasticsearch.perform_request() and RallyAsyncElasticsearch.perform_request(). + # RallySyncElasticsearch.perform_request() and RallyAsyncElasticsearch.perform_request(), and also for + # identification of whether or not a client is 'serverless'. self.distribution_version = distribution_version + self.distribution_flavor = distribution_flavor self.logger = logging.getLogger(__name__) masked_client_options = dict(client_options) @@ -181,7 +183,11 @@ def create(self): from esrally.client.synchronous import RallySyncElasticsearch return RallySyncElasticsearch( - distribution_version=self.distribution_version, hosts=self.hosts, ssl_context=self.ssl_context, **self.client_options + distribution_version=self.distribution_version, + distribution_flavor=self.distribution_flavor, + hosts=self.hosts, + ssl_context=self.ssl_context, + **self.client_options, ) def create_async(self, api_key=None, client_id=None): @@ -226,6 +232,7 @@ async def on_request_end(session, trace_config_ctx, params): async_client = RallyAsyncElasticsearch( distribution_version=self.distribution_version, + distribution_flavor=self.distribution_flavor, hosts=self.hosts, transport_class=RallyAsyncTransport, ssl_context=self.ssl_context, @@ -316,6 +323,32 @@ def wait_for_rest_layer(es, max_attempts=40): return False +def cluster_distribution_version(hosts, client_options, client_factory=EsClientFactory): + """ + Attempt to get the target cluster's distribution version, build flavor, and build hash by creating and using + a 'sync' Elasticsearch client. + + :param hosts: The host(s) to connect to. + :param client_options: The client options to customize the Elasticsearch client. + :param client_factory: Factory class that creates the Elasticsearch client. + :return: The cluster's build flavor, version number, and build hash. For Serverless Elasticsearch these may all be + the build flavor value. + """ + # no way for us to know whether we're talking to a serverless elasticsearch or not, so we default to the sync client + es = client_factory(hosts, client_options).create() + # unconditionally wait for the REST layer - if it's not up by then, we'll intentionally raise the original error + wait_for_rest_layer(es) + version = es.info()["version"] + + version_build_flavor = version.get("build_flavor", "oss") + # build hash will only be available for serverless if the client has operator privs + version_build_hash = version.get("build_hash", version_build_flavor) + # version number does not exist for serverless + version_number = version.get("number", version_build_flavor) + + return version_build_flavor, version_number, version_build_hash + + def create_api_key(es, client_id, max_attempts=5): """ Creates an API key for the provided ``client_id``. @@ -366,7 +399,8 @@ def raise_exception(failed_ids, cause=None): # Before ES 7.10, deleting API keys by ID had to be done individually. # After ES 7.10, a list of API key IDs can be deleted in one request. - current_version = versions.Version.from_string(es.info()["version"]["number"]) + version = es.info()["version"] + current_version = versions.Version.from_string(version.get("number", "7.10.0")) minimum_version = versions.Version.from_string("7.10.0") deleted = [] @@ -377,7 +411,7 @@ def raise_exception(failed_ids, cause=None): import elasticsearch try: - if current_version >= minimum_version: + if current_version >= minimum_version or es.is_serverless: resp = es.security.invalidate_api_key(ids=remaining) deleted += resp["invalidated_api_keys"] remaining = [i for i in ids if i not in deleted] diff --git a/esrally/client/synchronous.py b/esrally/client/synchronous.py index 486ccb7a2..f42918a6a 100644 --- a/esrally/client/synchronous.py +++ b/esrally/client/synchronous.py @@ -15,7 +15,6 @@ # specific language governing permissions and limitations # under the License. -import re import warnings from typing import Any, Iterable, Mapping, Optional @@ -76,24 +75,25 @@ def raise_error(cls, state, meta, body): @classmethod def check_product(cls, headers, response): # type: (dict[str, str], dict[str, str]) -> int - """Verifies that the server we're talking to is Elasticsearch. + """ + Verifies that the server we're talking to is Elasticsearch. Does this by checking HTTP headers and the deserialized response to the 'info' API. Returns one of the states above. """ + + version = response.get("version", {}) try: - version = response.get("version", {}) - version_number = tuple( - int(x) if x is not None else 999 for x in re.search(r"^([0-9]+)\.([0-9]+)(?:\.([0-9]+))?", version["number"]).groups() - ) - except (KeyError, TypeError, ValueError, AttributeError): - # No valid 'version.number' field, effectively 0.0.0 - version = {} - version_number = (0, 0, 0) + version_number = versions.Version.from_string(version.get("number", None)) + except TypeError: + # No valid 'version.number' field, either Serverless Elasticsearch, or not Elasticsearch at all + version_number = versions.Version.from_string("0.0.0") + + build_flavor = version.get("build_flavor", None) # Check all of the fields and headers for missing/valid values. try: bad_tagline = response.get("tagline", None) != "You Know, for Search" - bad_build_flavor = version.get("build_flavor", None) != "default" + bad_build_flavor = build_flavor not in ("default", "serverless") bad_product_header = headers.get("x-elastic-product", None) != "Elasticsearch" except (AttributeError, TypeError): bad_tagline = True @@ -101,19 +101,19 @@ def check_product(cls, headers, response): bad_product_header = True # 7.0-7.13 and there's a bad 'tagline' or unsupported 'build_flavor' - if (7, 0, 0) <= version_number < (7, 14, 0): + if versions.Version.from_string("7.0.0") <= version_number < versions.Version.from_string("7.14.0"): if bad_tagline: return cls.UNSUPPORTED_PRODUCT elif bad_build_flavor: return cls.UNSUPPORTED_DISTRIBUTION elif ( - # No version or version less than 6.x - version_number < (6, 0, 0) - # 6.x and there's a bad 'tagline' - or ((6, 0, 0) <= version_number < (7, 0, 0) and bad_tagline) + # No version or version less than 6.8.0, and we're not talking to a serverless elasticsearch + (version_number < versions.Version.from_string("6.8.0") and not versions.is_serverless(build_flavor)) + # 6.8.0 and there's a bad 'tagline' + or (versions.Version.from_string("6.8.0") <= version_number < versions.Version.from_string("7.0.0") and bad_tagline) # 7.14+ and there's a bad 'X-Elastic-Product' HTTP header - or ((7, 14, 0) <= version_number and bad_product_header) + or (versions.Version.from_string("7.14.0") <= version_number and bad_product_header) ): return cls.UNSUPPORTED_PRODUCT @@ -123,13 +123,21 @@ def check_product(cls, headers, response): class RallySyncElasticsearch(Elasticsearch): def __init__(self, *args, **kwargs): distribution_version = kwargs.pop("distribution_version", None) + distribution_flavor = kwargs.pop("distribution_flavor", None) super().__init__(*args, **kwargs) self._verified_elasticsearch = None + self.distribution_version = distribution_version + self.distribution_flavor = distribution_flavor - if distribution_version: - self.distribution_version = versions.Version.from_string(distribution_version) - else: - self.distribution_version = None + @property + def is_serverless(self): + return versions.is_serverless(self.distribution_flavor) + + def options(self, *args, **kwargs): + new_self = super().options(*args, **kwargs) + new_self.distribution_version = self.distribution_version + new_self.distribution_flavor = self.distribution_flavor + return new_self def perform_request( self, @@ -172,9 +180,12 @@ def perform_request( # Converts all parts of a Accept/Content-Type headers # from application/X -> application/vnd.elasticsearch+X # see https://github.com/elastic/elasticsearch/issues/51816 - if self.distribution_version is not None and self.distribution_version >= versions.Version.from_string("8.0.0"): - _mimetype_header_to_compat("Accept", request_headers) - _mimetype_header_to_compat("Content-Type", request_headers) + if not self.is_serverless: + if versions.is_version_identifier(self.distribution_version) and ( + versions.Version.from_string(self.distribution_version) >= versions.Version.from_string("8.0.0") + ): + _mimetype_header_to_compat("Accept", headers) + _mimetype_header_to_compat("Content-Type", headers) if params: target = f"{path}?{_quote_query(params)}" diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index ab69e00ad..2a64f42c6 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -349,17 +349,21 @@ def _create_track_preparator(self, host): def _after_track_prepared(self): cluster_version = self.cluster_details["version"] if self.cluster_details else {} + # manually compiled versions don't expose build_flavor but Rally expects a value in telemetry devices + # we should default to trial/basic, but let's default to oss for now to avoid breaking the charts + build_flavor = cluster_version.get("build_flavor", "oss") + build_version = cluster_version.get("number", build_flavor) + build_hash = cluster_version.get("build_hash", build_flavor) + for child in self.children: self.send(child, thespian.actors.ActorExitRequest()) self.children = [] self.send( self.start_sender, PreparationComplete( - # manually compiled versions don't expose build_flavor but Rally expects a value in telemetry devices - # we should default to trial/basic, but let's default to oss for now to avoid breaking the charts - cluster_version.get("build_flavor", "oss"), - cluster_version.get("number"), - cluster_version.get("build_hash"), + build_flavor, + build_version, + build_hash, ), ) @@ -599,6 +603,7 @@ def __init__(self, target, config, es_client_factory_class=client.EsClientFactor def create_es_clients(self): all_hosts = self.config.opts("client", "hosts").all_hosts distribution_version = self.config.opts("mechanic", "distribution.version", mandatory=False) + distribution_flavor = self.config.opts("mechanic", "distribution.flavor", mandatory=False) es = {} for cluster_name, cluster_hosts in all_hosts.items(): all_client_options = self.config.opts("client", "options").all_client_options @@ -606,7 +611,7 @@ def create_es_clients(self): # Use retries to avoid aborts on long living connections for telemetry devices cluster_client_options["retry_on_timeout"] = True es[cluster_name] = self.es_client_factory( - cluster_hosts, cluster_client_options, distribution_version=distribution_version + cluster_hosts, cluster_client_options, distribution_version=distribution_version, distribution_flavor=distribution_flavor ).create() return es @@ -1729,13 +1734,16 @@ def _logging_exception_handler(self, loop, context): self.logger.error("Uncaught exception in event loop: %s", context) async def run(self): - def es_clients(client_id, all_hosts, all_client_options, distribution_version): + def es_clients(client_id, all_hosts, all_client_options, distribution_version, distribution_flavor): es = {} context = self.client_contexts.get(client_id) api_key = context.api_key for cluster_name, cluster_hosts in all_hosts.items(): es[cluster_name] = client.EsClientFactory( - cluster_hosts, all_client_options[cluster_name], distribution_version=distribution_version + cluster_hosts, + all_client_options[cluster_name], + distribution_version=distribution_version, + distribution_flavor=distribution_flavor, ).create_async(api_key=api_key, client_id=client_id) return es @@ -1758,6 +1766,7 @@ def es_clients(client_id, all_hosts, all_client_options, distribution_version): self.cfg.opts("client", "hosts").all_hosts, self.cfg.opts("client", "options"), self.cfg.opts("mechanic", "distribution.version", mandatory=False), + self.cfg.opts("mechanic", "distribution.flavor", mandatory=False), ) clients.append(es) async_executor = AsyncExecutor( diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 0664506a2..7c03e6b95 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -2094,12 +2094,13 @@ async def __call__(self, es, params): repository = mandatory(params, "repository", repr(self)) wait_period = params.get("completion-recheck-wait-period", 1) es_info = await es.info() - es_version = Version.from_string(es_info["version"]["number"]) + es_version = es_info["version"].get("number", "8.3.0") + request_args = {"repository": repository, "snapshot": "_current", "verbose": False} # significantly reduce response size when lots of snapshots have been taken # only available since ES 8.3.0 (https://github.com/elastic/elasticsearch/pull/86269) - if (es_version.major, es_version.minor) >= (8, 3): + if (Version.from_string(es_version) >= Version.from_string("8.3.0")) or es.is_serverless: request_args["index_names"] = False while True: diff --git a/esrally/mechanic/__init__.py b/esrally/mechanic/__init__.py index 37a6f63cb..e713034d0 100644 --- a/esrally/mechanic/__init__.py +++ b/esrally/mechanic/__init__.py @@ -24,7 +24,6 @@ StartEngine, StopEngine, build, - cluster_distribution_version, download, install, start, diff --git a/esrally/mechanic/mechanic.py b/esrally/mechanic/mechanic.py index 8d41dd4cf..4ee35923b 100644 --- a/esrally/mechanic/mechanic.py +++ b/esrally/mechanic/mechanic.py @@ -26,7 +26,7 @@ import thespian.actors -from esrally import PROGRAM_NAME, actor, client, config, exceptions, metrics, paths +from esrally import PROGRAM_NAME, actor, config, exceptions, metrics, paths from esrally.mechanic import launcher, provisioner, supplier, team from esrally.utils import console, net @@ -271,23 +271,6 @@ class NodesStopped: pass -def cluster_distribution_version(cfg, client_factory=client.EsClientFactory): - """ - Attempt to get the cluster's distribution version even before it is actually started (which makes only sense for externally - provisioned clusters). - - :param cfg: The current config object. - :param client_factory: Factory class that creates the Elasticsearch client. - :return: The distribution version. - """ - hosts = cfg.opts("client", "hosts").default - client_options = cfg.opts("client", "options").default - es = client_factory(hosts, client_options).create() - # unconditionally wait for the REST layer - if it's not up by then, we'll intentionally raise the original error - client.wait_for_rest_layer(es) - return es.info()["version"]["number"] - - def to_ip_port(hosts): ip_port_pairs = [] for host in hosts: diff --git a/esrally/metrics.py b/esrally/metrics.py index d36ef8225..f447c8789 100644 --- a/esrally/metrics.py +++ b/esrally/metrics.py @@ -48,16 +48,6 @@ def __init__(self, client, cluster_version=None): self._cluster_version = cluster_version self.retryable_status_codes = [502, 503, 504, 429] - # TODO #1335: Use version-specific support for metrics stores after 7.8.0. - def probe_version(self): - info = self.guarded(self._client.info) - try: - self._cluster_version = versions.components(info["version"]["number"]) - except BaseException: - msg = "Could not determine version of metrics cluster" - self.logger.exception(msg) - raise exceptions.RallyError(msg) - def put_template(self, name, template): tmpl = json.loads(template) return self.guarded(self._client.indices.put_template, name=name, **tmpl) @@ -238,8 +228,11 @@ def __init__(self, cfg): self._config = cfg host = self._config.opts("reporting", "datastore.host") port = self._config.opts("reporting", "datastore.port") + hosts = [{"host": host, "port": port}] secure = convert.to_bool(self._config.opts("reporting", "datastore.secure")) user = self._config.opts("reporting", "datastore.user") + distribution_version = None + distribution_flavor = None try: password = os.environ["RALLY_REPORTING_DATASTORE_PASSWORD"] except KeyError: @@ -266,13 +259,21 @@ def __init__(self, cfg): client_options["basic_auth_user"] = user client_options["basic_auth_password"] = password - factory = client.EsClientFactory(hosts=[{"host": host, "port": port}], client_options=client_options) + # TODO #1335: Use version-specific support for metrics stores after 7.8.0. + if self.probe_version: + distribution_flavor, distribution_version, _ = client.cluster_distribution_version(hosts=hosts, client_options=client_options) + self._cluster_version = distribution_version + + factory = client.EsClientFactory( + hosts=hosts, + client_options=client_options, + distribution_version=distribution_version, + distribution_flavor=distribution_flavor, + ) self._client = factory.create() def create(self): c = EsClient(self._client) - if self.probe_version: - c.probe_version() return c @@ -1509,7 +1510,7 @@ def to_result_dicts(self): # allow to logically delete records, e.g. for UI purposes when we only want to show the latest result "active": True, } - if self.distribution_version: + if versions.is_version_identifier(self.distribution_version): result_template["distribution-major-version"] = versions.major_version(self.distribution_version) if self.team_revision: result_template["team-revision"] = self.team_revision diff --git a/esrally/racecontrol.py b/esrally/racecontrol.py index 87d1181b1..8f0b04889 100644 --- a/esrally/racecontrol.py +++ b/esrally/racecontrol.py @@ -26,6 +26,7 @@ from esrally import ( PROGRAM_NAME, actor, + client, config, doc_link, driver, @@ -183,13 +184,27 @@ def setup(self, sources=False): # but there are rare cases (external pipeline and user did not specify the distribution version) where we need # to derive it ourselves. For source builds we always assume "main" if not sources and not self.cfg.exists("mechanic", "distribution.version"): - distribution_version = mechanic.cluster_distribution_version(self.cfg) - self.logger.info("Automatically derived distribution version [%s]", distribution_version) + hosts = self.cfg.opts("client", "hosts").default + client_options = self.cfg.opts("client", "options").default + distribution_flavor, distribution_version, distribution_build_hash = client.factory.cluster_distribution_version( + hosts, client_options + ) + + self.logger.info( + "Automatically derived distribution flavor [%s], version [%s], and build hash [%s]", + distribution_flavor, + distribution_version, + distribution_build_hash, + ) self.cfg.add(config.Scope.benchmark, "mechanic", "distribution.version", distribution_version) - min_es_version = versions.Version.from_string(version.minimum_es_version()) - specified_version = versions.Version.from_string(distribution_version) - if specified_version < min_es_version: - raise exceptions.SystemSetupError(f"Cluster version must be at least [{min_es_version}] but was [{distribution_version}]") + self.cfg.add(config.Scope.benchmark, "mechanic", "distribution.flavor", distribution_flavor) + if not versions.is_serverless(distribution_flavor): + min_es_version = versions.Version.from_string(version.minimum_es_version()) + specified_version = versions.Version.from_string(distribution_version) + if specified_version < min_es_version: + raise exceptions.SystemSetupError( + f"Cluster version must be at least [{min_es_version}] but was [{distribution_version}]" + ) self.current_track = track.load_track(self.cfg, install_dependencies=True) self.track_revision = self.cfg.opts("track", "repository.revision", mandatory=False) diff --git a/esrally/telemetry.py b/esrally/telemetry.py index 737375741..86ba574d4 100644 --- a/esrally/telemetry.py +++ b/esrally/telemetry.py @@ -757,8 +757,9 @@ def __init__(self, telemetry_params, clients, metrics_store): def on_benchmark_start(self): default_client = self.clients["default"] - distribution_version = default_client.info()["version"]["number"] - if Version.from_string(distribution_version) < Version(major=7, minor=2, patch=0): + es_info = default_client.info() + es_version = es_info["version"].get("number", "7.2.0") + if Version.from_string(es_version) < Version(major=7, minor=2, patch=0): console.warn(NodeStats.warning, logger=self.logger) for cluster_name in self.specified_cluster_names: @@ -1350,9 +1351,9 @@ def __init__(self, telemetry_params, clients, metrics_store): def on_benchmark_start(self): for cluster_name in self.specified_cluster_names: recorder = DataStreamStatsRecorder(cluster_name, self.clients[cluster_name], self.metrics_store, self.sample_interval) - client_info = self.clients[cluster_name].info() - distribution_version = client_info["version"]["number"] - distribution_flavor = client_info["version"].get("build_flavor", "oss") + es_info = self.clients[cluster_name].info() + distribution_version = es_info["version"].get("number", "7.9.0") + distribution_flavor = es_info["version"].get("build_flavor", "oss") if Version.from_string(distribution_version) < Version(major=7, minor=9, patch=0): raise exceptions.SystemSetupError( "The data-stream-stats telemetry device can only be used with clusters from version 7.9 onwards" @@ -1772,9 +1773,11 @@ def on_benchmark_start(self): except BaseException: self.logger.exception("Could not retrieve cluster version info") return - revision = client_info["version"]["build_hash"] - distribution_version = client_info["version"]["number"] distribution_flavor = client_info["version"].get("build_flavor", "oss") + # build hash will only be available on serverless if the client has operator privs + revision = client_info["version"].get("build_hash", distribution_flavor) + # build version does not exist for serverless + distribution_version = client_info["version"].get("number", distribution_flavor) self.metrics_store.add_meta_info(metrics.MetaInfoScope.cluster, None, "source_revision", revision) self.metrics_store.add_meta_info(metrics.MetaInfoScope.cluster, None, "distribution_version", distribution_version) self.metrics_store.add_meta_info(metrics.MetaInfoScope.cluster, None, "distribution_flavor", distribution_flavor) diff --git a/esrally/tracker/tracker.py b/esrally/tracker/tracker.py index cc0a30db5..31249130f 100644 --- a/esrally/tracker/tracker.py +++ b/esrally/tracker/tracker.py @@ -22,9 +22,9 @@ from jinja2 import Environment, FileSystemLoader from esrally import PROGRAM_NAME -from esrally.client import EsClientFactory +from esrally.client import factory from esrally.tracker import corpus, index -from esrally.utils import console, io, opts +from esrally.utils import console, io def process_template(templates_path, template_filename, template_vars, output_path): @@ -74,16 +74,19 @@ def create_track(cfg): track_name = cfg.opts("track", "track.name") indices = cfg.opts("generator", "indices") root_path = cfg.opts("generator", "output.path") - target_hosts = cfg.opts("client", "hosts") - client_options = cfg.opts("client", "options") + target_hosts = cfg.opts("client", "hosts").default + client_options = cfg.opts("client", "options").default data_streams = cfg.opts("generator", "data_streams") - client = EsClientFactory( - hosts=target_hosts.all_hosts[opts.TargetHosts.DEFAULT], client_options=client_options.all_client_options[opts.TargetHosts.DEFAULT] + distribution_flavor, distribution_version, _ = factory.cluster_distribution_version(target_hosts, client_options) + client = factory.EsClientFactory( + hosts=target_hosts, + client_options=client_options, + distribution_version=distribution_version, + distribution_flavor=distribution_flavor, ).create() - info = client.info() - console.info(f"Connected to Elasticsearch cluster [{info['name']}] version [{info['version']['number']}].\n", logger=logger) + console.info(f"Connected to Elasticsearch cluster version [{distribution_version}] flavor [{distribution_flavor}] \n", logger=logger) output_path = os.path.abspath(os.path.join(io.normalize_path(root_path), track_name)) io.ensure_dir(output_path) diff --git a/esrally/utils/versions.py b/esrally/utils/versions.py index ac7ded777..00f02b5df 100644 --- a/esrally/utils/versions.py +++ b/esrally/utils/versions.py @@ -33,6 +33,10 @@ def is_version_identifier(text, strict=True): return text is not None and _versions_pattern(strict).match(text) is not None +def is_serverless(text): + return text == "serverless" + + def major_version(version): """ Determines the major version of a given version string. @@ -180,6 +184,8 @@ def best_match(available_alternatives, distribution_version): major, _, _, _ = components(distribution_version) if major > _latest_major(available_alternatives): return "master" + elif is_serverless(distribution_version): + return "master" elif not distribution_version: return "master" return None diff --git a/tests/client/factory_test.py b/tests/client/factory_test.py index a2c9d4abe..284926a28 100644 --- a/tests/client/factory_test.py +++ b/tests/client/factory_test.py @@ -342,6 +342,7 @@ def test_create_async_client_with_api_key_auth_override(self, es): es.assert_called_once_with( distribution_version=None, + distribution_flavor=None, hosts=["https://localhost:9200"], transport_class=RallyAsyncTransport, ssl_context=f.ssl_context, @@ -566,6 +567,7 @@ def test_retries_api_key_creation_on_transport_errors(self, es, sleep): @mock.patch("elasticsearch.Elasticsearch") def test_successfully_deletes_api_keys(self, es, version): ids = ["foo", "bar", "baz"] + es.is_serverless = False es.info.return_value = {"version": {"number": version}} if version == "7.9.0": es.security.invalidate_api_key.return_value = [ @@ -590,6 +592,7 @@ def test_successfully_deletes_api_keys(self, es, version): @mock.patch("elasticsearch.Elasticsearch") def test_retries_api_keys_deletion_on_transport_errors(self, es, sleep, version): max_attempts = 5 + es.is_serverless = False es.info.return_value = {"version": {"number": version}} ids = ["foo", "bar", "baz"] if version == "7.9.0": @@ -625,6 +628,7 @@ def test_retries_api_keys_deletion_on_transport_errors(self, es, sleep, version) @pytest.mark.parametrize("version", ["7.9.0", "7.10.0"]) @mock.patch("elasticsearch.Elasticsearch") def test_raises_exception_when_api_key_deletion_fails(self, es, version): + es.is_serverless = False es.info.return_value = {"version": {"number": version}} ids = ["foo", "bar", "baz", "qux"] failed_to_delete = ["baz", "qux"] diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 065fd79dc..3e73be5f5 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -4107,6 +4107,7 @@ class TestWaitForCurrentSnapshotsCreate: @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_wait_for_current_snapshots_create_before_8_3_0(self, es): + es.is_serverless = False es.info = mock.AsyncMock( return_value={ "name": "es01", diff --git a/tests/metrics_test.py b/tests/metrics_test.py index cf7dbcc93..225caf4b6 100644 --- a/tests/metrics_test.py +++ b/tests/metrics_test.py @@ -123,11 +123,13 @@ def test_config_opts_parsing(self, client_esclientfactory, password_configuratio _datastore_user = "".join([random.choice(string.ascii_letters) for _ in range(8)]) _datastore_password = "".join([random.choice(string.ascii_letters + string.digits + "_-@#$/") for _ in range(12)]) _datastore_verify_certs = random.choice([True, False]) + _datastore_probe_version = False cfg.add(config.Scope.applicationOverride, "reporting", "datastore.host", _datastore_host) cfg.add(config.Scope.applicationOverride, "reporting", "datastore.port", _datastore_port) cfg.add(config.Scope.applicationOverride, "reporting", "datastore.secure", _datastore_secure) cfg.add(config.Scope.applicationOverride, "reporting", "datastore.user", _datastore_user) + cfg.add(config.Scope.applicationOverride, "reporting", "datastore.probe.cluster_version", _datastore_probe_version) if password_configuration == "config": cfg.add(config.Scope.applicationOverride, "reporting", "datastore.password", _datastore_password) @@ -158,7 +160,10 @@ def test_config_opts_parsing(self, client_esclientfactory, password_configuratio } client_esclientfactory.assert_called_with( - hosts=[{"host": _datastore_host, "port": _datastore_port}], client_options=expected_client_options + hosts=[{"host": _datastore_host, "port": _datastore_port}], + client_options=expected_client_options, + distribution_version=None, + distribution_flavor=None, ) @mock.patch("random.random") diff --git a/tests/utils/versions_test.py b/tests/utils/versions_test.py index 316d7faf0..845ceb311 100644 --- a/tests/utils/versions_test.py +++ b/tests/utils/versions_test.py @@ -26,6 +26,7 @@ class TestsVersions: def test_is_version_identifier(self): + assert versions.is_version_identifier("serverless") is False assert versions.is_version_identifier(None) is False assert versions.is_version_identifier("") is False assert versions.is_version_identifier(" \t ") is False @@ -44,6 +45,18 @@ def test_is_version_identifier(self): assert versions.is_version_identifier("23", strict=False) assert versions.is_version_identifier("20.3.7-SNAPSHOT", strict=False) + def test_is_serverless(self): + assert versions.is_serverless("serverless") + assert versions.is_serverless("default") is False + assert versions.is_serverless(None) is False + assert versions.is_serverless("") is False + assert versions.is_serverless(" \t ") is False + assert versions.is_serverless("8-ab-c") is False + assert versions.is_serverless("8.9.0") is False + assert versions.is_serverless("8.1") is False + assert versions.is_serverless("8") is False + assert versions.is_serverless("20.3.7-SNAPSHOT") is False + def test_finds_components_for_valid_version(self): assert versions.components("5.0.3") == (5, 0, 3, None) assert versions.components("7.12.1-SNAPSHOT") == (7, 12, 1, "SNAPSHOT")