Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Non Nullable DatasetConfig.ctl_dataset_id Field #2046

Merged
3 changes: 3 additions & 0 deletions .fides/db_dataset.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,9 @@ dataset:
- name: connection_config_id
data_categories: [system.operations]
data_qualifier: aggregated.anonymized.unlinked_pseudonymized.pseudonymized.identified
- name: ctl_dataset_id
data_categories: [ system.operations ]
data_qualifier: aggregated.anonymized.unlinked_pseudonymized.pseudonymized.identified
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This yaml file has been adjusted to reflect the new datasetconfig.ctl_dataset_id field

- name: created_at
data_categories: [system.operations]
data_qualifier: aggregated.anonymized.unlinked_pseudonymized.pseudonymized.identified
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
"""add datasetconfig.ctl_dataset_id fk

Adding a FK to datasetconfig pointing to the ctl_datasets table.
Also try to automigrate datasetconfig.datasets to the ctl_datasets row

Revision ID: 216cdc7944f1
Revises: 2fb48b0e268b
Create Date: 2022-12-09 22:03:51.097585

"""
import json
import uuid
from typing import Any, Dict

import sqlalchemy as sa
from alembic import op
from fideslang.models import Dataset
from sqlalchemy import text

# revision identifiers, used by Alembic.
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql.elements import TextClause

revision = "216cdc7944f1"
down_revision = "2fb48b0e268b"
branch_labels = None
depends_on = None

AUTO_MIGRATED_STRING = "auto-migrated from datasetconfig.dataset"


def upgrade():
# Schema migration - add a nullable datasetconfig.ctl_dataset_id field. We will shortly make it non-nullable.
op.add_column(
"datasetconfig", sa.Column("ctl_dataset_id", sa.String(), nullable=True)
)
op.create_index(
op.f("ix_datasetconfig_ctl_dataset_id"),
"datasetconfig",
["ctl_dataset_id"],
unique=False,
)
op.create_foreign_key(
"datasetconfig_ctl_dataset_id_fkey",
"datasetconfig",
"ctl_datasets",
["ctl_dataset_id"],
["id"],
)

# Data migration - automatically try to port datasetconfig.dataset -> ctl_datasets if possible.
bind = op.get_bind()
existing_datasetconfigs = bind.execute(
text("SELECT id, created_at, updated_at, dataset FROM datasetconfig;")
)
for row in existing_datasetconfigs:
dataset: Dict[str, Any] = row["dataset"]
fides_key: str = dataset["fides_key"]

insert_into_ctl_datasets_query: TextClause = text(
"INSERT INTO ctl_datasets (id, fides_key, organization_fides_key, name, description, meta, data_categories, "
"collections, data_qualifier, created_at, updated_at, joint_controller, retention, fides_meta, third_country_transfers, tags) "
"VALUES (:id, :fides_key, :organization_fides_key, :name, :description, :meta, :data_categories, :collections, "
":data_qualifier, :created_at, :updated_at, :joint_controller, :retention, :fides_meta, :third_country_transfers, :tags)"
)

new_ctl_dataset_id: str = "ctl_" + str(uuid.uuid4())
# Stashing extra text into the "meta" column so we can use this to downgrade if needed
appended_meta: Dict = dataset["meta"] or {}
appended_meta["fides_source"] = AUTO_MIGRATED_STRING

validated_dataset: Dict = Dataset(
**dataset
).dict() # Validating before we store.
validated_dataset["id"] = new_ctl_dataset_id
validated_dataset["fides_key"] = fides_key
validated_dataset["collections"] = json.dumps(validated_dataset["collections"])
validated_dataset["meta"] = json.dumps(appended_meta)
validated_dataset["created_at"] = row["created_at"]
validated_dataset["updated_at"] = row["updated_at"]
validated_dataset["fides_meta"] = dataset.get("fides_meta") or dataset.get(
"fidesops_meta"
)

try:
bind.execute(
insert_into_ctl_datasets_query,
validated_dataset,
)
except IntegrityError as exc:
raise Exception(
f"Fides attempted to copy datasetconfig.datasets into their own ctl_datasets rows but got error: {exc}. "
f"Adjust fides_keys in ctl_datasets table to not conflict."
)
Comment on lines +90 to +94
Copy link
Contributor Author

