Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

register_output_renderer() should support streaming data #1101

Open
simonw opened this issue Nov 24, 2020 · 14 comments
Open

register_output_renderer() should support streaming data #1101

simonw opened this issue Nov 24, 2020 · 14 comments

Comments

@simonw
Copy link
Owner

simonw commented Nov 24, 2020

I'd like to implement this by first extending the register_output_renderer() hook to support streaming huge responses, then switching CSV to use the plugin hook in addition to TSV using it.

Originally posted by @simonw in #1096 (comment)

@simonw
Copy link
Owner Author

simonw commented Nov 24, 2020

Current design: https://docs.datasette.io/en/stable/plugin_hooks.html#register-output-renderer-datasette

@hookimpl
def register_output_renderer(datasette):
    return {
        "extension": "test",
        "render": render_demo,
        "can_render": can_render_demo,  # Optional
    }

Where render_demo looks something like this:

async def render_demo(datasette, columns, rows):
    db = datasette.get_database()
    result = await db.execute("select sqlite_version()")
    first_row = " | ".join(columns)
    lines = [first_row]
    lines.append("=" * len(first_row))
    for row in rows:
        lines.append(" | ".join(row))
    return Response(
        "\n".join(lines),
        content_type="text/plain; charset=utf-8",
        headers={"x-sqlite-version": result.first()[0]}
    )

Meanwhile here's where the CSV streaming mode is implemented:

async def stream_fn(r):
nonlocal data
writer = csv.writer(LimitedWriter(r, self.ds.config("max_csv_mb")))
first = True
next = None
while first or (next and stream):
try:
if next:
kwargs["_next"] = next
if not first:
data, _, _ = await self.data(request, database, hash, **kwargs)
if first:
await writer.writerow(headings)
first = False
next = data.get("next")
for row in data["rows"]:
if any(isinstance(r, bytes) for r in row):
new_row = []
for column, cell in zip(headings, row):
if isinstance(cell, bytes):
# If this is a table page, use .urls.row_blob()
if data.get("table"):
pks = data.get("primary_keys") or []
cell = self.ds.absolute_url(
request,
self.ds.urls.row_blob(
database,
data["table"],
path_from_row_pks(row, pks, not pks),
column,
),
)
else:
# Otherwise generate URL for this query
cell = self.ds.absolute_url(
request,
path_with_format(
request=request,
format="blob",
extra_qs={
"_blob_column": column,
"_blob_hash": hashlib.sha256(
cell
).hexdigest(),
},
replace_format="csv",
),
)
new_row.append(cell)
row = new_row
if not expanded_columns:
# Simple path
await writer.writerow(row)
else:
# Look for {"value": "label": } dicts and expand
new_row = []
for heading, cell in zip(data["columns"], row):
if heading in expanded_columns:
if cell is None:
new_row.extend(("", ""))
else:
assert isinstance(cell, dict)
new_row.append(cell["value"])
new_row.append(cell["label"])
else:
new_row.append(cell)
await writer.writerow(new_row)
except Exception as e:
print("caught this", e)
await r.write(str(e))
return
content_type = "text/plain; charset=utf-8"
headers = {}
if self.ds.cors:
headers["Access-Control-Allow-Origin"] = "*"
if request.args.get("_dl", None):
content_type = "text/csv; charset=utf-8"
disposition = 'attachment; filename="{}.csv"'.format(
kwargs.get("table", database)
)
headers["content-disposition"] = disposition
return AsgiStream(stream_fn, headers=headers, content_type=content_type)

@simonw
Copy link
Owner Author

simonw commented Nov 24, 2020

The trick I'm using here is to follow the next_url in order to paginate through all of the matching results. The loop calls the data() method multiple times, once for each page of results:

if next:
kwargs["_next"] = next
if not first:
data, _, _ = await self.data(request, database, hash, **kwargs)

@simonw simonw added this to the Datasette 1.0 milestone Dec 17, 2020
@simonw
Copy link
Owner Author

simonw commented Jan 6, 2021

Yet another use-case for this: I want to be able to stream newline-delimited JSON in order to better import into Pandas:

pandas.read_json("https://latest.datasette.io/fixtures/compound_three_primary_keys.json?_shape=array&_nl=on", lines=True)

@simonw
Copy link
Owner Author

simonw commented Jan 6, 2021

Idea: instead of returning a dictionary, register_output_renderer could return an object. The object could have the following properties:

  • .extension - the extension to use
  • .can_render(...) - says if it can render this
  • .can_stream(...) - says if streaming is supported
  • async .stream_rows(rows_iterator, send) - method that loops through all rows and uses send to send them to the response in the correct format

I can then deprecate the existing dict return type for 1.0.

@simonw
Copy link
Owner Author

simonw commented Jan 6, 2021

With this structure it will become possible to stream non-newline-delimited JSON array-of-objects too - the stream_rows() method could output [ first, then each row followed by a comma, then ] after the very last row.

@eyeseast
Copy link
Contributor

This would really help with this issue: eyeseast/datasette-geojson#7

@simonw
Copy link
Owner Author

simonw commented Jun 28, 2021

Relevant blog post: https://simonwillison.net/2021/Jun/25/streaming-large-api-responses/ - including notes on efficiently streaming formats with some kind of separator in between the records (regular JSON).

