Skip to content

Commit

Permalink
SAT: make expect_records mandatory in high test_strictness_level (
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere authored Oct 28, 2022
1 parent 592a785 commit c25ac8a
Show file tree
Hide file tree
Showing 11 changed files with 284 additions and 83 deletions.
4 changes: 2 additions & 2 deletions airbyte-integrations/bases/source-acceptance-test/.coveragerc
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[report]
# show lines missing coverage
show_missing = true
# coverage 64% measured on 62303a85def89450d2e46573a3d96cd326f2e921 (2022-08-09)
# coverage 74% measured on 4977ac2c527f03c15ce0094cfd48f6104a0fd82f (2022-10-26)
# This number should probably never be adjusted down, only up i.e: we should only ever increase our test coverage
fail_under = 64
fail_under = 74
skip_covered = true
skip_empty = true
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.2.15
Make `expect_records` mandatory in `high` `test_strictness_level`. [#18497](https://github.com/airbytehq/airbyte/pull/18497/).

## 0.2.14
Fail basic read in `high` `test_strictness_level` if no `bypass_reason` is set on empty_streams. [#18425](https://github.com/airbytehq/airbyte/pull/18425/).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./
COPY source_acceptance_test ./source_acceptance_test
RUN pip install .

LABEL io.airbyte.version=0.2.14
LABEL io.airbyte.version=0.2.15
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"]
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ class ExpectedRecordsConfig(BaseModel):
class Config:
extra = "forbid"

path: Path = Field(description="File with expected records")
bypass_reason: Optional[str] = Field(description="Reason why this test is bypassed.")
path: Optional[Path] = Field(description="File with expected records")
extra_fields: bool = Field(False, description="Allow records to have other fields")
exact_order: bool = Field(False, description="Ensure that records produced in exact same order")
extra_records: bool = Field(
Expand All @@ -92,6 +93,14 @@ def validate_extra_records(cls, extra_records, values):
raise ValueError("extra_records must be off if extra_fields enabled")
return extra_records

@validator("path", always=True)
def no_bypass_reason_when_path_is_set(cls, path, values):
if path and values.get("bypass_reason"):
raise ValueError("You can't set a bypass_reason if a path is set")
if not path and not values.get("bypass_reason"):
raise ValueError("A path or a bypass_reason must be set")
return path


class EmptyStreamConfiguration(BaseConfig):
name: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from logging import Logger
from pathlib import Path
from subprocess import STDOUT, check_output, run
from typing import Any, List, MutableMapping, Optional
from typing import Any, List, MutableMapping, Optional, Set

import pytest
from airbyte_cdk.models import (
Expand All @@ -24,7 +24,8 @@
)
from docker import errors
from source_acceptance_test.base import BaseTest
from source_acceptance_test.config import Config
from source_acceptance_test.config import Config, EmptyStreamConfiguration
from source_acceptance_test.tests import TestBasicRead
from source_acceptance_test.utils import ConnectorRunner, SecretDict, filter_output, load_config, load_yaml_or_json_path


Expand Down Expand Up @@ -167,14 +168,62 @@ def pull_docker_image(acceptance_test_config) -> None:
pytest.exit(f"Docker image `{image_name}` not found, please check your {config_filename} file", returncode=1)


@pytest.fixture(name="expected_records")
def expected_records_fixture(inputs, base_path) -> List[AirbyteRecordMessage]:
expect_records = getattr(inputs, "expect_records")
if not expect_records:
return []

with open(str(base_path / getattr(expect_records, "path"))) as f:
return [AirbyteRecordMessage.parse_raw(line) for line in f]
@pytest.fixture(name="empty_streams")
def empty_streams_fixture(inputs, test_strictness_level) -> Set[EmptyStreamConfiguration]:
empty_streams = getattr(inputs, "empty_streams", set())
if test_strictness_level is Config.TestStrictnessLevel.high and empty_streams:
all_empty_streams_have_bypass_reasons = all([bool(empty_stream.bypass_reason) for empty_stream in inputs.empty_streams])
if not all_empty_streams_have_bypass_reasons:
pytest.fail("A bypass_reason must be filled in for all empty streams when test_strictness_level is set to high.")
return empty_streams


@pytest.fixture(name="expected_records_by_stream")
def expected_records_by_stream_fixture(
test_strictness_level: Config.TestStrictnessLevel,
configured_catalog: ConfiguredAirbyteCatalog,
empty_streams: Set[EmptyStreamConfiguration],
inputs,
base_path,
) -> MutableMapping[str, List[MutableMapping]]:
def enforce_high_strictness_level_rules(expect_records_config, configured_catalog, empty_streams, records_by_stream) -> Optional[str]:
error_prefix = "High strictness level error: "
if expect_records_config is None:
pytest.fail(error_prefix + "expect_records must be configured for the basic_read test.")
elif expect_records_config.path:
not_seeded_streams = find_not_seeded_streams(configured_catalog, empty_streams, records_by_stream)
if not_seeded_streams:
pytest.fail(
error_prefix
+ f"{', '.join(not_seeded_streams)} streams are declared in the catalog but do not have expected records. Please add expected records to {expect_records_config.path} or declare these streams in empty_streams."
)

expect_records_config = inputs.expect_records

expected_records_by_stream = {}
if expect_records_config:
if expect_records_config.path:
expected_records_file_path = str(base_path / expect_records_config.path)
with open(expected_records_file_path, "r") as f:
all_records = [AirbyteRecordMessage.parse_raw(line) for line in f]
expected_records_by_stream = TestBasicRead.group_by_stream(all_records)

if test_strictness_level is Config.TestStrictnessLevel.high:
enforce_high_strictness_level_rules(expect_records_config, configured_catalog, empty_streams, expected_records_by_stream)
return expected_records_by_stream


def find_not_seeded_streams(
configured_catalog: ConfiguredAirbyteCatalog,
empty_streams: Set[EmptyStreamConfiguration],
records_by_stream: MutableMapping[str, List[MutableMapping]],
) -> Set[str]:
stream_names_in_catalog = set([configured_stream.stream.name for configured_stream in configured_catalog.streams])
empty_streams_names = set([stream.name for stream in empty_streams])
expected_record_stream_names = set(records_by_stream.keys())
expected_seeded_stream_names = stream_names_in_catalog - empty_streams_names

return expected_seeded_stream_names - expected_record_stream_names


@pytest.fixture(name="cached_schemas", scope="session")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from docker.errors import ContainerError
from jsonschema._utils import flatten
from source_acceptance_test.base import BaseTest
from source_acceptance_test.config import BasicReadTestConfig, Config, ConnectionTestConfig, DiscoveryTestConfig, SpecTestConfig
from source_acceptance_test.config import BasicReadTestConfig, ConnectionTestConfig, DiscoveryTestConfig, SpecTestConfig
from source_acceptance_test.utils import ConnectorRunner, SecretDict, filter_output, make_hashable, verify_records_schema
from source_acceptance_test.utils.backward_compatibility import CatalogDiffChecker, SpecDiffChecker, validate_previous_configs
from source_acceptance_test.utils.common import find_all_values_for_key_in_schema, find_keyword_schema
Expand Down Expand Up @@ -435,14 +435,17 @@ def _validate_field_appears_at_least_once(self, records: List, configured_catalo
assert not stream_name_to_empty_fields_mapping, msg

def _validate_expected_records(
self, records: List[AirbyteRecordMessage], expected_records: List[AirbyteRecordMessage], flags, detailed_logger: Logger
self,
records: List[AirbyteRecordMessage],
expected_records_by_stream: MutableMapping[str, List[MutableMapping]],
flags,
detailed_logger: Logger,
):
"""
We expect some records from stream to match expected_records, partially or fully, in exact or any order.
"""
actual_by_stream = self.group_by_stream(records)
expected_by_stream = self.group_by_stream(expected_records)
for stream_name, expected in expected_by_stream.items():
for stream_name, expected in expected_records_by_stream.items():
actual = actual_by_stream.get(stream_name, [])
detailed_logger.info(f"Actual records for stream {stream_name}:")
detailed_logger.log_json_list(actual)
Expand All @@ -464,12 +467,10 @@ def test_read(
connector_config,
configured_catalog,
inputs: BasicReadTestConfig,
expected_records: List[AirbyteRecordMessage],
expected_records_by_stream: MutableMapping[str, List[MutableMapping]],
docker_runner: ConnectorRunner,
detailed_logger,
test_strictness_level: Config.TestStrictnessLevel,
):
self.enforce_strictness_level(test_strictness_level, inputs)
output = docker_runner.call_read(connector_config, configured_catalog)
records = [message.record for message in filter_output(output, Type.RECORD)]

Expand All @@ -489,9 +490,12 @@ def test_read(
if inputs.validate_data_points:
self._validate_field_appears_at_least_once(records=records, configured_catalog=configured_catalog)

if expected_records:
if expected_records_by_stream:
self._validate_expected_records(
records=records, expected_records=expected_records, flags=inputs.expect_records, detailed_logger=detailed_logger
records=records,
expected_records_by_stream=expected_records_by_stream,
flags=inputs.expect_records,
detailed_logger=detailed_logger,
)

def test_airbyte_trace_message_on_failure(self, connector_config, inputs: BasicReadTestConfig, docker_runner: ConnectorRunner):
Expand Down Expand Up @@ -581,11 +585,3 @@ def group_by_stream(records: List[AirbyteRecordMessage]) -> MutableMapping[str,
result[record.stream].append(record.data)

return result

@staticmethod
def enforce_strictness_level(test_strictness_level: Config.TestStrictnessLevel, inputs: BasicReadTestConfig):
if test_strictness_level is Config.TestStrictnessLevel.high:
if inputs.empty_streams:
all_empty_streams_have_bypass_reasons = all([bool(empty_stream.bypass_reason) for empty_stream in inputs.empty_streams])
if not all_empty_streams_have_bypass_reasons:
pytest.fail("A bypass_reason must be filled in for all empty streams when test_strictness_level is set to high.")
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,18 @@ def test_config_parsing(self, raw_config, expected_output_config, expected_error
def test_legacy_config_migration(self, legacy_config, expected_parsed_config):
assert config.Config.is_legacy(legacy_config)
assert config.Config.parse_obj(legacy_config) == expected_parsed_config


class TestExpectedRecordsConfig:
@pytest.mark.parametrize(
"path, bypass_reason, expectation",
[
pytest.param("my_path", None, does_not_raise()),
pytest.param(None, "Good bypass reason", does_not_raise()),
pytest.param(None, None, pytest.raises(ValidationError)),
pytest.param("my_path", "Good bypass reason", pytest.raises(ValidationError)),
],
)
def test_bypass_reason_behavior(self, path, bypass_reason, expectation):
with expectation:
config.ExpectedRecordsConfig(path=path, bypass_reason=bypass_reason)
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
TraceType,
Type,
)
from source_acceptance_test.config import BasicReadTestConfig, Config, EmptyStreamConfiguration
from source_acceptance_test.tests import test_core
from source_acceptance_test.config import BasicReadTestConfig
from source_acceptance_test.tests.test_core import TestBasicRead as _TestBasicRead
from source_acceptance_test.tests.test_core import TestDiscovery as _TestDiscovery

Expand Down Expand Up @@ -260,7 +259,7 @@ def test_additional_properties_is_true(discovered_catalog, expectation):
),
],
)
def test_read(mocker, schema, record, expectation):
def test_read(schema, record, expectation):
catalog = ConfiguredAirbyteCatalog(
streams=[
ConfiguredAirbyteStream(
Expand All @@ -276,10 +275,8 @@ def test_read(mocker, schema, record, expectation):
AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data=record, emitted_at=111))
]
t = _TestBasicRead()
t.enforce_strictness_level = mocker.Mock()
with expectation:
t.test_read(None, catalog, input_config, [], docker_runner_mock, MagicMock(), Config.TestStrictnessLevel.low)
t.enforce_strictness_level.assert_called_with(Config.TestStrictnessLevel.low, input_config)
t.test_read(None, catalog, input_config, [], docker_runner_mock, MagicMock())


@pytest.mark.parametrize(
Expand Down Expand Up @@ -844,45 +841,3 @@ def test_validate_field_appears_at_least_once(records, configured_catalog, expec
t._validate_field_appears_at_least_once(records=records, configured_catalog=configured_catalog)
else:
t._validate_field_appears_at_least_once(records=records, configured_catalog=configured_catalog)


@pytest.mark.parametrize(
"test_strictness_level, basic_read_test_config, expect_test_failure",
[
pytest.param(
Config.TestStrictnessLevel.low,
BasicReadTestConfig(config_path="config_path", empty_streams={EmptyStreamConfiguration(name="my_empty_stream")}),
False,
id="[LOW test strictness level] Empty streams can be declared without bypass_reason.",
),
pytest.param(
Config.TestStrictnessLevel.low,
BasicReadTestConfig(
config_path="config_path", empty_streams={EmptyStreamConfiguration(name="my_empty_stream", bypass_reason="good reason")}
),
False,
id="[LOW test strictness level] Empty streams can be declared with a bypass_reason.",
),
pytest.param(
Config.TestStrictnessLevel.high,
BasicReadTestConfig(config_path="config_path", empty_streams={EmptyStreamConfiguration(name="my_empty_stream")}),
True,
id="[HIGH test strictness level] Empty streams can't be declared without bypass_reason.",
),
pytest.param(
Config.TestStrictnessLevel.high,
BasicReadTestConfig(
config_path="config_path", empty_streams={EmptyStreamConfiguration(name="my_empty_stream", bypass_reason="good reason")}
),
False,
id="[HIGH test strictness level] Empty streams can be declared with a bypass_reason.",
),
],
)
def test_enforce_strictness_level(mocker, test_strictness_level, basic_read_test_config, expect_test_failure):
mocker.patch.object(test_core, "pytest")
assert _TestBasicRead.enforce_strictness_level(test_strictness_level, basic_read_test_config) is None
if expect_test_failure:
test_core.pytest.fail.assert_called_once()
else:
test_core.pytest.fail.assert_not_called()
Loading

0 comments on commit c25ac8a

Please sign in to comment.