Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Ray debugger on cluster #17231

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions python/ray/_private/parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ class RayParams:
failure.
start_initial_python_workers_for_first_job (bool): If true, start
initial Python workers for the first job on the node.
ray_debugger_external (bool): If true, make the Ray debugger for a worker
available externally to the node it is running on.
"""

def __init__(self,
Expand Down Expand Up @@ -162,6 +164,7 @@ def __init__(self,
include_log_monitor=None,
autoscaling_config=None,
start_initial_python_workers_for_first_job=False,
ray_debugger_external=False,
_system_config=None,
enable_object_reconstruction=False,
metrics_agent_port=None,
Expand Down Expand Up @@ -215,6 +218,7 @@ def __init__(self,
self.no_monitor = no_monitor
self.start_initial_python_workers_for_first_job = (
start_initial_python_workers_for_first_job)
self.ray_debugger_external = ray_debugger_external
self._system_config = _system_config or {}
self._enable_object_reconstruction = enable_object_reconstruction
self._check_usage()
Expand Down
6 changes: 5 additions & 1 deletion python/ray/_private/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -1341,7 +1341,8 @@ def start_raylet(redis_address,
socket_to_use=None,
start_initial_python_workers_for_first_job=False,
max_bytes=0,
backup_count=0):
backup_count=0,
ray_debugger_external=False):
"""Start a raylet, which is a combined local scheduler and object manager.

