Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDK: Emit control message on config mutation #19428

Merged
merged 30 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f3d4b5a
wip
alafanechere Nov 14, 2022
24d24dd
implementation
alafanechere Nov 15, 2022
355b90b
format
alafanechere Nov 15, 2022
61e9d4b
bump version
alafanechere Nov 15, 2022
1101ccc
Merge branch 'master' into augustin/cdk/emit-updated-configs
alafanechere Nov 15, 2022
22f75c7
always update by default, even if same value
alafanechere Nov 16, 2022
d562117
rename split_config to filter_internal_keywords
alafanechere Nov 16, 2022
733b89a
bing ads example
alafanechere Nov 16, 2022
a59a1e6
wrap around AirbyteMessage
alafanechere Nov 16, 2022
616256e
exclude unset
alafanechere Nov 16, 2022
3ef545d
observer does not write config to disk
alafanechere Nov 21, 2022
0ff7318
revert global changes
alafanechere Nov 21, 2022
9e1483c
revert global changes
alafanechere Nov 21, 2022
2bbff33
revert global changes
alafanechere Nov 21, 2022
1f02860
observe from Oauth2Authenticator
alafanechere Nov 21, 2022
f40d99c
Merge branch 'master' into augustin/cdk/emit-updated-configs
alafanechere Nov 21, 2022
04041d8
ref
alafanechere Nov 21, 2022
12db1b7
handle list of dicts
alafanechere Nov 21, 2022
63d129c
implement SingleUseRefreshTokenOauth2Authenticator
alafanechere Nov 23, 2022
62ef553
test SingleUseRefreshTokenOauth2Authenticator
alafanechere Nov 23, 2022
bbe802a
call copy in ObservedDict
alafanechere Nov 23, 2022
8fd52f9
add docstring
alafanechere Nov 23, 2022
c5a0b6d
source harvest example
alafanechere Nov 23, 2022
803b70a
use dpath
alafanechere Nov 25, 2022
37f474f
better doc string
alafanechere Nov 25, 2022
08f64e4
update changelog
alafanechere Nov 25, 2022
518a36f
use sequence instead of string path for dpath declaration
alafanechere Nov 29, 2022
64bbc48
Merge branch 'master' into augustin/cdk/emit-updated-configs
alafanechere Nov 29, 2022
4da6581
revert connector changes
alafanechere Nov 29, 2022
380e175
format
alafanechere Nov 29, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.10.0
Emit `AirbyteControlMessage.ConnectorConfig` on `config` mutation.

## 0.9.2
Low-code: Make `default_paginator.page_token_option` optional

Expand Down
61 changes: 61 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/config_observation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import time
from abc import ABC, abstractmethod
from typing import Any, Callable, MutableMapping

from airbyte_cdk.models import AirbyteControlConnectorConfigMessage, AirbyteControlMessage, OrchestratorType


class BaseObserver(ABC):
    """Abstract observer exposing a single ``update`` hook.

    Concrete observers implement ``update``; ``ObservedDict`` calls it when an
    item assignment changes a stored value.
    """

    @abstractmethod
    def update(self) -> None:
        """React to a change of the observed object."""


class ObservedDict(dict):
    """A ``dict`` that notifies an observer when an item assignment changes a value.

    Mapping values (at construction time and on later assignments) are wrapped
    in ``ObservedDict`` with the same observer, so assignments on nested
    mappings are reported as well.

    Only ``__setitem__`` is overridden here; the other inherited mutators
    (``pop``, ``del``, ...) are not intercepted by this class.
    """

    # Marks "key was absent" so that assigning ``None`` to a brand new key is
    # still recognised as a change (``dict.get`` alone cannot tell the two apart).
    _MISSING = object()

    def __init__(self, non_observed_mapping: MutableMapping, observer: "BaseObserver") -> None:
        """Copy ``non_observed_mapping`` and start observing its nested mappings.

        :param non_observed_mapping: the plain mapping whose items are copied into this dict.
        :param observer: object whose ``update()`` is called when a value changes.
        """
        super().__init__(non_observed_mapping)
        self.observer = observer
        for item, value in self.items():
            if isinstance(value, MutableMapping):
                # Review feedback: keep the wrapping logic in one place. __setitem__
                # wraps the mapping; the wrapped copy compares equal to the value it
                # replaces, so the observer is not notified during construction.
                self[item] = value

    def __setitem__(self, item: Any, value: Any) -> None:
        """Override ``dict.__setitem__`` by:
        1. Observing the new value if it is a mapping
        2. Calling the observer's ``update`` if the key is new or the value differs from the previous one
        """
        previous_value = self.get(item, self._MISSING)
        if isinstance(value, MutableMapping):
            value = ObservedDict(value, self.observer)
        super().__setitem__(item, value)
        if previous_value is self._MISSING or value != previous_value:
            self.observer.update()


