Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return rows as an AsyncStream instead of buffering. #128

Merged
merged 6 commits into from
Aug 19, 2019

Conversation

plaflamme
Copy link
Contributor

This allows streaming large result sets instead of bufferring them.
The change is fairly invasive because the state machine has to be adapted to allow returning a PgResponse before the client is allowed to dispatch other requests on the connection.
This is handled in the dispatcher where it expects such PgResponses to provide a signal to release the connection.

NOTE: this doesn't apply the strategy to prepared statements yet. Looking for feedback before moving forward.

This allows streaming large result sets instead of bufferring them.
The change is fairly invasive because the state machine has to be adapted to allow returning a `PgResponse` before the client is allowed to dispatch other requests on the connection.
This is handled in the dispatcher where it expects such `PgResponse`s to provide a signal to release the connection.

NOTE: this doesn't apply the strategy to prepared statements yet. Looking for feedback before moving forward.
@plaflamme
Copy link
Contributor Author

@jeremyrsmith I'm not sure if this is something the project wants, but I believe it better represents Postgres' protocol since it produces rows one at a time on the wire. This is also why I took the liberty of breaking the API: it should probably represent the protocol as closely as possible. This can obviously be avoided, but it seemed more appropriate to me.

Would it be possible let me know if this has a chance of getting merged (once complete) before I continue onto the prepared statement version?

@jeremyrsmith
Copy link
Collaborator

@plaflamme In general I think it's good to support streaming results. My only concern would be the overhead of AsyncStream in the case of a small set of results (which I think is overwhelmingly common). Would it be possible to introduce the streaming functionality as a new API instead?

I'd also defer to the more active maintainers (i.e. @leonmaia) because I haven't used (or worked on, really) this library for a while.

@plaflamme
Copy link
Contributor Author

@jeremyrsmith Thanks for the feedback. I pinged you because you're listed as the current maintainer in the README.md file https://github.com/finagle/finagle-postgres/blame/master/README.md#L61

As for the overhead, what are you thinking of exactly? I can see that AsyncStream will produce more garbage, but in terms of performance that's trickier since the main cost is probably elsewhere anyway (the query itself for example). Making a separate API would be cumbersome to maintain IMO (that was my first implementation, and it basically requires duplicating things.

What could be done is a configurable buffering when building the AsyncStream. So instead of producing new AsyncStream for each row, we could do that for every n rows, where n is configurable by the client (say through a Stack param or otherwise). This could be set to 0 if someone wishes to not buffer at all and get the behaviour here back, or set to Int.Max to get the (mostly equivalent) previous behaviour. WDYT?

@leonmaia please let me know if this has any chance of getting merged, I'll continue the implementation.

@jeremyrsmith
Copy link
Collaborator

@plaflamme I meant overhead both in the performance sense and also in the API sense. The API is relatively straightforward with Seq, but when it goes to AsyncStream then there's (at least) an extra layer of Future between you and the rows, which is a bit disruptive.

I agree with the motivation, though, and FWIW I think it's the right way to go. It's going to be a bit more complicated to do it right, since there are some new design decisions to make:

  • Should it be only push-based, only pull-based, or support both? For pull, you'll want to revamp the protocol we use (i.e. use named portals in the backend).
  • Should we also support LISTEN/NOTIFY with AsyncStream? See previous attempt adds listen/notify support #69
  • What about moving data into postgres?

Etc.

@plaflamme
Copy link
Contributor Author

