Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MongoDB Full: Refactor transformation subsystem to commons-codec #269

Merged
merged 2 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@


## Unreleased
- MongoDB Full: Refactor transformation subsystem to `commons-codec`
- MongoDB: Update to commons-codec v0.0.16

## 2024/09/16 v0.0.23
- MongoDB: Unlock processing multiple collections, either from server database,
Expand Down
4 changes: 2 additions & 2 deletions cratedb_toolkit/io/mongodb/cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import pymongo
import sqlalchemy as sa
from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB
from commons_codec.transform.mongodb import MongoDBCDCTranslator

from cratedb_toolkit.util import DatabaseAdapter

Expand All @@ -35,7 +35,7 @@
self.mongodb_client: pymongo.MongoClient = pymongo.MongoClient(mongodb_url)
self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection]
self.table_name = self.cratedb_adapter.quote_relation_name(cratedb_table)
self.cdc = MongoDBCDCTranslatorCrateDB(table_name=self.table_name)
self.cdc = MongoDBCDCTranslator(table_name=self.table_name)

Check warning on line 38 in cratedb_toolkit/io/mongodb/cdc.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/cdc.py#L38

Added line #L38 was not covered by tests

def start(self):
"""
Expand Down
49 changes: 3 additions & 46 deletions cratedb_toolkit/io/mongodb/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,13 @@

import sqlalchemy as sa
from boltons.urlutils import URL
from commons_codec.model import SQLOperation
from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB
from pymongo.cursor import Cursor
from commons_codec.transform.mongodb import MongoDBCrateDBConverter, MongoDBFullLoadTranslator
from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm
from zyp.model.collection import CollectionAddress

from cratedb_toolkit.io.core import BulkProcessor
from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory
from cratedb_toolkit.io.mongodb.export import CrateDBConverter
from cratedb_toolkit.io.mongodb.model import DocumentDict
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.sqlalchemy.patch import monkeypatch_executemany
Expand All @@ -23,45 +19,6 @@
logger = logging.getLogger(__name__)


class MongoDBFullLoadTranslator(MongoDBCDCTranslatorCrateDB):
"""
Translate a MongoDB document into a CrateDB document.
"""

def __init__(self, table_name: str, converter: CrateDBConverter, tm: TransformationManager = None):
super().__init__(table_name=table_name)
self.converter = converter
self.tm = tm

@staticmethod
def get_document_key(record: t.Dict[str, t.Any]) -> str:
"""
Return value of document key (MongoDB document OID) from CDC record.

