this repo has no description

Account lifecycle conf. vs ref

lewis 9d6d792f c02136bc

Changed files
+1371 -177
.sqlx
migrations
src
tests
+14
.env.example
··· 100 # Comma-separated list of available user domains 101 # AVAILABLE_USER_DOMAINS=example.com 102 # ============================================================================= 103 # Rate Limiting 104 # ============================================================================= 105 # Disable all rate limiting (testing only, NEVER in production) 106 # DISABLE_RATE_LIMITING=1 107 # ============================================================================= 108 # Miscellaneous 109 # =============================================================================
··· 100 # Comma-separated list of available user domains 101 # AVAILABLE_USER_DOMAINS=example.com 102 # ============================================================================= 103 + # Server Metadata (returned by describeServer) 104 + # ============================================================================= 105 + # Privacy policy URL (optional) 106 + # PRIVACY_POLICY_URL=https://example.com/privacy 107 + # Terms of service URL (optional) 108 + # TERMS_OF_SERVICE_URL=https://example.com/terms 109 + # Contact email address (optional) 110 + # CONTACT_EMAIL=admin@example.com 111 + # ============================================================================= 112 # Rate Limiting 113 # ============================================================================= 114 # Disable all rate limiting (testing only, NEVER in production) 115 # DISABLE_RATE_LIMITING=1 116 + # ============================================================================= 117 + # Account Deletion 118 + # ============================================================================= 119 + # How often to check for scheduled account deletions (default: 3600 = 1 hour) 120 + # SCHEDULED_DELETE_CHECK_INTERVAL_SECS=3600 121 # ============================================================================= 122 # Miscellaneous 123 # =============================================================================
-15
.sqlx/query-14a68a119586aa980fb7b64646c1373eecd788e508246b5ad84e31b1adbdd2c1.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)", 4 - "describe": { 5 - "columns": [], 6 - "parameters": { 7 - "Left": [ 8 - "Uuid", 9 - "Text" 10 - ] 11 - }, 12 - "nullable": [] 13 - }, 14 - "hash": "14a68a119586aa980fb7b64646c1373eecd788e508246b5ad84e31b1adbdd2c1" 15 - }
···
+15
.sqlx/query-244b55cedfe51f834337141d3bb00e48a1c9277be3e6f0e7e6231a0f3e53a7a4.json
···
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n INSERT INTO user_blocks (user_id, block_cid)\n SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)\n ON CONFLICT (user_id, block_cid) DO NOTHING\n ", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "Uuid", 9 + "ByteaArray" 10 + ] 11 + }, 12 + "nullable": [] 13 + }, 14 + "hash": "244b55cedfe51f834337141d3bb00e48a1c9277be3e6f0e7e6231a0f3e53a7a4" 15 + }
-15
.sqlx/query-2588479ef83ed45a5d0dee599636f195ca38c5df164e225dcb1b829b497c8f14.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", 4 - "describe": { 5 - "columns": [], 6 - "parameters": { 7 - "Left": [ 8 - "Text", 9 - "Uuid" 10 - ] 11 - }, 12 - "nullable": [] 13 - }, 14 - "hash": "2588479ef83ed45a5d0dee599636f195ca38c5df164e225dcb1b829b497c8f14" 15 - }
···
+16
.sqlx/query-3567e730c1fe4dee7753a53b71c2c586335c795003ce6090fb5af2b107208305.json
···
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "UPDATE repos SET repo_root_cid = $1, repo_rev = $2 WHERE user_id = $3", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "Text", 9 + "Text", 10 + "Uuid" 11 + ] 12 + }, 13 + "nullable": [] 14 + }, 15 + "hash": "3567e730c1fe4dee7753a53b71c2c586335c795003ce6090fb5af2b107208305" 16 + }
+26
.sqlx/query-3f13f59e14ca24d4523be38a0b95d32a4a970f61c84f0539f4c4ee484afdce7d.json
···
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n SELECT u.id as user_id, r.repo_root_cid\n FROM users u\n JOIN repos r ON r.user_id = u.id\n WHERE NOT EXISTS (SELECT 1 FROM user_blocks ub WHERE ub.user_id = u.id)\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "user_id", 9 + "type_info": "Uuid" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "repo_root_cid", 14 + "type_info": "Text" 15 + } 16 + ], 17 + "parameters": { 18 + "Left": [] 19 + }, 20 + "nullable": [ 21 + false, 22 + false 23 + ] 24 + }, 25 + "hash": "3f13f59e14ca24d4523be38a0b95d32a4a970f61c84f0539f4c4ee484afdce7d" 26 + }
+28
.sqlx/query-49f01f438353a771fd42473fee5090f68e0083610d07e609825d528ef58ade1f.json
···
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "SELECT r.repo_root_cid, r.repo_rev FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "repo_root_cid", 9 + "type_info": "Text" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "repo_rev", 14 + "type_info": "Text" 15 + } 16 + ], 17 + "parameters": { 18 + "Left": [ 19 + "Text" 20 + ] 21 + }, 22 + "nullable": [ 23 + false, 24 + true 25 + ] 26 + }, 27 + "hash": "49f01f438353a771fd42473fee5090f68e0083610d07e609825d528ef58ade1f" 28 + }
+22
.sqlx/query-51da09ecbd806c8ee59acfbe333a3eace1c428f5bb5130dff0cccf14e4bdb4c1.json
···
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n INSERT INTO repo_seq (did, event_type, active, status)\n VALUES ($1, 'account', false, 'deleted')\n RETURNING seq\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "seq", 9 + "type_info": "Int8" 10 + } 11 + ], 12 + "parameters": { 13 + "Left": [ 14 + "Text" 15 + ] 16 + }, 17 + "nullable": [ 18 + false 19 + ] 20 + }, 21 + "hash": "51da09ecbd806c8ee59acfbe333a3eace1c428f5bb5130dff0cccf14e4bdb4c1" 22 + }
-26
.sqlx/query-53b0ea60a759f8bb37d01461fd0769dcc683e796287e41d5180340296286fcbe.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "\n INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids)\n VALUES ($1, 'commit', $2, $2, $3, $4, $5)\n RETURNING seq\n ", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "seq", 9 - "type_info": "Int8" 10 - } 11 - ], 12 - "parameters": { 13 - "Left": [ 14 - "Text", 15 - "Text", 16 - "Jsonb", 17 - "TextArray", 18 - "TextArray" 19 - ] 20 - }, 21 - "nullable": [ 22 - false 23 - ] 24 - }, 25 - "hash": "53b0ea60a759f8bb37d01461fd0769dcc683e796287e41d5180340296286fcbe" 26 - }
···
+28
.sqlx/query-6b3704b48a690ea278019a70a977737de7f6dc39c3f2509b55bb6c4580e3d2ee.json
···
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "SELECT repo_root_cid, repo_rev FROM repos WHERE user_id = $1", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "repo_root_cid", 9 + "type_info": "Text" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "repo_rev", 14 + "type_info": "Text" 15 + } 16 + ], 17 + "parameters": { 18 + "Left": [ 19 + "Uuid" 20 + ] 21 + }, 22 + "nullable": [ 23 + false, 24 + true 25 + ] 26 + }, 27 + "hash": "6b3704b48a690ea278019a70a977737de7f6dc39c3f2509b55bb6c4580e3d2ee" 28 + }
+22
.sqlx/query-908e74d3c4c6e429133adb7074dcfe52980f0e02f2908b17cdd00fc679e6da36.json
···
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "SELECT COUNT(*) FROM user_blocks WHERE user_id = $1", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "count", 9 + "type_info": "Int8" 10 + } 11 + ], 12 + "parameters": { 13 + "Left": [ 14 + "Uuid" 15 + ] 16 + }, 17 + "nullable": [ 18 + null 19 + ] 20 + }, 21 + "hash": "908e74d3c4c6e429133adb7074dcfe52980f0e02f2908b17cdd00fc679e6da36" 22 + }
+16
.sqlx/query-94683841b256b65ed2ac4806206faf7edc34b5952143334b8fc834350894478f.json
···
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "Uuid", 9 + "Text", 10 + "Text" 11 + ] 12 + }, 13 + "nullable": [] 14 + }, 15 + "hash": "94683841b256b65ed2ac4806206faf7edc34b5952143334b8fc834350894478f" 16 + }
+15
.sqlx/query-978ec276ffa89b539b5365e8106f0f78b7dd5d3d50162deb535c583796afe192.json
···
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n INSERT INTO user_blocks (user_id, block_cid)\n SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)\n ON CONFLICT (user_id, block_cid) DO NOTHING\n ", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "Uuid", 9 + "ByteaArray" 10 + ] 11 + }, 12 + "nullable": [] 13 + }, 14 + "hash": "978ec276ffa89b539b5365e8106f0f78b7dd5d3d50162deb535c583796afe192" 15 + }
+26
.sqlx/query-9c1d6f38011f8070e058ef4c9100ebe833c85fe4aa1b77af1ce67dd8fcda507a.json
···
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n SELECT did, handle\n FROM users\n WHERE delete_after IS NOT NULL\n AND delete_after < NOW()\n AND deactivated_at IS NOT NULL\n LIMIT 100\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "did", 9 + "type_info": "Text" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "handle", 14 + "type_info": "Text" 15 + } 16 + ], 17 + "parameters": { 18 + "Left": [] 19 + }, 20 + "nullable": [ 21 + false, 22 + false 23 + ] 24 + }, 25 + "hash": "9c1d6f38011f8070e058ef4c9100ebe833c85fe4aa1b77af1ce67dd8fcda507a" 26 + }
+15
.sqlx/query-9f461c44be23d43feb8491422dd5008e3a32ba603f09fcdbbc29bf23cb870444.json
···
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n INSERT INTO user_blocks (user_id, block_cid)\n SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)\n ON CONFLICT (user_id, block_cid) DO NOTHING\n ", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "Uuid", 9 + "ByteaArray" 10 + ] 11 + }, 12 + "nullable": [] 13 + }, 14 + "hash": "9f461c44be23d43feb8491422dd5008e3a32ba603f09fcdbbc29bf23cb870444" 15 + }
+16
.sqlx/query-a9e604216b880a8e1be9b4cec84880febb5185f7b7babb616f9c0f1f7016f59e.json
···
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "UPDATE repos SET repo_root_cid = $1, repo_rev = $2, updated_at = NOW() WHERE user_id = $3", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "Text", 9 + "Text", 10 + "Uuid" 11 + ] 12 + }, 13 + "nullable": [] 14 + }, 15 + "hash": "a9e604216b880a8e1be9b4cec84880febb5185f7b7babb616f9c0f1f7016f59e" 16 + }
+15
.sqlx/query-b6d6548acb89d6384cd226f6ed0d66de27fde3af24b4a7a3fce7e098812e38a5.json
···
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "DELETE FROM repo_seq WHERE did = $1 AND seq != $2", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "Text", 9 + "Int8" 10 + ] 11 + }, 12 + "nullable": [] 13 + }, 14 + "hash": "b6d6548acb89d6384cd226f6ed0d66de27fde3af24b4a7a3fce7e098812e38a5" 15 + }
+15
.sqlx/query-b7432d134013ff1f64389dda715ae0c23e0095f42585e2ecb962422d9a45ef17.json
···
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "UPDATE users SET deactivated_at = NOW(), delete_after = $2 WHERE did = $1", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "Text", 9 + "Timestamptz" 10 + ] 11 + }, 12 + "nullable": [] 13 + }, 14 + "hash": "b7432d134013ff1f64389dda715ae0c23e0095f42585e2ecb962422d9a45ef17" 15 + }
+15
.sqlx/query-b8de174efc5f897e688bc1fb5c49a10530815dd4737e4c4b821f5b26756b63ba.json
···
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "UPDATE repos SET repo_rev = $1 WHERE user_id = $2", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "Text", 9 + "Uuid" 10 + ] 11 + }, 12 + "nullable": [] 13 + }, 14 + "hash": "b8de174efc5f897e688bc1fb5c49a10530815dd4737e4c4b821f5b26756b63ba" 15 + }
+28
.sqlx/query-c9067e3e62c22fe92a135fa0c6c2b06cad977bf73bf3bb0fd3fc88938d875637.json
···
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, rev)\n VALUES ($1, 'commit', $2, $3::TEXT, $4, $5, $6, $7)\n RETURNING seq\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "seq", 9 + "type_info": "Int8" 10 + } 11 + ], 12 + "parameters": { 13 + "Left": [ 14 + "Text", 15 + "Text", 16 + "Text", 17 + "Jsonb", 18 + "TextArray", 19 + "TextArray", 20 + "Text" 21 + ] 22 + }, 23 + "nullable": [ 24 + false 25 + ] 26 + }, 27 + "hash": "c9067e3e62c22fe92a135fa0c6c2b06cad977bf73bf3bb0fd3fc88938d875637" 28 + }
-15
.sqlx/query-f1e88d447915b116f887c378253388654a783bddb111b1f9aa04507f176980d3.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "UPDATE repos SET repo_root_cid = $1, updated_at = NOW() WHERE user_id = $2", 4 - "describe": { 5 - "columns": [], 6 - "parameters": { 7 - "Left": [ 8 - "Text", 9 - "Uuid" 10 - ] 11 - }, 12 - "nullable": [] 13 - }, 14 - "hash": "f1e88d447915b116f887c378253388654a783bddb111b1f9aa04507f176980d3" 15 - }
···
+22
.sqlx/query-f59010ecdd7f782489e0e03288a06dacd72b33d04c1e2b98475018ad25485852.json
···
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "SELECT storage_key as \"storage_key!\" FROM blobs WHERE created_by_user = $1", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "storage_key!", 9 + "type_info": "Text" 10 + } 11 + ], 12 + "parameters": { 13 + "Left": [ 14 + "Uuid" 15 + ] 16 + }, 17 + "nullable": [ 18 + false 19 + ] 20 + }, 21 + "hash": "f59010ecdd7f782489e0e03288a06dacd72b33d04c1e2b98475018ad25485852" 22 + }
+26
.sqlx/query-f90c58a4e9dc9c28a682405fb7d5421853c6ef710bee0170430416485f41a0c3.json
···
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "user_id", 9 + "type_info": "Uuid" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "repo_root_cid", 14 + "type_info": "Text" 15 + } 16 + ], 17 + "parameters": { 18 + "Left": [] 19 + }, 20 + "nullable": [ 21 + false, 22 + false 23 + ] 24 + }, 25 + "hash": "f90c58a4e9dc9c28a682405fb7d5421853c6ef710bee0170430416485f41a0c3" 26 + }
+1
migrations/20251239_add_delete_after.sql
···
··· 1 + ALTER TABLE users ADD COLUMN IF NOT EXISTS delete_after TIMESTAMPTZ;
+7
migrations/20251240_add_block_count.sql
···
··· 1 + CREATE TABLE IF NOT EXISTS user_blocks ( 2 + user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, 3 + block_cid BYTEA NOT NULL, 4 + PRIMARY KEY (user_id, block_cid) 5 + ); 6 + 7 + CREATE INDEX IF NOT EXISTS idx_user_blocks_user_id ON user_blocks(user_id);
+24 -2
src/api/delegation.rs
··· 886 } 887 }; 888 let commit_cid_str = commit_cid.to_string(); 889 if let Err(e) = sqlx::query!( 890 - "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)", 891 user_id, 892 - commit_cid_str 893 ) 894 .execute(&mut *tx) 895 .await 896 { 897 error!("Error inserting repo: {:?}", e); 898 return ( 899 StatusCode::INTERNAL_SERVER_ERROR, 900 Json(json!({"error": "InternalError"})),
··· 886 } 887 }; 888 let commit_cid_str = commit_cid.to_string(); 889 + let rev_str = rev.as_ref().to_string(); 890 if let Err(e) = sqlx::query!( 891 + "INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)", 892 user_id, 893 + commit_cid_str, 894 + rev_str 895 ) 896 .execute(&mut *tx) 897 .await 898 { 899 error!("Error inserting repo: {:?}", e); 900 + return ( 901 + StatusCode::INTERNAL_SERVER_ERROR, 902 + Json(json!({"error": "InternalError"})), 903 + ) 904 + .into_response(); 905 + } 906 + let genesis_block_cids = vec![mst_root.to_bytes(), commit_cid.to_bytes()]; 907 + if let Err(e) = sqlx::query!( 908 + r#" 909 + INSERT INTO user_blocks (user_id, block_cid) 910 + SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 911 + ON CONFLICT (user_id, block_cid) DO NOTHING 912 + "#, 913 + user_id, 914 + &genesis_block_cids 915 + ) 916 + .execute(&mut *tx) 917 + .await 918 + { 919 + error!("Error inserting user_blocks: {:?}", e); 920 return ( 921 StatusCode::INTERNAL_SERVER_ERROR, 922 Json(json!({"error": "InternalError"})),
+84 -65
src/api/identity/account.rs
··· 57 pub handle: String, 58 pub did: String, 59 #[serde(skip_serializing_if = "Option::is_none")] 60 - pub access_jwt: Option<String>, 61 - #[serde(skip_serializing_if = "Option::is_none")] 62 - pub refresh_jwt: Option<String>, 63 pub verification_required: bool, 64 pub verification_channel: String, 65 } ··· 624 StatusCode::OK, 625 Json(CreateAccountOutput { 626 handle: handle.clone(), 627 - did, 628 - access_jwt: Some(access_meta.token), 629 - refresh_jwt: Some(refresh_meta.token), 630 verification_required: false, 631 verification_channel: "email".to_string(), 632 }), ··· 912 } 913 }; 914 let commit_cid_str = commit_cid.to_string(); 915 let repo_insert = sqlx::query!( 916 - "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)", 917 user_id, 918 - commit_cid_str 919 ) 920 .execute(&mut *tx) 921 .await; 922 if let Err(e) = repo_insert { 923 error!("Error initializing repo: {:?}", e); 924 return ( 925 StatusCode::INTERNAL_SERVER_ERROR, 926 Json(json!({"error": "InternalError"})), ··· 965 { 966 warn!("Failed to sequence account event for {}: {}", did, e); 967 } 968 let profile_record = json!({ 969 "$type": "app.bsky.actor.profile", 970 "displayName": input.handle ··· 1023 } 1024 } 1025 1026 - let (access_jwt, refresh_jwt) = if is_migration { 1027 - info!( 1028 - "[MIGRATION] createAccount: Creating session tokens for migration did={}", 1029 - did 1030 - ); 1031 - let access_meta = match crate::auth::create_access_token_with_metadata( 1032 - &did, 1033 - &secret_key_bytes, 1034 - ) { 1035 - Ok(m) => m, 1036 - Err(e) => { 1037 - error!( 1038 - "[MIGRATION] createAccount: Error creating access token for migration: {:?}", 1039 - e 1040 - ); 1041 - return ( 1042 - StatusCode::INTERNAL_SERVER_ERROR, 1043 - Json(json!({"error": "InternalError"})), 1044 - ) 1045 - .into_response(); 1046 - } 1047 - }; 1048 - let refresh_meta = match crate::auth::create_refresh_token_with_metadata( 1049 - &did, 1050 - &secret_key_bytes, 1051 - ) { 1052 Ok(m) => m, 1053 Err(e) => { 1054 - error!( 1055 - "[MIGRATION] createAccount: Error creating refresh token for migration: {:?}", 1056 - e 1057 - ); 1058 return ( 1059 StatusCode::INTERNAL_SERVER_ERROR, 1060 Json(json!({"error": "InternalError"})), ··· 1062 .into_response(); 1063 } 1064 }; 1065 - if let Err(e) = sqlx::query!( 1066 - "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 1067 - did, 1068 - access_meta.jti, 1069 - refresh_meta.jti, 1070 - access_meta.expires_at, 1071 - refresh_meta.expires_at 1072 ) 1073 - .execute(&state.db) 1074 - .await 1075 - { 1076 - error!("[MIGRATION] createAccount: Error creating session for migration: {:?}", e); 1077 - return ( 1078 - StatusCode::INTERNAL_SERVER_ERROR, 1079 - Json(json!({"error": "InternalError"})), 1080 - ) 1081 - .into_response(); 1082 - } 1083 - info!( 1084 - "[MIGRATION] createAccount: Session created successfully for did={}", 1085 - did 1086 - ); 1087 - (Some(access_meta.token), Some(refresh_meta.token)) 1088 - } else { 1089 - (None, None) 1090 - }; 1091 1092 if is_migration { 1093 info!( ··· 1101 Json(CreateAccountOutput { 1102 handle: handle.clone(), 1103 did, 1104 - access_jwt, 1105 - refresh_jwt, 1106 verification_required: !is_migration, 1107 verification_channel: verification_channel.to_string(), 1108 }), 1109 ) 1110 .into_response() 1111 }
··· 57 pub handle: String, 58 pub did: String, 59 #[serde(skip_serializing_if = "Option::is_none")] 60 + pub did_doc: Option<serde_json::Value>, 61 + pub access_jwt: String, 62 + pub refresh_jwt: String, 63 pub verification_required: bool, 64 pub verification_channel: String, 65 } ··· 624 StatusCode::OK, 625 Json(CreateAccountOutput { 626 handle: handle.clone(), 627 + did: did.clone(), 628 + did_doc: state.did_resolver.resolve_did_document(&did).await, 629 + access_jwt: access_meta.token, 630 + refresh_jwt: refresh_meta.token, 631 verification_required: false, 632 verification_channel: "email".to_string(), 633 }), ··· 913 } 914 }; 915 let commit_cid_str = commit_cid.to_string(); 916 + let rev_str = rev.as_ref().to_string(); 917 let repo_insert = sqlx::query!( 918 + "INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)", 919 user_id, 920 + commit_cid_str, 921 + rev_str 922 ) 923 .execute(&mut *tx) 924 .await; 925 if let Err(e) = repo_insert { 926 error!("Error initializing repo: {:?}", e); 927 + return ( 928 + StatusCode::INTERNAL_SERVER_ERROR, 929 + Json(json!({"error": "InternalError"})), 930 + ) 931 + .into_response(); 932 + } 933 + let genesis_block_cids = vec![mst_root.to_bytes(), commit_cid.to_bytes()]; 934 + if let Err(e) = sqlx::query!( 935 + r#" 936 + INSERT INTO user_blocks (user_id, block_cid) 937 + SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 938 + ON CONFLICT (user_id, block_cid) DO NOTHING 939 + "#, 940 + user_id, 941 + &genesis_block_cids 942 + ) 943 + .execute(&mut *tx) 944 + .await 945 + { 946 + error!("Error inserting user_blocks: {:?}", e); 947 return ( 948 StatusCode::INTERNAL_SERVER_ERROR, 949 Json(json!({"error": "InternalError"})), ··· 988 { 989 warn!("Failed to sequence account event for {}: {}", did, e); 990 } 991 + if let Err(e) = 992 + crate::api::repo::record::sequence_empty_commit_event(&state, &did).await 993 + { 994 + warn!("Failed to sequence commit event for {}: {}", did, e); 995 + } 996 + if let Err(e) = crate::api::repo::record::sequence_sync_event( 997 + &state, 998 + &did, 999 + &commit_cid_str, 1000 + Some(rev.as_ref()), 1001 + ) 1002 + .await 1003 + { 1004 + warn!("Failed to sequence sync event for {}: {}", did, e); 1005 + } 1006 let profile_record = json!({ 1007 "$type": "app.bsky.actor.profile", 1008 "displayName": input.handle ··· 1061 } 1062 } 1063 1064 + let access_meta = match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) 1065 + { 1066 + Ok(m) => m, 1067 + Err(e) => { 1068 + error!("createAccount: Error creating access token: {:?}", e); 1069 + return ( 1070 + StatusCode::INTERNAL_SERVER_ERROR, 1071 + Json(json!({"error": "InternalError"})), 1072 + ) 1073 + .into_response(); 1074 + } 1075 + }; 1076 + let refresh_meta = 1077 + match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) { 1078 Ok(m) => m, 1079 Err(e) => { 1080 + error!("createAccount: Error creating refresh token: {:?}", e); 1081 return ( 1082 StatusCode::INTERNAL_SERVER_ERROR, 1083 Json(json!({"error": "InternalError"})), ··· 1085 .into_response(); 1086 } 1087 }; 1088 + if let Err(e) = sqlx::query!( 1089 + "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", 1090 + did, 1091 + access_meta.jti, 1092 + refresh_meta.jti, 1093 + access_meta.expires_at, 1094 + refresh_meta.expires_at 1095 + ) 1096 + .execute(&state.db) 1097 + .await 1098 + { 1099 + error!("createAccount: Error creating session: {:?}", e); 1100 + return ( 1101 + StatusCode::INTERNAL_SERVER_ERROR, 1102 + Json(json!({"error": "InternalError"})), 1103 ) 1104 + .into_response(); 1105 + } 1106 + 1107 + let did_doc = state.did_resolver.resolve_did_document(&did).await; 1108 1109 if is_migration { 1110 info!( ··· 1118 Json(CreateAccountOutput { 1119 handle: handle.clone(), 1120 did, 1121 + did_doc, 1122 + access_jwt: access_meta.token, 1123 + refresh_jwt: refresh_meta.token, 1124 verification_required: !is_migration, 1125 verification_channel: verification_channel.to_string(), 1126 }), 1127 ) 1128 .into_response() 1129 } 1130 +
+24 -2
src/api/repo/import.rs
··· 315 .ok() 316 .and_then(|s| s.parse().ok()) 317 .unwrap_or(DEFAULT_MAX_BLOCKS); 318 - match apply_import(&state.db, user_id, root, blocks, max_blocks).await { 319 Ok(import_result) => { 320 info!( 321 "Successfully imported {} records for user {}", ··· 405 }; 406 let new_root_str = new_root_cid.to_string(); 407 if let Err(e) = sqlx::query!( 408 - "UPDATE repos SET repo_root_cid = $1, updated_at = NOW() WHERE user_id = $2", 409 new_root_str, 410 user_id 411 ) 412 .execute(&state.db) 413 .await 414 { 415 error!("Failed to update repo root: {:?}", e); 416 return ( 417 StatusCode::INTERNAL_SERVER_ERROR, 418 Json(json!({"error": "InternalError"})),
··· 315 .ok() 316 .and_then(|s| s.parse().ok()) 317 .unwrap_or(DEFAULT_MAX_BLOCKS); 318 + match apply_import(&state.db, user_id, root, blocks.clone(), max_blocks).await { 319 Ok(import_result) => { 320 info!( 321 "Successfully imported {} records for user {}", ··· 405 }; 406 let new_root_str = new_root_cid.to_string(); 407 if let Err(e) = sqlx::query!( 408 + "UPDATE repos SET repo_root_cid = $1, repo_rev = $2, updated_at = NOW() WHERE user_id = $3", 409 new_root_str, 410 + &new_rev_str, 411 user_id 412 ) 413 .execute(&state.db) 414 .await 415 { 416 error!("Failed to update repo root: {:?}", e); 417 + return ( 418 + StatusCode::INTERNAL_SERVER_ERROR, 419 + Json(json!({"error": "InternalError"})), 420 + ) 421 + .into_response(); 422 + } 423 + let mut all_block_cids: Vec<Vec<u8>> = blocks.keys().map(|c| c.to_bytes()).collect(); 424 + all_block_cids.push(new_root_cid.to_bytes()); 425 + if let Err(e) = sqlx::query!( 426 + r#" 427 + INSERT INTO user_blocks (user_id, block_cid) 428 + SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 429 + ON CONFLICT (user_id, block_cid) DO NOTHING 430 + "#, 431 + user_id, 432 + &all_block_cids 433 + ) 434 + .execute(&state.db) 435 + .await 436 + { 437 + error!("Failed to insert user_blocks: {:?}", e); 438 return ( 439 StatusCode::INTERNAL_SERVER_ERROR, 440 Json(json!({"error": "InternalError"})),
+31 -7
src/api/repo/record/utils.rs
··· 173 .flatten() 174 .unwrap_or(false); 175 sqlx::query!( 176 - "UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", 177 new_root_cid.to_string(), 178 user_id 179 ) 180 .execute(&mut *tx) 181 .await 182 .map_err(|e| format!("DB Error (repos): {}", e))?; 183 let mut upsert_collections: Vec<String> = Vec::new(); 184 let mut upsert_rkeys: Vec<String> = Vec::new(); 185 let mut upsert_cids: Vec<String> = Vec::new(); ··· 492 } 493 494 pub async fn sequence_empty_commit_event(state: &AppState, did: &str) -> Result<i64, String> { 495 - let repo_root = sqlx::query_scalar!( 496 - "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1", 497 did 498 ) 499 .fetch_optional(&state.db) ··· 503 let ops = serde_json::json!([]); 504 let blobs: Vec<String> = vec![]; 505 let blocks_cids: Vec<String> = vec![]; 506 let seq_row = sqlx::query!( 507 r#" 508 - INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids) 509 - VALUES ($1, 'commit', $2, $2, $3, $4, $5) 510 RETURNING seq 511 "#, 512 did, 513 - repo_root, 514 ops, 515 &blobs, 516 - &blocks_cids 517 ) 518 .fetch_one(&state.db) 519 .await
··· 173 .flatten() 174 .unwrap_or(false); 175 sqlx::query!( 176 + "UPDATE repos SET repo_root_cid = $1, repo_rev = $2 WHERE user_id = $3", 177 new_root_cid.to_string(), 178 + &rev_str, 179 user_id 180 ) 181 .execute(&mut *tx) 182 .await 183 .map_err(|e| format!("DB Error (repos): {}", e))?; 184 + let mut all_block_cids: Vec<Vec<u8>> = blocks_cids 185 + .iter() 186 + .filter_map(|s| Cid::from_str(s).ok()) 187 + .map(|c| c.to_bytes()) 188 + .collect(); 189 + all_block_cids.push(new_root_cid.to_bytes()); 190 + if !all_block_cids.is_empty() { 191 + sqlx::query!( 192 + r#" 193 + INSERT INTO user_blocks (user_id, block_cid) 194 + SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 195 + ON CONFLICT (user_id, block_cid) DO NOTHING 196 + "#, 197 + user_id, 198 + &all_block_cids 199 + ) 200 + .execute(&mut *tx) 201 + .await 202 + .map_err(|e| format!("DB Error (user_blocks): {}", e))?; 203 + } 204 let mut upsert_collections: Vec<String> = Vec::new(); 205 let mut upsert_rkeys: Vec<String> = Vec::new(); 206 let mut upsert_cids: Vec<String> = Vec::new(); ··· 513 } 514 515 pub async fn sequence_empty_commit_event(state: &AppState, did: &str) -> Result<i64, String> { 516 + let repo_info = sqlx::query!( 517 + "SELECT r.repo_root_cid, r.repo_rev FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1", 518 did 519 ) 520 .fetch_optional(&state.db) ··· 524 let ops = serde_json::json!([]); 525 let blobs: Vec<String> = vec![]; 526 let blocks_cids: Vec<String> = vec![]; 527 + let prev_cid: Option<&str> = None; 528 let seq_row = sqlx::query!( 529 r#" 530 + INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, rev) 531 + VALUES ($1, 'commit', $2, $3::TEXT, $4, $5, $6, $7) 532 RETURNING seq 533 "#, 534 did, 535 + repo_info.repo_root_cid, 536 + prev_cid, 537 ops, 538 &blobs, 539 + &blocks_cids, 540 + repo_info.repo_rev 541 ) 542 .fetch_one(&state.db) 543 .await
+105 -19
src/api/server/account_status.rs
··· 83 _ => None, 84 }; 85 let repo_result = sqlx::query!( 86 - "SELECT repo_root_cid FROM repos WHERE user_id = $1", 87 user_id 88 ) 89 .fetch_optional(&state.db) 90 .await; 91 - let repo_commit = match repo_result { 92 - Ok(Some(row)) => row.repo_root_cid, 93 - _ => String::new(), 94 }; 95 let record_count: i64 = 96 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id) ··· 106 .await 107 .unwrap_or(Some(0)) 108 .unwrap_or(0); 109 - let valid_did = did.starts_with("did:"); 110 ( 111 StatusCode::OK, 112 Json(CheckAccountStatusOutput { 113 activated: deactivated_at.is_none(), 114 valid_did, 115 repo_commit: repo_commit.clone(), 116 - repo_rev: chrono::Utc::now().timestamp_millis().to_string(), 117 - repo_blocks: 0, 118 indexed_records: record_count, 119 private_state_values: 0, 120 expected_blobs: blob_count, ··· 124 .into_response() 125 } 126 127 async fn assert_valid_did_document_for_service( 128 db: &sqlx::PgPool, 129 did: &str, 130 ) -> Result<(), (StatusCode, Json<serde_json::Value>)> { 131 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 132 let expected_endpoint = format!("https://{}", hostname); ··· 134 if did.starts_with("did:plc:") { 135 let plc_client = PlcClient::new(None); 136 137 let mut last_error = None; 138 let mut doc_data = None; 139 - for attempt in 0..5 { 140 if attempt > 0 { 141 let delay_ms = 500 * (1 << (attempt - 1)); 142 info!( ··· 196 } 197 }; 198 199 let doc_signing_key = doc_data 200 .get("verificationMethods") 201 .and_then(|v| v.get("atproto")) ··· 378 did 379 ); 380 let did_validation_start = std::time::Instant::now(); 381 - if let Err((status, json)) = assert_valid_did_document_for_service(&state.db, &did).await { 382 info!( 383 "[MIGRATION] activateAccount: DID document validation FAILED for {} (took {:?})", 384 did, ··· 511 pub async fn deactivate_account( 512 State(state): State<AppState>, 513 headers: axum::http::HeaderMap, 514 - Json(_input): Json<DeactivateAccountInput>, 515 ) -> Response { 516 let extracted = match crate::auth::extract_auth_token_from_header( 517 headers.get("Authorization").and_then(|h| h.to_str().ok()), ··· 548 return e; 549 } 550 551 let did = auth_user.did; 552 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 553 .fetch_optional(&state.db) ··· 555 .ok() 556 .flatten(); 557 let result = sqlx::query!( 558 - "UPDATE users SET deactivated_at = NOW() WHERE did = $1", 559 - did 560 ) 561 .execute(&state.db) 562 .await; ··· 690 return ( 691 StatusCode::BAD_REQUEST, 692 Json(json!({"error": "InvalidRequest", "message": "password is required"})), 693 ) 694 .into_response(); 695 } ··· 842 ) 843 .into_response(); 844 } 845 - if let Err(e) = crate::api::repo::record::sequence_account_event( 846 &state, 847 did, 848 false, 849 Some("deleted"), 850 ) 851 - .await 852 - { 853 - warn!( 854 - "Failed to sequence account deletion event for {}: {}", 855 - did, e 856 - ); 857 } 858 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 859 info!("Account {} deleted successfully", did);
··· 83 _ => None, 84 }; 85 let repo_result = sqlx::query!( 86 + "SELECT repo_root_cid, repo_rev FROM repos WHERE user_id = $1", 87 user_id 88 ) 89 .fetch_optional(&state.db) 90 .await; 91 + let (repo_commit, repo_rev_from_db) = match repo_result { 92 + Ok(Some(row)) => (row.repo_root_cid, row.repo_rev), 93 + _ => (String::new(), None), 94 + }; 95 + let block_count: i64 = 96 + sqlx::query_scalar!("SELECT COUNT(*) FROM user_blocks WHERE user_id = $1", user_id) 97 + .fetch_one(&state.db) 98 + .await 99 + .unwrap_or(Some(0)) 100 + .unwrap_or(0); 101 + let repo_rev = if let Some(rev) = repo_rev_from_db { 102 + rev 103 + } else if !repo_commit.is_empty() { 104 + if let Ok(cid) = Cid::from_str(&repo_commit) { 105 + if let Ok(Some(block)) = state.block_store.get(&cid).await { 106 + Commit::from_cbor(&block) 107 + .ok() 108 + .map(|c| c.rev().to_string()) 109 + .unwrap_or_default() 110 + } else { 111 + String::new() 112 + } 113 + } else { 114 + String::new() 115 + } 116 + } else { 117 + String::new() 118 }; 119 let record_count: i64 = 120 sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id) ··· 130 .await 131 .unwrap_or(Some(0)) 132 .unwrap_or(0); 133 + let valid_did = is_valid_did_for_service(&state.db, &did).await; 134 ( 135 StatusCode::OK, 136 Json(CheckAccountStatusOutput { 137 activated: deactivated_at.is_none(), 138 valid_did, 139 repo_commit: repo_commit.clone(), 140 + repo_rev, 141 + repo_blocks: block_count as i64, 142 indexed_records: record_count, 143 private_state_values: 0, 144 expected_blobs: blob_count, ··· 148 .into_response() 149 } 150 151 + async fn is_valid_did_for_service(db: &sqlx::PgPool, did: &str) -> bool { 152 + assert_valid_did_document_for_service(db, did, false) 153 + .await 154 + .is_ok() 155 + } 156 + 157 async fn assert_valid_did_document_for_service( 158 db: &sqlx::PgPool, 159 did: &str, 160 + with_retry: bool, 161 ) -> Result<(), (StatusCode, Json<serde_json::Value>)> { 162 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 163 let expected_endpoint = format!("https://{}", hostname); ··· 165 if did.starts_with("did:plc:") { 166 let plc_client = PlcClient::new(None); 167 168 + let max_attempts = if with_retry { 5 } else { 1 }; 169 let mut last_error = None; 170 let mut doc_data = None; 171 + for attempt in 0..max_attempts { 172 if attempt > 0 { 173 let delay_ms = 500 * (1 << (attempt - 1)); 174 info!( ··· 228 } 229 }; 230 231 + let server_rotation_key = std::env::var("PLC_ROTATION_KEY").ok(); 232 + if let Some(ref expected_rotation_key) = server_rotation_key { 233 + let rotation_keys = doc_data 234 + .get("rotationKeys") 235 + .and_then(|v| v.as_array()) 236 + .map(|arr| { 237 + arr.iter() 238 + .filter_map(|k| k.as_str()) 239 + .collect::<Vec<_>>() 240 + }) 241 + .unwrap_or_default(); 242 + if !rotation_keys.contains(&expected_rotation_key.as_str()) { 243 + return Err(( 244 + StatusCode::BAD_REQUEST, 245 + Json(json!({ 246 + "error": "InvalidRequest", 247 + "message": "Server rotation key not included in PLC DID data" 248 + })), 249 + )); 250 + } 251 + } 252 + 253 let doc_signing_key = doc_data 254 .get("verificationMethods") 255 .and_then(|v| v.get("atproto")) ··· 432 did 433 ); 434 let did_validation_start = std::time::Instant::now(); 435 + if let Err((status, json)) = assert_valid_did_document_for_service(&state.db, &did, true).await { 436 info!( 437 "[MIGRATION] activateAccount: DID document validation FAILED for {} (took {:?})", 438 did, ··· 565 pub async fn deactivate_account( 566 State(state): State<AppState>, 567 headers: axum::http::HeaderMap, 568 + Json(input): Json<DeactivateAccountInput>, 569 ) -> Response { 570 let extracted = match crate::auth::extract_auth_token_from_header( 571 headers.get("Authorization").and_then(|h| h.to_str().ok()), ··· 602 return e; 603 } 604 605 + let delete_after: Option<chrono::DateTime<chrono::Utc>> = input 606 + .delete_after 607 + .as_ref() 608 + .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) 609 + .map(|dt| dt.with_timezone(&chrono::Utc)); 610 + 611 let did = auth_user.did; 612 let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) 613 .fetch_optional(&state.db) ··· 615 .ok() 616 .flatten(); 617 let result = sqlx::query!( 618 + "UPDATE users SET deactivated_at = NOW(), delete_after = $2 WHERE did = $1", 619 + did, 620 + delete_after 621 ) 622 .execute(&state.db) 623 .await; ··· 751 return ( 752 StatusCode::BAD_REQUEST, 753 Json(json!({"error": "InvalidRequest", "message": "password is required"})), 754 + ) 755 + .into_response(); 756 + } 757 + const OLD_PASSWORD_MAX_LENGTH: usize = 512; 758 + if password.len() > OLD_PASSWORD_MAX_LENGTH { 759 + return ( 760 + StatusCode::BAD_REQUEST, 761 + Json(json!({"error": "InvalidRequest", "message": "Invalid password length."})), 762 ) 763 .into_response(); 764 } ··· 911 ) 912 .into_response(); 913 } 914 + let account_seq = crate::api::repo::record::sequence_account_event( 915 &state, 916 did, 917 false, 918 Some("deleted"), 919 ) 920 + .await; 921 + match account_seq { 922 + Ok(seq) => { 923 + if let Err(e) = sqlx::query!( 924 + "DELETE FROM repo_seq WHERE did = $1 AND seq != $2", 925 + did, 926 + seq 927 + ) 928 + .execute(&state.db) 929 + .await 930 + { 931 + warn!( 932 + "Failed to cleanup sequences for deleted account {}: {}", 933 + did, e 934 + ); 935 + } 936 + } 937 + Err(e) => { 938 + warn!( 939 + "Failed to sequence account deletion event for {}: {}", 940 + did, e 941 + ); 942 + } 943 } 944 let _ = state.cache.delete(&format!("handle:{}", handle)).await; 945 info!("Account {} deleted successfully", did);
+10
src/api/server/meta.rs
··· 32 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED") 33 .map(|v| v == "true" || v == "1") 34 .unwrap_or(false); 35 Json(json!({ 36 "availableUserDomains": domains, 37 "inviteCodeRequired": invite_code_required, 38 "did": format!("did:web:{}", pds_hostname), 39 "version": env!("CARGO_PKG_VERSION"), 40 "availableCommsChannels": get_available_comms_channels() 41 }))
··· 32 let invite_code_required = std::env::var("INVITE_CODE_REQUIRED") 33 .map(|v| v == "true" || v == "1") 34 .unwrap_or(false); 35 + let privacy_policy = std::env::var("PRIVACY_POLICY_URL").ok(); 36 + let terms_of_service = std::env::var("TERMS_OF_SERVICE_URL").ok(); 37 + let contact_email = std::env::var("CONTACT_EMAIL").ok(); 38 Json(json!({ 39 "availableUserDomains": domains, 40 "inviteCodeRequired": invite_code_required, 41 "did": format!("did:web:{}", pds_hostname), 42 + "links": { 43 + "privacyPolicy": privacy_policy, 44 + "termsOfService": terms_of_service 45 + }, 46 + "contact": { 47 + "email": contact_email 48 + }, 49 "version": env!("CARGO_PKG_VERSION"), 50 "availableCommsChannels": get_available_comms_channels() 51 }))
+24 -2
src/api/server/passkey_account.rs
··· 612 } 613 }; 614 let commit_cid_str = commit_cid.to_string(); 615 if let Err(e) = sqlx::query!( 616 - "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)", 617 user_id, 618 - commit_cid_str 619 ) 620 .execute(&mut *tx) 621 .await 622 { 623 error!("Error inserting repo: {:?}", e); 624 return ( 625 StatusCode::INTERNAL_SERVER_ERROR, 626 Json(json!({"error": "InternalError"})),
··· 612 } 613 }; 614 let commit_cid_str = commit_cid.to_string(); 615 + let rev_str = rev.as_ref().to_string(); 616 if let Err(e) = sqlx::query!( 617 + "INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)", 618 user_id, 619 + commit_cid_str, 620 + rev_str 621 ) 622 .execute(&mut *tx) 623 .await 624 { 625 error!("Error inserting repo: {:?}", e); 626 + return ( 627 + StatusCode::INTERNAL_SERVER_ERROR, 628 + Json(json!({"error": "InternalError"})), 629 + ) 630 + .into_response(); 631 + } 632 + let genesis_block_cids = vec![mst_root.to_bytes(), commit_cid.to_bytes()]; 633 + if let Err(e) = sqlx::query!( 634 + r#" 635 + INSERT INTO user_blocks (user_id, block_cid) 636 + SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 637 + ON CONFLICT (user_id, block_cid) DO NOTHING 638 + "#, 639 + user_id, 640 + &genesis_block_cids 641 + ) 642 + .execute(&mut *tx) 643 + .await 644 + { 645 + error!("Error inserting user_blocks: {:?}", e); 646 return ( 647 StatusCode::INTERNAL_SERVER_ERROR, 648 Json(json!({"error": "InternalError"})),
+1
src/lib.rs
··· 15 pub mod plc; 16 pub mod rate_limit; 17 pub mod repo; 18 pub mod state; 19 pub mod storage; 20 pub mod sync;
··· 15 pub mod plc; 16 pub mod rate_limit; 17 pub mod repo; 18 + pub mod scheduled; 19 pub mod state; 20 pub mod storage; 21 pub mod sync;
+17 -1
src/main.rs
··· 5 use tracing::{error, info, warn}; 6 use tranquil_pds::comms::{CommsService, DiscordSender, EmailSender, SignalSender, TelegramSender}; 7 use tranquil_pds::crawlers::{Crawlers, start_crawlers_service}; 8 use tranquil_pds::state::AppState; 9 10 #[tokio::main] ··· 27 tranquil_pds::sync::listener::start_sequencer_listener(state.clone()).await; 28 29 let (shutdown_tx, shutdown_rx) = watch::channel(false); 30 31 let mut comms_service = CommsService::new(state.db.clone()); 32 ··· 63 Some(tokio::spawn(start_crawlers_service( 64 crawlers, 65 firehose_rx, 66 - shutdown_rx, 67 ))) 68 } else { 69 warn!("Crawlers notification service disabled (PDS_HOSTNAME or CRAWLERS not set)"); 70 None 71 }; 72 73 let app = tranquil_pds::app(state); 74 let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); ··· 87 if let Some(handle) = crawlers_handle { 88 handle.await.ok(); 89 } 90 91 if let Err(e) = server_result { 92 return Err(format!("Server error: {}", e).into());
··· 5 use tracing::{error, info, warn}; 6 use tranquil_pds::comms::{CommsService, DiscordSender, EmailSender, SignalSender, TelegramSender}; 7 use tranquil_pds::crawlers::{Crawlers, start_crawlers_service}; 8 + use tranquil_pds::scheduled::{backfill_repo_rev, backfill_user_blocks, start_scheduled_tasks}; 9 use tranquil_pds::state::AppState; 10 11 #[tokio::main] ··· 28 tranquil_pds::sync::listener::start_sequencer_listener(state.clone()).await; 29 30 let (shutdown_tx, shutdown_rx) = watch::channel(false); 31 + 32 + let backfill_db = state.db.clone(); 33 + let backfill_block_store = state.block_store.clone(); 34 + tokio::spawn(async move { 35 + backfill_repo_rev(&backfill_db, backfill_block_store.clone()).await; 36 + backfill_user_blocks(&backfill_db, backfill_block_store).await; 37 + }); 38 39 let mut comms_service = CommsService::new(state.db.clone()); 40 ··· 71 Some(tokio::spawn(start_crawlers_service( 72 crawlers, 73 firehose_rx, 74 + shutdown_rx.clone(), 75 ))) 76 } else { 77 warn!("Crawlers notification service disabled (PDS_HOSTNAME or CRAWLERS not set)"); 78 None 79 }; 80 + 81 + let scheduled_handle = tokio::spawn(start_scheduled_tasks( 82 + state.db.clone(), 83 + state.blob_store.clone(), 84 + shutdown_rx, 85 + )); 86 87 let app = tranquil_pds::app(state); 88 let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); ··· 101 if let Some(handle) = crawlers_handle { 102 handle.await.ok(); 103 } 104 + 105 + scheduled_handle.await.ok(); 106 107 if let Err(e) = server_result { 108 return Err(format!("Server error: {}", e).into());
+368
src/scheduled.rs
···
··· 1 + use cid::Cid; 2 + use jacquard_repo::commit::Commit; 3 + use jacquard_repo::storage::BlockStore; 4 + use ipld_core::ipld::Ipld; 5 + use sqlx::PgPool; 6 + use std::str::FromStr; 7 + use std::sync::Arc; 8 + use std::time::Duration; 9 + use tokio::sync::watch; 10 + use tokio::time::interval; 11 + use tracing::{debug, error, info, warn}; 12 + 13 + use crate::repo::PostgresBlockStore; 14 + use crate::storage::BlobStorage; 15 + 16 + pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) { 17 + let repos_missing_rev = match sqlx::query!( 18 + "SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL" 19 + ) 20 + .fetch_all(db) 21 + .await 22 + { 23 + Ok(rows) => rows, 24 + Err(e) => { 25 + error!("Failed to query repos for backfill: {}", e); 26 + return; 27 + } 28 + }; 29 + 30 + if repos_missing_rev.is_empty() { 31 + debug!("No repos need repo_rev backfill"); 32 + return; 33 + } 34 + 35 + info!( 36 + count = repos_missing_rev.len(), 37 + "Backfilling repo_rev for existing repos" 38 + ); 39 + 40 + let mut success = 0; 41 + let mut failed = 0; 42 + 43 + for repo in repos_missing_rev { 44 + let cid = match Cid::from_str(&repo.repo_root_cid) { 45 + Ok(c) => c, 46 + Err(_) => { 47 + failed += 1; 48 + continue; 49 + } 50 + }; 51 + 52 + let block = match block_store.get(&cid).await { 53 + Ok(Some(b)) => b, 54 + _ => { 55 + failed += 1; 56 + continue; 57 + } 58 + }; 59 + 60 + let commit = match Commit::from_cbor(&block) { 61 + Ok(c) => c, 62 + Err(_) => { 63 + failed += 1; 64 + continue; 65 + } 66 + }; 67 + 68 + let rev = commit.rev().to_string(); 69 + 70 + if let Err(e) = sqlx::query!( 71 + "UPDATE repos SET repo_rev = $1 WHERE user_id = $2", 72 + rev, 73 + repo.user_id 74 + ) 75 + .execute(db) 76 + .await 77 + { 78 + warn!(user_id = %repo.user_id, error = %e, "Failed to update repo_rev"); 79 + failed += 1; 80 + } else { 81 + success += 1; 82 + } 83 + } 84 + 85 + info!(success, failed, "Completed repo_rev backfill"); 86 + } 87 + 88 + pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) { 89 + let users_without_blocks = match sqlx::query!( 90 + r#" 91 + SELECT u.id as user_id, r.repo_root_cid 92 + FROM users u 93 + JOIN repos r ON r.user_id = u.id 94 + WHERE NOT EXISTS (SELECT 1 FROM user_blocks ub WHERE ub.user_id = u.id) 95 + "# 96 + ) 97 + .fetch_all(db) 98 + .await 99 + { 100 + Ok(rows) => rows, 101 + Err(e) => { 102 + error!("Failed to query users for user_blocks backfill: {}", e); 103 + return; 104 + } 105 + }; 106 + 107 + if users_without_blocks.is_empty() { 108 + debug!("No users need user_blocks backfill"); 109 + return; 110 + } 111 + 112 + info!( 113 + count = users_without_blocks.len(), 114 + "Backfilling user_blocks for existing repos" 115 + ); 116 + 117 + let mut success = 0; 118 + let mut failed = 0; 119 + 120 + for user in users_without_blocks { 121 + let root_cid = match Cid::from_str(&user.repo_root_cid) { 122 + Ok(c) => c, 123 + Err(_) => { 124 + failed += 1; 125 + continue; 126 + } 127 + }; 128 + 129 + let mut block_cids: Vec<Vec<u8>> = Vec::new(); 130 + let mut to_visit = vec![root_cid]; 131 + let mut visited = std::collections::HashSet::new(); 132 + 133 + while let Some(cid) = to_visit.pop() { 134 + if visited.contains(&cid) { 135 + continue; 136 + } 137 + visited.insert(cid); 138 + block_cids.push(cid.to_bytes()); 139 + 140 + let block = match block_store.get(&cid).await { 141 + Ok(Some(b)) => b, 142 + _ => continue, 143 + }; 144 + 145 + if let Ok(commit) = Commit::from_cbor(&block) { 146 + to_visit.push(commit.data); 147 + if let Some(prev) = commit.prev { 148 + to_visit.push(prev); 149 + } 150 + } else if let Ok(ipld) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) { 151 + if let Ipld::Map(ref obj) = ipld { 152 + if let Some(Ipld::Link(left_cid)) = obj.get("l") { 153 + to_visit.push(*left_cid); 154 + } 155 + if let Some(Ipld::List(entries)) = obj.get("e") { 156 + for entry in entries { 157 + if let Ipld::Map(entry_obj) = entry { 158 + if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") { 159 + to_visit.push(*tree_cid); 160 + } 161 + if let Some(Ipld::Link(val_cid)) = entry_obj.get("v") { 162 + to_visit.push(*val_cid); 163 + } 164 + } 165 + } 166 + } 167 + } 168 + } 169 + } 170 + 171 + if block_cids.is_empty() { 172 + failed += 1; 173 + continue; 174 + } 175 + 176 + if let Err(e) = sqlx::query!( 177 + r#" 178 + INSERT INTO user_blocks (user_id, block_cid) 179 + SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 180 + ON CONFLICT (user_id, block_cid) DO NOTHING 181 + "#, 182 + user.user_id, 183 + &block_cids 184 + ) 185 + .execute(db) 186 + .await 187 + { 188 + warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks"); 189 + failed += 1; 190 + } else { 191 + info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks"); 192 + success += 1; 193 + } 194 + } 195 + 196 + info!(success, failed, "Completed user_blocks backfill"); 197 + } 198 + 199 + pub async fn start_scheduled_tasks( 200 + db: PgPool, 201 + blob_store: Arc<dyn BlobStorage>, 202 + mut shutdown_rx: watch::Receiver<bool>, 203 + ) { 204 + let check_interval = Duration::from_secs( 205 + std::env::var("SCHEDULED_DELETE_CHECK_INTERVAL_SECS") 206 + .ok() 207 + .and_then(|s| s.parse().ok()) 208 + .unwrap_or(3600), 209 + ); 210 + 211 + info!( 212 + check_interval_secs = check_interval.as_secs(), 213 + "Starting scheduled tasks service" 214 + ); 215 + 216 + let mut ticker = interval(check_interval); 217 + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); 218 + 219 + loop { 220 + tokio::select! { 221 + _ = shutdown_rx.changed() => { 222 + if *shutdown_rx.borrow() { 223 + info!("Scheduled tasks service shutting down"); 224 + break; 225 + } 226 + } 227 + _ = ticker.tick() => { 228 + if let Err(e) = process_scheduled_deletions(&db, &blob_store).await { 229 + error!("Error processing scheduled deletions: {}", e); 230 + } 231 + } 232 + } 233 + } 234 + } 235 + 236 + async fn process_scheduled_deletions( 237 + db: &PgPool, 238 + blob_store: &Arc<dyn BlobStorage>, 239 + ) -> Result<(), String> { 240 + let accounts_to_delete = sqlx::query!( 241 + r#" 242 + SELECT did, handle 243 + FROM users 244 + WHERE delete_after IS NOT NULL 245 + AND delete_after < NOW() 246 + AND deactivated_at IS NOT NULL 247 + LIMIT 100 248 + "# 249 + ) 250 + .fetch_all(db) 251 + .await 252 + .map_err(|e| format!("DB error fetching accounts to delete: {}", e))?; 253 + 254 + if accounts_to_delete.is_empty() { 255 + debug!("No accounts scheduled for deletion"); 256 + return Ok(()); 257 + } 258 + 259 + info!( 260 + count = accounts_to_delete.len(), 261 + "Processing scheduled account deletions" 262 + ); 263 + 264 + for account in accounts_to_delete { 265 + if let Err(e) = delete_account_data(db, blob_store, &account.did, &account.handle).await { 266 + warn!( 267 + did = %account.did, 268 + handle = %account.handle, 269 + error = %e, 270 + "Failed to delete scheduled account" 271 + ); 272 + } else { 273 + info!( 274 + did = %account.did, 275 + handle = %account.handle, 276 + "Successfully deleted scheduled account" 277 + ); 278 + } 279 + } 280 + 281 + Ok(()) 282 + } 283 + 284 + async fn delete_account_data( 285 + db: &PgPool, 286 + blob_store: &Arc<dyn BlobStorage>, 287 + did: &str, 288 + _handle: &str, 289 + ) -> Result<(), String> { 290 + let user_id: uuid::Uuid = sqlx::query_scalar!( 291 + "SELECT id FROM users WHERE did = $1", 292 + did 293 + ) 294 + .fetch_one(db) 295 + .await 296 + .map_err(|e| format!("DB error fetching user: {}", e))?; 297 + 298 + let blob_storage_keys: Vec<String> = sqlx::query_scalar!( 299 + r#"SELECT storage_key as "storage_key!" FROM blobs WHERE created_by_user = $1"#, 300 + user_id 301 + ) 302 + .fetch_all(db) 303 + .await 304 + .map_err(|e| format!("DB error fetching blob keys: {}", e))?; 305 + 306 + for storage_key in &blob_storage_keys { 307 + if let Err(e) = blob_store.delete(storage_key).await { 308 + warn!( 309 + storage_key = %storage_key, 310 + error = %e, 311 + "Failed to delete blob from storage (continuing anyway)" 312 + ); 313 + } 314 + } 315 + 316 + let mut tx = db 317 + .begin() 318 + .await 319 + .map_err(|e| format!("Failed to begin transaction: {}", e))?; 320 + 321 + sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id) 322 + .execute(&mut *tx) 323 + .await 324 + .map_err(|e| format!("Failed to delete blobs: {}", e))?; 325 + 326 + sqlx::query!("DELETE FROM users WHERE id = $1", user_id) 327 + .execute(&mut *tx) 328 + .await 329 + .map_err(|e| format!("Failed to delete user: {}", e))?; 330 + 331 + let account_seq = sqlx::query_scalar!( 332 + r#" 333 + INSERT INTO repo_seq (did, event_type, active, status) 334 + VALUES ($1, 'account', false, 'deleted') 335 + RETURNING seq 336 + "#, 337 + did 338 + ) 339 + .fetch_one(&mut *tx) 340 + .await 341 + .map_err(|e| format!("Failed to sequence account deletion: {}", e))?; 342 + 343 + sqlx::query!( 344 + "DELETE FROM repo_seq WHERE did = $1 AND seq != $2", 345 + did, 346 + account_seq 347 + ) 348 + .execute(&mut *tx) 349 + .await 350 + .map_err(|e| format!("Failed to cleanup sequences: {}", e))?; 351 + 352 + tx.commit() 353 + .await 354 + .map_err(|e| format!("Failed to commit transaction: {}", e))?; 355 + 356 + sqlx::query(&format!("NOTIFY repo_updates, '{}'", account_seq)) 357 + .execute(db) 358 + .await 359 + .map_err(|e| format!("Failed to notify: {}", e))?; 360 + 361 + info!( 362 + did = %did, 363 + blob_count = blob_storage_keys.len(), 364 + "Deleted account data including blobs from storage" 365 + ); 366 + 367 + Ok(()) 368 + }
+5 -2
src/sync/frame.rs
··· 101 pub ops_json: serde_json::Value, 102 pub blobs: Vec<String>, 103 pub time: chrono::DateTime<chrono::Utc>, 104 } 105 106 impl CommitFrameBuilder { ··· 122 .iter() 123 .filter_map(|s| Cid::from_str(s).ok()) 124 .collect(); 125 - let rev = placeholder_rev(); 126 Ok(CommitFrame { 127 seq: self.seq, 128 rebase: false, ··· 130 repo: self.did, 131 commit: commit_cid, 132 rev, 133 - since: self.prev_cid_str.as_ref().map(|_| placeholder_rev()), 134 blocks: Vec::new(), 135 ops, 136 blobs, ··· 161 ops_json: event.ops.unwrap_or_default(), 162 blobs: event.blobs.unwrap_or_default(), 163 time: event.created_at, 164 }; 165 builder.build() 166 }
··· 101 pub ops_json: serde_json::Value, 102 pub blobs: Vec<String>, 103 pub time: chrono::DateTime<chrono::Utc>, 104 + pub rev: Option<String>, 105 } 106 107 impl CommitFrameBuilder { ··· 123 .iter() 124 .filter_map(|s| Cid::from_str(s).ok()) 125 .collect(); 126 + let rev = self.rev.unwrap_or_else(placeholder_rev); 127 + let since = self.prev_cid_str.as_ref().map(|_| rev.clone()); 128 Ok(CommitFrame { 129 seq: self.seq, 130 rebase: false, ··· 132 repo: self.did, 133 commit: commit_cid, 134 rev, 135 + since, 136 blocks: Vec::new(), 137 ops, 138 blobs, ··· 163 ops_json: event.ops.unwrap_or_default(), 164 blobs: event.blobs.unwrap_or_default(), 165 time: event.created_at, 166 + rev: event.rev, 167 }; 168 builder.build() 169 }
+279
tests/account_lifecycle.rs
···
··· 1 + mod common; 2 + mod helpers; 3 + use common::*; 4 + use reqwest::StatusCode; 5 + use serde_json::{Value, json}; 6 + 7 + #[tokio::test] 8 + async fn test_check_account_status_returns_correct_block_count() { 9 + let client = client(); 10 + let base = base_url().await; 11 + let (access_jwt, did) = create_account_and_login(&client).await; 12 + 13 + let status1 = client 14 + .get(format!("{}/xrpc/com.atproto.server.checkAccountStatus", base)) 15 + .bearer_auth(&access_jwt) 16 + .send() 17 + .await 18 + .unwrap(); 19 + assert_eq!(status1.status(), StatusCode::OK); 20 + let body1: Value = status1.json().await.unwrap(); 21 + let initial_blocks = body1["repoBlocks"].as_i64().unwrap(); 22 + assert!(initial_blocks >= 2, "New account should have at least 2 blocks (commit + empty MST)"); 23 + 24 + let create_res = client 25 + .post(format!("{}/xrpc/com.atproto.repo.createRecord", base)) 26 + .bearer_auth(&access_jwt) 27 + .json(&json!({ 28 + "repo": did, 29 + "collection": "app.bsky.feed.post", 30 + "record": { 31 + "$type": "app.bsky.feed.post", 32 + "text": "Test post for block counting", 33 + "createdAt": chrono::Utc::now().to_rfc3339() 34 + } 35 + })) 36 + .send() 37 + .await 38 + .unwrap(); 39 + assert_eq!(create_res.status(), StatusCode::OK); 40 + let create_body: Value = create_res.json().await.unwrap(); 41 + let rkey = create_body["uri"].as_str().unwrap().split('/').last().unwrap().to_string(); 42 + 43 + let status2 = client 44 + .get(format!("{}/xrpc/com.atproto.server.checkAccountStatus", base)) 45 + .bearer_auth(&access_jwt) 46 + .send() 47 + .await 48 + .unwrap(); 49 + let body2: Value = status2.json().await.unwrap(); 50 + let after_create_blocks = body2["repoBlocks"].as_i64().unwrap(); 51 + assert!(after_create_blocks > initial_blocks, "Block count should increase after creating a record"); 52 + 53 + let delete_res = client 54 + .post(format!("{}/xrpc/com.atproto.repo.deleteRecord", base)) 55 + .bearer_auth(&access_jwt) 56 + .json(&json!({ 57 + "repo": did, 58 + "collection": "app.bsky.feed.post", 59 + "rkey": rkey 60 + })) 61 + .send() 62 + .await 63 + .unwrap(); 64 + assert_eq!(delete_res.status(), StatusCode::OK); 65 + 66 + let status3 = client 67 + .get(format!("{}/xrpc/com.atproto.server.checkAccountStatus", base)) 68 + .bearer_auth(&access_jwt) 69 + .send() 70 + .await 71 + .unwrap(); 72 + let body3: Value = status3.json().await.unwrap(); 73 + let after_delete_blocks = body3["repoBlocks"].as_i64().unwrap(); 74 + assert!( 75 + after_delete_blocks >= after_create_blocks, 76 + "Block count should not decrease after deleting a record (was {}, now {})", 77 + after_create_blocks, 78 + after_delete_blocks 79 + ); 80 + } 81 + 82 + #[tokio::test] 83 + async fn test_check_account_status_returns_valid_repo_rev() { 84 + let client = client(); 85 + let base = base_url().await; 86 + let (access_jwt, _) = create_account_and_login(&client).await; 87 + 88 + let status = client 89 + .get(format!("{}/xrpc/com.atproto.server.checkAccountStatus", base)) 90 + .bearer_auth(&access_jwt) 91 + .send() 92 + .await 93 + .unwrap(); 94 + assert_eq!(status.status(), StatusCode::OK); 95 + let body: Value = status.json().await.unwrap(); 96 + 97 + let repo_rev = body["repoRev"].as_str().unwrap(); 98 + assert!(!repo_rev.is_empty(), "repoRev should not be empty"); 99 + assert!(repo_rev.chars().all(|c| c.is_alphanumeric()), "repoRev should be alphanumeric TID"); 100 + } 101 + 102 + #[tokio::test] 103 + async fn test_check_account_status_valid_did_is_true_for_active_account() { 104 + let client = client(); 105 + let base = base_url().await; 106 + let (access_jwt, _) = create_account_and_login(&client).await; 107 + 108 + let status = client 109 + .get(format!("{}/xrpc/com.atproto.server.checkAccountStatus", base)) 110 + .bearer_auth(&access_jwt) 111 + .send() 112 + .await 113 + .unwrap(); 114 + assert_eq!(status.status(), StatusCode::OK); 115 + let body: Value = status.json().await.unwrap(); 116 + 117 + assert_eq!(body["validDid"], true, "validDid should be true for active account with correct DID document"); 118 + assert_eq!(body["activated"], true, "activated should be true for active account"); 119 + } 120 + 121 + #[tokio::test] 122 + async fn test_deactivate_account_with_delete_after() { 123 + let client = client(); 124 + let base = base_url().await; 125 + let (access_jwt, _) = create_account_and_login(&client).await; 126 + 127 + let future_time = chrono::Utc::now() + chrono::Duration::hours(24); 128 + let delete_after = future_time.to_rfc3339(); 129 + 130 + let deactivate = client 131 + .post(format!("{}/xrpc/com.atproto.server.deactivateAccount", base)) 132 + .bearer_auth(&access_jwt) 133 + .json(&json!({ 134 + "deleteAfter": delete_after 135 + })) 136 + .send() 137 + .await 138 + .unwrap(); 139 + assert_eq!(deactivate.status(), StatusCode::OK); 140 + 141 + let status = client 142 + .get(format!("{}/xrpc/com.atproto.server.checkAccountStatus", base)) 143 + .bearer_auth(&access_jwt) 144 + .send() 145 + .await 146 + .unwrap(); 147 + assert_eq!(status.status(), StatusCode::OK); 148 + let body: Value = status.json().await.unwrap(); 149 + assert_eq!(body["activated"], false, "Account should be deactivated"); 150 + } 151 + 152 + #[tokio::test] 153 + async fn test_create_account_returns_did_doc() { 154 + let client = client(); 155 + let base = base_url().await; 156 + 157 + let handle = format!("diddoctest-{}", uuid::Uuid::new_v4()); 158 + let payload = json!({ 159 + "handle": handle, 160 + "email": format!("{}@example.com", handle), 161 + "password": "Testpass123!" 162 + }); 163 + 164 + let create_res = client 165 + .post(format!("{}/xrpc/com.atproto.server.createAccount", base)) 166 + .json(&payload) 167 + .send() 168 + .await 169 + .unwrap(); 170 + assert_eq!(create_res.status(), StatusCode::OK); 171 + let body: Value = create_res.json().await.unwrap(); 172 + 173 + assert!(body["accessJwt"].is_string(), "accessJwt should always be returned"); 174 + assert!(body["refreshJwt"].is_string(), "refreshJwt should always be returned"); 175 + assert!(body["did"].is_string(), "did should be returned"); 176 + 177 + if body["didDoc"].is_object() { 178 + let did_doc = &body["didDoc"]; 179 + assert!(did_doc["id"].is_string(), "didDoc should have id field"); 180 + } 181 + } 182 + 183 + #[tokio::test] 184 + async fn test_create_account_always_returns_tokens() { 185 + let client = client(); 186 + let base = base_url().await; 187 + 188 + let handle = format!("tokentest-{}", uuid::Uuid::new_v4()); 189 + let payload = json!({ 190 + "handle": handle, 191 + "email": format!("{}@example.com", handle), 192 + "password": "Testpass123!" 193 + }); 194 + 195 + let create_res = client 196 + .post(format!("{}/xrpc/com.atproto.server.createAccount", base)) 197 + .json(&payload) 198 + .send() 199 + .await 200 + .unwrap(); 201 + assert_eq!(create_res.status(), StatusCode::OK); 202 + let body: Value = create_res.json().await.unwrap(); 203 + 204 + let access_jwt = body["accessJwt"].as_str().expect("accessJwt should be present"); 205 + let refresh_jwt = body["refreshJwt"].as_str().expect("refreshJwt should be present"); 206 + 207 + assert!(!access_jwt.is_empty(), "accessJwt should not be empty"); 208 + assert!(!refresh_jwt.is_empty(), "refreshJwt should not be empty"); 209 + 210 + let parts: Vec<&str> = access_jwt.split('.').collect(); 211 + assert_eq!(parts.len(), 3, "accessJwt should be a valid JWT with 3 parts"); 212 + } 213 + 214 + #[tokio::test] 215 + async fn test_describe_server_has_links_and_contact() { 216 + let client = client(); 217 + let base = base_url().await; 218 + 219 + let describe = client 220 + .get(format!("{}/xrpc/com.atproto.server.describeServer", base)) 221 + .send() 222 + .await 223 + .unwrap(); 224 + assert_eq!(describe.status(), StatusCode::OK); 225 + let body: Value = describe.json().await.unwrap(); 226 + 227 + assert!(body.get("links").is_some(), "describeServer should include links object"); 228 + assert!(body.get("contact").is_some(), "describeServer should include contact object"); 229 + 230 + let links = &body["links"]; 231 + assert!(links.get("privacyPolicy").is_some() || links["privacyPolicy"].is_null(), 232 + "links should have privacyPolicy field (can be null)"); 233 + assert!(links.get("termsOfService").is_some() || links["termsOfService"].is_null(), 234 + "links should have termsOfService field (can be null)"); 235 + 236 + let contact = &body["contact"]; 237 + assert!(contact.get("email").is_some() || contact["email"].is_null(), 238 + "contact should have email field (can be null)"); 239 + } 240 + 241 + #[tokio::test] 242 + async fn test_delete_account_password_max_length() { 243 + let client = client(); 244 + let base = base_url().await; 245 + 246 + let handle = format!("pwdlentest-{}", uuid::Uuid::new_v4()); 247 + let payload = json!({ 248 + "handle": handle, 249 + "email": format!("{}@example.com", handle), 250 + "password": "Testpass123!" 251 + }); 252 + 253 + let create_res = client 254 + .post(format!("{}/xrpc/com.atproto.server.createAccount", base)) 255 + .json(&payload) 256 + .send() 257 + .await 258 + .unwrap(); 259 + assert_eq!(create_res.status(), StatusCode::OK); 260 + let body: Value = create_res.json().await.unwrap(); 261 + let did = body["did"].as_str().unwrap(); 262 + 263 + let too_long_password = "a".repeat(600); 264 + let delete_res = client 265 + .post(format!("{}/xrpc/com.atproto.server.deleteAccount", base)) 266 + .json(&json!({ 267 + "did": did, 268 + "password": too_long_password, 269 + "token": "fake-token" 270 + })) 271 + .send() 272 + .await 273 + .unwrap(); 274 + 275 + assert_eq!(delete_res.status(), StatusCode::BAD_REQUEST); 276 + let error_body: Value = delete_res.json().await.unwrap(); 277 + assert!(error_body["message"].as_str().unwrap().contains("password length") 278 + || error_body["error"].as_str().unwrap() == "InvalidRequest"); 279 + }
+4 -1
tests/common/mod.rs
··· 466 .await 467 .expect("Failed to mark user as admin"); 468 } 469 if let Some(access_jwt) = body["accessJwt"].as_str() { 470 - return (access_jwt.to_string(), did); 471 } 472 let body_text: String = sqlx::query_scalar!( 473 "SELECT body FROM comms_queue WHERE user_id = (SELECT id FROM users WHERE did = $1) AND comms_type = 'email_verification' ORDER BY created_at DESC LIMIT 1",
··· 466 .await 467 .expect("Failed to mark user as admin"); 468 } 469 + let verification_required = body["verificationRequired"].as_bool().unwrap_or(true); 470 if let Some(access_jwt) = body["accessJwt"].as_str() { 471 + if !verification_required { 472 + return (access_jwt.to_string(), did); 473 + } 474 } 475 let body_text: String = sqlx::query_scalar!( 476 "SELECT body FROM comms_queue WHERE user_id = (SELECT id FROM users WHERE did = $1) AND comms_type = 'email_verification' ORDER BY created_at DESC LIMIT 1",
+2 -1
tests/sync_blob.rs
··· 8 async fn test_list_blobs_success() { 9 let client = client(); 10 let (access_jwt, did) = create_account_and_login(&client).await; 11 let blob_res = client 12 .post(format!( 13 "{}/xrpc/com.atproto.repo.uploadBlob", ··· 15 )) 16 .header(header::CONTENT_TYPE, "text/plain") 17 .bearer_auth(&access_jwt) 18 - .body("test blob content") 19 .send() 20 .await 21 .expect("Failed to upload blob");
··· 8 async fn test_list_blobs_success() { 9 let client = client(); 10 let (access_jwt, did) = create_account_and_login(&client).await; 11 + let unique_content = format!("test blob content {}", uuid::Uuid::new_v4()); 12 let blob_res = client 13 .post(format!( 14 "{}/xrpc/com.atproto.repo.uploadBlob", ··· 16 )) 17 .header(header::CONTENT_TYPE, "text/plain") 18 .bearer_auth(&access_jwt) 19 + .body(unique_content) 20 .send() 21 .await 22 .expect("Failed to upload blob");
+1 -1
tests/sync_conformance.rs
··· 162 163 let res = client 164 .get(format!( 165 - "{}/xrpc/com.atproto.sync.listRepos", 166 base_url().await 167 )) 168 .send()
··· 162 163 let res = client 164 .get(format!( 165 + "{}/xrpc/com.atproto.sync.listRepos?limit=1000", 166 base_url().await 167 )) 168 .send()
+4 -3
tests/sync_repo.rs
··· 552 tokio::time::sleep(std::time::Duration::from_millis(100)).await; 553 create_post(&client, &did, &jwt, &format!("Export test post {}", i)).await; 554 } 555 - let blob_data = b"blob data for sync export test"; 556 let upload_res = client 557 .post(format!( 558 "{}/xrpc/com.atproto.repo.uploadBlob", ··· 560 )) 561 .header(header::CONTENT_TYPE, "application/octet-stream") 562 .bearer_auth(&jwt) 563 - .body(blob_data.to_vec()) 564 .send() 565 .await 566 .expect("Failed to upload blob"); ··· 631 let retrieved_blob = get_blob_res.bytes().await.unwrap(); 632 assert_eq!( 633 retrieved_blob.as_ref(), 634 - blob_data, 635 "Retrieved blob should match uploaded data" 636 ); 637 let latest_commit_res = client
··· 552 tokio::time::sleep(std::time::Duration::from_millis(100)).await; 553 create_post(&client, &did, &jwt, &format!("Export test post {}", i)).await; 554 } 555 + let blob_data = format!("blob data for sync export test {}", uuid::Uuid::new_v4()); 556 + let blob_bytes = blob_data.as_bytes().to_vec(); 557 let upload_res = client 558 .post(format!( 559 "{}/xrpc/com.atproto.repo.uploadBlob", ··· 561 )) 562 .header(header::CONTENT_TYPE, "application/octet-stream") 563 .bearer_auth(&jwt) 564 + .body(blob_bytes.clone()) 565 .send() 566 .await 567 .expect("Failed to upload blob"); ··· 632 let retrieved_blob = get_blob_res.bytes().await.unwrap(); 633 assert_eq!( 634 retrieved_blob.as_ref(), 635 + blob_bytes.as_slice(), 636 "Retrieved blob should match uploaded data" 637 ); 638 let latest_commit_res = client