From 2870827d35ce0639ba36f13667fe2262d6d5614c Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Fri, 2 Aug 2024 10:17:03 +1000 Subject: [PATCH] fix: Reuse write pool for in-memory SQLite (#27) --- src/sql/db_connection_pool/mod.rs | 2 +- src/sql/db_connection_pool/sqlitepool.rs | 26 +++++++++++++++++------- src/sqlite.rs | 24 +++++++++++++--------- 3 files changed, 34 insertions(+), 18 deletions(-) diff --git a/src/sql/db_connection_pool/mod.rs b/src/sql/db_connection_pool/mod.rs index 305e9bd..7b02aa0 100644 --- a/src/sql/db_connection_pool/mod.rs +++ b/src/sql/db_connection_pool/mod.rs @@ -32,7 +32,7 @@ pub trait DbConnectionPool { fn join_push_down(&self) -> JoinPushDown; } -#[derive(Default, Clone)] +#[derive(Default, Clone, Copy, PartialEq, Eq)] pub enum Mode { #[default] Memory, diff --git a/src/sql/db_connection_pool/sqlitepool.rs b/src/sql/db_connection_pool/sqlitepool.rs index e6661b5..a6f067f 100644 --- a/src/sql/db_connection_pool/sqlitepool.rs +++ b/src/sql/db_connection_pool/sqlitepool.rs @@ -20,6 +20,7 @@ pub enum Error { pub struct SqliteConnectionPool { conn: Connection, join_push_down: JoinPushDown, + mode: Mode, } impl SqliteConnectionPool { @@ -51,6 +52,7 @@ impl SqliteConnectionPool { Ok(SqliteConnectionPool { conn, join_push_down, + mode, }) } @@ -67,13 +69,23 @@ impl DbConnectionPool for SqliteConnect ) -> Result>> { let conn = self.conn.clone(); - // change transaction mode to Write-Ahead log instead of default atomic rollback journal: https://www.sqlite.org/wal.html - conn.call(|conn| { - conn.execute_batch("PRAGMA journal_mode = WAL;")?; - Ok(()) - }) - .await - .context(ConnectionPoolSnafu)?; + // these configuration options are only applicable for file-mode databases + if self.mode == Mode::File { + // change transaction mode to Write-Ahead log instead of default atomic rollback journal: https://www.sqlite.org/wal.html + // NOTE: This is a no-op if the database is in-memory, as only MEMORY or OFF are 
supported: https://www.sqlite.org/pragma.html#pragma_journal_mode + conn.call(|conn| { + conn.pragma_update(None, "journal_mode", "WAL")?; + conn.pragma_update(None, "busy_timeout", "5000")?; + conn.pragma_update(None, "synchronous", "NORMAL")?; + conn.pragma_update(None, "cache_size", "-20000")?; + conn.pragma_update(None, "foreign_keys", "true")?; + conn.pragma_update(None, "temp_store", "memory")?; + // conn.set_transaction_behavior(TransactionBehavior::Immediate); introduced in rusqlite 0.32.1, but tokio-rusqlite is still on 0.31.0 + Ok(()) + }) + .await + .context(ConnectionPoolSnafu)?; + } Ok(Box::new(SqliteConnection::new(conn))) } diff --git a/src/sqlite.rs b/src/sqlite.rs index 8fc5da1..f30d3a5 100644 --- a/src/sqlite.rs +++ b/src/sqlite.rs @@ -157,16 +157,6 @@ impl TableProviderFactory for SqliteTableProviderFactory { let db_path = self.sqlite_file_path(&name, &cmd.options); - // use a separate pool instance from writing to allow for concurrent reads+writes - // even though we setup SQLite to use WAL mode, the pool isn't really a pool so shares the same connection - // and we can't have concurrent writes when sharing the same connection - let read_pool: Arc = Arc::new( - SqliteConnectionPool::new(&db_path, mode.clone()) - .await - .context(DbConnectionPoolSnafu) - .map_err(to_datafusion_error)?, - ); - let pool: Arc = Arc::new( SqliteConnectionPool::new(&db_path, mode) .await @@ -174,6 +164,20 @@ impl TableProviderFactory for SqliteTableProviderFactory { .map_err(to_datafusion_error)?, ); + let read_pool = if mode == Mode::Memory { + Arc::clone(&pool) + } else { + // use a separate pool instance from writing to allow for concurrent reads+writes + // even though we setup SQLite to use WAL mode, the pool isn't really a pool so shares the same connection + // and we can't have concurrent writes when sharing the same connection + Arc::new( + SqliteConnectionPool::new(&db_path, mode) + .await + .context(DbConnectionPoolSnafu) + .map_err(to_datafusion_error)?, 
+ ) + }; + let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into()); let sqlite = Arc::new(Sqlite::new( name.clone(),