-
-
Notifications
You must be signed in to change notification settings - Fork 691
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Research running SQL in table view in parallel using asyncio.gather()
#1723
Comments
Here's the diff I'm using: diff --git a/datasette/views/table.py b/datasette/views/table.py
index d66adb8..f15ef1e 100644
--- a/datasette/views/table.py
+++ b/datasette/views/table.py
@@ -1,3 +1,4 @@
+import asyncio
import itertools
import json
@@ -5,6 +6,7 @@ import markupsafe
from datasette.plugins import pm
from datasette.database import QueryInterrupted
+from datasette import tracer
from datasette.utils import (
await_me_maybe,
CustomRow,
@@ -150,6 +152,16 @@ class TableView(DataView):
default_labels=False,
_next=None,
_size=None,
+ ):
+ with tracer.trace_child_tasks():
+ return await self._data_traced(request, default_labels, _next, _size)
+
+ async def _data_traced(
+ self,
+ request,
+ default_labels=False,
+ _next=None,
+ _size=None,
):
database_route = tilde_decode(request.url_vars["database"])
table_name = tilde_decode(request.url_vars["table"])
@@ -159,6 +171,20 @@ class TableView(DataView):
raise NotFound("Database not found: {}".format(database_route))
database_name = db.name
+ # For performance profiling purposes, ?_parallel=1 turns on asyncio.gather
+ async def _gather_parallel(*args):
+ return await asyncio.gather(*args)
+
+ async def _gather_sequential(*args):
+ results = []
+ for fn in args:
+ results.append(await fn)
+ return results
+
+ gather = (
+ _gather_parallel if request.args.get("_parallel") else _gather_sequential
+ )
+
# If this is a canned query, not a table, then dispatch to QueryView instead
canned_query = await self.ds.get_canned_query(
database_name, table_name, request.actor
@@ -174,8 +200,12 @@ class TableView(DataView):
write=bool(canned_query.get("write")),
)
- is_view = bool(await db.get_view_definition(table_name))
- table_exists = bool(await db.table_exists(table_name))
+ is_view, table_exists = map(
+ bool,
+ await gather(
+ db.get_view_definition(table_name), db.table_exists(table_name)
+ ),
+ )
# If table or view not found, return 404
if not is_view and not table_exists:
@@ -497,33 +527,44 @@ class TableView(DataView):
)
)
- if not nofacet:
- for facet in facet_instances:
- (
+ async def execute_facets():
+ if not nofacet:
+ # Run them in parallel
+ facet_awaitables = [facet.facet_results() for facet in facet_instances]
+ facet_awaitable_results = await gather(*facet_awaitables)
+ for (
instance_facet_results,
instance_facets_timed_out,
- ) = await facet.facet_results()
- for facet_info in instance_facet_results:
- base_key = facet_info["name"]
- key = base_key
- i = 1
- while key in facet_results:
- i += 1
- key = f"{base_key}_{i}"
- facet_results[key] = facet_info
- facets_timed_out.extend(instance_facets_timed_out)
-
- # Calculate suggested facets
+ ) in facet_awaitable_results:
+ for facet_info in instance_facet_results:
+ base_key = facet_info["name"]
+ key = base_key
+ i = 1
+ while key in facet_results:
+ i += 1
+ key = f"{base_key}_{i}"
+ facet_results[key] = facet_info
+ facets_timed_out.extend(instance_facets_timed_out)
+
suggested_facets = []
- if (
- self.ds.setting("suggest_facets")
- and self.ds.setting("allow_facet")
- and not _next
- and not nofacet
- and not nosuggest
- ):
- for facet in facet_instances:
- suggested_facets.extend(await facet.suggest())
+
+ async def execute_suggested_facets():
+ # Calculate suggested facets
+ if (
+ self.ds.setting("suggest_facets")
+ and self.ds.setting("allow_facet")
+ and not _next
+ and not nofacet
+ and not nosuggest
+ ):
+ # Run them in parallel
+ facet_suggest_awaitables = [
+ facet.suggest() for facet in facet_instances
+ ]
+ for suggest_result in await gather(*facet_suggest_awaitables):
+ suggested_facets.extend(suggest_result)
+
+ await gather(execute_facets(), execute_suggested_facets())
# Figure out columns and rows for the query
columns = [r[0] for r in results.description] |
And some simple benchmarks with
|
I realized that seeing the total time spent in queries wasn't enough to understand this, because whether the queries run serially or in parallel, the SQL time should still sum to roughly the same amount. Instead I need to know how long the page took to render. But that's hard to display on the page itself, since you can't measure it until rendering has finished! So I built an ASGI plugin to handle that measurement: https://github.com/simonw/datasette-total-page-time And with that plugin installed, While |
asyncio.gather()
asyncio.gather()
Spun off from:
The text was updated successfully, but these errors were encountered: