diff --git a/Dockerfile_k8s b/Dockerfile_k8s new file mode 100644 index 00000000000..12dbaa9006c --- /dev/null +++ b/Dockerfile_k8s @@ -0,0 +1,50 @@ +FROM continuumio/miniconda3:22.11.1 + +# TODO(romilb): Investigate if this image can be consolidated with the skypilot +# client image (`Dockerfile`) + +# Initialize conda for root user, install ssh and other local dependencies +RUN apt update -y && \ + apt install gcc rsync sudo patch openssh-server pciutils nano fuse -y && \ + rm -rf /var/lib/apt/lists/* && \ + apt remove -y python3 && \ + conda init + +# Setup SSH and generate hostkeys +RUN mkdir -p /var/run/sshd && \ + sed -i 's/PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config && \ + sed 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd && \ + cd /etc/ssh/ && \ + ssh-keygen -A + +# Setup new user named sky and add to sudoers. Also add /opt/conda/bin to sudo path. +RUN useradd -m -s /bin/bash sky && \ + echo "sky ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers && \ + echo 'Defaults secure_path="/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"' > /etc/sudoers.d/sky + +# Switch to sky user +USER sky + +# Install SkyPilot pip dependencies +RUN pip install wheel Click colorama cryptography jinja2 jsonschema && \ + pip install networkx oauth2client pandas pendulum PrettyTable && \ + pip install ray==2.4.0 rich tabulate filelock && \ + pip install packaging 'protobuf<4.0.0' pulp && \ + pip install awscli boto3 pycryptodome==3.12.0 && \ + pip install docker kubernetes + +# Add /home/sky/.local/bin/ to PATH +RUN echo 'export PATH="$PATH:$HOME/.local/bin"' >> ~/.bashrc + +# Install SkyPilot. This is purposely separate from installing SkyPilot +# dependencies to optimize rebuild time +COPY --chown=sky . /skypilot/sky/ + +# TODO(romilb): Installing SkyPilot may not be necessary since ray up will do it +RUN cd /skypilot/ && \ + sudo mv -v sky/setup_files/* . && \ + pip install ".[aws]" + +# Set WORKDIR and initialize conda for sky user +WORKDIR /home/sky +RUN conda init diff --git a/sky/__init__.py b/sky/__init__.py index 715a126e1d1..c814cee3e62 100644 --- a/sky/__init__.py +++ b/sky/__init__.py @@ -32,19 +32,21 @@ Lambda = clouds.Lambda SCP = clouds.SCP Local = clouds.Local +Kubernetes = clouds.Kubernetes OCI = clouds.OCI optimize = Optimizer.optimize __all__ = [ '__version__', - 'IBM', 'AWS', 'Azure', 'GCP', + 'IBM', + 'Kubernetes', 'Lambda', - 'SCP', 'Local', 'OCI', + 'SCP', 'Optimizer', 'OptimizeTarget', 'backends', diff --git a/sky/adaptors/kubernetes.py b/sky/adaptors/kubernetes.py new file mode 100644 index 00000000000..79daa6f2434 --- /dev/null +++ b/sky/adaptors/kubernetes.py @@ -0,0 +1,140 @@ +"""Kubernetes adaptors""" + +# pylint: disable=import-outside-toplevel + +import functools +import os + +from sky.utils import ux_utils, env_options + +kubernetes = None +urllib3 = None + +_configured = False +_core_api = None +_auth_api = None +_networking_api = None +_custom_objects_api = None + +# Timeout to use for API calls +API_TIMEOUT = 5 + + +def import_package(func): + + @functools.wraps(func) + def wrapper(*args, **kwargs): + global kubernetes + global urllib3 + if kubernetes is None: + try: + import kubernetes as _kubernetes + import urllib3 as _urllib3 + except ImportError: + # TODO(romilb): Update this message to point to installation + # docs when they are ready. + raise ImportError('Fail to import dependencies for Kubernetes. 
' + 'Run `pip install kubernetes` to ' + 'install them.') from None + kubernetes = _kubernetes + urllib3 = _urllib3 + return func(*args, **kwargs) + + return wrapper + + +@import_package +def get_kubernetes(): + return kubernetes + + +@import_package +def _load_config(): + global _configured + if _configured: + return + try: + # Load in-cluster config if running in a pod + # Kubernetes set environment variables for service discovery do not + # show up in SkyPilot tasks. For now, we work around by using + # DNS name instead of environment variables. + # See issue: https://github.com/skypilot-org/skypilot/issues/2287 + os.environ['KUBERNETES_SERVICE_HOST'] = 'kubernetes.default.svc' + os.environ['KUBERNETES_SERVICE_PORT'] = '443' + kubernetes.config.load_incluster_config() + except kubernetes.config.config_exception.ConfigException: + try: + kubernetes.config.load_kube_config() + except kubernetes.config.config_exception.ConfigException as e: + suffix = '' + if env_options.Options.SHOW_DEBUG_INFO.get(): + suffix += f' Error: {str(e)}' + # Check if exception was due to no current-context + if 'Expected key current-context' in str(e): + err_str = ('Failed to load Kubernetes configuration. ' + 'Kubeconfig does not contain any valid context(s).' + f'{suffix}\n' + ' If you were running a local Kubernetes ' + 'cluster, run `sky local up` to start the cluster.') + else: + err_str = ( + 'Failed to load Kubernetes configuration. ' + f'Please check if your kubeconfig file is valid.{suffix}') + with ux_utils.print_exception_no_traceback(): + raise ValueError(err_str) from None + _configured = True + + +@import_package +def core_api(): + global _core_api + if _core_api is None: + _load_config() + _core_api = kubernetes.client.CoreV1Api() + + return _core_api + + +@import_package +def auth_api(): + global _auth_api + if _auth_api is None: + _load_config() + _auth_api = kubernetes.client.RbacAuthorizationV1Api() + + return _auth_api + + +@import_package +def networking_api(): + global _networking_api + if _networking_api is None: + _load_config() + _networking_api = kubernetes.client.NetworkingV1Api() + + return _networking_api + + +@import_package +def custom_objects_api(): + global _custom_objects_api + if _custom_objects_api is None: + _load_config() + _custom_objects_api = kubernetes.client.CustomObjectsApi() + + return _custom_objects_api + + +@import_package +def api_exception(): + return kubernetes.client.rest.ApiException + + +@import_package +def config_exception(): + return kubernetes.config.config_exception.ConfigException + + +@import_package +def max_retry_error(): + return urllib3.exceptions.MaxRetryError diff --git a/sky/authentication.py b/sky/authentication.py index 3223357a68d..d5aa2ff1787 100644 --- a/sky/authentication.py +++ b/sky/authentication.py @@ -373,3 +373,33 @@ def setup_scp_authentication(config: Dict[str, Any]) -> Dict[str, Any]: with open(public_key_path, 'r') as f: public_key = f.read().strip() return _replace_ssh_info_in_config(config, public_key) + + +def setup_kubernetes_authentication(config: Dict[str, Any]) -> Dict[str, Any]: + get_or_generate_keys() + + # Run kubectl command to add the public key to the cluster. 
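    # For illustration, the command assembled below expands to roughly the
    # following (values are illustrative; the secret name comes from
    # Kubernetes.SKY_SSH_KEY_SECRET_NAME and the path from PUBLIC_SSH_KEY_PATH):
    #   kubectl create secret generic sky-ssh-<user-hash> \
    #       --from-file=ssh-publickey=~/.ssh/sky-key.pub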
+ public_key_path = os.path.expanduser(PUBLIC_SSH_KEY_PATH) + key_label = clouds.Kubernetes.SKY_SSH_KEY_SECRET_NAME + cmd = f'kubectl create secret generic {key_label} ' \ + f'--from-file=ssh-publickey={public_key_path}' + try: + subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True) + except subprocess.CalledProcessError as e: + output = e.output.decode('utf-8') + suffix = f'\nError message: {output}' + if 'already exists' in output: + logger.debug( + f'Key {key_label} already exists in the cluster, using it...') + elif any(err in output for err in ['connection refused', 'timeout']): + with ux_utils.print_exception_no_traceback(): + raise ConnectionError( + 'Failed to connect to the cluster. Check if your ' + 'cluster is running, your kubeconfig is correct ' + 'and you can connect to it using: ' + f'kubectl get namespaces.{suffix}') from e + else: + logger.error(suffix) + raise + + return config diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index f0137841ee8..4b71ca4ac44 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -401,7 +401,7 @@ class SSHConfigHelper(object): @classmethod def _get_generated_config(cls, autogen_comment: str, host_name: str, ip: str, username: str, ssh_key_path: str, - proxy_command: Optional[str]): + proxy_command: Optional[str], port: int): if proxy_command is not None: proxy = f'ProxyCommand {proxy_command}' else: @@ -423,7 +423,7 @@ def _get_generated_config(cls, autogen_comment: str, host_name: str, StrictHostKeyChecking no UserKnownHostsFile=/dev/null GlobalKnownHostsFile=/dev/null - Port 22 + Port {port} {proxy} """.rstrip()) codegen = codegen + '\n' @@ -431,12 +431,8 @@ def _get_generated_config(cls, autogen_comment: str, host_name: str, @classmethod @timeline.FileLockEvent(ssh_conf_lock_path) - def add_cluster( - cls, - cluster_name: str, - ips: List[str], - auth_config: Dict[str, str], - ): + def add_cluster(cls, cluster_name: str, ips: List[str], + auth_config: Dict[str, str], ports: List[int]): """Add authentication information for cluster to local SSH config file. If a host with `cluster_name` already exists and the configuration was @@ -452,6 +448,7 @@ def add_cluster( ips: List of public IP addresses in the cluster. First IP is head node. auth_config: read_yaml(handle.cluster_yaml)['auth'] + ports: List of port numbers for SSH corresponding to ips """ username = auth_config['ssh_user'] key_path = os.path.expanduser(auth_config['ssh_private_key']) @@ -492,8 +489,10 @@ def add_cluster( os.chmod(config_path, 0o644) proxy_command = auth_config.get('ssh_proxy_command', None) + head_port = ports[0] codegen = cls._get_generated_config(sky_autogen_comment, host_name, ip, - username, key_path, proxy_command) + username, key_path, proxy_command, + head_port) # Add (or overwrite) the new config. 
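    # For a Kubernetes cluster the rendered entry differs from other clouds
    # only in its Port line, e.g. (illustrative values):
    #   Host mycluster
    #     HostName 127.0.0.1
    #     User sky
    #     IdentityFile ~/.ssh/sky-key
    #     Port 30022
    # plus the usual StrictHostKeyChecking/ProxyCommand options from the
    # template above.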
if overwrite: @@ -592,9 +591,15 @@ def _add_multinode_config( f'host named {worker_names[idx]}.') host_name = external_worker_ips[idx] logger.warning(f'Using {host_name} to identify host instead.') + # TODO(romilb): Update port number when k8s supports multinode codegens[idx] = cls._get_generated_config( - sky_autogen_comment, host_name, external_worker_ips[idx], - username, key_path, proxy_command) + sky_autogen_comment, + host_name, + external_worker_ips[idx], + username, + key_path, + proxy_command, + port=22) # All workers go to SKY_USER_FILE_PATH/ssh/{cluster_name} for i, line in enumerate(extra_config): @@ -605,16 +610,26 @@ def _add_multinode_config( host_name = worker_names[idx] overwrites[idx] = True overwrite_begin_idxs[idx] = i - 1 + # TODO(romilb): Update port number when k8s supports multinode codegens[idx] = cls._get_generated_config( - sky_autogen_comment, host_name, external_worker_ips[idx], - username, key_path, proxy_command) + sky_autogen_comment, + host_name, + external_worker_ips[idx], + username, + key_path, + proxy_command, + port=22) # This checks if all codegens have been created. for idx, ip in enumerate(external_worker_ips): if not codegens[idx]: - codegens[idx] = cls._get_generated_config( - sky_autogen_comment, worker_names[idx], ip, username, - key_path, proxy_command) + codegens[idx] = cls._get_generated_config(sky_autogen_comment, + worker_names[idx], + ip, + username, + key_path, + proxy_command, + port=22) for idx in range(len(external_worker_ips)): # Add (or overwrite) the new config. @@ -1068,6 +1083,8 @@ def _add_auth_to_cluster_config(cloud: clouds.Cloud, cluster_config_file: str): config = auth.setup_azure_authentication(config) elif isinstance(cloud, clouds.Lambda): config = auth.setup_lambda_authentication(config) + elif isinstance(cloud, clouds.Kubernetes): + config = auth.setup_kubernetes_authentication(config) elif isinstance(cloud, clouds.IBM): config = auth.setup_ibm_authentication(config) elif isinstance(cloud, clouds.SCP): @@ -1592,6 +1609,28 @@ def get_head_ip( return head_ip +@timeline.event +def get_head_ssh_port( + handle: 'cloud_vm_ray_backend.CloudVmRayResourceHandle', + use_cache: bool = True, + max_attempts: int = 1, +) -> int: + """Returns the ip of the head node.""" + del max_attempts # Unused. + # Use port 22 for everything except Kubernetes + # TODO(romilb): Add a get port method to the cloud classes. + head_ssh_port = 22 + if not isinstance(handle.launched_resources.cloud, clouds.Kubernetes): + return head_ssh_port + else: + if use_cache and handle.head_ssh_port is not None: + head_ssh_port = handle.head_ssh_port + else: + svc_name = f'{handle.get_cluster_name()}-ray-head-ssh' + head_ssh_port = clouds.Kubernetes.get_port(svc_name) + return head_ssh_port + + def check_network_connection(): # Tolerate 3 retries as it is observed that connections can fail. adapter = adapters.HTTPAdapter(max_retries=retry_lib.Retry(total=3)) @@ -1883,7 +1922,8 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool: # Check if ray cluster status is healthy. 
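    # Note that the runner below now carries the head node's SSH port: 22 for
    # VM-based clouds, or the head SSH service's NodePort on Kubernetes (see
    # get_head_ssh_port above).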
ssh_credentials = ssh_credential_from_yaml(handle.cluster_yaml) runner = command_runner.SSHCommandRunner(external_ips[0], - **ssh_credentials) + **ssh_credentials, + port=handle.head_ssh_port) rc, output, _ = runner.run(RAY_STATUS_WITH_SKY_RAY_PORT_COMMAND, stream_logs=False, require_outputs=True, diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index e984d3ac2f9..b8daa6adabe 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -141,6 +141,7 @@ def _get_cluster_config_template(cloud): clouds.Local: 'local-ray.yml.j2', clouds.SCP: 'scp-ray.yml.j2', clouds.OCI: 'oci-ray.yml.j2', + clouds.Kubernetes: 'kubernetes-ray.yml.j2', } return cloud_to_template[type(cloud)] @@ -931,6 +932,34 @@ def _update_blocklist_on_lambda_error( self._blocked_resources.add( launchable_resources.copy(region=r.name, zone=None)) + def _update_blocklist_on_kubernetes_error( + self, launchable_resources: 'resources_lib.Resources', region, + zones, stdout, stderr): + del zones # Unused. + style = colorama.Style + stdout_splits = stdout.split('\n') + stderr_splits = stderr.split('\n') + errors = [ + s.strip() + for s in stdout_splits + stderr_splits + if 'KubernetesError:' in s.strip() + ] + if not errors: + logger.info('====== stdout ======') + for s in stdout_splits: + print(s) + logger.info('====== stderr ======') + for s in stderr_splits: + print(s) + with ux_utils.print_exception_no_traceback(): + raise RuntimeError('Errors occurred during provisioning; ' + 'check logs above.') + + logger.warning(f'Got error(s) in {region.name}:') + messages = '\n\t'.join(errors) + logger.warning(f'{style.DIM}\t{messages}{style.RESET_ALL}') + self._blocked_resources.add(launchable_resources.copy(zone=None)) + def _update_blocklist_on_scp_error( self, launchable_resources: 'resources_lib.Resources', region, zones, stdout, stderr): @@ -1115,6 +1144,7 @@ def _update_blocklist_on_error( clouds.IBM: self._update_blocklist_on_ibm_error, clouds.SCP: self._update_blocklist_on_scp_error, clouds.Local: self._update_blocklist_on_local_error, + clouds.Kubernetes: self._update_blocklist_on_kubernetes_error, clouds.OCI: self._update_blocklist_on_oci_error, } cloud = launchable_resources.cloud @@ -1729,6 +1759,9 @@ def ray_up(): cluster_name = logging_info['cluster_name'] logger.info(f'{style.BRIGHT}Launching on local cluster ' f'{cluster_name!r}.') + elif isinstance(to_provision_cloud, clouds.Kubernetes): + logger.info(f'{style.BRIGHT}Launching on {to_provision_cloud} ' + f'{style.RESET_ALL}') else: logger.info(f'{style.BRIGHT}Launching on {to_provision_cloud} ' f'{region_name}{style.RESET_ALL}{zone_str}') @@ -2092,7 +2125,7 @@ class CloudVmRayResourceHandle(backends.backend.ResourceHandle): - (optional) Launched resources - (optional) If TPU(s) are managed, a path to a deletion script. """ - _VERSION = 3 + _VERSION = 4 def __init__(self, *, @@ -2102,6 +2135,7 @@ def __init__(self, launched_resources: resources_lib.Resources, stable_internal_external_ips: Optional[List[Tuple[ str, str]]] = None, + stable_ssh_ports: Optional[List[int]] = None, tpu_create_script: Optional[str] = None, tpu_delete_script: Optional[str] = None) -> None: self._version = self._VERSION @@ -2111,6 +2145,7 @@ def __init__(self, # List of (internal_ip, external_ip) tuples for all the nodes # in the cluster, sorted by the external ips. 
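        # stable_ssh_ports (new in handle version 4) follows the same
        # ordering; e.g. (illustrative) [30022] for a one-node Kubernetes
        # cluster, or [22, 22] for a two-node VM cluster.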
self.stable_internal_external_ips = stable_internal_external_ips + self.stable_ssh_ports = stable_ssh_ports self.launched_nodes = launched_nodes self.launched_resources = launched_resources self.tpu_create_script = tpu_create_script @@ -2123,6 +2158,8 @@ def __repr__(self): f'\n\thead_ip={self.head_ip},' '\n\tstable_internal_external_ips=' f'{self.stable_internal_external_ips},' + '\n\tstable_ssh_ports=' + f'{self.stable_ssh_ports},' '\n\tcluster_yaml=' f'{self.cluster_yaml}, ' f'\n\tlaunched_resources={self.launched_nodes}x ' @@ -2193,6 +2230,19 @@ def _update_cluster_region(self): self.launched_resources = self.launched_resources.copy(region=region) + def _update_stable_ssh_ports(self, max_attempts: int = 1) -> None: + # TODO(romilb): Replace this with a call to the cloud class to get ports + if isinstance(self.launched_resources.cloud, clouds.Kubernetes): + head_port = backend_utils.get_head_ssh_port( + self, use_cache=False, max_attempts=max_attempts) + # TODO(romilb): Multinode doesn't work with Kubernetes yet. + worker_ports = [22] * (self.launched_nodes - 1) + ports = [head_port] + worker_ports + else: + # Use port 22 for other clouds + ports = [22] * self.launched_nodes + self.stable_ssh_ports = ports + def _update_stable_cluster_ips(self, max_attempts: int = 1) -> None: cluster_external_ips = backend_utils.get_node_ips( self.cluster_yaml, @@ -2259,6 +2309,16 @@ def external_ips(self, return [ips[1] for ips in self.stable_internal_external_ips] return None + def external_ssh_ports( + self, + max_attempts: int = _FETCH_IP_MAX_ATTEMPTS, + use_cached_ports: bool = True) -> Optional[List[int]]: + if not use_cached_ports: + self._update_stable_ssh_ports(max_attempts=max_attempts) + if self.stable_ssh_ports is not None: + return self.stable_ssh_ports + return None + def get_hourly_price(self) -> float: hourly_cost = (self.launched_resources.get_cost(3600) * self.launched_nodes) @@ -2275,6 +2335,13 @@ def head_ip(self): return external_ips[0] return None + @property + def head_ssh_port(self): + external_ssh_ports = self.external_ssh_ports() + if external_ssh_ports: + return external_ssh_ports[0] + return None + def __setstate__(self, state): self._version = self._VERSION @@ -2287,11 +2354,15 @@ def __setstate__(self, state): if version < 3: head_ip = state.pop('head_ip', None) state['stable_internal_external_ips'] = None + if version < 4: + # Version 4 adds self.stable_ssh_ports for Kubernetes support + state['stable_ssh_ports'] = None self.__dict__.update(state) - # Because the _update_stable_cluster_ips function uses the handle, - # we call it on the current instance after the state is updated + # Because the _update_stable_cluster_ips and _update_stable_ssh_ports + # functions use the handle, we call it on the current instance + # after the state is updated. if version < 3 and head_ip is not None: try: self._update_stable_cluster_ips() @@ -2299,6 +2370,8 @@ def __setstate__(self, state): # This occurs when an old cluster from was autostopped, # so the head IP in the database is not updated. 
pass + if version < 4: + self._update_stable_ssh_ports() self._update_cluster_region() @@ -2550,7 +2623,10 @@ def _provision( ip_list = handle.external_ips(max_attempts=_FETCH_IP_MAX_ATTEMPTS, use_cached_ips=False) + ssh_port_list = handle.external_ssh_ports( + max_attempts=_FETCH_IP_MAX_ATTEMPTS, use_cached_ports=False) assert ip_list is not None, handle + assert ssh_port_list is not None, handle if 'tpu_name' in config_dict: self._set_tpu_name(handle, config_dict['tpu_name']) @@ -2571,7 +2647,7 @@ def _provision( ssh_credentials = backend_utils.ssh_credential_from_yaml( handle.cluster_yaml) runners = command_runner.SSHCommandRunner.make_runner_list( - ip_list, **ssh_credentials) + ip_list, port_list=ssh_port_list, **ssh_credentials) def _get_zone(runner): retry_count = 0 @@ -2608,13 +2684,14 @@ def _get_zone(runner): # to None. self._update_after_cluster_provisioned(handle, task, prev_cluster_status, ip_list, - lock_path) + ssh_port_list, lock_path) return handle def _update_after_cluster_provisioned( self, handle: CloudVmRayResourceHandle, task: task_lib.Task, prev_cluster_status: Optional[status_lib.ClusterStatus], - ip_list: List[str], lock_path: str) -> None: + ip_list: List[str], ssh_port_list: List[int], + lock_path: str) -> None: usage_lib.messages.usage.update_cluster_resources( handle.launched_nodes, handle.launched_resources) usage_lib.messages.usage.update_final_cluster_status( @@ -2666,7 +2743,8 @@ def _update_after_cluster_provisioned( status_lib.ClusterStatus.UP) auth_config = common_utils.read_yaml(handle.cluster_yaml)['auth'] backend_utils.SSHConfigHelper.add_cluster(handle.cluster_name, - ip_list, auth_config) + ip_list, auth_config, + ssh_port_list) common_utils.remove_file_if_exists(lock_path) @@ -2678,6 +2756,7 @@ def _sync_workdir(self, handle: CloudVmRayResourceHandle, fore = colorama.Fore style = colorama.Style ip_list = handle.external_ips() + port_list = handle.external_ssh_ports() assert ip_list is not None, 'external_ips is not cached in handle' full_workdir = os.path.abspath(os.path.expanduser(workdir)) @@ -2707,7 +2786,7 @@ def _sync_workdir(self, handle: CloudVmRayResourceHandle, # TODO(zhwu): refactor this with backend_utils.parallel_cmd_with_rsync runners = command_runner.SSHCommandRunner.make_runner_list( - ip_list, **ssh_credentials) + ip_list, port_list=port_list, **ssh_credentials) def _sync_workdir_node(runner: command_runner.SSHCommandRunner) -> None: runner.rsync( @@ -2761,6 +2840,7 @@ def _setup(self, handle: CloudVmRayResourceHandle, task: task_lib.Task, setup_file = os.path.basename(setup_sh_path) # Sync the setup script up and run it. ip_list = handle.external_ips() + port_list = handle.external_ssh_ports() assert ip_list is not None, 'external_ips is not cached in handle' ssh_credentials = backend_utils.ssh_credential_from_yaml( handle.cluster_yaml) @@ -2769,7 +2849,7 @@ def _setup(self, handle: CloudVmRayResourceHandle, task: task_lib.Task, # forwarding. 
ssh_credentials.pop('ssh_control_name') runners = command_runner.SSHCommandRunner.make_runner_list( - ip_list, **ssh_credentials) + ip_list, port_list=port_list, **ssh_credentials) # Need this `-i` option to make sure `source ~/.bashrc` work setup_cmd = f'/bin/bash -i /tmp/{setup_file} 2>&1' @@ -2851,7 +2931,9 @@ def _exec_code_on_head( fore = colorama.Fore ssh_credentials = backend_utils.ssh_credential_from_yaml( handle.cluster_yaml) + head_ssh_port = backend_utils.get_head_ssh_port(handle) runner = command_runner.SSHCommandRunner(handle.head_ip, + port=head_ssh_port, **ssh_credentials) with tempfile.NamedTemporaryFile('w', prefix='sky_app_') as fp: fp.write(codegen) @@ -2893,7 +2975,6 @@ def _exec_code_on_head( mkdir_code = (f'{cd} && mkdir -p {remote_log_dir} && ' f'touch {remote_log_path}') code = job_lib.JobLibCodeGen.queue_job(job_id, job_submit_cmd) - job_submit_cmd = mkdir_code + ' && ' + code if spot_dag is not None: @@ -3253,10 +3334,13 @@ def sync_down_logs( ip_list = handle.external_ips() assert ip_list is not None, 'external_ips is not cached in handle' + ssh_port_list = handle.external_ssh_ports() + assert ssh_port_list is not None, 'external_ssh_ports is not cached ' \ + 'in handle' ssh_credentials = backend_utils.ssh_credential_from_yaml( handle.cluster_yaml) runners = command_runner.SSHCommandRunner.make_runner_list( - ip_list, **ssh_credentials) + ip_list, port_list=ssh_port_list, **ssh_credentials) def _rsync_down(args) -> None: """Rsync down logs from remote nodes. @@ -3766,9 +3850,13 @@ def run_on_head( enabled, lines are printed only when '\r' or '\n' is found. """ head_ip = backend_utils.get_head_ip(handle, _FETCH_IP_MAX_ATTEMPTS) + head_ssh_port = backend_utils.get_head_ssh_port(handle, + _FETCH_IP_MAX_ATTEMPTS) ssh_credentials = backend_utils.ssh_credential_from_yaml( handle.cluster_yaml) - runner = command_runner.SSHCommandRunner(head_ip, **ssh_credentials) + runner = command_runner.SSHCommandRunner(head_ip, + port=head_ssh_port, + **ssh_credentials) if under_remote_workdir: cmd = f'cd {SKY_REMOTE_WORKDIR} && {cmd}' @@ -3902,7 +3990,7 @@ def _set_tpu_name(self, handle: CloudVmRayResourceHandle, handle.cluster_yaml) runners = command_runner.SSHCommandRunner.make_runner_list( - ip_list, **ssh_credentials) + ip_list, port_list=None, **ssh_credentials) def _setup_tpu_name_on_node( runner: command_runner.SSHCommandRunner) -> None: @@ -3933,11 +4021,12 @@ def _execute_file_mounts(self, handle: CloudVmRayResourceHandle, logger.info(f'{fore.CYAN}Processing file mounts.{style.RESET_ALL}') start = time.time() ip_list = handle.external_ips() + port_list = handle.external_ssh_ports() assert ip_list is not None, 'external_ips is not cached in handle' ssh_credentials = backend_utils.ssh_credential_from_yaml( handle.cluster_yaml) runners = command_runner.SSHCommandRunner.make_runner_list( - ip_list, **ssh_credentials) + ip_list, port_list=port_list, **ssh_credentials) log_path = os.path.join(self.log_dir, 'file_mounts.log') # Check the files and warn @@ -4080,11 +4169,12 @@ def _execute_storage_mounts(self, handle: CloudVmRayResourceHandle, f'storage mount{plural}.{style.RESET_ALL}') start = time.time() ip_list = handle.external_ips() + port_list = handle.external_ssh_ports() assert ip_list is not None, 'external_ips is not cached in handle' ssh_credentials = backend_utils.ssh_credential_from_yaml( handle.cluster_yaml) runners = command_runner.SSHCommandRunner.make_runner_list( - ip_list, **ssh_credentials) + ip_list, port_list=port_list, **ssh_credentials) log_path = 
os.path.join(self.log_dir, 'storage_mounts.log') for dst, storage_obj in storage_mounts.items(): diff --git a/sky/backends/onprem_utils.py b/sky/backends/onprem_utils.py index 0666210e5aa..f95bae75f39 100644 --- a/sky/backends/onprem_utils.py +++ b/sky/backends/onprem_utils.py @@ -546,7 +546,7 @@ def do_filemounts_and_setup_on_local_workers( setup_script = log_lib.make_task_bash_script('\n'.join(setup_cmds)) worker_runners = command_runner.SSHCommandRunner.make_runner_list( - worker_ips, **ssh_credentials) + worker_ips, port_list=None, **ssh_credentials) # Uploads setup script to the worker node with tempfile.NamedTemporaryFile('w', prefix='sky_setup_') as f: diff --git a/sky/cli.py b/sky/cli.py index 380a5eca008..dd4e51d4098 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -62,10 +62,12 @@ from sky.data import storage_utils from sky.skylet import constants from sky.skylet import job_lib -from sky.utils import log_utils from sky.utils import common_utils -from sky.utils import dag_utils from sky.utils import command_runner +from sky.utils import dag_utils +from sky.utils import env_options +from sky.utils import kubernetes_utils +from sky.utils import log_utils from sky.utils import schemas from sky.utils import subprocess_utils from sky.utils import timeline @@ -3116,6 +3118,9 @@ def show_gpus( type is the lowest across all regions for both on-demand and spot instances. There may be multiple regions with the same lowest price. """ + # validation for the --cloud kubernetes + if cloud == 'kubernetes': + raise click.UsageError('Kubernetes does not have a service catalog.') # validation for the --region flag if region is not None and cloud is None: raise click.UsageError( @@ -4452,6 +4457,97 @@ def _delete_benchmark(benchmark: str) -> None: progress.refresh() +@cli.group(cls=_NaturalOrderGroup, hidden=True) +def local(): + """SkyPilot local tools CLI.""" + pass + + +@local.command('up', cls=_DocumentedCodeCommand) +@usage_lib.entrypoint +def local_up(): + """Creates a local cluster.""" + cluster_created = False + # Check if ~/.kube/config exists: + if os.path.exists(os.path.expanduser('~/.kube/config')): + curr_context = kubernetes_utils.get_current_kube_config_context_name() + skypilot_context = 'kind-skypilot' + if curr_context is not None and curr_context != skypilot_context: + click.echo( + f'Current context in kube config: {curr_context}' + '\nWill automatically switch to kind-skypilot after the local ' + 'cluster is created.') + with log_utils.safe_rich_status('Creating local cluster...'): + path_to_package = os.path.dirname(os.path.dirname(__file__)) + up_script_path = os.path.join(path_to_package, 'sky/utils/kubernetes', + 'create_cluster.sh') + # Get directory of script and run it from there + cwd = os.path.dirname(os.path.abspath(up_script_path)) + # Run script and don't print output + try: + subprocess_utils.run(up_script_path, cwd=cwd, capture_output=True) + cluster_created = True + except subprocess.CalledProcessError as e: + # Check if return code is 100 + if e.returncode == 100: + click.echo('\nLocal cluster already exists. ' + 'Run `sky local down` to delete it.') + else: + stderr = e.stderr.decode('utf-8') + click.echo(f'\nFailed to create local cluster. 
{stderr}') + if env_options.Options.SHOW_DEBUG_INFO.get(): + stdout = e.stdout.decode('utf-8') + click.echo(f'Logs:\n{stdout}') + sys.exit(1) + # Run sky check + with log_utils.safe_rich_status('Running sky check...'): + sky_check.check(quiet=True) + if cluster_created: + # Get number of CPUs + p = subprocess_utils.run( + 'kubectl get nodes -o jsonpath=\'{.items[0].status.capacity.cpu}\'', + capture_output=True) + num_cpus = int(p.stdout.decode('utf-8')) + if num_cpus < 2: + click.echo('Warning: Local cluster has less than 2 CPUs. ' + 'This may cause issues with running tasks.') + click.echo( + 'Local Kubernetes cluster created successfully with ' + f'{num_cpus} CPUs. `sky launch` can now run tasks locally.' + '\nHint: To change the number of CPUs, change your docker ' + 'runtime settings. See https://kind.sigs.k8s.io/docs/user/quick-start/#settings-for-docker-desktop for more info.' # pylint: disable=line-too-long + ) + + +@local.command('down', cls=_DocumentedCodeCommand) +@usage_lib.entrypoint +def local_down(): + """Deletes a local cluster.""" + cluster_removed = False + with log_utils.safe_rich_status('Removing local cluster...'): + path_to_package = os.path.dirname(os.path.dirname(__file__)) + down_script_path = os.path.join(path_to_package, 'sky/utils/kubernetes', + 'delete_cluster.sh') + try: + subprocess_utils.run(down_script_path, capture_output=True) + cluster_removed = True + except subprocess.CalledProcessError as e: + # Check if return code is 100 + if e.returncode == 100: + click.echo('\nLocal cluster does not exist.') + else: + stderr = e.stderr.decode('utf-8') + click.echo(f'\nFailed to delete local cluster. {stderr}') + if env_options.Options.SHOW_DEBUG_INFO.get(): + stdout = e.stdout.decode('utf-8') + click.echo(f'Logs:\n{stdout}') + if cluster_removed: + # Run sky check + with log_utils.safe_rich_status('Running sky check...'): + sky_check.check(quiet=True) + click.echo('Local cluster removed.') + + def main(): return cli() diff --git a/sky/clouds/__init__.py b/sky/clouds/__init__.py index c3e3fea8d93..1094a28bd1c 100644 --- a/sky/clouds/__init__.py +++ b/sky/clouds/__init__.py @@ -12,6 +12,7 @@ from sky.clouds.ibm import IBM from sky.clouds.scp import SCP from sky.clouds.oci import OCI +from sky.clouds.kubernetes import Kubernetes __all__ = [ 'IBM', @@ -23,6 +24,7 @@ 'Local', 'SCP', 'OCI', + 'Kubernetes', 'CloudImplementationFeatures', 'Region', 'Zone', diff --git a/sky/clouds/cloud.py b/sky/clouds/cloud.py index 723aa2d465d..5e4fdec042f 100644 --- a/sky/clouds/cloud.py +++ b/sky/clouds/cloud.py @@ -602,6 +602,10 @@ def query_status(cls, name: str, tag_filters: Dict[str, str], Returns: A list of ClusterStatus representing the status of all the alive nodes in the cluster. + + Raises: + exceptions.ClusterStatusFetchingError: raised if the status of the + cluster cannot be fetched. """ raise NotImplementedError diff --git a/sky/clouds/kubernetes.py b/sky/clouds/kubernetes.py new file mode 100644 index 00000000000..a2c339f9791 --- /dev/null +++ b/sky/clouds/kubernetes.py @@ -0,0 +1,408 @@ +"""Kubernetes.""" +import json +import os +import re +import typing +from typing import Dict, Iterator, List, Optional, Tuple + +from sky import clouds +from sky import exceptions +from sky import status_lib +from sky.adaptors import kubernetes +from sky.utils import common_utils +from sky.utils import ux_utils +from sky.skylet.providers.kubernetes import utils as kubernetes_utils + +if typing.TYPE_CHECKING: + # Renaming to avoid shadowing variables. 
+ from sky import resources as resources_lib + +_CREDENTIAL_PATH = '~/.kube/config' + + +class KubernetesInstanceType: + """ + Class to represent the "Instance Type" in a Kubernetes. + + Since Kubernetes does not have a notion of instances, we generate + virtual instance types that represent the resources requested by a + pod ("node"). + + This name captures the following resource requests: + - CPU + - Memory + - Accelerators + + The name format is "{n}CPU--{k}GB" where n is the number of vCPUs and + k is the amount of memory in GB. Accelerators can be specified by + appending "--{a}{type}" where a is the number of accelerators and + type is the accelerator type. + + Examples: + - 4CPU--16GB + - 0.5CPU--1.5GB + - 4CPU--16GB--1V100 + """ + + def __init__(self, + cpus: Optional[float] = None, + memory: Optional[float] = None, + accelerator_count: Optional[float] = None, + accelerator_type: Optional[str] = None): + self.cpus = cpus + self.memory = memory + self.accelerator_count = accelerator_count + self.accelerator_type = accelerator_type + + @property + def name(self) -> str: + """Returns the name of the instance.""" + name = f'{self.cpus}CPU--{self.memory}GB' + if self.accelerator_count: + name += f'--{self.accelerator_count}{self.accelerator_type}' + return name + + @staticmethod + def is_valid_instance_type(name: str) -> bool: + """Returns whether the given name is a valid instance type.""" + pattern = re.compile(r'^(\d+(\.\d+)?CPU--\d+(\.\d+)?GB)(--\d+\S+)?$') + return bool(pattern.match(name)) + + @classmethod + def _parse_instance_type( + cls, + name: str) -> Tuple[float, float, Optional[float], Optional[str]]: + """Returns the cpus, memory, accelerator_count, and accelerator_type + from the given name.""" + pattern = re.compile( + r'^(?P\d+(\.\d+)?)CPU--(?P\d+(\.\d+)?)GB(?:--(?P\d+)(?P\S+))?$' # pylint: disable=line-too-long + ) + match = pattern.match(name) + if match: + cpus = float(match.group('cpus')) + memory = float(match.group('memory')) + accelerator_count = match.group('accelerator_count') + accelerator_type = match.group('accelerator_type') + if accelerator_count: + accelerator_count = float(accelerator_count) + accelerator_type = str(accelerator_type) + else: + accelerator_count = None + accelerator_type = None + return cpus, memory, accelerator_count, accelerator_type + else: + raise ValueError(f'Invalid instance name: {name}') + + @classmethod + def from_instance_type(cls, name: str) -> 'KubernetesInstanceType': + """Returns an instance name object from the given name.""" + if not cls.is_valid_instance_type(name): + raise ValueError(f'Invalid instance name: {name}') + cpus, memory, accelerator_count, accelerator_type = \ + cls._parse_instance_type(name) + return cls(cpus=cpus, + memory=memory, + accelerator_count=accelerator_count, + accelerator_type=accelerator_type) + + @classmethod + def from_resources(cls, + cpus: float, + memory: float, + accelerator_count: float = 0, + accelerator_type: str = '') -> 'KubernetesInstanceType': + """Returns an instance name object from the given resources.""" + name = f'{cpus}CPU--{memory}GB' + if accelerator_count > 0: + name += f'--{accelerator_count}{accelerator_type}' + return cls(cpus=cpus, + memory=memory, + accelerator_count=accelerator_count, + accelerator_type=accelerator_type) + + def __str__(self): + return self.name + + +@clouds.CLOUD_REGISTRY.register +class Kubernetes(clouds.Cloud): + """Kubernetes.""" + + SKY_SSH_KEY_SECRET_NAME = f'sky-ssh-{common_utils.get_user_hash()}' + + # Timeout for resource provisioning. 
This timeout determines how long to + # wait for pod to be in pending status before giving up. + # Larger timeout may be required for autoscaling clusters, since autoscaler + # may take some time to provision new nodes. + # Note that this timeout includes time taken by the Kubernetes scheduler + # itself, which can be upto 2-3 seconds. + # For non-autoscaling clusters, we conservatively set this to 10s. + # TODO(romilb): Make the timeout configurable. + TIMEOUT = 10 + + _DEFAULT_NUM_VCPUS = 2 + _DEFAULT_MEMORY_CPU_RATIO = 1 + _REPR = 'Kubernetes' + _regions: List[clouds.Region] = [clouds.Region('kubernetes')] + _CLOUD_UNSUPPORTED_FEATURES = { + # TODO(romilb): Stopping might be possible to implement with + # container checkpointing introduced in Kubernetes v1.25. See: + # https://kubernetes.io/blog/2022/12/05/forensic-container-checkpointing-alpha/ # pylint: disable=line-too-long + clouds.CloudImplementationFeatures.STOP: 'Kubernetes does not ' + 'support stopping VMs.', + clouds.CloudImplementationFeatures.AUTOSTOP: 'Kubernetes does not ' + 'support stopping VMs.', + clouds.CloudImplementationFeatures.MULTI_NODE: 'Multi-node is not ' + 'supported by the ' + 'Kubernetes ' + 'implementation yet.', + clouds.CloudImplementationFeatures.SPOT_INSTANCE: 'Spot instances are ' + 'not supported in ' + 'Kubernetes.', + clouds.CloudImplementationFeatures.CUSTOM_DISK_TIER: 'Custom disk ' + 'tiers are not ' + 'supported in ' + 'Kubernetes.', + } + + IMAGE = 'us-central1-docker.pkg.dev/' \ + 'skypilot-375900/skypilotk8s/skypilot:latest' + + @classmethod + def _cloud_unsupported_features( + cls) -> Dict[clouds.CloudImplementationFeatures, str]: + return cls._CLOUD_UNSUPPORTED_FEATURES + + @classmethod + def regions(cls) -> List[clouds.Region]: + return cls._regions + + @classmethod + def regions_with_offering(cls, instance_type: Optional[str], + accelerators: Optional[Dict[str, int]], + use_spot: bool, region: Optional[str], + zone: Optional[str]) -> List[clouds.Region]: + # No notion of regions in Kubernetes - return a single region. + return cls.regions() + + def instance_type_to_hourly_cost(self, + instance_type: str, + use_spot: bool, + region: Optional[str] = None, + zone: Optional[str] = None) -> float: + # TODO(romilb): Investigate how users can provide their own cost catalog + # for Kubernetes clusters. + # For now, assume zero cost for Kubernetes clusters + return 0.0 + + def accelerators_to_hourly_cost(self, + accelerators: Dict[str, int], + use_spot: bool, + region: Optional[str] = None, + zone: Optional[str] = None) -> float: + del accelerators, use_spot, region, zone # unused + return 0.0 + + def get_egress_cost(self, num_gigabytes: float) -> float: + return 0.0 + + def __repr__(self): + return self._REPR + + def is_same_cloud(self, other: clouds.Cloud) -> bool: + return isinstance(other, Kubernetes) + + @classmethod + def get_port(cls, svc_name) -> int: + ns = kubernetes_utils.get_current_kube_config_context_namespace() + return kubernetes_utils.get_port(svc_name, ns) + + @classmethod + def get_default_instance_type( + cls, + cpus: Optional[str] = None, + memory: Optional[str] = None, + disk_tier: Optional[str] = None) -> Optional[str]: + del disk_tier # Unused. + # TODO(romilb): Allow fractional CPUs and memory + # TODO(romilb): We should check the maximum number of CPUs and memory + # that can be requested, and return None if the requested resources + # exceed the maximum. This may require thought about how to handle + # autoscaling clusters. 
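        # For example (illustrative): cpus='4+' with memory unset resolves to
        # '4CPU--4GB' under the default 1:1 memory:vCPU ratio, and
        # KubernetesInstanceType.from_instance_type('4CPU--16GB') parses back
        # to cpus=4.0, memory=16.0.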
+ # We strip '+' from resource requests since Kubernetes can provision + # exactly the requested resources. + instance_cpus = int( + cpus.strip('+')) if cpus is not None else cls._DEFAULT_NUM_VCPUS + instance_mem = int( + memory.strip('+') + ) if memory is not None else \ + instance_cpus * cls._DEFAULT_MEMORY_CPU_RATIO + virtual_instance_type = KubernetesInstanceType(instance_cpus, + instance_mem).name + return virtual_instance_type + + @classmethod + def get_accelerators_from_instance_type( + cls, + instance_type: str, + ) -> Optional[Dict[str, int]]: + # TODO(romilb): Add GPU support. + return None + + @classmethod + def get_vcpus_mem_from_instance_type( + cls, instance_type: str) -> Tuple[Optional[float], Optional[float]]: + """Returns the #vCPUs and memory that the instance type offers.""" + k = KubernetesInstanceType.from_instance_type(instance_type) + return k.cpus, k.memory + + @classmethod + def zones_provision_loop( + cls, + *, + region: str, + num_nodes: int, + instance_type: str, + accelerators: Optional[Dict[str, int]] = None, + use_spot: bool = False, + ) -> Iterator[Optional[List[clouds.Zone]]]: + del num_nodes, region, instance_type, accelerators, use_spot # Unused. + for r in cls.regions(): + yield r.zones + + @classmethod + def get_zone_shell_cmd(cls) -> Optional[str]: + return None + + def make_deploy_resources_variables( + self, resources: 'resources_lib.Resources', + region: Optional['clouds.Region'], + zones: Optional[List['clouds.Zone']]) -> Dict[str, Optional[str]]: + del zones + if region is None: + region = self._regions[0] + + r = resources + acc_dict = self.get_accelerators_from_instance_type(r.instance_type) + if acc_dict is not None: + custom_resources = json.dumps(acc_dict, separators=(',', ':')) + else: + custom_resources = None + + # resources.memory and cpus are None if they are not explicitly set. + # We fetch the default values for the instance type in that case. + cpus, mem = self.get_vcpus_mem_from_instance_type( + resources.instance_type) + return { + 'instance_type': resources.instance_type, + 'custom_resources': custom_resources, + 'region': region.name, + 'cpus': str(cpus), + 'memory': str(mem), + 'timeout': str(self.TIMEOUT), + 'k8s_ssh_key_secret_name': self.SKY_SSH_KEY_SECRET_NAME, + # TODO(romilb): Allow user to specify custom images + 'image_id': self.IMAGE, + } + + def _get_feasible_launchable_resources( + self, resources: 'resources_lib.Resources'): + fuzzy_candidate_list: List[str] = [] + if resources.instance_type is not None: + assert resources.is_launchable(), resources + resources = resources.copy(accelerators=None) + return ([resources], fuzzy_candidate_list) + + def _make(instance_list): + resource_list = [] + for instance_type in instance_list: + r = resources.copy( + cloud=Kubernetes(), + instance_type=instance_type, + accelerators=None, + ) + resource_list.append(r) + return resource_list + + # Currently, handle a filter on accelerators only. + accelerators = resources.accelerators + if accelerators is None: + # Return a default instance type with the given number of vCPUs. + default_instance_type = Kubernetes.get_default_instance_type( + cpus=resources.cpus, + memory=resources.memory, + disk_tier=resources.disk_tier) + if default_instance_type is None: + return ([], []) + else: + return (_make([default_instance_type]), []) + + assert len(accelerators) == 1, resources + # If GPUs are requested, return an empty list. + # TODO(romilb): Add GPU support. 
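        # Until GPU support lands, any accelerator request is treated as
        # infeasible on Kubernetes, hence the empty candidate lists returned
        # below.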
+ return ([], []) + + @classmethod + def check_credentials(cls) -> Tuple[bool, Optional[str]]: + if os.path.exists(os.path.expanduser(_CREDENTIAL_PATH)): + # Test using python API + return kubernetes_utils.check_credentials() + else: + return False, 'Credentials not found - ' \ + f'check if {_CREDENTIAL_PATH} exists.' + + def get_credential_file_mounts(self) -> Dict[str, str]: + return {_CREDENTIAL_PATH: _CREDENTIAL_PATH} + + def instance_type_exists(self, instance_type: str) -> bool: + return KubernetesInstanceType.is_valid_instance_type(instance_type) + + def validate_region_zone(self, region: Optional[str], zone: Optional[str]): + # Kubernetes doesn't have regions or zones, so we don't need to validate + return region, zone + + def accelerator_in_region_or_zone(self, + accelerator: str, + acc_count: int, + region: Optional[str] = None, + zone: Optional[str] = None) -> bool: + # TODO(romilb): All accelerators are marked as not available for now. + # In the future, we should return false for accelerators that we know + # are not supported by the cluster. + return False + + @classmethod + def query_status(cls, name: str, tag_filters: Dict[str, str], + region: Optional[str], zone: Optional[str], + **kwargs) -> List['status_lib.ClusterStatus']: + del tag_filters, region, zone, kwargs # Unused. + namespace = kubernetes_utils.get_current_kube_config_context_namespace() + + # Get all the pods with the label skypilot-cluster: + try: + pods = kubernetes.core_api().list_namespaced_pod( + namespace, + label_selector=f'skypilot-cluster={name}', + _request_timeout=kubernetes.API_TIMEOUT).items + except kubernetes.max_retry_error(): + with ux_utils.print_exception_no_traceback(): + ctx = kubernetes_utils.get_current_kube_config_context_name() + raise exceptions.ClusterStatusFetchingError( + f'Failed to query cluster {name!r} status. 
' + 'Network error - check if the Kubernetes cluster in ' + f'context {ctx} is up and accessible.') from None + except Exception as e: # pylint: disable=broad-except + with ux_utils.print_exception_no_traceback(): + raise exceptions.ClusterStatusFetchingError( + f'Failed to query Kubernetes cluster {name!r} status: ' + f'{common_utils.format_exception(e)}') + + # Check if the pods are running or pending + cluster_status = [] + for pod in pods: + if pod.status.phase == 'Running': + cluster_status.append(status_lib.ClusterStatus.UP) + elif pod.status.phase == 'Pending': + cluster_status.append(status_lib.ClusterStatus.INIT) + # If pods are not found, we don't add them to the return list + return cluster_status diff --git a/sky/setup_files/MANIFEST.in b/sky/setup_files/MANIFEST.in index 0e1a993904b..a23e53ee203 100644 --- a/sky/setup_files/MANIFEST.in +++ b/sky/setup_files/MANIFEST.in @@ -7,12 +7,14 @@ include sky/skylet/providers/aws/* include sky/skylet/providers/aws/cloudwatch/* include sky/skylet/providers/azure/* include sky/skylet/providers/gcp/* -include sky/skylet/providers/lambda_cloud/* include sky/skylet/providers/ibm/* -include sky/skylet/providers/scp/* +include sky/skylet/providers/kubernetes/* +include sky/skylet/providers/lambda_cloud/* include sky/skylet/providers/oci/* +include sky/skylet/providers/scp/* include sky/skylet/ray_patches/*.patch include sky/spot/dashboard/* include sky/spot/dashboard/templates/* include sky/spot/dashboard/static/* include sky/templates/* +include sky/utils/kubernetes/* diff --git a/sky/setup_files/setup.py b/sky/setup_files/setup.py index c9ed89f0b4f..6447a9af629 100644 --- a/sky/setup_files/setup.py +++ b/sky/setup_files/setup.py @@ -157,6 +157,7 @@ def parse_readme(readme: str) -> str: 'cloudflare': aws_dependencies, 'scp': [], 'oci': ['oci'], + 'kubernetes': ['kubernetes'], } extras_require['all'] = sum(extras_require.values(), []) diff --git a/sky/skylet/providers/kubernetes/__init__.py b/sky/skylet/providers/kubernetes/__init__.py new file mode 100644 index 00000000000..b09a3fe4183 --- /dev/null +++ b/sky/skylet/providers/kubernetes/__init__.py @@ -0,0 +1,2 @@ +from sky.skylet.providers.kubernetes.utils import get_head_ssh_port, get_port +from sky.skylet.providers.kubernetes.node_provider import KubernetesNodeProvider diff --git a/sky/skylet/providers/kubernetes/config.py b/sky/skylet/providers/kubernetes/config.py new file mode 100644 index 00000000000..d684c4b4617 --- /dev/null +++ b/sky/skylet/providers/kubernetes/config.py @@ -0,0 +1,308 @@ +import copy +import logging +import math +import re + +from sky.adaptors import kubernetes +from sky.skylet.providers.kubernetes import utils + +logger = logging.getLogger(__name__) + +MEMORY_SIZE_UNITS = { + "K": 2**10, + "M": 2**20, + "G": 2**30, + "T": 2**40, + 'P': 2**50, +} + +log_prefix = 'KubernetesNodeProvider: ' + +# Timeout for deleting a Kubernetes resource (in seconds). +DELETION_TIMEOUT = 90 + + +class InvalidNamespaceError(ValueError): + + def __init__(self, field_name, namespace): + self.message = ( + f'Namespace of {field_name} config does not match provided ' + f'namespace "{namespace}". 
Either set it to {namespace} or remove the ' + 'field') + + def __str__(self): + return self.message + + +def using_existing_msg(resource_type, name): + return f'using existing {resource_type} "{name}"' + + +def updating_existing_msg(resource_type, name): + return f'updating existing {resource_type} "{name}"' + + +def not_found_msg(resource_type, name): + return f'{resource_type} "{name}" not found, attempting to create it' + + +def not_checking_msg(resource_type, name): + return f'not checking if {resource_type} "{name}" exists' + + +def created_msg(resource_type, name): + return f'successfully created {resource_type} "{name}"' + + +def not_provided_msg(resource_type): + return f'no {resource_type} config provided, must already exist' + + +def bootstrap_kubernetes(config): + namespace = utils.get_current_kube_config_context_namespace() + + _configure_services(namespace, config['provider']) + + if not config['provider'].get('_operator'): + # These steps are unecessary when using the Operator. + _configure_autoscaler_service_account(namespace, config['provider']) + _configure_autoscaler_role(namespace, config['provider']) + _configure_autoscaler_role_binding(namespace, config['provider']) + + return config + + +def fillout_resources_kubernetes(config): + """Fills CPU and GPU resources in the ray cluster config. + + For each node type and each of CPU/GPU, looks at container's resources + and limits, takes min of the two. + """ + if 'available_node_types' not in config: + return config + node_types = copy.deepcopy(config['available_node_types']) + head_node_type = config['head_node_type'] + for node_type in node_types: + + node_config = node_types[node_type]['node_config'] + # The next line is for compatibility with configs which define pod specs + # cf. KubernetesNodeProvider.create_node(). + pod = node_config.get('pod', node_config) + container_data = pod['spec']['containers'][0] + + autodetected_resources = get_autodetected_resources(container_data) + if node_types == head_node_type: + # we only autodetect worker type node memory resource + autodetected_resources.pop('memory') + if 'resources' not in config['available_node_types'][node_type]: + config['available_node_types'][node_type]['resources'] = {} + autodetected_resources.update( + config['available_node_types'][node_type]['resources']) + config['available_node_types'][node_type][ + 'resources'] = autodetected_resources + logger.debug(f'Updating the resources of node type {node_type} ' + f'to include {autodetected_resources}.') + return config + + +def get_autodetected_resources(container_data): + container_resources = container_data.get('resources', None) + if container_resources is None: + return {'CPU': 0, 'GPU': 0} + + node_type_resources = { + resource_name.upper(): get_resource(container_resources, resource_name) + for resource_name in ['cpu', 'gpu'] + } + + # TODO(romilb): Update this to allow fractional resources. + memory_limits = get_resource(container_resources, 'memory') + node_type_resources['memory'] = int(memory_limits) + + return node_type_resources + + +def get_resource(container_resources, resource_name): + limit = _get_resource(container_resources, + resource_name, + field_name='limits') + # float('inf') means there's no limit set + return 0 if limit == float('inf') else int(limit) + + +def _get_resource(container_resources, resource_name, field_name): + """Returns the resource quantity. + + The amount of resource is rounded up to nearest integer. + Returns float("inf") if the resource is not present. 
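    For example (illustrative):
        _get_resource({'limits': {'cpu': '500m'}}, 'cpu', 'limits') -> 1
        _get_resource({'limits': {'memory': '2G'}}, 'memory', 'limits') -> 2147483648.0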
+ + Args: + container_resources: Container's resource field. + resource_name: One of 'cpu', 'gpu' or memory. + field_name: One of 'requests' or 'limits'. + + Returns: + Union[int, float]: Detected resource quantity. + """ + if field_name not in container_resources: + # No limit/resource field. + return float('inf') + resources = container_resources[field_name] + # Look for keys containing the resource_name. For example, + # the key 'nvidia.com/gpu' contains the key 'gpu'. + matching_keys = [key for key in resources if resource_name in key.lower()] + if len(matching_keys) == 0: + return float('inf') + if len(matching_keys) > 1: + # Should have only one match -- mostly relevant for gpu. + raise ValueError(f'Multiple {resource_name} types not supported.') + # E.g. 'nvidia.com/gpu' or 'cpu'. + resource_key = matching_keys.pop() + resource_quantity = resources[resource_key] + if resource_name == 'memory': + return _parse_memory_resource(resource_quantity) + else: + return _parse_cpu_or_gpu_resource(resource_quantity) + + +def _parse_cpu_or_gpu_resource(resource): + resource_str = str(resource) + if resource_str[-1] == 'm': + # For example, '500m' rounds up to 1. + return math.ceil(int(resource_str[:-1]) / 1000) + else: + return float(resource_str) + + +def _parse_memory_resource(resource): + resource_str = str(resource) + try: + return int(resource_str) + except ValueError: + pass + memory_size = re.sub(r'([KMGTP]+)', r' \1', resource_str) + number, unit_index = [item.strip() for item in memory_size.split()] + unit_index = unit_index[0] + return float(number) * MEMORY_SIZE_UNITS[unit_index] + + +def _configure_autoscaler_service_account(namespace, provider_config): + account_field = 'autoscaler_service_account' + if account_field not in provider_config: + logger.info(log_prefix + not_provided_msg(account_field)) + return + + account = provider_config[account_field] + if 'namespace' not in account['metadata']: + account['metadata']['namespace'] = namespace + elif account['metadata']['namespace'] != namespace: + raise InvalidNamespaceError(account_field, namespace) + + name = account['metadata']['name'] + field_selector = f'metadata.name={name}' + accounts = (kubernetes.core_api().list_namespaced_service_account( + namespace, field_selector=field_selector).items) + if len(accounts) > 0: + assert len(accounts) == 1 + logger.info(log_prefix + using_existing_msg(account_field, name)) + return + + logger.info(log_prefix + not_found_msg(account_field, name)) + kubernetes.core_api().create_namespaced_service_account(namespace, account) + logger.info(log_prefix + created_msg(account_field, name)) + + +def _configure_autoscaler_role(namespace, provider_config): + role_field = 'autoscaler_role' + if role_field not in provider_config: + logger.info(log_prefix + not_provided_msg(role_field)) + return + + role = provider_config[role_field] + if 'namespace' not in role['metadata']: + role['metadata']['namespace'] = namespace + elif role['metadata']['namespace'] != namespace: + raise InvalidNamespaceError(role_field, namespace) + + name = role['metadata']['name'] + field_selector = f'metadata.name={name}' + accounts = (kubernetes.auth_api().list_namespaced_role( + namespace, field_selector=field_selector).items) + if len(accounts) > 0: + assert len(accounts) == 1 + logger.info(log_prefix + using_existing_msg(role_field, name)) + return + + logger.info(log_prefix + not_found_msg(role_field, name)) + kubernetes.auth_api().create_namespaced_role(namespace, role) + logger.info(log_prefix + created_msg(role_field, 
name)) + + +def _configure_autoscaler_role_binding(namespace, provider_config): + binding_field = 'autoscaler_role_binding' + if binding_field not in provider_config: + logger.info(log_prefix + not_provided_msg(binding_field)) + return + + binding = provider_config[binding_field] + if 'namespace' not in binding['metadata']: + binding['metadata']['namespace'] = namespace + elif binding['metadata']['namespace'] != namespace: + raise InvalidNamespaceError(binding_field, namespace) + for subject in binding['subjects']: + if 'namespace' not in subject: + subject['namespace'] = namespace + elif subject['namespace'] != namespace: + subject_name = subject['name'] + raise InvalidNamespaceError( + binding_field + f' subject {subject_name}', namespace) + + name = binding['metadata']['name'] + field_selector = f'metadata.name={name}' + accounts = (kubernetes.auth_api().list_namespaced_role_binding( + namespace, field_selector=field_selector).items) + if len(accounts) > 0: + assert len(accounts) == 1 + logger.info(log_prefix + using_existing_msg(binding_field, name)) + return + + logger.info(log_prefix + not_found_msg(binding_field, name)) + kubernetes.auth_api().create_namespaced_role_binding(namespace, binding) + logger.info(log_prefix + created_msg(binding_field, name)) + + +def _configure_services(namespace, provider_config): + service_field = 'services' + if service_field not in provider_config: + logger.info(log_prefix + not_provided_msg(service_field)) + return + + services = provider_config[service_field] + for service in services: + if 'namespace' not in service['metadata']: + service['metadata']['namespace'] = namespace + elif service['metadata']['namespace'] != namespace: + raise InvalidNamespaceError(service_field, namespace) + + name = service['metadata']['name'] + field_selector = f'metadata.name={name}' + services = (kubernetes.core_api().list_namespaced_service( + namespace, field_selector=field_selector).items) + if len(services) > 0: + assert len(services) == 1 + existing_service = services[0] + if service == existing_service: + logger.info(log_prefix + using_existing_msg('service', name)) + return + else: + logger.info(log_prefix + updating_existing_msg('service', name)) + kubernetes.core_api().patch_namespaced_service( + name, namespace, service) + else: + logger.info(log_prefix + not_found_msg('service', name)) + kubernetes.core_api().create_namespaced_service(namespace, service) + logger.info(log_prefix + created_msg('service', name)) + + +class KubernetesError(Exception): + pass diff --git a/sky/skylet/providers/kubernetes/node_provider.py b/sky/skylet/providers/kubernetes/node_provider.py new file mode 100644 index 00000000000..3ab8414b2d2 --- /dev/null +++ b/sky/skylet/providers/kubernetes/node_provider.py @@ -0,0 +1,333 @@ +import copy +import logging +import time +from typing import Dict +from urllib.parse import urlparse +from uuid import uuid4 + +from sky.adaptors import kubernetes +from sky.skylet.providers.kubernetes import config +from sky.skylet.providers.kubernetes import get_head_ssh_port +from sky.skylet.providers.kubernetes import utils +from ray.autoscaler._private.command_runner import SSHCommandRunner +from ray.autoscaler.node_provider import NodeProvider +from ray.autoscaler.tags import NODE_KIND_HEAD, TAG_RAY_CLUSTER_NAME, TAG_RAY_NODE_KIND + +logger = logging.getLogger(__name__) + +MAX_TAG_RETRIES = 3 +DELAY_BEFORE_TAG_RETRY = 0.5 + +RAY_COMPONENT_LABEL = 'cluster.ray.io/component' + + +# Monkey patch SSHCommandRunner to allow specifying SSH port +def 
set_port(self, port): + self.ssh_options.arg_dict['Port'] = port + + +SSHCommandRunner.set_port = set_port + + +def head_service_selector(cluster_name: str) -> Dict[str, str]: + """Selector for Operator-configured head service.""" + return {RAY_COMPONENT_LABEL: f'{cluster_name}-ray-head'} + + +def to_label_selector(tags): + label_selector = '' + for k, v in tags.items(): + if label_selector != '': + label_selector += ',' + label_selector += '{}={}'.format(k, v) + return label_selector + + +class KubernetesNodeProvider(NodeProvider): + + def __init__(self, provider_config, cluster_name): + NodeProvider.__init__(self, provider_config, cluster_name) + self.cluster_name = cluster_name + + # Kubernetes namespace to user + self.namespace = utils.get_current_kube_config_context_namespace() + + # Timeout for resource provisioning. If it takes longer than this + # timeout, the resource provisioning will be considered failed. + # This is useful for failover. May need to be adjusted for different + # kubernetes setups. + self.timeout = provider_config['timeout'] + + def non_terminated_nodes(self, tag_filters): + # Match pods that are in the 'Pending' or 'Running' phase. + # Unfortunately there is no OR operator in field selectors, so we + # have to match on NOT any of the other phases. + field_selector = ','.join([ + 'status.phase!=Failed', + 'status.phase!=Unknown', + 'status.phase!=Succeeded', + 'status.phase!=Terminating', + ]) + + tag_filters[TAG_RAY_CLUSTER_NAME] = self.cluster_name + label_selector = to_label_selector(tag_filters) + pod_list = kubernetes.core_api().list_namespaced_pod( + self.namespace, + field_selector=field_selector, + label_selector=label_selector) + + # Don't return pods marked for deletion, + # i.e. pods with non-null metadata.DeletionTimestamp. + return [ + pod.metadata.name + for pod in pod_list.items + if pod.metadata.deletion_timestamp is None + ] + + def is_running(self, node_id): + pod = kubernetes.core_api().read_namespaced_pod(node_id, self.namespace) + return pod.status.phase == 'Running' + + def is_terminated(self, node_id): + pod = kubernetes.core_api().read_namespaced_pod(node_id, self.namespace) + return pod.status.phase not in ['Running', 'Pending'] + + def node_tags(self, node_id): + pod = kubernetes.core_api().read_namespaced_pod(node_id, self.namespace) + return pod.metadata.labels + + def external_ip(self, node_id): + # Return the IP address of the first node with an external IP + nodes = kubernetes.core_api().list_node().items + for node in nodes: + if node.status.addresses: + for address in node.status.addresses: + if address.type == 'ExternalIP': + return address.address + # If no external IP is found, use the API server IP + api_host = kubernetes.core_api().api_client.configuration.host + parsed_url = urlparse(api_host) + return parsed_url.hostname + + def external_port(self, node_id): + # Extract the NodePort of the head node's SSH service + # Node id is str e.g., example-cluster-ray-head-v89lb + + # TODO(romilb): Implement caching here for performance. + # TODO(romilb): Multi-node would need more handling here. 
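+ # Head pod names are of the form '{cluster_name}-ray-head-...', so splitting on '-ray-head' recovers the cluster name, which is then used to look up the '{cluster_name}-ray-head-ssh' NodePort service (see get_head_ssh_port in utils.py).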
+ cluster_name = node_id.split('-ray-head')[0] + return get_head_ssh_port(cluster_name, self.namespace) + + def internal_ip(self, node_id): + pod = kubernetes.core_api().read_namespaced_pod(node_id, self.namespace) + return pod.status.pod_ip + + def get_node_id(self, ip_address, use_internal_ip=True) -> str: + + def find_node_id(): + if use_internal_ip: + return self._internal_ip_cache.get(ip_address) + else: + return self._external_ip_cache.get(ip_address) + + if not find_node_id(): + all_nodes = self.non_terminated_nodes({}) + ip_func = self.internal_ip if use_internal_ip else self.external_ip + ip_cache = (self._internal_ip_cache + if use_internal_ip else self._external_ip_cache) + for node_id in all_nodes: + ip_cache[ip_func(node_id)] = node_id + + if not find_node_id(): + if use_internal_ip: + known_msg = f'Worker internal IPs: {list(self._internal_ip_cache)}' + else: + known_msg = f'Worker external IP: {list(self._external_ip_cache)}' + raise ValueError(f'ip {ip_address} not found. ' + known_msg) + + return find_node_id() + + def set_node_tags(self, node_ids, tags): + for _ in range(MAX_TAG_RETRIES - 1): + try: + self._set_node_tags(node_ids, tags) + return + except kubernetes.api_exception() as e: + if e.status == 409: + logger.info(config.log_prefix + + 'Caught a 409 error while setting' + ' node tags. Retrying...') + time.sleep(DELAY_BEFORE_TAG_RETRY) + continue + else: + raise + # One more try + self._set_node_tags(node_ids, tags) + + def _set_node_tags(self, node_id, tags): + pod = kubernetes.core_api().read_namespaced_pod(node_id, self.namespace) + pod.metadata.labels.update(tags) + kubernetes.core_api().patch_namespaced_pod(node_id, self.namespace, pod) + + def create_node(self, node_config, tags, count): + conf = copy.deepcopy(node_config) + pod_spec = conf.get('pod', conf) + service_spec = conf.get('service') + node_uuid = str(uuid4()) + tags[TAG_RAY_CLUSTER_NAME] = self.cluster_name + tags['ray-node-uuid'] = node_uuid + pod_spec['metadata']['namespace'] = self.namespace + if 'labels' in pod_spec['metadata']: + pod_spec['metadata']['labels'].update(tags) + else: + pod_spec['metadata']['labels'] = tags + + # Allow Operator-configured service to access the head node. + if tags[TAG_RAY_NODE_KIND] == NODE_KIND_HEAD: + head_selector = head_service_selector(self.cluster_name) + pod_spec['metadata']['labels'].update(head_selector) + + logger.info(config.log_prefix + + 'calling create_namespaced_pod (count={}).'.format(count)) + new_nodes = [] + for _ in range(count): + pod = kubernetes.core_api().create_namespaced_pod( + self.namespace, pod_spec) + new_nodes.append(pod) + + new_svcs = [] + if service_spec is not None: + logger.info(config.log_prefix + 'calling create_namespaced_service ' + '(count={}).'.format(count)) + + for new_node in new_nodes: + + metadata = service_spec.get('metadata', {}) + metadata['name'] = new_node.metadata.name + service_spec['metadata'] = metadata + service_spec['spec']['selector'] = {'ray-node-uuid': node_uuid} + svc = kubernetes.core_api().create_namespaced_service( + self.namespace, service_spec) + new_svcs.append(svc) + + # Wait for all pods to be ready, and if it exceeds the timeout, raise an + # exception. If pod's container is ContainerCreating, then we can assume + # that resources have been allocated and we can exit. + + start = time.time() + while True: + if time.time() - start > self.timeout: + raise config.KubernetesError( + 'Timed out while waiting for nodes to start. 
' + 'Cluster may be out of resources or ' + 'may be too slow to autoscale.') + all_ready = True + + for node in new_nodes: + pod = kubernetes.core_api().read_namespaced_pod( + node.metadata.name, self.namespace) + if pod.status.phase == 'Pending': + # Iterate over each pod to check their status + if pod.status.container_statuses is not None: + for container_status in pod.status.container_statuses: + # Continue if container status is ContainerCreating + # This indicates this pod has been scheduled. + if container_status.state.waiting is not None and container_status.state.waiting.reason == 'ContainerCreating': + continue + else: + # If the container wasn't in creating state, + # then we know pod wasn't scheduled or had some + # other error, such as image pull error. + # See list of possible reasons for waiting here: + # https://stackoverflow.com/a/57886025 + all_ready = False + else: + # If container_statuses is None, then the pod hasn't + # been scheduled yet. + all_ready = False + if all_ready: + break + time.sleep(1) + + def terminate_node(self, node_id): + logger.info(config.log_prefix + 'calling delete_namespaced_pod') + try: + kubernetes.core_api().delete_namespaced_pod( + node_id, + self.namespace, + _request_timeout=config.DELETION_TIMEOUT) + except kubernetes.api_exception() as e: + if e.status == 404: + logger.warning(config.log_prefix + + f'Tried to delete pod {node_id},' + ' but the pod was not found (404).') + else: + raise + try: + kubernetes.core_api().delete_namespaced_service( + node_id, + self.namespace, + _request_timeout=config.DELETION_TIMEOUT) + kubernetes.core_api().delete_namespaced_service( + f'{node_id}-ssh', + self.namespace, + _request_timeout=config.DELETION_TIMEOUT) + except kubernetes.api_exception(): + pass + + def terminate_nodes(self, node_ids): + # TODO(romilb): terminate_nodes should be include optimizations for + # deletion of multiple nodes. Currently, it deletes one node at a time. + # We should look in to using deletecollection here for batch deletion. + for node_id in node_ids: + self.terminate_node(node_id) + + def get_command_runner(self, + log_prefix, + node_id, + auth_config, + cluster_name, + process_runner, + use_internal_ip, + docker_config=None): + """Returns the CommandRunner class used to perform SSH commands. + + Args: + log_prefix(str): stores "NodeUpdater: {}: ".format(). Used + to print progress in the CommandRunner. + node_id(str): the node ID. + auth_config(dict): the authentication configs from the autoscaler + yaml file. + cluster_name(str): the name of the cluster. + process_runner(module): the module to use to run the commands + in the CommandRunner. E.g., subprocess. + use_internal_ip(bool): whether the node_id belongs to an internal ip + or external ip. + docker_config(dict): If set, the docker information of the docker + container that commands should be run on. 
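+ Returns: + SSHCommandRunner: configured to use the head service's SSH NodePort when use_internal_ip is False, and port 22 otherwise.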
+ """ + common_args = { + 'log_prefix': log_prefix, + 'node_id': node_id, + 'provider': self, + 'auth_config': auth_config, + 'cluster_name': cluster_name, + 'process_runner': process_runner, + 'use_internal_ip': use_internal_ip, + } + command_runner = SSHCommandRunner(**common_args) + if use_internal_ip: + port = 22 + else: + port = self.external_port(node_id) + command_runner.set_port(port) + return command_runner + + @staticmethod + def bootstrap_config(cluster_config): + return config.bootstrap_kubernetes(cluster_config) + + @staticmethod + def fillout_available_node_types_resources(cluster_config): + """Fills out missing "resources" field for available_node_types.""" + return config.fillout_resources_kubernetes(cluster_config) diff --git a/sky/skylet/providers/kubernetes/utils.py b/sky/skylet/providers/kubernetes/utils.py new file mode 100644 index 00000000000..60bc99d0050 --- /dev/null +++ b/sky/skylet/providers/kubernetes/utils.py @@ -0,0 +1,99 @@ +from typing import Tuple, Optional + +from sky.utils import common_utils +from sky.adaptors import kubernetes + +DEFAULT_NAMESPACE = 'default' + + +def get_head_ssh_port(cluster_name: str, namespace: str) -> int: + svc_name = f'{cluster_name}-ray-head-ssh' + return get_port(svc_name, namespace) + + +def get_port(svc_name: str, namespace: str) -> int: + """ + Gets the nodeport of the specified service. + + Args: + svc_name (str): Name of the kubernetes service. Note that this may be + different from the cluster name. + namespace (str): Kubernetes namespace to look for the service in. + """ + head_service = kubernetes.core_api().read_namespaced_service( + svc_name, namespace) + return head_service.spec.ports[0].node_port + + +def check_credentials(timeout: int = kubernetes.API_TIMEOUT) -> \ + Tuple[bool, Optional[str]]: + """ + Check if the credentials in kubeconfig file are valid + + Args: + timeout (int): Timeout in seconds for the test API call + + Returns: + bool: True if credentials are valid, False otherwise + str: Error message if credentials are invalid, None otherwise + """ + try: + ns = get_current_kube_config_context_namespace() + kubernetes.core_api().list_namespaced_pod(ns, _request_timeout=timeout) + return True, None + except ImportError: + # TODO(romilb): Update these error strs to also include link to docs + # when docs are ready. + return False, f'`kubernetes` package is not installed. ' \ + f'Install it with: pip install kubernetes' + except kubernetes.api_exception() as e: + # Check if the error is due to invalid credentials + if e.status == 401: + return False, 'Invalid credentials - do you have permission ' \ + 'to access the cluster?' + else: + return False, f'Failed to communicate with the cluster: {str(e)}' + except kubernetes.config_exception() as e: + return False, f'Invalid configuration file: {str(e)}' + except kubernetes.max_retry_error(): + return False, 'Failed to communicate with the cluster - timeout. ' \ + 'Check if your cluster is running and your network ' \ + 'is stable.' 
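+ # ValueError is raised by the kubernetes adaptor's _load_config() (sky/adaptors/kubernetes.py) when a kubeconfig exists but is invalid, e.g., it has no current-context.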
+ except ValueError as e: + return False, common_utils.format_exception(e) + except Exception as e: + return False, f'An error occurred: {str(e)}' + + +def get_current_kube_config_context_name() -> Optional[str]: + """ + Get the current kubernetes context from the kubeconfig file + + Returns: + str | None: The current kubernetes context if it exists, None otherwise + """ + k8s = kubernetes.get_kubernetes() + try: + _, current_context = k8s.config.list_kube_config_contexts() + return current_context['name'] + except k8s.config.config_exception.ConfigException: + return None + + +def get_current_kube_config_context_namespace() -> str: + """ + Get the current kubernetes context namespace from the kubeconfig file + + Returns: + str | None: The current kubernetes context namespace if it exists, else + the default namespace. + """ + k8s = kubernetes.get_kubernetes() + try: + _, current_context = k8s.config.list_kube_config_contexts() + if 'namespace' in current_context['context']: + return current_context['context']['namespace'] + else: + return DEFAULT_NAMESPACE + except k8s.config.config_exception.ConfigException: + return DEFAULT_NAMESPACE diff --git a/sky/templates/kubernetes-ray.yml.j2 b/sky/templates/kubernetes-ray.yml.j2 new file mode 100644 index 00000000000..9a2e97f551d --- /dev/null +++ b/sky/templates/kubernetes-ray.yml.j2 @@ -0,0 +1,334 @@ +cluster_name: {{cluster_name}} + +# The maximum number of workers nodes to launch in addition to the head +# node. +max_workers: {{num_nodes - 1}} +upscaling_speed: {{num_nodes - 1}} +idle_timeout_minutes: 60 + + +# Kubernetes resources that need to be configured for the autoscaler to be +# able to manage the Ray cluster. If any of the provided resources don't +# exist, the autoscaler will attempt to create them. If this fails, you may +# not have the required permissions and will have to request them to be +# created by your cluster administrator. +provider: + type: external + module: sky.skylet.providers.kubernetes.KubernetesNodeProvider + + # Use False if running from outside of k8s cluster + use_internal_ips: false + + timeout: {{timeout}} + + # ServiceAccount created by the autoscaler for the head node pod that it + # runs in. If this field isn't provided, the head pod config below must + # contain a user-created service account with the proper permissions. + autoscaler_service_account: + apiVersion: v1 + kind: ServiceAccount + metadata: + labels: + parent: skypilot + name: autoscaler + + # Role created by the autoscaler for the head node pod that it runs in. + # If this field isn't provided, the role referenced in + # autoscaler_role_binding must exist and have at least these permissions. + autoscaler_role: + kind: Role + apiVersion: rbac.authorization.k8s.io/v1 + metadata: + labels: + parent: skypilot + name: autoscaler + # TODO(romilb): This is a very permissive role - gives all access in the + # namespace. We should restrict this. For reference, this is required + # for autodown and creating more SkyPilot clusters from within the pod. + rules: + - apiGroups: ["*"] + resources: ["*"] + verbs: ["*"] + + # RoleBinding created by the autoscaler for the head node pod that it runs + # in. If this field isn't provided, the head pod config below must contain + # a user-created service account with the proper permissions. 
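+ # Subjects below that omit a namespace are filled in with the cluster's namespace at provisioning time (see providers/kubernetes/config.py).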
+ autoscaler_role_binding: + apiVersion: rbac.authorization.k8s.io/v1 + kind: RoleBinding + metadata: + labels: + parent: skypilot + name: autoscaler + subjects: + - kind: ServiceAccount + name: autoscaler + roleRef: + kind: Role + name: autoscaler + apiGroup: rbac.authorization.k8s.io + + services: + # Service to expose the head node pod's SSH port. + - apiVersion: v1 + kind: Service + metadata: + labels: + parent: skypilot + skypilot-cluster: {{cluster_name}} + name: {{cluster_name}}-ray-head-ssh + spec: + type: NodePort + selector: + component: {{cluster_name}}-ray-head + ports: + - protocol: TCP + port: 22 + targetPort: 22 + # Service that maps to the head node of the Ray cluster. + - apiVersion: v1 + kind: Service + metadata: + labels: + parent: skypilot + skypilot-cluster: {{cluster_name}} + # NOTE: If you're running multiple Ray clusters with services + # on one Kubernetes cluster, they must have unique service + # names. + name: {{cluster_name}}-ray-head + spec: + # This selector must match the head node pod's selector below. + selector: + component: {{cluster_name}}-ray-head + ports: + - name: client + protocol: TCP + port: 10001 + targetPort: 10001 + - name: dashboard + protocol: TCP + port: 8265 + targetPort: 8265 + +# Specify the pod type for the ray head node (as configured below). +head_node_type: ray_head_default +# Specify the allowed pod types for this ray cluster and the resources they provide. +available_node_types: + ray_head_default: + node_config: + apiVersion: v1 + kind: Pod + metadata: + name: {{cluster_name}}-ray-head + # Must match the head node service selector above if a head node + # service is required. + labels: + parent: skypilot + component: {{cluster_name}}-ray-head + skypilot-cluster: {{cluster_name}} + spec: + # Change this if you altered the autoscaler_service_account above + # or want to provide your own. + serviceAccountName: autoscaler + + restartPolicy: Never + + # This volume allocates shared memory for Ray to use for its plasma + # object store. If you do not provide this, Ray will fall back to + # /tmp which cause slowdowns if is not a shared memory volume. + volumes: + - name: secret-volume + secret: + secretName: {{k8s_ssh_key_secret_name}} + - name: dshm + emptyDir: + medium: Memory + - name: dev-fuse # Required for fuse mounting + hostPath: + path: /dev/fuse + containers: + - name: ray-node + imagePullPolicy: IfNotPresent + image: {{image_id}} + # Do not change this command - it keeps the pod alive until it is + # explicitly killed. + command: ["/bin/bash", "-c", "--"] + args: ['trap : TERM INT; sleep infinity & wait;'] + ports: + - containerPort: 22 # Used for SSH + - containerPort: {{ray_port}} # Redis port + - containerPort: 10001 # Used by Ray Client + - containerPort: {{ray_dashboard_port}} # Used by Ray Dashboard + + # This volume allocates shared memory for Ray to use for its plasma + # object store. If you do not provide this, Ray will fall back to + # /tmp which cause slowdowns if is not a shared memory volume. 
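+ # The secret-volume mount provides the SSH public key; the postStart hook below copies it into ~/.ssh/authorized_keys so SkyPilot can SSH into the pod.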
+ volumeMounts: + - name: secret-volume + readOnly: true + mountPath: "/etc/secret-volume" + - mountPath: /dev/shm + name: dshm + - mountPath: /dev/fuse # Required for FUSE mounting + name: dev-fuse + securityContext: # Required for FUSE mounting, but may be a security risk + privileged: true + lifecycle: + postStart: + exec: + command: ["/bin/bash", "-c", "mkdir -p ~/.ssh && cp /etc/secret-volume/ssh-publickey ~/.ssh/authorized_keys && sudo service ssh restart"] + resources: + requests: + cpu: {{cpus}} + memory: {{memory}}G + limits: + # The maximum memory that this pod is allowed to use. The + # limit will be detected by ray and split to use 10% for + # redis, 30% for the shared memory object store, and the + # rest for application memory. If this limit is not set and + # the object store size is not set manually, ray will + # allocate a very large object store in each pod that may + # cause problems for other pods. + cpu: {{cpus}} + memory: {{memory}}G + ray_worker_default: + # Minimum number of Ray workers of this Pod type. + min_workers: {{num_nodes - 1}} + # Maximum number of Ray workers of this Pod type. Takes precedence over min_workers. + max_workers: {{num_nodes - 1}} + # User-specified custom resources for use by Ray. Object with string keys and integer values. + # (Ray detects CPU and GPU from pod spec resource requests and limits, so no need to fill those here.) + # resources: {"example-resource-a": 1, "example-resource-b": 2} + node_config: + apiVersion: v1 + kind: Pod + metadata: + labels: + parent: skypilot + skypilot-cluster: {{cluster_name}} + # Automatically generates a name for the pod with this prefix. + generateName: {{cluster_name}}-ray-worker- + spec: + serviceAccountName: skypilot-service-account + restartPolicy: Never + volumes: + - name: secret-volume + secret: + secretName: {{k8s_ssh_key_secret_name}} + - name: dshm + emptyDir: + medium: Memory + - name: dev-fuse # Required for fuse mounting + hostPath: + path: /dev/fuse + containers: + - name: ray-node + imagePullPolicy: IfNotPresent + image: {{image_id}} + command: ["/bin/bash", "-c", "--"] + args: ["trap : TERM INT; sleep infinity & wait;"] + lifecycle: + postStart: + exec: + command: ["/bin/bash", "-c", "mkdir -p ~/.ssh && cp /etc/secret-volume/ssh-publickey ~/.ssh/authorized_keys && sudo service ssh restart"] + ports: + - containerPort: 22 # Used for SSH + # This volume allocates shared memory for Ray to use for its plasma + # object store. If you do not provide this, Ray will fall back to + # /tmp which cause slowdowns if is not a shared memory volume. + volumeMounts: + - name: secret-volume + readOnly: true + mountPath: "/etc/secret-volume" + - mountPath: /dev/shm + name: dshm + - mountPath: /dev/fuse # Required for fuse mounting + name: dev-fuse + securityContext: # Required for FUSE mounting. TODO(romilb) - evaluate security risk + privileged: true + resources: + requests: + cpu: {{cpus}} + memory: {{memory}}G + limits: + # The maximum memory that this pod is allowed to use. The + # limit will be detected by ray and split to use 10% for + # redis, 30% for the shared memory object store, and the + # rest for application memory. If this limit is not set and + # the object store size is not set manually, ray will + # allocate a very large object store in each pod that may + # cause problems for other pods. + cpu: {{cpus}} + memory: {{memory}}G + +setup_commands: + # Disable `unattended-upgrades` to prevent apt-get from hanging. 
It should be called at the beginning before the process started to avoid being blocked. (This is a temporary fix.) + # Create ~/.ssh/config file in case the file does not exist in the image. + # Line 'sudo bash ..': set the ulimit as suggested by ray docs for performance. https://docs.ray.io/en/latest/cluster/vms/user-guides/large-cluster-best-practices.html#system-configuration + # Line 'sudo grep ..': set the number of threads per process to unlimited to avoid ray job submit stucking issue when the number of running ray jobs increase. + # Line 'mkdir -p ..': disable host key check + # Line 'python3 -c ..': patch the buggy ray files and enable `-o allow_other` option for `goofys` + - mkdir -p ~/.ssh; touch ~/.ssh/config; + pip3 --version > /dev/null 2>&1 || (curl -sSL https://bootstrap.pypa.io/get-pip.py -o get-pip.py && python3 get-pip.py && echo "PATH=$HOME/.local/bin:$PATH" >> ~/.bashrc); + (type -a python | grep -q python3) || echo 'alias python=python3' >> ~/.bashrc; + (type -a pip | grep -q pip3) || echo 'alias pip=pip3' >> ~/.bashrc; + {{ conda_installation_commands }} + source ~/.bashrc; + mkdir -p ~/sky_workdir && mkdir -p ~/.sky/sky_app && touch ~/.sudo_as_admin_successful; + (pip3 list | grep skypilot && [ "$(cat {{sky_remote_path}}/current_sky_wheel_hash)" == "{{sky_wheel_hash}}" ]) || (pip3 uninstall skypilot -y; pip3 install "$(echo {{sky_remote_path}}/{{sky_wheel_hash}}/skypilot-{{sky_version}}*.whl)" && echo "{{sky_wheel_hash}}" > {{sky_remote_path}}/current_sky_wheel_hash || exit 1); + sudo bash -c 'rm -rf /etc/security/limits.d; echo "* soft nofile 1048576" >> /etc/security/limits.conf; echo "* hard nofile 1048576" >> /etc/security/limits.conf'; + sudo grep -e '^DefaultTasksMax' /etc/systemd/system.conf || (sudo bash -c 'echo "DefaultTasksMax=infinity" >> /etc/systemd/system.conf'); sudo systemctl set-property user-$(id -u $(whoami)).slice TasksMax=infinity; sudo systemctl daemon-reload; + mkdir -p ~/.ssh; (grep -Pzo -q "Host \*\n StrictHostKeyChecking no" ~/.ssh/config) || printf "Host *\n StrictHostKeyChecking no\n" >> ~/.ssh/config; + python3 -c "from sky.skylet.ray_patches import patch; patch()" || exit 1; + [ -f /etc/fuse.conf ] && sudo sed -i 's/#user_allow_other/user_allow_other/g' /etc/fuse.conf || (sudo sh -c 'echo "user_allow_other" > /etc/fuse.conf'); + +# Command to start ray on the head node. You don't need to change this. +# NOTE: these are very performance-sensitive. Each new item opens/closes an SSH +# connection, which is expensive. Try your best to co-locate commands into fewer +# items! The same comment applies for worker_start_ray_commands. +# +# Increment the following for catching performance bugs easier: +# current num items (num SSH connections): 2 +# Note dashboard-host is set to 0.0.0.0 so that kubernetes can port forward. +head_start_ray_commands: + # Start skylet daemon. (Should not place it in the head_setup_commands, otherwise it will run before sky is installed.) + # NOTE: --disable-usage-stats in `ray start` saves 10 seconds of idle wait. + # Line "which prlimit ..": increase the limit of the number of open files for the raylet process, as the `ulimit` may not take effect at this point, because it requires + # all the sessions to be reloaded. This is a workaround. 
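+ # Line 'ray stop; ... ray start ..': start the Ray head; --dashboard-host 0.0.0.0 is required so that Kubernetes can port-forward to the dashboard.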
+ - ((ps aux | grep -v nohup | grep -v grep | grep -q -- "python3 -m sky.skylet.skylet") || nohup python3 -m sky.skylet.skylet >> ~/.sky/skylet.log 2>&1 &); + ray stop; RAY_SCHEDULER_EVENTS=0 RAY_DEDUP_LOGS=0 ray start --disable-usage-stats --head --port={{ray_port}} --dashboard-port={{ray_dashboard_port}} --dashboard-host 0.0.0.0 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml {{"--resources='%s'" % custom_resources if custom_resources}} --temp-dir {{ray_temp_dir}} || exit 1; + which prlimit && for id in $(pgrep -f raylet/raylet); do sudo prlimit --nofile=1048576:1048576 --pid=$id || true; done; + {{dump_port_command}}; + +{%- if num_nodes > 1 %} +worker_start_ray_commands: + - ray stop; RAY_SCHEDULER_EVENTS=0 RAY_DEDUP_LOGS=0 ray start --disable-usage-stats --address=$RAY_HEAD_IP:{{ray_port}} --object-manager-port=8076 {{"--resources='%s'" % custom_resources if custom_resources}} --temp-dir {{ray_temp_dir}} || exit 1; + which prlimit && for id in $(pgrep -f raylet/raylet); do sudo prlimit --nofile=1048576:1048576 --pid=$id || true; done; +{%- else %} +worker_start_ray_commands: [] +{%- endif %} + +head_node: {} +worker_nodes: {} + +# Format: `REMOTE_PATH : LOCAL_PATH` +file_mounts: { + "{{sky_ray_yaml_remote_path}}": "{{sky_ray_yaml_local_path}}", + "{{sky_remote_path}}/{{sky_wheel_hash}}": "{{sky_local_path}}", +{%- for remote_path, local_path in credentials.items() %} + "{{remote_path}}": "{{local_path}}", +{%- endfor %} +} + +auth: + ssh_user: sky + ssh_private_key: {{ssh_private_key}} + +# These fields are required for external cloud providers. +head_setup_commands: [] +worker_setup_commands: [] +cluster_synced_files: [] +file_mounts_sync_continuously: False +initialization_commands: [] +rsync_exclude: [] + diff --git a/sky/utils/__init__.py b/sky/utils/__init__.py index f5d3dc7a34a..eff27bdd65b 100644 --- a/sky/utils/__init__.py +++ b/sky/utils/__init__.py @@ -1 +1,2 @@ """Utility functions.""" +from sky.skylet.providers.kubernetes import utils as kubernetes_utils diff --git a/sky/utils/command_runner.py b/sky/utils/command_runner.py index 1f56fa2453d..cea171773db 100644 --- a/sky/utils/command_runner.py +++ b/sky/utils/command_runner.py @@ -44,11 +44,14 @@ def ssh_options_list(ssh_private_key: Optional[str], ssh_control_name: Optional[str], *, ssh_proxy_command: Optional[str] = None, - timeout: int = 30) -> List[str]: + timeout: int = 30, + port: int = 22) -> List[str]: """Returns a list of sane options for 'ssh'.""" # Forked from Ray SSHOptions: # https://github.com/ray-project/ray/blob/master/python/ray/autoscaler/_private/command_runner.py arg_dict = { + # SSH port + 'Port': port, # Supresses initial fingerprint verification. 'StrictHostKeyChecking': 'no', # SSH IP and fingerprint pairs no longer added to known_hosts. @@ -119,6 +122,7 @@ def __init__( ssh_private_key: str, ssh_control_name: Optional[str] = '__default__', ssh_proxy_command: Optional[str] = None, + port: int = 22, ): """Initialize SSHCommandRunner. @@ -138,6 +142,7 @@ def __init__( ssh_proxy_command: Optional, the value to pass to '-o ProxyCommand'. Useful for communicating with clusters without public IPs using a "jump server". + port: The port to use for ssh. 
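+ Defaults to 22; for Kubernetes clusters the head node's SSH NodePort is passed instead.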
""" self.ip = ip self.ssh_user = ssh_user @@ -146,6 +151,7 @@ def __init__( None if ssh_control_name is None else hashlib.md5( ssh_control_name.encode()).hexdigest()[:_HASH_MAX_LENGTH]) self._ssh_proxy_command = ssh_proxy_command + self.port = port @staticmethod def make_runner_list( @@ -154,11 +160,15 @@ def make_runner_list( ssh_private_key: str, ssh_control_name: Optional[str] = None, ssh_proxy_command: Optional[str] = None, + port_list: Optional[List[int]] = None, ) -> List['SSHCommandRunner']: """Helper function for creating runners with the same ssh credentials""" + if not port_list: + port_list = [22] * len(ip_list) return [ SSHCommandRunner(ip, ssh_user, ssh_private_key, ssh_control_name, - ssh_proxy_command) for ip in ip_list + ssh_proxy_command, port) + for ip, port in zip(ip_list, port_list) ] def _ssh_base_command(self, *, ssh_mode: SshMode, @@ -181,6 +191,7 @@ def _ssh_base_command(self, *, ssh_mode: SshMode, self.ssh_private_key, self.ssh_control_name, ssh_proxy_command=self._ssh_proxy_command, + port=self.port, ) + [f'{self.ssh_user}@{self.ip}'] def run( @@ -335,6 +346,7 @@ def rsync( self.ssh_private_key, self.ssh_control_name, ssh_proxy_command=self._ssh_proxy_command, + port=self.port, )) rsync_command.append(f'-e "ssh {ssh_options}"') # To support spaces in the path, we need to quote source and target. diff --git a/sky/utils/command_runner.pyi b/sky/utils/command_runner.pyi index 7120755441f..53e78db15a8 100644 --- a/sky/utils/command_runner.pyi +++ b/sky/utils/command_runner.pyi @@ -36,20 +36,25 @@ class SSHCommandRunner: ssh_user: str ssh_private_key: str ssh_control_name: Optional[str] + port: int def __init__(self, ip: str, ssh_user: str, ssh_private_key: str, - ssh_control_name: Optional[str] = ...) -> None: + ssh_control_name: Optional[str] = ..., + port: str = ...) -> None: ... @staticmethod def make_runner_list( - ip_list: List[str], - ssh_user: str, - ssh_private_key: str, - ssh_control_name: Optional[str] = ...) -> List['SSHCommandRunner']: + ip_list: List[str], + ssh_user: str, + ssh_private_key: str, + ssh_control_name: Optional[str] = ..., + ssh_proxy_command: Optional[str] = ..., + port_list: Optional[List[int]] = ..., + ) -> List['SSHCommandRunner']: ... @typing.overload diff --git a/sky/utils/kubernetes/__init__.py b/sky/utils/kubernetes/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sky/utils/kubernetes/create_cluster.sh b/sky/utils/kubernetes/create_cluster.sh new file mode 100755 index 00000000000..c5b74f6819d --- /dev/null +++ b/sky/utils/kubernetes/create_cluster.sh @@ -0,0 +1,45 @@ +#!/bin/bash +# Creates a local Kubernetes cluster using kind +# Usage: ./create_cluster.sh +# Invokes generate_kind_config.py to generate a kind-cluster.yaml with NodePort mappings +set -e + +# Limit port range to speed up kind cluster creation +PORT_RANGE_START=30000 +PORT_RANGE_END=30100 + +# Check if docker is running +if ! docker info > /dev/null 2>&1; then + >&2 echo "Docker is not running. Please start Docker and try again." + exit 1 +fi + +# Check if kind is installed +if ! kind version > /dev/null 2>&1; then + >&2 echo "kind is not installed. Please install kind and try again. Installation instructions: https://kind.sigs.k8s.io/docs/user/quick-start/#installation." + exit 1 +fi + +# Check if the local cluster already exists +if kind get clusters | grep -q skypilot; then + echo "Local cluster already exists. Exiting." 
+ # Switch context to the local cluster + kubectl config use-context kind-skypilot + exit 100 +fi + +# Generate cluster YAML +echo "Generating /tmp/skypilot-kind.yaml" +python -m sky.utils.kubernetes.generate_kind_config --path /tmp/skypilot-kind.yaml --port-start ${PORT_RANGE_START} --port-end ${PORT_RANGE_END} + +kind create cluster --config /tmp/skypilot-kind.yaml --name skypilot + +# Load the local skypilot image onto the cluster for faster startup +echo "Loading local skypilot image onto the cluster" +docker pull us-central1-docker.pkg.dev/skypilot-375900/skypilotk8s/skypilot:latest +kind load docker-image --name skypilot us-central1-docker.pkg.dev/skypilot-375900/skypilotk8s/skypilot:latest + +# Print CPUs available on the local cluster +NUM_CPUS=$(kubectl get nodes -o jsonpath='{.items[0].status.capacity.cpu}') +echo "Kubernetes cluster ready! Run 'sky check' to set up Kubernetes access." +echo "Number of CPUs available on the local cluster: $NUM_CPUS" diff --git a/sky/utils/kubernetes/delete_cluster.sh b/sky/utils/kubernetes/delete_cluster.sh new file mode 100755 index 00000000000..1f93270f414 --- /dev/null +++ b/sky/utils/kubernetes/delete_cluster.sh @@ -0,0 +1,33 @@ +#!/bin/bash +# Deletes the local kind cluster +# Usage: ./delete_cluster.sh +# Exits with error code 100 if the local cluster does not exist + +set -e +# Check if docker is running +if ! docker info > /dev/null 2>&1; then + >&2 echo "Docker is not running. Please start Docker and try again." + exit 1 +fi + +# Check if kind is installed +if ! kind version > /dev/null 2>&1; then + >&2 echo "kind is not installed. Please install kind and try again." + exit 1 +fi + +# Check if the local cluster exists +if ! kind get clusters | grep -q skypilot; then + echo "Local cluster does not exist. Exiting." + exit 100 +fi + +kind delete cluster --name skypilot +echo "Local cluster deleted!" + +# Switch to the first available context +AVAILABLE_CONTEXT=$(kubectl config get-contexts -o name | head -n 1) +if [ ! -z "$AVAILABLE_CONTEXT" ]; then + echo "Switching to context $AVAILABLE_CONTEXT" + kubectl config use-context $AVAILABLE_CONTEXT +fi diff --git a/sky/utils/kubernetes/generate_kind_config.py b/sky/utils/kubernetes/generate_kind_config.py new file mode 100644 index 00000000000..404deabcec6 --- /dev/null +++ b/sky/utils/kubernetes/generate_kind_config.py @@ -0,0 +1,73 @@ +"""Generates a kind cluster config file. + +Maps specified ports from host to cluster container.
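+ Example (as invoked by create_cluster.sh): + python -m sky.utils.kubernetes.generate_kind_config --path /tmp/skypilot-kind.yaml --port-start 30000 --port-end 30100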
+""" +import argparse +import textwrap + + +def generate_kind_config(path: str, + port_start: int = 30000, + port_end: int = 32768, + num_nodes=1) -> None: + """ + Generate a kind cluster config file with ports mapped from host to container + Args: + path: Path to generate the config file at + port_start: Port range start + port_end: Port range end + num_nodes: Number of nodes in the cluster + """ + + preamble = textwrap.dedent(f""" + apiVersion: kind.x-k8s.io/v1alpha4 + kind: Cluster + kubeadmConfigPatches: + - | + kind: ClusterConfiguration + apiServer: + extraArgs: + "service-node-port-range": {port_start}-{port_end} + nodes: + - role: control-plane + extraPortMappings:""") + suffix = '' + if num_nodes > 1: + for _ in range(1, num_nodes): + suffix += """- role: worker\n""" + with open(path, 'w') as f: + f.write(preamble) + for port in range(port_start, port_end + 1): + f.write(f""" + - containerPort: {port} + hostPort: {port} + listenAddress: "0.0.0.0" + protocol: tcp""") + f.write('\n') + if suffix: + f.write(suffix) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Generate a kind cluster ' + 'config file with ports mapped' + ' from host to container') + parser.add_argument('--path', + type=str, + default='/tmp/skypilot-kind.yaml', + help='Path to generate the config file at') + parser.add_argument('--port-start', + type=int, + default=30000, + help='Port range start') + parser.add_argument('--port-end', + type=int, + default=32768, + help='Port range end') + parser.add_argument('--num-nodes', + type=int, + default=1, + help='Number of nodes in the cluster') + args = parser.parse_args() + generate_kind_config(args.path, args.port_start, args.port_end, + args.num_nodes) diff --git a/tests/conftest.py b/tests/conftest.py index 7976bf10132..299ec455efe 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,7 +21,8 @@ # # To only run tests for managed spot (without generic tests), use --managed-spot. all_clouds_in_smoke_tests = [ - 'aws', 'gcp', 'azure', 'lambda', 'cloudflare', 'ibm', 'scp', 'oci' + 'aws', 'gcp', 'azure', 'lambda', 'cloudflare', 'ibm', 'scp', 'oci', + 'kubernetes' ] default_clouds_to_run = ['gcp', 'azure'] @@ -37,6 +38,7 @@ 'ibm': 'ibm', 'scp': 'scp', 'oci': 'oci', + 'kubernetes': 'kubernetes' } @@ -130,25 +132,30 @@ def pytest_collection_modifyitems(config, items): in item.keywords) and config.getoption('--managed-spot'): item.add_marker(skip_marks['managed_spot']) + # Check if tests need to be run serially for Kubernetes and Lambda Cloud # We run Lambda Cloud tests serially because Lambda Cloud rate limits its # launch API to one launch every 10 seconds. - serial_mark = pytest.mark.xdist_group(name='serial_lambda_cloud') + # We run Kubernetes tests serially because the Kubernetes cluster may have + # limited resources (e.g., just 8 cpus). + serial_mark = pytest.mark.xdist_group( + name=f'serial_{generic_cloud_keyword}') # Handle generic tests - if generic_cloud == 'lambda': + if generic_cloud in ['lambda', 'kubernetes']: for item in items: if (_is_generic_test(item) and - 'no_lambda_cloud' not in item.keywords): + f'no_{generic_cloud_keyword}' not in item.keywords): item.add_marker(serial_mark) # Adding the serial mark does not update the item.nodeid, # but item.nodeid is important for pytest.xdist_group, e.g. 
# https://github.com/pytest-dev/pytest-xdist/blob/master/src/xdist/scheduler/loadgroup.py + # This is a hack to update item.nodeid - item._nodeid = f'{item.nodeid}@serial_lambda_cloud' - # Handle Lambda Cloud specific tests + item._nodeid = f'{item.nodeid}@serial_{generic_cloud_keyword}' + # Handle generic cloud specific tests for item in items: - if 'lambda_cloud' in item.keywords: - item.add_marker(serial_mark) - item._nodeid = f'{item.nodeid}@serial_lambda_cloud' # See comment on item.nodeid above + if generic_cloud in ['lambda', 'kubernetes']: + if generic_cloud_keyword in item.keywords: + item.add_marker(serial_mark) + item._nodeid = f'{item.nodeid}@serial_{generic_cloud_keyword}' # See comment on item.nodeid above def _is_generic_test(item) -> bool: diff --git a/tests/kubernetes/README.md b/tests/kubernetes/README.md new file mode 100644 index 00000000000..5a44fffd1b2 --- /dev/null +++ b/tests/kubernetes/README.md @@ -0,0 +1,42 @@ +# SkyPilot Kubernetes Development Scripts + +This directory contains useful scripts and notes for developing SkyPilot on Kubernetes. + +## Building and pushing the SkyPilot image + +We maintain a container image that has all basic SkyPilot dependencies installed. +This image is hosted at `us-central1-docker.pkg.dev/skypilot-375900/skypilotk8s/skypilot:latest`. + +To build this image locally and optionally push it to the SkyPilot registry, run: +```bash +# Build and load image locally +./build_image.sh +# Build and push image (CAREFUL - this will push to the SkyPilot registry!) +./build_image.sh -p +``` + +## Running a local development cluster +We use [kind](https://kind.sigs.k8s.io/) to run a local Kubernetes cluster +for development. + +```bash +sky local up +``` + +## Running a GKE cluster +1. Make sure ports 30000-32767 are open in your node pool VPC's firewall. +2. Create a GKE cluster with at least 1 node. We recommend creating nodes with at least 4 vCPUs. + * Note - only GKE standard clusters are supported. GKE autopilot clusters are not supported. +3. Get the kubeconfig for your cluster and place it in `~/.kube/config`: +```bash +gcloud container clusters get-credentials CLUSTER_NAME --region REGION +# Example: +# gcloud container clusters get-credentials testcluster --region us-central1-c +``` +4. Verify by running `kubectl get nodes`. You should see your nodes. +5. You can now run SkyPilot tasks. + +## Other useful scripts +The `scripts` directory contains other useful scripts for development, including +the Kubernetes dashboard, a Ray YAML for testing the SkyPilot Kubernetes node provider, +and more. \ No newline at end of file diff --git a/tests/kubernetes/build_image.sh b/tests/kubernetes/build_image.sh new file mode 100755 index 00000000000..2babd0cd95a --- /dev/null +++ b/tests/kubernetes/build_image.sh @@ -0,0 +1,45 @@ +#!/bin/bash +# Builds the Dockerfile_k8s image as the SkyPilot image. +# Optionally, if -p is specified, pushes the image to the registry. +# Uses buildx to build the image for both amd64 and arm64. +# Usage: ./build_image.sh [-p] +# -p: Push the image to the registry + +TAG=us-central1-docker.pkg.dev/skypilot-375900/skypilotk8s/skypilot:latest + +# Parse command line arguments +while getopts ":p" opt; do + case $opt in + p) + push=true + ;; + \?)
+ echo "Invalid option: -$OPTARG" >&2 + ;; + esac +done + +# Navigate to the root of the project (inferred from git) +cd "$(git rev-parse --show-toplevel)" + +# If push is used, build the image for both amd64 and arm64 +if [[ $push ]]; then + echo "Building and pushing for amd64 and arm64" + # Push both platforms as one image manifest list + docker buildx build --push --platform linux/amd64,linux/arm64 -t $TAG -f Dockerfile_k8s ./sky +fi + +# Load the right image depending on the architecture of the host machine (Apple Silicon or Intel) +if [[ $(uname -m) == "arm64" ]]; then + echo "Loading image for arm64 (Apple Silicon etc.)" + docker buildx build --load --platform linux/arm64 -t $TAG -f Dockerfile_k8s ./sky +elif [[ $(uname -m) == "x86_64" ]]; then + echo "Building for amd64 (Intel CPUs)" + docker buildx build --load --platform linux/amd64 -t $TAG -f Dockerfile_k8s ./sky +else + echo "Unsupported architecture: $(uname -m)" + exit 1 +fi + +echo "Tagging image as skypilot:latest" +docker tag $TAG skypilot:latest diff --git a/tests/kubernetes/scripts/clean_k8s.sh b/tests/kubernetes/scripts/clean_k8s.sh new file mode 100644 index 00000000000..66b14c18b6c --- /dev/null +++ b/tests/kubernetes/scripts/clean_k8s.sh @@ -0,0 +1 @@ +kubectl delete all -l parent=skypilot diff --git a/tests/kubernetes/scripts/dashboard.yaml b/tests/kubernetes/scripts/dashboard.yaml new file mode 100644 index 00000000000..80308368456 --- /dev/null +++ b/tests/kubernetes/scripts/dashboard.yaml @@ -0,0 +1,306 @@ +# Copyright 2017 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
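+# To deploy: run 'kubectl apply -f dashboard.yaml' and then 'kubectl proxy'; see install_dashboard.sh in this directory for the dashboard URL to open.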
+ +apiVersion: v1 +kind: Namespace +metadata: + name: kubernetes-dashboard + +--- + +apiVersion: v1 +kind: ServiceAccount +metadata: + labels: + k8s-app: kubernetes-dashboard + name: kubernetes-dashboard + namespace: kubernetes-dashboard + +--- + +kind: Service +apiVersion: v1 +metadata: + labels: + k8s-app: kubernetes-dashboard + name: kubernetes-dashboard + namespace: kubernetes-dashboard +spec: + ports: + - port: 443 + targetPort: 8443 + selector: + k8s-app: kubernetes-dashboard + +--- + +apiVersion: v1 +kind: Secret +metadata: + labels: + k8s-app: kubernetes-dashboard + name: kubernetes-dashboard-certs + namespace: kubernetes-dashboard +type: Opaque + +--- + +apiVersion: v1 +kind: Secret +metadata: + labels: + k8s-app: kubernetes-dashboard + name: kubernetes-dashboard-csrf + namespace: kubernetes-dashboard +type: Opaque +data: + csrf: "" + +--- + +apiVersion: v1 +kind: Secret +metadata: + labels: + k8s-app: kubernetes-dashboard + name: kubernetes-dashboard-key-holder + namespace: kubernetes-dashboard +type: Opaque + +--- + +kind: ConfigMap +apiVersion: v1 +metadata: + labels: + k8s-app: kubernetes-dashboard + name: kubernetes-dashboard-settings + namespace: kubernetes-dashboard + +--- + +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + labels: + k8s-app: kubernetes-dashboard + name: kubernetes-dashboard + namespace: kubernetes-dashboard +rules: + # Allow Dashboard to get, update and delete Dashboard exclusive secrets. + - apiGroups: [""] + resources: ["secrets"] + resourceNames: ["kubernetes-dashboard-key-holder", "kubernetes-dashboard-certs", "kubernetes-dashboard-csrf"] + verbs: ["get", "update", "delete"] + # Allow Dashboard to get and update 'kubernetes-dashboard-settings' config map. + - apiGroups: [""] + resources: ["configmaps"] + resourceNames: ["kubernetes-dashboard-settings"] + verbs: ["get", "update"] + # Allow Dashboard to get metrics. 
+ - apiGroups: [""] + resources: ["services"] + resourceNames: ["heapster", "dashboard-metrics-scraper"] + verbs: ["proxy"] + - apiGroups: [""] + resources: ["services/proxy"] + resourceNames: ["heapster", "http:heapster:", "https:heapster:", "dashboard-metrics-scraper", "http:dashboard-metrics-scraper"] + verbs: ["get"] + +--- + +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + labels: + k8s-app: kubernetes-dashboard + name: kubernetes-dashboard +rules: + # Allow Metrics Scraper to get metrics from the Metrics server + - apiGroups: ["metrics.k8s.io"] + resources: ["pods", "nodes"] + verbs: ["get", "list", "watch"] + +--- + +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + labels: + k8s-app: kubernetes-dashboard + name: kubernetes-dashboard + namespace: kubernetes-dashboard +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: kubernetes-dashboard +subjects: + - kind: ServiceAccount + name: kubernetes-dashboard + namespace: kubernetes-dashboard + +--- + +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: kubernetes-dashboard + labels: + k8s-app: kubernetes-dashboard +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: cluster-admin +subjects: +- kind: ServiceAccount + name: kubernetes-dashboard + namespace: kubernetes-dashboard + +--- + +kind: Deployment +apiVersion: apps/v1 +metadata: + labels: + k8s-app: kubernetes-dashboard + name: kubernetes-dashboard + namespace: kubernetes-dashboard +spec: + replicas: 1 + revisionHistoryLimit: 10 + selector: + matchLabels: + k8s-app: kubernetes-dashboard + template: + metadata: + labels: + k8s-app: kubernetes-dashboard + spec: + containers: + - name: kubernetes-dashboard + image: kubernetesui/dashboard:v2.3.1 + imagePullPolicy: Always + ports: + - containerPort: 8443 + protocol: TCP + args: + - --enable-skip-login + - --disable-settings-authorizer + - --auto-generate-certificates + - --namespace=kubernetes-dashboard + # Uncomment the following line to manually specify Kubernetes API server Host + # If not specified, Dashboard will attempt to auto discover the API server and connect + # to it. Uncomment only if the default does not work. 
+ # - --apiserver-host=http://my-address:port + volumeMounts: + - name: kubernetes-dashboard-certs + mountPath: /certs + # Create on-disk volume to store exec logs + - mountPath: /tmp + name: tmp-volume + livenessProbe: + httpGet: + scheme: HTTPS + path: / + port: 8443 + initialDelaySeconds: 30 + timeoutSeconds: 30 + securityContext: + allowPrivilegeEscalation: false + readOnlyRootFilesystem: true + runAsUser: 1001 + runAsGroup: 2001 + volumes: + - name: kubernetes-dashboard-certs + secret: + secretName: kubernetes-dashboard-certs + - name: tmp-volume + emptyDir: {} + serviceAccountName: kubernetes-dashboard + nodeSelector: + "kubernetes.io/os": linux + # Comment the following tolerations if Dashboard must not be deployed on master + tolerations: + - key: node-role.kubernetes.io/master + effect: NoSchedule + +--- + +kind: Service +apiVersion: v1 +metadata: + labels: + k8s-app: dashboard-metrics-scraper + name: dashboard-metrics-scraper + namespace: kubernetes-dashboard +spec: + ports: + - port: 8000 + targetPort: 8000 + selector: + k8s-app: dashboard-metrics-scraper + +--- + +kind: Deployment +apiVersion: apps/v1 +metadata: + labels: + k8s-app: dashboard-metrics-scraper + name: dashboard-metrics-scraper + namespace: kubernetes-dashboard +spec: + replicas: 1 + revisionHistoryLimit: 10 + selector: + matchLabels: + k8s-app: dashboard-metrics-scraper + template: + metadata: + labels: + k8s-app: dashboard-metrics-scraper + annotations: + seccomp.security.alpha.kubernetes.io/pod: 'runtime/default' + spec: + containers: + - name: dashboard-metrics-scraper + image: kubernetesui/metrics-scraper:v1.0.6 + ports: + - containerPort: 8000 + protocol: TCP + livenessProbe: + httpGet: + scheme: HTTP + path: / + port: 8000 + initialDelaySeconds: 30 + timeoutSeconds: 30 + volumeMounts: + - mountPath: /tmp + name: tmp-volume + securityContext: + allowPrivilegeEscalation: false + readOnlyRootFilesystem: true + runAsUser: 1001 + runAsGroup: 2001 + serviceAccountName: kubernetes-dashboard + nodeSelector: + "kubernetes.io/os": linux + # Comment the following tolerations if Dashboard must not be deployed on master + tolerations: + - key: node-role.kubernetes.io/master + effect: NoSchedule + volumes: + - name: tmp-volume + emptyDir: {} \ No newline at end of file diff --git a/tests/kubernetes/scripts/delete.sh b/tests/kubernetes/scripts/delete.sh new file mode 100644 index 00000000000..08c8205ce77 --- /dev/null +++ b/tests/kubernetes/scripts/delete.sh @@ -0,0 +1 @@ +kubectl delete -f skypilot_ssh_k8s_deployment.yaml diff --git a/tests/kubernetes/scripts/install_dashboard.sh b/tests/kubernetes/scripts/install_dashboard.sh new file mode 100644 index 00000000000..0fc8f5b89da --- /dev/null +++ b/tests/kubernetes/scripts/install_dashboard.sh @@ -0,0 +1,5 @@ +kubectl apply -f dashboard.yaml +echo "Dashboard installed, please run 'kubectl proxy' and visit http://localhost:8001/api/v1/namespaces/kubernetes-dashboard/services/https:kubernetes-dashboard:/proxy/#/node?namespace=default" +kubectl proxy + +# kubectl get ns kubernetes-dashboard -o json | jq '.spec.finalizers = []' | kubectl replace --raw "/api/v1/namespaces/kubernetes-dashboard/finalize" -f - \ No newline at end of file diff --git a/tests/kubernetes/scripts/ray_k8s_sky.yaml b/tests/kubernetes/scripts/ray_k8s_sky.yaml new file mode 100644 index 00000000000..e686e91ff08 --- /dev/null +++ b/tests/kubernetes/scripts/ray_k8s_sky.yaml @@ -0,0 +1,265 @@ +# run with ray up ray_k8s_sky.yaml --no-config-cache +# An unique identifier for the head node and workers of this 
cluster. +cluster_name: example-cluster + +# The maximum number of workers nodes to launch in addition to the head +# node. +min_workers: 0 +max_workers: 0 + +# Kubernetes resources that need to be configured for the autoscaler to be +# able to manage the Ray cluster. If any of the provided resources don't +# exist, the autoscaler will attempt to create them. If this fails, you may +# not have the required permissions and will have to request them to be +# created by your cluster administrator. +provider: + type: external + module: sky.skylet.providers.kubernetes.KubernetesNodeProvider + + # Use False if running from outside of k8s cluster + use_internal_ips: false + + # Namespace to use for all resources created. + namespace: default + + # ServiceAccount created by the autoscaler for the head node pod that it + # runs in. If this field isn't provided, the head pod config below must + # contain a user-created service account with the proper permissions. + autoscaler_service_account: + apiVersion: v1 + kind: ServiceAccount + metadata: + labels: + parent: skypilot + name: autoscaler + + # Role created by the autoscaler for the head node pod that it runs in. + # If this field isn't provided, the role referenced in + # autoscaler_role_binding must exist and have at least these permissions. + autoscaler_role: + kind: Role + apiVersion: rbac.authorization.k8s.io/v1 + metadata: + labels: + parent: skypilot + name: autoscaler + rules: + - apiGroups: [""] + resources: ["pods", "pods/status", "pods/exec"] + verbs: ["get", "watch", "list", "create", "delete", "patch"] + + # RoleBinding created by the autoscaler for the head node pod that it runs + # in. If this field isn't provided, the head pod config below must contain + # a user-created service account with the proper permissions. + autoscaler_role_binding: + apiVersion: rbac.authorization.k8s.io/v1 + kind: RoleBinding + metadata: + labels: + parent: skypilot + name: autoscaler + subjects: + - kind: ServiceAccount + name: autoscaler + roleRef: + kind: Role + name: autoscaler + apiGroup: rbac.authorization.k8s.io + + services: + # Service to expose the head node pod's SSH port. + - apiVersion: v1 + kind: Service + metadata: + labels: + parent: skypilot + name: example-cluster-ray-head-ssh + spec: + type: NodePort + selector: + component: example-cluster-ray-head + ports: + - protocol: TCP + port: 22 + targetPort: 22 + # Service that maps to the head node of the Ray cluster. + - apiVersion: v1 + kind: Service + metadata: + labels: + parent: skypilot + # NOTE: If you're running multiple Ray clusters with services + # on one Kubernetes cluster, they must have unique service + # names. + name: example-cluster-ray-head + spec: + # This selector must match the head node pod's selector below. + selector: + component: example-cluster-ray-head + ports: + - name: client + protocol: TCP + port: 10001 + targetPort: 10001 + - name: dashboard + protocol: TCP + port: 8265 + targetPort: 8265 + +# Specify the pod type for the ray head node (as configured below). +head_node_type: head_node +# Specify the allowed pod types for this ray cluster and the resources they provide. +available_node_types: + worker_node: + # Minimum number of Ray workers of this Pod type. + min_workers: 0 + # Maximum number of Ray workers of this Pod type. Takes precedence over min_workers. + max_workers: 0 + # User-specified custom resources for use by Ray. Object with string keys and integer values. 
+ # (Ray detects CPU and GPU from pod spec resource requests and limits, so no need to fill those here.) + resources: {"example-resource-a": 1, "example-resource-b": 2} + node_config: + apiVersion: v1 + kind: Pod + metadata: + labels: + parent: skypilot + # Automatically generates a name for the pod with this prefix. + generateName: example-cluster-ray-worker- + spec: + restartPolicy: Never + volumes: + - name: secret-volume + secret: + secretName: ssh-key-secret + - name: dshm + emptyDir: + medium: Memory + containers: + - name: ray-node + imagePullPolicy: Never + image: skypilot:latest + command: ["/bin/bash", "-c", "--"] + args: ["trap : TERM INT; sleep infinity & wait;"] + lifecycle: + postStart: + exec: + command: ["/bin/bash", "-c", "mkdir -p ~/.ssh && cp /etc/secret-volume/ssh-publickey ~/.ssh/authorized_keys && sudo service ssh restart"] + ports: + - containerPort: 22 # Used for SSH + # This volume allocates shared memory for Ray to use for its plasma + # object store. If you do not provide this, Ray will fall back to + # /tmp which cause slowdowns if is not a shared memory volume. + volumeMounts: + - name: secret-volume + readOnly: true + mountPath: "/etc/secret-volume" + - mountPath: /dev/shm + name: dshm + resources: + requests: + cpu: 1000m + memory: 1024Mi + limits: + # The maximum memory that this pod is allowed to use. The + # limit will be detected by ray and split to use 10% for + # redis, 30% for the shared memory object store, and the + # rest for application memory. If this limit is not set and + # the object store size is not set manually, ray will + # allocate a very large object store in each pod that may + # cause problems for other pods. + memory: 1024Mi + head_node: + node_config: + apiVersion: v1 + kind: Pod + metadata: + # Automatically generates a name for the pod with this prefix. + generateName: example-cluster-ray-head- + # Must match the head node service selector above if a head node + # service is required. + labels: + parent: skypilot + component: example-cluster-ray-head + spec: + # Change this if you altered the autoscaler_service_account above + # or want to provide your own. + serviceAccountName: autoscaler + + restartPolicy: Never + + # This volume allocates shared memory for Ray to use for its plasma + # object store. If you do not provide this, Ray will fall back to + # /tmp which cause slowdowns if is not a shared memory volume. + volumes: + - name: secret-volume + secret: + secretName: ssh-key-secret + - name: dshm + emptyDir: + medium: Memory + containers: + - name: ray-node + imagePullPolicy: Never + image: skypilot:latest + # Do not change this command - it keeps the pod alive until it is + # explicitly killed. + command: ["/bin/bash", "-c", "--"] + args: ['trap : TERM INT; sleep infinity & wait;'] + ports: + - containerPort: 22 # Used for SSH + - containerPort: 6379 # Redis port + - containerPort: 10001 # Used by Ray Client + - containerPort: 8265 # Used by Ray Dashboard + + # This volume allocates shared memory for Ray to use for its plasma + # object store. If you do not provide this, Ray will fall back to + # /tmp which cause slowdowns if is not a shared memory volume. 
+ volumeMounts: + - name: secret-volume + readOnly: true + mountPath: "/etc/secret-volume" + - mountPath: /dev/shm + name: dshm + lifecycle: + postStart: + exec: + command: ["/bin/bash", "-c", "mkdir -p ~/.ssh && cp /etc/secret-volume/ssh-publickey ~/.ssh/authorized_keys && sudo service ssh restart"] + resources: + requests: + cpu: 1000m + memory: 1024Mi + limits: + # The maximum memory that this pod is allowed to use. The + # limit will be detected by ray and split to use 10% for + # redis, 30% for the shared memory object store, and the + # rest for application memory. If this limit is not set and + # the object store size is not set manually, ray will + # allocate a very large object store in each pod that may + # cause problems for other pods. + memory: 1024Mi + +# Command to start ray on the head node. You don't need to change this. +# Note dashboard-host is set to 0.0.0.0 so that kubernetes can port forward. +head_start_ray_commands: + - ray stop + - ulimit -n 65536; ray start --head --port=6379 --autoscaling-config=~/ray_bootstrap_config.yaml --dashboard-host 0.0.0.0 --object-store-memory 78643201 + +# Command to start ray on worker nodes. You don't need to change this. +worker_start_ray_commands: + - ray stop + - ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --object-store-memory 78643201 + + +auth: + ssh_user: sky + ssh_private_key: ~/.ssh/sky-key + +# These fields are required for external cloud providers. +setup_commands: [] +head_setup_commands: [] +worker_setup_commands: [] +cluster_synced_files: [] +file_mounts_sync_continuously: False +file_mounts: {} +initialization_commands: [] diff --git a/tests/kubernetes/scripts/run.sh b/tests/kubernetes/scripts/run.sh new file mode 100644 index 00000000000..d61b2442274 --- /dev/null +++ b/tests/kubernetes/scripts/run.sh @@ -0,0 +1,6 @@ +kubectl create secret generic ssh-key-secret --from-file=ssh-publickey=/Users/romilb/.ssh/sky-key.pub +kubectl apply -f skypilot_ssh_k8s_deployment.yaml +# Use kubectl describe service skypilot-service to get the port of the service +kubectl describe service skypilot-service | grep NodePort +echo Run the following command to ssh into the container: +echo ssh sky@127.0.0.1 -p port -i ~/.ssh/sky-key \ No newline at end of file diff --git a/tests/kubernetes/scripts/skypilot_ssh_k8s_deployment.yaml b/tests/kubernetes/scripts/skypilot_ssh_k8s_deployment.yaml new file mode 100644 index 00000000000..8929a916d2a --- /dev/null +++ b/tests/kubernetes/scripts/skypilot_ssh_k8s_deployment.yaml @@ -0,0 +1,54 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: skypilot-deployment + labels: + app: skypilot +spec: + replicas: 1 + selector: + matchLabels: + app: skypilot + template: + metadata: + labels: + app: skypilot + spec: + volumes: + - name: secret-volume + secret: + secretName: ssh-key-secret + containers: + - name: skypilot + image: us-central1-docker.pkg.dev/skypilot-375900/skypilotk8s/skypilot:latest + imagePullPolicy: Never + env: + - name: SECRET_THING + valueFrom: + secretKeyRef: + name: ssh-key-secret + key: ssh-publickey + ports: + - containerPort: 22 + command: ["/bin/bash", "-c", "sleep 1000000000"] + volumeMounts: + - name: secret-volume + readOnly: true + mountPath: "/etc/secret-volume" + lifecycle: + postStart: + exec: + command: ["/bin/bash", "-c", "mkdir -p ~/.ssh && cp /etc/secret-volume/ssh-publickey ~/.ssh/authorized_keys && sudo service ssh restart"] +--- +apiVersion: v1 +kind: Service +metadata: + name: skypilot-service +spec: + type: NodePort + selector: + app: skypilot + 
ports: + - protocol: TCP + port: 22 + targetPort: 22 diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 89b2626abba..cdd7f0032e3 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -638,6 +638,7 @@ def test_image_no_conda(): # ------------ Test stale job ------------ @pytest.mark.no_lambda_cloud # Lambda Cloud does not support stopping instances +@pytest.mark.no_kubernetes # Kubernetes does not support stopping instances def test_stale_job(generic_cloud: str): name = _get_cluster_name() test = Test( @@ -716,6 +717,7 @@ def test_gcp_stale_job_manual_restart(): # ---------- Check Sky's environment variables; workdir. ---------- @pytest.mark.no_scp # SCP does not support num_nodes > 1 yet +@pytest.mark.no_kubernetes # K8s does not support num_nodes > 1 yet def test_env_check(generic_cloud: str): name = _get_cluster_name() test = Test( @@ -733,9 +735,15 @@ def test_env_check(generic_cloud: str): @pytest.mark.no_scp # SCP does not support num_nodes > 1 yet. Run test_scp_file_mounts instead. def test_file_mounts(generic_cloud: str): name = _get_cluster_name() + extra_flags = '' + if generic_cloud in 'kubernetes': + # Kubernetes does not support multi-node + # NOTE: This test will fail if you have a Kubernetes cluster running on + # arm64 (e.g., Apple Silicon) since goofys does not work on arm64. + extra_flags = '--num-nodes 1' test_commands = [ *storage_setup_commands, - f'sky launch -y -c {name} --cloud {generic_cloud} examples/using_file_mounts.yaml', + f'sky launch -y -c {name} --cloud {generic_cloud} {extra_flags} examples/using_file_mounts.yaml', f'sky logs {name} 1 --status', # Ensure the job succeeded. ] test = Test( @@ -792,7 +800,7 @@ def test_aws_storage_mounts(): name = _get_cluster_name() storage_name = f'sky-test-{int(time.time())}' template_str = pathlib.Path( - 'tests/test_yamls/test_storage_mounting.yaml').read_text() + 'tests/test_yamls/test_storage_mounting.yaml.j2').read_text() template = jinja2.Template(template_str) content = template.render(storage_name=storage_name) with tempfile.NamedTemporaryFile(suffix='.yaml', mode='w') as f: @@ -819,7 +827,7 @@ def test_gcp_storage_mounts(): name = _get_cluster_name() storage_name = f'sky-test-{int(time.time())}' template_str = pathlib.Path( - 'tests/test_yamls/test_storage_mounting.yaml').read_text() + 'tests/test_yamls/test_storage_mounting.yaml.j2').read_text() template = jinja2.Template(template_str) content = template.render(storage_name=storage_name) with tempfile.NamedTemporaryFile(suffix='.yaml', mode='w') as f: @@ -841,6 +849,36 @@ def test_gcp_storage_mounts(): run_one_test(test) +@pytest.mark.kubernetes +def test_kubernetes_storage_mounts(): + # Tests bucket mounting on k8s, assuming S3 is configured. + # This test will fail if run on non x86_64 architecture, since goofys is + # built for x86_64 only. + name = _get_cluster_name() + storage_name = f'sky-test-{int(time.time())}' + template_str = pathlib.Path( + 'tests/test_yamls/test_storage_mounting.yaml.j2').read_text() + template = jinja2.Template(template_str) + content = template.render(storage_name=storage_name) + with tempfile.NamedTemporaryFile(suffix='.yaml', mode='w') as f: + f.write(content) + f.flush() + file_path = f.name + test_commands = [ + *storage_setup_commands, + f'sky launch -y -c {name} --cloud kubernetes {file_path}', + f'sky logs {name} 1 --status', # Ensure job succeeded. 
+ f'aws s3 ls {storage_name}/hello.txt', + ] + test = Test( + 'kubernetes_storage_mounts', + test_commands, + f'sky down -y {name}; sky storage delete {storage_name}', + timeout=20 * 60, # 20 mins + ) + run_one_test(test) + + @pytest.mark.cloudflare def test_cloudflare_storage_mounts(generic_cloud: str): name = _get_cluster_name() @@ -903,11 +941,15 @@ def test_ibm_storage_mounts(): @pytest.mark.no_scp # SCP does not support num_nodes > 1 yet. Run test_scp_logs instead. def test_cli_logs(generic_cloud: str): name = _get_cluster_name() + num_nodes = 2 + if generic_cloud == 'kubernetes': + # Kubernetes does not support multi-node + num_nodes = 1 timestamp = time.time() test = Test( 'cli_logs', [ - f'sky launch -y -c {name} --cloud {generic_cloud} --num-nodes 2 "echo {timestamp} 1"', + f'sky launch -y -c {name} --cloud {generic_cloud} --num-nodes {num_nodes} "echo {timestamp} 1"', f'sky exec {name} "echo {timestamp} 2"', f'sky exec {name} "echo {timestamp} 3"', f'sky exec {name} "echo {timestamp} 4"', @@ -949,6 +991,7 @@ def test_scp_logs(): @pytest.mark.no_ibm # IBM Cloud does not have K80 gpus. run test_ibm_job_queue instead @pytest.mark.no_scp # SCP does not have K80 gpus. Run test_scp_job_queue instead @pytest.mark.no_oci # OCI does not have K80 gpus +@pytest.mark.no_kubernetes # Kubernetes does not have gpus def test_job_queue(generic_cloud: str): name = _get_cluster_name() test = Test( @@ -1050,6 +1093,7 @@ def test_scp_job_queue(): @pytest.mark.no_ibm # IBM Cloud does not have T4 gpus. run test_ibm_job_queue_multinode instead @pytest.mark.no_scp # SCP does not support num_nodes > 1 yet @pytest.mark.no_oci # OCI Cloud does not have T4 gpus. +@pytest.mark.no_kubernetes # Kubernetes does not have gpus def test_job_queue_multinode(generic_cloud: str): name = _get_cluster_name() test = Test( @@ -1092,7 +1136,7 @@ def test_large_job_queue(generic_cloud: str): test = Test( 'large_job_queue', [ - f'sky launch -y -c {name} --cloud {generic_cloud}', + f'sky launch -y -c {name} --cpus 8 --cloud {generic_cloud}', f'for i in `seq 1 75`; do sky exec {name} -n {name}-$i -d "echo $i; sleep 100000000"; done', f'sky cancel -y {name} 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16', 'sleep 75', @@ -1136,7 +1180,7 @@ def test_fast_large_job_queue(generic_cloud: str): test = Test( 'fast_large_job_queue', [ - f'sky launch -y -c {name} --cloud {generic_cloud}', + f'sky launch -y -c {name} --cpus 8 --cloud {generic_cloud}', f'for i in `seq 1 32`; do sky exec {name} -n {name}-$i -d "echo $i"; done', 'sleep 60', f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep -v grep | grep SUCCEEDED | wc -l | grep 32', @@ -1191,6 +1235,7 @@ def test_ibm_job_queue_multinode(): @pytest.mark.no_ibm # IBM Cloud does not have K80 gpus @pytest.mark.no_scp # SCP does not support num_nodes > 1 yet @pytest.mark.no_oci # OCI Cloud does not have K80 gpus +@pytest.mark.no_kubernetes # Kubernetes does not have gpus def test_multi_echo(generic_cloud: str): name = _get_cluster_name() test = Test( @@ -1214,6 +1259,7 @@ def test_multi_echo(generic_cloud: str): @pytest.mark.no_lambda_cloud # Lambda Cloud does not have V100 gpus @pytest.mark.no_ibm # IBM cloud currently doesn't provide public image with CUDA @pytest.mark.no_scp # SCP does not have V100 (16GB) GPUs. Run test_scp_huggingface instead. +@pytest.mark.no_kubernetes # Kubernetes does not have gpus def test_huggingface(generic_cloud: str): name = _get_cluster_name() test = Test( @@ -1324,6 +1370,7 @@ def test_tpu_vm_pod(): # ---------- Simple apps.
---------- @pytest.mark.no_scp # SCP does not support num_nodes > 1 yet +@pytest.mark.no_kubernetes # Kubernetes does not support num_nodes > 1 yet def test_multi_hostname(generic_cloud: str): name = _get_cluster_name() test = Test( @@ -1378,6 +1425,7 @@ def test_aws_http_server_with_custom_ports(): @pytest.mark.no_lambda_cloud # Lambda Cloud does not have V100 gpus @pytest.mark.no_ibm # IBM cloud currently doesn't provide public image with CUDA @pytest.mark.no_scp # SCP does not support num_nodes > 1 yet +@pytest.mark.no_kubernetes # Kubernetes does not support num_nodes > 1 yet @pytest.mark.skip( reason= 'The resnet_distributed_tf_app is flaky, due to it failing to detect GPUs.') @@ -1451,6 +1499,7 @@ def test_azure_start_stop(): @pytest.mark.no_lambda_cloud # Lambda Cloud does not support stopping instances @pytest.mark.no_ibm # FIX(IBM) sporadically fails, as restarted workers stay uninitialized indefinitely @pytest.mark.no_scp # SCP does not support num_nodes > 1 yet +@pytest.mark.no_kubernetes # Kubernetes does not support autostop yet def test_autostop(generic_cloud: str): name = _get_cluster_name() test = Test( @@ -1507,6 +1556,7 @@ def test_autostop(generic_cloud: str): # ---------- Testing Autodowning ---------- @pytest.mark.no_scp # SCP does not support num_nodes > 1 yet. Run test_scp_autodown instead. +@pytest.mark.no_kubernetes # Kubernetes does not support num_nodes > 1 yet. Run test_kubernetes_autodown instead. def test_autodown(generic_cloud: str): name = _get_cluster_name() test = Test( @@ -1576,6 +1626,41 @@ def test_scp_autodown(): run_one_test(test) + +@pytest.mark.kubernetes +def test_kubernetes_autodown(): + name = _get_cluster_name() + test = Test( + 'kubernetes_autodown', + [ + f'sky launch -y -d -c {name} --cloud kubernetes tests/test_yamls/minimal.yaml', + f'sky autostop -y {name} --down -i 1', + # Ensure autostop is set. + f'sky status | grep {name} | grep "1m (down)"', + # Ensure the cluster is not terminated early. + 'sleep 45', + f'sky status --refresh | grep {name} | grep UP', + # Ensure the cluster is terminated. + 'sleep 200', + f's=$(SKYPILOT_DEBUG=0 sky status --refresh) && printf "$s" && {{ echo "$s" | grep {name} | grep "Autodowned cluster\|terminated on the cloud"; }} || {{ echo "$s" | grep {name} && exit 1 || exit 0; }}', + f'sky launch -y -d -c {name} --cloud kubernetes --down tests/test_yamls/minimal.yaml', + f'sky status | grep {name} | grep UP', # Ensure the cluster is UP. + f'sky exec {name} --cloud kubernetes tests/test_yamls/minimal.yaml', + f'sky status | grep {name} | grep "1m (down)"', + 'sleep 200', + # Ensure the cluster is terminated. + f's=$(SKYPILOT_DEBUG=0 sky status --refresh) && printf "$s" && {{ echo "$s" | grep {name} | grep "Autodowned cluster\|terminated on the cloud"; }} || {{ echo "$s" | grep {name} && exit 1 || exit 0; }}', + f'sky launch -y -d -c {name} --cloud kubernetes --down tests/test_yamls/minimal.yaml', + f'sky autostop -y {name} --cancel', + 'sleep 200', + # Ensure the cluster is still UP.
+ f's=$(SKYPILOT_DEBUG=0 sky status --refresh) && printf "$s" && echo "$s" | grep {name} | grep UP', + ], + f'sky down -y {name}', + timeout=25 * 60, + ) + run_one_test(test) + + def _get_cancel_task_with_cloud(name, cloud, timeout=15 * 60): test = Test( f'{cloud}-cancel-task', @@ -1628,6 +1713,7 @@ def test_cancel_azure(): @pytest.mark.no_lambda_cloud # Lambda Cloud does not have V100 gpus @pytest.mark.no_ibm # IBM cloud currently doesn't provide public image with CUDA @pytest.mark.no_scp # SCP does not support num_nodes > 1 yet +@pytest.mark.no_kubernetes # Kubernetes does not support GPU yet def test_cancel_pytorch(generic_cloud: str): name = _get_cluster_name() test = Test( @@ -1676,6 +1762,7 @@ def test_cancel_ibm(): @pytest.mark.no_lambda_cloud # Lambda Cloud does not support spot instances @pytest.mark.no_ibm # IBM Cloud does not support spot instances @pytest.mark.no_scp # SCP does not support spot instances +@pytest.mark.no_kubernetes # Kubernetes does not have a notion of spot instances def test_use_spot(generic_cloud: str): """Test use-spot and sky exec.""" name = _get_cluster_name() @@ -1696,6 +1783,7 @@ def test_use_spot(generic_cloud: str): @pytest.mark.no_lambda_cloud # Lambda Cloud does not support spot instances @pytest.mark.no_ibm # IBM Cloud does not support spot instances @pytest.mark.no_scp # SCP does not support spot instances +@pytest.mark.no_kubernetes # Kubernetes does not have a notion of spot instances @pytest.mark.managed_spot def test_spot(generic_cloud: str): """Test the spot yaml.""" @@ -1728,6 +1816,7 @@ def test_spot(generic_cloud: str): @pytest.mark.no_lambda_cloud # Lambda Cloud does not support spot instances @pytest.mark.no_ibm # IBM Cloud does not support spot instances @pytest.mark.no_scp # SCP does not support spot instances +@pytest.mark.no_kubernetes # Kubernetes does not have a notion of spot instances @pytest.mark.managed_spot def test_spot_pipeline(generic_cloud: str): """Test a spot pipeline.""" @@ -1766,6 +1855,7 @@ def test_spot_pipeline(generic_cloud: str): @pytest.mark.no_lambda_cloud # Lambda Cloud does not support spot instances @pytest.mark.no_ibm # IBM Cloud does not support spot instances @pytest.mark.no_scp # SCP does not support spot instances +@pytest.mark.no_kubernetes # Kubernetes does not have a notion of spot instances @pytest.mark.managed_spot def test_spot_failed_setup(generic_cloud: str): """Test managed spot job with failed setup.""" @@ -1788,6 +1878,7 @@ def test_spot_failed_setup(generic_cloud: str): @pytest.mark.no_lambda_cloud # Lambda Cloud does not support spot instances @pytest.mark.no_ibm # IBM Cloud does not support spot instances @pytest.mark.no_scp # SCP does not support spot instances +@pytest.mark.no_kubernetes # Kubernetes does not have a notion of spot instances @pytest.mark.managed_spot def test_spot_pipeline_failed_setup(generic_cloud: str): """Test managed spot job with failed setup for a pipeline.""" @@ -1966,6 +2057,7 @@ def test_spot_pipeline_recovery_gcp(): @pytest.mark.no_lambda_cloud # Lambda Cloud does not support spot instances @pytest.mark.no_ibm # IBM Cloud does not support spot instances @pytest.mark.no_scp # SCP does not support spot instances +@pytest.mark.no_kubernetes # Kubernetes does not have a notion of spot instances @pytest.mark.managed_spot def test_spot_recovery_default_resources(generic_cloud: str): """Test managed spot recovery for default resources.""" @@ -2172,6 +2264,7 @@ def test_spot_cancellation_gcp(): @pytest.mark.no_lambda_cloud # Lambda Cloud does not support spot instances 
@pytest.mark.no_ibm # IBM Cloud does not support spot instances @pytest.mark.no_scp # SCP does not support spot instances +@pytest.mark.no_kubernetes # Kubernetes does not have a notion of spot instances @pytest.mark.managed_spot def test_spot_storage(generic_cloud: str): """Test storage with managed spot""" @@ -2226,6 +2319,7 @@ def test_spot_tpu(): @pytest.mark.no_lambda_cloud # Lambda Cloud does not support spot instances @pytest.mark.no_ibm # IBM Cloud does not support spot instances @pytest.mark.no_scp # SCP does not support spot instances +@pytest.mark.no_kubernetes # Kubernetes does not have a notion of spot instances @pytest.mark.managed_spot def test_spot_inline_env(generic_cloud: str): """Test spot env""" @@ -2441,12 +2535,13 @@ def test_gcp_zero_quota_failover(): # ------- Testing user ray cluster -------- -def test_user_ray_cluster(): +@pytest.mark.no_kubernetes # Kubernetes does not support sky status -r yet. +def test_user_ray_cluster(generic_cloud: str): name = _get_cluster_name() test = Test( 'user-ray-cluster', [ - f'sky launch -y -c {name} "ray start --head"', + f'sky launch -y -c {name} --cloud {generic_cloud} "ray start --head"', f'sky exec {name} "echo hi"', f'sky logs {name} 1 --status', f'sky status -r | grep {name} | grep UP', diff --git a/tests/test_yamls/test_storage_mounting.yaml b/tests/test_yamls/test_storage_mounting.yaml.j2 similarity index 100% rename from tests/test_yamls/test_storage_mounting.yaml rename to tests/test_yamls/test_storage_mounting.yaml.j2
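
For reference, a minimal sketch of how the SSH NodePort service from the cluster template above can be used once the resources in this diff are deployed, mirroring the flow in tests/kubernetes/scripts/run.sh. It assumes the example-cluster-ray-head-ssh service and the ssh-key-secret defined above exist in the current kubectl context; the jsonpath lookup is standard kubectl behavior, not something added by this change:

    # Look up the NodePort that the example-cluster-ray-head-ssh service assigns to port 22.
    NODE_PORT=$(kubectl get service example-cluster-ray-head-ssh \
      -o jsonpath='{.spec.ports[0].nodePort}')
    echo "SSH NodePort: ${NODE_PORT}"

    # SSH into the head pod as the 'sky' user, using the key whose public half was
    # mounted from ssh-key-secret (replace 127.0.0.1 with a node IP if the cluster
    # is not running locally).
    ssh sky@127.0.0.1 -p "${NODE_PORT}" -i ~/.ssh/sky-key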