
Commit

don't update cursor for log messages and default schema path coming from connector builder (#19271)

* don't update cursor for log messages and default schema path coming from connector builder

* replace check for connector_builder module with a basic default file path

* update changelog and patch version

* catch the correct exception when pkgutil can't load json file
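
The cursor fix described above boils down to one guard: only dict-like records should advance the stream slicer's cursor, while request/response log messages are passed through untouched. Below is a minimal, self-contained sketch of that pattern; `ToyCursor`, `LogMessage`, and `read` are hypothetical stand-ins for illustration only, not CDK classes.

```python
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any, Iterable, Optional


@dataclass
class ToyCursor:
    """Stand-in for a stream slicer's cursor state (hypothetical, illustration only)."""

    last_record: Optional[Mapping[str, Any]] = None

    def update(self, record: Mapping[str, Any]) -> None:
        self.last_record = record


@dataclass
class LogMessage:
    """Stand-in for AirbyteLogMessage; notably, it is not a Mapping."""

    message: str


def read(items: Iterable[Any], cursor: ToyCursor) -> Iterable[Any]:
    # Only dict-like records advance the cursor; log messages are yielded as-is.
    for item in items:
        if isinstance(item, Mapping):
            cursor.update(item)
        yield item


cursor = ToyCursor()
out = list(read([LogMessage("request:{}"), {"id": 1}, LogMessage("response:{}"), {"id": 2}], cursor))
assert cursor.last_record == {"id": 2}  # the cursor only ever saw real records
assert len(out) == 4  # but every message was still emitted downstream
```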
brianjlai authored Nov 11, 2022
1 parent d2d5419 commit 60c008f
Showing 6 changed files with 89 additions and 14 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
@@ -1,5 +1,8 @@
 # Changelog
 
+## 0.8.1
+Low-code: Don't update cursor for non-record messages and fix default loader for connector builder manifests
+
 ## 0.8.0
 Low-code: Allow for request and response to be emitted as log messages
 
@@ -371,11 +371,14 @@ def read_records(
             stream_state,
         )
         for record in records_generator:
-            self.stream_slicer.update_cursor(stream_slice, last_record=record)
+            # Only record messages should be parsed to update the cursor which is indicated by the Mapping type
+            if isinstance(record, Mapping):
+                self.stream_slicer.update_cursor(stream_slice, last_record=record)
             yield record
         else:
             last_record = self._last_records[-1] if self._last_records else None
-            self.stream_slicer.update_cursor(stream_slice, last_record=last_record)
+            if last_record and isinstance(last_record, Mapping):
+                self.stream_slicer.update_cursor(stream_slice, last_record=last_record)
             yield from []
 
     def stream_slices(
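
The `isinstance(record, Mapping)` check above works because parsed records are plain dicts (which register as `collections.abc.Mapping`), while the emitted request/response entries are `AirbyteLogMessage` models, which are not. A quick sanity check, assuming only that `airbyte_cdk` is installed as in the tests further down:

```python
from collections.abc import Mapping

from airbyte_cdk.models import AirbyteLogMessage, Level

record = {"id": 1}
log = AirbyteLogMessage(level=Level.INFO, message="request:{}")

assert isinstance(record, Mapping)   # dict records update the cursor
assert not isinstance(log, Mapping)  # log messages are skipped
```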
@@ -38,7 +38,7 @@ def get_json_schema(self) -> Mapping[str, Any]:
 
         try:
             return self.default_loader.get_json_schema()
-        except FileNotFoundError:
+        except OSError:
             # A slight hack since we don't directly have the stream name. However, when building the default filepath we assume the
             # runtime options stores stream name 'name' so we'll do the same here
             stream_name = self._options.get("name", "")
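
`FileNotFoundError` is a subclass of `OSError`, so the widened except clause still covers a missing schema file while also catching loaders that raise the base `OSError` when `pkgutil` can't read the resource (for example, when a connector runs from a zip archive). A rough sketch of the fallback pattern follows; the package and stream names are hypothetical, and this is not the CDK's `DefaultSchemaLoader`, just the shape of the handling:

```python
import json
import pkgutil
from typing import Any, Mapping


def load_schema_or_default(package: str, resource: str) -> Mapping[str, Any]:
    """Try to read a bundled JSON schema; fall back to an empty schema if it can't be loaded."""
    try:
        raw = pkgutil.get_data(package, resource)  # may raise OSError (incl. FileNotFoundError)
        if raw is None:
            return {}
        return json.loads(raw)
    except OSError:
        # FileNotFoundError is an OSError, but zip/frozen loaders may raise the base class instead.
        return {}


# Hypothetical usage: a stream named "rates" in a package named "source_example"
schema = load_schema_or_default("source_example", "schemas/rates.json")
```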
@@ -15,15 +15,18 @@
 
 
 def _default_file_path() -> str:
-    # schema files are always in "source_<connector_name>/schemas/<stream_name>.json
-    # the connector's module name can be inferred by looking at the modules loaded and look for the one starting with source_
+    # Schema files are always in "source_<connector_name>/schemas/<stream_name>.json
+    # The connector's module name can be inferred by looking at the modules loaded and look for the one starting with source_
     source_modules = [
-        k for k, v in sys.modules.items() if "source_" in k  # example: ['source_exchange_rates', 'source_exchange_rates.source']
-    ]
-    if not source_modules:
-        raise RuntimeError("Expected at least one module starting with 'source_'")
-    module = source_modules[0].split(".")[0]
-    return f"./{module}/schemas/{{{{options['name']}}}}.json"
+        k for k, v in sys.modules.items() if "source_" in k
+    ]  # example: ['source_exchange_rates', 'source_exchange_rates.source']
+    if source_modules:
+        module = source_modules[0].split(".")[0]
+        return f"./{module}/schemas/{{{{options['name']}}}}.json"
+
+    # If we are not in a source_ module, the most likely scenario is we're processing a manifest from the connector builder
+    # server which does not require a json schema to be defined.
+    return "./{{options['name']}}.json"
 
 
 @dataclass
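
Note the quadruple braces in the f-string above: they escape to literal double braces, so `_default_file_path` returns an interpolation template containing `{{options['name']}}` rather than a finished path; the stream name is filled in later by the CDK's string interpolation. A quick check of just that formatting behavior in plain Python:

```python
module = "source_exchange_rates"

templated = f"./{module}/schemas/{{{{options['name']}}}}.json"
assert templated == "./source_exchange_rates/schemas/{{options['name']}}.json"

# With no source_* module loaded (e.g. a connector builder manifest), only the
# stream-name placeholder remains, to be resolved later at runtime.
fallback = "./{{options['name']}}.json"
assert "{{options['name']}}" in fallback
```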
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
@@ -15,7 +15,7 @@
 
 setup(
     name="airbyte-cdk",
-    version="0.8.0",
+    version="0.8.1",
     description="A framework for writing Airbyte Connectors.",
     long_description=README,
     long_description_content_type="text/markdown",
@@ -7,18 +7,23 @@
 import airbyte_cdk.sources.declarative.requesters.error_handlers.response_status as response_status
 import pytest
 import requests
-from airbyte_cdk.models import SyncMode
+from airbyte_cdk.models import AirbyteLogMessage, Level, SyncMode
 from airbyte_cdk.sources.declarative.exceptions import ReadException
 from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction
 from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus
 from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType
 from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
 from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
+from airbyte_cdk.sources.declarative.stream_slicers import DatetimeStreamSlicer
 from airbyte_cdk.sources.streams.http.auth import NoAuth
 from airbyte_cdk.sources.streams.http.http import HttpStream
 
 primary_key = "pk"
 records = [{"id": 1}, {"id": 2}]
+request_response_logs = [
+    AirbyteLogMessage(level=Level.INFO, message="request:{}"),
+    AirbyteLogMessage(level=Level.INFO, message="response{}"),
+]
 config = {}
 
 
@@ -90,7 +95,7 @@ def test_simple_retriever_full(mock_http_stream):
 
     assert retriever._last_response is None
     assert retriever._last_records is None
-    assert retriever.parse_response(response, stream_state=None) == records
+    assert retriever.parse_response(response, stream_state={}) == records
     assert retriever._last_response == response
     assert retriever._last_records == records
 
@@ -107,6 +112,67 @@ def test_simple_retriever_full(mock_http_stream):
     paginator.reset.assert_called()
 
 
+@patch.object(HttpStream, "_read_pages", return_value=[*request_response_logs, *records])
+def test_simple_retriever_with_request_response_logs(mock_http_stream):
+    requester = MagicMock()
+    paginator = MagicMock()
+    record_selector = MagicMock()
+    iterator = DatetimeStreamSlicer(
+        start_datetime="", end_datetime="", step="1d", cursor_field="id", datetime_format="", config={}, options={}
+    )
+
+    retriever = SimpleRetriever(
+        name="stream_name",
+        primary_key=primary_key,
+        requester=requester,
+        paginator=paginator,
+        record_selector=record_selector,
+        stream_slicer=iterator,
+        options={},
+        config={},
+    )
+
+    actual_messages = [r for r in retriever.read_records(SyncMode.full_refresh)]
+    paginator.reset.assert_called()
+
+    assert isinstance(actual_messages[0], AirbyteLogMessage)
+    assert isinstance(actual_messages[1], AirbyteLogMessage)
+    assert actual_messages[2] == records[0]
+    assert actual_messages[3] == records[1]
+
+
+@patch.object(HttpStream, "_read_pages", return_value=[])
+def test_simple_retriever_with_request_response_log_last_records(mock_http_stream):
+    requester = MagicMock()
+    paginator = MagicMock()
+    record_selector = MagicMock()
+    record_selector.select_records.return_value = request_response_logs
+    response = requests.Response()
+    iterator = DatetimeStreamSlicer(
+        start_datetime="", end_datetime="", step="1d", cursor_field="id", datetime_format="", config={}, options={}
+    )
+
+    retriever = SimpleRetriever(
+        name="stream_name",
+        primary_key=primary_key,
+        requester=requester,
+        paginator=paginator,
+        record_selector=record_selector,
+        stream_slicer=iterator,
+        options={},
+        config={},
+    )
+
+    assert retriever._last_response is None
+    assert retriever._last_records is None
+    assert retriever.parse_response(response, stream_state={}) == request_response_logs
+    assert retriever._last_response == response
+    assert retriever._last_records == request_response_logs
+
+    [r for r in retriever.read_records(SyncMode.full_refresh)]
+    paginator.reset.assert_called()
+
+
 @pytest.mark.parametrize(
     "test_name, requester_response, expected_should_retry, expected_backoff_time",
     [
