Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Postgres] Add support for LISTEN/NOTIFY #131

Merged
merged 51 commits into from
Mar 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
c7d416a
Add Connection::begin
mehcode Mar 11, 2020
7a98253
postgres: clean up protocol
mehcode Mar 11, 2020
27cd552
mysql: clean up protocol
mehcode Mar 11, 2020
cca0963
Run rustfmt
mehcode Mar 11, 2020
553f03f
Add Database::RawBuffer to parameterize Encode
mehcode Mar 11, 2020
5d042e3
sqlite: stub
mehcode Mar 11, 2020
7ab0701
sqlite: initial work in connection
mehcode Mar 11, 2020
1a48cf3
Reduce some duplication in type parsing
mehcode Mar 13, 2020
0421c9f
Add explicit life times to Cursor
mehcode Mar 13, 2020
30897dd
Add additional notes to Executor
mehcode Mar 13, 2020
444ffff
Run rustfmt
mehcode Mar 13, 2020
751efdf
generalize MaybeOwnedConnection and clean up the connection file
mehcode Mar 13, 2020
a3799c3
sqlite: implement command execution
mehcode Mar 13, 2020
5f27026
sqlite: implement remainder of query API
mehcode Mar 14, 2020
0130fe1
sqlite: implement describe
mehcode Mar 14, 2020
63ef321
sqlite: implement support for multiple statements
mehcode Mar 15, 2020
69ea41d
sqlite: make the implementation far less naive
mehcode Mar 15, 2020
c661fdd
sqlite: macros: initial support for the query macros
mehcode Mar 15, 2020
cddaf1b
remove unused import for spawn_blocking
mehcode Mar 15, 2020
dd99fc3
suppress unused warnings
mehcode Mar 15, 2020
68853ad
Add SQLite to CI
mehcode Mar 15, 2020
21097e1
sqlite: clarlify Sync guarantee
mehcode Mar 15, 2020
2abc451
sqlite: re-order Executor methods
mehcode Mar 15, 2020
97b50b9
sqlite: ensure that we additionally clear the bindings when resetting…
mehcode Mar 15, 2020
69b1d7f
sqlite: add even more notes about Send/Sync
mehcode Mar 15, 2020
426361f
sqlite: make SqliteConnection and Statement !Sync
mehcode Mar 15, 2020
f667910
sqlite: SqliteResultValue -> SqliteValue to match other drivers
mehcode Mar 15, 2020
68d4a0d
sqlite: produce connection specific errors
mehcode Mar 15, 2020
ab20db1
Remove Sqlite prefix from SqliteStatement as it is an internal API
mehcode Mar 15, 2020
a52f364
Implement Postgres LISTEN interface.
thedodd Jan 29, 2020
cb186e6
Updates from review and from testing.
thedodd Jan 30, 2020
f831808
Impl a few features & refactor some code based on design discussion.
thedodd Jan 30, 2020
a0da99e
A good bit of refactoring.
thedodd Jan 31, 2020
ae7e15c
Added demo program to show use of LISTEN/NOTIFY.
thedodd Jan 31, 2020
75a7639
Rename new example to match current nomenclature.
thedodd Feb 1, 2020
608556f
Impl Drop for PgPoolListener & add to exports.
thedodd Feb 10, 2020
82923a1
Update stream impls.
thedodd Feb 20, 2020
3db54dd
Remove the Drop impl for PgPoolListener.
thedodd Mar 2, 2020
4419aea
update postgres/listen.rs for internal changes
mehcode Mar 15, 2020
12e250b
Inline MaybeOwned in ConnectionSource and add another variant to stor…
mehcode Mar 15, 2020
dc8e36c
listen: prefix example with sqlx-example-
mehcode Mar 17, 2020
5cb0d9d
connection: remove unused import for MaybeOwned
mehcode Mar 17, 2020
1d0100b
Add DatabaseError::code
mehcode Mar 17, 2020
0ecacfa
io: forward lifetime properly in Buf trait
mehcode Mar 17, 2020
b80080a
postgres: Stream::read -> Stream::receive and extract "just reading" …
mehcode Mar 17, 2020
f677748
postgres: Add lifetime to NotificationResponse
mehcode Mar 17, 2020
ed9d6c3
pool: handle reconnects during "boot"
mehcode Mar 17, 2020
e99e80c
listen: merge PgListener and PgPoolListener; allow PgListener to be u…
mehcode Mar 17, 2020
57d414f
sqlite: fix impl of DatabaseError
mehcode Mar 17, 2020
0e3eb7c
postgres: listen: go back to Vec<String> for channels
mehcode Mar 17, 2020
c44084d
move around re-exports
mehcode Mar 17, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ jobs:
# unit test: tokio
- run: cargo test --manifest-path sqlx-core/Cargo.toml --no-default-features --features 'chrono uuid postgres mysql tls runtime-tokio'