Some export formats are friendlier for streaming than others. CSV and TSV are pretty easy to stream, as is newline-delimited JSON.

Regular JSON requires a bit more thought: you can output a [ character, then output each row in a stream with a comma suffix, then skip the comma for the last row and output a ]. Doing that requires peeking ahead (looping two at a time) to verify that you haven't yet reached the end.

Or... Martin De Wulf pointed out that you can output the first row, then output every other row with a preceeding comma---which avoids the whole "iterate two at a time" problem entirely.

@simonw
Copy link
Owner Author

simonw commented Apr 21, 2022

Maybe the simplest design for this is to add an optional can_stream to the contract:

    @hookimpl
    def register_output_renderer(datasette):
        return {
            "extension": "tsv",
            "render": render_tsv,
            "can_render": lambda: True,
            "can_stream": lambda: True
        }

When streaming, a new parameter could be passed to the render function - maybe chunks - which is an iterator/generator over a sequence of chunks of rows.

Or it could use the existing rows parameter but treat that as an iterator?

@eyeseast
Copy link
Contributor

What if you split rendering and streaming into two things:

  • render is a function that returns a response
  • stream is a function that sends chunks, or yields chunks passed to an ASGI send callback

That way current plugins still work, and streaming is purely additive. A stream function could get a cursor or iterator of rows, instead of a list, so it could more efficiently handle large queries.

@simonw
Copy link
Owner Author

simonw commented Apr 21, 2022

I'm questioning if the mechanisms should be separate at all now - a single response rendering is really just a case of a streaming response that only pulls the first N records from the iterator.

It probably needs to be an async for iterator, which I've not worked with much before. Good opportunity to learn.

This actually gets a fair bit more complicated due to the work I'm doing right now to improve the default JSON API:

I want to do things like make faceting results optionally available to custom renderers - which is a separate concern from streaming rows.

I'm going to poke around with a bunch of prototypes and see what sticks.

@simonw
Copy link
Owner Author

simonw commented Apr 21, 2022

The datasette-geojson plugin is actually an interesting case here, because of the way it converts SpatiaLite geometries into GeoJSON: https://github.com/eyeseast/datasette-geojson/blob/602c4477dc7ddadb1c0a156cbcd2ef6688a5921d/datasette_geojson/__init__.py#L61-L66

    if isinstance(geometry, bytes):
        results = await db.execute(
            "SELECT AsGeoJSON(:geometry)", {"geometry": geometry}
        )
        return geojson.loads(results.single_value())

That actually seems to work really well as-is, but it does worry me a bit that it ends up having to execute an extra SELECT query for every single returned row - especially in streaming mode where it might be asked to return 1m rows at once.

My PostgreSQL/MySQL engineering brain says that this would be better handled by doing a chunk of these (maybe 100) at once, to avoid the per-query-overhead - but with SQLite that might not be necessary.

At any rate, this is one of the reasons I'm interested in "iterate over this sequence of chunks of 100 rows at a time" as a potential option here.

Of course, a better solution would be for datasette-geojson to have a way to influence the SQL query before it is executed, adding a AsGeoJSON(geometry) clause to it - so that's something I'm open to as well.

@eyeseast
Copy link
Contributor

Ha! That was your idea (and a good one).

But it's probably worth measuring to see what overhead it adds. It did require both passing in the database and making the whole thing async.

Just timing the queries themselves:

  1. Using AsGeoJSON(geometry) as geometry takes 10.235 ms
  2. Leaving as binary takes 8.63 ms

Looking at the network panel:

  1. Takes about 200 ms for the fetch request
  2. Takes about 300 ms

I'm not sure how best to time the GeoJSON generation, but it would be interesting to check. Maybe I'll write a plugin to add query times to response headers.

The other thing to consider with async streaming is that it might be well-suited for a slower response. When I have to get the whole result and send a response in a fixed amount of time, I need the most efficient query possible. If I can hang onto a connection and get things one chunk at a time, maybe it's ok if there's some overhead.

@simonw
Copy link
Owner Author

simonw commented Jan 21, 2023

Idea for supporting streaming with the register_output_renderer hook:

@hookimpl
def register_output_renderer(datasette):
    return {
        "extension": "test",
        "render": render_demo,
        "can_render": can_render_demo,
        "render_stream": render_demo_stream, # This is new
    }

So there's a new "render_stream" key which can be returned, which if present means that the output renderer supports streaming.

I'll play around with the design of that function signature in:

@fgregg
Copy link
Contributor

fgregg commented May 9, 2024

Idea: instead of returning a dictionary, register_output_renderer could return an object. The object could have the following properties:

* `.extension` - the extension to use

* `.can_render(...)` - says if it can render this

* `.can_stream(...)` - says if streaming is supported

* `async .stream_rows(rows_iterator, send)` - method that loops through all rows and uses `send` to send them to the response in the correct format

I can then deprecate the existing dict return type for 1.0.

I really like this. One thing that would be great to add would be the ability to cancel the rows_iterator. The use case I have is to have a datatables extension that would wrap the orginal sql query in a new sql query like select * from ({original_query}) as t where [..... conditions from datatables server side api].

if the streaming data was truly lazy, then this would be easy, I would just throw away the original row_iterator, and then construct my own. This could handle many of the use cases of #2000.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants