Skip to content

Commit

Permalink
CDK: private configuration option _limit and _page_size (#5617)
Browse files Browse the repository at this point in the history
* CDK: private configuration option _limit and _page_size
  • Loading branch information
avida authored Aug 31, 2021
1 parent 3aabd92 commit 7584440
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 5 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.15
Add \_limit and \_page_size as internal config parameters for SAT

## 0.1.14
If the input config file does not comply with spec schema, raise an exception instead of `system.exit`.

Expand Down
7 changes: 6 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import AirbyteMessage, Status, Type
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config

logger = AirbyteLogger()

Expand Down Expand Up @@ -90,7 +90,12 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
else:
raw_config = self.source.read_config(parsed_args.config)
config = self.source.configure(raw_config, temp_dir)
# Remove internal flags from config before validating so
# jsonschema's additionalProperties flag wont fail the validation
config, internal_config = split_config(config)
check_config_against_spec_or_exit(config, source_spec, logger)
# Put internal flags back to config dict
config.update(internal_config.dict())

if cmd == "check":
check_result = self.source.check(logger, config)
Expand Down
17 changes: 16 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.source import Source
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.http import HttpStream
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config


class AbstractSource(Source, ABC):
Expand Down Expand Up @@ -95,6 +97,7 @@ def read(
"""Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification."""
connector_state = copy.deepcopy(state or {})
logger.info(f"Starting syncing {self.name}")
config, internal_config = split_config(config)
# TODO assert all streams exist in the connector
# get the streams once in case the connector needs to make any queries to generate them
stream_instances = {s.name: s for s in self.streams(config)}
Expand All @@ -107,7 +110,11 @@ def read(

try:
yield from self._read_stream(
logger=logger, stream_instance=stream_instance, configured_stream=configured_stream, connector_state=connector_state
logger=logger,
stream_instance=stream_instance,
configured_stream=configured_stream,
connector_state=connector_state,
internal_config=internal_config,
)
except Exception as e:
logger.exception(f"Encountered an exception while reading stream {self.name}")
Expand All @@ -121,8 +128,13 @@ def _read_stream(
stream_instance: Stream,
configured_stream: ConfiguredAirbyteStream,
connector_state: MutableMapping[str, Any],
internal_config: InternalConfig,
) -> Iterator[AirbyteMessage]:

if internal_config.page_size and isinstance(stream_instance, HttpStream):
logger.info(f"Setting page size for {stream_instance.name} to {internal_config.page_size}")
stream_instance.page_size = internal_config.page_size

use_incremental = configured_stream.sync_mode == SyncMode.incremental and stream_instance.supports_incremental
if use_incremental:
record_iterator = self._read_incremental(logger, stream_instance, configured_stream, connector_state)
Expand All @@ -135,6 +147,9 @@ def _read_stream(
for record in record_iterator:
if record.type == MessageType.RECORD:
record_counter += 1
if internal_config.limit and record_counter > internal_config.limit:
logger.info(f"Reached limit defined by internal config ({internal_config.limit}), stop reading")
break
yield record

logger.info(f"Read {record_counter} records from {stream_name} stream")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class HttpStream(Stream, ABC):
"""

source_defined_cursor = True # Most HTTP streams use a source defined cursor (i.e: the user can't configure it like on a SQL table)
page_size = None # Use this variable to define page size for API http requests with pagination support

def __init__(self, authenticator: HttpAuthenticator = NoAuth()):
self._authenticator = authenticator
Expand Down
31 changes: 30 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@
import json
import os
import pkgutil
from typing import Any, Dict, Mapping
from typing import Any, ClassVar, Dict, Mapping, Tuple

import pkg_resources
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import ConnectorSpecification
from jsonschema import RefResolver, validate
from jsonschema.exceptions import ValidationError
from pydantic import BaseModel, Field


class JsonSchemaResolver:
Expand Down Expand Up @@ -142,3 +143,31 @@ def check_config_against_spec_or_exit(config: Mapping[str, Any], spec: Connector
validate(instance=config, schema=spec_schema)
except ValidationError as validation_error:
raise Exception("Config validation error: " + validation_error.message)


class InternalConfig(BaseModel):
    """
    Private CDK options that can be embedded in a connector config.

    The options are addressed in the config by their underscore-prefixed
    aliases and are used mainly for testing purposes:

    * ``_limit`` - maximum number of records read for each stream.
    * ``_page_size`` - number of records per page for HTTP based streams.
    """

    # Config keys that are treated as internal; keep in sync with the field aliases below.
    KEYWORDS: ClassVar[set] = {"_limit", "_page_size"}
    # Both options default to None, meaning "not set".
    limit: int = Field(None, alias="_limit")
    page_size: int = Field(None, alias="_page_size")

    def dict(self, **kwargs):
        """
        Serialize the model keyed by alias, leaving out options that were never set.

        The output can therefore be merged straight back into a config dict.
        Any keyword argument supported by ``BaseModel.dict`` is forwarded, so the
        override stays compatible with the parent signature; ``by_alias`` and
        ``exclude_unset`` default to ``True`` unless the caller overrides them.

        :param kwargs - keyword arguments passed through to ``BaseModel.dict``.
        :return dict containing only the explicitly set options, keyed by alias.
        """
        kwargs.setdefault("by_alias", True)
        kwargs.setdefault("exclude_unset", True)
        return super().dict(**kwargs)


def split_config(config: Mapping[str, Any]) -> Tuple[dict, InternalConfig]:
    """
    Separate a loaded config into its user defined part and its internal part.

    Keys listed in InternalConfig.KEYWORDS are private options used for
    acceptance test configuration; every other key belongs to the user
    defined configuration.
    :param config - Dict object that has been loaded from config file.
    :return tuple of (user defined config dict without the internal keys,
    InternalConfig object parsed from the internal keys).
    """
    private_keys = InternalConfig.KEYWORDS
    user_part = {key: value for key, value in config.items() if key not in private_keys}
    private_part = {key: value for key, value in config.items() if key in private_keys}
    return user_part, InternalConfig.parse_obj(private_part)
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ Given that we'll pulling currency data for our example source, we'll define the
}
}
```
Besides the regular parameters, there is an internal CDK config whose keys start with the '_' character and are used mainly for testing purposes:

* `_limit` - sets the maximum number of records read for each stream
* `_page_size` - for HTTP-based streams, sets the number of records for each page. Depends on the stream implementation.


In addition to metadata, we define two inputs:

Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

setup(
name="airbyte-cdk",
version="0.1.14",
version="0.1.15",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
96 changes: 95 additions & 1 deletion airbyte-cdk/python/unit_tests/sources/test_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import json
import tempfile
from typing import Any, Mapping, MutableMapping
from unittest.mock import MagicMock

import pytest
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import ConfiguredAirbyteCatalog
from airbyte_cdk.sources import Source
from airbyte_cdk.sources import AbstractSource, Source
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.streams.http.http import HttpStream


class MockSource(Source):
Expand All @@ -51,6 +54,40 @@ def source():
return MockSource()


@pytest.fixture
def abstract_source(mocker):
    """
    Provide an AbstractSource exposing one HTTP stream and one non-HTTP stream.

    Both streams are MagicMock based so tests can stub ``read_records``. The
    source stores the config it receives in ``streams()`` as
    ``streams_config`` so tests can inspect what was passed to the connector.
    """
    # Empty the abstract method sets so the stream base classes can be instantiated.
    mocker.patch.multiple(HttpStream, __abstractmethods__=set())
    mocker.patch.multiple(Stream, __abstractmethods__=set())

    class MockHttpStream(MagicMock, HttpStream):
        url_base = "http://example.com"
        path = "/dummy/path"

        def __init__(self, *args, **kvargs):
            MagicMock.__init__(self)
            # Forward keyword arguments as keywords; passing the dict positionally
            # would hand it to HttpStream as its first positional argument.
            HttpStream.__init__(self, *args, **kvargs)
            self.read_records = MagicMock()

    class MockStream(MagicMock, Stream):
        page_size = None

        def __init__(self, *args, **kvargs):
            MagicMock.__init__(self)
            self.read_records = MagicMock()

    streams = [MockHttpStream(), MockStream()]

    class MockAbstractSource(AbstractSource):
        def check_connection(self):
            return True, None

        def streams(self, config):
            # Remember the config so tests can assert on what the source received.
            self.streams_config = config
            return streams

    return MockAbstractSource()


def test_read_state(source):
state = {"updated_at": "yesterday"}

Expand Down Expand Up @@ -81,3 +118,60 @@ def test_read_catalog(source):
catalog_file.flush()
actual = source.read_catalog(catalog_file.name)
assert actual == expected


def test_internal_config(abstract_source):
    """Check how the private `_limit` and `_page_size` config keys affect a read."""
    catalog = ConfiguredAirbyteCatalog.parse_obj(
        {
            "streams": [
                {
                    "stream": {"name": stream_name, "json_schema": {}},
                    "destination_sync_mode": "overwrite",
                    "sync_mode": "full_refresh",
                }
                for stream_name in ("mock_http_stream", "mock_stream")
            ]
        }
    )

    def read_all(config):
        # Drain the read generator using a fresh mock logger for every run
        return list(abstract_source.read(logger=MagicMock(), config=config, catalog=catalog, state={}))

    streams = abstract_source.streams(None)
    assert len(streams) == 2
    http_stream, non_http_stream = streams
    assert isinstance(http_stream, HttpStream)
    assert not isinstance(non_http_stream, HttpStream)
    http_stream.read_records.return_value = [{}] * 3
    non_http_stream.read_records.return_value = [{}] * 3

    # Empty config: nothing is limited and no page size is applied
    records = read_all({})
    # 3 records from the http stream plus 3 from the non http stream
    assert len(records) == 3 + 3
    assert http_stream.read_records.called
    assert non_http_stream.read_records.called
    # page_size must remain unset on both streams
    assert not http_stream.page_size
    assert not non_http_stream.page_size

    # Records limit set to 1
    records = read_all({"some_config": 100, "_limit": 1})
    # 1 record from the http stream plus 1 from the non http stream
    assert len(records) == 1 + 1
    assert "_limit" not in abstract_source.streams_config
    assert "some_config" in abstract_source.streams_config

    # Records limit larger than the number of expected records
    records = read_all({"some_config": 100, "_limit": 20})
    assert len(records) == 3 + 3

    # The page_size parameter must be applied to the http stream instance only
    records = read_all({"some_config": 100, "_page_size": 2})
    assert "_page_size" not in abstract_source.streams_config
    assert "some_config" in abstract_source.streams_config
    assert len(records) == 3 + 3
    assert http_stream.page_size == 2
    # page_size must remain unset for non http streams
    assert not non_http_stream.page_size
5 changes: 5 additions & 0 deletions airbyte-cdk/python/unit_tests/test_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ def config_mock(mocker, request):
({"username": "fake"}, {"type": "object", "properties": {"name": {"type": "string"}}, "additionalProperties": False}, False),
({"username": "fake"}, {"type": "object", "properties": {"username": {"type": "string"}}, "additionalProperties": False}, True),
({"username": "fake"}, {"type": "object", "properties": {"user": {"type": "string"}}}, True),
(
{"username": "fake", "_limit": 22},
{"type": "object", "properties": {"username": {"type": "string"}}, "additionalProperties": False},
True,
),
],
indirect=["config_mock"],
)
Expand Down

0 comments on commit 7584440

Please sign in to comment.