Add Non Nullable DatasetConfig.ctl_dataset_id Field #2046
New file: Alembic migration, revision 216cdc7944f1 (122 lines added)

@@ -0,0 +1,122 @@
"""add datasetconfig.ctl_dataset_id fk | ||
|
||
Adding a FK to datasetconfig pointing to the ctl_datasets table. | ||
Also try to automigrate datasetconfig.datasets to the ctl_datasets row | ||
|
||
Revision ID: 216cdc7944f1 | ||
Revises: 2fb48b0e268b | ||
Create Date: 2022-12-09 22:03:51.097585 | ||
|
||
""" | ||
import json | ||
import uuid | ||
from typing import Any, Dict | ||
|
||
import sqlalchemy as sa | ||
from alembic import op | ||
from fideslang.models import Dataset | ||
from sqlalchemy import text | ||
|
||
# revision identifiers, used by Alembic. | ||
from sqlalchemy.exc import IntegrityError | ||
from sqlalchemy.sql.elements import TextClause | ||
|
||
revision = "216cdc7944f1" | ||
down_revision = "2fb48b0e268b" | ||
branch_labels = None | ||
depends_on = None | ||
|
||
AUTO_MIGRATED_STRING = "auto-migrated from datasetconfig.dataset" | ||
|
||
|
||
def upgrade(): | ||
# Schema migration - add a nullable datasetconfig.ctl_dataset_id field. We will shortly make it non-nullable. | ||
op.add_column( | ||
"datasetconfig", sa.Column("ctl_dataset_id", sa.String(), nullable=True) | ||
) | ||
op.create_index( | ||
op.f("ix_datasetconfig_ctl_dataset_id"), | ||
"datasetconfig", | ||
["ctl_dataset_id"], | ||
unique=False, | ||
) | ||
op.create_foreign_key( | ||
"datasetconfig_ctl_dataset_id_fkey", | ||
"datasetconfig", | ||
"ctl_datasets", | ||
["ctl_dataset_id"], | ||
["id"], | ||
) | ||
|
||
# Data migration - automatically try to port datasetconfig.dataset -> ctl_datasets if possible. | ||
bind = op.get_bind() | ||
existing_datasetconfigs = bind.execute( | ||
text("SELECT id, created_at, updated_at, dataset FROM datasetconfig;") | ||
) | ||
for row in existing_datasetconfigs: | ||
dataset: Dict[str, Any] = row["dataset"] | ||
fides_key: str = dataset["fides_key"] | ||
|
||
insert_into_ctl_datasets_query: TextClause = text( | ||
"INSERT INTO ctl_datasets (id, fides_key, organization_fides_key, name, description, meta, data_categories, " | ||
"collections, data_qualifier, created_at, updated_at, joint_controller, retention, fides_meta, third_country_transfers, tags) " | ||
"VALUES (:id, :fides_key, :organization_fides_key, :name, :description, :meta, :data_categories, :collections, " | ||
":data_qualifier, :created_at, :updated_at, :joint_controller, :retention, :fides_meta, :third_country_transfers, :tags)" | ||
) | ||
|
||
new_ctl_dataset_id: str = "ctl_" + str(uuid.uuid4()) | ||
# Stashing extra text into the "meta" column so we can use this to downgrade if needed | ||
appended_meta: Dict = dataset["meta"] or {} | ||
appended_meta["fides_source"] = AUTO_MIGRATED_STRING | ||
|
||
validated_dataset: Dict = Dataset( | ||
**dataset | ||
).dict() # Validating before we store. | ||
validated_dataset["id"] = new_ctl_dataset_id | ||
validated_dataset["fides_key"] = fides_key | ||
validated_dataset["collections"] = json.dumps(validated_dataset["collections"]) | ||
validated_dataset["meta"] = json.dumps(appended_meta) | ||
validated_dataset["created_at"] = row["created_at"] | ||
validated_dataset["updated_at"] = row["updated_at"] | ||
validated_dataset["fides_meta"] = dataset.get("fides_meta") or dataset.get( | ||
"fidesops_meta" | ||
) | ||
|
||
try: | ||
bind.execute( | ||
insert_into_ctl_datasets_query, | ||
validated_dataset, | ||
) | ||
except IntegrityError as exc: | ||
raise Exception( | ||
f"Fides attempted to copy datasetconfig.datasets into their own ctl_datasets rows but got error: {exc}. " | ||
f"Adjust fides_keys in ctl_datasets table to not conflict." | ||
) | ||
Review comment on lines +90 to +94:

I attempt to create new ctl_dataset records as part of this data migration by default so we can 1) go ahead and make this field non-nullable, while 2) not combining existing ctl_datasets and potentially getting it wrong. I talked with Sean about this; he said the plan was to handle conflicts ad hoc with the customer. So if there's a conflict, my current plan is that they resolve it manually, which differs from the more detailed plan spelled out in #1764.
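Since conflicts are expected to be resolved manually, a pre-flight check can surface clashing fides_keys before the upgrade runs. A minimal sketch, not part of this PR, assuming direct access to the same Postgres database (the connection URL is a placeholder):

# Hypothetical pre-flight check: find fides_keys stored in datasetconfig.dataset
# that already exist in ctl_datasets and would trigger the IntegrityError above.
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://user:password@localhost:5432/fides")  # placeholder

with engine.connect() as conn:
    config_keys = {
        row["dataset"]["fides_key"]
        for row in conn.execute(text("SELECT dataset FROM datasetconfig;"))
    }
    ctl_keys = {
        row["fides_key"]
        for row in conn.execute(text("SELECT fides_key FROM ctl_datasets;"))
    }
    conflicts = config_keys & ctl_keys
    if conflicts:
        print(f"fides_keys needing manual resolution: {sorted(conflicts)}")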
        update_dataset_config_query: TextClause = text(
            "UPDATE datasetconfig SET ctl_dataset_id= :new_ctl_dataset_id WHERE id= :datasetconfig_id"
        )

        bind.execute(
            update_dataset_config_query,
            {"new_ctl_dataset_id": new_ctl_dataset_id, "datasetconfig_id": row["id"]},
        )


def downgrade():
    # Reverse schema migration
    op.drop_constraint(
        "datasetconfig_ctl_dataset_id_fkey", "datasetconfig", type_="foreignkey"
    )
    op.drop_index(op.f("ix_datasetconfig_ctl_dataset_id"), table_name="datasetconfig")
    op.drop_column("datasetconfig", "ctl_dataset_id")

    # Reverse data migration: remove ctl_datasets that were automatically created by the forward migration
    bind = op.get_bind()
    remove_automigrated_ctl_datasets_query: TextClause = text(
        "DELETE FROM ctl_datasets WHERE meta->>'fides_source'= :automigration_string"
    )
    bind.execute(
        remove_automigrated_ctl_datasets_query,
        {"automigration_string": AUTO_MIGRATED_STRING},
    )
New file: Alembic migration, revision 9c6f62e4c9da (28 lines added)

@@ -0,0 +1,28 @@
"""make datasetconfig.ctl_datasets non-nullable | ||
|
||
Revision ID: 9c6f62e4c9da | ||
Revises: 216cdc7944f1 | ||
Create Date: 2022-12-09 23:56:13.022119 | ||
|
||
""" | ||
import sqlalchemy as sa | ||
from alembic import op | ||
|
||
# revision identifiers, used by Alembic. | ||
revision = "9c6f62e4c9da" | ||
down_revision = "216cdc7944f1" | ||
branch_labels = None | ||
depends_on = None | ||
|
||
|
||
def upgrade(): | ||
"""Followup migration to make datasetconfig.ctl_dataset_id non nullable""" | ||
op.alter_column( | ||
"datasetconfig", "ctl_dataset_id", existing_type=sa.VARCHAR(), nullable=False | ||
) | ||
|
||
|
||
def downgrade(): | ||
op.alter_column( | ||
"datasetconfig", "ctl_dataset_id", existing_type=sa.VARCHAR(), nullable=True | ||
) |
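Because the column becomes non-nullable here, any datasetconfig row that the earlier data migration failed to link would make this upgrade fail. A quick sanity check, again a sketch rather than part of the PR (placeholder connection URL):

# Hypothetical check: list datasetconfig rows that were not linked to a ctl_dataset.
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://user:password@localhost:5432/fides")  # placeholder

with engine.connect() as conn:
    unlinked = conn.execute(
        text("SELECT id FROM datasetconfig WHERE ctl_dataset_id IS NULL;")
    ).fetchall()
    if unlinked:
        print(f"{len(unlinked)} datasetconfig rows still have no ctl_dataset_id: {unlinked}")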
Changed file: fides.api.ctl.sql_models
@@ -6,6 +6,7 @@

 from typing import Dict

+from fideslang.models import Dataset as FideslangDataset
 from sqlalchemy import (
     ARRAY,
     BOOLEAN,
@@ -19,6 +20,7 @@
     type_coerce,
 )
 from sqlalchemy.dialects.postgresql import BYTEA
+from sqlalchemy.orm import Session
 from sqlalchemy.sql import func
 from sqlalchemy.sql.sqltypes import DateTime

@@ -200,6 +202,16 @@ class Dataset(Base, FidesBase):
     retention = Column(String)
     third_country_transfers = Column(ARRAY(String))
+
+    @classmethod
+    def create_from_dataset_dict(cls, db: Session, dataset: dict) -> "Dataset":
+        """Add a method to create directly using a synchronous session"""
+        validated_dataset: FideslangDataset = FideslangDataset(**dataset)
+        ctl_dataset = cls(**validated_dataset.dict())
+        db.add(ctl_dataset)
+        db.commit()
+        db.refresh(ctl_dataset)
+        return ctl_dataset
Review comments on lines +205 to +213:

I see we have endpoints/methods that already exist ctl-side for creating …

This makes sense. IIRC the ctl endpoints are fairly generic and constructed differently.


 # Evaluation
 class Evaluation(Base):
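For illustration, a sketch of how the new classmethod might be called; the dataset contents and database URL below are made up, and the session setup is just one way to obtain a synchronous Session:

# Illustrative usage of Dataset.create_from_dataset_dict with a hypothetical dataset.
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from fides.api.ctl.sql_models import Dataset as CtlDataset

engine = create_engine("postgresql://user:password@localhost:5432/fides")  # placeholder
SessionLocal = sessionmaker(bind=engine)

dataset_dict = {
    "fides_key": "example_dataset",  # hypothetical dataset definition
    "name": "Example Dataset",
    "description": "Created via create_from_dataset_dict",
    "collections": [
        {
            "name": "users",
            "fields": [{"name": "email", "data_categories": ["user.contact.email"]}],
        }
    ],
}

with SessionLocal() as db:
    ctl_dataset = CtlDataset.create_from_dataset_dict(db, dataset_dict)
    print(ctl_dataset.id, ctl_dataset.fides_key)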
Changed file: ops dataset endpoints

@@ -1,4 +1,4 @@
-from typing import List
+from typing import Callable, List

 import yaml
 from fastapi import Depends, HTTPException, Request
@@ -22,6 +22,7 @@
     HTTP_422_UNPROCESSABLE_ENTITY,
 )

+from fides.api.ctl.sql_models import Dataset as CtlDataset  # type: ignore[attr-defined]
 from fides.api.ops.api import deps
 from fides.api.ops.api.v1.scope_registry import (
     DATASET_CREATE_OR_UPDATE,
@@ -30,6 +31,7 @@
 )
 from fides.api.ops.api.v1.urn_registry import (
     DATASET_BY_KEY,
+    DATASET_CONFIGS,
     DATASET_VALIDATE,
     DATASETS,
     V1_URL_PREFIX,
@@ -51,6 +53,7 @@
 from fides.api.ops.schemas.api import BulkUpdateFailed
 from fides.api.ops.schemas.dataset import (
     BulkPutDataset,
+    DatasetConfigCtlDataset,
     DatasetTraversalDetails,
     ValidateDatasetResponse,
     validate_data_categories_against_db,
@@ -157,6 +160,64 @@ def validate_dataset(
     )


+@router.patch(
+    DATASET_CONFIGS,
+    dependencies=[Security(verify_oauth_client, scopes=[DATASET_CREATE_OR_UPDATE])],
+    status_code=HTTP_200_OK,
+    response_model=BulkPutDataset,
+)
+def patch_dataset_configs(
+    dataset_pairs: conlist(DatasetConfigCtlDataset, max_items=50),  # type: ignore
+    db: Session = Depends(deps.get_db),
+    connection_config: ConnectionConfig = Depends(_get_connection_config),
+) -> BulkPutDataset:
+    """
+    Endpoint to create or update DatasetConfigs by passing in pairs of:
+    1) A DatasetConfig fides_key
+    2) The corresponding CtlDataset fides_key which stores the bulk of the actual dataset
+
+    Currently this endpoint looks up the ctl dataset and writes its contents back to the DatasetConfig.dataset
+    field for backwards compatibility, but soon DatasetConfig.dataset will go away.
+
Review comment on lines +169 to +181:

New endpoint that the UI should switch to using in the ops "create a connector" workflow. Andrew described a flow where two endpoints will be hit: the ctl dataset endpoint to create/update that dataset, and then the fides_key of that ctl_dataset will be passed to this endpoint. You could also select the ctl dataset from a dropdown.
""" | ||
created_or_updated: List[Dataset] = [] | ||
failed: List[BulkUpdateFailed] = [] | ||
logger.info("Starting bulk upsert for {} Dataset Configs", len(dataset_pairs)) | ||
|
||
for dataset_pair in dataset_pairs: | ||
logger.info( | ||
"Finding ctl_dataset with key '{}'", dataset_pair.ctl_dataset_fides_key | ||
) | ||
ctl_dataset: CtlDataset = ( | ||
db.query(CtlDataset) | ||
.filter_by(fides_key=dataset_pair.ctl_dataset_fides_key) | ||
.first() | ||
Review comment on lines +191 to +194:

Originally I was using a ctl-method to get this Dataset, but it wasn't playing well with ops tests. It worked fine when I ran the test file by itself but broke down when I ran the whole test suite with …
+        )
+        fetched_dataset: Dataset = Dataset.from_orm(ctl_dataset)
+
+        data = {
+            "connection_config_id": connection_config.id,
+            "fides_key": dataset_pair.fides_key,
+            "dataset": fetched_dataset.dict(),
+            "ctl_dataset_id": ctl_dataset.id,
+        }
+
+        create_or_update_dataset(
+            connection_config,
+            created_or_updated,
+            data,
+            fetched_dataset,
+            db,
+            failed,
+            DatasetConfig.create_or_update,
+        )
+
+    return BulkPutDataset(
+        succeeded=created_or_updated,
+        failed=failed,
+    )
+
+
 @router.patch(
     DATASETS,
     dependencies=[Security(verify_oauth_client, scopes=[DATASET_CREATE_OR_UPDATE])],
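For reference, a hypothetical request to the new patch_dataset_configs endpoint above. The URL path, connection key, and fides_key values are placeholders; the body field names follow how DatasetConfigCtlDataset is used in the handler:

# Hypothetical call to the new PATCH endpoint; URL, token, and keys are placeholders.
import requests

response = requests.patch(
    "http://localhost:8080/api/v1/connection/my_postgres_connection/datasetconfig",
    headers={"Authorization": "Bearer <access_token>"},
    json=[
        {
            "fides_key": "postgres_example_dataset_config",  # DatasetConfig key
            "ctl_dataset_fides_key": "postgres_example_dataset",  # existing ctl_dataset key
        }
    ],
)
print(response.status_code, response.json())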
@@ -172,10 +233,10 @@ def patch_datasets(
     Given a list of dataset elements, create or update corresponding Dataset objects
Review comments:

I believe this endpoint … Added some functionality here to make it still usable. If a raw dataset is passed in, I write it to both the DatasetConfig.dataset field and the ctl_dataset record.

Should there be a follow-up ticket keeping track of all of the soon to be removed/deprecated routes?

Good question Andrew, here's the follow-up ticket; we can wait to deprecate until the UI has been pointed to use the new endpoints: #2092
     or report failure

-    Use for bulk creating and/or updating datasets.
+    Use for bulk creating and/or updating DatasetConfig resources.

-    If the fides_key for a given dataset exists, it will be treated as an update.
-    Otherwise, a new dataset will be created.
+    If the fides_key for a given DatasetConfig exists, it will be treated as an update.
+    Otherwise, a new DatasetConfig will be created.
     """

     created_or_updated: List[Dataset] = []
@@ -198,7 +259,13 @@ def patch_datasets(
             "dataset": dataset.dict(),
Review comments:

Validation on this endpoint makes sure data categories on the dataset exist in the database. Because we're accessing the database, it's done outside of a typical pydantic validator. If this endpoint goes away, we need a new place to stick this. Does the existing ctl_datasets endpoint have this validation?

CC: @ThomasLaPiana. You might be the right person to ask about the ctl side of this.

OK, so it looks like the existing crud endpoints don't do this. The tricky bit is that the crud endpoints are very generic; it's blocks of code that apply to updating an entire set of resources. Added a note to look into the best place to put this in the next ticket: #1763
         }
         create_or_update_dataset(
-            connection_config, created_or_updated, data, dataset, db, failed
+            connection_config,
+            created_or_updated,
+            data,
+            dataset,
+            db,
+            failed,
+            DatasetConfig.upsert_with_ctl_dataset,
         )
     return BulkPutDataset(
         succeeded=created_or_updated,
@@ -249,9 +316,10 @@ async def patch_yaml_datasets(
             connection_config,
             created_or_updated,
             data,
-            yaml_request_body,
+            Dataset(**dataset),
Review comment:

This was an existing bug; this function is expecting a Dataset, not a dict. This patch_yaml_datasets endpoint is likely being deprecated too, though.
             db,
             failed,
+            DatasetConfig.upsert_with_ctl_dataset,
         )
     return BulkPutDataset(
         succeeded=created_or_updated,
@@ -263,16 +331,19 @@ def create_or_update_dataset(
     connection_config: ConnectionConfig,
     created_or_updated: List[Dataset],
     data: dict,
-    dataset: dict,
+    dataset: Dataset,
     db: Session,
     failed: List[BulkUpdateFailed],
+    create_method: Callable,
 ) -> None:
     try:
         if connection_config.connection_type == ConnectionType.saas:
+            # Validating here instead of on ctl_dataset creation because this only applies
+            # when a ctl_dataset is being linked to a Saas Connector.
             _validate_saas_dataset(connection_config, dataset)  # type: ignore
Review comments:

Still validating the saas qualities of a Dataset here, not in ctl_datasets. This validation is only applicable if a dataset is being linked to a Saas connector; it wouldn't apply otherwise.

Does it make sense to add a comment explaining that this validation only needs to happen here and not in …

Good call.
         # Try to find an existing DatasetConfig matching the given connection & key
-        dataset_config = DatasetConfig.create_or_update(db, data=data)
-        created_or_updated.append(dataset_config.dataset)
+        dataset_config = create_method(db, data=data)
+        created_or_updated.append(dataset_config.ctl_dataset)
     except (
         SaaSConfigNotFoundException,
         ValidationError,
@@ -340,7 +411,7 @@ def get_datasets(
     params: Params = Depends(),
     connection_config: ConnectionConfig = Depends(_get_connection_config),
 ) -> AbstractPage[Dataset]:
-    """Returns all datasets in the database."""
+    """Returns all DatasetConfig datasets in the database."""

     logger.info(
         "Finding all datasets for connection '{}' with pagination params {}",
@@ -357,7 +428,7 @@ def get_datasets(
     # paginated query is handled by paginate()
     paginated_results = paginate(dataset_configs, params=params)
     paginated_results.items = [  # type: ignore
-        dataset_config.dataset for dataset_config in paginated_results.items  # type: ignore
+        dataset_config.ctl_dataset for dataset_config in paginated_results.items  # type: ignore
Review comment:

These and similar edits to these GET endpoints move to getting the dataset from the ctl_dataset record, not the DatasetConfig.dataset column.
     ]
     return paginated_results

@@ -389,7 +460,7 @@ def get_dataset(
             status_code=HTTP_404_NOT_FOUND,
             detail=f"No dataset with fides_key '{fides_key}' and connection key {connection_config.key}'",
         )
-    return dataset_config.dataset
+    return dataset_config.ctl_dataset


 @router.delete(
@@ -403,7 +474,7 @@ def delete_dataset(
     db: Session = Depends(deps.get_db),
     connection_config: ConnectionConfig = Depends(_get_connection_config),
 ) -> None:
-    """Removes the dataset based on the given key."""
+    """Removes the DatasetConfig based on the given key."""

     logger.info(
         "Finding dataset '{}' for connection '{}'", fides_key, connection_config.key
Review comment (on a yaml file changed in this PR):

This yaml file has been adjusted to reflect the new datasetconfig.ctl_dataset_id field.