Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run Modin on cluster #46

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions docs/pandas_on_ray.rst
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,35 @@ has not already requested that functionality.
Using Pandas on Ray on a Cluster
--------------------------------

Currently, we do not yet support running Pandas on Ray on a cluster. Coming
Soon!
Currently, you can run Modin on a cluster using a Jupyter notebook interface.

First, create a config file which specifies the nodes in the cluster.
Then, run ``modin notebook --config=/path/to/config.yaml --port=8890`` from the
console in order to configure the cluster for use with Modin. The command will
launch a Jupyter notebook on the head node and expose it to the local machine
at the specified port.

A config file looks like this:

.. code-block:: yaml

# The execution framework on which Modin runs. Currently only supports ray.
execution_framework: ray

# Optional. The default SSH key used to access nodes.
key: ~/.ssh/key.pem

# Configuration for the head node. Requires hostname.
# Can set an optional key to override the global key.
head_node:
hostname: [email protected]

# Configuration for other nodes in the cluster. Each node requires a hostname.
# For each node, can set an optional key to override the global key.
nodes:
- hostname: [email protected]
- hostname: [email protected]
key: ~/.ssh/other_key.pem

Examples
--------
Expand Down
Empty file.
150 changes: 150 additions & 0 deletions modin/experimental/scripts/cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest adding the from __future__ import ... lines for consistency

import subprocess
import yaml
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ericl can you comment on whether whether it makes sense to support an autoscaler config that takes in a bunch of IP addresses for nodes in an existing cluster?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that should work



REQUIRED, OPTIONAL = True, False
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))

CLUSTER_CONFIG_SCHEMA = {
# Execution engine for the cluster.
# Possible options: ray.
"execution_framework": (str, REQUIRED),

# Default key used to SSH into nodes.
"key": (str, OPTIONAL),

# Head node on which notebooks may run
"head_node": (
{
"hostname": (str, REQUIRED),
"key": (str, OPTIONAL), # overrides global key
},
REQUIRED),

# Other nodes part of the cluster
"nodes": (
[
{
"hostname": (str, REQUIRED),
"key": (str, OPTIONAL), # overrides global key
}
],
OPTIONAL),
}


def typename(v):
if isinstance(v, type):
return v.__name__
else:
return type(v).__name__


def check_required(config, schema):
"""Check required config entries"""
if not isinstance(config, (dict, list)):
raise ValueError("Config is not a dictionary or a list")
if not isinstance(config, type(schema)):
raise ValueError("Config is a {0}, but schema is a {1}".format(
typename(config), typename(schema)))
if isinstance(config, list):
if not len(config):
return
item_schema = schema[0]
for item_config in config:
check_required(item_config, item_schema)
elif isinstance(config, dict):
for k, (v, kreq) in schema.items():
if v is None:
continue
if kreq is REQUIRED:
if k not in config:
raise ValueError(
"Missing required config key {0} of type {1}".format(
k, typename(v)))
if not isinstance(v, type):
check_required(config[k], v)


def check_extraneous(config, schema):
"""Check that all items in config are valid in schema"""
if not isinstance(config, (dict, list)):
raise ValueError("Config is not a dictionary or a list")
if not isinstance(config, type(schema)):
raise ValueError("Config is a {0}, but schema is a {1}".format(
typename(config), typename(schema)))
if isinstance(config, list):
if not len(config):
return
item_schema = schema[0]
for item_config in config:
# Check required keys in the item's schema because check_required
# does not navigate extraneous schema paths
check_required(item_config, item_schema)
check_extraneous(item_config, item_schema)
elif isinstance(config, dict):
for k in config:
if k not in schema:
raise ValueError(
"Unexpected config key {0} not in {1}".format(
k, list(schema.keys())))
v, kreq = schema[k]
if v is None:
continue
elif isinstance(v, type):
if not isinstance(config[k], v):
raise ValueError(
"Expected {0} for config key {1}, but got {2}"
.format(typename(v), k, type(config[k]).__name__))
else:
check_extraneous(config[k], v)


def validate_config(config, schema=CLUSTER_CONFIG_SCHEMA):
"""Validates a configuration given a schema"""
check_required(config, schema)
check_extraneous(config, schema)


def load_config(filename):
"""Loads a YAML file"""
with open(filename) as f:
return yaml.load(f.read())


def resolve_script_path(script_basename):
"""Returns the filepath of the script"""
return os.path.join(SCRIPTS_DIR, script_basename)


def add_key_to_ssh_agent(key_path):
"""Adds a key to SSH agent"""
subprocess.call(["ssh-add", key_path])


def add_keys_to_ssh_agent(config):
"""Adds keys to SSH agent"""
if "key" in config:
add_key_to_ssh_agent(config["key"])
if "key" in config["head_node"]:
add_key_to_ssh_agent(config["head_node"]["key"])
for node_config in config.get("nodes", []):
if "key" in node_config:
add_key_to_ssh_agent(node_config["key"])


def setup_notebook_ray(config, port):
# Create nodes file
with open("nodes.txt", "w") as f:
for node_config in config.get("nodes", []):
f.write(node_config["hostname"] + "\n")

