Skip to content

Commit

Permalink
fix(core): retry more db operations (#28667)
Browse files Browse the repository at this point in the history
  • Loading branch information
Cammisuli authored and jaysoo committed Oct 31, 2024
1 parent 8623bb4 commit 3a34241
Show file tree
Hide file tree
Showing 7 changed files with 265 additions and 119 deletions.
33 changes: 22 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/nx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,4 @@ napi-build = '2.1.3'
assert_fs = "1.0.10"
# This is only used for unit tests
swc_ecma_dep_graph = "0.109.1"
tempfile = "3.13.0"
118 changes: 86 additions & 32 deletions packages/nx/src/native/db/connection.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,103 @@
use anyhow::Result;
use rusqlite::{Connection, Error, OptionalExtension, Params, Row, Statement};

use rusqlite::{Connection, DatabaseName, Error, OptionalExtension, Params, Row, Statement, ToSql};
use std::thread;
use std::time::{Duration, Instant};
use std::time::Duration;
use tracing::trace;

pub struct NxDbConnection {
pub conn: Connection,
}

const MAX_RETRIES: u32 = 20;
const RETRY_DELAY: u64 = 25;

/// macro for handling the db when its busy
/// This is a macro instead of a function because some database operations need to take a &mut Connection, while returning a reference
/// This causes some quite complex lifetime issues that are quite hard to solve
///
/// Using a macro inlines the retry operation where it was called, and the lifetime issues are avoided
macro_rules! retry_db_operation_when_busy {
($operation:expr) => {{
let connection = 'retry: {
for i in 1..MAX_RETRIES {
match $operation {
r @ Ok(_) => break 'retry r,
Err(Error::SqliteFailure(err, _))
if err.code == rusqlite::ErrorCode::DatabaseBusy =>
{
trace!("Database busy. Retrying {} of {}", i, MAX_RETRIES);
let sleep = Duration::from_millis(RETRY_DELAY * 2_u64.pow(i));
let max_sleep = Duration::from_secs(12);
if (sleep >= max_sleep) {
thread::sleep(max_sleep);
} else {
thread::sleep(sleep);
}
}
err => break 'retry err,
};
}
break 'retry Err(Error::SqliteFailure(
rusqlite::ffi::Error {
code: rusqlite::ErrorCode::DatabaseBusy,
extended_code: 0,
},
Some("Database busy. Retried maximum number of times.".to_string()),
));
};

connection
}};
}

impl NxDbConnection {
pub fn new(connection: Connection) -> Self {
Self { conn: connection }
}

pub fn execute<P: Params + Clone>(&self, sql: &str, params: P) -> Result<usize> {
self.retry_on_busy(|conn| conn.execute(sql, params.clone()))
retry_db_operation_when_busy!(self.conn.execute(sql, params.clone()))
.map_err(|e| anyhow::anyhow!("DB execute error: \"{}\", {:?}", sql, e))
}

pub fn execute_batch(&self, sql: &str) -> Result<()> {
self.retry_on_busy(|conn| conn.execute_batch(sql))
retry_db_operation_when_busy!(self.conn.execute_batch(sql))
.map_err(|e| anyhow::anyhow!("DB execute batch error: \"{}\", {:?}", sql, e))
}

pub fn prepare(&self, sql: &str) -> Result<Statement> {
self.retry_on_busy(|conn| conn.prepare(sql))
retry_db_operation_when_busy!(self.conn.prepare(sql))
.map_err(|e| anyhow::anyhow!("DB prepare error: \"{}\", {:?}", sql, e))
}

pub fn transaction<T>(
&mut self,
transaction_operation: impl Fn(&Connection) -> rusqlite::Result<T>,
) -> Result<T> {
let transaction = retry_db_operation_when_busy!(self.conn.transaction())
.map_err(|e| anyhow::anyhow!("DB transaction error: {:?}", e))?;

let result = transaction_operation(&transaction)
.map_err(|e| anyhow::anyhow!("DB transaction operation error: {:?}", e))?;

transaction
.commit()
.map_err(|e| anyhow::anyhow!("DB transaction commit error: {:?}", e))?;

Ok(result)
}

pub fn query_row<T, P, F>(&self, sql: &str, params: P, f: F) -> Result<Option<T>>
where
P: Params + Clone,
F: FnOnce(&Row<'_>) -> rusqlite::Result<T> + Clone,
{
self.retry_on_busy(|conn| conn.query_row(sql, params.clone(), f.clone()).optional())
.map_err(|e| anyhow::anyhow!("DB query error: \"{}\", {:?}", sql, e))
retry_db_operation_when_busy!(self
.conn
.query_row(sql, params.clone(), f.clone())
.optional())
.map_err(|e| anyhow::anyhow!("DB query error: \"{}\", {:?}", sql, e))
}

pub fn close(self) -> rusqlite::Result<(), (Connection, Error)> {
Expand All @@ -43,33 +106,24 @@ impl NxDbConnection {
.inspect_err(|e| trace!("Error in close: {:?}", e))
}

#[allow(clippy::needless_lifetimes)]
fn retry_on_busy<'a, F, T>(&'a self, operation: F) -> rusqlite::Result<T>
pub fn pragma_update<V>(
&self,
schema_name: Option<DatabaseName<'_>>,
pragma_name: &str,
pragma_value: V,
) -> rusqlite::Result<()>
where
F: Fn(&'a Connection) -> rusqlite::Result<T>,
V: ToSql + Clone,
{
let start = Instant::now();
let max_retries: u64 = 5;
let retry_delay = Duration::from_millis(25);

for i in 0..max_retries {
match operation(&self.conn) {
Ok(result) => return Ok(result),
Err(Error::SqliteFailure(err, _))
if err.code == rusqlite::ErrorCode::DatabaseBusy =>
{
trace!("Database busy. Retrying{}", ".".repeat(i as usize));
if start.elapsed()
>= Duration::from_millis(max_retries * retry_delay.as_millis() as u64)
{
break;
}
thread::sleep(retry_delay);
}
err @ Err(_) => return err,
}
}
retry_db_operation_when_busy!(self.conn.pragma_update(
schema_name,
pragma_name,
pragma_value.clone()
))
}

operation(&self.conn)
pub fn busy_handler(&self, callback: Option<fn(i32) -> bool>) -> Result<()> {
retry_db_operation_when_busy!(self.conn.busy_handler(callback))
.map_err(|e| anyhow::anyhow!("DB busy handler error: {:?}", e))
}
}
Loading

0 comments on commit 3a34241

Please sign in to comment.