diff --git a/airbyte-connector-builder-server/connector_builder/generated/apis/default_api_interface.py b/airbyte-connector-builder-server/connector_builder/generated/apis/default_api_interface.py index 70e031d41df9..1af2b51cff7e 100644 --- a/airbyte-connector-builder-server/connector_builder/generated/apis/default_api_interface.py +++ b/airbyte-connector-builder-server/connector_builder/generated/apis/default_api_interface.py @@ -27,6 +27,8 @@ from connector_builder.generated.models.invalid_input_exception_info import InvalidInputExceptionInfo from connector_builder.generated.models.known_exception_info import KnownExceptionInfo +from connector_builder.generated.models.resolve_manifest import ResolveManifest +from connector_builder.generated.models.resolve_manifest_request_body import ResolveManifestRequestBody from connector_builder.generated.models.stream_read import StreamRead from connector_builder.generated.models.stream_read_request_body import StreamReadRequestBody from connector_builder.generated.models.streams_list_read import StreamsListRead @@ -65,6 +67,15 @@ async def read_stream( Reads a specific stream in the source. TODO in a later phase - only read a single slice of data. 
""" + @abstractmethod + async def resolve_manifest( + self, + resolve_manifest_request_body: ResolveManifestRequestBody = Body(None, description=""), + ) -> ResolveManifest: + """ + Given a JSON manifest, returns a JSON manifest with all of the $refs and $options resolved and flattened + """ + def _assert_signature_is_set(method: Callable) -> None: """ @@ -143,5 +154,20 @@ def initialize_router(api: DefaultApi) -> APIRouter: response_model_by_alias=True, ) + _assert_signature_is_set(api.resolve_manifest) + router.add_api_route( + "/v1/manifest/resolve", + endpoint=api.resolve_manifest, + methods=["POST"], + responses={ + 200: {"model": ResolveManifest, "description": "Successful operation"}, + 400: {"model": KnownExceptionInfo, "description": "Exception occurred; see message for details."}, + 422: {"model": InvalidInputExceptionInfo, "description": "Input failed validation"}, + }, + tags=["default"], + summary="Given a JSON manifest, returns a JSON manifest with all of the $refs and $options resolved and flattened", + response_model_by_alias=True, + ) + return router diff --git a/airbyte-connector-builder-server/connector_builder/generated/models/resolve_manifest.py b/airbyte-connector-builder-server/connector_builder/generated/models/resolve_manifest.py new file mode 100644 index 000000000000..81bcc339a537 --- /dev/null +++ b/airbyte-connector-builder-server/connector_builder/generated/models/resolve_manifest.py @@ -0,0 +1,24 @@ +# coding: utf-8 + +from __future__ import annotations +from datetime import date, datetime # noqa: F401 + +import re # noqa: F401 +from typing import Any, Dict, List, Optional # noqa: F401 + +from pydantic import AnyUrl, BaseModel, EmailStr, validator # noqa: F401 + + +class ResolveManifest(BaseModel): + """NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + + Do not edit the class manually. + + ResolveManifest - a model defined in OpenAPI + + manifest: The manifest of this ResolveManifest. 
+ """ + + manifest: Dict[str, Any] + +ResolveManifest.update_forward_refs() diff --git a/airbyte-connector-builder-server/connector_builder/generated/models/resolve_manifest_request_body.py b/airbyte-connector-builder-server/connector_builder/generated/models/resolve_manifest_request_body.py new file mode 100644 index 000000000000..91aded8ec523 --- /dev/null +++ b/airbyte-connector-builder-server/connector_builder/generated/models/resolve_manifest_request_body.py @@ -0,0 +1,24 @@ +# coding: utf-8 + +from __future__ import annotations +from datetime import date, datetime # noqa: F401 + +import re # noqa: F401 +from typing import Any, Dict, List, Optional # noqa: F401 + +from pydantic import AnyUrl, BaseModel, EmailStr, validator # noqa: F401 + + +class ResolveManifestRequestBody(BaseModel): + """NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + + Do not edit the class manually. + + ResolveManifestRequestBody - a model defined in OpenAPI + + manifest: The manifest of this ResolveManifestRequestBody. 
+ """ + + manifest: Dict[str, Any] + +ResolveManifestRequestBody.update_forward_refs() diff --git a/airbyte-connector-builder-server/connector_builder/impl/default_api.py b/airbyte-connector-builder-server/connector_builder/impl/default_api.py index ef7ad588c3e7..973defc09c41 100644 --- a/airbyte-connector-builder-server/connector_builder/impl/default_api.py +++ b/airbyte-connector-builder-server/connector_builder/impl/default_api.py @@ -10,10 +10,13 @@ from urllib.parse import parse_qs, urljoin, urlparse from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Type +from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource from airbyte_cdk.utils.schema_inferrer import SchemaInferrer from connector_builder.generated.apis.default_api_interface import DefaultApi from connector_builder.generated.models.http_request import HttpRequest from connector_builder.generated.models.http_response import HttpResponse +from connector_builder.generated.models.resolve_manifest import ResolveManifest +from connector_builder.generated.models.resolve_manifest_request_body import ResolveManifestRequestBody from connector_builder.generated.models.stream_read import StreamRead from connector_builder.generated.models.stream_read_pages import StreamReadPages from connector_builder.generated.models.stream_read_request_body import StreamReadRequestBody @@ -104,7 +107,8 @@ async def list_streams(self, streams_list_request_body: StreamsListRequestBody = ) except Exception as error: self.logger.error( - f"Could not list streams with with error: {error.args[0]} - {DefaultApiImpl._get_stacktrace_as_string(error)}") + f"Could not list streams with with error: {error.args[0]} - {DefaultApiImpl._get_stacktrace_as_string(error)}" + ) raise HTTPException(status_code=400, detail=f"Could not list streams with with error: {error.args[0]}") return StreamsListRead(streams=stream_list_read) @@ -128,9 +132,9 @@ async def read_stream(self, 
stream_read_request_body: StreamReadRequestBody = Bo log_messages = [] try: for message_group in self._get_message_groups( - adapter.read_stream(stream_read_request_body.stream, stream_read_request_body.config), - schema_inferrer, - record_limit, + adapter.read_stream(stream_read_request_body.stream, stream_read_request_body.config), + schema_inferrer, + record_limit, ): if isinstance(message_group, AirbyteLogMessage): log_messages.append({"message": message_group.message}) @@ -144,11 +148,34 @@ async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Bo detail=f"Could not perform read with with error: {error.args[0]}", ) - return StreamRead(logs=log_messages, slices=[single_slice], - inferred_schema=schema_inferrer.get_stream_schema(stream_read_request_body.stream)) + return StreamRead( + logs=log_messages, slices=[single_slice], inferred_schema=schema_inferrer.get_stream_schema(stream_read_request_body.stream) + ) + + async def resolve_manifest( + self, resolve_manifest_request_body: ResolveManifestRequestBody = Body(None, description="") + ) -> ResolveManifest: + """ + Using the provided manifest, resolves $refs and $options and returns the resulting manifest to the client. 
+ :param resolve_manifest_request_body: Input manifest whose $refs and $options will be resolved + :return: The input manifest with all of its $refs and $options resolved + """ + try: + return ResolveManifest( + manifest=ManifestDeclarativeSource( + resolve_manifest_request_body.manifest, construct_using_pydantic_models=True + ).resolved_manifest + ) + except Exception as error: + self.logger.error(f"Could not resolve manifest with error: {error.args[0]} - {self._get_stacktrace_as_string(error)}") + raise HTTPException( + status_code=400, + detail=f"Could not resolve manifest with error: {error.args[0]}", + ) - def _get_message_groups(self, messages: Iterator[AirbyteMessage], schema_inferrer: SchemaInferrer, limit: int) -> Iterable[ - Union[StreamReadPages, AirbyteLogMessage]]: + def _get_message_groups( + self, messages: Iterator[AirbyteMessage], schema_inferrer: SchemaInferrer, limit: int + ) -> Iterable[Union[StreamReadPages, AirbyteLogMessage]]: """ Message groups are partitioned according to when request log messages are received. 
Subsequent response log messages and record messages belong to the prior request log message and when we encounter another request, append the latest diff --git a/airbyte-connector-builder-server/setup.py b/airbyte-connector-builder-server/setup.py index e25a3dab7226..d50fed19aee4 100644 --- a/airbyte-connector-builder-server/setup.py +++ b/airbyte-connector-builder-server/setup.py @@ -41,7 +41,7 @@ }, packages=find_packages(exclude=("unit_tests", "integration_tests", "docs")), package_data={}, - install_requires=["airbyte-cdk==0.21.0", "fastapi", "uvicorn"], + install_requires=["airbyte-cdk==0.22", "fastapi", "uvicorn"], python_requires=">=3.9.11", extras_require={ "tests": [ diff --git a/airbyte-connector-builder-server/src/main/openapi/openapi.yaml b/airbyte-connector-builder-server/src/main/openapi/openapi.yaml index 61a828e3fbf0..a104000d0fcb 100644 --- a/airbyte-connector-builder-server/src/main/openapi/openapi.yaml +++ b/airbyte-connector-builder-server/src/main/openapi/openapi.yaml @@ -69,6 +69,27 @@ paths: schema: type: string description: Connector manifest template string + /v1/manifest/resolve: + post: + summary: Given a JSON manifest, returns a JSON manifest with all of the $refs and $options resolved and flattened + operationId: resolveManifest + requestBody: + content: + application/json: + schema: + $ref: "#/components/schemas/ResolveManifestRequestBody" + required: true + responses: + "200": + description: Successful operation + content: + application/json: + schema: + $ref: "#/components/schemas/ResolveManifest" + "400": + $ref: "#/components/responses/ExceptionResponse" + "422": + $ref: "#/components/responses/InvalidInputResponse" components: schemas: @@ -237,6 +258,22 @@ components: # description: list of slices that will be retrieved for this stream # items: # type: object + ResolveManifestRequestBody: + type: object + required: + - manifest + properties: + manifest: + type: object + description: The config-based connector manifest contents + 
ResolveManifest: + type: object + required: + - manifest + properties: + manifest: + type: object + description: The config-based connector manifest contents with $refs and $options resolved # The following exception structs were copied from airbyte-api/src/main/openapi/config.yaml InvalidInputProperty: diff --git a/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_default_api.py b/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_default_api.py index d9bbf6a07fa1..4dead48f3dad 100644 --- a/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_default_api.py +++ b/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_default_api.py @@ -11,6 +11,8 @@ from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteRecordMessage, Level, Type from connector_builder.generated.models.http_request import HttpRequest from connector_builder.generated.models.http_response import HttpResponse +from connector_builder.generated.models.resolve_manifest import ResolveManifest +from connector_builder.generated.models.resolve_manifest_request_body import ResolveManifestRequestBody from connector_builder.generated.models.stream_read import StreamRead from connector_builder.generated.models.stream_read_pages import StreamReadPages from connector_builder.generated.models.stream_read_request_body import StreamReadRequestBody @@ -623,6 +625,197 @@ def test_create_response_from_log_message(log_message, expected_response): assert actual_response == expected_response +def test_resolve_manifest(): + _stream_name = "stream_with_custom_requester" + _stream_primary_key = "id" + _stream_url_base = "https://api.sendgrid.com" + _stream_options = {"name": _stream_name, "primary_key": _stream_primary_key, "url_base": _stream_url_base} + + manifest = { + "version": "version", + "definitions": { + "schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name 
}}.yaml"}, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, + "request_parameters": {"page_size": 10}, + }, + "record_selector": {"extractor": {"field_pointer": ["result"]}}, + }, + }, + "streams": [ + { + "type": "DeclarativeStream", + "$options": _stream_options, + "schema_loader": {"$ref": "*ref(definitions.schema_loader)"}, + "retriever": "*ref(definitions.retriever)", + }, + ], + "check": {"type": "CheckStream", "stream_names": ["lists"]}, + } + + expected_resolved_manifest = { + "type": "DeclarativeSource", + "version": "version", + "definitions": { + "schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"}, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, + "request_parameters": {"page_size": 10}, + }, + "record_selector": {"extractor": {"field_pointer": ["result"]}}, + }, + }, + "streams": [ + { + "type": "DeclarativeStream", + "schema_loader": { + "type": "JsonFileSchemaLoader", + "name": "{{ options.stream_name }}", + "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml", + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$options": 
_stream_options, + }, + "retriever": { + "type": "SimpleRetriever", + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": { + "type": "RequestOption", + "inject_into": "request_parameter", + "field_name": "page_size", + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$options": _stream_options, + }, + "page_token_option": { + "type": "RequestOption", + "inject_into": "path", + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$options": _stream_options, + }, + "pagination_strategy": { + "type": "CursorPagination", + "cursor_value": "{{ response._metadata.next }}", + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$options": _stream_options, + }, + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$options": _stream_options, + }, + "requester": { + "type": "HttpRequester", + "path": "/v3/marketing/lists", + "authenticator": { + "type": "BearerAuthenticator", + "api_token": "{{ config.apikey }}", + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$options": _stream_options, + }, + "request_parameters": {"page_size": 10}, + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$options": _stream_options, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": { + "type": "DpathExtractor", + "field_pointer": ["result"], + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$options": _stream_options, + }, + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$options": _stream_options, + }, + "name": _stream_name, + "primary_key": _stream_primary_key, + "url_base": _stream_url_base, + "$options": _stream_options, + }, + "name": _stream_name, + "primary_key": 
_stream_primary_key, + "url_base": _stream_url_base, + "$options": _stream_options, + }, + ], + "check": {"type": "CheckStream", "stream_names": ["lists"]}, + } + + api = DefaultApiImpl(LowCodeSourceAdapter) + + loop = asyncio.get_event_loop() + actual_response: ResolveManifest = loop.run_until_complete(api.resolve_manifest(ResolveManifestRequestBody(manifest=manifest))) + assert actual_response.manifest == expected_resolved_manifest + + +def test_resolve_manifest_unresolvable_references(): + expected_status_code = 400 + + invalid_manifest = { + "version": "version", + "definitions": {}, + "streams": [ + {"type": "DeclarativeStream", "retriever": "*ref(definitions.retriever)"}, + ], + "check": {"type": "CheckStream", "stream_names": ["lists"]}, + } + + api = DefaultApiImpl(LowCodeSourceAdapter) + loop = asyncio.get_event_loop() + with pytest.raises(HTTPException) as actual_exception: + loop.run_until_complete(api.resolve_manifest(ResolveManifestRequestBody(manifest=invalid_manifest))) + + assert "Undefined reference *ref(definitions.retriever)" in actual_exception.value.detail + assert actual_exception.value.status_code == expected_status_code + + +def test_resolve_manifest_invalid(): + expected_status_code = 400 + invalid_manifest = {"version": "version"} + + api = DefaultApiImpl(LowCodeSourceAdapter) + loop = asyncio.get_event_loop() + with pytest.raises(HTTPException) as actual_exception: + loop.run_until_complete(api.resolve_manifest(ResolveManifestRequestBody(manifest=invalid_manifest))) + + assert "Could not resolve manifest with error" in actual_exception.value.detail + assert actual_exception.value.status_code == expected_status_code + + def make_mock_adapter_cls(return_value: Iterator) -> MagicMock: mock_source_adapter_cls = MagicMock() mock_source_adapter = MagicMock()