Streaming Writes implementation #305

Merged · 7 commits · Jul 29, 2024

Conversation

H-Plus-Time
Contributor

The interface for writing to streams so far is:

const table = wasm.readParquet(buf);
const outputStream = wasm.writeParquetStream(table);
// sink the contents somewhere useful, like to a server that supports streaming POST requests.
await fetch(url, {
  method: 'POST',
  body: outputStream,
  duplex: 'half',
});

// or perhaps to the filesystem
const writableStream = await fileHandle.createWritable();
await outputStream.pipeTo(writableStream); // 🤯

I suspect it won't take that much to get this to take an FFI table (do we need unsafe impl From<FFITable> for Table in the arrow-wasm repo for that?).

What I'd really like this to do is close the loop on a workflow similar to:

flowchart LR
    subgraph geoarrow["Bunch of geoarrow transforms"]
    direction TB
    op0["op(batch)"] -.-> opn["op(batch)"]
    end
    A[FS] -->|"readParquetStream"| B("Stream&lt;RecordBatch&gt;") --> geoarrow
    geoarrow -->|"writeParquetStream"| C[FS]

Just taking a table is only somewhat useful (you still have to keep the batches that make up the table in memory until the write stream finishes), and tbh is mostly a placeholder until I figure out JsCast properly.

A few design questions come to mind:

  1. The writer_async function(s) always return ReadableStreams - is there any appetite for doing stuff to the parquet bytes before they cross the WASM<->JS boundary? The only thing I can really think of is streaming checksums.

This definitely needs a variant that takes a rust stream of RecordBatches (similar to the read functions), in part to avoid the cost of bouncing pointers out to JS, but much more importantly to remove a lot of JS-side orchestration. It's still subject to the above proviso (the output is always a ReadableStream, since the only real place to direct the output is to IO of some form).

@kylebarron
Owner

I suspect it won't take that much to get this to take an FFI table (do we need unsafe impl From<FFITable> for Table in the arrow-wasm repo for that?).

Yeah I left the conversions from FFI objects to normal objects for a follow up.

Also related is that I started working on a minimal JavaScript representation of the Arrow C Data Interface arrays in kylebarron/arrow-js-ffi#45. In particular, I think that will make it easier to write a table from JS memory into Wasm memory, without the memory overhead of going through an IPC buffer. Also it should allow bundle-size conscious libraries to forgo Arrow JS, at the cost of making the data harder to work with. I think it'll make sense especially for wasm-focused Arrow applications.

What I'd really like this to do is close the loop on a workflow similar to:

That's pretty cool! I guess that's pseudocode like

const readStream = await wasm.readParquetStream(url);
const outputStream = await wasm.writeParquetStream(schema);

for await (const table of readStream) {
    outputStream.write(table);
}

I haven't actually used streams in JS much, so I'm not too familiar with the mechanics of piping the stream somewhere.

Just taking a table is only somewhat useful (you still have to keep the batches that make up the table in memory until the write stream finishes)

Seems ideal to have a stream writer like:

  • Initialize the writer stream with the output schema
  • Write instances of Table or RecordBatch to the stream.

Then you don't have to ever materialize the entire table into memory, though it does require you to know the output schema in advance.
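A minimal sketch of what such a schema-first writer could look like from JS (ParquetWriterStream, its write/close methods, and the stream variable names are hypothetical, not an existing parquet-wasm API):

// Hypothetical schema-first writer: only the batch currently being encoded stays in memory.
const writer = new wasm.ParquetWriterStream(schema, writerProperties); // hypothetical class

// Start draining encoded Parquet bytes to a sink while batches are still arriving.
const sinkDone = writer.readable.pipeTo(await fileHandle.createWritable());

for await (const batch of recordBatchSource) {
  writer.write(batch); // hypothetical method: enqueue one RecordBatch (or Table)
}
writer.close(); // hypothetical method: flush the Parquet footer and end the byte stream
await sinkDone;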

  1. The writer_async function(s) always return ReadableStreams - is there any appetite for doing stuff to the parquet bytes before they cross the WASM<->JS boundary?

Yeah I don't really know what you'd do to the parquet bytes

This definitely needs a variant that takes a rust stream of RecordBatches (similar to the read functions), in part to avoid the cost of bouncing pointers out to JS, but much more importantly, remove a lot of JS-side orchestration. It's still subject to the above proviso (the output is always a ReadableStream, since the only real place to direct the output is to IO of some form).

I need to think about this more.....

Comment on lines 58 to 64
// Need to create an encoding for each column
let mut encodings = vec![];
for _ in &schema.fields {
    // Note, the nested encoding is for nested Parquet columns
    // Here we assume columns are not nested
    encodings.push(vec![encoding]);
}
Owner

just a separate note that this was kinda a hack and at some point I need to come back and make sure that the encoding is compatible for each type in the schema

@H-Plus-Time
Contributor Author

H-Plus-Time commented Aug 31, 2023

That's pretty cool! I guess that's pseudocode like

More or less, I got a bit carried away there with mermaid :S.

Technically the idiomatic way to do it in JS is fooStream.pipeThrough(transformStream) (just the transforming from record batches -> bytes bit; sinking to a socket/file, etc. typically involves await resultStream.pipeTo(writableStream)). The only problem with that is it's strictly struct/object based (there's no way to do it without a state container), and it really doesn't gel all that well with Rust (or, for that matter, with other quasi-iterables in JS). The alternative, which dispenses with all of that, is to just do this:

const byteStream = transformFunc(fooStream) // -> produces a ReadableStream.
await byteStream.pipeTo(writableStream);

Then you don't have to ever materialize the entire table into memory, though it does require you to know the output schema in advance.

It turns out StreamExt::Peekable, provided you assume the schema doesn't change across record batches (I've been bitten by that on very rare occasions), allows this to be done statelessly (most recent commit has the relevant code - peek at the first batch + schema().into_inner()).

The only thing left at this point is figuring out a way to do this:

#[wasm_bindgen]
pub fn foo(opaque: JsValue) -> RecordBatch {}

(The non-generic ReadableStream type (which is always interpreted as emitting JsValue) is the main impediment).
This (rustwasm/wasm-bindgen#3554) looks like it's very close to merging into wasm-bindgen.

@kylebarron
Owner

I think I'll have to read that a few times to really understand it.

The only thing left at this point is figuring out a way to do this:

Essentially I envision converting a record batch from Arrow JS to Wasm to be:

import {schemaFromArrowJS, arrayFromArrowJS, schemaToFFI, arrayToFFI} from 'arrow-js-ffi';
import {malloc} from 'parquet-wasm';

const recordBatch = arrow.RecordBatch();
const nanoArrowSchema = schemaFromArrowJS(recordBatch.schema())
const nanoArrowArray = arrayFromArrowJS(recordBatch.toStruct())
const schemaFFIPointer = schemaToFFI(nanoArrowSchema);
const arrayFFIPointer = arrayToFFI(nanoArrowArray);
const wasmRecordBatch = wasm.RecordBatch.from_pointers(arrayFFIPointer, schemaFFIPointer);

Then on the rust side, this wasmRecordBatch struct holds an arrow RecordBatch inside it.

Does this answer your question at all?

@H-Plus-Time
Contributor Author

H-Plus-Time commented Sep 2, 2023

To be honest, miscommunication on my part (too terse), this:

#[wasm_bindgen]
pub fn foo(opaque: JsValue) -> RecordBatch {}

really should have included the full context, namely:

#[wasm_bindgen]
pub async fn foo_readable_stream_consumer(input: wasm_streams::readable::sys::ReadableStream) {
    // The item type here is always Result<JsValue, JsValue> - it can't be narrowed any further,
    // because the raw ReadableStream type is non-generic.
    let mut rs_stream = wasm_streams::ReadableStream::from_raw(input).into_stream();
    while let Some(item) = rs_stream.next().await {
        let recovered_rs_item: RecordBatch = item.unwrap().dyn_into().unwrap();
        // ^ JsValue dynamic casting - requires a somewhat unreliable (and unwieldy) JsCast impl.
        // Also this is really intended for well-known JS classes, rather than JS classes that
        // wrap rust structs.
    }
}

The syntax we really want for this is:

let recovered_rs_item: RecordBatch = item.try_into().unwrap();

The additional codegen output in rustwasm/wasm-bindgen#3554 gives us exactly that, for any and all wasm-bindgen'd structs (as well as Vec<Struct>). The bonus here (that I actually wasn't expecting) is that it allows you to transfer ownership of a struct out to JS (out of the purview of rust's borrow checker :( ) and back in (where the borrow checker kicks back into gear and frees it once it goes out of scope), neutering the JS wrapper class in the process. The extra bindgen code is surprisingly minimal, really; it boils down to this:

class RecordBatch {
    static __unwrap(jsValue) {
        if (!(jsValue instanceof RecordBatch)) {
            return 0;
        }
        return jsValue.__destroy_into_raw();
    }
    /* etc */
}
module.exports.__wbg_recordbatch_unwrap = function(arg0) {
    const ret = RecordBatch.__unwrap(takeObject(arg0));
    return ret;
};

Happy to leave this PR open in draft until the wasm-bindgen PR is merged, repin to wasm-bindgen 0.2.88 and undraft (also gives me a chance to figure out the equivalent with arrow1 (and adapt to the arrow1 table abstraction)).

@kylebarron
Owner

fwiw, I plan to deprecate arrow2 at some point, so I wouldn't put any more emphasis on that

@andresgutgon

Hi @H-Plus-Time what's the status of this PR? I would love to give it a try #542

@H-Plus-Time
Contributor Author

It needs a bit of re-work, but as far as I can tell, it's mostly renaming and moving files around (i.e. ignore all of arrow2, move the new writer_async file in arrow1 to just /, etc).

I'll have a bit of time for that this weekend, hang tight.


github-actions bot commented Jun 18, 2024

Asset Sizes

Asset                            Uncompressed Size         Compressed Size
async_full/parquet_wasm_bg.wasm  5.46 MB (+72.8 KB, +1%)   1.27 MB (+273 B, +0%)
slim/parquet_wasm_bg.wasm        3.55 MB (+6.97 KB, +0%)   568 KB (+649 B, +0%)
sync/parquet_wasm_bg.wasm        4.72 MB (+5.9 KB, +0%)    1.04 MB (+6.92 KB, +1%)

Owner

@kylebarron left a comment

This looks awesome! Just a few questions

src/writer_async.rs
use parquet::arrow::async_writer::AsyncArrowWriter;
use wasm_bindgen_futures::spawn_local;

pub fn transform_parquet_stream(
Owner

It would be great to have a docstring here

src/writer_async.rs
let mut pinned_stream = std::pin::pin!(adapted_stream);
let first_batch = pinned_stream.as_mut().peek().await.unwrap();
let schema = first_batch.schema().into_inner();
// Need to create an encoding for each column
Owner

We should probably have a helper that creates encodings given a WriterProperties instance plus an Arrow schema

let mut writer =
    AsyncArrowWriter::try_new(writable_stream.compat(), schema, options).unwrap();
while let Some(batch) = pinned_stream.next().await {
    let _ = writer.write(&batch.into()).await;
Owner

Do you have to assign the output to something? Can we have just writer.write(&batch.into()).await? Are these infallible APIs?

let schema = first_batch.schema().into_inner();
// Need to create an encoding for each column
let mut writer =
    AsyncArrowWriter::try_new(writable_stream.compat(), schema, options).unwrap();
Owner

Is it hard to have proper error handling from within this async transform? Can we remove this unwrap and as many others as we can?

Contributor Author

Somewhat - everything outside the while loop is surfaceable as part of the call itself (I've tied the return of the stream to successful writer creation).

Inside, that's somewhat trickier - ideally we would abort the stream, but that seems like it will take a fair amount of gymnastics.

src/wasm.rs

#[wasm_bindgen(js_name = "transformParquetStream")]
#[cfg(all(feature = "writer", feature = "async"))]
pub fn transform_parquet_stream(
Owner

Is transform the usual name for writing to a stream? E.g. why is this not write_parquet_stream?

Contributor Author

Is transform the usual name for writing to a stream? E.g. why is this not write_parquet_stream?

I've had a bit of time to think this one through, and it largely boils down to convention/platform semantics; basically the use case splits into three scenarios:

ReadableStream<RecordBatch> -> ReadableStream<ArrayBuffer>

The Streams spec favours the pipeThrough approach, which involves any of the following (a minimal sketch using built-in Web Streams APIs follows the list):

  1. inputStream.pipeThrough(new TransformStream({transform(), start(), flush()})) - best for trivial transforms that occur in JS (e.g. uppercasing text)
  2. inputStream.pipeThrough(new TransformSubClassStream(...params)) - for more involved internals that merit defining a class, or browser-provided native subclasses (e.g. CompressionStream, TextDecoderStream).
  3. inputStream.pipeThrough({writable, readable}) - any object with a writable and readable property (both of the correct type). Useful when you can't explicitly subclass from TransformStream.
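For concreteness, a minimal sketch of patterns 2 and 3 using only built-in Web Streams APIs (no parquet-wasm involvement; someByteStream and someOtherStream are placeholder inputs):

// Pattern 2: a browser-provided TransformStream subclass (operates on byte chunks).
const gzipped = someByteStream.pipeThrough(new CompressionStream('gzip'));

// Pattern 3: any object exposing matching { writable, readable } ends; an identity
// TransformStream provides the simplest such pair.
const { writable, readable } = new TransformStream();
const passedThrough = someOtherStream.pipeThrough({ writable, readable });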

It turns out that the following design for a TransformStream works flawlessly (the second internal TransformStream seems to have near-zero overhead):

flowchart TB
 subgraph B1["TransformStream"]
    direction TB
        f1["readable"]
        i1["writable"]
  end
 subgraph B2["TransformStream"]
    direction TB
        f2["readable"]
        i2["writable"]
  end
 subgraph TOP["ParquetEncoderStream"]
    direction LR
        B1
        nm["Stream&lt;RecordBatch&gt;"]
        ny[["Loop:<br>Encode Parquet"]]
        B2
  end
    i1 --> f1
    i2 --> f2
    A["Input Stream:<br>ReadableStream&lt;RecordBatch&gt;"] -- pipeThrough --> TOP
    B1 -- dyn_into --> nm
    nm --> ny
    ny --> B2
    TOP --> B["Output Stream:<br>ReadableStream&lt;ArrayBuffer&gt;"]
    style i1 fill:#FFD600,color:#000000
    style f2 fill:#FFD600,color:#000000

(yellow blocks being wasm-bindgen members of the struct, accessible from JS)

Table -> ReadableStream<ArrayBuffer>

Quite similar to a callable equivalent of Response::body - you'd typically provide Table::stream(), Table::parquetStream() for method-based apis, or toParquetStream(Table) for functional apis.

Rust: Stream<RecordBatch> -> ReadableStream<ArrayBuffer>

For downstream rust dependencies that expose something along the lines of runPipeline(targetUrl), or foo_state_machine.toParquetStream() to JS, and need a means of getting the final rust stream of record batches back out to a readable stream of bytes. Used by the JS-facing transform stream internally.

Contributor Author

So I think with all that said, we'd have 3 exports:

  • JS, ParquetEncoderStream, with usage inputStream.pipeThrough(new ParquetEncoderStream(writerProperties))
  • JS, writeParquetStream, with usage: writeParquetStream(inputTable)
  • RS, to_parquet_stream, with usage: to_parquet_stream(rs_stream_of_record_batches)

The sticking point on the last two exports is really that you can't do function overloads, and reasonably accurate distinguished function names would be too verbose (e.g. writeTableToParquetStream/writeTableParquetStream, writeStreamToParquetStream 😬 )
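Putting the two JS-facing proposals together, a minimal usage sketch (ParquetEncoderStream and writeParquetStream are the names proposed above, not yet published API; the stream and file-handle variables are placeholders):

// Proposed streaming path: record batches in, Parquet bytes out, piped straight to a file.
await recordBatchStream
  .pipeThrough(new ParquetEncoderStream(writerProperties))
  .pipeTo(await fileHandle.createWritable());

// Proposed whole-table path: encode an in-memory Table into a byte stream.
const byteStream = writeParquetStream(inputTable);
await byteStream.pipeTo(await otherFileHandle.createWritable());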

src/wasm.rs
…or cases in transform_parquet_stream. TODO: eliminate un-handled result returns (writer.write, sender.send, writer.close).
@@ -249,3 +249,28 @@ pub async fn read_parquet_stream(
});
Ok(wasm_streams::ReadableStream::from_stream(stream).into_raw())
}

#[wasm_bindgen(js_name = "transformParquetStream")]
Owner

Can you add a docstring here? Potentially include an example too? (The docstring will be included in the typescript-generated typedefs and seen by JS users)

Owner

Maybe include an example of how you can pass in a File handle? So you can write out to a Parquet file on disk without materializing the buffer in memory?
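For example, a sketch of the kind of docstring example being asked for here, combining transformParquetStream with the File System Access API (the picker call and variable names are placeholders, not part of the PR):

// Browser: stream Parquet bytes straight to a file on disk, never holding the whole buffer.
const fileHandle = await window.showSaveFilePicker({ suggestedName: 'output.parquet' });
const parquetByteStream = await wasm.transformParquetStream(recordBatchStream);
await parquetByteStream.pipeTo(await fileHandle.createWritable());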

Comment on lines +103 to +104
const stream = await wasm.transformParquetStream(originalStream);
const accumulatedBuffer = new Uint8Array(await new Response(stream).arrayBuffer());
Owner

Out of curiosity, how would you write this to a file in Node? Can you pass the stream object to a node file?

Contributor Author

It turns out to be quite ergonomic:

import { Writable } from 'node:stream';
// `handle` is a FileHandle from fs/promises' open(path, 'w')
const destinationWritable = Writable.toWeb(handle.createWriteStream());
await outputStream.pipeTo(destinationWritable);

alternatively:

await handle.writeFile(outputStream)

Deno's version of the former is pretty succinct too:

await outputStream.pipeTo(handle.writable);

(there's ~10 different ways to do it (e.g. fs.writeFile(path, inputStream) is just a shortcut for explicitly creating a stream.Writable and piping to it); these strike a reasonable balance).

Owner

@kylebarron left a comment

Thanks again! Just a couple questions and I'd love to have a docstring included, since I don't understand this usage well enough to write the docstring myself 😅

@H-Plus-Time
Contributor Author

Alright, I think that should do it as far as docstrings are concerned (I may have gone a tad overboard with that last one 🤷 ).

One thing that came up in my (sadly stalled, progress checked in under streaming-writes-refactor) efforts to get this into a first class TransformStream interface re tests:

Generally, testing stuff like 'given garbage input, function X should throw a reasonable error' is pretty onerous / not worth it, with one (very nasty) caveat I encountered: wasm-bindgen code that corrupts the JS-side heap 😱 during error-handling, silently killing all subsequent calls to the module. Doesn't typically happen in code that steers away from the sharper edges of js-sys, but certainly one to watch out for.

@kylebarron merged commit b7c6e8a into kylebarron:main Jul 29, 2024
7 checks passed
@kylebarron
Owner

Amazing, thank you!

@kylebarron
Owner

wasm-bindgen code that corrupts the JS-side heap 😱 during error-handling, silently killing all subsequent calls to the module. Doesn't typically happen in code that steers away from the sharper edges of js-sys, but certainly one to watch out for.

Yikes, that seems pretty bad
