Highly ambitious ATProtocol AppView service and SDKs

Index the actor if it is not in the cache/index; remove the actor if it no longer has records

+240 -70
+15
api/.sqlx/query-51b65af246120b4da3817ff81171624f8f9ab755cc19b6526f086a194971a94c.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n DELETE FROM actor\n WHERE did = $1 AND slice_uri = $2\n ", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "Text", 9 + "Text" 10 + ] 11 + }, 12 + "nullable": [] 13 + }, 14 + "hash": "51b65af246120b4da3817ff81171624f8f9ab755cc19b6526f086a194971a94c" 15 + }
+23
api/.sqlx/query-b7e17c6fd053d7c445e6e9b3938976384bf06c5fc5d56f28255b6f0349928e6c.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n SELECT COUNT(*) as count\n FROM record\n WHERE did = $1 AND slice_uri = $2\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "count", 9 + "type_info": "Int8" 10 + } 11 + ], 12 + "parameters": { 13 + "Left": [ 14 + "Text", 15 + "Text" 16 + ] 17 + }, 18 + "nullable": [ 19 + null 20 + ] 21 + }, 22 + "hash": "b7e17c6fd053d7c445e6e9b3938976384bf06c5fc5d56f28255b6f0349928e6c" 23 + }
+87
api/src/actor_resolver.rs
··· 1 + use reqwest::Client; 2 + use atproto_identity::{ 3 + plc::query as plc_query, 4 + resolve::{InputType, parse_input}, 5 + web::query as web_query, 6 + }; 7 + use thiserror::Error; 8 + 9 + #[derive(Error, Debug)] 10 + pub enum ActorResolverError { 11 + #[error("error-slice-actor-1 Failed to resolve DID: {0}")] 12 + ResolveFailed(String), 13 + 14 + #[error("error-slice-actor-2 Failed to parse DID: {0}")] 15 + ParseFailed(String), 16 + 17 + #[error("error-slice-actor-3 Subject resolved to handle instead of DID")] 18 + InvalidSubject, 19 + } 20 + 21 + #[derive(Debug, Clone)] 22 + pub struct ActorData { 23 + pub did: String, 24 + pub handle: Option<String>, 25 + pub pds: String, 26 + } 27 + 28 + pub async fn resolve_actor_data(client: &Client, did: &str) -> Result<ActorData, ActorResolverError> { 29 + let (pds_url, handle) = match parse_input(did) { 30 + Ok(InputType::Plc(did_str)) => { 31 + match plc_query(client, "plc.directory", &did_str).await { 32 + Ok(did_doc) => { 33 + let pds = did_doc.service 34 + .iter() 35 + .find(|service| { 36 + service.r#type.contains("AtprotoPersonalDataServer") 37 + }) 38 + .map(|service| service.service_endpoint.clone()) 39 + .map(|url| url.to_string()) 40 + .unwrap_or_else(|| "https://bsky.social".to_string()); 41 + let handle = did_doc.also_known_as 42 + .iter() 43 + .find(|aka| aka.starts_with("at://")) 44 + .map(|aka| aka.strip_prefix("at://").unwrap_or(aka).to_string()); 45 + (pds, handle) 46 + } 47 + Err(e) => { 48 + return Err(ActorResolverError::ResolveFailed(format!("Failed to query PLC for {}: {:?}", did, e))); 49 + } 50 + } 51 + } 52 + Ok(InputType::Web(did_str)) => { 53 + match web_query(client, &did_str).await { 54 + Ok(did_doc) => { 55 + let pds = did_doc.service 56 + .iter() 57 + .find(|service| { 58 + service.r#type.contains("AtprotoPersonalDataServer") 59 + }) 60 + .map(|service| service.service_endpoint.clone()) 61 + .map(|url| url.to_string()) 62 + .unwrap_or_else(|| "https://bsky.social".to_string()); 63 + let 
handle = did_doc.also_known_as 64 + .iter() 65 + .find(|aka| aka.starts_with("at://")) 66 + .map(|aka| aka.strip_prefix("at://").unwrap_or(aka).to_string()); 67 + (pds, handle) 68 + } 69 + Err(e) => { 70 + return Err(ActorResolverError::ResolveFailed(format!("Failed to query web DID for {}: {:?}", did, e))); 71 + } 72 + } 73 + } 74 + Ok(InputType::Handle(_)) => { 75 + return Err(ActorResolverError::InvalidSubject); 76 + } 77 + Err(e) => { 78 + return Err(ActorResolverError::ParseFailed(format!("Failed to parse DID {}: {:?}", did, e))); 79 + } 80 + }; 81 + 82 + Ok(ActorData { 83 + did: did.to_string(), 84 + handle, 85 + pds: pds_url, 86 + }) 87 + }
+29
api/src/database.rs
··· 1133 1133 .collect()) 1134 1134 } 1135 1135 1136 + pub async fn actor_has_records(&self, did: &str, slice_uri: &str) -> Result<bool, DatabaseError> { 1137 + let count = sqlx::query!( 1138 + r#" 1139 + SELECT COUNT(*) as count 1140 + FROM record 1141 + WHERE did = $1 AND slice_uri = $2 1142 + "#, 1143 + did, 1144 + slice_uri 1145 + ) 1146 + .fetch_one(&self.pool) 1147 + .await?; 1148 + Ok(count.count.unwrap_or(0) > 0) 1149 + } 1150 + 1151 + pub async fn delete_actor(&self, did: &str, slice_uri: &str) -> Result<u64, DatabaseError> { 1152 + let result = sqlx::query!( 1153 + r#" 1154 + DELETE FROM actor 1155 + WHERE did = $1 AND slice_uri = $2 1156 + "#, 1157 + did, 1158 + slice_uri 1159 + ) 1160 + .execute(&self.pool) 1161 + .await?; 1162 + Ok(result.rows_affected()) 1163 + } 1164 + 1136 1165 pub async fn get_slice_domain(&self, slice_uri: &str) -> Result<Option<String>, DatabaseError> { 1137 1166 let row = sqlx::query!( 1138 1167 r#"
+78 -6
api/src/jetstream.rs
··· 6 6 use std::sync::Arc; 7 7 use tokio::sync::RwLock; 8 8 use tracing::{error, info}; 9 + use reqwest::Client; 9 10 11 + use crate::actor_resolver::resolve_actor_data; 10 12 use crate::database::Database; 11 - use crate::models::Record; 13 + use crate::models::{Record, Actor}; 12 14 use crate::errors::SliceError; 13 15 use slices_lexicon::LexiconValidator; 14 16 use crate::logging::{Logger, LogLevel}; ··· 16 18 pub struct JetstreamConsumer { 17 19 consumer: Consumer, 18 20 database: Database, 21 + http_client: Client, 19 22 // Track which collections we should index for each slice 20 23 slice_collections: Arc<RwLock<HashMap<String, HashSet<String>>>>, 21 - // Track domains for each slice (slice_uri -> domain) 24 + // Track domains for each slice (slice_uri -> domain) 22 25 slice_domains: Arc<RwLock<HashMap<String, String>>>, 23 26 // Cache for actor lookups 24 27 actor_cache: Arc<RwLock<HashMap<(String, String), bool>>>, ··· 31 34 // Event handler that implements the EventHandler trait 32 35 struct SliceEventHandler { 33 36 database: Database, 37 + http_client: Client, 34 38 slice_collections: Arc<RwLock<HashMap<String, HashSet<String>>>>, 35 39 slice_domains: Arc<RwLock<HashMap<String, String>>>, 36 40 event_count: Arc<std::sync::atomic::AtomicU64>, ··· 226 230 } 227 231 228 232 if is_primary_collection { 229 - // Primary collection - index ALL records, no actor check needed 230 - info!("✓ Primary collection {} for slice {} (domain: {}) - indexing record", 233 + // Primary collection - ensure actor exists and index ALL records 234 + info!("✓ Primary collection {} for slice {} (domain: {}) - indexing record", 231 235 commit.collection, slice_uri, domain); 236 + 237 + // Ensure actor exists for primary collections 238 + let cache_key = (did.to_string(), slice_uri.clone()); 239 + let is_cached = { 240 + let cache = self.actor_cache.read().await; 241 + cache.contains_key(&cache_key) 242 + }; 243 + 244 + if !is_cached { 245 + // Actor not in cache - create it 246 + 
info!("Creating new actor {} for slice {}", did, slice_uri); 247 + 248 + // Resolve actor data (handle, PDS) 249 + match resolve_actor_data(&self.http_client, did).await { 250 + Ok(actor_data) => { 251 + let actor = Actor { 252 + did: actor_data.did.clone(), 253 + handle: actor_data.handle, 254 + slice_uri: slice_uri.clone(), 255 + indexed_at: Utc::now().to_rfc3339(), 256 + }; 257 + 258 + // Insert into database 259 + if let Err(e) = self.database.batch_insert_actors(&[actor]).await { 260 + error!("Failed to create actor {}: {}", did, e); 261 + } else { 262 + // Add to cache after successful database insert 263 + let mut cache = self.actor_cache.write().await; 264 + cache.insert(cache_key, true); 265 + info!("✓ Created actor {} for slice {}", did, slice_uri); 266 + } 267 + } 268 + Err(e) => { 269 + error!("Failed to resolve actor data for {}: {}", did, e); 270 + } 271 + } 272 + } 232 273 233 274 let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey); 234 275 ··· 384 425 info!("✓ Deleted record: {} ({} rows) for {} slice(s)", uri, rows_affected, relevant_slices.len()); 385 426 let message = format!("Record deleted from {}", commit.collection); 386 427 387 - // Log to each relevant slice 388 - for slice_uri in relevant_slices { 428 + // Log to each relevant slice and check if actor cleanup is needed 429 + for slice_uri in &relevant_slices { 389 430 Logger::global().log_jetstream_with_slice( 390 431 LogLevel::Info, 391 432 &message, ··· 399 440 Some(&slice_uri) 400 441 ); 401 442 } 443 + 444 + // Check if actor should be cleaned up (no more records) 445 + for slice_uri in &relevant_slices { 446 + match self.database.actor_has_records(did, slice_uri).await { 447 + Ok(has_records) => { 448 + if !has_records { 449 + // No more records for this actor in this slice - clean up 450 + match self.database.delete_actor(did, slice_uri).await { 451 + Ok(deleted) => { 452 + if deleted > 0 { 453 + info!("✓ Cleaned up actor {} from slice {} (no records remaining)", 
did, slice_uri); 454 + // Remove from cache 455 + let cache_key = (did.to_string(), slice_uri.clone()); 456 + let mut cache = self.actor_cache.write().await; 457 + cache.remove(&cache_key); 458 + } 459 + } 460 + Err(e) => { 461 + error!("Failed to delete actor {} from slice {}: {}", did, slice_uri, e); 462 + } 463 + } 464 + } 465 + } 466 + Err(e) => { 467 + error!("Failed to check if actor {} has records in slice {}: {}", did, slice_uri, e); 468 + } 469 + } 470 + } 402 471 } 403 472 } 404 473 Err(e) => { ··· 464 533 }; 465 534 466 535 let consumer = Consumer::new(config); 536 + let http_client = Client::new(); 467 537 468 538 Ok(Self { 469 539 consumer, 470 540 database, 541 + http_client, 471 542 slice_collections: Arc::new(RwLock::new(HashMap::new())), 472 543 slice_domains: Arc::new(RwLock::new(HashMap::new())), 473 544 actor_cache: Arc::new(RwLock::new(HashMap::new())), ··· 596 667 // Create and register the event handler 597 668 let handler = Arc::new(SliceEventHandler { 598 669 database: self.database.clone(), 670 + http_client: self.http_client.clone(), 599 671 slice_collections: self.slice_collections.clone(), 600 672 slice_domains: self.slice_domains.clone(), 601 673 event_count: self.event_count.clone(),
+1
api/src/main.rs
··· 1 + mod actor_resolver; 1 2 mod atproto_extensions; 2 3 mod auth; 3 4 mod database;
+7 -64
api/src/sync.rs
··· 5 5 use tokio::time::{timeout, Duration}; 6 6 use tracing::{debug, error, info, warn}; 7 7 use atproto_identity::{ 8 - plc::query as plc_query, 9 - resolve::{resolve_subject, HickoryDnsResolver, InputType, parse_input}, 10 - web::query as web_query, 8 + resolve::{resolve_subject, HickoryDnsResolver}, 11 9 }; 12 10 11 + use crate::actor_resolver::resolve_actor_data; 13 12 use crate::database::Database; 14 13 use crate::errors::SyncError; 15 14 use crate::models::{Actor, Record}; ··· 616 615 async fn resolve_atp_data(&self, did: &str) -> Result<AtpData, SyncError> { 617 616 debug!("Resolving ATP data for DID: {}", did); 618 617 619 - // Check cache first 620 618 { 621 619 let cache = self.atp_cache.lock().unwrap(); 622 620 if let Some(cached_data) = cache.get(did) { ··· 631 629 Ok(resolved_did) => { 632 630 debug!("Successfully resolved subject: {}", resolved_did); 633 631 634 - // Parse the resolved DID and fetch the DID document 635 - let (pds_url, handle) = match parse_input(&resolved_did) { 636 - Ok(InputType::Plc(did)) => { 637 - match plc_query(&self.client, "plc.directory", &did).await { 638 - Ok(did_doc) => { 639 - let pds = did_doc.service 640 - .iter() 641 - .find(|service| { 642 - service.r#type.contains("AtprotoPersonalDataServer") 643 - }) 644 - .map(|service| service.service_endpoint.clone()) 645 - .map(|url| url.to_string()) 646 - .unwrap_or_else(|| "https://bsky.social".to_string()); 647 - 648 - let handle = did_doc.also_known_as 649 - .iter() 650 - .find(|aka| aka.starts_with("at://")) 651 - .map(|aka| aka.strip_prefix("at://").unwrap_or(aka).to_string()); 652 - 653 - (pds, handle) 654 - } 655 - Err(e) => { 656 - return Err(SyncError::Generic(format!("Failed to query PLC for {}: {:?}", did, e))); 657 - } 658 - } 659 - } 660 - Ok(InputType::Web(did)) => { 661 - match web_query(&self.client, &did).await { 662 - Ok(did_doc) => { 663 - let pds = did_doc.service 664 - .iter() 665 - .find(|service| { 666 - 
service.r#type.contains("AtprotoPersonalDataServer") 667 - }) 668 - .map(|service| service.service_endpoint.clone()) 669 - .map(|url| url.to_string()) 670 - .unwrap_or_else(|| "https://bsky.social".to_string()); 671 - 672 - let handle = did_doc.also_known_as 673 - .iter() 674 - .find(|aka| aka.starts_with("at://")) 675 - .map(|aka| aka.strip_prefix("at://").unwrap_or(aka).to_string()); 676 - 677 - (pds, handle) 678 - } 679 - Err(e) => { 680 - return Err(SyncError::Generic(format!("Failed to query web DID for {}: {:?}", did, e))); 681 - } 682 - } 683 - } 684 - Ok(InputType::Handle(_)) => { 685 - return Err(SyncError::Generic("Subject resolved to handle instead of DID".to_string())); 686 - } 687 - Err(e) => { 688 - return Err(SyncError::Generic(format!("Failed to parse resolved DID {}: {:?}", resolved_did, e))); 689 - } 690 - }; 632 + let actor_data = resolve_actor_data(&self.client, &resolved_did).await 633 + .map_err(|e| SyncError::Generic(e.to_string()))?; 691 634 692 635 let atp_data = AtpData { 693 - did: resolved_did, 694 - pds: pds_url, 695 - handle, 636 + did: actor_data.did, 637 + pds: actor_data.pds, 638 + handle: actor_data.handle, 696 639 }; 697 640 698 641 // Cache the result