From adf1dd483db0a21ede5f13697d89862100a73d76 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Mon, 10 Oct 2022 13:27:31 -0700 Subject: [PATCH 1/5] Implement a test --- .../declarative/checks/test_check_stream.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py index 827b99ab6484..cd47a49bd676 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py @@ -15,17 +15,19 @@ @pytest.mark.parametrize( - "test_name, record, streams_to_check, expectation", + "test_name, record, streams_to_check, stream_slice, expectation", [ - ("test success check", record, stream_names, (True, None)), - ("test fail check", None, stream_names, (True, None)), - ("test try to check invalid stream", record, ["invalid_stream_name"], None), + ("test_success_check", record, stream_names, {}, (True, None)), + ("test_success_check_stream_slice", record, stream_names, {"slice": "slice_value"}, (True, None)), + ("test_fail_check", None, stream_names, {}, (True, None)), + ("test_try_to_check_invalid stream", record, ["invalid_stream_name"], {}, None), ], ) -def test_check_stream(test_name, record, streams_to_check, expectation): +def test_check_stream(test_name, record, streams_to_check, stream_slice, expectation): stream = MagicMock() stream.name = "s1" - stream.read_records.return_value = iter([record]) + stream.stream_slices.return_value = iter([stream_slice]) + stream.read_records.side_effect = mock_responses({frozenset(stream_slice): iter([record])}) source = MagicMock() source.streams.return_value = [stream] @@ -38,3 +40,7 @@ def test_check_stream(test_name, record, streams_to_check, expectation): else: with pytest.raises(ValueError): check_stream.check_connection(source, logger, config) + + +def mock_responses(responses, default_response=None, **kwargs): + return lambda stream_slice, sync_mode: responses[frozenset(stream_slice)] if frozenset(stream_slice) in responses else default_response From 595db9e047eb829a6957625fc66b7bb073de19a7 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Mon, 10 Oct 2022 13:28:26 -0700 Subject: [PATCH 2/5] Implement fix --- .../sources/declarative/checks/check_stream.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py index 3350a57fa3c7..e2b5143a7d0e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -35,8 +35,17 @@ def check_connection(self, source: Source, logger: logging.Logger, config: Mappi for stream_name in self.stream_names: if stream_name in stream_name_to_stream.keys(): stream = stream_name_to_stream[stream_name] + # Some streams need a stream slice to read records (eg if they have a SubstreamSlicer) try: - records = stream.read_records(sync_mode=SyncMode.full_refresh) + slices = stream.stream_slices( + cursor_field=stream.cursor_field, + sync_mode=SyncMode.full_refresh, + ) + try: + _slice = next(slices) + except StopIteration: + _slice = {} + records = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=_slice) next(records) except Exception as error: return False, f"Unable to connect to stream {stream_name} - {error}" From 92c0d7303933e65f2ada949cf89d3e7bb962b67d Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Mon, 10 Oct 2022 13:30:21 -0700 Subject: [PATCH 3/5] rename --- .../sources/declarative/checks/test_check_stream.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py index cd47a49bd676..30fe0ea32855 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py @@ -27,7 +27,7 @@ def test_check_stream(test_name, record, streams_to_check, stream_slice, expecta stream = MagicMock() stream.name = "s1" stream.stream_slices.return_value = iter([stream_slice]) - stream.read_records.side_effect = mock_responses({frozenset(stream_slice): iter([record])}) + stream.read_records.side_effect = mock_read_records({frozenset(stream_slice): iter([record])}) source = MagicMock() source.streams.return_value = [stream] @@ -42,5 +42,5 @@ def test_check_stream(test_name, record, streams_to_check, stream_slice, expecta check_stream.check_connection(source, logger, config) -def mock_responses(responses, default_response=None, **kwargs): +def mock_read_records(responses, default_response=None, **kwargs): return lambda stream_slice, sync_mode: responses[frozenset(stream_slice)] if frozenset(stream_slice) in responses else default_response From 692c48f24f03eb570cfb79e315b854f3d8dca797 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Mon, 10 Oct 2022 13:42:48 -0700 Subject: [PATCH 4/5] extract method --- .../declarative/checks/check_stream.py | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py index e2b5143a7d0e..5a6f7c9552ee 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -35,20 +35,23 @@ def check_connection(self, source: Source, logger: logging.Logger, config: Mappi for stream_name in self.stream_names: if stream_name in stream_name_to_stream.keys(): stream = stream_name_to_stream[stream_name] - # Some streams need a stream slice to read records (eg if they have a SubstreamSlicer) try: - slices = stream.stream_slices( - cursor_field=stream.cursor_field, - sync_mode=SyncMode.full_refresh, - ) - try: - _slice = next(slices) - except StopIteration: - _slice = {} - records = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=_slice) + # Some streams need a stream slice to read records (eg if they have a SubstreamSlicer) + stream_slice = self._get_stream_slice(stream) + records = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice) next(records) except Exception as error: return False, f"Unable to connect to stream {stream_name} - {error}" else: raise ValueError(f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}") return True, None + + def _get_stream_slice(self, stream): + slices = stream.stream_slices( + cursor_field=stream.cursor_field, + sync_mode=SyncMode.full_refresh, + ) + try: + return next(slices) + except StopIteration: + return {} From 363670fe8081524653fb479b60cc4c760713ce5b Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Mon, 17 Oct 2022 05:24:19 -0700 Subject: [PATCH 5/5] bump --- airbyte-cdk/python/CHANGELOG.md | 4 ++++ airbyte-cdk/python/setup.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index c9832fb8fc90..2fbb4f21c01a 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 0.1.100 + +- Low-code: Pass stream_slice to read_records when reading from CheckStream + ## 0.1.99 - Low-code: Fix default stream schema loader diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index c13ab4ce12ff..c3f205f35843 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -15,7 +15,7 @@ setup( name="airbyte-cdk", - version="0.1.99", + version="0.1.100", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown",