this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 body::Body, 5 extract::{Query, State}, 6 http::StatusCode, 7 http::header, 8 response::{IntoResponse, Response}, 9}; 10use serde::{Deserialize, Serialize}; 11use serde_json::json; 12use sqlx::Row; 13use tracing::{error, info}; 14 15#[derive(Deserialize)] 16pub struct GetLatestCommitParams { 17 pub did: String, 18} 19 20#[derive(Serialize)] 21pub struct GetLatestCommitOutput { 22 pub cid: String, 23 pub rev: String, 24} 25 26pub async fn get_latest_commit( 27 State(state): State<AppState>, 28 Query(params): Query<GetLatestCommitParams>, 29) -> Response { 30 let did = params.did.trim(); 31 32 if did.is_empty() { 33 return ( 34 StatusCode::BAD_REQUEST, 35 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 36 ) 37 .into_response(); 38 } 39 40 let result = sqlx::query( 41 r#" 42 SELECT r.repo_root_cid 43 FROM repos r 44 JOIN users u ON r.user_id = u.id 45 WHERE u.did = $1 46 "#, 47 ) 48 .bind(did) 49 .fetch_optional(&state.db) 50 .await; 51 52 match result { 53 Ok(Some(row)) => { 54 let cid: String = row.get("repo_root_cid"); 55 ( 56 StatusCode::OK, 57 Json(GetLatestCommitOutput { 58 cid, 59 rev: chrono::Utc::now().timestamp_millis().to_string(), 60 }), 61 ) 62 .into_response() 63 } 64 Ok(None) => ( 65 StatusCode::NOT_FOUND, 66 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 67 ) 68 .into_response(), 69 Err(e) => { 70 error!("DB error in get_latest_commit: {:?}", e); 71 ( 72 StatusCode::INTERNAL_SERVER_ERROR, 73 Json(json!({"error": "InternalError"})), 74 ) 75 .into_response() 76 } 77 } 78} 79 80#[derive(Deserialize)] 81pub struct ListReposParams { 82 pub limit: Option<i64>, 83 pub cursor: Option<String>, 84} 85 86#[derive(Serialize)] 87#[serde(rename_all = "camelCase")] 88pub struct RepoInfo { 89 pub did: String, 90 pub head: String, 91 pub rev: String, 92 pub active: bool, 93} 94 95#[derive(Serialize)] 96pub struct ListReposOutput { 97 pub cursor: Option<String>, 98 pub repos: Vec<RepoInfo>, 99} 100 101pub async fn list_repos( 102 State(state): State<AppState>, 103 Query(params): Query<ListReposParams>, 104) -> Response { 105 let limit = params.limit.unwrap_or(50).min(1000); 106 let cursor_did = params.cursor.as_deref().unwrap_or(""); 107 108 let result = sqlx::query( 109 r#" 110 SELECT u.did, r.repo_root_cid 111 FROM repos r 112 JOIN users u ON r.user_id = u.id 113 WHERE u.did > $1 114 ORDER BY u.did ASC 115 LIMIT $2 116 "#, 117 ) 118 .bind(cursor_did) 119 .bind(limit + 1) 120 .fetch_all(&state.db) 121 .await; 122 123 match result { 124 Ok(rows) => { 125 let has_more = rows.len() as i64 > limit; 126 let repos: Vec<RepoInfo> = rows 127 .iter() 128 .take(limit as usize) 129 .map(|row| { 130 let did: String = row.get("did"); 131 let head: String = row.get("repo_root_cid"); 132 RepoInfo { 133 did, 134 head, 135 rev: chrono::Utc::now().timestamp_millis().to_string(), 136 active: true, 137 } 138 }) 139 .collect(); 140 141 let next_cursor = if has_more { 142 repos.last().map(|r| r.did.clone()) 143 } else { 144 None 145 }; 146 147 ( 148 StatusCode::OK, 149 Json(ListReposOutput { 150 cursor: next_cursor, 151 repos, 152 }), 153 ) 154 .into_response() 155 } 156 Err(e) => { 157 error!("DB error in list_repos: {:?}", e); 158 ( 159 StatusCode::INTERNAL_SERVER_ERROR, 160 Json(json!({"error": "InternalError"})), 161 ) 162 .into_response() 163 } 164 } 165} 166 167#[derive(Deserialize)] 168pub struct GetBlobParams { 169 pub did: String, 170 pub cid: String, 171} 172 173pub async fn get_blob( 174 State(state): State<AppState>, 175 Query(params): Query<GetBlobParams>, 176) -> Response { 177 let did = params.did.trim(); 178 let cid = params.cid.trim(); 179 180 if did.is_empty() { 181 return ( 182 StatusCode::BAD_REQUEST, 183 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 184 ) 185 .into_response(); 186 } 187 188 if cid.is_empty() { 189 return ( 190 StatusCode::BAD_REQUEST, 191 Json(json!({"error": "InvalidRequest", "message": "cid is required"})), 192 ) 193 .into_response(); 194 } 195 196 let user_exists = sqlx::query("SELECT id FROM users WHERE did = $1") 197 .bind(did) 198 .fetch_optional(&state.db) 199 .await; 200 201 match user_exists { 202 Ok(None) => { 203 return ( 204 StatusCode::NOT_FOUND, 205 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 206 ) 207 .into_response(); 208 } 209 Err(e) => { 210 error!("DB error in get_blob: {:?}", e); 211 return ( 212 StatusCode::INTERNAL_SERVER_ERROR, 213 Json(json!({"error": "InternalError"})), 214 ) 215 .into_response(); 216 } 217 Ok(Some(_)) => {} 218 } 219 220 let blob_result = sqlx::query("SELECT storage_key, mime_type FROM blobs WHERE cid = $1") 221 .bind(cid) 222 .fetch_optional(&state.db) 223 .await; 224 225 match blob_result { 226 Ok(Some(row)) => { 227 let storage_key: String = row.get("storage_key"); 228 let mime_type: String = row.get("mime_type"); 229 230 match state.blob_store.get(&storage_key).await { 231 Ok(data) => Response::builder() 232 .status(StatusCode::OK) 233 .header(header::CONTENT_TYPE, mime_type) 234 .body(Body::from(data)) 235 .unwrap(), 236 Err(e) => { 237 error!("Failed to fetch blob from storage: {:?}", e); 238 ( 239 StatusCode::NOT_FOUND, 240 Json(json!({"error": "BlobNotFound", "message": "Blob not found in storage"})), 241 ) 242 .into_response() 243 } 244 } 245 } 246 Ok(None) => ( 247 StatusCode::NOT_FOUND, 248 Json(json!({"error": "BlobNotFound", "message": "Blob not found"})), 249 ) 250 .into_response(), 251 Err(e) => { 252 error!("DB error in get_blob: {:?}", e); 253 ( 254 StatusCode::INTERNAL_SERVER_ERROR, 255 Json(json!({"error": "InternalError"})), 256 ) 257 .into_response() 258 } 259 } 260} 261 262#[derive(Deserialize)] 263pub struct ListBlobsParams { 264 pub did: String, 265 pub since: Option<String>, 266 pub limit: Option<i64>, 267 pub cursor: Option<String>, 268} 269 270#[derive(Serialize)] 271pub struct ListBlobsOutput { 272 pub cursor: Option<String>, 273 pub cids: Vec<String>, 274} 275 276pub async fn list_blobs( 277 State(state): State<AppState>, 278 Query(params): Query<ListBlobsParams>, 279) -> Response { 280 let did = params.did.trim(); 281 282 if did.is_empty() { 283 return ( 284 StatusCode::BAD_REQUEST, 285 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 286 ) 287 .into_response(); 288 } 289 290 let limit = params.limit.unwrap_or(500).min(1000); 291 let cursor_cid = params.cursor.as_deref().unwrap_or(""); 292 293 let user_result = sqlx::query("SELECT id FROM users WHERE did = $1") 294 .bind(did) 295 .fetch_optional(&state.db) 296 .await; 297 298 let user_id: uuid::Uuid = match user_result { 299 Ok(Some(row)) => row.get("id"), 300 Ok(None) => { 301 return ( 302 StatusCode::NOT_FOUND, 303 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 304 ) 305 .into_response(); 306 } 307 Err(e) => { 308 error!("DB error in list_blobs: {:?}", e); 309 return ( 310 StatusCode::INTERNAL_SERVER_ERROR, 311 Json(json!({"error": "InternalError"})), 312 ) 313 .into_response(); 314 } 315 }; 316 317 let result = if let Some(since) = &params.since { 318 sqlx::query( 319 r#" 320 SELECT cid FROM blobs 321 WHERE created_by_user = $1 AND cid > $2 AND created_at > $3 322 ORDER BY cid ASC 323 LIMIT $4 324 "#, 325 ) 326 .bind(user_id) 327 .bind(cursor_cid) 328 .bind(since) 329 .bind(limit + 1) 330 .fetch_all(&state.db) 331 .await 332 } else { 333 sqlx::query( 334 r#" 335 SELECT cid FROM blobs 336 WHERE created_by_user = $1 AND cid > $2 337 ORDER BY cid ASC 338 LIMIT $3 339 "#, 340 ) 341 .bind(user_id) 342 .bind(cursor_cid) 343 .bind(limit + 1) 344 .fetch_all(&state.db) 345 .await 346 }; 347 348 match result { 349 Ok(rows) => { 350 let has_more = rows.len() as i64 > limit; 351 let cids: Vec<String> = rows 352 .iter() 353 .take(limit as usize) 354 .map(|row| row.get("cid")) 355 .collect(); 356 357 let next_cursor = if has_more { 358 cids.last().cloned() 359 } else { 360 None 361 }; 362 363 ( 364 StatusCode::OK, 365 Json(ListBlobsOutput { 366 cursor: next_cursor, 367 cids, 368 }), 369 ) 370 .into_response() 371 } 372 Err(e) => { 373 error!("DB error in list_blobs: {:?}", e); 374 ( 375 StatusCode::INTERNAL_SERVER_ERROR, 376 Json(json!({"error": "InternalError"})), 377 ) 378 .into_response() 379 } 380 } 381} 382 383#[derive(Deserialize)] 384pub struct GetRepoStatusParams { 385 pub did: String, 386} 387 388#[derive(Serialize)] 389pub struct GetRepoStatusOutput { 390 pub did: String, 391 pub active: bool, 392 pub rev: Option<String>, 393} 394 395pub async fn get_repo_status( 396 State(state): State<AppState>, 397 Query(params): Query<GetRepoStatusParams>, 398) -> Response { 399 let did = params.did.trim(); 400 401 if did.is_empty() { 402 return ( 403 StatusCode::BAD_REQUEST, 404 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 405 ) 406 .into_response(); 407 } 408 409 let result = sqlx::query( 410 r#" 411 SELECT u.did, r.repo_root_cid 412 FROM users u 413 LEFT JOIN repos r ON u.id = r.user_id 414 WHERE u.did = $1 415 "#, 416 ) 417 .bind(did) 418 .fetch_optional(&state.db) 419 .await; 420 421 match result { 422 Ok(Some(row)) => { 423 let user_did: String = row.get("did"); 424 let repo_root: Option<String> = row.get("repo_root_cid"); 425 426 let rev = repo_root.map(|_| chrono::Utc::now().timestamp_millis().to_string()); 427 428 ( 429 StatusCode::OK, 430 Json(GetRepoStatusOutput { 431 did: user_did, 432 active: true, 433 rev, 434 }), 435 ) 436 .into_response() 437 } 438 Ok(None) => ( 439 StatusCode::NOT_FOUND, 440 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 441 ) 442 .into_response(), 443 Err(e) => { 444 error!("DB error in get_repo_status: {:?}", e); 445 ( 446 StatusCode::INTERNAL_SERVER_ERROR, 447 Json(json!({"error": "InternalError"})), 448 ) 449 .into_response() 450 } 451 } 452} 453 454#[derive(Deserialize)] 455pub struct NotifyOfUpdateParams { 456 pub hostname: String, 457} 458 459pub async fn notify_of_update( 460 State(_state): State<AppState>, 461 Query(params): Query<NotifyOfUpdateParams>, 462) -> Response { 463 info!("Received notifyOfUpdate from hostname: {}", params.hostname); 464 465 (StatusCode::OK, Json(json!({}))).into_response() 466} 467 468#[derive(Deserialize)] 469pub struct RequestCrawlInput { 470 pub hostname: String, 471} 472 473pub async fn request_crawl( 474 State(_state): State<AppState>, 475 Json(input): Json<RequestCrawlInput>, 476) -> Response { 477 info!("Received requestCrawl for hostname: {}", input.hostname); 478 479 (StatusCode::OK, Json(json!({}))).into_response() 480}