Args:
Expand Down Expand Up @@ -1389,6 +1390,8 @@ def start_raylet(redis_address,
RotatingFileHandler's maxBytes.
backup_count (int): Log rotation parameter. Corresponding to
RotatingFileHandler's backupCount.
ray_debugger_external (bool): True if the Ray debugger should be made
available externally to this node.
Returns:
ProcessInfo for the process that was started.
"""
Expand Down Expand Up @@ -1527,6 +1530,7 @@ def start_raylet(redis_address,
f"--metrics_export_port={metrics_export_port}",
f"--object_store_memory={object_store_memory}",
f"--plasma_directory={plasma_directory}",
f"--ray-debugger-external={1 if ray_debugger_external else 0}",
]
if worker_port_list is not None:
command.append(f"--worker_port_list={worker_port_list}")
Expand Down
4 changes: 3 additions & 1 deletion python/ray/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,9 @@ def start_raylet(self,
max_bytes=self.max_bytes,
backup_count=self.backup_count,
start_initial_python_workers_for_first_job=self._ray_params.
start_initial_python_workers_for_first_job)
start_initial_python_workers_for_first_job,
ray_debugger_external=self._ray_params.ray_debugger_external,
)
assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info]

Expand Down
13 changes: 10 additions & 3 deletions python/ray/scripts/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,12 @@ def debug(address):
type=str,
help="Module path to the Python function that will be used to set up the "
"runtime env in agent.")
@click.option(
"--ray-debugger-external",
is_flag=True,
default=False,
help="Make the Ray debugger available externally to the node. This is only"
"safe to activate if the node is behind a firewall.")
@add_click_options(logging_options)
def start(node_ip_address, address, port, redis_password, redis_shard_ports,
object_manager_port, node_manager_port, gcs_server_port,
Expand All @@ -510,8 +516,8 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
no_redirect_output, plasma_store_socket_name, raylet_socket_name,
temp_dir, system_config, lru_evict, enable_object_reconstruction,
metrics_export_port, no_monitor, tracing_startup_hook,
worker_setup_hook, runtime_env_setup_hook, log_style, log_color,
verbose):
worker_setup_hook, runtime_env_setup_hook, ray_debugger_external,
log_style, log_color, verbose):
"""Start Ray processes manually on the local machine."""
cli_logger.configure(log_style, log_color, verbose)
if gcs_server_port and not head:
Expand Down Expand Up @@ -571,7 +577,8 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
no_monitor=no_monitor,
tracing_startup_hook=tracing_startup_hook,
worker_setup_hook=worker_setup_hook,
runtime_env_setup_hook=runtime_env_setup_hook)
runtime_env_setup_hook=runtime_env_setup_hook,
ray_debugger_external=ray_debugger_external)
if head:
# Use default if port is none, allocate an available port if port is 0
if port is None:
Expand Down
29 changes: 21 additions & 8 deletions python/ray/util/rpdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def __init__(self,
breakpoint_uuid,
host,
port,
ip_address,
patch_stdstreams=False,
quiet=False):
self._breakpoint_uuid = breakpoint_uuid
Expand All @@ -87,12 +88,12 @@ def __init__(self,
self._listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
True)
self._listen_socket.bind((host, port))
self._ip_address = ip_address

def listen(self):
if not self._quiet:
cry("RemotePdb session open at %s:%s, "
"use 'ray debug' to connect..." %
self._listen_socket.getsockname())
"use 'ray debug' to connect..." % (self._ip_address, self._listen_socket.getsockname()[1]))
self._listen_socket.listen(1)
connection, address = self._listen_socket.accept()
if not self._quiet:
Expand Down Expand Up @@ -190,26 +191,35 @@ def connect_ray_pdb(host=None,
port=None,
patch_stdstreams=False,
quiet=None,
breakpoint_uuid=None):
breakpoint_uuid=None,
debugger_external=False):
"""
Opens a remote PDB on first available port.
"""
if host is None:
if debugger_external:
assert not host, "Cannot specify both host and debugger_external"
host = "0.0.0.0"
elif host is None:
host = os.environ.get("REMOTE_PDB_HOST", "127.0.0.1")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we print a warning or something if REMOTE_PDB_HOST is set and the debugger_external flag is passed?

if port is None:
port = int(os.environ.get("REMOTE_PDB_PORT", "0"))
if quiet is None:
quiet = bool(os.environ.get("REMOTE_PDB_QUIET", ""))
if not breakpoint_uuid:
breakpoint_uuid = uuid.uuid4().hex
if debugger_external:
ip_address = ray.worker.global_worker.node_ip_address
else:
ip_address = "localhost"
rdb = RemotePdb(
breakpoint_uuid=breakpoint_uuid,
host=host,
port=port,
ip_address=ip_address,
patch_stdstreams=patch_stdstreams,
quiet=quiet)
sockname = rdb._listen_socket.getsockname()
pdb_address = "{}:{}".format(sockname[0], sockname[1])
pdb_address = "{}:{}".format(ip_address, sockname[1])
parentframeinfo = inspect.getouterframes(inspect.currentframe())[2]
data = {
"proctitle": setproctitle.getproctitle(),
Expand Down Expand Up @@ -239,13 +249,16 @@ def set_trace(breakpoint_uuid=None):
if ray.worker.global_worker.debugger_breakpoint == b"":
frame = sys._getframe().f_back
rdb = connect_ray_pdb(
None, None, False, None,
breakpoint_uuid.decode() if breakpoint_uuid else None)
host=None, port=None, patch_stdstreams=False, quiet=None,
breakpoint_uuid=breakpoint_uuid.decode() if breakpoint_uuid else None,
debugger_external=ray.worker.global_worker.ray_debugger_external)
rdb.set_trace(frame=frame)


def post_mortem():
    """Open a remote PDB session and run post-mortem debugging in it.

    host, port and quiet are passed as None so that connect_ray_pdb
    resolves them itself. Whether the debugger is exposed outside this
    node is taken from the global worker's ray_debugger_external flag.
    """
    # Keyword must be spelled `quiet`; connect_ray_pdb has no `quet`
    # parameter and would raise TypeError on the misspelled name.
    rdb = connect_ray_pdb(
        host=None,
        port=None,
        patch_stdstreams=False,
        quiet=None,
        debugger_external=ray.worker.global_worker.ray_debugger_external)
    rdb.post_mortem()


Expand Down
15 changes: 12 additions & 3 deletions python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ def __init__(self):
# by the worker should drop into the debugger at the specified
# breakpoint ID.
self.debugger_get_breakpoint = b""
# If True, make the debugger external to the node this worker is
# running on.
self.ray_debugger_external = False
self._load_code_from_local = False
# Used to toggle whether or not logs should be filtered to only those
# produced in the same job.
Expand Down Expand Up @@ -1223,7 +1226,8 @@ def connect(node,
namespace=None,
job_config=None,
runtime_env_hash=0,
worker_shim_pid=0):
worker_shim_pid=0,
ray_debugger_external=False):
"""Connect this worker to the raylet, to Plasma, and to Redis.

Args:
Expand All @@ -1239,6 +1243,8 @@ def connect(node,
runtime_env_hash (int): The hash of the runtime env for this worker.
worker_shim_pid (int): The PID of the process for setup worker
runtime env.
ray_debugger_external (bool): If True, make the Ray debugger for this
worker available externally to the node it is running on.
"""
# Do some basic checking to make sure we didn't call ray.init twice.
error_message = "Perhaps you called ray.init twice by accident?"
Expand Down Expand Up @@ -1338,6 +1344,8 @@ def connect(node,
if mode == WORKER_MODE:
os.environ["PYTHONBREAKPOINT"] = "ray.util.rpdb.set_trace"

worker.ray_debugger_external = ray_debugger_external

serialized_job_config = job_config.serialize()
worker.core_worker = ray._raylet.CoreWorker(
mode, node.plasma_store_socket_name, node.raylet_socket_name, job_id,
Expand Down Expand Up @@ -1589,8 +1597,9 @@ def get(object_refs, *, timeout=None):
if debugger_breakpoint != b"":
frame = sys._getframe().f_back
rdb = ray.util.pdb.connect_ray_pdb(
None, None, False, None,
debugger_breakpoint.decode() if debugger_breakpoint else None)
host=None, port=None, patch_stdstreams=False, quiet=None,
breakpoint_uuid=debugger_breakpoint.decode() if debugger_breakpoint else None,
debugger_external=worker.ray_debugger_external)
rdb.set_trace(frame=frame)

return values
Expand Down
8 changes: 7 additions & 1 deletion python/ray/workers/default_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@
type=int,
default=0,
help="The PID of the process for setup worker runtime env.")
parser.add_argument(
"--ray-debugger-external",
default=False,
action="store_true",
help="True if Ray debugger is made available externally.")
if __name__ == "__main__":
# NOTE(sang): For some reason, if we move the code below
# to a separate function, tensorflow will capture that method
Expand Down Expand Up @@ -184,7 +189,8 @@
node,
mode=mode,
runtime_env_hash=args.runtime_env_hash,
worker_shim_pid=args.worker_shim_pid)
worker_shim_pid=args.worker_shim_pid,
ray_debugger_external=args.ray_debugger_external)

# Add code search path to sys.path, set load_code_from_local.
core_worker = ray.worker.global_worker.core_worker
Expand Down
3 changes: 3 additions & 0 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ DEFINE_string(redis_password, "", "The password of redis.");
DEFINE_string(temp_dir, "", "Temporary directory.");
DEFINE_string(session_dir, "", "The path of this ray session directory.");
DEFINE_string(resource_dir, "", "The path of this ray resource directory.");
DEFINE_int32(ray_debugger_external, 0, "Make Ray debugger externally accessible.");
// store options
DEFINE_int64(object_store_memory, -1, "The initial memory of the object store.");
#ifdef __linux__
Expand Down Expand Up @@ -95,6 +96,7 @@ int main(int argc, char *argv[]) {
const std::string temp_dir = FLAGS_temp_dir;
const std::string session_dir = FLAGS_session_dir;
const std::string resource_dir = FLAGS_resource_dir;
const int ray_debugger_external = FLAGS_ray_debugger_external;
const int64_t object_store_memory = FLAGS_object_store_memory;
const std::string plasma_directory = FLAGS_plasma_directory;
const bool huge_pages = FLAGS_huge_pages;
Expand Down Expand Up @@ -203,6 +205,7 @@ int main(int argc, char *argv[]) {
node_manager_config.temp_dir = temp_dir;
node_manager_config.session_dir = session_dir;
node_manager_config.resource_dir = resource_dir;
node_manager_config.ray_debugger_external = ray_debugger_external;
node_manager_config.max_io_workers = RayConfig::instance().max_io_workers();
node_manager_config.min_spilling_size = RayConfig::instance().min_spilling_size();

Expand Down
1 change: 1 addition & 0 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
config.worker_commands,
/*starting_worker_timeout_callback=*/
[this] { cluster_task_manager_->ScheduleAndDispatchTasks(); },
config.ray_debugger_external,
/*get_time=*/[]() { return absl::GetCurrentTimeNanos() / 1e6; }),
client_call_manager_(io_service),
worker_rpc_pool_(client_call_manager_),
Expand Down
2 changes: 2 additions & 0 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ struct NodeManagerConfig {
std::string session_dir;
/// The path of this ray resource dir.
std::string resource_dir;
/// If true make Ray debugger available externally.
int ray_debugger_external;
/// The raylet config list of this node.
std::string raylet_config;
// The time between record metrics in milliseconds, or 0 to disable.
Expand Down
6 changes: 6 additions & 0 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ WorkerPool::WorkerPool(instrumented_io_context &io_service, const NodeID node_id
std::shared_ptr<gcs::GcsClient> gcs_client,
const WorkerCommandMap &worker_commands,
std::function<void()> starting_worker_timeout_callback,
int ray_debugger_external,
const std::function<double()> get_time)
: io_service_(&io_service),
node_id_(node_id),
Expand All @@ -73,6 +74,7 @@ WorkerPool::WorkerPool(instrumented_io_context &io_service, const NodeID node_id
maximum_startup_concurrency_(maximum_startup_concurrency),
gcs_client_(std::move(gcs_client)),
starting_worker_timeout_callback_(starting_worker_timeout_callback),
ray_debugger_external(ray_debugger_external),
first_job_registered_python_worker_count_(0),
first_job_driver_wait_num_python_workers_(std::min(
num_initial_python_workers_for_first_job, maximum_startup_concurrency)),
Expand Down Expand Up @@ -318,6 +320,10 @@ Process WorkerPool::StartWorkerProcess(
worker_command_args.push_back("--serialized-runtime-env-context=" +
serialized_runtime_env_context);
}

if (ray_debugger_external) {
worker_command_args.push_back("--ray-debugger-external");
}
}

// We use setproctitle to change python worker process title,
Expand Down
5 changes: 5 additions & 0 deletions src/ray/raylet/worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// language.
/// \param starting_worker_timeout_callback The callback that will be triggered once
/// it times out to start a worker.
/// \param ray_debugger_external Ray debugger in workers will be started in a way
/// that they are accessible from outside the node.
/// \param get_time A callback to get the current time.
WorkerPool(instrumented_io_context &io_service, const NodeID node_id,
const std::string node_address, int num_workers_soft_limit,
Expand All @@ -130,6 +132,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
std::shared_ptr<gcs::GcsClient> gcs_client,
const WorkerCommandMap &worker_commands,
std::function<void()> starting_worker_timeout_callback,
int ray_debugger_external,
const std::function<double()> get_time);

/// Destructor responsible for freeing a set of workers owned by this class.
Expand Down Expand Up @@ -532,6 +535,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
std::shared_ptr<gcs::GcsClient> gcs_client_;
/// The callback that will be triggered once it times out to start a worker.
std::function<void()> starting_worker_timeout_callback_;
/// If 1, expose Ray debuggers started by the workers externally (to this node).
int ray_debugger_external;
FRIEND_TEST(WorkerPoolTest, InitialWorkerProcessCount);

/// The Job ID of the firstly received job.
Expand Down