Implement ES daemon-mode in process launcher (#701)
Implement ES daemon-mode in process and docker launchers

* Replace logfile watcher with pidfile watcher
* Remove old StartupWatcher / subproc path
* Mock out process launcher unit test
* Fix sysstat telemetry in mocked tests
* Add .j2 to docker-compose.yml, add healthcheck
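
In daemon mode, Elasticsearch is presumably started with its -d (daemonize) and -p <pidfile> options, so instead of watching the server log for a startup message the launcher only has to wait for the pid file to appear and read the PID from it. A sketch of that idea follows the launcher.py entry below.
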
drawlerr authored Jun 20, 2019
1 parent 4c160c8 commit 7f6db41
Showing 10 changed files with 175 additions and 140 deletions.
9 changes: 5 additions & 4 deletions esrally/mechanic/cluster.py
@@ -15,21 +15,22 @@
# specific language governing permissions and limitations
# under the License.


class Node:
"""
Represents an Elasticsearch cluster node.
"""

def __init__(self, process, host_name, node_name, telemetry):
def __init__(self, pid, host_name, node_name, telemetry):
"""
Creates a new node.
:param process: Process handle for this node.
:param pid: PID for this node.
:param host_name: The name of the host where this node is running.
:param node_name: The name of this node.
:param telemetry: The attached telemetry.
"""
self.process = process
self.pid = pid
self.host_name = host_name
self.node_name = node_name
self.ip = None
@@ -102,7 +103,7 @@ def has_node(self, name):
return self.node(name) is not None

def add_node(self, host_name, node_name):
new_node = Node(process=None, host_name=host_name, node_name=node_name, telemetry=None)
new_node = Node(pid=None, host_name=host_name, node_name=node_name, telemetry=None)
self.nodes.append(new_node)
return new_node

210 changes: 91 additions & 119 deletions esrally/mechanic/launcher.py

Large diffs are not rendered by default.
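
The launcher.py diff is collapsed here, but the unit test further down patches esrally.mechanic.launcher.wait_for_pidfile, so the daemon start path presumably reduces to polling for the pid file that the daemonized Elasticsearch writes. A minimal sketch of such a helper, assuming esrally's exceptions.LaunchError; everything else is illustrative rather than the committed implementation:

import time

from esrally import exceptions


def wait_for_pidfile(pidfilename, timeout=60):
    # Poll until the Elasticsearch daemon has written its pid file, then return the PID.
    endtime = time.monotonic() + timeout
    while time.monotonic() < endtime:
        try:
            with open(pidfilename, "rb") as f:
                return int(f.read())
        except (FileNotFoundError, ValueError):
            # pid file not there yet, or only partially written
            time.sleep(0.5)
    # exceptions.LaunchError is assumed to exist in esrally
    raise exceptions.LaunchError("pid file [{}] not available after {} seconds".format(pidfilename, timeout))

The PID returned here is what ends up in cluster.Node.pid (see the cluster.py change above), is handed to the telemetry devices, and is later used by os.kill() when the node is stopped.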

2 changes: 1 addition & 1 deletion esrally/mechanic/mechanic.py
@@ -667,7 +667,7 @@ def create(cfg, metrics_store, all_node_ips, cluster_settings=None, sources=Fals
p = []
for node_id in node_ids:
p.append(provisioner.local_provisioner(cfg, car, plugins, cluster_settings, all_node_ips, challenge_root_path, node_id))
l = launcher.InProcessLauncher(cfg, metrics_store, races_root)
l = launcher.ProcessLauncher(cfg, metrics_store, races_root)
elif external:
if len(plugins) > 0:
raise exceptions.SystemSetupError("You cannot specify any plugins for externally provisioned clusters. Please remove "
6 changes: 3 additions & 3 deletions esrally/mechanic/provisioner.py
@@ -180,8 +180,8 @@ def prepare(self, binary):
installer.invoke_install_hook(team.BootstrapPhase.post_install, provisioner_vars.copy())

return NodeConfiguration(self.es_installer.car, self.es_installer.node_ip, self.es_installer.node_name,
self.es_installer.node_root_dir, self.es_installer.es_home_path, self.es_installer.node_log_dir,
self.es_installer.data_paths)
self.es_installer.node_root_dir, self.es_installer.es_home_path,
self.es_installer.node_log_dir, self.es_installer.data_paths)

def cleanup(self):
self.es_installer.cleanup(self.preserve)
@@ -495,7 +495,7 @@ def _render_template(self, loader, template_name, variables):
raise exceptions.SystemSetupError("%s in %s" % (str(e), template_name))

def _render_template_from_file(self, variables):
compose_file = "%s/resources/docker-compose.yml" % self.rally_root
compose_file = "%s/resources/docker-compose.yml.j2" % self.rally_root
return self._render_template(loader=jinja2.FileSystemLoader(io.dirname(compose_file)),
template_name=io.basename(compose_file),
variables=variables)
2 changes: 1 addition & 1 deletion esrally/mechanic/telemetry.py
@@ -793,7 +793,7 @@ def __init__(self, metrics_store, node_count_on_host):

def attach_to_node(self, node):
self.node = node
self.process = sysstats.setup_process_stats(node.process.pid)
self.process = sysstats.setup_process_stats(node.pid)

def on_benchmark_start(self):
if self.process is not None:
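
With nodes carrying a bare PID, attaching process telemetry no longer needs a Popen handle. The unit test below patches psutil.Process, which suggests sysstats.setup_process_stats simply wraps the PID in a psutil handle, roughly along these lines (a sketch of the idea, not the committed sysstats code):

import psutil


def setup_process_stats(pid):
    # Sketch: a psutil.Process built from the PID read out of the daemon's pid file
    # is all the per-process samplers need; the real helper may do more.
    return psutil.Process(pid)
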
5 changes: 5 additions & 0 deletions esrally/resources/docker-compose.yml.j2
@@ -24,3 +24,8 @@ services:
{%- for host_path, docker_path in mounts.items() %}
- {{host_path}}:{{docker_path}}
{%- endfor %}
healthcheck:
test: nc -z 127.0.0.1 {{http_port}}
interval: 5s
timeout: 2s
retries: 10
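
The new healthcheck lets Docker report the container healthy once something accepts TCP connections on the rendered {{http_port}}: nc -z just opens a connection and exits, Docker runs the probe every 5 seconds with a 2-second timeout, and marks the container unhealthy after 10 consecutive failures. A rough Python equivalent of that probe, with illustrative host and port values:

import socket


def port_is_open(host="127.0.0.1", port=39200, timeout=2.0):
    # Equivalent of `nc -z host port`: succeed if a TCP connection can be opened.
    # Default host/port here are illustrative only.
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
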
6 changes: 5 additions & 1 deletion esrally/utils/sysstats.py
@@ -45,7 +45,11 @@ def cpu_model():
"""
:return: The CPU model name.
"""
return cpuinfo.get_cpu_info()["brand"] if cpuinfo_available else "Unknown"
if cpuinfo_available:
cpu_info = cpuinfo.get_cpu_info()
if "brand" in cpu_info:
return cpu_info["brand"]
return "Unknown"


def disks():
53 changes: 48 additions & 5 deletions tests/mechanic/launcher_test.py
@@ -14,12 +14,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import datetime
import logging
import os
from unittest import TestCase, mock

from esrally import config, exceptions
from esrally import config, exceptions, metrics, paths
from esrally.mechanic import launcher, provisioner, team
from esrally.utils import opts
from esrally.mechanic import launcher
from esrally.utils.io import guess_java_home


class MockMetricsStore:
@@ -93,6 +96,46 @@ def info(self, *args, **kwargs):
return self._info


class MockPopen:
def __init__(self, *args, **kwargs):
        # Currently, the only code that checks returncode directly during
        # ProcessLauncherTests is the telemetry code. Returning 1 keeps its
        # optional functionality disabled, so we don't have to mock it.
self.returncode = 1

def communicate(self, input=None, timeout=None):
return [b"", b""]

def wait(self):
return 0


MOCK_PID_VALUE = 1234


class ProcessLauncherTests(TestCase):
@mock.patch('os.kill')
    @mock.patch('subprocess.Popen', new=MockPopen)
@mock.patch('esrally.mechanic.java_resolver.java_home', return_value=(12, "/java_home/"))
@mock.patch('esrally.utils.jvm.supports_option', return_value=True)
@mock.patch('os.chdir')
@mock.patch('esrally.config.Config')
@mock.patch('esrally.metrics.MetricsStore')
@mock.patch('esrally.mechanic.provisioner.NodeConfiguration')
@mock.patch('esrally.mechanic.launcher.wait_for_pidfile', return_value=MOCK_PID_VALUE)
@mock.patch('psutil.Process')
def test_daemon_start_stop(self, process, wait_for_pidfile, node_config, ms, cfg, chdir, supports, java_home, kill):
proc_launcher = launcher.ProcessLauncher(cfg, ms, paths.races_root(cfg))

nodes = proc_launcher.start([node_config])
self.assertEqual(len(nodes), 1)
self.assertEqual(nodes[0].pid, MOCK_PID_VALUE)

proc_launcher.keep_running = False
proc_launcher.stop(nodes)
self.assertTrue(kill.called)
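
The test never launches a real Elasticsearch: subprocess.Popen is replaced by MockPopen, wait_for_pidfile is stubbed to return MOCK_PID_VALUE, and os.kill is mocked, so the assertions only exercise ProcessLauncher's bookkeeping. Assuming the project's usual pytest runner, it can be run in isolation with something like:

python -m pytest tests/mechanic/launcher_test.py -k test_daemon_start_stop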


class ExternalLauncherTests(TestCase):
test_host = opts.TargetHosts("127.0.0.1:9200,10.17.0.5:19200")
client_options = opts.ClientOptions("timeout:60")
@@ -102,7 +145,7 @@ def test_setup_external_cluster_single_node(self):

cfg.add(config.Scope.application, "mechanic", "telemetry.devices", [])
cfg.add(config.Scope.application, "client", "hosts", self.test_host)
cfg.add(config.Scope.application, "client", "options",self.client_options)
cfg.add(config.Scope.application, "client", "options", self.client_options)

m = launcher.ExternalLauncher(cfg, MockMetricsStore(), client_factory_class=MockClientFactory)
m.start()
@@ -138,7 +181,7 @@ def test_launches_cluster(self):
cluster_launcher = launcher.ClusterLauncher(cfg, MockMetricsStore(), client_factory_class=MockClientFactory)
cluster = cluster_launcher.start()

self.assertEqual([{"host": "10.0.0.10", "port":9200}, {"host": "10.0.0.11", "port":9200}], cluster.hosts)
self.assertEqual([{"host": "10.0.0.10", "port": 9200}, {"host": "10.0.0.11", "port": 9200}], cluster.hosts)
self.assertIsNotNone(cluster.telemetry)

def test_launches_cluster_with_telemetry_client_timeout_enabled(self):
14 changes: 12 additions & 2 deletions tests/mechanic/provisioner_test.py
@@ -597,7 +597,12 @@ def test_provisioning_with_defaults(self, uuid4):
volumes:
- %s:/usr/share/elasticsearch/data
- %s:/var/log/elasticsearch
- %s:/usr/share/elasticsearch/heapdump""" % (data_dir, log_dir, heap_dump_dir), docker_cfg)
- %s:/usr/share/elasticsearch/heapdump
healthcheck:
test: nc -z 127.0.0.1 39200
interval: 5s
timeout: 2s
retries: 10""" % (data_dir, log_dir, heap_dump_dir), docker_cfg)

@mock.patch("uuid.uuid4")
def test_provisioning_with_variables(self, uuid4):
@@ -646,4 +651,9 @@ def test_provisioning_with_variables(self, uuid4):
volumes:
- %s:/usr/share/elasticsearch/data
- %s:/var/log/elasticsearch
- %s:/usr/share/elasticsearch/heapdump""" % (data_dir, log_dir, heap_dump_dir), docker_cfg)
- %s:/usr/share/elasticsearch/heapdump
healthcheck:
test: nc -z 127.0.0.1 39200
interval: 5s
timeout: 2s
retries: 10""" % (data_dir, log_dir, heap_dump_dir), docker_cfg)
8 changes: 4 additions & 4 deletions tests/mechanic/telemetry_test.py
@@ -2056,7 +2056,7 @@ def test_enriches_cluster_nodes_for_elasticsearch_after_1_x(self):
t = telemetry.Telemetry(devices=[telemetry.ClusterMetaDataInfo(client)])

c = cluster.Cluster(hosts=[{"host": "localhost", "port": 39200}],
nodes=[cluster.Node(process=None, host_name="local", node_name="rally0", telemetry=None)],
nodes=[cluster.Node(pid=None, host_name="local", node_name="rally0", telemetry=None)],
telemetry=t)

t.attach_to_cluster(c)
@@ -2139,7 +2139,7 @@ def test_enriches_cluster_nodes_for_elasticsearch_1_x(self):
t = telemetry.Telemetry(devices=[telemetry.ClusterMetaDataInfo(client)])

c = cluster.Cluster(hosts=[{"host": "localhost", "port": 39200}],
nodes=[cluster.Node(process=None, host_name="local", node_name="rally0", telemetry=None)],
nodes=[cluster.Node(pid=None, host_name="local", node_name="rally0", telemetry=None)],
telemetry=t)

t.attach_to_cluster(c)
@@ -2825,7 +2825,7 @@ def test_stores_index_size_for_data_paths(self, metrics_store_node_count, get_si
metrics_store = metrics.EsMetricsStore(cfg)
device = telemetry.IndexSize(["/var/elasticsearch/data/1", "/var/elasticsearch/data/2"], metrics_store)
t = telemetry.Telemetry(enabled_devices=[], devices=[device])
node = cluster.Node(process=None, host_name="localhost", node_name="rally-node-0", telemetry=t)
node = cluster.Node(pid=None, host_name="localhost", node_name="rally-node-0", telemetry=t)
t.attach_to_node(node)
t.on_benchmark_start()
t.on_benchmark_stop()
@@ -2847,7 +2847,7 @@ def test_stores_nothing_if_no_data_path(self, run_subprocess, metrics_store_clus
metrics_store = metrics.EsMetricsStore(cfg)
device = telemetry.IndexSize(data_paths=[], metrics_store=metrics_store)
t = telemetry.Telemetry(devices=[device])
node = cluster.Node(process=None, host_name="localhost", node_name="rally-node-0", telemetry=t)
node = cluster.Node(pid=None, host_name="localhost", node_name="rally-node-0", telemetry=t)
t.attach_to_node(node)
t.on_benchmark_start()
t.on_benchmark_stop()
