Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding namespace fides_meta support for BigQuery datasets #5294

Merged
merged 16 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ types-defusedxml==0.7.0.20240218
expandvars==0.9.0
fastapi[all]==0.111.0
fastapi-pagination[sqlalchemy]==0.12.25
fideslang==3.0.4
fideslang @ git+https://github.com/ethyca/fideslang.git@0d8c203295d6d427b9274db5d9b8815065bdf75b
galvana marked this conversation as resolved.
Show resolved Hide resolved
fideslog==1.2.10
firebase-admin==5.3.0
GitPython==3.1.41
Expand Down
4 changes: 4 additions & 0 deletions src/fides/api/common_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,3 +381,7 @@ class MissingConfig(Exception):

class MonitorConfigNotFoundException(BaseException):
"""MonitorConfig could not be found"""


# Derive from Exception, not BaseException: BaseException is reserved for
# process-exit signals (KeyboardInterrupt, SystemExit), and subclassing it
# means generic `except Exception` handlers silently miss this error.
class MissingNamespaceSchemaException(Exception):
    """Attempting to use namespace fides_meta without specifying the schema to validate it."""
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ class BigQuerySchema(ConnectionConfigSecretsSchema):
)
dataset: Optional[str] = Field(
default=None,
title="BigQuery Dataset",
description="The dataset within your BigQuery project that contains the tables you want to access.",
title="Default BigQuery Dataset",
description="The default BigQuery dataset that will be used if one isn't provided in the associated Fides datasets.",
galvana marked this conversation as resolved.
Show resolved Hide resolved
)

_required_components: ClassVar[List[str]] = ["keyfile_creds"]
Expand Down
Empty file.
14 changes: 14 additions & 0 deletions src/fides/api/schemas/namespace_meta/bigquery_namespace_meta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from fides.api.schemas.namespace_meta.namespace_meta import NamespaceMeta


class BigQueryNamespaceMeta(NamespaceMeta):
    """
    Represents the namespace structure for BigQuery queries.

    Used to fully qualify table names as ``<project_id>.<dataset_id>.<table>``
    when a Fides dataset declares namespace fides_meta instead of relying on
    the connection's default dataset.

    Attributes:
        project_id (str): The ID of the Google Cloud project.
        dataset_id (str): The ID of the BigQuery dataset.
    """

    project_id: str
    dataset_id: str
7 changes: 7 additions & 0 deletions src/fides/api/schemas/namespace_meta/namespace_meta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from abc import ABC

from pydantic import BaseModel


class NamespaceMeta(BaseModel, ABC):
    """
    Abstract base for connector-specific namespace metadata schemas
    (e.g. a BigQuery project/dataset pair) parsed from a dataset's
    fides_meta. Subclasses declare the fields their connector needs.
    """

    pass
55 changes: 48 additions & 7 deletions src/fides/api/service/connectors/query_config.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
# pylint: disable=too-many-lines
import re
from abc import ABC, abstractmethod
from typing import Any, Dict, Generic, List, Optional, Tuple, TypeVar
from typing import Any, Dict, Generic, List, Optional, Tuple, Type, TypeVar, cast

import pydash
from boto3.dynamodb.types import TypeSerializer
from loguru import logger
from pydantic import ValidationError
from sqlalchemy import MetaData, Table, text
from sqlalchemy.engine import Engine
from sqlalchemy.sql import Executable, Update # type: ignore
from sqlalchemy.sql.elements import ColumnElement, TextClause