# integration test: sqlite + async-std
- run: cargo test --no-default-features --features 'runtime-async-std sqlite macros uuid chrono tls'
env:
DATABASE_URL: "sqlite::memory:"

# integration test: sqlite + tokio
- run: cargo test --no-default-features --features 'runtime-tokio sqlite macros uuid chrono tls'
env:
DATABASE_URL: "sqlite::memory:"

# Rust ------------------------------------------------

- name: Prepare build directory for cache
Expand Down
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ members = [
"sqlx-core",
"sqlx-macros",
"sqlx-test",
"examples/listen-postgres",
"examples/realworld-postgres",
"examples/todos-postgres",
]
Expand Down Expand Up @@ -42,6 +43,7 @@ runtime-tokio = [ "sqlx-core/runtime-tokio", "sqlx-macros/runtime-tokio" ]
# database
postgres = [ "sqlx-core/postgres", "sqlx-macros/postgres" ]
mysql = [ "sqlx-core/mysql", "sqlx-macros/mysql" ]
sqlite = [ "sqlx-core/sqlite", "sqlx-macros/sqlite" ]

# types
chrono = [ "sqlx-core/chrono", "sqlx-macros/chrono" ]
Expand Down Expand Up @@ -70,6 +72,18 @@ required-features = [ "postgres", "macros" ]
name = "mysql-macros"
required-features = [ "mysql", "macros" ]

[[test]]
name = "sqlite"
required-features = [ "sqlite" ]

[[test]]
name = "sqlite-raw"
required-features = [ "sqlite" ]

[[test]]
name = "sqlite-types"
required-features = [ "sqlite" ]

[[test]]
name = "mysql"
required-features = [ "mysql" ]
Expand Down
10 changes: 10 additions & 0 deletions examples/listen-postgres/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "sqlx-example-listen-postgres"
version = "0.1.0"
edition = "2018"
workspace = "../.."

[dependencies]
async-std = { version = "1.4.0", features = [ "attributes", "unstable" ] }
sqlx = { path = "../..", features = [ "postgres", "tls" ] }
futures = "0.3.1"
18 changes: 18 additions & 0 deletions examples/listen-postgres/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Postgres LISTEN/NOTIFY
======================

## Usage

Declare the database URL. This example does not include any reading or writing of data.

```
export DATABASE_URL="postgres://postgres@localhost/postgres"
```

Run.

```
cargo run
```

The example program should connect to the database, and create a LISTEN loop on a predefined set of channels. A NOTIFY task will be spawned which will connect to the same database and will emit notifications on a 5 second interval.
68 changes: 68 additions & 0 deletions examples/listen-postgres/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use async_std::stream;
use futures::StreamExt;
use futures::TryStreamExt;
use sqlx::postgres::PgListener;
use sqlx::{Executor, PgPool};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Building PG pool.");
let conn_str =
std::env::var("DATABASE_URL").expect("Env var DATABASE_URL is required for this example.");
let pool = sqlx::PgPool::new(&conn_str).await?;

let mut listener = PgListener::new(&conn_str).await?;

// let notify_pool = pool.clone();
let _t = async_std::task::spawn(async move {
stream::interval(Duration::from_secs(2))
.for_each(|_| notify(&pool))
.await
});

println!("Starting LISTEN loop.");

