Merge pull request #69 from Sage-Bionetworks/etl-517
[ETL-517] Write JSON records as NDJSON in blocks
philerooski authored Aug 16, 2023
2 parents 6ed333c + 359c0e0 commit f1d9bbf
Showing 2 changed files with 113 additions and 12 deletions.
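For context, NDJSON (newline-delimited JSON) stores one JSON record per line rather than wrapping all records in a single array, which is what lets the job read, transform, and write records in bounded-size blocks. A minimal illustration of the output format (the field names below are invented for this example, not taken from the job):

{"Type": "Weight", "Value": 71.2, "StartDate": "2022-01-12"}
{"Type": "Weight", "Value": 70.9, "StartDate": "2022-01-13"}
{"Type": "Weight", "Value": 70.4, "StartDate": "2022-01-14"}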
61 changes: 49 additions & 12 deletions src/glue/jobs/s3_to_json.py
@@ -12,6 +12,7 @@
import logging
import os
import sys
import typing
import zipfile
import boto3
from awsglue.utils import getResolvedOptions
@@ -261,6 +262,45 @@ def get_output_filename(metadata: dict) -> str:
    )
    return output_fname

def transform_block(
    input_json: typing.IO,
    dataset_identifier: str,
    metadata: dict,
    block_size: int=10000
):
    """
    A generator function which yields a block of transformed JSON records.

    This function can be used with `write_file_to_json_dataset`. Some JSON files
    are too large to have all of their records kept in memory before being written
    to the resulting transformed NDJSON file. To avoid an OOM error, we do the
    transformations and write the records in blocks.

    Args:
        input_json (typing.IO): A file-like object of the JSON to be transformed.
        dataset_identifier (str): The data type of `input_json`.
        metadata (dict): Metadata derived from the file basename. See `get_metadata`.
        block_size (int, optional): The number of records to process in each block.
            Default is 10000.

    Yields:
        list: A block of transformed JSON records.
    """
    block = []
    for json_line in input_json:
        json_obj = json.loads(json_line)
        json_obj = transform_json(
            json_obj=json_obj,
            dataset_identifier=dataset_identifier,
            metadata=metadata
        )
        block.append(json_obj)
        if len(block) == block_size:
            yield block
            block = []
    if block:  # yield final block
        yield block
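As a usage sketch (not part of this commit): the generator above might be driven as follows to stream records into an NDJSON file. The file names and metadata here are made-up placeholders for illustration; only one block of records is held in memory at a time.

import datetime
import json

# Hypothetical example of consuming transform_block; the paths and metadata
# below are placeholders, not values used by the Glue job.
sample_metadata = {
    "type": "HealthKitV2Samples",
    "subtype": "Weight",
    "start_date": datetime.datetime(2022, 1, 12),
    "end_date": datetime.datetime(2023, 1, 14),
}
with open("HealthKitV2Samples_Weight.json", "r") as input_json, \
        open("HealthKitV2Samples_Weight.ndjson", "w") as f_out:
    for block in transform_block(
        input_json=input_json,
        dataset_identifier=sample_metadata["type"],
        metadata=sample_metadata,
        block_size=10000,
    ):
        # Each iteration exposes at most block_size transformed records.
        for record in block:
            f_out.write("{}\n".format(json.dumps(record)))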

def write_file_to_json_dataset(
    z: zipfile.ZipFile,
    json_path: str,
@@ -296,28 +336,25 @@ def write_file_to_json_dataset(
     else:
         s3_metadata["start_date"] = metadata["start_date"].isoformat()
         s3_metadata["end_date"] = metadata["end_date"].isoformat()
+    output_filename = get_output_filename(metadata=metadata)
+    output_path = os.path.join(dataset_identifier, output_filename)
-    data = []
-    with z.open(json_path, "r") as p:
-        for json_line in p:
-            json_obj = json.loads(json_line)
-            json_obj = transform_json(
-                json_obj=json_obj,
+    with z.open(json_path, "r") as input_json:
+        with open(output_path, "w+") as f_out:
+            for transformed_block in transform_block(
+                input_json=input_json,
                 dataset_identifier=dataset_identifier,
                 metadata=metadata
-            )
-            data.append(json_obj)
-    output_filename = get_output_filename(metadata=metadata)
-    output_path = os.path.join(dataset_identifier, output_filename)
+            ):
+                for transformed_record in transformed_block:
+                    f_out.write("{}\n".format(json.dumps(transformed_record)))
     s3_output_key = os.path.join(
         workflow_run_properties["namespace"],
         workflow_run_properties["json_prefix"],
         f"dataset={dataset_identifier}",
         output_filename
     )
     logger.debug("Output Key: %s", s3_output_key)
-    with open(output_path, "w") as f_out:
-        for record in data:
-            f_out.write("{}\n".format(json.dumps(record)))
     with open(output_path, "rb") as f_in:
         response = s3_client.put_object(
             Body = f_in,
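The remainder of write_file_to_json_dataset is collapsed in this view; it uploads the NDJSON file to S3. A rough sketch of how that upload typically looks with boto3 — the Bucket, Key, and Metadata arguments shown here are assumptions, since they are not visible in this excerpt:

# Hypothetical completion of the truncated put_object call; the Bucket, Key,
# and Metadata arguments are assumptions, not the repository's actual code.
with open(output_path, "rb") as f_in:
    response = s3_client.put_object(
        Body=f_in,                    # stream the NDJSON file from disk
        Bucket=intermediate_bucket,   # assumed destination bucket name
        Key=s3_output_key,            # computed above from workflow properties
        Metadata=s3_metadata,         # assumed: attach the file-level metadata
    )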
64 changes: 64 additions & 0 deletions tests/test_s3_to_json.py
@@ -349,6 +349,70 @@ def test_transform_json_garmin_two_levels_down(self):
        # Check that other properties were not affected
        assert transformed_json["Summaries"][0]["Dummy"] == 1

    def test_transform_block_empty_file(self, s3_obj):
        sample_metadata = {
            "type": "HealthKitV2Samples",
            "start_date": datetime.datetime(2022, 1, 12, 0, 0),
            "end_date": datetime.datetime(2023, 1, 14, 0, 0),
            "subtype": "Weight",
        }
        with zipfile.ZipFile(io.BytesIO(s3_obj["Body"])) as z:
            json_path = "HealthKitV2Samples_Weight_20230112-20230114.json"
            with z.open(json_path, "r") as input_json:
                transformed_block = s3_to_json.transform_block(
                    input_json=input_json,
                    dataset_identifier=sample_metadata["type"],
                    metadata=sample_metadata,
                    block_size=2
                )
                with pytest.raises(StopIteration):
                    next(transformed_block)

    def test_transform_block_non_empty_file_block_size(self, s3_obj):
        sample_metadata = {
            "type": "FitbitSleepLogs",
            "start_date": datetime.datetime(2022, 1, 12, 0, 0),
            "end_date": datetime.datetime(2023, 1, 14, 0, 0),
        }
        with zipfile.ZipFile(io.BytesIO(s3_obj["Body"])) as z:
            json_path = "FitbitSleepLogs_20230112-20230114.json"
            with z.open(json_path, "r") as input_json:
                transformed_block = s3_to_json.transform_block(
                    input_json=input_json,
                    dataset_identifier=sample_metadata["type"],
                    metadata=sample_metadata,
                    block_size=2
                )
                first_block = next(transformed_block)
                assert len(first_block) == 2
                assert (
                    isinstance(first_block[0], dict)
                    and isinstance(first_block[1], dict)
                )

    def test_transform_block_non_empty_file_all_blocks(self, s3_obj):
        sample_metadata = {
            "type": "FitbitSleepLogs",
            "start_date": datetime.datetime(2022, 1, 12, 0, 0),
            "end_date": datetime.datetime(2023, 1, 14, 0, 0),
        }
        with zipfile.ZipFile(io.BytesIO(s3_obj["Body"])) as z:
            json_path = "FitbitSleepLogs_20230112-20230114.json"
            with z.open(json_path, "r") as input_json:
                record_count = len(input_json.readlines())
            with z.open(json_path, "r") as input_json:
                transformed_block = s3_to_json.transform_block(
                    input_json=input_json,
                    dataset_identifier=sample_metadata["type"],
                    metadata=sample_metadata,
                    block_size=10
                )
                counter = 0
                for block in transformed_block:
                    counter += len(block)
                # Should be 12
                assert counter == record_count

    def test_get_output_filename_generic(self):
        sample_metadata = {
            "type": "FitbitDevices",
