Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trigger Geopoint ES Index on Geospatial Feature Flag Enable #35126

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions corehq/apps/geospatial/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,5 @@
}
}
}

# Base key from which the per-domain cache keys for tracking the geospatial
# Elasticsearch case-indexing task are built.
INDEX_ES_TASK_HELPER_BASE_KEY = 'geo_cases_index_cases'
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,23 @@ def index_case_docs(domain, query_limit=DEFAULT_QUERY_LIMIT, chunk_size=DEFAULT_
query = _es_case_query(domain, geo_case_property, case_type)
count = query.count()
print(f'{count} case(s) to process')
batch_count = 1
if query_limit:
batch_count = math.ceil(count / query_limit)
batch_count = get_batch_count(count, query_limit)
print(f"Cases will be processed in {batch_count} batches")
for i in range(batch_count):
print(f'Processing {i+1}/{batch_count}')
query = _es_case_query(domain, geo_case_property, case_type, size=query_limit)
case_ids = query.get_ids()
_index_case_ids(domain, case_ids, chunk_size)
process_batch(domain, geo_case_property, case_type, query_limit, chunk_size)


def get_batch_count(doc_count, query_limit):
    """Return the number of batches needed to process ``doc_count`` documents.

    :param doc_count: total number of documents to process.
    :param query_limit: maximum number of documents handled per batch. A falsy
        value (``None`` or ``0``) means there is no limit.
    :return: ``1`` when there is no limit, otherwise ``doc_count / query_limit``
        rounded up to the next whole number.
    """
    if not query_limit:
        # Without a per-query limit everything is handled in a single batch.
        return 1
    return math.ceil(doc_count / query_limit)


def process_batch(domain, geo_case_property, case_type, query_limit, chunk_size):
    """Run one size-limited case query and index the case IDs it returns.

    :param domain: domain whose cases are queried.
    :param geo_case_property: name of the geo case property passed to the query.
    :param case_type: case type passed to the query (callers may pass ``None``).
    :param query_limit: passed to the query as its ``size``.
    :param chunk_size: chunk size handed on to ``_index_case_ids``.
    """
    query = _es_case_query(domain, geo_case_property, case_type, size=query_limit)
    case_ids = query.get_ids()
    _index_case_ids(domain, case_ids, chunk_size)


def _index_case_ids(domain, case_ids, chunk_size):
Expand Down
4 changes: 4 additions & 0 deletions corehq/apps/geospatial/reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from corehq.util.quickcache import quickcache

from .dispatchers import CaseManagementMapDispatcher
from corehq.apps.geospatial.const import INDEX_ES_TASK_HELPER_BASE_KEY
from .es import (
BUCKET_CASES_AGG,
CASE_PROPERTIES_AGG,
Expand All @@ -38,6 +39,7 @@
geojson_to_es_geoshape,
get_geo_case_property,
validate_geometry,
get_celery_task_tracker,
)


Expand All @@ -59,12 +61,14 @@ class BaseCaseMapReport(ProjectReport, CaseListMixin, XpathCaseSearchFilterMixin
def template_context(self):
# Whatever is specified here can be accessed through initial_page_data
context = super(BaseCaseMapReport, self).template_context
celery_task_tracker = get_celery_task_tracker(self.domain, base_key=INDEX_ES_TASK_HELPER_BASE_KEY)
context.update({
'mapbox_access_token': settings.MAPBOX_ACCESS_TOKEN,
'saved_polygons': [
{'id': p.id, 'name': p.name, 'geo_json': p.geo_json}
for p in GeoPolygon.objects.filter(domain=self.domain).all()
],
'es_indexing_message': celery_task_tracker.get_message()
AmitPhulera marked this conversation as resolved.
Show resolved Hide resolved
})
return context

Expand Down
56 changes: 55 additions & 1 deletion corehq/apps/geospatial/tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
from django.utils.translation import gettext as _

from corehq.util.decorators import serial_task

from corehq.apps.celery import task
from corehq.apps.geospatial.utils import CeleryTaskTracker, update_cases_owner
from corehq.apps.geospatial.const import INDEX_ES_TASK_HELPER_BASE_KEY
from corehq.apps.geospatial.utils import (
get_celery_task_tracker,
CeleryTaskTracker,
update_cases_owner,
get_geo_case_property,
)
from corehq.apps.geospatial.management.commands.index_geolocation_case_properties import (
Copy link
Contributor

@kaapstorm kaapstorm Oct 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: It feels awkward for the tasks module to be importing from a management command, instead of the other way round (or both importing from somewhere else, but I'd prefer not to throw everything in the utils module, like that second drawer in the kitchen that somehow ends up not only with tongs and skewers, but also clothes pegs, and screws, and one rusty paper clip).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps a good alternative would be to move the es_case_query function off to the es.py file as I feel it would fit well there. This can then be renamed to something like es_case_query_for_missing_geopoint_val or something to that effect.

Alternatively, I can also simply create a file in a new "utils" directory to contain just the above helper function. I do feel the first option makes sense enough and is a bit more straightforward, but happy to go in this direction as well if you think it sounds more suitable @kaapstorm.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The es module sounds ideal for es_case_query! Yeah, I agree, the function is worth renaming too, and you could drop the "es_" prefix because we would know from its module that it would be an Elasticsearch query.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in a161b10.

_es_case_query,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're importing a protected function somewhere other than a test, then it's possibly no longer being treated as protected and you should drop the leading underscore. -- This is a guideline, not a rule, so I'm not requesting a change, and I'll leave it to your discretion ... but interesting that the linter didn't flag this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I have always been a little unsure as to how strict we should be when it comes to marking functions in Python as private. Is having simply a single external reference enough justification, or do we need to start referencing it a few times before it becomes obvious that this is clearly a public function (I have seen plenty of examples in HQ that use both examples)? I suppose this is the potential downside of having it more as a guideline than a rule, it can be difficult to gauge where the line lies at times.

Thinking this through, however, the former does make more sense, since even a single external reference clearly means it's not private/self-contained anymore. Given this, I agree it would make sense to drop the _ here.

get_batch_count,
process_batch,
DEFAULT_QUERY_LIMIT,
DEFAULT_CHUNK_SIZE,
)

from settings import MAX_GEOSPATIAL_INDEX_DOC_LIMIT


@task(queue="background_queue", ignore_result=True)
Expand All @@ -9,3 +28,38 @@ def geo_cases_reassignment_update_owners(domain, case_owner_updates_dict, task_k
finally:
celery_task_tracker = CeleryTaskTracker(task_key)
celery_task_tracker.mark_completed()


@serial_task('async-index-es-docs', timeout=30 * 60, queue='background_queue', ignore_result=True)
ajeety4 marked this conversation as resolved.
Show resolved Hide resolved
def index_es_docs_with_location_props(domain):
celery_task_tracker = get_celery_task_tracker(domain, INDEX_ES_TASK_HELPER_BASE_KEY)
if celery_task_tracker.is_active():
return

geo_case_prop = get_geo_case_property(domain)
query = _es_case_query(domain, geo_case_prop)
doc_count = query.count()
if doc_count > MAX_GEOSPATIAL_INDEX_DOC_LIMIT:
celery_task_tracker.set_message(
_('This domain contains too many cases and so they will not be made available '
'for use by this feature. Please reach out to support.')
)
return

celery_task_tracker.mark_requested()
batch_count = get_batch_count(doc_count, DEFAULT_QUERY_LIMIT)
try:
for i in range(batch_count):
progress = (i / batch_count) * 100
celery_task_tracker.set_message(
_(f'Cases are being made ready for use by this feature. Please be patient. ({progress}%)')
AmitPhulera marked this conversation as resolved.
Show resolved Hide resolved
)
process_batch(
domain,
geo_case_prop,
case_type=None,
query_limit=DEFAULT_QUERY_LIMIT,
chunk_size=DEFAULT_CHUNK_SIZE,
)
finally:
Copy link
Contributor

@ajeety4 ajeety4 Sep 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking of a situation when an exception occurs while processing the cases. This would mark the task as completed in the tracker.
I think it would be good to handle this scenario.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I followed the same logic as the task to async update owners. If we want to be a bit more cautious here, we could do a generic Exception catch, and then mark the tracker as having an error. I would need to slightly think through the tracker's current usage though, as we have no way of determining what error message to show when.

Copy link
Contributor

@ajeety4 ajeety4 Sep 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this is more of a safeguard thing. Good point about the task to async update owners. I think it would have been good to have the error marked and stored for that as well. Not very sure what norm is followed in HQ, however a quick search shows handling exception is a case by case basis.

I would recommend it for this indexing task, considering that if it throws an exception, the pending cases would never be processed and would not be available for use by the feature.
Agreed that the tracker would need to be updated to store an error message as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, handling an exception here more gracefully would be a good idea. Thinking through this however, I'm a little unsure about trying to store a message in the redis cache again. We would need to store the raw string in redis and then translate it when rendering it on the front-end, and I'm not entirely confident or sure whether pulling a string from the redis cache into a variable and then trying to translate it would work correctly.

@ajeety4 What do you think of extending the status system to have the ability for custom error statuses instead? For the mark_as_error function we could pass in an optional slug to append to the end of the "ERROR" string (e.g. "ERROR_CELERY"). Having different error statuses would allow us to know which message to show on the front-end.

Copy link
Contributor

@ajeety4 ajeety4 Sep 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not entirely confident or sure whether pulling a string from the redis cache into a variable and then trying to translate it would work correctly.

Great catch. You are right on this. As per the Django docs:
The caveat with using variables or computed values, as in the previous two examples, is that Django’s translation-string-detecting utility, django-admin makemessages, won’t be able to find these strings

What do you think of extending the status system to have the ability for custom error statuses instead? For the mark_as_error function we could pass in an optional slug to append to the end of the "ERROR" string (e.g. "ERROR_CELERY")

This is a good idea. I feel like a cleaner approach would be to use a separate key error_slug instead of using the task_key while marking it as an error. This way it keeps the choices for the task_key predictable while giving the consumer the flexibility to set an error_slug of their choice.
That being said, I am good with initial approach if this makes things complicated.

Copy link
Contributor Author

@zandre-eng zandre-eng Sep 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice find! Doing message strings is definitely out then.

use a separate key error_slug instead of using the task_key while marking it is an error

Do you mean that the task_key would then only have "ACTIVE" or "ERROR" as its states and then we would keep the error_slug as a separate field, combining the two if the task_key is an error? Something like:

def get_status(self):
    status = self._client.get(self.task_key)
    if status == 'ERROR':
        status += f'_{self._client.get(self.error_slug)}'
    return {
        'status': status,
        'progress': self.get_progress(),
    }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, correct. What do you think of returning the error_slug as well, as shown below? This would then not require appending or stripping a prefix.

def get_status(self):
    status = self._client.get(self.task_key)
    return {
        'status': status,
        'error_slug': self._client.get(self.error_slug) if status == 'ERROR' else None,
        'progress': self.get_progress(),
    }


def mark_as_error(self, error_slug=None, timeout=ONE_DAY * 3):
    if error_slug:
        self._client.set(self.error_slug_key, error_slug)
    return self._client.set(self.task_key, 'ERROR', timeout=timeout)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this idea, it would then keep it completely clean from the task_key. This looks like quite a small lift, so I'll implement as such.

Copy link
Contributor Author

@zandre-eng zandre-eng Sep 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in ded63ee.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<3 Great discussion.

celery_task_tracker.mark_completed()
1 change: 1 addition & 0 deletions corehq/apps/geospatial/templates/case_grouping_map.html
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
{% load hq_shared_tags %}

{% block reportcontent %}
{% include 'geospatial/partials/index_alert.html' %}
<div class="row panel">
<div class="col col-md-2">
<span id="lock-groups-controls">
Expand Down
1 change: 1 addition & 0 deletions corehq/apps/geospatial/templates/case_management.html
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{% load i18n %}

{% block reportcontent %}
{% include 'geospatial/partials/index_alert.html' %}
<div class="panel panel-default" id="user-filters-panel">
<div class="panel-body collapse in" aria-expanded="true">
<legend>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@
{% registerurl 'location_search' domain %}
{% registerurl 'reassign_cases' domain %}
{% endblock %}
{% include 'geospatial/partials/index_alert.html' %}
ajeety4 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{# Informational alert rendered only when `es_indexing_message` is set in the template context. #}
{% load i18n %}

{% if es_indexing_message %}
<div class="alert alert-info">
<p>
{{ es_indexing_message }}
</p>
</div>
{% endif %}
1 change: 1 addition & 0 deletions corehq/apps/geospatial/templates/geospatial/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
{% initial_page_data 'road_network_algorithm_slug' road_network_algorithm_slug %}

<form id="geospatial-config-form" class="form-horizontal disable-on-submit ko-template" method="post">
{% include 'geospatial/partials/index_alert.html' %}
{% crispy form %}
</form>
{% endblock %}
2 changes: 2 additions & 0 deletions corehq/apps/geospatial/templates/gps_capture_view.html
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
{% registerurl 'paginate_mobile_workers' domain %}
{% initial_page_data 'case_types_with_gps' case_types_with_gps %}
{% initial_page_data 'couch_user_username' couch_user_username %}
{% registerurl 'geo_polygons' domain %}
ajeety4 marked this conversation as resolved.
Show resolved Hide resolved

{% include 'geospatial/partials/index_alert.html' %}
<ul id="tabs-list" class="nav nav-tabs">
<li data-bind="click: onclickAction" class="active"><a data-toggle="tab" href="#tabs-cases">{% trans 'Update Case Data' %}</a></li>
<li data-bind="click: onclickAction"><a data-toggle="tab" href="#tabs-users">{% trans 'Update Mobile Worker Data' %}</a></li>
Expand Down
19 changes: 18 additions & 1 deletion corehq/apps/geospatial/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ def get_geo_user_property(domain):
return config.user_location_property_name


def get_celery_task_tracker(domain, base_key):
    """Build a ``CeleryTaskTracker`` whose cache keys are specific to ``domain``.

    :param domain: domain name appended to both keys.
    :param base_key: prefix shared by the task key and the message key.
    :return: a ``CeleryTaskTracker`` using ``'<base_key>_<domain>'`` as its task
        key and ``'<base_key>_message_<domain>'`` as its message key.
    """
    task_key = f'{base_key}_{domain}'
    message_key = f'{base_key}_message_{domain}'
    return CeleryTaskTracker(task_key, message_key)


def _format_coordinates(lat, lon):
return f"{lat} {lon} 0.0 0.0"

Expand Down Expand Up @@ -221,8 +227,9 @@ class CeleryTaskTracker(object):
Simple Helper class using redis to track if a celery task was requested and is not completed yet.
"""

def __init__(self, task_key, message_key=None):
    """
    :param task_key: cache key whose presence marks the task as active.
    :param message_key: optional cache key under which a status message for
        the task is stored. ``None`` means the tracker carries no message.
    """
    self.task_key = task_key
    self.message_key = message_key
    self._client = get_redis_client()

def mark_requested(self, timeout=ONE_DAY):
Expand All @@ -234,4 +241,14 @@ def is_active(self):
return self._client.has_key(self.task_key)

def mark_completed(self):
    """Clear any stored message and delete the task key.

    :return: the result of deleting ``task_key`` through the cache client.
    """
    # Trackers created without a message_key have no message to clear, and
    # must not issue a delete for the key ``None``.
    if self.message_key is not None:
        self.clear_message()
    return self._client.delete(self.task_key)

def get_message(self):
    """Return the message stored under ``message_key``.

    :return: whatever the cache client returns for ``message_key``, or
        ``None`` when this tracker was created without a ``message_key``.
    """
    if self.message_key is None:
        # No key configured: avoid looking up the key ``None``.
        return None
    return self._client.get(self.message_key)

def set_message(self, message, timeout=ONE_DAY * 3):
    """Store ``message`` under ``message_key``.

    :param message: the message to store.
    :param timeout: expiry passed to the cache client (defaults to three
        times ``ONE_DAY``).
    :return: the result of the cache client's ``set`` call.
    :raises ValueError: if this tracker was created without a ``message_key``.
    """
    if self.message_key is None:
        # Writing under the key ``None`` would be shared by every tracker
        # that has no message key, so fail loudly instead.
        raise ValueError('This CeleryTaskTracker was created without a message_key')
    return self._client.set(self.message_key, message, timeout=timeout)

def clear_message(self):
    """Delete the message stored under ``message_key``.

    :return: the result of the cache client's ``delete`` call, or ``False``
        when this tracker was created without a ``message_key``.
    """
    if self.message_key is None:
        # Nothing can have been stored; avoid deleting the key ``None``.
        return False
    return self._client.delete(self.message_key)
21 changes: 18 additions & 3 deletions corehq/apps/geospatial/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@
from corehq.form_processor.models import CommCareCase
from corehq.util.timezones.utils import get_timezone

from .const import GPS_POINT_CASE_PROPERTY, POLYGON_COLLECTION_GEOJSON_SCHEMA
from .const import (
GPS_POINT_CASE_PROPERTY,
POLYGON_COLLECTION_GEOJSON_SCHEMA,
INDEX_ES_TASK_HELPER_BASE_KEY,
)
from .models import GeoConfig, GeoPolygon
from .utils import (
CaseOwnerUpdate,
Expand All @@ -59,13 +63,24 @@
set_case_gps_property,
set_user_gps_property,
update_cases_owner,
get_celery_task_tracker,
)


def geospatial_default(request, *args, **kwargs):
    """Redirect to the URL of ``CaseManagementMap``.

    The positional and keyword arguments are forwarded unchanged to
    ``CaseManagementMap.get_url``; ``request`` itself is not used.
    """
    return HttpResponseRedirect(CaseManagementMap.get_url(*args, **kwargs))


class BaseGeospatialView(BaseDomainView):
    """Base domain view that adds the ES indexing message to the main context."""

    @property
    def main_context(self):
        """Return the parent context extended with ``es_indexing_message``.

        The message is read from the task tracker for this view's domain, and
        is whatever the tracker's ``get_message`` returns.
        """
        context = super().main_context
        celery_task_tracker = get_celery_task_tracker(self.domain, base_key=INDEX_ES_TASK_HELPER_BASE_KEY)
        context['es_indexing_message'] = celery_task_tracker.get_message()
        return context


class CaseDisbursementAlgorithm(BaseDomainView):
urlname = "case_disbursement"

Expand Down Expand Up @@ -151,7 +166,7 @@ def delete(self, request, *args, **kwargs):
})


class BaseConfigView(BaseDomainView):
class BaseConfigView(BaseGeospatialView):
section_name = _("Data")

@method_decorator(toggles.GEOSPATIAL.required_decorator())
Expand Down Expand Up @@ -229,7 +244,7 @@ def page_context(self):
return context


class GPSCaptureView(BaseDomainView):
class GPSCaptureView(BaseGeospatialView):
urlname = 'gps_capture'
template_name = 'gps_capture_view.html'

Expand Down
11 changes: 10 additions & 1 deletion corehq/toggles/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2555,13 +2555,22 @@ def _handle_attendance_tracking_role(domain, is_enabled):
save_fn=_handle_attendance_tracking_role,
)


def _handle_geospatial_es_index(domain, is_enabled):
from corehq.apps.geospatial.es import index_es_docs_with_location_props

if is_enabled:
index_es_docs_with_location_props.delay(domain)


# Enabling this toggle for a domain triggers `_handle_geospatial_es_index`
# through `save_fn`.
GEOSPATIAL = StaticToggle(
    'geospatial',
    'Allows access to GIS functionality',
    TAG_SOLUTIONS_LIMITED,
    namespaces=[NAMESPACE_DOMAIN],
    description='Additional views will be added allowing for visually viewing '
                'and assigning cases on a map.',
    save_fn=_handle_geospatial_es_index,
)

Expand Down