From 29b4588d3afa042b24056672c3de0278d9758a2f Mon Sep 17 00:00:00 2001 From: Phil Snyder Date: Wed, 10 Apr 2024 12:45:53 -0700 Subject: [PATCH] Update index fields with `ParticipantIdentifier` And a small update to FitbitEcg table schema that should have been included when we originally added this data type. --- src/glue/jobs/json_to_parquet.py | 80 ++++++++++++++------------- src/glue/resources/table_columns.yaml | 4 ++ 2 files changed, 45 insertions(+), 39 deletions(-) diff --git a/src/glue/jobs/json_to_parquet.py b/src/glue/jobs/json_to_parquet.py index 33549ea3..d61c86fd 100644 --- a/src/glue/jobs/json_to_parquet.py +++ b/src/glue/jobs/json_to_parquet.py @@ -37,42 +37,42 @@ "enrolledparticipants": ["ParticipantIdentifier"], "fitbitprofiles": ["ParticipantIdentifier", "ModifiedDate"], "fitbitdevices": ["ParticipantIdentifier", "Date"], - "fitbitactivitylogs": ["LogId"], + "fitbitactivitylogs": ["ParticipantIdentifier", "LogId"], "fitbitdailydata": ["ParticipantIdentifier", "Date"], - "fitbitecg": ["FitbitEcgKey"], + "fitbitecg": ["ParticipantIdentifier", "FitbitEcgKey"], "fitbitintradaycombined": ["ParticipantIdentifier", "Type", "DateTime"], "fitbitrestingheartrates": ["ParticipantIdentifier", "Date"], - "fitbitsleeplogs": ["LogId"], - "healthkitv2characteristics": ["HealthKitCharacteristicKey"], - "healthkitv2samples": ["HealthKitSampleKey"], - "healthkitv2heartbeat": ["HealthKitHeartbeatSampleKey"], - "healthkitv2statistics": ["HealthKitStatisticKey"], - "healthkitv2clinicalrecords": ["HealthKitClinicalRecordKey"], - "healthkitv2electrocardiogram": ["HealthKitECGSampleKey"], - "healthkitv2workouts": ["HealthKitWorkoutKey"], - "healthkitv2activitysummaries": ["HealthKitActivitySummaryKey"], - "garminactivitydetailssummary": ["ParticipantID", "SummaryId"], - "garminactivitysummary": ["ParticipantID", "SummaryId"], - "garminbloodpressuresummary": ["ParticipantID", "SummaryId"], - "garmindailysummary": ["ParticipantID", "StartTimeInSeconds"], - 
"garminepochsummary": ["ParticipantID", "SummaryId"], - "garminhealthsnapshotsummary": ["ParticipantID", "StartTimeInSeconds"], - "garminhrvsummary": ["ParticipantID", "StartTimeInSeconds"], - "garminmanuallyupdatedactivitysummary": ["ParticipantID", "SummaryId"], - "garminmoveiqactivitysummary": ["ParticipantID", "SummaryId"], - "garminpulseoxsummary": ["ParticipantID", "SummaryId"], - "garminrespirationsummary": ["ParticipantID", "SummaryId"], + "fitbitsleeplogs": ["ParticipantIdentifier", "LogId"], + "healthkitv2characteristics": ["ParticipantIdentifier", "HealthKitCharacteristicKey"], + "healthkitv2samples": ["ParticipantIdentifier", "HealthKitSampleKey"], + "healthkitv2heartbeat": ["ParticipantIdentifier", "HealthKitHeartbeatSampleKey"], + "healthkitv2statistics": ["ParticipantIdentifier", "HealthKitStatisticKey"], + "healthkitv2clinicalrecords": ["ParticipantIdentifier", "HealthKitClinicalRecordKey"], + "healthkitv2electrocardiogram": ["ParticipantIdentifier", "HealthKitECGSampleKey"], + "healthkitv2workouts": ["ParticipantIdentifier", "HealthKitWorkoutKey"], + "healthkitv2activitysummaries": ["ParticipantIdentifier", "HealthKitActivitySummaryKey"], + "garminactivitydetailssummary": ["ParticipantIdentifier", "SummaryId"], + "garminactivitysummary": ["ParticipantIdentifier", "SummaryId"], + "garminbloodpressuresummary": ["ParticipantIdentifier", "SummaryId"], + "garmindailysummary": ["ParticipantIdentifier", "StartTimeInSeconds"], + "garminepochsummary": ["ParticipantIdentifier", "SummaryId"], + "garminhealthsnapshotsummary": ["ParticipantIdentifier", "StartTimeInSeconds"], + "garminhrvsummary": ["ParticipantIdentifier", "StartTimeInSeconds"], + "garminmanuallyupdatedactivitysummary": ["ParticipantIdentifier", "SummaryId"], + "garminmoveiqactivitysummary": ["ParticipantIdentifier", "SummaryId"], + "garminpulseoxsummary": ["ParticipantIdentifier", "SummaryId"], + "garminrespirationsummary": ["ParticipantIdentifier", "SummaryId"], "garminsleepsummary": [ - 
"ParticipantID", + "ParticipantIdentifier", "StartTimeInSeconds", "DurationInSeconds", "Validation", ], - "garminstressdetailsummary": ["ParticipantID", "StartTimeInSeconds"], - "garminthirdpartydailysummary": ["ParticipantID", "StartTimeInSeconds"], - "garminusermetricssummary": ["ParticipantID", "CalenderDate"], - "googlefitsamples": ["GoogleFitSampleKey"], - "symptomlog": ["DataPointKey"], + "garminstressdetailsummary": ["ParticipantIdentifier", "StartTimeInSeconds"], + "garminthirdpartydailysummary": ["ParticipantIdentifier", "StartTimeInSeconds"], + "garminusermetricssummary": ["ParticipantIdentifier", "CalenderDate"], + "googlefitsamples": ["ParticipantIdentifier", "GoogleFitSampleKey"], + "symptomlog": ["ParticipantIdentifier", "DataPointKey"], } @@ -533,11 +533,14 @@ def add_index_to_table( """Add partition and index fields to a DynamicFrame. A DynamicFrame containing the top-level fields already includes the index - fields, but DynamicFrame's which were flattened as a result of the - DynamicFrame.relationalize operation need to inherit the index and partition - fields from their parent. In order for this function to execute successfully, - the table's parent must already have the index fields and be included in - `processed_tables`. + fields -- `ParticipantIdentifier` and a primary key which is particular to + each data type (see global var `INDEX_FIELD_MAP`) -- but DynamicFrames which + were flattened as a result of the DynamicFrame.relationalize operation need + to inherit the index and partition fields from their parent. We also propagate + the `ParticipantID` field where present, although this field is not included + in the SymptomLog data type. In order for this function to execute + successfully, the table's parent must already have the index fields and be + included in `processed_tables`. 
In addition to adding the index fields, this function formats the names of the (non-index) fields which were manipulated by the call to @@ -582,12 +585,11 @@ def add_index_to_table( else: selectable_original_field_name = original_field_name logger.info(f"Adding index to {original_field_name}") - parent_index = parent_table.select( - ( - [selectable_original_field_name, "cohort"] - + INDEX_FIELD_MAP[table_data_type] - ) - ).distinct() + index_fields = INDEX_FIELD_MAP[table_data_type] + additional_fields = [selectable_original_field_name, "cohort"] + if "ParticipantID" in parent_table.columns: + additional_fields.append("ParticipantID") + parent_index = parent_table.select(index_fields + additional_fields).distinct() this_index = parent_index.withColumnRenamed(original_field_name, "id") df_with_index = this_table.join(this_index, on="id", how="inner") # remove prefix from field names diff --git a/src/glue/resources/table_columns.yaml b/src/glue/resources/table_columns.yaml index 4d442763..91e4d5ae 100644 --- a/src/glue/resources/table_columns.yaml +++ b/src/glue/resources/table_columns.yaml @@ -330,6 +330,10 @@ tables: Type: string - Name: FirmwareVersion Type: string + - Name: export_start_date + Type: string + - Name: export_end_date + Type: string partition_keys: - Name: cohort Type: string