diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/job.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/job.py index 8ae4e40e7022..5b4f1c7ab0ce 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/job.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/job.py @@ -4,8 +4,8 @@ from datetime import timedelta from typing import Optional -from airbyte_cdk import StreamSlice from airbyte_cdk.sources.declarative.async_job.timer import Timer +from airbyte_cdk.sources.types import StreamSlice from .status import AsyncJobStatus diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/repository.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/repository.py index 151c1d922f55..b2de8659a393 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/repository.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/async_job/repository.py @@ -3,8 +3,8 @@ from abc import abstractmethod from typing import Any, Iterable, Mapping, Set -from airbyte_cdk import StreamSlice from airbyte_cdk.sources.declarative.async_job.job import AsyncJob +from airbyte_cdk.sources.types import StreamSlice class AsyncJobRepository: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py index d30d833c8d5b..09ce080c8ae4 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py @@ -6,7 +6,7 @@ from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.declarative.incremental import GlobalSubstreamCursor, PerPartitionCursor +from airbyte_cdk.sources.declarative.incremental import GlobalSubstreamCursor, PerPartitionCursor, PerPartitionWithGlobalCursor from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever @@ -200,7 +200,7 @@ def _get_checkpoint_reader( cursor = self.get_cursor() checkpoint_mode = self._checkpoint_mode - if isinstance(cursor, (GlobalSubstreamCursor, PerPartitionCursor)): + if isinstance(cursor, (GlobalSubstreamCursor, PerPartitionCursor, PerPartitionWithGlobalCursor)): self.has_multiple_slices = True return CursorBasedCheckpointReader( stream_slices=mappings_or_slices, diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py index 0d175d35b057..f396224c12e8 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py @@ -3,9 +3,9 @@ # import datetime from dataclasses import InitVar, dataclass -from typing import Any, Iterable, Mapping, Optional +from typing import Any, Iterable, Mapping, Optional, Union -from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor, PerPartitionCursor +from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor, GlobalSubstreamCursor, PerPartitionWithGlobalCursor from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean from airbyte_cdk.sources.types import Config, StreamSlice, 
StreamState @@ -55,14 +55,12 @@ class ClientSideIncrementalRecordFilterDecorator(RecordFilter): def __init__( self, date_time_based_cursor: DatetimeBasedCursor, - per_partition_cursor: Optional[PerPartitionCursor] = None, - is_global_substream_cursor: bool = False, + substream_cursor: Optional[Union[PerPartitionWithGlobalCursor, GlobalSubstreamCursor]], **kwargs: Any, ): super().__init__(**kwargs) self._date_time_based_cursor = date_time_based_cursor - self._per_partition_cursor = per_partition_cursor - self.is_global_substream_cursor = is_global_substream_cursor + self._substream_cursor = substream_cursor @property def _cursor_field(self) -> str: @@ -108,15 +106,9 @@ def _get_state_value(self, stream_state: StreamState, stream_slice: StreamSlice) :param StreamSlice stream_slice: Current Stream slice :return Optional[str]: cursor_value in case it was found, otherwise None. """ - if self._per_partition_cursor: - # self._per_partition_cursor is the same object that DeclarativeStream uses to save/update stream_state - partition_state = self._per_partition_cursor.select_state(stream_slice=stream_slice) - return partition_state.get(self._cursor_field) if partition_state else None + state = (self._substream_cursor or self._date_time_based_cursor).select_state(stream_slice) - if self.is_global_substream_cursor: - return stream_state.get("state", {}).get(self._cursor_field) # type: ignore # state is inside a dict for GlobalSubstreamCursor - - return stream_state.get(self._cursor_field) + return state.get(self._cursor_field) if state else None def _get_filter_date(self, state_value: Optional[str]) -> datetime.datetime: start_date_parsed = self._start_date_from_config diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/__init__.py index 5699000bd9f4..11c1cba9913f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/__init__.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/__init__.py @@ -6,7 +6,11 @@ from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import GlobalSubstreamCursor from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import CursorFactory, PerPartitionCursor -from airbyte_cdk.sources.declarative.incremental.resumable_full_refresh_cursor import ResumableFullRefreshCursor, ChildPartitionResumableFullRefreshCursor +from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import PerPartitionWithGlobalCursor +from airbyte_cdk.sources.declarative.incremental.resumable_full_refresh_cursor import ( + ChildPartitionResumableFullRefreshCursor, + ResumableFullRefreshCursor, +) __all__ = [ "CursorFactory", @@ -14,6 +18,7 @@ "DeclarativeCursor", "GlobalSubstreamCursor", "PerPartitionCursor", + "PerPartitionWithGlobalCursor", "ResumableFullRefreshCursor", - "ChildPartitionResumableFullRefreshCursor" + "ChildPartitionResumableFullRefreshCursor", ] diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py index 9e37bc34385e..6f6344f2a251 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py @@ -4,13 +4,48 @@ import threading import 
time -from typing import Any, Iterable, Mapping, Optional, Union +from typing import Any, Callable, Iterable, Mapping, Optional, TypeVar, Union from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter from airbyte_cdk.sources.types import Record, StreamSlice, StreamState +T = TypeVar("T") + + +def iterate_with_last_flag_and_state( + generator: Iterable[T], get_stream_state_func: Callable[[bool], Optional[Mapping[str, StreamState]]] +) -> Iterable[tuple[T, bool, Any]]: + """ + Iterates over the given generator, yielding tuples containing the element, a flag + indicating whether it's the last element in the generator, and the parent stream state + captured from `get_stream_state_func` before the element is yielded. + + Args: + generator: The iterable to iterate over. + get_stream_state_func: A function that takes a `last` flag and + returns the current parent stream state. + + Returns: + An iterator that yields tuples of the form (element, is_last, state). + """ + + iterator = iter(generator) + + try: + current = next(iterator) + state = get_stream_state_func(False) + except StopIteration: + return # Return an empty iterator + + for next_item in iterator: + yield current, False, state + current = next_item + state = get_stream_state_func(False) + + yield current, True, state + class Timer: """ @@ -25,7 +60,7 @@ def start(self) -> None: def finish(self) -> int: if self._start: - return int((time.perf_counter_ns() - self._start) // 1e9) + return ((time.perf_counter_ns() - self._start) / 1e9).__ceil__() else: raise RuntimeError("Global substream cursor timer not started") @@ -52,6 +87,12 @@ def __init__(self, stream_cursor: DatetimeBasedCursor, partition_router: Partiti self._slice_semaphore = threading.Semaphore(0) # Start with 0, indicating no slices being tracked self._all_slices_yielded = False self._lookback_window: Optional[int] = None + self._current_partition: Optional[Mapping[str, Any]] = None + self._last_slice: bool = False + self._parent_state: Optional[Mapping[str, Any]] = None + + def start_slices_generation(self) -> None: + self._timer.start() def stream_slices(self) -> Iterable[StreamSlice]: """ @@ -68,32 +109,39 @@ def stream_slices(self) -> Iterable[StreamSlice]: * Setting `self._all_slices_yielded = True`. We do that before actually yielding the last slice as the caller of `stream_slices` might stop iterating at any point and hence the code after `yield` might not be executed * Yield the last slice.
At that point, once there are as many slices yielded as closes, the global slice will be closed too """ - previous_slice = None - slice_generator = ( StreamSlice(partition=partition, cursor_slice=cursor_slice) for partition in self._partition_router.stream_slices() for cursor_slice in self._stream_cursor.stream_slices() ) - self._timer.start() - for slice in slice_generator: - if previous_slice is not None: - # Release the semaphore to indicate that a slice has been yielded - self._slice_semaphore.release() - yield previous_slice + self.start_slices_generation() + for slice, last, state in iterate_with_last_flag_and_state(slice_generator, self._partition_router.get_stream_state): + self._parent_state = state + self.register_slice(last) + yield slice + self._parent_state = self._partition_router.get_stream_state() - # Store the current slice as the previous slice for the next iteration - previous_slice = slice + def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]: + slice_generator = ( + StreamSlice(partition=partition, cursor_slice=cursor_slice) for cursor_slice in self._stream_cursor.stream_slices() + ) - # After all slices have been generated, release the semaphore one final time - # and flag that all slices have been yielded - self._slice_semaphore.release() - self._all_slices_yielded = True + yield from slice_generator - # Yield the last slice - if previous_slice is not None: - yield previous_slice + def register_slice(self, last: bool) -> None: + """ + Tracks the processing of a stream slice. + + Releases the semaphore for each slice. If it's the last slice (`last=True`), + sets `_all_slices_yielded` to `True` to indicate no more slices will be processed. + + Args: + last (bool): True if the current slice is the last in the sequence. 
+ """ + self._slice_semaphore.release() + if last: + self._all_slices_yielded = True def set_initial_state(self, stream_state: StreamState) -> None: """ @@ -125,7 +173,12 @@ def set_initial_state(self, stream_state: StreamState) -> None: self._lookback_window = stream_state["lookback_window"] self._inject_lookback_into_stream_cursor(stream_state["lookback_window"]) - self._stream_cursor.set_initial_state(stream_state["state"]) + if "state" in stream_state: + self._stream_cursor.set_initial_state(stream_state["state"]) + elif "states" not in stream_state: + # We assume that `stream_state` is in the old global format + # Example: {"global_state_format_key": "global_state_format_value"} + self._stream_cursor.set_initial_state(stream_state) # Set parent state for partition routers based on parent streams self._partition_router.set_initial_state(stream_state) @@ -172,9 +225,8 @@ def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None: def get_stream_state(self) -> StreamState: state: dict[str, Any] = {"state": self._stream_cursor.get_stream_state()} - parent_state = self._partition_router.get_stream_state() - if parent_state: - state["parent_state"] = parent_state + if self._parent_state: + state["parent_state"] = self._parent_state if self._lookback_window is not None: state["lookback_window"] = self._lookback_window diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py index bec5d061eaa2..851e13534575 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py @@ -11,6 +11,8 @@ from airbyte_cdk.sources.streams.checkpoint.per_partition_key_serializer import PerPartitionKeySerializer from airbyte_cdk.sources.types import Record, StreamSlice, StreamState +logger = logging.getLogger("airbyte") + class CursorFactory: def __init__(self, create_function: Callable[[], DeclarativeCursor]): @@ -22,23 +24,20 @@ def create(self) -> DeclarativeCursor: class PerPartitionCursor(DeclarativeCursor): """ - Given a stream has many partitions, it is important to provide a state per partition. - - Record | Stream Slice | Last Record | DatetimeCursorBased cursor - -- | -- | -- | -- - 1 | {"start_time": "2021-01-01","end_time": "2021-01-31","owner_resource": "1"''} | cursor_field: “2021-01-15” | 2021-01-15 - 2 | {"start_time": "2021-02-01","end_time": "2021-02-28","owner_resource": "1"''} | cursor_field: “2021-02-15” | 2021-02-15 - 3 | {"start_time": "2021-01-01","end_time": "2021-01-31","owner_resource": "2"''} | cursor_field: “2021-01-03” | 2021-01-03 - 4 | {"start_time": "2021-02-01","end_time": "2021-02-28","owner_resource": "2"''} | cursor_field: “2021-02-14” | 2021-02-14 - - Given the following errors, this can lead to some loss or duplication of records: - When | Problem | Affected Record - -- | -- | -- - Between record #1 and #2 | Loss | #3 - Between record #2 and #3 | Loss | #3, #4 - Between record #3 and #4 | Duplication | #1, #2 - - Therefore, we need to manage state per partition. + Manages state per partition when a stream has many partitions, to prevent data loss or duplication. + + **Partition Limitation and Limit Reached Logic** + + - **DEFAULT_MAX_PARTITIONS_NUMBER**: The maximum number of partitions to keep in memory (default is 10,000). 
+ - **_cursor_per_partition**: An ordered dictionary that stores cursors for each partition. + - **_over_limit**: A counter that increments each time an oldest partition is removed when the limit is exceeded. + + The class ensures that the number of partitions tracked does not exceed the `DEFAULT_MAX_PARTITIONS_NUMBER` to prevent excessive memory usage. + + - When the number of partitions exceeds the limit, the oldest partitions are removed from `_cursor_per_partition`, and `_over_limit` is incremented accordingly. + - The `limit_reached` method returns `True` when `_over_limit` exceeds `DEFAULT_MAX_PARTITIONS_NUMBER`, indicating that the global cursor should be used instead of per-partition cursors. + + This approach avoids unnecessary switching to a global cursor due to temporary spikes in partition counts, ensuring that switching is only done when a sustained high number of partitions is observed. """ DEFAULT_MAX_PARTITIONS_NUMBER = 10000 @@ -54,30 +53,42 @@ def __init__(self, cursor_factory: CursorFactory, partition_router: PartitionRou # The dict is ordered to ensure that once the maximum number of partitions is reached, # the oldest partitions can be efficiently removed, maintaining the most recent partitions. self._cursor_per_partition: OrderedDict[str, DeclarativeCursor] = OrderedDict() + self._over_limit = 0 self._partition_serializer = PerPartitionKeySerializer() + self._last_partition: bool = False def stream_slices(self) -> Iterable[StreamSlice]: slices = self._partition_router.stream_slices() for partition in slices: - # Ensure the maximum number of partitions is not exceeded - self._ensure_partition_limit() + yield from self.generate_slices_from_partition(partition) + self._last_partition = True + + def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]: + # Ensure the maximum number of partitions is not exceeded + self._ensure_partition_limit() - cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition)) - if not cursor: - partition_state = self._state_to_migrate_from if self._state_to_migrate_from else self._NO_CURSOR_STATE - cursor = self._create_cursor(partition_state) - self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor + cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition)) + if not cursor: + partition_state = self._state_to_migrate_from if self._state_to_migrate_from else self._NO_CURSOR_STATE + cursor = self._create_cursor(partition_state) + self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor - for cursor_slice in cursor.stream_slices(): - yield StreamSlice(partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields) + for cursor_slice in cursor.stream_slices(): + yield StreamSlice(partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields) def _ensure_partition_limit(self) -> None: """ Ensure the maximum number of partitions is not exceeded. If so, the oldest added partition will be dropped. """ while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1: + self._over_limit += 1 oldest_partition = self._cursor_per_partition.popitem(last=False)[0] # Remove the oldest partition - logging.warning(f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}.") + logger.warning( + f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}." 
+ ) + + def limit_reached(self) -> bool: + return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER def set_initial_state(self, stream_state: StreamState) -> None: """ @@ -121,6 +132,10 @@ def set_initial_state(self, stream_state: StreamState) -> None: for state in stream_state["states"]: self._cursor_per_partition[self._to_partition_key(state["partition"])] = self._create_cursor(state["cursor"]) + # set default state for missing partitions if it is per partition with fallback to global + if "state" in stream_state: + self._state_to_migrate_from = stream_state["state"] + # Set parent state for partition routers based on parent streams self._partition_router.set_initial_state(stream_state) @@ -153,7 +168,7 @@ def get_stream_state(self) -> StreamState: ) state: dict[str, Any] = {"states": states} - parent_state = self._partition_router.get_stream_state() + parent_state = self._partition_router.get_stream_state(last=self._last_partition) if parent_state: state["parent_state"] = parent_state return state diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_with_global.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_with_global.py new file mode 100644 index 000000000000..76d3557a4c7a --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_with_global.py @@ -0,0 +1,188 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# +from typing import Any, Iterable, Mapping, MutableMapping, Optional, Union + +from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor +from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor +from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import GlobalSubstreamCursor, iterate_with_last_flag_and_state +from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import CursorFactory, PerPartitionCursor +from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter +from airbyte_cdk.sources.types import Record, StreamSlice, StreamState + + +class PerPartitionWithGlobalCursor(DeclarativeCursor): + """ + Manages state for streams with multiple partitions, with an optional fallback to a global cursor when specific conditions are met. + + This cursor handles partitioned streams by maintaining individual state per partition using `PerPartitionCursor`. If the number of partitions exceeds a defined limit, it switches to a global cursor (`GlobalSubstreamCursor`) to manage state more efficiently. + + **Overview** + + - **Partition-Based State**: Initially manages state per partition to ensure accurate processing of each partition's data. + - **Global Fallback**: Switches to a global cursor when the partition limit is exceeded to handle state management more effectively. + + **Switching Logic** + + - Monitors the number of partitions. + - If `PerPartitionCursor.limit_reached()` returns `True`, sets `_use_global_cursor` to `True`, activating the global cursor. + + **Active Cursor Selection** + + - Uses the `_get_active_cursor()` helper method to select the active cursor based on the `_use_global_cursor` flag. + - This simplifies the logic and ensures consistent cursor usage across methods. 
+ + **State Structure Example** + + ```json + { + "states": [ + { + "partition": {"partition_key": "partition_1"}, + "cursor": {"cursor_field": "2021-01-15"} + }, + { + "partition": {"partition_key": "partition_2"}, + "cursor": {"cursor_field": "2021-02-14"} + } + ], + "state": { + "cursor_field": "2021-02-15" + }, + "use_global_cursor": false + } + ``` + + In this example, the cursor is using partition-based state management (`"use_global_cursor": false`), maintaining separate cursor states for each partition. + + **Usage Scenario** + + Suitable for streams where the number of partitions may vary significantly, requiring dynamic switching between per-partition and global state management to ensure data consistency and efficient synchronization. + """ + + def __init__(self, cursor_factory: CursorFactory, partition_router: PartitionRouter, stream_cursor: DatetimeBasedCursor): + self._partition_router = partition_router + self._per_partition_cursor = PerPartitionCursor(cursor_factory, partition_router) + self._global_cursor = GlobalSubstreamCursor(stream_cursor, partition_router) + self._use_global_cursor = False + self._current_partition: Optional[Mapping[str, Any]] = None + self._last_slice: bool = False + self._parent_state: Optional[Mapping[str, Any]] = None + + def _get_active_cursor(self) -> Union[PerPartitionCursor, GlobalSubstreamCursor]: + return self._global_cursor if self._use_global_cursor else self._per_partition_cursor + + def stream_slices(self) -> Iterable[StreamSlice]: + self._global_cursor.start_slices_generation() + + # Iterate through partitions and process slices + for partition, is_last_partition, parent_state in iterate_with_last_flag_and_state( + self._partition_router.stream_slices(), self._partition_router.get_stream_state + ): + # Generate slices for the current cursor and handle the last slice using the flag + self._parent_state = parent_state + for slice, is_last_slice, _ in iterate_with_last_flag_and_state( + self._get_active_cursor().generate_slices_from_partition(partition=partition), lambda x: None + ): + self._global_cursor.register_slice(is_last_slice and is_last_partition) + yield slice + self._parent_state = self._partition_router.get_stream_state() + + def set_initial_state(self, stream_state: StreamState) -> None: + """ + Set the initial state for the cursors. 
+ """ + self._use_global_cursor = stream_state.get("use_global_cursor", False) + + self._parent_state = stream_state.get("parent_state", {}) + + self._global_cursor.set_initial_state(stream_state) + if not self._use_global_cursor: + self._per_partition_cursor.set_initial_state(stream_state) + + def observe(self, stream_slice: StreamSlice, record: Record) -> None: + if not self._use_global_cursor and self._per_partition_cursor.limit_reached(): + self._use_global_cursor = True + + if not self._use_global_cursor: + self._per_partition_cursor.observe(stream_slice, record) + self._global_cursor.observe(stream_slice, record) + + def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None: + if not self._use_global_cursor: + self._per_partition_cursor.close_slice(stream_slice, *args) + self._global_cursor.close_slice(stream_slice, *args) + + def get_stream_state(self) -> StreamState: + final_state: MutableMapping[str, Any] = {"use_global_cursor": self._use_global_cursor} + + final_state.update(self._global_cursor.get_stream_state()) + if not self._use_global_cursor: + final_state.update(self._per_partition_cursor.get_stream_state()) + + final_state["parent_state"] = self._parent_state + if not final_state.get("parent_state"): + del final_state["parent_state"] + + return final_state + + def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]: + return self._get_active_cursor().select_state(stream_slice) + + def get_request_params( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return self._get_active_cursor().get_request_params( + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, + ) + + def get_request_headers( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return self._get_active_cursor().get_request_headers( + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, + ) + + def get_request_body_data( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Union[Mapping[str, Any], str]: + return self._get_active_cursor().get_request_body_data( + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, + ) + + def get_request_body_json( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return self._get_active_cursor().get_request_body_json( + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, + ) + + def should_be_synced(self, record: Record) -> bool: + return self._global_cursor.should_be_synced(record) or self._per_partition_cursor.should_be_synced(record) + + def is_greater_than_or_equal(self, first: Record, second: Record) -> bool: + return self._global_cursor.is_greater_than_or_equal(first, second) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 4ec85e1ee81f..8d1b2a6d200f 100644 --- 
a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -49,6 +49,7 @@ DeclarativeCursor, GlobalSubstreamCursor, PerPartitionCursor, + PerPartitionWithGlobalCursor, ResumableFullRefreshCursor, ) from airbyte_cdk.sources.declarative.interpolation import InterpolatedString @@ -361,9 +362,11 @@ def create_api_key_authenticator( ) ) return ApiKeyAuthenticator( - token_provider=token_provider - if token_provider is not None - else InterpolatedStringTokenProvider(api_token=model.api_token or "", config=config, parameters=model.parameters or {}), + token_provider=( + token_provider + if token_provider is not None + else InterpolatedStringTokenProvider(api_token=model.api_token or "", config=config, parameters=model.parameters or {}) + ), request_option=request_option, config=config, parameters=model.parameters or {}, @@ -431,9 +434,11 @@ def create_bearer_authenticator( if token_provider is not None and model.api_token != "": raise ValueError("If token_provider is set, api_token is ignored and has to be set to empty string.") return BearerAuthenticator( - token_provider=token_provider - if token_provider is not None - else InterpolatedStringTokenProvider(api_token=model.api_token or "", config=config, parameters=model.parameters or {}), + token_provider=( + token_provider + if token_provider is not None + else InterpolatedStringTokenProvider(api_token=model.api_token or "", config=config, parameters=model.parameters or {}) + ), config=config, parameters=model.parameters or {}, ) @@ -683,13 +688,14 @@ def create_declarative_stream(self, model: DeclarativeStreamModel, config: Confi and hasattr(model.incremental_sync, "is_client_side_incremental") and model.incremental_sync.is_client_side_incremental ): - supported_slicers = (DatetimeBasedCursor, GlobalSubstreamCursor, PerPartitionCursor) + supported_slicers = (DatetimeBasedCursor, GlobalSubstreamCursor, PerPartitionWithGlobalCursor) if combined_slicers and not isinstance(combined_slicers, supported_slicers): - raise ValueError("Unsupported Slicer is used. PerPartitionCursor should be used here instead") + raise ValueError("Unsupported Slicer is used. 
PerPartitionWithGlobalCursor should be used here instead") client_side_incremental_sync = { "date_time_based_cursor": self._create_component_from_model(model=model.incremental_sync, config=config), - "per_partition_cursor": combined_slicers if isinstance(combined_slicers, PerPartitionCursor) else None, - "is_global_substream_cursor": isinstance(combined_slicers, GlobalSubstreamCursor), + "substream_cursor": ( + combined_slicers if isinstance(combined_slicers, (PerPartitionWithGlobalCursor, GlobalSubstreamCursor)) else None + ), } if model.incremental_sync and isinstance(model.incremental_sync, DatetimeBasedCursorModel): @@ -791,11 +797,13 @@ def _merge_stream_slicers(self, model: DeclarativeStreamModel, config: Config) - cursor_component = self._create_component_from_model(model=incremental_sync_model, config=config) return GlobalSubstreamCursor(stream_cursor=cursor_component, partition_router=stream_slicer) else: - return PerPartitionCursor( + cursor_component = self._create_component_from_model(model=incremental_sync_model, config=config) + return PerPartitionWithGlobalCursor( cursor_factory=CursorFactory( lambda: self._create_component_from_model(model=incremental_sync_model, config=config), ), partition_router=stream_slicer, + stream_cursor=cursor_component, ) elif model.incremental_sync: return self._create_component_from_model(model=model.incremental_sync, config=config) if model.incremental_sync else None @@ -1355,9 +1363,11 @@ def create_async_retriever( ), primary_key=None, name=job_download_components_name, - paginator=self._create_component_from_model(model=model.download_paginator, decoder=decoder, config=config, url_base="") - if model.download_paginator - else NoPagination(parameters={}), + paginator=( + self._create_component_from_model(model=model.download_paginator, decoder=decoder, config=config, url_base="") + if model.download_paginator + else NoPagination(parameters={}) + ), config=config, parameters={}, ) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/cartesian_product_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/cartesian_product_stream_slicer.py index b3fcc1794ccf..8a3c3e6bc9dd 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/cartesian_product_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/cartesian_product_stream_slicer.py @@ -35,6 +35,9 @@ class CartesianProductStreamSlicer(PartitionRouter): stream_slicers: List[PartitionRouter] parameters: InitVar[Mapping[str, Any]] + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self._initial_parent_state: Mapping[str, Any] = {} + def get_request_params( self, *, @@ -136,15 +139,19 @@ def set_initial_state(self, stream_state: StreamState) -> None: } } """ + if "parent_state" in stream_state: + self._initial_parent_state = stream_state["parent_state"] + for stream_slicer in self.stream_slicers: stream_slicer.set_initial_state(stream_state) - def get_stream_state(self) -> Optional[Mapping[str, StreamState]]: + def get_stream_state(self, last: bool = False) -> Optional[Mapping[str, StreamState]]: """ Get the state of the parent streams. - This method returns the combined parent states from all stream slicers. If a stream slicer does not have parent streams, - this will be skipped due to the default StreamSlicer implementation. + This method returns the combined parent states from all stream slicers. 
It currently retrieves the final state only for the last partition processed. If a stream slicer does not have parent streams, this will be skipped due to the default StreamSlicer implementation. + + TODO: Can be improved by tracking the state of every stream slicer and updating the state of the last stream slicer when all the partitions for other slicers have been produced. Returns: Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format. @@ -158,9 +165,13 @@ def get_stream_state(self) -> Optional[Mapping[str, StreamState]]: } } """ + if not last: + return self._initial_parent_state + combined_state: dict[str, StreamState] = {} for s in self.stream_slicers: - parent_state = s.get_stream_state() + # Getting the initial state of the stream slicer + parent_state = s.get_stream_state(last=last) if parent_state: combined_state.update(parent_state) return combined_state diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/list_partition_router.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/list_partition_router.py index 564a3119e25b..a9a2d1f20401 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/list_partition_router.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/list_partition_router.py @@ -94,7 +94,7 @@ def set_initial_state(self, stream_state: StreamState) -> None: """ pass - def get_stream_state(self) -> Optional[Mapping[str, StreamState]]: + def get_stream_state(self, last: bool = False) -> Optional[Mapping[str, StreamState]]: """ ListPartitionRouter doesn't have parent streams """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/partition_router.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/partition_router.py index 3a9bc3abfbf2..031793ff508a 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/partition_router.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/partition_router.py @@ -41,7 +41,7 @@ def set_initial_state(self, stream_state: StreamState) -> None: """ @abstractmethod - def get_stream_state(self) -> Optional[Mapping[str, StreamState]]: + def get_stream_state(self, last: bool = False) -> Optional[Mapping[str, StreamState]]: """ Get the state of the parent streams. 
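Note (illustrative, not part of the diff): the `last` flag added to `get_stream_state` in the partition routers above is consumed together with the `iterate_with_last_flag_and_state` helper introduced in `global_substream_cursor.py`. The minimal, self-contained sketch below shows how the (element, is_last, state) triples are meant to be used; `fake_parent_state` and the sample partitions are hypothetical stand-ins for a `PartitionRouter`, not CDK code.

from typing import Any, Callable, Iterable, Mapping, Optional, TypeVar

T = TypeVar("T")


def iterate_with_last_flag_and_state(
    generator: Iterable[T], get_stream_state_func: Callable[[bool], Optional[Mapping[str, Any]]]
) -> Iterable[tuple[T, bool, Any]]:
    # Mirrors the helper added in global_substream_cursor.py: pair each element with
    # a "last element" flag and a snapshot of the parent state taken before yielding.
    iterator = iter(generator)
    try:
        current = next(iterator)
        state = get_stream_state_func(False)
    except StopIteration:
        return
    for next_item in iterator:
        yield current, False, state
        current = next_item
        state = get_stream_state_func(False)
    yield current, True, state


def fake_parent_state(last: bool) -> Mapping[str, Any]:
    # Hypothetical stand-in for PartitionRouter.get_stream_state(last=...).
    return {"parent_stream_name": {"updated_at": "2024-01-31"}}


if __name__ == "__main__":
    partitions = [{"partition_field": "1"}, {"partition_field": "2"}, {"partition_field": "3"}]
    for partition, is_last, parent_state in iterate_with_last_flag_and_state(partitions, fake_parent_state):
        # PerPartitionWithGlobalCursor.stream_slices uses is_last to call
        # GlobalSubstreamCursor.register_slice(last=...), so the semaphore/close logic
        # knows when the final slice has been produced, and keeps parent_state so it
        # can be emitted as "parent_state" in get_stream_state().
        print(partition, is_last, parent_state)

This is also why `CartesianProductStreamSlicer.get_stream_state` in this diff only combines the slicers' states when `last=True`: the final parent state is committed once, after the last partition has been yielded.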
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/single_partition_router.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/single_partition_router.py index 32e6a353dedf..8d601e02a83f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/single_partition_router.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/single_partition_router.py @@ -56,7 +56,7 @@ def set_initial_state(self, stream_state: StreamState) -> None: """ pass - def get_stream_state(self) -> Optional[Mapping[str, StreamState]]: + def get_stream_state(self, last: bool = False) -> Optional[Mapping[str, StreamState]]: """ SinglePartitionRouter doesn't have parent streams """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py index 9eac1f6bb66e..1c9fb212487e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py @@ -4,7 +4,7 @@ import copy import logging from dataclasses import InitVar, dataclass -from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Mapping, Optional, Union +from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, Optional, Union import dpath from airbyte_cdk.models import AirbyteMessage @@ -59,17 +59,27 @@ class SubstreamPartitionRouter(PartitionRouter): Attributes: parent_stream_configs (List[ParentStreamConfig]): parent streams to iterate over and their config + + State Management: + - The state for each parent stream is stored in `_parent_state`, which is a dictionary mapping + stream names to their current states. + - The initial state is stored in `_initial_parent_state`, set during the first assignment via `set_initial_state`. + - The `_parent_state_to_partition` dictionary maps partition keys to parent states, + facilitating state retrieval based on partitions. """ parent_stream_configs: List[ParentStreamConfig] config: Config parameters: InitVar[Mapping[str, Any]] + MAX_PARTITIONS = 2 # Limit for the number of partitions + # Currently, there is a limitation of two partitions due to the logic of the global cursor, + # which identifies what slice is last and stores one slice in memory. Once substreams are added to concurrent CDK, + # we can increase this limit and update the logic for deleting processed partitions. def __post_init__(self, parameters: Mapping[str, Any]) -> None: if not self.parent_stream_configs: raise ValueError("SubstreamPartitionRouter needs at least 1 parent stream") self._parameters = parameters - self._parent_state: Dict[str, Any] = {} def get_request_params( self, @@ -144,8 +154,6 @@ def stream_slices(self) -> Iterable[StreamSlice]: if parent_stream_config.extra_fields: extra_fields = [[field_path_part.eval(self.config) for field_path_part in field_path] for field_path in parent_stream_config.extra_fields] # type: ignore # extra_fields is always casted to an interpolated string - incremental_dependency = parent_stream_config.incremental_dependency - # read_stateless() assumes the parent is not concurrent. 
This is currently okay since the concurrent CDK does # not support either substreams or RFR, but something that needs to be considered once we do for parent_record in parent_stream.read_only_records(): @@ -173,18 +181,13 @@ def stream_slices(self) -> Iterable[StreamSlice]: # Add extra fields extracted_extra_fields = self._extract_extra_fields(parent_record, extra_fields) - yield StreamSlice( + stream_slice = StreamSlice( partition={partition_field: partition_value, "parent_slice": parent_partition or {}}, cursor_slice={}, extra_fields=extracted_extra_fields, ) - if incremental_dependency: - self._parent_state[parent_stream.name] = copy.deepcopy(parent_stream.state) - - # A final parent state update and yield of records is needed, so we don't skip records for the final parent slice - if incremental_dependency: - self._parent_state[parent_stream.name] = copy.deepcopy(parent_stream.state) + yield stream_slice def _extract_extra_fields( self, parent_record: Mapping[str, Any] | AirbyteMessage, extra_fields: Optional[List[List[str]]] = None @@ -242,9 +245,8 @@ def set_initial_state(self, stream_state: StreamState) -> None: for parent_config in self.parent_stream_configs: if parent_config.incremental_dependency: parent_config.stream.state = parent_state.get(parent_config.stream.name, {}) - self._parent_state[parent_config.stream.name] = parent_config.stream.state - def get_stream_state(self) -> Optional[Mapping[str, StreamState]]: + def get_stream_state(self, last: bool = True) -> Optional[Mapping[str, StreamState]]: """ Get the state of the parent streams. @@ -261,7 +263,11 @@ def get_stream_state(self) -> Optional[Mapping[str, StreamState]]: } } """ - return copy.deepcopy(self._parent_state) + parent_state = {} + for parent_config in self.parent_stream_configs: + if parent_config.incremental_dependency: + parent_state[parent_config.stream.name] = copy.deepcopy(parent_config.stream.state) + return parent_state @property def logger(self) -> logging.Logger: diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_record_filter.py b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_record_filter.py index 693e13e86fbf..498f61b714bd 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_record_filter.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_record_filter.py @@ -6,7 +6,12 @@ import pytest from airbyte_cdk.sources.declarative.datetime import MinMaxDatetime from airbyte_cdk.sources.declarative.extractors.record_filter import ClientSideIncrementalRecordFilterDecorator, RecordFilter -from airbyte_cdk.sources.declarative.incremental import CursorFactory, DatetimeBasedCursor, PerPartitionCursor +from airbyte_cdk.sources.declarative.incremental import ( + CursorFactory, + DatetimeBasedCursor, + GlobalSubstreamCursor, + PerPartitionWithGlobalCursor, +) from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.models import CustomRetriever, DeclarativeStream, ParentStreamConfig from airbyte_cdk.sources.declarative.partition_routers import SubstreamPartitionRouter @@ -18,6 +23,7 @@ {"id": 2, "created_at": "2021-01-03"}, {"id": 3, "created_at": "2021-01-04"}, {"id": 4, "created_at": "2021-02-01"}, + {"id": 5, "created_at": "2021-01-02"}, ] DATE_TIME_WITH_TZ_FORMAT = "%Y-%m-%dT%H:%M:%S%z" @@ -41,29 +47,29 @@ "filter_template, records, expected_records", [ ( - "{{ record['created_at'] > stream_state['created_at'] }}", - [{"id": 1, "created_at": "06-06-21"}, {"id": 
2, "created_at": "06-07-21"}, {"id": 3, "created_at": "06-08-21"}], - [{"id": 2, "created_at": "06-07-21"}, {"id": 3, "created_at": "06-08-21"}], + "{{ record['created_at'] > stream_state['created_at'] }}", + [{"id": 1, "created_at": "06-06-21"}, {"id": 2, "created_at": "06-07-21"}, {"id": 3, "created_at": "06-08-21"}], + [{"id": 2, "created_at": "06-07-21"}, {"id": 3, "created_at": "06-08-21"}], ), ( - "{{ record['last_seen'] >= stream_slice['last_seen'] }}", - [{"id": 1, "last_seen": "06-06-21"}, {"id": 2, "last_seen": "06-07-21"}, {"id": 3, "last_seen": "06-10-21"}], - [{"id": 3, "last_seen": "06-10-21"}], + "{{ record['last_seen'] >= stream_slice['last_seen'] }}", + [{"id": 1, "last_seen": "06-06-21"}, {"id": 2, "last_seen": "06-07-21"}, {"id": 3, "last_seen": "06-10-21"}], + [{"id": 3, "last_seen": "06-10-21"}], ), ( - "{{ record['id'] >= next_page_token['last_seen_id'] }}", - [{"id": 11}, {"id": 12}, {"id": 13}, {"id": 14}, {"id": 15}], - [{"id": 14}, {"id": 15}], + "{{ record['id'] >= next_page_token['last_seen_id'] }}", + [{"id": 11}, {"id": 12}, {"id": 13}, {"id": 14}, {"id": 15}], + [{"id": 14}, {"id": 15}], ), ( - "{{ record['id'] >= next_page_token['path_to_nowhere'] }}", - [{"id": 11}, {"id": 12}, {"id": 13}, {"id": 14}, {"id": 15}], - [], + "{{ record['id'] >= next_page_token['path_to_nowhere'] }}", + [{"id": 11}, {"id": 12}, {"id": 13}, {"id": 14}, {"id": 15}], + [], ), ( - "{{ record['created_at'] > parameters['created_at'] }}", - [{"id": 1, "created_at": "06-06-21"}, {"id": 2, "created_at": "06-07-21"}, {"id": 3, "created_at": "06-08-21"}], - [{"id": 3, "created_at": "06-08-21"}], + "{{ record['created_at'] > parameters['created_at'] }}", + [{"id": 1, "created_at": "06-06-21"}, {"id": 2, "created_at": "06-07-21"}, {"id": 3, "created_at": "06-08-21"}], + [{"id": 3, "created_at": "06-08-21"}], ), ( "{{ record['created_at'] > stream_slice.extra_fields['created_at'] }}", @@ -97,62 +103,62 @@ def test_record_filter(filter_template: str, records: List[Mapping], expected_re @pytest.mark.parametrize( "datetime_format, stream_state, record_filter_expression, end_datetime, records_to_filter, expected_record_ids", [ - (DATE_FORMAT, {}, None, "2021-01-05", RECORDS_TO_FILTER_DATE_FORMAT, [2, 3]), - (DATE_FORMAT, {}, None, None, RECORDS_TO_FILTER_DATE_FORMAT, [2, 3, 4]), + (DATE_FORMAT, {}, None, "2021-01-05", RECORDS_TO_FILTER_DATE_FORMAT, [2, 3, 5]), + (DATE_FORMAT, {}, None, None, RECORDS_TO_FILTER_DATE_FORMAT, [2, 3, 4, 5]), (DATE_FORMAT, {"created_at": "2021-01-04"}, None, "2021-01-05", RECORDS_TO_FILTER_DATE_FORMAT, [3]), (DATE_FORMAT, {"created_at": "2021-01-04"}, None, None, RECORDS_TO_FILTER_DATE_FORMAT, [3, 4]), - (DATE_FORMAT, {}, "{{ record['id'] % 2 == 1 }}", "2021-01-05", RECORDS_TO_FILTER_DATE_FORMAT, [3]), + (DATE_FORMAT, {}, "{{ record['id'] % 2 == 1 }}", "2021-01-05", RECORDS_TO_FILTER_DATE_FORMAT, [3, 5]), (DATE_TIME_WITH_TZ_FORMAT, {}, None, "2021-01-05T00:00:00+00:00", RECORDS_TO_FILTER_DATE_TIME_WITH_TZ_FORMAT, [2, 3]), (DATE_TIME_WITH_TZ_FORMAT, {}, None, None, RECORDS_TO_FILTER_DATE_TIME_WITH_TZ_FORMAT, [2, 3, 4]), ( - DATE_TIME_WITH_TZ_FORMAT, - {"created_at": "2021-01-04T00:00:00+00:00"}, - None, - "2021-01-05T00:00:00+00:00", - RECORDS_TO_FILTER_DATE_TIME_WITH_TZ_FORMAT, - [3], + DATE_TIME_WITH_TZ_FORMAT, + {"created_at": "2021-01-04T00:00:00+00:00"}, + None, + "2021-01-05T00:00:00+00:00", + RECORDS_TO_FILTER_DATE_TIME_WITH_TZ_FORMAT, + [3], ), ( - DATE_TIME_WITH_TZ_FORMAT, - {"created_at": "2021-01-04T00:00:00+00:00"}, - None, - None, - 
RECORDS_TO_FILTER_DATE_TIME_WITH_TZ_FORMAT, - [3, 4], + DATE_TIME_WITH_TZ_FORMAT, + {"created_at": "2021-01-04T00:00:00+00:00"}, + None, + None, + RECORDS_TO_FILTER_DATE_TIME_WITH_TZ_FORMAT, + [3, 4], ), ( - DATE_TIME_WITH_TZ_FORMAT, - {}, - "{{ record['id'] % 2 == 1 }}", - "2021-01-05T00:00:00+00:00", - RECORDS_TO_FILTER_DATE_TIME_WITH_TZ_FORMAT, - [3], + DATE_TIME_WITH_TZ_FORMAT, + {}, + "{{ record['id'] % 2 == 1 }}", + "2021-01-05T00:00:00+00:00", + RECORDS_TO_FILTER_DATE_TIME_WITH_TZ_FORMAT, + [3], ), (DATE_TIME_WITHOUT_TZ_FORMAT, {}, None, "2021-01-05T00:00:00", RECORDS_TO_FILTER_DATE_TIME_WITHOUT_TZ_FORMAT, [2, 3]), (DATE_TIME_WITHOUT_TZ_FORMAT, {}, None, None, RECORDS_TO_FILTER_DATE_TIME_WITHOUT_TZ_FORMAT, [2, 3, 4]), ( - DATE_TIME_WITHOUT_TZ_FORMAT, - {"created_at": "2021-01-04T00:00:00"}, - None, - "2021-01-05T00:00:00", - RECORDS_TO_FILTER_DATE_TIME_WITHOUT_TZ_FORMAT, - [3], + DATE_TIME_WITHOUT_TZ_FORMAT, + {"created_at": "2021-01-04T00:00:00"}, + None, + "2021-01-05T00:00:00", + RECORDS_TO_FILTER_DATE_TIME_WITHOUT_TZ_FORMAT, + [3], ), ( - DATE_TIME_WITHOUT_TZ_FORMAT, - {"created_at": "2021-01-04T00:00:00"}, - None, - None, - RECORDS_TO_FILTER_DATE_TIME_WITHOUT_TZ_FORMAT, - [3, 4], + DATE_TIME_WITHOUT_TZ_FORMAT, + {"created_at": "2021-01-04T00:00:00"}, + None, + None, + RECORDS_TO_FILTER_DATE_TIME_WITHOUT_TZ_FORMAT, + [3, 4], ), ( - DATE_TIME_WITHOUT_TZ_FORMAT, - {}, - "{{ record['id'] % 2 == 1 }}", - "2021-01-05T00:00:00", - RECORDS_TO_FILTER_DATE_TIME_WITHOUT_TZ_FORMAT, - [3], + DATE_TIME_WITHOUT_TZ_FORMAT, + {}, + "{{ record['id'] % 2 == 1 }}", + "2021-01-05T00:00:00", + RECORDS_TO_FILTER_DATE_TIME_WITHOUT_TZ_FORMAT, + [3], ), ], ids=[ @@ -174,12 +180,12 @@ def test_record_filter(filter_template: str, records: List[Mapping], expected_re ], ) def test_client_side_record_filter_decorator_no_parent_stream( - datetime_format: str, - stream_state: Optional[Mapping], - record_filter_expression: str, - end_datetime: Optional[str], - records_to_filter: List[Mapping], - expected_record_ids: List[int], + datetime_format: str, + stream_state: Optional[Mapping], + record_filter_expression: str, + end_datetime: Optional[str], + records_to_filter: List[Mapping], + expected_record_ids: List[int], ): date_time_based_cursor = DatetimeBasedCursor( start_datetime=MinMaxDatetime(datetime="2021-01-01", datetime_format=DATE_FORMAT, parameters={}), @@ -191,13 +197,14 @@ def test_client_side_record_filter_decorator_no_parent_stream( config={}, parameters={}, ) + date_time_based_cursor.set_initial_state(stream_state) record_filter_decorator = ClientSideIncrementalRecordFilterDecorator( config={}, condition=record_filter_expression, parameters={}, date_time_based_cursor=date_time_based_cursor, - per_partition_cursor=None, + substream_cursor=None, ) filtered_records = list( @@ -208,53 +215,157 @@ def test_client_side_record_filter_decorator_no_parent_stream( @pytest.mark.parametrize( - "stream_state, expected_record_ids", + "stream_state, cursor_type, expected_record_ids", [ - ({}, [2, 3]), - ({"states": [{"some_parent_id": {"created_at": "2021-01-03"}}]}, [3]), + # Use only DatetimeBasedCursor + ({}, 'datetime', [2, 3, 5]), + # Use GlobalSubstreamCursor with no state + ({}, 'global_substream', [2, 3, 5]), + # Use GlobalSubstreamCursor with global state + ( + { + 'state': {'created_at': '2021-01-03'} + }, + 'global_substream', + [2, 3] + ), + # Use PerPartitionWithGlobalCursor with partition state + ( + { + 'use_global_cursor': False, + 'state': {'created_at': '2021-01-10'}, + 'states': [ + { + 
'partition': {'id': 'some_parent_id', 'parent_slice': {}}, + 'cursor': {'created_at': '2021-01-03'} + } + ] + }, + 'per_partition_with_global', + [2, 3] + ), + # Use PerPartitionWithGlobalCursor with global state + ( + { + 'use_global_cursor': True, + 'state': {'created_at': '2021-01-03'}, + 'states': [ + { + 'partition': {'id': 'some_parent_id', 'parent_slice': {}}, + 'cursor': {'created_at': '2021-01-13'} + } + ] + }, + 'per_partition_with_global', + [2, 3] + ), + # Use PerPartitionWithGlobalCursor with partition state missing, global cursor used + ( + { + 'use_global_cursor': True, + 'state': {'created_at': '2021-01-03'} + }, + 'per_partition_with_global', + [2, 3] + ), + # Use PerPartitionWithGlobalCursor with partition state missing, global cursor not used + ( + { + 'use_global_cursor': False, + 'state': {'created_at': '2021-01-03'} + }, + 'per_partition_with_global', + [2, 3, 5] # Global cursor not used, start date used + ), ], - ids=["no_stream_state_no_record_filter", "with_stream_state_no_record_filter"], + ids=[ + 'datetime_cursor_only', + 'global_substream_no_state', + 'global_substream_with_state', + 'per_partition_with_partition_state', + 'per_partition_with_global_state', + 'per_partition_partition_missing_global_cursor_used', + 'per_partition_partition_missing_global_cursor_not_used', + ] ) -def test_client_side_record_filter_decorator_with_parent_stream(stream_state: Optional[Mapping], expected_record_ids: List[int]): - date_time_based_cursor = DatetimeBasedCursor( - start_datetime=MinMaxDatetime(datetime="2021-01-01", datetime_format=DATE_FORMAT, parameters={}), - end_datetime=MinMaxDatetime(datetime="2021-01-05", datetime_format=DATE_FORMAT, parameters={}), - step="P10Y", - cursor_field=InterpolatedString.create("created_at", parameters={}), - datetime_format=DATE_FORMAT, - cursor_granularity="P1D", - config={}, - parameters={}, - ) - per_partition_cursor = PerPartitionCursor( - cursor_factory=CursorFactory(lambda: date_time_based_cursor), - partition_router=SubstreamPartitionRouter( +def test_client_side_record_filter_decorator_with_cursor_types( + stream_state: Optional[Mapping], + cursor_type: str, + expected_record_ids: List[int] +): + def date_time_based_cursor_factory() -> DatetimeBasedCursor: + return DatetimeBasedCursor( + start_datetime=MinMaxDatetime(datetime="2021-01-01", datetime_format=DATE_FORMAT, parameters={}), + end_datetime=MinMaxDatetime(datetime="2021-01-05", datetime_format=DATE_FORMAT, parameters={}), + step="P10Y", + cursor_field=InterpolatedString.create("created_at", parameters={}), + datetime_format=DATE_FORMAT, + cursor_granularity="P1D", config={}, parameters={}, - parent_stream_configs=[ - ParentStreamConfig( - type="ParentStreamConfig", - parent_key="id", - partition_field="id", - stream=DeclarativeStream( - type="DeclarativeStream", retriever=CustomRetriever(type="CustomRetriever", class_name="a_class_name") - ), - ) - ], - ), + ) + + date_time_based_cursor = date_time_based_cursor_factory() + + substream_cursor = None + partition_router = SubstreamPartitionRouter( + config={}, + parameters={}, + parent_stream_configs=[ + ParentStreamConfig( + type="ParentStreamConfig", + parent_key="id", + partition_field="id", + stream=DeclarativeStream( + type="DeclarativeStream", + retriever=CustomRetriever(type="CustomRetriever", class_name="a_class_name") + ), + ) + ], ) - if stream_state: - per_partition_cursor.set_initial_state( - {"states": [{"partition": {"id": "some_parent_id", "parent_slice": {}}, "cursor": {"created_at": "2021-01-04"}}]} + + if 
cursor_type == 'datetime': + # Use only DatetimeBasedCursor + pass # No additional cursor needed + elif cursor_type == 'global_substream': + # Create GlobalSubstreamCursor instance + substream_cursor = GlobalSubstreamCursor( + stream_cursor=date_time_based_cursor, + partition_router=partition_router, + ) + if stream_state: + substream_cursor.set_initial_state(stream_state) + elif cursor_type == 'per_partition_with_global': + # Create PerPartitionWithGlobalCursor instance + substream_cursor = PerPartitionWithGlobalCursor( + cursor_factory=CursorFactory(date_time_based_cursor_factory), + partition_router=partition_router, + stream_cursor=date_time_based_cursor, ) + else: + raise ValueError(f"Unsupported cursor type: {cursor_type}") + + if substream_cursor and stream_state: + substream_cursor.set_initial_state(stream_state) + elif stream_state: + date_time_based_cursor.set_initial_state(stream_state) + + # Create the record_filter_decorator with appropriate cursor record_filter_decorator = ClientSideIncrementalRecordFilterDecorator( - config={}, parameters={}, date_time_based_cursor=date_time_based_cursor, per_partition_cursor=per_partition_cursor + config={}, + parameters={}, + date_time_based_cursor=date_time_based_cursor, + substream_cursor=substream_cursor, ) + + # The partition we're testing + stream_slice = StreamSlice(partition={"id": "some_parent_id", "parent_slice": {}}, cursor_slice={}) + filtered_records = list( record_filter_decorator.filter_records( records=RECORDS_TO_FILTER_DATE_FORMAT, stream_state=stream_state, - stream_slice=StreamSlice(partition={"id": "some_parent_id", "parent_slice": {}}, cursor_slice={}), + stream_slice=stream_slice, next_page_token=None, ) ) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py b/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py index 3e2c7b77f0f8..4fff298b99aa 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py @@ -2,6 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import logging from unittest.mock import MagicMock, patch from airbyte_cdk.models import ( @@ -202,12 +203,14 @@ def test_given_record_for_partition_when_read_then_update_state(): ) assert stream_instance.state == { + "state": {}, + "use_global_cursor": False, "states": [ { "partition": {"partition_field": "1"}, "cursor": {CURSOR_FIELD: "2022-01-15"}, } - ] + ], } @@ -282,7 +285,14 @@ def test_substream_without_input_state(): ] -def test_partition_limitation(): +def test_partition_limitation(caplog): + """ + Test that when the number of partitions exceeds the maximum allowed limit in PerPartitionCursor, + the oldest partitions are dropped, and the state is updated accordingly. + + In this test, we set the maximum number of partitions to 2 and provide 3 partitions. + We verify that the state only retains information for the two most recent partitions. 
+ """ source = ManifestDeclarativeSource( source_config=ManifestBuilder() .with_list_partition_router("Rates", "partition_field", ["1", "2", "3"]) @@ -351,14 +361,24 @@ def test_partition_limitation(): ] logger = MagicMock() - # with patch.object(PerPartitionCursor, "stream_slices", return_value=partition_slices): - with patch.object(SimpleRetriever, "_read_pages", side_effect=records_list): - with patch.object(PerPartitionCursor, "DEFAULT_MAX_PARTITIONS_NUMBER", 2): - output = list(source.read(logger, {}, catalog, initial_state)) + # Use caplog to capture logs + with caplog.at_level(logging.WARNING, logger="airbyte"): + with patch.object(SimpleRetriever, "_read_pages", side_effect=records_list): + with patch.object(PerPartitionCursor, "DEFAULT_MAX_PARTITIONS_NUMBER", 2): + output = list(source.read(logger, {}, catalog, initial_state)) + + # Check if the warning was logged + logged_messages = [record.message for record in caplog.records if record.levelname == "WARNING"] + warning_message = ( + 'The maximum number of partitions has been reached. Dropping the oldest partition: {"partition_field":"1"}. Over limit: 1.' + ) + assert warning_message in logged_messages - # assert output_data == expected_records final_state = [orjson.loads(orjson.dumps(message.state.stream.stream_state)) for message in output if message.state] assert final_state[-1] == { + "lookback_window": 1, + "state": {"cursor_field": "2022-02-17"}, + "use_global_cursor": False, "states": [ { "partition": {"partition_field": "2"}, @@ -368,5 +388,184 @@ def test_partition_limitation(): "partition": {"partition_field": "3"}, "cursor": {CURSOR_FIELD: "2022-02-17"}, }, - ] + ], + } + + +def test_perpartition_with_fallback(caplog): + """ + Test that when the number of partitions exceeds the limit in PerPartitionCursor, + the cursor falls back to using the global cursor for state management. + + This test also checks that the appropriate warning logs are emitted when the partition limit is exceeded. 
+ """ + source = ManifestDeclarativeSource( + source_config=ManifestBuilder() + .with_list_partition_router("Rates", "partition_field", ["1", "2", "3", "4", "5", "6"]) + .with_incremental_sync( + "Rates", + start_datetime="2022-01-01", + end_datetime="2022-02-28", + datetime_format="%Y-%m-%d", + cursor_field=CURSOR_FIELD, + step="P1M", + cursor_granularity="P1D", + ) + .build() + ) + + partition_slices = [StreamSlice(partition={"partition_field": str(i)}, cursor_slice={}) for i in range(1, 7)] + + records_list = [ + [ + Record({"a record key": "a record value", CURSOR_FIELD: "2022-01-15"}, partition_slices[0]), + Record({"a record key": "a record value", CURSOR_FIELD: "2022-01-16"}, partition_slices[0]), + ], + [Record({"a record key": "a record value", CURSOR_FIELD: "2022-02-15"}, partition_slices[0])], + [Record({"a record key": "a record value", CURSOR_FIELD: "2022-01-16"}, partition_slices[1])], + [], + [], + [Record({"a record key": "a record value", CURSOR_FIELD: "2022-02-17"}, partition_slices[2])], + [Record({"a record key": "a record value", CURSOR_FIELD: "2022-01-17"}, partition_slices[3])], + [Record({"a record key": "a record value", CURSOR_FIELD: "2022-02-19"}, partition_slices[3])], + [], + [Record({"a record key": "a record value", CURSOR_FIELD: "2022-02-18"}, partition_slices[4])], + [Record({"a record key": "a record value", CURSOR_FIELD: "2022-01-13"}, partition_slices[3])], + [Record({"a record key": "a record value", CURSOR_FIELD: "2022-02-18"}, partition_slices[3])], + ] + + configured_stream = ConfiguredAirbyteStream( + stream=AirbyteStream(name="Rates", json_schema={}, supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental]), + sync_mode=SyncMode.incremental, + destination_sync_mode=DestinationSyncMode.append, + ) + catalog = ConfiguredAirbyteCatalog(streams=[configured_stream]) + + initial_state = [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="Rates", namespace=None), + stream_state=AirbyteStateBlob( + { + "states": [ + { + "partition": {"partition_field": "1"}, + "cursor": {CURSOR_FIELD: "2022-01-01"}, + }, + { + "partition": {"partition_field": "2"}, + "cursor": {CURSOR_FIELD: "2022-01-02"}, + }, + { + "partition": {"partition_field": "3"}, + "cursor": {CURSOR_FIELD: "2022-01-03"}, + }, + ] + } + ), + ), + ) + ] + logger = MagicMock() + + # Use caplog to capture logs + with caplog.at_level(logging.WARNING, logger="airbyte"): + with patch.object(SimpleRetriever, "_read_pages", side_effect=records_list): + with patch.object(PerPartitionCursor, "DEFAULT_MAX_PARTITIONS_NUMBER", 2): + output = list(source.read(logger, {}, catalog, initial_state)) + + # Check if the warnings were logged + expected_warning_messages = [ + 'The maximum number of partitions has been reached. Dropping the oldest partition: {"partition_field":"1"}. Over limit: 1.', + 'The maximum number of partitions has been reached. Dropping the oldest partition: {"partition_field":"2"}. Over limit: 2.', + 'The maximum number of partitions has been reached. Dropping the oldest partition: {"partition_field":"3"}. 
Over limit: 3.', + ] + + logged_messages = [record.message for record in caplog.records if record.levelname == "WARNING"] + + for expected_message in expected_warning_messages: + assert expected_message in logged_messages + + # Proceed with existing assertions + final_state = [orjson.loads(orjson.dumps(message.state.stream.stream_state)) for message in output if message.state] + assert final_state[-1] == {"use_global_cursor": True, "state": {"cursor_field": "2022-02-19"}, "lookback_window": 1} + + +def test_per_partition_cursor_within_limit(caplog): + """ + Test that the PerPartitionCursor correctly updates the state for each partition + when the number of partitions is within the allowed limit. + + This test also checks that no warning logs are emitted when the partition limit is not exceeded. + """ + source = ManifestDeclarativeSource( + source_config=ManifestBuilder() + .with_list_partition_router("Rates", "partition_field", ["1", "2", "3"]) + .with_incremental_sync( + "Rates", + start_datetime="2022-01-01", + end_datetime="2022-03-31", + datetime_format="%Y-%m-%d", + cursor_field=CURSOR_FIELD, + step="P1M", + cursor_granularity="P1D", + ) + .build() + ) + + partition_slices = [StreamSlice(partition={"partition_field": str(i)}, cursor_slice={}) for i in range(1, 4)] + + records_list = [ + [Record({"a record key": "a record value", CURSOR_FIELD: "2022-01-15"}, partition_slices[0])], + [Record({"a record key": "a record value", CURSOR_FIELD: "2022-02-20"}, partition_slices[0])], + [Record({"a record key": "a record value", CURSOR_FIELD: "2022-03-25"}, partition_slices[0])], + [Record({"a record key": "a record value", CURSOR_FIELD: "2022-01-16"}, partition_slices[1])], + [Record({"a record key": "a record value", CURSOR_FIELD: "2022-02-18"}, partition_slices[1])], + [Record({"a record key": "a record value", CURSOR_FIELD: "2022-03-28"}, partition_slices[1])], + [Record({"a record key": "a record value", CURSOR_FIELD: "2022-01-17"}, partition_slices[2])], + [Record({"a record key": "a record value", CURSOR_FIELD: "2022-02-19"}, partition_slices[2])], + [Record({"a record key": "a record value", CURSOR_FIELD: "2022-03-29"}, partition_slices[2])], + ] + + configured_stream = ConfiguredAirbyteStream( + stream=AirbyteStream(name="Rates", json_schema={}, supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental]), + sync_mode=SyncMode.incremental, + destination_sync_mode=DestinationSyncMode.append, + ) + catalog = ConfiguredAirbyteCatalog(streams=[configured_stream]) + + initial_state = {} + logger = MagicMock() + + # Use caplog to capture logs + with caplog.at_level(logging.WARNING, logger="airbyte"): + with patch.object(SimpleRetriever, "_read_pages", side_effect=records_list): + with patch.object(PerPartitionCursor, "DEFAULT_MAX_PARTITIONS_NUMBER", 5): + output = list(source.read(logger, {}, catalog, initial_state)) + + # Since the partition limit is not exceeded, we expect no warnings + logged_warnings = [record.message for record in caplog.records if record.levelname == "WARNING"] + assert len(logged_warnings) == 0 + + # Proceed with existing assertions + final_state = [orjson.loads(orjson.dumps(message.state.stream.stream_state)) for message in output if message.state] + assert final_state[-1] == { + "lookback_window": 1, + "state": {"cursor_field": "2022-03-29"}, + "use_global_cursor": False, + "states": [ + { + "partition": {"partition_field": "1"}, + "cursor": {CURSOR_FIELD: "2022-03-25"}, + }, + { + "partition": {"partition_field": "2"}, + "cursor": {CURSOR_FIELD: 
"2022-03-28"}, + }, + { + "partition": {"partition_field": "3"}, + "cursor": {CURSOR_FIELD: "2022-03-29"}, + }, + ], } diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 592fb5eb8a29..9b3789050287 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -25,7 +25,13 @@ from airbyte_cdk.sources.declarative.decoders import JsonDecoder, PaginationDecoderDecorator from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector from airbyte_cdk.sources.declarative.extractors.record_filter import ClientSideIncrementalRecordFilterDecorator -from airbyte_cdk.sources.declarative.incremental import CursorFactory, DatetimeBasedCursor, PerPartitionCursor, ResumableFullRefreshCursor +from airbyte_cdk.sources.declarative.incremental import ( + CursorFactory, + DatetimeBasedCursor, + PerPartitionCursor, + PerPartitionWithGlobalCursor, + ResumableFullRefreshCursor, +) from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.models import CheckStream as CheckStreamModel from airbyte_cdk.sources.declarative.models import CompositeErrorHandler as CompositeErrorHandlerModel @@ -702,9 +708,9 @@ def test_stream_with_incremental_and_retriever_with_partition_router(): assert isinstance(stream, DeclarativeStream) assert isinstance(stream.retriever, SimpleRetriever) - assert isinstance(stream.retriever.stream_slicer, PerPartitionCursor) + assert isinstance(stream.retriever.stream_slicer, PerPartitionWithGlobalCursor) - datetime_stream_slicer = stream.retriever.stream_slicer._cursor_factory.create() + datetime_stream_slicer = stream.retriever.stream_slicer._per_partition_cursor._cursor_factory.create() assert isinstance(datetime_stream_slicer, DatetimeBasedCursor) assert isinstance(datetime_stream_slicer._start_datetime, MinMaxDatetime) assert datetime_stream_slicer._start_datetime.datetime.string == "{{ config['start_time'] }}" @@ -1047,7 +1053,7 @@ def test_client_side_incremental_with_partition_router(): stream = factory.create_component(model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config) assert isinstance(stream.retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator) - assert isinstance(stream.retriever.record_selector.record_filter._per_partition_cursor, PerPartitionCursor) + assert isinstance(stream.retriever.record_selector.record_filter._substream_cursor, PerPartitionWithGlobalCursor) def test_given_data_feed_and_client_side_incremental_then_raise_error(): @@ -2056,7 +2062,7 @@ def test_default_schema_loader(self): "values": "{{config['repos']}}", "cursor_field": "a_key", }, - PerPartitionCursor, + PerPartitionWithGlobalCursor, id="test_create_simple_retriever_with_incremental_and_partition_router", ), pytest.param( @@ -2081,7 +2087,7 @@ def test_default_schema_loader(self): "cursor_field": "b_key", }, ], - PerPartitionCursor, + PerPartitionWithGlobalCursor, id="test_create_simple_retriever_with_partition_routers_multiple_components", ), pytest.param(None, None, SinglePartitionRouter, id="test_create_simple_retriever_with_no_incremental_or_partition_router"), @@ -2118,15 +2124,16 @@ def test_merge_incremental_and_partition_router(incremental, 
partition_router, e assert isinstance(stream, DeclarativeStream) assert isinstance(stream.retriever, SimpleRetriever) assert isinstance(stream.retriever.stream_slicer, expected_type) if incremental and partition_router: - assert isinstance(stream.retriever.stream_slicer, PerPartitionCursor) + assert isinstance(stream.retriever.stream_slicer, PerPartitionWithGlobalCursor) if isinstance(partition_router, list) and len(partition_router) > 1: assert isinstance(stream.retriever.stream_slicer._partition_router, CartesianProductStreamSlicer) assert len(stream.retriever.stream_slicer._partition_router.stream_slicers) == len(partition_router) elif partition_router and isinstance(partition_router, list) and len(partition_router) > 1: - assert isinstance(stream.retriever.stream_slicer, PerPartitionCursor) + assert isinstance(stream.retriever.stream_slicer, PerPartitionWithGlobalCursor) assert len(stream.retriever.stream_slicer.stream_slicerS) == len(partition_router) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/partition_routers/test_parent_state_stream.py b/airbyte-cdk/python/unit_tests/sources/declarative/partition_routers/test_parent_state_stream.py index d7f35475a716..fecf5cb5b2db 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/partition_routers/test_parent_state_stream.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/partition_routers/test_parent_state_stream.py @@ -245,6 +245,87 @@ def _run_read( return list(source.read(logger, config, catalog, state)) + +def run_incremental_parent_state_test(manifest, mock_requests, expected_records, initial_state, expected_states): + """ + Run an incremental parent state test for the specified stream. + + This function performs the following steps: + 1. Mocks the API requests as defined in mock_requests. + 2. Executes the read operation using the provided manifest and config. + 3. Asserts that the output records match the expected records. + 4. Collects intermediate states and records, performing additional reads as necessary. + 5. Compares the cumulative records from each state against the expected records. + 6. Asserts that the final state matches one of the expected states for each run. + + Args: + manifest (dict): The manifest configuration for the stream. + mock_requests (list): A list of tuples containing URL and response data for mocking API requests. + expected_records (list): The expected records to compare against the output. + initial_state (list): The initial state to start the read operation. + expected_states (list): A list of expected final states after the read operation.
+ """ + _stream_name = "post_comment_votes" + config = {"start_date": "2024-01-01T00:00:01Z", "credentials": {"email": "email", "api_token": "api_token"}} + + with requests_mock.Mocker() as m: + for url, response in mock_requests: + m.get(url, json=response) + + # Run the initial read + output = _run_read(manifest, config, _stream_name, initial_state) + output_data = [message.record.data for message in output if message.record] + + # Assert that output_data equals expected_records + assert output_data == expected_records + + # Collect the intermediate states and records produced before each state + cumulative_records = [] + intermediate_states = [] + final_states = [] # To store the final state after each read + + # Store the final state after the initial read + final_state_initial = [orjson.loads(orjson.dumps(message.state.stream.stream_state)) for message in output if message.state] + final_states.append(final_state_initial[-1]) + + for message in output: + if message.type.value == "RECORD": + record_data = message.record.data + cumulative_records.append(record_data) + elif message.type.value == "STATE": + # Record the state and the records produced before this state + state = message.state + records_before_state = cumulative_records.copy() + intermediate_states.append((state, records_before_state)) + + # For each intermediate state, perform another read starting from that state + for state, records_before_state in intermediate_states[:-1]: + output_intermediate = _run_read(manifest, config, _stream_name, [state]) + records_from_state = [message.record.data for message in output_intermediate if message.record] + + # Combine records produced before the state with records from the new read + cumulative_records_state = records_before_state + records_from_state + + # Duplicates may occur because the state matches the cursor of the last record, causing it to be re-emitted in the next sync. + cumulative_records_state_deduped = list({orjson.dumps(record): record for record in cumulative_records_state}.values()) + + # Compare the cumulative records with the expected records + expected_records_set = list({orjson.dumps(record): record for record in expected_records}.values()) + assert sorted(cumulative_records_state_deduped, key=lambda x: orjson.dumps(x)) == sorted( + expected_records_set, key=lambda x: orjson.dumps(x) + ), f"Records mismatch with intermediate state {state}. Expected {expected_records}, got {cumulative_records_state_deduped}" + + # Store the final state after each intermediate read + final_state_intermediate = [ + orjson.loads(orjson.dumps(message.state.stream.stream_state)) for message in output_intermediate if message.state + ] + final_states.append(final_state_intermediate[-1]) + + # Assert that the final state matches one of the expected states for all runs + assert any( + final_state in expected_states for final_state in final_states + ), f"Final state mismatch. 
Expected one of {expected_states}, got {final_states}" + + @pytest.mark.parametrize( "test_name, manifest, mock_requests, expected_records, initial_state, expected_state", [ @@ -350,6 +431,13 @@ def _run_read( "votes": [{"id": 102, "comment_id": 11, "created_at": "2024-01-13T00:00:00Z"}], }, ), + # Fetch votes for comment 12 of post 1 + ( + "https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time=2024-01-15T00:00:00Z", + { + "votes": [], + }, + ), # Fetch votes for comment 20 of post 2 ( "https://api.example.com/community/posts/2/comments/20/votes?per_page=100&start_time=2024-01-12T00:00:00Z", @@ -403,6 +491,22 @@ def _run_read( ], # Expected state { + "use_global_cursor": False, + "state": {"created_at": "2024-01-15T00:00:00Z"}, + "parent_state": { + "post_comments": { + "use_global_cursor": False, + "state": {"updated_at": "2024-01-25T00:00:00Z"}, + "parent_state": {"posts": {"updated_at": "2024-01-30T00:00:00Z"}}, + "lookback_window": 1, + "states": [ + {"partition": {"id": 1, "parent_slice": {}}, "cursor": {"updated_at": "2024-01-25T00:00:00Z"}}, + {"partition": {"id": 2, "parent_slice": {}}, "cursor": {"updated_at": "2024-01-22T00:00:00Z"}}, + {"partition": {"id": 3, "parent_slice": {}}, "cursor": {"updated_at": "2024-01-09T00:00:00Z"}}, + ], + } + }, + "lookback_window": 1, "states": [ { "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, @@ -425,21 +529,173 @@ def _run_read( "cursor": {"created_at": "2024-01-10T00:00:00Z"}, }, ], + }, + ), + ], +) +def test_incremental_parent_state(test_name, manifest, mock_requests, expected_records, initial_state, expected_state): + additional_expected_state = copy.deepcopy(expected_state) + # State for empty partition (comment 12), when the global cursor is used for intermediate states + empty_state = ( + {"cursor": {"created_at": "2024-01-15T00:00:00Z"}, "partition": {"id": 12, "parent_slice": {"id": 1, "parent_slice": {}}}}, + ) + additional_expected_state["states"].append(empty_state) + run_incremental_parent_state_test(manifest, mock_requests, expected_records, initial_state, [expected_state, additional_expected_state]) + + +@pytest.mark.parametrize( + "test_name, manifest, mock_requests, expected_records, initial_state, expected_state", + [ + ( + "test_incremental_parent_state", + SUBSTREAM_MANIFEST, + [ + # Fetch the first page of posts + ( + "https://api.example.com/community/posts?per_page=100&start_time=2024-01-05T00:00:00Z", + { + "posts": [], + "next_page": "https://api.example.com/community/posts?per_page=100&start_time=2024-01-05T00:00:00Z&page=2", + }, + ), + # Fetch the second page of posts + ( + "https://api.example.com/community/posts?per_page=100&start_time=2024-01-05T00:00:00Z&page=2", + {"posts": []}, + ), + # Fetch the first page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100", + { + "comments": [], + "next_page": "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + {"comments": []}, + ), + # Fetch the first page of votes for comment 10 of post 1 + ( + "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time=2024-01-02T00:00:00Z", + { + "votes": [], + "next_page": "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time=2024-01-01T00:00:01Z", + }, + ), + # Fetch the second page of votes for comment 10 of post 1 
+ ( + "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time=2024-01-01T00:00:01Z", + {"votes": []}, + ), + # Fetch the first page of votes for comment 11 of post 1 + ( + "https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time=2024-01-03T00:00:00Z", + {"votes": []}, + ), + # Fetch the first page of votes for comment 12 of post 1 + ("https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time=2024-01-01T00:00:01Z", {"votes": []}), + # Fetch the first page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100", + { + "comments": [], + "next_page": "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + {"comments": []}, + ), + # Fetch the first page of votes for comment 20 of post 2 + ( + "https://api.example.com/community/posts/2/comments/20/votes?per_page=100&start_time=2024-01-01T00:00:01Z", + {"votes": []}, + ), + # Fetch the first page of votes for comment 21 of post 2 + ( + "https://api.example.com/community/posts/2/comments/21/votes?per_page=100&start_time=2024-01-01T00:00:01Z", + {"votes": []}, + ), + # Fetch the first page of comments for post 3 + ( + "https://api.example.com/community/posts/3/comments?per_page=100", + {"comments": []}, + ), + # Fetch the first page of votes for comment 30 of post 3 + ( + "https://api.example.com/community/posts/3/comments/30/votes?per_page=100", + {"votes": []}, + ), + ], + # Expected records + [], + # Initial state + [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="post_comment_votes", namespace=None), + stream_state=AirbyteStateBlob( + { + "parent_state": { + "post_comments": { + "states": [ + {"partition": {"id": 1, "parent_slice": {}}, "cursor": {"updated_at": "2023-01-04T00:00:00Z"}} + ], + "parent_state": {"posts": {"updated_at": "2024-01-05T00:00:00Z"}}, + } + }, + "states": [ + { + "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-02T00:00:00Z"}, + }, + { + "partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-03T00:00:00Z"}, + }, + ], + "state": {"created_at": "2024-01-03T00:00:00Z"}, + "lookback_window": 1, + } + ), + ), + ) + ], + # Expected state + { + "lookback_window": 1, + "use_global_cursor": False, + "state": {"created_at": "2024-01-03T00:00:00Z"}, "parent_state": { "post_comments": { - "states": [ - {"partition": {"id": 1, "parent_slice": {}}, "cursor": {"updated_at": "2024-01-25T00:00:00Z"}}, - {"partition": {"id": 2, "parent_slice": {}}, "cursor": {"updated_at": "2024-01-22T00:00:00Z"}}, - {"partition": {"id": 3, "parent_slice": {}}, "cursor": {"updated_at": "2024-01-09T00:00:00Z"}}, - ], - "parent_state": {"posts": {"updated_at": "2024-01-30T00:00:00Z"}}, + "use_global_cursor": False, + "state": {}, + "parent_state": {"posts": {"updated_at": "2024-01-05T00:00:00Z"}}, + "states": [{"partition": {"id": 1, "parent_slice": {}}, "cursor": {"updated_at": "2023-01-04T00:00:00Z"}}], } }, + "states": [ + { + "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-02T00:00:00Z"}, + }, + { + "partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-03T00:00:00Z"}, + 
}, + ], }, ), ], ) -def test_incremental_parent_state(test_name, manifest, mock_requests, expected_records, initial_state, expected_state): +def test_incremental_parent_state_no_slices(test_name, manifest, mock_requests, expected_records, initial_state, expected_state): + """ + Test incremental partition router with no parent records + """ _stream_name = "post_comment_votes" config = {"start_date": "2024-01-01T00:00:01Z", "credentials": {"email": "email", "api_token": "api_token"}} @@ -447,44 +703,178 @@ def test_incremental_parent_state(test_name, manifest, mock_requests, expected_r for url, response in mock_requests: m.get(url, json=response) - # Run the initial read output = _run_read(manifest, config, _stream_name, initial_state) output_data = [message.record.data for message in output if message.record] - # Assert that output_data equals expected_records assert output_data == expected_records + final_state = [orjson.loads(orjson.dumps(message.state.stream.stream_state)) for message in output if message.state] + assert final_state[-1] == expected_state - # Collect the intermediate states and records produced before each state - cumulative_records = [] - intermediate_states = [] - for message in output: - if message.type.value == "RECORD": - record_data = message.record.data - cumulative_records.append(record_data) - elif message.type.value == "STATE": - # Record the state and the records produced before this state - state = message.state - records_before_state = cumulative_records.copy() - intermediate_states.append((state, records_before_state)) - - # For each intermediate state, perform another read starting from that state - for state, records_before_state in intermediate_states[:-1]: - output_intermediate = _run_read(manifest, config, _stream_name, [state]) - records_from_state = [message.record.data for message in output_intermediate if message.record] - # Combine records produced before the state with records from the new read - cumulative_records_state = records_before_state + records_from_state +@pytest.mark.parametrize( + "test_name, manifest, mock_requests, expected_records, initial_state, expected_state", + [ + ( + "test_incremental_parent_state", + SUBSTREAM_MANIFEST, + [ + # Fetch the first page of posts + ( + "https://api.example.com/community/posts?per_page=100&start_time=2024-01-05T00:00:00Z", + { + "posts": [{"id": 1, "updated_at": "2024-01-30T00:00:00Z"}, {"id": 2, "updated_at": "2024-01-29T00:00:00Z"}], + "next_page": "https://api.example.com/community/posts?per_page=100&start_time=2024-01-05T00:00:00Z&page=2", + }, + ), + # Fetch the second page of posts + ( + "https://api.example.com/community/posts?per_page=100&start_time=2024-01-05T00:00:00Z&page=2", + {"posts": [{"id": 3, "updated_at": "2024-01-28T00:00:00Z"}]}, + ), + # Fetch the first page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100", + { + "comments": [ + {"id": 9, "post_id": 1, "updated_at": "2023-01-01T00:00:00Z"}, + {"id": 10, "post_id": 1, "updated_at": "2024-01-25T00:00:00Z"}, + {"id": 11, "post_id": 1, "updated_at": "2024-01-24T00:00:00Z"}, + ], + "next_page": "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + {"comments": [{"id": 12, "post_id": 1, "updated_at": "2024-01-23T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 10 of post 1 + ( + 
"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time=2024-01-03T00:00:00Z", + { + "votes": [], + "next_page": "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time=2024-01-01T00:00:01Z", + }, + ), + # Fetch the second page of votes for comment 10 of post 1 + ( + "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time=2024-01-01T00:00:01Z", + {"votes": []}, + ), + # Fetch the first page of votes for comment 11 of post 1 + ( + "https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time=2024-01-03T00:00:00Z", + {"votes": []}, + ), + # Fetch the first page of votes for comment 12 of post 1 + ("https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time=2024-01-03T00:00:00Z", {"votes": []}), + # Fetch the first page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100", + { + "comments": [{"id": 20, "post_id": 2, "updated_at": "2024-01-22T00:00:00Z"}], + "next_page": "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + {"comments": [{"id": 21, "post_id": 2, "updated_at": "2024-01-21T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 20 of post 2 + ( + "https://api.example.com/community/posts/2/comments/20/votes?per_page=100&start_time=2024-01-03T00:00:00Z", + {"votes": []}, + ), + # Fetch the first page of votes for comment 21 of post 2 + ( + "https://api.example.com/community/posts/2/comments/21/votes?per_page=100&start_time=2024-01-03T00:00:00Z", + {"votes": []}, + ), + # Fetch the first page of comments for post 3 + ( + "https://api.example.com/community/posts/3/comments?per_page=100", + {"comments": [{"id": 30, "post_id": 3, "updated_at": "2024-01-09T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 30 of post 3 + ( + "https://api.example.com/community/posts/3/comments/30/votes?per_page=100", + {"votes": []}, + ), + ], + # Expected records + [], + # Initial state + [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="post_comment_votes", namespace=None), + stream_state=AirbyteStateBlob( + { + "parent_state": { + "post_comments": { + "states": [ + {"partition": {"id": 1, "parent_slice": {}}, "cursor": {"updated_at": "2023-01-04T00:00:00Z"}} + ], + "parent_state": {"posts": {"updated_at": "2024-01-05T00:00:00Z"}}, + } + }, + "states": [ + { + "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-02T00:00:00Z"}, + }, + { + "partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-03T00:00:00Z"}, + }, + ], + "use_global_cursor": True, + "state": {"created_at": "2024-01-03T00:00:00Z"}, + "lookback_window": 0, + } + ), + ), + ) + ], + # Expected state + { + "lookback_window": 1, + "use_global_cursor": True, + "state": {"created_at": "2024-01-03T00:00:00Z"}, + "parent_state": { + "post_comments": { + "use_global_cursor": False, + "state": {"updated_at": "2024-01-25T00:00:00Z"}, + "parent_state": {"posts": {"updated_at": "2024-01-30T00:00:00Z"}}, + "lookback_window": 1, + "states": [ + {"partition": {"id": 1, "parent_slice": {}}, "cursor": {"updated_at": "2024-01-25T00:00:00Z"}}, + {"partition": {"id": 2, "parent_slice": {}}, "cursor": 
{"updated_at": "2024-01-22T00:00:00Z"}}, + {"partition": {"id": 3, "parent_slice": {}}, "cursor": {"updated_at": "2024-01-09T00:00:00Z"}}, + ], + } + }, + }, + ), + ], +) +def test_incremental_parent_state_no_records(test_name, manifest, mock_requests, expected_records, initial_state, expected_state): + """ + Test incremental partition router with no child records + """ + _stream_name = "post_comment_votes" + config = {"start_date": "2024-01-01T00:00:01Z", "credentials": {"email": "email", "api_token": "api_token"}} - # Duplicates may occur because the state matches the cursor of the last record, causing it to be re-emitted in the next sync. - cumulative_records_state_deduped = list({orjson.dumps(record): record for record in cumulative_records_state}.values()) + with requests_mock.Mocker() as m: + for url, response in mock_requests: + m.get(url, json=response) - # Compare the cumulative records with the expected records - expected_records_set = list({orjson.dumps(record): record for record in expected_records}.values()) - assert sorted(cumulative_records_state_deduped, key=lambda x: orjson.dumps(x)) == sorted( - expected_records_set, key=lambda x: orjson.dumps(x) - ), f"Records mismatch with intermediate state {state}. Expected {expected_records}, got {cumulative_records_state_deduped}" + output = _run_read(manifest, config, _stream_name, initial_state) + output_data = [message.record.data for message in output if message.record] - # Assert that the final state matches the expected state + assert output_data == expected_records final_state = [orjson.loads(orjson.dumps(message.state.stream.stream_state)) for message in output if message.state] assert final_state[-1] == expected_state @@ -624,6 +1014,9 @@ def test_incremental_parent_state(test_name, manifest, mock_requests, expected_r ], # Expected state { + "use_global_cursor": False, + "state": {"created_at": "2024-01-15T00:00:00Z"}, + "lookback_window": 1, "states": [ { "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, @@ -994,6 +1387,38 @@ def test_incremental_parent_state_no_incremental_dependency( "https://api.example.com/community/posts/3/comments/30/votes?per_page=100", {"votes": [{"id": 300, "comment_id": 30, "created_at": "2024-01-10T00:00:00Z"}]}, ), + # Requests with intermediate states + # Fetch votes for comment 10 of post 1 + ( + "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time=2024-01-14T23:59:59Z", + { + "votes": [{"id": 100, "comment_id": 10, "created_at": "2024-01-15T00:00:00Z"}], + }, + ), + # Fetch votes for comment 11 of post 1 + ( + "https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time=2024-01-14T23:59:59Z", + { + "votes": [{"id": 102, "comment_id": 11, "created_at": "2024-01-13T00:00:00Z"}], + }, + ), + # Fetch votes for comment 12 of post 1 + ( + "https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time=2024-01-14T23:59:59Z", + { + "votes": [], + }, + ), + # Fetch votes for comment 20 of post 2 + ( + "https://api.example.com/community/posts/2/comments/20/votes?per_page=100&start_time=2024-01-14T23:59:59Z", + {"votes": [{"id": 200, "comment_id": 20, "created_at": "2024-01-12T00:00:00Z"}]}, + ), + # Fetch votes for comment 21 of post 2 + ( + "https://api.example.com/community/posts/2/comments/21/votes?per_page=100&start_time=2024-01-14T23:59:59Z", + {"votes": [{"id": 201, "comment_id": 21, "created_at": "2024-01-12T00:00:15Z"}]}, + ), ], # Expected records [ @@ -1030,14 +1455,18 @@ def 
test_incremental_parent_state_no_incremental_dependency( # Expected state { "state": {"created_at": "2024-01-15T00:00:00Z"}, + "lookback_window": 1, "parent_state": { "post_comments": { + "use_global_cursor": False, + "state": {"updated_at": "2024-01-25T00:00:00Z"}, + "parent_state": {"posts": {"updated_at": "2024-01-30T00:00:00Z"}}, + "lookback_window": 1, "states": [ {"partition": {"id": 1, "parent_slice": {}}, "cursor": {"updated_at": "2024-01-25T00:00:00Z"}}, {"partition": {"id": 2, "parent_slice": {}}, "cursor": {"updated_at": "2024-01-22T00:00:00Z"}}, {"partition": {"id": 3, "parent_slice": {}}, "cursor": {"updated_at": "2024-01-09T00:00:00Z"}}, ], - "parent_state": {"posts": {"updated_at": "2024-01-30T00:00:00Z"}}, } }, }, @@ -1163,23 +1592,9 @@ def test_incremental_parent_state_no_incremental_dependency( ) ], # Expected state - {"state": {"created_at": "2024-01-15T00:00:00Z"}}, + {"state": {"created_at": "2024-01-15T00:00:00Z"}, "lookback_window": 1}, ), ], ) def test_incremental_global_parent_state(test_name, manifest, mock_requests, expected_records, initial_state, expected_state): - _stream_name = "post_comment_votes" - config = {"start_date": "2024-01-01T00:00:01Z", "credentials": {"email": "email", "api_token": "api_token"}} - - with requests_mock.Mocker() as m: - for url, response in mock_requests: - m.get(url, json=response) - - output = _run_read(manifest, config, _stream_name, initial_state) - output_data = [message.record.data for message in output if message.record] - - assert output_data == expected_records - final_state = [orjson.loads(orjson.dumps(message.state.stream.stream_state)) for message in output if message.state][-1] - assert "lookback_window" in final_state - final_state.pop("lookback_window") - assert final_state == expected_state + run_incremental_parent_state_test(manifest, mock_requests, expected_records, initial_state, [expected_state]) diff --git a/docs/connector-development/config-based/understanding-the-yaml-file/incremental-syncs.md b/docs/connector-development/config-based/understanding-the-yaml-file/incremental-syncs.md index 1ba21a4f9065..c88efc8269ae 100644 --- a/docs/connector-development/config-based/understanding-the-yaml-file/incremental-syncs.md +++ b/docs/connector-development/config-based/understanding-the-yaml-file/incremental-syncs.md @@ -175,22 +175,25 @@ incremental_sync: Nested streams, subresources, or streams that depend on other streams can be implemented using a [`SubstreamPartitionRouter`](#SubstreamPartitionRouter) -The default state format is **per partition**, but there are options to enhance efficiency depending on your use case: **incremental_dependency** and **global_substream_cursor**. Here's when and how to use each option, with examples: - -#### Per Partition (Default) -- **Description**: This is the default state format, where each partition has its own cursor. -- **Limitation**: The per partition state has a limit of 10,000 partitions. When this limit is exceeded, the oldest partitions are deleted. During the next sync, deleted partitions will be read in full refresh, which can be inefficient. -- **When to Use**: Use this option if the number of partitions is manageable (under 10,000). +The default state format is **per partition with fallback to global**, but there are options to enhance efficiency depending on your use case: **incremental_dependency** and **global_substream_cursor**. 
Here's when and how to use each option, with examples: +#### Per Partition with Fallback to Global (Default) +- **Description**: This is the default state format, where each partition has its own cursor. However, when the number of records in the parent sync exceeds two times the partition limit, the cursor automatically falls back to a global state to manage efficiency and scalability. +- **Limitation**: The per partition state has a limit of 10,000 partitions. Once this limit is exceeded, the global cursor takes over, tracking a single state across all partitions to avoid inefficiencies. +- **When to Use**: Use this as the default option for most cases. It provides the flexibility of managing partitions while preventing performance degradation when large numbers of records are involved. - **Example State**: ```json - [ - { "partition": "A", "timestamp": "2024-08-01T00:00:00" }, - { "partition": "B", "timestamp": "2024-08-01T01:00:00" }, - { "partition": "C", "timestamp": "2024-08-01T02:00:00" } - ] + { + "states": [ + {"partition": {"partition_key": "partition_1"}, "cursor": {"cursor_field": "2021-01-15"}}, + {"partition": {"partition_key": "partition_2"}, "cursor": {"cursor_field": "2021-02-14"}} + ], + "state": { + "cursor_field": "2021-02-15" + }, + "use_global_cursor": false + } ``` - #### Incremental Dependency - **Description**: This option allows the parent stream to be read incrementally, ensuring that only new data is synced. - **Requirement**: The API must ensure that the parent record's cursor is updated whenever child records are added or updated. If this requirement is not met, child records added to older parent records will be lost. @@ -201,7 +204,7 @@ The default state format is **per partition**, but there are options to enhance "parent_state": { "parent_stream": { "timestamp": "2024-08-01T00:00:00" } }, - "child_state": [ + "states": [ { "partition": "A", "timestamp": "2024-08-01T00:00:00" }, { "partition": "B", "timestamp": "2024-08-01T01:00:00" } ] @@ -209,9 +212,9 @@ The default state format is **per partition**, but there are options to enhance ``` #### Global Substream Cursor -- **Description**: This option uses a single global cursor for all partitions, significantly reducing the state size. It enforces a minimal lookback window for substream based on the duration of the previous sync to avoid losing records. This lookback ensures that any records added or updated during the sync are captured in subsequent syncs. -- **When to Use**: Use this option if the number of partitions in the parent stream is significantly higher than the 10,000 partition limit (e.g., millions of records per sync). This prevents the inefficiency of reading most partitions in full refresh and avoids duplicates during the next sync. -- **Operational Detail**: The global cursor's value is updated only at the end of the sync. If the sync fails, only the parent state is updated if the incremental dependency is enabled. +- **Description**: This option uses a single global cursor for all partitions, significantly reducing the state size. It enforces a minimal lookback window based on the previous sync's duration to avoid losing records added or updated during the sync. Since the global cursor is already part of the per partition with fallback to global approach, it should only be used cautiously for custom connectors with exceptionally large parent streams to avoid managing state per partition.
+- **When to Use**: Use this option cautiously for custom connectors where the number of partitions in the parent stream is extremely high (e.g., millions of records per sync). The global cursor avoids the inefficiency of managing state per partition but sacrifices some granularity, which may not be suitable for every use case. +- **Operational Detail**: The global cursor is updated only at the end of the sync. If the sync fails, only the parent state is updated, provided that the incremental dependency is enabled. The global cursor ensures that records are captured through a lookback window, even if they were added during the sync. - **Example State**: ```json [ @@ -220,9 +223,10 @@ The default state format is **per partition**, but there are options to enhance ``` ### Summary -- **Per Partition**: Default, use for manageable partitions (\<10k). -- **Incremental Dependency**: Use for incremental parent streams with a dependent child cursor. Ensure API updates parent cursor with child records. -- **Global Substream Cursor**: Ideal for large-scale parent streams with many partitions to optimize performance. +- **Per Partition with Fallback to Global (Default)**: Best for managing scalability and optimizing state size. Starts with per partition cursors and automatically falls back to a global cursor if the number of records in the parent sync exceeds two times the partition limit (see the sketch below for how this state is read). +- **Incremental Dependency**: Use for incremental parent streams with a dependent child cursor. Ensure the API updates the parent cursor when child records are added or updated. +- **Global Substream Cursor**: Use cautiously for custom connectors with very large parent streams. Avoids per partition state management but sacrifices some granularity. Choose the option that best fits your data structure and sync requirements to optimize performance and data integrity.
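To make the per-partition-with-fallback state fields above concrete, here is a minimal illustrative sketch, assuming the example state shape shown earlier. It is plain Python, not CDK code: the helper name `resolve_cursor_value` and the sample keys are hypothetical, and the authoritative selection logic lives in the cursor classes introduced by this change (`PerPartitionWithGlobalCursor`, `GlobalSubstreamCursor`).

```python
# Illustrative sketch only (not CDK code). Assumes the state shape from the
# "Per Partition with Fallback to Global" example above; the helper name and
# sample keys are hypothetical.
from typing import Any, Mapping, Optional


def resolve_cursor_value(
    stream_state: Mapping[str, Any],
    partition: Mapping[str, Any],
    cursor_field: str,
) -> Optional[str]:
    """Pick the cursor value that applies to the given partition."""
    # When the partition limit was exceeded, only the global cursor is kept up to date.
    if stream_state.get("use_global_cursor"):
        return stream_state.get("state", {}).get(cursor_field)

    # Otherwise prefer the partition's own cursor, if the partition is still tracked.
    for per_partition in stream_state.get("states", []):
        if per_partition.get("partition") == partition:
            return per_partition.get("cursor", {}).get(cursor_field)

    # Untracked partition (for example, dropped as the oldest): fall back to the global cursor.
    return stream_state.get("state", {}).get(cursor_field)


state = {
    "states": [
        {"partition": {"partition_key": "partition_1"}, "cursor": {"cursor_field": "2021-01-15"}},
        {"partition": {"partition_key": "partition_2"}, "cursor": {"cursor_field": "2021-02-14"}},
    ],
    "state": {"cursor_field": "2021-02-15"},
    "use_global_cursor": False,
}

assert resolve_cursor_value(state, {"partition_key": "partition_1"}, "cursor_field") == "2021-01-15"
assert resolve_cursor_value(state, {"partition_key": "partition_3"}, "cursor_field") == "2021-02-15"
```

The lookup order (global cursor when `use_global_cursor` is set, otherwise the partition's own cursor, with the global value as a fallback for untracked partitions) is only an aid for reading the state examples and test assertions in this change; consult the CDK cursor classes for the actual behavior.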