Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(source-gcs): added oauth flow #45414

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 114 additions & 13 deletions airbyte-integrations/connectors/source-gcs/integration_tests/spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,6 @@
"mode": "local"
},
"type": "object",
"discriminator": {
"propertyName": "mode",
"mapping": {
"local": "#/definitions/LocalProcessingConfigModel",
"api": "#/definitions/APIProcessingConfigModel"
}
},
"oneOf": [
{
"title": "Local",
Expand Down Expand Up @@ -437,12 +430,76 @@
"required": ["name", "format"]
}
},
"service_account": {
"title": "Service Account Information",
"description": "Enter your Google Cloud <a href=\"https://cloud.google.com/iam/docs/creating-managing-service-account-keys#creating_service_account_keys\">service account key</a> in JSON format",
"airbyte_secret": true,
"credentials": {
"title": "Authentication",
"description": "Credentials for connecting to the Google Cloud Storage API",
"type": "object",
"order": 0,
"type": "string"
"oneOf": [
{
"title": "Authenticate via Google (OAuth)",
"type": "object",
"properties": {
"auth_type": {
"title": "Auth Type",
"default": "Client",
"const": "Client",
"enum": ["Client"],
"type": "string"
},
"client_id": {
"title": "Client ID",
"description": "Client ID",
"airbyte_secret": true,
"type": "string"
},
"client_secret": {
"title": "Client Secret",
"description": "Client Secret",
"airbyte_secret": true,
"type": "string"
},
"access_token": {
"title": "Access Token",
"description": "Access Token",
"airbyte_secret": true,
"type": "string"
},
"refresh_token": {
"title": "Access Token",
"description": "Access Token",
"airbyte_secret": true,
"type": "string"
}
},
"required": [
"client_id",
"client_secret",
"access_token",
"refresh_token"
]
},
{
"title": "Service Account Authentication.",
"type": "object",
"properties": {
"auth_type": {
"title": "Auth Type",
"default": "Service",
"const": "Service",
"enum": ["Service"],
"type": "string"
},
"service_account": {
"title": "Service Account Information.",
"description": "Enter your Google Cloud <a href=\"https://cloud.google.com/iam/docs/creating-managing-service-account-keys#creating_service_account_keys\">service account key</a> in JSON format",
"airbyte_secret": true,
"type": "string"
}
},
"required": ["service_account"]
}
]
},
"bucket": {
"title": "Bucket",
Expand All @@ -451,6 +508,50 @@
"type": "string"
}
},
"required": ["streams", "service_account", "bucket"]
"required": ["streams", "credentials", "bucket"]
},
"advanced_auth": {
"auth_flow_type": "oauth2.0",
"predicate_key": ["credentials", "auth_type"],
"predicate_value": "Client",
"oauth_config_specification": {
"complete_oauth_output_specification": {
"type": "object",
"properties": {
"access_token": {
"type": "string",
"path_in_connector_config": ["credentials", "access_token"]
},
"refresh_token": {
"type": "string",
"path_in_connector_config": ["credentials", "refresh_token"]
}
}
},
"complete_oauth_server_input_specification": {
"type": "object",
"properties": {
"client_id": {
"type": "string"
},
"client_secret": {
"type": "string"
}
}
},
"complete_oauth_server_output_specification": {
"type": "object",
"properties": {
"client_id": {
"type": "string",
"path_in_connector_config": ["credentials", "client_id"]
},
"client_secret": {
"type": "string",
"path_in_connector_config": ["credentials", "client_secret"]
}
}
}
}
}
}
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-gcs/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 2a8c41ae-8c23-4be0-a73f-2ab10ca1a820
dockerImageTag: 0.7.4
dockerImageTag: 0.8.0
dockerRepository: airbyte/source-gcs
documentationUrl: https://docs.airbyte.com/integrations/sources/gcs
githubIssueLabel: source-gcs
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-gcs/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "0.7.4"
version = "0.8.0"
name = "source-gcs"
description = "Source implementation for Gcs."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand Down
67 changes: 53 additions & 14 deletions airbyte-integrations/connectors/source-gcs/source_gcs/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,68 @@
#


from typing import Literal, Union

from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec
from pydantic.v1 import AnyUrl, Field
from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig
from pydantic.v1 import AnyUrl, BaseModel, Field


