Skip to content

Commit

Permalink
[low-code cdk] Allow for spec file to be defined in the yaml manifest…
Browse files Browse the repository at this point in the history
… instead of an external file (#18411)

* allow for spec to be defined in the source.yaml manifest instead of an external file

* make spec a component within the language to get schema validation and rework the code for better testing

* fix formatting and extra method

* pr feedback and add some more tests

* pr feedback

* bump airbyte-cdk version

* bump version

* gradle format

* remove  from manifest spec
  • Loading branch information
brianjlai authored Nov 7, 2022
1 parent d88a81c commit f9863d6
Show file tree
Hide file tree
Showing 12 changed files with 582 additions and 262 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.7.0
Low-code: Allow connector specifications to be defined in the manifest

## 0.6.0
Low-code: Add support for monthly and yearly incremental updates for `DatetimeStreamSlicer`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.page_increment import PageIncrement
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.spec import Spec
from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer
Expand Down Expand Up @@ -75,6 +76,7 @@
"RemoveFields": RemoveFields,
"SimpleRetriever": SimpleRetriever,
"SingleSlice": SingleSlice,
"Spec": Spec,
"SubstreamSlicer": SubstreamSlicer,
"WaitUntilTimeFromHeader": WaitUntilTimeFromHeaderBackoffStrategy,
"WaitTimeFromHeader": WaitTimeFromHeaderBackoffStrategy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,11 @@ def is_object_definition_with_class_name(definition):

@staticmethod
def is_object_definition_with_type(definition):
# NOTE(review): this hunk is rendered without +/- markers. The return directly below looks like the pre-change line and the
# return after the comment block looks like its replacement — confirm against the repository before relying on either.
return isinstance(definition, dict) and "type" in definition
# The `type` field is an overloaded term in the context of the low-code manifest. As part of the language, `type` is shorthand
# for convenience to avoid defining the entire classpath. For the connector specification, `type` is a part of the spec schema.
# For spec parsing, as part of this check, when the type is set to object, we want it to remain a mapping. But when type is
# defined any other way, then it should be parsed as a declarative component in the manifest.
return isinstance(definition, dict) and "type" in definition and definition["type"] != "object"

@staticmethod
def get_default_type(parameter_name, parent_class):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.spec.spec import Spec

__all__ = ["Spec"]
34 changes: 34 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/declarative/spec/spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass
from typing import Any, Mapping

from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class Spec(JsonSchemaMixin):
    """
    Declarative component holding the connector specification: information about the connector and how it can be configured

    Attributes:
        documentation_url (str): The link to the Airbyte documentation about this connector
        connection_specification (Mapping[str, Any]): information related to how a connector can be configured
    """

    documentation_url: str
    connection_specification: Mapping[str, Any]
    options: InitVar[Mapping[str, Any]]

    def generate_spec(self) -> ConnectorSpecification:
        """
        Returns the connector specification according to the spec block defined in the low-code connector manifest.
        """

        # The rest of the platform expects camel case keys, so the snake case attributes are remapped here
        camel_cased_fields = {
            "documentationUrl": self.documentation_url,
            "connectionSpecification": self.connection_specification,
        }
        return ConnectorSpecification.parse_obj(camel_cased_fields)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import datetime
import re
from dataclasses import InitVar, dataclass, field
from dateutil.relativedelta import relativedelta
from typing import Any, Iterable, Mapping, Optional, Union

from airbyte_cdk.models import SyncMode
Expand All @@ -17,6 +16,7 @@
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
from dataclasses_jsonschema import JsonSchemaMixin
from dateutil.relativedelta import relativedelta


@dataclass
Expand Down Expand Up @@ -71,7 +71,9 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin):
stream_state_field_end: Optional[str] = None
lookback_window: Optional[Union[InterpolatedString, str]] = None

timedelta_regex = re.compile(r"((?P<years>[\.\d]+?)y)?" r"((?P<months>[\.\d]+?)m)?" r"((?P<weeks>[\.\d]+?)w)?" r"((?P<days>[\.\d]+?)d)?$")
timedelta_regex = re.compile(
r"((?P<years>[\.\d]+?)y)?" r"((?P<months>[\.\d]+?)m)?" r"((?P<weeks>[\.\d]+?)w)?" r"((?P<days>[\.\d]+?)d)?$"
)

