Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

πŸ™ octavia-cli: implement generate command #10132

Merged
merged 11 commits into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions octavia-cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ We welcome community contributions!

| Date | Milestone |
|------------|-------------------------------------|
| 2022-02-06 | Implement `octavia generate`|
| 2022-01-25 | Implement `octavia init` + some context checks|
| 2022-01-19 | Implement `octavia list workspace sources`, `octavia list workspace destinations`, `octavia list workspace connections`|
| 2022-01-17 | Implement `octavia list connectors source` and `octavia list connectors destinations`|
Expand Down
4 changes: 4 additions & 0 deletions octavia-cli/octavia_cli/check_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ class WorkspaceIdError(click.ClickException):
pass


class ProjectNotInitializedError(click.ClickException):
pass


def check_api_health(api_client: airbyte_api_client.ApiClient) -> None:
"""Check if the Airbyte API is network reachable and healthy.

Expand Down
8 changes: 2 additions & 6 deletions octavia-cli/octavia_cli/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
from airbyte_api_client.api import workspace_api

from .check_context import check_api_health, check_is_initialized, check_workspace_exists
from .generate import commands as generate_commands
from .init import commands as init_commands
from .list import commands as list_commands

AVAILABLE_COMMANDS: List[click.Command] = [list_commands._list, init_commands.init]
AVAILABLE_COMMANDS: List[click.Command] = [list_commands._list, init_commands.init, generate_commands.generate]


@click.group()
Expand Down Expand Up @@ -65,11 +66,6 @@ def _import() -> None:
raise click.ClickException("The import command is not yet implemented.")


@octavia.command(help="Generate a YAML configuration file to manage a resource.")
def create() -> None:
raise click.ClickException("The create command is not yet implemented.")


@octavia.command(help="Create or update resources according to YAML configurations.")
def apply() -> None:
raise click.ClickException("The apply command is not yet implemented.")
Expand Down
27 changes: 27 additions & 0 deletions octavia-cli/octavia_cli/generate/commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import click
import octavia_cli.generate.definitions as definitions
from octavia_cli.check_context import ProjectNotInitializedError

from .renderer import ConnectionSpecificationRenderer


@click.command(name="generate", help="Generate a YAML template for a source or a destination.")
@click.argument("definition_type", type=click.Choice(["source", "destination"]))
@click.argument("definition_id", type=click.STRING)
@click.argument("resource_name", type=click.STRING)
@click.pass_context
def generate(ctx: click.Context, definition_type: str, definition_id: str, resource_name: str):
if not ctx.obj["PROJECT_IS_INITIALIZED"]:
raise ProjectNotInitializedError(
"Your octavia project is not initialized, please run 'octavia init' before running 'octavia generate'."
)

definition = definitions.factory(definition_type, ctx.obj["API_CLIENT"], definition_id)
renderer = ConnectionSpecificationRenderer(resource_name, definition)
output_path = renderer.write_yaml(project_path=".")
message = f"βœ… - Created the specification template for {resource_name} in {output_path}."
click.echo(click.style(message, fg="green"))
131 changes: 131 additions & 0 deletions octavia-cli/octavia_cli/generate/definitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import abc
from typing import Any, Callable, Union

import airbyte_api_client
import click
from airbyte_api_client.api import (
destination_definition_api,
destination_definition_specification_api,
source_definition_api,
source_definition_specification_api,
)
from airbyte_api_client.exceptions import ApiException
from airbyte_api_client.model.destination_definition_id_request_body import DestinationDefinitionIdRequestBody
from airbyte_api_client.model.source_definition_id_request_body import SourceDefinitionIdRequestBody


class DefinitionNotFoundError(click.ClickException):
pass


class BaseDefinition(abc.ABC):
COMMON_GET_FUNCTION_KWARGS = {"_check_return_type": False}

specification = None

@property
@abc.abstractmethod
def api(
self,
): # pragma: no cover
pass

@property
@abc.abstractmethod
def type(
self,
): # pragma: no cover
pass

@property
@abc.abstractmethod
def get_function_name(
self,
): # pragma: no cover
pass

@property
def _get_fn(self) -> Callable:
return getattr(self.api, self.get_function_name)

@property
def _get_fn_kwargs(self) -> dict:
return {}

def __init__(self, api_client: airbyte_api_client.ApiClient, id: str) -> None:
self.id = id
self.api_instance = self.api(api_client)
self._api_data = self._read()

def _read(self) -> dict:
try:
return self._get_fn(self.api_instance, **self._get_fn_kwargs, **self.COMMON_GET_FUNCTION_KWARGS)
except ApiException as e:
if e.status in [422, 404]:
raise DefinitionNotFoundError(f"Definition {self.id} does not exists on your Airbyte instance.")
raise e

def __getattr__(self, name: str) -> Any:
"""Map attribute of the API response to the BaseDefinition object.

