Highly ambitious ATProtocol AppView service and sdks

jetstream ✈️

+1545 -248
-19
api/.sqlx/query-2bc2c43d9327e2fe4630fa60c68341f13d595b39391422d71d4081845485058c.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "INSERT INTO \"record\" (\"uri\", \"cid\", \"did\", \"collection\", \"json\", \"indexed_at\")\n VALUES ($1, $2, $3, $4, $5, $6)\n ON CONFLICT (\"uri\")\n DO UPDATE SET\n \"cid\" = EXCLUDED.\"cid\",\n \"json\" = EXCLUDED.\"json\",\n \"indexed_at\" = EXCLUDED.\"indexed_at\"", 4 - "describe": { 5 - "columns": [], 6 - "parameters": { 7 - "Left": [ 8 - "Text", 9 - "Text", 10 - "Text", 11 - "Text", 12 - "Jsonb", 13 - "Timestamptz" 14 - ] 15 - }, 16 - "nullable": [] 17 - }, 18 - "hash": "2bc2c43d9327e2fe4630fa60c68341f13d595b39391422d71d4081845485058c" 19 - }
+9 -3
api/.sqlx/query-473a7c2a3db78b03e4be6190777113e307ceed271fc659fefe333b5e4aa7488a.json api/.sqlx/query-53a46494d5aabef41ffb7a33902516405ad1f9cc72421036c206a57362a231f1.json
··· 1 1 { 2 2 "db_name": "PostgreSQL", 3 - "query": "\n SELECT uri, cid, did, collection, json, \"indexed_at\" as indexed_at\n FROM record\n WHERE collection = $1 AND json->>'slice' = $2 AND \"indexed_at\" < $3\n ORDER BY \"indexed_at\" DESC\n LIMIT $4\n ", 3 + "query": "\n SELECT uri, cid, did, collection, json, \"indexed_at\" as indexed_at, slice_uri\n FROM record\n WHERE collection = $1 AND json->>'slice' = $2 AND \"indexed_at\" < $3\n ORDER BY \"indexed_at\" DESC\n LIMIT $4\n ", 4 4 "describe": { 5 5 "columns": [ 6 6 { ··· 32 32 "ordinal": 5, 33 33 "name": "indexed_at", 34 34 "type_info": "Timestamptz" 35 + }, 36 + { 37 + "ordinal": 6, 38 + "name": "slice_uri", 39 + "type_info": "Text" 35 40 } 36 41 ], 37 42 "parameters": { ··· 48 53 false, 49 54 false, 50 55 false, 51 - false 56 + false, 57 + true 52 58 ] 53 59 }, 54 - "hash": "473a7c2a3db78b03e4be6190777113e307ceed271fc659fefe333b5e4aa7488a" 60 + "hash": "53a46494d5aabef41ffb7a33902516405ad1f9cc72421036c206a57362a231f1" 55 61 }
+20
api/.sqlx/query-61f127686862442dd8ba3f0043b03c3b88ea0d4174f3fb96ecfbfad56f36aab1.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "INSERT INTO \"record\" (\"uri\", \"cid\", \"did\", \"collection\", \"json\", \"indexed_at\", \"slice_uri\")\n VALUES ($1, $2, $3, $4, $5, $6, $7)\n ON CONFLICT (\"uri\")\n DO UPDATE SET\n \"cid\" = EXCLUDED.\"cid\",\n \"json\" = EXCLUDED.\"json\",\n \"indexed_at\" = EXCLUDED.\"indexed_at\",\n \"slice_uri\" = EXCLUDED.\"slice_uri\"", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "Text", 9 + "Text", 10 + "Text", 11 + "Text", 12 + "Jsonb", 13 + "Timestamptz", 14 + "Text" 15 + ] 16 + }, 17 + "nullable": [] 18 + }, 19 + "hash": "61f127686862442dd8ba3f0043b03c3b88ea0d4174f3fb96ecfbfad56f36aab1" 20 + }
+9 -3
api/.sqlx/query-678473ff73728b115ac4a310bc477f3ccd6763594a894809a6769704e3b55701.json api/.sqlx/query-7b89fadea24881ba9a03c952674033c5beaa0e7d41b1260253f1d712b24e9d27.json
··· 1 1 { 2 2 "db_name": "PostgreSQL", 3 - "query": "\n SELECT uri, cid, did, collection, json, \"indexed_at\" as indexed_at\n FROM record\n WHERE collection = $1 AND json->>'slice' = $2 AND \"indexed_at\" < $3 AND did = ANY($4)\n ORDER BY \"indexed_at\" DESC\n LIMIT $5\n ", 3 + "query": "\n SELECT uri, cid, did, collection, json, \"indexed_at\" as indexed_at, slice_uri\n FROM record\n WHERE collection = $1 AND json->>'slice' = $2 AND \"indexed_at\" < $3 AND did = ANY($4)\n ORDER BY \"indexed_at\" DESC\n LIMIT $5\n ", 4 4 "describe": { 5 5 "columns": [ 6 6 { ··· 32 32 "ordinal": 5, 33 33 "name": "indexed_at", 34 34 "type_info": "Timestamptz" 35 + }, 36 + { 37 + "ordinal": 6, 38 + "name": "slice_uri", 39 + "type_info": "Text" 35 40 } 36 41 ], 37 42 "parameters": { ··· 49 54 false, 50 55 false, 51 56 false, 52 - false 57 + false, 58 + true 53 59 ] 54 60 }, 55 - "hash": "678473ff73728b115ac4a310bc477f3ccd6763594a894809a6769704e3b55701" 61 + "hash": "7b89fadea24881ba9a03c952674033c5beaa0e7d41b1260253f1d712b24e9d27" 56 62 }
+26
api/.sqlx/query-9db25683295916d8c90008926d327c2c0d893af4ca39c076bf26df84a4e8f10a.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n SELECT did, slice_uri\n FROM actor\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "did", 9 + "type_info": "Text" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "slice_uri", 14 + "type_info": "Text" 15 + } 16 + ], 17 + "parameters": { 18 + "Left": [] 19 + }, 20 + "nullable": [ 21 + false, 22 + false 23 + ] 24 + }, 25 + "hash": "9db25683295916d8c90008926d327c2c0d893af4ca39c076bf26df84a4e8f10a" 26 + }
-19
api/.sqlx/query-e524e8fa627c76e2f8f8c47b31d1ed527ed8da5d07f054c4a1a7e1a4826e5758.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "INSERT INTO \"record\" (\"uri\", \"cid\", \"did\", \"collection\", \"json\", \"indexed_at\")\n VALUES ($1, $2, $3, $4, $5, $6)\n ON CONFLICT (\"uri\")\n DO UPDATE SET\n \"cid\" = EXCLUDED.\"cid\",\n \"json\" = EXCLUDED.\"json\",\n \"indexed_at\" = EXCLUDED.\"indexed_at\"", 4 - "describe": { 5 - "columns": [], 6 - "parameters": { 7 - "Left": [ 8 - "Text", 9 - "Text", 10 - "Text", 11 - "Text", 12 - "Jsonb", 13 - "Timestamptz" 14 - ] 15 - }, 16 - "nullable": [] 17 - }, 18 - "hash": "e524e8fa627c76e2f8f8c47b31d1ed527ed8da5d07f054c4a1a7e1a4826e5758" 19 - }
+22
api/.sqlx/query-fd7e15432f1c5e2b04ab38b6828bc3862e6e2b4e6d3b45a3e5aef024e815c358.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n SELECT json->>'domain' as domain\n FROM record\n WHERE collection = 'social.slices.slice'\n AND uri = $1\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "domain", 9 + "type_info": "Text" 10 + } 11 + ], 12 + "parameters": { 13 + "Left": [ 14 + "Text" 15 + ] 16 + }, 17 + "nullable": [ 18 + null 19 + ] 20 + }, 21 + "hash": "fd7e15432f1c5e2b04ab38b6828bc3862e6e2b4e6d3b45a3e5aef024e815c358" 22 + }
+20
api/.sqlx/query-fe1819e244ecc16440c2914682691d5a0dbf63a4d80ce2b8799e7e2606c15ea0.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "INSERT INTO \"record\" (\"uri\", \"cid\", \"did\", \"collection\", \"json\", \"indexed_at\", \"slice_uri\")\n VALUES ($1, $2, $3, $4, $5, $6, $7)\n ON CONFLICT (\"uri\")\n DO UPDATE SET\n \"cid\" = EXCLUDED.\"cid\",\n \"json\" = EXCLUDED.\"json\",\n \"indexed_at\" = EXCLUDED.\"indexed_at\",\n \"slice_uri\" = EXCLUDED.\"slice_uri\"", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "Text", 9 + "Text", 10 + "Text", 11 + "Text", 12 + "Jsonb", 13 + "Timestamptz", 14 + "Text" 15 + ] 16 + }, 17 + "nullable": [] 18 + }, 19 + "hash": "fe1819e244ecc16440c2914682691d5a0dbf63a4d80ce2b8799e7e2606c15ea0" 20 + }
+148 -3
api/Cargo.lock
··· 134 134 ] 135 135 136 136 [[package]] 137 + name = "atproto-jetstream" 138 + version = "0.11.2" 139 + source = "registry+https://github.com/rust-lang/crates.io-index" 140 + checksum = "178b4af2b79ee11f25e69bca5c12907d0699843a4a847d737827dcff68a20af4" 141 + dependencies = [ 142 + "anyhow", 143 + "async-trait", 144 + "atproto-identity", 145 + "futures", 146 + "http", 147 + "serde", 148 + "serde_json", 149 + "thiserror 2.0.14", 150 + "tokio", 151 + "tokio-util", 152 + "tokio-websockets", 153 + "tracing", 154 + "tracing-subscriber", 155 + "urlencoding", 156 + "zstd", 157 + ] 158 + 159 + [[package]] 137 160 name = "atproto-oauth" 138 161 version = "0.11.2" 139 162 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 371 394 source = "registry+https://github.com/rust-lang/crates.io-index" 372 395 checksum = "3ee0f8803222ba5a7e2777dd72ca451868909b1ac410621b676adf07280e9b5f" 373 396 dependencies = [ 397 + "jobserver", 398 + "libc", 374 399 "shlex", 375 400 ] 376 401 ··· 435 460 version = "0.9.4" 436 461 source = "registry+https://github.com/rust-lang/crates.io-index" 437 462 checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" 463 + dependencies = [ 464 + "core-foundation-sys", 465 + "libc", 466 + ] 467 + 468 + [[package]] 469 + name = "core-foundation" 470 + version = "0.10.1" 471 + source = "registry+https://github.com/rust-lang/crates.io-index" 472 + checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6" 438 473 dependencies = [ 439 474 "core-foundation-sys", 440 475 "libc", ··· 777 812 ] 778 813 779 814 [[package]] 815 + name = "futures" 816 + version = "0.3.31" 817 + source = "registry+https://github.com/rust-lang/crates.io-index" 818 + checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" 819 + dependencies = [ 820 + "futures-channel", 821 + "futures-core", 822 + "futures-executor", 823 + "futures-io", 824 + "futures-sink", 825 + "futures-task", 826 + "futures-util", 827 + ] 828 + 829 + [[package]] 780 830 name = "futures-channel" 781 831 version = "0.3.31" 782 832 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 849 899 source = "registry+https://github.com/rust-lang/crates.io-index" 850 900 checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" 851 901 dependencies = [ 902 + "futures-channel", 852 903 "futures-core", 853 904 "futures-io", 854 905 "futures-macro", ··· 1377 1428 checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" 1378 1429 1379 1430 [[package]] 1431 + name = "jobserver" 1432 + version = "0.1.34" 1433 + source = "registry+https://github.com/rust-lang/crates.io-index" 1434 + checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33" 1435 + dependencies = [ 1436 + "getrandom 0.3.3", 1437 + "libc", 1438 + ] 1439 + 1440 + [[package]] 1380 1441 name = "js-sys" 1381 1442 version = "0.3.77" 1382 1443 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1635 1696 "openssl-probe", 1636 1697 "openssl-sys", 1637 1698 "schannel", 1638 - "security-framework", 1699 + "security-framework 2.11.1", 1639 1700 "security-framework-sys", 1640 1701 "tempfile", 1641 1702 ] ··· 2274 2335 ] 2275 2336 2276 2337 [[package]] 2338 + name = "rustls-native-certs" 2339 + version = "0.8.1" 2340 + source = "registry+https://github.com/rust-lang/crates.io-index" 2341 + checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3" 2342 + dependencies = [ 2343 + "openssl-probe", 2344 + "rustls-pki-types", 2345 + "schannel", 2346 + "security-framework 3.3.0", 2347 + ] 2348 + 2349 + [[package]] 2277 2350 name = "rustls-pki-types" 2278 2351 version = "1.12.0" 2279 2352 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2349 2422 checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" 2350 2423 dependencies = [ 2351 2424 "bitflags", 2352 - "core-foundation", 2425 + "core-foundation 0.9.4", 2426 + "core-foundation-sys", 2427 + "libc", 2428 + "security-framework-sys", 2429 + ] 2430 + 2431 + [[package]] 2432 + name = "security-framework" 2433 + version = "3.3.0" 2434 + source = "registry+https://github.com/rust-lang/crates.io-index" 2435 + checksum = "80fb1d92c5028aa318b4b8bd7302a5bfcf48be96a37fc6fc790f806b0004ee0c" 2436 + dependencies = [ 2437 + "bitflags", 2438 + "core-foundation 0.10.1", 2353 2439 "core-foundation-sys", 2354 2440 "libc", 2355 2441 "security-framework-sys", ··· 2526 2612 ] 2527 2613 2528 2614 [[package]] 2615 + name = "simdutf8" 2616 + version = "0.1.5" 2617 + source = "registry+https://github.com/rust-lang/crates.io-index" 2618 + checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" 2619 + 2620 + [[package]] 2529 2621 name = "slab" 2530 2622 version = "0.4.11" 2531 2623 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2535 2627 name = "slice" 2536 2628 version = "0.1.0" 2537 2629 dependencies = [ 2630 + "anyhow", 2631 + "async-trait", 2538 2632 "atproto-client", 2539 2633 "atproto-identity", 2634 + "atproto-jetstream", 2540 2635 "atproto-oauth", 2541 2636 "axum", 2542 2637 "axum-extra", ··· 2909 3004 checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" 2910 3005 dependencies = [ 2911 3006 "bitflags", 2912 - "core-foundation", 3007 + "core-foundation 0.9.4", 2913 3008 "system-configuration-sys", 2914 3009 ] 2915 3010 ··· 3101 3196 "futures-sink", 3102 3197 "pin-project-lite", 3103 3198 "tokio", 3199 + ] 3200 + 3201 + [[package]] 3202 + name = "tokio-websockets" 3203 + version = "0.11.4" 3204 + source = "registry+https://github.com/rust-lang/crates.io-index" 3205 + checksum = "9fcaf159b4e7a376b05b5bfd77bfd38f3324f5fce751b4213bfc7eaa47affb4e" 3206 + dependencies = [ 3207 + "base64", 3208 + "bytes", 3209 + "futures-core", 3210 + "futures-sink", 3211 + "http", 3212 + "httparse", 3213 + "rand 0.9.2", 3214 + "ring", 3215 + "rustls-native-certs", 3216 + "rustls-pki-types", 3217 + "simdutf8", 3218 + "tokio", 3219 + "tokio-rustls", 3220 + "tokio-util", 3104 3221 ] 3105 3222 3106 3223 [[package]] ··· 4018 4135 "quote", 4019 4136 "syn 2.0.106", 4020 4137 ] 4138 + 4139 + [[package]] 4140 + name = "zstd" 4141 + version = "0.13.3" 4142 + source = "registry+https://github.com/rust-lang/crates.io-index" 4143 + checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" 4144 + dependencies = [ 4145 + "zstd-safe", 4146 + ] 4147 + 4148 + [[package]] 4149 + name = "zstd-safe" 4150 + version = "7.2.4" 4151 + source = "registry+https://github.com/rust-lang/crates.io-index" 4152 + checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" 4153 + dependencies = [ 4154 + "zstd-sys", 4155 + ] 4156 + 4157 + [[package]] 4158 + name = "zstd-sys" 4159 + version = "2.0.15+zstd.1.5.7" 4160 + source = "registry+https://github.com/rust-lang/crates.io-index" 4161 + checksum = "eb81183ddd97d0c74cedf1d50d85c8d08c1b8b68ee863bdee9e706eedba1a237" 4162 + dependencies = [ 4163 + "cc", 4164 + "pkg-config", 4165 + ]
+3
api/Cargo.toml
··· 23 23 24 24 # Error handling 25 25 thiserror = "1.0" 26 + anyhow = "1.0" 26 27 27 28 # Logging and tracing 28 29 tracing = "0.1" ··· 42 43 43 44 # Async utilities 44 45 futures-util = "0.3" 46 + async-trait = "0.1" 45 47 46 48 # AT Protocol client 47 49 atproto-client = "0.11.2" 48 50 atproto-identity = "0.11.2" 49 51 atproto-oauth = "0.11.2" 52 + atproto-jetstream = "0.11.2" 50 53 51 54 52 55 # Middleware for HTTP requests with retry logic
+8
api/migrations/007_add_slice_uri_to_record.sql
··· 1 + -- Add slice_uri field to record table to associate records with specific slices 2 + ALTER TABLE "record" ADD COLUMN "slice_uri" TEXT; 3 + 4 + -- Create index on slice_uri for efficient slice-specific queries 5 + CREATE INDEX idx_record_slice_uri ON "record" ("slice_uri"); 6 + 7 + -- Create composite index for slice + collection queries (common pattern) 8 + CREATE INDEX idx_record_slice_collection ON "record" ("slice_uri", "collection");
+138 -74
api/scripts/generate-typescript.ts
··· 20 20 } 21 21 22 22 interface Lexicon { 23 - nsid: string; 23 + id: string; 24 24 definitions?: Record<string, LexiconDefinition>; 25 25 } 26 26 27 27 // Get lexicon data and slice URI from command line args 28 28 // @ts-ignore 29 29 const lexiconsInput = Deno.args[0] || ""; 30 - // @ts-ignore 31 - const sliceUri = Deno.args[1] || "at://did:plc:bcgltzqazw5tb6k2g3ttenbj/social.slices.slice/3lx5zq4t56s2q"; 30 + const sliceUri = 31 + // @ts-ignore 32 + Deno.args[1] || 33 + "at://did:plc:bcgltzqazw5tb6k2g3ttenbj/social.slices.slice/3lx5zq4t56s2q"; 32 34 33 35 if (!lexiconsInput) { 34 36 console.error("No lexicon data provided"); ··· 36 38 Deno.exit(1); 37 39 } 38 40 39 - const lexicons: Lexicon[] = JSON.parse(lexiconsInput); 41 + const lexicons: Lexicon[] = JSON.parse(lexiconsInput).map((lex: any) => ({ 42 + ...lex, 43 + // TODO: Fix upstream to use 'id' and 'defs' consistently 44 + id: lex.id || lex.nsid, // Normalize nsid to id 45 + definitions: lex.defs || lex.definitions, // Normalize defs to definitions 46 + })); 40 47 41 48 // Generate usage example based on available lexicons 42 49 function generateUsageExample(): string { 43 50 // Find the first non-social.slices lexicon for a better example 44 - const nonSlicesLexicon = lexicons.find(lex => 45 - !lex.nsid.startsWith('social.slices.') 51 + const nonSlicesLexicon = lexicons.find( 52 + (lex) => lex.id && !lex.id.startsWith("social.slices.") 46 53 ); 47 - 54 + 48 55 if (nonSlicesLexicon) { 49 56 // Use the first non-slices lexicon 50 - const parts = nonSlicesLexicon.nsid.split('.'); 51 - 57 + const parts = nonSlicesLexicon.id.split("."); 58 + 52 59 return `/** 53 60 * @example Usage 54 61 * \`\`\`ts ··· 59 66 * '${sliceUri}' 60 67 * ); 61 68 * 62 - * // List records from the ${nonSlicesLexicon.nsid} collection 63 - * const records = await client.${parts.join('.')}.listRecords(); 69 + * // List records from the ${nonSlicesLexicon.id} collection 70 + * const records = await client.${parts.join(".")}.listRecords(); 64 71 * 65 72 * // Get a specific record 66 - * const record = await client.${parts.join('.')}.getRecord({ 67 - * uri: 'at://did:plc:example/${nonSlicesLexicon.nsid}/3abc123' 73 + * const record = await client.${parts.join(".")}.getRecord({ 74 + * uri: 'at://did:plc:example/${nonSlicesLexicon.id}/3abc123' 68 75 * }); 69 76 * 70 77 * // Search records in the collection 71 - * const searchResults = await client.${parts.join('.')}.searchRecords({ 78 + * const searchResults = await client.${parts.join(".")}.searchRecords({ 72 79 * query: "example search term" 73 80 * }); 74 81 * 75 82 * // Search specific field 76 - * const fieldSearch = await client.${parts.join('.')}.searchRecords({ 83 + * const fieldSearch = await client.${parts.join(".")}.searchRecords({ 77 84 * query: "blog", 78 85 * field: "title" 79 86 * }); ··· 96 103 * 97 104 * // List records from a slice 98 105 * const slices = await client.social.slices.slice.listRecords(); 99 - * 100 - * // Get slice statistics 101 - * const stats = await client.social.slices.slice.stats({ 102 - * slice: '${sliceUri}' 106 + * 107 + * // Get slice statistics 108 + * const stats = await client.social.slices.slice.stats({ 109 + * slice: '${sliceUri}' 103 110 * }); 104 111 * 105 112 * // Serve the slice names as JSON ··· 113 120 const project = new Project({ useInMemoryFileSystem: true }); 114 121 const sourceFile = project.createSourceFile("generated-client.ts", ""); 115 122 116 - // Add header comment 123 + // Add header comment 117 124 const usageExample = generateUsageExample(); 118 125 const headerComment = `// Generated TypeScript client for AT Protocol records 119 126 // Generated at: ${new Date().toISOString().slice(0, 19).replace("T", " ")} UTC ··· 127 134 128 135 // Add base interfaces 129 136 function addBaseInterfaces(): void { 130 - 131 137 // RecordResponse interface 132 138 sourceFile.addInterface({ 133 139 name: "RecordResponse", ··· 167 173 // ListRecordsParams interface (generic with sort fields) 168 174 sourceFile.addInterface({ 169 175 name: "ListRecordsParams", 170 - typeParameters: [{ name: "TSortField", constraint: "string", default: "string" }], 176 + typeParameters: [ 177 + { name: "TSortField", constraint: "string", default: "string" }, 178 + ], 171 179 isExported: true, 172 180 properties: [ 173 181 { name: "author", type: "string", hasQuestionToken: true }, 174 182 { name: "authors", type: "string[]", hasQuestionToken: true }, 175 183 { name: "limit", type: "number", hasQuestionToken: true }, 176 184 { name: "cursor", type: "string", hasQuestionToken: true }, 177 - { name: "sort", type: "`${TSortField}:${'asc' | 'desc'}` | `${TSortField}:${'asc' | 'desc'},${TSortField}:${'asc' | 'desc'}`", hasQuestionToken: true }, 185 + { 186 + name: "sort", 187 + type: "`${TSortField}:${'asc' | 'desc'}` | `${TSortField}:${'asc' | 'desc'},${TSortField}:${'asc' | 'desc'}`", 188 + hasQuestionToken: true, 189 + }, 178 190 ], 179 191 }); 180 192 ··· 200 212 // SearchRecordsParams interface (generic with sort fields) 201 213 sourceFile.addInterface({ 202 214 name: "SearchRecordsParams", 203 - typeParameters: [{ name: "TSortField", constraint: "string", default: "string" }], 215 + typeParameters: [ 216 + { name: "TSortField", constraint: "string", default: "string" }, 217 + ], 204 218 isExported: true, 205 219 properties: [ 206 220 { name: "query", type: "string" }, 207 221 { name: "field", type: "string", hasQuestionToken: true }, 208 222 { name: "limit", type: "number", hasQuestionToken: true }, 209 223 { name: "cursor", type: "string", hasQuestionToken: true }, 210 - { name: "sort", type: "`${TSortField}:${'asc' | 'desc'}` | `${TSortField}:${'asc' | 'desc'},${TSortField}:${'asc' | 'desc'}`", hasQuestionToken: true }, 224 + { 225 + name: "sort", 226 + type: "`${TSortField}:${'asc' | 'desc'}` | `${TSortField}:${'asc' | 'desc'},${TSortField}:${'asc' | 'desc'}`", 227 + hasQuestionToken: true, 228 + }, 211 229 ], 212 230 }); 213 231 ··· 322 340 sourceFile.addInterface({ 323 341 name: "GetJobStatusParams", 324 342 isExported: true, 325 - properties: [ 326 - { name: "jobId", type: "string" }, 327 - ], 343 + properties: [{ name: "jobId", type: "string" }], 328 344 }); 329 345 330 346 sourceFile.addInterface({ ··· 344 360 }); 345 361 346 362 sourceFile.addInterface({ 363 + name: "JetstreamStatusResponse", 364 + isExported: true, 365 + properties: [ 366 + { name: "connected", type: "boolean" }, 367 + { name: "status", type: "string" }, 368 + { name: "error", type: "string", hasQuestionToken: true }, 369 + ], 370 + }); 371 + 372 + sourceFile.addInterface({ 347 373 name: "CollectionStats", 348 374 isExported: true, 349 375 properties: [ ··· 356 382 sourceFile.addInterface({ 357 383 name: "SliceStatsParams", 358 384 isExported: true, 359 - properties: [ 360 - { name: "slice", type: "string" }, 361 - ], 385 + properties: [{ name: "slice", type: "string" }], 362 386 }); 363 387 364 388 sourceFile.addInterface({ ··· 422 446 sourceFile.addInterface({ 423 447 name: "UploadBlobResponse", 424 448 isExported: true, 425 - properties: [ 426 - { name: "blob", type: "BlobRef" }, 427 - ], 449 + properties: [{ name: "blob", type: "BlobRef" }], 428 450 }); 429 451 430 452 // CollectionOperations interface ··· 507 529 function isFieldSortable(propDef: any): boolean { 508 530 // Check for direct types 509 531 if (propDef.type) { 510 - const sortableTypes = ['string', 'integer', 'number', 'datetime']; 532 + const sortableTypes = ["string", "integer", "number", "datetime"]; 511 533 if (sortableTypes.includes(propDef.type)) { 512 534 return true; 513 535 } 514 536 } 515 - 537 + 516 538 // Check for format-based types (datetime strings, etc.) 517 539 if (propDef.format) { 518 - const sortableFormats = ['datetime', 'at-identifier', 'at-uri']; 540 + const sortableFormats = ["datetime", "at-identifier", "at-uri"]; 519 541 if (sortableFormats.includes(propDef.format)) { 520 542 return true; 521 543 } 522 544 } 523 - 545 + 524 546 // Arrays, objects, blobs, and complex types are not sortable 525 547 return false; 526 548 } ··· 528 550 // Add lexicon-specific interfaces 529 551 function addLexiconInterfaces(): void { 530 552 for (const lexicon of lexicons) { 531 - const interfaceName = nsidToPascalCase(lexicon.nsid); 553 + const interfaceName = nsidToPascalCase(lexicon.id); 532 554 533 555 if (lexicon.definitions && typeof lexicon.definitions === "object") { 534 556 for (const [, defValue] of Object.entries(lexicon.definitions)) { ··· 556 578 // Add JSDoc comment if description exists 557 579 docs: propDef.description ? [propDef.description] : undefined, 558 580 }); 559 - 581 + 560 582 // Collect sortable field names for sort type 561 583 if (isFieldSortable(propDef)) { 562 584 fieldNames.push(propName); ··· 570 592 isExported: true, 571 593 properties: properties, 572 594 }); 573 - 595 + 574 596 // Add sort fields type union for this record 575 597 if (fieldNames.length > 0) { 576 598 sourceFile.addTypeAlias({ 577 599 name: `${interfaceName}SortFields`, 578 600 isExported: true, 579 - type: fieldNames.map(f => `"${f}"`).join(" | "), 601 + type: fieldNames.map((f) => `"${f}"`).join(" | "), 580 602 }); 581 603 } 582 604 } ··· 585 607 } 586 608 } 587 609 588 - 589 610 // Add base client class with OAuth client integration 590 611 function addBaseClientClass(): void { 591 612 sourceFile.addClass({ 592 613 name: "BaseClient", 593 614 properties: [ 594 615 { name: "baseUrl", type: "string", scope: "protected", isReadonly: true }, 595 - { name: "oauthClient", type: "OAuthClient", scope: "protected", hasQuestionToken: true }, 616 + { 617 + name: "oauthClient", 618 + type: "OAuthClient", 619 + scope: "protected", 620 + hasQuestionToken: true, 621 + }, 596 622 ], 597 623 ctors: [ 598 624 { ··· 631 657 type: '"GET" | "POST" | "PUT" | "DELETE"', 632 658 hasQuestionToken: true, 633 659 }, 634 - { name: "params", type: "Record<string, unknown> | unknown", hasQuestionToken: true }, 660 + { 661 + name: "params", 662 + type: "Record<string, unknown> | unknown", 663 + hasQuestionToken: true, 664 + }, 635 665 ], 636 666 returnType: "Promise<T>", 637 667 statements: [ ··· 650 680 type: '"GET" | "POST" | "PUT" | "DELETE"', 651 681 hasQuestionToken: true, 652 682 }, 653 - { name: "params", type: "Record<string, unknown> | unknown", hasQuestionToken: true }, 683 + { 684 + name: "params", 685 + type: "Record<string, unknown> | unknown", 686 + hasQuestionToken: true, 687 + }, 654 688 { name: "isRetry", type: "boolean", hasQuestionToken: true }, 655 689 ], 656 690 returnType: "Promise<T>", ··· 759 793 if (lexicon.definitions && typeof lexicon.definitions === "object") { 760 794 for (const [, defValue] of Object.entries(lexicon.definitions)) { 761 795 if (defValue.type === "record" && defValue.record) { 762 - const parts = lexicon.nsid.split("."); 796 + const parts = lexicon.id.split("."); 763 797 let current = nestedStructure; 764 798 765 799 // Build nested structure ··· 771 805 } 772 806 773 807 // Add the record interface name and store collection path 774 - current._recordType = nsidToPascalCase(lexicon.nsid); 775 - current._collectionPath = lexicon.nsid; 808 + current._recordType = nsidToPascalCase(lexicon.id); 809 + current._collectionPath = lexicon.id; 776 810 } 777 811 } 778 812 } ··· 811 845 }); 812 846 methods.push({ 813 847 name: "searchRecords", 814 - parameters: [{ name: "params", type: `SearchRecordsParams<${sortFieldsType}>` }], 848 + parameters: [ 849 + { name: "params", type: `SearchRecordsParams<${sortFieldsType}>` }, 850 + ], 815 851 returnType: `Promise<ListRecordsResponse<${value}>>`, 816 852 }); 817 853 // Add create, update, delete methods ··· 819 855 name: "createRecord", 820 856 parameters: [ 821 857 { name: "record", type: value as string }, 822 - { name: "useSelfRkey", type: "boolean", hasQuestionToken: true } 858 + { name: "useSelfRkey", type: "boolean", hasQuestionToken: true }, 823 859 ], 824 860 returnType: `Promise<{ uri: string; cid: string }>`, 825 861 }); ··· 827 863 name: "updateRecord", 828 864 parameters: [ 829 865 { name: "rkey", type: "string" }, 830 - { name: "record", type: value as string } 866 + { name: "record", type: value as string }, 831 867 ], 832 868 returnType: `Promise<{ uri: string; cid: string }>`, 833 869 }); ··· 869 905 })), 870 906 // Add OAuth client to the main AtProtoClient 871 907 ...(className === "Client" 872 - ? [{ name: "oauth", type: "OAuthClient", isReadonly: true, hasQuestionToken: true }] 908 + ? [ 909 + { 910 + name: "oauth", 911 + type: "OAuthClient", 912 + isReadonly: true, 913 + hasQuestionToken: true, 914 + }, 915 + ] 873 916 : []), 874 917 // Add sliceUri property to all classes 875 - { name: "sliceUri", type: "string", scope: "private", isReadonly: true }, 918 + { 919 + name: "sliceUri", 920 + type: "string", 921 + scope: "private", 922 + isReadonly: true, 923 + }, 876 924 ], 877 925 }); 878 926 ··· 888 936 "super(baseUrl, oauthClient);", 889 937 "this.sliceUri = sliceUri;", 890 938 ...properties.map( 891 - (p) => `this.${p.name} = new ${p.type}(baseUrl, sliceUri, oauthClient);` 939 + (p) => 940 + `this.${p.name} = new ${p.type}(baseUrl, sliceUri, oauthClient);` 892 941 ), 893 942 // Add OAuth client reference for main AtProtoClient 894 - ...(className === "Client" 895 - ? ["this.oauth = this.oauthClient;"] 896 - : []), 943 + ...(className === "Client" ? ["this.oauth = this.oauthClient;"] : []), 897 944 ]); 898 945 899 946 // Add methods with implementations ··· 952 999 } 953 1000 954 1001 // Add codegen method to the social.slices.slice class 955 - if (currentPath.length === 3 && currentPath[0] === "social" && currentPath[1] === "slices" && currentPath[2] === "slice") { 1002 + if ( 1003 + currentPath.length === 3 && 1004 + currentPath[0] === "social" && 1005 + currentPath[1] === "slices" && 1006 + currentPath[2] === "slice" 1007 + ) { 956 1008 classDeclaration.addMethod({ 957 1009 name: "codegen", 958 1010 parameters: [{ name: "request", type: "CodegenXrpcRequest" }], ··· 962 1014 `return await this.makeRequest<CodegenXrpcResponse>('social.slices.slice.codegen', 'POST', request);`, 963 1015 ], 964 1016 }); 965 - 966 - 1017 + 967 1018 classDeclaration.addMethod({ 968 1019 name: "stats", 969 1020 parameters: [{ name: "params", type: "SliceStatsParams" }], ··· 973 1024 `return await this.makeRequest<SliceStatsOutput>('social.slices.slice.stats', 'POST', params);`, 974 1025 ], 975 1026 }); 976 - 1027 + 977 1028 classDeclaration.addMethod({ 978 1029 name: "records", 979 1030 parameters: [{ name: "params", type: "SliceRecordsParams" }], ··· 983 1034 `return await this.makeRequest<SliceRecordsOutput>('social.slices.slice.records', 'POST', params);`, 984 1035 ], 985 1036 }); 986 - 1037 + 987 1038 classDeclaration.addMethod({ 988 1039 name: "getActors", 989 - parameters: [{ name: "params", type: "GetActorsParams", hasQuestionToken: true }], 1040 + parameters: [ 1041 + { name: "params", type: "GetActorsParams", hasQuestionToken: true }, 1042 + ], 990 1043 returnType: "Promise<GetActorsResponse>", 991 1044 isAsync: true, 992 1045 statements: [ ··· 997 1050 } 998 1051 999 1052 // Add sync methods to the social.slices.slice class 1000 - if (currentPath.length === 3 && currentPath[0] === "social" && currentPath[1] === "slices" && currentPath[2] === "slice") { 1053 + if ( 1054 + currentPath.length === 3 && 1055 + currentPath[0] === "social" && 1056 + currentPath[1] === "slices" && 1057 + currentPath[2] === "slice" 1058 + ) { 1001 1059 classDeclaration.addMethod({ 1002 1060 name: "startSync", 1003 1061 parameters: [{ name: "params", type: "BulkSyncParams" }], ··· 1008 1066 `return await this.makeRequest<SyncJobResponse>('social.slices.slice.startSync', 'POST', requestParams);`, 1009 1067 ], 1010 1068 }); 1011 - 1069 + 1012 1070 classDeclaration.addMethod({ 1013 1071 name: "getJobStatus", 1014 1072 parameters: [{ name: "params", type: "GetJobStatusParams" }], ··· 1018 1076 `return await this.makeRequest<JobStatus>('social.slices.slice.getJobStatus', 'GET', params);`, 1019 1077 ], 1020 1078 }); 1021 - 1079 + 1022 1080 classDeclaration.addMethod({ 1023 1081 name: "getJobHistory", 1024 1082 parameters: [{ name: "params", type: "GetJobHistoryParams" }], ··· 1028 1086 `return await this.makeRequest<GetJobHistoryResponse>('social.slices.slice.getJobHistory', 'GET', params);`, 1029 1087 ], 1030 1088 }); 1089 + 1090 + classDeclaration.addMethod({ 1091 + name: "getJetstreamStatus", 1092 + returnType: "Promise<JetstreamStatusResponse>", 1093 + isAsync: true, 1094 + statements: [ 1095 + `return await this.makeRequest<JetstreamStatusResponse>('social.slices.slice.getJetstreamStatus', 'GET');`, 1096 + ], 1097 + }); 1031 1098 } 1032 1099 1033 1100 // Add blob upload method to the main AtProtoClient ··· 1042 1109 `return await this.makeRequest<GetActorsResponse>('social.slices.slice.getActors', 'GET', requestParams);`, 1043 1110 ], 1044 1111 }); 1045 - 1112 + 1046 1113 classDeclaration.addMethod({ 1047 1114 name: "uploadBlob", 1048 1115 parameters: [{ name: "request", type: "UploadBlobRequest" }], 1049 1116 returnType: "Promise<UploadBlobResponse>", 1050 - statements: [ 1051 - `return this.uploadBlobWithRetry(request, false);`, 1052 - ], 1117 + statements: [`return this.uploadBlobWithRetry(request, false);`], 1053 1118 }); 1054 - 1119 + 1055 1120 classDeclaration.addMethod({ 1056 1121 name: "uploadBlobWithRetry", 1057 1122 scope: "private", 1058 1123 parameters: [ 1059 1124 { name: "request", type: "UploadBlobRequest" }, 1060 - { name: "isRetry", type: "boolean", hasQuestionToken: true } 1125 + { name: "isRetry", type: "boolean", hasQuestionToken: true }, 1061 1126 ], 1062 1127 returnType: "Promise<UploadBlobResponse>", 1063 1128 isAsync: true, ··· 1115 1180 generateNestedClass(nestedStructure, "Client"); 1116 1181 } 1117 1182 } 1118 - 1119 1183 1120 1184 // Generate the TypeScript 1121 1185 addBaseInterfaces();
+10 -1
api/scripts/test_sync.sh
··· 1 1 #!/bin/bash 2 2 3 + if [ -z "$1" ]; then 4 + echo "Usage: $0 <bearer_token>" 5 + exit 1 6 + fi 7 + 8 + TOKEN="$1" 9 + 3 10 echo "🔄 Testing Sync Endpoint..." 4 11 5 12 echo "🎯 Syncing specific collections with specific repos" 6 - curl -s -X POST http://localhost:3000/xrpc/social.slices.slice.sync \ 13 + curl -s -X POST http://localhost:3000/xrpc/social.slices.slice.startSync \ 7 14 -H "Content-Type: application/json" \ 15 + -H "Authorization: Bearer $TOKEN" \ 8 16 -d '{ 17 + "slice": "at://did:plc:bcgltzqazw5tb6k2g3ttenbj/social.slices.slice/3lwzmbjpqxk2q", 9 18 "collections": [ 10 19 "social.slices.actor.profile", 11 20 "social.slices.slice",
+105 -27
api/src/database.rs
··· 288 288 #[allow(dead_code)] 289 289 pub async fn insert_record(&self, record: &Record) -> Result<(), DatabaseError> { 290 290 sqlx::query!( 291 - r#"INSERT INTO "record" ("uri", "cid", "did", "collection", "json", "indexed_at") 292 - VALUES ($1, $2, $3, $4, $5, $6) 291 + r#"INSERT INTO "record" ("uri", "cid", "did", "collection", "json", "indexed_at", "slice_uri") 292 + VALUES ($1, $2, $3, $4, $5, $6, $7) 293 293 ON CONFLICT ("uri") 294 294 DO UPDATE SET 295 295 "cid" = EXCLUDED."cid", 296 296 "json" = EXCLUDED."json", 297 - "indexed_at" = EXCLUDED."indexed_at""#, 297 + "indexed_at" = EXCLUDED."indexed_at", 298 + "slice_uri" = EXCLUDED."slice_uri""#, 298 299 record.uri, 299 300 record.cid, 300 301 record.did, 301 302 record.collection, 302 303 record.json, 303 - record.indexed_at 304 + record.indexed_at, 305 + record.slice_uri 304 306 ) 305 307 .execute(&self.pool) 306 308 .await?; ··· 313 315 314 316 for record in records { 315 317 sqlx::query!( 316 - r#"INSERT INTO "record" ("uri", "cid", "did", "collection", "json", "indexed_at") 317 - VALUES ($1, $2, $3, $4, $5, $6) 318 + r#"INSERT INTO "record" ("uri", "cid", "did", "collection", "json", "indexed_at", "slice_uri") 319 + VALUES ($1, $2, $3, $4, $5, $6, $7) 318 320 ON CONFLICT ("uri") 319 321 DO UPDATE SET 320 322 "cid" = EXCLUDED."cid", 321 323 "json" = EXCLUDED."json", 322 - "indexed_at" = EXCLUDED."indexed_at""#, 324 + "indexed_at" = EXCLUDED."indexed_at", 325 + "slice_uri" = EXCLUDED."slice_uri""#, 323 326 record.uri, 324 327 record.cid, 325 328 record.did, 326 329 record.collection, 327 330 record.json, 328 - record.indexed_at 331 + record.indexed_at, 332 + record.slice_uri 329 333 ) 330 334 .execute(&mut *tx) 331 335 .await?; ··· 355 359 356 360 pub async fn get_record(&self, uri: &str) -> Result<Option<IndexedRecord>, DatabaseError> { 357 361 let record = sqlx::query_as::<_, Record>( 358 - r#"SELECT "uri", "cid", "did", "collection", "json", "indexed_at" 362 + r#"SELECT "uri", "cid", "did", "collection", "json", "indexed_at", "slice_uri" 359 363 FROM "record" 360 364 WHERE "uri" = $1"#, 361 365 ) ··· 378 382 379 383 pub async fn get_lexicons_by_slice(&self, slice_uri: &str) -> Result<Vec<serde_json::Value>, DatabaseError> { 380 384 let records = sqlx::query_as::<_, Record>( 381 - r#"SELECT "uri", "cid", "did", "collection", "json", "indexed_at" 385 + r#"SELECT "uri", "cid", "did", "collection", "json", "indexed_at", "slice_uri" 382 386 FROM "record" 383 387 WHERE "collection" = 'social.slices.lexicon' 384 388 AND "json"->>'slice' = $1 ··· 718 722 let cursor_where_clause = build_cursor_where_clause(&parsed_cursor, &primary_sort_field, is_desc); 719 723 720 724 let query_sql = format!( 721 - "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $4 AND json->>'slice' = $5 AND json->>$6 ILIKE '%' || $7 || '%' {} ORDER BY {} LIMIT $8", 725 + "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $4 AND json->>'slice' = $5 AND json->>$6 ILIKE '%' || $7 || '%' {} ORDER BY {} LIMIT $8", 722 726 cursor_where_clause, order_by 723 727 ); 724 728 sqlx::query_as::<_, Record>(&query_sql) ··· 737 741 let cursor_dt = cursor_str.parse::<chrono::DateTime<chrono::Utc>>() 738 742 .unwrap_or_else(|_| chrono::Utc::now()); 739 743 let query_sql = format!( 740 - "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 AND json->>'slice' = $2 AND json->>$3 ILIKE '%' || $4 || '%' AND indexed_at < $5 ORDER BY {} LIMIT $6", 744 + "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND json->>'slice' = $2 AND json->>$3 ILIKE '%' || $4 || '%' AND indexed_at < $5 ORDER BY {} LIMIT $6", 741 745 order_by 742 746 ); 743 747 sqlx::query_as::<_, Record>(&query_sql) ··· 755 759 let cursor_dt = cursor_time.parse::<chrono::DateTime<chrono::Utc>>() 756 760 .unwrap_or_else(|_| chrono::Utc::now()); 757 761 let query_sql = format!( 758 - "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 AND json->>'slice' = $2 AND json::text ILIKE '%' || $3 || '%' AND indexed_at < $4 ORDER BY {} LIMIT $5", 762 + "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND json->>'slice' = $2 AND json::text ILIKE '%' || $3 || '%' AND indexed_at < $4 ORDER BY {} LIMIT $5", 759 763 order_by 760 764 ); 761 765 sqlx::query_as::<_, Record>(&query_sql) ··· 769 773 }, 770 774 (None, Some(field_name)) => { 771 775 let query_sql = format!( 772 - "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 AND json->>'slice' = $2 AND json->>$3 ILIKE '%' || $4 || '%' ORDER BY {} LIMIT $5", 776 + "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND json->>'slice' = $2 AND json->>$3 ILIKE '%' || $4 || '%' ORDER BY {} LIMIT $5", 773 777 order_by 774 778 ); 775 779 sqlx::query_as::<_, Record>(&query_sql) ··· 783 787 }, 784 788 (None, None) => { 785 789 let query_sql = format!( 786 - "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 AND json->>'slice' = $2 AND json::text ILIKE '%' || $3 || '%' ORDER BY {} LIMIT $4", 790 + "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND json->>'slice' = $2 AND json::text ILIKE '%' || $3 || '%' ORDER BY {} LIMIT $4", 787 791 order_by 788 792 ); 789 793 sqlx::query_as::<_, Record>(&query_sql) ··· 833 837 let cursor_dt = cursor_time.parse::<chrono::DateTime<chrono::Utc>>() 834 838 .unwrap_or_else(|_| chrono::Utc::now()); 835 839 let query_sql = format!( 836 - "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 AND json->>$2 ILIKE '%' || $3 || '%' AND indexed_at < $4 ORDER BY {} LIMIT $5", 840 + "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND json->>$2 ILIKE '%' || $3 || '%' AND indexed_at < $4 ORDER BY {} LIMIT $5", 837 841 order_by 838 842 ); 839 843 sqlx::query_as::<_, Record>(&query_sql) ··· 849 853 let cursor_dt = cursor_time.parse::<chrono::DateTime<chrono::Utc>>() 850 854 .unwrap_or_else(|_| chrono::Utc::now()); 851 855 let query_sql = format!( 852 - "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 AND json::text ILIKE '%' || $2 || '%' AND indexed_at < $3 ORDER BY {} LIMIT $4", 856 + "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND json::text ILIKE '%' || $2 || '%' AND indexed_at < $3 ORDER BY {} LIMIT $4", 853 857 order_by 854 858 ); 855 859 sqlx::query_as::<_, Record>(&query_sql) ··· 862 866 }, 863 867 (None, Some(field_name)) => { 864 868 let query_sql = format!( 865 - "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 AND json->>$2 ILIKE '%' || $3 || '%' ORDER BY {} LIMIT $4", 869 + "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND json->>$2 ILIKE '%' || $3 || '%' ORDER BY {} LIMIT $4", 866 870 order_by 867 871 ); 868 872 sqlx::query_as::<_, Record>(&query_sql) ··· 875 879 }, 876 880 (None, None) => { 877 881 let query_sql = format!( 878 - "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 AND json::text ILIKE '%' || $2 || '%' ORDER BY {} LIMIT $3", 882 + "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND json::text ILIKE '%' || $2 || '%' ORDER BY {} LIMIT $3", 879 883 order_by 880 884 ); 881 885 sqlx::query_as::<_, Record>(&query_sql) ··· 931 935 sqlx::query_as!( 932 936 Record, 933 937 r#" 934 - SELECT uri, cid, did, collection, json, "indexed_at" as indexed_at 938 + SELECT uri, cid, did, collection, json, "indexed_at" as indexed_at, slice_uri 935 939 FROM record 936 940 WHERE collection = $1 AND json->>'slice' = $2 AND "indexed_at" < $3 AND did = ANY($4) 937 941 ORDER BY "indexed_at" DESC ··· 952 956 sqlx::query_as!( 953 957 Record, 954 958 r#" 955 - SELECT uri, cid, did, collection, json, "indexed_at" as indexed_at 959 + SELECT uri, cid, did, collection, json, "indexed_at" as indexed_at, slice_uri 956 960 FROM record 957 961 WHERE collection = $1 AND json->>'slice' = $2 AND "indexed_at" < $3 958 962 ORDER BY "indexed_at" DESC ··· 969 973 (None, Some(author_list)) => { 970 974 let sql = format!( 971 975 r#" 972 - SELECT uri, cid, did, collection, json, indexed_at 976 + SELECT uri, cid, did, collection, json, indexed_at, slice_uri 973 977 FROM record 974 978 WHERE collection = $1 AND json->>'slice' = $2 AND did = ANY($3) 975 979 ORDER BY {} ··· 988 992 (None, None) => { 989 993 let sql = format!( 990 994 r#" 991 - SELECT uri, cid, did, collection, json, indexed_at 995 + SELECT uri, cid, did, collection, json, indexed_at, slice_uri 992 996 FROM record 993 997 WHERE collection = $1 AND json->>'slice' = $2 994 998 ORDER BY {} ··· 1051 1055 let cursor_dt = cursor_time.parse::<chrono::DateTime<chrono::Utc>>() 1052 1056 .unwrap_or_else(|_| chrono::Utc::now()); 1053 1057 let query = format!( 1054 - "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 AND indexed_at < $2 AND did = ANY($3) ORDER BY {} LIMIT $4", 1058 + "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND indexed_at < $2 AND did = ANY($3) ORDER BY {} LIMIT $4", 1055 1059 order_by 1056 1060 ); 1057 1061 sqlx::query_as::<_, Record>(&query) ··· 1066 1070 let cursor_dt = cursor_time.parse::<chrono::DateTime<chrono::Utc>>() 1067 1071 .unwrap_or_else(|_| chrono::Utc::now()); 1068 1072 let query = format!( 1069 - "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 AND indexed_at < $2 ORDER BY {} LIMIT $3", 1073 + "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND indexed_at < $2 ORDER BY {} LIMIT $3", 1070 1074 order_by 1071 1075 ); 1072 1076 sqlx::query_as::<_, Record>(&query) ··· 1078 1082 }, 1079 1083 (None, Some(author_list)) => { 1080 1084 let query = format!( 1081 - "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 AND did = ANY($2) ORDER BY {} LIMIT $3", 1085 + "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND did = ANY($2) ORDER BY {} LIMIT $3", 1082 1086 order_by 1083 1087 ); 1084 1088 sqlx::query_as::<_, Record>(&query) ··· 1090 1094 }, 1091 1095 (None, None) => { 1092 1096 let query = format!( 1093 - "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 ORDER BY {} LIMIT $2", 1097 + "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 ORDER BY {} LIMIT $2", 1094 1098 order_by 1095 1099 ); 1096 1100 sqlx::query_as::<_, Record>(&query) ··· 1109 1113 }; 1110 1114 1111 1115 Ok((records, cursor)) 1116 + } 1117 + 1118 + pub async fn delete_record_by_uri(&self, uri: &str) -> Result<u64, DatabaseError> { 1119 + let result = sqlx::query("DELETE FROM record WHERE uri = $1") 1120 + .bind(uri) 1121 + .execute(&self.pool) 1122 + .await?; 1123 + Ok(result.rows_affected()) 1124 + } 1125 + 1126 + 1127 + pub async fn upsert_record(&self, record: &Record) -> Result<(), DatabaseError> { 1128 + sqlx::query(r#" 1129 + INSERT INTO record (uri, cid, did, collection, json, indexed_at, slice_uri) 1130 + VALUES ($1, $2, $3, $4, $5, $6, $7) 1131 + ON CONFLICT (uri) DO UPDATE 1132 + SET cid = EXCLUDED.cid, 1133 + json = EXCLUDED.json, 1134 + indexed_at = EXCLUDED.indexed_at, 1135 + slice_uri = EXCLUDED.slice_uri 1136 + "#) 1137 + .bind(&record.uri) 1138 + .bind(&record.cid) 1139 + .bind(&record.did) 1140 + .bind(&record.collection) 1141 + .bind(&record.json) 1142 + .bind(&record.indexed_at) 1143 + .bind(&record.slice_uri) 1144 + .execute(&self.pool) 1145 + .await?; 1146 + Ok(()) 1147 + } 1148 + 1149 + pub async fn get_all_slices(&self) -> Result<Vec<String>, DatabaseError> { 1150 + let rows: Vec<(String,)> = sqlx::query_as(r#" 1151 + SELECT DISTINCT json->>'slice' as slice_uri 1152 + FROM record 1153 + WHERE collection = 'social.slices.lexicon' 1154 + AND json->>'slice' IS NOT NULL 1155 + "#) 1156 + .fetch_all(&self.pool) 1157 + .await?; 1158 + 1159 + Ok(rows.into_iter().map(|(uri,)| uri).collect()) 1160 + } 1161 + 1162 + 1163 + pub async fn get_all_actors(&self) -> Result<Vec<(String, String)>, DatabaseError> { 1164 + let rows = sqlx::query!( 1165 + r#" 1166 + SELECT did, slice_uri 1167 + FROM actor 1168 + "# 1169 + ) 1170 + .fetch_all(&self.pool) 1171 + .await?; 1172 + 1173 + Ok(rows.into_iter().map(|row| (row.did, row.slice_uri)).collect()) 1174 + } 1175 + 1176 + pub async fn get_slice_domain(&self, slice_uri: &str) -> Result<Option<String>, DatabaseError> { 1177 + let row = sqlx::query!( 1178 + r#" 1179 + SELECT json->>'domain' as domain 1180 + FROM record 1181 + WHERE collection = 'social.slices.slice' 1182 + AND uri = $1 1183 + "#, 1184 + slice_uri 1185 + ) 1186 + .fetch_optional(&self.pool) 1187 + .await?; 1188 + 1189 + Ok(row.and_then(|r| r.domain)) 1112 1190 } 1113 1191 1114 1192 }
+10
api/src/errors.rs
··· 46 46 ServerBind(#[from] std::io::Error), 47 47 } 48 48 49 + #[derive(Error, Debug)] 50 + pub enum SliceError { 51 + #[error("error-slice-jetstream-1 Jetstream error: {message}")] 52 + JetstreamError { message: String }, 53 + 54 + #[error("error-slice-database Database error: {0}")] 55 + Database(#[from] DatabaseError), 56 + 57 + } 58 +
+34
api/src/handler_jetstream_status.rs
··· 1 + use axum::{ 2 + extract::State, 3 + http::StatusCode, 4 + response::Json as ResponseJson, 5 + }; 6 + use serde::Serialize; 7 + use std::sync::atomic::Ordering; 8 + use crate::AppState; 9 + 10 + #[derive(Serialize)] 11 + #[serde(rename_all = "camelCase")] 12 + pub struct JetstreamStatusResponse { 13 + connected: bool, 14 + status: String, 15 + error: Option<String>, 16 + } 17 + 18 + pub async fn get_jetstream_status( 19 + State(state): State<AppState>, 20 + ) -> Result<ResponseJson<JetstreamStatusResponse>, StatusCode> { 21 + let connected = state.jetstream_connected.load(Ordering::Relaxed); 22 + 23 + let (status, error) = if connected { 24 + ("Connected".to_string(), None) 25 + } else { 26 + ("Disconnected".to_string(), Some("Jetstream consumer is not connected".to_string())) 27 + }; 28 + 29 + Ok(ResponseJson(JetstreamStatusResponse { 30 + connected, 31 + status, 32 + error, 33 + })) 34 + }
+31 -1
api/src/handler_openapi_spec.rs
··· 268 268 }, 269 269 example: None, 270 270 }, 271 + OpenApiParameter { 272 + name: "sort".to_string(), 273 + location: "query".to_string(), 274 + description: "Sort order for results. Format: 'field:order' where order is 'asc' or 'desc'".to_string(), 275 + required: false, 276 + schema: OpenApiSchema { 277 + schema_type: "string".to_string(), 278 + format: None, 279 + items: None, 280 + properties: None, 281 + required: None, 282 + default: None, 283 + }, 284 + example: Some("createdAt:desc".to_string()), 285 + }, 271 286 ]), 272 287 request_body: None, 273 288 responses: create_list_responses(), ··· 390 405 default: None, 391 406 }, 392 407 example: None, 408 + }, 409 + OpenApiParameter { 410 + name: "sort".to_string(), 411 + location: "query".to_string(), 412 + description: "Sort order for results. Format: 'field:order' where order is 'asc' or 'desc'".to_string(), 413 + required: false, 414 + schema: OpenApiSchema { 415 + schema_type: "string".to_string(), 416 + format: None, 417 + items: None, 418 + properties: None, 419 + required: None, 420 + default: None, 421 + }, 422 + example: Some("createdAt:desc".to_string()), 393 423 }, 394 424 ]), 395 425 request_body: None, ··· 1100 1130 // Check for format to provide more specific examples 1101 1131 if let Some(format) = prop_def.get("format").and_then(|f| f.as_str()) { 1102 1132 match format { 1103 - "datetime" => serde_json::Value::String("2025-01-01T00:00:00Z".to_string()), 1133 + "datetime" => serde_json::Value::String(chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()), 1104 1134 "at-uri" => serde_json::Value::String("at://did:plc:example/collection/record".to_string()), 1105 1135 "at-identifier" => serde_json::Value::String("handle.example.com".to_string()), 1106 1136 "did" => serde_json::Value::String("did:plc:example123".to_string()),
+28 -9
api/src/handler_xrpc_dynamic.rs
··· 156 156 dynamic_params.sort.as_deref(), 157 157 ).await { 158 158 Ok((records, cursor)) => { 159 - let indexed_records: Vec<crate::models::IndexedRecord> = records.into_iter().map(|record| crate::models::IndexedRecord { 160 - uri: record.uri, 161 - cid: record.cid, 162 - did: record.did, 163 - collection: record.collection, 164 - value: record.json, 165 - indexed_at: record.indexed_at.to_rfc3339(), 159 + tracing::info!("Database query returned {} records for collection {}", records.len(), collection); 160 + 161 + let indexed_records: Vec<crate::models::IndexedRecord> = records.into_iter().enumerate().map(|(i, record)| { 162 + tracing::debug!("Processing record {}: uri={}", i, record.uri); 163 + crate::models::IndexedRecord { 164 + uri: record.uri, 165 + cid: record.cid, 166 + did: record.did, 167 + collection: record.collection, 168 + value: record.json, 169 + indexed_at: record.indexed_at.to_rfc3339(), 170 + } 166 171 }).collect(); 172 + 173 + tracing::info!("Successfully mapped {} records to IndexedRecord", indexed_records.len()); 167 174 168 175 let output = ListRecordsOutput { 169 176 records: indexed_records, 170 177 cursor 171 178 }; 179 + 180 + tracing::info!("Created ListRecordsOutput, attempting JSON serialization"); 172 181 let json_value = serde_json::to_value(output) 173 - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 182 + .map_err(|e| { 183 + tracing::error!("JSON serialization failed: {}", e); 184 + StatusCode::INTERNAL_SERVER_ERROR 185 + })?; 186 + 187 + tracing::info!("JSON serialization successful, returning response"); 174 188 Ok(Json(json_value)) 175 189 }, 176 - Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), 190 + Err(e) => { 191 + tracing::error!("Database query failed: {}", e); 192 + Err(StatusCode::INTERNAL_SERVER_ERROR) 193 + }, 177 194 } 178 195 } 179 196 ··· 355 372 collection, 356 373 json: record_data, 357 374 indexed_at: Utc::now(), 375 + slice_uri: Some(slice_uri), 358 376 }; 359 377 360 378 // Store in local database (ignore errors as AT Protocol operation succeeded) ··· 449 467 collection, 450 468 json: record_data, 451 469 indexed_at: Utc::now(), 470 + slice_uri: Some(slice_uri), 452 471 }; 453 472 454 473 // Update in local database (ignore errors as AT Protocol operation succeeded)
+500
api/src/jetstream.rs
··· 1 + use atproto_jetstream::{Consumer, ConsumerTaskConfig, EventHandler, JetstreamEvent, CancellationToken}; 2 + use async_trait::async_trait; 3 + use anyhow::Result; 4 + use chrono::Utc; 5 + use std::collections::{HashMap, HashSet}; 6 + use std::sync::Arc; 7 + use tokio::sync::RwLock; 8 + use tracing::{error, info}; 9 + 10 + use crate::database::Database; 11 + use crate::models::Record; 12 + use crate::errors::SliceError; 13 + use crate::lexicon::validator::LexiconValidator; 14 + 15 + pub struct JetstreamConsumer { 16 + consumer: Consumer, 17 + database: Database, 18 + // Track which collections we should index for each slice 19 + slice_collections: Arc<RwLock<HashMap<String, HashSet<String>>>>, 20 + // Track domains for each slice (slice_uri -> domain) 21 + slice_domains: Arc<RwLock<HashMap<String, String>>>, 22 + // Cache for actor lookups 23 + actor_cache: Arc<RwLock<HashMap<(String, String), bool>>>, 24 + // Lexicon validators for each slice 25 + slice_validators: Arc<RwLock<HashMap<String, LexiconValidator>>>, 26 + } 27 + 28 + // Event handler that implements the EventHandler trait 29 + struct SliceEventHandler { 30 + database: Database, 31 + slice_collections: Arc<RwLock<HashMap<String, HashSet<String>>>>, 32 + slice_domains: Arc<RwLock<HashMap<String, String>>>, 33 + event_count: Arc<std::sync::atomic::AtomicU64>, 34 + // Cache for (did, slice_uri) -> is_actor lookups 35 + actor_cache: Arc<RwLock<HashMap<(String, String), bool>>>, 36 + // Lexicon validators for each slice 37 + slice_validators: Arc<RwLock<HashMap<String, LexiconValidator>>>, 38 + } 39 + 40 + #[async_trait] 41 + impl EventHandler for SliceEventHandler { 42 + async fn handle_event(&self, event: JetstreamEvent) -> Result<()> { 43 + // Increment event counter 44 + let count = self.event_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; 45 + 46 + // Log every 10000 events to show activity 47 + if count % 10000 == 0 { 48 + info!("Jetstream consumer has processed {} events", count); 49 + } 50 + 51 + match event { 52 + JetstreamEvent::Commit { did, commit, .. } => { 53 + if let Err(e) = self.handle_commit_event(&did, commit).await { 54 + error!("Error handling commit event: {}", e); 55 + } 56 + } 57 + JetstreamEvent::Delete { did, commit, .. } => { 58 + if let Err(e) = self.handle_delete_event(&did, commit).await { 59 + error!("Error handling delete event: {}", e); 60 + } 61 + } 62 + _ => { 63 + // Ignore other event types (identity, account, etc.) 64 + } 65 + } 66 + Ok(()) 67 + } 68 + 69 + fn handler_id(&self) -> String { 70 + "slice-records-indexer".to_string() 71 + } 72 + } 73 + 74 + impl SliceEventHandler { 75 + async fn handle_commit_event( 76 + &self, 77 + did: &str, 78 + commit: atproto_jetstream::JetstreamEventCommit, 79 + ) -> Result<()> { 80 + let slice_collections = self.slice_collections.read().await; 81 + let slice_domains = self.slice_domains.read().await; 82 + let slice_validators = self.slice_validators.read().await; 83 + 84 + for (slice_uri, collections) in slice_collections.iter() { 85 + if collections.contains(&commit.collection) { 86 + // Get the domain for this slice 87 + let domain = match slice_domains.get(slice_uri) { 88 + Some(d) => d, 89 + None => continue, // No domain, skip 90 + }; 91 + 92 + // Check if this is a primary collection (starts with slice domain) 93 + let is_primary_collection = commit.collection.starts_with(domain); 94 + 95 + // For external collections, check actor status BEFORE expensive validation 96 + if !is_primary_collection { 97 + let cache_key = (did.to_string(), slice_uri.clone()); 98 + let is_actor = { 99 + let cache = self.actor_cache.read().await; 100 + cache.get(&cache_key).copied() 101 + }; 102 + 103 + let is_actor: Result<bool, anyhow::Error> = match is_actor { 104 + Some(cached_result) => Ok(cached_result), 105 + None => { 106 + // Cache miss means this DID is not an actor we've synced 107 + // For external collections, we only care about actors we've already added 108 + // Don't cache negative results to avoid memory bloat 109 + Ok(false) 110 + } 111 + }; 112 + 113 + match is_actor { 114 + Ok(false) => { 115 + // Not an actor - skip validation entirely for external collections 116 + continue; 117 + } 118 + Ok(true) => { 119 + // Actor found - continue to validation 120 + } 121 + Err(e) => { 122 + error!("Error checking actor status: {}", e); 123 + continue; 124 + } 125 + } 126 + } 127 + 128 + // Get validator for validation (after actor check for external collections) 129 + let validator = match slice_validators.get(slice_uri) { 130 + Some(v) => v.clone(), 131 + None => { 132 + // Fallback: Try to load fresh lexicons from database for this slice 133 + info!("No cached validator for slice {} - attempting database fallback", slice_uri); 134 + match self.create_fresh_validator(slice_uri).await { 135 + Some(fresh_validator) => { 136 + info!("✓ Created fresh validator for slice {} from database", slice_uri); 137 + // Cache the fresh validator for future use 138 + { 139 + let mut validators = self.slice_validators.write().await; 140 + validators.insert(slice_uri.clone(), fresh_validator.clone()); 141 + } 142 + fresh_validator 143 + } 144 + None => { 145 + info!("No lexicons found for slice {} - skipping validation", slice_uri); 146 + continue; 147 + } 148 + } 149 + } 150 + }; 151 + 152 + // Validate the record against the slice's lexicons 153 + let validation_result = match validator.validate_record(&commit.collection, &commit.record) { 154 + Ok(_) => { 155 + info!("✓ Record validated for collection {} in slice {}", commit.collection, slice_uri); 156 + true 157 + } 158 + Err(e) => { 159 + info!("Validation failed with cached validator for collection {} in slice {}: {} - trying database fallback", 160 + commit.collection, slice_uri, e); 161 + 162 + // Try database fallback in case lexicon was updated 163 + match self.create_fresh_validator(slice_uri).await { 164 + Some(fresh_validator) => { 165 + match fresh_validator.validate_record(&commit.collection, &commit.record) { 166 + Ok(_) => { 167 + info!("✓ Record validated with fresh validator for collection {} in slice {}", 168 + commit.collection, slice_uri); 169 + // Update cache with fresh validator 170 + { 171 + let mut validators = self.slice_validators.write().await; 172 + validators.insert(slice_uri.clone(), fresh_validator); 173 + } 174 + true 175 + } 176 + Err(fresh_e) => { 177 + error!("✗ Validation failed even with fresh validator for collection {} in slice {}: {}", 178 + commit.collection, slice_uri, fresh_e); 179 + false 180 + } 181 + } 182 + } 183 + None => { 184 + error!("✗ No lexicons found for slice {} during fallback", slice_uri); 185 + false 186 + } 187 + } 188 + } 189 + }; 190 + 191 + if !validation_result { 192 + continue; // Skip this slice if validation fails 193 + } 194 + 195 + if is_primary_collection { 196 + // Primary collection - index ALL records, no actor check needed 197 + info!("✓ Primary collection {} for slice {} (domain: {}) - indexing record", 198 + commit.collection, slice_uri, domain); 199 + 200 + let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey); 201 + 202 + let record = Record { 203 + uri: uri.clone(), 204 + cid: commit.cid.clone(), 205 + did: did.to_string(), 206 + collection: commit.collection.clone(), 207 + json: commit.record.clone(), 208 + indexed_at: Utc::now(), 209 + slice_uri: Some(slice_uri.clone()), 210 + }; 211 + 212 + self.database.upsert_record(&record).await 213 + .map_err(|e| anyhow::anyhow!("Database error: {}", e))?; 214 + 215 + info!("✓ Successfully indexed {} record from primary collection: {}", 216 + commit.operation, uri); 217 + break; 218 + } else { 219 + // External collection - we already checked actor status, so just index 220 + info!("✓ External collection {} - DID {} is actor in slice {} - indexing", 221 + commit.collection, did, slice_uri); 222 + 223 + let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey); 224 + 225 + let record = Record { 226 + uri: uri.clone(), 227 + cid: commit.cid.clone(), 228 + did: did.to_string(), 229 + collection: commit.collection.clone(), 230 + json: commit.record.clone(), 231 + indexed_at: Utc::now(), 232 + slice_uri: Some(slice_uri.clone()), 233 + }; 234 + 235 + self.database.upsert_record(&record).await 236 + .map_err(|e| anyhow::anyhow!("Database error: {}", e))?; 237 + 238 + info!("✓ Successfully indexed {} record from external collection: {}", 239 + commit.operation, uri); 240 + break; 241 + } 242 + } 243 + } 244 + 245 + Ok(()) 246 + } 247 + 248 + async fn handle_delete_event( 249 + &self, 250 + did: &str, 251 + commit: atproto_jetstream::JetstreamEventDelete, 252 + ) -> Result<()> { 253 + // First check if this DID is an actor in any of our slices 254 + let actor_cache = self.actor_cache.read().await; 255 + let is_tracked_actor = actor_cache.keys().any(|(cached_did, _)| cached_did == did); 256 + 257 + if !is_tracked_actor { 258 + // DID is not an actor in any slice, skip deletion 259 + return Ok(()); 260 + } 261 + 262 + // DID is an actor in our system, delete the record globally 263 + let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey); 264 + 265 + match self.database.delete_record_by_uri(&uri).await { 266 + Ok(rows_affected) => { 267 + if rows_affected > 0 { 268 + info!("✓ Deleted record globally: {} ({} rows)", uri, rows_affected); 269 + } 270 + } 271 + Err(e) => { 272 + error!("Failed to delete record {}: {}", uri, e); 273 + } 274 + } 275 + 276 + Ok(()) 277 + } 278 + 279 + /// Create a fresh validator from database for a specific slice 280 + /// Returns None if no lexicons are found or validator creation fails 281 + async fn create_fresh_validator(&self, slice_uri: &str) -> Option<LexiconValidator> { 282 + match self.database.get_lexicons_by_slice(slice_uri).await { 283 + Ok(lexicons) if !lexicons.is_empty() => { 284 + match LexiconValidator::new(lexicons) { 285 + Ok(validator) => Some(validator), 286 + Err(e) => { 287 + error!("Failed to create fresh validator for slice {}: {}", slice_uri, e); 288 + None 289 + } 290 + } 291 + } 292 + Ok(_) => None, // No lexicons found 293 + Err(e) => { 294 + error!("Database query failed when creating fresh validator for slice {}: {}", slice_uri, e); 295 + None 296 + } 297 + } 298 + } 299 + } 300 + 301 + impl JetstreamConsumer { 302 + pub async fn new(database: Database, jetstream_hostname: Option<String>) -> Result<Self, SliceError> { 303 + let config = ConsumerTaskConfig { 304 + user_agent: "slice-server/1.0".to_string(), 305 + compression: false, 306 + zstd_dictionary_location: String::new(), 307 + jetstream_hostname: jetstream_hostname 308 + .unwrap_or_else(|| "jetstream1.us-east.bsky.network".to_string()), 309 + collections: Vec::new(), // We'll update this dynamically based on slice configs 310 + dids: Vec::new(), // Subscribe to all DIDs 311 + max_message_size_bytes: None, 312 + cursor: None, 313 + require_hello: true, // Match official example - enables proper handshake 314 + }; 315 + 316 + let consumer = Consumer::new(config); 317 + 318 + Ok(Self { 319 + consumer, 320 + database, 321 + slice_collections: Arc::new(RwLock::new(HashMap::new())), 322 + slice_domains: Arc::new(RwLock::new(HashMap::new())), 323 + actor_cache: Arc::new(RwLock::new(HashMap::new())), 324 + slice_validators: Arc::new(RwLock::new(HashMap::new())), 325 + }) 326 + } 327 + 328 + /// Load slice configurations to know which collections to index 329 + pub async fn load_slice_configurations(&self) -> Result<(), SliceError> { 330 + info!("Loading slice configurations for Jetstream indexing"); 331 + 332 + // Get all slices that have lexicon definitions 333 + let slices = self.database.get_all_slices().await?; 334 + info!("Found {} total slices in database", slices.len()); 335 + 336 + let mut collections_map = HashMap::new(); 337 + let mut domains_map = HashMap::new(); 338 + let mut total_collections = 0; 339 + 340 + for slice_uri in &slices { 341 + info!("Checking slice: {}", slice_uri); 342 + 343 + // Get the domain for this slice 344 + if let Ok(Some(domain)) = self.database.get_slice_domain(slice_uri).await { 345 + info!("Slice {} has domain: {}", slice_uri, domain); 346 + domains_map.insert(slice_uri.clone(), domain.clone()); 347 + 348 + // Get collections defined in this slice's lexicons 349 + let collections = self.database.get_slice_collections_list(slice_uri).await?; 350 + 351 + if !collections.is_empty() { 352 + // Categorize collections as primary or external 353 + let mut primary = Vec::new(); 354 + let mut external = Vec::new(); 355 + 356 + for collection in &collections { 357 + if collection.starts_with(&domain) { 358 + primary.push(collection.clone()); 359 + } else { 360 + external.push(collection.clone()); 361 + } 362 + } 363 + 364 + info!("Slice {} has {} primary collections: {:?}", slice_uri, primary.len(), primary); 365 + info!("Slice {} has {} external collections: {:?}", slice_uri, external.len(), external); 366 + 367 + total_collections += collections.len(); 368 + collections_map.insert(slice_uri.clone(), collections.into_iter().collect()); 369 + } else { 370 + info!("Slice {} has no collections defined (no lexicons or empty lexicons)", slice_uri); 371 + } 372 + } else { 373 + info!("Slice {} has no domain defined - skipping", slice_uri); 374 + } 375 + } 376 + 377 + let mut slice_collections = self.slice_collections.write().await; 378 + *slice_collections = collections_map; 379 + 380 + let mut slice_domains = self.slice_domains.write().await; 381 + *slice_domains = domains_map; 382 + 383 + // Load lexicon validators for each slice 384 + let mut validators_map = HashMap::new(); 385 + for slice_uri in slice_collections.keys() { 386 + info!("Loading lexicon validator for slice: {}", slice_uri); 387 + 388 + // Get all lexicons for this slice 389 + match self.database.get_lexicons_by_slice(slice_uri).await { 390 + Ok(lexicons) => { 391 + // lexicons are already serde_json::Value objects from the database 392 + match LexiconValidator::new(lexicons) { 393 + Ok(validator) => { 394 + validators_map.insert(slice_uri.clone(), validator); 395 + info!("✓ Loaded validator for slice {}", slice_uri); 396 + } 397 + Err(e) => { 398 + error!("Failed to create validator for slice {}: {}", slice_uri, e); 399 + } 400 + } 401 + } 402 + Err(e) => { 403 + error!("Failed to load lexicons for slice {}: {}", slice_uri, e); 404 + } 405 + } 406 + } 407 + 408 + let mut slice_validators = self.slice_validators.write().await; 409 + *slice_validators = validators_map; 410 + 411 + info!("Jetstream consumer will monitor {} total collections across {} slices with {} validators loaded", 412 + total_collections, slice_collections.len(), slice_validators.len()); 413 + 414 + Ok(()) 415 + } 416 + 417 + /// Preload actor cache to avoid database hits during event processing 418 + async fn preload_actor_cache(&self) -> Result<(), SliceError> { 419 + info!("Preloading actor cache..."); 420 + 421 + let actors = self.database.get_all_actors().await?; 422 + info!("Found {} actors to cache", actors.len()); 423 + 424 + let mut cache = self.actor_cache.write().await; 425 + cache.clear(); // Clear existing cache 426 + for (did, slice_uri) in actors { 427 + cache.insert((did, slice_uri), true); 428 + } 429 + 430 + info!("Actor cache preloaded with {} entries", cache.len()); 431 + Ok(()) 432 + } 433 + 434 + 435 + /// Start consuming events from Jetstream 436 + pub async fn start_consuming(&self, cancellation_token: CancellationToken) -> Result<(), SliceError> { 437 + info!("Starting Jetstream consumer"); 438 + 439 + // Load initial slice configurations 440 + self.load_slice_configurations().await?; 441 + 442 + // Preload actor cache 443 + self.preload_actor_cache().await?; 444 + 445 + // Create and register the event handler 446 + let event_count = Arc::new(std::sync::atomic::AtomicU64::new(0)); 447 + let handler = Arc::new(SliceEventHandler { 448 + database: self.database.clone(), 449 + slice_collections: self.slice_collections.clone(), 450 + slice_domains: self.slice_domains.clone(), 451 + event_count: event_count.clone(), 452 + actor_cache: self.actor_cache.clone(), 453 + slice_validators: self.slice_validators.clone(), 454 + }); 455 + 456 + self.consumer.register_handler(handler).await 457 + .map_err(|e| SliceError::JetstreamError { 458 + message: format!("Failed to register event handler: {}", e), 459 + })?; 460 + 461 + // Start periodic status reporting 462 + let event_count_for_status = event_count.clone(); 463 + tokio::spawn(async move { 464 + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); // Every minute 465 + loop { 466 + interval.tick().await; 467 + let count = event_count_for_status.load(std::sync::atomic::Ordering::Relaxed); 468 + info!("Jetstream consumer status: {} total events processed", count); 469 + } 470 + }); 471 + 472 + // Start the consumer 473 + info!("Starting Jetstream background consumer..."); 474 + self.consumer.run_background(cancellation_token).await 475 + .map_err(|e| SliceError::JetstreamError { 476 + message: format!("Consumer failed: {}", e), 477 + })?; 478 + 479 + Ok(()) 480 + } 481 + 482 + /// Periodically reload slice configurations and actor cache to pick up new slices/collections/actors 483 + pub fn start_configuration_reloader(consumer: Arc<Self>) { 484 + tokio::spawn(async move { 485 + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); // Reload every 5 minutes 486 + 487 + loop { 488 + interval.tick().await; 489 + 490 + if let Err(e) = consumer.load_slice_configurations().await { 491 + error!("Failed to reload slice configurations: {}", e); 492 + } 493 + 494 + if let Err(e) = consumer.preload_actor_cache().await { 495 + error!("Failed to reload actor cache: {}", e); 496 + } 497 + } 498 + }); 499 + } 500 + }
+1
api/src/lexicon/validator.rs
··· 7 7 use super::errors::ValidationError; 8 8 use super::types::{LexiconDoc, StringFormat, ValidationContext}; 9 9 10 + #[derive(Clone)] 10 11 pub struct LexiconValidator { 11 12 lexicons: HashMap<String, LexiconDoc>, 12 13 }
+80 -13
api/src/main.rs
··· 3 3 mod codegen; 4 4 mod database; 5 5 mod errors; 6 + mod handler_jetstream_status; 6 7 mod handler_jobs; 7 8 mod handler_openapi_spec; 8 9 mod handler_records; ··· 11 12 mod handler_upload_blob; 12 13 mod handler_xrpc_codegen; 13 14 mod handler_xrpc_dynamic; 15 + mod jetstream; 14 16 mod jobs; 15 17 mod lexicon; 16 18 mod models; ··· 22 24 }; 23 25 use sqlx::PgPool; 24 26 use std::env; 27 + use std::sync::Arc; 28 + use std::sync::atomic::AtomicBool; 25 29 use tower_http::{cors::CorsLayer, trace::TraceLayer}; 26 30 use tracing::info; 27 31 use tracing_subscriber; 28 32 29 33 use crate::database::Database; 30 34 use crate::errors::AppError; 35 + use crate::jetstream::JetstreamConsumer; 31 36 32 37 #[derive(Clone)] 33 38 pub struct Config { ··· 40 45 database: Database, 41 46 database_pool: PgPool, 42 47 config: Config, 48 + pub jetstream_connected: Arc<AtomicBool>, 43 49 } 44 50 45 51 #[tokio::main] ··· 62 68 sqlx::migrate!("./migrations").run(&pool).await?; 63 69 64 70 let database = Database::new(pool.clone()); 65 - 71 + 66 72 let auth_base_url = 67 73 env::var("AUTH_BASE_URL").unwrap_or_else(|_| "http://localhost:8081".to_string()); 68 74 69 75 let relay_endpoint = env::var("RELAY_ENDPOINT") 70 76 .unwrap_or_else(|_| "https://relay1.us-west.bsky.network".to_string()); 71 77 72 - let config = Config { 78 + let config = Config { 73 79 auth_base_url, 74 80 relay_endpoint, 75 81 }; ··· 82 88 .runner(&pool_for_runner) 83 89 .set_concurrency(2, 5) // Keep 2-5 sync jobs running at a time 84 90 .run() 85 - .await 91 + .await 86 92 { 87 93 Ok(handle) => { 88 94 tracing::info!("Job runner started successfully, keeping handle alive..."); ··· 90 96 // The runner will stop if the handle is dropped 91 97 std::future::pending::<()>().await; // Keep the task alive indefinitely 92 98 drop(handle); // This line will never be reached 93 - }, 99 + } 94 100 Err(e) => { 95 101 tracing::error!("Failed to start job runner: {}", e); 96 102 } 97 103 } 98 104 }); 99 105 106 + // Create shared jetstream connection status 107 + let jetstream_connected = Arc::new(AtomicBool::new(false)); 108 + 109 + // Start Jetstream consumer for real-time indexing 110 + let database_for_jetstream = database.clone(); 111 + let jetstream_connected_clone = jetstream_connected.clone(); 112 + tokio::spawn(async move { 113 + let jetstream_hostname = env::var("JETSTREAM_HOSTNAME").ok(); // Optional, will use default if not set 114 + 115 + match JetstreamConsumer::new(database_for_jetstream, jetstream_hostname).await { 116 + Ok(consumer) => { 117 + let consumer_arc = std::sync::Arc::new(consumer); 118 + 119 + // Start configuration reloader 120 + JetstreamConsumer::start_configuration_reloader(consumer_arc.clone()); 121 + 122 + // Mark as connected when starting 123 + jetstream_connected_clone.store(true, std::sync::atomic::Ordering::Relaxed); 124 + 125 + // Start consuming events 126 + let cancellation_token = atproto_jetstream::CancellationToken::new(); 127 + if let Err(e) = consumer_arc.start_consuming(cancellation_token).await { 128 + tracing::error!("Jetstream consumer failed: {}", e); 129 + // Mark as disconnected on failure 130 + jetstream_connected_clone.store(false, std::sync::atomic::Ordering::Relaxed); 131 + } 132 + } 133 + Err(e) => { 134 + tracing::error!("Failed to create Jetstream consumer: {}", e); 135 + // Mark as disconnected on failure 136 + jetstream_connected_clone.store(false, std::sync::atomic::Ordering::Relaxed); 137 + } 138 + } 139 + }); 140 + 100 141 let state = AppState { 101 142 database: database.clone(), 102 143 database_pool: pool, 103 144 config, 145 + jetstream_connected, 104 146 }; 105 147 106 148 // Build application with routes 107 149 let app = Router::new() 108 150 // Health check endpoint 109 - .route("/", get(|| async { 110 - r#" 151 + .route( 152 + "/", 153 + get(|| async { 154 + r#" 111 155 ███████╗██╗ ██╗ ██████╗███████╗███████╗ 112 156 ██╔════╝██║ ██║██╔════╝██╔════╝██╔════╝ 113 157 ███████╗██║ ██║██║ █████╗ ███████╗ ··· 115 159 ███████║███████╗██║╚██████╗███████╗███████║ 116 160 ╚══════╝╚══════╝╚═╝ ╚═════╝╚══════╝╚══════╝ 117 161 "# 118 - })) 162 + }), 163 + ) 119 164 // AT Protocol blob upload endpoint (must come before wildcard routes) 120 165 .route( 121 166 "/xrpc/com.atproto.repo.uploadBlob", 122 167 post(handler_upload_blob::upload_blob), 123 168 ) 124 169 // XRPC endpoints 125 - .route("/xrpc/social.slices.slice.startSync", post(handler_sync::sync)) 126 - .route("/xrpc/social.slices.slice.getJobStatus", get(handler_jobs::get_job_status)) 127 - .route("/xrpc/social.slices.slice.getJobHistory", get(handler_jobs::get_slice_job_history)) 128 - .route("/xrpc/social.slices.slice.stats", post(handler_stats::stats)) 129 - .route("/xrpc/social.slices.slice.records", post(handler_records::records)) 170 + .route( 171 + "/xrpc/social.slices.slice.startSync", 172 + post(handler_sync::sync), 173 + ) 174 + .route( 175 + "/xrpc/social.slices.slice.getJobStatus", 176 + get(handler_jobs::get_job_status), 177 + ) 178 + .route( 179 + "/xrpc/social.slices.slice.getJobHistory", 180 + get(handler_jobs::get_slice_job_history), 181 + ) 182 + .route( 183 + "/xrpc/social.slices.slice.stats", 184 + post(handler_stats::stats), 185 + ) 186 + .route( 187 + "/xrpc/social.slices.slice.records", 188 + post(handler_records::records), 189 + ) 130 190 .route( 131 191 "/xrpc/social.slices.slice.codegen", 132 192 post(handler_xrpc_codegen::generate_client_xrpc), 133 193 ) 134 - .route("/xrpc/social.slices.slice.openapi", get(handler_openapi_spec::get_openapi_spec)) 194 + .route( 195 + "/xrpc/social.slices.slice.openapi", 196 + get(handler_openapi_spec::get_openapi_spec), 197 + ) 198 + .route( 199 + "/xrpc/social.slices.slice.getJetstreamStatus", 200 + get(handler_jetstream_status::get_jetstream_status), 201 + ) 135 202 // Dynamic collection-specific XRPC endpoints (wildcard routes must come last) 136 203 .route( 137 204 "/xrpc/*method",
+1
api/src/models.rs
··· 11 11 pub collection: String, 12 12 pub json: Value, 13 13 pub indexed_at: DateTime<Utc>, 14 + pub slice_uri: Option<String>, 14 15 } 15 16 16 17 #[derive(Debug, Serialize, Deserialize)]
+6 -4
api/src/sync.rs
··· 144 144 let collection_clone = collection.clone(); 145 145 let sync_service = self.clone(); 146 146 let atp_map_clone = atp_map.clone(); 147 + let slice_uri_clone = slice_uri.to_string(); 147 148 148 149 let task = tokio::spawn(async move { 149 - match sync_service.fetch_records_for_repo_collection_with_atp_map(&repo_clone, &collection_clone, &atp_map_clone).await { 150 + match sync_service.fetch_records_for_repo_collection_with_atp_map(&repo_clone, &collection_clone, &atp_map_clone, &slice_uri_clone).await { 150 151 Ok(records) => { 151 152 Ok((repo_clone, collection_clone, records)) 152 153 } ··· 235 236 Ok(repos_response.repos.into_iter().map(|r| r.did).collect()) 236 237 } 237 238 238 - async fn fetch_records_for_repo_collection_with_atp_map(&self, repo: &str, collection: &str, atp_map: &std::collections::HashMap<String, AtpData>) -> Result<Vec<Record>, SyncError> { 239 + async fn fetch_records_for_repo_collection_with_atp_map(&self, repo: &str, collection: &str, atp_map: &std::collections::HashMap<String, AtpData>, slice_uri: &str) -> Result<Vec<Record>, SyncError> { 239 240 let atp_data = atp_map.get(repo).ok_or_else(|| SyncError::Generic(format!("No ATP data found for repo: {}", repo)))?; 240 - self.fetch_records_for_repo_collection(repo, collection, &atp_data.pds).await 241 + self.fetch_records_for_repo_collection(repo, collection, &atp_data.pds, slice_uri).await 241 242 } 242 243 243 - async fn fetch_records_for_repo_collection(&self, repo: &str, collection: &str, pds_url: &str) -> Result<Vec<Record>, SyncError> { 244 + async fn fetch_records_for_repo_collection(&self, repo: &str, collection: &str, pds_url: &str, slice_uri: &str) -> Result<Vec<Record>, SyncError> { 244 245 // First, get existing record CIDs from database 245 246 let existing_cids = self.database.get_existing_record_cids(repo, collection) 246 247 .await ··· 289 290 collection: collection.to_string(), 290 291 json: atproto_record.value, 291 292 indexed_at: Utc::now(), 293 + slice_uri: Some(slice_uri.to_string()), 292 294 }; 293 295 records.push(record); 294 296 fetched_count += 1;
+17 -2
frontend/src/client.ts
··· 1 1 // Generated TypeScript client for AT Protocol records 2 - // Generated at: 2025-08-29 00:33:57 UTC 2 + // Generated at: 2025-08-29 21:39:46 UTC 3 3 // Lexicons: 3 4 4 5 5 /** ··· 156 156 157 157 export type GetJobHistoryResponse = JobStatus[]; 158 158 159 + export interface JetstreamStatusResponse { 160 + connected: boolean; 161 + status: string; 162 + error?: string; 163 + } 164 + 159 165 export interface CollectionStats { 160 166 collection: string; 161 167 recordCount: number; ··· 216 222 export interface SocialSlicesSliceRecord { 217 223 /** Name of the slice */ 218 224 name: string; 225 + /** Primary domain namespace for this slice (e.g. social.grain) */ 226 + domain: string; 219 227 /** When the slice was created */ 220 228 createdAt: string; 221 229 } 222 230 223 - export type SocialSlicesSliceRecordSortFields = "name" | "createdAt"; 231 + export type SocialSlicesSliceRecordSortFields = "name" | "domain" | "createdAt"; 224 232 225 233 export interface SocialSlicesLexiconRecord { 226 234 /** Namespaced identifier for the lexicon */ ··· 510 518 "social.slices.slice.getJobHistory", 511 519 "GET", 512 520 params 521 + ); 522 + } 523 + 524 + async getJetstreamStatus(): Promise<JetstreamStatusResponse> { 525 + return await this.makeRequest<JetstreamStatusResponse>( 526 + "social.slices.slice.getJetstreamStatus", 527 + "GET" 513 528 ); 514 529 } 515 530 }
+23
frontend/src/components/CreateSliceDialog.tsx
··· 1 1 interface CreateSliceDialogProps { 2 2 error?: string; 3 3 name?: string; 4 + domain?: string; 4 5 } 5 6 6 7 export function CreateSliceDialog({ 7 8 error, 8 9 name = "", 10 + domain = "", 9 11 }: CreateSliceDialogProps) { 10 12 return ( 11 13 <div ··· 68 70 className="w-full px-3 py-2 border border-gray-300 rounded-md shadow-sm focus:outline-none focus:ring-blue-500 focus:border-blue-500" 69 71 placeholder="Enter slice name" 70 72 /> 73 + </div> 74 + 75 + <div> 76 + <label 77 + htmlFor="domain" 78 + className="block text-sm font-medium text-gray-700 mb-1" 79 + > 80 + Primary Domain 81 + </label> 82 + <input 83 + type="text" 84 + id="domain" 85 + name="domain" 86 + value={domain} 87 + required 88 + className="w-full px-3 py-2 border border-gray-300 rounded-md shadow-sm focus:outline-none focus:ring-blue-500 focus:border-blue-500" 89 + placeholder="e.g. social.grain" 90 + /> 91 + <p className="mt-1 text-xs text-gray-500"> 92 + Primary namespace for this slice's collections 93 + </p> 71 94 </div> 72 95 73 96 <div className="flex justify-end space-x-3 pt-4">
+55
frontend/src/components/JetstreamStatus.tsx
··· 1 + interface JetstreamStatusProps { 2 + connected: boolean; 3 + status: string; 4 + error?: string; 5 + } 6 + 7 + export function JetstreamStatus({ 8 + connected, 9 + status, 10 + error, 11 + }: JetstreamStatusProps) { 12 + if (connected) { 13 + return ( 14 + <div className="bg-green-50 border border-green-200 rounded-lg p-4 mb-6"> 15 + <div className="flex items-center justify-between"> 16 + <div className="flex items-center"> 17 + <div className="w-3 h-3 bg-green-500 rounded-full mr-3 animate-pulse"></div> 18 + <div> 19 + <h3 className="text-sm font-semibold text-green-800"> 20 + ✈️ Jetstream Connected 21 + </h3> 22 + <p className="text-xs text-green-600"> 23 + Real-time indexing active - new records are automatically 24 + indexed 25 + </p> 26 + </div> 27 + </div> 28 + <div className="text-xs text-green-600">Live Updates</div> 29 + </div> 30 + </div> 31 + ); 32 + } else { 33 + return ( 34 + <div className="bg-red-50 border border-red-200 rounded-lg p-4 mb-6"> 35 + <div className="flex items-center justify-between"> 36 + <div className="flex items-center"> 37 + <div className="w-3 h-3 bg-red-500 rounded-full mr-3"></div> 38 + <div> 39 + <h3 className="text-sm font-semibold text-red-800"> 40 + 🌊 Jetstream Disconnected 41 + </h3> 42 + <p className="text-xs text-red-600"> 43 + Real-time indexing not active - {status} 44 + </p> 45 + {error && ( 46 + <p className="text-xs text-red-500 mt-1">Error: {error}</p> 47 + )} 48 + </div> 49 + </div> 50 + <div className="text-xs text-red-600">Offline</div> 51 + </div> 52 + </div> 53 + ); 54 + } 55 + }
+26
frontend/src/pages/SlicePage.tsx
··· 43 43 {/* Tab Navigation */} 44 44 <SliceTabs sliceId={sliceId} currentTab={currentTab} /> 45 45 46 + {/* Jetstream Status */} 47 + <div 48 + hx-get="/api/jetstream/status" 49 + hx-trigger="load, every 2m" 50 + hx-swap="outerHTML" 51 + > 52 + <div className="bg-gray-100 border rounded-lg p-4 mb-6"> 53 + <div className="flex items-center justify-between"> 54 + <div className="flex items-center"> 55 + <div className="w-3 h-3 bg-gray-400 rounded-full mr-3"></div> 56 + <div> 57 + <h3 className="text-sm font-semibold text-gray-600"> 58 + 🌊 Checking Jetstream Status... 59 + </h3> 60 + <p className="text-xs text-gray-500"> 61 + Loading connection status 62 + </p> 63 + </div> 64 + </div> 65 + <div className="text-xs text-gray-500"> 66 + Checking... 67 + </div> 68 + </div> 69 + </div> 70 + </div> 71 + 46 72 {totalRecords > 0 && ( 47 73 <div className="bg-blue-50 border border-blue-200 rounded-lg p-6 mb-8"> 48 74 <h2 className="text-xl font-semibold text-blue-800 mb-2">
+69 -25
frontend/src/pages/SliceSettingsPage.tsx
··· 3 3 4 4 interface SliceSettingsPageProps { 5 5 sliceName?: string; 6 + sliceDomain?: string; 6 7 sliceId?: string; 8 + updated?: boolean; 9 + error?: string | null; 7 10 currentUser?: { handle?: string; isAuthenticated: boolean }; 8 11 } 9 12 10 13 export function SliceSettingsPage({ 11 14 sliceName = "My Slice", 15 + sliceDomain = "", 12 16 sliceId = "example", 17 + updated = false, 18 + error = null, 13 19 currentUser, 14 20 }: SliceSettingsPageProps) { 15 21 return ( ··· 27 33 {/* Tab Navigation */} 28 34 <SliceTabs sliceId={sliceId} currentTab="settings" /> 29 35 36 + {/* Success Message */} 37 + {updated && ( 38 + <div className="bg-green-100 border border-green-400 text-green-700 px-4 py-3 rounded mb-4"> 39 + ✅ Slice settings updated successfully! 40 + </div> 41 + )} 42 + 43 + {/* Error Message */} 44 + {error && ( 45 + <div className="bg-red-100 border border-red-400 text-red-700 px-4 py-3 rounded mb-4"> 46 + ❌ {error === "update_failed" ? "Failed to update slice settings. Please try again." : "An error occurred."} 47 + </div> 48 + )} 49 + 30 50 {/* Settings Content */} 31 51 <div className="space-y-8"> 32 - {/* Edit Name Section */} 52 + {/* Edit Slice Settings */} 33 53 <div className="bg-white rounded-lg shadow-md p-6"> 34 54 <h2 className="text-xl font-semibold text-gray-800 mb-4"> 35 - ⚙️ Edit Slice Name 55 + ⚙️ Edit Slice Settings 36 56 </h2> 37 57 <p className="text-gray-600 mb-4"> 38 - Change the display name of your slice. 58 + Update your slice name and primary domain. 39 59 </p> 40 60 <form 41 - hx-put={`/api/slices/${sliceId}/name`} 42 - hx-target="#name-form-result" 61 + hx-put={`/api/slices/${sliceId}/settings`} 62 + hx-target="#settings-form-result" 43 63 hx-swap="innerHTML" 64 + className="space-y-4" 44 65 > 45 - <div className="flex gap-4 items-end"> 46 - <div className="flex-1"> 47 - <label 48 - htmlFor="slice-name" 49 - className="block text-sm font-medium text-gray-700 mb-2" 50 - > 51 - Slice Name 52 - </label> 53 - <input 54 - type="text" 55 - id="slice-name" 56 - name="name" 57 - defaultValue={sliceName} 58 - required 59 - className="w-full px-3 py-2 border border-gray-300 rounded-md shadow-sm focus:ring-blue-500 focus:border-blue-500" 60 - placeholder="Enter slice name..." 61 - /> 62 - </div> 66 + <div> 67 + <label 68 + htmlFor="slice-name" 69 + className="block text-sm font-medium text-gray-700 mb-2" 70 + > 71 + Slice Name 72 + </label> 73 + <input 74 + type="text" 75 + id="slice-name" 76 + name="name" 77 + value={sliceName} 78 + required 79 + className="w-full px-3 py-2 border border-gray-300 rounded-md shadow-sm focus:ring-blue-500 focus:border-blue-500" 80 + placeholder="Enter slice name..." 81 + /> 82 + </div> 83 + 84 + <div> 85 + <label 86 + htmlFor="slice-domain" 87 + className="block text-sm font-medium text-gray-700 mb-2" 88 + > 89 + Primary Domain 90 + </label> 91 + <input 92 + type="text" 93 + id="slice-domain" 94 + name="domain" 95 + value={sliceDomain} 96 + required 97 + className="w-full px-3 py-2 border border-gray-300 rounded-md shadow-sm focus:ring-blue-500 focus:border-blue-500" 98 + placeholder="e.g. social.grain" 99 + /> 100 + <p className="mt-1 text-xs text-gray-500"> 101 + Primary namespace for this slice's collections 102 + </p> 103 + </div> 104 + 105 + <div className="flex justify-start"> 63 106 <button 64 107 type="submit" 65 108 className="bg-blue-500 hover:bg-blue-600 text-white px-6 py-2 rounded-md font-medium" 66 109 > 67 - Update Name 110 + Update Settings 68 111 </button> 69 112 </div> 70 - <div id="name-form-result" className="mt-4"></div> 113 + <div id="settings-form-result" className="mt-4"></div> 71 114 </form> 72 115 </div> 73 116 ··· 81 124 cannot be undone. 82 125 </p> 83 126 <button 127 + type="button" 84 128 hx-delete={`/api/slices/${sliceId}`} 85 129 hx-confirm="Are you sure you want to delete this slice? This action cannot be undone." 86 130 hx-target="body"
+26 -9
frontend/src/pages/SliceSyncPage.tsx
··· 7 7 sliceId?: string; 8 8 currentUser?: { handle?: string; isAuthenticated: boolean }; 9 9 collections?: string[]; 10 + externalCollections?: string[]; 10 11 } 11 12 12 13 export function SliceSyncPage({ ··· 14 15 sliceId = "example", 15 16 currentUser, 16 17 collections = [], 18 + externalCollections = [], 17 19 }: SliceSyncPageProps) { 18 20 return ( 19 21 <Layout title={`${sliceName} - Sync`} currentUser={currentUser}> ··· 47 49 > 48 50 <div> 49 51 <label className="block text-sm font-medium text-gray-700 mb-2"> 50 - Collections to Sync 52 + Primary Collections 51 53 </label> 52 54 <textarea 53 55 id="collections" 54 56 name="collections" 55 - rows={6} 57 + rows={4} 56 58 className="block w-full border border-gray-300 rounded-md px-3 py-2" 57 59 placeholder={ 58 60 collections.length > 0 59 - ? "Slice collections loaded below. You can edit or add more collections:" 60 - : "Enter collections, one per line or comma-separated:\n\napp.bsky.feed.post\napp.bsky.actor.profile" 61 + ? "Primary collections (matching your slice domain) loaded below:" 62 + : "Enter primary collections matching your slice domain, one per line:\n\nyour.domain.collection\nyour.domain.post" 61 63 } 62 64 > 63 65 {collections.length > 0 ? collections.join("\n") : ""} 64 66 </textarea> 67 + <p className="mt-1 text-xs text-gray-500"> 68 + Primary collections are those that match your slice's domain. 69 + </p> 65 70 </div> 66 71 67 72 <div> 68 73 <label className="block text-sm font-medium text-gray-700 mb-2"> 69 - External Collections (Optional) 74 + External Collections 70 75 </label> 71 76 <textarea 72 77 id="external_collections" 73 78 name="external_collections" 74 79 rows={4} 75 80 className="block w-full border border-gray-300 rounded-md px-3 py-2" 76 - placeholder="Add external collections e.g. app.bsky.actor.profile to sync collections not in your domain" 77 - /> 81 + placeholder={ 82 + externalCollections.length > 0 83 + ? "External collections loaded below:" 84 + : "Enter external collections (not matching your domain), one per line:\n\napp.bsky.feed.post\napp.bsky.actor.profile" 85 + } 86 + > 87 + {externalCollections.length > 0 ? externalCollections.join("\n") : ""} 88 + </textarea> 89 + <p className="mt-1 text-xs text-gray-500"> 90 + External collections are those that don't match your slice's domain. 91 + </p> 78 92 </div> 79 93 80 94 <div> ··· 130 144 </h3> 131 145 <ul className="text-blue-700 space-y-1 text-sm"> 132 146 <li> 133 - • Collections from your slice lexicons are automatically loaded 134 - above 147 + • Primary collections matching your slice domain are automatically loaded 148 + in the first field 149 + </li> 150 + <li> 151 + • External collections from other domains are loaded in the second field 135 152 </li> 136 153 <li> 137 154 • Use External Collections to sync popular collections like{" "}
+26 -6
frontend/src/routes/pages.tsx
··· 23 23 if (context.currentUser.isAuthenticated) { 24 24 try { 25 25 const sliceRecords = 26 - await atprotoClient.social.slices.slice.listRecords(); 26 + await atprotoClient.social.slices.slice.listRecords({ 27 + sort: "createdAt:desc" 28 + }); 27 29 28 30 slices = sliceRecords.records.map((record) => { 29 31 // Extract slice ID from URI ··· 169 171 let sliceData = { 170 172 sliceId, 171 173 sliceName: "Unknown Slice", 174 + sliceDomain: "", 172 175 totalRecords: 0, 173 176 collections: [] as Array<{ name: string; count: number }>, 174 177 }; ··· 199 202 sliceData = { 200 203 sliceId, 201 204 sliceName: sliceRecord.value.name, 205 + sliceDomain: sliceRecord.value.domain || "", 202 206 totalRecords: stats.success ? stats.totalRecords : 0, 203 207 collections, 204 208 }; ··· 275 279 276 280 case "sync": { 277 281 // Fetch slice stats to get available collections for prefilling 278 - let collections: string[] = []; 282 + let primaryCollections: string[] = []; 283 + let externalCollections: string[] = []; 284 + 279 285 try { 280 286 const sliceUri = buildAtUri({ 281 287 did: context.currentUser.sub ?? "unknown", ··· 288 294 }); 289 295 290 296 if (stats.success) { 291 - collections = stats.collections; 297 + const sliceDomain = sliceData.sliceDomain || ""; 298 + 299 + // Categorize collections by domain 300 + stats.collections.forEach(collection => { 301 + if (sliceDomain && collection.startsWith(sliceDomain)) { 302 + primaryCollections.push(collection); 303 + } else { 304 + externalCollections.push(collection); 305 + } 306 + }); 292 307 } 293 308 } catch (error) { 294 309 console.error("Failed to fetch slice stats:", error); 295 - // Will use empty collections array as fallback 310 + // Will use empty collections arrays as fallback 296 311 } 297 312 298 313 html = render( 299 314 <SliceSyncPage 300 315 {...sliceData} 301 - collections={collections} 316 + collections={primaryCollections} 317 + externalCollections={externalCollections} 302 318 currentUser={context.currentUser} 303 319 /> 304 320 ); ··· 337 353 } 338 354 339 355 case "settings": { 356 + const url = new URL(req.url); 357 + const updated = url.searchParams.get("updated"); 358 + const error = url.searchParams.get("error"); 359 + 340 360 html = render( 341 - <SliceSettingsPage {...sliceData} currentUser={context.currentUser} /> 361 + <SliceSettingsPage {...sliceData} updated={updated === "true"} error={error} currentUser={context.currentUser} /> 342 362 ); 343 363 break; 344 364 }
+88 -29
frontend/src/routes/slices.tsx
··· 4 4 import { atprotoClient } from "../config.ts"; 5 5 import { getSliceClient } from "../utils/client.ts"; 6 6 import { buildSliceUri } from "../utils/at-uri.ts"; 7 + import type { SocialSlicesActorProfileRecord } from "../client.ts"; 7 8 import { CreateSliceDialog } from "../components/CreateSliceDialog.tsx"; 8 9 import { UpdateResult } from "../components/UpdateResult.tsx"; 9 10 import { EmptyLexiconState } from "../components/EmptyLexiconState.tsx"; ··· 15 16 import { SettingsResult } from "../components/SettingsResult.tsx"; 16 17 import { SyncResult } from "../components/SyncResult.tsx"; 17 18 import { JobHistory } from "../components/JobHistory.tsx"; 19 + import { JetstreamStatus } from "../components/JetstreamStatus.tsx"; 18 20 import { buildAtUri } from "../utils/at-uri.ts"; 19 21 20 22 async function handleCreateSlice(req: Request): Promise<Response> { ··· 37 39 try { 38 40 const formData = await req.formData(); 39 41 const name = formData.get("name") as string; 42 + const domain = formData.get("domain") as string; 40 43 41 44 if (!name || name.trim().length === 0) { 42 45 const dialogHtml = render( 43 - <CreateSliceDialog error="Slice name is required" name={name} /> 46 + <CreateSliceDialog error="Slice name is required" name={name} domain={domain} /> 47 + ); 48 + return new Response(dialogHtml, { 49 + status: 200, 50 + headers: { "content-type": "text/html" }, 51 + }); 52 + } 53 + 54 + if (!domain || domain.trim().length === 0) { 55 + const dialogHtml = render( 56 + <CreateSliceDialog error="Primary domain is required" name={name} domain={domain} /> 44 57 ); 45 58 return new Response(dialogHtml, { 46 59 status: 200, ··· 50 63 51 64 // Create actual slice using AT Protocol 52 65 try { 53 - const result = await atprotoClient.social.slices.slice.createRecord({ 66 + const recordData = { 54 67 name: name.trim(), 68 + domain: domain.trim(), 55 69 createdAt: new Date().toISOString(), 56 - }); 70 + }; 71 + 72 + const result = await atprotoClient.social.slices.slice.createRecord(recordData); 57 73 58 74 // Extract record key from URI (format: at://did:plc:example/social.slices.slice/rkey) 59 75 const uriParts = result.uri.split("/"); ··· 70 86 <CreateSliceDialog 71 87 error="Failed to create slice record. Please try again." 72 88 name={name} 89 + domain={domain} 73 90 /> 74 91 ); 75 92 return new Response(dialogHtml, { ··· 88 105 } 89 106 } 90 107 91 - async function handleUpdateSliceName( 108 + async function handleUpdateSliceSettings( 92 109 req: Request, 93 110 params?: URLPatternResult 94 111 ): Promise<Response> { ··· 104 121 try { 105 122 const formData = await req.formData(); 106 123 const name = formData.get("name") as string; 124 + const domain = formData.get("domain") as string; 107 125 108 126 if (!name || name.trim().length === 0) { 109 127 const resultHtml = render( ··· 115 133 }); 116 134 } 117 135 136 + if (!domain || domain.trim().length === 0) { 137 + const resultHtml = render( 138 + <UpdateResult type="error" message="Primary domain is required" /> 139 + ); 140 + return new Response(resultHtml, { 141 + status: 200, 142 + headers: { "content-type": "text/html" }, 143 + }); 144 + } 145 + 118 146 // Construct the URI for this slice 119 147 const sliceUri = buildSliceUri(context.currentUser.sub!, sliceId); 120 148 ··· 123 151 uri: sliceUri, 124 152 }); 125 153 126 - // Update the record with new name 154 + // Update the record with new name and domain 127 155 const updatedRecord = { 128 156 ...currentRecord.value, 129 157 name: name.trim(), 158 + domain: domain.trim(), 130 159 }; 131 160 132 161 await atprotoClient.social.slices.slice.updateRecord( ··· 134 163 updatedRecord 135 164 ); 136 165 137 - const resultHtml = render( 138 - <UpdateResult 139 - type="success" 140 - message="Slice name updated successfully!" 141 - showRefresh 142 - /> 143 - ); 144 - return new Response(resultHtml, { 166 + return new Response("", { 145 167 status: 200, 146 - headers: { "content-type": "text/html" }, 168 + headers: { 169 + "HX-Redirect": `/slices/${sliceId}/settings?updated=true`, 170 + }, 147 171 }); 148 172 } catch (_error) { 149 - const resultHtml = render( 150 - <UpdateResult 151 - type="error" 152 - message="Failed to update slice name. Please try again." 153 - /> 154 - ); 155 - return new Response(resultHtml, { 173 + return new Response("", { 156 174 status: 200, 157 - headers: { "content-type": "text/html" }, 175 + headers: { 176 + "HX-Redirect": `/slices/${sliceId}/settings?error=update_failed`, 177 + }, 158 178 }); 159 179 } 160 180 } ··· 520 540 const avatarFile = formData.get("avatar") as File; 521 541 522 542 // Build profile record 523 - const profileData: any = { 543 + const profileData: Partial<SocialSlicesActorProfileRecord> = { 524 544 displayName: displayName?.trim() || undefined, 525 545 description: description?.trim() || undefined, 526 546 createdAt: new Date().toISOString(), ··· 696 716 const externalCollectionsText = formData.get("external_collections") as string; 697 717 const reposText = formData.get("repos") as string; 698 718 699 - // Parse collections from textarea (newline or comma separated) 719 + // Parse primary collections from textarea (newline or comma separated) 700 720 const collections: string[] = []; 701 721 if (collectionsText) { 702 722 collectionsText.split(/[\n,]/).forEach((item) => { ··· 718 738 const html = render( 719 739 <SyncResult 720 740 success={false} 721 - error="Please specify at least one collection (regular or external) to sync" 741 + error="Please specify at least one collection (primary or external) to sync" 722 742 /> 723 743 ); 724 744 return new Response(html, { ··· 740 760 // Use slice-specific client to ensure consistent slice URI 741 761 const sliceClient = getSliceClient(context, sliceId); 742 762 const syncJobResponse = await sliceClient.social.slices.slice.startSync({ 743 - collections, 763 + collections: collections.length > 0 ? collections : undefined, 744 764 externalCollections: externalCollections.length > 0 ? externalCollections : undefined, 745 765 repos: repos.length > 0 ? repos : undefined, 746 766 }); ··· 753 773 : syncJobResponse.message 754 774 } 755 775 jobId={syncJobResponse.jobId} 756 - collectionsCount={collections.length} 776 + collectionsCount={collections.length + externalCollections.length} 757 777 error={syncJobResponse.success ? undefined : syncJobResponse.message} 758 778 /> 759 779 ); ··· 786 806 }, 787 807 { 788 808 method: "PUT", 789 - pattern: new URLPattern({ pathname: "/api/slices/:id/name" }), 790 - handler: handleUpdateSliceName, 809 + pattern: new URLPattern({ pathname: "/api/slices/:id/settings" }), 810 + handler: handleUpdateSliceSettings, 791 811 }, 792 812 { 793 813 method: "DELETE", ··· 839 859 pattern: new URLPattern({ pathname: "/api/slices/:id/job-history" }), 840 860 handler: handleJobHistory, 841 861 }, 862 + { 863 + method: "GET", 864 + pattern: new URLPattern({ pathname: "/api/jetstream/status" }), 865 + handler: handleJetstreamStatus, 866 + }, 842 867 ]; 868 + 869 + async function handleJetstreamStatus(_req: Request): Promise<Response> { 870 + try { 871 + // Fetch jetstream status using the atproto client 872 + const data = await atprotoClient.social.slices.slice.getJetstreamStatus(); 873 + 874 + const html = render( 875 + <JetstreamStatus 876 + connected={data.connected} 877 + status={data.status} 878 + error={data.error} 879 + /> 880 + ); 881 + 882 + return new Response(html, { 883 + status: 200, 884 + headers: { "content-type": "text/html" }, 885 + }); 886 + } catch (error) { 887 + // Fallback to disconnected state on error 888 + const html = render( 889 + <JetstreamStatus 890 + connected={false} 891 + status="Connection error" 892 + error={error instanceof Error ? error.message : "Unknown error"} 893 + /> 894 + ); 895 + 896 + return new Response(html, { 897 + status: 200, 898 + headers: { "content-type": "text/html" }, 899 + }); 900 + } 901 + }
+6 -1
lexicons/social/slices/slice.json
··· 8 8 "key": "tid", 9 9 "record": { 10 10 "type": "object", 11 - "required": ["name", "createdAt"], 11 + "required": ["name", "domain", "createdAt"], 12 12 "properties": { 13 13 "name": { 14 14 "type": "string", 15 15 "description": "Name of the slice", 16 + "maxLength": 256 17 + }, 18 + "domain": { 19 + "type": "string", 20 + "description": "Primary domain namespace for this slice (e.g. social.grain)", 16 21 "maxLength": 256 17 22 }, 18 23 "createdAt": {