test: Add test cases for arrays and objects, and introduce verify_schema #250

Open · wants to merge 5 commits into main
1 change: 1 addition & 0 deletions .gitignore
@@ -2,6 +2,7 @@ smoke-test
 .meltano/**
 .tox/**
 .secrets/**
+.idea
 .vscode/**
 output/**
 .env
4 changes: 3 additions & 1 deletion target_postgres/connector.py
@@ -180,8 +180,10 @@ def copy_table_structure(

     @contextmanager
     def _connect(self) -> t.Iterator[sqlalchemy.engine.Connection]:
-        with self._engine.connect().execution_options() as conn:
+        engine = self._engine
+        with engine.connect().execution_options() as conn:
             yield conn
+        engine.dispose()
Comment on lines 181 to +186
@amotl (Contributor, Author) commented on Dec 19, 2023:
This spot relates to an issue we observed after adding more test cases. Unlike the other changes in this patch, which only touch the test suite, this one affects the connector implementation. Without it, there appears to be a resource leak that manifests as connection pool overflow.
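
For illustration, a minimal sketch of how such a pool overflow can be provoked, assuming SQLAlchemy's default QueuePool; the DSN and the lowered pool limits are hypothetical:

    import sqlalchemy

    # Hypothetical DSN; pool limits lowered so the overflow surfaces quickly.
    engine = sqlalchemy.create_engine(
        "postgresql://user:secret@localhost/db", pool_size=1, max_overflow=2
    )

    for _ in range(5):
        # Each connection is checked out but never closed or returned to the
        # pool, so the checked-out count only grows.
        conn = engine.connect()
        print(engine.pool.checkedout())
        # Once pool_size + max_overflow connections are checked out, the next
        # connect() blocks and eventually raises sqlalchemy.exc.TimeoutError.

Disposing the engine after each use, as this patch does, is heavy-handed but guarantees that pooled connections are released.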

A contributor commented:
I would want to understand this fully before merging. What is the root cause of this problem? Are we confident that this is all that's required to close the leak?

@amotl (Contributor, Author) commented on Jan 18, 2024:

Hi Sebastian. Thanks for your review. I can only say that this spot was found by educated guessing and a bit of trial-and-error, so both the problem and its solution were determined empirically.

Apologies that I can't offer a better rationale, but I can submit a separate reproduction PR without the other changes here, in order to demonstrate the issue in isolation.


     def drop_table(
         self, table: sqlalchemy.Table, connection: sqlalchemy.engine.Connection
5 changes: 5 additions & 0 deletions target_postgres/tests/data_files/array_boolean.singer
@@ -0,0 +1,5 @@
{"type": "SCHEMA", "stream": "array_boolean", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "boolean"}}}}}
{"type": "RECORD", "stream": "array_boolean", "record": {"id": 1, "value": [ true, false ]}}
{"type": "RECORD", "stream": "array_boolean", "record": {"id": 2, "value": [ false ]}}
{"type": "RECORD", "stream": "array_boolean", "record": {"id": 3, "value": [ false, true, true, false ]}}
{"type": "STATE", "value": {"array_boolean": 3}}
6 changes: 0 additions & 6 deletions target_postgres/tests/data_files/array_data.singer

This file was deleted.

5 changes: 5 additions & 0 deletions target_postgres/tests/data_files/array_number.singer
@@ -0,0 +1,5 @@
{"type": "SCHEMA", "stream": "array_number", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "number"}}}}}
{"type": "RECORD", "stream": "array_number", "record": {"id": 1, "value": [ 42.42, 84.84, 23 ]}}
{"type": "RECORD", "stream": "array_number", "record": {"id": 2, "value": [ 1.0 ]}}
{"type": "RECORD", "stream": "array_number", "record": {"id": 3, "value": [ 1.11, 2.22, 3, 4, 5.55 ]}}
{"type": "STATE", "value": {"array_number": 3}}
6 changes: 6 additions & 0 deletions target_postgres/tests/data_files/array_string.singer
@@ -0,0 +1,6 @@
{"type": "SCHEMA", "stream": "array_string", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array","items": {"type": "string"}}}}}
{"type": "RECORD", "stream": "array_string", "record": {"id": 1, "value": [ "apple", "orange", "pear" ]}}
{"type": "RECORD", "stream": "array_string", "record": {"id": 2, "value": [ "banana", "apple" ]}}
{"type": "RECORD", "stream": "array_string", "record": {"id": 3, "value": [ "pear" ]}}
{"type": "RECORD", "stream": "array_string", "record": {"id": 4, "value": [ "orange", "banana", "apple", "pear" ]}}
{"type": "STATE", "value": {"array_string": 4}}
5 changes: 5 additions & 0 deletions target_postgres/tests/data_files/array_timestamp.singer
@@ -0,0 +1,5 @@
{"type": "SCHEMA", "stream": "array_timestamp", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "string", "format": "date-time"}}}}}
{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 1, "value": [ "2023-12-13T01:15:02", "2023-12-13T01:16:02" ]}}
{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 2, "value": [ "2023-12-13T01:15:02" ]}}
{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 3, "value": [ "2023-12-13T01:15:02", "2023-12-13T01:16:02", "2023-12-13T01:17:02" ]}}
{"type": "STATE", "value": {"array_timestamp": 3}}
3 changes: 3 additions & 0 deletions target_postgres/tests/data_files/object_mixed.singer
@@ -0,0 +1,3 @@
{"type": "SCHEMA", "stream": "object_mixed", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "object"}}}}
{"type": "RECORD", "stream": "object_mixed", "record": {"id": 1, "value": {"string": "foo", "integer": 42, "float": 42.42, "timestamp": "2023-12-13T01:15:02", "array_boolean": [true, false], "array_float": [42.42, 84.84], "array_integer": [42, 84], "array_string": ["foo", "bar"], "nested_object": {"foo": "bar"}}}}
{"type": "STATE", "value": {"object_mixed": 1}}
4 changes: 3 additions & 1 deletion target_postgres/tests/test_sdk.py
@@ -61,7 +61,9 @@ class BasePostgresSDKTests:
     @pytest.fixture()
     def connection(self, runner):
         engine = create_engine(runner)
-        return engine.connect()
+        with engine.connect() as connection:
+            yield connection
+        engine.dispose()


SDKTests = get_target_test_class(
169 changes: 140 additions & 29 deletions target_postgres/tests/test_target_postgres.py
@@ -11,8 +11,8 @@
 import sqlalchemy
 from singer_sdk.exceptions import MissingKeyPropertiesError
 from singer_sdk.testing import get_target_test_class, sync_end_to_end
-from sqlalchemy.dialects.postgresql import ARRAY
-from sqlalchemy.types import TEXT, TIMESTAMP
+from sqlalchemy.dialects.postgresql import ARRAY, JSONB
+from sqlalchemy.types import BIGINT, TEXT, TIMESTAMP

from target_postgres.connector import PostgresConnector
from target_postgres.target import TargetPostgres
@@ -94,7 +94,7 @@ def verify_data(

     Args:
         target: The target to obtain a database connection from.
-        full_table_name: The schema and table name of the table to check data for.
+        table_name: The schema and table name of the table to check data for.
         primary_key: The primary key of the table.
         number_of_rows: The expected number of rows that should be in the table.
         check_data: A dictionary representing the full contents of the first row in the
@@ -132,6 +132,45 @@
            sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}")
        )
        assert result.first()[0] == number_of_rows
    engine.dispose()


def verify_schema(
    target: TargetPostgres,
    table_name: str,
    check_columns: dict = None,
):
    """Checks whether the schema of a database table matches the provided column definitions.

    Args:
        target: The target to obtain a database connection from.
        table_name: The schema and table name of the table to check data for.
        check_columns: A dictionary mapping column names to their definitions.
            Currently, only the `type` attribute is compared.
    """
    engine = create_engine(target)
    schema = target.config["default_target_schema"]
    with engine.connect() as connection:
        meta = sqlalchemy.MetaData()
        table = sqlalchemy.Table(
            table_name, meta, schema=schema, autoload_with=connection
        )
        for column in table.c:
            # Ignore `_sdc` columns for now.
            if column.name.startswith("_sdc"):
                continue
            try:
                column_type_expected = check_columns[column.name]["type"]
            except KeyError:
                raise ValueError(
                    f"Invalid check_columns - missing definition for column: {column.name}"
                )
            if not isinstance(column.type, column_type_expected):
                raise TypeError(
                    f"Column '{column.name}' (with type '{column.type}') "
                    f"does not match expected type: {column_type_expected}"
                )
    engine.dispose()


def test_sqlalchemy_url_config(postgres_config_no_ssl):
@@ -406,11 +445,92 @@ def test_duplicate_records(postgres_target):
    verify_data(postgres_target, "test_duplicate_records", 2, "id", row)


-def test_array_data(postgres_target):
-    file_name = "array_data.singer"
def test_array_boolean(postgres_target):
    file_name = "array_boolean.singer"
    singer_file_to_target(file_name, postgres_target)
    row = {"id": 1, "value": [True, False]}
    verify_data(postgres_target, "array_boolean", 3, "id", row)
    verify_schema(
        postgres_target,
        "array_boolean",
        check_columns={
            "id": {"type": BIGINT},
            "value": {"type": ARRAY},
A contributor commented:

Arrays have inner data types such as ARRAY[BOOLEAN] (not sure if that's the right syntax). We should additionally check for that.

@amotl (Contributor, Author) commented on Jan 18, 2024:

I see your point. Let me check how verify_schema could be improved accordingly.

We are not looking at multiple dimensions here, or possible nesting, right? It would just be about »verifying the type of the first element within the array against its specification, when the array is not empty«, correct?
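
As a sketch of what that could look like: SQLAlchemy's ARRAY type exposes its element type as item_type, so verify_schema could compare that against the expected inner type. The "item_type" key in check_columns is an invented convention for this example, not part of the current patch:

    from sqlalchemy.dialects.postgresql import ARRAY
    from sqlalchemy.types import BOOLEAN

    def check_array_column(column, expected: dict):
        # `expected` is e.g. {"type": ARRAY, "item_type": BOOLEAN}; the
        # "item_type" key is a hypothetical extension of check_columns.
        assert isinstance(column.type, expected["type"])
        if isinstance(column.type, ARRAY) and "item_type" in expected:
            # SQLAlchemy stores the reflected element type on `item_type`.
            assert isinstance(column.type.item_type, expected["item_type"])

Comparing the reflected element type rather than inspecting row values would also cover empty arrays.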

        },
    )


def test_array_number(postgres_target):
    file_name = "array_number.singer"
    singer_file_to_target(file_name, postgres_target)
    row = {"id": 1, "value": [Decimal("42.42"), Decimal("84.84"), 23]}
    verify_data(postgres_target, "array_number", 3, "id", row)
    verify_schema(
        postgres_target,
        "array_number",
        check_columns={
            "id": {"type": BIGINT},
            "value": {"type": ARRAY},
        },
    )


def test_array_string(postgres_target):
    file_name = "array_string.singer"
    singer_file_to_target(file_name, postgres_target)
    row = {"id": 1, "value": ["apple", "orange", "pear"]}
    verify_data(postgres_target, "array_string", 4, "id", row)
    verify_schema(
        postgres_target,
        "array_string",
        check_columns={
            "id": {"type": BIGINT},
            "value": {"type": ARRAY},
        },
    )


def test_array_timestamp(postgres_target):
    file_name = "array_timestamp.singer"
    singer_file_to_target(file_name, postgres_target)
    row = {"id": 1, "value": ["2023-12-13T01:15:02", "2023-12-13T01:16:02"]}
    verify_data(postgres_target, "array_timestamp", 3, "id", row)
    verify_schema(
        postgres_target,
        "array_timestamp",
        check_columns={
            "id": {"type": BIGINT},
            "value": {"type": ARRAY},
        },
    )


def test_object_mixed(postgres_target):
    file_name = "object_mixed.singer"
    singer_file_to_target(file_name, postgres_target)
-    row = {"id": 1, "fruits": ["apple", "orange", "pear"]}
-    verify_data(postgres_target, "test_carts", 4, "id", row)
    row = {
        "id": 1,
        "value": {
            "string": "foo",
            "integer": 42,
            "float": Decimal("42.42"),
            "timestamp": "2023-12-13T01:15:02",
            "array_boolean": [True, False],
            "array_float": [Decimal("42.42"), Decimal("84.84")],
            "array_integer": [42, 84],
            "array_string": ["foo", "bar"],
            "nested_object": {"foo": "bar"},
        },
    }
    verify_data(postgres_target, "object_mixed", 1, "id", row)
    verify_schema(
        postgres_target,
        "object_mixed",
        check_columns={
            "id": {"type": BIGINT},
            "value": {"type": JSONB},
        },
    )


def test_encoded_string_data(postgres_target):
@@ -456,41 +576,32 @@ def test_large_int(postgres_target):

def test_anyof(postgres_target):
    """Test that anyOf is handled correctly"""
-    engine = create_engine(postgres_target)
    table_name = "commits"
    file_name = f"{table_name}.singer"
-    schema = postgres_target.config["default_target_schema"]
    singer_file_to_target(file_name, postgres_target)
-    with engine.connect() as connection:
-        meta = sqlalchemy.MetaData()
-        table = sqlalchemy.Table(
-            "commits", meta, schema=schema, autoload_with=connection
-        )
-        for column in table.c:
-            # {"type":"string"}
-            if column.name == "id":
-                assert isinstance(column.type, TEXT)

    verify_schema(
        postgres_target,
        table_name,
        check_columns={
            # {"type":"string"}
            "id": {"type": TEXT},
            # Any of nullable date-time.
            # Note that postgres timestamp is equivalent to jsonschema date-time.
            # {"anyOf":[{"type":"string","format":"date-time"},{"type":"null"}]}
-            if column.name in {"authored_date", "committed_date"}:
-                assert isinstance(column.type, TIMESTAMP)
            "authored_date": {"type": TIMESTAMP},
            "committed_date": {"type": TIMESTAMP},
            # Any of nullable array of strings or single string.
            # {"anyOf":[{"type":"array","items":{"type":["null","string"]}},{"type":"string"},{"type":"null"}]}
-            if column.name == "parent_ids":
-                assert isinstance(column.type, ARRAY)
            "parent_ids": {"type": ARRAY},
            # Any of nullable string.
            # {"anyOf":[{"type":"string"},{"type":"null"}]}
-            if column.name == "commit_message":
-                assert isinstance(column.type, TEXT)
            "commit_message": {"type": TEXT},
            # Any of nullable string or integer.
            # {"anyOf":[{"type":"string"},{"type":"integer"},{"type":"null"}]}
-            if column.name == "legacy_id":
-                assert isinstance(column.type, TEXT)
            "legacy_id": {"type": TEXT},
        },
    )


def test_new_array_column(postgres_target):