this repo has no description

Compare changes

Choose any two refs to compare.

Changed files
+919 -1444
.sqlx
crates
-22
.sqlx/query-05fd99170e31e68fa5028c862417cdf535cd70e09fde0a8a28249df0070eb2fc.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "SELECT t.token FROM plc_operation_tokens t JOIN users u ON t.user_id = u.id WHERE u.did = $1", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "token", 9 - "type_info": "Text" 10 - } 11 - ], 12 - "parameters": { 13 - "Left": [ 14 - "Text" 15 - ] 16 - }, 17 - "nullable": [ 18 - false 19 - ] 20 - }, 21 - "hash": "05fd99170e31e68fa5028c862417cdf535cd70e09fde0a8a28249df0070eb2fc" 22 - }
-15
.sqlx/query-0710b57fb9aa933525f617b15e6e2e5feaa9c59c38ec9175568abdacda167107.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "UPDATE users SET deactivated_at = $1 WHERE did = $2", 4 - "describe": { 5 - "columns": [], 6 - "parameters": { 7 - "Left": [ 8 - "Timestamptz", 9 - "Text" 10 - ] 11 - }, 12 - "nullable": [] 13 - }, 14 - "hash": "0710b57fb9aa933525f617b15e6e2e5feaa9c59c38ec9175568abdacda167107" 15 - }
+15
.sqlx/query-0c5ef3ffbd4d540dbd4ea993ea4af292977d35e0aed9bcc887b394f04468e2d7.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n INSERT INTO backup_codes (did, code_hash, created_at)\n SELECT $1, hash, NOW() FROM UNNEST($2::text[]) AS t(hash)\n ", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "Text", 9 + "TextArray" 10 + ] 11 + }, 12 + "nullable": [] 13 + }, 14 + "hash": "0c5ef3ffbd4d540dbd4ea993ea4af292977d35e0aed9bcc887b394f04468e2d7" 15 + }
-22
.sqlx/query-0ec60bb854a4991d0d7249a68f7445b65c8cc8c723baca221d85f5e4f2478b99.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "SELECT body FROM comms_queue WHERE user_id = (SELECT id FROM users WHERE did = $1) AND comms_type = 'email_update' ORDER BY created_at DESC LIMIT 1", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "body", 9 - "type_info": "Text" 10 - } 11 - ], 12 - "parameters": { 13 - "Left": [ 14 - "Text" 15 - ] 16 - }, 17 - "nullable": [ 18 - false 19 - ] 20 - }, 21 - "hash": "0ec60bb854a4991d0d7249a68f7445b65c8cc8c723baca221d85f5e4f2478b99" 22 - }
+16
.sqlx/query-2232b75368a91a61256976ddb659523f041b3faa3075cc61c850c1f31f7c4d78.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n INSERT INTO record_blobs (repo_id, record_uri, blob_cid)\n SELECT $1, * FROM UNNEST($2::text[], $3::text[])\n ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING\n ", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "Uuid", 9 + "TextArray", 10 + "TextArray" 11 + ] 12 + }, 13 + "nullable": [] 14 + }, 15 + "hash": "2232b75368a91a61256976ddb659523f041b3faa3075cc61c850c1f31f7c4d78" 16 + }
-22
.sqlx/query-24a7686c535e4f0332f45daa20cfce2209635090252ac3692823450431d03dc6.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "SELECT COUNT(*) FROM comms_queue WHERE status = 'pending' AND user_id = $1", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "count", 9 - "type_info": "Int8" 10 - } 11 - ], 12 - "parameters": { 13 - "Left": [ 14 - "Uuid" 15 - ] 16 - }, 17 - "nullable": [ 18 - null 19 - ] 20 - }, 21 - "hash": "24a7686c535e4f0332f45daa20cfce2209635090252ac3692823450431d03dc6" 22 - }
-16
.sqlx/query-297e5495004fa601f86b3ada9e512815d4b7d73aacf3f3654628c93e5db8b791.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "\n INSERT INTO record_blobs (repo_id, record_uri, blob_cid)\n VALUES ($1, $2, $3)\n ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING\n ", 4 - "describe": { 5 - "columns": [], 6 - "parameters": { 7 - "Left": [ 8 - "Uuid", 9 - "Text", 10 - "Text" 11 - ] 12 - }, 13 - "nullable": [] 14 - }, 15 - "hash": "297e5495004fa601f86b3ada9e512815d4b7d73aacf3f3654628c93e5db8b791" 16 - }
-14
.sqlx/query-29ef76852bb89af1ab9e679ceaa4abcf8bc8268a348d3be0da9840d1708d20b5.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "UPDATE users SET password_reset_code_expires_at = NOW() - INTERVAL '1 hour' WHERE email = $1", 4 - "describe": { 5 - "columns": [], 6 - "parameters": { 7 - "Left": [ 8 - "Text" 9 - ] 10 - }, 11 - "nullable": [] 12 - }, 13 - "hash": "29ef76852bb89af1ab9e679ceaa4abcf8bc8268a348d3be0da9840d1708d20b5" 14 - }
-18
.sqlx/query-2f5fb86d249903ea40240658b4f8fd5a8d96120e92d791ff446b441f9222f00f.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "\n UPDATE oauth_token\n SET token_id = $2, current_refresh_token = $3, expires_at = $4, updated_at = NOW(),\n previous_refresh_token = $5, rotated_at = NOW()\n WHERE id = $1\n ", 4 - "describe": { 5 - "columns": [], 6 - "parameters": { 7 - "Left": [ 8 - "Int4", 9 - "Text", 10 - "Text", 11 - "Timestamptz", 12 - "Text" 13 - ] 14 - }, 15 - "nullable": [] 16 - }, 17 - "hash": "2f5fb86d249903ea40240658b4f8fd5a8d96120e92d791ff446b441f9222f00f" 18 - }
-54
.sqlx/query-4445cc86cdf04894b340e67661b79a3c411917144a011f50849b737130b24dbe.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "SELECT subject, body, comms_type as \"comms_type: String\" FROM comms_queue WHERE user_id = $1 AND comms_type = 'admin_email' ORDER BY created_at DESC LIMIT 1", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "subject", 9 - "type_info": "Text" 10 - }, 11 - { 12 - "ordinal": 1, 13 - "name": "body", 14 - "type_info": "Text" 15 - }, 16 - { 17 - "ordinal": 2, 18 - "name": "comms_type: String", 19 - "type_info": { 20 - "Custom": { 21 - "name": "comms_type", 22 - "kind": { 23 - "Enum": [ 24 - "welcome", 25 - "email_verification", 26 - "password_reset", 27 - "email_update", 28 - "account_deletion", 29 - "admin_email", 30 - "plc_operation", 31 - "two_factor_code", 32 - "channel_verification", 33 - "passkey_recovery", 34 - "legacy_login_alert", 35 - "migration_verification" 36 - ] 37 - } 38 - } 39 - } 40 - } 41 - ], 42 - "parameters": { 43 - "Left": [ 44 - "Uuid" 45 - ] 46 - }, 47 - "nullable": [ 48 - true, 49 - false, 50 - false 51 - ] 52 - }, 53 - "hash": "4445cc86cdf04894b340e67661b79a3c411917144a011f50849b737130b24dbe" 54 - }
-22
.sqlx/query-4560c237741ce9d4166aecd669770b3360a3ac71e649b293efb88d92c3254068.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "SELECT id FROM users WHERE email = $1", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "id", 9 - "type_info": "Uuid" 10 - } 11 - ], 12 - "parameters": { 13 - "Left": [ 14 - "Text" 15 - ] 16 - }, 17 - "nullable": [ 18 - false 19 - ] 20 - }, 21 - "hash": "4560c237741ce9d4166aecd669770b3360a3ac71e649b293efb88d92c3254068" 22 - }
-28
.sqlx/query-4649e8daefaf4cfefc5cb2de8b3813f13f5892f653128469be727b686e6a0f0a.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "SELECT body, metadata FROM comms_queue WHERE user_id = $1 AND comms_type = 'channel_verification' ORDER BY created_at DESC LIMIT 1", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "body", 9 - "type_info": "Text" 10 - }, 11 - { 12 - "ordinal": 1, 13 - "name": "metadata", 14 - "type_info": "Jsonb" 15 - } 16 - ], 17 - "parameters": { 18 - "Left": [ 19 - "Uuid" 20 - ] 21 - }, 22 - "nullable": [ 23 - false, 24 - true 25 - ] 26 - }, 27 - "hash": "4649e8daefaf4cfefc5cb2de8b3813f13f5892f653128469be727b686e6a0f0a" 28 - }
-28
.sqlx/query-47fe4a54857344d8f789f37092a294cd58f64b4fb431b54b5deda13d64525e88.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "SELECT token, expires_at FROM account_deletion_requests WHERE did = $1", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "token", 9 - "type_info": "Text" 10 - }, 11 - { 12 - "ordinal": 1, 13 - "name": "expires_at", 14 - "type_info": "Timestamptz" 15 - } 16 - ], 17 - "parameters": { 18 - "Left": [ 19 - "Text" 20 - ] 21 - }, 22 - "nullable": [ 23 - false, 24 - false 25 - ] 26 - }, 27 - "hash": "47fe4a54857344d8f789f37092a294cd58f64b4fb431b54b5deda13d64525e88" 28 - }
-22
.sqlx/query-49cbc923cc4a0dcf7dea4ead5ab9580ff03b717586c4ca2d5343709e2dac86b6.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "SELECT email_verified FROM users WHERE did = $1", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "email_verified", 9 - "type_info": "Bool" 10 - } 11 - ], 12 - "parameters": { 13 - "Left": [ 14 - "Text" 15 - ] 16 - }, 17 - "nullable": [ 18 - false 19 - ] 20 - }, 21 - "hash": "49cbc923cc4a0dcf7dea4ead5ab9580ff03b717586c4ca2d5343709e2dac86b6" 22 - }
-17
.sqlx/query-59678fbb756d46bb5f51c9a52800a8d203ed52129b1fae65145df92d145d18de.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "INSERT INTO invite_codes (code, available_uses, created_by_user, for_account) VALUES ($1, $2, $3, $4)", 4 - "describe": { 5 - "columns": [], 6 - "parameters": { 7 - "Left": [ 8 - "Text", 9 - "Int4", 10 - "Uuid", 11 - "Text" 12 - ] 13 - }, 14 - "nullable": [] 15 - }, 16 - "hash": "59678fbb756d46bb5f51c9a52800a8d203ed52129b1fae65145df92d145d18de" 17 - }
-28
.sqlx/query-5a016f289caf75177731711e56e92881ba343c73a9a6e513e205c801c5943ec0.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "\n SELECT k.key_bytes, k.encryption_version\n FROM user_keys k\n JOIN users u ON k.user_id = u.id\n WHERE u.did = $1\n ", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "key_bytes", 9 - "type_info": "Bytea" 10 - }, 11 - { 12 - "ordinal": 1, 13 - "name": "encryption_version", 14 - "type_info": "Int4" 15 - } 16 - ], 17 - "parameters": { 18 - "Left": [ 19 - "Text" 20 - ] 21 - }, 22 - "nullable": [ 23 - false, 24 - true 25 - ] 26 - }, 27 - "hash": "5a016f289caf75177731711e56e92881ba343c73a9a6e513e205c801c5943ec0" 28 - }
-22
.sqlx/query-5a036d95feedcbe6fb6396b10a7b4bd6a2eedeefda46a23e6a904cdbc3a65d45.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "SELECT body FROM comms_queue WHERE user_id = $1 AND comms_type = 'email_update' ORDER BY created_at DESC LIMIT 1", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "body", 9 - "type_info": "Text" 10 - } 11 - ], 12 - "parameters": { 13 - "Left": [ 14 - "Uuid" 15 - ] 16 - }, 17 - "nullable": [ 18 - false 19 - ] 20 - }, 21 - "hash": "5a036d95feedcbe6fb6396b10a7b4bd6a2eedeefda46a23e6a904cdbc3a65d45" 22 - }
+17
.sqlx/query-6830cc85b246f5127419b0ed58f81d8ffee3806a3077281828f4bd2b8dfa7628.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n INSERT INTO invite_codes (code, available_uses, created_by_user, for_account)\n SELECT code, $2, $3, $4 FROM UNNEST($1::text[]) AS t(code)\n ", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "TextArray", 9 + "Int4", 10 + "Uuid", 11 + "Text" 12 + ] 13 + }, 14 + "nullable": [] 15 + }, 16 + "hash": "6830cc85b246f5127419b0ed58f81d8ffee3806a3077281828f4bd2b8dfa7628" 17 + }
-34
.sqlx/query-6a3a5d1d2cf871652a9d4d8ddb79cf26d24d9acb67e48123ca98423502eaac47.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "\n SELECT u.did, u.handle, icu.used_at\n FROM invite_code_uses icu\n JOIN users u ON icu.used_by_user = u.id\n WHERE icu.code = $1\n ORDER BY icu.used_at DESC\n ", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "did", 9 - "type_info": "Text" 10 - }, 11 - { 12 - "ordinal": 1, 13 - "name": "handle", 14 - "type_info": "Text" 15 - }, 16 - { 17 - "ordinal": 2, 18 - "name": "used_at", 19 - "type_info": "Timestamptz" 20 - } 21 - ], 22 - "parameters": { 23 - "Left": [ 24 - "Text" 25 - ] 26 - }, 27 - "nullable": [ 28 - false, 29 - false, 30 - false 31 - ] 32 - }, 33 - "hash": "6a3a5d1d2cf871652a9d4d8ddb79cf26d24d9acb67e48123ca98423502eaac47" 34 - }
+34
.sqlx/query-779f30b9db69294997c00bc446918b3141a67c64758823256b1da11fd9e9480b.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n SELECT u.did, u.handle, icu.used_at\n FROM invite_code_uses icu\n JOIN users u ON icu.used_by_user = u.id\n WHERE icu.code = $1\n ORDER BY icu.used_at DESC\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "did", 9 + "type_info": "Text" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "handle", 14 + "type_info": "Text" 15 + }, 16 + { 17 + "ordinal": 2, 18 + "name": "used_at", 19 + "type_info": "Timestamptz" 20 + } 21 + ], 22 + "parameters": { 23 + "Left": [ 24 + "Text" 25 + ] 26 + }, 27 + "nullable": [ 28 + false, 29 + false, 30 + false 31 + ] 32 + }, 33 + "hash": "779f30b9db69294997c00bc446918b3141a67c64758823256b1da11fd9e9480b" 34 + }
-22
.sqlx/query-785a864944c5939331704c71b0cd3ed26ffdd64f3fd0f26ecc28b6a0557bbe8f.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "SELECT subject FROM comms_queue WHERE user_id = $1 AND comms_type = 'admin_email' AND body = 'Email without subject' LIMIT 1", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "subject", 9 - "type_info": "Text" 10 - } 11 - ], 12 - "parameters": { 13 - "Left": [ 14 - "Uuid" 15 - ] 16 - }, 17 - "nullable": [ 18 - true 19 - ] 20 - }, 21 - "hash": "785a864944c5939331704c71b0cd3ed26ffdd64f3fd0f26ecc28b6a0557bbe8f" 22 - }
-22
.sqlx/query-7caa8f9083b15ec1209dda35c4c6f6fba9fe338e4a6a10636b5389d426df1631.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "\n SELECT t.token\n FROM plc_operation_tokens t\n JOIN users u ON t.user_id = u.id\n WHERE u.did = $1\n ", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "token", 9 - "type_info": "Text" 10 - } 11 - ], 12 - "parameters": { 13 - "Left": [ 14 - "Text" 15 - ] 16 - }, 17 - "nullable": [ 18 - false 19 - ] 20 - }, 21 - "hash": "7caa8f9083b15ec1209dda35c4c6f6fba9fe338e4a6a10636b5389d426df1631" 22 - }
-28
.sqlx/query-82717b6f61cd79347e1ca7e92c4413743ba168d1e0d8b85566711e54d4048f81.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "SELECT t.token, t.expires_at FROM plc_operation_tokens t JOIN users u ON t.user_id = u.id WHERE u.did = $1", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "token", 9 - "type_info": "Text" 10 - }, 11 - { 12 - "ordinal": 1, 13 - "name": "expires_at", 14 - "type_info": "Timestamptz" 15 - } 16 - ], 17 - "parameters": { 18 - "Left": [ 19 - "Text" 20 - ] 21 - }, 22 - "nullable": [ 23 - false, 24 - false 25 - ] 26 - }, 27 - "hash": "82717b6f61cd79347e1ca7e92c4413743ba168d1e0d8b85566711e54d4048f81" 28 - }
-22
.sqlx/query-9ad422bf3c43e3cfd86fc88c73594246ead214ca794760d3fe77bb5cf4f27be5.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "SELECT body FROM comms_queue WHERE user_id = (SELECT id FROM users WHERE did = $1) AND comms_type = 'email_verification' ORDER BY created_at DESC LIMIT 1", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "body", 9 - "type_info": "Text" 10 - } 11 - ], 12 - "parameters": { 13 - "Left": [ 14 - "Text" 15 - ] 16 - }, 17 - "nullable": [ 18 - false 19 - ] 20 - }, 21 - "hash": "9ad422bf3c43e3cfd86fc88c73594246ead214ca794760d3fe77bb5cf4f27be5" 22 - }
-28
.sqlx/query-9b035b051769e6b9d45910a8bb42ac0f84c73de8c244ba4560f004ee3f4b7002.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "SELECT did, public_key_did_key FROM reserved_signing_keys WHERE public_key_did_key = $1", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "did", 9 - "type_info": "Text" 10 - }, 11 - { 12 - "ordinal": 1, 13 - "name": "public_key_did_key", 14 - "type_info": "Text" 15 - } 16 - ], 17 - "parameters": { 18 - "Left": [ 19 - "Text" 20 - ] 21 - }, 22 - "nullable": [ 23 - true, 24 - false 25 - ] 26 - }, 27 - "hash": "9b035b051769e6b9d45910a8bb42ac0f84c73de8c244ba4560f004ee3f4b7002" 28 - }
-108
.sqlx/query-9e772a967607553a0ab800970eaeadcaab7e06bdb79e0c89eb919b1bc1d6fabe.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "\n SELECT\n id, user_id, recipient, subject, body,\n channel as \"channel: CommsChannel\",\n comms_type as \"comms_type: CommsType\",\n status as \"status: CommsStatus\"\n FROM comms_queue\n WHERE id = $1\n ", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "id", 9 - "type_info": "Uuid" 10 - }, 11 - { 12 - "ordinal": 1, 13 - "name": "user_id", 14 - "type_info": "Uuid" 15 - }, 16 - { 17 - "ordinal": 2, 18 - "name": "recipient", 19 - "type_info": "Text" 20 - }, 21 - { 22 - "ordinal": 3, 23 - "name": "subject", 24 - "type_info": "Text" 25 - }, 26 - { 27 - "ordinal": 4, 28 - "name": "body", 29 - "type_info": "Text" 30 - }, 31 - { 32 - "ordinal": 5, 33 - "name": "channel: CommsChannel", 34 - "type_info": { 35 - "Custom": { 36 - "name": "comms_channel", 37 - "kind": { 38 - "Enum": [ 39 - "email", 40 - "discord", 41 - "telegram", 42 - "signal" 43 - ] 44 - } 45 - } 46 - } 47 - }, 48 - { 49 - "ordinal": 6, 50 - "name": "comms_type: CommsType", 51 - "type_info": { 52 - "Custom": { 53 - "name": "comms_type", 54 - "kind": { 55 - "Enum": [ 56 - "welcome", 57 - "email_verification", 58 - "password_reset", 59 - "email_update", 60 - "account_deletion", 61 - "admin_email", 62 - "plc_operation", 63 - "two_factor_code", 64 - "channel_verification", 65 - "passkey_recovery", 66 - "legacy_login_alert", 67 - "migration_verification" 68 - ] 69 - } 70 - } 71 - } 72 - }, 73 - { 74 - "ordinal": 7, 75 - "name": "status: CommsStatus", 76 - "type_info": { 77 - "Custom": { 78 - "name": "comms_status", 79 - "kind": { 80 - "Enum": [ 81 - "pending", 82 - "processing", 83 - "sent", 84 - "failed" 85 - ] 86 - } 87 - } 88 - } 89 - } 90 - ], 91 - "parameters": { 92 - "Left": [ 93 - "Uuid" 94 - ] 95 - }, 96 - "nullable": [ 97 - false, 98 - false, 99 - false, 100 - true, 101 - false, 102 - false, 103 - false, 104 - false 105 - ] 106 - }, 107 - "hash": "9e772a967607553a0ab800970eaeadcaab7e06bdb79e0c89eb919b1bc1d6fabe" 108 - }
-34
.sqlx/query-a23a390659616779d7dbceaa3b5d5171e70fa25e3b8393e142cebcbff752f0f5.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "SELECT private_key_bytes, expires_at, used_at FROM reserved_signing_keys WHERE public_key_did_key = $1", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "private_key_bytes", 9 - "type_info": "Bytea" 10 - }, 11 - { 12 - "ordinal": 1, 13 - "name": "expires_at", 14 - "type_info": "Timestamptz" 15 - }, 16 - { 17 - "ordinal": 2, 18 - "name": "used_at", 19 - "type_info": "Timestamptz" 20 - } 21 - ], 22 - "parameters": { 23 - "Left": [ 24 - "Text" 25 - ] 26 - }, 27 - "nullable": [ 28 - false, 29 - false, 30 - true 31 - ] 32 - }, 33 - "hash": "a23a390659616779d7dbceaa3b5d5171e70fa25e3b8393e142cebcbff752f0f5" 34 - }
-22
.sqlx/query-a802d7d860f263eace39ce82bb27b633cec7287c1cc177f0e1d47ec6571564d5.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "SELECT token FROM account_deletion_requests WHERE did = $1", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "token", 9 - "type_info": "Text" 10 - } 11 - ], 12 - "parameters": { 13 - "Left": [ 14 - "Text" 15 - ] 16 - }, 17 - "nullable": [ 18 - false 19 - ] 20 - }, 21 - "hash": "a802d7d860f263eace39ce82bb27b633cec7287c1cc177f0e1d47ec6571564d5" 22 - }
+17
.sqlx/query-ab5e6c5bc904ae54f8c559f6e1c26f8293851815a1b4666a093750fe249626b6.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n UPDATE oauth_token\n SET current_refresh_token = $2, expires_at = $3, updated_at = NOW(),\n previous_refresh_token = $4, rotated_at = NOW()\n WHERE id = $1\n ", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "Int4", 9 + "Text", 10 + "Timestamptz", 11 + "Text" 12 + ] 13 + }, 14 + "nullable": [] 15 + }, 16 + "hash": "ab5e6c5bc904ae54f8c559f6e1c26f8293851815a1b4666a093750fe249626b6" 17 + }
-60
.sqlx/query-b0fca342e85dea89a06b4fee144cae4825dec587b1387f0fee401458aea2a2e5.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "\n SELECT\n recipient, subject, body,\n comms_type as \"comms_type: CommsType\"\n FROM comms_queue\n WHERE id = $1\n ", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "recipient", 9 - "type_info": "Text" 10 - }, 11 - { 12 - "ordinal": 1, 13 - "name": "subject", 14 - "type_info": "Text" 15 - }, 16 - { 17 - "ordinal": 2, 18 - "name": "body", 19 - "type_info": "Text" 20 - }, 21 - { 22 - "ordinal": 3, 23 - "name": "comms_type: CommsType", 24 - "type_info": { 25 - "Custom": { 26 - "name": "comms_type", 27 - "kind": { 28 - "Enum": [ 29 - "welcome", 30 - "email_verification", 31 - "password_reset", 32 - "email_update", 33 - "account_deletion", 34 - "admin_email", 35 - "plc_operation", 36 - "two_factor_code", 37 - "channel_verification", 38 - "passkey_recovery", 39 - "legacy_login_alert", 40 - "migration_verification" 41 - ] 42 - } 43 - } 44 - } 45 - } 46 - ], 47 - "parameters": { 48 - "Left": [ 49 - "Uuid" 50 - ] 51 - }, 52 - "nullable": [ 53 - false, 54 - true, 55 - false, 56 - false 57 - ] 58 - }, 59 - "hash": "b0fca342e85dea89a06b4fee144cae4825dec587b1387f0fee401458aea2a2e5" 60 - }
-22
.sqlx/query-cd3b8098ad4c1056c1d23acd8a6b29f7abfe18ee6f559bd94ab16274b1cfdfee.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "SELECT password_reset_code FROM users WHERE email = $1", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "password_reset_code", 9 - "type_info": "Text" 10 - } 11 - ], 12 - "parameters": { 13 - "Left": [ 14 - "Text" 15 - ] 16 - }, 17 - "nullable": [ 18 - true 19 - ] 20 - }, 21 - "hash": "cd3b8098ad4c1056c1d23acd8a6b29f7abfe18ee6f559bd94ab16274b1cfdfee" 22 - }
-22
.sqlx/query-cda68f9b6c60295a196fc853b70ec5fd51a8ffaa2bac5942c115c99d1cbcafa3.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "SELECT COUNT(*) as \"count!\" FROM plc_operation_tokens t JOIN users u ON t.user_id = u.id WHERE u.did = $1", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "count!", 9 - "type_info": "Int8" 10 - } 11 - ], 12 - "parameters": { 13 - "Left": [ 14 - "Text" 15 - ] 16 - }, 17 - "nullable": [ 18 - null 19 - ] 20 - }, 21 - "hash": "cda68f9b6c60295a196fc853b70ec5fd51a8ffaa2bac5942c115c99d1cbcafa3" 22 - }
-14
.sqlx/query-d529d6dc9858c1da360f0417e94a3b40041b043bae57e95002d4bf5df46a4ab4.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "UPDATE account_deletion_requests SET expires_at = NOW() - INTERVAL '1 hour' WHERE token = $1", 4 - "describe": { 5 - "columns": [], 6 - "parameters": { 7 - "Left": [ 8 - "Text" 9 - ] 10 - }, 11 - "nullable": [] 12 - }, 13 - "hash": "d529d6dc9858c1da360f0417e94a3b40041b043bae57e95002d4bf5df46a4ab4" 14 - }
-22
.sqlx/query-e20cbe2a939d790aaea718b084a80d8ede655ba1cc0fd4346d7e91d6de7d6cf3.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "SELECT COUNT(*) FROM comms_queue WHERE user_id = $1 AND comms_type = 'password_reset'", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "count", 9 - "type_info": "Int8" 10 - } 11 - ], 12 - "parameters": { 13 - "Left": [ 14 - "Uuid" 15 - ] 16 - }, 17 - "nullable": [ 18 - null 19 - ] 20 - }, 21 - "hash": "e20cbe2a939d790aaea718b084a80d8ede655ba1cc0fd4346d7e91d6de7d6cf3" 22 - }
-22
.sqlx/query-e64cd36284d10ab7f3d9f6959975a1a627809f444b0faff7e611d985f31b90e9.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "SELECT used_at FROM reserved_signing_keys WHERE public_key_did_key = $1", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "used_at", 9 - "type_info": "Timestamptz" 10 - } 11 - ], 12 - "parameters": { 13 - "Left": [ 14 - "Text" 15 - ] 16 - }, 17 - "nullable": [ 18 - true 19 - ] 20 - }, 21 - "hash": "e64cd36284d10ab7f3d9f6959975a1a627809f444b0faff7e611d985f31b90e9" 22 - }
-15
.sqlx/query-eb5c82249de786f8245df805f0489415a4cbdb0de95703bd064ea0f5d635980d.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "INSERT INTO backup_codes (did, code_hash, created_at) VALUES ($1, $2, NOW())", 4 - "describe": { 5 - "columns": [], 6 - "parameters": { 7 - "Left": [ 8 - "Text", 9 - "Text" 10 - ] 11 - }, 12 - "nullable": [] 13 - }, 14 - "hash": "eb5c82249de786f8245df805f0489415a4cbdb0de95703bd064ea0f5d635980d" 15 - }
-22
.sqlx/query-f26c13023b47b908ec96da2e6b8bf8b34ca6a2246c20fc96f76f0e95530762a7.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "SELECT email FROM users WHERE did = $1", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "email", 9 - "type_info": "Text" 10 - } 11 - ], 12 - "parameters": { 13 - "Left": [ 14 - "Text" 15 - ] 16 - }, 17 - "nullable": [ 18 - true 19 - ] 20 - }, 21 - "hash": "f26c13023b47b908ec96da2e6b8bf8b34ca6a2246c20fc96f76f0e95530762a7" 22 - }
-14
.sqlx/query-f29da3bdfbbc547b339b4cdb059fac26435b0feec65cf1c56f851d1c4d6b1814.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "UPDATE users SET is_admin = TRUE WHERE did = $1", 4 - "describe": { 5 - "columns": [], 6 - "parameters": { 7 - "Left": [ 8 - "Text" 9 - ] 10 - }, 11 - "nullable": [] 12 - }, 13 - "hash": "f29da3bdfbbc547b339b4cdb059fac26435b0feec65cf1c56f851d1c4d6b1814" 14 - }
-28
.sqlx/query-f7af28963099aec12cf1d4f8a9a03699bb3a90f39bc9c4c0f738a37827e8f382.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "SELECT password_reset_code, password_reset_code_expires_at FROM users WHERE email = $1", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "password_reset_code", 9 - "type_info": "Text" 10 - }, 11 - { 12 - "ordinal": 1, 13 - "name": "password_reset_code_expires_at", 14 - "type_info": "Timestamptz" 15 - } 16 - ], 17 - "parameters": { 18 - "Left": [ 19 - "Text" 20 - ] 21 - }, 22 - "nullable": [ 23 - true, 24 - true 25 - ] 26 - }, 27 - "hash": "f7af28963099aec12cf1d4f8a9a03699bb3a90f39bc9c4c0f738a37827e8f382" 28 - }
+15
Cargo.lock
··· 776 776 checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef" 777 777 dependencies = [ 778 778 "fastrand", 779 + "gloo-timers", 780 + "tokio", 779 781 ] 780 782 781 783 [[package]] ··· 2295 2297 "thiserror 1.0.69", 2296 2298 "wasm-bindgen", 2297 2299 "web-sys", 2300 + ] 2301 + 2302 + [[package]] 2303 + name = "gloo-timers" 2304 + version = "0.3.0" 2305 + source = "registry+https://github.com/rust-lang/crates.io-index" 2306 + checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" 2307 + dependencies = [ 2308 + "futures-channel", 2309 + "futures-core", 2310 + "js-sys", 2311 + "wasm-bindgen", 2298 2312 ] 2299 2313 2300 2314 [[package]] ··· 6427 6441 "aws-config", 6428 6442 "aws-sdk-s3", 6429 6443 "axum", 6444 + "backon", 6430 6445 "base32", 6431 6446 "base64 0.22.1", 6432 6447 "bcrypt",
+1
Cargo.toml
··· 32 32 tranquil-comms = { path = "crates/tranquil-comms" } 33 33 34 34 aes-gcm = "0.10" 35 + backon = "1" 35 36 anyhow = "1.0" 36 37 async-trait = "0.1" 37 38 aws-config = "1.8"
+20 -21
crates/tranquil-oauth/src/client.rs
··· 529 529 let signature_bytes = URL_SAFE_NO_PAD 530 530 .decode(parts[2]) 531 531 .map_err(|_| OAuthError::InvalidClient("Invalid signature encoding".to_string()))?; 532 - for key in matching_keys { 533 - let key_alg = key.get("alg").and_then(|a| a.as_str()); 534 - if key_alg.is_some() && key_alg != Some(alg) { 535 - continue; 536 - } 537 - let kty = key.get("kty").and_then(|k| k.as_str()).unwrap_or(""); 538 - let verified = match (alg, kty) { 539 - ("ES256", "EC") => verify_es256(key, &signing_input, &signature_bytes), 540 - ("ES384", "EC") => verify_es384(key, &signing_input, &signature_bytes), 541 - ("RS256" | "RS384" | "RS512", "RSA") => { 542 - verify_rsa(alg, key, &signing_input, &signature_bytes) 532 + matching_keys 533 + .into_iter() 534 + .filter(|key| { 535 + let key_alg = key.get("alg").and_then(|a| a.as_str()); 536 + key_alg.is_none() || key_alg == Some(alg) 537 + }) 538 + .find_map(|key| { 539 + let kty = key.get("kty").and_then(|k| k.as_str()).unwrap_or(""); 540 + match (alg, kty) { 541 + ("ES256", "EC") => verify_es256(key, &signing_input, &signature_bytes).ok(), 542 + ("ES384", "EC") => verify_es384(key, &signing_input, &signature_bytes).ok(), 543 + ("RS256" | "RS384" | "RS512", "RSA") => { 544 + verify_rsa(alg, key, &signing_input, &signature_bytes).ok() 545 + } 546 + ("EdDSA", "OKP") => verify_eddsa(key, &signing_input, &signature_bytes).ok(), 547 + _ => None, 543 548 } 544 - ("EdDSA", "OKP") => verify_eddsa(key, &signing_input, &signature_bytes), 545 - _ => continue, 546 - }; 547 - if verified.is_ok() { 548 - return Ok(()); 549 - } 550 - } 551 - Err(OAuthError::InvalidClient( 552 - "client_assertion signature verification failed".to_string(), 553 - )) 549 + }) 550 + .ok_or_else(|| { 551 + OAuthError::InvalidClient("client_assertion signature verification failed".to_string()) 552 + }) 554 553 } 555 554 556 555 fn verify_es256(
+1
crates/tranquil-pds/Cargo.toml
··· 17 17 tranquil-comms = { workspace = true } 18 18 19 19 aes-gcm = { workspace = true } 20 + backon = { workspace = true } 20 21 anyhow = { workspace = true } 21 22 async-trait = { workspace = true } 22 23 aws-config = { workspace = true }
+6 -7
crates/tranquil-pds/src/api/identity/account.rs
··· 189 189 if input.handle.contains(' ') || input.handle.contains('\t') { 190 190 return ApiError::InvalidRequest("Handle cannot contain spaces".into()).into_response(); 191 191 } 192 - for c in input.handle.chars() { 193 - if !c.is_ascii_alphanumeric() && c != '.' && c != '-' { 194 - return ApiError::InvalidRequest(format!( 195 - "Handle contains invalid character: {}", 196 - c 197 - )) 192 + if let Some(c) = input 193 + .handle 194 + .chars() 195 + .find(|c| !c.is_ascii_alphanumeric() && *c != '.' && *c != '-') 196 + { 197 + return ApiError::InvalidRequest(format!("Handle contains invalid character: {}", c)) 198 198 .into_response(); 199 - } 200 199 } 201 200 let handle_lower = input.handle.to_lowercase(); 202 201 if crate::moderation::has_explicit_slur(&handle_lower) {
+11 -10
crates/tranquil-pds/src/api/identity/did.rs
··· 639 639 return ApiError::InvalidHandle(Some("Handle contains invalid characters".into())) 640 640 .into_response(); 641 641 } 642 - for segment in new_handle.split('.') { 643 - if segment.is_empty() { 644 - return ApiError::InvalidHandle(Some("Handle contains empty segment".into())) 645 - .into_response(); 646 - } 647 - if segment.starts_with('-') || segment.ends_with('-') { 648 - return ApiError::InvalidHandle(Some( 649 - "Handle segment cannot start or end with hyphen".into(), 650 - )) 642 + if new_handle.split('.').any(|segment| segment.is_empty()) { 643 + return ApiError::InvalidHandle(Some("Handle contains empty segment".into())) 651 644 .into_response(); 652 - } 645 + } 646 + if new_handle 647 + .split('.') 648 + .any(|segment| segment.starts_with('-') || segment.ends_with('-')) 649 + { 650 + return ApiError::InvalidHandle(Some( 651 + "Handle segment cannot start or end with hyphen".into(), 652 + )) 653 + .into_response(); 653 654 } 654 655 if crate::moderation::has_explicit_slur(&new_handle) { 655 656 return ApiError::InvalidHandle(Some("Inappropriate language in handle".into()))
+1 -1
crates/tranquil-pds/src/api/proxy.rs
··· 222 222 ) { 223 223 let token = extracted.token; 224 224 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 225 - let http_uri = uri.to_string(); 225 + let http_uri = crate::util::build_full_url(&uri.to_string()); 226 226 227 227 match crate::auth::validate_token_with_dpop( 228 228 &state.db,
+48 -29
crates/tranquil-pds/src/api/repo/import.rs
··· 5 5 use crate::state::AppState; 6 6 use crate::sync::import::{ImportError, apply_import, parse_car}; 7 7 use crate::sync::verify::CarVerifier; 8 + use crate::types::Did; 8 9 use axum::{ 9 10 body::Bytes, 10 11 extract::State, ··· 196 197 import_result.records.len(), 197 198 did 198 199 ); 199 - let mut blob_ref_count = 0; 200 - for record in &import_result.records { 201 - for blob_ref in &record.blob_refs { 200 + let blob_refs: Vec<(String, String)> = import_result 201 + .records 202 + .iter() 203 + .flat_map(|record| { 202 204 let record_uri = format!("at://{}/{}/{}", did, record.collection, record.rkey); 203 - if let Err(e) = sqlx::query!( 204 - r#" 205 - INSERT INTO record_blobs (repo_id, record_uri, blob_cid) 206 - VALUES ($1, $2, $3) 207 - ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING 208 - "#, 209 - user_id, 210 - record_uri, 211 - blob_ref.cid 212 - ) 213 - .execute(&state.db) 214 - .await 215 - { 216 - warn!("Failed to insert record_blob for {}: {:?}", record_uri, e); 217 - } else { 218 - blob_ref_count += 1; 205 + record 206 + .blob_refs 207 + .iter() 208 + .map(move |blob_ref| (record_uri.clone(), blob_ref.cid.clone())) 209 + }) 210 + .collect(); 211 + 212 + if !blob_refs.is_empty() { 213 + let (record_uris, blob_cids): (Vec<String>, Vec<String>) = 214 + blob_refs.into_iter().unzip(); 215 + 216 + match sqlx::query!( 217 + r#" 218 + INSERT INTO record_blobs (repo_id, record_uri, blob_cid) 219 + SELECT $1, * FROM UNNEST($2::text[], $3::text[]) 220 + ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING 221 + "#, 222 + user_id, 223 + &record_uris, 224 + &blob_cids 225 + ) 226 + .execute(&state.db) 227 + .await 228 + { 229 + Ok(result) => { 230 + info!( 231 + "Recorded {} blob references for imported repo", 232 + result.rows_affected() 233 + ); 234 + } 235 + Err(e) => { 236 + warn!("Failed to insert record_blobs: {:?}", e); 219 237 } 220 238 } 221 - } 222 - if blob_ref_count > 0 { 223 - info!( 224 - "Recorded {} blob references for imported repo", 225 - blob_ref_count 226 - ); 227 239 } 228 240 let key_row = match sqlx::query!( 229 241 r#"SELECT uk.key_bytes, uk.encryption_version ··· 383 395 384 396 async fn sequence_import_event( 385 397 state: &AppState, 386 - did: &str, 398 + did: &Did, 387 399 commit_cid: &str, 388 400 ) -> Result<(), sqlx::Error> { 389 401 let prev_cid: Option<String> = None; ··· 391 403 let ops = serde_json::json!([]); 392 404 let blobs: Vec<String> = vec![]; 393 405 let blocks_cids: Vec<String> = vec![]; 406 + let did_str = did.as_str(); 407 + 408 + let mut tx = state.db.begin().await?; 409 + 394 410 let seq_row = sqlx::query!( 395 411 r#" 396 412 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids) 397 413 VALUES ($1, 'commit', $2, $3, $4, $5, $6, $7) 398 414 RETURNING seq 399 415 "#, 400 - did, 416 + did_str, 401 417 commit_cid, 402 418 prev_cid, 403 419 prev_data_cid, ··· 405 421 &blobs, 406 422 &blocks_cids 407 423 ) 408 - .fetch_one(&state.db) 424 + .fetch_one(&mut *tx) 409 425 .await?; 426 + 410 427 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 411 - .execute(&state.db) 428 + .execute(&mut *tx) 412 429 .await?; 430 + 431 + tx.commit().await?; 413 432 Ok(()) 414 433 }
+59 -49
crates/tranquil-pds/src/api/server/account_status.rs
··· 10 10 http::StatusCode, 11 11 response::{IntoResponse, Response}, 12 12 }; 13 + use backon::{ExponentialBuilder, Retryable}; 13 14 use bcrypt::verify; 14 15 use chrono::{Duration, Utc}; 15 16 use cid::Cid; ··· 19 20 use serde::{Deserialize, Serialize}; 20 21 use std::str::FromStr; 21 22 use std::sync::Arc; 23 + use std::sync::atomic::{AtomicUsize, Ordering}; 22 24 use tracing::{error, info, warn}; 23 25 use uuid::Uuid; 24 26 ··· 177 179 let expected_endpoint = format!("https://{}", hostname); 178 180 179 181 if did.starts_with("did:plc:") { 180 - let plc_client = PlcClient::with_cache(None, Some(cache.clone())); 181 - 182 182 let max_attempts = if with_retry { 5 } else { 1 }; 183 - let mut last_error = None; 184 - let mut doc_data = None; 185 - for attempt in 0..max_attempts { 186 - if attempt > 0 { 187 - let delay_ms = 500 * (1 << (attempt - 1)); 188 - info!( 189 - "Waiting {}ms before retry {} for DID document validation ({})", 190 - delay_ms, attempt, did 191 - ); 192 - tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await; 193 - } 183 + let cache_for_retry = cache.clone(); 184 + let did_owned = did.to_string(); 185 + let expected_owned = expected_endpoint.clone(); 186 + let attempt_counter = Arc::new(AtomicUsize::new(0)); 194 187 195 - match plc_client.get_document_data(did).await { 196 - Ok(data) => { 197 - let pds_endpoint = data 198 - .get("services") 199 - .and_then(|s| s.get("atproto_pds").or_else(|| s.get("atprotoPds"))) 200 - .and_then(|p| p.get("endpoint")) 201 - .and_then(|e| e.as_str()); 188 + let doc_data: serde_json::Value = (|| { 189 + let cache_ref = cache_for_retry.clone(); 190 + let did_ref = did_owned.clone(); 191 + let expected_ref = expected_owned.clone(); 192 + let counter = attempt_counter.clone(); 193 + async move { 194 + let attempt = counter.fetch_add(1, Ordering::SeqCst); 195 + if attempt > 0 { 196 + info!( 197 + "Retry {} for DID document validation ({})", 198 + attempt, did_ref 199 + ); 200 + } 201 + let plc_client = PlcClient::with_cache(None, Some(cache_ref)); 202 + match plc_client.get_document_data(&did_ref).await { 203 + Ok(data) => { 204 + let pds_endpoint = data 205 + .get("services") 206 + .and_then(|s: &serde_json::Value| { 207 + s.get("atproto_pds").or_else(|| s.get("atprotoPds")) 208 + }) 209 + .and_then(|p: &serde_json::Value| p.get("endpoint")) 210 + .and_then(|e: &serde_json::Value| e.as_str()); 202 211 203 - if pds_endpoint == Some(&expected_endpoint) { 204 - doc_data = Some(data); 205 - break; 206 - } else { 207 - info!( 208 - "Attempt {}: DID {} has endpoint {:?}, expected {} - retrying", 212 + if pds_endpoint == Some(expected_ref.as_str()) { 213 + Ok(data) 214 + } else { 215 + info!( 216 + "Attempt {}: DID {} has endpoint {:?}, expected {}", 217 + attempt + 1, 218 + did_ref, 219 + pds_endpoint, 220 + expected_ref 221 + ); 222 + Err(format!( 223 + "DID document endpoint {:?} does not match expected {}", 224 + pds_endpoint, expected_ref 225 + )) 226 + } 227 + } 228 + Err(e) => { 229 + warn!( 230 + "Attempt {}: Failed to fetch PLC document for {}: {:?}", 209 231 attempt + 1, 210 - did, 211 - pds_endpoint, 212 - expected_endpoint 232 + did_ref, 233 + e 213 234 ); 214 - last_error = Some(format!( 215 - "DID document endpoint {:?} does not match expected {}", 216 - pds_endpoint, expected_endpoint 217 - )); 235 + Err(format!("Could not resolve DID document: {}", e)) 218 236 } 219 237 } 220 - Err(e) => { 221 - warn!( 222 - "Attempt {}: Failed to fetch PLC document for {}: {:?}", 223 - attempt + 1, 224 - did, 225 - e 226 - ); 227 - last_error = Some(format!("Could not resolve DID document: {}", e)); 228 - } 229 238 } 230 - } 231 - 232 - let Some(doc_data) = doc_data else { 233 - return Err(ApiError::InvalidRequest( 234 - last_error.unwrap_or_else(|| "DID document validation failed".to_string()), 235 - )); 236 - }; 239 + }) 240 + .retry( 241 + ExponentialBuilder::default() 242 + .with_min_delay(std::time::Duration::from_millis(500)) 243 + .with_max_times(max_attempts), 244 + ) 245 + .await 246 + .map_err(ApiError::InvalidRequest)?; 237 247 238 248 let server_rotation_key = std::env::var("PLC_ROTATION_KEY").ok(); 239 249 if let Some(ref expected_rotation_key) = server_rotation_key {
+7 -3
crates/tranquil-pds/src/api/server/app_password.rs
··· 254 254 error!("DB error revoking sessions for app password: {:?}", e); 255 255 return ApiError::InternalError(None).into_response(); 256 256 } 257 - for jti in &sessions_to_invalidate { 257 + futures::future::join_all(sessions_to_invalidate.iter().map(|jti| { 258 258 let cache_key = format!("auth:session:{}:{}", &auth_user.did, jti); 259 - let _ = state.cache.delete(&cache_key).await; 260 - } 259 + let cache = state.cache.clone(); 260 + async move { 261 + let _ = cache.delete(&cache_key).await; 262 + } 263 + })) 264 + .await; 261 265 if let Err(e) = sqlx::query!( 262 266 "DELETE FROM app_passwords WHERE user_id = $1 AND name = $2", 263 267 user_id,
+77 -75
crates/tranquil-pds/src/api/server/invite.rs
··· 15 15 16 16 fn gen_random_token() -> String { 17 17 let mut rng = rand::thread_rng(); 18 - let mut token = String::with_capacity(11); 19 - for i in 0..10 { 20 - if i == 5 { 21 - token.push('-'); 22 - } 23 - let idx = rng.gen_range(0..32); 24 - token.push(BASE32_ALPHABET[idx] as char); 25 - } 26 - token 18 + let gen_segment = |rng: &mut rand::rngs::ThreadRng, len: usize| -> String { 19 + (0..len) 20 + .map(|_| BASE32_ALPHABET[rng.gen_range(0..32)] as char) 21 + .collect() 22 + }; 23 + format!("{}-{}", gen_segment(&mut rng, 5), gen_segment(&mut rng, 5)) 27 24 } 28 25 29 26 fn gen_invite_code() -> String { ··· 132 129 } 133 130 }; 134 131 135 - let mut result_codes = Vec::new(); 136 - 137 - for account in for_accounts { 138 - let mut codes = Vec::new(); 139 - for _ in 0..code_count { 140 - let code = gen_invite_code(); 141 - if let Err(e) = sqlx::query!( 142 - "INSERT INTO invite_codes (code, available_uses, created_by_user, for_account) VALUES ($1, $2, $3, $4)", 143 - code, 144 - input.use_count, 132 + let result = futures::future::try_join_all(for_accounts.into_iter().map(|account| { 133 + let db = state.db.clone(); 134 + let use_count = input.use_count; 135 + async move { 136 + let codes: Vec<String> = (0..code_count).map(|_| gen_invite_code()).collect(); 137 + sqlx::query!( 138 + r#" 139 + INSERT INTO invite_codes (code, available_uses, created_by_user, for_account) 140 + SELECT code, $2, $3, $4 FROM UNNEST($1::text[]) AS t(code) 141 + "#, 142 + &codes[..], 143 + use_count, 145 144 admin_user_id, 146 145 account 147 146 ) 148 - .execute(&state.db) 147 + .execute(&db) 149 148 .await 150 - { 151 - error!("DB error creating invite code: {:?}", e); 152 - return ApiError::InternalError(None).into_response(); 153 - } 154 - codes.push(code); 149 + .map(|_| AccountCodes { account, codes }) 150 + } 151 + })) 152 + .await; 153 + 154 + match result { 155 + Ok(result_codes) => Json(CreateInviteCodesOutput { 156 + codes: result_codes, 157 + }) 158 + .into_response(), 159 + Err(e) => { 160 + error!("DB error creating invite codes: {:?}", e); 161 + ApiError::InternalError(None).into_response() 155 162 } 156 - result_codes.push(AccountCodes { account, codes }); 157 163 } 158 - 159 - Json(CreateInviteCodesOutput { 160 - codes: result_codes, 161 - }) 162 - .into_response() 163 164 } 164 165 165 166 #[derive(Deserialize)] ··· 227 228 } 228 229 }; 229 230 230 - let mut codes = Vec::new(); 231 - for row in codes_rows { 232 - let disabled = row.disabled.unwrap_or(false); 233 - if disabled { 234 - continue; 235 - } 236 - 237 - let use_count = row.use_count; 238 - if !include_used && use_count >= row.available_uses { 239 - continue; 240 - } 241 - 242 - let uses = sqlx::query!( 243 - r#" 244 - SELECT u.did, u.handle, icu.used_at 245 - FROM invite_code_uses icu 246 - JOIN users u ON icu.used_by_user = u.id 247 - WHERE icu.code = $1 248 - ORDER BY icu.used_at DESC 249 - "#, 250 - row.code 251 - ) 252 - .fetch_all(&state.db) 253 - .await 254 - .map(|use_rows| { 255 - use_rows 256 - .iter() 257 - .map(|u| InviteCodeUse { 258 - used_by: u.did.clone(), 259 - used_by_handle: Some(u.handle.clone()), 260 - used_at: u.used_at.to_rfc3339(), 261 - }) 262 - .collect() 231 + let filtered_rows: Vec<_> = codes_rows 232 + .into_iter() 233 + .filter(|row| { 234 + let disabled = row.disabled.unwrap_or(false); 235 + !disabled && (include_used || row.use_count < row.available_uses) 263 236 }) 264 - .unwrap_or_default(); 237 + .collect(); 265 238 266 - codes.push(InviteCode { 267 - code: row.code, 268 - available: row.available_uses, 269 - disabled, 270 - for_account: row.for_account, 271 - created_by: "admin".to_string(), 272 - created_at: row.created_at.to_rfc3339(), 273 - uses, 274 - }); 275 - } 239 + let codes = futures::future::join_all(filtered_rows.into_iter().map(|row| { 240 + let db = state.db.clone(); 241 + async move { 242 + let uses = sqlx::query!( 243 + r#" 244 + SELECT u.did, u.handle, icu.used_at 245 + FROM invite_code_uses icu 246 + JOIN users u ON icu.used_by_user = u.id 247 + WHERE icu.code = $1 248 + ORDER BY icu.used_at DESC 249 + "#, 250 + row.code 251 + ) 252 + .fetch_all(&db) 253 + .await 254 + .map(|use_rows| { 255 + use_rows 256 + .iter() 257 + .map(|u| InviteCodeUse { 258 + used_by: u.did.clone(), 259 + used_by_handle: Some(u.handle.clone()), 260 + used_at: u.used_at.to_rfc3339(), 261 + }) 262 + .collect() 263 + }) 264 + .unwrap_or_default(); 265 + 266 + InviteCode { 267 + code: row.code, 268 + available: row.available_uses, 269 + disabled: false, 270 + for_account: row.for_account, 271 + created_by: "admin".to_string(), 272 + created_at: row.created_at.to_rfc3339(), 273 + uses, 274 + } 275 + } 276 + })) 277 + .await; 276 278 277 279 Json(GetAccountInviteCodesOutput { codes }).into_response() 278 280 }
+18 -28
crates/tranquil-pds/src/api/server/migration.rs
··· 97 97 return ApiError::InvalidRequest("verification_methods cannot be empty".into()) 98 98 .into_response(); 99 99 } 100 - for method in methods { 100 + let validation_error = methods.iter().find_map(|method| { 101 101 if method.id.is_empty() { 102 - return ApiError::InvalidRequest("verification method id is required".into()) 103 - .into_response(); 104 - } 105 - if method.method_type != "Multikey" { 106 - return ApiError::InvalidRequest( 107 - "verification method type must be 'Multikey'".into(), 108 - ) 109 - .into_response(); 110 - } 111 - if !method.public_key_multibase.starts_with('z') { 112 - return ApiError::InvalidRequest( 113 - "publicKeyMultibase must start with 'z' (base58btc)".into(), 114 - ) 115 - .into_response(); 116 - } 117 - if method.public_key_multibase.len() < 40 { 118 - return ApiError::InvalidRequest( 119 - "publicKeyMultibase appears too short for a valid key".into(), 120 - ) 121 - .into_response(); 102 + Some("verification method id is required") 103 + } else if method.method_type != "Multikey" { 104 + Some("verification method type must be 'Multikey'") 105 + } else if !method.public_key_multibase.starts_with('z') { 106 + Some("publicKeyMultibase must start with 'z' (base58btc)") 107 + } else if method.public_key_multibase.len() < 40 { 108 + Some("publicKeyMultibase appears too short for a valid key") 109 + } else { 110 + None 122 111 } 112 + }); 113 + if let Some(err) = validation_error { 114 + return ApiError::InvalidRequest(err.into()).into_response(); 123 115 } 124 116 } 125 117 126 - if let Some(ref handles) = input.also_known_as { 127 - for handle in handles { 128 - if !handle.starts_with("at://") { 129 - return ApiError::InvalidRequest("alsoKnownAs entries must be at:// URIs".into()) 130 - .into_response(); 131 - } 132 - } 118 + if let Some(ref handles) = input.also_known_as 119 + && handles.iter().any(|h| !h.starts_with("at://")) 120 + { 121 + return ApiError::InvalidRequest("alsoKnownAs entries must be at:// URIs".into()) 122 + .into_response(); 133 123 } 134 124 135 125 if let Some(ref endpoint) = input.service_endpoint {
+40 -13
crates/tranquil-pds/src/api/server/passkey_account.rs
··· 813 813 return ApiError::InternalError(None).into_response(); 814 814 } 815 815 816 - let _ = crate::auth::webauthn::delete_registration_state(&state.db, &input.did).await; 817 - 818 816 let app_password = generate_app_password(); 819 817 let app_password_name = "bsky.app".to_string(); 820 818 let password_hash = match hash(&app_password, DEFAULT_COST) { ··· 825 823 } 826 824 }; 827 825 826 + let mut tx = match state.db.begin().await { 827 + Ok(tx) => tx, 828 + Err(e) => { 829 + error!("Failed to begin transaction: {:?}", e); 830 + return ApiError::InternalError(None).into_response(); 831 + } 832 + }; 833 + 828 834 if let Err(e) = sqlx::query!( 829 835 "INSERT INTO app_passwords (user_id, name, password_hash, privileged) VALUES ($1, $2, $3, FALSE)", 830 836 user.id, 831 837 app_password_name, 832 838 password_hash 833 839 ) 834 - .execute(&state.db) 840 + .execute(&mut *tx) 835 841 .await 836 842 { 837 843 error!("Error creating app password: {:?}", e); ··· 842 848 "UPDATE users SET recovery_token = NULL, recovery_token_expires_at = NULL WHERE did = $1", 843 849 input.did.as_str() 844 850 ) 845 - .execute(&state.db) 851 + .execute(&mut *tx) 846 852 .await 847 853 { 848 854 error!("Error clearing setup token: {:?}", e); 855 + return ApiError::InternalError(None).into_response(); 849 856 } 857 + 858 + if let Err(e) = tx.commit().await { 859 + error!("Failed to commit setup transaction: {:?}", e); 860 + return ApiError::InternalError(None).into_response(); 861 + } 862 + 863 + let _ = crate::auth::webauthn::delete_registration_state(&state.db, &input.did).await; 850 864 851 865 info!(did = %input.did, "Passkey-only account setup completed"); 852 866 ··· 1090 1104 } 1091 1105 }; 1092 1106 1107 + let mut tx = match state.db.begin().await { 1108 + Ok(tx) => tx, 1109 + Err(e) => { 1110 + error!("Failed to begin transaction: {:?}", e); 1111 + return ApiError::InternalError(None).into_response(); 1112 + } 1113 + }; 1114 + 1093 1115 if let Err(e) = sqlx::query!( 1094 1116 "UPDATE users SET password_hash = $1, password_required = TRUE, recovery_token = NULL, recovery_token_expires_at = NULL WHERE did = $2", 1095 1117 password_hash, 1096 1118 input.did.as_str() 1097 1119 ) 1098 - .execute(&state.db) 1120 + .execute(&mut *tx) 1099 1121 .await 1100 1122 { 1101 1123 error!("Error updating password: {:?}", e); ··· 1103 1125 } 1104 1126 1105 1127 let deleted = sqlx::query!("DELETE FROM passkeys WHERE did = $1", input.did.as_str()) 1106 - .execute(&state.db) 1128 + .execute(&mut *tx) 1107 1129 .await; 1108 - match deleted { 1109 - Ok(result) => { 1110 - if result.rows_affected() > 0 { 1111 - info!(did = %input.did, count = result.rows_affected(), "Deleted lost passkeys during account recovery"); 1112 - } 1113 - } 1130 + let passkeys_deleted = match deleted { 1131 + Ok(result) => result.rows_affected(), 1114 1132 Err(e) => { 1115 - warn!(did = %input.did, "Failed to delete passkeys during recovery: {:?}", e); 1133 + error!(did = %input.did, "Failed to delete passkeys during recovery: {:?}", e); 1134 + return ApiError::InternalError(None).into_response(); 1116 1135 } 1136 + }; 1137 + 1138 + if let Err(e) = tx.commit().await { 1139 + error!("Failed to commit recovery transaction: {:?}", e); 1140 + return ApiError::InternalError(None).into_response(); 1117 1141 } 1118 1142 1143 + if passkeys_deleted > 0 { 1144 + info!(did = %input.did, count = passkeys_deleted, "Deleted lost passkeys during account recovery"); 1145 + } 1119 1146 info!(did = %input.did, "Passkey-only account recovered with temporary password"); 1120 1147 SuccessResponse::ok().into_response() 1121 1148 }
+11 -7
crates/tranquil-pds/src/api/server/password.rs
··· 239 239 error!("Failed to commit password reset transaction: {:?}", e); 240 240 return ApiError::InternalError(None).into_response(); 241 241 } 242 - for jti in session_jtis { 242 + futures::future::join_all(session_jtis.into_iter().map(|jti| { 243 243 let cache_key = format!("auth:session:{}:{}", user_did, jti); 244 - if let Err(e) = state.cache.delete(&cache_key).await { 245 - warn!( 246 - "Failed to invalidate session cache for {}: {:?}", 247 - cache_key, e 248 - ); 244 + let cache = state.cache.clone(); 245 + async move { 246 + if let Err(e) = cache.delete(&cache_key).await { 247 + warn!( 248 + "Failed to invalidate session cache for {}: {:?}", 249 + cache_key, e 250 + ); 251 + } 249 252 } 250 - } 253 + })) 254 + .await; 251 255 info!("Password reset completed for user {}", user_id); 252 256 EmptyResponse::ok().into_response() 253 257 }
+84 -54
crates/tranquil-pds/src/api/server/session.rs
··· 705 705 return ApiError::InternalError(None).into_response(); 706 706 } 707 707 }; 708 - let verified_column = match row.channel { 709 - crate::comms::CommsChannel::Email => "email_verified", 710 - crate::comms::CommsChannel::Discord => "discord_verified", 711 - crate::comms::CommsChannel::Telegram => "telegram_verified", 712 - crate::comms::CommsChannel::Signal => "signal_verified", 713 - }; 714 - let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column); 715 - if let Err(e) = sqlx::query(&update_query) 716 - .bind(input.did.as_str()) 717 - .execute(&state.db) 718 - .await 719 - { 720 - error!("Failed to update verification status: {:?}", e); 721 - return ApiError::InternalError(None).into_response(); 722 - } 723 708 724 709 let access_meta = match crate::auth::create_access_token_with_metadata(&row.did, &key_bytes) { 725 710 Ok(m) => m, ··· 735 720 return ApiError::InternalError(None).into_response(); 736 721 } 737 722 }; 723 + 724 + let mut tx = match state.db.begin().await { 725 + Ok(tx) => tx, 726 + Err(e) => { 727 + error!("Failed to begin transaction: {:?}", e); 728 + return ApiError::InternalError(None).into_response(); 729 + } 730 + }; 731 + 732 + let verified_column = match row.channel { 733 + crate::comms::CommsChannel::Email => "email_verified", 734 + crate::comms::CommsChannel::Discord => "discord_verified", 735 + crate::comms::CommsChannel::Telegram => "telegram_verified", 736 + crate::comms::CommsChannel::Signal => "signal_verified", 737 + }; 738 + let update_query = format!("UPDATE users SET {} = TRUE WHERE did = $1", verified_column); 739 + if let Err(e) = sqlx::query(&update_query) 740 + .bind(input.did.as_str()) 741 + .execute(&mut *tx) 742 + .await 743 + { 744 + error!("Failed to update verification status: {:?}", e); 745 + return ApiError::InternalError(None).into_response(); 746 + } 747 + 738 748 let no_scope: Option<String> = None; 739 749 if let Err(e) = sqlx::query!( 740 750 "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at, legacy_login, mfa_verified, scope) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", ··· 747 757 false, 748 758 no_scope 749 759 ) 750 - .execute(&state.db) 760 + .execute(&mut *tx) 751 761 .await 752 762 { 753 763 error!("Failed to insert session: {:?}", e); 754 764 return ApiError::InternalError(None).into_response(); 755 765 } 766 + 767 + if let Err(e) = tx.commit().await { 768 + error!("Failed to commit transaction: {:?}", e); 769 + return ApiError::InternalError(None).into_response(); 770 + } 771 + 756 772 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 757 773 if let Err(e) = crate::comms::enqueue_welcome(&state.db, row.id, &hostname).await { 758 774 warn!("Failed to enqueue welcome notification: {:?}", e); ··· 878 894 .and_then(|v| v.strip_prefix("Bearer ")) 879 895 .and_then(|token| crate::auth::get_jti_from_token(token).ok()); 880 896 881 - let mut sessions: Vec<SessionInfo> = Vec::new(); 882 - 883 - let jwt_result = sqlx::query_as::< 897 + let jwt_rows = match sqlx::query_as::< 884 898 _, 885 899 ( 886 900 i32, ··· 898 912 ) 899 913 .bind(&auth.0.did) 900 914 .fetch_all(&state.db) 901 - .await; 902 - 903 - match jwt_result { 904 - Ok(rows) => { 905 - for (id, access_jti, created_at, expires_at) in rows { 906 - sessions.push(SessionInfo { 907 - id: format!("jwt:{}", id), 908 - session_type: "legacy".to_string(), 909 - client_name: None, 910 - created_at: created_at.to_rfc3339(), 911 - expires_at: expires_at.to_rfc3339(), 912 - is_current: current_jti.as_ref() == Some(&access_jti), 913 - }); 914 - } 915 - } 915 + .await 916 + { 917 + Ok(rows) => rows, 916 918 Err(e) => { 917 919 error!("DB error fetching JWT sessions: {:?}", e); 918 920 return ApiError::InternalError(None).into_response(); 919 921 } 920 - } 922 + }; 921 923 922 - let oauth_result = sqlx::query_as::< 924 + let oauth_rows = match sqlx::query_as::< 923 925 _, 924 926 ( 925 927 i32, ··· 938 940 ) 939 941 .bind(&auth.0.did) 940 942 .fetch_all(&state.db) 941 - .await; 943 + .await 944 + { 945 + Ok(rows) => rows, 946 + Err(e) => { 947 + error!("DB error fetching OAuth sessions: {:?}", e); 948 + return ApiError::InternalError(None).into_response(); 949 + } 950 + }; 951 + 952 + let jwt_sessions = jwt_rows 953 + .into_iter() 954 + .map(|(id, access_jti, created_at, expires_at)| SessionInfo { 955 + id: format!("jwt:{}", id), 956 + session_type: "legacy".to_string(), 957 + client_name: None, 958 + created_at: created_at.to_rfc3339(), 959 + expires_at: expires_at.to_rfc3339(), 960 + is_current: current_jti.as_ref() == Some(&access_jti), 961 + }); 942 962 943 - match oauth_result { 944 - Ok(rows) => { 945 - for (id, token_id, created_at, expires_at, client_id) in rows { 963 + let is_oauth = auth.0.is_oauth; 964 + let oauth_sessions = 965 + oauth_rows 966 + .into_iter() 967 + .map(|(id, token_id, created_at, expires_at, client_id)| { 946 968 let client_name = extract_client_name(&client_id); 947 - let is_current_oauth = auth.0.is_oauth && current_jti.as_ref() == Some(&token_id); 948 - sessions.push(SessionInfo { 969 + let is_current_oauth = is_oauth && current_jti.as_ref() == Some(&token_id); 970 + SessionInfo { 949 971 id: format!("oauth:{}", id), 950 972 session_type: "oauth".to_string(), 951 973 client_name: Some(client_name), 952 974 created_at: created_at.to_rfc3339(), 953 975 expires_at: expires_at.to_rfc3339(), 954 976 is_current: is_current_oauth, 955 - }); 956 - } 957 - } 958 - Err(e) => { 959 - error!("DB error fetching OAuth sessions: {:?}", e); 960 - return ApiError::InternalError(None).into_response(); 961 - } 962 - } 977 + } 978 + }); 963 979 980 + let mut sessions: Vec<SessionInfo> = jwt_sessions.chain(oauth_sessions).collect(); 964 981 sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at)); 965 982 966 983 (StatusCode::OK, Json(ListSessionsOutput { sessions })).into_response() ··· 1061 1078 return ApiError::InvalidToken(None).into_response(); 1062 1079 }; 1063 1080 1081 + let mut tx = match state.db.begin().await { 1082 + Ok(tx) => tx, 1083 + Err(e) => { 1084 + error!("Failed to begin transaction: {:?}", e); 1085 + return ApiError::InternalError(None).into_response(); 1086 + } 1087 + }; 1088 + 1064 1089 if auth.0.is_oauth { 1065 1090 if let Err(e) = sqlx::query("DELETE FROM session_tokens WHERE did = $1") 1066 1091 .bind(&auth.0.did) 1067 - .execute(&state.db) 1092 + .execute(&mut *tx) 1068 1093 .await 1069 1094 { 1070 1095 error!("DB error revoking JWT sessions: {:?}", e); ··· 1073 1098 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1 AND token_id != $2") 1074 1099 .bind(&auth.0.did) 1075 1100 .bind(jti) 1076 - .execute(&state.db) 1101 + .execute(&mut *tx) 1077 1102 .await 1078 1103 { 1079 1104 error!("DB error revoking OAuth sessions: {:?}", e); ··· 1084 1109 sqlx::query("DELETE FROM session_tokens WHERE did = $1 AND access_jti != $2") 1085 1110 .bind(&auth.0.did) 1086 1111 .bind(jti) 1087 - .execute(&state.db) 1112 + .execute(&mut *tx) 1088 1113 .await 1089 1114 { 1090 1115 error!("DB error revoking JWT sessions: {:?}", e); ··· 1092 1117 } 1093 1118 if let Err(e) = sqlx::query("DELETE FROM oauth_token WHERE did = $1") 1094 1119 .bind(&auth.0.did) 1095 - .execute(&state.db) 1120 + .execute(&mut *tx) 1096 1121 .await 1097 1122 { 1098 1123 error!("DB error revoking OAuth sessions: {:?}", e); 1099 1124 return ApiError::InternalError(None).into_response(); 1100 1125 } 1126 + } 1127 + 1128 + if let Err(e) = tx.commit().await { 1129 + error!("Failed to commit transaction: {:?}", e); 1130 + return ApiError::InternalError(None).into_response(); 1101 1131 } 1102 1132 1103 1133 info!(did = %&auth.0.did, "All other sessions revoked");
+50 -41
crates/tranquil-pds/src/api/server/totp.rs
··· 195 195 return ApiError::InternalError(None).into_response(); 196 196 } 197 197 198 - for code in &backup_codes { 199 - let hash = match hash_backup_code(code) { 200 - Ok(h) => h, 201 - Err(e) => { 202 - error!("Failed to hash backup code: {:?}", e); 203 - return ApiError::InternalError(None).into_response(); 204 - } 205 - }; 206 - 207 - if let Err(e) = sqlx::query!( 208 - "INSERT INTO backup_codes (did, code_hash, created_at) VALUES ($1, $2, NOW())", 209 - &auth.0.did, 210 - hash 211 - ) 212 - .execute(&mut *tx) 213 - .await 214 - { 215 - error!("Failed to store backup code: {:?}", e); 198 + let backup_hashes: Result<Vec<_>, _> = 199 + backup_codes.iter().map(|c| hash_backup_code(c)).collect(); 200 + let backup_hashes = match backup_hashes { 201 + Ok(hashes) => hashes, 202 + Err(e) => { 203 + error!("Failed to hash backup code: {:?}", e); 216 204 return ApiError::InternalError(None).into_response(); 217 205 } 206 + }; 207 + 208 + if let Err(e) = sqlx::query!( 209 + r#" 210 + INSERT INTO backup_codes (did, code_hash, created_at) 211 + SELECT $1, hash, NOW() FROM UNNEST($2::text[]) AS t(hash) 212 + "#, 213 + &auth.0.did, 214 + &backup_hashes[..] 215 + ) 216 + .execute(&mut *tx) 217 + .await 218 + { 219 + error!("Failed to store backup codes: {:?}", e); 220 + return ApiError::InternalError(None).into_response(); 218 221 } 219 222 220 223 if let Err(e) = tx.commit().await { ··· 482 485 return ApiError::InternalError(None).into_response(); 483 486 } 484 487 485 - for code in &backup_codes { 486 - let hash = match hash_backup_code(code) { 487 - Ok(h) => h, 488 - Err(e) => { 489 - error!("Failed to hash backup code: {:?}", e); 490 - return ApiError::InternalError(None).into_response(); 491 - } 492 - }; 493 - 494 - if let Err(e) = sqlx::query!( 495 - "INSERT INTO backup_codes (did, code_hash, created_at) VALUES ($1, $2, NOW())", 496 - &auth.0.did, 497 - hash 498 - ) 499 - .execute(&mut *tx) 500 - .await 501 - { 502 - error!("Failed to store backup code: {:?}", e); 488 + let backup_hashes: Result<Vec<_>, _> = 489 + backup_codes.iter().map(|c| hash_backup_code(c)).collect(); 490 + let backup_hashes = match backup_hashes { 491 + Ok(hashes) => hashes, 492 + Err(e) => { 493 + error!("Failed to hash backup code: {:?}", e); 503 494 return ApiError::InternalError(None).into_response(); 504 495 } 496 + }; 497 + 498 + if let Err(e) = sqlx::query!( 499 + r#" 500 + INSERT INTO backup_codes (did, code_hash, created_at) 501 + SELECT $1, hash, NOW() FROM UNNEST($2::text[]) AS t(hash) 502 + "#, 503 + &auth.0.did, 504 + &backup_hashes[..] 505 + ) 506 + .execute(&mut *tx) 507 + .await 508 + { 509 + error!("Failed to store backup codes: {:?}", e); 510 + return ApiError::InternalError(None).into_response(); 505 511 } 506 512 507 513 if let Err(e) = tx.commit().await { ··· 532 538 } 533 539 }; 534 540 535 - for row in backup_codes { 536 - if verify_backup_code(&code, &row.code_hash) { 541 + let matched = backup_codes 542 + .iter() 543 + .find(|row| verify_backup_code(&code, &row.code_hash)); 544 + 545 + match matched { 546 + Some(row) => { 537 547 let _ = sqlx::query!( 538 548 "UPDATE backup_codes SET used_at = $1 WHERE id = $2", 539 549 Utc::now(), ··· 541 551 ) 542 552 .execute(&state.db) 543 553 .await; 544 - return true; 554 + true 545 555 } 556 + None => false, 546 557 } 547 - 548 - false 549 558 } 550 559 551 560 pub async fn verify_totp_or_backup_for_user(state: &AppState, did: &str, code: &str) -> bool {
+8 -9
crates/tranquil-pds/src/auth/verification_token.rs
··· 296 296 } 297 297 298 298 pub fn format_token_for_display(token: &str) -> String { 299 - let clean = token.replace(['-', ' '], ""); 300 - let mut result = String::new(); 301 - for (i, c) in clean.chars().enumerate() { 302 - if i > 0 && i % 4 == 0 { 303 - result.push('-'); 304 - } 305 - result.push(c); 306 - } 307 - result 299 + token 300 + .replace(['-', ' '], "") 301 + .chars() 302 + .collect::<Vec<_>>() 303 + .chunks(4) 304 + .map(|chunk| chunk.iter().collect::<String>()) 305 + .collect::<Vec<_>>() 306 + .join("-") 308 307 } 309 308 310 309 pub fn normalize_token_input(input: &str) -> String {
+3 -7
crates/tranquil-pds/src/oauth/db/scope_preference.rs
··· 75 75 let stored_scopes: std::collections::HashSet<&str> = 76 76 stored_prefs.iter().map(|p| p.scope.as_str()).collect(); 77 77 78 - for scope in requested_scopes { 79 - if !stored_scopes.contains(scope.as_str()) { 80 - return Ok(true); 81 - } 82 - } 83 - 84 - Ok(false) 78 + Ok(requested_scopes 79 + .iter() 80 + .any(|scope| !stored_scopes.contains(scope.as_str()))) 85 81 } 86 82 87 83 pub async fn delete_scope_preferences(
+22 -24
crates/tranquil-pds/src/oauth/db/token.rs
··· 179 179 pub async fn rotate_token( 180 180 pool: &PgPool, 181 181 old_db_id: i32, 182 - new_token_id: &str, 183 182 new_refresh_token: &str, 184 183 new_expires_at: DateTime<Utc>, 185 184 ) -> Result<(), OAuthError> { ··· 207 206 sqlx::query!( 208 207 r#" 209 208 UPDATE oauth_token 210 - SET token_id = $2, current_refresh_token = $3, expires_at = $4, updated_at = NOW(), 211 - previous_refresh_token = $5, rotated_at = NOW() 209 + SET current_refresh_token = $2, expires_at = $3, updated_at = NOW(), 210 + previous_refresh_token = $4, rotated_at = NOW() 212 211 WHERE id = $1 213 212 "#, 214 213 old_db_id, 215 - new_token_id, 216 214 new_refresh_token, 217 215 new_expires_at, 218 216 old_refresh ··· 317 315 ) 318 316 .fetch_all(pool) 319 317 .await?; 320 - let mut tokens = Vec::with_capacity(rows.len()); 321 - for r in rows { 322 - tokens.push(TokenData { 323 - did: r.did, 324 - token_id: r.token_id, 325 - created_at: r.created_at, 326 - updated_at: r.updated_at, 327 - expires_at: r.expires_at, 328 - client_id: r.client_id, 329 - client_auth: from_json(r.client_auth)?, 330 - device_id: r.device_id, 331 - parameters: from_json(r.parameters)?, 332 - details: r.details, 333 - code: r.code, 334 - current_refresh_token: r.current_refresh_token, 335 - scope: r.scope, 336 - controller_did: r.controller_did, 337 - }); 338 - } 339 - Ok(tokens) 318 + rows.into_iter() 319 + .map(|r| { 320 + Ok(TokenData { 321 + did: r.did, 322 + token_id: r.token_id, 323 + created_at: r.created_at, 324 + updated_at: r.updated_at, 325 + expires_at: r.expires_at, 326 + client_id: r.client_id, 327 + client_auth: from_json(r.client_auth)?, 328 + device_id: r.device_id, 329 + parameters: from_json(r.parameters)?, 330 + details: r.details, 331 + code: r.code, 332 + current_refresh_token: r.current_refresh_token, 333 + scope: r.scope, 334 + controller_did: r.controller_did, 335 + }) 336 + }) 337 + .collect() 340 338 } 341 339 342 340 pub async fn count_tokens_for_user(pool: &PgPool, did: &str) -> Result<i64, OAuthError> {
+5 -7
crates/tranquil-pds/src/oauth/endpoints/authorize.rs
··· 102 102 .get("cookie") 103 103 .and_then(|v| v.to_str().ok()) 104 104 .and_then(|cookie_str| { 105 - for cookie in cookie_str.split(';') { 106 - let cookie = cookie.trim(); 107 - if let Some(value) = cookie.strip_prefix(&format!("{}=", DEVICE_COOKIE_NAME)) { 108 - return crate::config::AuthConfig::get().verify_device_cookie(value); 109 - } 110 - } 111 - None 105 + cookie_str.split(';').map(|c| c.trim()).find_map(|cookie| { 106 + cookie 107 + .strip_prefix(&format!("{}=", DEVICE_COOKIE_NAME)) 108 + .and_then(|value| crate::config::AuthConfig::get().verify_device_cookie(value)) 109 + }) 112 110 }) 113 111 } 114 112
+34 -32
crates/tranquil-pds/src/oauth/endpoints/par.rs
··· 182 182 if requested_scopes.is_empty() { 183 183 return Ok(Some("atproto".to_string())); 184 184 } 185 - let mut has_transition = false; 186 - let mut has_granular = false; 185 + if let Some(unknown) = requested_scopes 186 + .iter() 187 + .find(|s| matches!(parse_scope(s), ParsedScope::Unknown(_))) 188 + { 189 + return Err(OAuthError::InvalidScope(format!( 190 + "Unsupported scope: {}", 191 + unknown 192 + ))); 193 + } 187 194 188 - for scope in &requested_scopes { 189 - let parsed = parse_scope(scope); 190 - match &parsed { 191 - ParsedScope::Unknown(_) => { 192 - return Err(OAuthError::InvalidScope(format!( 193 - "Unsupported scope: {}", 194 - scope 195 - ))); 196 - } 195 + let has_transition = requested_scopes.iter().any(|s| { 196 + matches!( 197 + parse_scope(s), 197 198 ParsedScope::TransitionGeneric 198 - | ParsedScope::TransitionChat 199 - | ParsedScope::TransitionEmail => { 200 - has_transition = true; 201 - } 199 + | ParsedScope::TransitionChat 200 + | ParsedScope::TransitionEmail 201 + ) 202 + }); 203 + let has_granular = requested_scopes.iter().any(|s| { 204 + matches!( 205 + parse_scope(s), 202 206 ParsedScope::Repo(_) 203 - | ParsedScope::Blob(_) 204 - | ParsedScope::Rpc(_) 205 - | ParsedScope::Account(_) 206 - | ParsedScope::Identity(_) 207 - | ParsedScope::Include(_) => { 208 - has_granular = true; 209 - } 210 - ParsedScope::Atproto => {} 211 - } 212 - } 207 + | ParsedScope::Blob(_) 208 + | ParsedScope::Rpc(_) 209 + | ParsedScope::Account(_) 210 + | ParsedScope::Identity(_) 211 + | ParsedScope::Include(_) 212 + ) 213 + }); 213 214 214 215 if has_transition && has_granular { 215 216 return Err(OAuthError::InvalidScope( ··· 219 220 220 221 if let Some(client_scope) = &client_metadata.scope { 221 222 let client_scopes: Vec<&str> = client_scope.split_whitespace().collect(); 222 - for scope in &requested_scopes { 223 - if !client_scopes.iter().any(|cs| scope_matches(cs, scope)) { 224 - return Err(OAuthError::InvalidScope(format!( 225 - "Scope '{}' not registered for this client", 226 - scope 227 - ))); 228 - } 223 + if let Some(unregistered) = requested_scopes 224 + .iter() 225 + .find(|scope| !client_scopes.iter().any(|cs| scope_matches(cs, scope))) 226 + { 227 + return Err(OAuthError::InvalidScope(format!( 228 + "Scope '{}' not registered for this client", 229 + unregistered 230 + ))); 229 231 } 230 232 } 231 233 Ok(Some(requested_scopes.join(" ")))
+13 -10
crates/tranquil-pds/src/oauth/endpoints/token/grants.rs
··· 24 24 request: ValidatedTokenRequest, 25 25 dpop_proof: Option<String>, 26 26 ) -> Result<(HeaderMap, Json<TokenResponse>), OAuthError> { 27 + tracing::info!( 28 + has_dpop = dpop_proof.is_some(), 29 + client_id = ?request.client_auth.client_id, 30 + "Authorization code grant requested" 31 + ); 27 32 let (code, code_verifier, redirect_uri) = match request.grant { 28 33 TokenGrant::AuthorizationCode { 29 34 code, ··· 178 183 controller_did: controller_did.clone(), 179 184 }; 180 185 db::create_token(&state.db, &token_data).await?; 186 + tracing::info!( 187 + did = %did, 188 + token_id = %token_id.0, 189 + client_id = %auth_request.client_id, 190 + "Authorization code grant completed, token created" 191 + ); 181 192 tokio::spawn({ 182 193 let pool = state.db.clone(); 183 194 let did_clone = did.clone(); ··· 316 327 } else { 317 328 None 318 329 }; 319 - let new_token_id = TokenId::generate(); 320 330 let new_refresh_token = RefreshToken::generate(); 321 331 let refresh_expiry_days = if matches!(token_data.client_auth, ClientAuth::None) { 322 332 REFRESH_TOKEN_EXPIRY_DAYS_PUBLIC ··· 324 334 REFRESH_TOKEN_EXPIRY_DAYS_CONFIDENTIAL 325 335 }; 326 336 let new_expires_at = Utc::now() + Duration::days(refresh_expiry_days); 327 - db::rotate_token( 328 - &state.db, 329 - db_id, 330 - &new_token_id.0, 331 - &new_refresh_token.0, 332 - new_expires_at, 333 - ) 334 - .await?; 337 + db::rotate_token(&state.db, db_id, &new_refresh_token.0, new_expires_at).await?; 335 338 tracing::info!( 336 339 did = %token_data.did, 337 340 new_expires_at = %new_expires_at, 338 341 "Refresh token rotated successfully" 339 342 ); 340 343 let access_token = create_access_token_with_delegation( 341 - &new_token_id.0, 344 + &token_data.token_id, 342 345 &token_data.did, 343 346 dpop_jkt.as_deref(), 344 347 token_data.scope.as_deref(),
+13 -5
crates/tranquil-pds/src/oauth/endpoints/token/helpers.rs
··· 11 11 12 12 pub struct TokenClaims { 13 13 pub jti: String, 14 + pub sid: String, 14 15 pub exp: i64, 15 16 pub iat: i64, 16 17 } ··· 33 34 } 34 35 35 36 pub fn create_access_token( 36 - token_id: &str, 37 + session_id: &str, 37 38 sub: &str, 38 39 dpop_jkt: Option<&str>, 39 40 scope: Option<&str>, 40 41 ) -> Result<String, OAuthError> { 41 - create_access_token_with_delegation(token_id, sub, dpop_jkt, scope, None) 42 + create_access_token_with_delegation(session_id, sub, dpop_jkt, scope, None) 42 43 } 43 44 44 45 pub fn create_access_token_with_delegation( 45 - token_id: &str, 46 + session_id: &str, 46 47 sub: &str, 47 48 dpop_jkt: Option<&str>, 48 49 scope: Option<&str>, 49 50 controller_did: Option<&str>, 50 51 ) -> Result<String, OAuthError> { 51 52 use serde_json::json; 53 + let jti = uuid::Uuid::new_v4().to_string(); 52 54 let pds_hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 53 55 let issuer = format!("https://{}", pds_hostname); 54 56 let now = Utc::now().timestamp(); ··· 60 62 "aud": issuer, 61 63 "iat": now, 62 64 "exp": exp, 63 - "jti": token_id, 65 + "jti": jti, 66 + "sid": session_id, 64 67 "scope": actual_scope 65 68 }); 66 69 if let Some(jkt) = dpop_jkt { ··· 132 135 .and_then(|j| j.as_str()) 133 136 .ok_or_else(|| OAuthError::InvalidToken("Missing jti claim".to_string()))? 134 137 .to_string(); 138 + let sid = payload 139 + .get("sid") 140 + .and_then(|s| s.as_str()) 141 + .ok_or_else(|| OAuthError::InvalidToken("Missing sid claim".to_string()))? 142 + .to_string(); 135 143 let exp = payload 136 144 .get("exp") 137 145 .and_then(|e| e.as_i64()) ··· 140 148 .get("iat") 141 149 .and_then(|i| i.as_i64()) 142 150 .ok_or_else(|| OAuthError::InvalidToken("Missing iat claim".to_string()))?; 143 - Ok(TokenClaims { jti, exp, iat }) 151 + Ok(TokenClaims { jti, sid, exp, iat }) 144 152 }
+1 -1
crates/tranquil-pds/src/oauth/endpoints/token/introspect.rs
··· 102 102 Ok(info) => info, 103 103 Err(_) => return Ok(Json(inactive_response)), 104 104 }; 105 - let token_data = match db::get_token_by_id(&state.db, &token_info.jti).await { 105 + let token_data = match db::get_token_by_id(&state.db, &token_info.sid).await { 106 106 Ok(Some(data)) => data, 107 107 _ => return Ok(Json(inactive_response)), 108 108 };
+2 -2
crates/tranquil-pds/src/oauth/verify.rs
··· 142 142 return Err(OAuthError::ExpiredToken("Token has expired".to_string())); 143 143 } 144 144 let token_id = payload 145 - .get("jti") 145 + .get("sid") 146 146 .and_then(|j| j.as_str()) 147 - .ok_or_else(|| OAuthError::InvalidToken("Missing jti claim".to_string()))? 147 + .ok_or_else(|| OAuthError::InvalidToken("Missing sid claim".to_string()))? 148 148 .to_string(); 149 149 let did = payload 150 150 .get("sub")
+1 -1
crates/tranquil-pds/src/rate_limit.rs
··· 48 48 NonZeroU32::new(10).unwrap(), 49 49 ))), 50 50 oauth_token: Arc::new(RateLimiter::keyed(Quota::per_minute( 51 - NonZeroU32::new(30).unwrap(), 51 + NonZeroU32::new(300).unwrap(), 52 52 ))), 53 53 oauth_authorize: Arc::new(RateLimiter::keyed(Quota::per_minute( 54 54 NonZeroU32::new(10).unwrap(),
+1 -1
crates/tranquil-pds/src/state.rs
··· 71 71 Self::PasswordReset => (5, 3_600_000), 72 72 Self::ResetPassword => (10, 60_000), 73 73 Self::RefreshSession => (60, 60_000), 74 - Self::OAuthToken => (30, 60_000), 74 + Self::OAuthToken => (300, 60_000), 75 75 Self::OAuthAuthorize => (10, 60_000), 76 76 Self::OAuthPar => (30, 60_000), 77 77 Self::OAuthIntrospect => (30, 60_000),
+2 -6
crates/tranquil-pds/src/sync/deprecated.rs
··· 146 146 stack.push(*cid); 147 147 } 148 148 Ipld::Map(map) => { 149 - for v in map.values() { 150 - extract_links_ipld(v, stack); 151 - } 149 + map.values().for_each(|v| extract_links_ipld(v, stack)); 152 150 } 153 151 Ipld::List(arr) => { 154 - for v in arr { 155 - extract_links_ipld(v, stack); 156 - } 152 + arr.iter().for_each(|v| extract_links_ipld(v, stack)); 157 153 } 158 154 _ => {} 159 155 }
+2 -6
crates/tranquil-pds/src/sync/import.rs
··· 148 148 links.push(*cid); 149 149 } 150 150 Ipld::Map(map) => { 151 - for v in map.values() { 152 - extract_links(v, links); 153 - } 151 + map.values().for_each(|v| extract_links(v, links)); 154 152 } 155 153 Ipld::List(arr) => { 156 - for v in arr { 157 - extract_links(v, links); 158 - } 154 + arr.iter().for_each(|v| extract_links(v, links)); 159 155 } 160 156 _ => {} 161 157 }
+22 -20
crates/tranquil-pds/src/sync/repo.rs
··· 181 181 } 182 182 }; 183 183 184 - let mut block_cids: Vec<Cid> = Vec::new(); 185 - for event in &events { 186 - if let Some(cids) = &event.blocks_cids { 187 - for cid_str in cids { 188 - if let Ok(cid) = Cid::from_str(cid_str) 189 - && !block_cids.contains(&cid) 190 - { 191 - block_cids.push(cid); 192 - } 184 + let block_cids: Vec<Cid> = events 185 + .iter() 186 + .flat_map(|event| { 187 + let block_cids = event 188 + .blocks_cids 189 + .as_ref() 190 + .map(|cids| cids.iter().filter_map(|s| Cid::from_str(s).ok()).collect()) 191 + .unwrap_or_else(Vec::new); 192 + let commit_cid = event 193 + .commit_cid 194 + .as_ref() 195 + .and_then(|s| Cid::from_str(s).ok()); 196 + block_cids.into_iter().chain(commit_cid) 197 + }) 198 + .fold(Vec::new(), |mut acc, cid| { 199 + if !acc.contains(&cid) { 200 + acc.push(cid); 193 201 } 194 - } 195 - if let Some(commit_cid_str) = &event.commit_cid 196 - && let Ok(cid) = Cid::from_str(commit_cid_str) 197 - && !block_cids.contains(&cid) 198 - { 199 - block_cids.push(cid); 200 - } 201 - } 202 + acc 203 + }); 202 204 203 205 let mut car_bytes = match encode_car_header(head_cid) { 204 206 Ok(h) => h, ··· 334 336 car.extend_from_slice(&writer); 335 337 }; 336 338 write_block(&mut car_bytes, &commit_cid, &commit_bytes); 337 - for (cid, data) in &proof_blocks { 338 - write_block(&mut car_bytes, cid, data); 339 - } 339 + proof_blocks 340 + .iter() 341 + .for_each(|(cid, data)| write_block(&mut car_bytes, cid, data)); 340 342 write_block(&mut car_bytes, &record_cid, &record_block); 341 343 ( 342 344 StatusCode::OK,
+43 -57
crates/tranquil-pds/src/sync/util.rs
··· 210 210 let mut buffer = Cursor::new(Vec::new()); 211 211 let header = CarHeader::new_v1(vec![commit_cid]); 212 212 let mut writer = CarWriter::new(header, &mut buffer); 213 - for (cid, data) in other_blocks { 214 - if cid != commit_cid { 215 - writer 216 - .write(cid, data.as_ref()) 217 - .await 218 - .map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?; 219 - } 213 + for (cid, data) in other_blocks.iter().filter(|(c, _)| **c != commit_cid) { 214 + writer 215 + .write(*cid, data.as_ref()) 216 + .await 217 + .map_err(|e| anyhow::anyhow!("writing block {}: {}", cid, e))?; 220 218 } 221 219 if let Some(data) = commit_bytes { 222 220 writer ··· 360 358 } 361 359 let car_bytes = if !all_cids.is_empty() { 362 360 let fetched = state.block_store.get_many(&all_cids).await?; 363 - let mut blocks = std::collections::BTreeMap::new(); 364 - let mut commit_bytes: Option<Bytes> = None; 365 - for (cid, data_opt) in all_cids.iter().zip(fetched.iter()) { 366 - if let Some(data) = data_opt { 367 - if *cid == commit_cid { 368 - commit_bytes = Some(data.clone()); 369 - if let Some(rev) = extract_rev_from_commit_bytes(data) { 370 - frame.rev = rev; 371 - } 372 - } else { 373 - blocks.insert(*cid, data.clone()); 374 - } 375 - } 361 + let (commit_data, other_blocks): (Vec<_>, Vec<_>) = all_cids 362 + .iter() 363 + .zip(fetched.iter()) 364 + .filter_map(|(cid, data_opt)| data_opt.as_ref().map(|data| (*cid, data.clone()))) 365 + .partition(|(cid, _)| *cid == commit_cid); 366 + let commit_bytes = commit_data.into_iter().next().map(|(_, data)| data); 367 + if let Some(ref cb) = commit_bytes 368 + && let Some(rev) = extract_rev_from_commit_bytes(cb) 369 + { 370 + frame.rev = rev; 376 371 } 372 + let blocks: std::collections::BTreeMap<Cid, Bytes> = other_blocks.into_iter().collect(); 377 373 write_car_blocks(commit_cid, commit_bytes, blocks).await? 378 374 } else { 379 375 Vec::new() ··· 393 389 state: &AppState, 394 390 events: &[SequencedEvent], 395 391 ) -> Result<HashMap<Cid, Bytes>, anyhow::Error> { 396 - let mut all_cids: Vec<Cid> = Vec::new(); 397 - for event in events { 398 - if let Some(ref commit_cid_str) = event.commit_cid 399 - && let Ok(cid) = Cid::from_str(commit_cid_str) 400 - { 401 - all_cids.push(cid); 402 - } 403 - if let Some(ref prev_cid_str) = event.prev_cid 404 - && let Ok(cid) = Cid::from_str(prev_cid_str) 405 - { 406 - all_cids.push(cid); 407 - } 408 - if let Some(ref block_cids_str) = event.blocks_cids { 409 - for s in block_cids_str { 410 - if let Ok(cid) = Cid::from_str(s) { 411 - all_cids.push(cid); 412 - } 413 - } 414 - } 415 - } 392 + let mut all_cids: Vec<Cid> = events 393 + .iter() 394 + .flat_map(|event| { 395 + let commit_cid = event 396 + .commit_cid 397 + .as_ref() 398 + .and_then(|s| Cid::from_str(s).ok()); 399 + let prev_cid = event.prev_cid.as_ref().and_then(|s| Cid::from_str(s).ok()); 400 + let block_cids = event 401 + .blocks_cids 402 + .as_ref() 403 + .map(|cids| cids.iter().filter_map(|s| Cid::from_str(s).ok()).collect()) 404 + .unwrap_or_else(Vec::new); 405 + commit_cid.into_iter().chain(prev_cid).chain(block_cids) 406 + }) 407 + .collect(); 416 408 all_cids.sort(); 417 409 all_cids.dedup(); 418 410 if all_cids.is_empty() { 419 411 return Ok(HashMap::new()); 420 412 } 421 413 let fetched = state.block_store.get_many(&all_cids).await?; 422 - let mut blocks_map = HashMap::with_capacity(all_cids.len()); 423 - for (cid, data_opt) in all_cids.into_iter().zip(fetched.into_iter()) { 424 - if let Some(data) = data_opt { 425 - blocks_map.insert(cid, data); 426 - } 427 - } 414 + let blocks_map: HashMap<Cid, Bytes> = all_cids 415 + .into_iter() 416 + .zip(fetched) 417 + .filter_map(|(cid, data_opt)| data_opt.map(|data| (cid, data))) 418 + .collect(); 428 419 Ok(blocks_map) 429 420 } 430 421 ··· 511 502 frame.since = Some(rev); 512 503 } 513 504 let car_bytes = if !all_cids.is_empty() { 514 - let mut blocks = BTreeMap::new(); 515 - let mut commit_bytes_for_car: Option<Bytes> = None; 516 - for cid in all_cids { 517 - if let Some(data) = prefetched.get(&cid) { 518 - if cid == commit_cid { 519 - commit_bytes_for_car = Some(data.clone()); 520 - } else { 521 - blocks.insert(cid, data.clone()); 522 - } 523 - } 524 - } 505 + let (commit_data, other_blocks): (Vec<_>, Vec<_>) = all_cids 506 + .into_iter() 507 + .filter_map(|cid| prefetched.get(&cid).map(|data| (cid, data.clone()))) 508 + .partition(|(cid, _)| *cid == commit_cid); 509 + let commit_bytes_for_car = commit_data.into_iter().next().map(|(_, data)| data); 510 + let blocks: BTreeMap<Cid, Bytes> = other_blocks.into_iter().collect(); 525 511 write_car_blocks(commit_cid, commit_bytes_for_car, blocks).await? 526 512 } else { 527 513 Vec::new()
+198 -3
crates/tranquil-pds/tests/common/mod.rs
··· 5 5 use reqwest::{Client, StatusCode, header}; 6 6 use serde_json::{Value, json}; 7 7 use sqlx::postgres::PgPoolOptions; 8 - #[allow(unused_imports)] 9 8 use std::collections::HashMap; 10 - use std::sync::OnceLock; 9 + use std::sync::{Arc, OnceLock, RwLock}; 11 10 #[allow(unused_imports)] 12 11 use std::time::Duration; 13 12 use tokio::net::TcpListener; 14 13 use tranquil_pds::state::AppState; 15 14 use wiremock::matchers::{method, path}; 16 - use wiremock::{Mock, MockServer, ResponseTemplate}; 15 + use wiremock::{Mock, MockServer, Request, Respond, ResponseTemplate}; 17 16 18 17 static SERVER_URL: OnceLock<String> = OnceLock::new(); 19 18 static APP_PORT: OnceLock<u16> = OnceLock::new(); 20 19 static MOCK_APPVIEW: OnceLock<MockServer> = OnceLock::new(); 20 + static MOCK_PLC: OnceLock<MockServer> = OnceLock::new(); 21 21 static TEST_DB_POOL: OnceLock<sqlx::PgPool> = OnceLock::new(); 22 22 23 23 #[cfg(not(feature = "external-infra"))] ··· 117 117 std::env::var("DATABASE_URL").expect("DATABASE_URL must be set when using external infra"); 118 118 let s3_endpoint = 119 119 std::env::var("S3_ENDPOINT").expect("S3_ENDPOINT must be set when using external infra"); 120 + let plc_url = setup_mock_plc_directory().await; 120 121 unsafe { 121 122 std::env::set_var( 122 123 "S3_BUCKET", ··· 137 138 std::env::set_var("S3_ENDPOINT", &s3_endpoint); 138 139 std::env::set_var("MAX_IMPORT_SIZE", "100000000"); 139 140 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true"); 141 + std::env::set_var("PLC_DIRECTORY_URL", &plc_url); 140 142 } 141 143 let mock_server = MockServer::start().await; 142 144 setup_mock_appview(&mock_server).await; ··· 164 166 .await 165 167 .expect("Failed to get S3 port"); 166 168 let s3_endpoint = format!("http://127.0.0.1:{}", s3_port); 169 + let plc_url = setup_mock_plc_directory().await; 167 170 unsafe { 168 171 std::env::set_var("S3_BUCKET", "test-bucket"); 169 172 std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin"); ··· 172 175 std::env::set_var("S3_ENDPOINT", &s3_endpoint); 173 176 std::env::set_var("MAX_IMPORT_SIZE", "100000000"); 174 177 std::env::set_var("SKIP_IMPORT_VERIFICATION", "true"); 178 + std::env::set_var("PLC_DIRECTORY_URL", &plc_url); 175 179 } 176 180 let sdk_config = aws_config::defaults(BehaviorVersion::latest()) 177 181 .region("us-east-1") ··· 238 242 } 239 243 240 244 async fn setup_mock_appview(_mock_server: &MockServer) {} 245 + 246 + type PlcOperationStore = Arc<RwLock<HashMap<String, Value>>>; 247 + 248 + struct PlcPostResponder { 249 + store: PlcOperationStore, 250 + } 251 + 252 + impl Respond for PlcPostResponder { 253 + fn respond(&self, request: &Request) -> ResponseTemplate { 254 + let path = request.url.path(); 255 + let did = urlencoding::decode(path.trim_start_matches('/')) 256 + .unwrap_or_default() 257 + .to_string(); 258 + 259 + if let Ok(body) = serde_json::from_slice::<Value>(request.body.as_slice()) 260 + && let Ok(mut store) = self.store.write() 261 + { 262 + store.insert(did, body); 263 + } 264 + ResponseTemplate::new(200) 265 + } 266 + } 267 + 268 + struct PlcGetResponder { 269 + store: PlcOperationStore, 270 + } 271 + 272 + impl Respond for PlcGetResponder { 273 + fn respond(&self, request: &Request) -> ResponseTemplate { 274 + let path = request.url.path(); 275 + let path_clean = path.trim_start_matches('/'); 276 + 277 + let (did, endpoint) = path_clean 278 + .find("/log/") 279 + .or_else(|| path_clean.find("/data")) 280 + .map(|idx| { 281 + let did = urlencoding::decode(&path_clean[..idx]) 282 + .unwrap_or_default() 283 + .to_string(); 284 + let endpoint = &path_clean[idx..]; 285 + (did, endpoint) 286 + }) 287 + .unwrap_or_else(|| { 288 + ( 289 + urlencoding::decode(path_clean) 290 + .unwrap_or_default() 291 + .to_string(), 292 + "", 293 + ) 294 + }); 295 + 296 + let store = self.store.read().unwrap(); 297 + let operation = store.get(&did); 298 + 299 + match endpoint { 300 + "/log/last" => { 301 + let response = operation.cloned().unwrap_or_else(|| { 302 + json!({ 303 + "type": "plc_operation", 304 + "rotationKeys": [], 305 + "verificationMethods": {}, 306 + "alsoKnownAs": [], 307 + "services": {}, 308 + "prev": null 309 + }) 310 + }); 311 + ResponseTemplate::new(200).set_body_json(response) 312 + } 313 + "/log/audit" => ResponseTemplate::new(200).set_body_json(json!([])), 314 + "/data" => { 315 + let response = operation 316 + .map(|op| { 317 + json!({ 318 + "rotationKeys": op.get("rotationKeys").cloned().unwrap_or(json!([])), 319 + "verificationMethods": op.get("verificationMethods").cloned().unwrap_or(json!({})), 320 + "alsoKnownAs": op.get("alsoKnownAs").cloned().unwrap_or(json!([])), 321 + "services": op.get("services").cloned().unwrap_or(json!({})) 322 + }) 323 + }) 324 + .unwrap_or_else(|| { 325 + json!({ 326 + "rotationKeys": [], 327 + "verificationMethods": {}, 328 + "alsoKnownAs": [], 329 + "services": {} 330 + }) 331 + }); 332 + ResponseTemplate::new(200).set_body_json(response) 333 + } 334 + _ => { 335 + let did_doc = operation 336 + .map(|op| operation_to_did_document(&did, op)) 337 + .unwrap_or_else(|| { 338 + json!({ 339 + "@context": ["https://www.w3.org/ns/did/v1"], 340 + "id": did, 341 + "alsoKnownAs": [], 342 + "verificationMethod": [], 343 + "service": [] 344 + }) 345 + }); 346 + ResponseTemplate::new(200).set_body_json(did_doc) 347 + } 348 + } 349 + } 350 + } 351 + 352 + fn operation_to_did_document(did: &str, op: &Value) -> Value { 353 + let also_known_as = op 354 + .get("alsoKnownAs") 355 + .and_then(|v| v.as_array()) 356 + .cloned() 357 + .unwrap_or_default(); 358 + 359 + let verification_methods: Vec<Value> = op 360 + .get("verificationMethods") 361 + .and_then(|v| v.as_object()) 362 + .map(|methods| { 363 + methods 364 + .iter() 365 + .map(|(key, value)| { 366 + let did_key = value.as_str().unwrap_or(""); 367 + let multikey = did_key_to_multikey(did_key); 368 + json!({ 369 + "id": format!("{}#{}", did, key), 370 + "type": "Multikey", 371 + "controller": did, 372 + "publicKeyMultibase": multikey 373 + }) 374 + }) 375 + .collect() 376 + }) 377 + .unwrap_or_default(); 378 + 379 + let services: Vec<Value> = op 380 + .get("services") 381 + .and_then(|v| v.as_object()) 382 + .map(|svcs| { 383 + svcs.iter() 384 + .map(|(key, value)| { 385 + json!({ 386 + "id": format!("#{}", key), 387 + "type": value.get("type").and_then(|t| t.as_str()).unwrap_or(""), 388 + "serviceEndpoint": value.get("endpoint").and_then(|e| e.as_str()).unwrap_or("") 389 + }) 390 + }) 391 + .collect() 392 + }) 393 + .unwrap_or_default(); 394 + 395 + json!({ 396 + "@context": [ 397 + "https://www.w3.org/ns/did/v1", 398 + "https://w3id.org/security/multikey/v1" 399 + ], 400 + "id": did, 401 + "alsoKnownAs": also_known_as, 402 + "verificationMethod": verification_methods, 403 + "service": services 404 + }) 405 + } 406 + 407 + fn did_key_to_multikey(did_key: &str) -> String { 408 + if !did_key.starts_with("did:key:z") { 409 + return String::new(); 410 + } 411 + did_key[8..].to_string() 412 + } 413 + 414 + async fn setup_mock_plc_directory() -> String { 415 + let mock_plc = MockServer::start().await; 416 + let store: PlcOperationStore = Arc::new(RwLock::new(HashMap::new())); 417 + 418 + Mock::given(method("POST")) 419 + .respond_with(PlcPostResponder { 420 + store: store.clone(), 421 + }) 422 + .mount(&mock_plc) 423 + .await; 424 + 425 + Mock::given(method("GET")) 426 + .respond_with(PlcGetResponder { 427 + store: store.clone(), 428 + }) 429 + .mount(&mock_plc) 430 + .await; 431 + 432 + let plc_url = mock_plc.uri(); 433 + MOCK_PLC.set(mock_plc).ok(); 434 + plc_url 435 + } 241 436 242 437 async fn spawn_app(database_url: String) -> String { 243 438 use tranquil_pds::rate_limit::RateLimiters;
+1 -4
crates/tranquil-pds/tests/import_verification.rs
··· 159 159 let status = import_res.status(); 160 160 if status != StatusCode::OK { 161 161 let body = import_res.text().await.unwrap_or_default(); 162 - panic!( 163 - "Import failed with status {}: {}", 164 - status, body 165 - ); 162 + panic!("Import failed with status {}: {}", status, body); 166 163 } 167 164 } 168 165