From b98601b98f980ad0582156921ce8174bd838f963 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Fri, 18 Oct 2024 09:06:44 -0400 Subject: [PATCH 1/9] fix typing on streams method --- .../airbyte_cdk/sources/abstract_source.py | 4 +- .../declarative/checks/check_stream.py | 2 +- .../concurrent_declarative_source.py | 18 +- .../manifest_declarative_source.py | 2 +- .../test_concurrent_declarative_source.py | 422 +++++++++++------- 5 files changed, 269 insertions(+), 179 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index e3156d3403a8..3656a88c3157 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -56,11 +56,9 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> """ @abstractmethod - def streams(self, config: Mapping[str, Any], include_concurrent_streams: bool = False) -> List[Stream]: + def streams(self, config: Mapping[str, Any]) -> List[Stream]: """ :param config: The user-provided configuration as specified by the source's spec. - :param include_concurrent_streams: Concurrent sources can be made up of streams that can be run concurrently and - ones that must be run synchronously. By default, for backwards compatibility this is disabled. Any stream construction related operation should happen here. :return: A list of the streams in this source connector. """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py index 78b73426d74b..83c2a7fb71e8 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -28,7 +28,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._parameters = parameters def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]: - streams = source.streams(config=config, include_concurrent_streams=True) # type: ignore # source is always a DeclarativeSource, but this parameter type adheres to the outer interface + streams = source.streams(config=config) # type: ignore # source is always a DeclarativeSource, but this parameter type adheres to the outer interface stream_name_to_stream = {s.name: s for s in streams} if len(streams) == 0: return False, f"No streams to connect to from source {source}" diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 8c3544ed21eb..9e043820b87a 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -125,24 +125,20 @@ def read( filtered_catalog = self._remove_concurrent_streams_from_catalog(catalog=catalog, concurrent_stream_names=concurrent_stream_names) yield from super().read(logger, config, filtered_catalog, state) - def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: - return super().check(logger=logger, config=config) - def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog: return AirbyteCatalog( - streams=[stream.as_airbyte_stream() for stream in self.streams(config=config, 
include_concurrent_streams=True)]
+            streams=[stream.as_airbyte_stream() for stream in self._concurrent_streams + self._synchronous_streams]
         )

-    def streams(self, config: Mapping[str, Any], include_concurrent_streams: bool = False) -> List[Stream]:
+    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
         """
-        Returns the list of streams that can be run synchronously in the Python CDK.
+        The `streams` method is used as part of the AbstractSource in the following cases:
+        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
+        * ConcurrentDeclarativeSource.discover -> ManifestDeclarativeSource.discover -> streams

-        NOTE: For ConcurrentDeclarativeSource, this method only returns synchronous streams because it usage is invoked within the
-        existing Python CDK. Streams that support concurrency are started from read().
+        In both cases, we will assume that calling the DeclarativeStream is perfectly fine as the result for these is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This
         """
-        if include_concurrent_streams:
-            return self._synchronous_streams + self._concurrent_streams  # type: ignore # Although AbstractStream doesn't inherit stream, they were designed to fit the same interface when called from streams()
-        return self._synchronous_streams
+        return super().streams(config)

     def _group_streams(self, config: Mapping[str, Any]) -> Tuple[List[AbstractStream], List[Stream]]:
         concurrent_streams: List[AbstractStream] = []
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py
index 97fc0be52340..842d4e944454 100644
--- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py
+++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py
@@ -88,7 +88,7 @@ def connection_checker(self) -> ConnectionChecker:
         else:
             raise ValueError(f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}")

-    def streams(self, config: Mapping[str, Any], include_concurrent_streams: bool = False) -> List[Stream]:
+    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
         self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)})
         stream_configs = self._stream_configs(self._source_config)
diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py
index 87171719c81c..b5f1831d4271 100644
--- a/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py
+++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py
@@ -2,8 +2,9 @@
 # Copyright (c) 2024 Airbyte, Inc., all rights reserved.
# import copy +import json from datetime import datetime, timedelta, timezone -from typing import Any, Iterable, List, Mapping, Optional, Union +from typing import Any, Iterable, List, Mapping, Optional, Union, Dict import freezegun import isodate @@ -21,16 +22,21 @@ DestinationSyncMode, FailureType, StreamDescriptor, + Status, SyncMode, ) from airbyte_cdk.sources.declarative.concurrent_declarative_source import ConcurrentDeclarativeSource from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.checkpoint import Cursor from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream from airbyte_cdk.sources.streams.core import StreamData from airbyte_cdk.sources.types import Record, StreamSlice +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse from airbyte_cdk.utils import AirbyteTracedException +from deprecated.classic import deprecated + _CONFIG = { "start_date": "2024-07-01T00:00:00.000Z" @@ -60,6 +66,41 @@ ) ] ) +_LOCATIONS_RESPONSE = HttpResponse(json.dumps([ + {"id": "444", "name": "Yongen-jaya", "updated_at": "2024-08-10"}, + {"id": "scramble", "name": "Shibuya", "updated_at": "2024-08-10"}, + {"id": "aoyama", "name": "Aoyama-itchome", "updated_at": "2024-08-10"}, + {"id": "shin123", "name": "Shinjuku", "updated_at": "2024-08-10"}, +])) +_PARTY_MEMBERS_RESPONSE = HttpResponse(json.dumps([ + {"id": "amamiya", "first_name": "ren", "last_name": "amamiya", "updated_at": "2024-07-10"}, + {"id": "nijima", "first_name": "makoto", "last_name": "nijima", "updated_at": "2024-08-10"}, + {"id": "yoshizawa", "first_name": "sumire", "last_name": "yoshizawa", "updated_at": "2024-09-10"}, +])) +_PALACES_RESPONSE = HttpResponse(json.dumps([ + {"id": "0", "world": "castle", "owner": "kamoshida"}, + {"id": "1", "world": "museum", "owner": "madarame"}, + {"id": "2", "world": "bank", "owner": "kaneshiro"}, + {"id": "3", "world": "pyramid", "owner": "futaba"}, + {"id": "4", "world": "spaceport", "owner": "okumura"}, + {"id": "5", "world": "casino", "owner": "nijima"}, + {"id": "6", "world": "cruiser", "owner": "shido"}, +])) +_PARTY_MEMBERS_SKILLS_RESPONSE = HttpResponse(json.dumps([ + {"id": "0", "name": "hassou tobi"}, + {"id": "1", "name": "mafreidyne"}, + {"id": "2", "name": "myriad truths"}, +])) +_EMPTY_RESPONSE = HttpResponse(json.dumps([])) +_NOW = "2024-09-10T00:00:00" +_DECLARATIVE_CURSOR_PARTY_MEMBERS_SLICES = [ + {"start": "2024-06-26", "end": "2024-07-10"}, + {"start": "2024-07-11", "end": "2024-07-25"}, + {"start": "2024-07-26", "end": "2024-08-09"}, + {"start": "2024-08-10", "end": "2024-08-24"}, + {"start": "2024-08-25", "end": "2024-09-08"}, + {"start": "2024-09-09", "end": "2024-09-10"}, +] # FIXME can be removed/updated once the declarative cursor is updated to match the concurrent cursor behavior on lookback windows _MANIFEST = { "version": "5.0.0", @@ -68,7 +109,7 @@ "type": "RecordSelector", "extractor": { "type": "DpathExtractor", - "field_path": ["{{ parameters.get('data_field') }}"] + "field_path": [] } }, "requester": { @@ -95,7 +136,8 @@ "error_message": "No data available for the time range requested." 
}
            ]
-        }
+        },
+        "use_cache": False, # necessary to ensure all tests are independent and that HttpMocker can validate the HTTP requests
         },
         "retriever": {
             "type": "SimpleRetriever",
@@ -155,11 +197,7 @@
         "retriever": {
             "$ref": "#/definitions/base_incremental_stream/retriever",
             "record_selector": {
-                "type": "RecordSelector",
-                "extractor": {
-                    "type": "DpathExtractor",
-                    "field_path": []
-                }
+                "$ref": "#/definitions/selector"
             }
         },
         "$parameters": {
@@ -190,7 +228,7 @@
         "$parameters": {
             "name": "palaces",
             "primary_key": "id",
-            "path": "/party_members"
+            "path": "/palaces"
         },
         "schema_loader": {
             "type": "InlineSchemaLoader",
@@ -223,11 +261,7 @@
             }
         },
         "record_selector": {
-            "type": "RecordSelector",
-            "extractor": {
-                "type": "DpathExtractor",
-                "field_path": []
-            }
+            "$ref": "#/definitions/selector"
         }
     },
     "incremental_sync": {
@@ -263,11 +297,7 @@
         "retriever": {
             "$ref": "#/definitions/base_incremental_stream/retriever",
             "record_selector": {
-                "type": "RecordSelector",
-                "extractor": {
-                    "type": "DpathExtractor",
-                    "field_path": []
-                }
+                "$ref": "#/definitions/selector"
             },
             "partition_router": {
                 "type": "SubstreamPartitionRouter",
@@ -284,7 +314,7 @@
         "$parameters": {
             "name": "party_members_skills",
             "primary_key": "id",
-            "path": "/party_members_skills"
+            "path": "/party_members/{{stream_slice.party_member_id}}/skills"
         },
         "schema_loader": {
             "type": "InlineSchemaLoader",
@@ -322,10 +352,18 @@
 }


+@deprecated("See note in docstring for more information")
 class DeclarativeStreamDecorator(Stream):
     """
     Helper class that wraps an existing DeclarativeStream but allows for overriding the output of read_records() to make it easier to mock
     behavior and test how low-code streams integrate with the Concurrent CDK.
+
+    NOTE: We are not using this for now, but the intent was to scope the tests to verifying only that streams were properly instantiated
+    and interacted with each other correctly. However, in practice we had a couple of surprises, like `get_cursor` and `stream_slices`
+    needing to be re-implemented as well. Because of that, we've moved away from this approach in favour of tests that integrate up to
+    the HTTP request. The drawback is that we become dependent on later changes (for example, if the DatetimeBasedCursor changes, it
+    will affect those tests), but it feels less flaky than this. If we get new information in the future that invalidates this decision,
+    feel free to re-use this class as necessary.
""" def __init__(self, declarative_stream: DeclarativeStream, slice_to_records_mapping: Mapping[tuple[str, str], List[Mapping[str, Any]]]): @@ -366,6 +404,9 @@ def read_records( def get_json_schema(self) -> Mapping[str, Any]: return self._declarative_stream.get_json_schema() + def get_cursor(self) -> Optional[Cursor]: + return self._declarative_stream.get_cursor() + def test_group_streams(): """ @@ -486,13 +527,26 @@ def test_create_concurrent_cursor(): def test_check(): """ - Verifies that the ConcurrentDeclarativeSource check command is run against concurrent and synchronous streams + Verifies that the ConcurrentDeclarativeSource check command is run against synchronous streams """ - source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None) + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest("https://persona.metaverse.com/party_members?start=2024-06-26&end=2024-07-10"), + HttpResponse(json.dumps({"id": "party_member_1"})), + ) + http_mocker.get( + HttpRequest("https://persona.metaverse.com/palaces"), + HttpResponse(json.dumps({"id": "palace_1"})), + ) + http_mocker.get( + HttpRequest("https://persona.metaverse.com/locations?m=active&i=1&g=country&start=2024-06-26&end=2024-07-25"), + HttpResponse(json.dumps({"id": "location_1"})), + ) + source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None) - actual_connection_state = source.check(logger=source.logger, config=_CONFIG) + connection_status = source.check(logger=source.logger, config=_CONFIG) - assert actual_connection_state + assert connection_status.status == Status.SUCCEEDED def test_discover(): @@ -512,33 +566,77 @@ def test_discover(): assert actual_catalog.streams[3].name in expected_stream_names -@freezegun.freeze_time("2024-09-10T00:00:00") +def _mock_requests(http_mocker: HttpMocker, url: str, query_params: List[Dict[str, str]], responses: List[HttpResponse]) -> None: + assert len(query_params) == len(responses), "Expecting as many slices as response" + + for i in range(len(query_params)): + http_mocker.get(HttpRequest(url, query_params=query_params[i]), responses[i]) + + +def _mock_party_members_requests(http_mocker: HttpMocker, slices: List[Dict[str, str]]) -> None: + _mock_requests( + http_mocker, + "https://persona.metaverse.com/party_members", + slices, + [_PARTY_MEMBERS_RESPONSE] * len(slices), + ) + +def _mock_locations_requests(http_mocker: HttpMocker, slices: List[Dict[str, str]]) -> None: + locations_query_params = list(map(lambda _slice: _slice | {"m": "active", "i": "1", "g": "country"}, slices)) + _mock_requests( + http_mocker, + "https://persona.metaverse.com/locations", + locations_query_params, + [_LOCATIONS_RESPONSE] * len(slices), + ) + + +def _mock_party_members_skills_requests(http_mocker: HttpMocker) -> None: + _mock_requests( + http_mocker, + "https://persona.metaverse.com/party_members", + _DECLARATIVE_CURSOR_PARTY_MEMBERS_SLICES, + [ + HttpResponse(json.dumps([{"id": "amamiya", "first_name": "ren", "last_name": "amamiya", "updated_at": "2024-07-10"}])), + _EMPTY_RESPONSE, + _EMPTY_RESPONSE, + HttpResponse(json.dumps([{"id": "nijima", "first_name": "makoto", "last_name": "nijima", "updated_at": "2024-08-10"}, ])), + _EMPTY_RESPONSE, + HttpResponse(json.dumps([{"id": "yoshizawa", "first_name": "sumire", "last_name": "yoshizawa", "updated_at": "2024-09-10"}])), + ], + ) + http_mocker.get(HttpRequest("https://persona.metaverse.com/party_members/amamiya/skills"), _PARTY_MEMBERS_SKILLS_RESPONSE) + 
http_mocker.get(HttpRequest("https://persona.metaverse.com/party_members/nijima/skills"), _PARTY_MEMBERS_SKILLS_RESPONSE) + http_mocker.get(HttpRequest("https://persona.metaverse.com/party_members/yoshizawa/skills"), _PARTY_MEMBERS_SKILLS_RESPONSE) + + +@freezegun.freeze_time(_NOW) def test_read_with_concurrent_and_synchronous_streams(): """ Verifies that a ConcurrentDeclarativeSource processes concurrent streams followed by synchronous streams """ - + concurrent_cursor_party_members_slices = [ + {"start": "2024-07-01", "end": "2024-07-15"}, + {"start": "2024-07-16", "end": "2024-07-30"}, + {"start": "2024-07-31", "end": "2024-08-14"}, + {"start": "2024-08-15", "end": "2024-08-29"}, + {"start": "2024-08-30", "end": "2024-09-09"}, + ] + location_slices = [ + {"start": "2024-07-01", "end": "2024-07-31"}, + {"start": "2024-08-01", "end": "2024-08-31"}, + {"start": "2024-09-01", "end": "2024-09-09"}, + ] source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=_CONFIG, catalog=_CATALOG, state=None) - disable_emitting_sequential_state_messages(source=source) - for i, _ in enumerate(source._concurrent_streams): - stream = source._concurrent_streams[i]._stream_partition_generator._stream - if isinstance(stream, DeclarativeStream): - decorated_stream = create_wrapped_stream(stream=stream) - source._concurrent_streams[i]._stream_partition_generator._stream = decorated_stream - else: - raise ValueError(f"Expecting stream as type DeclarativeStream, but got {type(stream)}") - - for i, _ in enumerate(source._synchronous_streams): - stream = source._synchronous_streams[i] - if isinstance(stream, DeclarativeStream): - decorated_stream = create_wrapped_stream(stream=stream) - source._synchronous_streams[i] = decorated_stream - else: - raise ValueError(f"Expecting stream as type DeclarativeStream, but got {type(stream)}") + with HttpMocker() as http_mocker: + _mock_party_members_requests(http_mocker, concurrent_cursor_party_members_slices) + _mock_locations_requests(http_mocker, location_slices) + http_mocker.get(HttpRequest("https://persona.metaverse.com/palaces"), _PALACES_RESPONSE) + _mock_party_members_skills_requests(http_mocker) - messages = list(source.read(logger=source.logger, config=_CONFIG, catalog=_CATALOG, state=[])) + messages = list(source.read(logger=source.logger, config=_CONFIG, catalog=_CATALOG, state=[])) # Expects 15 records, 5 slices, 3 records each slice party_members_records = get_records_for_stream("party_members", messages) @@ -570,24 +668,40 @@ def test_read_with_concurrent_and_synchronous_streams(): palaces_states = get_states_for_stream(stream_name="palaces", messages=messages) assert len(palaces_states) == 1 - assert palaces_states[0].stream.stream_state.__dict__ == AirbyteStateBlob(__ab_no_cursor_state_message=True).__dict__ + assert palaces_states[0].stream.stream_state.__dict__ == AirbyteStateBlob(__ab_full_refresh_sync_complete=True).__dict__ - # Expects 3 records, 1 empty slice, 3 records in slice + # Expects 3 records, 3 slices, 3 records in slice party_members_skills_records = get_records_for_stream("party_members_skills", messages) - assert len(party_members_skills_records) == 3 + assert len(party_members_skills_records) == 9 party_members_skills_states = get_states_for_stream(stream_name="party_members_skills", messages=messages) - assert len(party_members_skills_states) == 1 - assert party_members_skills_states[0].stream.stream_state.__dict__ == AirbyteStateBlob(__ab_no_cursor_state_message=True).__dict__ + assert len(party_members_skills_states) == 3 + 
assert party_members_skills_states[0].stream.stream_state.__dict__ == { + "states": [ + {"partition": {"parent_slice": {}, "party_member_id": "amamiya"}, "cursor": {"__ab_full_refresh_sync_complete": True}}, + ] + } + assert party_members_skills_states[1].stream.stream_state.__dict__ == { + "states": [ + {"partition": {"parent_slice": {}, "party_member_id": "amamiya"}, "cursor": {"__ab_full_refresh_sync_complete": True}}, + {"partition": {"parent_slice": {}, "party_member_id": "nijima"}, "cursor": {"__ab_full_refresh_sync_complete": True}}, + ] + } + assert party_members_skills_states[2].stream.stream_state.__dict__ == { + "states": [ + {"partition": {"parent_slice": {}, "party_member_id": "amamiya"}, "cursor": {"__ab_full_refresh_sync_complete": True}}, + {"partition": {"parent_slice": {}, "party_member_id": "nijima"}, "cursor": {"__ab_full_refresh_sync_complete": True}}, + {"partition": {"parent_slice": {}, "party_member_id": "yoshizawa"}, "cursor": {"__ab_full_refresh_sync_complete": True}} + ] + } -@freezegun.freeze_time("2024-09-10T00:00:00") +@freezegun.freeze_time(_NOW) def test_read_with_concurrent_and_synchronous_streams_with_concurrent_state(): """ Verifies that a ConcurrentDeclarativeSource processes concurrent streams correctly using the incoming concurrent state format """ - state = [ AirbyteStateMessage( type=AirbyteStateType.STREAM, @@ -615,27 +729,27 @@ def test_read_with_concurrent_and_synchronous_streams_with_concurrent_state(): ), ] - source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=_CONFIG, catalog=_CATALOG, state=state) + concurrent_cursor_party_members_slices = [ + {"start": "2024-07-01", "end": "2024-07-15"}, + {"start": "2024-07-30", "end": "2024-08-13"}, # FIXME this is an interesting case where we restart from that top boundary. 
I'm wondering if we need to change that because if this was within one sync, we would start from 2024-09-01 and not 2024-07-30 + {"start": "2024-08-14", "end": "2024-08-14"}, + {"start": "2024-09-04", "end": "2024-09-09"}, # considering lookback window + ] + location_slices = [ + {"start": "2024-07-26", "end": "2024-08-25"}, + {"start": "2024-08-26", "end": "2024-09-09"}, + ] + source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=_CONFIG, catalog=_CATALOG, state=state) disable_emitting_sequential_state_messages(source=source) - for i, _ in enumerate(source._concurrent_streams): - stream = source._concurrent_streams[i]._stream_partition_generator._stream - if isinstance(stream, DeclarativeStream): - decorated_stream = create_wrapped_stream(stream=stream) - source._concurrent_streams[i]._stream_partition_generator._stream = decorated_stream - else: - raise ValueError(f"Expecting stream as type DeclarativeStream, but got {type(stream)}") - - for i, _ in enumerate(source._synchronous_streams): - stream = source._synchronous_streams[i] - if isinstance(stream, DeclarativeStream): - decorated_stream = create_wrapped_stream(stream=stream) - source._synchronous_streams[i] = decorated_stream - else: - raise ValueError(f"Expecting stream as type DeclarativeStream, but got {type(stream)}") + with HttpMocker() as http_mocker: + _mock_party_members_requests(http_mocker, concurrent_cursor_party_members_slices) + _mock_locations_requests(http_mocker, location_slices) + http_mocker.get(HttpRequest("https://persona.metaverse.com/palaces"), _PALACES_RESPONSE) + _mock_party_members_skills_requests(http_mocker) - messages = list(source.read(logger=source.logger, config=_CONFIG, catalog=_CATALOG, state=state)) + messages = list(source.read(logger=source.logger, config=_CONFIG, catalog=_CATALOG, state=state)) # Expects 8 records, skip successful intervals and are left with 2 slices, 4 records each slice locations_records = get_records_for_stream("locations", messages) @@ -663,12 +777,12 @@ def test_read_with_concurrent_and_synchronous_streams_with_concurrent_state(): palaces_records = get_records_for_stream("palaces", messages) assert len(palaces_records) == 7 - # Expects 3 records, 1 empty slice, 3 records in slice + # Expects 3 records, 3 slices, 3 records in slice party_members_skills_records = get_records_for_stream("party_members_skills", messages) - assert len(party_members_skills_records) == 3 + assert len(party_members_skills_records) == 9 -@freezegun.freeze_time("2024-09-10T00:00:00") +@freezegun.freeze_time(_NOW) def test_read_with_concurrent_and_synchronous_streams_with_sequential_state(): """ Verifies that a ConcurrentDeclarativeSource processes concurrent streams correctly using the incoming @@ -692,26 +806,24 @@ def test_read_with_concurrent_and_synchronous_streams_with_sequential_state(): ] source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=_CONFIG, catalog=_CATALOG, state=state) - disable_emitting_sequential_state_messages(source=source) - for i, _ in enumerate(source._concurrent_streams): - stream = source._concurrent_streams[i]._stream_partition_generator._stream - if isinstance(stream, DeclarativeStream): - decorated_stream = create_wrapped_stream(stream=stream) - source._concurrent_streams[i]._stream_partition_generator._stream = decorated_stream - else: - raise ValueError(f"Expecting stream as type DeclarativeStream, but got {type(stream)}") + concurrent_cursor_party_members_slices = [ + {"start": "2024-08-15", "end": "2024-08-29"}, + {"start": "2024-08-30", 
"end": "2024-09-09"}, + ] + location_slices = [ + {"start": "2024-08-01", "end": "2024-08-31"}, + {"start": "2024-09-01", "end": "2024-09-09"}, + ] - for i, _ in enumerate(source._synchronous_streams): - stream = source._synchronous_streams[i] - if isinstance(stream, DeclarativeStream): - decorated_stream = create_wrapped_stream(stream=stream) - source._synchronous_streams[i] = decorated_stream - else: - raise ValueError(f"Expecting stream as type DeclarativeStream, but got {type(stream)}") + with HttpMocker() as http_mocker: + _mock_party_members_requests(http_mocker, concurrent_cursor_party_members_slices) + _mock_locations_requests(http_mocker, location_slices) + http_mocker.get(HttpRequest("https://persona.metaverse.com/palaces"), _PALACES_RESPONSE) + _mock_party_members_skills_requests(http_mocker) - messages = list(source.read(logger=source.logger, config=_CONFIG, catalog=_CATALOG, state=state)) + messages = list(source.read(logger=source.logger, config=_CONFIG, catalog=_CATALOG, state=state)) # Expects 8 records, skip successful intervals and are left with 2 slices, 4 records each slice locations_records = get_records_for_stream("locations", messages) @@ -739,37 +851,29 @@ def test_read_with_concurrent_and_synchronous_streams_with_sequential_state(): palaces_records = get_records_for_stream("palaces", messages) assert len(palaces_records) == 7 - # Expects 3 records, 1 empty slice, 3 records in slice + # Expects 3 records, 3 slices, 3 records in slice party_members_skills_records = get_records_for_stream("party_members_skills", messages) - assert len(party_members_skills_records) == 3 + assert len(party_members_skills_records) == 9 -@freezegun.freeze_time("2024-09-10T00:00:00") +@freezegun.freeze_time(_NOW) def test_read_concurrent_with_failing_partition_in_the_middle(): """ Verify that partial state is emitted when only some partitions are successful during a concurrent sync attempt """ + most_recent_cursor_value = datetime(2024, 8, 10, 0, 0, 0, tzinfo=timezone.utc) # based on _LOCATIONS_RESPONSE, this value might be outside of the actual start/end of the slice + # FIXME it seems like we yet don't consider `most_recent_cursor_value` as we ignore it during serialization [here](https://github.com/airbytehq/airbyte/blob/f07571f15f1bdbba86ad5e324e829a89b7d07cd6/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py#L75-L78) + # Once this is added, this test will probably have to change because the generated slices (represented by `location_slices` here) will no longer match the `expected_stream_state` + # This is why `most_recent_cursor_value` is unused for now + location_slices = [ + {"start": "2024-07-01", "end": "2024-07-31"}, + # missing slice `{"start": "2024-08-01", "end": "2024-08-31"}` here + {"start": "2024-09-01", "end": "2024-09-09"}, + ] expected_stream_state = { "state_type": "date-range", - "slices": [ - { - "start": datetime(2024, 7, 5, 0, 0, tzinfo=timezone.utc), - "end": datetime(2024, 8, 4, 0, 0, 0, tzinfo=timezone.utc), - "most_recent_cursor_value": datetime(2024, 8, 10, 0, 0, 0, tzinfo=timezone.utc), - }, - { - "end": datetime(2024, 9, 9, 0, 0, 0, tzinfo=timezone.utc), - "start": datetime(2024, 9, 5, 0, 0, 0, tzinfo=timezone.utc), - "most_recent_cursor_value": datetime(2024, 8, 10, 0, 0, 0, tzinfo=timezone.utc), - } - ], - "legacy": {}, - } - - # I swapped the config as a trick to get different intervals to get returned by the mocked read_records() method - config = { - "start_date": "2024-07-05T00:00:00.000Z" + 
"slices": location_slices, } catalog = ConfiguredAirbyteCatalog( @@ -782,76 +886,68 @@ def test_read_concurrent_with_failing_partition_in_the_middle(): ] ) - source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=config, catalog=catalog, state=[]) - + source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=[]) disable_emitting_sequential_state_messages(source=source) - for i, _ in enumerate(source._concurrent_streams): - stream = source._concurrent_streams[i]._stream_partition_generator._stream - if isinstance(stream, DeclarativeStream): - decorated_stream = create_wrapped_stream(stream=stream) - source._concurrent_streams[i]._stream_partition_generator._stream = decorated_stream - else: - raise ValueError(f"Expecting stream as type DeclarativeStream, but got {type(stream)}") + location_slices = [ + {"start": "2024-07-01", "end": "2024-07-31"}, + # missing slice `{"start": "2024-08-01", "end": "2024-08-31"}` here + {"start": "2024-09-01", "end": "2024-09-09"}, + ] - for i, _ in enumerate(source._synchronous_streams): - stream = source._synchronous_streams[i] - if isinstance(stream, DeclarativeStream): - decorated_stream = create_wrapped_stream(stream=stream) - source._synchronous_streams[i] = decorated_stream - else: - raise ValueError(f"Expecting stream as type DeclarativeStream, but got {type(stream)}") + with HttpMocker() as http_mocker: + _mock_locations_requests(http_mocker, location_slices) - try: - list(source.read(logger=source.logger, config=config, catalog=catalog, state=[])) - except AirbyteTracedException: - locations_stream = [stream for stream in source.streams(config=config, include_concurrent_streams=True) if stream.name == "locations"][0] - final_stream_state = locations_stream.cursor.state - assert final_stream_state == expected_stream_state + messages = [] + try: + for message in source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=[]): + messages.append(message) + except AirbyteTracedException: + assert get_states_for_stream(stream_name="locations", messages=messages)[-1].stream.stream_state.__dict__ == expected_stream_state -@freezegun.freeze_time("2024-09-10T00:00:00") +@freezegun.freeze_time(_NOW) def test_read_concurrent_skip_streams_not_in_catalog(): """ Verifies that the ConcurrentDeclarativeSource only syncs streams that are specified in the incoming ConfiguredCatalog """ + with HttpMocker() as http_mocker: + catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream(name="palaces", json_schema={}, supported_sync_modes=[SyncMode.full_refresh]), + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.append, + ), + ConfiguredAirbyteStream( + stream=AirbyteStream(name="locations", json_schema={}, supported_sync_modes=[SyncMode.incremental]), + sync_mode=SyncMode.incremental, + destination_sync_mode=DestinationSyncMode.append, + ), + ] + ) - catalog = ConfiguredAirbyteCatalog( - streams=[ - ConfiguredAirbyteStream( - stream=AirbyteStream(name="palaces", json_schema={}, supported_sync_modes=[SyncMode.full_refresh]), - sync_mode=SyncMode.full_refresh, - destination_sync_mode=DestinationSyncMode.append, - ), - ConfiguredAirbyteStream( - stream=AirbyteStream(name="locations", json_schema={}, supported_sync_modes=[SyncMode.incremental]), - sync_mode=SyncMode.incremental, - destination_sync_mode=DestinationSyncMode.append, - ), + source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=_CONFIG, catalog=catalog, 
state=None) + # locations requests + location_slices = [ + {"start": "2024-07-01", "end": "2024-07-31"}, + {"start": "2024-08-01", "end": "2024-08-31"}, + {"start": "2024-09-01", "end": "2024-09-09"}, ] - ) - - source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=None) - - disable_emitting_sequential_state_messages(source=source) + locations_query_params = list(map(lambda _slice: _slice | {"m": "active", "i": "1", "g": "country"}, location_slices)) + _mock_requests( + http_mocker, + "https://persona.metaverse.com/locations", + locations_query_params, + [_LOCATIONS_RESPONSE] * len(location_slices), + ) - for i, _ in enumerate(source._concurrent_streams): - stream = source._concurrent_streams[i]._stream_partition_generator._stream - if isinstance(stream, DeclarativeStream): - decorated_stream = create_wrapped_stream(stream=stream) - source._concurrent_streams[i]._stream_partition_generator._stream = decorated_stream - else: - raise ValueError(f"Expecting stream as type DeclarativeStream, but got {type(stream)}") + # palaces requests + http_mocker.get(HttpRequest("https://persona.metaverse.com/palaces"), _PALACES_RESPONSE) - for i, _ in enumerate(source._synchronous_streams): - stream = source._synchronous_streams[i] - if isinstance(stream, DeclarativeStream): - decorated_stream = create_wrapped_stream(stream=stream) - source._synchronous_streams[i] = decorated_stream - else: - raise ValueError(f"Expecting stream as type DeclarativeStream, but got {type(stream)}") + disable_emitting_sequential_state_messages(source=source) - messages = list(source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=[])) + messages = list(source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=[])) locations_records = get_records_for_stream(stream_name="locations", messages=messages) assert len(locations_records) == 12 @@ -997,6 +1093,7 @@ def get_mocked_read_records_output(stream_name: str) -> Mapping[tuple[str, str], {"id": "5", "world": "casino", "owner": "nijima"}, {"id": "6", "world": "cruiser", "owner": "shido"}, ] + case "party_members_skills": slices = [StreamSlice(cursor_slice={}, partition={})] @@ -1022,4 +1119,3 @@ def get_states_for_stream(stream_name: str, messages: List[AirbyteMessage]) -> L def disable_emitting_sequential_state_messages(source: ConcurrentDeclarativeSource) -> None: for concurrent_streams in source._concurrent_streams: # type: ignore # This is the easiest way to disable behavior from the test concurrent_streams.cursor._connector_state_converter._is_sequential_state = False # type: ignore # see above - concurrent_streams.cursor._connector_state_converter._is_sequential_state = False # type: ignore # see above From cdf85a529972dcd8f48ec79947e26617f42c6de2 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Fri, 18 Oct 2024 09:11:23 -0400 Subject: [PATCH 2/9] Fix typing on ConnectionChecker.check_connection --- .../airbyte_cdk/sources/declarative/checks/check_stream.py | 6 +++--- .../sources/declarative/checks/connection_checker.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py index 83c2a7fb71e8..baf056d3c799 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -7,8 +7,8 @@ from dataclasses import InitVar, dataclass from 
typing import Any, List, Mapping, Tuple +from airbyte_cdk import AbstractSource from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker -from airbyte_cdk.sources.source import Source from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy @@ -27,8 +27,8 @@ class CheckStream(ConnectionChecker): def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._parameters = parameters - def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]: - streams = source.streams(config=config) # type: ignore # source is always a DeclarativeSource, but this parameter type adheres to the outer interface + def check_connection(self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]: + streams = source.streams(config=config) stream_name_to_stream = {s.name: s for s in streams} if len(streams) == 0: return False, f"No streams to connect to from source {source}" diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/connection_checker.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/connection_checker.py index ede15537f38b..908e659b2a9d 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/connection_checker.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/connection_checker.py @@ -6,7 +6,7 @@ from abc import ABC, abstractmethod from typing import Any, Mapping, Tuple -from airbyte_cdk.sources.source import Source +from airbyte_cdk import AbstractSource class ConnectionChecker(ABC): @@ -15,7 +15,7 @@ class ConnectionChecker(ABC): """ @abstractmethod - def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]: + def check_connection(self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]: """ Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect to the Stripe API. From 7e48f67fcbfd67fb2600083250ed4a2c29d1bfbf Mon Sep 17 00:00:00 2001 From: maxi297 Date: Fri, 18 Oct 2024 09:21:07 -0400 Subject: [PATCH 3/9] self code-review --- .../sources/declarative/test_concurrent_declarative_source.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py index b5f1831d4271..4cc28ad04e59 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -137,7 +137,6 @@ } ] }, - "use_cache": False, # necessary to ensure all tests are independent and that HttpMocker can validate the HTTP requests }, "retriever": { "type": "SimpleRetriever", @@ -731,7 +730,7 @@ def test_read_with_concurrent_and_synchronous_streams_with_concurrent_state(): concurrent_cursor_party_members_slices = [ {"start": "2024-07-01", "end": "2024-07-15"}, - {"start": "2024-07-30", "end": "2024-08-13"}, # FIXME this is an interesting case where we restart from that top boundary. 
I'm wondering if we need to change that because if this was within one sync, we would start from 2024-09-01 and not 2024-07-30
+        {"start": "2024-07-30", "end": "2024-08-13"}, # FIXME this is an interesting case where we restart from that top boundary. I'm wondering if we need to change that because if this was within one sync, we would start from 2024-08-01 and not 2024-07-30
         {"start": "2024-08-14", "end": "2024-08-14"},
         {"start": "2024-09-04", "end": "2024-09-09"}, # considering lookback window
     ]

From 81c5b3c194e4d33425752ac06aec8713b2888054 Mon Sep 17 00:00:00 2001
From: maxi297
Date: Mon, 21 Oct 2024 12:03:51 -0700
Subject: [PATCH 4/9] Code review - improve docstring

---
 .../sources/declarative/concurrent_declarative_source.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
index 9e043820b87a..786f2d0f281b 100644
--- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
+++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
@@ -134,9 +134,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
         """
         The `streams` method is used as part of the AbstractSource in the following cases:
         * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
-        * ConcurrentDeclarativeSource.discover -> ManifestDeclarativeSource.discover -> streams
+        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter the catalog to exclude concurrent streams, so a read does not necessarily use every stream returned by `streams`)

-        In both cases, we will assume that calling the DeclarativeStream is perfectly fine as the result for these is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This
+        In both cases, we will assume that calling the DeclarativeStream is perfectly fine as the result for these is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the mentioned code paths above.
""" return super().streams(config) From f0a4294c106a9348c8d606ee0dbf5af94a5d309e Mon Sep 17 00:00:00 2001 From: maxi297 Date: Mon, 21 Oct 2024 12:08:27 -0700 Subject: [PATCH 5/9] Fix comment and format --- .../sources/declarative/concurrent_declarative_source.py | 5 ++--- .../declarative/test_concurrent_declarative_source.py | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 786f2d0f281b..f75fc305dac2 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -126,15 +126,14 @@ def read( yield from super().read(logger, config, filtered_catalog, state) def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog: - return AirbyteCatalog( - streams=[stream.as_airbyte_stream() for stream in self._concurrent_streams + self._synchronous_streams] - ) + return AirbyteCatalog(streams=[stream.as_airbyte_stream() for stream in self._concurrent_streams + self._synchronous_streams]) def streams(self, config: Mapping[str, Any]) -> List[Stream]: """ The `streams` method is used as part of the AbstractSource in the following cases: * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams so not all streams actually read from all the streams returned by `streams`) + Note that `super.streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`. In both case, we will assume that calling the DeclarativeStream is perfectly fine as the result for these is the same regardless of if it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the mentioned code paths above. 
""" diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 4cc28ad04e59..64c283b24d22 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -4,7 +4,7 @@ import copy import json from datetime import datetime, timedelta, timezone -from typing import Any, Iterable, List, Mapping, Optional, Union, Dict +from typing import Any, Dict, Iterable, List, Mapping, Optional, Union import freezegun import isodate @@ -21,8 +21,8 @@ ConfiguredAirbyteStream, DestinationSyncMode, FailureType, - StreamDescriptor, Status, + StreamDescriptor, SyncMode, ) from airbyte_cdk.sources.declarative.concurrent_declarative_source import ConcurrentDeclarativeSource @@ -37,7 +37,6 @@ from airbyte_cdk.utils import AirbyteTracedException from deprecated.classic import deprecated - _CONFIG = { "start_date": "2024-07-01T00:00:00.000Z" } From 352b8432ad389a394da83a55e5e06d5a7019e112 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Tue, 22 Oct 2024 15:52:23 -0700 Subject: [PATCH 6/9] format --- .../airbyte_cdk/sources/streams/concurrent/cursor.py | 4 +++- airbyte-cdk/python/airbyte_cdk/test/mock_http/request.py | 7 ++++++- .../declarative/test_concurrent_declarative_source.py | 2 +- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py index 33b672e81562..21713869c303 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py @@ -334,7 +334,9 @@ def _calculate_lower_boundary_of_last_slice(self, lower_boundary: CursorValueTyp return lower_boundary - self._lookback_window return lower_boundary - def _split_per_slice_range(self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool) -> Iterable[Tuple[CursorValueType, CursorValueType]]: + def _split_per_slice_range( + self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool + ) -> Iterable[Tuple[CursorValueType, CursorValueType]]: if lower >= upper: return diff --git a/airbyte-cdk/python/airbyte_cdk/test/mock_http/request.py b/airbyte-cdk/python/airbyte_cdk/test/mock_http/request.py index 8f276913892f..756be23edd06 100644 --- a/airbyte-cdk/python/airbyte_cdk/test/mock_http/request.py +++ b/airbyte-cdk/python/airbyte_cdk/test/mock_http/request.py @@ -88,5 +88,10 @@ def __repr__(self) -> str: def __eq__(self, other: Any) -> bool: if isinstance(other, HttpRequest): - return self._parsed_url == other._parsed_url and self._query_params == other._query_params and self._headers == other._headers and self._body == other._body + return ( + self._parsed_url == other._parsed_url + and self._query_params == other._query_params + and self._headers == other._headers + and self._body == other._body + ) return False diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 0a34d17dbc2f..fd21b1561486 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -5,7 +5,7 @@ import 
copy import json from datetime import datetime, timedelta, timezone -from typing import Any, Dict, Iterable, List, Mapping, Optional, Union, Tuple +from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Union import freezegun import isodate From 8ed179a038206d5e06fc396979e811054a8f9a4f Mon Sep 17 00:00:00 2001 From: maxi297 Date: Tue, 22 Oct 2024 16:14:21 -0700 Subject: [PATCH 7/9] fix tests --- .../sources/declarative/test_concurrent_declarative_source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py index fd21b1561486..1fa2e6d08452 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -524,7 +524,7 @@ def test_check(): with HttpMocker() as http_mocker: http_mocker.get( HttpRequest("https://persona.metaverse.com/party_members?start=2024-07-01&end=2024-07-15"), - HttpResponse(json.dumps({"id": "party_member_1"})), + HttpResponse(json.dumps({"id": "amamiya", "first_name": "ren", "last_name": "amamiya", "updated_at": "2024-07-10"})), ) http_mocker.get( HttpRequest("https://persona.metaverse.com/palaces"), From 4413411a3906585dd79351646880a0b8a0ffc906 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Wed, 23 Oct 2024 08:25:58 -0700 Subject: [PATCH 8/9] lint --- .../declarative/concurrent_declarative_source.py | 2 +- .../test_concurrent_declarative_source.py | 13 +++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 6876a104be18..4d458fdc6922 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -5,7 +5,7 @@ import logging from typing import Any, Generic, Iterator, List, Mapping, Optional, Tuple, Union -from airbyte_cdk.models import AirbyteCatalog, AirbyteConnectionStatus, AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog +from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 1fa2e6d08452..3fb0f6571043 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -708,7 +708,16 @@ def test_read_with_concurrent_and_synchronous_streams_with_concurrent_state(): ] party_members_slices_and_responses = _NO_STATE_PARTY_MEMBERS_SLICES_AND_RESPONSES + [ - ({"start": "2024-09-04", "end": "2024-09-10"}, HttpResponse(json.dumps([{"id": "yoshizawa", "first_name": "sumire", "last_name": "yoshizawa", "updated_at": "2024-09-10"}]))) # considering lookback window + ( + {"start": "2024-09-04", "end": 
"2024-09-10"}, + HttpResponse( + json.dumps( + [ + {"id": "yoshizawa", "first_name": "sumire", "last_name": "yoshizawa", "updated_at": "2024-09-10"} + ] + ) + ), + ) # considering lookback window ] location_slices = [ {"start": "2024-07-26", "end": "2024-08-25"}, @@ -839,7 +848,7 @@ def test_read_concurrent_with_failing_partition_in_the_middle(): Verify that partial state is emitted when only some partitions are successful during a concurrent sync attempt """ - most_recent_cursor_value = datetime(2024, 8, 10, 0, 0, 0, tzinfo=timezone.utc) # based on _LOCATIONS_RESPONSE, this value might be outside of the actual start/end of the slice + # most_recent_cursor_value = datetime(2024, 8, 10, 0, 0, 0, tzinfo=timezone.utc) # based on _LOCATIONS_RESPONSE, this value might be outside of the actual start/end of the slice # FIXME it seems like we yet don't consider `most_recent_cursor_value` as we ignore it during serialization [here](https://github.com/airbytehq/airbyte/blob/f07571f15f1bdbba86ad5e324e829a89b7d07cd6/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py#L75-L78) # Once this is added, this test will probably have to change because the generated slices (represented by `location_slices` here) will no longer match the `expected_stream_state` # This is why `most_recent_cursor_value` is unused for now From ec7f1402a19cd54aa080ea4f5c8522846a2be32d Mon Sep 17 00:00:00 2001 From: maxi297 Date: Wed, 23 Oct 2024 08:50:28 -0700 Subject: [PATCH 9/9] fix tests --- .../declarative/requesters/test_http_job_repository.py | 9 +++++++-- .../python/unit_tests/test/mock_http/test_mocker.py | 10 +++++----- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_job_repository.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_job_repository.py index 90c1f9854baf..90768d8bbb60 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_job_repository.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_job_repository.py @@ -168,12 +168,17 @@ def test_given_unknown_status_when_update_jobs_status_then_raise_error(self) -> self._repository.update_jobs_status([job]) def test_given_multiple_jobs_when_update_jobs_status_then_all_the_jobs_are_updated(self) -> None: - self._mock_create_response(_A_JOB_ID) + self._http_mocker.post( + HttpRequest(url=_EXPORT_URL), + [ + HttpResponse(body=json.dumps({"id": _A_JOB_ID})), + HttpResponse(body=json.dumps({"id": _ANOTHER_JOB_ID})), + ], + ) self._http_mocker.get( HttpRequest(url=f"{_EXPORT_URL}/{_A_JOB_ID}"), HttpResponse(body=json.dumps({"id": _A_JOB_ID, "status": "ready"})), ) - self._mock_create_response(_ANOTHER_JOB_ID) self._http_mocker.get( HttpRequest(url=f"{_EXPORT_URL}/{_ANOTHER_JOB_ID}"), HttpResponse(body=json.dumps({"id": _A_JOB_ID, "status": "ready"})), diff --git a/airbyte-cdk/python/unit_tests/test/mock_http/test_mocker.py b/airbyte-cdk/python/unit_tests/test/mock_http/test_mocker.py index 0aae9ec405e4..2b086a1748c6 100644 --- a/airbyte-cdk/python/unit_tests/test/mock_http/test_mocker.py +++ b/airbyte-cdk/python/unit_tests/test/mock_http/test_mocker.py @@ -251,9 +251,9 @@ def decorated_function(http_mocker): decorated_function() def test_given_request_already_mocked_when_decorate_then_raise(self): - http_mocker = HttpMocker() - a_request = HttpRequest(_A_URL, _SOME_QUERY_PARAMS, _SOME_HEADERS) - http_mocker.get(a_request, _A_RESPONSE) - - with 
pytest.raises(ValueError): + with HttpMocker() as http_mocker: + a_request = HttpRequest(_A_URL, _SOME_QUERY_PARAMS, _SOME_HEADERS) http_mocker.get(a_request, _A_RESPONSE) + + with pytest.raises(ValueError): + http_mocker.get(a_request, _A_RESPONSE)
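
Reviewer notes (not part of the patch series):

The slice fixtures asserted throughout these tests follow a simple pattern: inclusive 15-day windows, each new window starting the day after the previous one ends, with the last window capped at the day before the frozen `_NOW`. The following self-contained sketch reimplements that arithmetic (it does not call the CDK) and reproduces `concurrent_cursor_party_members_slices` from `test_read_with_concurrent_and_synchronous_streams`, which makes the fixtures easier to audit:

from datetime import date, timedelta
from typing import Dict, List


def date_range_slices(start: date, end: date, step_days: int) -> List[Dict[str, str]]:
    """Inclusive windows of `step_days` days; the last window is capped at `end`."""
    slices = []
    lower = start
    while lower <= end:
        upper = min(lower + timedelta(days=step_days - 1), end)
        slices.append({"start": lower.isoformat(), "end": upper.isoformat()})
        lower = upper + timedelta(days=1)  # the next window starts the following day
    return slices


# start taken from _CONFIG's start_date; end taken from the last asserted window
# (2024-09-09, one day before the frozen _NOW of 2024-09-10)
assert date_range_slices(date(2024, 7, 1), date(2024, 9, 9), 15) == [
    {"start": "2024-07-01", "end": "2024-07-15"},
    {"start": "2024-07-16", "end": "2024-07-30"},
    {"start": "2024-07-31", "end": "2024-08-14"},
    {"start": "2024-08-15", "end": "2024-08-29"},
    {"start": "2024-08-30", "end": "2024-09-09"},
]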
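The migration away from DeclarativeStreamDecorator toward request-level mocking follows a pattern worth spelling out for future tests. Below is a minimal sketch of that pattern, reusing the `_MANIFEST` and `_CONFIG` fixtures and the `get_records_for_stream` helper from the test module; the catalog is restricted to the synchronous `palaces` stream so only one HTTP call has to be registered. This is illustrative only, not a test from the series:

import json

from airbyte_cdk.models import (
    AirbyteStream,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
)
from airbyte_cdk.sources.declarative.concurrent_declarative_source import ConcurrentDeclarativeSource
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse


def test_palaces_full_refresh_against_mocked_http() -> None:
    catalog = ConfiguredAirbyteCatalog(
        streams=[
            ConfiguredAirbyteStream(
                stream=AirbyteStream(name="palaces", json_schema={}, supported_sync_modes=[SyncMode.full_refresh]),
                sync_mode=SyncMode.full_refresh,
                destination_sync_mode=DestinationSyncMode.append,
            )
        ]
    )

    with HttpMocker() as http_mocker:
        # Register every request the sync is expected to make; an unmatched
        # request surfaces as a test failure instead of a live HTTP call.
        http_mocker.get(
            HttpRequest("https://persona.metaverse.com/palaces"),
            HttpResponse(json.dumps([{"id": "0", "world": "castle", "owner": "kamoshida"}])),
        )

        source = ConcurrentDeclarativeSource(source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=None)
        messages = list(source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=[]))

    assert len(get_records_for_stream("palaces", messages)) == 1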
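PATCH 2 retypes `ConnectionChecker.check_connection` to take an `AbstractSource` instead of the looser `Source`, which is what lets `CheckStream` drop the `# type: ignore` on `source.streams(config=config)`. A custom checker written against the new signature looks like the sketch below; the class is hypothetical and only illustrates the typing, and a real checker would go on to probe the stream the way `CheckStream` does with `HttpAvailabilityStrategy`:

import logging
from typing import Any, Mapping, Tuple

from airbyte_cdk import AbstractSource
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker


class CheckFirstStreamExists(ConnectionChecker):
    """Toy checker: succeed as soon as the source exposes at least one stream."""

    def check_connection(self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
        # `source` is typed as AbstractSource, so `streams()` type-checks without a `# type: ignore`.
        streams = source.streams(config=config)
        if len(streams) == 0:
            return False, f"No streams to connect to from source {source}"
        logger.info(f"Source exposes {len(streams)} streams; first is {streams[0].name}")
        return True, None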