Skip to content

Commit

Permalink
Fix conversion slowdown (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
BTheunissen authored Nov 21, 2023
1 parent f335db7 commit 2201d4e
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 135 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "shaped-target-clickhouse"
version = "0.1.7"
version = "0.1.8"
description = "`target-clickhouse` is a Singer target for clickhouse, built with the Meltano Singer SDK."
readme = "README.md"
authors = ["Ben Theunissen"]
Expand Down
82 changes: 38 additions & 44 deletions target_clickhouse/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@

from __future__ import annotations

import logging
from typing import Any, Iterable

import jsonschema.exceptions as jsonschema_exceptions
import simplejson as json
import sqlalchemy
from jsonschema import ValidationError
from pendulum import now
from singer_sdk.sinks import SQLSink
from sqlalchemy.sql.expression import bindparam
Expand Down Expand Up @@ -137,57 +135,53 @@ def activate_version(self, new_version: int) -> None:
with self.connector._connect() as conn, conn.begin(): # noqa: SLF001
conn.execute(query)


def _validate_and_parse(self, record: dict) -> dict:
"""Validate or repair the record, parsing to python-native types as needed.
"""Pre-validate and repair records for string type mismatches, then validate.
Args:
record: Individual record in the stream.
Returns:
Validated record.
"""
validation_error = True
while validation_error:
try:
self._validator.validate(record)
self._parse_timestamps_in_record(
record=record,
schema=self.schema,
treatment=self.datetime_error_treatment,
)
validation_error = False
except jsonschema_exceptions.ValidationError as e:
record = handle_validation_error(record, e, self.logger)
# Pre-validate and correct string type mismatches.
record = self._pre_validate_for_string_type(record)

try:
self._validator.validate(record)
self._parse_timestamps_in_record(
record=record,
schema=self.schema,
treatment=self.datetime_error_treatment,
)
except jsonschema_exceptions.ValidationError as e:
if self.logger:
self.logger.exception(f"Record failed validation: {record}")
raise e # noqa: RERAISES

return record

def _pre_validate_for_string_type(self, record: dict) -> dict:
"""Pre-validate record for string type mismatches and correct them.
def handle_validation_error(record,
e: ValidationError,
logger: logging.Logger | None = None):
if "'string'" in e.message:
if logger:
logger.debug(
f"Received non valid record for types 'string', {e.path}, "
f"attempting conversion for record, {record}",
)
# Get the parent key path to the problematic value.
record = record.copy()
parent_key = e.path[0]
problem_value = record[parent_key]

# Convert the problematic value to string only if it's not null.
if problem_value is not None:
if isinstance(problem_value, (dict, list)):
# Convert the dict to JSON string.
record[parent_key] = json.dumps(problem_value)
else:
# Convert the value to string.
record[parent_key] = str(problem_value)

if logger:
logger.debug(f"Validating converted record at parent key: {parent_key}")
return record
return None
return None
Args:
record: Individual record in the stream.
Returns:
Record with corrected string type mismatches.
"""
for key, value in record.items():
# Checking if the schema expects a string for this key.
expected_type = self.schema.get("properties", {}).get(key, {}).get("type")
if expected_type == "string" and not isinstance(value, str):
# Convert the value to string if it's not already a string.
record[key] = (
json.dumps(value)
if isinstance(value, (dict, list)) else str(value)
)
if self.logger:
self.logger.debug(
f"Converted field {key} to string: {record[key]}",
)

return record
164 changes: 74 additions & 90 deletions tests/test_validation.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import json
import logging

from jsonschema import Draft7Validator, ValidationError
from jsonschema import Draft7Validator

from target_clickhouse.sinks import handle_validation_error

# Schema that allows a field to be either a string or null
# Schema definitions
schema = {
"type": "object",
"properties": {
Expand All @@ -27,105 +25,72 @@
"required": ["name", "age", "address"],
}


# Validator instance
# Validator instances
validator = Draft7Validator(schema)

nested_validator = Draft7Validator(nested_schema)

# Set up the logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def pre_validate_for_string_type(record: dict, schema: dict) -> dict:
for key, value in record.items():
expected_type = schema.get("properties", {}).get(key, {}).get("type")
if "string" in expected_type and not isinstance(value, str):
record[key] = (
json.dumps(value)
if isinstance(value, (dict, list)) else str(value)
)
if logger:
logger.debug(f"Converted field {key} to string: {record[key]}")
return record

# Test cases
def test_validation_string_conversion():
record = {"name": 123, "age": 30}
try:
validator.validate(record)
except ValidationError as e:
updated_record = handle_validation_error(record, e, logger)
assert (
updated_record["name"] == "123"
), "The 'name' should have been converted to a string."
validator.validate(updated_record) # This should not raise an error
pre_validated_record = pre_validate_for_string_type(record, schema)
validator.validate(pre_validated_record) # This should not raise an error
assert (
pre_validated_record["name"] == "123"
), "The 'name' should have been converted to a string."

def test_validation_no_error_raised():
record = {"name": "John", "age": 30}
# This should not raise an error, hence no need to handle validation
validator.validate(record) # This should not raise an error

def test_validation_null_allowed():
record = {"name": None, "age": 30}
try:
validator.validate(record)
except ValidationError as e:
updated_record = handle_validation_error(record, e, logger)
assert (
updated_record is None
), "The 'name' field is null and should be valid."

def test_validation_non_string_non_null_field():
record = {"name": {"first": "John", "last": "Doe"}, "age": 30}
try:
validator.validate(record)
except ValidationError as e:
updated_record = handle_validation_error(record, e, logger)
assert (
isinstance(updated_record["name"], str)
), "The 'name' should have been converted to a string."
validator.validate(record) # This should not raise an error

def test_nested_dict_string_conversion():
record = {"name": "John", "age": 30, "address": {"street": 123, "city": "New York"}}
try:
validator.validate(record)
except ValidationError as e:
updated_record = handle_validation_error(record, e, logger)
assert (
updated_record["address"]["street"] == "123"
), "The 'street' should have been converted to a string."
validator.validate(updated_record) # This should not raise an error
record = {
"name": "John", "age": 30,
"address": {"street": 123, "city": "New York"},
}
pre_validated_record = pre_validate_for_string_type(record, nested_schema)
validator.validate(pre_validated_record) # This should not raise an error
assert (
"street" in json.loads(pre_validated_record["address"])
), "The 'address' should have been converted to a JSON string."

def test_nested_dict_with_nested_non_string():
record = {"name": "John", "age": 30,
"address": {"street": "Main", "city": {"name": "New York"}}}
try:
validator.validate(record)
except ValidationError as e:
updated_record = handle_validation_error(record, e, logger)
assert (
isinstance(updated_record["address"]["city"], str)
), "The 'city' should have been converted to a string."
validator.validate(updated_record) # This should not raise an error

def test_single_level_schema_nested_dict_to_string():
record = {"name": {"first": "John", "last": "Doe"}, "age": 30}
try:
nested_validator.validate(record)
except ValidationError as e:
updated_record = handle_validation_error(record, e, logger)
assert (
isinstance(updated_record["name"], str)
), "The 'name' should have been converted to a JSON string."
assert (
json.loads(updated_record["name"]) == {"first": "John", "last": "Doe"}
), "The JSON string is not correct."

def test_single_level_schema_deeply_nested_dict_to_string():
record = {
"name": "John",
"age": 30,
"name": "John", "age": 30,
"address": {"street": "Main", "city": {"name": "New York"}},
}
try:
nested_validator.validate(record)
except ValidationError as e:
updated_record = handle_validation_error(record, e, logger)
assert (
isinstance(updated_record["address"], str)
), "The 'address' field should have been converted to a JSON string."
assert (
"street" in json.loads(updated_record["address"])
), "The JSON string does not correctly represent the nested dict."
pre_validated_record = pre_validate_for_string_type(record, nested_schema)
validator.validate(pre_validated_record) # This should not raise an error
assert (
"city" in json.loads(pre_validated_record["address"])
), "The 'city' should have been converted to a JSON string."

def test_single_level_schema_nested_dict_to_string():
record = {"name": {"first": "John", "last": "Doe"}, "age": 30, "address": None}
pre_validated_record = pre_validate_for_string_type(record, nested_schema)
nested_validator.validate(pre_validated_record) # This should not raise an error
assert (
json.loads(pre_validated_record["name"]) == {"first": "John", "last": "Doe"}
), "The JSON string is not correct."

def test_single_level_schema_deeply_nested_list_of_dicts_to_string():
record = {
Expand All @@ -136,14 +101,33 @@ def test_single_level_schema_deeply_nested_list_of_dicts_to_string():
{"street": "Second", "city": {"name": "Los Angeles"}},
],
}
address_str = json.dumps(record["address"])
try:
nested_validator.validate(record)
except ValidationError as e:
updated_record = handle_validation_error(record, e, logger)
assert (
isinstance(updated_record["address"], str)
), "The 'address' field should have been converted to a JSON string."
assert (
updated_record["address"] == address_str
), "The JSON string does not correctly represent the nested list of dicts."
pre_validated_record = pre_validate_for_string_type(record, nested_schema)
nested_validator.validate(pre_validated_record) # This should not raise an error
address_list = json.loads(pre_validated_record["address"])
assert (
all("street" in addr for addr in address_list)
), "The JSON string does not correctly represent the nested list of dicts."

def test_multiple_fields_conversion():
# Test record with multiple fields needing conversion
record = {
"name": {"first": "John", "last": "Doe"}, # Expected to be a string
"age": 30,
"address": {"street": "Main", "city": {"name": "New York"}},
}
pre_validated_record = pre_validate_for_string_type(record, nested_schema)
nested_validator.validate(pre_validated_record) # This should not raise an error

# Asserting the conversions
assert (
isinstance(pre_validated_record["name"], str)
), "The 'name' should have been converted to a JSON string."
assert (
isinstance(pre_validated_record["address"], str,
)), "The 'address' should have been converted to a JSON string."
assert (
json.loads(pre_validated_record["name"]) == {"first": "John", "last": "Doe"}
), "The JSON string for 'name' is not correct."
assert (
"street" in json.loads(pre_validated_record["address"])
), "The JSON string for 'address' does not correctly represent the nested dict."

0 comments on commit 2201d4e

Please sign in to comment.