From 199a8078f2bbb1ef015b47111854ceb53183e68d Mon Sep 17 00:00:00 2001 From: Brian Lai <51336873+brianjlai@users.noreply.github.com> Date: Tue, 17 Sep 2024 14:06:41 -0400 Subject: [PATCH] [airbyte-cdk] Decouple request_options_provider from datetime_based_cursor + concurrent_cursor features for low-code (#45413) --- airbyte-cdk/python/airbyte_cdk/__init__.py | 2 + .../parsers/model_to_component_factory.py | 57 ++- .../requesters/request_options/__init__.py | 6 +- ...datetime_based_request_options_provider.py | 78 +++ .../default_request_options_provider.py | 58 +++ .../retrievers/simple_retriever.py | 39 +- .../sources/streams/concurrent/cursor.py | 8 +- .../datetime_stream_state_converter.py | 6 +- .../test_model_to_component_factory.py | 195 +++++++- ...datetime_based_request_options_provider.py | 120 +++++ .../retrievers/test_simple_retriever.py | 40 +- .../sources/streams/concurrent/test_cursor.py | 466 +++++++++++++++++- 12 files changed, 1030 insertions(+), 45 deletions(-) create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/datetime_based_request_options_provider.py create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/default_request_options_provider.py create mode 100644 airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_options/test_datetime_based_request_options_provider.py diff --git a/airbyte-cdk/python/airbyte_cdk/__init__.py b/airbyte-cdk/python/airbyte_cdk/__init__.py index b76962c538cd..312983853b1d 100644 --- a/airbyte-cdk/python/airbyte_cdk/__init__.py +++ b/airbyte-cdk/python/airbyte_cdk/__init__.py @@ -47,6 +47,7 @@ from .sources.declarative.requesters.request_option import RequestOption, RequestOptionType +from .sources.declarative.requesters.request_options.default_request_options_provider import DefaultRequestOptionsProvider from .sources.declarative.requesters.request_options.interpolated_request_input_provider import InterpolatedRequestInputProvider from .sources.declarative.requesters.requester import HttpMethod from .sources.declarative.retrievers import SimpleRetriever @@ -133,6 +134,7 @@ "DeclarativeStream", "Decoder", "DefaultPaginator", + "DefaultRequestOptionsProvider", "DpathExtractor", "FieldPointer", "HttpMethod", diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 45ef5388c65d..7c64cb265853 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -140,7 +140,12 @@ StopConditionPaginationStrategyDecorator, ) from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType -from airbyte_cdk.sources.declarative.requesters.request_options import InterpolatedRequestOptionsProvider +from airbyte_cdk.sources.declarative.requesters.request_options import ( + DatetimeBasedRequestOptionsProvider, + DefaultRequestOptionsProvider, + InterpolatedRequestOptionsProvider, + RequestOptionsProvider, +) from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, SimpleRetriever, SimpleRetrieverTestReadDecorator @@ -653,6 +658,40 @@ def create_declarative_stream(self, model: DeclarativeStreamModel, 
config: Confi "per_partition_cursor": combined_slicers if isinstance(combined_slicers, PerPartitionCursor) else None, "is_global_substream_cursor": isinstance(combined_slicers, GlobalSubstreamCursor), } + + if model.incremental_sync and isinstance(model.incremental_sync, DatetimeBasedCursorModel): + cursor_model = model.incremental_sync + + end_time_option = ( + RequestOption( + inject_into=RequestOptionType(cursor_model.end_time_option.inject_into.value), + field_name=cursor_model.end_time_option.field_name, + parameters=cursor_model.parameters or {}, + ) + if cursor_model.end_time_option + else None + ) + start_time_option = ( + RequestOption( + inject_into=RequestOptionType(cursor_model.start_time_option.inject_into.value), + field_name=cursor_model.start_time_option.field_name, + parameters=cursor_model.parameters or {}, + ) + if cursor_model.start_time_option + else None + ) + + request_options_provider = DatetimeBasedRequestOptionsProvider( + start_time_option=start_time_option, + end_time_option=end_time_option, + partition_field_start=cursor_model.partition_field_end, + partition_field_end=cursor_model.partition_field_end, + config=config, + parameters=model.parameters or {}, + ) + else: + request_options_provider = None + transformations = [] if model.transformations: for transformation_model in model.transformations: @@ -663,6 +702,7 @@ def create_declarative_stream(self, model: DeclarativeStreamModel, config: Confi name=model.name, primary_key=primary_key, stream_slicer=combined_slicers, + request_options_provider=request_options_provider, stop_condition_on_cursor=stop_condition_on_cursor, client_side_incremental_sync=client_side_incremental_sync, transformations=transformations, @@ -1126,6 +1166,7 @@ def create_simple_retriever( name: str, primary_key: Optional[Union[str, List[str], List[List[str]]]], stream_slicer: Optional[StreamSlicer], + request_options_provider: Optional[RequestOptionsProvider] = None, stop_condition_on_cursor: bool = False, client_side_incremental_sync: Optional[Dict[str, Any]] = None, transformations: List[RecordTransformation], @@ -1140,11 +1181,21 @@ def create_simple_retriever( client_side_incremental_sync=client_side_incremental_sync, ) url_base = model.requester.url_base if hasattr(model.requester, "url_base") else requester.get_url_base() - stream_slicer = stream_slicer or SinglePartitionRouter(parameters={}) # Define cursor only if per partition or common incremental support is needed cursor = stream_slicer if isinstance(stream_slicer, DeclarativeCursor) else None + if not isinstance(stream_slicer, DatetimeBasedCursor) or type(stream_slicer) is not DatetimeBasedCursor: + # Many of the custom component implementations of DatetimeBasedCursor override get_request_params() (or other methods). + # Because we're decoupling RequestOptionsProvider from the Cursor, custom components will eventually need to reimplement + # their own RequestOptionsProvider. 
However, right now the existing StreamSlicer/Cursor still can act as the SimpleRetriever's + # request_options_provider + request_options_provider = stream_slicer or DefaultRequestOptionsProvider(parameters={}) + elif not request_options_provider: + request_options_provider = DefaultRequestOptionsProvider(parameters={}) + + stream_slicer = stream_slicer or SinglePartitionRouter(parameters={}) + cursor_used_for_stop_condition = cursor if stop_condition_on_cursor else None paginator = ( self._create_component_from_model( @@ -1168,6 +1219,7 @@ def create_simple_retriever( requester=requester, record_selector=record_selector, stream_slicer=stream_slicer, + request_option_provider=request_options_provider, cursor=cursor, config=config, maximum_number_of_slices=self._limit_slices_fetched or 5, @@ -1181,6 +1233,7 @@ def create_simple_retriever( requester=requester, record_selector=record_selector, stream_slicer=stream_slicer, + request_option_provider=request_options_provider, cursor=cursor, config=config, ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests, diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/__init__.py index 6a13ed7ab425..c6540e939ed6 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/__init__.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/__init__.py @@ -2,9 +2,13 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +from airbyte_cdk.sources.declarative.requesters.request_options.datetime_based_request_options_provider import ( + DatetimeBasedRequestOptionsProvider, +) +from airbyte_cdk.sources.declarative.requesters.request_options.default_request_options_provider import DefaultRequestOptionsProvider from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import ( InterpolatedRequestOptionsProvider, ) from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider -__all__ = ["InterpolatedRequestOptionsProvider", "RequestOptionsProvider"] +__all__ = ["DatetimeBasedRequestOptionsProvider", "DefaultRequestOptionsProvider", "InterpolatedRequestOptionsProvider", "RequestOptionsProvider"] diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/datetime_based_request_options_provider.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/datetime_based_request_options_provider.py new file mode 100644 index 000000000000..d9e86afcffb5 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/datetime_based_request_options_provider.py @@ -0,0 +1,78 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
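As a usage sketch (not part of the diff itself), the provider defined in this new file can be exercised roughly like the unit tests added later in this patch; the field names and dates below are illustrative only:

from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.requesters.request_options import DatetimeBasedRequestOptionsProvider
from airbyte_cdk.sources.declarative.types import StreamSlice

provider = DatetimeBasedRequestOptionsProvider(
    start_time_option=RequestOption(field_name="after", inject_into=RequestOptionType.request_parameter, parameters={}),
    end_time_option=RequestOption(field_name="before", inject_into=RequestOptionType.request_parameter, parameters={}),
    config={},
    parameters={},
)

# partition_field_start/partition_field_end were omitted, so the provider falls back to the
# default "start_time"/"end_time" keys when reading boundaries out of the cursor_slice.
stream_slice = StreamSlice(cursor_slice={"start_time": "2024-06-01", "end_time": "2024-06-02"}, partition={})
assert provider.get_request_params(stream_slice=stream_slice) == {"after": "2024-06-01", "before": "2024-06-02"}
assert provider.get_request_headers(stream_slice=stream_slice) == {}  # only request_parameter injection was configured
assert provider.get_request_params(stream_slice=None) == {}  # no slice yet, so nothing to inject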
+# + +from dataclasses import InitVar, dataclass +from typing import Any, Mapping, MutableMapping, Optional, Union + +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType +from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider +from airbyte_cdk.sources.types import Config, StreamSlice, StreamState + + +@dataclass +class DatetimeBasedRequestOptionsProvider(RequestOptionsProvider): + """ + Request options provider that extracts fields from the stream_slice and injects them into the respective location in the + outbound request being made + """ + + config: Config + parameters: InitVar[Mapping[str, Any]] + start_time_option: Optional[RequestOption] = None + end_time_option: Optional[RequestOption] = None + partition_field_start: Optional[str] = None + partition_field_end: Optional[str] = None + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self._partition_field_start = InterpolatedString.create(self.partition_field_start or "start_time", parameters=parameters) + self._partition_field_end = InterpolatedString.create(self.partition_field_end or "end_time", parameters=parameters) + + def get_request_params( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return self._get_request_options(RequestOptionType.request_parameter, stream_slice) + + def get_request_headers( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return self._get_request_options(RequestOptionType.header, stream_slice) + + def get_request_body_data( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Union[Mapping[str, Any], str]: + return self._get_request_options(RequestOptionType.body_data, stream_slice) + + def get_request_body_json( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return self._get_request_options(RequestOptionType.body_json, stream_slice) + + def _get_request_options(self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice]) -> Mapping[str, Any]: + options: MutableMapping[str, Any] = {} + if not stream_slice: + return options + if self.start_time_option and self.start_time_option.inject_into == option_type: + options[self.start_time_option.field_name.eval(config=self.config)] = stream_slice.get( # type: ignore # field_name is always casted to an interpolated string + self._partition_field_start.eval(self.config) + ) + if self.end_time_option and self.end_time_option.inject_into == option_type: + options[self.end_time_option.field_name.eval(config=self.config)] = stream_slice.get(self._partition_field_end.eval(self.config)) # type: ignore # field_name is always casted to an interpolated string + return options diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/default_request_options_provider.py 
b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/default_request_options_provider.py new file mode 100644 index 000000000000..42d8ee70a4b4 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/default_request_options_provider.py @@ -0,0 +1,58 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +from dataclasses import InitVar, dataclass +from typing import Any, Mapping, Optional, Union + +from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider +from airbyte_cdk.sources.types import StreamSlice, StreamState + + +@dataclass +class DefaultRequestOptionsProvider(RequestOptionsProvider): + """ + Request options provider that extracts fields from the stream_slice and injects them into the respective location in the + outbound request being made + """ + + parameters: InitVar[Mapping[str, Any]] + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + pass + + def get_request_params( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return {} + + def get_request_headers( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return {} + + def get_request_body_data( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Union[Mapping[str, Any], str]: + return {} + + def get_request_body_json( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return {} diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index ce0672988924..99639d8467ee 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -1,6 +1,7 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
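A behavioral note on the provider defined above: despite the docstring copied over from the datetime-based variant, DefaultRequestOptionsProvider injects nothing, which is what makes it a safe fallback when a stream has no incremental request options. A small illustrative sketch:

from airbyte_cdk.sources.declarative.requesters.request_options import DefaultRequestOptionsProvider

no_op_provider = DefaultRequestOptionsProvider(parameters={})

# Every getter returns an empty mapping, so outbound request options come solely from the
# requester and paginator when this provider is used as the retriever's default.
assert no_op_provider.get_request_params() == {}
assert no_op_provider.get_request_headers() == {}
assert no_op_provider.get_request_body_data() == {}
assert no_op_provider.get_request_body_json() == {}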
# + import json from dataclasses import InitVar, dataclass, field from functools import partial @@ -16,6 +17,7 @@ from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import SinglePartitionRouter from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator +from airbyte_cdk.sources.declarative.requesters.request_options import DefaultRequestOptionsProvider, RequestOptionsProvider from airbyte_cdk.sources.declarative.requesters.requester import Requester from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer @@ -61,6 +63,7 @@ class SimpleRetriever(Retriever): _primary_key: str = field(init=False, repr=False, default="") paginator: Optional[Paginator] = None stream_slicer: StreamSlicer = field(default_factory=lambda: SinglePartitionRouter(parameters={})) + request_option_provider: RequestOptionsProvider = field(default_factory=lambda: DefaultRequestOptionsProvider(parameters={})) cursor: Optional[DeclarativeCursor] = None ignore_stream_slicer_parameters_on_paginated_requests: bool = False @@ -158,7 +161,7 @@ def _request_params( stream_slice, next_page_token, self._paginator.get_request_params, - self.stream_slicer.get_request_params, + self.request_option_provider.get_request_params, ) if isinstance(params, str): raise ValueError("Request params cannot be a string") @@ -184,7 +187,7 @@ def _request_body_data( stream_slice, next_page_token, self._paginator.get_request_body_data, - self.stream_slicer.get_request_body_data, + self.request_option_provider.get_request_body_data, ) def _request_body_json( @@ -203,7 +206,7 @@ def _request_body_json( stream_slice, next_page_token, self._paginator.get_request_body_json, - self.stream_slicer.get_request_body_json, + self.request_option_provider.get_request_body_json, ) if isinstance(body_json, str): raise ValueError("Request body json cannot be a string") @@ -231,21 +234,21 @@ def _parse_response( ) -> Iterable[Record]: if not response: self._last_response = None - return [] - - self._last_response = response - record_generator = self.record_selector.select_records( - response=response, - stream_state=stream_state, - records_schema=records_schema, - stream_slice=stream_slice, - next_page_token=next_page_token, - ) - self._last_page_size = 0 - for record in record_generator: - self._last_page_size += 1 - self._last_record = record - yield record + yield from [] + else: + self._last_response = response + record_generator = self.record_selector.select_records( + response=response, + stream_state=stream_state, + records_schema=records_schema, + stream_slice=stream_slice, + next_page_token=next_page_token, + ) + self._last_page_size = 0 + for record in record_generator: + self._last_page_size += 1 + self._last_record = record + yield record @property # type: ignore def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py index c4daf96557c8..3098de78cfe1 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py @@ -1,6 +1,7 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
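The SimpleRetriever change above means request options are now merged from the paginator and the new request_option_provider rather than from the stream slicer. A sketch of that wiring, mirroring the style of the updated retriever tests (the mocks, stream name, and values are illustrative):

from unittest.mock import MagicMock

from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever

paginator = MagicMock()
paginator.get_request_params.return_value = {"offset": 1000}

request_option_provider = MagicMock()
request_option_provider.get_request_params.return_value = {"after": "2024-06-01"}

retriever = SimpleRetriever(
    name="a_stream",  # illustrative stream name
    primary_key="id",
    requester=MagicMock(),
    record_selector=MagicMock(),
    paginator=paginator,
    request_option_provider=request_option_provider,  # new field; defaults to DefaultRequestOptionsProvider
    parameters={},
    config={},
)

# Query parameters combine the paginator's and the provider's contributions; the stream
# slicer no longer participates in building outbound request options.
assert retriever._request_params(None, None, None) == {"after": "2024-06-01", "offset": 1000}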
# + import functools from abc import ABC, abstractmethod from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Protocol, Tuple @@ -143,6 +144,7 @@ def __init__( end_provider: Callable[[], CursorValueType], lookback_window: Optional[GapType] = None, slice_range: Optional[GapType] = None, + cursor_granularity: Optional[GapType] = None, ) -> None: self._stream_name = stream_name self._stream_namespace = stream_namespace @@ -159,6 +161,7 @@ def __init__( self.start, self._concurrent_state = self._get_concurrent_state(stream_state) self._lookback_window = lookback_window self._slice_range = slice_range + self._cursor_granularity = cursor_granularity @property def state(self) -> MutableMapping[str, Any]: @@ -312,7 +315,10 @@ def _split_per_slice_range(self, lower: CursorValueType, upper: CursorValueType) current_lower_boundary = lower while not stop_processing: current_upper_boundary = min(current_lower_boundary + self._slice_range, upper) - yield current_lower_boundary, current_upper_boundary + if self._cursor_granularity: + yield current_lower_boundary, current_upper_boundary - self._cursor_granularity + else: + yield current_lower_boundary, current_upper_boundary current_lower_boundary = current_upper_boundary if current_upper_boundary >= upper: stop_processing = True diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py index 226ee79c0404..6e50d6b2bcf7 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py @@ -142,8 +142,12 @@ class IsoMillisConcurrentStreamStateConverter(DateTimeStreamStateConverter): _zero_value = "0001-01-01T00:00:00.000Z" + def __init__(self, is_sequential_state: bool = True, cursor_granularity: Optional[timedelta] = None): + super().__init__(is_sequential_state=is_sequential_state) + self._cursor_granularity = cursor_granularity or timedelta(milliseconds=1) + def increment(self, timestamp: datetime) -> datetime: - return timestamp + timedelta(milliseconds=1) + return timestamp + self._cursor_granularity def output_format(self, timestamp: datetime) -> Any: return timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index d574ed8724e8..85b90752f3e7 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -24,7 +24,7 @@ from airbyte_cdk.sources.declarative.decoders import JsonDecoder from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector from airbyte_cdk.sources.declarative.extractors.record_filter import ClientSideIncrementalRecordFilterDecorator -from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor, PerPartitionCursor, ResumableFullRefreshCursor +from airbyte_cdk.sources.declarative.incremental import CursorFactory, DatetimeBasedCursor, PerPartitionCursor, ResumableFullRefreshCursor from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from 
airbyte_cdk.sources.declarative.models import CheckStream as CheckStreamModel from airbyte_cdk.sources.declarative.models import CompositeErrorHandler as CompositeErrorHandlerModel @@ -70,7 +70,11 @@ StopConditionPaginationStrategyDecorator, ) from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType -from airbyte_cdk.sources.declarative.requesters.request_options import InterpolatedRequestOptionsProvider +from airbyte_cdk.sources.declarative.requesters.request_options import ( + DatetimeBasedRequestOptionsProvider, + DefaultRequestOptionsProvider, + InterpolatedRequestOptionsProvider, +) from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever, SimpleRetrieverTestReadDecorator @@ -180,6 +184,14 @@ def test_full_config_stream(): step: "P10D" cursor_field: "created" cursor_granularity: "PT0.000001S" + start_time_option: + type: RequestOption + inject_into: request_parameter + field_name: after + end_time_option: + type: RequestOption + inject_into: request_parameter + field_name: before $parameters: datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" check: @@ -260,6 +272,14 @@ def test_full_config_stream(): assert stream.retriever.requester._path.string == "{{ next_page_token['next_page_url'] }}" assert stream.retriever.requester._path.default == "{{ next_page_token['next_page_url'] }}" + assert isinstance(stream.retriever.request_option_provider, DatetimeBasedRequestOptionsProvider) + assert stream.retriever.request_option_provider.start_time_option.inject_into == RequestOptionType.request_parameter + assert stream.retriever.request_option_provider.start_time_option.field_name.eval(config=input_config) == "after" + assert stream.retriever.request_option_provider.end_time_option.inject_into == RequestOptionType.request_parameter + assert stream.retriever.request_option_provider.end_time_option.field_name.eval(config=input_config) == "before" + assert stream.retriever.request_option_provider._partition_field_start.string == "start_time" + assert stream.retriever.request_option_provider._partition_field_end.string == "end_time" + assert isinstance(stream.retriever.requester.authenticator, BearerAuthenticator) assert stream.retriever.requester.authenticator.token_provider.get_token() == "verysecrettoken" @@ -2314,3 +2334,174 @@ def test_create_jwt_authenticator(config, manifest, expected): } ) assert authenticator._get_jwt_payload() == jwt_payload + + +def test_use_request_options_provider_for_datetime_based_cursor(): + config = { + "start_time": "2024-01-01T00:00:00.000000+0000", + } + + simple_retriever_model = { + "type": "SimpleRetriever", + "record_selector": { + "type": "RecordSelector", + "extractor": { + "type": "DpathExtractor", + "field_path": [], + }, + }, + "requester": {"type": "HttpRequester", "name": "list", "url_base": "orange.com", "path": "/v1/api"}, + } + + datetime_based_cursor = DatetimeBasedCursor( + start_datetime=MinMaxDatetime(datetime="{{ config.start_time }}", parameters={}), + step="P5D", + cursor_field="updated_at", + datetime_format="%Y-%m-%dT%H:%M:%S.%f%z", + cursor_granularity="PT1S", + is_compare_strictly=True, + config=config, + parameters={}, + ) + + datetime_based_request_options_provider = DatetimeBasedRequestOptionsProvider( + start_time_option=RequestOption( + inject_into=RequestOptionType.request_parameter, + field_name="after", + parameters={}, + ), + 
end_time_option=RequestOption( + inject_into=RequestOptionType.request_parameter, + field_name="before", + parameters={}, + ), + config=config, + parameters={}, + ) + + connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True) + retriever = connector_builder_factory.create_component( + model_type=SimpleRetrieverModel, + component_definition=simple_retriever_model, + config={}, + name="Test", + primary_key="id", + stream_slicer=datetime_based_cursor, + request_options_provider=datetime_based_request_options_provider, + transformations=[], + ) + + assert isinstance(retriever, SimpleRetriever) + assert retriever.primary_key == "id" + assert retriever.name == "Test" + + assert isinstance(retriever.cursor, DatetimeBasedCursor) + assert isinstance(retriever.stream_slicer, DatetimeBasedCursor) + + assert isinstance(retriever.request_option_provider, DatetimeBasedRequestOptionsProvider) + assert retriever.request_option_provider.start_time_option.inject_into == RequestOptionType.request_parameter + assert retriever.request_option_provider.start_time_option.field_name.eval(config=input_config) == "after" + assert retriever.request_option_provider.end_time_option.inject_into == RequestOptionType.request_parameter + assert retriever.request_option_provider.end_time_option.field_name.eval(config=input_config) == "before" + assert retriever.request_option_provider._partition_field_start.string == "start_time" + assert retriever.request_option_provider._partition_field_end.string == "end_time" + + +def test_do_not_separate_request_options_provider_for_non_datetime_based_cursor(): + # This test validates that we're only using the dedicated RequestOptionsProvider for DatetimeBasedCursor and using the + # existing StreamSlicer for other types of cursors and partition routing. 
Once everything is migrated this test can be deleted + + config = { + "start_time": "2024-01-01T00:00:00.000000+0000", + } + + simple_retriever_model = { + "type": "SimpleRetriever", + "record_selector": { + "type": "RecordSelector", + "extractor": { + "type": "DpathExtractor", + "field_path": [], + }, + }, + "requester": {"type": "HttpRequester", "name": "list", "url_base": "orange.com", "path": "/v1/api"}, + } + + datetime_based_cursor = DatetimeBasedCursor( + start_datetime=MinMaxDatetime(datetime="{{ config.start_time }}", parameters={}), + step="P5D", + cursor_field="updated_at", + datetime_format="%Y-%m-%dT%H:%M:%S.%f%z", + cursor_granularity="PT1S", + is_compare_strictly=True, + config=config, + parameters={}, + ) + + list_partition_router = ListPartitionRouter( + cursor_field="id", + values=["four", "oh", "eight"], + config=config, + parameters={}, + ) + + per_partition_cursor = PerPartitionCursor( + cursor_factory=CursorFactory(lambda: datetime_based_cursor), + partition_router=list_partition_router, + ) + + connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True) + retriever = connector_builder_factory.create_component( + model_type=SimpleRetrieverModel, + component_definition=simple_retriever_model, + config={}, + name="Test", + primary_key="id", + stream_slicer=per_partition_cursor, + request_options_provider=None, + transformations=[], + ) + + assert isinstance(retriever, SimpleRetriever) + assert retriever.primary_key == "id" + assert retriever.name == "Test" + + assert isinstance(retriever.cursor, PerPartitionCursor) + assert isinstance(retriever.stream_slicer, PerPartitionCursor) + + assert isinstance(retriever.request_option_provider, PerPartitionCursor) + assert isinstance(retriever.request_option_provider._cursor_factory, CursorFactory) + assert retriever.request_option_provider._partition_router == list_partition_router + + +def test_use_default_request_options_provider(): + simple_retriever_model = { + "type": "SimpleRetriever", + "record_selector": { + "type": "RecordSelector", + "extractor": { + "type": "DpathExtractor", + "field_path": [], + }, + }, + "requester": {"type": "HttpRequester", "name": "list", "url_base": "orange.com", "path": "/v1/api"}, + } + + connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True) + retriever = connector_builder_factory.create_component( + model_type=SimpleRetrieverModel, + component_definition=simple_retriever_model, + config={}, + name="Test", + primary_key="id", + stream_slicer=None, + request_options_provider=None, + transformations=[], + ) + + assert isinstance(retriever, SimpleRetriever) + assert retriever.primary_key == "id" + assert retriever.name == "Test" + + assert isinstance(retriever.stream_slicer, SinglePartitionRouter) + assert isinstance(retriever.request_option_provider, DefaultRequestOptionsProvider) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_options/test_datetime_based_request_options_provider.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_options/test_datetime_based_request_options_provider.py new file mode 100644 index 000000000000..816f83f94f8a --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/request_options/test_datetime_based_request_options_provider.py @@ -0,0 +1,120 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+# + +import pytest +from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType +from airbyte_cdk.sources.declarative.requesters.request_options import DatetimeBasedRequestOptionsProvider +from airbyte_cdk.sources.declarative.types import StreamSlice + + +@pytest.mark.parametrize( + "start_time_option, end_time_option, partition_field_start, partition_field_end, stream_slice, expected_request_options", + [ + pytest.param( + RequestOption(field_name="after", inject_into=RequestOptionType.request_parameter, parameters={}), + RequestOption(field_name="before", inject_into=RequestOptionType.request_parameter, parameters={}), + "custom_start", + "custom_end", + StreamSlice(cursor_slice={"custom_start": "2024-06-01", "custom_end": "2024-06-02"}, partition={}), + {"after": "2024-06-01", "before": "2024-06-02"}, + id="test_request_params", + ), + pytest.param( + RequestOption(field_name="after", inject_into=RequestOptionType.request_parameter, parameters={}), + RequestOption(field_name="before", inject_into=RequestOptionType.request_parameter, parameters={}), + None, + None, + StreamSlice(cursor_slice={"start_time": "2024-06-01", "end_time": "2024-06-02"}, partition={}), + {"after": "2024-06-01", "before": "2024-06-02"}, + id="test_request_params_with_default_partition_fields", + ), + pytest.param( + None, + RequestOption(field_name="before", inject_into=RequestOptionType.request_parameter, parameters={}), + None, + None, + StreamSlice(cursor_slice={"start_time": "2024-06-01", "end_time": "2024-06-02"}, partition={}), + {"before": "2024-06-02"}, + id="test_request_params_no_start_time_option", + ), + pytest.param( + RequestOption(field_name="after", inject_into=RequestOptionType.request_parameter, parameters={}), + None, + None, + None, + StreamSlice(cursor_slice={"start_time": "2024-06-01", "end_time": "2024-06-02"}, partition={}), + {"after": "2024-06-01"}, + id="test_request_params_no_end_time_option", + ), + pytest.param( + RequestOption(field_name="after", inject_into=RequestOptionType.request_parameter, parameters={}), + RequestOption(field_name="before", inject_into=RequestOptionType.request_parameter, parameters={}), + None, + None, + None, + {}, + id="test_request_params_no_slice", + ), + pytest.param( + RequestOption(field_name="after", inject_into=RequestOptionType.header, parameters={}), + RequestOption(field_name="before", inject_into=RequestOptionType.header, parameters={}), + "custom_start", + "custom_end", + StreamSlice(cursor_slice={"custom_start": "2024-06-01", "custom_end": "2024-06-02"}, partition={}), + {"after": "2024-06-01", "before": "2024-06-02"}, + id="test_request_headers", + ), + pytest.param( + RequestOption(field_name="after", inject_into=RequestOptionType.body_data, parameters={}), + RequestOption(field_name="before", inject_into=RequestOptionType.body_data, parameters={}), + "custom_start", + "custom_end", + StreamSlice(cursor_slice={"custom_start": "2024-06-01", "custom_end": "2024-06-02"}, partition={}), + {"after": "2024-06-01", "before": "2024-06-02"}, + id="test_request_request_body_data", + ), + pytest.param( + RequestOption(field_name="after", inject_into=RequestOptionType.body_json, parameters={}), + RequestOption(field_name="before", inject_into=RequestOptionType.body_json, parameters={}), + "custom_start", + "custom_end", + StreamSlice(cursor_slice={"custom_start": "2024-06-01", "custom_end": "2024-06-02"}, partition={}), + {"after": "2024-06-01", "before": "2024-06-02"}, + id="test_request_request_body_json", + 
), + ], +) +def test_datetime_based_request_options_provider( + start_time_option, + end_time_option, + partition_field_start, + partition_field_end, + stream_slice, + expected_request_options +): + config = {} + request_options_provider = DatetimeBasedRequestOptionsProvider( + start_time_option=start_time_option, + end_time_option=end_time_option, + partition_field_start=partition_field_start, + partition_field_end=partition_field_end, + config=config, + parameters={} + ) + + request_option_type = start_time_option.inject_into if isinstance(start_time_option, RequestOption) else None + match request_option_type: + case RequestOptionType.request_parameter: + actual_request_options = request_options_provider.get_request_params(stream_slice=stream_slice) + case RequestOptionType.header: + actual_request_options = request_options_provider.get_request_headers(stream_slice=stream_slice) + case RequestOptionType.body_data: + actual_request_options = request_options_provider.get_request_body_data(stream_slice=stream_slice) + case RequestOptionType.body_json: + actual_request_options = request_options_provider.get_request_body_json(stream_slice=stream_slice) + case _: + # We defer to testing the default RequestOptions using get_request_params() + actual_request_options = request_options_provider.get_request_params(stream_slice=stream_slice) + + assert actual_request_options == expected_request_options diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index fd3db0452f04..2fd0594bf1b0 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -19,10 +19,10 @@ from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever, SimpleRetrieverTestReadDecorator -from airbyte_cdk.sources.types import Record +from airbyte_cdk.sources.types import Record, StreamSlice A_SLICE_STATE = {"slice_state": "slice state value"} -A_STREAM_SLICE = {"stream slice": "slice value"} +A_STREAM_SLICE = StreamSlice(cursor_slice={"stream slice": "slice value"}, partition={}) A_STREAM_STATE = {"stream state": "state value"} primary_key = "pk" @@ -370,7 +370,7 @@ def test_simple_retriever_resumable_full_refresh_cursor_reset_skip_completed_str @pytest.mark.parametrize( - "test_name, paginator_mapping, stream_slicer_mapping, expected_mapping", + "test_name, paginator_mapping, request_options_provider_mapping, expected_mapping", [ ("test_empty_headers", {}, {}, {}), ("test_header_from_pagination_and_slicer", {"offset": 1000}, {"key": "value"}, {"key": "value", "offset": 1000}), @@ -378,17 +378,17 @@ def test_simple_retriever_resumable_full_refresh_cursor_reset_skip_completed_str ("test_duplicate_header_slicer_paginator", {"k": "v"}, {"k": "slice_value"}, None), ], ) -def test_get_request_options_from_pagination(test_name, paginator_mapping, stream_slicer_mapping, expected_mapping): +def test_get_request_options_from_pagination(test_name, paginator_mapping, request_options_provider_mapping, expected_mapping): # This test does not test request headers because they must be strings paginator = MagicMock() paginator.get_request_params.return_value = paginator_mapping 
paginator.get_request_body_data.return_value = paginator_mapping paginator.get_request_body_json.return_value = paginator_mapping - stream_slicer = MagicMock() - stream_slicer.get_request_params.return_value = stream_slicer_mapping - stream_slicer.get_request_body_data.return_value = stream_slicer_mapping - stream_slicer.get_request_body_json.return_value = stream_slicer_mapping + request_options_provider = MagicMock() + request_options_provider.get_request_params.return_value = request_options_provider_mapping + request_options_provider.get_request_body_data.return_value = request_options_provider_mapping + request_options_provider.get_request_body_json.return_value = request_options_provider_mapping record_selector = MagicMock() retriever = SimpleRetriever( @@ -397,7 +397,7 @@ def test_get_request_options_from_pagination(test_name, paginator_mapping, strea requester=MagicMock(), record_selector=record_selector, paginator=paginator, - stream_slicer=stream_slicer, + request_option_provider=request_options_provider, parameters={}, config={}, ) @@ -532,7 +532,7 @@ def test_ignore_stream_slicer_parameters_on_paginated_requests( @pytest.mark.parametrize( - "test_name, slicer_body_data, paginator_body_data, expected_body_data", + "test_name, request_options_provider_body_data, paginator_body_data, expected_body_data", [ ("test_only_slicer_mapping", {"key": "value"}, {}, {"key": "value"}), ("test_only_slicer_string", "key=value", {}, "key=value"), @@ -541,13 +541,15 @@ def test_ignore_stream_slicer_parameters_on_paginated_requests( ("test_slicer_string_and_paginator", "key=value", {"offset": 1000}, None), ], ) -def test_request_body_data(test_name, slicer_body_data, paginator_body_data, expected_body_data): +def test_request_body_data(test_name, request_options_provider_body_data, paginator_body_data, expected_body_data): paginator = MagicMock() paginator.get_request_body_data.return_value = paginator_body_data requester = MagicMock(use_cache=False) - stream_slicer = MagicMock() - stream_slicer.get_request_body_data.return_value = slicer_body_data + # stream_slicer = MagicMock() + # stream_slicer.get_request_body_data.return_value = request_options_provider_body_data + request_option_provider = MagicMock() + request_option_provider.get_request_body_data.return_value = request_options_provider_body_data record_selector = MagicMock() retriever = SimpleRetriever( @@ -556,7 +558,7 @@ def test_request_body_data(test_name, slicer_body_data, paginator_body_data, exp requester=requester, record_selector=record_selector, paginator=paginator, - stream_slicer=stream_slicer, + request_option_provider=request_option_provider, parameters={}, config={}, ) @@ -630,8 +632,8 @@ def test_limit_stream_slices(): ], ) def test_when_read_records_then_cursor_close_slice_with_greater_record(test_name, first_greater_than_second): - first_record = Record({"first": 1}, {}) - second_record = Record({"second": 2}, {}) + first_record = Record({"first": 1}, StreamSlice(cursor_slice={}, partition={})) + second_record = Record({"second": 2}, StreamSlice(cursor_slice={}, partition={})) records = [first_record, second_record] record_selector = MagicMock() record_selector.select_records.return_value = records @@ -651,7 +653,7 @@ def test_when_read_records_then_cursor_close_slice_with_greater_record(test_name parameters={}, config={}, ) - stream_slice = {"repository": "airbyte"} + stream_slice = StreamSlice(cursor_slice={}, partition={"repository": "airbyte"}) def retriever_read_pages(_, __, ___): return 
retriever._parse_records(response=MagicMock(), stream_state={}, stream_slice=stream_slice, records_schema={}) @@ -683,7 +685,7 @@ def test_given_stream_data_is_not_record_when_read_records_then_update_slice_wit parameters={}, config={}, ) - stream_slice = {"repository": "airbyte"} + stream_slice = StreamSlice(cursor_slice={}, partition={"repository": "airbyte"}) def retriever_read_pages(_, __, ___): return retriever._parse_records(response=MagicMock(), stream_state={}, stream_slice=stream_slice, records_schema={}) @@ -754,7 +756,7 @@ def test_emit_log_request_response_messages(mocker): config={}, ) - retriever._fetch_next_page(stream_state={}, stream_slice={}) + retriever._fetch_next_page(stream_state={}, stream_slice=StreamSlice(cursor_slice={}, partition={})) assert requester.send_request.call_args_list[0][1]["log_formatter"] is not None assert requester.send_request.call_args_list[0][1]["log_formatter"](response) == format_http_message_mock.return_value diff --git a/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_cursor.py b/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_cursor.py index 3f511c7b51da..5d7d318ef5d5 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_cursor.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_cursor.py @@ -1,7 +1,9 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # + from datetime import datetime, timedelta, timezone +from functools import partial from typing import Any, Mapping, Optional from unittest import TestCase from unittest.mock import Mock @@ -9,12 +11,18 @@ import freezegun import pytest from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager +from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime +from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor from airbyte_cdk.sources.message import MessageRepository from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField, CursorValueType from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition from airbyte_cdk.sources.streams.concurrent.partitions.record import Record from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import ConcurrencyCompatibleStateType -from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import EpochValueConcurrentStreamStateConverter +from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import ( + EpochValueConcurrentStreamStateConverter, + IsoMillisConcurrentStreamStateConverter, +) +from isodate import parse_duration _A_STREAM_NAME = "a stream name" _A_STREAM_NAMESPACE = "a stream namespace" @@ -422,3 +430,459 @@ def test_given_start_is_before_first_slice_lower_boundary_when_generate_slices_t (datetime.fromtimestamp(0, timezone.utc), datetime.fromtimestamp(10, timezone.utc)), (datetime.fromtimestamp(20, timezone.utc), datetime.fromtimestamp(50, timezone.utc)), ] + + +@freezegun.freeze_time(time_to_freeze=datetime(2024, 4, 1, 0, 0, 0, 0, tzinfo=timezone.utc)) +@pytest.mark.parametrize( + "start_datetime,end_datetime,step,cursor_field,lookback_window,state,expected_slices", + [ + pytest.param( + "{{ config.start_time }}", + "{{ config.end_time or now_utc() }}", + "P10D", + "updated_at", + "P5D", + {}, + [ + (datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc), datetime(2024, 1, 10, 23, 59, 59, tzinfo=timezone.utc)), + (datetime(2024, 
1, 11, 0, 0, tzinfo=timezone.utc), datetime(2024, 1, 20, 23, 59, 59, tzinfo=timezone.utc)), + (datetime(2024, 1, 21, 0, 0, tzinfo=timezone.utc), datetime(2024, 1, 30, 23, 59, 59, tzinfo=timezone.utc)), + (datetime(2024, 1, 31, 0, 0, tzinfo=timezone.utc), datetime(2024, 2, 9, 23, 59, 59, tzinfo=timezone.utc)), + (datetime(2024, 2, 10, 0, 0, tzinfo=timezone.utc), datetime(2024, 2, 19, 23, 59, 59, tzinfo=timezone.utc)), + (datetime(2024, 2, 20, 0, 0, tzinfo=timezone.utc), datetime(2024, 2, 29, 23, 59, 59, tzinfo=timezone.utc)) + ], + id="test_datetime_based_cursor_all_fields", + ), + pytest.param( + "{{ config.start_time }}", + "{{ config.end_time or '2024-01-01T00:00:00.000000+0000' }}", + "P10D", + "updated_at", + "P5D", + { + "slices": [ + { + "start": "2024-01-01T00:00:00.000000+0000", + "end": "2024-02-10T00:00:00.000000+0000", + } + ], + "state_type": "date-range" + }, + [ + (datetime(2024, 2, 5, 0, 0, 0, tzinfo=timezone.utc), datetime(2024, 2, 14, 23, 59, 59, tzinfo=timezone.utc)), + (datetime(2024, 2, 15, 0, 0, 0, tzinfo=timezone.utc), datetime(2024, 2, 24, 23, 59, 59, tzinfo=timezone.utc)), + (datetime(2024, 2, 25, 0, 0, 0, tzinfo=timezone.utc), datetime(2024, 2, 29, 23, 59, 59, tzinfo=timezone.utc)) + ], + id="test_datetime_based_cursor_with_state", + ), + pytest.param( + "{{ config.start_time }}", + "{{ config.missing or now_utc().strftime('%Y-%m-%dT%H:%M:%S.%fZ') }}", + "P20D", + "updated_at", + "P1D", + { + "slices": [ + { + "start": "2024-01-01T00:00:00.000000+0000", + "end": "2024-01-21T00:00:00.000000+0000", + } + ], + "state_type": "date-range" + }, + [ + (datetime(2024, 1, 20, 0, 0, tzinfo=timezone.utc), datetime(2024, 2, 8, 23, 59, 59, tzinfo=timezone.utc)), + (datetime(2024, 2, 9, 0, 0, tzinfo=timezone.utc), datetime(2024, 2, 28, 23, 59, 59, tzinfo=timezone.utc)), + (datetime(2024, 2, 29, 0, 0, tzinfo=timezone.utc), datetime(2024, 3, 19, 23, 59, 59, tzinfo=timezone.utc)), + (datetime(2024, 3, 20, 0, 0, tzinfo=timezone.utc), datetime(2024, 3, 31, 23, 59, 59, tzinfo=timezone.utc)), + ], + id="test_datetime_based_cursor_with_state_and_end_date", + ), + pytest.param( + "{{ config.start_time }}", + "{{ config.end_time }}", + "P1M", + "updated_at", + "P5D", + {}, + [ + (datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc), datetime(2024, 1, 31, 23, 59, 59, tzinfo=timezone.utc)), + (datetime(2024, 2, 1, 0, 0, 0, tzinfo=timezone.utc), datetime(2024, 2, 29, 23, 59, 59, tzinfo=timezone.utc)), + ], + id="test_datetime_based_cursor_using_large_step_duration", + ), + ] +) +def test_generate_slices_concurrent_cursor_from_datetime_based_cursor( + start_datetime, + end_datetime, + step, + cursor_field, + lookback_window, + state, + expected_slices, +): + message_repository = Mock(spec=MessageRepository) + state_manager = Mock(spec=ConnectorStateManager) + + config = { + "start_time": "2024-01-01T00:00:00.000000+0000", + "end_time": "2024-03-01T00:00:00.000000+0000", + } + + datetime_based_cursor = DatetimeBasedCursor( + start_datetime=MinMaxDatetime(datetime=start_datetime, parameters={}), + end_datetime=MinMaxDatetime(datetime=end_datetime, parameters={}), + step=step, + cursor_field=cursor_field, + partition_field_start="start", + partition_field_end="end", + datetime_format="%Y-%m-%dT%H:%M:%S.%f%z", + cursor_granularity="PT1S", + lookback_window=lookback_window, + is_compare_strictly=True, + config=config, + parameters={}, + ) + + # I don't love that we're back to this inching close to interpolation at parse time instead of runtime + # We also might need to add a wrapped class that 
exposes these fields publicly or live with ugly private access + interpolated_state_date = datetime_based_cursor._start_datetime + start_date = interpolated_state_date.get_datetime(config=config) + + interpolated_end_date = datetime_based_cursor._end_datetime + interpolated_end_date_provider = partial(interpolated_end_date.get_datetime, config) + + interpolated_cursor_field = datetime_based_cursor.cursor_field + cursor_field = CursorField(cursor_field_key=interpolated_cursor_field.eval(config=config)) + + lower_slice_boundary = datetime_based_cursor._partition_field_start.eval(config=config) + upper_slice_boundary = datetime_based_cursor._partition_field_end.eval(config=config) + slice_boundary_fields = (lower_slice_boundary, upper_slice_boundary) + + # DatetimeBasedCursor returns an isodate.Duration if step uses month or year precision. This still works in our + # code, but mypy may complain when we actually implement this in the concurrent low-code source. To fix this, we + # may need to convert a Duration to timedelta by multiplying month by 30 (but could lose precision). + step_length = datetime_based_cursor._step + + lookback_window = parse_duration(datetime_based_cursor.lookback_window) if datetime_based_cursor.lookback_window else None + + cursor_granularity = parse_duration(datetime_based_cursor.cursor_granularity) if datetime_based_cursor.cursor_granularity else None + + cursor = ConcurrentCursor( + stream_name=_A_STREAM_NAME, + stream_namespace=_A_STREAM_NAMESPACE, + stream_state=state, + message_repository=message_repository, + connector_state_manager=state_manager, + connector_state_converter=IsoMillisConcurrentStreamStateConverter(is_sequential_state=True), + cursor_field=cursor_field, + slice_boundary_fields=slice_boundary_fields, + start=start_date, + end_provider=interpolated_end_date_provider, + lookback_window=lookback_window, + slice_range=step_length, + cursor_granularity=cursor_granularity, + ) + + actual_slices = list(cursor.generate_slices()) + assert actual_slices == expected_slices + + +@freezegun.freeze_time(time_to_freeze=datetime(2024, 9, 1, 0, 0, 0, 0, tzinfo=timezone.utc)) +def test_observe_concurrent_cursor_from_datetime_based_cursor(): + message_repository = Mock(spec=MessageRepository) + state_manager = Mock(spec=ConnectorStateManager) + + config = { + "start_time": "2024-08-01T00:00:00.000000+0000", + "dynamic_cursor_key": "updated_at" + } + + datetime_based_cursor = DatetimeBasedCursor( + start_datetime=MinMaxDatetime(datetime="{{ config.start_time }}", parameters={}), + cursor_field="{{ config.dynamic_cursor_key }}", + datetime_format="%Y-%m-%dT%H:%M:%S.%f%z", + config=config, + parameters={}, + ) + + interpolated_state_date = datetime_based_cursor._start_datetime + start_date = interpolated_state_date.get_datetime(config=config) + + interpolated_cursor_field = datetime_based_cursor.cursor_field + cursor_field = CursorField(cursor_field_key=interpolated_cursor_field.eval(config=config)) + + step_length = datetime_based_cursor._step + + concurrent_cursor = ConcurrentCursor( + stream_name="gods", + stream_namespace=_A_STREAM_NAMESPACE, + stream_state={}, + message_repository=message_repository, + connector_state_manager=state_manager, + connector_state_converter=IsoMillisConcurrentStreamStateConverter(is_sequential_state=True), + cursor_field=cursor_field, + slice_boundary_fields=None, + start=start_date, + end_provider=IsoMillisConcurrentStreamStateConverter.get_end_provider(), + slice_range=step_length, + ) + + record_1 = Record( + stream_name="gods", 
data={"id": "999", "updated_at": "2024-08-23T00:00:00.000000+0000", "name": "kratos", "mythology": "greek"}, + ) + record_2 = Record( + stream_name="gods", data={"id": "1000", "updated_at": "2024-08-22T00:00:00.000000+0000", "name": "odin", "mythology": "norse"}, + ) + record_3 = Record( + stream_name="gods", data={"id": "500", "updated_at": "2024-08-24T00:00:00.000000+0000", "name": "freya", "mythology": "norse"}, + ) + + concurrent_cursor.observe(record_1) + actual_most_recent_record = concurrent_cursor._most_recent_record + assert actual_most_recent_record == record_1 + + concurrent_cursor.observe(record_2) + actual_most_recent_record = concurrent_cursor._most_recent_record + assert actual_most_recent_record == record_1 + + concurrent_cursor.observe(record_3) + actual_most_recent_record = concurrent_cursor._most_recent_record + assert actual_most_recent_record == record_3 + + +@freezegun.freeze_time(time_to_freeze=datetime(2024, 9, 1, 0, 0, 0, 0, tzinfo=timezone.utc)) +def test_close_partition_concurrent_cursor_from_datetime_based_cursor(): + message_repository = Mock(spec=MessageRepository) + state_manager = Mock(spec=ConnectorStateManager) + + config = { + "start_time": "2024-08-01T00:00:00.000000+0000", + "dynamic_cursor_key": "updated_at" + } + + datetime_based_cursor = DatetimeBasedCursor( + start_datetime=MinMaxDatetime(datetime="{{ config.start_time }}", parameters={}), + cursor_field="{{ config.dynamic_cursor_key }}", + datetime_format="%Y-%m-%dT%H:%M:%S.%f%z", + config=config, + parameters={}, + ) + + interpolated_state_date = datetime_based_cursor._start_datetime + start_date = interpolated_state_date.get_datetime(config=config) + + interpolated_cursor_field = datetime_based_cursor.cursor_field + cursor_field = CursorField(cursor_field_key=interpolated_cursor_field.eval(config=config)) + + step_length = datetime_based_cursor._step + + concurrent_cursor = ConcurrentCursor( + stream_name="gods", + stream_namespace=_A_STREAM_NAMESPACE, + stream_state={}, + message_repository=message_repository, + connector_state_manager=state_manager, + connector_state_converter=IsoMillisConcurrentStreamStateConverter(is_sequential_state=False), + cursor_field=cursor_field, + slice_boundary_fields=None, + start=start_date, + end_provider=IsoMillisConcurrentStreamStateConverter.get_end_provider(), + slice_range=step_length, + ) + + partition = _partition( + {_LOWER_SLICE_BOUNDARY_FIELD: "2024-08-01T00:00:00.000000+0000", _UPPER_SLICE_BOUNDARY_FIELD: "2024-09-01T00:00:00.000000+0000"}, + ) + + record_1 = Record( + stream_name="gods", data={"id": "999", "updated_at": "2024-08-23T00:00:00.000000+0000", "name": "kratos", "mythology": "greek"}, + ) + concurrent_cursor.observe(record_1) + + concurrent_cursor.close_partition(partition) + + message_repository.emit_message.assert_called_once_with(state_manager.create_state_message.return_value) + state_manager.update_state_for_stream.assert_called_once_with( + "gods", + _A_STREAM_NAMESPACE, + { + "slices": [{"end": "2024-08-23T00:00:00.000Z", "start": "2024-08-01T00:00:00.000Z"}], + "state_type": "date-range" + }, + ) + + +@freezegun.freeze_time(time_to_freeze=datetime(2024, 9, 1, 0, 0, 0, 0, tzinfo=timezone.utc)) +def test_close_partition_with_slice_range_concurrent_cursor_from_datetime_based_cursor(): + message_repository = Mock(spec=MessageRepository) + state_manager = Mock(spec=ConnectorStateManager) + + config = { + "start_time": "2024-07-01T00:00:00.000000+0000", + "dynamic_cursor_key": "updated_at" + } + + datetime_based_cursor = 
DatetimeBasedCursor( + start_datetime=MinMaxDatetime(datetime="{{ config.start_time }}", parameters={}), + cursor_field="{{ config.dynamic_cursor_key }}", + datetime_format="%Y-%m-%dT%H:%M:%S.%f%z", + step="P15D", + cursor_granularity="P1D", + config=config, + parameters={}, + ) + + interpolated_state_date = datetime_based_cursor._start_datetime + start_date = interpolated_state_date.get_datetime(config=config) + + interpolated_cursor_field = datetime_based_cursor.cursor_field + cursor_field = CursorField(cursor_field_key=interpolated_cursor_field.eval(config=config)) + + lower_slice_boundary = datetime_based_cursor._partition_field_start.eval(config=config) + upper_slice_boundary = datetime_based_cursor._partition_field_end.eval(config=config) + slice_boundary_fields = (lower_slice_boundary, upper_slice_boundary) + + step_length = datetime_based_cursor._step + + concurrent_cursor = ConcurrentCursor( + stream_name="gods", + stream_namespace=_A_STREAM_NAMESPACE, + stream_state={}, + message_repository=message_repository, + connector_state_manager=state_manager, + connector_state_converter=IsoMillisConcurrentStreamStateConverter(is_sequential_state=False, cursor_granularity=None), + cursor_field=cursor_field, + slice_boundary_fields=slice_boundary_fields, + start=start_date, + slice_range=step_length, + cursor_granularity=None, + end_provider=IsoMillisConcurrentStreamStateConverter.get_end_provider(), + ) + + partition_0 = _partition( + {"start_time": "2024-07-01T00:00:00.000000+0000", "end_time": "2024-07-16T00:00:00.000000+0000"}, + ) + partition_3 = _partition( + {"start_time": "2024-08-15T00:00:00.000000+0000", "end_time": "2024-08-30T00:00:00.000000+0000"}, + ) + record_1 = Record( + stream_name="gods", data={"id": "1000", "updated_at": "2024-07-05T00:00:00.000000+0000", "name": "loki", "mythology": "norse"}, + ) + record_2 = Record( + stream_name="gods", data={"id": "999", "updated_at": "2024-08-20T00:00:00.000000+0000", "name": "kratos", "mythology": "greek"}, + ) + + concurrent_cursor.observe(record_1) + concurrent_cursor.close_partition(partition_0) + concurrent_cursor.observe(record_2) + concurrent_cursor.close_partition(partition_3) + + message_repository.emit_message.assert_called_with(state_manager.create_state_message.return_value) + assert message_repository.emit_message.call_count == 2 + state_manager.update_state_for_stream.assert_called_with( + "gods", + _A_STREAM_NAMESPACE, + { + "slices": [ + {"start": "2024-07-01T00:00:00.000Z", "end": "2024-07-16T00:00:00.000Z"}, + {"start": "2024-08-15T00:00:00.000Z", "end": "2024-08-30T00:00:00.000Z"} + + ], + "state_type": "date-range" + }, + ) + assert state_manager.update_state_for_stream.call_count == 2 + + +@freezegun.freeze_time(time_to_freeze=datetime(2024, 9, 1, 0, 0, 0, 0, tzinfo=timezone.utc)) +def test_close_partition_with_slice_range_granularity_concurrent_cursor_from_datetime_based_cursor(): + message_repository = Mock(spec=MessageRepository) + state_manager = Mock(spec=ConnectorStateManager) + + config = { + "start_time": "2024-07-01T00:00:00.000000+0000", + "dynamic_cursor_key": "updated_at" + } + + datetime_based_cursor = DatetimeBasedCursor( + start_datetime=MinMaxDatetime(datetime="{{ config.start_time }}", parameters={}), + cursor_field="{{ config.dynamic_cursor_key }}", + datetime_format="%Y-%m-%dT%H:%M:%S.%f%z", + step="P15D", + cursor_granularity="P1D", + config=config, + parameters={}, + ) + + interpolated_state_date = datetime_based_cursor._start_datetime + start_date = 
interpolated_state_date.get_datetime(config=config) + + interpolated_cursor_field = datetime_based_cursor.cursor_field + cursor_field = CursorField(cursor_field_key=interpolated_cursor_field.eval(config=config)) + + lower_slice_boundary = datetime_based_cursor._partition_field_start.eval(config=config) + upper_slice_boundary = datetime_based_cursor._partition_field_end.eval(config=config) + slice_boundary_fields = (lower_slice_boundary, upper_slice_boundary) + + step_length = datetime_based_cursor._step + + cursor_granularity = parse_duration(datetime_based_cursor.cursor_granularity) if datetime_based_cursor.cursor_granularity else None + + concurrent_cursor = ConcurrentCursor( + stream_name="gods", + stream_namespace=_A_STREAM_NAMESPACE, + stream_state={}, + message_repository=message_repository, + connector_state_manager=state_manager, + connector_state_converter=IsoMillisConcurrentStreamStateConverter(is_sequential_state=False, cursor_granularity=cursor_granularity), + cursor_field=cursor_field, + slice_boundary_fields=slice_boundary_fields, + start=start_date, + slice_range=step_length, + cursor_granularity=cursor_granularity, + end_provider=IsoMillisConcurrentStreamStateConverter.get_end_provider(), + ) + + partition_0 = _partition( + {"start_time": "2024-07-01T00:00:00.000000+0000", "end_time": "2024-07-15T00:00:00.000000+0000"}, + ) + partition_1 = _partition( + {"start_time": "2024-07-16T00:00:00.000000+0000", "end_time": "2024-07-31T00:00:00.000000+0000"}, + ) + partition_3 = _partition( + {"start_time": "2024-08-15T00:00:00.000000+0000", "end_time": "2024-08-29T00:00:00.000000+0000"}, + ) + record_1 = Record( + stream_name="gods", data={"id": "1000", "updated_at": "2024-07-05T00:00:00.000000+0000", "name": "loki", "mythology": "norse"}, + ) + record_2 = Record( + stream_name="gods", data={"id": "2000", "updated_at": "2024-07-25T00:00:00.000000+0000", "name": "freya", "mythology": "norse"}, + ) + record_3 = Record( + stream_name="gods", data={"id": "999", "updated_at": "2024-08-20T00:00:00.000000+0000", "name": "kratos", "mythology": "greek"}, + ) + + concurrent_cursor.observe(record_1) + concurrent_cursor.close_partition(partition_0) + concurrent_cursor.observe(record_2) + concurrent_cursor.close_partition(partition_1) + concurrent_cursor.observe(record_3) + concurrent_cursor.close_partition(partition_3) + + message_repository.emit_message.assert_called_with(state_manager.create_state_message.return_value) + assert message_repository.emit_message.call_count == 3 + state_manager.update_state_for_stream.assert_called_with( + "gods", + _A_STREAM_NAMESPACE, + { + "slices": [ + {"start": "2024-07-01T00:00:00.000Z", "end": "2024-07-31T00:00:00.000Z"}, + {"start": "2024-08-15T00:00:00.000Z", "end": "2024-08-29T00:00:00.000Z"} + + ], + "state_type": "date-range" + }, + ) + assert state_manager.update_state_for_stream.call_count == 3
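Taken together, the tests above exercise the new cursor_granularity plumbing end to end. As a condensed sketch (stream name, dates, and boundary field names are illustrative, and the mocks follow the tests' style), a ConcurrentCursor built with a granularity pulls each slice's upper bound back by one granularity unit so adjacent slices no longer share a boundary instant:

from datetime import datetime, timedelta, timezone
from unittest.mock import Mock

from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
    IsoMillisConcurrentStreamStateConverter,
)

cursor = ConcurrentCursor(
    stream_name="a_stream",  # illustrative
    stream_namespace=None,
    stream_state={},
    message_repository=Mock(spec=MessageRepository),
    connector_state_manager=Mock(spec=ConnectorStateManager),
    connector_state_converter=IsoMillisConcurrentStreamStateConverter(is_sequential_state=True),
    cursor_field=CursorField(cursor_field_key="updated_at"),
    slice_boundary_fields=("start", "end"),
    start=datetime(2024, 1, 1, tzinfo=timezone.utc),
    end_provider=lambda: datetime(2024, 1, 21, tzinfo=timezone.utc),
    slice_range=timedelta(days=10),
    cursor_granularity=timedelta(seconds=1),  # new argument introduced by this patch
)

# Each 10-day slice ends one second before the next slice starts instead of overlapping on
# the shared boundary timestamp.
assert list(cursor.generate_slices()) == [
    (datetime(2024, 1, 1, tzinfo=timezone.utc), datetime(2024, 1, 10, 23, 59, 59, tzinfo=timezone.utc)),
    (datetime(2024, 1, 11, tzinfo=timezone.utc), datetime(2024, 1, 20, 23, 59, 59, tzinfo=timezone.utc)),
]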