-
Notifications
You must be signed in to change notification settings - Fork 4.1k
/
test_config_observation.py
76 lines (59 loc) · 2.9 KB
/
test_config_observation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import json
import time
import pytest
from airbyte_cdk.config_observation import ConfigObserver, ObservedDict, observe_connector_config
class TestObservedDict:
    """Checks that ``ObservedDict`` notifies its observer on item assignment."""

    def test_update_called_on_set_item(self, mocker):
        """Every item assignment, at any nesting depth, adds one ``observer.update`` call."""
        observer = mocker.Mock()
        initial = {
            "key": "value",
            "nested_dict": {"key": "value"},
            "list_of_dict": [{"key": "value"}, {"key": "value"}],
        }
        observed = ObservedDict(initial, observer)
        # Building the observed dict must not notify the observer by itself.
        assert observer.update.call_count == 0

        def assign(path, value):
            """Walk ``path`` from the root dict and assign ``value`` at its last element."""
            *parents, leaf = path
            target = observed
            for step in parents:
                target = target[step]
            target[leaf] = value

        # Applied strictly in order: the last entry relies on the key created
        # by the entry just before it.
        assignments = [
            # Overwrite a key inside a nested dict.
            (("nested_dict", "key"), "new_value"),
            # Overwrite a top-level key.
            (("key",), "new_value"),
            # Add a key to a nested dict.
            (("nested_dict", "new_key"), "value"),
            # Overwrite, then add, a key in a dict held inside a list.
            (("list_of_dict", 0, "key"), "new_value"),
            (("list_of_dict", 0, "new_key"), "new_value"),
            # Add a brand-new list of dicts, then mutate a dict inside it.
            (("new_list_of_dicts",), [{"foo": "bar"}]),
            (("new_list_of_dicts", 0, "new_key"), "new_value"),
        ]

        for expected_calls, (path, value) in enumerate(assignments, start=1):
            assign(path, value)
            assert observer.update.call_count == expected_calls
class TestConfigObserver:
    """Tests for the message written to stdout by ``ConfigObserver.update``."""

    def test_update(self, capsys):
        """``update`` prints a CONTROL message of type CONNECTOR_CONFIG with the config.

        The message is read back from captured stdout and parsed as JSON. Its
        ``emitted_at`` value is compared against millisecond wall-clock readings
        taken immediately before and after the call.
        """
        config_observer = ConfigObserver()
        config_observer.set_config(ObservedDict({"key": "value"}, config_observer))

        before_time = time.time() * 1000
        config_observer.update()
        after_time = time.time() * 1000

        captured = capsys.readouterr()
        airbyte_message = json.loads(captured.out)

        assert airbyte_message["type"] == "CONTROL"
        assert "control" in airbyte_message

        raw_control_message = airbyte_message["control"]
        assert raw_control_message["type"] == "CONNECTOR_CONFIG"
        assert raw_control_message["connectorConfig"] == {"config": dict(config_observer.config)}
        # Inclusive bounds: on a coarse-resolution clock, consecutive time.time()
        # reads can return the same value, which made the strict `<` form flaky.
        assert before_time <= raw_control_message["emitted_at"] <= after_time
def test_observe_connector_config(capsys):
    """``observe_connector_config`` wraps a plain dict and wires up a ``ConfigObserver``.

    Also checks that assigning to the wrapped config prints a message whose
    ``connectorConfig`` reflects the updated value.
    """
    plain_config = {"foo": "bar"}
    wrapped_config = observe_connector_config(plain_config)
    attached_observer = wrapped_config.observer

    # The result is an ObservedDict whose observer points back at it.
    assert isinstance(wrapped_config, ObservedDict)
    assert isinstance(attached_observer, ConfigObserver)
    assert wrapped_config.observer.config == wrapped_config

    wrapped_config["foo"] = "foo"

    stdout_text = capsys.readouterr().out
    emitted_message = json.loads(stdout_text)
    control_payload = emitted_message["control"]
    assert control_payload["connectorConfig"] == {"config": {"foo": "foo"}}
def test_observe_already_observed_config():
    """Passing an already-observed config to ``observe_connector_config`` raises ``ValueError``."""
    plain_config = {"foo": "bar"}
    already_observed = observe_connector_config(plain_config)

    with pytest.raises(ValueError):
        observe_connector_config(already_observed)