class Config(AbstractFileBasedSpec):
"""
NOTE: When this Spec is changed, legacy_config_transformer.py must also be
modified to uptake the changes because it is responsible for converting
legacy GCS configs into file based configs using the File-Based CDK.
"""
class OAuthCredentials(BaseModel):
    """Credentials option for authenticating against Google with OAuth.

    Every field except ``auth_type`` is flagged ``airbyte_secret`` and is
    required (no default is supplied).
    """

    class Config(OneOfOptionConfig):
        title = "Authenticate via Google (OAuth)"

    # Constant tag identifying this option; always "Client".
    auth_type: Literal["Client"] = Field("Client", const=True)
    client_id: str = Field(
        title="Client ID",
        description="Client ID",
        airbyte_secret=True,
    )
    client_secret: str = Field(
        title="Client Secret",
        description="Client Secret",
        airbyte_secret=True,
    )
    access_token: str = Field(
        title="Access Token",
        description="Access Token",
        airbyte_secret=True,
    )
    # Previously labelled "Access Token" (copy-paste from the field above),
    # which rendered two indistinguishable inputs in the UI.
    refresh_token: str = Field(
        title="Refresh Token",
        description="Refresh Token",
        airbyte_secret=True,
    )


class ServiceAccountCredentials(BaseModel):
    """Credentials option for authenticating with a service account key."""

    class Config(OneOfOptionConfig):
        title = "Service Account Authentication."

    # Constant tag identifying this option; always "Service".
    auth_type: Literal["Service"] = Field("Service", const=True)
    # The service account key as a JSON document serialized to a string.
    service_account: str = Field(
        title="Service Account Information.",
        airbyte_secret=True,
        description=(
            'Enter your Google Cloud <a href="https://cloud.google.com/iam/docs/'
            'creating-managing-service-account-keys#creating_service_account_keys">'
            "service account key</a> in JSON format"
        ),
    )


class Config(AbstractFileBasedSpec, BaseModel):
"""
NOTE: When this Spec is changed, legacy_config_transformer.py must also be
modified to uptake the changes because it is responsible for converting
legacy GCS configs into file based configs using the File-Based CDK.
"""

credentials: Union[OAuthCredentials, ServiceAccountCredentials] = Field(
title="Authentication",
description="Credentials for connecting to the Google Cloud Storage API",
type="object",
discriminator="auth_type",
order=0,
)

