Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MongoDB: Improve support for reading JSON/BSON files #261

Merged
merged 10 commits into from
Sep 13, 2024
6 changes: 5 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@


## Unreleased
- MongoDB: Unlock processing multiple collections, either from a server database
or from a filesystem directory
- MongoDB: Unlock processing JSON files from an HTTP resource, using `https+bson://`
- MongoDB: Optionally filter a server collection using a MongoDB query expression

## 2024/09/10 v0.0.22
- MongoDB: Rename columns with leading underscores to use double leading underscores
Expand All @@ -14,7 +18,7 @@
This means relevant column definitions will not be included into the SQL DDL.
- MongoDB: Make `ctk load table` use the `data OBJECT(DYNAMIC)` mapping strategy.
- MongoDB: Sanitize lists of varying objects
- MongoDB: Add `--treatment` option for applying special treatments to certain items
- MongoDB: Add treatment option for applying special treatments to certain items
on real-world data
- MongoDB: Use pagination on source collection, for creating batches towards CrateDB
- MongoDB: Unlock importing MongoDB Extended JSON files using `file+bson://...`
Expand Down
60 changes: 22 additions & 38 deletions cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,11 @@
if source_url_obj.scheme.startswith("dynamodb"):
from cratedb_toolkit.io.dynamodb.api import dynamodb_copy

if not dynamodb_copy(str(source_url_obj), target_url, progress=True):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)

elif source_url_obj.scheme.startswith("file"):
if "+bson" in source_url_obj.scheme or "+mongodb" in source_url_obj.scheme:
mongodb_copy_generic(
str(source_url_obj),
target_url,
transformation=transformation,
progress=True,
)
if dynamodb_copy(str(source_url_obj), target_url, progress=True):
return True

Check warning on line 123 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L122-L123

Added lines #L122 - L123 were not covered by tests
else:
logger.error("Data loading failed or incomplete")
return False

Check warning on line 126 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L125-L126

Added lines #L125 - L126 were not covered by tests

elif source_url_obj.scheme.startswith("influxdb"):
from cratedb_toolkit.io.influxdb import influxdb_copy
Expand All @@ -140,39 +132,31 @@
if asbool(source_url_obj.query_params.get("ssl")):
http_scheme = "https"
source_url_obj.scheme = source_url_obj.scheme.replace("influxdb2", http_scheme)
if not influxdb_copy(str(source_url_obj), target_url, progress=True):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)
if influxdb_copy(str(source_url_obj), target_url, progress=True):
return True

Check warning on line 136 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L135-L136

Added lines #L135 - L136 were not covered by tests
else:
logger.error("Data loading failed or incomplete")
return False

Check warning on line 139 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L138-L139

Added lines #L138 - L139 were not covered by tests

elif source_url_obj.scheme.startswith("mongodb"):
elif source_url_obj.scheme in ["file+bson", "http+bson", "https+bson", "mongodb", "mongodb+srv"]:
if "+cdc" in source_url_obj.scheme:
source_url_obj.scheme = source_url_obj.scheme.replace("+cdc", "")
from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc

mongodb_relay_cdc(str(source_url_obj), target_url, progress=True)
return mongodb_relay_cdc(str(source_url_obj), target_url, progress=True)

Check warning on line 146 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L146

Added line #L146 was not covered by tests
else:
mongodb_copy_generic(
str(source_url_obj),
from cratedb_toolkit.io.mongodb.api import mongodb_copy

if mongodb_copy(
source_url_obj,
target_url,
transformation=transformation,
progress=True,
)
):
return True
else:
logger.error("Data loading failed or incomplete")
return False

Check warning on line 159 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L158-L159

Added lines #L158 - L159 were not covered by tests

else:
raise NotImplementedError("Importing resource not implemented yet")


def mongodb_copy_generic(
source_url: str, target_url: str, transformation: t.Union[Path, None] = None, progress: bool = False
):
from cratedb_toolkit.io.mongodb.api import mongodb_copy

if not mongodb_copy(
source_url,
target_url,
transformation=transformation,
progress=progress,
):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)
4 changes: 3 additions & 1 deletion cratedb_toolkit/io/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import sys
from pathlib import Path

import click
Expand Down Expand Up @@ -85,4 +86,5 @@
cluster = StandaloneCluster(address=address)
else:
raise NotImplementedError("Unable to select backend")
return cluster.load_table(resource=resource, target=target, transformation=transformation)
if not cluster.load_table(resource=resource, target=target, transformation=transformation):
sys.exit(2)

Check warning on line 90 in cratedb_toolkit/io/cli.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/cli.py#L90

Added line #L90 was not covered by tests
77 changes: 68 additions & 9 deletions cratedb_toolkit/io/mongodb/adapter.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import glob
import itertools
import json
import logging
import typing as t
from abc import abstractmethod
Expand Down Expand Up @@ -27,15 +29,14 @@
database_name: str
collection_name: str

