Skip to content

Commit

Permalink
Allow attaching telemetry devices without reprovisioning (#737)
Browse files Browse the repository at this point in the history
By using ES_JAVA_OPTS, we can provision a node, run a benchmark, and then
“dynamically” (i.e., without reprovisioning) start the node again with
telemetry attached.

Relates to #697
Relates to #711
  • Loading branch information
ebadyano authored Aug 20, 2019
1 parent 442d0f1 commit 556de2d
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 38 deletions.
21 changes: 16 additions & 5 deletions esrally/mechanic/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,14 +299,18 @@ def _start_node(self, node_configuration, node_count_on_host):
car = node_configuration.car
binary_path = node_configuration.binary_path
data_paths = node_configuration.data_paths
node_telemetry_dir = "%s/telemetry" % node_configuration.node_root_path
node_telemetry_dir = os.path.join(node_configuration.node_root_path, "telemetry")

java_major_version, java_home = java_resolver.java_home(car, self.cfg)

self.logger.info("Starting node [%s] based on car [%s].", node_name, car)

enabled_devices = self.cfg.opts("mechanic", "telemetry.devices")
telemetry_params = self.cfg.opts("mechanic", "telemetry.params")
node_telemetry = [
telemetry.FlightRecorder(telemetry_params, node_telemetry_dir, java_major_version),
telemetry.JitCompiler(node_telemetry_dir),
telemetry.Gc(node_telemetry_dir, java_major_version),
telemetry.DiskIo(self.metrics_store, node_count_on_host, node_telemetry_dir, node_name),
telemetry.NodeEnvironmentInfo(self.metrics_store),
telemetry.IndexSize(data_paths, self.metrics_store),
Expand All @@ -328,19 +332,26 @@ def _prepare_env(self, car, node_name, java_home, t):
env = {}
env.update(os.environ)
env.update(car.env)
self._set_env(env, "PATH", os.path.join(java_home, "bin"), separator=os.pathsep)
self._set_env(env, "PATH", os.path.join(java_home, "bin"), separator=os.pathsep, prepend=True)
# Don't merge here!
env["JAVA_HOME"] = java_home
env["ES_JAVA_OPTS"] = "-XX:+ExitOnOutOfMemoryError"

# we just blindly trust telemetry here...
for v in t.instrument_candidate_java_opts(car, node_name):
self._set_env(env, "ES_JAVA_OPTS", v)

self.logger.debug("env for [%s]: %s", node_name, str(env))
return env

def _set_env(self, env, k, v, separator=' '):
def _set_env(self, env, k, v, separator=' ', prepend=False):
if v is not None:
if k not in env:
env[k] = v
else: # merge
env[k] = v + separator + env[k]
elif prepend:
env[k] = v + separator + env[k]
else:
env[k] = env[k] + separator + v

@staticmethod
def _start_process(binary_path, env):
Expand Down
32 changes: 5 additions & 27 deletions esrally/mechanic/provisioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import jinja2

from esrally import exceptions
from esrally.mechanic import team, java_resolver, telemetry
from esrally.utils import io, process, versions, jvm
from esrally.mechanic import team, java_resolver
from esrally.utils import io, process, versions


def local_provisioner(cfg, car, plugins, cluster_settings, all_node_ips, target_root, node_id):
Expand All @@ -39,21 +39,10 @@ def local_provisioner(cfg, car, plugins, cluster_settings, all_node_ips, target_

_, java_home = java_resolver.java_home(car, cfg)

node_telemetry_dir = os.path.join(node_root_dir, "telemetry")
java_major_version, java_home = java_resolver.java_home(car, cfg)
enabled_devices = cfg.opts("mechanic", "telemetry.devices")
telemetry_params = cfg.opts("mechanic", "telemetry.params")
node_telemetry = [
telemetry.FlightRecorder(telemetry_params, node_telemetry_dir, java_major_version),
telemetry.JitCompiler(node_telemetry_dir),
telemetry.Gc(node_telemetry_dir, java_major_version)
]
t = telemetry.Telemetry(enabled_devices, devices=node_telemetry)

es_installer = ElasticsearchInstaller(car, java_home, node_name, node_root_dir, all_node_ips, ip, http_port)
plugin_installers = [PluginInstaller(plugin, java_home) for plugin in plugins]

return BareProvisioner(cluster_settings, es_installer, plugin_installers, preserve, t, distribution_version=distribution_version)
return BareProvisioner(cluster_settings, es_installer, plugin_installers, preserve, distribution_version=distribution_version)


def no_op_provisioner():
Expand Down Expand Up @@ -158,14 +147,13 @@ class BareProvisioner:
of the benchmark candidate to the appropriate place.
"""

def __init__(self, cluster_settings, es_installer, plugin_installers, preserve, telemetry=None, distribution_version=None, apply_config=_apply_config):
def __init__(self, cluster_settings, es_installer, plugin_installers, preserve, distribution_version=None, apply_config=_apply_config):
self.preserve = preserve
self._cluster_settings = cluster_settings
self.es_installer = es_installer
self.plugin_installers = plugin_installers
self.distribution_version = distribution_version
self.apply_config = apply_config
self.telemetry = telemetry
self.logger = logging.getLogger(__name__)

def prepare(self, binary):
Expand Down Expand Up @@ -197,13 +185,7 @@ def prepare(self, binary):

def cleanup(self):
self.es_installer.cleanup(self.preserve)

def _prepare_java_opts(self):
# To detect out of memory errors during the benchmark
java_opts = ["-XX:+ExitOnOutOfMemoryError"]
if self.telemetry is not None:
java_opts.extend(self.telemetry.instrument_candidate_java_opts(self.es_installer.car, self.es_installer.node_name))
return java_opts


def _provisioner_variables(self):
plugin_variables = {}
Expand Down Expand Up @@ -237,10 +219,6 @@ def _provisioner_variables(self):
provisioner_vars.update(plugin_variables)
provisioner_vars["cluster_settings"] = cluster_settings

java_opts = self._prepare_java_opts()
if java_opts:
provisioner_vars["additional_java_settings"] = java_opts

return provisioner_vars


Expand Down
25 changes: 22 additions & 3 deletions tests/mechanic/launcher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import io
import io, os
from unittest import TestCase, mock

from esrally import config, exceptions, paths
from esrally.mechanic import launcher
from esrally.mechanic import launcher, telemetry, team
from esrally.utils import opts


Expand Down Expand Up @@ -119,6 +119,7 @@ def wait(self):


class ProcessLauncherTests(TestCase):
@mock.patch('os.path.join', return_value="/telemetry")
@mock.patch('os.kill')
@mock.patch('subprocess.Popen',new=MockPopen)
@mock.patch('esrally.mechanic.java_resolver.java_home', return_value=(12, "/java_home/"))
Expand All @@ -129,7 +130,7 @@ class ProcessLauncherTests(TestCase):
@mock.patch('esrally.mechanic.provisioner.NodeConfiguration')
@mock.patch('esrally.mechanic.launcher.wait_for_pidfile', return_value=MOCK_PID_VALUE)
@mock.patch('psutil.Process')
def test_daemon_start_stop(self, process, wait_for_pidfile, node_config, ms, cfg, chdir, supports, java_home, kill):
def test_daemon_start_stop(self, process, wait_for_pidfile, node_config, ms, cfg, chdir, supports, java_home, kill, path):
proc_launcher = launcher.ProcessLauncher(cfg, ms, paths.races_root(cfg))

nodes = proc_launcher.start([node_config])
Expand All @@ -139,6 +140,24 @@ def test_daemon_start_stop(self, process, wait_for_pidfile, node_config, ms, cfg
proc_launcher.keep_running = False
proc_launcher.stop(nodes)
self.assertTrue(kill.called)

def test_env_options_order(self):
cfg = config.Config()
cfg.add(config.Scope.application, "mechanic", "keep.running", False)
proc_launcher = launcher.ProcessLauncher(cfg, MockMetricsStore(), races_root_dir="/home")
default_car = team.Car(names="default-car", root_path=None, config_paths=["/tmp/rally-config"])

node_telemetry = [
telemetry.FlightRecorder(telemetry_params={}, log_root="/tmp/telemetry", java_major_version=8)
]
t = telemetry.Telemetry(["jfr"], devices=node_telemetry)
env = proc_launcher._prepare_env(car=default_car, node_name="node0", java_home="/java_home", t=t)

self.assertEqual("/java_home/bin" + os.pathsep + os.environ["PATH"], env["PATH"])
self.assertEqual("-XX:+ExitOnOutOfMemoryError -XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints "
"-XX:+UnlockCommercialFeatures -XX:+FlightRecorder "
"-XX:FlightRecorderOptions=disk=true,maxage=0s,maxsize=0,dumponexit=true,dumponexitpath=/tmp/telemetry/default-car-node0.jfr "
"-XX:StartFlightRecording=defaultrecording=true", env["ES_JAVA_OPTS"])


class ExternalLauncherTests(TestCase):
Expand Down
3 changes: 0 additions & 3 deletions tests/mechanic/provisioner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ def null_apply_config(source_root_path, target_root_path, config_vars):
"cluster_settings": {
"indices.query.bool.max_clause_count": 50000,
},
"additional_java_settings": ["-XX:+ExitOnOutOfMemoryError"],
"heap": "4g",
"cluster_name": "rally-benchmark",
"node_name": "rally-node-0",
Expand Down Expand Up @@ -185,7 +184,6 @@ def null_apply_config(source_root_path, target_root_path, config_vars):
"indices.query.bool.max_clause_count": 50000,
"plugin.mandatory": ["x-pack-security"]
},
"additional_java_settings": ["-XX:+ExitOnOutOfMemoryError"],
"heap": "4g",
"cluster_name": "rally-benchmark",
"node_name": "rally-node-0",
Expand Down Expand Up @@ -263,7 +261,6 @@ def null_apply_config(source_root_path, target_root_path, config_vars):
"indices.query.bool.max_clause_count": 50000,
"plugin.mandatory": ["x-pack"]
},
"additional_java_settings": ["-XX:+ExitOnOutOfMemoryError"],
"heap": "4g",
"cluster_name": "rally-benchmark",
"node_name": "rally-node-0",
Expand Down

0 comments on commit 556de2d

Please sign in to comment.