Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run Modin on cluster #46

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions docs/pandas_on_ray.rst
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,35 @@ has not already requested that functionality.
Using Pandas on Ray on a Cluster
--------------------------------

Currently, we do not yet support running Pandas on Ray on a cluster. Coming
Soon!
Currently, you can run Modin on a cluster using a Jupyter notebook interface.

First, create a config file which specifies the nodes in the cluster.
Then, run ``modin notebook --config=/path/to/config.yaml --port=8890`` from the
console in order to configure the cluster for use with Modin. The command will
launch a Jupyter notebook on the head node and expose it to the local machine
at the specified port.

A config file looks like this:

.. code-block:: yaml

# The execution engine on which Modin runs. Currently only supports ray.
execution_engine: ray

# Optional. The default SSH key used to access nodes.
key: ~/.ssh/key.pem

# Configuration for the head node. Requires hostname.
# Can set an optional key to override the global key.
head_node:
hostname: [email protected]

# Configuration for other nodes in the cluster. Each node requires a hostname.
# For each node, can set an optional key to override the global key.
nodes:
- hostname: [email protected]
- hostname: [email protected]
key: ~/.ssh/other_key.pem

Examples
--------
Expand Down
182 changes: 182 additions & 0 deletions modin/scripts/cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
import os
import subprocess
import yaml


REQUIRED, OPTIONAL = True, False
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))

CLUSTER_CONFIG_SCHEMA = {
# Execution engine for the cluster.
# Possible options: ray.
"execution_engine": (str, REQUIRED),

# Default key used to SSH into nodes.
"key": (str, OPTIONAL),

# Head node on which notebooks may run
"head_node": (
{
"hostname": (str, REQUIRED),
"key": (str, OPTIONAL), # overrides global key
},
REQUIRED),

# Other nodes part of the cluster
"nodes": (
[
{
"hostname": (str, REQUIRED),
"key": (str, OPTIONAL), # overrides global key
}
],
OPTIONAL),
}


def typename(v):
if isinstance(v, type):
return v.__name__
else:
return type(v).__name__


def check_required(config, schema):
"""Check required config entries"""
if type(config) is not dict and type(config) is not list:
raise ValueError("Config is not a dictionary or a list")
if type(config) != type(schema):
raise ValueError("Config is a {0}, but schema is a {1}".format(
typename(config), typename(schema)))
if type(config) is list:
if not len(config):
return
item_schema = schema[0]
for item_config in config:
check_required(item_config, item_schema)
elif type(config) is dict:
for k, (v, kreq) in schema.items():
if v is None:
continue
if kreq is REQUIRED:
if k not in config:
raise ValueError(
"Missing required config key {0} of type {1}".format(
k, typename(v)))
if not isinstance(v, type):
check_required(config[k], v)


def check_extraneous(config, schema):
"""Check that all items in config are valid in schema"""
if type(config) is not dict and type(config) is not list:
raise ValueError("Config is not a dictionary or a list")
if type(config) != type(schema):
raise ValueError("Config is a {0}, but schema is a {1}".format(
typename(config), typename(schema)))
if type(config) is list:
if not len(config):
return
item_schema = schema[0]
for item_config in config:
# Check required keys in the item's schema because check_required
# does not navigate extraneous schema paths
check_required(item_config, item_schema)
check_extraneous(item_config, item_schema)
elif type(config) is dict:
for k in config:
if k not in schema:
raise ValueError(
"Unexpected config key {0} not in {1}".format(
k, list(schema.keys())))
v, kreq = schema[k]
if v is None:
continue
elif isinstance(v, type):
if not isinstance(config[k], v):
raise ValueError(
"Expected {0} for config key {1}, but got {2}"
.format(typename(v), k, type(config[k]).__name__))
else:
check_extraneous(config[k], v)


def validate_config(config, schema=CLUSTER_CONFIG_SCHEMA):
"""Validates a configuration given a schema"""
check_required(config, schema)
check_extraneous(config, schema)


def load_config(filename):
"""Loads a YAML file"""
with open(filename) as f:
return yaml.load(f.read())


def resolve_script_path(script_basename):
"""Returns the filepath of the script"""
return os.path.join(SCRIPTS_DIR, script_basename)


def setup_head_node(config):
"""Sets up the head node given a valid configuration"""
hostname = config["head_node"]["hostname"]
key = config["head_node"].get("key") or config.get("key")
if not key:
raise ValueError("Missing key for head_node")

output = subprocess.check_output(
["sh", resolve_script_path("configure_head_node.sh"), hostname,
key])

redis_address = subprocess.check_output(
["sh", resolve_script_path("get_redis_address.sh"), output])
redis_address = redis_address.decode("ascii").strip()

return redis_address


def setup_nodes(config, redis_address):
"""Sets up nodes given the config and the redis address"""
try:
from subprocess import DEVNULL
except ImportError:
import os
DEVNULL = open(os.devnull, "wb")

