Skip to content

Commit

Permalink
Move json_stream function to common_tools
Browse files Browse the repository at this point in the history
  • Loading branch information
Wambere committed Mar 15, 2018
1 parent f73eb01 commit fc93cff
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 68 deletions.
26 changes: 6 additions & 20 deletions onadata/apps/api/viewsets/data_viewset.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
from onadata.libs.utils.viewer_tools import get_enketo_edit_url
from onadata.libs.utils.api_export_tools import custom_response_handler
from onadata.libs.data import parse_int
from onadata.libs.utils.common_tools import json_stream
from onadata.apps.api.permissions import ConnectViewsetPermissions
from onadata.apps.api.tools import get_baseviewset_class
from onadata.apps.logger.models.instance import FormInactiveError
Expand Down Expand Up @@ -452,7 +453,7 @@ def set_object_list_and_total_count(
limit = limit if start is None or start == 0 else start + limit
self.object_list = filter_queryset_xform_meta_perms(
self.get_object(), self.request.user, self.object_list)
self.object_list = self.object_list[start: limit]
self.object_list = self.object_list[start:limit]
self.total_count = self.object_list.count()
elif (sort or limit or start or fields) and not is_public_request:
try:
Expand Down Expand Up @@ -513,27 +514,12 @@ def _get_streaming_response(self):
"""
Get a StreamingHttpResponse response object
"""
def json_stream(data):
yield u"["

data_iter = data.__iter__()
item = data_iter.next()
while True:
try:
next_item = data_iter.next()
yield json.dumps(
item.json if isinstance(item, Instance) else item)
yield ","
item = next_item
except StopIteration:
yield json.dumps(
item.json if isinstance(item, Instance) else item)
break

yield u"]"
def get_json_string(item):
return json.dumps(
item.json if isinstance(item, Instance) else item)

response = StreamingHttpResponse(
json_stream(self.object_list),
json_stream(self.object_list, get_json_string),
content_type="application/json"
)

Expand Down
27 changes: 7 additions & 20 deletions onadata/apps/api/viewsets/open_data_viewset.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from onadata.libs.pagination import StandardPageNumberPagination
from onadata.libs.serializers.data_serializer import DataInstanceSerializer
from onadata.libs.serializers.open_data_serializer import OpenDataSerializer
from onadata.libs.utils.common_tools import json_stream
from onadata.libs.utils.csv_builder import CSVDataFrameBuilder

BaseViewset = get_baseviewset_class()
Expand Down Expand Up @@ -152,28 +153,14 @@ def data(self, request, **kwargs):
def _get_streaming_response(self, data):
"""Get a StreamingHttpResponse response object"""

def stream_json(streaming_data):
"""Generator function to stream JSON data"""
yield u"["

data_iter = streaming_data.__iter__()
item = data_iter.next()
while True:
try:
next_item = data_iter.next()
yield json.dumps({
re.sub(r"\W", r"_", a): b for a, b in item.items()})
yield ","
item = next_item
except StopIteration:
yield json.dumps({
re.sub(r"\W", r"_", a): b for a, b in item.items()})
break

yield u"]"
def get_json_string(item):
return json.dumps({
re.sub(r"\W", r"_", a): b for a, b in item.items()})

response = StreamingHttpResponse(
stream_json(data), content_type="application/json")
json_stream(data, get_json_string),
content_type="application/json"
)

# set headers on streaming response
for k, v in self.headers.items():
Expand Down
40 changes: 12 additions & 28 deletions onadata/apps/api/viewsets/xform_viewset.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
get_async_response,
process_async_export,
response_for_format)
from onadata.libs.utils.common_tools import json_stream
from onadata.libs.utils.csv_import import (get_async_csv_submission_status,
submit_csv, submit_csv_async)
from onadata.libs.utils.export_tools import parse_request_export_options
Expand Down Expand Up @@ -685,36 +686,19 @@ def _get_streaming_response(self):
"""
Get a StreamingHttpResponse response object
"""
def stream_json(data):
"""Generator function to stream JSON data"""
yield u"["

# use queryset_iterator. Will need to change this to the Django
# native .iterator() method when we upgrade to django version 2
# because in Django 2 .iterator() has support for chunk size
queryset = queryset_iterator(self.object_list, chunksize=2000)
data_iter = queryset.__iter__()
item = data_iter.next()
while True:
try:
next_item = data_iter.next()
yield json.dumps(XFormBaseSerializer(
instance=item,
context={'request': self.request}
).data)
yield ","
item = next_item
except StopIteration:
yield json.dumps(XFormBaseSerializer(
instance=item,
context={'request': self.request}
).data)
break

yield u"]"
# use queryset_iterator. Will need to change this to the Django
# native .iterator() method when we upgrade to django version 2
# because in Django 2 .iterator() has support for chunk size
queryset = queryset_iterator(self.object_list, chunksize=2000)

def get_json_string(item):
return json.dumps(XFormBaseSerializer(
instance=item,
context={'request': self.request}
).data)

response = StreamingHttpResponse(
stream_json(self.object_list),
json_stream(queryset, get_json_string),
content_type="application/json"
)

Expand Down
25 changes: 25 additions & 0 deletions onadata/libs/utils/common_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,28 @@ def get_response_content(response):
contents = response.content

return contents


def json_stream(data, json_string):
"""
Generator function to stream JSON data
"""

yield u"["

try:
data = data.__iter__()
item = data.next()
while item:
try:
next_item = data.next()
yield json_string(item)
yield ","
item = next_item
except StopIteration:
yield json_string(item)
break
except AttributeError:
pass

yield u"]"

0 comments on commit fc93cff

Please sign in to comment.