# Launch cluster
subprocess.call(
["sh", resolve_script_path("configure_ray_cluster.sh"),
config["head_node"]["hostname"], "nodes.txt", port])
52 changes: 52 additions & 0 deletions modin/experimental/scripts/configure_ray_cluster.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#!/usr/bin/sh
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've made some scripts like this in the past, and in the end I've always had to convert them from bash -> Python. Happy to share examples.

Copy link
Collaborator Author

@pschafhalter pschafhalter Jul 24, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would love to take a look!


HOSTNAME=$1
NODE_LIST_FILEPATH=$2
NOTEBOOK_PORT=$3

HEAD_NODE_CONFIG=".modin_head_node_setup.sh"
NODE_LIST_FILE=$(basename $NODE_LIST_FILEPATH)
REDIS_PORT=6379

cat > $HEAD_NODE_CONFIG <<- EOM
PATH=\$PATH:~/.local/bin/ # ensure Ray is in the path

echo "Attempting to connect to worker nodes"
for HOST in \$(cat $NODE_LIST_FILE); do
echo -n "Connecting to $HOST..."
ssh -o "StrictHostKeyChecking no" $HOST uptime
echo "Connected"
done

if command -v ray; then
ray stop
fi

echo "Installing python dependencies..."
python -m pip install modin jupyter

echo "Starting Ray on port $REDIS_PORT..."
ray start --head --redis-port=$REDIS_PORT

echo "Connecting worker nodes..."
HEAD_NODE_IP=\$(hostname -I | tr -d "[:space:]")
REDIS_ADDRESS="\$HEAD_NODE_IP:$REDIS_PORT"

for HOST in \$(cat $NODE_LIST_FILE); do
echo "Starting Ray on \$HOST"
ssh -o StrictHostKeyChecking=no \$HOST "python -m pip install modin; PATH=\\\$PATH:~/.local/bin; ray start --redis-address=\$REDIS_ADDRESS" > /dev/null &
done

if [ "${NOTEBOOK_PORT-}" ]; then
MODIN_EXECUTION_FRAMEWORK=ray MODIN_RAY_REDIS_ADDRESS=\$REDIS_ADDRESS jupyter notebook --port $NOTEBOOK_PORT
fi
EOM

scp $NODE_LIST_FILEPATH $HOSTNAME:~
scp $HEAD_NODE_CONFIG $HOSTNAME:~

if [ "${NOTEBOOK_PORT-}" ]; then
ssh -A -L $NOTEBOOK_PORT:localhost:$NOTEBOOK_PORT -o StrictHostKeyChecking=no $HOSTNAME "bash $HEAD_NODE_CONFIG"
else
ssh -A -o StrictHostKeyChecking=no $HOSTNAME "bash $HEAD_NODE_CONFIG"
fi
17 changes: 17 additions & 0 deletions modin/experimental/scripts/example_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# The execution framework on which Modin runs. Currently only supports ray.
execution_framework: ray

# Optional. The default SSH key used to access nodes.
key: ~/.ssh/key.pem

# Configuration for the head node. Requires hostname.
# Can set an optional key to override the global key.
head_node:
hostname: [email protected]

# Configuration for other nodes in the cluster. Each node requires a hostname.
# For each node, can set an optional key to override the global key.
nodes:
- hostname: [email protected]
- hostname: [email protected]
key: ~/.ssh/other_key.pem
45 changes: 45 additions & 0 deletions modin/experimental/scripts/scripts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import click

from modin.experimental.scripts import cluster


@click.group()
def cli():
pass


@click.command()
@click.option(
"--config",
required=True,
type=str,
help="the config file for the cluster")
@click.option(
"--port",
default="8890",
help="port to which to forward the notebook server")
def notebook(config, port):
config = cluster.load_config(config)
cluster.validate_config(config)
execution_framework = config["execution_framework"]
if execution_framework == "ray":
cluster.setup_notebook_ray(config, port)
else:
raise NotImplementedError(
"Execution framework '{0}' not supported yet".format(
execution_framework))


cli.add_command(notebook)


def main():
return cli()


if __name__ == "__main__":
main()
8 changes: 7 additions & 1 deletion modin/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
CategoricalIndex, Series, bdate_range, DatetimeIndex,
Timedelta, Timestamp, to_timedelta, set_eng_float_format,
set_option, NaT, PeriodIndex, Categorical)
import os
import threading

DEFAULT_NPARTITIONS = 8
Expand Down Expand Up @@ -46,6 +47,11 @@ def get_npartitions():
try:
if threading.current_thread().name == "MainThread":
import ray
ray.init()
if os.environ.get("MODIN_EXECUTION_FRAMEWORK") == "ray" and \
os.environ.get("MODIN_RAY_REDIS_ADDRESS"):
redis_address = os.environ.get("MODIN_RAY_REDIS_ADDRESS")
ray.init(redis_address=redis_address)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect it will simplify users' lives to just call ray.init(redis_addres=...) but I could be wrong about that.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@devin-petersohn thoughts? This was what I was doing in an older version of the PR.

else:
ray.init()
except AssertionError:
pass
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
"a single line of code change.",
packages=find_packages(),
url="https://github.com/modin-project/modin",
install_requires=["pandas==0.22", "ray==0.5"])
install_requires=["pandas==0.22", "ray==0.5"],
entry_points={"console_scripts": ["modin=modin.experimental.scripts.scripts:main"]})