From e94f7a0ca6c74d5d02cb1b32e0aa0dc8bf18e374 Mon Sep 17 00:00:00 2001 From: Arnaud Nelissen Date: Fri, 17 Apr 2026 11:29:37 +0200 Subject: [PATCH] Prevent concurrent job pile-up and fix nightly_cache lock contention MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add shared Arc (1 permit) through main → AppState and jobs. All heavy operations (catalog refresh, rebuild, nightly recompute, factory reset) now serialise: a second trigger returns "already_running" instead of spawning a parallel task that fights over the SQLite WAL lock. - Scheduled nightly job acquires the semaphore too, so it waits rather than stomping on a manual rebuild triggered at startup. - Replace 90 × precompute_lightweight calls (90 separate transactions, horizon fetched 90 times) with precompute_next_90_nights: one bulk SELECT to find missing dates, horizon fetched once, all inserts in a single transaction. Eliminates the 1–5s per-INSERT lock waits seen when multiple jobs were competing. Co-Authored-By: Claude Sonnet 4.6 --- backend/src/api/mod.rs | 26 +++++- backend/src/jobs/mod.rs | 11 ++- backend/src/jobs/nightly.rs | 157 +++++++++++++++++++----------------- backend/src/main.rs | 8 +- 4 files changed, 120 insertions(+), 82 deletions(-) diff --git a/backend/src/api/mod.rs b/backend/src/api/mod.rs index 5f8a315..3c7f548 100644 --- a/backend/src/api/mod.rs +++ b/backend/src/api/mod.rs @@ -22,6 +22,7 @@ use crate::catalog::force_refresh_catalog; #[derive(Clone)] pub struct AppState { pub pool: SqlitePool, + pub job_lock: std::sync::Arc, } #[derive(Debug, thiserror::Error)] @@ -48,8 +49,8 @@ impl IntoResponse for AppError { } } -pub fn build_router(pool: SqlitePool) -> Router { - let state = AppState { pool }; +pub fn build_router(pool: SqlitePool, job_lock: std::sync::Arc) -> Router { + let state = AppState { pool, job_lock }; // Gallery static files let gallery_dir = std::path::PathBuf::from("/data/gallery"); @@ -118,8 +119,13 @@ pub fn build_router(pool: SqlitePool) -> Router { async fn catalog_refresh( axum::extract::State(state): axum::extract::State, ) -> Result, AppError> { + let permit = match state.job_lock.clone().try_acquire_owned() { + Ok(p) => p, + Err(_) => return Ok(Json(serde_json::json!({ "status": "already_running" }))), + }; let pool = state.pool.clone(); tokio::spawn(async move { + let _permit = permit; match force_refresh_catalog(&pool).await { Ok(n) => tracing::info!("Manual catalog refresh complete: {} objects", n), Err(e) => tracing::error!("Manual catalog refresh failed: {}", e), @@ -131,8 +137,13 @@ async fn catalog_refresh( async fn catalog_rebuild( axum::extract::State(state): axum::extract::State, ) -> Result, AppError> { + let permit = match state.job_lock.clone().try_acquire_owned() { + Ok(p) => p, + Err(_) => return Ok(Json(serde_json::json!({ "status": "already_running" }))), + }; let pool = state.pool.clone(); tokio::spawn(async move { + let _permit = permit; match catalog_rebuild_task(&pool).await { Ok(stats) => tracing::info!("Manual catalog rebuild complete: {} objects", stats.total), Err(e) => tracing::error!("Manual catalog rebuild failed: {}", e), @@ -184,8 +195,13 @@ async fn catalog_rebuild_task(pool: &SqlitePool) -> Result, ) -> Result, AppError> { + let permit = match state.job_lock.clone().try_acquire_owned() { + Ok(p) => p, + Err(_) => return Ok(Json(serde_json::json!({ "status": "already_running" }))), + }; let pool = state.pool.clone(); tokio::spawn(async move { + let _permit = permit; match crate::jobs::nightly::precompute_tonight(&pool).await { Ok(()) => tracing::info!("Manual nightly recompute complete"), Err(e) => tracing::error!("Manual nightly recompute failed: {}", e), @@ -197,6 +213,11 @@ async fn nightly_recompute( async fn factory_reset( axum::extract::State(state): axum::extract::State, ) -> Result, AppError> { + let permit = match state.job_lock.clone().try_acquire_owned() { + Ok(p) => p, + Err(_) => return Ok(Json(serde_json::json!({ "status": "already_running", "message": "A rebuild is already in progress." }))), + }; + tracing::info!("Factory reset: clearing transient data..."); // Clear all computed/cached tables — preserve user data (imaging_log, gallery, phd2_logs, horizon, target_notes, custom_targets) @@ -213,6 +234,7 @@ async fn factory_reset( // Rebuild catalog + nightly precompute in background let pool = state.pool.clone(); tokio::spawn(async move { + let _permit = permit; match catalog_rebuild_task(&pool).await { Ok(stats) => tracing::info!("Factory reset rebuild complete: {} objects", stats.total), Err(e) => tracing::error!("Factory reset rebuild failed: {}", e), diff --git a/backend/src/jobs/mod.rs b/backend/src/jobs/mod.rs index a2da9dd..e43b5d4 100644 --- a/backend/src/jobs/mod.rs +++ b/backend/src/jobs/mod.rs @@ -11,10 +11,14 @@ use self::weather_poll::start_weather_scheduler; use crate::astronomy::astro_twilight; use crate::config::{LAT, LON}; -pub fn start_all_jobs(pool: SqlitePool) { +pub fn start_all_jobs(pool: SqlitePool, job_lock: std::sync::Arc) { // Catalog refresh on startup (respects TTL) let pool_cat = pool.clone(); + let lock_cat = job_lock.clone(); tokio::spawn(async move { + // Catalog refresh is lightweight (skips if TTL not expired), but still grab lock + // to avoid running concurrently with a manual rebuild triggered right at startup. + let _permit = lock_cat.acquire().await; run_catalog_refresh(pool_cat).await; }); @@ -31,12 +35,15 @@ pub fn start_all_jobs(pool: SqlitePool) { // Nightly precompute: run at dusk each day let pool_night = pool.clone(); + let lock_night = job_lock.clone(); tokio::spawn(async move { loop { - // Run once immediately on startup + // Acquire lock before precompute — waits if a manual rebuild is in progress + let permit = lock_night.acquire().await; if let Err(e) = precompute_tonight(&pool_night).await { tracing::error!("Nightly precompute failed: {}", e); } + drop(permit); // Sleep until next dusk sleep_until_next_dusk().await; diff --git a/backend/src/jobs/nightly.rs b/backend/src/jobs/nightly.rs index a5721f6..86fd7ba 100644 --- a/backend/src/jobs/nightly.rs +++ b/backend/src/jobs/nightly.rs @@ -19,14 +19,90 @@ struct CatalogObj { pub async fn precompute_tonight(pool: &SqlitePool) -> anyhow::Result<()> { let today = Utc::now().naive_utc().date(); precompute_for_date(pool, today).await?; + precompute_next_90_nights(pool, today).await?; + Ok(()) +} - // Also precompute next 90 nights (lightweight) - for i in 1..=90i64 { - let date = today + Duration::days(i); - if let Err(e) = precompute_lightweight(pool, date).await { - tracing::warn!("Lightweight precompute for {} failed: {}", date, e); +/// Precompute the next 90 nights in a single pass: +/// - one bulk SELECT to find which dates are already done +/// - one transaction for all missing dates +async fn precompute_next_90_nights(pool: &SqlitePool, today: NaiveDate) -> anyhow::Result<()> { + let start_date = (today + Duration::days(1)).to_string(); + let end_date = (today + Duration::days(90)).to_string(); + + // Find which dates already have data + let done: std::collections::HashSet = sqlx::query_scalar( + "SELECT DISTINCT night_date FROM nightly_cache WHERE night_date > ? AND night_date <= ?", + ) + .bind(&start_date) + .bind(&end_date) + .fetch_all(pool) + .await? + .into_iter() + .collect(); + + let missing_dates: Vec = (1..=90i64) + .map(|i| today + Duration::days(i)) + .filter(|d| !done.contains(&d.to_string())) + .collect(); + + if missing_dates.is_empty() { + return Ok(()); + } + + // Fetch shared data once + let objects: Vec = sqlx::query_as::<_, (String, f64, f64, String)>( + "SELECT id, ra_deg, dec_deg, obj_type FROM catalog", + ) + .fetch_all(pool) + .await? + .into_iter() + .map(|(id, ra, dec, ot)| CatalogObj { id, ra_deg: ra, dec_deg: dec, obj_type: ot }) + .collect(); + + let horizon: Vec = sqlx::query_as( + "SELECT az_deg, alt_deg FROM horizon ORDER BY az_deg", + ) + .fetch_all(pool) + .await?; + + // Insert all missing dates in a single transaction + let mut tx = pool.begin().await?; + for date in &missing_dates { + let (dusk, dawn) = match astro_twilight(*date, LAT, LON) { + Ok(v) => v, + Err(e) => { tracing::warn!("Twilight failed for {}: {}", date, e); continue; } + }; + let midnight = dusk + (dawn - dusk) / 2; + let jd = julian_date(midnight); + let (moon_ra, moon_dec) = moon_position(jd); + let moon_illum = moon_illumination(jd); + let moon_alt = moon_altitude(jd, LAT, LON); + let moon_state = MoonState { ra_deg: moon_ra, dec_deg: moon_dec, illumination: moon_illum, alt_at_midnight: moon_alt }; + let window = TonightWindow { dusk, dawn }; + let date_str = date.to_string(); + + for obj in &objects { + let vis = compute_visibility(obj.ra_deg, obj.dec_deg, &window, &horizon, &moon_state); + let rec_filter = top_filter(&obj.obj_type, moon_illum * 100.0, moon_alt, vis.moon_sep_deg); + sqlx::query( + r#"INSERT OR IGNORE INTO nightly_cache + (catalog_id, night_date, max_alt_deg, transit_utc, usable_min, recommended_filter, is_visible_tonight) + VALUES (?, ?, ?, ?, ?, ?, ?)"#, + ) + .bind(&obj.id) + .bind(&date_str) + .bind(vis.max_alt_deg) + .bind(vis.transit_utc.map(|t: DateTime| t.to_rfc3339())) + .bind(vis.usable_min as i32) + .bind(&rec_filter) + .bind(vis.is_visible_tonight as i32) + .execute(&mut *tx) + .await?; } } + tx.commit().await?; + tracing::info!("Lightweight precompute: inserted {} nights ({} objects each)", missing_dates.len(), objects.len()); Ok(()) } @@ -159,74 +235,3 @@ pub async fn precompute_for_date(pool: &SqlitePool, date: NaiveDate) -> anyhow:: Ok(()) } -/// Lightweight precompute: only max_alt, transit, usable_min, recommended_filter. -/// Skips full visibility curve for performance. -async fn precompute_lightweight(pool: &SqlitePool, date: NaiveDate) -> anyhow::Result<()> { - // Check if already computed - let existing: i64 = sqlx::query_scalar( - "SELECT COUNT(*) FROM nightly_cache WHERE night_date = ?", - ) - .bind(date.to_string()) - .fetch_one(pool) - .await?; - - if existing > 0 { - return Ok(()); - } - - let (dusk, dawn) = astro_twilight(date, LAT, LON)?; - let midnight = dusk + (dawn - dusk) / 2; - let jd = julian_date(midnight); - let (moon_ra, moon_dec) = moon_position(jd); - let moon_illum = moon_illumination(jd); - let moon_alt = moon_altitude(jd, LAT, LON); - - let horizon: Vec = sqlx::query_as( - "SELECT az_deg, alt_deg FROM horizon ORDER BY az_deg", - ) - .fetch_all(pool) - .await?; - - let moon_state = MoonState { - ra_deg: moon_ra, - dec_deg: moon_dec, - illumination: moon_illum, - alt_at_midnight: moon_alt, - }; - let window = TonightWindow { dusk, dawn }; - - let objects: Vec = sqlx::query_as::<_, (String, f64, f64, String)>( - "SELECT id, ra_deg, dec_deg, obj_type FROM catalog", - ) - .fetch_all(pool) - .await? - .into_iter() - .map(|(id, ra, dec, ot)| CatalogObj { id, ra_deg: ra, dec_deg: dec, obj_type: ot }) - .collect(); - - let date_str = date.to_string(); - let mut tx = pool.begin().await?; - - for obj in &objects { - let vis = compute_visibility(obj.ra_deg, obj.dec_deg, &window, &horizon, &moon_state); - let rec_filter = top_filter(&obj.obj_type, moon_illum * 100.0, moon_alt, vis.moon_sep_deg); - - sqlx::query( - r#"INSERT OR IGNORE INTO nightly_cache - (catalog_id, night_date, max_alt_deg, transit_utc, usable_min, recommended_filter, is_visible_tonight) - VALUES (?, ?, ?, ?, ?, ?, ?)"#, - ) - .bind(&obj.id) - .bind(&date_str) - .bind(vis.max_alt_deg) - .bind(vis.transit_utc.map(|t: DateTime| t.to_rfc3339())) - .bind(vis.usable_min as i32) - .bind(&rec_filter) - .bind(vis.is_visible_tonight as i32) - .execute(&mut *tx) - .await?; - } - tx.commit().await?; - - Ok(()) -} diff --git a/backend/src/main.rs b/backend/src/main.rs index 3b7e781..d2bf7ea 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -26,8 +26,12 @@ async fn main() -> anyhow::Result<()> { tracing::info!("Connecting to database: {}", database_url); let pool = db::init_db(&database_url).await?; + // Shared semaphore: ensures only one heavy background job runs at a time + // (catalog rebuild, nightly precompute, factory reset) + let job_lock = std::sync::Arc::new(tokio::sync::Semaphore::new(1)); + // Start background jobs - jobs::start_all_jobs(pool.clone()); + jobs::start_all_jobs(pool.clone(), job_lock.clone()); // Build router let cors = CorsLayer::new() @@ -35,7 +39,7 @@ async fn main() -> anyhow::Result<()> { .allow_methods(Any) .allow_headers(Any); - let app = api::build_router(pool).layer(cors); + let app = api::build_router(pool, job_lock).layer(cors); let bind_addr = "0.0.0.0:3001"; tracing::info!("Starting server on {}", bind_addr);