Skip to content

Commit

Permalink
Return rows as an AsyncStream instead of buffering. (#128)
Browse files Browse the repository at this point in the history
* Return rows as an `AsyncStream` instead of buffering.

This allows streaming large result sets instead of buffering them.
The change is fairly invasive because the state machine has to be adapted to allow returning a `PgResponse` before the client is allowed to dispatch other requests on the connection.
This is handled in the dispatcher where it expects such `PgResponse`s to provide a signal to release the connection.
  • Loading branch information
plaflamme authored and dangerousben committed Aug 19, 2019
1 parent 4024240 commit 2938624
Show file tree
Hide file tree
Showing 17 changed files with 238 additions and 123 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](http://semver.org/). Note that Semantic Versioning is not
necessarily followed during pre-1.0 development.

## <Next release>

* Select results can now be streamed as `AsyncStream[DataRow]` and result sets as `AsyncStream[Row]`

## 0.8.2
* Fix SSL session verification.
* Fix #75 - Name resolution failed
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
package com.twitter.finagle.postgres.generic

import scala.collection.immutable.Queue
import com.twitter.concurrent.AsyncStream

import scala.collection.immutable.Queue
import com.twitter.finagle.postgres.{Param, PostgresClient, Row}
import com.twitter.util.Future

import scala.language.existentials

case class Query[T](parts: Seq[String], queryParams: Seq[QueryParam], cont: Row => T) {
def run(client: PostgresClient): Future[Seq[T]] = {

def stream(client: PostgresClient): AsyncStream[T] = {
val (queryString, params) = impl
client.prepareAndQuery[T](queryString, params: _*)(cont)
client.prepareAndQueryToStream[T](queryString, params: _*)(cont)
}

def run(client: PostgresClient): Future[Seq[T]] =
stream(client).toSeq

def exec(client: PostgresClient): Future[Int] = {
val (queryString, params) = impl
client.prepareAndExecute(queryString, params: _*)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.twitter.finagle.postgres.generic

import java.nio.charset.Charset

import com.twitter.concurrent.AsyncStream
import com.twitter.finagle.Status
import com.twitter.finagle.postgres.messages.SelectResult
import com.twitter.finagle.postgres._
Expand All @@ -18,27 +19,27 @@ class QuerySpec extends FreeSpec with Matchers with MockFactory {
val row = mock[Row]

trait MockClient {
def prepareAndQuery[T](sql: String, params: List[Param[_]], f: Row => T): Future[Seq[T]]
def prepareAndExecute(sql: String, params: List[Param[_]]): Future[Int]
def prepareAndQuery[T](sql: String, params: List[Param[_]], f: Row => T): Seq[T]
def prepareAndExecute(sql: String, params: List[Param[_]]): Int
}

val mockClient = mock[MockClient]

val client = new PostgresClient {

def prepareAndQuery[T](sql: String, params: Param[_]*)(f: (Row) => T): Future[Seq[T]] =
mockClient.prepareAndQuery(sql, params.toList, f)
def prepareAndQueryToStream[T](sql: String, params: Param[_]*)(f: (Row) => T): AsyncStream[T] =
AsyncStream.fromSeq(mockClient.prepareAndQuery(sql, params.toList, f))

def prepareAndExecute(sql: String, params: Param[_]*): Future[Int] =
mockClient.prepareAndExecute(sql, params.toList)
Future.value(mockClient.prepareAndExecute(sql, params.toList))

def fetch(sql: String): Future[SelectResult] = ???

def execute(sql: String): Future[OK] = ???

def charset: Charset = ???

def select[T](sql: String)(f: (Row) => T): Future[Seq[T]] = ???
def selectToStream[T](sql: String)(f: (Row) => T): AsyncStream[T] = ???

def close(): Future[Unit] = ???

Expand All @@ -56,7 +57,7 @@ class QuerySpec extends FreeSpec with Matchers with MockFactory {

def expectQuery[U](expectedQuery: String, expectedParams: Param[_]*)(query: Query[U]) = {
mockClient.prepareAndQuery[U] _ expects (expectedQuery, expectedParams.toList, *) onCall {
(q, p, fn) => Future.value(Seq(fn(row)))
(q, p, fn) => Seq(fn(row))
}
Await.result(query.run(client)).head
}
Expand Down
8 changes: 8 additions & 0 deletions src/main/scala/com/twitter/finagle/Postgres.scala
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,14 @@ object Postgres {
private class Dispatcher(transport: Transport[PgRequest, PgResponse], statsReceiver: StatsReceiver)
extends SerialClientDispatcher[PgRequest, PgResponse](transport, statsReceiver) {

/**
 * Dispatches `req` through the parent dispatcher and then keeps the returned
 * future pending until the response promise `p` has been satisfied.
 *
 * When the response is an `AsyncPgResponse`, the returned future additionally
 * waits on that response's `complete` signal; for any other response it
 * finishes immediately with `Future.Done`.
 *
 * NOTE(review): the returned future is presumably what the serial dispatcher
 * uses to decide when the next request may be sent on this connection -
 * confirm against `SerialClientDispatcher`.
 *
 * @param req the request to write to the transport
 * @param p   the promise that receives the response for `req`
 * @return a future that completes once the request has been fully processed
 */
override def dispatch(req: PgRequest, p: Promise[PgResponse]): Future[Unit] =
// `before` only evaluates its argument after the parent dispatch has succeeded.
super.dispatch(req, p) before p.flatMap {
case s: AsyncPgResponse =>
// Only release the connection when the state machine has finished processing the events for this request
s.complete
// Any other response carries no completion signal of its own, so there is nothing further to wait for.
case _ => Future.Done
}

override def apply(
req: PgRequest
): Future[PgResponse] = req match {
Expand Down
11 changes: 7 additions & 4 deletions src/main/scala/com/twitter/finagle/postgres/PostgresClient.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.twitter.finagle.postgres
import java.nio.charset.Charset

import com.twitter.concurrent.AsyncStream
import com.twitter.finagle.Status
import com.twitter.finagle.postgres.messages.SelectResult
import com.twitter.finagle.postgres.values.Types
Expand Down Expand Up @@ -41,14 +42,16 @@ trait PostgresClient {
/*
* Run a single SELECT query and wrap the results with the provided function.
*/
def select[T](sql: String)
(f: Row => T): Future[Seq[T]]
def select[T](sql: String)(f: Row => T): Future[Seq[T]] =
selectToStream(sql)(f).toSeq
def selectToStream[T](sql: String)(f: Row => T): AsyncStream[T]

/*
* Issue a single, prepared SELECT query and wrap the response rows with the provided function.
*/
def prepareAndQuery[T](sql: String, params: Param[_]*)
(f: Row => T): Future[Seq[T]]
def prepareAndQuery[T](sql: String, params: Param[_]*)(f: Row => T): Future[Seq[T]] =
prepareAndQueryToStream(sql, params: _*)(f).toSeq
def prepareAndQueryToStream[T](sql: String, params: Param[_]*)(f: Row => T): AsyncStream[T]

/*
* Issue a single, prepared arbitrary query without an expected result set, and provide the affected row count
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.nio.charset.{Charset, StandardCharsets}
import java.util.concurrent.atomic.AtomicInteger

import com.twitter.cache.Refresh
import com.twitter.concurrent.AsyncStream
import com.twitter.conversions.DurationOps._
import com.twitter.finagle.postgres.messages._
import com.twitter.finagle.postgres.values._
Expand Down Expand Up @@ -51,24 +52,24 @@ class PostgresClientImpl(

val serviceF = factory.apply

val bootstrapTypes = Map(
Types.INT_4 -> ValueDecoder.int4,
Types.TEXT -> ValueDecoder.string
)
/**
 * Builds the custom type map from the response to the custom-types query.
 *
 * Every row of the result is read for its `oid`, `typreceive`, `type` and
 * `typelem` columns, decoded with `PostgresClient.defaultTypes`. The whole
 * row stream is consumed before the map is produced.
 *
 * @param response the response received for the custom-types query
 * @return a future of the map from type oid to its `TypeSpecifier`; a failed
 *         future carrying an `IllegalStateException` when `response` is not a
 *         `SelectResult`
 */
def extractTypes(response: PgResponse): Future[Map[Int, PostgresClient.TypeSpecifier]] =
  response match {
    case SelectResult(fields, rows) =>
      val rowValues = ResultSet(fields, charset, rows, PostgresClient.defaultTypes, receiveFunctions).rows
      rowValues
        .map { row =>
          row.get[Int]("oid") -> PostgresClient.TypeSpecifier(
            row.get[String]("typreceive"),
            row.get[String]("type"),
            row.get[Int]("typelem"))
        }
        .toSeq()
        .map(_.toMap)
    case other =>
      // Report an unexpected response as a descriptive failed future instead of
      // letting a bare MatchError escape from a Future-returning method.
      Future.exception(
        new IllegalStateException(
          s"Expected a SelectResult for the custom types query, but received: $other"))
  }

val customTypesResult = for {
service <- serviceF
response <- service.apply(PgRequest(Query(customTypesQuery)))
} yield response match {
case SelectResult(fields, rows) =>
val rowValues = ResultSet(fields, charset, rows, PostgresClient.defaultTypes, receiveFunctions).rows
rowValues.map {
row => row.get[Int]("oid") -> PostgresClient.TypeSpecifier(
row.get[String]("typreceive"),
row.get[String]("type"),
row.get[Int]("typelem"))
}.toMap
}
types <- extractTypes(response)
} yield types

customTypesResult.ensure {
serviceF.foreach(_.close())
Expand Down Expand Up @@ -141,25 +142,27 @@ class PostgresClientImpl(
/*
* Run a single SELECT query and wrap the results with the provided function.
*/
override def select[T](sql: String)(f: Row => T): Future[Seq[T]] = for {
types <- typeMap()
result <- fetch(sql)
} yield result match {
case SelectResult(fields, rows) => ResultSet(fields, charset, rows, types, receiveFunctions).rows.map(f)
}
/*
 * Run a single SELECT query and stream the rows, each one wrapped with the provided function.
 *
 * The type map is resolved first, then the query is fetched; the resulting rows are decoded
 * with that type map and passed through `f` as they are consumed from the stream.
 */
override def selectToStream[T](sql: String)(f: Row => T): AsyncStream[T] = {
  val mappedRows: Future[AsyncStream[T]] =
    for {
      types <- typeMap()
      SelectResult(fields, rows) <- fetch(sql)
    } yield {
      val resultSet = ResultSet(fields, charset, rows, types, receiveFunctions)
      resultSet.rows.map(f)
    }

  // Lift the future-of-stream into a stream-of-stream, then collapse it.
  AsyncStream.fromFuture(mappedRows).flatten
}

/*
* Issue a single, prepared SELECT query and wrap the response rows with the provided function.
*/
override def prepareAndQuery[T](sql: String, params: Param[_]*)(f: Row => T): Future[Seq[T]] = {
typeMap().flatMap { _ =>
for {
service <- factory()
statement = new PreparedStatementImpl("", sql, service)
result <- statement.select(params: _*)(f)
} yield result
}
}
/*
 * Issue a single, prepared SELECT query and stream the response rows, each one wrapped with
 * the provided function.
 *
 * The type map is awaited (its value is not used here) before a service is obtained from the
 * factory and an unnamed prepared statement is run against it.
 */
override def prepareAndQueryToStream[T](sql: String, params: Param[_]*)(f: Row => T): AsyncStream[T] = {
  val streamF: Future[AsyncStream[T]] =
    typeMap().flatMap { _ =>
      factory().flatMap { service =>
        val statement = new PreparedStatementImpl("", sql, service)
        statement.selectToStream(params: _*)(f)
      }
    }

  // Lift the future-of-stream into a stream-of-stream, then collapse it.
  AsyncStream.fromFuture(streamF).flatten
}

/*
* Issue a single, prepared arbitrary query without an expected result set, and provide the affected row count
Expand Down Expand Up @@ -292,7 +295,8 @@ class PostgresClientImpl(
exec <- execute()
} yield exec match {
case CommandCompleteResponse(rows) => OK(rows)
case Rows(rows, true) => ResultSet(fields, charset, rows, types, receiveFunctions)
case Rows(rows) =>
ResultSet(fields, charset, rows, types, receiveFunctions)
}
f.transform {
result =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.twitter.finagle.postgres

import com.twitter.concurrent.AsyncStream
import com.twitter.finagle.postgres.codec.Errors
import com.twitter.util.Future

Expand All @@ -14,10 +15,12 @@ trait PreparedStatement {
case ResultSet(_) => Future.exception(Errors.client("Update query expected"))
}

def select[T](params: Param[_]*)(f: Row => T): Future[Seq[T]] = fire(params: _*) map {
def selectToStream[T](params: Param[_]*)(f: Row => T): Future[AsyncStream[T]] = fire(params: _*) map {
case ResultSet(rows) => rows.map(f)
case OK(_) => Seq.empty[Row].map(f)
case OK(_) => AsyncStream.empty
}
/*
 * Run the statement with the given parameters and collect every row, wrapped with `f`,
 * into a fully materialized sequence.
 */
def select[T](params: Param[_]*)(f: Row => T): Future[Seq[T]] =
  for {
    stream <- selectToStream(params: _*)(f)
    rows <- stream.toSeq()
  } yield rows

/*
 * Run the statement with the given parameters and return the first row, wrapped with `f`,
 * or None when the result is empty.
 */
def selectFirst[T](params: Param[_]*)(f: Row => T): Future[Option[T]] =
  select[T](params: _*)(f).map(_.headOption)
Expand Down
8 changes: 3 additions & 5 deletions src/main/scala/com/twitter/finagle/postgres/Responses.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ package com.twitter.finagle.postgres

import java.nio.charset.Charset

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

import com.twitter.finagle.postgres.messages.{DataRow, Field}
import com.twitter.finagle.postgres.values.ValueDecoder
import com.twitter.util.Try
import Try._
import com.twitter.concurrent.AsyncStream
import com.twitter.finagle.postgres.PostgresClient.TypeSpecifier
import com.twitter.finagle.postgres.codec.NullValue
import io.netty.buffer.ByteBuf
Expand Down Expand Up @@ -157,7 +155,7 @@ sealed trait QueryResponse

case class OK(affectedRows: Int) extends QueryResponse

case class ResultSet(rows: List[Row]) extends QueryResponse
case class ResultSet(rows: AsyncStream[Row]) extends QueryResponse

/*
* Helper object to generate ResultSets for responses with custom types.
Expand All @@ -166,7 +164,7 @@ object ResultSet {
def apply(
fields: Array[Field],
charset: Charset,
dataRows: List[DataRow],
dataRows: AsyncStream[DataRow],
types: Map[Int, TypeSpecifier],
receives: PartialFunction[String, ValueDecoder[T] forSome { type T }]
): ResultSet = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.twitter.finagle.postgres.codec

import com.twitter.finagle.postgres.messages.PgRequest
import com.twitter.finagle.postgres.messages.Error

/*
* An error generated by Postgres.
Expand Down Expand Up @@ -40,4 +41,9 @@ object Errors {
hint: Option[String] = None,
position: Option[String] = None) =
ServerError(message, request, severity, sqlState, detail, hint, position)
}

/*
 * Build a ServerError from an Error message received from the backend.
 * A missing message is reported as "unknown failure".
 */
def server(error: Error, request: Option[PgRequest]): ServerError =
  server(
    error.msg.getOrElse("unknown failure"),
    request,
    error.severity,
    error.sqlState,
    error.detail,
    error.hint,
    error.position)
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ class HandleErrorsProxy(

def apply(request: PgRequest, service: Service[PgRequest, PgResponse]) = {
service.apply(request).flatMap {
case Error(msg, severity, sqlState, detail, hint, position) =>
Future.exception(Errors.server(msg.getOrElse("unknown failure"), Some(request), severity, sqlState, detail, hint, position))
case e: Error =>
Future.exception(Errors.server(e, Some(request)))
case Terminated =>
Future.exception(new ChannelClosedException())
case r => Future.value(r)
Expand Down
Loading

0 comments on commit 2938624

Please sign in to comment.