Prevent concurrent job pile-up and fix nightly_cache lock contention

- Add shared Arc<Semaphore> (1 permit) through main → AppState and jobs.
  All heavy operations (catalog refresh, rebuild, nightly recompute,
  factory reset) now serialise: a second trigger returns "already_running"
  instead of spawning a parallel task that fights over the SQLite WAL lock.
- Scheduled nightly job acquires the semaphore too, so it waits rather
  than stomping on a manual rebuild triggered at startup.
- Replace 90 × precompute_lightweight calls (90 separate transactions,
  horizon fetched 90 times) with precompute_next_90_nights: one bulk
  SELECT to find missing dates, horizon fetched once, all inserts in a
  single transaction. Eliminates the 1–5s per-INSERT lock waits seen
  when multiple jobs were competing.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-17 11:29:37 +02:00
parent 01ccae5951
commit e94f7a0ca6
4 changed files with 120 additions and 82 deletions
+24 -2
View File
@@ -22,6 +22,7 @@ use crate::catalog::force_refresh_catalog;
#[derive(Clone)]
pub struct AppState {
pub pool: SqlitePool,
pub job_lock: std::sync::Arc<tokio::sync::Semaphore>,
}
#[derive(Debug, thiserror::Error)]
@@ -48,8 +49,8 @@ impl IntoResponse for AppError {
}
}
pub fn build_router(pool: SqlitePool) -> Router {
let state = AppState { pool };
pub fn build_router(pool: SqlitePool, job_lock: std::sync::Arc<tokio::sync::Semaphore>) -> Router {
let state = AppState { pool, job_lock };
// Gallery static files
let gallery_dir = std::path::PathBuf::from("/data/gallery");
@@ -118,8 +119,13 @@ pub fn build_router(pool: SqlitePool) -> Router {
async fn catalog_refresh(
axum::extract::State(state): axum::extract::State<AppState>,
) -> Result<Json<serde_json::Value>, AppError> {
let permit = match state.job_lock.clone().try_acquire_owned() {
Ok(p) => p,
Err(_) => return Ok(Json(serde_json::json!({ "status": "already_running" }))),
};
let pool = state.pool.clone();
tokio::spawn(async move {
let _permit = permit;
match force_refresh_catalog(&pool).await {
Ok(n) => tracing::info!("Manual catalog refresh complete: {} objects", n),
Err(e) => tracing::error!("Manual catalog refresh failed: {}", e),
@@ -131,8 +137,13 @@ async fn catalog_refresh(
async fn catalog_rebuild(
axum::extract::State(state): axum::extract::State<AppState>,
) -> Result<Json<serde_json::Value>, AppError> {
let permit = match state.job_lock.clone().try_acquire_owned() {
Ok(p) => p,
Err(_) => return Ok(Json(serde_json::json!({ "status": "already_running" }))),
};
let pool = state.pool.clone();
tokio::spawn(async move {
let _permit = permit;
match catalog_rebuild_task(&pool).await {
Ok(stats) => tracing::info!("Manual catalog rebuild complete: {} objects", stats.total),
Err(e) => tracing::error!("Manual catalog rebuild failed: {}", e),
@@ -184,8 +195,13 @@ async fn catalog_rebuild_task(pool: &SqlitePool) -> Result<RebuildStats, Box<dyn
async fn nightly_recompute(
axum::extract::State(state): axum::extract::State<AppState>,
) -> Result<Json<serde_json::Value>, AppError> {
let permit = match state.job_lock.clone().try_acquire_owned() {
Ok(p) => p,
Err(_) => return Ok(Json(serde_json::json!({ "status": "already_running" }))),
};
let pool = state.pool.clone();
tokio::spawn(async move {
let _permit = permit;
match crate::jobs::nightly::precompute_tonight(&pool).await {
Ok(()) => tracing::info!("Manual nightly recompute complete"),
Err(e) => tracing::error!("Manual nightly recompute failed: {}", e),
@@ -197,6 +213,11 @@ async fn nightly_recompute(
async fn factory_reset(
axum::extract::State(state): axum::extract::State<AppState>,
) -> Result<Json<serde_json::Value>, AppError> {
let permit = match state.job_lock.clone().try_acquire_owned() {
Ok(p) => p,
Err(_) => return Ok(Json(serde_json::json!({ "status": "already_running", "message": "A rebuild is already in progress." }))),
};
tracing::info!("Factory reset: clearing transient data...");
// Clear all computed/cached tables — preserve user data (imaging_log, gallery, phd2_logs, horizon, target_notes, custom_targets)
@@ -213,6 +234,7 @@ async fn factory_reset(
// Rebuild catalog + nightly precompute in background
let pool = state.pool.clone();
tokio::spawn(async move {
let _permit = permit;
match catalog_rebuild_task(&pool).await {
Ok(stats) => tracing::info!("Factory reset rebuild complete: {} objects", stats.total),
Err(e) => tracing::error!("Factory reset rebuild failed: {}", e),
+9 -2
View File
@@ -11,10 +11,14 @@ use self::weather_poll::start_weather_scheduler;
use crate::astronomy::astro_twilight;
use crate::config::{LAT, LON};
pub fn start_all_jobs(pool: SqlitePool) {
pub fn start_all_jobs(pool: SqlitePool, job_lock: std::sync::Arc<tokio::sync::Semaphore>) {
// Catalog refresh on startup (respects TTL)
let pool_cat = pool.clone();
let lock_cat = job_lock.clone();
tokio::spawn(async move {
// Catalog refresh is lightweight (skips if TTL not expired), but still grab lock
// to avoid running concurrently with a manual rebuild triggered right at startup.
let _permit = lock_cat.acquire().await;
run_catalog_refresh(pool_cat).await;
});
@@ -31,12 +35,15 @@ pub fn start_all_jobs(pool: SqlitePool) {
// Nightly precompute: run at dusk each day
let pool_night = pool.clone();
let lock_night = job_lock.clone();
tokio::spawn(async move {
loop {
// Run once immediately on startup
// Acquire lock before precompute — waits if a manual rebuild is in progress
let permit = lock_night.acquire().await;
if let Err(e) = precompute_tonight(&pool_night).await {
tracing::error!("Nightly precompute failed: {}", e);
}
drop(permit);
// Sleep until next dusk
sleep_until_next_dusk().await;
+81 -76
View File
@@ -19,14 +19,90 @@ struct CatalogObj {
pub async fn precompute_tonight(pool: &SqlitePool) -> anyhow::Result<()> {
let today = Utc::now().naive_utc().date();
precompute_for_date(pool, today).await?;
precompute_next_90_nights(pool, today).await?;
Ok(())
}
// Also precompute next 90 nights (lightweight)
for i in 1..=90i64 {
let date = today + Duration::days(i);
if let Err(e) = precompute_lightweight(pool, date).await {
tracing::warn!("Lightweight precompute for {} failed: {}", date, e);
/// Precompute the next 90 nights in a single pass:
/// - one bulk SELECT to find which dates are already done
/// - one transaction for all missing dates
async fn precompute_next_90_nights(pool: &SqlitePool, today: NaiveDate) -> anyhow::Result<()> {
let start_date = (today + Duration::days(1)).to_string();
let end_date = (today + Duration::days(90)).to_string();
// Find which dates already have data
let done: std::collections::HashSet<String> = sqlx::query_scalar(
"SELECT DISTINCT night_date FROM nightly_cache WHERE night_date > ? AND night_date <= ?",
)
.bind(&start_date)
.bind(&end_date)
.fetch_all(pool)
.await?
.into_iter()
.collect();
let missing_dates: Vec<NaiveDate> = (1..=90i64)
.map(|i| today + Duration::days(i))
.filter(|d| !done.contains(&d.to_string()))
.collect();
if missing_dates.is_empty() {
return Ok(());
}
// Fetch shared data once
let objects: Vec<CatalogObj> = sqlx::query_as::<_, (String, f64, f64, String)>(
"SELECT id, ra_deg, dec_deg, obj_type FROM catalog",
)
.fetch_all(pool)
.await?
.into_iter()
.map(|(id, ra, dec, ot)| CatalogObj { id, ra_deg: ra, dec_deg: dec, obj_type: ot })
.collect();
let horizon: Vec<HorizonPoint> = sqlx::query_as(
"SELECT az_deg, alt_deg FROM horizon ORDER BY az_deg",
)
.fetch_all(pool)
.await?;
// Insert all missing dates in a single transaction
let mut tx = pool.begin().await?;
for date in &missing_dates {
let (dusk, dawn) = match astro_twilight(*date, LAT, LON) {
Ok(v) => v,
Err(e) => { tracing::warn!("Twilight failed for {}: {}", date, e); continue; }
};
let midnight = dusk + (dawn - dusk) / 2;
let jd = julian_date(midnight);
let (moon_ra, moon_dec) = moon_position(jd);
let moon_illum = moon_illumination(jd);
let moon_alt = moon_altitude(jd, LAT, LON);
let moon_state = MoonState { ra_deg: moon_ra, dec_deg: moon_dec, illumination: moon_illum, alt_at_midnight: moon_alt };
let window = TonightWindow { dusk, dawn };
let date_str = date.to_string();
for obj in &objects {
let vis = compute_visibility(obj.ra_deg, obj.dec_deg, &window, &horizon, &moon_state);
let rec_filter = top_filter(&obj.obj_type, moon_illum * 100.0, moon_alt, vis.moon_sep_deg);
sqlx::query(
r#"INSERT OR IGNORE INTO nightly_cache
(catalog_id, night_date, max_alt_deg, transit_utc, usable_min, recommended_filter, is_visible_tonight)
VALUES (?, ?, ?, ?, ?, ?, ?)"#,
)
.bind(&obj.id)
.bind(&date_str)
.bind(vis.max_alt_deg)
.bind(vis.transit_utc.map(|t: DateTime<Utc>| t.to_rfc3339()))
.bind(vis.usable_min as i32)
.bind(&rec_filter)
.bind(vis.is_visible_tonight as i32)
.execute(&mut *tx)
.await?;
}
}
tx.commit().await?;
tracing::info!("Lightweight precompute: inserted {} nights ({} objects each)", missing_dates.len(), objects.len());
Ok(())
}
@@ -159,74 +235,3 @@ pub async fn precompute_for_date(pool: &SqlitePool, date: NaiveDate) -> anyhow::
Ok(())
}
/// Lightweight precompute: only max_alt, transit, usable_min, recommended_filter.
/// Skips full visibility curve for performance.
async fn precompute_lightweight(pool: &SqlitePool, date: NaiveDate) -> anyhow::Result<()> {
// Check if already computed
let existing: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM nightly_cache WHERE night_date = ?",
)
.bind(date.to_string())
.fetch_one(pool)
.await?;
if existing > 0 {
return Ok(());
}
let (dusk, dawn) = astro_twilight(date, LAT, LON)?;
let midnight = dusk + (dawn - dusk) / 2;
let jd = julian_date(midnight);
let (moon_ra, moon_dec) = moon_position(jd);
let moon_illum = moon_illumination(jd);
let moon_alt = moon_altitude(jd, LAT, LON);
let horizon: Vec<HorizonPoint> = sqlx::query_as(
"SELECT az_deg, alt_deg FROM horizon ORDER BY az_deg",
)
.fetch_all(pool)
.await?;
let moon_state = MoonState {
ra_deg: moon_ra,
dec_deg: moon_dec,
illumination: moon_illum,
alt_at_midnight: moon_alt,
};
let window = TonightWindow { dusk, dawn };
let objects: Vec<CatalogObj> = sqlx::query_as::<_, (String, f64, f64, String)>(
"SELECT id, ra_deg, dec_deg, obj_type FROM catalog",
)
.fetch_all(pool)
.await?
.into_iter()
.map(|(id, ra, dec, ot)| CatalogObj { id, ra_deg: ra, dec_deg: dec, obj_type: ot })
.collect();
let date_str = date.to_string();
let mut tx = pool.begin().await?;
for obj in &objects {
let vis = compute_visibility(obj.ra_deg, obj.dec_deg, &window, &horizon, &moon_state);
let rec_filter = top_filter(&obj.obj_type, moon_illum * 100.0, moon_alt, vis.moon_sep_deg);
sqlx::query(
r#"INSERT OR IGNORE INTO nightly_cache
(catalog_id, night_date, max_alt_deg, transit_utc, usable_min, recommended_filter, is_visible_tonight)
VALUES (?, ?, ?, ?, ?, ?, ?)"#,
)
.bind(&obj.id)
.bind(&date_str)
.bind(vis.max_alt_deg)
.bind(vis.transit_utc.map(|t: DateTime<Utc>| t.to_rfc3339()))
.bind(vis.usable_min as i32)
.bind(&rec_filter)
.bind(vis.is_visible_tonight as i32)
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
Ok(())
}
+6 -2
View File
@@ -26,8 +26,12 @@ async fn main() -> anyhow::Result<()> {
tracing::info!("Connecting to database: {}", database_url);
let pool = db::init_db(&database_url).await?;
// Shared semaphore: ensures only one heavy background job runs at a time
// (catalog rebuild, nightly precompute, factory reset)
let job_lock = std::sync::Arc::new(tokio::sync::Semaphore::new(1));
// Start background jobs
jobs::start_all_jobs(pool.clone());
jobs::start_all_jobs(pool.clone(), job_lock.clone());
// Build router
let cors = CorsLayer::new()
@@ -35,7 +39,7 @@ async fn main() -> anyhow::Result<()> {
.allow_methods(Any)
.allow_headers(Any);
let app = api::build_router(pool).layer(cors);
let app = api::build_router(pool, job_lock).layer(cors);
let bind_addr = "0.0.0.0:3001";
tracing::info!("Starting server on {}", bind_addr);