auth dns over atproto
at main 182 lines 6.0 kB view raw
1use std::sync::Arc; 2use std::time::{Duration, Instant}; 3 4use anyhow::Result; 5use onis_common::config::OnisConfig; 6use tracing_subscriber::EnvFilter; 7 8mod api; 9mod checker; 10mod client; 11 12use checker::Checker; 13use client::AppviewClient; 14 15#[tokio::main] 16async fn main() -> Result<()> { 17 tracing_subscriber::fmt() 18 .with_env_filter(EnvFilter::from_default_env()) 19 .json() 20 .init(); 21 22 let metrics_handle = onis_common::metrics::init() 23 .map_err(|e| anyhow::anyhow!("failed to install metrics recorder: {e}"))?; 24 25 metrics::describe_counter!( 26 "verification_checks_total", 27 "Total verification checks" 28 ); 29 metrics::describe_histogram!( 30 "verification_check_duration_seconds", 31 "Verification check duration" 32 ); 33 metrics::describe_gauge!( 34 "verification_zones_verified", 35 "Number of verified zones" 36 ); 37 metrics::describe_gauge!( 38 "verification_zones_unverified", 39 "Number of unverified zones" 40 ); 41 42 let config = OnisConfig::load()?; 43 let cfg = &config.verify; 44 45 let nameservers = cfg.parse_nameservers() 46 .map_err(|e| anyhow::anyhow!("invalid nameserver address: {e}"))?; 47 48 tracing::info!( 49 appview_url = %cfg.appview_url, 50 port = cfg.port, 51 check_interval = cfg.check_interval, 52 recheck_interval = cfg.recheck_interval, 53 expected_ns = ?cfg.expected_ns, 54 "onis-verify starting" 55 ); 56 57 let client = AppviewClient::new(cfg.appview_url.clone()); 58 let checker = Checker::new(cfg.expected_ns.clone(), nameservers, cfg.dns_port); 59 60 let state = Arc::new(api::VerifyState { 61 checker, 62 client, 63 metrics_handle, 64 }); 65 66 let api_bind = format!("{}:{}", cfg.bind, cfg.port); 67 let api_state = state.clone(); 68 let api_handle = tokio::spawn(async move { 69 let app = api::router(api_state); 70 let listener = tokio::net::TcpListener::bind(&api_bind).await.unwrap(); 71 tracing::info!("verify API listening on {api_bind}"); 72 axum::serve(listener, app).await.unwrap(); 73 }); 74 75 let check_interval = Duration::from_secs(cfg.check_interval); 76 let recheck_interval = cfg.recheck_interval; 77 let scheduler_state = state.clone(); 78 let scheduler_handle = tokio::spawn(async move { 79 run_scheduler(scheduler_state, check_interval, recheck_interval).await; 80 }); 81 82 tokio::select! { 83 _ = api_handle => tracing::error!("verify API exited unexpectedly"), 84 _ = scheduler_handle => tracing::error!("scheduler exited unexpectedly"), 85 _ = tokio::signal::ctrl_c() => tracing::info!("shutting down"), 86 } 87 88 Ok(()) 89} 90 91async fn run_scheduler(state: Arc<api::VerifyState>, interval: Duration, recheck_interval: i64) { 92 loop { 93 tokio::time::sleep(interval).await; 94 95 let now = chrono::Utc::now().timestamp(); 96 let stale_threshold = now - recheck_interval; 97 98 let zones = match state.client.get_stale_zones(stale_threshold).await { 99 Ok(z) => z, 100 Err(e) => { 101 tracing::error!(error = %e, "failed to fetch stale zones"); 102 continue; 103 } 104 }; 105 106 if zones.is_empty() { 107 tracing::debug!("no stale zones to check"); 108 continue; 109 } 110 111 tracing::info!(count = zones.len(), "checking stale zones"); 112 113 let mut verified_count: u64 = 0; 114 let mut unverified_count: u64 = 0; 115 116 for zone_entry in &zones { 117 let check_start = Instant::now(); 118 119 let result = match state.checker.check_zone(&zone_entry.zone, &zone_entry.did).await { 120 Ok(r) => r, 121 Err(e) => { 122 tracing::warn!( 123 zone = %zone_entry.zone, 124 did = %zone_entry.did, 125 error = %e, 126 "verification check failed" 127 ); 128 metrics::counter!("verification_checks_total", "result" => "error") 129 .increment(1); 130 metrics::histogram!("verification_check_duration_seconds") 131 .record(check_start.elapsed().as_secs_f64()); 132 continue; 133 } 134 }; 135 136 let result_label = if result.verified { "verified" } else { "unverified" }; 137 metrics::counter!("verification_checks_total", "result" => result_label).increment(1); 138 metrics::histogram!("verification_check_duration_seconds") 139 .record(check_start.elapsed().as_secs_f64()); 140 141 if result.verified { 142 verified_count += 1; 143 } else { 144 unverified_count += 1; 145 } 146 147 if let Err(e) = state 148 .client 149 .set_verification(&zone_entry.zone, &zone_entry.did, result.verified) 150 .await 151 { 152 tracing::error!( 153 zone = %zone_entry.zone, 154 did = %zone_entry.did, 155 error = %e, 156 "failed to update verification status" 157 ); 158 continue; 159 } 160 161 if result.verified != zone_entry.verified { 162 tracing::info!( 163 zone = %zone_entry.zone, 164 did = %zone_entry.did, 165 old = zone_entry.verified, 166 new = result.verified, 167 ns = ?result.ns_records, 168 "verification status changed" 169 ); 170 } else { 171 tracing::debug!( 172 zone = %zone_entry.zone, 173 verified = result.verified, 174 "verification status unchanged" 175 ); 176 } 177 } 178 179 metrics::gauge!("verification_zones_verified").set(verified_count as f64); 180 metrics::gauge!("verification_zones_unverified").set(unverified_count as f64); 181 } 182}