-
Notifications
You must be signed in to change notification settings - Fork 313
/
client.py
316 lines (262 loc) · 13.9 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import contextvars
import logging
import time
import certifi
import urllib3
from esrally import doc_link, exceptions
from esrally.utils import console, convert
class RequestContextManager:
    """
    Async context manager that scopes a request context and supports nesting.

    Entering opens a fresh request context on the holder; leaving restores the previous one. When the
    context that is being left was nested inside another one, its request start and request end are
    handed to the enclosing context so that the enclosing context still covers the total time.
    """

    def __init__(self, request_context_holder):
        # object providing init_request_context / restore_context / update_request_start / update_request_end
        self.ctx_holder = request_context_holder
        # both are populated on entry
        self.ctx = None
        self.token = None

    async def __aenter__(self):
        fresh_ctx, reset_token = self.ctx_holder.init_request_context()
        self.ctx = fresh_ctx
        self.token = reset_token
        return self

    @property
    def request_start(self):
        """Request start recorded in this context (raises ``KeyError`` if none has been recorded)."""
        current = self.ctx
        return current["request_start"]

    @property
    def request_end(self):
        """Request end recorded in this context (raises ``KeyError`` if none has been recorded)."""
        current = self.ctx
        return current["request_end"]

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # read the timings first; they are handed to the parent context after the restore below
        started, ended = self.request_start, self.request_end
        holder, token = self.ctx_holder, self.token
        holder.restore_context(token)
        # a top-level context has no parent, hence there is nothing to propagate to
        has_parent = token.old_value != contextvars.Token.MISSING
        if has_parent:
            holder.update_request_start(started)
            holder.update_request_end(ended)
        self.token = None
        # never swallow exceptions raised inside the managed block
        return False
class RequestContextHolder:
    """
    Keeps the variables of the current request context. Only meant to be used in combination with
    RequestContextManager.
    """

    # context variable holding the dict of the currently active request context
    request_context = contextvars.ContextVar("rally_request_context")

    def new_request_context(self):
        """Returns a context manager that opens a new (possibly nested) request context on this holder."""
        return RequestContextManager(self)

    @classmethod
    def init_request_context(cls):
        """Installs an empty request context and returns it along with the token needed to undo that."""
        fresh_ctx = dict()
        return fresh_ctx, cls.request_context.set(fresh_ctx)

    @classmethod
    def restore_context(cls, token):
        """Reinstates the request context that was active before ``token`` was handed out."""
        cls.request_context.reset(token)

    @classmethod
    def update_request_start(cls, new_request_start):
        """Records the request start unless the active context already has one."""
        # only the first start counts: one logical request may result in multiple requests on the wire
        # (e.g. scrolls)
        cls.request_context.get().setdefault("request_start", new_request_start)

    @classmethod
    def update_request_end(cls, new_request_end):
        """Records the request end in the active context; the most recent value wins."""
        cls.request_context.get()["request_end"] = new_request_end

    @classmethod
    def on_request_start(cls):
        """Records the current ``time.perf_counter()`` reading as request start."""
        now = time.perf_counter()
        cls.update_request_start(now)

    @classmethod
    def on_request_end(cls):
        """Records the current ``time.perf_counter()`` reading as request end."""
        now = time.perf_counter()
        cls.update_request_end(now)

    @classmethod
    def return_raw_response(cls):
        """Flags the active context so that the response is returned raw."""
        cls.request_context.get()["raw_response"] = True
