Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement fleet apply plan #1765

Merged
merged 8 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions src/dstack/_internal/cli/commands/apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,21 @@ def _register(self):
help="The path to the configuration file. Defaults to [code]$PWD/.dstack.yml[/]",
dest="configuration_file",
)
self._parser.add_argument(
"-y",
"--yes",
help="Do not ask for confirmation",
action="store_true",
)
self._parser.add_argument(
"--force",
help="Force apply when no changes detected",
action="store_true",
)
self._parser.add_argument(
"-y",
"--yes",
help="Do not ask for confirmation",
"-d",
"--detach",
help="Exit immediately after sumbitting configuration",
action="store_true",
)

Expand Down
273 changes: 223 additions & 50 deletions src/dstack/_internal/cli/services/configurators/fleet.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,34 @@
from pathlib import Path
from typing import List, Optional

import requests
from rich.live import Live
from rich.table import Table

from dstack._internal.cli.services.configurators.base import (
ApplyEnvVarsConfiguratorMixin,
BaseApplyConfigurator,
)
from dstack._internal.cli.utils.common import confirm_ask, console
from dstack._internal.cli.utils.fleet import print_fleets_table
from dstack._internal.cli.utils.common import (
LIVE_TABLE_PROVISION_INTERVAL_SECS,
LIVE_TABLE_REFRESH_RATE_PER_SEC,
confirm_ask,
console,
)
from dstack._internal.cli.utils.fleet import get_fleets_table
from dstack._internal.core.errors import ConfigurationError, ResourceNotExistsError
from dstack._internal.core.models.configurations import ApplyConfigurationType
from dstack._internal.core.models.fleets import FleetConfiguration, FleetSpec
from dstack._internal.core.models.instances import SSHKey
from dstack._internal.core.models.fleets import (
Fleet,
FleetConfiguration,
FleetPlan,
FleetSpec,
InstanceGroupPlacement,
)
from dstack._internal.core.models.instances import InstanceAvailability, InstanceStatus, SSHKey
from dstack._internal.utils.logging import get_logger
from dstack._internal.utils.ssh import convert_ssh_key_to_pem, generate_public_key, pkey_from_str
from dstack.api._public import Client
from dstack.api.utils import load_profile

