Skip to content

Commit

Permalink
MongoDB: Improve error handling wrt. bulk operations vs. usability
Browse files Browse the repository at this point in the history
In order to have both efficient bulk insert operations and on-the-spot
error messages for records that fail to insert, let's introduce a
two-stage approach:

First, try to insert a batch. When it fails, determine invalid records
and insert them one-by-one, in order to relay corresponding error
messages to the user.
  • Loading branch information
amotl committed Sep 13, 2024
1 parent a25127e commit 81e75f6
Show file tree
Hide file tree
Showing 9 changed files with 251 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
or from filesystem directory
- MongoDB: Unlock processing JSON files from HTTP resource, using `https+bson://`
- MongoDB: Optionally filter server collection using MongoDB query expression
- MongoDB: Improve error handling wrt. bulk operations vs. usability

## 2024/09/10 v0.0.22
- MongoDB: Rename columns with leading underscores to use double leading underscores
Expand Down
180 changes: 180 additions & 0 deletions cratedb_toolkit/io/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
# TODO: Maybe refactor to `sqlalchemy-cratedb` or `commons-codec` on another iteration?
import typing as t
from functools import cached_property

import sqlalchemy as sa
from attr import Factory
from attrs import define
from commons_codec.model import SQLOperation
from pympler.asizeof import asizeof
from sqlalchemy.exc import ProgrammingError
from tqdm import tqdm

from cratedb_toolkit.util.database import logger


# Shape of a single CrateDB bulk request response item.
# `rowcount` is the number of affected rows for that parameter set;
# CrateDB uses `rowcount=-2` to signal a failed record (see `BulkResponse.failed_records`).
BulkResultItem = t.TypedDict("BulkResultItem", {"rowcount": int})


@define
class BulkResponse:
    """
    Manage CrateDB bulk request responses.

    Accepts a list of bulk arguments (parameter list) and a list of bulk response items.
    https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations

    TODO: Think about refactoring this to `sqlalchemy_cratedb.support`.
    """

    # Parameter list submitted with the bulk request; `None` when unavailable.
    parameters: t.Union[t.List[t.Dict[str, t.Any]], None]
    # Per-record response items returned by CrateDB; `None` when unavailable.
    cratedb_bulk_result: t.Union[t.List[BulkResultItem], None]

    @cached_property
    def failed_records(self) -> t.List[t.Dict[str, t.Any]]:
        """
        Compute list of failed records.

        CrateDB signals failed insert using `rowcount=-2`.
        https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling
        """
        if self.parameters is None or self.cratedb_bulk_result is None:
            return []
        return [
            record
            for record, item in zip(self.parameters, self.cratedb_bulk_result)
            if item["rowcount"] == -2
        ]

    @cached_property
    def parameter_count(self) -> int:
        """
        Compute bulk size / length of parameter list.
        """
        return len(self.parameters) if self.parameters else 0

    @cached_property
    def success_count(self) -> int:
        """
        Compute number of succeeding records within a batch.
        """
        return self.parameter_count - self.failed_count

    @cached_property
    def failed_count(self) -> int:
        """
        Compute number of failed records within a batch.
        """
        return len(self.failed_records)


@define
class BulkMetrics:
    """
    Manage a few details for a `BulkProcessor` task.
    """

    # Total number of records inserted successfully.
    count_success_total: int = 0
    # Total number of records that failed to insert.
    count_error_total: int = 0
    # Approximate number of bytes submitted for writing (measured via `asizeof`).
    bytes_write_total: int = 0
    # Approximate number of bytes attributed to failed records.
    bytes_error_total: int = 0
    # NOTE(review): `rate_current` / `rate_max` are never updated within this file —
    # presumably reserved for throughput tracking; confirm against callers.
    rate_current: int = 0
    rate_max: int = 0


