this repo has no description

Add endpoints & improve tests

lewis 6c3b8701 d1618f42

Changed files
+2644 -170
src
api
identity
repo
server
sync
tests
+8 -8
TODO.md
··· 33 33 - [ ] Implement `com.atproto.server.createInviteCodes`. 34 34 - [ ] Implement `com.atproto.server.deactivateAccount` / `deleteAccount`. 35 35 - [ ] Implement `com.atproto.server.getAccountInviteCodes`. 36 - - [ ] Implement `com.atproto.server.getServiceAuth` (Cross-service auth). 36 + - [x] Implement `com.atproto.server.getServiceAuth` (Cross-service auth). 37 37 - [ ] Implement `com.atproto.server.listAppPasswords`. 38 38 - [ ] Implement `com.atproto.server.requestAccountDelete`. 39 39 - [ ] Implement `com.atproto.server.requestEmailConfirmation` / `requestEmailUpdate`. ··· 44 44 45 45 ## Repository Operations (`com.atproto.repo`) 46 46 - [ ] Record CRUD 47 - - [ ] Implement `com.atproto.repo.createRecord`. 47 + - [x] Implement `com.atproto.repo.createRecord`. 48 48 - [ ] Validate schema against Lexicon (just structure, not complex logic). 49 - - [ ] Generate `rkey` (TID) if not provided. 50 - - [ ] Handle MST (Merkle Search Tree) insertion. 49 + - [x] Generate `rkey` (TID) if not provided. 50 + - [x] Handle MST (Merkle Search Tree) insertion. 51 51 - [ ] **Trigger Firehose Event**. 52 52 - [x] Implement `com.atproto.repo.putRecord`. 53 53 - [x] Implement `com.atproto.repo.getRecord`. 54 54 - [x] Implement `com.atproto.repo.deleteRecord`. 55 55 - [x] Implement `com.atproto.repo.listRecords`. 56 56 - [x] Implement `com.atproto.repo.describeRepo`. 57 - - [ ] Implement `com.atproto.repo.applyWrites` (Batch writes). 57 + - [x] Implement `com.atproto.repo.applyWrites` (Batch writes). 58 58 - [ ] Implement `com.atproto.repo.importRepo` (Migration). 59 59 - [ ] Implement `com.atproto.repo.listMissingBlobs`. 60 60 - [ ] Blob Management ··· 70 70 - [ ] Bulk Export 71 71 - [ ] Implement `com.atproto.sync.getRepo` (Return full CAR file of repo). 72 72 - [ ] Implement `com.atproto.sync.getBlocks` (Return specific blocks via CIDs). 73 - - [ ] Implement `com.atproto.sync.getLatestCommit`. 73 + - [x] Implement `com.atproto.sync.getLatestCommit`. 74 74 - [ ] Implement `com.atproto.sync.getRecord` (Sync version, distinct from repo.getRecord). 75 75 - [ ] Implement `com.atproto.sync.getRepoStatus`. 76 - - [ ] Implement `com.atproto.sync.listRepos`. 76 + - [x] Implement `com.atproto.sync.listRepos`. 77 77 - [ ] Implement `com.atproto.sync.notifyOfUpdate`. 78 78 - [ ] Blob Sync 79 79 - [ ] Implement `com.atproto.sync.getBlob`. ··· 83 83 84 84 ## Identity (`com.atproto.identity`) 85 85 - [ ] Resolution 86 - - [ ] Implement `com.atproto.identity.resolveHandle` (Can be internal or proxy to PLC). 86 + - [x] Implement `com.atproto.identity.resolveHandle` (Can be internal or proxy to PLC). 87 87 - [ ] Implement `com.atproto.identity.updateHandle`. 88 88 - [ ] Implement `com.atproto.identity.submitPlcOperation` / `signPlcOperation` / `requestPlcOperationSignature`. 89 89 - [ ] Implement `com.atproto.identity.getRecommendedDidCredentials`.
+20 -15
justfile
··· 1 - # Run all tests with correct threading models 2 - test: test-proxy test-lifecycle test-others 1 + # Run all tests 2 + test: 3 + cargo test 3 4 4 - # Proxy tests modify environment variables, so must run single-threaded 5 - # TODO: figure out how to run in parallel 5 + # Run specific test suites if needed 6 + test-repo: 7 + cargo test --test repo 8 + 9 + test-lifecycle: 10 + cargo test --test lifecycle 11 + 6 12 test-proxy: 7 - cargo test --test proxy -- --test-threads=1 13 + cargo test --test proxy 14 + 15 + test-sync: 16 + cargo test --test sync 17 + 18 + test-server: 19 + cargo test --test server 8 20 9 - # Lifecycle tests involve complex state mutations, run single-threaded to be safe 10 - # TODO: figure out how to run in parallel 11 - test-lifecycle: 12 - cargo test --test lifecycle -- --test-threads=1 21 + test-identity: 22 + cargo test --test identity 13 23 14 - test-others: 15 - cargo test --lib 24 + test-auth: 16 25 cargo test --test auth 17 - cargo test --test identity 18 - cargo test --test repo 19 - cargo test --test server 20 - cargo test --test sync
+47 -1
src/api/identity/did.rs
··· 1 1 use crate::state::AppState; 2 2 use axum::{ 3 3 Json, 4 - extract::{Path, State}, 4 + extract::{Path, Query, State}, 5 5 http::StatusCode, 6 6 response::{IntoResponse, Response}, 7 7 }; ··· 9 9 use k256::SecretKey; 10 10 use k256::elliptic_curve::sec1::ToEncodedPoint; 11 11 use reqwest; 12 + use serde::Deserialize; 12 13 use serde_json::json; 13 14 use sqlx::Row; 14 15 use tracing::error; 16 + 17 + #[derive(Deserialize)] 18 + pub struct ResolveHandleParams { 19 + pub handle: String, 20 + } 21 + 22 + pub async fn resolve_handle( 23 + State(state): State<AppState>, 24 + Query(params): Query<ResolveHandleParams>, 25 + ) -> Response { 26 + let handle = params.handle.trim(); 27 + 28 + if handle.is_empty() { 29 + return ( 30 + StatusCode::BAD_REQUEST, 31 + Json(json!({"error": "InvalidRequest", "message": "handle is required"})), 32 + ) 33 + .into_response(); 34 + } 35 + 36 + let user = sqlx::query("SELECT did FROM users WHERE handle = $1") 37 + .bind(handle) 38 + .fetch_optional(&state.db) 39 + .await; 40 + 41 + match user { 42 + Ok(Some(row)) => { 43 + let did: String = row.get("did"); 44 + (StatusCode::OK, Json(json!({ "did": did }))).into_response() 45 + } 46 + Ok(None) => ( 47 + StatusCode::NOT_FOUND, 48 + Json(json!({"error": "HandleNotFound", "message": "Unable to resolve handle"})), 49 + ) 50 + .into_response(), 51 + Err(e) => { 52 + error!("DB error resolving handle: {:?}", e); 53 + ( 54 + StatusCode::INTERNAL_SERVER_ERROR, 55 + Json(json!({"error": "InternalError"})), 56 + ) 57 + .into_response() 58 + } 59 + } 60 + } 15 61 16 62 pub fn get_jwk(key_bytes: &[u8]) -> serde_json::Value { 17 63 let secret_key = SecretKey::from_slice(key_bytes).expect("Invalid key length");
+1 -1
src/api/identity/mod.rs
··· 2 2 pub mod did; 3 3 4 4 pub use account::create_account; 5 - pub use did::{user_did_doc, well_known_did}; 5 + pub use did::{resolve_handle, user_did_doc, well_known_did};
+1 -1
src/api/repo/mod.rs
··· 4 4 5 5 pub use blob::upload_blob; 6 6 pub use meta::describe_repo; 7 - pub use record::{create_record, delete_record, get_record, list_records, put_record}; 7 + pub use record::{apply_writes, create_record, delete_record, get_record, list_records, put_record};
+505
src/api/repo/record/batch.rs
··· 1 + use crate::state::AppState; 2 + use axum::{ 3 + Json, 4 + extract::State, 5 + http::StatusCode, 6 + response::{IntoResponse, Response}, 7 + }; 8 + use chrono::Utc; 9 + use cid::Cid; 10 + use jacquard::types::{ 11 + did::Did, 12 + integer::LimitedU32, 13 + string::{Nsid, Tid}, 14 + }; 15 + use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 16 + use serde::{Deserialize, Serialize}; 17 + use serde_json::json; 18 + use sqlx::Row; 19 + use std::str::FromStr; 20 + use std::sync::Arc; 21 + use tracing::error; 22 + 23 + #[derive(Deserialize)] 24 + #[serde(tag = "$type")] 25 + pub enum WriteOp { 26 + #[serde(rename = "com.atproto.repo.applyWrites#create")] 27 + Create { 28 + collection: String, 29 + rkey: Option<String>, 30 + value: serde_json::Value, 31 + }, 32 + #[serde(rename = "com.atproto.repo.applyWrites#update")] 33 + Update { 34 + collection: String, 35 + rkey: String, 36 + value: serde_json::Value, 37 + }, 38 + #[serde(rename = "com.atproto.repo.applyWrites#delete")] 39 + Delete { collection: String, rkey: String }, 40 + } 41 + 42 + #[derive(Deserialize)] 43 + #[serde(rename_all = "camelCase")] 44 + pub struct ApplyWritesInput { 45 + pub repo: String, 46 + pub validate: Option<bool>, 47 + pub writes: Vec<WriteOp>, 48 + pub swap_commit: Option<String>, 49 + } 50 + 51 + #[derive(Serialize)] 52 + #[serde(tag = "$type")] 53 + pub enum WriteResult { 54 + #[serde(rename = "com.atproto.repo.applyWrites#createResult")] 55 + CreateResult { uri: String, cid: String }, 56 + #[serde(rename = "com.atproto.repo.applyWrites#updateResult")] 57 + UpdateResult { uri: String, cid: String }, 58 + #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")] 59 + DeleteResult {}, 60 + } 61 + 62 + #[derive(Serialize)] 63 + pub struct ApplyWritesOutput { 64 + pub commit: CommitInfo, 65 + pub results: Vec<WriteResult>, 66 + } 67 + 68 + #[derive(Serialize)] 69 + pub struct CommitInfo { 70 + pub cid: String, 71 + pub rev: String, 72 + } 73 + 74 + pub async fn apply_writes( 75 + State(state): State<AppState>, 76 + headers: axum::http::HeaderMap, 77 + Json(input): Json<ApplyWritesInput>, 78 + ) -> Response { 79 + let auth_header = headers.get("Authorization"); 80 + if auth_header.is_none() { 81 + return ( 82 + StatusCode::UNAUTHORIZED, 83 + Json(json!({"error": "AuthenticationRequired"})), 84 + ) 85 + .into_response(); 86 + } 87 + let token = auth_header 88 + .unwrap() 89 + .to_str() 90 + .unwrap_or("") 91 + .replace("Bearer ", ""); 92 + 93 + let session = sqlx::query( 94 + "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1" 95 + ) 96 + .bind(&token) 97 + .fetch_optional(&state.db) 98 + .await 99 + .unwrap_or(None); 100 + 101 + let (did, key_bytes) = match session { 102 + Some(row) => ( 103 + row.get::<String, _>("did"), 104 + row.get::<Vec<u8>, _>("key_bytes"), 105 + ), 106 + None => { 107 + return ( 108 + StatusCode::UNAUTHORIZED, 109 + Json(json!({"error": "AuthenticationFailed"})), 110 + ) 111 + .into_response(); 112 + } 113 + }; 114 + 115 + if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 116 + return ( 117 + StatusCode::UNAUTHORIZED, 118 + Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 119 + ) 120 + .into_response(); 121 + } 122 + 123 + if input.repo != did { 124 + return ( 125 + StatusCode::FORBIDDEN, 126 + Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 127 + ) 128 + .into_response(); 129 + } 130 + 131 + if input.writes.is_empty() { 132 + return ( 133 + StatusCode::BAD_REQUEST, 134 + Json(json!({"error": "InvalidRequest", "message": "writes array is empty"})), 135 + ) 136 + .into_response(); 137 + } 138 + 139 + if input.writes.len() > 200 { 140 + return ( 141 + StatusCode::BAD_REQUEST, 142 + Json(json!({"error": "InvalidRequest", "message": "Too many writes (max 200)"})), 143 + ) 144 + .into_response(); 145 + } 146 + 147 + let user_query = sqlx::query("SELECT id FROM users WHERE did = $1") 148 + .bind(&did) 149 + .fetch_optional(&state.db) 150 + .await; 151 + 152 + let user_id: uuid::Uuid = match user_query { 153 + Ok(Some(row)) => row.get("id"), 154 + _ => { 155 + return ( 156 + StatusCode::INTERNAL_SERVER_ERROR, 157 + Json(json!({"error": "InternalError", "message": "User not found"})), 158 + ) 159 + .into_response(); 160 + } 161 + }; 162 + 163 + let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1") 164 + .bind(user_id) 165 + .fetch_optional(&state.db) 166 + .await; 167 + 168 + let current_root_cid = match repo_root_query { 169 + Ok(Some(row)) => { 170 + let cid_str: String = row.get("repo_root_cid"); 171 + match Cid::from_str(&cid_str) { 172 + Ok(c) => c, 173 + Err(_) => { 174 + return ( 175 + StatusCode::INTERNAL_SERVER_ERROR, 176 + Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 177 + ) 178 + .into_response(); 179 + } 180 + } 181 + } 182 + _ => { 183 + return ( 184 + StatusCode::INTERNAL_SERVER_ERROR, 185 + Json(json!({"error": "InternalError", "message": "Repo root not found"})), 186 + ) 187 + .into_response(); 188 + } 189 + }; 190 + 191 + if let Some(swap_commit) = &input.swap_commit { 192 + let swap_cid = match Cid::from_str(swap_commit) { 193 + Ok(c) => c, 194 + Err(_) => { 195 + return ( 196 + StatusCode::BAD_REQUEST, 197 + Json(json!({"error": "InvalidSwap", "message": "Invalid swapCommit CID"})), 198 + ) 199 + .into_response(); 200 + } 201 + }; 202 + if swap_cid != current_root_cid { 203 + return ( 204 + StatusCode::CONFLICT, 205 + Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 206 + ) 207 + .into_response(); 208 + } 209 + } 210 + 211 + let commit_bytes = match state.block_store.get(&current_root_cid).await { 212 + Ok(Some(b)) => b, 213 + Ok(None) => { 214 + return ( 215 + StatusCode::INTERNAL_SERVER_ERROR, 216 + Json(json!({"error": "InternalError", "message": "Commit block not found"})), 217 + ) 218 + .into_response(); 219 + } 220 + Err(e) => { 221 + error!("Failed to load commit block: {:?}", e); 222 + return ( 223 + StatusCode::INTERNAL_SERVER_ERROR, 224 + Json(json!({"error": "InternalError"})), 225 + ) 226 + .into_response(); 227 + } 228 + }; 229 + 230 + let commit = match Commit::from_cbor(&commit_bytes) { 231 + Ok(c) => c, 232 + Err(e) => { 233 + error!("Failed to parse commit: {:?}", e); 234 + return ( 235 + StatusCode::INTERNAL_SERVER_ERROR, 236 + Json(json!({"error": "InternalError"})), 237 + ) 238 + .into_response(); 239 + } 240 + }; 241 + 242 + let mst_root = commit.data; 243 + let store = Arc::new(state.block_store.clone()); 244 + let mut mst = Mst::load(store.clone(), mst_root, None); 245 + 246 + let mut results: Vec<WriteResult> = Vec::new(); 247 + let mut record_ops: Vec<(String, String, Option<String>)> = Vec::new(); 248 + 249 + for write in &input.writes { 250 + match write { 251 + WriteOp::Create { 252 + collection, 253 + rkey, 254 + value, 255 + } => { 256 + let collection_nsid = match collection.parse::<Nsid>() { 257 + Ok(n) => n, 258 + Err(_) => { 259 + return ( 260 + StatusCode::BAD_REQUEST, 261 + Json(json!({"error": "InvalidCollection"})), 262 + ) 263 + .into_response(); 264 + } 265 + }; 266 + 267 + let rkey = rkey 268 + .clone() 269 + .unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string()); 270 + 271 + let mut record_bytes = Vec::new(); 272 + if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, value) { 273 + error!("Error serializing record: {:?}", e); 274 + return ( 275 + StatusCode::BAD_REQUEST, 276 + Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 277 + ) 278 + .into_response(); 279 + } 280 + 281 + let record_cid = match state.block_store.put(&record_bytes).await { 282 + Ok(c) => c, 283 + Err(e) => { 284 + error!("Failed to save record block: {:?}", e); 285 + return ( 286 + StatusCode::INTERNAL_SERVER_ERROR, 287 + Json(json!({"error": "InternalError"})), 288 + ) 289 + .into_response(); 290 + } 291 + }; 292 + 293 + let key = format!("{}/{}", collection_nsid, rkey); 294 + mst = match mst.add(&key, record_cid).await { 295 + Ok(m) => m, 296 + Err(e) => { 297 + error!("Failed to add to MST: {:?}", e); 298 + return ( 299 + StatusCode::INTERNAL_SERVER_ERROR, 300 + Json(json!({"error": "InternalError"})), 301 + ) 302 + .into_response(); 303 + } 304 + }; 305 + 306 + let uri = format!("at://{}/{}/{}", did, collection, rkey); 307 + results.push(WriteResult::CreateResult { 308 + uri: uri.clone(), 309 + cid: record_cid.to_string(), 310 + }); 311 + record_ops.push((collection.clone(), rkey, Some(record_cid.to_string()))); 312 + } 313 + WriteOp::Update { 314 + collection, 315 + rkey, 316 + value, 317 + } => { 318 + let collection_nsid = match collection.parse::<Nsid>() { 319 + Ok(n) => n, 320 + Err(_) => { 321 + return ( 322 + StatusCode::BAD_REQUEST, 323 + Json(json!({"error": "InvalidCollection"})), 324 + ) 325 + .into_response(); 326 + } 327 + }; 328 + 329 + let mut record_bytes = Vec::new(); 330 + if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, value) { 331 + error!("Error serializing record: {:?}", e); 332 + return ( 333 + StatusCode::BAD_REQUEST, 334 + Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 335 + ) 336 + .into_response(); 337 + } 338 + 339 + let record_cid = match state.block_store.put(&record_bytes).await { 340 + Ok(c) => c, 341 + Err(e) => { 342 + error!("Failed to save record block: {:?}", e); 343 + return ( 344 + StatusCode::INTERNAL_SERVER_ERROR, 345 + Json(json!({"error": "InternalError"})), 346 + ) 347 + .into_response(); 348 + } 349 + }; 350 + 351 + let key = format!("{}/{}", collection_nsid, rkey); 352 + mst = match mst.update(&key, record_cid).await { 353 + Ok(m) => m, 354 + Err(e) => { 355 + error!("Failed to update MST: {:?}", e); 356 + return ( 357 + StatusCode::INTERNAL_SERVER_ERROR, 358 + Json(json!({"error": "InternalError"})), 359 + ) 360 + .into_response(); 361 + } 362 + }; 363 + 364 + let uri = format!("at://{}/{}/{}", did, collection, rkey); 365 + results.push(WriteResult::UpdateResult { 366 + uri: uri.clone(), 367 + cid: record_cid.to_string(), 368 + }); 369 + record_ops.push((collection.clone(), rkey.clone(), Some(record_cid.to_string()))); 370 + } 371 + WriteOp::Delete { collection, rkey } => { 372 + let collection_nsid = match collection.parse::<Nsid>() { 373 + Ok(n) => n, 374 + Err(_) => { 375 + return ( 376 + StatusCode::BAD_REQUEST, 377 + Json(json!({"error": "InvalidCollection"})), 378 + ) 379 + .into_response(); 380 + } 381 + }; 382 + 383 + let key = format!("{}/{}", collection_nsid, rkey); 384 + mst = match mst.delete(&key).await { 385 + Ok(m) => m, 386 + Err(e) => { 387 + error!("Failed to delete from MST: {:?}", e); 388 + return ( 389 + StatusCode::INTERNAL_SERVER_ERROR, 390 + Json(json!({"error": "InternalError"})), 391 + ) 392 + .into_response(); 393 + } 394 + }; 395 + 396 + results.push(WriteResult::DeleteResult {}); 397 + record_ops.push((collection.clone(), rkey.clone(), None)); 398 + } 399 + } 400 + } 401 + 402 + let new_mst_root = match mst.persist().await { 403 + Ok(c) => c, 404 + Err(e) => { 405 + error!("Failed to persist MST: {:?}", e); 406 + return ( 407 + StatusCode::INTERNAL_SERVER_ERROR, 408 + Json(json!({"error": "InternalError"})), 409 + ) 410 + .into_response(); 411 + } 412 + }; 413 + 414 + let did_obj = match Did::new(&did) { 415 + Ok(d) => d, 416 + Err(_) => { 417 + return ( 418 + StatusCode::INTERNAL_SERVER_ERROR, 419 + Json(json!({"error": "InternalError", "message": "Invalid DID"})), 420 + ) 421 + .into_response(); 422 + } 423 + }; 424 + 425 + let rev = Tid::now(LimitedU32::MIN); 426 + let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev.clone(), Some(current_root_cid)); 427 + 428 + let new_commit_bytes = match new_commit.to_cbor() { 429 + Ok(b) => b, 430 + Err(e) => { 431 + error!("Failed to serialize new commit: {:?}", e); 432 + return ( 433 + StatusCode::INTERNAL_SERVER_ERROR, 434 + Json(json!({"error": "InternalError"})), 435 + ) 436 + .into_response(); 437 + } 438 + }; 439 + 440 + let new_root_cid = match state.block_store.put(&new_commit_bytes).await { 441 + Ok(c) => c, 442 + Err(e) => { 443 + error!("Failed to save new commit: {:?}", e); 444 + return ( 445 + StatusCode::INTERNAL_SERVER_ERROR, 446 + Json(json!({"error": "InternalError"})), 447 + ) 448 + .into_response(); 449 + } 450 + }; 451 + 452 + let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2") 453 + .bind(new_root_cid.to_string()) 454 + .bind(user_id) 455 + .execute(&state.db) 456 + .await; 457 + 458 + if let Err(e) = update_repo { 459 + error!("Failed to update repo root in DB: {:?}", e); 460 + return ( 461 + StatusCode::INTERNAL_SERVER_ERROR, 462 + Json(json!({"error": "InternalError"})), 463 + ) 464 + .into_response(); 465 + } 466 + 467 + for (collection, rkey, record_cid) in record_ops { 468 + match record_cid { 469 + Some(cid) => { 470 + let _ = sqlx::query( 471 + "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4) 472 + ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()", 473 + ) 474 + .bind(user_id) 475 + .bind(&collection) 476 + .bind(&rkey) 477 + .bind(&cid) 478 + .execute(&state.db) 479 + .await; 480 + } 481 + None => { 482 + let _ = sqlx::query( 483 + "DELETE FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 484 + ) 485 + .bind(user_id) 486 + .bind(&collection) 487 + .bind(&rkey) 488 + .execute(&state.db) 489 + .await; 490 + } 491 + } 492 + } 493 + 494 + ( 495 + StatusCode::OK, 496 + Json(ApplyWritesOutput { 497 + commit: CommitInfo { 498 + cid: new_root_cid.to_string(), 499 + rev: rev.to_string(), 500 + }, 501 + results, 502 + }), 503 + ) 504 + .into_response() 505 + }
+2
src/api/repo/record/mod.rs
··· 1 + pub mod batch; 1 2 pub mod delete; 2 3 pub mod read; 3 4 pub mod write; 4 5 6 + pub use batch::apply_writes; 5 7 pub use delete::{DeleteRecordInput, delete_record}; 6 8 pub use read::{GetRecordInput, ListRecordsInput, ListRecordsOutput, get_record, list_records}; 7 9 pub use write::{
+1 -1
src/api/server/mod.rs
··· 2 2 pub mod session; 3 3 4 4 pub use meta::{describe_server, health}; 5 - pub use session::{create_session, delete_session, get_session, refresh_session}; 5 + pub use session::{create_session, delete_session, get_service_auth, get_session, refresh_session};
+94 -1
src/api/server/session.rs
··· 1 1 use crate::state::AppState; 2 2 use axum::{ 3 3 Json, 4 - extract::State, 4 + extract::{Query, State}, 5 5 http::StatusCode, 6 6 response::{IntoResponse, Response}, 7 7 }; ··· 10 10 use serde_json::json; 11 11 use sqlx::Row; 12 12 use tracing::{error, info, warn}; 13 + 14 + #[derive(Deserialize)] 15 + pub struct GetServiceAuthParams { 16 + pub aud: String, 17 + pub lxm: Option<String>, 18 + pub exp: Option<i64>, 19 + } 20 + 21 + #[derive(Serialize)] 22 + pub struct GetServiceAuthOutput { 23 + pub token: String, 24 + } 25 + 26 + pub async fn get_service_auth( 27 + State(state): State<AppState>, 28 + headers: axum::http::HeaderMap, 29 + Query(params): Query<GetServiceAuthParams>, 30 + ) -> Response { 31 + let auth_header = headers.get("Authorization"); 32 + if auth_header.is_none() { 33 + return ( 34 + StatusCode::UNAUTHORIZED, 35 + Json(json!({"error": "AuthenticationRequired"})), 36 + ) 37 + .into_response(); 38 + } 39 + 40 + let token = auth_header 41 + .unwrap() 42 + .to_str() 43 + .unwrap_or("") 44 + .replace("Bearer ", ""); 45 + 46 + let session = sqlx::query( 47 + r#" 48 + SELECT s.did, k.key_bytes 49 + FROM sessions s 50 + JOIN users u ON s.did = u.did 51 + JOIN user_keys k ON u.id = k.user_id 52 + WHERE s.access_jwt = $1 53 + "#, 54 + ) 55 + .bind(&token) 56 + .fetch_optional(&state.db) 57 + .await; 58 + 59 + let (did, key_bytes) = match session { 60 + Ok(Some(row)) => ( 61 + row.get::<String, _>("did"), 62 + row.get::<Vec<u8>, _>("key_bytes"), 63 + ), 64 + Ok(None) => { 65 + return ( 66 + StatusCode::UNAUTHORIZED, 67 + Json(json!({"error": "AuthenticationFailed"})), 68 + ) 69 + .into_response(); 70 + } 71 + Err(e) => { 72 + error!("DB error in get_service_auth: {:?}", e); 73 + return ( 74 + StatusCode::INTERNAL_SERVER_ERROR, 75 + Json(json!({"error": "InternalError"})), 76 + ) 77 + .into_response(); 78 + } 79 + }; 80 + 81 + if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 82 + return ( 83 + StatusCode::UNAUTHORIZED, 84 + Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 85 + ) 86 + .into_response(); 87 + } 88 + 89 + let lxm = params.lxm.as_deref().unwrap_or("*"); 90 + 91 + let service_token = match crate::auth::create_service_token(&did, &params.aud, lxm, &key_bytes) 92 + { 93 + Ok(t) => t, 94 + Err(e) => { 95 + error!("Failed to create service token: {:?}", e); 96 + return ( 97 + StatusCode::INTERNAL_SERVER_ERROR, 98 + Json(json!({"error": "InternalError"})), 99 + ) 100 + .into_response(); 101 + } 102 + }; 103 + 104 + (StatusCode::OK, Json(GetServiceAuthOutput { token: service_token })).into_response() 105 + } 13 106 14 107 #[derive(Deserialize)] 15 108 pub struct CreateSessionInput {
+22
src/lib.rs
··· 3 3 pub mod repo; 4 4 pub mod state; 5 5 pub mod storage; 6 + pub mod sync; 6 7 7 8 use axum::{ 8 9 Router, ··· 38 39 post(api::server::refresh_session), 39 40 ) 40 41 .route( 42 + "/xrpc/com.atproto.server.getServiceAuth", 43 + get(api::server::get_service_auth), 44 + ) 45 + .route( 46 + "/xrpc/com.atproto.identity.resolveHandle", 47 + get(api::identity::resolve_handle), 48 + ) 49 + .route( 41 50 "/xrpc/com.atproto.repo.createRecord", 42 51 post(api::repo::create_record), 43 52 ) ··· 65 74 "/xrpc/com.atproto.repo.uploadBlob", 66 75 post(api::repo::upload_blob), 67 76 ) 77 + .route( 78 + "/xrpc/com.atproto.repo.applyWrites", 79 + post(api::repo::apply_writes), 80 + ) 81 + .route( 82 + "/xrpc/com.atproto.sync.getLatestCommit", 83 + get(sync::get_latest_commit), 84 + ) 85 + .route( 86 + "/xrpc/com.atproto.sync.listRepos", 87 + get(sync::list_repos), 88 + ) 89 + // I know I know, I'm not supposed to implement appview endpoints. Leave me be 68 90 .route( 69 91 "/xrpc/app.bsky.feed.getTimeline", 70 92 get(api::feed::get_timeline),
+163
src/sync/mod.rs
··· 1 + use crate::state::AppState; 2 + use axum::{ 3 + Json, 4 + extract::{Query, State}, 5 + http::StatusCode, 6 + response::{IntoResponse, Response}, 7 + }; 8 + use serde::{Deserialize, Serialize}; 9 + use serde_json::json; 10 + use sqlx::Row; 11 + use tracing::error; 12 + 13 + #[derive(Deserialize)] 14 + pub struct GetLatestCommitParams { 15 + pub did: String, 16 + } 17 + 18 + #[derive(Serialize)] 19 + pub struct GetLatestCommitOutput { 20 + pub cid: String, 21 + pub rev: String, 22 + } 23 + 24 + pub async fn get_latest_commit( 25 + State(state): State<AppState>, 26 + Query(params): Query<GetLatestCommitParams>, 27 + ) -> Response { 28 + let did = params.did.trim(); 29 + 30 + if did.is_empty() { 31 + return ( 32 + StatusCode::BAD_REQUEST, 33 + Json(json!({"error": "InvalidRequest", "message": "did is required"})), 34 + ) 35 + .into_response(); 36 + } 37 + 38 + let result = sqlx::query( 39 + r#" 40 + SELECT r.repo_root_cid 41 + FROM repos r 42 + JOIN users u ON r.user_id = u.id 43 + WHERE u.did = $1 44 + "#, 45 + ) 46 + .bind(did) 47 + .fetch_optional(&state.db) 48 + .await; 49 + 50 + match result { 51 + Ok(Some(row)) => { 52 + let cid: String = row.get("repo_root_cid"); 53 + ( 54 + StatusCode::OK, 55 + Json(GetLatestCommitOutput { 56 + cid, 57 + rev: chrono::Utc::now().timestamp_millis().to_string(), 58 + }), 59 + ) 60 + .into_response() 61 + } 62 + Ok(None) => ( 63 + StatusCode::NOT_FOUND, 64 + Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 65 + ) 66 + .into_response(), 67 + Err(e) => { 68 + error!("DB error in get_latest_commit: {:?}", e); 69 + ( 70 + StatusCode::INTERNAL_SERVER_ERROR, 71 + Json(json!({"error": "InternalError"})), 72 + ) 73 + .into_response() 74 + } 75 + } 76 + } 77 + 78 + #[derive(Deserialize)] 79 + pub struct ListReposParams { 80 + pub limit: Option<i64>, 81 + pub cursor: Option<String>, 82 + } 83 + 84 + #[derive(Serialize)] 85 + #[serde(rename_all = "camelCase")] 86 + pub struct RepoInfo { 87 + pub did: String, 88 + pub head: String, 89 + pub rev: String, 90 + pub active: bool, 91 + } 92 + 93 + #[derive(Serialize)] 94 + pub struct ListReposOutput { 95 + pub cursor: Option<String>, 96 + pub repos: Vec<RepoInfo>, 97 + } 98 + 99 + pub async fn list_repos( 100 + State(state): State<AppState>, 101 + Query(params): Query<ListReposParams>, 102 + ) -> Response { 103 + let limit = params.limit.unwrap_or(50).min(1000); 104 + let cursor_did = params.cursor.as_deref().unwrap_or(""); 105 + 106 + let result = sqlx::query( 107 + r#" 108 + SELECT u.did, r.repo_root_cid 109 + FROM repos r 110 + JOIN users u ON r.user_id = u.id 111 + WHERE u.did > $1 112 + ORDER BY u.did ASC 113 + LIMIT $2 114 + "#, 115 + ) 116 + .bind(cursor_did) 117 + .bind(limit + 1) 118 + .fetch_all(&state.db) 119 + .await; 120 + 121 + match result { 122 + Ok(rows) => { 123 + let has_more = rows.len() as i64 > limit; 124 + let repos: Vec<RepoInfo> = rows 125 + .iter() 126 + .take(limit as usize) 127 + .map(|row| { 128 + let did: String = row.get("did"); 129 + let head: String = row.get("repo_root_cid"); 130 + RepoInfo { 131 + did, 132 + head, 133 + rev: chrono::Utc::now().timestamp_millis().to_string(), 134 + active: true, 135 + } 136 + }) 137 + .collect(); 138 + 139 + let next_cursor = if has_more { 140 + repos.last().map(|r| r.did.clone()) 141 + } else { 142 + None 143 + }; 144 + 145 + ( 146 + StatusCode::OK, 147 + Json(ListReposOutput { 148 + cursor: next_cursor, 149 + repos, 150 + }), 151 + ) 152 + .into_response() 153 + } 154 + Err(e) => { 155 + error!("DB error in list_repos: {:?}", e); 156 + ( 157 + StatusCode::INTERNAL_SERVER_ERROR, 158 + Json(json!({"error": "InternalError"})), 159 + ) 160 + .into_response() 161 + } 162 + } 163 + }
+41 -24
tests/common/mod.rs
··· 156 156 157 157 async fn spawn_app(database_url: String) -> String { 158 158 let pool = PgPoolOptions::new() 159 + .max_connections(50) 159 160 .connect(&database_url) 160 161 .await 161 162 .expect("Failed to connect to Postgres. Make sure the database is running."); ··· 256 257 257 258 #[allow(dead_code)] 258 259 pub async fn create_account_and_login(client: &Client) -> (String, String) { 259 - let handle = format!("user_{}", uuid::Uuid::new_v4()); 260 - let payload = json!({ 261 - "handle": handle, 262 - "email": format!("{}@example.com", handle), 263 - "password": "password" 264 - }); 260 + let mut last_error = String::new(); 265 261 266 - let res = client 267 - .post(format!( 268 - "{}/xrpc/com.atproto.server.createAccount", 269 - base_url().await 270 - )) 271 - .json(&payload) 272 - .send() 273 - .await 274 - .expect("Failed to create account"); 262 + for attempt in 0..3 { 263 + if attempt > 0 { 264 + tokio::time::sleep(Duration::from_millis(100 * (attempt as u64 + 1))).await; 265 + } 275 266 276 - if res.status() != StatusCode::OK { 277 - panic!("Failed to create account: {:?}", res.text().await); 267 + let handle = format!("user_{}", uuid::Uuid::new_v4()); 268 + let payload = json!({ 269 + "handle": handle, 270 + "email": format!("{}@example.com", handle), 271 + "password": "password" 272 + }); 273 + 274 + let res = match client 275 + .post(format!( 276 + "{}/xrpc/com.atproto.server.createAccount", 277 + base_url().await 278 + )) 279 + .json(&payload) 280 + .send() 281 + .await 282 + { 283 + Ok(r) => r, 284 + Err(e) => { 285 + last_error = format!("Request failed: {}", e); 286 + continue; 287 + } 288 + }; 289 + 290 + if res.status() == StatusCode::OK { 291 + let body: Value = res.json().await.expect("Invalid JSON"); 292 + let access_jwt = body["accessJwt"] 293 + .as_str() 294 + .expect("No accessJwt") 295 + .to_string(); 296 + let did = body["did"].as_str().expect("No did").to_string(); 297 + return (access_jwt, did); 298 + } 299 + 300 + last_error = format!("Status {}: {:?}", res.status(), res.text().await); 278 301 } 279 302 280 - let body: Value = res.json().await.expect("Invalid JSON"); 281 - let access_jwt = body["accessJwt"] 282 - .as_str() 283 - .expect("No accessJwt") 284 - .to_string(); 285 - let did = body["did"].as_str().expect("No did").to_string(); 286 - (access_jwt, did) 303 + panic!("Failed to create account after 3 attempts: {}", last_error); 287 304 }
+73 -14
tests/identity.rs
··· 5 5 use wiremock::matchers::{method, path}; 6 6 use wiremock::{Mock, MockServer, ResponseTemplate}; 7 7 8 - // #[tokio::test] 9 - // async fn test_resolve_handle() { 10 - // let client = client(); 11 - // let params = [ 12 - // ("handle", "bsky.app"), 13 - // ]; 14 - // let res = client.get(format!("{}/xrpc/com.atproto.identity.resolveHandle", base_url().await)) 15 - // .query(&params) 16 - // .send() 17 - // .await 18 - // .expect("Failed to send request"); 19 - // 20 - // assert_eq!(res.status(), StatusCode::OK); 21 - // } 8 + #[tokio::test] 9 + async fn test_resolve_handle_success() { 10 + let client = client(); 11 + let handle = format!("resolvetest_{}", uuid::Uuid::new_v4()); 12 + let payload = json!({ 13 + "handle": handle, 14 + "email": format!("{}@example.com", handle), 15 + "password": "password" 16 + }); 17 + 18 + let res = client 19 + .post(format!( 20 + "{}/xrpc/com.atproto.server.createAccount", 21 + base_url().await 22 + )) 23 + .json(&payload) 24 + .send() 25 + .await 26 + .expect("Failed to create account"); 27 + 28 + assert_eq!(res.status(), StatusCode::OK); 29 + let body: Value = res.json().await.expect("Invalid JSON"); 30 + let did = body["did"].as_str().expect("No DID").to_string(); 31 + 32 + let params = [("handle", handle.as_str())]; 33 + let res = client 34 + .get(format!( 35 + "{}/xrpc/com.atproto.identity.resolveHandle", 36 + base_url().await 37 + )) 38 + .query(&params) 39 + .send() 40 + .await 41 + .expect("Failed to send request"); 42 + 43 + assert_eq!(res.status(), StatusCode::OK); 44 + let body: Value = res.json().await.expect("Response was not valid JSON"); 45 + assert_eq!(body["did"], did); 46 + } 47 + 48 + #[tokio::test] 49 + async fn test_resolve_handle_not_found() { 50 + let client = client(); 51 + let params = [("handle", "nonexistent_handle_12345")]; 52 + let res = client 53 + .get(format!( 54 + "{}/xrpc/com.atproto.identity.resolveHandle", 55 + base_url().await 56 + )) 57 + .query(&params) 58 + .send() 59 + .await 60 + .expect("Failed to send request"); 61 + 62 + assert_eq!(res.status(), StatusCode::NOT_FOUND); 63 + let body: Value = res.json().await.expect("Response was not valid JSON"); 64 + assert_eq!(body["error"], "HandleNotFound"); 65 + } 66 + 67 + #[tokio::test] 68 + async fn test_resolve_handle_missing_param() { 69 + let client = client(); 70 + let res = client 71 + .get(format!( 72 + "{}/xrpc/com.atproto.identity.resolveHandle", 73 + base_url().await 74 + )) 75 + .send() 76 + .await 77 + .expect("Failed to send request"); 78 + 79 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 80 + } 22 81 23 82 #[tokio::test] 24 83 async fn test_well_known_did() {
+1057 -1
tests/lifecycle.rs
··· 2 2 use common::*; 3 3 4 4 use chrono::Utc; 5 - use reqwest; 5 + use reqwest::{self, StatusCode, header}; 6 6 use serde_json::{Value, json}; 7 7 use std::time::Duration; 8 8 ··· 564 564 "Only post 2 should remain" 565 565 ); 566 566 } 567 + 568 + #[tokio::test] 569 + async fn test_session_lifecycle_wrong_password() { 570 + let client = client(); 571 + let (_, _) = setup_new_user("session-wrong-pw").await; 572 + 573 + let login_payload = json!({ 574 + "identifier": format!("session-wrong-pw-{}.test", Utc::now().timestamp_millis()), 575 + "password": "wrong-password" 576 + }); 577 + 578 + let res = client 579 + .post(format!( 580 + "{}/xrpc/com.atproto.server.createSession", 581 + base_url().await 582 + )) 583 + .json(&login_payload) 584 + .send() 585 + .await 586 + .expect("Failed to send request"); 587 + 588 + assert!( 589 + res.status() == StatusCode::UNAUTHORIZED || res.status() == StatusCode::BAD_REQUEST, 590 + "Expected 401 or 400 for wrong password, got {}", 591 + res.status() 592 + ); 593 + } 594 + 595 + #[tokio::test] 596 + async fn test_session_lifecycle_multiple_sessions() { 597 + let client = client(); 598 + let ts = Utc::now().timestamp_millis(); 599 + let handle = format!("multi-session-{}.test", ts); 600 + let email = format!("multi-session-{}@test.com", ts); 601 + let password = "multi-session-pw"; 602 + 603 + let create_payload = json!({ 604 + "handle": handle, 605 + "email": email, 606 + "password": password 607 + }); 608 + let create_res = client 609 + .post(format!( 610 + "{}/xrpc/com.atproto.server.createAccount", 611 + base_url().await 612 + )) 613 + .json(&create_payload) 614 + .send() 615 + .await 616 + .expect("Failed to create account"); 617 + assert_eq!(create_res.status(), StatusCode::OK); 618 + 619 + let login_payload = json!({ 620 + "identifier": handle, 621 + "password": password 622 + }); 623 + 624 + let session1_res = client 625 + .post(format!( 626 + "{}/xrpc/com.atproto.server.createSession", 627 + base_url().await 628 + )) 629 + .json(&login_payload) 630 + .send() 631 + .await 632 + .expect("Failed session 1"); 633 + assert_eq!(session1_res.status(), StatusCode::OK); 634 + let session1: Value = session1_res.json().await.unwrap(); 635 + let jwt1 = session1["accessJwt"].as_str().unwrap(); 636 + 637 + let session2_res = client 638 + .post(format!( 639 + "{}/xrpc/com.atproto.server.createSession", 640 + base_url().await 641 + )) 642 + .json(&login_payload) 643 + .send() 644 + .await 645 + .expect("Failed session 2"); 646 + assert_eq!(session2_res.status(), StatusCode::OK); 647 + let session2: Value = session2_res.json().await.unwrap(); 648 + let jwt2 = session2["accessJwt"].as_str().unwrap(); 649 + 650 + assert_ne!(jwt1, jwt2, "Sessions should have different tokens"); 651 + 652 + let get1 = client 653 + .get(format!( 654 + "{}/xrpc/com.atproto.server.getSession", 655 + base_url().await 656 + )) 657 + .bearer_auth(jwt1) 658 + .send() 659 + .await 660 + .expect("Failed getSession 1"); 661 + assert_eq!(get1.status(), StatusCode::OK); 662 + 663 + let get2 = client 664 + .get(format!( 665 + "{}/xrpc/com.atproto.server.getSession", 666 + base_url().await 667 + )) 668 + .bearer_auth(jwt2) 669 + .send() 670 + .await 671 + .expect("Failed getSession 2"); 672 + assert_eq!(get2.status(), StatusCode::OK); 673 + } 674 + 675 + #[tokio::test] 676 + async fn test_session_lifecycle_refresh_invalidates_old() { 677 + let client = client(); 678 + let ts = Utc::now().timestamp_millis(); 679 + let handle = format!("refresh-inv-{}.test", ts); 680 + let email = format!("refresh-inv-{}@test.com", ts); 681 + let password = "refresh-inv-pw"; 682 + 683 + let create_payload = json!({ 684 + "handle": handle, 685 + "email": email, 686 + "password": password 687 + }); 688 + client 689 + .post(format!( 690 + "{}/xrpc/com.atproto.server.createAccount", 691 + base_url().await 692 + )) 693 + .json(&create_payload) 694 + .send() 695 + .await 696 + .expect("Failed to create account"); 697 + 698 + let login_payload = json!({ 699 + "identifier": handle, 700 + "password": password 701 + }); 702 + let login_res = client 703 + .post(format!( 704 + "{}/xrpc/com.atproto.server.createSession", 705 + base_url().await 706 + )) 707 + .json(&login_payload) 708 + .send() 709 + .await 710 + .expect("Failed login"); 711 + let login_body: Value = login_res.json().await.unwrap(); 712 + let refresh_jwt = login_body["refreshJwt"].as_str().unwrap().to_string(); 713 + 714 + let refresh_res = client 715 + .post(format!( 716 + "{}/xrpc/com.atproto.server.refreshSession", 717 + base_url().await 718 + )) 719 + .bearer_auth(&refresh_jwt) 720 + .send() 721 + .await 722 + .expect("Failed first refresh"); 723 + assert_eq!(refresh_res.status(), StatusCode::OK); 724 + let refresh_body: Value = refresh_res.json().await.unwrap(); 725 + let new_refresh_jwt = refresh_body["refreshJwt"].as_str().unwrap(); 726 + 727 + assert_ne!(refresh_jwt, new_refresh_jwt, "Refresh tokens should differ"); 728 + 729 + let reuse_res = client 730 + .post(format!( 731 + "{}/xrpc/com.atproto.server.refreshSession", 732 + base_url().await 733 + )) 734 + .bearer_auth(&refresh_jwt) 735 + .send() 736 + .await 737 + .expect("Failed reuse attempt"); 738 + 739 + assert!( 740 + reuse_res.status() == StatusCode::UNAUTHORIZED || reuse_res.status() == StatusCode::BAD_REQUEST, 741 + "Old refresh token should be invalid after use" 742 + ); 743 + } 744 + 745 + async fn create_like( 746 + client: &reqwest::Client, 747 + liker_did: &str, 748 + liker_jwt: &str, 749 + subject_uri: &str, 750 + subject_cid: &str, 751 + ) -> (String, String) { 752 + let collection = "app.bsky.feed.like"; 753 + let rkey = format!("e2e_like_{}", Utc::now().timestamp_millis()); 754 + let now = Utc::now().to_rfc3339(); 755 + 756 + let payload = json!({ 757 + "repo": liker_did, 758 + "collection": collection, 759 + "rkey": rkey, 760 + "record": { 761 + "$type": collection, 762 + "subject": { 763 + "uri": subject_uri, 764 + "cid": subject_cid 765 + }, 766 + "createdAt": now 767 + } 768 + }); 769 + 770 + let res = client 771 + .post(format!( 772 + "{}/xrpc/com.atproto.repo.putRecord", 773 + base_url().await 774 + )) 775 + .bearer_auth(liker_jwt) 776 + .json(&payload) 777 + .send() 778 + .await 779 + .expect("Failed to create like"); 780 + 781 + assert_eq!(res.status(), StatusCode::OK, "Failed to create like"); 782 + let body: Value = res.json().await.expect("Like response not JSON"); 783 + ( 784 + body["uri"].as_str().unwrap().to_string(), 785 + body["cid"].as_str().unwrap().to_string(), 786 + ) 787 + } 788 + 789 + async fn create_repost( 790 + client: &reqwest::Client, 791 + reposter_did: &str, 792 + reposter_jwt: &str, 793 + subject_uri: &str, 794 + subject_cid: &str, 795 + ) -> (String, String) { 796 + let collection = "app.bsky.feed.repost"; 797 + let rkey = format!("e2e_repost_{}", Utc::now().timestamp_millis()); 798 + let now = Utc::now().to_rfc3339(); 799 + 800 + let payload = json!({ 801 + "repo": reposter_did, 802 + "collection": collection, 803 + "rkey": rkey, 804 + "record": { 805 + "$type": collection, 806 + "subject": { 807 + "uri": subject_uri, 808 + "cid": subject_cid 809 + }, 810 + "createdAt": now 811 + } 812 + }); 813 + 814 + let res = client 815 + .post(format!( 816 + "{}/xrpc/com.atproto.repo.putRecord", 817 + base_url().await 818 + )) 819 + .bearer_auth(reposter_jwt) 820 + .json(&payload) 821 + .send() 822 + .await 823 + .expect("Failed to create repost"); 824 + 825 + assert_eq!(res.status(), StatusCode::OK, "Failed to create repost"); 826 + let body: Value = res.json().await.expect("Repost response not JSON"); 827 + ( 828 + body["uri"].as_str().unwrap().to_string(), 829 + body["cid"].as_str().unwrap().to_string(), 830 + ) 831 + } 832 + 833 + #[tokio::test] 834 + async fn test_profile_lifecycle() { 835 + let client = client(); 836 + let (did, jwt) = setup_new_user("profile-lifecycle").await; 837 + 838 + let profile_payload = json!({ 839 + "repo": did, 840 + "collection": "app.bsky.actor.profile", 841 + "rkey": "self", 842 + "record": { 843 + "$type": "app.bsky.actor.profile", 844 + "displayName": "Test User", 845 + "description": "A test profile for lifecycle testing" 846 + } 847 + }); 848 + 849 + let create_res = client 850 + .post(format!( 851 + "{}/xrpc/com.atproto.repo.putRecord", 852 + base_url().await 853 + )) 854 + .bearer_auth(&jwt) 855 + .json(&profile_payload) 856 + .send() 857 + .await 858 + .expect("Failed to create profile"); 859 + 860 + assert_eq!(create_res.status(), StatusCode::OK, "Failed to create profile"); 861 + let create_body: Value = create_res.json().await.unwrap(); 862 + let initial_cid = create_body["cid"].as_str().unwrap().to_string(); 863 + 864 + let get_res = client 865 + .get(format!( 866 + "{}/xrpc/com.atproto.repo.getRecord", 867 + base_url().await 868 + )) 869 + .query(&[ 870 + ("repo", did.as_str()), 871 + ("collection", "app.bsky.actor.profile"), 872 + ("rkey", "self"), 873 + ]) 874 + .send() 875 + .await 876 + .expect("Failed to get profile"); 877 + 878 + assert_eq!(get_res.status(), StatusCode::OK); 879 + let get_body: Value = get_res.json().await.unwrap(); 880 + assert_eq!(get_body["value"]["displayName"], "Test User"); 881 + assert_eq!(get_body["value"]["description"], "A test profile for lifecycle testing"); 882 + 883 + let update_payload = json!({ 884 + "repo": did, 885 + "collection": "app.bsky.actor.profile", 886 + "rkey": "self", 887 + "record": { 888 + "$type": "app.bsky.actor.profile", 889 + "displayName": "Updated User", 890 + "description": "Profile has been updated" 891 + }, 892 + "swapRecord": initial_cid 893 + }); 894 + 895 + let update_res = client 896 + .post(format!( 897 + "{}/xrpc/com.atproto.repo.putRecord", 898 + base_url().await 899 + )) 900 + .bearer_auth(&jwt) 901 + .json(&update_payload) 902 + .send() 903 + .await 904 + .expect("Failed to update profile"); 905 + 906 + assert_eq!(update_res.status(), StatusCode::OK, "Failed to update profile"); 907 + 908 + let get_updated_res = client 909 + .get(format!( 910 + "{}/xrpc/com.atproto.repo.getRecord", 911 + base_url().await 912 + )) 913 + .query(&[ 914 + ("repo", did.as_str()), 915 + ("collection", "app.bsky.actor.profile"), 916 + ("rkey", "self"), 917 + ]) 918 + .send() 919 + .await 920 + .expect("Failed to get updated profile"); 921 + 922 + let updated_body: Value = get_updated_res.json().await.unwrap(); 923 + assert_eq!(updated_body["value"]["displayName"], "Updated User"); 924 + } 925 + 926 + #[tokio::test] 927 + async fn test_reply_thread_lifecycle() { 928 + let client = client(); 929 + 930 + let (alice_did, alice_jwt) = setup_new_user("alice-thread").await; 931 + let (bob_did, bob_jwt) = setup_new_user("bob-thread").await; 932 + 933 + let (root_uri, root_cid) = create_post(&client, &alice_did, &alice_jwt, "This is the root post").await; 934 + 935 + tokio::time::sleep(Duration::from_millis(100)).await; 936 + 937 + let reply_collection = "app.bsky.feed.post"; 938 + let reply_rkey = format!("e2e_reply_{}", Utc::now().timestamp_millis()); 939 + let now = Utc::now().to_rfc3339(); 940 + 941 + let reply_payload = json!({ 942 + "repo": bob_did, 943 + "collection": reply_collection, 944 + "rkey": reply_rkey, 945 + "record": { 946 + "$type": reply_collection, 947 + "text": "This is Bob's reply to Alice", 948 + "createdAt": now, 949 + "reply": { 950 + "root": { 951 + "uri": root_uri, 952 + "cid": root_cid 953 + }, 954 + "parent": { 955 + "uri": root_uri, 956 + "cid": root_cid 957 + } 958 + } 959 + } 960 + }); 961 + 962 + let reply_res = client 963 + .post(format!( 964 + "{}/xrpc/com.atproto.repo.putRecord", 965 + base_url().await 966 + )) 967 + .bearer_auth(&bob_jwt) 968 + .json(&reply_payload) 969 + .send() 970 + .await 971 + .expect("Failed to create reply"); 972 + 973 + assert_eq!(reply_res.status(), StatusCode::OK, "Failed to create reply"); 974 + let reply_body: Value = reply_res.json().await.unwrap(); 975 + let reply_uri = reply_body["uri"].as_str().unwrap(); 976 + let reply_cid = reply_body["cid"].as_str().unwrap(); 977 + 978 + let get_reply_res = client 979 + .get(format!( 980 + "{}/xrpc/com.atproto.repo.getRecord", 981 + base_url().await 982 + )) 983 + .query(&[ 984 + ("repo", bob_did.as_str()), 985 + ("collection", reply_collection), 986 + ("rkey", reply_rkey.as_str()), 987 + ]) 988 + .send() 989 + .await 990 + .expect("Failed to get reply"); 991 + 992 + assert_eq!(get_reply_res.status(), StatusCode::OK); 993 + let reply_record: Value = get_reply_res.json().await.unwrap(); 994 + assert_eq!(reply_record["value"]["reply"]["root"]["uri"], root_uri); 995 + assert_eq!(reply_record["value"]["reply"]["parent"]["uri"], root_uri); 996 + 997 + tokio::time::sleep(Duration::from_millis(100)).await; 998 + 999 + let nested_reply_rkey = format!("e2e_nested_reply_{}", Utc::now().timestamp_millis()); 1000 + let nested_payload = json!({ 1001 + "repo": alice_did, 1002 + "collection": reply_collection, 1003 + "rkey": nested_reply_rkey, 1004 + "record": { 1005 + "$type": reply_collection, 1006 + "text": "Alice replies to Bob's reply", 1007 + "createdAt": Utc::now().to_rfc3339(), 1008 + "reply": { 1009 + "root": { 1010 + "uri": root_uri, 1011 + "cid": root_cid 1012 + }, 1013 + "parent": { 1014 + "uri": reply_uri, 1015 + "cid": reply_cid 1016 + } 1017 + } 1018 + } 1019 + }); 1020 + 1021 + let nested_res = client 1022 + .post(format!( 1023 + "{}/xrpc/com.atproto.repo.putRecord", 1024 + base_url().await 1025 + )) 1026 + .bearer_auth(&alice_jwt) 1027 + .json(&nested_payload) 1028 + .send() 1029 + .await 1030 + .expect("Failed to create nested reply"); 1031 + 1032 + assert_eq!(nested_res.status(), StatusCode::OK, "Failed to create nested reply"); 1033 + } 1034 + 1035 + #[tokio::test] 1036 + async fn test_like_lifecycle() { 1037 + let client = client(); 1038 + 1039 + let (alice_did, alice_jwt) = setup_new_user("alice-like").await; 1040 + let (bob_did, bob_jwt) = setup_new_user("bob-like").await; 1041 + 1042 + let (post_uri, post_cid) = create_post(&client, &alice_did, &alice_jwt, "Like this post!").await; 1043 + 1044 + let (like_uri, _) = create_like(&client, &bob_did, &bob_jwt, &post_uri, &post_cid).await; 1045 + 1046 + let like_rkey = like_uri.split('/').last().unwrap(); 1047 + let get_like_res = client 1048 + .get(format!( 1049 + "{}/xrpc/com.atproto.repo.getRecord", 1050 + base_url().await 1051 + )) 1052 + .query(&[ 1053 + ("repo", bob_did.as_str()), 1054 + ("collection", "app.bsky.feed.like"), 1055 + ("rkey", like_rkey), 1056 + ]) 1057 + .send() 1058 + .await 1059 + .expect("Failed to get like"); 1060 + 1061 + assert_eq!(get_like_res.status(), StatusCode::OK); 1062 + let like_body: Value = get_like_res.json().await.unwrap(); 1063 + assert_eq!(like_body["value"]["subject"]["uri"], post_uri); 1064 + 1065 + let delete_payload = json!({ 1066 + "repo": bob_did, 1067 + "collection": "app.bsky.feed.like", 1068 + "rkey": like_rkey 1069 + }); 1070 + 1071 + let delete_res = client 1072 + .post(format!( 1073 + "{}/xrpc/com.atproto.repo.deleteRecord", 1074 + base_url().await 1075 + )) 1076 + .bearer_auth(&bob_jwt) 1077 + .json(&delete_payload) 1078 + .send() 1079 + .await 1080 + .expect("Failed to delete like"); 1081 + 1082 + assert_eq!(delete_res.status(), StatusCode::OK, "Failed to delete like"); 1083 + 1084 + let get_deleted_res = client 1085 + .get(format!( 1086 + "{}/xrpc/com.atproto.repo.getRecord", 1087 + base_url().await 1088 + )) 1089 + .query(&[ 1090 + ("repo", bob_did.as_str()), 1091 + ("collection", "app.bsky.feed.like"), 1092 + ("rkey", like_rkey), 1093 + ]) 1094 + .send() 1095 + .await 1096 + .expect("Failed to check deleted like"); 1097 + 1098 + assert_eq!(get_deleted_res.status(), StatusCode::NOT_FOUND, "Like should be deleted"); 1099 + } 1100 + 1101 + #[tokio::test] 1102 + async fn test_repost_lifecycle() { 1103 + let client = client(); 1104 + 1105 + let (alice_did, alice_jwt) = setup_new_user("alice-repost").await; 1106 + let (bob_did, bob_jwt) = setup_new_user("bob-repost").await; 1107 + 1108 + let (post_uri, post_cid) = create_post(&client, &alice_did, &alice_jwt, "Repost this!").await; 1109 + 1110 + let (repost_uri, _) = create_repost(&client, &bob_did, &bob_jwt, &post_uri, &post_cid).await; 1111 + 1112 + let repost_rkey = repost_uri.split('/').last().unwrap(); 1113 + let get_repost_res = client 1114 + .get(format!( 1115 + "{}/xrpc/com.atproto.repo.getRecord", 1116 + base_url().await 1117 + )) 1118 + .query(&[ 1119 + ("repo", bob_did.as_str()), 1120 + ("collection", "app.bsky.feed.repost"), 1121 + ("rkey", repost_rkey), 1122 + ]) 1123 + .send() 1124 + .await 1125 + .expect("Failed to get repost"); 1126 + 1127 + assert_eq!(get_repost_res.status(), StatusCode::OK); 1128 + let repost_body: Value = get_repost_res.json().await.unwrap(); 1129 + assert_eq!(repost_body["value"]["subject"]["uri"], post_uri); 1130 + 1131 + let delete_payload = json!({ 1132 + "repo": bob_did, 1133 + "collection": "app.bsky.feed.repost", 1134 + "rkey": repost_rkey 1135 + }); 1136 + 1137 + let delete_res = client 1138 + .post(format!( 1139 + "{}/xrpc/com.atproto.repo.deleteRecord", 1140 + base_url().await 1141 + )) 1142 + .bearer_auth(&bob_jwt) 1143 + .json(&delete_payload) 1144 + .send() 1145 + .await 1146 + .expect("Failed to delete repost"); 1147 + 1148 + assert_eq!(delete_res.status(), StatusCode::OK, "Failed to delete repost"); 1149 + } 1150 + 1151 + #[tokio::test] 1152 + async fn test_unfollow_lifecycle() { 1153 + let client = client(); 1154 + 1155 + let (alice_did, _alice_jwt) = setup_new_user("alice-unfollow").await; 1156 + let (bob_did, bob_jwt) = setup_new_user("bob-unfollow").await; 1157 + 1158 + let (follow_uri, _) = create_follow(&client, &bob_did, &bob_jwt, &alice_did).await; 1159 + 1160 + let follow_rkey = follow_uri.split('/').last().unwrap(); 1161 + let get_follow_res = client 1162 + .get(format!( 1163 + "{}/xrpc/com.atproto.repo.getRecord", 1164 + base_url().await 1165 + )) 1166 + .query(&[ 1167 + ("repo", bob_did.as_str()), 1168 + ("collection", "app.bsky.graph.follow"), 1169 + ("rkey", follow_rkey), 1170 + ]) 1171 + .send() 1172 + .await 1173 + .expect("Failed to get follow"); 1174 + 1175 + assert_eq!(get_follow_res.status(), StatusCode::OK); 1176 + 1177 + let unfollow_payload = json!({ 1178 + "repo": bob_did, 1179 + "collection": "app.bsky.graph.follow", 1180 + "rkey": follow_rkey 1181 + }); 1182 + 1183 + let unfollow_res = client 1184 + .post(format!( 1185 + "{}/xrpc/com.atproto.repo.deleteRecord", 1186 + base_url().await 1187 + )) 1188 + .bearer_auth(&bob_jwt) 1189 + .json(&unfollow_payload) 1190 + .send() 1191 + .await 1192 + .expect("Failed to unfollow"); 1193 + 1194 + assert_eq!(unfollow_res.status(), StatusCode::OK, "Failed to unfollow"); 1195 + 1196 + let get_deleted_res = client 1197 + .get(format!( 1198 + "{}/xrpc/com.atproto.repo.getRecord", 1199 + base_url().await 1200 + )) 1201 + .query(&[ 1202 + ("repo", bob_did.as_str()), 1203 + ("collection", "app.bsky.graph.follow"), 1204 + ("rkey", follow_rkey), 1205 + ]) 1206 + .send() 1207 + .await 1208 + .expect("Failed to check deleted follow"); 1209 + 1210 + assert_eq!(get_deleted_res.status(), StatusCode::NOT_FOUND, "Follow should be deleted"); 1211 + } 1212 + 1213 + #[tokio::test] 1214 + async fn test_timeline_after_unfollow() { 1215 + let client = client(); 1216 + 1217 + let (alice_did, alice_jwt) = setup_new_user("alice-tl-unfollow").await; 1218 + let (bob_did, bob_jwt) = setup_new_user("bob-tl-unfollow").await; 1219 + 1220 + let (follow_uri, _) = create_follow(&client, &bob_did, &bob_jwt, &alice_did).await; 1221 + 1222 + create_post(&client, &alice_did, &alice_jwt, "Post while following").await; 1223 + 1224 + tokio::time::sleep(Duration::from_secs(1)).await; 1225 + 1226 + let timeline_res = client 1227 + .get(format!( 1228 + "{}/xrpc/app.bsky.feed.getTimeline", 1229 + base_url().await 1230 + )) 1231 + .bearer_auth(&bob_jwt) 1232 + .send() 1233 + .await 1234 + .expect("Failed to get timeline"); 1235 + 1236 + assert_eq!(timeline_res.status(), StatusCode::OK); 1237 + let timeline_body: Value = timeline_res.json().await.unwrap(); 1238 + let feed = timeline_body["feed"].as_array().unwrap(); 1239 + assert_eq!(feed.len(), 1, "Should see 1 post from Alice"); 1240 + 1241 + let follow_rkey = follow_uri.split('/').last().unwrap(); 1242 + let unfollow_payload = json!({ 1243 + "repo": bob_did, 1244 + "collection": "app.bsky.graph.follow", 1245 + "rkey": follow_rkey 1246 + }); 1247 + client 1248 + .post(format!( 1249 + "{}/xrpc/com.atproto.repo.deleteRecord", 1250 + base_url().await 1251 + )) 1252 + .bearer_auth(&bob_jwt) 1253 + .json(&unfollow_payload) 1254 + .send() 1255 + .await 1256 + .expect("Failed to unfollow"); 1257 + 1258 + tokio::time::sleep(Duration::from_secs(1)).await; 1259 + 1260 + let timeline_after_res = client 1261 + .get(format!( 1262 + "{}/xrpc/app.bsky.feed.getTimeline", 1263 + base_url().await 1264 + )) 1265 + .bearer_auth(&bob_jwt) 1266 + .send() 1267 + .await 1268 + .expect("Failed to get timeline after unfollow"); 1269 + 1270 + assert_eq!(timeline_after_res.status(), StatusCode::OK); 1271 + let timeline_after: Value = timeline_after_res.json().await.unwrap(); 1272 + let feed_after = timeline_after["feed"].as_array().unwrap(); 1273 + assert_eq!(feed_after.len(), 0, "Should see 0 posts after unfollowing"); 1274 + } 1275 + 1276 + #[tokio::test] 1277 + async fn test_blob_in_record_lifecycle() { 1278 + let client = client(); 1279 + let (did, jwt) = setup_new_user("blob-record").await; 1280 + 1281 + let blob_data = b"This is test blob data for a profile avatar"; 1282 + let upload_res = client 1283 + .post(format!( 1284 + "{}/xrpc/com.atproto.repo.uploadBlob", 1285 + base_url().await 1286 + )) 1287 + .header(header::CONTENT_TYPE, "text/plain") 1288 + .bearer_auth(&jwt) 1289 + .body(blob_data.to_vec()) 1290 + .send() 1291 + .await 1292 + .expect("Failed to upload blob"); 1293 + 1294 + assert_eq!(upload_res.status(), StatusCode::OK); 1295 + let upload_body: Value = upload_res.json().await.unwrap(); 1296 + let blob_ref = upload_body["blob"].clone(); 1297 + 1298 + let profile_payload = json!({ 1299 + "repo": did, 1300 + "collection": "app.bsky.actor.profile", 1301 + "rkey": "self", 1302 + "record": { 1303 + "$type": "app.bsky.actor.profile", 1304 + "displayName": "User With Avatar", 1305 + "avatar": blob_ref 1306 + } 1307 + }); 1308 + 1309 + let create_res = client 1310 + .post(format!( 1311 + "{}/xrpc/com.atproto.repo.putRecord", 1312 + base_url().await 1313 + )) 1314 + .bearer_auth(&jwt) 1315 + .json(&profile_payload) 1316 + .send() 1317 + .await 1318 + .expect("Failed to create profile with blob"); 1319 + 1320 + assert_eq!(create_res.status(), StatusCode::OK, "Failed to create profile with blob"); 1321 + 1322 + let get_res = client 1323 + .get(format!( 1324 + "{}/xrpc/com.atproto.repo.getRecord", 1325 + base_url().await 1326 + )) 1327 + .query(&[ 1328 + ("repo", did.as_str()), 1329 + ("collection", "app.bsky.actor.profile"), 1330 + ("rkey", "self"), 1331 + ]) 1332 + .send() 1333 + .await 1334 + .expect("Failed to get profile"); 1335 + 1336 + assert_eq!(get_res.status(), StatusCode::OK); 1337 + let profile: Value = get_res.json().await.unwrap(); 1338 + assert!(profile["value"]["avatar"]["ref"]["$link"].is_string()); 1339 + } 1340 + 1341 + #[tokio::test] 1342 + async fn test_authorization_cannot_modify_other_repo() { 1343 + let client = client(); 1344 + 1345 + let (alice_did, _alice_jwt) = setup_new_user("alice-auth").await; 1346 + let (_bob_did, bob_jwt) = setup_new_user("bob-auth").await; 1347 + 1348 + let post_payload = json!({ 1349 + "repo": alice_did, 1350 + "collection": "app.bsky.feed.post", 1351 + "rkey": "unauthorized-post", 1352 + "record": { 1353 + "$type": "app.bsky.feed.post", 1354 + "text": "Bob trying to post as Alice", 1355 + "createdAt": Utc::now().to_rfc3339() 1356 + } 1357 + }); 1358 + 1359 + let res = client 1360 + .post(format!( 1361 + "{}/xrpc/com.atproto.repo.putRecord", 1362 + base_url().await 1363 + )) 1364 + .bearer_auth(&bob_jwt) 1365 + .json(&post_payload) 1366 + .send() 1367 + .await 1368 + .expect("Failed to send request"); 1369 + 1370 + assert!( 1371 + res.status() == StatusCode::FORBIDDEN || res.status() == StatusCode::UNAUTHORIZED, 1372 + "Expected 403 or 401 when writing to another user's repo, got {}", 1373 + res.status() 1374 + ); 1375 + } 1376 + 1377 + #[tokio::test] 1378 + async fn test_authorization_cannot_delete_other_record() { 1379 + let client = client(); 1380 + 1381 + let (alice_did, alice_jwt) = setup_new_user("alice-del-auth").await; 1382 + let (_bob_did, bob_jwt) = setup_new_user("bob-del-auth").await; 1383 + 1384 + let (post_uri, _) = create_post(&client, &alice_did, &alice_jwt, "Alice's post").await; 1385 + let post_rkey = post_uri.split('/').last().unwrap(); 1386 + 1387 + let delete_payload = json!({ 1388 + "repo": alice_did, 1389 + "collection": "app.bsky.feed.post", 1390 + "rkey": post_rkey 1391 + }); 1392 + 1393 + let res = client 1394 + .post(format!( 1395 + "{}/xrpc/com.atproto.repo.deleteRecord", 1396 + base_url().await 1397 + )) 1398 + .bearer_auth(&bob_jwt) 1399 + .json(&delete_payload) 1400 + .send() 1401 + .await 1402 + .expect("Failed to send request"); 1403 + 1404 + assert!( 1405 + res.status() == StatusCode::FORBIDDEN || res.status() == StatusCode::UNAUTHORIZED, 1406 + "Expected 403 or 401 when deleting another user's record, got {}", 1407 + res.status() 1408 + ); 1409 + 1410 + let get_res = client 1411 + .get(format!( 1412 + "{}/xrpc/com.atproto.repo.getRecord", 1413 + base_url().await 1414 + )) 1415 + .query(&[ 1416 + ("repo", alice_did.as_str()), 1417 + ("collection", "app.bsky.feed.post"), 1418 + ("rkey", post_rkey), 1419 + ]) 1420 + .send() 1421 + .await 1422 + .expect("Failed to verify record exists"); 1423 + 1424 + assert_eq!(get_res.status(), StatusCode::OK, "Record should still exist"); 1425 + } 1426 + 1427 + #[tokio::test] 1428 + async fn test_list_records_pagination() { 1429 + let client = client(); 1430 + let (did, jwt) = setup_new_user("list-pagination").await; 1431 + 1432 + for i in 0..5 { 1433 + tokio::time::sleep(Duration::from_millis(50)).await; 1434 + create_post(&client, &did, &jwt, &format!("Post number {}", i)).await; 1435 + } 1436 + 1437 + let list_res = client 1438 + .get(format!( 1439 + "{}/xrpc/com.atproto.repo.listRecords", 1440 + base_url().await 1441 + )) 1442 + .query(&[ 1443 + ("repo", did.as_str()), 1444 + ("collection", "app.bsky.feed.post"), 1445 + ("limit", "2"), 1446 + ]) 1447 + .send() 1448 + .await 1449 + .expect("Failed to list records"); 1450 + 1451 + assert_eq!(list_res.status(), StatusCode::OK); 1452 + let list_body: Value = list_res.json().await.unwrap(); 1453 + let records = list_body["records"].as_array().unwrap(); 1454 + assert_eq!(records.len(), 2, "Should return 2 records with limit=2"); 1455 + 1456 + if let Some(cursor) = list_body["cursor"].as_str() { 1457 + let list_page2_res = client 1458 + .get(format!( 1459 + "{}/xrpc/com.atproto.repo.listRecords", 1460 + base_url().await 1461 + )) 1462 + .query(&[ 1463 + ("repo", did.as_str()), 1464 + ("collection", "app.bsky.feed.post"), 1465 + ("limit", "2"), 1466 + ("cursor", cursor), 1467 + ]) 1468 + .send() 1469 + .await 1470 + .expect("Failed to list records page 2"); 1471 + 1472 + assert_eq!(list_page2_res.status(), StatusCode::OK); 1473 + let page2_body: Value = list_page2_res.json().await.unwrap(); 1474 + let page2_records = page2_body["records"].as_array().unwrap(); 1475 + assert_eq!(page2_records.len(), 2, "Page 2 should have 2 more records"); 1476 + } 1477 + } 1478 + 1479 + #[tokio::test] 1480 + async fn test_mutual_follow_lifecycle() { 1481 + let client = client(); 1482 + 1483 + let (alice_did, alice_jwt) = setup_new_user("alice-mutual").await; 1484 + let (bob_did, bob_jwt) = setup_new_user("bob-mutual").await; 1485 + 1486 + create_follow(&client, &alice_did, &alice_jwt, &bob_did).await; 1487 + create_follow(&client, &bob_did, &bob_jwt, &alice_did).await; 1488 + 1489 + create_post(&client, &alice_did, &alice_jwt, "Alice's post for mutual").await; 1490 + create_post(&client, &bob_did, &bob_jwt, "Bob's post for mutual").await; 1491 + 1492 + tokio::time::sleep(Duration::from_secs(1)).await; 1493 + 1494 + let alice_timeline_res = client 1495 + .get(format!( 1496 + "{}/xrpc/app.bsky.feed.getTimeline", 1497 + base_url().await 1498 + )) 1499 + .bearer_auth(&alice_jwt) 1500 + .send() 1501 + .await 1502 + .expect("Failed to get Alice's timeline"); 1503 + 1504 + assert_eq!(alice_timeline_res.status(), StatusCode::OK); 1505 + let alice_tl: Value = alice_timeline_res.json().await.unwrap(); 1506 + let alice_feed = alice_tl["feed"].as_array().unwrap(); 1507 + assert_eq!(alice_feed.len(), 1, "Alice should see Bob's 1 post"); 1508 + 1509 + let bob_timeline_res = client 1510 + .get(format!( 1511 + "{}/xrpc/app.bsky.feed.getTimeline", 1512 + base_url().await 1513 + )) 1514 + .bearer_auth(&bob_jwt) 1515 + .send() 1516 + .await 1517 + .expect("Failed to get Bob's timeline"); 1518 + 1519 + assert_eq!(bob_timeline_res.status(), StatusCode::OK); 1520 + let bob_tl: Value = bob_timeline_res.json().await.unwrap(); 1521 + let bob_feed = bob_tl["feed"].as_array().unwrap(); 1522 + assert_eq!(bob_feed.len(), 1, "Bob should see Alice's 1 post"); 1523 + } 1524 + 1525 + #[tokio::test] 1526 + async fn test_account_to_post_full_lifecycle() { 1527 + let client = client(); 1528 + let ts = Utc::now().timestamp_millis(); 1529 + let handle = format!("fullcycle-{}.test", ts); 1530 + let email = format!("fullcycle-{}@test.com", ts); 1531 + let password = "fullcycle-password"; 1532 + 1533 + let create_account_res = client 1534 + .post(format!( 1535 + "{}/xrpc/com.atproto.server.createAccount", 1536 + base_url().await 1537 + )) 1538 + .json(&json!({ 1539 + "handle": handle, 1540 + "email": email, 1541 + "password": password 1542 + })) 1543 + .send() 1544 + .await 1545 + .expect("Failed to create account"); 1546 + 1547 + assert_eq!(create_account_res.status(), StatusCode::OK); 1548 + let account_body: Value = create_account_res.json().await.unwrap(); 1549 + let did = account_body["did"].as_str().unwrap().to_string(); 1550 + let access_jwt = account_body["accessJwt"].as_str().unwrap().to_string(); 1551 + 1552 + let get_session_res = client 1553 + .get(format!( 1554 + "{}/xrpc/com.atproto.server.getSession", 1555 + base_url().await 1556 + )) 1557 + .bearer_auth(&access_jwt) 1558 + .send() 1559 + .await 1560 + .expect("Failed to get session"); 1561 + 1562 + assert_eq!(get_session_res.status(), StatusCode::OK); 1563 + let session_body: Value = get_session_res.json().await.unwrap(); 1564 + assert_eq!(session_body["did"], did); 1565 + assert_eq!(session_body["handle"], handle); 1566 + 1567 + let profile_res = client 1568 + .post(format!( 1569 + "{}/xrpc/com.atproto.repo.putRecord", 1570 + base_url().await 1571 + )) 1572 + .bearer_auth(&access_jwt) 1573 + .json(&json!({ 1574 + "repo": did, 1575 + "collection": "app.bsky.actor.profile", 1576 + "rkey": "self", 1577 + "record": { 1578 + "$type": "app.bsky.actor.profile", 1579 + "displayName": "Full Cycle User" 1580 + } 1581 + })) 1582 + .send() 1583 + .await 1584 + .expect("Failed to create profile"); 1585 + 1586 + assert_eq!(profile_res.status(), StatusCode::OK); 1587 + 1588 + let (post_uri, post_cid) = create_post(&client, &did, &access_jwt, "My first post!").await; 1589 + 1590 + let get_post_res = client 1591 + .get(format!( 1592 + "{}/xrpc/com.atproto.repo.getRecord", 1593 + base_url().await 1594 + )) 1595 + .query(&[ 1596 + ("repo", did.as_str()), 1597 + ("collection", "app.bsky.feed.post"), 1598 + ("rkey", post_uri.split('/').last().unwrap()), 1599 + ]) 1600 + .send() 1601 + .await 1602 + .expect("Failed to get post"); 1603 + 1604 + assert_eq!(get_post_res.status(), StatusCode::OK); 1605 + 1606 + create_like(&client, &did, &access_jwt, &post_uri, &post_cid).await; 1607 + 1608 + let describe_res = client 1609 + .get(format!( 1610 + "{}/xrpc/com.atproto.repo.describeRepo", 1611 + base_url().await 1612 + )) 1613 + .query(&[("repo", did.as_str())]) 1614 + .send() 1615 + .await 1616 + .expect("Failed to describe repo"); 1617 + 1618 + assert_eq!(describe_res.status(), StatusCode::OK); 1619 + let describe_body: Value = describe_res.json().await.unwrap(); 1620 + assert_eq!(describe_body["did"], did); 1621 + assert_eq!(describe_body["handle"], handle); 1622 + }
-43
tests/proxy.rs
··· 61 61 assert_eq!(auth, Some("Bearer test-token".to_string())); 62 62 } 63 63 64 - #[tokio::test] 65 - #[ignore] 66 - async fn test_proxy_via_env_var() { 67 - let (upstream_url, mut rx) = spawn_mock_upstream().await; 68 - 69 - unsafe { 70 - std::env::set_var("APPVIEW_URL", &upstream_url); 71 - } 72 - 73 - let app_url = common::base_url().await; 74 - let client = Client::new(); 75 - 76 - let res = client 77 - .get(format!("{}/xrpc/com.example.envtest", app_url)) 78 - .send() 79 - .await 80 - .unwrap(); 81 - 82 - assert_eq!(res.status(), StatusCode::OK); 83 - 84 - let (method, uri, _) = rx.recv().await.expect("Upstream should receive request"); 85 - assert_eq!(method, "GET"); 86 - assert_eq!(uri, "/xrpc/com.example.envtest"); 87 - } 88 - 89 - #[tokio::test] 90 - #[ignore] 91 - async fn test_proxy_missing_config() { 92 - unsafe { 93 - std::env::remove_var("APPVIEW_URL"); 94 - } 95 - 96 - let app_url = common::base_url().await; 97 - let client = Client::new(); 98 - 99 - let res = client 100 - .get(format!("{}/xrpc/com.example.fail", app_url)) 101 - .send() 102 - .await 103 - .unwrap(); 104 - 105 - assert_eq!(res.status(), StatusCode::BAD_GATEWAY); 106 - } 107 64 108 65 #[tokio::test] 109 66 async fn test_proxy_auth_signing() {
+386 -49
tests/repo.rs
··· 6 6 use serde_json::{Value, json}; 7 7 8 8 #[tokio::test] 9 - #[ignore] 10 - async fn test_get_record() { 11 - let client = client(); 12 - let params = [ 13 - ("repo", "did:plc:12345"), 14 - ("collection", "app.bsky.actor.profile"), 15 - ("rkey", "self"), 16 - ]; 17 - 18 - let res = client 19 - .get(format!( 20 - "{}/xrpc/com.atproto.repo.getRecord", 21 - base_url().await 22 - )) 23 - .query(&params) 24 - .send() 25 - .await 26 - .expect("Failed to send request"); 27 - 28 - assert_eq!(res.status(), StatusCode::OK); 29 - let body: Value = res.json().await.expect("Response was not valid JSON"); 30 - assert_eq!(body["value"]["$type"], "app.bsky.actor.profile"); 31 - } 32 - 33 - #[tokio::test] 34 - #[ignore] 35 9 async fn test_get_record_not_found() { 36 10 let client = client(); 11 + let (_, did) = create_account_and_login(&client).await; 12 + 37 13 let params = [ 38 - ("repo", "did:plc:12345"), 14 + ("repo", did.as_str()), 39 15 ("collection", "app.bsky.feed.post"), 40 16 ("rkey", "nonexistent"), 41 17 ]; ··· 51 27 .expect("Failed to send request"); 52 28 53 29 assert_eq!(res.status(), StatusCode::NOT_FOUND); 54 - let body: Value = res.json().await.expect("Response was not valid JSON"); 55 - assert_eq!(body["error"], "NotFound"); 56 30 } 57 31 58 32 #[tokio::test] ··· 96 70 } 97 71 98 72 #[tokio::test] 99 - #[ignore] 100 73 async fn test_put_record_no_auth() { 101 74 let client = client(); 102 75 let payload = json!({ ··· 118 91 119 92 assert_eq!(res.status(), StatusCode::UNAUTHORIZED); 120 93 let body: Value = res.json().await.expect("Response was not valid JSON"); 121 - assert_eq!(body["error"], "AuthenticationFailed"); 94 + assert_eq!(body["error"], "AuthenticationRequired"); 122 95 } 123 96 124 97 #[tokio::test] 125 - #[ignore] 126 98 async fn test_put_record_success() { 127 99 let client = client(); 128 100 let (token, did) = create_account_and_login(&client).await; ··· 156 128 } 157 129 158 130 #[tokio::test] 159 - #[ignore] 160 131 async fn test_get_record_missing_params() { 161 132 let client = client(); 162 133 let params = [("repo", "did:plc:12345")]; ··· 199 170 } 200 171 201 172 #[tokio::test] 202 - #[ignore] 203 173 async fn test_put_record_mismatched_repo() { 204 174 let client = client(); 205 175 let (token, _) = create_account_and_login(&client).await; 206 176 let now = Utc::now().to_rfc3339(); 207 177 let payload = json!({ 208 - "repo": "did:plc:OTHER-USER", // This does NOT match AUTH_DID 178 + "repo": "did:plc:OTHER-USER", 209 179 "collection": "app.bsky.feed.post", 210 180 "rkey": "e2e_test_post", 211 181 "record": { ··· 226 196 .await 227 197 .expect("Failed to send request"); 228 198 229 - assert_eq!( 230 - res.status(), 231 - StatusCode::FORBIDDEN, 232 - "Expected 403 for mismatched repo and auth" 199 + assert!( 200 + res.status() == StatusCode::FORBIDDEN || res.status() == StatusCode::UNAUTHORIZED, 201 + "Expected 403 or 401 for mismatched repo and auth, got {}", 202 + res.status() 233 203 ); 234 204 } 235 205 ··· 328 298 } 329 299 330 300 #[tokio::test] 331 - #[ignore] 332 301 async fn test_create_record_success_with_generated_rkey() { 333 302 let client = client(); 334 303 let (token, did) = create_account_and_login(&client).await; ··· 357 326 let body: Value = res.json().await.expect("Response was not valid JSON"); 358 327 let uri = body["uri"].as_str().unwrap(); 359 328 assert!(uri.starts_with(&format!("at://{}/app.bsky.feed.post/", did))); 360 - // assert_eq!(body["cid"], "bafyreihy"); 329 + assert!(body.get("cid").is_some()); 361 330 } 362 331 363 332 #[tokio::test] 364 - #[ignore] 365 333 async fn test_create_record_success_with_provided_rkey() { 366 334 let client = client(); 367 335 let (token, did) = create_account_and_login(&client).await; 368 - let rkey = "custom-rkey"; 336 + let rkey = format!("custom-rkey-{}", Utc::now().timestamp_millis()); 369 337 let payload = json!({ 370 338 "repo": did, 371 339 "collection": "app.bsky.feed.post", ··· 394 362 body["uri"], 395 363 format!("at://{}/app.bsky.feed.post/{}", did, rkey) 396 364 ); 397 - // assert_eq!(body["cid"], "bafyreihy"); 365 + assert!(body.get("cid").is_some()); 398 366 } 399 367 400 368 #[tokio::test] 401 - #[ignore] 402 369 async fn test_delete_record() { 403 370 let client = client(); 404 371 let (token, did) = create_account_and_login(&client).await; 372 + let rkey = format!("post_to_delete_{}", Utc::now().timestamp_millis()); 373 + 374 + let create_payload = json!({ 375 + "repo": did, 376 + "collection": "app.bsky.feed.post", 377 + "rkey": rkey, 378 + "record": { 379 + "$type": "app.bsky.feed.post", 380 + "text": "This post will be deleted", 381 + "createdAt": Utc::now().to_rfc3339() 382 + } 383 + }); 384 + let create_res = client 385 + .post(format!( 386 + "{}/xrpc/com.atproto.repo.putRecord", 387 + base_url().await 388 + )) 389 + .bearer_auth(&token) 390 + .json(&create_payload) 391 + .send() 392 + .await 393 + .expect("Failed to create record"); 394 + assert_eq!(create_res.status(), StatusCode::OK); 395 + 396 + let delete_payload = json!({ 397 + "repo": did, 398 + "collection": "app.bsky.feed.post", 399 + "rkey": rkey 400 + }); 401 + let delete_res = client 402 + .post(format!( 403 + "{}/xrpc/com.atproto.repo.deleteRecord", 404 + base_url().await 405 + )) 406 + .bearer_auth(&token) 407 + .json(&delete_payload) 408 + .send() 409 + .await 410 + .expect("Failed to send request"); 411 + 412 + assert_eq!(delete_res.status(), StatusCode::OK); 413 + 414 + let get_res = client 415 + .get(format!( 416 + "{}/xrpc/com.atproto.repo.getRecord", 417 + base_url().await 418 + )) 419 + .query(&[ 420 + ("repo", did.as_str()), 421 + ("collection", "app.bsky.feed.post"), 422 + ("rkey", rkey.as_str()), 423 + ]) 424 + .send() 425 + .await 426 + .expect("Failed to verify deletion"); 427 + assert_eq!(get_res.status(), StatusCode::NOT_FOUND); 428 + } 429 + 430 + #[tokio::test] 431 + async fn test_apply_writes_create() { 432 + let client = client(); 433 + let (token, did) = create_account_and_login(&client).await; 434 + let now = Utc::now().to_rfc3339(); 435 + 405 436 let payload = json!({ 406 437 "repo": did, 438 + "writes": [ 439 + { 440 + "$type": "com.atproto.repo.applyWrites#create", 441 + "collection": "app.bsky.feed.post", 442 + "value": { 443 + "$type": "app.bsky.feed.post", 444 + "text": "Batch created post 1", 445 + "createdAt": now 446 + } 447 + }, 448 + { 449 + "$type": "com.atproto.repo.applyWrites#create", 450 + "collection": "app.bsky.feed.post", 451 + "value": { 452 + "$type": "app.bsky.feed.post", 453 + "text": "Batch created post 2", 454 + "createdAt": now 455 + } 456 + } 457 + ] 458 + }); 459 + 460 + let res = client 461 + .post(format!( 462 + "{}/xrpc/com.atproto.repo.applyWrites", 463 + base_url().await 464 + )) 465 + .bearer_auth(&token) 466 + .json(&payload) 467 + .send() 468 + .await 469 + .expect("Failed to send request"); 470 + 471 + assert_eq!(res.status(), StatusCode::OK); 472 + let body: Value = res.json().await.expect("Response was not valid JSON"); 473 + assert!(body["commit"]["cid"].is_string()); 474 + assert!(body["results"].is_array()); 475 + let results = body["results"].as_array().unwrap(); 476 + assert_eq!(results.len(), 2); 477 + assert!(results[0]["uri"].is_string()); 478 + assert!(results[0]["cid"].is_string()); 479 + } 480 + 481 + #[tokio::test] 482 + async fn test_apply_writes_update() { 483 + let client = client(); 484 + let (token, did) = create_account_and_login(&client).await; 485 + let now = Utc::now().to_rfc3339(); 486 + let rkey = format!("batch_update_{}", Utc::now().timestamp_millis()); 487 + 488 + let create_payload = json!({ 489 + "repo": did, 407 490 "collection": "app.bsky.feed.post", 408 - "rkey": "some_post_to_delete" 491 + "rkey": rkey, 492 + "record": { 493 + "$type": "app.bsky.feed.post", 494 + "text": "Original post", 495 + "createdAt": now 496 + } 497 + }); 498 + let res = client 499 + .post(format!( 500 + "{}/xrpc/com.atproto.repo.putRecord", 501 + base_url().await 502 + )) 503 + .bearer_auth(&token) 504 + .json(&create_payload) 505 + .send() 506 + .await 507 + .expect("Failed to create"); 508 + assert_eq!(res.status(), StatusCode::OK); 509 + 510 + let update_payload = json!({ 511 + "repo": did, 512 + "writes": [ 513 + { 514 + "$type": "com.atproto.repo.applyWrites#update", 515 + "collection": "app.bsky.feed.post", 516 + "rkey": rkey, 517 + "value": { 518 + "$type": "app.bsky.feed.post", 519 + "text": "Updated post via applyWrites", 520 + "createdAt": now 521 + } 522 + } 523 + ] 524 + }); 525 + 526 + let res = client 527 + .post(format!( 528 + "{}/xrpc/com.atproto.repo.applyWrites", 529 + base_url().await 530 + )) 531 + .bearer_auth(&token) 532 + .json(&update_payload) 533 + .send() 534 + .await 535 + .expect("Failed to send request"); 536 + 537 + assert_eq!(res.status(), StatusCode::OK); 538 + let body: Value = res.json().await.expect("Response was not valid JSON"); 539 + let results = body["results"].as_array().unwrap(); 540 + assert_eq!(results.len(), 1); 541 + assert!(results[0]["uri"].is_string()); 542 + } 543 + 544 + #[tokio::test] 545 + async fn test_apply_writes_delete() { 546 + let client = client(); 547 + let (token, did) = create_account_and_login(&client).await; 548 + let now = Utc::now().to_rfc3339(); 549 + let rkey = format!("batch_delete_{}", Utc::now().timestamp_millis()); 550 + 551 + let create_payload = json!({ 552 + "repo": did, 553 + "collection": "app.bsky.feed.post", 554 + "rkey": rkey, 555 + "record": { 556 + "$type": "app.bsky.feed.post", 557 + "text": "Post to delete", 558 + "createdAt": now 559 + } 560 + }); 561 + let res = client 562 + .post(format!( 563 + "{}/xrpc/com.atproto.repo.putRecord", 564 + base_url().await 565 + )) 566 + .bearer_auth(&token) 567 + .json(&create_payload) 568 + .send() 569 + .await 570 + .expect("Failed to create"); 571 + assert_eq!(res.status(), StatusCode::OK); 572 + 573 + let delete_payload = json!({ 574 + "repo": did, 575 + "writes": [ 576 + { 577 + "$type": "com.atproto.repo.applyWrites#delete", 578 + "collection": "app.bsky.feed.post", 579 + "rkey": rkey 580 + } 581 + ] 582 + }); 583 + 584 + let res = client 585 + .post(format!( 586 + "{}/xrpc/com.atproto.repo.applyWrites", 587 + base_url().await 588 + )) 589 + .bearer_auth(&token) 590 + .json(&delete_payload) 591 + .send() 592 + .await 593 + .expect("Failed to send request"); 594 + 595 + assert_eq!(res.status(), StatusCode::OK); 596 + 597 + let get_res = client 598 + .get(format!( 599 + "{}/xrpc/com.atproto.repo.getRecord", 600 + base_url().await 601 + )) 602 + .query(&[ 603 + ("repo", did.as_str()), 604 + ("collection", "app.bsky.feed.post"), 605 + ("rkey", rkey.as_str()), 606 + ]) 607 + .send() 608 + .await 609 + .expect("Failed to verify"); 610 + assert_eq!(get_res.status(), StatusCode::NOT_FOUND); 611 + } 612 + 613 + #[tokio::test] 614 + async fn test_apply_writes_mixed_operations() { 615 + let client = client(); 616 + let (token, did) = create_account_and_login(&client).await; 617 + let now = Utc::now().to_rfc3339(); 618 + let rkey_to_delete = format!("mixed_del_{}", Utc::now().timestamp_millis()); 619 + let rkey_to_update = format!("mixed_upd_{}", Utc::now().timestamp_millis()); 620 + 621 + let setup_payload = json!({ 622 + "repo": did, 623 + "writes": [ 624 + { 625 + "$type": "com.atproto.repo.applyWrites#create", 626 + "collection": "app.bsky.feed.post", 627 + "rkey": rkey_to_delete, 628 + "value": { 629 + "$type": "app.bsky.feed.post", 630 + "text": "To be deleted", 631 + "createdAt": now 632 + } 633 + }, 634 + { 635 + "$type": "com.atproto.repo.applyWrites#create", 636 + "collection": "app.bsky.feed.post", 637 + "rkey": rkey_to_update, 638 + "value": { 639 + "$type": "app.bsky.feed.post", 640 + "text": "To be updated", 641 + "createdAt": now 642 + } 643 + } 644 + ] 409 645 }); 410 646 let res = client 411 647 .post(format!( 412 - "{}/xrpc/com.atproto.repo.deleteRecord", 648 + "{}/xrpc/com.atproto.repo.applyWrites", 413 649 base_url().await 414 650 )) 415 - .bearer_auth(token) 416 - .json(&payload) 651 + .bearer_auth(&token) 652 + .json(&setup_payload) 653 + .send() 654 + .await 655 + .expect("Failed to setup"); 656 + assert_eq!(res.status(), StatusCode::OK); 657 + 658 + let mixed_payload = json!({ 659 + "repo": did, 660 + "writes": [ 661 + { 662 + "$type": "com.atproto.repo.applyWrites#create", 663 + "collection": "app.bsky.feed.post", 664 + "value": { 665 + "$type": "app.bsky.feed.post", 666 + "text": "New post", 667 + "createdAt": now 668 + } 669 + }, 670 + { 671 + "$type": "com.atproto.repo.applyWrites#update", 672 + "collection": "app.bsky.feed.post", 673 + "rkey": rkey_to_update, 674 + "value": { 675 + "$type": "app.bsky.feed.post", 676 + "text": "Updated text", 677 + "createdAt": now 678 + } 679 + }, 680 + { 681 + "$type": "com.atproto.repo.applyWrites#delete", 682 + "collection": "app.bsky.feed.post", 683 + "rkey": rkey_to_delete 684 + } 685 + ] 686 + }); 687 + 688 + let res = client 689 + .post(format!( 690 + "{}/xrpc/com.atproto.repo.applyWrites", 691 + base_url().await 692 + )) 693 + .bearer_auth(&token) 694 + .json(&mixed_payload) 417 695 .send() 418 696 .await 419 697 .expect("Failed to send request"); 420 698 421 699 assert_eq!(res.status(), StatusCode::OK); 700 + let body: Value = res.json().await.expect("Response was not valid JSON"); 701 + let results = body["results"].as_array().unwrap(); 702 + assert_eq!(results.len(), 3); 703 + } 704 + 705 + #[tokio::test] 706 + async fn test_apply_writes_no_auth() { 707 + let client = client(); 708 + 709 + let payload = json!({ 710 + "repo": "did:plc:test", 711 + "writes": [ 712 + { 713 + "$type": "com.atproto.repo.applyWrites#create", 714 + "collection": "app.bsky.feed.post", 715 + "value": { 716 + "$type": "app.bsky.feed.post", 717 + "text": "Test", 718 + "createdAt": "2025-01-01T00:00:00Z" 719 + } 720 + } 721 + ] 722 + }); 723 + 724 + let res = client 725 + .post(format!( 726 + "{}/xrpc/com.atproto.repo.applyWrites", 727 + base_url().await 728 + )) 729 + .json(&payload) 730 + .send() 731 + .await 732 + .expect("Failed to send request"); 733 + 734 + assert_eq!(res.status(), StatusCode::UNAUTHORIZED); 735 + } 736 + 737 + #[tokio::test] 738 + async fn test_apply_writes_empty_writes() { 739 + let client = client(); 740 + let (token, did) = create_account_and_login(&client).await; 741 + 742 + let payload = json!({ 743 + "repo": did, 744 + "writes": [] 745 + }); 746 + 747 + let res = client 748 + .post(format!( 749 + "{}/xrpc/com.atproto.repo.applyWrites", 750 + base_url().await 751 + )) 752 + .bearer_auth(&token) 753 + .json(&payload) 754 + .send() 755 + .await 756 + .expect("Failed to send request"); 757 + 758 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 422 759 }
+101
tests/server.rs
··· 216 216 217 217 assert_eq!(res.status(), StatusCode::UNAUTHORIZED); 218 218 } 219 + 220 + #[tokio::test] 221 + async fn test_get_service_auth_success() { 222 + let client = client(); 223 + let (access_jwt, did) = create_account_and_login(&client).await; 224 + 225 + let params = [("aud", "did:web:example.com")]; 226 + let res = client 227 + .get(format!( 228 + "{}/xrpc/com.atproto.server.getServiceAuth", 229 + base_url().await 230 + )) 231 + .bearer_auth(&access_jwt) 232 + .query(&params) 233 + .send() 234 + .await 235 + .expect("Failed to send request"); 236 + 237 + assert_eq!(res.status(), StatusCode::OK); 238 + let body: Value = res.json().await.expect("Response was not valid JSON"); 239 + assert!(body["token"].is_string()); 240 + 241 + let token = body["token"].as_str().unwrap(); 242 + let parts: Vec<&str> = token.split('.').collect(); 243 + assert_eq!(parts.len(), 3, "Token should be a valid JWT"); 244 + 245 + use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD}; 246 + let payload_bytes = URL_SAFE_NO_PAD.decode(parts[1]).expect("payload b64"); 247 + let claims: Value = serde_json::from_slice(&payload_bytes).expect("payload json"); 248 + 249 + assert_eq!(claims["iss"], did); 250 + assert_eq!(claims["sub"], did); 251 + assert_eq!(claims["aud"], "did:web:example.com"); 252 + } 253 + 254 + #[tokio::test] 255 + async fn test_get_service_auth_with_lxm() { 256 + let client = client(); 257 + let (access_jwt, did) = create_account_and_login(&client).await; 258 + 259 + let params = [("aud", "did:web:example.com"), ("lxm", "com.atproto.repo.getRecord")]; 260 + let res = client 261 + .get(format!( 262 + "{}/xrpc/com.atproto.server.getServiceAuth", 263 + base_url().await 264 + )) 265 + .bearer_auth(&access_jwt) 266 + .query(&params) 267 + .send() 268 + .await 269 + .expect("Failed to send request"); 270 + 271 + assert_eq!(res.status(), StatusCode::OK); 272 + let body: Value = res.json().await.expect("Response was not valid JSON"); 273 + 274 + use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD}; 275 + let token = body["token"].as_str().unwrap(); 276 + let parts: Vec<&str> = token.split('.').collect(); 277 + let payload_bytes = URL_SAFE_NO_PAD.decode(parts[1]).expect("payload b64"); 278 + let claims: Value = serde_json::from_slice(&payload_bytes).expect("payload json"); 279 + 280 + assert_eq!(claims["iss"], did); 281 + assert_eq!(claims["lxm"], "com.atproto.repo.getRecord"); 282 + } 283 + 284 + #[tokio::test] 285 + async fn test_get_service_auth_no_auth() { 286 + let client = client(); 287 + let params = [("aud", "did:web:example.com")]; 288 + let res = client 289 + .get(format!( 290 + "{}/xrpc/com.atproto.server.getServiceAuth", 291 + base_url().await 292 + )) 293 + .query(&params) 294 + .send() 295 + .await 296 + .expect("Failed to send request"); 297 + 298 + assert_eq!(res.status(), StatusCode::UNAUTHORIZED); 299 + let body: Value = res.json().await.expect("Response was not valid JSON"); 300 + assert_eq!(body["error"], "AuthenticationRequired"); 301 + } 302 + 303 + #[tokio::test] 304 + async fn test_get_service_auth_missing_aud() { 305 + let client = client(); 306 + let (access_jwt, _) = create_account_and_login(&client).await; 307 + 308 + let res = client 309 + .get(format!( 310 + "{}/xrpc/com.atproto.server.getServiceAuth", 311 + base_url().await 312 + )) 313 + .bearer_auth(&access_jwt) 314 + .send() 315 + .await 316 + .expect("Failed to send request"); 317 + 318 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 319 + }
+122 -11
tests/sync.rs
··· 1 1 mod common; 2 2 use common::*; 3 3 use reqwest::StatusCode; 4 + use serde_json::Value; 4 5 5 6 #[tokio::test] 6 - #[ignore] 7 - async fn test_get_repo() { 7 + async fn test_get_latest_commit_success() { 8 + let client = client(); 9 + let (_, did) = create_account_and_login(&client).await; 10 + 11 + let params = [("did", did.as_str())]; 12 + let res = client 13 + .get(format!( 14 + "{}/xrpc/com.atproto.sync.getLatestCommit", 15 + base_url().await 16 + )) 17 + .query(&params) 18 + .send() 19 + .await 20 + .expect("Failed to send request"); 21 + 22 + assert_eq!(res.status(), StatusCode::OK); 23 + let body: Value = res.json().await.expect("Response was not valid JSON"); 24 + assert!(body["cid"].is_string()); 25 + assert!(body["rev"].is_string()); 26 + } 27 + 28 + #[tokio::test] 29 + async fn test_get_latest_commit_not_found() { 30 + let client = client(); 31 + let params = [("did", "did:plc:nonexistent12345")]; 32 + let res = client 33 + .get(format!( 34 + "{}/xrpc/com.atproto.sync.getLatestCommit", 35 + base_url().await 36 + )) 37 + .query(&params) 38 + .send() 39 + .await 40 + .expect("Failed to send request"); 41 + 42 + assert_eq!(res.status(), StatusCode::NOT_FOUND); 43 + let body: Value = res.json().await.expect("Response was not valid JSON"); 44 + assert_eq!(body["error"], "RepoNotFound"); 45 + } 46 + 47 + #[tokio::test] 48 + async fn test_get_latest_commit_missing_param() { 49 + let client = client(); 50 + let res = client 51 + .get(format!( 52 + "{}/xrpc/com.atproto.sync.getLatestCommit", 53 + base_url().await 54 + )) 55 + .send() 56 + .await 57 + .expect("Failed to send request"); 58 + 59 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 60 + } 61 + 62 + #[tokio::test] 63 + async fn test_list_repos() { 64 + let client = client(); 65 + let _ = create_account_and_login(&client).await; 66 + 67 + let res = client 68 + .get(format!( 69 + "{}/xrpc/com.atproto.sync.listRepos", 70 + base_url().await 71 + )) 72 + .send() 73 + .await 74 + .expect("Failed to send request"); 75 + 76 + assert_eq!(res.status(), StatusCode::OK); 77 + let body: Value = res.json().await.expect("Response was not valid JSON"); 78 + assert!(body["repos"].is_array()); 79 + let repos = body["repos"].as_array().unwrap(); 80 + assert!(!repos.is_empty()); 81 + 82 + let repo = &repos[0]; 83 + assert!(repo["did"].is_string()); 84 + assert!(repo["head"].is_string()); 85 + assert!(repo["active"].is_boolean()); 86 + } 87 + 88 + #[tokio::test] 89 + async fn test_list_repos_with_limit() { 8 90 let client = client(); 9 - let params = [("did", AUTH_DID)]; 91 + let _ = create_account_and_login(&client).await; 92 + let _ = create_account_and_login(&client).await; 93 + let _ = create_account_and_login(&client).await; 94 + 95 + let params = [("limit", "2")]; 10 96 let res = client 11 97 .get(format!( 12 - "{}/xrpc/com.atproto.sync.getRepo", 98 + "{}/xrpc/com.atproto.sync.listRepos", 13 99 base_url().await 14 100 )) 15 101 .query(&params) ··· 18 104 .expect("Failed to send request"); 19 105 20 106 assert_eq!(res.status(), StatusCode::OK); 107 + let body: Value = res.json().await.expect("Response was not valid JSON"); 108 + let repos = body["repos"].as_array().unwrap(); 109 + assert!(repos.len() <= 2); 21 110 } 22 111 23 112 #[tokio::test] 24 - #[ignore] 25 - async fn test_get_blocks() { 113 + async fn test_list_repos_pagination() { 26 114 let client = client(); 27 - let params = [ 28 - ("did", AUTH_DID), 29 - // "cids" would be a list of CIDs 30 - ]; 115 + let _ = create_account_and_login(&client).await; 116 + let _ = create_account_and_login(&client).await; 117 + let _ = create_account_and_login(&client).await; 118 + 119 + let params = [("limit", "1")]; 31 120 let res = client 32 121 .get(format!( 33 - "{}/xrpc/com.atproto.sync.getBlocks", 122 + "{}/xrpc/com.atproto.sync.listRepos", 34 123 base_url().await 35 124 )) 36 125 .query(&params) ··· 39 128 .expect("Failed to send request"); 40 129 41 130 assert_eq!(res.status(), StatusCode::OK); 131 + let body: Value = res.json().await.expect("Response was not valid JSON"); 132 + let repos = body["repos"].as_array().unwrap(); 133 + assert_eq!(repos.len(), 1); 134 + 135 + if let Some(cursor) = body["cursor"].as_str() { 136 + let params = [("limit", "1"), ("cursor", cursor)]; 137 + let res = client 138 + .get(format!( 139 + "{}/xrpc/com.atproto.sync.listRepos", 140 + base_url().await 141 + )) 142 + .query(&params) 143 + .send() 144 + .await 145 + .expect("Failed to send request"); 146 + 147 + assert_eq!(res.status(), StatusCode::OK); 148 + let body: Value = res.json().await.expect("Response was not valid JSON"); 149 + let repos2 = body["repos"].as_array().unwrap(); 150 + assert_eq!(repos2.len(), 1); 151 + assert_ne!(repos[0]["did"], repos2[0]["did"]); 152 + } 42 153 }