diff --git a/CHANGES.md b/CHANGES.md
index 8cf493e..e1b6121 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -2,6 +2,10 @@
 ## Unreleased
 
+- MongoDB: Unlock processing multiple collections, either from a server
+  database or from a filesystem directory
+- MongoDB: Unlock processing JSON files from HTTP resources, using `https+bson://`
+- MongoDB: Optionally filter server collections using a MongoDB query expression
 
 ## 2024/09/10 v0.0.22
 - MongoDB: Rename columns with leading underscores to use double leading underscores
@@ -14,7 +18,7 @@
   This means relevant column definitions will not be included into the SQL DDL.
 - MongoDB: Make `ctk load table` use the `data OBJECT(DYNAMIC)` mapping strategy.
 - MongoDB: Sanitize lists of varying objects
-- MongoDB: Add `--treatment` option for applying special treatments to certain items
+- MongoDB: Add treatment option for applying special treatments to certain items
   on real-world data
 - MongoDB: Use pagination on source collection, for creating batches towards CrateDB
 - MongoDB: Unlock importing MongoDB Extended JSON files using `file+bson://...`
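As a usage sketch for the features announced above — server addresses and schema names are illustrative, not taken from the changeset — transferring all collections of a server database boils down to omitting the collection name on the source URL:

```python
from cratedb_toolkit.io.mongodb.api import mongodb_copy

# Omitting the collection name on the source URL activates the new
# multi-collection code path, transferring each discovered collection
# into a correspondingly named table within the `testdrive` schema.
mongodb_copy(
    "mongodb://localhost:27017/testdrive",
    "crate://crate@localhost:4200/testdrive",
)
```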
diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py
index bb416cb..db51691 100644
--- a/cratedb_toolkit/api/main.py
+++ b/cratedb_toolkit/api/main.py
@@ -119,19 +119,11 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf
         if source_url_obj.scheme.startswith("dynamodb"):
             from cratedb_toolkit.io.dynamodb.api import dynamodb_copy
 
-            if not dynamodb_copy(str(source_url_obj), target_url, progress=True):
-                msg = "Data loading failed"
-                logger.error(msg)
-                raise OperationFailed(msg)
-
-        elif source_url_obj.scheme.startswith("file"):
-            if "+bson" in source_url_obj.scheme or "+mongodb" in source_url_obj.scheme:
-                mongodb_copy_generic(
-                    str(source_url_obj),
-                    target_url,
-                    transformation=transformation,
-                    progress=True,
-                )
+            if dynamodb_copy(str(source_url_obj), target_url, progress=True):
+                return True
+            else:
+                logger.error("Data loading failed or incomplete")
+                return False
 
         elif source_url_obj.scheme.startswith("influxdb"):
             from cratedb_toolkit.io.influxdb import influxdb_copy
 
@@ -140,39 +132,31 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf
             if asbool(source_url_obj.query_params.get("ssl")):
                 http_scheme = "https"
             source_url_obj.scheme = source_url_obj.scheme.replace("influxdb2", http_scheme)
-            if not influxdb_copy(str(source_url_obj), target_url, progress=True):
-                msg = "Data loading failed"
-                logger.error(msg)
-                raise OperationFailed(msg)
+            if influxdb_copy(str(source_url_obj), target_url, progress=True):
+                return True
+            else:
+                logger.error("Data loading failed or incomplete")
+                return False
 
-        elif source_url_obj.scheme.startswith("mongodb"):
+        elif source_url_obj.scheme in ["file+bson", "http+bson", "https+bson", "mongodb", "mongodb+srv"]:
             if "+cdc" in source_url_obj.scheme:
                 source_url_obj.scheme = source_url_obj.scheme.replace("+cdc", "")
                 from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc
 
-                mongodb_relay_cdc(str(source_url_obj), target_url, progress=True)
+                return mongodb_relay_cdc(str(source_url_obj), target_url, progress=True)
             else:
-                mongodb_copy_generic(
-                    str(source_url_obj),
+                from cratedb_toolkit.io.mongodb.api import mongodb_copy
+
+                if mongodb_copy(
+                    source_url_obj,
                     target_url,
                     transformation=transformation,
                     progress=True,
-                )
+                ):
+                    return True
+                else:
+                    logger.error("Data loading failed or incomplete")
+                    return False
+
         else:
             raise NotImplementedError("Importing resource not implemented yet")
-
-
-def mongodb_copy_generic(
-    source_url: str, target_url: str, transformation: t.Union[Path, None] = None, progress: bool = False
-):
-    from cratedb_toolkit.io.mongodb.api import mongodb_copy
-
-    if not mongodb_copy(
-        source_url,
-        target_url,
-        transformation=transformation,
-        progress=progress,
-    ):
-        msg = "Data loading failed"
-        logger.error(msg)
-        raise OperationFailed(msg)
diff --git a/cratedb_toolkit/io/cli.py b/cratedb_toolkit/io/cli.py
index 6d188be..cd03961 100644
--- a/cratedb_toolkit/io/cli.py
+++ b/cratedb_toolkit/io/cli.py
@@ -1,4 +1,5 @@
 import logging
+import sys
 from pathlib import Path
 
 import click
@@ -85,4 +86,5 @@ def load_table(
         cluster = StandaloneCluster(address=address)
     else:
         raise NotImplementedError("Unable to select backend")
-    return cluster.load_table(resource=resource, target=target, transformation=transformation)
+    if not cluster.load_table(resource=resource, target=target, transformation=transformation):
+        sys.exit(2)
diff --git a/cratedb_toolkit/io/mongodb/adapter.py b/cratedb_toolkit/io/mongodb/adapter.py
index e539e77..9163321 100644
--- a/cratedb_toolkit/io/mongodb/adapter.py
+++ b/cratedb_toolkit/io/mongodb/adapter.py
@@ -1,4 +1,6 @@
+import glob
 import itertools
+import json
 import logging
 import typing as t
 from abc import abstractmethod
@@ -27,7 +29,7 @@ class MongoDBAdapterBase:
     database_name: str
     collection_name: str
 
-    _custom_query_parameters = ["batch-size", "limit", "offset"]
+    _custom_query_parameters = ["batch-size", "filter", "limit", "offset"]
 
     @classmethod
     def from_url(cls, url: t.Union[str, boltons.urlutils.URL, yarl.URL]):
@@ -35,7 +37,6 @@ class MongoDBAdapterBase:
             url = str(url)
         mongodb_address = DatabaseAddress.from_string(url)
         mongodb_uri, mongodb_collection_address = mongodb_address.decode()
-        logger.info(f"Collection address: {mongodb_collection_address}")
         mongodb_database = mongodb_collection_address.schema
         mongodb_collection = mongodb_collection_address.table
         for custom_query_parameter in cls._custom_query_parameters:
@@ -52,7 +53,11 @@ def __attrs_post_init__(self):
 
     @cached_property
     def batch_size(self) -> int:
-        return int(self.address.uri.query_params.get("batch-size", 500))
+        return int(self.address.uri.query_params.get("batch-size", 100))
+
+    @cached_property
+    def filter(self) -> t.Union[t.Dict[str, t.Any], None]:
+        return json.loads(self.address.uri.query_params.get("filter", "null"))
 
     @cached_property
     def limit(self) -> int:
@@ -66,6 +71,10 @@ def offset(self) -> int:
     def setup(self):
         raise NotImplementedError()
 
+    @abstractmethod
+    def get_collections(self) -> t.List[str]:
+        raise NotImplementedError()
+
     @abstractmethod
     def record_count(self, filter_=None) -> int:
         raise NotImplementedError()
@@ -76,12 +85,15 @@
 
 @define
-class MongoDBFileAdapter(MongoDBAdapterBase):
+class MongoDBFilesystemAdapter(MongoDBAdapterBase):
     _path: Path = field(init=False)
 
     def setup(self):
         self._path = Path(self.address.uri.path)
 
+    def get_collections(self) -> t.List[str]:
+        return sorted(glob.glob(str(self._path)))
+
     def record_count(self, filter_=None) -> int:
         """
         https://stackoverflow.com/a/27517681
@@ -93,10 +105,14 @@ def record_count(self, filter_=None) -> int:
     def query(self):
         if not self._path.exists():
             raise FileNotFoundError(f"Resource not found: {self._path}")
+        if self.filter:
+            raise NotImplementedError("Using MongoDB filter expressions is not supported by Polars' NDJSON reader")
         if self.offset:
             raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader")
-        if self._path.suffix in [".ndjson", ".jsonl"]:
-            data = pl.read_ndjson(self._path, batch_size=self.batch_size, n_rows=self.limit or None).to_dicts()
+        if self._path.suffix in [".json", ".jsonl", ".ndjson"]:
+            data = pl.read_ndjson(
+                self._path, batch_size=self.batch_size, n_rows=self.limit or None, ignore_errors=True
+            ).to_dicts()
         elif ".bson" in str(self._path):
             data = IterableData(str(self._path), options={"format_in": "bson"}).iter()
         else:
             raise ValueError(f"Unsupported file type: {self._path}")
         return batches(data, self.batch_size)
 
@@ -104,6 +120,37 @@
 
+@define
+class MongoDBResourceAdapter(MongoDBAdapterBase):
+    _url: URL = field(init=False)
+
+    def setup(self):
+        self._url = self.address.uri
+        if "+bson" in self._url.scheme:
+            self._url.scheme = self._url.scheme.replace("+bson", "")
+
+    def get_collections(self) -> t.List[str]:
+        raise NotImplementedError("HTTP+BSON loader does not support directory inquiry yet")
+
+    def record_count(self, filter_=None) -> int:
+        return -1
+
+    def query(self):
+        if self.filter:
+            raise NotImplementedError("Using MongoDB filter expressions is not supported by Polars' NDJSON reader")
+        if self.offset:
+            raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader")
+        if self._url.path.endswith(".json") or self._url.path.endswith(".jsonl") or self._url.path.endswith(".ndjson"):
+            data = pl.read_ndjson(
+                str(self._url), batch_size=self.batch_size, n_rows=self.limit or None, ignore_errors=True
+            ).to_dicts()
+        elif self._url.path.endswith(".bson"):
+            raise NotImplementedError("HTTP+BSON loader does not support .bson files yet")
+        else:
+            raise ValueError(f"Unsupported file type: {self._url}")
+        return batches(data, self.batch_size)
+
+
 @define
 class MongoDBServerAdapter(MongoDBAdapterBase):
     _mongodb_client: pymongo.MongoClient = field(init=False)
@@ -115,20 +162,32 @@ def setup(self):
             document_class=RawBSONDocument,
             datetime_conversion="DATETIME_AUTO",
         )
-        self._mongodb_collection = self._mongodb_client[self.database_name][self.collection_name]
+        if self.collection_name:
+            self._mongodb_collection = self._mongodb_client[self.database_name][self.collection_name]
+
+    def get_collections(self) -> t.List[str]:
+        database = self._mongodb_client.get_database(self.database_name)
+        return sorted(database.list_collection_names())
 
     def record_count(self, filter_=None) -> int:
         filter_ = filter_ or {}
         return self._mongodb_collection.count_documents(filter=filter_)
 
     def query(self):
-        data = self._mongodb_collection.find().batch_size(self.batch_size).skip(self.offset).limit(self.limit)
+        data = (
+            self._mongodb_collection.find(filter=self.filter)
+            .batch_size(self.batch_size)
+            .skip(self.offset)
+            .limit(self.limit)
+        )
         return batches(data, self.batch_size)
 
 
 def mongodb_adapter_factory(mongodb_uri: URL) -> MongoDBAdapterBase:
     if mongodb_uri.scheme.startswith("file"):
-        return MongoDBFileAdapter.from_url(mongodb_uri)
+        return MongoDBFilesystemAdapter.from_url(mongodb_uri)
+    elif mongodb_uri.scheme.startswith("http"):
+        return MongoDBResourceAdapter.from_url(mongodb_uri)
    elif mongodb_uri.scheme.startswith("mongodb"):
        return MongoDBServerAdapter.from_url(mongodb_uri)
    raise ValueError("Unable to create MongoDB adapter")
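Because the new `filter` query parameter is decoded with `json.loads`, its value must be valid JSON, percent-encoded whenever it contains characters that are unsafe inside a URL query string. A small sketch for composing such a source URL with the standard library only (the server address is illustrative):

```python
import json
from urllib.parse import quote

# Serialize a MongoDB query expression to JSON and percent-encode it,
# so it survives being embedded into the URL query string.
filter_expression = quote(json.dumps({"exchange": {"$eq": "NASDAQ"}}))
source_url = f"mongodb://localhost:27017/ticker/stocks?filter={filter_expression}"
print(source_url)
```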
diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py
index 35acb47..83a9f30 100644
--- a/cratedb_toolkit/io/mongodb/api.py
+++ b/cratedb_toolkit/io/mongodb/api.py
@@ -3,11 +3,15 @@
 import typing as t
 from pathlib import Path
 
+from boltons.urlutils import URL
+from polars.exceptions import PanicException
+
+from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory
 from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB
 from cratedb_toolkit.io.mongodb.copy import MongoDBFullLoad
 from cratedb_toolkit.io.mongodb.core import export, extract, translate
 from cratedb_toolkit.io.mongodb.transform import TransformationManager
-from cratedb_toolkit.model import DatabaseAddress
+from cratedb_toolkit.model import AddressPair, DatabaseAddress
 from cratedb_toolkit.util.cr8 import cr8_insert_json
 from cratedb_toolkit.util.database import DatabaseAdapter
 
@@ -86,7 +90,12 @@ def mongodb_copy_migr8(source_url, target_url, transformation: Path = None, limi
     return True
 
 
-def mongodb_copy(source_url, target_url, transformation: t.Union[Path, None] = None, progress: bool = False):
+def mongodb_copy(
+    source_url: t.Union[str, URL],
+    target_url: t.Union[str, URL],
+    transformation: t.Union[Path, None] = None,
+    progress: bool = False,
+):
     """
     Transfer MongoDB collection using translator component.
 
@@ -96,22 +105,75 @@ def mongodb_copy(source_url, target_url, transformation: t.Union[Path, None] = N
     ctk load table mongodb://localhost:27017/testdrive/demo
     """
-    logger.info(f"Invoking MongoDBFullLoad. source_url={source_url}")
+    logger.info(f"mongodb_copy. source={source_url}, target={target_url}")
+
+    source_url = URL(source_url)
+    target_url = URL(target_url)
 
     # Optionally configure transformations.
     tm = None
     if transformation:
         tm = TransformationManager(path=transformation)
 
-    # Invoke `full-load` procedure.
-    mdb_full = MongoDBFullLoad(
-        mongodb_url=source_url,
-        cratedb_url=target_url,
-        tm=tm,
-        progress=progress,
-    )
-    mdb_full.start()
-    return True
+    # Check if source address URL includes a table name or not.
+    has_table = True
+    if "*" in source_url.path:
+        has_table = False
+    mongodb_address = DatabaseAddress(source_url)
+    mongodb_uri, mongodb_collection_address = mongodb_address.decode()
+    if mongodb_collection_address.table is None:
+        has_table = False
+
+    # Build list of tasks. Either a single one when transferring a single
+    # collection into a table, or multiple ones when transferring multiple
+    # collections.
+    tasks = []
+
+    # `full-load` procedure, single collection.
+    if has_table:
+        tasks.append(
+            MongoDBFullLoad(
+                mongodb_url=source_url,
+                cratedb_url=target_url,
+                tm=tm,
+                progress=progress,
+            )
+        )
+
+    # `full-load` procedure, multiple collections.
+    else:
+        logger.info(f"Inquiring collections at {source_url}")
+        address_pair_root = AddressPair(source_url=source_url, target_url=target_url)
+
+        mongodb_adapter = mongodb_adapter_factory(address_pair_root.source_url)
+        collections = mongodb_adapter.get_collections()
+        logger.info(f"Discovered collections: {len(collections)}")
+        logger.debug(f"Processing collections: {collections}")
+
+        for collection_path in collections:
+            address_pair = address_pair_root.navigate(
+                source_path=Path(collection_path).name,
+                target_path=Path(collection_path).stem,
+            )
+            tasks.append(
+                MongoDBFullLoad(
+                    mongodb_url=address_pair.source_url,
+                    cratedb_url=address_pair.target_url,
+                    tm=tm,
+                    progress=progress,
+                )
+            )
+
+    outcome = True
+    for task in tasks:
+        try:
+            outcome_task = task.start()
+        except (Exception, PanicException):
+            logger.exception("Task failed")
+            outcome_task = False
+        outcome = outcome and outcome_task
+
+    return outcome
 
 
 def mongodb_relay_cdc(source_url, target_url, progress: bool = False):
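With this change, `mongodb_copy` reports a boolean overall outcome — the logical AND of all task outcomes — instead of raising `OperationFailed`. A minimal sketch of the resulting calling convention, mirroring what `ctk load table` does in `cli.py` (URLs are illustrative):

```python
import sys

from cratedb_toolkit.io.mongodb.api import mongodb_copy

# Each collection transfer is an individual task; a failing task is logged
# and folded into the overall outcome instead of aborting the whole run.
success = mongodb_copy(
    "mongodb://localhost:27017/testdrive",
    "crate://crate@localhost:4200/testdrive",
)
if not success:
    sys.exit(2)  # Same exit code the CLI uses for failed or incomplete loads.
```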
diff --git a/cratedb_toolkit/io/mongodb/copy.py b/cratedb_toolkit/io/mongodb/copy.py
index f0980af..893a428 100644
--- a/cratedb_toolkit/io/mongodb/copy.py
+++ b/cratedb_toolkit/io/mongodb/copy.py
@@ -69,23 +69,25 @@ class MongoDBFullLoad:
 
     def __init__(
         self,
-        mongodb_url: str,
-        cratedb_url: str,
+        mongodb_url: t.Union[str, URL],
+        cratedb_url: t.Union[str, URL],
         tm: t.Union[TransformationManager, None],
         on_error: t.Literal["ignore", "raise"] = "raise",
         progress: bool = False,
         debug: bool = True,
     ):
-        # Decode database URL: MongoDB.
         self.mongodb_uri = URL(mongodb_url)
+        self.cratedb_uri = URL(cratedb_url)
+
+        # Decode database URL: MongoDB.
         self.mongodb_adapter = mongodb_adapter_factory(self.mongodb_uri)
 
         # Decode database URL: CrateDB.
-        cratedb_address = DatabaseAddress.from_string(cratedb_url)
-        cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode()
-        cratedb_table = cratedb_table_address.fullname
+        self.cratedb_address = DatabaseAddress(self.cratedb_uri)
+        self.cratedb_sqlalchemy_url, self.cratedb_table_address = self.cratedb_address.decode()
+        cratedb_table = self.cratedb_table_address.fullname
 
-        self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False)
+        self.cratedb_adapter = DatabaseAdapter(str(self.cratedb_sqlalchemy_url), echo=False)
         self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table)
 
         # Transformation machinery.
@@ -107,10 +109,11 @@ def __init__(
 
     def start(self):
         """
-        Read items from DynamoDB table, convert to SQL INSERT statements, and submit to CrateDB.
+        Read items from MongoDB collection, convert to SQL INSERT statements, and submit to CrateDB.
         """
+        logger.info(f"Starting MongoDBFullLoad. source={self.mongodb_uri}, target={self.cratedb_uri}")
         records_in = self.mongodb_adapter.record_count()
-        logger.info(f"Source: MongoDB collection={self.mongodb_adapter.collection_name} count={records_in}")
+        logger.info(f"Source: MongoDB {self.mongodb_adapter.address} count={records_in}")
         logger_on_error = logger.warning
         if self.debug:
             logger_on_error = logger.exception
@@ -141,11 +144,14 @@ def start(self):
                     result = connection.execute(sa.text(operation.statement), operation.parameters)
                     result_size = result.rowcount
                     if result_size < 0:
-                        raise ValueError("Unable to insert one or more records")
+                        raise IOError("Unable to insert one or more records")
                     records_out += result_size
                     progress_bar.update(n=result_size)
                 except Exception as ex:
-                    logger_on_error(f"Executing operation failed: {ex}\nOperation:\n{operation}")
+                    logger_on_error(
+                        f"Executing operation failed: {ex}\n"
+                        f"Statement: {operation.statement}\nParameters: {str(operation.parameters)[:500]} [...]"
+                    )
                     if self.on_error == "raise":
                         raise
                     continue
@@ -155,3 +161,4 @@ def start(self):
         logger.info(f"Number of records written: {records_out}")
         if records_out == 0:
             logger.warning("No data has been copied")
+        return True
diff --git a/cratedb_toolkit/model.py b/cratedb_toolkit/model.py
index 66f443d..1fe3a91 100644
--- a/cratedb_toolkit/model.py
+++ b/cratedb_toolkit/model.py
@@ -1,7 +1,10 @@
 import dataclasses
 import typing as t
 from copy import deepcopy
+from urllib.parse import urljoin
 
+from attr import Factory
+from attrs import define
 from boltons.urlutils import URL
 
 
@@ -120,3 +123,39 @@ class InputOutputResource:
     url: str
     format: t.Optional[str] = None  # noqa: A003
     compression: t.Optional[str] = None
+
+
+@define
+class AddressPair:
+    """
+    Manage two URL instances, specifically a pair of source/target URLs,
+    where the target is typically a CrateDB server, while the source can be anything.
+    """
+
+    source_url: URL
+    target_url: URL
+
+    _source_url_query_parameters: t.Dict[str, t.Any] = Factory(dict)
+    _target_url_query_parameters: t.Dict[str, t.Any] = Factory(dict)
+
+    __SERVER_SCHEMES__ = ["http", "https", "mongodb", "mongodb+srv"]
+
+    def navigate(self, source_path: str, target_path: str) -> "AddressPair":
+        source_url = deepcopy(self.source_url)
+        target_url = deepcopy(self.target_url)
+
+        # Q: What is this hack?
+        # A: Adjustments for missing trailing slashes, business as usual.
+        #    They make subsequent `.navigate()` operations work.
+        #    Remark: This is not applicable to filesystem paths that include
+        #    wildcards, like `./datasets/*.ndjson`. In this case, `.navigate()`
+        #    should strip the `*.ndjson` part, and replace it with the
+        #    designated label.
+        if source_url.scheme in self.__SERVER_SCHEMES__ and source_url.path[-1] != "/":
+            source_url.path += "/"
+        if target_url.path[-1] != "/":
+            target_url.path += "/"
+
+        source_url.path = urljoin(source_url.path, source_path)
+        target_url.path = urljoin(target_url.path, target_path)
+
+        return AddressPair(source_url, target_url)
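A quick sketch of how `AddressPair.navigate()` derives per-collection address pairs from a root pair; the URLs are illustrative. Server URLs get a trailing slash appended, so `urljoin` appends the new segment, while a filesystem wildcard keeps no trailing slash and has its last segment replaced:

```python
from boltons.urlutils import URL

from cratedb_toolkit.model import AddressPair

# Server-style root pair: `urljoin` appends the collection/table name.
root = AddressPair(
    source_url=URL("mongodb://localhost:27017/testdrive"),
    target_url=URL("crate://crate@localhost:4200/testdrive"),
)
pair = root.navigate(source_path="demo1", target_path="demo1")
print(pair.source_url)  # mongodb://localhost:27017/testdrive/demo1
print(pair.target_url)  # crate://crate@localhost:4200/testdrive/demo1

# Filesystem-style root pair: `urljoin` replaces the `*.ndjson` wildcard.
files = AddressPair(
    source_url=URL("file+bson:///data/datasets/*.ndjson"),
    target_url=URL("crate://crate@localhost:4200/testdrive"),
)
pair = files.navigate(source_path="books.ndjson", target_path="books")
print(pair.source_url)  # file+bson:///data/datasets/books.ndjson
print(pair.target_url)  # crate://crate@localhost:4200/testdrive/books
```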
diff --git a/cratedb_toolkit/util/database.py b/cratedb_toolkit/util/database.py
index ef6af8e..0d4191d 100644
--- a/cratedb_toolkit/util/database.py
+++ b/cratedb_toolkit/util/database.py
@@ -1,6 +1,7 @@
 # Copyright (c) 2023-2024, Crate.io Inc.
 # Distributed under the terms of the AGPLv3 license, see LICENSE.
 import io
+import logging
 import os
 import typing as t
 from pathlib import Path
@@ -21,6 +22,8 @@
 except ImportError:
     from typing_extensions import Literal  # type: ignore[assignment]
 
+logger = logging.getLogger(__name__)
+
 
 def run_sql(dburi: str, sql: str, records: bool = False):
     return DatabaseAdapter(dburi=dburi).run_sql(sql=sql, records=records)
@@ -39,6 +42,7 @@ def __init__(self, dburi: str, echo: bool = False):
         self.dburi = dburi
         self.engine = sa.create_engine(self.dburi, echo=echo)
         # TODO: Make that go away.
+        logger.debug(f"Connecting to CrateDB: {dburi}")
         self.connection = self.engine.connect()
 
     @staticmethod
@@ -369,7 +373,7 @@ def sa_is_empty(thing):
     return isinstance(thing, AsBoolean)
 
 
-def decode_database_table(url: str) -> t.Tuple[str, str]:
+def decode_database_table(url: str) -> t.Tuple[str, t.Union[str, None]]:
     """
     Decode database and table names from database URI path and/or query string.
 
@@ -382,21 +386,33 @@ def decode_database_table(url: str) -> t.Tuple[str, str]:
     This one uses `boltons`, the other one uses `yarl`.
     """
     url_ = URL(url)
+    database, table = None, None
     try:
         database, table = url_.path.strip("/").split("/")
     except ValueError as ex:
         if "too many values to unpack" not in str(ex) and "not enough values to unpack" not in str(ex):
             raise
-        database = url_.query_params.get("database")
-        table = url_.query_params.get("table")
-        if url_.scheme == "crate" and not database:
-            database = url_.query_params.get("schema")
-        if database is None and table is None:
-            if url_.scheme.startswith("file"):
-                _, database, table = url_.path.rsplit("/", 2)
-                table, _ = table.split(".", 1)
-        if database is None and table is None:
-            raise ValueError("Database and table must be specified") from ex
+        try:
+            (database,) = url_.path.strip("/").split("/")
+        except ValueError as ex:
+            if "too many values to unpack" not in str(ex) and "not enough values to unpack" not in str(ex):
+                raise
+
+            database = url_.query_params.get("database")
+            table = url_.query_params.get("table")
+            if url_.scheme == "crate" and not database:
+                database = url_.query_params.get("schema")
+            if database is None and table is None:
+                if url_.scheme.startswith("file") or url_.scheme.startswith("http"):
+                    _, database, table = url_.path.rsplit("/", 2)
+
+    # If the table name is coming from a filesystem or HTTP path, strip the
+    # file suffix, e.g. `books-relaxed.ndjson`.
+    if table and "." in table:
+        table, _ = table.split(".", 1)
+
+    if database is None and table is None:
+        raise ValueError("Database and table must be specified")
+
     return database, table
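A few illustrative invocations of the revised `decode_database_table`, assuming the branch layout reconstructed above; the file and HTTP variants derive both names from the two trailing path segments and strip the file suffix:

```python
from cratedb_toolkit.util.database import decode_database_table

# Database and table are taken from the URL path.
print(decode_database_table("mongodb://localhost:27017/testdrive/demo"))
# -> ('testdrive', 'demo')

# A single path segment yields the database only; the table stays `None`,
# which downstream selects all collections.
print(decode_database_table("mongodb://localhost:27017/testdrive"))
# -> ('testdrive', None)

# Filesystem resource: the trailing segments are used, without file suffix.
print(decode_database_table("file+bson:///path/to/datasets/books.json"))
# -> ('datasets', 'books')
```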
diff --git a/doc/io/mongodb/loader.md b/doc/io/mongodb/loader.md
index 40e736e..00324b2 100644
--- a/doc/io/mongodb/loader.md
+++ b/doc/io/mongodb/loader.md
@@ -20,7 +20,11 @@ server instances and filesystems.
 
 - `file+bson://`
 
-  Read [MongoDB Extended JSON] format from filesystem.
+  Read files in [MongoDB Extended JSON] or [BSON] format from the filesystem.
+
+- `http+bson://`
+
+  Read files in [MongoDB Extended JSON] or [BSON] format from an HTTP resource.
 
 ## Install
 ```shell
@@ -32,6 +36,19 @@ The MongoDB I/O adapter can process MongoDB data from different sources. This
 section enumerates relevant connectivity options on behalf of concrete
 usage examples.
 
+### MongoDB Atlas
+Transfer a single collection from MongoDB Atlas.
+```shell
+export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/ticker/stocks
+ctk load table "mongodb+srv://john:EeY6OocooL8rungu@testdrive.ahnaik1.mongodb.net/ticker/stocks?batch-size=5000"
+```
+
+Transfer all collections in a database from MongoDB Atlas.
+```shell
+export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/ticker
+ctk load table "mongodb+srv://john:EeY6OocooL8rungu@testdrive.ahnaik1.mongodb.net/ticker?batch-size=5000"
+```
+
 ### MongoDB Community and Enterprise
 Transfer data from MongoDB database/collection into CrateDB schema/table.
 ```shell
@@ -45,52 +62,54 @@ ctk shell --command "SELECT * FROM testdrive.demo;"
 ctk show table "testdrive.demo"
 ```
 
-### MongoDB Atlas
-Transfer data from MongoDB Atlas.
-```shell
-export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
-ctk load table "mongodb+srv://john:EeY6OocooL8rungu@testdrive.ahnaik1.mongodb.net/ticker/stocks?batch-size=5000"
-```
-
 ### MongoDB JSON/BSON files
 Load data from MongoDB JSON/BSON files, for example produced by the
 `mongoexport` or `mongodump` programs.
 
-In order to get hold of a few samples worth of data, the canonical MongoDB C
-driver's [libbson test files] includes a few. In this case, let's acquire
-the collection at [mongodb-json-files].
-```shell
-git clone https://github.com/ozlerhakan/mongodb-json-files.git
-```
+
 ```shell
-CRATEDB_SQLALCHEMY_BASEURL=crate://crate@localhost:4200/testdrive
-ctk load table \
-    "file+bson:///path/to/mongodb-json-files/datasets/books.json" \
-    --cratedb-sqlalchemy-url="${CRATEDB_SQLALCHEMY_BASEURL}/books"
+# Extended JSON, filesystem, full path.
+ctk load table "file+bson:///path/to/mongodb-json-files/datasets/books.json"
+
+# Extended JSON, HTTP resource.
+ctk load table "https+bson://github.com/ozlerhakan/mongodb-json-files/raw/master/datasets/books.json"
+
+# Extended JSON, filesystem, multiple files.
+ctk load table "file+bson:///path/to/mongodb-json-files/datasets/*.json"
+
+# BSON, filesystem, relative path, compressed.
+ctk load table "file+bson:./var/data/testdrive/books.bson.gz"
 ```
-Address relative and/or compressed BSON files like
-`file+bson:./tmp/testdrive/books.bson.gz`.
 
-Example queries that fit the schema of `books.json`, and more, can be
-found at [](#ready-made-queries).
+To exercise a full example importing multiple MongoDB Extended JSON files,
+see [](#file-import-tutorial).
 
 ## Options
 
 ### Batch Size
-The default batch size is 500. You can adjust the value by appending the HTTP
-URL query parameter `batch-size` to the source URL, like
+The default batch size is 100. For many datasets, a much larger batch size
+makes data transfers more efficient. You can adjust the value by
+appending the HTTP URL query parameter `batch-size` to the source URL, like
 `mongodb+srv://managed.mongodb.net/ticker/stocks?batch-size=5000`.
 
-### Offset
-Use the HTTP URL query parameter `offset` on the source URL, like
-`&offset=42`, in order to start processing at this record from the
-beginning.
+### Filter
+Use the HTTP URL query parameter `filter` on the source URL, like
+`&filter={"exchange": {"$eq": "NASDAQ"}}`, in order to provide a MongoDB
+query filter as a JSON string.
+It works the same way as `mongoexport`'s `--query` option.
+For more complex query expressions, make sure to encode the value
+properly using URL/percent encoding.
 
 ### Limit
 Use the HTTP URL query parameter `limit` on the source URL, like
 `&limit=100`, in order to limit processing to a total number of records.
 
+### Offset
+Use the HTTP URL query parameter `offset` on the source URL, like
+`&offset=42`, in order to start processing at this record from the
+beginning.
+
 ## Zyp Transformations
 You can use [Zyp Transformations] to change the shape of the data while being
 transferred. In order to add it to the pipeline, use the `--transformation`
@@ -127,36 +146,69 @@ db.demo.find({})
 EOF
 ```
 
-(ready-made-queries)=
-### Ready-Made Queries
-The [mongodb-json-files] repository includes a few samples worth of data in MongoDB
-JSON/BSON format. After importing them, you may want to exercise those SQL queries,
-for example using Admin UI or crash.
+(file-import-tutorial)=
+### File Import Tutorial
+
+The [mongodb-json-files] repository includes a few sample datasets in
+MongoDB JSON/BSON format.
+
+:::{rubric} Load
+:::
+Acquire a copy of the repository.
+```shell
+git clone https://github.com/ozlerhakan/mongodb-json-files.git
+```
+The data import uses a Zyp project file [zyp-mongodb-json-files.yaml] that
+describes a few adjustments needed to import all files flawlessly. Let's
+acquire that file.
+```shell
+wget https://github.com/crate/cratedb-toolkit/raw/v0.0.22/examples/zyp/zyp-mongodb-json-files.yaml
+```
+Load all referenced `.json` files into corresponding tables within the CrateDB
+schema `datasets`, using a batch size of 2,500 items.
+```shell
+ctk load table \
+  "file+bson:///path/to/mongodb-json-files/datasets/*.json?batch-size=2500" \
+  --cratedb-sqlalchemy-url="crate://crate@localhost:4200/datasets" \
+  --transformation zyp-mongodb-json-files.yaml
+```
+
+:::{rubric} Query
+:::
+After importing the example files, you may want to exercise the following SQL
+queries, for example using the Admin UI or crash, to get an idea of how to
+work with CrateDB SQL.
 
-#### books.json
+**books.json**
 ```sql
 SELECT
     data['title'] AS title,
     LEFT(data['shortDescription'], 60) AS description,
     DATE_FORMAT('%Y-%m-%d', data['publishedDate']) AS date,
     data['isbn'] AS isbn
-FROM testdrive.books
+FROM datasets.books
 WHERE 'Java' = ANY(data['categories'])
 ORDER BY title;
 ```
 
-#### city_inspections.json
+**city_inspections.json**
 ```sql
 SELECT
     data['sector'] AS sector,
     data['business_name'] AS name
-FROM testdrive.city_inspections
+FROM datasets.city_inspections
 WHERE
     data['result'] = 'Violation Issued' AND
     UPPER(data['address']['city']) = 'STATEN ISLAND'
 ORDER BY sector, name;
 ```
 
+:::{tip}
+Alternatively, have a look at the canonical
+MongoDB C driver's [libbson test files].
+:::
+
+
 ### Backlog
 :::{todo}
 - Describe usage of `mongoimport` and `mongoexport`.
@@ -167,8 +219,10 @@ ORDER BY sector, name;
 :::
 
+[BSON]: https://en.wikipedia.org/wiki/BSON
 [examples/zyp]: https://github.com/crate/cratedb-toolkit/tree/main/examples/zyp
 [libbson test files]: https://github.com/mongodb/mongo-c-driver/tree/master/src/libbson/tests/json
 [MongoDB Extended JSON]: https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/
 [mongodb-json-files]: https://github.com/ozlerhakan/mongodb-json-files
 [Zyp Transformations]: https://commons-codec.readthedocs.io/zyp/index.html
+[zyp-mongodb-json-files.yaml]: https://github.com/crate/cratedb-toolkit/blob/v0.0.22/examples/zyp/zyp-mongodb-json-files.yaml
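The tutorial steps above can also be driven through the Python API instead of `ctk load table`; a minimal sketch, assuming the repository and the Zyp project file have been acquired as described in the documentation changes:

```python
from pathlib import Path

from cratedb_toolkit.io.mongodb.api import mongodb_copy

# Load all `.json` files into tables within the CrateDB schema `datasets`,
# applying the Zyp transformation project file along the way.
mongodb_copy(
    "file+bson:///path/to/mongodb-json-files/datasets/*.json?batch-size=2500",
    "crate://crate@localhost:4200/datasets",
    transformation=Path("zyp-mongodb-json-files.yaml"),
)
```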
diff --git a/tests/io/mongodb/test_copy.py b/tests/io/mongodb/test_copy.py
index fad770b..36ac8c1 100644
--- a/tests/io/mongodb/test_copy.py
+++ b/tests/io/mongodb/test_copy.py
@@ -1,3 +1,9 @@
+import json
+from copy import deepcopy
+from pathlib import Path
+from unittest import mock
+
+import pymongo
 import pytest
 
 from cratedb_toolkit.io.mongodb.api import mongodb_copy
@@ -14,6 +20,140 @@ def check_prerequisites():
     check_sqlalchemy2()
 
 
+def test_mongodb_copy_server_database(caplog, cratedb, mongodb):
+    """
+    Verify MongoDB -> CrateDB data transfer for all collections in a database.
+    """
+
+    # Reset two database tables.
+    cratedb.database.run_sql('DROP TABLE IF EXISTS testdrive."demo1";')
+    cratedb.database.run_sql('DROP TABLE IF EXISTS testdrive."demo2";')
+
+    # Define source and target URLs.
+    mongodb_url = f"{mongodb.get_connection_url()}/testdrive"
+    cratedb_url = f"{cratedb.get_connection_url()}/testdrive"
+
+    # Define data.
+    data_in = {"device": "Hotzenplotz", "temperature": 42.42, "timestamp": 1563051934000}
+    data_out = deepcopy(data_in)
+    data_out.update({"_id": mock.ANY})
+
+    # Populate source database.
+    client: pymongo.MongoClient = mongodb.get_connection_client()
+    testdrive = client.get_database("testdrive")
+    demo1 = testdrive.create_collection("demo1")
+    demo1.insert_one(data_in)
+    demo2 = testdrive.create_collection("demo2")
+    demo2.insert_one(data_in)
+
+    # Run transfer command.
+    mongodb_copy(
+        mongodb_url,
+        cratedb_url,
+    )
+
+    # Verify data in target database.
+    cratedb.database.refresh_table("testdrive.demo1")
+    cratedb.database.refresh_table("testdrive.demo2")
+    results = cratedb.database.run_sql("SELECT * FROM testdrive.demo1;", records=True)
+    assert results[0]["data"] == data_out
+    results = cratedb.database.run_sql("SELECT * FROM testdrive.demo2;", records=True)
+    assert results[0]["data"] == data_out
+
+
+def test_mongodb_copy_server_collection_with_filter(caplog, cratedb, mongodb):
+    """
+    Verify MongoDB -> CrateDB data transfer for a specific collection, with filtering.
+    """
+
+    # Define source and target URLs.
+    filter_expression = json.dumps({"timestamp": {"$gt": 1563051934050}})
+    mongodb_url = f"{mongodb.get_connection_url()}/testdrive/demo?filter={filter_expression}"
+    cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"
+
+    # Define data.
+    data_in = [
+        {"device": "Hotzenplotz", "temperature": 42.42, "timestamp": 1563051934000},
+        {"device": "Hotzenplotz", "temperature": 42.42, "timestamp": 1563051934100},
+    ]
+    data_out = deepcopy(data_in)
+    data_out[0].update({"_id": mock.ANY})
+    data_out[1].update({"_id": mock.ANY})
+
+    # Populate source database.
+    client: pymongo.MongoClient = mongodb.get_connection_client()
+    testdrive = client.get_database("testdrive")
+    demo = testdrive.create_collection("demo")
+    demo.insert_many(data_in)
+
+    # Run transfer command.
+    mongodb_copy(
+        mongodb_url,
+        cratedb_url,
+    )
+
+    # Verify data in target database.
+    cratedb.database.refresh_table("testdrive.demo")
+    assert cratedb.database.count_records("testdrive.demo") == 1
+    results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True)
+    assert results[0]["data"] == data_out[1]
+
+
+def test_mongodb_copy_filesystem_folder_absolute(caplog, cratedb, mongodb):
+    """
+    Verify MongoDB -> CrateDB data transfer for all files in a folder, with absolute addressing.
+    """
+
+    # Reset two database tables.
+    cratedb.database.run_sql('DROP TABLE IF EXISTS testdrive."books-canonical";')
+    cratedb.database.run_sql('DROP TABLE IF EXISTS testdrive."books-relaxed";')
+
+    # Define source and target URLs.
+    path = Path("./tests/io/mongodb/*.ndjson").absolute()
+    fs_resource = f"file+bson://{path}"
+    cratedb_url = f"{cratedb.get_connection_url()}/testdrive"
+
+    # Run transfer command.
+    mongodb_copy(
+        fs_resource,
+        cratedb_url,
+    )
+
+    # Verify data in target database.
+ cratedb.database.refresh_table("testdrive.books-canonical") + cratedb.database.refresh_table("testdrive.books-relaxed") + + assert cratedb.database.count_records("testdrive.books-canonical") == 4 + assert cratedb.database.count_records("testdrive.books-relaxed") == 4 + + +def test_mongodb_copy_filesystem_folder_relative(caplog, cratedb, mongodb): + """ + Verify MongoDB -> CrateDB data transfer for all files in a folder, with relative addressing. + """ + + # Reset two database tables. + cratedb.database.run_sql('DROP TABLE IF EXISTS testdrive."books-canonical";') + cratedb.database.run_sql('DROP TABLE IF EXISTS testdrive."books-relaxed";') + + # Define source and target URLs. + fs_resource = "file+bson:./tests/io/mongodb/*.ndjson" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive" + + # Run transfer command. + mongodb_copy( + fs_resource, + cratedb_url, + ) + + # Verify data in target database. + cratedb.database.refresh_table("testdrive.books-canonical") + cratedb.database.refresh_table("testdrive.books-relaxed") + + assert cratedb.database.count_records("testdrive.books-canonical") == 4 + assert cratedb.database.count_records("testdrive.books-relaxed") == 4 + + def test_mongodb_copy_filesystem_json_relaxed(caplog, cratedb): """ Verify MongoDB Extended JSON -> CrateDB data transfer. @@ -113,3 +253,29 @@ def test_mongodb_copy_filesystem_bson(caplog, cratedb): ) timestamp_type = type_result[0]["type"] assert timestamp_type == "bigint" + + +def test_mongodb_copy_http_json_relaxed(caplog, cratedb): + """ + Verify MongoDB Extended JSON -> CrateDB data transfer, when source file is on HTTP. + """ + + # Define source and target URLs. + json_resource = "https+bson://github.com/ozlerhakan/mongodb-json-files/raw/master/datasets/books.json" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + + # Run transfer command. + mongodb_copy(json_resource, cratedb_url) + + # Verify metadata in target database. + assert cratedb.database.table_exists("testdrive.demo") is True + assert cratedb.database.refresh_table("testdrive.demo") is True + assert cratedb.database.count_records("testdrive.demo") == 431 + + # Verify content in target database. + results = cratedb.database.run_sql("SELECT * FROM testdrive.demo WHERE data['_id'] = 1;", records=True) + assert results[0]["data"]["authors"] == [ + "W. Frank Ableson", + "Charlie Collins", + "Robi Sen", + ]