Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #20771] limiting the number of requests performed to the backe… #21525

Merged
merged 12 commits into from
Jan 24, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class ManifestDeclarativeSource(DeclarativeSource):

VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "schemas", "spec", "streams", "type", "version"}

def __init__(self, source_config: ConnectionDefinition, debug: bool = False, construct_using_pydantic_models: bool = False):
def __init__(self, source_config: ConnectionDefinition, debug: bool = False, component_factory: ModelToComponentFactory = None, construct_using_pydantic_models: bool = False):
"""
:param source_config(Mapping[str, Any]): The manifest of low-code components that describe the source connector
:param debug(bool): True if debug mode is enabled
Expand All @@ -71,7 +71,10 @@ def __init__(self, source_config: ConnectionDefinition, debug: bool = False, con
self._legacy_source_config = resolved_source_config
self._debug = debug
self._legacy_factory = DeclarativeComponentFactory() # Legacy factory used to instantiate declarative components from the manifest
self._constructor = ModelToComponentFactory() # New factory which converts the manifest to Pydantic models to construct components
if component_factory:
self._constructor = component_factory
else:
self._constructor = ModelToComponentFactory() # New factory which converts the manifest to Pydantic models to construct components

self._validate_source()

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import DefaultPaginator
from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import DefaultPaginator, PaginatorTestReadDecorator
from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import PaginationStrategy

__all__ = ["DefaultPaginator", "NoPagination", "PaginationStrategy", "Paginator"]
__all__ = ["DefaultPaginator", "NoPagination", "PaginationStrategy", "Paginator", "PaginatorTestReadDecorator"]
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,68 @@ def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, A
if option_type != RequestOptionType.path:
options[self.page_size_option.field_name] = self.pagination_strategy.get_page_size()
return options


class PaginatorTestReadDecorator(Paginator):
    """
    In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of
    pages that are queried throughout a read command.

    Only `next_page_token` and `reset` add behavior; every other method is forwarded unchanged to the decorated paginator.
    """

    # Value of the page counter before `next_page_token` has been called
    # NOTE(review): presumably 1 because the first page is requested without a token - confirm against the retriever
    _PAGE_COUNT_BEFORE_FIRST_NEXT_CALL = 1
    # Limit applied when the caller does not provide one
    _DEFAULT_PAGINATION_LIMIT = 5

    def __init__(self, decorated, maximum_number_of_pages: Optional[int] = None):
        """
        :param decorated: The paginator every call is delegated to
        :param maximum_number_of_pages: Page count at which `next_page_token` stops returning tokens. `None` (or 0) falls back to
            `_DEFAULT_PAGINATION_LIMIT`
        :raises ValueError: If `maximum_number_of_pages` is negative
        """
        # A negative limit would make the very first `next_page_token` call return None, silently disabling pagination
        if maximum_number_of_pages is not None and maximum_number_of_pages < 0:
            raise ValueError(f"The maximum number of pages on a test read can't be negative. Got {maximum_number_of_pages}")

        self._decorated = decorated
        self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL
        self._maximum_number_of_pages = maximum_number_of_pages or self._DEFAULT_PAGINATION_LIMIT

    def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
        """
        Return the decorated paginator's next page token, or None once the page limit has been reached.

        :param response: Forwarded as is to the decorated paginator
        :param last_records: Forwarded as is to the decorated paginator
        :return: None if the page counter reached the limit, else whatever the decorated paginator returns
        """
        if self._page_count >= self._maximum_number_of_pages:
            return None

        # The counter is incremented before delegating, so it advances even if the decorated paginator returns no token
        self._page_count += 1
        return self._decorated.next_page_token(response, last_records)

    def path(self):
        """Return the path provided by the decorated paginator."""
        return self._decorated.path()

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the request params provided by the decorated paginator."""
        return self._decorated.get_request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, str]:
        """Return the request headers provided by the decorated paginator."""
        return self._decorated.get_request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the request body data provided by the decorated paginator."""
        return self._decorated.get_request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the request body JSON provided by the decorated paginator."""
        return self._decorated.get_request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

    def reset(self):
        """Reset the decorated paginator and restart the page counter."""
        self._decorated.reset()
        self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
