Your music, beautifully tracked. All yours. (coming soon)
teal.fm
teal-fm
atproto
1use anyhow::Result;
2use axum::{Extension, Json, extract::Multipart, extract::Path, http::StatusCode};
3use serde::{Deserialize, Serialize};
4use serde_json::{Value, json};
5use tracing::{error, info};
6
7use crate::ctx::Context;
8use crate::redis_client::RedisClient;
9use crate::types::CarImportJobStatus;
10
11#[derive(Debug, Serialize, Deserialize)]
12pub struct MetaOsInfo {
13 os_type: String,
14 release: String,
15 hostname: String,
16}
17
18#[derive(Debug, Serialize, Deserialize)]
19pub struct MetaAppInfo {
20 git_hash: String,
21 git_date: String,
22 build_time: String,
23 rustc_ver: String,
24}
25
26#[derive(Debug, Serialize, Deserialize)]
27pub struct MetaInfo {
28 os: MetaOsInfo,
29 app: MetaAppInfo,
30}
31
32pub async fn get_meta_info(
33 Extension(_ctx): Extension<Context>,
34) -> impl axum::response::IntoResponse {
35 // Retrieve system information
36 let git_hash = env!("VERGEN_GIT_DESCRIBE");
37 let git_date = env!("VERGEN_GIT_COMMIT_DATE");
38 let build_time = env!("VERGEN_BUILD_TIMESTAMP");
39 let rustc_ver = env!("VERGEN_RUSTC_SEMVER");
40
41 let os_type = sys_info::os_type().unwrap_or_else(|_| "Unknown".to_string());
42 let os_release = sys_info::os_release().unwrap_or_else(|_| "Unknown".to_string());
43 let hostname = sys_info::hostname().unwrap_or_else(|_| "Unknown".to_string());
44
45 Json(MetaInfo {
46 os: MetaOsInfo {
47 os_type,
48 release: os_release,
49 hostname,
50 },
51 app: MetaAppInfo {
52 git_hash: git_hash.to_string(),
53 git_date: git_date.to_string(),
54 build_time: build_time.to_string(),
55 rustc_ver: rustc_ver.to_string(),
56 },
57 })
58}
59
60/// Get CAR import job status
61pub async fn get_car_import_job_status(
62 Path(job_id): Path<String>,
63) -> Result<Json<CarImportJobStatus>, (StatusCode, Json<ErrorResponse>)> {
64 use crate::types::queue_keys;
65
66 info!("Getting status for job: {}", job_id);
67
68 // Parse job ID
69 let job_uuid = match uuid::Uuid::parse_str(&job_id) {
70 Ok(uuid) => uuid,
71 Err(_) => {
72 let error_response = ErrorResponse {
73 error: "Invalid job ID format".to_string(),
74 details: Some("Job ID must be a valid UUID".to_string()),
75 };
76 return Err((StatusCode::BAD_REQUEST, Json(error_response)));
77 }
78 };
79
80 // Connect to Redis
81 let redis_url =
82 std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
83 let redis_client = match RedisClient::new(&redis_url) {
84 Ok(client) => client,
85 Err(e) => {
86 error!("Failed to connect to Redis: {}", e);
87 let error_response = ErrorResponse {
88 error: "Internal server error".to_string(),
89 details: Some("Failed to connect to Redis".to_string()),
90 };
91 return Err((StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)));
92 }
93 };
94
95 // Get job status
96 match redis_client
97 .get_job_status(&queue_keys::job_status_key(&job_uuid))
98 .await
99 {
100 Ok(Some(status_data)) => match serde_json::from_str::<CarImportJobStatus>(&status_data) {
101 Ok(status) => Ok(Json(status)),
102 Err(e) => {
103 error!("Failed to parse job status: {}", e);
104 let error_response = ErrorResponse {
105 error: "Failed to parse job status".to_string(),
106 details: Some(e.to_string()),
107 };
108 Err((StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)))
109 }
110 },
111 Ok(None) => {
112 let error_response = ErrorResponse {
113 error: "Job not found".to_string(),
114 details: Some(format!("No job found with ID: {}", job_id)),
115 };
116 Err((StatusCode::NOT_FOUND, Json(error_response)))
117 }
118 Err(e) => {
119 error!("Failed to get job status from Redis: {}", e);
120 let error_response = ErrorResponse {
121 error: "Failed to get job status".to_string(),
122 details: Some(e.to_string()),
123 };
124 Err((StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)))
125 }
126 }
127}
128
129#[derive(Debug, Serialize, Deserialize)]
130pub struct CarImportRequest {
131 pub import_id: Option<String>,
132 pub description: Option<String>,
133}
134
135#[derive(Debug, Serialize, Deserialize)]
136pub struct CarImportResponse {
137 pub import_id: String,
138 pub status: String,
139 pub message: String,
140}
141
142#[derive(Debug, Serialize, Deserialize)]
143pub struct ErrorResponse {
144 pub error: String,
145 pub details: Option<String>,
146}
147
148#[derive(Debug, Serialize, Deserialize)]
149pub struct FetchCarRequest {
150 pub user_identifier: String, // DID or handle
151 pub since: Option<String>, // Optional revision for diff
152 pub debug: Option<bool>, // Enable debug mode for more verbose errors
153}
154
155#[derive(Debug, Serialize, Deserialize)]
156pub struct FetchCarResponse {
157 pub import_id: String,
158 pub user_did: String,
159 pub pds_host: String,
160 pub status: String,
161 pub message: String,
162}
163
164pub async fn upload_car_import(
165 Extension(ctx): Extension<Context>,
166 mut multipart: Multipart,
167) -> Result<Json<CarImportResponse>, StatusCode> {
168 info!("Received CAR file upload request");
169
170 let mut car_data: Option<Vec<u8>> = None;
171 let mut import_id: Option<String> = None;
172 let mut description: Option<String> = None;
173
174 // Process multipart form data
175 while let Some(field) = multipart
176 .next_field()
177 .await
178 .map_err(|_| StatusCode::BAD_REQUEST)?
179 {
180 let name = field.name().unwrap_or("").to_string();
181
182 match name.as_str() {
183 "car_file" => {
184 let data = field.bytes().await.map_err(|_| StatusCode::BAD_REQUEST)?;
185 car_data = Some(data.to_vec());
186 }
187 "import_id" => {
188 let text = field.text().await.map_err(|_| StatusCode::BAD_REQUEST)?;
189 import_id = Some(text);
190 }
191 "description" => {
192 let text = field.text().await.map_err(|_| StatusCode::BAD_REQUEST)?;
193 description = Some(text);
194 }
195 _ => {
196 // Ignore unknown fields
197 }
198 }
199 }
200
201 let car_bytes = car_data.ok_or(StatusCode::BAD_REQUEST)?;
202 let final_import_id = import_id.unwrap_or_else(|| {
203 // Generate a unique import ID
204 format!("car-import-{}", chrono::Utc::now().timestamp())
205 });
206
207 // Validate CAR file format
208 match validate_car_file(&car_bytes).await {
209 Ok(_) => {
210 info!(
211 "CAR file validation successful for import {}",
212 final_import_id
213 );
214 }
215 Err(e) => {
216 error!("CAR file validation failed: {}", e);
217 return Err(StatusCode::BAD_REQUEST);
218 }
219 }
220
221 // Store CAR import request in database for processing
222 match store_car_import_request(&ctx, &final_import_id, &car_bytes, description.as_deref()).await
223 {
224 Ok(_) => {
225 info!(
226 "CAR import request stored successfully: {}",
227 final_import_id
228 );
229 Ok(Json(CarImportResponse {
230 import_id: final_import_id,
231 status: "queued".to_string(),
232 message: "CAR file uploaded successfully and queued for processing".to_string(),
233 }))
234 }
235 Err(e) => {
236 error!("Failed to store CAR import request: {}", e);
237 Err(StatusCode::INTERNAL_SERVER_ERROR)
238 }
239 }
240}
241
242pub async fn get_car_import_status(
243 Extension(ctx): Extension<Context>,
244 axum::extract::Path(import_id): axum::extract::Path<String>,
245) -> Result<Json<CarImportResponse>, StatusCode> {
246 match get_import_status(&ctx, &import_id).await {
247 Ok(Some(status)) => Ok(Json(CarImportResponse {
248 import_id,
249 status: status.status,
250 message: status.message,
251 })),
252 Ok(None) => Err(StatusCode::NOT_FOUND),
253 Err(e) => {
254 error!("Failed to get import status: {}", e);
255 Err(StatusCode::INTERNAL_SERVER_ERROR)
256 }
257 }
258}
259
260async fn validate_car_file(car_data: &[u8]) -> Result<()> {
261 use iroh_car::CarReader;
262 use std::io::Cursor;
263
264 let cursor = Cursor::new(car_data);
265 let reader = CarReader::new(cursor).await?;
266 let header = reader.header();
267
268 // Basic validation - ensure we have at least one root CID
269 if header.roots().is_empty() {
270 return Err(anyhow::anyhow!("CAR file has no root CIDs"));
271 }
272
273 info!("CAR file validated: {} root CIDs", header.roots().len());
274 Ok(())
275}
276
/// Internal status record for a CAR import.
#[derive(Debug)]
struct ImportStatus {
    /// Short status label.
    status: String,
    /// Human-readable message accompanying the status.
    message: String,
}
282
283pub async fn store_car_import_request(
284 _ctx: &Context,
285 _import_id: &str,
286 _car_data: &[u8],
287 _description: Option<&str>,
288) -> Result<()> {
289 // TODO: Implement database storage once tables are created
290 info!("CAR import storage temporarily disabled - tables not yet created");
291 Ok(())
292}
293
294async fn get_import_status(_ctx: &Context, _import_id: &str) -> Result<Option<ImportStatus>> {
295 // TODO: Implement once database tables are created
296 Ok(Some(ImportStatus {
297 status: "pending".to_string(),
298 message: "Database tables not yet created".to_string(),
299 }))
300}
301
302pub async fn fetch_car_from_user(
303 Extension(ctx): Extension<Context>,
304 Json(request): Json<FetchCarRequest>,
305) -> Result<Json<FetchCarResponse>, (StatusCode, Json<ErrorResponse>)> {
306 info!(
307 "Received CAR fetch request for user: {}",
308 request.user_identifier
309 );
310
311 // Resolve user identifier to DID and PDS
312 let (user_did, pds_host) = match resolve_user_to_pds(&request.user_identifier).await {
313 Ok(result) => result,
314 Err(e) => {
315 error!("Failed to resolve user {}: {}", request.user_identifier, e);
316 let error_response = ErrorResponse {
317 error: "Failed to resolve user".to_string(),
318 details: if request.debug.unwrap_or(false) {
319 Some(e.to_string())
320 } else {
321 None
322 },
323 };
324 return Err((StatusCode::BAD_REQUEST, Json(error_response)));
325 }
326 };
327
328 info!(
329 "Resolved {} to DID {} on PDS {}",
330 request.user_identifier, user_did, pds_host
331 );
332
333 // Generate import ID
334 let import_id = format!(
335 "pds-fetch-{}-{}",
336 user_did.replace(":", "-"),
337 chrono::Utc::now().timestamp()
338 );
339
340 // Fetch CAR file from PDS
341 match fetch_car_from_pds(&pds_host, &user_did, request.since.as_deref()).await {
342 Ok(car_data) => {
343 info!(
344 "Successfully fetched CAR file for {} ({} bytes)",
345 user_did,
346 car_data.len()
347 );
348
349 // Store the fetched CAR file for processing
350 let description = Some(format!(
351 "Fetched from PDS {} for user {}",
352 pds_host, request.user_identifier
353 ));
354 match store_car_import_request(&ctx, &import_id, &car_data, description.as_deref())
355 .await
356 {
357 Ok(_) => {
358 info!("CAR import request stored successfully: {}", import_id);
359 Ok(Json(FetchCarResponse {
360 import_id,
361 user_did,
362 pds_host,
363 status: "queued".to_string(),
364 message: "CAR file fetched from PDS and queued for processing".to_string(),
365 }))
366 }
367 Err(e) => {
368 error!("Failed to store fetched CAR import request: {}", e);
369 let error_response = ErrorResponse {
370 error: "Failed to store CAR import request".to_string(),
371 details: Some(e.to_string()),
372 };
373 Err((StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)))
374 }
375 }
376 }
377 Err(e) => {
378 error!("Failed to fetch CAR file from PDS {}: {}", pds_host, e);
379 let error_response = ErrorResponse {
380 error: "Failed to fetch CAR file from PDS".to_string(),
381 details: Some(format!("PDS: {}, Error: {}", pds_host, e)),
382 };
383 Err((StatusCode::BAD_GATEWAY, Json(error_response)))
384 }
385 }
386}
387
388/// Resolve a user identifier (DID or handle) to their DID and PDS host
389pub async fn resolve_user_to_pds(user_identifier: &str) -> Result<(String, String)> {
390 if user_identifier.starts_with("did:") {
391 // User provided a DID directly, resolve to PDS
392 let pds_host = resolve_did_to_pds(user_identifier).await?;
393 Ok((user_identifier.to_string(), pds_host))
394 } else {
395 // User provided a handle, resolve to DID then PDS
396 let user_did = resolve_handle_to_did(user_identifier).await?;
397 let pds_host = resolve_did_to_pds(&user_did).await?;
398 Ok((user_did, pds_host))
399 }
400}
401
402/// Resolve a handle to a DID using com.atproto.identity.resolveHandle
403async fn resolve_handle_to_did(handle: &str) -> Result<String> {
404 let url = format!(
405 "https://bsky.social/xrpc/com.atproto.identity.resolveHandle?handle={}",
406 handle
407 );
408
409 let response = reqwest::get(&url).await?;
410 if !response.status().is_success() {
411 return Err(anyhow::anyhow!(
412 "Failed to resolve handle {}: {}",
413 handle,
414 response.status()
415 ));
416 }
417
418 let json: serde_json::Value = response.json().await?;
419 let did = json["did"]
420 .as_str()
421 .ok_or_else(|| anyhow::anyhow!("No DID found in response for handle {}", handle))?;
422
423 Ok(did.to_string())
424}
425
426/// Resolve a DID to their PDS host using DID document
427async fn resolve_did_to_pds(did: &str) -> Result<String> {
428 // For DID:plc, use the PLC directory
429 if did.starts_with("did:plc:") {
430 let url = format!("https://plc.directory/{}", did);
431
432 let response = reqwest::get(&url).await?;
433 if !response.status().is_success() {
434 return Err(anyhow::anyhow!(
435 "Failed to resolve DID {}: {}",
436 did,
437 response.status()
438 ));
439 }
440
441 let doc: serde_json::Value = response.json().await?;
442
443 // Find the PDS service endpoint
444 if let Some(services) = doc["service"].as_array() {
445 for service in services {
446 if service["id"].as_str() == Some("#atproto_pds") {
447 if let Some(endpoint) = service["serviceEndpoint"].as_str() {
448 // Extract hostname from URL
449 let url = url::Url::parse(endpoint)?;
450 let host = url.host_str().ok_or_else(|| {
451 anyhow::anyhow!("Invalid PDS endpoint URL: {}", endpoint)
452 })?;
453 return Ok(host.to_string());
454 }
455 }
456 }
457 }
458
459 Err(anyhow::anyhow!(
460 "No PDS service found in DID document for {}",
461 did
462 ))
463 } else {
464 Err(anyhow::anyhow!("Unsupported DID method: {}", did))
465 }
466}
467
468/// Fetch CAR file from PDS using com.atproto.sync.getRepo
469pub async fn fetch_car_from_pds(pds_host: &str, did: &str, since: Option<&str>) -> Result<Vec<u8>> {
470 let mut url = format!(
471 "https://{}/xrpc/com.atproto.sync.getRepo?did={}",
472 pds_host, did
473 );
474
475 if let Some(since_rev) = since {
476 url.push_str(&format!("&since={}", since_rev));
477 }
478
479 info!("Fetching CAR file from: {}", url);
480
481 let response = reqwest::get(&url).await?;
482 if !response.status().is_success() {
483 return Err(anyhow::anyhow!(
484 "Failed to fetch CAR from PDS {}: {}",
485 pds_host,
486 response.status()
487 ));
488 }
489
490 // Verify content type
491 let content_type = response
492 .headers()
493 .get("content-type")
494 .and_then(|h| h.to_str().ok())
495 .unwrap_or("");
496
497 if !content_type.contains("application/vnd.ipld.car") {
498 return Err(anyhow::anyhow!("Unexpected content type: {}", content_type));
499 }
500
501 let car_data = response.bytes().await?;
502 Ok(car_data.to_vec())
503}
504
505/// Generate a DID document for did:web
506fn generate_did_document(host: &str, pubkey: &str) -> Value {
507 json!({
508 "@context": [
509 "https://www.w3.org/ns/did/v1",
510 "https://w3id.org/security/multikey/v1",
511 "https://w3id.org/security/suites/secp256k1-2019/v1"
512 ],
513 "id": format!("did:web:{}", host),
514 "alsoKnownAs": [
515 format!("at://{}", host)
516 ],
517 "service": [
518 {
519 "id": "#bsky_fg",
520 "type": "BskyFeedGenerator",
521 "serviceEndpoint": format!("https://{}", host)
522 },
523 {
524 "id": "#atproto_pds",
525 "type": "AtprotoPersonalDataServer",
526 "serviceEndpoint": format!("https://{}", host)
527 }
528 ],
529 "verificationMethod": [
530 {
531 "id": format!("did:web:{}#atproto", host),
532 "type": "Multikey",
533 "controller": format!("did:web:{}", host),
534 "publicKeyMultibase": pubkey
535 }
536 ]
537 })
538}
539
540/// Handler for /.well-known/did.json endpoint
541pub async fn get_did_document(
542 Extension(_ctx): Extension<Context>,
543) -> impl axum::response::IntoResponse {
544 // Get the host from environment variable or use default
545 let host = std::env::var("APP_HOST")
546 .or_else(|_| std::env::var("HOST"))
547 .unwrap_or_else(|_| "localhost:3000".to_string());
548
549 // get pubkey from environment variable or use default
550 let pubkey = std::env::var("TEST_PUBKEY").unwrap_or_else(|_| {
551 "z6Mkw5f8g3h4j5k6l7m8n9o0p1q2r3s4t5u6v7w8x9y0z1a2b3c4d5e6f7g8h9i".to_string()
552 });
553
554 let did_doc = generate_did_document(&host, &pubkey);
555
556 (
557 StatusCode::OK,
558 [("Content-Type", "application/json")],
559 Json(did_doc),
560 )
561}
562
#[cfg(test)]
mod tests {
    use super::*;