@jeremyrsmith I agree that the stream is cumbersome in the simple case. I added this flavour to the high-level API: https://github.com/finagle/finagle-postgres/pull/128/files#diff-107a2fd6c9e304147a40852aea34c910R46 It's not much, but it's not nothing :) Other such helper methods could be provided as well for other common access patterns.

  • push vs. pull: I believe I'll hit this case when I go down the prepared statement case, is that correct? When you say pull you mean that pulling rows out of the backend is driven by the client consuming them? If so, I believe this will happen naturally by the nature of AsyncStream's tail being lazy, but yeah, that'd require writing back to the connection from a transition in the state machine which is not happening anywhere at the moment.

  • For LISTEN/NOTIFY, I'm not familiar with this api, I'd have to take a closer look, but looking at the previous PR, I think we can implement LISTEN/NOTIFY with a combination of Broker and AsyncStream. Each connection would have a Broker where it sends NOTIFY to and each LISTEN would be an AsyncStream view of those. Not sure how the channel fits in here, but at a high level that seems to work. This is definitely a completely separate concern from this PR though.

  • Moving data into the backend: totally agree, we could also use AsyncStream here (assuming the Postgresql protocol supports it, which it probably does). Again, can be done as a separate PR.

@plaflamme
Copy link
Contributor Author

@leonmaia does this have any chance of getting merged? I'd rather close it otherwise.

@plaflamme
Copy link
Contributor Author

@leonmaia Looks like you might have taken a look from the emoji on the previous comment. Any updates on this?

This also applies the streaming of rows to the extended query mode.
@plaflamme
Copy link
Contributor Author

@jeremyrsmith I've added the remaining parts to also stream rows in the "extended query" mode. I've also undone the API breakage. Please take a look.

(f: Row => T): Future[Seq[T]]
def select[T](sql: String)(f: Row => T): Future[Seq[T]] =
selectToStream(sql)(f).flatMap(_.toSeq)
def selectToStream[T](sql: String)(f: Row => T): Future[AsyncStream[T]]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Can this be called selectStream?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure thing... Though should we also rename prepareAndQueryToStream to prepareAndQueryStream? I find that somewhat questionable, but I don't really have a strong opinion.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I agree with you about that... I'm not totally sure TBH. I don't want to bikeshed the API too much, as long as it works people probably don't care all that much 😀

Let me give it some thought

@jeremyrsmith
Copy link
Collaborator

Overall I think this looks quite nice. It does change some internal machinery a bit (most likely for the better 😄) and so there's some risk of breakage in real-world scenarios. I'd be for releasing this as a new minor version.

It would be good to hear from @leonmaia (... ping 😄) since I don't use finagle or postgres these days and thus I don't really have a dog in the race.

@@ -26,8 +27,9 @@ class QuerySpec extends FreeSpec with Matchers with MockFactory {

val client = new PostgresClient {

def prepareAndQuery[T](sql: String, params: Param[_]*)(f: (Row) => T): Future[Seq[T]] =
def prepareAndQueryToStream[T](sql: String, params: Param[_]*)(f: (Row) => T): Future[AsyncStream[T]] =
Copy link

@YarekTyshchenko YarekTyshchenko Aug 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, AsyncStream already incorporates a Future. Its possible to construct an AsyncStream[T] directly from Future[Seq[T]] without wrapping it in the extra future

Copy link
Collaborator

@dangerousben dangerousben left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 from me. I've tested this on an internal project and not seen any regressions. I think it would make sense to flatten Future[AsyncStream[T]] to just AsyncStream[T]. Also would be good to add a streaming method to Query but that could be done later.

CHANGELOG.md Outdated
## <Next release>

* Select results are now exposed as `AsyncStream[DataRow]` and result sets as `AsyncStream[Row]`
* incompatible change: `PostgresClient.select` now returns `Future[AsyncStream[T]]` instead of `Future[Seq[T]]`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now out of date.

@plaflamme
Copy link
Contributor Author

@YarekTyshchenko Thanks for the feedback, I've added a commit to flatten out the Future please take a look at 33d3cd7

@dangerousben Thanks for the feedback and testing this! I've added commits to address your comments, pleas take a look at 7035ea7 and 4c5703b

@dangerousben
Copy link
Collaborator

All looks good. @leonmaia I'll merge this unless I hear an objection.

@dangerousben dangerousben merged commit 2938624 into finagle:master Aug 19, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants