mod client; mod error; mod firehose; mod fs; mod resolver; use atrium_api::{client::AtpServiceClient, com, types}; use atrium_common::resolver::Resolver; use atrium_identity::identity_resolver::ResolvedIdentity; use atrium_repo::{Repository, blockstore::CarStore}; use atrium_xrpc_client::isahc::IsahcClient; use fuser::MountOption; use futures::{StreamExt, stream}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use std::{ io::{Cursor, Read as _, Write as _}, os::unix::net::UnixStream, path::PathBuf, process::Command, sync::Arc, }; fn main() { let matches = clap::command!() .arg( clap::Arg::new("handles") .index(1) .required(true) .num_args(1..) .help("One or more handles to download and mount"), ) .arg( clap::Arg::new("mountpoint") .short('m') .action(clap::ArgAction::Set) .value_parser(clap::value_parser!(PathBuf)), ) .arg( clap::Arg::new("daemon") .short('d') .long("daemon") .action(clap::ArgAction::SetTrue) .help("Run in background and exit parent when mount is ready"), ) .arg( clap::Arg::new("__child") .long("__child") .hide(true) .action(clap::ArgAction::Set), ) .get_matches(); let handles: Vec = matches.get_many::("handles").unwrap().cloned().collect(); let mountpoint = matches.get_one::("mountpoint").cloned() .unwrap_or_else(|| PathBuf::from("mnt")); // Daemon mode: spawn child and wait for ready signal if matches.get_flag("daemon") { let (mut rx, tx) = UnixStream::pair().unwrap(); rx.set_read_timeout(Some(std::time::Duration::from_secs(120))).ok(); use std::os::unix::io::AsRawFd; let fd = tx.as_raw_fd(); let exe = std::env::current_exe().unwrap(); unsafe { use std::os::unix::process::CommandExt; let mut cmd = Command::new(&exe); cmd.args(std::env::args().skip(1).filter(|a| a != "-d" && a != "--daemon")); cmd.arg("--__child").arg(fd.to_string()); cmd.pre_exec(move || { libc::fcntl(fd, libc::F_SETFD, 0); // clear close-on-exec libc::setsid(); Ok(()) }); cmd.spawn().expect("Failed to spawn daemon"); } drop(tx); let mut buf = [0u8; 1]; match rx.read_exact(&mut buf) { Ok(_) => std::process::exit(0), Err(e) => { eprintln!("Daemon failed: {}", e); std::process::exit(1); } } } // Child mode: signal parent when ready let signal_fd: Option = matches.get_one::("__child") .and_then(|s| s.parse().ok()); let rt = tokio::runtime::Runtime::new().unwrap(); // Clean up any stale mount before proceeding cleanup_stale_mount(&mountpoint); let _ = std::fs::create_dir_all(&mountpoint); let resolver = Arc::new(resolver::id_resolver()); let bars = Arc::new(MultiProgress::new()); let repos = rt.block_on( stream::iter(handles) .then(|handle| { let h = handle.clone(); let r = Arc::clone(&resolver); let b = Arc::clone(&bars); async move { let id = r.resolve(&h).await?; let bytes = cached_download(&id, &b).await?; let repo = build_repo(bytes).await?; Ok::<_, error::Error>((id.did.clone(), id.pds.clone(), repo)) } }) .collect::>(), ); let (success, errors): (Vec<_>, Vec<_>) = repos.into_iter().partition(|r| r.is_ok()); for e in errors { eprintln!("{:?}", e.as_ref().unwrap_err()); } let repos_with_pds: Vec<_> = success .into_iter() .map(|s| s.unwrap()) .collect(); // construct the fs let mut fs = fs::PdsFs::new(); // Extract (did, pds) pairs for WebSocket tasks before consuming repos let did_pds_pairs: Vec<_> = repos_with_pds.iter() .map(|(did, pds, _)| (did.clone(), pds.clone())) .collect(); // Consume repos_with_pds to add repos to filesystem for (did, _, repo) in repos_with_pds { rt.block_on(fs.add(did, repo)) } // get shared state for WebSocket tasks let (_repos_arc, inodes_arc, sizes_arc, content_cache_arc) = fs.get_shared_state(); // mount let mut options = vec![ MountOption::RO, MountOption::FSName("pdsfs".to_string()), MountOption::AllowOther, ]; // add macOS-specific options #[cfg(target_os = "macos")] { options.push(MountOption::CUSTOM("local".to_string())); options.push(MountOption::CUSTOM("volname=pdsfs".to_string())); } // Create session and get notifier for Finder refresh let session = fuser::Session::new(fs, &mountpoint, &options).unwrap(); let notifier = session.notifier(); let _bg = session.spawn().unwrap(); // spawn WebSocket subscription tasks for each DID using the runtime handle let rt_handle = rt.handle().clone(); for (did, pds) in did_pds_pairs { let inodes_clone = Arc::clone(&inodes_arc); let sizes_clone = Arc::clone(&sizes_arc); let content_cache_clone = Arc::clone(&content_cache_arc); let notifier_clone = notifier.clone(); rt_handle.spawn(async move { if let Err(e) = firehose::subscribe_to_repo::>>>( did, pds, inodes_clone, sizes_clone, content_cache_clone, notifier_clone, ).await { eprintln!("WebSocket error: {:?}", e); } }); } // Signal parent if in daemon mode if let Some(fd) = signal_fd { use std::os::unix::io::FromRawFd; let mut sock = unsafe { UnixStream::from_raw_fd(fd) }; let _ = sock.write_all(b"R"); } else { eprintln!("mounted at {mountpoint:?}"); eprintln!("ctrl-c to unmount"); } // Wait for ctrl-c rt.block_on(tokio::signal::ctrl_c()).ok(); eprintln!("unmounted {mountpoint:?}"); } async fn cached_download( id: &ResolvedIdentity, m: &MultiProgress, ) -> Result, error::Error> { let mut pb = ProgressBar::new_spinner(); pb.set_style( ProgressStyle::default_spinner() .template("{spinner:.green} [{elapsed_precise}] {msg}") .unwrap() .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]), ); pb.enable_steady_tick(std::time::Duration::from_millis(100)); pb = m.add(pb); // Always download fresh - no caching for now to ensure up-to-date data pb.set_message(format!("downloading CAR file for...{}", id.did)); let bytes = download_car_file(id, &pb).await?; pb.finish(); Ok(bytes) } async fn download_car_file( id: &ResolvedIdentity, pb: &ProgressBar, ) -> Result, error::Error> { // download the entire car file first before mounting it as a fusefs let client = AtpServiceClient::new(IsahcClient::new(&id.pds)); let did = types::string::Did::new(id.did.clone()).unwrap(); let bytes = client .service .com .atproto .sync .get_repo(com::atproto::sync::get_repo::Parameters::from( com::atproto::sync::get_repo::ParametersData { did, since: None }, )) .await?; pb.finish_with_message(format!("download complete for \t...\t{}", id.did)); Ok(bytes) } async fn build_repo(bytes: Vec) -> Result>>>, error::Error> { let store = CarStore::open(Cursor::new(bytes)).await?; let root = store.roots().next().unwrap(); let repo = Repository::open(store, root).await?; Ok(repo) } /// Clean up any stale FUSE mount at the given path. /// This handles the case where a previous run crashed or was killed without unmounting. fn cleanup_stale_mount(mountpoint: &PathBuf) { // Check if the path exists and might be a stale mount if !mountpoint.exists() { return; } // Try to detect if it's a stale mount by attempting to read the directory // A stale FUSE mount will typically fail with "Device not configured" or similar if std::fs::read_dir(mountpoint).is_ok() { // Directory is readable, not a stale mount return; } eprintln!("Detected stale mount at {:?}, attempting cleanup...", mountpoint); // Try platform-specific unmount commands #[cfg(target_os = "macos")] { // Try diskutil first (more reliable on macOS) let _ = Command::new("diskutil") .args(["unmount", "force"]) .arg(mountpoint) .output(); // Fall back to umount if diskutil didn't work if std::fs::read_dir(mountpoint).is_err() { let _ = Command::new("umount") .arg("-f") .arg(mountpoint) .output(); } } #[cfg(target_os = "linux")] { // Try fusermount first let _ = Command::new("fusermount") .args(["-uz"]) .arg(mountpoint) .output(); // Fall back to umount if std::fs::read_dir(mountpoint).is_err() { let _ = Command::new("umount") .arg("-l") .arg(mountpoint) .output(); } } // If still broken, try removing and recreating the directory if std::fs::read_dir(mountpoint).is_err() { let _ = std::fs::remove_dir(mountpoint); } eprintln!("Cleanup complete"); }