Your music, beautifully tracked. All yours. (coming soon) teal.fm
teal-fm atproto
at main 642 lines 21 kB view raw
1use anyhow::Result; 2use axum::{Extension, Json, extract::Multipart, extract::Path, http::StatusCode}; 3use serde::{Deserialize, Serialize}; 4use serde_json::{Value, json}; 5use tracing::{error, info}; 6 7use crate::ctx::Context; 8use crate::redis_client::RedisClient; 9use crate::types::CarImportJobStatus; 10 11#[derive(Debug, Serialize, Deserialize)] 12pub struct MetaOsInfo { 13 os_type: String, 14 release: String, 15 hostname: String, 16} 17 18#[derive(Debug, Serialize, Deserialize)] 19pub struct MetaAppInfo { 20 git_hash: String, 21 git_date: String, 22 build_time: String, 23 rustc_ver: String, 24} 25 26#[derive(Debug, Serialize, Deserialize)] 27pub struct MetaInfo { 28 os: MetaOsInfo, 29 app: MetaAppInfo, 30} 31 32pub async fn get_meta_info( 33 Extension(_ctx): Extension<Context>, 34) -> impl axum::response::IntoResponse { 35 // Retrieve system information 36 let git_hash = env!("VERGEN_GIT_DESCRIBE"); 37 let git_date = env!("VERGEN_GIT_COMMIT_DATE"); 38 let build_time = env!("VERGEN_BUILD_TIMESTAMP"); 39 let rustc_ver = env!("VERGEN_RUSTC_SEMVER"); 40 41 let os_type = sys_info::os_type().unwrap_or_else(|_| "Unknown".to_string()); 42 let os_release = sys_info::os_release().unwrap_or_else(|_| "Unknown".to_string()); 43 let hostname = sys_info::hostname().unwrap_or_else(|_| "Unknown".to_string()); 44 45 Json(MetaInfo { 46 os: MetaOsInfo { 47 os_type, 48 release: os_release, 49 hostname, 50 }, 51 app: MetaAppInfo { 52 git_hash: git_hash.to_string(), 53 git_date: git_date.to_string(), 54 build_time: build_time.to_string(), 55 rustc_ver: rustc_ver.to_string(), 56 }, 57 }) 58} 59 60/// Get CAR import job status 61pub async fn get_car_import_job_status( 62 Path(job_id): Path<String>, 63) -> Result<Json<CarImportJobStatus>, (StatusCode, Json<ErrorResponse>)> { 64 use crate::types::queue_keys; 65 66 info!("Getting status for job: {}", job_id); 67 68 // Parse job ID 69 let job_uuid = match uuid::Uuid::parse_str(&job_id) { 70 Ok(uuid) => uuid, 71 Err(_) => { 72 let error_response = ErrorResponse { 73 error: "Invalid job ID format".to_string(), 74 details: Some("Job ID must be a valid UUID".to_string()), 75 }; 76 return Err((StatusCode::BAD_REQUEST, Json(error_response))); 77 } 78 }; 79 80 // Connect to Redis 81 let redis_url = 82 std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); 83 let redis_client = match RedisClient::new(&redis_url) { 84 Ok(client) => client, 85 Err(e) => { 86 error!("Failed to connect to Redis: {}", e); 87 let error_response = ErrorResponse { 88 error: "Internal server error".to_string(), 89 details: Some("Failed to connect to Redis".to_string()), 90 }; 91 return Err((StatusCode::INTERNAL_SERVER_ERROR, Json(error_response))); 92 } 93 }; 94 95 // Get job status 96 match redis_client 97 .get_job_status(&queue_keys::job_status_key(&job_uuid)) 98 .await 99 { 100 Ok(Some(status_data)) => match serde_json::from_str::<CarImportJobStatus>(&status_data) { 101 Ok(status) => Ok(Json(status)), 102 Err(e) => { 103 error!("Failed to parse job status: {}", e); 104 let error_response = ErrorResponse { 105 error: "Failed to parse job status".to_string(), 106 details: Some(e.to_string()), 107 }; 108 Err((StatusCode::INTERNAL_SERVER_ERROR, Json(error_response))) 109 } 110 }, 111 Ok(None) => { 112 let error_response = ErrorResponse { 113 error: "Job not found".to_string(), 114 details: Some(format!("No job found with ID: {}", job_id)), 115 }; 116 Err((StatusCode::NOT_FOUND, Json(error_response))) 117 } 118 Err(e) => { 119 error!("Failed to get job status from Redis: {}", e); 120 let error_response = ErrorResponse { 121 error: "Failed to get job status".to_string(), 122 details: Some(e.to_string()), 123 }; 124 Err((StatusCode::INTERNAL_SERVER_ERROR, Json(error_response))) 125 } 126 } 127} 128 129#[derive(Debug, Serialize, Deserialize)] 130pub struct CarImportRequest { 131 pub import_id: Option<String>, 132 pub description: Option<String>, 133} 134 135#[derive(Debug, Serialize, Deserialize)] 136pub struct CarImportResponse { 137 pub import_id: String, 138 pub status: String, 139 pub message: String, 140} 141 142#[derive(Debug, Serialize, Deserialize)] 143pub struct ErrorResponse { 144 pub error: String, 145 pub details: Option<String>, 146} 147 148#[derive(Debug, Serialize, Deserialize)] 149pub struct FetchCarRequest { 150 pub user_identifier: String, // DID or handle 151 pub since: Option<String>, // Optional revision for diff 152 pub debug: Option<bool>, // Enable debug mode for more verbose errors 153} 154 155#[derive(Debug, Serialize, Deserialize)] 156pub struct FetchCarResponse { 157 pub import_id: String, 158 pub user_did: String, 159 pub pds_host: String, 160 pub status: String, 161 pub message: String, 162} 163 164pub async fn upload_car_import( 165 Extension(ctx): Extension<Context>, 166 mut multipart: Multipart, 167) -> Result<Json<CarImportResponse>, StatusCode> { 168 info!("Received CAR file upload request"); 169 170 let mut car_data: Option<Vec<u8>> = None; 171 let mut import_id: Option<String> = None; 172 let mut description: Option<String> = None; 173 174 // Process multipart form data 175 while let Some(field) = multipart 176 .next_field() 177 .await 178 .map_err(|_| StatusCode::BAD_REQUEST)? 179 { 180 let name = field.name().unwrap_or("").to_string(); 181 182 match name.as_str() { 183 "car_file" => { 184 let data = field.bytes().await.map_err(|_| StatusCode::BAD_REQUEST)?; 185 car_data = Some(data.to_vec()); 186 } 187 "import_id" => { 188 let text = field.text().await.map_err(|_| StatusCode::BAD_REQUEST)?; 189 import_id = Some(text); 190 } 191 "description" => { 192 let text = field.text().await.map_err(|_| StatusCode::BAD_REQUEST)?; 193 description = Some(text); 194 } 195 _ => { 196 // Ignore unknown fields 197 } 198 } 199 } 200 201 let car_bytes = car_data.ok_or(StatusCode::BAD_REQUEST)?; 202 let final_import_id = import_id.unwrap_or_else(|| { 203 // Generate a unique import ID 204 format!("car-import-{}", chrono::Utc::now().timestamp()) 205 }); 206 207 // Validate CAR file format 208 match validate_car_file(&car_bytes).await { 209 Ok(_) => { 210 info!( 211 "CAR file validation successful for import {}", 212 final_import_id 213 ); 214 } 215 Err(e) => { 216 error!("CAR file validation failed: {}", e); 217 return Err(StatusCode::BAD_REQUEST); 218 } 219 } 220 221 // Store CAR import request in database for processing 222 match store_car_import_request(&ctx, &final_import_id, &car_bytes, description.as_deref()).await 223 { 224 Ok(_) => { 225 info!( 226 "CAR import request stored successfully: {}", 227 final_import_id 228 ); 229 Ok(Json(CarImportResponse { 230 import_id: final_import_id, 231 status: "queued".to_string(), 232 message: "CAR file uploaded successfully and queued for processing".to_string(), 233 })) 234 } 235 Err(e) => { 236 error!("Failed to store CAR import request: {}", e); 237 Err(StatusCode::INTERNAL_SERVER_ERROR) 238 } 239 } 240} 241 242pub async fn get_car_import_status( 243 Extension(ctx): Extension<Context>, 244 axum::extract::Path(import_id): axum::extract::Path<String>, 245) -> Result<Json<CarImportResponse>, StatusCode> { 246 match get_import_status(&ctx, &import_id).await { 247 Ok(Some(status)) => Ok(Json(CarImportResponse { 248 import_id, 249 status: status.status, 250 message: status.message, 251 })), 252 Ok(None) => Err(StatusCode::NOT_FOUND), 253 Err(e) => { 254 error!("Failed to get import status: {}", e); 255 Err(StatusCode::INTERNAL_SERVER_ERROR) 256 } 257 } 258} 259 260async fn validate_car_file(car_data: &[u8]) -> Result<()> { 261 use iroh_car::CarReader; 262 use std::io::Cursor; 263 264 let cursor = Cursor::new(car_data); 265 let reader = CarReader::new(cursor).await?; 266 let header = reader.header(); 267 268 // Basic validation - ensure we have at least one root CID 269 if header.roots().is_empty() { 270 return Err(anyhow::anyhow!("CAR file has no root CIDs")); 271 } 272 273 info!("CAR file validated: {} root CIDs", header.roots().len()); 274 Ok(()) 275} 276 277#[derive(Debug)] 278struct ImportStatus { 279 status: String, 280 message: String, 281} 282 283pub async fn store_car_import_request( 284 _ctx: &Context, 285 _import_id: &str, 286 _car_data: &[u8], 287 _description: Option<&str>, 288) -> Result<()> { 289 // TODO: Implement database storage once tables are created 290 info!("CAR import storage temporarily disabled - tables not yet created"); 291 Ok(()) 292} 293 294async fn get_import_status(_ctx: &Context, _import_id: &str) -> Result<Option<ImportStatus>> { 295 // TODO: Implement once database tables are created 296 Ok(Some(ImportStatus { 297 status: "pending".to_string(), 298 message: "Database tables not yet created".to_string(), 299 })) 300} 301 302pub async fn fetch_car_from_user( 303 Extension(ctx): Extension<Context>, 304 Json(request): Json<FetchCarRequest>, 305) -> Result<Json<FetchCarResponse>, (StatusCode, Json<ErrorResponse>)> { 306 info!( 307 "Received CAR fetch request for user: {}", 308 request.user_identifier 309 ); 310 311 // Resolve user identifier to DID and PDS 312 let (user_did, pds_host) = match resolve_user_to_pds(&request.user_identifier).await { 313 Ok(result) => result, 314 Err(e) => { 315 error!("Failed to resolve user {}: {}", request.user_identifier, e); 316 let error_response = ErrorResponse { 317 error: "Failed to resolve user".to_string(), 318 details: if request.debug.unwrap_or(false) { 319 Some(e.to_string()) 320 } else { 321 None 322 }, 323 }; 324 return Err((StatusCode::BAD_REQUEST, Json(error_response))); 325 } 326 }; 327 328 info!( 329 "Resolved {} to DID {} on PDS {}", 330 request.user_identifier, user_did, pds_host 331 ); 332 333 // Generate import ID 334 let import_id = format!( 335 "pds-fetch-{}-{}", 336 user_did.replace(":", "-"), 337 chrono::Utc::now().timestamp() 338 ); 339 340 // Fetch CAR file from PDS 341 match fetch_car_from_pds(&pds_host, &user_did, request.since.as_deref()).await { 342 Ok(car_data) => { 343 info!( 344 "Successfully fetched CAR file for {} ({} bytes)", 345 user_did, 346 car_data.len() 347 ); 348 349 // Store the fetched CAR file for processing 350 let description = Some(format!( 351 "Fetched from PDS {} for user {}", 352 pds_host, request.user_identifier 353 )); 354 match store_car_import_request(&ctx, &import_id, &car_data, description.as_deref()) 355 .await 356 { 357 Ok(_) => { 358 info!("CAR import request stored successfully: {}", import_id); 359 Ok(Json(FetchCarResponse { 360 import_id, 361 user_did, 362 pds_host, 363 status: "queued".to_string(), 364 message: "CAR file fetched from PDS and queued for processing".to_string(), 365 })) 366 } 367 Err(e) => { 368 error!("Failed to store fetched CAR import request: {}", e); 369 let error_response = ErrorResponse { 370 error: "Failed to store CAR import request".to_string(), 371 details: Some(e.to_string()), 372 }; 373 Err((StatusCode::INTERNAL_SERVER_ERROR, Json(error_response))) 374 } 375 } 376 } 377 Err(e) => { 378 error!("Failed to fetch CAR file from PDS {}: {}", pds_host, e); 379 let error_response = ErrorResponse { 380 error: "Failed to fetch CAR file from PDS".to_string(), 381 details: Some(format!("PDS: {}, Error: {}", pds_host, e)), 382 }; 383 Err((StatusCode::BAD_GATEWAY, Json(error_response))) 384 } 385 } 386} 387 388/// Resolve a user identifier (DID or handle) to their DID and PDS host 389pub async fn resolve_user_to_pds(user_identifier: &str) -> Result<(String, String)> { 390 if user_identifier.starts_with("did:") { 391 // User provided a DID directly, resolve to PDS 392 let pds_host = resolve_did_to_pds(user_identifier).await?; 393 Ok((user_identifier.to_string(), pds_host)) 394 } else { 395 // User provided a handle, resolve to DID then PDS 396 let user_did = resolve_handle_to_did(user_identifier).await?; 397 let pds_host = resolve_did_to_pds(&user_did).await?; 398 Ok((user_did, pds_host)) 399 } 400} 401 402/// Resolve a handle to a DID using com.atproto.identity.resolveHandle 403async fn resolve_handle_to_did(handle: &str) -> Result<String> { 404 let url = format!( 405 "https://bsky.social/xrpc/com.atproto.identity.resolveHandle?handle={}", 406 handle 407 ); 408 409 let response = reqwest::get(&url).await?; 410 if !response.status().is_success() { 411 return Err(anyhow::anyhow!( 412 "Failed to resolve handle {}: {}", 413 handle, 414 response.status() 415 )); 416 } 417 418 let json: serde_json::Value = response.json().await?; 419 let did = json["did"] 420 .as_str() 421 .ok_or_else(|| anyhow::anyhow!("No DID found in response for handle {}", handle))?; 422 423 Ok(did.to_string()) 424} 425 426/// Resolve a DID to their PDS host using DID document 427async fn resolve_did_to_pds(did: &str) -> Result<String> { 428 // For DID:plc, use the PLC directory 429 if did.starts_with("did:plc:") { 430 let url = format!("https://plc.directory/{}", did); 431 432 let response = reqwest::get(&url).await?; 433 if !response.status().is_success() { 434 return Err(anyhow::anyhow!( 435 "Failed to resolve DID {}: {}", 436 did, 437 response.status() 438 )); 439 } 440 441 let doc: serde_json::Value = response.json().await?; 442 443 // Find the PDS service endpoint 444 if let Some(services) = doc["service"].as_array() { 445 for service in services { 446 if service["id"].as_str() == Some("#atproto_pds") { 447 if let Some(endpoint) = service["serviceEndpoint"].as_str() { 448 // Extract hostname from URL 449 let url = url::Url::parse(endpoint)?; 450 let host = url.host_str().ok_or_else(|| { 451 anyhow::anyhow!("Invalid PDS endpoint URL: {}", endpoint) 452 })?; 453 return Ok(host.to_string()); 454 } 455 } 456 } 457 } 458 459 Err(anyhow::anyhow!( 460 "No PDS service found in DID document for {}", 461 did 462 )) 463 } else { 464 Err(anyhow::anyhow!("Unsupported DID method: {}", did)) 465 } 466} 467 468/// Fetch CAR file from PDS using com.atproto.sync.getRepo 469pub async fn fetch_car_from_pds(pds_host: &str, did: &str, since: Option<&str>) -> Result<Vec<u8>> { 470 let mut url = format!( 471 "https://{}/xrpc/com.atproto.sync.getRepo?did={}", 472 pds_host, did 473 ); 474 475 if let Some(since_rev) = since { 476 url.push_str(&format!("&since={}", since_rev)); 477 } 478 479 info!("Fetching CAR file from: {}", url); 480 481 let response = reqwest::get(&url).await?; 482 if !response.status().is_success() { 483 return Err(anyhow::anyhow!( 484 "Failed to fetch CAR from PDS {}: {}", 485 pds_host, 486 response.status() 487 )); 488 } 489 490 // Verify content type 491 let content_type = response 492 .headers() 493 .get("content-type") 494 .and_then(|h| h.to_str().ok()) 495 .unwrap_or(""); 496 497 if !content_type.contains("application/vnd.ipld.car") { 498 return Err(anyhow::anyhow!("Unexpected content type: {}", content_type)); 499 } 500 501 let car_data = response.bytes().await?; 502 Ok(car_data.to_vec()) 503} 504 505/// Generate a DID document for did:web 506fn generate_did_document(host: &str, pubkey: &str) -> Value { 507 json!({ 508 "@context": [ 509 "https://www.w3.org/ns/did/v1", 510 "https://w3id.org/security/multikey/v1", 511 "https://w3id.org/security/suites/secp256k1-2019/v1" 512 ], 513 "id": format!("did:web:{}", host), 514 "alsoKnownAs": [ 515 format!("at://{}", host) 516 ], 517 "service": [ 518 { 519 "id": "#bsky_fg", 520 "type": "BskyFeedGenerator", 521 "serviceEndpoint": format!("https://{}", host) 522 }, 523 { 524 "id": "#atproto_pds", 525 "type": "AtprotoPersonalDataServer", 526 "serviceEndpoint": format!("https://{}", host) 527 } 528 ], 529 "verificationMethod": [ 530 { 531 "id": format!("did:web:{}#atproto", host), 532 "type": "Multikey", 533 "controller": format!("did:web:{}", host), 534 "publicKeyMultibase": pubkey 535 } 536 ] 537 }) 538} 539 540/// Handler for /.well-known/did.json endpoint 541pub async fn get_did_document( 542 Extension(_ctx): Extension<Context>, 543) -> impl axum::response::IntoResponse { 544 // Get the host from environment variable or use default 545 let host = std::env::var("APP_HOST") 546 .or_else(|_| std::env::var("HOST")) 547 .unwrap_or_else(|_| "localhost:3000".to_string()); 548 549 // get pubkey from environment variable or use default 550 let pubkey = std::env::var("TEST_PUBKEY").unwrap_or_else(|_| { 551 "z6Mkw5f8g3h4j5k6l7m8n9o0p1q2r3s4t5u6v7w8x9y0z1a2b3c4d5e6f7g8h9i".to_string() 552 }); 553 554 let did_doc = generate_did_document(&host, &pubkey); 555 556 ( 557 StatusCode::OK, 558 [("Content-Type", "application/json")], 559 Json(did_doc), 560 ) 561} 562 563#[cfg(test)] 564mod tests { 565 use super::*; 566 567 const TEST_PUBKEY: &str = "z6Mkw5f8g3h4j5k6l7m8n9o0p1q2r3s4t5u6v7w8x9y0z1a2b3c4d5e6f7g8h9i"; 568 569 #[test] 570 fn test_generate_did_document() { 571 let host = "example.com"; 572 let did_doc = generate_did_document(host, TEST_PUBKEY); 573 574 // Verify the structure of the generated DID document 575 assert_eq!(did_doc["id"], format!("did:web:{}", host)); 576 assert_eq!(did_doc["alsoKnownAs"][0], format!("at://{}", host)); 577 578 // Check services 579 let services = did_doc["service"].as_array().unwrap(); 580 assert_eq!(services.len(), 2); 581 582 let bsky_fg = &services[0]; 583 assert_eq!(bsky_fg["id"], "#bsky_fg"); 584 assert_eq!(bsky_fg["type"], "BskyFeedGenerator"); 585 assert_eq!(bsky_fg["serviceEndpoint"], format!("https://{}", host)); 586 587 let atproto_pds = &services[1]; 588 assert_eq!(atproto_pds["id"], "#atproto_pds"); 589 assert_eq!(atproto_pds["type"], "AtprotoPersonalDataServer"); 590 assert_eq!(atproto_pds["serviceEndpoint"], format!("https://{}", host)); 591 592 // Check verification method 593 let verification_methods = did_doc["verificationMethod"].as_array().unwrap(); 594 assert_eq!(verification_methods.len(), 1); 595 596 let vm = &verification_methods[0]; 597 assert_eq!(vm["id"], format!("did:web:{}#atproto", host)); 598 assert_eq!(vm["type"], "Multikey"); 599 assert_eq!(vm["controller"], format!("did:web:{}", host)); 600 assert!(vm["publicKeyMultibase"].as_str().unwrap().starts_with("z")); 601 } 602 603 #[test] 604 fn test_did_document_context() { 605 let host = "test.example.org"; 606 let did_doc = generate_did_document(host, TEST_PUBKEY); 607 608 let context = did_doc["@context"].as_array().unwrap(); 609 assert_eq!(context.len(), 3); 610 assert_eq!(context[0], "https://www.w3.org/ns/did/v1"); 611 assert_eq!(context[1], "https://w3id.org/security/multikey/v1"); 612 assert_eq!( 613 context[2], 614 "https://w3id.org/security/suites/secp256k1-2019/v1" 615 ); 616 } 617 618 #[test] 619 fn test_different_hosts() { 620 // Test with different host formats 621 let hosts = vec![ 622 "localhost:3000", 623 "bsky.social", 624 "example.org:8080", 625 "my-service.com", 626 ]; 627 628 for host in hosts { 629 let did_doc = generate_did_document(host, TEST_PUBKEY); 630 631 // Verify basic structure for each host 632 assert_eq!(did_doc["id"], format!("did:web:{}", host)); 633 assert_eq!(did_doc["alsoKnownAs"][0], format!("at://{}", host)); 634 635 let services = did_doc["service"].as_array().unwrap(); 636 assert_eq!(services.len(), 2); 637 638 let verification_methods = did_doc["verificationMethod"].as_array().unwrap(); 639 assert_eq!(verification_methods.len(), 1); 640 } 641 } 642}