logger = get_logger(__name__)
Expand All @@ -39,63 +55,85 @@ def apply_configuration(
profile = load_profile(Path.cwd(), None)
spec = FleetSpec(
configuration=conf,
configuration_path=configuration_path,
profile=profile,
)
_preprocess_spec(spec)
confirmed = False
if conf.name is not None:
try:
fleet = self.api.client.fleets.get(
project_name=self.api.project,
name=conf.name,

with console.status("Getting apply plan..."):
plan = _get_plan(api=self.api, spec=spec)
_print_plan_header(plan)

action_message = ""
confirm_message = ""
if plan.current_resource is None:
if plan.spec.configuration.name is not None:
action_message += (
f"Fleet [code]{plan.spec.configuration.name}[/] does not exist yet."
)
except ResourceNotExistsError:
pass
else:
if fleet.spec.configuration == conf:
if not command_args.force:
console.print(
"Fleet configuration has not changed. Use --force to recreate the fleet."
)
return
if not command_args.yes and not confirm_ask(
"Fleet configuration has not changed. Re-create the fleet?"
):
console.print("\nExiting...")
return
elif not command_args.yes and not confirm_ask(
f"Fleet [code]{conf.name}[/] already exists. Re-create the fleet?"
):
console.print("\nExiting...")
confirm_message += "Create the fleet?"
else:
action_message += f"Found fleet [code]{plan.spec.configuration.name}[/]."
if plan.current_resource.spec == plan.spec:
if command_args.yes and not command_args.force:
# --force is required only with --yes,
# otherwise we may ask for force apply interactively.
console.print(
"No configuration changes detected. Use --force to apply anyway."
)
return
confirmed = True
with console.status("Deleting fleet..."):
self.api.client.fleets.delete(project_name=self.api.project, names=[conf.name])
# Fleet deletion is async. Wait for fleet to be deleted.
while True:
try:
self.api.client.fleets.get(
project_name=self.api.project, name=conf.name
)
except ResourceNotExistsError:
break
else:
time.sleep(1)
if not confirmed and not command_args.yes:
confirm_message = "Configuration does not specify the fleet name. Create a new fleet?"
if conf.name is not None:
confirm_message = (
f"Fleet [code]{conf.name}[/] does not exist yet. Create the fleet?"
action_message += " No configuration changes detected."
confirm_message += "Re-create the fleet?"
else:
action_message += " Configuration changes detected."
confirm_message += "Re-create the fleet?"

console.print(action_message)
if not command_args.yes and not confirm_ask(confirm_message):
console.print("\nExiting...")
return

if plan.current_resource is not None:
with console.status("Deleting existing fleet..."):
self.api.client.fleets.delete(
project_name=self.api.project, names=[plan.current_resource.name]
)
if not confirm_ask(confirm_message):
console.print("\nExiting...")
return
# Fleet deletion is async. Wait for fleet to be deleted.
while True:
try:
self.api.client.fleets.get(
project_name=self.api.project, name=plan.current_resource.name
)
except ResourceNotExistsError:
break
else:
time.sleep(1)

with console.status("Creating fleet..."):
fleet = self.api.client.fleets.create(
project_name=self.api.project,
spec=spec,
)
print_fleets_table([fleet])
if command_args.detach:
console.print("Fleet configuration submitted. Exiting...")
return
console.print()
try:
with Live(console=console, refresh_per_second=LIVE_TABLE_REFRESH_RATE_PER_SEC) as live:
while True:
live.update(get_fleets_table([fleet], verbose=True))
if _finished_provisioning(fleet):
break
time.sleep(LIVE_TABLE_PROVISION_INTERVAL_SECS)
fleet = self.api.client.fleets.get(self.api.project, fleet.name)
except KeyboardInterrupt:
if confirm_ask("Delete the fleet before exiting?"):
with console.status("Deleting fleet..."):
self.api.client.fleets.delete(
project_name=self.api.project, names=[fleet.name]
)
else:
console.print("Exiting... Fleet provisioning will continue in the background.")

def delete_configuration(
self,
Expand Down Expand Up @@ -168,3 +206,138 @@ def _resolve_ssh_key(ssh_key_path: Optional[str]) -> Optional[SSHKey]:
logger.debug("Key type is not supported", repr(e))
console.print("[error]Key type is not supported[/]")
exit()


def _print_plan_header(plan: FleetPlan):
def th(s: str) -> str:
return f"[bold]{s}[/bold]"

configuration_table = Table(box=None, show_header=False)
configuration_table.add_column(no_wrap=True) # key
configuration_table.add_column() # value

configuration_table.add_row(th("Project"), plan.project_name)
configuration_table.add_row(th("User"), plan.user)
configuration_table.add_row(th("Configuration"), plan.spec.configuration_path)
configuration_table.add_row(th("Type"), "fleet")

fleet_type = "cloud"
nodes = plan.spec.configuration.nodes or "-"
placement = plan.spec.configuration.placement or InstanceGroupPlacement.ANY
backends = None
if plan.spec.configuration.backends is not None:
backends = ", ".join(b.value for b in plan.spec.configuration.backends)
regions = None
if plan.spec.configuration.regions is not None:
regions = ", ".join(plan.spec.configuration.regions)
resources = None
if plan.spec.configuration.resources is not None:
resources = plan.spec.configuration.resources.pretty_format()
spot_policy = plan.spec.merged_profile.spot_policy
if plan.spec.configuration.ssh_config is not None:
fleet_type = "ssh"
nodes = len(plan.spec.configuration.ssh_config.hosts)
resources = None
spot_policy = None

configuration_table.add_row(th("Fleet type"), fleet_type)
configuration_table.add_row(th("Nodes"), str(nodes))
configuration_table.add_row(th("Placement"), placement.value)
if backends is not None:
configuration_table.add_row(th("Backends"), str(backends))
if regions is not None:
configuration_table.add_row(th("Regions"), str(regions))
if resources is not None:
configuration_table.add_row(th("Resources"), resources)
if spot_policy is not None:
configuration_table.add_row(th("Spot policy"), spot_policy)

offers_table = Table(box=None)
offers_table.add_column("#")
offers_table.add_column("BACKEND")
offers_table.add_column("REGION")
offers_table.add_column("INSTANCE")
offers_table.add_column("RESOURCES")
offers_table.add_column("SPOT")
offers_table.add_column("PRICE")
offers_table.add_column()

offers_limit = 3
print_offers = plan.offers[:offers_limit]

for index, offer in enumerate(print_offers, start=1):
resources = offer.instance.resources

availability = ""
if offer.availability in {
InstanceAvailability.NOT_AVAILABLE,
InstanceAvailability.NO_QUOTA,
}:
availability = offer.availability.value.replace("_", " ").title()
offers_table.add_row(
f"{index}",
offer.backend.replace("remote", "ssh"),
offer.region,
offer.instance.name,
resources.pretty_format(),
"yes" if resources.spot else "no",
f"${offer.price:g}",
availability,
style=None if index == 1 else "secondary",
)
if len(plan.offers) > offers_limit:
offers_table.add_row("", "...", style="secondary")

console.print(configuration_table)
console.print()

if len(print_offers) > 0:
console.print(offers_table)
if len(plan.offers) > offers_limit:
console.print(
f"[secondary] Shown {len(print_offers)} of {plan.total_offers} offers, "
f"${plan.max_offer_price:g} max[/]"
)
console.print()


def _get_plan(api: Client, spec: FleetSpec) -> FleetPlan:
try:
return api.client.fleets.get_plan(
project_name=api.project,
spec=spec,
)
except requests.exceptions.HTTPError as e:
# Handle older server versions that do not have /get_plan for fleets
# TODO: Can be removed in 0.19
if e.response.status_code == 405:
logger.warning(
"Fleet apply plan is not fully supported before 0.18.17. "
"Update the server to view full-featured apply plan."
)
spec.configuration_path = None
current_fleet = None
if spec.configuration.name is not None:
try:
current_fleet = api.client.fleets.get(
project_name=api.project, name=spec.configuration.name
)
except ResourceNotExistsError:
pass
return FleetPlan(
project_name=api.project,
user="?",
spec=spec,
current_resource=current_fleet,
offers=[],
total_offers=0,
max_offer_price=0,
)
raise e


def _finished_provisioning(fleet: Fleet) -> bool:
for instance in fleet.instances:
if instance.status in [InstanceStatus.PENDING, InstanceStatus.PROVISIONING]:
return False
return True
12 changes: 3 additions & 9 deletions src/dstack/_internal/cli/services/configurators/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def apply_configuration(
repo_config = ConfigManager().get_repo_config_or_error(repo.get_repo_dir_or_error())
self.api.ssh_identity_file = repo_config.ssh_key_path
profile = load_profile(Path.cwd(), configurator_args.profile)
with console.status("Getting run plan..."):
with console.status("Getting apply plan..."):
run_plan = self.api.runs.get_plan(
configuration=conf,
repo=repo,
Expand Down Expand Up @@ -102,12 +102,12 @@ def apply_configuration(
try:
with console.status("Submitting run..."):
run = self.api.runs.exec_plan(
run_plan, repo, reserve_ports=not configurator_args.detach
run_plan, repo, reserve_ports=not command_args.detach
)
except ServerClientError as e:
raise CLIError(e.msg)

if configurator_args.detach:
if command_args.detach:
console.print(f"Run [code]{run.name}[/] submitted, detaching...")
return

Expand Down Expand Up @@ -230,12 +230,6 @@ def register_args(cls, parser: argparse.ArgumentParser):
dest="run_name",
help="The name of the run. If not specified, a random name is assigned",
)
parser.add_argument(
"-d",
"--detach",
help="Do not poll logs and run status",
action="store_true",
)
parser.add_argument(
"--max-offers",
help="Number of offers to show in the run plan",
Expand Down
6 changes: 3 additions & 3 deletions src/dstack/_internal/cli/utils/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ def print_run_plan(run_plan: RunPlan, offers_limit: int = 3):
def th(s: str) -> str:
return f"[bold]{s}[/bold]"

props.add_row(th("Configuration"), run_plan.run_spec.configuration_path)
props.add_row(th("Project"), run_plan.project_name)
props.add_row(th("User"), run_plan.user)
props.add_row(th("Pool"), profile.pool_name)
props.add_row(th("Min resources"), pretty_req)
props.add_row(th("Configuration"), run_plan.run_spec.configuration_path)
props.add_row(th("Type"), run_plan.run_spec.configuration.type)
props.add_row(th("Resources"), pretty_req)
props.add_row(th("Max price"), max_price)
props.add_row(th("Max duration"), max_duration)
props.add_row(th("Spot policy"), spot_policy)
Expand Down
Loading