    /// Placeholder multibase key shared by every test in this module.
    const TEST_PUBKEY: &str = "z6Mkw5f8g3h4j5k6l7m8n9o0p1q2r3s4t5u6v7w8x9y0z1a2b3c4d5e6f7g8h9i";

    /// The generated document carries the expected id, alias, services and key entry.
    #[test]
    fn test_generate_did_document() {
        let host = "example.com";
        let did_doc = generate_did_document(host, TEST_PUBKEY);
        let expected_did = format!("did:web:{}", host);
        let expected_endpoint = format!("https://{}", host);

        // Top-level identity fields.
        assert_eq!(did_doc["id"], expected_did);
        assert_eq!(did_doc["alsoKnownAs"][0], format!("at://{}", host));

        // Exactly two services, feed generator first and PDS second.
        let services = did_doc["service"].as_array().unwrap();
        assert_eq!(services.len(), 2);

        let feed_generator = &services[0];
        assert_eq!(feed_generator["id"], "#bsky_fg");
        assert_eq!(feed_generator["type"], "BskyFeedGenerator");
        assert_eq!(feed_generator["serviceEndpoint"], expected_endpoint);

        let pds = &services[1];
        assert_eq!(pds["id"], "#atproto_pds");
        assert_eq!(pds["type"], "AtprotoPersonalDataServer");
        assert_eq!(pds["serviceEndpoint"], expected_endpoint);

        // A single Multikey verification method controlled by the DID itself.
        let verification_methods = did_doc["verificationMethod"].as_array().unwrap();
        assert_eq!(verification_methods.len(), 1);

        let method = &verification_methods[0];
        assert_eq!(method["id"], format!("did:web:{}#atproto", host));
        assert_eq!(method["type"], "Multikey");
        assert_eq!(method["controller"], expected_did);
        assert!(method["publicKeyMultibase"]
            .as_str()
            .unwrap()
            .starts_with("z"));
    }