class ConfigObserver(BaseObserver):
    """Observer tracking mutations of an ``ObservedDict`` configuration.

    When ``update`` is called, the observed configuration is written through
    ``write_config_fn`` to ``config_path`` and a CONNECTOR_CONFIG control
    message is printed on stdout.
    """

    def __init__(self, config_path: str, write_config_fn: Callable) -> None:
        """Remember where and how the configuration must be written.

        :param config_path: destination passed as second argument to ``write_config_fn``.
        :param write_config_fn: callable invoked as ``write_config_fn(config, config_path)``.
        """
        self.write_config_fn = write_config_fn
        self.config_path = config_path

    def set_config(self, config: ObservedDict) -> None:
        """Attach the configuration to observe and write it once, without emitting a message."""
        self.config = config
        self.write_config_fn(config, self.config_path)

    def update(self) -> None:
        """Write the current configuration, then emit the control message."""
        self.write_config_fn(self.config, self.config_path)
        self._emit_airbyte_control_message()

    def _emit_airbyte_control_message(self) -> None:
        """Print a CONNECTOR_CONFIG control message carrying the current configuration as JSON."""
        # Timestamp is expressed in milliseconds.
        emitted_at_ms = time.time() * 1000
        connector_config = AirbyteControlConnectorConfigMessage(config=self.config)
        control_message = AirbyteControlMessage(
            type=OrchestratorType.CONNECTOR_CONFIG,
            emitted_at=emitted_at_ms,
            connectorConfig=connector_config,
        )
        print(control_message.json())
9 changes: 6 additions & 3 deletions airbyte-cdk/python/airbyte_cdk/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import Any, Generic, Mapping, Optional, Protocol, TypeVar

import yaml
from airbyte_cdk.config_observation import ConfigObserver, ObservedDict
from airbyte_cdk.models import AirbyteConnectionStatus, ConnectorSpecification


Expand Down Expand Up @@ -96,10 +97,12 @@ def write_config(config: Mapping[str, Any], config_path: str):

