diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
index e3156d3403a8..3656a88c3157 100644
--- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
+++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
@@ -56,11 +56,9 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
         """

     @abstractmethod
-    def streams(self, config: Mapping[str, Any], include_concurrent_streams: bool = False) -> List[Stream]:
+    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
         """
         :param config: The user-provided configuration as specified by the source's spec.
-        :param include_concurrent_streams: Concurrent sources can be made up of streams that can be run concurrently and
-        ones that must be run synchronously. By default, for backwards compatibility this is disabled.
         Any stream construction related operation should happen here.
         :return: A list of the streams in this source connector.
         """
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py
index 78b73426d74b..baf056d3c799 100644
--- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py
+++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py
@@ -7,8 +7,8 @@
 from dataclasses import InitVar, dataclass
 from typing import Any, List, Mapping, Tuple

+from airbyte_cdk import AbstractSource
 from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
-from airbyte_cdk.sources.source import Source
 from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy


@@ -27,8 +27,8 @@ class CheckStream(ConnectionChecker):
     def __post_init__(self, parameters: Mapping[str, Any]) -> None:
         self._parameters = parameters

-    def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
-        streams = source.streams(config=config, include_concurrent_streams=True)  # type: ignore # source is always a DeclarativeSource, but this parameter type adheres to the outer interface
+    def check_connection(self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
+        streams = source.streams(config=config)
         stream_name_to_stream = {s.name: s for s in streams}
         if len(streams) == 0:
             return False, f"No streams to connect to from source {source}"
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/connection_checker.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/connection_checker.py
index ede15537f38b..908e659b2a9d 100644
--- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/connection_checker.py
+++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/connection_checker.py
@@ -6,7 +6,7 @@
 from abc import ABC, abstractmethod
 from typing import Any, Mapping, Tuple

-from airbyte_cdk.sources.source import Source
+from airbyte_cdk import AbstractSource


 class ConnectionChecker(ABC):
@@ -15,7 +15,7 @@ class ConnectionChecker(ABC):
     """

     @abstractmethod
-    def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
+    def check_connection(self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
         """
         Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to
         connect to the Stripe API.
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
index 644d868c9390..4d458fdc6922 100644
--- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
+++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
@@ -5,7 +5,7 @@
 import logging
 from typing import Any, Generic, Iterator, List, Mapping, Optional, Tuple, Union

-from airbyte_cdk.models import AirbyteCatalog, AirbyteConnectionStatus, AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog
+from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog
 from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
 from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
@@ -105,24 +105,19 @@ def read(
         filtered_catalog = self._remove_concurrent_streams_from_catalog(catalog=catalog, concurrent_stream_names=concurrent_stream_names)
         yield from super().read(logger, config, filtered_catalog, state)

-    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
-        return super().check(logger=logger, config=config)
-
     def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
-        return AirbyteCatalog(
-            streams=[stream.as_airbyte_stream() for stream in self.streams(config=config, include_concurrent_streams=True)]
-        )
+        return AirbyteCatalog(streams=[stream.as_airbyte_stream() for stream in self._concurrent_streams + self._synchronous_streams])

-    def streams(self, config: Mapping[str, Any], include_concurrent_streams: bool = False) -> List[Stream]:
+    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
         """
-        Returns the list of streams that can be run synchronously in the Python CDK.
+        The `streams` method is used as part of the AbstractSource in the following cases:
+        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
+        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter the catalog to exclude concurrent streams, so a read does not necessarily consume every stream returned by `streams`)
+        Note that `super().streams(config)` is also called when splitting the streams into concurrent and synchronous ones in `_group_streams`.

-        NOTE: For ConcurrentDeclarativeSource, this method only returns synchronous streams because its usage is invoked within the
-        existing Python CDK. Streams that support concurrency are started from read().
+        In both cases, we assume that calling the DeclarativeStream is fine because the result is the same whether it is a DeclarativeStream or a DefaultStream (concurrent). This should be removed once we have moved away from the code paths mentioned above.
         """
-        if include_concurrent_streams:
-            return self._synchronous_streams + self._concurrent_streams  # type: ignore # Although AbstractStream doesn't inherit stream, they were designed to fit the same interface when called from streams()
-        return self._synchronous_streams
+        return super().streams(config)

     def _group_streams(self, config: Mapping[str, Any]) -> Tuple[List[AbstractStream], List[Stream]]:
         concurrent_streams: List[AbstractStream] = []
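With `include_concurrent_streams` gone, every caller goes through the plain `streams(config)` signature. A minimal sketch of the connection-check path the docstring above describes — the helper name is hypothetical, but the body mirrors the `CheckStream.check_connection` lines in this diff:

import logging
from typing import Any, Mapping, Tuple

from airbyte_cdk import AbstractSource


def check_first_available_stream(source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
    # Same call for declarative and concurrent-declarative sources; no extra flag needed.
    streams = source.streams(config=config)
    if len(streams) == 0:
        return False, f"No streams to connect to from source {source}"
    # CheckStream then picks the configured stream by name and probes its availability.
    return True, None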
""" - if include_concurrent_streams: - return self._synchronous_streams + self._concurrent_streams # type: ignore # Although AbstractStream doesn't inherit stream, they were designed to fit the same interface when called from streams() - return self._synchronous_streams + return super().streams(config) def _group_streams(self, config: Mapping[str, Any]) -> Tuple[List[AbstractStream], List[Stream]]: concurrent_streams: List[AbstractStream] = [] diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 97fc0be52340..842d4e944454 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -88,7 +88,7 @@ def connection_checker(self) -> ConnectionChecker: else: raise ValueError(f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}") - def streams(self, config: Mapping[str, Any], include_concurrent_streams: bool = False) -> List[Stream]: + def streams(self, config: Mapping[str, Any]) -> List[Stream]: self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}) stream_configs = self._stream_configs(self._source_config) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py index 3472619eda42..21713869c303 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py @@ -292,22 +292,36 @@ def generate_slices(self) -> Iterable[Tuple[CursorValueType, CursorValueType]]: self._merge_partitions() if self._start is not None and self._is_start_before_first_slice(): - yield from self._split_per_slice_range(self._start, self.state["slices"][0][self._connector_state_converter.START_KEY]) + yield from self._split_per_slice_range( + self._start, + self.state["slices"][0][self._connector_state_converter.START_KEY], + False, + ) if len(self.state["slices"]) == 1: yield from self._split_per_slice_range( self._calculate_lower_boundary_of_last_slice(self.state["slices"][0][self._connector_state_converter.END_KEY]), self._end_provider(), + True, ) elif len(self.state["slices"]) > 1: for i in range(len(self.state["slices"]) - 1): - yield from self._split_per_slice_range( - self.state["slices"][i][self._connector_state_converter.END_KEY], - self.state["slices"][i + 1][self._connector_state_converter.START_KEY], - ) + if self._cursor_granularity: + yield from self._split_per_slice_range( + self.state["slices"][i][self._connector_state_converter.END_KEY] + self._cursor_granularity, + self.state["slices"][i + 1][self._connector_state_converter.START_KEY], + False, + ) + else: + yield from self._split_per_slice_range( + self.state["slices"][i][self._connector_state_converter.END_KEY], + self.state["slices"][i + 1][self._connector_state_converter.START_KEY], + False, + ) yield from self._split_per_slice_range( self._calculate_lower_boundary_of_last_slice(self.state["slices"][-1][self._connector_state_converter.END_KEY]), self._end_provider(), + True, ) else: raise ValueError("Expected at least one slice") @@ -320,7 +334,9 @@ def _calculate_lower_boundary_of_last_slice(self, lower_boundary: CursorValueTyp return lower_boundary - self._lookback_window return lower_boundary - def 
_split_per_slice_range(self, lower: CursorValueType, upper: CursorValueType) -> Iterable[Tuple[CursorValueType, CursorValueType]]: + def _split_per_slice_range( + self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool + ) -> Iterable[Tuple[CursorValueType, CursorValueType]]: if lower >= upper: return @@ -329,7 +345,7 @@ def _split_per_slice_range(self, lower: CursorValueType, upper: CursorValueType) lower = max(lower, self._start) if self._start else lower if not self._slice_range or lower + self._slice_range >= upper: - if self._cursor_granularity: + if self._cursor_granularity and not upper_is_end: yield lower, upper - self._cursor_granularity else: yield lower, upper @@ -338,7 +354,8 @@ def _split_per_slice_range(self, lower: CursorValueType, upper: CursorValueType) current_lower_boundary = lower while not stop_processing: current_upper_boundary = min(current_lower_boundary + self._slice_range, upper) - if self._cursor_granularity: + has_reached_upper_boundary = current_upper_boundary >= upper + if self._cursor_granularity and (not upper_is_end or not has_reached_upper_boundary): yield current_lower_boundary, current_upper_boundary - self._cursor_granularity else: yield current_lower_boundary, current_upper_boundary diff --git a/airbyte-cdk/python/airbyte_cdk/test/mock_http/matcher.py b/airbyte-cdk/python/airbyte_cdk/test/mock_http/matcher.py index 441a765b7321..d07cec3ec8b2 100644 --- a/airbyte-cdk/python/airbyte_cdk/test/mock_http/matcher.py +++ b/airbyte-cdk/python/airbyte_cdk/test/mock_http/matcher.py @@ -1,4 +1,5 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. +from typing import Any from airbyte_cdk.test.mock_http.request import HttpRequest @@ -33,3 +34,8 @@ def __str__(self) -> str: f"minimum_number_of_expected_match={self._minimum_number_of_expected_match}, " f"actual_number_of_matches={self._actual_number_of_matches})" ) + + def __eq__(self, other: Any) -> bool: + if isinstance(other, HttpRequestMatcher): + return self._request_to_match == other._request_to_match + return False diff --git a/airbyte-cdk/python/airbyte_cdk/test/mock_http/mocker.py b/airbyte-cdk/python/airbyte_cdk/test/mock_http/mocker.py index 90d4745372b3..5287c7451d2d 100644 --- a/airbyte-cdk/python/airbyte_cdk/test/mock_http/mocker.py +++ b/airbyte-cdk/python/airbyte_cdk/test/mock_http/mocker.py @@ -57,6 +57,8 @@ def _mock_request_method( responses = [responses] matcher = HttpRequestMatcher(request, len(responses)) + if matcher in self._matchers: + raise ValueError(f"Request {matcher.request} already mocked") self._matchers.append(matcher) getattr(self._mocker, method)( diff --git a/airbyte-cdk/python/airbyte_cdk/test/mock_http/request.py b/airbyte-cdk/python/airbyte_cdk/test/mock_http/request.py index a2b6bdb9430a..756be23edd06 100644 --- a/airbyte-cdk/python/airbyte_cdk/test/mock_http/request.py +++ b/airbyte-cdk/python/airbyte_cdk/test/mock_http/request.py @@ -85,3 +85,13 @@ def __str__(self) -> str: def __repr__(self) -> str: return f"HttpRequest(request={self._parsed_url}, headers={self._headers}, body={self._body!r})" + + def __eq__(self, other: Any) -> bool: + if isinstance(other, HttpRequest): + return ( + self._parsed_url == other._parsed_url + and self._query_params == other._query_params + and self._headers == other._headers + and self._body == other._body + ) + return False diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_job_repository.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_job_repository.py index 
90c1f9854baf..90768d8bbb60 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_job_repository.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_job_repository.py @@ -168,12 +168,17 @@ def test_given_unknown_status_when_update_jobs_status_then_raise_error(self) -> self._repository.update_jobs_status([job]) def test_given_multiple_jobs_when_update_jobs_status_then_all_the_jobs_are_updated(self) -> None: - self._mock_create_response(_A_JOB_ID) + self._http_mocker.post( + HttpRequest(url=_EXPORT_URL), + [ + HttpResponse(body=json.dumps({"id": _A_JOB_ID})), + HttpResponse(body=json.dumps({"id": _ANOTHER_JOB_ID})), + ], + ) self._http_mocker.get( HttpRequest(url=f"{_EXPORT_URL}/{_A_JOB_ID}"), HttpResponse(body=json.dumps({"id": _A_JOB_ID, "status": "ready"})), ) - self._mock_create_response(_ANOTHER_JOB_ID) self._http_mocker.get( HttpRequest(url=f"{_EXPORT_URL}/{_ANOTHER_JOB_ID}"), HttpResponse(body=json.dumps({"id": _A_JOB_ID, "status": "ready"})), diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 662531b61861..3fb0f6571043 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -3,8 +3,9 @@ # import copy +import json from datetime import datetime, timedelta, timezone -from typing import Any, Iterable, List, Mapping, Optional, Union +from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Union import freezegun import isodate @@ -21,17 +22,21 @@ ConfiguredAirbyteStream, DestinationSyncMode, FailureType, + Status, StreamDescriptor, SyncMode, ) from airbyte_cdk.sources.declarative.concurrent_declarative_source import ConcurrentDeclarativeSource from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.checkpoint import Cursor from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream from airbyte_cdk.sources.streams.core import StreamData from airbyte_cdk.sources.types import Record, StreamSlice +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse from airbyte_cdk.utils import AirbyteTracedException +from deprecated.classic import deprecated _CONFIG = { "start_date": "2024-07-01T00:00:00.000Z" @@ -61,7 +66,35 @@ ) ] ) - +_LOCATIONS_RESPONSE = HttpResponse(json.dumps([ + {"id": "444", "name": "Yongen-jaya", "updated_at": "2024-08-10"}, + {"id": "scramble", "name": "Shibuya", "updated_at": "2024-08-10"}, + {"id": "aoyama", "name": "Aoyama-itchome", "updated_at": "2024-08-10"}, + {"id": "shin123", "name": "Shinjuku", "updated_at": "2024-08-10"}, +])) +_PALACES_RESPONSE = HttpResponse(json.dumps([ + {"id": "0", "world": "castle", "owner": "kamoshida"}, + {"id": "1", "world": "museum", "owner": "madarame"}, + {"id": "2", "world": "bank", "owner": "kaneshiro"}, + {"id": "3", "world": "pyramid", "owner": "futaba"}, + {"id": "4", "world": "spaceport", "owner": "okumura"}, + {"id": "5", "world": "casino", "owner": "nijima"}, + {"id": "6", "world": "cruiser", "owner": "shido"}, +])) +_PARTY_MEMBERS_SKILLS_RESPONSE = HttpResponse(json.dumps([ + {"id": "0", "name": "hassou tobi"}, + {"id": "1", "name": "mafreidyne"}, + {"id": 
"2", "name": "myriad truths"}, +])) +_EMPTY_RESPONSE = HttpResponse(json.dumps([])) +_NOW = "2024-09-10T00:00:00" +_NO_STATE_PARTY_MEMBERS_SLICES_AND_RESPONSES = [ + ({"start": "2024-07-01", "end": "2024-07-15"}, HttpResponse(json.dumps([{"id": "amamiya", "first_name": "ren", "last_name": "amamiya", "updated_at": "2024-07-10"}]))), + ({"start": "2024-07-16", "end": "2024-07-30"}, _EMPTY_RESPONSE), + ({"start": "2024-07-31", "end": "2024-08-14"}, HttpResponse(json.dumps([{"id": "nijima", "first_name": "makoto", "last_name": "nijima", "updated_at": "2024-08-10"}, ]))), + ({"start": "2024-08-15", "end": "2024-08-29"}, _EMPTY_RESPONSE), + ({"start": "2024-08-30", "end": "2024-09-10"}, HttpResponse(json.dumps([{"id": "yoshizawa", "first_name": "sumire", "last_name": "yoshizawa", "updated_at": "2024-09-10"}]))), +] _MANIFEST = { "version": "5.0.0", "definitions": { @@ -69,7 +102,7 @@ "type": "RecordSelector", "extractor": { "type": "DpathExtractor", - "field_path": ["{{ parameters.get('data_field') }}"] + "field_path": [] } }, "requester": { @@ -96,7 +129,7 @@ "error_message": "No data available for the time range requested." } ] - } + }, }, "retriever": { "type": "SimpleRetriever", @@ -156,11 +189,7 @@ "retriever": { "$ref": "#/definitions/base_incremental_stream/retriever", "record_selector": { - "type": "RecordSelector", - "extractor": { - "type": "DpathExtractor", - "field_path": [] - } + "$ref": "#/definitions/selector" } }, "$parameters": { @@ -191,7 +220,7 @@ "$parameters": { "name": "palaces", "primary_key": "id", - "path": "/party_members" + "path": "/palaces" }, "schema_loader": { "type": "InlineSchemaLoader", @@ -224,11 +253,7 @@ } }, "record_selector": { - "type": "RecordSelector", - "extractor": { - "type": "DpathExtractor", - "field_path": [] - } + "$ref": "#/definitions/selector" } }, "incremental_sync": { @@ -264,11 +289,7 @@ "retriever": { "$ref": "#/definitions/base_incremental_stream/retriever", "record_selector": { - "type": "RecordSelector", - "extractor": { - "type": "DpathExtractor", - "field_path": [] - } + "$ref": "#/definitions/selector" }, "partition_router": { "type": "SubstreamPartitionRouter", @@ -285,7 +306,7 @@ "$parameters": { "name": "party_members_skills", "primary_key": "id", - "path": "/party_members_skills" + "path": "/party_members/{{stream_slice.party_member_id}}/skills" }, "schema_loader": { "type": "InlineSchemaLoader", @@ -323,10 +344,18 @@ } +@deprecated("See note in docstring for more information") class DeclarativeStreamDecorator(Stream): """ Helper class that wraps an existing DeclarativeStream but allows for overriding the output of read_records() to make it easier to mock behavior and test how low-code streams integrate with the Concurrent CDK. + + NOTE: We are not using that for now but the intent was to scope the tests to only testing that streams were properly instantiated and + interacted together properly. However in practice, we had a couple surprises like `get_cursor` and `stream_slices` needed to be + re-implemented as well. Because of that, we've move away from that in favour of doing tests that integrate up until the HTTP request. + The drawback of that is that we are dependent on any change later (like if the DatetimeBasedCursor changes, this will affect those + tests) but it feels less flaky than this. If we have new information in the future to infirm that, feel free to re-use this class as + necessary. 
""" def __init__(self, declarative_stream: DeclarativeStream, slice_to_records_mapping: Mapping[tuple[str, str], List[Mapping[str, Any]]]): @@ -367,6 +396,9 @@ def read_records( def get_json_schema(self) -> Mapping[str, Any]: return self._declarative_stream.get_json_schema() + def get_cursor(self) -> Optional[Cursor]: + return self._declarative_stream.get_cursor() + def test_group_streams(): """ @@ -487,13 +519,26 @@ def test_create_concurrent_cursor(): def test_check(): """ - Verifies that the ConcurrentDeclarativeSource check command is run against concurrent and synchronous streams + Verifies that the ConcurrentDeclarativeSource check command is run against synchronous streams """ - source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None) + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest("https://persona.metaverse.com/party_members?start=2024-07-01&end=2024-07-15"), + HttpResponse(json.dumps({"id": "amamiya", "first_name": "ren", "last_name": "amamiya", "updated_at": "2024-07-10"})), + ) + http_mocker.get( + HttpRequest("https://persona.metaverse.com/palaces"), + HttpResponse(json.dumps({"id": "palace_1"})), + ) + http_mocker.get( + HttpRequest("https://persona.metaverse.com/locations?m=active&i=1&g=country&start=2024-07-01&end=2024-07-31"), + HttpResponse(json.dumps({"id": "location_1"})), + ) + source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None) - actual_connection_state = source.check(logger=source.logger, config=_CONFIG) + connection_status = source.check(logger=source.logger, config=_CONFIG) - assert actual_connection_state + assert connection_status.status == Status.SUCCEEDED def test_discover(): @@ -513,46 +558,74 @@ def test_discover(): assert actual_catalog.streams[3].name in expected_stream_names -@freezegun.freeze_time("2024-09-10T00:00:00") +def _mock_requests(http_mocker: HttpMocker, url: str, query_params: List[Dict[str, str]], responses: List[HttpResponse]) -> None: + assert len(query_params) == len(responses), "Expecting as many slices as response" + + for i in range(len(query_params)): + http_mocker.get(HttpRequest(url, query_params=query_params[i]), responses[i]) + + +def _mock_party_members_requests(http_mocker: HttpMocker, slices_and_responses: List[Tuple[Dict[str, str], HttpResponse]]) -> None: + slices = list(map(lambda slice_and_response: slice_and_response[0], slices_and_responses)) + responses = list(map(lambda slice_and_response: slice_and_response[1], slices_and_responses)) + + _mock_requests( + http_mocker, + "https://persona.metaverse.com/party_members", + slices, + responses, + ) + + +def _mock_locations_requests(http_mocker: HttpMocker, slices: List[Dict[str, str]]) -> None: + locations_query_params = list(map(lambda _slice: _slice | {"m": "active", "i": "1", "g": "country"}, slices)) + _mock_requests( + http_mocker, + "https://persona.metaverse.com/locations", + locations_query_params, + [_LOCATIONS_RESPONSE] * len(slices), + ) + + +def _mock_party_members_skills_requests(http_mocker: HttpMocker) -> None: + """ + This method assumes _mock_party_members_requests has been called before else the stream won't work. 
+ """ + http_mocker.get(HttpRequest("https://persona.metaverse.com/party_members/amamiya/skills"), _PARTY_MEMBERS_SKILLS_RESPONSE) + http_mocker.get(HttpRequest("https://persona.metaverse.com/party_members/nijima/skills"), _PARTY_MEMBERS_SKILLS_RESPONSE) + http_mocker.get(HttpRequest("https://persona.metaverse.com/party_members/yoshizawa/skills"), _PARTY_MEMBERS_SKILLS_RESPONSE) + + +@freezegun.freeze_time(_NOW) def test_read_with_concurrent_and_synchronous_streams(): """ Verifies that a ConcurrentDeclarativeSource processes concurrent streams followed by synchronous streams """ - + location_slices = [ + {"start": "2024-07-01", "end": "2024-07-31"}, + {"start": "2024-08-01", "end": "2024-08-31"}, + {"start": "2024-09-01", "end": "2024-09-10"}, + ] source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=_CONFIG, catalog=_CATALOG, state=None) - disable_emitting_sequential_state_messages(source=source) - assert len(source._concurrent_streams) == 2 - assert len(source._synchronous_streams) == 2 + with HttpMocker() as http_mocker: + _mock_party_members_requests(http_mocker, _NO_STATE_PARTY_MEMBERS_SLICES_AND_RESPONSES) + _mock_locations_requests(http_mocker, location_slices) + http_mocker.get(HttpRequest("https://persona.metaverse.com/palaces"), _PALACES_RESPONSE) + _mock_party_members_skills_requests(http_mocker) - for i, _ in enumerate(source._concurrent_streams): - stream = source._concurrent_streams[i]._stream_partition_generator._stream - if isinstance(stream, DeclarativeStream): - decorated_stream = create_wrapped_stream(stream=stream) - source._concurrent_streams[i]._stream_partition_generator._stream = decorated_stream - else: - raise ValueError(f"Expecting stream as type DeclarativeStream, but got {type(stream)}") - - for i, _ in enumerate(source._synchronous_streams): - stream = source._synchronous_streams[i] - if isinstance(stream, DeclarativeStream): - decorated_stream = create_wrapped_stream(stream=stream) - source._synchronous_streams[i] = decorated_stream - else: - raise ValueError(f"Expecting stream as type DeclarativeStream, but got {type(stream)}") + messages = list(source.read(logger=source.logger, config=_CONFIG, catalog=_CATALOG, state=[])) - messages = list(source.read(logger=source.logger, config=_CONFIG, catalog=_CATALOG, state=[])) - - # Expects 15 records, 5 slices, 3 records each slice + # See _mock_party_members_requests party_members_records = get_records_for_stream("party_members", messages) - assert len(party_members_records) == 15 + assert len(party_members_records) == 3 party_members_states = get_states_for_stream(stream_name="party_members", messages=messages) assert len(party_members_states) == 6 assert party_members_states[5].stream.stream_state.__dict__ == AirbyteStateBlob( state_type="date-range", - slices=[{"start": "2024-07-01", "end": "2024-09-09"}] + slices=[{"start": "2024-07-01", "end": "2024-09-10"}] ).__dict__ # Expects 12 records, 3 slices, 4 records each slice @@ -565,7 +638,7 @@ def test_read_with_concurrent_and_synchronous_streams(): assert len(locations_states) == 4 assert locations_states[3].stream.stream_state.__dict__ == AirbyteStateBlob( state_type="date-range", - slices=[{"start": "2024-07-01", "end": "2024-09-09"}] + slices=[{"start": "2024-07-01", "end": "2024-09-10"}] ).__dict__ # Expects 7 records, 1 empty slice, 7 records in slice @@ -574,24 +647,39 @@ def test_read_with_concurrent_and_synchronous_streams(): palaces_states = get_states_for_stream(stream_name="palaces", messages=messages) assert len(palaces_states) == 1 - 
assert palaces_states[0].stream.stream_state.__dict__ == AirbyteStateBlob(__ab_no_cursor_state_message=True).__dict__ + assert palaces_states[0].stream.stream_state.__dict__ == AirbyteStateBlob(__ab_full_refresh_sync_complete=True).__dict__ - # Expects 3 records, 1 empty slice, 3 records in slice + # Expects 3 records, 3 slices, 3 records in slice party_members_skills_records = get_records_for_stream("party_members_skills", messages) - assert len(party_members_skills_records) == 3 + assert len(party_members_skills_records) == 9 party_members_skills_states = get_states_for_stream(stream_name="party_members_skills", messages=messages) - assert len(party_members_skills_states) == 1 - assert party_members_skills_states[0].stream.stream_state.__dict__ == AirbyteStateBlob(__ab_no_cursor_state_message=True).__dict__ - + assert len(party_members_skills_states) == 3 + assert party_members_skills_states[0].stream.stream_state.__dict__ == { + "states": [ + {"partition": {"parent_slice": {}, "party_member_id": "amamiya"}, "cursor": {"__ab_full_refresh_sync_complete": True}}, + ] + } + assert party_members_skills_states[1].stream.stream_state.__dict__ == { + "states": [ + {"partition": {"parent_slice": {}, "party_member_id": "amamiya"}, "cursor": {"__ab_full_refresh_sync_complete": True}}, + {"partition": {"parent_slice": {}, "party_member_id": "nijima"}, "cursor": {"__ab_full_refresh_sync_complete": True}}, + ] + } + assert party_members_skills_states[2].stream.stream_state.__dict__ == { + "states": [ + {"partition": {"parent_slice": {}, "party_member_id": "amamiya"}, "cursor": {"__ab_full_refresh_sync_complete": True}}, + {"partition": {"parent_slice": {}, "party_member_id": "nijima"}, "cursor": {"__ab_full_refresh_sync_complete": True}}, + {"partition": {"parent_slice": {}, "party_member_id": "yoshizawa"}, "cursor": {"__ab_full_refresh_sync_complete": True}} + ] + } -@freezegun.freeze_time("2024-09-10T00:00:00") +@freezegun.freeze_time(_NOW) def test_read_with_concurrent_and_synchronous_streams_with_concurrent_state(): """ Verifies that a ConcurrentDeclarativeSource processes concurrent streams correctly using the incoming concurrent state format """ - state = [ AirbyteStateMessage( type=AirbyteStateType.STREAM, @@ -611,7 +699,7 @@ def test_read_with_concurrent_and_synchronous_streams_with_concurrent_state(): state_type="date-range", slices=[ {"start": "2024-07-16", "end": "2024-07-30"}, - {"start": "2024-08-15", "end": "2024-08-29"}, + {"start": "2024-07-31", "end": "2024-08-14"}, {"start": "2024-08-30", "end": "2024-09-09"}, ] ), @@ -619,27 +707,33 @@ def test_read_with_concurrent_and_synchronous_streams_with_concurrent_state(): ), ] - source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=_CONFIG, catalog=_CATALOG, state=state) + party_members_slices_and_responses = _NO_STATE_PARTY_MEMBERS_SLICES_AND_RESPONSES + [ + ( + {"start": "2024-09-04", "end": "2024-09-10"}, + HttpResponse( + json.dumps( + [ + {"id": "yoshizawa", "first_name": "sumire", "last_name": "yoshizawa", "updated_at": "2024-09-10"} + ] + ) + ), + ) # considering lookback window + ] + location_slices = [ + {"start": "2024-07-26", "end": "2024-08-25"}, + {"start": "2024-08-26", "end": "2024-09-10"}, + ] + source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=_CONFIG, catalog=_CATALOG, state=state) disable_emitting_sequential_state_messages(source=source) - for i, _ in enumerate(source._concurrent_streams): - stream = source._concurrent_streams[i]._stream_partition_generator._stream - if isinstance(stream, 
DeclarativeStream): - decorated_stream = create_wrapped_stream(stream=stream) - source._concurrent_streams[i]._stream_partition_generator._stream = decorated_stream - else: - raise ValueError(f"Expecting stream as type DeclarativeStream, but got {type(stream)}") - - for i, _ in enumerate(source._synchronous_streams): - stream = source._synchronous_streams[i] - if isinstance(stream, DeclarativeStream): - decorated_stream = create_wrapped_stream(stream=stream) - source._synchronous_streams[i] = decorated_stream - else: - raise ValueError(f"Expecting stream as type DeclarativeStream, but got {type(stream)}") + with HttpMocker() as http_mocker: + _mock_party_members_requests(http_mocker, party_members_slices_and_responses) + _mock_locations_requests(http_mocker, location_slices) + http_mocker.get(HttpRequest("https://persona.metaverse.com/palaces"), _PALACES_RESPONSE) + _mock_party_members_skills_requests(http_mocker) - messages = list(source.read(logger=source.logger, config=_CONFIG, catalog=_CATALOG, state=state)) + messages = list(source.read(logger=source.logger, config=_CONFIG, catalog=_CATALOG, state=state)) # Expects 8 records, skip successful intervals and are left with 2 slices, 4 records each slice locations_records = get_records_for_stream("locations", messages) @@ -649,30 +743,32 @@ def test_read_with_concurrent_and_synchronous_streams_with_concurrent_state(): assert len(locations_states) == 3 assert locations_states[2].stream.stream_state.__dict__ == AirbyteStateBlob( state_type="date-range", - slices=[{"start": "2024-07-01", "end": "2024-09-09"}] + slices=[{"start": "2024-07-01", "end": "2024-09-10"}] ).__dict__ - # Expects 12 records, skip successful intervals and are left with 4 slices, 3 records each slice + # slices to sync are: + # * {"start": "2024-07-01", "end": "2024-07-15"}: one record in _NO_STATE_PARTY_MEMBERS_SLICES_AND_RESPONSES + # * {"start": "2024-09-04", "end": "2024-09-10"}: one record from the lookback window defined in this test party_members_records = get_records_for_stream("party_members", messages) - assert len(party_members_records) == 12 + assert len(party_members_records) == 2 party_members_states = get_states_for_stream(stream_name="party_members", messages=messages) - assert len(party_members_states) == 5 - assert party_members_states[4].stream.stream_state.__dict__ == AirbyteStateBlob( + assert len(party_members_states) == 4 + assert party_members_states[3].stream.stream_state.__dict__ == AirbyteStateBlob( state_type="date-range", - slices=[{"start": "2024-07-01", "end": "2024-09-09"}] # weird, why'd this end up as 2024-09-10 is it because of cursor granularity? 
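# For a single slice, `_mock_locations_requests` above expands to one fully-qualified
# registration; `HttpMocker` matches query parameters exactly, so every slice the
# cursor will generate needs its own entry. A sketch of the expansion:
#
#     _mock_locations_requests(http_mocker, [{"start": "2024-07-01", "end": "2024-07-31"}])
#     # is equivalent to:
#     http_mocker.get(
#         HttpRequest(
#             "https://persona.metaverse.com/locations",
#             query_params={"start": "2024-07-01", "end": "2024-07-31", "m": "active", "i": "1", "g": "country"},
#         ),
#         _LOCATIONS_RESPONSE,
#     )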
+ slices=[{"start": "2024-07-01", "end": "2024-09-10"}] ).__dict__ # Expects 7 records, 1 empty slice, 7 records in slice palaces_records = get_records_for_stream("palaces", messages) assert len(palaces_records) == 7 - # Expects 3 records, 1 empty slice, 3 records in slice + # Expects 3 records, 3 slices, 3 records in slice party_members_skills_records = get_records_for_stream("party_members_skills", messages) - assert len(party_members_skills_records) == 3 + assert len(party_members_skills_records) == 9 -@freezegun.freeze_time("2024-09-10T00:00:00") +@freezegun.freeze_time(_NOW) def test_read_with_concurrent_and_synchronous_streams_with_sequential_state(): """ Verifies that a ConcurrentDeclarativeSource processes concurrent streams correctly using the incoming @@ -690,32 +786,30 @@ def test_read_with_concurrent_and_synchronous_streams_with_sequential_state(): type=AirbyteStateType.STREAM, stream=AirbyteStreamState( stream_descriptor=StreamDescriptor(name="party_members", namespace=None), - stream_state=AirbyteStateBlob(updated_at="2024-08-20"), + stream_state=AirbyteStateBlob(updated_at="2024-08-21"), ), ) ] source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=_CONFIG, catalog=_CATALOG, state=state) - disable_emitting_sequential_state_messages(source=source) - for i, _ in enumerate(source._concurrent_streams): - stream = source._concurrent_streams[i]._stream_partition_generator._stream - if isinstance(stream, DeclarativeStream): - decorated_stream = create_wrapped_stream(stream=stream) - source._concurrent_streams[i]._stream_partition_generator._stream = decorated_stream - else: - raise ValueError(f"Expecting stream as type DeclarativeStream, but got {type(stream)}") + party_members_slices_and_responses = _NO_STATE_PARTY_MEMBERS_SLICES_AND_RESPONSES + [ + ({"start": "2024-08-16", "end": "2024-08-30"}, HttpResponse(json.dumps([{"id": "nijima", "first_name": "makoto", "last_name": "nijima", "updated_at": "2024-08-10"}]))), # considering lookback window + ({"start": "2024-08-31", "end": "2024-09-10"}, HttpResponse(json.dumps([{"id": "yoshizawa", "first_name": "sumire", "last_name": "yoshizawa", "updated_at": "2024-09-10"}]))), + ] + location_slices = [ + {"start": "2024-08-01", "end": "2024-08-31"}, + {"start": "2024-09-01", "end": "2024-09-10"}, + ] - for i, _ in enumerate(source._synchronous_streams): - stream = source._synchronous_streams[i] - if isinstance(stream, DeclarativeStream): - decorated_stream = create_wrapped_stream(stream=stream) - source._synchronous_streams[i] = decorated_stream - else: - raise ValueError(f"Expecting stream as type DeclarativeStream, but got {type(stream)}") + with HttpMocker() as http_mocker: + _mock_party_members_requests(http_mocker, party_members_slices_and_responses) + _mock_locations_requests(http_mocker, location_slices) + http_mocker.get(HttpRequest("https://persona.metaverse.com/palaces"), _PALACES_RESPONSE) + _mock_party_members_skills_requests(http_mocker) - messages = list(source.read(logger=source.logger, config=_CONFIG, catalog=_CATALOG, state=state)) + messages = list(source.read(logger=source.logger, config=_CONFIG, catalog=_CATALOG, state=state)) # Expects 8 records, skip successful intervals and are left with 2 slices, 4 records each slice locations_records = get_records_for_stream("locations", messages) @@ -725,55 +819,47 @@ def test_read_with_concurrent_and_synchronous_streams_with_sequential_state(): assert len(locations_states) == 3 assert locations_states[2].stream.stream_state.__dict__ == AirbyteStateBlob( 
state_type="date-range", - slices=[{"start": "2024-07-01", "end": "2024-09-09"}] + slices=[{"start": "2024-07-01", "end": "2024-09-10"}] ).__dict__ - # Expects 6 records, skip successful intervals and are left with 2 slices, 3 records each slice + # From extra slices defined in party_members_slices_and_responses party_members_records = get_records_for_stream("party_members", messages) - assert len(party_members_records) == 6 + assert len(party_members_records) == 2 party_members_states = get_states_for_stream(stream_name="party_members", messages=messages) assert len(party_members_states) == 3 assert party_members_states[2].stream.stream_state.__dict__ == AirbyteStateBlob( state_type="date-range", - slices=[{"start": "2024-07-01", "end": "2024-09-09"}] + slices=[{"start": "2024-07-01", "end": "2024-09-10"}] ).__dict__ # Expects 7 records, 1 empty slice, 7 records in slice palaces_records = get_records_for_stream("palaces", messages) assert len(palaces_records) == 7 - # Expects 3 records, 1 empty slice, 3 records in slice + # Expects 3 records, 3 slices, 3 records in slice party_members_skills_records = get_records_for_stream("party_members_skills", messages) - assert len(party_members_skills_records) == 3 + assert len(party_members_skills_records) == 9 -@freezegun.freeze_time("2024-09-10T00:00:00") +@freezegun.freeze_time(_NOW) def test_read_concurrent_with_failing_partition_in_the_middle(): """ Verify that partial state is emitted when only some partitions are successful during a concurrent sync attempt """ + # most_recent_cursor_value = datetime(2024, 8, 10, 0, 0, 0, tzinfo=timezone.utc) # based on _LOCATIONS_RESPONSE, this value might be outside of the actual start/end of the slice + # FIXME it seems like we yet don't consider `most_recent_cursor_value` as we ignore it during serialization [here](https://github.com/airbytehq/airbyte/blob/f07571f15f1bdbba86ad5e324e829a89b7d07cd6/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py#L75-L78) + # Once this is added, this test will probably have to change because the generated slices (represented by `location_slices` here) will no longer match the `expected_stream_state` + # This is why `most_recent_cursor_value` is unused for now + location_slices = [ + {"start": "2024-07-01", "end": "2024-07-31"}, + # missing slice `{"start": "2024-08-01", "end": "2024-08-31"}` here + {"start": "2024-09-01", "end": "2024-09-10"}, + ] expected_stream_state = { "state_type": "date-range", - "slices": [ - { - "start": datetime(2024, 7, 5, 0, 0, tzinfo=timezone.utc), - "end": datetime(2024, 8, 4, 0, 0, 0, tzinfo=timezone.utc), - "most_recent_cursor_value": datetime(2024, 8, 10, 0, 0, 0, tzinfo=timezone.utc), - }, - { - "end": datetime(2024, 9, 9, 0, 0, 0, tzinfo=timezone.utc), - "start": datetime(2024, 9, 5, 0, 0, 0, tzinfo=timezone.utc), - "most_recent_cursor_value": datetime(2024, 8, 10, 0, 0, 0, tzinfo=timezone.utc), - } - ], - "legacy": {}, - } - - # I swapped the config as a trick to get different intervals to get returned by the mocked read_records() method - config = { - "start_date": "2024-07-05T00:00:00.000Z" + "slices": location_slices, } catalog = ConfiguredAirbyteCatalog( @@ -786,76 +872,68 @@ def test_read_concurrent_with_failing_partition_in_the_middle(): ] ) - source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=config, catalog=catalog, state=[]) - + source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=[]) 
-@freezegun.freeze_time("2024-09-10T00:00:00")
+@freezegun.freeze_time(_NOW)
 def test_read_concurrent_with_failing_partition_in_the_middle():
     """
     Verify that partial state is emitted when only some partitions are successful during a concurrent sync attempt
     """
+    # most_recent_cursor_value = datetime(2024, 8, 10, 0, 0, 0, tzinfo=timezone.utc)  # based on _LOCATIONS_RESPONSE, this value might be outside of the actual start/end of the slice
+    # FIXME it seems like we don't yet consider `most_recent_cursor_value` as we ignore it during serialization [here](https://github.com/airbytehq/airbyte/blob/f07571f15f1bdbba86ad5e324e829a89b7d07cd6/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py#L75-L78)
+    # Once this is added, this test will probably have to change because the generated slices (represented by `location_slices` here) will no longer match the `expected_stream_state`
+    # This is why `most_recent_cursor_value` is unused for now
+    location_slices = [
+        {"start": "2024-07-01", "end": "2024-07-31"},
+        # missing slice `{"start": "2024-08-01", "end": "2024-08-31"}` here
+        {"start": "2024-09-01", "end": "2024-09-10"},
+    ]
     expected_stream_state = {
         "state_type": "date-range",
-        "slices": [
-            {
-                "start": datetime(2024, 7, 5, 0, 0, tzinfo=timezone.utc),
-                "end": datetime(2024, 8, 4, 0, 0, 0, tzinfo=timezone.utc),
-                "most_recent_cursor_value": datetime(2024, 8, 10, 0, 0, 0, tzinfo=timezone.utc),
-            },
-            {
-                "end": datetime(2024, 9, 9, 0, 0, 0, tzinfo=timezone.utc),
-                "start": datetime(2024, 9, 5, 0, 0, 0, tzinfo=timezone.utc),
-                "most_recent_cursor_value": datetime(2024, 8, 10, 0, 0, 0, tzinfo=timezone.utc),
-            }
-        ],
-        "legacy": {},
-    }
-
-    # I swapped the config as a trick to get different intervals to get returned by the mocked read_records() method
-    config = {
-        "start_date": "2024-07-05T00:00:00.000Z"
+        "slices": location_slices,
     }

     catalog = ConfiguredAirbyteCatalog(
@@ -786,76 +872,68 @@ def test_read_concurrent_with_failing_partition_in_the_middle():
         ]
     )

-    source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=config, catalog=catalog, state=[])
-
+    source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=[])
     disable_emitting_sequential_state_messages(source=source)

-    for i, _ in enumerate(source._concurrent_streams):
-        stream = source._concurrent_streams[i]._stream_partition_generator._stream
-        if isinstance(stream, DeclarativeStream):
-            decorated_stream = create_wrapped_stream(stream=stream)
-            source._concurrent_streams[i]._stream_partition_generator._stream = decorated_stream
-        else:
-            raise ValueError(f"Expecting stream as type DeclarativeStream, but got {type(stream)}")
+    location_slices = [
+        {"start": "2024-07-01", "end": "2024-07-31"},
+        # missing slice `{"start": "2024-08-01", "end": "2024-08-31"}` here
+        {"start": "2024-09-01", "end": "2024-09-10"},
+    ]

-    for i, _ in enumerate(source._synchronous_streams):
-        stream = source._synchronous_streams[i]
-        if isinstance(stream, DeclarativeStream):
-            decorated_stream = create_wrapped_stream(stream=stream)
-            source._synchronous_streams[i] = decorated_stream
-        else:
-            raise ValueError(f"Expecting stream as type DeclarativeStream, but got {type(stream)}")
+    with HttpMocker() as http_mocker:
+        _mock_locations_requests(http_mocker, location_slices)

-    try:
-        list(source.read(logger=source.logger, config=config, catalog=catalog, state=[]))
-    except AirbyteTracedException:
-        locations_stream = [stream for stream in source.streams(config=config, include_concurrent_streams=True) if stream.name == "locations"][0]
-        final_stream_state = locations_stream.cursor.state
-        assert final_stream_state == expected_stream_state
+        messages = []
+        try:
+            for message in source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=[]):
+                messages.append(message)
+        except AirbyteTracedException:
+            assert get_states_for_stream(stream_name="locations", messages=messages)[-1].stream.stream_state.__dict__ == expected_stream_state


-@freezegun.freeze_time("2024-09-10T00:00:00")
+@freezegun.freeze_time(_NOW)
 def test_read_concurrent_skip_streams_not_in_catalog():
     """
     Verifies that the ConcurrentDeclarativeSource only syncs streams that are specified in the incoming ConfiguredCatalog
     """
+    with HttpMocker() as http_mocker:
+        catalog = ConfiguredAirbyteCatalog(
+            streams=[
+                ConfiguredAirbyteStream(
+                    stream=AirbyteStream(name="palaces", json_schema={}, supported_sync_modes=[SyncMode.full_refresh]),
+                    sync_mode=SyncMode.full_refresh,
+                    destination_sync_mode=DestinationSyncMode.append,
+                ),
+                ConfiguredAirbyteStream(
+                    stream=AirbyteStream(name="locations", json_schema={}, supported_sync_modes=[SyncMode.incremental]),
+                    sync_mode=SyncMode.incremental,
+                    destination_sync_mode=DestinationSyncMode.append,
+                ),
+            ]
+        )

-    catalog = ConfiguredAirbyteCatalog(
-        streams=[
-            ConfiguredAirbyteStream(
-                stream=AirbyteStream(name="palaces", json_schema={}, supported_sync_modes=[SyncMode.full_refresh]),
-                sync_mode=SyncMode.full_refresh,
-                destination_sync_mode=DestinationSyncMode.append,
-            ),
-            ConfiguredAirbyteStream(
-                stream=AirbyteStream(name="locations", json_schema={}, supported_sync_modes=[SyncMode.incremental]),
-                sync_mode=SyncMode.incremental,
-                destination_sync_mode=DestinationSyncMode.append,
-            ),
+        source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=None)
+        # locations requests
+        location_slices = [
+            {"start": "2024-07-01", "end": "2024-07-31"},
+            {"start": "2024-08-01", "end": "2024-08-31"},
+            {"start": "2024-09-01", "end": "2024-09-10"},
         ]
-    )
-
-    source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=None)
-
-    disable_emitting_sequential_state_messages(source=source)
+        locations_query_params = list(map(lambda _slice: _slice | {"m": "active", "i": "1", "g": "country"}, location_slices))
+        _mock_requests(
+            http_mocker,
+            "https://persona.metaverse.com/locations",
+            locations_query_params,
+            [_LOCATIONS_RESPONSE] * len(location_slices),
+        )

-    for i, _ in enumerate(source._concurrent_streams):
-        stream = source._concurrent_streams[i]._stream_partition_generator._stream
-        if isinstance(stream, DeclarativeStream):
-            decorated_stream = create_wrapped_stream(stream=stream)
-            source._concurrent_streams[i]._stream_partition_generator._stream = decorated_stream
-        else:
-            raise ValueError(f"Expecting stream as type DeclarativeStream, but got {type(stream)}")
+        # palaces requests
+        http_mocker.get(HttpRequest("https://persona.metaverse.com/palaces"), _PALACES_RESPONSE)

-    for i, _ in enumerate(source._synchronous_streams):
-        stream = source._synchronous_streams[i]
-        if isinstance(stream, DeclarativeStream):
-            decorated_stream = create_wrapped_stream(stream=stream)
-            source._synchronous_streams[i] = decorated_stream
-        else:
-            raise ValueError(f"Expecting stream as type DeclarativeStream, but got {type(stream)}")
+        disable_emitting_sequential_state_messages(source=source)

-    messages = list(source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=[]))
+        messages = list(source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=[]))

     locations_records = get_records_for_stream(stream_name="locations", messages=messages)
     assert len(locations_records) == 12
@@ -1001,6 +1079,7 @@ def get_mocked_read_records_output(stream_name: str) -> Mapping[tuple[str, str],
                 {"id": "5", "world": "casino", "owner": "nijima"},
                 {"id": "6", "world": "cruiser", "owner": "shido"},
             ]
+
         case "party_members_skills":
             slices = [StreamSlice(cursor_slice={}, partition={})]

@@ -1026,4 +1105,3 @@ def get_states_for_stream(stream_name: str, messages: List[AirbyteMessage]) -> L

 def disable_emitting_sequential_state_messages(source: ConcurrentDeclarativeSource) -> None:
     for concurrent_streams in source._concurrent_streams:  # type: ignore # This is the easiest way to disable behavior from the test
         concurrent_streams.cursor._connector_state_converter._is_sequential_state = False  # type: ignore # see above
-        concurrent_streams.cursor._connector_state_converter._is_sequential_state = False  # type: ignore # see above
diff --git a/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_cursor.py b/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_cursor.py
index 1327bafda7ea..d98532bd65cd 100644
--- a/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_cursor.py
+++ b/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_cursor.py
@@ -343,6 +343,73 @@ def test_given_difference_between_slices_match_slice_range_when_generate_slices_
             (datetime.fromtimestamp(30, timezone.utc), datetime.fromtimestamp(40, timezone.utc)),
         ]

+    @freezegun.freeze_time(time_to_freeze=datetime.fromtimestamp(50, timezone.utc))
+    def test_given_small_slice_range_with_granularity_when_generate_slices_then_create_many_slices(self):
+        start = datetime.fromtimestamp(1, timezone.utc)
+        small_slice_range = timedelta(seconds=10)
+        granularity = timedelta(seconds=1)
+        cursor = ConcurrentCursor(
+            _A_STREAM_NAME,
+            _A_STREAM_NAMESPACE,
+            {
+                "state_type": ConcurrencyCompatibleStateType.date_range.value,
+                "slices": [
+                    {EpochValueConcurrentStreamStateConverter.START_KEY: 1, EpochValueConcurrentStreamStateConverter.END_KEY: 20},
+                ],
+            },
+            self._message_repository,
+            self._state_manager,
+            EpochValueConcurrentStreamStateConverter(is_sequential_state=False),
+            CursorField(_A_CURSOR_FIELD_KEY),
+            _SLICE_BOUNDARY_FIELDS,
+            start,
+            EpochValueConcurrentStreamStateConverter.get_end_provider(),
+            _NO_LOOKBACK_WINDOW,
+            small_slice_range,
+            granularity,
+        )
+
+        slices = list(cursor.generate_slices())
+
+        assert slices == [
+            (datetime.fromtimestamp(20, timezone.utc), datetime.fromtimestamp(29, timezone.utc)),
+            (datetime.fromtimestamp(30, timezone.utc), datetime.fromtimestamp(39, timezone.utc)),
+            (datetime.fromtimestamp(40, timezone.utc), datetime.fromtimestamp(50, timezone.utc)),
+        ]
+
+    @freezegun.freeze_time(time_to_freeze=datetime.fromtimestamp(50, timezone.utc))
+    def test_given_difference_between_slices_match_slice_range_and_cursor_granularity_when_generate_slices_then_create_one_slice(self):
+        start = datetime.fromtimestamp(1, timezone.utc)
+        small_slice_range = timedelta(seconds=10)
+        granularity = timedelta(seconds=1)
+        cursor = ConcurrentCursor(
+            _A_STREAM_NAME,
+            _A_STREAM_NAMESPACE,
+            {
+                "state_type": ConcurrencyCompatibleStateType.date_range.value,
+                "slices": [
+                    {EpochValueConcurrentStreamStateConverter.START_KEY: 1, EpochValueConcurrentStreamStateConverter.END_KEY: 30},
+                    {EpochValueConcurrentStreamStateConverter.START_KEY: 41, EpochValueConcurrentStreamStateConverter.END_KEY: 50},
+                ],
+            },
+            self._message_repository,
+            self._state_manager,
+            EpochValueConcurrentStreamStateConverter(is_sequential_state=False),
+            CursorField(_A_CURSOR_FIELD_KEY),
+            _SLICE_BOUNDARY_FIELDS,
+            start,
+            EpochValueConcurrentStreamStateConverter.get_end_provider(),
+            _NO_LOOKBACK_WINDOW,
+            small_slice_range,
+            granularity,
+        )
+
+        slices = list(cursor.generate_slices())
+
+        assert slices == [
+            (datetime.fromtimestamp(31, timezone.utc), datetime.fromtimestamp(40, timezone.utc)),  # FIXME there should probably be the granularity at the beginning too
+        ]
+
     @freezegun.freeze_time(time_to_freeze=datetime.fromtimestamp(50, timezone.utc))
     def test_given_non_continuous_state_when_generate_slices_then_create_slices_between_gaps_and_after(self):
         cursor = ConcurrentCursor(
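Boundary arithmetic for the single slice asserted in the second new cursor test above (slice range 10s, cursor granularity 1s): the gap between the two state slices starts one granularity after the first slice's END_KEY and, since `upper_is_end` is False for a gap, ends one granularity before the next slice's START_KEY:

# state slices: [1, 30] and [41, 50]; the gap between them is filled
lower = 30 + 1   # END_KEY of the first slice + cursor granularity
upper = 41       # START_KEY of the second slice
yielded = (lower, upper - 1)  # upper_is_end=False, so the upper bound is trimmed
assert yielded == (31, 40)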
diff --git a/airbyte-cdk/python/unit_tests/test/mock_http/test_mocker.py b/airbyte-cdk/python/unit_tests/test/mock_http/test_mocker.py
index 2df549e2bb9f..2b086a1748c6 100644
--- a/airbyte-cdk/python/unit_tests/test/mock_http/test_mocker.py
+++ b/airbyte-cdk/python/unit_tests/test/mock_http/test_mocker.py
@@ -240,3 +240,20 @@ def decorated_function(http_mocker):

         with pytest.raises(ValueError):
             decorated_function()
+
+    def test_given_unknown_request_when_assert_number_of_calls_then_raise(self):
+        @HttpMocker()
+        def decorated_function(http_mocker):
+            http_mocker.get(HttpRequest(_A_URL), _A_RESPONSE)
+            http_mocker.assert_number_of_calls(HttpRequest(_ANOTHER_URL), 1)
+
+        with pytest.raises(ValueError):
+            decorated_function()
+
+    def test_given_request_already_mocked_when_decorate_then_raise(self):
+        with HttpMocker() as http_mocker:
+            a_request = HttpRequest(_A_URL, _SOME_QUERY_PARAMS, _SOME_HEADERS)
+            http_mocker.get(a_request, _A_RESPONSE)
+
+            with pytest.raises(ValueError):
+                http_mocker.get(a_request, _A_RESPONSE)