this repo has no description

better reconnection logic

vielle.dev 98ec0e0f 3a1ce8e0

verified
+22 -10
+4 -1
src/config/mod.rs
··· 11 11 use std::env; 12 12 13 13 pub const DB_MAX_REQ: usize = 65535; 14 + pub const MAX_RECONNECT_ATTEMPTS: usize = 30; 14 15 15 16 mod config_value; 16 17 ··· 80 81 resolver 81 82 .pds_for_did(&self::USER_DID) 82 83 .await 83 - .expect("The did document for a user should contain a #atproto_pds service with a url") 84 + .expect( 85 + "The did document for a user should contain a #atproto_pds service with a url", 86 + ) 84 87 .domain() 85 88 .expect("A users pds should have a domain.") 86 89 .to_string()
+18 -9
src/ingest/queue.rs
··· 47 47 // if it failed, try reconnect 10 times, waiting a second between each attempt 48 48 Err(_) => { 49 49 let mut new_messages = None; 50 - for _ in 0..10 { 51 - let Ok(stream) = 52 - client.subscribe(&SubscribeRepos::new().build()).await 53 - else { 54 - // wait a second 55 - thread::sleep(Duration::from_secs(1)); 56 - continue; 57 - }; 58 - new_messages = Some(stream.into_stream().1) 50 + for i in 0..config::MAX_RECONNECT_ATTEMPTS { 51 + new_messages = match client 52 + .subscribe(&SubscribeRepos::new().build()) 53 + .await 54 + { 55 + Ok(val) => Some(val.into_stream().1), 56 + Err(err) => { 57 + eprintln!( 58 + "Warning: Error: {} ({}/{})", 59 + err, 60 + i + 1, 61 + config::MAX_RECONNECT_ATTEMPTS 62 + ); 63 + thread::sleep(Duration::from_secs(1)); 64 + continue; 65 + } 66 + } 59 67 } 60 68 61 69 if let Some(new_messages) = new_messages { 70 + println!("Reconnected."); 62 71 new_messages 63 72 } else { 64 73 // could not reconnect so just die lmao