pub mod abell_gc; pub mod abell_pn; pub mod barnard; pub mod pgc; pub mod caldwell; pub mod collinder; pub mod fetch; pub mod filter; pub mod gum; pub mod lbn; pub mod ldn; pub mod melotte; pub mod popular_names; pub mod rcw; pub mod sh2; pub mod vdb; use anyhow::Context; use sqlx::SqlitePool; use self::fetch::fetch_opengc; use self::filter::{compute_derived, is_suitable, CatalogEntry}; use self::popular_names::popular_names; const CATALOG_TTL_SECS: i64 = 7 * 24 * 3600; // Bump this string whenever catalog ingestion logic changes. pub const CATALOG_VERSION: &str = "v11-pgc"; /// Force a full catalog re-ingest regardless of TTL or version. pub async fn force_refresh_catalog(pool: &SqlitePool) -> anyhow::Result { // Clear version so next call to refresh_catalog unconditionally re-ingests sqlx::query("DELETE FROM settings WHERE key = 'catalog_version'") .execute(pool) .await?; do_refresh(pool).await } /// Check if catalog needs refresh and fetch+rebuild if so. pub async fn refresh_catalog(pool: &SqlitePool) -> anyhow::Result<()> { let now = chrono::Utc::now().timestamp(); let last_fetch: Option = sqlx::query_scalar("SELECT MAX(fetched_at) FROM catalog") .fetch_optional(pool) .await? .flatten(); let stored_version: Option = sqlx::query_scalar("SELECT value FROM settings WHERE key = 'catalog_version'") .fetch_optional(pool) .await .unwrap_or(None); let version_stale = stored_version.as_deref() != Some(CATALOG_VERSION); if let Some(last) = last_fetch { if now - last < CATALOG_TTL_SECS && !version_stale { tracing::info!("Catalog is up to date (last fetched {} seconds ago)", now - last); return Ok(()); } } if version_stale { tracing::info!("Catalog version changed to {} — forcing re-ingest", CATALOG_VERSION); } do_refresh(pool).await?; Ok(()) } async fn do_refresh(pool: &SqlitePool) -> anyhow::Result { let entries = build_catalog().await?; let count = entries.len(); tracing::info!("Upserting {} total catalog entries...", count); upsert_entries(pool, &entries).await?; sqlx::query("INSERT OR REPLACE INTO settings (key, value) VALUES ('catalog_version', ?)") .bind(CATALOG_VERSION) .execute(pool) .await?; tracing::info!("Catalog refresh complete: {} objects", count); Ok(count) } /// Build catalog entries from all sources without upserting to database. /// Useful for testing, validation, and dry-run operations. pub async fn build_catalog() -> anyhow::Result> { // Fetch all sources in parallel tracing::info!("Refreshing catalog from OpenNGC + Sh2 + VdB + LDN + Barnard + LBN + Gum + RCW + AbellPN + AbellGC + PGC..."); let (ngc_rows_res, sh2_res, vdb_res, ldn_res, barnard_res, lbn_res, gum_res, rcw_res, abell_pn_res, abell_gc_res, pgc_res) = tokio::join!( fetch_opengc(), sh2::fetch_sh2(), vdb::fetch_vdb(), ldn::fetch_ldn(), barnard::fetch_barnard(), lbn::fetch_lbn(), gum::fetch_gum(), rcw::fetch_rcw(), abell_pn::fetch_abell_pn(), abell_gc::fetch_abell_gc(), pgc::fetch_pgc(), ); let names = popular_names(); let ngc_rows = ngc_rows_res.context("OpenNGC fetch failed")?; let suitable: Vec<_> = ngc_rows.iter().filter(|r| is_suitable(r)).collect(); tracing::info!("OpenNGC: {}/{} rows suitable (RA/Dec valid + known type)", suitable.len(), ngc_rows.len()); let mut entries: Vec = suitable .iter() .filter_map(|r| compute_derived(r, &names)) .collect(); tracing::info!("OpenNGC: {}/{} rows successfully derived to entries", entries.len(), suitable.len()); // Deduplicate Sh2 entries against NGC/IC objects that may share coordinates. // We track IDs already present so Sh2 aliases for NGC objects with existing // entries (e.g. Sh2-100 = IC1318 already in catalog) are skipped. let existing_ids: std::collections::HashSet = entries.iter().map(|e| e.id.clone()).collect(); match sh2_res { Ok(sh2_entries) => { let before = entries.len(); // Only add Sh2 entries whose ID is not already a primary catalog entry. // (OpenNGC already covers many of these via its Identifiers column.) let new_sh2: Vec<_> = sh2_entries.into_iter() .filter(|e| !existing_ids.contains(&e.id)) .collect(); tracing::info!("Adding {} Sh2 entries (non-duplicate)", new_sh2.len()); entries.extend(new_sh2); tracing::info!("Catalog after Sh2: {} entries (was {})", entries.len(), before); } Err(e) => tracing::warn!("Sh2 fetch failed (skipping): {}", e), } match vdb_res { Ok(vdb_entries) => { tracing::info!("Adding {} VdB entries", vdb_entries.len()); entries.extend(vdb_entries); } Err(e) => tracing::warn!("VdB fetch failed (skipping): {}", e), } match ldn_res { Ok(ldn_entries) => { tracing::info!("Adding {} LDN entries", ldn_entries.len()); entries.extend(ldn_entries); } Err(e) => tracing::warn!("LDN fetch failed (skipping): {}", e), } // Barnard dark nebulae — deduplicate against LDN by position (2' radius) let existing_coords: Vec<(f64, f64)> = entries.iter().map(|e| (e.ra_deg, e.dec_deg)).collect(); match barnard_res { Ok(barnard_entries) => { let new_barnard: Vec<_> = barnard_entries.into_iter() .filter(|e| { !existing_coords.iter().any(|(ra, dec)| { let dra = (e.ra_deg - ra).abs().min(360.0 - (e.ra_deg - ra).abs()); let ddec = (e.dec_deg - dec).abs(); (dra * dra + ddec * ddec).sqrt() < 0.033 // ~2 arcmin }) }) .collect(); tracing::info!("Adding {} Barnard dark nebula entries (after dedup)", new_barnard.len()); entries.extend(new_barnard); } Err(e) => tracing::warn!("Barnard fetch failed (skipping): {}", e), } // LBN nebulae — deduplicate against existing NGC/IC/Sh2 let existing_coords: Vec<(f64, f64)> = entries.iter().map(|e| (e.ra_deg, e.dec_deg)).collect(); match lbn_res { Ok(lbn_entries) => { let new_lbn: Vec<_> = lbn_entries.into_iter() .filter(|e| { !existing_coords.iter().any(|(ra, dec)| { let dra = (e.ra_deg - ra).abs().min(360.0 - (e.ra_deg - ra).abs()); let ddec = (e.dec_deg - dec).abs(); (dra * dra + ddec * ddec).sqrt() < 0.033 // ~2 arcmin }) }) .collect(); tracing::info!("Adding {} LBN entries (after dedup)", new_lbn.len()); entries.extend(new_lbn); } Err(e) => tracing::warn!("LBN fetch failed (skipping): {}", e), } // Melotte standalone entries (very large clusters without NGC IDs) let melotte_standalone = melotte::get_standalone_melotte(); let existing_ids: std::collections::HashSet = entries.iter().map(|e| e.id.clone()).collect(); let new_melotte: Vec<_> = melotte_standalone.into_iter() .filter(|e| !existing_ids.contains(&e.id)) .collect(); tracing::info!("Adding {} standalone Melotte entries", new_melotte.len()); entries.extend(new_melotte); // Collinder standalone entries let collinder_standalone = collinder::get_standalone_collinder(); { let existing_ids: std::collections::HashSet = entries.iter().map(|e| e.id.clone()).collect(); let new_collinder: Vec<_> = collinder_standalone.into_iter() .filter(|e| !existing_ids.contains(&e.id)) .collect(); tracing::info!("Adding {} standalone Collinder entries", new_collinder.len()); entries.extend(new_collinder); } // Gum HII regions — deduplicate by position against existing catalog match gum_res { Ok(gum_entries) => { let existing_coords: Vec<(f64, f64)> = entries.iter().map(|e| (e.ra_deg, e.dec_deg)).collect(); let new_gum: Vec<_> = gum_entries.into_iter() .filter(|e| { !existing_coords.iter().any(|(ra, dec)| { let dra = (e.ra_deg - ra).abs().min(360.0 - (e.ra_deg - ra).abs()); let ddec = (e.dec_deg - dec).abs(); (dra * dra + ddec * ddec).sqrt() < 0.033 }) }) .collect(); tracing::info!("Adding {} Gum entries (after dedup)", new_gum.len()); entries.extend(new_gum); } Err(e) => tracing::warn!("Gum fetch failed (skipping): {}", e), } // RCW HII regions — deduplicate by position match rcw_res { Ok(rcw_entries) => { let existing_coords: Vec<(f64, f64)> = entries.iter().map(|e| (e.ra_deg, e.dec_deg)).collect(); let new_rcw: Vec<_> = rcw_entries.into_iter() .filter(|e| { !existing_coords.iter().any(|(ra, dec)| { let dra = (e.ra_deg - ra).abs().min(360.0 - (e.ra_deg - ra).abs()); let ddec = (e.dec_deg - dec).abs(); (dra * dra + ddec * ddec).sqrt() < 0.033 }) }) .collect(); tracing::info!("Adding {} RCW entries (after dedup)", new_rcw.len()); entries.extend(new_rcw); } Err(e) => tracing::warn!("RCW fetch failed (skipping): {}", e), } // Abell PN — deduplicate against NGC/IC PNe by position match abell_pn_res { Ok(abell_entries) => { let existing_coords: Vec<(f64, f64)> = entries.iter().map(|e| (e.ra_deg, e.dec_deg)).collect(); let new_abell: Vec<_> = abell_entries.into_iter() .filter(|e| { !existing_coords.iter().any(|(ra, dec)| { let dra = (e.ra_deg - ra).abs().min(360.0 - (e.ra_deg - ra).abs()); let ddec = (e.dec_deg - dec).abs(); (dra * dra + ddec * ddec).sqrt() < 0.033 }) }) .collect(); tracing::info!("Adding {} Abell PN entries (after dedup)", new_abell.len()); entries.extend(new_abell); } Err(e) => tracing::warn!("Abell PN fetch failed (skipping): {}", e), } // Abell Galaxy Clusters — unique IDs, no dedup needed (galaxy_cluster is a new type) match abell_gc_res { Ok(abell_gc_entries) => { let existing_ids: std::collections::HashSet = entries.iter().map(|e| e.id.clone()).collect(); let new_gc: Vec<_> = abell_gc_entries.into_iter() .filter(|e| !existing_ids.contains(&e.id)) .collect(); tracing::info!("Adding {} Abell Galaxy Cluster entries", new_gc.len()); entries.extend(new_gc); } Err(e) => tracing::warn!("Abell GC fetch failed (skipping): {}", e), } // PGC bright subset — deduplicate against NGC/IC by position (2' radius) match pgc_res { Ok(pgc_entries) => { let existing_coords: Vec<(f64, f64)> = entries.iter().map(|e| (e.ra_deg, e.dec_deg)).collect(); let new_pgc: Vec<_> = pgc_entries.into_iter() .filter(|e| { !existing_coords.iter().any(|(ra, dec)| { let dra = (e.ra_deg - ra).abs().min(360.0 - (e.ra_deg - ra).abs()); let ddec = (e.dec_deg - dec).abs(); (dra * dra + ddec * ddec).sqrt() < 0.033 }) }) .collect(); tracing::info!("Adding {} PGC bright galaxy entries (after dedup)", new_pgc.len()); entries.extend(new_pgc); } Err(e) => tracing::warn!("PGC fetch failed (skipping): {}", e), } Ok(entries) } pub async fn upsert_entries(pool: &SqlitePool, entries: &[CatalogEntry]) -> anyhow::Result<()> { let mut tx = pool.begin().await?; for e in entries { sqlx::query( r#"INSERT OR REPLACE INTO catalog (id, name, common_name, obj_type, ra_deg, dec_deg, ra_h, dec_dms, constellation, size_arcmin_maj, size_arcmin_min, pos_angle_deg, mag_v, surface_brightness, hubble_type, messier_num, is_highlight, fov_fill_pct, mosaic_flag, mosaic_panels_w, mosaic_panels_h, difficulty, guide_star_density, fetched_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"#, ) .bind(&e.id) .bind(&e.name) .bind(&e.common_name) .bind(&e.obj_type) .bind(e.ra_deg) .bind(e.dec_deg) .bind(&e.ra_h) .bind(&e.dec_dms) .bind(&e.constellation) .bind(e.size_arcmin_maj) .bind(e.size_arcmin_min) .bind(e.pos_angle_deg) .bind(e.mag_v) .bind(e.surface_brightness) .bind(&e.hubble_type) .bind(e.messier_num) .bind(e.is_highlight) .bind(e.fov_fill_pct) .bind(e.mosaic_flag) .bind(e.mosaic_panels_w) .bind(e.mosaic_panels_h) .bind(e.difficulty) .bind(e.guide_star_density.as_deref()) .bind(e.fetched_at) .execute(&mut *tx) .await?; } tx.commit().await?; // Populate Caldwell numbers for (num, id) in caldwell::caldwell_map() { let _ = sqlx::query("UPDATE catalog SET caldwell_num = ? WHERE id = ?") .bind(num) .bind(id) .execute(pool) .await; } // Populate Arp numbers for (num, id) in caldwell::arp_map() { let _ = sqlx::query("UPDATE catalog SET arp_num = ? WHERE id = ?") .bind(num) .bind(id) .execute(pool) .await; } // Populate Melotte numbers for (num, id) in melotte::melotte_map() { let _ = sqlx::query("UPDATE catalog SET melotte_num = ? WHERE id = ?") .bind(num) .bind(id) .execute(pool) .await; } // Populate Collinder numbers for (num, id) in collinder::collinder_map() { let _ = sqlx::query("UPDATE catalog SET collinder_num = ? WHERE id = ?") .bind(num) .bind(id) .execute(pool) .await; } Ok(()) }