Skip to content

Commit

Permalink
Merge branch 'main' into ci/enable_azure
Browse files Browse the repository at this point in the history
  • Loading branch information
parasj committed Sep 10, 2022
2 parents 4a4ee44 + 3767b2e commit 8a30e2f
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 6 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ You can use Skyplane to transfer data:
* Between object stores across multiple cloud providers
* (experimental) Between local storage and cloud object stores

Skyplane currently only supports MacOS and Linux. For Windows, first [install Linux on Windows with WSL](https://docs.microsoft.com/en-us/windows/wsl/install) to run Skyplane.

# Getting started

## Installation
Expand Down
16 changes: 16 additions & 0 deletions docs/configure.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ Skyplane comes with a variety of knobs to tune to adjust performance or change h
* CLI configuration
* `autoconfirm`: If set, it will not ask for you to confirm the transfers from the CLI. (default False)
* `autoshutdown_minutes`: If set, VMs will automatically shut down after this time in minutes. (default 15)
* `usage_stats`: If set, Skyplane will send aggregate performance statistics for a collective throughput grid. (default True)
* Transfer parallelism
* `max_instances`: Maximum number of instances to use for parallel transfers. (default 10)
* Network configuration
* `bbr`: If set, the VM will use BBR congestion control instead of CUBIC. (default False)
* `compress`: If set, gateway VMs will compress data before egress to reduce costs. (default True)
* `encrypt_e2e`: If set, gateway VMs will encrypt data end-to-end. (default True)
* `encrypt_socket_tls`: If set, all sockets between gateways will be encrypted with TLS. (default False)
Expand All @@ -21,9 +23,13 @@ Skyplane comes with a variety of knobs to tune to adjust performance or change h
* `multipart_max_chunks`: Maximum number of chunks for multipart transfers. (default 9990).
* Instance provisioning configuration
* `aws_instance_class`: AWS instance class to use for provisioning. (default m5.8xlarge)
* `aws_use_spot_instances`: If set, AWS will use spot instances instead of on-demand instances. (default False)
* `azure_instance_class`: Azure instance class to use for provisioning. (default Standard_D32_v4)
* `azure_use_spot_instances`: If set, Azure will use spot instances instead of on-demand instances. (default False)
* `gcp_instance_class`: GCP instance class to use for provisioning. (default n2-standard-32)
* `gcp_use_premium_network`: If set, will provision VMs on GCP's premium network tier. (default True)
* `gcp_service_account_name`: GCP service account name to use for provisioning. (default skyplane-manual)
* `gcp_use_spot_instances`: If set, GCP will use spot instances instead of on-demand instances. (default False)
```

## Increasing performance of transfers via paralllelism
Expand Down Expand Up @@ -62,6 +68,16 @@ To ensure that all gateways are stopped and no longer incur charges, run:
$ skyplane deprovision
```

## Spot Instances to reduce instance costs
Spot instances reduce the cost of provisioning VMs. These instances are charged at a lower price than on-demand instances but can be preempted at any time. If this occurs, the transfer will fail.

To use spot instances, run:
```bash
$ skyplane config set aws_use_spot_instances True
$ skyplane config set azure_use_spot_instances True
$ skyplane config set gcp_use_spot_instances True
```

## Configuring networking between gateways
Skyplane supports encrypting data end-to-end. This is useful for encrypting data that is stored on a local disk. We enable end-to-end encryption by default. To disable it, run:
```bash
Expand Down
6 changes: 6 additions & 0 deletions skyplane/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,12 @@ def cp(
use_e2ee=cloud_config.get_flag("encrypt_e2e") if src_region != dst_region else False,
use_socket_tls=cloud_config.get_flag("encrypt_socket_tls") if src_region != dst_region else False,
aws_instance_class=cloud_config.get_flag("aws_instance_class"),
aws_use_spot_instances=cloud_config.get_flag("aws_use_spot_instances"),
azure_instance_class=cloud_config.get_flag("azure_instance_class"),
azure_use_spot_instances=cloud_config.get_flag("azure_use_spot_instances"),
gcp_instance_class=cloud_config.get_flag("gcp_instance_class"),
gcp_use_premium_network=cloud_config.get_flag("gcp_use_premium_network"),
gcp_use_spot_instances=cloud_config.get_flag("gcp_use_spot_instances"),
multipart_enabled=multipart,
multipart_min_threshold_mb=cloud_config.get_flag("multipart_min_threshold_mb"),
multipart_min_size_mb=cloud_config.get_flag("multipart_min_size_mb"),
Expand Down Expand Up @@ -387,9 +390,12 @@ def sync(
use_e2ee=cloud_config.get_flag("encrypt_e2e") if src_region != dst_region else False,
use_socket_tls=cloud_config.get_flag("encrypt_socket_tls") if src_region != dst_region else False,
aws_instance_class=cloud_config.get_flag("aws_instance_class"),
aws_use_spot_instances=cloud_config.get_flag("aws_use_spot_instances"),
azure_instance_class=cloud_config.get_flag("azure_instance_class"),
azure_use_spot_instances=cloud_config.get_flag("azure_use_spot_instances"),
gcp_instance_class=cloud_config.get_flag("gcp_instance_class"),
gcp_use_premium_network=cloud_config.get_flag("gcp_use_premium_network"),
gcp_use_spot_instances=cloud_config.get_flag("gcp_use_spot_instances"),
multipart_enabled=multipart,
multipart_min_threshold_mb=cloud_config.get_flag("multipart_min_threshold_mb"),
multipart_min_size_mb=cloud_config.get_flag("multipart_min_size_mb"),
Expand Down
12 changes: 11 additions & 1 deletion skyplane/cli/cli_impl/cp_replicate.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,11 @@ def launch_replication_job(
multipart_min_size_mb: int = 8,
multipart_max_chunks: int = 9990,
# cloud provider specific options
aws_use_spot_instances: bool = False,
aws_instance_class: str = "m5.8xlarge",
azure_use_spot_instances: bool = False,
azure_instance_class: str = "Standard_D32_v4",
gcp_use_spot_instances: bool = False,
gcp_instance_class: str = "n2-standard-32",
gcp_use_premium_network: bool = True,
# logging options
Expand Down Expand Up @@ -289,7 +292,14 @@ def launch_replication_job(
stats = TransferStats.empty()
try:
rc.provision_gateways(
reuse_gateways, use_bbr=use_bbr, use_compression=use_compression, use_e2ee=use_e2ee, use_socket_tls=use_socket_tls
reuse_gateways,
use_bbr=use_bbr,
use_compression=use_compression,
use_e2ee=use_e2ee,
use_socket_tls=use_socket_tls,
aws_use_spot_instances=aws_use_spot_instances,
azure_use_spot_instances=azure_use_spot_instances,
gcp_use_spot_instances=gcp_use_spot_instances,
)
for node, gw in rc.bound_nodes.items():
logger.fs.info(f"Log URLs for {gw.uuid()} ({node.region}:{node.instance})")
Expand Down
6 changes: 6 additions & 0 deletions skyplane/compute/aws/aws_cloud_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ def provision_instance(
tags={"skyplane": "true"},
ebs_volume_size: int = 128,
iam_name: str = "skyplane_gateway",
use_spot_instances: bool = False,
) -> AWSServer:

assert not region.startswith("aws:"), "Region should be AWS region"
Expand Down Expand Up @@ -373,6 +374,10 @@ def check_instance_profile():
wait_for(check_instance_profile, timeout=60, interval=0.5)

def start_instance(subnet_id: str):
if use_spot_instances:
market_options = {"MarketType": "spot"}
else:
market_options = {}
return ec2.create_instances(
ImageId="resolve:ssm:/aws/service/ecs/optimized-ami/amazon-linux-2/recommended/image_id",
InstanceType=instance_class,
Expand All @@ -399,6 +404,7 @@ def start_instance(subnet_id: str):
],
IamInstanceProfile={"Name": iam_instance_profile_name},
InstanceInitiatedShutdownBehavior="terminate",
InstanceMarketOptions=market_options,
)

backoff = 1
Expand Down
6 changes: 5 additions & 1 deletion skyplane/compute/azure/azure_cloud_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ def set_up_resource_group(self, clean_up_orphans=True):

# This code, along with some code in azure_server.py, is based on
# https://github.com/ucbrise/mage-scripts/blob/main/azure_cloud.py.
def provision_instance(self, location: str, vm_size: str, name: Optional[str] = None, uname: str = "skyplane") -> AzureServer:
def provision_instance(
self, location: str, vm_size: str, name: Optional[str] = None, uname: str = "skyplane", use_spot_instances: bool = False
) -> AzureServer:
assert ":" not in location, "invalid colon in Azure location"

if name is None:
Expand Down Expand Up @@ -375,6 +377,8 @@ def provision_instance(self, location: str, vm_size: str, name: Optional[str] =
}
],
},
# use spot instances if use_spot_instances is set
"priority": "Spot" if use_spot_instances else "Regular",
},
)
vm_result = poller.result()
Expand Down
12 changes: 11 additions & 1 deletion skyplane/compute/gcp/gcp_cloud_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,14 @@ def wait_for_operation_to_complete(self, zone, operation_name, timeout=120):
time.sleep(time_intervals.pop(0))

def provision_instance(
self, region, instance_class, name=None, premium_network=False, uname="skyplane", tags={"skyplane": "true"}
self,
region,
instance_class,
name=None,
premium_network=False,
uname="skyplane",
tags={"skyplane": "true"},
use_spot_instances: bool = False,
) -> GCPServer:
assert not region.startswith("gcp:"), "Region should be GCP region"
if name is None:
Expand Down Expand Up @@ -331,6 +338,9 @@ def provision_instance(
"scheduling": {"onHostMaintenance": "TERMINATE", "automaticRestart": False},
"deletionProtection": False,
}
# use preemtible instances if use_spot_instances is True
if use_spot_instances:
req_body["scheduling"]["preemptible"] = True
try:
result = compute.instances().insert(project=self.auth.project_id, zone=region, body=req_body).execute()
self.wait_for_operation_to_complete(region, result["name"])
Expand Down
6 changes: 6 additions & 0 deletions skyplane/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
"num_connections": int,
"max_instances": int,
"autoshutdown_minutes": int,
"aws_use_spot_instances": bool,
"azure_use_spot_instances": bool,
"gcp_use_spot_instances": bool,
"aws_instance_class": str,
"azure_instance_class": str,
"gcp_instance_class": str,
Expand All @@ -42,6 +45,9 @@
"num_connections": 32,
"max_instances": 1,
"autoshutdown_minutes": 15,
"aws_use_spot_instances": False,
"azure_use_spot_instances": False,
"gcp_use_spot_instances": False,
"aws_instance_class": "m5.8xlarge",
"azure_instance_class": "Standard_D32_v5",
"gcp_instance_class": "n2-standard-32",
Expand Down
14 changes: 11 additions & 3 deletions skyplane/replicate/replicator_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ def provision_gateways(
use_compression=True,
use_e2ee=True,
use_socket_tls=False,
aws_use_spot_instances: bool = False,
azure_use_spot_instances: bool = False,
gcp_use_spot_instances: bool = False,
):
regions_to_provision = [node.region for node in self.topology.gateway_nodes]
aws_regions_to_provision = [r for r in regions_to_provision if r.startswith("aws:")]
Expand Down Expand Up @@ -190,14 +193,19 @@ def provision_gateway_instance(region: str) -> Server:
provider, subregion = region.split(":")
if provider == "aws":
assert self.aws.auth.enabled()
server = self.aws.provision_instance(subregion, self.aws_instance_class)
server = self.aws.provision_instance(subregion, self.aws_instance_class, use_spot_instances=aws_use_spot_instances)
elif provider == "azure":
assert self.azure.auth.enabled()
server = self.azure.provision_instance(subregion, self.azure_instance_class)
server = self.azure.provision_instance(subregion, self.azure_instance_class, use_spot_instances=azure_use_spot_instances)
elif provider == "gcp":
assert self.gcp.auth.enabled()
# todo specify network tier in ReplicationTopology
server = self.gcp.provision_instance(subregion, self.gcp_instance_class, premium_network=self.gcp_use_premium_network)
server = self.gcp.provision_instance(
subregion,
self.gcp_instance_class,
premium_network=self.gcp_use_premium_network,
use_spot_instances=gcp_use_spot_instances,
)
else:
raise NotImplementedError(f"Unknown provider {provider}")
server.enable_auto_shutdown()
Expand Down

0 comments on commit 8a30e2f

Please sign in to comment.