From f02131bfebb1a713492a27005d59f721e8793325 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Sun, 9 Jan 2022 22:54:49 +0000 Subject: [PATCH 01/16] got paras' commented s3 file creation code to (mostly) run --- skylark/obj_store/s3_interface.py | 3 ++ skylark/test/test_replicator_client.py | 65 +++++++++++++++----------- 2 files changed, 42 insertions(+), 26 deletions(-) diff --git a/skylark/obj_store/s3_interface.py b/skylark/obj_store/s3_interface.py index 553aee359..a845148ae 100644 --- a/skylark/obj_store/s3_interface.py +++ b/skylark/obj_store/s3_interface.py @@ -42,6 +42,8 @@ def _on_done_upload(self, **kwargs): def bucket_exists(self): s3_client = AWSServer.get_boto3_client("s3", self.aws_region) + print("bucket name", self.bucket_name) + print([b["Name"] for b in s3_client.list_buckets()["Buckets"]]) return self.bucket_name in [b["Name"] for b in s3_client.list_buckets()["Buckets"]] def create_bucket(self): @@ -106,6 +108,7 @@ def _on_body_download(offset, chunk, **kwargs): ).finished_future def upload_object(self, src_file_path, dst_object_name, content_type="infer") -> Future: + print("uploading object", src_file_path, dst_object_name) src_file_path, dst_object_name = str(src_file_path), str(dst_object_name) assert dst_object_name.startswith("/") content_len = os.path.getsize(src_file_path) diff --git a/skylark/test/test_replicator_client.py b/skylark/test/test_replicator_client.py index f51f44921..5f181d899 100644 --- a/skylark/test/test_replicator_client.py +++ b/skylark/test/test_replicator_client.py @@ -5,6 +5,11 @@ from skylark import GB, MB, print_header from skylark.gateway.chunk import ChunkState +import tempfile +import concurrent +import os +from skylark.obj_store.s3_interface import S3Interface + from skylark.replicate.replication_plan import ReplicationJob, ReplicationTopology from skylark.replicate.replicator_client import ReplicatorClient from skylark.utils.utils import Timer @@ -27,6 +32,9 @@ def parse_args(): parser.add_argument("--n-chunks", default=16, type=int, help="Number of chunks in bucket") parser.add_argument("--skip-upload", action="store_true", help="Skip uploading objects to S3") + # bucket namespace + parser.add_argument("--bucket-prefix", default="sarah", help="Prefix for bucket to avoid naming collision") + # gateway provisioning parser.add_argument("--gcp-project", default="skylark-333700", help="GCP project ID") parser.add_argument("--gateway-docker-image", default="ghcr.io/parasj/skylark:main", help="Docker image for gateway instances") @@ -48,35 +56,40 @@ def parse_args(): def main(args): src_bucket, dst_bucket = f"skylark-{args.src_region.split(':')[1]}", f"skylark-{args.dest_region.split(':')[1]}" - # s3_interface_src = S3Interface(args.src_region.split(":")[1], src_bucket) - # s3_interface_dst = S3Interface(args.dest_region.split(":")[1], dst_bucket) - # s3_interface_src.create_bucket() - # s3_interface_dst.create_bucket() + s3_interface_src = S3Interface(args.src_region.split(":")[1], f"{args.bucket_prefix}-{src_bucket}") + s3_interface_dst = S3Interface(args.dest_region.split(":")[1], f"{args.bucket_prefix}-{dst_bucket}") + s3_interface_src.create_bucket() + s3_interface_dst.create_bucket() if not args.skip_upload: # todo implement object store support - pass - # matching_src_keys = list(s3_interface_src.list_objects(prefix=args.key_prefix)) - # matching_dst_keys = list(s3_interface_dst.list_objects(prefix=args.key_prefix)) - # if matching_src_keys: - # logger.warning(f"Deleting objects from source bucket: {matching_src_keys}") - # 
s3_interface_src.delete_objects(matching_src_keys) - # if matching_dst_keys: - # logger.warning(f"Deleting objects from destination bucket: {matching_dst_keys}") - # s3_interface_dst.delete_objects(matching_dst_keys) - - # # create test objects w/ random data - # logger.info("Creating test objects") - # obj_keys = [] - # futures = [] - # with tempfile.NamedTemporaryFile() as f: - # f.write(os.urandom(int(MB * args.chunk_size_mb))) - # f.seek(0) - # for i in trange(args.n_chunks): - # k = f"{args.key_prefix}/{i}" - # futures.append(s3_interface_src.upload_object(f.name, k)) - # obj_keys.append(k) - # concurrent.futures.wait(futures) + #pass + print("Not skipping upload...", src_bucket, dst_bucket) + matching_src_keys = list(s3_interface_src.list_objects(prefix=args.key_prefix)) + matching_dst_keys = list(s3_interface_dst.list_objects(prefix=args.key_prefix)) + if matching_src_keys: + logger.warning(f"Deleting objects from source bucket: {matching_src_keys}") + s3_interface_src.delete_objects(matching_src_keys) + if matching_dst_keys: + logger.warning(f"Deleting objects from destination bucket: {matching_dst_keys}") + s3_interface_dst.delete_objects(matching_dst_keys) + + # create test objects w/ random data + logger.info("Creating test objects") + obj_keys = [] + futures = [] + + # TODO: for n_chunks > 880, get syscall error + with tempfile.NamedTemporaryFile() as f: + f.write(os.urandom(int(MB * args.chunk_size_mb))) + f.seek(0) + for i in range(args.n_chunks): + k = f"{args.key_prefix}/{i}" + futures.append(s3_interface_src.upload_object(f.name, k)) + print("done", f.name, len(futures)) + obj_keys.append(k) + print("created all futures") + concurrent.futures.wait(futures) else: obj_keys = [f"{args.key_prefix}/{i}" for i in range(args.n_chunks)] From 094d2c4e0acc2f7186439775c0a905dbf694b481 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 10 Jan 2022 00:26:22 +0000 Subject: [PATCH 02/16] comments for adding s3 integration --- skylark/gateway/gateway_daemon.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/skylark/gateway/gateway_daemon.py b/skylark/gateway/gateway_daemon.py index 4d74e67e1..8969b3d9b 100644 --- a/skylark/gateway/gateway_daemon.py +++ b/skylark/gateway/gateway_daemon.py @@ -91,19 +91,33 @@ def exit_handler(signum, frame): if current_hop.chunk_location_type == "src_object_store": logger.warning(f"NOT IMPLEMENTED: Queuing object store download for chunk {chunk_req.chunk.chunk_id}") self.chunk_store.state_fail(chunk_req.chunk.chunk_id) + + # update chunk state + self.chunk_store.state_start_download(chunk_req.chunk.chunk_id) + + # function to download data from S3 + # TODO: add this to a queue like with GatewaySender to prevent OOM + + # start in seperate thread + elif current_hop.chunk_location_type.startswith("random_"): + # update chunk state self.chunk_store.state_start_download(chunk_req.chunk.chunk_id) + size_mb_match = re.search(r"random_(\d+)MB", current_hop.chunk_location_type) assert size_mb_match is not None size_mb = int(size_mb_match.group(1)) + # function to write random data file def fn(chunk_req, size_mb): fpath = str(self.chunk_store.get_chunk_file_path(chunk_req.chunk.chunk_id).absolute()) + logger.info(f"Writing random data path {fpath}") os.system(f"dd if=/dev/zero of={fpath} bs={MB} count={size_mb}") chunk_req.chunk.chunk_length_bytes = os.path.getsize(fpath) self.chunk_store.chunk_requests[chunk_req.chunk.chunk_id] = chunk_req self.chunk_store.state_finish_download(chunk_req.chunk.chunk_id) - + + # generate random data 
in seperate thread threading.Thread(target=fn, args=(chunk_req, size_mb)).start() elif current_hop.chunk_location_type == "relay" or current_hop.chunk_location_type == "save_local": # do nothing, waiting for chunk to be be ready_to_upload From f40b50ed8e15f90744cd547a4f8a34be26c7c243 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 12 Jan 2022 02:39:56 +0000 Subject: [PATCH 03/16] add object store info to ChunkRequestHop --- skylark/gateway/chunk.py | 9 +++--- skylark/gateway/gateway_daemon.py | 2 +- skylark/replicate/replication_plan.py | 2 +- skylark/replicate/replicator_client.py | 40 ++++++++++++++++++++++---- skylark/test/test_replicator_client.py | 13 +++++---- 5 files changed, 49 insertions(+), 17 deletions(-) diff --git a/skylark/gateway/chunk.py b/skylark/gateway/chunk.py index fff0fd7bd..90f13cecb 100644 --- a/skylark/gateway/chunk.py +++ b/skylark/gateway/chunk.py @@ -70,13 +70,14 @@ class ChunkRequestHop: hop_ip_address: str chunk_location_type: str # enum of {"src_object_store", "dst_object_store", "relay", "random_XMB", "save_local"} + # TODO: cleanup # if chunk_location_type == "src_object_store": - # src_object_store_region: str = None # format is provider:region - # src_object_store_bucket: str = None + src_object_store_region: str = None # format is provider:region + src_object_store_bucket: str = None # if chunk_location_type == "dst_object_store": - # dst_object_store_region: str = None # format is provider:region - # dst_object_store_bucket: str = None + dst_object_store_region: str = None # format is provider:region + dst_object_store_bucket: str = None def as_dict(self): return asdict(self) diff --git a/skylark/gateway/gateway_daemon.py b/skylark/gateway/gateway_daemon.py index efedbf804..3bf8d5997 100644 --- a/skylark/gateway/gateway_daemon.py +++ b/skylark/gateway/gateway_daemon.py @@ -93,7 +93,7 @@ def exit_handler(signum, frame): self.chunk_store.state_fail(chunk_req.chunk.chunk_id) # update chunk state - self.chunk_store.state_start_download(chunk_req.chunk.chunk_id) + #self.chunk_store.state_start_download(chunk_req.chunk.chunk_id) # function to download data from S3 # TODO: add this to a queue like with GatewaySender to prevent OOM diff --git a/skylark/replicate/replication_plan.py b/skylark/replicate/replication_plan.py index 63e04d03b..1169e021e 100644 --- a/skylark/replicate/replication_plan.py +++ b/skylark/replicate/replication_plan.py @@ -20,7 +20,7 @@ class ReplicationJob: def src_obj_sizes(self): if self.source_region.split(":")[0] == "aws": - interface = S3Interface(self.source_region, self.source_bucket) + interface = S3Interface(self.source_region.split(":")[1], self.source_bucket) get_size = lambda o: interface.get_obj_size(o) else: raise NotImplementedError diff --git a/skylark/replicate/replicator_client.py b/skylark/replicate/replicator_client.py index 3f0de6bca..f4e5a2294 100644 --- a/skylark/replicate/replicator_client.py +++ b/skylark/replicate/replicator_client.py @@ -183,13 +183,21 @@ def run_replication_plan(self, job: ReplicationJob): # make list of chunks chunks = [] + print("get object sizes", job.source_bucket) + obj_file_size_bytes = job.src_obj_sizes() if job.source_bucket else None for idx, obj in enumerate(job.objs): - file_size_bytes = job.random_chunk_size_mb * MB # todo support object store objects + if obj_file_size_bytes: + # object store objects + file_size_bytes = obj_file_size_bytes[idx][1] + else: + # random data + file_size_bytes = job.random_chunk_size_mb * MB + chunks.append( Chunk( key=obj, chunk_id=idx, - 
file_offset_bytes=0, + file_offset_bytes=0, # TODO: what is this? chunk_length_bytes=file_size_bytes, ) ) @@ -223,10 +231,25 @@ def run_replication_plan(self, job: ReplicationJob): cr_path = [] for hop_idx, hop_instance in enumerate(path): # todo support object stores - if hop_idx == 0: # source gateway - location = f"random_{job.random_chunk_size_mb}MB" - elif hop_idx == len(path) - 1: # destination gateway - location = "save_local" + # TODO: cleanup... + src_object_store_region = None + src_object_store_bucket = None + dst_object_store_region = None + dst_object_store_bucket = None + if hop_idx == 0: + if job.source_bucket: # source bucket + location = "src_bucket" + src_object_store_region = job.source_region + src_object_store_bucket = job.source_bucket + else: # source gateway + location = f"random_{job.random_chunk_size_mb}MB" + elif hop_idx == len(path) - 1: + if job.dest_bucket: # destination bucket + location = "dst_object_store" + dst_object_store_region = job.dest_region + dst_object_store_bucket = job.dest_bucket + else: # destination gateway + location = "save_local" else: # intermediate gateway location = "relay" cr_path.append( @@ -234,6 +257,11 @@ def run_replication_plan(self, job: ReplicationJob): hop_cloud_region=hop_instance.region_tag, hop_ip_address=hop_instance.public_ip(), chunk_location_type=location, + src_object_store_region=src_object_store_region, + src_object_store_bucket=src_object_store_bucket, + dst_object_store_region=dst_object_store_region, + dst_object_store_bucket=dst_object_store_bucket, + ) ) chunk_requests_sharded[path[0]].append(ChunkRequest(chunk, cr_path)) diff --git a/skylark/test/test_replicator_client.py b/skylark/test/test_replicator_client.py index 88d64a816..f79ab9372 100644 --- a/skylark/test/test_replicator_client.py +++ b/skylark/test/test_replicator_client.py @@ -55,9 +55,10 @@ def parse_args(): def main(args): - src_bucket, dst_bucket = f"skylark-{args.src_region.split(':')[1]}", f"skylark-{args.dest_region.split(':')[1]}" - s3_interface_src = S3Interface(args.src_region.split(":")[1], f"{args.bucket_prefix}-{src_bucket}") - s3_interface_dst = S3Interface(args.dest_region.split(":")[1], f"{args.bucket_prefix}-{dst_bucket}") + src_bucket = f"{args.bucket_prefix}-skylark-{args.src_region.split(':')[1]}" + dst_bucket = f"{args.bucket_prefix}-skylark-{args.dest_region.split(':')[1]}" + s3_interface_src = S3Interface(args.src_region.split(":")[1], src_bucket) + s3_interface_dst = S3Interface(args.dest_region.split(":")[1], dst_bucket) s3_interface_src.create_bucket() s3_interface_dst.create_bucket() @@ -65,8 +66,10 @@ def main(args): # todo implement object store support #pass print("Not skipping upload...", src_bucket, dst_bucket) - matching_src_keys = list(s3_interface_src.list_objects(prefix=args.key_prefix)) - matching_dst_keys = list(s3_interface_dst.list_objects(prefix=args.key_prefix)) + + # TODO: fix this to get the key instead of S3Object + matching_src_keys = list([obj.key for obj in s3_interface_src.list_objects(prefix=args.key_prefix)]) + matching_dst_keys = list([obj.key for obj in s3_interface_dst.list_objects(prefix=args.key_prefix)]) if matching_src_keys: logger.warning(f"Deleting objects from source bucket: {matching_src_keys}") s3_interface_src.delete_objects(matching_src_keys) From 4f93f9dc3087fe7249c374b51fe9af406cbb8114 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Sat, 15 Jan 2022 22:42:57 +0000 Subject: [PATCH 04/16] remove --- skylark/replicate/replicator_client.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 
deletions(-) diff --git a/skylark/replicate/replicator_client.py b/skylark/replicate/replicator_client.py index 60be5cc47..66974417f 100644 --- a/skylark/replicate/replicator_client.py +++ b/skylark/replicate/replicator_client.py @@ -279,10 +279,10 @@ def run_replication_plan(self, job: ReplicationJob): hop_cloud_region=hop_instance.region_tag, hop_ip_address=hop_instance.public_ip(), chunk_location_type=location, - src_object_store_region=src_object_store_region, - src_object_store_bucket=src_object_store_bucket, - dst_object_store_region=dst_object_store_region, - dst_object_store_bucket=dst_object_store_bucket, + #src_object_store_region=src_object_store_region, + #src_object_store_bucket=src_object_store_bucket, + #dst_object_store_region=dst_object_store_region, + #dst_object_store_bucket=dst_object_store_bucket, ) ) From 2f8ae700e6c328551539dabd62b09b406642a5ef Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Sat, 22 Jan 2022 06:24:21 +0000 Subject: [PATCH 05/16] add GCS (google object store) --- skylark/obj_store/gcs_interface.py | 138 +++++++++++++++++++++++++ skylark/test/test_replicator_client.py | 35 +++++-- 2 files changed, 163 insertions(+), 10 deletions(-) create mode 100644 skylark/obj_store/gcs_interface.py diff --git a/skylark/obj_store/gcs_interface.py b/skylark/obj_store/gcs_interface.py new file mode 100644 index 000000000..a6e9e8ccb --- /dev/null +++ b/skylark/obj_store/gcs_interface.py @@ -0,0 +1,138 @@ +import mimetypes +import os +from concurrent.futures import Future +from typing import Iterator, List + +#import botocore.exceptions +#from awscrt.auth import AwsCredentialsProvider +#from awscrt.http import HttpHeaders, HttpRequest +#from awscrt.io import ClientBootstrap, DefaultHostResolver, EventLoopGroup +#from awscrt.s3 import S3Client, S3RequestTlsMode, S3RequestType + +from google.cloud import storage + +from skylark.compute.aws.aws_server import AWSServer +from skylark.obj_store.object_store_interface import NoSuchObjectException, ObjectStoreInterface, ObjectStoreObject + + +class S3Object(ObjectStoreObject): + def full_path(self): + return f"s3://{self.bucket}/{self.key}" + + +class GCSInterface(ObjectStoreInterface): + def __init__(self, gcp_region, bucket_name, use_tls=True, part_size=None, throughput_target_gbps=None): + # TODO: infer region? + self.gcp_region = gcp_region + + self.bucket_name = bucket_name + self.pending_downloads, self.completed_downloads = 0, 0 + self.pending_uploads, self.completed_uploads = 0, 0 + + self.gcs_part_size = part_size + self.gcs_throughput_target_gbps = throughput_target_gbps + + # TODO - figure out how paralllelism handled + self._gcs_client = storage.Client() + + def _on_done_download(self, **kwargs): + self.completed_downloads += 1 + self.pending_downloads -= 1 + + def _on_done_upload(self, **kwargs): + self.completed_uploads += 1 + self.pending_uploads -= 1 + + #def infer_s3_region(self, bucket_name: str): + # s3_client = AWSServer.get_boto3_client("s3") + # region = s3_client.get_bucket_location(Bucket=bucket_name).get("LocationConstraint", "us-east-1") + # return region if region is not None else "us-east-1" + + def bucket_exists(self): + try: + bucket = self._gcs_client.get_bucket(self.bucket_name) + return True + except Exception as e: + print(e) + return False + + def create_bucket(self): + if not self.bucket_exists(): + bucket = self._gcs_client.bucket(self.bucket_name) + bucket.storage_class = "COLDLINE" # TODO: which storage class? 
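# Not part of the patch, a side note on the TODO above: COLDLINE adds retrieval
# fees and a 90-day minimum storage duration, so for a staging bucket that is
# written once and read back immediately, "STANDARD" is the usual fit. A
# minimal sketch with a placeholder bucket name and location:
client = storage.Client()
bucket = client.bucket("example-staging-bucket")
bucket.storage_class = "STANDARD"
client.create_bucket(bucket, location="us-west1")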
+ print(self.gcp_region) + new_bucket = self._gcs_client.create_bucket(bucket, location=self.gcp_region) + assert self.bucket_exists() + + #def list_objects(self, prefix="") -> Iterator[S3Object]: + # prefix = prefix if not prefix.startswith("/") else prefix[1:] + # s3_client = AWSServer.get_boto3_client("s3", self.aws_region) + # paginator = s3_client.get_paginator("list_objects_v2") + # page_iterator = paginator.paginate(Bucket=self.bucket_name, Prefix=prefix) + # for page in page_iterator: + # for obj in page.get("Contents", []): + # yield S3Object("s3", self.bucket_name, obj["Key"], obj["Size"], obj["LastModified"]) + + #def delete_objects(self, keys: List[str]): + # s3_client = AWSServer.get_boto3_client("s3", self.aws_region) + # while keys: + # batch, keys = keys[:1000], keys[1000:] # take up to 1000 keys at a time + # s3_client.delete_objects(Bucket=self.bucket_name, Delete={"Objects": [{"Key": k} for k in batch]}) + + #def get_obj_metadata(self, obj_name): + # s3_resource = AWSServer.get_boto3_resource("s3", self.aws_region).Bucket(self.bucket_name) + # try: + # return s3_resource.Object(str(obj_name).lstrip("/")) + # except botocore.exceptions.ClientError as e: + # raise NoSuchObjectException(f"Object {obj_name} does not exist, or you do not have permission to access it") from e + + #def get_obj_size(self, obj_name): + # return self.get_obj_metadata(obj_name).content_length + + #def exists(self, obj_name): + # try: + # self.get_obj_metadata(obj_name) + # return True + # except NoSuchObjectException: + # return False + + ## todo: implement range request for download + #def download_object(self, src_object_name, dst_file_path) -> Future: + # src_object_name, dst_file_path = str(src_object_name), str(dst_file_path) + # src_object_name = "/" + src_object_name if src_object_name[0] != "/" else src_object_name + # download_headers = HttpHeaders([("host", self.bucket_name + ".s3." + self.aws_region + ".amazonaws.com")]) + # request = HttpRequest("GET", src_object_name, download_headers) + + # def _on_body_download(offset, chunk, **kwargs): + # if not os.path.exists(dst_file_path): + # open(dst_file_path, "a").close() + # with open(dst_file_path, "rb+") as f: + # f.seek(offset) + # f.write(chunk) + + # return self._s3_client.make_request( + # recv_filepath=dst_file_path, + # request=request, + # type=S3RequestType.GET_OBJECT, + # on_done=self._on_done_download, + # on_body=_on_body_download, + # ).finished_future + + #def upload_object(self, src_file_path, dst_object_name, content_type="infer") -> Future: + # print("uploading object", src_file_path, dst_object_name) + # src_file_path, dst_object_name = str(src_file_path), str(dst_object_name) + # dst_object_name = "/" + dst_object_name if dst_object_name[0] != "/" else dst_object_name + # content_len = os.path.getsize(src_file_path) + # if content_type == "infer": + # content_type = mimetypes.guess_type(src_file_path)[0] or "application/octet-stream" + # upload_headers = HttpHeaders() + # upload_headers.add("host", self.bucket_name + ".s3." 
+ self.aws_region + ".amazonaws.com") + # upload_headers.add("Content-Type", content_type) + # upload_headers.add("Content-Length", str(content_len)) + # request = HttpRequest("PUT", dst_object_name, upload_headers) + # return self._s3_client.make_request( + # send_filepath=src_file_path, + # request=request, + # type=S3RequestType.PUT_OBJECT, + # on_done=self._on_done_upload, + # ).finished_future diff --git a/skylark/test/test_replicator_client.py b/skylark/test/test_replicator_client.py index 6716cf888..9935e41b8 100644 --- a/skylark/test/test_replicator_client.py +++ b/skylark/test/test_replicator_client.py @@ -7,11 +7,11 @@ import concurrent import os from skylark.obj_store.s3_interface import S3Interface +from skylark.obj_store.gcs_interface import GCSInterface import tempfile import concurrent import os -from skylark.obj_store.s3_interface import S3Interface from skylark.replicate.replication_plan import ReplicationJob, ReplicationTopology from skylark.replicate.replicator_client import ReplicatorClient @@ -60,12 +60,27 @@ def parse_args(): def main(args): + src_bucket = f"{args.bucket_prefix}-skylark-{args.src_region.split(':')[1]}" dst_bucket = f"{args.bucket_prefix}-skylark-{args.dest_region.split(':')[1]}" - s3_interface_src = S3Interface(args.src_region.split(":")[1], src_bucket) - s3_interface_dst = S3Interface(args.dest_region.split(":")[1], dst_bucket) - s3_interface_src.create_bucket() - s3_interface_dst.create_bucket() + + if "aws" in args.src_region: + obj_store_interface_src = S3Interface(args.src_region.split(":")[1], src_bucket) + elif "gcp" in args.src_region: + obj_store_interface_src = GCSInterface(args.src_region.split(":")[1][:-2], src_bucket) + else: + raise ValueError(f"No region in source region {args.src_region}") + + if "aws" in args.dest_region: + obj_store_interface_dst = S3Interface(args.dest_region.split(":")[1], dst_bucket) + elif "gcp" in args.dest_region: + obj_store_interface_dst = GCSInterface(args.dest_region.split(":")[1][:-2], dst_bucket) + else: + raise ValueError(f"No region in destination region {args.dst_region}") + + obj_store_interface_src.create_bucket() + obj_store_interface_dst.create_bucket() + if not args.skip_upload: # todo implement object store support @@ -73,14 +88,14 @@ def main(args): print("Not skipping upload...", src_bucket, dst_bucket) # TODO: fix this to get the key instead of S3Object - matching_src_keys = list([obj.key for obj in s3_interface_src.list_objects(prefix=args.key_prefix)]) - matching_dst_keys = list([obj.key for obj in s3_interface_dst.list_objects(prefix=args.key_prefix)]) + matching_src_keys = list([obj.key for obj in obj_store_interface_src.list_objects(prefix=args.key_prefix)]) + matching_dst_keys = list([obj.key for obj in obj_store_interface_dst.list_objects(prefix=args.key_prefix)]) if matching_src_keys: logger.warning(f"Deleting {len(matching_src_keys)} objects from source bucket") - s3_interface_src.delete_objects(matching_src_keys) + obj_store_interface_src.delete_objects(matching_src_keys) if matching_dst_keys: logger.warning(f"Deleting {len(matching_dst_keys)} objects from destination bucket") - s3_interface_dst.delete_objects(matching_dst_keys) + obj_store_interface_dst.delete_objects(matching_dst_keys) # create test objects w/ random data logger.info("Creating test objects") @@ -93,7 +108,7 @@ def main(args): f.seek(0) for i in range(args.n_chunks): k = f"{args.key_prefix}/{i}" - futures.append(s3_interface_src.upload_object(f.name, k)) + futures.append(obj_store_interface_src.upload_object(f.name, 
k)) obj_keys.append(k) concurrent.futures.wait(futures) else: From 947e146bd62cf33e4ffcda5ded5cae411199e9f3 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Thu, 27 Jan 2022 00:39:18 +0000 Subject: [PATCH 06/16] initial gcp cli support implementation --- skylark/cli/cli.py | 6 ++- skylark/cli/cli_helper.py | 51 ++++++++++++++++++++++- skylark/compute/aws/aws_cloud_provider.py | 36 ++++++++-------- 3 files changed, 71 insertions(+), 22 deletions(-) diff --git a/skylark/cli/cli.py b/skylark/cli/cli.py index d1e50fa12..6eb8baff9 100644 --- a/skylark/cli/cli.py +++ b/skylark/cli/cli.py @@ -32,6 +32,8 @@ copy_local_local, copy_local_s3, copy_s3_local, + copy_gcs_local, + copy_local_gcs, deprovision_skylark_instances, load_config, ls_local, @@ -78,9 +80,9 @@ def cp(src: str, dst: str): copy_local_s3(Path(path_src), bucket_dst, path_dst) elif provider_src == "s3" and provider_dst == "local": copy_s3_local(bucket_src, path_src, Path(path_dst)) - elif provider_src == "local" and provider_dst == "gcs": + elif provider_src == "local" and provider_dst == "gs": copy_local_gcs(Path(path_src), bucket_dst, path_dst) - elif provider_src == "gcs" and provider_dst == "local": + elif provider_src == "gs" and provider_dst == "local": copy_gcs_local(bucket_src, path_src, Path(path_dst)) else: raise NotImplementedError(f"{provider_src} to {provider_dst} not supported yet") diff --git a/skylark/cli/cli_helper.py b/skylark/cli/cli_helper.py index 56f32cfc5..a4a42e107 100644 --- a/skylark/cli/cli_helper.py +++ b/skylark/cli/cli_helper.py @@ -16,6 +16,7 @@ from skylark.compute.gcp.gcp_cloud_provider import GCPCloudProvider from skylark.obj_store.object_store_interface import ObjectStoreObject from skylark.obj_store.s3_interface import S3Interface +from skylark.obj_store.gcs_interface import GCSInterface from skylark.utils.utils import do_parallel from tqdm import tqdm @@ -84,11 +85,57 @@ def copy_local_local(src: Path, dst: Path): dst.parent.mkdir(exist_ok=True, parents=True) copyfile(src, dst) +# TODO: probably shoudl merge this with the s3 function (duplicate code) def copy_local_gcs(src: Path, dst_bucket: str, dst_key: str): - raise NotImplementedError(f"GCS not yet supported") + gcs = GCSInterface(None, dst_bucket) + ops: List[concurrent.futures.Future] = [] + path_mapping: Dict[concurrent.futures.Future, Path] = {} + def _copy(path: Path, dst_key: str, total_size=0.0): + if path.is_dir(): + for child in path.iterdir(): + total_size += _copy(child, os.path.join(dst_key, child.name)) + return total_size + else: + future = gcp.upload_object(path, dst_key) + ops.append(future) + path_mapping[future] = path + return path.stat().st_size + + total_bytes = _copy(src, dst_key) + + # wait for all uploads to complete, displaying a progress bar + with tqdm(total=total_bytes, unit="B", unit_scale=True, unit_divisor=1024, desc="Uploading") as pbar: + for op in concurrent.futures.as_completed(ops): + op.result() + pbar.update(path_mapping[op].stat().st_size) + +# TODO: probably shoudl merge this with the s3 function (duplicate code) def copy_gcs_local(src_bucket: str, src_key: str, dst: Path): - raise NotImplementedError(f"GCS not yet supported") + gcs = GCSInterface(None, src_bucket) + ops: List[concurrent.futures.Future] = [] + obj_mapping: Dict[concurrent.futures.Future, ObjectStoreObject] = {} + + # copy single object + def _copy(src_obj: ObjectStoreObject, dst: Path): + dst.parent.mkdir(exist_ok=True, parents=True) + future = gcs.download_object(src_obj.key, dst) + ops.append(future) + obj_mapping[future] = src_obj + 
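# Not part of the patch: with copy_local_gcs and copy_gcs_local in place, a
# round trip through GCS from Python reduces to the calls below; the bucket
# name and paths are placeholders.
from pathlib import Path
from skylark.cli.cli_helper import copy_local_gcs, copy_gcs_local

copy_local_gcs(Path("/tmp/dataset"), "example-bucket", "demo/dataset")
copy_gcs_local("example-bucket", "demo/dataset", Path("/tmp/dataset-copy"))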
return src_obj.size + + total_bytes = 0.0 + for obj in gcs.list_objects(prefix=src_key): + sub_key = obj.key[len(src_key) :] + sub_key = sub_key.lstrip("/") + dest_path = dst / sub_key + total_bytes += _copy(obj, dest_path) + + # wait for all downloads to complete, displaying a progress bar + with tqdm(total=total_bytes, unit="B", unit_scale=True, unit_divisor=1024, desc="Downloading") as pbar: + for op in concurrent.futures.as_completed(ops): + op.result() + pbar.update(obj_mapping[op].size) def copy_local_s3(src: Path, dst_bucket: str, dst_key: str, use_tls: bool = True): s3 = S3Interface(None, dst_bucket, use_tls=use_tls) diff --git a/skylark/compute/aws/aws_cloud_provider.py b/skylark/compute/aws/aws_cloud_provider.py index 9119fc356..fb4f19a52 100644 --- a/skylark/compute/aws/aws_cloud_provider.py +++ b/skylark/compute/aws/aws_cloud_provider.py @@ -24,24 +24,24 @@ def name(self): @staticmethod def region_list() -> List[str]: all_regions = [ - "af-south-1", - "ap-northeast-1", - "ap-northeast-2", - "ap-northeast-3", - "ap-east-1", - "ap-south-1", - "ap-southeast-1", - "ap-southeast-2", - "ap-southeast-3", - "ca-central-1", - "eu-central-1", - "eu-north-1", - "eu-south-1", - "eu-west-1", - "eu-west-2", - "eu-west-3", - "me-south-1", - "sa-east-1", + #"af-south-1", + #"ap-northeast-1", + #"ap-northeast-2", + #"ap-northeast-3", + #"ap-east-1", + #"ap-south-1", + #"ap-southeast-1", + #"ap-southeast-2", + #"ap-southeast-3", + #"ca-central-1", + #"eu-central-1", + #"eu-north-1", + #"eu-south-1", + #"eu-west-1", + #"eu-west-2", + #"eu-west-3", + #"me-south-1", + #"sa-east-1", "us-east-1", "us-east-2", "us-west-1", From da43f97abb31ed9975486238b88feda504051af2 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Thu, 27 Jan 2022 00:46:30 +0000 Subject: [PATCH 07/16] cleanup --- skylark/cli/cli.py | 4 ++-- skylark/cli/cli_helper.py | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/skylark/cli/cli.py b/skylark/cli/cli.py index 8a3269d26..bf9c2b725 100644 --- a/skylark/cli/cli.py +++ b/skylark/cli/cli.py @@ -34,8 +34,8 @@ copy_local_local, copy_local_s3, copy_s3_local, - copy_gcs_local, - copy_local_gcs, + copy_gcs_local, + copy_local_gcs, deprovision_skylark_instances, load_config, ls_local, diff --git a/skylark/cli/cli_helper.py b/skylark/cli/cli_helper.py index 4392e9cfd..a76a3edbf 100644 --- a/skylark/cli/cli_helper.py +++ b/skylark/cli/cli_helper.py @@ -85,6 +85,7 @@ def copy_local_local(src: Path, dst: Path): dst.parent.mkdir(exist_ok=True, parents=True) copyfile(src, dst) + # TODO: probably shoudl merge this with the s3 function (duplicate code) def copy_local_gcs(src: Path, dst_bucket: str, dst_key: str): gcs = GCSInterface(None, dst_bucket) @@ -97,7 +98,7 @@ def _copy(path: Path, dst_key: str, total_size=0.0): total_size += _copy(child, os.path.join(dst_key, child.name)) return total_size else: - future = gcp.upload_object(path, dst_key) + future = gcs.upload_object(path, dst_key) ops.append(future) path_mapping[future] = path return path.stat().st_size @@ -110,6 +111,7 @@ def _copy(path: Path, dst_key: str, total_size=0.0): op.result() pbar.update(path_mapping[op].stat().st_size) + # TODO: probably shoudl merge this with the s3 function (duplicate code) def copy_gcs_local(src_bucket: str, src_key: str, dst: Path): gcs = GCSInterface(None, src_bucket) From 05bc3da0d0f19b89e22738cb0a50c9a83740aab8 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Thu, 27 Jan 2022 03:03:52 +0000 Subject: [PATCH 08/16] fix arg --- skylark/test/test_replicator_client.py | 6 +++--- 1 file 
changed, 3 insertions(+), 3 deletions(-) diff --git a/skylark/test/test_replicator_client.py b/skylark/test/test_replicator_client.py index 352af83b6..114db2c86 100644 --- a/skylark/test/test_replicator_client.py +++ b/skylark/test/test_replicator_client.py @@ -77,7 +77,7 @@ def main(args): elif "gcp" in args.dest_region: obj_store_interface_dst = GCSInterface(args.dest_region.split(":")[1][:-2], dst_bucket) else: - raise ValueError(f"No region in destination region {args.dst_region}") + raise ValueError(f"No region in destination region {args.dest_region}") obj_store_interface_src.create_bucket() obj_store_interface_dst.create_bucket() @@ -130,11 +130,11 @@ def main(args): topo = ReplicationTopology() for i in range(args.num_gateways): topo.add_edge(args.src_region, i, args.inter_region, i, args.num_outgoing_connections) - topo.add_edge(args.inter_region, i, args.dst_region, i, args.num_outgoing_connections) + topo.add_edge(args.inter_region, i, args.dest_region, i, args.num_outgoing_connections) else: topo = ReplicationTopology() for i in range(args.num_gateways): - topo.add_edge(args.src_region, i, args.dst_region, i, args.num_outgoing_connections) + topo.add_edge(args.src_region, i, args.dest_region, i, args.num_outgoing_connections) logger.info("Creating replication client") rc = ReplicatorClient( topo, From fd44721c05f7d6f518d65eb4d8ec336ee73b844e Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Sun, 30 Jan 2022 06:09:22 +0000 Subject: [PATCH 09/16] remove prints --- skylark/obj_store/s3_interface.py | 3 --- skylark/test/test_replicator_client.py | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/skylark/obj_store/s3_interface.py b/skylark/obj_store/s3_interface.py index 404ba66a5..7b9f83939 100644 --- a/skylark/obj_store/s3_interface.py +++ b/skylark/obj_store/s3_interface.py @@ -63,8 +63,6 @@ def infer_s3_region(self, bucket_name: str): def bucket_exists(self): s3_client = AWSServer.get_boto3_client("s3", self.aws_region) - print("bucket name", self.bucket_name) - print([b["Name"] for b in s3_client.list_buckets()["Buckets"]]) return self.bucket_name in [b["Name"] for b in s3_client.list_buckets()["Buckets"]] def create_bucket(self): @@ -131,7 +129,6 @@ def _on_body_download(offset, chunk, **kwargs): ).finished_future def upload_object(self, src_file_path, dst_object_name, content_type="infer") -> Future: - print("uploading object", src_file_path, dst_object_name) src_file_path, dst_object_name = str(src_file_path), str(dst_object_name) dst_object_name = "/" + dst_object_name if dst_object_name[0] != "/" else dst_object_name content_len = os.path.getsize(src_file_path) diff --git a/skylark/test/test_replicator_client.py b/skylark/test/test_replicator_client.py index 8441dc81f..8c5e1a413 100644 --- a/skylark/test/test_replicator_client.py +++ b/skylark/test/test_replicator_client.py @@ -8,7 +8,7 @@ import os from skylark.obj_store.s3_interface import S3Interface from skylark.obj_store.gcs_interface import GCSInterface -from skylark.obj_store.azure_interface import AzureInterface +#from skylark.obj_store.azure_interface import AzureInterface import tempfile import concurrent From baa99c649b77568620e15bceba4b289630d5c746 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Mon, 31 Jan 2022 01:45:23 +0000 Subject: [PATCH 10/16] integrate replicate-json commant with obj store --- skylark/cli/cli.py | 17 ++++++++++++++--- skylark/compute/aws/aws_cloud_provider.py | 2 ++ skylark/test/test_replicator_client.py | 2 +- 3 files changed, 17 insertions(+), 4 deletions(-) diff 
--git a/skylark/cli/cli.py b/skylark/cli/cli.py index 3611d3110..3fae21304 100644 --- a/skylark/cli/cli.py +++ b/skylark/cli/cli.py @@ -193,8 +193,10 @@ def replicate_json( size_total_mb: int = typer.Option(2048, "--size-total-mb", "-s", help="Total transfer size in MB (across n_chunks chunks)"), n_chunks: int = 512, # bucket options - use_random_data: bool = True, + use_random_data: bool = typer.Option(False, "--random-data", help="Use random data"), bucket_prefix: str = "skylark", + source_bucket: str = typer.Option(None, "--source-bucket", help="Source bucket url"), + dest_bucket: str = typer.Option(None, "--dest-bucket", help="Destination bucket url"), key_prefix: str = "/test/replicate_random", # gateway provisioning options reuse_gateways: bool = True, @@ -202,7 +204,7 @@ def replicate_json( # cloud provider specific options azure_subscription: Optional[str] = None, gcp_project: Optional[str] = None, - aws_instance_class: str = "m5.8xlarge", + aws_instance_class: str = "m5.4xlarge", azure_instance_class: str = "Standard_D32_v5", gcp_instance_class: Optional[str] = "n2-standard-32", gcp_use_premium_network: bool = True, @@ -256,7 +258,16 @@ def replicate_json( random_chunk_size_mb=chunk_size_mb, ) else: - raise NotImplementedError() + # TODO: Don't hardcode n_chunks + # TODO: Don't hardcode obj keys + job = ReplicationJob( + source_region=topo.source_region(), + source_bucket=source_bucket, + dest_region=topo.sink_region(), + dest_bucket=dest_bucket, + objs=[f"{key_prefix}/{i}" for i in range(n_chunks)], + random_chunk_size_mb=chunk_size_mb, + ) total_bytes = n_chunks * chunk_size_mb * MB job = rc.run_replication_plan(job) diff --git a/skylark/compute/aws/aws_cloud_provider.py b/skylark/compute/aws/aws_cloud_provider.py index 01f30d043..84a0b3713 100644 --- a/skylark/compute/aws/aws_cloud_provider.py +++ b/skylark/compute/aws/aws_cloud_provider.py @@ -245,6 +245,7 @@ def provision_instance( # catch botocore.exceptions.ClientError: "An error occurred (RequestLimitExceeded) when calling the RunInstances operation (reached max retries: 4): Request limit exceeded." 
and retry for i in range(4): try: + print("region", region) instance = ec2.create_instances( ImageId=self.get_ubuntu_ami_id(region), InstanceType=instance_class, @@ -274,6 +275,7 @@ def provision_instance( ], ) except botocore.exceptions.ClientError as e: + print("error region", region) if not "RequestLimitExceeded" in str(e): raise e else: diff --git a/skylark/test/test_replicator_client.py b/skylark/test/test_replicator_client.py index a6389eca1..46c1cacaf 100644 --- a/skylark/test/test_replicator_client.py +++ b/skylark/test/test_replicator_client.py @@ -35,7 +35,7 @@ def parse_args(): # object information parser.add_argument("--key-prefix", default="/test/direct_replication", help="S3 key prefix for all objects") parser.add_argument("--chunk-size-mb", default=128, type=int, help="Chunk size in MB") - parser.add_argument("--n-chunks", default=16, type=int, help="Number of chunks in bucket") + parser.add_argument("--n-chunks", default=512, type=int, help="Number of chunks in bucket") parser.add_argument("--skip-upload", action="store_true", help="Skip uploading objects to S3") # bucket namespace From 060f356923a40c46daab2fb55c5dd1dd29d76e32 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Mon, 31 Jan 2022 05:20:12 +0000 Subject: [PATCH 11/16] about to make big changes --- skylark/cli/cli.py | 9 +++-- skylark/compute/aws/aws_cloud_provider.py | 40 +++++++++---------- skylark/compute/server.py | 2 + skylark/gateway/chunk_store.py | 2 +- skylark/gateway/gateway_daemon.py | 8 +++- skylark/obj_store/gcs_interface.py | 48 ++++++++++++++++------- skylark/replicate/replicator_client.py | 1 + 7 files changed, 68 insertions(+), 42 deletions(-) diff --git a/skylark/cli/cli.py b/skylark/cli/cli.py index 3fae21304..94e2275ac 100644 --- a/skylark/cli/cli.py +++ b/skylark/cli/cli.py @@ -191,7 +191,7 @@ def replicate_random( def replicate_json( path: Path = typer.Argument(..., exists=True, file_okay=True, dir_okay=False, help="Path to JSON file describing replication plan"), size_total_mb: int = typer.Option(2048, "--size-total-mb", "-s", help="Total transfer size in MB (across n_chunks chunks)"), - n_chunks: int = 512, + n_chunks: int = typer.Option(512, "--n-chunks", "-n", help="Number of chunks"), # bucket options use_random_data: bool = typer.Option(False, "--random-data", help="Use random data"), bucket_prefix: str = "skylark", @@ -257,6 +257,8 @@ def replicate_json( objs=[f"{key_prefix}/{i}" for i in range(n_chunks)], random_chunk_size_mb=chunk_size_mb, ) + job = rc.run_replication_plan(job) + total_bytes = n_chunks * chunk_size_mb * MB else: # TODO: Don't hardcode n_chunks # TODO: Don't hardcode obj keys @@ -266,11 +268,10 @@ def replicate_json( dest_region=topo.sink_region(), dest_bucket=dest_bucket, objs=[f"{key_prefix}/{i}" for i in range(n_chunks)], - random_chunk_size_mb=chunk_size_mb, ) + job = rc.run_replication_plan(job) + total_bytes = sum([chunk_req.chunk.chunk_length_bytes for chunk_req in job.chunk_requests]) - total_bytes = n_chunks * chunk_size_mb * MB - job = rc.run_replication_plan(job) logger.info(f"{total_bytes / GB:.2f}GByte replication job launched") stats = rc.monitor_transfer( job, show_pbar=True, log_interval_s=log_interval_s, time_limit_seconds=time_limit_seconds, cancel_pending=False diff --git a/skylark/compute/aws/aws_cloud_provider.py b/skylark/compute/aws/aws_cloud_provider.py index 84a0b3713..0a1660adf 100644 --- a/skylark/compute/aws/aws_cloud_provider.py +++ b/skylark/compute/aws/aws_cloud_provider.py @@ -24,28 +24,28 @@ def name(self): @staticmethod def 
region_list() -> List[str]: all_regions = [ - "af-south-1", - "ap-northeast-1", - "ap-northeast-2", - "ap-northeast-3", - "ap-east-1", - "ap-south-1", - "ap-southeast-1", - "ap-southeast-2", - # "ap-southeast-3", # too new region, not well supported - "ca-central-1", - "eu-central-1", - "eu-north-1", - "eu-south-1", - "eu-west-1", - "eu-west-2", - "eu-west-3", - "me-south-1", - "sa-east-1", + #"af-south-1", + #"ap-northeast-1", + #"ap-northeast-2", + #"ap-northeast-3", + #"ap-east-1", + #"ap-south-1", + #"ap-southeast-1", + #"ap-southeast-2", + ## "ap-southeast-3", # too new region, not well supported + #"ca-central-1", + #"eu-central-1", + #"eu-north-1", + #"eu-south-1", + #"eu-west-1", + #"eu-west-2", + #"eu-west-3", + #"me-south-1", + #"sa-east-1", "us-east-1", "us-east-2", "us-west-1", - "us-west-2", + #"us-west-2", ] return all_regions @@ -245,7 +245,6 @@ def provision_instance( # catch botocore.exceptions.ClientError: "An error occurred (RequestLimitExceeded) when calling the RunInstances operation (reached max retries: 4): Request limit exceeded." and retry for i in range(4): try: - print("region", region) instance = ec2.create_instances( ImageId=self.get_ubuntu_ami_id(region), InstanceType=instance_class, @@ -275,7 +274,6 @@ def provision_instance( ], ) except botocore.exceptions.ClientError as e: - print("error region", region) if not "RequestLimitExceeded" in str(e): raise e else: diff --git a/skylark/compute/server.py b/skylark/compute/server.py index 469fe6d59..fb1e3bc68 100644 --- a/skylark/compute/server.py +++ b/skylark/compute/server.py @@ -248,6 +248,8 @@ def check_stderr(tup): gateway_daemon_cmd = f"python -u /pkg/skylark/gateway/gateway_daemon.py --chunk-dir /dev/shm/skylark/chunks --outgoing-ports '{json.dumps(outgoing_ports)}' --region {self.region_tag}" docker_launch_cmd = f"sudo docker run {docker_run_flags} --name skylark_gateway {gateway_docker_image} {gateway_daemon_cmd}" start_out, start_err = self.run_command(docker_launch_cmd) + print(start_out) + print(start_err) logger.debug(desc_prefix + f": Gateway started {start_out.strip()}") assert not start_err.strip(), f"Error starting gateway: {start_err.strip()}" gateway_container_hash = start_out.strip().split("\n")[-1][:12] diff --git a/skylark/gateway/chunk_store.py b/skylark/gateway/chunk_store.py index 41aa7ea4c..e82bbe899 100644 --- a/skylark/gateway/chunk_store.py +++ b/skylark/gateway/chunk_store.py @@ -43,7 +43,7 @@ def set_chunk_state(self, chunk_id: int, new_status: ChunkState): old_status = self.chunk_status.get(chunk_id) self.chunk_status[chunk_id] = new_status self.chunk_status_log.append({"chunk_id": chunk_id, "state": new_status, "time": datetime.utcnow()}) - logger.info(f"[chunk_store]:{chunk_id} state change from {old_status} to {new_status}") + #logger.info(f"[chunk_store]:{chunk_id} state change from {old_status} to {new_status}") def get_chunk_status_log(self) -> List[Dict]: return list(self.chunk_status_log) diff --git a/skylark/gateway/gateway_daemon.py b/skylark/gateway/gateway_daemon.py index 3e593fd62..142379781 100644 --- a/skylark/gateway/gateway_daemon.py +++ b/skylark/gateway/gateway_daemon.py @@ -86,7 +86,12 @@ def fn(chunk_req, dst_region, dst_bucket): fpath = str(self.chunk_store.get_chunk_file_path(chunk_req.chunk.chunk_id).absolute()) obj_store_interface = self.get_obj_store_interface(dst_region, dst_bucket) with self.ul_pool_semaphore: - obj_store_interface.upload_object(fpath, chunk_req.chunk.key).result() + with Timer() as t: + obj_store_interface.upload_object(fpath, 
chunk_req.chunk.key).result() + + mbps = chunk_req.chunk.chunk_length_bytes / t.elapsed / MB * 8 + logger.info(f"[gateway_daemon] Uploaded {chunk_req.chunk.key} at {mbps:.2f}Mbps") + logger.info(f"Pending upload: {obj_store_interface.pending_uploads}, uploaded: {obj_store_interface.completed_uploads}") self.chunk_store.state_finish_upload(chunk_req.chunk.chunk_id) # start in seperate thread @@ -131,6 +136,7 @@ def fn(chunk_req, src_region, src_bucket): obj_store_interface.download_object(chunk_req.chunk.key, fpath).result() mbps = chunk_req.chunk.chunk_length_bytes / t.elapsed / MB * 8 logger.info(f"[gateway_daemon] Downloaded {chunk_req.chunk.key} at {mbps:.2f}Mbps") + #logger.info(f"Pending download: {obj_store_interface.pending_downloads}, downloads: {obj_store_interface.completed_downloads}") self.chunk_store.chunk_requests[chunk_req.chunk.chunk_id] = chunk_req self.chunk_store.state_finish_download(chunk_req.chunk.chunk_id) diff --git a/skylark/obj_store/gcs_interface.py b/skylark/obj_store/gcs_interface.py index af105b9d5..b8d5af124 100644 --- a/skylark/obj_store/gcs_interface.py +++ b/skylark/obj_store/gcs_interface.py @@ -1,6 +1,7 @@ import mimetypes import os from concurrent.futures import Future, ThreadPoolExecutor +from concurrent.futures import ProcessPoolExecutor from typing import Iterator, List from google.cloud import storage # pytype: disable=import-error @@ -22,11 +23,11 @@ def __init__(self, gcp_region, bucket_name): self.pending_downloads, self.completed_downloads = 0, 0 self.pending_uploads, self.completed_uploads = 0, 0 - # TODO - figure out how paralllelism handled - self._gcs_client = storage.Client() + #self._gcs_client = storage.Client() # TODO: set number of threads - self.pool = ThreadPoolExecutor(max_workers=16) + #self.pool = ThreadPoolExecutor(max_workers=16) + self.pool = ProcessPoolExecutor(max_workers=16) def _on_done_download(self, **kwargs): self.completed_downloads += 1 @@ -66,7 +67,8 @@ def delete_objects(self, keys: List[str]): assert not self.exists(key) def get_obj_metadata(self, obj_name): - bucket = self._gcs_client.bucket(self.bucket_name) + gcs_client = storage.Client() + bucket = gcs_client.bucket(self.bucket_name) blob = bucket.get_blob(obj_name) if blob is None: raise NoSuchObjectException( @@ -84,25 +86,41 @@ def exists(self, obj_name): except NoSuchObjectException: return False + def _download_object_helper(self, src_object_name, dst_file_path, offset, **kwargs): + + gcs_client = storage.Client() + bucket = gcs_client.bucket(self.bucket_name) + blob = bucket.blob(src_object_name) + chunk = blob.download_as_string() + + # write file + if not os.path.exists(dst_file_path): + open(dst_file_path, "a").close() + with open(dst_file_path, "rb+") as f: + f.seek(offset) + f.write(chunk) + + # todo: implement range request for download def download_object(self, src_object_name, dst_file_path) -> Future: src_object_name, dst_file_path = str(src_object_name), str(dst_file_path) src_object_name = src_object_name if src_object_name[0] != "/" else src_object_name + return self.pool.submit(self._download_object_helper, src_object_name, dst_file_path, 0) - def _download_object_helper(offset, **kwargs): + #def _download_object_helper(offset, **kwargs): - bucket = self._gcs_client.bucket(self.bucket_name) - blob = bucket.blob(src_object_name) - chunk = blob.download_as_string() + # bucket = self._gcs_client.bucket(self.bucket_name) + # blob = bucket.blob(src_object_name) + # chunk = blob.download_as_string() - # write file - if not 
os.path.exists(dst_file_path): - open(dst_file_path, "a").close() - with open(dst_file_path, "rb+") as f: - f.seek(offset) - f.write(chunk) + # # write file + # if not os.path.exists(dst_file_path): + # open(dst_file_path, "a").close() + # with open(dst_file_path, "rb+") as f: + # f.seek(offset) + # f.write(chunk) - return self.pool.submit(_download_object_helper, 0) + #return self.pool.submit(_download_object_helper, 0) def upload_object(self, src_file_path, dst_object_name, content_type="infer") -> Future: src_file_path, dst_object_name = str(src_file_path), str(dst_object_name) diff --git a/skylark/replicate/replicator_client.py b/skylark/replicate/replicator_client.py index 83ac1bfd9..180215eb0 100644 --- a/skylark/replicate/replicator_client.py +++ b/skylark/replicate/replicator_client.py @@ -206,6 +206,7 @@ def run_replication_plan(self, job: ReplicationJob) -> ReplicationJob: # make list of chunks chunks = [] + print(job.source_bucket, job.objs[0]) obj_file_size_bytes = job.src_obj_sizes() if job.source_bucket else None for idx, obj in enumerate(job.objs): if obj_file_size_bytes: From b15e9d8717f4a7ff6f730c6df8857a3885010ca9 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Tue, 1 Feb 2022 02:47:58 +0000 Subject: [PATCH 12/16] draft of new object store process pool --- skylark/chunk.py | 1 + skylark/gateway/gateway_daemon.py | 103 ++++++++++++++++++----------- skylark/gateway/gateway_sender.py | 7 ++ skylark/obj_store/gcs_interface.py | 48 +++++--------- skylark/obj_store/s3_interface.py | 6 +- 5 files changed, 90 insertions(+), 75 deletions(-) diff --git a/skylark/chunk.py b/skylark/chunk.py index 332bd4c34..5bbbf341a 100644 --- a/skylark/chunk.py +++ b/skylark/chunk.py @@ -40,6 +40,7 @@ class ChunkRequest: src_object_store_bucket: Optional[str] = None dst_object_store_bucket: Optional[str] = None + def __post_init__(self): if self.src_type == "object_store": assert self.src_object_store_bucket is not None diff --git a/skylark/gateway/gateway_daemon.py b/skylark/gateway/gateway_daemon.py index 142379781..3e2533c1f 100644 --- a/skylark/gateway/gateway_daemon.py +++ b/skylark/gateway/gateway_daemon.py @@ -11,6 +11,9 @@ from threading import BoundedSemaphore from typing import Dict +from multiprocessing import Process +import concurrent.futures + import setproctitle from skylark.utils import logger from skylark import MB, print_header @@ -20,6 +23,8 @@ from skylark.gateway.gateway_receiver import GatewayReceiver from skylark.gateway.gateway_sender import GatewaySender +from skylark.gateway.gateway_obj_store import GatewayObjStoreConn + from skylark.obj_store.object_store_interface import ObjectStoreInterface from skylark.utils.utils import Timer @@ -30,7 +35,10 @@ def __init__(self, region: str, outgoing_ports: Dict[str, int], chunk_dir: PathL self.chunk_store = ChunkStore(chunk_dir) self.gateway_receiver = GatewayReceiver(chunk_store=self.chunk_store, max_pending_chunks=max_incoming_ports) self.gateway_sender = GatewaySender(chunk_store=self.chunk_store, outgoing_ports=outgoing_ports) - self.obj_store_interfaces: Dict[str, ObjectStoreInterface] = {} + print(outgoing_ports) + + self.obj_store_conn = GatewayObjStoreConn(chunk_store=self.chunk_store, max_conn=128) + #self.obj_store_interfaces: Dict[str, ObjectStoreInterface] = {} # Download thread pool self.dl_pool_semaphore = BoundedSemaphore(value=128) @@ -42,12 +50,12 @@ def __init__(self, region: str, outgoing_ports: Dict[str, int], chunk_dir: PathL self.api_server.start() logger.info(f"[gateway_daemon] API started at 
{self.api_server.url}") - def get_obj_store_interface(self, region: str, bucket: str) -> ObjectStoreInterface: - key = f"{region}:{bucket}" - if key not in self.obj_store_interfaces: - logger.warning(f"[gateway_daemon] ObjectStoreInferface not cached for {key}") - self.obj_store_interfaces[key] = ObjectStoreInterface.create(region, bucket) - return self.obj_store_interfaces[key] + #def get_obj_store_interface(self, region: str, bucket: str) -> ObjectStoreInterface: + # key = f"{region}:{bucket}" + # if key not in self.obj_store_interfaces: + # logger.warning(f"[gateway_daemon] ObjectStoreInferface not cached for {key}") + # self.obj_store_interfaces[key] = ObjectStoreInterface.create(region, bucket) + # return self.obj_store_interfaces[key] def cleanup(self): logger.warning("[gateway_daemon] Shutting down gateway daemon") @@ -62,10 +70,12 @@ def exit_handler(signum, frame): exit_flag.set() self.gateway_receiver.stop_servers() self.gateway_sender.stop_workers() + self.obj_store_conn.stop_workers() sys.exit(0) logger.info("[gateway_daemon] Starting gateway sender workers") self.gateway_sender.start_workers() + self.obj_store_conn.start_workers() signal.signal(signal.SIGINT, exit_handler) signal.signal(signal.SIGTERM, exit_handler) @@ -78,24 +88,27 @@ def exit_handler(signum, frame): self.chunk_store.state_start_upload(chunk_req.chunk.chunk_id) self.chunk_store.state_finish_upload(chunk_req.chunk.chunk_id) elif self.region == chunk_req.dst_region and chunk_req.dst_type == "object_store": + #print("queue", chunk_req) self.chunk_store.state_queue_upload(chunk_req.chunk.chunk_id) - self.chunk_store.state_start_upload(chunk_req.chunk.chunk_id) - - # function to upload data to object store - def fn(chunk_req, dst_region, dst_bucket): - fpath = str(self.chunk_store.get_chunk_file_path(chunk_req.chunk.chunk_id).absolute()) - obj_store_interface = self.get_obj_store_interface(dst_region, dst_bucket) - with self.ul_pool_semaphore: - with Timer() as t: - obj_store_interface.upload_object(fpath, chunk_req.chunk.key).result() - - mbps = chunk_req.chunk.chunk_length_bytes / t.elapsed / MB * 8 - logger.info(f"[gateway_daemon] Uploaded {chunk_req.chunk.key} at {mbps:.2f}Mbps") - logger.info(f"Pending upload: {obj_store_interface.pending_uploads}, uploaded: {obj_store_interface.completed_uploads}") - self.chunk_store.state_finish_upload(chunk_req.chunk.chunk_id) - - # start in seperate thread - threading.Thread(target=fn, args=(chunk_req, chunk_req.dst_region, chunk_req.dst_object_store_bucket)).start() + self.obj_store_conn.queue_request(chunk_req, "upload") + #self.chunk_store.state_start_upload(chunk_req.chunk.chunk_id) + + ## function to upload data to object store + #def fn(chunk_req, dst_region, dst_bucket): + # fpath = str(self.chunk_store.get_chunk_file_path(chunk_req.chunk.chunk_id).absolute()) + # obj_store_interface = self.get_obj_store_interface(dst_region, dst_bucket) + # with self.ul_pool_semaphore: + # with Timer() as t: + # obj_store_interface.upload_object(fpath, chunk_req.chunk.key).result() + + # mbps = chunk_req.chunk.chunk_length_bytes / t.elapsed / MB * 8 + # logger.info(f"[gateway_daemon] Uploaded {chunk_req.chunk.key} at {mbps:.2f}Mbps") + # logger.info(f"Pending upload: {obj_store_interface.pending_uploads}, uploaded: {obj_store_interface.completed_uploads}") + # self.chunk_store.state_finish_upload(chunk_req.chunk.chunk_id) + + ## start in seperate thread + ##threading.Thread(target=fn, args=(chunk_req, chunk_req.dst_region, chunk_req.dst_object_store_bucket)).start() + #p = 
Process(target=fn, args=(chunk_req, chunk_req.dst_region, chunk_req.dst_object_store_bucket)).start() elif self.region != chunk_req.dst_region: self.gateway_sender.queue_request(chunk_req) self.chunk_store.state_queue_upload(chunk_req.chunk.chunk_id) @@ -125,23 +138,33 @@ def fn(chunk_req, size_mb): # generate random data in seperate thread threading.Thread(target=fn, args=(chunk_req, size_mb)).start() elif self.region == chunk_req.src_region and chunk_req.src_type == "object_store": + #print("queue", chunk_req) self.chunk_store.state_start_download(chunk_req.chunk.chunk_id) - - # function to download data from source object store - def fn(chunk_req, src_region, src_bucket): - fpath = str(self.chunk_store.get_chunk_file_path(chunk_req.chunk.chunk_id).absolute()) - obj_store_interface = self.get_obj_store_interface(src_region, src_bucket) - with self.dl_pool_semaphore: - with Timer() as t: - obj_store_interface.download_object(chunk_req.chunk.key, fpath).result() - mbps = chunk_req.chunk.chunk_length_bytes / t.elapsed / MB * 8 - logger.info(f"[gateway_daemon] Downloaded {chunk_req.chunk.key} at {mbps:.2f}Mbps") - #logger.info(f"Pending download: {obj_store_interface.pending_downloads}, downloads: {obj_store_interface.completed_downloads}") - self.chunk_store.chunk_requests[chunk_req.chunk.chunk_id] = chunk_req - self.chunk_store.state_finish_download(chunk_req.chunk.chunk_id) - - # start in seperate thread - threading.Thread(target=fn, args=(chunk_req, chunk_req.src_region, chunk_req.src_object_store_bucket)).start() + self.obj_store_conn.queue_request(chunk_req, "download") + + ## function to download data from source object store + #def fn(chunk_req, src_region, src_bucket): + # fpath = str(self.chunk_store.get_chunk_file_path(chunk_req.chunk.chunk_id).absolute()) + # obj_store_interface = self.get_obj_store_interface(src_region, src_bucket) + # with self.dl_pool_semaphore: + # with Timer() as t: + # #print("create process") + # #p = Process(obj_store_interface.download_object(chunk_req.chunk.key, fpath).result) + # #p.start() + # #p.join() + # obj_store_interface.download_object(chunk_req.chunk.key, fpath).result() + # mbps = chunk_req.chunk.chunk_length_bytes / t.elapsed / MB * 8 + # logger.info(f"[gateway_daemon] Downloaded {chunk_req.chunk.key} at {mbps:.2f}Mbps") + # #logger.info(f"Pending download: {obj_store_interface.pending_downloads}, downloads: {obj_store_interface.completed_downloads}") + # self.chunk_store.chunk_requests[chunk_req.chunk.chunk_id] = chunk_req + # self.chunk_store.state_finish_download(chunk_req.chunk.chunk_id) + # print("process complete") + + ## start in seperate thread + ##threading.Thread(target=fn, args=(chunk_req, chunk_req.src_region, chunk_req.src_object_store_bucket)).start() + #print("submit task") + ##executor.submit(fn, chunk_req, chunk_req.src_region, chunk_req.src_object_store_bucket) + #p = Process(target=fn, args=(chunk_req, chunk_req.src_region, chunk_req.src_object_store_bucket)).start() elif self.region != chunk_req.src_region: # do nothing, waiting for chunk to be be ready_to_upload continue else: diff --git a/skylark/gateway/gateway_sender.py b/skylark/gateway/gateway_sender.py index b33063bde..588a4ed3e 100644 --- a/skylark/gateway/gateway_sender.py +++ b/skylark/gateway/gateway_sender.py @@ -45,10 +45,12 @@ def stop_workers(self): p.join() self.processes = [] + def worker_loop(self, worker_id: int, dest_ip: str): setproctitle.setproctitle(f"skylark-gateway-sender:{worker_id}") self.worker_id = worker_id + while not 
self.exit_flags[worker_id].is_set(): try: next_chunk_id = self.worker_queue.get_nowait() @@ -92,6 +94,11 @@ def wait_for_chunks(): def queue_request(self, chunk_request: ChunkRequest): self.worker_queue.put(chunk_request.chunk.chunk_id) + + + + + # send chunks to other instances def send_chunks(self, chunk_ids: List[int], dst_host: str): """Send list of chunks to gateway server, pipelining small chunks together into a single socket stream.""" # notify server of upcoming ChunkRequests diff --git a/skylark/obj_store/gcs_interface.py b/skylark/obj_store/gcs_interface.py index b8d5af124..9c9a73710 100644 --- a/skylark/obj_store/gcs_interface.py +++ b/skylark/obj_store/gcs_interface.py @@ -1,7 +1,6 @@ import mimetypes import os from concurrent.futures import Future, ThreadPoolExecutor -from concurrent.futures import ProcessPoolExecutor from typing import Iterator, List from google.cloud import storage # pytype: disable=import-error @@ -23,11 +22,11 @@ def __init__(self, gcp_region, bucket_name): self.pending_downloads, self.completed_downloads = 0, 0 self.pending_uploads, self.completed_uploads = 0, 0 - #self._gcs_client = storage.Client() + # TODO - figure out how paralllelism handled + self._gcs_client = storage.Client() # TODO: set number of threads - #self.pool = ThreadPoolExecutor(max_workers=16) - self.pool = ProcessPoolExecutor(max_workers=16) + self.pool = ThreadPoolExecutor(max_workers=256) def _on_done_download(self, **kwargs): self.completed_downloads += 1 @@ -67,8 +66,7 @@ def delete_objects(self, keys: List[str]): assert not self.exists(key) def get_obj_metadata(self, obj_name): - gcs_client = storage.Client() - bucket = gcs_client.bucket(self.bucket_name) + bucket = self._gcs_client.bucket(self.bucket_name) blob = bucket.get_blob(obj_name) if blob is None: raise NoSuchObjectException( @@ -86,41 +84,25 @@ def exists(self, obj_name): except NoSuchObjectException: return False - def _download_object_helper(self, src_object_name, dst_file_path, offset, **kwargs): - - gcs_client = storage.Client() - bucket = gcs_client.bucket(self.bucket_name) - blob = bucket.blob(src_object_name) - chunk = blob.download_as_string() - - # write file - if not os.path.exists(dst_file_path): - open(dst_file_path, "a").close() - with open(dst_file_path, "rb+") as f: - f.seek(offset) - f.write(chunk) - - # todo: implement range request for download def download_object(self, src_object_name, dst_file_path) -> Future: src_object_name, dst_file_path = str(src_object_name), str(dst_file_path) src_object_name = src_object_name if src_object_name[0] != "/" else src_object_name - return self.pool.submit(self._download_object_helper, src_object_name, dst_file_path, 0) - #def _download_object_helper(offset, **kwargs): + def _download_object_helper(offset, **kwargs): - # bucket = self._gcs_client.bucket(self.bucket_name) - # blob = bucket.blob(src_object_name) - # chunk = blob.download_as_string() + bucket = self._gcs_client.bucket(self.bucket_name) + blob = bucket.blob(src_object_name) + chunk = blob.download_as_string() - # # write file - # if not os.path.exists(dst_file_path): - # open(dst_file_path, "a").close() - # with open(dst_file_path, "rb+") as f: - # f.seek(offset) - # f.write(chunk) + # write file + if not os.path.exists(dst_file_path): + open(dst_file_path, "a").close() + with open(dst_file_path, "rb+") as f: + f.seek(offset) + f.write(chunk) - #return self.pool.submit(_download_object_helper, 0) + return self.pool.submit(_download_object_helper, 0) def upload_object(self, src_file_path, dst_object_name, 
content_type="infer") -> Future: src_file_path, dst_object_name = str(src_file_path), str(dst_object_name) diff --git a/skylark/obj_store/s3_interface.py b/skylark/obj_store/s3_interface.py index b1b4ed190..29714eeee 100644 --- a/skylark/obj_store/s3_interface.py +++ b/skylark/obj_store/s3_interface.py @@ -1,6 +1,5 @@ import mimetypes import os -from concurrent.futures import Future from typing import Iterator, List import botocore.exceptions @@ -27,7 +26,10 @@ def __init__(self, aws_region, bucket_name, use_tls=True, part_size=None, throug self.pending_uploads, self.completed_uploads = 0, 0 self.s3_part_size = part_size self.s3_throughput_target_gbps = throughput_target_gbps - event_loop_group = EventLoopGroup(num_threads=os.cpu_count(), cpu_group=None) + #num_threads=os.cpu_count() + #num_threads=256 + num_threads=256 + event_loop_group = EventLoopGroup(num_threads=num_threads, cpu_group=None) host_resolver = DefaultHostResolver(event_loop_group) bootstrap = ClientBootstrap(event_loop_group, host_resolver) From 9e9c8b6dcdf2d6ad37d29f7edd23556c8f6a162b Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Tue, 1 Feb 2022 02:49:36 +0000 Subject: [PATCH 13/16] add object store conn file --- skylark/gateway/gateway_obj_store.py | 126 +++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 skylark/gateway/gateway_obj_store.py diff --git a/skylark/gateway/gateway_obj_store.py b/skylark/gateway/gateway_obj_store.py new file mode 100644 index 000000000..88fb5aa00 --- /dev/null +++ b/skylark/gateway/gateway_obj_store.py @@ -0,0 +1,126 @@ +import queue +import socket +from multiprocessing import Event, Manager, Process, Value +from typing import Dict, List, Optional + +import requests +import setproctitle +from skylark.utils import logger +from skylark import MB +from skylark.chunk import ChunkRequest +from skylark.gateway.chunk_store import ChunkStore +from skylark.utils.utils import Timer, wait_for + +import concurrent.futures +from skylark.obj_store.object_store_interface import ObjectStoreInterface + +from dataclasses import dataclass + +@dataclass +class ObjStoreRequest: + chunk_req: ChunkRequest + req_type: str + + + + + +class GatewayObjStoreConn: + def __init__(self, chunk_store, max_conn=32): + + self.chunk_store = chunk_store + self.n_processes = max_conn + self.processes = [] + #self.processes = concurrent.futures.ProcessPoolExecutor(max_conn) + + # shared state + self.manager = Manager() + self.next_worker_id = Value("i", 0) + self.worker_queue: queue.Queue[ObjectStoreRequest] = self.manager.Queue() + self.exit_flags = [Event() for _ in range(self.n_processes)] + + # process-local state + self.worker_id: Optional[int] = None + self.obj_store_interfaces: Dict[str, ObjectStoreInterface] = {} + + # TODO: fill these out? 
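        # Annotation (not part of the original commit): the Manager() queue and the
        # per-worker Event flags above are shared with the processes spawned in
        # start_workers(), while worker_id and the obj_store_interfaces cache are
        # process-local and are repopulated lazily by get_obj_store_interface()
        # inside each worker. The two lists below appear intended to record chunk IDs
        # whose transfers have completed, but nothing in this series reads them yet
        # and patch 15 removes them again.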
+ self.uploaded_chunk_ids: List[int] = [] + self.downloaded_chunk_ids: List[int] = [] + + # interact with object store + def get_obj_store_interface(self, region: str, bucket: str) -> ObjectStoreInterface: + key = f"{region}:{bucket}" + if key not in self.obj_store_interfaces: + logger.warning(f"[gateway_daemon] ObjectStoreInferface not cached for {key}") + self.obj_store_interfaces[key] = ObjectStoreInterface.create(region, bucket) + return self.obj_store_interfaces[key] + + def start_workers(self): + print("creating workers object store") + for i in range(self.n_processes): + p = Process(target=self.worker_loop, args=(i,)) + p.start() + self.processes.append(p) + + def stop_workers(self): + for i in range(self.n_processes): + self.exit_flags[i].set() + for p in self.processes: + p.join() + self.processes = [] + + def worker_loop(self, worker_id: int): + setproctitle.setproctitle(f"skylark-gateway-sender:{worker_id}") + self.worker_id = worker_id + + + while not self.exit_flags[worker_id].is_set(): + try: + request = self.worker_queue.get_nowait() + chunk_req = request.chunk_req + req_type = request.req_type + + except queue.Empty: + continue + + fpath = str(self.chunk_store.get_chunk_file_path(chunk_req.chunk.chunk_id).absolute()) + logger.debug(f"[obj_store:{self.worker_id}] Received chunk ID {chunk_req.chunk.chunk_id}") + + if req_type == "upload": + assert chunk_req.dst_type == "object_store" + # TODO: upload to object store + region = chunk_req.dst_region + bucket = chunk_req.dst_object_store_bucket + + self.chunk_store.state_start_upload(chunk_req.chunk.chunk_id) + + obj_store_interface = self.get_obj_store_interface(region, bucket) + obj_store_interface.upload_object(fpath, chunk_req.chunk.key).result() + + logger.debug(f"[obj_store:{self.worker_id}] Uploaded {chunk_req.chunk.chunk_id} to {bucket}") + self.chunk_store.state_finish_upload(chunk_req.chunk.chunk_id) + + elif req_type == "download": + assert chunk_req.src_type == "object_store" + # TODO: download from object store + region = chunk_req.src_region + bucket = chunk_req.src_object_store_bucket + + obj_store_interface = self.get_obj_store_interface(region, bucket) + obj_store_interface.download_object(chunk_req.chunk.key, fpath).result() + + logger.debug(f"[obj_store:{self.worker_id}] Downloaded {chunk_req.chunk.chunk_id} from {bucket}") + self.chunk_store.state_finish_download(chunk_req.chunk.chunk_id) + else: + raise ValueError(f"Invalid location for chunk req, {req_type}: {chunk_req.src_type}->{chunk_req.dst_type}") + + + + # close destination sockets + logger.info(f"[sender:{worker_id}] exiting") + + # TODO: wait for uploads to finish (check chunk exists) + + def queue_request(self, chunk_request: ChunkRequest, request_type: str): + self.worker_queue.put(ObjStoreRequest(chunk_request, request_type)) + From 9976ac9c6fa48bbb434a93d1fc6320a1b7bf5336 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Tue, 1 Feb 2022 04:21:55 +0000 Subject: [PATCH 14/16] add waiting in seperate thread --- skylark/gateway/gateway_daemon.py | 2 +- skylark/gateway/gateway_obj_store.py | 44 ++++++++++++++++++++++------ skylark/obj_store/s3_interface.py | 1 + 3 files changed, 37 insertions(+), 10 deletions(-) diff --git a/skylark/gateway/gateway_daemon.py b/skylark/gateway/gateway_daemon.py index 3e2533c1f..fc14c3c51 100644 --- a/skylark/gateway/gateway_daemon.py +++ b/skylark/gateway/gateway_daemon.py @@ -37,7 +37,7 @@ def __init__(self, region: str, outgoing_ports: Dict[str, int], chunk_dir: PathL self.gateway_sender = 
GatewaySender(chunk_store=self.chunk_store, outgoing_ports=outgoing_ports) print(outgoing_ports) - self.obj_store_conn = GatewayObjStoreConn(chunk_store=self.chunk_store, max_conn=128) + self.obj_store_conn = GatewayObjStoreConn(chunk_store=self.chunk_store, max_conn=32) #self.obj_store_interfaces: Dict[str, ObjectStoreInterface] = {} # Download thread pool diff --git a/skylark/gateway/gateway_obj_store.py b/skylark/gateway/gateway_obj_store.py index 88fb5aa00..e220330e2 100644 --- a/skylark/gateway/gateway_obj_store.py +++ b/skylark/gateway/gateway_obj_store.py @@ -1,4 +1,5 @@ import queue +import threading import socket from multiprocessing import Event, Manager, Process, Value from typing import Dict, List, Optional @@ -69,11 +70,14 @@ def stop_workers(self): p.join() self.processes = [] + def download(region, bucket, fpath, key): + obj_store_interface = self.get_obj_store_interface(region, bucket) + def worker_loop(self, worker_id: int): setproctitle.setproctitle(f"skylark-gateway-sender:{worker_id}") self.worker_id = worker_id - + while not self.exit_flags[worker_id].is_set(): try: request = self.worker_queue.get_nowait() @@ -94,27 +98,49 @@ def worker_loop(self, worker_id: int): self.chunk_store.state_start_upload(chunk_req.chunk.chunk_id) - obj_store_interface = self.get_obj_store_interface(region, bucket) - obj_store_interface.upload_object(fpath, chunk_req.chunk.key).result() + logger.debug(f"[obj_store:{self.worker_id}] Start upload {chunk_req.chunk.chunk_id} to {bucket}") + def upload(region, bucket, fpath, key, chunk_id): + obj_store_interface = self.get_obj_store_interface(region, bucket) + obj_store_interface.upload_object(fpath, key).result() + self.chunk_store.state_finish_upload(chunk_id) + logger.debug(f"[obj_store:{self.worker_id}] Uploaded {chunk_id} to {bucket}") + + threading.Thread(target=upload, args=(region, bucket, fpath, chunk_req.chunk.key, chunk_req.chunk.chunk_id)).start() - logger.debug(f"[obj_store:{self.worker_id}] Uploaded {chunk_req.chunk.chunk_id} to {bucket}") - self.chunk_store.state_finish_upload(chunk_req.chunk.chunk_id) + #obj_store_interface = self.get_obj_store_interface(region, bucket) + + #futures.append(obj_store_interface.upload_object(fpath, chunk_req.chunk.key)) #.result() + + #logger.debug(f"[obj_store:{self.worker_id}] Uploaded {chunk_req.chunk.chunk_id} to {bucket}") + #self.chunk_store.state_finish_upload(chunk_req.chunk.chunk_id) elif req_type == "download": assert chunk_req.src_type == "object_store" # TODO: download from object store region = chunk_req.src_region bucket = chunk_req.src_object_store_bucket + + logger.debug(f"[obj_store:{self.worker_id}] Starting download {chunk_req.chunk.chunk_id} from {bucket}") + def download(region, bucket, fpath, key, chunk_id): + obj_store_interface = self.get_obj_store_interface(region, bucket) + obj_store_interface.download_object(key, fpath).result() + self.chunk_store.state_finish_download(chunk_id) + logger.debug(f"[obj_store:{self.worker_id}] Downloaded {chunk_id} from {bucket}") + + # wait for request to return in sepearte thread, so we can update chunk state + threading.Thread(target=download, args=(region, bucket, fpath, chunk_req.chunk.key, chunk_req.chunk.chunk_id)).start() + - obj_store_interface = self.get_obj_store_interface(region, bucket) - obj_store_interface.download_object(chunk_req.chunk.key, fpath).result() + #obj_store_interface = self.get_obj_store_interface(region, bucket) + #obj_store_interface.download_object(chunk_req.chunk.key, fpath) #.result() - 
logger.debug(f"[obj_store:{self.worker_id}] Downloaded {chunk_req.chunk.chunk_id} from {bucket}") - self.chunk_store.state_finish_download(chunk_req.chunk.chunk_id) + #logger.debug(f"[obj_store:{self.worker_id}] Downloaded {chunk_req.chunk.chunk_id} from {bucket}") + #self.chunk_store.state_finish_download(chunk_req.chunk.chunk_id) else: raise ValueError(f"Invalid location for chunk req, {req_type}: {chunk_req.src_type}->{chunk_req.dst_type}") + wait(futures) # close destination sockets logger.info(f"[sender:{worker_id}] exiting") diff --git a/skylark/obj_store/s3_interface.py b/skylark/obj_store/s3_interface.py index 29714eeee..5476ff610 100644 --- a/skylark/obj_store/s3_interface.py +++ b/skylark/obj_store/s3_interface.py @@ -2,6 +2,7 @@ import os from typing import Iterator, List +from concurrent.futures import Future, ThreadPoolExecutor import botocore.exceptions from awscrt.auth import AwsCredentialsProvider from awscrt.http import HttpHeaders, HttpRequest From 5e3c6f51cd0f7bc7d36a127a6b9d1e652a5dd9c7 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Tue, 1 Feb 2022 06:04:03 +0000 Subject: [PATCH 15/16] cleanup --- skylark/chunk.py | 1 - skylark/cli/cli.py | 4 +- skylark/compute/aws/aws_cloud_provider.py | 38 ++++++++-------- skylark/gateway/gateway_daemon.py | 53 +---------------------- skylark/gateway/gateway_obj_store.py | 31 +++---------- 5 files changed, 27 insertions(+), 100 deletions(-) diff --git a/skylark/chunk.py b/skylark/chunk.py index 5bbbf341a..332bd4c34 100644 --- a/skylark/chunk.py +++ b/skylark/chunk.py @@ -40,7 +40,6 @@ class ChunkRequest: src_object_store_bucket: Optional[str] = None dst_object_store_bucket: Optional[str] = None - def __post_init__(self): if self.src_type == "object_store": assert self.src_object_store_bucket is not None diff --git a/skylark/cli/cli.py b/skylark/cli/cli.py index 94e2275ac..8eede52d2 100644 --- a/skylark/cli/cli.py +++ b/skylark/cli/cli.py @@ -193,7 +193,7 @@ def replicate_json( size_total_mb: int = typer.Option(2048, "--size-total-mb", "-s", help="Total transfer size in MB (across n_chunks chunks)"), n_chunks: int = typer.Option(512, "--n-chunks", "-n", help="Number of chunks"), # bucket options - use_random_data: bool = typer.Option(False, "--random-data", help="Use random data"), + use_random_data: bool = False, bucket_prefix: str = "skylark", source_bucket: str = typer.Option(None, "--source-bucket", help="Source bucket url"), dest_bucket: str = typer.Option(None, "--dest-bucket", help="Destination bucket url"), @@ -204,7 +204,7 @@ def replicate_json( # cloud provider specific options azure_subscription: Optional[str] = None, gcp_project: Optional[str] = None, - aws_instance_class: str = "m5.4xlarge", + aws_instance_class: str = "m5.8xlarge", azure_instance_class: str = "Standard_D32_v5", gcp_instance_class: Optional[str] = "n2-standard-32", gcp_use_premium_network: bool = True, diff --git a/skylark/compute/aws/aws_cloud_provider.py b/skylark/compute/aws/aws_cloud_provider.py index 0a1660adf..01f30d043 100644 --- a/skylark/compute/aws/aws_cloud_provider.py +++ b/skylark/compute/aws/aws_cloud_provider.py @@ -24,28 +24,28 @@ def name(self): @staticmethod def region_list() -> List[str]: all_regions = [ - #"af-south-1", - #"ap-northeast-1", - #"ap-northeast-2", - #"ap-northeast-3", - #"ap-east-1", - #"ap-south-1", - #"ap-southeast-1", - #"ap-southeast-2", - ## "ap-southeast-3", # too new region, not well supported - #"ca-central-1", - #"eu-central-1", - #"eu-north-1", - #"eu-south-1", - #"eu-west-1", - #"eu-west-2", - #"eu-west-3", 
- #"me-south-1", - #"sa-east-1", + "af-south-1", + "ap-northeast-1", + "ap-northeast-2", + "ap-northeast-3", + "ap-east-1", + "ap-south-1", + "ap-southeast-1", + "ap-southeast-2", + # "ap-southeast-3", # too new region, not well supported + "ca-central-1", + "eu-central-1", + "eu-north-1", + "eu-south-1", + "eu-west-1", + "eu-west-2", + "eu-west-3", + "me-south-1", + "sa-east-1", "us-east-1", "us-east-2", "us-west-1", - #"us-west-2", + "us-west-2", ] return all_regions diff --git a/skylark/gateway/gateway_daemon.py b/skylark/gateway/gateway_daemon.py index fc14c3c51..ebdbd9a83 100644 --- a/skylark/gateway/gateway_daemon.py +++ b/skylark/gateway/gateway_daemon.py @@ -50,13 +50,6 @@ def __init__(self, region: str, outgoing_ports: Dict[str, int], chunk_dir: PathL self.api_server.start() logger.info(f"[gateway_daemon] API started at {self.api_server.url}") - #def get_obj_store_interface(self, region: str, bucket: str) -> ObjectStoreInterface: - # key = f"{region}:{bucket}" - # if key not in self.obj_store_interfaces: - # logger.warning(f"[gateway_daemon] ObjectStoreInferface not cached for {key}") - # self.obj_store_interfaces[key] = ObjectStoreInterface.create(region, bucket) - # return self.obj_store_interfaces[key] - def cleanup(self): logger.warning("[gateway_daemon] Shutting down gateway daemon") self.api_server.shutdown() @@ -88,28 +81,9 @@ def exit_handler(signum, frame): self.chunk_store.state_start_upload(chunk_req.chunk.chunk_id) self.chunk_store.state_finish_upload(chunk_req.chunk.chunk_id) elif self.region == chunk_req.dst_region and chunk_req.dst_type == "object_store": - #print("queue", chunk_req) self.chunk_store.state_queue_upload(chunk_req.chunk.chunk_id) self.obj_store_conn.queue_request(chunk_req, "upload") - #self.chunk_store.state_start_upload(chunk_req.chunk.chunk_id) - - ## function to upload data to object store - #def fn(chunk_req, dst_region, dst_bucket): - # fpath = str(self.chunk_store.get_chunk_file_path(chunk_req.chunk.chunk_id).absolute()) - # obj_store_interface = self.get_obj_store_interface(dst_region, dst_bucket) - # with self.ul_pool_semaphore: - # with Timer() as t: - # obj_store_interface.upload_object(fpath, chunk_req.chunk.key).result() - - # mbps = chunk_req.chunk.chunk_length_bytes / t.elapsed / MB * 8 - # logger.info(f"[gateway_daemon] Uploaded {chunk_req.chunk.key} at {mbps:.2f}Mbps") - # logger.info(f"Pending upload: {obj_store_interface.pending_uploads}, uploaded: {obj_store_interface.completed_uploads}") - # self.chunk_store.state_finish_upload(chunk_req.chunk.chunk_id) - - ## start in seperate thread - ##threading.Thread(target=fn, args=(chunk_req, chunk_req.dst_region, chunk_req.dst_object_store_bucket)).start() - #p = Process(target=fn, args=(chunk_req, chunk_req.dst_region, chunk_req.dst_object_store_bucket)).start() - elif self.region != chunk_req.dst_region: + elif self.region != chunk_req.dst_region: self.gateway_sender.queue_request(chunk_req) self.chunk_store.state_queue_upload(chunk_req.chunk.chunk_id) else: @@ -138,33 +112,8 @@ def fn(chunk_req, size_mb): # generate random data in seperate thread threading.Thread(target=fn, args=(chunk_req, size_mb)).start() elif self.region == chunk_req.src_region and chunk_req.src_type == "object_store": - #print("queue", chunk_req) self.chunk_store.state_start_download(chunk_req.chunk.chunk_id) self.obj_store_conn.queue_request(chunk_req, "download") - - ## function to download data from source object store - #def fn(chunk_req, src_region, src_bucket): - # fpath = 
str(self.chunk_store.get_chunk_file_path(chunk_req.chunk.chunk_id).absolute()) - # obj_store_interface = self.get_obj_store_interface(src_region, src_bucket) - # with self.dl_pool_semaphore: - # with Timer() as t: - # #print("create process") - # #p = Process(obj_store_interface.download_object(chunk_req.chunk.key, fpath).result) - # #p.start() - # #p.join() - # obj_store_interface.download_object(chunk_req.chunk.key, fpath).result() - # mbps = chunk_req.chunk.chunk_length_bytes / t.elapsed / MB * 8 - # logger.info(f"[gateway_daemon] Downloaded {chunk_req.chunk.key} at {mbps:.2f}Mbps") - # #logger.info(f"Pending download: {obj_store_interface.pending_downloads}, downloads: {obj_store_interface.completed_downloads}") - # self.chunk_store.chunk_requests[chunk_req.chunk.chunk_id] = chunk_req - # self.chunk_store.state_finish_download(chunk_req.chunk.chunk_id) - # print("process complete") - - ## start in seperate thread - ##threading.Thread(target=fn, args=(chunk_req, chunk_req.src_region, chunk_req.src_object_store_bucket)).start() - #print("submit task") - ##executor.submit(fn, chunk_req, chunk_req.src_region, chunk_req.src_object_store_bucket) - #p = Process(target=fn, args=(chunk_req, chunk_req.src_region, chunk_req.src_object_store_bucket)).start() elif self.region != chunk_req.src_region: # do nothing, waiting for chunk to be be ready_to_upload continue else: diff --git a/skylark/gateway/gateway_obj_store.py b/skylark/gateway/gateway_obj_store.py index e220330e2..b127d9837 100644 --- a/skylark/gateway/gateway_obj_store.py +++ b/skylark/gateway/gateway_obj_store.py @@ -22,17 +22,12 @@ class ObjStoreRequest: chunk_req: ChunkRequest req_type: str - - - - class GatewayObjStoreConn: def __init__(self, chunk_store, max_conn=32): self.chunk_store = chunk_store self.n_processes = max_conn self.processes = [] - #self.processes = concurrent.futures.ProcessPoolExecutor(max_conn) # shared state self.manager = Manager() @@ -44,9 +39,6 @@ def __init__(self, chunk_store, max_conn=32): self.worker_id: Optional[int] = None self.obj_store_interfaces: Dict[str, ObjectStoreInterface] = {} - # TODO: fill these out? 
- self.uploaded_chunk_ids: List[int] = [] - self.downloaded_chunk_ids: List[int] = [] # interact with object store def get_obj_store_interface(self, region: str, bucket: str) -> ObjectStoreInterface: @@ -57,7 +49,6 @@ def get_obj_store_interface(self, region: str, bucket: str) -> ObjectStoreInterf return self.obj_store_interfaces[key] def start_workers(self): - print("creating workers object store") for i in range(self.n_processes): p = Process(target=self.worker_loop, args=(i,)) p.start() @@ -92,7 +83,6 @@ def worker_loop(self, worker_id: int): if req_type == "upload": assert chunk_req.dst_type == "object_store" - # TODO: upload to object store region = chunk_req.dst_region bucket = chunk_req.dst_object_store_bucket @@ -102,21 +92,16 @@ def worker_loop(self, worker_id: int): def upload(region, bucket, fpath, key, chunk_id): obj_store_interface = self.get_obj_store_interface(region, bucket) obj_store_interface.upload_object(fpath, key).result() + + # update chunk state self.chunk_store.state_finish_upload(chunk_id) logger.debug(f"[obj_store:{self.worker_id}] Uploaded {chunk_id} to {bucket}") + # wait for upload in seperate thread threading.Thread(target=upload, args=(region, bucket, fpath, chunk_req.chunk.key, chunk_req.chunk.chunk_id)).start() - #obj_store_interface = self.get_obj_store_interface(region, bucket) - - #futures.append(obj_store_interface.upload_object(fpath, chunk_req.chunk.key)) #.result() - - #logger.debug(f"[obj_store:{self.worker_id}] Uploaded {chunk_req.chunk.chunk_id} to {bucket}") - #self.chunk_store.state_finish_upload(chunk_req.chunk.chunk_id) - elif req_type == "download": assert chunk_req.src_type == "object_store" - # TODO: download from object store region = chunk_req.src_region bucket = chunk_req.src_object_store_bucket @@ -124,24 +109,18 @@ def upload(region, bucket, fpath, key, chunk_id): def download(region, bucket, fpath, key, chunk_id): obj_store_interface = self.get_obj_store_interface(region, bucket) obj_store_interface.download_object(key, fpath).result() + + # update chunk state self.chunk_store.state_finish_download(chunk_id) logger.debug(f"[obj_store:{self.worker_id}] Downloaded {chunk_id} from {bucket}") # wait for request to return in sepearte thread, so we can update chunk state threading.Thread(target=download, args=(region, bucket, fpath, chunk_req.chunk.key, chunk_req.chunk.chunk_id)).start() - - #obj_store_interface = self.get_obj_store_interface(region, bucket) - #obj_store_interface.download_object(chunk_req.chunk.key, fpath) #.result() - - #logger.debug(f"[obj_store:{self.worker_id}] Downloaded {chunk_req.chunk.chunk_id} from {bucket}") - #self.chunk_store.state_finish_download(chunk_req.chunk.chunk_id) else: raise ValueError(f"Invalid location for chunk req, {req_type}: {chunk_req.src_type}->{chunk_req.dst_type}") - wait(futures) - # close destination sockets logger.info(f"[sender:{worker_id}] exiting") From a9e31cb639ef3f7384ca61a1598a4521aaf76e07 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Tue, 1 Feb 2022 06:18:54 +0000 Subject: [PATCH 16/16] cleanup --- skylark/gateway/gateway_daemon.py | 2 +- skylark/gateway/gateway_obj_store.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/skylark/gateway/gateway_daemon.py b/skylark/gateway/gateway_daemon.py index ebdbd9a83..4809d384c 100644 --- a/skylark/gateway/gateway_daemon.py +++ b/skylark/gateway/gateway_daemon.py @@ -83,7 +83,7 @@ def exit_handler(signum, frame): elif self.region == chunk_req.dst_region and chunk_req.dst_type == "object_store": 
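                    # This gateway sits in the destination region and the destination is an
                    # object store, so the chunk is marked queued-for-upload and handed to the
                    # GatewayObjStoreConn request queue; the inter-region branch below still
                    # goes through the socket-based gateway sender instead.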
self.chunk_store.state_queue_upload(chunk_req.chunk.chunk_id) self.obj_store_conn.queue_request(chunk_req, "upload") - elif self.region != chunk_req.dst_region: + elif self.region != chunk_req.dst_region: self.gateway_sender.queue_request(chunk_req) self.chunk_store.state_queue_upload(chunk_req.chunk.chunk_id) else: diff --git a/skylark/gateway/gateway_obj_store.py b/skylark/gateway/gateway_obj_store.py index b127d9837..5bf355521 100644 --- a/skylark/gateway/gateway_obj_store.py +++ b/skylark/gateway/gateway_obj_store.py @@ -65,7 +65,7 @@ def download(region, bucket, fpath, key): obj_store_interface = self.get_obj_store_interface(region, bucket) def worker_loop(self, worker_id: int): - setproctitle.setproctitle(f"skylark-gateway-sender:{worker_id}") + setproctitle.setproctitle(f"skylark-gateway-obj-store:{worker_id}") self.worker_id = worker_id @@ -122,7 +122,7 @@ def download(region, bucket, fpath, key, chunk_id): # close destination sockets - logger.info(f"[sender:{worker_id}] exiting") + logger.info(f"[obj_store:{worker_id}] exiting") # TODO: wait for uploads to finish (check chunk exists)
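The end state of this series is a single request-queue pattern for object-store I/O: gateway_daemon.py only updates chunk state and calls obj_store_conn.queue_request(), while GatewayObjStoreConn worker processes drain the shared queue and run each blocking upload or download in its own thread so the loop can keep accepting work. The standalone sketch below reduces that pattern to its essentials; Request, FakeObjectStore, ObjStorePool, and the sleep-based transfer are hypothetical stand-ins for illustration, not Skylark classes, and the real worker loop additionally updates the ChunkStore before and after each transfer.

    import queue
    import threading
    import time
    from dataclasses import dataclass
    from multiprocessing import Event, Manager, Process


    @dataclass
    class Request:
        chunk_id: int
        direction: str  # "upload" or "download"


    class FakeObjectStore:
        """Stand-in for an ObjectStoreInterface: transfer() blocks like a real PUT/GET."""

        def transfer(self, chunk_id: int, direction: str) -> None:
            time.sleep(0.1)  # pretend network time
            print(f"{direction} finished for chunk {chunk_id}")


    def worker_loop(worker_id: int, work_queue, exit_flag) -> None:
        store = FakeObjectStore()  # per-process state, like the per-worker interface cache
        while not exit_flag.is_set():
            try:
                req = work_queue.get_nowait()
            except queue.Empty:
                time.sleep(0.01)  # brief pause so the poll loop does not spin
                continue
            # run the blocking transfer in a thread so this loop keeps draining the queue
            threading.Thread(target=store.transfer, args=(req.chunk_id, req.direction)).start()


    class ObjStorePool:
        def __init__(self, n_workers: int = 4):
            self.manager = Manager()
            self.work_queue = self.manager.Queue()  # shared across worker processes
            self.exit_flags = [Event() for _ in range(n_workers)]
            self.processes = [
                Process(target=worker_loop, args=(i, self.work_queue, self.exit_flags[i]))
                for i in range(n_workers)
            ]

        def start(self) -> None:
            for p in self.processes:
                p.start()

        def queue_request(self, chunk_id: int, direction: str) -> None:
            self.work_queue.put(Request(chunk_id, direction))

        def stop(self) -> None:
            for flag in self.exit_flags:
                flag.set()
            for p in self.processes:
                p.join()


    if __name__ == "__main__":
        pool = ObjStorePool(n_workers=2)
        pool.start()
        for i in range(8):
            pool.queue_request(i, "upload" if i % 2 == 0 else "download")
        time.sleep(1)  # give the workers time to drain the queue
        pool.stop()

Compared with the per-chunk thread spawning that patch 15 deletes from gateway_daemon.py, funnelling object-store work through one queue gives the daemon a single dispatch point and a natural place to later wait for outstanding transfers, which is what the remaining "wait for uploads to finish (check chunk exists)" TODO points at.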