feat: PersistedShapeStream with generic storage interface #1630
Conversation
force-pushed from dd5da2d to 9f3da50
As part of the work done for #1630 I had to introduce a `ShapeStreamInterface`, and generally make the client slightly more modular in its utilities. I've split out a number of the utilities used and added unit tests for most of them (found some issues in the process, such as `fetchWithBackoff` never terminating). I've also refactored the classes to use the [proper private fields](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Classes/Private_properties) in modern JS.

Co-authored-by: Kyle Mathews <[email protected]>
This PR introduces many changes, which makes it hard to review and merge.

I'm mainly concerned with the way the code was changed to handle synchronous and asynchronous callbacks differently. The message processor used to handle every callback as an asynchronous callback (including synchronous ones). As a result, the code was clear and concise. However, this PR handles sync callbacks differently (mainly to avoid JS pushing synchronous continuations onto the microtask queue). As a result, the code is now written in a continuation-passing style everywhere, which makes it harder to understand.

I had a brief discussion with @icehaunter, who shared the concern and would also stick with `await` for now. If performance is a concern here, we should benchmark a sync storage with the old implementation (that always awaits) and with the optimized implementation that doesn't await.
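A minimal sketch of the benchmark suggested above: push N synchronous callbacks through an always-await loop versus calling them directly. This is entirely illustrative (the function names are made up for the comparison, not the client's API), and the absolute numbers will vary by runtime.

```typescript
// Illustrative micro-benchmark: always-await vs. a sync fast path.
async function alwaysAwait(callbacks: Array<() => void>): Promise<void> {
  for (const cb of callbacks) {
    // Even a sync callback takes a trip through the microtask queue here.
    await cb()
  }
}

function syncFastPath(callbacks: Array<() => void>): void {
  for (const cb of callbacks) {
    cb() // no microtask hop for synchronous callbacks
  }
}

async function main(): Promise<void> {
  const callbacks = Array.from({ length: 100_000 }, () => () => {})

  let start = Date.now()
  syncFastPath(callbacks)
  console.log(`sync fast path: ${Date.now() - start}ms`)

  start = Date.now()
  await alwaysAwait(callbacks)
  console.log(`always-await:   ${Date.now() - start}ms`)
}

main()
```

Whether the microtask overhead matters in practice depends on batch sizes; that is exactly what the proposed benchmark would establish.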
```diff
@@ -0,0 +1,44 @@
import { PromiseOr } from './types'
```
Can we move the contents of this file to the `helpers.ts` file?
```diff
@@ -0,0 +1,44 @@
import { PromiseOr } from './types'

export function isPromise<T>(promise: PromiseOr<T>): promise is Promise<T> {
```
nit: a more accurate name would be `isThenable`, as any object with a `then` method will pass this check. If you actually only want to test for promises, all you need is `promise instanceof Promise`.
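To illustrate the naming nit: a duck-typed check passes for any "thenable", not just real `Promise` instances. `PromiseOr` mirrors the type from the PR; the rest is an illustrative sketch, not the PR's actual implementation.

```typescript
type PromiseOr<T> = T | Promise<T>

// Duck-typed check: anything with a `then` method passes.
function isThenable<T>(value: PromiseOr<T>): value is Promise<T> {
  return (
    typeof value === 'object' &&
    value !== null &&
    typeof (value as { then?: unknown }).then === 'function'
  )
}

// A plain object with a `then` method: a thenable, but not a Promise.
const thenable = { then: (resolve: (v: number) => void) => resolve(42) }

console.log(isThenable(thenable as PromiseOr<number>)) // true
console.log(thenable instanceof Promise) // false
console.log(isThenable(Promise.resolve(1))) // true
```

Note that `await` treats thenables and real promises the same way, which is why the duck-typed check (under the more accurate name) can still be the right tool when interoperating with promise-like libraries.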
```ts
)
}

export function asyncOrCall<T>(
```
I find this name slightly confusing. What about `apply`?
```ts
export class ShapeStream<T extends Row = Row>
  implements ShapeStreamInterface<T>
{
  readonly #options: ShapeStreamOptions
```
I wouldn't make the options private, especially since they are read-only, so there's no harm in making them public for introspectability.
```diff
@@ -108,3 +108,5 @@ export type TypedMessages<T extends Row = Row> = {
  messages: Array<Message<T>>
  schema: ColumnInfo
}

export type PromiseOr<T> = T | Promise<T>
```
nit: not a fan of the name. What do you think about `MaybePromise<T>`?
```ts
export function compareOffset(offsetA: Offset, offsetB: Offset): 1 | 0 | -1 {
  const [oAx, oAy] = splitOffset(offsetA)
  const [oBx, oBy] = splitOffset(offsetB)
  if (oAx > oBx) return 1
```
Sorting functions in JS can handle comparator functions that return values other than -1, 0, and 1. This is very convenient for defining such comparator functions:

```ts
const [a1, a2] = splitOffset(offsetA)
const [b1, b2] = splitOffset(offsetB)
return a1 - b1 || a2 - b2
```

(I also renamed the variables because I found them hard to read.)
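A self-contained sketch of the suggested comparator in use, assuming offsets are strings like `"10_3"` (the real `splitOffset` helper lives in the client; this version is illustrative):

```typescript
type Offset = string

// Illustrative stand-in for the client's splitOffset helper.
function splitOffset(offset: Offset): [number, number] {
  const [x, y] = offset.split('_')
  return [Number(x), Number(y)]
}

// Any negative/zero/positive number is a valid JS comparator result,
// so `a1 - b1 || a2 - b2` compares the major part first, then the minor.
function compareOffset(offsetA: Offset, offsetB: Offset): number {
  const [a1, a2] = splitOffset(offsetA)
  const [b1, b2] = splitOffset(offsetB)
  return a1 - b1 || a2 - b2
}

const sorted = ['10_3', '2_1', '10_2'].sort(compareOffset)
console.log(sorted) // ['2_1', '10_2', '10_3']
```

The `||` short-circuit falls through to the second component only when the first components are equal, which is exactly the lexicographic-pair ordering the original `if`-chain implemented.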
```ts
#processingChain: PromiseOr<void> = undefined

public process(callback: () => PromiseOr<void>): PromiseOr<void> {
  this.#processingChain = asyncOrCall(
```
Why not always process async like before, using `await` even if the callback is not async? That reduces the complexity of the code and the indirection through helper functions such as `asyncOrCall`.
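A sketch of the always-await approach being suggested: every callback, sync or async, is chained through the same promise, so there is one code path and no continuation-passing helpers. The names mirror the PR but the implementation is illustrative, not the actual client code.

```typescript
type PromiseOr<T> = T | Promise<T>

class MessageProcessor {
  #processingChain: Promise<void> = Promise.resolve()

  process(callback: () => PromiseOr<void>): Promise<void> {
    // Awaiting normalizes sync results onto the microtask queue,
    // at the cost of one extra tick for synchronous callbacks.
    this.#processingChain = this.#processingChain.then(async () => {
      await callback()
    })
    return this.#processingChain
  }
}

const processor = new MessageProcessor()
const order: string[] = []
processor.process(() => { order.push('sync') })
processor.process(async () => { order.push('async') })
processor.process(() => console.log(order.join(','))) // prints "sync,async"
```

Chaining on a single promise guarantees callbacks run in submission order, which is the ordering property the sync/async split was also trying to preserve.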
```ts
  this.#callback = callback
}

public process(messages: T): void {
```
I'd keep it as it was before, with the loop that takes the next message out of the queue and applies it asynchronously: `await this.#callback(messages)`.
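A sketch of the queue-draining loop being described: messages are pushed onto a queue, and a single async loop applies the callback to one message at a time, awaiting sync and async callbacks alike. The names approximate the client's internals; this is illustrative only.

```typescript
type PromiseOr<T> = T | Promise<T>

class MessageQueue<T> {
  #queue: T[] = []
  #draining = false
  readonly #callback: (messages: T) => PromiseOr<void>

  constructor(callback: (messages: T) => PromiseOr<void>) {
    this.#callback = callback
  }

  process(messages: T): void {
    this.#queue.push(messages)
    void this.#drain()
  }

  async #drain(): Promise<void> {
    if (this.#draining) return // a drain loop is already running
    this.#draining = true
    while (this.#queue.length > 0) {
      const next = this.#queue.shift()!
      await this.#callback(next) // sync callbacks are simply awaited too
    }
    this.#draining = false
  }
}

const seen: number[] = []
const q = new MessageQueue<number>((n) => { seen.push(n) })
q.process(1)
q.process(2)
setTimeout(() => console.log(seen.join(',')), 0) // prints "1,2"
```

The `#draining` flag ensures only one loop runs at a time, so ordering is preserved even when `process` is called while a previous callback is still awaiting.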
```ts
subscribe(
  callback: (messages: Message<T>[]) => PromiseOr<void>,
  onError?: (error: FetchError | Error) => void
): () => void {
```
This method and `#hydrateStream` warrant some comments explaining what is happening, as it is non-trivial to understand with the callbacks being passed around.
We could also filter out the extra operations to avoid the duplicates. That seems better, so that on loading we can immediately send the sync call and not have to wait for all the old data to load first.
How about we put this on the back burner for some time? Being able to persist the shape log will be a nice feature at some point, but it's not particularly urgent until we get a lot more users and some that definitely need this. Most users won't need this, and I don't think it's worth a lot of time to benchmark and fine-tune this, especially if it requires a big refactor.
Extracted out refactoring from #1630

~~Had to also add a `subscribeSync` method on `ShapeStream`, otherwise we cannot guarantee that `Shape` is up to date when the stream is up to date, since the processing is done asynchronously. We have three options:~~

~~1. We re-introduce the `PromiseOr` abstractions so that the single `subscribe` call can handle both sync and async.~~
~~2. We introduce an explicit `subscribeSync` like I have done here, which provides strong guarantees on up-to-date synchronicity.~~
~~3. We change `Shape` to maintain its own "up to date" logic, as it is not guaranteed that when the stream is up to date the `Shape` will have fully materialized its local state, since processing is async.~~

Reintroduced a thin version of sync + async handling in the queue to maintain the current API and intended behaviour.

I've discussed with @kevin-dp and we think that making the processing callbacks create backpressure on the stream, rather than the stream collecting the responses non-stop, might be a better API: it would allow developers to choose the pace at which the stream collects data through their choice of processing callback. Will open a new PR for this, so we can merge this one with the refactor.
This is pretty out-of-date with main and not a priority, so I'll close it for now.
Addresses #1519
Followed by #1519 for more refactoring and improvements; split into two PRs for reviewability.
Introduces a `PersistedShapeStream` - it follows the newly defined `ShapeStreamInterface` so it can be used interchangeably, but it requires:

- a `storage` option to be provided, implementing the `ShapeStreamStorage` interface, which is a key-value store oriented interface
- no `offset` and `shapeId` options - as the persisted shape stream cannot resume from an arbitrary offset and shapeId, it uses the persisted ones (although I suppose we could make it wipe the underlying store if an `offset` and `shapeId` are provided, to simulate the exact same behaviour)

What it does is subscribe to the underlying `ShapeStream` and compact entries into the provided store, very similar to what `Shape` does. When someone subscribes to the `PersistedShapeStream`, it looks into the store, returns the compacted entries as `insert`s, and resumes from there.

One tricky bit with a somewhat hacky solution is the persistence of the `offset` and `shapeId`, needed to actually resume the stream. We could store them in a separate `metadata` store entry, but that might require either 1) custom key prefixes on the store to separate metadata, or 2) dealing with inconsistent state.

For example, if metadata is stored separately from the operations and the store has no transactional guarantees (which the interface assumes), an operation might be stored with offset `10_3` while the metadata fails to get stored because the app shuts down - so upon resuming, the `10_3` operation is fed to the subscriber, but the stream is resumed from the previous latest offset that we managed to persist (e.g. `10_2`), duplicating the change.

The workaround is that the latest stored `offset` can be retrieved by reading the stored entries and comparing their offsets (assuming the offset is not opaque here, but we can handle that by adding a timestamp otherwise) and using the most recent one - this has the overhead of needing to read the whole store in order to resume, but it is foolproof and can also run consistency checks to decide whether it can actually resume.

The `shapeId` I've stored alongside the entries, but really only in one stored entry, such that when we iterate over the data to get the latest offset we will eventually find the `shapeId` as well, without needing to store it fully in every entry. Depending on the actual storage implementation this approach might be effective or slightly annoying. My main concerns were to 1) avoid dealing with consistency on non-transactional stores and 2) not introduce separate storage APIs that people need to conform to.

If someone wants to use a transactional storage engine, they might as well go to `PGlite` with the `sync` plugin - I believe the use case for this is simpler key-value stores like `localStorage` etc.
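The design above can be sketched as follows: a key-value oriented storage interface along the lines of the described `ShapeStreamStorage`, plus the "scan the entries for the latest offset" recovery trick. All names and shapes here are assumptions for illustration, not the PR's actual API.

```typescript
interface KeyValueStorage<V> {
  get(key: string): V | undefined
  put(key: string, value: V): void
  entries(): IterableIterator<[string, V]>
}

interface StoredEntry {
  offset: string // e.g. "10_3"
  shapeId?: string // stored on only one entry; found while scanning
  value: unknown
}

// Illustrative stand-in for the client's offset helpers.
function splitOffset(offset: string): [number, number] {
  const [x, y] = offset.split('_')
  return [Number(x), Number(y)]
}

function compareOffset(a: string, b: string): number {
  const [a1, a2] = splitOffset(a)
  const [b1, b2] = splitOffset(b)
  return a1 - b1 || a2 - b2
}

// Recover the resume point by scanning all entries: there is no separate
// metadata entry, so a partially persisted batch can never leave the stored
// offset pointing past the operations that actually made it into the store.
function findResumePoint(store: KeyValueStorage<StoredEntry>): {
  offset?: string
  shapeId?: string
} {
  let offset: string | undefined
  let shapeId: string | undefined
  for (const [, entry] of store.entries()) {
    if (entry.shapeId !== undefined) shapeId = entry.shapeId
    if (offset === undefined || compareOffset(entry.offset, offset) > 0) {
      offset = entry.offset
    }
  }
  return { offset, shapeId }
}

// Usage with a Map-backed store standing in for e.g. localStorage:
const map = new Map<string, StoredEntry>()
const store: KeyValueStorage<StoredEntry> = {
  get: (k) => map.get(k),
  put: (k, v) => { map.set(k, v) },
  entries: () => map.entries(),
}
store.put('row:a', { offset: '10_2', value: 1 })
store.put('row:b', { offset: '10_3', shapeId: 'shape-1', value: 2 })
console.log(findResumePoint(store)) // { offset: '10_3', shapeId: 'shape-1' }
```

Because the resume point is derived from the operations themselves, the duplicated-change scenario from the example (entry at `10_3` persisted, metadata lost) cannot occur: whatever made it into the store is exactly what the scan reports.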