listener.listen_all(&["chan0", "chan1", "chan2"]).await?;

let mut counter = 0usize;
loop {
let notification = listener.recv().await?;
println!("[from recv]: {:?}", notification);

counter += 1;
if counter >= 3 {
break;
}
}

// Prove that we are buffering messages by waiting for 6 seconds
listener.execute("SELECT pg_sleep(6)").await?;

let mut stream = listener.into_stream();
while let Some(notification) = stream.try_next().await? {
println!("[from stream]: {:?}", notification);
}

Ok(())
}

async fn notify(mut pool: &PgPool) {
static COUNTER: AtomicUsize = AtomicUsize::new(0);

let res = pool
.execute(&*format!(
r#"
NOTIFY "chan0", '{{"payload": {}}}';
NOTIFY "chan1", '{{"payload": {}}}';
NOTIFY "chan2", '{{"payload": {}}}';
"#,
COUNTER.fetch_add(1, Ordering::SeqCst),
COUNTER.fetch_add(1, Ordering::SeqCst),
COUNTER.fetch_add(1, Ordering::SeqCst)
))
.await;

println!("[from notify]: {:?}", res);
}
12 changes: 10 additions & 2 deletions sqlx-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ default = [ "runtime-async-std" ]
unstable = []
postgres = [ "md-5", "sha2", "base64", "sha-1", "rand", "hmac" ]
mysql = [ "sha-1", "sha2", "generic-array", "num-bigint", "base64", "digest", "rand" ]
sqlite = [ "libsqlite3-sys" ]
tls = [ "async-native-tls" ]
runtime-async-std = [ "async-native-tls/runtime-async-std", "async-std" ]
runtime-tokio = [ "async-native-tls/runtime-tokio", "tokio" ]