class EsClientFactory:
    """
    Abstracts how the Elasticsearch client is created. Intended for testing.

    The constructor translates the provided client options into keyword arguments for the Elasticsearch
    client (scheme, SSL context, HTTP authentication, compression); ``create`` and ``create_async`` build
    the synchronous and the asynchronous client from them.
    """

    def __init__(self, hosts, client_options):
        """
        :param hosts: The hosts to connect to. Passed unchanged to the Elasticsearch client.
        :param client_options: A mapping of client options. It is copied; the caller's mapping is not modified.
        :raises exceptions.SystemSetupError: If SSL is enabled and only one of ``client_cert`` / ``client_key``
                                             has been specified.
        """
        self.hosts = hosts
        self.client_options = dict(client_options)
        self.ssl_context = None
        self.logger = logging.getLogger(__name__)

        # mask credentials on a separate copy so they never end up in the log file
        masked_client_options = dict(client_options)
        if "basic_auth_password" in masked_client_options:
            masked_client_options["basic_auth_password"] = "*****"
        if "http_auth" in masked_client_options:
            masked_client_options["http_auth"] = (masked_client_options["http_auth"][0], "*****")
        self.logger.info("Creating ES client connected to %s with options [%s]", hosts, masked_client_options)

        # we're using an SSL context now and it is not allowed to have use_ssl present in client options anymore
        if self.client_options.pop("use_ssl", False):
            # pylint: disable=import-outside-toplevel
            import ssl

            self.logger.info("SSL support: on")
            self.client_options["scheme"] = "https"

            # ssl.Purpose.SERVER_AUTH yields a context for client-side sockets, i.e. one that authenticates the
            # server we connect to. ssl.Purpose.CLIENT_AUTH is meant for server-side sockets. Client certificates
            # can be presented regardless of the purpose via load_cert_chain() below.
            self.ssl_context = ssl.create_default_context(
                ssl.Purpose.SERVER_AUTH, cafile=self.client_options.pop("ca_certs", certifi.where())
            )

            if not self.client_options.pop("verify_certs", True):
                self.logger.info("SSL certificate verification: off")
                # order matters: hostname checking has to be disabled first, otherwise setting CERT_NONE fails
                # with ValueError: Cannot set verify_mode to CERT_NONE when check_hostname is enabled.
                self.ssl_context.check_hostname = False
                self.ssl_context.verify_mode = ssl.CERT_NONE

                self.logger.warning(
                    "User has enabled SSL but disabled certificate verification. This is dangerous but may be ok for a "
                    "benchmark. Disabling urllib warnings now to avoid a logging storm. "
                    "See https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings for details."
                )
                # disable: "InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly \
                # advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings"
                urllib3.disable_warnings()
            else:
                # order matters here as well: check_hostname requires CERT_OPTIONAL or CERT_REQUIRED
                self.ssl_context.verify_mode = ssl.CERT_REQUIRED
                self.ssl_context.check_hostname = True
                self.logger.info("SSL certificate verification: on")

            # When using SSL_context, all SSL related kwargs in client options get ignored
            client_cert = self.client_options.pop("client_cert", False)
            client_key = self.client_options.pop("client_key", False)

            if not client_cert and not client_key:
                self.logger.info("SSL client authentication: off")
            elif bool(client_cert) != bool(client_key):
                self.logger.error("Supplied client-options contain only one of client_cert/client_key. ")
                defined_client_ssl_option = "client_key" if client_key else "client_cert"
                missing_client_ssl_option = "client_cert" if client_key else "client_key"
                console.println(
                    "'{}' is missing from client-options but '{}' has been specified.\n"
                    "If your Elasticsearch setup requires client certificate verification both need to be supplied.\n"
                    "Read the documentation at {}\n".format(
                        missing_client_ssl_option,
                        defined_client_ssl_option,
                        console.format.link(doc_link("command_line_reference.html#client-options")),
                    )
                )
                raise exceptions.SystemSetupError(
                    "Cannot specify '{}' without also specifying '{}' in client-options.".format(
                        defined_client_ssl_option, missing_client_ssl_option
                    )
                )
            elif client_cert and client_key:
                self.logger.info("SSL client authentication: on")
                self.ssl_context.load_cert_chain(certfile=client_cert, keyfile=client_key)
        else:
            self.logger.info("SSL support: off")
            self.client_options["scheme"] = "http"

        # basic auth is only enabled if both, user and password, are set
        if self._is_set(self.client_options, "basic_auth_user") and self._is_set(self.client_options, "basic_auth_password"):
            self.logger.info("HTTP basic authentication: on")
            self.client_options["http_auth"] = (self.client_options.pop("basic_auth_user"), self.client_options.pop("basic_auth_password"))
        else:
            self.logger.info("HTTP basic authentication: off")

        # translate the deprecated option name to the current one
        if self._is_set(self.client_options, "compressed"):
            console.warn("You set the deprecated client option 'compressed'. Please use 'http_compress' instead.", logger=self.logger)
            self.client_options["http_compress"] = self.client_options.pop("compressed")

        if self._is_set(self.client_options, "http_compress"):
            self.logger.info("HTTP compression: on")
        else:
            self.logger.info("HTTP compression: off")

        if self._is_set(self.client_options, "enable_cleanup_closed"):
            self.client_options["enable_cleanup_closed"] = convert.to_bool(self.client_options.pop("enable_cleanup_closed"))

    def _is_set(self, client_opts, k):
        """
        :param client_opts: The client options to inspect.
        :param k: The key to look up.
        :return: The value stored for ``k`` or ``False`` if ``k`` is not present. Note that the value itself is
                 returned, so a present but falsy value also counts as "not set" for callers.
        """
        return client_opts.get(k, False)

    def create(self):
        """
        :return: A synchronous Elasticsearch client for the configured hosts and client options.
        """
        # pylint: disable=import-outside-toplevel
        import elasticsearch

        return elasticsearch.Elasticsearch(hosts=self.hosts, ssl_context=self.ssl_context, **self.client_options)

    def create_async(self):
        """
        Creates an asynchronous Elasticsearch client that records request start and request end in the current
        request context and that can hand out raw (unparsed) responses. As a side effect ``serializer`` and
        ``trace_config`` are stored in this factory's client options.

        :return: An asynchronous Elasticsearch client for the configured hosts and client options.
        """
        # pylint: disable=import-outside-toplevel
        import io

        import aiohttp
        import elasticsearch
        from elasticsearch.serializer import JSONSerializer

        import esrally.async_connection

        class LazyJSONSerializer(JSONSerializer):
            def loads(self, s):
                meta = RallyAsyncElasticsearch.request_context.get()
                # skip JSON parsing if the raw response has been requested for the current request context
                if "raw_response" in meta:
                    return io.BytesIO(s)
                else:
                    return super().loads(s)

        async def on_request_start(session, trace_config_ctx, params):
            RallyAsyncElasticsearch.on_request_start()

        async def on_request_end(session, trace_config_ctx, params):
            RallyAsyncElasticsearch.on_request_end()

        trace_config = aiohttp.TraceConfig()
        trace_config.on_request_start.append(on_request_start)
        trace_config.on_request_end.append(on_request_end)
        # ensure that we also stop the timer when a request "ends" with an exception (e.g. a timeout)
        trace_config.on_request_exception.append(on_request_end)

        # override the builtin JSON serializer
        self.client_options["serializer"] = LazyJSONSerializer()
        self.client_options["trace_config"] = trace_config

        class VerifiedAsyncTransport(elasticsearch.AsyncTransport):
            def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)
                # skip verification at this point; we've already verified this earlier with the synchronous client.
                # The async client is used in the hot code path and we use customized overrides (such as that we don't
                # parse response bodies in some cases for performance reasons, e.g. when using the bulk API).
                self._verified_elasticsearch = True

        class RallyAsyncElasticsearch(elasticsearch.AsyncElasticsearch, RequestContextHolder):
            pass

        return RallyAsyncElasticsearch(
            hosts=self.hosts,
            transport_class=VerifiedAsyncTransport,
            connection_class=esrally.async_connection.AIOHttpConnection,
            ssl_context=self.ssl_context,
            **self.client_options,
        )
