Process pool for uploading/downloading object store #121

Merged · 30 commits · Feb 1, 2022
Changes from 27 commits
Commits
f02131b
got paras' commented s3 file creation code to (mostly) run
sarahwooders Jan 9, 2022
094d2c4
comments for adding s3 integration
Jan 10, 2022
03555d9
Merge branch 'main' of github.com:parasj/skylark into main
Jan 10, 2022
f40b50e
add object store info to ChunkRequestHop
Jan 12, 2022
72ab083
remote changes
sarahwooders Jan 13, 2022
f007a15
Merge branch 'main' of github.com:parasj/skylark into main
sarahwooders Jan 15, 2022
4f93f9d
remove
sarahwooders Jan 15, 2022
f886938
merge
sarahwooders Jan 20, 2022
26a67c7
Merge branch 'main' of github.com:parasj/skylark into main
sarahwooders Jan 22, 2022
2f8ae70
add GCS (google object store)
sarahwooders Jan 22, 2022
adc761e
merge
sarahwooders Jan 22, 2022
947e146
initial gcp cli support implementation
sarahwooders Jan 27, 2022
4f0938f
merge
sarahwooders Jan 27, 2022
da43f97
cleanup
sarahwooders Jan 27, 2022
05bc3da
fix arg
sarahwooders Jan 27, 2022
3a30d60
Merge branch 'main' of github.com:parasj/skylark into main
sarahwooders Jan 27, 2022
9839a26
Merge branch 'main' of github.com:parasj/skylark into main
sarahwooders Jan 27, 2022
0e545ca
Merge branch 'main' of github.com:parasj/skylark into main
sarahwooders Jan 27, 2022
fd44721
remove prints
sarahwooders Jan 30, 2022
6c9ce97
Merge branch 'main' of github.com:parasj/skylark into main
sarahwooders Jan 30, 2022
65833d7
Merge branch 'main' of github.com:parasj/skylark into main
sarahwooders Jan 31, 2022
4847c50
Merge branch 'main' of github.com:parasj/skylark into main
sarahwooders Jan 31, 2022
baa99c6
integrate replicate-json commant with obj store
sarahwooders Jan 31, 2022
060f356
about to make big changes
sarahwooders Jan 31, 2022
b15e9d8
draft of new object store process pool
sarahwooders Feb 1, 2022
9e9c8b6
add object store conn file
sarahwooders Feb 1, 2022
9976ac9
add waiting in seperate thread
sarahwooders Feb 1, 2022
5e3c6f5
cleanup
sarahwooders Feb 1, 2022
a9e31cb
cleanup
sarahwooders Feb 1, 2022
6b89185
Merge branch 'main' into dev/obj-store-refactor
sarahwooders Feb 1, 2022
1 change: 1 addition & 0 deletions skylark/chunk.py
@@ -40,6 +40,7 @@ class ChunkRequest:
src_object_store_bucket: Optional[str] = None
dst_object_store_bucket: Optional[str] = None


def __post_init__(self):
if self.src_type == "object_store":
assert self.src_object_store_bucket is not None
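The new bucket fields on ChunkRequest are what the gateway later uses to decide which object store to talk to. A minimal illustrative sketch of the validation (not the real class, which has more fields; the dst-side check is assumed to mirror the visible src-side assertion):

from dataclasses import dataclass
from typing import Optional

@dataclass
class ChunkRequestSketch:
    # illustrative subset of skylark.chunk.ChunkRequest; other fields omitted
    src_type: str  # e.g. "object_store"
    dst_type: str
    src_object_store_bucket: Optional[str] = None
    dst_object_store_bucket: Optional[str] = None

    def __post_init__(self):
        # an object-store endpoint must name the bucket it reads from or writes to
        if self.src_type == "object_store":
            assert self.src_object_store_bucket is not None
        if self.dst_type == "object_store":
            assert self.dst_object_store_bucket is not None
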
24 changes: 18 additions & 6 deletions skylark/cli/cli.py
@@ -191,18 +191,20 @@ def replicate_random(
def replicate_json(
path: Path = typer.Argument(..., exists=True, file_okay=True, dir_okay=False, help="Path to JSON file describing replication plan"),
size_total_mb: int = typer.Option(2048, "--size-total-mb", "-s", help="Total transfer size in MB (across n_chunks chunks)"),
n_chunks: int = 512,
n_chunks: int = typer.Option(512, "--n-chunks", "-n", help="Number of chunks"),
# bucket options
use_random_data: bool = True,
use_random_data: bool = typer.Option(False, "--random-data", help="Use random data"),
bucket_prefix: str = "skylark",
source_bucket: str = typer.Option(None, "--source-bucket", help="Source bucket url"),
dest_bucket: str = typer.Option(None, "--dest-bucket", help="Destination bucket url"),
key_prefix: str = "/test/replicate_random",
# gateway provisioning options
reuse_gateways: bool = True,
gateway_docker_image: str = os.environ.get("SKYLARK_DOCKER_IMAGE", "ghcr.io/parasj/skylark:main"),
# cloud provider specific options
azure_subscription: Optional[str] = None,
gcp_project: Optional[str] = None,
aws_instance_class: str = "m5.8xlarge",
aws_instance_class: str = "m5.4xlarge",
Review comment (Member):
Any reason why we are moving away from m5.8xlarge?

azure_instance_class: str = "Standard_D32_v5",
gcp_instance_class: Optional[str] = "n2-standard-32",
gcp_use_premium_network: bool = True,
@@ -255,11 +257,21 @@ def replicate_json(
objs=[f"{key_prefix}/{i}" for i in range(n_chunks)],
random_chunk_size_mb=chunk_size_mb,
)
job = rc.run_replication_plan(job)
total_bytes = n_chunks * chunk_size_mb * MB
else:
raise NotImplementedError()
# TODO: Don't hardcode n_chunks
# TODO: Don't hardcode obj keys
job = ReplicationJob(
source_region=topo.source_region(),
source_bucket=source_bucket,
dest_region=topo.sink_region(),
dest_bucket=dest_bucket,
objs=[f"{key_prefix}/{i}" for i in range(n_chunks)],
)
job = rc.run_replication_plan(job)
total_bytes = sum([chunk_req.chunk.chunk_length_bytes for chunk_req in job.chunk_requests])

total_bytes = n_chunks * chunk_size_mb * MB
job = rc.run_replication_plan(job)
logger.info(f"{total_bytes / GB:.2f}GByte replication job launched")
stats = rc.monitor_transfer(
job, show_pbar=True, log_interval_s=log_interval_s, time_limit_seconds=time_limit_seconds, cancel_pending=False
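Note that the object-store branch above still hardcodes the object keys to f"{key_prefix}/{i}" for i in range(n_chunks) (hence the TODOs), so the source bucket must already contain objects with exactly those names. A hypothetical seeding helper, shown only as a sketch: the region/bucket arguments and scratch-file handling are assumptions, while the ObjectStoreInterface calls (create, upload_object(...).result()) match the ones used elsewhere in this PR.

import os
import tempfile

from skylark.obj_store.object_store_interface import ObjectStoreInterface


def seed_source_bucket(region: str, bucket: str, key_prefix: str, n_chunks: int, chunk_size_mb: int):
    # create one scratch file of random bytes and upload it under each expected key
    iface = ObjectStoreInterface.create(region, bucket)
    with tempfile.NamedTemporaryFile(delete=False) as f:
        f.write(os.urandom(chunk_size_mb * 1024 * 1024))
        fpath = f.name
    try:
        for i in range(n_chunks):
            # keys must line up with objs=[f"{key_prefix}/{i}" for i in range(n_chunks)]
            iface.upload_object(fpath, f"{key_prefix}/{i}").result()
    finally:
        os.remove(fpath)
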
38 changes: 19 additions & 19 deletions skylark/compute/aws/aws_cloud_provider.py
@@ -24,28 +24,28 @@ def name(self):
@staticmethod
def region_list() -> List[str]:
all_regions = [
"af-south-1",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
"ap-east-1",
"ap-south-1",
"ap-southeast-1",
"ap-southeast-2",
# "ap-southeast-3", # too new region, not well supported
"ca-central-1",
"eu-central-1",
"eu-north-1",
"eu-south-1",
"eu-west-1",
"eu-west-2",
"eu-west-3",
"me-south-1",
"sa-east-1",
#"af-south-1",
#"ap-northeast-1",
#"ap-northeast-2",
#"ap-northeast-3",
#"ap-east-1",
#"ap-south-1",
#"ap-southeast-1",
#"ap-southeast-2",
## "ap-southeast-3", # too new region, not well supported
#"ca-central-1",
#"eu-central-1",
#"eu-north-1",
#"eu-south-1",
#"eu-west-1",
#"eu-west-2",
#"eu-west-3",
#"me-south-1",
#"sa-east-1",
"us-east-1",
"us-east-2",
"us-west-1",
"us-west-2",
#"us-west-2",
]
return all_regions

2 changes: 2 additions & 0 deletions skylark/compute/server.py
@@ -248,6 +248,8 @@ def check_stderr(tup):
gateway_daemon_cmd = f"python -u /pkg/skylark/gateway/gateway_daemon.py --chunk-dir /dev/shm/skylark/chunks --outgoing-ports '{json.dumps(outgoing_ports)}' --region {self.region_tag}"
docker_launch_cmd = f"sudo docker run {docker_run_flags} --name skylark_gateway {gateway_docker_image} {gateway_daemon_cmd}"
start_out, start_err = self.run_command(docker_launch_cmd)
print(start_out)
print(start_err)
logger.debug(desc_prefix + f": Gateway started {start_out.strip()}")
assert not start_err.strip(), f"Error starting gateway: {start_err.strip()}"
gateway_container_hash = start_out.strip().split("\n")[-1][:12]
2 changes: 1 addition & 1 deletion skylark/gateway/chunk_store.py
@@ -43,7 +43,7 @@ def set_chunk_state(self, chunk_id: int, new_status: ChunkState):
old_status = self.chunk_status.get(chunk_id)
self.chunk_status[chunk_id] = new_status
self.chunk_status_log.append({"chunk_id": chunk_id, "state": new_status, "time": datetime.utcnow()})
logger.info(f"[chunk_store]:{chunk_id} state change from {old_status} to {new_status}")
#logger.info(f"[chunk_store]:{chunk_id} state change from {old_status} to {new_status}")

def get_chunk_status_log(self) -> List[Dict]:
return list(self.chunk_status_log)
97 changes: 63 additions & 34 deletions skylark/gateway/gateway_daemon.py
@@ -11,6 +11,9 @@
from threading import BoundedSemaphore
from typing import Dict

from multiprocessing import Process
import concurrent.futures

import setproctitle
from skylark.utils import logger
from skylark import MB, print_header
@@ -20,6 +23,8 @@
from skylark.gateway.gateway_receiver import GatewayReceiver
from skylark.gateway.gateway_sender import GatewaySender

from skylark.gateway.gateway_obj_store import GatewayObjStoreConn

from skylark.obj_store.object_store_interface import ObjectStoreInterface
from skylark.utils.utils import Timer

@@ -30,7 +35,10 @@ def __init__(self, region: str, outgoing_ports: Dict[str, int], chunk_dir: PathL
self.chunk_store = ChunkStore(chunk_dir)
self.gateway_receiver = GatewayReceiver(chunk_store=self.chunk_store, max_pending_chunks=max_incoming_ports)
self.gateway_sender = GatewaySender(chunk_store=self.chunk_store, outgoing_ports=outgoing_ports)
self.obj_store_interfaces: Dict[str, ObjectStoreInterface] = {}
print(outgoing_ports)

self.obj_store_conn = GatewayObjStoreConn(chunk_store=self.chunk_store, max_conn=32)
#self.obj_store_interfaces: Dict[str, ObjectStoreInterface] = {}

# Download thread pool
self.dl_pool_semaphore = BoundedSemaphore(value=128)
@@ -42,12 +50,12 @@ def __init__(self, region: str, outgoing_ports: Dict[str, int], chunk_dir: PathL
self.api_server.start()
logger.info(f"[gateway_daemon] API started at {self.api_server.url}")

def get_obj_store_interface(self, region: str, bucket: str) -> ObjectStoreInterface:
key = f"{region}:{bucket}"
if key not in self.obj_store_interfaces:
logger.warning(f"[gateway_daemon] ObjectStoreInferface not cached for {key}")
self.obj_store_interfaces[key] = ObjectStoreInterface.create(region, bucket)
return self.obj_store_interfaces[key]
#def get_obj_store_interface(self, region: str, bucket: str) -> ObjectStoreInterface:
# key = f"{region}:{bucket}"
# if key not in self.obj_store_interfaces:
# logger.warning(f"[gateway_daemon] ObjectStoreInferface not cached for {key}")
# self.obj_store_interfaces[key] = ObjectStoreInterface.create(region, bucket)
# return self.obj_store_interfaces[key]

def cleanup(self):
logger.warning("[gateway_daemon] Shutting down gateway daemon")
@@ -62,10 +70,12 @@ def exit_handler(signum, frame):
exit_flag.set()
self.gateway_receiver.stop_servers()
self.gateway_sender.stop_workers()
self.obj_store_conn.stop_workers()
sys.exit(0)

logger.info("[gateway_daemon] Starting gateway sender workers")
self.gateway_sender.start_workers()
self.obj_store_conn.start_workers()
signal.signal(signal.SIGINT, exit_handler)
signal.signal(signal.SIGTERM, exit_handler)

@@ -78,19 +88,27 @@ def exit_handler(signum, frame):
self.chunk_store.state_start_upload(chunk_req.chunk.chunk_id)
self.chunk_store.state_finish_upload(chunk_req.chunk.chunk_id)
elif self.region == chunk_req.dst_region and chunk_req.dst_type == "object_store":
#print("queue", chunk_req)
self.chunk_store.state_queue_upload(chunk_req.chunk.chunk_id)
self.chunk_store.state_start_upload(chunk_req.chunk.chunk_id)

# function to upload data to object store
def fn(chunk_req, dst_region, dst_bucket):
fpath = str(self.chunk_store.get_chunk_file_path(chunk_req.chunk.chunk_id).absolute())
obj_store_interface = self.get_obj_store_interface(dst_region, dst_bucket)
with self.ul_pool_semaphore:
obj_store_interface.upload_object(fpath, chunk_req.chunk.key).result()
self.chunk_store.state_finish_upload(chunk_req.chunk.chunk_id)

# start in seperate thread
threading.Thread(target=fn, args=(chunk_req, chunk_req.dst_region, chunk_req.dst_object_store_bucket)).start()
self.obj_store_conn.queue_request(chunk_req, "upload")
#self.chunk_store.state_start_upload(chunk_req.chunk.chunk_id)

## function to upload data to object store
#def fn(chunk_req, dst_region, dst_bucket):
# fpath = str(self.chunk_store.get_chunk_file_path(chunk_req.chunk.chunk_id).absolute())
# obj_store_interface = self.get_obj_store_interface(dst_region, dst_bucket)
# with self.ul_pool_semaphore:
# with Timer() as t:
# obj_store_interface.upload_object(fpath, chunk_req.chunk.key).result()

# mbps = chunk_req.chunk.chunk_length_bytes / t.elapsed / MB * 8
# logger.info(f"[gateway_daemon] Uploaded {chunk_req.chunk.key} at {mbps:.2f}Mbps")
# logger.info(f"Pending upload: {obj_store_interface.pending_uploads}, uploaded: {obj_store_interface.completed_uploads}")
# self.chunk_store.state_finish_upload(chunk_req.chunk.chunk_id)

## start in seperate thread
##threading.Thread(target=fn, args=(chunk_req, chunk_req.dst_region, chunk_req.dst_object_store_bucket)).start()
#p = Process(target=fn, args=(chunk_req, chunk_req.dst_region, chunk_req.dst_object_store_bucket)).start()
elif self.region != chunk_req.dst_region:
self.gateway_sender.queue_request(chunk_req)
self.chunk_store.state_queue_upload(chunk_req.chunk.chunk_id)
@@ -120,22 +138,33 @@ def fn(chunk_req, size_mb):
# generate random data in seperate thread
threading.Thread(target=fn, args=(chunk_req, size_mb)).start()
elif self.region == chunk_req.src_region and chunk_req.src_type == "object_store":
#print("queue", chunk_req)
self.chunk_store.state_start_download(chunk_req.chunk.chunk_id)

# function to download data from source object store
def fn(chunk_req, src_region, src_bucket):
fpath = str(self.chunk_store.get_chunk_file_path(chunk_req.chunk.chunk_id).absolute())
obj_store_interface = self.get_obj_store_interface(src_region, src_bucket)
with self.dl_pool_semaphore:
with Timer() as t:
obj_store_interface.download_object(chunk_req.chunk.key, fpath).result()
mbps = chunk_req.chunk.chunk_length_bytes / t.elapsed / MB * 8
logger.info(f"[gateway_daemon] Downloaded {chunk_req.chunk.key} at {mbps:.2f}Mbps")
self.chunk_store.chunk_requests[chunk_req.chunk.chunk_id] = chunk_req
self.chunk_store.state_finish_download(chunk_req.chunk.chunk_id)

# start in seperate thread
threading.Thread(target=fn, args=(chunk_req, chunk_req.src_region, chunk_req.src_object_store_bucket)).start()
self.obj_store_conn.queue_request(chunk_req, "download")

## function to download data from source object store
#def fn(chunk_req, src_region, src_bucket):
# fpath = str(self.chunk_store.get_chunk_file_path(chunk_req.chunk.chunk_id).absolute())
# obj_store_interface = self.get_obj_store_interface(src_region, src_bucket)
# with self.dl_pool_semaphore:
# with Timer() as t:
# #print("create process")
# #p = Process(obj_store_interface.download_object(chunk_req.chunk.key, fpath).result)
# #p.start()
# #p.join()
# obj_store_interface.download_object(chunk_req.chunk.key, fpath).result()
# mbps = chunk_req.chunk.chunk_length_bytes / t.elapsed / MB * 8
# logger.info(f"[gateway_daemon] Downloaded {chunk_req.chunk.key} at {mbps:.2f}Mbps")
# #logger.info(f"Pending download: {obj_store_interface.pending_downloads}, downloads: {obj_store_interface.completed_downloads}")
# self.chunk_store.chunk_requests[chunk_req.chunk.chunk_id] = chunk_req
# self.chunk_store.state_finish_download(chunk_req.chunk.chunk_id)
# print("process complete")

## start in seperate thread
##threading.Thread(target=fn, args=(chunk_req, chunk_req.src_region, chunk_req.src_object_store_bucket)).start()
#print("submit task")
##executor.submit(fn, chunk_req, chunk_req.src_region, chunk_req.src_object_store_bucket)
#p = Process(target=fn, args=(chunk_req, chunk_req.src_region, chunk_req.src_object_store_bucket)).start()
elif self.region != chunk_req.src_region: # do nothing, waiting for chunk to be be ready_to_upload
continue
else:
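The commented-out per-chunk threads above are replaced by GatewayObjStoreConn, defined in the new skylark/gateway/gateway_obj_store.py (not shown in this diff view). Based only on the public surface used here (the constructor taking chunk_store and max_conn, start_workers, stop_workers, and queue_request with an "upload"/"download" direction), here is a minimal sketch of what such a process pool plausibly looks like; the internals, helper names, and the assumption that chunk_store can be shared with worker processes are all hypothetical.

import queue
from multiprocessing import Manager, Process
from typing import Dict

from skylark.obj_store.object_store_interface import ObjectStoreInterface


class GatewayObjStoreConnSketch:
    """Hypothetical stand-in for GatewayObjStoreConn: a fixed-size pool of worker
    processes draining a shared queue of (chunk_request, direction) pairs."""

    def __init__(self, chunk_store, max_conn: int = 32):
        self.chunk_store = chunk_store
        self.max_conn = max_conn
        self.processes = []
        manager = Manager()
        self.request_queue = manager.Queue()  # holds (chunk_request, "upload" | "download")
        self.exit_flag = manager.Event()

    def queue_request(self, chunk_request, direction: str):
        assert direction in ("upload", "download")
        self.request_queue.put((chunk_request, direction))

    def start_workers(self):
        for _ in range(self.max_conn):
            p = Process(target=self._worker_loop)
            p.start()
            self.processes.append(p)

    def stop_workers(self):
        self.exit_flag.set()
        for p in self.processes:
            p.join()

    def _worker_loop(self):
        # each worker caches its own ObjectStoreInterface clients, since cloud SDK
        # clients are generally not safe to share across processes
        interfaces: Dict[str, ObjectStoreInterface] = {}
        while not self.exit_flag.is_set():
            try:
                chunk_req, direction = self.request_queue.get(timeout=1)
            except queue.Empty:
                continue
            chunk_id = chunk_req.chunk.chunk_id
            fpath = str(self.chunk_store.get_chunk_file_path(chunk_id).absolute())
            if direction == "upload":
                region, bucket = chunk_req.dst_region, chunk_req.dst_object_store_bucket
            else:
                region, bucket = chunk_req.src_region, chunk_req.src_object_store_bucket
            key = f"{region}:{bucket}"
            if key not in interfaces:
                interfaces[key] = ObjectStoreInterface.create(region, bucket)
            if direction == "upload":
                interfaces[key].upload_object(fpath, chunk_req.chunk.key).result()
                self.chunk_store.state_finish_upload(chunk_id)
            else:
                interfaces[key].download_object(chunk_req.chunk.key, fpath).result()
                self.chunk_store.chunk_requests[chunk_id] = chunk_req
                self.chunk_store.state_finish_download(chunk_id)

One consequence of this design, if it matches the real file: object store clients and their connections live in long-lived worker processes capped at max_conn, rather than being created per chunk in a fresh thread, which bounds resource use and sidesteps Python's GIL for any CPU-bound work inside the client libraries.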