"documentKey": {"_id": ObjectId("669683c2b0750b2c84893f3e")}
"""
return record["_id"]

def to_sql(self, data: t.Union[DocumentDict, t.List[DocumentDict]]) -> SQLOperation:
"""
Produce CrateDB SQL INSERT batch operation from multiple MongoDB documents.
"""
if not isinstance(data, Cursor) and not isinstance(data, list):
data = [data]

# Define SQL INSERT statement.
sql = f"INSERT INTO {self.table_name} ({self.ID_COLUMN}, {self.DATA_COLUMN}) VALUES (:oid, :record);"

# Converge multiple MongoDB documents into SQL parameters for `executemany` operation.
parameters: t.List[DocumentDict] = []
for document in data:
record = self.converter.convert(self.decode_bson(document))
oid: str = self.get_document_key(record)
parameters.append({"oid": oid, "record": record})

return SQLOperation(sql, parameters)


class MongoDBFullLoad:
"""
Copy MongoDB collection into CrateDB table.
Expand Down Expand Up @@ -102,8 +59,8 @@ def __init__(
transformation = tm.project.get(address=address)
except KeyError:
pass
self.converter = CrateDBConverter(transformation=transformation)
self.translator = MongoDBFullLoadTranslator(table_name=self.cratedb_table, converter=self.converter, tm=tm)
self.converter = MongoDBCrateDBConverter(transformation=transformation)
self.translator = MongoDBFullLoadTranslator(table_name=self.cratedb_table, converter=self.converter)

self.on_error = on_error
self.progress = progress
Expand Down
81 changes: 2 additions & 79 deletions cratedb_toolkit/io/mongodb/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,102 +24,25 @@
Export the documents from a MongoDB collection as JSON, to be ingested into CrateDB.
"""

import base64
import calendar
import logging
import typing as t
from uuid import UUID

import bsonjs
import dateutil.parser as dateparser
import orjson as json
import pymongo.collection
from attr import Factory
from attrs import define
from zyp.model.collection import CollectionTransformation
from commons_codec.transform.mongodb import MongoDBCrateDBConverter

from cratedb_toolkit.io.mongodb.model import DocumentDict
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.io.mongodb.util import sanitize_field_names

logger = logging.getLogger(__name__)


def date_converter(value):
if isinstance(value, int):
return value
dt = dateparser.parse(value)
return calendar.timegm(dt.utctimetuple()) * 1000


def timestamp_converter(value):
if len(str(value)) <= 10:
return value * 1000
return value


type_converter = {
"date": date_converter,
"timestamp": timestamp_converter,
"undefined": lambda x: None,
}


@define
class CrateDBConverter:
transformation: CollectionTransformation = Factory(CollectionTransformation)

def convert(self, data: DocumentDict) -> t.Dict[str, t.Any]:
"""
Decode MongoDB Extended JSON, considering CrateDB specifics.
"""
return self.extract_value(data)

def extract_value(self, value: t.Any, parent_type: t.Optional[str] = None) -> t.Any:
"""
Decode MongoDB Extended JSON.

- https://www.mongodb.com/docs/manual/reference/mongodb-extended-json-v1/
- https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/
"""
if isinstance(value, dict):
# Custom adjustments to compensate shape anomalies in source data.
self.apply_special_treatments(value)
if len(value) == 1:
if "$binary" in value and value["$binary"]["subType"] in ["03", "04"]:
decoded = str(UUID(bytes=base64.b64decode(value["$binary"]["base64"])))
return self.extract_value(decoded, parent_type)
for k, v in value.items():
if k.startswith("$"):
return self.extract_value(v, k.lstrip("$"))
return {k.lstrip("$"): self.extract_value(v, parent_type) for (k, v) in value.items()}
if isinstance(value, list):
return [self.extract_value(v, parent_type) for v in value]
if parent_type:
converter = type_converter.get(parent_type)
if converter:
return converter(value)
return value

def apply_special_treatments(self, value: t.Any):
"""
Apply special treatments to value that can't be described otherwise up until now.
# Ignore certain items including anomalies that are not resolved, yet.

TODO: Needs an integration test feeding two records instead of just one.
"""

if self.transformation is None or self.transformation.treatment is None:
return None

return self.transformation.treatment.apply(value)


def convert(d):
"""
Decode MongoDB Extended JSON, considering CrateDB specifics.
"""
converter = CrateDBConverter()
converter = MongoDBCrateDBConverter()

Check warning on line 45 in cratedb_toolkit/io/mongodb/export.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/export.py#L45

Added line #L45 was not covered by tests
newdict = {}
for k, v in sanitize_field_names(d).items():
newdict[k] = converter.convert(v)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ kinesis = [
"lorrystream[carabas]>=0.0.6",
]
mongodb = [
"commons-codec[mongodb,zyp]>=0.0.15",
"commons-codec[mongodb,zyp]>=0.0.16",
"cratedb-toolkit[io]",
"orjson<4,>=3.3.1",
"pymongo<5,>=3.10.1",
Expand Down
4 changes: 1 addition & 3 deletions tests/io/mongodb/test_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,7 @@ def test_mongodb_copy_filesystem_json_canonical(caplog, cratedb):
"SELECT pg_typeof(data['publishedDate']) AS type FROM testdrive.demo;", records=True
)
timestamp_type = type_result[0]["type"]

# FIXME: Why does the "canonical format" yield worse results?
assert timestamp_type == "text"
assert timestamp_type == "bigint"
Comment on lines -241 to +240
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent. 🍀



def test_mongodb_copy_filesystem_bson(caplog, cratedb):
Expand Down
107 changes: 0 additions & 107 deletions tests/io/mongodb/test_export.py

This file was deleted.