for node in config.get("nodes", []):
hostname = node["hostname"]
key = node.get("key") or config.get("key")
if not key:
raise ValueError("Missing key for node {0}".format(hostname))

subprocess.Popen(
["sh", resolve_script_path("configure_node.sh"), hostname, key,
redis_address], stdout=DEVNULL, stderr=DEVNULL)


def setup_cluster(config):
"""Sets up a cluster given a valid configuration"""
if config["execution_engine"] != "ray":
raise NotImplementedError("Only Ray clusters supported for now")

redis_address = setup_head_node(config)
setup_nodes(config, redis_address)

return redis_address


def launch_notebook(config, port, blocking=True):
"""SSH into the head node, launches a notebook, and forwards port"""
hostname = config["head_node"]["hostname"]
key = config["head_node"].get("key") or config.get("key")
if not key:
raise ValueError("Missing key for head_node")

if blocking:
subprocess.call(
["sh", resolve_script_path("launch_notebook.sh"), hostname,
key, port])
else:
subprocess.Popen(["sh", resolve_script_path("launch_notebook.sh"),
hostname, key, port])
11 changes: 11 additions & 0 deletions modin/scripts/configure_head_node.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/sh

HOSTNAME=$1
KEY=$2

ssh -i $2 -o "StrictHostKeyChecking no" $1 << "ENDSSH"
python -m pip install modin jupyter
PATH=$PATH:~/.local/bin/ # ensure Ray is in the path
ray stop
ray start --head
ENDSSH
12 changes: 12 additions & 0 deletions modin/scripts/configure_node.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/sh

HOSTNAME=$1
KEY=$2
REDIS_ADDRESS=$3

ssh -i $2 -o "StrictHostKeyChecking no" $1 REDIS_ADDRESS=$REDIS_ADDRESS "bash -s" << "ENDSSH"
python -m pip install modin
PATH=$PATH:~/.local/bin/ # ensure Ray is in the path
ray stop
ray start --redis-address $REDIS_ADDRESS
ENDSSH
17 changes: 17 additions & 0 deletions modin/scripts/example_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# The execution engine on which Modin runs. Currently only supports ray.
execution_engine: ray

# Optional. The default SSH key used to access nodes.
key: ~/.ssh/key.pem

# Configuration for the head node. Requires hostname.
# Can set an optional key to override the global key.
head_node:
hostname: [email protected]

# Configuration for other nodes in the cluster. Each node requires a hostname.
# For each node, can set an optional key to override the global key.
nodes:
- hostname: [email protected]
- hostname: [email protected]
key: ~/.ssh/other_key.pem
7 changes: 7 additions & 0 deletions modin/scripts/get_redis_address.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/sh

STRING=$1

RAY_START_CMD=$(echo $STRING | grep -o "ray start --redis-address [0-9\.:]\+")
REDIS_ADDRESS=$(echo $RAY_START_CMD | grep -o "[0-9\.:]\+")
echo $REDIS_ADDRESS
9 changes: 9 additions & 0 deletions modin/scripts/launch_notebook.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/sh

HOSTNAME=$1
KEY=$2
PORT=$3

ssh -i $2 -L $PORT:localhost:$PORT $1 "bash -s" << INT
PATH=$PATH:~/.local/bin/ # ensure Jupyter is in the path
jupyter notebook --port=$PORT
49 changes: 49 additions & 0 deletions modin/scripts/scripts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from __future__ import absolute_import
from __future__ import print_function

import click

from modin.scripts import cluster


@click.group()
def cli():
pass


@click.command()
@click.option(
"--config",
required=True,
type=str,
help="the config file for the cluster")
@click.option(
"--port",
required=True,
help="port to which to forward the notebook server")
def notebook(config, port):
config = cluster.load_config(config)
cluster.validate_config(config)
print("\nSetting up cluster\n")
redis_address = cluster.setup_cluster(config)
print("\nLaunching notebook\n")
print("*" * 68)
print(("To connect to the cluster, run the following commands in the "
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally this would go in our __init__ so we should be able to detect if the cli was used.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, I'm still thinking about how to best set this up. Do you think setting an environment variable with the redis address and detecting that in the __init__ is a good solution?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would probably be good, but we probably should use two environment variables, one for MODIN_EXECUTION_FRAMEWORK and one for MODIN_RAY_REDIS_ADDRESS or something like that.

"notebook:\n"
"\t\timport ray\n"
"\t\tray.init(redis_address=\"{0}\")\n"
"\t\timport modin").format(redis_address))
print("*" * 68)

cluster.launch_notebook(config, port)


cli.add_command(notebook)


def main():
return cli()


if __name__ == "__main__":
main()
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
"a single line of code change.",
packages=find_packages(),
url="https://github.com/modin-project/modin",
install_requires=["pandas==0.22", "ray==0.5"])
install_requires=["pandas==0.22", "ray==0.5"],
entry_points={"console_scripts": ["modin=modin.scripts.scripts:main"]})