// User Feed // Searches for Bluesky users by name, then streams their posts in real-time. // Demonstrates dynamic stream filtering and paginated API search with rate limiting. // // Usage: npx tsx examples/user-feed.ts [pageSize] // Example: npx tsx examples/user-feed.ts developers import type WebSocket from "ws"; import { type UserRegistry, parseUserPost, formatUserPost, startStreamWithReconnect, sendOptionsUpdate, waitForOpen, fetchAndMergeUsers, isRateLimitError, delay, } from "../lib/index.js"; // --- Config --- const query = process.argv[2]; if (!query) { console.error("Usage: npx tsx examples/user-feed.ts [pageSize]"); console.error("Example: npx tsx examples/user-feed.ts developers"); process.exit(1); } // --- Effects --- const sendFilterUpdate = (ws: WebSocket, alexMap: UserRegistry): void => { if (sendOptionsUpdate(ws, ["app.bsky.feed.post"], [...alexMap.keys()])) { console.log(`Stream updated: ${alexMap.size} users\n`); } }; const search = async ( ws: WebSocket, initialMap: UserRegistry ): Promise => { let alexMap = initialMap; let cursor: string | undefined; let page = 0; while (true) { try { const result = await fetchAndMergeUsers(query, alexMap, cursor); alexMap = result.merged; page++; console.log(`Page ${page}: ${result.newCount} new (total: ${alexMap.size})`); if (page % 10 === 0) sendFilterUpdate(ws, alexMap); cursor = result.cursor; if (!cursor || result.actorCount === 0) { console.log(`Search exhausted after ${page} pages.`); break; } } catch (err: any) { if (isRateLimitError(err)) { console.log("Rate limited, waiting 30s..."); await delay(30_000); } else { console.error("Error:", err); break; } } } sendFilterUpdate(ws, alexMap); return alexMap; }; // --- Main --- const main = async () => { let alexMap: UserRegistry = new Map(); const { getWs } = startStreamWithReconnect({ config: { requireHello: true }, onEvent: (event) => { const post = parseUserPost(event, alexMap); if (post) console.log(formatUserPost(post) + "\n"); }, onOpen: () => console.log("Stream connected, searching for users...\n"), }); await waitForOpen(getWs()); alexMap = await search(getWs(), alexMap); }; main();