from fides.api.common_exceptions import MissingNamespaceSchemaException
from fides.api.graph.config import (
ROOT_COLLECTION_ADDRESS,
CollectionAddress,
Expand All @@ -21,6 +23,10 @@
from fides.api.graph.execution import ExecutionNode
from fides.api.models.policy import Policy, Rule
from fides.api.models.privacy_request import ManualAction, PrivacyRequest
from fides.api.schemas.namespace_meta.bigquery_namespace_meta import (
BigQueryNamespaceMeta,
)
from fides.api.schemas.namespace_meta.namespace_meta import NamespaceMeta
from fides.api.schemas.policy import ActionType
from fides.api.service.masking.strategy.masking_strategy import MaskingStrategy
from fides.api.service.masking.strategy.masking_strategy_nullify import (
Expand Down Expand Up @@ -343,6 +349,24 @@ class SQLLikeQueryConfig(QueryConfig[T], ABC):
Abstract query config for SQL-like languages (that may not be strictly SQL).
"""

namespace_meta_schema: Optional[Type[NamespaceMeta]] = None

def __init__(self, node: ExecutionNode, namespace_meta: Optional[Dict] = None):
    """
    Initializes the query config, validating the optional raw namespace
    metadata dict against the subclass-declared ``namespace_meta_schema``.

    Raises:
        MissingNamespaceSchemaException: namespace_meta was provided but the
            subclass does not declare a namespace_meta_schema to parse it.
        ValueError: namespace_meta failed schema validation.
    """
    super().__init__(node)
    self.namespace_meta: Optional[NamespaceMeta] = None

    if namespace_meta is not None:
        if self.namespace_meta_schema is None:
            raise MissingNamespaceSchemaException(
                f"{self.__class__.__name__} must define a namespace_meta_schema when namespace_meta is provided."
            )
        try:
            self.namespace_meta = self.namespace_meta_schema.model_validate(
                namespace_meta
            )
        except ValidationError as exc:
            # Chain the cause so the pydantic validation detail is preserved
            raise ValueError(f"Invalid namespace_meta: {exc}") from exc

def format_fields_for_query(
self,
field_paths: List[FieldPath],
Expand Down Expand Up @@ -810,14 +834,33 @@ class BigQueryQueryConfig(QueryStringWithoutTuplesOverrideQueryConfig):
Generates SQL valid for BigQuery
"""

namespace_meta_schema = BigQueryNamespaceMeta

def _generate_table_name(self) -> str:
    """
    Builds the table name for queries: the bare collection name when no
    namespace meta is set, otherwise prefixed with the dataset ID and,
    when present, the project ID.
    """
    qualified_name = self.node.collection.name
    if not self.namespace_meta:
        return qualified_name

    # cast narrows the base-class Optional[NamespaceMeta] for mypy
    meta = cast(BigQueryNamespaceMeta, self.namespace_meta)
    qualified_name = f"{meta.dataset_id}.{qualified_name}"
    if meta.project_id:
        qualified_name = f"{meta.project_id}.{qualified_name}"
    return qualified_name

def get_formatted_query_string(
    self,
    field_list: str,
    clauses: List[str],
) -> str:
    """
    Returns a SELECT statement with the table name wrapped in backticks,
    so tables whose names collide with BigQuery reserved words still parse.
    """
    where_condition = " OR ".join(clauses)
    table_name = self._generate_table_name()
    return f"SELECT {field_list} FROM `{table_name}` WHERE {where_condition}"

def generate_update(
self, row: Row, policy: Policy, request: PrivacyRequest, client: Engine
Expand All @@ -843,9 +886,7 @@ def generate_update(
)
return None

table = Table(
self.node.address.collection, MetaData(bind=client), autoload=True
)
table = Table(self._generate_table_name(), MetaData(bind=client), autoload=True)
pk_clauses: List[ColumnElement] = [
getattr(table.c, k) == v for k, v in non_empty_primary_keys.items()
]
Expand Down
26 changes: 24 additions & 2 deletions src/fides/api/service/connectors/sql_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
from google.oauth2 import service_account
from loguru import logger
from snowflake.sqlalchemy import URL as Snowflake_URL
from sqlalchemy import Column, text
from sqlalchemy import Column, select, text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.engine import ( # type: ignore
URL,
Connection,
Expand All @@ -24,6 +25,7 @@
create_engine,
)
from sqlalchemy.exc import InternalError, OperationalError
from sqlalchemy.orm import Session
from sqlalchemy.sql import Executable # type: ignore
from sqlalchemy.sql.elements import TextClause

Expand Down Expand Up @@ -71,6 +73,10 @@
from fides.api.util.collection_util import Row
from fides.config import get_config

from fides.api.models.sql_models import ( # type: ignore[attr-defined] # isort: skip
Dataset as CtlDataset,
)

CONFIG = get_config()

sshtunnel.SSH_TIMEOUT = CONFIG.security.bastion_server_ssh_timeout
Expand Down Expand Up @@ -115,6 +121,18 @@ def default_cursor_result_to_rows(results: LegacyCursorResult) -> List[Row]:
rows.append({col[0]: row_tuple[count] for count, col in enumerate(columns)})
return rows

@staticmethod
def get_namespace_meta(db: Session, dataset: str) -> Optional[Dict[str, Any]]:
    """
    Fetches the ``namespace`` entry of a ctl_dataset's fides_meta by
    fides_key, or None when the dataset declares no namespace metadata.
    """
    namespace_query = select(CtlDataset.fides_meta["namespace"].cast(JSONB)).where(
        CtlDataset.fides_key == dataset
    )
    return db.scalar(namespace_query)

@abstractmethod
def build_uri(self) -> Optional[str]:
"""Build a database specific uri connection string"""
Expand Down Expand Up @@ -529,7 +547,11 @@ def create_client(self) -> Engine:
# Overrides SQLConnector.query_config
def query_config(self, node: ExecutionNode) -> BigQueryQueryConfig:
    """Query wrapper corresponding to the input execution_node."""
    session: Session = Session.object_session(self.configuration)
    namespace_meta = SQLConnector.get_namespace_meta(session, node.address.dataset)
    return BigQueryQueryConfig(node, namespace_meta)

# Overrides SQLConnector.test_connection
def test_connection(self) -> Optional[ConnectionTestStatus]:
Expand Down
17 changes: 17 additions & 0 deletions tests/ctl/core/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,23 @@ def test_field_data_categories(db) -> None:
assert ctl_dataset.field_data_categories


@pytest.mark.unit
def test_namespace_meta(db) -> None:
    """Namespace fides_meta round-trips through CtlDataset creation."""
    dataset_dict = {
        "fides_key": f"dataset_key-f{uuid4()}",
        "fides_meta": {"namespace": {"dataset_id": "public"}},
        "collections": [],
    }
    ctl_dataset = CtlDataset.create_from_dataset_dict(db, dataset_dict)

    expected_meta = {
        "resource_id": None,
        "after": None,
        "namespace": {"dataset_id": "public"},
    }
    assert ctl_dataset.fides_meta == expected_meta


# Generate Dataset Database Integration Tests

# These URLs are for the databases in the docker-compose.integration-tests.yml file
Expand Down
112 changes: 112 additions & 0 deletions tests/fixtures/bigquery_fixtures.py
galvana marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,30 @@ def bigquery_connection_config(db: Session) -> Generator:
connection_config.delete(db)


@pytest.fixture(scope="function")
def bigquery_connection_config_without_default_dataset(db: Session) -> Generator:
    """
    BigQuery connection config whose secrets omit the optional default
    dataset, for exercising the namespace fides_meta fallback path.
    """
    connection_config = ConnectionConfig.create(
        db=db,
        data={
            "name": str(uuid4()),
            "key": "my_bigquery_config",
            "connection_type": ConnectionType.bigquery,
            "access": AccessLevel.write,
        },
    )
    # Pulling from integration config file or GitHub secrets
    keyfile_creds = integration_config.get("bigquery", {}).get("keyfile_creds")
    if not keyfile_creds:
        # Guard the env lookup: ast.literal_eval(None) raises when the
        # BIGQUERY_KEYFILE_CREDS variable is unset, so only parse when present.
        env_creds = os.environ.get("BIGQUERY_KEYFILE_CREDS")
        keyfile_creds = ast.literal_eval(env_creds) if env_creds else None
    if keyfile_creds:
        schema = BigQuerySchema(keyfile_creds=keyfile_creds)
        connection_config.secrets = schema.model_dump(mode="json")
        connection_config.save(db=db)

    yield connection_config
    connection_config.delete(db)


@pytest.fixture
def bigquery_example_test_dataset_config(
bigquery_connection_config: ConnectionConfig,
Expand Down Expand Up @@ -88,6 +112,39 @@ def bigquery_example_test_dataset_config(
ctl_dataset.delete(db=db)


@pytest.fixture
def bigquery_example_test_dataset_config_with_namespace_meta(
    bigquery_connection_config_without_default_dataset: ConnectionConfig,
    db: Session,
    example_datasets: List[Dict],
) -> Generator:
    """
    DatasetConfig for the BigQuery example dataset with namespace fides_meta
    attached, so queries must derive the project/dataset from the metadata
    rather than a connection-level default dataset.
    """
    # Build a copy rather than mutating example_datasets[7] in place, so the
    # shared example_datasets fixture is not polluted for other tests.
    bigquery_dataset = {
        **example_datasets[7],
        "fides_meta": {
            "namespace": {
                "project_id": "silken-precinct-284918",
                "dataset_id": "fidesopstest",
            }
        },
    }
    fides_key = bigquery_dataset["fides_key"]
    bigquery_connection_config_without_default_dataset.name = fides_key
    bigquery_connection_config_without_default_dataset.key = fides_key
    bigquery_connection_config_without_default_dataset.save(db=db)

    ctl_dataset = CtlDataset.create_from_dataset_dict(db, bigquery_dataset)

    dataset = DatasetConfig.create(
        db=db,
        data={
            "connection_config_id": bigquery_connection_config_without_default_dataset.id,
            "fides_key": fides_key,
            "ctl_dataset_id": ctl_dataset.id,
        },
    )
    yield dataset
    dataset.delete(db=db)
    ctl_dataset.delete(db=db)


@pytest.fixture(scope="function")
def bigquery_resources(
bigquery_example_test_dataset_config,
Expand Down Expand Up @@ -140,6 +197,61 @@ def bigquery_resources(
connection.execute(stmt)


@pytest.fixture(scope="function")
def bigquery_resources_with_namespace_meta(
    bigquery_example_test_dataset_config_with_namespace_meta,
):
    """
    Seeds one customer row (and its address) into the namespaced BigQuery
    dataset, yields the identifying values for tests, then deletes both rows.
    """
    bigquery_connection_config = (
        bigquery_example_test_dataset_config_with_namespace_meta.connection_config
    )
    connector = BigQueryConnector(bigquery_connection_config)
    bigquery_client = connector.client()
    # Keep the connection open across the yield so teardown reuses it
    with bigquery_client.connect() as connection:
        uuid = str(uuid4())
        customer_email = f"customer-{uuid}@example.com"
        customer_name = f"{uuid}"

        # Allocate ids past the current max to avoid colliding with seed data
        stmt = "select max(id) from fidesopstest.customer;"
        res = connection.execute(stmt)
        customer_id = res.all()[0][0] + 1

        stmt = "select max(id) from fidesopstest.address;"
        res = connection.execute(stmt)
        address_id = res.all()[0][0] + 1

        city = "Test City"
        state = "TX"
        stmt = f"""
        insert into fidesopstest.address (id, house, street, city, state, zip)
        values ({address_id}, '{111}', 'Test Street', '{city}', '{state}', '55555');
        """
        connection.execute(stmt)

        stmt = f"""
        insert into fidesopstest.customer (id, email, name, address_id)
        values ({customer_id}, '{customer_email}', '{customer_name}', {address_id});
        """
        connection.execute(stmt)

        yield {
            "email": customer_email,
            "name": customer_name,
            "id": customer_id,
            "client": bigquery_client,
            "address_id": address_id,
            "city": city,
            "state": state,
            "connector": connector,
            "dataset": bigquery_example_test_dataset_config_with_namespace_meta.fides_key,
        }
        # Remove test data and close BigQuery connection in teardown
        stmt = f"delete from fidesopstest.customer where email = '{customer_email}';"
        connection.execute(stmt)

        stmt = f"delete from fidesopstest.address where id = {address_id};"
        connection.execute(stmt)


@pytest.fixture(scope="session")
def bigquery_test_engine() -> Generator:
"""Return a connection to a Google BigQuery Warehouse"""
Expand Down
Loading
Loading