this repo has no description

Sync conformance fixes vs ref

lewis 4e58c5b0 fc98610d

-29
.sqlx/query-0f8fd9cbb1ff0fd8951ce082a82cc058ec6db0dde3ab0059d6f340a1fd9ddade.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "\n SELECT u.did, r.repo_root_cid\n FROM repos r\n JOIN users u ON r.user_id = u.id\n WHERE u.did > $1\n ORDER BY u.did ASC\n LIMIT $2\n ", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "did", 9 - "type_info": "Text" 10 - }, 11 - { 12 - "ordinal": 1, 13 - "name": "repo_root_cid", 14 - "type_info": "Text" 15 - } 16 - ], 17 - "parameters": { 18 - "Left": [ 19 - "Text", 20 - "Int8" 21 - ] 22 - }, 23 - "nullable": [ 24 - false, 25 - false 26 - ] 27 - }, 28 - "hash": "0f8fd9cbb1ff0fd8951ce082a82cc058ec6db0dde3ab0059d6f340a1fd9ddade" 29 - }
-25
.sqlx/query-0fdf13907693d130babae38f4bb1df772dc11ab682f47918cacb5ae186b4eb24.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "\n SELECT cid FROM blobs\n WHERE created_by_user = $1 AND cid > $2 AND created_at > $3\n ORDER BY cid ASC\n LIMIT $4\n ", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "cid", 9 - "type_info": "Text" 10 - } 11 - ], 12 - "parameters": { 13 - "Left": [ 14 - "Uuid", 15 - "Text", 16 - "Timestamptz", 17 - "Int8" 18 - ] 19 - }, 20 - "nullable": [ 21 - false 22 - ] 23 - }, 24 - "hash": "0fdf13907693d130babae38f4bb1df772dc11ab682f47918cacb5ae186b4eb24" 25 - }
-28
.sqlx/query-1d3748694f23a407e26c793cc43e91c4fa9753dc7c7fd964f6c43de27c5bac4a.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "\n SELECT u.did, r.repo_root_cid\n FROM users u\n LEFT JOIN repos r ON u.id = r.user_id\n WHERE u.did = $1\n ", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "did", 9 - "type_info": "Text" 10 - }, 11 - { 12 - "ordinal": 1, 13 - "name": "repo_root_cid", 14 - "type_info": "Text" 15 - } 16 - ], 17 - "parameters": { 18 - "Left": [ 19 - "Text" 20 - ] 21 - }, 22 - "nullable": [ 23 - false, 24 - false 25 - ] 26 - }, 27 - "hash": "1d3748694f23a407e26c793cc43e91c4fa9753dc7c7fd964f6c43de27c5bac4a" 28 - }
+23
.sqlx/query-485cd286a085cca2910e3c3de757b66211b8eb7ec5dadcfda79485520f792c16.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n SELECT DISTINCT unnest(blobs) as \"cid!\"\n FROM repo_seq\n WHERE did = $1 AND rev > $2 AND blobs IS NOT NULL\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "cid!", 9 + "type_info": "Text" 10 + } 11 + ], 12 + "parameters": { 13 + "Left": [ 14 + "Text", 15 + "Text" 16 + ] 17 + }, 18 + "nullable": [ 19 + null 20 + ] 21 + }, 22 + "hash": "485cd286a085cca2910e3c3de757b66211b8eb7ec5dadcfda79485520f792c16" 23 + }
+100
.sqlx/query-6783bd8e36444e5d6cc25cc1a120618a541ff9eafa457943ca814bd2a3ca72e1.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status, rev\n FROM repo_seq\n WHERE seq > $1\n ORDER BY seq ASC\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "seq", 9 + "type_info": "Int8" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "did", 14 + "type_info": "Text" 15 + }, 16 + { 17 + "ordinal": 2, 18 + "name": "created_at", 19 + "type_info": "Timestamptz" 20 + }, 21 + { 22 + "ordinal": 3, 23 + "name": "event_type", 24 + "type_info": "Text" 25 + }, 26 + { 27 + "ordinal": 4, 28 + "name": "commit_cid", 29 + "type_info": "Text" 30 + }, 31 + { 32 + "ordinal": 5, 33 + "name": "prev_cid", 34 + "type_info": "Text" 35 + }, 36 + { 37 + "ordinal": 6, 38 + "name": "prev_data_cid", 39 + "type_info": "Text" 40 + }, 41 + { 42 + "ordinal": 7, 43 + "name": "ops", 44 + "type_info": "Jsonb" 45 + }, 46 + { 47 + "ordinal": 8, 48 + "name": "blobs", 49 + "type_info": "TextArray" 50 + }, 51 + { 52 + "ordinal": 9, 53 + "name": "blocks_cids", 54 + "type_info": "TextArray" 55 + }, 56 + { 57 + "ordinal": 10, 58 + "name": "handle", 59 + "type_info": "Text" 60 + }, 61 + { 62 + "ordinal": 11, 63 + "name": "active", 64 + "type_info": "Bool" 65 + }, 66 + { 67 + "ordinal": 12, 68 + "name": "status", 69 + "type_info": "Text" 70 + }, 71 + { 72 + "ordinal": 13, 73 + "name": "rev", 74 + "type_info": "Text" 75 + } 76 + ], 77 + "parameters": { 78 + "Left": [ 79 + "Int8" 80 + ] 81 + }, 82 + "nullable": [ 83 + false, 84 + false, 85 + false, 86 + false, 87 + true, 88 + true, 89 + true, 90 + true, 91 + true, 92 + true, 93 + true, 94 + true, 95 + true, 96 + true 97 + ] 98 + }, 99 + "hash": "6783bd8e36444e5d6cc25cc1a120618a541ff9eafa457943ca814bd2a3ca72e1" 100 + }
+46
.sqlx/query-93678a24667d311aaec7c6277aae3764e5761870aa8c10c50f1e3c8e7fdb87d4.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n SELECT u.id, u.did, u.deactivated_at, u.takedown_ref, r.repo_root_cid\n FROM users u\n LEFT JOIN repos r ON r.user_id = u.id\n WHERE u.did = $1\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "id", 9 + "type_info": "Uuid" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "did", 14 + "type_info": "Text" 15 + }, 16 + { 17 + "ordinal": 2, 18 + "name": "deactivated_at", 19 + "type_info": "Timestamptz" 20 + }, 21 + { 22 + "ordinal": 3, 23 + "name": "takedown_ref", 24 + "type_info": "Text" 25 + }, 26 + { 27 + "ordinal": 4, 28 + "name": "repo_root_cid", 29 + "type_info": "Text" 30 + } 31 + ], 32 + "parameters": { 33 + "Left": [ 34 + "Text" 35 + ] 36 + }, 37 + "nullable": [ 38 + false, 39 + false, 40 + true, 41 + true, 42 + false 43 + ] 44 + }, 45 + "hash": "93678a24667d311aaec7c6277aae3764e5761870aa8c10c50f1e3c8e7fdb87d4" 46 + }
+29
.sqlx/query-9eeebac027c05ac44afa9f6b163762277849b75b647b7bf2ce5104baca795bf6.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n SELECT blocks_cids, commit_cid\n FROM repo_seq\n WHERE did = $1 AND rev > $2\n ORDER BY seq DESC\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "blocks_cids", 9 + "type_info": "TextArray" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "commit_cid", 14 + "type_info": "Text" 15 + } 16 + ], 17 + "parameters": { 18 + "Left": [ 19 + "Text", 20 + "Text" 21 + ] 22 + }, 23 + "nullable": [ 24 + true, 25 + true 26 + ] 27 + }, 28 + "hash": "9eeebac027c05ac44afa9f6b163762277849b75b647b7bf2ce5104baca795bf6" 29 + }
+20
.sqlx/query-a805ece8ccc38c88a6dbca22dd70a6a330f0cc8d72952d5b72a8766199fbf598.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "SELECT MAX(seq) FROM repo_seq", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "max", 9 + "type_info": "Int8" 10 + } 11 + ], 12 + "parameters": { 13 + "Left": [] 14 + }, 15 + "nullable": [ 16 + null 17 + ] 18 + }, 19 + "hash": "a805ece8ccc38c88a6dbca22dd70a6a330f0cc8d72952d5b72a8766199fbf598" 20 + }
+100
.sqlx/query-abed6772d0cb2924c0aa27d479c866bd099105461ffa126dcbe97ce9089a8b5d.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status, rev\n FROM repo_seq\n WHERE seq > $1\n ORDER BY seq ASC\n LIMIT 1\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "seq", 9 + "type_info": "Int8" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "did", 14 + "type_info": "Text" 15 + }, 16 + { 17 + "ordinal": 2, 18 + "name": "created_at", 19 + "type_info": "Timestamptz" 20 + }, 21 + { 22 + "ordinal": 3, 23 + "name": "event_type", 24 + "type_info": "Text" 25 + }, 26 + { 27 + "ordinal": 4, 28 + "name": "commit_cid", 29 + "type_info": "Text" 30 + }, 31 + { 32 + "ordinal": 5, 33 + "name": "prev_cid", 34 + "type_info": "Text" 35 + }, 36 + { 37 + "ordinal": 6, 38 + "name": "prev_data_cid", 39 + "type_info": "Text" 40 + }, 41 + { 42 + "ordinal": 7, 43 + "name": "ops", 44 + "type_info": "Jsonb" 45 + }, 46 + { 47 + "ordinal": 8, 48 + "name": "blobs", 49 + "type_info": "TextArray" 50 + }, 51 + { 52 + "ordinal": 9, 53 + "name": "blocks_cids", 54 + "type_info": "TextArray" 55 + }, 56 + { 57 + "ordinal": 10, 58 + "name": "handle", 59 + "type_info": "Text" 60 + }, 61 + { 62 + "ordinal": 11, 63 + "name": "active", 64 + "type_info": "Bool" 65 + }, 66 + { 67 + "ordinal": 12, 68 + "name": "status", 69 + "type_info": "Text" 70 + }, 71 + { 72 + "ordinal": 13, 73 + "name": "rev", 74 + "type_info": "Text" 75 + } 76 + ], 77 + "parameters": { 78 + "Left": [ 79 + "Int8" 80 + ] 81 + }, 82 + "nullable": [ 83 + false, 84 + false, 85 + false, 86 + false, 87 + true, 88 + true, 89 + true, 90 + true, 91 + true, 92 + true, 93 + true, 94 + true, 95 + true, 96 + true 97 + ] 98 + }, 99 + "hash": "abed6772d0cb2924c0aa27d479c866bd099105461ffa126dcbe97ce9089a8b5d" 100 + }
+22
.sqlx/query-b43902272f2710b849840b29f2e4c7c9959116bbdf4bed09939dfd82749ccb5f.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "SELECT MIN(seq) FROM repo_seq WHERE created_at >= $1", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "min", 9 + "type_info": "Int8" 10 + } 11 + ], 12 + "parameters": { 13 + "Left": [ 14 + "Timestamptz" 15 + ] 16 + }, 17 + "nullable": [ 18 + null 19 + ] 20 + }, 21 + "hash": "b43902272f2710b849840b29f2e4c7c9959116bbdf4bed09939dfd82749ccb5f" 22 + }
+34
.sqlx/query-dd1b61d6ec81fd891d4effd3b51e6c22308b878acdc5355dfcb04c5664c9463b.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "SELECT storage_key, mime_type, size_bytes FROM blobs WHERE cid = $1", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "storage_key", 9 + "type_info": "Text" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "mime_type", 14 + "type_info": "Text" 15 + }, 16 + { 17 + "ordinal": 2, 18 + "name": "size_bytes", 19 + "type_info": "Int8" 20 + } 21 + ], 22 + "parameters": { 23 + "Left": [ 24 + "Text" 25 + ] 26 + }, 27 + "nullable": [ 28 + false, 29 + false, 30 + false 31 + ] 32 + }, 33 + "hash": "dd1b61d6ec81fd891d4effd3b51e6c22308b878acdc5355dfcb04c5664c9463b" 34 + }
+47
.sqlx/query-f6723557ad451b8f4349df8ad4ec35f7abc8590262156bfd4cdc4dbf11ddf8c9.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n SELECT u.did, u.deactivated_at, u.takedown_ref, r.repo_root_cid, r.repo_rev\n FROM repos r\n JOIN users u ON r.user_id = u.id\n WHERE u.did > $1\n ORDER BY u.did ASC\n LIMIT $2\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "did", 9 + "type_info": "Text" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "deactivated_at", 14 + "type_info": "Timestamptz" 15 + }, 16 + { 17 + "ordinal": 2, 18 + "name": "takedown_ref", 19 + "type_info": "Text" 20 + }, 21 + { 22 + "ordinal": 3, 23 + "name": "repo_root_cid", 24 + "type_info": "Text" 25 + }, 26 + { 27 + "ordinal": 4, 28 + "name": "repo_rev", 29 + "type_info": "Text" 30 + } 31 + ], 32 + "parameters": { 33 + "Left": [ 34 + "Text", 35 + "Int8" 36 + ] 37 + }, 38 + "nullable": [ 39 + false, 40 + true, 41 + true, 42 + false, 43 + true 44 + ] 45 + }, 46 + "hash": "f6723557ad451b8f4349df8ad4ec35f7abc8590262156bfd4cdc4dbf11ddf8c9" 47 + }
+43 -16
justfile
··· 1 1 default: 2 2 @just --list 3 + 3 4 run: 4 5 cargo run 5 6 run-release: ··· 17 18 fmt-check: 18 19 cargo fmt -- --check 19 20 lint: fmt-check clippy 20 - # Run tests (auto-starts and auto-cleans containers) 21 + 22 + test-all *args: 23 + ./scripts/run-tests.sh {{args}} 24 + 25 + test-auth: 26 + ./scripts/run-tests.sh --test oauth --test oauth_lifecycle --test oauth_scopes --test oauth_security --test oauth_client_metadata --test jwt_security --test session_management --test change_password --test password_reset 27 + 28 + test-admin: 29 + ./scripts/run-tests.sh --test admin_email --test admin_invite --test admin_moderation --test admin_search --test admin_stats 30 + 31 + test-sync: 32 + ./scripts/run-tests.sh --test sync_repo --test sync_blob --test sync_conformance --test sync_deprecated --test firehose_validation 33 + 34 + test-repo: 35 + ./scripts/run-tests.sh --test repo_batch --test repo_blob --test record_validation --test lifecycle_record 36 + 37 + test-identity: 38 + ./scripts/run-tests.sh --test identity --test did_web --test plc_migration --test plc_operations --test plc_validation 39 + 40 + test-account: 41 + ./scripts/run-tests.sh --test lifecycle_session --test delete_account --test invite --test email_update --test account_notifications 42 + 43 + test-security: 44 + ./scripts/run-tests.sh --test security_fixes --test banned_words --test rate_limit --test moderation 45 + 46 + test-import: 47 + ./scripts/run-tests.sh --test import_verification --test import_with_verification 48 + 49 + test-misc: 50 + ./scripts/run-tests.sh --test actor --test commit_signing --test image_processing --test lifecycle_social --test notifications --test server --test signing_key --test verify_live_commit 51 + 21 52 test *args: 22 53 ./scripts/run-tests.sh {{args}} 23 - # Run a specific test file 24 - test-file file: 25 - ./scripts/run-tests.sh --test {{file}} 26 - # Run tests with testcontainers (slower, no shared infra) 27 - test-standalone: 28 - 
TRANQUIL_PDS_ALLOW_INSECURE_SECRETS=1 cargo test 29 - # Manually manage test infrastructure (for debugging) 30 - test-infra-start: 54 + 55 + test-one name: 56 + ./scripts/run-tests.sh --test {{name}} 57 + 58 + infra-start: 31 59 ./scripts/test-infra.sh start 32 - test-infra-stop: 60 + infra-stop: 33 61 ./scripts/test-infra.sh stop 34 - test-infra-status: 62 + infra-status: 35 63 ./scripts/test-infra.sh status 64 + 36 65 clean: 37 66 cargo clean 38 67 doc: ··· 53 82 podman compose logs -f 54 83 podman-build: 55 84 podman compose build 56 - # Frontend commands (Deno) 85 + 57 86 frontend-dev: 58 87 . ~/.deno/env && cd frontend && deno task dev 59 88 frontend-build: 60 89 . ~/.deno/env && cd frontend && deno task build 61 90 frontend-clean: 62 91 rm -rf frontend/dist frontend/node_modules 63 - # Frontend tests 92 + 64 93 frontend-test *args: 65 94 . ~/.deno/env && cd frontend && VITEST=true deno task test:run {{args}} 66 95 frontend-test-watch: ··· 69 98 . ~/.deno/env && cd frontend && VITEST=true deno task test:ui 70 99 frontend-test-coverage: 71 100 . ~/.deno/env && cd frontend && VITEST=true deno task test:run --coverage 72 - # Build all (frontend + backend) 101 + 73 102 build-all: frontend-build build 74 - # Test all (backend + frontend) 75 - test-all: test frontend-test
+1 -19
src/api/admin/account/info.rs
··· 88 88 } 89 89 } 90 90 91 - fn parse_repeated_param(query: Option<&str>, key: &str) -> Vec<String> { 92 - query 93 - .map(|q| { 94 - q.split('&') 95 - .filter_map(|pair| { 96 - let (k, v) = pair.split_once('=')?; 97 - 98 - if k == key { 99 - Some(urlencoding::decode(v).ok()?.into_owned()) 100 - } else { 101 - None 102 - } 103 - }) 104 - .collect() 105 - }) 106 - .unwrap_or_default() 107 - } 108 - 109 91 pub async fn get_account_infos( 110 92 State(state): State<AppState>, 111 93 _auth: BearerAuthAdmin, 112 94 RawQuery(raw_query): RawQuery, 113 95 ) -> Response { 114 - let dids = parse_repeated_param(raw_query.as_deref(), "dids"); 96 + let dids = crate::util::parse_repeated_query_param(raw_query.as_deref(), "dids"); 115 97 if dids.is_empty() { 116 98 return ( 117 99 StatusCode::BAD_REQUEST,
+3 -1
src/moderation/mod.rs
··· 107 107 use base64::Engine; 108 108 109 109 fn d(b64: &str) -> String { 110 - let bytes = base64::engine::general_purpose::STANDARD.decode(b64).unwrap(); 110 + let bytes = base64::engine::general_purpose::STANDARD 111 + .decode(b64) 112 + .unwrap(); 111 113 String::from_utf8(bytes).unwrap() 112 114 } 113 115
+33 -56
src/sync/blob.rs
··· 1 1 use crate::state::AppState; 2 + use crate::sync::util::assert_repo_availability; 2 3 use axum::{ 3 4 Json, 4 5 body::Body, ··· 37 38 ) 38 39 .into_response(); 39 40 } 40 - let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 41 - .fetch_optional(&state.db) 42 - .await; 43 - match user_exists { 44 - Ok(None) => { 45 - return ( 46 - StatusCode::NOT_FOUND, 47 - Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 48 - ) 49 - .into_response(); 50 - } 51 - Err(e) => { 52 - error!("DB error in get_blob: {:?}", e); 53 - return ( 54 - StatusCode::INTERNAL_SERVER_ERROR, 55 - Json(json!({"error": "InternalError"})), 56 - ) 57 - .into_response(); 58 - } 59 - Ok(Some(_)) => {} 60 - } 41 + 42 + let _account = match assert_repo_availability(&state.db, did, false).await { 43 + Ok(a) => a, 44 + Err(e) => return e.into_response(), 45 + }; 46 + 61 47 let blob_result = sqlx::query!( 62 - "SELECT storage_key, mime_type FROM blobs WHERE cid = $1", 48 + "SELECT storage_key, mime_type, size_bytes FROM blobs WHERE cid = $1", 63 49 cid 64 50 ) 65 51 .fetch_optional(&state.db) ··· 68 54 Ok(Some(row)) => { 69 55 let storage_key = &row.storage_key; 70 56 let mime_type = &row.mime_type; 57 + let size_bytes = row.size_bytes; 71 58 match state.blob_store.get(storage_key).await { 72 59 Ok(data) => Response::builder() 73 60 .status(StatusCode::OK) 74 61 .header(header::CONTENT_TYPE, mime_type) 62 + .header(header::CONTENT_LENGTH, size_bytes.to_string()) 63 + .header("x-content-type-options", "nosniff") 64 + .header("content-security-policy", "default-src 'none'; sandbox") 75 65 .body(Body::from(data)) 76 66 .unwrap(), 77 67 Err(e) => { ··· 127 117 ) 128 118 .into_response(); 129 119 } 120 + 121 + let account = match assert_repo_availability(&state.db, did, false).await { 122 + Ok(a) => a, 123 + Err(e) => return e.into_response(), 124 + }; 125 + 130 126 let limit = params.limit.unwrap_or(500).clamp(1, 1000); 131 127 let cursor_cid = 
params.cursor.as_deref().unwrap_or(""); 132 - let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 133 - .fetch_optional(&state.db) 134 - .await; 135 - let user_id = match user_result { 136 - Ok(Some(row)) => row.id, 137 - Ok(None) => { 138 - return ( 139 - StatusCode::NOT_FOUND, 140 - Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 141 - ) 142 - .into_response(); 143 - } 144 - Err(e) => { 145 - error!("DB error in list_blobs: {:?}", e); 146 - return ( 147 - StatusCode::INTERNAL_SERVER_ERROR, 148 - Json(json!({"error": "InternalError"})), 149 - ) 150 - .into_response(); 151 - } 152 - }; 128 + let user_id = account.user_id; 129 + 153 130 let cids_result: Result<Vec<String>, sqlx::Error> = if let Some(since) = &params.since { 154 - let since_time = chrono::DateTime::parse_from_rfc3339(since) 155 - .map(|dt| dt.with_timezone(&chrono::Utc)) 156 - .unwrap_or_else(|_| chrono::Utc::now()); 157 - sqlx::query!( 131 + sqlx::query_scalar!( 158 132 r#" 159 - SELECT cid FROM blobs 160 - WHERE created_by_user = $1 AND cid > $2 AND created_at > $3 161 - ORDER BY cid ASC 162 - LIMIT $4 133 + SELECT DISTINCT unnest(blobs) as "cid!" 134 + FROM repo_seq 135 + WHERE did = $1 AND rev > $2 AND blobs IS NOT NULL 163 136 "#, 164 - user_id, 165 - cursor_cid, 166 - since_time, 167 - limit + 1 137 + did, 138 + since 168 139 ) 169 140 .fetch_all(&state.db) 170 141 .await 171 - .map(|rows| rows.into_iter().map(|r| r.cid).collect()) 142 + .map(|mut cids| { 143 + cids.sort(); 144 + cids.into_iter() 145 + .filter(|c| c.as_str() > cursor_cid) 146 + .take((limit + 1) as usize) 147 + .collect() 148 + }) 172 149 } else { 173 150 sqlx::query!( 174 151 r#"
+12
src/sync/car.rs
··· 34 34 result.extend_from_slice(&header_cbor); 35 35 Ok(result) 36 36 } 37 + 38 + pub fn encode_car_header_null_root() -> Result<Vec<u8>, String> { 39 + let header = CarHeader::new_v1(vec![]); 40 + let header_cbor = header 41 + .encode() 42 + .map_err(|e| format!("Failed to encode CAR header: {:?}", e))?; 43 + let mut result = Vec::new(); 44 + write_varint(&mut result, header_cbor.len() as u64) 45 + .expect("Writing to Vec<u8> should never fail"); 46 + result.extend_from_slice(&header_cbor); 47 + Ok(result) 48 + }
+97 -68
src/sync/commit.rs
··· 1 1 use crate::state::AppState; 2 + use crate::sync::util::{AccountStatus, assert_repo_availability, get_account_with_status}; 2 3 use axum::{ 3 4 Json, 4 5 extract::{Query, State}, ··· 43 44 ) 44 45 .into_response(); 45 46 } 46 - let result = sqlx::query!( 47 - r#" 48 - SELECT r.repo_root_cid 49 - FROM repos r 50 - JOIN users u ON r.user_id = u.id 51 - WHERE u.did = $1 52 - "#, 53 - did 54 - ) 55 - .fetch_optional(&state.db) 56 - .await; 57 - match result { 58 - Ok(Some(row)) => { 59 - let rev = get_rev_from_commit(&state, &row.repo_root_cid) 60 - .await 61 - .unwrap_or_else(|| chrono::Utc::now().timestamp_millis().to_string()); 62 - ( 63 - StatusCode::OK, 64 - Json(GetLatestCommitOutput { 65 - cid: row.repo_root_cid, 66 - rev, 67 - }), 47 + 48 + let account = match assert_repo_availability(&state.db, did, false).await { 49 + Ok(a) => a, 50 + Err(e) => return e.into_response(), 51 + }; 52 + 53 + let repo_root_cid = match account.repo_root_cid { 54 + Some(cid) => cid, 55 + None => { 56 + return ( 57 + StatusCode::BAD_REQUEST, 58 + Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})), 68 59 ) 69 - .into_response() 60 + .into_response(); 70 61 } 71 - Ok(None) => ( 72 - StatusCode::NOT_FOUND, 73 - Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 74 - ) 75 - .into_response(), 76 - Err(e) => { 77 - error!("DB error in get_latest_commit: {:?}", e); 78 - ( 62 + }; 63 + 64 + let rev = match get_rev_from_commit(&state, &repo_root_cid).await { 65 + Some(r) => r, 66 + None => { 67 + error!( 68 + "Failed to parse commit for DID {}: CID {}", 69 + did, repo_root_cid 70 + ); 71 + return ( 79 72 StatusCode::INTERNAL_SERVER_ERROR, 80 - Json(json!({"error": "InternalError"})), 73 + Json(json!({"error": "InternalError", "message": "Failed to read repo commit"})), 81 74 ) 82 - .into_response() 75 + .into_response(); 83 76 } 84 - } 77 + }; 78 + 79 + ( 80 + StatusCode::OK, 81 + Json(GetLatestCommitOutput { 82 + cid: 
repo_root_cid, 83 + rev, 84 + }), 85 + ) 86 + .into_response() 85 87 } 86 88 87 89 #[derive(Deserialize)] ··· 97 99 pub head: String, 98 100 pub rev: String, 99 101 pub active: bool, 102 + #[serde(skip_serializing_if = "Option::is_none")] 103 + pub status: Option<String>, 100 104 } 101 105 102 106 #[derive(Serialize)] ··· 114 118 let cursor_did = params.cursor.as_deref().unwrap_or(""); 115 119 let result = sqlx::query!( 116 120 r#" 117 - SELECT u.did, r.repo_root_cid 121 + SELECT u.did, u.deactivated_at, u.takedown_ref, r.repo_root_cid, r.repo_rev 118 122 FROM repos r 119 123 JOIN users u ON r.user_id = u.id 120 124 WHERE u.did > $1 ··· 131 135 let has_more = rows.len() as i64 > limit; 132 136 let mut repos: Vec<RepoInfo> = Vec::new(); 133 137 for row in rows.iter().take(limit as usize) { 134 - let rev = get_rev_from_commit(&state, &row.repo_root_cid) 135 - .await 136 - .unwrap_or_else(|| chrono::Utc::now().timestamp_millis().to_string()); 138 + let rev = match get_rev_from_commit(&state, &row.repo_root_cid).await { 139 + Some(r) => r, 140 + None => { 141 + if let Some(ref stored_rev) = row.repo_rev { 142 + stored_rev.clone() 143 + } else { 144 + tracing::warn!( 145 + "Failed to parse commit for DID {} in list_repos: CID {}", 146 + row.did, 147 + row.repo_root_cid 148 + ); 149 + continue; 150 + } 151 + } 152 + }; 153 + let status = if row.takedown_ref.is_some() { 154 + AccountStatus::Takendown 155 + } else if row.deactivated_at.is_some() { 156 + AccountStatus::Deactivated 157 + } else { 158 + AccountStatus::Active 159 + }; 137 160 repos.push(RepoInfo { 138 161 did: row.did.clone(), 139 162 head: row.repo_root_cid.clone(), 140 163 rev, 141 - active: true, 164 + active: status.is_active(), 165 + status: status.as_str().map(String::from), 142 166 }); 143 167 } 144 168 let next_cursor = if has_more { ··· 175 199 pub struct GetRepoStatusOutput { 176 200 pub did: String, 177 201 pub active: bool, 202 + #[serde(skip_serializing_if = "Option::is_none")] 203 + pub status: 
Option<String>, 204 + #[serde(skip_serializing_if = "Option::is_none")] 178 205 pub rev: Option<String>, 179 206 } 180 207 ··· 190 217 ) 191 218 .into_response(); 192 219 } 193 - let result = sqlx::query!( 194 - r#" 195 - SELECT u.did, r.repo_root_cid 196 - FROM users u 197 - LEFT JOIN repos r ON u.id = r.user_id 198 - WHERE u.did = $1 199 - "#, 200 - did 201 - ) 202 - .fetch_optional(&state.db) 203 - .await; 204 - match result { 205 - Ok(Some(row)) => { 206 - let rev = get_rev_from_commit(&state, &row.repo_root_cid).await; 207 - ( 208 - StatusCode::OK, 209 - Json(GetRepoStatusOutput { 210 - did: row.did, 211 - active: true, 212 - rev, 213 - }), 220 + 221 + let account = match get_account_with_status(&state.db, did).await { 222 + Ok(Some(a)) => a, 223 + Ok(None) => { 224 + return ( 225 + StatusCode::BAD_REQUEST, 226 + Json(json!({"error": "RepoNotFound", "message": format!("Could not find repo for DID: {}", did)})), 214 227 ) 215 228 .into_response() 216 229 } 217 - Ok(None) => ( 218 - StatusCode::NOT_FOUND, 219 - Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 220 - ) 221 - .into_response(), 222 230 Err(e) => { 223 231 error!("DB error in get_repo_status: {:?}", e); 224 - ( 232 + return ( 225 233 StatusCode::INTERNAL_SERVER_ERROR, 226 234 Json(json!({"error": "InternalError"})), 227 235 ) 228 - .into_response() 236 + .into_response(); 237 + } 238 + }; 239 + 240 + let rev = if account.status.is_active() { 241 + if let Some(ref cid) = account.repo_root_cid { 242 + get_rev_from_commit(&state, cid).await 243 + } else { 244 + None 229 245 } 230 - } 246 + } else { 247 + None 248 + }; 249 + 250 + ( 251 + StatusCode::OK, 252 + Json(GetRepoStatusOutput { 253 + did: account.did, 254 + active: account.status.is_active(), 255 + status: account.status.as_str().map(String::from), 256 + rev, 257 + }), 258 + ) 259 + .into_response() 231 260 }
+19
src/sync/frame.rs
··· 74 74 pub time: String, 75 75 } 76 76 77 + #[derive(Debug, Serialize, Deserialize)] 78 + pub struct InfoFrame { 79 + pub name: String, 80 + #[serde(skip_serializing_if = "Option::is_none")] 81 + pub message: Option<String>, 82 + } 83 + 84 + #[derive(Debug, Serialize, Deserialize)] 85 + pub struct ErrorFrameHeader { 86 + pub op: i64, 87 + } 88 + 89 + #[derive(Debug, Serialize, Deserialize)] 90 + pub struct ErrorFrameBody { 91 + pub error: String, 92 + #[serde(skip_serializing_if = "Option::is_none")] 93 + pub message: Option<String>, 94 + } 95 + 77 96 pub struct CommitFrameBuilder { 78 97 pub seq: i64, 79 98 pub did: String,
+4
src/sync/mod.rs
··· 18 18 pub use deprecated::{get_checkout, get_head}; 19 19 pub use repo::{get_blocks, get_record, get_repo}; 20 20 pub use subscribe_repos::subscribe_repos; 21 + pub use util::{ 22 + AccountStatus, RepoAccount, RepoAvailabilityError, assert_repo_availability, 23 + get_account_with_status, 24 + }; 21 25 pub use verify::{CarVerifier, VerifiedCar, VerifyError};
+210 -72
src/sync/repo.rs
··· 1 1 use crate::state::AppState; 2 2 use crate::sync::car::encode_car_header; 3 + use crate::sync::util::assert_repo_availability; 3 4 use axum::{ 4 5 Json, 5 - extract::{Query, State}, 6 + extract::{Query, RawQuery, State}, 6 7 http::StatusCode, 7 8 response::{IntoResponse, Response}, 8 9 }; ··· 17 18 18 19 const MAX_REPO_BLOCKS_TRAVERSAL: usize = 20_000; 19 20 20 - #[derive(Deserialize)] 21 - pub struct GetBlocksQuery { 22 - pub did: String, 23 - pub cids: String, 21 + fn parse_get_blocks_query(query_string: &str) -> Result<(String, Vec<String>), String> { 22 + let did = crate::util::parse_repeated_query_param(Some(query_string), "did") 23 + .into_iter() 24 + .next() 25 + .ok_or("Missing required parameter: did")?; 26 + let cids = crate::util::parse_repeated_query_param(Some(query_string), "cids"); 27 + Ok((did, cids)) 24 28 } 25 29 26 - pub async fn get_blocks( 27 - State(state): State<AppState>, 28 - Query(query): Query<GetBlocksQuery>, 29 - ) -> Response { 30 - let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did) 31 - .fetch_optional(&state.db) 32 - .await 33 - .unwrap_or(None); 34 - if user_exists.is_none() { 35 - return (StatusCode::NOT_FOUND, "Repo not found").into_response(); 36 - } 37 - let cids_str: Vec<&str> = query.cids.split(',').collect(); 30 + pub async fn get_blocks(State(state): State<AppState>, RawQuery(query): RawQuery) -> Response { 31 + let query_string = match query { 32 + Some(q) => q, 33 + None => { 34 + return ( 35 + StatusCode::BAD_REQUEST, 36 + Json(json!({"error": "InvalidRequest", "message": "Missing query parameters"})), 37 + ) 38 + .into_response(); 39 + } 40 + }; 41 + 42 + let (did, cid_strings) = match parse_get_blocks_query(&query_string) { 43 + Ok(parsed) => parsed, 44 + Err(msg) => { 45 + return ( 46 + StatusCode::BAD_REQUEST, 47 + Json(json!({"error": "InvalidRequest", "message": msg})), 48 + ) 49 + .into_response(); 50 + } 51 + }; 52 + 53 + let _account = match assert_repo_availability(&state.db, 
&did, false).await { 54 + Ok(a) => a, 55 + Err(e) => return e.into_response(), 56 + }; 57 + 38 58 let mut cids = Vec::new(); 39 - for s in cids_str { 59 + for s in &cid_strings { 40 60 match Cid::from_str(s) { 41 61 Ok(cid) => cids.push(cid), 42 - Err(_) => return (StatusCode::BAD_REQUEST, "Invalid CID").into_response(), 62 + Err(_) => return ( 63 + StatusCode::BAD_REQUEST, 64 + Json(json!({"error": "InvalidRequest", "message": format!("Invalid CID: {}", s)})), 65 + ) 66 + .into_response(), 43 67 } 44 68 } 69 + 70 + if cids.is_empty() { 71 + return ( 72 + StatusCode::BAD_REQUEST, 73 + Json(json!({"error": "InvalidRequest", "message": "No CIDs provided"})), 74 + ) 75 + .into_response(); 76 + } 77 + 45 78 let blocks_res = state.block_store.get_many(&cids).await; 46 79 let blocks = match blocks_res { 47 80 Ok(blocks) => blocks, 48 81 Err(e) => { 49 82 error!("Failed to get blocks: {}", e); 50 - return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get blocks").into_response(); 83 + return ( 84 + StatusCode::INTERNAL_SERVER_ERROR, 85 + Json(json!({"error": "InternalError", "message": "Failed to get blocks"})), 86 + ) 87 + .into_response(); 51 88 } 52 89 }; 53 - if cids.is_empty() { 54 - return (StatusCode::BAD_REQUEST, "No CIDs provided").into_response(); 90 + 91 + let mut missing_cids: Vec<String> = Vec::new(); 92 + for (i, block_opt) in blocks.iter().enumerate() { 93 + if block_opt.is_none() { 94 + missing_cids.push(cids[i].to_string()); 95 + } 55 96 } 56 - let root_cid = cids[0]; 57 - let header = match encode_car_header(&root_cid) { 97 + if !missing_cids.is_empty() { 98 + return ( 99 + StatusCode::BAD_REQUEST, 100 + Json(json!({ 101 + "error": "InvalidRequest", 102 + "message": format!("Could not find blocks: {}", missing_cids.join(", ")) 103 + })), 104 + ) 105 + .into_response(); 106 + } 107 + 108 + let header = match crate::sync::car::encode_car_header_null_root() { 58 109 Ok(h) => h, 59 110 Err(e) => { 60 111 error!("Failed to encode CAR header: {}", e); 61 - 
return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to encode CAR").into_response(); 112 + return ( 113 + StatusCode::INTERNAL_SERVER_ERROR, 114 + Json(json!({"error": "InternalError", "message": "Failed to encode CAR"})), 115 + ) 116 + .into_response(); 62 117 } 63 118 }; 64 119 let mut car_bytes = header; ··· 97 152 State(state): State<AppState>, 98 153 Query(query): Query<GetRepoQuery>, 99 154 ) -> Response { 100 - let repo_row = sqlx::query!( 101 - r#" 102 - SELECT r.repo_root_cid 103 - FROM repos r 104 - JOIN users u ON u.id = r.user_id 105 - WHERE u.did = $1 106 - "#, 107 - query.did 108 - ) 109 - .fetch_optional(&state.db) 110 - .await 111 - .unwrap_or(None); 112 - let head_str = match repo_row { 113 - Some(r) => r.repo_root_cid, 155 + let account = match assert_repo_availability(&state.db, &query.did, false).await { 156 + Ok(a) => a, 157 + Err(e) => return e.into_response(), 158 + }; 159 + 160 + let head_str = match account.repo_root_cid { 161 + Some(cid) => cid, 114 162 None => { 115 - let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did) 116 - .fetch_optional(&state.db) 117 - .await 118 - .unwrap_or(None); 119 - if user_exists.is_none() { 120 - return ( 121 - StatusCode::NOT_FOUND, 122 - Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 123 - ) 124 - .into_response(); 125 - } else { 126 - return ( 127 - StatusCode::NOT_FOUND, 128 - Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})), 129 - ) 130 - .into_response(); 131 - } 163 + return ( 164 + StatusCode::BAD_REQUEST, 165 + Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})), 166 + ) 167 + .into_response(); 132 168 } 133 169 }; 170 + 134 171 let head_cid = match Cid::from_str(&head_str) { 135 172 Ok(c) => c, 136 173 Err(_) => { ··· 141 178 .into_response(); 142 179 } 143 180 }; 181 + 182 + if let Some(since) = &query.since { 183 + return get_repo_since(&state, &query.did, &head_cid, since).await; 184 + } 185 + 144 186 
let mut car_bytes = match encode_car_header(&head_cid) { 145 187 Ok(h) => h, 146 188 Err(e) => { ··· 189 231 .into_response() 190 232 } 191 233 234 + async fn get_repo_since(state: &AppState, did: &str, head_cid: &Cid, since: &str) -> Response { 235 + let events = sqlx::query!( 236 + r#" 237 + SELECT blocks_cids, commit_cid 238 + FROM repo_seq 239 + WHERE did = $1 AND rev > $2 240 + ORDER BY seq DESC 241 + "#, 242 + did, 243 + since 244 + ) 245 + .fetch_all(&state.db) 246 + .await; 247 + 248 + let events = match events { 249 + Ok(e) => e, 250 + Err(e) => { 251 + error!("DB error in get_repo_since: {:?}", e); 252 + return ( 253 + StatusCode::INTERNAL_SERVER_ERROR, 254 + Json(json!({"error": "InternalError", "message": "Database error"})), 255 + ) 256 + .into_response(); 257 + } 258 + }; 259 + 260 + let mut block_cids: Vec<Cid> = Vec::new(); 261 + for event in &events { 262 + if let Some(cids) = &event.blocks_cids { 263 + for cid_str in cids { 264 + if let Ok(cid) = Cid::from_str(cid_str) 265 + && !block_cids.contains(&cid) 266 + { 267 + block_cids.push(cid); 268 + } 269 + } 270 + } 271 + if let Some(commit_cid_str) = &event.commit_cid 272 + && let Ok(cid) = Cid::from_str(commit_cid_str) 273 + && !block_cids.contains(&cid) 274 + { 275 + block_cids.push(cid); 276 + } 277 + } 278 + 279 + let mut car_bytes = match encode_car_header(head_cid) { 280 + Ok(h) => h, 281 + Err(e) => { 282 + return ( 283 + StatusCode::INTERNAL_SERVER_ERROR, 284 + Json(json!({"error": "InternalError", "message": format!("Failed to encode CAR header: {}", e)})), 285 + ) 286 + .into_response(); 287 + } 288 + }; 289 + 290 + if block_cids.is_empty() { 291 + return ( 292 + StatusCode::OK, 293 + [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 294 + car_bytes, 295 + ) 296 + .into_response(); 297 + } 298 + 299 + let blocks = match state.block_store.get_many(&block_cids).await { 300 + Ok(b) => b, 301 + Err(e) => { 302 + error!("Block store error in get_repo_since: {:?}", e); 303 + 
return ( 304 + StatusCode::INTERNAL_SERVER_ERROR, 305 + Json(json!({"error": "InternalError", "message": "Failed to get blocks"})), 306 + ) 307 + .into_response(); 308 + } 309 + }; 310 + 311 + for (i, block_opt) in blocks.into_iter().enumerate() { 312 + if let Some(block) = block_opt { 313 + let cid = block_cids[i]; 314 + let cid_bytes = cid.to_bytes(); 315 + let total_len = cid_bytes.len() + block.len(); 316 + let mut writer = Vec::new(); 317 + crate::sync::car::write_varint(&mut writer, total_len as u64) 318 + .expect("Writing to Vec<u8> should never fail"); 319 + writer 320 + .write_all(&cid_bytes) 321 + .expect("Writing to Vec<u8> should never fail"); 322 + writer 323 + .write_all(&block) 324 + .expect("Writing to Vec<u8> should never fail"); 325 + car_bytes.extend_from_slice(&writer); 326 + } 327 + } 328 + 329 + ( 330 + StatusCode::OK, 331 + [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 332 + car_bytes, 333 + ) 334 + .into_response() 335 + } 336 + 192 337 fn extract_links_ipld(value: &Ipld, stack: &mut Vec<Cid>) { 193 338 match value { 194 339 Ipld::Link(cid) => { ··· 224 369 use std::collections::BTreeMap; 225 370 use std::sync::Arc; 226 371 227 - let repo_row = sqlx::query!( 228 - r#" 229 - SELECT r.repo_root_cid 230 - FROM repos r 231 - JOIN users u ON u.id = r.user_id 232 - WHERE u.did = $1 233 - "#, 234 - query.did 235 - ) 236 - .fetch_optional(&state.db) 237 - .await 238 - .unwrap_or(None); 239 - let commit_cid_str = match repo_row { 240 - Some(r) => r.repo_root_cid, 372 + let account = match assert_repo_availability(&state.db, &query.did, false).await { 373 + Ok(a) => a, 374 + Err(e) => return e.into_response(), 375 + }; 376 + 377 + let commit_cid_str = match account.repo_root_cid { 378 + Some(cid) => cid, 241 379 None => { 242 380 return ( 243 - StatusCode::NOT_FOUND, 244 - Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 381 + StatusCode::BAD_REQUEST, 382 + Json(json!({"error": "RepoNotFound", "message": "Repo 
not initialized"})), 245 383 ) 246 384 .into_response(); 247 385 }
+120 -2
src/sync/subscribe_repos.rs
··· 1 1 use crate::state::AppState; 2 2 use crate::sync::firehose::SequencedEvent; 3 3 use crate::sync::util::{ 4 - format_event_for_sending, format_event_with_prefetched_blocks, prefetch_blocks_for_events, 4 + format_error_frame, format_event_for_sending, format_event_with_prefetched_blocks, 5 + format_info_frame, prefetch_blocks_for_events, 5 6 }; 6 7 use axum::{ 7 8 extract::{Query, State, ws::Message, ws::WebSocket, ws::WebSocketUpgrade}, ··· 55 56 info!(subscribers = count, "Firehose subscriber disconnected"); 56 57 } 57 58 59 + fn get_backfill_hours() -> i64 { 60 + std::env::var("FIREHOSE_BACKFILL_HOURS") 61 + .ok() 62 + .and_then(|v| v.parse().ok()) 63 + .unwrap_or(72) 64 + } 65 + 58 66 async fn handle_socket_inner( 59 67 socket: &mut WebSocket, 60 68 state: &AppState, 61 69 params: SubscribeReposParams, 62 70 ) -> Result<(), ()> { 71 + let mut rx = state.firehose_tx.subscribe(); 72 + let mut last_seen: i64 = -1; 73 + 63 74 if let Some(cursor) = params.cursor { 75 + let current_seq = sqlx::query_scalar!("SELECT MAX(seq) FROM repo_seq") 76 + .fetch_one(&state.db) 77 + .await 78 + .ok() 79 + .flatten() 80 + .unwrap_or(0); 81 + 82 + if cursor > current_seq { 83 + if let Ok(error_bytes) = 84 + format_error_frame("FutureCursor", Some("Cursor in the future.")) 85 + { 86 + let _ = socket.send(Message::Binary(error_bytes.into())).await; 87 + } 88 + socket.close().await.ok(); 89 + return Err(()); 90 + } 91 + 92 + let backfill_time = chrono::Utc::now() - chrono::Duration::hours(get_backfill_hours()); 93 + 94 + let first_event = sqlx::query_as!( 95 + SequencedEvent, 96 + r#" 97 + SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status, rev 98 + FROM repo_seq 99 + WHERE seq > $1 100 + ORDER BY seq ASC 101 + LIMIT 1 102 + "#, 103 + cursor 104 + ) 105 + .fetch_optional(&state.db) 106 + .await 107 + .ok() 108 + .flatten(); 109 + 64 110 let mut current_cursor = cursor; 111 + 112 + if let Some(ref event) = 
first_event 113 + && event.created_at < backfill_time 114 + { 115 + if let Ok(info_bytes) = format_info_frame( 116 + "OutdatedCursor", 117 + Some("Requested cursor exceeded limit. Possibly missing events"), 118 + ) { 119 + let _ = socket.send(Message::Binary(info_bytes.into())).await; 120 + } 121 + 122 + let earliest = sqlx::query_scalar!( 123 + "SELECT MIN(seq) FROM repo_seq WHERE created_at >= $1", 124 + backfill_time 125 + ) 126 + .fetch_one(&state.db) 127 + .await 128 + .ok() 129 + .flatten(); 130 + 131 + if let Some(earliest_seq) = earliest { 132 + current_cursor = earliest_seq - 1; 133 + } 134 + } 135 + 136 + last_seen = current_cursor; 137 + 65 138 loop { 66 139 let events = sqlx::query_as!( 67 140 SequencedEvent, ··· 93 166 }; 94 167 for event in events { 95 168 current_cursor = event.seq; 169 + last_seen = event.seq; 96 170 let bytes = 97 171 match format_event_with_prefetched_blocks(event, &prefetched).await { 98 172 Ok(b) => b, ··· 118 192 } 119 193 } 120 194 } 195 + 196 + let cutover_events = sqlx::query_as!( 197 + SequencedEvent, 198 + r#" 199 + SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status, rev 200 + FROM repo_seq 201 + WHERE seq > $1 202 + ORDER BY seq ASC 203 + "#, 204 + last_seen 205 + ) 206 + .fetch_all(&state.db) 207 + .await; 208 + 209 + if let Ok(events) = cutover_events 210 + && !events.is_empty() 211 + { 212 + let prefetched = match prefetch_blocks_for_events(state, &events).await { 213 + Ok(blocks) => blocks, 214 + Err(e) => { 215 + error!("Failed to prefetch blocks for cutover: {}", e); 216 + socket.close().await.ok(); 217 + return Err(()); 218 + } 219 + }; 220 + for event in events { 221 + last_seen = event.seq; 222 + let bytes = match format_event_with_prefetched_blocks(event, &prefetched).await { 223 + Ok(b) => b, 224 + Err(e) => { 225 + warn!("Failed to format cutover event: {}", e); 226 + return Err(()); 227 + } 228 + }; 229 + if let Err(e) = 
socket.send(Message::Binary(bytes.into())).await { 230 + warn!("Failed to send cutover event: {}", e); 231 + return Err(()); 232 + } 233 + crate::metrics::record_firehose_event(); 234 + } 235 + } 121 236 } 122 - let mut rx = state.firehose_tx.subscribe(); 123 237 let max_lag_before_disconnect: u64 = std::env::var("FIREHOSE_MAX_LAG") 124 238 .ok() 125 239 .and_then(|v| v.parse().ok()) ··· 129 243 result = rx.recv() => { 130 244 match result { 131 245 Ok(event) => { 246 + if event.seq <= last_seen { 247 + continue; 248 + } 249 + last_seen = event.seq; 132 250 if let Err(e) = send_event(socket, state, event).await { 133 251 warn!("Failed to send event: {}", e); 134 252 break;
+179 -1
src/sync/util.rs
··· 1 1 use crate::state::AppState; 2 2 use crate::sync::firehose::SequencedEvent; 3 - use crate::sync::frame::{AccountFrame, CommitFrame, FrameHeader, IdentityFrame, SyncFrame}; 3 + use crate::sync::frame::{ 4 + AccountFrame, CommitFrame, ErrorFrameBody, ErrorFrameHeader, FrameHeader, IdentityFrame, 5 + InfoFrame, SyncFrame, 6 + }; 7 + use axum::Json; 8 + use axum::http::StatusCode; 9 + use axum::response::{IntoResponse, Response}; 4 10 use bytes::Bytes; 5 11 use cid::Cid; 6 12 use iroh_car::{CarHeader, CarWriter}; 7 13 use jacquard_repo::commit::Commit; 8 14 use jacquard_repo::storage::BlockStore; 15 + use serde::Serialize; 16 + use serde_json::json; 17 + use sqlx::PgPool; 9 18 use std::collections::{BTreeMap, HashMap}; 10 19 use std::io::Cursor; 11 20 use std::str::FromStr; 12 21 use tokio::io::AsyncWriteExt; 22 + 23 + #[derive(Debug, Clone, PartialEq, Eq, Serialize)] 24 + #[serde(rename_all = "lowercase")] 25 + pub enum AccountStatus { 26 + Active, 27 + Takendown, 28 + Suspended, 29 + Deactivated, 30 + Deleted, 31 + } 32 + 33 + impl AccountStatus { 34 + pub fn as_str(&self) -> Option<&'static str> { 35 + match self { 36 + AccountStatus::Active => None, 37 + AccountStatus::Takendown => Some("takendown"), 38 + AccountStatus::Suspended => Some("suspended"), 39 + AccountStatus::Deactivated => Some("deactivated"), 40 + AccountStatus::Deleted => Some("deleted"), 41 + } 42 + } 43 + 44 + pub fn is_active(&self) -> bool { 45 + matches!(self, AccountStatus::Active) 46 + } 47 + } 48 + 49 + pub struct RepoAccount { 50 + pub did: String, 51 + pub user_id: uuid::Uuid, 52 + pub status: AccountStatus, 53 + pub repo_root_cid: Option<String>, 54 + } 55 + 56 + pub enum RepoAvailabilityError { 57 + NotFound(String), 58 + Takendown(String), 59 + Deactivated(String), 60 + Internal(String), 61 + } 62 + 63 + impl IntoResponse for RepoAvailabilityError { 64 + fn into_response(self) -> Response { 65 + match self { 66 + RepoAvailabilityError::NotFound(did) => ( 67 + 
StatusCode::BAD_REQUEST, 68 + Json(json!({ 69 + "error": "RepoNotFound", 70 + "message": format!("Could not find repo for DID: {}", did) 71 + })), 72 + ) 73 + .into_response(), 74 + RepoAvailabilityError::Takendown(did) => ( 75 + StatusCode::BAD_REQUEST, 76 + Json(json!({ 77 + "error": "RepoTakendown", 78 + "message": format!("Repo has been takendown: {}", did) 79 + })), 80 + ) 81 + .into_response(), 82 + RepoAvailabilityError::Deactivated(did) => ( 83 + StatusCode::BAD_REQUEST, 84 + Json(json!({ 85 + "error": "RepoDeactivated", 86 + "message": format!("Repo has been deactivated: {}", did) 87 + })), 88 + ) 89 + .into_response(), 90 + RepoAvailabilityError::Internal(msg) => ( 91 + StatusCode::INTERNAL_SERVER_ERROR, 92 + Json(json!({ 93 + "error": "InternalError", 94 + "message": msg 95 + })), 96 + ) 97 + .into_response(), 98 + } 99 + } 100 + } 101 + 102 + pub async fn get_account_with_status( 103 + db: &PgPool, 104 + did: &str, 105 + ) -> Result<Option<RepoAccount>, sqlx::Error> { 106 + let row = sqlx::query!( 107 + r#" 108 + SELECT u.id, u.did, u.deactivated_at, u.takedown_ref, r.repo_root_cid as "repo_root_cid?" 109 + FROM users u 110 + LEFT JOIN repos r ON r.user_id = u.id 111 + WHERE u.did = $1 112 + "#, 113 + did 114 + ) 115 + .fetch_optional(db) 116 + .await?; 117 + 118 + Ok(row.map(|r| { 119 + let status = if r.takedown_ref.is_some() { 120 + AccountStatus::Takendown 121 + } else if r.deactivated_at.is_some() { 122 + AccountStatus::Deactivated 123 + } else { 124 + AccountStatus::Active 125 + }; 126 + 127 + RepoAccount { 128 + did: r.did, 129 + user_id: r.id, 130 + status, 131 + repo_root_cid: r.repo_root_cid, 132 + } 133 + })) 134 + } 135 + 136 + pub async fn assert_repo_availability( 137 + db: &PgPool, 138 + did: &str, 139 + is_admin_or_self: bool, 140 + ) -> Result<RepoAccount, RepoAvailabilityError> { 141 + let account = get_account_with_status(db, did) 142 + .await 143 + .map_err(|e| RepoAvailabilityError::Internal(e.to_string()))?; 144 + 145 + let account = match 
account { 146 + Some(a) => a, 147 + None => return Err(RepoAvailabilityError::NotFound(did.to_string())), 148 + }; 149 + 150 + if is_admin_or_self { 151 + return Ok(account); 152 + } 153 + 154 + match account.status { 155 + AccountStatus::Takendown => return Err(RepoAvailabilityError::Takendown(did.to_string())), 156 + AccountStatus::Deactivated => { 157 + return Err(RepoAvailabilityError::Deactivated(did.to_string())); 158 + } 159 + _ => {} 160 + } 161 + 162 + Ok(account) 163 + } 13 164 14 165 fn extract_rev_from_commit_bytes(commit_bytes: &[u8]) -> Option<String> { 15 166 Commit::from_cbor(commit_bytes) ··· 351 502 serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 352 503 Ok(bytes) 353 504 } 505 + 506 + pub fn format_info_frame(name: &str, message: Option<&str>) -> Result<Vec<u8>, anyhow::Error> { 507 + let header = FrameHeader { 508 + op: 1, 509 + t: "#info".to_string(), 510 + }; 511 + let frame = InfoFrame { 512 + name: name.to_string(), 513 + message: message.map(String::from), 514 + }; 515 + let mut bytes = Vec::new(); 516 + serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 517 + serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 518 + Ok(bytes) 519 + } 520 + 521 + pub fn format_error_frame(error: &str, message: Option<&str>) -> Result<Vec<u8>, anyhow::Error> { 522 + let header = ErrorFrameHeader { op: -1 }; 523 + let frame = ErrorFrameBody { 524 + error: error.to_string(), 525 + message: message.map(String::from), 526 + }; 527 + let mut bytes = Vec::new(); 528 + serde_ipld_dagcbor::to_writer(&mut bytes, &header)?; 529 + serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 530 + Ok(bytes) 531 + }
+69
src/util.rs
··· 73 73 .ok_or(DbLookupError::NotFound) 74 74 } 75 75 76 + pub fn parse_repeated_query_param(query: Option<&str>, key: &str) -> Vec<String> { 77 + query 78 + .map(|q| { 79 + let mut values = Vec::new(); 80 + for pair in q.split('&') { 81 + if let Some((k, v)) = pair.split_once('=') 82 + && k == key 83 + && let Ok(decoded) = urlencoding::decode(v) 84 + { 85 + let decoded = decoded.into_owned(); 86 + if decoded.contains(',') { 87 + for part in decoded.split(',') { 88 + let trimmed = part.trim(); 89 + if !trimmed.is_empty() { 90 + values.push(trimmed.to_string()); 91 + } 92 + } 93 + } else if !decoded.is_empty() { 94 + values.push(decoded); 95 + } 96 + } 97 + } 98 + values 99 + }) 100 + .unwrap_or_default() 101 + } 102 + 76 103 pub fn extract_client_ip(headers: &HeaderMap) -> String { 77 104 if let Some(forwarded) = headers.get("x-forwarded-for") 78 105 && let Ok(value) = forwarded.to_str() ··· 91 118 #[cfg(test)] 92 119 mod tests { 93 120 use super::*; 121 + 122 + #[test] 123 + fn test_parse_repeated_query_param_repeated() { 124 + let query = "did=test&cids=a&cids=b&cids=c"; 125 + let result = parse_repeated_query_param(Some(query), "cids"); 126 + assert_eq!(result, vec!["a", "b", "c"]); 127 + } 128 + 129 + #[test] 130 + fn test_parse_repeated_query_param_comma_separated() { 131 + let query = "did=test&cids=a,b,c"; 132 + let result = parse_repeated_query_param(Some(query), "cids"); 133 + assert_eq!(result, vec!["a", "b", "c"]); 134 + } 135 + 136 + #[test] 137 + fn test_parse_repeated_query_param_mixed() { 138 + let query = "did=test&cids=a,b&cids=c"; 139 + let result = parse_repeated_query_param(Some(query), "cids"); 140 + assert_eq!(result, vec!["a", "b", "c"]); 141 + } 142 + 143 + #[test] 144 + fn test_parse_repeated_query_param_single() { 145 + let query = "did=test&cids=a"; 146 + let result = parse_repeated_query_param(Some(query), "cids"); 147 + assert_eq!(result, vec!["a"]); 148 + } 149 + 150 + #[test] 151 + fn test_parse_repeated_query_param_empty() { 152 + 
let query = "did=test"; 153 + let result = parse_repeated_query_param(Some(query), "cids"); 154 + assert!(result.is_empty()); 155 + } 156 + 157 + #[test] 158 + fn test_parse_repeated_query_param_url_encoded() { 159 + let query = "did=test&cids=bafyreib%2Btest"; 160 + let result = parse_repeated_query_param(Some(query), "cids"); 161 + assert_eq!(result, vec!["bafyreib+test"]); 162 + } 94 163 95 164 #[test] 96 165 fn test_generate_token_code() {
+194 -5
tests/firehose_validation.rs
··· 200 200 app_port() 201 201 ); 202 202 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 203 + tokio::time::sleep(std::time::Duration::from_millis(100)).await; 203 204 204 205 let post_text = "Testing firehose validation!"; 205 206 let post_payload = json!({ ··· 224 225 assert_eq!(res.status(), StatusCode::OK); 225 226 226 227 let mut frame_opt: Option<(FrameHeader, CommitFrame)> = None; 227 - let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 228 + let timeout = tokio::time::timeout(std::time::Duration::from_secs(10), async { 228 229 loop { 229 230 let msg = ws_stream.next().await.unwrap().unwrap(); 230 231 let raw_bytes = match msg { ··· 392 393 app_port() 393 394 ); 394 395 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 396 + tokio::time::sleep(std::time::Duration::from_millis(100)).await; 395 397 396 398 let update_payload = json!({ 397 399 "repo": did, ··· 415 417 assert_eq!(res.status(), StatusCode::OK); 416 418 417 419 let mut frame_opt: Option<CommitFrame> = None; 418 - let timeout = tokio::time::timeout(std::time::Duration::from_secs(15), async { 420 + let timeout = tokio::time::timeout(std::time::Duration::from_secs(20), async { 419 421 loop { 420 422 let msg = match ws_stream.next().await { 421 423 Some(Ok(m)) => m, ··· 472 474 app_port() 473 475 ); 474 476 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 477 + tokio::time::sleep(std::time::Duration::from_millis(100)).await; 475 478 476 479 let post_payload = json!({ 477 480 "repo": did, ··· 494 497 .expect("Failed to create first post"); 495 498 496 499 let mut first_frame_opt: Option<CommitFrame> = None; 497 - let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 500 + let timeout = tokio::time::timeout(std::time::Duration::from_secs(10), async { 498 501 loop { 499 502 let msg = ws_stream.next().await.unwrap().unwrap(); 500 503 let raw_bytes = match msg { 
··· 544 547 .expect("Failed to create second post"); 545 548 546 549 let mut second_frame_opt: Option<CommitFrame> = None; 547 - let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 550 + let timeout = tokio::time::timeout(std::time::Duration::from_secs(10), async { 548 551 loop { 549 552 let msg = ws_stream.next().await.unwrap().unwrap(); 550 553 let raw_bytes = match msg { ··· 593 596 app_port() 594 597 ); 595 598 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 599 + tokio::time::sleep(std::time::Duration::from_millis(100)).await; 596 600 597 601 let post_payload = json!({ 598 602 "repo": did, ··· 615 619 .expect("Failed to create post"); 616 620 617 621 let mut raw_bytes_opt: Option<Vec<u8>> = None; 618 - let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 622 + let timeout = tokio::time::timeout(std::time::Duration::from_secs(10), async { 619 623 loop { 620 624 let msg = ws_stream.next().await.unwrap().unwrap(); 621 625 let raw = match msg { ··· 661 665 662 666 ws_stream.send(tungstenite::Message::Close(None)).await.ok(); 663 667 } 668 + 669 + #[derive(Debug, Deserialize)] 670 + struct ErrorFrameHeader { 671 + op: i64, 672 + } 673 + 674 + #[derive(Debug, Deserialize)] 675 + struct ErrorFrameBody { 676 + error: String, 677 + #[allow(dead_code)] 678 + message: Option<String>, 679 + } 680 + 681 + #[derive(Debug, Deserialize)] 682 + struct InfoFrameHeader { 683 + #[allow(dead_code)] 684 + op: i64, 685 + t: String, 686 + } 687 + 688 + #[derive(Debug, Deserialize)] 689 + struct InfoFrameBody { 690 + name: String, 691 + #[allow(dead_code)] 692 + message: Option<String>, 693 + } 694 + 695 + fn parse_error_frame(bytes: &[u8]) -> Result<(ErrorFrameHeader, ErrorFrameBody), String> { 696 + let header_len = find_cbor_map_end(bytes)?; 697 + let header: ErrorFrameHeader = serde_ipld_dagcbor::from_slice(&bytes[..header_len]) 698 + .map_err(|e| format!("Failed to parse error header: {:?}", e))?; 
699 + 700 + if header.op != -1 { 701 + return Err(format!("Not an error frame, op: {}", header.op)); 702 + } 703 + 704 + let remaining = &bytes[header_len..]; 705 + let body: ErrorFrameBody = serde_ipld_dagcbor::from_slice(remaining) 706 + .map_err(|e| format!("Failed to parse error body: {:?}", e))?; 707 + 708 + Ok((header, body)) 709 + } 710 + 711 + fn parse_info_frame(bytes: &[u8]) -> Result<(InfoFrameHeader, InfoFrameBody), String> { 712 + let header_len = find_cbor_map_end(bytes)?; 713 + let header: InfoFrameHeader = serde_ipld_dagcbor::from_slice(&bytes[..header_len]) 714 + .map_err(|e| format!("Failed to parse info header: {:?}", e))?; 715 + 716 + if header.t != "#info" { 717 + return Err(format!("Not an info frame, t: {}", header.t)); 718 + } 719 + 720 + let remaining = &bytes[header_len..]; 721 + let body: InfoFrameBody = serde_ipld_dagcbor::from_slice(remaining) 722 + .map_err(|e| format!("Failed to parse info body: {:?}", e))?; 723 + 724 + Ok((header, body)) 725 + } 726 + 727 + #[tokio::test] 728 + async fn test_firehose_future_cursor_error() { 729 + let _ = base_url().await; 730 + 731 + let future_cursor = 9999999999i64; 732 + let url = format!( 733 + "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos?cursor={}", 734 + app_port(), 735 + future_cursor 736 + ); 737 + 738 + let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 739 + 740 + let timeout = tokio::time::timeout(std::time::Duration::from_secs(10), async { 741 + loop { 742 + match ws_stream.next().await { 743 + Some(Ok(tungstenite::Message::Binary(bin))) => { 744 + if let Ok((header, body)) = parse_error_frame(&bin) { 745 + println!("Received error frame: {:?} {:?}", header, body); 746 + assert_eq!(header.op, -1, "Error frame op should be -1"); 747 + assert_eq!(body.error, "FutureCursor", "Error should be FutureCursor"); 748 + return true; 749 + } 750 + } 751 + Some(Ok(tungstenite::Message::Close(_))) => { 752 + println!("Connection closed"); 753 + return false; 754 
+ } 755 + None => { 756 + println!("Stream ended"); 757 + return false; 758 + } 759 + _ => continue, 760 + } 761 + } 762 + }) 763 + .await; 764 + 765 + match timeout { 766 + Ok(received_error) => { 767 + assert!( 768 + received_error, 769 + "Should have received FutureCursor error frame before connection closed" 770 + ); 771 + } 772 + Err(_) => { 773 + panic!( 774 + "Timed out waiting for FutureCursor error - connection should close quickly with error" 775 + ); 776 + } 777 + } 778 + } 779 + 780 + #[tokio::test] 781 + async fn test_firehose_outdated_cursor_info() { 782 + let client = client(); 783 + let (token, did) = create_account_and_login(&client).await; 784 + 785 + let post_payload = json!({ 786 + "repo": did, 787 + "collection": "app.bsky.feed.post", 788 + "record": { 789 + "$type": "app.bsky.feed.post", 790 + "text": "Post for outdated cursor test", 791 + "createdAt": chrono::Utc::now().to_rfc3339(), 792 + } 793 + }); 794 + let _ = client 795 + .post(format!( 796 + "{}/xrpc/com.atproto.repo.createRecord", 797 + base_url().await 798 + )) 799 + .bearer_auth(&token) 800 + .json(&post_payload) 801 + .send() 802 + .await 803 + .expect("Failed to create post"); 804 + 805 + tokio::time::sleep(std::time::Duration::from_millis(100)).await; 806 + 807 + let outdated_cursor = 1i64; 808 + let url = format!( 809 + "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos?cursor={}", 810 + app_port(), 811 + outdated_cursor 812 + ); 813 + 814 + let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 815 + 816 + let mut found_info = false; 817 + let mut found_commit = false; 818 + 819 + let timeout = tokio::time::timeout(std::time::Duration::from_secs(15), async { 820 + loop { 821 + match ws_stream.next().await { 822 + Some(Ok(tungstenite::Message::Binary(bin))) => { 823 + if let Ok((header, body)) = parse_info_frame(&bin) { 824 + println!("Received info frame: {:?} {:?}", header, body); 825 + if body.name == "OutdatedCursor" { 826 + found_info = true; 
827 + println!("Found OutdatedCursor info frame!"); 828 + } 829 + } else if let Ok((_, frame)) = parse_frame(&bin) { 830 + if frame.repo == did { 831 + found_commit = true; 832 + println!("Found commit for our DID"); 833 + } 834 + } 835 + if found_commit { 836 + break; 837 + } 838 + } 839 + Some(Ok(tungstenite::Message::Close(_))) => break, 840 + None => break, 841 + _ => continue, 842 + } 843 + } 844 + }) 845 + .await; 846 + 847 + assert!(timeout.is_ok(), "Timed out"); 848 + assert!( 849 + found_commit, 850 + "Should have received commits even with outdated cursor" 851 + ); 852 + }
+38
tests/helpers/mod.rs
··· 214 214 body["cid"].as_str().unwrap().to_string(), 215 215 ) 216 216 } 217 + 218 + #[allow(dead_code)] 219 + pub async fn set_account_takedown(did: &str, takedown_ref: Option<&str>) { 220 + let conn_str = get_db_connection_string().await; 221 + let pool = sqlx::postgres::PgPoolOptions::new() 222 + .max_connections(2) 223 + .connect(&conn_str) 224 + .await 225 + .expect("Failed to connect to test database"); 226 + sqlx::query!( 227 + "UPDATE users SET takedown_ref = $1 WHERE did = $2", 228 + takedown_ref, 229 + did 230 + ) 231 + .execute(&pool) 232 + .await 233 + .expect("Failed to update takedown_ref"); 234 + } 235 + 236 + #[allow(dead_code)] 237 + pub async fn set_account_deactivated(did: &str, deactivated: bool) { 238 + let conn_str = get_db_connection_string().await; 239 + let pool = sqlx::postgres::PgPoolOptions::new() 240 + .max_connections(2) 241 + .connect(&conn_str) 242 + .await 243 + .expect("Failed to connect to test database"); 244 + let deactivated_at: Option<chrono::DateTime<Utc>> = 245 + if deactivated { Some(Utc::now()) } else { None }; 246 + sqlx::query!( 247 + "UPDATE users SET deactivated_at = $1 WHERE did = $2", 248 + deactivated_at, 249 + did 250 + ) 251 + .execute(&pool) 252 + .await 253 + .expect("Failed to update deactivated_at"); 254 + }
+1 -1
tests/sync_blob.rs
··· 50 50 .send() 51 51 .await 52 52 .expect("Failed to send request"); 53 - assert_eq!(res.status(), StatusCode::NOT_FOUND); 53 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 54 54 let body: Value = res.json().await.expect("Response was not valid JSON"); 55 55 assert_eq!(body["error"], "RepoNotFound"); 56 56 }
+443
tests/sync_conformance.rs
··· 1 + mod common; 2 + mod helpers; 3 + 4 + use common::*; 5 + use helpers::*; 6 + use reqwest::StatusCode; 7 + use serde_json::Value; 8 + 9 + #[tokio::test] 10 + async fn test_get_repo_takendown_returns_error() { 11 + let client = client(); 12 + let (_, did) = create_account_and_login(&client).await; 13 + 14 + set_account_takedown(&did, Some("test-takedown-ref")).await; 15 + 16 + let res = client 17 + .get(format!( 18 + "{}/xrpc/com.atproto.sync.getRepo", 19 + base_url().await 20 + )) 21 + .query(&[("did", did.as_str())]) 22 + .send() 23 + .await 24 + .expect("Failed to send request"); 25 + 26 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 27 + let body: Value = res.json().await.expect("Response was not valid JSON"); 28 + assert_eq!(body["error"], "RepoTakendown"); 29 + } 30 + 31 + #[tokio::test] 32 + async fn test_get_repo_deactivated_returns_error() { 33 + let client = client(); 34 + let (_, did) = create_account_and_login(&client).await; 35 + 36 + set_account_deactivated(&did, true).await; 37 + 38 + let res = client 39 + .get(format!( 40 + "{}/xrpc/com.atproto.sync.getRepo", 41 + base_url().await 42 + )) 43 + .query(&[("did", did.as_str())]) 44 + .send() 45 + .await 46 + .expect("Failed to send request"); 47 + 48 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 49 + let body: Value = res.json().await.expect("Response was not valid JSON"); 50 + assert_eq!(body["error"], "RepoDeactivated"); 51 + } 52 + 53 + #[tokio::test] 54 + async fn test_get_latest_commit_takendown_returns_error() { 55 + let client = client(); 56 + let (_, did) = create_account_and_login(&client).await; 57 + 58 + set_account_takedown(&did, Some("test-takedown-ref")).await; 59 + 60 + let res = client 61 + .get(format!( 62 + "{}/xrpc/com.atproto.sync.getLatestCommit", 63 + base_url().await 64 + )) 65 + .query(&[("did", did.as_str())]) 66 + .send() 67 + .await 68 + .expect("Failed to send request"); 69 + 70 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 71 + let body: Value = 
res.json().await.expect("Response was not valid JSON"); 72 + assert_eq!(body["error"], "RepoTakendown"); 73 + } 74 + 75 + #[tokio::test] 76 + async fn test_get_blocks_takendown_returns_error() { 77 + let client = client(); 78 + let (_, did) = create_account_and_login(&client).await; 79 + 80 + let commit_res = client 81 + .get(format!( 82 + "{}/xrpc/com.atproto.sync.getLatestCommit", 83 + base_url().await 84 + )) 85 + .query(&[("did", did.as_str())]) 86 + .send() 87 + .await 88 + .expect("Failed to get commit"); 89 + let commit_body: Value = commit_res.json().await.unwrap(); 90 + let cid = commit_body["cid"].as_str().unwrap(); 91 + 92 + set_account_takedown(&did, Some("test-takedown-ref")).await; 93 + 94 + let res = client 95 + .get(format!( 96 + "{}/xrpc/com.atproto.sync.getBlocks", 97 + base_url().await 98 + )) 99 + .query(&[("did", did.as_str()), ("cids", cid)]) 100 + .send() 101 + .await 102 + .expect("Failed to send request"); 103 + 104 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 105 + let body: Value = res.json().await.expect("Response was not valid JSON"); 106 + assert_eq!(body["error"], "RepoTakendown"); 107 + } 108 + 109 + #[tokio::test] 110 + async fn test_get_repo_status_shows_takendown_status() { 111 + let client = client(); 112 + let (_, did) = create_account_and_login(&client).await; 113 + 114 + set_account_takedown(&did, Some("test-takedown-ref")).await; 115 + 116 + let res = client 117 + .get(format!( 118 + "{}/xrpc/com.atproto.sync.getRepoStatus", 119 + base_url().await 120 + )) 121 + .query(&[("did", did.as_str())]) 122 + .send() 123 + .await 124 + .expect("Failed to send request"); 125 + 126 + assert_eq!(res.status(), StatusCode::OK); 127 + let body: Value = res.json().await.expect("Response was not valid JSON"); 128 + assert_eq!(body["active"], false); 129 + assert_eq!(body["status"], "takendown"); 130 + assert!(body.get("rev").is_none() || body["rev"].is_null()); 131 + } 132 + 133 + #[tokio::test] 134 + async fn 
test_get_repo_status_shows_deactivated_status() { 135 + let client = client(); 136 + let (_, did) = create_account_and_login(&client).await; 137 + 138 + set_account_deactivated(&did, true).await; 139 + 140 + let res = client 141 + .get(format!( 142 + "{}/xrpc/com.atproto.sync.getRepoStatus", 143 + base_url().await 144 + )) 145 + .query(&[("did", did.as_str())]) 146 + .send() 147 + .await 148 + .expect("Failed to send request"); 149 + 150 + assert_eq!(res.status(), StatusCode::OK); 151 + let body: Value = res.json().await.expect("Response was not valid JSON"); 152 + assert_eq!(body["active"], false); 153 + assert_eq!(body["status"], "deactivated"); 154 + } 155 + 156 + #[tokio::test] 157 + async fn test_list_repos_shows_status_field() { 158 + let client = client(); 159 + let (_, did) = create_account_and_login(&client).await; 160 + 161 + set_account_takedown(&did, Some("test-takedown-ref")).await; 162 + 163 + let res = client 164 + .get(format!( 165 + "{}/xrpc/com.atproto.sync.listRepos", 166 + base_url().await 167 + )) 168 + .send() 169 + .await 170 + .expect("Failed to send request"); 171 + 172 + assert_eq!(res.status(), StatusCode::OK); 173 + let body: Value = res.json().await.expect("Response was not valid JSON"); 174 + let repos = body["repos"].as_array().unwrap(); 175 + 176 + let takendown_repo = repos.iter().find(|r| r["did"] == did); 177 + assert!(takendown_repo.is_some(), "Takendown repo should be in list"); 178 + let repo = takendown_repo.unwrap(); 179 + assert_eq!(repo["active"], false); 180 + assert_eq!(repo["status"], "takendown"); 181 + } 182 + 183 + #[tokio::test] 184 + async fn test_get_blob_takendown_returns_error() { 185 + let client = client(); 186 + let (jwt, did) = create_account_and_login(&client).await; 187 + 188 + let blob_res = client 189 + .post(format!( 190 + "{}/xrpc/com.atproto.repo.uploadBlob", 191 + base_url().await 192 + )) 193 + .header("Content-Type", "image/png") 194 + .bearer_auth(&jwt) 195 + .body(vec![0x89, 0x50, 0x4E, 0x47, 
0x0D, 0x0A, 0x1A, 0x0A]) 196 + .send() 197 + .await 198 + .expect("Failed to upload blob"); 199 + let blob_body: Value = blob_res.json().await.unwrap(); 200 + let cid = blob_body["blob"]["ref"]["$link"].as_str().unwrap(); 201 + 202 + set_account_takedown(&did, Some("test-takedown-ref")).await; 203 + 204 + let res = client 205 + .get(format!( 206 + "{}/xrpc/com.atproto.sync.getBlob", 207 + base_url().await 208 + )) 209 + .query(&[("did", did.as_str()), ("cid", cid)]) 210 + .send() 211 + .await 212 + .expect("Failed to send request"); 213 + 214 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 215 + let body: Value = res.json().await.expect("Response was not valid JSON"); 216 + assert_eq!(body["error"], "RepoTakendown"); 217 + } 218 + 219 + #[tokio::test] 220 + async fn test_get_blob_has_security_headers() { 221 + let client = client(); 222 + let (jwt, did) = create_account_and_login(&client).await; 223 + 224 + let blob_res = client 225 + .post(format!( 226 + "{}/xrpc/com.atproto.repo.uploadBlob", 227 + base_url().await 228 + )) 229 + .header("Content-Type", "image/png") 230 + .bearer_auth(&jwt) 231 + .body(vec![0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A]) 232 + .send() 233 + .await 234 + .expect("Failed to upload blob"); 235 + let blob_body: Value = blob_res.json().await.unwrap(); 236 + let cid = blob_body["blob"]["ref"]["$link"].as_str().unwrap(); 237 + 238 + let res = client 239 + .get(format!( 240 + "{}/xrpc/com.atproto.sync.getBlob", 241 + base_url().await 242 + )) 243 + .query(&[("did", did.as_str()), ("cid", cid)]) 244 + .send() 245 + .await 246 + .expect("Failed to send request"); 247 + 248 + assert_eq!(res.status(), StatusCode::OK); 249 + 250 + let headers = res.headers(); 251 + assert_eq!( 252 + headers 253 + .get("x-content-type-options") 254 + .map(|v| v.to_str().unwrap()), 255 + Some("nosniff"), 256 + "Missing x-content-type-options: nosniff header" 257 + ); 258 + assert_eq!( 259 + headers 260 + .get("content-security-policy") 261 + .map(|v| 
v.to_str().unwrap()), 262 + Some("default-src 'none'; sandbox"), 263 + "Missing content-security-policy header" 264 + ); 265 + assert!( 266 + headers.get("content-length").is_some(), 267 + "Missing content-length header" 268 + ); 269 + } 270 + 271 + #[tokio::test] 272 + async fn test_get_blocks_missing_cids_returns_error() { 273 + let client = client(); 274 + let (_, did) = create_account_and_login(&client).await; 275 + 276 + let fake_cid = "bafyreif2pall7dybz7vecqka3zo24irdwabwdi4wc55jznaq75q7eaavvu"; 277 + 278 + let res = client 279 + .get(format!( 280 + "{}/xrpc/com.atproto.sync.getBlocks", 281 + base_url().await 282 + )) 283 + .query(&[("did", did.as_str()), ("cids", fake_cid)]) 284 + .send() 285 + .await 286 + .expect("Failed to send request"); 287 + 288 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 289 + let body: Value = res.json().await.expect("Response was not valid JSON"); 290 + assert_eq!(body["error"], "InvalidRequest"); 291 + assert!( 292 + body["message"] 293 + .as_str() 294 + .unwrap() 295 + .contains("Could not find blocks"), 296 + "Error message should mention missing blocks" 297 + ); 298 + } 299 + 300 + #[tokio::test] 301 + async fn test_get_blocks_accepts_array_format() { 302 + let client = client(); 303 + let (_, did) = create_account_and_login(&client).await; 304 + 305 + let commit_res = client 306 + .get(format!( 307 + "{}/xrpc/com.atproto.sync.getLatestCommit", 308 + base_url().await 309 + )) 310 + .query(&[("did", did.as_str())]) 311 + .send() 312 + .await 313 + .expect("Failed to get commit"); 314 + let commit_body: Value = commit_res.json().await.unwrap(); 315 + let cid = commit_body["cid"].as_str().unwrap(); 316 + 317 + let url = format!( 318 + "{}/xrpc/com.atproto.sync.getBlocks?did={}&cids={}&cids={}", 319 + base_url().await, 320 + did, 321 + cid, 322 + cid 323 + ); 324 + let res = client 325 + .get(&url) 326 + .send() 327 + .await 328 + .expect("Failed to send request"); 329 + 330 + assert_eq!(res.status(), StatusCode::OK); 331 
+ let content_type = res.headers().get("content-type").unwrap().to_str().unwrap(); 332 + assert!( 333 + content_type.contains("application/vnd.ipld.car"), 334 + "Response should be a CAR file" 335 + ); 336 + } 337 + 338 + #[tokio::test] 339 + async fn test_get_repo_since_returns_partial() { 340 + let client = client(); 341 + let (jwt, did) = create_account_and_login(&client).await; 342 + 343 + let initial_commit_res = client 344 + .get(format!( 345 + "{}/xrpc/com.atproto.sync.getLatestCommit", 346 + base_url().await 347 + )) 348 + .query(&[("did", did.as_str())]) 349 + .send() 350 + .await 351 + .expect("Failed to get initial commit"); 352 + let initial_body: Value = initial_commit_res.json().await.unwrap(); 353 + let initial_rev = initial_body["rev"].as_str().unwrap(); 354 + 355 + let full_repo_res = client 356 + .get(format!( 357 + "{}/xrpc/com.atproto.sync.getRepo", 358 + base_url().await 359 + )) 360 + .query(&[("did", did.as_str())]) 361 + .send() 362 + .await 363 + .expect("Failed to get full repo"); 364 + assert_eq!(full_repo_res.status(), StatusCode::OK); 365 + let full_repo_bytes = full_repo_res.bytes().await.unwrap(); 366 + let full_repo_size = full_repo_bytes.len(); 367 + 368 + create_post(&client, &did, &jwt, "Test post for since param").await; 369 + 370 + let partial_repo_res = client 371 + .get(format!( 372 + "{}/xrpc/com.atproto.sync.getRepo", 373 + base_url().await 374 + )) 375 + .query(&[("did", did.as_str()), ("since", initial_rev)]) 376 + .send() 377 + .await 378 + .expect("Failed to get partial repo"); 379 + assert_eq!(partial_repo_res.status(), StatusCode::OK); 380 + let partial_repo_bytes = partial_repo_res.bytes().await.unwrap(); 381 + let partial_repo_size = partial_repo_bytes.len(); 382 + 383 + assert!( 384 + partial_repo_size < full_repo_size, 385 + "Partial export (since={}) should be smaller than full export: {} vs {}", 386 + initial_rev, 387 + partial_repo_size, 388 + full_repo_size 389 + ); 390 + } 391 + 392 + #[tokio::test] 393 + 
async fn test_list_blobs_takendown_returns_error() { 394 + let client = client(); 395 + let (_, did) = create_account_and_login(&client).await; 396 + 397 + set_account_takedown(&did, Some("test-takedown-ref")).await; 398 + 399 + let res = client 400 + .get(format!( 401 + "{}/xrpc/com.atproto.sync.listBlobs", 402 + base_url().await 403 + )) 404 + .query(&[("did", did.as_str())]) 405 + .send() 406 + .await 407 + .expect("Failed to send request"); 408 + 409 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 410 + let body: Value = res.json().await.expect("Response was not valid JSON"); 411 + assert_eq!(body["error"], "RepoTakendown"); 412 + } 413 + 414 + #[tokio::test] 415 + async fn test_get_record_takendown_returns_error() { 416 + let client = client(); 417 + let (jwt, did) = create_account_and_login(&client).await; 418 + 419 + let (uri, _cid) = create_post(&client, &did, &jwt, "Test post").await; 420 + let parts: Vec<&str> = uri.split('/').collect(); 421 + let collection = parts[parts.len() - 2]; 422 + let rkey = parts[parts.len() - 1]; 423 + 424 + set_account_takedown(&did, Some("test-takedown-ref")).await; 425 + 426 + let res = client 427 + .get(format!( 428 + "{}/xrpc/com.atproto.sync.getRecord", 429 + base_url().await 430 + )) 431 + .query(&[ 432 + ("did", did.as_str()), 433 + ("collection", collection), 434 + ("rkey", rkey), 435 + ]) 436 + .send() 437 + .await 438 + .expect("Failed to send request"); 439 + 440 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 441 + let body: Value = res.json().await.expect("Response was not valid JSON"); 442 + assert_eq!(body["error"], "RepoTakendown"); 443 + }
+1 -1
tests/sync_deprecated.rs
··· 138 138 "CAR file should have at least header length" 139 139 ); 140 140 for i in 0..4 { 141 - tokio::time::sleep(std::time::Duration::from_millis(50)).await; 141 + tokio::time::sleep(std::time::Duration::from_millis(100)).await; 142 142 create_post(&client, &did, &jwt, &format!("Checkout post {}", i)).await; 143 143 } 144 144 let multi_res = client
+40 -27
tests/sync_repo.rs
··· 39 39 .send() 40 40 .await 41 41 .expect("Failed to send request"); 42 - assert_eq!(res.status(), StatusCode::NOT_FOUND); 42 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 43 43 let body: Value = res.json().await.expect("Response was not valid JSON"); 44 44 assert_eq!(body["error"], "RepoNotFound"); 45 45 } ··· 106 106 #[tokio::test] 107 107 async fn test_list_repos_pagination() { 108 108 let client = client(); 109 - let _ = create_account_and_login(&client).await; 110 - let _ = create_account_and_login(&client).await; 111 - let _ = create_account_and_login(&client).await; 112 - let params = [("limit", "1")]; 113 - let res = client 114 - .get(format!( 115 - "{}/xrpc/com.atproto.sync.listRepos", 116 - base_url().await 117 - )) 118 - .query(&params) 119 - .send() 120 - .await 121 - .expect("Failed to send request"); 122 - assert_eq!(res.status(), StatusCode::OK); 123 - let body: Value = res.json().await.expect("Response was not valid JSON"); 124 - let repos = body["repos"].as_array().unwrap(); 125 - assert_eq!(repos.len(), 1); 126 - if let Some(cursor) = body["cursor"].as_str() { 127 - let params = [("limit", "1"), ("cursor", cursor)]; 109 + let (_, did1) = create_account_and_login(&client).await; 110 + let (_, did2) = create_account_and_login(&client).await; 111 + let (_, did3) = create_account_and_login(&client).await; 112 + let our_dids: std::collections::HashSet<String> = [did1, did2, did3].into_iter().collect(); 113 + let mut all_dids_seen: std::collections::HashSet<String> = std::collections::HashSet::new(); 114 + let mut cursor: Option<String> = None; 115 + let mut page_count = 0; 116 + let max_pages = 100; 117 + loop { 118 + let mut params: Vec<(&str, String)> = vec![("limit".into(), "10".into())]; 119 + if let Some(ref c) = cursor { 120 + params.push(("cursor", c.clone())); 121 + } 128 122 let res = client 129 123 .get(format!( 130 124 "{}/xrpc/com.atproto.sync.listRepos", ··· 136 130 .expect("Failed to send request"); 137 131 
assert_eq!(res.status(), StatusCode::OK); 138 132 let body: Value = res.json().await.expect("Response was not valid JSON"); 139 - let repos2 = body["repos"].as_array().unwrap(); 140 - assert_eq!(repos2.len(), 1); 141 - assert_ne!(repos[0]["did"], repos2[0]["did"]); 133 + let repos = body["repos"].as_array().unwrap(); 134 + for repo in repos { 135 + let did = repo["did"].as_str().unwrap().to_string(); 136 + assert!( 137 + !all_dids_seen.contains(&did), 138 + "Pagination returned duplicate DID: {}", 139 + did 140 + ); 141 + all_dids_seen.insert(did); 142 + } 143 + cursor = body["cursor"].as_str().map(String::from); 144 + page_count += 1; 145 + if cursor.is_none() || page_count >= max_pages { 146 + break; 147 + } 148 + } 149 + for did in &our_dids { 150 + assert!( 151 + all_dids_seen.contains(did), 152 + "Our created DID {} was not found in paginated results", 153 + did 154 + ); 142 155 } 143 156 } 144 157 ··· 176 189 .send() 177 190 .await 178 191 .expect("Failed to send request"); 179 - assert_eq!(res.status(), StatusCode::NOT_FOUND); 192 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 180 193 let body: Value = res.json().await.expect("Response was not valid JSON"); 181 194 assert_eq!(body["error"], "RepoNotFound"); 182 195 } ··· 270 283 .send() 271 284 .await 272 285 .expect("Failed to send request"); 273 - assert_eq!(res.status(), StatusCode::NOT_FOUND); 286 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 274 287 let body: Value = res.json().await.expect("Response was not valid JSON"); 275 288 assert_eq!(body["error"], "RepoNotFound"); 276 289 } ··· 397 410 .send() 398 411 .await 399 412 .expect("Failed to send request"); 400 - assert_eq!(res.status(), StatusCode::NOT_FOUND); 413 + assert_eq!(res.status(), StatusCode::BAD_REQUEST); 401 414 } 402 415 403 416 #[tokio::test] ··· 536 549 .expect("Failed to create profile"); 537 550 assert_eq!(profile_res.status(), StatusCode::OK); 538 551 for i in 0..3 { 539 - 
tokio::time::sleep(std::time::Duration::from_millis(50)).await; 552 + tokio::time::sleep(std::time::Duration::from_millis(100)).await; 540 553 create_post(&client, &did, &jwt, &format!("Export test post {}", i)).await; 541 554 } 542 555 let blob_data = b"blob data for sync export test";