this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 body::Body, 5 extract::{Query, State}, 6 http::StatusCode, 7 http::header, 8 response::{IntoResponse, Response}, 9}; 10use serde::{Deserialize, Serialize}; 11use serde_json::json; 12use tracing::{error, info}; 13 14#[derive(Deserialize)] 15pub struct GetLatestCommitParams { 16 pub did: String, 17} 18 19#[derive(Serialize)] 20pub struct GetLatestCommitOutput { 21 pub cid: String, 22 pub rev: String, 23} 24 25pub async fn get_latest_commit( 26 State(state): State<AppState>, 27 Query(params): Query<GetLatestCommitParams>, 28) -> Response { 29 let did = params.did.trim(); 30 31 if did.is_empty() { 32 return ( 33 StatusCode::BAD_REQUEST, 34 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 35 ) 36 .into_response(); 37 } 38 39 let result = sqlx::query!( 40 r#" 41 SELECT r.repo_root_cid 42 FROM repos r 43 JOIN users u ON r.user_id = u.id 44 WHERE u.did = $1 45 "#, 46 did 47 ) 48 .fetch_optional(&state.db) 49 .await; 50 51 match result { 52 Ok(Some(row)) => { 53 ( 54 StatusCode::OK, 55 Json(GetLatestCommitOutput { 56 cid: row.repo_root_cid, 57 rev: chrono::Utc::now().timestamp_millis().to_string(), 58 }), 59 ) 60 .into_response() 61 } 62 Ok(None) => ( 63 StatusCode::NOT_FOUND, 64 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 65 ) 66 .into_response(), 67 Err(e) => { 68 error!("DB error in get_latest_commit: {:?}", e); 69 ( 70 StatusCode::INTERNAL_SERVER_ERROR, 71 Json(json!({"error": "InternalError"})), 72 ) 73 .into_response() 74 } 75 } 76} 77 78#[derive(Deserialize)] 79pub struct ListReposParams { 80 pub limit: Option<i64>, 81 pub cursor: Option<String>, 82} 83 84#[derive(Serialize)] 85#[serde(rename_all = "camelCase")] 86pub struct RepoInfo { 87 pub did: String, 88 pub head: String, 89 pub rev: String, 90 pub active: bool, 91} 92 93#[derive(Serialize)] 94pub struct ListReposOutput { 95 pub cursor: Option<String>, 96 pub repos: Vec<RepoInfo>, 97} 98 99pub async fn list_repos( 100 State(state): State<AppState>, 101 Query(params): Query<ListReposParams>, 102) -> Response { 103 let limit = params.limit.unwrap_or(50).min(1000); 104 let cursor_did = params.cursor.as_deref().unwrap_or(""); 105 106 let result = sqlx::query!( 107 r#" 108 SELECT u.did, r.repo_root_cid 109 FROM repos r 110 JOIN users u ON r.user_id = u.id 111 WHERE u.did > $1 112 ORDER BY u.did ASC 113 LIMIT $2 114 "#, 115 cursor_did, 116 limit + 1 117 ) 118 .fetch_all(&state.db) 119 .await; 120 121 match result { 122 Ok(rows) => { 123 let has_more = rows.len() as i64 > limit; 124 let repos: Vec<RepoInfo> = rows 125 .iter() 126 .take(limit as usize) 127 .map(|row| { 128 RepoInfo { 129 did: row.did.clone(), 130 head: row.repo_root_cid.clone(), 131 rev: chrono::Utc::now().timestamp_millis().to_string(), 132 active: true, 133 } 134 }) 135 .collect(); 136 137 let next_cursor = if has_more { 138 repos.last().map(|r| r.did.clone()) 139 } else { 140 None 141 }; 142 143 ( 144 StatusCode::OK, 145 Json(ListReposOutput { 146 cursor: next_cursor, 147 repos, 148 }), 149 ) 150 .into_response() 151 } 152 Err(e) => { 153 error!("DB error in list_repos: {:?}", e); 154 ( 155 StatusCode::INTERNAL_SERVER_ERROR, 156 Json(json!({"error": "InternalError"})), 157 ) 158 .into_response() 159 } 160 } 161} 162 163#[derive(Deserialize)] 164pub struct GetBlobParams { 165 pub did: String, 166 pub cid: String, 167} 168 169pub async fn get_blob( 170 State(state): State<AppState>, 171 Query(params): Query<GetBlobParams>, 172) -> Response { 173 let did = params.did.trim(); 174 let cid = params.cid.trim(); 175 176 if did.is_empty() { 177 return ( 178 StatusCode::BAD_REQUEST, 179 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 180 ) 181 .into_response(); 182 } 183 184 if cid.is_empty() { 185 return ( 186 StatusCode::BAD_REQUEST, 187 Json(json!({"error": "InvalidRequest", "message": "cid is required"})), 188 ) 189 .into_response(); 190 } 191 192 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 193 .fetch_optional(&state.db) 194 .await; 195 196 match user_exists { 197 Ok(None) => { 198 return ( 199 StatusCode::NOT_FOUND, 200 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 201 ) 202 .into_response(); 203 } 204 Err(e) => { 205 error!("DB error in get_blob: {:?}", e); 206 return ( 207 StatusCode::INTERNAL_SERVER_ERROR, 208 Json(json!({"error": "InternalError"})), 209 ) 210 .into_response(); 211 } 212 Ok(Some(_)) => {} 213 } 214 215 let blob_result = sqlx::query!("SELECT storage_key, mime_type FROM blobs WHERE cid = $1", cid) 216 .fetch_optional(&state.db) 217 .await; 218 219 match blob_result { 220 Ok(Some(row)) => { 221 let storage_key = &row.storage_key; 222 let mime_type = &row.mime_type; 223 224 match state.blob_store.get(&storage_key).await { 225 Ok(data) => Response::builder() 226 .status(StatusCode::OK) 227 .header(header::CONTENT_TYPE, mime_type) 228 .body(Body::from(data)) 229 .unwrap(), 230 Err(e) => { 231 error!("Failed to fetch blob from storage: {:?}", e); 232 ( 233 StatusCode::NOT_FOUND, 234 Json(json!({"error": "BlobNotFound", "message": "Blob not found in storage"})), 235 ) 236 .into_response() 237 } 238 } 239 } 240 Ok(None) => ( 241 StatusCode::NOT_FOUND, 242 Json(json!({"error": "BlobNotFound", "message": "Blob not found"})), 243 ) 244 .into_response(), 245 Err(e) => { 246 error!("DB error in get_blob: {:?}", e); 247 ( 248 StatusCode::INTERNAL_SERVER_ERROR, 249 Json(json!({"error": "InternalError"})), 250 ) 251 .into_response() 252 } 253 } 254} 255 256#[derive(Deserialize)] 257pub struct ListBlobsParams { 258 pub did: String, 259 pub since: Option<String>, 260 pub limit: Option<i64>, 261 pub cursor: Option<String>, 262} 263 264#[derive(Serialize)] 265pub struct ListBlobsOutput { 266 pub cursor: Option<String>, 267 pub cids: Vec<String>, 268} 269 270pub async fn list_blobs( 271 State(state): State<AppState>, 272 Query(params): Query<ListBlobsParams>, 273) -> Response { 274 let did = params.did.trim(); 275 276 if did.is_empty() { 277 return ( 278 StatusCode::BAD_REQUEST, 279 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 280 ) 281 .into_response(); 282 } 283 284 let limit = params.limit.unwrap_or(500).min(1000); 285 let cursor_cid = params.cursor.as_deref().unwrap_or(""); 286 287 let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 288 .fetch_optional(&state.db) 289 .await; 290 291 let user_id = match user_result { 292 Ok(Some(row)) => row.id, 293 Ok(None) => { 294 return ( 295 StatusCode::NOT_FOUND, 296 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 297 ) 298 .into_response(); 299 } 300 Err(e) => { 301 error!("DB error in list_blobs: {:?}", e); 302 return ( 303 StatusCode::INTERNAL_SERVER_ERROR, 304 Json(json!({"error": "InternalError"})), 305 ) 306 .into_response(); 307 } 308 }; 309 310 let cids_result: Result<Vec<String>, sqlx::Error> = if let Some(since) = &params.since { 311 let since_time = chrono::DateTime::parse_from_rfc3339(since) 312 .map(|dt| dt.with_timezone(&chrono::Utc)) 313 .unwrap_or_else(|_| chrono::Utc::now()); 314 sqlx::query!( 315 r#" 316 SELECT cid FROM blobs 317 WHERE created_by_user = $1 AND cid > $2 AND created_at > $3 318 ORDER BY cid ASC 319 LIMIT $4 320 "#, 321 user_id, 322 cursor_cid, 323 since_time, 324 limit + 1 325 ) 326 .fetch_all(&state.db) 327 .await 328 .map(|rows| rows.into_iter().map(|r| r.cid).collect()) 329 } else { 330 sqlx::query!( 331 r#" 332 SELECT cid FROM blobs 333 WHERE created_by_user = $1 AND cid > $2 334 ORDER BY cid ASC 335 LIMIT $3 336 "#, 337 user_id, 338 cursor_cid, 339 limit + 1 340 ) 341 .fetch_all(&state.db) 342 .await 343 .map(|rows| rows.into_iter().map(|r| r.cid).collect()) 344 }; 345 346 match cids_result { 347 Ok(cids) => { 348 let has_more = cids.len() as i64 > limit; 349 let cids: Vec<String> = cids 350 .into_iter() 351 .take(limit as usize) 352 .collect(); 353 354 let next_cursor = if has_more { 355 cids.last().cloned() 356 } else { 357 None 358 }; 359 360 ( 361 StatusCode::OK, 362 Json(ListBlobsOutput { 363 cursor: next_cursor, 364 cids, 365 }), 366 ) 367 .into_response() 368 } 369 Err(e) => { 370 error!("DB error in list_blobs: {:?}", e); 371 ( 372 StatusCode::INTERNAL_SERVER_ERROR, 373 Json(json!({"error": "InternalError"})), 374 ) 375 .into_response() 376 } 377 } 378} 379 380#[derive(Deserialize)] 381pub struct GetRepoStatusParams { 382 pub did: String, 383} 384 385#[derive(Serialize)] 386pub struct GetRepoStatusOutput { 387 pub did: String, 388 pub active: bool, 389 pub rev: Option<String>, 390} 391 392pub async fn get_repo_status( 393 State(state): State<AppState>, 394 Query(params): Query<GetRepoStatusParams>, 395) -> Response { 396 let did = params.did.trim(); 397 398 if did.is_empty() { 399 return ( 400 StatusCode::BAD_REQUEST, 401 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 402 ) 403 .into_response(); 404 } 405 406 let result = sqlx::query!( 407 r#" 408 SELECT u.did, r.repo_root_cid 409 FROM users u 410 LEFT JOIN repos r ON u.id = r.user_id 411 WHERE u.did = $1 412 "#, 413 did 414 ) 415 .fetch_optional(&state.db) 416 .await; 417 418 match result { 419 Ok(Some(row)) => { 420 let rev = Some(chrono::Utc::now().timestamp_millis().to_string()); 421 422 ( 423 StatusCode::OK, 424 Json(GetRepoStatusOutput { 425 did: row.did, 426 active: true, 427 rev, 428 }), 429 ) 430 .into_response() 431 } 432 Ok(None) => ( 433 StatusCode::NOT_FOUND, 434 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 435 ) 436 .into_response(), 437 Err(e) => { 438 error!("DB error in get_repo_status: {:?}", e); 439 ( 440 StatusCode::INTERNAL_SERVER_ERROR, 441 Json(json!({"error": "InternalError"})), 442 ) 443 .into_response() 444 } 445 } 446} 447 448#[derive(Deserialize)] 449pub struct NotifyOfUpdateParams { 450 pub hostname: String, 451} 452 453pub async fn notify_of_update( 454 State(_state): State<AppState>, 455 Query(params): Query<NotifyOfUpdateParams>, 456) -> Response { 457 info!("Received notifyOfUpdate from hostname: {}", params.hostname); 458 // TODO: Queue job for crawler interaction or relay notification 459 info!("TODO: Queue job for notifyOfUpdate (not implemented)"); 460 461 (StatusCode::OK, Json(json!({}))).into_response() 462} 463 464#[derive(Deserialize)] 465pub struct RequestCrawlInput { 466 pub hostname: String, 467} 468 469pub async fn request_crawl( 470 State(_state): State<AppState>, 471 Json(input): Json<RequestCrawlInput>, 472) -> Response { 473 info!("Received requestCrawl for hostname: {}", input.hostname); 474 // TODO: Queue job for crawling 475 info!("TODO: Queue job for requestCrawl (not implemented)"); 476 477 (StatusCode::OK, Json(json!({}))).into_response() 478}