feat(ingest/snowflake): Okta OAuth support; update docs #8157

Merged 2 commits on Jun 7, 2023
42 changes: 41 additions & 1 deletion metadata-ingestion/docs/sources/snowflake/snowflake_pre.md
Review comment (Collaborator):

thanks for updating these docs

@@ -42,7 +42,9 @@ grant imported privileges on database snowflake to role datahub_role;

The details of each granted privilege can be viewed in [snowflake docs](https://docs.snowflake.com/en/user-guide/security-access-control-privileges.html). A summarization of each privilege, and why it is required for this connector:

- `operate` is required on warehouse to execute queries
- `operate` is required only to start the warehouse.
If the warehouse is already running during ingestion or has auto-resume enabled,
this permission is not required.
- `usage` is required for us to run queries using the warehouse
- `usage` on `database` and `schema` are required because without it tables and views inside them are not accessible. If an admin does the required grants on `table` but misses the grants on `schema` or the `database` in which the table/view exists then we will not be able to get metadata for the table/view.
- If metadata is required only on some schemas then you can grant the usage privileges only on a particular schema like
@@ -59,6 +61,44 @@ If you plan to enable extraction of table lineage, via the `include_table_lineag
grant imported privileges on database snowflake to role datahub_role;
```

### Authentication
Authentication is most simply done via a Snowflake user and password.

Alternatively, other authentication methods are supported via the `authentication_type` config option.

#### Okta OAuth
To set up Okta OAuth authentication, roughly follow the four steps in [this guide](https://docs.snowflake.com/en/user-guide/oauth-okta).

Pass in the following values, as described in the article, for your recipe's `oauth_config`:
- `provider`: okta
- `client_id`: `<OAUTH_CLIENT_ID>`
- `client_secret`: `<OAUTH_CLIENT_SECRET>`
- `authority_url`: `<OKTA_OAUTH_TOKEN_ENDPOINT>`
- `scopes`: The list of your *Okta* scopes, i.e. with the `session:role:` prefix

Datahub only supports two OAuth grant types: `client_credentials` and `password`.
The steps slightly differ based on which you decide to use.

##### Client Credentials Grant Type (Simpler)
- When creating an Okta App Integration, choose type `API Services`
+ Ensure client authentication method is `Client secret`
+ Note your `Client ID`
- Create a Snowflake user to correspond to your newly created Okta client credentials
+ *Ensure the user's `Login Name` matches your Okta application's `Client ID`*
+ Ensure the user has been granted your datahub role

##### Password Grant Type
- When creating an Okta App Integration, choose type `OIDC` -> `Native Application`
+ Add Grant Type `Resource Owner Password`
+ Ensure client authentication method is `Client secret`
- Create an Okta user to sign into, noting the `Username` and `Password`
- Create a Snowflake user to correspond to your newly created Okta user
+ *Ensure the user's `Login Name` matches your Okta user's `Username` (likely an email)*
+ Ensure the user has been granted your datahub role
- When running ingestion, provide the required `oauth_config` fields,
including `client_id` and `client_secret`, plus your Okta user's `Username` and `Password`
* Note: the `username` and `password` config options are not nested under `oauth_config`

### Caveats

- Some of the features are only available in the Snowflake Enterprise Edition. This doc has notes mentioning where this applies.
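
Editor's illustration (not part of the PR): the Okta settings described in the Authentication section of the diff above can be sketched as the `config` portion of a recipe, written here as a Python dict. The helper name `okta_oauth_source_config` is hypothetical and not a DataHub function; the `OAUTH_AUTHENTICATOR` value is taken from the validator later in this diff, and other required Snowflake fields (account, warehouse, role) are deliberately omitted.

```python
from typing import Any, Dict, List, Optional


def okta_oauth_source_config(
    client_id: str,
    client_secret: str,
    token_endpoint: str,
    scopes: List[str],
    username: Optional[str] = None,
    password: Optional[str] = None,
) -> Dict[str, Any]:
    """Build the OAuth-related part of a Snowflake recipe config.

    Hypothetical helper for illustration only; not part of DataHub.
    """
    config: Dict[str, Any] = {
        "authentication_type": "OAUTH_AUTHENTICATOR",
        "oauth_config": {
            "provider": "okta",
            "client_id": client_id,
            "client_secret": client_secret,
            "authority_url": token_endpoint,
            "scopes": scopes,
        },
    }
    # Password grant: username and password sit next to oauth_config,
    # not nested inside it, as the docs note.
    if username is not None and password is not None:
        config["username"] = username
        config["password"] = password
    return config
```

Calling the helper without `username` and `password` corresponds to the client credentials grant; passing both corresponds to the password grant.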
28 changes: 0 additions & 28 deletions metadata-ingestion/src/datahub/configuration/common.py
@@ -187,34 +187,6 @@ def load_config(self, config_fp: IO) -> dict:
pass


class OauthConfiguration(ConfigModel):
provider: Optional[str] = Field(
description="Identity provider for oauth, e.g- microsoft"
)
client_id: Optional[str] = Field(
description="client id of your registered application"
)
scopes: Optional[List[str]] = Field(
description="scopes required to connect to snowflake"
)
use_certificate: bool = Field(
description="Do you want to use certificate and private key to authenticate using oauth",
default=False,
)
client_secret: Optional[str] = Field(
description="client secret of the application if use_certificate = false"
)
authority_url: Optional[str] = Field(
description="Authority url of your identity provider"
)
encoded_oauth_public_key: Optional[str] = Field(
description="base64 encoded certificate content if use_certificate = true"
)
encoded_oauth_private_key: Optional[str] = Field(
description="base64 encoded private key content if use_certificate = true"
)


class AllowDenyPattern(ConfigModel):
"""A class to store allow deny regexes"""

34 changes: 34 additions & 0 deletions metadata-ingestion/src/datahub/configuration/oauth.py
@@ -0,0 +1,34 @@
from enum import Enum
from typing import List, Optional

from pydantic import Field, SecretStr

from datahub.configuration import ConfigModel


class OAuthIdentityProvider(Enum):
MICROSOFT = "microsoft"
OKTA = "okta"


class OAuthConfiguration(ConfigModel):
provider: OAuthIdentityProvider = Field(
description="Identity provider for oauth. "
"Supported providers are microsoft and okta."
)
authority_url: str = Field(description="Authority url of your identity provider")
client_id: str = Field(description="client id of your registered application")
scopes: List[str] = Field(description="scopes required to connect to snowflake")
use_certificate: bool = Field(
description="Do you want to use certificate and private key to authenticate using oauth",
default=False,
)
client_secret: Optional[SecretStr] = Field(
description="client secret of the application if use_certificate = false"
)
encoded_oauth_public_key: Optional[str] = Field(
description="base64 encoded certificate content if use_certificate = true"
)
encoded_oauth_private_key: Optional[str] = Field(
description="base64 encoded private key content if use_certificate = true"
)
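
Editor's illustration (not part of the PR): typing `provider` as an enum means an unsupported value can be rejected when the config is parsed rather than at token-request time. The sketch below shows that idea with the standard library only; `parse_provider` is a hypothetical helper, and how pydantic itself reports the error is not shown in this diff.

```python
from enum import Enum


class OAuthIdentityProvider(Enum):
    MICROSOFT = "microsoft"
    OKTA = "okta"


def parse_provider(raw: str) -> OAuthIdentityProvider:
    """Hypothetical helper: map a recipe string to the enum with a clear error."""
    try:
        # Enum lookup by value raises ValueError for unknown strings.
        return OAuthIdentityProvider(raw)
    except ValueError:
        supported = ", ".join(p.value for p in OAuthIdentityProvider)
        raise ValueError(
            f"Unsupported OAuth provider {raw!r}; supported: {supported}"
        ) from None
```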
@@ -1,31 +1,45 @@
import base64
import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union

import msal
import requests
from OpenSSL.crypto import FILETYPE_PEM, load_certificate
from pydantic.types import SecretStr

from datahub.configuration.oauth import OAuthIdentityProvider

logger = logging.getLogger(__name__)


class OauthTokenGenerator:
def __init__(self, client_id, authority_url, provider):
self.client_id = client_id
self.authority_url = authority_url
self.provider = provider
OKTA_SCOPE_DELIMITER = ","


@dataclass
class OAuthTokenGenerator:
client_id: str
authority_url: str
provider: OAuthIdentityProvider
username: Optional[str] = None
password: Optional[SecretStr] = None

def _get_token(
self,
credentials: Union[str, Dict[str, Any]],
scopes: Optional[List[str]],
check_cache: bool,
) -> str:
token = getattr(self, "_get_{}_token".format(self.provider))(
scopes, check_cache, credentials
)
return token
) -> dict:
if self.provider == OAuthIdentityProvider.MICROSOFT:
return self._get_microsoft_token(credentials, scopes, check_cache)
elif self.provider == OAuthIdentityProvider.OKTA:
assert isinstance(credentials, str)
assert scopes is not None
return self._get_okta_token(credentials, scopes)
else:
raise Exception(f"Unknown oauth provider: {self.provider}")

def _get_microsoft_token(self, scopes, check_cache, credentials):
def _get_microsoft_token(self, credentials, scopes, check_cache):
app = msal.ConfidentialClientApplication(
self.client_id, authority=self.authority_url, client_credential=credentials
)
@@ -38,6 +52,24 @@ def _get_microsoft_token(self, scopes, check_cache, credentials):

return _token

def _get_okta_token(self, credentials: str, scopes: List[str]) -> dict:
data = {
"grant_type": "client_credentials",
"scope": OKTA_SCOPE_DELIMITER.join(scopes),
}
if self.username and self.password:
data["grant_type"] = "password"
data["username"] = self.username
data["password"] = self.password.get_secret_value()

resp = requests.post(
self.authority_url,
headers={"Accept": "application/json"},
auth=(self.client_id, credentials),
data=data,
)
return resp.json()

def get_public_certificate_thumbprint(self, public_cert_str: str) -> str:
cert_str = public_cert_str
certificate = load_certificate(FILETYPE_PEM, cert_str.encode("utf-8"))
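
Editor's illustration (not part of the PR): the grant-type selection in `_get_okta_token` can be isolated from the HTTP call, which makes it easy to check without a network. The function name `build_okta_token_payload` is hypothetical; the comma delimiter mirrors `OKTA_SCOPE_DELIMITER` from the diff (note that the OAuth 2.0 specification itself describes `scope` as space-delimited).

```python
from typing import Dict, List, Optional

OKTA_SCOPE_DELIMITER = ","  # same value as in the diff above


def build_okta_token_payload(
    scopes: List[str],
    username: Optional[str] = None,
    password: Optional[str] = None,
) -> Dict[str, str]:
    """Hypothetical helper: form data for the token request."""
    data = {
        "grant_type": "client_credentials",
        "scope": OKTA_SCOPE_DELIMITER.join(scopes),
    }
    # Switch to the password grant only when both credentials are present.
    if username and password:
        data["grant_type"] = "password"
        data["username"] = username
        data["password"] = password
    return data
```

A payload built this way would be passed as the `data` argument of the POST to the token endpoint, with the client ID and secret sent as HTTP basic auth, as the diff does.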
@@ -12,14 +12,15 @@
OAUTH_AUTHENTICATOR,
)

from datahub.configuration.common import AllowDenyPattern, OauthConfiguration
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.oauth import OAuthConfiguration, OAuthIdentityProvider
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.source.snowflake.constants import (
CLIENT_PREFETCH_THREADS,
CLIENT_SESSION_KEEP_ALIVE,
)
from datahub.ingestion.source.sql.oauth_generator import OauthTokenGenerator
from datahub.ingestion.source.sql.oauth_generator import OAuthTokenGenerator
from datahub.ingestion.source.sql.sql_config import (
SQLAlchemyConfig,
make_sqlalchemy_uri,
@@ -69,7 +70,7 @@ class BaseSnowflakeConfig(BaseTimeWindowConfig):
description="Password for your private key. Required if using key pair authentication with encrypted private key.",
)

oauth_config: Optional[OauthConfiguration] = pydantic.Field(
oauth_config: Optional[OAuthConfiguration] = pydantic.Field(
default=None,
description="oauth configuration - https://docs.snowflake.com/en/user-guide/python-connector-example.html#connecting-with-oauth",
)
@@ -137,48 +138,36 @@ def authenticator_type_is_valid(cls, v, values, field):
f"At least one should be set when using {v} authentication"
)
elif v == "OAUTH_AUTHENTICATOR":
if values.get("oauth_config") is None:
raise ValueError(
f"'oauth_config' is none but should be set when using {v} authentication"
)
if values.get("oauth_config").provider is None:
raise ValueError(
f"'oauth_config.provider' is none "
f"but should be set when using {v} authentication"
)
if values.get("oauth_config").client_id is None:
raise ValueError(
f"'oauth_config.client_id' is none "
f"but should be set when using {v} authentication"
)
if values.get("oauth_config").scopes is None:
cls._check_oauth_config(values.get("oauth_config"))
logger.info(f"using authenticator type '{v}'")
return v

@staticmethod
def _check_oauth_config(oauth_config: Optional[OAuthConfiguration]) -> None:
if oauth_config is None:
raise ValueError(
"'oauth_config' is none but should be set when using OAUTH_AUTHENTICATOR authentication"
)
if oauth_config.use_certificate is True:
if oauth_config.provider == OAuthIdentityProvider.OKTA.value:
raise ValueError(
f"'oauth_config.scopes' was none "
f"but should be set when using {v} authentication"
"Certificate authentication is not supported for Okta."
Review comment (Collaborator):

If this is always true for okta, it would help to move this to a validator in OAuthConfiguration .

Reply (Collaborator Author):

Pretty sure you can set up okta auth with public / private keys, I just couldn't get it to work easily with snowflake

)
if values.get("oauth_config").authority_url is None:
if oauth_config.encoded_oauth_private_key is None:
raise ValueError(
f"'oauth_config.authority_url' was none "
f"but should be set when using {v} authentication"
"'base64_encoded_oauth_private_key' was none "
"but should be set when using certificate for oauth_config"
)
if values.get("oauth_config").use_certificate is True:
if values.get("oauth_config").encoded_oauth_private_key is None:
raise ValueError(
"'base64_encoded_oauth_private_key' was none "
"but should be set when using certificate for oauth_config"
)
if values.get("oauth").encoded_oauth_public_key is None:
raise ValueError(
"'base64_encoded_oauth_public_key' was none"
"but should be set when using use_certificate true for oauth_config"
)
elif values.get("oauth_config").client_secret is None:
if oauth_config.encoded_oauth_public_key is None:
raise ValueError(
"'oauth_config.client_secret' was none "
"but should be set when using use_certificate false for oauth_config"
"'base64_encoded_oauth_public_key' was none "
"but should be set when using use_certificate true for oauth_config"
)
logger.info(f"using authenticator type '{v}'")
return v
elif oauth_config.client_secret is None:
raise ValueError(
Review comment (Collaborator):

can some of this validation logic live on the OAuthConfiguration object?

Reply (Collaborator Author):

Probably, but I'm not sure what will be snowflake specific and what won't just yet.

"'oauth_config.client_secret' was none "
"but should be set when using use_certificate false for oauth_config"
)
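
Editor's illustration (not part of the PR): the branching in `_check_oauth_config` can be read as the rules below. This is a simplified sketch using a plain dataclass as a stand-in for `OAuthConfiguration`; the names `OAuthSettings` and `check_oauth_settings` are hypothetical, the provider is compared as a plain string, and the error messages are shortened.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class OAuthSettings:
    """Hypothetical stand-in holding only the fields the check reads."""

    provider: str
    use_certificate: bool = False
    client_secret: Optional[str] = None
    encoded_oauth_public_key: Optional[str] = None
    encoded_oauth_private_key: Optional[str] = None


def check_oauth_settings(cfg: Optional[OAuthSettings]) -> None:
    if cfg is None:
        raise ValueError("'oauth_config' must be set for OAUTH_AUTHENTICATOR")
    if cfg.use_certificate:
        # Certificate flow: not offered for Okta, and both keys are required.
        if cfg.provider == "okta":
            raise ValueError("Certificate authentication is not supported for Okta.")
        if cfg.encoded_oauth_private_key is None:
            raise ValueError("encoded_oauth_private_key is required with use_certificate")
        if cfg.encoded_oauth_public_key is None:
            raise ValueError("encoded_oauth_public_key is required with use_certificate")
    elif cfg.client_secret is None:
        # Secret flow: the client secret is the only extra requirement.
        raise ValueError("client_secret is required when use_certificate is false")
```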

@pydantic.validator("include_view_lineage")
def validate_include_view_lineage(cls, v, values):
@@ -297,14 +286,16 @@ def get_options(self) -> dict:
self.options["connect_args"] = options_connect_args
return self.options

def get_oauth_connection(self):
def get_oauth_connection(self) -> snowflake.connector.SnowflakeConnection:
assert (
self.oauth_config
), "oauth_config should be provided if using oauth based authentication"
generator = OauthTokenGenerator(
self.oauth_config.client_id,
self.oauth_config.authority_url,
self.oauth_config.provider,
generator = OAuthTokenGenerator(
client_id=self.oauth_config.client_id,
authority_url=self.oauth_config.authority_url,
provider=self.oauth_config.provider,
username=self.username,
password=self.password,
)
if self.oauth_config.use_certificate:
response = generator.get_token_with_certificate(
@@ -313,11 +304,18 @@ def get_oauth_connection(self):
scopes=self.oauth_config.scopes,
)
else:
assert self.oauth_config.client_secret
response = generator.get_token_with_secret(
secret=str(self.oauth_config.client_secret),
secret=str(self.oauth_config.client_secret.get_secret_value()),
scopes=self.oauth_config.scopes,
)
token = response["access_token"]
try:
token = response["access_token"]
except KeyError:
raise ValueError(
f"access_token not found in response {response}. "
"Please check your OAuth configuration."
)
connect_args = self.get_options()["connect_args"]
return snowflake.connector.connect(
user=self.username,
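
Editor's illustration (not part of the PR): the `access_token` lookup added in `get_oauth_connection` can be sketched as a small function. This variation is an assumption, not the PR's code: instead of echoing the whole response, it reports only the `error` and `error_description` fields that OAuth 2.0 error responses define. The name `extract_access_token` is hypothetical.

```python
from typing import Any, Mapping


def extract_access_token(response: Mapping[str, Any]) -> str:
    """Hypothetical helper: return the token or raise a descriptive error."""
    try:
        return response["access_token"]
    except KeyError:
        # Surface the provider's error fields when present.
        error = response.get("error", "unknown error")
        detail = response.get("error_description", "no description")
        raise ValueError(
            f"access_token not found in response ({error}: {detail}). "
            "Please check your OAuth configuration."
        ) from None
```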