Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reintroduce AvailabilityStrategy into the CDK (HttpAvailabilityStrategy default not turned on yet) #21484

Merged
merged 10 commits into from
Jan 18, 2023
6 changes: 5 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ def read(
f"The requested stream {configured_stream.stream.name} was not found in the source."
f" Available streams: {stream_instances.keys()}"
)
stream_is_available, error = stream_instance.check_availability(logger, self)
if not stream_is_available:
logger.warning(f"Skipped syncing stream '{stream_instance.name}' because it was unavailable. Error: {error}")
continue
try:
timer.start_event(f"Syncing stream {configured_stream.stream.name}")
yield from self._read_stream(
Expand Down Expand Up @@ -187,7 +191,7 @@ def _read_stream(
@staticmethod
def _limit_reached(internal_config: InternalConfig, records_counter: int) -> bool:
"""
Check if record count reached liimt set by internal config.
Check if record count reached limit set by internal config.
:param internal_config - internal CDK configuration separated from user defined config
:records_counter - number of records already red
:return True if limit reached, False otherwise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@

from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.source import Source
from airbyte_cdk.sources.streams.utils.stream_helper import get_first_record_for_slice, get_first_stream_slice
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class CheckStream(ConnectionChecker, JsonSchemaMixin):
"""
Checks the connections by trying to read records from one or many of the streams selected by the developer
Checks the connections by checking availability of one or many streams selected by the developer

Attributes:
stream_name (List[str]): name of streams to read records from
stream_name (List[str]): names of streams to check
"""

stream_names: List[str]
Expand All @@ -29,33 +29,22 @@ def __post_init__(self, options: Mapping[str, Any]):
self._options = options

def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, any]:
"""Check configuration parameters for a source by attempting to get the first record for each stream in the CheckStream's `stream_name` list."""
streams = source.streams(config)
stream_name_to_stream = {s.name: s for s in streams}
if len(streams) == 0:
return False, f"No streams to connect to from source {source}"
for stream_name in self.stream_names:
if stream_name not in stream_name_to_stream.keys():
raise ValueError(f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}.")
stream = stream_name_to_stream[stream_name]

try:
# Some streams need a stream slice to read records (e.g. if they have a SubstreamSlicer)
# Streams that don't need a stream slice will return `None` as their first stream slice.
stream_slice = get_first_stream_slice(stream)
except StopIteration:
# If stream_slices has no `next()` item (Note - this is different from stream_slices returning [None]!)
# This can happen when a substream's `stream_slices` method does a `for record in parent_records: yield <something>`
# without accounting for the case in which the parent stream is empty.
reason = f"Cannot attempt to connect to stream {stream_name} - no stream slices were found, likely because the parent stream is empty."
return False, reason

stream = stream_name_to_stream[stream_name]
availability_strategy = stream.availability_strategy or HttpAvailabilityStrategy()
try:
get_first_record_for_slice(stream, stream_slice)
return True, None
except StopIteration:
logger.info(f"Successfully connected to stream {stream.name}, but got 0 records.")
return True, None
stream_is_available, reason = availability_strategy.check_availability(stream, logger, source)
if stream_is_available:
return True, None
else:
return False, reason
except Exception as error:
logger.error(f"Encountered an error trying to connect to stream {stream.name}. Error: \n {traceback.format_exc()}")
logger.error(f"Encountered an error trying to connect to stream {stream_name}. Error: \n {traceback.format_exc()}")
return False, f"Unable to connect to stream {stream_name} - {error}"
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class DeclarativeSource(AbstractSource):
@property
@abstractmethod
def connection_checker(self) -> ConnectionChecker:
"""Returns the ConnectioChecker to use for the `check` operation"""
"""Returns the ConnectionChecker to use for the `check` operation"""

def check_connection(self, logger, config) -> Tuple[bool, any]:
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging
import typing
from abc import ABC, abstractmethod
from typing import Optional, Tuple

from airbyte_cdk.sources.streams import Stream

if typing.TYPE_CHECKING:
from airbyte_cdk.sources import Source


class AvailabilityStrategy(ABC):
"""
Abstract base class for checking stream availability.
"""

@abstractmethod
def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional["Source"]) -> Tuple[bool, Optional[str]]:
"""
Checks stream availability.

:param stream: stream
:param logger: source logger
:param source: (optional) source
:return: A tuple of (boolean, str). If boolean is true, then the stream
is available, and no str is required. Otherwise, the stream is unavailable
for some reason and the str should describe what went wrong and how to
resolve the unavailability, if possible.
"""
29 changes: 28 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/sources/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

import inspect
import logging
import typing
from abc import ABC, abstractmethod
from functools import lru_cache
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union

import airbyte_cdk.sources.utils.casing as casing
from airbyte_cdk.models import AirbyteLogMessage, AirbyteStream, AirbyteTraceMessage, SyncMode
Expand All @@ -17,6 +18,10 @@
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from deprecated.classic import deprecated

if typing.TYPE_CHECKING:
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy

# A stream's read method can return one of the following types:
# Mapping[str, Any]: The content of an AirbyteRecordMessage
# AirbyteRecordMessage: An AirbyteRecordMessage
Expand Down Expand Up @@ -170,6 +175,28 @@ def source_defined_cursor(self) -> bool:
"""
return True

def check_availability(self, logger: logging.Logger, source: Optional["Source"] = None) -> Tuple[bool, Optional[str]]:
"""
Checks whether this stream is available.

:param logger: source logger
:param source: (optional) source
:return: A tuple of (boolean, str). If boolean is true, then this stream
is available, and no str is required. Otherwise, this stream is unavailable
for some reason and the str should describe what went wrong and how to
resolve the unavailability, if possible.
"""
if self.availability_strategy:
return self.availability_strategy.check_availability(self, logger, source)
return True, None

@property
def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
"""
:return: The AvailabilityStrategy used to check whether this stream is available.
"""
return None

@property
@abstractmethod
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging
import typing
from typing import Dict, Optional, Tuple

import requests
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.utils.stream_helper import get_first_record_for_slice, get_first_stream_slice
from requests import HTTPError

if typing.TYPE_CHECKING:
from airbyte_cdk.sources import Source


class HttpAvailabilityStrategy(AvailabilityStrategy):
def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional["Source"]) -> Tuple[bool, Optional[str]]:
"""
Check stream availability by attempting to read the first record of the
stream.

:param stream: stream
:param logger: source logger
:param source: (optional) source
:return: A tuple of (boolean, str). If boolean is true, then the stream
is available, and no str is required. Otherwise, the stream is unavailable
for some reason and the str should describe what went wrong and how to
resolve the unavailability, if possible.
"""
try:
# Some streams need a stream slice to read records (e.g. if they have a SubstreamSlicer)
# Streams that don't need a stream slice will return `None` as their first stream slice.
stream_slice = get_first_stream_slice(stream)
except StopIteration:
# If stream_slices has no `next()` item (Note - this is different from stream_slices returning [None]!)
# This can happen when a substream's `stream_slices` method does a `for record in parent_records: yield <something>`
# without accounting for the case in which the parent stream is empty.
reason = f"Cannot attempt to connect to stream {stream.name} - no stream slices were found, likely because the parent stream is empty."
return False, reason

try:
get_first_record_for_slice(stream, stream_slice)
return True, None
except StopIteration:
logger.info(f"Successfully connected to stream {stream.name}, but got 0 records.")
return True, None
except HTTPError as error:
return self.handle_http_error(stream, logger, source, error)

def handle_http_error(
self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError
) -> Tuple[bool, Optional[str]]:
"""
Override this method to define error handling for various `HTTPError`s
that are raised while attempting to check a stream's availability.

Checks whether an error's status_code is in a list of unavailable_error_codes,
and gets the associated reason for that error.

:param stream: stream
:param logger: source logger
:param source: optional (source)
:param error: HTTPError raised while checking stream's availability.
:return: A tuple of (boolean, str). If boolean is true, then the stream
is available, and no str is required. Otherwise, the stream is unavailable
for some reason and the str should describe what went wrong and how to
resolve the unavailability, if possible.
"""
try:
status_code = error.response.status_code
reason = self.reasons_for_unavailable_status_codes(stream, logger, source, error)[status_code]
response_error_message = stream.parse_response_error_message(error.response)
if response_error_message:
reason += response_error_message
return False, reason
except KeyError:
# If the HTTPError is not in the dictionary of errors we know how to handle, don't except it
raise error

def reasons_for_unavailable_status_codes(
self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError
) -> Dict[int, str]:
"""
Returns a dictionary of HTTP status codes that indicate stream
unavailability and reasons explaining why a given status code may
have occurred and how the user can resolve that error, if applicable.

:param stream: stream
:param logger: source logger
:param source: optional (source)
:return: A dictionary of (status code, reason) where the 'reason' explains
why 'status code' may have occurred and how the user can resolve that
error, if applicable.
"""
forbidden_error_message = f"The endpoint to access stream '{stream.name}' returned 403: Forbidden. "
forbidden_error_message += "This is most likely due to insufficient permissions on the credentials in use. "
forbidden_error_message += self._visit_docs_message(logger, source)

reasons_for_codes: Dict[int, str] = {requests.codes.FORBIDDEN: forbidden_error_message}
erohmensing marked this conversation as resolved.
Show resolved Hide resolved
return reasons_for_codes

@staticmethod
def _visit_docs_message(logger: logging.Logger, source: Optional["Source"]) -> str:
"""
Creates a message indicicating where to look in the documentation for
more information on a given source by checking the spec of that source
(if provided) for a 'documentationUrl'.

:param logger: source logger
:param source: optional (source)
:return: A message telling the user where to go to learn more about the source.
"""
if not source:
return "Please visit the connector's documentation to learn more. "

try:
connector_spec = source.spec(logger)
docs_url = connector_spec.documentationUrl
if docs_url:
return f"Please visit {docs_url} to learn more. "
else:
return "Please visit the connector's documentation to learn more. "

except FileNotFoundError: # If we are unit testing without implementing spec() method in source
if source:
docs_url = f"https://docs.airbyte.com/integrations/sources/{source.name}"
else:
docs_url = "https://docs.airbyte.com/integrations/sources/test"

return f"Please visit {docs_url} to learn more."
18 changes: 18 additions & 0 deletions airbyte-cdk/python/docs/concepts/http-streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,21 @@ When we are dealing with streams that depend on the results of another stream, w
If you need to set any network-adapter keyword args on the outgoing HTTP requests such as `allow_redirects`, `stream`, `verify`, `cert`, etc..
override the `request_kwargs` method. Any option listed in [BaseAdapter.send](https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send) can
be returned as a keyword argument.

## Stream Availability

The CDK defines an `AvailabilityStrategy` for a stream, which is used to perform the `check_availability` method. This method checks whether
the stream is available before performing `read_records`.

For HTTP streams, a `HttpAvailabilityStrategy` is defined, which attempts to read the first record of the stream, and excepts
a dictionary of known error codes and associated reasons, `reasons_for_unavailable_status_codes`. By default, this list contains only
`requests.status_codes.FORBIDDEN` (403), with an associated error message that tells the user that they are likely missing permissions associated with that stream.

You can use this `HttpAvailabilityStrategy` in your `HttpStream` by adding the following property to your stream class:

```python
def availability_strategy(self) -> Optional[AvailabilityStrategy]:
return HttpAvailabilityStrategy()
```

You can also subclass `HttpAvailabilityStrategy` to override the list of known errors to except more error codes and inform the user how to resolve errors specific to your connector or stream.
Loading