interactive intro to open social at-me.zzstoatzz.io

chore: add pre-commit hooks and fix clippy warnings

- Add pre-commit configuration with cargo fmt and clippy checks
- Remove obsolete .tangled/workflows/deploy.yaml
- Fix all clippy warnings:
- Replace redundant closures with function references
- Use .first() instead of .get(0)
- Use .or_default() instead of .or_insert_with(Vec::new)
- Use .unsigned_abs() instead of .abs() as u32
- Use .div_ceil() instead of manual ceiling division
- Simplify iterator patterns with .flatten()
- Remove needless return statements
- Replace useless format!() with .to_string()
- Use array literals instead of vec![] where appropriate
- Remove redundant field names in struct initialization

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

+282 -217
+16
.pre-commit-config.yaml
···
··· 1 + repos: 2 + - repo: local 3 + hooks: 4 + - id: cargo-fmt 5 + name: cargo fmt 6 + entry: cargo fmt -- 7 + language: system 8 + types: [rust] 9 + pass_filenames: true 10 + 11 + - id: cargo-clippy 12 + name: cargo clippy 13 + entry: cargo clippy -- -D warnings 14 + language: system 15 + types: [rust] 16 + pass_filenames: false
-16
.tangled/workflows/deploy.yaml
··· 1 - engine: nixery 2 - 3 - when: 4 - - event: ["push"] 5 - branch: ["main"] 6 - 7 - dependencies: 8 - nixpkgs: 9 - - rustc 10 - - cargo 11 - - rustfmt 12 - 13 - steps: 14 - - name: check formatting 15 - command: | 16 - cargo fmt --check
···
+2 -1
src/constants.rs
··· 1 use std::time::Duration; 2 3 // API Endpoints 4 - pub const BSKY_API_RESOLVE_HANDLE: &str = "https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle"; 5 pub const BSKY_API_GET_PROFILE: &str = "https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile"; 6 pub const PLC_DIRECTORY: &str = "https://plc.directory"; 7
··· 1 use std::time::Duration; 2 3 // API Endpoints 4 + pub const BSKY_API_RESOLVE_HANDLE: &str = 5 + "https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle"; 6 pub const BSKY_API_GET_PROFILE: &str = "https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile"; 7 pub const PLC_DIRECTORY: &str = "https://plc.directory"; 8
+18 -4
src/firehose.rs
··· 106 } 107 } 108 109 - info!("Creating new firehose connection for DID: {} with {} collections", did, collections.len()); 110 111 // Create a broadcast channel with a buffer of 100 events 112 let (tx, _rx) = broadcast::channel::<FirehoseEvent>(constants::FIREHOSE_BROADCAST_BUFFER); ··· 183 }; 184 185 if failed { 186 - tokio::time::sleep(tokio::time::Duration::from_secs(constants::FIREHOSE_RECONNECT_DELAY_SECONDS)).await; 187 continue; 188 } 189 190 - info!("Jetstream connection dropped for DID: {}, reconnecting in {} seconds...", did_clone, constants::FIREHOSE_RECONNECT_DELAY_SECONDS); 191 - tokio::time::sleep(tokio::time::Duration::from_secs(constants::FIREHOSE_RECONNECT_DELAY_SECONDS)).await; 192 } 193 }); 194
··· 106 } 107 } 108 109 + info!( 110 + "Creating new firehose connection for DID: {} with {} collections", 111 + did, 112 + collections.len() 113 + ); 114 115 // Create a broadcast channel with a buffer of 100 events 116 let (tx, _rx) = broadcast::channel::<FirehoseEvent>(constants::FIREHOSE_BROADCAST_BUFFER); ··· 187 }; 188 189 if failed { 190 + tokio::time::sleep(tokio::time::Duration::from_secs( 191 + constants::FIREHOSE_RECONNECT_DELAY_SECONDS, 192 + )) 193 + .await; 194 continue; 195 } 196 197 + info!( 198 + "Jetstream connection dropped for DID: {}, reconnecting in {} seconds...", 199 + did_clone, 200 + constants::FIREHOSE_RECONNECT_DELAY_SECONDS 201 + ); 202 + tokio::time::sleep(tokio::time::Duration::from_secs( 203 + constants::FIREHOSE_RECONNECT_DELAY_SECONDS, 204 + )) 205 + .await; 206 } 207 }); 208
+4 -4
src/main.rs
··· 1 - use actix_web::{App, HttpServer, middleware, web}; 2 use actix_files::Files; 3 - use actix_session::{SessionMiddleware, storage::CookieSessionStore}; 4 use actix_web::cookie::Key; 5 6 mod constants; 7 mod firehose; ··· 36 .wrap(middleware::Logger::default()) 37 .wrap( 38 SessionMiddleware::builder(CookieSessionStore::default(), session_key.clone()) 39 - .cookie_secure(false) // Set to true in production with HTTPS 40 - .build() 41 ) 42 .app_data(web::Data::new(firehose_manager.clone())) 43 .app_data(web::Data::new(oauth_client.clone()))
··· 1 use actix_files::Files; 2 + use actix_session::{storage::CookieSessionStore, SessionMiddleware}; 3 use actix_web::cookie::Key; 4 + use actix_web::{middleware, web, App, HttpServer}; 5 6 mod constants; 7 mod firehose; ··· 36 .wrap(middleware::Logger::default()) 37 .wrap( 38 SessionMiddleware::builder(CookieSessionStore::default(), session_key.clone()) 39 + .cookie_secure(false) // Set to true in production with HTTPS 40 + .build(), 41 ) 42 .app_data(web::Data::new(firehose_manager.clone())) 43 .app_data(web::Data::new(oauth_client.clone()))
+15 -10
src/mst.rs
··· 52 // Build tree structure 53 let root = build_tree(nodes); 54 55 - MSTResponse { 56 - root, 57 - record_count, 58 - } 59 } 60 61 fn calculate_key_depth(key: &str) -> i32 { 62 // Simplified depth calculation based on key hash 63 let mut hash: i32 = 0; 64 for ch in key.chars() { 65 - hash = hash.wrapping_shl(5).wrapping_sub(hash).wrapping_add(ch as i32); 66 } 67 68 // Count leading zero bits (approximation) 69 - let abs_hash = hash.abs() as u32; 70 let binary = format!("{:032b}", abs_hash); 71 72 let mut depth = 0; ··· 99 // Group by depth 100 let mut by_depth: HashMap<i32, Vec<MSTNode>> = HashMap::new(); 101 for node in nodes { 102 - by_depth.entry(node.depth).or_insert_with(Vec::new).push(node); 103 } 104 105 let mut depths: Vec<i32> = by_depth.keys().copied().collect(); 106 depths.sort(); 107 108 // Build tree bottom-up 109 - let mut current_level: Vec<MSTNode> = by_depth.remove(&depths[depths.len() - 1]).unwrap_or_default(); 110 111 // Work backwards through depths 112 for i in (0..depths.len() - 1).rev() { ··· 117 let children_per_parent = if parent_nodes.is_empty() { 118 0 119 } else { 120 - (current_level.len() + parent_nodes.len() - 1) / parent_nodes.len() 121 }; 122 123 for (i, parent) in parent_nodes.iter_mut().enumerate() { ··· 145 pub async fn fetch_records(pds: &str, did: &str, collection: &str) -> Result<Vec<Record>, String> { 146 let url = format!( 147 "{}/xrpc/com.atproto.repo.listRecords?repo={}&collection={}&limit={}", 148 - pds, did, collection, constants::MST_FETCH_LIMIT 149 ); 150 151 let response = reqwest::get(&url)
··· 52 // Build tree structure 53 let root = build_tree(nodes); 54 55 + MSTResponse { root, record_count } 56 } 57 58 fn calculate_key_depth(key: &str) -> i32 { 59 // Simplified depth calculation based on key hash 60 let mut hash: i32 = 0; 61 for ch in key.chars() { 62 + hash = hash 63 + .wrapping_shl(5) 64 + .wrapping_sub(hash) 65 + .wrapping_add(ch as i32); 66 } 67 68 // Count leading zero bits (approximation) 69 + let abs_hash = hash.unsigned_abs(); 70 let binary = format!("{:032b}", abs_hash); 71 72 let mut depth = 0; ··· 99 // Group by depth 100 let mut by_depth: HashMap<i32, Vec<MSTNode>> = HashMap::new(); 101 for node in nodes { 102 + by_depth.entry(node.depth).or_default().push(node); 103 } 104 105 let mut depths: Vec<i32> = by_depth.keys().copied().collect(); 106 depths.sort(); 107 108 // Build tree bottom-up 109 + let mut current_level: Vec<MSTNode> = by_depth 110 + .remove(&depths[depths.len() - 1]) 111 + .unwrap_or_default(); 112 113 // Work backwards through depths 114 for i in (0..depths.len() - 1).rev() { ··· 119 let children_per_parent = if parent_nodes.is_empty() { 120 0 121 } else { 122 + current_level.len().div_ceil(parent_nodes.len()) 123 }; 124 125 for (i, parent) in parent_nodes.iter_mut().enumerate() { ··· 147 pub async fn fetch_records(pds: &str, did: &str, collection: &str) -> Result<Vec<Record>, String> { 148 let url = format!( 149 "{}/xrpc/com.atproto.repo.listRecords?repo={}&collection={}&limit={}", 150 + pds, 151 + did, 152 + collection, 153 + constants::MST_FETCH_LIMIT 154 ); 155 156 let response = reqwest::get(&url)
+9 -5
src/oauth.rs
··· 3 handle::{AtprotoHandleResolver, AtprotoHandleResolverConfig, DnsTxtResolver}, 4 }; 5 use atrium_oauth::{ 6 AtprotoClientMetadata, AtprotoLocalhostClientMetadata, AuthMethod, DefaultHttpClient, 7 GrantType, KnownScope, OAuthClient, OAuthClientConfig, OAuthResolverConfig, Scope, 8 - store::{session::MemorySessionStore, state::MemoryStateStore}, 9 }; 10 - use hickory_resolver::{TokioAsyncResolver, config::{ResolverConfig, ResolverOpts}}; 11 use std::sync::Arc; 12 13 use crate::constants; ··· 41 42 pub fn create_oauth_client() -> OAuthClientType { 43 let http_client = Arc::new(DefaultHttpClient::default()); 44 - let dns_resolver = HickoryDnsResolver(Arc::new( 45 - TokioAsyncResolver::tokio(ResolverConfig::default(), ResolverOpts::default()), 46 - )); 47 48 let redirect_uri = std::env::var("OAUTH_REDIRECT_URI") 49 .unwrap_or_else(|_| constants::DEFAULT_OAUTH_CALLBACK.to_string());
··· 3 handle::{AtprotoHandleResolver, AtprotoHandleResolverConfig, DnsTxtResolver}, 4 }; 5 use atrium_oauth::{ 6 + store::{session::MemorySessionStore, state::MemoryStateStore}, 7 AtprotoClientMetadata, AtprotoLocalhostClientMetadata, AuthMethod, DefaultHttpClient, 8 GrantType, KnownScope, OAuthClient, OAuthClientConfig, OAuthResolverConfig, Scope, 9 + }; 10 + use hickory_resolver::{ 11 + config::{ResolverConfig, ResolverOpts}, 12 + TokioAsyncResolver, 13 }; 14 use std::sync::Arc; 15 16 use crate::constants; ··· 44 45 pub fn create_oauth_client() -> OAuthClientType { 46 let http_client = Arc::new(DefaultHttpClient::default()); 47 + let dns_resolver = HickoryDnsResolver(Arc::new(TokioAsyncResolver::tokio( 48 + ResolverConfig::default(), 49 + ResolverOpts::default(), 50 + ))); 51 52 let redirect_uri = std::env::var("OAUTH_REDIRECT_URI") 53 .unwrap_or_else(|_| constants::DEFAULT_OAUTH_CALLBACK.to_string());
+218 -177
src/routes.rs
··· 1 use actix_session::Session; 2 use actix_web::{get, post, web, HttpResponse, Responder}; 3 - use atrium_oauth::{AuthorizeOptions, CallbackParams, KnownScope, Scope, OAuthSession}; 4 - use atrium_oauth::DefaultHttpClient; 5 use atrium_identity::did::CommonDidResolver; 6 use atrium_identity::handle::AtprotoHandleResolver; 7 use serde::Deserialize; 8 - use once_cell::sync::Lazy; 9 use std::collections::HashMap; 10 use std::sync::{Arc, Mutex}; 11 use std::time::Instant; 12 - use futures_util::future; 13 - use dashmap::DashMap; 14 - use serde::Serialize; 15 16 use crate::constants; 17 use crate::firehose::FirehoseManager; 18 use crate::mst; 19 - use crate::oauth::{OAuthClientType, HickoryDnsResolver}; 20 use crate::templates; 21 22 // Avatar cache with 1 hour TTL ··· 34 timestamp: Instant, 35 } 36 37 - static DID_CACHE: Lazy<DashMap<String, CachedDid>> = 38 - Lazy::new(|| DashMap::new()); 39 40 // Guestbook signature struct 41 #[derive(Serialize, Clone)] ··· 67 DefaultHttpClient, 68 CommonDidResolver<DefaultHttpClient>, 69 AtprotoHandleResolver<HickoryDnsResolver, DefaultHttpClient>, 70 - atrium_common::store::memory::MemoryStore<atrium_api::types::string::Did, atrium_oauth::store::session::Session>, 71 >; 72 73 // OAuth session cache - stores authenticated agents by DID 74 static AGENT_CACHE: Lazy<DashMap<String, Arc<atrium_api::agent::Agent<OAuthSessionType>>>> = 75 - Lazy::new(|| DashMap::new()); 76 77 const FAVICON_SVG: &str = include_str!("../static/favicon.svg"); 78 ··· 97 did_param.clone() 98 } else if let Some(handle) = &query.handle { 99 // Handle provided - resolve to DID 100 - let resolve_url = format!( 101 - "{}?handle={}", 102 - constants::BSKY_API_RESOLVE_HANDLE, handle 103 - ); 104 105 match reqwest::get(&resolve_url).await { 106 Ok(response) => match response.json::<serde_json::Value>().await { ··· 134 } 135 136 #[post("/login")] 137 - pub async fn login( 138 - form: web::Form<LoginForm>, 139 - client: web::Data<OAuthClientType>, 140 - ) -> HttpResponse { 141 let handle = match atrium_api::types::string::Handle::new(form.handle.clone()) { 142 Ok(h) => h, 143 Err(_) => return HttpResponse::BadRequest().body("invalid handle"), ··· 205 206 // Store DID in actix session 207 if let Err(e) = session.insert(constants::SESSION_KEY_DID, &did_string) { 208 - return HttpResponse::InternalServerError().body(format!("session error: {}", e)); 209 } 210 HttpResponse::SeeOther() 211 .append_header(("Location", format!("/view?did={}&auth=success", did_string))) ··· 285 #[serde(rename_all = "camelCase")] 286 pub struct AppInfo { 287 namespace: String, 288 - namespaces: Vec<String>, // for merged apps 289 collections: Vec<String>, 290 - did: Option<String>, // DID of the namespace owner (if resolvable) 291 } 292 293 #[derive(serde::Serialize)] ··· 313 // Reverse namespace to get potential domain (e.g., app.bsky -> bsky.app) 314 let reversed: String = namespace.split('.').rev().collect::<Vec<&str>>().join("."); 315 316 - let handles = vec![ 317 - reversed.clone(), 318 - format!("{}.bsky.social", reversed), 319 - ]; 320 321 // Try all handle variations concurrently 322 - let futures: Vec<_> = handles.iter() 323 .map(|handle| try_resolve_handle_to_did(handle)) 324 .collect(); 325 326 let results = future::join_all(futures).await; 327 328 // Return first successful resolution 329 - let mut resolved_did: Option<String> = None; 330 - for result in results { 331 - if let Some(did) = result { 332 - resolved_did = Some(did); 333 - break; 334 - } 335 - } 336 337 // Cache the result (even if None) 338 - DID_CACHE.insert(namespace.to_string(), CachedDid { 339 - did: resolved_did.clone(), 340 - timestamp: Instant::now(), 341 - }); 342 343 resolved_did 344 } 345 346 async fn try_resolve_handle_to_did(handle: &str) -> Option<String> { 347 - let resolve_url = format!( 348 - "{}?handle={}", 349 - constants::BSKY_API_RESOLVE_HANDLE, handle 350 - ); 351 352 match reqwest::get(&resolve_url).await { 353 Ok(response) => match response.json::<serde_json::Value>().await { ··· 366 let did_doc_url = format!("{}/{}", constants::PLC_DIRECTORY, did); 367 let did_doc_response = match reqwest::get(&did_doc_url).await { 368 Ok(r) => r, 369 - Err(e) => return HttpResponse::InternalServerError().json(serde_json::json!({ 370 - "error": format!("failed to fetch DID document: {}", e) 371 - })), 372 }; 373 374 let did_doc: serde_json::Value = match did_doc_response.json().await { 375 Ok(d) => d, 376 - Err(e) => return HttpResponse::InternalServerError().json(serde_json::json!({ 377 - "error": format!("failed to parse DID document: {}", e) 378 - })), 379 }; 380 381 // Extract PDS and handle 382 let pds = did_doc["service"] 383 .as_array() 384 .and_then(|services| { 385 - services.iter().find(|s| { 386 - s["type"].as_str() == Some("AtprotoPersonalDataServer") 387 - }) 388 }) 389 .and_then(|s| s["serviceEndpoint"].as_str()) 390 .unwrap_or("") ··· 392 393 let handle = did_doc["alsoKnownAs"] 394 .as_array() 395 - .and_then(|aka| aka.get(0)) 396 .and_then(|v| v.as_str()) 397 .map(|s| s.replace("at://", "")) 398 .unwrap_or_else(|| did.to_string()); ··· 404 let repo_url = format!("{}/xrpc/com.atproto.repo.describeRepo?repo={}", pds, did); 405 let repo_response = match reqwest::get(&repo_url).await { 406 Ok(r) => r, 407 - Err(e) => return HttpResponse::InternalServerError().json(serde_json::json!({ 408 - "error": format!("failed to fetch repo: {}", e) 409 - })), 410 }; 411 412 let repo_data: serde_json::Value = match repo_response.json().await { 413 Ok(d) => d, 414 - Err(e) => return HttpResponse::InternalServerError().json(serde_json::json!({ 415 - "error": format!("failed to parse repo: {}", e) 416 - })), 417 }; 418 419 let collections = repo_data["collections"] ··· 426 .unwrap_or_default(); 427 428 // Group collections by namespace 429 - let mut namespace_to_collections: std::collections::HashMap<String, Vec<String>> = std::collections::HashMap::new(); 430 for collection in collections { 431 let parts: Vec<&str> = collection.split('.').collect(); 432 if parts.len() >= 2 { 433 let namespace = format!("{}.{}", parts[0], parts[1]); 434 - namespace_to_collections.entry(namespace) 435 - .or_insert_with(Vec::new) 436 .push(collection); 437 } 438 } ··· 442 let known_merge_namespaces = vec!["app.bsky", "chat.bsky"]; 443 let namespaces: Vec<String> = namespace_to_collections.keys().cloned().collect(); 444 445 - let resolution_futures: Vec<_> = namespaces.iter() 446 .map(|ns| { 447 let ns = ns.clone(); 448 let known = known_merge_namespaces.clone(); ··· 463 .into_iter() 464 .zip(resolved_dids.into_iter()) 465 .map(|(ns, did)| { 466 - let collections = namespace_to_collections.get(&ns).cloned().unwrap_or_default(); 467 (ns, did, collections) 468 }) 469 .collect(); 470 471 // Apply fallback: if namespace didn't resolve, try to find a sibling namespace with same domain 472 - let mut namespace_to_did: std::collections::HashMap<String, Option<String>> = std::collections::HashMap::new(); 473 for (namespace, did_opt, _) in &namespace_data { 474 namespace_to_did.insert(namespace.clone(), did_opt.clone()); 475 } 476 477 // Build domain -> DIDs map for fallback 478 - let mut domain_to_dids: std::collections::HashMap<String, Vec<String>> = std::collections::HashMap::new(); 479 for (namespace, did_opt, _) in &namespace_data { 480 if let Some(did) = did_opt { 481 // Extract second-level domain (e.g., "app.bsky" -> "bsky", "chat.bsky" -> "bsky") 482 let parts: Vec<&str> = namespace.split('.').collect(); 483 if parts.len() >= 2 { 484 let domain = parts[1].to_string(); 485 - domain_to_dids.entry(domain) 486 - .or_insert_with(Vec::new) 487 - .push(did.clone()); 488 } 489 } 490 } ··· 513 } 514 515 // Group by DID for merging 516 - let mut did_to_namespaces: std::collections::HashMap<String, Vec<(String, Vec<String>)>> = std::collections::HashMap::new(); 517 let mut no_did_apps: Vec<(String, Vec<String>)> = Vec::new(); 518 519 for (namespace, did_opt, collections) in resolved_namespace_data { 520 if let Some(did) = did_opt { 521 - did_to_namespaces.entry(did) 522 - .or_insert_with(Vec::new) 523 .push((namespace, collections)); 524 } else { 525 no_did_apps.push((namespace, collections)); ··· 615 // Cache the result 616 { 617 let mut cache = AVATAR_CACHE.lock().unwrap(); 618 - cache.insert(namespace.clone(), CachedAvatar { 619 - url: avatar_url.clone(), 620 - timestamp: Instant::now(), 621 - }); 622 } 623 624 HttpResponse::Ok() ··· 631 async fn fetch_avatar_for_namespace(namespace: &str) -> Option<String> { 632 // Reverse namespace to get domain (e.g., io.zzstoatzz -> zzstoatzz.io) 633 let reversed: String = namespace.split('.').rev().collect::<Vec<&str>>().join("."); 634 - let handles = vec![ 635 - reversed.clone(), 636 - format!("{}.bsky.social", reversed), 637 - ]; 638 639 // Try all handles concurrently 640 - let futures: Vec<_> = handles.iter() 641 .map(|handle| try_fetch_avatar_for_handle(handle)) 642 .collect(); 643 644 // Wait for all futures and return first successful result 645 let results = future::join_all(futures).await; 646 647 - for result in results { 648 - if let Some(avatar) = result { 649 - return Some(avatar); 650 - } 651 - } 652 - 653 - None 654 } 655 656 async fn try_fetch_avatar_for_handle(handle: &str) -> Option<String> { 657 // Try to resolve handle to DID 658 - let resolve_url = format!( 659 - "{}?handle={}", 660 - constants::BSKY_API_RESOLVE_HANDLE, handle 661 - ); 662 663 let did = match reqwest::get(&resolve_url).await { 664 Ok(response) => match response.json::<serde_json::Value>().await { ··· 669 }; 670 671 // Try to get profile 672 - let profile_url = format!( 673 - "{}?actor={}", 674 - constants::BSKY_API_GET_PROFILE, did 675 - ); 676 677 match reqwest::get(&profile_url).await { 678 Ok(response) => match response.json::<serde_json::Value>().await { ··· 768 // Check if user is logged in 769 let did: Option<String> = match session.get(constants::SESSION_KEY_DID) { 770 Ok(d) => d, 771 - Err(_) => return HttpResponse::Unauthorized().json(serde_json::json!({ 772 - "error": "not authenticated" 773 - })), 774 }; 775 776 let did = match did { 777 Some(d) => d, 778 - None => return HttpResponse::Unauthorized().json(serde_json::json!({ 779 - "error": "not authenticated" 780 - })), 781 }; 782 783 // Retrieve authenticated agent from cache 784 let agent = match AGENT_CACHE.get(&did) { 785 Some(a) => a.clone(), 786 - None => return HttpResponse::Unauthorized().json(serde_json::json!({ 787 - "error": "session expired, please log in again" 788 - })), 789 }; 790 791 // Create the visit record ··· 798 // Convert to Unknown type 799 let record: atrium_api::types::Unknown = serde_json::from_value(record_json) 800 .map_err(|e| { 801 - return HttpResponse::InternalServerError().json(serde_json::json!({ 802 "error": format!("failed to serialize record: {}", e) 803 })) 804 }) ··· 806 807 // Create the record in the user's PDS 808 let input = atrium_api::com::atproto::repo::create_record::InputData { 809 - collection: atrium_api::types::string::Nsid::new(constants::GUESTBOOK_COLLECTION.to_string()).unwrap(), 810 - record: record, 811 repo: atrium_api::types::string::AtIdentifier::Did( 812 - atrium_api::types::string::Did::new(did.clone()).unwrap() 813 ), 814 rkey: None, 815 swap_commit: None, 816 validate: None, 817 }; 818 819 - match agent 820 - .api 821 - .com 822 - .atproto 823 - .repo 824 - .create_record(input.into()) 825 - .await 826 - { 827 Ok(output) => { 828 // Fetch fresh data from UFOs and add this signature 829 match fetch_signatures_from_ufos().await { ··· 843 // Update cache 844 { 845 let mut cache = GUESTBOOK_CACHE.lock().unwrap(); 846 - *cache = Some(CachedGuestbookSignatures { 847 - signatures, 848 - }); 849 } 850 851 log::info!("Added signature to cache for DID: {}", did); 852 } 853 Err(e) => { 854 - log::warn!("Failed to update cache after signing, invalidating instead: {}", e); 855 invalidate_guestbook_cache(); 856 } 857 } ··· 861 "uri": output.data.uri, 862 "cid": output.data.cid, 863 })) 864 - }, 865 Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ 866 "error": format!("failed to create record: {}", e) 867 })), ··· 882 if let Some(ref did_str) = did { 883 if let Some(agent) = AGENT_CACHE.get(did_str) { 884 let list_input = atrium_api::com::atproto::repo::list_records::ParametersData { 885 - collection: atrium_api::types::string::Nsid::new(constants::GUESTBOOK_COLLECTION.to_string()).unwrap(), 886 repo: atrium_api::types::string::AtIdentifier::Did( 887 - atrium_api::types::string::Did::new(did_str.clone()).unwrap() 888 ), 889 cursor: None, 890 limit: Some(atrium_api::types::LimitedNonZeroU8::try_from(1).unwrap()), 891 reverse: None, 892 }; 893 894 - if let Ok(output) = agent.api.com.atproto.repo.list_records(list_input.into()).await { 895 has_records = !output.data.records.is_empty(); 896 } 897 } ··· 916 // Check if user is logged in 917 let did: Option<String> = match session.get(constants::SESSION_KEY_DID) { 918 Ok(d) => d, 919 - Err(_) => return HttpResponse::Unauthorized().json(serde_json::json!({ 920 - "error": "not authenticated" 921 - })), 922 }; 923 924 let did = match did { 925 Some(d) => d, 926 - None => return HttpResponse::Unauthorized().json(serde_json::json!({ 927 - "error": "not authenticated" 928 - })), 929 }; 930 931 // Retrieve authenticated agent from cache 932 let agent = match AGENT_CACHE.get(&did) { 933 Some(a) => a.clone(), 934 - None => return HttpResponse::Unauthorized().json(serde_json::json!({ 935 - "error": "session expired, please log in again" 936 - })), 937 }; 938 939 // List all guestbook records for this user 940 let list_input = atrium_api::com::atproto::repo::list_records::ParametersData { 941 - collection: atrium_api::types::string::Nsid::new(constants::GUESTBOOK_COLLECTION.to_string()).unwrap(), 942 repo: atrium_api::types::string::AtIdentifier::Did( 943 - atrium_api::types::string::Did::new(did.clone()).unwrap() 944 ), 945 cursor: None, 946 limit: Some(atrium_api::types::LimitedNonZeroU8::try_from(100).unwrap()), ··· 956 .await 957 { 958 Ok(output) => output.data.records, 959 - Err(e) => return HttpResponse::InternalServerError().json(serde_json::json!({ 960 - "error": format!("failed to list records: {}", e) 961 - })), 962 }; 963 964 if records.is_empty() { ··· 976 let uri_parts: Vec<&str> = record.uri.split('/').collect(); 977 if let Some(rkey) = uri_parts.last() { 978 let delete_input = atrium_api::com::atproto::repo::delete_record::InputData { 979 - collection: atrium_api::types::string::Nsid::new(constants::GUESTBOOK_COLLECTION.to_string()).unwrap(), 980 repo: atrium_api::types::string::AtIdentifier::Did( 981 - atrium_api::types::string::Did::new(did.clone()).unwrap() 982 ), 983 rkey: atrium_api::types::string::RecordKey::new(rkey.to_string()).unwrap(), 984 swap_commit: None, ··· 1016 } 1017 } 1018 Err(e) => { 1019 - log::warn!("Failed to update cache after unsigning, invalidating instead: {}", e); 1020 invalidate_guestbook_cache(); 1021 } 1022 } ··· 1039 let cache = GUESTBOOK_CACHE.lock().unwrap(); 1040 if let Some(cached) = cache.as_ref() { 1041 // Cache is valid - return cached signatures 1042 - log::info!("Returning {} signatures from cache", cached.signatures.len()); 1043 - log::info!("Cached signature DIDs: {:?}", cached.signatures.iter().map(|s| &s.did).collect::<Vec<_>>()); 1044 return HttpResponse::Ok() 1045 .insert_header(("Cache-Control", "public, max-age=10")) 1046 .json(&cached.signatures); ··· 1080 if let Ok(doc) = response.json::<serde_json::Value>().await { 1081 doc["alsoKnownAs"] 1082 .as_array() 1083 - .and_then(|aka| aka.get(0)) 1084 .and_then(|v| v.as_str()) 1085 .map(|s| s.replace("at://", "")) 1086 } else { ··· 1109 .await 1110 .map_err(|e| format!("failed to fetch from UFOs API: {}", e))?; 1111 1112 - let records: Vec<UfosRecord> = response.json() 1113 .await 1114 .map_err(|e| format!("failed to parse UFOs response: {}", e))?; 1115 1116 log::info!("Fetched {} records from UFOs API", records.len()); 1117 1118 // Fetch profile info for each DID in parallel 1119 - let profile_futures: Vec<_> = records.iter() 1120 .map(|record| { 1121 let did = record.did.clone(); 1122 let timestamp = record.record["createdAt"] ··· 1140 // Sort by timestamp (most recent first) 1141 signatures.sort_by(|a, b| b.timestamp.cmp(&a.timestamp)); 1142 1143 - log::info!("Processed {} signatures with profile info", signatures.len()); 1144 1145 Ok(signatures) 1146 } ··· 1160 pub async fn check_page_owner_signature(query: web::Query<CheckSignatureQuery>) -> HttpResponse { 1161 let did = &query.did; 1162 1163 - log::info!("Checking if DID has signed guestbook by querying their PDS: {}", did); 1164 1165 // Fetch DID document to get PDS URL 1166 let did_doc_url = format!("{}/{}", constants::PLC_DIRECTORY, did); 1167 let pds = match reqwest::get(&did_doc_url).await { 1168 Ok(response) => match response.json::<serde_json::Value>().await { 1169 - Ok(doc) => { 1170 - doc["service"] 1171 - .as_array() 1172 - .and_then(|services| { 1173 - services.iter().find(|s| { 1174 - s["type"].as_str() == Some("AtprotoPersonalDataServer") 1175 - }) 1176 - }) 1177 - .and_then(|s| s["serviceEndpoint"].as_str()) 1178 - .unwrap_or("") 1179 - .to_string() 1180 - } 1181 Err(e) => { 1182 log::error!("Failed to parse DID document: {}", e); 1183 return HttpResponse::InternalServerError().json(serde_json::json!({ ··· 1196 // Query the PDS for guestbook records 1197 let records_url = format!( 1198 "{}/xrpc/com.atproto.repo.listRecords?repo={}&collection={}&limit=1", 1199 - pds, did, constants::GUESTBOOK_COLLECTION 1200 ); 1201 1202 match reqwest::get(&records_url).await { ··· 1269 let pds = did_doc["service"] 1270 .as_array() 1271 .and_then(|services| { 1272 - services.iter().find(|s| { 1273 - s["type"].as_str() == Some("AtprotoPersonalDataServer") 1274 - }) 1275 }) 1276 .and_then(|s| s["serviceEndpoint"].as_str()) 1277 .unwrap_or("") ··· 1281 let repo_url = format!("{}/xrpc/com.atproto.repo.describeRepo?repo={}", pds, did); 1282 let mut collections = match reqwest::get(&repo_url).await { 1283 Ok(r) => match r.json::<serde_json::Value>().await { 1284 - Ok(repo_data) => { 1285 - repo_data["collections"] 1286 - .as_array() 1287 - .map(|arr| { 1288 - arr.iter() 1289 - .filter_map(|v| v.as_str().map(String::from)) 1290 - .collect::<Vec<String>>() 1291 - }) 1292 - .unwrap_or_default() 1293 - } 1294 Err(e) => { 1295 log::error!("Failed to parse repo data: {}", e); 1296 vec![] ··· 1307 collections.push(constants::GUESTBOOK_COLLECTION.to_string()); 1308 } 1309 1310 - log::info!("Fetched {} collections for DID: {} (including guestbook)", collections.len(), did); 1311 1312 // Get or create a broadcaster for this DID with its collections 1313 - let broadcaster = crate::firehose::get_or_create_broadcaster(&manager, did.clone(), collections).await; 1314 let mut rx = broadcaster.subscribe(); 1315 1316 log::info!("SSE connection established for DID: {}", did); ··· 1318 let stream = async_stream::stream! { 1319 // Send initial connection message 1320 yield Ok::<_, actix_web::Error>( 1321 - web::Bytes::from(format!("data: {{\"type\":\"connected\"}}\n\n")) 1322 ); 1323 1324 log::info!("Sent initial connection message to client");
··· 1 use actix_session::Session; 2 use actix_web::{get, post, web, HttpResponse, Responder}; 3 use atrium_identity::did::CommonDidResolver; 4 use atrium_identity::handle::AtprotoHandleResolver; 5 + use atrium_oauth::DefaultHttpClient; 6 + use atrium_oauth::{AuthorizeOptions, CallbackParams, KnownScope, OAuthSession, Scope}; 7 + use dashmap::DashMap; 8 + use futures_util::future; 9 + use once_cell::sync::Lazy; 10 use serde::Deserialize; 11 + use serde::Serialize; 12 use std::collections::HashMap; 13 use std::sync::{Arc, Mutex}; 14 use std::time::Instant; 15 16 use crate::constants; 17 use crate::firehose::FirehoseManager; 18 use crate::mst; 19 + use crate::oauth::{HickoryDnsResolver, OAuthClientType}; 20 use crate::templates; 21 22 // Avatar cache with 1 hour TTL ··· 34 timestamp: Instant, 35 } 36 37 + static DID_CACHE: Lazy<DashMap<String, CachedDid>> = Lazy::new(DashMap::new); 38 39 // Guestbook signature struct 40 #[derive(Serialize, Clone)] ··· 66 DefaultHttpClient, 67 CommonDidResolver<DefaultHttpClient>, 68 AtprotoHandleResolver<HickoryDnsResolver, DefaultHttpClient>, 69 + atrium_common::store::memory::MemoryStore< 70 + atrium_api::types::string::Did, 71 + atrium_oauth::store::session::Session, 72 + >, 73 >; 74 75 // OAuth session cache - stores authenticated agents by DID 76 static AGENT_CACHE: Lazy<DashMap<String, Arc<atrium_api::agent::Agent<OAuthSessionType>>>> = 77 + Lazy::new(DashMap::new); 78 79 const FAVICON_SVG: &str = include_str!("../static/favicon.svg"); 80 ··· 99 did_param.clone() 100 } else if let Some(handle) = &query.handle { 101 // Handle provided - resolve to DID 102 + let resolve_url = format!("{}?handle={}", constants::BSKY_API_RESOLVE_HANDLE, handle); 103 104 match reqwest::get(&resolve_url).await { 105 Ok(response) => match response.json::<serde_json::Value>().await { ··· 133 } 134 135 #[post("/login")] 136 + pub async fn login(form: web::Form<LoginForm>, client: web::Data<OAuthClientType>) -> HttpResponse { 137 let handle = match atrium_api::types::string::Handle::new(form.handle.clone()) { 138 Ok(h) => h, 139 Err(_) => return HttpResponse::BadRequest().body("invalid handle"), ··· 201 202 // Store DID in actix session 203 if let Err(e) = session.insert(constants::SESSION_KEY_DID, &did_string) { 204 + return HttpResponse::InternalServerError() 205 + .body(format!("session error: {}", e)); 206 } 207 HttpResponse::SeeOther() 208 .append_header(("Location", format!("/view?did={}&auth=success", did_string))) ··· 282 #[serde(rename_all = "camelCase")] 283 pub struct AppInfo { 284 namespace: String, 285 + namespaces: Vec<String>, // for merged apps 286 collections: Vec<String>, 287 + did: Option<String>, // DID of the namespace owner (if resolvable) 288 } 289 290 #[derive(serde::Serialize)] ··· 310 // Reverse namespace to get potential domain (e.g., app.bsky -> bsky.app) 311 let reversed: String = namespace.split('.').rev().collect::<Vec<&str>>().join("."); 312 313 + let handles = [reversed.clone(), format!("{}.bsky.social", reversed)]; 314 315 // Try all handle variations concurrently 316 + let futures: Vec<_> = handles 317 + .iter() 318 .map(|handle| try_resolve_handle_to_did(handle)) 319 .collect(); 320 321 let results = future::join_all(futures).await; 322 323 // Return first successful resolution 324 + let resolved_did = results.into_iter().flatten().next(); 325 326 // Cache the result (even if None) 327 + DID_CACHE.insert( 328 + namespace.to_string(), 329 + CachedDid { 330 + did: resolved_did.clone(), 331 + timestamp: Instant::now(), 332 + }, 333 + ); 334 335 resolved_did 336 } 337 338 async fn try_resolve_handle_to_did(handle: &str) -> Option<String> { 339 + let resolve_url = format!("{}?handle={}", constants::BSKY_API_RESOLVE_HANDLE, handle); 340 341 match reqwest::get(&resolve_url).await { 342 Ok(response) => match response.json::<serde_json::Value>().await { ··· 355 let did_doc_url = format!("{}/{}", constants::PLC_DIRECTORY, did); 356 let did_doc_response = match reqwest::get(&did_doc_url).await { 357 Ok(r) => r, 358 + Err(e) => { 359 + return HttpResponse::InternalServerError().json(serde_json::json!({ 360 + "error": format!("failed to fetch DID document: {}", e) 361 + })) 362 + } 363 }; 364 365 let did_doc: serde_json::Value = match did_doc_response.json().await { 366 Ok(d) => d, 367 + Err(e) => { 368 + return HttpResponse::InternalServerError().json(serde_json::json!({ 369 + "error": format!("failed to parse DID document: {}", e) 370 + })) 371 + } 372 }; 373 374 // Extract PDS and handle 375 let pds = did_doc["service"] 376 .as_array() 377 .and_then(|services| { 378 + services 379 + .iter() 380 + .find(|s| s["type"].as_str() == Some("AtprotoPersonalDataServer")) 381 }) 382 .and_then(|s| s["serviceEndpoint"].as_str()) 383 .unwrap_or("") ··· 385 386 let handle = did_doc["alsoKnownAs"] 387 .as_array() 388 + .and_then(|aka| aka.first()) 389 .and_then(|v| v.as_str()) 390 .map(|s| s.replace("at://", "")) 391 .unwrap_or_else(|| did.to_string()); ··· 397 let repo_url = format!("{}/xrpc/com.atproto.repo.describeRepo?repo={}", pds, did); 398 let repo_response = match reqwest::get(&repo_url).await { 399 Ok(r) => r, 400 + Err(e) => { 401 + return HttpResponse::InternalServerError().json(serde_json::json!({ 402 + "error": format!("failed to fetch repo: {}", e) 403 + })) 404 + } 405 }; 406 407 let repo_data: serde_json::Value = match repo_response.json().await { 408 Ok(d) => d, 409 + Err(e) => { 410 + return HttpResponse::InternalServerError().json(serde_json::json!({ 411 + "error": format!("failed to parse repo: {}", e) 412 + })) 413 + } 414 }; 415 416 let collections = repo_data["collections"] ··· 423 .unwrap_or_default(); 424 425 // Group collections by namespace 426 + let mut namespace_to_collections: std::collections::HashMap<String, Vec<String>> = 427 + std::collections::HashMap::new(); 428 for collection in collections { 429 let parts: Vec<&str> = collection.split('.').collect(); 430 if parts.len() >= 2 { 431 let namespace = format!("{}.{}", parts[0], parts[1]); 432 + namespace_to_collections 433 + .entry(namespace) 434 + .or_default() 435 .push(collection); 436 } 437 } ··· 441 let known_merge_namespaces = vec!["app.bsky", "chat.bsky"]; 442 let namespaces: Vec<String> = namespace_to_collections.keys().cloned().collect(); 443 444 + let resolution_futures: Vec<_> = namespaces 445 + .iter() 446 .map(|ns| { 447 let ns = ns.clone(); 448 let known = known_merge_namespaces.clone(); ··· 463 .into_iter() 464 .zip(resolved_dids.into_iter()) 465 .map(|(ns, did)| { 466 + let collections = namespace_to_collections 467 + .get(&ns) 468 + .cloned() 469 + .unwrap_or_default(); 470 (ns, did, collections) 471 }) 472 .collect(); 473 474 // Apply fallback: if namespace didn't resolve, try to find a sibling namespace with same domain 475 + let mut namespace_to_did: std::collections::HashMap<String, Option<String>> = 476 + std::collections::HashMap::new(); 477 for (namespace, did_opt, _) in &namespace_data { 478 namespace_to_did.insert(namespace.clone(), did_opt.clone()); 479 } 480 481 // Build domain -> DIDs map for fallback 482 + let mut domain_to_dids: std::collections::HashMap<String, Vec<String>> = 483 + std::collections::HashMap::new(); 484 for (namespace, did_opt, _) in &namespace_data { 485 if let Some(did) = did_opt { 486 // Extract second-level domain (e.g., "app.bsky" -> "bsky", "chat.bsky" -> "bsky") 487 let parts: Vec<&str> = namespace.split('.').collect(); 488 if parts.len() >= 2 { 489 let domain = parts[1].to_string(); 490 + domain_to_dids.entry(domain).or_default().push(did.clone()); 491 } 492 } 493 } ··· 516 } 517 518 // Group by DID for merging 519 + let mut did_to_namespaces: std::collections::HashMap<String, Vec<(String, Vec<String>)>> = 520 + std::collections::HashMap::new(); 521 let mut no_did_apps: Vec<(String, Vec<String>)> = Vec::new(); 522 523 for (namespace, did_opt, collections) in resolved_namespace_data { 524 if let Some(did) = did_opt { 525 + did_to_namespaces 526 + .entry(did) 527 + .or_default() 528 .push((namespace, collections)); 529 } else { 530 no_did_apps.push((namespace, collections)); ··· 620 // Cache the result 621 { 622 let mut cache = AVATAR_CACHE.lock().unwrap(); 623 + cache.insert( 624 + namespace.clone(), 625 + CachedAvatar { 626 + url: avatar_url.clone(), 627 + timestamp: Instant::now(), 628 + }, 629 + ); 630 } 631 632 HttpResponse::Ok() ··· 639 async fn fetch_avatar_for_namespace(namespace: &str) -> Option<String> { 640 // Reverse namespace to get domain (e.g., io.zzstoatzz -> zzstoatzz.io) 641 let reversed: String = namespace.split('.').rev().collect::<Vec<&str>>().join("."); 642 + let handles = [reversed.clone(), format!("{}.bsky.social", reversed)]; 643 644 // Try all handles concurrently 645 + let futures: Vec<_> = handles 646 + .iter() 647 .map(|handle| try_fetch_avatar_for_handle(handle)) 648 .collect(); 649 650 // Wait for all futures and return first successful result 651 let results = future::join_all(futures).await; 652 653 + results.into_iter().flatten().next() 654 } 655 656 async fn try_fetch_avatar_for_handle(handle: &str) -> Option<String> { 657 // Try to resolve handle to DID 658 + let resolve_url = format!("{}?handle={}", constants::BSKY_API_RESOLVE_HANDLE, handle); 659 660 let did = match reqwest::get(&resolve_url).await { 661 Ok(response) => match response.json::<serde_json::Value>().await { ··· 666 }; 667 668 // Try to get profile 669 + let profile_url = format!("{}?actor={}", constants::BSKY_API_GET_PROFILE, did); 670 671 match reqwest::get(&profile_url).await { 672 Ok(response) => match response.json::<serde_json::Value>().await { ··· 762 // Check if user is logged in 763 let did: Option<String> = match session.get(constants::SESSION_KEY_DID) { 764 Ok(d) => d, 765 + Err(_) => { 766 + return HttpResponse::Unauthorized().json(serde_json::json!({ 767 + "error": "not authenticated" 768 + })) 769 + } 770 }; 771 772 let did = match did { 773 Some(d) => d, 774 + None => { 775 + return HttpResponse::Unauthorized().json(serde_json::json!({ 776 + "error": "not authenticated" 777 + })) 778 + } 779 }; 780 781 // Retrieve authenticated agent from cache 782 let agent = match AGENT_CACHE.get(&did) { 783 Some(a) => a.clone(), 784 + None => { 785 + return HttpResponse::Unauthorized().json(serde_json::json!({ 786 + "error": "session expired, please log in again" 787 + })) 788 + } 789 }; 790 791 // Create the visit record ··· 798 // Convert to Unknown type 799 let record: atrium_api::types::Unknown = serde_json::from_value(record_json) 800 .map_err(|e| { 801 + HttpResponse::InternalServerError().json(serde_json::json!({ 802 "error": format!("failed to serialize record: {}", e) 803 })) 804 }) ··· 806 807 // Create the record in the user's PDS 808 let input = atrium_api::com::atproto::repo::create_record::InputData { 809 + collection: atrium_api::types::string::Nsid::new( 810 + constants::GUESTBOOK_COLLECTION.to_string(), 811 + ) 812 + .unwrap(), 813 + record, 814 repo: atrium_api::types::string::AtIdentifier::Did( 815 + atrium_api::types::string::Did::new(did.clone()).unwrap(), 816 ), 817 rkey: None, 818 swap_commit: None, 819 validate: None, 820 }; 821 822 + match agent.api.com.atproto.repo.create_record(input.into()).await { 823 Ok(output) => { 824 // Fetch fresh data from UFOs and add this signature 825 match fetch_signatures_from_ufos().await { ··· 839 // Update cache 840 { 841 let mut cache = GUESTBOOK_CACHE.lock().unwrap(); 842 + *cache = Some(CachedGuestbookSignatures { signatures }); 843 } 844 845 log::info!("Added signature to cache for DID: {}", did); 846 } 847 Err(e) => { 848 + log::warn!( 849 + "Failed to update cache after signing, invalidating instead: {}", 850 + e 851 + ); 852 invalidate_guestbook_cache(); 853 } 854 } ··· 858 "uri": output.data.uri, 859 "cid": output.data.cid, 860 })) 861 + } 862 Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ 863 "error": format!("failed to create record: {}", e) 864 })), ··· 879 if let Some(ref did_str) = did { 880 if let Some(agent) = AGENT_CACHE.get(did_str) { 881 let list_input = atrium_api::com::atproto::repo::list_records::ParametersData { 882 + collection: atrium_api::types::string::Nsid::new( 883 + constants::GUESTBOOK_COLLECTION.to_string(), 884 + ) 885 + .unwrap(), 886 repo: atrium_api::types::string::AtIdentifier::Did( 887 + atrium_api::types::string::Did::new(did_str.clone()).unwrap(), 888 ), 889 cursor: None, 890 limit: Some(atrium_api::types::LimitedNonZeroU8::try_from(1).unwrap()), 891 reverse: None, 892 }; 893 894 + if let Ok(output) = agent 895 + .api 896 + .com 897 + .atproto 898 + .repo 899 + .list_records(list_input.into()) 900 + .await 901 + { 902 has_records = !output.data.records.is_empty(); 903 } 904 } ··· 923 // Check if user is logged in 924 let did: Option<String> = match session.get(constants::SESSION_KEY_DID) { 925 Ok(d) => d, 926 + Err(_) => { 927 + return HttpResponse::Unauthorized().json(serde_json::json!({ 928 + "error": "not authenticated" 929 + })) 930 + } 931 }; 932 933 let did = match did { 934 Some(d) => d, 935 + None => { 936 + return HttpResponse::Unauthorized().json(serde_json::json!({ 937 + "error": "not authenticated" 938 + })) 939 + } 940 }; 941 942 // Retrieve authenticated agent from cache 943 let agent = match AGENT_CACHE.get(&did) { 944 Some(a) => a.clone(), 945 + None => { 946 + return HttpResponse::Unauthorized().json(serde_json::json!({ 947 + "error": "session expired, please log in again" 948 + })) 949 + } 950 }; 951 952 // List all guestbook records for this user 953 let list_input = atrium_api::com::atproto::repo::list_records::ParametersData { 954 + collection: atrium_api::types::string::Nsid::new( 955 + constants::GUESTBOOK_COLLECTION.to_string(), 956 + ) 957 + .unwrap(), 958 repo: atrium_api::types::string::AtIdentifier::Did( 959 + atrium_api::types::string::Did::new(did.clone()).unwrap(), 960 ), 961 cursor: None, 962 limit: Some(atrium_api::types::LimitedNonZeroU8::try_from(100).unwrap()), ··· 972 .await 973 { 974 Ok(output) => output.data.records, 975 + Err(e) => { 976 + return HttpResponse::InternalServerError().json(serde_json::json!({ 977 + "error": format!("failed to list records: {}", e) 978 + })) 979 + } 980 }; 981 982 if records.is_empty() { ··· 994 let uri_parts: Vec<&str> = record.uri.split('/').collect(); 995 if let Some(rkey) = uri_parts.last() { 996 let delete_input = atrium_api::com::atproto::repo::delete_record::InputData { 997 + collection: atrium_api::types::string::Nsid::new( 998 + constants::GUESTBOOK_COLLECTION.to_string(), 999 + ) 1000 + .unwrap(), 1001 repo: atrium_api::types::string::AtIdentifier::Did( 1002 + atrium_api::types::string::Did::new(did.clone()).unwrap(), 1003 ), 1004 rkey: atrium_api::types::string::RecordKey::new(rkey.to_string()).unwrap(), 1005 swap_commit: None, ··· 1037 } 1038 } 1039 Err(e) => { 1040 + log::warn!( 1041 + "Failed to update cache after unsigning, invalidating instead: {}", 1042 + e 1043 + ); 1044 invalidate_guestbook_cache(); 1045 } 1046 } ··· 1063 let cache = GUESTBOOK_CACHE.lock().unwrap(); 1064 if let Some(cached) = cache.as_ref() { 1065 // Cache is valid - return cached signatures 1066 + log::info!( 1067 + "Returning {} signatures from cache", 1068 + cached.signatures.len() 1069 + ); 1070 + log::info!( 1071 + "Cached signature DIDs: {:?}", 1072 + cached.signatures.iter().map(|s| &s.did).collect::<Vec<_>>() 1073 + ); 1074 return HttpResponse::Ok() 1075 .insert_header(("Cache-Control", "public, max-age=10")) 1076 .json(&cached.signatures); ··· 1110 if let Ok(doc) = response.json::<serde_json::Value>().await { 1111 doc["alsoKnownAs"] 1112 .as_array() 1113 + .and_then(|aka| aka.first()) 1114 .and_then(|v| v.as_str()) 1115 .map(|s| s.replace("at://", "")) 1116 } else { ··· 1139 .await 1140 .map_err(|e| format!("failed to fetch from UFOs API: {}", e))?; 1141 1142 + let records: Vec<UfosRecord> = response 1143 + .json() 1144 .await 1145 .map_err(|e| format!("failed to parse UFOs response: {}", e))?; 1146 1147 log::info!("Fetched {} records from UFOs API", records.len()); 1148 1149 // Fetch profile info for each DID in parallel 1150 + let profile_futures: Vec<_> = records 1151 + .iter() 1152 .map(|record| { 1153 let did = record.did.clone(); 1154 let timestamp = record.record["createdAt"] ··· 1172 // Sort by timestamp (most recent first) 1173 signatures.sort_by(|a, b| b.timestamp.cmp(&a.timestamp)); 1174 1175 + log::info!( 1176 + "Processed {} signatures with profile info", 1177 + signatures.len() 1178 + ); 1179 1180 Ok(signatures) 1181 } ··· 1195 pub async fn check_page_owner_signature(query: web::Query<CheckSignatureQuery>) -> HttpResponse { 1196 let did = &query.did; 1197 1198 + log::info!( 1199 + "Checking if DID has signed guestbook by querying their PDS: {}", 1200 + did 1201 + ); 1202 1203 // Fetch DID document to get PDS URL 1204 let did_doc_url = format!("{}/{}", constants::PLC_DIRECTORY, did); 1205 let pds = match reqwest::get(&did_doc_url).await { 1206 Ok(response) => match response.json::<serde_json::Value>().await { 1207 + Ok(doc) => doc["service"] 1208 + .as_array() 1209 + .and_then(|services| { 1210 + services 1211 + .iter() 1212 + .find(|s| s["type"].as_str() == Some("AtprotoPersonalDataServer")) 1213 + }) 1214 + .and_then(|s| s["serviceEndpoint"].as_str()) 1215 + .unwrap_or("") 1216 + .to_string(), 1217 Err(e) => { 1218 log::error!("Failed to parse DID document: {}", e); 1219 return HttpResponse::InternalServerError().json(serde_json::json!({ ··· 1232 // Query the PDS for guestbook records 1233 let records_url = format!( 1234 "{}/xrpc/com.atproto.repo.listRecords?repo={}&collection={}&limit=1", 1235 + pds, 1236 + did, 1237 + constants::GUESTBOOK_COLLECTION 1238 ); 1239 1240 match reqwest::get(&records_url).await { ··· 1307 let pds = did_doc["service"] 1308 .as_array() 1309 .and_then(|services| { 1310 + services 1311 + .iter() 1312 + .find(|s| s["type"].as_str() == Some("AtprotoPersonalDataServer")) 1313 }) 1314 .and_then(|s| s["serviceEndpoint"].as_str()) 1315 .unwrap_or("") ··· 1319 let repo_url = format!("{}/xrpc/com.atproto.repo.describeRepo?repo={}", pds, did); 1320 let mut collections = match reqwest::get(&repo_url).await { 1321 Ok(r) => match r.json::<serde_json::Value>().await { 1322 + Ok(repo_data) => repo_data["collections"] 1323 + .as_array() 1324 + .map(|arr| { 1325 + arr.iter() 1326 + .filter_map(|v| v.as_str().map(String::from)) 1327 + .collect::<Vec<String>>() 1328 + }) 1329 + .unwrap_or_default(), 1330 Err(e) => { 1331 log::error!("Failed to parse repo data: {}", e); 1332 vec![] ··· 1343 collections.push(constants::GUESTBOOK_COLLECTION.to_string()); 1344 } 1345 1346 + log::info!( 1347 + "Fetched {} collections for DID: {} (including guestbook)", 1348 + collections.len(), 1349 + did 1350 + ); 1351 1352 // Get or create a broadcaster for this DID with its collections 1353 + let broadcaster = 1354 + crate::firehose::get_or_create_broadcaster(&manager, did.clone(), collections).await; 1355 let mut rx = broadcaster.subscribe(); 1356 1357 log::info!("SSE connection established for DID: {}", did); ··· 1359 let stream = async_stream::stream! { 1360 // Send initial connection message 1361 yield Ok::<_, actix_web::Error>( 1362 + web::Bytes::from("data: {\"type\":\"connected\"}\n\n".to_string()) 1363 ); 1364 1365 log::info!("Sent initial connection message to client");