Alex/configurable retrier (#14330)
* checkout files from test branch

* read_incremental works

* reset to master

* remove dead code

* comment

* fix

* Add test

* comments

* utc

* format

* small fix

* Add test with rfc3339

* remove unused param

* fix test

* configurable state checkpointing

* update test

* start working on retrier

* retry predicate

* return response status

* look in error message

* cleanup test

* constant backoff strategy

* chain backoff strategy

* chain retrier

* Add to class types registry

* extract backoff time from header

* wait until

* update

* split file

* parse_records

* classmethod

* delete dead code

* comment

* comment

* comments

* fix

* test for instantiating chain retrier

* fix parsing

* cleanup

* fix

* reset

* never raise on http error

* remove print

* comment

* comment

* comment

* comment

* remove prints

* add declarative stream to registry

* Delete dead code

* Add docstrings

* quick fix

* exponential backoff

* fix test

* fix

* delete unused properties

* fix

* missing unit tests

* uppercase

* docstrings

* rename to success

* compare full request instead of just url

* rename module

* rename test file

* rename interface

* rename default retrier

* rename to compositeerrorhandler

* fix missing renames

* move action to filter

* str -> minmaxdatetime

* small fixes

* plural

* add example

* handle header variations

* also fix wait time from

* allow using a regex to extract the value

* group()

* docstring

* add docs

* update comment

* docstrings

* update comment

* Update airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py

Co-authored-by: Sherif A. Nada <[email protected]>

* version: Update Parquet library to latest release (#14502)

The upstream Parquet library that is currently pinned for use in the S3 destination plugin is over a year old. The current version is generating invalid schemas for date-time with time-zone fields, which appears to be addressed in the `1.12.3` release of the library in commit apache/parquet-java@c72862b

* merge

* 🎉 Source Github: improve schema for stream `pull_request_commits` added "null" (#14613)

Signed-off-by: Sergey Chvalyuk <[email protected]>

* Docs: Fixed broken links (#14622)

* fixing broken links

* more broken links

* source-hubspot: change mention of Mailchimp to HubSpot in doc (#14620)

* Helm Chart: Add external temporal option (#14597)

* conflict env configmap and chart lock

* reverting lock

* add eof lines and documentation on values yaml

* conflict json file

* rollback json

* solve conflict

* correct minio with new version

Co-authored-by: Guy Feldman <[email protected]>

* 🎉 Add YAML format to source-file reader (#14588)

* Add yaml reader

* Update docs

* Bumpversion of connector

* bump docs

* Update pyarrow dependency

* Upgrade pandas dependency

* auto-bump connector version

Co-authored-by: Octavia Squidington III <[email protected]>

* 🎉 Source Okta: add GroupMembers stream (#14380)

* add Group_Members stream to okta source

- Group_Members returns a list of users with the same schema as the Users stream.
- Create a shared users schema, and have both the group_members and users schemas reference it.
- Add Group_Members stream to source connector

* add tests and fix logs schema

- fix the test error "None is not one of enums": although the enum type includes both string and null, the error comes from the JSON schema validator
https://github.com/python-jsonschema/jsonschema/blob/ddb87afad8f5d5c40600b5ede0ab96e4d4bdf7d3/jsonschema/_validators.py#L279-L285
- change group_members to use id as the cursor field since `filter` is not supported in the query string
- fix the abnormal state test on the logs stream: when `since` is abnormally large, `until` has to be defined with an equal or larger value
- remove the logs stream from the full sync test, because two full syncs always have a gap -- at least one new log from the users or groups API

* last polish before submitting the PR

- bump docker version
- update changelog
- add the right abnormal value for logs stream
- correct the sample catalog

* address comments:

- improve comments for until parameter under the logs stream
- add use_cache on groupMembers

* add use_cache to Group_Members

* change configured_catalog to test

* auto-bump connector version

Co-authored-by: marcosmarxm <[email protected]>
Co-authored-by: Octavia Squidington III <[email protected]>

* split test files

* renames

* missing unit test

* add missing unit tests

* rename

* assert isinstance

* start extracting to their own files

* use final instead of classmethod

* assert we retry 429 errors

* Add log

* replace asserts with valueexceptions

* delete superfluous print statement

* fix factory so we don't need to union everything with strings

* get class_name from type

* remove from class types registry

* process error handlers one at a time

* sort

* delete print statement

* comment

* comment

* format

* delete unused file

Co-authored-by: Sherif A. Nada <[email protected]>
Co-authored-by: Tobias Macey <[email protected]>
Co-authored-by: Serhii Chvaliuk <[email protected]>
Co-authored-by: Amruta Ranade <[email protected]>
Co-authored-by: Bas Beelen <[email protected]>
Co-authored-by: Marcos Marx <[email protected]>
Co-authored-by: Guy Feldman <[email protected]>
Co-authored-by: Christophe Duong <[email protected]>
Co-authored-by: Octavia Squidington III <[email protected]>
Co-authored-by: Yiyang Li <[email protected]>
Co-authored-by: marcosmarxm <[email protected]>
12 people authored Jul 14, 2022
1 parent c622dd0 commit 09aa685
Showing 45 changed files with 1,274 additions and 213 deletions.
@@ -41,7 +41,7 @@ def newfunc(*fargs, **fkeywords):

# interpolate the parameters
interpolated_keywords = InterpolatedMapping(fully_created, interpolation).eval(config, **{"options": options})
- interpolated_keywords = {k: v for k, v in interpolated_keywords.items() if v is not None}
+ interpolated_keywords = {k: v for k, v in interpolated_keywords.items() if v}

all_keywords.update(interpolated_keywords)
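
A minimal sketch of the behavioral difference (the values below are hypothetical, not from this commit): the old filter only dropped None, while the new one also drops empty strings and other falsy values.

interpolated_keywords = {"page_size": 100, "cursor": "", "token": None}
old = {k: v for k, v in interpolated_keywords.items() if v is not None}  # {"page_size": 100, "cursor": ""}
new = {k: v for k, v in interpolated_keywords.items() if v}              # {"page_size": 100}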

@@ -5,11 +5,11 @@
from typing import Any, List, Mapping

from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
- from airbyte_cdk.sources.declarative.types import Record
+ from airbyte_cdk.sources.declarative.types import Config, Record


class RecordFilter:
- def __init__(self, config, condition: str = None):
+ def __init__(self, config: Config, condition: str = None):
self._config = config
self._filter_interpolator = InterpolatedBoolean(condition)

@@ -5,19 +5,39 @@
from typing import Mapping, Type

from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.extractors.jello import JelloExtractor
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.constant_backoff_strategy import ConstantBackoffStrategy
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.exponential_backoff_strategy import (
ExponentialBackoffStrategy,
)
from airbyte_cdk.sources.declarative.requesters.error_handlers.composite_error_handler import CompositeErrorHandler
from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler
from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester
from airbyte_cdk.sources.declarative.requesters.paginators.interpolated_paginator import InterpolatedPaginator
from airbyte_cdk.sources.declarative.requesters.paginators.next_page_url_paginator import NextPageUrlPaginator
from airbyte_cdk.sources.declarative.requesters.paginators.offset_paginator import OffsetPaginator
from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer
from airbyte_cdk.sources.declarative.transformations import RemoveFields
from airbyte_cdk.sources.streams.http.requests_native_auth.token import TokenAuthenticator

CLASS_TYPES_REGISTRY: Mapping[str, Type] = {
"CartesianProductStreamSlicer": CartesianProductStreamSlicer,
"CompositeErrorHandler": CompositeErrorHandler,
"ConstantBackoffStrategy": ConstantBackoffStrategy,
"DatetimeStreamSlicer": DatetimeStreamSlicer,
"DeclarativeStream": DeclarativeStream,
"DefaultErrorHandler": DefaultErrorHandler,
"ExponentialBackoffStrategy": ExponentialBackoffStrategy,
"HttpRequester": HttpRequester,
"InterpolatedPaginator": InterpolatedPaginator,
"JelloExtractor": JelloExtractor,
"ListStreamSlicer": ListStreamSlicer,
"MinMaxDatetime": MinMaxDatetime,
"NextPageUrlPaginator": NextPageUrlPaginator,
"OffsetPaginator": OffsetPaginator,
"TokenAuthenticator": TokenAuthenticator,
"RemoveFields": RemoveFields,
"TokenAuthenticator": TokenAuthenticator,
}
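
As a rough sketch (the definition below is hypothetical and bypasses the factory's interpolation and option handling), this registry is what lets a declarative definition reference a component by a short type name instead of a fully qualified class_name:

definition = {"type": "ConstantBackoffStrategy", "backoff_time_in_seconds": 10}
strategy_class = CLASS_TYPES_REGISTRY[definition.pop("type")]  # -> ConstantBackoffStrategy
strategy = strategy_class(**definition)                        # waits 10 seconds between retries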
@@ -6,17 +6,21 @@

from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.jello import JelloExtractor
from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler
from airbyte_cdk.sources.declarative.requesters.error_handlers.error_handler import ErrorHandler
from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import HttpResponseFilter
from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester
from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
from airbyte_cdk.sources.declarative.requesters.requester import Requester
from airbyte_cdk.sources.declarative.requesters.retriers.default_retrier import DefaultRetrier
from airbyte_cdk.sources.declarative.requesters.retriers.retrier import Retrier
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.schema.json_schema import JsonSchema
@@ -25,17 +29,22 @@
from airbyte_cdk.sources.declarative.states.state import State
from airbyte_cdk.sources.declarative.stream_slicers.single_slice import SingleSlice
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.streams.core import Stream

DEFAULT_IMPLEMENTATIONS_REGISTRY: Mapping[Type, Type] = {
Requester: HttpRequester,
Retriever: SimpleRetriever,
SchemaLoader: JsonSchema,
HttpSelector: RecordSelector,
ConnectionChecker: CheckStream,
Retrier: DefaultRetrier,
ErrorHandler: DefaultErrorHandler,
Decoder: JsonDecoder,
JelloExtractor: JelloExtractor,
State: DictState,
StreamSlicer: SingleSlice,
Paginator: NoPagination,
HttpResponseFilter: HttpResponseFilter,
Stream: DeclarativeStream,
MinMaxDatetime: MinMaxDatetime,
InterpolatedString: InterpolatedString,
}
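
Rough illustration of how this mapping is meant to be used (a plain lookup; the factory resolves the interface from constructor type hints, as shown in the factory changes below):

assert DEFAULT_IMPLEMENTATIONS_REGISTRY[ErrorHandler] is DefaultErrorHandler
assert DEFAULT_IMPLEMENTATIONS_REGISTRY[Paginator] is NoPagination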
@@ -27,7 +27,12 @@ def create_component(self, component_definition: Mapping[str, Any], config: Conf
:return: the object to create
"""
kwargs = copy.deepcopy(component_definition)
- class_name = kwargs.pop("class_name")
+ if "class_name" in kwargs:
+ class_name = kwargs.pop("class_name")
+ elif "type" in kwargs:
+ class_name = CLASS_TYPES_REGISTRY[kwargs.pop("type")]
+ else:
+ raise ValueError(f"Failed to create component because it has no class_name or type. Definition: {component_definition}")
return self.build(class_name, config, **kwargs)

def build(self, class_or_class_name: Union[str, Type], config, **kwargs):
@@ -92,7 +97,13 @@ def _create_subcomponent(self, key, definition, kwargs, config, parent_class):
for sub in definition
]
else:
- return definition
+ expected_type = self.get_default_type(key, parent_class)
+ if expected_type and not isinstance(definition, expected_type):
+ # call __init__(definition) if definition is not a dict and is not of the expected type
+ # for instance, to turn a string into an InterpolatedString
+ return expected_type(definition)
+ else:
+ return definition

@staticmethod
def is_object_definition_with_class_name(definition):
@@ -106,13 +117,16 @@ def is_object_definition_with_type(definition):
def get_default_type(parameter_name, parent_class):
type_hints = get_type_hints(parent_class.__init__)
interface = type_hints.get(parameter_name)
- origin = get_origin(interface)
- if origin == Union:
- # Handling Optional, which are implement as a Union[T, None]
- # the interface we're looking for being the first type argument
- args = get_args(interface)
- interface = args[0]

+ while True:
+ origin = get_origin(interface)
+ if origin:
+ # Unnest types until we reach the raw type
+ # List[T] -> T
+ # Optional[List[T]] -> T
+ args = get_args(interface)
+ interface = args[0]
+ else:
+ break
expected_type = DEFAULT_IMPLEMENTATIONS_REGISTRY.get(interface)
return expected_type
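
A small, self-contained example of the unnesting loop above (illustrative types only):

from typing import List, Optional, get_args, get_origin

interface = Optional[List[int]]
while True:
    origin = get_origin(interface)
    if origin:
        interface = get_args(interface)[0]  # Optional[List[int]] -> List[int] -> int
    else:
        break
assert interface is int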

@@ -93,7 +93,21 @@ def preprocess(self, value, evaluated_config, path):
elif isinstance(value, dict):
return self.preprocess_dict(value, evaluated_config, path)
elif type(value) == list:
- evaluated_list = [self.preprocess(v, evaluated_config, path) for v in value]
+ evaluated_list = [
+ # pass in elem's path instead of the list's path
+ self.preprocess(v, evaluated_config, self._get_path_for_list_item(path, index))
+ for index, v in enumerate(value)
+ ]
+ # Add the list's element to the evaluated config so they can be referenced
+ for index, elem in enumerate(evaluated_list):
+ evaluated_config[self._get_path_for_list_item(path, index)] = elem
return evaluated_list
else:
return value

+ def _get_path_for_list_item(self, path, index):
+ # An elem's path is {path_to_list}[{index}]
+ if len(path) > 1:
+ return path[:-1], f"{path[-1]}[{index}]"
+ else:
+ return (f"{path[-1]}[{index}]",)
@@ -0,0 +1,3 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
@@ -0,0 +1,3 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
@@ -0,0 +1,23 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Optional

import requests
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy


class ConstantBackoffStrategy(BackoffStrategy):
"""
Backoff strategy with a constant backoff interval
"""

def __init__(self, backoff_time_in_seconds: float):
"""
:param backoff_time_in_seconds: time to backoff before retrying a retryable request
"""
self._backoff_time_in_seconds = backoff_time_in_seconds

def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]:
return self._backoff_time_in_seconds
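
A hedged usage sketch (standalone, outside the factory): the strategy ignores the response entirely and always returns the configured interval.

import requests

strategy = ConstantBackoffStrategy(backoff_time_in_seconds=5)
assert strategy.backoff(requests.Response(), attempt_count=3) == 5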
@@ -0,0 +1,23 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Optional

import requests
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy


class ExponentialBackoffStrategy(BackoffStrategy):
"""
Backoff strategy with an exponential backoff interval
"""

def __init__(self, factor: float = 5):
"""
:param factor: multiplicative factor
"""
self._factor = factor

def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]:
return self._factor * 2**attempt_count
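
Worked example of the formula factor * 2**attempt_count with the default factor of 5 (a standalone sketch, not factory-driven):

import requests

strategy = ExponentialBackoffStrategy()
waits = [strategy.backoff(requests.Response(), attempt) for attempt in range(4)]
assert waits == [5, 10, 20, 40]  # attempt 0 -> 5s, 1 -> 10s, 2 -> 20s, 3 -> 40s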
@@ -0,0 +1,39 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import numbers
from re import Pattern
from typing import Optional

import requests


def get_numeric_value_from_header(response: requests.Response, header: str, regex: Optional[Pattern]) -> Optional[float]:
"""
Extract a header value from the response as a float
:param response: response the extract header value from
:param header: Header to extract
:param regex: optional regex to apply on the header to obtain the value
:return: header value as float if it's a number. None otherwise
"""
header_value = response.headers.get(header, None)
if not header_value:
return None
if isinstance(header_value, str):
if regex:
match = regex.match(header_value)
if match:
header_value = match.group()
return _as_float(header_value)
elif isinstance(header_value, numbers.Number):
return float(header_value)
else:
return None


def _as_float(s: str) -> Optional[float]:
try:
return float(s)
except ValueError:
return None
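
Illustrative calls (the header names below are hypothetical examples, not fixed by this helper):

import re
import requests

response = requests.Response()
response.headers["Retry-After"] = "30"
assert get_numeric_value_from_header(response, "Retry-After", None) == 30.0

response.headers["X-Wait"] = "120.5 seconds"
assert get_numeric_value_from_header(response, "X-Wait", re.compile(r"\d+\.?\d*")) == 120.5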
@@ -0,0 +1,28 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import re
from typing import Optional

import requests
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.header_helper import get_numeric_value_from_header
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy


class WaitTimeFromHeaderBackoffStrategy(BackoffStrategy):
"""
Extract wait time from http header
"""

def __init__(self, header: str, regex: Optional[str] = None):
"""
:param header: header to read wait time from
:param regex: optional regex to apply on the header to extract its value
"""
self._header = header
self._regex = re.compile(regex) if regex else None

def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]:
header_value = get_numeric_value_from_header(response, self._header, self._regex)
return header_value
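
A minimal usage sketch, assuming the server reports its wait time in seconds in a Retry-After header:

import requests

strategy = WaitTimeFromHeaderBackoffStrategy(header="Retry-After")
response = requests.Response()
response.headers["Retry-After"] = "60"
assert strategy.backoff(response, attempt_count=1) == 60.0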
@@ -0,0 +1,45 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import numbers
import re
import time
from typing import Optional

import requests
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.header_helper import get_numeric_value_from_header
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy


class WaitUntilTimeFromHeaderBackoffStrategy(BackoffStrategy):
"""
Extract time at which we can retry the request from response header
and wait for the difference between now and that time
"""

def __init__(self, header: str, min_wait: Optional[float] = None, regex: Optional[str] = None):
"""
:param header: header to read wait time from
:param min_wait: minimum time to wait for safety
:param regex: optional regex to apply on the header to extract its value
"""
self._header = header
self._min_wait = min_wait
self._regex = re.compile(regex) if regex else None

def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]:
now = time.time()
wait_until = get_numeric_value_from_header(response, self._header, self._regex)
if wait_until is None or not wait_until:
return self._min_wait
if (isinstance(wait_until, str) and wait_until.isnumeric()) or isinstance(wait_until, numbers.Number):
wait_time = float(wait_until) - now
else:
return self._min_wait
if self._min_wait:
return max(wait_time, self._min_wait)
elif wait_time < 0:
return None
return wait_time
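
A minimal usage sketch, assuming a hypothetical X-RateLimit-Reset header carrying an epoch timestamp:

import time
import requests

strategy = WaitUntilTimeFromHeaderBackoffStrategy(header="X-RateLimit-Reset", min_wait=5)
response = requests.Response()
response.headers["X-RateLimit-Reset"] = str(int(time.time()) + 120)
wait = strategy.backoff(response, attempt_count=1)  # roughly 120 seconds, never below the 5-second floor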
@@ -0,0 +1,19 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from abc import abstractmethod
from typing import Optional

import requests


class BackoffStrategy:
@abstractmethod
def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]:
"""
Return time to wait before retrying the request.
:param response: response received for the request to retry
:param attempt_count: number of attempts to submit the request
:return: time to wait in seconds
"""