Skip to content

Commit

Permalink
fix: Reuse write pool for in-memory SQLite (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
peasee authored Aug 2, 2024
1 parent c49db80 commit 2870827
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/sql/db_connection_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub trait DbConnectionPool<T, P: 'static> {
fn join_push_down(&self) -> JoinPushDown;
}

#[derive(Default, Clone)]
#[derive(Default, Clone, Copy, PartialEq, Eq)]
pub enum Mode {
#[default]
Memory,
Expand Down
26 changes: 19 additions & 7 deletions src/sql/db_connection_pool/sqlitepool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub enum Error {
/// Connection pool for SQLite that holds a single `Connection` handle.
pub struct SqliteConnectionPool {
// Connection handle; `connect` clones this handle for each caller.
conn: Connection,
// Presumably the value reported through `DbConnectionPool::join_push_down` — that impl is not visible here; confirm.
join_push_down: JoinPushDown,
// Database mode; `connect` applies the WAL/pragma configuration only when this is `Mode::File`.
mode: Mode,
}

impl SqliteConnectionPool {
Expand Down Expand Up @@ -51,6 +52,7 @@ impl SqliteConnectionPool {
Ok(SqliteConnectionPool {
conn,
join_push_down,
mode,
})
}

Expand All @@ -67,13 +69,23 @@ impl DbConnectionPool<Connection, &'static (dyn ToSql + Sync)> for SqliteConnect
) -> Result<Box<dyn DbConnection<Connection, &'static (dyn ToSql + Sync)>>> {
let conn = self.conn.clone();

// change transaction mode to Write-Ahead log instead of default atomic rollback journal: https://www.sqlite.org/wal.html
conn.call(|conn| {
conn.execute_batch("PRAGMA journal_mode = WAL;")?;
Ok(())
})
.await
.context(ConnectionPoolSnafu)?;
// these configuration options are only applicable for file-mode databases
if self.mode == Mode::File {
// change transaction mode to Write-Ahead log instead of default atomic rollback journal: https://www.sqlite.org/wal.html
// NOTE: This is a no-op if the database is in-memory, as only MEMORY or OFF are supported: https://www.sqlite.org/pragma.html#pragma_journal_mode
conn.call(|conn| {
conn.pragma_update(None, "journal_mode", "WAL")?;
conn.pragma_update(None, "busy_timeout", "5000")?;
conn.pragma_update(None, "synchronous", "NORMAL")?;
conn.pragma_update(None, "cache_size", "-20000")?;
conn.pragma_update(None, "foreign_keys", "true")?;
conn.pragma_update(None, "temp_store", "memory")?;
// conn.set_transaction_behavior(TransactionBehavior::Immediate); introduced in rusqlite 0.32.1, but tokio-rusqlite is still on 0.31.0
Ok(())
})
.await
.context(ConnectionPoolSnafu)?;
}

Ok(Box::new(SqliteConnection::new(conn)))
}
Expand Down
24 changes: 14 additions & 10 deletions src/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,23 +157,27 @@ impl TableProviderFactory for SqliteTableProviderFactory {

let db_path = self.sqlite_file_path(&name, &cmd.options);

// use a separate pool instance from writing to allow for concurrent reads+writes
// even though we set up SQLite to use WAL mode, the pool isn't really a pool, so it shares the same connection
// and we can't have concurrent writes when sharing the same connection
let read_pool: Arc<SqliteConnectionPool> = Arc::new(
SqliteConnectionPool::new(&db_path, mode.clone())
.await
.context(DbConnectionPoolSnafu)
.map_err(to_datafusion_error)?,
);

let pool: Arc<SqliteConnectionPool> = Arc::new(
SqliteConnectionPool::new(&db_path, mode)
.await
.context(DbConnectionPoolSnafu)
.map_err(to_datafusion_error)?,
);

let read_pool = if mode == Mode::Memory {
Arc::clone(&pool)
} else {
// use a separate pool instance from writing to allow for concurrent reads+writes
// even though we set up SQLite to use WAL mode, the pool isn't really a pool, so it shares the same connection
// and we can't have concurrent writes when sharing the same connection
Arc::new(
SqliteConnectionPool::new(&db_path, mode)
.await
.context(DbConnectionPoolSnafu)
.map_err(to_datafusion_error)?,
)
};

let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into());
let sqlite = Arc::new(Sqlite::new(
name.clone(),
Expand Down

0 comments on commit 2870827

Please sign in to comment.