A TypeScript toolkit for consuming the Bluesky network in real time.
at main 95 lines 2.4 kB view raw
// User Feed
// Searches for Bluesky users by name, then streams their posts in real time.
// Demonstrates dynamic stream filtering and paginated API search with rate limiting.
//
// Usage: npx tsx examples/user-feed.ts <query> [pageSize]
// Example: npx tsx examples/user-feed.ts developers

import type WebSocket from "ws";
import {
  type UserRegistry,
  parseUserPost,
  formatUserPost,
  startStreamWithReconnect,
  sendOptionsUpdate,
  waitForOpen,
  fetchAndMergeUsers,
  isRateLimitError,
  delay,
} from "../lib/index.js";

// --- Config ---

// NOTE(review): the usage text advertises an optional [pageSize] argument, but
// nothing in this script reads process.argv[3]. Confirm whether it should be
// forwarded to the search helper or dropped from the usage text.
const query = process.argv[2];

if (!query) {
  console.error("Usage: npx tsx examples/user-feed.ts <query> [pageSize]");
  console.error("Example: npx tsx examples/user-feed.ts developers");
  process.exit(1);
}

/** The stream filter is refreshed once every this many successfully fetched pages. */
const FILTER_UPDATE_EVERY_PAGES = 10;

/** How long to wait after a rate-limit error before retrying the same cursor. */
const RATE_LIMIT_DELAY_MS = 30_000;

// --- Effects ---

/**
 * Sends an options update on `ws` restricting the stream to the
 * "app.bsky.feed.post" collection and to the keys of `alexMap`
 * (presumably user identifiers — confirm against the lib).
 * Logs the registry size only when `sendOptionsUpdate` reports success.
 */
const sendFilterUpdate = (ws: WebSocket, alexMap: UserRegistry): void => {
  if (sendOptionsUpdate(ws, ["app.bsky.feed.post"], [...alexMap.keys()])) {
    console.log(`Stream updated: ${alexMap.size} users\n`);
  }
};

/**
 * Pages through the user search for `query`, merging every page into the
 * registry, until the API returns no cursor or an empty page.
 *
 * The stream filter is refreshed every FILTER_UPDATE_EVERY_PAGES pages and
 * once more when the loop ends. Rate-limit errors pause for
 * RATE_LIMIT_DELAY_MS and then retry with the same cursor; any other error is
 * logged and ends the search with whatever has been collected so far.
 *
 * NOTE(review): `ws` is captured once by the caller. If the reconnecting
 * stream swaps sockets while the search is still running, later filter
 * updates would target the old socket — verify how the lib handles this.
 *
 * @param ws         Socket that receives the filter updates.
 * @param initialMap Registry to start merging into.
 * @param onUpdate   Optional hook invoked with the latest registry after each
 *                   successfully merged page.
 * @returns The registry after the last successfully merged page.
 */
const search = async (
  ws: WebSocket,
  initialMap: UserRegistry,
  onUpdate?: (registry: UserRegistry) => void
): Promise<UserRegistry> => {
  let alexMap = initialMap;
  let cursor: string | undefined;
  let page = 0;

  while (true) {
    try {
      const result = await fetchAndMergeUsers(query, alexMap, cursor);
      alexMap = result.merged;
      page++;

      // Publish the merged registry right away so the caller does not have to
      // wait for the whole search to finish before it can see the new users.
      onUpdate?.(alexMap);

      console.log(`Page ${page}: ${result.newCount} new (total: ${alexMap.size})`);

      if (page % FILTER_UPDATE_EVERY_PAGES === 0) sendFilterUpdate(ws, alexMap);

      cursor = result.cursor;
      if (!cursor || result.actorCount === 0) {
        console.log(`Search exhausted after ${page} pages.`);
        break;
      }
    } catch (err: unknown) {
      if (isRateLimitError(err)) {
        // `cursor` and `page` were not advanced, so the next iteration
        // retries the page that failed.
        console.log("Rate limited, waiting 30s...");
        await delay(RATE_LIMIT_DELAY_MS);
      } else {
        console.error("Error:", err);
        break;
      }
    }
  }

  // Final push so users from the trailing (< FILTER_UPDATE_EVERY_PAGES) pages
  // are included in the stream filter.
  sendFilterUpdate(ws, alexMap);
  return alexMap;
};

// --- Main ---

/**
 * Opens the reconnecting stream, waits for the socket to open, then runs the
 * user search while printing every event that `parseUserPost` accepts.
 */
const main = async (): Promise<void> => {
  let alexMap: UserRegistry = new Map();

  const { getWs } = startStreamWithReconnect({
    config: { requireHello: true },
    onEvent: (event) => {
      const post = parseUserPost(event, alexMap);
      if (post) console.log(formatUserPost(post) + "\n");
    },
    onOpen: () => console.log("Stream connected, searching for users...\n"),
  });

  await waitForOpen(getWs());

  // Keep the registry read by onEvent in step with the search. Without this,
  // if fetchAndMergeUsers returns a fresh map instead of mutating its input
  // (not visible from here), events arriving mid-search would be checked
  // against the empty initial map.
  alexMap = await search(getWs(), alexMap, (registry) => {
    alexMap = registry;
  });
};

// Handle a rejected main() explicitly instead of leaving a floating promise.
main().catch((err: unknown) => {
  console.error("Fatal:", err);
  process.exit(1);
});