_custom_query_parameters = ["batch-size", "limit", "offset"]
_custom_query_parameters = ["batch-size", "filter", "limit", "offset"]

@classmethod
def from_url(cls, url: t.Union[str, boltons.urlutils.URL, yarl.URL]):
if not isinstance(url, str):
url = str(url)
mongodb_address = DatabaseAddress.from_string(url)
mongodb_uri, mongodb_collection_address = mongodb_address.decode()
logger.info(f"Collection address: {mongodb_collection_address}")
mongodb_database = mongodb_collection_address.schema
mongodb_collection = mongodb_collection_address.table
for custom_query_parameter in cls._custom_query_parameters:
Expand All @@ -52,7 +53,11 @@

@cached_property
def batch_size(self) -> int:
return int(self.address.uri.query_params.get("batch-size", 500))
return int(self.address.uri.query_params.get("batch-size", 100))

@cached_property
def filter(self) -> t.Union[str, None]:
return json.loads(self.address.uri.query_params.get("filter", "null"))

@cached_property
def limit(self) -> int:
Expand All @@ -66,6 +71,10 @@
def setup(self):
raise NotImplementedError()

@abstractmethod
def get_collections(self) -> t.List[str]:
raise NotImplementedError()

Check warning on line 76 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L76

Added line #L76 was not covered by tests

@abstractmethod
def record_count(self, filter_=None) -> int:
raise NotImplementedError()
Expand All @@ -76,12 +85,15 @@


@define
class MongoDBFileAdapter(MongoDBAdapterBase):
class MongoDBFilesystemAdapter(MongoDBAdapterBase):
_path: Path = field(init=False)

def setup(self):
self._path = Path(self.address.uri.path)

def get_collections(self) -> t.List[str]:
return sorted(glob.glob(str(self._path)))

def record_count(self, filter_=None) -> int:
"""
https://stackoverflow.com/a/27517681
Expand All @@ -93,17 +105,52 @@
def query(self):
if not self._path.exists():
raise FileNotFoundError(f"Resource not found: {self._path}")
if self.filter:
raise NotImplementedError("Using MongoDB filter expressions is not supported by Polars' NDJSON reader")

Check warning on line 109 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L109

Added line #L109 was not covered by tests
if self.offset:
raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader")
if self._path.suffix in [".ndjson", ".jsonl"]:
data = pl.read_ndjson(self._path, batch_size=self.batch_size, n_rows=self.limit or None).to_dicts()
if self._path.suffix in [".json", ".jsonl", ".ndjson"]:
data = pl.read_ndjson(
self._path, batch_size=self.batch_size, n_rows=self.limit or None, ignore_errors=True
).to_dicts()
elif ".bson" in str(self._path):
data = IterableData(str(self._path), options={"format_in": "bson"}).iter()
else:
raise ValueError(f"Unsupported file type: {self._path.suffix}")
return batches(data, self.batch_size)


@define
class MongoDBResourceAdapter(MongoDBAdapterBase):
_url: URL = field(init=False)

def setup(self):
self._url = self.address.uri
if "+bson" in self._url.scheme:
self._url.scheme = self._url.scheme.replace("+bson", "")

def get_collections(self) -> t.List[str]:
raise NotImplementedError("HTTP+BSON loader does not support directory inquiry yet")

Check warning on line 133 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L133

Added line #L133 was not covered by tests

def record_count(self, filter_=None) -> int:
return -1

def query(self):
if self.filter:
raise NotImplementedError("Using MongoDB filter expressions is not supported by Polars' NDJSON reader")

Check warning on line 140 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L140

Added line #L140 was not covered by tests
if self.offset:
raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader")

Check warning on line 142 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L142

Added line #L142 was not covered by tests
if self._url.path.endswith(".json") or self._url.path.endswith(".jsonl") or self._url.path.endswith(".ndjson"):
data = pl.read_ndjson(
str(self._url), batch_size=self.batch_size, n_rows=self.limit or None, ignore_errors=True
).to_dicts()
elif self._url.path.endswith(".bson"):
raise NotImplementedError("HTTP+BSON loader does not support .bson files yet. SIC")

Check warning on line 148 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L147-L148

Added lines #L147 - L148 were not covered by tests
else:
raise ValueError(f"Unsupported file type: {self._url}")

Check warning on line 150 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L150

Added line #L150 was not covered by tests
return batches(data, self.batch_size)


@define
class MongoDBServerAdapter(MongoDBAdapterBase):
_mongodb_client: pymongo.MongoClient = field(init=False)
Expand All @@ -115,20 +162,32 @@
document_class=RawBSONDocument,
datetime_conversion="DATETIME_AUTO",
)
self._mongodb_collection = self._mongodb_client[self.database_name][self.collection_name]
if self.collection_name:
self._mongodb_collection = self._mongodb_client[self.database_name][self.collection_name]

def get_collections(self) -> t.List[str]:
database = self._mongodb_client.get_database(self.database_name)
return sorted(database.list_collection_names())

def record_count(self, filter_=None) -> int:
filter_ = filter_ or {}
return self._mongodb_collection.count_documents(filter=filter_)

def query(self):
data = self._mongodb_collection.find().batch_size(self.batch_size).skip(self.offset).limit(self.limit)
data = (
self._mongodb_collection.find(filter=self.filter)
.batch_size(self.batch_size)
.skip(self.offset)
.limit(self.limit)
)
return batches(data, self.batch_size)


def mongodb_adapter_factory(mongodb_uri: URL) -> MongoDBAdapterBase:
if mongodb_uri.scheme.startswith("file"):
return MongoDBFileAdapter.from_url(mongodb_uri)
return MongoDBFilesystemAdapter.from_url(mongodb_uri)
elif mongodb_uri.scheme.startswith("http"):
return MongoDBResourceAdapter.from_url(mongodb_uri)
elif mongodb_uri.scheme.startswith("mongodb"):
return MongoDBServerAdapter.from_url(mongodb_uri)
raise ValueError("Unable to create MongoDB adapter")
86 changes: 74 additions & 12 deletions cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
import typing as t
from pathlib import Path

from boltons.urlutils import URL
from polars.exceptions import PanicException

from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory
from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB
from cratedb_toolkit.io.mongodb.copy import MongoDBFullLoad
from cratedb_toolkit.io.mongodb.core import export, extract, translate
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.model import AddressPair, DatabaseAddress
from cratedb_toolkit.util.cr8 import cr8_insert_json
from cratedb_toolkit.util.database import DatabaseAdapter

Expand Down Expand Up @@ -86,7 +90,12 @@
return True


def mongodb_copy(source_url, target_url, transformation: t.Union[Path, None] = None, progress: bool = False):
def mongodb_copy(
source_url: t.Union[str, URL],
target_url: t.Union[str, URL],
transformation: t.Union[Path, None] = None,
progress: bool = False,
):
"""
Transfer MongoDB collection using translator component.

