From 6925337424b6358eb91ad77602a439401b6add10 Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 03:16:20 -0500 Subject: [PATCH 01/30] Add logic to distribute read load across different files in S3 --- baseplate/sidecars/live_data_watcher.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/baseplate/sidecars/live_data_watcher.py b/baseplate/sidecars/live_data_watcher.py index a929f1b0e..a12a6fa31 100644 --- a/baseplate/sidecars/live_data_watcher.py +++ b/baseplate/sidecars/live_data_watcher.py @@ -6,6 +6,7 @@ import os import sys import time +import random from enum import Enum from pathlib import Path @@ -122,10 +123,31 @@ def _load_from_s3(data: bytes) -> bytes: # and json is an easier format for znode authors to work with. loader_config = json.loads(data.decode("UTF-8")) try: + # Default to 1 since we generate random numbers from 0 to num_file_shards exclusive. + # We can't assume that every caller of this method will be using prefix sharding on + # their S3 objects. + num_file_shards = loader_config.get("num_file_shards", 1) + + # If the num_file_shards key is present, we may have multiple copies of the same manifest + # uploaded so fetch one randomly using a randomly generated prefix. + # Generate a random number from 0 to num_file_shards exclusive to use as prefix. + file_key_prefix = random.randrange(num_file_shards) + + # If 0 is generated, don’t append a prefix, fetch the file with no prefix + # since we always upload one file without a prefix. + if file_key_prefix == 0: + sharded_file_key_prefix = "" + else: + # If any other number is generated, fetch one of the copies of the + # file which has an included prefix. + sharded_file_key_prefix = str(file_key_prefix) + "/" + + # Append prefix (if it exists) to our original file key. + file_key = sharded_file_key_prefix + loader_config["file_key"] region_name = loader_config["region_name"] s3_kwargs = { "Bucket": loader_config["bucket_name"], - "Key": loader_config["file_key"], + "Key": file_key, "SSECustomerKey": loader_config["sse_key"], "SSECustomerAlgorithm": "AES256", } From 6d08a4781ea8f6d8c178cbf00a9f6f97126c3f4c Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 03:21:02 -0500 Subject: [PATCH 02/30] lint --- baseplate/sidecars/live_data_watcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/baseplate/sidecars/live_data_watcher.py b/baseplate/sidecars/live_data_watcher.py index a12a6fa31..0e6786e98 100644 --- a/baseplate/sidecars/live_data_watcher.py +++ b/baseplate/sidecars/live_data_watcher.py @@ -4,9 +4,9 @@ import json import logging import os +import random import sys import time -import random from enum import Enum from pathlib import Path From f139232dd5918dfeb1c34bdd07d6ad465dbc9d9d Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 03:31:26 -0500 Subject: [PATCH 03/30] add test for live data watcher --- .../unit/sidecars/live_data_watcher_tests.py | 39 +++++++++++++++---- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/tests/unit/sidecars/live_data_watcher_tests.py b/tests/unit/sidecars/live_data_watcher_tests.py index 0a7916a32..cfc3657a4 100644 --- a/tests/unit/sidecars/live_data_watcher_tests.py +++ b/tests/unit/sidecars/live_data_watcher_tests.py @@ -13,6 +13,8 @@ from baseplate.sidecars.live_data_watcher import NodeWatcher +NUM_FILE_SHARDS = 5 + class NodeWatcherTests(unittest.TestCase): mock_s3 = mock_s3() @@ -26,13 +28,21 @@ def setUp(self): region_name="us-east-1", ) s3_client.create_bucket(Bucket=bucket_name) - s3_client.put_object( - Bucket=bucket_name, - Key="test_file_key", - Body=json.dumps(s3_data).encode(), - SSECustomerKey="test_decryption_key", - SSECustomerAlgorithm="AES256", - ) + default_file_key = "test_file_key" + for file_shard_num in range(NUM_FILE_SHARDS): + if file_shard_num == 0: + # The first copy should just be the original file. + sharded_file_key = default_file_key + else: + # All other copies should include the sharded prefix. + sharded_file_key = str(file_shard_num) + "/" + default_file_key + s3_client.put_object( + Bucket=bucket_name, + Key=sharded_file_key, + Body=json.dumps(s3_data).encode(), + SSECustomerKey="test_decryption_key", + SSECustomerAlgorithm="AES256", + ) def tearDown(self): self.mock_s3.stop() @@ -53,6 +63,21 @@ def test_s3_load_type_on_change(self): self.assertEqual(dest.owner(), pwd.getpwuid(os.getuid()).pw_name) self.assertEqual(dest.group(), grp.getgrgid(os.getgid()).gr_name) + def test_s3_load_type_sharded_on_change(self): + dest = self.output_dir.joinpath("data.txt") + inst = NodeWatcher(str(dest), os.getuid(), os.getgid(), 777) + + new_content = b'{"live_data_watcher_load_type":"S3","bucket_name":"test_bucket","file_key":"test_file_key","sse_key":"test_decryption_key","region_name":"us-east-1", "num_file_shards": 5}' + expected_content = b'{"foo_encrypted": "bar_encrypted"}' + + # For safe measure, run this 20 times. It should succeed every time. + # We've uploaded 5 files to S3 in setUp() and num_file_shards=5 in the ZK node so we should be fetching one of these 5 files randomly (and successfully) - and all should have the same content. + for i in range(20): + inst.on_change(new_content, None) + self.assertEqual(expected_content, dest.read_bytes()) + self.assertEqual(dest.owner(), pwd.getpwuid(os.getuid()).pw_name) + self.assertEqual(dest.group(), grp.getgrgid(os.getgid()).gr_name) + def test_on_change(self): dest = self.output_dir.joinpath("data.txt") inst = NodeWatcher(str(dest), os.getuid(), os.getgid(), 777) From 624d3593d30fc3cd3b460dd59c3dd76feda15ab3 Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 03:43:22 -0500 Subject: [PATCH 04/30] intentionally fail test to verify --- baseplate/sidecars/live_data_watcher.py | 1 + tests/unit/sidecars/live_data_watcher_tests.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/baseplate/sidecars/live_data_watcher.py b/baseplate/sidecars/live_data_watcher.py index 0e6786e98..376c78694 100644 --- a/baseplate/sidecars/live_data_watcher.py +++ b/baseplate/sidecars/live_data_watcher.py @@ -151,6 +151,7 @@ def _load_from_s3(data: bytes) -> bytes: "SSECustomerKey": loader_config["sse_key"], "SSECustomerAlgorithm": "AES256", } + logger.error(file_key) except KeyError as e: # We require all of these keys to properly read from S3. logger.exception( diff --git a/tests/unit/sidecars/live_data_watcher_tests.py b/tests/unit/sidecars/live_data_watcher_tests.py index cfc3657a4..d8e8729b5 100644 --- a/tests/unit/sidecars/live_data_watcher_tests.py +++ b/tests/unit/sidecars/live_data_watcher_tests.py @@ -68,7 +68,7 @@ def test_s3_load_type_sharded_on_change(self): inst = NodeWatcher(str(dest), os.getuid(), os.getgid(), 777) new_content = b'{"live_data_watcher_load_type":"S3","bucket_name":"test_bucket","file_key":"test_file_key","sse_key":"test_decryption_key","region_name":"us-east-1", "num_file_shards": 5}' - expected_content = b'{"foo_encrypted": "bar_encrypted"}' + expected_content = b'{"foo_encrypted": "bar_encrypteds"}' # For safe measure, run this 20 times. It should succeed every time. # We've uploaded 5 files to S3 in setUp() and num_file_shards=5 in the ZK node so we should be fetching one of these 5 files randomly (and successfully) - and all should have the same content. From 046df3803d51c576b2cd4a1a88d011dacc9dc499 Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 03:55:58 -0500 Subject: [PATCH 05/30] intentionally fail test to verify --- baseplate/sidecars/live_data_watcher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/baseplate/sidecars/live_data_watcher.py b/baseplate/sidecars/live_data_watcher.py index 376c78694..7d7c2a853 100644 --- a/baseplate/sidecars/live_data_watcher.py +++ b/baseplate/sidecars/live_data_watcher.py @@ -127,12 +127,12 @@ def _load_from_s3(data: bytes) -> bytes: # We can't assume that every caller of this method will be using prefix sharding on # their S3 objects. num_file_shards = loader_config.get("num_file_shards", 1) - + logger.error(num_file_shards) # If the num_file_shards key is present, we may have multiple copies of the same manifest # uploaded so fetch one randomly using a randomly generated prefix. # Generate a random number from 0 to num_file_shards exclusive to use as prefix. file_key_prefix = random.randrange(num_file_shards) - + logger.error(file_key_prefix) # If 0 is generated, don’t append a prefix, fetch the file with no prefix # since we always upload one file without a prefix. if file_key_prefix == 0: From 52516a29bab43b28d75aa12e828616a54356d978 Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 04:54:03 -0500 Subject: [PATCH 06/30] remove logs --- baseplate/sidecars/live_data_watcher.py | 5 ++--- tests/unit/sidecars/live_data_watcher_tests.py | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/baseplate/sidecars/live_data_watcher.py b/baseplate/sidecars/live_data_watcher.py index 7d7c2a853..0e6786e98 100644 --- a/baseplate/sidecars/live_data_watcher.py +++ b/baseplate/sidecars/live_data_watcher.py @@ -127,12 +127,12 @@ def _load_from_s3(data: bytes) -> bytes: # We can't assume that every caller of this method will be using prefix sharding on # their S3 objects. num_file_shards = loader_config.get("num_file_shards", 1) - logger.error(num_file_shards) + # If the num_file_shards key is present, we may have multiple copies of the same manifest # uploaded so fetch one randomly using a randomly generated prefix. # Generate a random number from 0 to num_file_shards exclusive to use as prefix. file_key_prefix = random.randrange(num_file_shards) - logger.error(file_key_prefix) + # If 0 is generated, don’t append a prefix, fetch the file with no prefix # since we always upload one file without a prefix. if file_key_prefix == 0: @@ -151,7 +151,6 @@ def _load_from_s3(data: bytes) -> bytes: "SSECustomerKey": loader_config["sse_key"], "SSECustomerAlgorithm": "AES256", } - logger.error(file_key) except KeyError as e: # We require all of these keys to properly read from S3. logger.exception( diff --git a/tests/unit/sidecars/live_data_watcher_tests.py b/tests/unit/sidecars/live_data_watcher_tests.py index d8e8729b5..cfc3657a4 100644 --- a/tests/unit/sidecars/live_data_watcher_tests.py +++ b/tests/unit/sidecars/live_data_watcher_tests.py @@ -68,7 +68,7 @@ def test_s3_load_type_sharded_on_change(self): inst = NodeWatcher(str(dest), os.getuid(), os.getgid(), 777) new_content = b'{"live_data_watcher_load_type":"S3","bucket_name":"test_bucket","file_key":"test_file_key","sse_key":"test_decryption_key","region_name":"us-east-1", "num_file_shards": 5}' - expected_content = b'{"foo_encrypted": "bar_encrypteds"}' + expected_content = b'{"foo_encrypted": "bar_encrypted"}' # For safe measure, run this 20 times. It should succeed every time. # We've uploaded 5 files to S3 in setUp() and num_file_shards=5 in the ZK node so we should be fetching one of these 5 files randomly (and successfully) - and all should have the same content. From ad035a7094f942b4b943a6746cad9f6f894e80fa Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 04:57:36 -0500 Subject: [PATCH 07/30] verify this fails --- tests/unit/sidecars/live_data_watcher_tests.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/unit/sidecars/live_data_watcher_tests.py b/tests/unit/sidecars/live_data_watcher_tests.py index cfc3657a4..0d194673a 100644 --- a/tests/unit/sidecars/live_data_watcher_tests.py +++ b/tests/unit/sidecars/live_data_watcher_tests.py @@ -35,7 +35,7 @@ def setUp(self): sharded_file_key = default_file_key else: # All other copies should include the sharded prefix. - sharded_file_key = str(file_shard_num) + "/" + default_file_key + sharded_file_key = default_file_key s3_client.put_object( Bucket=bucket_name, Key=sharded_file_key, @@ -71,7 +71,9 @@ def test_s3_load_type_sharded_on_change(self): expected_content = b'{"foo_encrypted": "bar_encrypted"}' # For safe measure, run this 20 times. It should succeed every time. - # We've uploaded 5 files to S3 in setUp() and num_file_shards=5 in the ZK node so we should be fetching one of these 5 files randomly (and successfully) - and all should have the same content. + # We've uploaded 5 files to S3 in setUp() and num_file_shards=5 in the + # ZK node so we should be fetching one of these 5 files randomly (and successfully) + # and all should have the same content. for i in range(20): inst.on_change(new_content, None) self.assertEqual(expected_content, dest.read_bytes()) From 0297f504badebee153e37d7d328ea14cfd6a309f Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 05:01:48 -0500 Subject: [PATCH 08/30] and it does --- tests/unit/sidecars/live_data_watcher_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/sidecars/live_data_watcher_tests.py b/tests/unit/sidecars/live_data_watcher_tests.py index 0d194673a..06b3392cd 100644 --- a/tests/unit/sidecars/live_data_watcher_tests.py +++ b/tests/unit/sidecars/live_data_watcher_tests.py @@ -35,7 +35,7 @@ def setUp(self): sharded_file_key = default_file_key else: # All other copies should include the sharded prefix. - sharded_file_key = default_file_key + sharded_file_key = str(file_shard_num) + "/" + default_file_key s3_client.put_object( Bucket=bucket_name, Key=sharded_file_key, From b844da510b4f739a80f161c5e8e5ff75d61a0c96 Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 05:46:19 -0500 Subject: [PATCH 09/30] change range --- baseplate/sidecars/live_data_watcher.py | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/baseplate/sidecars/live_data_watcher.py b/baseplate/sidecars/live_data_watcher.py index 0e6786e98..a2319dbe2 100644 --- a/baseplate/sidecars/live_data_watcher.py +++ b/baseplate/sidecars/live_data_watcher.py @@ -123,24 +123,17 @@ def _load_from_s3(data: bytes) -> bytes: # and json is an easier format for znode authors to work with. loader_config = json.loads(data.decode("UTF-8")) try: - # Default to 1 since we generate random numbers from 0 to num_file_shards exclusive. - # We can't assume that every caller of this method will be using prefix sharding on - # their S3 objects. - num_file_shards = loader_config.get("num_file_shards", 1) - - # If the num_file_shards key is present, we may have multiple copies of the same manifest - # uploaded so fetch one randomly using a randomly generated prefix. - # Generate a random number from 0 to num_file_shards exclusive to use as prefix. - file_key_prefix = random.randrange(num_file_shards) + num_file_shards = loader_config.get("num_file_shards") - # If 0 is generated, don’t append a prefix, fetch the file with no prefix - # since we always upload one file without a prefix. - if file_key_prefix == 0: + # We can't assume that every caller of this method will be using prefix sharding on + # their S3 objects. If it's not present, set the prefix to empty string "" + if not num_file_shards: sharded_file_key_prefix = "" else: - # If any other number is generated, fetch one of the copies of the - # file which has an included prefix. - sharded_file_key_prefix = str(file_key_prefix) + "/" + # If the num_file_shards key is present, we may have multiple copies of the same manifest + # uploaded so fetch one randomly using a randomly generated prefix. + # Generate a random number from 1 to num_file_shards exclusive to use as prefix. + sharded_file_key_prefix = str(random.randrange(1, num_file_shards)) + "/" # Append prefix (if it exists) to our original file key. file_key = sharded_file_key_prefix + loader_config["file_key"] From f3058f382a2ddc1639cfdd264e54ed8eb1c498db Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 05:46:53 -0500 Subject: [PATCH 10/30] change range --- tests/unit/sidecars/live_data_watcher_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/sidecars/live_data_watcher_tests.py b/tests/unit/sidecars/live_data_watcher_tests.py index 06b3392cd..88f0e1f9c 100644 --- a/tests/unit/sidecars/live_data_watcher_tests.py +++ b/tests/unit/sidecars/live_data_watcher_tests.py @@ -13,7 +13,7 @@ from baseplate.sidecars.live_data_watcher import NodeWatcher -NUM_FILE_SHARDS = 5 +NUM_FILE_SHARDS = 6 class NodeWatcherTests(unittest.TestCase): From 66991977d591b4f4de3ebf1bb605322699364a6f Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 06:00:35 -0500 Subject: [PATCH 11/30] change range --- baseplate/sidecars/live_data_watcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/baseplate/sidecars/live_data_watcher.py b/baseplate/sidecars/live_data_watcher.py index a2319dbe2..5738cb31d 100644 --- a/baseplate/sidecars/live_data_watcher.py +++ b/baseplate/sidecars/live_data_watcher.py @@ -127,7 +127,7 @@ def _load_from_s3(data: bytes) -> bytes: # We can't assume that every caller of this method will be using prefix sharding on # their S3 objects. If it's not present, set the prefix to empty string "" - if not num_file_shards: + if not num_file_shards or num_file_shards == 1: sharded_file_key_prefix = "" else: # If the num_file_shards key is present, we may have multiple copies of the same manifest From aa0eff1db6fd9fed56eb93a220ade305ad1802c9 Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 10:25:48 -0500 Subject: [PATCH 12/30] comments --- baseplate/sidecars/live_data_watcher.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/baseplate/sidecars/live_data_watcher.py b/baseplate/sidecars/live_data_watcher.py index 5738cb31d..73ee22f43 100644 --- a/baseplate/sidecars/live_data_watcher.py +++ b/baseplate/sidecars/live_data_watcher.py @@ -126,7 +126,8 @@ def _load_from_s3(data: bytes) -> bytes: num_file_shards = loader_config.get("num_file_shards") # We can't assume that every caller of this method will be using prefix sharding on - # their S3 objects. If it's not present, set the prefix to empty string "" + # their S3 objects (but at least one service definitely does - experiments). + # If it's not present or the value is 1, set the prefix to empty string "" if not num_file_shards or num_file_shards == 1: sharded_file_key_prefix = "" else: From d16b2105ff9f20688fb58ebe3a807a07f592e13f Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 10:29:23 -0500 Subject: [PATCH 13/30] comments --- baseplate/sidecars/live_data_watcher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/baseplate/sidecars/live_data_watcher.py b/baseplate/sidecars/live_data_watcher.py index 73ee22f43..6f7fd11b9 100644 --- a/baseplate/sidecars/live_data_watcher.py +++ b/baseplate/sidecars/live_data_watcher.py @@ -125,8 +125,8 @@ def _load_from_s3(data: bytes) -> bytes: try: num_file_shards = loader_config.get("num_file_shards") - # We can't assume that every caller of this method will be using prefix sharding on - # their S3 objects (but at least one service definitely does - experiments). + # We can't assume that every ZK Node that is being NodeWatched by the live-data-fetcher + # will make use of S3 prefix sharding - but, we know at least one does (/experiments). # If it's not present or the value is 1, set the prefix to empty string "" if not num_file_shards or num_file_shards == 1: sharded_file_key_prefix = "" From cd0e4cf3da0c3924854b538ae94f58f6c67a6f41 Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 10:43:32 -0500 Subject: [PATCH 14/30] use function --- baseplate/sidecars/live_data_watcher.py | 29 ++++++++++++++----------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/baseplate/sidecars/live_data_watcher.py b/baseplate/sidecars/live_data_watcher.py index 6f7fd11b9..9f6ca2a47 100644 --- a/baseplate/sidecars/live_data_watcher.py +++ b/baseplate/sidecars/live_data_watcher.py @@ -116,6 +116,20 @@ def _parse_loader_type(data: bytes) -> LoaderType: return LoaderType(loader_type) +def _generate_sharded_file_key_prefix(num_file_shards: Optional[int]) -> str: + # We can't assume that every ZK Node that is being NodeWatched by the live-data-fetcher + # will make use of S3 prefix sharding - but, we know at least one does (/experiments). + # If it's not present or the value is 1, set the prefix to empty string "" + if not num_file_shards or num_file_shards == 1: + sharded_file_key_prefix = "" + else: + # If the num_file_shards key is present, we may have multiple copies of the same manifest + # uploaded so fetch one randomly using a randomly generated prefix. + # Generate a random number from 1 to num_file_shards exclusive to use as prefix. + sharded_file_key_prefix = str(random.randrange(1, num_file_shards)) + "/" + return sharded_file_key_prefix + + def _load_from_s3(data: bytes) -> bytes: # While many of the baseplate configurations use an ini format, # we've opted for json in these internal-to-znode-configs because @@ -124,20 +138,9 @@ def _load_from_s3(data: bytes) -> bytes: loader_config = json.loads(data.decode("UTF-8")) try: num_file_shards = loader_config.get("num_file_shards") - - # We can't assume that every ZK Node that is being NodeWatched by the live-data-fetcher - # will make use of S3 prefix sharding - but, we know at least one does (/experiments). - # If it's not present or the value is 1, set the prefix to empty string "" - if not num_file_shards or num_file_shards == 1: - sharded_file_key_prefix = "" - else: - # If the num_file_shards key is present, we may have multiple copies of the same manifest - # uploaded so fetch one randomly using a randomly generated prefix. - # Generate a random number from 1 to num_file_shards exclusive to use as prefix. - sharded_file_key_prefix = str(random.randrange(1, num_file_shards)) + "/" - # Append prefix (if it exists) to our original file key. - file_key = sharded_file_key_prefix + loader_config["file_key"] + file_key = _generate_sharded_file_key_prefix(num_file_shards) + loader_config["file_key"] + region_name = loader_config["region_name"] s3_kwargs = { "Bucket": loader_config["bucket_name"], From 7ce12966573645fbe3b70a1c91e510fe9180b45e Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 10:49:38 -0500 Subject: [PATCH 15/30] move more things into helper --- baseplate/sidecars/live_data_watcher.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/baseplate/sidecars/live_data_watcher.py b/baseplate/sidecars/live_data_watcher.py index 9f6ca2a47..2b2163c7f 100644 --- a/baseplate/sidecars/live_data_watcher.py +++ b/baseplate/sidecars/live_data_watcher.py @@ -116,7 +116,7 @@ def _parse_loader_type(data: bytes) -> LoaderType: return LoaderType(loader_type) -def _generate_sharded_file_key_prefix(num_file_shards: Optional[int]) -> str: +def _generate_sharded_file_key(num_file_shards: Optional[int], file_key: str) -> str: # We can't assume that every ZK Node that is being NodeWatched by the live-data-fetcher # will make use of S3 prefix sharding - but, we know at least one does (/experiments). # If it's not present or the value is 1, set the prefix to empty string "" @@ -127,7 +127,8 @@ def _generate_sharded_file_key_prefix(num_file_shards: Optional[int]) -> str: # uploaded so fetch one randomly using a randomly generated prefix. # Generate a random number from 1 to num_file_shards exclusive to use as prefix. sharded_file_key_prefix = str(random.randrange(1, num_file_shards)) + "/" - return sharded_file_key_prefix + # Append prefix (if it exists) to our original file key. + return sharded_file_key_prefix + file_key def _load_from_s3(data: bytes) -> bytes: @@ -138,13 +139,16 @@ def _load_from_s3(data: bytes) -> bytes: loader_config = json.loads(data.decode("UTF-8")) try: num_file_shards = loader_config.get("num_file_shards") - # Append prefix (if it exists) to our original file key. - file_key = _generate_sharded_file_key_prefix(num_file_shards) + loader_config["file_key"] + + # We expect this key to always be present, otherwise it's an exception. + file_key = loader_config["file_key"] + + sharded_file_key = _generate_sharded_file_key(num_file_shards, file_key) region_name = loader_config["region_name"] s3_kwargs = { "Bucket": loader_config["bucket_name"], - "Key": file_key, + "Key": sharded_file_key, "SSECustomerKey": loader_config["sse_key"], "SSECustomerAlgorithm": "AES256", } From 907acae0beca1f889306e2a0af98052eab45bbde Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 10:59:35 -0500 Subject: [PATCH 16/30] fix unit test --- .../unit/sidecars/live_data_watcher_tests.py | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/unit/sidecars/live_data_watcher_tests.py b/tests/unit/sidecars/live_data_watcher_tests.py index 88f0e1f9c..40d73447b 100644 --- a/tests/unit/sidecars/live_data_watcher_tests.py +++ b/tests/unit/sidecars/live_data_watcher_tests.py @@ -12,6 +12,7 @@ from moto import mock_s3 from baseplate.sidecars.live_data_watcher import NodeWatcher +from baseplate.sidecars.live_data_watcher import _generate_sharded_file_key NUM_FILE_SHARDS = 6 @@ -63,6 +64,30 @@ def test_s3_load_type_on_change(self): self.assertEqual(dest.owner(), pwd.getpwuid(os.getuid()).pw_name) self.assertEqual(dest.group(), grp.getgrgid(os.getgid()).gr_name) + def test_generate_sharded_file_key_with_no_shards_key(self): + original_file_key = "test_file_key" + actual_sharded_file_key = _generate_sharded_file_key(None, original_file_key) + expected_sharded_file_key = "test_file_key" + self.assertEqual(actual_sharded_file_key, expected_sharded_file_key) + + def test_generate_sharded_file_key(self): + original_file_key = "test_file_key" + possible_sharded_file_keys = set( + [ + "1_test_file_key", + "2_test_file_key", + "3_test_file_key", + "4_test_file_key", + "5_test_file_key", + ] + ) + for i in range(50): + actual_sharded_file_key = _generate_sharded_file_key(NUM_FILE_SHARDS, original_file_key) + # If num_file_shards is provided, the generated file key MUST have a prefix. + self.assertTrue(actual_sharded_file_key in possible_sharded_file_keys) + # Make sure we aren't generating a file without the prefix. + self.assertFalse(actual_sharded_file_key == original_file_key) + def test_s3_load_type_sharded_on_change(self): dest = self.output_dir.joinpath("data.txt") inst = NodeWatcher(str(dest), os.getuid(), os.getgid(), 777) From f70e98e78b3ff12cdc120d6839879361206f0447 Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 11:00:23 -0500 Subject: [PATCH 17/30] fix unit test --- tests/unit/sidecars/live_data_watcher_tests.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/sidecars/live_data_watcher_tests.py b/tests/unit/sidecars/live_data_watcher_tests.py index 40d73447b..d29b8b3ed 100644 --- a/tests/unit/sidecars/live_data_watcher_tests.py +++ b/tests/unit/sidecars/live_data_watcher_tests.py @@ -95,11 +95,11 @@ def test_s3_load_type_sharded_on_change(self): new_content = b'{"live_data_watcher_load_type":"S3","bucket_name":"test_bucket","file_key":"test_file_key","sse_key":"test_decryption_key","region_name":"us-east-1", "num_file_shards": 5}' expected_content = b'{"foo_encrypted": "bar_encrypted"}' - # For safe measure, run this 20 times. It should succeed every time. + # For safe measure, run this 50 times. It should succeed every time. # We've uploaded 5 files to S3 in setUp() and num_file_shards=5 in the # ZK node so we should be fetching one of these 5 files randomly (and successfully) # and all should have the same content. - for i in range(20): + for i in range(50): inst.on_change(new_content, None) self.assertEqual(expected_content, dest.read_bytes()) self.assertEqual(dest.owner(), pwd.getpwuid(os.getuid()).pw_name) From 2d9b459c7589a8eee2d88ab8b1af381b1f24d758 Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 11:01:51 -0500 Subject: [PATCH 18/30] rename unit test --- tests/unit/sidecars/live_data_watcher_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/sidecars/live_data_watcher_tests.py b/tests/unit/sidecars/live_data_watcher_tests.py index d29b8b3ed..f1a92c60c 100644 --- a/tests/unit/sidecars/live_data_watcher_tests.py +++ b/tests/unit/sidecars/live_data_watcher_tests.py @@ -64,7 +64,7 @@ def test_s3_load_type_on_change(self): self.assertEqual(dest.owner(), pwd.getpwuid(os.getuid()).pw_name) self.assertEqual(dest.group(), grp.getgrgid(os.getgid()).gr_name) - def test_generate_sharded_file_key_with_no_shards_key(self): + def test_generate_sharded_file_key_with_no_sharding(self): original_file_key = "test_file_key" actual_sharded_file_key = _generate_sharded_file_key(None, original_file_key) expected_sharded_file_key = "test_file_key" From c9d0bfbd6ef0121fbb1530962fc4dd1d908531e4 Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 11:03:43 -0500 Subject: [PATCH 19/30] rename unit test --- .../unit/sidecars/live_data_watcher_tests.py | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/unit/sidecars/live_data_watcher_tests.py b/tests/unit/sidecars/live_data_watcher_tests.py index f1a92c60c..3b0e8817d 100644 --- a/tests/unit/sidecars/live_data_watcher_tests.py +++ b/tests/unit/sidecars/live_data_watcher_tests.py @@ -53,24 +53,13 @@ def run(self, result: unittest.TestResult = None) -> unittest.TestResult: self.output_dir = Path(loc) return super().run(result) - def test_s3_load_type_on_change(self): - dest = self.output_dir.joinpath("data.txt") - inst = NodeWatcher(str(dest), os.getuid(), os.getgid(), 777) - - new_content = b'{"live_data_watcher_load_type":"S3","bucket_name":"test_bucket","file_key":"test_file_key","sse_key":"test_decryption_key","region_name":"us-east-1"}' - expected_content = b'{"foo_encrypted": "bar_encrypted"}' - inst.on_change(new_content, None) - self.assertEqual(expected_content, dest.read_bytes()) - self.assertEqual(dest.owner(), pwd.getpwuid(os.getuid()).pw_name) - self.assertEqual(dest.group(), grp.getgrgid(os.getgid()).gr_name) - - def test_generate_sharded_file_key_with_no_sharding(self): + def test_generate_sharded_file_key_no_sharding(self): original_file_key = "test_file_key" actual_sharded_file_key = _generate_sharded_file_key(None, original_file_key) expected_sharded_file_key = "test_file_key" self.assertEqual(actual_sharded_file_key, expected_sharded_file_key) - def test_generate_sharded_file_key(self): + def test_generate_sharded_file_key_sharding(self): original_file_key = "test_file_key" possible_sharded_file_keys = set( [ @@ -88,7 +77,18 @@ def test_generate_sharded_file_key(self): # Make sure we aren't generating a file without the prefix. self.assertFalse(actual_sharded_file_key == original_file_key) - def test_s3_load_type_sharded_on_change(self): + def test_s3_load_type_on_change_no_sharding(self): + dest = self.output_dir.joinpath("data.txt") + inst = NodeWatcher(str(dest), os.getuid(), os.getgid(), 777) + + new_content = b'{"live_data_watcher_load_type":"S3","bucket_name":"test_bucket","file_key":"test_file_key","sse_key":"test_decryption_key","region_name":"us-east-1"}' + expected_content = b'{"foo_encrypted": "bar_encrypted"}' + inst.on_change(new_content, None) + self.assertEqual(expected_content, dest.read_bytes()) + self.assertEqual(dest.owner(), pwd.getpwuid(os.getuid()).pw_name) + self.assertEqual(dest.group(), grp.getgrgid(os.getgid()).gr_name) + + def test_s3_load_type_on_change_sharding(self): dest = self.output_dir.joinpath("data.txt") inst = NodeWatcher(str(dest), os.getuid(), os.getgid(), 777) From 203a8f6ffb5ed58ea7751c6d8da0508de5351725 Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 11:09:06 -0500 Subject: [PATCH 20/30] import order --- tests/unit/sidecars/live_data_watcher_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/sidecars/live_data_watcher_tests.py b/tests/unit/sidecars/live_data_watcher_tests.py index 3b0e8817d..3841b453b 100644 --- a/tests/unit/sidecars/live_data_watcher_tests.py +++ b/tests/unit/sidecars/live_data_watcher_tests.py @@ -11,8 +11,8 @@ from moto import mock_s3 -from baseplate.sidecars.live_data_watcher import NodeWatcher from baseplate.sidecars.live_data_watcher import _generate_sharded_file_key +from baseplate.sidecars.live_data_watcher import NodeWatcher NUM_FILE_SHARDS = 6 From f386fb5ec90dfaef1869e86ee5c895747feab685 Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 11:15:02 -0500 Subject: [PATCH 21/30] add logging since a test failed --- tests/unit/sidecars/live_data_watcher_tests.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/unit/sidecars/live_data_watcher_tests.py b/tests/unit/sidecars/live_data_watcher_tests.py index 3841b453b..fbba1c107 100644 --- a/tests/unit/sidecars/live_data_watcher_tests.py +++ b/tests/unit/sidecars/live_data_watcher_tests.py @@ -16,6 +16,10 @@ NUM_FILE_SHARDS = 6 +import logging + +logger = logging.getLogger(__name__) + class NodeWatcherTests(unittest.TestCase): mock_s3 = mock_s3() @@ -74,6 +78,7 @@ def test_generate_sharded_file_key_sharding(self): actual_sharded_file_key = _generate_sharded_file_key(NUM_FILE_SHARDS, original_file_key) # If num_file_shards is provided, the generated file key MUST have a prefix. self.assertTrue(actual_sharded_file_key in possible_sharded_file_keys) + logger.error(actual_sharded_file_key) # Make sure we aren't generating a file without the prefix. self.assertFalse(actual_sharded_file_key == original_file_key) From fc9162e591042a7a64dd7ccfbe576bd715459f08 Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 11:16:51 -0500 Subject: [PATCH 22/30] import order --- tests/unit/sidecars/live_data_watcher_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/sidecars/live_data_watcher_tests.py b/tests/unit/sidecars/live_data_watcher_tests.py index fbba1c107..1d9a95701 100644 --- a/tests/unit/sidecars/live_data_watcher_tests.py +++ b/tests/unit/sidecars/live_data_watcher_tests.py @@ -1,5 +1,6 @@ import grp import json +import logging import os import pwd import tempfile @@ -16,7 +17,6 @@ NUM_FILE_SHARDS = 6 -import logging logger = logging.getLogger(__name__) From 8ae0658574bc0a3ac0df4d3191a6ac119d1b6c9b Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 11:22:01 -0500 Subject: [PATCH 23/30] move logger up --- tests/unit/sidecars/live_data_watcher_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/sidecars/live_data_watcher_tests.py b/tests/unit/sidecars/live_data_watcher_tests.py index 1d9a95701..541627697 100644 --- a/tests/unit/sidecars/live_data_watcher_tests.py +++ b/tests/unit/sidecars/live_data_watcher_tests.py @@ -76,9 +76,9 @@ def test_generate_sharded_file_key_sharding(self): ) for i in range(50): actual_sharded_file_key = _generate_sharded_file_key(NUM_FILE_SHARDS, original_file_key) + logger.error(actual_sharded_file_key) # If num_file_shards is provided, the generated file key MUST have a prefix. self.assertTrue(actual_sharded_file_key in possible_sharded_file_keys) - logger.error(actual_sharded_file_key) # Make sure we aren't generating a file without the prefix. self.assertFalse(actual_sharded_file_key == original_file_key) From 9f97142be2e707f2605016ecaab91c4ddbc38c8a Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 11:23:29 -0500 Subject: [PATCH 24/30] oops, the delimiter should have been / not _ --- tests/unit/sidecars/live_data_watcher_tests.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/tests/unit/sidecars/live_data_watcher_tests.py b/tests/unit/sidecars/live_data_watcher_tests.py index 541627697..9f7b840bd 100644 --- a/tests/unit/sidecars/live_data_watcher_tests.py +++ b/tests/unit/sidecars/live_data_watcher_tests.py @@ -67,16 +67,15 @@ def test_generate_sharded_file_key_sharding(self): original_file_key = "test_file_key" possible_sharded_file_keys = set( [ - "1_test_file_key", - "2_test_file_key", - "3_test_file_key", - "4_test_file_key", - "5_test_file_key", + "1/test_file_key", + "2/test_file_key", + "3/test_file_key", + "4/test_file_key", + "5/test_file_key", ] ) for i in range(50): actual_sharded_file_key = _generate_sharded_file_key(NUM_FILE_SHARDS, original_file_key) - logger.error(actual_sharded_file_key) # If num_file_shards is provided, the generated file key MUST have a prefix. self.assertTrue(actual_sharded_file_key in possible_sharded_file_keys) # Make sure we aren't generating a file without the prefix. From 3db971dd498a30ede6e58ff3d1704eb3ed60c381 Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 11:34:46 -0500 Subject: [PATCH 25/30] add 0 and 1 to no sharding unit test --- tests/unit/sidecars/live_data_watcher_tests.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/unit/sidecars/live_data_watcher_tests.py b/tests/unit/sidecars/live_data_watcher_tests.py index 9f7b840bd..b59e0219d 100644 --- a/tests/unit/sidecars/live_data_watcher_tests.py +++ b/tests/unit/sidecars/live_data_watcher_tests.py @@ -59,9 +59,11 @@ def run(self, result: unittest.TestResult = None) -> unittest.TestResult: def test_generate_sharded_file_key_no_sharding(self): original_file_key = "test_file_key" - actual_sharded_file_key = _generate_sharded_file_key(None, original_file_key) expected_sharded_file_key = "test_file_key" - self.assertEqual(actual_sharded_file_key, expected_sharded_file_key) + possible_no_sharding_values = [0, 1, None] + for values in possible_no_sharding_values: + actual_sharded_file_key = _generate_sharded_file_key(values, original_file_key) + self.assertEqual(actual_sharded_file_key, expected_sharded_file_key) def test_generate_sharded_file_key_sharding(self): original_file_key = "test_file_key" From a976e5aff776ece20d65e66d89b7a65dddce2b7f Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 14:28:04 -0500 Subject: [PATCH 26/30] clean up python --- baseplate/sidecars/live_data_watcher.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/baseplate/sidecars/live_data_watcher.py b/baseplate/sidecars/live_data_watcher.py index 2b2163c7f..31c9ebf85 100644 --- a/baseplate/sidecars/live_data_watcher.py +++ b/baseplate/sidecars/live_data_watcher.py @@ -119,10 +119,10 @@ def _parse_loader_type(data: bytes) -> LoaderType: def _generate_sharded_file_key(num_file_shards: Optional[int], file_key: str) -> str: # We can't assume that every ZK Node that is being NodeWatched by the live-data-fetcher # will make use of S3 prefix sharding - but, we know at least one does (/experiments). - # If it's not present or the value is 1, set the prefix to empty string "" - if not num_file_shards or num_file_shards == 1: - sharded_file_key_prefix = "" - else: + # If it's not present or the value is 0 or 1, set the prefix to empty string "" + sharded_file_key_prefix = "" + no_shard_values = [None, 0, 1] + if not num_file_shards in no_shard_values: # If the num_file_shards key is present, we may have multiple copies of the same manifest # uploaded so fetch one randomly using a randomly generated prefix. # Generate a random number from 1 to num_file_shards exclusive to use as prefix. From f94047bcd1f44b8accd73838c0a86423821219ab Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 14:30:34 -0500 Subject: [PATCH 27/30] clean up python --- baseplate/sidecars/live_data_watcher.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/baseplate/sidecars/live_data_watcher.py b/baseplate/sidecars/live_data_watcher.py index 31c9ebf85..3e11583cb 100644 --- a/baseplate/sidecars/live_data_watcher.py +++ b/baseplate/sidecars/live_data_watcher.py @@ -121,8 +121,7 @@ def _generate_sharded_file_key(num_file_shards: Optional[int], file_key: str) -> # will make use of S3 prefix sharding - but, we know at least one does (/experiments). # If it's not present or the value is 0 or 1, set the prefix to empty string "" sharded_file_key_prefix = "" - no_shard_values = [None, 0, 1] - if not num_file_shards in no_shard_values: + if num_file_shards is not None and num_file_shards > 1: # If the num_file_shards key is present, we may have multiple copies of the same manifest # uploaded so fetch one randomly using a randomly generated prefix. # Generate a random number from 1 to num_file_shards exclusive to use as prefix. From 3264b3086ee871165b0df5e9ad6fdfce1fc15772 Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 14:31:33 -0500 Subject: [PATCH 28/30] comments --- baseplate/sidecars/live_data_watcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/baseplate/sidecars/live_data_watcher.py b/baseplate/sidecars/live_data_watcher.py index 3e11583cb..b795e175d 100644 --- a/baseplate/sidecars/live_data_watcher.py +++ b/baseplate/sidecars/live_data_watcher.py @@ -119,7 +119,7 @@ def _parse_loader_type(data: bytes) -> LoaderType: def _generate_sharded_file_key(num_file_shards: Optional[int], file_key: str) -> str: # We can't assume that every ZK Node that is being NodeWatched by the live-data-fetcher # will make use of S3 prefix sharding - but, we know at least one does (/experiments). - # If it's not present or the value is 0 or 1, set the prefix to empty string "" + # If it's not present or the value is less than 2, set the prefix to empty string "" sharded_file_key_prefix = "" if num_file_shards is not None and num_file_shards > 1: # If the num_file_shards key is present, we may have multiple copies of the same manifest From b66192923bb5a167c9f5b3502fcb74b4ed66b899 Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 14:40:18 -0500 Subject: [PATCH 29/30] add negative values to test --- tests/unit/sidecars/live_data_watcher_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/sidecars/live_data_watcher_tests.py b/tests/unit/sidecars/live_data_watcher_tests.py index b59e0219d..98bf9cc9e 100644 --- a/tests/unit/sidecars/live_data_watcher_tests.py +++ b/tests/unit/sidecars/live_data_watcher_tests.py @@ -60,7 +60,7 @@ def run(self, result: unittest.TestResult = None) -> unittest.TestResult: def test_generate_sharded_file_key_no_sharding(self): original_file_key = "test_file_key" expected_sharded_file_key = "test_file_key" - possible_no_sharding_values = [0, 1, None] + possible_no_sharding_values = [-2, -1, 0, 1, None] for values in possible_no_sharding_values: actual_sharded_file_key = _generate_sharded_file_key(values, original_file_key) self.assertEqual(actual_sharded_file_key, expected_sharded_file_key) From 0691085f3c8c0a8b049fdfbf27f02020f637043d Mon Sep 17 00:00:00 2001 From: Siddharth Manoj Date: Thu, 30 Nov 2023 15:59:08 -0500 Subject: [PATCH 30/30] revert retries from 10 -> 5 --- baseplate/sidecars/live_data_watcher.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/baseplate/sidecars/live_data_watcher.py b/baseplate/sidecars/live_data_watcher.py index b795e175d..acbefb444 100644 --- a/baseplate/sidecars/live_data_watcher.py +++ b/baseplate/sidecars/live_data_watcher.py @@ -167,16 +167,15 @@ def _load_from_s3(data: bytes) -> bytes: # a public resource belonging to another cluster/AWS account unless the request credentials # are unsigned. - # Access S3 with 10 max retries enabled: + # Default # of retries in legacy mode (current mode) is 5. s3_client = boto3.client( "s3", - config=Config(signature_version=UNSIGNED, retries={"total_max_attempts": 10}), + config=Config(signature_version=UNSIGNED), region_name=region_name, ) else: s3_client = boto3.client( "s3", - config=Config(retries={"total_max_attempts": 10}), region_name=region_name, )