Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase sorting scalability via CytoTable metadata columns #204

Merged
merged 15 commits into from
Jun 12, 2024
7 changes: 7 additions & 0 deletions cytotable/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@
],
}

# metadata column names and types for internal use within CytoTable
CYOTABLE_META_COLUMN_TYPES = {
"cytotable_meta_source_path": "VARCHAR",
"cytotable_meta_offset": "BIGINT",
"cytotable_meta_rownum": "BIGINT",
}

CYTOTABLE_DEFAULT_PARQUET_METADATA = {
"data-producer": "https://github.com/cytomining/CytoTable",
"data-producer-version": str(_get_cytotable_version()),
Expand Down
66 changes: 58 additions & 8 deletions cytotable/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def _get_table_columns_and_types(source: Dict[str, Any]) -> List[Dict[str, str]]
# offset is set to 0 start at first row
# result from table
offset=0,
add_cytotable_meta=False,
)
with _duckdb_reader() as ddb_reader:
return (
Expand Down Expand Up @@ -304,6 +305,7 @@ def _source_chunk_to_parquet(
from cloudpathlib import AnyPath
from pyarrow import parquet

from cytotable.constants import CYOTABLE_META_COLUMN_TYPES
from cytotable.utils import (
_duckdb_reader,
_sqlite_mixed_type_query_to_parquet,
Expand All @@ -317,9 +319,28 @@ def _source_chunk_to_parquet(
)
pathlib.Path(source_dest_path).mkdir(parents=True, exist_ok=True)

source_path_str = (
source["source_path"]
if "table_name" not in source.keys()
else f"{source['source_path']}_table_{source['table_name']}"
)
# build the column selection block of query
select_columns = ",".join(
# add cytotable metadata columns
[
(
f"CAST( '{source_path_str}' "
f"AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_source_path']})"
' AS "cytotable_meta_source_path"'
),
f"CAST( {offset} AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_offset']}) AS \"cytotable_meta_offset\"",
(
f"CAST( (row_number() OVER ()) AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_rownum']})"
' AS "cytotable_meta_rownum"'
),
]
# add source table columns
+ [
d33bs marked this conversation as resolved.
Show resolved Hide resolved
# here we cast the column to the specified type ensure the colname remains the same
f"CAST(\"{column['column_name']}\" AS {column['column_dtype']}) AS \"{column['column_name']}\""
for column in source["columns"]
Expand Down Expand Up @@ -375,6 +396,7 @@ def _source_chunk_to_parquet(
table_name=str(source["table_name"]),
chunk_size=chunk_size,
offset=offset,
add_cytotable_meta=True,
d33bs marked this conversation as resolved.
Show resolved Hide resolved
),
where=result_filepath,
)
Expand Down Expand Up @@ -423,7 +445,10 @@ def _prepend_column_name(

import pyarrow.parquet as parquet

from cytotable.constants import CYTOTABLE_ARROW_USE_MEMORY_MAPPING
from cytotable.constants import (
CYOTABLE_META_COLUMN_TYPES,
CYTOTABLE_ARROW_USE_MEMORY_MAPPING,
)
from cytotable.utils import _write_parquet_table_with_metadata

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -471,8 +496,10 @@ def _prepend_column_name(
# source_group_name_stem: 'Cells'
# column_name: 'AreaShape_Area'
# updated_column_name: 'Cells_AreaShape_Area'
if column_name not in identifying_columns and not column_name.startswith(
source_group_name_stem.capitalize()
if (
column_name not in identifying_columns
and not column_name.startswith(source_group_name_stem.capitalize())
and column_name not in CYOTABLE_META_COLUMN_TYPES
):
updated_column_names.append(f"{source_group_name_stem}_{column_name}")
# if-condition for prepending 'Metadata_' to column name
Expand Down Expand Up @@ -698,15 +725,30 @@ def _prepare_join_sql(
"""
import pathlib

from cytotable.constants import CYOTABLE_META_COLUMN_TYPES

# replace with real location of sources for join sql
order_by_tables = []
for key, val in sources.items():
if pathlib.Path(key).stem.lower() in joins.lower():
table_name = str(pathlib.Path(key).stem.lower())
joins = joins.replace(
f"'{str(pathlib.Path(key).stem.lower())}.parquet'",
f"'{table_name}.parquet'",
str([str(table) for table in val[0]["table"]]),
)
order_by_tables.append(table_name)

# create order by statement with from all tables using cytotable metadata
order_by_sql = "ORDER BY " + ", ".join(
[
f"{table}.{meta_column}"
for table in order_by_tables
for meta_column in CYOTABLE_META_COLUMN_TYPES
]
)

return joins
# add the order by statements to the join
return joins + order_by_sql
d33bs marked this conversation as resolved.
Show resolved Hide resolved


@python_app
Expand Down Expand Up @@ -740,21 +782,29 @@ def _join_source_chunk(

import pathlib

import pyarrow.parquet as parquet

from cytotable.constants import CYOTABLE_META_COLUMN_TYPES
from cytotable.utils import _duckdb_reader, _write_parquet_table_with_metadata

# Attempt to read the data to parquet file
# using duckdb for extraction and pyarrow for
# writing data to a parquet file.
# read data with chunk size + offset
# and export to parquet
exclude_meta_cols = [
f"c NOT LIKE '{col}%'" for col in list(CYOTABLE_META_COLUMN_TYPES.keys())
]
with _duckdb_reader() as ddb_reader:
result = ddb_reader.execute(
f"""
WITH joined AS (
{joins}
{"ORDER BY ALL" if "ORDER BY" not in joins.upper() else ""}
LIMIT {chunk_size} OFFSET {offset}
)
SELECT
/* exclude metadata columns from the results
by using a lambda on column names based on exclude_meta_cols. */
COLUMNS (c -> ({" AND ".join(exclude_meta_cols)}))
FROM joined;
"""
).arrow()

Expand Down
110 changes: 43 additions & 67 deletions cytotable/presets.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,19 @@
# compartment and metadata joins performed using DuckDB SQL
# and modified at runtime as needed
"CONFIG_JOINS": """
WITH Image_Filtered AS (
SELECT
/* seeks columns by name, avoiding failure if some do not exist */
COLUMNS('^Metadata_ImageNumber$|^Image_Metadata_Well$|^Image_Metadata_Plate$')
FROM
read_parquet('image.parquet')
)
SELECT
*
image.Metadata_ImageNumber,
cytoplasm.* EXCLUDE (Metadata_ImageNumber),
cells.* EXCLUDE (Metadata_ImageNumber, Metadata_ObjectNumber),
nuclei.* EXCLUDE (Metadata_ImageNumber, Metadata_ObjectNumber)
FROM
read_parquet('cytoplasm.parquet') AS cytoplasm
LEFT JOIN read_parquet('cells.parquet') AS cells ON
cells.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
AND cells.Metadata_ObjectNumber = cytoplasm.Metadata_Cytoplasm_Parent_Cells
LEFT JOIN read_parquet('nuclei.parquet') AS nuclei ON
nuclei.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
LEFT JOIN read_parquet('cells.parquet') AS cells USING (Metadata_ImageNumber)
LEFT JOIN read_parquet('nuclei.parquet') AS nuclei USING (Metadata_ImageNumber)
LEFT JOIN read_parquet('image.parquet') AS image USING (Metadata_ImageNumber)
WHERE
cells.Metadata_ObjectNumber = cytoplasm.Metadata_Cytoplasm_Parent_Cells
AND nuclei.Metadata_ObjectNumber = cytoplasm.Metadata_Cytoplasm_Parent_Nuclei
LEFT JOIN Image_Filtered AS image ON
image.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
""",
},
"cellprofiler_sqlite": {
Expand All @@ -74,26 +68,21 @@
# compartment and metadata joins performed using DuckDB SQL
# and modified at runtime as needed
"CONFIG_JOINS": """
WITH Per_Image_Filtered AS (
SELECT
Metadata_ImageNumber,
Image_Metadata_Well,
Image_Metadata_Plate
FROM
read_parquet('per_image.parquet')
)
SELECT
*
per_image.Metadata_ImageNumber,
per_image.Image_Metadata_Well,
per_image.Image_Metadata_Plate,
per_cytoplasm.* EXCLUDE (Metadata_ImageNumber),
per_cells.* EXCLUDE (Metadata_ImageNumber),
per_nuclei.* EXCLUDE (Metadata_ImageNumber)
FROM
read_parquet('per_cytoplasm.parquet') AS per_cytoplasm
LEFT JOIN read_parquet('per_cells.parquet') AS per_cells ON
per_cells.Metadata_ImageNumber = per_cytoplasm.Metadata_ImageNumber
AND per_cells.Cells_Number_Object_Number = per_cytoplasm.Cytoplasm_Parent_Cells
LEFT JOIN read_parquet('per_nuclei.parquet') AS per_nuclei ON
per_nuclei.Metadata_ImageNumber = per_cytoplasm.Metadata_ImageNumber
LEFT JOIN read_parquet('per_cells.parquet') AS per_cells USING (Metadata_ImageNumber)
LEFT JOIN read_parquet('per_nuclei.parquet') AS per_nuclei USING (Metadata_ImageNumber)
LEFT JOIN read_parquet('per_image.parquet') AS per_image USING (Metadata_ImageNumber)
WHERE
per_cells.Cells_Number_Object_Number = per_cytoplasm.Cytoplasm_Parent_Cells
AND per_nuclei.Nuclei_Number_Object_Number = per_cytoplasm.Cytoplasm_Parent_Nuclei
LEFT JOIN Per_Image_Filtered AS per_image ON
per_image.Metadata_ImageNumber = per_cytoplasm.Metadata_ImageNumber
""",
},
"cellprofiler_sqlite_pycytominer": {
Expand Down Expand Up @@ -125,26 +114,21 @@
# compartment and metadata joins performed using DuckDB SQL
# and modified at runtime as needed
"CONFIG_JOINS": """
WITH Per_Image_Filtered AS (
SELECT
Metadata_ImageNumber,
Image_Metadata_Well,
Image_Metadata_Plate
FROM
read_parquet('per_image.parquet')
)
SELECT
*
per_image.Metadata_ImageNumber,
per_image.Image_Metadata_Well,
per_image.Image_Metadata_Plate,
per_cytoplasm.* EXCLUDE (Metadata_ImageNumber),
per_cells.* EXCLUDE (Metadata_ImageNumber),
per_nuclei.* EXCLUDE (Metadata_ImageNumber)
FROM
read_parquet('per_cytoplasm.parquet') AS per_cytoplasm
LEFT JOIN read_parquet('per_cells.parquet') AS per_cells ON
per_cells.Metadata_ImageNumber = per_cytoplasm.Metadata_ImageNumber
AND per_cells.Metadata_Cells_Number_Object_Number = per_cytoplasm.Metadata_Cytoplasm_Parent_Cells
LEFT JOIN read_parquet('per_nuclei.parquet') AS per_nuclei ON
per_nuclei.Metadata_ImageNumber = per_cytoplasm.Metadata_ImageNumber
LEFT JOIN read_parquet('per_cells.parquet') AS per_cells USING (Metadata_ImageNumber)
LEFT JOIN read_parquet('per_nuclei.parquet') AS per_nuclei USING (Metadata_ImageNumber)
LEFT JOIN read_parquet('per_image.parquet') AS per_image USING (Metadata_ImageNumber)
WHERE
per_cells.Metadata_Cells_Number_Object_Number = per_cytoplasm.Metadata_Cytoplasm_Parent_Cells
AND per_nuclei.Metadata_Nuclei_Number_Object_Number = per_cytoplasm.Metadata_Cytoplasm_Parent_Nuclei
LEFT JOIN Per_Image_Filtered AS per_image ON
per_image.Metadata_ImageNumber = per_cytoplasm.Metadata_ImageNumber
""",
},
"cell-health-cellprofiler-to-cytominer-database": {
Expand Down Expand Up @@ -178,30 +162,22 @@
# compartment and metadata joins performed using DuckDB SQL
# and modified at runtime as needed
"CONFIG_JOINS": """
WITH Image_Filtered AS (
SELECT
Metadata_TableNumber,
Metadata_ImageNumber,
Image_Metadata_Well,
Image_Metadata_Plate
FROM
read_parquet('image.parquet')
)
SELECT
*
image.Metadata_TableNumber,
image.Metadata_ImageNumber,
image.Image_Metadata_Well,
image.Image_Metadata_Plate,
cytoplasm.* EXCLUDE (Metadata_TableNumber, Metadata_ImageNumber),
cells.* EXCLUDE (Metadata_TableNumber, Metadata_ImageNumber),
nuclei.* EXCLUDE (Metadata_TableNumber, Metadata_ImageNumber)
FROM
read_parquet('cytoplasm.parquet') AS cytoplasm
LEFT JOIN read_parquet('cells.parquet') AS cells ON
cells.Metadata_TableNumber = cytoplasm.Metadata_TableNumber
AND cells.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
AND cells.Cells_ObjectNumber = cytoplasm.Metadata_Cytoplasm_Parent_Cells
LEFT JOIN read_parquet('nuclei.parquet') AS nuclei ON
nuclei.Metadata_TableNumber = cytoplasm.Metadata_TableNumber
AND nuclei.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
LEFT JOIN read_parquet('cells.parquet') AS cells USING (Metadata_TableNumber, Metadata_ImageNumber)
LEFT JOIN read_parquet('nuclei.parquet') AS nuclei USING (Metadata_TableNumber, Metadata_ImageNumber)
LEFT JOIN read_parquet('image.parquet') AS image USING (Metadata_TableNumber, Metadata_ImageNumber)
WHERE
cells.Cells_ObjectNumber = cytoplasm.Metadata_Cytoplasm_Parent_Cells
AND nuclei.Nuclei_ObjectNumber = cytoplasm.Metadata_Cytoplasm_Parent_Nuclei
LEFT JOIN Image_Filtered AS image ON
image.Metadata_TableNumber = cytoplasm.Metadata_TableNumber
AND image.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
""",
},
"in-carta": {
Expand Down
32 changes: 29 additions & 3 deletions cytotable/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ def _sqlite_mixed_type_query_to_parquet(
table_name: str,
chunk_size: int,
offset: int,
add_cytotable_meta: bool = False,
) -> str:
"""
Performs SQLite table data extraction where one or many
Expand All @@ -186,6 +187,8 @@ def _sqlite_mixed_type_query_to_parquet(
Row count to use for chunked output.
offset: int:
The offset for chunking the data from source.
add_cytotable_meta: bool, default=False:
Whether to add CytoTable metadata fields or not

Returns:
pyarrow.Table:
Expand All @@ -195,7 +198,10 @@ def _sqlite_mixed_type_query_to_parquet(

import pyarrow as pa

from cytotable.constants import SQLITE_AFFINITY_DATA_TYPE_SYNONYMS
from cytotable.constants import (
CYOTABLE_META_COLUMN_TYPES,
SQLITE_AFFINITY_DATA_TYPE_SYNONYMS,
)
from cytotable.exceptions import DatatypeException

# open sqlite3 connection
Expand All @@ -207,7 +213,7 @@ def _sqlite_mixed_type_query_to_parquet(
# See the following for more information:
# https://sqlite.org/pragma.html#pragma_table_info
cursor.execute(
f"""
"""
SELECT :table_name as table_name,
name as column_name,
type as column_type
Expand Down Expand Up @@ -255,10 +261,30 @@ def _sqlite_affinity_data_type_lookup(col_type: str) -> str:
for col in column_info
]

if add_cytotable_meta:
query_parts += [
(
f"CAST( '{f'{source_path}_table_{table_name}'}' "
f"AS {_sqlite_affinity_data_type_lookup(CYOTABLE_META_COLUMN_TYPES['cytotable_meta_source_path'].lower())}) "
"AS cytotable_meta_source_path"
),
(
f"CAST( {offset} "
f"AS {_sqlite_affinity_data_type_lookup(CYOTABLE_META_COLUMN_TYPES['cytotable_meta_offset'].lower())}) "
"AS cytotable_meta_offset"
),
(
f"CAST( (ROW_NUMBER() OVER ()) AS "
f"{_sqlite_affinity_data_type_lookup(CYOTABLE_META_COLUMN_TYPES['cytotable_meta_rownum'].lower())}) "
"AS cytotable_meta_rownum"
),
]

# perform the select using the cases built above and using chunksize + offset
cursor.execute(
f"""
SELECT {', '.join(query_parts)}
SELECT
{', '.join(query_parts)}
FROM {table_name}
ORDER BY {', '.join([col['column_name'] for col in column_info])}
LIMIT {chunk_size} OFFSET {offset};
Expand Down
Loading
Loading