From f3d4b5a722c0c5c88db540bb1a03def9a0f8c242 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Tue, 15 Nov 2022 00:53:07 +0100 Subject: [PATCH 01/27] wip --- airbyte-cdk/python/airbyte_cdk/connector.py | 39 +++++++++++++++++-- .../python/unit_tests/test_connector.py | 38 ++++++++++++++++++ 2 files changed, 74 insertions(+), 3 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/connector.py b/airbyte-cdk/python/airbyte_cdk/connector.py index bd47f188c907..16b151b048fb 100644 --- a/airbyte-cdk/python/airbyte_cdk/connector.py +++ b/airbyte-cdk/python/airbyte_cdk/connector.py @@ -7,11 +7,18 @@ import logging import os import pkgutil +import time from abc import ABC, abstractmethod from typing import Any, Generic, Mapping, Optional, Protocol, TypeVar import yaml -from airbyte_cdk.models import AirbyteConnectionStatus, ConnectorSpecification +from airbyte_cdk.models import ( + AirbyteConnectionStatus, + AirbyteControlConnectorConfigMessage, + AirbyteControlMessage, + ConnectorSpecification, + OrchestratorType, +) def load_optional_package_file(package: str, filename: str) -> Optional[bytes]: @@ -94,12 +101,38 @@ def write_config(config: Mapping[str, Any], config_path: str): ... +class MutableConfig(dict): + def __init__(self, raw_config, parent=None): + self.parent = parent + super(MutableConfig, self).__init__(raw_config) + for item, value in self.items(): + if isinstance(value, dict): + self[item] = MutableConfig(value, parent=self) + + def __setitem__(self, item, value): + previous_value = self.get(item) + super(MutableConfig, self).__setitem__(item, value) + if value != previous_value: + self.emit_airbyte_control_message() + + def emit_airbyte_control_message(self): + if self.parent is not None: + self.parent.emit_airbyte_control_message() + else: + control_message = AirbyteControlMessage( + type=OrchestratorType.CONNECTOR_CONFIG, + emitted_at=time.time() * 1000, + connectorConfig=AirbyteControlConnectorConfigMessage(config=self), + ) + print(control_message.json()) + + class DefaultConnectorMixin: # can be overridden to change an input config - def configure(self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]: + def configure(self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str) -> MutableConfig[str, Any]: config_path = os.path.join(temp_dir, "config.json") self.write_config(config, config_path) - return config + return MutableConfig(config) class Connector(DefaultConnectorMixin, BaseConnector[Mapping[str, Any]], ABC): diff --git a/airbyte-cdk/python/unit_tests/test_connector.py b/airbyte-cdk/python/unit_tests/test_connector.py index 06e9dd16ead1..1a90e74b730e 100644 --- a/airbyte-cdk/python/unit_tests/test_connector.py +++ b/airbyte-cdk/python/unit_tests/test_connector.py @@ -8,12 +8,14 @@ import os import sys import tempfile +import time from pathlib import Path from typing import Any, Mapping import pytest import yaml from airbyte_cdk import AirbyteSpec, Connector +from airbyte_cdk.connector import MutableConfig from airbyte_cdk.models import AirbyteConnectionStatus logger = logging.getLogger("airbyte") @@ -130,3 +132,39 @@ def test_multiple_spec_files_raises_exception(self, integration, use_yaml_spec, def test_no_spec_file_raises_exception(self, integration): with pytest.raises(FileNotFoundError, match="Unable to find spec."): integration.spec(logger) + + +class TestMutableConfig: + def test_emit_message_when_updated_with_new_value(self, capsys): + config = MutableConfig({"key_a": "value_a", "key_b": "value_b"}) + # Check nothing is printed on init + captured = capsys.readouterr() + assert not captured.out + before_time = time.time() * 1000 + config["key_a"] = "new_value_a" + after_time = time.time() * 1000 + captured = capsys.readouterr() + raw_control_message = json.loads(captured.out) + assert raw_control_message["type"] == "CONNECTOR_CONFIG" + assert raw_control_message["connectorConfig"] == {"config": {"key_a": "new_value_a", "key_b": "value_b"}} + assert before_time < raw_control_message["emitted_at"] < after_time + + def test_emit_message_when_updated_with_new_nested_value(self, capsys): + config = MutableConfig({"key_a": {"key_a_a": "value_a_a"}, "key_b": "value_b"}) + # Check nothing is printed on + captured = capsys.readouterr() + assert not captured.out + before_time = time.time() * 1000 + config["key_a"]["key_a_a"] = "new_value_a_a" + after_time = time.time() * 1000 + captured = capsys.readouterr() + raw_control_message = json.loads(captured.out) + assert raw_control_message["type"] == "CONNECTOR_CONFIG" + assert raw_control_message["connectorConfig"] == {"config": {"key_a": {"key_a_a": "new_value_a_a"}, "key_b": "value_b"}} + assert before_time < raw_control_message["emitted_at"] < after_time + + def test_not_emit_message_when_updated_with_same_value(self, capsys): + config = MutableConfig({"key_a": "value_a", "key_b": "value_b"}) + config["key_a"] = "value_a" + captured = capsys.readouterr() + assert not captured.out From 24d24dd1f1cd90adf56b1427541056ad99161246 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Tue, 15 Nov 2022 15:23:28 +0100 Subject: [PATCH 02/27] implementation --- .../python/airbyte_cdk/config_observation.py | 60 +++++++++++++++++++ airbyte-cdk/python/airbyte_cdk/connector.py | 44 +++----------- .../sources/utils/schema_helpers.py | 21 +++---- .../python/unit_tests/sources/test_source.py | 1 + .../unit_tests/test_config_observation.py | 52 ++++++++++++++++ .../python/unit_tests/test_connector.py | 49 ++++----------- 6 files changed, 139 insertions(+), 88 deletions(-) create mode 100644 airbyte-cdk/python/airbyte_cdk/config_observation.py create mode 100644 airbyte-cdk/python/unit_tests/test_config_observation.py diff --git a/airbyte-cdk/python/airbyte_cdk/config_observation.py b/airbyte-cdk/python/airbyte_cdk/config_observation.py new file mode 100644 index 000000000000..aaae0929acf4 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/config_observation.py @@ -0,0 +1,60 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# +import time +from abc import ABC, abstractmethod +from typing import Any, Callable, MutableMapping + +from airbyte_cdk.models import AirbyteControlConnectorConfigMessage, AirbyteControlMessage, OrchestratorType + + +class BaseObserver(ABC): + @abstractmethod + def update(self): + ... + + +class ObservedDict(dict): + def __init__(self, non_observed_mapping: MutableMapping, observer: BaseObserver) -> None: + super().__init__(non_observed_mapping) + self.observer = observer + for item, value in self.items(): + if isinstance(value, MutableMapping): + self[item] = ObservedDict(value, observer) + + def __setitem__(self, item: Any, value: Any): + """Override dict__setitem__ by: + 1. Observing the new value if it is a dict + 2. Call observer update if the new value is different from the previous one + """ + previous_value = self.get(item) + value = ObservedDict(value, self.observer) if isinstance(value, MutableMapping) else value + super(ObservedDict, self).__setitem__(item, value) + if value != previous_value: + self.observer.update() + + +class ConfigObserver(BaseObserver): + """This class is made to track mutations on ObservedDict config. + When update is called the observed configuration is saved on disk a CONNECTOR_CONFIG control message is emitted on stdout. + """ + + def __init__(self, config_path: str, write_config_fn: Callable) -> None: + self.config_path = config_path + self.write_config_fn = write_config_fn + + def set_config(self, config: ObservedDict) -> None: + self.config = config + self.write_config_fn(self.config, self.config_path) + + def update(self) -> None: + self.write_config_fn(self.config, self.config_path) + self._emit_airbyte_control_message() + + def _emit_airbyte_control_message(self) -> None: + control_message = AirbyteControlMessage( + type=OrchestratorType.CONNECTOR_CONFIG, + emitted_at=time.time() * 1000, + connectorConfig=AirbyteControlConnectorConfigMessage(config=self.config), + ) + print(control_message.json()) diff --git a/airbyte-cdk/python/airbyte_cdk/connector.py b/airbyte-cdk/python/airbyte_cdk/connector.py index 16b151b048fb..91e822dfae1b 100644 --- a/airbyte-cdk/python/airbyte_cdk/connector.py +++ b/airbyte-cdk/python/airbyte_cdk/connector.py @@ -7,18 +7,12 @@ import logging import os import pkgutil -import time from abc import ABC, abstractmethod from typing import Any, Generic, Mapping, Optional, Protocol, TypeVar import yaml -from airbyte_cdk.models import ( - AirbyteConnectionStatus, - AirbyteControlConnectorConfigMessage, - AirbyteControlMessage, - ConnectorSpecification, - OrchestratorType, -) +from airbyte_cdk.config_observation import ConfigObserver, ObservedDict +from airbyte_cdk.models import AirbyteConnectionStatus, ConnectorSpecification def load_optional_package_file(package: str, filename: str) -> Optional[bytes]: @@ -101,38 +95,14 @@ def write_config(config: Mapping[str, Any], config_path: str): ... -class MutableConfig(dict): - def __init__(self, raw_config, parent=None): - self.parent = parent - super(MutableConfig, self).__init__(raw_config) - for item, value in self.items(): - if isinstance(value, dict): - self[item] = MutableConfig(value, parent=self) - - def __setitem__(self, item, value): - previous_value = self.get(item) - super(MutableConfig, self).__setitem__(item, value) - if value != previous_value: - self.emit_airbyte_control_message() - - def emit_airbyte_control_message(self): - if self.parent is not None: - self.parent.emit_airbyte_control_message() - else: - control_message = AirbyteControlMessage( - type=OrchestratorType.CONNECTOR_CONFIG, - emitted_at=time.time() * 1000, - connectorConfig=AirbyteControlConnectorConfigMessage(config=self), - ) - print(control_message.json()) - - class DefaultConnectorMixin: # can be overridden to change an input config - def configure(self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str) -> MutableConfig[str, Any]: + def configure(self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str) -> ObservedDict[str, Any]: config_path = os.path.join(temp_dir, "config.json") - self.write_config(config, config_path) - return MutableConfig(config) + config_observer = ConfigObserver(config_path, self.write_config) + observed_config = ObservedDict(config, config_observer) + config_observer.set_config(observed_config) + return observed_config class Connector(DefaultConnectorMixin, BaseConnector[Mapping[str, Any]], ABC): diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py index aea02ecec950..26b35edbf4a9 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py @@ -176,23 +176,18 @@ def dict(self, *args, **kwargs): return super().dict(*args, **kwargs) -def split_config(config: Mapping[str, Any]) -> Tuple[dict, InternalConfig]: +def split_config(config: MutableMapping[str, Any]) -> Tuple[MutableMapping, InternalConfig]: """ - Break config map object into 2 instances: first is a dict with user defined - configuration and second is internal config that contains private keys for - acceptance test configuration. + Remove internal config keywords from config and build an InternalConfig from it. :param - config - Dict object that has been loaded from config file. + config - MutableMapping object that has been loaded from config file. - :return tuple of user defined config dict with filtered out internal + :return tuple of user defined config MutableMapping with filtered out internal parameters and SAT internal config object. """ - main_config = {} internal_config = {} - for k, v in config.items(): - if k in InternalConfig.KEYWORDS: - internal_config[k] = v - else: - main_config[k] = v - return main_config, InternalConfig.parse_obj(internal_config) + for key in list(config.keys()): + if key in InternalConfig.KEYWORDS: + internal_config[key] = config.pop(key) + return config, InternalConfig.parse_obj(internal_config) diff --git a/airbyte-cdk/python/unit_tests/sources/test_source.py b/airbyte-cdk/python/unit_tests/sources/test_source.py index 1034975c1892..2329dfab4cdd 100644 --- a/airbyte-cdk/python/unit_tests/sources/test_source.py +++ b/airbyte-cdk/python/unit_tests/sources/test_source.py @@ -404,6 +404,7 @@ def test_internal_config_limit(abstract_source, catalog): # Set limit and check if state is produced when limit is set for incremental stream logger_mock.reset_mock() + internal_config = {"some_config": 100, "_limit": STREAM_LIMIT} records = [r for r in abstract_source.read(logger=logger_mock, config=internal_config, catalog=catalog, state={})] assert len(records) == STREAM_LIMIT + 1 assert records[-1].type == Type.STATE diff --git a/airbyte-cdk/python/unit_tests/test_config_observation.py b/airbyte-cdk/python/unit_tests/test_config_observation.py new file mode 100644 index 000000000000..38332aa2fddb --- /dev/null +++ b/airbyte-cdk/python/unit_tests/test_config_observation.py @@ -0,0 +1,52 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# +import json +import time + +from airbyte_cdk.config_observation import ConfigObserver, ObservedDict + + +class TestObservedDict: + def test_update_called_on_set_item(self, mocker): + mock_observer = mocker.Mock() + my_observed_dict = ObservedDict({"key": "value"}, mock_observer) + assert mock_observer.update.call_count == 0 + my_observed_dict["key"] = {"nested_key": "nested_value"} + assert mock_observer.update.call_count == 1 + my_observed_dict["key"]["nested_key"] = "new_nested_value" + assert mock_observer.update.call_count == 2 + # Setting the same value again should not call observer's update + my_observed_dict["key"]["nested_key"] = "new_nested_value" + assert mock_observer.update.call_count == 2 + + def test_update_not_called_on_init_with_nested_fields(self, mocker): + mock_observer = mocker.Mock() + ObservedDict({"key": "value", "nested": {"key": "value"}}, mock_observer) + mock_observer.update.assert_not_called() + + +class TestConfigObserver: + def test_update(self, mocker, capsys): + mock_write_config_fn = mocker.Mock() + mock_config_path = mocker.Mock() + config_observer = ConfigObserver(mock_config_path, mock_write_config_fn) + config_observer.config = ObservedDict({"key": "value"}, config_observer) + before_time = time.time() * 1000 + config_observer.update() + after_time = time.time() * 1000 + captured = capsys.readouterr() + raw_control_message = json.loads(captured.out) + mock_write_config_fn.assert_called_with(config_observer.config, mock_config_path) + assert raw_control_message["type"] == "CONNECTOR_CONFIG" + assert raw_control_message["connectorConfig"] == {"config": dict(config_observer.config)} + assert before_time < raw_control_message["emitted_at"] < after_time + + def test_set_config(self, mocker): + mock_write_config_fn = mocker.Mock() + mock_config_path = mocker.Mock() + config_observer = ConfigObserver(mock_config_path, mock_write_config_fn) + observed_config = ObservedDict({"key": "value"}, config_observer) + config_observer.set_config(observed_config) + assert config_observer.config == observed_config + mock_write_config_fn.assert_called_once_with(observed_config, mock_config_path) diff --git a/airbyte-cdk/python/unit_tests/test_connector.py b/airbyte-cdk/python/unit_tests/test_connector.py index 1a90e74b730e..0024ab2dca2e 100644 --- a/airbyte-cdk/python/unit_tests/test_connector.py +++ b/airbyte-cdk/python/unit_tests/test_connector.py @@ -8,14 +8,13 @@ import os import sys import tempfile -import time from pathlib import Path from typing import Any, Mapping import pytest import yaml from airbyte_cdk import AirbyteSpec, Connector -from airbyte_cdk.connector import MutableConfig +from airbyte_cdk.config_observation import ConfigObserver, ObservedDict from airbyte_cdk.models import AirbyteConnectionStatus logger = logging.getLogger("airbyte") @@ -84,6 +83,16 @@ def test_write_config(integration, mock_config): assert mock_config == json.loads(actual.read()) +def test_configure(integration, mock_config): + temp_dir = tempfile.gettempdir() + config = integration.configure(mock_config, temp_dir) + assert isinstance(config, ObservedDict) + assert isinstance(config.observer, ConfigObserver) + assert config.observer.config == config + assert config.observer.config_path == os.path.join(temp_dir, "config.json") + assert config.observer.write_config_fn == integration.write_config + + class TestConnectorSpec: CONNECTION_SPECIFICATION = { "type": "object", @@ -132,39 +141,3 @@ def test_multiple_spec_files_raises_exception(self, integration, use_yaml_spec, def test_no_spec_file_raises_exception(self, integration): with pytest.raises(FileNotFoundError, match="Unable to find spec."): integration.spec(logger) - - -class TestMutableConfig: - def test_emit_message_when_updated_with_new_value(self, capsys): - config = MutableConfig({"key_a": "value_a", "key_b": "value_b"}) - # Check nothing is printed on init - captured = capsys.readouterr() - assert not captured.out - before_time = time.time() * 1000 - config["key_a"] = "new_value_a" - after_time = time.time() * 1000 - captured = capsys.readouterr() - raw_control_message = json.loads(captured.out) - assert raw_control_message["type"] == "CONNECTOR_CONFIG" - assert raw_control_message["connectorConfig"] == {"config": {"key_a": "new_value_a", "key_b": "value_b"}} - assert before_time < raw_control_message["emitted_at"] < after_time - - def test_emit_message_when_updated_with_new_nested_value(self, capsys): - config = MutableConfig({"key_a": {"key_a_a": "value_a_a"}, "key_b": "value_b"}) - # Check nothing is printed on - captured = capsys.readouterr() - assert not captured.out - before_time = time.time() * 1000 - config["key_a"]["key_a_a"] = "new_value_a_a" - after_time = time.time() * 1000 - captured = capsys.readouterr() - raw_control_message = json.loads(captured.out) - assert raw_control_message["type"] == "CONNECTOR_CONFIG" - assert raw_control_message["connectorConfig"] == {"config": {"key_a": {"key_a_a": "new_value_a_a"}, "key_b": "value_b"}} - assert before_time < raw_control_message["emitted_at"] < after_time - - def test_not_emit_message_when_updated_with_same_value(self, capsys): - config = MutableConfig({"key_a": "value_a", "key_b": "value_b"}) - config["key_a"] = "value_a" - captured = capsys.readouterr() - assert not captured.out From 355b90bf83c6e51a3e7f21256102885b2a682e67 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Tue, 15 Nov 2022 16:00:08 +0100 Subject: [PATCH 03/27] format --- airbyte-cdk/python/airbyte_cdk/config_observation.py | 1 + airbyte-cdk/python/unit_tests/test_config_observation.py | 1 + 2 files changed, 2 insertions(+) diff --git a/airbyte-cdk/python/airbyte_cdk/config_observation.py b/airbyte-cdk/python/airbyte_cdk/config_observation.py index aaae0929acf4..2e1106d076a4 100644 --- a/airbyte-cdk/python/airbyte_cdk/config_observation.py +++ b/airbyte-cdk/python/airbyte_cdk/config_observation.py @@ -1,6 +1,7 @@ # # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # + import time from abc import ABC, abstractmethod from typing import Any, Callable, MutableMapping diff --git a/airbyte-cdk/python/unit_tests/test_config_observation.py b/airbyte-cdk/python/unit_tests/test_config_observation.py index 38332aa2fddb..aaa741dc9ce6 100644 --- a/airbyte-cdk/python/unit_tests/test_config_observation.py +++ b/airbyte-cdk/python/unit_tests/test_config_observation.py @@ -1,6 +1,7 @@ # # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # + import json import time From 61e9d4b3591a726322b92a63c83e7e8b2e5f82f2 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Tue, 15 Nov 2022 16:03:08 +0100 Subject: [PATCH 04/27] bump version --- airbyte-cdk/python/CHANGELOG.md | 3 +++ airbyte-cdk/python/setup.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index d14aab7d1c39..16bd4dcc9227 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.10.0 +Emit `AirbyteControlMessage.ConnectorConfig` on `config` mutation. + ## 0.9.1 Low-code: Fix filtering vars in `InterpolatedRequestInputProvider.eval_request_inputs` diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index c0c0743d3f07..133323cb4827 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -15,7 +15,7 @@ setup( name="airbyte-cdk", - version="0.9.1", + version="0.10.0", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", From 22f75c7647984b90f9cf87e9eb4ed5131eb7d9ea Mon Sep 17 00:00:00 2001 From: alafanechere Date: Wed, 16 Nov 2022 15:27:07 +0100 Subject: [PATCH 05/27] always update by default, even if same value --- airbyte-cdk/python/airbyte_cdk/config_observation.py | 7 ++++--- airbyte-cdk/python/unit_tests/test_config_observation.py | 6 +++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/config_observation.py b/airbyte-cdk/python/airbyte_cdk/config_observation.py index 2e1106d076a4..6da1683cb77f 100644 --- a/airbyte-cdk/python/airbyte_cdk/config_observation.py +++ b/airbyte-cdk/python/airbyte_cdk/config_observation.py @@ -16,12 +16,13 @@ def update(self): class ObservedDict(dict): - def __init__(self, non_observed_mapping: MutableMapping, observer: BaseObserver) -> None: - super().__init__(non_observed_mapping) + def __init__(self, non_observed_mapping: MutableMapping, observer: BaseObserver, update_on_unchanged_value=True) -> None: self.observer = observer + self.update_on_unchanged_value = update_on_unchanged_value for item, value in self.items(): if isinstance(value, MutableMapping): self[item] = ObservedDict(value, observer) + super().__init__(non_observed_mapping) def __setitem__(self, item: Any, value: Any): """Override dict__setitem__ by: @@ -31,7 +32,7 @@ def __setitem__(self, item: Any, value: Any): previous_value = self.get(item) value = ObservedDict(value, self.observer) if isinstance(value, MutableMapping) else value super(ObservedDict, self).__setitem__(item, value) - if value != previous_value: + if self.update_on_unchanged_value or value != previous_value: self.observer.update() diff --git a/airbyte-cdk/python/unit_tests/test_config_observation.py b/airbyte-cdk/python/unit_tests/test_config_observation.py index aaa741dc9ce6..991c165a36cc 100644 --- a/airbyte-cdk/python/unit_tests/test_config_observation.py +++ b/airbyte-cdk/python/unit_tests/test_config_observation.py @@ -17,13 +17,13 @@ def test_update_called_on_set_item(self, mocker): assert mock_observer.update.call_count == 1 my_observed_dict["key"]["nested_key"] = "new_nested_value" assert mock_observer.update.call_count == 2 - # Setting the same value again should not call observer's update + # Setting the same value again should call observer's update my_observed_dict["key"]["nested_key"] = "new_nested_value" - assert mock_observer.update.call_count == 2 + assert mock_observer.update.call_count == 3 def test_update_not_called_on_init_with_nested_fields(self, mocker): mock_observer = mocker.Mock() - ObservedDict({"key": "value", "nested": {"key": "value"}}, mock_observer) + ObservedDict({"key": "value", "nested": {"nested_key": "nested_value"}}, mock_observer) mock_observer.update.assert_not_called() From d562117af9417f80a3ad80c5d698f6fcf0c822cd Mon Sep 17 00:00:00 2001 From: alafanechere Date: Wed, 16 Nov 2022 15:41:29 +0100 Subject: [PATCH 06/27] rename split_config to filter_internal_keywords --- airbyte-cdk/python/airbyte_cdk/entrypoint.py | 4 ++-- airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py | 4 ++-- .../python/airbyte_cdk/sources/utils/schema_helpers.py | 2 +- .../connectors/source-amazon-ads/unit_tests/utils.py | 4 ++-- .../source-azure-table/source_azure_table/source.py | 4 ++-- .../connectors/source-facebook-marketing/unit_tests/utils.py | 4 ++-- .../source-google-search-console/unit_tests/utils.py | 4 ++-- .../connectors/source-hubspot/source_hubspot/source.py | 4 ++-- .../connectors/source-mixpanel/unit_tests/utils.py | 4 ++-- 9 files changed, 17 insertions(+), 17 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/entrypoint.py b/airbyte-cdk/python/airbyte_cdk/entrypoint.py index b3b853429989..1958ab09a8bd 100644 --- a/airbyte-cdk/python/airbyte_cdk/entrypoint.py +++ b/airbyte-cdk/python/airbyte_cdk/entrypoint.py @@ -16,7 +16,7 @@ from airbyte_cdk.models import AirbyteMessage, Status, Type from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification from airbyte_cdk.sources import Source -from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config +from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, filter_internal_keywords from airbyte_cdk.utils.airbyte_secrets_utils import get_secrets, update_secrets from airbyte_cdk.utils.traced_exception import AirbyteTracedException @@ -92,7 +92,7 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]: # Remove internal flags from config before validating so # jsonschema's additionalProperties flag wont fail the validation - connector_config, _ = split_config(config) + connector_config, _ = filter_internal_keywords(config) if self.source.check_config_against_spec or cmd == "check": try: check_config_against_spec_or_exit(connector_config, source_spec) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index 389ffcc19d66..e366a67eb4e0 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -23,7 +23,7 @@ from airbyte_cdk.sources.streams.core import StreamData from airbyte_cdk.sources.streams.http.http import HttpStream from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message -from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config +from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, filter_internal_keywords from airbyte_cdk.utils.event_timing import create_timer from airbyte_cdk.utils.traced_exception import AirbyteTracedException @@ -92,7 +92,7 @@ def read( ) -> Iterator[AirbyteMessage]: """Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.""" logger.info(f"Starting syncing {self.name}") - config, internal_config = split_config(config) + config, internal_config = filter_internal_keywords(config) # TODO assert all streams exist in the connector # get the streams once in case the connector needs to make any queries to generate them stream_instances = {s.name: s for s in self.streams(config)} diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py index 26b35edbf4a9..afd486d1d3b7 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py @@ -176,7 +176,7 @@ def dict(self, *args, **kwargs): return super().dict(*args, **kwargs) -def split_config(config: MutableMapping[str, Any]) -> Tuple[MutableMapping, InternalConfig]: +def filter_internal_keywords(config: MutableMapping[str, Any]) -> Tuple[MutableMapping, InternalConfig]: """ Remove internal config keywords from config and build an InternalConfig from it. diff --git a/airbyte-integrations/connectors/source-amazon-ads/unit_tests/utils.py b/airbyte-integrations/connectors/source-amazon-ads/unit_tests/utils.py index 71f5d8566f31..8bc9c43388ab 100644 --- a/airbyte-integrations/connectors/source-amazon-ads/unit_tests/utils.py +++ b/airbyte-integrations/connectors/source-amazon-ads/unit_tests/utils.py @@ -10,7 +10,7 @@ from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification from airbyte_cdk.sources import Source from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config +from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, filter_internal_keywords def read_incremental(stream_instance: Stream, stream_state: MutableMapping[str, Any]) -> Iterator[dict]: @@ -32,7 +32,7 @@ def read_incremental(stream_instance: Stream, stream_state: MutableMapping[str, def command_check(source: Source, config): logger = mock.MagicMock() - connector_config, _ = split_config(config) + connector_config, _ = filter_internal_keywords(config) if source.check_config_against_spec: source_spec: ConnectorSpecification = source.spec(logger) check_config_against_spec_or_exit(connector_config, source_spec) diff --git a/airbyte-integrations/connectors/source-azure-table/source_azure_table/source.py b/airbyte-integrations/connectors/source-azure-table/source_azure_table/source.py index d728ad4dd79b..8bf91070814f 100644 --- a/airbyte-integrations/connectors/source-azure-table/source_azure_table/source.py +++ b/airbyte-integrations/connectors/source-azure-table/source_azure_table/source.py @@ -17,7 +17,7 @@ from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.utils.schema_helpers import split_config +from airbyte_cdk.sources.utils.schema_helpers import filter_internal_keywords from airbyte_cdk.utils.event_timing import create_timer from .azure_table import AzureTableReader @@ -103,7 +103,7 @@ def read( stream_instances = {s.name: s for s in self.streams(logger=logger, config=config)} state_manager = ConnectorStateManager(stream_instance_map=stream_instances, state=state) logger.info(f"Starting syncing {self.name}") - config, internal_config = split_config(config) + config, internal_config = filter_internal_keywords(config) self._stream_to_instance_map = stream_instances with create_timer(self.name) as timer: for configured_stream in catalog.streams: diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/utils.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/utils.py index 776315e717e6..5f5f8bb56859 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/utils.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/utils.py @@ -7,12 +7,12 @@ from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification from airbyte_cdk.sources import Source -from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config +from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, filter_internal_keywords def command_check(source: Source, config): logger = mock.MagicMock() - connector_config, _ = split_config(config) + connector_config, _ = filter_internal_keywords(config) if source.check_config_against_spec: source_spec: ConnectorSpecification = source.spec(logger) check_config_against_spec_or_exit(connector_config, source_spec) diff --git a/airbyte-integrations/connectors/source-google-search-console/unit_tests/utils.py b/airbyte-integrations/connectors/source-google-search-console/unit_tests/utils.py index 776315e717e6..5f5f8bb56859 100644 --- a/airbyte-integrations/connectors/source-google-search-console/unit_tests/utils.py +++ b/airbyte-integrations/connectors/source-google-search-console/unit_tests/utils.py @@ -7,12 +7,12 @@ from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification from airbyte_cdk.sources import Source -from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config +from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, filter_internal_keywords def command_check(source: Source, config): logger = mock.MagicMock() - connector_config, _ = split_config(config) + connector_config, _ = filter_internal_keywords(config) if source.check_config_against_spec: source_spec: ConnectorSpecification = source.spec(logger) check_config_against_spec_or_exit(connector_config, source_spec) diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/source.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/source.py index a721b33ed64f..87c8cd724971 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/source.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/source.py @@ -12,7 +12,7 @@ from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.utils.schema_helpers import split_config +from airbyte_cdk.sources.utils.schema_helpers import filter_internal_keywords from airbyte_cdk.utils.event_timing import create_timer from airbyte_cdk.utils.traced_exception import AirbyteTracedException from requests import HTTPError @@ -154,7 +154,7 @@ def read( This method is overridden to check whether the stream `quotes` exists in the source, if not skip reading that stream. """ logger.info(f"Starting syncing {self.name}") - config, internal_config = split_config(config) + config, internal_config = filter_internal_keywords(config) # TODO assert all streams exist in the connector # get the streams once in case the connector needs to make any queries to generate them stream_instances = {s.name: s for s in self.streams(config)} diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/utils.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/utils.py index b108cfd2eefc..4041fbb4f3f6 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/utils.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/utils.py @@ -10,7 +10,7 @@ from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification from airbyte_cdk.sources import Source from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config +from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, filter_internal_keywords def setup_response(status, body): @@ -23,7 +23,7 @@ def get_url_to_mock(stream): def command_check(source: Source, config): logger = mock.MagicMock() - connector_config, _ = split_config(config) + connector_config, _ = filter_internal_keywords(config) if source.check_config_against_spec: source_spec: ConnectorSpecification = source.spec(logger) check_config_against_spec_or_exit(connector_config, source_spec) From 733b89a4bd9de3b6823a7d318203c94340bff51b Mon Sep 17 00:00:00 2001 From: alafanechere Date: Wed, 16 Nov 2022 16:35:34 +0100 Subject: [PATCH 07/27] bing ads example --- .../source-bing-ads/source_bing_ads/client.py | 51 ++++++++----------- .../source-bing-ads/source_bing_ads/source.py | 4 +- 2 files changed, 24 insertions(+), 31 deletions(-) diff --git a/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/client.py b/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/client.py index e6193ca3ce0f..12e74a281546 100644 --- a/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/client.py +++ b/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/client.py @@ -11,6 +11,7 @@ import backoff import pendulum +from airbyte_cdk.config_observation import ObservedDict from airbyte_cdk.logger import AirbyteLogger from bingads.authorization import AuthorizationData, OAuthTokens, OAuthWebAuthCodeGrant from bingads.service_client import ServiceClient @@ -36,47 +37,39 @@ class Client: # The time interval in milliseconds between two status polling attempts. report_poll_interval: int = 15000 - def __init__( - self, - tenant_id: str, - reports_start_date: str, - developer_token: str = None, - client_id: str = None, - client_secret: str = None, - refresh_token: str = None, - **kwargs: Mapping[str, Any], - ) -> None: + def __init__(self, config: ObservedDict) -> None: + self.config = config self.authorization_data: Mapping[str, AuthorizationData] = {} - self.refresh_token = refresh_token - self.developer_token = developer_token - - self.client_id = client_id - self.client_secret = client_secret - - self.authentication = self._get_auth_client(client_id, tenant_id, client_secret) + self.authentication = self._get_auth_client() self.oauth: OAuthTokens = self._get_access_token() - self.reports_start_date = pendulum.parse(reports_start_date).astimezone(tz=timezone.utc) + self.reports_start_date = pendulum.parse(config["reports_start_date"]).astimezone(tz=timezone.utc) - def _get_auth_client(self, client_id: str, tenant_id: str, client_secret: str = None) -> OAuthWebAuthCodeGrant: - # https://github.com/BingAds/BingAds-Python-SDK/blob/e7b5a618e87a43d0a5e2c79d9aa4626e208797bd/bingads/authorization.py#L390 - auth_creds = { - "client_id": client_id, + @property + def auth_creds(self): + return { + "client_id": self.config["client_id"], "redirection_uri": "", # should be empty string - "client_secret": None, - "tenant": tenant_id, + "client_secret": self.config.get("client_secret"), + "tenant": self.config["tenant_id"], } + + def _get_auth_client(self) -> OAuthWebAuthCodeGrant: + # https://github.com/BingAds/BingAds-Python-SDK/blob/e7b5a618e87a43d0a5e2c79d9aa4626e208797bd/bingads/authorization.py#L390 + # the `client_secret` should be provided for `non-public clients` only # https://docs.microsoft.com/en-us/advertising/guides/authentication-oauth-get-tokens?view=bingads-13#request-accesstoken - if client_secret and client_secret != "": - auth_creds["client_secret"] = client_secret - return OAuthWebAuthCodeGrant(**auth_creds) + + return OAuthWebAuthCodeGrant(**self.auth_creds, token_refreshed_callback=self.update_tokens) + + def update_tokens(self, new_oauth_tokens: OAuthTokens): + self.config["refresh_token"] = new_oauth_tokens.refresh_token @lru_cache(maxsize=4) def _get_auth_data(self, customer_id: str = None, account_id: Optional[str] = None) -> AuthorizationData: return AuthorizationData( account_id=account_id, customer_id=customer_id, - developer_token=self.developer_token, + developer_token=self.config["developer_token"], authentication=self.authentication, ) @@ -85,7 +78,7 @@ def _get_access_token(self) -> OAuthTokens: # clear caches to be able to use new access token self.get_service.cache_clear() self._get_auth_data.cache_clear() - return self.authentication.request_oauth_tokens_by_refresh_token(self.refresh_token) + return self.authentication.request_oauth_tokens_by_refresh_token(self.config["refresh_token"]) def is_token_expiring(self) -> bool: """ diff --git a/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/source.py b/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/source.py index b16bdbf600ab..995271fb8e9d 100644 --- a/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/source.py +++ b/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/source.py @@ -719,7 +719,7 @@ class SourceBingAds(AbstractSource): def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]: try: - client = Client(**config) + client = Client(config) account_ids = {str(account["Id"]) for account in Accounts(client, config).read_records(SyncMode.full_refresh)} if account_ids: return True, None @@ -738,7 +738,7 @@ def get_report_streams(self, aggregation_type: str) -> List[Stream]: ] def streams(self, config: Mapping[str, Any]) -> List[Stream]: - client = Client(**config) + client = Client(config) streams = [ Accounts(client, config), AdGroups(client, config), From a59a1e6ed5efa7204b2a1e8476782462ba39a8d8 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Wed, 16 Nov 2022 17:29:11 +0100 Subject: [PATCH 08/27] wrap around AirbyteMessage --- airbyte-cdk/python/airbyte_cdk/config_observation.py | 5 +++-- airbyte-cdk/python/unit_tests/test_config_observation.py | 5 ++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/config_observation.py b/airbyte-cdk/python/airbyte_cdk/config_observation.py index 6da1683cb77f..7b53fb382deb 100644 --- a/airbyte-cdk/python/airbyte_cdk/config_observation.py +++ b/airbyte-cdk/python/airbyte_cdk/config_observation.py @@ -6,7 +6,7 @@ from abc import ABC, abstractmethod from typing import Any, Callable, MutableMapping -from airbyte_cdk.models import AirbyteControlConnectorConfigMessage, AirbyteControlMessage, OrchestratorType +from airbyte_cdk.models import AirbyteControlConnectorConfigMessage, AirbyteControlMessage, AirbyteMessage, OrchestratorType, Type class BaseObserver(ABC): @@ -59,4 +59,5 @@ def _emit_airbyte_control_message(self) -> None: emitted_at=time.time() * 1000, connectorConfig=AirbyteControlConnectorConfigMessage(config=self.config), ) - print(control_message.json()) + airbyte_message = AirbyteMessage(type=Type.CONTROL, control=control_message) + print(airbyte_message.json()) diff --git a/airbyte-cdk/python/unit_tests/test_config_observation.py b/airbyte-cdk/python/unit_tests/test_config_observation.py index 991c165a36cc..672f47d1c21f 100644 --- a/airbyte-cdk/python/unit_tests/test_config_observation.py +++ b/airbyte-cdk/python/unit_tests/test_config_observation.py @@ -37,7 +37,10 @@ def test_update(self, mocker, capsys): config_observer.update() after_time = time.time() * 1000 captured = capsys.readouterr() - raw_control_message = json.loads(captured.out) + airbyte_message = json.loads(captured.out) + assert airbyte_message["type"] == "CONTROL" + assert "control" in airbyte_message + raw_control_message = airbyte_message["control"] mock_write_config_fn.assert_called_with(config_observer.config, mock_config_path) assert raw_control_message["type"] == "CONNECTOR_CONFIG" assert raw_control_message["connectorConfig"] == {"config": dict(config_observer.config)} From 616256ed084585c371b28962504c6c4db7b8e71e Mon Sep 17 00:00:00 2001 From: alafanechere Date: Wed, 16 Nov 2022 17:33:13 +0100 Subject: [PATCH 09/27] exclude unset --- airbyte-cdk/python/airbyte_cdk/config_observation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-cdk/python/airbyte_cdk/config_observation.py b/airbyte-cdk/python/airbyte_cdk/config_observation.py index 7b53fb382deb..54c3b740482c 100644 --- a/airbyte-cdk/python/airbyte_cdk/config_observation.py +++ b/airbyte-cdk/python/airbyte_cdk/config_observation.py @@ -60,4 +60,4 @@ def _emit_airbyte_control_message(self) -> None: connectorConfig=AirbyteControlConnectorConfigMessage(config=self.config), ) airbyte_message = AirbyteMessage(type=Type.CONTROL, control=control_message) - print(airbyte_message.json()) + print(airbyte_message.json(exclude_unset=True)) From 3ef545d6126bd9d82eb0b043513e178c8c17e10b Mon Sep 17 00:00:00 2001 From: alafanechere Date: Mon, 21 Nov 2022 12:10:37 +0100 Subject: [PATCH 10/27] observer does not write config to disk --- .../python/airbyte_cdk/config_observation.py | 22 +++++-------------- .../unit_tests/test_config_observation.py | 18 +++------------ 2 files changed, 9 insertions(+), 31 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/config_observation.py b/airbyte-cdk/python/airbyte_cdk/config_observation.py index 54c3b740482c..eac02082f5c0 100644 --- a/airbyte-cdk/python/airbyte_cdk/config_observation.py +++ b/airbyte-cdk/python/airbyte_cdk/config_observation.py @@ -1,22 +1,18 @@ # # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +from __future__ import ( # Used to evaluate type hints at runtime, a NameError: name 'ConfigObserver' is not defined is thrown otherwise + annotations, +) import time -from abc import ABC, abstractmethod -from typing import Any, Callable, MutableMapping +from typing import Any, MutableMapping from airbyte_cdk.models import AirbyteControlConnectorConfigMessage, AirbyteControlMessage, AirbyteMessage, OrchestratorType, Type -class BaseObserver(ABC): - @abstractmethod - def update(self): - ... - - class ObservedDict(dict): - def __init__(self, non_observed_mapping: MutableMapping, observer: BaseObserver, update_on_unchanged_value=True) -> None: + def __init__(self, non_observed_mapping: MutableMapping, observer: ConfigObserver, update_on_unchanged_value=True) -> None: self.observer = observer self.update_on_unchanged_value = update_on_unchanged_value for item, value in self.items(): @@ -36,21 +32,15 @@ def __setitem__(self, item: Any, value: Any): self.observer.update() -class ConfigObserver(BaseObserver): +class ConfigObserver: """This class is made to track mutations on ObservedDict config. When update is called the observed configuration is saved on disk a CONNECTOR_CONFIG control message is emitted on stdout. """ - def __init__(self, config_path: str, write_config_fn: Callable) -> None: - self.config_path = config_path - self.write_config_fn = write_config_fn - def set_config(self, config: ObservedDict) -> None: self.config = config - self.write_config_fn(self.config, self.config_path) def update(self) -> None: - self.write_config_fn(self.config, self.config_path) self._emit_airbyte_control_message() def _emit_airbyte_control_message(self) -> None: diff --git a/airbyte-cdk/python/unit_tests/test_config_observation.py b/airbyte-cdk/python/unit_tests/test_config_observation.py index 672f47d1c21f..d6dfdad311f5 100644 --- a/airbyte-cdk/python/unit_tests/test_config_observation.py +++ b/airbyte-cdk/python/unit_tests/test_config_observation.py @@ -28,11 +28,9 @@ def test_update_not_called_on_init_with_nested_fields(self, mocker): class TestConfigObserver: - def test_update(self, mocker, capsys): - mock_write_config_fn = mocker.Mock() - mock_config_path = mocker.Mock() - config_observer = ConfigObserver(mock_config_path, mock_write_config_fn) - config_observer.config = ObservedDict({"key": "value"}, config_observer) + def test_update(self, capsys): + config_observer = ConfigObserver() + config_observer.set_config(ObservedDict({"key": "value"}, config_observer)) before_time = time.time() * 1000 config_observer.update() after_time = time.time() * 1000 @@ -41,16 +39,6 @@ def test_update(self, mocker, capsys): assert airbyte_message["type"] == "CONTROL" assert "control" in airbyte_message raw_control_message = airbyte_message["control"] - mock_write_config_fn.assert_called_with(config_observer.config, mock_config_path) assert raw_control_message["type"] == "CONNECTOR_CONFIG" assert raw_control_message["connectorConfig"] == {"config": dict(config_observer.config)} assert before_time < raw_control_message["emitted_at"] < after_time - - def test_set_config(self, mocker): - mock_write_config_fn = mocker.Mock() - mock_config_path = mocker.Mock() - config_observer = ConfigObserver(mock_config_path, mock_write_config_fn) - observed_config = ObservedDict({"key": "value"}, config_observer) - config_observer.set_config(observed_config) - assert config_observer.config == observed_config - mock_write_config_fn.assert_called_once_with(observed_config, mock_config_path) From 0ff7318f83a1f75f67d17934b00aea1b5e65f4f0 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Mon, 21 Nov 2022 12:16:11 +0100 Subject: [PATCH 11/27] revert global changes --- airbyte-cdk/python/airbyte_cdk/entrypoint.py | 4 ++-- .../airbyte_cdk/sources/abstract_source.py | 4 ++-- .../sources/utils/schema_helpers.py | 21 ++++++++++++------- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/entrypoint.py b/airbyte-cdk/python/airbyte_cdk/entrypoint.py index 1958ab09a8bd..b3b853429989 100644 --- a/airbyte-cdk/python/airbyte_cdk/entrypoint.py +++ b/airbyte-cdk/python/airbyte_cdk/entrypoint.py @@ -16,7 +16,7 @@ from airbyte_cdk.models import AirbyteMessage, Status, Type from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification from airbyte_cdk.sources import Source -from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, filter_internal_keywords +from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config from airbyte_cdk.utils.airbyte_secrets_utils import get_secrets, update_secrets from airbyte_cdk.utils.traced_exception import AirbyteTracedException @@ -92,7 +92,7 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]: # Remove internal flags from config before validating so # jsonschema's additionalProperties flag wont fail the validation - connector_config, _ = filter_internal_keywords(config) + connector_config, _ = split_config(config) if self.source.check_config_against_spec or cmd == "check": try: check_config_against_spec_or_exit(connector_config, source_spec) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index e366a67eb4e0..389ffcc19d66 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -23,7 +23,7 @@ from airbyte_cdk.sources.streams.core import StreamData from airbyte_cdk.sources.streams.http.http import HttpStream from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message -from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, filter_internal_keywords +from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config from airbyte_cdk.utils.event_timing import create_timer from airbyte_cdk.utils.traced_exception import AirbyteTracedException @@ -92,7 +92,7 @@ def read( ) -> Iterator[AirbyteMessage]: """Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.""" logger.info(f"Starting syncing {self.name}") - config, internal_config = filter_internal_keywords(config) + config, internal_config = split_config(config) # TODO assert all streams exist in the connector # get the streams once in case the connector needs to make any queries to generate them stream_instances = {s.name: s for s in self.streams(config)} diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py index afd486d1d3b7..aea02ecec950 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py @@ -176,18 +176,23 @@ def dict(self, *args, **kwargs): return super().dict(*args, **kwargs) -def filter_internal_keywords(config: MutableMapping[str, Any]) -> Tuple[MutableMapping, InternalConfig]: +def split_config(config: Mapping[str, Any]) -> Tuple[dict, InternalConfig]: """ - Remove internal config keywords from config and build an InternalConfig from it. + Break config map object into 2 instances: first is a dict with user defined + configuration and second is internal config that contains private keys for + acceptance test configuration. :param - config - MutableMapping object that has been loaded from config file. + config - Dict object that has been loaded from config file. - :return tuple of user defined config MutableMapping with filtered out internal + :return tuple of user defined config dict with filtered out internal parameters and SAT internal config object. """ + main_config = {} internal_config = {} - for key in list(config.keys()): - if key in InternalConfig.KEYWORDS: - internal_config[key] = config.pop(key) - return config, InternalConfig.parse_obj(internal_config) + for k, v in config.items(): + if k in InternalConfig.KEYWORDS: + internal_config[k] = v + else: + main_config[k] = v + return main_config, InternalConfig.parse_obj(internal_config) From 9e1483c57ff4aedf479959294beee33108a051ce Mon Sep 17 00:00:00 2001 From: alafanechere Date: Mon, 21 Nov 2022 12:17:55 +0100 Subject: [PATCH 12/27] revert global changes --- airbyte-cdk/python/unit_tests/sources/test_source.py | 1 - airbyte-cdk/python/unit_tests/test_connector.py | 11 ----------- .../connectors/source-amazon-ads/unit_tests/utils.py | 4 ++-- .../source-azure-table/source_azure_table/source.py | 4 ++-- .../source-facebook-marketing/unit_tests/utils.py | 4 ++-- .../source-google-search-console/unit_tests/utils.py | 4 ++-- .../source-hubspot/source_hubspot/source.py | 4 ++-- .../connectors/source-mixpanel/unit_tests/utils.py | 4 ++-- 8 files changed, 12 insertions(+), 24 deletions(-) diff --git a/airbyte-cdk/python/unit_tests/sources/test_source.py b/airbyte-cdk/python/unit_tests/sources/test_source.py index 2329dfab4cdd..1034975c1892 100644 --- a/airbyte-cdk/python/unit_tests/sources/test_source.py +++ b/airbyte-cdk/python/unit_tests/sources/test_source.py @@ -404,7 +404,6 @@ def test_internal_config_limit(abstract_source, catalog): # Set limit and check if state is produced when limit is set for incremental stream logger_mock.reset_mock() - internal_config = {"some_config": 100, "_limit": STREAM_LIMIT} records = [r for r in abstract_source.read(logger=logger_mock, config=internal_config, catalog=catalog, state={})] assert len(records) == STREAM_LIMIT + 1 assert records[-1].type == Type.STATE diff --git a/airbyte-cdk/python/unit_tests/test_connector.py b/airbyte-cdk/python/unit_tests/test_connector.py index 0024ab2dca2e..06e9dd16ead1 100644 --- a/airbyte-cdk/python/unit_tests/test_connector.py +++ b/airbyte-cdk/python/unit_tests/test_connector.py @@ -14,7 +14,6 @@ import pytest import yaml from airbyte_cdk import AirbyteSpec, Connector -from airbyte_cdk.config_observation import ConfigObserver, ObservedDict from airbyte_cdk.models import AirbyteConnectionStatus logger = logging.getLogger("airbyte") @@ -83,16 +82,6 @@ def test_write_config(integration, mock_config): assert mock_config == json.loads(actual.read()) -def test_configure(integration, mock_config): - temp_dir = tempfile.gettempdir() - config = integration.configure(mock_config, temp_dir) - assert isinstance(config, ObservedDict) - assert isinstance(config.observer, ConfigObserver) - assert config.observer.config == config - assert config.observer.config_path == os.path.join(temp_dir, "config.json") - assert config.observer.write_config_fn == integration.write_config - - class TestConnectorSpec: CONNECTION_SPECIFICATION = { "type": "object", diff --git a/airbyte-integrations/connectors/source-amazon-ads/unit_tests/utils.py b/airbyte-integrations/connectors/source-amazon-ads/unit_tests/utils.py index 8bc9c43388ab..71f5d8566f31 100644 --- a/airbyte-integrations/connectors/source-amazon-ads/unit_tests/utils.py +++ b/airbyte-integrations/connectors/source-amazon-ads/unit_tests/utils.py @@ -10,7 +10,7 @@ from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification from airbyte_cdk.sources import Source from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, filter_internal_keywords +from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config def read_incremental(stream_instance: Stream, stream_state: MutableMapping[str, Any]) -> Iterator[dict]: @@ -32,7 +32,7 @@ def read_incremental(stream_instance: Stream, stream_state: MutableMapping[str, def command_check(source: Source, config): logger = mock.MagicMock() - connector_config, _ = filter_internal_keywords(config) + connector_config, _ = split_config(config) if source.check_config_against_spec: source_spec: ConnectorSpecification = source.spec(logger) check_config_against_spec_or_exit(connector_config, source_spec) diff --git a/airbyte-integrations/connectors/source-azure-table/source_azure_table/source.py b/airbyte-integrations/connectors/source-azure-table/source_azure_table/source.py index 8bf91070814f..d728ad4dd79b 100644 --- a/airbyte-integrations/connectors/source-azure-table/source_azure_table/source.py +++ b/airbyte-integrations/connectors/source-azure-table/source_azure_table/source.py @@ -17,7 +17,7 @@ from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.utils.schema_helpers import filter_internal_keywords +from airbyte_cdk.sources.utils.schema_helpers import split_config from airbyte_cdk.utils.event_timing import create_timer from .azure_table import AzureTableReader @@ -103,7 +103,7 @@ def read( stream_instances = {s.name: s for s in self.streams(logger=logger, config=config)} state_manager = ConnectorStateManager(stream_instance_map=stream_instances, state=state) logger.info(f"Starting syncing {self.name}") - config, internal_config = filter_internal_keywords(config) + config, internal_config = split_config(config) self._stream_to_instance_map = stream_instances with create_timer(self.name) as timer: for configured_stream in catalog.streams: diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/utils.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/utils.py index 5f5f8bb56859..776315e717e6 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/utils.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/utils.py @@ -7,12 +7,12 @@ from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification from airbyte_cdk.sources import Source -from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, filter_internal_keywords +from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config def command_check(source: Source, config): logger = mock.MagicMock() - connector_config, _ = filter_internal_keywords(config) + connector_config, _ = split_config(config) if source.check_config_against_spec: source_spec: ConnectorSpecification = source.spec(logger) check_config_against_spec_or_exit(connector_config, source_spec) diff --git a/airbyte-integrations/connectors/source-google-search-console/unit_tests/utils.py b/airbyte-integrations/connectors/source-google-search-console/unit_tests/utils.py index 5f5f8bb56859..776315e717e6 100644 --- a/airbyte-integrations/connectors/source-google-search-console/unit_tests/utils.py +++ b/airbyte-integrations/connectors/source-google-search-console/unit_tests/utils.py @@ -7,12 +7,12 @@ from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification from airbyte_cdk.sources import Source -from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, filter_internal_keywords +from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config def command_check(source: Source, config): logger = mock.MagicMock() - connector_config, _ = filter_internal_keywords(config) + connector_config, _ = split_config(config) if source.check_config_against_spec: source_spec: ConnectorSpecification = source.spec(logger) check_config_against_spec_or_exit(connector_config, source_spec) diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/source.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/source.py index 87c8cd724971..a721b33ed64f 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/source.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/source.py @@ -12,7 +12,7 @@ from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.utils.schema_helpers import filter_internal_keywords +from airbyte_cdk.sources.utils.schema_helpers import split_config from airbyte_cdk.utils.event_timing import create_timer from airbyte_cdk.utils.traced_exception import AirbyteTracedException from requests import HTTPError @@ -154,7 +154,7 @@ def read( This method is overridden to check whether the stream `quotes` exists in the source, if not skip reading that stream. """ logger.info(f"Starting syncing {self.name}") - config, internal_config = filter_internal_keywords(config) + config, internal_config = split_config(config) # TODO assert all streams exist in the connector # get the streams once in case the connector needs to make any queries to generate them stream_instances = {s.name: s for s in self.streams(config)} diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/utils.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/utils.py index 4041fbb4f3f6..b108cfd2eefc 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/utils.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/utils.py @@ -10,7 +10,7 @@ from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification from airbyte_cdk.sources import Source from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, filter_internal_keywords +from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config def setup_response(status, body): @@ -23,7 +23,7 @@ def get_url_to_mock(stream): def command_check(source: Source, config): logger = mock.MagicMock() - connector_config, _ = filter_internal_keywords(config) + connector_config, _ = split_config(config) if source.check_config_against_spec: source_spec: ConnectorSpecification = source.spec(logger) check_config_against_spec_or_exit(connector_config, source_spec) From 2bbff337a42932aa656e3abc54b8ca8c4899c61a Mon Sep 17 00:00:00 2001 From: alafanechere Date: Mon, 21 Nov 2022 12:19:08 +0100 Subject: [PATCH 13/27] revert global changes --- airbyte-cdk/python/airbyte_cdk/connector.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/connector.py b/airbyte-cdk/python/airbyte_cdk/connector.py index 91e822dfae1b..bd47f188c907 100644 --- a/airbyte-cdk/python/airbyte_cdk/connector.py +++ b/airbyte-cdk/python/airbyte_cdk/connector.py @@ -11,7 +11,6 @@ from typing import Any, Generic, Mapping, Optional, Protocol, TypeVar import yaml -from airbyte_cdk.config_observation import ConfigObserver, ObservedDict from airbyte_cdk.models import AirbyteConnectionStatus, ConnectorSpecification @@ -97,12 +96,10 @@ def write_config(config: Mapping[str, Any], config_path: str): class DefaultConnectorMixin: # can be overridden to change an input config - def configure(self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str) -> ObservedDict[str, Any]: + def configure(self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]: config_path = os.path.join(temp_dir, "config.json") - config_observer = ConfigObserver(config_path, self.write_config) - observed_config = ObservedDict(config, config_observer) - config_observer.set_config(observed_config) - return observed_config + self.write_config(config, config_path) + return config class Connector(DefaultConnectorMixin, BaseConnector[Mapping[str, Any]], ABC): From 1f028606f1b2a9083b3f03486421fe867b8c4682 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Mon, 21 Nov 2022 12:48:21 +0100 Subject: [PATCH 14/27] observe from Oauth2Authenticator --- .../streams/http/requests_native_auth/oauth.py | 14 +++++++++++++- .../test_requests_native_auth.py | 16 ++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py index 5f2e21df8841..478781224e9a 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py @@ -2,9 +2,10 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from typing import Any, List, Mapping +from typing import Any, List, Mapping, MutableMapping import pendulum +from airbyte_cdk.config_observation import ConfigObserver, ObservedDict from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_oauth import AbstractOauth2Authenticator @@ -12,6 +13,7 @@ class Oauth2Authenticator(AbstractOauth2Authenticator): """ Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials. The generated access token is attached to each request via the Authorization header. + If a connector_config is provided any mutation of it's value in the scope of this class will emit AirbyteControlConnectorConfigMessage. """ def __init__( @@ -26,6 +28,7 @@ def __init__( expires_in_name: str = "expires_in", refresh_request_body: Mapping[str, Any] = None, grant_type: str = "refresh_token", + connector_config: Mapping[str, Any] = None, ): self._token_refresh_endpoint = token_refresh_endpoint self._client_secret = client_secret @@ -39,6 +42,15 @@ def __init__( self._token_expiry_date = token_expiry_date or pendulum.now().subtract(days=1) self._access_token = None + self._connector_config = self._observe_connector_config(connector_config) if connector_config else None + + def _observe_connector_config(self, non_observed_connector_config: MutableMapping): + if isinstance(non_observed_connector_config, ObservedDict): + raise ValueError("This connector configuration is already observed") + connector_config_observer = ConfigObserver() + observed_connector_config = ObservedDict(non_observed_connector_config, connector_config_observer) + connector_config_observer.set_config(observed_connector_config) + return observed_connector_config def get_token_refresh_endpoint(self) -> str: return self._token_refresh_endpoint diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/requests_native_auth/test_requests_native_auth.py b/airbyte-cdk/python/unit_tests/sources/streams/http/requests_native_auth/test_requests_native_auth.py index 97fc2d9e283d..480c80276823 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/http/requests_native_auth/test_requests_native_auth.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/requests_native_auth/test_requests_native_auth.py @@ -2,6 +2,7 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +import json import logging import pendulum @@ -174,6 +175,21 @@ def test_auth_call_method(self, mocker): assert {"Authorization": "Bearer access_token"} == prepared_request.headers + def test_auth_with_config_mutation(self, capsys): + original_connector_config = {"refresh_token": "foo"} + oauth = Oauth2Authenticator( + token_refresh_endpoint=TestOauth2Authenticator.refresh_endpoint, + client_id=TestOauth2Authenticator.client_id, + client_secret=TestOauth2Authenticator.client_secret, + refresh_token=TestOauth2Authenticator.refresh_token, + connector_config=original_connector_config, + ) + oauth._connector_config["refresh_token"] = "bar" + captured = capsys.readouterr() + airbyte_message = json.loads(captured.out) + assert airbyte_message["control"]["connectorConfig"] == {"config": {"refresh_token": "bar"}} + assert original_connector_config["refresh_token"] == "foo" + def mock_request(method, url, data): if url == "refresh_end": From 04041d83e474810912dce1d1c176abddc3c62e72 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Mon, 21 Nov 2022 15:52:49 +0100 Subject: [PATCH 15/27] ref --- .../python/airbyte_cdk/config_observation.py | 11 +++++++++- .../http/requests_native_auth/oauth.py | 14 +++--------- .../unit_tests/test_config_observation.py | 22 ++++++++++++++++++- .../source-bing-ads/source_bing_ads/client.py | 6 ++--- 4 files changed, 37 insertions(+), 16 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/config_observation.py b/airbyte-cdk/python/airbyte_cdk/config_observation.py index eac02082f5c0..09daf3271ea1 100644 --- a/airbyte-cdk/python/airbyte_cdk/config_observation.py +++ b/airbyte-cdk/python/airbyte_cdk/config_observation.py @@ -21,7 +21,7 @@ def __init__(self, non_observed_mapping: MutableMapping, observer: ConfigObserve super().__init__(non_observed_mapping) def __setitem__(self, item: Any, value: Any): - """Override dict__setitem__ by: + """Override dict.__setitem__ by: 1. Observing the new value if it is a dict 2. Call observer update if the new value is different from the previous one """ @@ -51,3 +51,12 @@ def _emit_airbyte_control_message(self) -> None: ) airbyte_message = AirbyteMessage(type=Type.CONTROL, control=control_message) print(airbyte_message.json(exclude_unset=True)) + + +def observe_connector_config(non_observed_connector_config: MutableMapping[str, Any]): + if isinstance(non_observed_connector_config, ObservedDict): + raise ValueError("This connector configuration is already observed") + connector_config_observer = ConfigObserver() + observed_connector_config = ObservedDict(non_observed_connector_config, connector_config_observer) + connector_config_observer.set_config(observed_connector_config) + return observed_connector_config diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py index 478781224e9a..6b3bb315f1b0 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py @@ -2,10 +2,10 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from typing import Any, List, Mapping, MutableMapping +from typing import Any, List, Mapping import pendulum -from airbyte_cdk.config_observation import ConfigObserver, ObservedDict +from airbyte_cdk.config_observation import observe_connector_config from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_oauth import AbstractOauth2Authenticator @@ -42,15 +42,7 @@ def __init__( self._token_expiry_date = token_expiry_date or pendulum.now().subtract(days=1) self._access_token = None - self._connector_config = self._observe_connector_config(connector_config) if connector_config else None - - def _observe_connector_config(self, non_observed_connector_config: MutableMapping): - if isinstance(non_observed_connector_config, ObservedDict): - raise ValueError("This connector configuration is already observed") - connector_config_observer = ConfigObserver() - observed_connector_config = ObservedDict(non_observed_connector_config, connector_config_observer) - connector_config_observer.set_config(observed_connector_config) - return observed_connector_config + self._connector_config = observe_connector_config(connector_config) if connector_config else None def get_token_refresh_endpoint(self) -> str: return self._token_refresh_endpoint diff --git a/airbyte-cdk/python/unit_tests/test_config_observation.py b/airbyte-cdk/python/unit_tests/test_config_observation.py index d6dfdad311f5..9d1f514a8933 100644 --- a/airbyte-cdk/python/unit_tests/test_config_observation.py +++ b/airbyte-cdk/python/unit_tests/test_config_observation.py @@ -5,7 +5,8 @@ import json import time -from airbyte_cdk.config_observation import ConfigObserver, ObservedDict +import pytest +from airbyte_cdk.config_observation import ConfigObserver, ObservedDict, observe_connector_config class TestObservedDict: @@ -42,3 +43,22 @@ def test_update(self, capsys): assert raw_control_message["type"] == "CONNECTOR_CONFIG" assert raw_control_message["connectorConfig"] == {"config": dict(config_observer.config)} assert before_time < raw_control_message["emitted_at"] < after_time + + +def test_observe_connector_config(capsys): + non_observed_config = {"foo": "bar"} + observed_config = observe_connector_config(non_observed_config) + observer = observed_config.observer + assert isinstance(observed_config, ObservedDict) + assert isinstance(observer, ConfigObserver) + assert observed_config.observer.config == observed_config + observed_config["foo"] = "foo" + captured = capsys.readouterr() + airbyte_message = json.loads(captured.out) + assert airbyte_message["control"]["connectorConfig"] == {"config": {"foo": "foo"}} + + +def test_observe_already_observed_config(): + observed_config = observe_connector_config({"foo": "bar"}) + with pytest.raises(ValueError): + observe_connector_config(observed_config) diff --git a/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/client.py b/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/client.py index 12e74a281546..8903d880b71f 100644 --- a/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/client.py +++ b/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/client.py @@ -11,7 +11,7 @@ import backoff import pendulum -from airbyte_cdk.config_observation import ObservedDict +from airbyte_cdk.config_observation import observe_connector_config from airbyte_cdk.logger import AirbyteLogger from bingads.authorization import AuthorizationData, OAuthTokens, OAuthWebAuthCodeGrant from bingads.service_client import ServiceClient @@ -37,8 +37,8 @@ class Client: # The time interval in milliseconds between two status polling attempts. report_poll_interval: int = 15000 - def __init__(self, config: ObservedDict) -> None: - self.config = config + def __init__(self, config: Mapping) -> None: + self.config = observe_connector_config(config) self.authorization_data: Mapping[str, AuthorizationData] = {} self.authentication = self._get_auth_client() self.oauth: OAuthTokens = self._get_access_token() From 12db1b7b0de9c5fc6a7d9ae5cba2e382332617d7 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Mon, 21 Nov 2022 17:31:27 +0100 Subject: [PATCH 16/27] handle list of dicts --- .../python/airbyte_cdk/config_observation.py | 20 ++++++++++--- .../unit_tests/test_config_observation.py | 30 +++++++++++++------ 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/config_observation.py b/airbyte-cdk/python/airbyte_cdk/config_observation.py index 09daf3271ea1..ca52b44cbbc3 100644 --- a/airbyte-cdk/python/airbyte_cdk/config_observation.py +++ b/airbyte-cdk/python/airbyte_cdk/config_observation.py @@ -6,7 +6,7 @@ ) import time -from typing import Any, MutableMapping +from typing import Any, List, MutableMapping from airbyte_cdk.models import AirbyteControlConnectorConfigMessage, AirbyteControlMessage, AirbyteMessage, OrchestratorType, Type @@ -15,9 +15,16 @@ class ObservedDict(dict): def __init__(self, non_observed_mapping: MutableMapping, observer: ConfigObserver, update_on_unchanged_value=True) -> None: self.observer = observer self.update_on_unchanged_value = update_on_unchanged_value - for item, value in self.items(): + for item, value in non_observed_mapping.items(): + # Observe nested dicts if isinstance(value, MutableMapping): - self[item] = ObservedDict(value, observer) + non_observed_mapping[item] = ObservedDict(value, observer) + + # Observe nested list of dicts + if isinstance(value, List): + for i, sub_value in enumerate(value): + if isinstance(sub_value, MutableMapping): + value[i] = ObservedDict(sub_value, observer) super().__init__(non_observed_mapping) def __setitem__(self, item: Any, value: Any): @@ -26,7 +33,12 @@ def __setitem__(self, item: Any, value: Any): 2. Call observer update if the new value is different from the previous one """ previous_value = self.get(item) - value = ObservedDict(value, self.observer) if isinstance(value, MutableMapping) else value + if isinstance(value, MutableMapping): + value = ObservedDict(value, self.observer) + if isinstance(value, List): + for i, sub_value in enumerate(value): + if isinstance(sub_value, MutableMapping): + value[i] = ObservedDict(sub_value, self.observer) super(ObservedDict, self).__setitem__(item, value) if self.update_on_unchanged_value or value != previous_value: self.observer.update() diff --git a/airbyte-cdk/python/unit_tests/test_config_observation.py b/airbyte-cdk/python/unit_tests/test_config_observation.py index 9d1f514a8933..38dc2281aece 100644 --- a/airbyte-cdk/python/unit_tests/test_config_observation.py +++ b/airbyte-cdk/python/unit_tests/test_config_observation.py @@ -12,20 +12,32 @@ class TestObservedDict: def test_update_called_on_set_item(self, mocker): mock_observer = mocker.Mock() - my_observed_dict = ObservedDict({"key": "value"}, mock_observer) + my_observed_dict = ObservedDict( + {"key": "value", "nested_dict": {"key": "value"}, "list_of_dict": [{"key": "value"}, {"key": "value"}]}, mock_observer + ) assert mock_observer.update.call_count == 0 - my_observed_dict["key"] = {"nested_key": "nested_value"} + + my_observed_dict["nested_dict"]["key"] = "new_value" assert mock_observer.update.call_count == 1 - my_observed_dict["key"]["nested_key"] = "new_nested_value" - assert mock_observer.update.call_count == 2 + # Setting the same value again should call observer's update - my_observed_dict["key"]["nested_key"] = "new_nested_value" + my_observed_dict["key"] = "new_value" + assert mock_observer.update.call_count == 2 + + my_observed_dict["nested_dict"]["new_key"] = "value" assert mock_observer.update.call_count == 3 - def test_update_not_called_on_init_with_nested_fields(self, mocker): - mock_observer = mocker.Mock() - ObservedDict({"key": "value", "nested": {"nested_key": "nested_value"}}, mock_observer) - mock_observer.update.assert_not_called() + my_observed_dict["list_of_dict"][0]["key"] = "new_value" + assert mock_observer.update.call_count == 4 + + my_observed_dict["list_of_dict"][0]["new_key"] = "new_value" + assert mock_observer.update.call_count == 5 + + my_observed_dict["new_list_of_dicts"] = [{"foo": "bar"}] + assert mock_observer.update.call_count == 6 + + my_observed_dict["new_list_of_dicts"][0]["new_key"] = "new_value" + assert mock_observer.update.call_count == 7 class TestConfigObserver: From 63d129cc3a54b090c3d8f25b6bf2433fcfd32700 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Wed, 23 Nov 2022 16:06:27 +0100 Subject: [PATCH 17/27] implement SingleUseRefreshTokenOauth2Authenticator --- .../http/requests_native_auth/__init__.py | 10 ++- .../requests_native_auth/abstract_oauth.py | 9 +- .../http/requests_native_auth/oauth.py | 88 ++++++++++++++++++- 3 files changed, 99 insertions(+), 8 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/__init__.py index c4f64a971ea0..c336ef2b50e3 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/__init__.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/__init__.py @@ -2,7 +2,13 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from .oauth import Oauth2Authenticator +from .oauth import Oauth2Authenticator, SingleUseRefreshTokenOauth2Authenticator from .token import BasicHttpAuthenticator, MultipleTokenAuthenticator, TokenAuthenticator -__all__ = ["Oauth2Authenticator", "TokenAuthenticator", "MultipleTokenAuthenticator", "BasicHttpAuthenticator"] +__all__ = [ + "Oauth2Authenticator", + "SingleUseRefreshTokenOauth2Authenticator", + "TokenAuthenticator", + "MultipleTokenAuthenticator", + "BasicHttpAuthenticator", +] diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py index e7e0ce397e80..fab826bd7b46 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py @@ -64,6 +64,11 @@ def build_refresh_request_body(self) -> Mapping[str, Any]: return payload + def _get_refresh_access_token_response(self): + response = requests.request(method="POST", url=self.get_token_refresh_endpoint(), data=self.build_refresh_request_body()) + response.raise_for_status() + return response.json() + def refresh_access_token(self) -> Tuple[str, int]: """ Returns the refresh token and its lifespan in seconds @@ -71,9 +76,7 @@ def refresh_access_token(self) -> Tuple[str, int]: :return: a tuple of (access_token, token_lifespan_in_seconds) """ try: - response = requests.request(method="POST", url=self.get_token_refresh_endpoint(), data=self.build_refresh_request_body()) - response.raise_for_status() - response_json = response.json() + response_json = self._get_refresh_access_token_response() return response_json[self.get_access_token_name()], response_json[self.get_expires_in_name()] except Exception as e: raise Exception(f"Error while refreshing access token: {e}") from e diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py index 6b3bb315f1b0..daa7c298e9d2 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py @@ -2,7 +2,7 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from typing import Any, List, Mapping +from typing import Any, List, Mapping, Tuple import pendulum from airbyte_cdk.config_observation import observe_connector_config @@ -28,7 +28,6 @@ def __init__( expires_in_name: str = "expires_in", refresh_request_body: Mapping[str, Any] = None, grant_type: str = "refresh_token", - connector_config: Mapping[str, Any] = None, ): self._token_refresh_endpoint = token_refresh_endpoint self._client_secret = client_secret @@ -42,7 +41,6 @@ def __init__( self._token_expiry_date = token_expiry_date or pendulum.now().subtract(days=1) self._access_token = None - self._connector_config = observe_connector_config(connector_config) if connector_config else None def get_token_refresh_endpoint(self) -> str: return self._token_refresh_endpoint @@ -84,3 +82,87 @@ def access_token(self) -> str: @access_token.setter def access_token(self, value: str): self._access_token = value + + +class SingleUseRefreshTokenOauth2Authenticator(Oauth2Authenticator): + def __init__( + self, + connector_config: Mapping[str, Any], + token_refresh_endpoint: str, + scopes: List[str] = None, + token_expiry_date: pendulum.DateTime = None, + access_token_name: str = "access_token", + expires_in_name: str = "expires_in", + refresh_token_name: str = "refresh_token", + refresh_request_body: Mapping[str, Any] = None, + grant_type: str = "refresh_token", + credentials_configuration_field_name: str = "credentials", + ): + self.credentials_configuration_field_name = credentials_configuration_field_name + self._refresh_token_name = refresh_token_name + self._connector_config = observe_connector_config(connector_config) + self._validate_config() + super().__init__( + token_refresh_endpoint, + self.get_client_id(), + self.get_client_secret(), + self.get_refresh_token(), + scopes, + token_expiry_date, + access_token_name, + expires_in_name, + refresh_request_body, + grant_type, + ) + + def _validate_config(self): + for field_name, getter in [ + ("client_id", self.get_client_id), + ("client_secret", self.get_client_secret), + (self.get_refresh_token_name(), self.get_refresh_token), + ]: + try: + getter() + except KeyError: + raise ValueError( + f"This authenticator expects a {field_name} field under the {self.credentials_configuration_field_name} field. Please override this class getters or change your configuration structure." + ) + + def get_refresh_token_name(self) -> str: + return self._refresh_token_name + + def _get_config_credentials_field(self, field_name): + return self._connector_config[self.credentials_configuration_field_name][field_name] + + def get_client_id(self) -> str: + return self._get_config_credentials_field("client_id") + + def get_client_secret(self) -> str: + return self._get_config_credentials_field("client_secret") + + def set_refresh_token(self, new_refresh_token: str): + self._connector_config[self.credentials_configuration_field_name][self.get_refresh_token_name()] = new_refresh_token + + def get_refresh_token(self) -> str: + return self._get_config_credentials_field(self.get_refresh_token_name()) + + def get_access_token(self) -> str: + """Returns the access token""" + if self.token_has_expired(): + t0 = pendulum.now() + new_access_token, access_token_expires_in, new_refresh_token = self.refresh_access_token() + self.access_token = new_access_token + self.set_token_expiry_date(t0.add(seconds=access_token_expires_in)) + self.set_refresh_token(new_refresh_token) + return self.access_token + + def refresh_access_token(self) -> Tuple[str, int, str]: + try: + response_json = self._get_refresh_access_token_response() + return ( + response_json[self.get_access_token_name()], + response_json[self.get_expires_in_name()], + response_json[self.get_refresh_token_name()], + ) + except Exception as e: + raise Exception(f"Error while refreshing access token and refresh token: {e}") from e From 62ef55309db400d420f5ee9f9db2735ee908ce08 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Wed, 23 Nov 2022 16:07:06 +0100 Subject: [PATCH 18/27] test SingleUseRefreshTokenOauth2Authenticator --- .../test_requests_native_auth.py | 77 ++++++++++++++++--- 1 file changed, 66 insertions(+), 11 deletions(-) diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/requests_native_auth/test_requests_native_auth.py b/airbyte-cdk/python/unit_tests/sources/streams/http/requests_native_auth/test_requests_native_auth.py index 480c80276823..368669bf1223 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/http/requests_native_auth/test_requests_native_auth.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/requests_native_auth/test_requests_native_auth.py @@ -6,11 +6,14 @@ import logging import pendulum +import pytest import requests +from airbyte_cdk.config_observation import ObservedDict from airbyte_cdk.sources.streams.http.requests_native_auth import ( BasicHttpAuthenticator, MultipleTokenAuthenticator, Oauth2Authenticator, + SingleUseRefreshTokenOauth2Authenticator, TokenAuthenticator, ) from requests import Response @@ -175,20 +178,72 @@ def test_auth_call_method(self, mocker): assert {"Authorization": "Bearer access_token"} == prepared_request.headers - def test_auth_with_config_mutation(self, capsys): - original_connector_config = {"refresh_token": "foo"} - oauth = Oauth2Authenticator( - token_refresh_endpoint=TestOauth2Authenticator.refresh_endpoint, - client_id=TestOauth2Authenticator.client_id, - client_secret=TestOauth2Authenticator.client_secret, - refresh_token=TestOauth2Authenticator.refresh_token, - connector_config=original_connector_config, + +class TestSingleUseRefreshTokenOauth2Authenticator: + @pytest.fixture + def connector_config(self): + return { + "credentials": { + "access_token": "my_access_token", + "refresh_token": "my_refresh_token", + "client_id": "my_client_id", + "client_secret": "my_client_secret", + } + } + + @pytest.fixture + def invalid_connector_config(self): + return {"no_credentials_key": "foo"} + + def test_init(self, connector_config): + authenticator = SingleUseRefreshTokenOauth2Authenticator( + connector_config, + token_refresh_endpoint="foobar", ) - oauth._connector_config["refresh_token"] = "bar" + assert isinstance(authenticator._connector_config, ObservedDict) + + def test_init_with_invalid_config(self, invalid_connector_config): + with pytest.raises(ValueError): + SingleUseRefreshTokenOauth2Authenticator( + invalid_connector_config, + token_refresh_endpoint="foobar", + ) + + def test_get_access_token(self, capsys, mocker, connector_config): + authenticator = SingleUseRefreshTokenOauth2Authenticator( + connector_config, + token_refresh_endpoint="foobar", + ) + authenticator.refresh_access_token = mocker.Mock(return_value=("new_access_token", 42, "new_refresh_token")) + authenticator.token_has_expired = mocker.Mock(return_value=True) + access_token = authenticator.get_access_token() captured = capsys.readouterr() airbyte_message = json.loads(captured.out) - assert airbyte_message["control"]["connectorConfig"] == {"config": {"refresh_token": "bar"}} - assert original_connector_config["refresh_token"] == "foo" + expected_new_config = connector_config.copy() + expected_new_config["credentials"]["refresh_token"] = "new_refresh_token" + assert airbyte_message["control"]["connectorConfig"]["config"] == expected_new_config + assert authenticator.access_token == access_token == "new_access_token" + assert authenticator.get_refresh_token() == "new_refresh_token" + assert authenticator.get_token_expiry_date() > pendulum.now() + authenticator.token_has_expired = mocker.Mock(return_value=False) + access_token = authenticator.get_access_token() + captured = capsys.readouterr() + assert not captured.out + assert authenticator.access_token == access_token == "new_access_token" + + def test_refresh_access_token(self, mocker, connector_config): + authenticator = SingleUseRefreshTokenOauth2Authenticator( + connector_config, + token_refresh_endpoint="foobar", + ) + authenticator._get_refresh_access_token_response = mocker.Mock( + return_value={ + authenticator.get_access_token_name(): "new_access_token", + authenticator.get_expires_in_name(): 42, + authenticator.get_refresh_token_name(): "new_refresh_token", + } + ) + assert authenticator.refresh_access_token() == ("new_access_token", 42, "new_refresh_token") def mock_request(method, url, data): From bbe802af8cdae80139ab64911d24a3d2c75e85c9 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Wed, 23 Nov 2022 16:08:39 +0100 Subject: [PATCH 19/27] call copy in ObservedDict --- airbyte-cdk/python/airbyte_cdk/config_observation.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/python/airbyte_cdk/config_observation.py b/airbyte-cdk/python/airbyte_cdk/config_observation.py index ca52b44cbbc3..a25f1e160fe6 100644 --- a/airbyte-cdk/python/airbyte_cdk/config_observation.py +++ b/airbyte-cdk/python/airbyte_cdk/config_observation.py @@ -13,6 +13,7 @@ class ObservedDict(dict): def __init__(self, non_observed_mapping: MutableMapping, observer: ConfigObserver, update_on_unchanged_value=True) -> None: + non_observed_mapping = non_observed_mapping.copy() self.observer = observer self.update_on_unchanged_value = update_on_unchanged_value for item, value in non_observed_mapping.items(): @@ -46,7 +47,7 @@ def __setitem__(self, item: Any, value: Any): class ConfigObserver: """This class is made to track mutations on ObservedDict config. - When update is called the observed configuration is saved on disk a CONNECTOR_CONFIG control message is emitted on stdout. + When update is called a CONNECTOR_CONFIG control message is emitted on stdout. """ def set_config(self, config: ObservedDict) -> None: From 8fd52f9a04f34b1b74f6ece18de09fcdbd0dd0db Mon Sep 17 00:00:00 2001 From: alafanechere Date: Wed, 23 Nov 2022 16:44:57 +0100 Subject: [PATCH 20/27] add docstring --- .../http/requests_native_auth/oauth.py | 32 ++++++++++++++++--- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py index daa7c298e9d2..4de3037bf7ac 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py @@ -85,6 +85,14 @@ def access_token(self, value: str): class SingleUseRefreshTokenOauth2Authenticator(Oauth2Authenticator): + """ + Authenticator that should be used for API implementing single use refresh tokens: + when refreshing access token some API returns a new refresh token that needs to used in the next refresh flow. + This authenticator updates the configuration with new refresh token by emitting Airbyte control message from an observed mutation. + This authenticator expects a connector config with a"credentials" field with the following nested fields: client_id, client_secret, refresh_token. + This behavior can be changed by overriding getters or changing the default "credentials_configuration_field_name" value. + """ + def __init__( self, connector_config: Mapping[str, Any], @@ -101,7 +109,7 @@ def __init__( self.credentials_configuration_field_name = credentials_configuration_field_name self._refresh_token_name = refresh_token_name self._connector_config = observe_connector_config(connector_config) - self._validate_config() + self._validate_connector_config() super().__init__( token_refresh_endpoint, self.get_client_id(), @@ -115,15 +123,20 @@ def __init__( grant_type, ) - def _validate_config(self): + def _validate_connector_config(self): + """Validates the defined getters for configuration values are returning values. + + Raises: + ValueError: Raised if the defined getters are not returning a value. + """ for field_name, getter in [ ("client_id", self.get_client_id), ("client_secret", self.get_client_secret), (self.get_refresh_token_name(), self.get_refresh_token), ]: try: - getter() - except KeyError: + assert getter() + except (AssertionError, KeyError): raise ValueError( f"This authenticator expects a {field_name} field under the {self.credentials_configuration_field_name} field. Please override this class getters or change your configuration structure." ) @@ -141,13 +154,22 @@ def get_client_secret(self) -> str: return self._get_config_credentials_field("client_secret") def set_refresh_token(self, new_refresh_token: str): + """Set the new refresh token value. The mutation of the connector_config object will emit an Airbyte control message. + + Args: + new_refresh_token (str): The new refresh token value. + """ self._connector_config[self.credentials_configuration_field_name][self.get_refresh_token_name()] = new_refresh_token def get_refresh_token(self) -> str: return self._get_config_credentials_field(self.get_refresh_token_name()) def get_access_token(self) -> str: - """Returns the access token""" + """Retrieve new access and refresh token if the access token has expired. + The new refresh token is persisted with the set_refresh_token function + Returns: + str: The current access_token, updated if it was previously expired. + """ if self.token_has_expired(): t0 = pendulum.now() new_access_token, access_token_expires_in, new_refresh_token = self.refresh_access_token() From c5a0b6d010699781668094798b9ee42c3009eb55 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Wed, 23 Nov 2022 16:46:30 +0100 Subject: [PATCH 21/27] source harvest example --- .../connectors/source-harvest/source_harvest/auth.py | 7 +++++-- .../connectors/source-harvest/source_harvest/source.py | 5 +---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/connectors/source-harvest/source_harvest/auth.py b/airbyte-integrations/connectors/source-harvest/source_harvest/auth.py index 26bf79acf644..6a3c99e26503 100644 --- a/airbyte-integrations/connectors/source-harvest/source_harvest/auth.py +++ b/airbyte-integrations/connectors/source-harvest/source_harvest/auth.py @@ -4,7 +4,7 @@ from typing import Any, Mapping -from airbyte_cdk.sources.streams.http.auth import Oauth2Authenticator, TokenAuthenticator +from airbyte_cdk.sources.streams.http.requests_native_auth import SingleUseRefreshTokenOauth2Authenticator, TokenAuthenticator class HarvestMixin: @@ -29,8 +29,11 @@ class HarvestTokenAuthenticator(HarvestMixin, TokenAuthenticator): """ -class HarvestOauth2Authenticator(HarvestMixin, Oauth2Authenticator): +class HarvestOauth2Authenticator(SingleUseRefreshTokenOauth2Authenticator): """ Auth class for OAuth2 https://help.getharvest.com/api-v2/authentication-api/authentication/authentication/#for-server-side-applications """ + + def get_auth_header(self) -> Mapping[str, Any]: + return {**super().get_auth_header(), "Harvest-Account-ID": self._connector_config["account_id"]} diff --git a/airbyte-integrations/connectors/source-harvest/source_harvest/source.py b/airbyte-integrations/connectors/source-harvest/source_harvest/source.py index 93dab5b9e291..68a6124b9c57 100644 --- a/airbyte-integrations/connectors/source-harvest/source_harvest/source.py +++ b/airbyte-integrations/connectors/source-harvest/source_harvest/source.py @@ -54,11 +54,8 @@ def get_authenticator(config): credentials = config.get("credentials", {}) if credentials and "client_id" in credentials: return HarvestOauth2Authenticator( + config, token_refresh_endpoint="https://id.getharvest.com/api/v2/oauth2/token", - client_id=credentials.get("client_id"), - client_secret=credentials.get("client_secret"), - refresh_token=credentials.get("refresh_token"), - account_id=config["account_id"], ) api_token = credentials.get("api_token", config.get("api_token")) From 803b70ae052a60290896d62d184e3230ff1801a1 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Fri, 25 Nov 2022 12:04:03 +0100 Subject: [PATCH 22/27] use dpath --- .../http/requests_native_auth/oauth.py | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py index 4de3037bf7ac..34b5687269d5 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py @@ -4,6 +4,7 @@ from typing import Any, List, Mapping, Tuple +import dpath import pendulum from airbyte_cdk.config_observation import observe_connector_config from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_oauth import AbstractOauth2Authenticator @@ -104,9 +105,13 @@ def __init__( refresh_token_name: str = "refresh_token", refresh_request_body: Mapping[str, Any] = None, grant_type: str = "refresh_token", - credentials_configuration_field_name: str = "credentials", + client_id_config_path="/credentials/client_id", + client_secret_config_path="/credentials/client_secret", + refresh_token_config_path="/credentials/refresh_token", ): - self.credentials_configuration_field_name = credentials_configuration_field_name + self._client_id_config_path = client_id_config_path + self._client_secret_config_path = client_secret_config_path + self._refresh_token_config_path = refresh_token_config_path self._refresh_token_name = refresh_token_name self._connector_config = observe_connector_config(connector_config) self._validate_connector_config() @@ -129,16 +134,16 @@ def _validate_connector_config(self): Raises: ValueError: Raised if the defined getters are not returning a value. """ - for field_name, getter in [ - ("client_id", self.get_client_id), - ("client_secret", self.get_client_secret), - (self.get_refresh_token_name(), self.get_refresh_token), + for field_path, getter in [ + (self._client_id_config_path, self.get_client_id), + (self._client_secret_config_path, self.get_client_secret), + (self._refresh_token_config_path, self.get_refresh_token), ]: try: assert getter() - except (AssertionError, KeyError): + except KeyError: raise ValueError( - f"This authenticator expects a {field_name} field under the {self.credentials_configuration_field_name} field. Please override this class getters or change your configuration structure." + f"This authenticator expects a value field under the {field_path} field path. Please override this class getters or change your configuration structure." ) def get_refresh_token_name(self) -> str: @@ -148,10 +153,13 @@ def _get_config_credentials_field(self, field_name): return self._connector_config[self.credentials_configuration_field_name][field_name] def get_client_id(self) -> str: - return self._get_config_credentials_field("client_id") + return dpath.util.get(self._connector_config, self._client_id_config_path) def get_client_secret(self) -> str: - return self._get_config_credentials_field("client_secret") + return dpath.util.get(self._connector_config, self._client_secret_config_path) + + def get_refresh_token(self) -> str: + return dpath.util.get(self._connector_config, self._refresh_token_config_path) def set_refresh_token(self, new_refresh_token: str): """Set the new refresh token value. The mutation of the connector_config object will emit an Airbyte control message. @@ -159,10 +167,7 @@ def set_refresh_token(self, new_refresh_token: str): Args: new_refresh_token (str): The new refresh token value. """ - self._connector_config[self.credentials_configuration_field_name][self.get_refresh_token_name()] = new_refresh_token - - def get_refresh_token(self) -> str: - return self._get_config_credentials_field(self.get_refresh_token_name()) + dpath.util.set(self._connector_config, self._refresh_token_config_path, new_refresh_token) def get_access_token(self) -> str: """Retrieve new access and refresh token if the access token has expired. From 37f474f5d7d218fbc6ce0d84b1ca2946f32c8631 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Fri, 25 Nov 2022 12:21:11 +0100 Subject: [PATCH 23/27] better doc string --- .../http/requests_native_auth/oauth.py | 30 ++++++++++++++----- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py index 34b5687269d5..254fb534d5eb 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py @@ -90,8 +90,8 @@ class SingleUseRefreshTokenOauth2Authenticator(Oauth2Authenticator): Authenticator that should be used for API implementing single use refresh tokens: when refreshing access token some API returns a new refresh token that needs to used in the next refresh flow. This authenticator updates the configuration with new refresh token by emitting Airbyte control message from an observed mutation. - This authenticator expects a connector config with a"credentials" field with the following nested fields: client_id, client_secret, refresh_token. - This behavior can be changed by overriding getters or changing the default "credentials_configuration_field_name" value. + By default this authenticator expects a connector config with a"credentials" field with the following nested fields: client_id, client_secret, refresh_token. + This behavior can be changed by defining custom config path (using dpath paths) in client_id_config_path, client_secret_config_path, refresh_token_config_path constructor arguments. """ def __init__( @@ -109,6 +109,22 @@ def __init__( client_secret_config_path="/credentials/client_secret", refresh_token_config_path="/credentials/refresh_token", ): + """_summary_ + + Args: + connector_config (Mapping[str, Any]): The full connector configuration + token_refresh_endpoint (str): Full URL to the token refresh endpoint + scopes (List[str], optional): List of OAuth scopes to pass in the refresh token request body. Defaults to None. + token_expiry_date (pendulum.DateTime, optional): Datetime at which the current token will expire. Defaults to None. + access_token_name (str, optional): Name of the access token field, used to parse the refresh token response. Defaults to "access_token". + expires_in_name (str, optional): Name of the name of the field that characterizes when the current access token will expire, used to parse the refresh token response. Defaults to "expires_in". + refresh_token_name (str, optional): Name of the name of the refresh token field, used to parse the refresh token response. Defaults to "refresh_token". + refresh_request_body (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request body. Defaults to None. + grant_type (str, optional): OAuth grant type. Defaults to "refresh_token". + client_id_config_path (str, optional): Dpath to the client_id field in the connector configuration. Defaults to "/credentials/client_id". + client_secret_config_path (str, optional): Dpath to the client_secret field in the connector configuration. Defaults to "/credentials/client_secret". + refresh_token_config_path (str, optional): Dpath to the refresh_token field in the connector configuration. Defaults to "/credentials/refresh_token". + """ self._client_id_config_path = client_id_config_path self._client_secret_config_path = client_secret_config_path self._refresh_token_config_path = refresh_token_config_path @@ -134,16 +150,16 @@ def _validate_connector_config(self): Raises: ValueError: Raised if the defined getters are not returning a value. """ - for field_path, getter in [ - (self._client_id_config_path, self.get_client_id), - (self._client_secret_config_path, self.get_client_secret), - (self._refresh_token_config_path, self.get_refresh_token), + for field_path, getter, parameter_name in [ + (self._client_id_config_path, self.get_client_id, "client_id_config_path"), + (self._client_secret_config_path, self.get_client_secret, "client_secret_config_path"), + (self._refresh_token_config_path, self.get_refresh_token, "refresh_token_config_path"), ]: try: assert getter() except KeyError: raise ValueError( - f"This authenticator expects a value field under the {field_path} field path. Please override this class getters or change your configuration structure." + f"This authenticator expects a value under the {field_path} field path. Please check your configuration structure or change the {parameter_name} value at initialization of this authenticator." ) def get_refresh_token_name(self) -> str: From 08f64e4d555b5832f54ec4da8face3975bb1b2cb Mon Sep 17 00:00:00 2001 From: alafanechere Date: Fri, 25 Nov 2022 12:28:18 +0100 Subject: [PATCH 24/27] update changelog --- airbyte-cdk/python/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index 75198dbd2f10..63b6ba35ca96 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,7 +1,7 @@ # Changelog ## 0.10.0 -Emit `AirbyteControlMessage.ConnectorConfig` on `config` mutation. +Declare a new authenticator `SingleUseRefreshTokenOauth2Authenticator` that can perform connector configuration mutation and emit `AirbyteControlMessage.ConnectorConfig`. ## 0.9.5 Low-code: Add jinja macro `format_datetime` From 518a36f49dc7c0df98ae50989d8c1e1882b95fa6 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Tue, 29 Nov 2022 11:53:15 +0100 Subject: [PATCH 25/27] use sequence instead of string path for dpath declaration --- .../http/requests_native_auth/oauth.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py index 254fb534d5eb..5be3b3d05d16 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py @@ -2,7 +2,7 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from typing import Any, List, Mapping, Tuple +from typing import Any, List, Mapping, Sequence, Tuple import dpath import pendulum @@ -105,11 +105,11 @@ def __init__( refresh_token_name: str = "refresh_token", refresh_request_body: Mapping[str, Any] = None, grant_type: str = "refresh_token", - client_id_config_path="/credentials/client_id", - client_secret_config_path="/credentials/client_secret", - refresh_token_config_path="/credentials/refresh_token", + client_id_config_path: Sequence[str] = ("credentials", "client_id"), + client_secret_config_path: Sequence[str] = ("credentials", "client_secret"), + refresh_token_config_path: Sequence[str] = ("credentials", "refresh_token"), ): - """_summary_ + """ Args: connector_config (Mapping[str, Any]): The full connector configuration @@ -121,9 +121,9 @@ def __init__( refresh_token_name (str, optional): Name of the name of the refresh token field, used to parse the refresh token response. Defaults to "refresh_token". refresh_request_body (Mapping[str, Any], optional): Custom key value pair that will be added to the refresh token request body. Defaults to None. grant_type (str, optional): OAuth grant type. Defaults to "refresh_token". - client_id_config_path (str, optional): Dpath to the client_id field in the connector configuration. Defaults to "/credentials/client_id". - client_secret_config_path (str, optional): Dpath to the client_secret field in the connector configuration. Defaults to "/credentials/client_secret". - refresh_token_config_path (str, optional): Dpath to the refresh_token field in the connector configuration. Defaults to "/credentials/refresh_token". + client_id_config_path (Sequence[str]): Dpath to the client_id field in the connector configuration. Defaults to ("credentials", "client_id"). + client_secret_config_path (Sequence[str]): Dpath to the client_secret field in the connector configuration. Defaults to ("credentials", "client_secret"). + refresh_token_config_path (Sequence[str]): Dpath to the refresh_token field in the connector configuration. Defaults to ("credentials", "refresh_token"). """ self._client_id_config_path = client_id_config_path self._client_secret_config_path = client_secret_config_path @@ -165,9 +165,6 @@ def _validate_connector_config(self): def get_refresh_token_name(self) -> str: return self._refresh_token_name - def _get_config_credentials_field(self, field_name): - return self._connector_config[self.credentials_configuration_field_name][field_name] - def get_client_id(self) -> str: return dpath.util.get(self._connector_config, self._client_id_config_path) From 4da6581ece54a5c7418c82049b79f44aaf0f6fdb Mon Sep 17 00:00:00 2001 From: alafanechere Date: Tue, 29 Nov 2022 12:04:47 +0100 Subject: [PATCH 26/27] revert connector changes --- .../source-bing-ads/source_bing_ads/client.py | 51 +++++++++++-------- .../source-bing-ads/source_bing_ads/source.py | 4 +- .../source-harvest/source_harvest/auth.py | 7 +-- .../source-harvest/source_harvest/source.py | 5 +- 4 files changed, 37 insertions(+), 30 deletions(-) diff --git a/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/client.py b/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/client.py index 8903d880b71f..e6193ca3ce0f 100644 --- a/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/client.py +++ b/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/client.py @@ -11,7 +11,6 @@ import backoff import pendulum -from airbyte_cdk.config_observation import observe_connector_config from airbyte_cdk.logger import AirbyteLogger from bingads.authorization import AuthorizationData, OAuthTokens, OAuthWebAuthCodeGrant from bingads.service_client import ServiceClient @@ -37,39 +36,47 @@ class Client: # The time interval in milliseconds between two status polling attempts. report_poll_interval: int = 15000 - def __init__(self, config: Mapping) -> None: - self.config = observe_connector_config(config) + def __init__( + self, + tenant_id: str, + reports_start_date: str, + developer_token: str = None, + client_id: str = None, + client_secret: str = None, + refresh_token: str = None, + **kwargs: Mapping[str, Any], + ) -> None: self.authorization_data: Mapping[str, AuthorizationData] = {} - self.authentication = self._get_auth_client() + self.refresh_token = refresh_token + self.developer_token = developer_token + + self.client_id = client_id + self.client_secret = client_secret + + self.authentication = self._get_auth_client(client_id, tenant_id, client_secret) self.oauth: OAuthTokens = self._get_access_token() - self.reports_start_date = pendulum.parse(config["reports_start_date"]).astimezone(tz=timezone.utc) + self.reports_start_date = pendulum.parse(reports_start_date).astimezone(tz=timezone.utc) - @property - def auth_creds(self): - return { - "client_id": self.config["client_id"], + def _get_auth_client(self, client_id: str, tenant_id: str, client_secret: str = None) -> OAuthWebAuthCodeGrant: + # https://github.com/BingAds/BingAds-Python-SDK/blob/e7b5a618e87a43d0a5e2c79d9aa4626e208797bd/bingads/authorization.py#L390 + auth_creds = { + "client_id": client_id, "redirection_uri": "", # should be empty string - "client_secret": self.config.get("client_secret"), - "tenant": self.config["tenant_id"], + "client_secret": None, + "tenant": tenant_id, } - - def _get_auth_client(self) -> OAuthWebAuthCodeGrant: - # https://github.com/BingAds/BingAds-Python-SDK/blob/e7b5a618e87a43d0a5e2c79d9aa4626e208797bd/bingads/authorization.py#L390 - # the `client_secret` should be provided for `non-public clients` only # https://docs.microsoft.com/en-us/advertising/guides/authentication-oauth-get-tokens?view=bingads-13#request-accesstoken - - return OAuthWebAuthCodeGrant(**self.auth_creds, token_refreshed_callback=self.update_tokens) - - def update_tokens(self, new_oauth_tokens: OAuthTokens): - self.config["refresh_token"] = new_oauth_tokens.refresh_token + if client_secret and client_secret != "": + auth_creds["client_secret"] = client_secret + return OAuthWebAuthCodeGrant(**auth_creds) @lru_cache(maxsize=4) def _get_auth_data(self, customer_id: str = None, account_id: Optional[str] = None) -> AuthorizationData: return AuthorizationData( account_id=account_id, customer_id=customer_id, - developer_token=self.config["developer_token"], + developer_token=self.developer_token, authentication=self.authentication, ) @@ -78,7 +85,7 @@ def _get_access_token(self) -> OAuthTokens: # clear caches to be able to use new access token self.get_service.cache_clear() self._get_auth_data.cache_clear() - return self.authentication.request_oauth_tokens_by_refresh_token(self.config["refresh_token"]) + return self.authentication.request_oauth_tokens_by_refresh_token(self.refresh_token) def is_token_expiring(self) -> bool: """ diff --git a/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/source.py b/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/source.py index 995271fb8e9d..b16bdbf600ab 100644 --- a/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/source.py +++ b/airbyte-integrations/connectors/source-bing-ads/source_bing_ads/source.py @@ -719,7 +719,7 @@ class SourceBingAds(AbstractSource): def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]: try: - client = Client(config) + client = Client(**config) account_ids = {str(account["Id"]) for account in Accounts(client, config).read_records(SyncMode.full_refresh)} if account_ids: return True, None @@ -738,7 +738,7 @@ def get_report_streams(self, aggregation_type: str) -> List[Stream]: ] def streams(self, config: Mapping[str, Any]) -> List[Stream]: - client = Client(config) + client = Client(**config) streams = [ Accounts(client, config), AdGroups(client, config), diff --git a/airbyte-integrations/connectors/source-harvest/source_harvest/auth.py b/airbyte-integrations/connectors/source-harvest/source_harvest/auth.py index 6a3c99e26503..26bf79acf644 100644 --- a/airbyte-integrations/connectors/source-harvest/source_harvest/auth.py +++ b/airbyte-integrations/connectors/source-harvest/source_harvest/auth.py @@ -4,7 +4,7 @@ from typing import Any, Mapping -from airbyte_cdk.sources.streams.http.requests_native_auth import SingleUseRefreshTokenOauth2Authenticator, TokenAuthenticator +from airbyte_cdk.sources.streams.http.auth import Oauth2Authenticator, TokenAuthenticator class HarvestMixin: @@ -29,11 +29,8 @@ class HarvestTokenAuthenticator(HarvestMixin, TokenAuthenticator): """ -class HarvestOauth2Authenticator(SingleUseRefreshTokenOauth2Authenticator): +class HarvestOauth2Authenticator(HarvestMixin, Oauth2Authenticator): """ Auth class for OAuth2 https://help.getharvest.com/api-v2/authentication-api/authentication/authentication/#for-server-side-applications """ - - def get_auth_header(self) -> Mapping[str, Any]: - return {**super().get_auth_header(), "Harvest-Account-ID": self._connector_config["account_id"]} diff --git a/airbyte-integrations/connectors/source-harvest/source_harvest/source.py b/airbyte-integrations/connectors/source-harvest/source_harvest/source.py index 68a6124b9c57..93dab5b9e291 100644 --- a/airbyte-integrations/connectors/source-harvest/source_harvest/source.py +++ b/airbyte-integrations/connectors/source-harvest/source_harvest/source.py @@ -54,8 +54,11 @@ def get_authenticator(config): credentials = config.get("credentials", {}) if credentials and "client_id" in credentials: return HarvestOauth2Authenticator( - config, token_refresh_endpoint="https://id.getharvest.com/api/v2/oauth2/token", + client_id=credentials.get("client_id"), + client_secret=credentials.get("client_secret"), + refresh_token=credentials.get("refresh_token"), + account_id=config["account_id"], ) api_token = credentials.get("api_token", config.get("api_token")) From 380e175ff8df502d0614da566c38e0977bf20997 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Tue, 29 Nov 2022 13:30:18 +0100 Subject: [PATCH 27/27] format --- airbyte-cdk/python/airbyte_cdk/config_observation.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-cdk/python/airbyte_cdk/config_observation.py b/airbyte-cdk/python/airbyte_cdk/config_observation.py index a25f1e160fe6..8d886e44bed9 100644 --- a/airbyte-cdk/python/airbyte_cdk/config_observation.py +++ b/airbyte-cdk/python/airbyte_cdk/config_observation.py @@ -1,6 +1,7 @@ # # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # + from __future__ import ( # Used to evaluate type hints at runtime, a NameError: name 'ConfigObserver' is not defined is thrown otherwise annotations, )