Merge branch 'master' into leti/refactor-suggested-destinations-component

* master:
  🪟 🎉 Enable frontend validation for <1hr syncs (cloud) #19028
  Stream returns AirbyteMessages (#18572)
  🎉 New Source - Recruitee [low-code SDK] (#18671)
  🎉 New source: Breezometer [low-code cdk] (#18650)
  Check disabled connections after protocol update (#18990)
  Simple default replication worker refactor (#19002)
  🎉 New Source: Visma e-conomic (#18595)
  🎉 New Source: Fastbill (#18593)
  Bmoric/extract state api (#18980)
  🎉 New Source: Zapier Supported Storage (#18442)
  🎉 New source: Klarna (#18385)
  `AirbyteEstimateTraceMessage` (#18875)
  Extract source definition api (#18977)
  [low-code cdk] Allow for spec file to be defined in the yaml manifest instead of an external file (#18411)
  🐛 Source HubSpot: fix property scopes (#18624)
  Ensure that only 6-character hex values are passed to monaco editor (#18943)
letiescanciano committed Nov 8, 2022
2 parents 5040772 + 7067977 commit 9c4ce94
Showing 212 changed files with 10,683 additions and 590 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

## 0.7.0
Low-code: Allow connector specifications to be defined in the manifest

## 0.6.0
Low-code: Add support for monthly and yearly incremental updates for `DatetimeStreamSlicer`

27 changes: 27 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py
@@ -81,6 +81,7 @@ class Config:

class TraceType(Enum):
    ERROR = "ERROR"
    ESTIMATE = "ESTIMATE"


class FailureType(Enum):
@@ -98,6 +99,28 @@ class Config:
    failure_type: Optional[FailureType] = Field(None, description="The type of error")


class EstimateType(Enum):
    STREAM = "STREAM"
    SYNC = "SYNC"


class AirbyteEstimateTraceMessage(BaseModel):
    class Config:
        extra = Extra.allow

    name: str = Field(..., description="The name of the stream")
    type: EstimateType = Field(..., description="The type of estimate", title="estimate type")
    namespace: Optional[str] = Field(None, description="The namespace of the stream")
    row_estimate: Optional[int] = Field(
        None,
        description="The estimated number of rows to be emitted by this sync for this stream",
    )
    byte_estimate: Optional[int] = Field(
        None,
        description="The estimated number of bytes to be emitted by this sync for this stream",
    )


class OrchestratorType(Enum):
    CONNECTOR_CONFIG = "CONNECTOR_CONFIG"

@@ -213,6 +236,10 @@ class Config:
    type: TraceType = Field(..., description="the type of trace message", title="trace type")
    emitted_at: float = Field(..., description="the time in ms that the message was emitted")
    error: Optional[AirbyteErrorTraceMessage] = Field(None, description="error trace message: the error object")
    estimate: Optional[AirbyteEstimateTraceMessage] = Field(
        None,
        description="Estimate trace message: a guess at how much data will be produced in this sync",
    )


class AirbyteControlMessage(BaseModel):
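
The new ESTIMATE trace type and AirbyteEstimateTraceMessage let a source announce expected sync volume up front. A minimal sketch of emitting one, assuming these models are re-exported from airbyte_cdk.models like the rest of the protocol models (make_estimate_message is a hypothetical helper, not part of this commit):

import time

from airbyte_cdk.models import (
    AirbyteEstimateTraceMessage,
    AirbyteMessage,
    AirbyteTraceMessage,
    EstimateType,
    TraceType,
    Type as MessageType,
)


def make_estimate_message(stream_name: str, rows: int, size_bytes: int) -> AirbyteMessage:
    # Wrap a per-stream size estimate in a TRACE message, per the protocol change above.
    return AirbyteMessage(
        type=MessageType.TRACE,
        trace=AirbyteTraceMessage(
            type=TraceType.ESTIMATE,
            emitted_at=time.time() * 1000,  # the protocol expects milliseconds
            estimate=AirbyteEstimateTraceMessage(
                name=stream_name,
                type=EstimateType.STREAM,
                row_estimate=rows,
                byte_estimate=size_bytes,
            ),
        ),
    )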
80 changes: 33 additions & 47 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
@@ -4,15 +4,12 @@

import logging
from abc import ABC, abstractmethod
from datetime import datetime
from functools import lru_cache
from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union

from airbyte_cdk.models import (
    AirbyteCatalog,
    AirbyteConnectionStatus,
    AirbyteMessage,
    AirbyteRecordMessage,
    AirbyteStateMessage,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
@@ -24,8 +21,8 @@
from airbyte_cdk.sources.source import Source
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.http import HttpStream
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config
from airbyte_cdk.sources.utils.transform import TypeTransformer
from airbyte_cdk.utils.event_timing import create_timer
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

@@ -241,20 +238,27 @@ def _read_incremental(
                stream_state=stream_state,
                cursor_field=configured_stream.cursor_field or None,
            )
            for record_counter, record_data in enumerate(records, start=1):
                yield self._as_airbyte_record(stream_name, record_data)
                stream_state = stream_instance.get_updated_state(stream_state, record_data)
                checkpoint_interval = stream_instance.state_checkpoint_interval
                if checkpoint_interval and record_counter % checkpoint_interval == 0:
                    yield self._checkpoint_state(stream_instance, stream_state, state_manager)

                total_records_counter += 1
                # This functionality should ideally live outside of this method
                # but since state is managed inside this method, we keep track
                # of it here.
                if self._limit_reached(internal_config, total_records_counter):
                    # Break from slice loop to save state and exit from _read_incremental function.
                    break
            record_counter = 0
            for message_counter, record_data_or_message in enumerate(records, start=1):
                message = stream_data_to_airbyte_message(
                    stream_name, record_data_or_message, stream_instance.transformer, stream_instance.get_json_schema()
                )
                yield message
                if message.type == MessageType.RECORD:
                    record = message.record
                    stream_state = stream_instance.get_updated_state(stream_state, record.data)
                    checkpoint_interval = stream_instance.state_checkpoint_interval
                    record_counter += 1
                    if checkpoint_interval and record_counter % checkpoint_interval == 0:
                        yield self._checkpoint_state(stream_instance, stream_state, state_manager)

                    total_records_counter += 1
                    # This functionality should ideally live outside of this method
                    # but since state is managed inside this method, we keep track
                    # of it here.
                    if self._limit_reached(internal_config, total_records_counter):
                        # Break from slice loop to save state and exit from _read_incremental function.
                        break

        yield self._checkpoint_state(stream_instance, stream_state, state_manager)
        if self._limit_reached(internal_config, total_records_counter):
@@ -277,50 +281,32 @@ def _read_full_refresh(
        total_records_counter = 0
        for _slice in slices:
            logger.debug("Processing stream slice", extra={"slice": _slice})
            records = stream_instance.read_records(
            record_data_or_messages = stream_instance.read_records(
                stream_slice=_slice,
                sync_mode=SyncMode.full_refresh,
                cursor_field=configured_stream.cursor_field,
            )
            for record in records:
                yield self._as_airbyte_record(configured_stream.stream.name, record)
                total_records_counter += 1
                if self._limit_reached(internal_config, total_records_counter):
                    return
            for record_data_or_message in record_data_or_messages:
                message = stream_data_to_airbyte_message(
                    stream_instance.name, record_data_or_message, stream_instance.transformer, stream_instance.get_json_schema()
                )
                yield message
                if message.type == MessageType.RECORD:
                    total_records_counter += 1
                    if self._limit_reached(internal_config, total_records_counter):
                        return

    def _checkpoint_state(self, stream: Stream, stream_state, state_manager: ConnectorStateManager):
        # First attempt to retrieve the current state using the stream's state property. We receive an AttributeError if the state
        # property is not implemented by the stream instance and as a fallback, use the stream_state retrieved from the stream
        # instance's deprecated get_updated_state() method.
        try:
            state_manager.update_state_for_stream(stream.name, stream.namespace, stream.state)

        except AttributeError:
            state_manager.update_state_for_stream(stream.name, stream.namespace, stream_state)
        return state_manager.create_state_message(stream.name, stream.namespace, send_per_stream_state=self.per_stream_state_enabled)

    @lru_cache(maxsize=None)
    def _get_stream_transformer_and_schema(self, stream_name: str) -> Tuple[TypeTransformer, Mapping[str, Any]]:
        """
        Lookup stream's transform object and jsonschema based on stream name.
        This function would be called a lot so using caching to save on costly
        get_json_schema operation.
        :param stream_name name of stream from catalog.
        :return tuple with stream transformer object and discover json schema.
        """
        stream_instance = self._stream_to_instance_map[stream_name]
        return stream_instance.transformer, stream_instance.get_json_schema()

    def _as_airbyte_record(self, stream_name: str, data: Mapping[str, Any]):
        now_millis = int(datetime.now().timestamp() * 1000)
        transformer, schema = self._get_stream_transformer_and_schema(stream_name)
        # Transform object fields according to config. Most likely you will
        # need it to normalize values against json schema. By default no action
        # taken unless configured. See
        # docs/connector-development/cdk-python/schemas.md for details.
        transformer.transform(data, schema)  # type: ignore
        message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis)
        return AirbyteMessage(type=MessageType.RECORD, record=message)

    @staticmethod
    def _apply_log_level_to_stream_logger(logger: logging.Logger, stream_instance: Stream):
        """
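
Both read paths now route every yielded item through stream_data_to_airbyte_message (imported above from airbyte_cdk/sources/utils/record_helper.py), so a stream may emit plain mappings alongside ready-made protocol messages. A rough sketch of the dispatch this implies, not the CDK's exact implementation:

import time
from typing import Any, Mapping

from airbyte_cdk.models import (
    AirbyteLogMessage,
    AirbyteMessage,
    AirbyteRecordMessage,
    AirbyteTraceMessage,
    Type as MessageType,
)
from airbyte_cdk.sources.utils.transform import TypeTransformer


def to_airbyte_message_sketch(
    stream_name: str, data_or_message: Any, transformer: TypeTransformer, schema: Mapping[str, Any]
) -> AirbyteMessage:
    # Ready-made protocol messages pass through with the right envelope.
    if isinstance(data_or_message, AirbyteTraceMessage):
        return AirbyteMessage(type=MessageType.TRACE, trace=data_or_message)
    if isinstance(data_or_message, AirbyteLogMessage):
        return AirbyteMessage(type=MessageType.LOG, log=data_or_message)
    if isinstance(data_or_message, AirbyteRecordMessage):
        return AirbyteMessage(type=MessageType.RECORD, record=data_or_message)
    # Otherwise treat it as raw record data: normalize against the schema, then wrap it.
    data = dict(data_or_message)
    transformer.transform(data, schema)
    record = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=int(time.time() * 1000))
    return AirbyteMessage(type=MessageType.RECORD, record=record)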
@@ -33,6 +33,7 @@
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.page_increment import PageIncrement
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.spec import Spec
from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer
@@ -75,6 +76,7 @@
"RemoveFields": RemoveFields,
"SimpleRetriever": SimpleRetriever,
"SingleSlice": SingleSlice,
"Spec": Spec,
"SubstreamSlicer": SubstreamSlicer,
"WaitUntilTimeFromHeader": WaitUntilTimeFromHeaderBackoffStrategy,
"WaitTimeFromHeader": WaitTimeFromHeaderBackoffStrategy,
@@ -246,7 +246,11 @@ def is_object_definition_with_class_name(definition):

    @staticmethod
    def is_object_definition_with_type(definition):
        # The `type` field is an overloaded term in the context of the low-code manifest. As part of the language, `type` is shorthand
        # for convenience to avoid defining the entire classpath. For the connector specification, `type` is a part of the spec schema.
        # For spec parsing, as part of this check, when the type is set to object, we want it to remain a mapping. But when type is
        # defined any other way, then it should be parsed as a declarative component in the manifest.
        return isinstance(definition, dict) and "type" in definition and definition["type"] != "object"

    @staticmethod
    def get_default_type(parameter_name, parent_class):
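
The `object` carve-out matters because `type` appears in two vocabularies: as component shorthand in the manifest language, and as a plain JSON-schema keyword inside a spec's connection_specification. A standalone illustration of the predicate (the example dicts are invented):

def is_object_definition_with_type(definition):
    return isinstance(definition, dict) and "type" in definition and definition["type"] != "object"


# `type` as classpath shorthand: should be parsed as a declarative component.
component = {"type": "RemoveFields", "field_pointers": [["_ab_internal"]]}
assert is_object_definition_with_type(component)

# `type: object` inside a spec's JSON schema: must remain a plain mapping.
schema_fragment = {"type": "object", "properties": {"api_key": {"type": "string"}}}
assert not is_object_definition_with_type(schema_fragment)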
@@ -0,0 +1,7 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.spec.spec import Spec

__all__ = ["Spec"]
34 changes: 34 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/declarative/spec/spec.py
@@ -0,0 +1,34 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass
from typing import Any, Mapping

from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class Spec(JsonSchemaMixin):
    """
    Returns a connection specification made up of information about the connector and how it can be configured

    Attributes:
        documentation_url (str): The link the Airbyte documentation about this connector
        connection_specification (Mapping[str, Any]): information related to how a connector can be configured
    """

    documentation_url: str
    connection_specification: Mapping[str, Any]
    options: InitVar[Mapping[str, Any]]

    def generate_spec(self) -> ConnectorSpecification:
        """
        Returns the connector specification according the spec block defined in the low code connector manifest.
        """

        # We remap these keys to camel case because that's the existing format expected by the rest of the platform
        return ConnectorSpecification.parse_obj(
            {"documentationUrl": self.documentation_url, "connectionSpecification": self.connection_specification}
        )
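
A quick sketch of the new component in isolation, with invented config values:

from airbyte_cdk.sources.declarative.spec import Spec

spec = Spec(
    documentation_url="https://docs.airbyte.com/integrations/sources/example",
    connection_specification={
        "type": "object",
        "required": ["api_key"],
        "properties": {"api_key": {"type": "string", "airbyte_secret": True}},
    },
    options={},
)

# generate_spec() remaps the snake_case fields to the camelCase the platform expects.
connector_spec = spec.generate_spec()
print(connector_spec.documentationUrl)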
@@ -5,7 +5,6 @@
import datetime
import re
from dataclasses import InitVar, dataclass, field
from dateutil.relativedelta import relativedelta
from typing import Any, Iterable, Mapping, Optional, Union

from airbyte_cdk.models import SyncMode
@@ -17,6 +16,7 @@
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
from dataclasses_jsonschema import JsonSchemaMixin
from dateutil.relativedelta import relativedelta


@dataclass
@@ -71,7 +71,9 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin):
    stream_state_field_end: Optional[str] = None
    lookback_window: Optional[Union[InterpolatedString, str]] = None

    timedelta_regex = re.compile(r"((?P<years>[\.\d]+?)y)?" r"((?P<months>[\.\d]+?)m)?" r"((?P<weeks>[\.\d]+?)w)?" r"((?P<days>[\.\d]+?)d)?$")
    timedelta_regex = re.compile(
        r"((?P<years>[\.\d]+?)y)?" r"((?P<months>[\.\d]+?)m)?" r"((?P<weeks>[\.\d]+?)w)?" r"((?P<days>[\.\d]+?)d)?$"
    )

    def __post_init__(self, options: Mapping[str, Any]):
        if not isinstance(self.start_datetime, MinMaxDatetime):
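
For context, the duration grammar this (merely reflowed) regex accepts is compact strings such as "1y6m2w3d", with each unit optional; the years and months groups are what back the monthly and yearly step sizes noted in the 0.6.0 changelog entry. A standalone check:

import re

timedelta_regex = re.compile(
    r"((?P<years>[\.\d]+?)y)?" r"((?P<months>[\.\d]+?)m)?" r"((?P<weeks>[\.\d]+?)w)?" r"((?P<days>[\.\d]+?)d)?$"
)

match = timedelta_regex.match("1y6m2w3d")
print({unit: value for unit, value in match.groupdict().items() if value})
# {'years': '1', 'months': '6', 'weeks': '2', 'days': '3'}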
@@ -11,6 +11,7 @@
from enum import Enum, EnumMeta
from typing import Any, List, Mapping, Union

from airbyte_cdk.models import ConnectorSpecification
from airbyte_cdk.sources.declarative.checks import CheckStream
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
@@ -33,7 +34,7 @@ class ConcreteDeclarativeSource(JsonSchemaMixin):
class YamlDeclarativeSource(DeclarativeSource):
    """Declarative source defined by a yaml file"""

    VALID_TOP_LEVEL_FIELDS = {"definitions", "streams", "check", "version"}
    VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "spec", "streams", "version"}

    def __init__(self, path_to_yaml):
        """
@@ -69,6 +70,28 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
            self._apply_log_level_to_stream_logger(self.logger, stream)
        return source_streams

    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
        """
        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
        in the project root.
        """

        self.logger.debug(
            "parsed YAML into declarative source",
            extra={"path_to_yaml_file": self._path_to_yaml, "source_name": self.name, "parsed_config": json.dumps(self._source_config)},
        )

        spec = self._source_config.get("spec")
        if spec:
            if "class_name" not in spec:
                spec["class_name"] = "airbyte_cdk.sources.declarative.spec.Spec"
            spec_component = self._factory.create_component(spec, dict())()
            return spec_component.generate_spec()
        else:
            return super().spec(logger)

    def _read_and_parse_yaml_file(self, path_to_yaml_file):
        package = self.__class__.__module__.split(".")[0]

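
Put together, a manifest can now carry its spec inline instead of shipping a separate spec.yaml. A hypothetical end-to-end use; the path and manifest content are illustrative, and note the source actually resolves the yaml relative to the connector package via _read_and_parse_yaml_file:

import logging

from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource

# The manifest would carry a top-level block like:
#
#   spec:
#     documentation_url: https://docs.airbyte.com/integrations/sources/example
#     connection_specification:
#       type: object
#       properties:
#         api_key: {type: string, airbyte_secret: true}
#
source = YamlDeclarativeSource("manifest.yaml")
connector_spec = source.spec(logging.getLogger("airbyte"))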
15 changes: 13 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/core.py
@@ -6,14 +6,24 @@
import inspect
import logging
from abc import ABC, abstractmethod
from functools import lru_cache
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import airbyte_cdk.sources.utils.casing as casing
from airbyte_cdk.models import AirbyteStream, SyncMode
from airbyte_cdk.models import AirbyteLogMessage, AirbyteRecordMessage, AirbyteStream, AirbyteTraceMessage, SyncMode

# list of all possible HTTP methods which can be used for sending of request bodies
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from deprecated.classic import deprecated

# A stream's read method can return one of the following types:
# Mapping[str, Any]: The content of an AirbyteRecordMessage
# AirbyteRecordMessage: An AirbyteRecordMessage
# AirbyteLogMessage: A log message
# AirbyteTraceMessage: A trace message
StreamData = Union[Mapping[str, Any], AirbyteRecordMessage, AirbyteLogMessage, AirbyteTraceMessage]


def package_name_from_class(cls: object) -> str:
"""Find the package name given a class name"""
@@ -94,11 +104,12 @@ def read_records(
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Mapping[str, Any]]:
    ) -> Iterable[StreamData]:
        """
        This method should be overridden by subclasses to read records based on the inputs
        """

    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        :return: A dict of the JSON schema representing this stream.
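
With the return type widened to StreamData, a stream's read_records may interleave raw record mappings with protocol messages, and the source wraps each one appropriately. A hedged sketch (ExampleStream and its data are invented):

from typing import Iterable

from airbyte_cdk.models import AirbyteLogMessage, Level, SyncMode
from airbyte_cdk.sources.streams.core import Stream, StreamData


class ExampleStream(Stream):
    primary_key = None

    def read_records(
        self, sync_mode: SyncMode, cursor_field=None, stream_slice=None, stream_state=None
    ) -> Iterable[StreamData]:
        # A plain mapping: wrapped into an AirbyteRecordMessage by the source.
        yield {"id": 1, "name": "first"}
        # A protocol message: passed through to the destination as-is.
        yield AirbyteLogMessage(level=Level.INFO, message="fetched page 1")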
4 changes: 3 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/sources/utils/__init__.py
@@ -1,5 +1,7 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

# Initialize Utils Package

__all__ = ["record_helper"]