mount an atproto PDS repository as a FUSE filesystem
at main 257 lines 8.2 kB view raw
1mod client; 2mod error; 3mod firehose; 4mod fs; 5mod resolver; 6 7use atrium_api::{client::AtpServiceClient, com, types}; 8use atrium_common::resolver::Resolver; 9use atrium_identity::identity_resolver::ResolvedIdentity; 10use atrium_repo::{Repository, blockstore::CarStore}; 11use atrium_xrpc_client::isahc::IsahcClient; 12use fuser::MountOption; 13use futures::{StreamExt, stream}; 14use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; 15use std::{ 16 io::{Cursor, Write}, 17 path::PathBuf, 18 process::Command, 19 sync::Arc, 20}; 21 22fn main() { 23 let rt = tokio::runtime::Runtime::new().unwrap(); 24 let matches = clap::command!() 25 .arg( 26 clap::Arg::new("handles") 27 .index(1) 28 .required(true) 29 .num_args(1..) 30 .help("One or more handles to download and mount"), 31 ) 32 .arg( 33 clap::Arg::new("mountpoint") 34 .short('m') 35 .action(clap::ArgAction::Set) 36 .value_parser(clap::value_parser!(PathBuf)), 37 ) 38 .get_matches(); 39 let handles = matches 40 .get_many::<String>("handles") 41 .unwrap() 42 .cloned() 43 .collect::<Vec<_>>(); 44 let mountpoint = matches 45 .get_one::<PathBuf>("mountpoint") 46 .map(ToOwned::to_owned) 47 .unwrap_or(PathBuf::from("mnt")); 48 49 // Clean up any stale mount before proceeding 50 cleanup_stale_mount(&mountpoint); 51 let _ = std::fs::create_dir_all(&mountpoint); 52 53 let resolver = Arc::new(resolver::id_resolver()); 54 let bars = Arc::new(MultiProgress::new()); 55 let repos = rt.block_on( 56 stream::iter(handles) 57 .then(|handle| { 58 let h = handle.clone(); 59 let r = Arc::clone(&resolver); 60 let b = Arc::clone(&bars); 61 async move { 62 let id = r.resolve(&h).await?; 63 let bytes = cached_download(&id, &b).await?; 64 let repo = build_repo(bytes).await?; 65 Ok::<_, error::Error>((id.did.clone(), id.pds.clone(), repo)) 66 } 67 }) 68 .collect::<Vec<_>>(), 69 ); 70 let (success, errors): (Vec<_>, Vec<_>) = repos.into_iter().partition(|r| r.is_ok()); 71 for e in errors { 72 eprintln!("{:?}", e.as_ref().unwrap_err()); 
73 } 74 let repos_with_pds: Vec<_> = success 75 .into_iter() 76 .map(|s| s.unwrap()) 77 .collect(); 78 79 // construct the fs 80 let mut fs = fs::PdsFs::new(); 81 82 // Extract (did, pds) pairs for WebSocket tasks before consuming repos 83 let did_pds_pairs: Vec<_> = repos_with_pds.iter() 84 .map(|(did, pds, _)| (did.clone(), pds.clone())) 85 .collect(); 86 87 // Consume repos_with_pds to add repos to filesystem 88 for (did, _, repo) in repos_with_pds { 89 rt.block_on(fs.add(did, repo)) 90 } 91 92 // get shared state for WebSocket tasks 93 let (_repos_arc, inodes_arc, sizes_arc, content_cache_arc) = fs.get_shared_state(); 94 95 // mount 96 let mut options = vec![ 97 MountOption::RO, 98 MountOption::FSName("pdsfs".to_string()), 99 MountOption::AllowOther, 100 ]; 101 102 // add macOS-specific options 103 #[cfg(target_os = "macos")] 104 { 105 options.push(MountOption::CUSTOM("local".to_string())); 106 options.push(MountOption::CUSTOM("volname=pdsfs".to_string())); 107 } 108 109 // Create session and get notifier for Finder refresh 110 let session = fuser::Session::new(fs, &mountpoint, &options).unwrap(); 111 let notifier = session.notifier(); 112 let _bg = session.spawn().unwrap(); 113 114 // spawn WebSocket subscription tasks for each DID using the runtime handle 115 let rt_handle = rt.handle().clone(); 116 for (did, pds) in did_pds_pairs { 117 let inodes_clone = Arc::clone(&inodes_arc); 118 let sizes_clone = Arc::clone(&sizes_arc); 119 let content_cache_clone = Arc::clone(&content_cache_arc); 120 let notifier_clone = notifier.clone(); 121 122 rt_handle.spawn(async move { 123 if let Err(e) = firehose::subscribe_to_repo::<atrium_repo::blockstore::CarStore<std::io::Cursor<Vec<u8>>>>( 124 did, 125 pds, 126 inodes_clone, 127 sizes_clone, 128 content_cache_clone, 129 notifier_clone, 130 ).await { 131 eprintln!("WebSocket error: {:?}", e); 132 } 133 }); 134 } 135 136 println!("mounted at {mountpoint:?}"); 137 print!("hit enter to unmount and exit..."); 138 
std::io::stdout().flush().unwrap(); 139 140 // Wait for user input 141 let mut input = String::new(); 142 std::io::stdin().read_line(&mut input).unwrap(); 143 144 println!("unmounted {mountpoint:?}"); 145} 146 147async fn cached_download( 148 id: &ResolvedIdentity, 149 m: &MultiProgress, 150) -> Result<Vec<u8>, error::Error> { 151 let mut pb = ProgressBar::new_spinner(); 152 pb.set_style( 153 ProgressStyle::default_spinner() 154 .template("{spinner:.green} [{elapsed_precise}] {msg}") 155 .unwrap() 156 .tick_strings(&["", "", "", "", "", "", "", "", "", ""]), 157 ); 158 pb.enable_steady_tick(std::time::Duration::from_millis(100)); 159 pb = m.add(pb); 160 161 // Always download fresh - no caching for now to ensure up-to-date data 162 pb.set_message(format!("downloading CAR file for...{}", id.did)); 163 let bytes = download_car_file(id, &pb).await?; 164 165 pb.finish(); 166 Ok(bytes) 167} 168 169async fn download_car_file( 170 id: &ResolvedIdentity, 171 pb: &ProgressBar, 172) -> Result<Vec<u8>, error::Error> { 173 // download the entire car file first before mounting it as a fusefs 174 let client = AtpServiceClient::new(IsahcClient::new(&id.pds)); 175 let did = types::string::Did::new(id.did.clone()).unwrap(); 176 177 let bytes = client 178 .service 179 .com 180 .atproto 181 .sync 182 .get_repo(com::atproto::sync::get_repo::Parameters::from( 183 com::atproto::sync::get_repo::ParametersData { did, since: None }, 184 )) 185 .await?; 186 187 pb.finish_with_message(format!("download complete for \t...\t{}", id.did)); 188 189 Ok(bytes) 190} 191 192async fn build_repo(bytes: Vec<u8>) -> Result<Repository<CarStore<Cursor<Vec<u8>>>>, error::Error> { 193 let store = CarStore::open(Cursor::new(bytes)).await?; 194 let root = store.roots().next().unwrap(); 195 let repo = Repository::open(store, root).await?; 196 Ok(repo) 197} 198 199/// Clean up any stale FUSE mount at the given path. 200/// This handles the case where a previous run crashed or was killed without unmounting. 
fn cleanup_stale_mount(mountpoint: &std::path::Path) {
    // Takes `&Path` rather than `&PathBuf` (clippy::ptr_arg); existing
    // `&PathBuf` call sites coerce automatically via Deref.
    //
    // Nothing to do if the path does not exist at all.
    if !mountpoint.exists() {
        return;
    }

    // Try to detect if it's a stale mount by attempting to read the directory
    // A stale FUSE mount will typically fail with "Device not configured" or similar
    if std::fs::read_dir(mountpoint).is_ok() {
        // Directory is readable, not a stale mount
        return;
    }

    eprintln!("Detected stale mount at {:?}, attempting cleanup...", mountpoint);

    // All unmount attempts below are best-effort: failures are ignored and we
    // simply re-check readability after each step.
    #[cfg(target_os = "macos")]
    {
        // Try diskutil first (more reliable on macOS):
        // `diskutil unmount force <path>`.
        let _ = Command::new("diskutil")
            .args(["unmount", "force"])
            .arg(mountpoint)
            .output();

        // Fall back to a forced umount if diskutil didn't work.
        if std::fs::read_dir(mountpoint).is_err() {
            let _ = Command::new("umount")
                .arg("-f")
                .arg(mountpoint)
                .output();
        }
    }

    #[cfg(target_os = "linux")]
    {
        // `fusermount -uz` lazily unmounts a FUSE mount without root.
        let _ = Command::new("fusermount")
            .args(["-uz"])
            .arg(mountpoint)
            .output();

        // Fall back to a lazy umount (may require privileges).
        if std::fs::read_dir(mountpoint).is_err() {
            let _ = Command::new("umount")
                .arg("-l")
                .arg(mountpoint)
                .output();
        }
    }

    // If still broken, drop the directory so the next mount starts clean;
    // the caller recreates it afterwards.
    if std::fs::read_dir(mountpoint).is_err() {
        let _ = std::fs::remove_dir(mountpoint);
    }

    eprintln!("Cleanup complete");
}