def __post_init__(self, options: Mapping[str, Any]):
if not isinstance(self.start_datetime, MinMaxDatetime):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from enum import Enum, EnumMeta
from typing import Any, List, Mapping, Union

from airbyte_cdk.models import ConnectorSpecification
from airbyte_cdk.sources.declarative.checks import CheckStream
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
Expand All @@ -33,7 +34,7 @@ class ConcreteDeclarativeSource(JsonSchemaMixin):
class YamlDeclarativeSource(DeclarativeSource):
"""Declarative source defined by a yaml file"""

VALID_TOP_LEVEL_FIELDS = {"definitions", "streams", "check", "version"}
VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "spec", "streams", "version"}

def __init__(self, path_to_yaml):
"""
Expand Down Expand Up @@ -69,6 +70,28 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
self._apply_log_level_to_stream_logger(self.logger, stream)
return source_streams

def spec(self, logger: logging.Logger) -> ConnectorSpecification:
    """
    Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
    configurations (e.g: username and password) which can be configured when running this connector.

    For low-code connectors, the manifest's `spec` block is used when it is present and non-empty. Otherwise the call is
    delegated to the parent class, which is documented to load the spec from "spec.yaml" or "spec.json" in the project root.

    :param logger: logger forwarded to the parent implementation when no spec block is defined in the manifest
    :return: the ConnectorSpecification generated by the manifest's Spec component, or the parent's specification
    """

    log_context = {
        "path_to_yaml_file": self._path_to_yaml,
        "source_name": self.name,
        "parsed_config": json.dumps(self._source_config),
    }
    self.logger.debug("parsed YAML into declarative source", extra=log_context)

    manifest_spec = self._source_config.get("spec")
    if not manifest_spec:
        # Nothing usable in the manifest: defer to the parent implementation
        return super().spec(logger)

    # Default the component class so that manifests are not required to spell out the classpath of the Spec component
    if "class_name" not in manifest_spec:
        manifest_spec["class_name"] = "airbyte_cdk.sources.declarative.spec.Spec"

    # create_component returns a callable; invoking it instantiates the component
    build_spec_component = self._factory.create_component(manifest_spec, {})
    return build_spec_component().generate_spec()

def _read_and_parse_yaml_file(self, path_to_yaml_file):
package = self.__class__.__module__.split(".")[0]

Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.6.0",
version="0.7.0",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#

import datetime
from dateutil.relativedelta import relativedelta
from typing import List, Optional, Union

import pytest
Expand Down Expand Up @@ -41,6 +40,7 @@
from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer
from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
from dateutil.relativedelta import relativedelta
from jsonschema import ValidationError

factory = DeclarativeComponentFactory()
Expand Down Expand Up @@ -345,6 +345,22 @@ def test_full_config():
check:
class_name: airbyte_cdk.sources.declarative.checks.check_stream.CheckStream
stream_names: ["list_stream"]
spec:
class_name: airbyte_cdk.sources.declarative.spec.Spec
documentation_url: https://airbyte.com/#yaml-from-manifest
connection_specification:
title: Test Spec
type: object
required:
- api_key
additionalProperties: false
properties:
api_key:
type: string
airbyte_secret: true
title: API Key
description: Test API Key
order: 0
"""
config = parser.parse(content)

Expand Down Expand Up @@ -377,6 +393,20 @@ def test_full_config():
assert len(streams_to_check) == 1
assert list(streams_to_check)[0] == "list_stream"

spec = factory.create_component(config["spec"], input_config)()
documentation_url = spec.documentation_url
connection_specification = spec.connection_specification
assert documentation_url == "https://airbyte.com/#yaml-from-manifest"
assert connection_specification["title"] == "Test Spec"
assert connection_specification["required"] == ["api_key"]
assert connection_specification["properties"]["api_key"] == {
"type": "string",
"airbyte_secret": True,
"title": "API Key",
"description": "Test API Key",
"order": 0,
}

assert stream.retriever.requester.path.default == "marketing/lists"


Expand Down
Loading

0 comments on commit f9863d6

Please sign in to comment.