From 83a8281be0c0c4925f3df98c749507c881d14a33 Mon Sep 17 00:00:00 2001 From: Ella Rohm-Ensing Date: Mon, 12 Dec 2022 14:32:34 -0500 Subject: [PATCH 1/9] CDK: `AbstractSource.read()` skips syncing stream if it's unavailable (add `AvailabilityStrategy` concept) (#19977) * Rough first implementation of AvailabilityStrategies * Basic unit tests for AvailabilityStrategy and ScopedAvailabilityStrategy * Make availability_strategy a property, separate out tests * Remove from DeclarativeSource, remove Source parameter from methods, make default no AvailabilityStrategy * Add skip stream if not available to read() * Changes to CDK to get source-github working using AvailabilityStrategy, flakecheck * reorganize cdk class, add HTTPAvailabilityStrategy test * cleanup, docstrings * pull out error handling into separate method * Pass source and logger to check_connection method * Add documentation links, handle 403 specifically * Fix circular import * Add AvailabilityStrategy to Stream and HTTPStream classes * Remove AS from abstract_source, add to Stream, HTTPStream, AvailabilityStrategy unit tests passing for per-stream strategies * Modify MockHttpStream to set no AvailabilityStrategy since source test mocking doesn't support this * Move AvailabilityStrategy class to sources.streams * Move HTTPAvailabilityStrategy to http module * Use pascal case for HttpAvailabilityStrategy * Remove docs message method :( and default to True availability on unhandled HTTPErrors * add check_availability method to stream class * Add optional source parameter * Add test for connector-specific documentation, small tests refactor * Add test that performs the read() function for stream with default availability strategy * Add test for read function behavior when stream is unavailable * Add 403 info in logger message * Don't return error for other HTTPErrors * Split up error handling into methods 'unavailable_error_codes' and 'get_reason_for_error' * rework overridable list of status codes to be a 
dict with reasons, to enforce that users provide reasons for all listed errors * Fix incorrect typing * Move HttpAvailability to its own module, fix flake errors * Fix ScopedAvailabilityStrategy, docstrings and types for streams/availability_strategy.py * Docstrings and types for core.py and http/availability_strategy.py * Move _get_stream_slices to a StreamHelper class * Docstrings + types for stream_helpers.py, cleanup test_availability.py * Clean up test_source.py * Move logic of getting the initial record from a stream to StreamHelper class * Add changelog and bump minor version * change 'is True' and 'is False' behavior * use mocker.MagicMock * Remove ScopedAvailabilityStrategy * Don't except non-403 errors, check_stream uses availability_strategy if possible * CDK: pass error to reasons_for_error_codes * make get_stream_slice public * Add tests for raising unhandled errors and retries are handled * Add tests for CheckStream via AvailabilityStrategy * Add documentation for stream availability of http streams * Move availability unit tests to correct modules, report error message if possible * Add test for reporting specific error if available --- .../airbyte_cdk/sources/abstract_source.py | 6 +- .../declarative/checks/check_stream.py | 10 ++ .../sources/declarative/declarative_source.py | 2 +- .../sources/streams/availability_strategy.py | 33 ++++ .../airbyte_cdk/sources/streams/core.py | 29 +++- .../streams/http/availability_strategy.py | 134 +++++++++++++++ .../airbyte_cdk/sources/streams/http/http.py | 6 + .../sources/utils/stream_helpers.py | 44 +++++ .../python/docs/concepts/http-streams.md | 11 ++ .../declarative/checks/test_check_stream.py | 61 +++++++ .../http/test_availability_strategy.py | 141 +++++++++++++++ .../streams/test_availability_strategy.py | 70 ++++++++ .../python/unit_tests/sources/test_source.py | 161 +++++++++++++++--- .../sources/utils/test_schema_helpers.py | 4 +- 14 files changed, 688 insertions(+), 24 deletions(-) create mode 100644 
airbyte-cdk/python/airbyte_cdk/sources/streams/availability_strategy.py create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/streams/http/availability_strategy.py create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/utils/stream_helpers.py create mode 100644 airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py create mode 100644 airbyte-cdk/python/unit_tests/sources/streams/test_availability_strategy.py diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index d1ac63e76e82..3f9efab8e48f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -106,6 +106,10 @@ def read( f"The requested stream {configured_stream.stream.name} was not found in the source." f" Available streams: {stream_instances.keys()}" ) + stream_is_available, error = stream_instance.check_availability(logger, self) + if not stream_is_available: + logger.warning(f"Skipped syncing stream '{stream_instance.name}' because it was unavailable. Error: {error}") + continue try: timer.start_event(f"Syncing stream {configured_stream.stream.name}") yield from self._read_stream( @@ -187,7 +191,7 @@ def _read_stream( @staticmethod def _limit_reached(internal_config: InternalConfig, records_counter: int) -> bool: """ - Check if record count reached liimt set by internal config. + Check if record count reached limit set by internal config. 
:param internal_config - internal CDK configuration separated from user defined config :records_counter - number of records already red :return True if limit reached, False otherwise diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py index f71e5ab47ce6..86ed8512b6aa 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -39,6 +39,16 @@ def check_connection(self, source: Source, logger: logging.Logger, config: Mappi raise ValueError(f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}.") stream = stream_name_to_stream[stream_name] + if stream.availability_strategy is not None: + try: + stream_is_available, reason = stream.check_availability(logger, source) + if stream_is_available: + return True, None + else: + return False, reason + except Exception as error: + return False, f"Unable to connect to stream {stream_name} - {error}" + try: # Some streams need a stream slice to read records (e.g. if they have a SubstreamSlicer) # Streams that don't need a stream slice will return `None` as their first stream slice. 
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_source.py index 6e79356ee93b..beb3cfaa1d26 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_source.py @@ -17,7 +17,7 @@ class DeclarativeSource(AbstractSource): @property @abstractmethod def connection_checker(self) -> ConnectionChecker: - """Returns the ConnectioChecker to use for the `check` operation""" + """Returns the ConnectionChecker to use for the `check` operation""" def check_connection(self, logger, config) -> Tuple[bool, any]: """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/availability_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/availability_strategy.py new file mode 100644 index 000000000000..bb86a1c1de0d --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/availability_strategy.py @@ -0,0 +1,33 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import logging +import typing +from abc import ABC, abstractmethod +from typing import Optional, Tuple + +from airbyte_cdk.sources.streams import Stream + +if typing.TYPE_CHECKING: + from airbyte_cdk.sources import Source + + +class AvailabilityStrategy(ABC): + """ + Abstract base class for checking stream availability. + """ + + @abstractmethod + def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional["Source"]) -> Tuple[bool, Optional[str]]: + """ + Checks stream availability. + + :param stream: stream + :param logger: source logger + :param source: (optional) source + :return: A tuple of (boolean, str). If boolean is true, then the stream + is available, and no str is required. Otherwise, the stream is unavailable + for some reason and the str should describe what went wrong and how to + resolve the unavailability, if possible. 
+ """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py index d39c706eb9aa..5ff57550e003 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py @@ -5,9 +5,10 @@ import inspect import logging +import typing from abc import ABC, abstractmethod from functools import lru_cache -from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union import airbyte_cdk.sources.utils.casing as casing from airbyte_cdk.models import AirbyteLogMessage, AirbyteStream, AirbyteTraceMessage, SyncMode @@ -17,6 +18,10 @@ from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer from deprecated.classic import deprecated +if typing.TYPE_CHECKING: + from airbyte_cdk.sources import Source + from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy + # A stream's read method can return one of the following types: # Mapping[str, Any]: The content of an AirbyteRecordMessage # AirbyteRecordMessage: An AirbyteRecordMessage @@ -170,6 +175,28 @@ def source_defined_cursor(self) -> bool: """ return True + def check_availability(self, logger: logging.Logger, source: Optional["Source"] = None) -> Tuple[bool, Optional[str]]: + """ + Checks whether this stream is available. + + :param logger: source logger + :param source: (optional) source + :return: A tuple of (boolean, str). If boolean is true, then this stream + is available, and no str is required. Otherwise, this stream is unavailable + for some reason and the str should describe what went wrong and how to + resolve the unavailability, if possible. 
+ """ + if self.availability_strategy: + return self.availability_strategy.check_availability(self, logger, source) + return True, None + + @property + def availability_strategy(self) -> Optional["AvailabilityStrategy"]: + """ + :return: The AvailabilityStrategy used to check whether this stream is available. + """ + return None + @property @abstractmethod def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/availability_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/availability_strategy.py new file mode 100644 index 000000000000..412db6d6a217 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/availability_strategy.py @@ -0,0 +1,134 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import logging +import typing +from typing import Dict, Optional, Tuple + +import requests +from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy +from airbyte_cdk.sources.streams.utils.stream_helper import get_first_record_for_slice, get_first_stream_slice +from requests import HTTPError + +if typing.TYPE_CHECKING: + from airbyte_cdk.sources import Source + + +class HttpAvailabilityStrategy(AvailabilityStrategy): + def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional["Source"]) -> Tuple[bool, Optional[str]]: + """ + Check stream availability by attempting to read the first record of the + stream. + + :param stream: stream + :param logger: source logger + :param source: (optional) source + :return: A tuple of (boolean, str). If boolean is true, then the stream + is available, and no str is required. Otherwise, the stream is unavailable + for some reason and the str should describe what went wrong and how to + resolve the unavailability, if possible. + """ + try: + # Some streams need a stream slice to read records (e.g. 
if they have a SubstreamSlicer) + # Streams that don't need a stream slice will return `None` as their first stream slice. + stream_slice = get_first_stream_slice(stream) + except StopIteration: + # If stream_slices has no `next()` item (Note - this is different from stream_slices returning [None]!) + # This can happen when a substream's `stream_slices` method does a `for record in parent_records: yield ` + # without accounting for the case in which the parent stream is empty. + reason = f"Cannot attempt to connect to stream {stream.name} - no stream slices were found, likely because the parent stream is empty." + return False, reason + + try: + get_first_record_for_slice(stream, stream_slice) + return True, None + except StopIteration: + logger.info(f"Successfully connected to stream {stream.name}, but got 0 records.") + return True, None + except HTTPError as error: + return self.handle_http_error(stream, logger, source, error) + + + def handle_http_error( + self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError + ) -> Tuple[bool, Optional[str]]: + """ + Override this method to define error handling for various `HTTPError`s + that are raised while attempting to check a stream's availability. + + Checks whether an error's status_code is in a list of unavailable_error_codes, + and gets the associated reason for that error. + + :param stream: stream + :param logger: source logger + :param source: optional (source) + :param error: HTTPError raised while checking stream's availability. + :return: A tuple of (boolean, str). If boolean is true, then the stream + is available, and no str is required. Otherwise, the stream is unavailable + for some reason and the str should describe what went wrong and how to + resolve the unavailability, if possible. 
+ """ + try: + status_code = error.response.status_code + reason = self.reasons_for_unavailable_status_codes(stream, logger, source, error)[status_code] + response_error_message = stream.parse_response_error_message(error.response) + if response_error_message: + reason += response_error_message + return False, reason + except KeyError: + # If the HTTPError is not in the dictionary of errors we know how to handle, re-raise it instead of swallowing it + raise error + + def reasons_for_unavailable_status_codes( + self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError + ) -> Dict[int, str]: + """ + Returns a dictionary of HTTP status codes that indicate stream + unavailability and reasons explaining why a given status code may + have occurred and how the user can resolve that error, if applicable. + + :param stream: stream + :param logger: source logger + :param source: optional (source) + :return: A dictionary of (status code, reason) where the 'reason' explains + why 'status code' may have occurred and how the user can resolve that + error, if applicable. + """ + forbidden_error_message = f"The endpoint to access stream '{stream.name}' returned 403: Forbidden. " + forbidden_error_message += "This is most likely due to insufficient permissions on the credentials in use. " + forbidden_error_message += self._visit_docs_message(logger, source) + + reasons_for_codes: Dict[int, str] = {requests.codes.FORBIDDEN: forbidden_error_message} + return reasons_for_codes + + @staticmethod + def _visit_docs_message(logger: logging.Logger, source: Optional["Source"]) -> str: + """ + Creates a message indicating where to look in the documentation for + more information on a given source by checking the spec of that source + (if provided) for a 'documentationUrl'. + + :param logger: source logger + :param source: optional (source) + :return: A message telling the user where to go to learn more about the source. 
+ """ + if not source: + return "Please visit the connector's documentation to learn more. " + + try: + connector_spec = source.spec(logger) + docs_url = connector_spec.documentationUrl + if docs_url: + return f"Please visit {docs_url} to learn more. " + else: + return "Please visit the connector's documentation to learn more. " + + except FileNotFoundError: # If we are unit testing without implementing spec() method in source + if source: + docs_url = f"https://docs.airbyte.com/integrations/sources/{source.name}" + else: + docs_url = "https://docs.airbyte.com/integrations/sources/test" + + return f"Please visit {docs_url} to learn more." diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py index 084eb052894e..35cef60807fe 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py @@ -13,7 +13,9 @@ import requests import requests_cache from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy from airbyte_cdk.sources.streams.core import Stream, StreamData +from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy from requests.auth import AuthBase from requests_cache.session import CachedSession @@ -113,6 +115,10 @@ def retry_factor(self) -> float: def authenticator(self) -> HttpAuthenticator: return self._authenticator + @property + def availability_strategy(self) -> Optional[AvailabilityStrategy]: + return HttpAvailabilityStrategy() + @abstractmethod def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/stream_helpers.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/stream_helpers.py new file mode 100644 index 000000000000..6f972246693b --- /dev/null +++ 
b/airbyte-cdk/python/airbyte_cdk/sources/utils/stream_helpers.py @@ -0,0 +1,44 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from typing import Any, Mapping, Optional + +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.core import StreamData + + +class StreamHelper: + def get_first_record(self, stream: Stream) -> StreamData: + """ + Gets the first record for a stream. + + :param stream: stream + :return: StreamData containing the first record in the stream (StopIteration propagates if the stream yields no records) + """ + # Some streams need a stream slice to read records (e.g. if they have a SubstreamSlicer) + stream_slice = self.get_stream_slice(stream) + records = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice) + return next(iter(records)) + + @staticmethod + def get_stream_slice(stream: Stream) -> Optional[Mapping[str, Any]]: + """ + Gets the first stream_slice from a given stream's stream_slices. + + :param stream: stream + :return: First stream slice from 'stream_slices' generator + """ + # We wrap the return output of stream_slices() because some implementations return types that are iterable, + # but not iterators such as lists or tuples + slices = iter( + stream.stream_slices( + cursor_field=stream.cursor_field, + sync_mode=SyncMode.full_refresh, + ) + ) + try: + return next(slices) + except StopIteration: + return {} diff --git a/airbyte-cdk/python/docs/concepts/http-streams.md b/airbyte-cdk/python/docs/concepts/http-streams.md index 5bedb787c747..0be02ce35cdc 100644 --- a/airbyte-cdk/python/docs/concepts/http-streams.md +++ b/airbyte-cdk/python/docs/concepts/http-streams.md @@ -81,3 +81,14 @@ When we are dealing with streams that depend on the results of another stream, w If you need to set any network-adapter keyword args on the outgoing HTTP requests such as `allow_redirects`, `stream`, `verify`, `cert`, etc.. override the `request_kwargs` method. 
Any option listed in [BaseAdapter.send](https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send) can be returned as a keyword argument. + +## Stream Availability + +The CDK defines an `AvailabilityStrategy` for a stream, which is used to perform the `check_availability` method. This method checks whether +the stream is available before performing `read_records`. + +For HTTP streams, a default `HttpAvailabilityStrategy` is defined, which attempts to read the first record of the stream, and handles +a dictionary of known error codes and associated reasons, `reasons_for_unavailable_status_codes`. By default, this dictionary contains only +`requests.codes.FORBIDDEN` (403), with an associated error message that tells the user that they are likely missing permissions associated with that stream. + +You can override these known errors to handle more error codes and inform the user how to resolve errors. diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py index 1ea86bf88937..047cf89fc900 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py @@ -3,10 +3,15 @@ # import logging +import logging +from typing import Any, Iterable, Mapping, Optional from unittest.mock import MagicMock import pytest +import requests from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream +from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy logger = logging.getLogger("test") config = dict() @@ -28,6 +33,7 @@ def test_check_stream_with_slices_as_list(test_name, record, streams_to_check, stream_slice, expectation, slices_as_list): stream = MagicMock() stream.name = "s1" + stream.availability_strategy = None if 
slices_as_list: stream.stream_slices.return_value = [stream_slice] else: @@ -55,6 +61,7 @@ def mock_read_records(responses, default_response=None, **kwargs): def test_check_empty_stream(): stream = MagicMock() stream.name = "s1" + stream.availability_strategy = None stream.read_records.return_value = iter([]) stream.stream_slices.return_value = iter([None]) @@ -69,6 +76,7 @@ def test_check_empty_stream(): def test_check_stream_with_no_stream_slices_aborts(): stream = MagicMock() stream.name = "s1" + stream.availability_strategy = None stream.stream_slices.return_value = iter([]) source = MagicMock() @@ -78,3 +86,56 @@ def test_check_stream_with_no_stream_slices_aborts(): stream_is_available, reason = check_stream.check_connection(source, logger, config) assert not stream_is_available assert "no stream slices were found, likely because the parent stream is empty" in reason + + +@pytest.mark.parametrize( + "test_name, response_code, available_expectation, expected_messages", + [ + ("test_stream_unavailable_unhandled_error", 404, False, ["Unable to connect to stream mock_http_stream", "404 Client Error"]), + ("test_stream_unavailable_handled_error", 403, False, [ + "The endpoint to access stream 'mock_http_stream' returned 403: Forbidden.", + "This is most likely due to insufficient permissions on the credentials in use.", + ]), + ("test_stream_available", 200, True, []), + ], +) +def test_check_http_stream_via_availability_strategy(mocker, test_name, response_code, available_expectation, expected_messages): + class MockHttpStream(HttpStream): + url_base = "https://test_base_url.com" + primary_key = "" + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.resp_counter = 1 + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + return None + + def path(self, **kwargs) -> str: + return "" + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + stub_resp = {"data": self.resp_counter} 
+ self.resp_counter += 1 + yield stub_resp + pass + + http_stream = MockHttpStream() + assert isinstance(http_stream, HttpStream) + assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy) + + source = MagicMock() + source.streams.return_value = [http_stream] + + check_stream = CheckStream(stream_names=["mock_http_stream"], options={}) + + req = requests.Response() + req.status_code = response_code + mocker.patch.object(requests.Session, "send", return_value=req) + + logger = logging.getLogger(f"airbyte.{getattr(source, 'name', '')}") + stream_is_available, reason = check_stream.check_connection(source, logger, config) + + assert stream_is_available == available_expectation + for message in expected_messages: + assert message in reason diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py b/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py new file mode 100644 index 000000000000..1168dc609ca0 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py @@ -0,0 +1,141 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + +import logging +from typing import Any, Iterable, List, Mapping, Optional, Tuple + +import pytest +import requests +from airbyte_cdk.sources import AbstractSource +from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy +from airbyte_cdk.sources.streams.http.http import HttpStream +from requests import HTTPError + +logger = logging.getLogger("airbyte") + + +class MockHttpStream(HttpStream): + url_base = "https://test_base_url.com" + primary_key = "" + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.resp_counter = 1 + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + return None + + def path(self, **kwargs) -> str: + return "" + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + stub_resp = {"data": self.resp_counter} + self.resp_counter += 1 + yield stub_resp + pass + + def retry_factor(self) -> float: + return 0.01 + + +def test_default_http_availability_strategy(mocker): + http_stream = MockHttpStream() + assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy) + + class MockResponse(requests.Response, mocker.MagicMock): + def __init__(self, *args, **kvargs): + mocker.MagicMock.__init__(self) + requests.Response.__init__(self, **kvargs) + self.json = mocker.MagicMock() + + response = MockResponse() + response.status_code = 403 + response.json.return_value = {"error": "Oh no!"} + mocker.patch.object(requests.Session, "send", return_value=response) + + stream_is_available, reason = http_stream.check_availability(logger) + assert not stream_is_available + + expected_messages = [ + "This is most likely due to insufficient permissions on the credentials in use.", + "Please visit the connector's documentation to learn more.", + "Oh no!", + ] + for message in expected_messages: + assert message in reason + + req = requests.Response() + req.status_code = 200 + 
mocker.patch.object(requests.Session, "send", return_value=req) + + stream_is_available, _ = http_stream.check_availability(logger) + assert stream_is_available + + +def test_http_availability_connector_specific_docs(mocker): + class MockSource(AbstractSource): + def __init__(self, streams: List[Stream] = None): + self._streams = streams + + def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]: + return True, "" + + def streams(self, config: Mapping[str, Any]) -> List[Stream]: + if not self._streams: + raise Exception("Stream is not set") + return self._streams + + http_stream = MockHttpStream() + source = MockSource(streams=[http_stream]) + assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy) + + req = requests.Response() + req.status_code = 403 + mocker.patch.object(requests.Session, "send", return_value=req, json={"error": "Oh no!"}) + + stream_is_available, reason = http_stream.check_availability(logger, source) + assert not stream_is_available + + expected_messages = [ + f"The endpoint to access stream '{http_stream.name}' returned 403: Forbidden.", + "This is most likely due to insufficient permissions on the credentials in use.", + f"Please visit https://docs.airbyte.com/integrations/sources/{source.name} to learn more.", + # "Oh no!", + ] + for message in expected_messages: + assert message in reason + + +def test_http_availability_raises_unhandled_error(mocker): + http_stream = MockHttpStream() + assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy) + + req = requests.Response() + req.status_code = 404 + mocker.patch.object(requests.Session, "send", return_value=req) + + with pytest.raises(HTTPError): + http_stream.check_availability(logger) + + +def test_send_handles_retries_when_checking_availability(mocker, caplog): + http_stream = MockHttpStream() + assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy) + + req_1 = 
requests.Response() + req_1.status_code = 429 + req_2 = requests.Response() + req_2.status_code = 503 + req_3 = requests.Response() + req_3.status_code = 200 + mock_send = mocker.patch.object(requests.Session, "send", side_effect=[req_1, req_2, req_3]) + + with caplog.at_level(logging.INFO): + stream_is_available, _ = http_stream.check_availability(logger) + + assert stream_is_available + assert mock_send.call_count == 3 + for message in ["Caught retryable error", "Response Code: 429", "Response Code: 503"]: + assert message in caplog.text diff --git a/airbyte-cdk/python/unit_tests/sources/streams/test_availability_strategy.py b/airbyte-cdk/python/unit_tests/sources/streams/test_availability_strategy.py new file mode 100644 index 000000000000..277924b01197 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/streams/test_availability_strategy.py @@ -0,0 +1,70 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import logging +from typing import Any, Iterable, List, Mapping, Optional, Tuple, Union + +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources import Source +from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy +from airbyte_cdk.sources.streams.core import StreamData + +logger = logging.getLogger("airbyte") + + +class MockStream(Stream): + def __init__(self, name: str) -> Stream: + self._name = name + + @property + def name(self) -> str: + return self._name + + @property + def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: + pass + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[StreamData]: + pass + + +def test_no_availability_strategy(): + stream_1 = MockStream("stream") + assert stream_1.availability_strategy is None + + stream_1_is_available, _ = stream_1.check_availability(logger) + 
assert stream_1_is_available + + +def test_availability_strategy(): + class MockAvailabilityStrategy(AvailabilityStrategy): + def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional[Source]) -> Tuple[bool, any]: + if stream.name == "available_stream": + return True, None + return False, f"Could not reach stream '{stream.name}'." + + class MockStreamWithAvailabilityStrategy(MockStream): + @property + def availability_strategy(self) -> Optional["AvailabilityStrategy"]: + return MockAvailabilityStrategy() + + stream_1 = MockStreamWithAvailabilityStrategy("available_stream") + stream_2 = MockStreamWithAvailabilityStrategy("unavailable_stream") + + for stream in [stream_1, stream_2]: + assert isinstance(stream.availability_strategy, MockAvailabilityStrategy) + + stream_1_is_available, _ = stream_1.check_availability(logger) + assert stream_1_is_available + + stream_2_is_available, reason = stream_2.check_availability(logger) + assert not stream_2_is_available + assert "Could not reach stream 'unavailable_stream'" in reason diff --git a/airbyte-cdk/python/unit_tests/sources/test_source.py b/airbyte-cdk/python/unit_tests/sources/test_source.py index 5b67d57444eb..64a546108db1 100644 --- a/airbyte-cdk/python/unit_tests/sources/test_source.py +++ b/airbyte-cdk/python/unit_tests/sources/test_source.py @@ -7,10 +7,10 @@ import tempfile from collections import defaultdict from contextlib import nullcontext as does_not_raise -from typing import Any, List, Mapping, MutableMapping, Optional, Tuple -from unittest.mock import MagicMock +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple import pytest +import requests from airbyte_cdk.models import ( AirbyteGlobalState, AirbyteStateBlob, @@ -24,6 +24,7 @@ ) from airbyte_cdk.sources import AbstractSource, Source from airbyte_cdk.sources.streams.core import Stream +from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy from 
airbyte_cdk.sources.streams.http.http import HttpStream from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer from pydantic import ValidationError @@ -43,10 +44,15 @@ def discover(self, logger: logging.Logger, config: Mapping[str, Any]): class MockAbstractSource(AbstractSource): + def __init__(self, streams: Optional[List[Stream]] = None): + self._streams = streams + def check_connection(self, *args, **kwargs) -> Tuple[bool, Optional[Any]]: return True, "" def streams(self, *args, **kwargs) -> List[Stream]: + if self._streams: + return self._streams return [] @@ -79,26 +85,30 @@ def abstract_source(mocker): mocker.patch.multiple(HttpStream, __abstractmethods__=set()) mocker.patch.multiple(Stream, __abstractmethods__=set()) - class MockHttpStream(MagicMock, HttpStream): + class MockHttpStream(mocker.MagicMock, HttpStream): url_base = "http://example.com" path = "/dummy/path" - get_json_schema = MagicMock() + get_json_schema = mocker.MagicMock() def supports_incremental(self): return True def __init__(self, *args, **kvargs): - MagicMock.__init__(self) + mocker.MagicMock.__init__(self) HttpStream.__init__(self, *args, kvargs) - self.read_records = MagicMock() + self.read_records = mocker.MagicMock() + + @property + def availability_strategy(self): + return None - class MockStream(MagicMock, Stream): + class MockStream(mocker.MagicMock, Stream): page_size = None - get_json_schema = MagicMock() + get_json_schema = mocker.MagicMock() - def __init__(self, *args, **kvargs): - MagicMock.__init__(self) - self.read_records = MagicMock() + def __init__(self, **kwargs): + mocker.MagicMock.__init__(self) + self.read_records = mocker.MagicMock() streams = [MockHttpStream(), MockStream()] @@ -385,8 +395,8 @@ def test_internal_config(abstract_source, catalog): assert not non_http_stream.page_size -def test_internal_config_limit(abstract_source, catalog): - logger_mock = MagicMock() +def test_internal_config_limit(mocker, abstract_source, catalog): + 
logger_mock = mocker.MagicMock() logger_mock.level = logging.DEBUG del catalog.streams[1] STREAM_LIMIT = 2 @@ -423,8 +433,8 @@ def test_internal_config_limit(abstract_source, catalog): SCHEMA = {"type": "object", "properties": {"value": {"type": "string"}}} -def test_source_config_no_transform(abstract_source, catalog): - logger_mock = MagicMock() +def test_source_config_no_transform(mocker, abstract_source, catalog): + logger_mock = mocker.MagicMock() logger_mock.level = logging.DEBUG streams = abstract_source.streams(None) http_stream, non_http_stream = streams @@ -437,8 +447,8 @@ def test_source_config_no_transform(abstract_source, catalog): assert non_http_stream.get_json_schema.call_count == 5 -def test_source_config_transform(abstract_source, catalog): - logger_mock = MagicMock() +def test_source_config_transform(mocker, abstract_source, catalog): + logger_mock = mocker.MagicMock() logger_mock.level = logging.DEBUG streams = abstract_source.streams(None) http_stream, non_http_stream = streams @@ -451,8 +461,8 @@ def test_source_config_transform(abstract_source, catalog): assert [r.record.data for r in records] == [{"value": "23"}] * 2 -def test_source_config_transform_and_no_transform(abstract_source, catalog): - logger_mock = MagicMock() +def test_source_config_transform_and_no_transform(mocker, abstract_source, catalog): + logger_mock = mocker.MagicMock() logger_mock.level = logging.DEBUG streams = abstract_source.streams(None) http_stream, non_http_stream = streams @@ -462,3 +472,116 @@ def test_source_config_transform_and_no_transform(abstract_source, catalog): records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})] assert len(records) == 2 assert [r.record.data for r in records] == [{"value": "23"}, {"value": 23}] + + +def test_read_default_http_availability_strategy_stream_available(catalog, mocker): + mocker.patch.multiple(HttpStream, __abstractmethods__=set()) + mocker.patch.multiple(Stream, 
__abstractmethods__=set()) + + class MockHttpStream(mocker.MagicMock, HttpStream): + url_base = "http://example.com" + path = "/dummy/path" + get_json_schema = mocker.MagicMock() + + def supports_incremental(self): + return True + + def __init__(self, *args, **kvargs): + mocker.MagicMock.__init__(self) + HttpStream.__init__(self, *args, kvargs) + self.read_records = mocker.MagicMock() + + class MockStream(mocker.MagicMock, Stream): + page_size = None + get_json_schema = mocker.MagicMock() + + def __init__(self, *args, **kvargs): + mocker.MagicMock.__init__(self) + self.read_records = mocker.MagicMock() + + streams = [MockHttpStream(), MockStream()] + http_stream, non_http_stream = streams + assert isinstance(http_stream, HttpStream) + assert not isinstance(non_http_stream, HttpStream) + + assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy) + assert non_http_stream.availability_strategy is None + + # Add an extra record for the default HttpAvailabilityStrategy to pull from + # during the try: next(records) check, since we are mocking the return value + # and not re-creating the generator like we would during actual reading + http_stream.read_records.return_value = iter([{"value": "test"}] + [{}] * 3) + non_http_stream.read_records.return_value = iter([{}] * 3) + + source = MockAbstractSource(streams=streams) + logger = logging.getLogger(f"airbyte.{getattr(abstract_source, 'name', '')}") + records = [r for r in source.read(logger=logger, config={}, catalog=catalog, state={})] + # 3 for http stream and 3 for non http stream + assert len(records) == 3 + 3 + assert http_stream.read_records.called + assert non_http_stream.read_records.called + + +def test_read_default_http_availability_strategy_stream_unavailable(catalog, mocker, caplog): + mocker.patch.multiple(Stream, __abstractmethods__=set()) + + class MockHttpStream(HttpStream): + url_base = "https://test_base_url.com" + primary_key = "" + + def __init__(self, **kwargs): + 
super().__init__(**kwargs) + self.resp_counter = 1 + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + return None + + def path(self, **kwargs) -> str: + return "" + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + stub_response = {"data": self.resp_counter} + self.resp_counter += 1 + yield stub_response + + class MockStream(mocker.MagicMock, Stream): + page_size = None + get_json_schema = mocker.MagicMock() + + def __init__(self, *args, **kvargs): + mocker.MagicMock.__init__(self) + self.read_records = mocker.MagicMock() + + streams = [MockHttpStream(), MockStream()] + http_stream, non_http_stream = streams + assert isinstance(http_stream, HttpStream) + assert not isinstance(non_http_stream, HttpStream) + + assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy) + assert non_http_stream.availability_strategy is None + + # Don't set anything for read_records return value for HttpStream, since + # it should be skipped due to the stream being unavailable + non_http_stream.read_records.return_value = iter([{}] * 3) + + # Patch HTTP request to stream endpoint to make it unavailable + req = requests.Response() + req.status_code = 403 + mocker.patch.object(requests.Session, "send", return_value=req) + + source = MockAbstractSource(streams=streams) + logger = logging.getLogger("test_read_default_http_availability_strategy_stream_unavailable") + with caplog.at_level(logging.WARNING): + records = [r for r in source.read(logger=logger, config={}, catalog=catalog, state={})] + + # 0 for http stream and 3 for non http stream + assert len(records) == 0 + 3 + assert non_http_stream.read_records.called + expected_logs = [ + f"Skipped syncing stream '{http_stream.name}' because it was unavailable.", + f"The endpoint to access stream '{http_stream.name}' returned 403: Forbidden.", + "This is most likely due to insufficient permissions on the credentials in use.", + f"Please 
visit https://docs.airbyte.com/integrations/sources/{source.name} to learn more." + ] + for message in expected_logs: + assert message in caplog.text diff --git a/airbyte-cdk/python/unit_tests/sources/utils/test_schema_helpers.py b/airbyte-cdk/python/unit_tests/sources/utils/test_schema_helpers.py index 2274ffb02ae5..38aa713a5e78 100644 --- a/airbyte-cdk/python/unit_tests/sources/utils/test_schema_helpers.py +++ b/airbyte-cdk/python/unit_tests/sources/utils/test_schema_helpers.py @@ -4,6 +4,7 @@ import json +import logging import os import shutil import sys @@ -12,14 +13,13 @@ from pathlib import Path import jsonref -from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification, FailureType from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader, check_config_against_spec_or_exit from airbyte_cdk.utils.traced_exception import AirbyteTracedException from pytest import fixture from pytest import raises as pytest_raises -logger = AirbyteLogger() +logger = logging.getLogger("airbyte") MODULE = sys.modules[__name__] From a27a225183faea273f4640aa5474a993c37606fa Mon Sep 17 00:00:00 2001 From: erohmensing Date: Tue, 17 Jan 2023 09:28:53 -0600 Subject: [PATCH 2/9] Add test for http availability strategy on empty stream from original bug fix 7c17351631ece93fe78fc4a1a730c120cc132012 --- .../http/test_availability_strategy.py | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py b/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py index 1168dc609ca0..99fba2321219 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py @@ -139,3 +139,30 @@ def test_send_handles_retries_when_checking_availability(mocker, caplog): assert mock_send.call_count == 3 for message 
in ["Caught retryable error", "Response Code: 429", "Response Code: 503"]: assert message in caplog.text + + +def test_http_availability_strategy_on_empty_stream(mocker): + mocker.patch.multiple(HttpStream, __abstractmethods__=set()) + mocker.patch.multiple(Stream, __abstractmethods__=set()) + + class MockEmptyStream(mocker.MagicMock, HttpStream): + page_size = None + get_json_schema = mocker.MagicMock() + + def __init__(self, *args, **kvargs): + mocker.MagicMock.__init__(self) + self.read_records = mocker.MagicMock() + + empty_stream = MockEmptyStream() + assert isinstance(empty_stream, HttpStream) + + assert isinstance(empty_stream.availability_strategy, HttpAvailabilityStrategy) + + # Generator should have no values to generate + empty_stream.read_records.return_value = iter([]) + + logger = logging.getLogger("airbyte.test-source") + stream_is_available, _ = empty_stream.check_availability(logger) + + assert stream_is_available + assert empty_stream.read_records.called \ No newline at end of file From 2afec742b0552fccfc4912758107fb1701e99d61 Mon Sep 17 00:00:00 2001 From: erohmensing Date: Tue, 17 Jan 2023 09:34:33 -0600 Subject: [PATCH 3/9] fix flake errors --- .../airbyte_cdk/sources/streams/http/availability_strategy.py | 3 +-- .../unit_tests/sources/declarative/checks/test_check_stream.py | 1 - .../sources/streams/http/test_availability_strategy.py | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/availability_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/availability_strategy.py index 412db6d6a217..a9e60718a64f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/availability_strategy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/availability_strategy.py @@ -38,7 +38,7 @@ def check_availability(self, stream: Stream, logger: logging.Logger, source: Opt # If stream_slices has no `next()` item (Note - this is different from stream_slices 
returning [None]!) # This can happen when a substream's `stream_slices` method does a `for record in parent_records: yield ` # without accounting for the case in which the parent stream is empty. - reason = f"Cannot attempt to connect to stream {stream_name} - no stream slices were found, likely because the parent stream is empty." + reason = f"Cannot attempt to connect to stream {stream.name} - no stream slices were found, likely because the parent stream is empty." return False, reason try: @@ -50,7 +50,6 @@ def check_availability(self, stream: Stream, logger: logging.Logger, source: Opt except HTTPError as error: return self.handle_http_error(stream, logger, source, error) - def handle_http_error( self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError ) -> Tuple[bool, Optional[str]]: diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py index 047cf89fc900..2c5670a0896e 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py @@ -2,7 +2,6 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
# -import logging import logging from typing import Any, Iterable, Mapping, Optional from unittest.mock import MagicMock diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py b/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py index 99fba2321219..68c0b7e04dbf 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py @@ -165,4 +165,4 @@ def __init__(self, *args, **kvargs): stream_is_available, _ = empty_stream.check_availability(logger) assert stream_is_available - assert empty_stream.read_records.called \ No newline at end of file + assert empty_stream.read_records.called From bf791b699340c2da568e6e8d2d38c0c68cede76b Mon Sep 17 00:00:00 2001 From: erohmensing Date: Tue, 17 Jan 2023 10:15:25 -0600 Subject: [PATCH 4/9] Change CheckStream to use availability strategy --- .../declarative/checks/check_stream.py | 42 +++++-------------- 1 file changed, 11 insertions(+), 31 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py index 86ed8512b6aa..c56ce4388c2c 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -9,6 +9,7 @@ from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker from airbyte_cdk.sources.source import Source +from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy from airbyte_cdk.sources.streams.utils.stream_helper import get_first_record_for_slice, get_first_stream_slice from dataclasses_jsonschema import JsonSchemaMixin @@ -16,10 +17,10 @@ @dataclass class CheckStream(ConnectionChecker, JsonSchemaMixin): """ - Checks the connections by trying to read records 
from one or many of the streams selected by the developer + Checks the connections by checking availability of one or many streams selected by the developer Attributes: - stream_name (List[str]): name of streams to read records from + stream_name (List[str]): names of streams to check """ stream_names: List[str] @@ -29,7 +30,6 @@ def __post_init__(self, options: Mapping[str, Any]): self._options = options def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, any]: - """Check configuration parameters for a source by attempting to get the first record for each stream in the CheckStream's `stream_name` list.""" streams = source.streams(config) stream_name_to_stream = {s.name: s for s in streams} if len(streams) == 0: @@ -37,35 +37,15 @@ def check_connection(self, source: Source, logger: logging.Logger, config: Mappi for stream_name in self.stream_names: if stream_name not in stream_name_to_stream.keys(): raise ValueError(f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}.") - stream = stream_name_to_stream[stream_name] - - if stream.availability_strategy is not None: - try: - stream_is_available, reason = stream.check_availability(logger, source) - if stream_is_available: - return True, None - else: - return False, reason - except Exception as error: - return False, f"Unable to connect to stream {stream_name} - {error}" - - try: - # Some streams need a stream slice to read records (e.g. if they have a SubstreamSlicer) - # Streams that don't need a stream slice will return `None` as their first stream slice. - stream_slice = get_first_stream_slice(stream) - except StopIteration: - # If stream_slices has no `next()` item (Note - this is different from stream_slices returning [None]!) - # This can happen when a substream's `stream_slices` method does a `for record in parent_records: yield ` - # without accounting for the case in which the parent stream is empty. 
- reason = f"Cannot attempt to connect to stream {stream_name} - no stream slices were found, likely because the parent stream is empty." - return False, reason + stream = stream_name_to_stream[stream_name] + availability_strategy = stream.availability_strategy or HttpAvailabilityStrategy() try: - get_first_record_for_slice(stream, stream_slice) - return True, None - except StopIteration: - logger.info(f"Successfully connected to stream {stream.name}, but got 0 records.") - return True, None + stream_is_available, reason = availability_strategy.check_availability(stream, logger, source) + if stream_is_available: + return True, None + else: + return False, reason except Exception as error: - logger.error(f"Encountered an error trying to connect to stream {stream.name}. Error: \n {traceback.format_exc()}") + logger.error(f"Encountered an error trying to connect to stream {stream_name}. Error: \n {traceback.format_exc()}") return False, f"Unable to connect to stream {stream_name} - {error}" From 8b1fe95cadc55b22297d28c1eb85d8c273bc7da0 Mon Sep 17 00:00:00 2001 From: erohmensing Date: Tue, 17 Jan 2023 10:24:49 -0600 Subject: [PATCH 5/9] Refactor test from bug fix --- .../sources/streams/http/test_availability_strategy.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py b/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py index 68c0b7e04dbf..c79894fb428d 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py @@ -142,18 +142,13 @@ def test_send_handles_retries_when_checking_availability(mocker, caplog): def test_http_availability_strategy_on_empty_stream(mocker): - mocker.patch.multiple(HttpStream, __abstractmethods__=set()) - mocker.patch.multiple(Stream, __abstractmethods__=set()) - - class 
MockEmptyStream(mocker.MagicMock, HttpStream): - page_size = None - get_json_schema = mocker.MagicMock() + class MockEmptyHttpStream(mocker.MagicMock, MockHttpStream): def __init__(self, *args, **kvargs): mocker.MagicMock.__init__(self) self.read_records = mocker.MagicMock() - empty_stream = MockEmptyStream() + empty_stream = MockEmptyHttpStream() assert isinstance(empty_stream, HttpStream) assert isinstance(empty_stream.availability_strategy, HttpAvailabilityStrategy) From 91affb8c69d7c532438ac4f3f2ffc1f6387fbb33 Mon Sep 17 00:00:00 2001 From: erohmensing Date: Tue, 17 Jan 2023 10:34:24 -0600 Subject: [PATCH 6/9] fix flake errors --- .../airbyte_cdk/sources/declarative/checks/check_stream.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py index c56ce4388c2c..7ba737d2be1c 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -10,7 +10,6 @@ from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker from airbyte_cdk.sources.source import Source from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy -from airbyte_cdk.sources.streams.utils.stream_helper import get_first_record_for_slice, get_first_stream_slice from dataclasses_jsonschema import JsonSchemaMixin From fcfc52f39f05849a6c549d6377a6da80724fbf63 Mon Sep 17 00:00:00 2001 From: erohmensing Date: Wed, 18 Jan 2023 08:32:28 -0600 Subject: [PATCH 7/9] Remove extra helper file from cherry-pick --- .../sources/utils/stream_helpers.py | 44 ------------------- 1 file changed, 44 deletions(-) delete mode 100644 airbyte-cdk/python/airbyte_cdk/sources/utils/stream_helpers.py diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/stream_helpers.py 
b/airbyte-cdk/python/airbyte_cdk/sources/utils/stream_helpers.py deleted file mode 100644 index 6f972246693b..000000000000 --- a/airbyte-cdk/python/airbyte_cdk/sources/utils/stream_helpers.py +++ /dev/null @@ -1,44 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -from typing import Any, Mapping, Optional - -from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.core import StreamData - - -class StreamHelper: - def get_first_record(self, stream: Stream) -> StreamData: - """ - Gets the first record for a stream. - - :param stream: stream - :return: StreamData containing the first record in the stream - """ - # Some streams need a stream slice to read records (e.g. if they have a SubstreamSlicer) - stream_slice = self.get_stream_slice(stream) - records = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice) - next(records) - - @staticmethod - def get_stream_slice(stream: Stream) -> Optional[Mapping[str, Any]]: - """ - Gets the first stream_slice from a given stream's stream_slices. 
- - :param stream: stream - :return: First stream slice from 'stream_slices' generator - """ - # We wrap the return output of stream_slices() because some implementations return types that are iterable, - # but not iterators such as lists or tuples - slices = iter( - stream.stream_slices( - cursor_field=stream.cursor_field, - sync_mode=SyncMode.full_refresh, - ) - ) - try: - return next(slices) - except StopIteration: - return {} From 2487c04c447f50c3e819a1dad03ec5c49297913a Mon Sep 17 00:00:00 2001 From: erohmensing Date: Wed, 18 Jan 2023 09:24:02 -0600 Subject: [PATCH 8/9] Merge tests for default http availability strategy --- .../http/test_availability_strategy.py | 80 ++++++++----------- 1 file changed, 35 insertions(+), 45 deletions(-) diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py b/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py index c79894fb428d..4afed0fd1188 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py @@ -40,41 +40,32 @@ def retry_factor(self) -> float: return 0.01 -def test_default_http_availability_strategy(mocker): +@pytest.mark.parametrize( + ("status_code", "json_contents", "expected_is_available", "expected_messages"), + [ + (403, {"error": "Something went wrong"}, False, [ + "This is most likely due to insufficient permissions on the credentials in use.", + "Something went wrong", + ]), + (200, {}, True, []) + ] +) +@pytest.mark.parametrize( + ("include_source", "expected_docs_url_messages"), [ + (True, ["Please visit https://docs.airbyte.com/integrations/sources/MockSource to learn more."]), + (False, ["Please visit the connector's documentation to learn more."]), + ] +) +def test_default_http_availability_strategy(mocker, status_code, json_contents, expected_is_available, expected_messages, include_source, 
expected_docs_url_messages): http_stream = MockHttpStream() assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy) - class MockResponse(requests.Response, mocker.MagicMock): + class MockResponseWithJsonContents(requests.Response, mocker.MagicMock): def __init__(self, *args, **kvargs): mocker.MagicMock.__init__(self) requests.Response.__init__(self, **kvargs) self.json = mocker.MagicMock() - response = MockResponse() - response.status_code = 403 - response.json.return_value = {"error": "Oh no!"} - mocker.patch.object(requests.Session, "send", return_value=response) - - stream_is_available, reason = http_stream.check_availability(logger) - assert not stream_is_available - - expected_messages = [ - "This is most likely due to insufficient permissions on the credentials in use.", - "Please visit the connector's documentation to learn more.", - "Oh no!", - ] - for message in expected_messages: - assert message in reason - - req = requests.Response() - req.status_code = 200 - mocker.patch.object(requests.Session, "send", return_value=req) - - stream_is_available, _ = http_stream.check_availability(logger) - assert stream_is_available - - -def test_http_availability_connector_specific_docs(mocker): class MockSource(AbstractSource): def __init__(self, streams: List[Stream] = None): self._streams = streams @@ -87,25 +78,24 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: raise Exception("Stream is not set") return self._streams - http_stream = MockHttpStream() - source = MockSource(streams=[http_stream]) - assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy) - - req = requests.Response() - req.status_code = 403 - mocker.patch.object(requests.Session, "send", return_value=req, json={"error": "Oh no!"}) - - stream_is_available, reason = http_stream.check_availability(logger, source) - assert not stream_is_available + response = MockResponseWithJsonContents() + response.status_code = status_code + 
response.json.return_value = json_contents + mocker.patch.object(requests.Session, "send", return_value=response) - expected_messages = [ - f"The endpoint to access stream '{http_stream.name}' returned 403: Forbidden.", - "This is most likely due to insufficient permissions on the credentials in use.", - f"Please visit https://docs.airbyte.com/integrations/sources/{source.name} to learn more.", - # "Oh no!", - ] - for message in expected_messages: - assert message in reason + if include_source: + source = MockSource(streams=[http_stream]) + actual_is_available, reason = http_stream.check_availability(logger, source) + else: + actual_is_available, reason = http_stream.check_availability(logger) + + assert expected_is_available == actual_is_available + if expected_is_available: + assert reason is None + else: + all_expected_messages = expected_messages + expected_docs_url_messages + for message in all_expected_messages: + assert message in reason def test_http_availability_raises_unhandled_error(mocker): From c6ad46f83553d6e6b6a48cffc2ad88ea9fc50078 Mon Sep 17 00:00:00 2001 From: Ella Rohm-Ensing Date: Wed, 18 Jan 2023 12:10:12 -0600 Subject: [PATCH 9/9] turn off HttpAvailabilityStrategy as default (for now) (#21488) * turn off HttpAvailabilityStrategy as default (for now) * Update imports accordingly --- .../python/airbyte_cdk/sources/streams/http/http.py | 6 ------ airbyte-cdk/python/docs/concepts/http-streams.md | 11 +++++++++-- .../sources/declarative/checks/test_check_stream.py | 6 ++++++ .../streams/http/test_availability_strategy.py | 6 ++++++ airbyte-cdk/python/unit_tests/sources/test_source.py | 11 +++++++++++ 5 files changed, 32 insertions(+), 8 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py index 35cef60807fe..084eb052894e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py +++ 
b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py @@ -13,9 +13,7 @@ import requests import requests_cache from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy from airbyte_cdk.sources.streams.core import Stream, StreamData -from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy from requests.auth import AuthBase from requests_cache.session import CachedSession @@ -115,10 +113,6 @@ def retry_factor(self) -> float: def authenticator(self) -> HttpAuthenticator: return self._authenticator - @property - def availability_strategy(self) -> Optional[AvailabilityStrategy]: - return HttpAvailabilityStrategy() - @abstractmethod def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: """ diff --git a/airbyte-cdk/python/docs/concepts/http-streams.md b/airbyte-cdk/python/docs/concepts/http-streams.md index 0be02ce35cdc..aa80d05fc266 100644 --- a/airbyte-cdk/python/docs/concepts/http-streams.md +++ b/airbyte-cdk/python/docs/concepts/http-streams.md @@ -87,8 +87,15 @@ be returned as a keyword argument. The CDK defines an `AvailabilityStrategy` for a stream, which is used to perform the `check_availability` method. This method checks whether the stream is available before performing `read_records`. -For HTTP streams, a default `HttpAvailabilityStrategy` is defined, which attempts to read the first record of the stream, and excepts +For HTTP streams, a `HttpAvailabilityStrategy` is defined, which attempts to read the first record of the stream, and excepts a dictionary of known error codes and associated reasons, `reasons_for_unavailable_status_codes`. By default, this list contains only `requests.status_codes.FORBIDDEN` (403), with an associated error message that tells the user that they are likely missing permissions associated with that stream. 
-You can override these known errors to except more error codes and inform the user how to resolve errors. +You can use this `HttpAvailabilityStrategy` in your `HttpStream` by adding the following property to your stream class: + +```python + def availability_strategy(self) -> Optional[AvailabilityStrategy]: + return HttpAvailabilityStrategy() +``` + +You can also subclass `HttpAvailabilityStrategy` to override the list of known errors to except more error codes and inform the user how to resolve errors specific to your connector or stream. diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py index 2c5670a0896e..52a2e4f03500 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py @@ -9,6 +9,7 @@ import pytest import requests from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream +from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy @@ -119,6 +120,11 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp yield stub_resp pass + # TODO (Ella): Remove explicit definition when turning on default + @property + def availability_strategy(self) -> Optional["AvailabilityStrategy"]: + return HttpAvailabilityStrategy() + http_stream = MockHttpStream() assert isinstance(http_stream, HttpStream) assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy) diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py b/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py index 4afed0fd1188..6bd1cf005fa2 100644 --- 
a/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py @@ -9,6 +9,7 @@ import requests from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy from airbyte_cdk.sources.streams.http.http import HttpStream from requests import HTTPError @@ -39,6 +40,11 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp def retry_factor(self) -> float: return 0.01 + # TODO (Ella): Remove explicit definition when turning on default + @property + def availability_strategy(self) -> Optional["AvailabilityStrategy"]: + return HttpAvailabilityStrategy() + @pytest.mark.parametrize( ("status_code", "json_contents", "expected_is_available", "expected_messages"), diff --git a/airbyte-cdk/python/unit_tests/sources/test_source.py b/airbyte-cdk/python/unit_tests/sources/test_source.py index 64a546108db1..0cae909b2f08 100644 --- a/airbyte-cdk/python/unit_tests/sources/test_source.py +++ b/airbyte-cdk/python/unit_tests/sources/test_source.py @@ -23,6 +23,7 @@ Type, ) from airbyte_cdk.sources import AbstractSource, Source +from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy from airbyte_cdk.sources.streams.core import Stream from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy from airbyte_cdk.sources.streams.http.http import HttpStream @@ -491,6 +492,11 @@ def __init__(self, *args, **kvargs): HttpStream.__init__(self, *args, kvargs) self.read_records = mocker.MagicMock() + # TODO (Ella): Remove explicit definition when turning on default + @property + def availability_strategy(self) -> Optional["AvailabilityStrategy"]: + return HttpAvailabilityStrategy() + class 
MockStream(mocker.MagicMock, Stream): page_size = None get_json_schema = mocker.MagicMock() @@ -544,6 +550,11 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp self.resp_counter += 1 yield stub_response + # TODO (Ella): Remove explicit definition when turning on default + @property + def availability_strategy(self) -> Optional["AvailabilityStrategy"]: + return HttpAvailabilityStrategy() + class MockStream(mocker.MagicMock, Stream): page_size = None get_json_schema = mocker.MagicMock()