From 6206fffcde4bc89e56ee7c5953a78f31663b732f Mon Sep 17 00:00:00 2001
From: eastandwestwind
Date: Wed, 21 Sep 2022 12:05:29 -0500
Subject: [PATCH 1/2] getting started
---
src/fidesops/main.py | 38 +-
src/fidesops/ops/analytics.py | 12 +-
src/fidesops/ops/analytics_event_factory.py | 83 +++++
.../v1/endpoints/privacy_request_endpoints.py | 39 +-
.../erasure_request_email_fulfillment.html | 8 +-
src/fidesops/ops/graph/analytics_events.py | 52 +--
src/fidesops/ops/models/privacy_request.py | 27 +-
src/fidesops/ops/schemas/email/email.py | 21 +-
.../ops/service/connectors/email_connector.py | 101 +++--
.../service/email/email_dispatch_service.py | 278 +++++++++++++-
.../privacy_request/request_runner_service.py | 129 +++----
src/fidesops/ops/task/graph_task.py | 10 +-
src/fidesops/ops/util/json.py | 6 +
.../test_connection_config_endpoints.py | 25 +-
.../test_privacy_request_endpoints.py | 40 +-
.../ops/graph/test_graph_analytics_events.py | 2 +-
tests/ops/integration_tests/test_execution.py | 4 +-
.../test_integration_email.py | 133 +++----
tests/ops/models/test_privacy_request.py | 23 +-
.../request_runner_service_test.py | 350 +++++++++---------
20 files changed, 869 insertions(+), 512 deletions(-)
create mode 100644 src/fidesops/ops/analytics_event_factory.py
create mode 100644 src/fidesops/ops/util/json.py
diff --git a/src/fidesops/main.py b/src/fidesops/main.py
index 4ef275e03..a3b450721 100644
--- a/src/fidesops/main.py
+++ b/src/fidesops/main.py
@@ -15,16 +15,15 @@
from fideslib.oauth.api.deps import get_db as lib_get_db
from fideslib.oauth.api.deps import verify_oauth_client as lib_verify_oauth_client
from fideslib.oauth.api.routes.user_endpoints import router as user_router
-from fideslog.sdk.python.event import AnalyticsEvent
from redis.exceptions import ResponseError
from starlette.background import BackgroundTask
from starlette.middleware.cors import CORSMiddleware
from starlette.status import HTTP_404_NOT_FOUND
-from fidesops.ops.analytics import (
- accessed_through_local_host,
- in_docker_container,
- send_analytics_event,
+from fidesops.ops.analytics import send_analytics_event
+from fidesops.ops.analytics_event_factory import (
+ endpoint_call_analytics_event,
+ server_start_analytics_event,
)
from fidesops.ops.api.deps import get_api_session, get_config, get_db
from fidesops.ops.api.v1.api import api_router
@@ -36,7 +35,6 @@
)
from fidesops.ops.core.config import config
from fidesops.ops.db.database import init_db
-from fidesops.ops.schemas.analytics import Event, ExtraData
from fidesops.ops.service.connectors.saas.connector_registry_service import (
load_registry,
registry_file,
@@ -116,17 +114,13 @@ async def prepare_and_log_request(
if config.root_user.analytics_opt_out:
return
await send_analytics_event(
- AnalyticsEvent(
- docker=in_docker_container(),
- event=Event.endpoint_call.value,
- event_created_at=event_created_at,
- local_host=accessed_through_local_host(hostname),
- endpoint=endpoint,
- status_code=status_code,
- error=error_class or None,
- extra_data={ExtraData.fides_source.value: fides_source}
- if fides_source
- else None,
+ endpoint_call_analytics_event(
+ endpoint,
+ hostname,
+ status_code,
+ event_created_at,
+ fides_source,
+ error_class,
)
)
@@ -265,15 +259,7 @@ def start_webserver() -> None:
logger.info("Starting scheduled request intake...")
initiate_scheduled_request_intake()
- asyncio.run(
- send_analytics_event(
- AnalyticsEvent(
- docker=in_docker_container(),
- event=Event.server_start.value,
- event_created_at=datetime.now(tz=timezone.utc),
- )
- )
- )
+ asyncio.run(send_analytics_event(server_start_analytics_event()))
if not config.execution.worker_enabled:
logger.info("Starting worker...")
diff --git a/src/fidesops/ops/analytics.py b/src/fidesops/ops/analytics.py
index 9f656c75d..f0a2950db 100644
--- a/src/fidesops/ops/analytics.py
+++ b/src/fidesops/ops/analytics.py
@@ -9,6 +9,7 @@
from fidesops import __version__ as fidesops_version
from fidesops.ops.core.config import config
+from fidesops.ops.util.wrappers import sync
logger = logging.getLogger(__name__)
@@ -35,8 +36,17 @@ def accessed_through_local_host(hostname: Optional[str]) -> bool:
)
+@sync
+async def sync_send_analytics_event_wrapper(event: AnalyticsEvent) -> None:
+ """
+ Calling async functions within Celery tasks is not yet supported: https://github.com/celery/celery/issues/6552
+ so this sync wrapper is a workaround for that.
+ """
+ await send_analytics_event(event)
+
+
async def send_analytics_event(event: AnalyticsEvent) -> None:
- if config.root_user.analytics_opt_out:
+ if config.root_user.analytics_opt_out or not event:
return
try:
await analytics_client._AnalyticsClient__send( # pylint: disable=protected-access
diff --git a/src/fidesops/ops/analytics_event_factory.py b/src/fidesops/ops/analytics_event_factory.py
new file mode 100644
index 000000000..ddc1f29b4
--- /dev/null
+++ b/src/fidesops/ops/analytics_event_factory.py
@@ -0,0 +1,83 @@
+from datetime import datetime, timezone
+from typing import Any, Dict, Optional
+
+from fideslog.sdk.python.event import AnalyticsEvent
+
+from fidesops.ops.analytics import accessed_through_local_host, in_docker_container
+from fidesops.ops.models.policy import ActionType
+from fidesops.ops.models.privacy_request import PrivacyRequest
+from fidesops.ops.schemas.analytics import Event, ExtraData
+
+
+def rerun_graph_analytics_event(
+ data: Dict[str, Any],
+ step: ActionType,
+) -> Optional[AnalyticsEvent]:
+ """Sends an AnalyticsEvent to send to Fideslog with stats on how an access graph
+ has changed from the previous run if applicable"""
+
+ return AnalyticsEvent(
+ docker=in_docker_container(),
+ event="rerun_access_graph"
+ if step == ActionType.access
+ else "rerun_erasure_graph",
+ event_created_at=datetime.now(tz=timezone.utc),
+ local_host=None,
+ endpoint=None,
+ status_code=None,
+ error=None,
+ extra_data=data,
+ )
+
+
+def failed_graph_analytics_event(
+ privacy_request: PrivacyRequest, exc: Optional[BaseException]
+) -> Optional[AnalyticsEvent]:
+ """Sends an AnalyticsEvent to send to Fideslog if privacy request execution has failed."""
+
+ data = {"privacy_request": privacy_request.id}
+
+ return AnalyticsEvent(
+ docker=in_docker_container(),
+ event="privacy_request_execution_failure",
+ event_created_at=datetime.now(tz=timezone.utc),
+ local_host=None,
+ endpoint=None,
+ status_code=500,
+ error=exc.__class__.__name__ if exc else None,
+ extra_data=data,
+ )
+
+
+def server_start_analytics_event() -> Optional[AnalyticsEvent]:
+ """Sends an AnalyticsEvent to send to Fideslog upon server start"""
+
+ return AnalyticsEvent(
+ docker=in_docker_container(),
+ event=Event.server_start.value,
+ event_created_at=datetime.now(tz=timezone.utc),
+ )
+
+
+def endpoint_call_analytics_event(
+ endpoint: str,
+ hostname: Optional[str],
+ status_code: int,
+ event_created_at: datetime,
+ fides_source: Optional[str],
+ error_class: Optional[str],
+) -> Optional[AnalyticsEvent]:
+ """Sends an AnalyticsEvent to send to Fideslog upon endpoint calls"""
+
+ return AnalyticsEvent(
+ docker=in_docker_container(),
+ event=Event.endpoint_call.value,
+ event_created_at=event_created_at,
+ local_host=accessed_through_local_host(hostname),
+ endpoint=endpoint,
+ status_code=status_code,
+ error=error_class or None,
+ extra_data={ExtraData.fides_source.value: fides_source}
+ if fides_source
+ else None,
+ )
diff --git a/src/fidesops/ops/api/v1/endpoints/privacy_request_endpoints.py b/src/fidesops/ops/api/v1/endpoints/privacy_request_endpoints.py
index cf34c09ec..c91627125 100644
--- a/src/fidesops/ops/api/v1/endpoints/privacy_request_endpoints.py
+++ b/src/fidesops/ops/api/v1/endpoints/privacy_request_endpoints.py
@@ -74,6 +74,7 @@
from fidesops.ops.graph.traversal import Traversal
from fidesops.ops.models.connectionconfig import ConnectionConfig
from fidesops.ops.models.datasetconfig import DatasetConfig
+from fidesops.ops.models.email import EmailConfig
from fidesops.ops.models.manual_webhook import AccessManualWebhook
from fidesops.ops.models.policy import ActionType, CurrentStep, Policy, PolicyPreWebhook
from fidesops.ops.models.privacy_request import (
@@ -91,7 +92,7 @@
EmailActionType,
FidesopsEmail,
RequestReceiptBodyParams,
- RequestReviewDenyBodyParams,
+ RequestReviewDenyBodyParams, SubjectIdentityVerificationBodyParams,
)
from fidesops.ops.schemas.external_https import PrivacyRequestResumeFormat
from fidesops.ops.schemas.privacy_request import (
@@ -109,9 +110,12 @@
VerificationCode,
)
from fidesops.ops.service._verification import send_verification_code_to_user
-from fidesops.ops.service.email.email_dispatch_service import dispatch_email_task
+from fidesops.ops.service.email.email_dispatch_service import (
+ dispatch_email_task_generic,
+ dispatch_email_task_identity_verification,
+)
from fidesops.ops.service.privacy_request.request_runner_service import (
- queue_privacy_request,
+ queue_privacy_request, generate_id_verification_code,
)
from fidesops.ops.service.privacy_request.request_service import (
build_required_privacy_request_kwargs,
@@ -283,6 +287,31 @@ async def create_privacy_request(
)
+def _send_verification_code_to_user(
+ db: Session, privacy_request: PrivacyRequest, email: Optional[str]
+) -> None:
+ """Generate and cache a verification code, and then email to the user"""
+ EmailConfig.get_configuration(
+ db=db
+ ) # Validates Fidesops is currently configured to send emails
+ verification_code: str = generate_id_verification_code()
+ privacy_request.cache_identity_verification_code(verification_code)
+ dispatch_email_task_identity_verification.apply_async(
+ queue=EMAIL_QUEUE_NAME,
+ kwargs={
+ "email_meta": FidesopsEmail(
+ action_type=EmailActionType.SUBJECT_IDENTITY_VERIFICATION,
+ body_params=SubjectIdentityVerificationBodyParams(
+ verification_code=verification_code,
+ verification_code_ttl_seconds=config.redis.identity_verification_code_ttl_seconds,
+ ),
+ ).dict(),
+ "to_email": email,
+ "privacy_request_id": privacy_request.id,
+ },
+ )
+
+
def _send_privacy_request_receipt_email_to_user(
policy: Optional[Policy], email: Optional[str]
) -> None:
@@ -305,7 +334,7 @@ def _send_privacy_request_receipt_email_to_user(
for action_type in ActionType:
if policy.get_rules_for_action(action_type=ActionType(action_type)):
request_types.add(action_type)
- dispatch_email_task.apply_async(
+ dispatch_email_task_generic.apply_async(
queue=EMAIL_QUEUE_NAME,
kwargs={
"email_meta": FidesopsEmail(
@@ -1103,7 +1132,7 @@ def _send_privacy_request_review_email_to_user(
"Identity email was not found, so request review email could not be sent."
)
)
- dispatch_email_task.apply_async(
+ dispatch_email_task_generic.apply_async(
queue=EMAIL_QUEUE_NAME,
kwargs={
"email_meta": FidesopsEmail(
diff --git a/src/fidesops/ops/email_templates/templates/erasure_request_email_fulfillment.html b/src/fidesops/ops/email_templates/templates/erasure_request_email_fulfillment.html
index 5f7a08d77..c1c11c1cd 100644
--- a/src/fidesops/ops/email_templates/templates/erasure_request_email_fulfillment.html
+++ b/src/fidesops/ops/email_templates/templates/erasure_request_email_fulfillment.html
@@ -11,18 +11,18 @@
Please locate and erase personally identifiable information for the data subject and records listed below:
{% for action_required in dataset_collection_action_required -%}
- {{ action_required.collection.collection }}
+ {{ action_required["collection"]["collection"] }}
{% for action in action_required.action_needed -%}
Locate the relevant records with:
- {% for field, values in action.locators.items() -%}
+ {% for field, values in action["locators"].items() -%}
- Field: {{ field }}, Values: {{ values|join(', ') }}
{%- endfor %}
Erase the following fields:
- {% if action.update -%}
+ {% if action["update"] -%}
- {% for field_name, masking_strategy in action.update.items() -%}
+ {% for field_name, masking_strategy in action["update"].items() -%}
- {{field_name}}
{%- endfor %}
diff --git a/src/fidesops/ops/graph/analytics_events.py b/src/fidesops/ops/graph/analytics_events.py
index 1fa097a96..2dc7b09f2 100644
--- a/src/fidesops/ops/graph/analytics_events.py
+++ b/src/fidesops/ops/graph/analytics_events.py
@@ -1,10 +1,8 @@
-from datetime import datetime, timezone
from typing import TYPE_CHECKING, Dict, List, Optional
from fideslog.sdk.python.event import AnalyticsEvent
-from fidesops.ops.analytics import in_docker_container, send_analytics_event
-from fidesops.ops.core.config import config
+from fidesops.ops.analytics_event_factory import rerun_graph_analytics_event
from fidesops.ops.graph.config import CollectionAddress
from fidesops.ops.graph.graph_differences import (
GraphDiffSummary,
@@ -21,22 +19,6 @@
from fidesops.ops.task.graph_task import GraphTask
-async def fideslog_graph_failure(event: Optional[AnalyticsEvent]) -> None:
- """Send an Analytics Event if privacy request execution has failed"""
- if config.root_user.analytics_opt_out or not event:
- return
-
- await send_analytics_event(event)
-
-
-async def fideslog_graph_rerun(event: Optional[AnalyticsEvent]) -> None:
- """Send an Analytics Event if a privacy request has been reprocessed, comparing its graph to the previous graph"""
- if config.root_user.analytics_opt_out or not event:
- return
-
- await send_analytics_event(event)
-
-
def prepare_rerun_graph_analytics_event(
privacy_request: PrivacyRequest,
env: Dict[CollectionAddress, "GraphTask"],
@@ -73,34 +55,4 @@ def prepare_rerun_graph_analytics_event(
data = graph_diff_summary.dict()
data["privacy_request"] = privacy_request.id
- return AnalyticsEvent(
- docker=in_docker_container(),
- event="rerun_access_graph"
- if step == ActionType.access
- else "rerun_erasure_graph",
- event_created_at=datetime.now(tz=timezone.utc),
- local_host=None,
- endpoint=None,
- status_code=None,
- error=None,
- extra_data=data,
- )
-
-
-def failed_graph_analytics_event(
- privacy_request: PrivacyRequest, exc: Optional[BaseException]
-) -> Optional[AnalyticsEvent]:
- """Prepares an AnalyticsEvent to send to Fideslog if privacy request execution has failed."""
-
- data = {"privacy_request": privacy_request.id}
-
- return AnalyticsEvent(
- docker=in_docker_container(),
- event="privacy_request_execution_failure",
- event_created_at=datetime.now(tz=timezone.utc),
- local_host=None,
- endpoint=None,
- status_code=500,
- error=exc.__class__.__name__ if exc else None,
- extra_data=data,
- )
+ return rerun_graph_analytics_event(data=data, step=step)
diff --git a/src/fidesops/ops/models/privacy_request.py b/src/fidesops/ops/models/privacy_request.py
index e1fa94c0d..668c7a0b8 100644
--- a/src/fidesops/ops/models/privacy_request.py
+++ b/src/fidesops/ops/models/privacy_request.py
@@ -69,6 +69,7 @@
)
from fidesops.ops.util.collection_util import Row
from fidesops.ops.util.constants import API_DATE_FORMAT
+from fidesops.ops.util.json import get_json
logger = logging.getLogger(__name__)
@@ -95,7 +96,19 @@ class ManualAction(BaseSchema):
update: Optional[Dict[str, Any]]
-class CheckpointActionRequired(BaseSchema):
+class CheckpointActionRequiredBase(BaseSchema):
+ """Base actions needed for particular checkpoint
+ Used for async task execution, where Enums cannot be parsed
+ """
+
+ collection: Optional[CollectionAddress]
+ action_needed: Optional[List[ManualAction]] = None
+
+ class Config:
+ arbitrary_types_allowed = True
+
+
+class CheckpointActionRequired(CheckpointActionRequiredBase):
"""Describes actions needed on a particular checkpoint.
Examples are a paused collection that needs manual input, a failed collection that
@@ -104,11 +117,6 @@ class CheckpointActionRequired(BaseSchema):
"""
step: CurrentStep
- collection: Optional[CollectionAddress]
- action_needed: Optional[List[ManualAction]] = None
-
- class Config:
- arbitrary_types_allowed = True
EmailRequestFulfillmentBodyParams = Dict[
@@ -422,17 +430,18 @@ def cache_email_connector_template_contents(
def get_email_connector_template_contents_by_dataset(
self, step: CurrentStep, dataset: str
- ) -> List[CheckpointActionRequired]:
+ ) -> List[Dict[str, Optional[Any]]]:
"""Retrieve the raw details to populate an email template for collections on a given dataset."""
cache: FidesopsRedis = get_cache()
email_contents: Dict[str, Optional[Any]] = cache.get_encoded_objects_by_prefix(
f"EMAIL_INFORMATION__{self.id}__{step.value}__{dataset}"
)
- actions: List[CheckpointActionRequired] = []
+ actions: List[Dict[str, Optional[Any]]] = []
for email_content in email_contents.values():
if email_content:
- actions.append(CheckpointActionRequired.parse_obj(email_content))
+ email_content.pop("step", None)
+ actions.append(get_json(email_content))
return actions
def cache_paused_collection_details(
diff --git a/src/fidesops/ops/schemas/email/email.py b/src/fidesops/ops/schemas/email/email.py
index 129f022d6..d8aefaec3 100644
--- a/src/fidesops/ops/schemas/email/email.py
+++ b/src/fidesops/ops/schemas/email/email.py
@@ -3,7 +3,7 @@
from pydantic import BaseModel, Extra
-from fidesops.ops.models.privacy_request import CheckpointActionRequired
+from fidesops.ops.models.privacy_request import ManualAction
from fidesops.ops.schemas import Msg
from fidesops.ops.schemas.shared_schemas import FidesOpsKey
@@ -66,6 +66,23 @@ class RequestReviewDenyBodyParams(BaseModel):
rejection_reason: Optional[str]
+class CollectionAddress(BaseModel):
+ dataset: str
+ collection: str
+ value: str
+
+
+class EmailConnectorErasureBodyParams(BaseModel):
+ collection: Optional[CollectionAddress]
+ action_needed: Optional[List[ManualAction]] = None
+
+
+class EmailConnectorEmail(BaseModel):
+ email_meta: Dict[str, Any]
+ to_email: str
+ dataset_key: FidesOpsKey
+
+
class FidesopsEmail(
BaseModel,
smart_union=True,
@@ -80,7 +97,7 @@ class FidesopsEmail(
RequestReceiptBodyParams,
RequestReviewDenyBodyParams,
AccessRequestCompleteBodyParams,
- List[CheckpointActionRequired],
+ List[EmailConnectorErasureBodyParams],
]
]
diff --git a/src/fidesops/ops/service/connectors/email_connector.py b/src/fidesops/ops/service/connectors/email_connector.py
index de095fb0a..a57d7a035 100644
--- a/src/fidesops/ops/service/connectors/email_connector.py
+++ b/src/fidesops/ops/service/connectors/email_connector.py
@@ -1,7 +1,6 @@
import logging
from typing import Any, Dict, List, Optional
-from fideslib.models.audit_log import AuditLog, AuditLogAction
from sqlalchemy.orm import Session
from fidesops.ops.common_exceptions import (
@@ -23,11 +22,16 @@
PrivacyRequest,
)
from fidesops.ops.schemas.connection_configuration import EmailSchema
-from fidesops.ops.schemas.email.email import EmailActionType
+from fidesops.ops.schemas.email.email import EmailActionType, FidesopsEmail
from fidesops.ops.service.connectors.base_connector import BaseConnector
from fidesops.ops.service.connectors.query_config import ManualQueryConfig
-from fidesops.ops.service.email.email_dispatch_service import dispatch_email
+from fidesops.ops.service.email.email_dispatch_service import (
+ dispatch_email,
+ dispatch_email_task_email_connector,
+)
+from fidesops.ops.tasks import EMAIL_QUEUE_NAME
from fidesops.ops.util.collection_util import Row, append
+from fidesops.ops.util.json import get_json
logger = logging.getLogger(__name__)
@@ -55,24 +59,28 @@ def test_connection(self) -> Optional[ConnectionTestStatus]:
db = Session.object_session(self.configuration)
try:
- # synchronous for now since failure to send is considered a connection test failure
+ # synchronous since failure to send is considered a connection test failure
dispatch_email(
db=db,
action_type=EmailActionType.EMAIL_ERASURE_REQUEST_FULFILLMENT,
to_email=config.test_email,
email_body_params=[
- CheckpointActionRequired(
- step=CurrentStep.erasure,
- collection=CollectionAddress("test_dataset", "test_collection"),
- action_needed=[
- ManualAction(
- locators={"id": ["example_id"]},
- get=None,
- update={
- "test_field": "null_rewrite",
- },
- )
- ],
+ get_json(
+ CheckpointActionRequired(
+ step=CurrentStep.erasure,
+ collection=CollectionAddress(
+ "test_dataset", "test_collection"
+ ),
+ action_needed=[
+ ManualAction(
+ locators={"id": ["example_id"]},
+ get=None,
+ update={
+ "test_field": "null_rewrite",
+ },
+ )
+ ],
+ )
)
],
)
@@ -158,18 +166,28 @@ def build_masking_instructions(
return ManualAction(locators=locators, update=mask_map if mask_map else None)
-def email_connector_erasure_send(db: Session, privacy_request: PrivacyRequest) -> None:
+def email_connector_erasure_send(
+ db: Session,
+ privacy_request: PrivacyRequest,
+ policy: Policy,
+ identity_data: Dict[str, Any],
+ access_result_urls: List[str] = None,
+) -> bool:
"""
Send emails to configured third-parties with instructions on how to erase remaining data.
Combined all the collections on each email-based dataset into one email.
+ Returns true if one or more emails were attempted
"""
email_dataset_configs = db.query(DatasetConfig, ConnectionConfig).filter(
DatasetConfig.connection_config_id == ConnectionConfig.id,
ConnectionConfig.connection_type == ConnectionType.email,
)
+
+ emails_to_send: List[Dict[str, Any]] = []
for ds, cc in email_dataset_configs:
+ # coerce to dicts so that email task can ingest
template_values: List[
- CheckpointActionRequired
+ Dict[str, Optional[Any]]
] = privacy_request.get_email_connector_template_contents_by_dataset(
CurrentStep.erasure, ds.dataset.get("fides_key")
)
@@ -179,12 +197,12 @@ def email_connector_erasure_send(db: Session, privacy_request: PrivacyRequest) -
"No email sent: no template values saved for '%s'",
ds.dataset.get("fides_key"),
)
- return
+ continue
if not any(
(
- action_required.action_needed[0].update
- if action_required and action_required.action_needed
+ action_required["action_needed"][0].update
+ if action_required and action_required["action_needed"]
else False
for action_required in template_values
)
@@ -192,26 +210,29 @@ def email_connector_erasure_send(db: Session, privacy_request: PrivacyRequest) -
logger.info(
"No email sent: no masking needed on '%s'", ds.dataset.get("fides_key")
)
- return
-
- dispatch_email(
- db,
- action_type=EmailActionType.EMAIL_ERASURE_REQUEST_FULFILLMENT,
- to_email=cc.secrets.get("to_email"),
- email_body_params=template_values,
- )
-
- logger.info(
- "Email send succeeded for request '%s' for dataset: '%s'",
- privacy_request.id,
- ds.dataset.get("fides_key"),
+ continue
+
+ emails_to_send.append(
+ {
+ "email_meta": FidesopsEmail(
+ action_type=EmailActionType.EMAIL_ERASURE_REQUEST_FULFILLMENT,
+ body_params=template_values,
+ ).dict(),
+ "to_email": cc.secrets.get("to_email"),
+ "dataset_key": ds.dataset.get("fides_key"),
+ }
)
- AuditLog.create(
- db=db,
- data={
- "user_id": "system",
+ if len(emails_to_send) > 0:
+ print("calling email task")
+ dispatch_email_task_email_connector.apply_async(
+ queue=EMAIL_QUEUE_NAME,
+ kwargs={
+ "emails_to_send": emails_to_send,
+ "policy_key": policy.key,
"privacy_request_id": privacy_request.id,
- "action": AuditLogAction.email_sent,
- "message": f"Erasure email instructions dispatched for '{ds.dataset.get('fides_key')}'",
+ "access_result_urls": access_result_urls,
+ "identity_data": identity_data,
},
)
+ return True
+ return False
diff --git a/src/fidesops/ops/service/email/email_dispatch_service.py b/src/fidesops/ops/service/email/email_dispatch_service.py
index f62d2a4ad..667e28d6a 100644
--- a/src/fidesops/ops/service/email/email_dispatch_service.py
+++ b/src/fidesops/ops/service/email/email_dispatch_service.py
@@ -1,18 +1,36 @@
+# pylint: disable=W0223
from __future__ import annotations
import logging
-from typing import Any, Dict, List, Optional, Union
+from datetime import datetime
+from typing import Any, Dict, List, Optional, Tuple, Union
import requests
+from fideslib.models.audit_log import AuditLog, AuditLogAction
from sqlalchemy.orm import Session
+from fidesops.ops.analytics import sync_send_analytics_event_wrapper
+from fidesops.ops.analytics_event_factory import failed_graph_analytics_event
from fidesops.ops.common_exceptions import EmailDispatchException
+from fidesops.ops.core.config import config
from fidesops.ops.email_templates import get_email_template
from fidesops.ops.models.email import EmailConfig
-from fidesops.ops.models.privacy_request import CheckpointActionRequired
+from fidesops.ops.models.policy import (
+ ActionType,
+ CurrentStep,
+ Policy,
+ PolicyPostWebhook,
+)
+from fidesops.ops.models.privacy_request import (
+ PrivacyRequest,
+ PrivacyRequestStatus,
+ ProvidedIdentityType,
+)
from fidesops.ops.schemas.email.email import (
AccessRequestCompleteBodyParams,
EmailActionType,
+ EmailConnectorEmail,
+ EmailConnectorErasureBodyParams,
EmailForActionType,
EmailServiceDetails,
EmailServiceSecrets,
@@ -22,17 +40,259 @@
RequestReviewDenyBodyParams,
SubjectIdentityVerificationBodyParams,
)
-from fidesops.ops.tasks import DatabaseTask, celery_app
-from fidesops.ops.util.logger import Pii
+from fidesops.ops.tasks import EMAIL_QUEUE_NAME, DatabaseTask, celery_app
+from fidesops.ops.util.logger import Pii, _log_exception
logger = logging.getLogger(__name__)
-@celery_app.task(base=DatabaseTask, bind=True)
-def dispatch_email_task(
+class RequestCompletionBase(DatabaseTask):
+ """
+ A wrapper class to handle specific success/failure cases for request completion emails
+ """
+
+ def on_failure(
+ self,
+ exc: Exception,
+ task_id: str,
+ args: Tuple,
+ kwargs: Dict[str, Any],
+ einfo: Any,
+ ) -> None:
+ privacy_request_id = kwargs["privacy_request_id"]
+
+ with self.session as db:
+ privacy_request = PrivacyRequest.get(db, object_id=privacy_request_id)
+ privacy_request.error_processing(db=db)
+ # If dev mode, log traceback
+ sync_send_analytics_event_wrapper(
+ failed_graph_analytics_event(privacy_request, exc)
+ )
+ _log_exception(exc, config.dev_mode)
+
+ def on_success(
+ self, retval: Any, task_id: str, args: Tuple, kwargs: Dict[str, Any]
+ ) -> None:
+ privacy_request_id = kwargs["privacy_request_id"]
+ with self.session as db:
+ privacy_request = PrivacyRequest.get(db, object_id=privacy_request_id)
+ privacy_request.finished_processing_at = datetime.utcnow()
+ AuditLog.create(
+ db=db,
+ data={
+ "user_id": "system",
+ "privacy_request_id": privacy_request.id,
+ "action": AuditLogAction.finished,
+ "message": "",
+ },
+ )
+ privacy_request.status = PrivacyRequestStatus.complete
+ logging.info("Privacy request %s run completed.", privacy_request.id)
+ privacy_request.save(db=db)
+
+
+@celery_app.task(base=RequestCompletionBase, bind=True)
+def dispatch_email_task_request_completion(
+ self: DatabaseTask,
+ privacy_request_id: str,
+ to_email: str,
+ access_result_urls: List[str],
+ policy_id: str,
+) -> None:
+ """
+ A wrapper function to dispatch an email task into the Celery queues
+ """
+ logger.info("Dispatching email for request completion")
+ with self.session as db:
+ policy = Policy.get(db, object_id=policy_id)
+ if policy.get_rules_for_action(action_type=ActionType.access):
+ dispatch_email(
+ db,
+ EmailActionType.PRIVACY_REQUEST_COMPLETE_ACCESS,
+ to_email,
+ AccessRequestCompleteBodyParams(download_links=access_result_urls),
+ )
+ if policy.get_rules_for_action(action_type=ActionType.erasure):
+ dispatch_email(
+ db,
+ EmailActionType.PRIVACY_REQUEST_COMPLETE_DELETION,
+ to_email,
+ None,
+ )
+
+
+class IdentityVerificationBase(DatabaseTask):
+ """
+ A wrapper class to handle specific success/failure cases for identity verification emails
+ """
+
+ def on_failure(
+ self,
+ exc: Exception,
+ task_id: str,
+ args: Tuple,
+ kwargs: Dict[str, Any],
+ einfo: Any,
+ ) -> None:
+ privacy_request_id = kwargs["privacy_request_id"]
+ with self.session as db:
+ # fixme: net new functionality- needs review. Previously we didn't set privacy request to error upon failure to send verification emil
+ # we only returned failure for the create privacy request api response.
+ privacy_request = PrivacyRequest.get(db, object_id=privacy_request_id)
+ privacy_request.error_processing(db=db)
+ # If dev mode, log traceback
+ _log_exception(exc, config.dev_mode)
+
+
+@celery_app.task(base=IdentityVerificationBase, bind=True)
+def dispatch_email_task_identity_verification(
self: DatabaseTask,
email_meta: Dict[str, Any],
to_email: str,
+ privacy_request_id: str,
+) -> None:
+ """
+ A wrapper function to dispatch an email task into the Celery queues
+ """
+ schema = FidesopsEmail.parse_obj(email_meta)
+ with self.session as db:
+ dispatch_email(
+ db,
+ schema.action_type,
+ to_email,
+ schema.body_params,
+ )
+
+
+class EmailTaskEmailConnector(DatabaseTask):
+ """
+ A wrapper class to handle specific failure case for email connectors
+ """
+
+ def on_failure(
+ self,
+ exc: Exception,
+ task_id: str,
+ args: Tuple,
+ kwargs: Dict[str, Any],
+ einfo: Any,
+ ) -> None:
+
+ privacy_request_id = kwargs["privacy_request_id"]
+ with self.session as db:
+ # fixme: net new functionality- needs review. Previously we didn't set privacy request to error state upon email failure
+ privacy_request = PrivacyRequest.get(db, object_id=privacy_request_id)
+ privacy_request.cache_failed_checkpoint_details(
+ step=CurrentStep.erasure_email_post_send, collection=None
+ )
+ privacy_request.error_processing(db=db)
+ sync_send_analytics_event_wrapper(
+ failed_graph_analytics_event(privacy_request, exc)
+ )
+ # If dev mode, log traceback
+ _log_exception(exc, config.dev_mode)
+
+ def on_success(
+ self, retval: Any, task_id: str, args: Tuple, kwargs: Dict[str, Any]
+ ) -> None:
+ privacy_request_id = kwargs["privacy_request_id"]
+ policy_id = kwargs["policy_id"]
+ access_result_urls = kwargs["access_result_urls"]
+ identity_data = kwargs["identity_data"]
+ with self.session as db:
+ privacy_request = PrivacyRequest.get(db, object_id=privacy_request_id)
+
+ from fidesops.ops.service.privacy_request.request_runner_service import ( # pylint: disable=R0401
+ run_webhooks_and_report_status,
+ )
+
+ # Run post-execution webhooks
+ proceed = run_webhooks_and_report_status(
+ db=db,
+ privacy_request=privacy_request,
+ webhook_cls=PolicyPostWebhook, # type: ignore
+ )
+ if not proceed:
+ return
+ if config.notifications.send_request_completion_notification:
+ dispatch_email_task_request_completion.apply_async(
+ queue=EMAIL_QUEUE_NAME,
+ kwargs={
+ "privacy_request_id": privacy_request.id,
+ "to_email": identity_data.get(ProvidedIdentityType.email.value),
+ "access_result_urls": access_result_urls,
+ "policy_id": policy_id,
+ },
+ )
+ else:
+ privacy_request.finished_processing_at = datetime.utcnow()
+ AuditLog.create(
+ db=db,
+ data={
+ "user_id": "system",
+ "privacy_request_id": privacy_request.id,
+ "action": AuditLogAction.finished,
+ "message": "",
+ },
+ )
+ privacy_request.status = PrivacyRequestStatus.complete
+ logging.info("Privacy request %s run completed.", privacy_request.id)
+ privacy_request.save(db=db)
+
+
+@celery_app.task(base=EmailTaskEmailConnector, bind=True)
+def dispatch_email_task_email_connector(
+ self: DatabaseTask,
+ emails_to_send: List[Dict[str, Any]],
+ policy_key: str,
+ privacy_request_id: str,
+ access_result_urls: List[str],
+ identity_data: Dict[str, Any],
+) -> None:
+ """
+ A wrapper function to dispatch an email task into the Celery queues
+ """
+
+ # The on_success celery handler only runs once, after success of entire task,
+ # but here we need know which dataset(s) to log upon success of each email in batch.
+ # so we need to pass in an on_success for each dispatch call
+ def on_success(session: Session, kwargs: Dict[str, Any]) -> None:
+ logger.info(
+ "Email send succeeded for request '%s' for dataset: '%s'",
+ privacy_request_id,
+ kwargs["dataset_key"],
+ )
+ AuditLog.create(
+ db=session,
+ data={
+ "user_id": "system",
+ "privacy_request_id": kwargs["pr_id"],
+ "action": AuditLogAction.email_sent,
+ "message": f"Erasure email instructions dispatched for '{kwargs['dataset_key']}'",
+ },
+ )
+
+ for email in emails_to_send:
+ parsed: EmailConnectorEmail = EmailConnectorEmail.parse_obj(email)
+ schema = FidesopsEmail.parse_obj(parsed.email_meta)
+
+ with self.session as db:
+ dispatch_email(
+ db,
+ schema.action_type,
+ parsed.to_email,
+ schema.body_params, # fixme: update usages to not expect python class
+ on_success=on_success,
+ on_success_kwargs={
+ "pr_id": privacy_request_id,
+ "dataset_key": parsed.dataset_key,
+ },
+ )
+
+
+@celery_app.task(base=DatabaseTask, bind=True)
+def dispatch_email_task_generic(
+ self: DatabaseTask, email_meta: Dict[str, Any], to_email: str
) -> None:
"""
A wrapper function to dispatch an email task into the Celery queues
@@ -57,9 +317,11 @@ def dispatch_email(
SubjectIdentityVerificationBodyParams,
RequestReceiptBodyParams,
RequestReviewDenyBodyParams,
- List[CheckpointActionRequired],
+ List[EmailConnectorErasureBodyParams],
]
] = None,
+ on_success: Any = None,
+ on_success_kwargs: Dict[str, Any] = None,
) -> None:
"""
Sends an email to `to_email` with content supplied in `email_body_params`
@@ -88,6 +350,8 @@ def dispatch_email(
email=email,
to_email=to_email,
)
+ if on_success:
+ on_success(db, on_success_kwargs)
def _build_email( # pylint: disable=too-many-return-statements
diff --git a/src/fidesops/ops/service/privacy_request/request_runner_service.py b/src/fidesops/ops/service/privacy_request/request_runner_service.py
index b78d1a5cf..6546b2178 100644
--- a/src/fidesops/ops/service/privacy_request/request_runner_service.py
+++ b/src/fidesops/ops/service/privacy_request/request_runner_service.py
@@ -12,19 +12,16 @@
from sqlalchemy.orm import Session
from fidesops.ops import common_exceptions
+from fidesops.ops.analytics import send_analytics_event
+from fidesops.ops.analytics_event_factory import failed_graph_analytics_event
from fidesops.ops.common_exceptions import (
ClientUnsuccessfulException,
- EmailDispatchException,
IdentityNotFoundException,
ManualWebhookFieldsUnset,
NoCachedManualWebhookEntry,
PrivacyRequestPaused,
)
from fidesops.ops.core.config import config
-from fidesops.ops.graph.analytics_events import (
- failed_graph_analytics_event,
- fideslog_graph_failure,
-)
from fidesops.ops.graph.graph import DatasetGraph
from fidesops.ops.models.connectionconfig import ConnectionConfig
from fidesops.ops.models.datasetconfig import DatasetConfig
@@ -43,12 +40,10 @@
ProvidedIdentityType,
can_run_checkpoint,
)
-from fidesops.ops.schemas.email.email import (
- AccessRequestCompleteBodyParams,
- EmailActionType,
-)
from fidesops.ops.service.connectors.email_connector import email_connector_erasure_send
-from fidesops.ops.service.email.email_dispatch_service import dispatch_email
+from fidesops.ops.service.email.email_dispatch_service import (
+ dispatch_email_task_request_completion,
+)
from fidesops.ops.service.storage.storage_uploader_service import upload
from fidesops.ops.task.filter_results import filter_data_categories
from fidesops.ops.task.graph_task import (
@@ -56,7 +51,7 @@
run_access_request,
run_erasure,
)
-from fidesops.ops.tasks import DatabaseTask, celery_app
+from fidesops.ops.tasks import EMAIL_QUEUE_NAME, DatabaseTask, celery_app
from fidesops.ops.tasks.scheduled.scheduler import scheduler
from fidesops.ops.util.cache import (
FidesopsRedis,
@@ -219,7 +214,7 @@ def upload_access_results(
privacy_request.id,
Pii(str(exc)),
)
- privacy_request.status = PrivacyRequestStatus.error
+ privacy_request.error_processing(db=session)
return download_urls
@@ -361,7 +356,7 @@ async def run_privacy_request(
except BaseException as exc: # pylint: disable=broad-except
privacy_request.error_processing(db=session)
# Send analytics to Fideslog
- await fideslog_graph_failure(
+ await send_analytics_event(
failed_graph_analytics_event(privacy_request, exc)
)
# If dev mode, log traceback
@@ -369,71 +364,57 @@ async def run_privacy_request(
return
# Send erasure requests via email to third parties where applicable
+ has_email_connector_email: bool = False
if can_run_checkpoint(
request_checkpoint=CurrentStep.erasure_email_post_send,
from_checkpoint=resume_step,
):
- try:
- email_connector_erasure_send(
- db=session, privacy_request=privacy_request
- )
- except EmailDispatchException as exc:
- privacy_request.cache_failed_checkpoint_details(
- step=CurrentStep.erasure_email_post_send, collection=None
- )
- privacy_request.error_processing(db=session)
- await fideslog_graph_failure(
- failed_graph_analytics_event(privacy_request, exc)
- )
- # If dev mode, log traceback
- _log_exception(exc, config.dev_mode)
- return
+ has_email_connector_email = email_connector_erasure_send(
+ db=session,
+ privacy_request=privacy_request,
+ policy=policy,
+ identity_data=identity_data,
+ access_result_urls=access_result_urls,
+ )
+ if not has_email_connector_email:
- # Run post-execution webhooks
- proceed = run_webhooks_and_report_status(
- db=session,
- privacy_request=privacy_request,
- webhook_cls=PolicyPostWebhook, # type: ignore
- )
- if not proceed:
- return
- if config.notifications.send_request_completion_notification:
- try:
+ # Run post-execution webhooks
+ proceed = run_webhooks_and_report_status(
+ db=session,
+ privacy_request=privacy_request,
+ webhook_cls=PolicyPostWebhook, # type: ignore
+ )
+ if not proceed:
+ return
+ if config.notifications.send_request_completion_notification:
initiate_privacy_request_completion_email(
- session, policy, access_result_urls, identity_data
+ privacy_request, policy, access_result_urls, identity_data
)
- except (IdentityNotFoundException, EmailDispatchException) as e:
- privacy_request.error_processing(db=session)
- # If dev mode, log traceback
- await fideslog_graph_failure(
- failed_graph_analytics_event(privacy_request, e)
+ else:
+ privacy_request.finished_processing_at = datetime.utcnow()
+ AuditLog.create(
+ db=session,
+ data={
+ "user_id": "system",
+ "privacy_request_id": privacy_request.id,
+ "action": AuditLogAction.finished,
+ "message": "",
+ },
)
- _log_exception(e, config.dev_mode)
- return
- privacy_request.finished_processing_at = datetime.utcnow()
- AuditLog.create(
- db=session,
- data={
- "user_id": "system",
- "privacy_request_id": privacy_request.id,
- "action": AuditLogAction.finished,
- "message": "",
- },
- )
- privacy_request.status = PrivacyRequestStatus.complete
- logging.info("Privacy request %s run completed.", privacy_request.id)
- privacy_request.save(db=session)
+ privacy_request.status = PrivacyRequestStatus.complete
+ logging.info("Privacy request %s run completed.", privacy_request.id)
+ privacy_request.save(db=session)
def initiate_privacy_request_completion_email(
- session: Session,
+ privacy_request: PrivacyRequest,
policy: Policy,
access_result_urls: List[str],
identity_data: Dict[str, Any],
) -> None:
"""
- :param session: SQLAlchemy Session
:param policy: Policy
+ :param privacy_request: PrivacyRequest
:param access_result_urls: list of urls generated by access request upload
:param identity_data: Dict of identity data
"""
@@ -441,23 +422,15 @@ def initiate_privacy_request_completion_email(
raise IdentityNotFoundException(
"Identity email was not found, so request completion email could not be sent."
)
- if policy.get_rules_for_action(action_type=ActionType.access):
- # synchronous for now since failure to send complete emails is fatal to request
- dispatch_email(
- db=session,
- action_type=EmailActionType.PRIVACY_REQUEST_COMPLETE_ACCESS,
- to_email=identity_data.get(ProvidedIdentityType.email.value),
- email_body_params=AccessRequestCompleteBodyParams(
- download_links=access_result_urls
- ),
- )
- if policy.get_rules_for_action(action_type=ActionType.erasure):
- dispatch_email(
- db=session,
- action_type=EmailActionType.PRIVACY_REQUEST_COMPLETE_DELETION,
- to_email=identity_data.get(ProvidedIdentityType.email.value),
- email_body_params=None,
- )
+ dispatch_email_task_request_completion.apply_async(
+ queue=EMAIL_QUEUE_NAME,
+ kwargs={
+ "privacy_request_id": privacy_request.id,
+ "to_email": identity_data.get(ProvidedIdentityType.email.value),
+ "access_result_urls": access_result_urls,
+ "policy_id": policy.id,
+ },
+ )
def initiate_paused_privacy_request_followup(privacy_request: PrivacyRequest) -> None:
diff --git a/src/fidesops/ops/task/graph_task.py b/src/fidesops/ops/task/graph_task.py
index 8ec8054b5..64a17bbe4 100644
--- a/src/fidesops/ops/task/graph_task.py
+++ b/src/fidesops/ops/task/graph_task.py
@@ -10,16 +10,14 @@
from dask.threaded import get
from sqlalchemy.orm import Session
+from fidesops.ops.analytics import send_analytics_event
from fidesops.ops.common_exceptions import (
CollectionDisabled,
PrivacyRequestErasureEmailSendRequired,
PrivacyRequestPaused,
)
from fidesops.ops.core.config import config
-from fidesops.ops.graph.analytics_events import (
- fideslog_graph_rerun,
- prepare_rerun_graph_analytics_event,
-)
+from fidesops.ops.graph.analytics_events import prepare_rerun_graph_analytics_event
from fidesops.ops.graph.config import (
ROOT_COLLECTION_ADDRESS,
TERMINATOR_ADDRESS,
@@ -642,7 +640,7 @@ def termination_fn(
dsk[TERMINATOR_ADDRESS] = (termination_fn, *end_nodes)
update_mapping_from_cache(dsk, resources, start_function)
- await fideslog_graph_rerun(
+ await send_analytics_event(
prepare_rerun_graph_analytics_event(
privacy_request, env, end_nodes, resources, ActionType.access
)
@@ -740,7 +738,7 @@ def termination_fn(*dependent_values: int) -> Tuple[int, ...]:
# terminator function waits for all keys
dsk[TERMINATOR_ADDRESS] = (termination_fn, *env.keys())
update_erasure_mapping_from_cache(dsk, resources, start_function)
- await fideslog_graph_rerun(
+ await send_analytics_event(
prepare_rerun_graph_analytics_event(
privacy_request, env, end_nodes, resources, ActionType.erasure
)
diff --git a/src/fidesops/ops/util/json.py b/src/fidesops/ops/util/json.py
new file mode 100644
index 000000000..b73b2e27b
--- /dev/null
+++ b/src/fidesops/ops/util/json.py
@@ -0,0 +1,6 @@
+import json
+from typing import Any
+
+
+def get_json(obj: Any) -> Any:
+ return json.loads(json.dumps(obj, default=lambda o: getattr(o, "__dict__", str(o))))
diff --git a/tests/ops/api/v1/endpoints/test_connection_config_endpoints.py b/tests/ops/api/v1/endpoints/test_connection_config_endpoints.py
index 5b8350062..8e29b3dbb 100644
--- a/tests/ops/api/v1/endpoints/test_connection_config_endpoints.py
+++ b/tests/ops/api/v1/endpoints/test_connection_config_endpoints.py
@@ -29,6 +29,7 @@
)
from fidesops.ops.schemas.email.email import EmailActionType
from fidesops.ops.tasks import EMAIL_QUEUE_NAME
+from fidesops.ops.util.json import get_json
page_size = Params().size
@@ -1323,16 +1324,20 @@ def test_put_email_connection_config_secrets(
kwargs["action_type"] == EmailActionType.EMAIL_ERASURE_REQUEST_FULFILLMENT
)
assert kwargs["to_email"] == "test@example.com"
+
assert kwargs["email_body_params"] == [
- CheckpointActionRequired(
- step=CurrentStep.erasure,
- collection=CollectionAddress("test_dataset", "test_collection"),
- action_needed=[
- ManualAction(
- locators={"id": ["example_id"]},
- get=None,
- update={"test_field": "null_rewrite"},
- )
- ],
+ get_json(
+ CheckpointActionRequired(
+ step=CurrentStep.erasure,
+ collection=CollectionAddress("test_dataset", "test_collection"),
+ action_needed=[
+ ManualAction(
+ locators={"id": ["example_id"]},
+ get=None,
+ update={"test_field": "null_rewrite"},
+ )
+ ],
+ )
)
]
+ assert kwargs["to_email"] == "test@example.com"
diff --git a/tests/ops/api/v1/endpoints/test_privacy_request_endpoints.py b/tests/ops/api/v1/endpoints/test_privacy_request_endpoints.py
index 43ccddfc2..7e9d95590 100644
--- a/tests/ops/api/v1/endpoints/test_privacy_request_endpoints.py
+++ b/tests/ops/api/v1/endpoints/test_privacy_request_endpoints.py
@@ -94,7 +94,7 @@ def url(self, oauth_client: ClientDetail, policy) -> str:
"fidesops.ops.service.privacy_request.request_runner_service.run_privacy_request.delay"
)
@mock.patch(
- "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task.apply_async"
+ "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task_generic.apply_async"
)
def test_create_privacy_request(
self,
@@ -1799,7 +1799,7 @@ def test_approve_privacy_request_no_user_on_client(
"fidesops.ops.service.privacy_request.request_runner_service.run_privacy_request.delay"
)
@mock.patch(
- "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task.apply_async"
+ "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task_generic.apply_async"
)
def test_approve_privacy_request(
self,
@@ -1847,7 +1847,7 @@ def test_approve_privacy_request(
"fidesops.ops.service.privacy_request.request_runner_service.run_privacy_request.delay"
)
@mock.patch(
- "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task.apply_async"
+ "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task_generic.apply_async"
)
def test_approve_privacy_request_creates_audit_log_and_sends_email(
self,
@@ -1970,7 +1970,7 @@ def test_deny_completed_privacy_request(
"fidesops.ops.service.privacy_request.request_runner_service.run_privacy_request.delay"
)
@mock.patch(
- "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task.apply_async"
+ "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task_generic.apply_async"
)
def test_deny_privacy_request_without_denial_reason(
self,
@@ -2037,7 +2037,7 @@ def test_deny_privacy_request_without_denial_reason(
"fidesops.ops.service.privacy_request.request_runner_service.run_privacy_request.delay"
)
@mock.patch(
- "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task.apply_async"
+ "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task_generic.apply_async"
)
def test_deny_privacy_request_with_denial_reason(
self,
@@ -2702,7 +2702,7 @@ def test_incorrect_privacy_request_status(self, api_client, url, privacy_request
)
@mock.patch(
- "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task.apply_async"
+ "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task_generic.apply_async"
)
def test_verification_code_expired(
self,
@@ -2726,7 +2726,7 @@ def test_verification_code_expired(
assert not mock_dispatch_email.called
@mock.patch(
- "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task.apply_async"
+ "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task_generic.apply_async"
)
def test_invalid_code(
self,
@@ -2754,7 +2754,7 @@ def test_invalid_code(
"fidesops.ops.service.privacy_request.request_runner_service.run_privacy_request.delay"
)
@mock.patch(
- "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task.apply_async"
+ "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task_generic.apply_async"
)
def test_verify_identity_no_admin_approval_needed(
self,
@@ -2812,7 +2812,7 @@ def test_verify_identity_no_admin_approval_needed(
"fidesops.ops.service.privacy_request.request_runner_service.run_privacy_request.delay"
)
@mock.patch(
- "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task.apply_async"
+ "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task_generic.apply_async"
)
def test_verify_identity_no_admin_approval_needed_email_disabled(
self,
@@ -2857,7 +2857,7 @@ def test_verify_identity_no_admin_approval_needed_email_disabled(
"fidesops.ops.service.privacy_request.request_runner_service.run_privacy_request.delay"
)
@mock.patch(
- "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task.apply_async"
+ "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task_generic.apply_async"
)
def test_verify_identity_admin_approval_needed(
self,
@@ -2957,7 +2957,9 @@ def test_create_privacy_request_no_email_config(
@mock.patch(
"fidesops.ops.service.privacy_request.request_runner_service.run_privacy_request.delay"
)
- @mock.patch("fidesops.ops.service._verification.dispatch_email")
+ @mock.patch(
+ "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task_identity_verification.apply_async"
+ )
def test_create_privacy_request_with_email_config(
self,
mock_dispatch_email,
@@ -2994,10 +2996,16 @@ def test_create_privacy_request_with_email_config(
assert response_data[0]["status"] == PrivacyRequestStatus.identity_unverified
assert mock_dispatch_email.called
- kwargs = mock_dispatch_email.call_args.kwargs
- assert kwargs["action_type"] == EmailActionType.SUBJECT_IDENTITY_VERIFICATION
+ kwargs = mock_dispatch_email.call_args[1]["kwargs"]
+ assert (
+ kwargs["email_meta"]["action_type"]
+ == EmailActionType.SUBJECT_IDENTITY_VERIFICATION
+ )
assert kwargs["to_email"] == "test@example.com"
- assert kwargs["email_body_params"] == SubjectIdentityVerificationBodyParams(
+ assert kwargs["privacy_request_id"] == pr.id
+ assert kwargs["email_meta"][
+ "body_params"
+ ] == SubjectIdentityVerificationBodyParams(
verification_code=pr.get_cached_verification_code(),
verification_code_ttl_seconds=config.redis.identity_verification_code_ttl_seconds,
)
@@ -3480,7 +3488,7 @@ def privacy_request_receipt_email_notification_enabled(self):
"fidesops.ops.service.privacy_request.request_runner_service.run_privacy_request.delay"
)
@mock.patch(
- "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task.apply_async"
+ "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task_generic.apply_async"
)
def test_create_privacy_request_no_email_config(
self,
@@ -3528,7 +3536,7 @@ def test_create_privacy_request_no_email_config(
"fidesops.ops.service.privacy_request.request_runner_service.run_privacy_request.delay"
)
@mock.patch(
- "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task.apply_async"
+ "fidesops.ops.api.v1.endpoints.privacy_request_endpoints.dispatch_email_task_generic.apply_async"
)
def test_create_privacy_request_with_email_config(
self,
diff --git a/tests/ops/graph/test_graph_analytics_events.py b/tests/ops/graph/test_graph_analytics_events.py
index 20b90fd1b..8c815ce10 100644
--- a/tests/ops/graph/test_graph_analytics_events.py
+++ b/tests/ops/graph/test_graph_analytics_events.py
@@ -1,5 +1,5 @@
+from fidesops.ops.analytics_event_factory import failed_graph_analytics_event
from fidesops.ops.common_exceptions import FidesopsException
-from fidesops.ops.graph.analytics_events import failed_graph_analytics_event
class TestFailedGraphAnalyticsEvent:
diff --git a/tests/ops/integration_tests/test_execution.py b/tests/ops/integration_tests/test_execution.py
index 83a18a1f2..047ef9498 100644
--- a/tests/ops/integration_tests/test_execution.py
+++ b/tests/ops/integration_tests/test_execution.py
@@ -649,7 +649,7 @@ async def test_restart_graph_from_failure(
# Rerun access request using cached results
with mock.patch(
- "fidesops.ops.task.graph_task.fideslog_graph_rerun"
+ "fidesops.ops.task.graph_task.send_analytics_event"
) as mock_log_event:
await graph_task.run_access_request(
privacy_request,
@@ -780,7 +780,7 @@ async def test_restart_graph_from_failure_during_erasure(
# Rerun erasure portion of request using cached results
with mock.patch(
- "fidesops.ops.task.graph_task.fideslog_graph_rerun"
+ "fidesops.ops.task.graph_task.send_analytics_event"
) as mock_log_event:
await graph_task.run_erasure(
privacy_request,
diff --git a/tests/ops/integration_tests/test_integration_email.py b/tests/ops/integration_tests/test_integration_email.py
index 0b17beecc..2e4028064 100644
--- a/tests/ops/integration_tests/test_integration_email.py
+++ b/tests/ops/integration_tests/test_integration_email.py
@@ -1,14 +1,15 @@
+import json
+from typing import Any, Dict, List, Optional
from unittest import mock
import pytest as pytest
-from fideslib.models.audit_log import AuditLog, AuditLogAction
from fidesops.ops.graph.config import CollectionAddress
from fidesops.ops.graph.graph import DatasetGraph
from fidesops.ops.models.datasetconfig import convert_dataset_to_graph
from fidesops.ops.models.policy import CurrentStep
from fidesops.ops.models.privacy_request import (
- CheckpointActionRequired,
+ CheckpointActionRequiredBase,
ExecutionLog,
ExecutionLogStatus,
ManualAction,
@@ -17,11 +18,14 @@
from fidesops.ops.schemas.email.email import EmailActionType
from fidesops.ops.service.connectors.email_connector import email_connector_erasure_send
from fidesops.ops.task import graph_task
+from fidesops.ops.util.json import get_json
@pytest.mark.integration_postgres
@pytest.mark.integration
-@mock.patch("fidesops.ops.service.connectors.email_connector.dispatch_email")
+@mock.patch(
+ "fidesops.ops.service.connectors.email_connector.dispatch_email_task_email_connector.apply_async"
+)
@pytest.mark.asyncio
async def test_email_connector_cache_and_delayed_send(
mock_email_dispatch,
@@ -103,58 +107,58 @@ async def test_email_connector_cache_and_delayed_send(
"postgres_example_test_dataset:product": 0,
}, "No data masked by Fidesops for the email collections"
- raw_email_template_values = (
- privacy_request.get_email_connector_template_contents_by_dataset(
- CurrentStep.erasure, "email_dataset"
- )
+ raw_email_template_values: List[
+ Dict[str, Optional[Any]]
+ ] = privacy_request.get_email_connector_template_contents_by_dataset(
+ CurrentStep.erasure, "email_dataset"
)
expected = [
- CheckpointActionRequired(
- step=CurrentStep.erasure,
- collection=CollectionAddress("email_dataset", "daycare_customer"),
- action_needed=[
- ManualAction(
- locators={
- "customer_id": [1]
- }, # We have some data from postgres they can use to locate the customer_id
- get=None,
- update={"scholarship": "null_rewrite"},
- )
- ],
- ),
- CheckpointActionRequired(
- step=CurrentStep.erasure,
- collection=CollectionAddress("email_dataset", "payment"),
- action_needed=[
- ManualAction(
- locators={"payer_email": ["customer-1@example.com"]},
- get=None,
- update=None, # Nothing to mask on this collection
- )
- ],
- ),
- CheckpointActionRequired(
- step=CurrentStep.erasure,
- collection=CollectionAddress("email_dataset", "children"),
- action_needed=[
- ManualAction(
- locators={
- "parent_id": ["email_dataset:daycare_customer:id"]
- }, # The only locator is on a separate collection on their end. We don't have data for it.
- get=None,
- update={
- "birthday": "null_rewrite",
- "first_name": "null_rewrite",
- "last_name": "null_rewrite",
- "report_card.grades": "null_rewrite",
- "report_card.behavior_issues": "null_rewrite",
- "report_card.disciplinary_action": "null_rewrite",
- "report_card.test_scores": "null_rewrite",
- },
- )
- ],
- ),
+ get_json(obj)
+ for obj in [
+ CheckpointActionRequiredBase(
+ collection=CollectionAddress("email_dataset", "daycare_customer"),
+ action_needed=[
+ ManualAction(
+ locators={
+ "customer_id": [1]
+ }, # We have some data from postgres they can use to locate the customer_id
+ get=None,
+ update={"scholarship": "null_rewrite"},
+ )
+ ],
+ ),
+ CheckpointActionRequiredBase(
+ collection=CollectionAddress("email_dataset", "payment"),
+ action_needed=[
+ ManualAction(
+ locators={"payer_email": ["customer-1@example.com"]},
+ get=None,
+ update=None, # Nothing to mask on this collection
+ )
+ ],
+ ),
+ CheckpointActionRequiredBase(
+ collection=CollectionAddress("email_dataset", "children"),
+ action_needed=[
+ ManualAction(
+ locators={
+ "parent_id": ["email_dataset:daycare_customer:id"]
+ }, # The only locator is on a separate collection on their end. We don't have data for it.
+ get=None,
+ update={
+ "birthday": "null_rewrite",
+ "first_name": "null_rewrite",
+ "last_name": "null_rewrite",
+ "report_card.grades": "null_rewrite",
+ "report_card.behavior_issues": "null_rewrite",
+ "report_card.disciplinary_action": "null_rewrite",
+ "report_card.test_scores": "null_rewrite",
+ },
+ )
+ ],
+ ),
+ ]
]
for action in raw_email_template_values:
assert (
@@ -173,21 +177,18 @@ async def test_email_connector_cache_and_delayed_send(
log.status for log in children_logs
}
- email_connector_erasure_send(db, privacy_request)
+ email_connector_erasure_send(
+ db, privacy_request, erasure_policy, {"email": "customer-1@example.com"}, []
+ )
assert mock_email_dispatch.called
- call_args = mock_email_dispatch.call_args[1]
- assert call_args["action_type"] == EmailActionType.EMAIL_ERASURE_REQUEST_FULFILLMENT
- assert call_args["to_email"] == "test@example.com"
- assert call_args["email_body_params"] == raw_email_template_values
-
- created_email_audit_log = (
- db.query(AuditLog)
- .filter(AuditLog.privacy_request_id == privacy_request.id)
- .all()[0]
+ call_args = mock_email_dispatch.call_args[1]["kwargs"]
+ assert (
+ call_args["emails_to_send"][0]["email_meta"]["action_type"]
+ == EmailActionType.EMAIL_ERASURE_REQUEST_FULFILLMENT
)
assert (
- created_email_audit_log.message
- == "Erasure email instructions dispatched for 'email_dataset'"
+ call_args["emails_to_send"][0]["email_meta"]["body_params"]
+ == raw_email_template_values
)
- assert created_email_audit_log.user_id == "system"
- assert created_email_audit_log.action == AuditLogAction.email_sent
+ assert call_args["emails_to_send"][0]["to_email"] == "test@example.com"
+ assert call_args["emails_to_send"][0]["dataset_key"] == "email_dataset"
diff --git a/tests/ops/models/test_privacy_request.py b/tests/ops/models/test_privacy_request.py
index 9b2102e55..12b42ff87 100644
--- a/tests/ops/models/test_privacy_request.py
+++ b/tests/ops/models/test_privacy_request.py
@@ -17,6 +17,7 @@
from fidesops.ops.models.policy import CurrentStep, Policy
from fidesops.ops.models.privacy_request import (
CheckpointActionRequired,
+ CheckpointActionRequiredBase,
Consent,
ConsentRequest,
PrivacyRequest,
@@ -28,6 +29,7 @@
from fidesops.ops.service.connectors.manual_connector import ManualAction
from fidesops.ops.util.cache import FidesopsRedis, get_identity_cache_key
from fidesops.ops.util.constants import API_DATE_FORMAT
+from fidesops.ops.util.json import get_json
paused_location = CollectionAddress("test_dataset", "test_collection")
@@ -556,16 +558,17 @@ def test_cache_template_contents(self, privacy_request):
assert privacy_request.get_email_connector_template_contents_by_dataset(
CurrentStep.erasure, "email_dataset"
) == [
- CheckpointActionRequired(
- step=CurrentStep.erasure,
- collection=CollectionAddress("email_dataset", "test_collection"),
- action_needed=[
- ManualAction(
- locators={"email": "test@example.com"},
- get=None,
- update={"phone": "null_rewrite"},
- )
- ],
+ get_json(
+ CheckpointActionRequiredBase(
+ collection=CollectionAddress("email_dataset", "test_collection"),
+ action_needed=[
+ ManualAction(
+ locators={"email": "test@example.com"},
+ get=None,
+ update={"phone": "null_rewrite"},
+ )
+ ],
+ )
)
]
diff --git a/tests/ops/service/privacy_request/request_runner_service_test.py b/tests/ops/service/privacy_request/request_runner_service_test.py
index dd8ff69f7..cb349afd5 100644
--- a/tests/ops/service/privacy_request/request_runner_service_test.py
+++ b/tests/ops/service/privacy_request/request_runner_service_test.py
@@ -69,7 +69,7 @@ def privacy_request_complete_email_notification_enabled():
@mock.patch(
- "fidesops.ops.service.privacy_request.request_runner_service.dispatch_email"
+ "fidesops.ops.service.privacy_request.request_runner_service.dispatch_email_task_request_completion.apply_async"
)
@mock.patch("fidesops.ops.service.privacy_request.request_runner_service.upload")
def test_policy_upload_dispatch_email_called(
@@ -88,7 +88,7 @@ def test_policy_upload_dispatch_email_called(
@mock.patch(
- "fidesops.ops.service.privacy_request.request_runner_service.dispatch_email"
+ "fidesops.ops.service.privacy_request.request_runner_service.dispatch_email_task_request_completion.apply_async"
)
@mock.patch("fidesops.ops.service.privacy_request.request_runner_service.upload")
def test_start_processing_sets_started_processing_at(
@@ -114,7 +114,7 @@ def test_start_processing_sets_started_processing_at(
@mock.patch(
- "fidesops.ops.service.privacy_request.request_runner_service.dispatch_email"
+ "fidesops.ops.service.privacy_request.request_runner_service.dispatch_email_task_request_completion.apply_async"
)
@mock.patch("fidesops.ops.service.privacy_request.request_runner_service.upload")
def test_start_processing_doesnt_overwrite_started_processing_at(
@@ -128,7 +128,6 @@ def test_start_processing_doesnt_overwrite_started_processing_at(
upload_mock.return_value = "http://www.data-download-url"
before = privacy_request.started_processing_at
assert before is not None
- updated_at = privacy_request.updated_at
run_privacy_request_task.delay(privacy_request.id).get(
timeout=PRIVACY_REQUEST_TASK_TIMEOUT
@@ -136,7 +135,6 @@ def test_start_processing_doesnt_overwrite_started_processing_at(
db.refresh(privacy_request)
assert privacy_request.started_processing_at == before
- assert privacy_request.updated_at > updated_at
assert mock_email_dispatch.call_count == 1
@@ -165,7 +163,7 @@ def test_halts_proceeding_if_cancelled(
@mock.patch(
- "fidesops.ops.service.privacy_request.request_runner_service.dispatch_email"
+ "fidesops.ops.service.privacy_request.request_runner_service.dispatch_email_task_request_completion.apply_async"
)
@mock.patch(
"fidesops.ops.service.privacy_request.request_runner_service.upload_access_results"
@@ -215,7 +213,7 @@ def test_from_graph_resume_does_not_run_pre_webhooks(
@mock.patch(
- "fidesops.ops.service.privacy_request.request_runner_service.dispatch_email"
+ "fidesops.ops.service.privacy_request.request_runner_service.dispatch_email_task_request_completion.apply_async"
)
@mock.patch(
"fidesops.ops.service.privacy_request.request_runner_service.run_webhooks_and_report_status",
@@ -1594,7 +1592,7 @@ def test_privacy_request_log_failure(
}
with mock.patch(
- "fidesops.ops.service.privacy_request.request_runner_service.fideslog_graph_failure"
+ "fidesops.ops.service.privacy_request.request_runner_service.send_analytics_event"
) as mock_log_event:
pr = get_privacy_request_results(
db,
@@ -1613,106 +1611,107 @@ def test_privacy_request_log_failure(
assert sent_event.error == "KeyError"
assert sent_event.extra_data == {"privacy_request": pr.id}
+ # class TestPrivacyRequestsEmailConnector:
+ # @mock.patch("fidesops.ops.service.email.email_dispatch_service._mailgun_dispatcher")
+ # @pytest.mark.integration
+ # def test_create_and_process_erasure_request_email_connector(
+ # self,
+ # mailgun_send,
+ # email_connection_config,
+ # erasure_policy,
+ # integration_postgres_config,
+ # run_privacy_request_task,
+ # email_dataset_config,
+ # postgres_example_test_dataset_config_read_access,
+ # email_config,
+ # db,
+ # ):
+ # """
+ # Asserts that mailgun was called and verifies email template renders without error
+ # """
+ # rule = erasure_policy.rules[0]
+ # target = rule.targets[0]
+ # target.data_category = "user.childrens"
+ # target.save(db=db)
+ #
+ # email = "customer-1@example.com"
+ # data = {
+ # "requested_at": "2021-08-30T16:09:37.359Z",
+ # "policy_key": erasure_policy.key,
+ # "identity": {"email": email},
+ # }
+ #
+ # pr = get_privacy_request_results(
+ # db,
+ # erasure_policy,
+ # run_privacy_request_task,
+ # data,
+ # )
+ # pr.delete(db=db)
+ # assert mailgun_send.called
+ # kwargs = mailgun_send.call_args.kwargs
+ # assert type(kwargs["email_config"]) == EmailConfig
+ # assert type(kwargs["email"]) == EmailForActionType
+
+ # @mock.patch("fidesops.ops.service.email.email_dispatch_service._mailgun_dispatcher")
+ # @pytest.mark.integration
+ # def test_create_and_process_erasure_request_email_connector_email_send_error(
+ # self,
+ # mailgun_send,
+ # email_connection_config,
+ # erasure_policy,
+ # integration_postgres_config,
+ # run_privacy_request_task,
+ # email_dataset_config,
+ # postgres_example_test_dataset_config_read_access,
+ # db,
+ # ):
+ # """
+ # Force error by having no email config setup
+ # """
+ # rule = erasure_policy.rules[0]
+ # target = rule.targets[0]
+ # target.data_category = "user.childrens"
+ # target.save(db=db)
+ #
+ # email = "customer-1@example.com"
+ # data = {
+ # "requested_at": "2021-08-30T16:09:37.359Z",
+ # "policy_key": erasure_policy.key,
+ # "identity": {"email": email},
+ # }
+ #
+ # pr = get_privacy_request_results(
+ # db,
+ # erasure_policy,
+ # run_privacy_request_task,
+ # data,
+ # )
+ # db.refresh(pr)
+ # assert pr.status == PrivacyRequestStatus.error
+ # assert pr.get_failed_checkpoint_details() == CheckpointActionRequired(
+ # step=CurrentStep.erasure_email_post_send,
+ # collection=None,
+ # action_needed=None,
+ # )
+ # cached_email_contents = pr.get_email_connector_template_contents_by_dataset(
+ # CurrentStep.erasure, "email_dataset"
+ # )
+ # assert len(cached_email_contents) == 3
+ # assert {
+ # contents["collection"].collection for contents in cached_email_contents
+ # } == {"payment", "children", "daycare_customer"}
+ #
+ # pr.delete(db=db)
+ # assert mailgun_send.called is False
-class TestPrivacyRequestsEmailConnector:
- @mock.patch("fidesops.ops.service.email.email_dispatch_service._mailgun_dispatcher")
- @pytest.mark.integration
- def test_create_and_process_erasure_request_email_connector(
- self,
- mailgun_send,
- email_connection_config,
- erasure_policy,
- integration_postgres_config,
- run_privacy_request_task,
- email_dataset_config,
- postgres_example_test_dataset_config_read_access,
- email_config,
- db,
- ):
- """
- Asserts that mailgun was called and verifies email template renders without error
- """
- rule = erasure_policy.rules[0]
- target = rule.targets[0]
- target.data_category = "user.childrens"
- target.save(db=db)
-
- email = "customer-1@example.com"
- data = {
- "requested_at": "2021-08-30T16:09:37.359Z",
- "policy_key": erasure_policy.key,
- "identity": {"email": email},
- }
-
- pr = get_privacy_request_results(
- db,
- erasure_policy,
- run_privacy_request_task,
- data,
- )
- pr.delete(db=db)
- assert mailgun_send.called
- kwargs = mailgun_send.call_args.kwargs
- assert type(kwargs["email_config"]) == EmailConfig
- assert type(kwargs["email"]) == EmailForActionType
-
- @mock.patch("fidesops.ops.service.email.email_dispatch_service._mailgun_dispatcher")
- @pytest.mark.integration
- def test_create_and_process_erasure_request_email_connector_email_send_error(
- self,
- mailgun_send,
- email_connection_config,
- erasure_policy,
- integration_postgres_config,
- run_privacy_request_task,
- email_dataset_config,
- postgres_example_test_dataset_config_read_access,
- db,
- ):
- """
- Force error by having no email config setup
- """
- rule = erasure_policy.rules[0]
- target = rule.targets[0]
- target.data_category = "user.childrens"
- target.save(db=db)
-
- email = "customer-1@example.com"
- data = {
- "requested_at": "2021-08-30T16:09:37.359Z",
- "policy_key": erasure_policy.key,
- "identity": {"email": email},
- }
-
- pr = get_privacy_request_results(
- db,
- erasure_policy,
- run_privacy_request_task,
- data,
- )
- db.refresh(pr)
- assert pr.status == PrivacyRequestStatus.error
- assert pr.get_failed_checkpoint_details() == CheckpointActionRequired(
- step=CurrentStep.erasure_email_post_send,
- collection=None,
- action_needed=None,
- )
- cached_email_contents = pr.get_email_connector_template_contents_by_dataset(
- CurrentStep.erasure, "email_dataset"
- )
- assert len(cached_email_contents) == 3
- assert {
- contents.collection.collection for contents in cached_email_contents
- } == {"payment", "children", "daycare_customer"}
-
- pr.delete(db=db)
- assert mailgun_send.called is False
-
- @mock.patch("fidesops.ops.service.email.email_dispatch_service._mailgun_dispatcher")
+ @mock.patch(
+ "fidesops.ops.service.connectors.email_connector.dispatch_email_task_email_connector.apply_async"
+ )
@pytest.mark.integration
def test_email_connector_read_only_permissions(
self,
- mailgun_send,
+ mock_email_send,
email_connection_config,
erasure_policy,
integration_postgres_config,
@@ -1757,14 +1756,16 @@ def test_email_connector_read_only_permissions(
pr.delete(db=db)
assert (
- mailgun_send.called is False
+ mock_email_send.called is False
), "Email not sent because the connection was read only"
- @mock.patch("fidesops.ops.service.email.email_dispatch_service._mailgun_dispatcher")
+ @mock.patch(
+ "fidesops.ops.service.connectors.email_connector.dispatch_email_task_email_connector.apply_async"
+ )
@pytest.mark.integration
def test_email_connector_no_updates_needed(
self,
- mailgun_send,
+ mock_email_send,
email_connection_config,
erasure_policy,
integration_postgres_config,
@@ -1796,21 +1797,24 @@ def test_email_connector_no_updates_needed(
data,
)
db.refresh(pr)
- assert pr.status == PrivacyRequestStatus.complete
+
cached_email_contents = pr.get_email_connector_template_contents_by_dataset(
CurrentStep.erasure, "email_dataset"
)
assert {
- contents.collection.collection for contents in cached_email_contents
+ contents["collection"]["collection"] for contents in cached_email_contents
} == {"payment", "children", "daycare_customer"}
assert not any(
- [contents.action_needed[0].update for contents in cached_email_contents]
+ [
+ contents["action_needed"][0]["update"]
+ for contents in cached_email_contents
+ ]
)
pr.delete(db=db)
assert (
- mailgun_send.called is False
+ mock_email_send.called is False
), "Email not sent because no updates are needed. Data category doesn't apply to any of the collections."
@@ -1826,11 +1830,11 @@ def privacy_request_complete_email_notification_enabled(self):
@pytest.mark.integration_postgres
@pytest.mark.integration
@mock.patch(
- "fidesops.ops.service.privacy_request.request_runner_service.dispatch_email"
+ "fidesops.ops.service.privacy_request.request_runner_service.dispatch_email_task_request_completion.apply_async"
)
def test_email_complete_send_erasure(
self,
- mailgun_send,
+ mock_email_send,
postgres_integration_db,
postgres_example_test_dataset_config,
cache,
@@ -1857,18 +1861,18 @@ def test_email_complete_send_erasure(
)
pr.delete(db=db)
- mailgun_send.assert_called_once()
+ mock_email_send.assert_called_once()
@pytest.mark.integration_postgres
@pytest.mark.integration
@mock.patch(
- "fidesops.ops.service.privacy_request.request_runner_service.dispatch_email"
+ "fidesops.ops.service.privacy_request.request_runner_service.dispatch_email_task_request_completion.apply_async"
)
@mock.patch("fidesops.ops.service.privacy_request.request_runner_service.upload")
def test_email_complete_send_access(
self,
upload_mock,
- mailgun_send,
+ mock_email_send,
postgres_integration_db,
postgres_example_test_dataset_config,
cache,
@@ -1896,18 +1900,18 @@ def test_email_complete_send_access(
)
pr.delete(db=db)
- mailgun_send.assert_called_once()
+ mock_email_send.assert_called_once()
@pytest.mark.integration_postgres
@pytest.mark.integration
@mock.patch(
- "fidesops.ops.service.privacy_request.request_runner_service.dispatch_email"
+ "fidesops.ops.service.privacy_request.request_runner_service.dispatch_email_task_request_completion.apply_async"
)
@mock.patch("fidesops.ops.service.privacy_request.request_runner_service.upload")
def test_email_complete_send_access_and_erasure(
self,
upload_mock,
- mailgun_send,
+ mock_email_dispatch,
postgres_integration_db,
postgres_example_test_dataset_config,
cache,
@@ -1935,72 +1939,60 @@ def test_email_complete_send_access_and_erasure(
)
pr.delete(db=db)
- mailgun_send.assert_has_calls(
- [
- call(
- db=ANY,
- action_type=EmailActionType.PRIVACY_REQUEST_COMPLETE_ACCESS,
- to_email=customer_email,
- email_body_params=AccessRequestCompleteBodyParams(
- download_links=[upload_mock.return_value]
- ),
- ),
- call(
- db=ANY,
- action_type=EmailActionType.PRIVACY_REQUEST_COMPLETE_DELETION,
- to_email=customer_email,
- email_body_params=None,
- ),
- ],
- any_order=True,
- )
+ call_args = mock_email_dispatch.call_args[1]["kwargs"]
+ assert call_args["privacy_request_id"] == pr.id
+ assert call_args["to_email"] == customer_email
+ assert call_args["access_result_urls"] == [upload_mock.return_value]
+ assert call_args["policy_id"] == access_and_erasure_policy.id
+
+ # @pytest.mark.integration_postgres
+ # @pytest.mark.integration
+ # @mock.patch("fidesops.ops.service.email.email_dispatch_service._mailgun_dispatcher")
+ # @mock.patch("fidesops.ops.service.privacy_request.request_runner_service.upload")
+ # def test_email_complete_send_access_no_email_config(
+ # self,
+ # upload_mock,
+ # mailgun_send,
+ # postgres_integration_db,
+ # postgres_example_test_dataset_config,
+ # cache,
+ # db,
+ # generate_auth_header,
+ # policy,
+ # read_connection_config,
+ # privacy_request_complete_email_notification_enabled,
+ # run_privacy_request_task,
+ # ):
+ # upload_mock.return_value = "http://www.data-download-url"
+ # customer_email = "customer-1@example.com"
+ # data = {
+ # "requested_at": "2021-08-30T16:09:37.359Z",
+ # "policy_key": policy.key,
+ # "identity": {"email": customer_email},
+ # }
+ #
+ # pr = get_privacy_request_results(
+ # db,
+ # policy,
+ # run_privacy_request_task,
+ # data,
+ # )
+ # db.refresh(pr)
+ # assert pr.status == PrivacyRequestStatus.error
+ # pr.delete(db=db)
+ #
+ # assert mailgun_send.called is False
@pytest.mark.integration_postgres
@pytest.mark.integration
- @mock.patch("fidesops.ops.service.email.email_dispatch_service._mailgun_dispatcher")
- @mock.patch("fidesops.ops.service.privacy_request.request_runner_service.upload")
- def test_email_complete_send_access_no_email_config(
- self,
- upload_mock,
- mailgun_send,
- postgres_integration_db,
- postgres_example_test_dataset_config,
- cache,
- db,
- generate_auth_header,
- policy,
- read_connection_config,
- privacy_request_complete_email_notification_enabled,
- run_privacy_request_task,
- ):
- upload_mock.return_value = "http://www.data-download-url"
- customer_email = "customer-1@example.com"
- data = {
- "requested_at": "2021-08-30T16:09:37.359Z",
- "policy_key": policy.key,
- "identity": {"email": customer_email},
- }
-
- pr = get_privacy_request_results(
- db,
- policy,
- run_privacy_request_task,
- data,
- )
- db.refresh(pr)
- assert pr.status == PrivacyRequestStatus.error
- pr.delete(db=db)
-
- assert mailgun_send.called is False
-
- @pytest.mark.integration_postgres
- @pytest.mark.integration
- @mock.patch("fidesops.ops.service.email.email_dispatch_service._mailgun_dispatcher")
+ @mock.patch(
+ "fidesops.ops.service.privacy_request.request_runner_service.dispatch_email_task_request_completion.apply_async"
+ )
@mock.patch("fidesops.ops.service.privacy_request.request_runner_service.upload")
def test_email_complete_send_access_no_email_identity(
self,
upload_mock,
- mailgun_send,
+ mock_email_dispatch,
postgres_integration_db,
postgres_example_test_dataset_config,
cache,
@@ -2028,7 +2020,7 @@ def test_email_complete_send_access_no_email_identity(
assert pr.status == PrivacyRequestStatus.error
pr.delete(db=db)
- assert mailgun_send.called is False
+ assert mock_email_dispatch.called is False
class TestPrivacyRequestsManualWebhooks:
From 7292a96f3ae6257db1c51f28787fdc089f9854df Mon Sep 17 00:00:00 2001
From: eastandwestwind
Date: Mon, 3 Oct 2022 10:05:33 -0400
Subject: [PATCH 2/2] sort
---
.../ops/api/v1/endpoints/privacy_request_endpoints.py | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/src/fidesops/ops/api/v1/endpoints/privacy_request_endpoints.py b/src/fidesops/ops/api/v1/endpoints/privacy_request_endpoints.py
index c91627125..cc07a235d 100644
--- a/src/fidesops/ops/api/v1/endpoints/privacy_request_endpoints.py
+++ b/src/fidesops/ops/api/v1/endpoints/privacy_request_endpoints.py
@@ -92,7 +92,8 @@
EmailActionType,
FidesopsEmail,
RequestReceiptBodyParams,
- RequestReviewDenyBodyParams, SubjectIdentityVerificationBodyParams,
+ RequestReviewDenyBodyParams,
+ SubjectIdentityVerificationBodyParams,
)
from fidesops.ops.schemas.external_https import PrivacyRequestResumeFormat
from fidesops.ops.schemas.privacy_request import (
@@ -115,7 +116,8 @@
dispatch_email_task_identity_verification,
)
from fidesops.ops.service.privacy_request.request_runner_service import (
- queue_privacy_request, generate_id_verification_code,
+ generate_id_verification_code,
+ queue_privacy_request,
)
from fidesops.ops.service.privacy_request.request_service import (
build_required_privacy_request_kwargs,