Mount an atproto PDS repository as a FUSE filesystem (write-up: oppi.li/posts/mounting_the_atmosphere/).
Branch: main · 305 lines · 10 kB (view raw)
1mod client; 2mod error; 3mod firehose; 4mod fs; 5mod resolver; 6 7use atrium_api::{client::AtpServiceClient, com, types}; 8use atrium_common::resolver::Resolver; 9use atrium_identity::identity_resolver::ResolvedIdentity; 10use atrium_repo::{Repository, blockstore::CarStore}; 11use atrium_xrpc_client::isahc::IsahcClient; 12use fuser::MountOption; 13use futures::{StreamExt, stream}; 14use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; 15use std::{ 16 io::{Cursor, Read as _, Write as _}, 17 os::unix::net::UnixStream, 18 path::PathBuf, 19 process::Command, 20 sync::Arc, 21}; 22 23fn main() { 24 let matches = clap::command!() 25 .arg( 26 clap::Arg::new("handles") 27 .index(1) 28 .required(true) 29 .num_args(1..) 30 .help("One or more handles to download and mount"), 31 ) 32 .arg( 33 clap::Arg::new("mountpoint") 34 .short('m') 35 .action(clap::ArgAction::Set) 36 .value_parser(clap::value_parser!(PathBuf)), 37 ) 38 .arg( 39 clap::Arg::new("daemon") 40 .short('d') 41 .long("daemon") 42 .action(clap::ArgAction::SetTrue) 43 .help("Run in background and exit parent when mount is ready"), 44 ) 45 .arg( 46 clap::Arg::new("__child") 47 .long("__child") 48 .hide(true) 49 .action(clap::ArgAction::Set), 50 ) 51 .get_matches(); 52 53 let handles: Vec<String> = matches.get_many::<String>("handles").unwrap().cloned().collect(); 54 let mountpoint = matches.get_one::<PathBuf>("mountpoint").cloned() 55 .unwrap_or_else(|| PathBuf::from("mnt")); 56 57 // Daemon mode: spawn child and wait for ready signal 58 if matches.get_flag("daemon") { 59 let (mut rx, tx) = UnixStream::pair().unwrap(); 60 rx.set_read_timeout(Some(std::time::Duration::from_secs(120))).ok(); 61 62 use std::os::unix::io::AsRawFd; 63 let fd = tx.as_raw_fd(); 64 let exe = std::env::current_exe().unwrap(); 65 66 unsafe { 67 use std::os::unix::process::CommandExt; 68 let mut cmd = Command::new(&exe); 69 cmd.args(std::env::args().skip(1).filter(|a| a != "-d" && a != "--daemon")); 70 
cmd.arg("--__child").arg(fd.to_string()); 71 cmd.pre_exec(move || { 72 libc::fcntl(fd, libc::F_SETFD, 0); // clear close-on-exec 73 libc::setsid(); 74 Ok(()) 75 }); 76 cmd.spawn().expect("Failed to spawn daemon"); 77 } 78 drop(tx); 79 80 let mut buf = [0u8; 1]; 81 match rx.read_exact(&mut buf) { 82 Ok(_) => std::process::exit(0), 83 Err(e) => { eprintln!("Daemon failed: {}", e); std::process::exit(1); } 84 } 85 } 86 87 // Child mode: signal parent when ready 88 let signal_fd: Option<i32> = matches.get_one::<String>("__child") 89 .and_then(|s| s.parse().ok()); 90 91 let rt = tokio::runtime::Runtime::new().unwrap(); 92 93 // Clean up any stale mount before proceeding 94 cleanup_stale_mount(&mountpoint); 95 let _ = std::fs::create_dir_all(&mountpoint); 96 97 let resolver = Arc::new(resolver::id_resolver()); 98 let bars = Arc::new(MultiProgress::new()); 99 let repos = rt.block_on( 100 stream::iter(handles) 101 .then(|handle| { 102 let h = handle.clone(); 103 let r = Arc::clone(&resolver); 104 let b = Arc::clone(&bars); 105 async move { 106 let id = r.resolve(&h).await?; 107 let bytes = cached_download(&id, &b).await?; 108 let repo = build_repo(bytes).await?; 109 Ok::<_, error::Error>((id.did.clone(), id.pds.clone(), repo)) 110 } 111 }) 112 .collect::<Vec<_>>(), 113 ); 114 let (success, errors): (Vec<_>, Vec<_>) = repos.into_iter().partition(|r| r.is_ok()); 115 for e in errors { 116 eprintln!("{:?}", e.as_ref().unwrap_err()); 117 } 118 let repos_with_pds: Vec<_> = success 119 .into_iter() 120 .map(|s| s.unwrap()) 121 .collect(); 122 123 // construct the fs 124 let mut fs = fs::PdsFs::new(); 125 126 // Extract (did, pds) pairs for WebSocket tasks before consuming repos 127 let did_pds_pairs: Vec<_> = repos_with_pds.iter() 128 .map(|(did, pds, _)| (did.clone(), pds.clone())) 129 .collect(); 130 131 // Consume repos_with_pds to add repos to filesystem 132 for (did, _, repo) in repos_with_pds { 133 rt.block_on(fs.add(did, repo)) 134 } 135 136 // get shared state for 
WebSocket tasks 137 let (_repos_arc, inodes_arc, sizes_arc, content_cache_arc) = fs.get_shared_state(); 138 139 // mount 140 let mut options = vec![ 141 MountOption::RO, 142 MountOption::FSName("pdsfs".to_string()), 143 MountOption::AllowOther, 144 ]; 145 146 // add macOS-specific options 147 #[cfg(target_os = "macos")] 148 { 149 options.push(MountOption::CUSTOM("local".to_string())); 150 options.push(MountOption::CUSTOM("volname=pdsfs".to_string())); 151 } 152 153 // Create session and get notifier for Finder refresh 154 let session = fuser::Session::new(fs, &mountpoint, &options).unwrap(); 155 let notifier = session.notifier(); 156 let _bg = session.spawn().unwrap(); 157 158 // spawn WebSocket subscription tasks for each DID using the runtime handle 159 let rt_handle = rt.handle().clone(); 160 for (did, pds) in did_pds_pairs { 161 let inodes_clone = Arc::clone(&inodes_arc); 162 let sizes_clone = Arc::clone(&sizes_arc); 163 let content_cache_clone = Arc::clone(&content_cache_arc); 164 let notifier_clone = notifier.clone(); 165 166 rt_handle.spawn(async move { 167 if let Err(e) = firehose::subscribe_to_repo::<atrium_repo::blockstore::CarStore<std::io::Cursor<Vec<u8>>>>( 168 did, 169 pds, 170 inodes_clone, 171 sizes_clone, 172 content_cache_clone, 173 notifier_clone, 174 ).await { 175 eprintln!("WebSocket error: {:?}", e); 176 } 177 }); 178 } 179 180 // Signal parent if in daemon mode 181 if let Some(fd) = signal_fd { 182 use std::os::unix::io::FromRawFd; 183 let mut sock = unsafe { UnixStream::from_raw_fd(fd) }; 184 let _ = sock.write_all(b"R"); 185 } else { 186 eprintln!("mounted at {mountpoint:?}"); 187 eprintln!("ctrl-c to unmount"); 188 } 189 190 // Wait for ctrl-c 191 rt.block_on(tokio::signal::ctrl_c()).ok(); 192 eprintln!("unmounted {mountpoint:?}"); 193} 194 195async fn cached_download( 196 id: &ResolvedIdentity, 197 m: &MultiProgress, 198) -> Result<Vec<u8>, error::Error> { 199 let mut pb = ProgressBar::new_spinner(); 200 pb.set_style( 201 
ProgressStyle::default_spinner() 202 .template("{spinner:.green} [{elapsed_precise}] {msg}") 203 .unwrap() 204 .tick_strings(&["", "", "", "", "", "", "", "", "", ""]), 205 ); 206 pb.enable_steady_tick(std::time::Duration::from_millis(100)); 207 pb = m.add(pb); 208 209 // Always download fresh - no caching for now to ensure up-to-date data 210 pb.set_message(format!("downloading CAR file for...{}", id.did)); 211 let bytes = download_car_file(id, &pb).await?; 212 213 pb.finish(); 214 Ok(bytes) 215} 216 217async fn download_car_file( 218 id: &ResolvedIdentity, 219 pb: &ProgressBar, 220) -> Result<Vec<u8>, error::Error> { 221 // download the entire car file first before mounting it as a fusefs 222 let client = AtpServiceClient::new(IsahcClient::new(&id.pds)); 223 let did = types::string::Did::new(id.did.clone()).unwrap(); 224 225 let bytes = client 226 .service 227 .com 228 .atproto 229 .sync 230 .get_repo(com::atproto::sync::get_repo::Parameters::from( 231 com::atproto::sync::get_repo::ParametersData { did, since: None }, 232 )) 233 .await?; 234 235 pb.finish_with_message(format!("download complete for \t...\t{}", id.did)); 236 237 Ok(bytes) 238} 239 240async fn build_repo(bytes: Vec<u8>) -> Result<Repository<CarStore<Cursor<Vec<u8>>>>, error::Error> { 241 let store = CarStore::open(Cursor::new(bytes)).await?; 242 let root = store.roots().next().unwrap(); 243 let repo = Repository::open(store, root).await?; 244 Ok(repo) 245} 246 247/// Clean up any stale FUSE mount at the given path. 248/// This handles the case where a previous run crashed or was killed without unmounting. 
///
/// Detection is best-effort. The path counts as stale when it cannot be inspected for a
/// reason other than "not found" (a disconnected FUSE mount fails `stat` this way), or when
/// it is a directory that cannot be listed. Missing paths and non-directories are left
/// untouched. Unmount helpers are tried in turn with their failures ignored; as a last
/// resort the directory itself is removed, so the caller should recreate it afterwards.
fn cleanup_stale_mount(mountpoint: &std::path::Path) {
    // `Path::exists` maps *every* error to `false`, so it would report a disconnected
    // mount as absent and skip cleanup entirely; `try_exists` only yields `Ok(false)`
    // for a path that is genuinely missing.
    match mountpoint.try_exists() {
        Ok(false) => return,
        // A regular file (or other non-directory) is not a mount we can repair.
        Ok(true) if !mountpoint.is_dir() => return,
        _ => {}
    }

    // True while the mountpoint still cannot be listed.
    let is_stale = || std::fs::read_dir(mountpoint).is_err();

    // A listable directory is healthy, not a stale mount.
    if !is_stale() {
        return;
    }

    eprintln!("Detected stale mount at {:?}, attempting cleanup...", mountpoint);

    // Runs `program args... mountpoint`, ignoring whether it exists or succeeds.
    #[cfg(any(target_os = "macos", target_os = "linux"))]
    fn run_quietly(program: &str, args: &[&str], mountpoint: &std::path::Path) {
        let _ = Command::new(program).args(args).arg(mountpoint).output();
    }

    #[cfg(target_os = "macos")]
    {
        // diskutil is the more reliable option on macOS; fall back to umount.
        run_quietly("diskutil", &["unmount", "force"], mountpoint);
        if is_stale() {
            run_quietly("umount", &["-f"], mountpoint);
        }
    }

    #[cfg(target_os = "linux")]
    {
        // fuse3 ships `fusermount3` and fuse2 ships `fusermount`; try both, then umount.
        for helper in ["fusermount3", "fusermount"] {
            if !is_stale() {
                break;
            }
            run_quietly(helper, &["-uz"], mountpoint);
        }
        if is_stale() {
            run_quietly("umount", &["-l"], mountpoint);
        }
    }

    // If it is still broken, try removing the directory itself.
    if is_stale() {
        let _ = std::fs::remove_dir(mountpoint);
    }

    // Success means the path is either gone (the caller recreates it) or listable again.
    let recovered = match mountpoint.try_exists() {
        Ok(false) => true,
        Ok(true) => !is_stale(),
        Err(_) => false,
    };
    if recovered {
        eprintln!("Cleanup complete");
    } else {
        eprintln!(
            "Cleanup failed: {:?} is still unusable; unmount it manually",
            mountpoint
        );
    }
}