From 3322ce2bf1b9f833dc2e30578a527660017d04c4 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 11 Sep 2024 02:23:01 +0200 Subject: [PATCH 01/10] MongoDB: Unlock processing multiple collections Either from server database, or from filesystem directory. --- CHANGES.md | 4 +- cratedb_toolkit/io/mongodb/adapter.py | 22 ++++-- cratedb_toolkit/io/mongodb/api.py | 66 ++++++++++++++--- cratedb_toolkit/io/mongodb/copy.py | 18 +++-- cratedb_toolkit/util/database.py | 38 +++++++--- doc/io/mongodb/loader.md | 100 ++++++++++++++++++-------- tests/io/mongodb/test_copy.py | 68 ++++++++++++++++++ 7 files changed, 253 insertions(+), 63 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 8cf493e5..d3fa3137 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,8 @@ ## Unreleased +- MongoDB: Unlock processing multiple collections, either from server database, + or from filesystem directory ## 2024/09/10 v0.0.22 - MongoDB: Rename columns with leading underscores to use double leading underscores @@ -14,7 +16,7 @@ This means relevant column definitions will not be included into the SQL DDL. - MongoDB: Make `ctk load table` use the `data OBJECT(DYNAMIC)` mapping strategy. - MongoDB: Sanitize lists of varying objects -- MongoDB: Add `--treatment` option for applying special treatments to certain items +- MongoDB: Add treatment option for applying special treatments to certain items on real-world data - MongoDB: Use pagination on source collection, for creating batches towards CrateDB - MongoDB: Unlock importing MongoDB Extended JSON files using `file+bson://...` diff --git a/cratedb_toolkit/io/mongodb/adapter.py b/cratedb_toolkit/io/mongodb/adapter.py index e539e778..c082ceba 100644 --- a/cratedb_toolkit/io/mongodb/adapter.py +++ b/cratedb_toolkit/io/mongodb/adapter.py @@ -1,3 +1,4 @@ +import glob import itertools import logging import typing as t @@ -35,7 +36,6 @@ def from_url(cls, url: t.Union[str, boltons.urlutils.URL, yarl.URL]): url = str(url) mongodb_address = DatabaseAddress.from_string(url) mongodb_uri, mongodb_collection_address = mongodb_address.decode() - logger.info(f"Collection address: {mongodb_collection_address}") mongodb_database = mongodb_collection_address.schema mongodb_collection = mongodb_collection_address.table for custom_query_parameter in cls._custom_query_parameters: @@ -66,6 +66,10 @@ def offset(self) -> int: def setup(self): raise NotImplementedError() + @abstractmethod + def get_collections(self) -> t.List[str]: + raise NotImplementedError() + @abstractmethod def record_count(self, filter_=None) -> int: raise NotImplementedError() @@ -82,6 +86,9 @@ class MongoDBFileAdapter(MongoDBAdapterBase): def setup(self): self._path = Path(self.address.uri.path) + def get_collections(self) -> t.List[str]: + return list(glob.glob(str(self._path))) + def record_count(self, filter_=None) -> int: """ https://stackoverflow.com/a/27517681 @@ -95,8 +102,10 @@ def query(self): raise FileNotFoundError(f"Resource not found: {self._path}") if self.offset: raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader") - if self._path.suffix in [".ndjson", ".jsonl"]: - data = pl.read_ndjson(self._path, batch_size=self.batch_size, n_rows=self.limit or None).to_dicts() + if self._path.suffix in [".json", ".jsonl", ".ndjson"]: + data = pl.read_ndjson( + self._path, batch_size=self.batch_size, n_rows=self.limit or None, ignore_errors=True + ).to_dicts() elif ".bson" in str(self._path): data = IterableData(str(self._path), options={"format_in": "bson"}).iter() else: @@ -115,7 +124,12 
@@ def setup(self): document_class=RawBSONDocument, datetime_conversion="DATETIME_AUTO", ) - self._mongodb_collection = self._mongodb_client[self.database_name][self.collection_name] + if self.collection_name: + self._mongodb_collection = self._mongodb_client[self.database_name][self.collection_name] + + def get_collections(self) -> t.List[str]: + database = self._mongodb_client.get_database(self.database_name) + return database.list_collection_names() def record_count(self, filter_=None) -> int: filter_ = filter_ or {} diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index 35acb47b..05caff63 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -3,6 +3,9 @@ import typing as t from pathlib import Path +from boltons.urlutils import URL + +from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB from cratedb_toolkit.io.mongodb.copy import MongoDBFullLoad from cratedb_toolkit.io.mongodb.core import export, extract, translate @@ -96,22 +99,67 @@ def mongodb_copy(source_url, target_url, transformation: t.Union[Path, None] = N ctk load table mongodb://localhost:27017/testdrive/demo """ - logger.info(f"Invoking MongoDBFullLoad. source_url={source_url}") + logger.info(f"mongodb_copy. source={source_url}, target={target_url}") # Optionally configure transformations. tm = None if transformation: tm = TransformationManager(path=transformation) + tasks = [] + + has_table = True + if "*" in source_url: + has_table = False + mongodb_address = DatabaseAddress.from_string(source_url) + mongodb_uri, mongodb_collection_address = mongodb_address.decode() + if mongodb_collection_address.table is None: + has_table = False + # Invoke `full-load` procedure. 
- mdb_full = MongoDBFullLoad( - mongodb_url=source_url, - cratedb_url=target_url, - tm=tm, - progress=progress, - ) - mdb_full.start() - return True + if has_table: + tasks.append( + MongoDBFullLoad( + mongodb_url=source_url, + cratedb_url=target_url, + tm=tm, + progress=progress, + ) + ) + else: + logger.info(f"Inquiring collections at {source_url}") + mongodb_uri = URL(source_url) + cratedb_uri = URL(target_url) + if cratedb_uri.path[-1] != "/": + cratedb_uri.path += "/" + mongodb_query_parameters = mongodb_uri.query_params + mongodb_adapter = mongodb_adapter_factory(mongodb_uri) + collections = mongodb_adapter.get_collections() + logger.info(f"Discovered collections: {len(collections)}") + logger.debug(f"Processing collections: {collections}") + for collection_path in collections: + mongodb_uri_effective = mongodb_uri.navigate(Path(collection_path).name) + mongodb_uri_effective.query_params = mongodb_query_parameters + cratedb_uri_effective = cratedb_uri.navigate(Path(collection_path).stem) + tasks.append( + MongoDBFullLoad( + mongodb_url=str(mongodb_uri_effective), + cratedb_url=str(cratedb_uri_effective), + tm=tm, + progress=progress, + ) + ) + + outcome = True + for task in tasks: + try: + outcome_task = task.start() + except Exception: + logger.exception("Task failed") + outcome_task = False + outcome = outcome and outcome_task + + return outcome def mongodb_relay_cdc(source_url, target_url, progress: bool = False): diff --git a/cratedb_toolkit/io/mongodb/copy.py b/cratedb_toolkit/io/mongodb/copy.py index f0980af1..99aadc54 100644 --- a/cratedb_toolkit/io/mongodb/copy.py +++ b/cratedb_toolkit/io/mongodb/copy.py @@ -76,16 +76,18 @@ def __init__( progress: bool = False, debug: bool = True, ): - # Decode database URL: MongoDB. self.mongodb_uri = URL(mongodb_url) + self.cratedb_uri = URL(cratedb_url) + + # Decode database URL: MongoDB. self.mongodb_adapter = mongodb_adapter_factory(self.mongodb_uri) # Decode database URL: CrateDB. - cratedb_address = DatabaseAddress.from_string(cratedb_url) - cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode() - cratedb_table = cratedb_table_address.fullname + self.cratedb_address = DatabaseAddress.from_string(cratedb_url) + self.cratedb_sqlalchemy_url, self.cratedb_table_address = self.cratedb_address.decode() + cratedb_table = self.cratedb_table_address.fullname - self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False) + self.cratedb_adapter = DatabaseAdapter(str(self.cratedb_sqlalchemy_url), echo=False) self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table) # Transformation machinery. @@ -107,10 +109,11 @@ def __init__( def start(self): """ - Read items from DynamoDB table, convert to SQL INSERT statements, and submit to CrateDB. + Read items from MongoDB table, convert to SQL INSERT statements, and submit to CrateDB. """ + logger.info(f"Starting MongoDBFullLoad. 
source={self.mongodb_uri}, target={self.cratedb_uri}") records_in = self.mongodb_adapter.record_count() - logger.info(f"Source: MongoDB collection={self.mongodb_adapter.collection_name} count={records_in}") + logger.info(f"Source: MongoDB {self.mongodb_adapter.address} count={records_in}") logger_on_error = logger.warning if self.debug: logger_on_error = logger.exception @@ -155,3 +158,4 @@ def start(self): logger.info(f"Number of records written: {records_out}") if records_out == 0: logger.warning("No data has been copied") + return True diff --git a/cratedb_toolkit/util/database.py b/cratedb_toolkit/util/database.py index ef6af8ee..7de7e409 100644 --- a/cratedb_toolkit/util/database.py +++ b/cratedb_toolkit/util/database.py @@ -1,6 +1,7 @@ # Copyright (c) 2023-2024, Crate.io Inc. # Distributed under the terms of the AGPLv3 license, see LICENSE. import io +import logging import os import typing as t from pathlib import Path @@ -21,6 +22,8 @@ except ImportError: from typing_extensions import Literal # type: ignore[assignment] +logger = logging.getLogger(__name__) + def run_sql(dburi: str, sql: str, records: bool = False): return DatabaseAdapter(dburi=dburi).run_sql(sql=sql, records=records) @@ -39,6 +42,7 @@ def __init__(self, dburi: str, echo: bool = False): self.dburi = dburi self.engine = sa.create_engine(self.dburi, echo=echo) # TODO: Make that go away. + logger.debug(f"Connecting to CrateDB: {dburi}") self.connection = self.engine.connect() @staticmethod @@ -369,7 +373,7 @@ def sa_is_empty(thing): return isinstance(thing, AsBoolean) -def decode_database_table(url: str) -> t.Tuple[str, str]: +def decode_database_table(url: str) -> t.Tuple[str, t.Union[str, None]]: """ Decode database and table names from database URI path and/or query string. @@ -382,21 +386,33 @@ def decode_database_table(url: str) -> t.Tuple[str, str]: This one uses `boltons`, the other one uses `yarl`. """ url_ = URL(url) + database, table = None, None try: database, table = url_.path.strip("/").split("/") except ValueError as ex: if "too many values to unpack" not in str(ex) and "not enough values to unpack" not in str(ex): raise - database = url_.query_params.get("database") - table = url_.query_params.get("table") - if url_.scheme == "crate" and not database: - database = url_.query_params.get("schema") - if database is None and table is None: - if url_.scheme.startswith("file"): - _, database, table = url_.path.rsplit("/", 2) - table, _ = table.split(".", 1) - if database is None and table is None: - raise ValueError("Database and table must be specified") from ex + try: + (database,) = url_.path.strip("/").split("/") + except ValueError as ex: + if "too many values to unpack" not in str(ex) and "not enough values to unpack" not in str(ex): + raise + + database = url_.query_params.get("database") + table = url_.query_params.get("table") + if url_.scheme == "crate" and not database: + database = url_.query_params.get("schema") + if database is None and table is None: + if url_.scheme.startswith("file"): + _, database, table = url_.path.rsplit("/", 2) + + # If table name is coming from a filesystem, strip suffix, e.g. `books-relaxed.ndjson`. 
+    if table:
+        table, _ = table.split(".", 1)
+
+    if database is None and table is None:
+        raise ValueError("Database and table must be specified")
+
     return database, table

diff --git a/doc/io/mongodb/loader.md b/doc/io/mongodb/loader.md
index 40e736ee..b5403743 100644
--- a/doc/io/mongodb/loader.md
+++ b/doc/io/mongodb/loader.md
@@ -20,7 +20,7 @@ server instances and filesystems.
 - `file+bson://`
 
-  Read [MongoDB Extended JSON] format from filesystem.
+  Read files in [MongoDB Extended JSON] or [BSON] format from filesystem.
 
 ## Install
 ```shell
@@ -32,6 +32,19 @@ The MongoDB I/O adapter can process MongoDB data from different sources.
 This section enumerates relevant connectivity options on behalf of
 concrete usage examples.
 
+### MongoDB Atlas
+Transfer a single collection from MongoDB Atlas.
+```shell
+export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/ticker/stocks
+ctk load table "mongodb+srv://john:EeY6OocooL8rungu@testdrive.ahnaik1.mongodb.net/ticker/stocks?batch-size=5000"
+```
+
+Transfer all collections in a database from MongoDB Atlas.
+```shell
+export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/ticker
+ctk load table "mongodb+srv://john:EeY6OocooL8rungu@testdrive.ahnaik1.mongodb.net/ticker?batch-size=5000"
+```
+
 ### MongoDB Community and Enterprise
 Transfer data from MongoDB database/collection into CrateDB schema/table.
 ```shell
@@ -45,33 +58,23 @@ ctk shell --command "SELECT * FROM testdrive.demo;"
 ctk show table "testdrive.demo"
 ```
 
-### MongoDB Atlas
-Transfer data from MongoDB Atlas.
-```shell
-export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
-ctk load table "mongodb+srv://john:EeY6OocooL8rungu@testdrive.ahnaik1.mongodb.net/ticker/stocks?batch-size=5000"
-```
-
 ### MongoDB JSON/BSON files
 Load data from MongoDB JSON/BSON files, for example produced by the
 `mongoexport` or `mongodump` programs.
 
-In order to get hold of a few samples worth of data, the canonical MongoDB C
-driver's [libbson test files] includes a few. In this case, let's acquire
-the collection at [mongodb-json-files].
-```shell
-git clone https://github.com/ozlerhakan/mongodb-json-files.git
-```
+
 ```shell
-CRATEDB_SQLALCHEMY_BASEURL=crate://crate@localhost:4200/testdrive
-ctk load table \
-    "file+bson:///path/to/mongodb-json-files/datasets/books.json" \
-    --cratedb-sqlalchemy-url="${CRATEDB_SQLALCHEMY_BASEURL}/books"
+# Extended JSON, full path.
+ctk load table "file+bson:///path/to/mongodb-json-files/datasets/books.json"
+
+# Extended JSON, multiple files.
+ctk load table "file+bson:///path/to/mongodb-json-files/datasets/*.json"
+
+# BSON, compressed, relative path.
+ctk load table "file+bson:./var/data/testdrive/books.bson.gz"
 ```
-Address relative and/or compressed BSON files like
-`file+bson:./tmp/testdrive/books.bson.gz`.
-Example queries that fit the schema of `books.json`, and more, can be
-found at [](#ready-made-queries).
+To exercise a full example importing multiple MongoDB Extended JSON files,
+see [](#file-import-tutorial).
 
 ## Options
@@ -127,36 +130,69 @@ db.demo.find({})
 EOF
 ```
 
-(ready-made-queries)=
-### Ready-Made Queries
-The [mongodb-json-files] repository includes a few samples worth of data in MongoDB
-JSON/BSON format. After importing them, you may want to exercise those SQL queries,
-for example using Admin UI or crash.
+(file-import-tutorial)=
+### File Import Tutorial
+
+The [mongodb-json-files] repository includes a few sample datasets in
+MongoDB JSON/BSON format.
+
+:::{rubric} Load
+:::
+Acquire a copy of the repository.
+```shell +git clone https://github.com/ozlerhakan/mongodb-json-files.git +``` +The data import uses a Zyp project file [zyp-mongodb-json-files.yaml] that +describes a few adjustments needed to import all files flawlessly. Let's +acquire that file. +```shell +wget https://github.com/crate/cratedb-toolkit/raw/v0.0.22/examples/zyp/zyp-mongodb-json-files.yaml +``` +Load all referenced `.json` files into corresponding tables within the CrateDB +schema `datasets`, using a batch size of 2,500 items. +```shell +ctk load table \ + "file+bson:///path/to/mongodb-json-files/datasets/*.json?batch-size=2500" \ + --cratedb-sqlalchemy-url="crate://crate@localhost:4200/datasets" \ + --transformation zyp-mongodb-json-files.yaml +``` + +:::{rubric} Query +::: +After importing the example files, you may want to exercise those SQL queries, +for example using Admin UI or crash, to get an idea about how to work with +CrateDB SQL. -#### books.json +**books.json** ```sql SELECT data['title'] AS title, LEFT(data['shortDescription'], 60) AS description, DATE_FORMAT('%Y-%m-%d', data['publishedDate']) AS date, data['isbn'] AS isbn -FROM testdrive.books +FROM datasets.books WHERE 'Java' = ANY(data['categories']) ORDER BY title; ``` -#### city_inspections.json +**city_inspections.json** ```sql SELECT data['sector'] AS sector, data['business_name'] AS name -FROM testdrive.city_inspections +FROM datasets.city_inspections WHERE data['result'] = 'Violation Issued' AND UPPER(data['address']['city']) = 'STATEN ISLAND' ORDER BY sector, name; ``` +:::{tip} +Alternatively, have a look at the canonical +MongoDB C driver's [libbson test files]. +::: + + ### Backlog :::{todo} - Describe usage of `mongoimport` and `mongoexport`. @@ -167,8 +203,10 @@ ORDER BY sector, name; ::: +[BSON]: https://en.wikipedia.org/wiki/BSON [examples/zyp]: https://github.com/crate/cratedb-toolkit/tree/main/examples/zyp [libbson test files]: https://github.com/mongodb/mongo-c-driver/tree/master/src/libbson/tests/json [MongoDB Extended JSON]: https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/ [mongodb-json-files]: https://github.com/ozlerhakan/mongodb-json-files [Zyp Transformations]: https://commons-codec.readthedocs.io/zyp/index.html +[zyp-mongodb-json-files.yaml]: https://github.com/crate/cratedb-toolkit/blob/v0.0.22/examples/zyp/zyp-mongodb-json-files.yaml diff --git a/tests/io/mongodb/test_copy.py b/tests/io/mongodb/test_copy.py index fad770b9..c7cac80a 100644 --- a/tests/io/mongodb/test_copy.py +++ b/tests/io/mongodb/test_copy.py @@ -1,3 +1,7 @@ +from copy import deepcopy +from unittest import mock + +import pymongo import pytest from cratedb_toolkit.io.mongodb.api import mongodb_copy @@ -14,6 +18,70 @@ def check_prerequisites(): check_sqlalchemy2() +def test_mongodb_copy_server_database(caplog, cratedb, mongodb): + """ + Verify MongoDB -> CrateDB data transfer for all collections in a database. + """ + + # Define source and target URLs. + mongodb_url = f"{mongodb.get_connection_url()}/testdrive" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive" + + # Define data. + data_in = {"device": "Hotzenplotz", "temperature": 42.42, "timestamp": 1563051934000} + data_out = deepcopy(data_in) + data_out.update({"_id": mock.ANY}) + + # Populate source database. 
+ client: pymongo.MongoClient = mongodb.get_connection_client() + testdrive = client.get_database("testdrive") + demo1 = testdrive.create_collection("demo1") + demo1.insert_one(data_in) + demo2 = testdrive.create_collection("demo2") + demo2.insert_one(data_in) + + # Run transfer command. + mongodb_copy( + mongodb_url, + cratedb_url, + ) + + # Verify data in target database. + cratedb.database.refresh_table("testdrive.demo1") + cratedb.database.refresh_table("testdrive.demo2") + results = cratedb.database.run_sql("SELECT * FROM testdrive.demo1;", records=True) + assert results[0]["data"] == data_out + results = cratedb.database.run_sql("SELECT * FROM testdrive.demo2;", records=True) + assert results[0]["data"] == data_out + + +def test_mongodb_copy_filesystem_folder(caplog, cratedb, mongodb): + """ + Verify MongoDB -> CrateDB data transfer for all files in a folder. + """ + + # Reset two database tables. + cratedb.database.run_sql('DROP TABLE IF EXISTS testdrive."books-canonical";') + cratedb.database.run_sql('DROP TABLE IF EXISTS testdrive."books-relaxed";') + + # Define source and target URLs. + fs_resource = "file+bson:./tests/io/mongodb/*.ndjson" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive" + + # Run transfer command. + mongodb_copy( + fs_resource, + cratedb_url, + ) + + # Verify data in target database. + cratedb.database.refresh_table("testdrive.books-canonical") + cratedb.database.refresh_table("testdrive.books-relaxed") + + assert cratedb.database.count_records("testdrive.books-canonical") == 4 + assert cratedb.database.count_records("testdrive.books-relaxed") == 4 + + def test_mongodb_copy_filesystem_json_relaxed(caplog, cratedb): """ Verify MongoDB Extended JSON -> CrateDB data transfer. From d825f2f9a7f6cc9f8fa010a96fc5672ba0ef99b0 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 11 Sep 2024 12:56:38 +0200 Subject: [PATCH 02/10] MongoDB: Process JSON files from HTTP resource, using `https+bson://` --- CHANGES.md | 1 + cratedb_toolkit/api/main.py | 9 +++++++ cratedb_toolkit/io/mongodb/adapter.py | 35 +++++++++++++++++++++++++-- cratedb_toolkit/io/mongodb/api.py | 2 ++ cratedb_toolkit/util/database.py | 2 +- doc/io/mongodb/loader.md | 13 +++++++--- tests/io/mongodb/test_copy.py | 30 +++++++++++++++++++++++ 7 files changed, 86 insertions(+), 6 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index d3fa3137..f4202029 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,7 @@ ## Unreleased - MongoDB: Unlock processing multiple collections, either from server database, or from filesystem directory +- MongoDB: Unlock processing JSON files from HTTP resource, using `https+bson://` ## 2024/09/10 v0.0.22 - MongoDB: Rename columns with leading underscores to use double leading underscores diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py index bb416cb6..642edf40 100644 --- a/cratedb_toolkit/api/main.py +++ b/cratedb_toolkit/api/main.py @@ -133,6 +133,15 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf progress=True, ) + elif source_url_obj.scheme.startswith("http"): + if "+bson" in source_url_obj.scheme or "+mongodb" in source_url_obj.scheme: + mongodb_copy_generic( + str(source_url_obj), + target_url, + transformation=transformation, + progress=True, + ) + elif source_url_obj.scheme.startswith("influxdb"): from cratedb_toolkit.io.influxdb import influxdb_copy diff --git a/cratedb_toolkit/io/mongodb/adapter.py b/cratedb_toolkit/io/mongodb/adapter.py index c082ceba..b01570d6 100644 --- 
a/cratedb_toolkit/io/mongodb/adapter.py +++ b/cratedb_toolkit/io/mongodb/adapter.py @@ -80,7 +80,7 @@ def query(self): @define -class MongoDBFileAdapter(MongoDBAdapterBase): +class MongoDBFilesystemAdapter(MongoDBAdapterBase): _path: Path = field(init=False) def setup(self): @@ -113,6 +113,35 @@ def query(self): return batches(data, self.batch_size) +@define +class MongoDBResourceAdapter(MongoDBAdapterBase): + _url: URL = field(init=False) + + def setup(self): + self._url = self.address.uri + if "+bson" in self._url.scheme: + self._url.scheme = self._url.scheme.replace("+bson", "") + + def get_collections(self) -> t.List[str]: + raise NotImplementedError("HTTP+BSON loader does not support directory inquiry yet") + + def record_count(self, filter_=None) -> int: + return -1 + + def query(self): + if self.offset: + raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader") + if self._url.path.endswith(".json") or self._url.path.endswith(".jsonl") or self._url.path.endswith(".ndjson"): + data = pl.read_ndjson( + str(self._url), batch_size=self.batch_size, n_rows=self.limit or None, ignore_errors=True + ).to_dicts() + elif self._url.path.endswith(".bson"): + raise NotImplementedError("HTTP+BSON loader does not support .bson files yet. SIC") + else: + raise ValueError(f"Unsupported file type: {self._url}") + return batches(data, self.batch_size) + + @define class MongoDBServerAdapter(MongoDBAdapterBase): _mongodb_client: pymongo.MongoClient = field(init=False) @@ -142,7 +171,9 @@ def query(self): def mongodb_adapter_factory(mongodb_uri: URL) -> MongoDBAdapterBase: if mongodb_uri.scheme.startswith("file"): - return MongoDBFileAdapter.from_url(mongodb_uri) + return MongoDBFilesystemAdapter.from_url(mongodb_uri) + elif mongodb_uri.scheme.startswith("http"): + return MongoDBResourceAdapter.from_url(mongodb_uri) elif mongodb_uri.scheme.startswith("mongodb"): return MongoDBServerAdapter.from_url(mongodb_uri) raise ValueError("Unable to create MongoDB adapter") diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index 05caff63..b72781d1 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -130,6 +130,8 @@ def mongodb_copy(source_url, target_url, transformation: t.Union[Path, None] = N logger.info(f"Inquiring collections at {source_url}") mongodb_uri = URL(source_url) cratedb_uri = URL(target_url) + if Path(mongodb_uri.path).is_absolute() and mongodb_uri.path[-1] != "/": + mongodb_uri.path += "/" if cratedb_uri.path[-1] != "/": cratedb_uri.path += "/" mongodb_query_parameters = mongodb_uri.query_params diff --git a/cratedb_toolkit/util/database.py b/cratedb_toolkit/util/database.py index 7de7e409..0d4191d8 100644 --- a/cratedb_toolkit/util/database.py +++ b/cratedb_toolkit/util/database.py @@ -403,7 +403,7 @@ def decode_database_table(url: str) -> t.Tuple[str, t.Union[str, None]]: if url_.scheme == "crate" and not database: database = url_.query_params.get("schema") if database is None and table is None: - if url_.scheme.startswith("file"): + if url_.scheme.startswith("file") or url_.scheme.startswith("http"): _, database, table = url_.path.rsplit("/", 2) # If table name is coming from a filesystem, strip suffix, e.g. `books-relaxed.ndjson`. diff --git a/doc/io/mongodb/loader.md b/doc/io/mongodb/loader.md index b5403743..d7c3f98d 100644 --- a/doc/io/mongodb/loader.md +++ b/doc/io/mongodb/loader.md @@ -22,6 +22,10 @@ server instances and filesystems. 
Read files in [MongoDB Extended JSON] or [BSON] format from filesystem. +- `http+bson://` + + Read files in [MongoDB Extended JSON] or [BSON] format from HTTP resource. + ## Install ```shell pip install --upgrade 'cratedb-toolkit[mongodb]' @@ -63,13 +67,16 @@ Load data from MongoDB JSON/BSON files, for example produced by the `mongoexport` or `mongodump` programs. ```shell -# Extended JSON, full path. +# Extended JSON, filesystem, full path. ctk load table "file+bson:///path/to/mongodb-json-files/datasets/books.json" -# Extended JSON, multiple files. +# Extended JSON, HTTP resource. +ctk load table "https+bson://github.com/ozlerhakan/mongodb-json-files/raw/master/datasets/books.json" + +# Extended JSON, filesystem, multiple files. ctk load table "file+bson:///path/to/mongodb-json-files/datasets/*.json" -# BSON, compressed, relative path. +# BSON, filesystem, relative path, compressed. ctk load table "file+bson:./var/data/testdrive/books.bson.gz" ``` diff --git a/tests/io/mongodb/test_copy.py b/tests/io/mongodb/test_copy.py index c7cac80a..f960ab90 100644 --- a/tests/io/mongodb/test_copy.py +++ b/tests/io/mongodb/test_copy.py @@ -23,6 +23,10 @@ def test_mongodb_copy_server_database(caplog, cratedb, mongodb): Verify MongoDB -> CrateDB data transfer for all collections in a database. """ + # Reset two database tables. + cratedb.database.run_sql('DROP TABLE IF EXISTS testdrive."demo1";') + cratedb.database.run_sql('DROP TABLE IF EXISTS testdrive."demo2";') + # Define source and target URLs. mongodb_url = f"{mongodb.get_connection_url()}/testdrive" cratedb_url = f"{cratedb.get_connection_url()}/testdrive" @@ -181,3 +185,29 @@ def test_mongodb_copy_filesystem_bson(caplog, cratedb): ) timestamp_type = type_result[0]["type"] assert timestamp_type == "bigint" + + +def test_mongodb_copy_http_json_relaxed(caplog, cratedb): + """ + Verify MongoDB Extended JSON -> CrateDB data transfer, when source file is on HTTP. + """ + + # Define source and target URLs. + json_resource = "https+bson://github.com/ozlerhakan/mongodb-json-files/raw/master/datasets/books.json" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + + # Run transfer command. + mongodb_copy(json_resource, cratedb_url) + + # Verify metadata in target database. + assert cratedb.database.table_exists("testdrive.demo") is True + assert cratedb.database.refresh_table("testdrive.demo") is True + assert cratedb.database.count_records("testdrive.demo") == 431 + + # Verify content in target database. + results = cratedb.database.run_sql("SELECT * FROM testdrive.demo WHERE data['_id'] = 1;", records=True) + assert results[0]["data"]["authors"] == [ + "W. 
Frank Ableson",
+        "Charlie Collins",
+        "Robi Sen",
+    ]

From 42f22ee4ccaec85e454d65adc6a23d3bd2f2421a Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Wed, 11 Sep 2024 13:37:04 +0200
Subject: [PATCH 03/10] MongoDB: Filter server collection using MongoDB query expression

---
 CHANGES.md                            |  1 +
 cratedb_toolkit/io/mongodb/adapter.py | 18 +++++++++++--
 doc/io/mongodb/loader.md              | 16 ++++++++---
 tests/io/mongodb/test_copy.py         | 39 +++++++++++++++++++++++++++
 4 files changed, 68 insertions(+), 6 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index f4202029..e1b61217 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -5,6 +5,7 @@
 - MongoDB: Unlock processing multiple collections, either from server database,
   or from filesystem directory
 - MongoDB: Unlock processing JSON files from HTTP resource, using `https+bson://`
+- MongoDB: Optionally filter server collection using MongoDB query expression
 
 ## 2024/09/10 v0.0.22
 - MongoDB: Rename columns with leading underscores to use double leading underscores

diff --git a/cratedb_toolkit/io/mongodb/adapter.py b/cratedb_toolkit/io/mongodb/adapter.py
index b01570d6..ef75a485 100644
--- a/cratedb_toolkit/io/mongodb/adapter.py
+++ b/cratedb_toolkit/io/mongodb/adapter.py
@@ -1,5 +1,6 @@
 import glob
 import itertools
+import json
 import logging
 import typing as t
 from abc import abstractmethod
@@ -28,7 +29,7 @@ class MongoDBAdapterBase:
     database_name: str
     collection_name: str
 
-    _custom_query_parameters = ["batch-size", "limit", "offset"]
+    _custom_query_parameters = ["batch-size", "filter", "limit", "offset"]
 
     @classmethod
     def from_url(cls, url: t.Union[str, boltons.urlutils.URL, yarl.URL]):
@@ -54,6 +55,10 @@ def __attrs_post_init__(self):
     def batch_size(self) -> int:
         return int(self.address.uri.query_params.get("batch-size", 500))
 
+    @cached_property
+    def filter(self) -> t.Union[t.Dict[str, t.Any], None]:
+        return json.loads(self.address.uri.query_params.get("filter", "null"))
+
     @cached_property
     def limit(self) -> int:
         return int(self.address.uri.query_params.get("limit", 0))
@@ -100,6 +105,8 @@ def record_count(self, filter_=None) -> int:
     def query(self):
         if not self._path.exists():
             raise FileNotFoundError(f"Resource not found: {self._path}")
+        if self.filter:
+            raise NotImplementedError("Using MongoDB filter expressions is not supported by Polars' NDJSON reader")
         if self.offset:
             raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader")
         if self._path.suffix in [".json", ".jsonl", ".ndjson"]:
@@ -129,6 +136,8 @@ def record_count(self, filter_=None) -> int:
         return -1
 
     def query(self):
+        if self.filter:
+            raise NotImplementedError("Using MongoDB filter expressions is not supported by Polars' NDJSON reader")
         if self.offset:
             raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader")
         if self._url.path.endswith(".json") or self._url.path.endswith(".jsonl") or self._url.path.endswith(".ndjson"):
@@ -165,7 +174,12 @@ def record_count(self, filter_=None) -> int:
         return self._mongodb_collection.count_documents(filter=filter_)
 
     def query(self):
-        data = self._mongodb_collection.find().batch_size(self.batch_size).skip(self.offset).limit(self.limit)
+        data = (
+            self._mongodb_collection.find(filter=self.filter)
+            .batch_size(self.batch_size)
+            .skip(self.offset)
+            .limit(self.limit)
+        )
         return batches(data, self.batch_size)

diff --git a/doc/io/mongodb/loader.md b/doc/io/mongodb/loader.md
index d7c3f98d..ef760a45 100644
--- a/doc/io/mongodb/loader.md
+++ b/doc/io/mongodb/loader.md
@@ -91,16 +91,24 @@ The default batch size is 500.
You can adjust the value by appending the HTTP
 URL query parameter `batch-size` to the source URL, like
 `mongodb+srv://managed.mongodb.net/ticker/stocks?batch-size=5000`.
 
-### Offset
-Use the HTTP URL query parameter `offset` on the source URL, like
-`&offset=42`, in order to start processing at this record from the
-beginning.
+### Filter
+Use the HTTP URL query parameter `filter` on the source URL, like
+`&filter={"exchange": {"$eq": "NASDAQ"}}`, in order to provide a MongoDB
+query filter as a JSON string.
+It works in the same way as `mongoexport`'s `--query` option.
+For more complex query expressions, make sure to properly encode the
+value using URL/percent encoding.
 
 ### Limit
 Use the HTTP URL query parameter `limit` on the source URL, like
 `&limit=100`, in order to limit processing to a total number of records.
 
+### Offset
+Use the HTTP URL query parameter `offset` on the source URL, like
+`&offset=42`, in order to start processing at this record from the
+beginning.
+
 ## Zyp Transformations
 You can use [Zyp Transformations] to change the shape of the data while being
 transferred. In order to add it to the pipeline, use the `--transformation`

diff --git a/tests/io/mongodb/test_copy.py b/tests/io/mongodb/test_copy.py
index f960ab90..e8da4494 100644
--- a/tests/io/mongodb/test_copy.py
+++ b/tests/io/mongodb/test_copy.py
@@ -1,3 +1,4 @@
+import json
 from copy import deepcopy
 from unittest import mock
 
 import pymongo
@@ -59,6 +60,44 @@ def test_mongodb_copy_server_database(caplog, cratedb, mongodb):
     assert results[0]["data"] == data_out
 
 
+def test_mongodb_copy_server_collection_with_filter(caplog, cratedb, mongodb):
+    """
+    Verify MongoDB -> CrateDB data transfer for a specific collection, with filtering.
+    """
+
+    # Define source and target URLs.
+    filter_expression = json.dumps({"timestamp": {"$gt": 1563051934050}})
+    mongodb_url = f"{mongodb.get_connection_url()}/testdrive/demo?filter={filter_expression}"
+    cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"
+
+    # Define data.
+    data_in = [
+        {"device": "Hotzenplotz", "temperature": 42.42, "timestamp": 1563051934000},
+        {"device": "Hotzenplotz", "temperature": 42.42, "timestamp": 1563051934100},
+    ]
+    data_out = deepcopy(data_in)
+    data_out[0].update({"_id": mock.ANY})
+    data_out[1].update({"_id": mock.ANY})
+
+    # Populate source database.
+    client: pymongo.MongoClient = mongodb.get_connection_client()
+    testdrive = client.get_database("testdrive")
+    demo = testdrive.create_collection("demo")
+    demo.insert_many(data_in)
+
+    # Run transfer command.
+    mongodb_copy(
+        mongodb_url,
+        cratedb_url,
+    )
+
+    # Verify data in target database.
+    cratedb.database.refresh_table("testdrive.demo")
+    assert cratedb.database.count_records("testdrive.demo") == 1
+    results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True)
+    assert results[0]["data"] == data_out[1]
+
+
 def test_mongodb_copy_filesystem_folder(caplog, cratedb, mongodb):
     """
     Verify MongoDB -> CrateDB data transfer for all files in a folder.
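A note on the `filter` option introduced by this patch: the MongoDB query expression travels inside a URL, so any JSON containing reserved characters must be percent-encoded before it is appended to the source URL. A minimal sketch using only the Python standard library; the hostname and filter values are illustrative placeholders taken from the documentation examples above:

```python
import json
from urllib.parse import quote

# Compose the MongoDB query filter as a JSON string,
# analogous to what `mongoexport --query` accepts.
filter_expression = json.dumps({"exchange": {"$eq": "NASDAQ"}})

# Percent-encode the expression so braces, quotes, and `$` survive URL
# parsing, then append it alongside the other query parameters.
source_url = (
    "mongodb+srv://managed.mongodb.net/ticker/stocks"
    f"?batch-size=5000&filter={quote(filter_expression)}"
)
print(source_url)
```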
From 6030d8363064ea8d293e3324bf6ac9ffe46cef5b Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Thu, 12 Sep 2024 14:31:09 +0200
Subject: [PATCH 04/10] MongoDB: Decrease default batch size to 100

---
 cratedb_toolkit/io/mongodb/adapter.py | 2 +-
 doc/io/mongodb/loader.md              | 5 +++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/cratedb_toolkit/io/mongodb/adapter.py b/cratedb_toolkit/io/mongodb/adapter.py
index ef75a485..aeaa264c 100644
--- a/cratedb_toolkit/io/mongodb/adapter.py
+++ b/cratedb_toolkit/io/mongodb/adapter.py
@@ -53,7 +53,7 @@ def __attrs_post_init__(self):
     @cached_property
     def batch_size(self) -> int:
-        return int(self.address.uri.query_params.get("batch-size", 500))
+        return int(self.address.uri.query_params.get("batch-size", 100))
 
     @cached_property
     def filter(self) -> t.Union[t.Dict[str, t.Any], None]:

diff --git a/doc/io/mongodb/loader.md b/doc/io/mongodb/loader.md
index ef760a45..00324b2b 100644
--- a/doc/io/mongodb/loader.md
+++ b/doc/io/mongodb/loader.md
@@ -87,8 +87,9 @@ see [](#file-import-tutorial).
 
 ## Options
 
 ### Batch Size
-The default batch size is 500. You can adjust the value by appending the HTTP
-URL query parameter `batch-size` to the source URL, like
+The default batch size is 100, but for many datasets a much larger batch size
+will make data transfers more efficient. You can adjust the value by
+appending the HTTP URL query parameter `batch-size` to the source URL, like
 `mongodb+srv://managed.mongodb.net/ticker/stocks?batch-size=5000`.

From 86b80245c67ef5eae9e251adf492026730eb752e Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Thu, 12 Sep 2024 14:33:43 +0200
Subject: [PATCH 05/10] MongoDB: Cleanups. Tests. Hacks. This and that.

---
 cratedb_toolkit/api/main.py           | 37 +++++++++++++++------------
 cratedb_toolkit/io/cli.py             |  4 ++-
 cratedb_toolkit/io/mongodb/adapter.py |  4 +--
 cratedb_toolkit/io/mongodb/api.py     | 10 ++++++--
 cratedb_toolkit/io/mongodb/copy.py    |  7 +++--
 tests/io/mongodb/test_copy.py         | 33 ++++++++++++++++++++++--
 6 files changed, 70 insertions(+), 25 deletions(-)

diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py
index 642edf40..5fb54898 100644
--- a/cratedb_toolkit/api/main.py
+++ b/cratedb_toolkit/api/main.py
@@ -119,14 +119,15 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf
         if source_url_obj.scheme.startswith("dynamodb"):
             from cratedb_toolkit.io.dynamodb.api import dynamodb_copy
 
-            if not dynamodb_copy(str(source_url_obj), target_url, progress=True):
-                msg = "Data loading failed"
-                logger.error(msg)
-                raise OperationFailed(msg)
+            if dynamodb_copy(str(source_url_obj), target_url, progress=True):
+                return True
+            else:
+                logger.error("Data loading failed or incomplete")
+                return False
 
         elif source_url_obj.scheme.startswith("file"):
             if "+bson" in source_url_obj.scheme or "+mongodb" in source_url_obj.scheme:
-                mongodb_copy_generic(
+                return mongodb_copy_generic(
                     str(source_url_obj),
                     target_url,
                     transformation=transformation,
                     progress=True,
                 )
 
         elif source_url_obj.scheme.startswith("http"):
             if "+bson" in source_url_obj.scheme or "+mongodb" in source_url_obj.scheme:
-                mongodb_copy_generic(
+                return mongodb_copy_generic(
                     str(source_url_obj),
                     target_url,
                     transformation=transformation,
                     progress=True,
                 )
 
         elif source_url_obj.scheme.startswith("influxdb"):
             from cratedb_toolkit.io.influxdb import influxdb_copy
 
@@ -149,19 +150,20 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf
             if asbool(source_url_obj.query_params.get("ssl")):
                 http_scheme = "https"
             source_url_obj.scheme =
source_url_obj.scheme.replace("influxdb2", http_scheme) - if not influxdb_copy(str(source_url_obj), target_url, progress=True): - msg = "Data loading failed" - logger.error(msg) - raise OperationFailed(msg) + if influxdb_copy(str(source_url_obj), target_url, progress=True): + return True + else: + logger.error("Data loading failed or incomplete") + return False elif source_url_obj.scheme.startswith("mongodb"): if "+cdc" in source_url_obj.scheme: source_url_obj.scheme = source_url_obj.scheme.replace("+cdc", "") from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc - mongodb_relay_cdc(str(source_url_obj), target_url, progress=True) + return mongodb_relay_cdc(str(source_url_obj), target_url, progress=True) else: - mongodb_copy_generic( + return mongodb_copy_generic( str(source_url_obj), target_url, transformation=transformation, @@ -170,18 +172,21 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf else: raise NotImplementedError("Importing resource not implemented yet") + return False + def mongodb_copy_generic( source_url: str, target_url: str, transformation: t.Union[Path, None] = None, progress: bool = False ): from cratedb_toolkit.io.mongodb.api import mongodb_copy - if not mongodb_copy( + if mongodb_copy( source_url, target_url, transformation=transformation, progress=progress, ): - msg = "Data loading failed" - logger.error(msg) - raise OperationFailed(msg) + return True + else: + logger.error("Data loading failed or incomplete") + return False diff --git a/cratedb_toolkit/io/cli.py b/cratedb_toolkit/io/cli.py index 6d188be0..cd03961b 100644 --- a/cratedb_toolkit/io/cli.py +++ b/cratedb_toolkit/io/cli.py @@ -1,4 +1,5 @@ import logging +import sys from pathlib import Path import click @@ -85,4 +86,5 @@ def load_table( cluster = StandaloneCluster(address=address) else: raise NotImplementedError("Unable to select backend") - return cluster.load_table(resource=resource, target=target, transformation=transformation) + if not cluster.load_table(resource=resource, target=target, transformation=transformation): + sys.exit(2) diff --git a/cratedb_toolkit/io/mongodb/adapter.py b/cratedb_toolkit/io/mongodb/adapter.py index aeaa264c..9163321e 100644 --- a/cratedb_toolkit/io/mongodb/adapter.py +++ b/cratedb_toolkit/io/mongodb/adapter.py @@ -92,7 +92,7 @@ def setup(self): self._path = Path(self.address.uri.path) def get_collections(self) -> t.List[str]: - return list(glob.glob(str(self._path))) + return sorted(glob.glob(str(self._path))) def record_count(self, filter_=None) -> int: """ @@ -167,7 +167,7 @@ def setup(self): def get_collections(self) -> t.List[str]: database = self._mongodb_client.get_database(self.database_name) - return database.list_collection_names() + return sorted(database.list_collection_names()) def record_count(self, filter_=None) -> int: filter_ = filter_ or {} diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index b72781d1..de8c31d2 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -4,6 +4,7 @@ from pathlib import Path from boltons.urlutils import URL +from polars.exceptions import PanicException from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB @@ -130,7 +131,12 @@ def mongodb_copy(source_url, target_url, transformation: t.Union[Path, None] = N logger.info(f"Inquiring collections at {source_url}") mongodb_uri = URL(source_url) cratedb_uri = URL(target_url) - if 
Path(mongodb_uri.path).is_absolute() and mongodb_uri.path[-1] != "/":
+        # What the hack?
+        if (
+            mongodb_uri.scheme.startswith("mongodb")
+            and Path(mongodb_uri.path).is_absolute()
+            and mongodb_uri.path[-1] != "/"
+        ):
             mongodb_uri.path += "/"
         if cratedb_uri.path[-1] != "/":
             cratedb_uri.path += "/"
@@ -156,7 +162,7 @@ def mongodb_copy(source_url, target_url, transformation: t.Union[Path, None] = N
     for task in tasks:
         try:
             outcome_task = task.start()
-        except Exception:
+        except (Exception, PanicException):
             logger.exception("Task failed")
             outcome_task = False
         outcome = outcome and outcome_task

diff --git a/cratedb_toolkit/io/mongodb/copy.py b/cratedb_toolkit/io/mongodb/copy.py
index 99aadc54..ec894c0a 100644
--- a/cratedb_toolkit/io/mongodb/copy.py
+++ b/cratedb_toolkit/io/mongodb/copy.py
@@ -144,11 +144,14 @@ def start(self):
                     result = connection.execute(sa.text(operation.statement), operation.parameters)
                     result_size = result.rowcount
                     if result_size < 0:
-                        raise ValueError("Unable to insert one or more records")
+                        raise IOError("Unable to insert one or more records")
                     records_out += result_size
                     progress_bar.update(n=result_size)
                 except Exception as ex:
-                    logger_on_error(f"Executing operation failed: {ex}\nOperation:\n{operation}")
+                    logger_on_error(
+                        f"Executing operation failed: {ex}\n"
+                        f"Statement: {operation.statement}\nParameters: {str(operation.parameters)[:500]} [...]"
+                    )
                     if self.on_error == "raise":
                         raise
                     continue

diff --git a/tests/io/mongodb/test_copy.py b/tests/io/mongodb/test_copy.py
index e8da4494..36ac8c1c 100644
--- a/tests/io/mongodb/test_copy.py
+++ b/tests/io/mongodb/test_copy.py
@@ -1,5 +1,6 @@
 import json
 from copy import deepcopy
+from pathlib import Path
 from unittest import mock
 
 import pymongo
@@ -98,9 +99,37 @@ def test_mongodb_copy_server_collection_with_filter(caplog, cratedb, mongodb):
     assert results[0]["data"] == data_out[1]
 
 
-def test_mongodb_copy_filesystem_folder(caplog, cratedb, mongodb):
+def test_mongodb_copy_filesystem_folder_absolute(caplog, cratedb, mongodb):
     """
-    Verify MongoDB -> CrateDB data transfer for all files in a folder.
+    Verify MongoDB -> CrateDB data transfer for all files in a folder, with absolute addressing.
     """
+
+    # Reset two database tables.
+    cratedb.database.run_sql('DROP TABLE IF EXISTS testdrive."books-canonical";')
+    cratedb.database.run_sql('DROP TABLE IF EXISTS testdrive."books-relaxed";')
+
+    # Define source and target URLs.
+    path = Path("./tests/io/mongodb/*.ndjson").absolute()
+    fs_resource = f"file+bson://{path}"
+    cratedb_url = f"{cratedb.get_connection_url()}/testdrive"
+
+    # Run transfer command.
+    mongodb_copy(
+        fs_resource,
+        cratedb_url,
+    )
+
+    # Verify data in target database.
+    cratedb.database.refresh_table("testdrive.books-canonical")
+    cratedb.database.refresh_table("testdrive.books-relaxed")
+
+    assert cratedb.database.count_records("testdrive.books-canonical") == 4
+    assert cratedb.database.count_records("testdrive.books-relaxed") == 4
+
+
+def test_mongodb_copy_filesystem_folder_relative(caplog, cratedb, mongodb):
+    """
+    Verify MongoDB -> CrateDB data transfer for all files in a folder, with relative addressing.
     """
 
     # Reset two database tables.

From 05fa39749143dc1ec37ed3207bc9f08107f7940e Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Thu, 12 Sep 2024 16:58:02 +0200
Subject: [PATCH 06/10] MongoDB: Improve dispatching of server- vs.
file-based processing --- cratedb_toolkit/api/main.py | 52 ++++++++----------------------------- 1 file changed, 11 insertions(+), 41 deletions(-) diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py index 5fb54898..db516915 100644 --- a/cratedb_toolkit/api/main.py +++ b/cratedb_toolkit/api/main.py @@ -125,24 +125,6 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf logger.error("Data loading failed or incomplete") return False - elif source_url_obj.scheme.startswith("file"): - if "+bson" in source_url_obj.scheme or "+mongodb" in source_url_obj.scheme: - return mongodb_copy_generic( - str(source_url_obj), - target_url, - transformation=transformation, - progress=True, - ) - - elif source_url_obj.scheme.startswith("http"): - if "+bson" in source_url_obj.scheme or "+mongodb" in source_url_obj.scheme: - return mongodb_copy_generic( - str(source_url_obj), - target_url, - transformation=transformation, - progress=True, - ) - elif source_url_obj.scheme.startswith("influxdb"): from cratedb_toolkit.io.influxdb import influxdb_copy @@ -156,37 +138,25 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf logger.error("Data loading failed or incomplete") return False - elif source_url_obj.scheme.startswith("mongodb"): + elif source_url_obj.scheme in ["file+bson", "http+bson", "https+bson", "mongodb", "mongodb+srv"]: if "+cdc" in source_url_obj.scheme: source_url_obj.scheme = source_url_obj.scheme.replace("+cdc", "") from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc return mongodb_relay_cdc(str(source_url_obj), target_url, progress=True) else: - return mongodb_copy_generic( - str(source_url_obj), + from cratedb_toolkit.io.mongodb.api import mongodb_copy + + if mongodb_copy( + source_url_obj, target_url, transformation=transformation, progress=True, - ) + ): + return True + else: + logger.error("Data loading failed or incomplete") + return False + else: raise NotImplementedError("Importing resource not implemented yet") - - return False - - -def mongodb_copy_generic( - source_url: str, target_url: str, transformation: t.Union[Path, None] = None, progress: bool = False -): - from cratedb_toolkit.io.mongodb.api import mongodb_copy - - if mongodb_copy( - source_url, - target_url, - transformation=transformation, - progress=progress, - ): - return True - else: - logger.error("Data loading failed or incomplete") - return False From e129e26d1872fdfc13dd2999c5397d2f68a4b38b Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 12 Sep 2024 17:07:36 +0200 Subject: [PATCH 07/10] MongoDB: Avoid URL object <-> string conversions on a few spots --- cratedb_toolkit/io/mongodb/api.py | 22 +++++++++++++++------- cratedb_toolkit/io/mongodb/copy.py | 6 +++--- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index de8c31d2..b2017311 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -90,7 +90,12 @@ def mongodb_copy_migr8(source_url, target_url, transformation: Path = None, limi return True -def mongodb_copy(source_url, target_url, transformation: t.Union[Path, None] = None, progress: bool = False): +def mongodb_copy( + source_url: t.Union[str, URL], + target_url: t.Union[str, URL], + transformation: t.Union[Path, None] = None, + progress: bool = False, +): """ Transfer MongoDB collection using translator component. 
@@ -102,6 +107,9 @@ def mongodb_copy(source_url, target_url, transformation: t.Union[Path, None] = N logger.info(f"mongodb_copy. source={source_url}, target={target_url}") + source_url = URL(source_url) + target_url = URL(target_url) + # Optionally configure transformations. tm = None if transformation: @@ -110,9 +118,9 @@ def mongodb_copy(source_url, target_url, transformation: t.Union[Path, None] = N tasks = [] has_table = True - if "*" in source_url: + if "*" in source_url.path: has_table = False - mongodb_address = DatabaseAddress.from_string(source_url) + mongodb_address = DatabaseAddress(source_url) mongodb_uri, mongodb_collection_address = mongodb_address.decode() if mongodb_collection_address.table is None: has_table = False @@ -129,8 +137,8 @@ def mongodb_copy(source_url, target_url, transformation: t.Union[Path, None] = N ) else: logger.info(f"Inquiring collections at {source_url}") - mongodb_uri = URL(source_url) - cratedb_uri = URL(target_url) + mongodb_uri = source_url + cratedb_uri = target_url # What the hack? if ( mongodb_uri.scheme.startswith("mongodb") @@ -151,8 +159,8 @@ def mongodb_copy(source_url, target_url, transformation: t.Union[Path, None] = N cratedb_uri_effective = cratedb_uri.navigate(Path(collection_path).stem) tasks.append( MongoDBFullLoad( - mongodb_url=str(mongodb_uri_effective), - cratedb_url=str(cratedb_uri_effective), + mongodb_url=mongodb_uri_effective, + cratedb_url=cratedb_uri_effective, tm=tm, progress=progress, ) diff --git a/cratedb_toolkit/io/mongodb/copy.py b/cratedb_toolkit/io/mongodb/copy.py index ec894c0a..893a4282 100644 --- a/cratedb_toolkit/io/mongodb/copy.py +++ b/cratedb_toolkit/io/mongodb/copy.py @@ -69,8 +69,8 @@ class MongoDBFullLoad: def __init__( self, - mongodb_url: str, - cratedb_url: str, + mongodb_url: t.Union[str, URL], + cratedb_url: t.Union[str, URL], tm: t.Union[TransformationManager, None], on_error: t.Literal["ignore", "raise"] = "raise", progress: bool = False, @@ -83,7 +83,7 @@ def __init__( self.mongodb_adapter = mongodb_adapter_factory(self.mongodb_uri) # Decode database URL: CrateDB. 
- self.cratedb_address = DatabaseAddress.from_string(cratedb_url) + self.cratedb_address = DatabaseAddress(self.cratedb_uri) self.cratedb_sqlalchemy_url, self.cratedb_table_address = self.cratedb_address.decode() cratedb_table = self.cratedb_table_address.fullname From 9e5246486993ffc2537551dd5f0ecabc665835d7 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Fri, 13 Sep 2024 00:08:00 +0200 Subject: [PATCH 08/10] MongoDB: Improve URL computation when transferring whole databases --- cratedb_toolkit/io/mongodb/api.py | 42 ++++++++++++++--------------- cratedb_toolkit/model.py | 44 +++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 22 deletions(-) diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index b2017311..83a9f30f 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -11,7 +11,7 @@ from cratedb_toolkit.io.mongodb.copy import MongoDBFullLoad from cratedb_toolkit.io.mongodb.core import export, extract, translate from cratedb_toolkit.io.mongodb.transform import TransformationManager -from cratedb_toolkit.model import DatabaseAddress +from cratedb_toolkit.model import AddressPair, DatabaseAddress from cratedb_toolkit.util.cr8 import cr8_insert_json from cratedb_toolkit.util.database import DatabaseAdapter @@ -115,8 +115,7 @@ def mongodb_copy( if transformation: tm = TransformationManager(path=transformation) - tasks = [] - + # Check if source address URL includes a table name or not. has_table = True if "*" in source_url.path: has_table = False @@ -125,7 +124,12 @@ def mongodb_copy( if mongodb_collection_address.table is None: has_table = False - # Invoke `full-load` procedure. + # Build list of tasks. Either a single one when transferring a single + # collection into a table, or multiple ones when transferring multiple + # collections. + tasks = [] + + # `full-load` procedure, single collection. if has_table: tasks.append( MongoDBFullLoad( @@ -135,32 +139,26 @@ def mongodb_copy( progress=progress, ) ) + + # `full-load` procedure, multiple collections. else: logger.info(f"Inquiring collections at {source_url}") - mongodb_uri = source_url - cratedb_uri = target_url - # What the hack? 
- if ( - mongodb_uri.scheme.startswith("mongodb") - and Path(mongodb_uri.path).is_absolute() - and mongodb_uri.path[-1] != "/" - ): - mongodb_uri.path += "/" - if cratedb_uri.path[-1] != "/": - cratedb_uri.path += "/" - mongodb_query_parameters = mongodb_uri.query_params - mongodb_adapter = mongodb_adapter_factory(mongodb_uri) + address_pair_root = AddressPair(source_url=source_url, target_url=target_url) + + mongodb_adapter = mongodb_adapter_factory(address_pair_root.source_url) collections = mongodb_adapter.get_collections() logger.info(f"Discovered collections: {len(collections)}") logger.debug(f"Processing collections: {collections}") + for collection_path in collections: - mongodb_uri_effective = mongodb_uri.navigate(Path(collection_path).name) - mongodb_uri_effective.query_params = mongodb_query_parameters - cratedb_uri_effective = cratedb_uri.navigate(Path(collection_path).stem) + address_pair = address_pair_root.navigate( + source_path=Path(collection_path).name, + target_path=Path(collection_path).stem, + ) tasks.append( MongoDBFullLoad( - mongodb_url=mongodb_uri_effective, - cratedb_url=cratedb_uri_effective, + mongodb_url=address_pair.source_url, + cratedb_url=address_pair.target_url, tm=tm, progress=progress, ) diff --git a/cratedb_toolkit/model.py b/cratedb_toolkit/model.py index 66f443d7..2360ecd9 100644 --- a/cratedb_toolkit/model.py +++ b/cratedb_toolkit/model.py @@ -1,7 +1,10 @@ import dataclasses import typing as t from copy import deepcopy +from pathlib import Path +from attr import Factory +from attrs import define from boltons.urlutils import URL @@ -120,3 +123,44 @@ class InputOutputResource: url: str format: t.Optional[str] = None # noqa: A003 compression: t.Optional[str] = None + + +@define +class AddressPair: + """ + Manage two URL instances, specifically a pair of source/target URLs, + where target is mostly a CrateDB Server, while source is any. + """ + + source_url: URL + target_url: URL + + _source_url_query_parameters: t.Dict[str, t.Any] = Factory(dict) + _target_url_query_parameters: t.Dict[str, t.Any] = Factory(dict) + + __SERVER_SCHEMES__ = ["http", "https", "mongodb", "mongodb+srv"] + + def navigate(self, source_path: str, target_path: str) -> "AddressPair": + source_url_query_parameters = self.source_url.query_params + target_url_query_parameters = self.target_url.query_params + + source_url = URL(str(self.source_url)) + target_url = URL(str(self.target_url)) + + # Q: What the hack? + # A: It makes subsequent `.navigate()` operations work. 
+ if ( + source_url.scheme in self.__SERVER_SCHEMES__ + and Path(source_url.path).is_absolute() + and source_url.path[-1] != "/" + ): + source_url.path += "/" + if target_url.path[-1] != "/": + target_url.path += "/" + + source_url = source_url.navigate(f"./{source_path}") + source_url.query_params = source_url_query_parameters + target_url = target_url.navigate(f"./{target_path}") + target_url.query_params = target_url_query_parameters + + return AddressPair(source_url, target_url) From 2a5df04467b0578527695026838daaed61ad9749 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Fri, 13 Sep 2024 11:45:16 +0200 Subject: [PATCH 09/10] Model: Use standard `deepcopy` method to clone `boltons.urlutils.URL` --- cratedb_toolkit/model.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cratedb_toolkit/model.py b/cratedb_toolkit/model.py index 2360ecd9..e90d71b5 100644 --- a/cratedb_toolkit/model.py +++ b/cratedb_toolkit/model.py @@ -144,8 +144,8 @@ def navigate(self, source_path: str, target_path: str) -> "AddressPair": source_url_query_parameters = self.source_url.query_params target_url_query_parameters = self.target_url.query_params - source_url = URL(str(self.source_url)) - target_url = URL(str(self.target_url)) + source_url = deepcopy(self.source_url) + target_url = deepcopy(self.target_url) # Q: What the hack? # A: It makes subsequent `.navigate()` operations work. From 94b52a8c1432c4e21008401ed0dfa8f94f0a62cc Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Fri, 13 Sep 2024 12:09:43 +0200 Subject: [PATCH 10/10] Model: Improve implementation of `AddressPair.navigate()` - Do not use the fundamental `.navigate()` method, as it needs too many workarounds. - Do not store and copy query parameters, because the implementation does not use `.navigate()` any longer. - Manipulate the `.path` property directly instead, computing it using the canonical `urljoin` function. - Adjustments about missing trailing slashes still need to take place. --- cratedb_toolkit/model.py | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/cratedb_toolkit/model.py b/cratedb_toolkit/model.py index e90d71b5..1fe3a91a 100644 --- a/cratedb_toolkit/model.py +++ b/cratedb_toolkit/model.py @@ -1,7 +1,7 @@ import dataclasses import typing as t from copy import deepcopy -from pathlib import Path +from urllib.parse import urljoin from attr import Factory from attrs import define @@ -141,26 +141,21 @@ class AddressPair: __SERVER_SCHEMES__ = ["http", "https", "mongodb", "mongodb+srv"] def navigate(self, source_path: str, target_path: str) -> "AddressPair": - source_url_query_parameters = self.source_url.query_params - target_url_query_parameters = self.target_url.query_params - source_url = deepcopy(self.source_url) target_url = deepcopy(self.target_url) # Q: What the hack? - # A: It makes subsequent `.navigate()` operations work. - if ( - source_url.scheme in self.__SERVER_SCHEMES__ - and Path(source_url.path).is_absolute() - and source_url.path[-1] != "/" - ): + # A: Adjustments about missing trailing slashes, business as usual. + # It makes subsequent `.navigate()` operations work. + # Remark: It is not applicable for filesystem paths including wildcards, + # like `./datasets/*.ndjson`. In this case, `.navigate()` should + # strip the `*.ndjson` part, and replace it by the designated label. 
+ if source_url.scheme in self.__SERVER_SCHEMES__ and source_url.path[-1] != "/": source_url.path += "/" if target_url.path[-1] != "/": target_url.path += "/" - source_url = source_url.navigate(f"./{source_path}") - source_url.query_params = source_url_query_parameters - target_url = target_url.navigate(f"./{target_path}") - target_url.query_params = target_url_query_parameters + source_url.path = urljoin(source_url.path, source_path) + target_url.path = urljoin(target_url.path, target_path) return AddressPair(source_url, target_url)
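To make the trailing-slash adjustment in `AddressPair.navigate()` above more tangible, here is a small sketch of the `urljoin` semantics it relies on; the paths are made-up examples:

```python
from urllib.parse import urljoin

# Without a trailing slash, urljoin replaces the last path segment entirely.
print(urljoin("/testdrive", "demo1"))   # -> /demo1

# With a trailing slash, urljoin appends the new segment, which is the
# behavior navigate() needs for server-style database paths.
print(urljoin("/testdrive/", "demo1"))  # -> /testdrive/demo1

# For wildcard filesystem paths, the pattern itself is the last segment, so
# no slash is added and urljoin swaps the pattern for the designated label.
print(urljoin("./datasets/*.ndjson", "books-canonical"))  # -> ./datasets/books-canonical
```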