use anyhow::{anyhow, Result};
use atrium_api::com::atproto::sync::subscribe_repos::{Commit, NSID};
use atrium_api::client::AtpServiceClient;
use atrium_api::com;
use atrium_api::types;
use atrium_xrpc_client::isahc::IsahcClient;
use futures::StreamExt;
use ipld_core::ipld::Ipld;
use std::io::Cursor;
use std::sync::{Arc, Mutex};
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;

use crate::fs::{PdsFsCollection, PdsFsEntry, PdsFsRecord};
use indexmap::{IndexMap, IndexSet};

/// Frame header types for WebSocket messages.
///
/// Parsed from the first DAG-CBOR value of a frame: a map whose `op` field
/// selects the variant.
#[derive(Debug, Clone, PartialEq, Eq)]
enum FrameHeader {
    /// `op == 1`; carries the optional `t` (message type) string.
    Message(Option<String>),
    /// `op == -1`.
    Error,
}

impl TryFrom<Ipld> for FrameHeader {
    type Error = anyhow::Error;

    /// Converts a decoded IPLD value into a frame header.
    ///
    /// # Errors
    ///
    /// Returns `"invalid frame type"` when `value` is not a map, has no
    /// integer `op` field, or `op` is neither `1` nor `-1`.
    // `Result<Self>` (anyhow's alias) is used instead of
    // `Result<Self, Self::Error>` because `Self::Error` would be ambiguous
    // with the `Error` enum variant.
    fn try_from(value: Ipld) -> Result<Self> {
        if let Ipld::Map(map) = value {
            if let Some(Ipld::Integer(i)) = map.get("op") {
                match i {
                    1 => {
                        // `t` is optional; anything other than a string is treated as absent.
                        let t = if let Some(Ipld::String(s)) = map.get("t") {
                            Some(s.clone())
                        } else {
                            None
                        };
                        return Ok(FrameHeader::Message(t));
                    }
                    -1 => return Ok(FrameHeader::Error),
                    _ => {}
                }
            }
        }
        Err(anyhow!("invalid frame type"))
    }
}

/// Frame types for parsed WebSocket messages.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Frame {
    /// A message frame: the header's optional type string plus the raw body.
    Message(Option<String>, MessageFrame),
    /// An error frame; its body is not retained.
    Error(ErrorFrame),
}

/// Payload of a message frame.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MessageFrame {
    /// The undecoded bytes that follow the frame header.
    pub body: Vec<u8>,
}

/// Placeholder for an error frame; no fields are decoded.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ErrorFrame {}

impl TryFrom<&[u8]> for Frame {
    type Error = anyhow::Error;

    /// Splits a binary frame into its header and body.
    ///
    /// The frame is expected to be a DAG-CBOR header immediately followed by
    /// more bytes. The first decode is therefore *expected* to fail with
    /// `TrailingData`; the cursor position at that point marks where the
    /// header ends and the body begins.
    ///
    /// # Errors
    ///
    /// Returns `"invalid frame type"` when the first decode yields anything
    /// other than `TrailingData` (including a successful decode, i.e. a frame
    /// with no body), and propagates header decode / conversion failures.
    fn try_from(value: &[u8]) -> Result<Self> {
        let mut cursor = Cursor::new(value);
        let (left, right) = match serde_ipld_dagcbor::from_reader::<Ipld, _>(&mut cursor) {
            Err(serde_ipld_dagcbor::DecodeError::TrailingData) => {
                value.split_at(cursor.position() as usize)
            }
            _ => {
                return Err(anyhow!("invalid frame type"));
            }
        };
        // Decode the header again from exactly the header bytes.
        let header = FrameHeader::try_from(serde_ipld_dagcbor::from_slice::<Ipld>(left)?)?;
        if let FrameHeader::Message(t) = &header {
            Ok(Frame::Message(
                t.clone(),
                MessageFrame {
                    body: right.to_vec(),
                },
            ))
        } else {
            Ok(Frame::Error(ErrorFrame {}))
        }
    }
}