    /// The `@context` array lists the three expected vocabularies in order.
    #[test]
    fn test_did_document_context() {
        let did_doc = generate_did_document("test.example.org", TEST_PUBKEY);

        let context = did_doc["@context"].as_array().unwrap();
        assert_eq!(context.len(), 3);
        assert_eq!(context[0], "https://www.w3.org/ns/did/v1");
        assert_eq!(context[1], "https://w3id.org/security/multikey/v1");
        assert_eq!(
            context[2],
            "https://w3id.org/security/suites/secp256k1-2019/v1"
        );
    }

    /// The document keeps its basic shape across several host formats.
    #[test]
    fn test_different_hosts() {
        const HOSTS: [&str; 4] = [
            "localhost:3000",
            "bsky.social",
            "example.org:8080",
            "my-service.com",
        ];

        for host in HOSTS.iter() {
            let did_doc = generate_did_document(host, TEST_PUBKEY);

            assert_eq!(did_doc["id"], format!("did:web:{}", host));
            assert_eq!(did_doc["alsoKnownAs"][0], format!("at://{}", host));

            let service_count = did_doc["service"].as_array().unwrap().len();
            assert_eq!(service_count, 2);

            let method_count = did_doc["verificationMethod"].as_array().unwrap().len();
            assert_eq!(method_count, 1);
        }
    }
}