diff --git a/airbyte-cdk/python/.bumpversion.cfg b/airbyte-cdk/python/.bumpversion.cfg
index 44db9b6e9a4d..1278da16b49c 100644
--- a/airbyte-cdk/python/.bumpversion.cfg
+++ b/airbyte-cdk/python/.bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.22.0
+current_version = 0.23.0
 commit = False
 
 [bumpversion:file:setup.py]
diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md
index ce125c91cac2..22592251d702 100644
--- a/airbyte-cdk/python/CHANGELOG.md
+++ b/airbyte-cdk/python/CHANGELOG.md
@@ -1,5 +1,8 @@
 # Changelog
 
+## 0.23.0
+Limiting the number of HTTP requests during a test read
+
 ## 0.22.0
 Surface the resolved manifest in the CDK
 
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
index 3f9efab8e48f..a9efc45fb21b 100644
--- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
+++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
@@ -2,6 +2,7 @@
 # Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #
 
+import json
 import logging
 from abc import ABC, abstractmethod
 from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union
@@ -9,10 +10,12 @@
 from airbyte_cdk.models import (
     AirbyteCatalog,
     AirbyteConnectionStatus,
+    AirbyteLogMessage,
     AirbyteMessage,
     AirbyteStateMessage,
     ConfiguredAirbyteCatalog,
     ConfiguredAirbyteStream,
+    Level,
     Status,
     SyncMode,
 )
@@ -34,6 +37,8 @@ class AbstractSource(Source, ABC):
     in this class to create an Airbyte Specification compliant Source.
     """
 
+    SLICE_LOG_PREFIX = "slice:"
+
     @abstractmethod
     def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
         """
@@ -236,7 +241,10 @@ def _read_incremental(
         has_slices = False
         for _slice in slices:
             has_slices = True
-            logger.debug("Processing stream slice", extra={"slice": _slice})
+            if logger.isEnabledFor(logging.DEBUG):
+                yield AirbyteMessage(
+                    type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice)}")
+                )
             records = stream_instance.read_records(
                 sync_mode=SyncMode.incremental,
                 stream_slice=_slice,
@@ -285,7 +293,10 @@ def _read_full_refresh(
         )
         total_records_counter = 0
         for _slice in slices:
-            logger.debug("Processing stream slice", extra={"slice": _slice})
+            if logger.isEnabledFor(logging.DEBUG):
+                yield AirbyteMessage(
+                    type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice)}")
+                )
             record_data_or_messages = stream_instance.read_records(
                 stream_slice=_slice,
                 sync_mode=SyncMode.full_refresh,
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py
index 40fb1b6fac81..05d9e7411358 100644
--- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py
+++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py
@@ -50,7 +50,13 @@ class ManifestDeclarativeSource(DeclarativeSource):
 
     VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "schemas", "spec", "streams", "type", "version"}
 
-    def __init__(self, source_config: ConnectionDefinition, debug: bool = False, construct_using_pydantic_models: bool = False):
+    def __init__(
+        self,
+        source_config: ConnectionDefinition,
+        debug: bool = False,
+        component_factory: ModelToComponentFactory = None,
+        construct_using_pydantic_models: bool = False,
+    ):
         """
         :param source_config(Mapping[str, Any]): The manifest of low-code components that describe the source connector
         :param debug(bool): True if debug mode is enabled
@@ -71,7 +77,12 @@ def __init__(self, source_config: ConnectionDefinition, debug: bool = False, con
         self._legacy_source_config = resolved_source_config
         self._debug = debug
         self._legacy_factory = DeclarativeComponentFactory()  # Legacy factory used to instantiate declarative components from the manifest
-        self._constructor = ModelToComponentFactory()  # New factory which converts the manifest to Pydantic models to construct components
+        if component_factory:
+            self._constructor = component_factory
+        else:
+            self._constructor = (
+                ModelToComponentFactory()
+            )  # New factory which converts the manifest to Pydantic models to construct components
 
         self._validate_source()
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
index ca5e46d79c6c..8f022f34fa3a 100644
--- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
+++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -89,11 +89,11 @@
     WaitUntilTimeFromHeaderBackoffStrategy,
 )
 from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction
-from airbyte_cdk.sources.declarative.requesters.paginators import DefaultPaginator, NoPagination
+from airbyte_cdk.sources.declarative.requesters.paginators import DefaultPaginator, NoPagination, PaginatorTestReadDecorator
 from airbyte_cdk.sources.declarative.requesters.paginators.strategies import CursorPaginationStrategy, OffsetIncrement, PageIncrement
 from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType
 from airbyte_cdk.sources.declarative.requesters.request_options import InterpolatedRequestOptionsProvider
-from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
+from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever, SimpleRetrieverTestReadDecorator
 from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader, InlineSchemaLoader, JsonFileSchemaLoader
 from airbyte_cdk.sources.declarative.spec import Spec
 from airbyte_cdk.sources.declarative.stream_slicers import (
@@ -116,8 +116,71 @@
 
 
 class ModelToComponentFactory:
-    @staticmethod
-    def create_component(model_type: Type[BaseModel], component_definition: ComponentDefinition, config: Config) -> type:
+    def __init__(self, limit_pages_fetched_per_slice: int = None, limit_slices_fetched: int = None):
+        self._init_mappings()
+        self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
+        self._limit_slices_fetched = limit_slices_fetched
+
+    def _init_mappings(self):
+        self.PYDANTIC_MODEL_TO_CONSTRUCTOR: [Type[BaseModel], Callable] = {
+            AddedFieldDefinitionModel: self.create_added_field_definition,
+            AddFieldsModel: self.create_add_fields,
+            ApiKeyAuthenticatorModel: self.create_api_key_authenticator,
+            BasicHttpAuthenticatorModel: self.create_basic_http_authenticator,
+            BearerAuthenticatorModel: self.create_bearer_authenticator,
+            CheckStreamModel: self.create_check_stream,
+            CartesianProductStreamSlicerModel: self.create_cartesian_product_slicer,
+            CompositeErrorHandlerModel: self.create_composite_error_handler,
+            ConstantBackoffStrategyModel: self.create_constant_backoff_strategy,
+            CursorPaginationModel: self.create_cursor_pagination,
+            CustomAuthenticatorModel: self.create_custom_component,
+            CustomBackoffStrategyModel: self.create_custom_component,
+            CustomErrorHandlerModel: self.create_custom_component,
+            CustomRecordExtractorModel: self.create_custom_component,
+            CustomRequesterModel: self.create_custom_component,
+            CustomRequestOptionsProviderModel: self.create_custom_component,
+            # todo: Remove later when we deprecate request providers from interface
+            CustomRetrieverModel: self.create_custom_component,
+            CustomPaginationStrategyModel: self.create_custom_component,
+            CustomStreamSlicerModel: self.create_custom_component,
+            CustomTransformationModel: self.create_custom_component,
+            DatetimeStreamSlicerModel: self.create_datetime_stream_slicer,
+            DeclarativeStreamModel: self.create_declarative_stream,
+            DefaultErrorHandlerModel: self.create_default_error_handler,
+            DefaultPaginatorModel: self.create_default_paginator,
+            DpathExtractorModel: self.create_dpath_extractor,
+            ExponentialBackoffStrategyModel: self.create_exponential_backoff_strategy,
+            HttpRequesterModel: self.create_http_requester,
+            HttpResponseFilterModel: self.create_http_response_filter,
+            InlineSchemaLoaderModel: self.create_inline_schema_loader,
+            InterpolatedRequestOptionsProviderModel: self.create_interpolated_request_options_provider,
+            JsonDecoderModel: self.create_json_decoder,
+            JsonFileSchemaLoaderModel: self.create_json_file_schema_loader,
+            ListStreamSlicerModel: self.create_list_stream_slicer,
+            MinMaxDatetimeModel: self.create_min_max_datetime,
+            NoAuthModel: self.create_no_auth,
+            NoPaginationModel: self.create_no_pagination,
+            OAuthAuthenticatorModel: self.create_oauth_authenticator,
+            OffsetIncrementModel: self.create_offset_increment,
+            PageIncrementModel: self.create_page_increment,
+            ParentStreamConfigModel: self.create_parent_stream_config,
+            RecordFilterModel: self.create_record_filter,
+            RecordSelectorModel: self.create_record_selector,
+            RemoveFieldsModel: self.create_remove_fields,
+            RequestOptionModel: self.create_request_option,
+            SessionTokenAuthenticatorModel: self.create_session_token_authenticator,
+            SimpleRetrieverModel: self.create_simple_retriever,
+            SingleSliceModel: self.create_single_slice,
+            SpecModel: self.create_spec,
+            SubstreamSlicerModel: self.create_substream_slicer,
+            WaitTimeFromHeaderModel: self.create_wait_time_from_header,
+            WaitUntilTimeFromHeaderModel: self.create_wait_until_time_from_header,
+        }
+
+        # Needed for the case where we need to perform a second parse on the fields of a custom component
+        self.TYPE_NAME_TO_MODEL = {cls.__name__: cls for cls in self.PYDANTIC_MODEL_TO_CONSTRUCTOR}
+
+    def create_component(self, model_type: Type[BaseModel], component_definition: ComponentDefinition, config: Config) -> type:
         """
         Takes a given Pydantic model type and Mapping representing a component definition and creates a declarative component and
         subcomponents which will be used at runtime. This is done by first parsing the mapping into a Pydantic model and then creating
@@ -138,591 +201,541 @@ def create_component(model_type: Type[BaseModel], component_definition: Componen
         if not isinstance(declarative_component_model, model_type):
             raise ValueError(f"Expected {model_type.__name__} component, but received {declarative_component_model.__class__.__name__}")
 
-        return _create_component_from_model(model=declarative_component_model, config=config)
+        return self._create_component_from_model(model=declarative_component_model, config=config)
+
+    def _create_component_from_model(self, model: BaseModel, config: Config, **kwargs) -> Any:
+        if model.__class__ not in self.PYDANTIC_MODEL_TO_CONSTRUCTOR:
+            raise ValueError(f"{model.__class__} with attributes {model} is not a valid component type")
+        component_constructor = self.PYDANTIC_MODEL_TO_CONSTRUCTOR.get(model.__class__)
+        return component_constructor(model=model, config=config, **kwargs)
 
+    @staticmethod
+    def create_added_field_definition(model: AddedFieldDefinitionModel, config: Config, **kwargs) -> AddedFieldDefinition:
+        interpolated_value = InterpolatedString.create(model.value, options=model.options)
+        return AddedFieldDefinition(path=model.path, value=interpolated_value, options=model.options)
 
-def _create_component_from_model(model: BaseModel, config: Config, **kwargs) -> Any:
-    if model.__class__ not in PYDANTIC_MODEL_TO_CONSTRUCTOR:
-        raise ValueError(f"{model.__class__} with attributes {model} is not a valid component type")
-    component_constructor = PYDANTIC_MODEL_TO_CONSTRUCTOR.get(model.__class__)
-    return component_constructor(model=model, config=config, **kwargs)
+    def create_add_fields(self, model: AddFieldsModel, config: Config, **kwargs) -> AddFields:
+        added_field_definitions = [
+            self._create_component_from_model(model=added_field_definition_model, config=config)
+            for added_field_definition_model in model.fields
+        ]
+        return AddFields(fields=added_field_definitions, options=model.options)
 
+    @staticmethod
+    def create_api_key_authenticator(model: ApiKeyAuthenticatorModel, config: Config, **kwargs) -> ApiKeyAuthenticator:
+        return ApiKeyAuthenticator(api_token=model.api_token, header=model.header, config=config, options=model.options)
 
-def create_added_field_definition(model: AddedFieldDefinitionModel, config: Config, **kwargs) -> AddedFieldDefinition:
-    interpolated_value = InterpolatedString.create(model.value, options=model.options)
-    return AddedFieldDefinition(path=model.path, value=interpolated_value, options=model.options)
+    @staticmethod
+    def create_basic_http_authenticator(model: BasicHttpAuthenticatorModel, config: Config, **kwargs) -> BasicHttpAuthenticator:
+        return BasicHttpAuthenticator(password=model.password, username=model.username, config=config, options=model.options)
+
+    @staticmethod
+    def create_bearer_authenticator(model: BearerAuthenticatorModel, config: Config, **kwargs) -> BearerAuthenticator:
+        return BearerAuthenticator(
+            api_token=model.api_token,
+            config=config,
+            options=model.options,
+        )
 
+    def create_cartesian_product_slicer(
+        self, model: CartesianProductStreamSlicerModel, config: Config, **kwargs
+    ) -> CartesianProductStreamSlicer:
+        stream_slicers = [
+            self._create_component_from_model(model=stream_slicer_model, config=config) for stream_slicer_model in model.stream_slicers
+        ]
+        return CartesianProductStreamSlicer(stream_slicers=stream_slicers, options=model.options)
 
-def create_add_fields(model: AddFieldsModel, config: Config, **kwargs) -> AddFields:
-    added_field_definitions = [
-        _create_component_from_model(model=added_field_definition_model, config=config) for added_field_definition_model in model.fields
-    ]
-    return AddFields(fields=added_field_definitions, options=model.options)
+    @staticmethod
+    def create_check_stream(model: CheckStreamModel, config: Config, **kwargs):
+        return CheckStream(stream_names=model.stream_names, options={})
 
+    def create_composite_error_handler(self, model: CompositeErrorHandlerModel, config: Config, **kwargs) -> CompositeErrorHandler:
+        error_handlers = [
+            self._create_component_from_model(model=error_handler_model, config=config) for error_handler_model in model.error_handlers
+        ]
+        return CompositeErrorHandler(error_handlers=error_handlers, options=model.options)
 
-def create_api_key_authenticator(model: ApiKeyAuthenticatorModel, config: Config, **kwargs) -> ApiKeyAuthenticator:
-    return ApiKeyAuthenticator(api_token=model.api_token, header=model.header, config=config, options=model.options)
+    @staticmethod
+    def create_constant_backoff_strategy(model: ConstantBackoffStrategyModel, config: Config, **kwargs) -> ConstantBackoffStrategy:
+        return ConstantBackoffStrategy(
+            backoff_time_in_seconds=model.backoff_time_in_seconds,
+            config=config,
+            options=model.options,
+        )
 
+    def create_cursor_pagination(self, model: CursorPaginationModel, config: Config, **kwargs) -> CursorPaginationStrategy:
+        if model.decoder:
+            decoder = self._create_component_from_model(model=model.decoder, config=config)
+        else:
+            decoder = JsonDecoder(options=model.options)
+
+        return CursorPaginationStrategy(
+            cursor_value=model.cursor_value,
+            decoder=decoder,
+            page_size=model.page_size,
+            stop_condition=model.stop_condition,
+            config=config,
+            options=model.options,
+        )
 
-def create_basic_http_authenticator(model: BasicHttpAuthenticatorModel, config: Config, **kwargs) -> BasicHttpAuthenticator:
-    return BasicHttpAuthenticator(password=model.password, username=model.username, config=config, options=model.options)
+    def create_custom_component(self, model, config: Config, **kwargs) -> type:
+        """
+        Generically creates a custom component based on the model type and a class_name reference to the custom Python class being
+        instantiated. Only the model's additional properties that match the custom class definition are passed to the constructor
+        :param model: The Pydantic model of the custom component being created
+        :param config: The custom defined connector config
+        :return: The declarative component built from the Pydantic model to be used at runtime
+        """
+        custom_component_class = self._get_class_from_fully_qualified_class_name(model.class_name)
+        component_fields = get_type_hints(custom_component_class)
+        model_args = model.dict()
+        model_args["config"] = config
+
+        # Pydantic is unable to parse a custom component's fields that are subcomponents into models because their fields and types are not
+        # defined in the schema. The fields and types are defined within the Python class implementation. Pydantic can only parse down to
+        # the custom component and this code performs a second parse to convert the sub-fields first into models, then declarative components
+        for model_field, model_value in model_args.items():
+            # If a custom component field doesn't have a type set, we try to use the type hints to infer the type
+            if isinstance(model_value, dict) and "type" not in model_value and model_field in component_fields:
+                derived_type = self._derive_component_type_from_type_hints(component_fields.get(model_field))
+                if derived_type:
+                    model_value["type"] = derived_type
+
+            if self._is_component(model_value):
+                model_args[model_field] = self._create_nested_component(model, model_field, model_value, config)
+            elif isinstance(model_value, list):
+                vals = []
+                for v in model_value:
+                    if isinstance(v, dict) and "type" not in v and model_field in component_fields:
+                        derived_type = self._derive_component_type_from_type_hints(component_fields.get(model_field))
+                        if derived_type:
+                            v["type"] = derived_type
+                    if self._is_component(v):
+                        vals.append(self._create_nested_component(model, model_field, v, config))
+                    else:
+                        vals.append(v)
+                model_args[model_field] = vals
+
+        kwargs = {class_field: model_args[class_field] for class_field in component_fields.keys() if class_field in model_args}
+        return custom_component_class(**kwargs)
 
-def create_bearer_authenticator(model: BearerAuthenticatorModel, config: Config, **kwargs) -> BearerAuthenticator:
-    return BearerAuthenticator(
-        api_token=model.api_token,
-        config=config,
-        options=model.options,
-    )
+    @staticmethod
+    def _get_class_from_fully_qualified_class_name(class_name: str) -> type:
+        split = class_name.split(".")
+        module = ".".join(split[:-1])
+        class_name = split[-1]
+        return getattr(importlib.import_module(module), class_name)
 
+    @staticmethod
+    def _derive_component_type_from_type_hints(field_type: str) -> Optional[str]:
+        interface = field_type
+        while True:
+            origin = get_origin(interface)
+            if origin:
+                # Unnest types until we reach the raw type
+                # List[T] -> T
+                # Optional[List[T]] -> T
+                args = get_args(interface)
+                interface = args[0]
+            else:
+                break
+        if isinstance(interface, type) and not ModelToComponentFactory.is_builtin_type(interface):
+            return interface.__name__
+        return None
 
-def create_cartesian_product_slicer(model: CartesianProductStreamSlicerModel, config: Config, **kwargs) -> CartesianProductStreamSlicer:
-    stream_slicers = [
-        _create_component_from_model(model=stream_slicer_model, config=config) for stream_slicer_model in model.stream_slicers
-    ]
-    return CartesianProductStreamSlicer(stream_slicers=stream_slicers, options=model.options)
-
-
-def create_check_stream(model: CheckStreamModel, config: Config, **kwargs):
-    return CheckStream(stream_names=model.stream_names, options={})
-
-
-def create_composite_error_handler(model: CompositeErrorHandlerModel, config: Config, **kwargs) -> CompositeErrorHandler:
-    error_handlers = [
-        _create_component_from_model(model=error_handler_model, config=config) for error_handler_model in model.error_handlers
-    ]
-    return CompositeErrorHandler(error_handlers=error_handlers, options=model.options)
-
-
-def create_constant_backoff_strategy(model: ConstantBackoffStrategyModel, config: Config, **kwargs) -> ConstantBackoffStrategy:
-    return ConstantBackoffStrategy(
-        backoff_time_in_seconds=model.backoff_time_in_seconds,
-        config=config,
-        options=model.options,
-    )
-
-
-def create_cursor_pagination(model: CursorPaginationModel, config: Config, **kwargs) -> CursorPaginationStrategy:
-    if model.decoder:
-        decoder = _create_component_from_model(model=model.decoder, config=config)
-    else:
-        decoder = JsonDecoder(options=model.options)
-
-    return CursorPaginationStrategy(
-        cursor_value=model.cursor_value,
-        decoder=decoder,
-        page_size=model.page_size,
-        stop_condition=model.stop_condition,
-        config=config,
-        options=model.options,
-    )
-
-
-def create_custom_component(model, config: Config, **kwargs) -> type:
-    """
-    Generically creates a custom component based on the model type and a class_name reference to the custom Python class being
-    instantiated. Only the model's additional properties that match the custom class definition are passed to the constructor
-    :param model: The Pydantic model of the custom component being created
-    :param config: The custom defined connector config
-    :return: The declarative component built from the Pydantic model to be used at runtime
-    """
-
-    custom_component_class = _get_class_from_fully_qualified_class_name(model.class_name)
-    component_fields = get_type_hints(custom_component_class)
-    model_args = model.dict()
-    model_args["config"] = config
-
-    # Pydantic is unable to parse a custom component's fields that are subcomponents into models because their fields and types are not
-    # defined in the schema. The fields and types are defined within the Python class implementation. Pydantic can only parse down to
-    # the custom component and this code performs a second parse to convert the sub-fields first into models, then declarative components
-    for model_field, model_value in model_args.items():
-        # If a custom component field doesn't have a type set, we try to use the type hints to infer the type
-        if isinstance(model_value, dict) and "type" not in model_value and model_field in component_fields:
-            derived_type = _derive_component_type_from_type_hints(component_fields.get(model_field))
-            if derived_type:
-                model_value["type"] = derived_type
-
-        if _is_component(model_value):
-            model_args[model_field] = _create_nested_component(model, model_field, model_value, config)
-        elif isinstance(model_value, list):
-            vals = []
-            for v in model_value:
-                if isinstance(v, dict) and "type" not in v and model_field in component_fields:
-                    derived_type = _derive_component_type_from_type_hints(component_fields.get(model_field))
-                    if derived_type:
-                        v["type"] = derived_type
-                if _is_component(v):
-                    vals.append(_create_nested_component(model, model_field, v, config))
-                else:
-                    vals.append(v)
-            model_args[model_field] = vals
-
-    kwargs = {class_field: model_args[class_field] for class_field in component_fields.keys() if class_field in model_args}
-    return custom_component_class(**kwargs)
-
-
-def _get_class_from_fully_qualified_class_name(class_name: str) -> type:
-    split = class_name.split(".")
-    module = ".".join(split[:-1])
-    class_name = split[-1]
-    return getattr(importlib.import_module(module), class_name)
-
-
-def _derive_component_type_from_type_hints(field_type: str) -> Optional[str]:
-    interface = field_type
-    while True:
-        origin = get_origin(interface)
-        if origin:
-            # Unnest types until we reach the raw type
-            # List[T] -> T
-            # Optional[List[T]] -> T
-            args = get_args(interface)
-            interface = args[0]
+    @staticmethod
+    def is_builtin_type(cls) -> bool:
+        if not cls:
+            return False
+        return cls.__module__ == "builtins"
+
+    def _create_nested_component(self, model, model_field: str, model_value: Any, config: Config) -> Any:
+        type_name = model_value.get("type", None)
+        if not type_name:
+            # If no type is specified, we can assume this is a dictionary object which can be returned instead of a subcomponent
+            return model_value
+
+        model_type = self.TYPE_NAME_TO_MODEL.get(type_name, None)
+        if model_type:
+            parsed_model = model_type.parse_obj(model_value)
+            return self._create_component_from_model(model=parsed_model, config=config)
         else:
-            break
-    if isinstance(interface, type) and not is_builtin_type(interface):
-        return interface.__name__
-    return None
-
-
-def is_builtin_type(cls) -> bool:
-    if not cls:
-        return False
-    return cls.__module__ == "builtins"
-
-
-def _create_nested_component(model, model_field: str, model_value: Any, config: Config) -> Any:
-    type_name = model_value.get("type", None)
-    if not type_name:
-        # If no type is specified, we can assume this is a dictionary object which can be returned instead of a subcomponent
-        return model_value
-
-    model_type = TYPE_NAME_TO_MODEL.get(type_name, None)
-    if model_type:
-        parsed_model = model_type.parse_obj(model_value)
-        return _create_component_from_model(model=parsed_model, config=config)
-    else:
-        raise ValueError(
-            f"Error creating custom component {model.class_name}. Subcomponent creation has not been implemented for '{type_name}'"
+            raise ValueError(
+                f"Error creating custom component {model.class_name}. Subcomponent creation has not been implemented for '{type_name}'"
+            )
+
+    @staticmethod
+    def _is_component(model_value: Any) -> bool:
+        return isinstance(model_value, dict) and model_value.get("type")
+
+    def create_datetime_stream_slicer(self, model: DatetimeStreamSlicerModel, config: Config, **kwargs) -> DatetimeStreamSlicer:
+        start_datetime = (
+            model.start_datetime if isinstance(model.start_datetime, str) else self.create_min_max_datetime(model.start_datetime, config)
+        )
+        end_datetime = (
+            model.end_datetime if isinstance(model.end_datetime, str) else self.create_min_max_datetime(model.end_datetime, config)
         )
+        end_time_option = (
+            RequestOption(
+                inject_into=RequestOptionType(model.end_time_option.inject_into.value),
+                field_name=model.end_time_option.field_name,
+                options=model.options,
+            )
+            if model.end_time_option
+            else None
+        )
+        start_time_option = (
+            RequestOption(
+                inject_into=RequestOptionType(model.start_time_option.inject_into.value),
+                field_name=model.start_time_option.field_name,
+                options=model.options,
+            )
+            if model.start_time_option
+            else None
+        )
 
-def _is_component(model_value: Any) -> bool:
-    return isinstance(model_value, dict) and model_value.get("type")
+        return DatetimeStreamSlicer(
+            cursor_field=model.cursor_field,
+            cursor_granularity=model.cursor_granularity,
+            datetime_format=model.datetime_format,
+            end_datetime=end_datetime,
+            start_datetime=start_datetime,
+            step=model.step,
+            end_time_option=end_time_option,
+            lookback_window=model.lookback_window,
+            start_time_option=start_time_option,
+            stream_state_field_end=model.stream_state_field_end,
+            stream_state_field_start=model.stream_state_field_start,
+            config=config,
+            options=model.options,
+        )
+
+    def create_declarative_stream(self, model: DeclarativeStreamModel, config: Config, **kwargs) -> DeclarativeStream:
+        retriever = self._create_component_from_model(model=model.retriever, config=config)
+
+        if model.schema_loader:
+            schema_loader = self._create_component_from_model(model=model.schema_loader, config=config)
+        else:
+            options = model.options or {}
+            if "name" not in options:
+                options["name"] = model.name
+            schema_loader = DefaultSchemaLoader(config=config, options=options)
+
+        transformations = []
+        if model.transformations:
+            for transformation_model in model.transformations:
+                transformations.append(self._create_component_from_model(model=transformation_model, config=config))
+        return DeclarativeStream(
+            checkpoint_interval=model.checkpoint_interval,
+            name=model.name,
+            primary_key=model.primary_key,
+            retriever=retriever,
+            schema_loader=schema_loader,
+            stream_cursor_field=model.stream_cursor_field or [],
+            transformations=transformations,
+            config=config,
+            options={},
+        )
 
+    def create_default_error_handler(self, model: DefaultErrorHandlerModel, config: Config, **kwargs) -> DefaultErrorHandler:
+        backoff_strategies = []
+        if model.backoff_strategies:
+            for backoff_strategy_model in model.backoff_strategies:
+                backoff_strategies.append(self._create_component_from_model(model=backoff_strategy_model, config=config))
+        else:
+            backoff_strategies.append(DEFAULT_BACKOFF_STRATEGY(config=config, options=model.options))
 
-def create_datetime_stream_slicer(model: DatetimeStreamSlicerModel, config: Config, **kwargs) -> DatetimeStreamSlicer:
-    start_datetime = (
-        model.start_datetime if isinstance(model.start_datetime, str) else create_min_max_datetime(model.start_datetime, config)
-    )
-    end_datetime = model.end_datetime if isinstance(model.end_datetime, str) else create_min_max_datetime(model.end_datetime, config)
+        response_filters = []
+        if model.response_filters:
+            for response_filter_model in model.response_filters:
+                response_filters.append(self._create_component_from_model(model=response_filter_model, config=config))
+        else:
+            response_filters.append(
+                HttpResponseFilter(
+                    ResponseAction.RETRY, http_codes=HttpResponseFilter.DEFAULT_RETRIABLE_ERRORS, config=config, options=model.options
+                )
+            )
+            response_filters.append(HttpResponseFilter(ResponseAction.IGNORE, config=config, options=model.options))
 
-    end_time_option = (
-        RequestOption(
-            inject_into=RequestOptionType(model.end_time_option.inject_into.value),
-            field_name=model.end_time_option.field_name,
+        return DefaultErrorHandler(
+            backoff_strategies=backoff_strategies,
+            max_retries=model.max_retries,
+            response_filters=response_filters,
+            config=config,
             options=model.options,
         )
-        if model.end_time_option
-        else None
-    )
-    start_time_option = (
-        RequestOption(
-            inject_into=RequestOptionType(model.start_time_option.inject_into.value),
-            field_name=model.start_time_option.field_name,
+
+    def create_default_paginator(self, model: DefaultPaginatorModel, config: Config, **kwargs) -> DefaultPaginator:
+        decoder = self._create_component_from_model(model=model.decoder, config=config) if model.decoder else JsonDecoder(options={})
+        page_size_option = (
+            self._create_component_from_model(model=model.page_size_option, config=config) if model.page_size_option else None
+        )
+        page_token_option = (
+            self._create_component_from_model(model=model.page_token_option, config=config) if model.page_token_option else None
+        )
+        pagination_strategy = self._create_component_from_model(model=model.pagination_strategy, config=config)
+
+        paginator = DefaultPaginator(
+            decoder=decoder,
+            page_size_option=page_size_option,
+            page_token_option=page_token_option,
+            pagination_strategy=pagination_strategy,
+            url_base=model.url_base,
+            config=config,
             options=model.options,
         )
-        if model.start_time_option
-        else None
-    )
-
-    return DatetimeStreamSlicer(
-        cursor_field=model.cursor_field,
-        cursor_granularity=model.cursor_granularity,
-        datetime_format=model.datetime_format,
-        end_datetime=end_datetime,
-        start_datetime=start_datetime,
-        step=model.step,
-        end_time_option=end_time_option,
-        lookback_window=model.lookback_window,
-        start_time_option=start_time_option,
-        stream_state_field_end=model.stream_state_field_end,
-        stream_state_field_start=model.stream_state_field_start,
-        config=config,
-        options=model.options,
-    )
-
-
-def create_declarative_stream(model: DeclarativeStreamModel, config: Config, **kwargs) -> DeclarativeStream:
-    retriever = _create_component_from_model(model=model.retriever, config=config)
-
-    if model.schema_loader:
-        schema_loader = _create_component_from_model(model=model.schema_loader, config=config)
-    else:
-        options = model.options or {}
-        if "name" not in options:
-            options["name"] = model.name
-        schema_loader = DefaultSchemaLoader(config=config, options=options)
-
-    transformations = []
-    if model.transformations:
-        for transformation_model in model.transformations:
-            transformations.append(_create_component_from_model(model=transformation_model, config=config))
-    return DeclarativeStream(
-        checkpoint_interval=model.checkpoint_interval,
-        name=model.name,
-        primary_key=model.primary_key,
-        retriever=retriever,
-        schema_loader=schema_loader,
-        stream_cursor_field=model.stream_cursor_field or [],
-        transformations=transformations,
-        config=config,
-        options={},
-    )
-
-
-def create_default_error_handler(model: DefaultErrorHandlerModel, config: Config, **kwargs) -> DefaultErrorHandler:
-    backoff_strategies = []
-    if model.backoff_strategies:
-        for backoff_strategy_model in model.backoff_strategies:
-            backoff_strategies.append(_create_component_from_model(model=backoff_strategy_model, config=config))
-    else:
-        backoff_strategies.append(DEFAULT_BACKOFF_STRATEGY(config=config, options=model.options))
-
-    response_filters = []
-    if model.response_filters:
-        for response_filter_model in model.response_filters:
-            response_filters.append(_create_component_from_model(model=response_filter_model, config=config))
-    else:
-        response_filters.append(
-            HttpResponseFilter(
-                ResponseAction.RETRY, http_codes=HttpResponseFilter.DEFAULT_RETRIABLE_ERRORS, config=config, options=model.options
+        if self._limit_pages_fetched_per_slice:
+            return PaginatorTestReadDecorator(paginator, self._limit_pages_fetched_per_slice)
+        return paginator
+
+    def create_dpath_extractor(self, model: DpathExtractorModel, config: Config, **kwargs) -> DpathExtractor:
+        decoder = self._create_component_from_model(model.decoder, config=config) if model.decoder else JsonDecoder(options={})
+        return DpathExtractor(decoder=decoder, field_pointer=model.field_pointer, config=config, options=model.options)
+
+    @staticmethod
+    def create_exponential_backoff_strategy(model: ExponentialBackoffStrategyModel, config: Config) -> ExponentialBackoffStrategy:
+        return ExponentialBackoffStrategy(factor=model.factor, options=model.options, config=config)
+
+    def create_http_requester(self, model: HttpRequesterModel, config: Config, **kwargs) -> HttpRequester:
+        authenticator = self._create_component_from_model(model=model.authenticator, config=config) if model.authenticator else None
+        error_handler = (
+            self._create_component_from_model(model=model.error_handler, config=config)
+            if model.error_handler
+            else DefaultErrorHandler(backoff_strategies=[], response_filters=[], config=config, options=model.options)
+        )
+        request_options_provider = (
+            self._create_component_from_model(model=model.request_options_provider, config=config)
+            if model.request_options_provider
+            else None
+        )
+
+        return HttpRequester(
+            name=model.name,
+            url_base=model.url_base,
+            path=model.path,
+            authenticator=authenticator,
+            error_handler=error_handler,
+            http_method=model.http_method,
+            request_options_provider=request_options_provider,
+            config=config,
+            options=model.options,
+        )
+
+    @staticmethod
+    def create_http_response_filter(model: HttpResponseFilterModel, config: Config, **kwargs) -> HttpResponseFilter:
+        action = ResponseAction(model.action.value)
+        http_codes = (
+            set(model.http_codes) if model.http_codes else set()
+        )  # JSON schema notation has no set data type. The schema enforces an array of unique elements
+
+        return HttpResponseFilter(
+            action=action,
+            error_message=model.error_message or "",
+            error_message_contains=model.error_message_contains,
+            http_codes=http_codes,
+            predicate=model.predicate or "",
+            config=config,
+            options=model.options,
+        )
+
+    @staticmethod
+    def create_inline_schema_loader(model: InlineSchemaLoaderModel, config: Config, **kwargs) -> InlineSchemaLoader:
+        return InlineSchemaLoader(schema=model.schema_, options={})
+
+    @staticmethod
+    def create_interpolated_request_options_provider(
+        model: InterpolatedRequestOptionsProviderModel, config: Config, **kwargs
+    ) -> InterpolatedRequestOptionsProvider:
+        return InterpolatedRequestOptionsProvider(
+            request_body_data=model.request_body_data,
+            request_body_json=model.request_body_json,
+            request_headers=model.request_headers,
+            request_parameters=model.request_parameters,
+            config=config,
+            options=model.options,
+        )
+
+    @staticmethod
+    def create_json_decoder(model: JsonDecoderModel, config: Config, **kwargs) -> JsonDecoder:
+        return JsonDecoder(options={})
+
+    @staticmethod
+    def create_json_file_schema_loader(model: JsonFileSchemaLoaderModel, config: Config, **kwargs) -> JsonFileSchemaLoader:
+        return JsonFileSchemaLoader(file_path=model.file_path, config=config, options=model.options)
+
+    @staticmethod
+    def create_list_stream_slicer(model: ListStreamSlicerModel, config: Config, **kwargs) -> ListStreamSlicer:
+        request_option = (
+            RequestOption(
+                inject_into=RequestOptionType(model.request_option.inject_into.value),
+                field_name=model.request_option.field_name,
+                options=model.options,
             )
+            if model.request_option
+            else None
         )
-        response_filters.append(HttpResponseFilter(ResponseAction.IGNORE, config=config, options=model.options))
-
-    return DefaultErrorHandler(
-        backoff_strategies=backoff_strategies,
-        max_retries=model.max_retries,
-        response_filters=response_filters,
-        config=config,
-        options=model.options,
-    )
-
-
-def create_default_paginator(model: DefaultPaginatorModel, config: Config, **kwargs) -> DefaultPaginator:
-    decoder = _create_component_from_model(model=model.decoder, config=config) if model.decoder else JsonDecoder(options={})
-    page_size_option = _create_component_from_model(model=model.page_size_option, config=config) if model.page_size_option else None
-    page_token_option = _create_component_from_model(model=model.page_token_option, config=config) if model.page_token_option else None
-    pagination_strategy = _create_component_from_model(model=model.pagination_strategy, config=config)
-
-    return DefaultPaginator(
-        decoder=decoder,
-        page_size_option=page_size_option,
-        page_token_option=page_token_option,
-        pagination_strategy=pagination_strategy,
-        url_base=model.url_base,
-        config=config,
-        options=model.options,
-    )
-
-
-def create_dpath_extractor(model: DpathExtractorModel, config: Config, **kwargs) -> DpathExtractor:
-    decoder = _create_component_from_model(model.decoder, config=config) if model.decoder else JsonDecoder(options={})
-    return DpathExtractor(decoder=decoder, field_pointer=model.field_pointer, config=config, options=model.options)
-
-
-def create_exponential_backoff_strategy(model: ExponentialBackoffStrategyModel, config: Config) -> ExponentialBackoffStrategy:
-    return ExponentialBackoffStrategy(factor=model.factor, options=model.options, config=config)
-
-
-def create_http_requester(model: HttpRequesterModel, config: Config, **kwargs) -> HttpRequester:
-    authenticator = _create_component_from_model(model=model.authenticator, config=config) if model.authenticator else None
-    error_handler = (
-        _create_component_from_model(model=model.error_handler, config=config)
-        if model.error_handler
-        else DefaultErrorHandler(backoff_strategies=[], response_filters=[], config=config, options=model.options)
-    )
-    request_options_provider = (
-        _create_component_from_model(model=model.request_options_provider, config=config) if model.request_options_provider else None
-    )
-
-    return HttpRequester(
-        name=model.name,
-        url_base=model.url_base,
-        path=model.path,
-        authenticator=authenticator,
-        error_handler=error_handler,
-        http_method=model.http_method,
-        request_options_provider=request_options_provider,
-        config=config,
-        options=model.options,
-    )
-
-
-def create_http_response_filter(model: HttpResponseFilterModel, config: Config, **kwargs) -> HttpResponseFilter:
-    action = ResponseAction(model.action.value)
-    http_codes = (
-        set(model.http_codes) if model.http_codes else set()
-    )  # JSON schema notation has no set data type. The schema enforces an array of unique elements
-
-    return HttpResponseFilter(
-        action=action,
-        error_message=model.error_message or "",
-        error_message_contains=model.error_message_contains,
-        http_codes=http_codes,
-        predicate=model.predicate or "",
-        config=config,
-        options=model.options,
-    )
-
-
-def create_inline_schema_loader(model: InlineSchemaLoaderModel, config: Config, **kwargs) -> InlineSchemaLoader:
-    return InlineSchemaLoader(schema=model.schema_, options={})
-
-
-def create_interpolated_request_options_provider(
-    model: InterpolatedRequestOptionsProviderModel, config: Config, **kwargs
-) -> InterpolatedRequestOptionsProvider:
-    return InterpolatedRequestOptionsProvider(
-        request_body_data=model.request_body_data,
-        request_body_json=model.request_body_json,
-        request_headers=model.request_headers,
-        request_parameters=model.request_parameters,
-        config=config,
-        options=model.options,
-    )
-
-
-def create_json_decoder(model: JsonDecoderModel, config: Config, **kwargs) -> JsonDecoder:
-    return JsonDecoder(options={})
-
-
-def create_json_file_schema_loader(model: JsonFileSchemaLoaderModel, config: Config, **kwargs) -> JsonFileSchemaLoader:
-    return JsonFileSchemaLoader(file_path=model.file_path, config=config, options=model.options)
-
-
-def create_list_stream_slicer(model: ListStreamSlicerModel, config: Config, **kwargs) -> ListStreamSlicer:
-    request_option = (
-        RequestOption(
-            inject_into=RequestOptionType(model.request_option.inject_into.value),
-            field_name=model.request_option.field_name,
+        return ListStreamSlicer(
+            cursor_field=model.cursor_field,
+            request_option=request_option,
+            slice_values=model.slice_values,
+            config=config,
+            options=model.options,
+        )
+
+    @staticmethod
+    def create_min_max_datetime(model: MinMaxDatetimeModel, config: Config, **kwargs) -> MinMaxDatetime:
+        return MinMaxDatetime(
+            datetime=model.datetime,
+            datetime_format=model.datetime_format,
+            max_datetime=model.max_datetime,
+            min_datetime=model.min_datetime,
+            options=model.options,
+        )
+
+    @staticmethod
+    def create_no_auth(model: NoAuthModel, config: Config, **kwargs) -> NoAuth:
+        return NoAuth(options=model.options)
+
+    @staticmethod
+    def create_no_pagination(model: NoPaginationModel, config: Config, **kwargs) -> NoPagination:
+        return NoPagination(options={})
+
+    @staticmethod
+    def create_oauth_authenticator(model: OAuthAuthenticatorModel, config: Config, **kwargs) -> DeclarativeOauth2Authenticator:
+        return DeclarativeOauth2Authenticator(
+            access_token_name=model.access_token_name,
+            client_id=model.client_id,
+            client_secret=model.client_secret,
+            expires_in_name=model.expires_in_name,
+            grant_type=model.grant_type,
+            refresh_request_body=model.refresh_request_body,
+            refresh_token=model.refresh_token,
+            scopes=model.scopes,
+            token_expiry_date=model.token_expiry_date,
+            token_expiry_date_format=model.token_expiry_date_format,
+            token_refresh_endpoint=model.token_refresh_endpoint,
+            config=config,
+            options=model.options,
+        )
+
+    @staticmethod
+    def create_offset_increment(model: OffsetIncrementModel, config: Config, **kwargs) -> OffsetIncrement:
+        return OffsetIncrement(page_size=model.page_size, config=config, options=model.options)
+
+    @staticmethod
+    def create_page_increment(model: PageIncrementModel, config: Config, **kwargs) -> PageIncrement:
+        return PageIncrement(page_size=model.page_size, start_from_page=model.start_from_page, options=model.options)
+
+    def create_parent_stream_config(self, model: ParentStreamConfigModel, config: Config, **kwargs) -> ParentStreamConfig:
+        declarative_stream = self._create_component_from_model(model.stream, config=config)
+        request_option = self._create_component_from_model(model.request_option, config=config) if model.request_option else None
+        return ParentStreamConfig(
+            parent_key=model.parent_key,
+            request_option=request_option,
+            stream=declarative_stream,
+            stream_slice_field=model.stream_slice_field,
             options=model.options,
         )
-        if model.request_option
-        else None
-    )
-    return ListStreamSlicer(
-        cursor_field=model.cursor_field,
-        request_option=request_option,
-        slice_values=model.slice_values,
-        config=config,
-        options=model.options,
-    )
-
-
-def create_min_max_datetime(model: MinMaxDatetimeModel, config: Config, **kwargs) -> MinMaxDatetime:
-    return MinMaxDatetime(
-        datetime=model.datetime,
-        datetime_format=model.datetime_format,
-        max_datetime=model.max_datetime,
-        min_datetime=model.min_datetime,
-        options=model.options,
-    )
-
-
-def create_no_auth(model: NoAuthModel, config: Config, **kwargs) -> NoAuth:
-    return NoAuth(options=model.options)
-
-
-def create_no_pagination(model: NoPaginationModel, config: Config, **kwargs) -> NoPagination:
-    return NoPagination(options={})
-
-
-def create_oauth_authenticator(model: OAuthAuthenticatorModel, config: Config, **kwargs) -> DeclarativeOauth2Authenticator:
-    return DeclarativeOauth2Authenticator(
-        access_token_name=model.access_token_name,
-        client_id=model.client_id,
-        client_secret=model.client_secret,
-        expires_in_name=model.expires_in_name,
-        grant_type=model.grant_type,
-        refresh_request_body=model.refresh_request_body,
-        refresh_token=model.refresh_token,
-        scopes=model.scopes,
-        token_expiry_date=model.token_expiry_date,
-        token_expiry_date_format=model.token_expiry_date_format,
-        token_refresh_endpoint=model.token_refresh_endpoint,
-        config=config,
-        options=model.options,
-    )
-
-
-def create_offset_increment(model: OffsetIncrementModel, config: Config, **kwargs) -> OffsetIncrement:
-    return OffsetIncrement(page_size=model.page_size, config=config, options=model.options)
-
-
-def create_page_increment(model: PageIncrementModel, config: Config, **kwargs) -> PageIncrement:
-    return PageIncrement(page_size=model.page_size, start_from_page=model.start_from_page, options=model.options)
-
-
-def create_parent_stream_config(model: ParentStreamConfigModel, config: Config, **kwargs) -> ParentStreamConfig:
-    declarative_stream = _create_component_from_model(model.stream, config=config)
-    request_option = _create_component_from_model(model.request_option, config=config) if model.request_option else None
-    return ParentStreamConfig(
-        parent_key=model.parent_key,
-        request_option=request_option,
-        stream=declarative_stream,
-        stream_slice_field=model.stream_slice_field,
-        options=model.options,
-    )
-
-
-def create_record_filter(model: RecordFilterModel, config: Config, **kwargs) -> RecordFilter:
-    return RecordFilter(condition=model.condition, config=config, options=model.options)
-
-
-def create_request_option(model: RequestOptionModel, config: Config, **kwargs) -> RequestOption:
-    inject_into = RequestOptionType(model.inject_into.value)
-    return RequestOption(field_name=model.field_name, inject_into=inject_into, options={})
-
-
-def create_record_selector(model: RecordSelectorModel, config: Config, **kwargs) -> RecordSelector:
-    extractor = _create_component_from_model(model=model.extractor, config=config)
-    record_filter = _create_component_from_model(model.record_filter, config=config) if model.record_filter else None
-
-    return RecordSelector(extractor=extractor, record_filter=record_filter, options=model.options)
-
-
-def create_remove_fields(model: RemoveFieldsModel, config: Config, **kwargs) -> RemoveFields:
-    return RemoveFields(field_pointers=model.field_pointers, options={})
-
-
-def create_session_token_authenticator(model: SessionTokenAuthenticatorModel, config: Config, **kwargs) -> SessionTokenAuthenticator:
-    return SessionTokenAuthenticator(
-        api_url=model.api_url,
-        header=model.header,
-        login_url=model.login_url,
-        password=model.password,
-        session_token=model.session_token,
-        session_token_response_key=model.session_token_response_key,
-        username=model.username,
-        validate_session_url=model.validate_session_url,
-        config=config,
-        options=model.options,
-    )
-
-
-def create_simple_retriever(model: SimpleRetrieverModel, config: Config, **kwargs) -> SimpleRetriever:
-    requester = _create_component_from_model(model=model.requester, config=config)
-    record_selector = _create_component_from_model(model=model.record_selector, config=config)
-    paginator = (
-        _create_component_from_model(model=model.paginator, config=config, url_base=model.requester.url_base)
-        if model.paginator
-        else NoPagination(options={})
-    )
-    stream_slicer = (
-        _create_component_from_model(model=model.stream_slicer, config=config) if model.stream_slicer else SingleSlice(options={})
-    )
-
-    return SimpleRetriever(
-        name=model.name,
-        paginator=paginator,
-        primary_key=model.primary_key.__root__ if model.primary_key else None,
-        requester=requester,
-        record_selector=record_selector,
-        stream_slicer=stream_slicer,
-        config=config,
-        options=model.options,
-    )
-
-
-def create_single_slice(model: SingleSliceModel, config: Config, **kwargs) -> SingleSlice:
-    return SingleSlice(options={})
-
-
-def create_spec(model: SpecModel, config: Config, **kwargs) -> Spec:
-    return Spec(connection_specification=model.connection_specification, documentation_url=model.documentation_url, options={})
-
-
-def create_substream_slicer(model: SubstreamSlicerModel, config: Config, **kwargs) -> SubstreamSlicer:
-    parent_stream_configs = []
-    if model.parent_stream_configs:
-        parent_stream_configs.extend(
-            [
-                _create_component_from_model(model=parent_stream_config, config=config)
-                for parent_stream_config in model.parent_stream_configs
-            ]
+
+    @staticmethod
+    def create_record_filter(model: RecordFilterModel, config: Config, **kwargs) -> RecordFilter:
+        return RecordFilter(condition=model.condition, config=config, options=model.options)
+
+    @staticmethod
+    def create_request_option(model: RequestOptionModel, config: Config, **kwargs) -> RequestOption:
+        inject_into = RequestOptionType(model.inject_into.value)
+        return RequestOption(field_name=model.field_name, inject_into=inject_into, options={})
+
+    def create_record_selector(self, model: RecordSelectorModel, config: Config, **kwargs) -> RecordSelector:
+        extractor = self._create_component_from_model(model=model.extractor, config=config)
+        record_filter = self._create_component_from_model(model.record_filter, config=config) if model.record_filter else None
+
+        return RecordSelector(extractor=extractor, record_filter=record_filter, options=model.options)
+
+    @staticmethod
+    def create_remove_fields(model: RemoveFieldsModel, config: Config, **kwargs) -> RemoveFields:
+        return RemoveFields(field_pointers=model.field_pointers, options={})
+
+    @staticmethod
+    def create_session_token_authenticator(model: SessionTokenAuthenticatorModel, config: Config, **kwargs) -> SessionTokenAuthenticator:
+        return SessionTokenAuthenticator(
+            api_url=model.api_url,
+            header=model.header,
+            login_url=model.login_url,
+            password=model.password,
+            session_token=model.session_token,
+            session_token_response_key=model.session_token_response_key,
+            username=model.username,
+            validate_session_url=model.validate_session_url,
+            config=config,
+            options=model.options,
         )
 
-    return SubstreamSlicer(parent_stream_configs=parent_stream_configs, options=model.options)
-
-
-def create_wait_time_from_header(model: WaitTimeFromHeaderModel, config: Config, **kwargs) -> WaitTimeFromHeaderBackoffStrategy:
-    return WaitTimeFromHeaderBackoffStrategy(header=model.header, options=model.options, config=config, regex=model.regex)
-
-
-def create_wait_until_time_from_header(
-    model: WaitUntilTimeFromHeaderModel, config: Config, **kwargs
-) -> WaitUntilTimeFromHeaderBackoffStrategy:
-    return WaitUntilTimeFromHeaderBackoffStrategy(
-        header=model.header, options=model.options, config=config, min_wait=model.min_wait, regex=model.regex
-    )
-
-
-PYDANTIC_MODEL_TO_CONSTRUCTOR: [Type[BaseModel], Callable] = {
-    AddedFieldDefinitionModel: create_added_field_definition,
-    AddFieldsModel: create_add_fields,
-    ApiKeyAuthenticatorModel: create_api_key_authenticator,
-    BasicHttpAuthenticatorModel: create_basic_http_authenticator,
-    BearerAuthenticatorModel: create_bearer_authenticator,
-    CheckStreamModel: create_check_stream,
-    CartesianProductStreamSlicerModel: create_cartesian_product_slicer,
-    CompositeErrorHandlerModel: create_composite_error_handler,
-    ConstantBackoffStrategyModel: create_constant_backoff_strategy,
-    CursorPaginationModel: create_cursor_pagination,
-    CustomAuthenticatorModel: create_custom_component,
-    CustomBackoffStrategyModel: create_custom_component,
-    CustomErrorHandlerModel: create_custom_component,
-    CustomRecordExtractorModel: create_custom_component,
-    CustomRequesterModel: create_custom_component,
-    CustomRequestOptionsProviderModel: create_custom_component,  # todo: Remove later when we deprecate request providers from interface
-    CustomRetrieverModel: create_custom_component,
-    CustomPaginationStrategyModel: create_custom_component,
-    CustomStreamSlicerModel: create_custom_component,
-    CustomTransformationModel: create_custom_component,
-    DatetimeStreamSlicerModel: create_datetime_stream_slicer,
-    DeclarativeStreamModel: create_declarative_stream,
-    DefaultErrorHandlerModel: create_default_error_handler,
-    DefaultPaginatorModel: create_default_paginator,
-    DpathExtractorModel: create_dpath_extractor,
-    ExponentialBackoffStrategyModel: create_exponential_backoff_strategy,
-    HttpRequesterModel: create_http_requester,
-    HttpResponseFilterModel: create_http_response_filter,
-    InlineSchemaLoaderModel: create_inline_schema_loader,
-    InterpolatedRequestOptionsProviderModel: create_interpolated_request_options_provider,
-    JsonDecoderModel: create_json_decoder,
-    JsonFileSchemaLoaderModel: create_json_file_schema_loader,
-    ListStreamSlicerModel: create_list_stream_slicer,
-    MinMaxDatetimeModel: create_min_max_datetime,
-    NoAuthModel: create_no_auth,
-    NoPaginationModel: create_no_pagination,
-    OAuthAuthenticatorModel: create_oauth_authenticator,
-    OffsetIncrementModel: create_offset_increment,
-    PageIncrementModel: create_page_increment,
-    ParentStreamConfigModel: create_parent_stream_config,
-    RecordFilterModel: create_record_filter,
-    RecordSelectorModel: create_record_selector,
-    RemoveFieldsModel: create_remove_fields,
-    RequestOptionModel: create_request_option,
-    SessionTokenAuthenticatorModel: create_session_token_authenticator,
-    SimpleRetrieverModel: create_simple_retriever,
-    SingleSliceModel: create_single_slice,
-    SpecModel: create_spec,
-    SubstreamSlicerModel: create_substream_slicer,
-    WaitTimeFromHeaderModel: create_wait_time_from_header,
-    WaitUntilTimeFromHeaderModel: create_wait_until_time_from_header,
-}
-
-
-# Needed for the case where we need to perform a second parse on the fields of a custom component
-TYPE_NAME_TO_MODEL = {cls.__name__: cls for cls in PYDANTIC_MODEL_TO_CONSTRUCTOR}
+    def create_simple_retriever(self, model: SimpleRetrieverModel, config: Config, **kwargs) -> SimpleRetriever:
+        requester = self._create_component_from_model(model=model.requester, config=config)
+        record_selector = self._create_component_from_model(model=model.record_selector, config=config)
+        paginator = (
+            self._create_component_from_model(model=model.paginator, config=config, url_base=model.requester.url_base)
+            if model.paginator
+            else NoPagination(options={})
+        )
+        stream_slicer = (
+            self._create_component_from_model(model=model.stream_slicer, config=config) if model.stream_slicer else SingleSlice(options={})
+        )
+
+        if self._limit_slices_fetched:
+            return SimpleRetrieverTestReadDecorator(
+                name=model.name,
+                paginator=paginator,
+                primary_key=model.primary_key.__root__ if model.primary_key else None,
+                requester=requester,
+                record_selector=record_selector,
+                stream_slicer=stream_slicer,
+                config=config,
+                maximum_number_of_slices=self._limit_slices_fetched,
+                options=model.options,
+            )
+        return SimpleRetriever(
+            name=model.name,
+            paginator=paginator,
+            primary_key=model.primary_key.__root__ if model.primary_key else None,
+            requester=requester,
+            record_selector=record_selector,
+            stream_slicer=stream_slicer,
+            config=config,
+            options=model.options,
+        )
+
+    @staticmethod
+    def create_single_slice(model: SingleSliceModel, config: Config, **kwargs) -> SingleSlice:
+        return SingleSlice(options={})
+
+    @staticmethod
+    def create_spec(model: SpecModel, config: Config, **kwargs) -> Spec:
+        return Spec(connection_specification=model.connection_specification, documentation_url=model.documentation_url, options={})
+
+    def create_substream_slicer(self, model: SubstreamSlicerModel, config: Config, **kwargs) -> SubstreamSlicer:
+        parent_stream_configs = []
+        if model.parent_stream_configs:
+            parent_stream_configs.extend(
+                [
+                    self._create_component_from_model(model=parent_stream_config, config=config)
+                    for parent_stream_config in model.parent_stream_configs
+                ]
+            )
+
+        return SubstreamSlicer(parent_stream_configs=parent_stream_configs, options=model.options)
+
+    @staticmethod
+    def create_wait_time_from_header(model: WaitTimeFromHeaderModel, config: Config, **kwargs) -> WaitTimeFromHeaderBackoffStrategy:
+        return WaitTimeFromHeaderBackoffStrategy(header=model.header, options=model.options, config=config, regex=model.regex)
+
+    @staticmethod
+    def create_wait_until_time_from_header(
+        model: WaitUntilTimeFromHeaderModel, config: Config, **kwargs
+    ) -> WaitUntilTimeFromHeaderBackoffStrategy:
+        return WaitUntilTimeFromHeaderBackoffStrategy(
+            header=model.header, options=model.options, config=config, min_wait=model.min_wait, regex=model.regex
+        )
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/__init__.py
index c8bd6fd13ffd..cd569f52abcc 100644
--- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/__init__.py
+++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/__init__.py
@@ -2,9 +2,9 @@
 # Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #
 
-from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import DefaultPaginator
+from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import DefaultPaginator, PaginatorTestReadDecorator
 from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination
 from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
 from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import PaginationStrategy
 
-__all__ = ["DefaultPaginator", "NoPagination", "PaginationStrategy", "Paginator"]
+__all__ = ["DefaultPaginator", "NoPagination", "PaginationStrategy", "Paginator", "PaginatorTestReadDecorator"]
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py
index 2ef4fcd9d672..ed87e6a07087 100644
--- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py
+++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py
@@ -160,3 +160,69 @@ def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, A
         if option_type != RequestOptionType.path:
             options[self.page_size_option.field_name] = self.pagination_strategy.get_page_size()
         return options
+
+
+class PaginatorTestReadDecorator(Paginator):
+    """
+    In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of
+    pages that are queried throughout a read command.
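+
+    The page count is restored by reset(), so when the retriever resets the paginator between slices the cap applies to each slice
+    individually; ModelToComponentFactory surfaces it as limit_pages_fetched_per_slice.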
+ """ + + _PAGE_COUNT_BEFORE_FIRST_NEXT_CALL = 1 + + def __init__(self, decorated, maximum_number_of_pages: int = 5): + if maximum_number_of_pages and maximum_number_of_pages < 1: + raise ValueError(f"The maximum number of pages on a test read needs to be strictly positive. Got {maximum_number_of_pages}") + self._maximum_number_of_pages = maximum_number_of_pages + self._decorated = decorated + self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL + + def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]: + if self._page_count >= self._maximum_number_of_pages: + return None + + self._page_count += 1 + return self._decorated.next_page_token(response, last_records) + + def path(self): + return self._decorated.path() + + def get_request_params( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return self._decorated.get_request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) + + def get_request_headers( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, str]: + return self._decorated.get_request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) + + def get_request_body_data( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return self._decorated.get_request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) + + def get_request_body_json( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return self._decorated.get_request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) + + def reset(self): + self._decorated.reset() + self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py index bd96fc27b153..01d8d01e768b 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py @@ -2,7 +2,7 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from abc import abstractmethod +from abc import ABC, abstractmethod from dataclasses import dataclass from typing import Any, List, Mapping, Optional @@ -12,7 +12,7 @@ @dataclass -class Paginator(RequestOptionsProvider, JsonSchemaMixin): +class Paginator(ABC, RequestOptionsProvider, JsonSchemaMixin): """ Defines the token to use to fetch the next page of records from the API. 
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/__init__.py
index 9c47818b3e72..a160e9d27da7 100644
--- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/__init__.py
+++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/__init__.py
@@ -3,6 +3,6 @@
 #
 
 from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
-from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
+from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever, SimpleRetrieverTestReadDecorator
 
-__all__ = ["Retriever", "SimpleRetriever"]
+__all__ = ["Retriever", "SimpleRetriever", "SimpleRetrieverTestReadDecorator"]
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
index 65128a349483..049b4af70a47 100644
--- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
+++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
@@ -5,6 +5,7 @@
 import json
 import logging
 from dataclasses import InitVar, dataclass, field
+from itertools import islice
 from json import JSONDecodeError
 from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
 
@@ -416,6 +417,28 @@ def _parse_records_and_emit_request_and_responses(self, request, response, stream_slice, stream_state):
         yield from self.parse_response(response, stream_slice=stream_slice, stream_state=stream_state)
 
 
+@dataclass
+class SimpleRetrieverTestReadDecorator(SimpleRetriever):
+    """
+    In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of
+    slices that are queried throughout a read command.
+    """
+
+    maximum_number_of_slices: int = 5
+
+    def __post_init__(self, options: Mapping[str, Any]):
+        super().__post_init__(options)
+        if self.maximum_number_of_slices and self.maximum_number_of_slices < 1:
+            raise ValueError(
+                f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}"
+            )
+
+    def stream_slices(
+        self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Optional[StreamState] = None
+    ) -> Iterable[Optional[Mapping[str, Any]]]:
+        return islice(super().stream_slices(sync_mode=sync_mode, stream_state=stream_state), self.maximum_number_of_slices)
+
+
 def _prepared_request_to_airbyte_message(request: requests.PreparedRequest) -> AirbyteMessage:
     # FIXME: this should return some sort of trace message
     request_dict = {
diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py
index ccb663e991ba..8424ac5cb726 100644
--- a/airbyte-cdk/python/setup.py
+++ b/airbyte-cdk/python/setup.py
@@ -15,7 +15,7 @@
 
 setup(
     name="airbyte-cdk",
-    version="0.22.0",
+    version="0.23.0",
     description="A framework for writing Airbyte Connectors.",
     long_description=README,
     long_description_content_type="text/markdown",
diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py
index 8aec567318bb..a55726e99227 100644
--- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py
+++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py
@@ -9,7 +9,12 @@
 import requests
 from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
 from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
-from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import DefaultPaginator, RequestOption, RequestOptionType
+from airbyte_cdk.sources.declarative.requesters.paginators.default_paginator import (
+    DefaultPaginator,
+    PaginatorTestReadDecorator,
+    RequestOption,
+    RequestOptionType,
+)
 from airbyte_cdk.sources.declarative.requesters.paginators.strategies.cursor_pagination_strategy import CursorPaginationStrategy
 
 
@@ -202,3 +207,25 @@ def test_reset():
     strategy = MagicMock()
     DefaultPaginator(strategy, config, url_base, options={}, page_size_option=page_size_request_option, page_token_option=page_token_request_option).reset()
     assert strategy.reset.called
+
+
+def test_limit_page_fetched():
+    maximum_number_of_pages = 5
+    number_of_next_performed = maximum_number_of_pages - 1
+    paginator = PaginatorTestReadDecorator(
+        DefaultPaginator(
+            page_size_option=MagicMock(),
+            page_token_option=MagicMock(),
+            pagination_strategy=MagicMock(),
+            config=MagicMock(),
+            url_base=MagicMock(),
+            options={},
+        ),
+        maximum_number_of_pages
+    )
+
+    for _ in range(number_of_next_performed):
+        last_token = paginator.next_page_token(MagicMock(), MagicMock())
+        assert last_token
+
+    assert not paginator.next_page_token(MagicMock(), MagicMock())
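# A minimal sketch (not part of the patch) of the slice-limiting technique used by
# SimpleRetrieverTestReadDecorator above: itertools.islice lazily truncates whatever the
# wrapped slicer yields, so slices past the cap are never generated at all. The
# "endless_slices" generator is a hypothetical stand-in for a real stream slicer.
from itertools import islice

def endless_slices():
    day = 0
    while True:  # could page forever, e.g. an open-ended date range
        day += 1
        yield {"date": f"2022-01-{day:02d}"}

maximum_number_of_slices = 4
truncated = list(islice(endless_slices(), maximum_number_of_slices))
assert [s["date"] for s in truncated] == ["2022-01-01", "2022-01-02", "2022-01-03", "2022-01-04"]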
diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py
index abb33bf3ca37..1a8db7a368bf 100644
--- a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py
+++ b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py
@@ -15,6 +15,7 @@
 from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
 from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
     SimpleRetriever,
+    SimpleRetrieverTestReadDecorator,
     _prepared_request_to_airbyte_message,
     _response_to_airbyte_message,
 )
@@ -629,3 +630,28 @@ def test_response_to_airbyte_message(test_name, response_body, response_headers, expected_airbyte_message):
     actual_airbyte_message = _response_to_airbyte_message(response)
 
     assert expected_airbyte_message == actual_airbyte_message
+
+
+def test_limit_stream_slices():
+    maximum_number_of_slices = 4
+    stream_slicer = MagicMock()
+    stream_slicer.stream_slices.return_value = _generate_slices(maximum_number_of_slices * 2)
+    retriever = SimpleRetrieverTestReadDecorator(
+        name="stream_name",
+        primary_key=primary_key,
+        requester=MagicMock(),
+        paginator=MagicMock(),
+        record_selector=MagicMock(),
+        stream_slicer=stream_slicer,
+        maximum_number_of_slices=maximum_number_of_slices,
+        options={},
+        config={},
+    )
+
+    truncated_slices = list(retriever.stream_slices(sync_mode=SyncMode.incremental, stream_state=None))
+
+    assert truncated_slices == _generate_slices(maximum_number_of_slices)
+
+
+def _generate_slices(number_of_slices):
+    return [{"date": f"2022-01-0{day + 1}"} for day in range(number_of_slices)]
diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_manifest_declarative_source.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_manifest_declarative_source.py
index 01faf5c25c44..7d22e1cb7674 100644
--- a/airbyte-cdk/python/unit_tests/sources/declarative/test_manifest_declarative_source.py
+++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_manifest_declarative_source.py
@@ -6,6 +6,7 @@
 import logging
 import os
 import sys
+from unittest.mock import patch
 
 import pytest
 import yaml
@@ -541,6 +542,94 @@ def test_manifest_without_at_least_one_stream(self, construct_using_pydantic_models):
         with pytest.raises(ValidationError):
             ManifestDeclarativeSource(source_config=manifest, construct_using_pydantic_models=construct_using_pydantic_models)
 
+    @patch("airbyte_cdk.sources.declarative.declarative_source.DeclarativeSource.read")
+    def test_given_debug_when_read_then_set_log_level(self, declarative_source_read):
+        any_valid_manifest = {
+            "version": "version",
+            "definitions": {
+                "schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"},
+                "retriever": {
+                    "paginator": {
+                        "type": "DefaultPaginator",
+                        "page_size": 10,
+                        "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"},
+                        "page_token_option": {"inject_into": "path"},
+                        "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"},
+                    },
+                    "requester": {
+                        "path": "/v3/marketing/lists",
+                        "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"},
+                        "request_parameters": {"page_size": 10},
+                    },
+                    "record_selector": {"extractor": {"field_pointer": ["result"]}},
+                },
+            },
+            "streams": [
+                {
+                    "type": "DeclarativeStream",
+                    "$options": {"name": "lists", "primary_key": "id", "url_base": "https://api.sendgrid.com"},
+                    "schema_loader": {
+                        "name": "{{ options.stream_name }}",
+                        "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml",
+                    },
+                    "retriever": {
+                        "paginator": {
+                            "type": "DefaultPaginator",
+                            "page_size": 10,
+                            "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"},
+                            "page_token_option": {"inject_into": "path"},
+                            "pagination_strategy": {
+                                "type": "CursorPagination",
+                                "cursor_value": "{{ response._metadata.next }}",
+                                "page_size": 10,
+                            },
+                        },
+                        "requester": {
+                            "path": "/v3/marketing/lists",
+                            "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"},
+                            "request_parameters": {"page_size": 10},
+                        },
+                        "record_selector": {"extractor": {"field_pointer": ["result"]}},
+                    },
+                },
+                {
+                    "type": "DeclarativeStream",
+                    "$options": {"name": "stream_with_custom_requester", "primary_key": "id", "url_base": "https://api.sendgrid.com"},
+                    "schema_loader": {
+                        "name": "{{ options.stream_name }}",
+                        "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml",
+                    },
+                    "retriever": {
+                        "paginator": {
+                            "type": "DefaultPaginator",
+                            "page_size": 10,
+                            "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"},
+                            "page_token_option": {"inject_into": "path"},
+                            "pagination_strategy": {
+                                "type": "CursorPagination",
+                                "cursor_value": "{{ response._metadata.next }}",
+                                "page_size": 10,
+                            },
+                        },
+                        "requester": {
+                            "type": "CustomRequester",
+                            "class_name": "unit_tests.sources.declarative.external_component.SampleCustomComponent",
+                            "path": "/v3/marketing/lists",
+                            "custom_request_parameters": {"page_size": 10},
+                        },
+                        "record_selector": {"extractor": {"field_pointer": ["result"]}},
+                    },
+                },
+            ],
+            "check": {"type": "CheckStream", "stream_names": ["lists"]},
+        }
+        source = ManifestDeclarativeSource(source_config=any_valid_manifest, debug=True, construct_using_pydantic_models=True)
+
+        debug_logger = logging.getLogger("logger.debug")
+        list(source.read(debug_logger, {}, {}, {}))
+
+        assert debug_logger.isEnabledFor(logging.DEBUG)
+
 
 def test_generate_schema():
     schema_str = ManifestDeclarativeSource.generate_schema()
diff --git a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py
index bbb808a0ec74..b7747bc5f0d0 100644
--- a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py
+++ b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py
@@ -331,6 +331,57 @@ def test_valid_full_refresh_read_with_slices(mocker):
     assert expected == messages
 
 
+def test_read_full_refresh_with_slices_sends_slice_messages(mocker):
+    """Given the logger is debug and a full refresh, AirbyteMessages are sent for slices"""
+    debug_logger = logging.getLogger("airbyte.debug")
+    debug_logger.setLevel(logging.DEBUG)
+    slices = [{"1": "1"}, {"2": "2"}]
+    stream = MockStream(
+        [({"sync_mode": SyncMode.full_refresh, "stream_slice": s}, [s]) for s in slices],
+        name="s1",
+    )
+
+    mocker.patch.object(MockStream, "get_json_schema", return_value={})
+    mocker.patch.object(MockStream, "stream_slices", return_value=slices)
+
+    src = MockSource(streams=[stream])
+    catalog = ConfiguredAirbyteCatalog(
+        streams=[
+            _configured_stream(stream, SyncMode.full_refresh),
+        ]
+    )
+
+    messages = src.read(debug_logger, {}, catalog)
+
+    assert 2 == len(list(filter(lambda message: message.log and message.log.message.startswith("slice:"), messages)))
+
+
+def test_read_incremental_with_slices_sends_slice_messages(mocker):
+    """Given the logger is debug and an incremental sync, AirbyteMessages are sent for slices"""
+    debug_logger = logging.getLogger("airbyte.debug")
+    debug_logger.setLevel(logging.DEBUG)
+    slices = [{"1": "1"}, {"2": "2"}]
+    stream = MockStream(
+        [({"sync_mode": SyncMode.incremental, "stream_slice": s, "stream_state": {}}, [s]) for s in slices],
+        name="s1",
+    )
+
+    MockStream.supports_incremental = mocker.PropertyMock(return_value=True)
+    mocker.patch.object(MockStream, "get_json_schema", return_value={})
+    mocker.patch.object(MockStream, "stream_slices", return_value=slices)
+
+    src = MockSource(streams=[stream])
+    catalog = ConfiguredAirbyteCatalog(
+        streams=[
+            _configured_stream(stream, SyncMode.incremental),
+        ]
+    )
+
+    messages = src.read(debug_logger, {}, catalog)
+
+    assert 2 == len(list(filter(lambda message: message.log and message.log.message.startswith("slice:"), messages)))
+
+
 class TestIncrementalRead:
     @pytest.mark.parametrize(
         "use_legacy",
diff --git a/airbyte-cdk/python/unit_tests/sources/test_source.py b/airbyte-cdk/python/unit_tests/sources/test_source.py
index 0cae909b2f08..b8a8291058e8 100644
--- a/airbyte-cdk/python/unit_tests/sources/test_source.py
+++ b/airbyte-cdk/python/unit_tests/sources/test_source.py
@@ -401,6 +401,7 @@ def test_internal_config_limit(mocker, abstract_source, catalog):
     logger_mock.level = logging.DEBUG
     del catalog.streams[1]
     STREAM_LIMIT = 2
+    SLICE_DEBUG_LOG_COUNT = 1
     FULL_RECORDS_NUMBER = 3
     streams = abstract_source.streams(None)
     http_stream = streams[0]
@@ -409,7 +410,7 @@ def test_internal_config_limit(mocker, abstract_source, catalog):
     catalog.streams[0].sync_mode = SyncMode.full_refresh
     records = [r for r in abstract_source.read(logger=logger_mock, config=internal_config, catalog=catalog, state={})]
-    assert len(records) == STREAM_LIMIT
+    assert len(records) == STREAM_LIMIT + SLICE_DEBUG_LOG_COUNT
     logger_info_args = [call[0][0] for call in logger_mock.info.call_args_list]
     # Check if log line matches number of limit
     read_log_record = [_l for _l in logger_info_args if _l.startswith("Read")]
@@ -418,13 +419,13 @@ def test_internal_config_limit(mocker, abstract_source, catalog):
     # No limit, check if state record produced for incremental stream
     catalog.streams[0].sync_mode = SyncMode.incremental
     records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})]
-    assert len(records) == FULL_RECORDS_NUMBER + 1
+    assert len(records) == FULL_RECORDS_NUMBER + SLICE_DEBUG_LOG_COUNT + 1
     assert records[-1].type == Type.STATE
 
     # Set limit and check if state is produced when limit is set for incremental stream
     logger_mock.reset_mock()
     records = [r for r in abstract_source.read(logger=logger_mock, config=internal_config, catalog=catalog, state={})]
-    assert len(records) == STREAM_LIMIT + 1
+    assert len(records) == STREAM_LIMIT + SLICE_DEBUG_LOG_COUNT + 1
     assert records[-1].type == Type.STATE
     logger_info_args = [call[0][0] for call in logger_mock.info.call_args_list]
     read_log_record = [_l for _l in logger_info_args if _l.startswith("Read")]
@@ -435,6 +436,7 @@ def test_internal_config_limit(mocker, abstract_source, catalog):
 
 
 def test_source_config_no_transform(mocker, abstract_source, catalog):
+    SLICE_DEBUG_LOG_COUNT = 1
     logger_mock = mocker.MagicMock()
     logger_mock.level = logging.DEBUG
     streams = abstract_source.streams(None)
@@ -442,8 +444,8 @@ def test_source_config_no_transform(mocker, abstract_source, catalog):
     http_stream.get_json_schema.return_value = non_http_stream.get_json_schema.return_value = SCHEMA
     http_stream.read_records.return_value, non_http_stream.read_records.return_value = [[{"value": 23}] * 5] * 2
     records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})]
-    assert len(records) == 2 * 5
-    assert [r.record.data for r in records] == [{"value": 23}] * 2 * 5
+    assert len(records) == 2 * (5 + SLICE_DEBUG_LOG_COUNT)
+    assert [r.record.data for r in records if r.type == Type.RECORD] == [{"value": 23}] * 2 * 5
     assert http_stream.get_json_schema.call_count == 5
     assert non_http_stream.get_json_schema.call_count == 5
 
@@ -451,6 +453,7 @@ def test_source_config_transform(mocker, abstract_source, catalog):
     logger_mock = mocker.MagicMock()
     logger_mock.level = logging.DEBUG
+    SLICE_DEBUG_LOG_COUNT = 2
     streams = abstract_source.streams(None)
     http_stream, non_http_stream = streams
     http_stream.transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
@@ -458,21 +461,22 @@ def test_source_config_transform(mocker, abstract_source, catalog):
     http_stream.get_json_schema.return_value = non_http_stream.get_json_schema.return_value = SCHEMA
     http_stream.read_records.return_value, non_http_stream.read_records.return_value = [{"value": 23}], [{"value": 23}]
     records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})]
-    assert len(records) == 2
-    assert [r.record.data for r in records] == [{"value": "23"}] * 2
+    assert len(records) == 2 + SLICE_DEBUG_LOG_COUNT
+    assert [r.record.data for r in records if r.type == Type.RECORD] == [{"value": "23"}] * 2
 
 
 def test_source_config_transform_and_no_transform(mocker, abstract_source, catalog):
     logger_mock = mocker.MagicMock()
     logger_mock.level = logging.DEBUG
+    SLICE_DEBUG_LOG_COUNT = 2
     streams = abstract_source.streams(None)
     http_stream, non_http_stream = streams
     http_stream.transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
     http_stream.get_json_schema.return_value = non_http_stream.get_json_schema.return_value = SCHEMA
     http_stream.read_records.return_value, non_http_stream.read_records.return_value = [{"value": 23}], [{"value": 23}]
     records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})]
-    assert len(records) == 2
-    assert [r.record.data for r in records] == [{"value": "23"}, {"value": 23}]
+    assert len(records) == 2 + SLICE_DEBUG_LOG_COUNT
+    assert [r.record.data for r in records if r.type == Type.RECORD] == [{"value": "23"}, {"value": 23}]
 
 
 def test_read_default_http_availability_strategy_stream_available(catalog, mocker):
diff --git a/airbyte-connector-builder-server/connector_builder/entrypoint.py b/airbyte-connector-builder-server/connector_builder/entrypoint.py
index c27750155178..d2ee54d20c2b 100644
--- a/airbyte-connector-builder-server/connector_builder/entrypoint.py
+++ b/airbyte-connector-builder-server/connector_builder/entrypoint.py
@@ -4,10 +4,14 @@
 
 from connector_builder.generated.apis.default_api_interface import initialize_router
 from connector_builder.impl.default_api import DefaultApiImpl
-from connector_builder.impl.low_code_cdk_adapter import LowCodeSourceAdapter
+from connector_builder.impl.low_code_cdk_adapter import LowCodeSourceAdapterFactory
 from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
 
+_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5
+_MAXIMUM_NUMBER_OF_SLICES = 5
+_ADAPTER_FACTORY = LowCodeSourceAdapterFactory(_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, _MAXIMUM_NUMBER_OF_SLICES)
+
 app = FastAPI(
     title="Connector Builder Server API",
     description="Connector Builder Server API ",
@@ -22,4 +26,4 @@
     allow_headers=["*"],
 )
 
-app.include_router(initialize_router(DefaultApiImpl(LowCodeSourceAdapter)))
+app.include_router(initialize_router(DefaultApiImpl(_ADAPTER_FACTORY, _MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, _MAXIMUM_NUMBER_OF_SLICES)))
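# A minimal sketch (not part of the patch) of why the test_source.py assertions above now
# add SLICE_DEBUG_LOG_COUNT: with a debug-level logger, read() interleaves one "slice:"
# LOG message per slice with the RECORD messages, so record-only consumers must filter by
# message type instead of assuming every message is a record.
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteRecordMessage, Level
from airbyte_cdk.models import Type as MessageType

output = [
    AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message="slice:{}")),
    AirbyteMessage(type=MessageType.RECORD, record=AirbyteRecordMessage(stream="s1", data={"value": 23}, emitted_at=0)),
]
records = [m.record.data for m in output if m.type == MessageType.RECORD]
assert records == [{"value": 23}]  # the slice log message was filtered out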
diff --git a/airbyte-connector-builder-server/connector_builder/generated/models/stream_read.py b/airbyte-connector-builder-server/connector_builder/generated/models/stream_read.py
index 163bb131e7c2..775b148fa3a7 100644
--- a/airbyte-connector-builder-server/connector_builder/generated/models/stream_read.py
+++ b/airbyte-connector-builder-server/connector_builder/generated/models/stream_read.py
@@ -19,11 +19,13 @@ class StreamRead(BaseModel):
 
     logs: The logs of this StreamRead.
     slices: The slices of this StreamRead.
+    test_read_limit_reached: The test_read_limit_reached of this StreamRead.
     inferred_schema: The inferred_schema of this StreamRead [Optional].
     """
 
     logs: List[object]
     slices: List[StreamReadSlices]
+    test_read_limit_reached: bool
     inferred_schema: Optional[Dict[str, Any]] = None
 
 StreamRead.update_forward_refs()
diff --git a/airbyte-connector-builder-server/connector_builder/impl/adapter.py b/airbyte-connector-builder-server/connector_builder/impl/adapter.py
index 840e0996fbaa..33da6ced709a 100644
--- a/airbyte-connector-builder-server/connector_builder/impl/adapter.py
+++ b/airbyte-connector-builder-server/connector_builder/impl/adapter.py
@@ -32,3 +32,11 @@ def read_stream(self, stream: str, config: Dict[str, Any]) -> Iterator[AirbyteMessage]:
         :param config: The user-provided configuration as specified by the source's spec.
         :return: An iterator over `AirbyteMessage` objects.
         """
+
+
+class CdkAdapterFactory(ABC):
+
+    @abstractmethod
+    def create(self, manifest: Dict[str, Any]) -> CdkAdapter:
+        """Return an implementation of CdkAdapter"""
+        pass
diff --git a/airbyte-connector-builder-server/connector_builder/impl/default_api.py b/airbyte-connector-builder-server/connector_builder/impl/default_api.py
index 973defc09c41..675e928f6cbb 100644
--- a/airbyte-connector-builder-server/connector_builder/impl/default_api.py
+++ b/airbyte-connector-builder-server/connector_builder/impl/default_api.py
@@ -6,7 +6,7 @@
 import logging
 import traceback
 from json import JSONDecodeError
-from typing import Any, Callable, Dict, Iterable, Iterator, Optional, Union
+from typing import Any, Dict, Iterable, Iterator, Optional, Union
 from urllib.parse import parse_qs, urljoin, urlparse
 
 from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Type
@@ -24,17 +24,21 @@
 from connector_builder.generated.models.streams_list_read import StreamsListRead
 from connector_builder.generated.models.streams_list_read_streams import StreamsListReadStreams
 from connector_builder.generated.models.streams_list_request_body import StreamsListRequestBody
-from connector_builder.impl.adapter import CdkAdapter
+from connector_builder.impl.adapter import CdkAdapter, CdkAdapterFactory
 from fastapi import Body, HTTPException
 from jsonschema import ValidationError
 
 
 class DefaultApiImpl(DefaultApi):
+
     logger = logging.getLogger("airbyte.connector-builder")
 
-    def __init__(self, adapter_cls: Callable[[Dict[str, Any]], CdkAdapter], max_record_limit: int = 1000):
-        self.adapter_cls = adapter_cls
+    def __init__(self, adapter_factory: CdkAdapterFactory, max_pages_per_slice, max_slices, max_record_limit: int = 1000):
+        self.adapter_factory = adapter_factory
+        self._max_pages_per_slice = max_pages_per_slice
+        self._max_slices = max_slices
         self.max_record_limit = max_record_limit
+
         super().__init__()
 
     async def get_manifest_template(self) -> str:
@@ -128,7 +132,7 @@ async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Body(None, description="")) -> StreamRead:
         else:
             record_limit = min(stream_read_request_body.record_limit, self.max_record_limit)
 
-        single_slice = StreamReadSlices(pages=[])
+        slices = []
         log_messages = []
         try:
             for message_group in self._get_message_groups(
@@ -139,7 +143,7 @@ async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Body(None, description="")) -> StreamRead:
                 if isinstance(message_group, AirbyteLogMessage):
                     log_messages.append({"message": message_group.message})
                 else:
-                    single_slice.pages.append(message_group)
+                    slices.append(message_group)
         except Exception as error:
             # TODO: We're temporarily using FastAPI's default exception model. Ideally we should use exceptions defined in the OpenAPI spec
             self.logger.error(f"Could not perform read with with error: {error.args[0]} - {self._get_stacktrace_as_string(error)}")
@@ -149,9 +153,21 @@ async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Body(None, description="")) -> StreamRead:
         )
 
         return StreamRead(
-            logs=log_messages, slices=[single_slice], inferred_schema=schema_inferrer.get_stream_schema(stream_read_request_body.stream)
+            logs=log_messages,
+            slices=slices,
+            test_read_limit_reached=self._has_reached_limit(slices),
+            inferred_schema=schema_inferrer.get_stream_schema(stream_read_request_body.stream)
         )
 
+    def _has_reached_limit(self, slices):
+        if len(slices) >= self._max_slices:
+            return True
+
+        for slice in slices:
+            if len(slice.pages) >= self._max_pages_per_slice:
+                return True
+        return False
+
     async def resolve_manifest(
         self, resolve_manifest_request_body: ResolveManifestRequestBody = Body(None, description="")
     ) -> ResolveManifest:
@@ -191,33 +207,56 @@ def _get_message_groups(
 
         Note: The exception is that normal log messages can be received at any time which are not incorporated into grouping
         """
-        first_page = True
-        current_records = []
+        records_count = 0
+        at_least_one_page_in_group = False
+        current_page_records = []
+        current_slice_pages = []
         current_page_request: Optional[HttpRequest] = None
         current_page_response: Optional[HttpResponse] = None
 
-        while len(current_records) < limit and (message := next(messages, None)):
-            if first_page and message.type == Type.LOG and message.log.message.startswith("request:"):
-                first_page = False
-                request = self._create_request_from_log_message(message.log)
-                current_page_request = request
+        while records_count < limit and (message := next(messages, None)):
+            if self._need_to_close_page(at_least_one_page_in_group, message):
+                self._close_page(current_page_request, current_page_response, current_slice_pages, current_page_records)
+                current_page_request = None
+                current_page_response = None
+
+            if at_least_one_page_in_group and message.type == Type.LOG and message.log.message.startswith("slice:"):
+                yield StreamReadSlices(pages=current_slice_pages)
+                current_slice_pages = []
+                at_least_one_page_in_group = False
             elif message.type == Type.LOG and message.log.message.startswith("request:"):
-                if not current_page_request or not current_page_response:
-                    raise ValueError("Every message grouping should have at least one request and response")
-                yield StreamReadPages(request=current_page_request, response=current_page_response, records=current_records)
+                if not at_least_one_page_in_group:
+                    at_least_one_page_in_group = True
                 current_page_request = self._create_request_from_log_message(message.log)
-                current_records = []
             elif message.type == Type.LOG and message.log.message.startswith("response:"):
                 current_page_response = self._create_response_from_log_message(message.log)
             elif message.type == Type.LOG:
                 yield message.log
             elif message.type == Type.RECORD:
-                current_records.append(message.record.data)
+                current_page_records.append(message.record.data)
+                records_count += 1
                 schema_inferrer.accumulate(message.record)
         else:
-            if not current_page_request or not current_page_response:
-                raise ValueError("Every message grouping should have at least one request and response")
-            yield StreamReadPages(request=current_page_request, response=current_page_response, records=current_records)
+            self._close_page(current_page_request, current_page_response, current_slice_pages, current_page_records)
+            yield StreamReadSlices(pages=current_slice_pages)
+
+    @staticmethod
+    def _need_to_close_page(at_least_one_page_in_group, message):
+        return (
+            at_least_one_page_in_group
+            and message.type == Type.LOG
+            and (message.log.message.startswith("request:") or message.log.message.startswith("slice:"))
+        )
+
+    @staticmethod
+    def _close_page(current_page_request, current_page_response, current_slice_pages, current_page_records):
+        if not current_page_request or not current_page_response:
+            raise ValueError("Every message grouping should have at least one request and response")
+
+        current_slice_pages.append(
+            StreamReadPages(request=current_page_request, response=current_page_response, records=current_page_records)
+        )
+        current_page_records.clear()
 
     def _create_request_from_log_message(self, log_message: AirbyteLogMessage) -> Optional[HttpRequest]:
         # TODO: As a temporary stopgap, the CDK emits request data as a log message string. Ideally this should come in the
@@ -255,7 +294,7 @@ def _create_response_from_log_message(self, log_message: AirbyteLogMessage) -> Optional[HttpResponse]:
 
     def _create_low_code_adapter(self, manifest: Dict[str, Any]) -> CdkAdapter:
         try:
-            return self.adapter_cls(manifest=manifest)
+            return self.adapter_factory.create(manifest)
         except ValidationError as error:
             # TODO: We're temporarily using FastAPI's default exception model. Ideally we should use exceptions defined in the OpenAPI spec
             self.logger.error(f"Invalid connector manifest with error: {error.message} - {DefaultApiImpl._get_stacktrace_as_string(error)}")
diff --git a/airbyte-connector-builder-server/connector_builder/impl/low_code_cdk_adapter.py b/airbyte-connector-builder-server/connector_builder/impl/low_code_cdk_adapter.py
index 580d0d7c42e7..f298cc3588ad 100644
--- a/airbyte-connector-builder-server/connector_builder/impl/low_code_cdk_adapter.py
+++ b/airbyte-connector-builder-server/connector_builder/impl/low_code_cdk_adapter.py
@@ -7,15 +7,20 @@
 
 from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, ConfiguredAirbyteCatalog, Level
 from airbyte_cdk.models import Type as MessageType
 from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
+from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ModelToComponentFactory
 from airbyte_cdk.sources.declarative.yaml_declarative_source import ManifestDeclarativeSource
 from airbyte_cdk.sources.streams.http import HttpStream
-from connector_builder.impl.adapter import CdkAdapter
+from connector_builder.impl.adapter import CdkAdapter, CdkAdapterFactory
 
 
 class LowCodeSourceAdapter(CdkAdapter):
-    def __init__(self, manifest: Dict[str, Any]):
+    def __init__(self, manifest: Dict[str, Any], limit_page_fetched_per_slice, limit_slices_fetched):
         # Request and response messages are only emitted for a sources that have debug turned on
-        self._source = ManifestDeclarativeSource(manifest, debug=True)
+        self._source = ManifestDeclarativeSource(
+            manifest,
+            debug=True,
+            component_factory=ModelToComponentFactory(limit_page_fetched_per_slice, limit_slices_fetched)
+        )
 
     def get_http_streams(self, config: Dict[str, Any]) -> List[HttpStream]:
         http_streams = []
@@ -58,3 +63,13 @@ def read_stream(self, stream: str, config: Dict[str, Any]) -> Iterator[AirbyteMessage]:
         except Exception as e:
             yield AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.ERROR, message=str(e)))
             return
+
+
+class LowCodeSourceAdapterFactory(CdkAdapterFactory):
+
+    def __init__(self, max_pages_per_slice, max_slices):
+        self._max_pages_per_slice = max_pages_per_slice
+        self._max_slices = max_slices
+
+    def create(self, manifest: Dict[str, Any]) -> CdkAdapter:
+        return LowCodeSourceAdapter(manifest, self._max_pages_per_slice, self._max_slices)
diff --git a/airbyte-connector-builder-server/setup.py b/airbyte-connector-builder-server/setup.py
index b674bfc6f836..dc8c94871803 100644
--- a/airbyte-connector-builder-server/setup.py
+++ b/airbyte-connector-builder-server/setup.py
@@ -41,7 +41,7 @@
     },
     packages=find_packages(exclude=("unit_tests", "integration_tests", "docs")),
     package_data={},
-    install_requires=["airbyte-cdk==0.22", "fastapi", "uvicorn"],
+    install_requires=["airbyte-cdk==0.23", "fastapi", "uvicorn"],
     python_requires=">=3.9.11",
    extras_require={
         "tests": [
diff --git a/airbyte-connector-builder-server/src/main/openapi/openapi.yaml b/airbyte-connector-builder-server/src/main/openapi/openapi.yaml
index a104000d0fcb..6f2147668e99 100644
--- a/airbyte-connector-builder-server/src/main/openapi/openapi.yaml
+++ b/airbyte-connector-builder-server/src/main/openapi/openapi.yaml
@@ -98,6 +98,7 @@ components:
       required:
         - logs
        - slices
+        - test_read_limit_reached
       properties:
         logs:
           type: array
@@ -144,6 +145,9 @@ components:
               type: object
               description: The STATE AirbyteMessage emitted at the end of this slice. This can be omitted if a stream slicer is not configured.
               # $ref: "#/components/schemas/AirbyteProtocol/definitions/AirbyteStateMessage"
+        test_read_limit_reached:
+          type: boolean
+          description: Whether the maximum number of requests per slice or the maximum number of slices queried has been reached
         inferred_schema:
           type: object
           description: The narrowest JSON Schema against which every AirbyteRecord in the slices can validate successfully. This is inferred from reading every record in the output slices.
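# A minimal sketch (not part of the patch) of the grouping contract that the reworked
# _get_message_groups above implements: "slice:" logs open a new slice, "request:" logs
# close the previous page, and records attach to the open page. This toy version works on
# (kind, text) tuples instead of AirbyteMessages and skips the request/response pairing
# validation that the real code enforces in _close_page.
def group(messages):
    slices, current_pages, current_records = [], [], []
    saw_page = False
    for kind, text in messages:
        if kind == "log" and text.startswith("slice:"):
            if saw_page:  # a new slice closes the open page and the open slice
                current_pages.append(current_records)
                slices.append(current_pages)
                current_pages, current_records, saw_page = [], [], False
        elif kind == "log" and text.startswith("request:"):
            if saw_page:  # a new request closes the open page within the same slice
                current_pages.append(current_records)
                current_records = []
            saw_page = True
        elif kind == "record":
            current_records.append(text)
    current_pages.append(current_records)
    slices.append(current_pages)
    return slices

# Two slices; the second slice has two pages because it contains two request logs.
messages = [
    ("log", "slice:{}"), ("log", "request:{}"), ("record", "a"),
    ("log", "slice:{}"), ("log", "request:{}"), ("record", "b"),
    ("log", "request:{}"), ("record", "c"),
]
assert group(messages) == [[["a"]], [["b"], ["c"]]]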
diff --git a/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_default_api.py b/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_default_api.py
index 4dead48f3dad..ea1d10682e2d 100644
--- a/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_default_api.py
+++ b/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_default_api.py
@@ -20,10 +20,13 @@
 from connector_builder.generated.models.streams_list_read_streams import StreamsListReadStreams
 from connector_builder.generated.models.streams_list_request_body import StreamsListRequestBody
 from connector_builder.impl.default_api import DefaultApiImpl
-from connector_builder.impl.low_code_cdk_adapter import LowCodeSourceAdapter
+from connector_builder.impl.low_code_cdk_adapter import LowCodeSourceAdapterFactory
 from fastapi import HTTPException
 from pydantic.error_wrappers import ValidationError
 
+MAX_PAGES_PER_SLICE = 4
+MAX_SLICES = 3
+
 MANIFEST = {
     "version": "0.1.0",
     "type": "DeclarativeSource",
@@ -95,13 +98,17 @@ def record_message(stream: str, data: dict) -> AirbyteMessage:
     return AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream=stream, data=data, emitted_at=1234))
 
 
+def slice_message() -> AirbyteMessage:
+    return AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message='slice:{"key": "value"}'))
+
+
 def test_list_streams():
     expected_streams = [
         StreamsListReadStreams(name="hashiras", url="https://demonslayers.com/api/v1/hashiras"),
         StreamsListReadStreams(name="breathing-techniques", url="https://demonslayers.com/api/v1/breathing_techniques"),
     ]
 
-    api = DefaultApiImpl(LowCodeSourceAdapter)
+    api = DefaultApiImpl(LowCodeSourceAdapterFactory(MAX_PAGES_PER_SLICE, MAX_SLICES), MAX_PAGES_PER_SLICE, MAX_SLICES)
     streams_list_request_body = StreamsListRequestBody(manifest=MANIFEST, config=CONFIG)
     loop = asyncio.get_event_loop()
     actual_streams = loop.run_until_complete(api.list_streams(streams_list_request_body))
@@ -135,7 +142,7 @@ def test_list_streams_with_interpolated_urls():
     expected_streams = StreamsListRead(streams=[StreamsListReadStreams(name="demons", url="https://upper-six.muzan.com/api/v1/demons")])
 
-    api = DefaultApiImpl(LowCodeSourceAdapter)
+    api = DefaultApiImpl(LowCodeSourceAdapterFactory(MAX_PAGES_PER_SLICE, MAX_SLICES), MAX_PAGES_PER_SLICE, MAX_SLICES)
     streams_list_request_body = StreamsListRequestBody(manifest=manifest, config=CONFIG)
     loop = asyncio.get_event_loop()
     actual_streams = loop.run_until_complete(api.list_streams(streams_list_request_body))
@@ -169,7 +176,7 @@ def test_list_streams_with_unresolved_interpolation():
     # The interpolated string {{ config['not_in_config'] }} doesn't resolve to anything so it ends up blank during interpolation
     expected_streams = StreamsListRead(streams=[StreamsListReadStreams(name="demons", url="https://.muzan.com/api/v1/demons")])
 
-    api = DefaultApiImpl(LowCodeSourceAdapter)
+    api = DefaultApiImpl(LowCodeSourceAdapterFactory(MAX_PAGES_PER_SLICE, MAX_SLICES), MAX_PAGES_PER_SLICE, MAX_SLICES)
     streams_list_request_body = StreamsListRequestBody(manifest=manifest, config=CONFIG)
 
     loop = asyncio.get_event_loop()
@@ -212,7 +219,7 @@ def test_read_stream():
         ),
     ]
 
-    mock_source_adapter_cls = make_mock_adapter_cls(
+    mock_source_adapter_cls = make_mock_adapter_factory(
         iter(
             [
                 request_log_message(request),
@@ -226,7 +233,7 @@ def test_read_stream():
         )
     )
 
-    api = DefaultApiImpl(mock_source_adapter_cls)
+    api = DefaultApiImpl(mock_source_adapter_cls, MAX_PAGES_PER_SLICE, MAX_SLICES)
 
     loop = asyncio.get_event_loop()
     actual_response: StreamRead = loop.run_until_complete(
@@ -277,7 +284,7 @@ def test_read_stream_with_logs():
         {"message": "log message after the response"},
     ]
 
-    mock_source_adapter_cls = make_mock_adapter_cls(
+    mock_source_adapter_cls = make_mock_adapter_factory(
         iter(
             [
                 AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message="log message before the request")),
@@ -291,7 +298,7 @@ def test_read_stream_with_logs():
         )
     )
 
-    api = DefaultApiImpl(mock_source_adapter_cls)
+    api = DefaultApiImpl(mock_source_adapter_cls, MAX_PAGES_PER_SLICE, MAX_SLICES)
 
     loop = asyncio.get_event_loop()
     actual_response: StreamRead = loop.run_until_complete(
@@ -320,7 +327,7 @@ def test_read_stream_record_limit(request_record_limit, max_record_limit):
         "body": {"custom": "field"},
     }
     response = {"status_code": 200, "headers": {"field": "value"}, "body": '{"name": "field"}'}
-    mock_source_adapter_cls = make_mock_adapter_cls(
+    mock_source_adapter_cls = make_mock_adapter_factory(
         iter(
             [
                 request_log_message(request),
@@ -337,7 +344,7 @@ def test_read_stream_record_limit(request_record_limit, max_record_limit):
     n_records = 2
     record_limit = min(request_record_limit, max_record_limit)
 
-    api = DefaultApiImpl(mock_source_adapter_cls, max_record_limit=max_record_limit)
+    api = DefaultApiImpl(mock_source_adapter_cls, MAX_PAGES_PER_SLICE, MAX_SLICES, max_record_limit=max_record_limit)
     loop = asyncio.get_event_loop()
     actual_response: StreamRead = loop.run_until_complete(
         api.read_stream(StreamReadRequestBody(manifest=MANIFEST, config=CONFIG, stream="hashiras", record_limit=request_record_limit))
@@ -363,7 +370,7 @@ def test_read_stream_default_record_limit(max_record_limit):
         "body": {"custom": "field"},
     }
     response = {"status_code": 200, "headers": {"field": "value"}, "body": '{"name": "field"}'}
-    mock_source_adapter_cls = make_mock_adapter_cls(
+    mock_source_adapter_cls = make_mock_adapter_factory(
         iter(
             [
                 request_log_message(request),
@@ -379,7 +386,7 @@ def test_read_stream_default_record_limit(max_record_limit):
     )
     n_records = 2
 
-    api = DefaultApiImpl(mock_source_adapter_cls, max_record_limit=max_record_limit)
+    api = DefaultApiImpl(mock_source_adapter_cls, MAX_PAGES_PER_SLICE, MAX_SLICES, max_record_limit=max_record_limit)
     loop = asyncio.get_event_loop()
     actual_response: StreamRead = loop.run_until_complete(
         api.read_stream(StreamReadRequestBody(manifest=MANIFEST, config=CONFIG, stream="hashiras"))
@@ -398,7 +405,7 @@ def test_read_stream_limit_0():
         "body": {"custom": "field"},
     }
     response = {"status_code": 200, "headers": {"field": "value"}, "body": '{"name": "field"}'}
-    mock_source_adapter_cls = make_mock_adapter_cls(
+    mock_source_adapter_cls = make_mock_adapter_factory(
         iter(
             [
                 request_log_message(request),
@@ -412,7 +419,7 @@ def test_read_stream_limit_0():
             ]
         )
     )
-    api = DefaultApiImpl(mock_source_adapter_cls)
+    api = DefaultApiImpl(mock_source_adapter_cls, MAX_PAGES_PER_SLICE, MAX_SLICES)
 
     loop = asyncio.get_event_loop()
     with pytest.raises(ValidationError):
@@ -453,7 +460,7 @@ def test_read_stream_no_records():
         ),
     ]
 
-    mock_source_adapter_cls = make_mock_adapter_cls(
+    mock_source_adapter_cls = make_mock_adapter_factory(
         iter(
             [
                 request_log_message(request),
@@ -464,7 +471,7 @@ def test_read_stream_no_records():
         )
     )
 
-    api = DefaultApiImpl(mock_source_adapter_cls)
+    api = DefaultApiImpl(mock_source_adapter_cls, MAX_PAGES_PER_SLICE, MAX_SLICES)
 
     loop = asyncio.get_event_loop()
     actual_response: StreamRead = loop.run_until_complete(
@@ -501,7 +508,7 @@ def test_invalid_manifest():
 
     expected_status_code = 400
 
-    api = DefaultApiImpl(LowCodeSourceAdapter)
+    api = DefaultApiImpl(LowCodeSourceAdapterFactory(MAX_PAGES_PER_SLICE, MAX_SLICES), MAX_PAGES_PER_SLICE, MAX_SLICES)
     loop = asyncio.get_event_loop()
     with pytest.raises(HTTPException) as actual_exception:
         loop.run_until_complete(api.read_stream(StreamReadRequestBody(manifest=invalid_manifest, config={}, stream="hashiras")))
@@ -512,7 +519,7 @@ def test_invalid_manifest():
 
 def test_read_stream_invalid_group_format():
     response = {"status_code": 200, "headers": {"field": "value"}, "body": '{"name": "field"}'}
 
-    mock_source_adapter_cls = make_mock_adapter_cls(
+    mock_source_adapter_cls = make_mock_adapter_factory(
         iter(
             [
                 response_log_message(response),
@@ -522,7 +529,7 @@ def test_read_stream_invalid_group_format():
         )
     )
 
-    api = DefaultApiImpl(mock_source_adapter_cls)
+    api = DefaultApiImpl(mock_source_adapter_cls, MAX_PAGES_PER_SLICE, MAX_SLICES)
 
     loop = asyncio.get_event_loop()
     with pytest.raises(HTTPException) as actual_exception:
@@ -534,7 +541,7 @@ def test_read_stream_returns_error_if_stream_does_not_exist():
     expected_status_code = 400
 
-    api = DefaultApiImpl(LowCodeSourceAdapter)
+    api = DefaultApiImpl(LowCodeSourceAdapterFactory(MAX_PAGES_PER_SLICE, MAX_SLICES), MAX_PAGES_PER_SLICE, MAX_SLICES)
     loop = asyncio.get_event_loop()
     with pytest.raises(HTTPException) as actual_exception:
         loop.run_until_complete(api.read_stream(StreamReadRequestBody(manifest=MANIFEST, config={}, stream="not_in_manifest")))
@@ -584,7 +591,7 @@ def test_read_stream_returns_error_if_stream_does_not_exist():
 )
 def test_create_request_from_log_message(log_message, expected_request):
     airbyte_log_message = AirbyteLogMessage(level=Level.INFO, message=log_message)
-    api = DefaultApiImpl(LowCodeSourceAdapter)
+    api = DefaultApiImpl(LowCodeSourceAdapterFactory(MAX_PAGES_PER_SLICE, MAX_SLICES), MAX_PAGES_PER_SLICE, MAX_SLICES)
     actual_request = api._create_request_from_log_message(airbyte_log_message)
 
     assert actual_request == expected_request
@@ -619,12 +626,101 @@ def test_create_response_from_log_message(log_message, expected_response):
     response_message = f"response:{json.dumps(log_message)}"
     airbyte_log_message = AirbyteLogMessage(level=Level.INFO, message=response_message)
 
-    api = DefaultApiImpl(LowCodeSourceAdapter)
+    api = DefaultApiImpl(LowCodeSourceAdapterFactory(MAX_PAGES_PER_SLICE, MAX_SLICES), MAX_PAGES_PER_SLICE, MAX_SLICES)
     actual_response = api._create_response_from_log_message(airbyte_log_message)
 
     assert actual_response == expected_response
 
 
+def test_read_stream_with_many_slices():
+    request = {}
+    response = {"status_code": 200}
+
+    mock_source_adapter_cls = make_mock_adapter_factory(
+        iter(
+            [
+                slice_message(),
+                request_log_message(request),
+                response_log_message(response),
+                record_message("hashiras", {"name": "Muichiro Tokito"}),
+                slice_message(),
+                request_log_message(request),
+                response_log_message(response),
+                record_message("hashiras", {"name": "Shinobu Kocho"}),
+                record_message("hashiras", {"name": "Mitsuri Kanroji"}),
+                request_log_message(request),
+                response_log_message(response),
+                record_message("hashiras", {"name": "Obanai Iguro"}),
+                request_log_message(request),
+                response_log_message(response),
+            ]
+        )
+    )
+
+    api = DefaultApiImpl(mock_source_adapter_cls, MAX_PAGES_PER_SLICE, MAX_SLICES)
+
+    loop = asyncio.get_event_loop()
+    stream_read: StreamRead = loop.run_until_complete(
+        api.read_stream(StreamReadRequestBody(manifest=MANIFEST, config=CONFIG, stream="hashiras"))
+    )
+
+    assert not stream_read.test_read_limit_reached
+    assert len(stream_read.slices) == 2
+
+    assert len(stream_read.slices[0].pages) == 1
+    assert len(stream_read.slices[0].pages[0].records) == 1
+
+    assert len(stream_read.slices[1].pages) == 3
+    assert len(stream_read.slices[1].pages[0].records) == 2
+    assert len(stream_read.slices[1].pages[1].records) == 1
+    assert len(stream_read.slices[1].pages[2].records) == 0
+
+
+def test_read_stream_given_maximum_number_of_slices_then_test_read_limit_reached():
+    maximum_number_of_slices = 5
+    request = {}
+    response = {"status_code": 200}
+    mock_source_adapter_cls = make_mock_adapter_factory(
+        iter(
+            [
+                slice_message(),
+                request_log_message(request),
+                response_log_message(response),
+            ]
+            * maximum_number_of_slices
+        )
+    )
+
+    api = DefaultApiImpl(mock_source_adapter_cls, MAX_PAGES_PER_SLICE, MAX_SLICES)
+
+    loop = asyncio.get_event_loop()
+    stream_read: StreamRead = loop.run_until_complete(
+        api.read_stream(StreamReadRequestBody(manifest=MANIFEST, config=CONFIG, stream="hashiras"))
+    )
+
+    assert stream_read.test_read_limit_reached
+
+
+def test_read_stream_given_maximum_number_of_pages_then_test_read_limit_reached():
+    maximum_number_of_pages_per_slice = 5
+    request = {}
+    response = {"status_code": 200}
+    mock_source_adapter_cls = make_mock_adapter_factory(
+        iter(
+            [slice_message()] + [request_log_message(request), response_log_message(response)] * maximum_number_of_pages_per_slice
+        )
+    )
+
+    api = DefaultApiImpl(mock_source_adapter_cls, MAX_PAGES_PER_SLICE, MAX_SLICES)
+
+    loop = asyncio.get_event_loop()
+    stream_read: StreamRead = loop.run_until_complete(
+        api.read_stream(StreamReadRequestBody(manifest=MANIFEST, config=CONFIG, stream="hashiras"))
+    )
+
+    assert stream_read.test_read_limit_reached
+
+
 def test_resolve_manifest():
     _stream_name = "stream_with_custom_requester"
     _stream_primary_key = "id"
@@ -775,7 +871,7 @@ def test_resolve_manifest():
         "check": {"type": "CheckStream", "stream_names": ["lists"]},
     }
 
-    api = DefaultApiImpl(LowCodeSourceAdapter)
+    api = DefaultApiImpl(LowCodeSourceAdapterFactory(MAX_PAGES_PER_SLICE, MAX_SLICES), MAX_PAGES_PER_SLICE, MAX_SLICES)
 
     loop = asyncio.get_event_loop()
     actual_response: ResolveManifest = loop.run_until_complete(api.resolve_manifest(ResolveManifestRequestBody(manifest=manifest)))
@@ -794,7 +890,7 @@ def test_resolve_manifest_unresolvable_references():
         "check": {"type": "CheckStream", "stream_names": ["lists"]},
     }
 
-    api = DefaultApiImpl(LowCodeSourceAdapter)
+    api = DefaultApiImpl(LowCodeSourceAdapterFactory(MAX_PAGES_PER_SLICE, MAX_SLICES), MAX_PAGES_PER_SLICE, MAX_SLICES)
     loop = asyncio.get_event_loop()
     with pytest.raises(HTTPException) as actual_exception:
         loop.run_until_complete(api.resolve_manifest(ResolveManifestRequestBody(manifest=invalid_manifest)))
@@ -807,7 +903,7 @@ def test_resolve_manifest_invalid():
     expected_status_code = 400
     invalid_manifest = {"version": "version"}
 
-    api = DefaultApiImpl(LowCodeSourceAdapter)
+    api = DefaultApiImpl(LowCodeSourceAdapterFactory(MAX_PAGES_PER_SLICE, MAX_SLICES), MAX_PAGES_PER_SLICE, MAX_SLICES)
     loop = asyncio.get_event_loop()
     with pytest.raises(HTTPException) as actual_exception:
         loop.run_until_complete(api.resolve_manifest(ResolveManifestRequestBody(manifest=invalid_manifest)))
@@ -816,9 +912,9 @@ def test_resolve_manifest_invalid():
     assert actual_exception.value.status_code == expected_status_code
 
 
-def make_mock_adapter_cls(return_value: Iterator) -> MagicMock:
-    mock_source_adapter_cls = MagicMock()
+def make_mock_adapter_factory(return_value: Iterator) -> MagicMock:
+    mock_source_adapter_factory = MagicMock()
     mock_source_adapter = MagicMock()
     mock_source_adapter.read_stream.return_value = return_value
-    mock_source_adapter_cls.return_value = mock_source_adapter
-    return mock_source_adapter_cls
+    mock_source_adapter_factory.create.return_value = mock_source_adapter
+    return mock_source_adapter_factory
diff --git a/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_low_code_cdk_adapter.py b/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_low_code_cdk_adapter.py
index b097f5e9a293..645969186ff9 100644
--- a/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_low_code_cdk_adapter.py
+++ b/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_low_code_cdk_adapter.py
@@ -10,6 +10,8 @@
 import requests
 from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteRecordMessage, Level, Type
 from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
+from airbyte_cdk.sources.declarative.requesters.paginators import PaginatorTestReadDecorator
+from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetrieverTestReadDecorator
 from airbyte_cdk.sources.declarative.parsers.custom_exceptions import UndefinedReferenceException
 from airbyte_cdk.sources.streams.http import HttpStream
 
@@ -176,10 +178,37 @@
     }
 }
 
+MANIFEST_WITH_PAGINATOR = {
+    "version": "0.1.0",
+    "type": "DeclarativeSource",
+    "definitions": {},
+    "streams": [
+        {
+            "type": "DeclarativeStream",
+            "retriever": {
+                "type": "SimpleRetriever",
+                "record_selector": {"extractor": {"field_pointer": ["items"], "type": "DpathExtractor"}, "type": "RecordSelector"},
+                "paginator": {
+                    "type": "DefaultPaginator",
+                    "pagination_strategy": {"type": "OffsetIncrement", "page_size": 10},
+                    "url_base": "https://demonslayers.com/api/v1/",
+                },
+                "requester": {"url_base": "https://demonslayers.com/api/v1/", "http_method": "GET", "type": "HttpRequester"},
+            },
+            "$options": {"name": "hashiras", "path": "/hashiras"},
+        },
+    ],
+    "check": {"stream_names": ["hashiras"], "type": "CheckStream"},
+}
+
 
 def test_get_http_streams():
     expected_urls = {"https://demonslayers.com/api/v1/breathing_techniques", "https://demonslayers.com/api/v1/hashiras"}
 
-    adapter = LowCodeSourceAdapter(MANIFEST)
+    adapter = LowCodeSourceAdapter(MANIFEST, MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, MAXIMUM_NUMBER_OF_SLICES)
     actual_streams = adapter.get_http_streams(config={})
     actual_urls = {http_stream.url_base + http_stream.path() for http_stream in actual_streams}
 
     assert actual_urls == expected_urls
 
 
+MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5
+MAXIMUM_NUMBER_OF_SLICES = 5
+
+
 def test_get_http_manifest_with_references():
     expected_urls = {"https://demonslayers.com/api/v1/ranks"}
 
-    adapter = LowCodeSourceAdapter(MANIFEST_WITH_REFERENCES)
+    adapter = LowCodeSourceAdapter(MANIFEST_WITH_REFERENCES, MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, MAXIMUM_NUMBER_OF_SLICES)
     actual_streams = adapter.get_http_streams(config={})
     actual_urls = {http_stream.url_base + http_stream.path() for http_stream in actual_streams}
 
@@ -204,7 +236,7 @@ def test_get_http_streams_non_declarative_streams():
     mock_source = MagicMock()
     mock_source.streams.return_value = [non_declarative_stream]
 
-    adapter = LowCodeSourceAdapter(MANIFEST)
+    adapter = LowCodeSourceAdapter(MANIFEST, MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, MAXIMUM_NUMBER_OF_SLICES)
     adapter._source = mock_source
     with pytest.raises(TypeError):
         adapter.get_http_streams(config={})
@@ -217,7 +249,7 @@ def test_get_http_streams_non_http_stream():
     mock_source = MagicMock()
     mock_source.streams.return_value = [declarative_stream_non_http_retriever]
 
-    adapter = LowCodeSourceAdapter(MANIFEST)
+    adapter = LowCodeSourceAdapter(MANIFEST, MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, MAXIMUM_NUMBER_OF_SLICES)
     adapter._source = mock_source
     with pytest.raises(TypeError):
         adapter.get_http_streams(config={})
@@ -247,7 +279,7 @@ def test_read_streams():
     mock_source = MagicMock()
     mock_source.read.return_value = expected_messages
 
-    adapter = LowCodeSourceAdapter(MANIFEST)
+    adapter = LowCodeSourceAdapter(MANIFEST, MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, MAXIMUM_NUMBER_OF_SLICES)
     adapter._source = mock_source
     actual_messages = list(adapter.read_stream("hashiras", {}))
 
@@ -272,7 +304,7 @@ def return_value(*args, **kwargs):
 
     mock_source.read.side_effect = return_value
 
-    adapter = LowCodeSourceAdapter(MANIFEST)
+    adapter = LowCodeSourceAdapter(MANIFEST, MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, MAXIMUM_NUMBER_OF_SLICES)
     adapter._source = mock_source
     actual_messages = list(adapter.read_stream("hashiras", {}))
 
@@ -309,4 +341,14 @@ def test_read_streams_invalid_reference():
     }
 
     with pytest.raises(UndefinedReferenceException):
-        LowCodeSourceAdapter(invalid_reference_manifest)
+        LowCodeSourceAdapter(invalid_reference_manifest, MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, MAXIMUM_NUMBER_OF_SLICES)
+
+
+def test_stream_use_read_test_retriever_and_paginator():
+    adapter = LowCodeSourceAdapter(MANIFEST_WITH_PAGINATOR, MAXIMUM_NUMBER_OF_PAGES_PER_SLICE, MAXIMUM_NUMBER_OF_SLICES)
+    streams = adapter.get_http_streams(config={})
+
+    assert streams
+    for stream in streams:
+        assert isinstance(stream, SimpleRetrieverTestReadDecorator)
+        assert isinstance(stream.paginator, PaginatorTestReadDecorator)
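# A minimal sketch (not part of the patch) tying the pieces together: the factory carries
# the test-read limits, and every adapter it creates builds a source whose streams are
# wrapped in the limit-aware decorators, as the final test above asserts. Assumes the
# MANIFEST_WITH_PAGINATOR fixture from the test module is in scope.
from connector_builder.impl.low_code_cdk_adapter import LowCodeSourceAdapterFactory

factory = LowCodeSourceAdapterFactory(max_pages_per_slice=5, max_slices=5)
adapter = factory.create(MANIFEST_WITH_PAGINATOR)
stream = adapter.get_http_streams(config={})[0]
# Both slicing and pagination are capped for this test read:
assert type(stream).__name__ == "SimpleRetrieverTestReadDecorator"
assert type(stream.paginator).__name__ == "PaginatorTestReadDecorator"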