@pattisdr pattisdr Dec 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I attempt to create new ctl_dataset records as part of this data migration by default so we can 1) go ahead and make this field non-nullable while 2) not combining existing ctl_datasets and potentially doing it wrong.

I talked with Sean about this - he said the plan was to handle conflicts ad hoc with customer? So if there's a conflict, my current plan is that they resolve manually, which differs from more detailed plan spelled out here #1764


update_dataset_config_query: TextClause = text(
"UPDATE datasetconfig SET ctl_dataset_id= :new_ctl_dataset_id WHERE id= :datasetconfig_id"
)

bind.execute(
update_dataset_config_query,
{"new_ctl_dataset_id": new_ctl_dataset_id, "datasetconfig_id": row["id"]},
)


def downgrade():
# Reverse schema migration
op.drop_constraint(
"datasetconfig_ctl_dataset_id_fkey", "datasetconfig", type_="foreignkey"
)
op.drop_index(op.f("ix_datasetconfig_ctl_dataset_id"), table_name="datasetconfig")
op.drop_column("datasetconfig", "ctl_dataset_id")

# Reverse data migration: remove ctl_datasets that were automatically created by the forward migration
bind = op.get_bind()
remove_automigrated_ctl_datasets_query: TextClause = text(
"DELETE FROM ctl_datasets WHERE meta->>'fides_source'= :automigration_string"
)
bind.execute(
remove_automigrated_ctl_datasets_query,
{"automigration_string": AUTO_MIGRATED_STRING},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""make datasetconfig.ctl_datasets non-nullable

Revision ID: 9c6f62e4c9da
Revises: 216cdc7944f1
Create Date: 2022-12-09 23:56:13.022119

"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "9c6f62e4c9da"
down_revision = "216cdc7944f1"
branch_labels = None
depends_on = None


def upgrade():
"""Followup migration to make datasetconfig.ctl_dataset_id non nullable"""
op.alter_column(
"datasetconfig", "ctl_dataset_id", existing_type=sa.VARCHAR(), nullable=False
)


def downgrade():
op.alter_column(
"datasetconfig", "ctl_dataset_id", existing_type=sa.VARCHAR(), nullable=True
)
12 changes: 12 additions & 0 deletions src/fides/api/ctl/sql_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from typing import Dict

from fideslang.models import Dataset as FideslangDataset
from sqlalchemy import (
ARRAY,
BOOLEAN,
Expand All @@ -19,6 +20,7 @@
type_coerce,
)
from sqlalchemy.dialects.postgresql import BYTEA
from sqlalchemy.orm import Session
from sqlalchemy.sql import func
from sqlalchemy.sql.sqltypes import DateTime

Expand Down Expand Up @@ -200,6 +202,16 @@ class Dataset(Base, FidesBase):
retention = Column(String)
third_country_transfers = Column(ARRAY(String))

@classmethod
def create_from_dataset_dict(cls, db: Session, dataset: dict) -> "Dataset":
"""Add a method to create directly using a synchronous session"""
validated_dataset: FideslangDataset = FideslangDataset(**dataset)
ctl_dataset = cls(**validated_dataset.dict())
db.add(ctl_dataset)
db.commit()
db.refresh(ctl_dataset)
return ctl_dataset
Comment on lines +205 to +213
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see we have endpoints/methods that already exist ctl-side for creating ctl_datasets but there's still a big division between ctl-code largely using asynchronous sessions and ops code largely using synchronous sessions. I don't want to take that on here, so I'm adding a small model method that uses a synchronous session that is used numerous times (largely in testing).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense. IIRC the ctl endpoints are fairly generic and constructed differently



# Evaluation
class Evaluation(Base):
Expand Down
95 changes: 82 additions & 13 deletions src/fides/api/ops/api/v1/endpoints/dataset_endpoints.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List
from typing import Callable, List

import yaml
from fastapi import Depends, HTTPException, Request
Expand All @@ -22,6 +22,7 @@
HTTP_422_UNPROCESSABLE_ENTITY,
)

