census cell dup check #569

Merged · 10 commits · Jun 23, 2023 · Changes from 8 commits
10 changes: 10 additions & 0 deletions .pre-commit-config.yaml
@@ -57,8 +57,18 @@ repos:
          - typing_extensions
          - types-setuptools
          - types-PyYAML
      - id: mypy
        name: mypy-tools-not-builder
        files: ^tools/
        exclude: ^tools/cellxgene_census_builder
        args: ["--config", "./tools/pyproject.toml"]
        additional_dependencies:
          - numpy
          - pandas-stubs
          - typing_extensions

  - repo: https://github.com/nbQA-dev/nbQA
    rev: 1.7.0
    hooks:
      - id: nbqa-black
        files: ^api/python/notebooks
6 changes: 6 additions & 0 deletions tools/cell_dup_check/README.md
@@ -0,0 +1,6 @@

To install and run from source:

* clone the repo and/or create a copy of this directory
* create a Python venv and install the packages listed in `requirements.txt`, e.g., `pip install -r requirements.txt`
* open the notebook, set the config variables (see the sketch below), and run all cells
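For orientation only, the config cell will look something like the following sketch. The variable names here are hypothetical; use the names actually defined at the top of the notebook:

# Hypothetical config cell -- the real variable names are defined in the notebook itself
census_version = "latest"          # which Census build to check
experiment_name = "homo_sapiens"   # which organism experiment to scan for duplicates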

Empty file.
133 changes: 133 additions & 0 deletions tools/cell_dup_check/_csr_iter.py
@@ -0,0 +1,133 @@
from concurrent import futures
from typing import Generator, Iterator, Literal, Tuple, TypeVar

import numpy as np
import numpy.typing as npt
import pandas as pd
import scipy.sparse as sparse
import tiledbsoma as soma

"""
This might make a good addition to soma.ExperimentAxisQuery.

TODO: this implementation does not leverage the ExperimentAxisQuery internals
for fast CSR creation, and instead uses the (slower) scipy.sparse implementation.
If integrated, it should be migrated to the much faster implementation.
"""

_RT = Tuple[Tuple[npt.NDArray[np.int64], npt.NDArray[np.int64]], sparse.spmatrix]


def X_sparse_iter(
    query: soma.ExperimentAxisQuery,
    X_name: str = "raw",
    row_stride: int = 2**16,  # number of rows per iteration step
    fmt: Literal["csr", "csc"] = "csr",  # the resulting sparse format
    be_eager: bool = True,
) -> Iterator[_RT]:
"""
Return a row-wise iterator over the user-specified X SparseNdMatrix, returning for each
iteration step:
* obs coords (coordinates)
* var_coords (coordinates)
* X contents as a SciPy csr_matrix or csc_matrix
The coordinates and X matrix chunks are indexed positionally, i.e. for any
given value in the matrix, X[i, j], the original soma_joinid (aka soma_dim_0
and soma_dim_1) are present in obs_coords[i] and var_coords[j].

Args:
query:
A SOMA ExperimentAxisQuery defining the coordinates over which the iterator will
read.
X_name:
The name of the X layer.
row_stride:
The number of rows to return in each step.
fmt:
The SciPy sparse array layout. Supported: 'csc' and 'csr' (default).
be_eager:
If true, will use multiple threads to parallelize reading
and processing. This will improve speed, but at the cost
of some additional memory use.

Returns:
An iterator which iterates over a tuple of:
obs_coords
var_coords
SciPy sparse matrix

Lifecycle:
experimental
"""
    if fmt == "csr":
        fmt_ctor = sparse.csr_matrix
    elif fmt == "csc":
        fmt_ctor = sparse.csc_matrix
    else:
        raise ValueError("fmt must be 'csr' or 'csc'")

    # Lazily partition the obs axis into row_stride-sized chunks on the first dimension
    obs_coords = query.obs_joinids().to_numpy()
    obs_coord_chunker = (obs_coords[i : i + row_stride] for i in range(0, len(obs_coords), row_stride))

    # Lazily read each chunk into an Arrow Table. Yields (coords, Arrow Table)
    X = query._ms.X[X_name]
    var_coords = query.var_joinids().to_numpy()
    table_reader = (
        (
            (obs_coords_chunk, var_coords),
            X.read(coords=(obs_coords_chunk, var_coords)).tables().concat(),
        )
        for obs_coords_chunk in obs_coord_chunker
    )
    if be_eager:
        table_reader = (t for t in _EagerIterator(table_reader, query._threadpool))

    # Lazily reindex obs and var coordinates. Yields coords and (data, i, j) as numpy ndarrays
    coo_reindexer = (
        (
            (obs_coords_chunk, var_coords),
            (
                tbl["soma_data"].to_numpy(),
                pd.Index(obs_coords_chunk).get_indexer(tbl["soma_dim_0"].to_numpy()),  # type: ignore[no-untyped-call]
                query.indexer.by_var(tbl["soma_dim_1"].to_numpy()),
            ),
        )
        for (obs_coords_chunk, var_coords), tbl in table_reader
    )
    if be_eager:
        coo_reindexer = (t for t in _EagerIterator(coo_reindexer, query._threadpool))

    # Lazily convert each (data, i, j) triple to a SciPy sparse matrix in the requested format
    csr_reader: Generator[_RT, None, None] = (
Collaborator (review comment): the reader thanks you for the explicit type hint!
        (
            (obs_coords_chunk, var_coords),
            fmt_ctor(
                sparse.coo_matrix(
                    (data, (i, j)),
                    shape=(len(obs_coords_chunk), query.n_vars),
                )
            ),
        )
        for (obs_coords_chunk, var_coords), (data, i, j) in coo_reindexer
    )
    if be_eager:
        csr_reader = (t for t in _EagerIterator(csr_reader, query._threadpool))

    yield from csr_reader


_T = TypeVar("_T")


class _EagerIterator(Iterator[_T]):
    """Wrap an iterator so the next element is always being fetched ahead of
    time on the supplied thread pool (a simple one-step prefetch)."""

    def __init__(self, iterator: Iterator[_T], pool: futures.Executor):
        super().__init__()
        self.iterator = iterator
        self._pool = pool
        # Eagerly start fetching the first element.
        self._future = self._pool.submit(self.iterator.__next__)

    def __next__(self) -> _T:
        # Block for the in-flight element, then immediately schedule the next
        # fetch. A StopIteration raised by the underlying iterator propagates
        # out of Future.result(), terminating this iterator as well.
        res = self._future.result()
        self._future = self._pool.submit(self.iterator.__next__)
        return res
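For illustration, a minimal usage sketch of X_sparse_iter. It assumes the cellxgene_census package is installed; the organism and value_filter below are illustrative, not part of this PR:

import cellxgene_census
import tiledbsoma as soma

from _csr_iter import X_sparse_iter

with cellxgene_census.open_soma(census_version="latest") as census:
    experiment = census["census_data"]["homo_sapiens"]
    with experiment.axis_query(
        measurement_name="RNA",
        obs_query=soma.AxisQuery(value_filter="tissue_general == 'lung'"),
    ) as query:
        for (obs_coords, var_coords), X_chunk in X_sparse_iter(query, X_name="raw"):
            # X_chunk is a csr_matrix of shape (len(obs_coords), query.n_vars);
            # the value at X_chunk[i, j] belongs to obs soma_joinid obs_coords[i]
            # and var soma_joinid var_coords[j].
            ...

Because each chunk arrives with its original soma_joinids, per-cell results (e.g., row hashes for duplicate detection) can be keyed back to obs rows without materializing the full matrix.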