Skip to content

Commit

Permalink
SAT: Capture configuration updates from connectors' control messages (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere authored Dec 6, 2022
1 parent 9007271 commit d28964c
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 8 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ acceptance_tests_logs/

# Secrets
secrets
updated_configurations
!airbyte-integrations/connector-templates/**/secrets

# Test logs
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.2.22
Capture control messages to store and use updated configurations. [#19979](https://github.com/airbytehq/airbyte/pull/19979).

## 0.2.21
Optionally disable discovered catalog caching. [#19806](https://github.com/airbytehq/airbyte/pull/19806).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./
COPY source_acceptance_test ./source_acceptance_test
RUN pip install .

LABEL io.airbyte.version=0.2.21
LABEL io.airbyte.version=0.2.22
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"]
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import json
import logging
import os
from glob import glob
from logging import Logger
from pathlib import Path
from subprocess import STDOUT, check_output, run
Expand Down Expand Up @@ -55,8 +56,19 @@ def cache_discovered_catalog_fixture(acceptance_test_config: Config) -> bool:

@pytest.fixture(name="connector_config_path")
def connector_config_path_fixture(inputs, base_path) -> Path:
"""Fixture with connector's config path"""
return Path(base_path) / getattr(inputs, "config_path")
"""Fixture with connector's config path. The path to the latest updated configurations will be returned if any."""
original_configuration_path = Path(base_path) / getattr(inputs, "config_path")
updated_configurations_glob = f"{original_configuration_path.parent}/updated_configurations/{original_configuration_path.stem}|**{original_configuration_path.suffix}"
existing_configurations_path_creation_time = [
(config_file_path, os.path.getctime(config_file_path)) for config_file_path in glob(updated_configurations_glob)
]
if existing_configurations_path_creation_time:
existing_configurations_path_creation_time.sort(key=lambda x: x[1])
most_recent_configuration_path = existing_configurations_path_creation_time[-1][0]
else:
most_recent_configuration_path = original_configuration_path
logging.info(f"Using {most_recent_configuration_path} as configuration. It is the most recent version.")
return Path(most_recent_configuration_path)


@pytest.fixture(name="invalid_connector_config_path")
Expand Down Expand Up @@ -127,8 +139,8 @@ def connector_spec_fixture(connector_spec_path) -> ConnectorSpecification:


@pytest.fixture(name="docker_runner")
def docker_runner_fixture(image_tag, tmp_path) -> ConnectorRunner:
return ConnectorRunner(image_tag, volume=tmp_path)
def docker_runner_fixture(image_tag, tmp_path, connector_config_path) -> ConnectorRunner:
return ConnectorRunner(image_tag, volume=tmp_path, connector_configuration_path=connector_config_path)


@pytest.fixture(name="previous_connector_image_name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
from typing import Iterable, List, Mapping, Optional

import docker
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, OrchestratorType
from airbyte_cdk.models import Type as AirbyteMessageType
from docker.errors import ContainerError, NotFound
from docker.models.containers import Container
from pydantic import ValidationError


class ConnectorRunner:
def __init__(self, image_name: str, volume: Path):
def __init__(self, image_name: str, volume: Path, connector_configuration_path: Optional[Path] = None):
self._client = docker.from_env()
try:
self._image = self._client.images.get(image_name)
Expand All @@ -26,6 +27,7 @@ def __init__(self, image_name: str, volume: Path):
print("Pulling completed")
self._runs = 0
self._volume_base = volume
self._connector_configuration_path = connector_configuration_path

@property
def output_folder(self) -> Path:
Expand Down Expand Up @@ -93,6 +95,7 @@ def call_read_with_state(self, config, catalog, state, **kwargs) -> List[Airbyte
return output

def run(self, cmd, config=None, state=None, catalog=None, raise_container_error: bool = True, **kwargs) -> Iterable[AirbyteMessage]:

self._runs += 1
volumes = self._prepare_volumes(config, state, catalog)
logging.debug(f"Docker run {self._image}: \n{cmd}\n" f"input: {self.input_folder}\noutput: {self.output_folder}")
Expand All @@ -109,7 +112,15 @@ def run(self, cmd, config=None, state=None, catalog=None, raise_container_error:
for line in self.read(container, command=cmd, with_ext=raise_container_error):
f.write(line.encode())
try:
yield AirbyteMessage.parse_raw(line)
airbyte_message = AirbyteMessage.parse_raw(line)
if (
airbyte_message.type is AirbyteMessageType.CONTROL
and airbyte_message.control.type is OrchestratorType.CONNECTOR_CONFIG
):
self._persist_new_configuration(
airbyte_message.control.connectorConfig.config, int(airbyte_message.control.emitted_at)
)
yield airbyte_message
except ValidationError as exc:
logging.warning("Unable to parse connector's output %s, error: %s", line, exc)

Expand Down Expand Up @@ -168,3 +179,37 @@ def env_variables(self):
@property
def entry_point(self):
return self._image.attrs["Config"]["Entrypoint"]

def _persist_new_configuration(self, new_configuration: dict, configuration_emitted_at: int) -> Optional[Path]:
"""Store new configuration values to an updated_configurations subdir under the original configuration path.
N.B. The new configuration will not be stored if no configuration path was passed to the ConnectorRunner.
Args:
new_configuration (dict): The updated configuration
configuration_emitted_at (int): Timestamp at which the configuration was emitted (ms)
Returns:
Optional[Path]: The updated configuration path if it was persisted.
"""
if self._connector_configuration_path is None:
logging.warning("No configuration path was passed to the ConnectorRunner. The new configuration was not persisted")
return None

with open(self._connector_configuration_path) as old_configuration_file:
old_configuration = json.load(old_configuration_file)

if new_configuration != old_configuration:
file_prefix = self._connector_configuration_path.stem.split("|")[0]
if "/updated_configurations/" not in str(self._connector_configuration_path):
Path(self._connector_configuration_path.parent / "updated_configurations").mkdir(exist_ok=True)
new_configuration_file_path = Path(
f"{self._connector_configuration_path.parent}/updated_configurations/{file_prefix}|{configuration_emitted_at}{self._connector_configuration_path.suffix}"
)
else:
new_configuration_file_path = Path(
f"{self._connector_configuration_path.parent}/{file_prefix}|{configuration_emitted_at}{self._connector_configuration_path.suffix}"
)

with open(new_configuration_file_path, "w") as new_configuration_file:
json.dump(new_configuration, new_configuration_file)
logging.info(f"Stored most recent configuration value to {new_configuration_file_path}")
return new_configuration_file_path
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


import json

import pytest
from airbyte_cdk.models import (
AirbyteControlConnectorConfigMessage,
AirbyteControlMessage,
AirbyteMessage,
AirbyteRecordMessage,
OrchestratorType,
)
from airbyte_cdk.models import Type as AirbyteMessageType
from source_acceptance_test.utils import connector_runner


class TestContainerRunner:
def test_run_call_persist_configuration(self, mocker, tmp_path):
old_configuration_path = tmp_path / "config.json"
new_configuration = {"field_a": "new_value_a"}
mocker.patch.object(connector_runner, "docker")
records_reads = [
AirbyteMessage(
type=AirbyteMessageType.RECORD, record=AirbyteRecordMessage(stream="test_stream", data={"foo": "bar"}, emitted_at=1.0)
).json(exclude_unset=False),
AirbyteMessage(
type=AirbyteMessageType.CONTROL,
control=AirbyteControlMessage(
type=OrchestratorType.CONNECTOR_CONFIG,
emitted_at=1.0,
connectorConfig=AirbyteControlConnectorConfigMessage(config=new_configuration),
),
).json(exclude_unset=False),
]
mocker.patch.object(connector_runner.ConnectorRunner, "read", mocker.Mock(return_value=records_reads))
mocker.patch.object(connector_runner.ConnectorRunner, "_persist_new_configuration")

runner = connector_runner.ConnectorRunner("source-test:dev", tmp_path, connector_configuration_path=old_configuration_path)
list(runner.run("dummy_cmd"))
runner._persist_new_configuration.assert_called_once_with(new_configuration, 1)

@pytest.mark.parametrize(
"pass_configuration_path, old_configuration, new_configuration, new_configuration_emitted_at, expect_new_configuration",
[
pytest.param(
True,
{"field_a": "value_a"},
{"field_a": "value_a"},
1,
False,
id="Config unchanged: No new configuration persisted",
),
pytest.param(
True, {"field_a": "value_a"}, {"field_a": "new_value_a"}, 1, True, id="Config changed: New configuration persisted"
),
pytest.param(
False,
{"field_a": "value_a"},
{"field_a": "new_value_a"},
1,
False,
id="Config changed but persistence is disable: New configuration not persisted",
),
],
)
def test_persist_new_configuration(
self,
mocker,
tmp_path,
pass_configuration_path,
old_configuration,
new_configuration,
new_configuration_emitted_at,
expect_new_configuration,
):
if pass_configuration_path:
old_configuration_path = tmp_path / "config.json"
with open(old_configuration_path, "w") as old_configuration_file:
json.dump(old_configuration, old_configuration_file)
else:
old_configuration_path = None
mocker.patch.object(connector_runner, "docker")
runner = connector_runner.ConnectorRunner("source-test:dev", tmp_path, old_configuration_path)
new_configuration_path = runner._persist_new_configuration(new_configuration, new_configuration_emitted_at)
if not expect_new_configuration:
assert new_configuration_path is None
else:
assert new_configuration_path == tmp_path / "updated_configurations" / f"config|{new_configuration_emitted_at}.json"
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,23 @@ def test_configured_catalog_fixture(mocker, configured_catalog_path):
else:
assert configured_catalog == conftest.build_configured_catalog_from_discovered_catalog_and_empty_streams.return_value
conftest.build_configured_catalog_from_discovered_catalog_and_empty_streams.assert_called_once_with(mock_discovered_catalog, set())


@pytest.mark.parametrize(
"updated_configurations", [[], ["config|created_last.json"], ["config|created_first.json", "config|created_last.json"]]
)
def test_connector_config_path_fixture(mocker, tmp_path, updated_configurations):
inputs = mocker.Mock(config_path="config.json")
base_path = tmp_path
if updated_configurations:
updated_configurations_dir = tmp_path / "updated_configurations"
updated_configurations_dir.mkdir()
for configuration_file_name in updated_configurations:
updated_configuration_path = updated_configurations_dir / configuration_file_name
updated_configuration_path.touch()

connector_config_path = conftest.connector_config_path_fixture.__wrapped__(inputs, base_path)
if not updated_configurations:
assert connector_config_path == base_path / "config.json"
else:
assert connector_config_path == base_path / "updated_configurations" / "config|created_last.json"

0 comments on commit d28964c

Please sign in to comment.