from fides.api.ctl.sql_models import Dataset as CtlDataset # type: ignore[attr-defined]
from fides.api.ops.api import deps
from fides.api.ops.api.v1.scope_registry import (
DATASET_CREATE_OR_UPDATE,
Expand All @@ -30,6 +31,7 @@
)
from fides.api.ops.api.v1.urn_registry import (
DATASET_BY_KEY,
DATASET_CONFIGS,
DATASET_VALIDATE,
DATASETS,
V1_URL_PREFIX,
Expand All @@ -51,6 +53,7 @@
from fides.api.ops.schemas.api import BulkUpdateFailed
from fides.api.ops.schemas.dataset import (
BulkPutDataset,
DatasetConfigCtlDataset,
DatasetTraversalDetails,
ValidateDatasetResponse,
validate_data_categories_against_db,
Expand Down Expand Up @@ -157,6 +160,64 @@ def validate_dataset(
)


@router.patch(
DATASET_CONFIGS,
dependencies=[Security(verify_oauth_client, scopes=[DATASET_CREATE_OR_UPDATE])],
status_code=HTTP_200_OK,
response_model=BulkPutDataset,
)
def patch_dataset_configs(
dataset_pairs: conlist(DatasetConfigCtlDataset, max_items=50), # type: ignore
db: Session = Depends(deps.get_db),
connection_config: ConnectionConfig = Depends(_get_connection_config),
) -> BulkPutDataset:
"""
Endpoint to create or update DatasetConfigs by passing in pairs of:
1) A DatasetConfig fides_key
2) The corresponding CtlDataset fides_key which stores the bulk of the actual dataset

Currently this endpoint looks up the ctl dataset and writes its contents back to the DatasetConfig.dataset
field for backwards compatibility but soon DatasetConfig.dataset will go away.

Comment on lines +169 to +181
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New endpoint that the UI should switch to using in ops "create a connector" workflow.

Andrew described a flow where two endpoints will be hit, the ctl dataset endpoint to create/update that, and then the fides_key of that ctl_dataset will be passed to this endpoint. You could also select the ctl dataset from a dropdown.

"""
created_or_updated: List[Dataset] = []
failed: List[BulkUpdateFailed] = []
logger.info("Starting bulk upsert for {} Dataset Configs", len(dataset_pairs))

for dataset_pair in dataset_pairs:
logger.info(
"Finding ctl_dataset with key '{}'", dataset_pair.ctl_dataset_fides_key
)
ctl_dataset: CtlDataset = (
db.query(CtlDataset)
.filter_by(fides_key=dataset_pair.ctl_dataset_fides_key)
.first()
Comment on lines +191 to +194
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally I was using a ctl-method to get this Dataset but it wasn't playing well with ops tests. It worked fine when I ran the test file by itself but broke down when I ran the whole test suite. with sqlalchemy.dialects.postgresql.asyncpg.InterfaceError - cannot perform operation: another operation is in progress.

)
fetched_dataset: Dataset = Dataset.from_orm(ctl_dataset)

data = {
"connection_config_id": connection_config.id,
"fides_key": dataset_pair.fides_key,
"dataset": fetched_dataset.dict(),
"ctl_dataset_id": ctl_dataset.id,
}

create_or_update_dataset(
connection_config,
created_or_updated,
data,
fetched_dataset,
db,
failed,
DatasetConfig.create_or_update,
)

return BulkPutDataset(
succeeded=created_or_updated,
failed=failed,
)


@router.patch(
DATASETS,
dependencies=[Security(verify_oauth_client, scopes=[DATASET_CREATE_OR_UPDATE])],
Expand All @@ -172,10 +233,10 @@ def patch_datasets(
Given a list of dataset elements, create or update corresponding Dataset objects
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this endpoint {{host}}/connection/{{connection_key}}/dataset should be deprecated once the UI has been updated to use the new endpoint above.

Added some functionality here to make it still usable. If a raw dataset is passed in, I write it to both the DatasetConfig.dataset field and the ctl_dataset record.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be a follow up ticket keeping track of all of the soon to be removed/deprecated routes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question Andrew, here's the follow-up ticket, we can wait to deprecate until the UI has been pointed to use the new endpoints #2092

or report failure

Use for bulk creating and/or updating datasets.
Use for bulk creating and/or updating DatasetConfig resources.

If the fides_key for a given dataset exists, it will be treated as an update.
Otherwise, a new dataset will be created.
If the fides_key for a given DatasetConfig exists, it will be treated as an update.
Otherwise, a new DatasetConfig will be created.
"""

created_or_updated: List[Dataset] = []
Expand All @@ -198,7 +259,13 @@ def patch_datasets(
"dataset": dataset.dict(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Validation on this endpoint makes sure data categories on the dataset exist in the database. Because we're accessing the database, it's done outside of a typical pydantic validator. If this endpoint goes away, we need a new place to stick this. Does the existing ctl_datasets endpoint have this validation?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CC: @ThomasLaPiana. You might be the right person to ask about the ctl side of this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK so it looks like the existing crud endpoints don't do this. The tricky bit is that the crud endpoints are very generic, it's blocks of code that applies to updating an entire set of resources. Added a note to look into the best place to put this in the next ticket #1763

}
create_or_update_dataset(
connection_config, created_or_updated, data, dataset, db, failed
connection_config,
created_or_updated,
data,
dataset,
db,
failed,
DatasetConfig.upsert_with_ctl_dataset,
)
return BulkPutDataset(
succeeded=created_or_updated,
Expand Down Expand Up @@ -249,9 +316,10 @@ async def patch_yaml_datasets(
connection_config,
created_or_updated,
data,
yaml_request_body,
Dataset(**dataset),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was an existing bug, this function is expecting a Dataset not a dict. This patch_yaml_datasets endpoint is likely being deprecated too though.

db,
failed,
DatasetConfig.upsert_with_ctl_dataset,
)
return BulkPutDataset(
succeeded=created_or_updated,
Expand All @@ -263,16 +331,17 @@ def create_or_update_dataset(
connection_config: ConnectionConfig,
created_or_updated: List[Dataset],
data: dict,
dataset: dict,
dataset: Dataset,
db: Session,
failed: List[BulkUpdateFailed],
create_method: Callable,
) -> None:
try:
if connection_config.connection_type == ConnectionType.saas:
_validate_saas_dataset(connection_config, dataset) # type: ignore
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still validating the saas qualities of a Dataset here, not in ctl_datasets. This validation is only applicable if a dataset is being linked to a Saas connector. This validation wouldn't apply otherwise.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it makes sense to add a comment explaining that this validation only needs to happen here and not in ctl_datasets?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call

# Try to find an existing DatasetConfig matching the given connection & key
dataset_config = DatasetConfig.create_or_update(db, data=data)
created_or_updated.append(dataset_config.dataset)
dataset_config = create_method(db, data=data)
created_or_updated.append(dataset_config.ctl_dataset)
except (
SaaSConfigNotFoundException,
ValidationError,
Expand Down Expand Up @@ -340,7 +409,7 @@ def get_datasets(
params: Params = Depends(),
connection_config: ConnectionConfig = Depends(_get_connection_config),
) -> AbstractPage[Dataset]:
"""Returns all datasets in the database."""
"""Returns all DatasetConfig datasets in the database."""

logger.info(
"Finding all datasets for connection '{}' with pagination params {}",
Expand All @@ -357,7 +426,7 @@ def get_datasets(
# paginated query is handled by paginate()
paginated_results = paginate(dataset_configs, params=params)
paginated_results.items = [ # type: ignore
dataset_config.dataset for dataset_config in paginated_results.items # type: ignore
dataset_config.ctl_dataset for dataset_config in paginated_results.items # type: ignore
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These and similar edits to these GET endpoints move to getting the dataset from the ctl_dataset record, not the DatasetConfig.dataset column.

]
return paginated_results

Expand Down Expand Up @@ -389,7 +458,7 @@ def get_dataset(
status_code=HTTP_404_NOT_FOUND,
detail=f"No dataset with fides_key '{fides_key}' and connection key {connection_config.key}'",
)
return dataset_config.dataset
return dataset_config.ctl_dataset


@router.delete(
Expand All @@ -403,7 +472,7 @@ def delete_dataset(
db: Session = Depends(deps.get_db),
connection_config: ConnectionConfig = Depends(_get_connection_config),
) -> None:
"""Removes the dataset based on the given key."""
"""Removes the DatasetConfig based on the given key."""

logger.info(
"Finding dataset '{}' for connection '{}'", fides_key, connection_config.key
Expand Down
3 changes: 2 additions & 1 deletion src/fides/api/ops/api/v1/endpoints/saas_config_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ def instantiate_connection_from_template(
template_values.instance_key,
saas_connector_type,
)

return SaasConnectionTemplateResponse(
connection=connection_config, dataset=dataset_config.dataset
connection=connection_config, dataset=dataset_config.ctl_dataset
)
Loading