Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4776: Python CDK: Validate input config.py against spec #5457

Merged
merged 12 commits into from
Aug 19, 2021
9 changes: 9 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Changelog

## 0.1.11
Add validation of the specified config against the spec for the read, write, check and discover commands

## 0.1.10
Upd multiple token support: switch to list of tokens

## 0.1.9
Add multiple token support

## 0.1.8
Allow to fetch primary key info from singer catalog

Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ All tests are located in the `unit_tests` directory. Run `pytest --cov=airbyte_c

1. Bump the package version in `setup.py`
2. Open a PR
3. An Airbyte member must comment `/publish-cdk --dry-run=<true or false>`. Dry runs publish to test.pypi.org.
3. An Airbyte member must comment `/publish-cdk dry-run=true` to publish the package to test.pypi.org or `/publish-cdk dry-run=false` to publish it to the real index of pypi.org.

## Coming Soon

Expand Down
35 changes: 19 additions & 16 deletions airbyte-cdk/python/airbyte_cdk/destinations/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,36 +31,35 @@
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.connector import Connector
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, Type
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit
from pydantic import ValidationError


class Destination(Connector, ABC):
logger = AirbyteLogger()
VALID_CMDS = {"spec", "check", "write"}

@abstractmethod
def write(
self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
) -> Iterable[AirbyteMessage]:
"""Implement to define how the connector writes data to the destination"""

def _run_spec(self) -> AirbyteMessage:
return AirbyteMessage(type=Type.SPEC, spec=self.spec(self.logger))

def _run_check(self, config_path: str) -> AirbyteMessage:
config = self.read_config(config_path=config_path)
def _run_check(self, config: Mapping[str, Any]) -> AirbyteMessage:
check_result = self.check(self.logger, config)
return AirbyteMessage(type=Type.CONNECTION_STATUS, connectionStatus=check_result)

def _parse_input_stream(self, input_stream: io.TextIOWrapper) -> Iterable[AirbyteMessage]:
""" Reads from stdin, converting to Airbyte messages"""
"""Reads from stdin, converting to Airbyte messages"""
for line in input_stream:
try:
yield AirbyteMessage.parse_raw(line)
except ValidationError:
self.logger.info(f"ignoring input which can't be deserialized as Airbyte Message: {line}")

def _run_write(self, config_path: str, configured_catalog_path: str, input_stream: io.TextIOWrapper) -> Iterable[AirbyteMessage]:
config = self.read_config(config_path=config_path)
def _run_write(
self, config: Mapping[str, Any], configured_catalog_path: str, input_stream: io.TextIOWrapper
) -> Iterable[AirbyteMessage]:
catalog = ConfiguredAirbyteCatalog.parse_file(configured_catalog_path)
input_messages = self._parse_input_stream(input_stream)
self.logger.info("Begin writing to the destination...")
Expand Down Expand Up @@ -104,18 +103,22 @@ def parse_args(self, args: List[str]) -> argparse.Namespace:

def run_cmd(self, parsed_args: argparse.Namespace) -> Iterable[AirbyteMessage]:
cmd = parsed_args.command
if cmd not in self.VALID_CMDS:
raise Exception(f"Unrecognized command: {cmd}")

spec = self.spec(self.logger)
if cmd == "spec":
yield self._run_spec()
elif cmd == "check":
yield self._run_check(config_path=parsed_args.config)
yield AirbyteMessage(type=Type.SPEC, spec=spec)
return
config = self.read_config(config_path=parsed_args.config)
check_config_against_spec_or_exit(config, spec, self.logger)

if cmd == "check":
yield self._run_check(config=config)
elif cmd == "write":
# Wrap in UTF-8 to override any other input encodings
wrapped_stdin = io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8")
yield from self._run_write(
config_path=parsed_args.config, configured_catalog_path=parsed_args.catalog, input_stream=wrapped_stdin
)
else:
raise Exception(f"Unrecognized command: {cmd}")
yield from self._run_write(config=config, configured_catalog_path=parsed_args.catalog, input_stream=wrapped_stdin)

def run(self, args: List[str]):
parsed_args = self.parse_args(args)
Expand Down
5 changes: 4 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import AirbyteMessage, Status, Type
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit

logger = AirbyteLogger()

Expand Down Expand Up @@ -80,14 +81,16 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
raise Exception("No command passed")

# todo: add try catch for exceptions with different exit codes
source_spec = self.source.spec(logger)

with tempfile.TemporaryDirectory() as temp_dir:
if cmd == "spec":
message = AirbyteMessage(type=Type.SPEC, spec=self.source.spec(logger))
message = AirbyteMessage(type=Type.SPEC, spec=source_spec)
yield message.json(exclude_unset=True)
else:
raw_config = self.source.read_config(parsed_args.config)
config = self.source.configure(raw_config, temp_dir)
check_config_against_spec_or_exit(config, source_spec, logger)

if cmd == "check":
check_result = self.source.check(logger, config)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# Initialize Auth Package
from .core import HttpAuthenticator, NoAuth
from .oauth import Oauth2Authenticator
from .token import TokenAuthenticator
from .token import MultipleTokenAuthenticator, TokenAuthenticator

__all__ = [
"HttpAuthenticator",
"NoAuth",
"Oauth2Authenticator",
"TokenAuthenticator",
"MultipleTokenAuthenticator",
]
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
#


from typing import Any, Mapping
from itertools import cycle
from typing import Any, List, Mapping

from .core import HttpAuthenticator

Expand All @@ -36,3 +37,14 @@ def __init__(self, token: str, auth_method: str = "Bearer", auth_header: str = "

def get_auth_header(self) -> Mapping[str, Any]:
return {self.auth_header: f"{self.auth_method} {self._token}"}


class MultipleTokenAuthenticator(HttpAuthenticator):
    """
    Authenticator that rotates through several tokens.

    Every call to get_auth_header() uses the next token from `tokens`,
    wrapping back to the first token once the last one has been used.

    :param tokens - non-empty list of tokens to rotate through
    :param auth_method - scheme placed before the token in the header value
    :param auth_header - name of the HTTP header carrying the credentials
    :raises ValueError - if `tokens` is empty
    """

    def __init__(self, tokens: List[str], auth_method: str = "Bearer", auth_header: str = "Authorization"):
        # cycle() over an empty sequence is immediately exhausted, so the first
        # get_auth_header() call would raise a bare StopIteration (which turns
        # into an opaque RuntimeError inside a generator). Fail early instead.
        if not tokens:
            raise ValueError("MultipleTokenAuthenticator requires at least one token")
        self.auth_method = auth_method
        self.auth_header = auth_header
        self._tokens = tokens
        self._tokens_iter = cycle(self._tokens)

    def get_auth_header(self) -> Mapping[str, Any]:
        """Return the auth header built from the next token in the rotation."""
        return {self.auth_header: f"{self.auth_method} {next(self._tokens_iter)}"}
26 changes: 24 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@
import json
import os
import pkgutil
from typing import Dict
import sys
from logging import Logger
from typing import Any, Dict, Mapping

import pkg_resources
from jsonschema import RefResolver

from airbyte_cdk.models import ConnectorSpecification
from jsonschema import RefResolver, validate
from jsonschema.exceptions import ValidationError


class JsonSchemaResolver:
Expand Down Expand Up @@ -124,3 +129,20 @@ def get_schema(self, name: str) -> dict:
if os.path.exists(shared_schemas_folder):
return JsonSchemaResolver(shared_schemas_folder).resolve(raw_schema)
return raw_schema


def check_config_against_spec_or_exit(config: Mapping[str, Any], spec: ConnectorSpecification, logger: Logger):
"""
Check config object against spec. If the config is invalid, throws
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice docstring

SystemExit exception causing the application to make a system exit call with
errorcode 1
:param config - config loaded from file specified over command line
:param spec - spec object generated by connector
:param logger - Airbyte logger for reporting validation error
"""
spec_schema = spec.connectionSpecification
try:
validate(instance=config, schema=spec_schema)
except ValidationError as validation_error:
logger.error("Config validation error: " + validation_error.message)
sys.exit(1)
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

setup(
name="airbyte-cdk",
version="0.1.8",
version="0.1.11",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ def test_run_check(self, mocker, destination: Destination, tmp_path):
parsed_args = argparse.Namespace(**args)
destination.run_cmd(parsed_args)

mocker.patch.object(destination, "spec", return_value=ConnectorSpecification(connectionSpecification={}))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should add a test case here too to verify the config is validated.

It may help to look at test coverage by running ./type_check_and_test.sh

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already a detailed test case that covers the validation function here: https://github.com/airbytehq/airbyte/pull/5457/files#diff-0d9087e19f5bdabe137529bb861aaaa261b39d5344e79041602a2a94c7530301R150 So I've just added checks that this validation function is called with the correct args for the destination.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, done

expected_check_result = AirbyteConnectionStatus(status=Status.SUCCEEDED)
mocker.patch.object(destination, "check", return_value=expected_check_result, autospec=True)

Expand Down Expand Up @@ -216,6 +217,7 @@ def test_run_write(self, mocker, destination: Destination, tmp_path, monkeypatch
mocker.patch.object(
destination, "write", return_value=iter(expected_write_result), autospec=True # convert to iterator to mimic real usage
)
mocker.patch.object(destination, "spec", return_value=ConnectorSpecification(connectionSpecification={}))
# mock input is a record followed by some state messages
mocked_input: List[AirbyteMessage] = [_wrapped(_record("s1", {"k1": "v1"})), *expected_write_result]
mocked_stdin_string = "\n".join([record.json(exclude_unset=True) for record in mocked_input])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import logging

import requests
from airbyte_cdk.sources.streams.http.auth import NoAuth, Oauth2Authenticator, TokenAuthenticator
from airbyte_cdk.sources.streams.http.auth import MultipleTokenAuthenticator, NoAuth, Oauth2Authenticator, TokenAuthenticator
from requests import Response

LOGGER = logging.getLogger(__name__)
Expand All @@ -43,6 +43,16 @@ def test_token_authenticator():
assert {"Authorization": "Bearer test-token"} == header


def test_multiple_token_authenticator():
    """Tokens are handed out in order and wrap around to the first one."""
    authenticator = MultipleTokenAuthenticator(["token1", "token2"])
    # Three calls against two tokens: the third call must wrap back to token1.
    for expected_token in ("token1", "token2", "token1"):
        auth_header = authenticator.get_auth_header()
        assert {"Authorization": f"Bearer {expected_token}"} == auth_header


def test_no_auth():
"""
Should always return empty body, no matter how many times token is retrieved.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,


def test_next_page_token_is_input_to_other_methods(mocker):
""" Validates that the return value from next_page_token is passed into other methods that need it like request_params, headers, body, etc.."""
"""Validates that the return value from next_page_token is passed into other methods that need it like request_params, headers, body, etc.."""
pages = 5
stream = StubNextPageTokenHttpStream(pages=pages)
blank_response = {} # Send a blank response is fine as we ignore the response in `parse_response anyway.
Expand Down Expand Up @@ -144,6 +144,7 @@ def backoff_time(self, response: requests.Response) -> Optional[float]:


