Skip to content

Commit

Permalink
MongoDB: Cleanups. Tests. Hacks. This and that.
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Sep 12, 2024
1 parent 8b9f508 commit b35440f
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 25 deletions.
37 changes: 21 additions & 16 deletions cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,15 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf
if source_url_obj.scheme.startswith("dynamodb"):
from cratedb_toolkit.io.dynamodb.api import dynamodb_copy

if not dynamodb_copy(str(source_url_obj), target_url, progress=True):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)
if dynamodb_copy(str(source_url_obj), target_url, progress=True):
return True

Check warning on line 123 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L122-L123

Added lines #L122 - L123 were not covered by tests
else:
logger.error("Data loading failed or incomplete")
return False

Check warning on line 126 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L125-L126

Added lines #L125 - L126 were not covered by tests

elif source_url_obj.scheme.startswith("file"):
if "+bson" in source_url_obj.scheme or "+mongodb" in source_url_obj.scheme:
mongodb_copy_generic(
return mongodb_copy_generic(

Check warning on line 130 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L130

Added line #L130 was not covered by tests
str(source_url_obj),
target_url,
transformation=transformation,
Expand All @@ -135,7 +136,7 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf

elif source_url_obj.scheme.startswith("http"):
if "+bson" in source_url_obj.scheme or "+mongodb" in source_url_obj.scheme:
mongodb_copy_generic(
return mongodb_copy_generic(

Check warning on line 139 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L138-L139

Added lines #L138 - L139 were not covered by tests
str(source_url_obj),
target_url,
transformation=transformation,
Expand All @@ -149,19 +150,20 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf
if asbool(source_url_obj.query_params.get("ssl")):
http_scheme = "https"
source_url_obj.scheme = source_url_obj.scheme.replace("influxdb2", http_scheme)
if not influxdb_copy(str(source_url_obj), target_url, progress=True):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)
if influxdb_copy(str(source_url_obj), target_url, progress=True):
return True

Check warning on line 154 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L153-L154

Added lines #L153 - L154 were not covered by tests
else:
logger.error("Data loading failed or incomplete")
return False

Check warning on line 157 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L156-L157

Added lines #L156 - L157 were not covered by tests

elif source_url_obj.scheme.startswith("mongodb"):
if "+cdc" in source_url_obj.scheme:
source_url_obj.scheme = source_url_obj.scheme.replace("+cdc", "")
from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc

mongodb_relay_cdc(str(source_url_obj), target_url, progress=True)
return mongodb_relay_cdc(str(source_url_obj), target_url, progress=True)

Check warning on line 164 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L164

Added line #L164 was not covered by tests
else:
mongodb_copy_generic(
return mongodb_copy_generic(
str(source_url_obj),
target_url,
transformation=transformation,
Expand All @@ -170,18 +172,21 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf
else:
raise NotImplementedError("Importing resource not implemented yet")

return False

Check warning on line 175 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L175

Added line #L175 was not covered by tests


def mongodb_copy_generic(
source_url: str, target_url: str, transformation: t.Union[Path, None] = None, progress: bool = False
):
from cratedb_toolkit.io.mongodb.api import mongodb_copy

if not mongodb_copy(
if mongodb_copy(
source_url,
target_url,
transformation=transformation,
progress=progress,
):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)
return True
else:
logger.error("Data loading failed or incomplete")
return False

Check warning on line 192 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L191-L192

Added lines #L191 - L192 were not covered by tests
4 changes: 3 additions & 1 deletion cratedb_toolkit/io/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import sys
from pathlib import Path

import click
Expand Down Expand Up @@ -85,4 +86,5 @@ def load_table(
cluster = StandaloneCluster(address=address)
else:
raise NotImplementedError("Unable to select backend")
return cluster.load_table(resource=resource, target=target, transformation=transformation)
if not cluster.load_table(resource=resource, target=target, transformation=transformation):
sys.exit(2)

Check warning on line 90 in cratedb_toolkit/io/cli.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/cli.py#L90

Added line #L90 was not covered by tests
4 changes: 2 additions & 2 deletions cratedb_toolkit/io/mongodb/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def setup(self):
self._path = Path(self.address.uri.path)

def get_collections(self) -> t.List[str]:
return list(glob.glob(str(self._path)))
return sorted(glob.glob(str(self._path)))

def record_count(self, filter_=None) -> int:
"""
Expand Down Expand Up @@ -167,7 +167,7 @@ def setup(self):

def get_collections(self) -> t.List[str]:
database = self._mongodb_client.get_database(self.database_name)
return database.list_collection_names()
return sorted(database.list_collection_names())

def record_count(self, filter_=None) -> int:
filter_ = filter_ or {}
Expand Down
10 changes: 8 additions & 2 deletions cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pathlib import Path

from boltons.urlutils import URL
from polars.exceptions import PanicException

from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory
from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB
Expand Down Expand Up @@ -130,7 +131,12 @@ def mongodb_copy(source_url, target_url, transformation: t.Union[Path, None] = N
logger.info(f"Inquiring collections at {source_url}")
mongodb_uri = URL(source_url)
cratedb_uri = URL(target_url)
if Path(mongodb_uri.path).is_absolute() and mongodb_uri.path[-1] != "/":
# What the hack?
if (
mongodb_uri.scheme.startswith("mongodb")
and Path(mongodb_uri.path).is_absolute()
and mongodb_uri.path[-1] != "/"
):
mongodb_uri.path += "/"
if cratedb_uri.path[-1] != "/":
cratedb_uri.path += "/"
Expand All @@ -156,7 +162,7 @@ def mongodb_copy(source_url, target_url, transformation: t.Union[Path, None] = N
for task in tasks:
try:
outcome_task = task.start()
except Exception:
except (Exception, PanicException):
logger.exception("Task failed")
outcome_task = False

Check warning on line 167 in cratedb_toolkit/io/mongodb/api.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/api.py#L165-L167

Added lines #L165 - L167 were not covered by tests
outcome = outcome and outcome_task
Expand Down
7 changes: 5 additions & 2 deletions cratedb_toolkit/io/mongodb/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,14 @@ def start(self):
result = connection.execute(sa.text(operation.statement), operation.parameters)
result_size = result.rowcount
if result_size < 0:
raise ValueError("Unable to insert one or more records")
raise IOError("Unable to insert one or more records")

Check warning on line 147 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L147

Added line #L147 was not covered by tests
records_out += result_size
progress_bar.update(n=result_size)
except Exception as ex:
logger_on_error(f"Executing operation failed: {ex}\nOperation:\n{operation}")
logger_on_error(

Check warning on line 151 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L151

Added line #L151 was not covered by tests
f"Executing operation failed: {ex}\n"
f"Statement: {operation.statement}\nParameters: {str(operation.parameters)[:500]} [...]"
)
if self.on_error == "raise":
raise
continue
Expand Down
33 changes: 31 additions & 2 deletions tests/io/mongodb/test_copy.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
from copy import deepcopy
from pathlib import Path
from unittest import mock

import pymongo
Expand Down Expand Up @@ -98,9 +99,37 @@ def test_mongodb_copy_server_collection_with_filter(caplog, cratedb, mongodb):
assert results[0]["data"] == data_out[1]


def test_mongodb_copy_filesystem_folder(caplog, cratedb, mongodb):
def test_mongodb_copy_filesystem_folder_absolute(caplog, cratedb, mongodb):
"""
Verify MongoDB -> CrateDB data transfer for all files in a folder.
Verify MongoDB -> CrateDB data transfer for all files in a folder, with relative addressing.
"""

# Reset two database tables.
cratedb.database.run_sql('DROP TABLE IF EXISTS testdrive."books-canonical";')
cratedb.database.run_sql('DROP TABLE IF EXISTS testdrive."books-relaxed";')

# Define source and target URLs.
path = Path("./tests/io/mongodb/*.ndjson").absolute()
fs_resource = f"file+bson://{path}"
cratedb_url = f"{cratedb.get_connection_url()}/testdrive"

# Run transfer command.
mongodb_copy(
fs_resource,
cratedb_url,
)

# Verify data in target database.
cratedb.database.refresh_table("testdrive.books-canonical")
cratedb.database.refresh_table("testdrive.books-relaxed")

assert cratedb.database.count_records("testdrive.books-canonical") == 4
assert cratedb.database.count_records("testdrive.books-relaxed") == 4


def test_mongodb_copy_filesystem_folder_relative(caplog, cratedb, mongodb):
"""
Verify MongoDB -> CrateDB data transfer for all files in a folder, with relative addressing.
"""

# Reset two database tables.
Expand Down

0 comments on commit b35440f

Please sign in to comment.