From a9c0f98235f72ddf43116cf3d6bd928fef1d7fe5 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 9 Nov 2022 13:26:50 -0800 Subject: [PATCH] SimpleRetriever yield request and response as log messages (#18644) * method yielding airbytemessage * move to Stream * update abstract source * reset * missing file * Yield request and response as log messages * only emit request and responses if the debug flag is on * add test docker image * script to run acceptance tests with local cdk * Update conftest to use a different image * extract to method * dont use a different image tag * Always install local cdk * break the cdk * get path from current working directory * or * ignore unit test * debug log * Revert "AMI change: ami-0f23be2f917510c26 -> ami-005924fb76f7477ce (#18689)" This reverts commit bf06decf73381a799056e0ca5c5511a311dfcb83. * build from the top * Update source-acceptance-test * fix * copy setup * some work on the gradle plugin * reset to master * delete unused file * delete unused file * reset to master * optional argument * delete dead code * use latest cdk with sendgrid * fix sendgrid dockerfile * break the cdk * use local file * Revert "break the cdk" This reverts commit 600c19554195c242e22e065dcfe27c6c4f205c97. * dont raise an exception * reset to master * unit tests * missing test * more unit tests * remove deprecated comment * newline * reset to master * remove files * reset * Update abstract source * remove method from stream * convert to airbytemessage * unittests * Update * unit test * remove debug logs * Revert "remove debug logs" This reverts commit a1a139ef37759b552501b57f27dda66b0cac3f3a. * Revert "Revert "remove debug logs"" This reverts commit b1d62cdb60f7733952a39fabbe67900661a3c5db. * Revert "reset to master" This reverts commit 3fa6a004c1df0bac1ccbd414bd93b32c0b40ab16. * fix * slightly better test * typing * extract method * Revert "Revert "reset to master"" This reverts commit 5dac7c280497027cf24f87a3ab9f7bf279b53aa6. * reset to master * reset to master * Revert "reset to master" This reverts commit 3fa6a004c1df0bac1ccbd414bd93b32c0b40ab16. * Comment * operate on the message * Revert "Revert "reset to master"" This reverts commit 5833c84d0ab00808f269c5d14b625eb9f772cd36. * comment * test * Revert "test" This reverts commit 2f91b803b058b32675fb9bff89b5bad1c2434d4c. * test * Revert "test" This reverts commit 62d95ebbb5fba47a7b5aa57202067c02655c8c58. * test * Revert "test" This reverts commit 27150ba3417fed14f2f75e8328f2a34f7ec88da6. * format * format * symlink * Update setup * update path * reset to master * update * Add local files * greenhouse * format * symlink * try reordering * better error message * better log message * reset to master * Revert "merge for qa" This reverts commit ad7128f2c54df5aeb9a90a9989b499ae2aa90b44, reversing changes made to 7196c22a731da4af0c54d4aca0b225712b5ef8f6. * reset to master * reset to master * reset to master * format * gradlew format * right type hints * reset to master * reset to master * gradlew format * a bunch of small fixes * Update output format * fixes from feedback * fixme comment * streams cannot return AirbyteRecordMessage * fix * format * only return logs when running on debug mode * move branching * update typing * remove dead code * fix simpleretriever name * i think this is better * log response.text * debug flag * comment * pass config * comments * run SATs * fix most of the unit tests * fix unit test * reset to master * runFromPath * Revert "runFromPath" This reverts commit 85979a801a6ced67dd0354e69a54e66a78fd2396. * Revert "run SATs" This reverts commit a8a8a2da95f3b5bb4cbd69118d9f163d4b0b31d9. * no need to convert to dict * fix test --- .../airbyte_cdk/sources/abstract_source.py | 18 ++++-- .../manifest_declarative_source.py | 38 ++++++++++-- .../declarative/retrievers/retriever.py | 5 +- .../retrievers/simple_retriever.py | 58 +++++++++++++++---- .../stream_slicers/substream_slicer.py | 8 ++- .../declarative/yaml_declarative_source.py | 4 +- .../airbyte_cdk/sources/streams/core.py | 4 +- .../airbyte_cdk/sources/streams/http/http.py | 49 +++++++++++----- .../sources/utils/record_helper.py | 4 +- .../retrievers/test_simple_retriever.py | 24 ++++++-- .../declarative/test_declarative_stream.py | 21 +++++-- .../sources/utils/test_record_helper.py | 8 --- .../integration_tests/configured_catalog.json | 31 +--------- 13 files changed, 178 insertions(+), 94 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index 842757997f24..389ffcc19d66 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -20,6 +20,7 @@ from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.source import Source from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.core import StreamData from airbyte_cdk.sources.streams.http.http import HttpStream from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config @@ -240,9 +241,7 @@ def _read_incremental( ) record_counter = 0 for message_counter, record_data_or_message in enumerate(records, start=1): - message = stream_data_to_airbyte_message( - stream_name, record_data_or_message, stream_instance.transformer, stream_instance.get_json_schema() - ) + message = self._get_message(record_data_or_message, stream_instance) yield message if message.type == MessageType.RECORD: record = message.record @@ -287,9 +286,7 @@ def _read_full_refresh( cursor_field=configured_stream.cursor_field, ) for record_data_or_message in record_data_or_messages: - message = stream_data_to_airbyte_message( - stream_instance.name, record_data_or_message, stream_instance.transformer, stream_instance.get_json_schema() - ) + message = self._get_message(record_data_or_message, stream_instance) yield message if message.type == MessageType.RECORD: total_records_counter += 1 @@ -315,3 +312,12 @@ def _apply_log_level_to_stream_logger(logger: logging.Logger, stream_instance: S """ if hasattr(logger, "level"): stream_instance.logger.setLevel(logger.level) + + def _get_message(self, record_data_or_message: Union[StreamData, AirbyteMessage], stream: Stream): + """ + Converts the input to an AirbyteMessage if it is a StreamData. Returns the input as is if it is already an AirbyteMessage + """ + if isinstance(record_data_or_message, AirbyteMessage): + return record_data_or_message + else: + return stream_data_to_airbyte_message(stream.name, record_data_or_message, stream.transformer, stream.get_json_schema()) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py index a6a3db4a5f01..f44342d14ffe 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -8,9 +8,15 @@ import typing from dataclasses import dataclass, fields from enum import Enum, EnumMeta -from typing import Any, List, Mapping, Union - -from airbyte_cdk.models import ConnectorSpecification +from typing import Any, Iterator, List, Mapping, MutableMapping, Union + +from airbyte_cdk.models import ( + AirbyteConnectionStatus, + AirbyteMessage, + AirbyteStateMessage, + ConfiguredAirbyteCatalog, + ConnectorSpecification, +) from airbyte_cdk.sources.declarative.checks import CheckStream from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource @@ -35,12 +41,14 @@ class ManifestDeclarativeSource(DeclarativeSource): VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "spec", "streams", "version"} - def __init__(self, source_config: ConnectionDefinition): + def __init__(self, source_config: ConnectionDefinition, debug: bool = False): """ :param source_config(Mapping[str, Any]): The manifest of low-code components that describe the source connector + :param debug(bool): True if debug mode is enabled """ self.logger = logging.getLogger(f"airbyte.{self.name}") self._source_config = source_config + self._debug = debug self._factory = DeclarativeComponentFactory() self._validate_source() @@ -73,6 +81,7 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification: will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json" in the project root. """ + self._configure_logger_level(logger) self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}) spec = self._source_config.get("spec") @@ -84,6 +93,27 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification: else: return super().spec(logger) + def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: + self._configure_logger_level(logger) + return super().check(logger, config) + + def read( + self, + logger: logging.Logger, + config: Mapping[str, Any], + catalog: ConfiguredAirbyteCatalog, + state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]] = None, + ) -> Iterator[AirbyteMessage]: + self._configure_logger_level(logger) + yield from super().read(logger, config, catalog, state) + + def _configure_logger_level(self, logger: logging.Logger): + """ + Set the log level to logging.DEBUG if debug mode is enabled + """ + if self._debug: + logger.setLevel(logging.DEBUG) + def _validate_source(self): full_config = {} if "version" in self._source_config: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/retriever.py index 45252050a6ec..99fac94af4e2 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/retriever.py @@ -7,7 +7,8 @@ from typing import Iterable, List, Optional from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState +from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState +from airbyte_cdk.sources.streams.core import StreamData from dataclasses_jsonschema import JsonSchemaMixin @@ -24,7 +25,7 @@ def read_records( cursor_field: Optional[List[str]] = None, stream_slice: Optional[StreamSlice] = None, stream_state: Optional[StreamState] = None, - ) -> Iterable[Record]: + ) -> Iterable[StreamData]: """ Fetch a stream's records from an HTTP API source diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 27fcd98655ee..e35820ed9d32 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -2,13 +2,17 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +import json +import logging from dataclasses import InitVar, dataclass, field from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union import requests -from airbyte_cdk.models import SyncMode +from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, SyncMode +from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.declarative.exceptions import ReadException from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector +from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator @@ -16,8 +20,10 @@ from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever from airbyte_cdk.sources.declarative.stream_slicers.single_slice import SingleSlice from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer -from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState +from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState +from airbyte_cdk.sources.streams.core import StreamData from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets from dataclasses_jsonschema import JsonSchemaMixin @@ -46,9 +52,10 @@ class SimpleRetriever(Retriever, HttpStream, JsonSchemaMixin): requester: Requester record_selector: HttpSelector + config: Config options: InitVar[Mapping[str, Any]] name: str - _name: str = field(init=False, repr=False, default="") + _name: Union[InterpolatedString, str] = field(init=False, repr=False, default="") primary_key: Optional[Union[str, List[str], List[List[str]]]] _primary_key: str = field(init=False, repr=False, default="") paginator: Optional[Paginator] = None @@ -59,13 +66,15 @@ def __post_init__(self, options: Mapping[str, Any]): HttpStream.__init__(self, self.requester.get_authenticator()) self._last_response = None self._last_records = None + self._options = options + self.name = InterpolatedString(self._name, options=options) @property def name(self) -> str: """ :return: Stream name """ - return self._name + return self._name.eval(self.config) @name.setter def name(self, value: str) -> None: @@ -347,17 +356,23 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, def read_records( self, sync_mode: SyncMode, - cursor_field: List[str] = None, + cursor_field: Optional[List[str]] = None, stream_slice: Optional[StreamSlice] = None, stream_state: Optional[StreamState] = None, - ) -> Iterable[Mapping[str, Any]]: + ) -> Iterable[StreamData]: # Warning: use self.state instead of the stream_state passed as argument! stream_slice = stream_slice or {} # None-check self.paginator.reset() - records_generator = HttpStream.read_records(self, sync_mode, cursor_field, stream_slice, self.state) - for r in records_generator: - self.stream_slicer.update_cursor(stream_slice, last_record=r) - yield r + records_generator = self._read_pages( + lambda req, res, state, _slice: self.parse_records_and_emit_request_and_responses( + req, res, stream_slice=_slice, stream_state=state + ), + stream_slice, + stream_state, + ) + for record in records_generator: + self.stream_slicer.update_cursor(stream_slice, last_record=record) + yield record else: last_record = self._last_records[-1] if self._last_records else None self.stream_slicer.update_cursor(stream_slice, last_record=last_record) @@ -385,3 +400,26 @@ def state(self) -> MutableMapping[str, Any]: def state(self, value: StreamState): """State setter, accept state serialized by state getter.""" self.stream_slicer.update_cursor(value) + + def parse_records_and_emit_request_and_responses(self, request, response, stream_slice, stream_state) -> Iterable[StreamData]: + # Only emit requests and responses when running in debug mode + if self.logger.isEnabledFor(logging.DEBUG): + yield self._create_trace_message_from_request(request) + yield self._create_trace_message_from_response(response) + # Not great to need to call _read_pages which is a private method + # A better approach would be to extract the HTTP client from the HttpStream and call it directly from the HttpRequester + yield from self._read_pages( + lambda req, res, state, _slice: self.parse_response(res, stream_slice=_slice, stream_state=state), stream_slice, stream_state + ) + + def _create_trace_message_from_request(self, request: requests.PreparedRequest): + # FIXME: this should return some sort of trace message + request_dict = {"url": request.url, "headers": dict(request.headers), "body": request.body} + log_message = filter_secrets(f"request:{json.dumps(request_dict)}") + return AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=log_message)) + + def _create_trace_message_from_response(self, response: requests.Response): + # FIXME: this should return some sort of trace message + response_dict = {"body": response.text, "headers": dict(response.headers), "status_code": response.status_code} + log_message = filter_secrets(f"response:{json.dumps(response_dict)}") + return AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=log_message)) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py index dbe46d924269..081b8d067433 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py @@ -5,7 +5,7 @@ from dataclasses import InitVar, dataclass from typing import Any, Iterable, List, Mapping, Optional -from airbyte_cdk.models import SyncMode +from airbyte_cdk.models import AirbyteMessage, SyncMode, Type from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState @@ -138,6 +138,12 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: StreamState) -> Itera for parent_record in parent_stream.read_records( sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=parent_stream_slice, stream_state=None ): + # Skip non-records (eg AirbyteLogMessage) + if isinstance(parent_record, AirbyteMessage): + if parent_record.type == Type.RECORD: + parent_record = parent_record.record.data + else: + continue empty_parent_slice = False stream_state_value = parent_record.get(parent_field) yield {stream_state_field: stream_state_value, "parent_slice": parent_slice} diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py index 08ee0e7ebebf..1907b8284b76 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py @@ -12,13 +12,13 @@ class YamlDeclarativeSource(ManifestDeclarativeSource): """Declarative source defined by a yaml file""" - def __init__(self, path_to_yaml): + def __init__(self, path_to_yaml, debug: bool = False): """ :param path_to_yaml: Path to the yaml file describing the source """ self._path_to_yaml = path_to_yaml source_config = self._read_and_parse_yaml_file(path_to_yaml) - super().__init__(source_config) + super().__init__(source_config, debug) def _read_and_parse_yaml_file(self, path_to_yaml_file) -> ConnectionDefinition: package = self.__class__.__module__.split(".")[0] diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py index 2c34c336ebbb..d39c706eb9aa 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py @@ -10,7 +10,7 @@ from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union import airbyte_cdk.sources.utils.casing as casing -from airbyte_cdk.models import AirbyteLogMessage, AirbyteRecordMessage, AirbyteStream, AirbyteTraceMessage, SyncMode +from airbyte_cdk.models import AirbyteLogMessage, AirbyteStream, AirbyteTraceMessage, SyncMode # list of all possible HTTP methods which can be used for sending of request bodies from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader @@ -22,7 +22,7 @@ # AirbyteRecordMessage: An AirbyteRecordMessage # AirbyteLogMessage: A log message # AirbyteTraceMessage: A trace message -StreamData = Union[Mapping[str, Any], AirbyteRecordMessage, AirbyteLogMessage, AirbyteTraceMessage] +StreamData = Union[Mapping[str, Any], AirbyteLogMessage, AirbyteTraceMessage] def package_name_from_class(cls: object) -> str: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py index a0faa4610899..28ea3f506606 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py @@ -7,13 +7,13 @@ import os from abc import ABC, abstractmethod from contextlib import suppress -from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union +from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union from urllib.parse import urljoin import requests import requests_cache from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.streams.core import Stream +from airbyte_cdk.sources.streams.core import Stream, StreamData from requests.auth import AuthBase from requests_cache.session import CachedSession @@ -408,24 +408,25 @@ def read_records( cursor_field: List[str] = None, stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None, - ) -> Iterable[Mapping[str, Any]]: + ) -> Iterable[StreamData]: + yield from self._read_pages( + lambda req, res, state, _slice: self.parse_response(res, stream_slice=_slice, stream_state=state), stream_slice, stream_state + ) + + def _read_pages( + self, + records_generator_fn: Callable[ + [requests.PreparedRequest, requests.Response, Mapping[str, Any], Mapping[str, Any]], Iterable[StreamData] + ], + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[StreamData]: stream_state = stream_state or {} pagination_complete = False - next_page_token = None while not pagination_complete: - request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) - request = self._create_prepared_request( - path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), - headers=dict(request_headers, **self.authenticator.get_auth_header()), - params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), - json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), - data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), - ) - request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) - - response = self._send_request(request, request_kwargs) - yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice) + request, response = self._fetch_next_page(stream_slice, stream_state, next_page_token) + yield from records_generator_fn(request, response, stream_state, stream_slice) next_page_token = self.next_page_token(response) if not next_page_token: @@ -434,6 +435,22 @@ def read_records( # Always return an empty generator just in case no records were ever yielded yield from [] + def _fetch_next_page( + self, stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> Tuple[requests.PreparedRequest, requests.Response]: + request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) + request = self._create_prepared_request( + path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), + headers=dict(request_headers, **self.authenticator.get_auth_header()), + params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), + json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), + data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), + ) + request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) + + response = self._send_request(request, request_kwargs) + return request, response + class HttpSubStream(HttpStream, ABC): def __init__(self, parent: HttpStream, **kwargs): diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/record_helper.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/record_helper.py index e3c3223bd66a..482596230f04 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/utils/record_helper.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/utils/record_helper.py @@ -30,11 +30,9 @@ def stream_data_to_airbyte_message( transformer.transform(data, schema) # type: ignore message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis) return AirbyteMessage(type=MessageType.RECORD, record=message) - elif isinstance(data_or_message, AirbyteRecordMessage): - return AirbyteMessage(type=MessageType.RECORD, record=data_or_message) elif isinstance(data_or_message, AirbyteTraceMessage): return AirbyteMessage(type=MessageType.TRACE, trace=data_or_message) elif isinstance(data_or_message, AirbyteLogMessage): return AirbyteMessage(type=MessageType.LOG, log=data_or_message) else: - raise ValueError(f"Unexpected type for data_or_message: {type(data_or_message)}") + raise ValueError(f"Unexpected type for data_or_message: {type(data_or_message)}: {data_or_message}") diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index ba801f8bcdd5..c9dba84fb101 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -22,7 +22,7 @@ config = {} -@patch.object(HttpStream, "read_records", return_value=[]) +@patch.object(HttpStream, "_read_pages", return_value=[]) def test_simple_retriever_full(mock_http_stream): requester = MagicMock() request_params = {"param": "value"} @@ -45,7 +45,7 @@ def test_simple_retriever_full(mock_http_stream): underlying_state = {"date": "2021-01-01"} iterator.get_stream_state.return_value = underlying_state - requester.get_authenticator.return_value = NoAuth + requester.get_authenticator.return_value = NoAuth() url_base = "https://airbyte.io" requester.get_url_base.return_value = url_base path = "/v1" @@ -77,6 +77,7 @@ def test_simple_retriever_full(mock_http_stream): record_selector=record_selector, stream_slicer=iterator, options={}, + config={}, ) assert retriever.primary_key == primary_key @@ -116,7 +117,9 @@ def test_simple_retriever_full(mock_http_stream): ) def test_should_retry(test_name, requester_response, expected_should_retry, expected_backoff_time): requester = MagicMock(use_cache=False) - retriever = SimpleRetriever(name="stream_name", primary_key=primary_key, requester=requester, record_selector=MagicMock(), options={}) + retriever = SimpleRetriever( + name="stream_name", primary_key=primary_key, requester=requester, record_selector=MagicMock(), options={}, config={} + ) requester.interpret_response_status.return_value = requester_response assert retriever.should_retry(requests.Response()) == expected_should_retry if requester_response.action == ResponseAction.RETRY: @@ -149,7 +152,7 @@ def test_parse_response(test_name, status_code, response_status, len_expected_re record_selector = MagicMock() record_selector.select_records.return_value = [{"id": 100}] retriever = SimpleRetriever( - name="stream_name", primary_key=primary_key, requester=requester, record_selector=record_selector, options={} + name="stream_name", primary_key=primary_key, requester=requester, record_selector=record_selector, options={}, config={} ) response = requests.Response() response.status_code = status_code @@ -180,7 +183,7 @@ def test_backoff_time(test_name, response_action, retry_in, expected_backoff_tim record_selector.select_records.return_value = [{"id": 100}] response = requests.Response() retriever = SimpleRetriever( - name="stream_name", primary_key=primary_key, requester=requester, record_selector=record_selector, options={} + name="stream_name", primary_key=primary_key, requester=requester, record_selector=record_selector, options={}, config={} ) if expected_backoff_time: requester.interpret_response_status.return_value = ResponseStatus(response_action, retry_in) @@ -232,6 +235,7 @@ def test_get_request_options_from_pagination(test_name, paginator_mapping, strea paginator=paginator, stream_slicer=stream_slicer, options={}, + config={}, ) request_option_type_to_method = { @@ -271,7 +275,13 @@ def test_get_request_headers(test_name, paginator_mapping, expected_mapping): record_selector = MagicMock() retriever = SimpleRetriever( - name="stream_name", primary_key=primary_key, requester=requester, record_selector=record_selector, paginator=paginator, options={} + name="stream_name", + primary_key=primary_key, + requester=requester, + record_selector=record_selector, + paginator=paginator, + options={}, + config={}, ) request_option_type_to_method = { @@ -315,6 +325,7 @@ def test_request_body_data(test_name, requester_body_data, paginator_body_data, record_selector=record_selector, paginator=paginator, options={}, + config={}, ) if expected_body_data: @@ -350,6 +361,7 @@ def test_path(test_name, requester_path, paginator_path, expected_path): record_selector=record_selector, paginator=paginator, options={}, + config={}, ) actual_path = retriever.path(stream_state=None, stream_slice=None, next_page_token=None) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_declarative_stream.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_declarative_stream.py index 98f5a4563504..7ff319b9ec74 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_declarative_stream.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_declarative_stream.py @@ -5,12 +5,10 @@ from unittest import mock from unittest.mock import MagicMock, call -from airbyte_cdk.models import SyncMode +from airbyte_cdk.models import AirbyteLogMessage, AirbyteTraceMessage, Level, SyncMode, TraceType from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.transformations import RecordTransformation -from .schema.source_test import SourceTest # noqa #pylint: disable=unused-import - def test_declarative_stream(): name = "stream" @@ -22,8 +20,17 @@ def test_declarative_stream(): schema_loader.get_json_schema.return_value = json_schema state = MagicMock() - records = [{"pk": 1234, "field": "value"}, {"pk": 4567, "field": "different_value"}] - stream_slices = [{"date": "2021-01-01"}, {"date": "2021-01-02"}, {"date": "2021-01-03"}] + records = [ + {"pk": 1234, "field": "value"}, + {"pk": 4567, "field": "different_value"}, + AirbyteLogMessage(level=Level.INFO, message="This is a log message"), + AirbyteTraceMessage(type=TraceType.ERROR, emitted_at=12345), + ] + stream_slices = [ + {"date": "2021-01-01"}, + {"date": "2021-01-02"}, + {"date": "2021-01-03"}, + ] checkpoint_interval = 1000 retriever = MagicMock() @@ -60,5 +67,7 @@ def test_declarative_stream(): assert stream.state_checkpoint_interval == checkpoint_interval for transformation in transformations: assert len(transformation.transform.call_args_list) == len(records) - expected_calls = [call(record, config=config, stream_slice=input_slice, stream_state=state) for record in records] + expected_calls = [ + call(record, config=config, stream_slice=input_slice, stream_state=state) for record in records if isinstance(record, dict) + ] transformation.transform.assert_has_calls(expected_calls, any_order=False) diff --git a/airbyte-cdk/python/unit_tests/sources/utils/test_record_helper.py b/airbyte-cdk/python/unit_tests/sources/utils/test_record_helper.py index ffb3248333da..e1db000716f6 100644 --- a/airbyte-cdk/python/unit_tests/sources/utils/test_record_helper.py +++ b/airbyte-cdk/python/unit_tests/sources/utils/test_record_helper.py @@ -33,14 +33,6 @@ record=AirbyteRecordMessage(stream="my_stream", data={"id": 0, "field_A": 1.0, "field_B": "airbyte"}, emitted_at=NOW), ), ), - ( - "test_record_to_airbyte_record", - AirbyteRecordMessage(stream="my_stream", data={"id": 0, "field_A": 1.0, "field_B": "airbyte"}, emitted_at=NOW), - AirbyteMessage( - type=MessageType.RECORD, - record=AirbyteRecordMessage(stream="my_stream", data={"id": 0, "field_A": 1.0, "field_B": "airbyte"}, emitted_at=NOW), - ), - ), ], ) def test_data_or_record_to_airbyte_record(test_name, data, expected_message): diff --git a/airbyte-integrations/connectors/source-courier/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-courier/integration_tests/configured_catalog.json index 192e8acc5265..2f5330127546 100644 --- a/airbyte-integrations/connectors/source-courier/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-courier/integration_tests/configured_catalog.json @@ -4,34 +4,9 @@ "stream": { "name": "messages", "json_schema": {}, - "supported_sync_modes": ["full_refresh"] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "message_info", - "json_schema": {}, - "supported_sync_modes": ["full_refresh"] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "message_history", - "json_schema": {}, - "supported_sync_modes": ["full_refresh"] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "message_output", - "json_schema": {}, - "supported_sync_modes": ["full_refresh"] + "supported_sync_modes": [ + "full_refresh" + ] }, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"