def test_stub_custom_backoff_http_stream(mocker):
mocker.patch("time.sleep", lambda x: None)
stream = StubCustomBackoffHttpStream()
req = requests.Response()
req.status_code = 429
Expand Down
62 changes: 47 additions & 15 deletions airbyte-cdk/python/unit_tests/test_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from argparse import Namespace
from copy import deepcopy
from typing import Any, List, Mapping, MutableMapping, Union
from unittest.mock import MagicMock

import pytest
from airbyte_cdk import AirbyteEntrypoint
Expand Down Expand Up @@ -61,6 +62,14 @@ def _as_arglist(cmd: str, named_args: Mapping[str, Any]) -> List[str]:
return out


@pytest.fixture
def spec_mock(mocker):
    """Replace MockSource.spec with a mock returning an empty-schema spec and return that mock."""
    patched_spec = MagicMock(return_value=ConnectorSpecification(connectionSpecification={}))
    mocker.patch.object(MockSource, "spec", patched_spec)
    return patched_spec


@pytest.fixture
def entrypoint() -> AirbyteEntrypoint:
    """Provide an AirbyteEntrypoint wrapping a fresh MockSource instance."""
    return AirbyteEntrypoint(MockSource())
Expand Down Expand Up @@ -121,40 +130,63 @@ def test_run_spec(entrypoint: AirbyteEntrypoint, mocker):
assert [_wrap_message(expected)] == list(entrypoint.run(parsed_args))


