MongoDB: Filter server collection using MongoDB query expression
amotl committed Sep 11, 2024
1 parent e3810ad commit e8001c0
Showing 3 changed files with 29 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
@@ -5,6 +5,7 @@
 - MongoDB: Unlock processing multiple collections, either from server database,
   or from filesystem directory
 - MongoDB: Unlock processing JSON files from HTTP resource, using `https+bson://`
+- MongoDB: Optionally filter server collection using MongoDB query expression

 ## 2024/09/10 v0.0.22
 - MongoDB: Rename columns with leading underscores to use double leading underscores
18 changes: 16 additions & 2 deletions cratedb_toolkit/io/mongodb/adapter.py
@@ -1,5 +1,6 @@
 import glob
 import itertools
+import json
 import logging
 import typing as t
 from abc import abstractmethod
@@ -28,7 +29,7 @@ class MongoDBAdapterBase:
     database_name: str
     collection_name: str

-    _custom_query_parameters = ["batch-size", "limit", "offset"]
+    _custom_query_parameters = ["batch-size", "filter", "limit", "offset"]

     @classmethod
     def from_url(cls, url: t.Union[str, boltons.urlutils.URL, yarl.URL]):
@@ -54,6 +55,10 @@ def __attrs_post_init__(self):
     def batch_size(self) -> int:
         return int(self.address.uri.query_params.get("batch-size", 500))

+    @cached_property
+    def filter(self) -> t.Union[str, None]:
+        return json.loads(self.address.uri.query_params.get("filter", "null"))
+
     @cached_property
     def limit(self) -> int:
         return int(self.address.uri.query_params.get("limit", 0))
@@ -100,6 +105,8 @@ def record_count(self, filter_=None) -> int:
     def query(self):
         if not self._path.exists():
             raise FileNotFoundError(f"Resource not found: {self._path}")
+        if self.filter:
+            raise NotImplementedError("Using MongoDB filter expressions is not supported by Polars' NDJSON reader")
         if self.offset:
             raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader")
         if self._path.suffix in [".json", ".jsonl", ".ndjson"]:
@@ -129,6 +136,8 @@ def record_count(self, filter_=None) -> int:
         return -1

     def query(self):
+        if self.filter:
+            raise NotImplementedError("Using MongoDB filter expressions is not supported by Polars' NDJSON reader")
         if self.offset:
             raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader")
         if self._url.path.endswith(".json") or self._url.path.endswith(".jsonl") or self._url.path.endswith(".ndjson"):
@@ -165,7 +174,12 @@ def record_count(self, filter_=None) -> int:
         return self._mongodb_collection.count_documents(filter=filter_)

     def query(self):
-        data = self._mongodb_collection.find().batch_size(self.batch_size).skip(self.offset).limit(self.limit)
+        data = (
+            self._mongodb_collection.find(filter=self.filter)
+            .batch_size(self.batch_size)
+            .skip(self.offset)
+            .limit(self.limit)
+        )
         return batches(data, self.batch_size)

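To illustrate the parsing step behind the new `filter` property, here is a minimal standalone sketch using only the standard library. The helper name `parse_filter` and the `mongodb://localhost/...` URLs are hypothetical; the adapter itself reads the value from `self.address.uri.query_params`.

```python
import json
import typing as t
from urllib.parse import parse_qs, urlsplit


def parse_filter(url: str) -> t.Optional[t.Dict[str, t.Any]]:
    """Hypothetical helper: decode the `filter` query parameter as JSON."""
    # parse_qs also undoes the percent-encoding of the parameter value.
    params = parse_qs(urlsplit(url).query)
    # Fall back to the JSON literal "null", which decodes to None.
    raw = params.get("filter", ["null"])[0]
    return json.loads(raw)


# The filter value is percent-encoded JSON: {"exchange": "NASDAQ"}
with_filter = parse_filter(
    "mongodb://localhost/ticker/stocks?filter=%7B%22exchange%22%3A%20%22NASDAQ%22%7D"
)
# No `filter` parameter present, so the result is None.
without_filter = parse_filter("mongodb://localhost/ticker/stocks?batch-size=5000")
```

Because `json.loads` yields a `dict` (or `None`) here, the `t.Union[str, None]` annotation in the commit appears narrower than the value actually returned. Passing `None` as `filter` to PyMongo's `find()` matches all documents, so the unfiltered behavior is preserved.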
16 changes: 12 additions & 4 deletions doc/io/mongodb/loader.md
@@ -91,16 +91,24 @@ The default batch size is 500. You can adjust the value by appending the HTTP
 URL query parameter `batch-size` to the source URL, like
 `mongodb+srv://managed.mongodb.net/ticker/stocks?batch-size=5000`.

-### Offset
-Use the HTTP URL query parameter `offset` on the source URL, like
-`&offset=42`, in order to start processing at this record from the
-beginning.
+### Filter
+Use the HTTP URL query parameter `filter` on the source URL, like
+`&filter={"exchange": {"$eq": "NASDAQ"}}`, in order to provide a MongoDB
+query filter as a JSON string.
+It works the same way as `mongoexport`'s `--query` option.
+For more complex query expressions, make sure to properly encode the
+parameter value using URL/Percent Encoding.

 ### Limit
 Use the HTTP URL query parameter `limit` on the source URL, like
 `&limit=100`, in order to limit processing to a total number of
 records.

+### Offset
+Use the HTTP URL query parameter `offset` on the source URL, like
+`&offset=42`, in order to start processing at this record from the
+beginning.
+
 ## Zyp Transformations
 You can use [Zyp Transformations] to change the shape of the data while being
 transferred. In order to add it to the pipeline, use the `--transformation`

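The percent-encoding advice in the new "Filter" documentation can be sketched as follows. This is one possible way to build such a URL with the standard library, not something the commit prescribes; the variable names are illustrative, and the source URL is the example from the documentation.

```python
import json
from urllib.parse import parse_qs, quote, urlsplit

# MongoDB query expression, as used in the documentation example.
query_filter = {"exchange": {"$eq": "NASDAQ"}}

# Serialize to JSON, then percent-encode every reserved character.
encoded = quote(json.dumps(query_filter), safe="")

source_url = (
    "mongodb+srv://managed.mongodb.net/ticker/stocks"
    f"?batch-size=5000&filter={encoded}"
)

# Round trip: decoding the query parameter recovers the original expression.
decoded = json.loads(parse_qs(urlsplit(source_url).query)["filter"][0])
```

Using `safe=""` makes `quote` encode characters such as `{`, `"`, `:`, and `$`, which could otherwise be misread by shells or URL parsers.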