def wait_for_rest_layer(es, max_attempts=40):
    """
    Waits for ``max_attempts`` until Elasticsearch's REST API is available.

    :param es: Elasticsearch client to use for connecting.
    :param max_attempts: The maximum number of attempts to check whether the REST API is available.
    :return: True iff Elasticsearch's REST API is available.
    :raises exceptions.SystemSetupError: If the connection error indicates that a non-https endpoint has been
                                         contacted via https.
    :raises elasticsearch.TransportError: If the cluster responds with a status code other than 503, 401 or 408.
    """
    # pylint: disable=import-outside-toplevel
    import elasticsearch

    # assume that at least the hosts that we expect to contact should be available. Note that this is not 100%
    # bullet-proof as a cluster could have e.g. dedicated masters which are not contained in our list of target hosts
    # but this is still better than just checking for any random node's REST API being reachable.
    expected_node_count = len(es.transport.hosts)
    logger = logging.getLogger(__name__)
    for attempt in range(max_attempts):
        logger.debug("Checking whether REST API is available (attempt [%s]).", attempt)
        try:
            # see also WaitForHttpResource in Elasticsearch tests. Contrary to the ES tests we consider the API also
            # available when the cluster status is RED (as long as all required nodes are present)
            es.cluster.health(wait_for_nodes=">={}".format(expected_node_count))
        # connection errors are handled before the more general transport errors
        except elasticsearch.ConnectionError as e:
            if "SSL: UNKNOWN_PROTOCOL" in str(e):
                # retrying is pointless in this case
                raise exceptions.SystemSetupError("Could not connect to cluster via https. Is this an https endpoint?", e) from e
            logger.debug("Got connection error on attempt [%s]. Sleeping...", attempt)
            time.sleep(3)
        except elasticsearch.TransportError as e:
            # cluster block, x-pack not initialized yet, our wait condition is not reached
            if e.status_code not in (503, 401, 408):
                logger.warning("Got unexpected status code [%s] on attempt [%s].", e.status_code, attempt)
                raise
            logger.debug("Got status code [%s] on attempt [%s]. Sleeping...", e.status_code, attempt)
            time.sleep(3)
        else:
            logger.info("REST API is available for >= [%s] nodes after [%s] attempts.", expected_node_count, attempt)
            return True
    return False