Expand All @@ -96,22 +105,75 @@
ctk load table mongodb://localhost:27017/testdrive/demo
"""

logger.info(f"Invoking MongoDBFullLoad. source_url={source_url}")
logger.info(f"mongodb_copy. source={source_url}, target={target_url}")

source_url = URL(source_url)
target_url = URL(target_url)

# Optionally configure transformations.
tm = None
if transformation:
tm = TransformationManager(path=transformation)

# Invoke `full-load` procedure.
mdb_full = MongoDBFullLoad(
mongodb_url=source_url,
cratedb_url=target_url,
tm=tm,
progress=progress,
)
mdb_full.start()
return True
# Check if source address URL includes a table name or not.
has_table = True
if "*" in source_url.path:
has_table = False
mongodb_address = DatabaseAddress(source_url)
mongodb_uri, mongodb_collection_address = mongodb_address.decode()
if mongodb_collection_address.table is None:
has_table = False

# Build list of tasks. Either a single one when transferring a single
# collection into a table, or multiple ones when transferring multiple
# collections.
tasks = []

# `full-load` procedure, single collection.
if has_table:
tasks.append(
MongoDBFullLoad(
mongodb_url=source_url,
cratedb_url=target_url,
tm=tm,
progress=progress,
)
)

# `full-load` procedure, multiple collections.
else:
logger.info(f"Inquiring collections at {source_url}")
address_pair_root = AddressPair(source_url=source_url, target_url=target_url)

mongodb_adapter = mongodb_adapter_factory(address_pair_root.source_url)
collections = mongodb_adapter.get_collections()
logger.info(f"Discovered collections: {len(collections)}")
logger.debug(f"Processing collections: {collections}")

for collection_path in collections:
address_pair = address_pair_root.navigate(
source_path=Path(collection_path).name,
target_path=Path(collection_path).stem,
)
tasks.append(
MongoDBFullLoad(
mongodb_url=address_pair.source_url,
cratedb_url=address_pair.target_url,
tm=tm,
progress=progress,
)
)

outcome = True
for task in tasks:
try:
outcome_task = task.start()
except (Exception, PanicException):
logger.exception("Task failed")
outcome_task = False

Check warning on line 173 in cratedb_toolkit/io/mongodb/api.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/api.py#L171-L173

Added lines #L171 - L173 were not covered by tests
outcome = outcome and outcome_task

return outcome


def mongodb_relay_cdc(source_url, target_url, progress: bool = False):
Expand Down
Loading