Skip to content

Commit

Permalink
reformat code
Browse files Browse the repository at this point in the history
  • Loading branch information
danlu1 committed Oct 24, 2024
1 parent 19bcc84 commit 3e11436
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 55 deletions.
84 changes: 65 additions & 19 deletions scripts/table_updates/tests/test_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
def syn():
return create_autospec(synapseclient.Synapse)


@pytest.fixture(scope="session")
def table_schema():
schema = synapseclient.table.Schema(
Expand All @@ -26,42 +27,87 @@ def table_schema():
@pytest.mark.parametrize(
"query_return_df,select,query,expected_df",
[
(pd.DataFrame({'col1': ['value1', 'value2']}), "col1", "SELECT col1 from syn123456",pd.DataFrame({'col1': ['value1', 'value2']})),
(pd.DataFrame({'col1': ['value1', 'value2'],'col2': [1, 2]}), "col1,col2", "SELECT col1,col2 from syn123456",pd.DataFrame({'col1': ['value1', 'value2'],'col2': [1, 2]})),
(pd.DataFrame({'col1': ["NA", "value1", "None"],'col2': [1, 2, 3]}),"*","SELECT * from syn123456",pd.DataFrame({'col1': [np.nan, "value1", "None"],'col2': [1, 2, 3]})),
(pd.DataFrame(columns = ["col1", "col2"]),"*","SELECT * from syn123456",pd.DataFrame(columns = ["col1", "col2"])),
(
pd.DataFrame({"col1": ["value1", "value2"]}),
"col1",
"SELECT col1 from syn123456",
pd.DataFrame({"col1": ["value1", "value2"]}),
),
(
pd.DataFrame({"col1": ["value1", "value2"], "col2": [1, 2]}),
"col1,col2",
"SELECT col1,col2 from syn123456",
pd.DataFrame({"col1": ["value1", "value2"], "col2": [1, 2]}),
),
(
pd.DataFrame({"col1": ["NA", "value1", "None"], "col2": [1, 2, 3]}),
"*",
"SELECT * from syn123456",
pd.DataFrame({"col1": [np.nan, "value1", "None"], "col2": [1, 2, 3]}),
),
(
pd.DataFrame(columns=["col1", "col2"]),
"*",
"SELECT * from syn123456",
pd.DataFrame(columns=["col1", "col2"]),
),
],
ids=[
"selected_single_column",
"selected_multiple_column",
"pull_table_with_na_values_all_columns",
"pull_empty_table_all_columns",
],
ids = ["selected_single_column","selected_multiple_column","pull_table_with_na_values_all_columns","pull_empty_table_all_columns"],
)
def test_download_synapse_table_default_condition(syn, table_schema, query_return_df, select, query, expected_df):
syn.tableQuery = MagicMock(return_value = Table(table_schema, query_return_df))
def test_download_synapse_table_default_condition(
syn, table_schema, query_return_df, select, query, expected_df
):
syn.tableQuery = MagicMock(return_value=Table(table_schema, query_return_df))
result = utilities.download_synapse_table(syn, "syn123456", select)

# validate
syn.tableQuery.assert_called_once_with(query)
pd.testing.assert_frame_equal(result, expected_df)


@pytest.mark.parametrize(
"query_return_df,condition,query,expected_df",
[
(pd.DataFrame({'col1': ['value1'],'col2': [1]}), "col1 = 'value1'", "SELECT * from syn123456 WHERE col1 = 'value1'",pd.DataFrame({'col1': ['value1'],'col2': [1]})),
(pd.DataFrame({'col1': ["NA", "value1", "None"],'col2': [1, 1, 1]}), "col2 = 1","SELECT * from syn123456 WHERE col2 = 1",pd.DataFrame({'col1': [np.nan, "value1", "None"],'col2': [1, 1, 1]})),
(
pd.DataFrame({"col1": ["value1"], "col2": [1]}),
"col1 = 'value1'",
"SELECT * from syn123456 WHERE col1 = 'value1'",
pd.DataFrame({"col1": ["value1"], "col2": [1]}),
),
(
pd.DataFrame({"col1": ["NA", "value1", "None"], "col2": [1, 1, 1]}),
"col2 = 1",
"SELECT * from syn123456 WHERE col2 = 1",
pd.DataFrame({"col1": [np.nan, "value1", "None"], "col2": [1, 1, 1]}),
),
],
ids = ["selected_row_all_columns","pull_table_with_na_values_all_columns"],
ids=["selected_row_all_columns", "pull_table_with_na_values_all_columns"],
)
def test_download_synapse_table_with_condition(syn, table_schema, query_return_df, condition, query,expected_df):
syn.tableQuery = MagicMock(return_value = Table(table_schema, query_return_df))
result = utilities.download_synapse_table(syn, "syn123456", condition = condition)
def test_download_synapse_table_with_condition(
syn, table_schema, query_return_df, condition, query, expected_df
):
syn.tableQuery = MagicMock(return_value=Table(table_schema, query_return_df))
result = utilities.download_synapse_table(syn, "syn123456", condition=condition)

# validate
syn.tableQuery.assert_called_once_with(query)
pd.testing.assert_frame_equal(result, expected_df)

def test_download_empty_synapse_table_with_condition(syn, table_schema, ):
syn.tableQuery = MagicMock(return_value = Table(table_schema, pd.DataFrame(columns = ["col1", "col2"])))
result = utilities.download_synapse_table(syn, "syn123456", condition = "col2 = 1")


def test_download_empty_synapse_table_with_condition(
syn,
table_schema,
):
syn.tableQuery = MagicMock(
return_value=Table(table_schema, pd.DataFrame(columns=["col1", "col2"]))
)
result = utilities.download_synapse_table(syn, "syn123456", condition="col2 = 1")

# validate
syn.tableQuery.assert_called_once_with("SELECT * from syn123456 WHERE col2 = 1")
pd.testing.assert_frame_equal(result, pd.DataFrame(columns = ["col1", "col2"]))

pd.testing.assert_frame_equal(result, pd.DataFrame(columns=["col1", "col2"]))
120 changes: 90 additions & 30 deletions scripts/table_updates/update_data_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def get_main_genie_clinical_sample_file(
Returns:
pandas.DataFrame: the read in clinical file as dataframe
"""
release_files = download_synapse_table(syn, table_id = release_files_table_synid)
release_files = download_synapse_table(syn, table_id=release_files_table_synid)
clinical_link_synid = release_files[
(release_files["release"] == release)
& (release_files["name"] == "data_clinical_sample.txt")
Expand All @@ -74,17 +74,25 @@ def get_main_genie_clinical_sample_file(
return clinical_df[["SAMPLE_ID", "SEQ_YEAR"]]


def _store_data(syn: synapseclient.Synapse, table_id: str, label_data: pandas.DataFrame, table_type: str, cohort: str, logger: logging.Logger, dry_run: bool):
def _store_data(
syn: synapseclient.Synapse,
table_id: str,
label_data: pandas.DataFrame,
table_type: str,
cohort: str,
logger: logging.Logger,
dry_run: bool,
):
"""Helper function to store data to each table in the master table.
Before uploading data to the Synapse table, the provided label data is filtered
based on matching columns between the label data and the table schema, as well
as the form_label. Data cleansing, including the removal of rows with no data
Before uploading data to the Synapse table, the provided label data is filtered
based on matching columns between the label data and the table schema, as well
as the form_label. Data cleansing, including the removal of rows with no data
and the conversion of numeric values to integers, is applied to the label data.
When table_type is set to 'primary', existing data for the cohort is wiped, and
new data is inserted. When table_type is set to 'irr', only records that do not
already exist in the table are added. The dry_run flag can be used to toggle
When table_type is set to 'primary', existing data for the cohort is wiped, and
new data is inserted. When table_type is set to 'irr', only records that do not
already exist in the table are added. The dry_run flag can be used to toggle
between uploading the table to Synapse or saving it locally.
Args:
Expand Down Expand Up @@ -136,7 +144,13 @@ def _store_data(syn: synapseclient.Synapse, table_id: str, label_data: pandas.Da
)
if table_type == "irr":
# check for existing record ids so that only new data is added
existing_records = list(set(download_synapse_table(syn, table_id = table_schema.id, condition= f"cohort = '{cohort}'")["record_id"]))
existing_records = list(
set(
download_synapse_table(
syn, table_id=table_schema.id, condition=f"cohort = '{cohort}'"
)["record_id"]
)
)
temp_data = temp_data[~temp_data["record_id"].isin(existing_records)]
if not dry_run:
if table_type == "primary":
Expand All @@ -146,7 +160,15 @@ def _store_data(syn: synapseclient.Synapse, table_id: str, label_data: pandas.Da
temp_data.to_csv(table_id + "_temp.csv")


def store_data(syn: synapseclient.Synapse, master_table: pandas.DataFrame, label_data: pandas.DataFrame, table_type: str, cohort: str, logger: logging.Logger, dry_run: bool):
def store_data(
syn: synapseclient.Synapse,
master_table: pandas.DataFrame,
label_data: pandas.DataFrame,
table_type: str,
cohort: str,
logger: logging.Logger,
dry_run: bool,
):
"""Store data to each table in the master table.
Args:
Expand All @@ -162,6 +184,7 @@ def store_data(syn: synapseclient.Synapse, master_table: pandas.DataFrame, label
for table_id in master_table["id"]:
_store_data(syn, table_id, label_data, table_type, cohort, logger, dry_run)


def get_phi_cutoff(unit):
    """Return the 89-year PHI cutoff expressed in the requested time unit.

    Args:
        unit: Name of the time unit; "day", "month" and "year" are recognised.

    Returns:
        The cutoff as an integer count of the given unit (89 years,
        89 * 12 months, or 89 * 365 days). For any other unit the string
        "Invalid unit" is returned rather than raising an exception.
    """
    cutoff_years = 89
    cutoff_by_unit = {
        "year": cutoff_years,
        "month": math.floor(cutoff_years * 12),
        "day": math.floor(cutoff_years * 365),
    }
    try:
        return cutoff_by_unit[unit]
    except KeyError:
        # Unknown units are reported with a sentinel string, not an error.
        return "Invalid unit"
Expand Down Expand Up @@ -257,25 +280,31 @@ def _redact_table(df, interval_cols_info):
return df, record_to_redact


def update_redact_table(syn: synapseclient.Synapse, redacted_table_info: pandas.DataFrame, full_data_table_info: pandas.DataFrame, cohort: str, logger: logging.Logger):
def update_redact_table(
syn: synapseclient.Synapse,
redacted_table_info: pandas.DataFrame,
full_data_table_info: pandas.DataFrame,
cohort: str,
logger: logging.Logger,
):
"""Update redacted table
Before uploading data to the Synapse table, records are identified for redaction
based on criteria such as birth year, sequencing age, vital status, and interval
Before uploading data to the Synapse table, records are identified for redaction
based on criteria such as birth year, sequencing age, vital status, and interval
fields. The redacted data is then stored in the BPC internal tables.
A special case applies to the Patient Characteristics table: flagged records are
updated, with the birth_year field cleared. Additionally, the "redacted" column
A special case applies to the Patient Characteristics table: flagged records are
updated, with the birth_year field cleared. Additionally, the "redacted" column
in this table is updated within the Sage Internal project.
Args:
syn (synapseclient.Synapse): Synapse client connection
redacted_table_info (pandas.DataFrame): Table of all of the redacted tables
full_data_table_info (pandas.DataFrame): Table of all of the primary or irr BPC tables
cohort (string): Cohort name
logger (logging.Logger): The custom logger.
"""
interval_cols_info = download_synapse_table(syn, table_id = "syn23281483")
interval_cols_info = download_synapse_table(syn, table_id="syn23281483")
# Create new master table
master_table = redacted_table_info.merge(
full_data_table_info, on="name", suffixes=("_redacted", "_full")
Expand All @@ -292,9 +321,24 @@ def update_redact_table(syn: synapseclient.Synapse, redacted_table_info: pandas.
].values[0]
# download tables
condition = f"cohort = '{cohort}'"
curation_info = download_synapse_table(syn, table_id = curation_table_id, select = "record_id, curation_dt", condition = condition)
patient_info = download_synapse_table(syn, table_id = patient_table_id, select = "record_id, birth_year, hybrid_death_ind", condition = condition)
sample_info = download_synapse_table(syn, table_id = sample_table_id, select = "record_id, cpt_genie_sample_id, age_at_seq_report", condition = condition)
curation_info = download_synapse_table(
syn,
table_id=curation_table_id,
select="record_id, curation_dt",
condition=condition,
)
patient_info = download_synapse_table(
syn,
table_id=patient_table_id,
select="record_id, birth_year, hybrid_death_ind",
condition=condition,
)
sample_info = download_synapse_table(
syn,
table_id=sample_table_id,
select="record_id, cpt_genie_sample_id, age_at_seq_report",
condition=condition,
)
patient_curation_info = patient_info.merge(
curation_info, how="left", on="record_id"
)
Expand All @@ -319,7 +363,7 @@ def update_redact_table(syn: synapseclient.Synapse, redacted_table_info: pandas.
for _, row in master_table.iterrows():
if row["name"] != "Patient Characteristics":
table_id = row["id_full"]
df = download_synapse_table(syn, table_id, condition = condition)
df = download_synapse_table(syn, table_id, condition=condition)
new_df, new_record_to_redact = _redact_table(df, interval_cols_info)
new_df.reset_index(drop=True, inplace=True)
record_to_redact = record_to_redact + new_record_to_redact
Expand All @@ -332,7 +376,7 @@ def update_redact_table(syn: synapseclient.Synapse, redacted_table_info: pandas.
table = syn.store(Table(table_schema, new_df))

# Modify patient table
df = download_synapse_table(syn, table_id = patient_table_id, condition = condition)
df = download_synapse_table(syn, table_id=patient_table_id, condition=condition)
new_df, new_record_to_redact = _redact_table(df, interval_cols_info)
new_df.reset_index(drop=True, inplace=True)
record_to_redact = record_to_redact + new_record_to_redact
Expand Down Expand Up @@ -361,7 +405,9 @@ def update_redact_table(syn: synapseclient.Synapse, redacted_table_info: pandas.
pt_dat_query = syn.tableQuery(
f"SELECT cohort, record_id FROM {full_pt_id} where cohort = '{cohort}'"
)
pt_dat = download_synapse_table(syn, table_id = full_pt_id, select = "cohort, record_id", condition = condition)
pt_dat = download_synapse_table(
syn, table_id=full_pt_id, select="cohort, record_id", condition=condition
)
pt_dat.index = pt_dat.index.map(str)
pt_dat["index"] = pt_dat.index
info_to_update = new_df[["cohort", "record_id", "redacted"]]
Expand Down Expand Up @@ -398,7 +444,9 @@ def custom_fix_for_cancer_panel_test_table(
].values[0]
cpt_table_schema = syn.get(cpt_table_id)
cpt_dat_query = syn.tableQuery("SELECT cpt_genie_sample_id FROM %s" % cpt_table_id)
cpt_dat = download_synapse_table(syn, table_id = cpt_table_id, select = "cpt_genie_sample_id")
cpt_dat = download_synapse_table(
syn, table_id=cpt_table_id, select="cpt_genie_sample_id"
)
cpt_dat.index = cpt_dat.index.map(str)
cpt_dat["index"] = cpt_dat.index
genie_sample_dat = get_main_genie_clinical_sample_file(
Expand All @@ -424,9 +472,16 @@ def custom_fix_for_cancer_panel_test_table(
"SELECT cpt_sample_type FROM %s WHERE cpt_sample_type in (1,2,3,4,5,6,7)"
% cpt_table_id
)
cpt_dat = download_synapse_table(syn, table_id = cpt_table_id, select = "cpt_sample_type", condition = "cpt_sample_type in (1,2,3,4,5,6,7)")
cpt_dat = download_synapse_table(
syn,
table_id=cpt_table_id,
select="cpt_sample_type",
condition="cpt_sample_type in (1,2,3,4,5,6,7)",
)
cpt_dat["cpt_sample_type"] = pandas.to_numeric(cpt_dat["cpt_sample_type"])
sample_type_mapping = download_synapse_table(syn, table_id = config['main_genie_sample_mapping_table'])
sample_type_mapping = download_synapse_table(
syn, table_id=config["main_genie_sample_mapping_table"]
)
sample_type_mapping_dict = sample_type_mapping.set_index("CODE").to_dict()[
"DESCRIPTION"
]
Expand All @@ -444,7 +499,10 @@ def main():
description="Update data tables on Synapse for BPC databases"
)
parser.add_argument(
"table", type=str, help="Specify table type to run", choices=TABLES["production"].keys()
"table",
type=str,
help="Specify table type to run",
choices=TABLES["production"].keys(),
)
parser.add_argument(
"-s",
Expand Down Expand Up @@ -499,7 +557,7 @@ def main():
else:
TABLE_INFO = TABLES["staging"]
table_id, condition = list(TABLE_INFO[table_type])
master_table = download_synapse_table(syn, table_id, condition = condition)
master_table = download_synapse_table(syn, table_id, condition=condition)
# download data files
# TODO: find the cohort that has new data
# This is a mapping to all the intake data. e.g: ProstateBPCIntake_data
Expand All @@ -514,7 +572,9 @@ def main():
custom_fix_for_cancer_panel_test_table(syn, master_table, logger, config)
if table_type == "primary":
table_id, condition = list(TABLE_INFO["redacted"])
redacted_table_info = download_synapse_table(syn, table_id, condition = condition)
redacted_table_info = download_synapse_table(
syn, table_id, condition=condition
)
logger.info("Updating redacted tables...")
update_redact_table(syn, redacted_table_info, master_table, cohort, logger)
logger.info("Updating version for redacted tables")
Expand Down
16 changes: 10 additions & 6 deletions scripts/table_updates/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,16 @@ def check_empty_row(row, cols_to_skip):
return row.drop(cols_to_skip).isnull().all()


def download_synapse_table(syn, table_id: str, select: str = "*", condition: str = "") -> pandas.DataFrame:
def download_synapse_table(
syn, table_id: str, select: str = "*", condition: str = ""
) -> pandas.DataFrame:
"""Download Synapse Table with the given table ID and condition
Args:
syn: Synapse client connection
table_id: Synapse ID of a table
select: Columns to be selected. Defaults to all columns.
condition: Additional condition for querying the table. Defaults to all rows.
select: Columns to be selected. Defaults to all columns.
condition: Additional condition for querying the table. Defaults to all rows.
Returns:
A Pandas dataframe of the Synapse table
Expand All @@ -78,9 +80,11 @@ def download_synapse_table(syn, table_id: str, select: str = "*", condition: str
"-NaN",
"nan",
"-nan",
""
"",
]
synapse_table = synapse_table.asDataFrame(na_values=na_values, keep_default_na=False)
synapse_table = synapse_table.asDataFrame(
na_values=na_values, keep_default_na=False
)
return synapse_table


Expand Down Expand Up @@ -113,7 +117,7 @@ def get_data(syn, label_data_id, cohort):
"-NaN",
"nan",
"-nan",
""
"",
]
label_data = pandas.read_csv(
syn.get(label_data_id).path,
Expand Down

0 comments on commit 3e11436

Please sign in to comment.