Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] Fix satellite fetching race condition #514

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 49 additions & 39 deletions rdwatch/core/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ def cancel_generate_images_task(model_run_id: UUID4) -> None:
@shared_task
def generate_site_images(
site_id: UUID4,
satellite_fetching_id: int,
constellation=['WV'], # noqa
force=False, # forced downloading found_timestamps again
dayRange=14,
Expand All @@ -608,44 +609,21 @@ def generate_site_images(
pointArea: float = pointAreaDefault,
worldview_source: Literal['cog', 'nitf'] | None = 'cog',
):
siteeval = SiteEvaluation.objects.get(pk=site_id)
with transaction.atomic():
# Use select_for_update here to lock the SatelliteFetching row
# for the duration of this transaction in order to ensure its
# status doesn't change out from under us
fetching_task = (
SatelliteFetching.objects.select_for_update().filter(site=siteeval).first()
)
if fetching_task is not None:
# If the task already exists and is running, return a 409 and do not
# start another one.
if fetching_task.status == SatelliteFetching.Status.RUNNING:
return 409, 'Image generation already in progress.'
# Otherwise, if the task exists but is *not* running, set the status
# to running and kick off the task
fetching_task.status = SatelliteFetching.Status.RUNNING
fetching_task.timestamp = datetime.now()
fetching_task.save()
else:
fetching_task = SatelliteFetching.objects.create(
site=siteeval,
timestamp=datetime.now(),
status=SatelliteFetching.Status.RUNNING,
)
task_id = get_siteobservation_images_task.delay(
site_id,
constellation,
force,
dayRange,
noData,
overrideDates,
scale,
bboxScale,
pointArea,
worldview_source,
)
fetching_task.celery_id = task_id.id
fetching_task.save()
task_id = get_siteobservation_images_task.delay(
site_id,
constellation,
force,
dayRange,
noData,
overrideDates,
scale,
bboxScale,
pointArea,
worldview_source,
)
SatelliteFetching.objects.filter(pk=satellite_fetching_id).update(
celery_id=task_id.id
)


@shared_task
Expand All @@ -660,10 +638,42 @@ def generate_site_images_for_evaluation_run(
bboxScale: float = BboxScaleDefault,
pointArea: float = pointAreaDefault,
):
sites = SiteEvaluation.objects.filter(configuration=model_run_id)
# Get all sites that are not currently fetching images
sites = (
SiteEvaluation.objects.filter(
configuration=model_run_id,
)
.alias(
satellite_fetching_status=Subquery(
SatelliteFetching.objects.filter(site=OuterRef('pk')).values('status')[
:1
],
)
)
.exclude(satellite_fetching_status=SatelliteFetching.Status.RUNNING)
.select_related('satellite_fetching')
)

# Create a new SatelliteFetching task for each site, batched
# into groups of 100 to avoid overloading the Celery worker's
# memory
for site_batch in ichunked(sites.iterator(), 100):
SatelliteFetching.objects.bulk_create(
[
SatelliteFetching(
site=site_eval,
timestamp=timezone.now(),
status=SatelliteFetching.Status.RUNNING,
)
for site_eval in site_batch
if site_eval.satellite_fetching is None
]
)

for eval in sites.iterator():
generate_site_images.delay(
eval.pk,
eval.satellite_fetching.id,
constellation,
force,
dayRange,
Expand Down
24 changes: 24 additions & 0 deletions rdwatch/core/views/site_observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,32 @@ def get_site_observation_images(
scalVal = params.scale
if params.scale == 'custom':
scalVal = params.scaleNum

site_eval = get_object_or_404(SiteEvaluation, pk=evaluation_id)

with transaction.atomic():
satellite_fetching = (
SatelliteFetching.objects.select_for_update().filter(site=site_eval).first()
)
if satellite_fetching is not None:
# If the task already exists and is running, return a 409 and do not
# start another one.
if satellite_fetching.status == SatelliteFetching.Status.RUNNING:
return 409, 'Image generation already in progress.'
# Otherwise, if the task exists but is *not* running, set the status
# to running and kick off the task
satellite_fetching.status = SatelliteFetching.Status.RUNNING
satellite_fetching.timestamp = datetime.now()
satellite_fetching.save()
else:
fetching_task = SatelliteFetching.objects.create(
site=site_eval,
timestamp=datetime.now(),
status=SatelliteFetching.Status.RUNNING,
)
generate_site_images.delay(
evaluation_id,
fetching_task.pk,
params.constellation,
params.force,
params.dayRange,
Expand Down
Loading