This is working well now, Sean. Nice work getting all the config and the worker pieces sorted out.
My lingering concern is that all tests were passing in earlier rounds of CR while several Celery pieces were still broken. Have you thought about ways to test this end to end reliably?
from fidesops.tasks.scheduled.scheduler import scheduler
from fidesops.util.async_util import run_async
Surfacing this again @seanpreston
This looks good to me @seanpreston, thanks for adding the extra step of caching the request id and asserting this in tests.
def get_async_execution_task(self) -> Optional[AsyncResult]:
    """Returns a task reflecting the state of this privacy request's asynchronous execution."""
    task_id = self.get_cached_task_id()
    res: AsyncResult = AsyncResult(task_id)
If task_id is None (perhaps because the cache has expired), this will throw an error. I see we're just using this in tests right now, though.
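One way to guard against the missing-id case is sketched below. This is only an illustration: `StandInAsyncResult` and `PrivacyRequestSketch` are hypothetical stand-ins (not the real Celery class or the fidesops model), the cache is a plain dict, and the key format is made up.

```python
from typing import Dict, Optional


class StandInAsyncResult:
    """Hypothetical stand-in for celery.result.AsyncResult (not the real class)."""

    def __init__(self, task_id: Optional[str]) -> None:
        # Assumption for this sketch: building a result without an id is an error.
        if task_id is None:
            raise ValueError("AsyncResult requires valid id")
        self.id = task_id


class PrivacyRequestSketch:
    """Hypothetical, trimmed-down privacy request backed by a dict 'cache'."""

    def __init__(self, request_id: str, cache: Dict[str, str]) -> None:
        self.id = request_id
        self._cache = cache

    def get_cached_task_id(self) -> Optional[str]:
        # The key format is invented for this sketch.
        return self._cache.get(f"async-task-{self.id}")

    def get_async_execution_task(self) -> Optional[StandInAsyncResult]:
        task_id = self.get_cached_task_id()
        if task_id is None:
            # Cache miss or expired key: report "no task" instead of raising.
            return None
        return StandInAsyncResult(task_id)
```

Returning None here matches the Optional return type already declared on the method, so callers would only need a None check.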
assert pr.get_cached_task_id() is not None
assert pr.get_async_execution_task() is not None
Thanks for adding these assertions.
def cache_task_id(self, task_id: str) -> None:
    """Sets a task_id for this privacy request's asynchronous execution."""
    cache: FidesopsRedis = get_cache()
    cache.set(
        get_async_task_tracking_cache_key(self.id),
        task_id,
    )
Note to self: this is stored in the same db index as privacy-request resources, but Celery itself uses a different index (1), with keys prefixed with celery-task-meta-*
def queue_privacy_request(
    privacy_request_id: str,
    from_webhook_id: Optional[str] = None,
    from_step: Optional[str] = None,
) -> str:
Good idea here, since we've now got this extra step of caching the task ID.
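For anyone reading along, the dispatch-then-cache pattern being discussed could look roughly like this. It is a sketch under assumptions, not the PR's actual body: `stand_in_delay`, `StandInResult`, the dict cache, and the key format are all hypothetical, standing in for the Celery task call and the Redis cache.

```python
import uuid
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class StandInResult:
    """Hypothetical stand-in for the result object a task dispatch returns."""

    id: str


def stand_in_delay(**kwargs: Optional[str]) -> StandInResult:
    # Pretend to enqueue the work and hand back a fresh task id.
    return StandInResult(id=str(uuid.uuid4()))


def queue_privacy_request_sketch(
    privacy_request_id: str,
    from_webhook_id: Optional[str] = None,
    from_step: Optional[str] = None,
    *,
    dispatch: Callable[..., StandInResult] = stand_in_delay,
    cache: Dict[str, str],
) -> str:
    """Dispatch the request, cache the task id under the request id, return the id."""
    result = dispatch(
        privacy_request_id=privacy_request_id,
        from_webhook_id=from_webhook_id,
        from_step=from_step,
    )
    # The key format is invented for this sketch.
    cache[f"async-task-{privacy_request_id}"] = result.id
    return result.id
```

Keeping dispatch and caching in one function means every call site gets the task id recorded, which is what lets the tests assert on it.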
A follow-up: I imagine we'll also need to add some sane defaults for task concurrency. The code this replaces limited the number of concurrent background threads, because in light testing we could open too many connections to databases.
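To illustrate the kind of cap being described, here is a minimal stdlib sketch of a bounded thread pool that reports its peak concurrency. It is not the code this PR replaces; `MAX_WORKERS`, `run_bounded`, and the fake job are hypothetical. On the Celery side, the analogous knob would presumably be the `worker_concurrency` setting (or the worker's `--concurrency` option).

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 2  # hypothetical default; the right value depends on the databases


def run_bounded(job_count: int, max_workers: int = MAX_WORKERS) -> int:
    """Run job_count fake jobs and return the peak number running at once."""
    lock = threading.Lock()
    active = 0
    peak = 0

    def fake_job(_: int) -> None:
        nonlocal active, peak
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.01)  # stands in for holding a database connection
        with lock:
            active -= 1

    # The pool never runs more than max_workers jobs simultaneously.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        list(pool.map(fake_job, range(job_count)))
    return peak
```

Calling `run_bounded(8, max_workers=2)` should never report a peak above 2, however many jobs are queued.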
Changes how privacy requests are dispatched for processing, from a background process to a Celery task
Purpose
Changes
PrivacyRequestRunner.run into run_privacy_request
PrivacyRequestRunner
Checklist
CHANGELOG.md file
CHANGELOG.md file is being appended to
Unreleased section in an appropriate category. Add a new category from the list at the top of the file if the needed one isn't already there.
Run Unsafe PR Checks label has been applied, and checks have passed, if this PR touches any external services
Ticket
Fixes #632