From 7af1606420ebccefcefa2abdeddd581642a5dce8 Mon Sep 17 00:00:00 2001
From: d33bs
Date: Sat, 27 Apr 2024 14:56:19 -0600
Subject: [PATCH 01/14] customize sorting capabilities for further performance in #175

---
 cytotable/constants.py |  7 +++++++
 cytotable/convert.py   | 39 +++++++++++++++++++++++++++++++++++----
 cytotable/presets.py   | 19 +++++++++++--------
 tests/conftest.py      |  8 ++++----
 4 files changed, 57 insertions(+), 16 deletions(-)

diff --git a/cytotable/constants.py b/cytotable/constants.py
index 591b3238..2fb3182c 100644
--- a/cytotable/constants.py
+++ b/cytotable/constants.py
@@ -68,6 +68,13 @@
     ],
 }
 
+# metadata column names and types for internal use within CytoTable
+CYOTABLE_META_COLUMN_TYPES = {
+    "cytotable_meta_source_path": "VARCHAR",
+    "cytotable_meta_offset": "BIGINT",
+    "cytotable_meta_rownum": "BIGINT",
+}
+
 CYTOTABLE_DEFAULT_PARQUET_METADATA = {
     "data-producer": "https://github.com/cytomining/CytoTable",
     "data-producer-version": str(_get_cytotable_version()),
diff --git a/cytotable/convert.py b/cytotable/convert.py
index 84972935..a61fdf05 100644
--- a/cytotable/convert.py
+++ b/cytotable/convert.py
@@ -309,6 +309,8 @@ def _source_chunk_to_parquet(
         _write_parquet_table_with_metadata,
     )
 
+    from cytotable.constants import CYOTABLE_META_COLUMN_TYPES
+
     # attempt to build dest_path
     source_dest_path = (
         f"{dest_path}/{str(pathlib.Path(source_group_name).stem).lower()}/"
@@ -318,7 +320,20 @@ def _source_chunk_to_parquet(
 
     # build the column selection block of query
     select_columns = ",".join(
+        # add cytotable metadata columns
         [
+            (
+                f"CAST( '{source['source_path']}' AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_source_path']})"
+                " AS cytotable_meta_source_path"
+            ),
+            f"CAST( {offset} as {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_offset']}) AS cytotable_meta_offset",
+            (
+                f"CAST( (row_number() OVER ()) AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_rownum']})"
+                " AS cytotable_meta_rownum"
+            ),
+        ]
+        # add source table columns
+        + [
             # here we cast the column to the specified type ensure the colname remains the same
             f"CAST(\"{column['column_name']}\" AS {column['column_dtype']}) AS \"{column['column_name']}\""
             for column in source["columns"]
@@ -422,7 +437,10 @@ def _prepend_column_name(
 
     import pyarrow.parquet as parquet
 
-    from cytotable.constants import CYTOTABLE_ARROW_USE_MEMORY_MAPPING
+    from cytotable.constants import (
+        CYTOTABLE_ARROW_USE_MEMORY_MAPPING,
+        CYOTABLE_META_COLUMN_TYPES,
+    )
     from cytotable.utils import _write_parquet_table_with_metadata
 
     logger = logging.getLogger(__name__)
@@ -470,8 +488,10 @@ def _prepend_column_name(
         # source_group_name_stem: 'Cells'
         # column_name: 'AreaShape_Area'
         # updated_column_name: 'Cells_AreaShape_Area'
-        if column_name not in identifying_columns and not column_name.startswith(
-            source_group_name_stem.capitalize()
+        if (
+            column_name not in identifying_columns
+            and not column_name.startswith(source_group_name_stem.capitalize())
+            and column_name not in list(CYOTABLE_META_COLUMN_TYPES.keys())
         ):
             updated_column_names.append(f"{source_group_name_stem}_{column_name}")
         # if-condition for prepending 'Metadata_' to column name
@@ -697,13 +717,25 @@ def _prepare_join_sql(
     """
     import pathlib
 
+    from cytotable.constants import CYOTABLE_META_COLUMN_TYPES
+
     # replace with real location of sources for join sql
+    order_by_tables = []
     for key, val in sources.items():
         if pathlib.Path(key).stem.lower() in joins.lower():
             joins = joins.replace(
                 f"'{str(pathlib.Path(key).stem.lower())}.parquet'",
                 str([str(table) for table in val[0]["table"]]),
             )
+            order_by_tables.append(str(pathlib.Path(key).stem.lower()))
+
+    order_by_sql = "ORDER BY " + ", ".join([f"{table}.{meta_column}" for table in order_by_tables for meta_column in list(CYOTABLE_META_COLUMN_TYPES.keys())])
+    joins = f"""{joins}
+    {order_by_sql}
+    """
+
+    print(joins)
+
     return joins
 
 
@@ -752,7 +784,6 @@ def _join_source_chunk(
         result = ddb_reader.execute(
             f"""
             {joins}
-            {"ORDER BY ALL" if "ORDER BY" not in joins.upper() else ""}
             LIMIT {chunk_size} OFFSET {offset}
             """
         ).arrow()
diff --git a/cytotable/presets.py b/cytotable/presets.py
index 1503d2f4..85c53b6b 100644
--- a/cytotable/presets.py
+++ b/cytotable/presets.py
@@ -29,8 +29,11 @@
         # compartment and metadata joins performed using DuckDB SQL
         # and modified at runtime as needed
         "CONFIG_JOINS": """
-            WITH Image_Filtered AS (
+            WITH image AS (
                 SELECT
+                    cytotable_meta_source_path,
+                    cytotable_meta_offset,
+                    cytotable_meta_rownum,
                     /* seeks columns by name, avoiding failure if some do not exist */
                     COLUMNS('^Metadata_ImageNumber$|^Image_Metadata_Well$|^Image_Metadata_Plate$')
                 FROM
@@ -46,7 +49,7 @@
                 LEFT JOIN read_parquet('nuclei.parquet') AS nuclei ON
                     nuclei.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
                     AND nuclei.Metadata_ObjectNumber = cytoplasm.Metadata_Cytoplasm_Parent_Nuclei
-                LEFT JOIN Image_Filtered AS image ON
+                LEFT JOIN image AS image ON
                     image.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
             """,
     },
@@ -74,7 +77,7 @@
         # compartment and metadata joins performed using DuckDB SQL
         # and modified at runtime as needed
         "CONFIG_JOINS": """
-            WITH Per_Image_Filtered AS (
+            WITH per_image AS (
                 SELECT
                     Metadata_ImageNumber,
                     Image_Metadata_Well,
@@ -92,7 +95,7 @@
                 LEFT JOIN read_parquet('per_nuclei.parquet') AS per_nuclei ON
                     per_nuclei.Metadata_ImageNumber = per_cytoplasm.Metadata_ImageNumber
                     AND per_nuclei.Nuclei_Number_Object_Number = per_cytoplasm.Cytoplasm_Parent_Nuclei
-                LEFT JOIN Per_Image_Filtered AS per_image ON
+                LEFT JOIN per_image AS per_image ON
                     per_image.Metadata_ImageNumber = per_cytoplasm.Metadata_ImageNumber
             """,
     },
@@ -125,7 +128,7 @@
         # compartment and metadata joins performed using DuckDB SQL
        # and modified at runtime as needed
         "CONFIG_JOINS": """
-            WITH Per_Image_Filtered AS (
+            WITH per_image AS (
                 SELECT
                     Metadata_ImageNumber,
                     Image_Metadata_Well,
@@ -143,7 +146,7 @@
                 LEFT JOIN read_parquet('per_nuclei.parquet') AS per_nuclei ON
                     per_nuclei.Metadata_ImageNumber = per_cytoplasm.Metadata_ImageNumber
                     AND per_nuclei.Metadata_Nuclei_Number_Object_Number = per_cytoplasm.Metadata_Cytoplasm_Parent_Nuclei
-                LEFT JOIN Per_Image_Filtered AS per_image ON
+                LEFT JOIN per_image AS per_image ON
                     per_image.Metadata_ImageNumber = per_cytoplasm.Metadata_ImageNumber
             """,
     },
@@ -178,7 +181,7 @@
         # compartment and metadata joins performed using DuckDB SQL
         # and modified at runtime as needed
         "CONFIG_JOINS": """
-            WITH Image_Filtered AS (
+            WITH image AS (
                 SELECT
                     Metadata_TableNumber,
                     Metadata_ImageNumber,
@@ -199,7 +202,7 @@
                     nuclei.Metadata_TableNumber = cytoplasm.Metadata_TableNumber
                     AND nuclei.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
                     AND nuclei.Nuclei_ObjectNumber = cytoplasm.Metadata_Cytoplasm_Parent_Nuclei
-                LEFT JOIN Image_Filtered AS image ON
+                LEFT JOIN image AS image ON
                     image.Metadata_TableNumber = cytoplasm.Metadata_TableNumber
                     AND image.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
             """,
diff --git a/tests/conftest.py b/tests/conftest.py
index b11ceeb8..4929482f 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -448,7 +448,7 @@ def fixture_cellprofiler_merged_nf1data(
         .execute(
             """
             /* perform query on sqlite tables through duckdb */
-            WITH Per_Image_Filtered AS (
+            WITH per_image AS (
                 SELECT
                     ImageNumber,
                     Image_Metadata_Well,
@@ -456,7 +456,7 @@ def fixture_cellprofiler_merged_nf1data(
                 FROM Per_Image
             )
             SELECT *
-            FROM Per_Image_Filtered image
+            FROM per_image image
             LEFT JOIN Per_Cytoplasm cytoplasm ON
                 image.ImageNumber = cytoplasm.ImageNumber
             LEFT JOIN Per_Cells cells ON
@@ -505,7 +505,7 @@ def fixture_cytominerdatabase_merged_cellhealth(
     """
 
     sql_stmt = """
-        WITH Image_Filtered AS (
+        WITH image AS (
             SELECT
                 TableNumber,
                 ImageNumber,
@@ -534,7 +534,7 @@ def fixture_cytominerdatabase_merged_cellhealth(
             FROM Nuclei
         )
         SELECT DISTINCT *
-        FROM Image_Filtered image
+        FROM image
        LEFT JOIN Cytoplasm_renamed cytoplasm ON
            image.ImageNumber = cytoplasm.ImageNumber
        LEFT JOIN Cells_renamed cells ON

From 482efd7278fc9d7d45e678ee6876866e17da1a99 Mon Sep 17 00:00:00 2001
From: d33bs
Date: Mon, 29 Apr 2024 11:51:54 -0600
Subject: [PATCH 02/14] simplify sql; exclude cytotable meta

---
 cytotable/convert.py | 8 ++++++++
 cytotable/presets.py | 27 +++++++++------------------
 2 files changed, 17 insertions(+), 18 deletions(-)
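Note: the convert.py hunk below removes CytoTable's internal metadata columns
from join results using DuckDB's COLUMNS(...) expression with a lambda over
column names. A minimal standalone sketch of that pattern, assuming only the
duckdb Python package (the table and column names here are hypothetical, not
part of this change):

    import duckdb

    with duckdb.connect() as con:
        # a stand-in table with two "real" columns and one metadata column
        con.execute(
            """
            CREATE TABLE example AS
            SELECT 1 AS Metadata_ImageNumber,
                   0.5 AS Cells_Feature,
                   'cells.csv' AS cytotable_meta_source_path
            """
        )
        # keep only columns whose names do not match the metadata prefix
        kept = con.execute(
            "SELECT COLUMNS(c -> c NOT LIKE 'cytotable_meta%') FROM example"
        ).arrow()

    print(kept.column_names)  # ['Metadata_ImageNumber', 'Cells_Feature']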
diff --git a/cytotable/convert.py b/cytotable/convert.py
index a61fdf05..c0c0c8d0 100644
--- a/cytotable/convert.py
+++ b/cytotable/convert.py
@@ -774,17 +774,25 @@ def _join_source_chunk(
     import pyarrow.parquet as parquet
 
     from cytotable.utils import _duckdb_reader, _write_parquet_table_with_metadata
+    from cytotable.constants import CYOTABLE_META_COLUMN_TYPES
 
     # Attempt to read the data to parquet file
     # using duckdb for extraction and pyarrow for
     # writing data to a parquet file.
     # read data with chunk size + offset
     # and export to parquet
+    exclude_meta_cols = [f"c NOT LIKE '{col}%'" for col in list(CYOTABLE_META_COLUMN_TYPES.keys())]
     with _duckdb_reader() as ddb_reader:
         result = ddb_reader.execute(
             f"""
+            WITH joined AS (
                 {joins}
                 LIMIT {chunk_size} OFFSET {offset}
+            )
+            SELECT
+                /* exclude metadata columns from the results by using a CTE */
+                COLUMNS (c -> ({" AND ".join(exclude_meta_cols)}))
+            FROM joined;
             """
         ).arrow()
diff --git a/cytotable/presets.py b/cytotable/presets.py
index 85c53b6b..008a78a4 100644
--- a/cytotable/presets.py
+++ b/cytotable/presets.py
@@ -29,28 +29,19 @@
         # compartment and metadata joins performed using DuckDB SQL
         # and modified at runtime as needed
         "CONFIG_JOINS": """
-            WITH image AS (
-                SELECT
-                    cytotable_meta_source_path,
-                    cytotable_meta_offset,
-                    cytotable_meta_rownum,
-                    /* seeks columns by name, avoiding failure if some do not exist */
-                    COLUMNS('^Metadata_ImageNumber$|^Image_Metadata_Well$|^Image_Metadata_Plate$')
-                FROM
-                    read_parquet('image.parquet')
-            )
             SELECT
-                *
+                image.Metadata_ImageNumber,
+                cytoplasm.*,
+                cells.*,
+                nuclei.*
             FROM
                 read_parquet('cytoplasm.parquet') AS cytoplasm
-            LEFT JOIN read_parquet('cells.parquet') AS cells ON
-                cells.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
-                AND cells.Metadata_ObjectNumber = cytoplasm.Metadata_Cytoplasm_Parent_Cells
-            LEFT JOIN read_parquet('nuclei.parquet') AS nuclei ON
-                nuclei.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
+            LEFT JOIN read_parquet('cells.parquet') AS cells USING (Metadata_ImageNumber)
+            LEFT JOIN read_parquet('nuclei.parquet') AS nuclei USING (Metadata_ImageNumber)
+            LEFT JOIN read_parquet('image.parquet') AS image USING (Metadata_ImageNumber)
+            WHERE
+                cells.Metadata_ObjectNumber = cytoplasm.Metadata_Cytoplasm_Parent_Cells
                 AND nuclei.Metadata_ObjectNumber = cytoplasm.Metadata_Cytoplasm_Parent_Nuclei
-            LEFT JOIN image AS image ON
-                image.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
             """,
     },
     "cellprofiler_sqlite": {

From 7742b1382e2a0f61884569a4339a4ef31ae56246 Mon Sep 17 00:00:00 2001
From: d33bs
Date: Mon, 29 Apr 2024 12:57:25 -0600
Subject: [PATCH 03/14] exclude duplicate columns

---
 cytotable/presets.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/cytotable/presets.py b/cytotable/presets.py
index 008a78a4..da700dfd 100644
--- a/cytotable/presets.py
+++ b/cytotable/presets.py
@@ -31,9 +31,9 @@
         "CONFIG_JOINS": """
             SELECT
                 image.Metadata_ImageNumber,
-                cytoplasm.*,
-                cells.*,
-                nuclei.*
+                cytoplasm.* EXCLUDE (Metadata_ImageNumber),
+                cells.* EXCLUDE (Metadata_ImageNumber, Metadata_ObjectNumber),
+                nuclei.* EXCLUDE (Metadata_ImageNumber, Metadata_ObjectNumber)
             FROM
                 read_parquet('cytoplasm.parquet') AS cytoplasm
             LEFT JOIN read_parquet('cells.parquet') AS cells USING (Metadata_ImageNumber)

From 9b39c4d79ee836c5372c4d17a0bdce848dd1858d Mon Sep 17 00:00:00 2001
From: d33bs
Date: Mon, 29 Apr 2024 14:26:16 -0600
Subject: [PATCH 04/14] updating tests

---
 cytotable/convert.py | 17 +++++++++++------
 tests/conftest.py    | 15 +++++++++++++++
 2 files changed, 26 insertions(+), 6 deletions(-)

diff --git a/cytotable/convert.py b/cytotable/convert.py
index c0c0c8d0..6fd07115 100644
--- a/cytotable/convert.py
+++ b/cytotable/convert.py
@@ -728,15 +728,20 @@ def _prepare_join_sql(
                 str([str(table) for table in val[0]["table"]]),
             )
             order_by_tables.append(str(pathlib.Path(key).stem.lower()))
 
-    order_by_sql = "ORDER BY " + ", ".join([f"{table}.{meta_column}" for table in order_by_tables for meta_column in list(CYOTABLE_META_COLUMN_TYPES.keys())])
+    order_by_sql = "ORDER BY " + ", ".join(
+        [
+            f"{table}.{meta_column}"
+            for table in order_by_tables
+            for meta_column in list(CYOTABLE_META_COLUMN_TYPES.keys())
+        ]
+    )
     joins = f"""{joins}
     {order_by_sql}
     """
 
     print(joins)
-
     return joins
diff --git a/tests/conftest.py b/tests/conftest.py
index 4929482f..20b70fde 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -219,6 +219,9 @@ def fixture_example_tables() -> Tuple[pa.Table, ...]:
             "ImageNumber": pa.array(["1", "1", "2", "2"]),
             "Image_Metadata_Plate": pa.array(["001", "001", "002", "002"]),
             "Image_Metadata_Well": pa.array(["A1", "A1", "A2", "A2"]),
+            "cytotable_meta_source_path": pa.array(["image.csv", "image.csv", "image.csv", "image.csv"]),
+            "cytotable_meta_offset": pa.array([50, 50, 100, 100]),
+            "cytotable_meta_rownum": pa.array([1, 2, 3, 4]),
         }
     )
     table_cytoplasm = pa.Table.from_pydict(
         {
             "ImageNumber": pa.array(["1", "1", "2", "2"]),
             "Cytoplasm_ObjectNumber": pa.array([1, 2, 1, 2]),
             "Cytoplasm_Feature_X": pa.array([0.1, 0.2, 0.1, 0.2]),
+            "cytotable_meta_source_path": pa.array(["cytoplasm.csv", "cytoplasm.csv", "cytoplasm.csv", "cytoplasm.csv"]),
+            "cytotable_meta_offset": pa.array([50, 50, 100, 100]),
+            "cytotable_meta_rownum": pa.array([1, 2, 3, 4]),
         }
     )
     table_cells = pa.Table.from_pydict(
         {
             "ImageNumber": pa.array(["1", "1", "2", "2"]),
             "Cells_ObjectNumber": pa.array([1, 2, 1, 2]),
             "Cells_Feature_Y": pa.array([0.01, 0.02, 0.01, 0.02]),
+            "cytotable_meta_source_path": pa.array(["cells.csv", "cells.csv", "cells.csv", "cells.csv"]),
+            "cytotable_meta_offset": pa.array([50, 50, 100, 100]),
+            "cytotable_meta_rownum": pa.array([1, 2, 3, 4]),
         }
     )
     table_nuclei_1 = pa.Table.from_pydict(
         {
             "ImageNumber": pa.array(["1", "1"]),
             "Nuclei_ObjectNumber": pa.array([1, 2]),
             "Nuclei_Feature_Z": pa.array(
                 [
                     0.001,
                     0.002,
                 ]
             ),
+            "cytotable_meta_source_path": pa.array(["nuclei_1.csv", "nuclei_1.csv"]),
+            "cytotable_meta_offset": pa.array([50, 50]),
+            "cytotable_meta_rownum": pa.array([1, 2]),
         }
     )
 
     table_nuclei_2 = pa.Table.from_pydict(
         {
             "ImageNumber": pa.array(["2", "2"]),
             "Nuclei_ObjectNumber": pa.array([1, 2]),
             "Nuclei_Feature_Z": pa.array([0.001, 0.002]),
+            "cytotable_meta_source_path": pa.array(["nuclei_1.csv", "nuclei_1.csv"]),
+            "cytotable_meta_offset": pa.array([50, 50]),
+            "cytotable_meta_rownum": pa.array([1, 2]),
         }
     )

From add4050be0ffa0235873bb98fb26cf257dca0177 Mon Sep 17 00:00:00 2001
From: d33bs
Date: Tue, 30 Apr 2024 12:10:14 -0600
Subject: [PATCH 05/14] fixing tests

---
 tests/conftest.py     |  2 ++
 tests/test_convert.py | 70 ++++++++++++++++++++++++-------------------
 2 files changed, 42 insertions(+), 30 deletions(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index 20b70fde..d5c01dcc 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -228,6 +228,8 @@ def fixture_example_tables() -> Tuple[pa.Table, ...]:
         {
             "ImageNumber": pa.array(["1", "1", "2", "2"]),
             "Cytoplasm_ObjectNumber": pa.array([1, 2, 1, 2]),
+            "Cytoplasm_Parent_Cells": pa.array([1, 2, 1, 2]),
+            "Cytoplasm_Parent_Nuclei": pa.array([1, 2, 1, 2]),
             "Cytoplasm_Feature_X": pa.array([0.1, 0.2, 0.1, 0.2]),
             "cytotable_meta_source_path": pa.array(["cytoplasm.csv", "cytoplasm.csv", "cytoplasm.csv", "cytoplasm.csv"]),
             "cytotable_meta_offset": pa.array([50, 50, 100, 100]),
diff --git a/tests/test_convert.py b/tests/test_convert.py
index 07598b8e..47a6b29f 100644
--- a/tests/test_convert.py
+++ b/tests/test_convert.py
@@ -320,7 +320,7 @@ def test_prepare_join_sql(
     example_local_sources: Dict[str, List[Dict[str, Any]]],
 ):
     """
-    Tests _prepare_join_sql
+    Tests _prepare_join_sql by using sources to run the SQL join statement.
 
     After running _prepare_join_sql we'd expect something like:
 
     SELECT
 
     # attempt to run query against prepared_join_sql with test data
     with _duckdb_reader() as ddb_reader:
-        result = (
-            ddb_reader.execute(
-                _prepare_join_sql(
-                    sources=example_local_sources,
-                    # simplified join for example dataset
-                    joins="""
+        result = ddb_reader.execute(
+            _prepare_join_sql(
+                sources=example_local_sources,
+                # simplified join for example dataset
+                joins="""
                 SELECT
                     *
                 FROM
-                    read_parquet('image.parquet') AS image
-                LEFT JOIN read_parquet('cytoplasm.parquet') AS cytoplasm ON
-                    cytoplasm.ImageNumber = image.ImageNumber
-                LEFT JOIN read_parquet('cells.parquet') AS cells ON
-                    cells.ImageNumber = cytoplasm.ImageNumber
-                LEFT JOIN read_parquet('nuclei.parquet') AS nuclei ON
-                    nuclei.ImageNumber = cytoplasm.ImageNumber
+                    read_parquet('cytoplasm.parquet') AS cytoplasm
+                LEFT JOIN read_parquet('cells.parquet') AS cells USING (ImageNumber)
+                LEFT JOIN read_parquet('nuclei.parquet') AS nuclei USING (ImageNumber)
+                LEFT JOIN read_parquet('image.parquet') AS image USING (ImageNumber)
+                WHERE
+                    cells.Cells_ObjectNumber = cytoplasm.Cytoplasm_Parent_Cells
+                    AND nuclei.Nuclei_ObjectNumber = cytoplasm.Cytoplasm_Parent_Nuclei
                 """,
-                ).result()
-            )
-            .arrow()
-            .to_pydict()
-        )
-
-    # check that we received data back
-    assert len(result) == 9
+            ).result()
+        ).arrow()
+
+    # check that we received expected data back
+    assert result.shape == (4, 23)
+    assert result.slice(length=1).to_pydict() == {
+        "ImageNumber": ["1"],
+        "Cytoplasm_ObjectNumber": [1],
+        "Cytoplasm_Parent_Cells": [1],
+        "Cytoplasm_Parent_Nuclei": [1],
+        "Cytoplasm_Feature_X": [0.1],
+        "cytotable_meta_source_path": ["image.csv"],
+        "cytotable_meta_offset": [50],
+        "cytotable_meta_rownum": [1],
+        "Cells_ObjectNumber": [1],
+        "Cells_Feature_Y": [0.01],
+        "Nuclei_ObjectNumber": [1],
+        "Nuclei_Feature_Z": [0.001],
+        "Image_Metadata_Plate": ["001"],
+        "Image_Metadata_Well": ["A1"],
+    }
 
 
 def test_join_source_chunk(load_parsl_default: None, fx_tempdir: str):
@@ -401,10 +413,8 @@ def test_join_source_chunk(load_parsl_default: None, fx_tempdir: str):
         dest_path=f"{fx_tempdir}/destination.parquet",
         joins=f"""
             SELECT *
-            FROM read_parquet('{fx_tempdir}/example_a_merged.parquet') as example_a
-            JOIN read_parquet('{fx_tempdir}/example_b_merged.parquet') as example_b ON
-                example_b.id1 = example_a.id1
-                AND example_b.id2 = example_a.id2
+            FROM read_parquet('{test_path_a}') as example_a
+            JOIN read_parquet('{test_path_b}') as example_b USING(id1, id2)
         """,
         chunk_size=2,
         offset=0,
@@ -417,10 +427,10 @@ def test_join_source_chunk(load_parsl_default: None, fx_tempdir: str):
     assert result_table.equals(
         other=pa.Table.from_pydict(
             {
-                "field1": ["foo", "foo"],
-                "field2": [True, True],
-                "id1": [1, 1],
-                "id2": ["a", "b"],
+                "field1": ["foo", "bar"],
+                "field2": [True, False],
+                "id1": [1, 2],
+                "id2": ["a", "a"],
             },
             # use schema from result as a reference for col order
             schema=result_table.schema,

From 80767235605248c57510fda406f026087fc2f77b Mon Sep 17 00:00:00 2001
From: d33bs
Date: Tue, 30 Apr 2024 14:09:56 -0600
Subject: [PATCH 06/14] simulate csv source by removing meta

---
 tests/conftest.py | 52 +++++++++++++++++++++++++++++++++++++----------
 1 file changed, 41 insertions(+), 11 deletions(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index d5c01dcc..3cfc73ae 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -29,6 +29,7 @@
 from sqlalchemy.util import deprecations
 
 from cytotable.utils import _column_sort, _default_parsl_config, _parsl_loaded
+from cytotable.constants import CYOTABLE_META_COLUMN_TYPES
 
 # filters sqlalchemy 2.0 uber warning
 # referenced from: https://stackoverflow.com/a/76308286
@@ -219,7 +220,9 @@ def fixture_example_tables() -> Tuple[pa.Table, ...]:
             "ImageNumber": pa.array(["1", "1", "2", "2"]),
             "Image_Metadata_Plate": pa.array(["001", "001", "002", "002"]),
             "Image_Metadata_Well": pa.array(["A1", "A1", "A2", "A2"]),
-            "cytotable_meta_source_path": pa.array(["image.csv", "image.csv", "image.csv", "image.csv"]),
+            "cytotable_meta_source_path": pa.array(
+                ["image.csv", "image.csv", "image.csv", "image.csv"]
+            ),
             "cytotable_meta_offset": pa.array([50, 50, 100, 100]),
             "cytotable_meta_rownum": pa.array([1, 2, 3, 4]),
         }
@@ -231,7 +234,9 @@ def fixture_example_tables() -> Tuple[pa.Table, ...]:
             "Cytoplasm_Parent_Cells": pa.array([1, 2, 1, 2]),
             "Cytoplasm_Parent_Nuclei": pa.array([1, 2, 1, 2]),
             "Cytoplasm_Feature_X": pa.array([0.1, 0.2, 0.1, 0.2]),
-            "cytotable_meta_source_path": pa.array(["cytoplasm.csv", "cytoplasm.csv", "cytoplasm.csv", "cytoplasm.csv"]),
+            "cytotable_meta_source_path": pa.array(
+                ["cytoplasm.csv", "cytoplasm.csv", "cytoplasm.csv", "cytoplasm.csv"]
+            ),
             "cytotable_meta_offset": pa.array([50, 50, 100, 100]),
             "cytotable_meta_rownum": pa.array([1, 2, 3, 4]),
         }
@@ -241,7 +246,9 @@ def fixture_example_tables() -> Tuple[pa.Table, ...]:
             "ImageNumber": pa.array(["1", "1", "2", "2"]),
             "Cells_ObjectNumber": pa.array([1, 2, 1, 2]),
             "Cells_Feature_Y": pa.array([0.01, 0.02, 0.01, 0.02]),
-            "cytotable_meta_source_path": pa.array(["cells.csv", "cells.csv", "cells.csv", "cells.csv"]),
+            "cytotable_meta_source_path": pa.array(
+                ["cells.csv", "cells.csv", "cells.csv", "cells.csv"]
+            ),
             "cytotable_meta_offset": pa.array([50, 50, 100, 100]),
             "cytotable_meta_rownum": pa.array([1, 2, 3, 4]),
         }
@@ -308,7 +315,18 @@ def fixture_example_local_sources(
             parents=True, exist_ok=True
         )
         # write example input
-        csv.write_csv(table, f"{fx_tempdir}/example/{number}/{name}.csv")
+        csv.write_csv(
+            # we remove simulated cytotable metadata columns to be more realistic
+            # (incoming sources would not usually contain these)
+            table.select(
+                [
+                    column
+                    for column in table.column_names
+                    if column not in list(CYOTABLE_META_COLUMN_TYPES.keys())
+                ]
+            ),
+            f"{fx_tempdir}/example/{number}/{name}.csv",
+        )
        # write example output
        parquet.write_table(
            table, f"{fx_tempdir}/example_dest/{name}/{number}/{name}.parquet"
        )
@@ -373,13 +391,25 @@ def col_renames(name: str, table: pa.Table):
         """
         return table.rename_columns(
             [
-                f"Metadata_{colname}"
-                if colname in ["ImageNumber", "ObjectNumber"]
-                else f"Metadata_{name}_{colname}"
-                if any(name in colname for name in ["Parent_Cells", "Parent_Nuclei"])
-                else f"{name}_{colname}"
-                if not (colname.startswith(name) or colname.startswith("Metadata_"))
-                else colname
+                (
+                    f"Metadata_{colname}"
+                    if colname in ["ImageNumber", "ObjectNumber"]
+                    else (
+                        f"Metadata_{name}_{colname}"
+                        if any(
+                            name in colname
+                            for name in ["Parent_Cells", "Parent_Nuclei"]
+                        )
+                        else (
+                            f"{name}_{colname}"
+                            if not (
+                                colname.startswith(name)
+                                or colname.startswith("Metadata_")
+                            )
+                            else colname
+                        )
+                    )
+                )
                for colname in table.column_names
            ]
        )

From a9f31997a87a16dd70117214c05d9d1bb766273a Mon Sep 17 00:00:00 2001
From: d33bs
Date: Tue, 30 Apr 2024 14:27:31 -0600
Subject: [PATCH 07/14] update preset sql to use refined syntax

---
 cytotable/presets.py | 86 ++++++++++++++++++--------------------------
 1 file changed, 34 insertions(+), 52 deletions(-)

diff --git a/cytotable/presets.py b/cytotable/presets.py
index da700dfd..d8d99849 100644
--- a/cytotable/presets.py
+++ b/cytotable/presets.py
@@ -68,26 +68,21 @@
         # compartment and metadata joins performed using DuckDB SQL
         # and modified at runtime as needed
         "CONFIG_JOINS": """
-            WITH per_image AS (
-                SELECT
-                    Metadata_ImageNumber,
-                    Image_Metadata_Well,
-                    Image_Metadata_Plate
-                FROM
-                    read_parquet('per_image.parquet')
-            )
             SELECT
-                *
+                per_image.Metadata_ImageNumber,
+                per_image.Image_Metadata_Well,
+                per_image.Image_Metadata_Plate,
+                per_cytoplasm.* EXCLUDE (Metadata_ImageNumber),
+                per_cells.* EXCLUDE (Metadata_ImageNumber, Metadata_ObjectNumber),
+                per_nuclei.* EXCLUDE (Metadata_ImageNumber, Metadata_ObjectNumber)
             FROM
                 read_parquet('per_cytoplasm.parquet') AS per_cytoplasm
-            LEFT JOIN read_parquet('per_cells.parquet') AS per_cells ON
-                per_cells.Metadata_ImageNumber = per_cytoplasm.Metadata_ImageNumber
-                AND per_cells.Cells_Number_Object_Number = per_cytoplasm.Cytoplasm_Parent_Cells
-            LEFT JOIN read_parquet('per_nuclei.parquet') AS per_nuclei ON
-                per_nuclei.Metadata_ImageNumber = per_cytoplasm.Metadata_ImageNumber
+            LEFT JOIN read_parquet('per_cells.parquet') AS per_cells USING (Metadata_ImageNumber)
+            LEFT JOIN read_parquet('per_nuclei.parquet') AS per_nuclei USING (Metadata_ImageNumber)
+            LEFT JOIN read_parquet('per_image.parquet') AS per_image USING (Metadata_ImageNumber)
+            WHERE
+                per_cells.Cells_Number_Object_Number = per_cytoplasm.Cytoplasm_Parent_Cells
                 AND per_nuclei.Nuclei_Number_Object_Number = per_cytoplasm.Cytoplasm_Parent_Nuclei
-            LEFT JOIN per_image AS per_image ON
-                per_image.Metadata_ImageNumber = per_cytoplasm.Metadata_ImageNumber
             """,
     },
     "cellprofiler_sqlite_pycytominer": {
@@ -119,26 +114,21 @@
         # compartment and metadata joins performed using DuckDB SQL
         # and modified at runtime as needed
         "CONFIG_JOINS": """
-            WITH per_image AS (
-                SELECT
-                    Metadata_ImageNumber,
-                    Image_Metadata_Well,
-                    Image_Metadata_Plate
-                FROM
-                    read_parquet('per_image.parquet')
-            )
             SELECT
-                *
+                per_image.Metadata_ImageNumber,
+                per_image.Image_Metadata_Well,
+                per_image.Image_Metadata_Plate,
+                per_cytoplasm.* EXCLUDE (Metadata_ImageNumber),
+                per_cells.* EXCLUDE (Metadata_ImageNumber),
+                per_nuclei.* EXCLUDE (Metadata_ImageNumber)
             FROM
                 read_parquet('per_cytoplasm.parquet') AS per_cytoplasm
-            LEFT JOIN read_parquet('per_cells.parquet') AS per_cells ON
-                per_cells.Metadata_ImageNumber = per_cytoplasm.Metadata_ImageNumber
-                AND per_cells.Metadata_Cells_Number_Object_Number = per_cytoplasm.Metadata_Cytoplasm_Parent_Cells
-            LEFT JOIN read_parquet('per_nuclei.parquet') AS per_nuclei ON
-                per_nuclei.Metadata_ImageNumber = per_cytoplasm.Metadata_ImageNumber
+            LEFT JOIN read_parquet('per_cells.parquet') AS per_cells USING (Metadata_ImageNumber)
+            LEFT JOIN read_parquet('per_nuclei.parquet') AS per_nuclei USING (Metadata_ImageNumber)
+            LEFT JOIN read_parquet('per_image.parquet') AS per_image USING (Metadata_ImageNumber)
+            WHERE
+                per_cells.Metadata_Cells_Number_Object_Number = per_cytoplasm.Metadata_Cytoplasm_Parent_Cells
                 AND per_nuclei.Metadata_Nuclei_Number_Object_Number = per_cytoplasm.Metadata_Cytoplasm_Parent_Nuclei
-            LEFT JOIN per_image AS per_image ON
-                per_image.Metadata_ImageNumber = per_cytoplasm.Metadata_ImageNumber
             """,
     },
     "cell-health-cellprofiler-to-cytominer-database": {
@@ -172,30 +162,22 @@
         # compartment and metadata joins performed using DuckDB SQL
         # and modified at runtime as needed
         "CONFIG_JOINS": """
-            WITH image AS (
-                SELECT
-                    Metadata_TableNumber,
-                    Metadata_ImageNumber,
-                    Image_Metadata_Well,
-                    Image_Metadata_Plate
-                FROM
-                    read_parquet('image.parquet')
-            )
             SELECT
-                *
+                image.Metadata_TableNumber,
+                image.Metadata_ImageNumber,
+                image.Image_Metadata_Well,
+                image.Image_Metadata_Plate,
+                cytoplasm.* EXCLUDE (Metadata_TableNumber, Metadata_ImageNumber),
+                cells.* EXCLUDE (Metadata_TableNumber, Metadata_ImageNumber),
+                nuclei.* EXCLUDE (Metadata_TableNumber, Metadata_ImageNumber)
             FROM
                 read_parquet('cytoplasm.parquet') AS cytoplasm
-            LEFT JOIN read_parquet('cells.parquet') AS cells ON
-                cells.Metadata_TableNumber = cytoplasm.Metadata_TableNumber
-                AND cells.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
-                AND cells.Cells_ObjectNumber = cytoplasm.Metadata_Cytoplasm_Parent_Cells
-            LEFT JOIN read_parquet('nuclei.parquet') AS nuclei ON
-                nuclei.Metadata_TableNumber = cytoplasm.Metadata_TableNumber
-                AND nuclei.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
+            LEFT JOIN read_parquet('cells.parquet') AS cells USING (Metadata_TableNumber, Metadata_ImageNumber)
+            LEFT JOIN read_parquet('nuclei.parquet') AS nuclei USING (Metadata_TableNumber, Metadata_ImageNumber)
+            LEFT JOIN read_parquet('image.parquet') AS image USING (Metadata_TableNumber, Metadata_ImageNumber)
+            WHERE
+                cells.Cells_ObjectNumber = cytoplasm.Metadata_Cytoplasm_Parent_Cells
                 AND nuclei.Nuclei_ObjectNumber = cytoplasm.Metadata_Cytoplasm_Parent_Nuclei
-            LEFT JOIN image AS image ON
-                image.Metadata_TableNumber = cytoplasm.Metadata_TableNumber
-                AND image.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
             """,
     },
     "in-carta": {

From 9b04d01e540866d3ec99bad661f216293955ae57 Mon Sep 17 00:00:00 2001
From: d33bs
Date: Tue, 30 Apr 2024 17:04:12 -0600
Subject: [PATCH 08/14] address mixed type queries and tests

---
 cytotable/convert.py  | 27 ++++++++++++++++-----------
 cytotable/presets.py  |  4 ++--
 cytotable/utils.py    | 32 +++++++++++++++++++++++++++++---
 tests/conftest.py     | 39 ++++++++++++++++++---------------------
 tests/test_convert.py | 16 ++++++++++++++++
 5 files changed, 81 insertions(+), 37 deletions(-)

diff --git a/cytotable/convert.py b/cytotable/convert.py
index 6fd07115..1b7d5c21 100644
--- a/cytotable/convert.py
+++ b/cytotable/convert.py
@@ -109,6 +109,7 @@ def _get_table_columns_and_types(source: Dict[str, Any]) -> List[Dict[str, str]]:
             # offset is set to 0 start at first row
             # result from table
             offset=0,
+            add_cytotable_meta=False,
         )
         with _duckdb_reader() as ddb_reader:
             return (
@@ -303,14 +304,13 @@ def _source_chunk_to_parquet(
     from cloudpathlib import AnyPath
     from pyarrow import parquet
 
+    from cytotable.constants import CYOTABLE_META_COLUMN_TYPES
     from cytotable.utils import (
         _duckdb_reader,
         _sqlite_mixed_type_query_to_parquet,
         _write_parquet_table_with_metadata,
     )
 
-    from cytotable.constants import CYOTABLE_META_COLUMN_TYPES
-
     # attempt to build dest_path
     source_dest_path = (
         f"{dest_path}/{str(pathlib.Path(source_group_name).stem).lower()}/"
@@ -318,18 +318,24 @@ def _source_chunk_to_parquet(
     )
     pathlib.Path(source_dest_path).mkdir(parents=True, exist_ok=True)
 
+    source_path_str = (
+        source["source_path"]
+        if "table_name" not in source.keys()
+        else f"{source['source_path']}_table_{source['table_name']}"
+    )
     # build the column selection block of query
     select_columns = ",".join(
         # add cytotable metadata columns
         [
             (
-                f"CAST( '{source['source_path']}' AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_source_path']})"
-                " AS cytotable_meta_source_path"
+                f"CAST( '{source_path_str}' "
+                f"AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_source_path']})"
+                ' AS "cytotable_meta_source_path"'
             ),
-            f"CAST( {offset} as {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_offset']}) AS cytotable_meta_offset",
+            f"CAST( {offset} AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_offset']}) AS \"cytotable_meta_offset\"",
             (
                 f"CAST( (row_number() OVER ()) AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_rownum']})"
-                " AS cytotable_meta_rownum"
+                ' AS "cytotable_meta_rownum"'
             ),
         ]
         # add source table columns
@@ -389,6 +395,7 @@ def _source_chunk_to_parquet(
                     table_name=str(source["table_name"]),
                     chunk_size=chunk_size,
                     offset=offset,
+                    add_cytotable_meta=True,
                 ),
                 where=result_filepath,
             )
@@ -438,8 +445,8 @@ def _prepend_column_name(
     import pyarrow.parquet as parquet
 
     from cytotable.constants import (
+        CYOTABLE_META_COLUMN_TYPES,
         CYTOTABLE_ARROW_USE_MEMORY_MAPPING,
-        CYOTABLE_META_COLUMN_TYPES,
     )
     from cytotable.utils import _write_parquet_table_with_metadata
 
@@ -740,8 +747,6 @@ def _prepare_join_sql(
     joins = f"""{joins}
     {order_by_sql}
     """
 
-    print(joins)
-
     return joins
@@ -776,8 +781,8 @@ def _join_source_chunk(
 
     import pathlib
 
-    import pyarrow.parquet as parquet
-
-    from cytotable.utils import _duckdb_reader, _write_parquet_table_with_metadata
+    from cytotable.constants import CYOTABLE_META_COLUMN_TYPES
+    from cytotable.utils import _duckdb_reader, _write_parquet_table_with_metadata
 
     # Attempt to read the data to parquet file
     # using duckdb for extraction and pyarrow for
     # writing data to a parquet file.
     # read data with chunk size + offset
     # and export to parquet
-    exclude_meta_cols = [f"c NOT LIKE '{col}%'" for col in list(CYOTABLE_META_COLUMN_TYPES.keys())]
+    exclude_meta_cols = [
+        f"c NOT LIKE '{col}%'" for col in list(CYOTABLE_META_COLUMN_TYPES.keys())
+    ]
     with _duckdb_reader() as ddb_reader:
         result = ddb_reader.execute(
             f"""
             WITH joined AS (
                 {joins}
                 LIMIT {chunk_size} OFFSET {offset}
             )
-            SELECT 
+            SELECT
                 /* exclude metadata columns from the results by using a CTE */
                 COLUMNS (c -> ({" AND ".join(exclude_meta_cols)}))
             FROM joined;
             """
         ).arrow()
diff --git a/cytotable/presets.py b/cytotable/presets.py
index d8d99849..4f4191a5 100644
--- a/cytotable/presets.py
+++ b/cytotable/presets.py
@@ -73,8 +73,8 @@
                 per_image.Image_Metadata_Well,
                 per_image.Image_Metadata_Plate,
                 per_cytoplasm.* EXCLUDE (Metadata_ImageNumber),
-                per_cells.* EXCLUDE (Metadata_ImageNumber, Metadata_ObjectNumber),
-                per_nuclei.* EXCLUDE (Metadata_ImageNumber, Metadata_ObjectNumber)
+                per_cells.* EXCLUDE (Metadata_ImageNumber),
+                per_nuclei.* EXCLUDE (Metadata_ImageNumber)
             FROM
                 read_parquet('per_cytoplasm.parquet') AS per_cytoplasm
             LEFT JOIN read_parquet('per_cells.parquet') AS per_cells USING (Metadata_ImageNumber)
diff --git a/cytotable/utils.py b/cytotable/utils.py
index c93c95d1..fc36f461 100644
--- a/cytotable/utils.py
+++ b/cytotable/utils.py
@@ -171,6 +171,7 @@ def _sqlite_mixed_type_query_to_parquet(
     table_name: str,
     chunk_size: int,
     offset: int,
+    add_cytotable_meta: bool = False,
 ) -> str:
     """
     Performs SQLite table data extraction where one or many
@@ -186,6 +187,8 @@ def _sqlite_mixed_type_query_to_parquet(
             Row count to use for chunked output.
         offset: int:
             The offset for chunking the data from source.
+        add_cytotable_meta: bool, default=False:
+            Whether to add CytoTable metadata fields or not
 
     Returns:
         pyarrow.Table:
@@ -195,7 +198,10 @@ def _sqlite_mixed_type_query_to_parquet(
 
     import pyarrow as pa
 
-    from cytotable.constants import SQLITE_AFFINITY_DATA_TYPE_SYNONYMS
+    from cytotable.constants import (
+        CYOTABLE_META_COLUMN_TYPES,
+        SQLITE_AFFINITY_DATA_TYPE_SYNONYMS,
+    )
     from cytotable.exceptions import DatatypeException
 
     # open sqlite3 connection
@@ -207,7 +213,7 @@ def _sqlite_mixed_type_query_to_parquet(
         # See the following for more information:
         # https://sqlite.org/pragma.html#pragma_table_info
         cursor.execute(
-            f"""
+            """
             SELECT :table_name as table_name,
                     name as column_name,
                     type as column_type
@@ -255,10 +261,30 @@ def _sqlite_mixed_type_query_to_parquet(
             for col in column_info
         ]
 
+        if add_cytotable_meta:
+            query_parts += [
+                (
+                    f"CAST( '{f'{source_path}_table_{table_name}'}' "
+                    f"AS {_sqlite_affinity_data_type_lookup(CYOTABLE_META_COLUMN_TYPES['cytotable_meta_source_path'].lower())}) "
+                    "AS cytotable_meta_source_path"
+                ),
+                (
+                    f"CAST( {offset} "
+                    f"AS {_sqlite_affinity_data_type_lookup(CYOTABLE_META_COLUMN_TYPES['cytotable_meta_offset'].lower())}) "
+                    "AS cytotable_meta_offset"
+                ),
+                (
+                    f"CAST( (ROW_NUMBER() OVER ()) AS "
+                    f"{_sqlite_affinity_data_type_lookup(CYOTABLE_META_COLUMN_TYPES['cytotable_meta_rownum'].lower())}) "
+                    "AS cytotable_meta_rownum"
+                ),
+            ]
+
        # perform the select using the cases built above and using chunksize + offset
        cursor.execute(
            f"""
-            SELECT {', '.join(query_parts)}
+            SELECT
+                {', '.join(query_parts)}
            FROM {table_name}
+            ORDER BY {', '.join([col['column_name'] for col in column_info])}
            LIMIT {chunk_size} OFFSET {offset};
            """
diff --git a/tests/conftest.py b/tests/conftest.py
index 3cfc73ae..8323af21 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -28,8 +28,8 @@
 from pycytominer.cyto_utils.cells import SingleCells
 from sqlalchemy.util import deprecations
 
-from cytotable.utils import _column_sort, _default_parsl_config, _parsl_loaded
 from cytotable.constants import CYOTABLE_META_COLUMN_TYPES
+from cytotable.utils import _column_sort, _default_parsl_config, _parsl_loaded
 
 # filters sqlalchemy 2.0 uber warning
 # referenced from: https://stackoverflow.com/a/76308286
@@ -322,7 +322,7 @@ def fixture_example_local_sources(
             table.select(
                 [
                     column
                     for column in table.column_names
-                    if column not in list(CYOTABLE_META_COLUMN_TYPES.keys())
+                    if column not in CYOTABLE_META_COLUMN_TYPES
                 ]
             ),
             f"{fx_tempdir}/example/{number}/{name}.csv",
@@ -495,22 +495,19 @@ def fixture_cellprofiler_merged_nf1data(
         .execute(
             """
             /* perform query on sqlite tables through duckdb */
-            WITH per_image AS (
-                SELECT
-                    ImageNumber,
-                    Image_Metadata_Well,
-                    Image_Metadata_Plate
-                FROM Per_Image
-            )
-            SELECT *
-            FROM per_image image
-            LEFT JOIN Per_Cytoplasm cytoplasm ON
-                image.ImageNumber = cytoplasm.ImageNumber
-            LEFT JOIN Per_Cells cells ON
-                cells.ImageNumber = cytoplasm.ImageNumber
-                AND cells.Cells_Number_Object_Number = cytoplasm.Cytoplasm_Parent_Cells
-            LEFT JOIN Per_Nuclei nuclei ON
-                nuclei.ImageNumber = cytoplasm.ImageNumber
+            SELECT
+                image.ImageNumber,
+                image.Image_Metadata_Well,
+                image.Image_Metadata_Plate,
+                cytoplasm.*,
+                cells.*,
+                nuclei.*
+            FROM Per_Cytoplasm cytoplasm
+            LEFT JOIN Per_Cells cells USING (ImageNumber)
+            LEFT JOIN Per_Nuclei nuclei USING (ImageNumber)
+            LEFT JOIN Per_Image image USING (ImageNumber)
+            WHERE
+                cells.Cells_Number_Object_Number = cytoplasm.Cytoplasm_Parent_Cells
                 AND nuclei.Nuclei_Number_Object_Number = cytoplasm.Cytoplasm_Parent_Nuclei
             """
         )
@@ -549,7 +546,7 @@ def fixture_cytominerdatabase_merged_cellhealth(
     """
 
     sql_stmt = """
-        WITH image AS (
+        WITH image_filtered AS (
             SELECT
                 TableNumber,
                 ImageNumber,
@@ -578,9 +575,9 @@ def fixture_cytominerdatabase_merged_cellhealth(
             FROM Nuclei
         )
         SELECT DISTINCT *
-        FROM image
+        FROM image_filtered
         LEFT JOIN Cytoplasm_renamed cytoplasm ON
-            image.ImageNumber = cytoplasm.ImageNumber
+            image_filtered.ImageNumber = cytoplasm.ImageNumber
         LEFT JOIN Cells_renamed cells ON
             cells.ImageNumber = cytoplasm.ImageNumber
             AND cells.Cells_Number_Object_Number = cytoplasm.Cytoplasm_Parent_Cells
diff --git a/tests/test_convert.py b/tests/test_convert.py
index 47a6b29f..66a0d289 100644
--- a/tests/test_convert.py
+++ b/tests/test_convert.py
@@ -20,6 +20,7 @@
 from pyarrow import csv, parquet
 from pycytominer.cyto_utils.cells import SingleCells
 
+from cytotable.constants import CYOTABLE_META_COLUMN_TYPES
 from cytotable.convert import (
     _concat_join_sources,
     _concat_source_group,
@@ -1011,6 +1012,12 @@ def test_sqlite_mixed_type_query_to_parquet(
         "Tbl_a_col_text": ["sample", "sample"],
         "Tbl_a_col_blob": [b"another_blob", b"sample_blob"],
         "Tbl_a_col_real": [None, 0.5],
+        "cytotable_meta_source_path": [
+            f"{pathlib.Path(fx_tempdir).resolve()}/example_mixed_types.sqlite_table_tbl_a",
+            f"{pathlib.Path(fx_tempdir).resolve()}/example_mixed_types.sqlite_table_tbl_a",
+        ],
+        "cytotable_meta_offset": [0, 0],
+        "cytotable_meta_rownum": [2, 1],
     }
@@ -1122,6 +1129,15 @@ def test_in_carta_to_parquet(
         ]["table"][0]
     )
 
+    # drop cytotable metadata columns for comparisons (example sources won't contain these)
+    cytotable_result_table = cytotable_result_table.select(
+        [
+            column
+            for column in cytotable_result_table.column_names
+            if column not in CYOTABLE_META_COLUMN_TYPES
+        ]
+    )
+
     # check the data against one another
     assert cytotable_result_table.schema.equals(ddb_result.schema)
     assert cytotable_result_table.shape == ddb_result.shape

From 4908f6e3b6b41da77b98aa584337d237436e30e0 Mon Sep 17 00:00:00 2001
From: d33bs
Date: Wed, 1 May 2024 10:21:41 -0600
Subject: [PATCH 09/14] simplify and further clarity in test

---
 cytotable/convert.py  | 16 ++++++++-------
 tests/test_convert.py | 46 ++++++++++++++++++++++++++-----------------
 2 files changed, 36 insertions(+), 26 deletions(-)

diff --git a/cytotable/convert.py b/cytotable/convert.py
index 87e76888..6580be70 100644
--- a/cytotable/convert.py
+++ b/cytotable/convert.py
@@ -499,7 +499,7 @@ def _prepend_column_name(
         if (
             column_name not in identifying_columns
             and not column_name.startswith(source_group_name_stem.capitalize())
-            and column_name not in list(CYOTABLE_META_COLUMN_TYPES.keys())
+            and column_name not in CYOTABLE_META_COLUMN_TYPES
         ):
             updated_column_names.append(f"{source_group_name_stem}_{column_name}")
         # if-condition for prepending 'Metadata_' to column name
@@ -731,24 +731,24 @@ def _prepare_join_sql(
     order_by_tables = []
     for key, val in sources.items():
         if pathlib.Path(key).stem.lower() in joins.lower():
+            table_name = str(pathlib.Path(key).stem.lower())
             joins = joins.replace(
-                f"'{str(pathlib.Path(key).stem.lower())}.parquet'",
+                f"'{table_name}.parquet'",
                 str([str(table) for table in val[0]["table"]]),
             )
-            order_by_tables.append(str(pathlib.Path(key).stem.lower()))
+            order_by_tables.append(table_name)
 
+    # create the order by statement from all tables using cytotable metadata
     order_by_sql = "ORDER BY " + ", ".join(
         [
             f"{table}.{meta_column}"
             for table in order_by_tables
-            for meta_column in list(CYOTABLE_META_COLUMN_TYPES.keys())
+            for meta_column in CYOTABLE_META_COLUMN_TYPES
         ]
    )
-    joins = f"""{joins}
-    {order_by_sql}
-    """
 
-    return joins
+    # add the order by statements to the join
+    return joins + order_by_sql
 
 
 @python_app
diff --git a/tests/test_convert.py b/tests/test_convert.py
index 14deae8c..d527543d 100644
--- a/tests/test_convert.py
+++ b/tests/test_convert.py
@@ -391,7 +391,10 @@ def test_prepare_join_sql(
                 # simplified join for example dataset
                 joins="""
                 SELECT
-                    *
+                    image.ImageNumber,
+                    cytoplasm.*,
+                    cells.*,
+                    nuclei.*
                 FROM
                     read_parquet('cytoplasm.parquet') AS cytoplasm
                 LEFT JOIN read_parquet('cells.parquet') AS cells USING (ImageNumber)
@@ -402,25 +405,32 @@ def test_prepare_join_sql(
                     AND nuclei.Nuclei_ObjectNumber = cytoplasm.Cytoplasm_Parent_Nuclei
                 """,
             ).result()
-        ).arrow()
+        ).df()
 
     # check that we received expected data back
-    assert result.shape == (4, 23)
-    assert result.slice(length=1).to_pydict() == {
-        "ImageNumber": ["1"],
-        "Cytoplasm_ObjectNumber": [1],
-        "Cytoplasm_Parent_Cells": [1],
-        "Cytoplasm_Parent_Nuclei": [1],
-        "Cytoplasm_Feature_X": [0.1],
-        "cytotable_meta_source_path": ["image.csv"],
-        "cytotable_meta_offset": [50],
-        "cytotable_meta_rownum": [1],
-        "Cells_ObjectNumber": [1],
-        "Cells_Feature_Y": [0.01],
-        "Nuclei_ObjectNumber": [1],
-        "Nuclei_Feature_Z": [0.001],
-        "Image_Metadata_Plate": ["001"],
-        "Image_Metadata_Well": ["A1"],
+    assert result.shape == (4, 21)
+    assert result.iloc[0].to_dict() == {
+        "ImageNumber": "1",
+        "ImageNumber_1": "1",
+        "Cytoplasm_ObjectNumber": 1,
+        "Cytoplasm_Parent_Cells": 1,
+        "Cytoplasm_Parent_Nuclei": 1,
+        "Cytoplasm_Feature_X": 0.1,
+        "cytotable_meta_source_path": "cytoplasm.csv",
+        "cytotable_meta_offset": 50,
+        "cytotable_meta_rownum": 1,
+        "ImageNumber_2": "1",
+        "Cells_ObjectNumber": 1,
+        "Cells_Feature_Y": 0.01,
+        "cytotable_meta_source_path_1": "cells.csv",
+        "cytotable_meta_offset_1": 50,
+        "cytotable_meta_rownum_1": 1,
+        "ImageNumber_3": "1",
+        "Nuclei_ObjectNumber": 1,
+        "Nuclei_Feature_Z": 0.001,
+        "cytotable_meta_source_path_2": "nuclei_1.csv",
+        "cytotable_meta_offset_2": 50,
+        "cytotable_meta_rownum_2": 1,
     }

From 49a9d141439291536962dae795c6901dac663f93 Mon Sep 17 00:00:00 2001
From: d33bs
Date: Wed, 1 May 2024 10:23:57 -0600
Subject: [PATCH 10/14] correcting comment

---
 cytotable/convert.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/cytotable/convert.py b/cytotable/convert.py
index 6580be70..c0612fe6 100644
--- a/cytotable/convert.py
+++ b/cytotable/convert.py
@@ -801,7 +801,8 @@ def _join_source_chunk(
                 LIMIT {chunk_size} OFFSET {offset}
             )
             SELECT
-                /* exclude metadata columns from the results by using a CTE */
+                /* exclude metadata columns from the results
+                by using a lambda on column names based on exclude_meta_cols. */
                 COLUMNS (c -> ({" AND ".join(exclude_meta_cols)}))
             FROM joined;
             """

From 88e7fe65ca4b69eca7cecf8ae3da845cd0541e88 Mon Sep 17 00:00:00 2001
From: d33bs
Date: Wed, 12 Jun 2024 10:40:16 -0600
Subject: [PATCH 11/14] make sorting optional

---
 cytotable/convert.py | 41 ++++++++++++++++++++++++++++++++---------
 1 file changed, 32 insertions(+), 9 deletions(-)
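Note: with this commit, callers can opt out of the ordering work entirely. A
minimal usage sketch (the file paths and preset here are hypothetical
placeholders, not part of this change):

    import cytotable

    cytotable.convert(
        source_path="./examples/data/all_cellprofiler.sqlite",
        dest_path="./examples/data/converted.parquet",
        dest_datatype="parquet",
        preset="cellprofiler_sqlite_pycytominer",
        # new in this commit: skip the metadata-based ORDER BY steps
        # when deterministic row order is not needed downstream
        sort_output=False,
    )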
diff --git a/cytotable/convert.py b/cytotable/convert.py
index c0612fe6..8021c589 100644
--- a/cytotable/convert.py
+++ b/cytotable/convert.py
@@ -277,6 +277,7 @@ def _source_chunk_to_parquet(
     chunk_size: int,
     offset: int,
     dest_path: str,
+    sort_output: bool
 ) -> str:
     """
     Export source data to chunked parquet file using chunk size and offsets.
@@ -293,6 +294,8 @@ def _source_chunk_to_parquet(
             The offset for chunking the data from source.
         dest_path: str
             Path to store the output data.
+        sort_output: bool
+            Specifies whether to sort cytotable output or not.
 
     Returns:
         str
@@ -325,9 +328,9 @@ def _source_chunk_to_parquet(
     # build the column selection block of query
-    select_columns = ",".join(
-        # add cytotable metadata columns
-        [
+
+    # add cytotable metadata columns
+    cytotable_metadata_cols = [
             (
                 f"CAST( '{source_path_str}' "
                 f"AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_source_path']})"
                 ' AS "cytotable_meta_source_path"'
             ),
             f"CAST( {offset} AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_offset']}) AS \"cytotable_meta_offset\"",
             (
                 f"CAST( (row_number() OVER ()) AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_rownum']})"
                 ' AS "cytotable_meta_rownum"'
             ),
         ]
-        # add source table columns
-        + [
+    # add source table columns
+    casted_source_cols = [
             # here we cast the column to the specified type ensure the colname remains the same
             f"CAST(\"{column['column_name']}\" AS {column['column_dtype']}) AS \"{column['column_name']}\""
             for column in source["columns"]
         ]
+
+    # create selection statement from lists above
+    select_columns = ",".join(
+        # if we should sort the output, add the metadata_cols
+        cytotable_metadata_cols + casted_source_cols if sort_output else casted_source_cols
     )
 
     # build output query and filepath base
@@ -395,6 +404,7 @@ def _source_chunk_to_parquet(
                     table_name=str(source["table_name"]),
                     chunk_size=chunk_size,
                     offset=offset,
-                    add_cytotable_meta=True,
+                    add_cytotable_meta=True if sort_output else False,
                 ),
                 where=result_filepath,
             )
@@ -707,6 +715,7 @@ def _concat_source_group(
 def _prepare_join_sql(
     sources: Dict[str, List[Dict[str, Any]]],
     joins: str,
+    sort_output: bool,
 ) -> str:
     """
     Prepare join SQL statement with actual locations of data based on the sources.
@@ -718,6 +727,8 @@ def _prepare_join_sql(
         joins: str:
             DuckDB-compatible SQL which will be used to perform the join
             operations using the join_group keys as a reference.
+        sort_output: bool
+            Specifies whether to sort cytotable output or not.
 
     Returns:
         str:
@@ -748,7 +759,7 @@ def _prepare_join_sql(
     )
 
     # add the order by statements to the join
-    return joins + order_by_sql
+    return joins + order_by_sql if sort_output else joins
 
 
 @python_app
@@ -1024,6 +1035,7 @@ def _to_parquet(  # pylint: disable=too-many-arguments, too-many-locals
     chunk_size: Optional[int],
     infer_common_schema: bool,
     drop_null: bool,
+    sort_output: bool,
     data_type_cast_map: Optional[Dict[str, str]] = None,
     **kwargs,
 ) -> Union[Dict[str, List[Dict[str, Any]]], str]:
@@ -1062,6 +1074,8 @@ def _to_parquet(  # pylint: disable=too-many-arguments, too-many-locals
             Whether to infer a common schema when concatenating sources.
         drop_null: bool:
             Whether to drop null results.
+        sort_output: bool
+            Specifies whether to sort cytotable output or not.
         data_type_cast_map: Dict[str, str]
             A dictionary mapping data type groups to specific types.
             Roughly includes Arrow data types language from:
@@ -1159,6 +1173,7 @@ def _to_parquet(  # pylint: disable=too-many-arguments, too-many-locals
                             chunk_size=chunk_size,
                             offset=offset,
                             dest_path=expanded_dest_path,
+                            sort_output=sort_output
                         ),
                         source_group_name=source_group_name,
                         identifying_columns=identifying_columns,
@@ -1219,7 +1234,7 @@ def _to_parquet(  # pylint: disable=too-many-arguments, too-many-locals
         evaluated_results = evaluate_futures(results)
 
         prepared_joins_sql = _prepare_join_sql(
-            sources=evaluated_results, joins=joins
+            sources=evaluated_results, joins=joins, sort_output=sort_output
         ).result()
 
         # map joined results based on the join groups gathered above
@@ -1272,6 +1287,7 @@ def convert(  # pylint: disable=too-many-arguments,too-many-locals
     infer_common_schema: bool = True,
     drop_null: bool = False,
     data_type_cast_map: Optional[Dict[str, str]] = None,
+    sort_output: bool = True,
     preset: Optional[str] = "cellprofiler_csv",
     parsl_config: Optional[parsl.Config] = None,
     **kwargs,
@@ -1313,8 +1329,14 @@ def convert(  # pylint: disable=too-many-arguments,too-many-locals
             DuckDB-compatible SQL which will be used to perform the join operations.
         chunk_size: Optional[int] (Default value = None)
             Size of join chunks which is used to limit data size during join ops
-        infer_common_schema: bool: (Default value = True)
+        infer_common_schema: bool (Default value = True)
             Whether to infer a common schema when concatenating sources.
+        data_type_cast_map: Dict[str, str], (Default value = None)
+            A dictionary mapping data type groups to specific types.
+            Roughly includes Arrow data types language from:
+            https://arrow.apache.org/docs/python/api/datatypes.html
+        sort_output: bool (Default value = True)
+            Specifies whether to sort cytotable output or not.
         drop_null: bool (Default value = False)
             Whether to drop nan/null values from results
         preset: str (Default value = "cellprofiler_csv")
@@ -1429,6 +1451,7 @@ def convert(  # pylint: disable=too-many-arguments,too-many-locals
         infer_common_schema=infer_common_schema,
         drop_null=drop_null,
         data_type_cast_map=data_type_cast_map,
+        sort_output=sort_output,
         **kwargs,
     )

From 5def23d4b599d07ee0146293ca559459e726e253 Mon Sep 17 00:00:00 2001
From: d33bs
Date: Wed, 12 Jun 2024 11:39:13 -0600
Subject: [PATCH 12/14] fix existing tests

---
 tests/test_convert.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/tests/test_convert.py b/tests/test_convert.py
index d527543d..896fb993 100644
--- a/tests/test_convert.py
+++ b/tests/test_convert.py
@@ -404,6 +404,7 @@ def test_prepare_join_sql(
                     cells.Cells_ObjectNumber = cytoplasm.Cytoplasm_Parent_Cells
                     AND nuclei.Nuclei_ObjectNumber = cytoplasm.Cytoplasm_Parent_Nuclei
                 """,
+                sort_output=True
             ).result()
         ).df()
@@ -638,6 +639,7 @@ def test_to_parquet(
             chunk_size=4,
             infer_common_schema=False,
             drop_null=True,
+            sort_output=True
         ),
     )

From 8c40263ed99228c20cca9da58acb09337b93ca51 Mon Sep 17 00:00:00 2001
From: d33bs
Date: Wed, 12 Jun 2024 12:15:35 -0600
Subject: [PATCH 13/14] further sorting options applied

---
 cytotable/convert.py  | 53 +++++++++++++++++++++++++++----------------
 cytotable/utils.py    | 15 ++++++++++++-
 tests/test_convert.py |  5 ++--
 3 files changed, 50 insertions(+), 23 deletions(-)

diff --git a/cytotable/convert.py b/cytotable/convert.py
index 8021c589..c0d782c5 100644
--- a/cytotable/convert.py
+++ b/cytotable/convert.py
@@ -25,7 +25,9 @@
 
 
 @python_app
-def _get_table_columns_and_types(source: Dict[str, Any]) -> List[Dict[str, str]]:
+def _get_table_columns_and_types(
+    source: Dict[str, Any], sort_output: bool
+) -> List[Dict[str, str]]:
     """
     Gather column data from table through duckdb.
 
@@ -33,6 +35,8 @@ def _get_table_columns_and_types(
         source: Dict[str, Any]
             Contains the source data to be chunked. Represents a single
             file or table of some kind.
+        sort_output:
+            Specifies whether to sort cytotable output or not.
 
     Returns:
         List[Dict[str, str]]
@@ -111,6 +115,7 @@ def _get_table_columns_and_types(
             # result from table
             offset=0,
             add_cytotable_meta=False,
+            sort_output=sort_output,
         )
         with _duckdb_reader() as ddb_reader:
             return (
@@ -277,7 +282,7 @@ def _source_chunk_to_parquet(
     chunk_size: int,
     offset: int,
     dest_path: str,
-    sort_output: bool
+    sort_output: bool,
 ) -> str:
     """
     Export source data to chunked parquet file using chunk size and offsets.
@@ -331,28 +336,30 @@ def _source_chunk_to_parquet(
 
     # add cytotable metadata columns
     cytotable_metadata_cols = [
-            (
-                f"CAST( '{source_path_str}' "
-                f"AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_source_path']})"
-                ' AS "cytotable_meta_source_path"'
-            ),
-            f"CAST( {offset} AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_offset']}) AS \"cytotable_meta_offset\"",
-            (
-                f"CAST( (row_number() OVER ()) AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_rownum']})"
-                ' AS "cytotable_meta_rownum"'
-            ),
-        ]
+        (
+            f"CAST( '{source_path_str}' "
+            f"AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_source_path']})"
+            ' AS "cytotable_meta_source_path"'
+        ),
+        f"CAST( {offset} AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_offset']}) AS \"cytotable_meta_offset\"",
+        (
+            f"CAST( (row_number() OVER ()) AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_rownum']})"
+            ' AS "cytotable_meta_rownum"'
+        ),
+    ]
     # add source table columns
     casted_source_cols = [
-            # here we cast the column to the specified type ensure the colname remains the same
-            f"CAST(\"{column['column_name']}\" AS {column['column_dtype']}) AS \"{column['column_name']}\""
-            for column in source["columns"]
-        ]
+        # here we cast the column to the specified type ensure the colname remains the same
+        f"CAST(\"{column['column_name']}\" AS {column['column_dtype']}) AS \"{column['column_name']}\""
+        for column in source["columns"]
+    ]
 
     # create selection statement from lists above
     select_columns = ",".join(
         # if we should sort the output, add the metadata_cols
-        cytotable_metadata_cols + casted_source_cols if sort_output else casted_source_cols
+        cytotable_metadata_cols + casted_source_cols
+        if sort_output
+        else casted_source_cols
     )
 
     # build output query and filepath base
@@ -389,6 +396,11 @@ def _source_chunk_to_parquet(
                     ORDER BY ALL
                     LIMIT {chunk_size} OFFSET {offset}
                     """
+                    if sort_output
+                    else f"""
+                    {base_query}
+                    LIMIT {chunk_size} OFFSET {offset}
+                    """
                 ).arrow(),
                 where=result_filepath,
             )
@@ -405,6 +417,7 @@ def _source_chunk_to_parquet(
                     chunk_size=chunk_size,
                     offset=offset,
                     add_cytotable_meta=True if sort_output else False,
+                    sort_output=sort_output,
                 ),
                 where=result_filepath,
             )
@@ -1147,7 +1160,7 @@ def _to_parquet(  # pylint: disable=too-many-arguments, too-many-locals
                 **{
                     "columns": _prep_cast_column_data_types(
                         columns=_get_table_columns_and_types(
-                            source=source,
+                            source=source, sort_output=sort_output
                         ),
                         data_type_cast_map=data_type_cast_map,
                     )
@@ -1173,7 +1186,7 @@ def _to_parquet(  # pylint: disable=too-many-arguments, too-many-locals
                             chunk_size=chunk_size,
                             offset=offset,
                             dest_path=expanded_dest_path,
-                            sort_output=sort_output
+                            sort_output=sort_output,
                         ),
                         source_group_name=source_group_name,
                         identifying_columns=identifying_columns,
diff --git a/cytotable/utils.py b/cytotable/utils.py
index 34ec7f29..62972b96 100644
--- a/cytotable/utils.py
+++ b/cytotable/utils.py
@@ -171,6 +171,7 @@ def _sqlite_mixed_type_query_to_parquet(
     table_name: str,
     chunk_size: int,
     offset: int,
+    sort_output: bool,
     add_cytotable_meta: bool = False,
 ) -> str:
     """
@@ -187,6 +188,8 @@ def _sqlite_mixed_type_query_to_parquet(
             Row count to use for chunked output.
         offset: int:
             The offset for chunking the data from source.
+        sort_output: bool
+            Specifies whether to sort cytotable output or not.
         add_cytotable_meta: bool, default=False:
             Whether to add CytoTable metadata fields or not
 
@@ -281,7 +284,7 @@ def _sqlite_mixed_type_query_to_parquet(
         ]
 
         # perform the select using the cases built above and using chunksize + offset
-        cursor.execute(
+        sql_stmt = (
             f"""
             SELECT
                 {', '.join(query_parts)}
             FROM {table_name}
             ORDER BY {', '.join([col['column_name'] for col in column_info])}
             LIMIT {chunk_size} OFFSET {offset};
             """
+            if sort_output
+            else f"""
+            SELECT
+                {', '.join(query_parts)}
+            FROM {table_name}
+            LIMIT {chunk_size} OFFSET {offset};
+            """
         )
+
+        # execute the sql stmt
+        cursor.execute(sql_stmt)
         # collect the results and include the column name with values
         results = [
             dict(zip([desc[0] for desc in cursor.description], row))
diff --git a/tests/test_convert.py b/tests/test_convert.py
index 896fb993..6f5eeac9 100644
--- a/tests/test_convert.py
+++ b/tests/test_convert.py
@@ -404,7 +404,7 @@ def test_prepare_join_sql(
                     cells.Cells_ObjectNumber = cytoplasm.Cytoplasm_Parent_Cells
                     AND nuclei.Nuclei_ObjectNumber = cytoplasm.Cytoplasm_Parent_Nuclei
                 """,
-                sort_output=True
+                sort_output=True,
             ).result()
         ).df()
@@ -639,7 +639,7 @@ def test_to_parquet(
             chunk_size=4,
             infer_common_schema=False,
             drop_null=True,
-            sort_output=True
+            sort_output=True,
         ),
     )
@@ -1027,6 +1027,7 @@ def test_sqlite_mixed_type_query_to_parquet(
                 table_name=table_name,
                 chunk_size=2,
                 offset=0,
+                sort_output=True,
             ),
             where=result_filepath,
         )

From 4ffe9c12ebc684bf4eb7a717a34750c3e49f57aa Mon Sep 17 00:00:00 2001
From: d33bs
Date: Wed, 12 Jun 2024 12:20:40 -0600
Subject: [PATCH 14/14] add a test for unsorted output

---
 tests/test_convert.py | 58 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 58 insertions(+)

diff --git a/tests/test_convert.py b/tests/test_convert.py
index 6f5eeac9..f221ff10 100644
--- a/tests/test_convert.py
+++ b/tests/test_convert.py
@@ -665,6 +665,64 @@ def test_to_parquet(
         assert parquet_result.shape == csv_source.shape
 
 
+def test_to_parquet_unsorted(
+    load_parsl_default: None,
+    fx_tempdir: str,
+    example_local_sources: Dict[str, List[Dict[str, Any]]],
+):
+    """
+    Tests _to_parquet with sort_output == False (unsorted)
+    """
+
+    flattened_example_sources = list(
+        itertools.chain(*list(example_local_sources.values()))
+    )
+
+    # note: we cast here for mypy linting (dict and str treatment differ)
+    result: Dict[str, List[Dict[str, Any]]] = cast(
+        dict,
+        _to_parquet(
+            source_path=str(
+                example_local_sources["image.csv"][0]["source_path"].parent
+            ),
+            dest_path=fx_tempdir,
+            source_datatype=None,
+            compartments=["cytoplasm", "cells", "nuclei"],
+            metadata=["image"],
+            identifying_columns=["imagenumber"],
+            concat=False,
+            join=False,
+            joins=None,
+            chunk_columns=None,
+            chunk_size=4,
+            infer_common_schema=False,
+            drop_null=True,
+            sort_output=False,
+        ),
+    )
+
+    flattened_results = list(itertools.chain(*list(result.values())))
+    for i, flattened_result in enumerate(flattened_results):
+        csv_source = (
+            _duckdb_reader()
+            .execute(
+                f"""
+                select * from
+                read_csv_auto('{str(flattened_example_sources[i]["source_path"])}',
+                ignore_errors=TRUE)
+                """
+            )
+            .arrow()
+        )
+        parquet_result = parquet.ParquetDataset(
+            path_or_paths=flattened_result["table"],
+            # set the order of the columns uniformly for schema comparison
+            schema=csv_source.schema,
+        ).read()
+        assert parquet_result.schema.equals(csv_source.schema)
+        assert parquet_result.shape == csv_source.shape
+
+
 def test_infer_source_group_common_schema(
     load_parsl_default: None,
     example_local_sources: Dict[str, List[Dict[str, Any]]],
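Note on the overall approach in this series: each extracted chunk carries a
source path, a chunk offset, and a per-chunk row number, and joins are then
ordered by those columns so that output row order is deterministic across
runs. A small standalone sketch of that scheme in DuckDB SQL (the column
names mirror CYOTABLE_META_COLUMN_TYPES, but the data is illustrative only):

    import duckdb

    sql = """
    SELECT
        CAST('cells.csv' AS VARCHAR) AS cytotable_meta_source_path,
        CAST(0 AS BIGINT) AS cytotable_meta_offset,
        CAST(row_number() OVER () AS BIGINT) AS cytotable_meta_rownum,
        t.*
    FROM (VALUES (2, 0.2), (1, 0.1)) AS t(ObjectNumber, Feature)
    ORDER BY
        cytotable_meta_source_path,
        cytotable_meta_offset,
        cytotable_meta_rownum
    """

    with duckdb.connect() as con:
        # rows come back in a stable order driven by the metadata columns,
        # regardless of how the source rows were scanned
        print(con.execute(sql).df())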