Add order to limit and offset queries for deterministic results #182

Merged · 2 commits · Apr 18, 2024
3 changes: 3 additions & 0 deletions cytotable/convert.py
@@ -348,6 +348,8 @@ def _source_chunk_to_parquet(
            table=ddb_reader.execute(
                f"""
                {base_query}
+               /* order by all columns for deterministic output */
+               ORDER BY ALL
                LIMIT {chunk_size} OFFSET {offset}
                """
            ).arrow(),
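Without an explicit ORDER BY, SQL engines give no guarantee about row order, so the same LIMIT/OFFSET window can return different rows from run to run. A minimal standalone sketch of the fix above, assuming a DuckDB version that supports ORDER BY ALL (0.8+); the table and data are illustrative, not CytoTable's:

```python
import duckdb

ddb = duckdb.connect()
# toy table standing in for a CytoTable source chunk
ddb.execute(
    "CREATE TABLE example AS "
    "SELECT * FROM (VALUES (3, 'c'), (1, 'a'), (2, 'b')) t(id, val)"
)
# ORDER BY ALL is DuckDB shorthand for ordering on every column,
# pinning a total row order before the LIMIT/OFFSET window applies
chunk = ddb.execute(
    """
    SELECT * FROM example
    ORDER BY ALL
    LIMIT 2 OFFSET 0
    """
).arrow()
print(chunk)  # always the rows (1, 'a') and (2, 'b')
ddb.close()
```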
@@ -750,6 +752,7 @@ def _join_source_chunk(
        result = ddb_reader.execute(
            f"""
            {joins}
+           {"ORDER BY ALL" if "ORDER BY" not in joins.upper() else ""}
            LIMIT {chunk_size} OFFSET {offset}
            """
        ).arrow()
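Here the ordering is applied conditionally so that caller-supplied join SQL stays authoritative: ORDER BY ALL is appended only when the joins string contains no ORDER BY of its own. A small sketch of that guard in isolation (the helper name is hypothetical, not part of CytoTable):

```python
def build_chunk_query(joins: str, chunk_size: int, offset: int) -> str:
    """Hypothetical helper mirroring the conditional above."""
    # naive substring check: skip the default ordering whenever the
    # caller's SQL already orders its results
    order_clause = "ORDER BY ALL" if "ORDER BY" not in joins.upper() else ""
    return f"{joins}\n{order_clause}\nLIMIT {chunk_size} OFFSET {offset}"


# a join that already orders its output is passed through untouched
print(build_chunk_query("SELECT * FROM a JOIN b USING (id) ORDER BY id", 100, 0))
```

The substring check is deliberately simple; it would also match an "ORDER BY" inside a string literal or comment, which seems an acceptable tradeoff for query text the caller controls.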
7 changes: 6 additions & 1 deletion cytotable/utils.py
@@ -257,7 +257,12 @@ def _sqlite_affinity_data_type_lookup(col_type: str) -> str:

    # perform the select using the cases built above and using chunksize + offset
    cursor.execute(
-       f'SELECT {", ".join(query_parts)} FROM {table_name} LIMIT {chunk_size} OFFSET {offset};'
+       f"""
+       SELECT {', '.join(query_parts)}
+       FROM {table_name}
+       ORDER BY {', '.join([col['column_name'] for col in column_info])}
+       LIMIT {chunk_size} OFFSET {offset};
+       """
    )
    # collect the results and include the column name with values
    results = [
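SQLite has no ORDER BY ALL shorthand, so the query above spells out every column in the ORDER BY clause. A minimal standalone sketch of the same pattern using only the sqlite3 standard library (table and column names are illustrative; CytoTable builds its column list from `column_info`):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE example (id INTEGER, val TEXT)")
conn.executemany(
    "INSERT INTO example VALUES (?, ?)", [(3, "c"), (1, "a"), (2, "b")]
)

# PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk);
# index 1 is the column name
column_names = [row[1] for row in conn.execute("PRAGMA table_info(example);")]

# order on every column so each LIMIT/OFFSET chunk is reproducible
chunk = conn.execute(
    f"""
    SELECT {', '.join(column_names)}
    FROM example
    ORDER BY {', '.join(column_names)}
    LIMIT 2 OFFSET 0;
    """
).fetchall()
print(chunk)  # [(1, 'a'), (2, 'b')] regardless of insertion order
```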
20 changes: 10 additions & 10 deletions tests/test_convert.py
@@ -417,10 +417,10 @@ def test_join_source_chunk(load_parsl_default: None, fx_tempdir: str):
    assert result_table.equals(
        other=pa.Table.from_pydict(
            {
-               "id1": [1, 2],
-               "id2": ["a", "a"],
-               "field1": ["foo", "bar"],
-               "field2": [True, False],
+               "field1": ["foo", "foo"],
+               "field2": [True, True],
+               "id1": [1, 1],
+               "id2": ["a", "b"],
            },
            # use schema from result as a reference for col order
            schema=result_table.schema,
@@ -977,10 +977,10 @@ def test_sqlite_mixed_type_query_to_parquet(
    ]
    # check the values per column
    assert parquet.read_table(source=result_filepath).to_pydict() == {
-       "col_integer": [1, None],
+       "col_integer": [None, 1],
        "col_text": ["sample", "sample"],
-       "col_blob": [b"sample_blob", b"another_blob"],
-       "col_real": [0.5, None],
+       "col_blob": [b"another_blob", b"sample_blob"],
+       "col_real": [None, 0.5],
    }

    # run full convert on mixed type database
@@ -997,10 +997,10 @@
    assert parquet.read_table(
        source=result["Tbl_a.sqlite"][0]["table"][0]
    ).to_pydict() == {
-       "Tbl_a_col_integer": [1, None],
+       "Tbl_a_col_integer": [None, 1],
        "Tbl_a_col_text": ["sample", "sample"],
-       "Tbl_a_col_blob": [b"sample_blob", b"another_blob"],
-       "Tbl_a_col_real": [0.5, None],
+       "Tbl_a_col_blob": [b"another_blob", b"sample_blob"],
+       "Tbl_a_col_real": [None, 0.5],
    }
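The flipped expectations in these two asserts follow from the new ORDER BY over all columns in utils.py: SQLite sorts ascending with NULLs before non-NULL values, so the row holding the NULL col_integer now arrives first in each chunk.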


9 changes: 0 additions & 9 deletions tests/test_convert_threaded.py
@@ -16,7 +16,6 @@
from pyarrow import parquet

from cytotable.convert import convert
-from cytotable.presets import config
from cytotable.sources import _get_source_filepaths


@@ -125,11 +124,6 @@ def test_convert_s3_path_sqlite(
race conditions with nested pytest fixture post-yield deletions.
"""

-# create a modified join sql for deterministic comparisons
-modified_joins = (
-    str(config["cellprofiler_sqlite_pycytominer"]["CONFIG_JOINS"]) + " ORDER BY ALL"
-)
-
# local sqlite read
local_cytotable_table = parquet.read_table(
source=convert(
@@ -141,7 +135,6 @@
dest_datatype="parquet",
chunk_size=100,
preset="cellprofiler_sqlite_pycytominer",
-joins=modified_joins,
)
)

@@ -161,7 +154,6 @@
# sequential s3 SQLite files. See below for more information
# https://cloudpathlib.drivendata.org/stable/caching/#automatically
local_cache_dir=f"{fx_tempdir}/sqlite_s3_cache/1",
-joins=modified_joins,
)
)

@@ -181,7 +173,6 @@
# sequential s3 SQLite files. See below for more information
# https://cloudpathlib.drivendata.org/stable/caching/#automatically
local_cache_dir=f"{fx_tempdir}/sqlite_s3_cache/2",
-joins=modified_joins,
)
)