Args:
name (str): Attribute name

Raises:
AttributeError: Raised if the attributed was not found in the API response payload.

Returns:
[Any]: Attribute value
"""
if name in self._api_data:
return self._api_data.get(name)
raise AttributeError(f"{self.__class__.__name__}.{name} is invalid.")


class SourceDefinition(BaseDefinition):
api = source_definition_api.SourceDefinitionApi
type = "source"
get_function_name = "get_source_definition"

@property
def _get_fn_kwargs(self) -> dict:
return {"source_definition_id_request_body": SourceDefinitionIdRequestBody(self.id)}


class SourceDefinitionSpecification(SourceDefinition):
api = source_definition_specification_api.SourceDefinitionSpecificationApi
get_function_name = "get_source_definition_specification"


class DestinationDefinition(BaseDefinition):
api = destination_definition_api.DestinationDefinitionApi
type = "destination"
get_function_name = "get_destination_definition"

@property
def _get_fn_kwargs(self) -> dict:
return {"destination_definition_id_request_body": DestinationDefinitionIdRequestBody(self.id)}


class DestinationDefinitionSpecification(DestinationDefinition):
api = destination_definition_specification_api.DestinationDefinitionSpecificationApi
get_function_name = "get_destination_definition_specification"


def factory(
definition_type: str, api_client: airbyte_api_client.ApiClient, definition_id: str
) -> Union[SourceDefinition, DestinationDefinition]:
if definition_type == "source":
definition = SourceDefinition(api_client, definition_id)
specification = SourceDefinitionSpecification(api_client, definition_id)
elif definition_type == "destination":
definition = DestinationDefinition(api_client, definition_id)
specification = DestinationDefinitionSpecification(api_client, definition_id)
else:
raise ValueError(f"{definition_type} does not exist")
definition.specification = specification
return definition
160 changes: 160 additions & 0 deletions octavia-cli/octavia_cli/generate/renderer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import os
from typing import Any, Callable, List

from jinja2 import Environment, PackageLoader, select_autoescape

from .definitions import BaseDefinition

JINJA_ENV = Environment(loader=PackageLoader("octavia_cli"), autoescape=select_autoescape(), trim_blocks=False, lstrip_blocks=True)


class FieldToRender:
def __init__(self, name: str, required: bool, field_metadata: dict) -> None:
"""Initialize a FieldToRender instance

Args:
name (str): name of the field
required (bool): whether it's a required field or not
field_metadata (dict): metadata associated with the field
"""
self.name = name
self.required = required
self.field_metadata = field_metadata
self.one_of_values = self._get_one_of_values()
self.object_properties = get_object_fields(field_metadata)
self.array_items = self._get_array_items()
self.comment = self._build_comment(
[
self._get_secret_comment,
self._get_required_comment,
self._get_type_comment,
self._get_description_comment,
self._get_example_comment,
]
)
self.default = self._get_default()

def __getattr__(self, name: str) -> Any:
"""Map field_metadata keys to attributes of Field.

