Mount an atproto PDS repository as a FUSE filesystem.
oppi.li/posts/mounting_the_atmosphere/
1mod client;
2mod error;
3mod firehose;
4mod fs;
5mod resolver;
6
7use atrium_api::{client::AtpServiceClient, com, types};
8use atrium_common::resolver::Resolver;
9use atrium_identity::identity_resolver::ResolvedIdentity;
10use atrium_repo::{Repository, blockstore::CarStore};
11use atrium_xrpc_client::isahc::IsahcClient;
12use fuser::MountOption;
13use futures::{StreamExt, stream};
14use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
15use std::{
16 io::{Cursor, Read as _, Write as _},
17 os::unix::net::UnixStream,
18 path::PathBuf,
19 process::Command,
20 sync::Arc,
21};
22
23fn main() {
24 let matches = clap::command!()
25 .arg(
26 clap::Arg::new("handles")
27 .index(1)
28 .required(true)
29 .num_args(1..)
30 .help("One or more handles to download and mount"),
31 )
32 .arg(
33 clap::Arg::new("mountpoint")
34 .short('m')
35 .action(clap::ArgAction::Set)
36 .value_parser(clap::value_parser!(PathBuf)),
37 )
38 .arg(
39 clap::Arg::new("daemon")
40 .short('d')
41 .long("daemon")
42 .action(clap::ArgAction::SetTrue)
43 .help("Run in background and exit parent when mount is ready"),
44 )
45 .arg(
46 clap::Arg::new("__child")
47 .long("__child")
48 .hide(true)
49 .action(clap::ArgAction::Set),
50 )
51 .get_matches();
52
53 let handles: Vec<String> = matches.get_many::<String>("handles").unwrap().cloned().collect();
54 let mountpoint = matches.get_one::<PathBuf>("mountpoint").cloned()
55 .unwrap_or_else(|| PathBuf::from("mnt"));
56
57 // Daemon mode: spawn child and wait for ready signal
58 if matches.get_flag("daemon") {
59 let (mut rx, tx) = UnixStream::pair().unwrap();
60 rx.set_read_timeout(Some(std::time::Duration::from_secs(120))).ok();
61
62 use std::os::unix::io::AsRawFd;
63 let fd = tx.as_raw_fd();
64 let exe = std::env::current_exe().unwrap();
65
66 unsafe {
67 use std::os::unix::process::CommandExt;
68 let mut cmd = Command::new(&exe);
69 cmd.args(std::env::args().skip(1).filter(|a| a != "-d" && a != "--daemon"));
70 cmd.arg("--__child").arg(fd.to_string());
71 cmd.pre_exec(move || {
72 libc::fcntl(fd, libc::F_SETFD, 0); // clear close-on-exec
73 libc::setsid();
74 Ok(())
75 });
76 cmd.spawn().expect("Failed to spawn daemon");
77 }
78 drop(tx);
79
80 let mut buf = [0u8; 1];
81 match rx.read_exact(&mut buf) {
82 Ok(_) => std::process::exit(0),
83 Err(e) => { eprintln!("Daemon failed: {}", e); std::process::exit(1); }
84 }
85 }
86
87 // Child mode: signal parent when ready
88 let signal_fd: Option<i32> = matches.get_one::<String>("__child")
89 .and_then(|s| s.parse().ok());
90
91 let rt = tokio::runtime::Runtime::new().unwrap();
92
93 // Clean up any stale mount before proceeding
94 cleanup_stale_mount(&mountpoint);
95 let _ = std::fs::create_dir_all(&mountpoint);
96
97 let resolver = Arc::new(resolver::id_resolver());
98 let bars = Arc::new(MultiProgress::new());
99 let repos = rt.block_on(
100 stream::iter(handles)
101 .then(|handle| {
102 let h = handle.clone();
103 let r = Arc::clone(&resolver);
104 let b = Arc::clone(&bars);
105 async move {
106 let id = r.resolve(&h).await?;
107 let bytes = cached_download(&id, &b).await?;
108 let repo = build_repo(bytes).await?;
109 Ok::<_, error::Error>((id.did.clone(), id.pds.clone(), repo))
110 }
111 })
112 .collect::<Vec<_>>(),
113 );
114 let (success, errors): (Vec<_>, Vec<_>) = repos.into_iter().partition(|r| r.is_ok());
115 for e in errors {
116 eprintln!("{:?}", e.as_ref().unwrap_err());
117 }
118 let repos_with_pds: Vec<_> = success
119 .into_iter()
120 .map(|s| s.unwrap())
121 .collect();
122
123 // construct the fs
124 let mut fs = fs::PdsFs::new();
125
126 // Extract (did, pds) pairs for WebSocket tasks before consuming repos
127 let did_pds_pairs: Vec<_> = repos_with_pds.iter()
128 .map(|(did, pds, _)| (did.clone(), pds.clone()))
129 .collect();
130
131 // Consume repos_with_pds to add repos to filesystem
132 for (did, _, repo) in repos_with_pds {
133 rt.block_on(fs.add(did, repo))
134 }
135
136 // get shared state for WebSocket tasks
137 let (_repos_arc, inodes_arc, sizes_arc, content_cache_arc) = fs.get_shared_state();
138
139 // mount
140 let mut options = vec![
141 MountOption::RO,
142 MountOption::FSName("pdsfs".to_string()),
143 MountOption::AllowOther,
144 ];
145
146 // add macOS-specific options
147 #[cfg(target_os = "macos")]
148 {
149 options.push(MountOption::CUSTOM("local".to_string()));
150 options.push(MountOption::CUSTOM("volname=pdsfs".to_string()));
151 }
152
153 // Create session and get notifier for Finder refresh
154 let session = fuser::Session::new(fs, &mountpoint, &options).unwrap();
155 let notifier = session.notifier();
156 let _bg = session.spawn().unwrap();
157
158 // spawn WebSocket subscription tasks for each DID using the runtime handle
159 let rt_handle = rt.handle().clone();
160 for (did, pds) in did_pds_pairs {
161 let inodes_clone = Arc::clone(&inodes_arc);
162 let sizes_clone = Arc::clone(&sizes_arc);
163 let content_cache_clone = Arc::clone(&content_cache_arc);
164 let notifier_clone = notifier.clone();
165
166 rt_handle.spawn(async move {
167 if let Err(e) = firehose::subscribe_to_repo::<atrium_repo::blockstore::CarStore<std::io::Cursor<Vec<u8>>>>(
168 did,
169 pds,
170 inodes_clone,
171 sizes_clone,
172 content_cache_clone,
173 notifier_clone,
174 ).await {
175 eprintln!("WebSocket error: {:?}", e);
176 }
177 });
178 }
179
180 // Signal parent if in daemon mode
181 if let Some(fd) = signal_fd {
182 use std::os::unix::io::FromRawFd;
183 let mut sock = unsafe { UnixStream::from_raw_fd(fd) };
184 let _ = sock.write_all(b"R");
185 } else {
186 eprintln!("mounted at {mountpoint:?}");
187 eprintln!("ctrl-c to unmount");
188 }
189
190 // Wait for ctrl-c
191 rt.block_on(tokio::signal::ctrl_c()).ok();
192 eprintln!("unmounted {mountpoint:?}");
193}
194
195async fn cached_download(
196 id: &ResolvedIdentity,
197 m: &MultiProgress,
198) -> Result<Vec<u8>, error::Error> {
199 let mut pb = ProgressBar::new_spinner();
200 pb.set_style(
201 ProgressStyle::default_spinner()
202 .template("{spinner:.green} [{elapsed_precise}] {msg}")
203 .unwrap()
204 .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
205 );
206 pb.enable_steady_tick(std::time::Duration::from_millis(100));
207 pb = m.add(pb);
208
209 // Always download fresh - no caching for now to ensure up-to-date data
210 pb.set_message(format!("downloading CAR file for...{}", id.did));
211 let bytes = download_car_file(id, &pb).await?;
212
213 pb.finish();
214 Ok(bytes)
215}
216
217async fn download_car_file(
218 id: &ResolvedIdentity,
219 pb: &ProgressBar,
220) -> Result<Vec<u8>, error::Error> {
221 // download the entire car file first before mounting it as a fusefs
222 let client = AtpServiceClient::new(IsahcClient::new(&id.pds));
223 let did = types::string::Did::new(id.did.clone()).unwrap();
224
225 let bytes = client
226 .service
227 .com
228 .atproto
229 .sync
230 .get_repo(com::atproto::sync::get_repo::Parameters::from(
231 com::atproto::sync::get_repo::ParametersData { did, since: None },
232 ))
233 .await?;
234
235 pb.finish_with_message(format!("download complete for \t...\t{}", id.did));
236
237 Ok(bytes)
238}
239
240async fn build_repo(bytes: Vec<u8>) -> Result<Repository<CarStore<Cursor<Vec<u8>>>>, error::Error> {
241 let store = CarStore::open(Cursor::new(bytes)).await?;
242 let root = store.roots().next().unwrap();
243 let repo = Repository::open(store, root).await?;
244 Ok(repo)
245}
246
/// Clean up any stale FUSE mount at the given path.
///
/// This handles the case where a previous run crashed or was killed without unmounting. Such a
/// mountpoint still has a directory entry, but stat-ing or listing it fails (typically
/// "Transport endpoint is not connected" on Linux or "Device not configured" on macOS).
///
/// Best effort: each unmount/removal step ignores its own failure, and the outcome is only
/// reported on stderr. Takes `&Path`, so `&PathBuf` arguments still work via deref coercion.
fn cleanup_stale_mount(mountpoint: &std::path::Path) {
    // A path we can list is healthy: either a live mount or a plain directory.
    fn is_listable(p: &std::path::Path) -> bool {
        std::fs::read_dir(p).is_ok()
    }

    // True only when the path definitely does not exist (not "exists but stat fails").
    fn is_missing(p: &std::path::Path) -> bool {
        matches!(std::fs::metadata(p), Err(e) if e.kind() == std::io::ErrorKind::NotFound)
    }

    // Don't use `Path::exists()` here: it reports `false` for *any* stat error, so a stale
    // mount (whose stat fails) would be mistaken for a missing path and never cleaned up.
    match std::fs::metadata(mountpoint) {
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return,
        // stat works and this is not a directory, so it is not a dead mount; let the later
        // mount attempt report the problem.
        Ok(meta) if !meta.is_dir() => return,
        _ => {}
    }

    if is_listable(mountpoint) {
        // Directory is readable, not a stale mount
        return;
    }

    eprintln!("Detected stale mount at {:?}, attempting cleanup...", mountpoint);

    // Try platform-specific unmount commands
    #[cfg(target_os = "macos")]
    {
        // Try diskutil first (more reliable on macOS)
        let _ = Command::new("diskutil")
            .args(["unmount", "force"])
            .arg(mountpoint)
            .output();

        // Fall back to umount if diskutil didn't work
        if !is_listable(mountpoint) {
            let _ = Command::new("umount").arg("-f").arg(mountpoint).output();
        }
    }

    #[cfg(target_os = "linux")]
    {
        // `fusermount` ships with libfuse 2 and `fusermount3` with libfuse 3; try each in turn.
        for helper in ["fusermount", "fusermount3"] {
            if is_listable(mountpoint) {
                break;
            }
            let _ = Command::new(helper).arg("-uz").arg(mountpoint).output();
        }

        // Fall back to a lazy umount (may require root)
        if !is_listable(mountpoint) {
            let _ = Command::new("umount").arg("-l").arg(mountpoint).output();
        }
    }

    // Last resort: remove the dead directory entry so it can be recreated fresh.
    if !is_listable(mountpoint) {
        let _ = std::fs::remove_dir(mountpoint);
    }

    if is_missing(mountpoint) || is_listable(mountpoint) {
        eprintln!("Cleanup complete");
    } else {
        eprintln!("Cleanup failed: {:?} is still not accessible", mountpoint);
    }
}