@define
class BulkProcessor:
    """
    Generic driver to run a bulk operation against CrateDB, which can fall back to record-by-record operation.

    It aims to provide a combination of both performance/efficiency by using bulk operations,
    and also good usability and on-the-spot error messages for records that fail to insert.

    Background: This is a canonical client-side API wrapper for CrateDB's bulk operations HTTP endpoint.
    https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
    """

    # Database connection used for submitting SQL operations.
    connection: sa.Connection
    # Source of record batches; each batch is a list of dictionaries.
    data: t.Iterable[t.List[t.Dict[str, t.Any]]]
    # Converts a batch of records into a single SQL operation (statement + bulk parameters).
    batch_to_operation: t.Callable[[t.List[t.Dict[str, t.Any]]], SQLOperation]
    # Optional progress bar reflecting processing progress.
    progress_bar: t.Union[tqdm, None] = None
    # Error policy: "ignore" logs and continues, "raise" propagates the exception.
    on_error: t.Literal["ignore", "raise"] = "ignore"
    # When true, errors are logged with full tracebacks.
    debug: bool = False

    _metrics: BulkMetrics = Factory(BulkMetrics)

    @cached_property
    def log_level(self):
        """
        Return the logger callable used for reporting errors.
        `logger.exception` includes the traceback, so it is used in debug mode only.
        """
        if self.debug:
            return logger.exception
        else:
            return logger.warning

    def start(self) -> BulkMetrics:
        """
        Process all batches from `self.data` and return accumulated metrics.

        Two-stage strategy: Submit each batch as a single bulk operation first.
        When the bulk response signals failed records, re-submit those records
        one-by-one, in order to relay proper error messages to the user.
        """
        # Acquire batches of documents, convert to SQL operations, and submit to CrateDB.
        for batch in self.data:
            current_batch_size = len(batch)

            self.progress_bar and self.progress_bar.set_description("ACQUIRE")

            try:
                operation = self.batch_to_operation(batch)
            except Exception as ex:
                self.log_level(f"Computing query failed: {ex}")
                if self.on_error == "raise":
                    raise
                continue

            self._metrics.bytes_write_total += asizeof(operation)
            statement = sa.text(operation.statement)

            # Submit operation to CrateDB, using `bulk_args`.
            self.progress_bar and self.progress_bar.set_description("SUBMIT ")
            try:
                cursor = self.connection.execute(statement=statement, parameters=operation.parameters)
                self.connection.commit()
                # `last_executemany_result` is provided by the monkeypatched
                # `do_executemany` (see `cratedb_toolkit.sqlalchemy.patch`);
                # it is absent when the patch is not active.
                cratedb_bulk_result = getattr(cursor.context, "last_executemany_result", None)
                bulk_response = BulkResponse(operation.parameters, cratedb_bulk_result)
                failed_records = bulk_response.failed_records
                count_success_local = bulk_response.success_count
                self._metrics.count_success_total += bulk_response.success_count
                self.progress_bar and self.progress_bar.update(n=bulk_response.success_count)

            # When a batch is of size one, an exception is raised.
            # Just signal the same condition as if a batch would have failed.
            except ProgrammingError:
                # Roll back the failed transaction, so the connection can be
                # used again for the per-record fallback below. Without this,
                # SQLAlchemy 2.x raises `PendingRollbackError` on the next execute.
                self.connection.rollback()
                failed_records = [operation.parameters]
                count_success_local = 0

            # When bulk operations fail, try inserting failed records record-by-record,
            # in order to relay proper error messages to the user.
            if failed_records:
                logger.warning(
                    f"Incomplete batch. Records processed: {count_success_local}/{current_batch_size}. "
                    f"Falling back to per-record operations."
                )
                for record in failed_records:
                    try:
                        self.connection.execute(statement=statement, parameters=record)
                        self.connection.commit()
                        self._metrics.count_success_total += 1
                    except Exception as ex:
                        # Roll back the failed transaction, so subsequent operations
                        # on this connection do not fail with `PendingRollbackError`.
                        self.connection.rollback()
                        logger.warning(f"Operation failed: {ex}")
                        logger.debug(f"Failing record: {record}")
                        self._metrics.count_error_total += 1
                        self._metrics.bytes_error_total += asizeof(record)
                        if self.on_error == "raise":
                            raise
                    self.progress_bar and self.progress_bar.update(n=1)

        self.progress_bar and self.progress_bar.close()

        return self._metrics