Args:
name (str): attribute name

Returns:
[Any]: attribute value
"""
if name in self.field_metadata:
return self.field_metadata.get(name)

@property
def is_array_of_objects(self) -> bool:
if self.type == "array" and self.items:
if self.items["type"] == "object":
return True
return False

def _get_one_of_values(self) -> List[List["FieldToRender"]]:
"""An object field can have multiple kind of values if it's a oneOf.
This functions returns all the possible one of values the field can take.
Returns:
[list]: List of oneof values.
"""
if not self.oneOf:
return []
one_of_values = []
for one_of_value in self.oneOf:
properties = get_object_fields(one_of_value)
one_of_values.append(properties)
return one_of_values

def _get_array_items(self) -> List["FieldToRender"]:
"""If the field is an array of objects, retrieve fields of these objects.

Returns:
[list]: List of fields
"""
if self.is_array_of_objects:
required_fields = self.items.get("required", [])
return parse_fields(required_fields, self.items["properties"])
return []

def _get_required_comment(self) -> str:
return "REQUIRED" if self.required else "OPTIONAL"

def _get_type_comment(self) -> str:
return self.type if self.type else None

def _get_secret_comment(self) -> str:
return "🀫" if self.airbyte_secret else None
alafanechere marked this conversation as resolved.
Show resolved Hide resolved

def _get_description_comment(self) -> str:
return self.description if self.description else None

def _get_example_comment(self) -> str:
example_comment = None
if self.examples:
if isinstance(self.examples, list):
if len(self.examples) > 1:
example_comment = f"Examples: {', '.join([str(example) for example in self.examples])}"
else:
example_comment = f"Example: {self.examples[0]}"
else:
example_comment = f"Example: {self.examples}"
return example_comment

def _get_default(self) -> str:
if self.const:
return self.const
return self.default

@staticmethod
def _build_comment(comment_functions: Callable) -> str:
return " | ".join(filter(None, [comment_fn() for comment_fn in comment_functions])).replace("\n", "")


def parse_fields(required_fields: List[str], fields: dict) -> List["FieldToRender"]:
return [FieldToRender(f_name, f_name in required_fields, f_metadata) for f_name, f_metadata in fields.items()]


def get_object_fields(field_metadata: dict) -> List["FieldToRender"]:
if field_metadata.get("properties"):
required_fields = field_metadata.get("required", [])
return parse_fields(required_fields, field_metadata["properties"])
return []


class ConnectionSpecificationRenderer:
alafanechere marked this conversation as resolved.
Show resolved Hide resolved
TEMPLATE = JINJA_ENV.get_template("source_or_destination.yaml.j2")

def __init__(self, resource_name: str, definition: BaseDefinition) -> None:
self.resource_name = resource_name
self.definition = definition

def _parse_connection_specification(self, schema: dict) -> List[List["FieldToRender"]]:
if schema.get("oneOf"):
roots = []
for one_of_value in schema.get("oneOf"):
required_fields = one_of_value.get("required", [])
roots.append(parse_fields(required_fields, one_of_value["properties"]))
alafanechere marked this conversation as resolved.
Show resolved Hide resolved
return roots
else:
required_fields = schema.get("required", [])
return [parse_fields(required_fields, schema["properties"])]

def _get_output_path(self, project_path: str) -> str:
return os.path.join(project_path, f"{self.definition.type}s", f"{self.resource_name}.yaml")

def write_yaml(self, project_path: str) -> str:
output_path = self._get_output_path(project_path)
parsed_schema = self._parse_connection_specification(self.definition.specification.connection_specification)
rendered = self.TEMPLATE.render(
{"resource_name": self.resource_name, "definition": self.definition, "configuration_fields": parsed_schema}
)

with open(output_path, "w") as f:
f.write(rendered)
return output_path
Loading