[ETL-631] Update index fields with ParticipantIdentifier and propagate participant id fields #110

Merged
merged 1 commit into from
Apr 11, 2024
80 changes: 41 additions & 39 deletions src/glue/jobs/json_to_parquet.py
@@ -37,42 +37,42 @@
     "enrolledparticipants": ["ParticipantIdentifier"],
     "fitbitprofiles": ["ParticipantIdentifier", "ModifiedDate"],
     "fitbitdevices": ["ParticipantIdentifier", "Date"],
-    "fitbitactivitylogs": ["LogId"],
+    "fitbitactivitylogs": ["ParticipantIdentifier", "LogId"],
     "fitbitdailydata": ["ParticipantIdentifier", "Date"],
-    "fitbitecg": ["FitbitEcgKey"],
+    "fitbitecg": ["ParticipantIdentifier", "FitbitEcgKey"],
     "fitbitintradaycombined": ["ParticipantIdentifier", "Type", "DateTime"],
     "fitbitrestingheartrates": ["ParticipantIdentifier", "Date"],
-    "fitbitsleeplogs": ["LogId"],
-    "healthkitv2characteristics": ["HealthKitCharacteristicKey"],
-    "healthkitv2samples": ["HealthKitSampleKey"],
-    "healthkitv2heartbeat": ["HealthKitHeartbeatSampleKey"],
-    "healthkitv2statistics": ["HealthKitStatisticKey"],
-    "healthkitv2clinicalrecords": ["HealthKitClinicalRecordKey"],
-    "healthkitv2electrocardiogram": ["HealthKitECGSampleKey"],
-    "healthkitv2workouts": ["HealthKitWorkoutKey"],
-    "healthkitv2activitysummaries": ["HealthKitActivitySummaryKey"],
-    "garminactivitydetailssummary": ["ParticipantID", "SummaryId"],
-    "garminactivitysummary": ["ParticipantID", "SummaryId"],
-    "garminbloodpressuresummary": ["ParticipantID", "SummaryId"],
-    "garmindailysummary": ["ParticipantID", "StartTimeInSeconds"],
-    "garminepochsummary": ["ParticipantID", "SummaryId"],
-    "garminhealthsnapshotsummary": ["ParticipantID", "StartTimeInSeconds"],
-    "garminhrvsummary": ["ParticipantID", "StartTimeInSeconds"],
-    "garminmanuallyupdatedactivitysummary": ["ParticipantID", "SummaryId"],
-    "garminmoveiqactivitysummary": ["ParticipantID", "SummaryId"],
-    "garminpulseoxsummary": ["ParticipantID", "SummaryId"],
Member

Was ParticipantID the wrong key?

Contributor Author

There's a one-to-one mapping from ParticipantIdentifier to ParticipantID. They are two different ways of representing the same identifier.

ParticipantIdentifier is the only field present in every data type (including SymptomLog) and the CE docs suggest it's the more "official" identifier -- kind of like HealthCode to ExternalId in mPower.

-    "garminrespirationsummary": ["ParticipantID", "SummaryId"],
+    "fitbitsleeplogs": ["ParticipantIdentifier", "LogId"],
+    "healthkitv2characteristics": ["ParticipantIdentifier", "HealthKitCharacteristicKey"],
+    "healthkitv2samples": ["ParticipantIdentifier", "HealthKitSampleKey"],
+    "healthkitv2heartbeat": ["ParticipantIdentifier", "HealthKitHeartbeatSampleKey"],
+    "healthkitv2statistics": ["ParticipantIdentifier", "HealthKitStatisticKey"],
+    "healthkitv2clinicalrecords": ["ParticipantIdentifier", "HealthKitClinicalRecordKey"],
+    "healthkitv2electrocardiogram": ["ParticipantIdentifier", "HealthKitECGSampleKey"],
+    "healthkitv2workouts": ["ParticipantIdentifier", "HealthKitWorkoutKey"],
+    "healthkitv2activitysummaries": ["ParticipantIdentifier", "HealthKitActivitySummaryKey"],
+    "garminactivitydetailssummary": ["ParticipantIdentifier", "SummaryId"],
+    "garminactivitysummary": ["ParticipantIdentifier", "SummaryId"],
+    "garminbloodpressuresummary": ["ParticipantIdentifier", "SummaryId"],
+    "garmindailysummary": ["ParticipantIdentifier", "StartTimeInSeconds"],
+    "garminepochsummary": ["ParticipantIdentifier", "SummaryId"],
+    "garminhealthsnapshotsummary": ["ParticipantIdentifier", "StartTimeInSeconds"],
+    "garminhrvsummary": ["ParticipantIdentifier", "StartTimeInSeconds"],
+    "garminmanuallyupdatedactivitysummary": ["ParticipantIdentifier", "SummaryId"],
+    "garminmoveiqactivitysummary": ["ParticipantIdentifier", "SummaryId"],
+    "garminpulseoxsummary": ["ParticipantIdentifier", "SummaryId"],
+    "garminrespirationsummary": ["ParticipantIdentifier", "SummaryId"],
     "garminsleepsummary": [
-        "ParticipantID",
+        "ParticipantIdentifier",
         "StartTimeInSeconds",
         "DurationInSeconds",
         "Validation",
     ],
-    "garminstressdetailsummary": ["ParticipantID", "StartTimeInSeconds"],
-    "garminthirdpartydailysummary": ["ParticipantID", "StartTimeInSeconds"],
-    "garminusermetricssummary": ["ParticipantID", "CalenderDate"],
-    "googlefitsamples": ["GoogleFitSampleKey"],
-    "symptomlog": ["DataPointKey"],
+    "garminstressdetailsummary": ["ParticipantIdentifier", "StartTimeInSeconds"],
+    "garminthirdpartydailysummary": ["ParticipantIdentifier", "StartTimeInSeconds"],
+    "garminusermetricssummary": ["ParticipantIdentifier", "CalenderDate"],
+    "googlefitsamples": ["ParticipantIdentifier", "GoogleFitSampleKey"],
+    "symptomlog": ["ParticipantIdentifier", "DataPointKey"],
 }
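
The review thread on this map describes ParticipantIdentifier and ParticipantID as two representations of the same identifier, with a one-to-one mapping between them. A minimal sketch of how that property could be checked on a set of records is given here. The helper name `is_one_to_one` and the sample rows are hypothetical and not part of this PR.

```python
from collections import defaultdict


def is_one_to_one(records, left="ParticipantIdentifier", right="ParticipantID"):
    """Return True if every `left` value pairs with exactly one `right`
    value and vice versa. Records missing either field are skipped."""
    left_to_right = defaultdict(set)
    right_to_left = defaultdict(set)
    for record in records:
        if left not in record or right not in record:
            continue
        left_to_right[record[left]].add(record[right])
        right_to_left[record[right]].add(record[left])
    return all(len(v) == 1 for v in left_to_right.values()) and all(
        len(v) == 1 for v in right_to_left.values()
    )


# Hypothetical sample rows, for illustration only.
sample_rows = [
    {"ParticipantIdentifier": "MDH-0001", "ParticipantID": "a1"},
    {"ParticipantIdentifier": "MDH-0001", "ParticipantID": "a1"},
    {"ParticipantIdentifier": "MDH-0002", "ParticipantID": "b2"},
]
print(is_one_to_one(sample_rows))
```

Records that lack either field are skipped rather than treated as violations, since the PR notes that ParticipantID is not present in every data type.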


@@ -533,11 +533,14 @@ def add_index_to_table(
     """Add partition and index fields to a DynamicFrame.

     A DynamicFrame containing the top-level fields already includes the index
-    fields, but DynamicFrame's which were flattened as a result of the
-    DynamicFrame.relationalize operation need to inherit the index and partition
-    fields from their parent. In order for this function to execute successfully,
-    the table's parent must already have the index fields and be included in
-    `processed_tables`.
+    fields -- `ParticipantIdentifier` and a primary key which is particular to
+    each data type (see global var `INDEX_FIELD_MAP`) -- but DynamicFrame's which
+    were flattened as a result of the DynamicFrame.relationalize operation need
+    to inherit the index and partition fields from their parent. We also propagate
+    the `ParticipantID` field where present, although this field is not included
+    in the SymptomLog data type. In order for this function to execute
+    successfully, the table's parent must already have the index fields and be
+    included in `processed_tables`.

     In addition to adding the index fields, this function formats the names
     of the (non-index) fields which were manipulated by the call to
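
The docstring describes child tables inheriting index and partition fields from their parent. The sketch below is a plain-Python analogue of that idea, assuming the child rows carry an `id` column that matches the value stored in the parent's flattened field. `inherit_index`, the `Levels` field, and all sample values are hypothetical. The actual job does this with Spark DataFrames.

```python
def inherit_index(child_rows, parent_rows, join_field, index_fields):
    """Attach the parent's index fields to each child row.

    Analogue of: select the join field plus index fields from the parent,
    take distinct rows, treat the join field as "id", then inner-join the
    child rows on "id".
    """
    # Distinct (join value, index values...) tuples from the parent.
    parent_index = {
        (row[join_field],) + tuple(row[field] for field in index_fields)
        for row in parent_rows
    }
    index_by_id = {}
    for join_value, *index_values in parent_index:
        index_by_id.setdefault(join_value, []).append(
            dict(zip(index_fields, index_values))
        )
    # Inner join: child rows with no matching parent are dropped.
    return [
        {**child, **index}
        for child in child_rows
        for index in index_by_id.get(child["id"], [])
    ]


# Hypothetical data: the parent's "Levels" field holds the id of its child rows.
parent_rows = [
    {"ParticipantIdentifier": "MDH-0001", "LogId": "10", "cohort": "adults_v1", "Levels": 1},
    {"ParticipantIdentifier": "MDH-0002", "LogId": "11", "cohort": "adults_v1", "Levels": 2},
]
child_rows = [
    {"id": 1, "index": 0, "Value": "deep"},
    {"id": 1, "index": 1, "Value": "light"},
    {"id": 3, "index": 0, "Value": "rem"},  # no parent with Levels == 3
]
result = inherit_index(
    child_rows, parent_rows, "Levels", ["ParticipantIdentifier", "LogId", "cohort"]
)
```

Because the join is an inner join, a child row whose parent is missing disappears from the result. This is one reason the docstring requires the parent to be processed first.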
@@ -582,12 +585,11 @@ def add_index_to_table(
     else:
         selectable_original_field_name = original_field_name
     logger.info(f"Adding index to {original_field_name}")
-    parent_index = parent_table.select(
-        (
-            [selectable_original_field_name, "cohort"]
-            + INDEX_FIELD_MAP[table_data_type]
-        )
-    ).distinct()
+    index_fields = INDEX_FIELD_MAP[table_data_type]
Contributor

@rxu17 Apr 11, 2024

Given that ParticipantIdentifier is now a required index field for every data type (as far as I can tell from INDEX_FIELD_MAP), maybe we could add a test to ensure that ParticipantIdentifier is in every value of the INDEX_FIELD_MAP dict. This is mostly future-proofing: a double check that a later code change doesn't accidentally affect this. Thoughts?

Contributor Author

I don't want to assume that ParticipantIdentifier will be included with every data type going forward. That's a CE decision, and why I explicitly included the field in the INDEX_FIELD_MAP rather than specifying it once in this function.
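
For reference, the check proposed at the start of this thread might look like the sketch below. The map is an abbreviated stand-in built from three entries in the diff, and the function and test names are hypothetical. As the reply points out, whether ParticipantIdentifier stays in every data type is an upstream decision, so a test like this encodes an assumption rather than a guarantee.

```python
# Abbreviated stand-in for INDEX_FIELD_MAP (three entries taken from the diff).
INDEX_FIELD_MAP = {
    "fitbitactivitylogs": ["ParticipantIdentifier", "LogId"],
    "garmindailysummary": ["ParticipantIdentifier", "StartTimeInSeconds"],
    "symptomlog": ["ParticipantIdentifier", "DataPointKey"],
}


def data_types_missing_field(index_field_map, field="ParticipantIdentifier"):
    """Return the data types whose index fields do not include `field`."""
    return sorted(
        data_type
        for data_type, index_fields in index_field_map.items()
        if field not in index_fields
    )


def test_participant_identifier_in_every_index():
    # Hypothetical pytest-style test; the failure message lists offenders.
    missing = data_types_missing_field(INDEX_FIELD_MAP)
    assert not missing, f"Missing ParticipantIdentifier: {missing}"
```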

+    additional_fields = [selectable_original_field_name, "cohort"]
+    if "ParticipantID" in parent_table.columns:
Contributor

@rxu17 Apr 11, 2024

Should we modify or add a test for this function to check that ParticipantID gets included in additional_fields if it exists? I was reading the Jira ticket and Slack thread: why would both ParticipantID and ParticipantIdentifier exist in a dataset?

Contributor Author

Ahh, yes we should. I'm always forgetting about tests 🤦

Contributor Author

> why would both ParticipantID and ParticipantIdentifier exist in a dataset?

I don't know the exact reason, but it's not unusual for one to be the "global" identifier and the other to be a study- or app-specific identifier.

Contributor

That's not confusing at all :D

+        additional_fields.append("ParticipantID")
+    parent_index = parent_table.select(index_fields + additional_fields).distinct()
     this_index = parent_index.withColumnRenamed(original_field_name, "id")
     df_with_index = this_table.join(this_index, on="id", how="inner")
     # remove prefix from field names
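
The test requested in the review thread on this hunk could target the field-selection logic on its own. The sketch below restates that logic as a pure function so it can be exercised without Spark. `build_select_fields` and the column lists are hypothetical and do not appear in this PR.

```python
def build_select_fields(index_fields, selectable_original_field_name, parent_columns):
    """Mirror the selection in the diff: the index fields, then the join
    field and `cohort`, then `ParticipantID` only when the parent has it."""
    additional_fields = [selectable_original_field_name, "cohort"]
    if "ParticipantID" in parent_columns:
        additional_fields.append("ParticipantID")
    return index_fields + additional_fields


def test_participant_id_included_when_present():
    fields = build_select_fields(
        ["ParticipantIdentifier", "SummaryId"],
        "Samples",  # hypothetical flattened field name
        ["ParticipantIdentifier", "ParticipantID", "SummaryId", "Samples", "cohort"],
    )
    assert fields[-1] == "ParticipantID"


def test_participant_id_omitted_when_absent():
    # SymptomLog is the data type the docstring names as lacking ParticipantID.
    fields = build_select_fields(
        ["ParticipantIdentifier", "DataPointKey"],
        "Value",  # hypothetical flattened field name
        ["ParticipantIdentifier", "DataPointKey", "Value", "cohort"],
    )
    assert "ParticipantID" not in fields
```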
4 changes: 4 additions & 0 deletions src/glue/resources/table_columns.yaml
@@ -330,6 +330,10 @@ tables:
   Type: string
 - Name: FirmwareVersion
   Type: string
+- Name: export_start_date
+  Type: string
+- Name: export_end_date
+  Type: string
 partition_keys:
 - Name: cohort
   Type: string