Skip to content

Commit

Permalink
[Connector Builder Server] add endpoint to resolve $refs and $options (
Browse files Browse the repository at this point in the history
  • Loading branch information
clnoll authored Jan 23, 2023
1 parent 1e42895 commit eb2d980
Show file tree
Hide file tree
Showing 7 changed files with 340 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

from connector_builder.generated.models.invalid_input_exception_info import InvalidInputExceptionInfo
from connector_builder.generated.models.known_exception_info import KnownExceptionInfo
from connector_builder.generated.models.resolve_manifest import ResolveManifest
from connector_builder.generated.models.resolve_manifest_request_body import ResolveManifestRequestBody
from connector_builder.generated.models.stream_read import StreamRead
from connector_builder.generated.models.stream_read_request_body import StreamReadRequestBody
from connector_builder.generated.models.streams_list_read import StreamsListRead
Expand Down Expand Up @@ -65,6 +67,15 @@ async def read_stream(
Reads a specific stream in the source. TODO in a later phase - only read a single slice of data.
"""

@abstractmethod
async def resolve_manifest(
self,
resolve_manifest_request_body: ResolveManifestRequestBody = Body(None, description=""),
) -> ResolveManifest:
"""
Given a JSON manifest, returns a JSON manifest with all of the $refs and $options resolved and flattened
"""


def _assert_signature_is_set(method: Callable) -> None:
"""
Expand Down Expand Up @@ -143,5 +154,20 @@ def initialize_router(api: DefaultApi) -> APIRouter:
response_model_by_alias=True,
)

_assert_signature_is_set(api.resolve_manifest)
router.add_api_route(
"/v1/manifest/resolve",
endpoint=api.resolve_manifest,
methods=["POST"],
responses={
200: {"model": ResolveManifest, "description": "Successful operation"},
400: {"model": KnownExceptionInfo, "description": "Exception occurred; see message for details."},
422: {"model": InvalidInputExceptionInfo, "description": "Input failed validation"},
},
tags=["default"],
summary="Given a JSON manifest, returns a JSON manifest with all of the $refs and $options resolved and flattened",
response_model_by_alias=True,
)


return router
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# coding: utf-8

from __future__ import annotations
from datetime import date, datetime # noqa: F401

import re # noqa: F401
from typing import Any, Dict, List, Optional # noqa: F401

from pydantic import AnyUrl, BaseModel, EmailStr, validator # noqa: F401


class ResolveManifest(BaseModel):
"""NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
Do not edit the class manually.
ResolveManifest - a model defined in OpenAPI
manifest: The manifest of this ResolveManifest.
"""

manifest: Dict[str, Any]

ResolveManifest.update_forward_refs()
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# coding: utf-8

from __future__ import annotations
from datetime import date, datetime # noqa: F401

import re # noqa: F401
from typing import Any, Dict, List, Optional # noqa: F401

from pydantic import AnyUrl, BaseModel, EmailStr, validator # noqa: F401


class ResolveManifestRequestBody(BaseModel):
"""NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
Do not edit the class manually.
ResolveManifestRequestBody - a model defined in OpenAPI
manifest: The manifest of this ResolveManifestRequestBody.
"""

manifest: Dict[str, Any]

ResolveManifestRequestBody.update_forward_refs()
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@
from urllib.parse import parse_qs, urljoin, urlparse

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Type
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.utils.schema_inferrer import SchemaInferrer
from connector_builder.generated.apis.default_api_interface import DefaultApi
from connector_builder.generated.models.http_request import HttpRequest
from connector_builder.generated.models.http_response import HttpResponse
from connector_builder.generated.models.resolve_manifest import ResolveManifest
from connector_builder.generated.models.resolve_manifest_request_body import ResolveManifestRequestBody
from connector_builder.generated.models.stream_read import StreamRead
from connector_builder.generated.models.stream_read_pages import StreamReadPages
from connector_builder.generated.models.stream_read_request_body import StreamReadRequestBody
Expand Down Expand Up @@ -104,7 +107,8 @@ async def list_streams(self, streams_list_request_body: StreamsListRequestBody =
)
except Exception as error:
self.logger.error(
f"Could not list streams with with error: {error.args[0]} - {DefaultApiImpl._get_stacktrace_as_string(error)}")
f"Could not list streams with with error: {error.args[0]} - {DefaultApiImpl._get_stacktrace_as_string(error)}"
)
raise HTTPException(status_code=400, detail=f"Could not list streams with with error: {error.args[0]}")
return StreamsListRead(streams=stream_list_read)

