Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Maxi297/fix streams interface #46995

Open
wants to merge 10 commits into
base: brian/concurrent_declarative_source
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,9 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
"""

@abstractmethod
def streams(self, config: Mapping[str, Any], include_concurrent_streams: bool = False) -> List[Stream]:
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
:param config: The user-provided configuration as specified by the source's spec.
:param include_concurrent_streams: Concurrent sources can be made up of streams that can be run concurrently and
ones that must be run synchronously. By default, for backwards compatibility this is disabled.
Any stream construction related operation should happen here.
:return: A list of the streams in this source connector.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Tuple

from airbyte_cdk import AbstractSource
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.source import Source
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy


Expand All @@ -27,8 +27,8 @@ class CheckStream(ConnectionChecker):
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
    """Dataclass post-init hook: keep the InitVar ``parameters`` mapping on the instance."""
    # InitVar values are not stored as dataclass fields, so stash them here for later use.
    self._parameters = parameters

def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
streams = source.streams(config=config, include_concurrent_streams=True) # type: ignore # source is always a DeclarativeSource, but this parameter type adheres to the outer interface
def check_connection(self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
streams = source.streams(config=config)
stream_name_to_stream = {s.name: s for s in streams}
if len(streams) == 0:
return False, f"No streams to connect to from source {source}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from abc import ABC, abstractmethod
from typing import Any, Mapping, Tuple

from airbyte_cdk.sources.source import Source
from airbyte_cdk import AbstractSource


class ConnectionChecker(ABC):
Expand All @@ -15,7 +15,7 @@ class ConnectionChecker(ABC):
"""

@abstractmethod
def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
def check_connection(self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
maxi297 marked this conversation as resolved.
Show resolved Hide resolved
"""
Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect
to the Stripe API.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
from typing import Any, Generic, Iterator, List, Mapping, Optional, Tuple, Union

from airbyte_cdk.models import AirbyteCatalog, AirbyteConnectionStatus, AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
Expand Down Expand Up @@ -105,24 +105,19 @@ def read(
filtered_catalog = self._remove_concurrent_streams_from_catalog(catalog=catalog, concurrent_stream_names=concurrent_stream_names)
yield from super().read(logger, config, filtered_catalog, state)

def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
return super().check(logger=logger, config=config)

def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
return AirbyteCatalog(
streams=[stream.as_airbyte_stream() for stream in self.streams(config=config, include_concurrent_streams=True)]
)
return AirbyteCatalog(streams=[stream.as_airbyte_stream() for stream in self._concurrent_streams + self._synchronous_streams])

def streams(self, config: Mapping[str, Any], include_concurrent_streams: bool = False) -> List[Stream]:
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
Returns the list of streams that can be run synchronously in the Python CDK.
The `streams` method is used as part of the AbstractSource in the following cases:
* ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
* ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams so not all streams actually read from all the streams returned by `streams`)
Note that `super.streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.

NOTE: For ConcurrentDeclarativeSource, this method only returns synchronous streams because it usage is invoked within the
existing Python CDK. Streams that support concurrency are started from read().
In both case, we will assume that calling the DeclarativeStream is perfectly fine as the result for these is the same regardless of if it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the mentioned code paths above.
"""
if include_concurrent_streams:
return self._synchronous_streams + self._concurrent_streams # type: ignore # Although AbstractStream doesn't inherit stream, they were designed to fit the same interface when called from streams()
return self._synchronous_streams
return super().streams(config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to transcribe a convo w/ maxime. This method technically doesn't need to be overridden because it just calls the parent implementation.

However, this comment is very useful to explain why this works. Let's just add one more comment noting that we would deprecate this once the concurrent AbstractStream class implements a more thorough check implementation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the comment to reflect that. I've added the path for the read although mentioning that we filter using the catalog. The reason is that it feels like this is something we will need to consider when doing the migration to remove old CDK logic.


def _group_streams(self, config: Mapping[str, Any]) -> Tuple[List[AbstractStream], List[Stream]]:
concurrent_streams: List[AbstractStream] = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def connection_checker(self) -> ConnectionChecker:
else:
raise ValueError(f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}")

def streams(self, config: Mapping[str, Any], include_concurrent_streams: bool = False) -> List[Stream]:
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)})
stream_configs = self._stream_configs(self._source_config)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,22 +292,36 @@ def generate_slices(self) -> Iterable[Tuple[CursorValueType, CursorValueType]]:
self._merge_partitions()

if self._start is not None and self._is_start_before_first_slice():
yield from self._split_per_slice_range(self._start, self.state["slices"][0][self._connector_state_converter.START_KEY])
yield from self._split_per_slice_range(
self._start,
self.state["slices"][0][self._connector_state_converter.START_KEY],
False,
)