4 changes: 2 additions & 2 deletions cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ def mongodb_copy(
for task in tasks:
try:
outcome_task = task.start()
except (Exception, PanicException):
logger.exception("Task failed")
except (Exception, PanicException) as ex:
logger.error(f"Task failed: {ex}")
outcome_task = False
outcome = outcome and outcome_task

Expand Down
67 changes: 24 additions & 43 deletions cratedb_toolkit/io/mongodb/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
from tqdm.contrib.logging import logging_redirect_tqdm
from zyp.model.collection import CollectionAddress

from cratedb_toolkit.io.core import BulkProcessor
from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory
from cratedb_toolkit.io.mongodb.export import CrateDBConverter
from cratedb_toolkit.io.mongodb.model import DocumentDict
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.sqlalchemy.patch import monkeypatch_executemany
from cratedb_toolkit.util import DatabaseAdapter

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -44,9 +46,7 @@ def to_sql(self, data: t.Union[DocumentDict, t.List[DocumentDict]]) -> SQLOperat
"""
Produce CrateDB SQL INSERT batch operation from multiple MongoDB documents.
"""
if isinstance(data, Cursor):
data = list(data)
if not isinstance(data, list):
if not isinstance(data, Cursor) and not isinstance(data, list):
data = [data]

# Define SQL INSERT statement.
Expand All @@ -72,10 +72,12 @@ def __init__(
mongodb_url: t.Union[str, URL],
cratedb_url: t.Union[str, URL],
tm: t.Union[TransformationManager, None],
on_error: t.Literal["ignore", "raise"] = "raise",
on_error: t.Literal["ignore", "raise"] = "ignore",
progress: bool = False,
debug: bool = True,
):
monkeypatch_executemany()

self.mongodb_uri = URL(mongodb_url)
self.cratedb_uri = URL(cratedb_url)

Expand Down Expand Up @@ -114,51 +116,30 @@ def start(self):
logger.info(f"Starting MongoDBFullLoad. source={self.mongodb_uri}, target={self.cratedb_uri}")
records_in = self.mongodb_adapter.record_count()
logger.info(f"Source: MongoDB {self.mongodb_adapter.address} count={records_in}")
logger_on_error = logger.warning
if self.debug:
logger_on_error = logger.exception
with self.cratedb_adapter.engine.connect() as connection, logging_redirect_tqdm():
if not self.cratedb_adapter.table_exists(self.cratedb_table):
connection.execute(sa.text(self.translator.sql_ddl))
connection.commit()
records_target = self.cratedb_adapter.count_records(self.cratedb_table)
logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}")
progress_bar = tqdm(total=records_in)
records_out: int = 0

# Acquire batches of documents, convert to SQL operations, and submit to CrateDB.
for documents in self.mongodb_adapter.query():
progress_bar.set_description("ACQUIRE")

try:
operation = self.translator.to_sql(documents)
except Exception as ex:
logger_on_error(f"Computing query failed: {ex}")
if self.on_error == "raise":
raise
continue

# Submit operation to CrateDB.
progress_bar.set_description("SUBMIT ")
try:
result = connection.execute(sa.text(operation.statement), operation.parameters)
result_size = result.rowcount
if result_size < 0:
raise IOError("Unable to insert one or more records")
records_out += result_size
progress_bar.update(n=result_size)
except Exception as ex:
logger_on_error(
f"Executing operation failed: {ex}\n"
f"Statement: {operation.statement}\nParameters: {str(operation.parameters)[:500]} [...]"
)
if self.on_error == "raise":
raise
continue

