Skip to content

Commit

Permalink
fix(targets): Handle missing record properties in SQL sinks (#1865)
Browse files Browse the repository at this point in the history
* fix(targets): Handle missing record properties in SQL sinks

* Update singer_sdk/sinks/sql.py

* Add test to target suite
  • Loading branch information
edgarrmondragon authored Jul 26, 2023
1 parent 019902e commit 6576f52
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 2 deletions.
14 changes: 12 additions & 2 deletions singer_sdk/sinks/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,20 @@ def bulk_insert_records(
insert_sql = sqlalchemy.text(insert_sql)

conformed_records = [self.conform_record(record) for record in records]
property_names = list(self.conform_schema(schema)["properties"].keys())

# Create new record dicts with missing properties filled in with None
new_records = [
{name: record.get(name) for name in property_names}
for record in conformed_records
]

self.logger.info("Inserting with SQL: %s", insert_sql)

with self.connector._connect() as conn, conn.begin():
conn.execute(insert_sql, conformed_records)
return len(conformed_records) if isinstance(conformed_records, list) else None
result = conn.execute(insert_sql, new_records)

return result.rowcount

def merge_upsert_from_table(
self,
Expand Down
2 changes: 2 additions & 0 deletions singer_sdk/testing/suites.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
TargetOptionalAttributes,
TargetRecordBeforeSchemaTest,
TargetRecordMissingKeyProperty,
TargetRecordMissingOptionalFields,
TargetSchemaNoProperties,
TargetSchemaUpdates,
TargetSpecialCharsInAttributes,
Expand Down Expand Up @@ -103,6 +104,7 @@ class TestSuite:
TargetOptionalAttributes,
TargetRecordBeforeSchemaTest,
TargetRecordMissingKeyProperty,
TargetRecordMissingOptionalFields,
TargetSchemaNoProperties,
TargetSchemaUpdates,
TargetSpecialCharsInAttributes,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"type": "SCHEMA", "stream": "record_missing_fields", "key_properties": ["id"], "schema": {"type": "object", "properties": {"id": {"type": "integer"}, "optional": {"type": "string"}}, "required": ["id"]}}
{"type": "RECORD", "stream": "record_missing_fields", "record": {"id": 1, "optional": "now you see me"}}
{"type": "RECORD", "stream": "record_missing_fields", "record": {"id": 2}}
{"type": "STATE", "value": {}}
6 changes: 6 additions & 0 deletions singer_sdk/testing/target_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,9 @@ class TargetSpecialCharsInAttributes(TargetFileTestTemplate):
"""Test Target handles special chars in attributes."""

name = "special_chars_in_attributes"


class TargetRecordMissingOptionalFields(TargetFileTestTemplate):
"""Test Target handles record missing optional fields."""

name = "record_missing_fields"
29 changes: 29 additions & 0 deletions tests/samples/test_target_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,35 @@ def test_sqlite_column_no_morph(sqlite_sample_target: SQLTarget):
target_sync_test(sqlite_sample_target, input=StringIO(tap_output_b), finalize=True)


def test_record_with_missing_properties(
sqlite_sample_target: SQLTarget,
):
"""Test handling of records with missing properties."""
tap_output = "\n".join(
json.dumps(msg)
for msg in [
{
"type": "SCHEMA",
"stream": "test_stream",
"schema": {
"type": "object",
"properties": {
"id": {"type": "integer"},
"name": {"type": "string"},
},
},
"key_properties": ["id"],
},
{
"type": "RECORD",
"stream": "test_stream",
"record": {"id": 1},
},
]
)
target_sync_test(sqlite_sample_target, input=StringIO(tap_output), finalize=True)


@pytest.mark.parametrize(
"stream_name,schema,key_properties,expected_dml",
[
Expand Down

0 comments on commit 6576f52

Please sign in to comment.