def test_run_check(entrypoint: AirbyteEntrypoint, mocker):
parsed_args = Namespace(command="check", config="config_path")
config = {"username": "fake"}
check_value = AirbyteConnectionStatus(status=Status.SUCCEEDED)
@pytest.fixture
def config_mock(mocker, request):
    """
    Patch MockSource.read_config and MockSource.configure to return one config.

    The config comes from `request.param` when the fixture is parametrized
    indirectly; otherwise a default {"username": "fake"} config is used.
    Returns the config that was patched in.
    """
    patched_config = getattr(request, "param", {"username": "fake"})
    for method_name in ("read_config", "configure"):
        mocker.patch.object(MockSource, method_name, return_value=patched_config)
    return patched_config


@pytest.mark.parametrize(
    "config_mock, schema, config_valid",
    [
        ({"username": "fake"}, {"type": "object", "properties": {"name": {"type": "string"}}, "additionalProperties": False}, False),
        ({"username": "fake"}, {"type": "object", "properties": {"username": {"type": "string"}}, "additionalProperties": False}, True),
        ({"username": "fake"}, {"type": "object", "properties": {"user": {"type": "string"}}}, True),
    ],
    indirect=["config_mock"],
)
def test_config_validate(entrypoint: AirbyteEntrypoint, mocker, config_mock, schema, config_valid):
    """A config that violates the spec schema makes `check` exit with code 1; a valid one yields the check result."""
    expected_status = AirbyteConnectionStatus(status=Status.SUCCEEDED)
    mocker.patch.object(MockSource, "check", return_value=expected_status)
    mocker.patch.object(MockSource, "spec", return_value=ConnectorSpecification(connectionSpecification=schema))
    check_args = Namespace(command="check", config="config_path")

    if not config_valid:
        # Validation failure is reported through SystemExit with exit code 1.
        with pytest.raises(SystemExit) as ex_info:
            list(entrypoint.run(check_args))
        assert ex_info.value.code == 1
        return

    messages = list(entrypoint.run(check_args))
    assert [_wrap_message(expected_status)] == messages


