Skip to content

Commit

Permalink
SAT: enforce bypass_reason declaration on empty_streams when `tes…
Browse files Browse the repository at this point in the history
…t_strictness_level == high` (airbytehq#18425)
  • Loading branch information
alafanechere authored and jhammarstedt committed Oct 31, 2022
1 parent ce3fe01 commit 73bb7de
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 72 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.2.14
Fail basic read in `high` `test_strictness_level` if no `bypass_reason` is set on empty_streams. [#18425](https://github.com/airbytehq/airbyte/pull/18425/).

## 0.2.13
Fail tests in `high` `test_strictness_level` if not all tests are configured. [#18414](https://github.com/airbytehq/airbyte/pull/18414/).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./
COPY source_acceptance_test ./source_acceptance_test
RUN pip install .

LABEL io.airbyte.version=0.2.13
LABEL io.airbyte.version=0.2.14
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"]
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,20 @@ def validate_extra_records(cls, extra_records, values):
return extra_records


# Describes one stream that is expected to be empty: its name plus an optional
# explanation (bypass_reason) of why it is considered empty.
class EmptyStreamConfiguration(BaseConfig):
    name: str
    bypass_reason: Optional[str] = Field(default=None, description="Reason why this stream is considered empty.")

    def __hash__(self):
        """Make instances hashable by hashing the concrete type together with the values held in ``__dict__``."""
        stored_values = self.__dict__.values()
        return hash((type(self), *stored_values))


class BasicReadTestConfig(BaseConfig):
config_path: str = config_path
configured_catalog_path: Optional[str] = configured_catalog_path
empty_streams: Set[str] = Field(default_factory=set, description="We validate that all streams has records. These are exceptions")
empty_streams: Set[EmptyStreamConfiguration] = Field(
default_factory=set, description="We validate that all streams has records. These are exceptions"
)
expect_records: Optional[ExpectedRecordsConfig] = Field(description="Expected records from the read")
validate_schema: bool = Field(True, description="Ensure that records match the schema of the corresponding stream")
# TODO: remove this field after https://github.com/airbytehq/airbyte/issues/8312 is done
Expand Down Expand Up @@ -206,6 +216,11 @@ def migrate_legacy_to_current_config(legacy_config: dict) -> dict:
migrated_config["acceptance_tests"] = {}
for test_name, test_configs in legacy_config["tests"].items():
migrated_config["acceptance_tests"][test_name] = {"tests": test_configs}
for basic_read_tests in migrated_config["acceptance_tests"].get("basic_read", {}).get("tests", []):
if "empty_streams" in basic_read_tests:
basic_read_tests["empty_streams"] = [
{"name": empty_stream_name} for empty_stream_name in basic_read_tests.get("empty_streams", [])
]
return migrated_config

@root_validator(pre=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from docker.errors import ContainerError
from jsonschema._utils import flatten
from source_acceptance_test.base import BaseTest
from source_acceptance_test.config import BasicReadTestConfig, ConnectionTestConfig, DiscoveryTestConfig, SpecTestConfig
from source_acceptance_test.config import BasicReadTestConfig, Config, ConnectionTestConfig, DiscoveryTestConfig, SpecTestConfig
from source_acceptance_test.utils import ConnectorRunner, SecretDict, filter_output, make_hashable, verify_records_schema
from source_acceptance_test.utils.backward_compatibility import CatalogDiffChecker, SpecDiffChecker, validate_previous_configs
from source_acceptance_test.utils.common import find_all_values_for_key_in_schema, find_keyword_schema
Expand Down Expand Up @@ -385,13 +385,14 @@ def _validate_empty_streams(self, records, configured_catalog, allowed_empty_str
"""
Only certain streams allowed to be empty
"""
allowed_empty_stream_names = set([allowed_empty_stream.name for allowed_empty_stream in allowed_empty_streams])
counter = Counter(record.stream for record in records)

all_streams = set(stream.stream.name for stream in configured_catalog.streams)
streams_with_records = set(counter.keys())
streams_without_records = all_streams - streams_with_records

streams_without_records = streams_without_records - allowed_empty_streams
streams_without_records = streams_without_records - allowed_empty_stream_names
assert not streams_without_records, f"All streams should return some records, streams without records: {streams_without_records}"

def _validate_field_appears_at_least_once_in_stream(self, records: List, schema: Dict):
Expand Down Expand Up @@ -466,7 +467,9 @@ def test_read(
expected_records: List[AirbyteRecordMessage],
docker_runner: ConnectorRunner,
detailed_logger,
test_strictness_level: Config.TestStrictnessLevel,
):
self.enforce_strictness_level(test_strictness_level, inputs)
output = docker_runner.call_read(connector_config, configured_catalog)
records = [message.record for message in filter_output(output, Type.RECORD)]

Expand Down Expand Up @@ -578,3 +581,11 @@ def group_by_stream(records: List[AirbyteRecordMessage]) -> MutableMapping[str,
result[record.stream].append(record.data)

return result

@staticmethod
def enforce_strictness_level(test_strictness_level: Config.TestStrictnessLevel, inputs: BasicReadTestConfig):
    """Fail the test when strictness is ``high`` and a declared empty stream has no bypass_reason.

    :param test_strictness_level: strictness level the acceptance tests run with.
    :param inputs: basic read test configuration whose ``empty_streams`` are inspected.
    """
    # Any level other than high imposes no constraint on empty streams.
    if test_strictness_level is not Config.TestStrictnessLevel.high:
        return
    # Nothing to verify when no empty stream is declared.
    if not inputs.empty_streams:
        return
    # A missing (None) or empty bypass_reason both count as "not provided".
    some_reason_missing = any(not empty_stream.bypass_reason for empty_stream in inputs.empty_streams)
    if some_reason_missing:
        pytest.fail("A bypass_reason must be filled in for all empty streams when test_strictness_level is set to high.")
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
TraceType,
Type,
)
from source_acceptance_test.config import BasicReadTestConfig
from source_acceptance_test.config import BasicReadTestConfig, Config, EmptyStreamConfiguration
from source_acceptance_test.tests import test_core
from source_acceptance_test.tests.test_core import TestBasicRead as _TestBasicRead
from source_acceptance_test.tests.test_core import TestDiscovery as _TestDiscovery

Expand Down Expand Up @@ -236,22 +237,30 @@ def test_additional_properties_is_true(discovered_catalog, expectation):


@pytest.mark.parametrize(
"schema, record, should_fail",
"schema, record, expectation",
[
({"type": "object"}, {"aa": 23}, False),
({"type": "object"}, {}, False),
({"type": "object", "properties": {"created": {"type": "string"}}}, {"aa": 23}, True),
({"type": "object", "properties": {"created": {"type": "string"}}}, {"created": "23"}, False),
({"type": "object", "properties": {"created": {"type": "string"}}}, {"root": {"created": "23"}}, True),
({"type": "object"}, {"aa": 23}, does_not_raise()),
({"type": "object"}, {}, does_not_raise()),
(
{"type": "object", "properties": {"created": {"type": "string"}}},
{"aa": 23},
pytest.raises(AssertionError, match="should have some fields mentioned by json schema"),
),
({"type": "object", "properties": {"created": {"type": "string"}}}, {"created": "23"}, does_not_raise()),
(
{"type": "object", "properties": {"created": {"type": "string"}}},
{"root": {"created": "23"}},
pytest.raises(AssertionError, match="should have some fields mentioned by json schema"),
),
# Recharge shop stream case
(
{"type": "object", "properties": {"shop": {"type": ["null", "object"]}, "store": {"type": ["null", "object"]}}},
{"shop": {"a": "23"}, "store": {"b": "23"}},
False,
does_not_raise(),
),
],
)
def test_read(schema, record, should_fail):
def test_read(mocker, schema, record, expectation):
catalog = ConfiguredAirbyteCatalog(
streams=[
ConfiguredAirbyteStream(
Expand All @@ -267,11 +276,10 @@ def test_read(schema, record, should_fail):
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data=record, emitted_at=111))
]
t = _TestBasicRead()
if should_fail:
with pytest.raises(AssertionError, match="should have some fields mentioned by json schema"):
t.test_read(None, catalog, input_config, [], docker_runner_mock, MagicMock())
else:
t.test_read(None, catalog, input_config, [], docker_runner_mock, MagicMock())
t.enforce_strictness_level = mocker.Mock()
with expectation:
t.test_read(None, catalog, input_config, [], docker_runner_mock, MagicMock(), Config.TestStrictnessLevel.low)
t.enforce_strictness_level.assert_called_with(Config.TestStrictnessLevel.low, input_config)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -836,3 +844,45 @@ def test_validate_field_appears_at_least_once(records, configured_catalog, expec
t._validate_field_appears_at_least_once(records=records, configured_catalog=configured_catalog)
else:
t._validate_field_appears_at_least_once(records=records, configured_catalog=configured_catalog)


@pytest.mark.parametrize(
    "test_strictness_level, basic_read_test_config, expect_test_failure",
    [
        pytest.param(
            Config.TestStrictnessLevel.low,
            BasicReadTestConfig(config_path="config_path", empty_streams={EmptyStreamConfiguration(name="my_empty_stream")}),
            False,
            id="[LOW test strictness level] Empty streams can be declared without bypass_reason.",
        ),
        pytest.param(
            Config.TestStrictnessLevel.low,
            BasicReadTestConfig(
                config_path="config_path", empty_streams={EmptyStreamConfiguration(name="my_empty_stream", bypass_reason="good reason")}
            ),
            False,
            id="[LOW test strictness level] Empty streams can be declared with a bypass_reason.",
        ),
        pytest.param(
            Config.TestStrictnessLevel.high,
            BasicReadTestConfig(config_path="config_path", empty_streams={EmptyStreamConfiguration(name="my_empty_stream")}),
            True,
            id="[HIGH test strictness level] Empty streams can't be declared without bypass_reason.",
        ),
        pytest.param(
            Config.TestStrictnessLevel.high,
            BasicReadTestConfig(
                config_path="config_path", empty_streams={EmptyStreamConfiguration(name="my_empty_stream", bypass_reason="good reason")}
            ),
            False,
            id="[HIGH test strictness level] Empty streams can be declared with a bypass_reason.",
        ),
    ],
)
def test_enforce_strictness_level(mocker, test_strictness_level, basic_read_test_config, expect_test_failure):
    """Check that enforce_strictness_level calls pytest.fail only in the parametrized failure case."""
    # Replace the pytest module referenced by test_core with a mock so that
    # pytest.fail is recorded rather than aborting this test.
    patched_pytest = mocker.patch.object(test_core, "pytest")
    outcome = _TestBasicRead.enforce_strictness_level(test_strictness_level, basic_read_test_config)
    assert outcome is None
    if not expect_test_failure:
        patched_pytest.fail.assert_not_called()
        return
    patched_pytest.fail.assert_called_once()
Loading

0 comments on commit 73bb7de

Please sign in to comment.