Skip to content
This repository has been archived by the owner on Nov 30, 2022. It is now read-only.

Convert dispatch email tasks to async #1412

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 12 additions & 26 deletions src/fidesops/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@
from fideslib.oauth.api.deps import get_db as lib_get_db
from fideslib.oauth.api.deps import verify_oauth_client as lib_verify_oauth_client
from fideslib.oauth.api.routes.user_endpoints import router as user_router
from fideslog.sdk.python.event import AnalyticsEvent
from redis.exceptions import ResponseError
from starlette.background import BackgroundTask
from starlette.middleware.cors import CORSMiddleware
from starlette.status import HTTP_404_NOT_FOUND

from fidesops.ops.analytics import (
accessed_through_local_host,
in_docker_container,
send_analytics_event,
from fidesops.ops.analytics import send_analytics_event
from fidesops.ops.analytics_event_factory import (
endpoint_call_analytics_event,
server_start_analytics_event,
)
from fidesops.ops.api.deps import get_api_session, get_config, get_db
from fidesops.ops.api.v1.api import api_router
Expand All @@ -36,7 +35,6 @@
)
from fidesops.ops.core.config import config
from fidesops.ops.db.database import init_db
from fidesops.ops.schemas.analytics import Event, ExtraData
from fidesops.ops.service.connectors.saas.connector_registry_service import (
load_registry,
registry_file,
Expand Down Expand Up @@ -116,17 +114,13 @@ async def prepare_and_log_request(
if config.root_user.analytics_opt_out:
return
await send_analytics_event(
AnalyticsEvent(
docker=in_docker_container(),
event=Event.endpoint_call.value,
event_created_at=event_created_at,
local_host=accessed_through_local_host(hostname),
endpoint=endpoint,
status_code=status_code,
error=error_class or None,
extra_data={ExtraData.fides_source.value: fides_source}
if fides_source
else None,
endpoint_call_analytics_event(
endpoint,
hostname,
status_code,
event_created_at,
fides_source,
error_class,
)
)

Expand Down Expand Up @@ -265,15 +259,7 @@ def start_webserver() -> None:
logger.info("Starting scheduled request intake...")
initiate_scheduled_request_intake()

asyncio.run(
send_analytics_event(
AnalyticsEvent(
docker=in_docker_container(),
event=Event.server_start.value,
event_created_at=datetime.now(tz=timezone.utc),
)
)
)
asyncio.run(send_analytics_event(server_start_analytics_event()))

if not config.execution.worker_enabled:
logger.info("Starting worker...")
Expand Down
12 changes: 11 additions & 1 deletion src/fidesops/ops/analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from fidesops import __version__ as fidesops_version
from fidesops.ops.core.config import config
from fidesops.ops.util.wrappers import sync

logger = logging.getLogger(__name__)

Expand All @@ -35,8 +36,17 @@ def accessed_through_local_host(hostname: Optional[str]) -> bool:
)


@sync
async def sync_send_analytics_event_wrapper(event: AnalyticsEvent) -> None:
"""
Calling async functions within Celery tasks is not yet supported: https://github.com/celery/celery/issues/6552
so this sync wrapper is a workaround for that.
"""
await send_analytics_event(event)


async def send_analytics_event(event: AnalyticsEvent) -> None:
if config.root_user.analytics_opt_out:
if config.root_user.analytics_opt_out or not event:
return
try:
await analytics_client._AnalyticsClient__send( # pylint: disable=protected-access
Expand Down
83 changes: 83 additions & 0 deletions src/fidesops/ops/analytics_event_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from datetime import datetime, timezone
from typing import Any, Dict, Optional

from fideslog.sdk.python.event import AnalyticsEvent

from fidesops.ops.analytics import accessed_through_local_host, in_docker_container
from fidesops.ops.models.policy import ActionType
from fidesops.ops.models.privacy_request import PrivacyRequest
from fidesops.ops.schemas.analytics import Event, ExtraData


def rerun_graph_analytics_event(
data: Dict[str, Any],
step: ActionType,
) -> Optional[AnalyticsEvent]:
"""Sends an AnalyticsEvent to send to Fideslog with stats on how an access graph
has changed from the previous run if applicable"""

return AnalyticsEvent(
docker=in_docker_container(),
event="rerun_access_graph"
if step == ActionType.access
else "rerun_erasure_graph",
event_created_at=datetime.now(tz=timezone.utc),
local_host=None,
endpoint=None,
status_code=None,
error=None,
extra_data=data,
)


def failed_graph_analytics_event(
privacy_request: PrivacyRequest, exc: Optional[BaseException]
) -> Optional[AnalyticsEvent]:
"""Sends an AnalyticsEvent to send to Fideslog if privacy request execution has failed."""

data = {"privacy_request": privacy_request.id}

return AnalyticsEvent(
docker=in_docker_container(),
event="privacy_request_execution_failure",
event_created_at=datetime.now(tz=timezone.utc),
local_host=None,
endpoint=None,
status_code=500,
error=exc.__class__.__name__ if exc else None,
extra_data=data,
)


def server_start_analytics_event() -> Optional[AnalyticsEvent]:
"""Sends an AnalyticsEvent to send to Fideslog upon server start"""

return AnalyticsEvent(
docker=in_docker_container(),
event=Event.server_start.value,
event_created_at=datetime.now(tz=timezone.utc),
)


def endpoint_call_analytics_event(
endpoint: str,
hostname: Optional[str],
status_code: int,
event_created_at: datetime,
fides_source: Optional[str],
error_class: Optional[str],
) -> Optional[AnalyticsEvent]:
"""Sends an AnalyticsEvent to send to Fideslog upon endpoint calls"""

return AnalyticsEvent(
docker=in_docker_container(),
event=Event.endpoint_call.value,
event_created_at=event_created_at,
local_host=accessed_through_local_host(hostname),
endpoint=endpoint,
status_code=status_code,
error=error_class or None,
extra_data={ExtraData.fides_source.value: fides_source}
if fides_source
else None,
)
37 changes: 34 additions & 3 deletions src/fidesops/ops/api/v1/endpoints/privacy_request_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
from fidesops.ops.graph.traversal import Traversal
from fidesops.ops.models.connectionconfig import ConnectionConfig
from fidesops.ops.models.datasetconfig import DatasetConfig
from fidesops.ops.models.email import EmailConfig
from fidesops.ops.models.manual_webhook import AccessManualWebhook
from fidesops.ops.models.policy import ActionType, CurrentStep, Policy, PolicyPreWebhook
from fidesops.ops.models.privacy_request import (
Expand All @@ -92,6 +93,7 @@
FidesopsEmail,
RequestReceiptBodyParams,
RequestReviewDenyBodyParams,
SubjectIdentityVerificationBodyParams,
)
from fidesops.ops.schemas.external_https import PrivacyRequestResumeFormat
from fidesops.ops.schemas.privacy_request import (
Expand All @@ -109,8 +111,12 @@
VerificationCode,
)
from fidesops.ops.service._verification import send_verification_code_to_user
from fidesops.ops.service.email.email_dispatch_service import dispatch_email_task
from fidesops.ops.service.email.email_dispatch_service import (
dispatch_email_task_generic,
dispatch_email_task_identity_verification,
)
from fidesops.ops.service.privacy_request.request_runner_service import (
generate_id_verification_code,
queue_privacy_request,
)
from fidesops.ops.service.privacy_request.request_service import (
Expand Down Expand Up @@ -283,6 +289,31 @@ async def create_privacy_request(
)


def _send_verification_code_to_user(
db: Session, privacy_request: PrivacyRequest, email: Optional[str]
) -> None:
"""Generate and cache a verification code, and then email to the user"""
EmailConfig.get_configuration(
db=db
) # Validates Fidesops is currently configured to send emails
verification_code: str = generate_id_verification_code()
privacy_request.cache_identity_verification_code(verification_code)
dispatch_email_task_identity_verification.apply_async(
queue=EMAIL_QUEUE_NAME,
kwargs={
"email_meta": FidesopsEmail(
action_type=EmailActionType.SUBJECT_IDENTITY_VERIFICATION,
body_params=SubjectIdentityVerificationBodyParams(
verification_code=verification_code,
verification_code_ttl_seconds=config.redis.identity_verification_code_ttl_seconds,
),
).dict(),
"to_email": email,
"privacy_request_id": privacy_request.id,
},
)


def _send_privacy_request_receipt_email_to_user(
policy: Optional[Policy], email: Optional[str]
) -> None:
Expand All @@ -305,7 +336,7 @@ def _send_privacy_request_receipt_email_to_user(
for action_type in ActionType:
if policy.get_rules_for_action(action_type=ActionType(action_type)):
request_types.add(action_type)
dispatch_email_task.apply_async(
dispatch_email_task_generic.apply_async(
queue=EMAIL_QUEUE_NAME,
kwargs={
"email_meta": FidesopsEmail(
Expand Down Expand Up @@ -1103,7 +1134,7 @@ def _send_privacy_request_review_email_to_user(
"Identity email was not found, so request review email could not be sent."
)
)
dispatch_email_task.apply_async(
dispatch_email_task_generic.apply_async(
queue=EMAIL_QUEUE_NAME,
kwargs={
"email_meta": FidesopsEmail(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,18 @@
Please locate and erase personally identifiable information for the data subject and records listed below:
</p>
{% for action_required in dataset_collection_action_required -%}
<p><b>{{ action_required.collection.collection }}</b></p>
<p><b>{{ action_required["collection"]["collection"] }}</b></p>
{% for action in action_required.action_needed -%}
<p>Locate the relevant records with:</p>
<ul>
{% for field, values in action.locators.items() -%}
{% for field, values in action["locators"].items() -%}
<li> Field: <i>{{ field }}</i>, Values: {{ values|join(', ') }} </li>
{%- endfor %}
</ul>
<p>Erase the following fields:</p>
{% if action.update -%}
{% if action["update"] -%}
<ul>
{% for field_name, masking_strategy in action.update.items() -%}
{% for field_name, masking_strategy in action["update"].items() -%}
<li><i>{{field_name}}</i></li>
{%- endfor %}
</ul>
Expand Down
52 changes: 2 additions & 50 deletions src/fidesops/ops/graph/analytics_events.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Dict, List, Optional

from fideslog.sdk.python.event import AnalyticsEvent

from fidesops.ops.analytics import in_docker_container, send_analytics_event
from fidesops.ops.core.config import config
from fidesops.ops.analytics_event_factory import rerun_graph_analytics_event
from fidesops.ops.graph.config import CollectionAddress
from fidesops.ops.graph.graph_differences import (
GraphDiffSummary,
Expand All @@ -21,22 +19,6 @@
from fidesops.ops.task.graph_task import GraphTask


async def fideslog_graph_failure(event: Optional[AnalyticsEvent]) -> None:
"""Send an Analytics Event if privacy request execution has failed"""
if config.root_user.analytics_opt_out or not event:
return

await send_analytics_event(event)


async def fideslog_graph_rerun(event: Optional[AnalyticsEvent]) -> None:
"""Send an Analytics Event if a privacy request has been reprocessed, comparing its graph to the previous graph"""
if config.root_user.analytics_opt_out or not event:
return

await send_analytics_event(event)


def prepare_rerun_graph_analytics_event(
privacy_request: PrivacyRequest,
env: Dict[CollectionAddress, "GraphTask"],
Expand Down Expand Up @@ -73,34 +55,4 @@ def prepare_rerun_graph_analytics_event(
data = graph_diff_summary.dict()
data["privacy_request"] = privacy_request.id

return AnalyticsEvent(
docker=in_docker_container(),
event="rerun_access_graph"
if step == ActionType.access
else "rerun_erasure_graph",
event_created_at=datetime.now(tz=timezone.utc),
local_host=None,
endpoint=None,
status_code=None,
error=None,
extra_data=data,
)


def failed_graph_analytics_event(
privacy_request: PrivacyRequest, exc: Optional[BaseException]
) -> Optional[AnalyticsEvent]:
"""Prepares an AnalyticsEvent to send to Fideslog if privacy request execution has failed."""

data = {"privacy_request": privacy_request.id}

return AnalyticsEvent(
docker=in_docker_container(),
event="privacy_request_execution_failure",
event_created_at=datetime.now(tz=timezone.utc),
local_host=None,
endpoint=None,
status_code=500,
error=exc.__class__.__name__ if exc else None,
extra_data=data,
)
return rerun_graph_analytics_event(data=data, step=step)
Loading