From cf63ee55778aef9f6e93fff78acf86cd4e1f7f21 Mon Sep 17 00:00:00 2001 From: Maxime Carbonneau-Leclerc Date: Fri, 20 Jan 2023 13:52:05 -0500 Subject: [PATCH] [ISSUE #20771] adding slices to connector builder read request (#21605) * [ISSUE #20771] adding slices to connector builder read request * [ISSUE #20771] formatting * [ISSUE #20771] set flag when limit requests reached (#21619) * [ISSUE #20771] set flag when limit requests reached * [ISSUE #20771] assert proper value on test read objects __init__ * [ISSUE #20771] code review and fix edge case --- .../airbyte_cdk/sources/abstract_source.py | 9 +- .../parsers/model_to_component_factory.py | 12 +- .../paginators/default_paginator.py | 4 +- .../retrievers/simple_retriever.py | 12 +- .../retrievers/test_simple_retriever.py | 12 +- .../test_manifest_declarative_source.py | 90 +++++++++++ .../sources/test_abstract_source.py | 51 ++++++ .../python/unit_tests/sources/test_source.py | 22 +-- .../connector_builder/entrypoint.py | 8 +- .../generated/models/stream_read.py | 2 + .../connector_builder/impl/adapter.py | 8 + .../connector_builder/impl/default_api.py | 81 +++++++--- .../impl/low_code_cdk_adapter.py | 20 ++- .../src/main/openapi/openapi.yaml | 4 + .../impl/test_default_api.py | 148 +++++++++++++++--- .../impl/test_low_code_cdk_adapter.py | 19 ++- 16 files changed, 417 insertions(+), 85 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index d1ac63e76e82..12e0fc57480f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -2,6 +2,7 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +import json import logging from abc import ABC, abstractmethod from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union @@ -9,10 +10,12 @@ from airbyte_cdk.models import ( AirbyteCatalog, AirbyteConnectionStatus, + AirbyteLogMessage, AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, + Level, Status, SyncMode, ) @@ -232,7 +235,8 @@ def _read_incremental( has_slices = False for _slice in slices: has_slices = True - logger.debug("Processing stream slice", extra={"slice": _slice}) + if logger.isEnabledFor(logging.DEBUG): + yield AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"slice:{json.dumps(_slice)}")) records = stream_instance.read_records( sync_mode=SyncMode.incremental, stream_slice=_slice, @@ -281,7 +285,8 @@ def _read_full_refresh( ) total_records_counter = 0 for _slice in slices: - logger.debug("Processing stream slice", extra={"slice": _slice}) + if logger.isEnabledFor(logging.DEBUG): + yield AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"slice:{json.dumps(_slice)}")) record_data_or_messages = stream_instance.read_records( stream_slice=_slice, sync_mode=SyncMode.full_refresh, diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 214d6742a8f6..ff6a5c34c854 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -116,9 +116,10 @@ class ModelToComponentFactory: - def __init__(self, is_test_read=False): + def __init__(self, 
limit_pages_fetched_per_slice: int = None, limit_slices_fetched: int = None): self._init_mappings() - self._is_test_read = is_test_read + self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice + self._limit_slices_fetched = limit_slices_fetched def _init_mappings(self): self.PYDANTIC_MODEL_TO_CONSTRUCTOR: [Type[BaseModel], Callable] = { @@ -482,8 +483,8 @@ def create_default_paginator(self, model: DefaultPaginatorModel, config: Config, config=config, options=model.options, ) - if self._is_test_read: - return PaginatorTestReadDecorator(paginator) + if self._limit_pages_fetched_per_slice: + return PaginatorTestReadDecorator(paginator, self._limit_pages_fetched_per_slice) return paginator def create_dpath_extractor(self, model: DpathExtractorModel, config: Config, **kwargs) -> DpathExtractor: @@ -681,7 +682,7 @@ def create_simple_retriever(self, model: SimpleRetrieverModel, config: Config, * self._create_component_from_model(model=model.stream_slicer, config=config) if model.stream_slicer else SingleSlice(options={}) ) - if self._is_test_read: + if self._limit_slices_fetched: return SimpleRetrieverTestReadDecorator( name=model.name, paginator=paginator, @@ -690,6 +691,7 @@ def create_simple_retriever(self, model: SimpleRetrieverModel, config: Config, * record_selector=record_selector, stream_slicer=stream_slicer, config=config, + maximum_number_of_slices=self._limit_slices_fetched, options=model.options, ) return SimpleRetriever( diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py index 3de89e4d3823..7c8775edae70 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py @@ -172,9 +172,11 @@ class PaginatorTestReadDecorator(Paginator): _DEFAULT_PAGINATION_LIMIT = 5 def __init__(self, decorated, maximum_number_of_pages: int = None): + if maximum_number_of_pages and maximum_number_of_pages < 1: + raise ValueError(f"The maximum number of pages on a test read needs to be strictly positive. Got {maximum_number_of_pages}") + self._maximum_number_of_pages = maximum_number_of_pages if maximum_number_of_pages else self._DEFAULT_PAGINATION_LIMIT self._decorated = decorated self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL - self._maximum_number_of_pages = maximum_number_of_pages if maximum_number_of_pages else self._DEFAULT_PAGINATION_LIMIT def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]: if self._page_count >= self._maximum_number_of_pages: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 4c75e5c4a0d4..049b4af70a47 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -417,18 +417,26 @@ def _parse_records_and_emit_request_and_responses(self, request, response, strea yield from self.parse_response(response, stream_slice=stream_slice, stream_state=stream_state) +@dataclass class SimpleRetrieverTestReadDecorator(SimpleRetriever): """ In some cases, we want to limit the number of requests that are made to the backend source. 
This class allows for limiting the number of slices that are queried throughout a read command. """ - _MAXIMUM_NUMBER_OF_SLICES = 5 + maximum_number_of_slices: int = 5 + + def __post_init__(self, options: Mapping[str, Any]): + super().__post_init__(options) + if self.maximum_number_of_slices and self.maximum_number_of_slices < 1: + raise ValueError( + f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}" + ) def stream_slices( self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Optional[StreamState] = None ) -> Iterable[Optional[Mapping[str, Any]]]: - return islice(super().stream_slices(sync_mode=sync_mode, stream_state=stream_state), self._MAXIMUM_NUMBER_OF_SLICES) + return islice(super().stream_slices(sync_mode=sync_mode, stream_state=stream_state), self.maximum_number_of_slices) def _prepared_request_to_airbyte_message(request: requests.PreparedRequest) -> AirbyteMessage: diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index f151af70c9d0..1a8db7a368bf 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -633,8 +633,9 @@ def test_response_to_airbyte_message(test_name, response_body, response_headers, def test_limit_stream_slices(): + maximum_number_of_slices = 4 stream_slicer = MagicMock() - stream_slicer.stream_slices.return_value = [{"date": f"2022-01-0{day}"} for day in range(1, 10)] + stream_slicer.stream_slices.return_value = _generate_slices(maximum_number_of_slices * 2) retriever = SimpleRetrieverTestReadDecorator( name="stream_name", primary_key=primary_key, @@ -642,10 +643,15 @@ def test_limit_stream_slices(): paginator=MagicMock(), record_selector=MagicMock(), stream_slicer=stream_slicer, + maximum_number_of_slices=maximum_number_of_slices, options={}, config={}, ) - truncated_slices = retriever.stream_slices(sync_mode=SyncMode.incremental, stream_state=None) + truncated_slices = list(retriever.stream_slices(sync_mode=SyncMode.incremental, stream_state=None)) - assert truncated_slices == [{"date": f"2022-01-0{day}"} for day in range(1, 6)] + assert truncated_slices == _generate_slices(maximum_number_of_slices) + + +def _generate_slices(number_of_slices): + return [{"date": f"2022-01-0{day + 1}"} for day in range(number_of_slices)] diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_manifest_declarative_source.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_manifest_declarative_source.py index 01faf5c25c44..79d09c3787f5 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_manifest_declarative_source.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_manifest_declarative_source.py @@ -12,6 +12,7 @@ from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource from jsonschema.exceptions import ValidationError +from unittest.mock import patch logger = logging.getLogger("airbyte") @@ -542,6 +543,95 @@ def test_manifest_without_at_least_one_stream(self, construct_using_pydantic_mod ManifestDeclarativeSource(source_config=manifest, construct_using_pydantic_models=construct_using_pydantic_models) + 
@patch("airbyte_cdk.sources.declarative.declarative_source.DeclarativeSource.read") + def test_given_debug_when_read_then_set_log_level(self, declarative_source_read): + any_valid_manifest = { + "version": "version", + "definitions": { + "schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"}, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, + "request_parameters": {"page_size": 10}, + }, + "record_selector": {"extractor": {"field_pointer": ["result"]}}, + }, + }, + "streams": [ + { + "type": "DeclarativeStream", + "$options": {"name": "lists", "primary_key": "id", "url_base": "https://api.sendgrid.com"}, + "schema_loader": { + "name": "{{ options.stream_name }}", + "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml", + }, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path"}, + "pagination_strategy": { + "type": "CursorPagination", + "cursor_value": "{{ response._metadata.next }}", + "page_size": 10, + }, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, + "request_parameters": {"page_size": 10}, + }, + "record_selector": {"extractor": {"field_pointer": ["result"]}}, + }, + }, + { + "type": "DeclarativeStream", + "$options": {"name": "stream_with_custom_requester", "primary_key": "id", "url_base": "https://api.sendgrid.com"}, + "schema_loader": { + "name": "{{ options.stream_name }}", + "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml", + }, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path"}, + "pagination_strategy": { + "type": "CursorPagination", + "cursor_value": "{{ response._metadata.next }}", + "page_size": 10, + }, + }, + "requester": { + "type": "CustomRequester", + "class_name": "unit_tests.sources.declarative.external_component.SampleCustomComponent", + "path": "/v3/marketing/lists", + "custom_request_parameters": {"page_size": 10}, + }, + "record_selector": {"extractor": {"field_pointer": ["result"]}}, + }, + }, + ], + "check": {"type": "CheckStream", "stream_names": ["lists"]}, + } + source = ManifestDeclarativeSource(source_config=any_valid_manifest, debug=True, construct_using_pydantic_models=True) + + debug_logger = logging.getLogger("logger.debug") + list(source.read(debug_logger, {}, {}, {})) + + assert debug_logger.isEnabledFor(logging.DEBUG) + + def test_generate_schema(): schema_str = ManifestDeclarativeSource.generate_schema() schema = json.loads(schema_str) diff --git a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py index bbb808a0ec74..b7747bc5f0d0 100644 --- a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py +++ 
b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py
@@ -331,6 +331,57 @@ def test_valid_full_refresh_read_with_slices(mocker):
     assert expected == messages
 
 
+def test_read_full_refresh_with_slices_sends_slice_messages(mocker):
+    """Given the logger is set to debug and a full refresh sync, AirbyteMessages are sent for slices"""
+    debug_logger = logging.getLogger("airbyte.debug")
+    debug_logger.setLevel(logging.DEBUG)
+    slices = [{"1": "1"}, {"2": "2"}]
+    stream = MockStream(
+        [({"sync_mode": SyncMode.full_refresh, "stream_slice": s}, [s]) for s in slices],
+        name="s1",
+    )
+
+    mocker.patch.object(MockStream, "get_json_schema", return_value={})
+    mocker.patch.object(MockStream, "stream_slices", return_value=slices)
+
+    src = MockSource(streams=[stream])
+    catalog = ConfiguredAirbyteCatalog(
+        streams=[
+            _configured_stream(stream, SyncMode.full_refresh),
+        ]
+    )
+
+    messages = src.read(debug_logger, {}, catalog)
+
+    assert 2 == len(list(filter(lambda message: message.log and message.log.message.startswith("slice:"), messages)))
+
+
+def test_read_incremental_with_slices_sends_slice_messages(mocker):
+    """Given the logger is set to debug and an incremental sync, AirbyteMessages are sent for slices"""
+    debug_logger = logging.getLogger("airbyte.debug")
+    debug_logger.setLevel(logging.DEBUG)
+    slices = [{"1": "1"}, {"2": "2"}]
+    stream = MockStream(
+        [({"sync_mode": SyncMode.incremental, "stream_slice": s, "stream_state": {}}, [s]) for s in slices],
+        name="s1",
+    )
+
+    MockStream.supports_incremental = mocker.PropertyMock(return_value=True)
+    mocker.patch.object(MockStream, "get_json_schema", return_value={})
+    mocker.patch.object(MockStream, "stream_slices", return_value=slices)
+
+    src = MockSource(streams=[stream])
+    catalog = ConfiguredAirbyteCatalog(
+        streams=[
+            _configured_stream(stream, SyncMode.incremental),
+        ]
+    )
+
+    messages = src.read(debug_logger, {}, catalog)
+
+    assert 2 == len(list(filter(lambda message: message.log and message.log.message.startswith("slice:"), messages)))
+
+
 class TestIncrementalRead:
     @pytest.mark.parametrize(
         "use_legacy",
diff --git a/airbyte-cdk/python/unit_tests/sources/test_source.py b/airbyte-cdk/python/unit_tests/sources/test_source.py
index 5b67d57444eb..9ad96f8a6332 100644
--- a/airbyte-cdk/python/unit_tests/sources/test_source.py
+++ b/airbyte-cdk/python/unit_tests/sources/test_source.py
@@ -390,6 +390,7 @@ def test_internal_config_limit(abstract_source, catalog):
     logger_mock.level = logging.DEBUG
     del catalog.streams[1]
     STREAM_LIMIT = 2
+    SLICE_DEBUG_LOG_COUNT = 1
     FULL_RECORDS_NUMBER = 3
     streams = abstract_source.streams(None)
     http_stream = streams[0]
@@ -398,7 +399,7 @@
     catalog.streams[0].sync_mode = SyncMode.full_refresh
     records = [r for r in abstract_source.read(logger=logger_mock, config=internal_config, catalog=catalog, state={})]
-    assert len(records) == STREAM_LIMIT
+    assert len(records) == STREAM_LIMIT + SLICE_DEBUG_LOG_COUNT
     logger_info_args = [call[0][0] for call in logger_mock.info.call_args_list]
     # Check if log line matches number of limit
     read_log_record = [_l for _l in logger_info_args if _l.startswith("Read")]
@@ -407,13 +408,13 @@
     # No limit, check if state record produced for incremental stream
     catalog.streams[0].sync_mode = SyncMode.incremental
     records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})]
-    assert len(records) == FULL_RECORDS_NUMBER + 1
+    assert len(records) == 
FULL_RECORDS_NUMBER + SLICE_DEBUG_LOG_COUNT + 1 assert records[-1].type == Type.STATE # Set limit and check if state is produced when limit is set for incremental stream logger_mock.reset_mock() records = [r for r in abstract_source.read(logger=logger_mock, config=internal_config, catalog=catalog, state={})] - assert len(records) == STREAM_LIMIT + 1 + assert len(records) == STREAM_LIMIT + SLICE_DEBUG_LOG_COUNT + 1 assert records[-1].type == Type.STATE logger_info_args = [call[0][0] for call in logger_mock.info.call_args_list] read_log_record = [_l for _l in logger_info_args if _l.startswith("Read")] @@ -425,14 +426,15 @@ def test_internal_config_limit(abstract_source, catalog): def test_source_config_no_transform(abstract_source, catalog): logger_mock = MagicMock() + SLICE_DEBUG_LOG_COUNT = 1 logger_mock.level = logging.DEBUG streams = abstract_source.streams(None) http_stream, non_http_stream = streams http_stream.get_json_schema.return_value = non_http_stream.get_json_schema.return_value = SCHEMA http_stream.read_records.return_value, non_http_stream.read_records.return_value = [[{"value": 23}] * 5] * 2 records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})] - assert len(records) == 2 * 5 - assert [r.record.data for r in records] == [{"value": 23}] * 2 * 5 + assert len(records) == 2 * (5 + SLICE_DEBUG_LOG_COUNT) + assert [r.record.data for r in records if r.type == Type.RECORD] == [{"value": 23}] * 2 * 5 assert http_stream.get_json_schema.call_count == 5 assert non_http_stream.get_json_schema.call_count == 5 @@ -440,6 +442,7 @@ def test_source_config_no_transform(abstract_source, catalog): def test_source_config_transform(abstract_source, catalog): logger_mock = MagicMock() logger_mock.level = logging.DEBUG + SLICE_DEBUG_LOG_COUNT = 2 streams = abstract_source.streams(None) http_stream, non_http_stream = streams http_stream.transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization) @@ -447,18 +450,19 @@ def test_source_config_transform(abstract_source, catalog): http_stream.get_json_schema.return_value = non_http_stream.get_json_schema.return_value = SCHEMA http_stream.read_records.return_value, non_http_stream.read_records.return_value = [{"value": 23}], [{"value": 23}] records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})] - assert len(records) == 2 - assert [r.record.data for r in records] == [{"value": "23"}] * 2 + assert len(records) == 2 + SLICE_DEBUG_LOG_COUNT + assert [r.record.data for r in records if r.type == Type.RECORD] == [{"value": "23"}] * 2 def test_source_config_transform_and_no_transform(abstract_source, catalog): logger_mock = MagicMock() logger_mock.level = logging.DEBUG + SLICE_DEBUG_LOG_COUNT = 2 streams = abstract_source.streams(None) http_stream, non_http_stream = streams http_stream.transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization) http_stream.get_json_schema.return_value = non_http_stream.get_json_schema.return_value = SCHEMA http_stream.read_records.return_value, non_http_stream.read_records.return_value = [{"value": 23}], [{"value": 23}] records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})] - assert len(records) == 2 - assert [r.record.data for r in records] == [{"value": "23"}, {"value": 23}] + assert len(records) == 2 + SLICE_DEBUG_LOG_COUNT + assert [r.record.data for r in records if r.type == Type.RECORD] == [{"value": "23"}, {"value": 23}] diff --git 
a/airbyte-connector-builder-server/connector_builder/entrypoint.py b/airbyte-connector-builder-server/connector_builder/entrypoint.py index c27750155178..d2ee54d20c2b 100644 --- a/airbyte-connector-builder-server/connector_builder/entrypoint.py +++ b/airbyte-connector-builder-server/connector_builder/entrypoint.py @@ -4,10 +4,14 @@ from connector_builder.generated.apis.default_api_interface import initialize_router from connector_builder.impl.default_api import DefaultApiImpl -from connector_builder.impl.low_code_cdk_adapter import LowCodeSourceAdapter +from connector_builder.impl.low_code_cdk_adapter import LowCodeSourceAdapterFactory from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware +_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5 +_MAXIMUM_NUMBER_OF_SLICES = 5 +_ADAPTER_FACTORY = LowCodeSourceAdapterFactory(_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, _MAXIMUM_NUMBER_OF_SLICES) + app = FastAPI( title="Connector Builder Server API", description="Connector Builder Server API ", @@ -22,4 +26,4 @@ allow_headers=["*"], ) -app.include_router(initialize_router(DefaultApiImpl(LowCodeSourceAdapter))) +app.include_router(initialize_router(DefaultApiImpl(_ADAPTER_FACTORY, _MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, _MAXIMUM_NUMBER_OF_SLICES))) diff --git a/airbyte-connector-builder-server/connector_builder/generated/models/stream_read.py b/airbyte-connector-builder-server/connector_builder/generated/models/stream_read.py index 163bb131e7c2..775b148fa3a7 100644 --- a/airbyte-connector-builder-server/connector_builder/generated/models/stream_read.py +++ b/airbyte-connector-builder-server/connector_builder/generated/models/stream_read.py @@ -19,11 +19,13 @@ class StreamRead(BaseModel): logs: The logs of this StreamRead. slices: The slices of this StreamRead. + test_read_limit_reached: The test_read_limit_reached of this StreamRead. inferred_schema: The inferred_schema of this StreamRead [Optional]. """ logs: List[object] slices: List[StreamReadSlices] + test_read_limit_reached: bool inferred_schema: Optional[Dict[str, Any]] = None StreamRead.update_forward_refs() diff --git a/airbyte-connector-builder-server/connector_builder/impl/adapter.py b/airbyte-connector-builder-server/connector_builder/impl/adapter.py index 840e0996fbaa..33da6ced709a 100644 --- a/airbyte-connector-builder-server/connector_builder/impl/adapter.py +++ b/airbyte-connector-builder-server/connector_builder/impl/adapter.py @@ -32,3 +32,11 @@ def read_stream(self, stream: str, config: Dict[str, Any]) -> Iterator[AirbyteMe :param config: The user-provided configuration as specified by the source's spec. :return: An iterator over `AirbyteMessage` objects. 
""" + + +class CdkAdapterFactory(ABC): + + @abstractmethod + def create(self, manifest: Dict[str, Any]) -> CdkAdapter: + """Return an implementation of CdkAdapter""" + pass diff --git a/airbyte-connector-builder-server/connector_builder/impl/default_api.py b/airbyte-connector-builder-server/connector_builder/impl/default_api.py index ef7ad588c3e7..23fff47ce88f 100644 --- a/airbyte-connector-builder-server/connector_builder/impl/default_api.py +++ b/airbyte-connector-builder-server/connector_builder/impl/default_api.py @@ -6,7 +6,7 @@ import logging import traceback from json import JSONDecodeError -from typing import Any, Callable, Dict, Iterable, Iterator, Optional, Union +from typing import Any, Dict, Iterable, Iterator, Optional, Union from urllib.parse import parse_qs, urljoin, urlparse from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Type @@ -21,17 +21,21 @@ from connector_builder.generated.models.streams_list_read import StreamsListRead from connector_builder.generated.models.streams_list_read_streams import StreamsListReadStreams from connector_builder.generated.models.streams_list_request_body import StreamsListRequestBody -from connector_builder.impl.adapter import CdkAdapter +from connector_builder.impl.adapter import CdkAdapter, CdkAdapterFactory from fastapi import Body, HTTPException from jsonschema import ValidationError class DefaultApiImpl(DefaultApi): + logger = logging.getLogger("airbyte.connector-builder") - def __init__(self, adapter_cls: Callable[[Dict[str, Any]], CdkAdapter], max_record_limit: int = 1000): - self.adapter_cls = adapter_cls + def __init__(self, adapter_factory: CdkAdapterFactory, max_pages_per_slice, max_slices, max_record_limit: int = 1000): + self.adapter_factory = adapter_factory + self._max_pages_per_slice = max_pages_per_slice + self._max_slices = max_slices self.max_record_limit = max_record_limit + super().__init__() async def get_manifest_template(self) -> str: @@ -124,7 +128,7 @@ async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Bo else: record_limit = min(stream_read_request_body.record_limit, self.max_record_limit) - single_slice = StreamReadSlices(pages=[]) + slices = [] log_messages = [] try: for message_group in self._get_message_groups( @@ -135,7 +139,7 @@ async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Bo if isinstance(message_group, AirbyteLogMessage): log_messages.append({"message": message_group.message}) else: - single_slice.pages.append(message_group) + slices.append(message_group) except Exception as error: # TODO: We're temporarily using FastAPI's default exception model. 
Ideally we should use exceptions defined in the OpenAPI spec
            self.logger.error(f"Could not perform read with error: {error.args[0]} - {self._get_stacktrace_as_string(error)}")
@@ -144,11 +148,20 @@
                 detail=f"Could not perform read with error: {error.args[0]}",
             )
 
-        return StreamRead(logs=log_messages, slices=[single_slice],
+        return StreamRead(logs=log_messages, slices=slices, test_read_limit_reached=self._has_reached_limit(slices),
                           inferred_schema=schema_inferrer.get_stream_schema(stream_read_request_body.stream))
 
+    def _has_reached_limit(self, slices):
+        if len(slices) >= self._max_slices:
+            return True
+
+        for slice in slices:
+            if len(slice.pages) >= self._max_pages_per_slice:
+                return True
+        return False
+
     def _get_message_groups(self, messages: Iterator[AirbyteMessage], schema_inferrer: SchemaInferrer, limit: int) -> Iterable[
-            Union[StreamReadPages, AirbyteLogMessage]]:
+            Union[StreamReadSlices, AirbyteLogMessage]]:
         """
         Message groups are partitioned according to when request log messages are received. Subsequent response log messages and
         record messages belong to the prior request log message and when we encounter another request, append the latest
@@ -164,33 +177,53 @@
         Note: The exception is that normal log messages can be received at any time which are not incorporated into grouping
         """
-        first_page = True
-        current_records = []
+        records_count = 0
+        at_least_one_page_in_group = False
+        current_page_records = []
+        current_slice_pages = []
         current_page_request: Optional[HttpRequest] = None
         current_page_response: Optional[HttpResponse] = None
-        while len(current_records) < limit and (message := next(messages, None)):
-            if first_page and message.type == Type.LOG and message.log.message.startswith("request:"):
-                first_page = False
-                request = self._create_request_from_log_message(message.log)
-                current_page_request = request
+        while records_count < limit and (message := next(messages, None)):
+            if self._need_to_close_page(at_least_one_page_in_group, message):
+                self._close_page(current_page_request, current_page_response, current_slice_pages, current_page_records)
+
+            if at_least_one_page_in_group and message.type == Type.LOG and message.log.message.startswith("slice:"):
+                yield StreamReadSlices(pages=current_slice_pages)
+                current_slice_pages = []
+                at_least_one_page_in_group = False
             elif message.type == Type.LOG and message.log.message.startswith("request:"):
-                if not current_page_request or not current_page_response:
-                    raise ValueError("Every message grouping should have at least one request and response")
-                yield StreamReadPages(request=current_page_request, response=current_page_response, records=current_records)
+                if not at_least_one_page_in_group:
+                    at_least_one_page_in_group = True
                 current_page_request = self._create_request_from_log_message(message.log)
-                current_records = []
+                current_page_response = None
             elif message.type == Type.LOG and message.log.message.startswith("response:"):
                 current_page_response = self._create_response_from_log_message(message.log)
            elif message.type == Type.LOG:
                 yield message.log
            elif message.type == Type.RECORD:
-                current_records.append(message.record.data)
+                current_page_records.append(message.record.data)
+                records_count += 1
                 schema_inferrer.accumulate(message.record)
         else:
-            if not current_page_request or not current_page_response:
-                raise ValueError("Every message grouping 
should have at least one request and response")
-            yield StreamReadPages(request=current_page_request, response=current_page_response, records=current_records)
+            self._close_page(current_page_request, current_page_response, current_slice_pages, current_page_records)
+            yield StreamReadSlices(pages=current_slice_pages)
+
+    def _need_to_close_page(self, at_least_one_page_in_group, message):
+        return (
+            at_least_one_page_in_group
+            and message.type == Type.LOG
+            and (message.log.message.startswith("request:") or message.log.message.startswith("slice:"))
+        )
+
+    def _close_page(self, current_page_request, current_page_response, current_slice_pages, current_page_records):
+        if not current_page_request or not current_page_response:
+            raise ValueError("Every message grouping should have at least one request and response")
+
+        current_slice_pages.append(
+            StreamReadPages(request=current_page_request, response=current_page_response, records=current_page_records)
+        )
+        current_page_records.clear()
 
     def _create_request_from_log_message(self, log_message: AirbyteLogMessage) -> Optional[HttpRequest]:
         # TODO: As a temporary stopgap, the CDK emits request data as a log message string. Ideally this should come in the
@@ -228,7 +261,7 @@ def _create_response_from_log_message(self, log_message: AirbyteLogMessage) -> O
     def _create_low_code_adapter(self, manifest: Dict[str, Any]) -> CdkAdapter:
         try:
-            return self.adapter_cls(manifest=manifest)
+            return self.adapter_factory.create(manifest)
         except ValidationError as error:
             # TODO: We're temporarily using FastAPI's default exception model. Ideally we should use exceptions defined in the OpenAPI spec
             self.logger.error(f"Invalid connector manifest with error: {error.message} - {DefaultApiImpl._get_stacktrace_as_string(error)}")
diff --git a/airbyte-connector-builder-server/connector_builder/impl/low_code_cdk_adapter.py b/airbyte-connector-builder-server/connector_builder/impl/low_code_cdk_adapter.py
index f0cf0aa4d7bf..f298cc3588ad 100644
--- a/airbyte-connector-builder-server/connector_builder/impl/low_code_cdk_adapter.py
+++ b/airbyte-connector-builder-server/connector_builder/impl/low_code_cdk_adapter.py
@@ -10,13 +10,17 @@
 from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ModelToComponentFactory
 from airbyte_cdk.sources.declarative.yaml_declarative_source import ManifestDeclarativeSource
 from airbyte_cdk.sources.streams.http import HttpStream
-from connector_builder.impl.adapter import CdkAdapter
+from connector_builder.impl.adapter import CdkAdapter, CdkAdapterFactory
 
 
 class LowCodeSourceAdapter(CdkAdapter):
-    def __init__(self, manifest: Dict[str, Any]):
+    def __init__(self, manifest: Dict[str, Any], limit_pages_fetched_per_slice, limit_slices_fetched):
         # Request and response messages are only emitted for sources that have debug turned on
-        self._source = ManifestDeclarativeSource(manifest, debug=True, component_factory=ModelToComponentFactory(is_test_read=True))
+        self._source = ManifestDeclarativeSource(
+            manifest,
+            debug=True,
+            component_factory=ModelToComponentFactory(limit_pages_fetched_per_slice, limit_slices_fetched)
+        )
 
     def get_http_streams(self, config: Dict[str, Any]) -> List[HttpStream]:
         http_streams = []
@@ -59,3 +63,13 @@ def read_stream(self, stream: str, config: Dict[str, Any]) -> Iterator[AirbyteMe
         except Exception as e:
             yield AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.ERROR, message=str(e)))
             return
+
+
+class LowCodeSourceAdapterFactory(CdkAdapterFactory):
+
+    def __init__(self, 
max_pages_per_slice, max_slices):
+        self._max_pages_per_slice = max_pages_per_slice
+        self._max_slices = max_slices
+
+    def create(self, manifest: Dict[str, Any]) -> CdkAdapter:
+        return LowCodeSourceAdapter(manifest, self._max_pages_per_slice, self._max_slices)
diff --git a/airbyte-connector-builder-server/src/main/openapi/openapi.yaml b/airbyte-connector-builder-server/src/main/openapi/openapi.yaml
index 61a828e3fbf0..c31d65d3ee94 100644
--- a/airbyte-connector-builder-server/src/main/openapi/openapi.yaml
+++ b/airbyte-connector-builder-server/src/main/openapi/openapi.yaml
@@ -77,6 +77,7 @@ components:
       required:
         - logs
         - slices
+        - test_read_limit_reached
       properties:
         logs:
           type: array
@@ -123,6 +124,9 @@ components:
           type: object
           description: The STATE AirbyteMessage emitted at the end of this slice. This can be omitted if a stream slicer is not configured.
           # $ref: "#/components/schemas/AirbyteProtocol/definitions/AirbyteStateMessage"
+        test_read_limit_reached:
+          type: boolean
+          description: Whether the maximum number of requests per slice or the maximum number of slices queried has been reached
         inferred_schema:
           type: object
           description: The narrowest JSON Schema against which every AirbyteRecord in the slices can validate successfully. This is inferred from reading every record in the output slices.
diff --git a/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_default_api.py b/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_default_api.py
index d9bbf6a07fa1..d545728c8581 100644
--- a/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_default_api.py
+++ b/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_default_api.py
@@ -18,10 +18,13 @@
 from connector_builder.generated.models.streams_list_read_streams import StreamsListReadStreams
 from connector_builder.generated.models.streams_list_request_body import StreamsListRequestBody
 from connector_builder.impl.default_api import DefaultApiImpl
-from connector_builder.impl.low_code_cdk_adapter import LowCodeSourceAdapter
+from connector_builder.impl.low_code_cdk_adapter import LowCodeSourceAdapterFactory
 from fastapi import HTTPException
 from pydantic.error_wrappers import ValidationError
 
+MAX_PAGES_PER_SLICE = 4
+MAX_SLICES = 3
+
 MANIFEST = {
     "version": "0.1.0",
     "type": "DeclarativeSource",
@@ -93,13 +96,17 @@ def record_message(stream: str, data: dict) -> AirbyteMessage:
     return AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream=stream, data=data, emitted_at=1234))
 
 
+def slice_message() -> AirbyteMessage:
+    return AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message='slice:{"key": "value"}'))
+
+
 def test_list_streams():
     expected_streams = [
         StreamsListReadStreams(name="hashiras", url="https://demonslayers.com/api/v1/hashiras"),
         StreamsListReadStreams(name="breathing-techniques", url="https://demonslayers.com/api/v1/breathing_techniques"),
     ]
 
-    api = DefaultApiImpl(LowCodeSourceAdapter)
+    api = DefaultApiImpl(LowCodeSourceAdapterFactory(MAX_PAGES_PER_SLICE, MAX_SLICES), MAX_PAGES_PER_SLICE, MAX_SLICES)
     streams_list_request_body = StreamsListRequestBody(manifest=MANIFEST, config=CONFIG)
     loop = asyncio.get_event_loop()
     actual_streams = loop.run_until_complete(api.list_streams(streams_list_request_body))
@@ -133,7 +140,7 @@ def test_list_streams_with_interpolated_urls():
 
     expected_streams = StreamsListRead(streams=[StreamsListReadStreams(name="demons", url="https://upper-six.muzan.com/api/v1/demons")])
 
-    api = 
DefaultApiImpl(LowCodeSourceAdapter) + api = DefaultApiImpl(LowCodeSourceAdapterFactory(MAX_PAGES_PER_SLICE, MAX_SLICES), MAX_PAGES_PER_SLICE, MAX_SLICES) streams_list_request_body = StreamsListRequestBody(manifest=manifest, config=CONFIG) loop = asyncio.get_event_loop() actual_streams = loop.run_until_complete(api.list_streams(streams_list_request_body)) @@ -167,7 +174,7 @@ def test_list_streams_with_unresolved_interpolation(): # The interpolated string {{ config['not_in_config'] }} doesn't resolve to anything so it ends up blank during interpolation expected_streams = StreamsListRead(streams=[StreamsListReadStreams(name="demons", url="https://.muzan.com/api/v1/demons")]) - api = DefaultApiImpl(LowCodeSourceAdapter) + api = DefaultApiImpl(LowCodeSourceAdapterFactory(MAX_PAGES_PER_SLICE, MAX_SLICES), MAX_PAGES_PER_SLICE, MAX_SLICES) streams_list_request_body = StreamsListRequestBody(manifest=manifest, config=CONFIG) loop = asyncio.get_event_loop() @@ -210,7 +217,7 @@ def test_read_stream(): ), ] - mock_source_adapter_cls = make_mock_adapter_cls( + mock_source_adapter_cls = make_mock_adapter_factory( iter( [ request_log_message(request), @@ -224,7 +231,7 @@ def test_read_stream(): ) ) - api = DefaultApiImpl(mock_source_adapter_cls) + api = DefaultApiImpl(mock_source_adapter_cls, MAX_PAGES_PER_SLICE, MAX_SLICES) loop = asyncio.get_event_loop() actual_response: StreamRead = loop.run_until_complete( @@ -275,7 +282,7 @@ def test_read_stream_with_logs(): {"message": "log message after the response"}, ] - mock_source_adapter_cls = make_mock_adapter_cls( + mock_source_adapter_cls = make_mock_adapter_factory( iter( [ AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message="log message before the request")), @@ -289,7 +296,7 @@ def test_read_stream_with_logs(): ) ) - api = DefaultApiImpl(mock_source_adapter_cls) + api = DefaultApiImpl(mock_source_adapter_cls, MAX_PAGES_PER_SLICE, MAX_SLICES) loop = asyncio.get_event_loop() actual_response: StreamRead = loop.run_until_complete( @@ -318,7 +325,7 @@ def test_read_stream_record_limit(request_record_limit, max_record_limit): "body": {"custom": "field"}, } response = {"status_code": 200, "headers": {"field": "value"}, "body": '{"name": "field"}'} - mock_source_adapter_cls = make_mock_adapter_cls( + mock_source_adapter_cls = make_mock_adapter_factory( iter( [ request_log_message(request), @@ -335,7 +342,7 @@ def test_read_stream_record_limit(request_record_limit, max_record_limit): n_records = 2 record_limit = min(request_record_limit, max_record_limit) - api = DefaultApiImpl(mock_source_adapter_cls, max_record_limit=max_record_limit) + api = DefaultApiImpl(mock_source_adapter_cls, MAX_PAGES_PER_SLICE, MAX_SLICES, max_record_limit=max_record_limit) loop = asyncio.get_event_loop() actual_response: StreamRead = loop.run_until_complete( api.read_stream(StreamReadRequestBody(manifest=MANIFEST, config=CONFIG, stream="hashiras", record_limit=request_record_limit)) @@ -361,7 +368,7 @@ def test_read_stream_default_record_limit(max_record_limit): "body": {"custom": "field"}, } response = {"status_code": 200, "headers": {"field": "value"}, "body": '{"name": "field"}'} - mock_source_adapter_cls = make_mock_adapter_cls( + mock_source_adapter_cls = make_mock_adapter_factory( iter( [ request_log_message(request), @@ -377,7 +384,7 @@ def test_read_stream_default_record_limit(max_record_limit): ) n_records = 2 - api = DefaultApiImpl(mock_source_adapter_cls, max_record_limit=max_record_limit) + api = DefaultApiImpl(mock_source_adapter_cls, 
MAX_PAGES_PER_SLICE, MAX_SLICES, max_record_limit=max_record_limit) loop = asyncio.get_event_loop() actual_response: StreamRead = loop.run_until_complete( api.read_stream(StreamReadRequestBody(manifest=MANIFEST, config=CONFIG, stream="hashiras")) @@ -396,7 +403,7 @@ def test_read_stream_limit_0(): "body": {"custom": "field"}, } response = {"status_code": 200, "headers": {"field": "value"}, "body": '{"name": "field"}'} - mock_source_adapter_cls = make_mock_adapter_cls( + mock_source_adapter_cls = make_mock_adapter_factory( iter( [ request_log_message(request), @@ -410,7 +417,7 @@ def test_read_stream_limit_0(): ] ) ) - api = DefaultApiImpl(mock_source_adapter_cls) + api = DefaultApiImpl(mock_source_adapter_cls, MAX_PAGES_PER_SLICE, MAX_SLICES) loop = asyncio.get_event_loop() with pytest.raises(ValidationError): @@ -451,7 +458,7 @@ def test_read_stream_no_records(): ), ] - mock_source_adapter_cls = make_mock_adapter_cls( + mock_source_adapter_cls = make_mock_adapter_factory( iter( [ request_log_message(request), @@ -462,7 +469,7 @@ def test_read_stream_no_records(): ) ) - api = DefaultApiImpl(mock_source_adapter_cls) + api = DefaultApiImpl(mock_source_adapter_cls, MAX_PAGES_PER_SLICE, MAX_SLICES) loop = asyncio.get_event_loop() actual_response: StreamRead = loop.run_until_complete( @@ -499,7 +506,7 @@ def test_invalid_manifest(): expected_status_code = 400 - api = DefaultApiImpl(LowCodeSourceAdapter) + api = DefaultApiImpl(LowCodeSourceAdapterFactory(MAX_PAGES_PER_SLICE, MAX_SLICES), MAX_PAGES_PER_SLICE, MAX_SLICES) loop = asyncio.get_event_loop() with pytest.raises(HTTPException) as actual_exception: loop.run_until_complete(api.read_stream(StreamReadRequestBody(manifest=invalid_manifest, config={}, stream="hashiras"))) @@ -510,7 +517,7 @@ def test_invalid_manifest(): def test_read_stream_invalid_group_format(): response = {"status_code": 200, "headers": {"field": "value"}, "body": '{"name": "field"}'} - mock_source_adapter_cls = make_mock_adapter_cls( + mock_source_adapter_cls = make_mock_adapter_factory( iter( [ response_log_message(response), @@ -520,7 +527,7 @@ def test_read_stream_invalid_group_format(): ) ) - api = DefaultApiImpl(mock_source_adapter_cls) + api = DefaultApiImpl(mock_source_adapter_cls, MAX_PAGES_PER_SLICE, MAX_SLICES) loop = asyncio.get_event_loop() with pytest.raises(HTTPException) as actual_exception: @@ -532,7 +539,7 @@ def test_read_stream_invalid_group_format(): def test_read_stream_returns_error_if_stream_does_not_exist(): expected_status_code = 400 - api = DefaultApiImpl(LowCodeSourceAdapter) + api = DefaultApiImpl(LowCodeSourceAdapterFactory(MAX_PAGES_PER_SLICE, MAX_SLICES), MAX_PAGES_PER_SLICE, MAX_SLICES) loop = asyncio.get_event_loop() with pytest.raises(HTTPException) as actual_exception: loop.run_until_complete(api.read_stream(StreamReadRequestBody(manifest=MANIFEST, config={}, stream="not_in_manifest"))) @@ -582,7 +589,7 @@ def test_read_stream_returns_error_if_stream_does_not_exist(): ) def test_create_request_from_log_message(log_message, expected_request): airbyte_log_message = AirbyteLogMessage(level=Level.INFO, message=log_message) - api = DefaultApiImpl(LowCodeSourceAdapter) + api = DefaultApiImpl(LowCodeSourceAdapterFactory(MAX_PAGES_PER_SLICE, MAX_SLICES), MAX_PAGES_PER_SLICE, MAX_SLICES) actual_request = api._create_request_from_log_message(airbyte_log_message) assert actual_request == expected_request @@ -617,15 +624,104 @@ def test_create_response_from_log_message(log_message, expected_response): response_message = 
f"response:{json.dumps(log_message)}" airbyte_log_message = AirbyteLogMessage(level=Level.INFO, message=response_message) - api = DefaultApiImpl(LowCodeSourceAdapter) + api = DefaultApiImpl(LowCodeSourceAdapterFactory(MAX_PAGES_PER_SLICE, MAX_SLICES), MAX_PAGES_PER_SLICE, MAX_SLICES) actual_response = api._create_response_from_log_message(airbyte_log_message) assert actual_response == expected_response -def make_mock_adapter_cls(return_value: Iterator) -> MagicMock: - mock_source_adapter_cls = MagicMock() +def test_read_stream_with_many_slices(): + request = {} + response = {"status_code": 200} + + mock_source_adapter_cls = make_mock_adapter_factory( + iter( + [ + slice_message(), + request_log_message(request), + response_log_message(response), + record_message("hashiras", {"name": "Muichiro Tokito"}), + slice_message(), + request_log_message(request), + response_log_message(response), + record_message("hashiras", {"name": "Shinobu Kocho"}), + record_message("hashiras", {"name": "Mitsuri Kanroji"}), + request_log_message(request), + response_log_message(response), + record_message("hashiras", {"name": "Obanai Iguro"}), + request_log_message(request), + response_log_message(response), + ] + ) + ) + + api = DefaultApiImpl(mock_source_adapter_cls, MAX_PAGES_PER_SLICE, MAX_SLICES) + + loop = asyncio.get_event_loop() + stream_read: StreamRead = loop.run_until_complete( + api.read_stream(StreamReadRequestBody(manifest=MANIFEST, config=CONFIG, stream="hashiras")) + ) + + assert not stream_read.test_read_limit_reached + assert len(stream_read.slices) == 2 + + assert len(stream_read.slices[0].pages) == 1 + assert len(stream_read.slices[0].pages[0].records) == 1 + + assert len(stream_read.slices[1].pages) == 3 + assert len(stream_read.slices[1].pages[0].records) == 2 + assert len(stream_read.slices[1].pages[1].records) == 1 + assert len(stream_read.slices[1].pages[2].records) == 0 + + + +def test_read_stream_given_maximum_number_of_slices_then_test_read_limit_reached(): + maximum_number_of_slices = 5 + request = {} + response = {"status_code": 200} + mock_source_adapter_cls = make_mock_adapter_factory( + iter( + [ + slice_message(), + request_log_message(request), + response_log_message(response) + ] * maximum_number_of_slices + ) + ) + + api = DefaultApiImpl(mock_source_adapter_cls, MAX_PAGES_PER_SLICE, MAX_SLICES) + + loop = asyncio.get_event_loop() + stream_read: StreamRead = loop.run_until_complete( + api.read_stream(StreamReadRequestBody(manifest=MANIFEST, config=CONFIG, stream="hashiras")) + ) + + assert stream_read.test_read_limit_reached + + +def test_read_stream_given_maximum_number_of_pages_then_test_read_limit_reached(): + maximum_number_of_pages_per_slice = 5 + request = {} + response = {"status_code": 200} + mock_source_adapter_cls = make_mock_adapter_factory( + iter( + [slice_message()] + [request_log_message(request), response_log_message(response)] * maximum_number_of_pages_per_slice + ) + ) + + api = DefaultApiImpl(mock_source_adapter_cls, MAX_PAGES_PER_SLICE, MAX_SLICES) + + loop = asyncio.get_event_loop() + stream_read: StreamRead = loop.run_until_complete( + api.read_stream(StreamReadRequestBody(manifest=MANIFEST, config=CONFIG, stream="hashiras")) + ) + + assert stream_read.test_read_limit_reached + + +def make_mock_adapter_factory(return_value: Iterator) -> MagicMock: + mock_source_adapter_factory = MagicMock() mock_source_adapter = MagicMock() mock_source_adapter.read_stream.return_value = return_value - mock_source_adapter_cls.return_value = mock_source_adapter - return 
mock_source_adapter_cls + mock_source_adapter_factory.create.return_value = mock_source_adapter + return mock_source_adapter_factory diff --git a/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_low_code_cdk_adapter.py b/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_low_code_cdk_adapter.py index 1003518e751d..645969186ff9 100644 --- a/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_low_code_cdk_adapter.py +++ b/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_low_code_cdk_adapter.py @@ -208,7 +208,7 @@ def parse_response( def test_get_http_streams(): expected_urls = {"https://demonslayers.com/api/v1/breathing_techniques", "https://demonslayers.com/api/v1/hashiras"} - adapter = LowCodeSourceAdapter(MANIFEST) + adapter = LowCodeSourceAdapter(MANIFEST, MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, MAXIMUM_NUMBER_OF_SLICES) actual_streams = adapter.get_http_streams(config={}) actual_urls = {http_stream.url_base + http_stream.path() for http_stream in actual_streams} @@ -216,10 +216,13 @@ def test_get_http_streams(): assert actual_urls == expected_urls +MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5 +MAXIMUM_NUMBER_OF_SLICES = 5 + def test_get_http_manifest_with_references(): expected_urls = {"https://demonslayers.com/api/v1/ranks"} - adapter = LowCodeSourceAdapter(MANIFEST_WITH_REFERENCES) + adapter = LowCodeSourceAdapter(MANIFEST_WITH_REFERENCES, MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, MAXIMUM_NUMBER_OF_SLICES) actual_streams = adapter.get_http_streams(config={}) actual_urls = {http_stream.url_base + http_stream.path() for http_stream in actual_streams} @@ -233,7 +236,7 @@ def test_get_http_streams_non_declarative_streams(): mock_source = MagicMock() mock_source.streams.return_value = [non_declarative_stream] - adapter = LowCodeSourceAdapter(MANIFEST) + adapter = LowCodeSourceAdapter(MANIFEST, MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, MAXIMUM_NUMBER_OF_SLICES) adapter._source = mock_source with pytest.raises(TypeError): adapter.get_http_streams(config={}) @@ -246,7 +249,7 @@ def test_get_http_streams_non_http_stream(): mock_source = MagicMock() mock_source.streams.return_value = [declarative_stream_non_http_retriever] - adapter = LowCodeSourceAdapter(MANIFEST) + adapter = LowCodeSourceAdapter(MANIFEST, MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, MAXIMUM_NUMBER_OF_SLICES) adapter._source = mock_source with pytest.raises(TypeError): adapter.get_http_streams(config={}) @@ -276,7 +279,7 @@ def test_read_streams(): mock_source = MagicMock() mock_source.read.return_value = expected_messages - adapter = LowCodeSourceAdapter(MANIFEST) + adapter = LowCodeSourceAdapter(MANIFEST, MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, MAXIMUM_NUMBER_OF_SLICES) adapter._source = mock_source actual_messages = list(adapter.read_stream("hashiras", {})) @@ -301,7 +304,7 @@ def return_value(*args, **kwargs): mock_source.read.side_effect = return_value - adapter = LowCodeSourceAdapter(MANIFEST) + adapter = LowCodeSourceAdapter(MANIFEST, MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, MAXIMUM_NUMBER_OF_SLICES) adapter._source = mock_source actual_messages = list(adapter.read_stream("hashiras", {})) @@ -338,11 +341,11 @@ def test_read_streams_invalid_reference(): } with pytest.raises(UndefinedReferenceException): - LowCodeSourceAdapter(invalid_reference_manifest) + LowCodeSourceAdapter(invalid_reference_manifest, MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, MAXIMUM_NUMBER_OF_SLICES) def test_stream_use_read_test_retriever_and_paginator(): - adapter = LowCodeSourceAdapter(MANIFEST_WITH_PAGINATOR) + 
adapter = LowCodeSourceAdapter(MANIFEST_WITH_PAGINATOR, MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, MAXIMUM_NUMBER_OF_SLICES) streams = adapter.get_http_streams(config={}) assert streams
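

A minimal sketch (not part of the patch) of how the page-limit decorator added in default_paginator.py behaves. It assumes _PAGE_COUNT_BEFORE_FIRST_NEXT_CALL is 1, i.e. the first page is counted before next_page_token is ever consulted; the hunk above references that constant but does not show its value. StubPaginator is a hypothetical stand-in for a real paginator.

    from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import PaginatorTestReadDecorator


    class StubPaginator:
        # A stand-in that always claims another page exists.
        def next_page_token(self, response, last_records):
            return {"page": "next"}


    # Cap a test read at two pages total.
    paginator = PaginatorTestReadDecorator(StubPaginator(), maximum_number_of_pages=2)

    # Page 1 is already counted, so exactly one more token is handed out
    # before the decorator reports exhaustion by returning None.
    assert paginator.next_page_token(response=None, last_records=[]) == {"page": "next"}
    assert paginator.next_page_token(response=None, last_records=[]) is None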