/// Subscribe to a repo's firehose and update
inodes on changes pub async fn subscribe_to_repo( did: String, pds: String, inodes: Arc>>, sizes: Arc>>, content_cache: Arc>>, notifier: fuser::Notifier, ) -> Result<()> where R: atrium_repo::blockstore::AsyncBlockStoreRead, { // Strip https:// or http:// prefix from PDS URL if present let pds_host = pds.trim_start_matches("https://").trim_start_matches("http://"); let url = format!("wss://{}/xrpc/{}", pds_host, NSID); println!("Connecting to firehose: {}", url); let (mut stream, _) = connect_async(url).await?; println!("Connected to firehose for {}", did); loop { match stream.next().await { Some(Ok(Message::Binary(data))) => { if let Ok(Frame::Message(Some(t), msg)) = Frame::try_from(data.as_slice()) { if t.as_str() == "#commit" { if let Ok(commit) = serde_ipld_dagcbor::from_reader::(msg.body.as_slice()) { // Only process commits for our DID if commit.repo.as_str() == did { if let Err(e) = handle_commit(&commit, &inodes, &sizes, &content_cache, &did, &pds, ¬ifier).await { eprintln!("Error handling commit: {:?}", e); } } } } } } Some(Ok(_)) => {} // Ignore other message types Some(Err(e)) => { eprintln!("WebSocket error: {}", e); break; } None => { eprintln!("WebSocket closed"); break; } } } Ok(()) } /// Handle a commit by updating the inode tree and notifying Finder async fn handle_commit( commit: &Commit, inodes: &Arc>>, sizes: &Arc>>, content_cache: &Arc>>, did: &str, pds: &str, notifier: &fuser::Notifier, ) -> Result<()> { // Find the DID inode let did_entry = PdsFsEntry::Did(did.to_string()); let did_inode = { let inodes_lock = inodes.lock().unwrap(); inodes_lock.get_index_of(&did_entry) }; let Some(did_inode) = did_inode else { return Err(anyhow!("DID not found in inodes")); }; for op in &commit.ops { let Some((collection, rkey)) = op.path.split_once('/') else { continue; }; match op.action.as_str() { "create" => { // Fetch the record from PDS let record_key = format!("{}/{}", collection, rkey); let cache_key = format!("{}/{}", did, record_key); // Fetch 
record content from PDS match fetch_record(pds, did, collection, rkey).await { Ok(content) => { let content_len = content.len() as u64; // Add the record to inodes let (collection_inode, record_inode) = { let mut inodes_lock = inodes.lock().unwrap(); // Ensure collection exists let collection_entry = PdsFsEntry::Collection(PdsFsCollection { parent: did_inode, nsid: collection.to_string(), }); let (collection_inode, _) = inodes_lock.insert_full(collection_entry); // Add the record let record_entry = PdsFsEntry::Record(PdsFsRecord { parent: collection_inode, rkey: rkey.to_string(), }); let (record_inode, _) = inodes_lock.insert_full(record_entry); (collection_inode, record_inode) }; // Cache the content and size content_cache.lock().unwrap().insert(cache_key, content); sizes.lock().unwrap().insert(record_inode, content_len); // Notify Finder about the new file (release lock first) let filename = format!("{}.json", rkey); if let Err(e) = notifier.inval_entry(collection_inode as u64, filename.as_ref()) { eprintln!("Failed to invalidate entry for {}: {}", filename, e); } println!("Created: {}/{}", collection, rkey); } Err(e) => { eprintln!("Failed to fetch record {}/{}: {}", collection, rkey, e); } } } "delete" => { // Get inodes before removing let (collection_inode_opt, child_inode_opt) = { let mut inodes_lock = inodes.lock().unwrap(); // Find the collection let collection_entry = PdsFsEntry::Collection(PdsFsCollection { parent: did_inode, nsid: collection.to_string(), }); let collection_inode = inodes_lock.get_index_of(&collection_entry); // Find and remove the record let child_inode = if let Some(coll_ino) = collection_inode { let record_entry = PdsFsEntry::Record(PdsFsRecord { parent: coll_ino, rkey: rkey.to_string(), }); let child_ino = inodes_lock.get_index_of(&record_entry); inodes_lock.shift_remove(&record_entry); child_ino } else { None }; (collection_inode, child_inode) }; // Notify Finder about the deletion (release lock first) if let (Some(coll_ino), 
Some(child_ino)) = (collection_inode_opt, child_inode_opt) { // Remove from caches sizes.lock().unwrap().shift_remove(&child_ino); let cache_key = format!("{}/{}/{}", did, collection, rkey); content_cache.lock().unwrap().shift_remove(&cache_key); let filename = format!("{}.json", rkey); if let Err(e) = notifier.delete(coll_ino as u64, child_ino as u64, filename.as_ref()) { eprintln!("Failed to notify deletion for {}: {}", filename, e); } } println!("Deleted: {}/{}", collection, rkey); } "update" => { // For updates, invalidate the inode so content is re-fetched let record_inode_opt = { let inodes_lock = inodes.lock().unwrap(); let collection_entry = PdsFsEntry::Collection(PdsFsCollection { parent: did_inode, nsid: collection.to_string(), }); if let Some(collection_inode) = inodes_lock.get_index_of(&collection_entry) { let record_entry = PdsFsEntry::Record(PdsFsRecord { parent: collection_inode, rkey: rkey.to_string(), }); inodes_lock.get_index_of(&record_entry) } else { None } }; // Notify Finder to invalidate the inode (release lock first) if let Some(record_ino) = record_inode_opt { // Clear caches so content is recalculated sizes.lock().unwrap().shift_remove(&record_ino); let cache_key = format!("{}/{}/{}", did, collection, rkey); content_cache.lock().unwrap().shift_remove(&cache_key); // Invalidate the entire inode (metadata and all data) if let Err(e) = notifier.inval_inode(record_ino as u64, 0, 0) { eprintln!("Failed to invalidate inode for {}/{}: {}", collection, rkey, e); } } println!("Updated: {}/{}", collection, rkey); } _ => {} } } Ok(()) } /// Fetch a record from the PDS async fn fetch_record(pds: &str, did: &str, collection: &str, rkey: &str) -> Result { let client = AtpServiceClient::new(IsahcClient::new(pds)); let did = types::string::Did::new(did.to_string()).map_err(|e| anyhow!(e))?; let collection_nsid = types::string::Nsid::new(collection.to_string()).map_err(|e| anyhow!(e))?; let record_key = 
types::string::RecordKey::new(rkey.to_string()).map_err(|e| anyhow!(e))?; let response = client .service .com .atproto .repo .get_record(com::atproto::repo::get_record::Parameters::from( com::atproto::repo::get_record::ParametersData { cid: None, collection: collection_nsid, repo: types::string::AtIdentifier::Did(did), rkey: record_key, } )) .await?; Ok(serde_json::to_string_pretty(&response.value)?) }