Expand All @@ -128,9 +132,9 @@ async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Bo
log_messages = []
try:
for message_group in self._get_message_groups(
adapter.read_stream(stream_read_request_body.stream, stream_read_request_body.config),
schema_inferrer,
record_limit,
adapter.read_stream(stream_read_request_body.stream, stream_read_request_body.config),
schema_inferrer,
record_limit,
):
if isinstance(message_group, AirbyteLogMessage):
log_messages.append({"message": message_group.message})
Expand All @@ -144,11 +148,34 @@ async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Bo
detail=f"Could not perform read with with error: {error.args[0]}",
)

return StreamRead(logs=log_messages, slices=[single_slice],
inferred_schema=schema_inferrer.get_stream_schema(stream_read_request_body.stream))
return StreamRead(
logs=log_messages, slices=[single_slice], inferred_schema=schema_inferrer.get_stream_schema(stream_read_request_body.stream)
)

async def resolve_manifest(
self, resolve_manifest_request_body: ResolveManifestRequestBody = Body(None, description="")
) -> ResolveManifest:
"""
Using the provided manifest, resolves $refs and $options and returns the resulting manifest to the client.
:param manifest_resolve_request_body: Input manifest whose $refs and $options will be resolved
:return: Airbyte record messages produced by the sync grouped by slice and page
"""
try:
return ResolveManifest(
manifest=ManifestDeclarativeSource(
resolve_manifest_request_body.manifest, construct_using_pydantic_models=True
).resolved_manifest
)
except Exception as error:
self.logger.error(f"Could not resolve manifest with error: {error.args[0]} - {self._get_stacktrace_as_string(error)}")
raise HTTPException(
status_code=400,
detail=f"Could not resolve manifest with error: {error.args[0]}",
)

def _get_message_groups(self, messages: Iterator[AirbyteMessage], schema_inferrer: SchemaInferrer, limit: int) -> Iterable[
Union[StreamReadPages, AirbyteLogMessage]]:
def _get_message_groups(
self, messages: Iterator[AirbyteMessage], schema_inferrer: SchemaInferrer, limit: int
) -> Iterable[Union[StreamReadPages, AirbyteLogMessage]]:
"""
Message groups are partitioned according to when request log messages are received. Subsequent response log messages
and record messages belong to the prior request log message and when we encounter another request, append the latest
Expand Down
2 changes: 1 addition & 1 deletion airbyte-connector-builder-server/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
},
packages=find_packages(exclude=("unit_tests", "integration_tests", "docs")),
package_data={},
install_requires=["airbyte-cdk==0.21.0", "fastapi", "uvicorn"],
install_requires=["airbyte-cdk==0.22", "fastapi", "uvicorn"],
python_requires=">=3.9.11",
extras_require={
"tests": [
Expand Down
37 changes: 37 additions & 0 deletions airbyte-connector-builder-server/src/main/openapi/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,27 @@ paths:
schema:
type: string
description: Connector manifest template string
/v1/manifest/resolve:
post:
summary: Given a JSON manifest, returns a JSON manifest with all of the $refs and $options resolved and flattened
operationId: resolveManifest
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/ResolveManifestRequestBody"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/ResolveManifest"
"400":
$ref: "#/components/responses/ExceptionResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"

components:
schemas:
Expand Down Expand Up @@ -237,6 +258,22 @@ components:
# description: list of slices that will be retrieved for this stream
# items:
# type: object
ResolveManifestRequestBody:
type: object
required:
- manifest
properties:
manifest:
type: object
description: The config-based connector manifest contents
ResolveManifest:
type: object
required:
- manifest
properties:
manifest:
type: object
description: The config-based connector manifest contents with $refs and $options resolved

# The following exception structs were copied from airbyte-api/src/main/openapi/config.yaml
InvalidInputProperty:
Expand Down
Loading

0 comments on commit eb2d980

Please sign in to comment.