Expand All @@ -33,7 +76,3 @@ def documentation_url(cls) -> AnyUrl:
Returns the documentation URL.
"""
return AnyUrl("https://docs.airbyte.com/integrations/sources/gcs", scheme="https")

@staticmethod
def remove_discriminator(schema) -> None:
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from typing import Any, List, Mapping

from airbyte_cdk.config_observation import create_connector_config_control_message
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository


class MigrateServiceAccount:
    """Move a legacy top-level ``service_account`` value under ``credentials``.

    A config holding ``service_account`` at the top level and no
    ``credentials`` entry gets
    ``credentials = {"service_account": <value>, "auth_type": "Service"}``,
    is written back to the config file, and is announced through a CONTROL
    message printed to stdout.
    """

    # Class-level repository: shared by every call made through this class.
    message_repository: MessageRepository = InMemoryMessageRepository()
    migrate_from_path = "service_account"
    migrate_to_path = "credentials"

    @classmethod
    def should_migrate(cls, config: Mapping[str, Any]) -> bool:
        """Return True when the legacy key is set and the new key is not."""
        # bool() honours the annotated return type; a bare ``and`` would hand
        # back None (or another falsy operand) when the legacy key is absent.
        return bool(config.get(cls.migrate_from_path)) and not config.get(cls.migrate_to_path)

    @classmethod
    def transform(cls, config: Mapping[str, Any]) -> Mapping[str, Any]:
        """Add the ``credentials`` entry built from the legacy key.

        ``config`` is updated in place and returned. The legacy key itself is
        left untouched.
        """
        config[cls.migrate_to_path] = {"service_account": config[cls.migrate_from_path], "auth_type": "Service"}
        return config

    @classmethod
    def modify_and_save(cls, config_path: str, source: Source, config: Mapping[str, Any]) -> Mapping[str, Any]:
        """Transform ``config``, persist it to ``config_path`` and return it."""
        migrated_config = cls.transform(config)
        source.write_config(migrated_config, config_path)
        return migrated_config

    @classmethod
    def emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None:
        """Print a CONTROL message carrying ``migrated_config`` to stdout."""
        # Imported locally so this method does not depend on edits to the
        # module's import block.
        import orjson
        from airbyte_cdk.models import AirbyteMessageSerializer

        # Add the Airbyte CONTROL message to the message repository.
        cls.message_repository.emit_message(create_connector_config_control_message(migrated_config))
        # Drain the queue through the public API so a message is never printed
        # twice, and serialize each one as a JSON line: printing the message
        # object directly would write its Python repr, not protocol JSON.
        for message in cls.message_repository.consume_queue():
            print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())

    @classmethod
    def migrate(cls, args: List[str], source: Source) -> None:
        """Migrate the config referenced by ``args``, if there is one and it needs it.

        Checks the input args, decides whether the config should be migrated,
        transforms and saves it if so, and emits the CONTROL message.
        """
        config_path = AirbyteEntrypoint(source).extract_config(args)
        # Proceed only if the `--config` arg is provided.
        if not config_path:
            return

        config = source.read_config(config_path)
        if cls.should_migrate(config):
            cls.emit_control_message(cls.modify_and_save(config_path, source, config))
15 changes: 12 additions & 3 deletions airbyte-integrations/connectors/source-gcs/source_gcs/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,21 @@

from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from google.cloud import storage
from google.oauth2 import service_account
from google.oauth2 import credentials, service_account


def get_gcs_client(config):
    """Build a Google Cloud Storage client from the connector config.

    Args:
        config: Connector config object exposing ``credentials``. When
            ``credentials.auth_type`` is ``"Service"`` the JSON string in
            ``credentials.service_account`` is used; otherwise the OAuth
            fields (``access_token``, ``refresh_token``, ``client_id``,
            ``client_secret``) are used.

    Returns:
        A ``storage.Client`` bound to the selected credentials.
    """
    auth = config.credentials
    # The local is named ``creds`` on purpose: binding it to ``credentials``
    # would shadow the imported ``google.oauth2.credentials`` module that the
    # OAuth branch below relies on.
    if auth.auth_type == "Service":
        creds = service_account.Credentials.from_service_account_info(json.loads(auth.service_account))
    else:
        creds = credentials.Credentials(
            auth.access_token,
            refresh_token=auth.refresh_token,
            token_uri="https://oauth2.googleapis.com/token",
            client_id=auth.client_id,
            client_secret=auth.client_secret,
        )
    return storage.Client(credentials=creds)


Expand Down
2 changes: 2 additions & 0 deletions airbyte-integrations/connectors/source-gcs/source_gcs/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from airbyte_cdk.models import AirbyteErrorTraceMessage, AirbyteMessage, AirbyteMessageSerializer, AirbyteTraceMessage, TraceType, Type
from orjson import orjson
from source_gcs import Config, Cursor, SourceGCS, SourceGCSStreamReader
from source_gcs.config_migrations import MigrateServiceAccount


def run():
Expand All @@ -27,6 +28,7 @@ def run():
SourceGCS.read_state(state_path) if state_path else None,
cursor_cls=Cursor,
)
MigrateServiceAccount.migrate(_args, source)
except Exception:
print(
orjson.dumps(
Expand Down
32 changes: 32 additions & 0 deletions airbyte-integrations/connectors/source-gcs/source_gcs/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any, Mapping, Optional

from airbyte_cdk import emit_configuration_as_airbyte_control_message
from airbyte_cdk.models import AdvancedAuth, AuthFlowType, ConnectorSpecification, OAuthConfigSpecification
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.file_based_source import FileBasedSource
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream
Expand Down Expand Up @@ -34,6 +35,37 @@ def read_config(cls, config_path: str) -> Mapping[str, Any]:
def _is_file_based_config(config: Mapping[str, Any]) -> bool:
return "streams" in config

def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification:
    """Return the connector specification, including the OAuth 2.0 flow.

    The connection schema and documentation URL come from ``self.spec_class``.
    The advanced-auth section applies when ``credentials.auth_type`` equals
    ``"Client"`` and maps the OAuth outputs onto the ``credentials`` object.
    """
    # Tokens produced by the completed OAuth flow.
    oauth_output = {
        "type": "object",
        "properties": {
            "access_token": {"type": "string", "path_in_connector_config": ["credentials", "access_token"]},
            "refresh_token": {"type": "string", "path_in_connector_config": ["credentials", "refresh_token"]},
        },
    }
    # Application credentials as server input.
    server_input = {
        "type": "object",
        "properties": {"client_id": {"type": "string"}, "client_secret": {"type": "string"}},
    }
    # The same application credentials, mapped to their place in the connector config.
    server_output = {
        "type": "object",
        "properties": {
            "client_id": {"type": "string", "path_in_connector_config": ["credentials", "client_id"]},
            "client_secret": {"type": "string", "path_in_connector_config": ["credentials", "client_secret"]},
        },
    }

    documentation_url = self.spec_class.documentation_url()
    connection_schema = self.spec_class.schema()

    oauth_config = OAuthConfigSpecification(
        complete_oauth_output_specification=oauth_output,
        complete_oauth_server_input_specification=server_input,
        complete_oauth_server_output_specification=server_output,
    )
    advanced_auth = AdvancedAuth(
        auth_flow_type=AuthFlowType.oauth2_0,
        predicate_key=["credentials", "auth_type"],
        predicate_value="Client",
        oauth_config_specification=oauth_config,
    )
    return ConnectorSpecification(
        documentationUrl=documentation_url,
        connectionSpecification=connection_schema,
        advanced_auth=advanced_auth,
    )

def _make_default_stream(
self, stream_config: FileBasedStreamConfig, cursor: Optional[AbstractFileBasedCursor]
) -> AbstractFileBasedStream:
Expand Down
Loading
Loading