//! Forked from <https://oppi.li/pdsfs>.
//!
//! Mount an atproto PDS repository as a FUSE filesystem.
1mod client;
2mod error;
3mod firehose;
4mod fs;
5mod resolver;
6
use atrium_api::{client::AtpServiceClient, com, types};
use atrium_common::resolver::Resolver;
use atrium_identity::identity_resolver::ResolvedIdentity;
use atrium_repo::{Repository, blockstore::CarStore};
use atrium_xrpc_client::isahc::IsahcClient;
use fuser::MountOption;
use futures::{StreamExt, stream};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use std::{
    io::{Cursor, Write},
    path::{Path, PathBuf},
    process::Command,
    sync::Arc,
};
21
/// Entry point: resolve one or more handles, download each repository's CAR
/// file, mount everything as a read-only FUSE filesystem, and stream live
/// updates from each PDS until the user presses enter.
fn main() {
    // One multi-threaded tokio runtime; async work is driven explicitly via
    // `block_on`/`spawn` because fuser's session loop is synchronous.
    let rt = tokio::runtime::Runtime::new().unwrap();

    // CLI: one or more positional handles, plus an optional `-m <mountpoint>`.
    let matches = clap::command!()
        .arg(
            clap::Arg::new("handles")
                .index(1)
                .required(true)
                .num_args(1..)
                .help("One or more handles to download and mount"),
        )
        .arg(
            clap::Arg::new("mountpoint")
                .short('m')
                .action(clap::ArgAction::Set)
                .value_parser(clap::value_parser!(PathBuf)),
        )
        .get_matches();
    let handles = matches
        .get_many::<String>("handles")
        .unwrap() // safe: clap enforces `required(true)`
        .cloned()
        .collect::<Vec<_>>();
    // Default mountpoint is ./mnt when -m is not given.
    let mountpoint = matches
        .get_one::<PathBuf>("mountpoint")
        .map(ToOwned::to_owned)
        .unwrap_or(PathBuf::from("mnt"));

    // Clean up any stale mount before proceeding
    cleanup_stale_mount(&mountpoint);
    let _ = std::fs::create_dir_all(&mountpoint);

    let resolver = Arc::new(resolver::id_resolver());
    let bars = Arc::new(MultiProgress::new());
    // Resolve each handle -> download its CAR -> open the repository.
    // NOTE(review): `then` processes handles sequentially; `buffer_unordered`
    // would allow concurrent downloads if that is ever desired.
    let repos = rt.block_on(
        stream::iter(handles)
            .then(|handle| {
                // NOTE(review): this clone is redundant — `handle` is owned
                // here and never used again after `h` is created.
                let h = handle.clone();
                let r = Arc::clone(&resolver);
                let b = Arc::clone(&bars);
                async move {
                    let id = r.resolve(&h).await?;
                    let bytes = cached_download(&id, &b).await?;
                    let repo = build_repo(bytes).await?;
                    Ok::<_, error::Error>((id.did.clone(), id.pds.clone(), repo))
                }
            })
            .collect::<Vec<_>>(),
    );
    // Report failures but keep going with whatever succeeded.
    let (success, errors): (Vec<_>, Vec<_>) = repos.into_iter().partition(|r| r.is_ok());
    for e in errors {
        eprintln!("{:?}", e.as_ref().unwrap_err());
    }
    let repos_with_pds: Vec<_> = success
        .into_iter()
        .map(|s| s.unwrap()) // safe: partitioned on is_ok above
        .collect();

    // construct the fs
    let mut fs = fs::PdsFs::new();

    // Extract (did, pds) pairs for WebSocket tasks before consuming repos
    let did_pds_pairs: Vec<_> = repos_with_pds.iter()
        .map(|(did, pds, _)| (did.clone(), pds.clone()))
        .collect();

    // Consume repos_with_pds to add repos to filesystem
    for (did, _, repo) in repos_with_pds {
        rt.block_on(fs.add(did, repo))
    }

    // Get shared state (Arcs) for WebSocket tasks — must be grabbed before
    // `fs` is moved into the FUSE session below.
    let (_repos_arc, inodes_arc, sizes_arc, content_cache_arc) = fs.get_shared_state();

    // mount read-only; AllowOther makes the mount visible to other users
    let mut options = vec![
        MountOption::RO,
        MountOption::FSName("pdsfs".to_string()),
        MountOption::AllowOther,
    ];

    // add macOS-specific options
    #[cfg(target_os = "macos")]
    {
        options.push(MountOption::CUSTOM("local".to_string()));
        options.push(MountOption::CUSTOM("volname=pdsfs".to_string()));
    }

    // Create session and get notifier for Finder refresh
    let session = fuser::Session::new(fs, &mountpoint, &options).unwrap();
    let notifier = session.notifier();
    // Run the FUSE loop on a background thread; the guard unmounts on drop.
    let _bg = session.spawn().unwrap();

    // spawn WebSocket subscription tasks for each DID using the runtime handle
    let rt_handle = rt.handle().clone();
    for (did, pds) in did_pds_pairs {
        let inodes_clone = Arc::clone(&inodes_arc);
        let sizes_clone = Arc::clone(&sizes_arc);
        let content_cache_clone = Arc::clone(&content_cache_arc);
        let notifier_clone = notifier.clone();

        rt_handle.spawn(async move {
            if let Err(e) = firehose::subscribe_to_repo::<atrium_repo::blockstore::CarStore<std::io::Cursor<Vec<u8>>>>(
                did,
                pds,
                inodes_clone,
                sizes_clone,
                content_cache_clone,
                notifier_clone,
            ).await {
                eprintln!("WebSocket error: {:?}", e);
            }
        });
    }

    println!("mounted at {mountpoint:?}");
    print!("hit enter to unmount and exit...");
    std::io::stdout().flush().unwrap();

    // Wait for user input
    let mut input = String::new();
    std::io::stdin().read_line(&mut input).unwrap();

    // NOTE(review): printed before the actual unmount happens — the real
    // unmount occurs when `_bg` is dropped at the end of main.
    println!("unmounted {mountpoint:?}");
}
146
147async fn cached_download(
148 id: &ResolvedIdentity,
149 m: &MultiProgress,
150) -> Result<Vec<u8>, error::Error> {
151 let mut pb = ProgressBar::new_spinner();
152 pb.set_style(
153 ProgressStyle::default_spinner()
154 .template("{spinner:.green} [{elapsed_precise}] {msg}")
155 .unwrap()
156 .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
157 );
158 pb.enable_steady_tick(std::time::Duration::from_millis(100));
159 pb = m.add(pb);
160
161 // Always download fresh - no caching for now to ensure up-to-date data
162 pb.set_message(format!("downloading CAR file for...{}", id.did));
163 let bytes = download_car_file(id, &pb).await?;
164
165 pb.finish();
166 Ok(bytes)
167}
168
169async fn download_car_file(
170 id: &ResolvedIdentity,
171 pb: &ProgressBar,
172) -> Result<Vec<u8>, error::Error> {
173 // download the entire car file first before mounting it as a fusefs
174 let client = AtpServiceClient::new(IsahcClient::new(&id.pds));
175 let did = types::string::Did::new(id.did.clone()).unwrap();
176
177 let bytes = client
178 .service
179 .com
180 .atproto
181 .sync
182 .get_repo(com::atproto::sync::get_repo::Parameters::from(
183 com::atproto::sync::get_repo::ParametersData { did, since: None },
184 ))
185 .await?;
186
187 pb.finish_with_message(format!("download complete for \t...\t{}", id.did));
188
189 Ok(bytes)
190}
191
192async fn build_repo(bytes: Vec<u8>) -> Result<Repository<CarStore<Cursor<Vec<u8>>>>, error::Error> {
193 let store = CarStore::open(Cursor::new(bytes)).await?;
194 let root = store.roots().next().unwrap();
195 let repo = Repository::open(store, root).await?;
196 Ok(repo)
197}
198
/// Clean up any stale FUSE mount at the given path.
///
/// Handles the case where a previous run crashed or was killed without
/// unmounting, leaving the mountpoint in a state where `read_dir` fails
/// (e.g. "Device not configured" on macOS, ENOTCONN on Linux).
///
/// Takes `&Path` rather than `&PathBuf` so any path-like borrow works;
/// existing callers passing `&PathBuf` coerce automatically.
///
/// Best-effort: every unmount command's result is deliberately ignored, and
/// the function never fails — worst case the stale mount remains and the
/// subsequent real mount reports the error.
fn cleanup_stale_mount(mountpoint: &Path) {
    // Nothing to do if the path does not exist at all.
    if !mountpoint.exists() {
        return;
    }

    // A healthy directory (or live mount) is readable; a stale FUSE mount
    // typically errors out here, which is how we detect it.
    if std::fs::read_dir(mountpoint).is_ok() {
        return;
    }

    eprintln!("Detected stale mount at {:?}, attempting cleanup...", mountpoint);

    // Try platform-specific unmount commands
    #[cfg(target_os = "macos")]
    {
        // Prefer diskutil (more reliable on macOS): `diskutil unmount force <path>`.
        let _ = Command::new("diskutil")
            .args(["unmount", "force"])
            .arg(mountpoint)
            .output();

        // Fall back to a forced umount if the path is still unreadable.
        if std::fs::read_dir(mountpoint).is_err() {
            let _ = Command::new("umount")
                .arg("-f")
                .arg(mountpoint)
                .output();
        }
    }

    #[cfg(target_os = "linux")]
    {
        // `fusermount -uz`: lazy unmount of a FUSE filesystem.
        let _ = Command::new("fusermount")
            .args(["-uz"])
            .arg(mountpoint)
            .output();

        // Fall back to a lazy umount.
        if std::fs::read_dir(mountpoint).is_err() {
            let _ = Command::new("umount")
                .arg("-l")
                .arg(mountpoint)
                .output();
        }
    }

    // Last resort: drop the directory entirely; the caller recreates it.
    if std::fs::read_dir(mountpoint).is_err() {
        let _ = std::fs::remove_dir(mountpoint);
    }

    eprintln!("Cleanup complete");
}
257}