diff --git a/.gitignore b/.gitignore index 1876fcbfcf48..a6e0049c2f68 100644 --- a/.gitignore +++ b/.gitignore @@ -19,6 +19,7 @@ acceptance_tests_logs/ # Secrets secrets +updated_configurations !airbyte-integrations/connector-templates/**/secrets # Test logs diff --git a/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md b/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md index 017634fd619b..c678e58b9e87 100644 --- a/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md +++ b/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.2.22 +Capture control messages to store and use updated configurations. [#19979](https://github.com/airbytehq/airbyte/pull/19979). + ## 0.2.21 Optionally disable discovered catalog caching. [#19806](https://github.com/airbytehq/airbyte/pull/19806). diff --git a/airbyte-integrations/bases/source-acceptance-test/Dockerfile b/airbyte-integrations/bases/source-acceptance-test/Dockerfile index d1f803f6eca8..e7a3eff1bcd4 100644 --- a/airbyte-integrations/bases/source-acceptance-test/Dockerfile +++ b/airbyte-integrations/bases/source-acceptance-test/Dockerfile @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./ COPY source_acceptance_test ./source_acceptance_test RUN pip install . -LABEL io.airbyte.version=0.2.21 +LABEL io.airbyte.version=0.2.22 LABEL io.airbyte.name=airbyte/source-acceptance-test ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"] diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/conftest.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/conftest.py index 0696b55e134c..60333371b81d 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/conftest.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/conftest.py @@ -7,6 +7,7 @@ import json import logging import os +from glob import glob from logging import Logger from pathlib import Path from subprocess import STDOUT, check_output, run @@ -55,8 +56,19 @@ def cache_discovered_catalog_fixture(acceptance_test_config: Config) -> bool: @pytest.fixture(name="connector_config_path") def connector_config_path_fixture(inputs, base_path) -> Path: - """Fixture with connector's config path""" - return Path(base_path) / getattr(inputs, "config_path") + """Fixture with connector's config path. The path to the latest updated configurations will be returned if any.""" + original_configuration_path = Path(base_path) / getattr(inputs, "config_path") + updated_configurations_glob = f"{original_configuration_path.parent}/updated_configurations/{original_configuration_path.stem}|**{original_configuration_path.suffix}" + existing_configurations_path_creation_time = [ + (config_file_path, os.path.getctime(config_file_path)) for config_file_path in glob(updated_configurations_glob) + ] + if existing_configurations_path_creation_time: + existing_configurations_path_creation_time.sort(key=lambda x: x[1]) + most_recent_configuration_path = existing_configurations_path_creation_time[-1][0] + else: + most_recent_configuration_path = original_configuration_path + logging.info(f"Using {most_recent_configuration_path} as configuration. It is the most recent version.") + return Path(most_recent_configuration_path) @pytest.fixture(name="invalid_connector_config_path") @@ -127,8 +139,8 @@ def connector_spec_fixture(connector_spec_path) -> ConnectorSpecification: @pytest.fixture(name="docker_runner") -def docker_runner_fixture(image_tag, tmp_path) -> ConnectorRunner: - return ConnectorRunner(image_tag, volume=tmp_path) +def docker_runner_fixture(image_tag, tmp_path, connector_config_path) -> ConnectorRunner: + return ConnectorRunner(image_tag, volume=tmp_path, connector_configuration_path=connector_config_path) @pytest.fixture(name="previous_connector_image_name") diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py index 0d7aa48045a3..006c3a534013 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py @@ -9,14 +9,15 @@ from typing import Iterable, List, Mapping, Optional import docker -from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog +from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, OrchestratorType +from airbyte_cdk.models import Type as AirbyteMessageType from docker.errors import ContainerError, NotFound from docker.models.containers import Container from pydantic import ValidationError class ConnectorRunner: - def __init__(self, image_name: str, volume: Path): + def __init__(self, image_name: str, volume: Path, connector_configuration_path: Optional[Path] = None): self._client = docker.from_env() try: self._image = self._client.images.get(image_name) @@ -26,6 +27,7 @@ def __init__(self, image_name: str, volume: Path): print("Pulling completed") self._runs = 0 self._volume_base = volume + self._connector_configuration_path = connector_configuration_path @property def output_folder(self) -> Path: @@ -93,6 +95,7 @@ def call_read_with_state(self, config, catalog, state, **kwargs) -> List[Airbyte return output def run(self, cmd, config=None, state=None, catalog=None, raise_container_error: bool = True, **kwargs) -> Iterable[AirbyteMessage]: + self._runs += 1 volumes = self._prepare_volumes(config, state, catalog) logging.debug(f"Docker run {self._image}: \n{cmd}\n" f"input: {self.input_folder}\noutput: {self.output_folder}") @@ -109,7 +112,15 @@ def run(self, cmd, config=None, state=None, catalog=None, raise_container_error: for line in self.read(container, command=cmd, with_ext=raise_container_error): f.write(line.encode()) try: - yield AirbyteMessage.parse_raw(line) + airbyte_message = AirbyteMessage.parse_raw(line) + if ( + airbyte_message.type is AirbyteMessageType.CONTROL + and airbyte_message.control.type is OrchestratorType.CONNECTOR_CONFIG + ): + self._persist_new_configuration( + airbyte_message.control.connectorConfig.config, int(airbyte_message.control.emitted_at) + ) + yield airbyte_message except ValidationError as exc: logging.warning("Unable to parse connector's output %s, error: %s", line, exc) @@ -168,3 +179,37 @@ def env_variables(self): @property def entry_point(self): return self._image.attrs["Config"]["Entrypoint"] + + def _persist_new_configuration(self, new_configuration: dict, configuration_emitted_at: int) -> Optional[Path]: + """Store new configuration values to an updated_configurations subdir under the original configuration path. + N.B. The new configuration will not be stored if no configuration path was passed to the ConnectorRunner. + Args: + new_configuration (dict): The updated configuration + configuration_emitted_at (int): Timestamp at which the configuration was emitted (ms) + + Returns: + Optional[Path]: The updated configuration path if it was persisted. + """ + if self._connector_configuration_path is None: + logging.warning("No configuration path was passed to the ConnectorRunner. The new configuration was not persisted") + return None + + with open(self._connector_configuration_path) as old_configuration_file: + old_configuration = json.load(old_configuration_file) + + if new_configuration != old_configuration: + file_prefix = self._connector_configuration_path.stem.split("|")[0] + if "/updated_configurations/" not in str(self._connector_configuration_path): + Path(self._connector_configuration_path.parent / "updated_configurations").mkdir(exist_ok=True) + new_configuration_file_path = Path( + f"{self._connector_configuration_path.parent}/updated_configurations/{file_prefix}|{configuration_emitted_at}{self._connector_configuration_path.suffix}" + ) + else: + new_configuration_file_path = Path( + f"{self._connector_configuration_path.parent}/{file_prefix}|{configuration_emitted_at}{self._connector_configuration_path.suffix}" + ) + + with open(new_configuration_file_path, "w") as new_configuration_file: + json.dump(new_configuration, new_configuration_file) + logging.info(f"Stored most recent configuration value to {new_configuration_file_path}") + return new_configuration_file_path diff --git a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_container_runner.py b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_container_runner.py new file mode 100644 index 000000000000..eee26a3763d0 --- /dev/null +++ b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_container_runner.py @@ -0,0 +1,91 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +import json + +import pytest +from airbyte_cdk.models import ( + AirbyteControlConnectorConfigMessage, + AirbyteControlMessage, + AirbyteMessage, + AirbyteRecordMessage, + OrchestratorType, +) +from airbyte_cdk.models import Type as AirbyteMessageType +from source_acceptance_test.utils import connector_runner + + +class TestContainerRunner: + def test_run_call_persist_configuration(self, mocker, tmp_path): + old_configuration_path = tmp_path / "config.json" + new_configuration = {"field_a": "new_value_a"} + mocker.patch.object(connector_runner, "docker") + records_reads = [ + AirbyteMessage( + type=AirbyteMessageType.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"foo": "bar"}, emitted_at=1.0) + ).json(exclude_unset=False), + AirbyteMessage( + type=AirbyteMessageType.CONTROL, + control=AirbyteControlMessage( + type=OrchestratorType.CONNECTOR_CONFIG, + emitted_at=1.0, + connectorConfig=AirbyteControlConnectorConfigMessage(config=new_configuration), + ), + ).json(exclude_unset=False), + ] + mocker.patch.object(connector_runner.ConnectorRunner, "read", mocker.Mock(return_value=records_reads)) + mocker.patch.object(connector_runner.ConnectorRunner, "_persist_new_configuration") + + runner = connector_runner.ConnectorRunner("source-test:dev", tmp_path, connector_configuration_path=old_configuration_path) + list(runner.run("dummy_cmd")) + runner._persist_new_configuration.assert_called_once_with(new_configuration, 1) + + @pytest.mark.parametrize( + "pass_configuration_path, old_configuration, new_configuration, new_configuration_emitted_at, expect_new_configuration", + [ + pytest.param( + True, + {"field_a": "value_a"}, + {"field_a": "value_a"}, + 1, + False, + id="Config unchanged: No new configuration persisted", + ), + pytest.param( + True, {"field_a": "value_a"}, {"field_a": "new_value_a"}, 1, True, id="Config changed: New configuration persisted" + ), + pytest.param( + False, + {"field_a": "value_a"}, + {"field_a": "new_value_a"}, + 1, + False, + id="Config changed but persistence is disable: New configuration not persisted", + ), + ], + ) + def test_persist_new_configuration( + self, + mocker, + tmp_path, + pass_configuration_path, + old_configuration, + new_configuration, + new_configuration_emitted_at, + expect_new_configuration, + ): + if pass_configuration_path: + old_configuration_path = tmp_path / "config.json" + with open(old_configuration_path, "w") as old_configuration_file: + json.dump(old_configuration, old_configuration_file) + else: + old_configuration_path = None + mocker.patch.object(connector_runner, "docker") + runner = connector_runner.ConnectorRunner("source-test:dev", tmp_path, old_configuration_path) + new_configuration_path = runner._persist_new_configuration(new_configuration, new_configuration_emitted_at) + if not expect_new_configuration: + assert new_configuration_path is None + else: + assert new_configuration_path == tmp_path / "updated_configurations" / f"config|{new_configuration_emitted_at}.json" diff --git a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_global_fixtures.py b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_global_fixtures.py index 9b1ecf07ab5f..7420c193bd4c 100644 --- a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_global_fixtures.py +++ b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_global_fixtures.py @@ -179,3 +179,23 @@ def test_configured_catalog_fixture(mocker, configured_catalog_path): else: assert configured_catalog == conftest.build_configured_catalog_from_discovered_catalog_and_empty_streams.return_value conftest.build_configured_catalog_from_discovered_catalog_and_empty_streams.assert_called_once_with(mock_discovered_catalog, set()) + + +@pytest.mark.parametrize( + "updated_configurations", [[], ["config|created_last.json"], ["config|created_first.json", "config|created_last.json"]] +) +def test_connector_config_path_fixture(mocker, tmp_path, updated_configurations): + inputs = mocker.Mock(config_path="config.json") + base_path = tmp_path + if updated_configurations: + updated_configurations_dir = tmp_path / "updated_configurations" + updated_configurations_dir.mkdir() + for configuration_file_name in updated_configurations: + updated_configuration_path = updated_configurations_dir / configuration_file_name + updated_configuration_path.touch() + + connector_config_path = conftest.connector_config_path_fixture.__wrapped__(inputs, base_path) + if not updated_configurations: + assert connector_config_path == base_path / "config.json" + else: + assert connector_config_path == base_path / "updated_configurations" / "config|created_last.json"