Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support private versus public IPs for gateways #817

Merged
merged 171 commits into from
May 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
171 commits
Select commit Hold shift + click to select a range
e66f065
Add Broadcast client API interface (#675)
lynnliu030 Nov 29, 2022
8b842c1
add bucket replication script for experiments
sarahwooders Nov 29, 2022
d6a5a9a
fix region logic
sarahwooders Dec 1, 2022
e15d8fb
merge
sarahwooders Dec 1, 2022
dfecc9c
add log copying
sarahwooders Dec 2, 2022
5881d45
update transfer cost grid
lynnliu030 Dec 4, 2022
9f9c4dd
switch batch to recursive
sarahwooders Dec 5, 2022
771f4dd
Merge branch 'broadcast' of github.com:skyplane-project/skyplane into…
sarahwooders Dec 5, 2022
43a9514
change the upload ids
lynnliu030 Dec 6, 2022
0133f32
put
lynnliu030 Dec 7, 2022
0da6d43
changes for broadcast multipart
lynnliu030 Dec 8, 2022
59b4176
Mux_and fix (#718)
sarahwooders Dec 8, 2022
99f584d
couple gateawy fixes
sarahwooders Dec 8, 2022
d785084
just return 0
sarahwooders Dec 8, 2022
d6f7065
fix bc verification
lynnliu030 Dec 8, 2022
1093d5f
print per-dst remaining bytes
lynnliu030 Dec 8, 2022
8fb9a72
fix obj store wait time
sarahwooders Dec 8, 2022
689386b
Merge branch 'multipart' of github.com:skyplane-project/skyplane into…
sarahwooders Dec 8, 2022
9c56692
assert completed = file deleted
sarahwooders Dec 8, 2022
b5350d0
minor fix
sarahwooders Dec 8, 2022
b4eb68d
accidentally deleted line
sarahwooders Dec 9, 2022
28ca507
fix issue with terminal operators
sarahwooders Dec 9, 2022
c7e56da
update tput profile & ILP
lynnliu030 Dec 9, 2022
651fb97
update run config
lynnliu030 Dec 9, 2022
3590de7
reduce client parallism
sarahwooders Dec 9, 2022
aa83a89
count num processes
sarahwooders Dec 10, 2022
fb460b6
add gw programs
lynnliu030 Dec 10, 2022
bbbfd30
add back print
sarahwooders Dec 10, 2022
207b8f4
modify gateway program processing on gateway side
sarahwooders Dec 10, 2022
5a798f7
working 5 dest transfer
sarahwooders Dec 11, 2022
c176230
broadcast random
lynnliu030 Dec 11, 2022
b5e3d18
add more regions
sarahwooders Dec 11, 2022
04b84f0
increase queue size
sarahwooders Dec 11, 2022
9befa07
merge
lynnliu030 Dec 12, 2022
eb6772e
change regions
sarahwooders Dec 12, 2022
02882c6
fix process counting
sarahwooders Dec 12, 2022
5f22237
reduce # of connections
lynnliu030 Dec 12, 2022
5a6cfe6
lower # of connections
lynnliu030 Dec 12, 2022
4db2767
fix error
sarahwooders Dec 12, 2022
de19866
Merge branch 'multipart' of github.com:skyplane-project/skyplane into…
sarahwooders Dec 12, 2022
7c43ab2
add banned nodes
sarahwooders Dec 12, 2022
3104803
Filter out specific regions and fix ILP (#723)
sarahwooders Dec 13, 2022
a47c341
merge
lynnliu030 Dec 13, 2022
319756b
remove multiplication
lynnliu030 Dec 13, 2022
b0c6ac8
remove multiplication
lynnliu030 Dec 13, 2022
4a5cdc4
Check gbyte_to_transfer
parasj Dec 13, 2022
2682608
script with aws/gcp/azure
lynnliu030 Dec 13, 2022
cb7b514
update num_vms for iterative ILP
lynnliu030 Dec 13, 2022
3aa336a
Merge remote-tracking branch 'origin/main' into multipart
parasj Dec 13, 2022
ec3d9c5
update aws script
lynnliu030 Dec 13, 2022
4e7554e
reset queue sizes
sarahwooders Dec 13, 2022
0f686c2
merge
sarahwooders Dec 13, 2022
ca4115f
Increase retry pool by default
parasj Dec 13, 2022
d1c956f
update script
lynnliu030 Dec 13, 2022
76f68d8
update script
lynnliu030 Dec 13, 2022
e44d5b3
modify connection num
sarahwooders Dec 23, 2022
34f2fb3
Merge branch 'multipart' of github.com:skyplane-project/skyplane into…
sarahwooders Dec 23, 2022
b7f2956
fixed ips
lynnliu030 Dec 24, 2022
5f68afb
Merge branch 'multipart' of https://github.com/skyplane-project/skypl…
lynnliu030 Dec 24, 2022
ac725ee
merge
sarahwooders Dec 24, 2022
7091c90
add topology plotting during runtime
sarahwooders Dec 29, 2022
7ffb026
update visualize gateway program
sarahwooders Dec 29, 2022
3b4c573
fix partitions
lynnliu030 Dec 29, 2022
6491a99
partially implemented support for reading existing gw program
sarahwooders Dec 30, 2022
a0827ec
Merge branch 'multipart' of github.com:skyplane-project/skyplane into…
sarahwooders Dec 30, 2022
a70b712
change instance types
sarahwooders Dec 31, 2022
1cdf740
add log directory for deprovision
sarahwooders Jan 5, 2023
d4c065a
reduce gateway cp parallism and fix recieve bug
sarahwooders Jan 14, 2023
b46e8dc
fix tracker timer
lynnliu030 Jan 31, 2023
862d36b
tracker output
lynnliu030 Feb 5, 2023
1e920e4
update p2p algorithms
lynnliu030 Feb 6, 2023
f5966b8
fix region issue
sarahwooders Feb 13, 2023
d131b75
merge
sarahwooders Feb 13, 2023
4694265
test broadcast object store in gcp
sarahwooders Feb 25, 2023
8f5c707
merge in changes from main
sarahwooders Feb 25, 2023
7baa9ba
stash
sarahwooders Feb 28, 2023
65f9df8
stash
sarahwooders Feb 28, 2023
c358bcb
map subregions
sarahwooders Mar 2, 2023
a56aafc
add basic obj store interfacing to client, write tests, and fix bucke…
sarahwooders Mar 8, 2023
6484f35
reformat
sarahwooders Mar 8, 2023
63ac676
fix formatting
sarahwooders Mar 8, 2023
369de0d
temporarily give up on azure
sarahwooders Mar 8, 2023
ab97b84
move client test to integration test
sarahwooders Mar 8, 2023
d296033
add new files and use generator
sarahwooders Mar 13, 2023
00c5088
reformat
sarahwooders Mar 13, 2023
16ab757
reformat
sarahwooders Mar 13, 2023
4081a36
fix imports
sarahwooders Mar 13, 2023
c0eb132
fix formatting
sarahwooders Mar 13, 2023
dffee82
add basic obj store interfacing to client, write tests, and fix bucke…
sarahwooders Mar 8, 2023
bcda24b
reformat
sarahwooders Mar 8, 2023
6b583f8
fix formatting
sarahwooders Mar 8, 2023
acd101e
temporarily give up on azure
sarahwooders Mar 8, 2023
10f6cd3
move client test to integration test
sarahwooders Mar 8, 2023
69f5af6
add new files and use generator
sarahwooders Mar 13, 2023
e957d4f
reformat
sarahwooders Mar 13, 2023
afa0ec8
fix imports
sarahwooders Mar 13, 2023
5206e0d
fix formatting
sarahwooders Mar 13, 2023
6b4fb9a
add cost function
sarahwooders Mar 14, 2023
03c100d
reformat and remove variables not needed
lynnliu030 Mar 14, 2023
d161e3d
add cost estimation to client dataplane
sarahwooders Mar 14, 2023
f481516
Merge branch 'main' of github.com:sarahwooders/skyplane
sarahwooders Mar 14, 2023
00d0667
add transfer pairs
sarahwooders Mar 14, 2023
8040dd3
merge
sarahwooders Mar 15, 2023
400214a
add logging for error
sarahwooders Mar 15, 2023
92f5d41
error prints
sarahwooders Mar 15, 2023
4fb9a0f
file size fix?
sarahwooders Mar 15, 2023
5805021
fix error? idk
sarahwooders Mar 15, 2023
b4014d3
Merge branch 'skyplane-project:main' into main
sarahwooders Apr 8, 2023
67e5a39
dataplane
sarahwooders Apr 8, 2023
ad1a491
initial implementation
sarahwooders Apr 10, 2023
e9bc643
add pipeline file
sarahwooders Apr 10, 2023
1a4c225
reformat
sarahwooders Apr 10, 2023
8b80f25
add upload id pipelining for multipart
sarahwooders Apr 11, 2023
d46b2d2
initial TransferJob rework (multicast)
abiswal2001 Apr 11, 2023
e313be2
fix transfer generation, but gateway wont start
sarahwooders Apr 12, 2023
c34277d
add deprovisioning and copy error logs
sarahwooders Apr 12, 2023
643a203
half way through removing chunk req
sarahwooders Apr 12, 2023
9d58956
direct transfer works
sarahwooders Apr 14, 2023
c8f99b6
remove docker script for old gateway
sarahwooders Apr 14, 2023
1c80d39
reformat
sarahwooders Apr 14, 2023
ec19129
fix broadcast important
sarahwooders Apr 16, 2023
3f68bb5
working multicast but broken transfer tracking
sarahwooders Apr 21, 2023
0858e87
add multi dest tracker
sarahwooders Apr 21, 2023
460d78c
reformat/cleanup
sarahwooders Apr 21, 2023
9c12137
scaffold more planners
sarahwooders Apr 21, 2023
f081063
implement verification
sarahwooders Apr 24, 2023
123e799
fix different prefix
sarahwooders Apr 24, 2023
4020e14
cleanup
sarahwooders Apr 24, 2023
bee41b2
try to fix docs
sarahwooders Apr 24, 2023
3934177
remove old imports
sarahwooders Apr 24, 2023
cd7876b
remove pandas
sarahwooders Apr 24, 2023
7cfe700
update poetry
sarahwooders Apr 24, 2023
1bd7110
remove experiment import
sarahwooders Apr 25, 2023
0a18559
fix most tests
sarahwooders Apr 25, 2023
8de0414
reformat
sarahwooders Apr 25, 2023
4c85d3b
merge
sarahwooders Apr 25, 2023
68b3867
Merge branch 'sarahwooders-gateway-program-refactor'
sarahwooders Apr 25, 2023
47d0cda
cleanup
sarahwooders Apr 25, 2023
8b8b718
fixed after merge thank god
sarahwooders Apr 25, 2023
14b5580
reformat
sarahwooders Apr 25, 2023
9256710
reformat and add cost estimate fixes
sarahwooders Apr 25, 2023
1bde576
add back throughput
sarahwooders Apr 26, 2023
dfd104b
more cleanup
sarahwooders Apr 26, 2023
36cc6a7
cleanup
sarahwooders Apr 26, 2023
096fc05
remove dockerfile
sarahwooders Apr 26, 2023
5c99bf6
fix ibm imports
sarahwooders Apr 26, 2023
7418ee4
fix imports
sarahwooders Apr 26, 2023
8b4b25f
more cleanuP
sarahwooders Apr 26, 2023
35c90b0
fix ibm imports and pbar
sarahwooders Apr 26, 2023
d2a0aed
add bar for multipart completion
sarahwooders Apr 26, 2023
cc51688
cleanup and remove ibm test
sarahwooders Apr 26, 2023
4599973
forgot to add operator files
sarahwooders Apr 26, 2023
c793ea8
support CLI
sarahwooders Apr 27, 2023
e66fdba
comment out on-prem
sarahwooders Apr 27, 2023
00caafe
ignore solver for linting
sarahwooders Apr 27, 2023
2dc42de
reformat
sarahwooders Apr 27, 2023
e737d3d
format
sarahwooders Apr 27, 2023
2952746
fix
sarahwooders Apr 27, 2023
82030b6
fix errors
sarahwooders Apr 27, 2023
d0017e1
fix pytype issues
sarahwooders Apr 27, 2023
3238983
fix transfer list bug
sarahwooders Apr 29, 2023
5883e51
Merge branch 'skyplane-project:main' into main
sarahwooders Apr 30, 2023
acd952f
add private ips
sarahwooders Apr 30, 2023
08059aa
merge
sarahwooders Apr 30, 2023
2649fe2
add back region tag check
sarahwooders Apr 30, 2023
bff0155
Merge branch 'main' of github.com:sarahwooders/skyplane
sarahwooders Apr 30, 2023
618dbb4
cleanup
sarahwooders Apr 30, 2023
8a5ef62
remove pop
sarahwooders Apr 30, 2023
c2a2242
fix queue
sarahwooders Apr 30, 2023
07f579d
fix pytype
sarahwooders Apr 30, 2023
167f894
fix errors
sarahwooders Apr 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions skyplane/api/dataplane.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,12 @@ def provision(
def copy_gateway_logs(self):
# copy logs from all gateways in parallel
def copy_log(instance):
out_file = self.transfer_dir / f"gateway_{instance.uuid()}.stdout"
err_file = self.transfer_dir / f"gateway_{instance.uuid()}.stdout"
logger.fs.info(f"[Dataplane.copy_gateway_logs] Copying logs from {instance.uuid()}: {out_file}")
instance.run_command("sudo docker logs -t skyplane_gateway 2> /tmp/gateway.stderr > /tmp/gateway.stdout")
instance.download_file("/tmp/gateway.stdout", self.transfer_dir / f"gateway_{instance.uuid()}.stdout")
instance.download_file("/tmp/gateway.stderr", self.transfer_dir / f"gateway_{instance.uuid()}.stderr")
instance.download_file("/tmp/gateway.stdout", out_file)
instance.download_file("/tmp/gateway.stderr", err_file)

do_parallel(copy_log, self.bound_nodes.values(), n=-1)

Expand Down
11 changes: 6 additions & 5 deletions skyplane/api/provisioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ def _provision_task(self, task: ProvisionerTask):
server = self.azure.provision_instance(task.region, task.vm_type, use_spot_instances=task.spot, tags=task.tags)
elif task.cloud_provider == "gcp":
assert self.gcp.auth.enabled(), "GCP credentials not configured"
# TODO: specify network tier in
server = self.gcp.provision_instance(
task.region,
task.vm_type,
Expand Down Expand Up @@ -248,10 +247,12 @@ def provision(self, authorize_firewall: bool = True, max_jobs: int = 16, spinner
# NOTE: the following setup is for broadcast only
if aws_provisioned:
authorize_ip_jobs.extend([partial(self.aws.add_ips_to_security_group, r, None) for r in set(aws_regions)])
# if gcp_provisioned:
# def authorize_gcp_gateways():
# self.gcp_firewall_rules.add(self.gcp.authorize_gateways(public_ips + private_ips))
# authorize_ip_jobs.append(authorize_gcp_gateways)
if gcp_provisioned:

def authorize_gcp_gateways():
self.gcp_firewall_rules.add(self.gcp.authorize_gateways(public_ips + private_ips))

authorize_ip_jobs.append(authorize_gcp_gateways)

do_parallel(
lambda fn: fn(),
Expand Down
16 changes: 3 additions & 13 deletions skyplane/api/tracker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import functools
from pprint import pprint
from pprint import pprint
import json
import time
from abc import ABC
Expand Down Expand Up @@ -221,12 +222,7 @@ def monitor_single_dst_helper(dst_region):
job.finalize()
except Exception as e:
UsageClient.log_exception(
"finalize job",
e,
args,
self.dataplane.topology.src_region_tag,
self.dataplane.topology.dest_region_tags,
session_start_timestamp_ms,
"finalize job", e, args, self.dataplane.src_region_tag, self.dataplane.dst_region_tag, session_start_timestamp_ms
)
raise e
end_time = int(time.time())
Expand Down Expand Up @@ -374,11 +370,5 @@ def query_bytes_dispatched(self):
return 0
bytes_total_per_job = {}
for job_uuid in self.job_complete_chunk_ids.keys():
bytes_total_per_job[job_uuid] = sum(
[
cr.chunk_length_bytes
for cr in self.job_chunk_requests[job_uuid].values()
# if cr.chunk_id in self.job_complete_chunk_ids[job_uuid]
]
)
bytes_total_per_job[job_uuid] = sum([cr.chunk_length_bytes for cr in self.job_chunk_requests[job_uuid].values()])
return sum(bytes_total_per_job.values())
14 changes: 11 additions & 3 deletions skyplane/api/transfer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from collections import defaultdict
from dataclasses import dataclass, field
from queue import Queue

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Callable, Generator, List, Optional, Tuple, TypeVar, Dict

from abc import ABC, abstractmethod
Expand Down Expand Up @@ -341,7 +343,12 @@ def chunk(self, transfer_pair_generator: Generator[TransferPair, None, None]) ->
yield multipart_chunk_queue.get()

if self.transfer_config.multipart_enabled:
while not multipart_send_queue.empty() or not multipart_chunk_queue.empty():
# wait for processing multipart requests to finish
logger.fs.debug("Waiting for multipart threads to finish")
# while not multipart_send_queue.empty():
# TODO: may be an issue waiting for this in case of force-quit
while not multipart_send_queue.empty():
logger.fs.debug(f"Remaining in multipart queue: sent {multipart_send_queue.qsize()}")
time.sleep(0.1)
# send sentinel to all threads
multipart_exit_event.set()
Expand Down Expand Up @@ -587,6 +594,7 @@ def dispatch(
bytes_dispatched = [0] * len(src_gateways)
n_multiparts = 0
start = time.time()

for batch in batches:
# send upload_id mappings to sink gateways
upload_id_batch = [cr for cr in batch if cr.upload_id_mapping is not None]
Expand Down Expand Up @@ -642,7 +650,6 @@ def dispatch(

def finalize(self):
"""Complete the multipart upload requests"""
print("Finalizing multipart uploads...")
typer.secho(f"Finalizing multipart uploads...", fg="bright_black")
groups = defaultdict(list)
for req in self.multipart_transfer_list:
Expand All @@ -657,9 +664,10 @@ def finalize(self):

def complete_fn(batch):
for req in batch:
logger.fs.debug(f"Finalize upload id {req['upload_id']} for key {req['key']}")
obj_store_interface.complete_multipart_upload(req["key"], req["upload_id"])

do_parallel(complete_fn, batches, n=-1)
do_parallel(complete_fn, batches, n=8)

def verify(self):
"""Verify the integrity of the transfered destination objects"""
Expand Down
2 changes: 1 addition & 1 deletion skyplane/api/usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class UsageStatsToReport:
#: The source region of the transfer session.
source_region: str
#: The destination region of the transfer session.
destination_region: str
destination_region: str # TODO: make into dest_regions
#: The source cloud provider of the transfer session.
source_cloud_provider: str
#: The destination cloud provider of the transfer session.
Expand Down
4 changes: 1 addition & 3 deletions skyplane/compute/ibmcloud/ibmcloud_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ def __init__(self, config: Optional[SkyplaneConfig] = None):

@imports.inject("ibm_cloud_sdk_core", "ibm_cloud_sdk_core.authenticators", pip_extra="ibmcloud")
def get_iam_authenticator(ibm_cloud_sdk_core, self):
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator

return IAMAuthenticator(self.config.ibmcloud_iam_key, url=self.config.ibmcloud_iam_endpoint)
return ibm_cloud_sdk_core.authenticators.IAMAuthenticator(self.config.ibmcloud_iam_key, url=self.config.ibmcloud_iam_endpoint)

def get_ibmcloud_endpoint(self, region, compute_backend="public"):
if region is not None:
Expand Down
2 changes: 1 addition & 1 deletion skyplane/gateway/chunk_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def add_partition(self, partition_id: str):
raise ValueError(f"Partition {partition_id} already exists")
self.chunk_requests[partition_id] = GatewayQueue()

def add_partition(self, partition_id: str, queue: Optional[GatewayQueue]):
def add_partition(self, partition_id: str, queue: GatewayQueue):
"""Create a queue for this partition."""
print("Adding partition", partition_id, queue)
if partition_id in self.chunk_requests:
Expand Down
5 changes: 3 additions & 2 deletions skyplane/gateway/gateway_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,12 @@ def create_gateway_operators_helper(input_queue, program: List[Dict], partition_
elif op["op_type"] == "send":
# TODO: handle private ips for GCP->GCP
target_gateway_info = self.gateway_info[op["target_gateway_id"]]
print("Gateway sender sending to ", target_gateway_info["private_ip_address"])
ip_addr = target_gateway_info["private_ip_address"] if op["private_ip"] else target_gateway_info["public_ip_address"]
print("Gateway sender sending to ", ip_addr, "private", op["private_ip"])
operators[handle] = GatewaySender(
handle,
region=self.region,
ip_addr=target_gateway_info["private_ip_address"],
ip_addr=ip_addr,
input_queue=input_queue,
output_queue=output_queue,
error_event=self.error_event,
Expand Down
11 changes: 10 additions & 1 deletion skyplane/gateway/gateway_program.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,22 @@ def __repr__(self):


class GatewaySend(GatewayOperator):
def __init__(self, target_gateway_id: str, region: str, num_connections: int = 32, compress: bool = False, encrypt: bool = False):
def __init__(
self,
target_gateway_id: str,
region: str,
num_connections: int = 32,
compress: bool = False,
encrypt: bool = False,
private_ip: bool = False,
):
super().__init__("send")
self.target_gateway_id = target_gateway_id # gateway to send to
self.region = region # region to send to
self.num_connections = num_connections # default this for now
self.compress = compress
self.encrypt = encrypt
self.private_ip = private_ip # whether to send to private or public IP (private for GCP->GCP)


class GatewayReceive(GatewayOperator):
Expand Down
4 changes: 2 additions & 2 deletions skyplane/gateway/gateway_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def put(self, chunk_req):
self.q.put(chunk_req)

def pop(self, requester_handle=None):
self.q.pop()
self.q.get()

def get_nowait(self, requester_handle=None):
return self.q.get_nowait()
Expand Down Expand Up @@ -48,7 +48,7 @@ def put(self, chunk_req):
self.q[handle].put(chunk_req)

def pop(self, requester_handle):
self.q[requester_handle].pop()
self.q[requester_handle].get()

def get_nowait(self, requester_handle):
return self.q[requester_handle].get_nowait()
16 changes: 8 additions & 8 deletions skyplane/gateway/operators/gateway_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import time
import traceback
from functools import partial
from multiprocessing import Event, Process
from multiprocessing import Event, Process, Queue
from multiprocessing.managers import DictProxy
from typing import Dict, List, Optional

Expand Down Expand Up @@ -35,7 +35,7 @@ def __init__(
input_queue: GatewayQueue,
output_queue: GatewayQueue,
error_event,
error_queue: GatewayQueue,
error_queue: Queue,
chunk_store: ChunkStore,
n_processes: Optional[int] = 1,
):
Expand Down Expand Up @@ -163,7 +163,7 @@ def __init__(
input_queue: GatewayQueue,
output_queue: GatewayQueue,
error_event,
error_queue: GatewayQueue,
error_queue: Queue,
chunk_store: ChunkStore,
ip_addr: str,
use_tls: Optional[bool] = True,
Expand Down Expand Up @@ -358,7 +358,7 @@ def __init__(
input_queue: GatewayQueue,
output_queue: GatewayQueue,
error_event,
error_queue: GatewayQueue,
error_queue: Queue,
chunk_store: ChunkStore,
size_mb: int,
n_processes: Optional[int] = 1,
Expand Down Expand Up @@ -398,7 +398,7 @@ def __init__(
input_queue: GatewayQueue,
output_queue: GatewayQueue,
error_event,
error_queue: GatewayQueue,
error_queue: Queue,
chunk_store: ChunkStore,
n_processes: int = 1,
):
Expand All @@ -417,7 +417,7 @@ def __init__(
input_queue: GatewayQueue,
output_queue: GatewayQueue,
error_event,
error_queue: GatewayQueue,
error_queue: Queue,
n_processes: Optional[int] = 1,
chunk_store: Optional[ChunkStore] = None,
bucket_name: Optional[str] = None,
Expand Down Expand Up @@ -452,7 +452,7 @@ def __init__(
input_queue: GatewayQueue,
output_queue: GatewayQueue,
error_event,
error_queue: GatewayQueue,
error_queue: Queue,
n_processes: int = 32,
chunk_store: Optional[ChunkStore] = None,
bucket_name: Optional[str] = None,
Expand Down Expand Up @@ -525,7 +525,7 @@ def __init__(
input_queue: GatewayQueue,
output_queue: GatewayQueue,
error_event,
error_queue: GatewayQueue,
error_queue: Queue,
upload_id_map: DictProxy, # map of upload_id mappings from client
n_processes: Optional[int] = 32,
chunk_store: Optional[ChunkStore] = None,
Expand Down
11 changes: 10 additions & 1 deletion skyplane/planner/planner.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from importlib.resources import path
from typing import List, Optional, Tuple
from typing import List, Optional, Tuple

from skyplane import compute

Expand Down Expand Up @@ -118,6 +119,7 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
for job in jobs:
src_bucket = job.src_iface.bucket()
src_region_tag = job.src_iface.region_tag()
src_provider = src_region_tag.split(":")[0]

# give each job a different partition id, so we can read/write to different buckets
partition_id = jobs.index(job)
Expand All @@ -139,9 +141,16 @@ def plan(self, jobs: List[TransferJob]) -> TopologyPlan:
# can send to any gateway in region
mux_or = src_program.add_operator(GatewayMuxOr(), parent_handle=mux_and, partition_id=partition_id)
for i in range(self.n_instances):
private_ip = False
if dst_gateways[i].provider == "gcp" and src_provider == "gcp":
print("Using private IP for GCP to GCP transfer", src_region_tag, dst_region_tag)
private_ip = True
src_program.add_operator(
GatewaySend(
target_gateway_id=dst_gateways[i].gateway_id, region=dst_region_tag, num_connections=self.n_connections
target_gateway_id=dst_gateways[i].gateway_id,
region=dst_region_tag,
num_connections=self.n_connections,
private_ip=private_ip,
),
parent_handle=mux_or,
partition_id=partition_id,
Expand Down
4 changes: 2 additions & 2 deletions skyplane/planner/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ def regions(self) -> List[str]:

def add_gateway(self, region_tag: str):
"""Create gateway in specified region"""
gateway_id = region_tag + str(len([gateway for gateway in self.gateways.values() if gateway.region == region_tag]))
assert gateway_id not in self.gateways
gateway_id = region_tag + str(len([gateway for gateway in self.gateways.values() if gateway.region_tag == region_tag]))
assert gateway_id not in self.gateways, f"Gateway id {gateway_id} in {self.gateways}"
gateway = TopologyPlanGateway(region_tag, gateway_id)
self.gateways[gateway_id] = gateway
return gateway
Expand Down