#

from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever, SimpleRetrieverTestReadDecorator

__all__ = ["Retriever", "SimpleRetriever"]
__all__ = ["Retriever", "SimpleRetriever", "SimpleRetrieverTestReadDecorator"]
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import json
import logging
from dataclasses import InitVar, dataclass, field
from itertools import islice
from json import JSONDecodeError
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

Expand Down Expand Up @@ -416,6 +417,20 @@ def _parse_records_and_emit_request_and_responses(self, request, response, strea
yield from self.parse_response(response, stream_slice=stream_slice, stream_state=stream_state)


class SimpleRetrieverTestReadDecorator(SimpleRetriever):
    """
    Retriever used when the number of requests made to the backend source has to be limited: it caps the number of slices that are
    queried throughout a read command.
    """

    # Upper bound on the number of slices yielded by `stream_slices`
    _MAXIMUM_NUMBER_OF_SLICES = 5

    def stream_slices(
        self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Optional[StreamState] = None
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        Return at most `_MAXIMUM_NUMBER_OF_SLICES` of the slices produced by the parent retriever.

        :param sync_mode: Forwarded to the parent implementation
        :param cursor_field: Accepted but not forwarded to the parent implementation
        :param stream_state: Forwarded to the parent implementation
        :return: A lazy `islice` iterator over the first slices of the parent retriever
        """
        all_slices = super().stream_slices(sync_mode=sync_mode, stream_state=stream_state)
        return islice(all_slices, self._MAXIMUM_NUMBER_OF_SLICES)


def _prepared_request_to_airbyte_message(request: requests.PreparedRequest) -> AirbyteMessage:
# FIXME: this should return some sort of trace message
request_dict = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import requests
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import DefaultPaginator, RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import (
DefaultPaginator, PaginatorTestReadDecorator, RequestOption, RequestOptionType
)
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.cursor_pagination_strategy import CursorPaginationStrategy


Expand Down Expand Up @@ -202,3 +204,25 @@ def test_reset():
strategy = MagicMock()
DefaultPaginator(strategy, config, url_base, options={}, page_size_option=page_size_request_option, page_token_option=page_token_request_option).reset()
assert strategy.reset.called


def test_limit_page_fetched():
    """PaginatorTestReadDecorator stops returning tokens once the configured page limit is reached."""
    page_limit = 5
    decorated_paginator = DefaultPaginator(
        page_size_option=MagicMock(),
        page_token_option=MagicMock(),
        pagination_strategy=MagicMock(),
        config=MagicMock(),
        url_base=MagicMock(),
        options={},
    )
    paginator = PaginatorTestReadDecorator(decorated_paginator, page_limit)

    # With a limit of 5, only `page_limit - 1` calls are expected to return a token
    for _ in range(page_limit - 1):
        token = paginator.next_page_token(MagicMock(), MagicMock())
        assert token

    # The next call hits the limit and must not return a token
    assert not paginator.next_page_token(MagicMock(), MagicMock())
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
SimpleRetriever,
SimpleRetrieverTestReadDecorator,
_prepared_request_to_airbyte_message,
_response_to_airbyte_message,
)
Expand Down Expand Up @@ -629,3 +630,22 @@ def test_response_to_airbyte_message(test_name, response_body, response_headers,
actual_airbyte_message = _response_to_airbyte_message(response)

assert expected_airbyte_message == actual_airbyte_message


def test_limit_stream_slices():
    """SimpleRetrieverTestReadDecorator only yields the first five slices produced by the stream slicer."""
    stream_slicer = MagicMock()
    # Nine slices are available, which is more than the decorator's limit
    stream_slicer.stream_slices.return_value = [{"date": f"2022-01-0{day}"} for day in range(1, 10)]
    retriever = SimpleRetrieverTestReadDecorator(
        name="stream_name",
        primary_key=primary_key,
        requester=MagicMock(),
        paginator=MagicMock(),
        record_selector=MagicMock(),
        stream_slicer=stream_slicer,
        options={},
        config={},
    )

    # `stream_slices` returns a lazy `islice` object, which never compares equal to a list: materialize it before comparing
    truncated_slices = list(retriever.stream_slices(sync_mode=SyncMode.incremental, stream_state=None))

    assert truncated_slices == [{"date": f"2022-01-0{day}"} for day in range(1, 6)]
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, ConfiguredAirbyteCatalog, Level
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ModelToComponentFactory
from airbyte_cdk.sources.declarative.yaml_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.streams.http import HttpStream
from connector_builder.impl.adapter import CdkAdapter
Expand All @@ -15,7 +16,7 @@
class LowCodeSourceAdapter(CdkAdapter):
def __init__(self, manifest: Dict[str, Any]):
# Request and response messages are only emitted for sources that have debug turned on
self._source = ManifestDeclarativeSource(manifest, debug=True)
self._source = ManifestDeclarativeSource(manifest, debug=True, component_factory=ModelToComponentFactory(is_test_read=True))

def get_http_streams(self, config: Dict[str, Any]) -> List[HttpStream]:
http_streams = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import requests
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteRecordMessage, Level, Type
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.requesters.paginators import PaginatorTestReadDecorator
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetrieverTestReadDecorator
from airbyte_cdk.sources.declarative.parsers.custom_exceptions import UndefinedReferenceException
from airbyte_cdk.sources.streams.http import HttpStream

Expand Down Expand Up @@ -176,6 +178,33 @@ def parse_response(
}
}

# Manifest declaring a single "hashiras" stream whose retriever is configured with a DefaultPaginator
MANIFEST_WITH_PAGINATOR = {
    "version": "0.1.0",
    "type": "DeclarativeSource",
    "definitions": {},
    "streams": [
        {
            "type": "DeclarativeStream",
            "retriever": {
                "type": "SimpleRetriever",
                "record_selector": {
                    "extractor": {
                        "field_pointer": ["items"],
                        "type": "DpathExtractor",
                    },
                    "type": "RecordSelector",
                },
                "paginator": {
                    "type": "DefaultPaginator",
                    "pagination_strategy": {"type": "OffsetIncrement", "page_size": 10},
                    "url_base": "https://demonslayers.com/api/v1/",
                },
                "requester": {
                    "url_base": "https://demonslayers.com/api/v1/",
                    "http_method": "GET",
                    "type": "HttpRequester",
                },
            },
            "$options": {"name": "hashiras", "path": "/hashiras"},
        },
    ],
    "check": {"stream_names": ["hashiras"], "type": "CheckStream"},
}

def test_get_http_streams():
expected_urls = {"https://demonslayers.com/api/v1/breathing_techniques", "https://demonslayers.com/api/v1/hashiras"}

Expand Down Expand Up @@ -310,3 +339,13 @@ def test_read_streams_invalid_reference():

with pytest.raises(UndefinedReferenceException):
LowCodeSourceAdapter(invalid_reference_manifest)


def test_stream_use_read_test_retriever_and_paginator():
    """Streams built by the adapter use the test-read retriever and paginator decorators."""
    http_streams = LowCodeSourceAdapter(MANIFEST_WITH_PAGINATOR).get_http_streams(config={})

    # Make sure the loop below can't pass vacuously on an empty list
    assert http_streams
    for http_stream in http_streams:
        assert isinstance(http_stream, SimpleRetrieverTestReadDecorator)
        assert isinstance(http_stream.paginator, PaginatorTestReadDecorator)