fix(core): retry more db operations (#28667)

This commit is contained in:
Jonathan Cammisuli 2024-10-30 11:45:34 -04:00 committed by GitHub
parent 125675079c
commit 63b745e8c3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 265 additions and 119 deletions

33
Cargo.lock generated
View File

@ -518,9 +518,9 @@ dependencies = [
[[package]]
name = "fastrand"
version = "2.0.1"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5"
checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6"
[[package]]
name = "filedescriptor"
@ -1168,9 +1168,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "libc"
version = "0.2.155"
version = "0.2.161"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1"
[[package]]
name = "libloading"
@ -1195,9 +1195,9 @@ dependencies = [
[[package]]
name = "linux-raw-sys"
version = "0.4.13"
version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c"
checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"
[[package]]
name = "lock_api"
@ -1546,6 +1546,7 @@ dependencies = [
"swc_ecma_dep_graph",
"swc_ecma_parser",
"swc_ecma_visit",
"tempfile",
"thiserror",
"tracing",
"tracing-subscriber",
@ -1969,9 +1970,9 @@ checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustix"
version = "0.38.32"
version = "0.38.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65e04861e65f21776e67888bfbea442b3642beaa0138fdb1dd7a84a52dffdb89"
checksum = "aa260229e6538e52293eeb577aabd09945a09d6d9cc0fc550ed7529056c2e32a"
dependencies = [
"bitflags 2.6.0",
"errno",
@ -2420,14 +2421,15 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]]
name = "tempfile"
version = "3.10.1"
version = "3.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1"
checksum = "f0f2c9fc62d0beef6951ccffd757e241266a2c833136efbe35af6cd2567dca5b"
dependencies = [
"cfg-if",
"fastrand",
"once_cell",
"rustix",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
]
[[package]]
@ -2876,6 +2878,15 @@ dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-sys"
version = "0.59.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b"
dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-targets"
version = "0.48.5"

View File

@ -74,3 +74,4 @@ napi-build = '2.1.3'
assert_fs = "1.0.10"
# This is only used for unit tests
swc_ecma_dep_graph = "0.109.1"
tempfile = "3.13.0"

View File

@ -1,39 +1,102 @@
use anyhow::Result;
use rusqlite::{Connection, Error, OptionalExtension, Params, Row, Statement};
use rusqlite::{Connection, DatabaseName, Error, OptionalExtension, Params, Row, Statement, ToSql};
use std::thread;
use std::time::{Duration, Instant};
use std::time::Duration;
use tracing::trace;
/// Thin wrapper around a `rusqlite` [`Connection`].
///
/// The methods implemented on this type run their database calls through the
/// `retry_db_operation_when_busy!` macro and convert failures into `anyhow` errors
/// that carry the failing SQL or operation name.
pub struct NxDbConnection {
/// The wrapped SQLite connection. It is public, so callers can bypass the wrapper.
pub conn: Connection,
}
/// Maximum number of times a single database operation is attempted before giving up.
const MAX_RETRIES: u32 = 20;
/// Base back-off delay in milliseconds; the delay doubles after every busy attempt.
const RETRY_DELAY: u64 = 25;

/// Retries a database operation for as long as SQLite reports that the database is busy.
///
/// `$operation` is an expression producing a `rusqlite::Result<_>`. It is re-evaluated on
/// every attempt, at most `MAX_RETRIES` times:
///
/// * `Ok(_)` is returned immediately.
/// * Any error other than `ErrorCode::DatabaseBusy` is returned immediately.
/// * On `DatabaseBusy` the current thread sleeps for `RETRY_DELAY * 2^attempt` milliseconds
///   (capped at 12 seconds) and the operation is evaluated again.
/// * When the last allowed attempt is still busy, a `DatabaseBusy` error carrying the message
///   "Database busy. Retried maximum number of times." is returned without sleeping again.
///
/// This is a macro instead of a function because some database operations need to take a
/// `&mut Connection` while returning a reference. That causes lifetime issues which are hard to
/// solve with a closure-based helper; a macro inlines the retry loop at the call site and
/// avoids them.
macro_rules! retry_db_operation_when_busy {
    ($operation:expr) => {{
        let result = 'retry: {
            // Inclusive range: the operation is attempted exactly MAX_RETRIES times.
            for attempt in 1..=MAX_RETRIES {
                match $operation {
                    r @ Ok(_) => break 'retry r,
                    Err(Error::SqliteFailure(err, _))
                        if err.code == rusqlite::ErrorCode::DatabaseBusy =>
                    {
                        if attempt == MAX_RETRIES {
                            // No attempt follows, so sleeping here would only delay the error.
                            break;
                        }
                        trace!("Database busy. Retrying {} of {}", attempt, MAX_RETRIES);
                        // Exponential back-off; saturating math keeps this panic-free even if
                        // the constants are raised far enough to overflow a u64.
                        let backoff = Duration::from_millis(
                            RETRY_DELAY.saturating_mul(2_u64.saturating_pow(attempt)),
                        );
                        thread::sleep(backoff.min(Duration::from_secs(12)));
                    }
                    // Every other error is final and handed straight back to the caller.
                    err => break 'retry err,
                };
            }
            break 'retry Err(Error::SqliteFailure(
                rusqlite::ffi::Error {
                    code: rusqlite::ErrorCode::DatabaseBusy,
                    extended_code: 0,
                },
                Some("Database busy. Retried maximum number of times.".to_string()),
            ));
        };
        result
    }};
}
impl NxDbConnection {
/// Wraps an already-opened `rusqlite` connection; the connection is stored as-is.
pub fn new(connection: Connection) -> Self {
Self { conn: connection }
}
pub fn execute<P: Params + Clone>(&self, sql: &str, params: P) -> Result<usize> {
self.retry_on_busy(|conn| conn.execute(sql, params.clone()))
retry_db_operation_when_busy!(self.conn.execute(sql, params.clone()))
.map_err(|e| anyhow::anyhow!("DB execute error: \"{}\", {:?}", sql, e))
}
pub fn execute_batch(&self, sql: &str) -> Result<()> {
self.retry_on_busy(|conn| conn.execute_batch(sql))
retry_db_operation_when_busy!(self.conn.execute_batch(sql))
.map_err(|e| anyhow::anyhow!("DB execute batch error: \"{}\", {:?}", sql, e))
}
pub fn prepare(&self, sql: &str) -> Result<Statement> {
self.retry_on_busy(|conn| conn.prepare(sql))
retry_db_operation_when_busy!(self.conn.prepare(sql))
.map_err(|e| anyhow::anyhow!("DB prepare error: \"{}\", {:?}", sql, e))
}
pub fn transaction<T>(
&mut self,
transaction_operation: impl Fn(&Connection) -> rusqlite::Result<T>,
) -> Result<T> {
let transaction = retry_db_operation_when_busy!(self.conn.transaction())
.map_err(|e| anyhow::anyhow!("DB transaction error: {:?}", e))?;
let result = transaction_operation(&transaction)
.map_err(|e| anyhow::anyhow!("DB transaction operation error: {:?}", e))?;
transaction
.commit()
.map_err(|e| anyhow::anyhow!("DB transaction commit error: {:?}", e))?;
Ok(result)
}
pub fn query_row<T, P, F>(&self, sql: &str, params: P, f: F) -> Result<Option<T>>
where
P: Params + Clone,
F: FnOnce(&Row<'_>) -> rusqlite::Result<T> + Clone,
{
self.retry_on_busy(|conn| conn.query_row(sql, params.clone(), f.clone()).optional())
retry_db_operation_when_busy!(self
.conn
.query_row(sql, params.clone(), f.clone())
.optional())
.map_err(|e| anyhow::anyhow!("DB query error: \"{}\", {:?}", sql, e))
}
@ -43,33 +106,24 @@ impl NxDbConnection {
.inspect_err(|e| trace!("Error in close: {:?}", e))
}
#[allow(clippy::needless_lifetimes)]
fn retry_on_busy<'a, F, T>(&'a self, operation: F) -> rusqlite::Result<T>
pub fn pragma_update<V>(
&self,
schema_name: Option<DatabaseName<'_>>,
pragma_name: &str,
pragma_value: V,
) -> rusqlite::Result<()>
where
F: Fn(&'a Connection) -> rusqlite::Result<T>,
V: ToSql + Clone,
{
let start = Instant::now();
let max_retries: u64 = 5;
let retry_delay = Duration::from_millis(25);
for i in 0..max_retries {
match operation(&self.conn) {
Ok(result) => return Ok(result),
Err(Error::SqliteFailure(err, _))
if err.code == rusqlite::ErrorCode::DatabaseBusy =>
{
trace!("Database busy. Retrying{}", ".".repeat(i as usize));
if start.elapsed()
>= Duration::from_millis(max_retries * retry_delay.as_millis() as u64)
{
break;
}
thread::sleep(retry_delay);
}
err @ Err(_) => return err,
}
retry_db_operation_when_busy!(self.conn.pragma_update(
schema_name,
pragma_name,
pragma_value.clone()
))
}
operation(&self.conn)
pub fn busy_handler(&self, callback: Option<fn(i32) -> bool>) -> Result<()> {
retry_db_operation_when_busy!(self.conn.busy_handler(callback))
.map_err(|e| anyhow::anyhow!("DB busy handler error: {:?}", e))
}
}

View File

@ -1,9 +1,9 @@
use crate::native::db::connection::NxDbConnection;
use fs4::fs_std::FileExt;
use rusqlite::{Connection, OpenFlags};
use std::fs::{remove_file, File};
use std::path::{Path, PathBuf};
use tracing::{debug, trace};
use rusqlite::{Connection, OpenFlags};
use fs4::fs_std::FileExt;
use crate::native::db::connection::NxDbConnection;
pub(super) struct LockFile {
file: File,
@ -36,8 +36,8 @@ pub(super) fn create_lock_file(db_path: &Path) -> anyhow::Result<LockFile> {
})
}
pub(super) fn initialize_db(nx_version: String, db_path: &PathBuf) -> anyhow::Result<NxDbConnection> {
let c = create_connection(db_path)?;
pub(super) fn initialize_db(nx_version: String, db_path: &Path) -> anyhow::Result<NxDbConnection> {
let mut c = open_database_connection(db_path)?;
trace!(
"Checking if current existing database is compatible with Nx {}",
@ -56,53 +56,49 @@ pub(super) fn initialize_db(nx_version: String, db_path: &PathBuf) -> anyhow::Re
trace!("Database is compatible with Nx {}", nx_version);
c
}
// If there is no version, it means that this database is new
Ok(None) => {
trace!("Recording Nx Version: {}", nx_version);
c.execute(
"INSERT OR REPLACE INTO metadata (key, value) VALUES ('NX_VERSION', ?)",
[nx_version],
)?;
// If there is no metadata, it means that this database is new
Err(s) if s.to_string().contains("metadata") => {
configure_database(&c)?;
create_metadata_table(&mut c, &nx_version)?;
c
}
_ => {
check @ _ => {
trace!("Incompatible database because: {:?}", check);
trace!("Disconnecting from existing incompatible database");
c.close().map_err(|(_, error)| anyhow::Error::from(error))?;
trace!("Removing existing incompatible database");
remove_file(db_path)?;
trace!("Creating a new connection to a new database");
create_connection(db_path)?
trace!("Initializing a new database");
initialize_db(nx_version, db_path)?
}
};
Ok(c)
}
fn create_connection(db_path: &PathBuf) -> anyhow::Result<NxDbConnection> {
match open_database_connection(db_path) {
Ok(connection) => {
configure_database(&connection)?;
create_metadata_table(&connection)?;
Ok(connection)
}
err @ Err(_) => err,
}
}
fn create_metadata_table(c: &NxDbConnection) -> anyhow::Result<()> {
debug!("Creating table for metadata if it does not exist");
c.execute(
"CREATE TABLE IF NOT EXISTS metadata (
fn create_metadata_table(c: &mut NxDbConnection, nx_version: &str) -> anyhow::Result<()> {
debug!("Creating table for metadata");
c.transaction(|conn| {
conn.execute(
"CREATE TABLE metadata (
key TEXT NOT NULL PRIMARY KEY,
value TEXT NOT NULL
)",
[],
)?;
trace!("Recording Nx Version: {}", nx_version);
conn.execute(
"INSERT INTO metadata (key, value) VALUES ('NX_VERSION', ?)",
[nx_version],
)?;
Ok(())
})?;
Ok(())
}
fn open_database_connection(db_path: &PathBuf) -> anyhow::Result<NxDbConnection> {
fn open_database_connection(db_path: &Path) -> anyhow::Result<NxDbConnection> {
let conn = if cfg!(target_family = "unix") && ci_info::is_ci() {
trace!("Opening connection with unix-dotfile");
Connection::open_with_flags_and_vfs(
@ -110,7 +106,7 @@ fn open_database_connection(db_path: &PathBuf) -> anyhow::Result<NxDbConnection>
OpenFlags::SQLITE_OPEN_READ_WRITE
| OpenFlags::SQLITE_OPEN_CREATE
| OpenFlags::SQLITE_OPEN_URI
| OpenFlags::SQLITE_OPEN_NO_MUTEX,
| OpenFlags::SQLITE_OPEN_FULL_MUTEX,
"unix-dotfile",
)
} else {
@ -129,16 +125,83 @@ fn open_database_connection(db_path: &PathBuf) -> anyhow::Result<NxDbConnection>
fn configure_database(connection: &NxDbConnection) -> anyhow::Result<()> {
connection
.conn
.pragma_update(None, "journal_mode", "WAL")
.map_err(|e| anyhow::anyhow!("Unable to set journal_mode: {:?}", e))?;
connection
.conn
.pragma_update(None, "synchronous", "NORMAL")
.map_err(|e| anyhow::anyhow!("Unable to set synchronous: {:?}", e))?;
connection
.conn
.busy_handler(Some(|tries| tries < 6))
.busy_handler(Some(|tries| tries <= 12))
.map_err(|e| anyhow::anyhow!("Unable to set busy handler: {:?}", e))?;
Ok(())
}
#[cfg(test)]
mod tests {
    use crate::native::logger::enable_logger;

    use super::*;

    /// Query every test uses to read back the version recorded in the metadata table.
    const VERSION_QUERY: &str = "SELECT value FROM metadata WHERE key='NX_VERSION'";

    /// Opens a plain `rusqlite` connection to `db_path` and returns the recorded version.
    fn read_recorded_version(db_path: &std::path::Path) -> anyhow::Result<String> {
        let raw = Connection::open(db_path)?;
        let recorded: String = raw.query_row(VERSION_QUERY, [], |row| row.get(0))?;
        Ok(recorded)
    }

    /// A database initialized at a fresh path records the version it was given.
    #[test]
    fn initialize_db_creates_new_db() -> anyhow::Result<()> {
        let scratch = tempfile::tempdir()?;
        let db_file = scratch.path().join("test.db");

        drop(initialize_db("1.0.0".to_string(), &db_file)?);

        assert_eq!(read_recorded_version(&db_file)?, "1.0.0");
        Ok(())
    }

    /// Initializing twice with the same version leaves the recorded version unchanged.
    #[test]
    fn initialize_db_reuses_compatible_db() -> anyhow::Result<()> {
        let scratch = tempfile::tempdir()?;
        let db_file = scratch.path().join("test.db");

        // First call creates the database, second call must accept it as-is.
        drop(initialize_db("1.0.0".to_string(), &db_file)?);
        drop(initialize_db("1.0.0".to_string(), &db_file)?);

        assert_eq!(read_recorded_version(&db_file)?, "1.0.0");
        Ok(())
    }

    /// Initializing with a different version ends up with the new version recorded.
    #[test]
    fn initialize_db_recreates_incompatible_db() -> anyhow::Result<()> {
        enable_logger();
        let scratch = tempfile::tempdir()?;
        let db_file = scratch.path().join("test.db");

        // Create the initial database with the old version.
        drop(initialize_db("1.0.0".to_string(), &db_file)?);

        // Initialize again with a different version and query through the returned wrapper.
        let upgraded = initialize_db("2.0.0".to_string(), &db_file)?;
        let recorded: Option<String> = upgraded.query_row(VERSION_QUERY, [], |row| row.get(0))?;

        assert_eq!(recorded.unwrap(), "2.0.0");
        Ok(())
    }
}

View File

@ -1,8 +1,9 @@
pub mod connection;
mod initialize;
use crate::native::db::connection::NxDbConnection;
use crate::native::logger::enable_logger;
use crate::native::machine_id::get_machine_id;
use crate::native::{db::connection::NxDbConnection, hasher::hash};
use napi::bindgen_prelude::External;
use std::fs::create_dir_all;
use std::path::PathBuf;
@ -15,11 +16,19 @@ pub fn connect_to_nx_db(
nx_version: String,
db_name: Option<String>,
) -> anyhow::Result<External<NxDbConnection>> {
enable_logger();
let cache_dir_buf = PathBuf::from(cache_dir);
let db_path = cache_dir_buf.join(format!("{}.db", db_name.unwrap_or_else(get_machine_id)));
let mut db_file_name = db_name.unwrap_or_else(get_machine_id);
if db_file_name.is_empty() {
trace!("Invalid db file name, using fallback name");
db_file_name = hash(b"machine");
}
let db_path = cache_dir_buf.join(format!("{}.db", db_file_name));
create_dir_all(cache_dir_buf)?;
let _ = trace_span!("process", id = process::id()).entered();
trace_span!("process", id = process::id()).in_scope(|| {
trace!("Creating connection to {:?}", db_path);
let lock_file = initialize::create_lock_file(&db_path)?;
@ -29,4 +38,5 @@ pub fn connect_to_nx_db(
initialize::unlock_file(&lock_file);
Ok(External::new(c))
})
}

View File

@ -1,9 +1,10 @@
use crate::native::db::connection::NxDbConnection;
use napi::bindgen_prelude::*;
use rusqlite::params;
use tracing::trace;
#[napi(object)]
#[derive(Default, Clone)]
#[derive(Default, Clone, Debug)]
pub struct HashedTask {
pub hash: String,
pub project: String,
@ -42,14 +43,17 @@ impl TaskDetails {
}
#[napi]
pub fn record_task_details(&self, tasks: Vec<HashedTask>) -> anyhow::Result<()> {
pub fn record_task_details(&mut self, tasks: Vec<HashedTask>) -> anyhow::Result<()> {
trace!("Recording task details");
self.db.transaction(|conn| {
let mut stmt = conn.prepare("INSERT OR REPLACE INTO task_details (hash, project, target, configuration) VALUES (?1, ?2, ?3, ?4)")?;
for task in tasks.iter() {
self.db.execute(
"INSERT OR REPLACE INTO task_details (hash, project, target, configuration)
VALUES (?1, ?2, ?3, ?4)",
stmt.execute(
params![task.hash, task.project, task.target, task.configuration],
)?;
}
Ok(())
})?;
Ok(())
}

View File

@ -5,6 +5,7 @@ use rusqlite::vtab::array;
use rusqlite::{params, types::Value};
use std::collections::HashMap;
use std::rc::Rc;
use tracing::trace;
#[napi(object)]
pub struct TaskRun {
@ -54,25 +55,27 @@ impl NxTaskHistory {
}
#[napi]
pub fn record_task_runs(&self, task_runs: Vec<TaskRun>) -> anyhow::Result<()> {
for task_run in task_runs.iter() {
self.db
.execute(
"
INSERT INTO task_history
pub fn record_task_runs(&mut self, task_runs: Vec<TaskRun>) -> anyhow::Result<()> {
trace!("Recording task runs");
self.db.transaction(|conn| {
let mut stmt = conn.prepare(
"INSERT OR REPLACE INTO task_history
(hash, status, code, start, end)
VALUES (?1, ?2, ?3, ?4, ?5)",
params![
)?;
for task_run in task_runs.iter() {
stmt.execute(params![
task_run.hash,
task_run.status,
task_run.code,
task_run.start,
task_run.end
],
)
.map_err(anyhow::Error::from)?;
])
.inspect_err(|e| trace!("Error trying to insert {:?}: {:?}", &task_run.hash, e))?;
}
Ok(())
})?;
Ok(())
}
#[napi]