if len(self.state["slices"]) == 1:
yield from self._split_per_slice_range(
self._calculate_lower_boundary_of_last_slice(self.state["slices"][0][self._connector_state_converter.END_KEY]),
self._end_provider(),
True,
)
elif len(self.state["slices"]) > 1:
for i in range(len(self.state["slices"]) - 1):
yield from self._split_per_slice_range(
self.state["slices"][i][self._connector_state_converter.END_KEY],
self.state["slices"][i + 1][self._connector_state_converter.START_KEY],
)
if self._cursor_granularity:
yield from self._split_per_slice_range(
self.state["slices"][i][self._connector_state_converter.END_KEY] + self._cursor_granularity,
self.state["slices"][i + 1][self._connector_state_converter.START_KEY],
False,
)
else:
yield from self._split_per_slice_range(
self.state["slices"][i][self._connector_state_converter.END_KEY],
self.state["slices"][i + 1][self._connector_state_converter.START_KEY],
False,
)
yield from self._split_per_slice_range(
self._calculate_lower_boundary_of_last_slice(self.state["slices"][-1][self._connector_state_converter.END_KEY]),
self._end_provider(),
True,
)
else:
raise ValueError("Expected at least one slice")
Expand All @@ -320,7 +334,9 @@ def _calculate_lower_boundary_of_last_slice(self, lower_boundary: CursorValueTyp
return lower_boundary - self._lookback_window
return lower_boundary

def _split_per_slice_range(self, lower: CursorValueType, upper: CursorValueType) -> Iterable[Tuple[CursorValueType, CursorValueType]]:
def _split_per_slice_range(
self, lower: CursorValueType, upper: CursorValueType, upper_is_end: bool
) -> Iterable[Tuple[CursorValueType, CursorValueType]]:
if lower >= upper:
return

Expand All @@ -329,7 +345,7 @@ def _split_per_slice_range(self, lower: CursorValueType, upper: CursorValueType)

lower = max(lower, self._start) if self._start else lower
if not self._slice_range or lower + self._slice_range >= upper:
if self._cursor_granularity:
if self._cursor_granularity and not upper_is_end:
yield lower, upper - self._cursor_granularity
else:
yield lower, upper
Expand All @@ -338,7 +354,8 @@ def _split_per_slice_range(self, lower: CursorValueType, upper: CursorValueType)
current_lower_boundary = lower
while not stop_processing:
current_upper_boundary = min(current_lower_boundary + self._slice_range, upper)
if self._cursor_granularity:
has_reached_upper_boundary = current_upper_boundary >= upper
if self._cursor_granularity and (not upper_is_end or not has_reached_upper_boundary):
yield current_lower_boundary, current_upper_boundary - self._cursor_granularity
else:
yield current_lower_boundary, current_upper_boundary
Expand Down
6 changes: 6 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/test/mock_http/matcher.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
from typing import Any

from airbyte_cdk.test.mock_http.request import HttpRequest

Expand Down Expand Up @@ -33,3 +34,8 @@ def __str__(self) -> str:
f"minimum_number_of_expected_match={self._minimum_number_of_expected_match}, "
f"actual_number_of_matches={self._actual_number_of_matches})"
)

def __eq__(self, other: Any) -> bool:
    """Two matchers are equal when they target the same request.

    The match counters are intentionally ignored: a matcher already registered
    for a request compares equal to a freshly built matcher for that request.
    """
    if not isinstance(other, HttpRequestMatcher):
        return False
    return self._request_to_match == other._request_to_match
    # NOTE(review): defining __eq__ without __hash__ makes instances unhashable —
    # fine for list membership checks, but confirm no set/dict usage relies on hashing.
2 changes: 2 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/test/mock_http/mocker.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ def _mock_request_method(
responses = [responses]

matcher = HttpRequestMatcher(request, len(responses))
if matcher in self._matchers:
raise ValueError(f"Request {matcher.request} already mocked")
self._matchers.append(matcher)

getattr(self._mocker, method)(
Expand Down
10 changes: 10 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/test/mock_http/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,13 @@ def __str__(self) -> str:

def __repr__(self) -> str:
return f"HttpRequest(request={self._parsed_url}, headers={self._headers}, body={self._body!r})"

def __eq__(self, other: Any) -> bool:
    """Requests are equal when URL, query parameters, headers and body all match."""
    if not isinstance(other, HttpRequest):
        return False
    # Short-circuits on the first differing component.
    return (
        self._parsed_url == other._parsed_url
        and self._query_params == other._query_params
        and self._headers == other._headers
        and self._body == other._body
    )
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,17 @@ def test_given_unknown_status_when_update_jobs_status_then_raise_error(self) ->
self._repository.update_jobs_status([job])

def test_given_multiple_jobs_when_update_jobs_status_then_all_the_jobs_are_updated(self) -> None:
self._mock_create_response(_A_JOB_ID)
self._http_mocker.post(
HttpRequest(url=_EXPORT_URL),
[
HttpResponse(body=json.dumps({"id": _A_JOB_ID})),
HttpResponse(body=json.dumps({"id": _ANOTHER_JOB_ID})),
],
)
self._http_mocker.get(
HttpRequest(url=f"{_EXPORT_URL}/{_A_JOB_ID}"),
HttpResponse(body=json.dumps({"id": _A_JOB_ID, "status": "ready"})),
)
self._mock_create_response(_ANOTHER_JOB_ID)
self._http_mocker.get(
HttpRequest(url=f"{_EXPORT_URL}/{_ANOTHER_JOB_ID}"),
HttpResponse(body=json.dumps({"id": _A_JOB_ID, "status": "ready"})),
Expand Down
Loading
Loading