[dependencies]
async-native-tls = { version = "0.3.2", default-features = false, optional = true }
async-std = { version = "1.5.0", optional = true }
async-std = { version = "1.5.0", features = [ "unstable" ], optional = true }
async-stream = { version = "0.2.1", default-features = false }
base64 = { version = "0.11.0", default-features = false, optional = true, features = [ "std" ] }
bitflags = { version = "1.2.1", default-features = false }
Expand All @@ -50,5 +51,12 @@ tokio = { version = "0.2.13", default-features = false, features = [ "dns", "fs"
url = { version = "2.1.1", default-features = false }
uuid = { version = "0.8.1", default-features = false, optional = true, features = [ "std" ] }

# <https://github.com/jgallagher/rusqlite/tree/master/libsqlite3-sys>
[dependencies.libsqlite3-sys]
version = "0.17.1"
optional = true
default-features = false
features = [ "pkg-config", "vcpkg", "bundled" ]

[dev-dependencies]
matches = "0.1.8"
matches = "0.1.8"
75 changes: 42 additions & 33 deletions sqlx-core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use futures_core::future::BoxFuture;

use crate::executor::Executor;
use crate::pool::{Pool, PoolConnection};
use crate::transaction::Transaction;
use crate::url::Url;

/// Represents a single database connection rather than a pool of database connections.
Expand All @@ -15,6 +16,16 @@ where
Self: Send + 'static,
Self: Executor,
{
/// Starts a transaction.
///
/// Returns [`Transaction`](struct.Transaction.html).
fn begin(self) -> BoxFuture<'static, crate::Result<Transaction<Self>>>
where
Self: Sized,
{
Box::pin(Transaction::new(0, self))
}

/// Close this database connection.
fn close(self) -> BoxFuture<'static, crate::Result<()>>;

Expand All @@ -31,63 +42,61 @@ pub trait Connect: Connection {
Self: Sized;
}

mod internal {
#[allow(dead_code)]
pub enum MaybeOwnedConnection<'c, C>
where
C: super::Connect,
{
Borrowed(&'c mut C),
Owned(super::PoolConnection<C>),
}

#[allow(dead_code)]
pub enum ConnectionSource<'c, C>
where
C: super::Connect,
{
Connection(MaybeOwnedConnection<'c, C>),
Pool(super::Pool<C>),
}
#[allow(dead_code)]
pub(crate) enum ConnectionSource<'c, C>
where
C: Connect,
{
ConnectionRef(&'c mut C),
Connection(C),
PoolConnection(Pool<C>, PoolConnection<C>),
Pool(Pool<C>),
}

pub(crate) use self::internal::{ConnectionSource, MaybeOwnedConnection};

impl<'c, C> ConnectionSource<'c, C>
where
C: Connect,
{
#[allow(dead_code)]
pub(crate) async fn resolve_by_ref(&mut self) -> crate::Result<&'_ mut C> {
pub(crate) async fn resolve(&mut self) -> crate::Result<&'_ mut C> {
if let ConnectionSource::Pool(pool) = self {
*self =
ConnectionSource::Connection(MaybeOwnedConnection::Owned(pool.acquire().await?));
let conn = pool.acquire().await?;

*self = ConnectionSource::PoolConnection(pool.clone(), conn);
}

Ok(match self {
ConnectionSource::Connection(conn) => match conn {
MaybeOwnedConnection::Borrowed(conn) => &mut *conn,
MaybeOwnedConnection::Owned(ref mut conn) => conn,
},
ConnectionSource::ConnectionRef(conn) => conn,
ConnectionSource::PoolConnection(_, ref mut conn) => conn,
ConnectionSource::Connection(ref mut conn) => conn,
ConnectionSource::Pool(_) => unreachable!(),
})
}
}

impl<'c, C> From<&'c mut C> for MaybeOwnedConnection<'c, C>
impl<'c, C> From<C> for ConnectionSource<'c, C>
where
C: Connect,
{
fn from(connection: C) -> Self {
ConnectionSource::Connection(connection)
}
}

impl<'c, C> From<PoolConnection<C>> for ConnectionSource<'c, C>
where
C: Connect,
{
fn from(conn: &'c mut C) -> Self {
MaybeOwnedConnection::Borrowed(conn)
fn from(connection: PoolConnection<C>) -> Self {
ConnectionSource::PoolConnection(Pool(connection.pool.clone()), connection)
}
}

impl<'c, C> From<PoolConnection<C>> for MaybeOwnedConnection<'c, C>
impl<'c, C> From<Pool<C>> for ConnectionSource<'c, C>
where
C: Connect,
{
fn from(conn: PoolConnection<C>) -> Self {
MaybeOwnedConnection::Owned(conn)
fn from(pool: Pool<C>) -> Self {
ConnectionSource::Pool(pool)
}
}
14 changes: 7 additions & 7 deletions sqlx-core/src/cursor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use futures_core::future::BoxFuture;

use crate::connection::{Connect, MaybeOwnedConnection};
use crate::database::{Database, HasRow};
use crate::executor::Execute;
use crate::pool::Pool;
Expand All @@ -19,20 +18,21 @@ where
{
type Database: Database;

#[doc(hidden)]
fn from_pool<E>(pool: &Pool<<Self::Database as Database>::Connection>, query: E) -> Self
where
Self: Sized,
E: Execute<'q, Self::Database>;

#[doc(hidden)]
fn from_connection<E, C>(conn: C, query: E) -> Self
fn from_connection<E>(
connection: &'c mut <Self::Database as Database>::Connection,
query: E,
) -> Self
where
Self: Sized,
<Self::Database as Database>::Connection: Connect,
C: Into<MaybeOwnedConnection<'c, <Self::Database as Database>::Connection>>,
E: Execute<'q, Self::Database>;

/// Fetch the next row in the result. Returns `None` if there are no more rows.
fn next(&mut self) -> BoxFuture<crate::Result<Option<<Self::Database as HasRow>::Row>>>;
fn next<'cur>(
&'cur mut self,
) -> BoxFuture<'cur, crate::Result<Option<<Self::Database as HasRow<'cur>>::Row>>>;
}
2 changes: 2 additions & 0 deletions sqlx-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ where

/// The Rust type of table identifiers for this database.
type TableId: Display + Clone;

type RawBuffer;
}

pub trait HasRawValue<'c> {
Expand Down
Loading