diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index df550bb0d53a..1c5a7ed750de 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.8.1 +Low-code: Don't update cursor for non-record messages and fix default loader for connector builder manifests + ## 0.8.0 Low-code: Allow for request and response to be emitted as log messages diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index e35820ed9d32..62dbaf09addd 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -371,11 +371,14 @@ def read_records( stream_state, ) for record in records_generator: - self.stream_slicer.update_cursor(stream_slice, last_record=record) + # Only record messages should be parsed to update the cursor which is indicated by the Mapping type + if isinstance(record, Mapping): + self.stream_slicer.update_cursor(stream_slice, last_record=record) yield record else: last_record = self._last_records[-1] if self._last_records else None - self.stream_slicer.update_cursor(stream_slice, last_record=last_record) + if last_record and isinstance(last_record, Mapping): + self.stream_slicer.update_cursor(stream_slice, last_record=last_record) yield from [] def stream_slices( diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/default_schema_loader.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/default_schema_loader.py index 8e5129ebeab9..9344ffeed684 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/default_schema_loader.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/default_schema_loader.py @@ -38,7 +38,7 @@ def get_json_schema(self) -> Mapping[str, Any]: try: return 
self.default_loader.get_json_schema() - except FileNotFoundError: + except OSError: # A slight hack since we don't directly have the stream name. However, when building the default filepath we assume the # runtime options stores stream name 'name' so we'll do the same here stream_name = self._options.get("name", "") diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_file_schema_loader.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_file_schema_loader.py index de3fb380d1bc..fa4e5a99a005 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_file_schema_loader.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_file_schema_loader.py @@ -15,15 +15,18 @@ def _default_file_path() -> str: - # schema files are always in "source_<connector_name>/schemas/<stream_name>.json - # the connector's module name can be inferred by looking at the modules loaded and look for the one starting with source_ + # Schema files are always in "source_<connector_name>/schemas/<stream_name>.json + # The connector's module name can be inferred by looking at the modules loaded and look for the one starting with source_ source_modules = [ - k for k, v in sys.modules.items() if "source_" in k # example: ['source_exchange_rates', 'source_exchange_rates.source'] - ] - if not source_modules: - raise RuntimeError("Expected at least one module starting with 'source_'") - module = source_modules[0].split(".")[0] - return f"./{module}/schemas/{{{{options['name']}}}}.json" + k for k, v in sys.modules.items() if "source_" in k + ] # example: ['source_exchange_rates', 'source_exchange_rates.source'] + if source_modules: + module = source_modules[0].split(".")[0] + return f"./{module}/schemas/{{{{options['name']}}}}.json" + + # If we are not in a source_ module, the most likely scenario is we're processing a manifest from the connector builder + # server which does not require a json schema to be defined.
+ return "./{{options['name']}}.json" @dataclass diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 09570bdbe582..773743d05205 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -15,7 +15,7 @@ setup( name="airbyte-cdk", - version="0.8.0", + version="0.8.1", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index c9dba84fb101..9f8059cb8e4f 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -7,18 +7,23 @@ import airbyte_cdk.sources.declarative.requesters.error_handlers.response_status as response_status import pytest import requests -from airbyte_cdk.models import SyncMode +from airbyte_cdk.models import AirbyteLogMessage, Level, SyncMode from airbyte_cdk.sources.declarative.exceptions import ReadException from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever +from airbyte_cdk.sources.declarative.stream_slicers import DatetimeStreamSlicer from airbyte_cdk.sources.streams.http.auth import NoAuth from airbyte_cdk.sources.streams.http.http import HttpStream primary_key = "pk" records = [{"id": 1}, {"id": 2}] +request_response_logs = [ + AirbyteLogMessage(level=Level.INFO, message="request:{}"), + 
AirbyteLogMessage(level=Level.INFO, message="response{}"), +] config = {} @@ -90,7 +95,7 @@ def test_simple_retriever_full(mock_http_stream): assert retriever._last_response is None assert retriever._last_records is None - assert retriever.parse_response(response, stream_state=None) == records + assert retriever.parse_response(response, stream_state={}) == records assert retriever._last_response == response assert retriever._last_records == records @@ -107,6 +112,67 @@ def test_simple_retriever_full(mock_http_stream): paginator.reset.assert_called() +@patch.object(HttpStream, "_read_pages", return_value=[*request_response_logs, *records]) +def test_simple_retriever_with_request_response_logs(mock_http_stream): + requester = MagicMock() + paginator = MagicMock() + record_selector = MagicMock() + iterator = DatetimeStreamSlicer( + start_datetime="", end_datetime="", step="1d", cursor_field="id", datetime_format="", config={}, options={} + ) + + retriever = SimpleRetriever( + name="stream_name", + primary_key=primary_key, + requester=requester, + paginator=paginator, + record_selector=record_selector, + stream_slicer=iterator, + options={}, + config={}, + ) + + actual_messages = [r for r in retriever.read_records(SyncMode.full_refresh)] + paginator.reset.assert_called() + + assert isinstance(actual_messages[0], AirbyteLogMessage) + assert isinstance(actual_messages[1], AirbyteLogMessage) + assert actual_messages[2] == records[0] + assert actual_messages[3] == records[1] + + +@patch.object(HttpStream, "_read_pages", return_value=[]) +def test_simple_retriever_with_request_response_log_last_records(mock_http_stream): + requester = MagicMock() + paginator = MagicMock() + record_selector = MagicMock() + record_selector.select_records.return_value = request_response_logs + response = requests.Response() + iterator = DatetimeStreamSlicer( + start_datetime="", end_datetime="", step="1d", cursor_field="id", datetime_format="", config={}, options={} + ) + + retriever = 
SimpleRetriever( + name="stream_name", + primary_key=primary_key, + requester=requester, + paginator=paginator, + record_selector=record_selector, + stream_slicer=iterator, + options={}, + config={}, + ) + + assert retriever._last_response is None + assert retriever._last_records is None + assert retriever.parse_response(response, stream_state={}) == request_response_logs + assert retriever._last_response == response + assert retriever._last_records == request_response_logs + + [r for r in retriever.read_records(SyncMode.full_refresh)] + paginator.reset.assert_called() + + @pytest.mark.parametrize( "test_name, requester_response, expected_should_retry, expected_backoff_time", [