this repo has no description

Come back to some functions I made as stubs earlier

Changed files
+428 -13
migrations
src
api
moderation
repo
record
server
sync
tests
common
+32
Cargo.lock
··· 914 914 "bytes", 915 915 "chrono", 916 916 "cid", 917 + "ctor", 917 918 "dotenvy", 918 919 "jacquard", 919 920 "jacquard-axum", ··· 1373 1374 ] 1374 1375 1375 1376 [[package]] 1377 + name = "ctor" 1378 + version = "0.6.3" 1379 + source = "registry+https://github.com/rust-lang/crates.io-index" 1380 + checksum = "424e0138278faeb2b401f174ad17e715c829512d74f3d1e81eb43365c2e0590e" 1381 + dependencies = [ 1382 + "ctor-proc-macro", 1383 + "dtor", 1384 + ] 1385 + 1386 + [[package]] 1387 + name = "ctor-proc-macro" 1388 + version = "0.0.7" 1389 + source = "registry+https://github.com/rust-lang/crates.io-index" 1390 + checksum = "52560adf09603e58c9a7ee1fe1dcb95a16927b17c127f0ac02d6e768a0e25bc1" 1391 + 1392 + [[package]] 1376 1393 name = "curve25519-dalek" 1377 1394 version = "4.1.3" 1378 1395 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1612 1629 version = "0.15.7" 1613 1630 source = "registry+https://github.com/rust-lang/crates.io-index" 1614 1631 checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" 1632 + 1633 + [[package]] 1634 + name = "dtor" 1635 + version = "0.1.1" 1636 + source = "registry+https://github.com/rust-lang/crates.io-index" 1637 + checksum = "404d02eeb088a82cfd873006cb713fe411306c7d182c344905e101fb1167d301" 1638 + dependencies = [ 1639 + "dtor-proc-macro", 1640 + ] 1641 + 1642 + [[package]] 1643 + name = "dtor-proc-macro" 1644 + version = "0.0.6" 1645 + source = "registry+https://github.com/rust-lang/crates.io-index" 1646 + checksum = "f678cf4a922c215c63e0de95eb1ff08a958a81d47e485cf9da1e27bf6305cfa5" 1615 1647 1616 1648 [[package]] 1617 1649 name = "dunce"
+1
Cargo.toml
··· 36 36 uuid = { version = "1.19.0", features = ["v4", "fast-rng"] } 37 37 38 38 [dev-dependencies] 39 + ctor = "0.6.3" 39 40 testcontainers = "0.26.0" 40 41 testcontainers-modules = { version = "0.14.0", features = ["postgres"] } 41 42 wiremock = "0.6.5"
+11
migrations/202512211600_moderation_and_status.sql
··· 1 + ALTER TABLE users ADD COLUMN deactivated_at TIMESTAMPTZ; 2 + 3 + -- * reports u * 4 + CREATE TABLE reports ( 5 + id BIGINT PRIMARY KEY, 6 + reason_type TEXT NOT NULL, 7 + reason TEXT, 8 + subject_json JSONB NOT NULL, 9 + reported_by_did TEXT NOT NULL, 10 + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() 11 + );
+24 -3
src/api/moderation/mod.rs
··· 110 110 .into_response(); 111 111 } 112 112 113 - let created_at = chrono::Utc::now().to_rfc3339(); 114 - let report_id = chrono::Utc::now().timestamp_millis(); 113 + let created_at = chrono::Utc::now(); 114 + let report_id = created_at.timestamp_millis(); 115 + 116 + let insert = sqlx::query( 117 + "INSERT INTO reports (id, reason_type, reason, subject_json, reported_by_did, created_at) VALUES ($1, $2, $3, $4, $5, $6)" 118 + ) 119 + .bind(report_id) 120 + .bind(&input.reason_type) 121 + .bind(&input.reason) 122 + .bind(json!(input.subject)) 123 + .bind(&did) 124 + .bind(created_at) 125 + .execute(&state.db) 126 + .await; 127 + 128 + if let Err(e) = insert { 129 + error!("Failed to insert report: {:?}", e); 130 + return ( 131 + StatusCode::INTERNAL_SERVER_ERROR, 132 + Json(json!({"error": "InternalError"})), 133 + ) 134 + .into_response(); 135 + } 115 136 116 137 ( 117 138 StatusCode::OK, ··· 121 142 reason: input.reason, 122 143 subject: input.subject, 123 144 reported_by: did, 124 - created_at, 145 + created_at: created_at.to_rfc3339(), 125 146 }), 126 147 ) 127 148 .into_response()
+171 -4
src/api/repo/blob.rs
··· 7 7 response::{IntoResponse, Response}, 8 8 }; 9 9 use cid::Cid; 10 + use jacquard_repo::storage::BlockStore; 10 11 use multihash::Multihash; 11 12 use serde::{Deserialize, Serialize}; 12 13 use serde_json::json; 13 14 use sha2::{Digest, Sha256}; 14 15 use sqlx::Row; 16 + use std::str::FromStr; 15 17 use tracing::error; 16 18 17 19 pub async fn upload_blob( ··· 157 159 pub blobs: Vec<RecordBlob>, 158 160 } 159 161 162 + fn find_blobs(val: &serde_json::Value, blobs: &mut Vec<String>) { 163 + if let Some(obj) = val.as_object() { 164 + if let Some(type_val) = obj.get("$type") { 165 + if type_val == "blob" { 166 + if let Some(r) = obj.get("ref") { 167 + if let Some(link) = r.get("$link") { 168 + if let Some(s) = link.as_str() { 169 + blobs.push(s.to_string()); 170 + } 171 + } 172 + } 173 + } 174 + } 175 + for (_, v) in obj { 176 + find_blobs(v, blobs); 177 + } 178 + } else if let Some(arr) = val.as_array() { 179 + for v in arr { 180 + find_blobs(v, blobs); 181 + } 182 + } 183 + } 184 + 160 185 pub async fn list_missing_blobs( 161 - State(_state): State<AppState>, 186 + State(state): State<AppState>, 162 187 headers: axum::http::HeaderMap, 163 - Query(_params): Query<ListMissingBlobsParams>, 188 + Query(params): Query<ListMissingBlobsParams>, 164 189 ) -> Response { 165 190 let auth_header = headers.get("Authorization"); 166 191 if auth_header.is_none() { ··· 171 196 .into_response(); 172 197 } 173 198 199 + let token = auth_header 200 + .unwrap() 201 + .to_str() 202 + .unwrap_or("") 203 + .replace("Bearer ", ""); 204 + 205 + let session = sqlx::query( 206 + "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1" 207 + ) 208 + .bind(&token) 209 + .fetch_optional(&state.db) 210 + .await 211 + .unwrap_or(None); 212 + 213 + let (did, key_bytes) = match session { 214 + Some(row) => ( 215 + row.get::<String, _>("did"), 216 + row.get::<Vec<u8>, _>("key_bytes"), 217 + ), 218 + None => { 219 + return ( 220 + StatusCode::UNAUTHORIZED, 221 + Json(json!({"error": "AuthenticationFailed"})), 222 + ) 223 + .into_response(); 224 + } 225 + }; 226 + 227 + if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 228 + return ( 229 + StatusCode::UNAUTHORIZED, 230 + Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 231 + ) 232 + .into_response(); 233 + } 234 + 235 + let user_query = sqlx::query("SELECT id FROM users WHERE did = $1") 236 + .bind(&did) 237 + .fetch_optional(&state.db) 238 + .await; 239 + 240 + let user_id: uuid::Uuid = match user_query { 241 + Ok(Some(row)) => row.get("id"), 242 + _ => { 243 + return ( 244 + StatusCode::INTERNAL_SERVER_ERROR, 245 + Json(json!({"error": "InternalError"})), 246 + ) 247 + .into_response(); 248 + } 249 + }; 250 + 251 + let limit = params.limit.unwrap_or(500).min(1000); 252 + let cursor_str = params.cursor.unwrap_or_default(); 253 + let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') { 254 + let parts: Vec<&str> = cursor_str.split('|').collect(); 255 + (parts[0].to_string(), parts[1].to_string()) 256 + } else { 257 + (String::new(), String::new()) 258 + }; 259 + 260 + let records_query = sqlx::query( 261 + "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4" 262 + ) 263 + .bind(user_id) 264 + .bind(cursor_collection) 265 + .bind(cursor_rkey) 266 + .bind(limit) 267 + .fetch_all(&state.db) 268 + .await; 269 + 270 + let records = match records_query { 271 + Ok(r) => r, 272 + Err(e) => { 273 + error!("DB error fetching records: {:?}", e); 274 + return ( 275 + StatusCode::INTERNAL_SERVER_ERROR, 276 + Json(json!({"error": "InternalError"})), 277 + ) 278 + .into_response(); 279 + } 280 + }; 281 + 282 + let mut missing_blobs = Vec::new(); 283 + let mut last_cursor = None; 284 + 285 + for row in &records { 286 + let collection: String = row.get("collection"); 287 + let rkey: String = row.get("rkey"); 288 + let record_cid_str: String = row.get("record_cid"); 289 + 290 + last_cursor = Some(format!("{}|{}", collection, rkey)); 291 + 292 + let record_cid = match Cid::from_str(&record_cid_str) { 293 + Ok(c) => c, 294 + Err(_) => continue, 295 + }; 296 + 297 + let block_bytes = match state.block_store.get(&record_cid).await { 298 + Ok(Some(b)) => b, 299 + _ => continue, 300 + }; 301 + 302 + let record_val: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block_bytes) { 303 + Ok(v) => v, 304 + Err(_) => continue, 305 + }; 306 + 307 + let mut blobs = Vec::new(); 308 + find_blobs(&record_val, &mut blobs); 309 + 310 + for blob_cid_str in blobs { 311 + let exists = sqlx::query("SELECT 1 FROM blobs WHERE cid = $1 AND created_by_user = $2") 312 + .bind(&blob_cid_str) 313 + .bind(user_id) 314 + .fetch_optional(&state.db) 315 + .await; 316 + 317 + match exists { 318 + Ok(None) => { 319 + missing_blobs.push(RecordBlob { 320 + cid: blob_cid_str, 321 + record_uri: format!("at://{}/{}/{}", did, collection, rkey), 322 + }); 323 + } 324 + Err(e) => { 325 + error!("DB error checking blob existence: {:?}", e); 326 + } 327 + _ => {} 328 + } 329 + } 330 + } 331 + 332 + // if we fetched fewer records than limit, we are done, so cursor is None. 333 + // otherwise, cursor is the last one we saw. 334 + // ...right? 335 + let next_cursor = if records.len() < limit as usize { 336 + None 337 + } else { 338 + last_cursor 339 + }; 340 + 174 341 ( 175 342 StatusCode::OK, 176 343 Json(ListMissingBlobsOutput { 177 - cursor: None, 178 - blobs: vec![], 344 + cursor: next_cursor, 345 + blobs: missing_blobs, 179 346 }), 180 347 ) 181 348 .into_response()
+24
src/api/repo/record/write.rs
··· 182 182 .rkey 183 183 .unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string()); 184 184 185 + if input.validate.unwrap_or(true) { 186 + if input.collection == "app.bsky.feed.post" { 187 + if input.record.get("text").is_none() || input.record.get("createdAt").is_none() { 188 + return ( 189 + StatusCode::BAD_REQUEST, 190 + Json(json!({"error": "InvalidRecord", "message": "Record validation failed"})), 191 + ) 192 + .into_response(); 193 + } 194 + } 195 + } 196 + 185 197 let mut record_bytes = Vec::new(); 186 198 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) { 187 199 error!("Error serializing record: {:?}", e); ··· 471 483 }; 472 484 473 485 let rkey = input.rkey.clone(); 486 + 487 + if input.validate.unwrap_or(true) { 488 + if input.collection == "app.bsky.feed.post" { 489 + if input.record.get("text").is_none() || input.record.get("createdAt").is_none() { 490 + return ( 491 + StatusCode::BAD_REQUEST, 492 + Json(json!({"error": "InvalidRecord", "message": "Record validation failed"})), 493 + ) 494 + .into_response(); 495 + } 496 + } 497 + } 474 498 475 499 let mut record_bytes = Vec::new(); 476 500 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) {
+143 -5
src/api/server/session.rs
··· 561 561 .into_response(); 562 562 } 563 563 564 + let user_status = sqlx::query("SELECT deactivated_at FROM users WHERE did = $1") 565 + .bind(&did) 566 + .fetch_optional(&state.db) 567 + .await; 568 + 569 + let deactivated_at: Option<chrono::DateTime<chrono::Utc>> = match user_status { 570 + Ok(Some(row)) => row.get("deactivated_at"), 571 + _ => None, 572 + }; 573 + 564 574 let repo_result = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1") 565 575 .bind(user_id) 566 576 .fetch_optional(&state.db) ··· 589 599 ( 590 600 StatusCode::OK, 591 601 Json(CheckAccountStatusOutput { 592 - activated: true, 602 + activated: deactivated_at.is_none(), 593 603 valid_did, 594 604 repo_commit: repo_commit.clone(), 595 605 repo_rev: chrono::Utc::now().timestamp_millis().to_string(), ··· 604 614 } 605 615 606 616 pub async fn activate_account( 607 - State(_state): State<AppState>, 617 + State(state): State<AppState>, 608 618 headers: axum::http::HeaderMap, 609 619 ) -> Response { 610 620 let auth_header = headers.get("Authorization"); ··· 616 626 .into_response(); 617 627 } 618 628 619 - (StatusCode::OK, Json(json!({}))).into_response() 629 + let token = auth_header 630 + .unwrap() 631 + .to_str() 632 + .unwrap_or("") 633 + .replace("Bearer ", ""); 634 + 635 + let session = sqlx::query( 636 + r#" 637 + SELECT s.did, k.key_bytes 638 + FROM sessions s 639 + JOIN users u ON s.did = u.did 640 + JOIN user_keys k ON u.id = k.user_id 641 + WHERE s.access_jwt = $1 642 + "#, 643 + ) 644 + .bind(&token) 645 + .fetch_optional(&state.db) 646 + .await; 647 + 648 + let (did, key_bytes) = match session { 649 + Ok(Some(row)) => ( 650 + row.get::<String, _>("did"), 651 + row.get::<Vec<u8>, _>("key_bytes"), 652 + ), 653 + Ok(None) => { 654 + return ( 655 + StatusCode::UNAUTHORIZED, 656 + Json(json!({"error": "AuthenticationFailed"})), 657 + ) 658 + .into_response(); 659 + } 660 + Err(e) => { 661 + error!("DB error in activate_account: {:?}", e); 662 + return ( 663 + StatusCode::INTERNAL_SERVER_ERROR, 664 + Json(json!({"error": "InternalError"})), 665 + ) 666 + .into_response(); 667 + } 668 + }; 669 + 670 + if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 671 + return ( 672 + StatusCode::UNAUTHORIZED, 673 + Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 674 + ) 675 + .into_response(); 676 + } 677 + 678 + let result = sqlx::query("UPDATE users SET deactivated_at = NULL WHERE did = $1") 679 + .bind(&did) 680 + .execute(&state.db) 681 + .await; 682 + 683 + match result { 684 + Ok(_) => (StatusCode::OK, Json(json!({}))).into_response(), 685 + Err(e) => { 686 + error!("DB error activating account: {:?}", e); 687 + ( 688 + StatusCode::INTERNAL_SERVER_ERROR, 689 + Json(json!({"error": "InternalError"})), 690 + ) 691 + .into_response() 692 + } 693 + } 620 694 } 621 695 622 696 #[derive(Deserialize)] ··· 626 700 } 627 701 628 702 pub async fn deactivate_account( 629 - State(_state): State<AppState>, 703 + State(state): State<AppState>, 630 704 headers: axum::http::HeaderMap, 631 705 Json(_input): Json<DeactivateAccountInput>, 632 706 ) -> Response { ··· 639 713 .into_response(); 640 714 } 641 715 642 - (StatusCode::OK, Json(json!({}))).into_response() 716 + let token = auth_header 717 + .unwrap() 718 + .to_str() 719 + .unwrap_or("") 720 + .replace("Bearer ", ""); 721 + 722 + let session = sqlx::query( 723 + r#" 724 + SELECT s.did, k.key_bytes 725 + FROM sessions s 726 + JOIN users u ON s.did = u.did 727 + JOIN user_keys k ON u.id = k.user_id 728 + WHERE s.access_jwt = $1 729 + "#, 730 + ) 731 + .bind(&token) 732 + .fetch_optional(&state.db) 733 + .await; 734 + 735 + let (did, key_bytes) = match session { 736 + Ok(Some(row)) => ( 737 + row.get::<String, _>("did"), 738 + row.get::<Vec<u8>, _>("key_bytes"), 739 + ), 740 + Ok(None) => { 741 + return ( 742 + StatusCode::UNAUTHORIZED, 743 + Json(json!({"error": "AuthenticationFailed"})), 744 + ) 745 + .into_response(); 746 + } 747 + Err(e) => { 748 + error!("DB error in deactivate_account: {:?}", e); 749 + return ( 750 + StatusCode::INTERNAL_SERVER_ERROR, 751 + Json(json!({"error": "InternalError"})), 752 + ) 753 + .into_response(); 754 + } 755 + }; 756 + 757 + if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 758 + return ( 759 + StatusCode::UNAUTHORIZED, 760 + Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 761 + ) 762 + .into_response(); 763 + } 764 + 765 + let result = sqlx::query("UPDATE users SET deactivated_at = NOW() WHERE did = $1") 766 + .bind(&did) 767 + .execute(&state.db) 768 + .await; 769 + 770 + match result { 771 + Ok(_) => (StatusCode::OK, Json(json!({}))).into_response(), 772 + Err(e) => { 773 + error!("DB error deactivating account: {:?}", e); 774 + ( 775 + StatusCode::INTERNAL_SERVER_ERROR, 776 + Json(json!({"error": "InternalError"})), 777 + ) 778 + .into_response() 779 + } 780 + } 643 781 } 644 782 645 783 #[derive(Serialize)]
+4
src/sync/mod.rs
··· 461 461 Query(params): Query<NotifyOfUpdateParams>, 462 462 ) -> Response { 463 463 info!("Received notifyOfUpdate from hostname: {}", params.hostname); 464 + // TODO: Queue job for crawler interaction or relay notification 465 + info!("TODO: Queue job for notifyOfUpdate (not implemented)"); 464 466 465 467 (StatusCode::OK, Json(json!({}))).into_response() 466 468 } ··· 475 477 Json(input): Json<RequestCrawlInput>, 476 478 ) -> Response { 477 479 info!("Received requestCrawl for hostname: {}", input.hostname); 480 + // TODO: Queue job for crawling 481 + info!("TODO: Queue job for requestCrawl (not implemented)"); 478 482 479 483 (StatusCode::OK, Json(json!({}))).into_response() 480 484 }
+18
tests/common/mod.rs
··· 32 32 #[allow(dead_code)] 33 33 pub const TARGET_DID: &str = "did:plc:target"; 34 34 35 + #[cfg(test)] 36 + #[ctor::dtor] 37 + fn cleanup() { 38 + // my attempt to force clean up containers created by this test binary. 39 + // this is a fallback in case ryuk fails or is not supported 40 + if std::env::var("XDG_RUNTIME_DIR").is_ok() { 41 + let _ = std::process::Command::new("podman") 42 + .args(&["rm", "-f", "--filter", "label=bspds_test=true"]) 43 + .output(); 44 + } 45 + 46 + let _ = std::process::Command::new("docker") 47 + .args(&["container", "prune", "-f", "--filter", "label=bspds_test=true"]) 48 + .output(); 49 + } 50 + 35 51 #[allow(dead_code)] 36 52 pub fn client() -> Client { 37 53 Client::new() ··· 63 79 .with_env_var("MINIO_ROOT_USER", "minioadmin") 64 80 .with_env_var("MINIO_ROOT_PASSWORD", "minioadmin") 65 81 .with_cmd(vec!["server".to_string(), "/data".to_string()]) 82 + .with_label("bspds_test", "true") 66 83 .start() 67 84 .await 68 85 .expect("Failed to start MinIO"); ··· 131 148 132 149 let container = Postgres::default() 133 150 .with_tag("18-alpine") 151 + .with_label("bspds_test", "true") 134 152 .start() 135 153 .await 136 154 .expect("Failed to start Postgres");
-1
tests/repo.rs
··· 204 204 } 205 205 206 206 #[tokio::test] 207 - #[ignore] 208 207 async fn test_put_record_invalid_schema() { 209 208 let client = client(); 210 209 let (token, did) = create_account_and_login(&client).await;