def test_run_check(entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock):
    """The `check` command emits the source's connection status and consults the spec."""
    expected_status = AirbyteConnectionStatus(status=Status.SUCCEEDED)
    mocker.patch.object(MockSource, "check", return_value=expected_status)
    check_args = Namespace(command="check", config="config_path")
    emitted = list(entrypoint.run(check_args))
    assert [_wrap_message(expected_status)] == emitted
    # The spec has to be fetched so the config can be validated against it.
    assert spec_mock.called


def test_run_discover(entrypoint: AirbyteEntrypoint, mocker):
def test_run_discover(entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock):
parsed_args = Namespace(command="discover", config="config_path")
config = {"username": "fake"}
expected = AirbyteCatalog(streams=[AirbyteStream(name="stream", json_schema={"k": "v"})])
mocker.patch.object(MockSource, "read_config", return_value=config)
mocker.patch.object(MockSource, "configure", return_value=config)
mocker.patch.object(MockSource, "discover", return_value=expected)
assert [_wrap_message(expected)] == list(entrypoint.run(parsed_args))
assert spec_mock.called


def test_run_read(entrypoint: AirbyteEntrypoint, mocker):
def test_run_read(entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock):
parsed_args = Namespace(command="read", config="config_path", state="statepath", catalog="catalogpath")
config = {"username": "fake"}
expected = AirbyteRecordMessage(stream="stream", data={"data": "stuff"}, emitted_at=1)
mocker.patch.object(MockSource, "read_config", return_value=config)
mocker.patch.object(MockSource, "configure", return_value=config)
mocker.patch.object(MockSource, "read_state", return_value={})
mocker.patch.object(MockSource, "read_catalog", return_value={})
mocker.patch.object(MockSource, "read", return_value=[AirbyteMessage(record=expected, type=Type.RECORD)])
assert [_wrap_message(expected)] == list(entrypoint.run(parsed_args))
assert spec_mock.called


def test_invalid_command(entrypoint: AirbyteEntrypoint, mocker):
def test_invalid_command(entrypoint: AirbyteEntrypoint, mocker, config_mock):
with pytest.raises(Exception):
mocker.patch.object(MockSource, "read_config", return_value={})
mocker.patch.object(MockSource, "configure", return_value={})
list(entrypoint.run(Namespace(command="invalid", config="conf")))
2 changes: 1 addition & 1 deletion docs/connector-development/cdk-python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ All tests are located in the `unit_tests` directory. Run `pytest --cov=airbyte_c

1. Bump the package version in `setup.py`
2. Open a PR
3. An Airbyte member must comment `/publish-cdk --dry-run=<true or false>`. Dry runs publish to test.pypi.org.
3. An Airbyte member must comment `/publish-cdk dry-run=true` to publish the package to test.pypi.org or `/publish-cdk dry-run=false` to publish it to the real index of pypi.org.

## Coming Soon

Expand Down