progress_bar.close()
connection.commit()
logger.info(f"Number of records written: {records_out}")
if records_out == 0:

processor = BulkProcessor(
connection=connection,
data=self.mongodb_adapter.query(),
batch_to_operation=self.translator.to_sql,
progress_bar=progress_bar,
on_error=self.on_error,
debug=self.debug,
)
metrics = processor.start()
logger.info(f"Bulk processor metrics: {metrics}")

logger.info(
"Number of records written: "
f"success={metrics.count_success_total}, error={metrics.count_error_total}"
)
if metrics.count_success_total == 0:
logger.warning("No data has been copied")

return True
Empty file.
19 changes: 19 additions & 0 deletions cratedb_toolkit/sqlalchemy/patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from sqlalchemy_cratedb import dialect


def do_executemany(self, cursor, statement, parameters, context=None):
    """
    Variant of `do_executemany` that records the DBAPI response on the execution context.

    Stashing the return value of `cursor.executemany()` on `context` makes it
    available to callers as `context.last_executemany_result`.

    TODO: Refactor this to `sqlalchemy_cratedb.CrateDialect`.
    """
    bulk_result = cursor.executemany(statement, parameters)
    if context is None:
        return
    context.last_executemany_result = bulk_result


def monkeypatch_executemany():
    """
    Activate the improved `do_executemany` implementation on the CrateDB dialect.

    Idempotent: assigning the same function repeatedly has no further effect.
    """
    setattr(dialect, "do_executemany", do_executemany)
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ dependencies = [
'importlib-metadata; python_version < "3.8"',
'importlib-resources; python_version < "3.9"',
"polars<1.7",
"pympler<1.2",
"python-dateutil<3",
"python-dotenv<2",
"python-slugify<9",
Expand Down
3 changes: 3 additions & 0 deletions tests/io/mongodb/mixed.ndjson
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"_id":1,"name":"Foo","date":{"$date":"2011-01-14T08:00:00Z"}}
{"_id":2,"name":"Bar","date":{"$date":"2011-01-15T08:00:00Z"},"nested_array":[[1,2]]}
{"_id":3,"name":"Baz","date":{"$date":"2011-01-16T08:00:00Z"}}
22 changes: 21 additions & 1 deletion tests/io/mongodb/test_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def test_mongodb_copy_filesystem_folder_relative(caplog, cratedb, mongodb):
assert cratedb.database.count_records("testdrive.books-relaxed") == 4


def test_mongodb_copy_filesystem_json_relaxed(caplog, cratedb):
def test_mongodb_copy_filesystem_json_relaxed_success(caplog, cratedb):
"""
Verify MongoDB Extended JSON -> CrateDB data transfer.
"""
Expand Down Expand Up @@ -187,6 +187,26 @@ def test_mongodb_copy_filesystem_json_relaxed(caplog, cratedb):
assert timestamp_type == "bigint"


def test_mongodb_copy_filesystem_json_relaxed_warning(caplog, cratedb):
    """
    Verify MongoDB Extended JSON -> CrateDB data transfer, which should emit a warning on an invalid record.
    """

    # Define source and target URLs.
    json_resource = "file+bson:./tests/io/mongodb/mixed.ndjson"
    cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

    # Run transfer command.
    mongodb_copy(json_resource, cratedb_url)

    # Verify metadata in target database.
    # `mixed.ndjson` contains three records; the one with a nested array fails,
    # so only two records arrive in the target table.
    assert cratedb.database.table_exists("testdrive.demo") is True
    assert cratedb.database.refresh_table("testdrive.demo") is True
    assert cratedb.database.count_records("testdrive.demo") == 2

    assert "Dynamic nested arrays are not supported" in caplog.text


def test_mongodb_copy_filesystem_json_canonical(caplog, cratedb):
"""
Verify MongoDB Extended JSON -> CrateDB data transfer.
Expand Down

0 comments on commit 81e75f6

Please sign in to comment.