class DefaultConnectorMixin:
# can be overridden to change an input config
def configure(self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
def configure(self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str) -> ObservedDict[str, Any]:
config_path = os.path.join(temp_dir, "config.json")
self.write_config(config, config_path)
return config
config_observer = ConfigObserver(config_path, self.write_config)
observed_config = ObservedDict(config, config_observer)
config_observer.set_config(observed_config)
return observed_config


class Connector(DefaultConnectorMixin, BaseConnector[Mapping[str, Any]], ABC):
Expand Down
21 changes: 8 additions & 13 deletions airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,23 +176,18 @@ def dict(self, *args, **kwargs):
return super().dict(*args, **kwargs)


def split_config(config: Mapping[str, Any]) -> Tuple[dict, InternalConfig]:
def split_config(config: MutableMapping[str, Any]) -> Tuple[MutableMapping, InternalConfig]:
alafanechere marked this conversation as resolved.
Show resolved Hide resolved
"""
Break config map object into 2 instances: first is a dict with user defined
configuration and second is internal config that contains private keys for
acceptance test configuration.
Remove internal config keywords from config and build an InternalConfig from it.

:param
config - Dict object that has been loaded from config file.
config - MutableMapping object that has been loaded from config file.

:return tuple of user defined config dict with filtered out internal
:return tuple of user defined config MutableMapping with filtered out internal
parameters and SAT internal config object.
"""
main_config = {}
internal_config = {}
for k, v in config.items():
if k in InternalConfig.KEYWORDS:
internal_config[k] = v
else:
main_config[k] = v
return main_config, InternalConfig.parse_obj(internal_config)
for key in list(config.keys()):
if key in InternalConfig.KEYWORDS:
internal_config[key] = config.pop(key)
alafanechere marked this conversation as resolved.
Show resolved Hide resolved
return config, InternalConfig.parse_obj(internal_config)
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.9.2",
version="0.10.0",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
1 change: 1 addition & 0 deletions airbyte-cdk/python/unit_tests/sources/test_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ def test_internal_config_limit(abstract_source, catalog):

# Set limit and check if state is produced when limit is set for incremental stream
logger_mock.reset_mock()
internal_config = {"some_config": 100, "_limit": STREAM_LIMIT}
records = [r for r in abstract_source.read(logger=logger_mock, config=internal_config, catalog=catalog, state={})]
assert len(records) == STREAM_LIMIT + 1
assert records[-1].type == Type.STATE
Expand Down
53 changes: 53 additions & 0 deletions airbyte-cdk/python/unit_tests/test_config_observation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import json
import time

from airbyte_cdk.config_observation import ConfigObserver, ObservedDict


class TestObservedDict:
    """Checks when ``ObservedDict`` does and does not notify its observer."""

    def test_update_called_on_set_item(self, mocker):
        observer = mocker.Mock()
        observed = ObservedDict({"key": "value"}, observer)
        update_spy = observer.update
        assert update_spy.call_count == 0

        # Replacing a scalar with a mapping is a change: one notification.
        observed["key"] = {"nested_key": "nested_value"}
        assert update_spy.call_count == 1

        # The assigned mapping is observed as well, so changing it notifies again.
        nested = observed["key"]
        nested["nested_key"] = "new_nested_value"
        assert update_spy.call_count == 2

        # Setting the same value again should not call observer's update
        nested["nested_key"] = "new_nested_value"
        assert update_spy.call_count == 2

    def test_update_not_called_on_init_with_nested_fields(self, mocker):
        observer = mocker.Mock()
        initial_mapping = {"key": "value", "nested": {"key": "value"}}
        ObservedDict(initial_mapping, observer)
        assert observer.update.call_count == 0


class TestConfigObserver:
    """Unit tests for ``ConfigObserver``."""

    def test_update(self, mocker, capsys):
        """``update`` writes the config and prints a CONNECTOR_CONFIG control message."""
        mock_write_config_fn = mocker.Mock()
        mock_config_path = mocker.Mock()
        config_observer = ConfigObserver(mock_config_path, mock_write_config_fn)
        config_observer.config = ObservedDict({"key": "value"}, config_observer)
        before_time = time.time() * 1000
        config_observer.update()
        after_time = time.time() * 1000
        captured = capsys.readouterr()
        raw_control_message = json.loads(captured.out)
        mock_write_config_fn.assert_called_with(config_observer.config, mock_config_path)
        assert raw_control_message["type"] == "CONNECTOR_CONFIG"
        assert raw_control_message["connectorConfig"] == {"config": dict(config_observer.config)}
        # Inclusive bounds: consecutive time.time() readings can be equal on
        # clocks with coarse resolution, which made the strict comparison flaky.
        assert before_time <= raw_control_message["emitted_at"] <= after_time

    def test_set_config(self, mocker):
        """``set_config`` stores the config and writes it exactly once."""
        mock_write_config_fn = mocker.Mock()
        mock_config_path = mocker.Mock()
        config_observer = ConfigObserver(mock_config_path, mock_write_config_fn)
        observed_config = ObservedDict({"key": "value"}, config_observer)
        config_observer.set_config(observed_config)
        assert config_observer.config == observed_config
        mock_write_config_fn.assert_called_once_with(observed_config, mock_config_path)
11 changes: 11 additions & 0 deletions airbyte-cdk/python/unit_tests/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import pytest
import yaml
from airbyte_cdk import AirbyteSpec, Connector
from airbyte_cdk.config_observation import ConfigObserver, ObservedDict
from airbyte_cdk.models import AirbyteConnectionStatus

logger = logging.getLogger("airbyte")
Expand Down Expand Up @@ -82,6 +83,16 @@ def test_write_config(integration, mock_config):
assert mock_config == json.loads(actual.read())


def test_configure(integration, mock_config):
    """``configure`` returns an ``ObservedDict`` wired to a ``ConfigObserver`` for ``config.json``."""
    # Use a private, auto-removed directory instead of the shared system temp
    # dir, so the written config.json neither leaks nor collides across runs.
    with tempfile.TemporaryDirectory() as temp_dir:
        config = integration.configure(mock_config, temp_dir)
        assert isinstance(config, ObservedDict)
        assert isinstance(config.observer, ConfigObserver)
        assert config.observer.config == config
        assert config.observer.config_path == os.path.join(temp_dir, "config.json")
        assert config.observer.write_config_fn == integration.write_config


class TestConnectorSpec:
CONNECTION_SPECIFICATION = {
"type": "object",
Expand Down